๐ Getting started
Welcome to samber/ro! This guide will help you get started with reactive programming in Go. You'll learn the core concepts and see practical examples.
This guide is designed to get you up and running in under 5 minutes. Each example builds on previous concepts.
Installationโ
Make sure you have Go 1.18+ installed. The library uses modern Go features like generics.
go get -u github.com/samber/ro
Your First Observable Streamโ
Let's start with a simple example that creates a stream of values and processes them. This demonstrates the core reactive pattern: create, transform, and subscribe.
package main
import (
"fmt"
"time"
"github.com/samber/ro"
)
func main() {
// Create a simple stream
observable := ro.Pipe2(
ro.Interval(1 * time.Second),
ro.Take[int64](5),
ro.Map(func(x int64) string {
return fmt.Sprintf("Tick: %d", x)
}),
)
// Subscribe and print values
subscription := observable.Subscribe(ro.OnNext(func(s string) {
fmt.Println(s)
}))
// Wait for completion
subscription.Wait()
}
Output:
Tick: 0
Tick: 1
Tick: 2
Tick: 3
Tick: 4
Core Conceptsโ
These four concepts are the building blocks of reactive programming with ro:
1. Observablesโ
An Observable is a stream of values that can be observed over time:
// Create from values
numbers := ro.Just(1, 2, 3, 4, 5)
// Create from a slice
letters := ro.FromSlice([]string{"a", "b", "c"})
2. Operatorsโ
Operators transform, filter, or combine streams:
// Chain operators with ro.Pipe
result := ro.Pipe2(
ro.Range(0, 10),
ro.Filter(func(x int64) bool {
return x%2 == 0 // Keep only even numbers
}),
ro.Map(func(x int64) string {
return fmt.Sprintf("even-%d", x) // Transform to string
}),
)
result.Subscribe(ro.OnNext(func(s string) {
fmt.Println(s)
}))
// "even-0", "even-2", "even-4", "even-6", "even-8"
3. Subscriptionsโ
Subscriptions receive values from observables and manage cleanup:
subscription := observable.Subscribe(ro.NewObserver(
func(value int) { // OnNext
fmt.Println("Received:", value)
},
func(err error) { // OnError
fmt.Println("Error:", err)
},
func() { // OnCompleted
fmt.Println("Done!")
},
))
// Cancel subscription if needed
subscription.Unsubscribe()
4. Multiple Subscriptionsโ
Each call to .Subscribe() creates a new independent subscription that restarts the stream from the beginning. This is called a "cold" observable.
source := ro.Just(1, 2, 3)
// First subscription
sub1 := source.Subscribe(ro.OnNext(func(x int) {
fmt.Println("Subscriber 1:", x)
}))
// Second subscription - restarts from beginning
sub2 := source.Subscribe(ro.OnNext(func(x int) {
fmt.Println("Subscriber 2:", x)
}))
// Output:
// Subscriber 1: 1
// Subscriber 1: 2
// Subscriber 1: 3
// Subscriber 2: 1
// Subscriber 2: 2
// Subscriber 2: 3
To share a single stream execution across multiple subscribers, use .Share() to create a hot observable (covered later in this guide).
Common Operationsโ
These are the most frequently used operations in reactive programming:
Filtering Valuesโ
Filter operators let you select which values to process:
obs := ro.Pipe1(
ro.Range(0, 10),
ro.Filter(func(x int64) bool {
return x > 5 // Keep values greater than 5
}),
)
// Output: 6, 7, 8, 9
Transforming Dataโ
Transform operators convert values from one type to another:
obs := ro.Pipe1(
ro.Just("apple", "banana", "cherry"),
ro.Map(func(s string) string {
return strings.ToUpper(s)
}),
)
// Output: APPLE, BANANA, CHERRY
Combining Streamsโ
Combine multiple streams into one:
stream1 := ro.Just(1, 2, 3)
stream2 := ro.Just(4, 5, 6)
obs := ro.Concat(stream1, stream2)
// Output: 1, 2, 3, 4, 5, 6
Error Handlingโ
Handle errors gracefully to prevent application crashes:
riskyStream := ro.Pipe2(
ro.Just(1, 2, 3, 4, 5),
ro.MapErr(func(x int) (int, error) {
if x == 3 {
return 0, fmt.Errorf("error at %d", x)
}
return x * 2, nil
}),
ro.Catch(func(err error) ro.Observable[int] {
fmt.Println("Recovered from error:", err)
return ro.Just(42) // Fallback value
}),
)
Panics recoveryโ
The ro framework automatically catches panics that occur in operators and converts them to errors passed through destination.Error(...):
// A stream with a potential panic in Map operator
stream := ro.Pipe1(
ro.Just(1, 2, 3, 4, 5),
ro.Map(func(x int) string {
// This will panic when x == 3
if x == 3 {
panic("something went wrong!")
}
return fmt.Sprintf("processed-%d", x)
}),
)
// The framework automatically catches the panic and converts it to an error
subscription := stream.Subscribe(ro.NewObserver(
func(value string) {
fmt.Println("Received:", value)
},
func(err error) {
fmt.Println("Error (automatically caught):", err)
},
func() {
fmt.Println("Stream completed!")
},
))
subscription.Wait()
Output:
Received: processed-1
Received: processed-2
Error (automatically caught): something went wrong!
Real-world Example: API Rate Limitingโ
This example shows how to handle API calls with rate limiting and retry logic - a common real-world scenario.
package main
import (
"fmt"
"net/http"
"time"
"github.com/samber/ro"
)
func fetchUser(id int) (string, error) {
// Simulate API call
time.Sleep(100 * time.Millisecond)
return fmt.Sprintf("user-%d", id), nil
}
func main() {
// Create a stream of user IDs
userIds := ro.Just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
// Process with rate limiting
userStream := ro.Pipe3(
userIds,
ro.Map(fetchUser),
ro.DelayEach[string](200 * time.Millisecond), // 200ms pause between items
ro.RetryWithConfig(RetryConfig{MaxRetries: 2}), // Retry failed requests
)
// Subscribe and collect results
var results []string
subscription := userStream.Subscribe(ro.NewObserver(
func(user string) {
results = append(results, user)
fmt.Println("Fetched:", user)
},
func(err error) {
fmt.Println("Error:", err)
},
func() {
fmt.Println("All users fetched!")
fmt.Println("Results:", results)
},
))
// Wait for completion
subscription.Wait()
}
Creating Custom Operatorsโ
Create reusable operators to encapsulate common transformations:
// Custom operator that squares numbers
func Square[T constraints.Integer](observable ro.Observable[T]) ro.Observable[T] {
return ro.Map(func(x T) T {
return x * x
})(observable)
}
func main() {
result := Square(ro.Just(1, 2, 3, 4, 5))
result.Subscribe(ro.OnNext(func(x int) {
fmt.Println(x) // 1, 4, 9, 16, 25
}))
}
Hot vs Cold Observablesโ
Understanding the difference between hot and cold observables is crucial for building correct reactive applications:
Cold Observables (default)โ
Each subscriber gets their own independent stream:
cold := ro.Just(1, 2, 3)
// Each subscriber sees the same values independently. Consumption starts on subscription.
cold.Subscribe(ro.OnNext(func(x int) { fmt.Println("Sub1:", x) }))
cold.Subscribe(ro.OnNext(func(x int) { fmt.Println("Sub2:", x) }))
Hot Observablesโ
Multiple subscribers share the same stream. See Subject for more details.
// Create a hot observable from a cold one
hot := ro.Connectable(ro.Just(1, 2, 3))
// Both subscribers share the same sequence simultaneously
sub1 := hot.Subscribe(ro.OnNext(func(x int) { fmt.Println("Sub1:", x) }))
sub2 := hot.Subscribe(ro.OnNext(func(x int) { fmt.Println("Sub2:", x) }))
// Start subscription
subscription := connectable.Connect()
Best Practicesโ
Follow these practices to write maintainable and robust reactive code:
1. Use Pipeline Operatorsโ
Pipeline operators promote clean, reusable code:
// Good: Composable pipeline
pipeline := ro.Pipe3(
source,
ro.Filter(predicate),
ro.Map(transformer),
ro.Retry(3),
)
// Reusable pipeline
result1 := pipeline(stream1)
result2 := pipeline(stream2)
2. Handle Errors Gracefullyโ
Always handle errors to prevent application crashes:
stream := ro.Pipe2(
riskyOperation,
ro.Catch(func(err error) ro.Observable[string] {
// Log error and provide fallback
log.Println("Operation failed:", err)
return fallbackStream
}),
)
3. Manage Resourcesโ
Clean up resources to prevent memory leaks:
// Clean up resources when done
subscription := stream.Subscribe(observer)
defer subscription.Unsubscribe()
4. Avoid Memory Leaksโ
// Use Take to limit infinite streams
obs1 := ro.Pipe1(
source,
ro.Take[int](10), // Only 10 values
)
// Use TakeUntil with timeout
obs2 := ro.Pipe1(
source,
stream.TakeUntil[int](ro.Timer(30*time.Second))
)
Next Stepsโ
Now that you understand the basics, explore:
- Operators Reference: Learn about all available operators
- Examples: Check out practical examples in the examples directory
- Comparison Guides: See how
samber/rocompares tochannel,iter, andsamber/lo - Advanced Patterns:
Subjects, backpressure, and custom operators - Troubleshooting Guide: Debug and resolve common issues
Happy streaming! ๐