This documentation is still new and evolving. If you spot any mistakes, unclear explanations, or missing details, please open an issue.
Your feedback helps us improve!
Combining operatorsโ
This page lists all combining operations, available in the core package of ro.
Mergeโ
Creates an Observable that emits items from multiple source Observables, interleaved as they are emitted.
Merge multiple sources
obs := ro.Merge(
ro.Just(1, 2, 3),
ro.Just(4, 5, 6),
ro.Just(7, 8, 9),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Order may vary due to interleaving
// Next: 1
// Next: 4
// Next: 7
// Next: 2
// Next: 5
// Next: 8
// Next: 3
// Next: 6
// Next: 9
// CompletedWith different emission rates
obs := ro.Merge(
ro.Pipe(ro.Interval(100*time.Millisecond), ro.Take[int64](3)), // 0,1,2
ro.Pipe(ro.Interval(200*time.Millisecond), ro.Take[int64](2)), // 0,1
)
sub := obs.Subscribe(ro.PrintObserver[int64]())
time.Sleep(800 * time.Millisecond)
sub.Unsubscribe()
// Values interleaved based on emission timing
// 0, 0, 1, 1, 2With hot observables
source1 := ro.Interval(100 * time.Millisecond)
source2 := ro.Interval(150 * time.Millisecond)
obs := ro.Merge(
ro.Pipe(source1, ro.Take[int64](5)),
ro.Pipe(source2, ro.Take[int64](3)),
)
sub := obs.Subscribe(ro.PrintObserver[int64]())
time.Sleep(1000 * time.Millisecond)
sub.Unsubscribe()
// Values interleaved based on actual emission timesWith error handling
obs := ro.Merge(
ro.Just(1, 2, 3),
ro.Pipe(
ro.Just(4, 5, 6),
ro.MapErr(func(i int) (int, error) {
if i == 5 {
return 0, fmt.Errorf("error on 5")
}
return i, nil
}),
),
)
sub := obs.Subscribe(ro.NewObserver(
func(value int) {
fmt.Printf("Next: %d\n", value)
},
func(err error) {
fmt.Printf("Error: %v\n", err)
},
func() {
fmt.Println("Complete")
},
))
time.Sleep(100 * time.Millisecond)
sub.Unsubscribe()
// Error terminates the entire merged observableWith single source
obs := ro.Merge(
ro.Just(1, 2, 3),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Next: 1
// Next: 2
// Next: 3
// CompletedWith no sources
obs := ro.Merge[int]()
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Immediately completes with no valuesPrototype:func Merge[T any](sources ...Observable[T])
CombineLatestโ
Creates an Observable that combines the latest values from multiple source Observables, emitting tuples or arrays of the most recent values from each.
CombineLatest2
obs := ro.CombineLatest2(
ro.Just(1, 2, 3),
ro.Just("A", "B", "C"),
)
sub := obs.Subscribe(ro.PrintObserver[lo.Tuple2[int, string]]())
defer sub.Unsubscribe()
// Next: (1, A)
// Next: (2, A)
// Next: (2, B)
// Next: (3, B)
// Next: (3, C)
// CompletedCombineLatest3
obs := ro.CombineLatest3(
ro.Just(1, 2, 3),
ro.Just("A", "B", "C"),
ro.Just(true, false),
)
sub := obs.Subscribe(ro.PrintObserver[lo.Tuple3[int, string, bool]]())
defer sub.Unsubscribe()
// Next: (1, A, true)
// Next: (2, A, true)
// Next: (2, B, true)
// Next: (3, B, true)
// Next: (3, B, false)
// Next: (3, C, false)
// CompletedCombineLatest4
obs := ro.CombineLatest4(
ro.Just(1, 2),
ro.Just("A", "B"),
ro.Just(true, false),
ro.Just(1.1, 2.2),
)
sub := obs.Subscribe(ro.PrintObserver[lo.Tuple4[int, string, bool, float64]]())
defer sub.Unsubscribe()
// Next: (1, A, true, 1.1)
// Next: (2, A, true, 1.1)
// Next: (2, B, true, 1.1)
// Next: (2, B, false, 1.1)
// Next: (2, B, false, 2.2)
// CompletedCombineLatest5
obs := ro.CombineLatest5(
ro.Just(1, 2),
ro.Just("A", "B"),
ro.Just(true, false),
ro.Just(1.1, 2.2),
ro.Just([]int{10, 20}),
)
sub := obs.Subscribe(ro.PrintObserver[lo.Tuple5[int, string, bool, float64, []int]]())
defer sub.Unsubscribe()
// Next: (1, A, true, 1.1, [10, 20])
// Next: (2, A, true, 1.1, [10, 20])
// Next: (2, B, true, 1.1, [10, 20])
// Next: (2, B, false, 1.1, [10, 20])
// Next: (2, B, false, 2.2, [10, 20])
// CompletedCombineLatestAny
obs := ro.CombineLatestAny(
ro.Just(1, 2, 3),
ro.Just("A", "B", "C"),
ro.Just(true, false),
ro.Just(1.1, 2.2),
)
sub := obs.Subscribe(ro.PrintObserver[[]any]())
defer sub.Unsubscribe()
// Next: [1, A, true, 1.1]
// Next: [2, A, true, 1.1]
// Next: [2, B, true, 1.1]
// Next: (3, B, true, 1.1)
// Next: (3, B, false, 1.1)
// Next: (3, B, false, 2.2)
// CompletedWith different emission rates
obs := ro.CombineLatest2(
ro.Pipe(ro.Interval(100*time.Millisecond), ro.Take[int64](5)), // Fast: 0,1,2,3,4
ro.Pipe(ro.Interval(300*time.Millisecond), ro.Take[int64](2)), // Slow: 0,1
)
sub := obs.Subscribe(ro.PrintObserver[lo.Tuple2[int64, int64]]())
time.Sleep(1000 * time.Millisecond)
sub.Unsubscribe()
// Next: (0, 0) - both emitted
// Next: (1, 0) - first emitted again
// Next: (2, 0) - first emitted again
// Next: (2, 1) - second emitted
// Next: (3, 1) - first emitted again
// Next: (4, 1) - first emitted again
// CompletedEdge case: One observable completes early
obs := ro.CombineLatest2(
ro.Just(1, 2, 3, 4, 5),
ro.Pipe(ro.Just("A", "B"), ro.Take[string](1)), // Only emits "A"
)
sub := obs.Subscribe(ro.PrintObserver[lo.Tuple2[int, string]]())
defer sub.Unsubscribe()
// Next: (1, A)
// Next: (2, A)
// Next: (3, A)
// Next: (4, A)
// Next: (5, A)
// CompletedPrototypes:func CombineLatest2[A any, B any](obsA Observable[A], obsB Observable[B])
func CombineLatest3[A any, B any, C any](obsA Observable[A], obsB Observable[B], obsC Observable[C])
func CombineLatest4[A any, B any, C any, D any](obsA Observable[A], obsB Observable[B], obsC Observable[C], obsD Observable[D])
func CombineLatest5[A any, B any, C any, D any, E any](obsA Observable[A], obsB Observable[B], obsC Observable[C], obsD Observable[D], obsE Observable[E])
func CombineLatestAny(sources ...Observable[any])MergeAllโ
Creates an Observable that merges items from multiple Observable sources provided as a higher-order Observable, emitting items as they are emitted from any source.
obs := ro.Pipe(
ro.Just(
ro.Just(1, 2, 3),
ro.Just(4, 5, 6),
ro.Just(7, 8, 9),
),
ro.MergeAll[int](),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Order may vary due to interleaving
// Next: 1
// Next: 4
// Next: 7
// Next: 2
// Next: 5
// Next: 8
// Next: 3
// Next: 6
// Next: 9
// CompletedWith different emission rates
obs := ro.Pipe(
ro.Just(
ro.Pipe(ro.Interval(100*time.Millisecond), ro.Take[int64](3)), // Fast: 0,1,2
ro.Pipe(ro.Interval(200*time.Millisecond), ro.Take[int64](2)), // Medium: 0,1
ro.Pipe(ro.Interval(300*time.Millisecond), ro.Take[int64](1)), // Slow: 0
),
ro.MergeAll[int64](),
)
sub := obs.Subscribe(ro.PrintObserver[int64]())
time.Sleep(1000 * time.Millisecond)
sub.Unsubscribe()
// Values interleaved based on emission timing
// 0, 0, 0, 1, 1, 2Dynamic observable collection
// Create observables dynamically
observables := []ro.Observable[int]{
ro.Just(1, 2),
ro.Just(3, 4),
ro.Just(5, 6),
}
obs := ro.Pipe(
ro.Just(observables...),
ro.MergeAll[int](),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Merges all values from the collection of observablesEdge case: Empty observable of observables
obs := ro.Pipe(
ro.Empty[ro.Observable[int]](),
ro.MergeAll[int](),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// No items emitted, completes immediately
// CompletedEdge case: Single observable in collection
obs := ro.Pipe(
ro.Just(ro.Just(1, 2, 3)),
ro.MergeAll[int](),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Next: 1
// Next: 2
// Next: 3
// CompletedSimilar:Prototype:func MergeAll[T any]()
MergeWithโ
Creates an Observable that merges emissions from the source Observable with additional source Observables, interleaved as they are emitted.
obs := ro.Pipe(
ro.Just(1, 2, 3),
ro.MergeWith(ro.Just(4, 5, 6)),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Order may vary due to interleaving
// Next: 1
// Next: 4
// Next: 2
// Next: 5
// Next: 3
// Next: 6
// CompletedMergeWith1 (alias for MergeWith)
obs := ro.Pipe(
ro.Just("A", "B", "C"),
ro.MergeWith1(ro.Just("D", "E", "F")),
)
sub := obs.Subscribe(ro.PrintObserver[string]())
defer sub.Unsubscribe()
// MergeWith1 is an alias for MergeWith
// Values interleaved from both sourcesMergeWith2 (three sources)
obs := ro.Pipe(
ro.Just(1, 2, 3),
ro.MergeWith2(
ro.Just(4, 5, 6),
ro.Just(7, 8, 9),
),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Order may vary due to interleaving
// Combines source with two additional observables
// Next values will be interleaved from all three sourcesMergeWith3 (four sources)
obs := ro.Pipe(
ro.Just(1, 2),
ro.MergeWith3(
ro.Just(3, 4),
ro.Just(5, 6),
ro.Just(7, 8),
),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Combines source with three additional observables
// All values interleaved as they're emittedMergeWith4 (five sources)
obs := ro.Pipe(
ro.Just(1, 2),
ro.MergeWith4(
ro.Just(3, 4),
ro.Just(5, 6),
ro.Just(7, 8),
ro.Just(9, 10),
),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Combines source with four additional observablesMergeWith5 (six sources)
obs := ro.Pipe(
ro.Just(1, 2),
ro.MergeWith5(
ro.Just(3, 4),
ro.Just(5, 6),
ro.Just(7, 8),
ro.Just(9, 10),
ro.Just(11, 12),
),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Combines source with five additional observables
// Maximum convenience method for merging six sourcesWith hot observables
obs := ro.Pipe(
ro.Interval(100 * time.Millisecond),
ro.Take[int64](3),
ro.MergeWith2(
ro.Pipe(ro.Interval(150 * time.Millisecond), ro.Take[int64](2)),
ro.Pipe(ro.Interval(200 * time.Millisecond), ro.Take[int64](2)),
),
)
sub := obs.Subscribe(ro.PrintObserver[int64]())
time.Sleep(1000 * time.Millisecond)
sub.Unsubscribe()
// Values interleaved based on actual emission timing from all sourcesWith error propagation
obs := ro.Pipe(
ro.Just(1, 2, 3),
ro.MergeWith(
ro.Pipe(
ro.Just(4, 5, 6),
ro.MapErr(func(i int) (int, error) {
if i == 5 {
return 0, fmt.Errorf("error on 5")
}
return i, nil
}),
),
),
)
sub := obs.Subscribe(ro.NewObserver(
func(value int) {
fmt.Printf("Next: %d\n", value)
},
func(err error) {
fmt.Printf("Error: %v\n", err)
},
func() {
fmt.Println("Completed")
}
))
time.Sleep(100 * time.Millisecond)
defer sub.Unsubscribe()
// Error from any source terminates the entire merged observableWith different data types
obs := ro.Pipe(
ro.Just("hello", "world"),
ro.MergeWith2(
ro.Just(1, 2, 3),
ro.Just(true, false),
),
)
sub := obs.Subscribe(ro.PrintObserver[any]())
defer sub.Unsubscribe()
// All values can be merged as any type
// Values interleaved as they're emitted from each sourceWith async operations
obs := ro.Pipe(
ro.Just("task1", "task2"),
ro.MergeWith(
ro.Pipe(
ro.Just("async1", "async2"),
ro.MapAsync(func(task string) ro.Observable[string] {
return ro.Defer(func() ro.Observable[string] {
time.Sleep(50 * time.Millisecond)
return ro.Just(task + "_done")
})
}, 2),
),
),
)
sub := obs.Subscribe(ro.PrintObserver[string]())
time.Sleep(300 * time.Millisecond)
defer sub.Unsubscribe()
// Values interleaved as async operations completeWith conditional merging
shouldMerge := true
var secondary ro.Observable[int] = ro.Just(4, 5, 6)
if !shouldMerge {
secondary = ro.Empty[int]()
}
obs := ro.Pipe(
ro.Just(1, 2, 3),
ro.MergeWith(secondary),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Conditional merging based on runtime logicSimilar:Prototypes:func MergeWith[T any](obsB ro.Observable[T])
func MergeWith1[T any](obsB ro.Observable[T])
func MergeWith2[T any](obsB ro.Observable[T], obsC ro.Observable[T])
func MergeWith3[T any](obsB ro.Observable[T], obsC ro.Observable[T], obsD ro.Observable[T])
func MergeWith4[T any](obsB ro.Observable[T], obsC ro.Observable[T], obsD ro.Observable[T], obsE ro.Observable[T])
func MergeWith5[T any](obsB ro.Observable[T], obsC ro.Observable[T], obsD ro.Observable[T], obsE ro.Observable[T], obsF ro.Observable[T])Ambโ
Creates an Observable that mirrors the first source Observable to emit a next, error or complete notification. It's an alias for Race.
The Observable cancels subscriptions to all other Observables once one emits. It completes when the winning source Observable completes.
obs1 := ro.Pipe(
ro.Just("fast"),
ro.Delay(100*time.Millisecond),
)
obs2 := ro.Pipe(
ro.Just("slow"),
ro.Delay(200*time.Millisecond),
)
obs3 := ro.Pipe(
ro.Just("slowest"),
ro.Delay(300*time.Millisecond),
)
ambObs := ro.Amb(obs1, obs2, obs3)
sub := ambObs.Subscribe(ro.PrintObserver[string]())
time.Sleep(400 * time.Millisecond)
sub.Unsubscribe()
// Next: fast
// CompletedWith immediate winner
instant := ro.Just("immediate")
delayed := ro.Pipe(
ro.Just("delayed"),
ro.Delay(100*time.Millisecond),
)
ambObs := ro.Amb(delayed, instant)
sub := ambObs.Subscribe(ro.PrintObserver[string]())
time.Sleep(200 * time.Millisecond)
sub.Unsubscribe()
// Next: immediate
// CompletedWith error propagation
obs1 := ro.Pipe(
ro.Just("success"),
ro.Delay(100*time.Millisecond),
)
obs2 := ro.Pipe(
ro.Throw[string](errors.New("failed")),
)
ambObs := ro.Amb(obs1, obs2)
sub := ambObs.Subscribe(ro.PrintObserver[string]())
time.Sleep(200 * time.Millisecond)
sub.Unsubscribe()
// Next: success
// CompletedWith empty sources
ambObs := ro.Amb[string]() // No sources provided
sub := ambObs.Subscribe(ro.PrintObserver[string]())
defer sub.Unsubscribe()
// Completed (empty observable)Variant:Prototypes:func Amb[T any](sources ...Observable[T])
func Race[T any](sources ...Observable[T])CombineLatestWithโ
Creates an Observable that combines the latest values from the source Observable with other provided Observables, emitting tuples of the most recent values.
obs := ro.Pipe(
ro.Just(1, 2, 3),
ro.CombineLatestWith(ro.Just("A", "B", "C")),
)
sub := obs.Subscribe(ro.PrintObserver[lo.Tuple2[int, string]]())
defer sub.Unsubscribe()
// Next: (1, A)
// Next: (2, A)
// Next: (2, B)
// Next: (3, B)
// Next: (3, C)
// CompletedCombineLatestWith2 (three observables)
obs := ro.Pipe(
ro.Just(1, 2, 3),
ro.CombineLatestWith2(
ro.Just("A", "B", "C"),
ro.Just(true, false),
),
)
sub := obs.Subscribe(ro.PrintObserver[lo.Tuple3[int, string, bool]]())
defer sub.Unsubscribe()
// Next: (1, A, true)
// Next: (2, A, true)
// Next: (2, B, true)
// Next: (3, B, true)
// Next: (3, B, false)
// Next: (3, C, false)
// CompletedChain multiple CombineLatestWith
obs := ro.Pipe(
ro.Just(1, 2, 3),
ro.CombineLatestWith(ro.Just("A", "B")),
ro.CombineLatestWith(ro.Just(true, false)),
)
sub := obs.Subscribe(ro.PrintObserver[lo.Tuple3[int, string, bool]]())
defer sub.Unsubscribe()
// Same result as CombineLatestWith2
// Next: (1, A, true)
// Next: (2, A, true)
// Next: (2, B, true)
// Next: (3, B, true)
// Next: (3, B, false)
// CompletedCombineLatestWith3 (four observables)
obs := ro.Pipe(
ro.Just(1, 2),
ro.CombineLatestWith3(
ro.Just("A", "B"),
ro.Just(true, false),
ro.Just(1.1, 2.2),
),
)
sub := obs.Subscribe(ro.PrintObserver[lo.Tuple4[int, string, bool, float64]]())
defer sub.Unsubscribe()
// Combines latest from all four sources
// Next: (1, A, true, 1.1)
// Next: (2, A, true, 1.1)
// Next: (2, B, true, 1.1)
// Next: (2, B, false, 1.1)
// Next: (2, B, false, 2.2)
// CompletedPractical example: Combining data streams
// User IDs stream
userIDs := ro.Just(1, 2, 3)
// User details stream (slower)
userDetails := ro.Pipe(
ro.Just(
User{1, "Alice"},
User{2, "Bob"},
User{3, "Charlie"},
),
ro.Map(func(u User) int { return u.ID }), // Extract IDs
)
// Permissions stream
permissions := ro.Just("admin", "user", "guest")
// Combine user IDs with their details and permissions
obs := ro.Pipe(
userIDs,
ro.CombineLatestWith2(userDetails, permissions),
ro.Map(func(t lo.Tuple3[int, int, string]) string {
userID, detailID, perm := t.Get3()
return fmt.Sprintf("User %d (detail %d): %s", userID, detailID, perm)
}),
)
sub := obs.Subscribe(ro.PrintObserver[string]())
defer sub.Unsubscribe()
// Outputs combinations of latest values from all streamsPrototypes:func CombineLatestWith[A any, B any](obsB Observable[B])
func CombineLatestWith1[A any, B any](obsB Observable[B])
func CombineLatestWith2[A any, B any, C any](obsB Observable[B], obsC Observable[C])
func CombineLatestWith3[A any, B any, C any, D any](obsB Observable[B], obsC Observable[C], obsD Observable[D])
func CombineLatestWith4[A any, B any, C any, D any, E any](obsB Observable[B], obsC Observable[C], obsD Observable[D], obsE Observable[E])Zipโ
Creates an Observable that combines the values from multiple source Observables by emitting tuples or arrays of values in the order they were zipped.
Zip2
obs := ro.Zip2(
ro.Just(1, 2, 3),
ro.Just("A", "B", "C"),
)
sub := obs.Subscribe(ro.PrintObserver[lo.Tuple2[int, string]]())
defer sub.Unsubscribe()
// Next: (1, A)
// Next: (2, B)
// Next: (3, C)
// CompletedZip3
obs := ro.Zip3(
ro.Just(1, 2, 3),
ro.Just("A", "B", "C"),
ro.Just(true, false, true),
)
sub := obs.Subscribe(ro.PrintObserver[lo.Tuple3[int, string, bool]]())
defer sub.Unsubscribe()
// Next: (1, A, true)
// Next: (2, B, false)
// Next: (3, C, true)
// CompletedZip4
obs := ro.Zip4(
ro.Just(1, 2),
ro.Just("A", "B"),
ro.Just(true, false),
ro.Just(1.1, 2.2),
)
sub := obs.Subscribe(ro.PrintObserver[lo.Tuple4[int, string, bool, float64]]())
defer sub.Unsubscribe()
// Next: (1, A, true, 1.1)
// Next: (2, B, false, 2.2)
// CompletedZip5
obs := ro.Zip5(
ro.Just(1, 2),
ro.Just("A", "B"),
ro.Just(true, false),
ro.Just(1.1, 2.2),
ro.Just([]int{10, 20}),
)
sub := obs.Subscribe(ro.PrintObserver[lo.Tuple5[int, string, bool, float64, []int]]())
defer sub.Unsubscribe()
// Next: (1, A, true, 1.1, [10, 20])
// Next: (2, B, false, 2.2, [10, 20])
// CompletedZip6
obs := ro.Zip6(
ro.Just(1, 2),
ro.Just("A", "B"),
ro.Just(true, false),
ro.Just(1.1, 2.2),
ro.Just([]int{10, 20}),
ro.Just("x", "y"),
)
sub := obs.Subscribe(ro.PrintObserver[lo.Tuple6[int, string, bool, float64, []int, string]]())
defer sub.Unsubscribe()
// Next: (1, A, true, 1.1, [10, 20], "x")
// Next: (2, B, false, 2.2, [10, 20], "y")
// CompletedZip with multiple sources
obs := ro.Zip(
ro.Just(1, 2, 3),
ro.Just("A", "B", "C"),
ro.Just(true, false, true),
)
sub := obs.Subscribe(ro.PrintObserver[[]any]())
defer sub.Unsubscribe()
// Next: [1, A, true]
// Next: [2, B, false]
// Next: [3, C, true]
// CompletedEdge case: Different length observables
obs := ro.Zip2(
ro.Just(1, 2, 3, 4, 5), // 5 items
ro.Just("A", "B"), // 2 items
)
sub := obs.Subscribe(ro.PrintObserver[lo.Tuple2[int, string]]())
defer sub.Unsubscribe()
// Next: (1, A)
// Next: (2, B)
// Completed
// Only zips up to the shortest observableEdge case: Single observable
obs := ro.Zip(
ro.Just(1, 2, 3),
)
sub := obs.Subscribe(ro.PrintObserver[[]any]())
defer sub.Unsubscribe()
// Next: [1]
// Next: [2]
// Next: [3]
// CompletedSimilar:Prototypes:func Zip[T any](sources ...Observable[T])
func Zip2[A any, B any](obsA Observable[A], obsB Observable[B])
func Zip3[A any, B any, C any](obsA Observable[A], obsB Observable[B], obsC Observable[C])
func Zip4[A any, B any, C any, D any](obsA Observable[A], obsB Observable[B], obsC Observable[C], obsD Observable[D])
func Zip5[A any, B any, C any, D any, E any](obsA Observable[A], obsB Observable[B], obsC Observable[C], obsD Observable[D], obsE Observable[E])
func Zip6[A any, B any, C any, D any, E any, F any](obsA Observable[A], obsB Observable[B], obsC Observable[C], obsD Observable[D], obsE Observable[E], obsF Observable[F])ZipWithโ
Creates an Observable that combines values from the source Observable with other source Observables using a pipe operator pattern, emitting tuples of zipped values.
ZipWith
obs := ro.Pipe(
ro.Just(1, 2, 3),
ro.ZipWith(ro.Just("A", "B", "C")),
)
sub := obs.Subscribe(ro.PrintObserver[lo.Tuple2[int, string]]())
defer sub.Unsubscribe()
// Next: (1, A)
// Next: (2, B)
// Next: (3, C)
// CompletedZipWith2 (three sources)
obs := ro.Pipe(
ro.Just(1, 2, 3),
ro.ZipWith2(
ro.Just("A", "B", "C"),
ro.Just(true, false, true),
),
)
sub := obs.Subscribe(ro.PrintObserver[lo.Tuple3[int, string, bool]]())
defer sub.Unsubscribe()
// Next: (1, A, true)
// Next: (2, B, false)
// Next: (3, C, true)
// CompletedZipWith3 (four sources)
obs := ro.Pipe(
ro.Just(1, 2),
ro.ZipWith3(
ro.Just("A", "B"),
ro.Just(true, false),
ro.Just(1.1, 2.2),
),
)
sub := obs.Subscribe(ro.PrintObserver[lo.Tuple4[int, string, bool, float64]]())
defer sub.Unsubscribe()
// Next: (1, A, true, 1.1)
// Next: (2, B, false, 2.2)
// CompletedZipWith4 (five sources)
obs := ro.Pipe(
ro.Just(1, 2),
ro.ZipWith4(
ro.Just("A", "B"),
ro.Just(true, false),
ro.Just(1.1, 2.2),
ro.Just([]int{10, 20}),
),
)
sub := obs.Subscribe(ro.PrintObserver[lo.Tuple5[int, string, bool, float64, []int]]())
defer sub.Unsubscribe()
// Next: (1, A, true, 1.1, [10, 20])
// Next: (2, B, false, 2.2, [10, 20])
// CompletedZipWith5 (six sources)
obs := ro.Pipe(
ro.Just(1, 2),
ro.ZipWith5(
ro.Just("A", "B"),
ro.Just(true, false),
ro.Just(1.1, 2.2),
ro.Just([]int{10, 20}),
ro.Just("x", "y"),
),
)
sub := obs.Subscribe(ro.PrintObserver[lo.Tuple6[int, string, bool, float64, []int, string]]())
defer sub.Unsubscribe()
// Next: (1, A, true, 1.1, [10, 20], "x")
// Next: (2, B, false, 2.2, [10, 20], "y")
// CompletedWith different emission rates
obs := ro.Pipe(
ro.Just(1, 2, 3),
ro.ZipWith(
ro.Pipe(ro.Interval(100*time.Millisecond), ro.Take[string](3)), // "A", "B", "C"
),
)
sub := obs.Subscribe(ro.PrintObserver[lo.Tuple2[int, string]]())
time.Sleep(500 * time.Millisecond)
sub.Unsubscribe()
// Values are zipped as they arrive
// Next: (1, A)
// Next: (2, B)
// Next: (3, C)
// CompletedEdge case: Different length observables
obs := ro.Pipe(
ro.Just(1, 2, 3, 4, 5), // 5 items
ro.ZipWith2(
ro.Just("A", "B"), // 2 items
ro.Just(true, false), // 2 items
),
)
sub := obs.Subscribe(ro.PrintObserver[lo.Tuple3[int, string, bool]]())
defer sub.Unsubscribe()
// Next: (1, A, true)
// Next: (2, B, false)
// Completed
// Only zips up to the shortest observableSimilar:Prototypes:func ZipWith[A any, B any](obsB Observable[B])
func ZipWith1[A any, B any](obsB Observable[B])
func ZipWith2[A any, B any, C any](obsB Observable[B], obsC Observable[C])
func ZipWith3[A any, B any, C any, D any](obsB Observable[B], obsC Observable[C], obsD Observable[D])
func ZipWith4[A any, B any, C any, D any, E any](obsB Observable[B], obsC Observable[C], obsD Observable[D], obsE Observable[E])
func ZipWith5[A any, B any, C any, D any, E any, F any](obsB Observable[B], obsC Observable[C], obsD Observable[D], obsE Observable[E], obsF Observable[F])ZipAllโ
Creates an Observable that zips items from multiple Observable sources provided as a higher-order Observable, emitting arrays of zipped values.
ro.Just(
ro.Just(1, 2, 3),
ro.Just("A", "B", "C"),
ro.Just(true, false, true),
),
ro.ZipAll[any](),)
sub := obs.Subscribe(ro.PrintObserver[]any)
defer sub.Unsubscribe()// Next: [1, A, true]
// Next: [2, B, false]
// Next: [3, C, true]
// Completed
### Dynamic observable collection
```go
// Create observables dynamically
observables := []Observable[int]{
ro.Just(1, 2),
ro.Just(3, 4),
ro.Just(5, 6),
}
obs := ro.Pipe(
ro.Just(observables...),
ro.ZipAll[int](),
)
sub := obs.Subscribe(ro.PrintObserver[[]int]())
defer sub.Unsubscribe()
// Next: [1, 3, 5]
// Next: [2, 4, 6]
// CompletedWith different types
obs := ro.Pipe(
ro.Just(
ro.Just(1, 2),
ro.Just("A", "B"),
ro.Just(true, false),
ro.Just(1.1, 2.2),
),
ro.ZipAll[any](),
)
sub := obs.Subscribe(ro.PrintObserver[[]any]())
defer sub.Unsubscribe()
// Next: [1, "A", true, 1.1]
// Next: [2, "B", false, 2.2]
// CompletedEdge case: Empty observable of observables
obs := ro.Pipe(
ro.Empty[Observable[int]](),
ro.ZipAll[int](),
)
sub := obs.Subscribe(ro.PrintObserver[[]int]())
defer sub.Unsubscribe()
// No items emitted, completes immediately
// CompletedEdge case: Single observable in collection
obs := ro.Pipe(
ro.Just(ro.Just(1, 2, 3)),
ro.ZipAll[int](),
)
sub := obs.Subscribe(ro.PrintObserver[[]int]())
defer sub.Unsubscribe()
// Next: [1]
// Next: [2]
// Next: [3]
// CompletedEdge case: Different length observables
obs := ro.Pipe(
ro.Just(
ro.Just(1, 2, 3, 4, 5), // 5 items
ro.Just("A", "B"), // 2 items
ro.Just(true, false), // 2 items
),
ro.ZipAll[any](),
)
sub := obs.Subscribe(ro.PrintObserver[[]any]())
defer sub.Unsubscribe()
// Next: [1, "A", true]
// Next: [2, "B", false]
// Completed
// Only zips up to the shortest observableSimilar:Prototype:func ZipAll[T any]()
CombineLatestAllโ
Creates an Observable that combines the latest values from multiple Observable sources provided as a higher-order Observable, emitting arrays of the most recent values.
obs := ro.Pipe(
ro.Just(
ro.Just(1, 2),
ro.Just("A", "B"),
ro.Just(true, false),
),
ro.CombineLatestAll[any](),
)
sub := obs.Subscribe(ro.PrintObserver[[]any]())
defer sub.Unsubscribe()
// Next: [1, A, true]
// Next: [2, A, true]
// Next: [2, B, true]
// Next: [2, B, false]
// CompletedWith different emission rates
obs := ro.Pipe(
ro.Just(
ro.Pipe(ro.Interval(100*time.Millisecond), ro.Take[int64](3)), // 0,1,2
ro.Pipe(ro.Interval(200*time.Millisecond), ro.Take[int64](2)), // 0,1
ro.Pipe(ro.Interval(300*time.Millisecond), ro.Take[int64](1)), // 0
),
ro.CombineLatestAll[int64](),
)
sub := obs.Subscribe(ro.PrintObserver[[]int64]())
time.Sleep(1000 * time.Millisecond)
sub.Unsubscribe()
// Combines latest from all sources
// Next: [0, 0, 0]
// Next: [1, 0, 0]
// Next: [1, 1, 0]
// Next: [2, 1, 0]
// CompletedDynamic observable collection
// Create observables dynamically
observables := []Observable[int]{
ro.Just(1, 2),
ro.Just(10, 20),
ro.Just(100, 200),
}
obs := ro.Pipe(
ro.Just(observables...),
ro.CombineLatestAll[int](),
)
sub := obs.Subscribe(ro.PrintObserver[[]int]())
defer sub.Unsubscribe()
// Next: [1, 10, 100]
// Next: [2, 10, 100]
// Next: [2, 20, 100]
// Next: [2, 20, 200]
// CompletedCombineLatestAllAny for mixed types
obs := ro.Pipe(
ro.Just(
ro.Just(1, 2), // int
ro.Just("A", "B"), // string
ro.Just(true, false), // bool
),
ro.CombineLatestAllAny(),
)
sub := obs.Subscribe(ro.PrintObserver[[]any]())
defer sub.Unsubscribe()
// Next: [1, A, true]
// Next: [2, A, true]
// Next: [2, B, true]
// Next: [2, B, false]
// CompletedEdge case: Empty observable of observables
obs := ro.Pipe(
ro.Empty[Observable[int]](),
ro.CombineLatestAll[int](),
)
sub := obs.Subscribe(ro.PrintObserver[[]int]())
defer sub.Unsubscribe()
// No items emitted, completes immediately
// CompletedEdge case: Single observable in collection
obs := ro.Pipe(
ro.Just(ro.Just(1, 2, 3)),
ro.CombineLatestAll[int](),
)
sub := obs.Subscribe(ro.PrintObserver[[]int]())
defer sub.Unsubscribe()
// Next: [1]
// Next: [2]
// Next: [3]
// CompletedVariant:Prototypes:func CombineLatestAll[T any]()
func CombineLatestAllAny()MergeMapโ
Transforms each item from the source Observable into an Observable, then merges the resulting Observables, emitting all items as they are emitted from any transformed Observable.
obs := ro.Pipe(
ro.Just(1, 2, 3),
ro.MergeMap(func(n int) Observable[string] {
return ro.Just(fmt.Sprintf("item-%d", n))
}),
)
sub := obs.Subscribe(ro.PrintObserver[string]())
defer sub.Unsubscribe()
// Order may vary due to interleaving
// Next: item-1
// Next: item-2
// Next: item-3
// CompletedWith index parameter
obs := ro.Pipe(
ro.Just("A", "B", "C"),
ro.MergeMapI(func(letter string, index int64) Observable[string] {
return ro.Just(fmt.Sprintf("%s-%d", letter, index))
}),
)
sub := obs.Subscribe(ro.PrintObserver[string]())
defer sub.Unsubscribe()
// Next: A-0
// Next: B-1
// Next: C-2
// CompletedWith context
obs := ro.Pipe(
ro.Just(1, 2, 3),
ro.MergeMapWithContext(func(ctx context.Context, n int) (context.Context, Observable[int]) {
// Can use context for cancellation or values
return ctx, ro.Just(n * 10)
}),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Next: 10
// Next: 20
// Next: 30
// CompletedWith both context and index
obs := ro.Pipe(
ro.Just("hello", "world"),
ro.MergeMapIWithContext(func(ctx context.Context, word string, index int64) (context.Context, Observable[int]) {
return ctx, ro.Just(len(word))
}),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Next: 5
// Next: 5
// CompletedWith async operations
obs := ro.Pipe(
ro.Just(1, 2, 3),
ro.MergeMap(func(n int) Observable[int] {
return ro.Pipe(
ro.Interval(100*time.Millisecond),
ro.Take[int64](2),
ro.Map(func(_ int64) int { return n }),
)
}),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
time.Sleep(1000 * time.Millisecond)
sub.Unsubscribe()
// Values interleaved from all inner observables
// Example output: 1, 1, 2, 2, 3, 3With error handling
obs := ro.Pipe(
ro.Just(1, 2, 3),
ro.MergeMap(func(n int) Observable[int] {
if n == 2 {
return ro.Error[int](errors.New("error for 2"))
}
return ro.Just(n * 10)
}),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Next: 10
// Next: 20
// Next: 30
// CompletedEdge case: Empty source
obs := ro.Pipe(
ro.Empty[int](),
ro.MergeMap(func(n int) Observable[string] {
return ro.Just(fmt.Sprintf("item-%d", n))
}),
)
sub := obs.Subscribe(ro.PrintObserver[string]())
defer sub.Unsubscribe()
// No items emitted, completes immediately
// CompletedPrototypes:func MergeMap[T any, R any](project func(value T) Observable[R])
func MergeMapI[T any, R any](project func(value T, index int64) Observable[R])
func MergeMapWithContext[T any, R any](project func(ctx context.Context, value T) (context.Context, Observable[R]))
func MergeMapIWithContext[T any, R any](project func(ctx context.Context, value T, index int64) (context.Context, Observable[R]))Raceโ
Creates an Observable that mirrors the first source Observable to emit an item or send a notification. Amb is an alias for Race.
fast := ro.Pipe(
ro.Timer(100*time.Millisecond),
ro.Map(func(_ int64) string { return "fast" }),
)
slow := ro.Pipe(
ro.Timer(200*time.Millisecond),
ro.Map(func(_ int64) string { return "slow" }),
)
obs := ro.Race(fast, slow)
sub := obs.Subscribe(ro.PrintObserver[string]())
time.Sleep(300 * time.Millisecond)
sub.Unsubscribe()
// Next: "fast" (from the faster observable)
// CompletedWith multiple sources
sources := []Observable[int]{
ro.Pipe(ro.Timer(300*time.Millisecond), ro.Map(func(_ int64) int { return 1 })),
ro.Pipe(ro.Timer(100*time.Millisecond), ro.Map(func(_ int64) int { return 2 })),
ro.Pipe(ro.Timer(200*time.Millisecond), ro.Map(func(_ int64) int { return 3 })),
}
obs := ro.Race(sources...)
sub := obs.Subscribe(ro.PrintObserver[int]())
time.Sleep(400 * time.Millisecond)
sub.Unsubscribe()
// Next: 2 (from the 100ms timer)
// CompletedWith error handling
success := ro.Pipe(ro.Timer(200*time.Millisecond), ro.Map(func(_ int64) int { return 42 }))
failure := ro.Pipe(ro.Timer(100*time.Millisecond), ro.Throw[int](errors.New("failed")))
obs := Race(success, failure)
sub := obs.Subscribe(ro.PrintObserver[int]())
time.Sleep(300 * time.Millisecond)
sub.Unsubscribe()
// Error: failedTimeout pattern
data := ro.Future[int](func(resolve func(int), reject func(error)) {
go func() {
time.Sleep(2 * time.Second)
resolve(42)
}()
})
timeout := ro.Timer(1 * time.Second)
result := Race(data, timeout)
sub := result.Subscribe(ro.PrintObserver[any]())
time.Sleep(2500 * time.Millisecond)
sub.Unsubscribe()
// Next: 0 (from timeout after 1 second)
// CompletedFallback pattern
primary := ro.Future[string](func(resolve func(string), reject func(error)) {
go func() {
time.Sleep(500 * time.Millisecond)
resolve("primary result")
}()
})
fallback := ro.Just("fallback value")
result := Race(primary, fallback)
sub := result.Subscribe(ro.PrintObserver[string]())
time.Sleep(1000 * time.Millisecond)
sub.Unsubscribe()
// Next: "fallback value"
// CompletedVariant:Prototypes:func Race[T any](sources ...Observable[T])
func Amb[T any](sources ...Observable[T])Concatโ
Creates an Observable that concatenates multiple source Observables, emitting all items from each source sequentially.
obs := ro.Concat(
ro.Just(1, 2, 3),
ro.Just(4, 5, 6),
ro.Just(7, 8, 9),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Next: 1, 2, 3 (from first observable)
// Next: 4, 5, 6 (from second observable)
// Next: 7, 8, 9 (from third observable)
// CompletedWith time-based sources
obs := ro.Concat(
ro.Pipe(ro.Just(1), ro.Delay(100*time.Millisecond)),
ro.Pipe(ro.Just(2), ro.Delay(100*time.Millisecond)),
ro.Pipe(ro.Just(3), ro.Delay(100*time.Millisecond)),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
time.Sleep(500 * time.Millisecond)
sub.Unsubscribe()
// Next: 1 (after 100ms)
// Next: 2 (after 200ms total)
// Next: 3 (after 300ms total)
// CompletedWith empty observables
obs := ro.Concat(
ro.Just(1, 2),
ro.Empty[int](),
ro.Just(3, 4),
ro.Empty[int](),
ro.Just(5),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Next: 1
// Next: 2
// (empty observable emits nothing)
// Next: 3
// Next: 4
// (empty observable emits nothing)
// Next: 5
// CompletedError propagation
obs := ro.Concat(
ro.Just(1, 2),
ro.Pipe(ro.Just(3), ro.Throw[int](errors.New("error"))),
ro.Just(4, 5),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Next: 1, 2
// Error: error (subsequent observables are not processed)Similar:Prototype:func Concat[T any](sources ...Observable[T]) Observable[T]
StartWithโ
Emits the given values before emitting the values from the source Observable.
obs := ro.Pipe(
ro.Just("a", "b", "c"),
ro.StartWith("x", "y"),
)
sub := obs.Subscribe(ro.PrintObserver[string]())
defer sub.Unsubscribe()
// Next: x
// Next: y
// Next: a
// Next: b
// Next: c
// CompletedWith single prefix value
obs := ro.Pipe(
ro.Just(2, 3, 4),
ro.StartWith(1),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Next: 1
// Next: 2
// Next: 3
// Next: 4
// CompletedWith multiple prefix values
obs := ro.Pipe(
ro.Just("c"),
ro.StartWith("a", "b"),
)
sub := obs.Subscribe(ro.PrintObserver[string]())
defer sub.Unsubscribe()
// Next: a
// Next: b
// Next: c
// CompletedWith empty source observable
obs := ro.Pipe(
ro.Empty[string](),
ro.StartWith("prefix1", "prefix2"),
)
sub := obs.Subscribe(ro.PrintObserver[string]())
defer sub.Unsubscribe()
// Next: prefix1
// Next: prefix2
// CompletedWith no prefix values
obs := ro.Pipe(
ro.Just("a", "b"),
ro.StartWith[string](),
)
sub := obs.Subscribe(ro.PrintObserver[string]())
defer sub.Unsubscribe()
// Next: a
// Next: b
// CompletedWith time-based observable
obs := ro.Pipe(
ro.Interval(100*time.Millisecond),
ro.StartWith(int64(-1)),
ro.Take(3),
)
sub := obs.Subscribe(ro.PrintObserver[int64]())
time.Sleep(400 * time.Millisecond)
sub.Unsubscribe()
// Next: -1
// Next: 0
// Next: 1
// Next: 2
// CompletedSimilar:Prototype:func StartWith[T any](prefixes ...T)
Pairwiseโ
Emits the previous and current values as a pair (array of two values). The first value doesn't emit until the second value arrives.
obs := ro.Pipe(
ro.Just("a", "b", "c", "d"),
ro.Pairwise(),
)
sub := obs.Subscribe(ro.PrintObserver[[]string]())
defer sub.Unsubscribe()
// Next: [a b]
// Next: [b c]
// Next: [c d]
// CompletedWith numbers
obs := ro.Pipe(
ro.Just(1, 2, 3, 4, 5),
ro.Pairwise(),
)
sub := obs.Subscribe(ro.PrintObserver[[]int]())
defer sub.Unsubscribe()
// Next: [1 2]
// Next: [2 3]
// Next: [3 4]
// Next: [4 5]
// CompletedWith single value
obs := ro.Pipe(
ro.Just("only one"),
ro.Pairwise(),
)
sub := obs.Subscribe(ro.PrintObserver[[]string]())
defer sub.Unsubscribe()
// Completed (no pairs emitted)With empty observable
obs := ro.Pipe(
ro.Empty[string](),
ro.Pairwise(),
)
sub := obs.Subscribe(ro.PrintObserver[[]string]())
defer sub.Unsubscribe()
// Completed (no pairs emitted)With time-based emissions
obs := ro.Pipe(
ro.Interval(100*time.Millisecond),
ro.Pairwise(),
ro.Take(4),
)
sub := obs.Subscribe(ro.PrintObserver[[]int64]())
time.Sleep(600 * time.Millisecond)
sub.Unsubscribe()
// Next: [0 1]
// Next: [1 2]
// Next: [2 3]
// Next: [3 4]
// CompletedWith custom types
type Point struct {
X, Y int
}
obs := ro.Pipe(
ro.Just(
Point{1, 2},
Point{3, 4},
Point{5, 6},
),
ro.Pairwise(),
)
sub := obs.Subscribe(ro.PrintObserver[[]Point]())
defer sub.Unsubscribe()
// Next: [{1 2} {3 4}]
// Next: [{3 4} {5 6}]
// CompletedSimilar:Prototype:func Pairwise[T any]()