Skip to main content
Help improve this documentation

This documentation is still new and evolving. If you spot any mistakes, unclear explanations, or missing details, please open an issue.

Your feedback helps us improve!

Math operatorsโ€‹

This page lists all math operations, available in the core package of ro.

  • Emits the absolute value of each number emitted by the source Observable.

    obs := ro.Pipe(
    ro.Just(-3.5, 2.1, -7.8, 0.0, 5.3),
    ro.Abs(),
    )

    sub := obs.Subscribe(ro.PrintObserver[float64]())
    defer sub.Unsubscribe()

    // Next: 3.5
    // Next: 2.1
    // Next: 7.8
    // Next: 0.0
    // Next: 5.3
    // Completed

    With time-based emissions

    obs := ro.Pipe(
    ro.Interval(100*time.Millisecond),
    ro.Map(func(i int64) float64 {
    return float64(i-5) // Emit -5, -4, -3, -2, -1, 0, 1, 2, 3, 4, 5...
    }),
    ro.Abs(),
    ro.Take(5),
    )

    sub := obs.Subscribe(ro.PrintObserver[float64]())
    time.Sleep(600 * time.Millisecond)
    sub.Unsubscribe()

    // Next: 5
    // Next: 4
    // Next: 3
    // Next: 2
    // Next: 1
    // Completed

    With negative infinity

    obs := ro.Pipe(
    ro.Just(math.Inf(-1), -42.0, math.Inf(1)),
    ro.Abs(),
    )

    sub := obs.Subscribe(ro.PrintObserver[float64]())
    defer sub.Unsubscribe()

    // Next: +Inf
    // Next: 42
    // Next: +Inf
    // Completed
    Prototype:
    func Abs()
  • Calculates the average of the values emitted by the source Observable. It emits the average when the source completes. If the source is empty, it emits NaN.

    obs := ro.Pipe(
    ro.Just(1, 2, 3, 4, 5),
    ro.Average[int](),
    )

    sub := obs.Subscribe(ro.PrintObserver[float64]())
    defer sub.Unsubscribe()

    // Next: 3
    // Completed
    Prototype:
    func Average[T Numeric]()
  • Emits the floor (rounded down) of each number emitted by the source Observable.

    obs := ro.Pipe(
    ro.Just(3.7, 4.2, -2.3, -5.8, 0.0, 7.0),
    ro.Floor(),
    )

    sub := obs.Subscribe(ro.PrintObserver[float64]())
    defer sub.Unsubscribe()

    // Next: 3
    // Next: 4
    // Next: -3
    // Next: -6
    // Next: 0
    // Next: 7
    // Completed

    With infinity values

    obs := ro.Pipe(
    ro.Just(math.Inf(-1), -42.7, math.Inf(1), 3.14),
    ro.Floor(),
    )

    sub := obs.Subscribe(ro.PrintObserver[float64]())
    defer sub.Unsubscribe()

    // Next: -Inf
    // Next: -43
    // Next: +Inf
    // Next: 3
    // Completed

    With NaN values

    obs := ro.Pipe(
    ro.Just(math.NaN(), 2.3, math.NaN(), -1.7),
    ro.Floor(),
    )

    sub := obs.Subscribe(ro.PrintObserver[float64]())
    defer sub.Unsubscribe()

    // Next: NaN
    // Next: 2
    // Next: NaN
    // Next: -2
    // Completed

    With time-based emissions

    obs := ro.Pipe(
    ro.Interval(100*time.Millisecond),
    ro.Map(func(i int64) float64 {
    return float64(i) * 0.7 // Emit 0, 0.7, 1.4, 2.1, 2.8...
    }),
    ro.Floor(),
    ro.Take(5),
    )

    sub := obs.Subscribe(ro.PrintObserver[float64]())
    time.Sleep(600 * time.Millisecond)
    sub.Unsubscribe()

    // Next: 0
    // Next: 0
    // Next: 1
    // Next: 2
    // Next: 2
    // Completed
    Similar:
    Prototype:
    func Floor()
  • Emits the ceiling (rounded up) of each number emitted by the source Observable.

    obs := ro.Pipe(
    ro.Just(3.2, 4.7, -2.3, -5.8, 0.0, 7.0),
    ro.Ceil(),
    )

    sub := obs.Subscribe(ro.PrintObserver[float64]())
    defer sub.Unsubscribe()

    // Next: 4
    // Next: 5
    // Next: -2
    // Next: -5
    // Next: 0
    // Next: 7
    // Completed

    With infinity values

    obs := ro.Pipe(
    ro.Just(math.Inf(-1), -42.7, math.Inf(1), 3.14),
    ro.Ceil(),
    )

    sub := obs.Subscribe(ro.PrintObserver[float64]())
    defer sub.Unsubscribe()

    // Next: -Inf
    // Next: -42
    // Next: +Inf
    // Next: 4
    // Completed

    With NaN values

    obs := ro.Pipe(
    ro.Just(math.NaN(), 2.3, math.NaN(), -1.7),
    ro.Ceil(),
    )

    sub := obs.Subscribe(ro.PrintObserver[float64]())
    defer sub.Unsubscribe()

    // Next: NaN
    // Next: 3
    // Next: NaN
    // Next: -1
    // Completed

    With time-based emissions

    obs := ro.Pipe(
    ro.Interval(100*time.Millisecond),
    ro.Map(func(i int64) float64 {
    return float64(i) * 0.7 // Emit 0, 0.7, 1.4, 2.1, 2.8...
    }),
    ro.Ceil(),
    ro.Take(5),
    )

    sub := obs.Subscribe(ro.PrintObserver[float64]())
    time.Sleep(600 * time.Millisecond)
    sub.Unsubscribe()

    // Next: 0
    // Next: 1
    // Next: 2
    // Next: 3
    // Next: 3
    // Completed
    Prototype:
    func Ceil()
  • Counts the number of items emitted by an Observable and emits the total count when the source completes.

    obs := ro.Pipe(
    ro.Just(1, 2, 3, 4, 5),
    ro.Count[int](),
    )

    sub := obs.Subscribe(ro.PrintObserver[int64]())
    defer sub.Unsubscribe()

    // Next: 5
    // Completed

    Count with empty observable

    obs := ro.Pipe(
    ro.Empty[int](),
    ro.Count[int](),
    )

    sub := obs.Subscribe(ro.PrintObserver[int64]())
    defer sub.Unsubscribe()

    // Next: 0
    // Completed

    Count with single value

    obs := ro.Pipe(
    ro.Just("hello"),
    ro.Count[string](),
    )

    sub := obs.Subscribe(ro.PrintObserver[int64]())
    defer sub.Unsubscribe()

    // Next: 1
    // Completed

    Count with complex types

    type Person struct {
    Name string
    Age int
    }

    obs := ro.Pipe(
    ro.Just(
    Person{"Alice", 25},
    Person{"Bob", 30},
    Person{"Charlie", 35},
    ),
    ro.Count[Person](),
    )

    sub := obs.Subscribe(ro.PrintObserver[int64]())
    defer sub.Unsubscribe()

    // Next: 3
    // Completed

    Count after filtering

    obs := ro.Pipe(
    ro.Just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10),
    ro.Filter(func(i int) bool {
    return i%2 == 0 // Count even numbers
    }),
    ro.Count[int](),
    )

    sub := obs.Subscribe(ro.PrintObserver[int64]())
    defer sub.Unsubscribe()

    // Next: 5
    // Completed
    Prototype:
    func Count[T any]()
  • Calculates the sum of all values emitted by an Observable sequence and emits the total sum when the source completes.

    obs := ro.Pipe(
    ro.Just(1, 2, 3, 4, 5),
    ro.Sum[int](),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Next: 15
    // Completed

    Sum with floating point numbers

    obs := ro.Pipe(
    ro.Just(1.5, 2.5, 3.5),
    ro.Sum[float64](),
    )

    sub := obs.Subscribe(ro.PrintObserver[float64]())
    defer sub.Unsubscribe()

    // Next: 7.5
    // Completed

    Sum with negative numbers

    obs := ro.Pipe(
    ro.Just(10, -5, 3, -2),
    ro.Sum[int](),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Next: 6
    // Completed

    Sum with single value

    obs := ro.Pipe(
    ro.Just(42),
    ro.Sum[int](),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Next: 42
    // Completed

    Sum with empty observable

    obs := ro.Pipe(
    ro.Empty[int](),
    ro.Sum[int](),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Completed (no Next values emitted)
    Prototype:
    func Sum[T Numeric]()
  • Applies an accumulator function over an Observable sequence, and returns the final accumulated result when the source completes.

    obs := ro.Pipe(
    ro.Just(1, 2, 3, 4, 5),
    ro.Reduce(func(acc int, item int) int {
    return acc + item
    }, 0),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Next: 15
    // Completed

    With context

    obs := ro.Pipe(
    ro.Just(1, 2, 3, 4, 5),
    ro.ReduceWithContext(func(ctx context.Context, acc int, item int) (context.Context, int) {
    return ctx, acc + item
    }, 0),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Next: 15
    // Completed

    With index

    obs := ro.Pipe(
    ro.Just(1, 2, 3, 4, 5),
    ro.ReduceI(func(acc int, item int, index int64) int {
    return acc + (item * int(index+1)) // Multiply by position
    }, 0),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Next: 55 (0 + 1*1 + 2*2 + 3*3 + 4*4 + 5*5)
    // Completed

    With index and context

    obs := ro.Pipe(
    ro.Just(1, 2, 3, 4, 5),
    ro.ReduceIWithContext(func(ctx context.Context, acc int, item int, index int64) (context.Context, int) {
    return ctx, acc + (item * int(index+1))
    }, 0),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Next: 55
    // Completed

    Reduce to different type

    obs := ro.Pipe(
    ro.Just(1, 2, 3, 4, 5),
    ro.Reduce(func(acc string, item int) string {
    return fmt.Sprintf("%s%d", acc, item)
    }, ""),
    )

    sub := obs.Subscribe(ro.PrintObserver[string]())
    defer sub.Unsubscribe()

    // Next: "12345"
    // Completed

    Practical example: Building a map

    obs := ro.Pipe(
    ro.Just("apple", "banana", "cherry"),
    ro.Reduce(func(acc map[string]int, item string) map[string]int {
    acc[item] = len(item)
    return acc
    }, make(map[string]int)),
    )

    sub := obs.Subscribe(ro.PrintObserver[map[string]int]())
    defer sub.Unsubscribe()

    // Next: map[apple:5 banana:6 cherry:6]
    // Completed

    Reduce with no seed (first item as seed)

    obs := ro.Pipe(
    ro.Just(1, 2, 3, 4, 5),
    ro.Reduce(func(acc int, item int) int {
    return acc * item
    }, 1), // 1 is the seed
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Next: 120 (1 * 2 * 3 * 4 * 5)
    // Completed
    Prototypes:
    func Reduce[T any, R any](accumulator func(agg R, item T) R, seed R)
    func ReduceWithContext[T any, R any](accumulator func(ctx context.Context, agg R, item T) (context.Context, R), seed R)
    func ReduceI[T any, R any](accumulator func(agg R, item T, index int64) R, seed R)
    func ReduceIWithContext[T any, R any](accumulator func(ctx context.Context, agg R, item T, index int64) (context.Context, R), seed R)
  • Emits the rounded values from the source Observable using standard mathematical rounding rules.

    obs := ro.Pipe(
    ro.Just(1.1, 1.5, 1.9, 2.5, -1.5),
    ro.Round(),
    )

    sub := obs.Subscribe(ro.PrintObserver[float64]())
    defer sub.Unsubscribe()

    // Next: 1, 2, 2, 2, -2
    // Completed

    With decimal precision

    obs := ro.Pipe(
    ro.Just(3.14159, 2.71828, 1.41421),
    ro.Round(),
    )

    sub := obs.Subscribe(ro.PrintObserver[float64]())
    defer sub.Unsubscribe()

    // Next: 3, 3, 1
    // Completed

    With negative numbers

    obs := ro.Pipe(
    ro.Just(-2.3, -2.7, -3.5),
    ro.Round(),
    )

    sub := obs.Subscribe(ro.PrintObserver[float64]())
    defer sub.Unsubscribe()

    // Next: -2, -3, -4
    // Completed

    With integer-like values

    obs := ro.Pipe(
    ro.Just(5.0, 6.0001, 4.9999),
    ro.Round(),
    )

    sub := obs.Subscribe(ro.PrintObserver[float64]())
    defer sub.Unsubscribe()

    // Next: 5, 6, 5
    // Completed

    In data processing pipeline

    obs := ro.Pipe(
    ro.Interval(100 * time.Millisecond),
    ro.Take[int64](5),
    ro.Map(func(_ int64) float64 {
    return rand.Float64() * 100 // Random values 0-100
    }),
    ro.Round(), // Round to whole numbers
    )

    sub := obs.Subscribe(ro.PrintObserver[float64]())
    time.Sleep(600 * time.Millisecond)
    sub.Unsubscribe()

    // Random rounded integers between 0-100
    // Example: 42, 87, 15, 93, 28

    With financial calculations

    obs := ro.Pipe(
    ro.Just(12.345, 67.890, 123.456),
    ro.Round(),
    )

    sub := obs.Subscribe(ro.PrintObserver[float64]())
    defer sub.Unsubscribe()

    // Next: 12, 68, 123
    // Completed
    Prototype:
    func Round()
  • Finds the minimum value in an observable sequence.

    obs := ro.Pipe(
    ro.Just(5, 3, 8, 1, 4),
    ro.Min[int](),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Next: 1
    // Completed

    With floats

    obs := ro.Pipe(
    ro.Just(3.14, 2.71, 1.61, 0.99),
    ro.Min[float64](),
    )

    sub := obs.Subscribe(ro.PrintObserver[float64]())
    defer sub.Unsubscribe()

    // Next: 0.99
    // Completed

    With strings

    obs := ro.Pipe(
    ro.Just("zebra", "apple", "banana", "cherry"),
    ro.Min[string](),
    )

    sub := obs.Subscribe(ro.PrintObserver[string]())
    defer sub.Unsubscribe()

    // Next: apple
    // Completed

    Empty sequence handling

    obs := ro.Pipe(
    ro.Empty[int](),
    ro.Min[int](),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Completed (no values emitted)
    Prototype:
    func Min[T constraints.Ordered]()
  • Finds the maximum value in an observable sequence.

    obs := ro.Pipe(
    ro.Just(5, 3, 8, 1, 4),
    ro.Max[int](),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Next: 8
    // Completed

    With floats

    obs := ro.Pipe(
    ro.Just(3.14, 2.71, 1.61, 0.99),
    ro.Max[float64](),
    )

    sub := obs.Subscribe(ro.PrintObserver[float64]())
    defer sub.Unsubscribe()

    // Next: 3.14
    // Completed

    With strings

    obs := ro.Pipe(
    ro.Just("zebra", "apple", "banana", "cherry"),
    ro.Max[string](),
    )

    sub := obs.Subscribe(ro.PrintObserver[string]())
    defer sub.Unsubscribe()

    // Next: zebra
    // Completed

    With custom types

    type Person struct {
    Name string
    Age int
    }

    obs := ro.Pipe(
    ro.Just(
    Person{"Alice", 25},
    Person{"Bob", 30},
    Person{"Charlie", 20},
    ),
    ro.Map[Person, int](func(p Person) int { return p.Age }),
    ro.Max[int](),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Next: 30
    // Completed
    Prototype:
    func Max[T constraints.Ordered]()
  • Clamps values to be within a specified range.

    obs := ro.Pipe(
    ro.Just(-5, 0, 5, 10, 15),
    ro.Clamp(0, 10),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Next: 0
    // Next: 0
    // Next: 5
    // Next: 10
    // Next: 10
    // Completed

    With floats

    obs := ro.Pipe(
    ro.Just(-1.5, 0.0, 0.5, 1.0, 1.5),
    ro.Clamp(0.0, 1.0),
    )

    sub := obs.Subscribe(ro.PrintObserver[float64]())
    defer sub.Unsubscribe()

    // Next: 0.0
    // Next: 0.0
    // Next: 0.5
    // Next: 1.0
    // Next: 1.0
    // Completed

    With negative values

    obs := ro.Pipe(
    ro.Just(-20, -10, 0, 10, 20),
    ro.Clamp(-15, 15),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Next: -15
    // Next: -10
    // Next: 0
    // Next: 10
    // Next: 15
    // Completed

    Edge case: min equals max

    obs := ro.Pipe(
    ro.Just(1, 2, 3, 4, 5),
    ro.Clamp(3, 3),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Next: 3
    // Next: 3
    // Next: 3
    // Next: 3
    // Next: 3
    // Completed
    Prototype:
    func Clamp[T constraints.Ordered](min, max T)