This documentation is still new and evolving. If you spot any mistakes, unclear explanations, or missing details, please open an issue.
Your feedback helps us improve!
Context control operators
This page lists all context operations, available in the core package of ro.
ContextWithValue
Adds a key-value pair to the context of each item in the observable sequence.
obs := ro.Pipe(
ro.Just("request1", "request2"),
ro.ContextWithValue[string]("requestID", "req-123"),
ro.Map(func(s string) string {
return fmt.Sprintf("Processing %s", s)
}),
)
sub := obs.Subscribe(ro.PrintObserver[string]())
defer sub.Unsubscribe()
// Each item now has context with requestID: "req-123"
// Next: "Processing request1"
// Next: "Processing request2"
// Completed

With context extraction in downstream operators
obs := ro.Pipe(
ro.Just("data1", "data2"),
ro.ContextWithValue[string]("userID", 42),
ro.Map(func(s string) string {
return fmt.Sprintf("User data: %s", s)
}),
)
// Extract context in subscription
sub := obs.Subscribe(ro.NewObserver(
func(value string) {
userID := ctx.Value("userID")
fmt.Printf("Next: %s (userID: %v)\n", value, userID)
},
func(err error) {
fmt.Printf("Error: %v\n", err)
},
func() {
fmt.Println("Completed")
},
))
defer sub.Unsubscribe()
// Next: User data: data1 (userID: 42)
// Next: User data: data2 (userID: 42)
// Completed

With multiple context values
obs := ro.Pipe(
ro.Just("task1", "task2"),
ro.ContextWithValue[string]("traceID", "trace-abc"),
ro.ContextWithValue[string]("sessionID", "session-xyz"),
ro.Map(func(s string) string {
return fmt.Sprintf("Executing %s", s)
}),
)
sub := obs.Subscribe(ro.NewObserver(
func(value string) {
traceID := ctx.Value("traceID")
sessionID := ctx.Value("sessionID")
fmt.Printf("Next: %s (trace: %v, session: %v)\n", value, traceID, sessionID)
},
func(err error) {
fmt.Printf("Error: %v\n", err)
},
func() {
fmt.Println("Completed")
},
))
defer sub.Unsubscribe()
// Next: Executing task1 (trace: trace-abc, session: session-xyz)
// Next: Executing task2 (trace: trace-abc, session: session-xyz)
// Completed

With structured context values
type RequestMetadata struct {
TraceID string
Timestamp time.Time
UserID int
}
metadata := RequestMetadata{
TraceID: "req-456",
Timestamp: time.Now(),
UserID: 789,
}
obs := ro.Pipe(
ro.Just("api_call1", "api_call2"),
ro.ContextWithValue[string]("metadata", metadata),
ro.Map(func(s string) string {
return fmt.Sprintf("API call: %s", s)
}),
)
sub := obs.Subscribe(ro.NewObserver(
func(value string) {
if meta, ok := ctx.Value("metadata").(RequestMetadata); ok {
fmt.Printf("Next: %s (user: %d, trace: %s)\n",
value, meta.UserID, meta.TraceID)
}
},
func(err error) {
fmt.Printf("Error: %v\n", err)
},
func() {
fmt.Println("Completed")
},
))
defer sub.Unsubscribe()
// Next: API call: api_call1 (user: 789, trace: req-456)
// Next: API call: api_call2 (user: 789, trace: req-456)
// Completed

With context-aware error handling
obs := ro.Pipe(
ro.Just("critical_op1", "critical_op2"),
ro.ContextWithValue[string]("operationType", "high_priority"),
ro.MapErr(func(s string) (string, error) {
if s == "critical_op2" {
return "", fmt.Errorf("failed operation: %s", s)
}
return s, nil
}),
ro.Catch(func(err error) Observable[string] {
return ro.Just(fmt.Sprintf("Fallback for error: %v", err))
}),
)
sub := obs.Subscribe(ro.NewObserver(
func(value string) {
opType := ctx.Value("operationType")
fmt.Printf("Next: %s (type: %v)\n", value, opType)
},
func(err error) {
fmt.Printf("Error: %v\n", err)
},
func() {
fmt.Println("Completed")
},
))
defer sub.Unsubscribe()
// Next: critical_op1 (type: high_priority)
// Next: Fallback for error: failed operation: critical_op2 (type: high_priority)
// Completed

