Skip to main content
Help improve this documentation

This documentation is still new and evolving. If you spot any mistakes, unclear explanations, or missing details, please open an issue.

Your feedback helps us improve!

Combining operatorsโ€‹

This page lists all combining operations, available in the core package of ro.

  • Creates an Observable that emits items from multiple source Observables, interleaved as they are emitted.

    Merge multiple sources

    obs := ro.Merge(
    ro.Just(1, 2, 3),
    ro.Just(4, 5, 6),
    ro.Just(7, 8, 9),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Order may vary due to interleaving
    // Next: 1
    // Next: 4
    // Next: 7
    // Next: 2
    // Next: 5
    // Next: 8
    // Next: 3
    // Next: 6
    // Next: 9
    // Completed

    With different emission rates

    obs := ro.Merge(
    ro.Pipe(ro.Interval(100*time.Millisecond), ro.Take[int64](3)), // 0,1,2
    ro.Pipe(ro.Interval(200*time.Millisecond), ro.Take[int64](2)), // 0,1
    )

    sub := obs.Subscribe(ro.PrintObserver[int64]())
    time.Sleep(800 * time.Millisecond)
    sub.Unsubscribe()

    // Values interleaved based on emission timing
    // 0, 0, 1, 1, 2

    With hot observables

    source1 := ro.Interval(100 * time.Millisecond)
    source2 := ro.Interval(150 * time.Millisecond)
    obs := ro.Merge(
    ro.Pipe(source1, ro.Take[int64](5)),
    ro.Pipe(source2, ro.Take[int64](3)),
    )

    sub := obs.Subscribe(ro.PrintObserver[int64]())
    time.Sleep(1000 * time.Millisecond)
    sub.Unsubscribe()

    // Values interleaved based on actual emission times

    With error handling

    obs := ro.Merge(
    ro.Just(1, 2, 3),
    ro.Pipe(
    ro.Just(4, 5, 6),
    ro.MapErr(func(i int) (int, error) {
    if i == 5 {
    return 0, fmt.Errorf("error on 5")
    }
    return i, nil
    }),
    ),
    )

    sub := obs.Subscribe(ro.NewObserver(
    func(value int) {
    fmt.Printf("Next: %d\n", value)
    },
    func(err error) {
    fmt.Printf("Error: %v\n", err)
    },
    func() {
    fmt.Println("Complete")
    },
    ))
    time.Sleep(100 * time.Millisecond)
    sub.Unsubscribe()

    // Error terminates the entire merged observable

    With single source

    obs := ro.Merge(
    ro.Just(1, 2, 3),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Next: 1
    // Next: 2
    // Next: 3
    // Completed

    With no sources

    obs := ro.Merge[int]()

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Immediately completes with no values
    Prototype:
    func Merge[T any](sources ...Observable[T])
  • Creates an Observable that combines the latest values from multiple source Observables, emitting tuples or arrays of the most recent values from each.

    CombineLatest2

    obs := ro.CombineLatest2(
    ro.Just(1, 2, 3),
    ro.Just("A", "B", "C"),
    )

    sub := obs.Subscribe(ro.PrintObserver[lo.Tuple2[int, string]]())
    defer sub.Unsubscribe()

    // Next: (1, A)
    // Next: (2, A)
    // Next: (2, B)
    // Next: (3, B)
    // Next: (3, C)
    // Completed

    CombineLatest3

    obs := ro.CombineLatest3(
    ro.Just(1, 2, 3),
    ro.Just("A", "B", "C"),
    ro.Just(true, false),
    )

    sub := obs.Subscribe(ro.PrintObserver[lo.Tuple3[int, string, bool]]())
    defer sub.Unsubscribe()

    // Next: (1, A, true)
    // Next: (2, A, true)
    // Next: (2, B, true)
    // Next: (3, B, true)
    // Next: (3, B, false)
    // Next: (3, C, false)
    // Completed

    CombineLatest4

    obs := ro.CombineLatest4(
    ro.Just(1, 2),
    ro.Just("A", "B"),
    ro.Just(true, false),
    ro.Just(1.1, 2.2),
    )

    sub := obs.Subscribe(ro.PrintObserver[lo.Tuple4[int, string, bool, float64]]())
    defer sub.Unsubscribe()

    // Next: (1, A, true, 1.1)
    // Next: (2, A, true, 1.1)
    // Next: (2, B, true, 1.1)
    // Next: (2, B, false, 1.1)
    // Next: (2, B, false, 2.2)
    // Completed

    CombineLatest5

    obs := ro.CombineLatest5(
    ro.Just(1, 2),
    ro.Just("A", "B"),
    ro.Just(true, false),
    ro.Just(1.1, 2.2),
    ro.Just([]int{10, 20}),
    )

    sub := obs.Subscribe(ro.PrintObserver[lo.Tuple5[int, string, bool, float64, []int]]())
    defer sub.Unsubscribe()

    // Next: (1, A, true, 1.1, [10, 20])
    // Next: (2, A, true, 1.1, [10, 20])
    // Next: (2, B, true, 1.1, [10, 20])
    // Next: (2, B, false, 1.1, [10, 20])
    // Next: (2, B, false, 2.2, [10, 20])
    // Completed

    CombineLatestAny

    obs := ro.CombineLatestAny(
    ro.Just(1, 2, 3),
    ro.Just("A", "B", "C"),
    ro.Just(true, false),
    ro.Just(1.1, 2.2),
    )

    sub := obs.Subscribe(ro.PrintObserver[[]any]())
    defer sub.Unsubscribe()

    // Next: [1, A, true, 1.1]
    // Next: [2, A, true, 1.1]
    // Next: [2, B, true, 1.1]
    // Next: (3, B, true, 1.1)
    // Next: (3, B, false, 1.1)
    // Next: (3, B, false, 2.2)
    // Completed

    With different emission rates

    obs := ro.CombineLatest2(
    ro.Pipe(ro.Interval(100*time.Millisecond), ro.Take[int64](5)), // Fast: 0,1,2,3,4
    ro.Pipe(ro.Interval(300*time.Millisecond), ro.Take[int64](2)), // Slow: 0,1
    )

    sub := obs.Subscribe(ro.PrintObserver[lo.Tuple2[int64, int64]]())
    time.Sleep(1000 * time.Millisecond)
    sub.Unsubscribe()

    // Next: (0, 0) - both emitted
    // Next: (1, 0) - first emitted again
    // Next: (2, 0) - first emitted again
    // Next: (2, 1) - second emitted
    // Next: (3, 1) - first emitted again
    // Next: (4, 1) - first emitted again
    // Completed

    Edge case: One observable completes early

    obs := ro.CombineLatest2(
    ro.Just(1, 2, 3, 4, 5),
    ro.Pipe(ro.Just("A", "B"), ro.Take[string](1)), // Only emits "A"
    )

    sub := obs.Subscribe(ro.PrintObserver[lo.Tuple2[int, string]]())
    defer sub.Unsubscribe()

    // Next: (1, A)
    // Next: (2, A)
    // Next: (3, A)
    // Next: (4, A)
    // Next: (5, A)
    // Completed
    Prototypes:
    func CombineLatest2[A any, B any](obsA Observable[A], obsB Observable[B])
    func CombineLatest3[A any, B any, C any](obsA Observable[A], obsB Observable[B], obsC Observable[C])
    func CombineLatest4[A any, B any, C any, D any](obsA Observable[A], obsB Observable[B], obsC Observable[C], obsD Observable[D])
    func CombineLatest5[A any, B any, C any, D any, E any](obsA Observable[A], obsB Observable[B], obsC Observable[C], obsD Observable[D], obsE Observable[E])
    func CombineLatestAny(sources ...Observable[any])
  • Creates an Observable that merges items from multiple Observable sources provided as a higher-order Observable, emitting items as they are emitted from any source.

    obs := ro.Pipe(
    ro.Just(
    ro.Just(1, 2, 3),
    ro.Just(4, 5, 6),
    ro.Just(7, 8, 9),
    ),
    ro.MergeAll[int](),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Order may vary due to interleaving
    // Next: 1
    // Next: 4
    // Next: 7
    // Next: 2
    // Next: 5
    // Next: 8
    // Next: 3
    // Next: 6
    // Next: 9
    // Completed

    With different emission rates

    obs := ro.Pipe(
    ro.Just(
    ro.Pipe(ro.Interval(100*time.Millisecond), ro.Take[int64](3)), // Fast: 0,1,2
    ro.Pipe(ro.Interval(200*time.Millisecond), ro.Take[int64](2)), // Medium: 0,1
    ro.Pipe(ro.Interval(300*time.Millisecond), ro.Take[int64](1)), // Slow: 0
    ),
    ro.MergeAll[int64](),
    )

    sub := obs.Subscribe(ro.PrintObserver[int64]())
    time.Sleep(1000 * time.Millisecond)
    sub.Unsubscribe()

    // Values interleaved based on emission timing
    // 0, 0, 0, 1, 1, 2

    Dynamic observable collection

    // Create observables dynamically
    observables := []ro.Observable[int]{
    ro.Just(1, 2),
    ro.Just(3, 4),
    ro.Just(5, 6),
    }

    obs := ro.Pipe(
    ro.Just(observables...),
    ro.MergeAll[int](),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Merges all values from the collection of observables

    Edge case: Empty observable of observables

    obs := ro.Pipe(
    ro.Empty[ro.Observable[int]](),
    ro.MergeAll[int](),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // No items emitted, completes immediately
    // Completed

    Edge case: Single observable in collection

    obs := ro.Pipe(
    ro.Just(ro.Just(1, 2, 3)),
    ro.MergeAll[int](),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Next: 1
    // Next: 2
    // Next: 3
    // Completed
    Prototype:
    func MergeAll[T any]()
  • Creates an Observable that merges emissions from the source Observable with additional source Observables, interleaved as they are emitted.

    obs := ro.Pipe(
    ro.Just(1, 2, 3),
    ro.MergeWith(ro.Just(4, 5, 6)),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Order may vary due to interleaving
    // Next: 1
    // Next: 4
    // Next: 2
    // Next: 5
    // Next: 3
    // Next: 6
    // Completed

    MergeWith1 (alias for MergeWith)

    obs := ro.Pipe(
    ro.Just("A", "B", "C"),
    ro.MergeWith1(ro.Just("D", "E", "F")),
    )

    sub := obs.Subscribe(ro.PrintObserver[string]())
    defer sub.Unsubscribe()

    // MergeWith1 is an alias for MergeWith
    // Values interleaved from both sources

    MergeWith2 (three sources)

    obs := ro.Pipe(
    ro.Just(1, 2, 3),
    ro.MergeWith2(
    ro.Just(4, 5, 6),
    ro.Just(7, 8, 9),
    ),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Order may vary due to interleaving
    // Combines source with two additional observables
    // Next values will be interleaved from all three sources

    MergeWith3 (four sources)

    obs := ro.Pipe(
    ro.Just(1, 2),
    ro.MergeWith3(
    ro.Just(3, 4),
    ro.Just(5, 6),
    ro.Just(7, 8),
    ),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Combines source with three additional observables
    // All values interleaved as they're emitted

    MergeWith4 (five sources)

    obs := ro.Pipe(
    ro.Just(1, 2),
    ro.MergeWith4(
    ro.Just(3, 4),
    ro.Just(5, 6),
    ro.Just(7, 8),
    ro.Just(9, 10),
    ),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Combines source with four additional observables

    MergeWith5 (six sources)

    obs := ro.Pipe(
    ro.Just(1, 2),
    ro.MergeWith5(
    ro.Just(3, 4),
    ro.Just(5, 6),
    ro.Just(7, 8),
    ro.Just(9, 10),
    ro.Just(11, 12),
    ),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Combines source with five additional observables
    // Maximum convenience method for merging six sources

    With hot observables

    obs := ro.Pipe(
    ro.Interval(100 * time.Millisecond),
    ro.Take[int64](3),
    ro.MergeWith2(
    ro.Pipe(ro.Interval(150 * time.Millisecond), ro.Take[int64](2)),
    ro.Pipe(ro.Interval(200 * time.Millisecond), ro.Take[int64](2)),
    ),
    )

    sub := obs.Subscribe(ro.PrintObserver[int64]())
    time.Sleep(1000 * time.Millisecond)
    sub.Unsubscribe()

    // Values interleaved based on actual emission timing from all sources

    With error propagation

    obs := ro.Pipe(
    ro.Just(1, 2, 3),
    ro.MergeWith(
    ro.Pipe(
    ro.Just(4, 5, 6),
    ro.MapErr(func(i int) (int, error) {
    if i == 5 {
    return 0, fmt.Errorf("error on 5")
    }
    return i, nil
    }),
    ),
    ),
    )

    sub := obs.Subscribe(ro.NewObserver(
    func(value int) {
    fmt.Printf("Next: %d\n", value)
    },
    func(err error) {
    fmt.Printf("Error: %v\n", err)
    },
    func() {
    fmt.Println("Completed")
    }
    ))
    time.Sleep(100 * time.Millisecond)
    defer sub.Unsubscribe()

    // Error from any source terminates the entire merged observable

    With different data types

    obs := ro.Pipe(
    ro.Just("hello", "world"),
    ro.MergeWith2(
    ro.Just(1, 2, 3),
    ro.Just(true, false),
    ),
    )

    sub := obs.Subscribe(ro.PrintObserver[any]())
    defer sub.Unsubscribe()

    // All values can be merged as any type
    // Values interleaved as they're emitted from each source

    With async operations

    obs := ro.Pipe(
    ro.Just("task1", "task2"),
    ro.MergeWith(
    ro.Pipe(
    ro.Just("async1", "async2"),
    ro.MapAsync(func(task string) ro.Observable[string] {
    return ro.Defer(func() ro.Observable[string] {
    time.Sleep(50 * time.Millisecond)
    return ro.Just(task + "_done")
    })
    }, 2),
    ),
    ),
    )

    sub := obs.Subscribe(ro.PrintObserver[string]())
    time.Sleep(300 * time.Millisecond)
    defer sub.Unsubscribe()

    // Values interleaved as async operations complete

    With conditional merging

    shouldMerge := true
    var secondary ro.Observable[int] = ro.Just(4, 5, 6)

    if !shouldMerge {
    secondary = ro.Empty[int]()
    }

    obs := ro.Pipe(
    ro.Just(1, 2, 3),
    ro.MergeWith(secondary),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Conditional merging based on runtime logic
    Prototypes:
    func MergeWith[T any](obsB ro.Observable[T])
    func MergeWith1[T any](obsB ro.Observable[T])
    func MergeWith2[T any](obsB ro.Observable[T], obsC ro.Observable[T])
    func MergeWith3[T any](obsB ro.Observable[T], obsC ro.Observable[T], obsD ro.Observable[T])
    func MergeWith4[T any](obsB ro.Observable[T], obsC ro.Observable[T], obsD ro.Observable[T], obsE ro.Observable[T])
    func MergeWith5[T any](obsB ro.Observable[T], obsC ro.Observable[T], obsD ro.Observable[T], obsE ro.Observable[T], obsF ro.Observable[T])
  • Creates an Observable that mirrors the first source Observable to emit a next, error or complete notification. It's an alias for Race.

    The Observable cancels subscriptions to all other Observables once one emits. It completes when the winning source Observable completes.

    obs1 := ro.Pipe(
    ro.Just("fast"),
    ro.Delay(100*time.Millisecond),
    )

    obs2 := ro.Pipe(
    ro.Just("slow"),
    ro.Delay(200*time.Millisecond),
    )

    obs3 := ro.Pipe(
    ro.Just("slowest"),
    ro.Delay(300*time.Millisecond),
    )

    ambObs := ro.Amb(obs1, obs2, obs3)

    sub := ambObs.Subscribe(ro.PrintObserver[string]())
    time.Sleep(400 * time.Millisecond)
    sub.Unsubscribe()

    // Next: fast
    // Completed

    With immediate winner

    instant := ro.Just("immediate")
    delayed := ro.Pipe(
    ro.Just("delayed"),
    ro.Delay(100*time.Millisecond),
    )

    ambObs := ro.Amb(delayed, instant)

    sub := ambObs.Subscribe(ro.PrintObserver[string]())
    time.Sleep(200 * time.Millisecond)
    sub.Unsubscribe()

    // Next: immediate
    // Completed

    With error propagation

    obs1 := ro.Pipe(
    ro.Just("success"),
    ro.Delay(100*time.Millisecond),
    )

    obs2 := ro.Pipe(
    ro.Throw[string](errors.New("failed")),
    )

    ambObs := ro.Amb(obs1, obs2)

    sub := ambObs.Subscribe(ro.PrintObserver[string]())
    time.Sleep(200 * time.Millisecond)
    sub.Unsubscribe()

    // Next: success
    // Completed

    With empty sources

    ambObs := ro.Amb[string]() // No sources provided

    sub := ambObs.Subscribe(ro.PrintObserver[string]())
    defer sub.Unsubscribe()

    // Completed (empty observable)
    Similar:
    Prototypes:
    func Amb[T any](sources ...Observable[T])
    func Race[T any](sources ...Observable[T])
  • CombineLatestWithโ€‹

    Creates an Observable that combines the latest values from the source Observable with other provided Observables, emitting tuples of the most recent values.

    obs := ro.Pipe(
    ro.Just(1, 2, 3),
    ro.CombineLatestWith(ro.Just("A", "B", "C")),
    )

    sub := obs.Subscribe(ro.PrintObserver[lo.Tuple2[int, string]]())
    defer sub.Unsubscribe()

    // Next: (1, A)
    // Next: (2, A)
    // Next: (2, B)
    // Next: (3, B)
    // Next: (3, C)
    // Completed

    CombineLatestWith2 (three observables)

    obs := ro.Pipe(
    ro.Just(1, 2, 3),
    ro.CombineLatestWith2(
    ro.Just("A", "B", "C"),
    ro.Just(true, false),
    ),
    )

    sub := obs.Subscribe(ro.PrintObserver[lo.Tuple3[int, string, bool]]())
    defer sub.Unsubscribe()

    // Next: (1, A, true)
    // Next: (2, A, true)
    // Next: (2, B, true)
    // Next: (3, B, true)
    // Next: (3, B, false)
    // Next: (3, C, false)
    // Completed

    Chain multiple CombineLatestWith

    obs := ro.Pipe(
    ro.Just(1, 2, 3),
    ro.CombineLatestWith(ro.Just("A", "B")),
    ro.CombineLatestWith(ro.Just(true, false)),
    )

    sub := obs.Subscribe(ro.PrintObserver[lo.Tuple3[int, string, bool]]())
    defer sub.Unsubscribe()

    // Same result as CombineLatestWith2
    // Next: (1, A, true)
    // Next: (2, A, true)
    // Next: (2, B, true)
    // Next: (3, B, true)
    // Next: (3, B, false)
    // Completed

    CombineLatestWith3 (four observables)

    obs := ro.Pipe(
    ro.Just(1, 2),
    ro.CombineLatestWith3(
    ro.Just("A", "B"),
    ro.Just(true, false),
    ro.Just(1.1, 2.2),
    ),
    )

    sub := obs.Subscribe(ro.PrintObserver[lo.Tuple4[int, string, bool, float64]]())
    defer sub.Unsubscribe()

    // Combines latest from all four sources
    // Next: (1, A, true, 1.1)
    // Next: (2, A, true, 1.1)
    // Next: (2, B, true, 1.1)
    // Next: (2, B, false, 1.1)
    // Next: (2, B, false, 2.2)
    // Completed

    Practical example: Combining data streams

    // User IDs stream
    userIDs := ro.Just(1, 2, 3)

    // User details stream (slower)
    userDetails := ro.Pipe(
    ro.Just(
    User{1, "Alice"},
    User{2, "Bob"},
    User{3, "Charlie"},
    ),
    ro.Map(func(u User) int { return u.ID }), // Extract IDs
    )

    // Permissions stream
    permissions := ro.Just("admin", "user", "guest")

    // Combine user IDs with their details and permissions
    obs := ro.Pipe(
    userIDs,
    ro.CombineLatestWith2(userDetails, permissions),
    ro.Map(func(t lo.Tuple3[int, int, string]) string {
    userID, detailID, perm := t.Get3()
    return fmt.Sprintf("User %d (detail %d): %s", userID, detailID, perm)
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[string]())
    defer sub.Unsubscribe()

    // Outputs combinations of latest values from all streams
    Prototypes:
    func CombineLatestWith[A any, B any](obsB Observable[B])
    func CombineLatestWith1[A any, B any](obsB Observable[B])
    func CombineLatestWith2[A any, B any, C any](obsB Observable[B], obsC Observable[C])
    func CombineLatestWith3[A any, B any, C any, D any](obsB Observable[B], obsC Observable[C], obsD Observable[D])
    func CombineLatestWith4[A any, B any, C any, D any, E any](obsB Observable[B], obsC Observable[C], obsD Observable[D], obsE Observable[E])
  • Creates an Observable that combines the values from multiple source Observables by emitting tuples or arrays of values in the order they were zipped.

    Zip2

    obs := ro.Zip2(
    ro.Just(1, 2, 3),
    ro.Just("A", "B", "C"),
    )

    sub := obs.Subscribe(ro.PrintObserver[lo.Tuple2[int, string]]())
    defer sub.Unsubscribe()

    // Next: (1, A)
    // Next: (2, B)
    // Next: (3, C)
    // Completed

    Zip3

    obs := ro.Zip3(
    ro.Just(1, 2, 3),
    ro.Just("A", "B", "C"),
    ro.Just(true, false, true),
    )

    sub := obs.Subscribe(ro.PrintObserver[lo.Tuple3[int, string, bool]]())
    defer sub.Unsubscribe()

    // Next: (1, A, true)
    // Next: (2, B, false)
    // Next: (3, C, true)
    // Completed

    Zip4

    obs := ro.Zip4(
    ro.Just(1, 2),
    ro.Just("A", "B"),
    ro.Just(true, false),
    ro.Just(1.1, 2.2),
    )

    sub := obs.Subscribe(ro.PrintObserver[lo.Tuple4[int, string, bool, float64]]())
    defer sub.Unsubscribe()

    // Next: (1, A, true, 1.1)
    // Next: (2, B, false, 2.2)
    // Completed

    Zip5

    obs := ro.Zip5(
    ro.Just(1, 2),
    ro.Just("A", "B"),
    ro.Just(true, false),
    ro.Just(1.1, 2.2),
    ro.Just([]int{10, 20}),
    )

    sub := obs.Subscribe(ro.PrintObserver[lo.Tuple5[int, string, bool, float64, []int]]())
    defer sub.Unsubscribe()

    // Next: (1, A, true, 1.1, [10, 20])
    // Next: (2, B, false, 2.2, [10, 20])
    // Completed

    Zip6

    obs := ro.Zip6(
    ro.Just(1, 2),
    ro.Just("A", "B"),
    ro.Just(true, false),
    ro.Just(1.1, 2.2),
    ro.Just([]int{10, 20}),
    ro.Just("x", "y"),
    )

    sub := obs.Subscribe(ro.PrintObserver[lo.Tuple6[int, string, bool, float64, []int, string]]())
    defer sub.Unsubscribe()

    // Next: (1, A, true, 1.1, [10, 20], "x")
    // Next: (2, B, false, 2.2, [10, 20], "y")
    // Completed

    Zip with multiple sources

    obs := ro.Zip(
    ro.Just(1, 2, 3),
    ro.Just("A", "B", "C"),
    ro.Just(true, false, true),
    )

    sub := obs.Subscribe(ro.PrintObserver[[]any]())
    defer sub.Unsubscribe()

    // Next: [1, A, true]
    // Next: [2, B, false]
    // Next: [3, C, true]
    // Completed

    Edge case: Different length observables

    obs := ro.Zip2(
    ro.Just(1, 2, 3, 4, 5), // 5 items
    ro.Just("A", "B"), // 2 items
    )

    sub := obs.Subscribe(ro.PrintObserver[lo.Tuple2[int, string]]())
    defer sub.Unsubscribe()

    // Next: (1, A)
    // Next: (2, B)
    // Completed
    // Only zips up to the shortest observable

    Edge case: Single observable

    obs := ro.Zip(
    ro.Just(1, 2, 3),
    )

    sub := obs.Subscribe(ro.PrintObserver[[]any]())
    defer sub.Unsubscribe()

    // Next: [1]
    // Next: [2]
    // Next: [3]
    // Completed
    Prototypes:
    func Zip[T any](sources ...Observable[T])
    func Zip2[A any, B any](obsA Observable[A], obsB Observable[B])
    func Zip3[A any, B any, C any](obsA Observable[A], obsB Observable[B], obsC Observable[C])
    func Zip4[A any, B any, C any, D any](obsA Observable[A], obsB Observable[B], obsC Observable[C], obsD Observable[D])
    func Zip5[A any, B any, C any, D any, E any](obsA Observable[A], obsB Observable[B], obsC Observable[C], obsD Observable[D], obsE Observable[E])
    func Zip6[A any, B any, C any, D any, E any, F any](obsA Observable[A], obsB Observable[B], obsC Observable[C], obsD Observable[D], obsE Observable[E], obsF Observable[F])
  • Creates an Observable that combines values from the source Observable with other source Observables using a pipe operator pattern, emitting tuples of zipped values.

    ZipWith

    obs := ro.Pipe(
    ro.Just(1, 2, 3),
    ro.ZipWith(ro.Just("A", "B", "C")),
    )

    sub := obs.Subscribe(ro.PrintObserver[lo.Tuple2[int, string]]())
    defer sub.Unsubscribe()

    // Next: (1, A)
    // Next: (2, B)
    // Next: (3, C)
    // Completed

    ZipWith2 (three sources)

    obs := ro.Pipe(
    ro.Just(1, 2, 3),
    ro.ZipWith2(
    ro.Just("A", "B", "C"),
    ro.Just(true, false, true),
    ),
    )

    sub := obs.Subscribe(ro.PrintObserver[lo.Tuple3[int, string, bool]]())
    defer sub.Unsubscribe()

    // Next: (1, A, true)
    // Next: (2, B, false)
    // Next: (3, C, true)
    // Completed

    ZipWith3 (four sources)

    obs := ro.Pipe(
    ro.Just(1, 2),
    ro.ZipWith3(
    ro.Just("A", "B"),
    ro.Just(true, false),
    ro.Just(1.1, 2.2),
    ),
    )

    sub := obs.Subscribe(ro.PrintObserver[lo.Tuple4[int, string, bool, float64]]())
    defer sub.Unsubscribe()

    // Next: (1, A, true, 1.1)
    // Next: (2, B, false, 2.2)
    // Completed

    ZipWith4 (five sources)

    obs := ro.Pipe(
    ro.Just(1, 2),
    ro.ZipWith4(
    ro.Just("A", "B"),
    ro.Just(true, false),
    ro.Just(1.1, 2.2),
    ro.Just([]int{10, 20}),
    ),
    )

    sub := obs.Subscribe(ro.PrintObserver[lo.Tuple5[int, string, bool, float64, []int]]())
    defer sub.Unsubscribe()

    // Next: (1, A, true, 1.1, [10, 20])
    // Next: (2, B, false, 2.2, [10, 20])
    // Completed

    ZipWith5 (six sources)

    obs := ro.Pipe(
    ro.Just(1, 2),
    ro.ZipWith5(
    ro.Just("A", "B"),
    ro.Just(true, false),
    ro.Just(1.1, 2.2),
    ro.Just([]int{10, 20}),
    ro.Just("x", "y"),
    ),
    )

    sub := obs.Subscribe(ro.PrintObserver[lo.Tuple6[int, string, bool, float64, []int, string]]())
    defer sub.Unsubscribe()

    // Next: (1, A, true, 1.1, [10, 20], "x")
    // Next: (2, B, false, 2.2, [10, 20], "y")
    // Completed

    With different emission rates

    obs := ro.Pipe(
    ro.Just(1, 2, 3),
    ro.ZipWith(
    ro.Pipe(ro.Interval(100*time.Millisecond), ro.Take[string](3)), // "A", "B", "C"
    ),
    )

    sub := obs.Subscribe(ro.PrintObserver[lo.Tuple2[int, string]]())
    time.Sleep(500 * time.Millisecond)
    sub.Unsubscribe()

    // Values are zipped as they arrive
    // Next: (1, A)
    // Next: (2, B)
    // Next: (3, C)
    // Completed

    Edge case: Different length observables

    obs := ro.Pipe(
    ro.Just(1, 2, 3, 4, 5), // 5 items
    ro.ZipWith2(
    ro.Just("A", "B"), // 2 items
    ro.Just(true, false), // 2 items
    ),
    )

    sub := obs.Subscribe(ro.PrintObserver[lo.Tuple3[int, string, bool]]())
    defer sub.Unsubscribe()

    // Next: (1, A, true)
    // Next: (2, B, false)
    // Completed
    // Only zips up to the shortest observable
    Prototypes:
    func ZipWith[A any, B any](obsB Observable[B])
    func ZipWith1[A any, B any](obsB Observable[B])
    func ZipWith2[A any, B any, C any](obsB Observable[B], obsC Observable[C])
    func ZipWith3[A any, B any, C any, D any](obsB Observable[B], obsC Observable[C], obsD Observable[D])
    func ZipWith4[A any, B any, C any, D any, E any](obsB Observable[B], obsC Observable[C], obsD Observable[D], obsE Observable[E])
    func ZipWith5[A any, B any, C any, D any, E any, F any](obsB Observable[B], obsC Observable[C], obsD Observable[D], obsE Observable[E], obsF Observable[F])
  • Creates an Observable that zips items from multiple Observable sources provided as a higher-order Observable, emitting arrays of zipped values.

    ro.Just(
    ro.Just(1, 2, 3),
    ro.Just("A", "B", "C"),
    ro.Just(true, false, true),
    ),
    ro.ZipAll[any](),

    )

    sub := obs.Subscribe(ro.PrintObserver[]any)
    defer sub.Unsubscribe()

    // Next: [1, A, true]
    // Next: [2, B, false]
    // Next: [3, C, true]
    // Completed


    ### Dynamic observable collection

    ```go
    // Create observables dynamically
    observables := []Observable[int]{
    ro.Just(1, 2),
    ro.Just(3, 4),
    ro.Just(5, 6),
    }

    obs := ro.Pipe(
    ro.Just(observables...),
    ro.ZipAll[int](),
    )

    sub := obs.Subscribe(ro.PrintObserver[[]int]())
    defer sub.Unsubscribe()

    // Next: [1, 3, 5]
    // Next: [2, 4, 6]
    // Completed

    With different types

    obs := ro.Pipe(
    ro.Just(
    ro.Just(1, 2),
    ro.Just("A", "B"),
    ro.Just(true, false),
    ro.Just(1.1, 2.2),
    ),
    ro.ZipAll[any](),
    )

    sub := obs.Subscribe(ro.PrintObserver[[]any]())
    defer sub.Unsubscribe()

    // Next: [1, "A", true, 1.1]
    // Next: [2, "B", false, 2.2]
    // Completed

    Edge case: Empty observable of observables

    obs := ro.Pipe(
    ro.Empty[Observable[int]](),
    ro.ZipAll[int](),
    )

    sub := obs.Subscribe(ro.PrintObserver[[]int]())
    defer sub.Unsubscribe()

    // No items emitted, completes immediately
    // Completed

    Edge case: Single observable in collection

    obs := ro.Pipe(
    ro.Just(ro.Just(1, 2, 3)),
    ro.ZipAll[int](),
    )

    sub := obs.Subscribe(ro.PrintObserver[[]int]())
    defer sub.Unsubscribe()

    // Next: [1]
    // Next: [2]
    // Next: [3]
    // Completed

    Edge case: Different length observables

    obs := ro.Pipe(
    ro.Just(
    ro.Just(1, 2, 3, 4, 5), // 5 items
    ro.Just("A", "B"), // 2 items
    ro.Just(true, false), // 2 items
    ),
    ro.ZipAll[any](),
    )

    sub := obs.Subscribe(ro.PrintObserver[[]any]())
    defer sub.Unsubscribe()

    // Next: [1, "A", true]
    // Next: [2, "B", false]
    // Completed
    // Only zips up to the shortest observable
    Prototype:
    func ZipAll[T any]()
  • CombineLatestAllโ€‹

    Creates an Observable that combines the latest values from multiple Observable sources provided as a higher-order Observable, emitting arrays of the most recent values.

    obs := ro.Pipe(
    ro.Just(
    ro.Just(1, 2),
    ro.Just("A", "B"),
    ro.Just(true, false),
    ),
    ro.CombineLatestAll[any](),
    )

    sub := obs.Subscribe(ro.PrintObserver[[]any]())
    defer sub.Unsubscribe()

    // Next: [1, A, true]
    // Next: [2, A, true]
    // Next: [2, B, true]
    // Next: [2, B, false]
    // Completed

    With different emission rates

    obs := ro.Pipe(
    ro.Just(
    ro.Pipe(ro.Interval(100*time.Millisecond), ro.Take[int64](3)), // 0,1,2
    ro.Pipe(ro.Interval(200*time.Millisecond), ro.Take[int64](2)), // 0,1
    ro.Pipe(ro.Interval(300*time.Millisecond), ro.Take[int64](1)), // 0
    ),
    ro.CombineLatestAll[int64](),
    )

    sub := obs.Subscribe(ro.PrintObserver[[]int64]())
    time.Sleep(1000 * time.Millisecond)
    sub.Unsubscribe()

    // Combines latest from all sources
    // Next: [0, 0, 0]
    // Next: [1, 0, 0]
    // Next: [1, 1, 0]
    // Next: [2, 1, 0]
    // Completed

    Dynamic observable collection

    // Create observables dynamically
    observables := []Observable[int]{
    ro.Just(1, 2),
    ro.Just(10, 20),
    ro.Just(100, 200),
    }

    obs := ro.Pipe(
    ro.Just(observables...),
    ro.CombineLatestAll[int](),
    )

    sub := obs.Subscribe(ro.PrintObserver[[]int]())
    defer sub.Unsubscribe()

    // Next: [1, 10, 100]
    // Next: [2, 10, 100]
    // Next: [2, 20, 100]
    // Next: [2, 20, 200]
    // Completed

    CombineLatestAllAny for mixed types

    obs := ro.Pipe(
    ro.Just(
    ro.Just(1, 2), // int
    ro.Just("A", "B"), // string
    ro.Just(true, false), // bool
    ),
    ro.CombineLatestAllAny(),
    )

    sub := obs.Subscribe(ro.PrintObserver[[]any]())
    defer sub.Unsubscribe()

    // Next: [1, A, true]
    // Next: [2, A, true]
    // Next: [2, B, true]
    // Next: [2, B, false]
    // Completed

    Edge case: Empty observable of observables

    obs := ro.Pipe(
    ro.Empty[Observable[int]](),
    ro.CombineLatestAll[int](),
    )

    sub := obs.Subscribe(ro.PrintObserver[[]int]())
    defer sub.Unsubscribe()

    // No items emitted, completes immediately
    // Completed

    Edge case: Single observable in collection

    obs := ro.Pipe(
    ro.Just(ro.Just(1, 2, 3)),
    ro.CombineLatestAll[int](),
    )

    sub := obs.Subscribe(ro.PrintObserver[[]int]())
    defer sub.Unsubscribe()

    // Next: [1]
    // Next: [2]
    // Next: [3]
    // Completed
    Prototypes:
    func CombineLatestAll[T any]()
    func CombineLatestAllAny()
  • Transforms each item from the source Observable into an Observable, then merges the resulting Observables, emitting all items as they are emitted from any transformed Observable.

    obs := ro.Pipe(
    ro.Just(1, 2, 3),
    ro.MergeMap(func(n int) Observable[string] {
    return ro.Just(fmt.Sprintf("item-%d", n))
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[string]())
    defer sub.Unsubscribe()

    // Order may vary due to interleaving
    // Next: item-1
    // Next: item-2
    // Next: item-3
    // Completed

    With index parameter

    obs := ro.Pipe(
    ro.Just("A", "B", "C"),
    ro.MergeMapI(func(letter string, index int64) Observable[string] {
    return ro.Just(fmt.Sprintf("%s-%d", letter, index))
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[string]())
    defer sub.Unsubscribe()

    // Next: A-0
    // Next: B-1
    // Next: C-2
    // Completed

    With context

    obs := ro.Pipe(
    ro.Just(1, 2, 3),
    ro.MergeMapWithContext(func(ctx context.Context, n int) (context.Context, Observable[int]) {
    // Can use context for cancellation or values
    return ctx, ro.Just(n * 10)
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Next: 10
    // Next: 20
    // Next: 30
    // Completed

    With both context and index

    obs := ro.Pipe(
    ro.Just("hello", "world"),
    ro.MergeMapIWithContext(func(ctx context.Context, word string, index int64) (context.Context, Observable[int]) {
    return ctx, ro.Just(len(word))
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Next: 5
    // Next: 5
    // Completed

    With async operations

    obs := ro.Pipe(
    ro.Just(1, 2, 3),
    ro.MergeMap(func(n int) Observable[int] {
    return ro.Pipe(
    ro.Interval(100*time.Millisecond),
    ro.Take[int64](2),
    ro.Map(func(_ int64) int { return n }),
    )
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    time.Sleep(1000 * time.Millisecond)
    sub.Unsubscribe()

    // Values interleaved from all inner observables
    // Example output: 1, 1, 2, 2, 3, 3

    With error handling

    obs := ro.Pipe(
    ro.Just(1, 2, 3),
    ro.MergeMap(func(n int) Observable[int] {
    if n == 2 {
    return ro.Error[int](errors.New("error for 2"))
    }
    return ro.Just(n * 10)
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Next: 10
    // Next: 20
    // Next: 30
    // Completed

    Edge case: Empty source

    obs := ro.Pipe(
    ro.Empty[int](),
    ro.MergeMap(func(n int) Observable[string] {
    return ro.Just(fmt.Sprintf("item-%d", n))
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[string]())
    defer sub.Unsubscribe()

    // No items emitted, completes immediately
    // Completed
    Prototypes:
    func MergeMap[T any, R any](project func(value T) Observable[R])
    func MergeMapI[T any, R any](project func(value T, index int64) Observable[R])
    func MergeMapWithContext[T any, R any](project func(ctx context.Context, value T) (context.Context, Observable[R]))
    func MergeMapIWithContext[T any, R any](project func(ctx context.Context, value T, index int64) (context.Context, Observable[R]))
  • Creates an Observable that mirrors the first source Observable to emit an item or send a notification. Amb is an alias for Race.

    fast := ro.Pipe(
    ro.Timer(100*time.Millisecond),
    ro.Map(func(_ int64) string { return "fast" }),
    )
    slow := ro.Pipe(
    ro.Timer(200*time.Millisecond),
    ro.Map(func(_ int64) string { return "slow" }),
    )

    obs := ro.Race(fast, slow)

    sub := obs.Subscribe(ro.PrintObserver[string]())
    time.Sleep(300 * time.Millisecond)
    sub.Unsubscribe()

    // Next: "fast" (from the faster observable)
    // Completed

    With multiple sources

    sources := []Observable[int]{
    ro.Pipe(ro.Timer(300*time.Millisecond), ro.Map(func(_ int64) int { return 1 })),
    ro.Pipe(ro.Timer(100*time.Millisecond), ro.Map(func(_ int64) int { return 2 })),
    ro.Pipe(ro.Timer(200*time.Millisecond), ro.Map(func(_ int64) int { return 3 })),
    }

    obs := ro.Race(sources...)

    sub := obs.Subscribe(ro.PrintObserver[int]())
    time.Sleep(400 * time.Millisecond)
    sub.Unsubscribe()

    // Next: 2 (from the 100ms timer)
    // Completed

    With error handling

    success := ro.Pipe(ro.Timer(200*time.Millisecond), ro.Map(func(_ int64) int { return 42 }))
    failure := ro.Pipe(ro.Timer(100*time.Millisecond), ro.Throw[int](errors.New("failed")))

    obs := Race(success, failure)

    sub := obs.Subscribe(ro.PrintObserver[int]())
    time.Sleep(300 * time.Millisecond)
    sub.Unsubscribe()

    // Error: failed

    Timeout pattern

    data := ro.Future[int](func(resolve func(int), reject func(error)) {
    go func() {
    time.Sleep(2 * time.Second)
    resolve(42)
    }()
    })

    timeout := ro.Timer(1 * time.Second)
    result := Race(data, timeout)

    sub := result.Subscribe(ro.PrintObserver[any]())
    time.Sleep(2500 * time.Millisecond)
    sub.Unsubscribe()

    // Next: 0 (from timeout after 1 second)
    // Completed

    Fallback pattern

    primary := ro.Future[string](func(resolve func(string), reject func(error)) {
    go func() {
    time.Sleep(500 * time.Millisecond)
    resolve("primary result")
    }()
    })

    fallback := ro.Just("fallback value")
    result := Race(primary, fallback)

    sub := result.Subscribe(ro.PrintObserver[string]())
    time.Sleep(1000 * time.Millisecond)
    sub.Unsubscribe()

    // Next: "fallback value"
    // Completed
    Prototypes:
    func Race[T any](sources ...Observable[T])
    func Amb[T any](sources ...Observable[T])
  • Creates an Observable that concatenates multiple source Observables, emitting all items from each source sequentially.

    obs := ro.Concat(
    ro.Just(1, 2, 3),
    ro.Just(4, 5, 6),
    ro.Just(7, 8, 9),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Next: 1, 2, 3 (from first observable)
    // Next: 4, 5, 6 (from second observable)
    // Next: 7, 8, 9 (from third observable)
    // Completed

    With time-based sources

    obs := ro.Concat(
    ro.Pipe(ro.Just(1), ro.Delay(100*time.Millisecond)),
    ro.Pipe(ro.Just(2), ro.Delay(100*time.Millisecond)),
    ro.Pipe(ro.Just(3), ro.Delay(100*time.Millisecond)),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    time.Sleep(500 * time.Millisecond)
    sub.Unsubscribe()

    // Next: 1 (after 100ms)
    // Next: 2 (after 200ms total)
    // Next: 3 (after 300ms total)
    // Completed

    With empty observables

    obs := ro.Concat(
    ro.Just(1, 2),
    ro.Empty[int](),
    ro.Just(3, 4),
    ro.Empty[int](),
    ro.Just(5),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Next: 1
    // Next: 2
    // (empty observable emits nothing)
    // Next: 3
    // Next: 4
    // (empty observable emits nothing)
    // Next: 5
    // Completed

    Error propagation

    obs := ro.Concat(
    ro.Just(1, 2),
    ro.Pipe(ro.Just(3), ro.Throw[int](errors.New("error"))),
    ro.Just(4, 5),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Next: 1, 2
    // Error: error (subsequent observables are not processed)
    Prototype:
    func Concat[T any](sources ...Observable[T]) Observable[T]
  • Emits the given values before emitting the values from the source Observable.

    obs := ro.Pipe(
    ro.Just("a", "b", "c"),
    ro.StartWith("x", "y"),
    )

    sub := obs.Subscribe(ro.PrintObserver[string]())
    defer sub.Unsubscribe()

    // Next: x
    // Next: y
    // Next: a
    // Next: b
    // Next: c
    // Completed

    With single prefix value

    obs := ro.Pipe(
    ro.Just(2, 3, 4),
    ro.StartWith(1),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Next: 1
    // Next: 2
    // Next: 3
    // Next: 4
    // Completed

    With multiple prefix values

    obs := ro.Pipe(
    ro.Just("c"),
    ro.StartWith("a", "b"),
    )

    sub := obs.Subscribe(ro.PrintObserver[string]())
    defer sub.Unsubscribe()

    // Next: a
    // Next: b
    // Next: c
    // Completed

    With empty source observable

    obs := ro.Pipe(
    ro.Empty[string](),
    ro.StartWith("prefix1", "prefix2"),
    )

    sub := obs.Subscribe(ro.PrintObserver[string]())
    defer sub.Unsubscribe()

    // Next: prefix1
    // Next: prefix2
    // Completed

    With no prefix values

    obs := ro.Pipe(
    ro.Just("a", "b"),
    ro.StartWith[string](),
    )

    sub := obs.Subscribe(ro.PrintObserver[string]())
    defer sub.Unsubscribe()

    // Next: a
    // Next: b
    // Completed

    With time-based observable

    obs := ro.Pipe(
    ro.Interval(100*time.Millisecond),
    ro.StartWith(int64(-1)),
    ro.Take(3),
    )

    sub := obs.Subscribe(ro.PrintObserver[int64]())
    time.Sleep(400 * time.Millisecond)
    sub.Unsubscribe()

    // Next: -1
    // Next: 0
    // Next: 1
    // Next: 2
    // Completed
    Prototype:
    func StartWith[T any](prefixes ...T)
  • Emits the previous and current values as a pair (array of two values). The first value doesn't emit until the second value arrives.

    obs := ro.Pipe(
    ro.Just("a", "b", "c", "d"),
    ro.Pairwise(),
    )

    sub := obs.Subscribe(ro.PrintObserver[[]string]())
    defer sub.Unsubscribe()

    // Next: [a b]
    // Next: [b c]
    // Next: [c d]
    // Completed

    With numbers

    obs := ro.Pipe(
    ro.Just(1, 2, 3, 4, 5),
    ro.Pairwise(),
    )

    sub := obs.Subscribe(ro.PrintObserver[[]int]())
    defer sub.Unsubscribe()

    // Next: [1 2]
    // Next: [2 3]
    // Next: [3 4]
    // Next: [4 5]
    // Completed

    With single value

    obs := ro.Pipe(
    ro.Just("only one"),
    ro.Pairwise(),
    )

    sub := obs.Subscribe(ro.PrintObserver[[]string]())
    defer sub.Unsubscribe()

    // Completed (no pairs emitted)

    With empty observable

    obs := ro.Pipe(
    ro.Empty[string](),
    ro.Pairwise(),
    )

    sub := obs.Subscribe(ro.PrintObserver[[]string]())
    defer sub.Unsubscribe()

    // Completed (no pairs emitted)

    With time-based emissions

    obs := ro.Pipe(
    ro.Interval(100*time.Millisecond),
    ro.Pairwise(),
    ro.Take(4),
    )

    sub := obs.Subscribe(ro.PrintObserver[[]int64]())
    time.Sleep(600 * time.Millisecond)
    sub.Unsubscribe()

    // Next: [0 1]
    // Next: [1 2]
    // Next: [2 3]
    // Next: [3 4]
    // Completed

    With custom types

    type Point struct {
    X, Y int
    }

    obs := ro.Pipe(
    ro.Just(
    Point{1, 2},
    Point{3, 4},
    Point{5, 6},
    ),
    ro.Pairwise(),
    )

    sub := obs.Subscribe(ro.PrintObserver[[]Point]())
    defer sub.Unsubscribe()

    // Next: [{1 2} {3 4}]
    // Next: [{3 4} {5 6}]
    // Completed
    Prototype:
    func Pairwise[T any]()