Skip to main content

๐Ÿ‘๏ธ Observer

An Observer is the consumer side of reactive programming in samber/ro. It receives notifications from Observable through three essential methods: Next, Error, and Complete. Observer are the destination for values emitted by Observable.

What is an Observer?โ€‹

An Observer is:

  • A consumer of values: It receives values emitted by Observables
  • Notification handler: It processes Next, Error, and Complete notifications
  • Stateful: It tracks whether it's active, completed, or errored
  • Thread-safe: Multiple goroutines can safely call Observer methods

Observer Interfaceโ€‹

The Observer interface defines three core methods Next, Error, and Complete, with XxxxWithContext variants.

type Observer[T any] interface {
// Next receives the next value from the Observable
Next(value T)
NextWithContext(ctx context.Context, value T)

// Error receives an error notification (terminal)
Error(err error)
ErrorWithContext(ctx context.Context, err error)

// Complete receives a completion notification (terminal)
Complete()
CompleteWithContext(ctx context.Context)

// State checking methods
IsClosed() bool
HasThrown() bool
IsCompleted() bool
}

Creating Observersโ€‹

Complete Observerโ€‹

Always use complete observers in production code to handle errors and completion signals properly.

// Create a full Observer with all callbacks
observer := ro.NewObserver(
func(value int) {
fmt.Println("Received:", value)
},
func(err error) {
fmt.Println("Error:", err)
},
func() {
fmt.Println("Completed")
},
)

// Use with an Observable
observable := ro.Just(1, 2, 3)
observable.Subscribe(observer)
// Output:
// Received: 1
// Received: 2
// Received: 3
// Completed

Context-aware Observerโ€‹

Context-aware Observer

Use context-aware observers when you need timeout control or cancellation. The context is passed through the entire pipeline and can be used to stop processing.

// Create an Observer with context support
observer := ro.NewObserverWithContext(
func(ctx context.Context, value int) {
fmt.Printf("Received %d with context\n", value)
},
func(ctx context.Context, err error) {
fmt.Printf("Error %v with context\n", err)
},
func(ctx context.Context) {
fmt.Println("Completed with context")
},
)

observable.SubscribeWithContext(context.Background(), observer)

Partial Observersโ€‹

Next-only Observerโ€‹

Use ro.OnNext() when you only need to handle values and can ignore errors and completion signals for simple use cases.

// Handle only Next values, ignore errors and completion
observer := ro.OnNext(func(value string) {
fmt.Println("Got:", strings.ToUpper(value))
})

ro.Just("hello", "world").Subscribe(observer)
// Output: GOT: HELLO, GOT: WORLD

Error-only Observerโ€‹

Error Handling

Error-only observers are useful for logging or monitoring failure scenarios without processing the actual values. Always handle errors in production code.

// Handle only errors, ignore values
observer := ro.OnError(func(err error) {
log.Printf("Operation failed: %v", err)
})

riskyObservable.Subscribe(observer)

Complete-only Observerโ€‹

Complete-only observers are useful for cleanup operations or triggering follow-up actions after a stream finishes.

// Handle only completion
observer := ro.OnComplete(func() {
fmt.Println("All processing completed")
})

longRunningObservable.Subscribe(observer)

Observer Lifecycleโ€‹

An Observer can be in one of three states:

  1. Active: Ready to receive notifications (default state)
  2. Completed: Received a Complete notification, no more notifications accepted
  3. Errored: Received an Error notification, no more notifications accepted
observer := ro.NewObserver(
func(value int) {
fmt.Println("State:", observer.IsClosed()) // false while active
fmt.Println("Received:", value)
},
func(err error) {
fmt.Println("State:", observer.IsClosed()) // true
fmt.Println("HasThrown:", observer.HasThrown()) // true
fmt.Println("Error:", err)
},
func() {
fmt.Println("State:", observer.IsClosed()) // true
fmt.Println("IsCompleted:", observer.IsCompleted()) // true
},
)

Error Handling in Observersโ€‹

Panic Recoveryโ€‹

Panic Recovery

Observers automatically recover from panics in callback functions, preventing application crashes.

observer := ro.NewObserver(
func(value int) {
if value == 3 {
panic("something went wrong!")
}
fmt.Println("Value:", value)
},
func(err error) {
fmt.Println("Recovered error:", err) // Handles the panic
},
func() {
fmt.Println("Completed")
},
)

ro.Just(1, 2, 3, 4).Subscribe(observer)
// Output:
// Value: 1
// Value: 2
// Recovered error: something went wrong!

State After Errorโ€‹

Once an Observer receives an error, it rejects further notifications:

observer := ro.NewObserver(
func(value int) {
fmt.Println("Got:", value)
},
func(err error) {
fmt.Println("Error:", err)
},
func() {
fmt.Println("Completed")
},
)

// Send error
observer.Error(fmt.Errorf("network failure"))

// Try to send more values (will be ignored)
observer.Next(42) // Ignored
observer.Complete() // Ignored

fmt.Println("IsClosed:", observer.IsClosed()) // true
fmt.Println("HasThrown:", observer.HasThrown()) // true

Utility Observersโ€‹

No-op Observerโ€‹

// Observer that does nothing
noop := ro.NoopObserver[int]()
ro.Just("hello", "world").Subscribe(noop) // Values are silently consumed
// Output:
// Observer that prints all notifications for debugging
debugObserver := ro.PrintObserver[string]()
ro.Just("hello", "world").Subscribe(debugObserver)
// Output:
// Next: hello
// Next: world
// Completed

Observer Best Practicesโ€‹

1. Always Handle Errorsโ€‹

Best Practice