With nested context in async operations
processAsync := func(ctx context.Context, item string) ro.Observable[string] {
traceID := ctx.Value("traceID")
return ro.Defer(func() ro.Observable[string] {
time.Sleep(50 * time.Millisecond) // Simulate async work
return ro.Just(fmt.Sprintf("Processed %s (trace: %v)", item, traceID))
})
}
obs := ro.Pipe(
ro.Just("item1", "item2"),
ro.ContextWithValue[string]("traceID", "async-trace-789"),
ro.MergeMap(func(item string) ro.Observable[string] {
return ro.Defer(func() ro.Observable[string] {
// Extract current context for async operation
return processAsync(context.Background(), item)
})
}),
)
sub := obs.Subscribe(ro.PrintObserver[string]())
time.Sleep(200 * time.Millisecond)
sub.Unsubscribe()
// Next: Processed item1 (trace: async-trace-789)
// Next: Processed item2 (trace: async-trace-789)
// Completed

Similar:
Prototype:
func ContextWithValue[T any](k any, v any)
ContextWithTimeout
Adds a timeout to the context of each item in the observable sequence. Should be chained with ThrowOnContextCancel to handle timeout errors.
obs := ro.Pipe(
ro.Just("slow_operation"),
ro.ContextWithTimeout[string](100 * time.Millisecond),
ro.ThrowOnContextCancel[string](),
ro.Map(func(s string) string {
time.Sleep(150 * time.Millisecond) // Simulate slow operation
return fmt.Sprintf("Completed %s", s)
}),
)
sub := obs.Subscribe(ro.PrintObserver[string]())
defer sub.Unsubscribe()
// Error: context deadline exceeded (operation took longer than 100ms timeout)

With successful completion within timeout
obs := ro.Pipe(
ro.Just("fast_operation"),
ro.ContextWithTimeout[string](200 * time.Millisecond),
ro.ThrowOnContextCancel[string](),
ro.Map(func(s string) string {
time.Sleep(100 * time.Millisecond) // Completes within timeout
return fmt.Sprintf("Success: %s", s)
}),
)
sub := obs.Subscribe(ro.PrintObserver[string]())
defer sub.Unsubscribe()
// Next: "Success: fast_operation"
// Completed

With ContextWithTimeoutCause
timeoutError := errors.New("operation timed out")
obs := ro.Pipe(
ro.Just("data_processing"),
ro.ContextWithTimeoutCause[string](50 * time.Millisecond, timeoutError),
ro.ThrowOnContextCancel[string](),
ro.Map(func(s string) string {
time.Sleep(100 * time.Millisecond) // Will timeout
return fmt.Sprintf("Result: %s", s)
}),
)
sub := obs.Subscribe(ro.PrintObserver[string]())
defer sub.Unsubscribe()
// Error: operation timed out (custom cause error)

With multiple operations and timeout
obs := ro.Pipe(
ro.Just("op1", "op2", "op3"),
ro.ContextWithTimeout[string](150 * time.Millisecond),
ro.ThrowOnContextCancel[string](),
ro.Map(func(s string) string {
if s == "op2" {
time.Sleep(200 * time.Millisecond) // This one will timeout
}
return fmt.Sprintf("Processed: %s", s)
}),
)
sub := obs.Subscribe(ro.PrintObserver[string]())
defer sub.Unsubscribe()
// Next: "Processed: op1"
// Error: context deadline exceeded (on op2)

With retry mechanism after timeout
obs := ro.Pipe(
ro.Defer(func() Observable[string] {
return ro.Pipe(
ro.Just("retry_operation"),
ro.ContextWithTimeout[string](100 * time.Millisecond),
ro.ThrowOnContextCancel[string](),
ro.Map(func(s string) string {
time.Sleep(150 * time.Millisecond) // Always timeout
return fmt.Sprintf("Success: %s", s)
}),
)
}),
ro.RetryWithConfig[string](ro.RetryConfig{
MaxRetries: 3,
Delay: 50 * time.Millisecond,
}),
)
sub := obs.Subscribe(ro.PrintObserver[string]())
time.Sleep(500 * time.Millisecond)
sub.Unsubscribe()
// Will retry up to 3 times with 50ms delays
// Error: context deadline exceeded (if all retries timeout)

With async operations
processItem := func(item string) Observable[string] {
return ro.Defer(func() Observable[string] {
time.Sleep(80 * time.Millisecond) // Simulate async processing
return ro.Just(fmt.Sprintf("Async result: %s", item))
})
}
obs := ro.Pipe(
ro.Just("item1", "item2"),
ro.ContextWithTimeout[string](60 * time.Millisecond),
ro.ThrowOnContextCancel[string](),
ro.MergeMap(processItem),
)
sub := obs.Subscribe(ro.PrintObserver[string]())
time.Sleep(200 * time.Millisecond)
sub.Unsubscribe()
// Error: context deadline exceeded (async processing takes longer than timeout)

