๐ Debugging Techniques
Debugging reactive streams requires different approaches than traditional imperative code. This guide covers systematic techniques for identifying and resolving issues in samber/ro applications.
1. Stream Inspection with Tap Operatorsโ
The most effective way to debug streams is to add inspection points using Tap operators.
Basic Stream Debuggingโ
// Add Tap operators to see what's happening at each step
func debugPipeline(source ro.Observable[int]) ro.Observable[string] {
return ro.Pipe3(
source,
ro.TapOnNext(func(v int) {
log.Printf("๐ต Source emitted: %v", v)
}),
ro.Map(func(x int) int { return x * 2 }),
ro.TapOnNext(func(v int) {
log.Printf("๐ก After Map: %v", v)
}),
ro.Filter(func(x int) bool { return x > 5 }),
ro.TapOnNext(func(v int) {
log.Printf("๐ข After Filter: %v", v)
}),
ro.Map(func(x int) string { return fmt.Sprintf("result-%d", x) }),
)
}
// Usage with error and completion logging
debugPipeline(ro.Just(1, 2, 3, 4, 5, 6)).Subscribe(
ro.NewObserver(
func(v string) { log.Printf("โ
Final result: %v", v) },
func(err error) { log.Printf("โ Error: %v", err) },
func() { log.Printf("๐ Completed") },
),
)
// Or use the builtin debugger:
debugPipeline(ro.Just(1, 2, 3, 4, 5, 6)).Subscribe(
ro.PrintObserver[string](),
)
Conditional Debuggingโ
// Debug only specific values
func debugConditional[T any](predicate func(T) bool, message string) func(ro.Observable[T]) ro.Observable[T] {
return ro.TapOnNext(func(v T) {
if predicate(v) {
log.Printf("๐ DEBUG [%s]: %v", message, v)
}
})
}
// Usage: Debug only large numbers
pipeline := ro.Pipe2(
ro.Just(1, 100, 2, 200, 3, 300),
debugConditional(func(x int) bool { return x > 50 }, "large-numbers"),
ro.Map(func(x int) int { return x / 2 }),
)
Debug with a custom operatorโ
// Reusable debug operator
func DebugStream[T any](name string, logValues bool, logErrors bool, logCompletion bool) func(ro.Observable[T]) ro.Observable[T] {
return func(source ro.Observable[T]) ro.Observable[T] {
return ro.NewObservable(func(observer ro.Observer[T]) ro.Teardown {
sub := source.Subscribe(ro.NewObserver(
func(value T) {
if logValues {
log.Printf("๐ก [%s] Next: %v", name, value)
}
observer.Next(value)
},
func(err error) {
if logErrors {
log.Printf("๐ฅ [%s] Error: %v", name, err)
}
observer.Error(err)
},
func() {
if logCompletion {
log.Printf("โจ [%s] Complete", name)
}
observer.Complete()
},
))
return sub.Unsubscribe
})
}
}
// Usage
pipeline := ro.Pipe3(
ro.Just(1, 2, 3),
DebugStream("input", true, true, true),
ro.Map(func(x int) int { return x * 2 }),
DebugStream("mapped", true, true, false), // No completion logging
)
2. Test-Driven Debuggingโ
Isolate problematic components by testing them individually.
Unit Testing Individual Operatorsโ
func TestProblematicOperator(t *testing.T) {
// Test with known input
input := ro.Just(1, 2, 3, 4, 5)
obs := yourOperator()(input)
// Collect results
values, err := ro.Collect(obs)
require.NoError(t, err)
// Verify expectations
expected := []int{2, 4, 6, 8, 10}
assert.Equal(t, expected, values)
}
func TestErrorHandling(t *testing.T) {
// Test with error-producing input
input := ro.Throw[int](fmt.Errorf("test error"))
obs := yourOperator()(input)
// Should handle the error gracefully
values, err := ro.Collect(obs)
assert.Error(t, err)
assert.Empty(t, values)
}
func TestEmptySource(t *testing.T) {
// Test with error-producing input
input := ro.Empty[int]()
obs := yourOperator()(input)
// Collect results
values, err := ro.Collect(obs)
require.NoError(t, err)
// Verify expectations
assert.Equal(t, []int{}, values)
}
func TestBlockedSource(t *testing.T) {
// Test with error-producing input
input := ro.Never[int]()
obs := yourOperator()(input)
// Collect results
ctx, _ := context.WithTimeout(context.Background(), 5*time.Millisecond)
sub := obs.SubscribeWithContext(ctx, ro.PrintObserver[int]()) // ๐ฅ blocking
t.False(t, sub.IsClosed())
time.Sleep(10*time.Millisecond)
t.True(t, sub.IsClosed())
}
3. Go Tooling Integrationโ
Race Detectionโ
# Run tests with race detector
go test -race ./...
# Run your application with race detector
go run -race main.go
// Test that demonstrates potential race condition
func TestConcurrentAccess(t *testing.T) {
observable := ro.Just(1, 2, 3, 4, 5)
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
values, err := ro.Collect(observable)
require.NoError(t, err)
t.Logf("Goroutine %d got: %v", i, values)
}()
}
wg.Wait()
}
Memory Profilingโ
# Generate CPU profile
go test -cpuprofile=cpu.prof -bench=.
# Generate memory profile
go test -memprofile=mem.prof -bench=.
# Analyze profiles
go tool pprof cpu.prof
go tool pprof mem.prof
func BenchmarkYourOperator(b *testing.B) {
source := ro.Just(1, 2, 3, 4, 5)
operator := YourOperator()
b.ResetTimer()
for i := 0; i < b.N; i++ {
values, err := ro.Collect(operator(source))
if err != nil {
b.Fatal(err)
}
_ = values
}
}
Runtime Tracingโ
func EnableTracing() {
// Enable runtime tracing
trace.Start(os.Stdout)
defer trace.Stop()
// Your reactive code here
source := ro.Just(1, 2, 3, 4, 5)
obs := yourOperator()(source)
values, err := ro.Collect(obs)
if err != nil {
log.Printf("Error: %v", err)
}
log.Printf("Result: %v", values)
}
4. Custom Debugging Operatorsโ
Value History Trackingโ
type ValueTracker[T any] struct {
mu sync.Mutex
values []T
errors []error
}
func (t *ValueTracker[T]) Track() func(ro.Observable[T]) ro.Observable[T] {
return func(source ro.Observable[T]) ro.Observable[T] {
return ro.NewObservable(func(observer ro.Observer[T]) ro.Teardown {
sub := source.Subscribe(ro.NewObserver(
func(value T) {
t.mu.Lock()
t.values = append(t.values, value)
t.mu.Unlock()
observer.Next(value)
},
func(err error) {
t.mu.Lock()
t.errors = append(t.errors, err)
t.mu.Unlock()
observer.Error(err)
},
observer.Complete,
))
return sub.Unsubscribe
})
}
}
func (t *ValueTracker[T]) GetHistory() ([]T, []error) {
t.mu.Lock()
defer t.mu.Unlock()
return append([]T(nil), t.values...), append([]error(nil), t.errors...)
}
// Usage
tracker := &ValueTracker[int]{}
pipeline := ro.Pipe4(
ro.Just(1, 2, 3),
ro.Filter(...),
tracker.Track(),
ro.Map(...),
ro.Take(42),
)
values, err := ro.Collect(pipeline)
fmt.Printf("Final values: %v\n", values)
fmt.Printf("Tracked history: %v\n", tracker.GetHistory())
5. Debugging Checklistโ
When debugging reactive streams, follow this systematic approach:
Step 1: Verify Basic Flowโ
- Add Tap operators to identify where values stop flowing
- Check if observable is hot vs cold as expected
- Verify subscription count and timing
Step 2: Check Error Handlingโ
- Ensure all observers handle errors
- Verify error propagation through pipeline
- Check for panic recovery behavior
Step 3: Examine Context Usageโ
- Verify context propagation through all operators
- Check for unexpected cancellations
- Validate timeout and deadline behavior
Step 4: Profile Resourcesโ
- Run with race detector (
go test -race) - Profile memory usage (
go test -memprofile) - Profile CPU usage (
go test -cpuprofile)
Step 5: Isolate Componentsโ
- Test individual operators in isolation
- Replace complex sources with simple test data
- Build up complexity gradually
Next Stepsโ
- Common Issues - Specific problem solutions
- Performance Issues - Performance optimization
- Memory Leaks - Memory leak detection