Skip to main content
Help improve this documentation

This documentation is still new and evolving. If you spot any mistakes, unclear explanations, or missing details, please open an issue.

Your feedback helps us improve!

Sink operators

This page lists all sink operators available in the core package of ro.

  • Collects all emissions from the source Observable into a single slice and emits that slice when the source completes.

    obs := ro.Pipe(
    ro.Just(1, 2, 3, 4, 5),
    ro.ToSlice[int](),
    )

    sub := obs.Subscribe(ro.PrintObserver[[]int]())
    defer sub.Unsubscribe()

    // Next: [1 2 3 4 5]
    // Completed

    With empty observable

    obs := ro.Pipe(
    ro.Empty[int](),
    ro.ToSlice[int](),
    )

    sub := obs.Subscribe(ro.PrintObserver[[]int]())
    defer sub.Unsubscribe()

    // Next: [] (empty slice)
    // Completed

    With error handling

    obs := ro.Pipe(
    ro.Pipe(
    ro.Just(1, 2, 3),
    ro.MapErr(func(i int) (int, error) {
    if i == 3 {
    return 0, fmt.Errorf("error on 3")
    }
    return i, nil
    }),
    ),
    ro.ToSlice[int](),
    )

    sub := obs.Subscribe(ro.NewObserver(
    func(value []int) {
    fmt.Printf("Slice: %v\n", value)
    },
    func(err error) {
    fmt.Printf("Error: %v\n", err)
    },
    func() {
    fmt.Println("Complete")
    },
    ))
    defer sub.Unsubscribe()

    // Error: error on 3
    // (No slice emitted due to error)

    With hot observable

    source := ro.Interval(100 * time.Millisecond)
    obs := ro.Pipe(
    source,
    ro.Take[int64](5),
    ro.ToSlice[int64](),
    )

    sub := obs.Subscribe(ro.PrintObserver[[]int64]())
    time.Sleep(700 * time.Millisecond)
    sub.Unsubscribe()

    // Next: [0 1 2 3 4]
    // Completed

    With large data sets

    obs := ro.Pipe(
    ro.Range(1, 1000),
    ro.ToSlice[int](),
    )

    sub := obs.Subscribe(ro.OnNext(func(value []int) {
    fmt.Printf("Collected %d items\n", len(value))
    fmt.Printf("First: %d, Last: %d\n", value[0], value[len(value)-1])
    }))
    defer sub.Unsubscribe()

    // Collected 1000 items
    // First: 1, Last: 1000

    With conditional emission

    obs := ro.Pipe(
    ro.Range(1, 10),
    ro.Filter(func(i int) bool {
    return i%2 == 0 // Only even numbers
    }),
    ro.ToSlice[int](),
    )

    sub := obs.Subscribe(ro.PrintObserver[[]int]())
    defer sub.Unsubscribe()

    // Next: [2 4 6 8]
    // Completed

    With transformation pipeline

    obs := ro.Pipe(
    ro.Just("hello", "world", "reactive", "programming"),
    ro.Map(func(s string) int {
    return len(s)
    }),
    ro.Filter(func(length int) bool {
    return length > 4
    }),
    ro.ToSlice[int](),
    )

    sub := obs.Subscribe(ro.PrintObserver[[]int]())
    defer sub.Unsubscribe()

    // Next: [5 5 8 11]
    // Completed

    With async operations

    obs := ro.Pipe(
    ro.Pipe(
    ro.Just("url1", "url2", "url3"),
    ro.MapAsync(func(url string) ro.Observable[string] {
    return ro.Defer(func() ro.Observable[string] {
    time.Sleep(50 * time.Millisecond)
    return ro.Just("data_" + url)
    })
    }, 2),
    ),
    ro.ToSlice[string](),
    )

    sub := obs.Subscribe(ro.PrintObserver[[]string]())
    time.Sleep(300 * time.Millisecond)
    defer sub.Unsubscribe()

    // Next: [data_url1 data_url2 data_url3]
    // Completed

    With real-time data collection

    // Simulate sensor readings
    sensorData := ro.Interval(1 * time.Second)
    obs := ro.Pipe(
    sensorData,
    ro.Take[int64](10),
    ro.Map(func(timestamp int64) float64 {
    // Simulate temperature readings
    return 20.0 + rand.Float64()*10
    }),
    ro.ToSlice[float64](),
    )

    sub := obs.Subscribe(ro.OnNext(func(readings []float64) {
    fmt.Printf("Collected %d temperature readings\n", len(readings))
    avg := 0.0
    for _, r := range readings {
    avg += r
    }
    avg /= float64(len(readings))
    fmt.Printf("Average temperature: %.2f°C\n", avg)
    }))
    time.Sleep(12 * time.Second)
    sub.Unsubscribe()

    With batch processing

    // Process items in batches but collect results
    obs := ro.Pipe(
    ro.Range(1, 25),
    ro.BufferWithCount[int](5),
    ro.Map(func(batch []int) string {
    sum := 0
    for _, num := range batch {
    sum += num
    }
    return fmt.Sprintf("batch-sum-%d", sum)
    }),
    ro.ToSlice[string](),
    )

    sub := obs.Subscribe(ro.PrintObserver[[]string]())
    defer sub.Unsubscribe()

    // Next: [batch-sum-15 batch-sum-40 batch-sum-65 batch-sum-90 batch-sum-115]
    // Completed
    Prototype:
    func ToSlice[T any]()
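  • Collects all emissions from the source Observable into a map and emits that map when the source completes. Each item is passed to the project function, which returns the key and the value to store; when the same key is produced more than once, the last value wins.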

    obs := ro.Pipe(
    ro.Just("apple", "banana", "cherry"),
    ro.ToMap(func(s string) (string, string) {
    return s[:1], s // Use first letter as key, whole string as value
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[map[string]string]())
    defer sub.Unsubscribe()

    // Next: map[a:apple b:banana c:cherry]
    // Completed

    ToMapWithValue

    type User struct {
    id int
    name string
    }

    obs := ro.Pipe(
    ro.Just(
    User{id: 1, name: "Alice"},
    User{id: 2, name: "Bob"},
    User{id: 3, name: "Charlie"},
    ),
    ro.ToMapWithValue(
    func(u User) int { return u.id },
    func(u User) string { return u.name },
    ),
    )

    sub := obs.Subscribe(ro.PrintObserver[map[int]string]())
    defer sub.Unsubscribe()

    // Next: map[1:Alice 2:Bob 3:Charlie]
    // Completed

    With key collisions (last value wins)

    obs := ro.Pipe(
    ro.Just("apple", "avocado", "banana"),
    ro.ToMap(func(s string) string {
    return s[:1] // 'a' appears twice
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[map[string]string]())
    defer sub.Unsubscribe()

    // Next: map[a:avocado b:banana]
    // Completed (avocado overwrites apple)

    With empty observable

    obs := ro.Pipe(
    ro.Empty[string](),
    ro.ToMap(func(s string) int {
    return len(s)
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[map[int]string]())
    defer sub.Unsubscribe()

    // Next: map[]
    // Completed

    With error handling

    obs := ro.Pipe(
    ro.Pipe(
    ro.Just(1, 2, 3),
    ro.MapErr(func(i int) (int, error) {
    if i == 3 {
    return 0, fmt.Errorf("error on 3")
    }
    return i, nil
    }),
    ),
    ro.ToMap(func(i int) int {
    return i
    }),
    )

    sub := obs.Subscribe(ro.NewObserver(
    func(value map[int]int) {
    fmt.Printf("Map: %v\n", value)
    },
    func(err error) {
    fmt.Printf("Error: %v\n", err)
    },
    func() {
    fmt.Println("Complete")
    },
    ))
    defer sub.Unsubscribe()

    // Error: error on 3
    // (No map emitted due to error)

    With duplicate keys handling

    // Example showing how to handle collisions by creating composite values
    obs := ro.Pipe(
    ro.Just(
    "file1.txt",
    "file2.txt",
    "image1.jpg",
    "image2.jpg",
    "doc1.pdf",
    ),
    ro.ToMapWithValue(
    func(s string) string {
    // Extract extension as key
    if dot := strings.LastIndex(s, "."); dot > 0 {
    return s[dot+1:]
    }
    return "unknown"
    },
    func(s string) []string {
    // Collect all files for each extension
    return []string{s}
    },
    ),
    )

    sub := obs.Subscribe(ro.PrintObserver[map[string][]string]())
    defer sub.Unsubscribe()

    // Desired: map[jpg:[image1.jpg image2.jpg] pdf:[doc1.pdf] txt:[file1.txt file2.txt]]
    // This won't work as expected: each key keeps only its last value, because earlier values get overwritten.
    // See next example for proper collision handling.

    With complex value aggregation

    // For true duplicate handling, preprocess the data first
    type FileGroup struct {
    Extension string
    Files []string
    }

    obs := ro.Pipe(
    ro.Just("file1.txt", "file2.txt", "image1.jpg", "image2.jpg"),
    ro.ToSlice[string](),
    ro.Map(func(files []string) map[string][]string {
    result := make(map[string][]string)
    for _, file := range files {
    if dot := strings.LastIndex(file, "."); dot > 0 {
    ext := file[dot+1:]
    result[ext] = append(result[ext], file)
    }
    }
    return result
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[map[string][]string]())
    defer sub.Unsubscribe()

    // Next: map[jpg:[image1.jpg image2.jpg] txt:[file1.txt file2.txt]]
    // Completed

    With struct transformation

    type Product struct {
    SKU string
    Name string
    Price float64
    }

    obs := ro.Pipe(
    ro.Just(
    Product{SKU: "P001", Name: "Laptop", Price: 999.99},
    Product{SKU: "P002", Name: "Mouse", Price: 29.99},
    Product{SKU: "P003", Name: "Keyboard", Price: 79.99},
    ),
    ro.ToMapWithValue(
    func(p Product) string { return p.SKU },
    func(p Product) Product { return p },
    ),
    )

    sub := obs.Subscribe(ro.PrintObserver[map[string]Product]())
    defer sub.Unsubscribe()

    // Next: map[P001:{SKU:P001 Name:Laptop Price:999.99} P002:{SKU:P002 Name:Mouse Price:29.99} P003:{SKU:P003 Name:Keyboard Price:79.99}]
    // Completed

    With hot observable

    source := ro.Interval(100 * time.Millisecond)
    obs := ro.Pipe(
    source,
    ro.Take[int64](5),
    ro.ToMapWithValue(
    func(i int64) int { return int(i) },
    func(i int64) string { return fmt.Sprintf("item-%d", i) },
    ),
    )

    sub := obs.Subscribe(ro.PrintObserver[map[int]string]())
    time.Sleep(700 * time.Millisecond)
    sub.Unsubscribe()

    // Next: map[0:item-0 1:item-1 2:item-2 3:item-3 4:item-4]
    // Completed

    With filtered data

    obs := ro.Pipe(
    ro.Range(1, 20),
    ro.Filter(func(i int) bool {
    return i%3 == 0 // Only multiples of 3
    }),
    ro.ToMapWithValue(
    func(i int) int { return i },
    func(i int) string { return fmt.Sprintf("multiple-%d", i) },
    ),
    )

    sub := obs.Subscribe(ro.PrintObserver[map[int]string]())
    defer sub.Unsubscribe()

    // Next: map[3:multiple-3 6:multiple-6 9:multiple-9 12:multiple-12 15:multiple-15 18:multiple-18]
    // Completed

    With async operations

    obs := ro.Pipe(
    ro.Pipe(
    ro.Just("user1", "user2", "user3"),
    MapAsync(func(userID string) Observable[struct {
    ID string
    Data string
    }] {
    return Defer(func() Observable[struct {
    ID string
    Data string
    }] {
    time.Sleep(50 * time.Millisecond)
    return ro.Just(struct {
    ID string
    Data string
    }{
    ID: userID,
    Data: "data_for_" + userID,
    })
    })
    }, 2),
    ),
    ro.ToMapWithValue(
    func(user struct {
    ID string
    Data string
    }) string { return user.ID },
    func(user struct {
    ID string
    Data string
    }) string { return user.Data },
    ),
    )

    sub := obs.Subscribe(ro.PrintObserver[map[string]string]())
    time.Sleep(300 * time.Millisecond)
    defer sub.Unsubscribe()

    // Next: map[user1:data_for_user1 user2:data_for_user2 user3:data_for_user3]
    // Completed

    With real-time data collection

    // Simulate sensor data collection by sensor ID
    sensorReadings := ro.Interval(500 * time.Millisecond)
    obs := ro.Pipe(
    sensorReadings,
    ro.Take[int64](10),
    ro.Map(func(timestamp int64) struct {
    SensorID string
    Value float64
    Time int64
    } {
    sensors := []string{"temp-01", "temp-02", "humidity-01"}
    sensor := sensors[int(timestamp)%len(sensors)]
    return struct {
    SensorID string
    Value float64
    Time int64
    }{
    SensorID: sensor,
    Value: 20.0 + rand.Float64()*15,
    Time: time.Now().Unix(),
    }
    }),
    ro.ToMapWithValue(
    func(reading struct {
    SensorID string
    Value float64
    Time int64
    }) string { return reading.SensorID },
    func(reading struct {
    SensorID string
    Value float64
    Time int64
    }) float64 { return reading.Value },
    ),
    )

    sub := obs.Subscribe(ro.OnNext(func(readings map[string]float64) {
    fmt.Printf("Latest sensor readings: %v\n", readings)
    }))
    time.Sleep(6 * time.Second)
    sub.Unsubscribe()
    Prototypes:
    func ToMap[T any, K comparable, V any](project func(item T) (K, V))
    func ToMapWithContext[T any, K comparable, V any](project func(ctx context.Context, item T) (K, V))
    func ToMapI[T any, K comparable, V any](mapper func(item T, index int64) (K, V))
    func ToMapIWithContext[T any, K comparable, V any](mapper func(ctx context.Context, item T, index int64) (K, V))
  • Converts the source Observable to a channel, emitting all values from the Observable through the channel.

    obs := ro.Pipe(
    ro.Just(1, 2, 3, 4, 5),
    ro.ToChannel[int](0), // Unbuffered channel
    )

    sub := obs.Subscribe(ro.PrintObserver[<-chan int]())
    defer sub.Unsubscribe()

    // Next: 0xc0000a2000 (channel address)
    // Completed

    // You would typically consume the channel like this:
    // channel := <-sub.Next()
    // for value := range channel {
    // fmt.Println("Received:", value)
    // }

    With buffered channel

    obs := ro.Pipe(
    ro.Just("hello", "world", "reactive"),
    ro.ToChannel[string](5), // Buffered channel with capacity 5
    )

    sub := obs.Subscribe(ro.OnNext(func(ch <-chan string) {
    fmt.Println("Channel received, consuming values:")
    for value := range ch {
    fmt.Printf(" %s\n", value)
    }
    fmt.Println("Channel closed")
    }))
    defer sub.Unsubscribe()

    // Channel received, consuming values:
    // hello
    // world
    // reactive
    // Channel closed

    ToChannelWithContext

    ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
    defer cancel()

    obs := ro.Pipe(
    ro.Interval(500*time.Millisecond),
    ro.ToChannelWithContext[int64](ctx, 3),
    )

    sub := obs.Subscribe(ro.OnNext(func(ch <-chan int64) {
    fmt.Println("Reading from channel with context:")
    for value := range ch {
    fmt.Printf(" Value: %d\n", value)
    }
    fmt.Println("Channel closed (context cancelled or source completed)")
    }))
    time.Sleep(3 * time.Second)
    sub.Unsubscribe()

    // Reading from channel with context:
    // Value: 0
    // Value: 1
    // Value: 2
    // Value: 3
    // Channel closed (context cancelled or source completed)

    With hot observable and multiple consumers

    // Create a shared channel from a hot observable
    source := ro.Interval(200 * time.Millisecond)
    channelObs := ro.Pipe(
    source,
    ro.Take[int64](10),
    ro.ToChannel[int64](5),
    )

    sub := channelObs.Subscribe(ro.PrintObserver[<-chan int64]())
    defer sub.Unsubscribe()

    // Get the channel
    var resultChan <-chan int64
    sub = channelObs.Subscribe(ro.OnNext(func(ch <-chan int64) {
    resultChan = ch
    }))
    time.Sleep(100 * time.Millisecond)
    sub.Unsubscribe()

    if resultChan != nil {
    // Multiple goroutines can consume from the same channel
    var wg sync.WaitGroup

    // Consumer 1
    wg.Add(1)
    go func() {
    defer wg.Done()
    for i := 0; i < 5; i++ {
    value, ok := <-resultChan
    if !ok {
    break
    }
    fmt.Printf("Consumer 1: %d\n", value)
    }
    }()

    // Consumer 2
    wg.Add(1)
    go func() {
    defer wg.Done()
    for i := 0; i < 5; i++ {
    value, ok := <-resultChan
    if !ok {
    break
    }
    fmt.Printf("Consumer 2: %d\n", value)
    }
    }()

    wg.Wait()
    }

    With error handling

    obs := ro.Pipe(
    ro.Pipe(
    ro.Just(1, 2, 3),
    ro.MapErr(func(i int) (int, error) {
    if i == 3 {
    return 0, fmt.Errorf("error on 3")
    }
    return i, nil
    }),
    ),
    ro.ToChannel[int](3),
    )

    sub := obs.Subscribe(ro.NewObserver(
    func(ch <-chan int) {
    fmt.Println("Channel received, consuming values:")
    for value := range ch {
    fmt.Printf(" %d\n", value)
    }
    fmt.Println("Channel closed")
    },
    func(err error) {
    fmt.Printf("Error: %v\n", err)
    },
    func() {
    fmt.Println("Complete")
    },
    ))
    defer sub.Unsubscribe()

    // Error: error on 3
    // (No channel emitted due to error)

    With finite stream and timeout

    obs := ro.Pipe(
    ro.Just("data1", "data2", "data3", "data4", "data5"),
    ro.ToChannel[string](2),
    )

    sub := obs.Subscribe(ro.OnNext(func(ch <-chan string) {
    fmt.Println("Processing channel with timeout:")

    timeout := time.After(3 * time.Second)
    for {
    select {
    case value, ok := <-ch:
    if !ok {
    fmt.Println("Channel closed normally")
    return
    }
    fmt.Printf(" Received: %s\n", value)
    case <-timeout:
    fmt.Println("Timeout reached")
    return
    }
    }
    }))
    defer sub.Unsubscribe()

    With complex data transformation

    type Event struct {
    ID string
    Type string
    Payload interface{}
    Time time.Time
    }

    obs := ro.Pipe(
    ro.Just(
    Event{ID: "1", Type: "click", Payload: "button", Time: time.Now()},
    Event{ID: "2", Type: "scroll", Payload: 100, Time: time.Now()},
    Event{ID: "3", Type: "click", Payload: "link", Time: time.Now()},
    ),
    ro.ToChannel[Event](3),
    )

    sub := obs.Subscribe(ro.OnNext(func(ch <-chan Event) {
    fmt.Println("Processing events from channel:")
    for event := range ch {
    fmt.Printf(" Event %s (%s): %v\n", event.ID, event.Type, event.Payload)
    }
    fmt.Println("All events processed")
    }))
    defer sub.Unsubscribe()

    With backpressure control

    // Fast producer with slow consumer through buffered channel
    fastProducer := ro.Interval(10 * time.Millisecond) // 100 values/second
    obs := ro.Pipe(
    fastProducer,
    ro.Take[int64](50),
    ro.ToChannel[int64](10), // Buffer of 10 provides backpressure
    )

    sub := obs.Subscribe(ro.OnNext(func(ch <-chan int64) {
    fmt.Println("Processing with backpressure:")
    processed := 0
    for value := range ch {
    // Simulate slow processing
    time.Sleep(50 * time.Millisecond)
    processed++
    if processed%10 == 0 {
    fmt.Printf(" Processed %d values (buffer managing backpressure)\n", processed)
    }
    }
    fmt.Printf("Total processed: %d\n", processed)
    }))
    time.Sleep(3 * time.Second)
    sub.Unsubscribe()

    With context cancellation

    ctx, cancel := context.WithCancel(context.Background())

    obs := ro.Pipe(
    ro.Interval(100 * time.Millisecond),
    ro.ToChannelWithContext[int64](ctx, 5),
    )

    sub := obs.Subscribe(ro.OnNext(func(ch <-chan int64) {
    fmt.Println("Reading from cancellable channel:")
    count := 0

    // Cancel after receiving 3 values
    go func() {
    time.Sleep(350 * time.Millisecond)
    fmt.Println("Cancelling context...")
    cancel()
    }()

    for value := range ch {
    count++
    fmt.Printf(" Value %d: %d\n", count, value)
    }
    fmt.Printf("Channel closed after %d values\n", count)
    }))
    time.Sleep(1 * time.Second)
    sub.Unsubscribe()

    // Reading from cancellable channel:
    // Value 1: 0
    // Value 2: 1
    // Value 3: 2
    // Cancelling context...
    // Channel closed after 3 values

    With real-time data streaming

    // Simulate real-time sensor data streaming
    sensorData := func() Observable[struct {
    SensorID string
    Temperature float64
    Humidity float64
    Timestamp time.Time
    }] {
    return ro.Pipe(
    ro.Interval(1 * time.Second),
    ro.Map(func(_ int64) struct {
    SensorID string
    Temperature float64
    Humidity float64
    Timestamp time.Time
    } {
    return struct {
    SensorID string
    Temperature float64
    Humidity float64
    Timestamp time.Time
    }{
    SensorID: "sensor-01",
    Temperature: 20.0 + rand.Float64()*10,
    Humidity: 40.0 + rand.Float64()*20,
    Timestamp: time.Now(),
    }
    }),
    )
    }

    obs := ro.Pipe(
    sensorData(),
    ro.Take[struct {
    SensorID string
    Temperature float64
    Humidity float64
    Timestamp time.Time
    }](5),
    ro.ToChannel[struct {
    SensorID string
    Temperature float64
    Humidity float64
    Timestamp time.Time
    }](1),
    )

    sub := obs.Subscribe(ro.OnNext(
    func(ch <-chan struct {
    SensorID string
    Temperature float64
    Humidity float64
    Timestamp time.Time
    }) {
    fmt.Println("Real-time sensor data streaming:")
    for reading := range ch {
    fmt.Printf(" [%s] %s: %.1f°C, %.1f%%\n",
    reading.Timestamp.Format("15:04:05"),
    reading.SensorID,
    reading.Temperature,
    reading.Humidity)
    }
    fmt.Println("Stream ended")
    },
    ))
    time.Sleep(6 * time.Second)
    sub.Unsubscribe()
    Similar:
    Prototypes:
    func ToChannel[T any](bufferSize int)
    func ToChannelWithContext[T any](ctx context.Context, bufferSize int)