With different timeout values
type Task struct {
Name string
Duration time.Duration
}
tasks := []Task{
{"quick", 50 * time.Millisecond},
{"medium", 100 * time.Millisecond},
{"slow", 200 * time.Millisecond},
}
obs := ro.Pipe(
ro.FromSlice(tasks),
ro.ContextWithTimeout[Task](150 * time.Millisecond),
ro.ThrowOnContextCancel[Task](),
ro.Map(func(task Task) string {
time.Sleep(task.Duration)
return fmt.Sprintf("Task %s completed", task.Name)
}),
)
sub := obs.Subscribe(ro.PrintObserver[string]())
time.Sleep(500 * time.Millisecond)
sub.Unsubscribe()
// Next: "Task quick completed"
// Next: "Task medium completed"
// Error: context deadline exceeded (slow task times out)

With context timeout handling
obs := ro.Pipe(
ro.Just("timed_operation"),
ro.ContextWithTimeout[string](100 * time.Millisecond),
ro.ThrowOnContextCancel[string](),
ro.Catch(func(err error) Observable[string] {
if errors.Is(err, context.DeadlineExceeded) {
return ro.Just("Operation timed out - using fallback")
}
return ro.Throw[string](err)
}),
)
sub := obs.Subscribe(ro.PrintObserver[string]())
defer sub.Unsubscribe()
// Next: "Operation timed out - using fallback"
// Completed

Variant:
Prototypes:
func ContextWithTimeout[T any](timeout time.Duration)
func ContextWithTimeoutCause[T any](timeout time.Duration, cause error)

ContextWithDeadline
Adds a deadline to the context of each item in the observable sequence. Should be chained with ThrowOnContextCancel to handle deadline errors.
deadline := time.Now().Add(100 * time.Millisecond)
obs := ro.Pipe(
ro.Just("deadline_operation"),
ro.ContextWithDeadline[string](deadline),
ro.ThrowOnContextCancel[string](),
ro.Map(func(s string) string {
time.Sleep(150 * time.Millisecond) // Simulate slow operation
return fmt.Sprintf("Completed %s", s)
}),
)
sub := obs.Subscribe(ro.PrintObserver[string]())
defer sub.Unsubscribe()
// Error: context deadline exceeded (operation exceeded deadline)

With successful completion before deadline
deadline := time.Now().Add(200 * time.Millisecond)
obs := ro.Pipe(
ro.Just("fast_operation"),
ro.ContextWithDeadline[string](deadline),
ro.ThrowOnContextCancel[string](),
ro.Map(func(s string) string {
time.Sleep(100 * time.Millisecond) // Completes before deadline
return fmt.Sprintf("Success: %s", s)
}),
)
sub := obs.Subscribe(ro.PrintObserver[string]())
defer sub.Unsubscribe()
// Next: "Success: fast_operation"
// Completed

With ContextWithDeadlineCause
deadline := time.Now().Add(50 * time.Millisecond)
deadlineError := errors.New("processing deadline exceeded")
obs := ro.Pipe(
ro.Just("data_processing"),
ro.ContextWithDeadlineCause[string](deadline, deadlineError),
ro.ThrowOnContextCancel[string](),
ro.Map(func(s string) string {
time.Sleep(100 * time.Millisecond) // Will exceed deadline
return fmt.Sprintf("Result: %s", s)
}),
)
sub := obs.Subscribe(ro.PrintObserver[string]())
defer sub.Unsubscribe()
// Error: processing deadline exceeded (custom cause error)

With future deadline
// Set deadline for 5 seconds from now
deadline := time.Now().Add(5 * time.Second)
obs := ro.Pipe(
ro.Just("long_operation"),
ro.ContextWithDeadline[string](deadline),
ro.ThrowOnContextCancel[string](),
ro.Map(func(s string) string {
time.Sleep(2 * time.Second) // Completes well before deadline
return fmt.Sprintf("Long operation completed: %s", s)
}),
)
sub := obs.Subscribe(ro.PrintObserver[string]())
time.Sleep(2500 * time.Millisecond)
sub.Unsubscribe()
// Next: "Long operation completed: long_operation"
// Completed

With multiple operations and shared deadline
deadline := time.Now().Add(200 * time.Millisecond)
obs := ro.Pipe(
ro.Just("op1", "op2", "op3"),
ro.ContextWithDeadline[string](deadline),
ro.ThrowOnContextCancel[string](),
ro.Map(func(s string) string {
if s == "op2" {
time.Sleep(250 * time.Millisecond) // This will exceed deadline
}
return fmt.Sprintf("Processed: %s", s)
}),
)
sub := obs.Subscribe(ro.PrintObserver[string]())
defer sub.Unsubscribe()
// Next: "Processed: op1"
// Error: context deadline exceeded (on op2)

With deadline in the past
// Set deadline to past time (immediate timeout)
deadline := time.Now().Add(-1 * time.Hour)
obs := ro.Pipe(
ro.Just("immediate_timeout"),
ro.ContextWithDeadline[string](deadline),
ro.ThrowOnContextCancel[string](),
ro.Map(func(s string) string {
return fmt.Sprintf("This won't execute: %s", s)
}),
)
sub := obs.Subscribe(ro.PrintObserver[string]())
defer sub.Unsubscribe()
// Error: context deadline exceeded (deadline already passed)

