This documentation is still new and evolving. If you spot any mistakes, unclear explanations, or missing details, please open an issue.
Your feedback helps us improve!
Utility operators
This page lists all utility operators available in the core package of ro.
Do
Performs side effects for each emission from an Observable, without modifying the emitted values.
var nextCount, errorCount, completeCount int
obs := ro.Pipe(
ro.Just(1, 2, 3),
ro.Do(
func(value int) {
nextCount++
fmt.Printf("Next: %d\n", value)
},
func(err error) {
errorCount++
fmt.Printf("Error: %v\n", err)
},
func() {
completeCount++
fmt.Println("Complete")
},
),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Next: 1
// Next: 2
// Next: 3
// Completed
fmt.Printf("Counts: next=%d, error=%d, complete=%d\n", nextCount, errorCount, completeCount)
// Counts: next=3, error=0, complete=1
With context
obs := ro.Pipe(
ro.Just(1, 2, 3),
ro.DoWithContext(
func(ctx context.Context, value int) {
fmt.Printf("Next with context: %d\n", value)
},
func(ctx context.Context, err error) {
fmt.Printf("Error with context: %v\n", err)
},
func(ctx context.Context) {
fmt.Println("Complete with context")
},
),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
DoOnNext
var nextValues []int
obs := ro.Pipe(
ro.Just(1, 2, 3),
ro.DoOnNext(func(value int) {
nextValues = append(nextValues, value)
fmt.Printf("Received: %d\n", value)
}),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Received: 1
// Received: 2
// Received: 3
fmt.Printf("Collected values: %v\n", nextValues)
// Collected values: [1 2 3]
DoOnError
var lastError error
obs := ro.Pipe(
ro.Throw[int](fmt.Errorf("test error")),
ro.DoOnError(func(err error) {
lastError = err
fmt.Printf("Error captured: %v\n", err)
}),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Error captured: test error
// Error: test error
DoOnComplete
var completed bool
obs := ro.Pipe(
ro.Just(1, 2, 3),
ro.DoOnComplete(func() {
completed = true
fmt.Println("Stream completed")
}),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Stream completed
DoOnSubscribe
var subscribed bool
obs := ro.Pipe(
ro.Just(1, 2, 3),
ro.DoOnSubscribe(func() {
subscribed = true
fmt.Println("Subscribed to observable")
}),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Subscribed to observable
DoOnFinalize
var finalized bool
obs := ro.Pipe(
ro.Just(1, 2, 3),
ro.DoOnFinalize(func() {
finalized = true
fmt.Println("Observable finalized")
}),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Observable finalized
Similar:
Prototypes:
func Do[T any](onNext func(value T), onError func(err error), onComplete func())
func DoWithContext[T any](onNext func(ctx context.Context, value T), onError func(ctx context.Context, err error), onComplete func(ctx context.Context))
func DoOnNext[T any](onNext func(value T))
func DoOnNextWithContext[T any](onNext func(ctx context.Context, value T))
func DoOnError[T any](onError func(err error))
func DoOnErrorWithContext[T any](onError func(ctx context.Context, err error))
func DoOnComplete[T any](onComplete func())
func DoOnCompleteWithContext[T any](onComplete func(ctx context.Context))
func DoOnSubscribe[T any](onSubscribe func())
func DoOnSubscribeWithContext[T any](onSubscribe func(ctx context.Context))
func DoOnFinalize[T any](onFinalize func())
Tap
Allows you to perform side effects for notifications from the source Observable without modifying the emitted items. It mirrors the source Observable and forwards its emissions to the provided observer.
var nextCount, errorCount, completeCount int
obs := ro.Pipe(
ro.Just(1, 2, 3),
ro.Tap(
func(value int) {
nextCount++
fmt.Printf("Next: %d\n", value)
},
func(err error) {
errorCount++
fmt.Printf("Error: %v\n", err)
},
func() {
completeCount++
fmt.Println("Complete")
},
),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Next: 1
// Next: 2
// Next: 3
// Completed
fmt.Printf("Counts: next=%d, error=%d, complete=%d\n", nextCount, errorCount, completeCount)
// Counts: next=3, error=0, complete=1
With context
obs := ro.Pipe(
ro.Just(1, 2, 3),
ro.TapWithContext(
func(ctx context.Context, value int) {
fmt.Printf("Next with context: %d\n", value)
},
func(ctx context.Context, err error) {
fmt.Printf("Error with context: %v\n", err)
},
func(ctx context.Context) {
fmt.Println("Complete with context")
},
),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
TapOnNext
var nextValues []int
obs := ro.Pipe(
ro.Just(1, 2, 3),
ro.TapOnNext(func(value int) {
nextValues = append(nextValues, value)
fmt.Printf("Received: %d\n", value)
}),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Received: 1
// Received: 2
// Received: 3
fmt.Printf("Collected values: %v\n", nextValues)
// Collected values: [1 2 3]
TapOnError
var lastError error
obs := ro.Pipe(
ro.Throw[int](fmt.Errorf("test error")),
ro.TapOnError(func(err error) {
lastError = err
fmt.Printf("Error captured: %v\n", err)
}),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Error captured: test error
// Error: test error
TapOnComplete
var completed bool
obs := ro.Pipe(
ro.Just(1, 2, 3),
ro.TapOnComplete(func() {
completed = true
fmt.Println("Stream completed")
}),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Stream completed
With context error handling
obs := ro.Pipe(
ro.Just(1, 2, 3),
ro.TapOnErrorWithContext(func(ctx context.Context, err error) {
fmt.Printf("Error with context: %v\n", err)
}),
ro.TapOnCompleteWithContext(func(ctx context.Context) {
fmt.Println("Completed with context")
}),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
For debugging
obs := ro.Pipe(
ro.Interval(100*time.Millisecond),
ro.Take[int64](3),
ro.TapOnNext(func(value int64) {
fmt.Printf("[DEBUG] Received: %d\n", value)
}),
)
sub := obs.Subscribe(ro.PrintObserver[int64]())
time.Sleep(500 * time.Millisecond)
sub.Unsubscribe()
// [DEBUG] Received: 0
// [DEBUG] Received: 1
// [DEBUG] Received: 2
With error scenarios
obs := ro.Pipe(
ro.Pipe(
ro.Just(1, 2, 3),
ro.MapErr(func(i int) (int, error) {
if i == 3 {
return 0, fmt.Errorf("error on 3")
}
return i, nil
}),
),
ro.Tap(
func(value int) {
fmt.Printf("Next tap: %d\n", value)
},
func(err error) {
fmt.Printf("Error tap: %v\n", err)
},
func() {
fmt.Println("Complete tap")
},
),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Next tap: 1
// Next tap: 2
// Error tap: error on 3
With hot observables
source := ro.Pipe(
ro.Interval(100*time.Millisecond),
ro.TapOnNext(func(value int64) {
fmt.Printf("Source value: %d\n", value)
}),
)
// Multiple subscribers get the same tap side effects
sub1 := source.Subscribe(ro.PrintObserver[int64]())
sub2 := source.Subscribe(ro.PrintObserver[int64]())
time.Sleep(350 * time.Millisecond)
sub1.Unsubscribe()
sub2.Unsubscribe()
// Each subscriber triggers the tap side effects
With cleanup operations
cleanup := func() {
fmt.Println("Cleaning up resources...")
}
obs := ro.Pipe(
ro.Just("data1", "data2"),
ro.TapOnComplete(cleanup),
)
sub := obs.Subscribe(ro.PrintObserver[string]())
defer sub.Unsubscribe()
// Cleaning up resources...
Similar:
Prototypes:
func Tap[T any](onNext func(value T), onError func(err error), onComplete func())
func TapWithContext[T any](onNext func(ctx context.Context, value T), onError func(ctx context.Context, err error), onComplete func(ctx context.Context))
func TapOnNext[T any](onNext func(value T))
func TapOnNextWithContext[T any](onNext func(ctx context.Context, value T))
func TapOnError[T any](onError func(err error))
func TapOnErrorWithContext[T any](onError func(ctx context.Context, err error))
func TapOnComplete[T any](onComplete func())
func TapOnCompleteWithContext[T any](onComplete func(ctx context.Context))
Timeout
Raises an error if the source Observable does not emit any item within the specified duration. The timeout resets after each emission.
obs := ro.Pipe(
ro.Interval(200*time.Millisecond),
ro.Timeout(100*time.Millisecond),
)
sub := obs.Subscribe(ro.PrintObserver[int64]())
time.Sleep(300 * time.Millisecond)
sub.Unsubscribe()
// Error: timeout after 100ms
With fast emissions (no timeout)
obs := ro.Pipe(
ro.Interval(50*time.Millisecond),
ro.Timeout(200*time.Millisecond),
ro.Take(3),
)
sub := obs.Subscribe(ro.PrintObserver[int64]())
time.Sleep(300 * time.Millisecond)
sub.Unsubscribe()
// Next: 0
// Next: 1
// Next: 2
// Completed
With slow emissions (timeout occurs)
obs := ro.Pipe(
ro.Interval(500*time.Millisecond),
ro.Timeout(200*time.Millisecond),
)
sub := obs.Subscribe(ro.PrintObserver[int64]())
time.Sleep(800 * time.Millisecond)
sub.Unsubscribe()
// Error: timeout after 200ms
With delayed first emission
obs := ro.Pipe(
ro.Pipe(
ro.Just("delayed"),
ro.Delay(300*time.Millisecond),
),
ro.Timeout(100*time.Millisecond),
)
sub := obs.Subscribe(ro.PrintObserver[string]())
time.Sleep(500 * time.Millisecond)
sub.Unsubscribe()
// Error: timeout after 100ms (before first emission)
With error in source (propagates immediately)
obs := ro.Pipe(
ro.Pipe(
ro.Just("will error"),
ro.Throw[string](errors.New("source error")),
),
ro.Timeout(1*time.Second),
)
sub := obs.Subscribe(ro.PrintObserver[string]())
defer sub.Unsubscribe()
// Error: source error (propagates before timeout)
With multiple emissions and varying intervals
obs := ro.Pipe(
ro.Pipe(
ro.Just("fast"),
ro.Delay(50*time.Millisecond),
),
ro.Pipe(
ro.Just("slow"),
ro.Delay(300*time.Millisecond),
),
ro.Timeout(200*time.Millisecond),
)
sub := obs.Subscribe(ro.PrintObserver[string]())
time.Sleep(400 * time.Millisecond)
sub.Unsubscribe()
// Next: fast (emitted within timeout)
// Error: timeout after 200ms (waiting for slow)
Similar:
Prototype:
func Timeout[T any](duration time.Duration)
SubscribeOn
Schedule the upstream flow to a different goroutine. This detaches the subscription from the current goroutine and processes emissions in a separate goroutine.
obs := ro.Pipe(
ro.Just("main", "thread"),
ro.SubscribeOn(10), // Process in background goroutine with buffer size 10
)
sub := obs.Subscribe(ro.PrintObserver[string]())
defer sub.Unsubscribe()
// Next: main (processed in background goroutine)
// Next: thread (processed in background goroutine)
// Completed
With heavy computations
obs := ro.Pipe(
ro.Just(1, 2, 3, 4, 5),
ro.Map(func(i int) int {
// Simulate heavy computation
time.Sleep(100 * time.Millisecond)
return i * i
}),
ro.SubscribeOn(100), // Large buffer for heavy computations
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Next: 1, 4, 9, 16, 25 (computed in background goroutine)
// Completed
With backpressure control
obs := ro.Pipe(
ro.Interval(10*time.Millisecond), // Fast producer
ro.SubscribeOn(5), // Small buffer causes backpressure
)
sub := obs.Subscribe(ro.NewObserver[int64](
func(value int64) {
time.Sleep(50 * time.Millisecond) // Slow consumer
fmt.Printf("Next: %d\n", value)
},
func(err error) {
fmt.Printf("Error: %v\n", err)
},
func() {
fmt.Println("Completed")
},
))
time.Sleep(200 * time.Millisecond)
sub.Unsubscribe()
// Next: values processed in background goroutine with backpressure
// Completed
With error handling
obs := ro.Pipe(
ro.Throw[string](errors.New("background error")),
ro.SubscribeOn(10),
)
sub := obs.Subscribe(ro.PrintObserver[string]())
defer sub.Unsubscribe()
// Error: background error
With multiple operators
obs := ro.Pipe(
ro.Just("a", "b", "c"),
ro.Map(func(s string) string { return strings.ToUpper(s) }),
ro.Filter(func(s string) bool { return s != "B" }),
ro.SubscribeOn(20), // All upstream operations in background
)
sub := obs.Subscribe(ro.PrintObserver[string]())
defer sub.Unsubscribe()
// Next: A
// Next: C
// Completed
Similar:
Prototype:
func SubscribeOn[T any](bufferSize int)
ObserveOn
Schedule the downstream flow to a different goroutine. Converts a push-based Observable into a pullable stream with backpressure capabilities.
obs := ro.Pipe(
ro.Just("fast", "emissions"),
ro.ObserveOn(2), // Small buffer size
)
sub := obs.Subscribe(ro.NewObserver[string](
func(value string) {
time.Sleep(200 * time.Millisecond) // Slow consumer
fmt.Printf("Next: %s\n", value)
},
func(err error) {
fmt.Printf("Error: %v\n", err)
},
func() {
fmt.Println("Completed")
},
))
defer sub.Unsubscribe()
// Next: fast (after 200ms delay)
// Next: emissions (after 200ms delay)
// Completed
With backpressure control
obs := ro.Pipe(
ro.Interval(10*time.Millisecond), // Fast producer
ro.ObserveOn(5), // Small buffer for backpressure
)
sub := obs.Subscribe(ro.NewObserver[int64](
func(value int64) {
time.Sleep(100 * time.Millisecond) // Slow consumer
fmt.Printf("Next: %d\n", value)
},
))
time.Sleep(500 * time.Millisecond)
sub.Unsubscribe()
// Next: values processed with backpressure control
// (Fast producer will be blocked when buffer is full)
With large buffer
obs := ro.Pipe(
ro.Just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10),
ro.ObserveOn(100), // Large buffer
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Next: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 (processed in background goroutine)
// Completed
With error propagation
obs := ro.Pipe(
ro.Pipe(
ro.Just("will error"),
ro.Throw[string](errors.New("propagated error")),
),
ro.ObserveOn(10),
)
sub := obs.Subscribe(ro.PrintObserver[string]())
defer sub.Unsubscribe()
// Error: propagated error
Combined with SubscribeOn
obs := ro.Pipe(
ro.Just("background", "processing"),
ro.SubscribeOn(10), // Upstream in background
ro.Map(strings.ToUpper),
ro.ObserveOn(5), // Downstream in different background thread
)
sub := obs.Subscribe(ro.PrintObserver[string]())
defer sub.Unsubscribe()
// Next: BACKGROUND
// Next: PROCESSING
// Completed
Similar:
Prototype:
func ObserveOn[T any](bufferSize int)
TimeInterval
Records the interval of time between emissions from the source Observable and emits this information as ro.IntervalValue objects.
obs := ro.Pipe(
ro.Just("A", "B", "C"),
ro.TimeInterval[string](),
)
sub := obs.Subscribe(ro.PrintObserver[ro.IntervalValue[string]]())
defer sub.Unsubscribe()
// Shows interval between emissions:
// Next: {Value:A Interval:0ms}
// Next: {Value:B Interval:<time between A and B>}
// Next: {Value:C Interval:<time between B and C>}
// Completed
With hot observable
obs := ro.Pipe(
ro.Interval(100 * time.Millisecond),
ro.Take[int64](5),
ro.TimeInterval[int64](),
)
sub := obs.Subscribe(ro.OnNext(func(value ro.IntervalValue[int64]) {
fmt.Printf("Value: %d, Interval: %v\n", value.Value, value.Interval)
}))
time.Sleep(700 * time.Millisecond)
sub.Unsubscribe()
//
// Value: 0, Interval: 0s
// Value: 1, Interval: ~100ms
// Value: 2, Interval: ~100ms
// Value: 3, Interval: ~100ms
// Value: 4, Interval: ~100ms
With async operations
obs := ro.Pipe(
ro.Pipe(
ro.Just("task1", "task2", "task3"),
ro.MapAsync(func(task string) Observable[string] {
return ro.Defer(func() Observable[string] {
delay := time.Duration(rand.Intn(200)) * time.Millisecond
time.Sleep(delay)
return ro.Just(task)
})
}, 2),
),
ro.TimeInterval[string](),
)
sub := obs.Subscribe(ro.OnNext(func(value ro.IntervalValue[string]) {
fmt.Printf("Task: %s completed after %v\n", value.Value, value.Interval)
}))
time.Sleep(500 * time.Millisecond)
defer sub.Unsubscribe()
// Shows variable intervals due to async processing
With error handling
obs := ro.Pipe(
ro.Pipe(
ro.Just(1, 2, 3),
ro.MapErr(func(i int) (int, error) {
if i == 3 {
return 0, fmt.Errorf("error on 3")
}
time.Sleep(50 * time.Millisecond)
return i, nil
}),
),
ro.TimeInterval[int](),
)
sub := obs.Subscribe(ro.NewObserver(
func(value ro.IntervalValue[int]) {
fmt.Printf("Value: %d, Interval: %v\n", value.Value, value.Interval)
},
func(err error) {
fmt.Printf("Error: %v\n", err)
},
func() {
fmt.Println("Complete")
},
))
defer sub.Unsubscribe()
//
// Value: 1, Interval: 0s
// Value: 2, Interval: ~50ms
// Error: error on 3
With performance monitoring
// Monitor processing time for expensive operations
obs := ro.Pipe(
ro.Just("data1", "data2", "data3"),
ro.Map(func(data string) string {
// Simulate expensive processing
time.Sleep(100 * time.Millisecond)
return "processed_" + data
}),
ro.TimeInterval[string](),
)
sub := obs.Subscribe(ro.OnNext(func(value ro.IntervalValue[string]) {
fmt.Printf("Processed: %s in %v\n", value.Value, value.Interval)
}))
defer sub.Unsubscribe()
//
// Processed: processed_data1 in ~100ms
// Processed: processed_data2 in ~100ms
// Processed: processed_data3 in ~100ms
With real-time data stream
// Monitor intervals in real-time data stream
source := ro.Interval(200 * time.Millisecond)
obs := ro.Pipe(
source,
ro.Take[int64](10),
ro.Map(func(ts int64) float64 {
// Simulate sensor reading
return 20.0 + rand.Float64()*10
}),
ro.TimeInterval[float64](),
)
sub := obs.Subscribe(ro.OnNext(func(value ro.IntervalValue[float64]) {
fmt.Printf("[%v] Reading: %.2f (interval: %v)\n",
time.Now().Format("15:04:05.000"),
value.Value,
value.Interval.Round(time.Millisecond))
}))
time.Sleep(2500 * time.Millisecond)
sub.Unsubscribe()
With batch processing analysis
// Analyze batch processing times
obs := ro.Pipe(
ro.Range(1, 6),
ro.BufferWithCount[int](2),
ro.TimeInterval[[]int](),
)
sub := obs.Subscribe(ro.OnNext(func(value ro.IntervalValue[[]int]) {
fmt.Printf("Batch %v processed in %v\n", value.Value, value.Interval)
}))
defer sub.Unsubscribe()
//
// Batch [1 2] processed in ~0s
// Batch [3 4] processed in ~<interval>
// Batch [5] processed in ~<interval>
With conditional intervals
// Measure intervals only for certain values
obs := ro.Pipe(
ro.Range(1, 10),
ro.Filter(func(i int) bool {
return i%3 == 0 // Only multiples of 3
}),
ro.TimeInterval[int](),
)
sub := obs.Subscribe(ro.OnNext(func(value ro.IntervalValue[int]) {
fmt.Printf("Filtered value: %d (interval: %v)\n", value.Value, value.Interval)
}))
defer sub.Unsubscribe()
//
// Filtered value: 3 (interval: <time from start to 3>)
// Filtered value: 6 (interval: <time between 3 and 6>)
// Filtered value: 9 (interval: <time between 6 and 9>)
Similar:
Prototype:
func TimeInterval[T any]()
Timestamp
Attaches a timestamp to each emission from the source Observable, indicating when it was emitted.
obs := ro.Pipe(
ro.Just("A", "B", "C"),
ro.Timestamp[string](),
)
sub := obs.Subscribe(ro.PrintObserver[TimestampValue[string]]())
defer sub.Unsubscribe()
// Next: {Value:A Timestamp:<current time>}
// Next: {Value:B Timestamp:<current time>}
// Next: {Value:C Timestamp:<current time>}
// Completed
With hot observable
obs := ro.Pipe(
ro.Interval(100 * time.Millisecond),
ro.Take[int64](3),
ro.Timestamp[int64](),
)
sub := obs.Subscribe(ro.OnNext(
func(value TimestampValue[int64]) {
fmt.Printf("Value: %d at %v\n", value.Value, value.Timestamp.Format("15:04:05.000"))
},
))
time.Sleep(500 * time.Millisecond)
sub.Unsubscribe()
// Value: 0 at 12:34:56.789
// Value: 1 at 12:34:56.889
// Value: 2 at 12:34:56.989
With async operations
obs := ro.Pipe(
ro.Pipe(
ro.Just("task1", "task2", "task3"),
ro.MapAsync(func(task string) Observable[string] {
return ro.Defer(func() Observable[string] {
time.Sleep(50 * time.Millisecond)
return ro.Just(task)
})
}, 2),
),
ro.Timestamp[string](),
)
sub := obs.Subscribe(ro.OnNext(func(value TimestampValue[string]) {
fmt.Printf("Task %s completed at %v\n", value.Value, value.Timestamp.Format("15:04:05.000"))
}))
time.Sleep(300 * time.Millisecond)
defer sub.Unsubscribe()
With data logging
type LogEntry struct {
Message string
Level string
Timestamp time.Time
}
obs := ro.Pipe(
ro.Just(
LogEntry{Message: "Server started", Level: "INFO"},
LogEntry{Message: "User connected", Level: "INFO"},
LogEntry{Message: "Database error", Level: "ERROR"},
),
ro.Timestamp[LogEntry](),
)
sub := obs.Subscribe(ro.OnNext(func(value TimestampValue[LogEntry]) {
entry := value.Value
fmt.Printf("[%s] %s: %s\n",
value.Timestamp.Format("2006-01-02 15:04:05"),
entry.Level,
entry.Message)
}))
defer sub.Unsubscribe()
// [2024-01-01 12:34:56] INFO: Server started
// [2024-01-01 12:34:56] INFO: User connected
// [2024-01-01 12:34:56] ERROR: Database error
With real-time sensor data
type SensorReading struct {
ID string
Value float64
Timestamp time.Time
}
obs := ro.Pipe(
ro.Interval(1 * time.Second),
ro.Take[int64](5),
ro.Map(func(ts int64) SensorReading {
return SensorReading{
ID: "temp-01",
Value: 20.0 + rand.Float64()*10,
Timestamp: time.Now(),
}
}),
ro.Timestamp[SensorReading](),
)
sub := obs.Subscribe(ro.OnNext(
func(value TimestampValue[SensorReading]) {
reading := value.Value
fmt.Printf("[%s] %s: %.2f°C (system time: %v)\n",
reading.Timestamp.Format("15:04:05"),
reading.ID,
reading.Value,
value.Timestamp.Format("15:04:05.000"))
},
))
time.Sleep(6 * time.Second)
sub.Unsubscribe()
With event ordering
// Track event order and timing
type Event struct {
ID string
Action string
Payload interface{}
}
obs := ro.Pipe(
ro.Just(
Event{ID: "1", Action: "click", Payload: "button"},
Event{ID: "2", Action: "scroll", Payload: 100},
Event{ID: "3", Action: "input", Payload: "text"},
),
ro.Timestamp[Event](),
)
sub := obs.Subscribe(ro.OnNext(func(value TimestampValue[Event]) {
event := value.Value
fmt.Printf("%s | Event %s: %s (%v)\n",
value.Timestamp.Format("15:04:05.000"),
event.ID,
event.Action,
event.Payload)
}))
defer sub.Unsubscribe()
// Shows precise timing of each event
With batch processing
// Timestamp batch completion times
obs := ro.Pipe(
ro.Range(1, 10),
ro.BufferWithCount[int](3),
ro.Timestamp[[]int](),
)
sub := obs.Subscribe(ro.OnNext(func(value TimestampValue[[]int]) {
fmt.Printf("Batch %v completed at %v\n",
value.Value,
value.Timestamp.Format("15:04:05.000"))
}))
defer sub.Unsubscribe()
// Batch [1 2 3] completed at 12:34:56.789
// Batch [4 5 6] completed at 12:34:56.789
// Batch [7 8 9] completed at 12:34:56.789
With error tracking
obs := ro.Pipe(
ro.Pipe(
ro.Just(1, 2, 3),
ro.MapErr(func(i int) (int, error) {
if i == 3 {
return 0, fmt.Errorf("processing error")
}
return i * 10, nil
}),
),
ro.Timestamp[int](),
)
sub := obs.Subscribe(ro.NewObserver(
func(value TimestampValue[int]) {
fmt.Printf("Success: %d at %v\n", value.Value, value.Timestamp.Format("15:04:05.000"))
},
func(err error) {
fmt.Printf("Error occurred at %v: %v\n", time.Now().Format("15:04:05.000"), err)
},
func() {
fmt.Println("Completed")
},
))
defer sub.Unsubscribe()
// Success: 10 at 12:34:56.789
// Success: 20 at 12:34:56.789
// Error occurred at 12:34:56.789: processing error
Similar:
Prototype:
func Timestamp[T any]()
Delay
Shifts the emissions from the source Observable forward in time by a specified amount.
obs := ro.Pipe(
ro.Just("A", "B", "C"),
ro.Delay(100 * time.Millisecond),
)
sub := obs.Subscribe(ro.PrintObserver[string]())
time.Sleep(200 * time.Millisecond)
defer sub.Unsubscribe()
// (100ms delay)
// Next: A
// Next: B
// Next: C
// Completed
With hot observable
start := time.Now()
obs := ro.Pipe(
ro.Interval(50 * time.Millisecond),
ro.Take[int64](3),
ro.Delay(200 * time.Millisecond),
)
sub := obs.Subscribe(ro.NewObserver(
func(value int64) {
elapsed := time.Since(start)
fmt.Printf("Value: %d at %v\n", value, elapsed.Round(time.Millisecond))
},
func(err error) {
fmt.Printf("Error: %v\n", err)
},
func() {
fmt.Println("Completed")
},
))
time.Sleep(500 * time.Millisecond)
sub.Unsubscribe()
// Value: 0 at ~250ms (200ms delay + 50ms emission)
// Value: 1 at ~300ms
// Value: 2 at ~350ms
With error propagation
obs := ro.Pipe(
ro.Just(1, 2, 3),
ro.MapErr(func(i int) (int, error) {
if i == 3 {
return 0, fmt.Errorf("error on 3")
}
return i, nil
}),
ro.Delay(100 * time.Millisecond),
)
sub := obs.Subscribe(ro.NewObserver(
func(value int) {
fmt.Printf("Next: %d\n", value)
},
func(err error) {
fmt.Printf("Error: %v\n", err)
},
func() {
fmt.Println("Completed")
},
))
time.Sleep(300 * time.Millisecond)
defer sub.Unsubscribe()
// Next: 1
// Next: 2
// Error: error on 3 (delayed by 100ms)
With multiple delays in pipeline
obs := ro.Pipe(
ro.Just("start", "middle", "end"),
ro.Delay(50 * time.Millisecond),
ro.Map(func(s string) string {
return s + "_processed"
}),
ro.Delay(25 * time.Millisecond),
)
sub := obs.Subscribe(ro.NewObserver(
func(value string) {
fmt.Printf("Received: %s\n", value)
},
func(err error) {
fmt.Printf("Error: %v\n", err)
},
func() {
fmt.Println("Completed")
},
))
time.Sleep(200 * time.Millisecond)
defer sub.Unsubscribe()
// Total delay: ~75ms per item
// Received: start_processed
// Received: middle_processed
// Received: end_processed
With async operations
obs := ro.Pipe(
ro.Just("task1", "task2", "task3"),
ro.MapAsync(func(task string) ro.Observable[string] {
return ro.Defer(func() ro.Observable[string] {
time.Sleep(30 * time.Millisecond)
return ro.Just(task + "_done")
})
}, 2),
ro.Delay(100 * time.Millisecond),
)
sub := obs.Subscribe(ro.NewObserver(
func(value string) {
fmt.Printf("Task completed: %s\n", value)
},
func(err error) {
fmt.Printf("Error: %v\n", err)
},
func() {
fmt.Println("Completed")
},
))
time.Sleep(400 * time.Millisecond)
defer sub.Unsubscribe()
// Each async result is delayed by an additional 100ms
With context cancellation
ctx, cancel := context.WithTimeout(context.Background(), 200 * time.Millisecond)
defer cancel()
obs := ro.Pipe(
ro.Interval(50 * time.Millisecond),
ro.Delay(150 * time.Millisecond),
)
sub := obs.SubscribeWithContext(ctx, ro.NewObserver(
func(value int64) {
fmt.Printf("Value: %d\n", value)
},
func(err error) {
fmt.Printf("Error: %v\n", err)
},
func() {
fmt.Println("Completed")
},
))
// No values will be emitted because context times out
// before the delay completes
time.Sleep(300 * time.Millisecond)
sub.Unsubscribe()
// Error: context deadline exceeded
With real-time data
// Delay real-time price updates
type PriceUpdate struct {
Symbol string
Price float64
Time time.Time
}
obs := ro.Pipe(
ro.Interval(100 * time.Millisecond),
ro.Take[int64](5),
ro.Map(func(ts int64) PriceUpdate {
return PriceUpdate{
Symbol: "BTC",
Price: 50000 + rand.Float64()*1000,
Time: time.Now(),
}
}),
ro.Delay(200 * time.Millisecond),
)
sub := obs.Subscribe(ro.NewObserver(
func(update PriceUpdate) {
delay := time.Since(update.Time)
fmt.Printf("Price: $%.2f (delayed by %v)\n", update.Price, delay.Round(time.Millisecond))
},
func(err error) {
fmt.Printf("Error: %v\n", err)
},
func() {
fmt.Println("Completed")
},
))
time.Sleep(1 * time.Second)
sub.Unsubscribe()
With conditional application
// Apply delay only to certain items
obs := ro.Pipe(
ro.Just(
"immediate", // No delay
"delayed", // Apply delay
"immediate", // No delay
),
ro.Map(func(item string) ro.Observable[string] {
if item == "delayed" {
return ro.Pipe(
ro.Just(item),
ro.Delay(100 * time.Millisecond),
)
}
return ro.Just(item)
}),
ro.Merge[string](),
)
sub := obs.Subscribe(ro.NewObserver(
func(value string) {
fmt.Printf("Received: %s\n", value)
},
func(err error) {
fmt.Printf("Error: %v\n", err)
},
func() {
fmt.Println("Completed")
},
))
time.Sleep(200 * time.Millisecond)
defer sub.Unsubscribe()
// Received: immediate
// Received: immediate
// (100ms delay)
// Received: delayed
Prototype:
func Delay[T any](duration time.Duration)
Materialize
Converts an observable sequence into an observable of notifications representing the original sequence's events.
obs := ro.Pipe(
ro.Just(1, 2, 3),
ro.Materialize[int](),
)
sub := obs.Subscribe(ro.PrintObserver[ro.Notification[int]]())
defer sub.Unsubscribe()
// Next: {Value: 1, HasValue: true}
// Next: {Value: 2, HasValue: true}
// Next: {Value: 3, HasValue: true}
// Next: {Error: nil, HasValue: false}
// Completed
With errors
obs := ro.Pipe(
ro.Concat(
ro.Just(1, 2),
ro.Throw[int](errors.New("test error")),
),
ro.Materialize[int](),
)
sub := obs.Subscribe(ro.PrintObserver[ro.Notification[int]]())
defer sub.Unsubscribe()
// Next: {Value: 1, HasValue: true}
// Next: {Value: 2, HasValue: true}
// Next: {Error: test error, HasValue: false}
// Completed
Processing notifications
obs := ro.Pipe(
ro.Just("hello", "world"),
ro.Materialize[string](),
)
sub := obs.Subscribe(ro.NewObserver[ro.Notification[string]](
func(notification ro.Notification[string]) {
if notification.HasValue {
fmt.Printf("Value: %v\n", notification.Value)
} else if notification.Error != nil {
fmt.Printf("Error: %v\n", notification.Error)
} else {
fmt.Println("Completed")
}
},
func(err error) {
fmt.Printf("Observer error: %v\n", err)
},
func() {
fmt.Println("Observer completed")
},
))
defer sub.Unsubscribe()
// Value: hello
// Value: world
// Completed
// Observer completed
Similar:
Prototype:
func Materialize[T any]()
Dematerialize
Converts an observable of notifications back into an observable sequence.
// Create notifications
notifications := []ro.Notification[int]{
{Value: 1, HasValue: true},
{Value: 2, HasValue: true},
{Value: 3, HasValue: true},
{Error: nil, HasValue: false}, // Completion
}
obs := ro.Pipe(
ro.FromSlice(notifications),
ro.Dematerialize[int](),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Next: 1
// Next: 2
// Next: 3
// Completed
With error notifications
// Create notifications including error
notifications := []ro.Notification[string]{
{Value: "hello", HasValue: true},
{Value: "world", HasValue: true},
{Error: errors.New("test error"), HasValue: false},
}
obs := ro.Pipe(
ro.FromSlice(notifications),
ro.Dematerialize[string](),
)
sub := obs.Subscribe(ro.PrintObserver[string]())
defer sub.Unsubscribe()
// Next: hello
// Next: world
// Error: test error
Round trip with Materialize
original := ro.Just(1, 2, 3)
// Materialize then dematerialize
roundTrip := ro.Pipe(
original,
ro.Materialize[int](),
ro.Dematerialize[int](),
)
sub := roundTrip.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Next: 1
// Next: 2
// Next: 3
// Completed
Processing notifications manually
// Create custom notification sequence
notifications := []ro.Notification[float64]{
{Value: 3.14, HasValue: true},
{Value: 2.71, HasValue: true},
{Error: nil, HasValue: false},
}
obs := ro.Pipe(
ro.FromSlice(notifications),
ro.Dematerialize[float64](),
)
sub := obs.Subscribe(ro.NewObserver(
func(value float64) {
fmt.Printf("Received: %.2f\n", value)
},
func(err error) {
fmt.Printf("Error: %v\n", err)
},
func() {
fmt.Println("Sequence completed")
},
))
defer sub.Unsubscribe()
// Received: 3.14
// Received: 2.71
// Sequence completed
Similar:
Prototype:
func Dematerialize[T any]()