This documentation is still new and evolving. If you spot any mistakes, unclear explanations, or missing details, please open an issue.
Your feedback helps us improve!
Context control operators
This page lists all context operations available in the core package of ro.
ContextWithValue
Adds a key-value pair to the context of each item in the observable sequence.
obs := ro.Pipe[string, string](ro.Just("request1", "request2"),ro.ContextWithValue[string]("requestID", "req-123"),ro.Map(func(s string) string {return fmt.Sprintf("Processing %s", s)}),)sub := obs.Subscribe(ro.PrintObserver[string]())defer sub.Unsubscribe()// Each item now has context with requestID: "req-123"// Next: "Processing request1"// Next: "Processing request2"// CompletedWith context extraction in downstream operators
obs := ro.Pipe[string, string](ro.Just("data1", "data2"),ro.ContextWithValue[string]("userID", 42),ro.Map(func(s string) string {return fmt.Sprintf("User data: %s", s)}),)// Extract context in subscriptionsub := obs.Subscribe(ro.NewObserver(func(value string) {userID := ctx.Value("userID")fmt.Printf("Next: %s (userID: %v)\n", value, userID)},func(err error) {fmt.Printf("Error: %v\n", err)},func() {fmt.Println("Completed")},))defer sub.Unsubscribe()// Next: User data: data1 (userID: 42)// Next: User data: data2 (userID: 42)// CompletedWith multiple context values
obs := ro.Pipe[string, string](ro.Just("task1", "task2"),ro.ContextWithValue[string]("traceID", "trace-abc"),ro.ContextWithValue[string]("sessionID", "session-xyz"),ro.Map(func(s string) string {return fmt.Sprintf("Executing %s", s)}),)sub := obs.Subscribe(ro.NewObserver(func(value string) {traceID := ctx.Value("traceID")sessionID := ctx.Value("sessionID")fmt.Printf("Next: %s (trace: %v, session: %v)\n", value, traceID, sessionID)},func(err error) {fmt.Printf("Error: %v\n", err)},func() {fmt.Println("Completed")},))defer sub.Unsubscribe()// Next: Executing task1 (trace: trace-abc, session: session-xyz)// Next: Executing task2 (trace: trace-abc, session: session-xyz)// CompletedWith structured context values
type RequestMetadata struct {TraceID stringTimestamp time.TimeUserID int}metadata := RequestMetadata{TraceID: "req-456",Timestamp: time.Now(),UserID: 789,}obs := ro.Pipe[string, string](ro.Just("api_call1", "api_call2"),ro.ContextWithValue[string]("metadata", metadata),ro.Map(func(s string) string {return fmt.Sprintf("API call: %s", s)}),)sub := obs.Subscribe(ro.NewObserver(func(value string) {if meta, ok := ctx.Value("metadata").(RequestMetadata); ok {fmt.Printf("Next: %s (user: %d, trace: %s)\n",value, meta.UserID, meta.TraceID)}},func(err error) {fmt.Printf("Error: %v\n", err)},func() {fmt.Println("Completed")},))defer sub.Unsubscribe()// Next: API call: api_call1 (user: 789, trace: req-456)// Next: API call: api_call2 (user: 789, trace: req-456)// CompletedWith context-aware error handling
obs := ro.Pipe[string, string](ro.Just("critical_op1", "critical_op2"),ro.ContextWithValue[string]("operationType", "high_priority"),ro.MapErr(func(s string) (string, error) {if s == "critical_op2" {return "", fmt.Errorf("failed operation: %s", s)}return s, nil}),ro.Catch(func(err error) Observable[string] {return ro.Just(fmt.Sprintf("Fallback for error: %v", err))}),)sub := obs.Subscribe(ro.NewObserver(func(value string) {opType := ctx.Value("operationType")fmt.Printf("Next: %s (type: %v)\n", value, opType)},func(err error) {fmt.Printf("Error: %v\n", err)},func() {fmt.Println("Completed")},))defer sub.Unsubscribe()// Next: critical_op1 (type: high_priority)// Next: Fallback for error: failed operation: critical_op2 (type: high_priority)// CompletedWith nested context in async operations
processAsync := func(ctx context.Context, item string) ro.Observable[string] {traceID := ctx.Value("traceID")return ro.Defer(func() ro.Observable[string] {time.Sleep(50 * time.Millisecond) // Simulate async workreturn ro.Just(fmt.Sprintf("Processed %s (trace: %v)", item, traceID))})}obs := ro.Pipe[string, string](ro.Just("item1", "item2"),ro.ContextWithValue[string]("traceID", "async-trace-789"),ro.MergeMap(func(item string) ro.Observable[string] {return ro.Defer(func() ro.Observable[string] {// Extract current context for async operationreturn processAsync(context.Background(), item)})}),)sub := obs.Subscribe(ro.PrintObserver[string]())time.Sleep(200 * time.Millisecond)sub.Unsubscribe()// Next: Processed item1 (trace: async-trace-789)// Next: Processed item2 (trace: async-trace-789)// Completed
Similar:
Prototype:
func ContextWithValue[T any](k any, v any)
ContextWithTimeout
Adds a timeout to the context of each item in the observable sequence. Should be chained with ThrowOnContextCancel to handle timeout errors.
obs := ro.Pipe[string, string](ro.Just("slow_operation"),ro.ContextWithTimeout[string](100 * time.Millisecond),ro.ThrowOnContextCancel[string](),ro.Map(func(s string) string {time.Sleep(150 * time.Millisecond) // Simulate slow operationreturn fmt.Sprintf("Completed %s", s)}),)sub := obs.Subscribe(ro.PrintObserver[string]())defer sub.Unsubscribe()// Error: context deadline exceeded (operation took longer than 100ms timeout)With successful completion within timeout
obs := ro.Pipe[string, string](ro.Just("fast_operation"),ro.ContextWithTimeout[string](200 * time.Millisecond),ro.ThrowOnContextCancel[string](),ro.Map(func(s string) string {time.Sleep(100 * time.Millisecond) // Completes within timeoutreturn fmt.Sprintf("Success: %s", s)}),)sub := obs.Subscribe(ro.PrintObserver[string]())defer sub.Unsubscribe()// Next: "Success: fast_operation"// CompletedWith ContextWithTimeoutCause
timeoutError := errors.New("operation timed out")obs := ro.Pipe[string, string](ro.Just("data_processing"),ro.ContextWithTimeoutCause[string](50 * time.Millisecond, timeoutError),ro.ThrowOnContextCancel[string](),ro.Map(func(s string) string {time.Sleep(100 * time.Millisecond) // Will timeoutreturn fmt.Sprintf("Result: %s", s)}),)sub := obs.Subscribe(ro.PrintObserver[string]())defer sub.Unsubscribe()// Error: operation timed out (custom cause error)With multiple operations and timeout
obs := ro.Pipe[string, string](ro.Just("op1", "op2", "op3"),ro.ContextWithTimeout[string](150 * time.Millisecond),ro.ThrowOnContextCancel[string](),ro.Map(func(s string) string {if s == "op2" {time.Sleep(200 * time.Millisecond) // This one will timeout}return fmt.Sprintf("Processed: %s", s)}),)sub := obs.Subscribe(ro.PrintObserver[string]())defer sub.Unsubscribe()// Next: "Processed: op1"// Error: context deadline exceeded (on op2)With retry mechanism after timeout
obs := ro.Pipe[string, string](ro.Defer(func() Observable[string] {return ro.Pipe[string, string](ro.Just("retry_operation"),ro.ContextWithTimeout[string](100 * time.Millisecond),ro.ThrowOnContextCancel[string](),ro.Map(func(s string) string {time.Sleep(150 * time.Millisecond) // Always timeoutreturn fmt.Sprintf("Success: %s", s)}),)}),ro.RetryWithConfig[string](ro.RetryConfig{MaxRetries: 3,Delay: 50 * time.Millisecond,}),)sub := obs.Subscribe(ro.PrintObserver[string]())time.Sleep(500 * time.Millisecond)sub.Unsubscribe()// Will retry up to 3 times with 50ms delays// Error: context deadline exceeded (if all retries timeout)With async operations
processItem := func(item string) Observable[string] {return ro.Defer(func() Observable[string] {time.Sleep(80 * time.Millisecond) // Simulate async processingreturn ro.Just(fmt.Sprintf("Async result: %s", item))})}obs := ro.Pipe[string, string](ro.Just("item1", "item2"),ro.ContextWithTimeout[string](60 * time.Millisecond),ro.ThrowOnContextCancel[string](),ro.MergeMap(processItem),)sub := obs.Subscribe(ro.PrintObserver[string]())time.Sleep(200 * time.Millisecond)sub.Unsubscribe()// Error: context deadline exceeded (async processing takes longer than timeout)With different timeout values
type Task struct {Name stringDuration time.Duration}tasks := []Task{{"quick", 50 * time.Millisecond},{"medium", 100 * time.Millisecond},{"slow", 200 * time.Millisecond},}obs := ro.Pipe[Task, string](ro.FromSlice(tasks),ro.ContextWithTimeout[Task](150 * time.Millisecond),ro.ThrowOnContextCancel[Task](),ro.Map(func(task Task) string {time.Sleep(task.Duration)return fmt.Sprintf("Task %s completed", task.Name)}),)sub := obs.Subscribe(ro.PrintObserver[string]())time.Sleep(500 * time.Millisecond)sub.Unsubscribe()// Next: "Task quick completed"// Next: "Task medium completed"// Error: context deadline exceeded (slow task times out)With context timeout handling
obs := ro.Pipe[string, string](ro.Just("timed_operation"),ro.ContextWithTimeout[string](100 * time.Millisecond),ro.ThrowOnContextCancel[string](),ro.Catch(func(err error) Observable[string] {if errors.Is(err, context.DeadlineExceeded) {return ro.Just("Operation timed out - using fallback")}return ro.Throw[string](err)}),)sub := obs.Subscribe(ro.PrintObserver[string]())defer sub.Unsubscribe()// Next: "Operation timed out - using fallback"// Completed
Variant:
Prototypes:
func ContextWithTimeout[T any](timeout time.Duration)
func ContextWithTimeoutCause[T any](timeout time.Duration, cause error)
ContextWithDeadline
Adds a deadline to the context of each item in the observable sequence. Should be chained with ThrowOnContextCancel to handle deadline errors.
deadline := time.Now().Add(100 * time.Millisecond)obs := ro.Pipe[string, string](ro.Just("deadline_operation"),ro.ContextWithDeadline[string](deadline),ro.ThrowOnContextCancel[string](),ro.Map(func(s string) string {time.Sleep(150 * time.Millisecond) // Simulate slow operationreturn fmt.Sprintf("Completed %s", s)}),)sub := obs.Subscribe(ro.PrintObserver[string]())defer sub.Unsubscribe()// Error: context deadline exceeded (operation exceeded deadline)With successful completion before deadline
deadline := time.Now().Add(200 * time.Millisecond)obs := ro.Pipe[string, string](ro.Just("fast_operation"),ro.ContextWithDeadline[string](deadline),ro.ThrowOnContextCancel[string](),ro.Map(func(s string) string {time.Sleep(100 * time.Millisecond) // Completes before deadlinereturn fmt.Sprintf("Success: %s", s)}),)sub := obs.Subscribe(ro.PrintObserver[string]())defer sub.Unsubscribe()// Next: "Success: fast_operation"// CompletedWith ContextWithDeadlineCause
deadline := time.Now().Add(50 * time.Millisecond)deadlineError := errors.New("processing deadline exceeded")obs := ro.Pipe[string, string](ro.Just("data_processing"),ro.ContextWithDeadlineCause[string](deadline, deadlineError),ro.ThrowOnContextCancel[string](),ro.Map(func(s string) string {time.Sleep(100 * time.Millisecond) // Will exceed deadlinereturn fmt.Sprintf("Result: %s", s)}),)sub := obs.Subscribe(ro.PrintObserver[string]())defer sub.Unsubscribe()// Error: processing deadline exceeded (custom cause error)With future deadline
// Set deadline for 5 seconds from nowdeadline := time.Now().Add(5 * time.Second)obs := ro.Pipe[string, string](ro.Just("long_operation"),ro.ContextWithDeadline[string](deadline),ro.ThrowOnContextCancel[string](),ro.Map(func(s string) string {time.Sleep(2 * time.Second) // Completes well before deadlinereturn fmt.Sprintf("Long operation completed: %s", s)}),)sub := obs.Subscribe(ro.PrintObserver[string]())time.Sleep(2500 * time.Millisecond)sub.Unsubscribe()// Next: "Long operation completed: long_operation"// CompletedWith multiple operations and shared deadline
deadline := time.Now().Add(200 * time.Millisecond)obs := ro.Pipe[string, string](ro.Just("op1", "op2", "op3"),ro.ContextWithDeadline[string](deadline),ro.ThrowOnContextCancel[string](),ro.Map(func(s string) string {if s == "op2" {time.Sleep(250 * time.Millisecond) // This will exceed deadline}return fmt.Sprintf("Processed: %s", s)}),)sub := obs.Subscribe(ro.PrintObserver[string]())defer sub.Unsubscribe()// Next: "Processed: op1"// Error: context deadline exceeded (on op2)With deadline in the past
// Set deadline to past time (immediate timeout)deadline := time.Now().Add(-1 * time.Hour)obs := ro.Pipe[string, string](ro.Just("immediate_timeout"),ro.ContextWithDeadline[string](deadline),ro.ThrowOnContextCancel[string](),ro.Map(func(s string) string {return fmt.Sprintf("This won't execute: %s", s)}),)sub := obs.Subscribe(ro.PrintObserver[string]())defer sub.Unsubscribe()// Error: context deadline exceeded (deadline already passed)With deadline-based batch processing
type BatchJob struct {ID stringWork func() string}jobs := []BatchJob{{"job1", func() string { time.Sleep(50 * time.Millisecond); return "job1 result" }},{"job2", func() string { time.Sleep(150 * time.Millisecond); return "job2 result" }},{"job3", func() string { time.Sleep(200 * time.Millisecond); return "job3 result" }},}deadline := time.Now().Add(180 * time.Millisecond)obs := ro.Pipe[BatchJob, string](ro.FromSlice(jobs),ro.ContextWithDeadline[BatchJob](deadline),ro.ThrowOnContextCancel[BatchJob](),ro.Map(func(job BatchJob) string {return job.Work() // Execute the job}),)sub := obs.Subscribe(ro.PrintObserver[string]())time.Sleep(400 * time.Millisecond)sub.Unsubscribe()// Next: "job1 result"// Next: "job2 result"// Error: context deadline exceeded (job3 times out)With deadline retry mechanism
attempt := 0obs := ro.Pipe[string, string](ro.Defer(func() Observable[string] {attempt++deadline := time.Now().Add(100 * time.Millisecond)return ro.Pipe[string, string](ro.Just(fmt.Sprintf("attempt_%d", attempt)),ro.ContextWithDeadline[string](deadline),ro.ThrowOnContextCancel[string](),ro.Map(func(s string) string {time.Sleep(120 * time.Millisecond) // Always exceed deadlinereturn fmt.Sprintf("Success: %s", s)}),)}),ro.RetryWithConfig[string](ro.RetryConfig{MaxRetries: 2,Delay: 50 * time.Millisecond,}),)sub := obs.Subscribe(ro.PrintObserver[string]())time.Sleep(500 * time.Millisecond)sub.Unsubscribe()// Will retry twice (attempt_1 and attempt_2 both timeout)// Error: context deadline exceededWith graceful deadline handling
deadline := time.Now().Add(100 * time.Millisecond)obs := ro.Pipe[string, string](ro.Just("graceful_operation"),ro.ContextWithDeadline[string](deadline),ro.ThrowOnContextCancel[string](),ro.Catch(func(err error) Observable[string] {if errors.Is(err, context.DeadlineExceeded) {return ro.Just("Operation cancelled due to deadline - data saved")}return ro.Throw[string](err)}),)sub := obs.Subscribe(ro.PrintObserver[string]())defer sub.Unsubscribe()// Next: "Operation cancelled due to deadline - data saved"// Completed
Variant:
Prototypes:
func ContextWithDeadline[T any](deadline time.Time)
func ContextWithDeadlineCause[T any](deadline time.Time, cause error)
ContextReset
Replaces the context with a new one (or context.Background() if nil) for each item in the observable sequence.
// First add some context valuesobs := ro.Pipe[string, string](ro.Just("data1", "data2"),ro.ContextWithValue[string]("oldKey", "oldValue"),ro.ContextReset[string](context.Background()), // Reset to empty contextro.Map(func(s string) string {return fmt.Sprintf("Processed: %s", s)}),)sub := obs.Subscribe(ro.NewObserver(func(value string) {oldValue := ctx.Value("oldKey")fmt.Printf("Next: %s (oldKey: %v)\n", value, oldValue)},func(err error) {fmt.Printf("Error: %v\n", err)},func() {fmt.Println("Completed")},))defer sub.Unsubscribe()// Next: Processed: data1 (oldKey: <nil>)// Next: Processed: data2 (oldKey: <nil>)// CompletedWith custom new context
newCtx := context.WithValue(context.Background(), "newKey", "newValue")obs := ro.Pipe[string, string](ro.Just("item1", "item2"),ro.ContextWithValue[string]("originalKey", "originalValue"),ro.ContextReset[string](newCtx),ro.Map(func(s string) string {return fmt.Sprintf("Item: %s", s)}),)sub := obs.Subscribe(ro.NewObserver(func(value string) {original := ctx.Value("originalKey")newKey := ctx.Value("newKey")fmt.Printf("Next: %s (original: %v, new: %v)\n", value, original, newKey)},func(err error) {fmt.Printf("Error: %v\n", err)},func() {fmt.Println("Completed")},))defer sub.Unsubscribe()// Next: Item: item1 (original: <nil>, new: newValue)// Next: Item: item2 (original: <nil>, new: newValue)// CompletedWith context timeout reset
// Original context with timeouttimeoutCtx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)defer cancel()// New context with longer timeoutnewTimeoutCtx, newCancel := context.WithTimeout(context.Background(), 200*time.Millisecond)defer newCancel()obs := ro.Pipe[string, string](ro.Just("slow_operation"),ro.ContextReset[string](newTimeoutCtx),ro.ThrowOnContextCancel[string](),ro.Map(func(s string) string {time.Sleep(100 * time.Millisecond) // Would timeout with original contextreturn fmt.Sprintf("Completed %s", s)}),)sub := obs.Subscribe(ro.PrintObserver[string]())defer sub.Unsubscribe()// Next: "Completed slow_operation"// Completed (succeeds because context was reset to longer timeout)
With nil context (resets to Background)
obs := ro.Pipe[string, string](ro.Just("reset_test"),ro.ContextWithValue[string]("someKey", "someValue"),ro.ContextReset[string](nil), // Resets to context.Background()ro.Map(func(s string) string {return fmt.Sprintf("Reset: %s", s)}),)sub := obs.Subscribe(ro.NewObserver(func(value string) {someKey := ctx.Value("someKey")fmt.Printf("Next: %s (someKey: %v)\n", value, someKey)},func(err error) {fmt.Printf("Error: %v\n", err)},func() {fmt.Println("Completed")},))defer sub.Unsubscribe()// Next: Reset: reset_test (someKey: <nil>)// CompletedWith context isolation in async operations
processAsync := func(item string) ro.Observable[string] {return ro.Defer(func() ro.Observable[string] {time.Sleep(50 * time.Millisecond)return ro.Just(fmt.Sprintf("Async: %s", item))})}obs := ro.Pipe[string, string](ro.Just("item1", "item2"),ro.ContextWithValue[string]("sharedID", "shared-123"),// Each item gets fresh context for async processingro.ContextReset[string](context.Background()),ro.MergeMap(processAsync),)sub := obs.Subscribe(ro.NewObserver(func(value string) {sharedID := ctx.Value("sharedID")fmt.Printf("Next: %s (sharedID: %v)\n", value, sharedID)},func(err error) {fmt.Printf("Error: %v\n", err)},func() {fmt.Println("Completed")},))time.Sleep(200 * time.Millisecond)sub.Unsubscribe()// Next: Async: item1 (sharedID: <nil>)// Next: Async: item2 (sharedID: <nil>)// CompletedWith multiple context transformations
initialCtx := context.WithValue(context.Background(), "step1", "value1")step2Ctx := context.WithValue(context.Background(), "step2", "value2")step3Ctx := context.WithValue(context.Background(), "step3", "value3")obs := ro.Pipe[string, string](ro.Just("multi_context"),ro.ContextWithValue[string]("extra", "extra_value"),ro.ContextReset[string](initialCtx),ro.ContextWithValue[string]("added", "added_value"),ro.ContextReset[string](step2Ctx),ro.Map(func(s string) string {return fmt.Sprintf("Final: %s", s)}),)sub := obs.Subscribe(ro.NewObserver(func(value string) {step1 := ctx.Value("step1")step2 := ctx.Value("step2")step3 := ctx.Value("step3")extra := ctx.Value("extra")added := ctx.Value("added")fmt.Printf("Next: %s (step1: %v, step2: %v, step3: %v, extra: %v, added: %v)\n",value, step1, step2, step3, extra, added)},func(err error) {fmt.Printf("Error: %v\n", err)},func() {fmt.Println("Completed")},))defer sub.Unsubscribe()// Next: Final: multi_context (step1: <nil>, step2: value2, step3: <nil>, extra: <nil>, added: <nil>)// CompletedWith context reset for error isolation
obs := ro.Pipe[string, string](ro.Just("operation1", "operation2"),ro.ContextWithValue[string]("requestID", "req-abc"),ro.MapErr(func(s string) (string, error) {if s == "operation2" {return "", fmt.Errorf("error in %s", s)}return s, nil}),ro.Catch(func(err error) Observable[string] {// Reset context for fallback to avoid leaking sensitive datareturn ro.Pipe[string, string](ro.Just(fmt.Sprintf("Fallback: %v", err)),ro.ContextReset[string](context.Background()),)}),)sub := obs.Subscribe(ro.NewObserver(func(value string) {requestID := ctx.Value("requestID")fmt.Printf("Next: %s (requestID: %v)\n", value, requestID)},func(err error) {fmt.Printf("Error: %v\n", err)},func() {fmt.Println("Completed")},))defer sub.Unsubscribe()// Next: operation1 (requestID: req-abc)// Next: Fallback: error in operation2 (requestID: <nil>)// Completed
Similar:
Prototype:
func ContextReset[T any](newCtx context.Context)
ContextMap
Transforms the context using a project function for each item in the observable sequence.
obs := ro.Pipe[string, string](ro.Just("item1", "item2"),ro.ContextMap[string](func(ctx context.Context) context.Context {return context.WithValue(ctx, "processed", true)}),ro.Map(func(s string) string {return fmt.Sprintf("Processed: %s", s)}),)sub := obs.Subscribe(ro.NewObserver(func(value string) {processed := ctx.Value("processed")fmt.Printf("Next: %s (processed: %v)\n", value, processed)},func(err error) {fmt.Printf("Error: %v\n", err)},func() {fmt.Println("Completed")},))defer sub.Unsubscribe()// Next: Processed: item1 (processed: true)// Next: Processed: item2 (processed: true)// CompletedContextMapI with index
obs := ro.Pipe[string, string](ro.Just("a", "b", "c"),ro.ContextMapI[string](func(ctx context.Context, index int64) context.Context {return context.WithValue(ctx, "itemIndex", index)}),ro.Map(func(s string) string {return fmt.Sprintf("Item: %s", s)}),)sub := obs.Subscribe(ro.NewObserver(func(value string) {itemIndex := ctx.Value("itemIndex")fmt.Printf("Next: %s (index: %v)\n", value, itemIndex)},func(err error) {fmt.Printf("Error: %v\n", err)},func() {fmt.Println("Completed")},))defer sub.Unsubscribe()// Next: Item: a (index: 0)// Next: Item: b (index: 1)// Next: Item: c (index: 2)// CompletedWith context transformation chain
obs := ro.Pipe[string, string](ro.Just("data"),ro.ContextWithValue[string]("userID", 123),ro.ContextMap[string](func(ctx context.Context) context.Context {// Add timestamp to contextreturn context.WithValue(ctx, "timestamp", time.Now())}),ro.ContextMap[string](func(ctx context.Context) context.Context {// Add request ID based on existing contextuserID := ctx.Value("userID")requestID := fmt.Sprintf("req-%v-%d", userID, time.Now().Unix())return context.WithValue(ctx, "requestID", requestID)}),ro.Map(func(s string) string {return fmt.Sprintf("Data: %s", s)}),)sub := obs.Subscribe(ro.NewObserver(func(value string) {userID := ctx.Value("userID")timestamp := ctx.Value("timestamp")requestID := ctx.Value("requestID")fmt.Printf("Next: %s (userID: %v, timestamp: %v, requestID: %v)\n",value, userID, timestamp, requestID)},func(err error) {fmt.Printf("Error: %v\n", err)},func() {fmt.Println("Completed")},))defer sub.Unsubscribe()// Next: Data: data (userID: 123, timestamp: <current_time>, requestID: req-123-<timestamp>)// CompletedWith context-based routing
obs := ro.Pipe[string, string](ro.Just("request1", "request2", "request3"),ro.ContextMapI[string](func(ctx context.Context, index int64) context.Context {// Route even and odd items to different contextsif index%2 == 0 {return context.WithValue(ctx, "route", "primary")}return context.WithValue(ctx, "route", "secondary")}),ro.Map(func(s string) string {return fmt.Sprintf("Processed: %s", s)}),)sub := obs.Subscribe(ro.NewObserver(func(value string) {route := ctx.Value("route")fmt.Printf("Next: %s (route: %v)\n", value, route)},func(err error) {fmt.Printf("Error: %v\n", err)},func() {fmt.Println("Completed")},))defer sub.Unsubscribe()// Next: Processed: request1 (route: primary)// Next: Processed: request2 (route: secondary)// Next: Processed: request3 (route: primary)// CompletedWith context modification based on item content
obs := ro.Pipe[string, string](ro.Just("urgent", "normal", "critical", "low"),ro.ContextMap[string](func(ctx context.Context) context.Context {// This would typically need access to the item value// For this example, we'll simulate context modificationreturn context.WithValue(ctx, "processedAt", time.Now())}),ro.Map(func(s string) string {priority := "normal"if s == "urgent" || s == "critical" {priority = "high"}return fmt.Sprintf("%s (%s priority)", s, priority)}),)sub := obs.Subscribe(ro.NewObserver(func(value string) {processedAt := ctx.Value("processedAt")fmt.Printf("Next: %s (processedAt: %v)\n", value, processedAt)},func(err error) {fmt.Printf("Error: %v\n", err)},func() {fmt.Println("Completed")},))defer sub.Unsubscribe()// Next: urgent (high priority) (processedAt: <timestamp>)// Next: normal (normal priority) (processedAt: <timestamp>)// Next: critical (high priority) (processedAt: <timestamp>)// Next: low (normal priority) (processedAt: <timestamp>)// CompletedWith context inheritance and modification
baseCtx := context.WithValue(context.Background(), "sessionID", "session-abc")baseCtx = context.WithValue(baseCtx, "userID", 456)obs := ro.Pipe[string, string](ro.Just("operation1", "operation2"),ro.ContextReset[string](baseCtx),ro.ContextMap[string](func(ctx context.Context) context.Context {// Inherit from base context and add operation-specific datareturn context.WithValue(ctx, "operationID", fmt.Sprintf("op-%d", time.Now().UnixNano()))}),ro.ContextMapI[string](func(ctx context.Context, index int64) context.Context {// Add sequential informationreturn context.WithValue(ctx, "sequence", index+1)}),ro.Map(func(s string) string {return fmt.Sprintf("Executed: %s", s)}),)sub := obs.Subscribe(ro.NewObserver(func(value string) {sessionID := ctx.Value("sessionID")userID := ctx.Value("userID")operationID := ctx.Value("operationID")sequence := ctx.Value("sequence")fmt.Printf("Next: %s (session: %v, user: %v, opID: %v, seq: %v)\n",value, sessionID, userID, operationID, sequence)},func(err error) {fmt.Printf("Error: %v\n", err)},func() {fmt.Println("Completed")},))defer sub.Unsubscribe()// Next: Executed: operation1 (session: session-abc, user: 456, opID: op-<nanotime>, seq: 1)// Next: Executed: operation2 (session: session-abc, user: 456, opID: op-<nanotime>, seq: 2)// CompletedWith conditional context transformation
obs := ro.Pipe[string, string](ro.Just("debug", "info", "error", "warning"),ro.ContextMapI[string](func(ctx context.Context, index int64) context.Context {// Transform context based on indexctx = context.WithValue(ctx, "index", index)if index >= 2 { // error and warningreturn context.WithValue(ctx, "severity", "high")}return context.WithValue(ctx, "severity", "low")}),ro.Map(func(s string) string {return fmt.Sprintf("Log: %s", s)}),)sub := obs.Subscribe(ro.NewObserver(func(ctx context.Context, value string) {index := ctx.Value("index")severity := ctx.Value("severity")fmt.Printf("Next: %s (index: %v, severity: %v)\n", value, index, severity)},func(err error) {fmt.Printf("Error: %v\n", err)},func() {fmt.Println("Completed")},))defer sub.Unsubscribe()// Next: Log: debug (index: 0, severity: low)// Next: Log: info (index: 1, severity: low)// Next: Log: error (index: 2, severity: high)// Next: Log: warning (index: 3, severity: high)// Completed
Variant:
Similar:
Prototypes:
func ContextMap[T any](project func(ctx context.Context) context.Context)
func ContextMapI[T any](project func(ctx context.Context, index int64) context.Context)
ThrowOnContextCancel
Throws an error if the context is canceled. Should be chained after timeout/deadline operators to handle context cancellation.
obs := ro.Pipe[string, string](ro.Just("timed_operation"),ro.ContextWithTimeout[string](100 * time.Millisecond),ro.ThrowOnContextCancel[string](),ro.Map(func(s string) string {time.Sleep(150 * time.Millisecond) // Will exceed timeoutreturn fmt.Sprintf("Completed %s", s)}),)sub := obs.Subscribe(ro.PrintObserver[string]())defer sub.Unsubscribe()// Error: context deadline exceededWith successful completion
obs := ro.Pipe[string, string](ro.Just("fast_operation"),ro.ContextWithTimeout[string](200 * time.Millisecond),ro.ThrowOnContextCancel[string](),ro.Map(func(s string) string {time.Sleep(100 * time.Millisecond) // Completes within timeoutreturn fmt.Sprintf("Success: %s", s)}),)sub := obs.Subscribe(ro.PrintObserver[string]())defer sub.Unsubscribe()// Next: "Success: fast_operation"// CompletedWith manual context cancellation
ctx, cancel := context.WithCancel(context.Background())obs := ro.Pipe[string, string](ro.Just("cancelable_operation"),ro.ContextReset[string](ctx),ro.ThrowOnContextCancel[string](),ro.Map(func(s string) string {time.Sleep(200 * time.Millisecond)return fmt.Sprintf("Result: %s", s)}),)sub := obs.Subscribe(ro.PrintObserver[string]())// Cancel after short delaygo func() {time.Sleep(100 * time.Millisecond)cancel()}()time.Sleep(300 * time.Millisecond)sub.Unsubscribe()// Error: context canceledWith deadline
deadline := time.Now().Add(100 * time.Millisecond)obs := ro.Pipe[string, string](ro.Just("deadline_operation"),ro.ContextWithDeadline[string](deadline),ro.ThrowOnContextCancel[string](),ro.Map(func(s string) string {time.Sleep(150 * time.Millisecond) // Will exceed deadlinereturn fmt.Sprintf("Deadline result: %s", s)}),)sub := obs.Subscribe(ro.PrintObserver[string]())defer sub.Unsubscribe()// Error: context deadline exceededWith retry on cancellation
obs := ro.Pipe[string, string](ro.Defer(func() Observable[string] {ctx, cancel := context.WithCancel(context.Background())// Auto-cancel after short delaygo func() {time.Sleep(50 * time.Millisecond)cancel()}()return ro.Pipe[string, string](ro.Just("retryable_operation"),ro.ContextReset[string](ctx),ro.ThrowOnContextCancel[string](),ro.Map(func(s string) string {time.Sleep(100 * time.Millisecond) // Will be cancelledreturn fmt.Sprintf("Success: %s", s)}),)}),ro.RetryWithConfig[string](RetryConfig{MaxRetries: 3,Delay: 100 * time.Millisecond,}),)sub := obs.Subscribe(ro.PrintObserver[string]())time.Sleep(500 * time.Millisecond)sub.Unsubscribe()// Will retry up to 3 times when context is cancelled// If all attempts are cancelled, final error will be context canceledWith graceful error handling
obs := ro.Pipe[string, string](ro.Just("graceful_operation"),ro.ContextWithTimeout[string](100 * time.Millisecond),ro.ThrowOnContextCancel[string](),ro.Catch(func(err error) Observable[string] {if errors.Is(err, context.Canceled) {return ro.Just("Operation was cancelled gracefully")}if errors.Is(err, context.DeadlineExceeded) {return ro.Just("Operation timed out - using cached result")}return ro.Throw[string](err)}),)sub := obs.Subscribe(ro.PrintObserver[string]())defer sub.Unsubscribe()// Next: "Operation timed out - using cached result"// CompletedWith async operations
processAsync := func(item string) Observable[string] {return ro.Defer(func() Observable[string] {time.Sleep(150 * time.Millisecond) // Simulate slow async workreturn ro.Just(fmt.Sprintf("Async result: %s", item))})}obs := ro.Pipe[string, string](ro.Just("item1", "item2"),ro.ContextWithTimeout[string](100 * time.Millisecond),ro.ThrowOnContextCancel[string](),ro.MergeMap(processAsync),)sub := obs.Subscribe(ro.PrintObserver[string]())time.Sleep(300 * time.Millisecond)sub.Unsubscribe()// Error: context deadline exceeded (async operations take longer than timeout)With complex pipeline and multiple cancellation points
obs := ro.Pipe[string, string](ro.Just("complex_pipeline"),ro.ContextWithValue[string]("requestID", "req-123"),ro.ContextWithTimeout[string](80 * time.Millisecond),ro.ThrowOnContextCancel[string](),ro.Map(func(s string) string {time.Sleep(50 * time.Millisecond)return fmt.Sprintf("Step1: %s", s)}),ro.Map(func(s string) string {time.Sleep(50 * time.Millisecond) // Will exceed timeout herereturn fmt.Sprintf("Step2: %s", s)}),ro.Map(func(s string) string {return fmt.Sprintf("Final: %s", s)}),)sub := obs.Subscribe(ro.PrintObserver[string]())defer sub.Unsubscribe()// Error: context deadline exceeded (pipeline cancelled during second map operation)With context cancellation from parent
parentCtx, parentCancel := context.WithCancel(context.Background())obs := ro.Pipe[string, string](ro.Just("child_operation"),ro.ContextReset[string](parentCtx),ro.ThrowOnContextCancel[string](),ro.Map(func(s string) string {time.Sleep(200 * time.Millisecond)return fmt.Sprintf("Child result: %s", s)}),)sub := obs.Subscribe(ro.PrintObserver[string]())// Cancel from parent contextgo func() {time.Sleep(100 * time.Millisecond)parentCancel()}()time.Sleep(300 * time.Millisecond)sub.Unsubscribe()// Error: context canceled (inherited from parent context)Prototype:func ThrowOnContextCancel[T any]()