With deadline-based batch processing
type BatchJob struct {
ID string
Work func() string
}
jobs := []BatchJob{
{"job1", func() string { time.Sleep(50 * time.Millisecond); return "job1 result" }},
{"job2", func() string { time.Sleep(150 * time.Millisecond); return "job2 result" }},
{"job3", func() string { time.Sleep(200 * time.Millisecond); return "job3 result" }},
}
deadline := time.Now().Add(180 * time.Millisecond)
obs := ro.Pipe(
ro.FromSlice(jobs),
ro.ContextWithDeadline[BatchJob](deadline),
ro.ThrowOnContextCancel[BatchJob](),
ro.Map(func(job BatchJob) string {
return job.Work() // Execute the job
}),
)
sub := obs.Subscribe(ro.PrintObserver[string]())
time.Sleep(400 * time.Millisecond)
sub.Unsubscribe()
// Next: "job1 result"
// Next: "job2 result"
// Error: context deadline exceeded (job3 times out)

With deadline retry mechanism
attempt := 0
obs := ro.Pipe(
ro.Defer(func() Observable[string] {
attempt++
deadline := time.Now().Add(100 * time.Millisecond)
return ro.Pipe(
ro.Just(fmt.Sprintf("attempt_%d", attempt)),
ro.ContextWithDeadline[string](deadline),
ro.ThrowOnContextCancel[string](),
ro.Map(func(s string) string {
time.Sleep(120 * time.Millisecond) // Always exceed deadline
return fmt.Sprintf("Success: %s", s)
}),
)
}),
ro.RetryWithConfig[string](ro.RetryConfig{
MaxRetries: 2,
Delay: 50 * time.Millisecond,
}),
)
sub := obs.Subscribe(ro.PrintObserver[string]())
time.Sleep(500 * time.Millisecond)
sub.Unsubscribe()
// Will retry twice (attempt_1 and attempt_2 both timeout)
// Error: context deadline exceeded

With graceful deadline handling
deadline := time.Now().Add(100 * time.Millisecond)
obs := ro.Pipe(
ro.Just("graceful_operation"),
ro.ContextWithDeadline[string](deadline),
ro.ThrowOnContextCancel[string](),
ro.Catch(func(err error) Observable[string] {
if errors.Is(err, context.DeadlineExceeded) {
return ro.Just("Operation cancelled due to deadline - data saved")
}
return ro.Throw[string](err)
}),
)
sub := obs.Subscribe(ro.PrintObserver[string]())
defer sub.Unsubscribe()
// Next: "Operation cancelled due to deadline - data saved"
// Completed

Variant:
Prototypes:
func ContextWithDeadline[T any](deadline time.Time)
func ContextWithDeadlineCause[T any](deadline time.Time, cause error)

ContextReset
Replaces the context with a new one (or context.Background() if nil) for each item in the observable sequence.
// First add some context values
obs := ro.Pipe(
ro.Just("data1", "data2"),
ro.ContextWithValue[string]("oldKey", "oldValue"),
ro.ContextReset[string](context.Background()), // Reset to empty context
ro.Map(func(s string) string {
return fmt.Sprintf("Processed: %s", s)
}),
)
sub := obs.Subscribe(ro.NewObserver(
func(value string) {
oldValue := ctx.Value("oldKey")
fmt.Printf("Next: %s (oldKey: %v)\n", value, oldValue)
},
func(err error) {
fmt.Printf("Error: %v\n", err)
},
func() {
fmt.Println("Completed")
},
))
defer sub.Unsubscribe()
// Next: Processed: data1 (oldKey: <nil>)
// Next: Processed: data2 (oldKey: <nil>)
// Completed

With custom new context
newCtx := context.WithValue(context.Background(), "newKey", "newValue")
obs := ro.Pipe(
ro.Just("item1", "item2"),
ro.ContextWithValue[string]("originalKey", "originalValue"),
ro.ContextReset[string](newCtx),
ro.Map(func(s string) string {
return fmt.Sprintf("Item: %s", s)
}),
)
sub := obs.Subscribe(ro.NewObserver(
func(value string) {
original := ctx.Value("originalKey")
newKey := ctx.Value("newKey")
fmt.Printf("Next: %s (original: %v, new: %v)\n", value, original, newKey)
},
func(err error) {
fmt.Printf("Error: %v\n", err)
},
func() {
fmt.Println("Completed")
},
))
defer sub.Unsubscribe()
// Next: Item: item1 (original: <nil>, new: newValue)
// Next: Item: item2 (original: <nil>, new: newValue)
// Completed

