# ro - Reactive programming for Go A Go implementation of the ReactiveX spec that provides a declarative and composable way to handle asynchronous data streams. The library simplifies development of event-driven and asynchronous applications by offering over 200 operators for transforming, filtering, combining, and managing streams of data. ## Getting Started ```go import "github.com/samber/ro" // Basic reactive stream observable := ro.Pipe( ro.Range(0, 10), ro.Filter(func(x int) bool { return x%2 == 0 }), ro.Map(func(x int) string { return fmt.Sprintf("even-%d", x) }), ) // Subscribe to consume values observable.Subscribe(ro.OnNext(func(s string) { fmt.Println(s) })) // Output: even-0, even-2, even-4, even-6, even-8 ``` ## Core Concepts - **Observable**: A stream of data that emits values over time - **Observer**: Consumes values from an Observable via Next, Error, and Complete events - **Operator**: Functions that transform, filter, or combine Observables - **Subscription**: Represents the execution of an Observable that can be cancelled ## Core Operators ### Creation Operators - `Of`, `Just` - Create Observable from specified values - `Start` - Create Observable that emits a single lazily-evaluated value - `Timer` - Emit after specified duration - `Interval` - Emit sequential numbers at time intervals - `IntervalWithInitial` - Like Interval but with custom initial interval - `Range` - Emit range of integers - `RangeWithStep` - Emit range of floats with custom step - `RangeWithInterval` - Emit range of integers with time intervals - `RangeWithStepAndInterval` - Emit range of floats with step and intervals - `FromSlice` - Create Observable from slice - `FromChannel` - Create Observable from channel - `Empty` - Emit no values and complete - `Never` - Never emit or complete - `Throw` - Emit an error - `Defer` - Create Observable lazily for each Observer - `Future` - Create Observable from async function returning value/error - `Repeat` - Emit a single value multiple times - `RepeatWithInterval` - Emit a single value multiple times with intervals - `RandIntN` - Emit random integers in range [0, n) - `RandFloat64` - Emit random float64 values in range [0, 1) - `Merge` - Merge multiple Observables (static version) - `CombineLatest2/3/4/5` - Combine latest values from 2-5 Observables - `CombineLatestAny` - Combine latest values from any Observables - `Zip2/3/4/5/6` - Combine values from 2-6 Observables in order - `Zip` - Combine values from multiple Observables in order - `Concat` - Concatenate Observables sequentially - `Race` - Emit from first Observable to emit - `Amb` - Alias for Race ### Transformation Operators - `Map` - Transform each item using a function - `MapTo` - Map each item to a constant value - `MapErr` - Transform with error handling - `FlatMap` - Map to Observables and flatten - `Flatten` - Flatten Observable of arrays - `Cast` - Convert values to specified type - `Scan` - Accumulate values with seed - `GroupBy` - Group items by key - `BufferWhen` - Buffers items until boundary Observable emits - `BufferWithTimeOrCount` - Buffers by time or count - `BufferWithCount` - Buffers by count - `BufferWithTime` - Buffers by time - `WindowWhen` - Creates windows based on boundary Observable - `SampleWhen` - Samples latest value when tick Observable emits - `SampleTime` - Samples values at time intervals - `ThrottleWhen` - Throttles using tick Observable - `ThrottleTime` - Throttles for time duration ### Filtering Operators - `Filter` - Emit items passing predicate test - `Distinct` - Suppress duplicate items - `DistinctBy` - Suppress duplicate items, based on key selector - `IgnoreElements` - Ignores all items, only termination notifications - `Take` - Emit only first n items - `Skip` - Skip first n items - `TakeWhile` - Take items while condition is true - `SkipWhile` - Skip items while condition is true - `TakeLast` - Emit only last n items - `SkipLast` - Suppresses last n items - `TakeUntil` - Takes items until signal Observable emits - `SkipUntil` - Skips items until signal Observable emits - `First` - Emit first item matching predicate - `Last` - Emit last item matching predicate - `Head` - Emit only first item (error if empty) - `Tail` - Emit only last item (error if empty) - `ElementAt` - Emit nth item - `ElementAtOrDefault` - Emit nth item or fallback ### Combining Operators - `Merge` - Merge multiple Observables - `MergeWith` - Merge with 1 Observable (alias for MergeWith1) - `MergeWith1/2/3/4/5` - Merge with 1-5 Observables - `MergeAll` - Merges higher-order Observable - `MergeMap` - Maps to Observables then merges - `Concat` - Concatenate Observables sequentially - `ConcatWith` - Concatenates with other Observables - `ConcatAll` - Concatenates higher-order Observable - `CombineLatestWith` - Combine with 1 Observable (alias for CombineLatestWith1) - `CombineLatestWith1/2/3/4` - Combine with 1-4 Observables - `CombineLatestAll` - Combine all Observables from higher-order Observable - `CombineLatestAllAny` - CombineLatestAll for any type - `ZipWith` - Zip with 1 Observable (alias for ZipWith1) - `ZipWith1/2/3/4/5` - Zip with 1-5 Observables - `ZipAll` - Zips all Observables from higher-order Observable - `Race` - Emit from first Observable to emit - `RaceWith` - Races with other Observables - `StartWith` - Emit values before source - `EndWith` - Emit values after source - `Pairwise` - Emit pairs of consecutive values ### Error Handling Operators - `Catch` - Catch errors and return fallback Observable - `OnErrorResumeNextWith` - Continues with fallback Observables on error - `OnErrorReturn` - Emit fallback value on error - `Retry` - Retries infinitely on error - `RetryWithConfig` - Retries with configurable options - `ThrowIfEmpty` - Throws error if source is empty - `DoWhile` - Repeats while condition is true (do-while loop) - `While` - Repeats while condition is true (while loop) ### Math & Aggregation Operators - `Count` - Count number of items - `Sum` - Sum numeric values - `Average` - Calculate average of numeric values - `Min` - Emit minimum value - `Max` - Emit maximum value - `Clamp` - Clamp values within bounds - `Abs` - Emit absolute values - `Round` - Round float values - `Floor` - Emit floor of values - `FloorWithPrecision` - Floor values with any integer precision (positive or negative) - `Ceil` / `CeilWithPrecision` - Emit ceiling of values (optionally with precision) - `Trunc` - Emit truncated values - `Reduce` - Reduce to single value with accumulator ### Utility Operators - `Tap` / `Do` - Perform side effects (alias for each other) - `TapOnNext` / `DoOnNext` - Side effects for Next notifications - `TapOnError` / `DoOnError` - Side effects for Error notifications - `TapOnComplete` / `DoOnComplete` - Side effects for Complete notifications - `TapOnSubscribe` / `DoOnSubscribe` - Side effects on subscription - `TapOnFinalize` / `DoOnFinalize` - Side effects on unsubscription - `Delay` - Delay all notifications by duration - `DelayEach` - Delay each item by duration - `Timeout` - Error if no item within duration - `Timestamp` - Emit values with timestamp - `TimeInterval` - Emit values with time elapsed between emissions - `Materialize` - Convert to Notification stream - `Dematerialize` - Convert from Notification stream - `RepeatWith` - Repeats source Observable n times - `Serialize` - Ensures thread-safe message passing by wrapping observable in SafeObservable ### Conditional Operators - `All` - Test if all items satisfy condition - `Contains` - Test if any item satisfies condition - `Find` - Find first item matching condition - `DefaultIfEmpty` - Emit default value if source is empty - `Iif` - Return Observable based on condition - `SequenceEqual` - Tests if two Observables are equal ### Context Operators - `ContextWithValue` - Add key-value to context - `ContextWithTimeout` - Add timeout to context - `ContextWithTimeoutCause` - Add timeout with cause to context - `ContextWithDeadline` - Add deadline to context - `ContextWithDeadlineCause` - Add deadline with cause to context - `ContextReset` - Reset context to new context - `ContextMap` - Map context using function - `ThrowOnContextCancel` - Throws error if context is cancelled ### Connectable Operators - `Share` - Share Observable among multiple subscribers - `ShareWithConfig` - Share with custom configuration - `ShareReplay` - Share with replay buffer - `ShareReplayWithConfig` - ShareReplay with custom configuration ### Sink Operators - `ToSlice` - Collect all items into a slice - `ToMap` - Collect items into a map - `ToChannel` - Forward items to a channel ## Available Plugins ### Data Manipulation - **bytes** - Byte slice manipulation operators - **strings** - String manipulation operators (Capitalize, CamelCase, SnakeCase, etc.) - **sort** - Sorting operators - **time** - Time manipulation - **exp/simd** - SIMD-accelerated math operators (Add, Sub, Min, Max, Clamp...) ### Encoding & Serialization - **encoding/json** - JSON marshaling and unmarshaling - **encoding/csv** - CSV reading and writing - **encoding/base64** - Base64 encoding and decoding - **encoding/gob** - Go binary serialization ### Scheduling & Timing - **cron** - Schedule jobs using cron expressions or intervals - **ICS** - Read and parse ICS/iCal calendars ### Network & I/O - **http/client** - HTTP request operators - **io** - File and stream I/O operators - **fsnotify** - File system monitoring operators - **websocket/client** - WebSocket client operators ### Observability & Logging - **observability/log** - Standard logging operators - **observability/zap** - Structured logging with zap - **observability/logrus** - Structured logging with logrus - **observability/slog** - Structured logging with slog - **observability/zerolog** - Structured logging with zerolog - **observability/sentry** - Error tracking with Sentry ### Rate Limiting - **ratelimit/native** - Native rate limiting operators - **ratelimit/ulule** - Rate limiting with ulule/limiter ### Text Processing - **regexp** - Regular expression operators - **template** - Template processing operators ### System Integration - **proc** - Process execution operators - **signal** - Signal handling operators - **iter** - Iterator operators ### Data Validation - **ozzo/ozzo-validation** - Data validation operators ### Utilities - **hyperloglog** - Cardinality estimation operators - **samber/hot** - In-memory cache - **samber/psi** - Starvation notifier - **testify** - Testing utilities ## AI Agent Skill: ```bash npx skills add https://github.com/samber/cc-skills-golang --skill golang-samber-ro ``` ## Examples ### HTTP Requests with Error Handling ```go import rohttp "github.com/samber/ro/plugin/http/client" urls := []string{ "https://api.github.com/users/samber", "https://api.github.com/users/ansible", } obs := ro.Pipe( ro.FromSlice(urls), ro.MergeMap(func(url string) ro.Observable[[]byte] { return rohttp.Get[[]byte](url) }), ro.Map(func(body []byte] map[string]any { var user map[string]any json.Unmarshal(body, &user) return user }), ro.Map(func(user map[string]any) string { return user["login"].(string) }), ) obs.Subscribe(ro.OnNext(func(login string) { fmt.Printf("User: %s\n", login) })) ``` ### Real-time File Monitoring ```go import rofsnotify "github.com/samber/ro/plugin/fsnotify" obs := rofsnotify.Watch("./config.yaml") obs.Subscribe( ro.OnNext(func(event fsnotify.Event) { fmt.Printf("File event: %s\n", event.Op.String()) }), ro.OnError(func(err error) { fmt.Printf("Error: %v\n", err) }), ) ``` ### Data Pipeline with Validation ```go import rovalidation "github.com/samber/ro/plugin/ozzo/ozzo-validation" type User struct { Name string `json:"name"` Email string `json:"email"` Age int `json:"age"` } obs := ro.Pipe( ro.FromChannel(userChannel), rovalidation.Validate(func(user User) error { return validation.ValidateStruct(&user, validation.Field(&user.Name, validation.Required), validation.Field(&user.Email, validation.Required, is.Email), validation.Field(&user.Age, validation.Required, validation.Min(18)), ) }), ro.Map(func(user User) string { return fmt.Sprintf("Valid user: %s (%s)", user.Name, user.Email) }), ) obs.Subscribe(ro.OnNext(func(msg string) { fmt.Println(msg) })) ``` ### Rate Limited Processing ```go import rorate "github.com/samber/ro/plugin/ratelimit/native" obs := ro.Pipe( ro.Range(0, 100), rorate.Limit(10), // max 10 items per second ro.Map(func(i int) string { return fmt.Sprintf("Processing item %d", i) }), ) obs.Subscribe(ro.OnNext(func(msg string) { fmt.Println(msg) })) ``` ## Documentation - [Official Documentation](https://ro.samber.dev) - [API Reference](https://pkg.go.dev/github.com/samber/ro) - [Examples](https://github.com/samber/ro/tree/main/examples) ## Comparison with other libraries `samber/ro` vs `samber/lo`: | Feature | `samber/lo` | `samber/ro` | |---------|-------------|-------------| | Usage | Loop over in-memory objects | Manipulate event-driven objects | | Data structure | Slices, maps, iterators | Streams, observables, observers | | Model | Synchronous | Asynchronous | | Size | Finite sequences | Infinite streams | | Flow control | Pull-based (iterators) | Push-based | `samber/lo` is a bunch of helpers for looping across finite sequences (maps, slices...) `samber/ro` is focused on processing of infinite data streams in event-driven applications.