This documentation is still new and evolving. If you spot any mistakes, unclear explanations, or missing details, please open an issue.
Your feedback helps us improve!
Transformation operatorsโ
This page lists all transformation operations, available in the core package of ro.
Mapโ
Applies a given project function to each item emitted by the source Observable, and emits the results of these function applications as an Observable sequence.
obs := ro.Pipe(
ro.Just(1, 2, 3, 4, 5),
ro.Map(func(i int) string {
return fmt.Sprintf("Item-%d", i)
}),
)
sub := obs.Subscribe(ro.PrintObserver[string]())
defer sub.Unsubscribe()
// Next: Item-1
// Next: Item-2
// Next: Item-3
// Next: Item-4
// Next: Item-5
// CompletedWith context
obs := ro.Pipe(
ro.Just(1, 2, 3, 4, 5),
ro.MapWithContext(func(ctx context.Context, i int) (context.Context, string) {
return ctx, fmt.Sprintf("Item-%d", i)
}),
)
sub := obs.Subscribe(ro.PrintObserver[string]())
defer sub.Unsubscribe()
// Next: Item-1
// Next: Item-2
// Next: Item-3
// Next: Item-4
// Next: Item-5
// CompletedWith index
obs := ro.Pipe(
ro.Just(1, 2, 3, 4, 5),
ro.MapI(func(i int, index int64) string {
return fmt.Sprintf("Item-%d-Index-%d", i, index)
}),
)
sub := obs.Subscribe(ro.PrintObserver[string]())
defer sub.Unsubscribe()
// Next: Item-1-Index-0
// Next: Item-2-Index-1
// Next: Item-3-Index-2
// Next: Item-4-Index-3
// Next: Item-5-Index-4
// CompletedWith index and context
obs := ro.Pipe(
ro.Just(1, 2, 3, 4, 5),
ro.MapIWithContext(func(ctx context.Context, i int, index int64) (context.Context, string) {
return ctx, fmt.Sprintf("Item-%d-Index-%d", i, index)
}),
)
sub := obs.Subscribe(ro.PrintObserver[string]())
defer sub.Unsubscribe()
// Next: Item-1-Index-0
// Next: Item-2-Index-1
// Next: Item-3-Index-2
// Next: Item-4-Index-3
// Next: Item-5-Index-4
// CompletedPrototypes:func Map[T any, R any](project func(item T) R)
func MapWithContext[T any, R any](project func(ctx context.Context, item T) (context.Context, R))
func MapI[T any, R any](project func(item T, index int64) R)
func MapIWithContext[T any, R any](project func(ctx context.Context, item T, index int64) (context.Context, R))FlatMapโ
Applies a given project function to each item emitted by the source Observable, where the project function returns an Observable, and then flattens the resulting Observables into a single Observable.
obs := ro.Pipe(
ro.Just(1, 2, 3),
ro.FlatMap(func(i int) Observable[int] {
return ro.Just(i*10, i*10+1, i*10+2)
}),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Order may vary due to interleaving
// Next: 10
// Next: 20
// Next: 11
// Next: 30
// Next: 21
// Next: 31
// Next: 12
// Next: 22
// Next: 32
// CompletedWith context
obs := ro.Pipe(
ro.Just(1, 2, 3),
ro.FlatMapWithContext(func(ctx context.Context, i int) Observable[int] {
return ro.Just(i*10, i*10+1, i*10+2)
}),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Order may vary due to interleaving
// Next: 10
// Next: 20
// Next: 11
// Next: 30
// Next: 21
// Next: 31
// Next: 12
// Next: 22
// Next: 32
// CompletedWith index
obs := ro.Pipe(
ro.Just(1, 2, 3),
ro.FlatMapI(func(i int, index int64) Observable[int] {
return ro.Just(i*10+int(index))
}),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Order may vary due to interleaving
// Next: 10
// Next: 20
// Next: 11
// Next: 30
// Next: 21
// Next: 31
// CompletedWith index and context
obs := ro.Pipe(
ro.Just(1, 2, 3),
ro.FlatMapIWithContext(func(ctx context.Context, i int, index int64) Observable[int] {
return ro.Just(i*10+int(index))
}),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Order may vary due to interleaving
// Next: 10
// Next: 20
// Next: 11
// Next: 30
// Next: 21
// Next: 31
// CompletedPractical example: Converting single items to multiple
obs := ro.Pipe(
ro.Just("hello", "world"),
ro.FlatMap(func(s string) Observable[rune] {
runes := []rune(s)
return ro.Just(runes...)
}),
)
sub := obs.Subscribe(ro.PrintObserver[rune]())
defer sub.Unsubscribe()
// Order may vary due to interleaving
// Next: 'h'
// Next: 'e'
// Next: 'l'
// Next: 'l'
// Next: 'o'
// Next: 'w'
// Next: 'o'
// Next: 'r'
// Next: 'l'
// Next: 'd'
// CompletedSimilar:Prototypes:func FlatMap[T any, R any](project func(item T) Observable[R])
func FlatMapWithContext[T any, R any](project func(ctx context.Context, item T) Observable[R])
func FlatMapI[T any, R any](project func(item T, index int64) Observable[R])
func FlatMapIWithContext[T any, R any](project func(ctx context.Context, item T, index int64) Observable[R])Scanโ
Applies an accumulator function over an Observable sequence, and returns each intermediate result, with an optional seed value.
obs := ro.Pipe(
ro.Just(1, 2, 3, 4, 5),
ro.Scan(func(acc int, item int) int {
return acc + item
}, 0),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Next: 1
// Next: 3
// Next: 6
// Next: 10
// Next: 15
// CompletedWith context
obs := ro.Pipe(
ro.Just(1, 2, 3, 4, 5),
ro.ScanWithContext(func(ctx context.Context, acc int, item int) (context.Context, int) {
return ctx, acc + item
}, 0),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Next: 1
// Next: 3
// Next: 6
// Next: 10
// Next: 15
// CompletedWith index
obs := ro.Pipe(
ro.Just(1, 2, 3, 4, 5),
ro.ScanI(func(acc int, item int, index int64) int {
return acc + (item * int(index+1)) // Multiply by position
}, 0),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Next: 1 (0 + 1*1)
// Next: 5 (1 + 2*2)
// Next: 14 (5 + 3*3)
// Next: 30 (14 + 4*4)
// Next: 55 (30 + 5*5)
// CompletedWith index and context
obs := ro.Pipe(
ro.Just(1, 2, 3, 4, 5),
ro.ScanIWithContext(func(ctx context.Context, acc int, item int, index int64) (context.Context, int) {
return ctx, acc + (item * int(index+1))
}, 0),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Next: 1 (0 + 1*1)
// Next: 5 (1 + 2*2)
// Next: 14 (5 + 3*3)
// Next: 30 (14 + 4*4)
// Next: 55 (30 + 5*5)
// CompletedPractical example: Building a string
obs := ro.Pipe(
ro.Just("hello", "world", "rx"),
Scan(func(acc string, item string) string {
if acc == "" {
return item
}
return acc + " " + item
}, ""),
)
sub := obs.Subscribe(ro.PrintObserver[string]())
defer sub.Unsubscribe()
// Next: "hello"
// Next: "hello world"
// Next: "hello world rx"
// CompletedSimilar:Prototypes:func Scan[T any, R any](reduce func(accumulator R, item T) R, seed R)
func ScanWithContext[T any, R any](reduce func(ctx context.Context, accumulator R, item T) (context.Context, R), seed R)
func ScanI[T any, R any](reduce func(accumulator R, item T, index int64) R, seed R)
func ScanIWithContext[T any, R any](reduce func(ctx context.Context, accumulator R, item T, index int64) (context.Context, R), seed R)BufferWhenโ
Buffers the source Observable values until a boundary Observable emits an item, then emits the buffered values as an array.
// Create boundary observable that emits every 3 items
boundary := ro.Pipe(
ro.Interval(200*time.Millisecond),
ro.Take[int64](3),
)
obs := ro.Pipe(
ro.Interval(100*time.Millisecond),
ro.BufferWhen[int64, int64](boundary),
)
sub := obs.Subscribe(ro.PrintObserver[[]int64]())
time.Sleep(1000 * time.Millisecond)
sub.Unsubscribe()
// Buffers when boundary observable emits
// Next: [0, 1] (after first boundary)
// Next: [2, 3, 4] (after second boundary)
// Next: [5, 6, 7] (after third boundary)With custom boundary
// Create boundary based on clicks or events
clickBoundary := ro.Pipe(
ro.Interval(500*time.Millisecond),
ro.Take[int64](2),
)
obs := ro.Pipe(
ro.Just(1, 2, 3, 4, 5, 6, 7, 8),
ro.BufferWhen[int, int64](clickBoundary),
)
sub := obs.Subscribe(ro.PrintObserver[[]int]())
defer sub.Unsubscribe()
// Next: [1, 2, 3, 4] (after first boundary at 500ms)
// Next: [5, 6, 7, 8] (after second boundary at 1000ms)
// CompletedEdge case: Empty source
boundary := ro.Just("trigger")
obs := ro.Pipe(
ro.Empty[int](),
ro.BufferWhen[int, string](boundary),
)
sub := obs.Subscribe(ro.PrintObserver[[]int]())
defer sub.Unsubscribe()
// Next: [] (empty buffer when boundary emits)
// CompletedPrototype:func BufferWhen[T any, B any](boundary Observable[B])
BufferWithCountโ
Buffers the source Observable values into non-overlapping buffers of a specific size, and emits these buffers as arrays.
obs := ro.Pipe(
ro.Just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10),
ro.BufferWithCount[int](3),
)
sub := obs.Subscribe(ro.PrintObserver[[]int]())
defer sub.Unsubscribe()
// Next: [1, 2, 3]
// Next: [4, 5, 6]
// Next: [7, 8, 9]
// Next: [10]
// CompletedPractical example: Batch processing
obs := ro.Pipe(
ro.Just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10),
ro.BufferWithCount[int](4),
ro.Map(func(batch []int) int {
// Process batch of items
sum := 0
for _, item := range batch {
sum += item
}
return sum
}),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Next: 10 (1+2+3+4)
// Next: 22 (5+6+7+8)
// Next: 19 (9+10)
// CompletedEdge case: Single item buffer
obs := ro.Pipe(
ro.Just(1, 2, 3, 4, 5),
ro.BufferWithCount[int](1),
)
sub := obs.Subscribe(ro.PrintObserver[[]int]())
defer sub.Unsubscribe()
// Next: [1]
// Next: [2]
// Next: [3]
// Next: [4]
// Next: [5]
// CompletedEdge case: Buffer larger than source
obs := ro.Pipe(
ro.Just(1, 2, 3),
ro.BufferWithCount[int](10),
)
sub := obs.Subscribe(ro.PrintObserver[[]int]())
defer sub.Unsubscribe()
// Next: [1, 2, 3]
// CompletedPrototype:func BufferWithCount[T any](size int)
BufferWithTimeโ
Buffers the source Observable values for a specified time duration, then emits the buffered values as an array.
obs := ro.Pipe(
ro.Interval(100*time.Millisecond),
ro.BufferWithTime[int64](300*time.Millisecond),
)
sub := obs.Subscribe(ro.PrintObserver[[]int64]())
time.Sleep(1000 * time.Millisecond)
sub.Unsubscribe()
// Next: [0, 1, 2] (after 300ms)
// Next: [3, 4, 5] (after 600ms)
// Next: [6, 7, 8] (after 900ms)With sparse emissions
obs := ro.Pipe(
ro.Interval(100*time.Millisecond),
ro.Take[int64](10),
// Add delay to make emissions sparse
ro.Map(func(i int64) int64 {
time.Sleep(50 * time.Millisecond)
return i
}),
ro.BufferWithTime[int64](250*time.Millisecond),
)
sub := obs.Subscribe(ro.PrintObserver[[]int64]())
time.Sleep(2000 * time.Millisecond)
sub.Unsubscribe()
// Buffers based on time, not item count
// Next: [0] (after 250ms)
// Next: [1, 2] (after 500ms)
// Next: [3, 4] (after 750ms)
// Next: [5] (after 1000ms)Practical example: Debounced batching
obs := ro.Pipe(
ro.Just("A", "B", "C", "D", "E"),
// Simulate rapid events
ro.Map(func(s string) string {
time.Sleep(10 * time.Millisecond)
return s
}),
ro.BufferWithTime[string](100*time.Millisecond),
)
sub := obs.Subscribe(ro.PrintObserver[[]string]())
defer sub.Unsubscribe()
// Depending on timing, might get:
// Next: ["A", "B", "C", "D", "E"] (all within 100ms)
// Or split into smaller batches based on timingPrototype:func BufferWithTime[T any](duration time.Duration)
BufferWithTimeOrCountโ
Buffers the source Observable values until either the buffer reaches the specified size or the specified time duration elapses, whichever occurs first.
obs := ro.Pipe(
ro.Interval(100*time.Millisecond),
ro.BufferWithTimeOrCount[int64](5, 300*time.Millisecond),
)
sub := obs.Subscribe(ro.PrintObserver[[]int64]())
time.Sleep(1000 * time.Millisecond)
sub.Unsubscribe()
// Buffers when either condition is met
// Next: [0, 1, 2] (after 300ms - time limit reached)
// Next: [3, 4, 5, 6, 7] (count limit reached)
// Next: [8, 9, 10] (time limit reached)Time-limited scenario
obs := ro.Pipe(
ro.Interval(100*time.Millisecond),
ro.BufferWithTimeOrCount[int64](10, 200*time.Millisecond),
)
sub := obs.Subscribe(ro.PrintObserver[[]int64]())
time.Sleep(800 * time.Millisecond)
sub.Unsubscribe()
// Time limit reached before count
// Next: [0, 1] (after 200ms)
// Next: [2, 3] (after 400ms)
// Next: [4, 5] (after 600ms)Count-limited scenario
obs := ro.Pipe(
ro.Interval(50*time.Millisecond), // Fast emissions
ro.BufferWithTimeOrCount[int64](3, 1000*time.Millisecond),
)
sub := obs.Subscribe(ro.PrintObserver[[]int64]())
time.Sleep(500 * time.Millisecond)
sub.Unsubscribe()
// Count limit reached before time
// Next: [0, 1, 2] (count reached after 150ms)
// Next: [3, 4, 5] (count reached after 300ms)
// Next: [6, 7, 8] (count reached after 450ms)Practical example: Batching with safety limits
obs := ro.Pipe(
// Simulate user events
ro.Interval(30*time.Millisecond),
ro.Take[int64](20),
ro.BufferWithTimeOrCount[int64](5, 200*time.Millisecond),
ro.Map(func(batch []int64) int {
return len(batch) // Show batch sizes
}),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Outputs batch sizes based on either count (5) or time (200ms)
// Prevents memory buildup while ensuring timely processingPrototype:func BufferWithTimeOrCount[T any](size int, duration time.Duration)
WindowWhenโ
Branches out the source Observable values as a nested Observable whenever the boundary Observable emits an item, and a new window opens when the boundary Observable emits an item.
boundary := ro.Interval(2000*time.Millisecond)
obs := ro.Pipe(
ro.Interval(500*time.Millisecond),
ro.WindowWhen(boundary),
ro.Take(3),
)
sub := obs.Subscribe(ro.NewObserver[ro.Observable[int64]](
func(window ro.Observable[int64]) {
fmt.Println("New window opened")
windowSub := window.Subscribe(ro.PrintObserver[int64]())
time.Sleep(2500 * time.Millisecond)
windowSub.Unsubscribe()
},
func(err error) {
fmt.Printf("Error: %v\n", err)
},
func() {
fmt.Println("Completed")
},
))
time.Sleep(7000 * time.Millisecond)
sub.Unsubscribe()
// New window opened
// Next: 0, 1, 2, 3 (emitted over 2 seconds)
// New window opened
// Next: 4, 5, 6, 7 (emitted over 2 seconds)
// New window opened
// Next: 8, 9, 10, 11 (emitted over 2 seconds)
// CompletedWith time-based boundary
boundary := ro.Timer(1000*time.Millisecond) // Window closes after 1 second
obs := ro.Pipe(
ro.Just("a", "b", "c", "d", "e", "f"),
ro.WindowWhen(boundary),
)
sub := obs.Subscribe(ro.NewObserver[ro.Observable[string]](
func(window ro.Observable[string]) {
fmt.Println("New window:")
windowSub := window.Subscribe(ro.PrintObserver[string]())
windowSub.Unsubscribe()
},
))
defer sub.Unsubscribe()
// New window:
// Next: a, b, c, d, e, f (all items before boundary)
// CompletedWith count-based boundary
boundary := ro.Pipe(
ro.Interval(500*time.Millisecond),
ro.Map(func(_ int64) string { return "close" }),
)
obs := ro.Pipe(
ro.Just("x1", "x2", "x3", "x4", "x5", "x6", "x7", "x8"),
ro.WindowWhen(boundary),
)
sub := obs.Subscribe(ro.NewObserver[ro.Observable[string]](
func(window ro.Observable[string]) {
fmt.Println("Window opened:")
windowSub := window.Subscribe(ro.PrintObserver[string]())
time.Sleep(200 * time.Millisecond)
windowSub.Unsubscribe()
},
))
defer sub.Unsubscribe()
// Window opened:
// Next: x1 (first item before first boundary)
// Window opened:
// Next: x2 (second item before second boundary)
// Window opened:
// Next: x3 (third item before third boundary)
// ... and so onWith complex boundary conditions
// Boundary emits every 3 source items
counter := 0
boundary := ro.Pipe(
ro.Just("trigger"),
ro.Map(func(_ string) int {
counter++
return counter
}),
ro.Filter(func(c int) bool { return c%3 == 0 }),
)
obs := ro.Pipe(
ro.Just("a", "b", "c", "d", "e", "f", "g", "h", "i"),
ro.WindowWhen(boundary),
)
sub := obs.Subscribe(ro.NewObserver[ro.Observable[string]](
func(window ro.Observable[string]) {
fmt.Println("Window:")
windowSub := window.Subscribe(ro.PrintObserver[string]())
windowSub.Unsubscribe()
},
))
defer sub.Unsubscribe()
// Window:
// Next: a, b, c (first 3 items)
// Window:
// Next: d, e, f (next 3 items)
// Window:
// Next: g, h, i (last 3 items)
// CompletedWith error in source
boundary := ro.Timer(1000*time.Millisecond)
obs := ro.Pipe(
ro.Pipe(
ro.Just("will error"),
ro.Throw[string](errors.New("source error")),
),
ro.WindowWhen(boundary),
)
sub := obs.Subscribe(ro.NewObserver[ro.Observable[string]](
func(window ro.Observable[string]) {
fmt.Println("Window opened")
windowSub := window.Subscribe(ro.PrintObserver[string]())
windowSub.Unsubscribe()
},
func(err error) {
fmt.Printf("Error: %v\n", err)
},
))
defer sub.Unsubscribe()
// Error: source errorPrototype:func WindowWhen[T any, B any](boundary Observable[B])
MapErrโ
Transforms items emitted by an observable sequence with a function that can return errors.
obs := ro.Pipe(
ro.Just("1", "2", "three", "4"),
ro.MapErr(func(s string) (int, error) {
return strconv.Atoi(s)
}),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Next: 1
// Next: 2
// Error: strconv.Atoi: parsing "three": invalid syntaxWith context
obs := ro.Pipe(
ro.Just("file1.txt", "file2.txt", "invalid.txt"),
ro.MapErrWithContext(func(ctx context.Context, filename string) (string, error) {
if !strings.HasSuffix(filename, ".txt") {
return "", fmt.Errorf("invalid file extension: %s", filename)
}
return fmt.Sprintf("processed: %s", filename), nil
}),
)
sub := obs.Subscribe(ro.PrintObserver[string]())
defer sub.Unsubscribe()
// Next: processed: file1.txt
// Next: processed: file2.txt
// Error: invalid file extension: invalid.txtWith index
obs := ro.Pipe(
ro.Just("apple", "banana", "cherry"),
ro.MapErrI(func(fruit string, index int64) (string, error) {
if index == 1 {
return "", fmt.Errorf("skipping item at index %d", index)
}
return strings.ToUpper(fruit), nil
}),
)
sub := obs.Subscribe(ro.PrintObserver[string]())
defer sub.Unsubscribe()
// Next: APPLE
// Error: skipping item at index 1With index and context
obs := ro.Pipe(
ro.Just("test1", "test2", "test3"),
ro.MapErrIWithContext(func(ctx context.Context, item string, index int64) (int, error) {
if index > 1 {
return 0, fmt.Errorf("index %d out of range", index)
}
return len(item), nil
}),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Next: 5
// Next: 5
// Error: index 2 out of rangeSimilar:Prototypes:func MapErr[T any, R any](project func(item T) (R, error))
func MapErrWithContext[T any, R any](project func(ctx context.Context, item T) (R, error))
func MapErrI[T any, R any](project func(item T, index int64) (R, error))
func MapErrIWithContext[T any, R any](project func(ctx context.Context, item T, index int64) (R, error))GroupByโ
Groups the items emitted by an observable sequence according to a specified key selector function.
obs := ro.Pipe(
ro.Just("apple", "banana", "avocado", "blueberry", "cherry"),
ro.GroupBy(func(fruit string) string {
return string(fruit[0]) // Group by first letter
}),
)
sub := obs.Subscribe(ro.PrintObserver[ro.Observable[string]]())
defer sub.Unsubscribe()
// Each emission is an observable of grouped items
// Need to subscribe to each group observableWith context
obs := ro.Pipe(
ro.Just(1, 2, 3, 4, 5, 6),
ro.GroupByWithContext(func(ctx context.Context, n int) string {
if n%2 == 0 {
return "even"
}
return "odd"
}),
)
sub := obs.Subscribe(ro.PrintObserver[ro.Observable[int]]())
defer sub.Unsubscribe()With index
obs := ro.Pipe(
ro.Just("a", "b", "c", "d", "e", "f"),
ro.GroupByI(func(item string, index int64) int {
return int(index / 2) // Group by pairs
}),
)
sub := obs.Subscribe(ro.PrintObserver[ro.Observable[string]]())
defer sub.Unsubscribe()With index and context
obs := ro.Pipe(
ro.Just("file1.txt", "image1.jpg", "file2.txt", "image2.jpg"),
ro.GroupByIWithContext(func(ctx context.Context, filename string, index int64) string {
if strings.HasSuffix(filename, ".jpg") {
return "images"
}
return "documents"
}),
)
sub := obs.Subscribe(ro.PrintObserver[ro.Observable[string]]())
defer sub.Unsubscribe()Processing groups example
// Source observable
source := ro.Just("apple", "apricot", "banana", "blueberry", "cherry")
// Group by first letter
groupedObs := ro.Pipe(
source,
ro.GroupBy(func(fruit string) string {
return string(fruit[0])
}),
)
// Subscribe to groups
groupedSub := groupedObs.Subscribe(ro.NewObserver[ro.Observable[string]](
func(group ro.Observable[string]) {
// Subscribe to each group
groupSub := group.Subscribe(ro.NewObserver[string](
func(item string) {
fmt.Printf("Group item: %s\n", item)
},
func(err error) {
fmt.Printf("Group error: %v\n", err)
},
func() {
fmt.Println("Group completed")
},
))
defer groupSub.Unsubscribe()
},
func(err error) {
fmt.Printf("Error: %v\n", err)
},
func() {
fmt.Println("All groups completed")
},
))
defer groupedSub.Unsubscribe()Prototypes:func GroupBy[T any, K comparable](keySelector func(item T) K)
func GroupByWithContext[T any, K comparable](keySelector func(ctx context.Context, item T) K)
func GroupByI[T any, K comparable](keySelector func(item T, index int64) K)
func GroupByIWithContext[T any, K comparable](keySelector func(ctx context.Context, item T, index int64) K)