With context timeout reset
// Original context with timeout
timeoutCtx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
defer cancel()
// New context with longer timeout
newTimeoutCtx, newCancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
defer newCancel()
obs := ro.Pipe(
ro.Just("slow_operation"),
ro.ContextReset[string](newTimeoutCtx),
ro.ThrowOnContextCancel[string](),
ro.Map(func(s string) string {
time.Sleep(100 * time.Millisecond) // Would timeout with original context
return fmt.Sprintf("Completed %s", s)
}),
)
sub := obs.Subscribe(ro.PrintObserver[string]())
defer sub.Unsubscribe()
// Next: "Completed slow_operation"
// Completed (succeeds because context was reset to longer timeout)

With nil context (resets to Background)
obs := ro.Pipe(
ro.Just("reset_test"),
ro.ContextWithValue[string]("someKey", "someValue"),
ro.ContextReset[string](nil), // Resets to context.Background()
ro.Map(func(s string) string {
return fmt.Sprintf("Reset: %s", s)
}),
)
sub := obs.Subscribe(ro.NewObserver(
func(value string) {
someKey := ctx.Value("someKey")
fmt.Printf("Next: %s (someKey: %v)\n", value, someKey)
},
func(err error) {
fmt.Printf("Error: %v\n", err)
},
func() {
fmt.Println("Completed")
},
))
defer sub.Unsubscribe()
// Next: Reset: reset_test (someKey: <nil>)
// Completed

With context isolation in async operations
processAsync := func(item string) ro.Observable[string] {
return ro.Defer(func() ro.Observable[string] {
time.Sleep(50 * time.Millisecond)
return ro.Just(fmt.Sprintf("Async: %s", item))
})
}
obs := ro.Pipe(
ro.Just("item1", "item2"),
ro.ContextWithValue[string]("sharedID", "shared-123"),
// Each item gets fresh context for async processing
ro.ContextReset[string](context.Background()),
ro.MergeMap(processAsync),
)
sub := obs.Subscribe(ro.NewObserver(
func(value string) {
sharedID := ctx.Value("sharedID")
fmt.Printf("Next: %s (sharedID: %v)\n", value, sharedID)
},
func(err error) {
fmt.Printf("Error: %v\n", err)
},
func() {
fmt.Println("Completed")
},
))
time.Sleep(200 * time.Millisecond)
sub.Unsubscribe()
// Next: Async: item1 (sharedID: <nil>)
// Next: Async: item2 (sharedID: <nil>)
// Completed

With multiple context transformations
initialCtx := context.WithValue(context.Background(), "step1", "value1")
step2Ctx := context.WithValue(context.Background(), "step2", "value2")
step3Ctx := context.WithValue(context.Background(), "step3", "value3")
obs := ro.Pipe(
ro.Just("multi_context"),
ro.ContextWithValue[string]("extra", "extra_value"),
ro.ContextReset[string](initialCtx),
ro.ContextWithValue[string]("added", "added_value"),
ro.ContextReset[string](step2Ctx),
ro.Map(func(s string) string {
return fmt.Sprintf("Final: %s", s)
}),
)
sub := obs.Subscribe(ro.NewObserver(
func(value string) {
step1 := ctx.Value("step1")
step2 := ctx.Value("step2")
step3 := ctx.Value("step3")
extra := ctx.Value("extra")
added := ctx.Value("added")
fmt.Printf("Next: %s (step1: %v, step2: %v, step3: %v, extra: %v, added: %v)\n",
value, step1, step2, step3, extra, added)
},
func(err error) {
fmt.Printf("Error: %v\n", err)
},
func() {
fmt.Println("Completed")
},
))
defer sub.Unsubscribe()
// Next: Final: multi_context (step1: <nil>, step2: value2, step3: <nil>, extra: <nil>, added: <nil>)
// Completed

With context reset for error isolation
obs := ro.Pipe(
ro.Just("operation1", "operation2"),
ro.ContextWithValue[string]("requestID", "req-abc"),
ro.MapErr(func(s string) (string, error) {
if s == "operation2" {
return "", fmt.Errorf("error in %s", s)
}
return s, nil
}),
ro.Catch(func(err error) Observable[string] {
// Reset context for fallback to avoid leaking sensitive data
return ro.Pipe(
ro.Just(fmt.Sprintf("Fallback: %v", err)),
ro.ContextReset[string](context.Background()),
)
}),
)
sub := obs.Subscribe(ro.NewObserver(
func(value string) {
requestID := ctx.Value("requestID")
fmt.Printf("Next: %s (requestID: %v)\n", value, requestID)
},
func(err error) {
fmt.Printf("Error: %v\n", err)
},
func() {
fmt.Println("Completed")
},
))
defer sub.Unsubscribe()
// Next: operation1 (requestID: req-abc)
// Next: Fallback: error in operation2 (requestID: <nil>)
// Completed

