Skip to main content

๐Ÿ› Common Issues

This guide covers the most frequently encountered issues when working with samber/ro and their solutions.

1. Not Receiving Valuesโ€‹

Problem: Observable emits no valuesโ€‹

// This seems like it should work, but no values are received
observable := ro.Pipe1(
ro.Just(1, 2, 3),
ro.Map(func(x int) int { return x * 2 }),
)

// โŒ No output
observable.Subscribe(ro.OnNext(func(x int) {
fmt.Println(x) // Never called
}))

Cause: Using ro.OnNext() with a blocking observable. The observable completes synchronously before the observer can handle values.

Solution: Use a full observer or handle the blocking nature:

// โœ… Solution 1: Use full observer
observable.Subscribe(ro.NewObserver(
func(x int) { fmt.Println(x) }, // Next
func(err error) { fmt.Println(err) }, // Error
func() { fmt.Println("Done") }, // Complete
))

// โœ… Solution 2: Use TapXXX operators in the middle of your stream
observable := ro.Pipe1(
ro.Just(1, 2, 3),
ro.Take[int64](5),
ro.Map(func(x int) int { return x * 2 }),
ro.TapOnNext(func(x int) {
fmt.Println("Value: %d", n) // print debug
}),
ro.Map(func(x int64) string {
return fmt.Sprintf("Tick: %d", x)
}),
)
observable.Subscribe(...)

Problem: Hot observable not sharing valuesโ€‹

// Expected both subscribers to see same values
hot := ro.Connectable(ro.Just(1, 2, 3))

sub1 := hot.Subscribe(ro.OnNext(func(x int) {
fmt.Println("Sub1:", x)
}))
sub2 := hot.Subscribe(ro.OnNext(func(x int) {
fmt.Println("Sub2:", x)
}))

// โŒ No output - forgot to connect

Cause: Connectable observables need to be explicitly connected.

Solution: Connect the observable:

// โœ… Connect the observable
connection := hot.Connect()
// Output: Sub1: 1, Sub2: 1, Sub1: 2, Sub2: 2, Sub1: 3, Sub2: 3

2. Error Handling Issuesโ€‹

Problem: Errors being lostโ€‹

// โŒ Error is silently ignored
riskyObservable := ro.Pipe1(
ro.Just(1, 2, 3),
ro.Map(func(x int) int {
if x == 2 {
panic("error!") // This gets lost
}
return x * 2
}),
)

riskyObservable.Subscribe(ro.OnNext(func(x int) {
fmt.Println(x) // Only sees: 1, then stops
}))

Cause: Using ro.OnNext() ignores error notifications.

Solution: Use proper error handling:

// โœ… Handle errors properly
riskyObservable.Subscribe(ro.NewObserver(
func(x int) { fmt.Println("Next:", x) },
func(err error) { fmt.Println("Error:", err) }, // Catch the error
func() { fmt.Println("Complete") },
))

Problem: Operator doesn't handle returned errorsโ€‹

// โŒ MapErr errors not handled
stream := ro.Pipe1(
ro.Just(1, 2, 3),
ro.MapErr(func(x int) (int, error) {
if x == 2 { return 0, fmt.Errorf("bad number") }
return x * 2, nil
}),
)

stream.Subscribe(ro.OnNext(func(x int) {
fmt.Println(x) // Panic: unhandled error
}))

Solution: Use operators that handle errors or add error handling:

// โœ… Option 1: Use Catch operator
safeStream := ro.Pipe2(
ro.Just(1, 2, 3),
ro.MapErr(func(x int) (int, error) {
if x == 2 {
return 0, fmt.Errorf("bad number")
}
return x * 2, nil
}),
ro.Catch(func(err error) ro.Observable[int] {
fmt.Println("Recovered:", err)
return ro.Just(0) // Fallback value
}),
)

// โœ… Option 2: Handle in observer
stream.Subscribe(ro.NewObserver(
func(x int) { fmt.Println("Next:", x) },
func(err error) { fmt.Println("Error:", err) },
func() { fmt.Println("Complete") },
))

3. Context and Cancellation Issuesโ€‹

Problem: Context cancellation not respectedโ€‹

// โŒ Operator doesn't check context
func badOperator() ro.Observable[int] {
return ro.NewObservable(func(observer ro.Observer[int]) ro.Teardown {
for i := 0; i < 1000; i++ {
time.Sleep(10 * time.Second)
observer.Next(i) // Ignores context cancellation
}
observer.Complete()
return nil
})
}

Solution: Check context in long-running operations:

// โœ… Context-aware operator
func goodOperator(ctx context.Context) ro.Observable[int] {
return ro.NewObservableWithContext(func(ctx context.Context, observer ro.Observer[int]) ro.Teardown {
for i := 0; i < 1000; i++ {
select {
case <-ctx.Done():
return // Respect cancellation
default:
observer.Next(i)
time.Sleep(10 * time.Second)
}
}
observer.Complete()
return nil
})
}

