Skip to main content

Testing Reactive Pipelines

The ro library provides a comprehensive testing framework for unit testing reactive pipelines. This framework helps you verify that your Observable streams emit the expected sequence of values, errors, and completion signals.

The testing framework is designed to be intuitive and expressive, allowing you to write clear assertions about reactive streams without complex boilerplate code.

Testing Packageโ€‹

The testing functionality is available in the /testing package:

import "github.com/samber/ro/testing"

Basic Usageโ€‹

Simple Assertionโ€‹

The fluent API makes it easy to verify exact sequences of values. Each expectation builds on the previous one, creating readable and maintainable test code.

import (
"testing"
"github.com/samber/ro"
"github.com/samber/ro/testing"
)

func TestObservable(t *testing.T) {
observable := ro.Just(1, 2, 3)

testing.Assert[int](t).
Source(observable).
ExpectNext(1).
ExpectNext(2).
ExpectNext(3).
ExpectComplete().
Verify()
}

Mockingโ€‹

Build reusable pipelines with ro.PipeOpX(...) variants, then invoke them with predictable test data. This pattern promotes clean separation between pipeline definition and testing.

// Feature
var pipeline = ro.PipeOp3(
ro.Filter(func(x int) int {
return x%2 == 1
})
ro.Map(func(x int) string {
return fmt.Sprintf("processed-%d", x)
}),
ro.DelayEach[string](100 * time.Millisecond)
)
// Tests
func TestObservable(t *testing.T) {
observable := pipeline(ro.Just(1, 2, 3, 4))

testing.Assert[string](t).
Source(observable).
ExpectNext("processed-1").
ExpectNext("processed-3").
ExpectComplete().
Verify()
}

Testing Error Casesโ€‹

Test error scenarios by creating observables that emit specific errors and verifying they're handled correctly. This is crucial for building robust reactive applications.

func TestObservableError(t *testing.T) {
observable := ro.Throw[string](errors.New("something went wrong"))

testing.Assert[string](t).
Source(observable).
ExpectError(errors.New("something went wrong")).
Verify()
}

Sequence Assertionsโ€‹

Use ExpectNextSeq() to verify multiple values at once. This is more concise than multiple ExpectNext() calls and improves test readability.

func TestObservableSequence(t *testing.T) {
observable := ro.Range(1, 5)

testing.Assert[int](t).
Source(observable).
ExpectNextSeq(1, 2, 3, 4, 5).
ExpectComplete().
Verify()
}

API Referenceโ€‹

AssertSpec Interfaceโ€‹

The AssertSpec[T] interface provides a fluent API for testing observables:

type AssertSpec[T any] interface {
Source(source ro.Observable[T]) AssertSpec[T]
ExpectNext(value T, msgAndArgs ...any) AssertSpec[T]
ExpectNextSeq(items ...T) AssertSpec[T]
ExpectError(err error, msgAndArgs ...any) AssertSpec[T]
ExpectComplete(msgAndArgs ...any) AssertSpec[T]
Verify()
VerifyWithContext(ctx context.Context)
}

Methodsโ€‹

Source(source ro.Observable[T]) AssertSpec[T]โ€‹

Sets the observable to test.

ExpectNext(value T, msgAndArgs ...any) AssertSpec[T]โ€‹

Expects the next value emitted by the observable. Optionally accepts custom error messages.

ExpectNextSeq(items ...T) AssertSpec[T]โ€‹

Expects a sequence of values to be emitted by the observable.

ExpectError(err error, msgAndArgs ...any) AssertSpec[T]โ€‹

Expects the observable to emit a specific error.

ExpectComplete(msgAndArgs ...any) AssertSpec[T]โ€‹

Expects the observable to complete successfully.

Verify()โ€‹

Subscribes to the observable and verifies all assertions.

VerifyWithContext(ctx context.Context)โ€‹

Same as Verify() but with a custom context (eg: for timeout control). Context will be transmitted to the .SubscribeWithContext(...) method.

Advanced Testing Patternsโ€‹

Testing with Contextโ€‹

Use context for timeout control and cancellation in time-based tests. This prevents infinite tests and mirrors real-world usage patterns.

func TestObservableWithContext(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()

observable := ro.Pipe1(
ro.Interval(1 * time.Second),
ro.Take(3),
)

testing.Assert[int](t).
Source(observable).
ExpectNextSeq(0, 1, 2).
ExpectComplete().
VerifyWithContext(ctx)
}

Testing Custom Messagesโ€‹

Custom error messages make test failures easier to debug. When assertions fail, your custom messages will provide clear context about what went wrong.

func TestObservableWithCustomMessages(t *testing.T) {
observable := ro.Just("hello", "world")

testing.Assert[string](t).
Source(observable).
ExpectNext("hello", "expected first value to be 'hello'").
ExpectNext("world", "expected second value to be 'world'").
ExpectComplete("expected observable to complete").
Verify()
}

Testing Hot Observablesโ€‹

Testing hot observables requires careful consideration of timing and concurrency. Use goroutines to simulate real-world scenarios where multiple subscribers receive values concurrently.

func TestHotObservable(t *testing.T) {
subject := ro.NewSubject[int]()

// Start emitting values
go func() {
subject.Next(1)
subject.Next(2)
subject.Complete()
}()

testing.Assert[int](t).
Source(subject).
ExpectNextSeq(1, 2).
ExpectComplete().
Verify()
}

Best Practicesโ€‹

Follow these practices to ensure comprehensive and maintainable test coverage for your reactive pipelines:

  1. Test Cold and Hot Observables: Ensure your tests cover both cold (start on subscription) and hot (start immediately) observables.

  2. Use Context for Time-based Tests: Always use context with timeout when testing time-based observables to prevent infinite tests.

  3. Test Error Cases: Don't forget to test error scenarios in addition to success cases.

  4. Keep Tests Focused: Each test should verify one specific behavior or scenario.

  5. Use Descriptive Messages: Provide custom error messages to make test failures easier to understand.

Integration with Testifyโ€‹

The ro library includes a testify plugin for enhanced assertion capabilities:

import "github.com/samber/ro/plugins/testify"

This plugin provides additional helpers and integrations with the popular testify testing framework, making it easier to integrate reactive testing into existing Go codebases.

See plugins documentation for more details.