Skip to main content
Help improve this documentation

This documentation is still new and evolving. If you spot any mistakes, unclear explanations, or missing details, please open an issue.

Your feedback helps us improve!

Utility operators

This page lists all utility operators available in the core package of ro.

  • Performs side effects for each emission from an Observable, without modifying the emitted values.

    var nextCount, errorCount, completeCount int

    obs := ro.Pipe(
    ro.Just(1, 2, 3),
    ro.Do(
    func(value int) {
    nextCount++
    fmt.Printf("Next: %d\n", value)
    },
    func(err error) {
    errorCount++
    fmt.Printf("Error: %v\n", err)
    },
    func() {
    completeCount++
    fmt.Println("Complete")
    },
    ),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Next: 1
    // Next: 2
    // Next: 3
    // Completed

    fmt.Printf("Counts: next=%d, error=%d, complete=%d\n", nextCount, errorCount, completeCount)
    // Counts: next=3, error=0, complete=1

    With context

    obs := ro.Pipe(
    ro.Just(1, 2, 3),
    ro.DoWithContext(
    func(ctx context.Context, value int) {
    fmt.Printf("Next with context: %d\n", value)
    },
    func(ctx context.Context, err error) {
    fmt.Printf("Error with context: %v\n", err)
    },
    func(ctx context.Context) {
    fmt.Println("Complete with context")
    },
    ),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    DoOnNext

    var nextValues []int

    obs := ro.Pipe(
    ro.Just(1, 2, 3),
    ro.DoOnNext(func(value int) {
    nextValues = append(nextValues, value)
    fmt.Printf("Received: %d\n", value)
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Received: 1
    // Received: 2
    // Received: 3

    fmt.Printf("Collected values: %v\n", nextValues)
    // Collected values: [1 2 3]

    DoOnError

    var lastError error

    obs := ro.Pipe(
    ro.Throw[int](fmt.Errorf("test error")),
    ro.DoOnError(func(err error) {
    lastError = err
    fmt.Printf("Error captured: %v\n", err)
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Error captured: test error
    // Error: test error

    DoOnComplete

    var completed bool

    obs := ro.Pipe(
    ro.Just(1, 2, 3),
    ro.DoOnComplete(func() {
    completed = true
    fmt.Println("Stream completed")
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Stream completed

    DoOnSubscribe

    var subscribed bool

    obs := ro.Pipe(
    ro.Just(1, 2, 3),
    ro.DoOnSubscribe(func() {
    subscribed = true
    fmt.Println("Subscribed to observable")
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Subscribed to observable

    DoOnFinalize

    var finalized bool

    obs := ro.Pipe(
    ro.Just(1, 2, 3),
    ro.DoOnFinalize(func() {
    finalized = true
    fmt.Println("Observable finalized")
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Observable finalized
    Similar:
    Prototypes:
    func Do[T any](onNext func(value T), onError func(err error), onComplete func())
    func DoWithContext[T any](onNext func(ctx context.Context, value T), onError func(ctx context.Context, err error), onComplete func(ctx context.Context))
    func DoOnNext[T any](onNext func(value T))
    func DoOnNextWithContext[T any](onNext func(ctx context.Context, value T))
    func DoOnError[T any](onError func(err error))
    func DoOnErrorWithContext[T any](onError func(ctx context.Context, err error))
    func DoOnComplete[T any](onComplete func())
    func DoOnCompleteWithContext[T any](onComplete func(ctx context.Context))
    func DoOnSubscribe[T any](onSubscribe func())
    func DoOnSubscribeWithContext[T any](onSubscribe func(ctx context.Context))
    func DoOnFinalize[T any](onFinalize func())
  • Allows you to perform side effects for notifications from the source Observable without modifying the emitted items. It mirrors the source Observable and forwards its emissions to the provided observer.

    var nextCount, errorCount, completeCount int

    obs := ro.Pipe(
    ro.Just(1, 2, 3),
    ro.Tap(
    func(value int) {
    nextCount++
    fmt.Printf("Next: %d\n", value)
    },
    func(err error) {
    errorCount++
    fmt.Printf("Error: %v\n", err)
    },
    func() {
    completeCount++
    fmt.Println("Complete")
    },
    ),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Next: 1
    // Next: 2
    // Next: 3
    // Completed

    fmt.Printf("Counts: next=%d, error=%d, complete=%d\n", nextCount, errorCount, completeCount)
    // Counts: next=3, error=0, complete=1

    With context

    obs := ro.Pipe(
    ro.Just(1, 2, 3),
    ro.TapWithContext(
    func(ctx context.Context, value int) {
    fmt.Printf("Next with context: %d\n", value)
    },
    func(ctx context.Context, err error) {
    fmt.Printf("Error with context: %v\n", err)
    },
    func(ctx context.Context) {
    fmt.Println("Complete with context")
    },
    ),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    TapOnNext

    var nextValues []int

    obs := ro.Pipe(
    ro.Just(1, 2, 3),
    ro.TapOnNext(func(value int) {
    nextValues = append(nextValues, value)
    fmt.Printf("Received: %d\n", value)
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Received: 1
    // Received: 2
    // Received: 3

    fmt.Printf("Collected values: %v\n", nextValues)
    // Collected values: [1 2 3]

    TapOnError

    var lastError error

    obs := ro.Pipe(
    ro.Throw[int](fmt.Errorf("test error")),
    ro.TapOnError(func(err error) {
    lastError = err
    fmt.Printf("Error captured: %v\n", err)
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Error captured: test error
    // Error: test error

    TapOnComplete

    var completed bool

    obs := ro.Pipe(
    ro.Just(1, 2, 3),
    ro.TapOnComplete(func() {
    completed = true
    fmt.Println("Stream completed")
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Stream completed

    With context error handling

    obs := ro.Pipe(
    ro.Just(1, 2, 3),
    ro.TapOnErrorWithContext(func(ctx context.Context, err error) {
    fmt.Printf("Error with context: %v\n", err)
    }),
    ro.TapOnCompleteWithContext(func(ctx context.Context) {
    fmt.Println("Completed with context")
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    For debugging

    obs := ro.Pipe(
    ro.Interval(100*time.Millisecond),
    ro.Take[int64](3),
    ro.TapOnNext(func(value int64) {
    fmt.Printf("[DEBUG] Received: %d\n", value)
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[int64]())
    time.Sleep(500 * time.Millisecond)
    sub.Unsubscribe()

    // [DEBUG] Received: 0
    // [DEBUG] Received: 1
    // [DEBUG] Received: 2

    With error scenarios

    obs := ro.Pipe(
    ro.Pipe(
    ro.Just(1, 2, 3),
    ro.MapErr(func(i int) (int, error) {
    if i == 3 {
    return 0, fmt.Errorf("error on 3")
    }
    return i, nil
    }),
    ),
    ro.Tap(
    func(value int) {
    fmt.Printf("Next tap: %d\n", value)
    },
    func(err error) {
    fmt.Printf("Error tap: %v\n", err)
    },
    func() {
    fmt.Println("Complete tap")
    },
    ),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Next tap: 1
    // Next tap: 2
    // Error tap: error on 3

    With hot observables

    source := ro.Pipe(
    ro.Interval(100*time.Millisecond),
    ro.TapOnNext(func(value int64) {
    fmt.Printf("Source value: %d\n", value)
    }),
    )

    // Multiple subscribers get the same tap side effects
    sub1 := source.Subscribe(ro.PrintObserver[int64]())
    sub2 := source.Subscribe(ro.PrintObserver[int64]())

    time.Sleep(350 * time.Millisecond)
    sub1.Unsubscribe()
    sub2.Unsubscribe()

    // Each subscriber triggers the tap side effects

    With cleanup operations

    cleanup := func() {
    fmt.Println("Cleaning up resources...")
    }

    obs := ro.Pipe(
    ro.Just("data1", "data2"),
    ro.TapOnComplete(cleanup),
    )

    sub := obs.Subscribe(ro.PrintObserver[string]())
    defer sub.Unsubscribe()

    // Cleaning up resources...
    Similar:
    Prototypes:
    func Tap[T any](onNext func(value T), onError func(err error), onComplete func())
    func TapWithContext[T any](onNext func(ctx context.Context, value T), onError func(ctx context.Context, err error), onComplete func(ctx context.Context))
    func TapOnNext[T any](onNext func(value T))
    func TapOnNextWithContext[T any](onNext func(ctx context.Context, value T))
    func TapOnError[T any](onError func(err error))
    func TapOnErrorWithContext[T any](onError func(ctx context.Context, err error))
    func TapOnComplete[T any](onComplete func())
    func TapOnCompleteWithContext[T any](onComplete func(ctx context.Context))
  • Raises an error if the source Observable does not emit any item within the specified duration. The timeout resets after each emission.

    obs := ro.Pipe(
    ro.Interval(200*time.Millisecond),
    ro.Timeout(100*time.Millisecond),
    )

    sub := obs.Subscribe(ro.PrintObserver[int64]())
    time.Sleep(300 * time.Millisecond)
    sub.Unsubscribe()

    // Error: timeout after 100ms

    With fast emissions (no timeout)

    obs := ro.Pipe(
    ro.Interval(50*time.Millisecond),
    ro.Timeout(200*time.Millisecond),
    ro.Take[int64](3),
    )

    sub := obs.Subscribe(ro.PrintObserver[int64]())
    time.Sleep(300 * time.Millisecond)
    sub.Unsubscribe()

    // Next: 0
    // Next: 1
    // Next: 2
    // Completed

    With slow emissions (timeout occurs)

    obs := ro.Pipe(
    ro.Interval(500*time.Millisecond),
    ro.Timeout(200*time.Millisecond),
    )

    sub := obs.Subscribe(ro.PrintObserver[int64]())
    time.Sleep(800 * time.Millisecond)
    sub.Unsubscribe()

    // Error: timeout after 200ms

    With delayed first emission

    obs := ro.Pipe(
    ro.Pipe(
    ro.Just("delayed"),
    ro.Delay(300*time.Millisecond),
    ),
    ro.Timeout(100*time.Millisecond),
    )

    sub := obs.Subscribe(ro.PrintObserver[string]())
    time.Sleep(500 * time.Millisecond)
    sub.Unsubscribe()

    // Error: timeout after 100ms (before first emission)

    With error in source (propagates immediately)

    obs := ro.Pipe(
    ro.Pipe(
    ro.Just("will error"),
    ro.Throw[string](errors.New("source error")),
    ),
    ro.Timeout(1*time.Second),
    )

    sub := obs.Subscribe(ro.PrintObserver[string]())
    defer sub.Unsubscribe()

    // Error: source error (propagates before timeout)

    With multiple emissions and varying intervals

    obs := ro.Pipe(
    ro.Pipe(
    ro.Just("fast"),
    ro.Delay(50*time.Millisecond),
    ),
    ro.Pipe(
    ro.Just("slow"),
    ro.Delay(300*time.Millisecond),
    ),
    ro.Timeout(200*time.Millisecond),
    )

    sub := obs.Subscribe(ro.PrintObserver[string]())
    time.Sleep(400 * time.Millisecond)
    sub.Unsubscribe()

    // Next: fast (emitted within timeout)
    // Error: timeout after 200ms (waiting for slow)
    Prototype:
    func Timeout[T any](duration time.Duration)
  • Schedule the upstream flow to a different goroutine. This detaches the subscription from the current goroutine and processes emissions in a separate goroutine.

    obs := ro.Pipe(
    ro.Just("main", "thread"),
    ro.SubscribeOn(10), // Process in background goroutine with buffer size 10
    )

    sub := obs.Subscribe(ro.PrintObserver[string]())
    defer sub.Unsubscribe()

    // Next: main (processed in background goroutine)
    // Next: thread (processed in background goroutine)
    // Completed

    With heavy computations

    obs := ro.Pipe(
    ro.Just(1, 2, 3, 4, 5),
    ro.Map(func(i int) int {
    // Simulate heavy computation
    time.Sleep(100 * time.Millisecond)
    return i * i
    }),
    ro.SubscribeOn(100), // Large buffer for heavy computations
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Next: 1, 4, 9, 16, 25 (computed in background goroutine)
    // Completed

    With backpressure control

    obs := ro.Pipe(
    ro.Interval(10*time.Millisecond), // Fast producer
    ro.SubscribeOn(5), // Small buffer causes backpressure
    )

    sub := obs.Subscribe(ro.NewObserver[int64](
    func(value int64) {
    time.Sleep(50 * time.Millisecond) // Slow consumer
    fmt.Printf("Next: %d\n", value)
    },
    func(err error) {
    fmt.Printf("Error: %v\n", err)
    },
    func() {
    fmt.Println("Completed")
    },
    ))
    time.Sleep(200 * time.Millisecond)
    sub.Unsubscribe()

    // Next: values processed in background goroutine with backpressure
    // Completed

    With error handling

    obs := ro.Pipe(
    ro.Throw[string](errors.New("background error")),
    ro.SubscribeOn(10),
    )

    sub := obs.Subscribe(ro.PrintObserver[string]())
    defer sub.Unsubscribe()

    // Error: background error

    With multiple operators

    obs := ro.Pipe(
    ro.Just("a", "b", "c"),
    ro.Map(func(s string) string { return strings.ToUpper(s) }),
    ro.Filter(func(s string) bool { return s != "B" }),
    ro.SubscribeOn(20), // All upstream operations in background
    )

    sub := obs.Subscribe(ro.PrintObserver[string]())
    defer sub.Unsubscribe()

    // Next: A
    // Next: C
    // Completed
    Similar:
    Prototype:
    func SubscribeOn[T any](bufferSize int)
  • Schedule the downstream flow to a different goroutine. Converts a push-based Observable into a pullable stream with backpressure capabilities.

    obs := ro.Pipe(
    ro.Just("fast", "emissions"),
    ro.ObserveOn(2), // Small buffer size
    )

    sub := obs.Subscribe(ro.NewObserver[string](
    func(value string) {
    time.Sleep(200 * time.Millisecond) // Slow consumer
    fmt.Printf("Next: %s\n", value)
    },
    func(err error) {
    fmt.Printf("Error: %v\n", err)
    },
    func() {
    fmt.Println("Completed")
    },
    ))
    defer sub.Unsubscribe()

    // Next: fast (after 200ms delay)
    // Next: emissions (after 200ms delay)
    // Completed

    With backpressure control

    obs := ro.Pipe(
    ro.Interval(10*time.Millisecond), // Fast producer
    ro.ObserveOn(5), // Small buffer for backpressure
    )

    sub := obs.Subscribe(ro.OnNext(
    func(value int64) {
    time.Sleep(100 * time.Millisecond) // Slow consumer
    fmt.Printf("Next: %d\n", value)
    },
    ))
    time.Sleep(500 * time.Millisecond)
    sub.Unsubscribe()

    // Next: values processed with backpressure control
    // (Fast producer will be blocked when buffer is full)

    With large buffer

    obs := ro.Pipe(
    ro.Just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10),
    ro.ObserveOn(100), // Large buffer
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Next: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 (processed in background goroutine)
    // Completed

    With error propagation

    obs := ro.Pipe(
    ro.Pipe(
    ro.Just("will error"),
    ro.Throw[string](errors.New("propagated error")),
    ),
    ro.ObserveOn(10),
    )

    sub := obs.Subscribe(ro.PrintObserver[string]())
    defer sub.Unsubscribe()

    // Error: propagated error

    Combined with SubscribeOn

    obs := ro.Pipe(
    ro.Just("background", "processing"),
    ro.SubscribeOn(10), // Upstream in background
    ro.Map(strings.ToUpper),
    ro.ObserveOn(5), // Downstream in different background thread
    )

    sub := obs.Subscribe(ro.PrintObserver[string]())
    defer sub.Unsubscribe()

    // Next: BACKGROUND
    // Next: PROCESSING
    // Completed
    Similar:
    Prototype:
    func ObserveOn[T any](bufferSize int)
  • Records the interval of time between emissions from the source Observable and emits this information as ro.IntervalValue objects.

    obs := ro.Pipe(
    ro.Just("A", "B", "C"),
    ro.TimeInterval[string](),
    )

    sub := obs.Subscribe(ro.PrintObserver[ro.IntervalValue[string]]())
    defer sub.Unsubscribe()

    // Shows interval between emissions:
    // Next: {Value:A Interval:0s}
    // Next: {Value:B Interval:<time between A and B>}
    // Next: {Value:C Interval:<time between B and C>}
    // Completed

    With hot observable

    obs := ro.Pipe(
    ro.Interval(100 * time.Millisecond),
    ro.Take[int64](5),
    ro.TimeInterval[int64](),
    )

    sub := obs.Subscribe(ro.OnNext(func(value ro.IntervalValue[int64]) {
    fmt.Printf("Value: %d, Interval: %v\n", value.Value, value.Interval)
    }))
    time.Sleep(700 * time.Millisecond)
    sub.Unsubscribe()

    //
    // Value: 0, Interval: 0s
    // Value: 1, Interval: ~100ms
    // Value: 2, Interval: ~100ms
    // Value: 3, Interval: ~100ms
    // Value: 4, Interval: ~100ms

    With async operations

    obs := ro.Pipe(
    ro.Pipe(
    ro.Just("task1", "task2", "task3"),
    ro.MapAsync(func(task string) ro.Observable[string] {
    return ro.Defer(func() ro.Observable[string] {
    delay := time.Duration(rand.Intn(200)) * time.Millisecond
    time.Sleep(delay)
    return ro.Just(task)
    })
    }, 2),
    ),
    ro.TimeInterval[string](),
    )

    sub := obs.Subscribe(ro.OnNext(func(value ro.IntervalValue[string]) {
    fmt.Printf("Task: %s completed after %v\n", value.Value, value.Interval)
    }))
    time.Sleep(500 * time.Millisecond)
    defer sub.Unsubscribe()

    // Shows variable intervals due to async processing

    With error handling

    obs := ro.Pipe(
    ro.Pipe(
    ro.Just(1, 2, 3),
    ro.MapErr(func(i int) (int, error) {
    if i == 3 {
    return 0, fmt.Errorf("error on 3")
    }
    time.Sleep(50 * time.Millisecond)
    return i, nil
    }),
    ),
    ro.TimeInterval[int](),
    )

    sub := obs.Subscribe(ro.NewObserver(
    func(value ro.IntervalValue[int]) {
    fmt.Printf("Value: %d, Interval: %v\n", value.Value, value.Interval)
    },
    func(err error) {
    fmt.Printf("Error: %v\n", err)
    },
    func() {
    fmt.Println("Complete")
    },
    ))
    defer sub.Unsubscribe()

    //
    // Value: 1, Interval: 0s
    // Value: 2, Interval: ~50ms
    // Error: error on 3

    With performance monitoring

    // Monitor processing time for expensive operations
    obs := ro.Pipe(
    ro.Just("data1", "data2", "data3"),
    ro.Map(func(data string) string {
    // Simulate expensive processing
    time.Sleep(100 * time.Millisecond)
    return "processed_" + data
    }),
    ro.TimeInterval[string](),
    )

    sub := obs.Subscribe(ro.OnNext(func(value ro.IntervalValue[string]) {
    fmt.Printf("Processed: %s in %v\n", value.Value, value.Interval)
    }))
    defer sub.Unsubscribe()

    //
    // Processed: processed_data1 in ~100ms
    // Processed: processed_data2 in ~100ms
    // Processed: processed_data3 in ~100ms

    With real-time data stream

    // Monitor intervals in real-time data stream
    source := ro.Interval(200 * time.Millisecond)
    obs := ro.Pipe(
    source,
    ro.Take[int64](10),
    ro.Map(func(ts int64) float64 {
    // Simulate sensor reading
    return 20.0 + rand.Float64()*10
    }),
    ro.TimeInterval[float64](),
    )

    sub := obs.Subscribe(ro.OnNext(func(value ro.IntervalValue[float64]) {
    fmt.Printf("[%v] Reading: %.2f (interval: %v)\n",
    time.Now().Format("15:04:05.000"),
    value.Value,
    value.Interval.Round(time.Millisecond))
    }))
    time.Sleep(2500 * time.Millisecond)
    sub.Unsubscribe()

    With batch processing analysis

    // Analyze batch processing times
    obs := ro.Pipe(
    ro.Range(1, 6),
    ro.BufferWithCount[int](2),
    ro.TimeInterval[[]int](),
    )

    sub := obs.Subscribe(ro.OnNext(func(value ro.IntervalValue[[]int]) {
    fmt.Printf("Batch %v processed in %v\n", value.Value, value.Interval)
    }))
    defer sub.Unsubscribe()

    //
    // Batch [1 2] processed in ~0s
    // Batch [3 4] processed in ~<interval>
    // Batch [5] processed in ~<interval>

    With conditional intervals

    // Measure intervals only for certain values
    obs := ro.Pipe(
    ro.Range(1, 10),
    ro.Filter(func(i int) bool {
    return i%3 == 0 // Only multiples of 3
    }),
    ro.TimeInterval[int](),
    )

    sub := obs.Subscribe(ro.OnNext(func(value ro.IntervalValue[int]) {
    fmt.Printf("Filtered value: %d (interval: %v)\n", value.Value, value.Interval)
    }))
    defer sub.Unsubscribe()

    //
    // Filtered value: 3 (interval: <time from start to 3>)
    // Filtered value: 6 (interval: <time between 3 and 6>)
    // Filtered value: 9 (interval: <time between 6 and 9>)
    Similar:
    Prototype:
    func TimeInterval[T any]()
  • Attaches a timestamp to each emission from the source Observable, indicating when it was emitted.

    obs := ro.Pipe(
    ro.Just("A", "B", "C"),
    ro.Timestamp[string](),
    )

    sub := obs.Subscribe(ro.PrintObserver[ro.TimestampValue[string]]())
    defer sub.Unsubscribe()

    // Next: {Value:A Timestamp:<current time>}
    // Next: {Value:B Timestamp:<current time>}
    // Next: {Value:C Timestamp:<current time>}
    // Completed

    With hot observable

    obs := ro.Pipe(
    ro.Interval(100 * time.Millisecond),
    ro.Take[int64](3),
    ro.Timestamp[int64](),
    )

    sub := obs.Subscribe(ro.OnNext(
    func(value ro.TimestampValue[int64]) {
    fmt.Printf("Value: %d at %v\n", value.Value, value.Timestamp.Format("15:04:05.000"))
    },
    ))
    time.Sleep(500 * time.Millisecond)
    sub.Unsubscribe()

    // Value: 0 at 12:34:56.789
    // Value: 1 at 12:34:56.889
    // Value: 2 at 12:34:56.989

    With async operations

    obs := ro.Pipe(
    ro.Pipe(
    ro.Just("task1", "task2", "task3"),
    ro.MapAsync(func(task string) ro.Observable[string] {
    return ro.Defer(func() ro.Observable[string] {
    time.Sleep(50 * time.Millisecond)
    return ro.Just(task)
    })
    }, 2),
    ),
    ro.Timestamp[string](),
    )

    sub := obs.Subscribe(ro.OnNext(func(value ro.TimestampValue[string]) {
    fmt.Printf("Task %s completed at %v\n", value.Value, value.Timestamp.Format("15:04:05.000"))
    }))
    time.Sleep(300 * time.Millisecond)
    defer sub.Unsubscribe()

    With data logging

    type LogEntry struct {
    Message string
    Level string
    Timestamp time.Time
    }

    obs := ro.Pipe(
    ro.Just(
    LogEntry{Message: "Server started", Level: "INFO"},
    LogEntry{Message: "User connected", Level: "INFO"},
    LogEntry{Message: "Database error", Level: "ERROR"},
    ),
    ro.Timestamp[LogEntry](),
    )

    sub := obs.Subscribe(ro.OnNext(func(value ro.TimestampValue[LogEntry]) {
    entry := value.Value
    fmt.Printf("[%s] %s: %s\n",
    value.Timestamp.Format("2006-01-02 15:04:05"),
    entry.Level,
    entry.Message)
    }))
    defer sub.Unsubscribe()

    // [2024-01-01 12:34:56] INFO: Server started
    // [2024-01-01 12:34:56] INFO: User connected
    // [2024-01-01 12:34:56] ERROR: Database error

    With real-time sensor data

    type SensorReading struct {
    ID string
    Value float64
    Timestamp time.Time
    }

    obs := ro.Pipe(
    ro.Interval(1 * time.Second),
    ro.Take[int64](5),
    ro.Map(func(ts int64) SensorReading {
    return SensorReading{
    ID: "temp-01",
    Value: 20.0 + rand.Float64()*10,
    Timestamp: time.Now(),
    }
    }),
    ro.Timestamp[SensorReading](),
    )

    sub := obs.Subscribe(ro.OnNext(
    func(value ro.TimestampValue[SensorReading]) {
    reading := value.Value
    fmt.Printf("[%s] %s: %.2f°C (system time: %v)\n",
    reading.Timestamp.Format("15:04:05"),
    reading.ID,
    reading.Value,
    value.Timestamp.Format("15:04:05.000"))
    },
    ))
    time.Sleep(6 * time.Second)
    sub.Unsubscribe()

    With event ordering

    // Track event order and timing
    type Event struct {
    ID string
    Action string
    Payload interface{}
    }

    obs := ro.Pipe(
    ro.Just(
    Event{ID: "1", Action: "click", Payload: "button"},
    Event{ID: "2", Action: "scroll", Payload: 100},
    Event{ID: "3", Action: "input", Payload: "text"},
    ),
    ro.Timestamp[Event](),
    )

    sub := obs.Subscribe(ro.OnNext(func(value ro.TimestampValue[Event]) {
    event := value.Value
    fmt.Printf("%s | Event %s: %s (%v)\n",
    value.Timestamp.Format("15:04:05.000"),
    event.ID,
    event.Action,
    event.Payload)
    }))
    defer sub.Unsubscribe()

    // Shows precise timing of each event

    With batch processing

    // Timestamp batch completion times
    obs := ro.Pipe(
    ro.Range(1, 10),
    ro.BufferWithCount[int](3),
    ro.Timestamp[[]int](),
    )

    sub := obs.Subscribe(ro.OnNext(func(value ro.TimestampValue[[]int]) {
    fmt.Printf("Batch %v completed at %v\n",
    value.Value,
    value.Timestamp.Format("15:04:05.000"))
    }))
    defer sub.Unsubscribe()

    // Batch [1 2 3] completed at 12:34:56.789
    // Batch [4 5 6] completed at 12:34:56.789
    // Batch [7 8 9] completed at 12:34:56.789

    With error tracking

    obs := ro.Pipe(
    ro.Pipe(
    ro.Just(1, 2, 3),
    ro.MapErr(func(i int) (int, error) {
    if i == 3 {
    return 0, fmt.Errorf("processing error")
    }
    return i * 10, nil
    }),
    ),
    ro.Timestamp[int](),
    )

    sub := obs.Subscribe(ro.NewObserver(
    func(value ro.TimestampValue[int]) {
    fmt.Printf("Success: %d at %v\n", value.Value, value.Timestamp.Format("15:04:05.000"))
    },
    func(err error) {
    fmt.Printf("Error occurred at %v: %v\n", time.Now().Format("15:04:05.000"), err)
    },
    func() {
    fmt.Println("Completed")
    },
    ))
    defer sub.Unsubscribe()

    // Success: 10 at 12:34:56.789
    // Success: 20 at 12:34:56.789
    // Error occurred at 12:34:56.789: processing error
    Similar:
    Prototype:
    func Timestamp[T any]()
  • Shifts the emissions from the source Observable forward in time by a specified amount.

    obs := ro.Pipe(
    ro.Just("A", "B", "C"),
    ro.Delay(100 * time.Millisecond),
    )

    sub := obs.Subscribe(ro.PrintObserver[string]())
    time.Sleep(200 * time.Millisecond)
    defer sub.Unsubscribe()

    // (100ms delay)
    // Next: A
    // Next: B
    // Next: C
    // Completed

    With hot observable

    start := time.Now()
    obs := ro.Pipe(
    ro.Interval(50 * time.Millisecond),
    ro.Take[int64](3),
    ro.Delay(200 * time.Millisecond),
    )

    sub := obs.Subscribe(ro.NewObserver(
    func(value int64) {
    elapsed := time.Since(start)
    fmt.Printf("Value: %d at %v\n", value, elapsed.Round(time.Millisecond))
    },
    func(err error) {
    fmt.Printf("Error: %v\n", err)
    },
    func() {
    fmt.Println("Completed")
    },
    ))
    time.Sleep(500 * time.Millisecond)
    sub.Unsubscribe()

    // Value: 0 at ~250ms (200ms delay + 50ms emission)
    // Value: 1 at ~300ms
    // Value: 2 at ~350ms

    With error propagation

    obs := ro.Pipe(
    ro.Just(1, 2, 3),
    ro.MapErr(func(i int) (int, error) {
    if i == 3 {
    return 0, fmt.Errorf("error on 3")
    }
    return i, nil
    }),
    ro.Delay(100 * time.Millisecond),
    )

    sub := obs.Subscribe(ro.NewObserver(
    func(value int) {
    fmt.Printf("Next: %d\n", value)
    },
    func(err error) {
    fmt.Printf("Error: %v\n", err)
    },
    func() {
    fmt.Println("Completed")
    },
    ))
    time.Sleep(300 * time.Millisecond)
    defer sub.Unsubscribe()

    // Next: 1
    // Next: 2
    // Error: error on 3 (delayed by 100ms)

    With multiple delays in pipeline

    obs := ro.Pipe(
    ro.Just("start", "middle", "end"),
    ro.Delay(50 * time.Millisecond),
    ro.Map(func(s string) string {
    return s + "_processed"
    }),
    ro.Delay(25 * time.Millisecond),
    )

    sub := obs.Subscribe(ro.NewObserver(
    func(value string) {
    fmt.Printf("Received: %s\n", value)
    },
    func(err error) {
    fmt.Printf("Error: %v\n", err)
    },
    func() {
    fmt.Println("Completed")
    },
    ))
    time.Sleep(200 * time.Millisecond)
    defer sub.Unsubscribe()

    // Total delay: ~75ms per item
    // Received: start_processed
    // Received: middle_processed
    // Received: end_processed

    With async operations

    obs := ro.Pipe(
    ro.Just("task1", "task2", "task3"),
    ro.MapAsync(func(task string) ro.Observable[string] {
    return ro.Defer(func() ro.Observable[string] {
    time.Sleep(30 * time.Millisecond)
    return ro.Just(task + "_done")
    })
    }, 2),
    ro.Delay(100 * time.Millisecond),
    )

    sub := obs.Subscribe(ro.NewObserver(
    func(value string) {
    fmt.Printf("Task completed: %s\n", value)
    },
    func(err error) {
    fmt.Printf("Error: %v\n", err)
    },
    func() {
    fmt.Println("Completed")
    },
    ))
    time.Sleep(400 * time.Millisecond)
    defer sub.Unsubscribe()

    // Each async result is delayed by additional 100ms

    With context cancellation

    ctx, cancel := context.WithTimeout(context.Background(), 200 * time.Millisecond)
    defer cancel()

    obs := ro.Pipe(
    ro.Interval(50 * time.Millisecond),
    ro.Delay(150 * time.Millisecond),
    )

    sub := obs.SubscribeWithContext(ctx, ro.NewObserver(
    func(value int64) {
    fmt.Printf("Value: %d\n", value)
    },
    func(err error) {
    fmt.Printf("Error: %v\n", err)
    },
    func() {
    fmt.Println("Completed")
    },
    ))

    // No values will be emitted because context times out
    // before the delay completes
    time.Sleep(300 * time.Millisecond)
    sub.Unsubscribe()

    // Error: context deadline exceeded

    With real-time data

    // Delay real-time price updates
    type PriceUpdate struct {
    Symbol string
    Price float64
    Time time.Time
    }

    obs := ro.Pipe(
    ro.Interval(100 * time.Millisecond),
    ro.Take[int64](5),
    ro.Map(func(ts int64) PriceUpdate {
    return PriceUpdate{
    Symbol: "BTC",
    Price: 50000 + rand.Float64()*1000,
    Time: time.Now(),
    }
    }),
    ro.Delay(200 * time.Millisecond),
    )

    sub := obs.Subscribe(ro.NewObserver(
    func(update PriceUpdate) {
    delay := time.Since(update.Time)
    fmt.Printf("Price: $%.2f (delayed by %v)\n", update.Price, delay.Round(time.Millisecond))
    },
    func(err error) {
    fmt.Printf("Error: %v\n", err)
    },
    func() {
    fmt.Println("Completed")
    },
    ))
    time.Sleep(1 * time.Second)
    sub.Unsubscribe()

    With conditional application

    // Apply delay only to certain items
    obs := ro.Pipe(
    ro.Just(
    "immediate", // No delay
    "delayed", // Apply delay
    "immediate", // No delay
    ),
    ro.Map(func(item string) ro.Observable[string] {
    if item == "delayed" {
    return ro.Pipe(
    ro.Just(item),
    ro.Delay(100 * time.Millisecond),
    )
    }
    return ro.Just(item)
    }),
    ro.Merge[string](),
    )

    sub := obs.Subscribe(ro.NewObserver(
    func(value string) {
    fmt.Printf("Received: %s\n", value)
    },
    func(err error) {
    fmt.Printf("Error: %v\n", err)
    },
    func() {
    fmt.Println("Completed")
    },
    ))
    time.Sleep(200 * time.Millisecond)
    defer sub.Unsubscribe()

    // Received: immediate
    // Received: immediate
    // (100ms delay)
    // Received: delayed
    Prototype:
    func Delay[T any](duration time.Duration)
  • Converts an observable sequence into an observable of notifications representing the original sequence's events.

    obs := ro.Pipe(
    ro.Just(1, 2, 3),
    ro.Materialize[int](),
    )

    sub := obs.Subscribe(ro.PrintObserver[ro.Notification[int]]())
    defer sub.Unsubscribe()

    // Next: {Value: 1, HasValue: true}
    // Next: {Value: 2, HasValue: true}
    // Next: {Value: 3, HasValue: true}
    // Next: {Error: nil, HasValue: false}
    // Completed

    With errors

    obs := ro.Pipe(
    ro.Concat(
    ro.Just(1, 2),
    ro.Throw[int](errors.New("test error")),
    ),
    ro.Materialize[int](),
    )

    sub := obs.Subscribe(ro.PrintObserver[ro.Notification[int]]())
    defer sub.Unsubscribe()

    // Next: {Value: 1, HasValue: true}
    // Next: {Value: 2, HasValue: true}
    // Next: {Error: test error, HasValue: false}
    // Completed

    Processing notifications

    obs := ro.Pipe(
    ro.Just("hello", "world"),
    ro.Materialize[string](),
    )

    sub := obs.Subscribe(ro.NewObserver[ro.Notification[string]](
    func(notification ro.Notification[string]) {
    if notification.HasValue {
    fmt.Printf("Value: %v\n", notification.Value)
    } else if notification.Error != nil {
    fmt.Printf("Error: %v\n", notification.Error)
    } else {
    fmt.Println("Completed")
    }
    },
    func(err error) {
    fmt.Printf("Observer error: %v\n", err)
    },
    func() {
    fmt.Println("Observer completed")
    },
    ))
    defer sub.Unsubscribe()

    // Value: hello
    // Value: world
    // Completed
    // Observer completed
    Prototype:
    func Materialize[T any]()
  • Converts an observable of notifications back into an observable sequence.

    // Create notifications
    notifications := []ro.Notification[int]{
    {Value: 1, HasValue: true},
    {Value: 2, HasValue: true},
    {Value: 3, HasValue: true},
    {Error: nil, HasValue: false}, // Completion
    }

    obs := ro.Pipe(
    ro.FromSlice(notifications),
    ro.Dematerialize[int](),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Next: 1
    // Next: 2
    // Next: 3
    // Completed

    With error notifications

    // Create notifications including error
    notifications := []ro.Notification[string]{
    {Value: "hello", HasValue: true},
    {Value: "world", HasValue: true},
    {Error: errors.New("test error"), HasValue: false},
    }

    obs := ro.Pipe(
    ro.FromSlice(notifications),
    ro.Dematerialize[string](),
    )

    sub := obs.Subscribe(ro.PrintObserver[string]())
    defer sub.Unsubscribe()

    // Next: hello
    // Next: world
    // Error: test error

    Round trip with Materialize

    original := ro.Just(1, 2, 3)

    // Materialize then dematerialize
    roundTrip := ro.Pipe(
    original,
    ro.Materialize[int](),
    ro.Dematerialize[int](),
    )

    sub := roundTrip.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Next: 1
    // Next: 2
    // Next: 3
    // Completed

    Processing notifications manually

    // Create custom notification sequence
    notifications := []ro.Notification[float64]{
    {Value: 3.14, HasValue: true},
    {Value: 2.71, HasValue: true},
    {Error: nil, HasValue: false},
    }

    obs := ro.Pipe(
    ro.FromSlice(notifications),
    ro.Dematerialize[float64](),
    )

    sub := obs.Subscribe(ro.NewObserver(
    func(value float64) {
    fmt.Printf("Received: %.2f\n", value)
    },
    func(err error) {
    fmt.Printf("Error: %v\n", err)
    },
    func() {
    fmt.Println("Sequence completed")
    },
    ))
    defer sub.Unsubscribe()

    // Received: 3.14
    // Received: 2.71
    // Sequence completed
    Similar:
    Prototype:
    func Dematerialize[T any]()