In production code, always handle all three observer callbacks (Next, Error, Complete) to ensure proper error handling and resource cleanup.

// Good: Handle errors
observer := ro.NewObserver(
func(value int) { /* process value */ },
func(err error) { /* handle error */ },
func() { /* handle completion */ },
)

// Risky: No error handling in potentially failing operations
observer := ro.OnNext(func(value int) { /* process value */ })
Operator-based Processing

While Observers can handle side effects, it's better to perform async operations using operators like .FlatMap to maintain the reactive pipeline benefits.

2. Handle Async Operations with Operatorsโ€‹

While Observers can handle side effects, it's better to perform async operations using operators like .FlatMap:

// Good: Keep OnNext method focused on terminal consumption
observer := ro.OnNext(func(value int) {
result := value * 2
fmt.Println("Processed:", result)
})

// Avoid: Complex async operations in terminal Observer
observer := ro.OnNext(func(value int) {
// This blocks the Observer and loses error handling benefits
if err := writeToDatabase(value); err != nil {
log.Println("Failed to save:", err) // No clean error propagation
}
})

// Better: Use MapErr for async operations
// Chain operations in the pipeline
pipeline := ro.Pipe2(
ro.Just(1, 2, 3),
ro.Map(func(value int) int { return value * 2 }),
ro.MapErr(func(value int) (int, error) {
err := writeToDatabase(value)
return value, err
}),
)

pipeline.Subscribe(ro.NewObserver(
func(value int) {
fmt.Println("Saved to database:", value)
},
func(err error) {
fmt.Println("Error saving:", err)
},
func() {
fmt.Println("All saves completed")
},
))

3. Handle Backpressureโ€‹

Backpressure Advantage

In samber/ro, backpressure is handled naturally through blocking behavior. When you call observer.Next(), the call blocks until all downstream operators in the pipeline have processed the value. This ensures the stream is consumed sequentially and in the right order.

// Example: Processing values with backpressure
func createProcessingObservable() ro.Observable[int] {
return ro.NewObservable(func(observer ro.Observer[int]) ro.Teardown {
for i := 1; i <= 5; i++ {
fmt.Printf("Emitting %d\n", i)

// This blocks until downstream operators complete
observer.Next(i)

fmt.Printf("Downstream completed for %d\n", i)
}
observer.Complete()
return nil
})
}

// Create a pipeline with a slow operator
pipeline := ro.Pipe1(
createProcessingObservable(),
ro.Map(func(value int) int {
// Simulate slow processing
time.Sleep(100 * time.Millisecond)
return value * 2
}),
)

pipeline.Subscribe(ro.NewObserver(
func(value int) {
fmt.Printf("Received: %d\n", value)
},
func(err error) {
fmt.Println("Error:", err)
},
func() {
fmt.Println("Completed")
},
))
// Output:
// Emitting 1
// Received: 2
// Downstream completed for 1
// Emitting 2
// Received: 4
// Downstream completed for 2
// ...

This blocking behavior ensures that:

  • The producer waits for consumers to be ready
  • Memory usage remains bounded
  • No values are lost due to overflow
  • The pipeline naturally regulates flow rate
  • The stream is consumed in a sequential fashion
  • The message order is preserved

Observer vs Go Channelsโ€‹

When producing a message into a Go channel, the producer is released as soon as the channel buffer is partially filled or the consumer read on it.

Go Channelsโ€‹

func producer(ch chan int) {
for i := 1; i <= 3; i++ {
fmt.Printf("Produced %d\n", i)
ch <- i // Producer doesn't wait for consumer to finish
fmt.Printf("Producer continued for %d\n", i)
}
close(ch)
}

func consumer(ch chan int) {
for value := range ch {
fmt.Printf("Consuming %d\n", value)
time.Sleep(100 * time.Millisecond) // Simulate slow processing
fmt.Printf("Finished processing %d\n", value)
}
}

ch := make(chan int, 42)
go producer(ch)
consumer(ch)
// Output:
// Produced 1
// Produced 2
// Produced 3
// Producer continued for 1
// Producer continued for 2
// Producer continued for 3
// Consuming 1
// Finished processing 1
// Consuming 2
// Finished processing 2
// Consuming 3
// Finished processing 3

Observer Patternโ€‹

func observableProducer() ro.Observable[int] {
return ro.NewObservable(func(observer ro.Observer[int]) ro.Teardown {
for i := 1; i <= 3; i++ {
fmt.Printf("Produced %d\n", i)
observer.Next(i) // Producer waits for consumer to finish
fmt.Printf("Producer continued for %d\n", i)
}
observer.Complete()
return nil
})
}

observableProducer().Subscribe(ro.NewObserver(
func(value int) {
fmt.Printf("Consuming %d\n", value)
time.Sleep(100 * time.Millisecond) // Simulate slow processing
fmt.Printf("Finished processing %d\n", value)
},
func(err error) {
fmt.Println("Error:", err)
},
func() {
fmt.Println("Completed")
},
))
// Output:
// Produced 1
// Consuming 1
// Finished processing 1
// Producer continued for 1
// Produced 2
// Consuming 2
// Finished processing 2
// Producer continued for 2
// Produced 3
// Consuming 3
// Finished processing 3
// Producer continued for 3
// Completed

This fundamental difference affects how you design concurrent systems:

  • Go channels: Producer continues immediately, consumer processes asynchronously
  • Observables: Producer waits for consumer to finish, providing synchronous semantics

See Channels vs ro for a detailed comparison.

Observers are the essential consumer interface in reactive programming, providing a clean, thread-safe way to handle streams of values with proper error handling and lifecycle management.