This documentation is still new and evolving. If you spot any mistakes, unclear explanations, or missing details, please open an issue.
Your feedback helps us improve!
Math operators
This page lists all math operators available in the core package of ro.
Abs
Emits the absolute value of each number emitted by the source Observable.
obs := ro.Pipe(
ro.Just(-3.5, 2.1, -7.8, 0.0, 5.3),
ro.Abs(),
)
sub := obs.Subscribe(ro.PrintObserver[float64]())
defer sub.Unsubscribe()
// Next: 3.5
// Next: 2.1
// Next: 7.8
// Next: 0
// Next: 5.3
// Completed
With time-based emissions
obs := ro.Pipe(
ro.Interval(100*time.Millisecond),
ro.Map(func(i int64) float64 {
return float64(i-5) // Emit -5, -4, -3, -2, -1, 0, 1, 2, 3, 4, 5...
}),
ro.Abs(),
ro.Take(5),
)
sub := obs.Subscribe(ro.PrintObserver[float64]())
time.Sleep(600 * time.Millisecond)
sub.Unsubscribe()
// Next: 5
// Next: 4
// Next: 3
// Next: 2
// Next: 1
// Completed
With negative infinity
obs := ro.Pipe(
ro.Just(math.Inf(-1), -42.0, math.Inf(1)),
ro.Abs(),
)
sub := obs.Subscribe(ro.PrintObserver[float64]())
defer sub.Unsubscribe()
// Next: +Inf
// Next: 42
// Next: +Inf
// Completed
Prototype:
func Abs()
Average
Calculates the average of the values emitted by the source Observable. It emits the average when the source completes. If the source is empty, it emits NaN.
obs := ro.Pipe(
ro.Just(1, 2, 3, 4, 5),
ro.Average[int](),
)
sub := obs.Subscribe(ro.PrintObserver[float64]())
defer sub.Unsubscribe()
// Next: 3
// Completed
Prototype:
func Average[T Numeric]()
Floor
Emits the floor (rounded down) of each number emitted by the source Observable.
obs := ro.Pipe(
ro.Just(3.7, 4.2, -2.3, -5.8, 0.0, 7.0),
ro.Floor(),
)
sub := obs.Subscribe(ro.PrintObserver[float64]())
defer sub.Unsubscribe()
// Next: 3
// Next: 4
// Next: -3
// Next: -6
// Next: 0
// Next: 7
// Completed
With infinity values
obs := ro.Pipe(
ro.Just(math.Inf(-1), -42.7, math.Inf(1), 3.14),
ro.Floor(),
)
sub := obs.Subscribe(ro.PrintObserver[float64]())
defer sub.Unsubscribe()
// Next: -Inf
// Next: -43
// Next: +Inf
// Next: 3
// Completed
With NaN values
obs := ro.Pipe(
ro.Just(math.NaN(), 2.3, math.NaN(), -1.7),
ro.Floor(),
)
sub := obs.Subscribe(ro.PrintObserver[float64]())
defer sub.Unsubscribe()
// Next: NaN
// Next: 2
// Next: NaN
// Next: -2
// Completed
With time-based emissions
obs := ro.Pipe(
ro.Interval(100*time.Millisecond),
ro.Map(func(i int64) float64 {
return float64(i) * 0.7 // Emit 0, 0.7, 1.4, 2.1, 2.8...
}),
ro.Floor(),
ro.Take(5),
)
sub := obs.Subscribe(ro.PrintObserver[float64]())
time.Sleep(600 * time.Millisecond)
sub.Unsubscribe()
// Next: 0
// Next: 0
// Next: 1
// Next: 2
// Next: 2
// Completed
Prototype:
func Floor()
Ceil
Emits the ceiling (rounded up) of each number emitted by the source Observable.
obs := ro.Pipe(
ro.Just(3.2, 4.7, -2.3, -5.8, 0.0, 7.0),
ro.Ceil(),
)
sub := obs.Subscribe(ro.PrintObserver[float64]())
defer sub.Unsubscribe()
// Next: 4
// Next: 5
// Next: -2
// Next: -5
// Next: 0
// Next: 7
// Completed
With infinity values
obs := ro.Pipe(
ro.Just(math.Inf(-1), -42.7, math.Inf(1), 3.14),
ro.Ceil(),
)
sub := obs.Subscribe(ro.PrintObserver[float64]())
defer sub.Unsubscribe()
// Next: -Inf
// Next: -42
// Next: +Inf
// Next: 4
// Completed
With NaN values
obs := ro.Pipe(
ro.Just(math.NaN(), 2.3, math.NaN(), -1.7),
ro.Ceil(),
)
sub := obs.Subscribe(ro.PrintObserver[float64]())
defer sub.Unsubscribe()
// Next: NaN
// Next: 3
// Next: NaN
// Next: -1
// Completed
With time-based emissions
obs := ro.Pipe(
ro.Interval(100*time.Millisecond),
ro.Map(func(i int64) float64 {
return float64(i) * 0.7 // Emit 0, 0.7, 1.4, 2.1, 2.8...
}),
ro.Ceil(),
ro.Take(5),
)
sub := obs.Subscribe(ro.PrintObserver[float64]())
time.Sleep(600 * time.Millisecond)
sub.Unsubscribe()
// Next: 0
// Next: 1
// Next: 2
// Next: 3
// Next: 3
// Completed
Prototype:
func Ceil()
Count
Counts the number of items emitted by an Observable and emits the total count when the source completes.
obs := ro.Pipe(
ro.Just(1, 2, 3, 4, 5),
ro.Count[int](),
)
sub := obs.Subscribe(ro.PrintObserver[int64]())
defer sub.Unsubscribe()
// Next: 5
// Completed
Count with empty observable
obs := ro.Pipe(
ro.Empty[int](),
ro.Count[int](),
)
sub := obs.Subscribe(ro.PrintObserver[int64]())
defer sub.Unsubscribe()
// Next: 0
// Completed
Count with single value
obs := ro.Pipe(
ro.Just("hello"),
ro.Count[string](),
)
sub := obs.Subscribe(ro.PrintObserver[int64]())
defer sub.Unsubscribe()
// Next: 1
// Completed
Count with complex types
type Person struct {
Name string
Age int
}
obs := ro.Pipe(
ro.Just(
Person{"Alice", 25},
Person{"Bob", 30},
Person{"Charlie", 35},
),
ro.Count[Person](),
)
sub := obs.Subscribe(ro.PrintObserver[int64]())
defer sub.Unsubscribe()
// Next: 3
// Completed
Count after filtering
obs := ro.Pipe(
ro.Just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10),
ro.Filter(func(i int) bool {
return i%2 == 0 // Count even numbers
}),
ro.Count[int](),
)
sub := obs.Subscribe(ro.PrintObserver[int64]())
defer sub.Unsubscribe()
// Next: 5
// Completed
Prototype:
func Count[T any]()
Sum
Calculates the sum of all values emitted by an Observable sequence and emits the total sum when the source completes.
obs := ro.Pipe(
ro.Just(1, 2, 3, 4, 5),
ro.Sum[int](),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Next: 15
// Completed
Sum with floating point numbers
obs := ro.Pipe(
ro.Just(1.5, 2.5, 3.5),
ro.Sum[float64](),
)
sub := obs.Subscribe(ro.PrintObserver[float64]())
defer sub.Unsubscribe()
// Next: 7.5
// Completed
Sum with negative numbers
obs := ro.Pipe(
ro.Just(10, -5, 3, -2),
ro.Sum[int](),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Next: 6
// Completed
Sum with single value
obs := ro.Pipe(
ro.Just(42),
ro.Sum[int](),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Next: 42
// Completed
Sum with empty observable
obs := ro.Pipe(
ro.Empty[int](),
ro.Sum[int](),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Completed (no Next values emitted)
Prototype:
func Sum[T Numeric]()
Reduce
Applies an accumulator function over an Observable sequence, and returns the final accumulated result when the source completes.
obs := ro.Pipe(
ro.Just(1, 2, 3, 4, 5),
ro.Reduce(func(acc int, item int) int {
return acc + item
}, 0),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Next: 15
// Completed
With context
obs := ro.Pipe(
ro.Just(1, 2, 3, 4, 5),
ro.ReduceWithContext(func(ctx context.Context, acc int, item int) (context.Context, int) {
return ctx, acc + item
}, 0),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Next: 15
// Completed
With index
obs := ro.Pipe(
ro.Just(1, 2, 3, 4, 5),
ro.ReduceI(func(acc int, item int, index int64) int {
return acc + (item * int(index+1)) // Multiply by position
}, 0),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Next: 55 (0 + 1*1 + 2*2 + 3*3 + 4*4 + 5*5)
// Completed
With index and context
obs := ro.Pipe(
ro.Just(1, 2, 3, 4, 5),
ro.ReduceIWithContext(func(ctx context.Context, acc int, item int, index int64) (context.Context, int) {
return ctx, acc + (item * int(index+1))
}, 0),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Next: 55
// Completed
Reduce to different type
obs := ro.Pipe(
ro.Just(1, 2, 3, 4, 5),
ro.Reduce(func(acc string, item int) string {
return fmt.Sprintf("%s%d", acc, item)
}, ""),
)
sub := obs.Subscribe(ro.PrintObserver[string]())
defer sub.Unsubscribe()
// Next: 12345
// Completed
Practical example: Building a map
obs := ro.Pipe(
ro.Just("apple", "banana", "cherry"),
ro.Reduce(func(acc map[string]int, item string) map[string]int {
acc[item] = len(item)
return acc
}, make(map[string]int)),
)
sub := obs.Subscribe(ro.PrintObserver[map[string]int]())
defer sub.Unsubscribe()
// Next: map[apple:5 banana:6 cherry:6]
// Completed
Reduce with an explicit seed (product of all items)
obs := ro.Pipe(
ro.Just(1, 2, 3, 4, 5),
ro.Reduce(func(acc int, item int) int {
return acc * item
}, 1), // 1 is the seed
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Next: 120 (1 * 2 * 3 * 4 * 5)
// Completed
Similar:
Prototypes:
func Reduce[T any, R any](accumulator func(agg R, item T) R, seed R)
func ReduceWithContext[T any, R any](accumulator func(ctx context.Context, agg R, item T) (context.Context, R), seed R)
func ReduceI[T any, R any](accumulator func(agg R, item T, index int64) R, seed R)
func ReduceIWithContext[T any, R any](accumulator func(ctx context.Context, agg R, item T, index int64) (context.Context, R), seed R)
Round
Emits the rounded values from the source Observable using standard mathematical rounding rules.
obs := ro.Pipe(
ro.Just(1.1, 1.5, 1.9, 2.5, -1.5),
ro.Round(),
)
sub := obs.Subscribe(ro.PrintObserver[float64]())
defer sub.Unsubscribe()
// Next: 1, 2, 2, 3, -2
// Completed
With decimal precision
obs := ro.Pipe(
ro.Just(3.14159, 2.71828, 1.41421),
ro.Round(),
)
sub := obs.Subscribe(ro.PrintObserver[float64]())
defer sub.Unsubscribe()
// Next: 3, 3, 1
// Completed
With negative numbers
obs := ro.Pipe(
ro.Just(-2.3, -2.7, -3.5),
ro.Round(),
)
sub := obs.Subscribe(ro.PrintObserver[float64]())
defer sub.Unsubscribe()
// Next: -2, -3, -4
// Completed
With integer-like values
obs := ro.Pipe(
ro.Just(5.0, 6.0001, 4.9999),
ro.Round(),
)
sub := obs.Subscribe(ro.PrintObserver[float64]())
defer sub.Unsubscribe()
// Next: 5, 6, 5
// Completed
In data processing pipeline
obs := ro.Pipe(
ro.Interval(100 * time.Millisecond),
ro.Take[int64](5),
ro.Map(func(_ int64) float64 {
return rand.Float64() * 100 // Random values 0-100
}),
ro.Round(), // Round to whole numbers
)
sub := obs.Subscribe(ro.PrintObserver[float64]())
time.Sleep(600 * time.Millisecond)
sub.Unsubscribe()
// Random rounded integers between 0-100
// Example: 42, 87, 15, 93, 28
With financial calculations
obs := ro.Pipe(
ro.Just(12.345, 67.890, 123.456),
ro.Round(),
)
sub := obs.Subscribe(ro.PrintObserver[float64]())
defer sub.Unsubscribe()
// Next: 12, 68, 123
// Completed
Prototype:
func Round()
Min
Finds the minimum value in an observable sequence.
obs := ro.Pipe(
ro.Just(5, 3, 8, 1, 4),
ro.Min[int](),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Next: 1
// Completed
With floats
obs := ro.Pipe(
ro.Just(3.14, 2.71, 1.61, 0.99),
ro.Min[float64](),
)
sub := obs.Subscribe(ro.PrintObserver[float64]())
defer sub.Unsubscribe()
// Next: 0.99
// Completed
With strings
obs := ro.Pipe(
ro.Just("zebra", "apple", "banana", "cherry"),
ro.Min[string](),
)
sub := obs.Subscribe(ro.PrintObserver[string]())
defer sub.Unsubscribe()
// Next: apple
// Completed
Empty sequence handling
obs := ro.Pipe(
ro.Empty[int](),
ro.Min[int](),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Completed (no values emitted)
Prototype:
func Min[T constraints.Ordered]()
Max
Finds the maximum value in an observable sequence.
obs := ro.Pipe(
ro.Just(5, 3, 8, 1, 4),
ro.Max[int](),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Next: 8
// Completed
With floats
obs := ro.Pipe(
ro.Just(3.14, 2.71, 1.61, 0.99),
ro.Max[float64](),
)
sub := obs.Subscribe(ro.PrintObserver[float64]())
defer sub.Unsubscribe()
// Next: 3.14
// Completed
With strings
obs := ro.Pipe(
ro.Just("zebra", "apple", "banana", "cherry"),
ro.Max[string](),
)
sub := obs.Subscribe(ro.PrintObserver[string]())
defer sub.Unsubscribe()
// Next: zebra
// Completed
With custom types
type Person struct {
Name string
Age int
}
obs := ro.Pipe(
ro.Just(
Person{"Alice", 25},
Person{"Bob", 30},
Person{"Charlie", 20},
),
ro.Map[Person, int](func(p Person) int { return p.Age }),
ro.Max[int](),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Next: 30
// Completed
Prototype:
func Max[T constraints.Ordered]()
Clamp
Clamps values to be within a specified range.
obs := ro.Pipe(
ro.Just(-5, 0, 5, 10, 15),
ro.Clamp(0, 10),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Next: 0
// Next: 0
// Next: 5
// Next: 10
// Next: 10
// Completed
With floats
obs := ro.Pipe(
ro.Just(-1.5, 0.0, 0.5, 1.0, 1.5),
ro.Clamp(0.0, 1.0),
)
sub := obs.Subscribe(ro.PrintObserver[float64]())
defer sub.Unsubscribe()
// Next: 0
// Next: 0
// Next: 0.5
// Next: 1
// Next: 1
// Completed
With negative values
obs := ro.Pipe(
ro.Just(-20, -10, 0, 10, 20),
ro.Clamp(-15, 15),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Next: -15
// Next: -10
// Next: 0
// Next: 10
// Next: 15
// Completed
Edge case: min equals max
obs := ro.Pipe(
ro.Just(1, 2, 3, 4, 5),
ro.Clamp(3, 3),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Next: 3
// Next: 3
// Next: 3
// Next: 3
// Next: 3
// Completed
Prototype:
func Clamp[T constraints.Ordered](min, max T)