Similar:
Prototype:
func ContextReset[T any](newCtx context.Context)
ContextMap
Transforms the context using a project function for each item in the observable sequence.
obs := ro.Pipe(
ro.Just("item1", "item2"),
ro.ContextMap[string](func(ctx context.Context) context.Context {
return context.WithValue(ctx, "processed", true)
}),
ro.Map(func(s string) string {
return fmt.Sprintf("Processed: %s", s)
}),
)
sub := obs.Subscribe(ro.NewObserver(
func(value string) {
processed := ctx.Value("processed")
fmt.Printf("Next: %s (processed: %v)\n", value, processed)
},
func(err error) {
fmt.Printf("Error: %v\n", err)
},
func() {
fmt.Println("Completed")
},
))
defer sub.Unsubscribe()
// Next: Processed: item1 (processed: true)
// Next: Processed: item2 (processed: true)
// Completed

ContextMapI with index
obs := ro.Pipe(
ro.Just("a", "b", "c"),
ro.ContextMapI[string](func(ctx context.Context, index int64) context.Context {
return context.WithValue(ctx, "itemIndex", index)
}),
ro.Map(func(s string) string {
return fmt.Sprintf("Item: %s", s)
}),
)
sub := obs.Subscribe(ro.NewObserver(
func(value string) {
itemIndex := ctx.Value("itemIndex")
fmt.Printf("Next: %s (index: %v)\n", value, itemIndex)
},
func(err error) {
fmt.Printf("Error: %v\n", err)
},
func() {
fmt.Println("Completed")
},
))
defer sub.Unsubscribe()
// Next: Item: a (index: 0)
// Next: Item: b (index: 1)
// Next: Item: c (index: 2)
// Completed

With context transformation chain
obs := ro.Pipe(
ro.Just("data"),
ro.ContextWithValue[string]("userID", 123),
ro.ContextMap[string](func(ctx context.Context) context.Context {
// Add timestamp to context
return context.WithValue(ctx, "timestamp", time.Now())
}),
ro.ContextMap[string](func(ctx context.Context) context.Context {
// Add request ID based on existing context
userID := ctx.Value("userID")
requestID := fmt.Sprintf("req-%v-%d", userID, time.Now().Unix())
return context.WithValue(ctx, "requestID", requestID)
}),
ro.Map(func(s string) string {
return fmt.Sprintf("Data: %s", s)
}),
)
sub := obs.Subscribe(ro.NewObserver(
func(value string) {
userID := ctx.Value("userID")
timestamp := ctx.Value("timestamp")
requestID := ctx.Value("requestID")
fmt.Printf("Next: %s (userID: %v, timestamp: %v, requestID: %v)\n",
value, userID, timestamp, requestID)
},
func(err error) {
fmt.Printf("Error: %v\n", err)
},
func() {
fmt.Println("Completed")
},
))
defer sub.Unsubscribe()
// Next: Data: data (userID: 123, timestamp: <current_time>, requestID: req-123-<timestamp>)
// Completed

With context-based routing
obs := ro.Pipe(
ro.Just("request1", "request2", "request3"),
ro.ContextMapI[string](func(ctx context.Context, index int64) context.Context {
// Route even and odd items to different contexts
if index%2 == 0 {
return context.WithValue(ctx, "route", "primary")
}
return context.WithValue(ctx, "route", "secondary")
}),
ro.Map(func(s string) string {
return fmt.Sprintf("Processed: %s", s)
}),
)
sub := obs.Subscribe(ro.NewObserver(
func(value string) {
route := ctx.Value("route")
fmt.Printf("Next: %s (route: %v)\n", value, route)
},
func(err error) {
fmt.Printf("Error: %v\n", err)
},
func() {
fmt.Println("Completed")
},
))
defer sub.Unsubscribe()
// Next: Processed: request1 (route: primary)
// Next: Processed: request2 (route: secondary)
// Next: Processed: request3 (route: primary)
// Completed

With context modification based on item content
obs := ro.Pipe(
ro.Just("urgent", "normal", "critical", "low"),
ro.ContextMap[string](func(ctx context.Context) context.Context {
// This would typically need access to the item value
// For this example, we'll simulate context modification
return context.WithValue(ctx, "processedAt", time.Now())
}),
ro.Map(func(s string) string {
priority := "normal"
if s == "urgent" || s == "critical" {
priority = "high"
}
return fmt.Sprintf("%s (%s priority)", s, priority)
}),
)
sub := obs.Subscribe(ro.NewObserver(
func(value string) {
processedAt := ctx.Value("processedAt")
fmt.Printf("Next: %s (processedAt: %v)\n", value, processedAt)
},
func(err error) {
fmt.Printf("Error: %v\n", err)
},
func() {
fmt.Println("Completed")
},
))
defer sub.Unsubscribe()
// Next: urgent (high priority) (processedAt: <timestamp>)
// Next: normal (normal priority) (processedAt: <timestamp>)
// Next: critical (high priority) (processedAt: <timestamp>)
// Next: low (normal priority) (processedAt: <timestamp>)
// Completed

