Skip to main content
Help improve this documentation

This documentation is still new and evolving. If you spot any mistakes, unclear explanations, or missing details, please open an issue.

Your feedback helps us improve!

Transformation operatorsโ€‹

This page lists all transformation operations, available in the core package of ro.

  • Applies a given project function to each item emitted by the source Observable, and emits the results of these function applications as an Observable sequence.

    obs := ro.Pipe(
    ro.Just(1, 2, 3, 4, 5),
    ro.Map(func(i int) string {
    return fmt.Sprintf("Item-%d", i)
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[string]())
    defer sub.Unsubscribe()

    // Next: Item-1
    // Next: Item-2
    // Next: Item-3
    // Next: Item-4
    // Next: Item-5
    // Completed

    With context

    obs := ro.Pipe(
    ro.Just(1, 2, 3, 4, 5),
    ro.MapWithContext(func(ctx context.Context, i int) (context.Context, string) {
    return ctx, fmt.Sprintf("Item-%d", i)
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[string]())
    defer sub.Unsubscribe()

    // Next: Item-1
    // Next: Item-2
    // Next: Item-3
    // Next: Item-4
    // Next: Item-5
    // Completed

    With index

    obs := ro.Pipe(
    ro.Just(1, 2, 3, 4, 5),
    ro.MapI(func(i int, index int64) string {
    return fmt.Sprintf("Item-%d-Index-%d", i, index)
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[string]())
    defer sub.Unsubscribe()

    // Next: Item-1-Index-0
    // Next: Item-2-Index-1
    // Next: Item-3-Index-2
    // Next: Item-4-Index-3
    // Next: Item-5-Index-4
    // Completed

    With index and context

    obs := ro.Pipe(
    ro.Just(1, 2, 3, 4, 5),
    ro.MapIWithContext(func(ctx context.Context, i int, index int64) (context.Context, string) {
    return ctx, fmt.Sprintf("Item-%d-Index-%d", i, index)
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[string]())
    defer sub.Unsubscribe()

    // Next: Item-1-Index-0
    // Next: Item-2-Index-1
    // Next: Item-3-Index-2
    // Next: Item-4-Index-3
    // Next: Item-5-Index-4
    // Completed
    Prototypes:
    func Map[T any, R any](project func(item T) R)
    func MapWithContext[T any, R any](project func(ctx context.Context, item T) (context.Context, R))
    func MapI[T any, R any](project func(item T, index int64) R)
    func MapIWithContext[T any, R any](project func(ctx context.Context, item T, index int64) (context.Context, R))
  • Applies a given project function to each item emitted by the source Observable, where the project function returns an Observable, and then flattens the resulting Observables into a single Observable.

    obs := ro.Pipe(
    ro.Just(1, 2, 3),
    ro.FlatMap(func(i int) Observable[int] {
    return ro.Just(i*10, i*10+1, i*10+2)
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Order may vary due to interleaving
    // Next: 10
    // Next: 20
    // Next: 11
    // Next: 30
    // Next: 21
    // Next: 31
    // Next: 12
    // Next: 22
    // Next: 32
    // Completed

    With context

    obs := ro.Pipe(
    ro.Just(1, 2, 3),
    ro.FlatMapWithContext(func(ctx context.Context, i int) Observable[int] {
    return ro.Just(i*10, i*10+1, i*10+2)
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Order may vary due to interleaving
    // Next: 10
    // Next: 20
    // Next: 11
    // Next: 30
    // Next: 21
    // Next: 31
    // Next: 12
    // Next: 22
    // Next: 32
    // Completed

    With index

    obs := ro.Pipe(
    ro.Just(1, 2, 3),
    ro.FlatMapI(func(i int, index int64) Observable[int] {
    return ro.Just(i*10+int(index))
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Order may vary due to interleaving
    // Next: 10
    // Next: 20
    // Next: 11
    // Next: 30
    // Next: 21
    // Next: 31
    // Completed

    With index and context

    obs := ro.Pipe(
    ro.Just(1, 2, 3),
    ro.FlatMapIWithContext(func(ctx context.Context, i int, index int64) Observable[int] {
    return ro.Just(i*10+int(index))
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Order may vary due to interleaving
    // Next: 10
    // Next: 20
    // Next: 11
    // Next: 30
    // Next: 21
    // Next: 31
    // Completed

    Practical example: Converting single items to multiple

    obs := ro.Pipe(
    ro.Just("hello", "world"),
    ro.FlatMap(func(s string) Observable[rune] {
    runes := []rune(s)
    return ro.Just(runes...)
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[rune]())
    defer sub.Unsubscribe()

    // Order may vary due to interleaving
    // Next: 'h'
    // Next: 'e'
    // Next: 'l'
    // Next: 'l'
    // Next: 'o'
    // Next: 'w'
    // Next: 'o'
    // Next: 'r'
    // Next: 'l'
    // Next: 'd'
    // Completed
    Prototypes:
    func FlatMap[T any, R any](project func(item T) Observable[R])
    func FlatMapWithContext[T any, R any](project func(ctx context.Context, item T) Observable[R])
    func FlatMapI[T any, R any](project func(item T, index int64) Observable[R])
    func FlatMapIWithContext[T any, R any](project func(ctx context.Context, item T, index int64) Observable[R])
  • Applies an accumulator function over an Observable sequence, and returns each intermediate result, with an optional seed value.

    obs := ro.Pipe(
    ro.Just(1, 2, 3, 4, 5),
    ro.Scan(func(acc int, item int) int {
    return acc + item
    }, 0),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Next: 1
    // Next: 3
    // Next: 6
    // Next: 10
    // Next: 15
    // Completed

    With context

    obs := ro.Pipe(
    ro.Just(1, 2, 3, 4, 5),
    ro.ScanWithContext(func(ctx context.Context, acc int, item int) (context.Context, int) {
    return ctx, acc + item
    }, 0),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Next: 1
    // Next: 3
    // Next: 6
    // Next: 10
    // Next: 15
    // Completed

    With index

    obs := ro.Pipe(
    ro.Just(1, 2, 3, 4, 5),
    ro.ScanI(func(acc int, item int, index int64) int {
    return acc + (item * int(index+1)) // Multiply by position
    }, 0),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Next: 1 (0 + 1*1)
    // Next: 5 (1 + 2*2)
    // Next: 14 (5 + 3*3)
    // Next: 30 (14 + 4*4)
    // Next: 55 (30 + 5*5)
    // Completed

    With index and context

    obs := ro.Pipe(
    ro.Just(1, 2, 3, 4, 5),
    ro.ScanIWithContext(func(ctx context.Context, acc int, item int, index int64) (context.Context, int) {
    return ctx, acc + (item * int(index+1))
    }, 0),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Next: 1 (0 + 1*1)
    // Next: 5 (1 + 2*2)
    // Next: 14 (5 + 3*3)
    // Next: 30 (14 + 4*4)
    // Next: 55 (30 + 5*5)
    // Completed

    Practical example: Building a string

    obs := ro.Pipe(
    ro.Just("hello", "world", "rx"),
    Scan(func(acc string, item string) string {
    if acc == "" {
    return item
    }
    return acc + " " + item
    }, ""),
    )

    sub := obs.Subscribe(ro.PrintObserver[string]())
    defer sub.Unsubscribe()

    // Next: "hello"
    // Next: "hello world"
    // Next: "hello world rx"
    // Completed
    Prototypes:
    func Scan[T any, R any](reduce func(accumulator R, item T) R, seed R)
    func ScanWithContext[T any, R any](reduce func(ctx context.Context, accumulator R, item T) (context.Context, R), seed R)
    func ScanI[T any, R any](reduce func(accumulator R, item T, index int64) R, seed R)
    func ScanIWithContext[T any, R any](reduce func(ctx context.Context, accumulator R, item T, index int64) (context.Context, R), seed R)
  • BufferWhenโ€‹

    Buffers the source Observable values until a boundary Observable emits an item, then emits the buffered values as an array.

    // Create boundary observable that emits every 3 items
    boundary := ro.Pipe(
    ro.Interval(200*time.Millisecond),
    ro.Take[int64](3),
    )

    obs := ro.Pipe(
    ro.Interval(100*time.Millisecond),
    ro.BufferWhen[int64, int64](boundary),
    )

    sub := obs.Subscribe(ro.PrintObserver[[]int64]())
    time.Sleep(1000 * time.Millisecond)
    sub.Unsubscribe()

    // Buffers when boundary observable emits
    // Next: [0, 1] (after first boundary)
    // Next: [2, 3, 4] (after second boundary)
    // Next: [5, 6, 7] (after third boundary)

    With custom boundary

    // Create boundary based on clicks or events
    clickBoundary := ro.Pipe(
    ro.Interval(500*time.Millisecond),
    ro.Take[int64](2),
    )

    obs := ro.Pipe(
    ro.Just(1, 2, 3, 4, 5, 6, 7, 8),
    ro.BufferWhen[int, int64](clickBoundary),
    )

    sub := obs.Subscribe(ro.PrintObserver[[]int]())
    defer sub.Unsubscribe()

    // Next: [1, 2, 3, 4] (after first boundary at 500ms)
    // Next: [5, 6, 7, 8] (after second boundary at 1000ms)
    // Completed

    Edge case: Empty source

    boundary := ro.Just("trigger")
    obs := ro.Pipe(
    ro.Empty[int](),
    ro.BufferWhen[int, string](boundary),
    )

    sub := obs.Subscribe(ro.PrintObserver[[]int]())
    defer sub.Unsubscribe()

    // Next: [] (empty buffer when boundary emits)
    // Completed
    Prototype:
    func BufferWhen[T any, B any](boundary Observable[B])
  • BufferWithCountโ€‹

    Buffers the source Observable values into non-overlapping buffers of a specific size, and emits these buffers as arrays.

    obs := ro.Pipe(
    ro.Just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10),
    ro.BufferWithCount[int](3),
    )

    sub := obs.Subscribe(ro.PrintObserver[[]int]())
    defer sub.Unsubscribe()

    // Next: [1, 2, 3]
    // Next: [4, 5, 6]
    // Next: [7, 8, 9]
    // Next: [10]
    // Completed

    Practical example: Batch processing

    obs := ro.Pipe(
    ro.Just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10),
    ro.BufferWithCount[int](4),
    ro.Map(func(batch []int) int {
    // Process batch of items
    sum := 0
    for _, item := range batch {
    sum += item
    }
    return sum
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Next: 10 (1+2+3+4)
    // Next: 22 (5+6+7+8)
    // Next: 19 (9+10)
    // Completed

    Edge case: Single item buffer

    obs := ro.Pipe(
    ro.Just(1, 2, 3, 4, 5),
    ro.BufferWithCount[int](1),
    )

    sub := obs.Subscribe(ro.PrintObserver[[]int]())
    defer sub.Unsubscribe()

    // Next: [1]
    // Next: [2]
    // Next: [3]
    // Next: [4]
    // Next: [5]
    // Completed

    Edge case: Buffer larger than source

    obs := ro.Pipe(
    ro.Just(1, 2, 3),
    ro.BufferWithCount[int](10),
    )

    sub := obs.Subscribe(ro.PrintObserver[[]int]())
    defer sub.Unsubscribe()

    // Next: [1, 2, 3]
    // Completed
    Prototype:
    func BufferWithCount[T any](size int)
  • BufferWithTimeโ€‹

    Buffers the source Observable values for a specified time duration, then emits the buffered values as an array.

    obs := ro.Pipe(
    ro.Interval(100*time.Millisecond),
    ro.BufferWithTime[int64](300*time.Millisecond),
    )

    sub := obs.Subscribe(ro.PrintObserver[[]int64]())
    time.Sleep(1000 * time.Millisecond)
    sub.Unsubscribe()

    // Next: [0, 1, 2] (after 300ms)
    // Next: [3, 4, 5] (after 600ms)
    // Next: [6, 7, 8] (after 900ms)

    With sparse emissions

    obs := ro.Pipe(
    ro.Interval(100*time.Millisecond),
    ro.Take[int64](10),
    // Add delay to make emissions sparse
    ro.Map(func(i int64) int64 {
    time.Sleep(50 * time.Millisecond)
    return i
    }),
    ro.BufferWithTime[int64](250*time.Millisecond),
    )

    sub := obs.Subscribe(ro.PrintObserver[[]int64]())
    time.Sleep(2000 * time.Millisecond)
    sub.Unsubscribe()

    // Buffers based on time, not item count
    // Next: [0] (after 250ms)
    // Next: [1, 2] (after 500ms)
    // Next: [3, 4] (after 750ms)
    // Next: [5] (after 1000ms)

    Practical example: Debounced batching

    obs := ro.Pipe(
    ro.Just("A", "B", "C", "D", "E"),
    // Simulate rapid events
    ro.Map(func(s string) string {
    time.Sleep(10 * time.Millisecond)
    return s
    }),
    ro.BufferWithTime[string](100*time.Millisecond),
    )

    sub := obs.Subscribe(ro.PrintObserver[[]string]())
    defer sub.Unsubscribe()

    // Depending on timing, might get:
    // Next: ["A", "B", "C", "D", "E"] (all within 100ms)
    // Or split into smaller batches based on timing
    Prototype:
    func BufferWithTime[T any](duration time.Duration)
  • BufferWithTimeOrCountโ€‹

    Buffers the source Observable values until either the buffer reaches the specified size or the specified time duration elapses, whichever occurs first.

    obs := ro.Pipe(
    ro.Interval(100*time.Millisecond),
    ro.BufferWithTimeOrCount[int64](5, 300*time.Millisecond),
    )

    sub := obs.Subscribe(ro.PrintObserver[[]int64]())
    time.Sleep(1000 * time.Millisecond)
    sub.Unsubscribe()

    // Buffers when either condition is met
    // Next: [0, 1, 2] (after 300ms - time limit reached)
    // Next: [3, 4, 5, 6, 7] (count limit reached)
    // Next: [8, 9, 10] (time limit reached)

    Time-limited scenario

    obs := ro.Pipe(
    ro.Interval(100*time.Millisecond),
    ro.BufferWithTimeOrCount[int64](10, 200*time.Millisecond),
    )

    sub := obs.Subscribe(ro.PrintObserver[[]int64]())
    time.Sleep(800 * time.Millisecond)
    sub.Unsubscribe()

    // Time limit reached before count
    // Next: [0, 1] (after 200ms)
    // Next: [2, 3] (after 400ms)
    // Next: [4, 5] (after 600ms)

    Count-limited scenario

    obs := ro.Pipe(
    ro.Interval(50*time.Millisecond), // Fast emissions
    ro.BufferWithTimeOrCount[int64](3, 1000*time.Millisecond),
    )

    sub := obs.Subscribe(ro.PrintObserver[[]int64]())
    time.Sleep(500 * time.Millisecond)
    sub.Unsubscribe()

    // Count limit reached before time
    // Next: [0, 1, 2] (count reached after 150ms)
    // Next: [3, 4, 5] (count reached after 300ms)
    // Next: [6, 7, 8] (count reached after 450ms)

    Practical example: Batching with safety limits

    obs := ro.Pipe(
    // Simulate user events
    ro.Interval(30*time.Millisecond),
    ro.Take[int64](20),
    ro.BufferWithTimeOrCount[int64](5, 200*time.Millisecond),
    ro.Map(func(batch []int64) int {
    return len(batch) // Show batch sizes
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Outputs batch sizes based on either count (5) or time (200ms)
    // Prevents memory buildup while ensuring timely processing
    Prototype:
    func BufferWithTimeOrCount[T any](size int, duration time.Duration)
  • WindowWhenโ€‹

    Branches out the source Observable values as a nested Observable whenever the boundary Observable emits an item, and a new window opens when the boundary Observable emits an item.

    boundary := ro.Interval(2000*time.Millisecond)

    obs := ro.Pipe(
    ro.Interval(500*time.Millisecond),
    ro.WindowWhen(boundary),
    ro.Take(3),
    )

    sub := obs.Subscribe(ro.NewObserver[ro.Observable[int64]](
    func(window ro.Observable[int64]) {
    fmt.Println("New window opened")

    windowSub := window.Subscribe(ro.PrintObserver[int64]())
    time.Sleep(2500 * time.Millisecond)
    windowSub.Unsubscribe()
    },
    func(err error) {
    fmt.Printf("Error: %v\n", err)
    },
    func() {
    fmt.Println("Completed")
    },
    ))
    time.Sleep(7000 * time.Millisecond)
    sub.Unsubscribe()

    // New window opened
    // Next: 0, 1, 2, 3 (emitted over 2 seconds)
    // New window opened
    // Next: 4, 5, 6, 7 (emitted over 2 seconds)
    // New window opened
    // Next: 8, 9, 10, 11 (emitted over 2 seconds)
    // Completed

    With time-based boundary

    boundary := ro.Timer(1000*time.Millisecond) // Window closes after 1 second

    obs := ro.Pipe(
    ro.Just("a", "b", "c", "d", "e", "f"),
    ro.WindowWhen(boundary),
    )

    sub := obs.Subscribe(ro.NewObserver[ro.Observable[string]](
    func(window ro.Observable[string]) {
    fmt.Println("New window:")
    windowSub := window.Subscribe(ro.PrintObserver[string]())
    windowSub.Unsubscribe()
    },
    ))
    defer sub.Unsubscribe()

    // New window:
    // Next: a, b, c, d, e, f (all items before boundary)
    // Completed

    With count-based boundary

    boundary := ro.Pipe(
    ro.Interval(500*time.Millisecond),
    ro.Map(func(_ int64) string { return "close" }),
    )

    obs := ro.Pipe(
    ro.Just("x1", "x2", "x3", "x4", "x5", "x6", "x7", "x8"),
    ro.WindowWhen(boundary),
    )

    sub := obs.Subscribe(ro.NewObserver[ro.Observable[string]](
    func(window ro.Observable[string]) {
    fmt.Println("Window opened:")
    windowSub := window.Subscribe(ro.PrintObserver[string]())
    time.Sleep(200 * time.Millisecond)
    windowSub.Unsubscribe()
    },
    ))
    defer sub.Unsubscribe()

    // Window opened:
    // Next: x1 (first item before first boundary)
    // Window opened:
    // Next: x2 (second item before second boundary)
    // Window opened:
    // Next: x3 (third item before third boundary)
    // ... and so on

    With complex boundary conditions

    // Boundary emits every 3 source items
    counter := 0
    boundary := ro.Pipe(
    ro.Just("trigger"),
    ro.Map(func(_ string) int {
    counter++
    return counter
    }),
    ro.Filter(func(c int) bool { return c%3 == 0 }),
    )

    obs := ro.Pipe(
    ro.Just("a", "b", "c", "d", "e", "f", "g", "h", "i"),
    ro.WindowWhen(boundary),
    )

    sub := obs.Subscribe(ro.NewObserver[ro.Observable[string]](
    func(window ro.Observable[string]) {
    fmt.Println("Window:")
    windowSub := window.Subscribe(ro.PrintObserver[string]())
    windowSub.Unsubscribe()
    },
    ))
    defer sub.Unsubscribe()

    // Window:
    // Next: a, b, c (first 3 items)
    // Window:
    // Next: d, e, f (next 3 items)
    // Window:
    // Next: g, h, i (last 3 items)
    // Completed

    With error in source

    boundary := ro.Timer(1000*time.Millisecond)

    obs := ro.Pipe(
    ro.Pipe(
    ro.Just("will error"),
    ro.Throw[string](errors.New("source error")),
    ),
    ro.WindowWhen(boundary),
    )

    sub := obs.Subscribe(ro.NewObserver[ro.Observable[string]](
    func(window ro.Observable[string]) {
    fmt.Println("Window opened")
    windowSub := window.Subscribe(ro.PrintObserver[string]())
    windowSub.Unsubscribe()
    },
    func(err error) {
    fmt.Printf("Error: %v\n", err)
    },
    ))
    defer sub.Unsubscribe()

    // Error: source error
    Prototype:
    func WindowWhen[T any, B any](boundary Observable[B])
  • Transforms items emitted by an observable sequence with a function that can return errors.

    obs := ro.Pipe(
    ro.Just("1", "2", "three", "4"),
    ro.MapErr(func(s string) (int, error) {
    return strconv.Atoi(s)
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Next: 1
    // Next: 2
    // Error: strconv.Atoi: parsing "three": invalid syntax

    With context

    obs := ro.Pipe(
    ro.Just("file1.txt", "file2.txt", "invalid.txt"),
    ro.MapErrWithContext(func(ctx context.Context, filename string) (string, error) {
    if !strings.HasSuffix(filename, ".txt") {
    return "", fmt.Errorf("invalid file extension: %s", filename)
    }
    return fmt.Sprintf("processed: %s", filename), nil
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[string]())
    defer sub.Unsubscribe()

    // Next: processed: file1.txt
    // Next: processed: file2.txt
    // Error: invalid file extension: invalid.txt

    With index

    obs := ro.Pipe(
    ro.Just("apple", "banana", "cherry"),
    ro.MapErrI(func(fruit string, index int64) (string, error) {
    if index == 1 {
    return "", fmt.Errorf("skipping item at index %d", index)
    }
    return strings.ToUpper(fruit), nil
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[string]())
    defer sub.Unsubscribe()

    // Next: APPLE
    // Error: skipping item at index 1

    With index and context

    obs := ro.Pipe(
    ro.Just("test1", "test2", "test3"),
    ro.MapErrIWithContext(func(ctx context.Context, item string, index int64) (int, error) {
    if index > 1 {
    return 0, fmt.Errorf("index %d out of range", index)
    }
    return len(item), nil
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Next: 5
    // Next: 5
    // Error: index 2 out of range
    Similar:
    Prototypes:
    func MapErr[T any, R any](project func(item T) (R, error))
    func MapErrWithContext[T any, R any](project func(ctx context.Context, item T) (R, error))
    func MapErrI[T any, R any](project func(item T, index int64) (R, error))
    func MapErrIWithContext[T any, R any](project func(ctx context.Context, item T, index int64) (R, error))
  • Groups the items emitted by an observable sequence according to a specified key selector function.

    obs := ro.Pipe(
    ro.Just("apple", "banana", "avocado", "blueberry", "cherry"),
    ro.GroupBy(func(fruit string) string {
    return string(fruit[0]) // Group by first letter
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[ro.Observable[string]]())
    defer sub.Unsubscribe()

    // Each emission is an observable of grouped items
    // Need to subscribe to each group observable

    With context

    obs := ro.Pipe(
    ro.Just(1, 2, 3, 4, 5, 6),
    ro.GroupByWithContext(func(ctx context.Context, n int) string {
    if n%2 == 0 {
    return "even"
    }
    return "odd"
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[ro.Observable[int]]())
    defer sub.Unsubscribe()

    With index

    obs := ro.Pipe(
    ro.Just("a", "b", "c", "d", "e", "f"),
    ro.GroupByI(func(item string, index int64) int {
    return int(index / 2) // Group by pairs
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[ro.Observable[string]]())
    defer sub.Unsubscribe()

    With index and context

    obs := ro.Pipe(
    ro.Just("file1.txt", "image1.jpg", "file2.txt", "image2.jpg"),
    ro.GroupByIWithContext(func(ctx context.Context, filename string, index int64) string {
    if strings.HasSuffix(filename, ".jpg") {
    return "images"
    }
    return "documents"
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[ro.Observable[string]]())
    defer sub.Unsubscribe()

    Processing groups example

    // Source observable
    source := ro.Just("apple", "apricot", "banana", "blueberry", "cherry")

    // Group by first letter
    groupedObs := ro.Pipe(
    source,
    ro.GroupBy(func(fruit string) string {
    return string(fruit[0])
    }),
    )

    // Subscribe to groups
    groupedSub := groupedObs.Subscribe(ro.NewObserver[ro.Observable[string]](
    func(group ro.Observable[string]) {
    // Subscribe to each group
    groupSub := group.Subscribe(ro.NewObserver[string](
    func(item string) {
    fmt.Printf("Group item: %s\n", item)
    },
    func(err error) {
    fmt.Printf("Group error: %v\n", err)
    },
    func() {
    fmt.Println("Group completed")
    },
    ))
    defer groupSub.Unsubscribe()
    },
    func(err error) {
    fmt.Printf("Error: %v\n", err)
    },
    func() {
    fmt.Println("All groups completed")
    },
    ))
    defer groupedSub.Unsubscribe()
    Prototypes:
    func GroupBy[T any, K comparable](keySelector func(item T) K)
    func GroupByWithContext[T any, K comparable](keySelector func(ctx context.Context, item T) K)
    func GroupByI[T any, K comparable](keySelector func(item T, index int64) K)
    func GroupByIWithContext[T any, K comparable](keySelector func(ctx context.Context, item T, index int64) K)