Help improve this documentation
This documentation is still new and evolving. If you spot any mistakes, unclear explanations, or missing details, please open an issue.
Your feedback helps us improve!
Sink operators
This page lists all sink operators available in the core package of ro.
ToSlice
Collects all emissions from the source Observable into a single slice and emits that slice when the source completes.
obs := ro.Pipe(
ro.Just(1, 2, 3, 4, 5),
ro.ToSlice[int](),
)
sub := obs.Subscribe(ro.PrintObserver[[]int]())
defer sub.Unsubscribe()
// Next: [1 2 3 4 5]
// Completed
With empty observable
obs := ro.Pipe(
ro.Empty[int](),
ro.ToSlice[int](),
)
sub := obs.Subscribe(ro.PrintObserver[[]int]())
defer sub.Unsubscribe()
// Next: [] (empty slice)
// Completed
With error handling
obs := ro.Pipe(
ro.Pipe(
ro.Just(1, 2, 3),
ro.MapErr(func(i int) (int, error) {
if i == 3 {
return 0, fmt.Errorf("error on 3")
}
return i, nil
}),
),
ro.ToSlice[int](),
)
sub := obs.Subscribe(ro.NewObserver(
func(value []int) {
fmt.Printf("Slice: %v\n", value)
},
func(err error) {
fmt.Printf("Error: %v\n", err)
},
func() {
fmt.Println("Complete")
},
))
defer sub.Unsubscribe()
// Error: error on 3
// (No slice emitted due to error)
With hot observable
source := ro.Interval(100 * time.Millisecond)
obs := ro.Pipe(
source,
ro.Take[int64](5),
ro.ToSlice[int64](),
)
sub := obs.Subscribe(ro.PrintObserver[[]int64]())
time.Sleep(700 * time.Millisecond)
sub.Unsubscribe()
// Next: [0 1 2 3 4]
// Completed
With large data sets
obs := ro.Pipe(
ro.Range(1, 1000),
ro.ToSlice[int](),
)
sub := obs.Subscribe(ro.OnNext(func(value []int) {
fmt.Printf("Collected %d items\n", len(value))
fmt.Printf("First: %d, Last: %d\n", value[0], value[len(value)-1])
}))
defer sub.Unsubscribe()
// Collected 1000 items
// First: 1, Last: 1000
With conditional emission
obs := ro.Pipe(
ro.Range(1, 10),
ro.Filter(func(i int) bool {
return i%2 == 0 // Only even numbers
}),
ro.ToSlice[int](),
)
sub := obs.Subscribe(ro.PrintObserver[[]int]())
defer sub.Unsubscribe()
// Next: [2 4 6 8]
// Completed
With transformation pipeline
obs := ro.Pipe(
ro.Just("hello", "world", "reactive", "programming"),
ro.Map(func(s string) int {
return len(s)
}),
ro.Filter(func(length int) bool {
return length > 4
}),
ro.ToSlice[int](),
)
sub := obs.Subscribe(ro.PrintObserver[[]int]())
defer sub.Unsubscribe()
// Next: [5 5 8 11]
// Completed
With async operations
obs := ro.Pipe(
ro.Pipe(
ro.Just("url1", "url2", "url3"),
ro.MapAsync(func(url string) ro.Observable[string] {
return ro.Defer(func() ro.Observable[string] {
time.Sleep(50 * time.Millisecond)
return ro.Just("data_" + url)
})
}, 2),
),
ro.ToSlice[string](),
)
sub := obs.Subscribe(ro.PrintObserver[[]string]())
time.Sleep(300 * time.Millisecond)
defer sub.Unsubscribe()
// Next: [data_url1 data_url2 data_url3]
// Completed
With real-time data collection
// Simulate sensor readings
sensorData := ro.Interval(1 * time.Second)
obs := ro.Pipe(
sensorData,
ro.Take[int64](10),
ro.Map(func(timestamp int64) float64 {
// Simulate temperature readings
return 20.0 + rand.Float64()*10
}),
ro.ToSlice[float64](),
)
sub := obs.Subscribe(ro.OnNext(func(readings []float64) {
fmt.Printf("Collected %d temperature readings\n", len(readings))
avg := 0.0
for _, r := range readings {
avg += r
}
avg /= float64(len(readings))
fmt.Printf("Average temperature: %.2f°C\n", avg)
}))
time.Sleep(12 * time.Second)
sub.Unsubscribe()
With batch processing
// Process items in batches but collect results
obs := ro.Pipe(
ro.Range(1, 25),
ro.BufferWithCount[int](5),
ro.Map(func(batch []int) string {
sum := 0
for _, num := range batch {
sum += num
}
return fmt.Sprintf("batch-sum-%d", sum)
}),
ro.ToSlice[string](),
)
sub := obs.Subscribe(ro.PrintObserver[[]string]())
defer sub.Unsubscribe()
// Next: [batch-sum-15 batch-sum-40 batch-sum-65 batch-sum-90 batch-sum-115]
// Completed
Prototype:
func ToSlice[T any]()
ToMap
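Collects all emissions from the source Observable into a map and emits that map when the source completes. Each item is turned into a key/value pair by the project function; when two items produce the same key, the last value wins.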
obs := ro.Pipe(
ro.Just("apple", "banana", "cherry"),
ro.ToMap(func(s string) (string, string) {
return s[:1], s // Use first letter as key, whole string as value
}),
)
sub := obs.Subscribe(ro.PrintObserver[map[string]string]())
defer sub.Unsubscribe()
// Next: map[a:apple b:banana c:cherry]
// Completed
ToMapWithValue
type User struct {
id int
name string
}
obs := ro.Pipe(
ro.Just(
User{id: 1, name: "Alice"},
User{id: 2, name: "Bob"},
User{id: 3, name: "Charlie"},
),
ro.ToMapWithValue(
func(u User) int { return u.id },
func(u User) string { return u.name },
),
)
sub := obs.Subscribe(ro.PrintObserver[map[int]string]())
defer sub.Unsubscribe()
// Next: map[1:Alice 2:Bob 3:Charlie]
// Completed
With key collisions (last value wins)
obs := ro.Pipe(
ro.Just("apple", "avocado", "banana"),
ro.ToMap(func(s string) (string, string) {
return s[:1], s // 'a' appears twice
}),
)
sub := obs.Subscribe(ro.PrintObserver[map[string]string]())
defer sub.Unsubscribe()
// Next: map[a:avocado b:banana]
// Completed (avocado overwrites apple)
With empty observable
obs := ro.Pipe(
ro.Empty[string](),
ro.ToMap(func(s string) (int, string) {
return len(s), s
}),
)
sub := obs.Subscribe(ro.PrintObserver[map[int]string]())
defer sub.Unsubscribe()
// Next: map[]
// Completed
With error handling
obs := ro.Pipe(
ro.Pipe(
ro.Just(1, 2, 3),
ro.MapErr(func(i int) (int, error) {
if i == 3 {
return 0, fmt.Errorf("error on 3")
}
return i, nil
}),
),
ro.ToMap(func(i int) (int, int) {
return i, i
}),
)
sub := obs.Subscribe(ro.NewObserver(
func(value map[int]int) {
fmt.Printf("Map: %v\n", value)
},
func(err error) {
fmt.Printf("Error: %v\n", err)
},
func() {
fmt.Println("Complete")
},
))
defer sub.Unsubscribe()
// Error: error on 3
// (No map emitted due to error)
With duplicate keys handling
// Example showing how to handle collisions by creating composite values
obs := ro.Pipe(
ro.Just(
"file1.txt",
"file2.txt",
"image1.jpg",
"image2.jpg",
"doc1.pdf",
),
ro.ToMapWithValue(
func(s string) string {
// Extract extension as key
if dot := strings.LastIndex(s, "."); dot > 0 {
return s[dot+1:]
}
return "unknown"
},
func(s string) []string {
// Collect all files for each extension
return []string{s}
},
),
)
sub := obs.Subscribe(ro.PrintObserver[map[string][]string]())
defer sub.Unsubscribe()
// Intended: map[jpg:[image1.jpg image2.jpg] pdf:[doc1.pdf] txt:[file1.txt file2.txt]]
// Actual:   map[jpg:[image2.jpg] pdf:[doc1.pdf] txt:[file2.txt]]
// This won't work as expected since values get overwritten (last value wins).
// See next example for proper collision handling.
With complex value aggregation
// For true duplicate handling, preprocess the data first
type FileGroup struct {
Extension string
Files []string
}
obs := ro.Pipe(
ro.Just("file1.txt", "file2.txt", "image1.jpg", "image2.jpg"),
ro.ToSlice[string](),
ro.Map(func(files []string) map[string][]string {
result := make(map[string][]string)
for _, file := range files {
if dot := strings.LastIndex(file, "."); dot > 0 {
ext := file[dot+1:]
result[ext] = append(result[ext], file)
}
}
return result
}),
)
sub := obs.Subscribe(ro.PrintObserver[map[string][]string]())
defer sub.Unsubscribe()
// Next: map[jpg:[image1.jpg image2.jpg] txt:[file1.txt file2.txt]]
// Completed
With struct transformation
type Product struct {
SKU string
Name string
Price float64
}
obs := ro.Pipe(
ro.Just(
Product{SKU: "P001", Name: "Laptop", Price: 999.99},
Product{SKU: "P002", Name: "Mouse", Price: 29.99},
Product{SKU: "P003", Name: "Keyboard", Price: 79.99},
),
ro.ToMapWithValue(
func(p Product) string { return p.SKU },
func(p Product) Product { return p },
),
)
sub := obs.Subscribe(ro.PrintObserver[map[string]Product]())
defer sub.Unsubscribe()
// Next: map[P001:{SKU:P001 Name:Laptop Price:999.99} P002:{SKU:P002 Name:Mouse Price:29.99} P003:{SKU:P003 Name:Keyboard Price:79.99}]
// Completed
With hot observable
source := ro.Interval(100 * time.Millisecond)
obs := ro.Pipe(
source,
ro.Take[int64](5),
ro.ToMapWithValue(
func(i int64) int { return int(i) },
func(i int64) string { return fmt.Sprintf("item-%d", i) },
),
)
sub := obs.Subscribe(ro.PrintObserver[map[int]string]())
time.Sleep(700 * time.Millisecond)
sub.Unsubscribe()
// Next: map[0:item-0 1:item-1 2:item-2 3:item-3 4:item-4]
// Completed
With filtered data
obs := ro.Pipe(
ro.Range(1, 20),
ro.Filter(func(i int) bool {
return i%3 == 0 // Only multiples of 3
}),
ro.ToMapWithValue(
func(i int) int { return i },
func(i int) string { return fmt.Sprintf("multiple-%d", i) },
),
)
sub := obs.Subscribe(ro.PrintObserver[map[int]string]())
defer sub.Unsubscribe()
// Next: map[3:multiple-3 6:multiple-6 9:multiple-9 12:multiple-12 15:multiple-15 18:multiple-18]
// Completed
With async operations
obs := ro.Pipe(
ro.Pipe(
ro.Just("user1", "user2", "user3"),
ro.MapAsync(func(userID string) ro.Observable[struct {
ID string
Data string
}] {
return ro.Defer(func() ro.Observable[struct {
ID string
Data string
}] {
time.Sleep(50 * time.Millisecond)
return ro.Just(struct {
ID string
Data string
}{
ID: userID,
Data: "data_for_" + userID,
})
})
}, 2),
),
ro.ToMapWithValue(
func(user struct {
ID string
Data string
}) string { return user.ID },
func(user struct {
ID string
Data string
}) string { return user.Data },
),
)
sub := obs.Subscribe(ro.PrintObserver[map[string]string]())
time.Sleep(300 * time.Millisecond)
defer sub.Unsubscribe()
// Next: map[user1:data_for_user1 user2:data_for_user2 user3:data_for_user3]
// Completed
With real-time data collection
// Simulate sensor data collection by sensor ID
sensorReadings := ro.Interval(500 * time.Millisecond)
obs := ro.Pipe(
sensorReadings,
ro.Take[int64](10),
ro.Map(func(timestamp int64) struct {
SensorID string
Value float64
Time int64
} {
sensors := []string{"temp-01", "temp-02", "humidity-01"}
sensor := sensors[int(timestamp)%len(sensors)]
return struct {
SensorID string
Value float64
Time int64
}{
SensorID: sensor,
Value: 20.0 + rand.Float64()*15,
Time: time.Now().Unix(),
}
}),
ro.ToMapWithValue(
func(reading struct {
SensorID string
Value float64
Time int64
}) string { return reading.SensorID },
func(reading struct {
SensorID string
Value float64
Time int64
}) float64 { return reading.Value },
),
)
sub := obs.Subscribe(ro.OnNext(func(readings map[string]float64) {
fmt.Printf("Latest sensor readings: %v\n", readings)
}))
time.Sleep(6 * time.Second)
sub.Unsubscribe()
Prototypes:
func ToMap[T any, K comparable, V any](project func(item T) (K, V))
func ToMapWithContext[T any, K comparable, V any](project func(ctx context.Context, item T) (K, V))
func ToMapI[T any, K comparable, V any](mapper func(item T, index int64) (K, V))
func ToMapIWithContext[T any, K comparable, V any](mapper func(ctx context.Context, item T, index int64) (K, V))
ToChannel
Converts the source Observable to a channel, emitting all values from the Observable through the channel.
obs := ro.Pipe(
ro.Just(1, 2, 3, 4, 5),
ro.ToChannel[int](0), // Unbuffered channel
)
sub := obs.Subscribe(ro.PrintObserver[<-chan int]())
defer sub.Unsubscribe()
// Next: 0xc0000a2000 (channel address)
// Completed
// You would typically consume the channel like this:
// channel := <-sub.Next()
// for value := range channel {
// fmt.Println("Received:", value)
// }
With buffered channel
obs := ro.Pipe(
ro.Just("hello", "world", "reactive"),
ro.ToChannel[string](5), // Buffered channel with capacity 5
)
sub := obs.Subscribe(ro.OnNext(func(ch <-chan string) {
fmt.Println("Channel received, consuming values:")
for value := range ch {
fmt.Printf(" %s\n", value)
}
fmt.Println("Channel closed")
}))
defer sub.Unsubscribe()
// Channel received, consuming values:
// hello
// world
// reactive
// Channel closed
ToChannelWithContext
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
obs := ro.Pipe(
ro.Interval(500*time.Millisecond),
ro.ToChannelWithContext[int64](ctx, 3),
)
sub := obs.Subscribe(ro.OnNext(func(ch <-chan int64) {
fmt.Println("Reading from channel with context:")
for value := range ch {
fmt.Printf(" Value: %d\n", value)
}
fmt.Println("Channel closed (context cancelled or source completed)")
}))
time.Sleep(3 * time.Second)
sub.Unsubscribe()
// Reading from channel with context:
// Value: 0
// Value: 1
// Value: 2
// Value: 3
// Channel closed (context cancelled or source completed)
With hot observable and multiple consumers
// Create a shared channel from a hot observable
source := ro.Interval(200 * time.Millisecond)
channelObs := ro.Pipe(
source,
ro.Take[int64](10),
ro.ToChannel[int64](5),
)
sub := channelObs.Subscribe(ro.PrintObserver[<-chan int64]())
defer sub.Unsubscribe()
// Get the channel
var resultChan <-chan int64
sub = channelObs.Subscribe(ro.OnNext(func(ch <-chan int64) {
resultChan = ch
}))
time.Sleep(100 * time.Millisecond)
sub.Unsubscribe()
if resultChan != nil {
// Multiple goroutines can consume from the same channel
var wg sync.WaitGroup
// Consumer 1
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < 5; i++ {
value, ok := <-resultChan
if !ok {
break
}
fmt.Printf("Consumer 1: %d\n", value)
}
}()
// Consumer 2
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < 5; i++ {
value, ok := <-resultChan
if !ok {
break
}
fmt.Printf("Consumer 2: %d\n", value)
}
}()
wg.Wait()
}
With error handling
obs := ro.Pipe(
ro.Pipe(
ro.Just(1, 2, 3),
ro.MapErr(func(i int) (int, error) {
if i == 3 {
return 0, fmt.Errorf("error on 3")
}
return i, nil
}),
),
ro.ToChannel[int](3),
)
sub := obs.Subscribe(ro.NewObserver(
func(ch <-chan int) {
fmt.Println("Channel received, consuming values:")
for value := range ch {
fmt.Printf(" %d\n", value)
}
fmt.Println("Channel closed")
},
func(err error) {
fmt.Printf("Error: %v\n", err)
},
func() {
fmt.Println("Complete")
},
))
defer sub.Unsubscribe()
// Error: error on 3
// (No channel emitted due to error)
With finite stream and timeout
obs := ro.Pipe(
ro.Just("data1", "data2", "data3", "data4", "data5"),
ro.ToChannel[string](2),
)
sub := obs.Subscribe(ro.OnNext(func(ch <-chan string) {
fmt.Println("Processing channel with timeout:")
timeout := time.After(3 * time.Second)
for {
select {
case value, ok := <-ch:
if !ok {
fmt.Println("Channel closed normally")
return
}
fmt.Printf(" Received: %s\n", value)
case <-timeout:
fmt.Println("Timeout reached")
return
}
}
}))
defer sub.Unsubscribe()
With complex data transformation
type Event struct {
ID string
Type string
Payload interface{}
Time time.Time
}
obs := ro.Pipe(
ro.Just(
Event{ID: "1", Type: "click", Payload: "button", Time: time.Now()},
Event{ID: "2", Type: "scroll", Payload: 100, Time: time.Now()},
Event{ID: "3", Type: "click", Payload: "link", Time: time.Now()},
),
ro.ToChannel[Event](3),
)
sub := obs.Subscribe(ro.OnNext(func(ch <-chan Event) {
fmt.Println("Processing events from channel:")
for event := range ch {
fmt.Printf(" Event %s (%s): %v\n", event.ID, event.Type, event.Payload)
}
fmt.Println("All events processed")
}))
defer sub.Unsubscribe()
With backpressure control
// Fast producer with slow consumer through buffered channel
fastProducer := ro.Interval(10 * time.Millisecond) // 100 values/second
obs := ro.Pipe(
fastProducer,
ro.Take[int64](50),
ro.ToChannel[int64](10), // Buffer of 10 provides backpressure
)
sub := obs.Subscribe(ro.OnNext(func(ch <-chan int64) {
fmt.Println("Processing with backpressure:")
processed := 0
for value := range ch {
// Simulate slow processing
time.Sleep(50 * time.Millisecond)
processed++
if processed%10 == 0 {
fmt.Printf(" Processed %d values (buffer managing backpressure)\n", processed)
}
}
fmt.Printf("Total processed: %d\n", processed)
}))
time.Sleep(3 * time.Second)
sub.Unsubscribe()
With context cancellation
ctx, cancel := context.WithCancel(context.Background())
obs := ro.Pipe(
ro.Interval(100 * time.Millisecond),
ro.ToChannelWithContext[int64](ctx, 5),
)
sub := obs.Subscribe(ro.OnNext(func(ch <-chan int64) {
fmt.Println("Reading from cancellable channel:")
count := 0
// Cancel after receiving 3 values
go func() {
time.Sleep(350 * time.Millisecond)
fmt.Println("Cancelling context...")
cancel()
}()
for value := range ch {
count++
fmt.Printf(" Value %d: %d\n", count, value)
}
fmt.Printf("Channel closed after %d values\n", count)
}))
time.Sleep(1 * time.Second)
sub.Unsubscribe()
// Reading from cancellable channel:
// Value 1: 0
// Value 2: 1
// Value 3: 2
// Cancelling context...
// Channel closed after 3 values
With real-time data streaming
// Simulate real-time sensor data streaming
sensorData := func() ro.Observable[struct {
SensorID string
Temperature float64
Humidity float64
Timestamp time.Time
}] {
return ro.Pipe(
ro.Interval(1 * time.Second),
ro.Map(func(_ int64) struct {
SensorID string
Temperature float64
Humidity float64
Timestamp time.Time
} {
return struct {
SensorID string
Temperature float64
Humidity float64
Timestamp time.Time
}{
SensorID: "sensor-01",
Temperature: 20.0 + rand.Float64()*10,
Humidity: 40.0 + rand.Float64()*20,
Timestamp: time.Now(),
}
}),
)
}
obs := ro.Pipe(
sensorData(),
ro.Take[struct {
SensorID string
Temperature float64
Humidity float64
Timestamp time.Time
}](5),
ro.ToChannel[struct {
SensorID string
Temperature float64
Humidity float64
Timestamp time.Time
}](1),
)
sub := obs.Subscribe(ro.OnNext(
func(ch <-chan struct {
SensorID string
Temperature float64
Humidity float64
Timestamp time.Time
}) {
fmt.Println("Real-time sensor data streaming:")
for reading := range ch {
fmt.Printf(" [%s] %s: %.1f°C, %.1f%%\n",
reading.Timestamp.Format("15:04:05"),
reading.SensorID,
reading.Temperature,
reading.Humidity)
}
fmt.Println("Stream ended")
},
))
time.Sleep(6 * time.Second)
sub.Unsubscribe()
Variant:
Prototypes:
func ToChannel[T any](bufferSize int)
func ToChannelWithContext[T any](ctx context.Context, bufferSize int)