Skip to main content

๐Ÿš€ Getting started

Welcome to samber/ro! This guide will help you get started with reactive programming in Go. You'll learn the core concepts and see practical examples.

Quick Start

This guide is designed to get you up and running in under 5 minutes. Each example builds on previous concepts.

Installationโ€‹

Make sure you have Go 1.18+ installed. The library uses modern Go features like generics.

go get -u github.com/samber/ro

Your First Observable Streamโ€‹

Hello World

Let's start with a simple example that creates a stream of values and processes them. This demonstrates the core reactive pattern: create, transform, and subscribe.

package main

import (
"fmt"
"time"

"github.com/samber/ro"
)

func main() {
// Create a simple stream
observable := ro.Pipe2(
ro.Interval(1 * time.Second),
ro.Take[int64](5),
ro.Map(func(x int64) string {
return fmt.Sprintf("Tick: %d", x)
}),
)

// Subscribe and print values
subscription := observable.Subscribe(ro.OnNext(func(s string) {
fmt.Println(s)
}))

// Wait for completion
subscription.Wait()
}

Output:

Tick: 0
Tick: 1
Tick: 2
Tick: 3
Tick: 4

Core Conceptsโ€‹

These four concepts are the building blocks of reactive programming with ro:

1. Observablesโ€‹

Data Sources

An Observable is a stream of values that can be observed over time:

// Create from values
numbers := ro.Just(1, 2, 3, 4, 5)

// Create from a slice
letters := ro.FromSlice([]string{"a", "b", "c"})

2. Operatorsโ€‹

Operators transform, filter, or combine streams:

// Chain operators with ro.Pipe
result := ro.Pipe2(
ro.Range(0, 10),
ro.Filter(func(x int64) bool {
return x%2 == 0 // Keep only even numbers
}),
ro.Map(func(x int64) string {
return fmt.Sprintf("even-%d", x) // Transform to string
}),
)

result.Subscribe(ro.OnNext(func(s string) {
fmt.Println(s)
}))
// "even-0", "even-2", "even-4", "even-6", "even-8"

3. Subscriptionsโ€‹

Resource Management

Subscriptions receive values from observables and manage cleanup:

subscription := observable.Subscribe(ro.NewObserver(
func(value int) { // OnNext
fmt.Println("Received:", value)
},
func(err error) { // OnError
fmt.Println("Error:", err)
},
func() { // OnCompleted
fmt.Println("Done!")
},
))

// Cancel subscription if needed
subscription.Unsubscribe()

4. Multiple Subscriptionsโ€‹

Each call to .Subscribe() creates a new independent subscription that restarts the stream from the beginning. This is called a "cold" observable.

source := ro.Just(1, 2, 3)

// First subscription
sub1 := source.Subscribe(ro.OnNext(func(x int) {
fmt.Println("Subscriber 1:", x)
}))

// Second subscription - restarts from beginning
sub2 := source.Subscribe(ro.OnNext(func(x int) {
fmt.Println("Subscriber 2:", x)
}))

// Output:
// Subscriber 1: 1
// Subscriber 1: 2
// Subscriber 1: 3
// Subscriber 2: 1
// Subscriber 2: 2
// Subscriber 2: 3

To share a single stream execution across multiple subscribers, use .Share() to create a hot observable (covered later in this guide).

Common Operationsโ€‹

Daily Operations

These are the most frequently used operations in reactive programming:

Filtering Valuesโ€‹

Filter operators let you select which values to process:

obs := ro.Pipe1(
ro.Range(0, 10),
ro.Filter(func(x int64) bool {
return x > 5 // Keep values greater than 5
}),
)
// Output: 6, 7, 8, 9

Transforming Dataโ€‹

Data Mapping

Transform operators convert values from one type to another:

obs := ro.Pipe1(
ro.Just("apple", "banana", "cherry"),
ro.Map(func(s string) string {
return strings.ToUpper(s)
}),
)
// Output: APPLE, BANANA, CHERRY

Combining Streamsโ€‹

Combine multiple streams into one:

stream1 := ro.Just(1, 2, 3)
stream2 := ro.Just(4, 5, 6)

obs := ro.Concat(stream1, stream2)
// Output: 1, 2, 3, 4, 5, 6

Error Handlingโ€‹

Robust Applications

Handle errors gracefully to prevent application crashes:

riskyStream := ro.Pipe2(
ro.Just(1, 2, 3, 4, 5),
ro.MapErr(func(x int) (int, error) {
if x == 3 {
return 0, fmt.Errorf("error at %d", x)
}
return x * 2, nil
}),
ro.Catch(func(err error) ro.Observable[int] {
fmt.Println("Recovered from error:", err)
return ro.Just(42) // Fallback value
}),
)

Panics recoveryโ€‹

The ro framework automatically catches panics that occur in operators and converts them to errors passed through destination.Error(...):

