Skip to main content
Help improve this documentation

This documentation is still new and evolving. If you spot any mistakes, unclear explanations, or missing details, please open an issue.

Your feedback helps us improve!

Error handling operators

This page lists all error handling operators available in the core package of ro.

  • Catch

    Catches errors from the source observable and handles them by switching to a new observable returned by the handler.

    obs := ro.Pipe(
    ro.Just(1, 2, 3),
    ro.MapErr(func(i int) (int, error) {
    if i == 3 {
    return 0, errors.New("number 3 is not allowed")
    }
    return i * 2, nil
    }),
    ro.Catch(func(err error) ro.Observable[int] {
    fmt.Printf("Error: %v\n", err)
    return ro.Just(99) // Fallback value
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Next: 2 (1*2)
    // Next: 4 (2*2)
    // Error: number 3 is not allowed
    // Next: 99 (fallback value)
    // Completed

    With retry logic

    attempt := 0
    obs := ro.Pipe(
    ro.Defer(func() ro.Observable[int] {
    attempt++
    if attempt <= 2 {
    return ro.Pipe(
    ro.Just(1),
    ro.Throw[int](errors.New("network error")),
    )
    }
    return ro.Just(42)
    }),
    ro.Catch(func(err error) ro.Observable[int] {
    fmt.Printf("Attempt %d failed: %v\n", attempt, err)
    if attempt < 3 {
    return ro.Empty[int]() // Stop this attempt, allow retry
    }
    return ro.Just(-1) // Final fallback
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Attempt 1 failed: network error
    // Attempt 2 failed: network error
    // Next: 42 (success on 3rd attempt)
    // Completed

    With different error types

    obs := ro.Pipe(
    ro.Just("data1", "data2", "invalid"),
    ro.MapErr(func(s string) (string, error) {
    if s == "invalid" {
    return "", errors.New("invalid data")
    }
    return strings.ToUpper(s), nil
    }),
    ro.Catch(func(err error) ro.Observable[string] {
    if strings.Contains(err.Error(), "invalid") {
    return ro.Just("DEFAULT") // Handle validation errors
    }
    return ro.Throw[string](err) // Re-throw other errors
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[string]())
    defer sub.Unsubscribe()

    // Next: "DATA1"
    // Next: "DATA2"
    // Next: "DEFAULT"
    // Completed

    With logging and fallback sequence

    obs := ro.Pipe(
    ro.Just(1, 2, 3, 4),
    ro.MapErr(func(i int) (int, error) {
    if i%2 == 0 {
    return 0, fmt.Errorf("even number %d rejected", i)
    }
    return i, nil
    }),
    ro.Catch(func(err error) ro.Observable[int] {
    log.Printf("Error caught: %v", err)
    // Provide fallback sequence
    return ro.FromSlice([]int{100, 200, 300})
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Next: 1
    // Error caught: even number 2 rejected
    // Next: 100
    // Next: 200
    // Next: 300
    // Completed
    Prototype:
    func Catch[T any](finally func(err error) Observable[T])
  • Retry

    Retries the source observable sequence when it encounters an error. Retry uses infinite retries with default settings, while RetryWithConfig provides configurable retry behavior.

    attempt := 0
    obs := ro.Pipe(
    ro.Defer(func() ro.Observable[string] {
    attempt++
    if attempt < 3 {
    return ro.Pipe(
    ro.Just("data"),
    ro.Throw[string](errors.New("temporary failure")),
    )
    }
    return ro.Just("success!")
    }),
    ro.Retry[string](),
    )

    sub := obs.Subscribe(ro.PrintObserver[string]())
    time.Sleep(100 * time.Millisecond) // Allow time for retries
    sub.Unsubscribe()

    // Next: "success!" (after 3 attempts)
    // Completed

    RetryWithConfig with limited retries

    attempt := 0
    obs := ro.Pipe(
    ro.Defer(func() ro.Observable[int] {
    attempt++
    if attempt == 1 {
    return ro.Throw[int](errors.New("first attempt failed"))
    }
    return ro.Just(42)
    }),
    ro.RetryWithConfig[int](ro.RetryConfig{
    MaxRetries: 3,
    Delay: 100 * time.Millisecond,
    ResetOnSuccess: true,
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    time.Sleep(500 * time.Millisecond)
    sub.Unsubscribe()

    // Next: 42 (success on second attempt)
    // Completed

    With a fixed delay between retries

    obs := ro.Pipe(
    ro.Defer(func() ro.Observable[string] {
    return ro.Pipe(
    ro.Just("api_data"),
    ro.Throw[string](errors.New("rate limited")),
    )
    }),
    ro.RetryWithConfig[string](ro.RetryConfig{
    MaxRetries: 5,
    Delay: 1 * time.Second,
    ResetOnSuccess: true,
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[string]())
    time.Sleep(6 * time.Second)
    sub.Unsubscribe()

    // Would retry up to 5 times with 1-second delays
    // (assuming API continues to fail)

    With ResetOnSuccess behavior

    successCount := 0
    obs := ro.Pipe(
    ro.Defer(func() ro.Observable[int] {
    successCount++
    if successCount <= 2 {
    return ro.Just(successCount)
    }
    return ro.Throw[int](errors.New("suddenly failed"))
    }),
    ro.RetryWithConfig[int](ro.RetryConfig{
    MaxRetries: 3,
    Delay: 50 * time.Millisecond,
    ResetOnSuccess: true, // Success resets retry counter
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    time.Sleep(1000 * time.Millisecond)
    sub.Unsubscribe()

    // Next: 1 (success, counter resets)
    // Next: 2 (success, counter resets)
    // Next: 3 (success, counter resets)
    // (would retry 3 times after failure since counter reset after each success)

    Network request simulation

    type Response struct {
    Data string
    Err error
    }

    simulateAPICall := func() ro.Observable[Response] {
    return ro.Defer(func() ro.Observable[Response] {
    // Simulate intermittent network failures
    if rand.Intn(5) != 0 { // 80% failure rate
    return ro.Just(Response{Err: errors.New("network timeout")})
    }
    return ro.Just(Response{Data: "api_response"})
    })
    }

    obs := ro.Pipe(
    simulateAPICall(),
    ro.RetryWithConfig[Response](ro.RetryConfig{
    MaxRetries: 10,
    Delay: 200 * time.Millisecond,
    ResetOnSuccess: true,
    }),
    ro.Map(func(r Response) string {
    if r.Err != nil {
    return "error: " + r.Err.Error()
    }
    return "success: " + r.Data
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[string]())
    time.Sleep(3 * time.Second)
    sub.Unsubscribe()

    // Will keep retrying until successful or max retries reached
    // Expected: "success: api_response" (eventually)
    Prototypes:
    func Retry[T any]()
    func RetryWithConfig[T any](opts RetryConfig)
  • ThrowIfEmpty

    Throws an error if the source observable is empty, otherwise emits all items normally.

    obs := ro.Pipe(
    ro.Empty[int](),
    ro.ThrowIfEmpty[int](func() error {
    return errors.New("no data available")
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Error: no data available

    With data present

    obs := ro.Pipe(
    ro.Just(1, 2, 3),
    ro.ThrowIfEmpty[int](func() error {
    return errors.New("this won't be thrown")
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Next: 1
    // Next: 2
    // Next: 3
    // Completed (no error thrown)

    With filtered data

    obs := ro.Pipe(
    ro.Just(1, 2, 3, 4, 5),
    ro.Filter(func(i int) bool {
    return i > 10 // No items match
    }),
    ro.ThrowIfEmpty[int](func() error {
    return errors.New("no items found matching criteria")
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Error: no items found matching criteria

    With API response validation

    type User struct {
    ID int
    Name string
    }

    fetchUsers := func() ro.Observable[User] {
    // Simulate empty API response
    return ro.FromSlice([]User{})
    }

    obs := ro.Pipe(
    fetchUsers(),
    ro.ThrowIfEmpty[User](func() error {
    return errors.New("no users found in database")
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[User]())
    defer sub.Unsubscribe()

    // Error: no users found in database

    With conditional throwing

    shouldThrowError := true
    obs := ro.Pipe(
    ro.Empty[string](),
    ro.ThrowIfEmpty[string](func() error {
    if shouldThrowError {
    return fmt.Errorf("empty sequence not allowed at %v", time.Now())
    }
    return nil // No error
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[string]())
    defer sub.Unsubscribe()

    // Error: empty sequence not allowed at [current time]

    With retry mechanism

    attempt := 0
    getData := func() ro.Observable[int] {
    attempt++
    if attempt < 3 {
    return ro.Empty[int]() // Simulate empty response
    }
    return ro.Just(42) // Success on third attempt
    }

    obs := ro.Pipe(
    ro.Defer(getData),
    ro.ThrowIfEmpty[int](func() error {
    return fmt.Errorf("attempt %d: no data available", attempt)
    }),
    ro.RetryWithConfig[int](ro.RetryConfig{
    MaxRetries: 5,
    Delay: 100 * time.Millisecond,
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    time.Sleep(500 * time.Millisecond)
    sub.Unsubscribe()

    // Error: attempt 1: no data available
    // Error: attempt 2: no data available
    // Next: 42 (success on third attempt)
    // Completed
    Prototype:
    func ThrowIfEmpty[T any](throw func() error)
  • OnErrorResumeNextWith

    Begins emitting a second observable sequence if it encounters an error with the first observable.

    primary := ro.Pipe(
    ro.Just(1, 2, 3),
    ro.MapErr(func(i int) (int, error) {
    if i == 3 {
    return 0, errors.New("error occurred")
    }
    return i, nil
    }),
    )

    fallback := ro.Just(99, 100, 101)

    obs := ro.Pipe(primary, ro.OnErrorResumeNextWith(fallback))

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Next: 1
    // Next: 2
    // Next: 99 (fallback starts)
    // Next: 100
    // Next: 101
    // Completed

    With multiple fallback sequences

    primary := ro.Pipe(
    ro.Just("data1", "data2"),
    ro.Throw[string](errors.New("primary failed")),
    )

    fallback1 := ro.Just("fallback1", "fallback2")
    fallback2 := ro.Just("final1", "final2")

    obs := ro.Pipe(primary, ro.OnErrorResumeNextWith(fallback1, fallback2))

    sub := obs.Subscribe(ro.PrintObserver[string]())
    defer sub.Unsubscribe()

    // Next: "data1"
    // Next: "data2"
    // Next: "fallback1"
    // Next: "fallback2"
    // (fallback2 is ignored, only first fallback is used)

    With empty fallback

    primary := ro.Throw[int](errors.New("always fails"))
    fallback := ro.Empty[int]()

    obs := ro.Pipe(primary, ro.OnErrorResumeNextWith(fallback))

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Completed (no items, no error)

    API fallback pattern

    primaryAPI := func() ro.Observable[string] {
    return ro.Pipe(
    ro.Just("user_data"),
    ro.Throw[string](errors.New("API timeout")),
    )
    }

    cacheAPI := func() ro.Observable[string] {
    return ro.Just("cached_data")
    }

    obs := ro.Pipe(
    primaryAPI(),
    ro.OnErrorResumeNextWith(cacheAPI()),
    )

    sub := obs.Subscribe(ro.PrintObserver[string]())
    defer sub.Unsubscribe()

    // Next: "user_data" (from primary before error)
    // Next: "cached_data" (from fallback)
    // Completed

    Database connection fallback

    connectPrimary := func() ro.Observable[string] {
    // Simulate primary database failure
    return ro.Throw[string](errors.New("primary database unavailable"))
    }

    connectSecondary := func() ro.Observable[string] {
    return ro.Just("connected to secondary database")
    }

    obs := ro.Pipe(
    connectPrimary(),
    ro.OnErrorResumeNextWith(connectSecondary()),
    )

    sub := obs.Subscribe(ro.PrintObserver[string]())
    defer sub.Unsubscribe()

    // Next: "connected to secondary database"
    // Completed

    With conditional fallback

    shouldUseFallback := true
    primary := ro.Pipe(
    ro.Just(1, 2, 3),
    ro.MapErr(func(i int) (int, error) {
    if i == 3 && shouldUseFallback {
    return 0, errors.New("switch to fallback")
    }
    return i, nil
    }),
    )

    fallback := ro.Pipe(
    ro.Just(4, 5, 6),
    ro.Map(func(i int) int {
    return i * 10
    }),
    )

    obs := ro.Pipe(primary, ro.OnErrorResumeNextWith(fallback))

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Next: 1
    // Next: 2
    // Next: 40 (fallback: 4*10)
    // Next: 50 (fallback: 5*10)
    // Next: 60 (fallback: 6*10)
    // Completed
    Prototype:
    func OnErrorResumeNextWith[T any](finally ...Observable[T])
  • OnErrorReturn

    Emits a particular item when it encounters an error, then completes.

    obs := ro.Pipe(
    ro.Just(1, 2, 3),
    ro.MapErr(func(i int) (int, error) {
    if i == 3 {
    return 0, errors.New("something went wrong")
    }
    return i, nil
    }),
    ro.OnErrorReturn(-1),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Next: 1
    // Next: 2
    // Next: -1 (error fallback)
    // Completed

    With string fallback

    obs := ro.Pipe(
    ro.Just("apple", "banana", "invalid"),
    ro.MapErr(func(s string) (string, error) {
    if s == "invalid" {
    return "", errors.New("invalid fruit")
    }
    return strings.ToUpper(s), nil
    }),
    ro.OnErrorReturn("UNKNOWN"),
    )

    sub := obs.Subscribe(ro.PrintObserver[string]())
    defer sub.Unsubscribe()

    // Next: "APPLE"
    // Next: "BANANA"
    // Next: "UNKNOWN" (error fallback)
    // Completed

    API request with default value

    fetchUser := func(id int) ro.Observable[string] {
    return ro.Defer(func() ro.Observable[string] {
    if id == 999 {
    return ro.Throw[string](errors.New("user not found"))
    }
    return ro.Just(fmt.Sprintf("User%d", id))
    })
    }

    obs := ro.Pipe(
    fetchUser(999),
    ro.OnErrorReturn("Guest"),
    )

    sub := obs.Subscribe(ro.PrintObserver[string]())
    defer sub.Unsubscribe()

    // Next: "Guest" (error fallback)
    // Completed

    With multiple error handling

    obs := ro.Pipe(
    ro.Just(1, 2, 3, 4, 5),
    ro.MapErr(func(i int) (int, error) {
    if i == 3 {
    return 0, fmt.Errorf("error at %d", i)
    }
    return i * 10, nil
    }),
    ro.OnErrorReturn(-999),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Next: 10 (1*10)
    // Next: 20 (2*10)
    // Next: -999 (error fallback)
    // Completed

    Configuration loading with default

    loadConfig := func() ro.Observable[string] {
    return ro.Defer(func() ro.Observable[string] {
    // Simulate config file not found
    return ro.Throw[string](errors.New("config.json not found"))
    })
    }

    obs := ro.Pipe(
    loadConfig(),
    ro.OnErrorReturn("default_config"),
    )

    sub := obs.Subscribe(ro.PrintObserver[string]())
    defer sub.Unsubscribe()

    // Next: "default_config" (fallback to default)
    // Completed

    With complex fallback object

    type User struct {
    ID int
    Name string
    }

    fetchUser := func(id int) ro.Observable[User] {
    return ro.Defer(func() ro.Observable[User] {
    if id <= 0 {
    return ro.Throw[User](errors.New("invalid user ID"))
    }
    return ro.Just(User{ID: id, Name: fmt.Sprintf("User%d", id)})
    })
    }

    obs := ro.Pipe(
    fetchUser(-1),
    ro.OnErrorReturn(User{ID: 0, Name: "Anonymous"}),
    )

    sub := obs.Subscribe(ro.PrintObserver[User]())
    defer sub.Unsubscribe()

    // Next: {ID:0 Name:Anonymous} (error fallback)
    // Completed

    In a processing pipeline

    processData := func(data []int) ro.Observable[int] {
    return ro.Pipe(
    ro.FromSlice(data),
    ro.MapErr(func(i int) (int, error) {
    if i < 0 {
    return 0, fmt.Errorf("negative value: %d", i)
    }
    return i * 2, nil
    }),
    ro.OnErrorReturn(0), // Emit 0 and complete on the first negative value
    )
    }

    obs := processData([]int{1, 2, -3, 4})

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Next: 2 (1*2)
    // Next: 4 (2*2)
    // Next: 0 (fallback for -3; the sequence ends here, so 4 is never processed)
    // Completed
    Prototype:
    func OnErrorReturn[T any](finally T)
  • DoWhile

    Emits values from the source observable, then repeats the sequence as long as the condition returns true.

    counter := 0
    obs := ro.Pipe(
    ro.Just(1, 2, 3),
    ro.DoWhile(func() bool {
    counter++
    return counter <= 3
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Next: 1, 2, 3 (1st iteration)
    // Next: 1, 2, 3 (2nd iteration)
    // Next: 1, 2, 3 (3rd iteration)
    // Completed

    DoWhileI with index

    obs := ro.Pipe(
    ro.Just("a", "b"),
    ro.DoWhileI(func(index int64) bool {
    return index < 2 // Repeat twice (index 0 and 1)
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[string]())
    defer sub.Unsubscribe()

    // Next: "a", "b" (index 0)
    // Next: "a", "b" (index 1)
    // Completed

    DoWhileWithContext with cancellation

    ctx, cancel := context.WithCancel(context.Background())

    obs := ro.Pipe(
    ro.Just(1, 2),
    ro.DoWhileWithContext(func(ctx context.Context) (context.Context, bool) {
    select {
    case <-ctx.Done():
    return ctx, false
    default:
    return ctx, true // Continue repeating
    }
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())

    // After some items...
    cancel() // Stop repeating
    defer sub.Unsubscribe()

    DoWhileIWithContext with index and context

    ctx := context.Background()
    obs := ro.Pipe(
    ro.Just("x"),
    ro.DoWhileIWithContext(func(ctx context.Context, index int64) (context.Context, bool) {
    fmt.Printf("Iteration %d\n", index)
    return ctx, index < 2 // Repeat for 2 iterations
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[string]())
    defer sub.Unsubscribe()

    // Iteration 0
    // Next: "x"
    // Iteration 1
    // Next: "x"
    // Completed

    Retry pattern with DoWhile

    maxAttempts := 3
    attempt := 0
    shouldRetry := func() bool {
    attempt++
    return attempt <= maxAttempts
    }

    obs := ro.Pipe(
    ro.Defer(func() ro.Observable[int] {
    if attempt < maxAttempts {
    return ro.Throw[int](errors.New("temporary failure"))
    }
    return ro.Just(42) // Success on final attempt
    }),
    ro.DoWhile(shouldRetry),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Would retry until maxAttempts reached
    // Next: 42 (success on 3rd attempt)

    Polling with DoWhile

    ticker := time.NewTicker(100 * time.Millisecond)
    defer ticker.Stop()

    obs := ro.Pipe(
    ro.Defer(func() ro.Observable[int] {
    // Simulate checking for new data
    if rand.Intn(10) == 0 {
    return ro.Just(rand.Intn(100))
    }
    return ro.Empty[int]()
    }),
    ro.DoWhileWithContext(func(ctx context.Context) (context.Context, bool) {
    select {
    case <-ticker.C:
    return ctx, true // Continue polling
    case <-ctx.Done():
    return ctx, false
    }
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    time.Sleep(1 * time.Second)
    sub.Unsubscribe()

    // Emits random values as they become available
    // Stops after 1 second

    With external state

    type GameState struct {
    Score int
    Lives int
    GameOver bool
    }

    game := &GameState{Lives: 3}
    obs := ro.Pipe(
    ro.Defer(func() ro.Observable[string] {
    if game.Lives <= 0 {
    game.GameOver = true
    return ro.Just("Game Over")
    }
    action := fmt.Sprintf("Action - Lives: %d", game.Lives)
    game.Lives--
    return ro.Just(action)
    }),
    ro.DoWhile(func() bool {
    return !game.GameOver
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[string]())
    defer sub.Unsubscribe()

    // Next: "Action - Lives: 3"
    // Next: "Action - Lives: 2"
    // Next: "Action - Lives: 1"
    // Next: "Game Over"
    // Completed
    Similar:
    Prototypes:
    func DoWhile[T any](condition func() bool)
    func DoWhileI[T any](condition func(index int64) bool)
    func DoWhileWithContext[T any](condition func(context.Context) (context.Context, bool))
    func DoWhileIWithContext[T any](condition func(context.Context, index int64) (context.Context, bool))
  • While

    Repeats the source observable as long as the condition returns true. Unlike DoWhile, While checks the condition before each iteration.

    counter := 0
    obs := ro.Pipe(
    ro.Just(1, 2, 3),
    ro.While(func() bool {
    counter++
    return counter <= 3
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Next: 1, 2, 3 (counter becomes 1, condition: 1 <= 3 = true)
    // Next: 1, 2, 3 (counter becomes 2, condition: 2 <= 3 = true)
    // Next: 1, 2, 3 (counter becomes 3, condition: 3 <= 3 = true)
    // Completed (counter becomes 4, condition: 4 <= 3 = false)

    WhileI with index

    obs := ro.Pipe(
    ro.Just("a", "b"),
    ro.WhileI(func(index int64) bool {
    return index < 2 // Repeat twice (index 0 and 1)
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[string]())
    defer sub.Unsubscribe()

    // Next: "a", "b" (index 0)
    // Next: "a", "b" (index 1)
    // Completed (index 2, condition false)

    WhileWithContext with cancellation

    ctx, cancel := context.WithCancel(context.Background())

    obs := ro.Pipe(
    ro.Just(1, 2),
    ro.WhileWithContext(func(ctx context.Context) (context.Context, bool) {
    select {
    case <-ctx.Done():
    return ctx, false
    default:
    return ctx, true // Continue repeating
    }
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())

    // Cancel after some iterations
    cancel()
    defer sub.Unsubscribe()

    WhileIWithContext with index and context

    ctx := context.Background()
    obs := ro.Pipe(
    ro.Just("test"),
    ro.WhileIWithContext(func(ctx context.Context, index int64) (context.Context, bool) {
    fmt.Printf("Checking iteration %d\n", index)
    return ctx, index < 3 // Repeat 3 times
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[string]())
    defer sub.Unsubscribe()

    // Checking iteration 0
    // Next: "test"
    // Checking iteration 1
    // Next: "test"
    // Checking iteration 2
    // Next: "test"
    // Completed

    Conditional data generation

    dataAvailable := true
    obs := ro.Pipe(
    ro.Defer(func() ro.Observable[int] {
    // Simulate data fetch
    if !dataAvailable {
    return ro.Empty[int]()
    }
    dataAvailable = rand.Intn(2) == 0 // Randomly set availability
    return ro.Just(rand.Intn(100))
    }),
    ro.While(func() bool {
    return dataAvailable
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Emits values while data is available
    // Stops when dataAvailable becomes false

    Polling with timeout

    startTime := time.Now()
    timeout := 2 * time.Second

    obs := ro.Pipe(
    ro.Defer(func() ro.Observable[string] {
    // Check if timeout reached
    if time.Since(startTime) > timeout {
    return ro.Empty[string]()
    }
    // Simulate checking for messages
    if rand.Intn(5) == 0 {
    return ro.Just("new message")
    }
    return ro.Empty[string]()
    }),
    ro.While(func() bool {
    return time.Since(startTime) <= timeout
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[string]())
    time.Sleep(2500 * time.Millisecond)
    sub.Unsubscribe()

    // Polls for messages for 2 seconds
    // Emits "new message" when available

    Rate-limited processing

    processed := 0
    maxItems := 10
    obs := ro.Pipe(
    ro.Defer(func() ro.Observable[int] {
    if processed >= maxItems {
    return ro.Empty[int]()
    }
    processed++
    return ro.Just(processed)
    }),
    ro.While(func() bool {
    return processed < maxItems
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Next: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10
    // Completed

    With external resource monitoring

    type ResourceMonitor struct {
    isActive bool
    count int
    }

    monitor := &ResourceMonitor{isActive: true}
    obs := ro.Pipe(
    ro.Defer(func() ro.Observable[string] {
    if !monitor.isActive || monitor.count >= 5 {
    return ro.Empty[string]()
    }
    monitor.count++
    return ro.Just(fmt.Sprintf("Resource update %d", monitor.count))
    }),
    ro.While(func() bool {
    return monitor.isActive && monitor.count < 5
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[string]())
    defer sub.Unsubscribe()

    // Next: "Resource update 1"
    // Next: "Resource update 2"
    // Next: "Resource update 3"
    // Next: "Resource update 4"
    // Next: "Resource update 5"
    // Completed
    Similar:
    Prototypes:
    func While[T any](condition func() bool)
    func WhileI[T any](condition func(index int64) bool)
    func WhileWithContext[T any](condition func(context.Context) (context.Context, bool))
    func WhileIWithContext[T any](condition func(context.Context, index int64) (context.Context, bool))