With context inheritance and modification
baseCtx := context.WithValue(context.Background(), "sessionID", "session-abc")
baseCtx = context.WithValue(baseCtx, "userID", 456)
obs := ro.Pipe(
ro.Just("operation1", "operation2"),
ro.ContextReset[string](baseCtx),
ro.ContextMap[string](func(ctx context.Context) context.Context {
// Inherit from base context and add operation-specific data
return context.WithValue(ctx, "operationID", fmt.Sprintf("op-%d", time.Now().UnixNano()))
}),
ro.ContextMapI[string](func(ctx context.Context, index int64) context.Context {
// Add sequential information
return context.WithValue(ctx, "sequence", index+1)
}),
ro.Map(func(s string) string {
return fmt.Sprintf("Executed: %s", s)
}),
)
sub := obs.Subscribe(ro.NewObserver(
func(value string) {
sessionID := ctx.Value("sessionID")
userID := ctx.Value("userID")
operationID := ctx.Value("operationID")
sequence := ctx.Value("sequence")
fmt.Printf("Next: %s (session: %v, user: %v, opID: %v, seq: %v)\n",
value, sessionID, userID, operationID, sequence)
},
func(err error) {
fmt.Printf("Error: %v\n", err)
},
func() {
fmt.Println("Completed")
},
))
defer sub.Unsubscribe()
// Next: Executed: operation1 (session: session-abc, user: 456, opID: op-<nanotime>, seq: 1)
// Next: Executed: operation2 (session: session-abc, user: 456, opID: op-<nanotime>, seq: 2)
// Completed

With conditional context transformation
obs := ro.Pipe(
ro.Just("debug", "info", "error", "warning"),
ro.ContextMapI[string](func(ctx context.Context, index int64) context.Context {
// Transform context based on index
ctx = context.WithValue(ctx, "index", index)
if index >= 2 { // error and warning
return context.WithValue(ctx, "severity", "high")
}
return context.WithValue(ctx, "severity", "low")
}),
ro.Map(func(s string) string {
return fmt.Sprintf("Log: %s", s)
}),
)
sub := obs.Subscribe(ro.NewObserver(
func(ctx context.Context, value string) {
index := ctx.Value("index")
severity := ctx.Value("severity")
fmt.Printf("Next: %s (index: %v, severity: %v)\n", value, index, severity)
},
func(err error) {
fmt.Printf("Error: %v\n", err)
},
func() {
fmt.Println("Completed")
},
))
defer sub.Unsubscribe()
// Next: Log: debug (index: 0, severity: low)
// Next: Log: info (index: 1, severity: low)
// Next: Log: error (index: 2, severity: high)
// Next: Log: warning (index: 3, severity: high)
// Completed

Variant:
Similar:
Prototypes:
func ContextMap[T any](project func(ctx context.Context) context.Context)
func ContextMapI[T any](project func(ctx context.Context, index int64) context.Context)

ThrowOnContextCancel
Throws an error if the context is canceled. Should be chained after timeout/deadline operators to handle context cancellation.
obs := ro.Pipe(
ro.Just("timed_operation"),
ro.ContextWithTimeout[string](100 * time.Millisecond),
ro.ThrowOnContextCancel[string](),
ro.Map(func(s string) string {
time.Sleep(150 * time.Millisecond) // Will exceed timeout
return fmt.Sprintf("Completed %s", s)
}),
)
sub := obs.Subscribe(ro.PrintObserver[string]())
defer sub.Unsubscribe()
// Error: context deadline exceeded

With successful completion
obs := ro.Pipe(
ro.Just("fast_operation"),
ro.ContextWithTimeout[string](200 * time.Millisecond),
ro.ThrowOnContextCancel[string](),
ro.Map(func(s string) string {
time.Sleep(100 * time.Millisecond) // Completes within timeout
return fmt.Sprintf("Success: %s", s)
}),
)
sub := obs.Subscribe(ro.PrintObserver[string]())
defer sub.Unsubscribe()
// Next: "Success: fast_operation"
// Completed

With manual context cancellation
ctx, cancel := context.WithCancel(context.Background())
obs := ro.Pipe(
ro.Just("cancelable_operation"),
ro.ContextReset[string](ctx),
ro.ThrowOnContextCancel[string](),
ro.Map(func(s string) string {
time.Sleep(200 * time.Millisecond)
return fmt.Sprintf("Result: %s", s)
}),
)
sub := obs.Subscribe(ro.PrintObserver[string]())
// Cancel after short delay
go func() {
time.Sleep(100 * time.Millisecond)
cancel()
}()
time.Sleep(300 * time.Millisecond)
sub.Unsubscribe()
// Error: context canceled