// A stream with a potential panic in Map operator
stream := ro.Pipe1(
ro.Just(1, 2, 3, 4, 5),
ro.Map(func(x int) string {
// This will panic when x == 3
if x == 3 {
panic("something went wrong!")
}
return fmt.Sprintf("processed-%d", x)
}),
)

// The framework automatically catches the panic and converts it to an error
subscription := stream.Subscribe(ro.NewObserver(
func(value string) {
fmt.Println("Received:", value)
},
func(err error) {
fmt.Println("Error (automatically caught):", err)
},
func() {
fmt.Println("Stream completed!")
},
))

subscription.Wait()

Output:

Received: processed-1
Received: processed-2
Error (automatically caught): something went wrong!

Real-world Example: API Rate Limitingโ€‹

Practical Application

This example shows how to handle API calls with rate limiting and retry logic - a common real-world scenario.

package main

import (
"fmt"
"net/http"
"time"

"github.com/samber/ro"
)

func fetchUser(id int) (string, error) {
// Simulate API call
time.Sleep(100 * time.Millisecond)
return fmt.Sprintf("user-%d", id), nil
}

func main() {
// Create a stream of user IDs
userIds := ro.Just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

// Process with rate limiting
userStream := ro.Pipe3(
userIds,
ro.Map(fetchUser),
ro.DelayEach[string](200 * time.Millisecond), // 200ms pause between items
ro.RetryWithConfig(RetryConfig{MaxRetries: 2}), // Retry failed requests
)

// Subscribe and collect results
var results []string
subscription := userStream.Subscribe(ro.NewObserver(
func(user string) {
results = append(results, user)
fmt.Println("Fetched:", user)
},
func(err error) {
fmt.Println("Error:", err)
},
func() {
fmt.Println("All users fetched!")
fmt.Println("Results:", results)
},
))

// Wait for completion
subscription.Wait()
}

Creating Custom Operatorsโ€‹

Create reusable operators to encapsulate common transformations:

// Custom operator that squares numbers
func Square[T constraints.Integer](observable ro.Observable[T]) ro.Observable[T] {
return ro.Map(func(x T) T {
return x * x
})(observable)
}

func main() {
result := Square(ro.Just(1, 2, 3, 4, 5))

result.Subscribe(ro.OnNext(func(x int) {
fmt.Println(x) // 1, 4, 9, 16, 25
}))
}

Hot vs Cold Observablesโ€‹

Stream Behavior

Understanding the difference between hot and cold observables is crucial for building correct reactive applications:

Cold Observables (default)โ€‹

Each subscriber gets their own independent stream:

cold := ro.Just(1, 2, 3)

// Each subscriber sees the same values independently. Consumption starts on subscription.
cold.Subscribe(ro.OnNext(func(x int) { fmt.Println("Sub1:", x) }))
cold.Subscribe(ro.OnNext(func(x int) { fmt.Println("Sub2:", x) }))

Hot Observablesโ€‹

Shared Execution

Multiple subscribers share the same stream. See Subject for more details.

// Create a hot observable from a cold one
hot := ro.Connectable(ro.Just(1, 2, 3))

// Both subscribers share the same sequence simultaneously
sub1 := hot.Subscribe(ro.OnNext(func(x int) { fmt.Println("Sub1:", x) }))
sub2 := hot.Subscribe(ro.OnNext(func(x int) { fmt.Println("Sub2:", x) }))

// Start subscription
subscription := connectable.Connect()

Best Practicesโ€‹

Production Ready

Follow these practices to write maintainable and robust reactive code:

1. Use Pipeline Operatorsโ€‹

Pipeline operators promote clean, reusable code:

// Good: Composable pipeline
pipeline := ro.Pipe3(
source,
ro.Filter(predicate),
ro.Map(transformer),
ro.Retry(3),
)

// Reusable pipeline
result1 := pipeline(stream1)
result2 := pipeline(stream2)

2. Handle Errors Gracefullyโ€‹

Error Recovery

Always handle errors to prevent application crashes:

stream := ro.Pipe2(
riskyOperation,
ro.Catch(func(err error) ro.Observable[string] {
// Log error and provide fallback
log.Println("Operation failed:", err)
return fallbackStream
}),
)

3. Manage Resourcesโ€‹

Resource Leaks

Clean up resources to prevent memory leaks:

// Clean up resources when done
subscription := stream.Subscribe(observer)
defer subscription.Unsubscribe()

4. Avoid Memory Leaksโ€‹

// Use Take to limit infinite streams
obs1 := ro.Pipe1(
source,
ro.Take[int](10), // Only 10 values
)

// Use TakeUntil with timeout
obs2 := ro.Pipe1(
source,
stream.TakeUntil[int](ro.Timer(30*time.Second))
)

Next Stepsโ€‹

Continue Learning

Now that you understand the basics, explore:

Happy streaming! ๐Ÿš€