Problem: Context not propagated through pipelineโ€‹

// โŒ Context lost in custom operator
brokenOperator := func(source ro.Observable[int]) ro.Observable[string] {
return ro.NewUnsafeObservable(func(destination ro.Observer[string]) ro.Teardown {
// Context not passed through!
sub := source.Subscribe(ro.NewObserver(
func(value int) {
destination.Next(fmt.Sprintf("item-%d", value))
},
destination.Error,
destination.Complete,
))
return sub.Unsubscribe
})
}

Solution: Use context-aware observable creation:

// โœ… Proper context propagation
workingOperator := func(source ro.Observable[int]) ro.Observable[string] {
return ro.NewUnsafeObservableWithContext(func(subscriberCtx context.Context, destination ro.Observer[string]) ro.Teardown {
sub := source.SubscribeWithContext(subscriberCtx, ro.NewObserverWithContext(
func(ctx context.Context, value int) {
destination.NextWithContext(ctx, fmt.Sprintf("item-%d", value))
},
destination.ErrorWithContext,
destination.CompleteWithContext,
))
return sub.Unsubscribe
})
}

4. Backpressure and Performance Issuesโ€‹

Problem: Fast producer overwhelms slow consumerโ€‹

// โŒ No backpressure handling
func fastProducer() ro.Observable[int] {
return ro.NewObservable(func(observer ro.Observer[int]) ro.Teardown {
// put in memory every values up-front
numbers := []int{}
for i := 0; i < 1000000; i++ {
numbers = append(numbers, rand.IntN(42))
}

for i := 0; i < len(numbers); i++ {
observer.Next(numbers[i]) // Produces faster than consumer can handle
}

observer.Complete()
return nil
})
}

slowConsumer := fastProducer().Subscribe(ro.OnNext(func(x int) {
time.Sleep(1 * time.Millisecond) // Slow processing
fmt.Println(x)
}))
// Result: Memory usage explodes

Solution: Implement buffering or throttling:

// โœ… Add backpressure with buffer
func fastProducer() ro.Observable[int] {
return ro.NewObservable(func(observer ro.Observer[int]) ro.Teardown {
for i := 0; i < 1000000; i++ {
observer.Next(rand.IntN(42)) // Produces values just in time
}

observer.Complete()
return nil
})
}

slowConsumer := ro.Pipe2(
fastProducer(),
ro.ObserveOn(10), // a few values will be accumulated without blocking source
ro.Flatten(),
).Subscribe(ro.OnNext(func(x int) {
time.Sleep(1 * time.Millisecond) // Slow processing
fmt.Println(x)
}))

5. Memory and Resource Leaksโ€‹

Problem: Goroutine leak from unsubscriptionโ€‹

// โŒ Goroutine continues after unsubscription
func leakyOperator() ro.Observable[int] {
return ro.NewObservable(func(observer ro.Observer[int]) ro.Teardown {
go func() {
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()

i := 0
for {
select {
case <-ticker.C:
observer.Next(i)
i++
}
// No way to stop this goroutine!
}
}()

return func() {
// Cleanup doesn't stop the goroutine
}
})
}

Solution: Provide proper cleanup mechanism:

// โœ… Proper goroutine cleanup
func nonLeakyOperator() ro.Observable[int] {
return ro.NewObservable(func(observer ro.Observer[int]) ro.Teardown {
ctx, cancel := context.WithCancel(context.Background())
ticker := time.NewTicker(100 * time.Millisecond)

go func() {
defer ticker.Stop()
i := 0

for {
select {
case <-ctx.Done():
return // Goroutine exits on cancellation
case <-ticker.C:
observer.Next(i)
i++
}
}
}()

return func() {
cancel() // Stop the goroutine
}
})
}

6. Operator Chaining Issuesโ€‹

Problem: Wrong operator variantโ€‹

// โŒ Using wrong variant
numbers := ro.Just([]int{1, 2, 3}, []int{4, 5, 6})

// Want to flatten, but using Map instead of Flatten
obs := ro.Pipe1(
numbers,
ro.Map(func(slice []int) int {
return slice[0] // Only gets first element
}),
)
// Output: 1, 4 (missing other values)

Solution: Use correct operator:

// โœ… Use Flatten for nested collections
obs := ro.Pipe1(
ro.Just([]int{1, 2, 3}, []int{4, 5, 6}),
ro.Flatten[int](), // Flatten nested slices
)
// Output: 1, 2, 3, 4, 5, 6

Quick Referenceโ€‹

IssueSymptomQuick Fix
No values from ro.JustSilent subscriptionUse ro.NewObserver instead of ro.OnNext
Hot observable not workingNo output from multiple subscribersCall .Connect()
Lost errorsStream stops silentlyAdd error observer with ro.NewObserver
Context ignoredLong operations don't cancelUse *WithContext variants and check context
Memory explosionFast producer, slow consumerAdd buffering or throttling
Goroutine leakMemory grows over timeProvide proper cleanup in teardown

Next Stepsโ€‹