With deadline
deadline := time.Now().Add(100 * time.Millisecond)
obs := ro.Pipe(
ro.Just("deadline_operation"),
ro.ContextWithDeadline[string](deadline),
ro.ThrowOnContextCancel[string](),
ro.Map(func(s string) string {
time.Sleep(150 * time.Millisecond) // Will exceed deadline
return fmt.Sprintf("Deadline result: %s", s)
}),
)
sub := obs.Subscribe(ro.PrintObserver[string]())
defer sub.Unsubscribe()
// Error: context deadline exceeded

With retry on cancellation
obs := ro.Pipe(
ro.Defer(func() Observable[string] {
ctx, cancel := context.WithCancel(context.Background())
// Auto-cancel after short delay
go func() {
time.Sleep(50 * time.Millisecond)
cancel()
}()
return ro.Pipe(
ro.Just("retryable_operation"),
ro.ContextReset[string](ctx),
ro.ThrowOnContextCancel[string](),
ro.Map(func(s string) string {
time.Sleep(100 * time.Millisecond) // Will be cancelled
return fmt.Sprintf("Success: %s", s)
}),
)
}),
ro.RetryWithConfig[string](ro.RetryConfig{
MaxRetries: 3,
Delay: 100 * time.Millisecond,
}),
)
sub := obs.Subscribe(ro.PrintObserver[string]())
time.Sleep(500 * time.Millisecond)
sub.Unsubscribe()
// Will retry up to 3 times when context is cancelled
// If all attempts are cancelled, final error will be context canceled

With graceful error handling
obs := ro.Pipe(
ro.Just("graceful_operation"),
ro.ContextWithTimeout[string](100 * time.Millisecond),
ro.ThrowOnContextCancel[string](),
ro.Catch(func(err error) Observable[string] {
if errors.Is(err, context.Canceled) {
return ro.Just("Operation was cancelled gracefully")
}
if errors.Is(err, context.DeadlineExceeded) {
return ro.Just("Operation timed out - using cached result")
}
return ro.Throw[string](err)
}),
)
sub := obs.Subscribe(ro.PrintObserver[string]())
defer sub.Unsubscribe()
// Next: "Operation timed out - using cached result"
// Completed

With async operations
processAsync := func(item string) Observable[string] {
return ro.Defer(func() Observable[string] {
time.Sleep(150 * time.Millisecond) // Simulate slow async work
return ro.Just(fmt.Sprintf("Async result: %s", item))
})
}
obs := ro.Pipe(
ro.Just("item1", "item2"),
ro.ContextWithTimeout[string](100 * time.Millisecond),
ro.ThrowOnContextCancel[string](),
ro.MergeMap(processAsync),
)
sub := obs.Subscribe(ro.PrintObserver[string]())
time.Sleep(300 * time.Millisecond)
sub.Unsubscribe()
// Error: context deadline exceeded (async operations take longer than timeout)

With complex pipeline and multiple cancellation points
obs := ro.Pipe(
ro.Just("complex_pipeline"),
ro.ContextWithValue[string]("requestID", "req-123"),
ro.ContextWithTimeout[string](80 * time.Millisecond),
ro.ThrowOnContextCancel[string](),
ro.Map(func(s string) string {
time.Sleep(50 * time.Millisecond)
return fmt.Sprintf("Step1: %s", s)
}),
ro.Map(func(s string) string {
time.Sleep(50 * time.Millisecond) // Will exceed timeout here
return fmt.Sprintf("Step2: %s", s)
}),
ro.Map(func(s string) string {
return fmt.Sprintf("Final: %s", s)
}),
)
sub := obs.Subscribe(ro.PrintObserver[string]())
defer sub.Unsubscribe()
// Error: context deadline exceeded (pipeline cancelled during second map operation)

With context cancellation from parent
parentCtx, parentCancel := context.WithCancel(context.Background())
obs := ro.Pipe(
ro.Just("child_operation"),
ro.ContextReset[string](parentCtx),
ro.ThrowOnContextCancel[string](),
ro.Map(func(s string) string {
time.Sleep(200 * time.Millisecond)
return fmt.Sprintf("Child result: %s", s)
}),
)
sub := obs.Subscribe(ro.PrintObserver[string]())
// Cancel from parent context
go func() {
time.Sleep(100 * time.Millisecond)
parentCancel()
}()
time.Sleep(300 * time.Millisecond)
sub.Unsubscribe()
// Error: context canceled (inherited from parent context)

Prototype:
func ThrowOnContextCancel[T any]()