Skip to main content
Help improve this documentation

This documentation is still new and evolving. If you spot any mistakes, unclear explanations, or missing details, please open an issue.

Your feedback helps us improve!

Context control operators

This page lists all context operations available in the core package of ro.

  • ContextWithValue

    Adds a key-value pair to the context of each item in the observable sequence.

    obs := ro.Pipe(
    ro.Just("request1", "request2"),
    ro.ContextWithValue[string]("requestID", "req-123"),
    ro.Map(func(s string) string {
    return fmt.Sprintf("Processing %s", s)
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[string]())
    defer sub.Unsubscribe()

    // Each item now has context with requestID: "req-123"
    // Next: "Processing request1"
    // Next: "Processing request2"
    // Completed

    With context extraction in downstream operators

    obs := ro.Pipe(
    ro.Just("data1", "data2"),
    ro.ContextWithValue[string]("userID", 42),
    ro.Map(func(s string) string {
    return fmt.Sprintf("User data: %s", s)
    }),
    )

    // Extract context in subscription
    sub := obs.Subscribe(ro.NewObserver(
    func(value string) {
    userID := ctx.Value("userID")
    fmt.Printf("Next: %s (userID: %v)\n", value, userID)
    },
    func(err error) {
    fmt.Printf("Error: %v\n", err)
    },
    func() {
    fmt.Println("Completed")
    },
    ))
    defer sub.Unsubscribe()

    // Next: User data: data1 (userID: 42)
    // Next: User data: data2 (userID: 42)
    // Completed

    With multiple context values

    obs := ro.Pipe(
    ro.Just("task1", "task2"),
    ro.ContextWithValue[string]("traceID", "trace-abc"),
    ro.ContextWithValue[string]("sessionID", "session-xyz"),
    ro.Map(func(s string) string {
    return fmt.Sprintf("Executing %s", s)
    }),
    )

    sub := obs.Subscribe(ro.NewObserver(
    func(value string) {
    traceID := ctx.Value("traceID")
    sessionID := ctx.Value("sessionID")
    fmt.Printf("Next: %s (trace: %v, session: %v)\n", value, traceID, sessionID)
    },
    func(err error) {
    fmt.Printf("Error: %v\n", err)
    },
    func() {
    fmt.Println("Completed")
    },
    ))
    defer sub.Unsubscribe()

    // Next: Executing task1 (trace: trace-abc, session: session-xyz)
    // Next: Executing task2 (trace: trace-abc, session: session-xyz)
    // Completed

    With structured context values

    type RequestMetadata struct {
    TraceID string
    Timestamp time.Time
    UserID int
    }

    metadata := RequestMetadata{
    TraceID: "req-456",
    Timestamp: time.Now(),
    UserID: 789,
    }

    obs := ro.Pipe(
    ro.Just("api_call1", "api_call2"),
    ro.ContextWithValue[string]("metadata", metadata),
    ro.Map(func(s string) string {
    return fmt.Sprintf("API call: %s", s)
    }),
    )

    sub := obs.Subscribe(ro.NewObserver(
    func(value string) {
    if meta, ok := ctx.Value("metadata").(RequestMetadata); ok {
    fmt.Printf("Next: %s (user: %d, trace: %s)\n",
    value, meta.UserID, meta.TraceID)
    }
    },
    func(err error) {
    fmt.Printf("Error: %v\n", err)
    },
    func() {
    fmt.Println("Completed")
    },
    ))
    defer sub.Unsubscribe()

    // Next: API call: api_call1 (user: 789, trace: req-456)
    // Next: API call: api_call2 (user: 789, trace: req-456)
    // Completed

    With context-aware error handling

    obs := ro.Pipe(
    ro.Just("critical_op1", "critical_op2"),
    ro.ContextWithValue[string]("operationType", "high_priority"),
    ro.MapErr(func(s string) (string, error) {
    if s == "critical_op2" {
    return "", fmt.Errorf("failed operation: %s", s)
    }
    return s, nil
    }),
    ro.Catch(func(err error) Observable[string] {
    return ro.Just(fmt.Sprintf("Fallback for error: %v", err))
    }),
    )

    sub := obs.Subscribe(ro.NewObserver(
    func(value string) {
    opType := ctx.Value("operationType")
    fmt.Printf("Next: %s (type: %v)\n", value, opType)
    },
    func(err error) {
    fmt.Printf("Error: %v\n", err)
    },
    func() {
    fmt.Println("Completed")
    },
    ))
    defer sub.Unsubscribe()

    // Next: critical_op1 (type: high_priority)
    // Next: Fallback for error: failed operation: critical_op2 (type: high_priority)
    // Completed

    With nested context in async operations

    processAsync := func(ctx context.Context, item string) ro.Observable[string] {
    traceID := ctx.Value("traceID")
    return ro.Defer(func() ro.Observable[string] {
    time.Sleep(50 * time.Millisecond) // Simulate async work
    return ro.Just(fmt.Sprintf("Processed %s (trace: %v)", item, traceID))
    })
    }

    obs := ro.Pipe(
    ro.Just("item1", "item2"),
    ro.ContextWithValue[string]("traceID", "async-trace-789"),
    ro.MergeMap(func(item string) ro.Observable[string] {
    return ro.Defer(func() ro.Observable[string] {
    // Extract current context for async operation
    return processAsync(context.Background(), item)
    })
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[string]())
    time.Sleep(200 * time.Millisecond)
    sub.Unsubscribe()

    // Next: Processed item1 (trace: async-trace-789)
    // Next: Processed item2 (trace: async-trace-789)
    // Completed
    Prototype:
    func ContextWithValue[T any](k any, v any)
  • ContextWithTimeout

    Adds a timeout to the context of each item in the observable sequence. Should be chained with ThrowOnContextCancel to handle timeout errors.

    obs := ro.Pipe(
    ro.Just("slow_operation"),
    ro.ContextWithTimeout[string](100 * time.Millisecond),
    ro.ThrowOnContextCancel[string](),
    ro.Map(func(s string) string {
    time.Sleep(150 * time.Millisecond) // Simulate slow operation
    return fmt.Sprintf("Completed %s", s)
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[string]())
    defer sub.Unsubscribe()

    // Error: context deadline exceeded (operation took longer than 100ms timeout)

    With successful completion within timeout

    obs := ro.Pipe(
    ro.Just("fast_operation"),
    ro.ContextWithTimeout[string](200 * time.Millisecond),
    ro.ThrowOnContextCancel[string](),
    ro.Map(func(s string) string {
    time.Sleep(100 * time.Millisecond) // Completes within timeout
    return fmt.Sprintf("Success: %s", s)
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[string]())
    defer sub.Unsubscribe()

    // Next: "Success: fast_operation"
    // Completed

    With ContextWithTimeoutCause

    timeoutError := errors.New("operation timed out")
    obs := ro.Pipe(
    ro.Just("data_processing"),
    ro.ContextWithTimeoutCause[string](50 * time.Millisecond, timeoutError),
    ro.ThrowOnContextCancel[string](),
    ro.Map(func(s string) string {
    time.Sleep(100 * time.Millisecond) // Will timeout
    return fmt.Sprintf("Result: %s", s)
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[string]())
    defer sub.Unsubscribe()

    // Error: operation timed out (custom cause error)

    With multiple operations and timeout

    obs := ro.Pipe(
    ro.Just("op1", "op2", "op3"),
    ro.ContextWithTimeout[string](150 * time.Millisecond),
    ro.ThrowOnContextCancel[string](),
    ro.Map(func(s string) string {
    if s == "op2" {
    time.Sleep(200 * time.Millisecond) // This one will timeout
    }
    return fmt.Sprintf("Processed: %s", s)
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[string]())
    defer sub.Unsubscribe()

    // Next: "Processed: op1"
    // Error: context deadline exceeded (on op2)

    With retry mechanism after timeout

    obs := ro.Pipe(
    ro.Defer(func() Observable[string] {
    return ro.Pipe(
    ro.Just("retry_operation"),
    ro.ContextWithTimeout[string](100 * time.Millisecond),
    ro.ThrowOnContextCancel[string](),
    ro.Map(func(s string) string {
    time.Sleep(150 * time.Millisecond) // Always timeout
    return fmt.Sprintf("Success: %s", s)
    }),
    )
    }),
    ro.RetryWithConfig[string](ro.RetryConfig{
    MaxRetries: 3,
    Delay: 50 * time.Millisecond,
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[string]())
    time.Sleep(500 * time.Millisecond)
    sub.Unsubscribe()

    // Will retry up to 3 times with 50ms delays
    // Error: context deadline exceeded (if all retries timeout)

    With async operations

    processItem := func(item string) Observable[string] {
    return ro.Defer(func() Observable[string] {
    time.Sleep(80 * time.Millisecond) // Simulate async processing
    return ro.Just(fmt.Sprintf("Async result: %s", item))
    })
    }

    obs := ro.Pipe(
    ro.Just("item1", "item2"),
    ro.ContextWithTimeout[string](60 * time.Millisecond),
    ro.ThrowOnContextCancel[string](),
    ro.MergeMap(processItem),
    )

    sub := obs.Subscribe(ro.PrintObserver[string]())
    time.Sleep(200 * time.Millisecond)
    sub.Unsubscribe()

    // Error: context deadline exceeded (async processing takes longer than timeout)

    With different timeout values

    type Task struct {
    Name string
    Duration time.Duration
    }

    tasks := []Task{
    {"quick", 50 * time.Millisecond},
    {"medium", 100 * time.Millisecond},
    {"slow", 200 * time.Millisecond},
    }

    obs := ro.Pipe(
    ro.FromSlice(tasks),
    ro.ContextWithTimeout[Task](150 * time.Millisecond),
    ro.ThrowOnContextCancel[Task](),
    ro.Map(func(task Task) string {
    time.Sleep(task.Duration)
    return fmt.Sprintf("Task %s completed", task.Name)
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[string]())
    time.Sleep(500 * time.Millisecond)
    sub.Unsubscribe()

    // Next: "Task quick completed"
    // Next: "Task medium completed"
    // Error: context deadline exceeded (slow task times out)

    With context timeout handling

    obs := ro.Pipe(
    ro.Just("timed_operation"),
    ro.ContextWithTimeout[string](100 * time.Millisecond),
    ro.ThrowOnContextCancel[string](),
    ro.Catch(func(err error) Observable[string] {
    if errors.Is(err, context.DeadlineExceeded) {
    return ro.Just("Operation timed out - using fallback")
    }
    return ro.Throw[string](err)
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[string]())
    defer sub.Unsubscribe()

    // Next: "Operation timed out - using fallback"
    // Completed
    Prototypes:
    func ContextWithTimeout[T any](timeout time.Duration)
    func ContextWithTimeoutCause[T any](timeout time.Duration, cause error)
  • ContextWithDeadline

    Adds a deadline to the context of each item in the observable sequence. Should be chained with ThrowOnContextCancel to handle deadline errors.

    deadline := time.Now().Add(100 * time.Millisecond)
    obs := ro.Pipe(
    ro.Just("deadline_operation"),
    ro.ContextWithDeadline[string](deadline),
    ro.ThrowOnContextCancel[string](),
    ro.Map(func(s string) string {
    time.Sleep(150 * time.Millisecond) // Simulate slow operation
    return fmt.Sprintf("Completed %s", s)
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[string]())
    defer sub.Unsubscribe()

    // Error: context deadline exceeded (operation exceeded deadline)

    With successful completion before deadline

    deadline := time.Now().Add(200 * time.Millisecond)
    obs := ro.Pipe(
    ro.Just("fast_operation"),
    ro.ContextWithDeadline[string](deadline),
    ro.ThrowOnContextCancel[string](),
    ro.Map(func(s string) string {
    time.Sleep(100 * time.Millisecond) // Completes before deadline
    return fmt.Sprintf("Success: %s", s)
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[string]())
    defer sub.Unsubscribe()

    // Next: "Success: fast_operation"
    // Completed

    With ContextWithDeadlineCause

    deadline := time.Now().Add(50 * time.Millisecond)
    deadlineError := errors.New("processing deadline exceeded")
    obs := ro.Pipe(
    ro.Just("data_processing"),
    ro.ContextWithDeadlineCause[string](deadline, deadlineError),
    ro.ThrowOnContextCancel[string](),
    ro.Map(func(s string) string {
    time.Sleep(100 * time.Millisecond) // Will exceed deadline
    return fmt.Sprintf("Result: %s", s)
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[string]())
    defer sub.Unsubscribe()

    // Error: processing deadline exceeded (custom cause error)

    With future deadline

    // Set deadline for 5 seconds from now
    deadline := time.Now().Add(5 * time.Second)
    obs := ro.Pipe(
    ro.Just("long_operation"),
    ro.ContextWithDeadline[string](deadline),
    ro.ThrowOnContextCancel[string](),
    ro.Map(func(s string) string {
    time.Sleep(2 * time.Second) // Completes well before deadline
    return fmt.Sprintf("Long operation completed: %s", s)
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[string]())
    time.Sleep(2500 * time.Millisecond)
    sub.Unsubscribe()

    // Next: "Long operation completed: long_operation"
    // Completed

    With multiple operations and shared deadline

    deadline := time.Now().Add(200 * time.Millisecond)
    obs := ro.Pipe(
    ro.Just("op1", "op2", "op3"),
    ro.ContextWithDeadline[string](deadline),
    ro.ThrowOnContextCancel[string](),
    ro.Map(func(s string) string {
    if s == "op2" {
    time.Sleep(250 * time.Millisecond) // This will exceed deadline
    }
    return fmt.Sprintf("Processed: %s", s)
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[string]())
    defer sub.Unsubscribe()

    // Next: "Processed: op1"
    // Error: context deadline exceeded (on op2)

    With deadline in the past

    // Set deadline to past time (immediate timeout)
    deadline := time.Now().Add(-1 * time.Hour)
    obs := ro.Pipe(
    ro.Just("immediate_timeout"),
    ro.ContextWithDeadline[string](deadline),
    ro.ThrowOnContextCancel[string](),
    ro.Map(func(s string) string {
    return fmt.Sprintf("This won't execute: %s", s)
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[string]())
    defer sub.Unsubscribe()

    // Error: context deadline exceeded (deadline already passed)

    With deadline-based batch processing

    type BatchJob struct {
    ID string
    Work func() string
    }

    jobs := []BatchJob{
    {"job1", func() string { time.Sleep(50 * time.Millisecond); return "job1 result" }},
    {"job2", func() string { time.Sleep(150 * time.Millisecond); return "job2 result" }},
    {"job3", func() string { time.Sleep(200 * time.Millisecond); return "job3 result" }},
    }

    deadline := time.Now().Add(180 * time.Millisecond)
    obs := ro.Pipe(
    ro.FromSlice(jobs),
    ro.ContextWithDeadline[BatchJob](deadline),
    ro.ThrowOnContextCancel[BatchJob](),
    ro.Map(func(job BatchJob) string {
    return job.Work() // Execute the job
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[string]())
    time.Sleep(400 * time.Millisecond)
    sub.Unsubscribe()

    // Next: "job1 result"
    // Next: "job2 result"
    // Error: context deadline exceeded (job3 times out)

    With deadline retry mechanism

    attempt := 0
    obs := ro.Pipe(
    ro.Defer(func() Observable[string] {
    attempt++
    deadline := time.Now().Add(100 * time.Millisecond)
    return ro.Pipe(
    ro.Just(fmt.Sprintf("attempt_%d", attempt)),
    ro.ContextWithDeadline[string](deadline),
    ro.ThrowOnContextCancel[string](),
    ro.Map(func(s string) string {
    time.Sleep(120 * time.Millisecond) // Always exceed deadline
    return fmt.Sprintf("Success: %s", s)
    }),
    )
    }),
    ro.RetryWithConfig[string](ro.RetryConfig{
    MaxRetries: 2,
    Delay: 50 * time.Millisecond,
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[string]())
    time.Sleep(500 * time.Millisecond)
    sub.Unsubscribe()

    // Will retry twice (attempt_1 and attempt_2 both timeout)
    // Error: context deadline exceeded

    With graceful deadline handling

    deadline := time.Now().Add(100 * time.Millisecond)
    obs := ro.Pipe(
    ro.Just("graceful_operation"),
    ro.ContextWithDeadline[string](deadline),
    ro.ThrowOnContextCancel[string](),
    ro.Catch(func(err error) Observable[string] {
    if errors.Is(err, context.DeadlineExceeded) {
    return ro.Just("Operation cancelled due to deadline - data saved")
    }
    return ro.Throw[string](err)
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[string]())
    defer sub.Unsubscribe()

    // Next: "Operation cancelled due to deadline - data saved"
    // Completed
    Prototypes:
    func ContextWithDeadline[T any](deadline time.Time)
    func ContextWithDeadlineCause[T any](deadline time.Time, cause error)
  • ContextReset

    Replaces the context with a new one (or context.Background() if nil) for each item in the observable sequence.

    // First add some context values
    obs := ro.Pipe(
    ro.Just("data1", "data2"),
    ro.ContextWithValue[string]("oldKey", "oldValue"),
    ro.ContextReset[string](context.Background()), // Reset to empty context
    ro.Map(func(s string) string {
    return fmt.Sprintf("Processed: %s", s)
    }),
    )

    sub := obs.Subscribe(ro.NewObserver(
    func(value string) {
    oldValue := ctx.Value("oldKey")
    fmt.Printf("Next: %s (oldKey: %v)\n", value, oldValue)
    },
    func(err error) {
    fmt.Printf("Error: %v\n", err)
    },
    func() {
    fmt.Println("Completed")
    },
    ))
    defer sub.Unsubscribe()

    // Next: Processed: data1 (oldKey: <nil>)
    // Next: Processed: data2 (oldKey: <nil>)
    // Completed

    With custom new context

    newCtx := context.WithValue(context.Background(), "newKey", "newValue")

    obs := ro.Pipe(
    ro.Just("item1", "item2"),
    ro.ContextWithValue[string]("originalKey", "originalValue"),
    ro.ContextReset[string](newCtx),
    ro.Map(func(s string) string {
    return fmt.Sprintf("Item: %s", s)
    }),
    )

    sub := obs.Subscribe(ro.NewObserver(
    func(value string) {
    original := ctx.Value("originalKey")
    newKey := ctx.Value("newKey")
    fmt.Printf("Next: %s (original: %v, new: %v)\n", value, original, newKey)
    },
    func(err error) {
    fmt.Printf("Error: %v\n", err)
    },
    func() {
    fmt.Println("Completed")
    },
    ))
    defer sub.Unsubscribe()

    // Next: Item: item1 (original: <nil>, new: newValue)
    // Next: Item: item2 (original: <nil>, new: newValue)
    // Completed

    With context timeout reset

    // Original context with timeout
    timeoutCtx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
    defer cancel()

    // New context with longer timeout
    newTimeoutCtx, newCancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
    defer newCancel()

    obs := ro.Pipe(
    ro.Just("slow_operation"),
    ro.ContextReset[string](newTimeoutCtx),
    ro.ThrowOnContextCancel[string](),
    ro.Map(func(s string) string {
    time.Sleep(100 * time.Millisecond) // Would timeout with original context
    return fmt.Sprintf("Completed %s", s)
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[string]())
    defer sub.Unsubscribe()

    // Next: "Completed slow_operation"
    // Completed (succeeds because context was reset to longer timeout)

    With nil context (resets to Background)

    obs := ro.Pipe(
    ro.Just("reset_test"),
    ro.ContextWithValue[string]("someKey", "someValue"),
    ro.ContextReset[string](nil), // Resets to context.Background()
    ro.Map(func(s string) string {
    return fmt.Sprintf("Reset: %s", s)
    }),
    )

    sub := obs.Subscribe(ro.NewObserver(
    func(value string) {
    someKey := ctx.Value("someKey")
    fmt.Printf("Next: %s (someKey: %v)\n", value, someKey)
    },
    func(err error) {
    fmt.Printf("Error: %v\n", err)
    },
    func() {
    fmt.Println("Completed")
    },
    ))
    defer sub.Unsubscribe()

    // Next: Reset: reset_test (someKey: <nil>)
    // Completed

    With context isolation in async operations

    processAsync := func(item string) ro.Observable[string] {
    return ro.Defer(func() ro.Observable[string] {
    time.Sleep(50 * time.Millisecond)
    return ro.Just(fmt.Sprintf("Async: %s", item))
    })
    }

    obs := ro.Pipe(
    ro.Just("item1", "item2"),
    ro.ContextWithValue[string]("sharedID", "shared-123"),
    // Each item gets fresh context for async processing
    ro.ContextReset[string](context.Background()),
    ro.MergeMap(processAsync),
    )

    sub := obs.Subscribe(ro.NewObserver(
    func(value string) {
    sharedID := ctx.Value("sharedID")
    fmt.Printf("Next: %s (sharedID: %v)\n", value, sharedID)
    },
    func(err error) {
    fmt.Printf("Error: %v\n", err)
    },
    func() {
    fmt.Println("Completed")
    },
    ))
    time.Sleep(200 * time.Millisecond)
    sub.Unsubscribe()

    // Next: Async: item1 (sharedID: <nil>)
    // Next: Async: item2 (sharedID: <nil>)
    // Completed

    With multiple context transformations

    initialCtx := context.WithValue(context.Background(), "step1", "value1")
    step2Ctx := context.WithValue(context.Background(), "step2", "value2")
    step3Ctx := context.WithValue(context.Background(), "step3", "value3")

    obs := ro.Pipe(
    ro.Just("multi_context"),
    ro.ContextWithValue[string]("extra", "extra_value"),
    ro.ContextReset[string](initialCtx),
    ro.ContextWithValue[string]("added", "added_value"),
    ro.ContextReset[string](step2Ctx),
    ro.Map(func(s string) string {
    return fmt.Sprintf("Final: %s", s)
    }),
    )

    sub := obs.Subscribe(ro.NewObserver(
    func(value string) {
    step1 := ctx.Value("step1")
    step2 := ctx.Value("step2")
    step3 := ctx.Value("step3")
    extra := ctx.Value("extra")
    added := ctx.Value("added")
    fmt.Printf("Next: %s (step1: %v, step2: %v, step3: %v, extra: %v, added: %v)\n",
    value, step1, step2, step3, extra, added)
    },
    func(err error) {
    fmt.Printf("Error: %v\n", err)
    },
    func() {
    fmt.Println("Completed")
    },
    ))
    defer sub.Unsubscribe()

    // Next: Final: multi_context (step1: <nil>, step2: value2, step3: <nil>, extra: <nil>, added: <nil>)
    // Completed

    With context reset for error isolation

    obs := ro.Pipe(
    ro.Just("operation1", "operation2"),
    ro.ContextWithValue[string]("requestID", "req-abc"),
    ro.MapErr(func(s string) (string, error) {
    if s == "operation2" {
    return "", fmt.Errorf("error in %s", s)
    }
    return s, nil
    }),
    ro.Catch(func(err error) Observable[string] {
    // Reset context for fallback to avoid leaking sensitive data
    return ro.Pipe(
    ro.Just(fmt.Sprintf("Fallback: %v", err)),
    ro.ContextReset[string](context.Background()),
    )
    }),
    )

    sub := obs.Subscribe(ro.NewObserver(
    func(value string) {
    requestID := ctx.Value("requestID")
    fmt.Printf("Next: %s (requestID: %v)\n", value, requestID)
    },
    func(err error) {
    fmt.Printf("Error: %v\n", err)
    },
    func() {
    fmt.Println("Completed")
    },
    ))
    defer sub.Unsubscribe()

    // Next: operation1 (requestID: req-abc)
    // Next: Fallback: error in operation2 (requestID: <nil>)
    // Completed
    Prototype:
    func ContextReset[T any](newCtx context.Context)
  • ContextMap

    Transforms the context using a project function for each item in the observable sequence.

    obs := ro.Pipe(
    ro.Just("item1", "item2"),
    ro.ContextMap[string](func(ctx context.Context) context.Context {
    return context.WithValue(ctx, "processed", true)
    }),
    ro.Map(func(s string) string {
    return fmt.Sprintf("Processed: %s", s)
    }),
    )

    sub := obs.Subscribe(ro.NewObserver(
    func(value string) {
    processed := ctx.Value("processed")
    fmt.Printf("Next: %s (processed: %v)\n", value, processed)
    },
    func(err error) {
    fmt.Printf("Error: %v\n", err)
    },
    func() {
    fmt.Println("Completed")
    },
    ))
    defer sub.Unsubscribe()

    // Next: Processed: item1 (processed: true)
    // Next: Processed: item2 (processed: true)
    // Completed

    ContextMapI with index

    obs := ro.Pipe(
    ro.Just("a", "b", "c"),
    ro.ContextMapI[string](func(ctx context.Context, index int64) context.Context {
    return context.WithValue(ctx, "itemIndex", index)
    }),
    ro.Map(func(s string) string {
    return fmt.Sprintf("Item: %s", s)
    }),
    )

    sub := obs.Subscribe(ro.NewObserver(
    func(value string) {
    itemIndex := ctx.Value("itemIndex")
    fmt.Printf("Next: %s (index: %v)\n", value, itemIndex)
    },
    func(err error) {
    fmt.Printf("Error: %v\n", err)
    },
    func() {
    fmt.Println("Completed")
    },
    ))
    defer sub.Unsubscribe()

    // Next: Item: a (index: 0)
    // Next: Item: b (index: 1)
    // Next: Item: c (index: 2)
    // Completed

    With context transformation chain

    obs := ro.Pipe(
    ro.Just("data"),
    ro.ContextWithValue[string]("userID", 123),
    ro.ContextMap[string](func(ctx context.Context) context.Context {
    // Add timestamp to context
    return context.WithValue(ctx, "timestamp", time.Now())
    }),
    ro.ContextMap[string](func(ctx context.Context) context.Context {
    // Add request ID based on existing context
    userID := ctx.Value("userID")
    requestID := fmt.Sprintf("req-%v-%d", userID, time.Now().Unix())
    return context.WithValue(ctx, "requestID", requestID)
    }),
    ro.Map(func(s string) string {
    return fmt.Sprintf("Data: %s", s)
    }),
    )

    sub := obs.Subscribe(ro.NewObserver(
    func(value string) {
    userID := ctx.Value("userID")
    timestamp := ctx.Value("timestamp")
    requestID := ctx.Value("requestID")
    fmt.Printf("Next: %s (userID: %v, timestamp: %v, requestID: %v)\n",
    value, userID, timestamp, requestID)
    },
    func(err error) {
    fmt.Printf("Error: %v\n", err)
    },
    func() {
    fmt.Println("Completed")
    },
    ))
    defer sub.Unsubscribe()

    // Next: Data: data (userID: 123, timestamp: <current_time>, requestID: req-123-<timestamp>)
    // Completed

    With context-based routing

    obs := ro.Pipe(
    ro.Just("request1", "request2", "request3"),
    ro.ContextMapI[string](func(ctx context.Context, index int64) context.Context {
    // Route even and odd items to different contexts
    if index%2 == 0 {
    return context.WithValue(ctx, "route", "primary")
    }
    return context.WithValue(ctx, "route", "secondary")
    }),
    ro.Map(func(s string) string {
    return fmt.Sprintf("Processed: %s", s)
    }),
    )

    sub := obs.Subscribe(ro.NewObserver(
    func(value string) {
    route := ctx.Value("route")
    fmt.Printf("Next: %s (route: %v)\n", value, route)
    },
    func(err error) {
    fmt.Printf("Error: %v\n", err)
    },
    func() {
    fmt.Println("Completed")
    },
    ))
    defer sub.Unsubscribe()

    // Next: Processed: request1 (route: primary)
    // Next: Processed: request2 (route: secondary)
    // Next: Processed: request3 (route: primary)
    // Completed

    With context modification based on item content

    obs := ro.Pipe(
    ro.Just("urgent", "normal", "critical", "low"),
    ro.ContextMap[string](func(ctx context.Context) context.Context {
    // This would typically need access to the item value
    // For this example, we'll simulate context modification
    return context.WithValue(ctx, "processedAt", time.Now())
    }),
    ro.Map(func(s string) string {
    priority := "normal"
    if s == "urgent" || s == "critical" {
    priority = "high"
    }
    return fmt.Sprintf("%s (%s priority)", s, priority)
    }),
    )

    sub := obs.Subscribe(ro.NewObserver(
    func(value string) {
    processedAt := ctx.Value("processedAt")
    fmt.Printf("Next: %s (processedAt: %v)\n", value, processedAt)
    },
    func(err error) {
    fmt.Printf("Error: %v\n", err)
    },
    func() {
    fmt.Println("Completed")
    },
    ))
    defer sub.Unsubscribe()

    // Next: urgent (high priority) (processedAt: <timestamp>)
    // Next: normal (normal priority) (processedAt: <timestamp>)
    // Next: critical (high priority) (processedAt: <timestamp>)
    // Next: low (normal priority) (processedAt: <timestamp>)
    // Completed

    With context inheritance and modification

    baseCtx := context.WithValue(context.Background(), "sessionID", "session-abc")
    baseCtx = context.WithValue(baseCtx, "userID", 456)

    obs := ro.Pipe(
    ro.Just("operation1", "operation2"),
    ro.ContextReset[string](baseCtx),
    ro.ContextMap[string](func(ctx context.Context) context.Context {
    // Inherit from base context and add operation-specific data
    return context.WithValue(ctx, "operationID", fmt.Sprintf("op-%d", time.Now().UnixNano()))
    }),
    ro.ContextMapI[string](func(ctx context.Context, index int64) context.Context {
    // Add sequential information
    return context.WithValue(ctx, "sequence", index+1)
    }),
    ro.Map(func(s string) string {
    return fmt.Sprintf("Executed: %s", s)
    }),
    )

    sub := obs.Subscribe(ro.NewObserver(
    func(value string) {
    sessionID := ctx.Value("sessionID")
    userID := ctx.Value("userID")
    operationID := ctx.Value("operationID")
    sequence := ctx.Value("sequence")
    fmt.Printf("Next: %s (session: %v, user: %v, opID: %v, seq: %v)\n",
    value, sessionID, userID, operationID, sequence)
    },
    func(err error) {
    fmt.Printf("Error: %v\n", err)
    },
    func() {
    fmt.Println("Completed")
    },
    ))
    defer sub.Unsubscribe()

    // Next: Executed: operation1 (session: session-abc, user: 456, opID: op-<nanotime>, seq: 1)
    // Next: Executed: operation2 (session: session-abc, user: 456, opID: op-<nanotime>, seq: 2)
    // Completed

    With conditional context transformation

    obs := ro.Pipe(
    ro.Just("debug", "info", "error", "warning"),
    ro.ContextMapI[string](func(ctx context.Context, index int64) context.Context {
    // Transform context based on index
    ctx = context.WithValue(ctx, "index", index)
    if index >= 2 { // error and warning
    return context.WithValue(ctx, "severity", "high")
    }
    return context.WithValue(ctx, "severity", "low")
    }),
    ro.Map(func(s string) string {
    return fmt.Sprintf("Log: %s", s)
    }),
    )

    sub := obs.Subscribe(ro.NewObserver(
    func(ctx context.Context, value string) {
    index := ctx.Value("index")
    severity := ctx.Value("severity")
    fmt.Printf("Next: %s (index: %v, severity: %v)\n", value, index, severity)
    },
    func(err error) {
    fmt.Printf("Error: %v\n", err)
    },
    func() {
    fmt.Println("Completed")
    },
    ))
    defer sub.Unsubscribe()

    // Next: Log: debug (index: 0, severity: low)
    // Next: Log: info (index: 1, severity: low)
    // Next: Log: error (index: 2, severity: high)
    // Next: Log: warning (index: 3, severity: high)
    // Completed
    Variant: ContextMapI
    Prototypes:
    func ContextMap[T any](project func(ctx context.Context) context.Context)
    func ContextMapI[T any](project func(ctx context.Context, index int64) context.Context)
  • ThrowOnContextCancel

    Throws an error if the context is canceled. Should be chained after timeout/deadline operators to handle context cancellation.

    obs := ro.Pipe(
    ro.Just("timed_operation"),
    ro.ContextWithTimeout[string](100 * time.Millisecond),
    ro.ThrowOnContextCancel[string](),
    ro.Map(func(s string) string {
    time.Sleep(150 * time.Millisecond) // Will exceed timeout
    return fmt.Sprintf("Completed %s", s)
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[string]())
    defer sub.Unsubscribe()

    // Error: context deadline exceeded

    With successful completion

    obs := ro.Pipe(
    ro.Just("fast_operation"),
    ro.ContextWithTimeout[string](200 * time.Millisecond),
    ro.ThrowOnContextCancel[string](),
    ro.Map(func(s string) string {
    time.Sleep(100 * time.Millisecond) // Completes within timeout
    return fmt.Sprintf("Success: %s", s)
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[string]())
    defer sub.Unsubscribe()

    // Next: "Success: fast_operation"
    // Completed

    With manual context cancellation

    ctx, cancel := context.WithCancel(context.Background())

    obs := ro.Pipe(
    ro.Just("cancelable_operation"),
    ro.ContextReset[string](ctx),
    ro.ThrowOnContextCancel[string](),
    ro.Map(func(s string) string {
    time.Sleep(200 * time.Millisecond)
    return fmt.Sprintf("Result: %s", s)
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[string]())

    // Cancel after short delay
    go func() {
    time.Sleep(100 * time.Millisecond)
    cancel()
    }()

    time.Sleep(300 * time.Millisecond)
    sub.Unsubscribe()

    // Error: context canceled

    With deadline

    deadline := time.Now().Add(100 * time.Millisecond)
    obs := ro.Pipe(
    ro.Just("deadline_operation"),
    ro.ContextWithDeadline[string](deadline),
    ro.ThrowOnContextCancel[string](),
    ro.Map(func(s string) string {
    time.Sleep(150 * time.Millisecond) // Will exceed deadline
    return fmt.Sprintf("Deadline result: %s", s)
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[string]())
    defer sub.Unsubscribe()

    // Error: context deadline exceeded

    With retry on cancellation

    obs := ro.Pipe(
    ro.Defer(func() ro.Observable[string] {
    ctx, cancel := context.WithCancel(context.Background())

    // Auto-cancel after short delay
    go func() {
    time.Sleep(50 * time.Millisecond)
    cancel()
    }()

    return ro.Pipe(
    ro.Just("retryable_operation"),
    ro.ContextReset[string](ctx),
    ro.ThrowOnContextCancel[string](),
    ro.Map(func(s string) string {
    time.Sleep(100 * time.Millisecond) // Will be cancelled
    return fmt.Sprintf("Success: %s", s)
    }),
    )
    }),
    ro.RetryWithConfig[string](ro.RetryConfig{
    MaxRetries: 3,
    Delay: 100 * time.Millisecond,
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[string]())
    time.Sleep(500 * time.Millisecond)
    sub.Unsubscribe()

    // Retries up to 3 times when the context is canceled
    // If every attempt is canceled, the final error is: context canceled

    With graceful error handling

    obs := ro.Pipe(
    ro.Just("graceful_operation"),
    ro.ContextWithTimeout[string](100 * time.Millisecond),
    ro.ThrowOnContextCancel[string](),
    ro.Catch(func(err error) ro.Observable[string] {
    if errors.Is(err, context.Canceled) {
    return ro.Just("Operation was cancelled gracefully")
    }
    if errors.Is(err, context.DeadlineExceeded) {
    return ro.Just("Operation timed out - using cached result")
    }
    return ro.Throw[string](err)
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[string]())
    defer sub.Unsubscribe()

    // Next: "Operation timed out - using cached result"
    // Completed

    With async operations

    processAsync := func(item string) ro.Observable[string] {
    return ro.Defer(func() ro.Observable[string] {
    time.Sleep(150 * time.Millisecond) // Simulate slow async work
    return ro.Just(fmt.Sprintf("Async result: %s", item))
    })
    }

    obs := ro.Pipe(
    ro.Just("item1", "item2"),
    ro.ContextWithTimeout[string](100 * time.Millisecond),
    ro.ThrowOnContextCancel[string](),
    ro.MergeMap(processAsync),
    )

    sub := obs.Subscribe(ro.PrintObserver[string]())
    time.Sleep(300 * time.Millisecond)
    sub.Unsubscribe()

    // Error: context deadline exceeded (async operations take longer than timeout)

    With complex pipeline and multiple cancellation points

    obs := ro.Pipe(
    ro.Just("complex_pipeline"),
    ro.ContextWithValue[string]("requestID", "req-123"),
    ro.ContextWithTimeout[string](80 * time.Millisecond),
    ro.ThrowOnContextCancel[string](),
    ro.Map(func(s string) string {
    time.Sleep(50 * time.Millisecond)
    return fmt.Sprintf("Step1: %s", s)
    }),
    ro.Map(func(s string) string {
    time.Sleep(50 * time.Millisecond) // Will exceed timeout here
    return fmt.Sprintf("Step2: %s", s)
    }),
    ro.Map(func(s string) string {
    return fmt.Sprintf("Final: %s", s)
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[string]())
    defer sub.Unsubscribe()

    // Error: context deadline exceeded (pipeline canceled during the second Map operation)

    With context cancellation from parent

    parentCtx, parentCancel := context.WithCancel(context.Background())

    obs := ro.Pipe(
    ro.Just("child_operation"),
    ro.ContextReset[string](parentCtx),
    ro.ThrowOnContextCancel[string](),
    ro.Map(func(s string) string {
    time.Sleep(200 * time.Millisecond)
    return fmt.Sprintf("Child result: %s", s)
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[string]())

    // Cancel from parent context
    go func() {
    time.Sleep(100 * time.Millisecond)
    parentCancel()
    }()

    time.Sleep(300 * time.Millisecond)
    sub.Unsubscribe()

    // Error: context canceled (inherited from parent context)
    Prototype:
    func ThrowOnContextCancel[T any]()