Skip to main content
Help improve this documentation

This documentation is still new and evolving. If you spot any mistakes, unclear explanations, or missing details, please open an issue.

Your feedback helps us improve!

Creation operators

This page lists all creation operators available in the core package of ro.

  • Creates an Observable that emits a specific sequence of values.

    obs := ro.Just(1, 2, 3, 4, 5)

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Next: 1
    // Next: 2
    // Next: 3
    // Next: 4
    // Next: 5
    // Completed

    Just with no values

    obs := ro.Just[int]() // Equivalent to ro.Empty[int]()

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Completed

    Just with complex types

    type Person struct {
    Name string
    Age int
    }

    obs := ro.Just(
    Person{"Alice", 25},
    Person{"Bob", 30},
    Person{"Charlie", 35},
    )

    sub := obs.Subscribe(ro.PrintObserver[Person]())
    defer sub.Unsubscribe()

    // Next: {Alice 25}
    // Next: {Bob 30}
    // Next: {Charlie 35}
    // Completed
    Variant:
    Prototypes:
    func Just[T any](values ...T)
    func Of[T any](values ...T)
  • Creates an Observable that emits a sequence of numbers within a specified range.

    obs := ro.Range(1, 5)

    sub := obs.Subscribe(ro.PrintObserver[int64]())
    defer sub.Unsubscribe()

    // Next: 1
    // Next: 2
    // Next: 3
    // Next: 4
    // Next: 5
    // Completed

    Negative range

    obs := ro.Range(5, 1)

    sub := obs.Subscribe(ro.PrintObserver[int64]())
    defer sub.Unsubscribe()

    // Next: 5
    // Next: 4
    // Next: 3
    // Next: 2
    // Next: 1
    // Completed

    Single value range

    obs := ro.Range(5, 5)

    sub := obs.Subscribe(ro.PrintObserver[int64]())
    defer sub.Unsubscribe()

    // Next: 5
    // Completed
    Prototype:
    func Range(start int64, end int64)
  • RangeWithInterval

    Creates an Observable that emits a sequence of numbers within a specified range with a time interval between emissions.

    obs := ro.RangeWithInterval(1, 5, 100*time.Millisecond)

    sub := obs.Subscribe(ro.PrintObserver[int64]())
    defer sub.Unsubscribe()

    // Next: 1 (after 0ms)
    // Next: 2 (after 100ms)
    // Next: 3 (after 200ms)
    // Next: 4 (after 300ms)
    // Next: 5 (after 400ms)
    // Completed

    Negative range with interval

    obs := ro.RangeWithInterval(5, 1, 100*time.Millisecond)

    sub := obs.Subscribe(ro.PrintObserver[int64]())
    defer sub.Unsubscribe()

    // Next: 5 (after 0ms)
    // Next: 4 (after 100ms)
    // Next: 3 (after 200ms)
    // Next: 2 (after 300ms)
    // Next: 1 (after 400ms)
    // Completed
    Prototype:
    func RangeWithInterval(start int64, end int64, interval time.Duration)
  • Creates an Observable that emits a sequence of numbers within a specified range with a custom step.

    obs := ro.RangeWithStep(1, 10, 2)

    sub := obs.Subscribe(ro.PrintObserver[float64]())
    defer sub.Unsubscribe()

    // Next: 1
    // Next: 3
    // Next: 5
    // Next: 7
    // Next: 9
    // Completed

    Fractional range

    obs := ro.RangeWithStep(0.5, 2.5, 0.5)

    sub := obs.Subscribe(ro.PrintObserver[float64]())
    defer sub.Unsubscribe()

    // Next: 0.5
    // Next: 1
    // Next: 1.5
    // Next: 2
    // Next: 2.5
    // Completed

    Negative step

    obs := ro.RangeWithStep(10, 1, -2)

    sub := obs.Subscribe(ro.PrintObserver[float64]())
    defer sub.Unsubscribe()

    // Next: 10
    // Next: 8
    // Next: 6
    // Next: 4
    // Next: 2
    // Completed
    Prototype:
    func RangeWithStep(start float64, end float64, step float64)
  • RangeWithStepAndInterval

    Creates an Observable that emits a sequence of numbers within a specified range with a custom step and time interval between emissions.

    obs := ro.RangeWithStepAndInterval(1, 10, 2, 100*time.Millisecond)

    sub := obs.Subscribe(ro.PrintObserver[float64]())
    defer sub.Unsubscribe()

    // Next: 1 (after 0ms)
    // Next: 3 (after 100ms)
    // Next: 5 (after 200ms)
    // Next: 7 (after 300ms)
    // Next: 9 (after 400ms)
    // Completed

    Fractional range with interval

    obs := ro.RangeWithStepAndInterval(0.5, 2.5, 0.5, 100*time.Millisecond)

    sub := obs.Subscribe(ro.PrintObserver[float64]())
    defer sub.Unsubscribe()

    // Next: 0.5 (after 0ms)
    // Next: 1 (after 100ms)
    // Next: 1.5 (after 200ms)
    // Next: 2 (after 300ms)
    // Next: 2.5 (after 400ms)
    // Completed

    Negative step with interval

    obs := ro.RangeWithStepAndInterval(10, 1, -2, 100*time.Millisecond)

    sub := obs.Subscribe(ro.PrintObserver[float64]())
    defer sub.Unsubscribe()

    // Next: 10 (after 0ms)
    // Next: 8 (after 100ms)
    // Next: 6 (after 200ms)
    // Next: 4 (after 300ms)
    // Next: 2 (after 400ms)
    // Completed
    Prototype:
    func RangeWithStepAndInterval(start float64, end float64, step float64, interval time.Duration)
  • Creates an Observable that emits sequential numbers every specified interval of time.

    obs := ro.Interval(100 * time.Millisecond)

    sub := obs.Subscribe(ro.PrintObserver[int64]())
    time.Sleep(550 * time.Millisecond)
    sub.Unsubscribe()

    // Next: 0 (after 100ms)
    // Next: 1 (after 200ms)
    // Next: 2 (after 300ms)
    // Next: 3 (after 400ms)
    // Next: 4 (after 500ms)

    Using Interval for periodic operations

    obs := ro.Interval(1 * time.Second)

    sub := obs.Subscribe(ro.PrintObserver[int64]())
    time.Sleep(3500 * time.Millisecond)
    sub.Unsubscribe()

    // Next: 0 (after 1 second)
    // Next: 1 (after 2 seconds)
    // Next: 2 (after 3 seconds)

    Practical example: Heartbeat simulation

    ticker := ro.Interval(500 * time.Millisecond)
    heartbeat := ro.Pipe(ticker, ro.Map(func(i int64) string {
    return "❤️"
    }))

    sub := heartbeat.Subscribe(ro.PrintObserver[string]())
    time.Sleep(2200 * time.Millisecond)
    sub.Unsubscribe()

    // Next: "❤️" (after 500ms)
    // Next: "❤️" (after 1000ms)
    // Next: "❤️" (after 1500ms)
    // Next: "❤️" (after 2000ms)

    With Take for limited emissions

    obs := ro.Pipe(
    ro.Interval(100*time.Millisecond),
    ro.Take[int64](5),
    )

    sub := obs.Subscribe(ro.PrintObserver[int64]())
    defer sub.Unsubscribe()

    // Next: 0
    // Next: 1
    // Next: 2
    // Next: 3
    // Next: 4
    // Completed

    Edge case: Very short interval

    obs := ro.Interval(1 * time.Millisecond)

    sub := obs.Subscribe(ro.PrintObserver[int64]())
    time.Sleep(10 * time.Millisecond)
    sub.Unsubscribe()

    // Will emit rapidly based on system scheduling
    // Next: 0, 1, 2, 3, 4, 5, 6, 7, 8, 9...
    Prototype:
    func Interval(interval time.Duration)
  • IntervalWithInitial

    Creates an Observable that emits sequential numbers starting after an initial delay, then continuing at specified intervals.

    obs := ro.IntervalWithInitial(200*time.Millisecond, 100*time.Millisecond)

    sub := obs.Subscribe(ro.PrintObserver[int64]())
    time.Sleep(650 * time.Millisecond)
    sub.Unsubscribe()

    // Next: 0 (after 200ms - initial delay)
    // Next: 1 (after 300ms)
    // Next: 2 (after 400ms)
    // Next: 3 (after 500ms)
    // Next: 4 (after 600ms)

    Long initial delay with short interval

    obs := ro.IntervalWithInitial(1*time.Second, 200*time.Millisecond)

    sub := obs.Subscribe(ro.PrintObserver[int64]())
    time.Sleep(2000 * time.Millisecond)
    sub.Unsubscribe()

    // Next: 0 (after 1000ms - initial delay)
    // Next: 1 (after 1200ms)
    // Next: 2 (after 1400ms)
    // Next: 3 (after 1600ms)
    // Next: 4 (after 1800ms)
    // Next: 5 (after 2000ms)

    Short initial delay with long interval

    obs := ro.IntervalWithInitial(100*time.Millisecond, 1*time.Second)

    sub := obs.Subscribe(ro.PrintObserver[int64]())
    time.Sleep(2500 * time.Millisecond)
    sub.Unsubscribe()

    // Next: 0 (after 100ms - initial delay)
    // Next: 1 (after 1100ms)
    // Next: 2 (after 2100ms)

    Practical example: Delayed heartbeat

    heartbeat := ro.IntervalWithInitial(1*time.Second, 500*time.Millisecond)

    sub := heartbeat.Subscribe(ro.PrintObserver[int64]())
    time.Sleep(3000 * time.Millisecond)
    sub.Unsubscribe()

    // Next: 0 (after 1000ms - initial delay before heartbeat starts)
    // Next: 1 (after 1500ms)
    // Next: 2 (after 2000ms)
    // Next: 3 (after 2500ms)
    // Next: 4 (after 3000ms)

    With Take for limited emissions

    obs := ro.Pipe(
    ro.IntervalWithInitial(200*time.Millisecond, 100*time.Millisecond),
    ro.Take[int64](5),
    )

    sub := obs.Subscribe(ro.PrintObserver[int64]())
    defer sub.Unsubscribe()

    // Next: 0 (after 200ms)
    // Next: 1 (after 300ms)
    // Next: 2 (after 400ms)
    // Next: 3 (after 500ms)
    // Next: 4 (after 600ms)
    // Completed

    Edge case: Zero initial delay

    obs := ro.IntervalWithInitial(0, 100*time.Millisecond)

    sub := obs.Subscribe(ro.PrintObserver[int64]())
    time.Sleep(550 * time.Millisecond)
    sub.Unsubscribe()

    // Behaves like regular Interval
    // Next: 0 (immediately)
    // Next: 1 (after 100ms)
    // Next: 2 (after 200ms)
    // Next: 3 (after 300ms)
    // Next: 4 (after 400ms)
    // Next: 5 (after 500ms)

    Edge case: Very long initial delay

    obs := ro.IntervalWithInitial(5*time.Second, 1*time.Second)

    sub := obs.Subscribe(ro.PrintObserver[int64]())
    time.Sleep(8000 * time.Millisecond)
    sub.Unsubscribe()

    // Next: 0 (after 5000ms - long initial delay)
    // Next: 1 (after 6000ms)
    // Next: 2 (after 7000ms)
    Prototype:
    func IntervalWithInitial(initial time.Duration, interval time.Duration)
  • Creates an Observable that emits a single value (0) after a specified delay, then completes.

    obs := ro.Timer(1 * time.Second)

    sub := obs.Subscribe(ro.PrintObserver[int64]())
    time.Sleep(1500 * time.Millisecond)
    sub.Unsubscribe()

    // Next: 0 (after 1000ms)
    // Completed

    Short delay

    obs := ro.Timer(100 * time.Millisecond)

    sub := obs.Subscribe(ro.PrintObserver[int64]())
    time.Sleep(200 * time.Millisecond)
    sub.Unsubscribe()

    // Next: 0 (after 100ms)
    // Completed

    With other operators

    obs := ro.Pipe(
    ro.Timer(500*time.Millisecond),
    ro.Map(func(_ int64) string {
    return "Timer fired!"
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[string]())
    time.Sleep(1000 * time.Millisecond)
    sub.Unsubscribe()

    // Next: "Timer fired!" (after 500ms)
    // Completed

    Timeout simulation

    timeout := ro.Timer(2 * time.Second)
    dataSource := ro.Just("data")

    // Race between timeout and data
    raceResult := ro.Race(timeout, dataSource)

    sub := raceResult.Subscribe(ro.PrintObserver[any]())
    defer sub.Unsubscribe()

    // If data emits before timeout: Next: "data", Completed
    // If timeout fires first: Next: 0, Completed
    Prototype:
    func Timer(d time.Duration)
  • Creates an Observable that emits no items and immediately completes. This creation operator is very useful for unit tests.

    obs := ro.Empty[int]()

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // No items emitted
    // Completed

    As a source for other operators

    obs := ro.Pipe(
    ro.Empty[int](),
    ro.DefaultIfEmpty(-1),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Next: -1 (default value since source is empty)
    // Completed

    For unit tests

    // pipeline.go
    var pipeline = ro.PipeOp(
    ro.Map(func(x int) int {
    return x*2
    }),
    ro.Catch(func(err error) ro.Observable[int] {
    return ro.Just(42)
    }),
    )

    // pipeline_test.go
    func TestMyPipeline(t *testing.T) {
    // testing empty source
    obs := pipeline(ro.Empty[int]())

    values, err := ro.Collect(obs)
    defer sub.Unsubscribe()

    t.Assert(...)

    // testing broken source
    obs := pipeline(ro.Throw[int](errors.New("something went wrong")))

    values, err = ro.Collect(obs)
    defer sub.Unsubscribe()

    t.Assert(...)

    // testing inactive stream
    obs := pipeline(ro.Never[int]())

    values, err = ro.Collect(obs)
    defer sub.Unsubscribe()

    t.Assert(...)
    }
    Similar:
    Prototype:
    func Empty[T any]()
  • Creates an Observable that never emits any items and never completes. This creation operator is very useful for unit tests.

    obs := ro.Never[int]()

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // No items ever emitted
    // Never completes

    With timeout for testing

    obs := ro.Never[string]()

    sub := obs.Subscribe(ro.PrintObserver[string]())
    time.Sleep(100 * time.Millisecond)
    sub.Unsubscribe()

    // No items emitted during sleep
    // Never would have completed if we waited forever

    Context timeout:

    obs := ro.Never[string]()

    ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
    defer cancel()
    sub := obs.SubscribeWithContext(ctx, ro.PrintObserver[string]())
    defer sub.Unsubscribe()

    // No items emitted before the context times out
    // Never would have completed if we waited forever

    For long-running operations

    // Simulate a long-running operation that may never complete
    obs := ro.Never[bool]()

    sub := obs.Subscribe(ro.PrintObserver[bool]())
    // In a real app, you might want to add a timeout
    // time.Sleep(5 * time.Second)
    // sub.Unsubscribe()

    For unit tests

    // pipeline.go
    var pipeline = ro.PipeOp(
    ro.Map(func(x int) int {
    return x*2
    }),
    ro.Catch(func(err error) ro.Observable[int] {
    return ro.Just(42)
    }),
    )

    // pipeline_test.go
    func TestMyPipeline(t *testing.T) {
    // testing empty source
    obs := pipeline(ro.Empty[int]())

    values, err := ro.Collect(obs)
    defer sub.Unsubscribe()

    t.Assert(...)

    // testing broken source
    obs := pipeline(ro.Throw[int](errors.New("something went wrong")))

    values, err = ro.Collect(obs)
    defer sub.Unsubscribe()

    t.Assert(...)

    // testing inactive stream
    obs := pipeline(ro.Never[int]())

    values, err = ro.Collect(obs)
    defer sub.Unsubscribe()

    t.Assert(...)
    }
    Similar:
    Prototype:
    func Never[T any]()
  • Creates an Observable that emits no items and immediately terminates with an error. This creation operator is very useful for unit tests.

    obs := ro.Throw[int](errors.New("something went wrong"))

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Error: something went wrong

    With custom error types

    type CustomError struct {
    Code int
    Message string
    }

    func (e *CustomError) Error() string {
    return fmt.Sprintf("Error %d: %s", e.Code, e.Message)
    }

    obs := ro.Throw[string](&CustomError{Code: 404, Message: "Not found"})

    sub := obs.Subscribe(ro.PrintObserver[string]())
    defer sub.Unsubscribe()

    // Error: Error 404: Not found

    For unit tests

    // pipeline.go
    var pipeline = ro.PipeOp(
    ro.Map(func(x int) int {
    return x*2
    }),
    ro.Catch(func(err error) ro.Observable[int] {
    return ro.Just(42)
    }),
    )

    // pipeline_test.go
    func TestMyPipeline(t *testing.T) {
    // testing empty source
    obs := pipeline(ro.Empty[int]())

    values, err := ro.Collect(obs)
    defer sub.Unsubscribe()

    t.Assert(...)

    // testing broken source
    obs := pipeline(ro.Throw[int](errors.New("something went wrong")))

    values, err = ro.Collect(obs)
    defer sub.Unsubscribe()

    t.Assert(...)

    // testing inactive stream
    obs := pipeline(ro.Never[int]())

    values, err = ro.Collect(obs)
    defer sub.Unsubscribe()

    t.Assert(...)
    }

    With Retry

    obs := ro.Pipe(
    ro.Throw[int](errors.New("network error")),
    ro.Retry(3),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    time.Sleep(100 * time.Millisecond) // Give retry attempts time
    sub.Unsubscribe()

    // Will attempt retry 3 times before propagating the error
    // Error: network error

    With error handling

    obs := ro.Pipe(
    ro.Throw[int](errors.New("original error")),
    ro.Catch(func(err error) ro.Observable[int] {
    return ro.Just(42) // fallback value
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Next: 42 (fallback from Catch)
    // Completed
    Similar:
    Prototype:
    func Throw[T any](err error)
  • Creates an Observable that calls the specified factory function for each subscriber to create a new Observable.

    obs := ro.Defer(func() ro.Observable[int] {
    return ro.Just(1, 2, 3)
    })

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Next: 1
    // Next: 2
    // Next: 3
    // Completed

    Fresh observable for each subscriber

    obs := ro.Defer(func() ro.Observable[int] {
    counter := 0
    return ro.Pipe(
    ro.Interval(100*time.Millisecond),
    ro.Take[int64](3),
    ro.Map(func(i int64) int {
    counter++
    return counter
    }),
    )
    })

    // Each subscriber gets a fresh observable
    sub1 := obs.Subscribe(ro.PrintObserver[int]())
    sub2 := obs.Subscribe(ro.PrintObserver[int]())

    time.Sleep(500 * time.Millisecond)
    sub1.Unsubscribe()
    sub2.Unsubscribe()

    // Next: 1
    // Next: 2
    // Next: 3 (each subscriber gets their own counter)
    // Completed

    With dynamic content

    obs := ro.Defer(func() ro.Observable[string] {
    timestamp := time.Now().Format("15:04:05")
    return ro.Just(fmt.Sprintf("Created at %s", timestamp))
    })

    sub1 := obs.Subscribe(ro.PrintObserver[string]())
    time.Sleep(100 * time.Millisecond)
    sub2 := obs.Subscribe(ro.PrintObserver[string]())

    defer sub1.Unsubscribe()
    defer sub2.Unsubscribe()

    // Next: "Created at HH:MM:SS" (different times for sub1 and sub2)

    For resource cleanup

    obs := ro.Defer(func() ro.Observable[string] {
    file, err := os.Open("example.txt")
    if err != nil {
    return ro.Throw[string](err)
    }

    // Return observable that cleans up when done
    return ro.Pipe(
    ro.FromSlice([]string{"line1", "line2", "line3"}),
    ro.Finally(func() {
    file.Close()
    }),
    )
    })

    sub := obs.Subscribe(ro.PrintObserver[string]())
    defer sub.Unsubscribe()

    // Next: line1
    // Next: line2
    // Next: line3
    // Completed (file closed automatically)

    Conditional observable creation

    obs := ro.Defer(func() ro.Observable[int] {
    if time.Now().Hour() < 12 {
    return ro.Just(1) // Morning
    } else if time.Now().Hour() < 18 {
    return ro.Just(2) // Afternoon
    } else {
    return ro.Just(3) // Evening
    }
    })

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Next: 1, 2, or 3 depending on current time
    // Completed
    Similar:
    Prototype:
    func Defer[T any](factory func() Observable[T])
  • Creates an Observable from a promise-style function that can resolve with a value or reject with an error.

    obs := ro.Future[string](func(resolve func(string), reject func(error)) {
    go func() {
    time.Sleep(100 * time.Millisecond)
    resolve("Hello from future!")
    }()
    })

    sub := obs.Subscribe(ro.PrintObserver[string]())
    time.Sleep(200 * time.Millisecond)
    sub.Unsubscribe()

    // Next: "Hello from future!" (after 100ms)
    // Completed

    With error rejection

    obs := ro.Future[int](func(resolve func(int), reject func(error)) {
    go func() {
    time.Sleep(50 * time.Millisecond)
    reject(errors.New("something went wrong"))
    }()
    })

    sub := obs.Subscribe(ro.PrintObserver[int]())
    time.Sleep(100 * time.Millisecond)
    sub.Unsubscribe()

    // Error: something went wrong

    HTTP request simulation

    obs := ro.Future[string](func(resolve func(string), reject func(error)) {
    go func() {
    // Simulate HTTP request
    time.Sleep(200 * time.Millisecond)

    // Simulate response
    resolve("{\"status\": \"ok\", \"data\": [1, 2, 3]}")
    }()
    })

    sub := obs.Subscribe(ro.PrintObserver[string]())
    time.Sleep(300 * time.Millisecond)
    sub.Unsubscribe()

    // Next: "{\"status\": \"ok\", \"data\": [1, 2, 3]}"
    // Completed

    With conditional resolution

    obs := ro.Future[int](func(resolve func(int), reject func(error)) {
    go func() {
    // Simulate random success/failure
    time.Sleep(100 * time.Millisecond)
    if rand.Intn(2) == 0 {
    resolve(42)
    } else {
    reject(errors.New("random failure"))
    }
    }()
    })

    sub := obs.Subscribe(ro.PrintObserver[int]())
    time.Sleep(200 * time.Millisecond)
    sub.Unsubscribe()

    // Either:
    // Next: 42
    // Completed
    // Or:
    // Error: random failure

    Database query simulation

    obs := ro.Future[[]string](func(resolve func([]string), reject func(error)) {
    go func() {
    // Simulate database query
    time.Sleep(150 * time.Millisecond)

    // Simulate query results
    resolve([]string{"Alice", "Bob", "Charlie"})
    }()
    })

    sub := obs.Subscribe(ro.PrintObserver[[]string]())
    time.Sleep(300 * time.Millisecond)
    sub.Unsubscribe()

    // Next: ["Alice", "Bob", "Charlie"]
    // Completed
    Similar:
    Prototype:
    func Future[T any](promise func(resolve func(T), reject func(error)))
  • Creates an Observable that emits the result of an action function for each subscriber, running the action synchronously.

    obs := ro.Start(func() int {
    // perform work such as HTTP request, database query...
    return 42
    })

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Next: 42
    // Completed

    With complex computation

    obs := ro.Start(func() string {
    time.Sleep(50 * time.Millisecond) // Simulate work
    return fmt.Sprintf("Result: %d", 21*2)
    })

    sub := obs.Subscribe(ro.PrintObserver[string]())
    defer sub.Unsubscribe()

    // Next: "Result: 42" (after ~50ms)
    // Completed

    Fresh execution for each subscriber

    obs := ro.Start(func() int {
    return rand.Intn(100) // Different random number each time
    })

    sub1 := obs.Subscribe(ro.PrintObserver[int]())
    sub2 := obs.Subscribe(ro.PrintObserver[int]())

    defer sub1.Unsubscribe()
    defer sub2.Unsubscribe()

    // Each subscriber gets a fresh execution
    // sub1 might get: 73
    // sub2 might get: 15

    With error handling

    obs := ro.Start(func() string {
    file, err := os.Open("nonexistent.txt")
    if err != nil {
    panic(err) // This will cause the observable to error
    }
    defer file.Close()

    content, _ := io.ReadAll(file)
    return string(content)
    })

    sub := obs.Subscribe(ro.PrintObserver[string]())
    defer sub.Unsubscribe()

    // Error: open nonexistent.txt: no such file or directory

    For expensive calculations

    obs := ro.Start(func() []int {
    // Simulate expensive calculation
    result := make([]int, 0)
    for i := 0; i < 1000; i++ {
    result = append(result, i*i)
    }
    return result
    })

    sub := obs.Subscribe(ro.PrintObserver[[]int]())
    defer sub.Unsubscribe()

    // Next: [0, 1, 4, 9, 16, ..., 998001] (after calculation completes)
    // Completed

    With external dependencies

    obs := ro.Start(func() time.Time {
    return time.Now() // Current time at execution
    })

    sub1 := obs.Subscribe(ro.PrintObserver[time.Time]())
    time.Sleep(10 * time.Millisecond)
    sub2 := obs.Subscribe(ro.PrintObserver[time.Time]())

    defer sub1.Unsubscribe()
    defer sub2.Unsubscribe()

    // Each subscriber gets the time when their subscription started
    // Different times for sub1 and sub2
    Similar:
    Prototype:
    func Start[T any](action func() T)
  • Creates an Observable that repeats the source Observable sequence when it completes.

    source := ro.Just(1, 2, 3)
    obs := ro.Repeat(source)

    sub := obs.Subscribe(ro.PrintObserver[int]())
    time.Sleep(100 * time.Millisecond)
    sub.Unsubscribe()

    // Will repeat indefinitely:
    // Next: 1, 2, 3, 1, 2, 3, 1, 2, 3, ...

    With Take for limited repetitions

    source := ro.Just(1, 2, 3)
    obs := ro.Pipe(
    ro.Repeat(source),
    ro.Take[int](8),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Next: 1, 2, 3, 1, 2, 3, 1, 2
    // Completed (after taking 8 items)

    Repeat with interval

    source := ro.Just("tick")
    obs := ro.RepeatWithInterval(source, 1*time.Second)

    sub := obs.Subscribe(ro.PrintObserver[string]())
    time.Sleep(3500 * time.Millisecond)
    sub.Unsubscribe()

    // Next: "tick" (immediately)
    // Next: "tick" (after 1 second)
    // Next: "tick" (after 2 seconds)
    // Next: "tick" (after 3 seconds)

    Repeat complex sequence

    source := ro.Pipe(
    ro.Interval(100*time.Millisecond),
    ro.Take[int64](3), // 0, 1, 2
    )
    obs := ro.Repeat(source)

    sub := obs.Subscribe(ro.PrintObserver[int64]())
    time.Sleep(1000 * time.Millisecond)
    sub.Unsubscribe()

    // Will repeat the sequence 0, 1, 2 indefinitely
    // 0, 1, 2, 0, 1, 2, 0, 1, 2, ...

    Repeat with error handling

    source := ro.Just(1, 2)
    obs := ro.Pipe(
    ro.Repeat(source),
    ro.Take[int](5),
    ro.Map(func(i int) int {
    if i == 3 {
    panic("error at 3")
    }
    return i
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    time.Sleep(100 * time.Millisecond)
    sub.Unsubscribe()

    // Next: 1, 2, 1, 2
    // Error: error at 3
    Prototype:
    func Repeat[T any](source Observable[T])
  • RepeatWithInterval

    Creates an Observable that repeats the source Observable sequence after a specified interval when it completes.

    source := ro.Just("tick")
    obs := ro.RepeatWithInterval(source, 1*time.Second)

    sub := obs.Subscribe(ro.PrintObserver[string]())
    time.Sleep(3500 * time.Millisecond)
    sub.Unsubscribe()

    // Next: "tick" (immediately)
    // Next: "tick" (after 1 second)
    // Next: "tick" (after 2 seconds)
    // Next: "tick" (after 3 seconds)

    With Take for limited repetitions

    source := ro.Just(1, 2, 3)
    obs := ro.Pipe(
    ro.RepeatWithInterval(source, 500*time.Millisecond),
    ro.Take[int](10),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    time.Sleep(3000 * time.Millisecond)
    sub.Unsubscribe()

    // Next: 1, 2, 3 (immediately)
    // Next: 1, 2, 3 (after 500ms)
    // Next: 1, 2, 3 (after 1000ms)
    // Next: 1 (after 1500ms)
    // Completed (after taking 10 items)

    With complex sequences

    source := ro.Pipe(
    ro.Just("A", "B", "C"),
    )
    obs := ro.RepeatWithInterval(source, 800*time.Millisecond)

    sub := obs.Subscribe(ro.PrintObserver[string]())
    time.Sleep(3000 * time.Millisecond)
    sub.Unsubscribe()

    // Next: A, B, C (immediately)
    // Next: A, B, C (after 800ms)
    // Next: A, B, C (after 1600ms)
    // Next: A, B, C (after 2400ms)

    For periodic polling

    getTime := ro.Start(func() time.Time { return time.Now() })
    obs := ro.RepeatWithInterval(getTime, 1*time.Second)

    sub := obs.Subscribe(ro.PrintObserver[time.Time]())
    time.Sleep(3500 * time.Millisecond)
    sub.Unsubscribe()

    // Next: <current time> (immediately)
    // Next: <current time + 1s> (after 1 second)
    // Next: <current time + 2s> (after 2 seconds)
    // Next: <current time + 3s> (after 3 seconds)

    With error handling

    source := ro.Just(1, 2)
    obs := ro.RepeatWithInterval(source, 500*time.Millisecond)

    sub := obs.Subscribe(ro.PrintObserver[int]())
    time.Sleep(2000 * time.Millisecond)
    sub.Unsubscribe()

    // Will repeat successfully:
    // Next: 1, 2 (immediately)
    // Next: 1, 2 (after 500ms)
    // Next: 1, 2 (after 1000ms)
    // Next: 1, 2 (after 1500ms)
    Similar:
    Prototype:
    func RepeatWithInterval[T any](source Observable[T], interval time.Duration)
  • Creates an Observable that emits items from a channel until the channel is closed.

    ch := make(chan string)
    go func() {
    ch <- "hello"
    ch <- "world"
    close(ch)
    }()

    obs := ro.FromChannel(ch)

    sub := obs.Subscribe(ro.PrintObserver[string]())
    defer sub.Unsubscribe()

    // Next: "hello"
    // Next: "world"
    // Completed

    With buffered channel

    ch := make(chan int, 3)
    ch <- 1
    ch <- 2
    ch <- 3
    close(ch)

    obs := ro.FromChannel(ch)

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Next: 1
    // Next: 2
    // Next: 3
    // Completed

    With concurrent writes

    ch := make(chan int)
    go func() {
    for i := 1; i <= 5; i++ {
    ch <- i
    time.Sleep(100 * time.Millisecond)
    }
    close(ch)
    }()

    obs := ro.FromChannel(ch)

    sub := obs.Subscribe(ro.PrintObserver[int]())
    time.Sleep(800 * time.Millisecond)
    sub.Unsubscribe()

    // Next: 1 (after ~100ms)
    // Next: 2 (after ~200ms)
    // Next: 3 (after ~300ms)
    // Next: 4 (after ~400ms)
    // Next: 5 (after ~500ms)
    // Completed

    With multiple subscribers

    ch := make(chan string)
    go func() {
    ch <- "shared"
    ch <- "data"
    close(ch)
    }()

    obs := ro.FromChannel(ch)

    sub1 := obs.Subscribe(ro.PrintObserver[string]())
    sub2 := obs.Subscribe(ro.PrintObserver[string]())

    defer sub1.Unsubscribe()
    defer sub2.Unsubscribe()

    // Both subscribers compete for the same channel values
    // One might get both values, the other gets none

    With error channel pattern

    type Result struct {
    Value string
    Err error
    }

    ch := make(chan Result)
    go func() {
    ch <- Result{Value: "success"}
    ch <- Result{Err: errors.New("failed")}
    close(ch)
    }()

    obs := ro.Pipe(
    ro.FromChannel(ch),
    ro.Map(func(r Result) (string, error) {
    if r.Err != nil {
    return "", r.Err
    }
    return r.Value, nil
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[string]())
    defer sub.Unsubscribe()

    // Next: "success"
    // Error: failed
    Similar:
    Prototype:
    func FromChannel[T any](ch <-chan T)
  • Creates an Observable that emits each item from a slice, then completes.

    data := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
    obs := ro.Pipe(
    ro.FromSlice(data),
    ro.Filter(func(n int) bool { return n%2 == 0 }),
    ro.Map(func(n int) int { return n * n }),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Next: 4
    // Next: 16
    // Next: 36
    // Next: 64
    // Next: 100
    // Completed

    With complex types

    type Person struct {
    Name string
    Age int
    }

    people := []Person{
    {"Alice", 25},
    {"Bob", 30},
    {"Charlie", 35},
    }

    obs := ro.FromSlice(people)

    sub := obs.Subscribe(ro.PrintObserver[Person]())
    defer sub.Unsubscribe()

    // Next: {Alice 25}
    // Next: {Bob 30}
    // Next: {Charlie 35}
    // Completed

    Fresh slice for each subscriber

    obs := ro.Defer(func() ro.Observable[int] {
    return ro.FromSlice([]int{
    rand.Intn(100),
    rand.Intn(100),
    })
    })

    sub1 := obs.Subscribe(ro.PrintObserver[int]())
    sub2 := obs.Subscribe(ro.PrintObserver[int]())

    defer sub1.Unsubscribe()
    defer sub2.Unsubscribe()

    // Each subscriber gets potentially different random numbers
    Prototype:
    func FromSlice[T any](slice []T)
  • Creates an Observable that emits a single random integer between 0 (inclusive) and n (exclusive).

    obs := ro.RandIntN(100)

    sub := obs.Subscribe(ro.PrintObserver[int64]())
    defer sub.Unsubscribe()

    // Next: <random number between 0-99>
    // Completed

    Multiple random numbers

    obs := ro.Pipe(
    ro.RepeatWithInterval(ro.RandIntN(1000), 100*time.Millisecond),
    ro.Take[int64](5),
    )

    sub := obs.Subscribe(ro.PrintObserver[int64]())
    time.Sleep(1000 * time.Millisecond)
    sub.Unsubscribe()

    // Next: <random number 0-999> (immediately)
    // Next: <random number 0-999> (after 100ms)
    // Next: <random number 0-999> (after 200ms)
    // Next: <random number 0-999> (after 300ms)
    // Next: <random number 0-999> (after 400ms)
    // Completed

    With different ranges

    // D6 dice roll
    diceObs := ro.Pipe(
    ro.RandIntN(6), // 0-5, so add 1 for 1-6
    ro.Map(func(n int64) int64 { return n + 1 }),
    )

    sub := diceObs.Subscribe(ro.PrintObserver[int64]())
    defer sub.Unsubscribe()

    // Next: <random number 1-6>
    // Completed

    For simulation or testing

    // Simulate random delays
    delays := ro.Pipe(
    ro.RandIntN(5000), // 0-4999ms
    ro.Map(func(ms int64) time.Duration {
    return time.Duration(ms) * time.Millisecond
    }),
    )

    sub := delays.Subscribe(ro.PrintObserver[time.Duration]())
    defer sub.Unsubscribe()

    // Next: <random duration between 0-4999ms>
    // Completed

    With error probability

    shouldError := ro.Pipe(
    ro.RandIntN(10), // 0-9
    ro.Map(func(n int64) bool {
    return n == 0 // 10% chance of error
    }),
    )

    sub := shouldError.Subscribe(ro.PrintObserver[bool]())
    defer sub.Unsubscribe()

    // Next: true (10% chance) or false (90% chance)
    // Completed
    Similar:
    Prototype:
    func RandIntN(n int64)
  • Creates an Observable that emits a single random float64 value between 0.0 (inclusive) and 1.0 (exclusive).

    obs := ro.RandFloat64()

    sub := obs.Subscribe(ro.PrintObserver[float64]())
    defer sub.Unsubscribe()

    // Next: <random float between 0.0-1.0>
    // Completed

    Multiple random values

    obs := ro.Pipe(
    ro.RepeatWithInterval(ro.RandFloat64(), 100*time.Millisecond),
    ro.Take[float64](5),
    )

    sub := obs.Subscribe(ro.PrintObserver[float64]())
    time.Sleep(1000 * time.Millisecond)
    sub.Unsubscribe()

    // Next: <random float 0.0-1.0> (immediately)
    // Next: <random float 0.0-1.0> (after 100ms)
    // Next: <random float 0.0-1.0> (after 200ms)
    // Next: <random float 0.0-1.0> (after 300ms)
    // Next: <random float 0.0-1.0> (after 400ms)
    // Completed

    For probability calculations

    // 75% chance of success
    successChance := ro.Pipe(
    ro.RandFloat64(),
    ro.Map(func(f float64) bool {
    return f < 0.75
    }),
    )

    sub := successChance.Subscribe(ro.PrintObserver[bool]())
    defer sub.Unsubscribe()

    // Next: true (75% chance) or false (25% chance)
    // Completed

    Custom range mapping

    // Random temperature between 15.0 and 25.0 degrees
    temperature := ro.Pipe(
    ro.RandFloat64(),
    ro.Map(func(f float64) float64 {
    return 15.0 + (f * 10.0) // Map 0-1 to 15-25
    }),
    )

    sub := temperature.Subscribe(ro.PrintObserver[float64]())
    defer sub.Unsubscribe()

    // Next: <random float between 15.0-25.0>
    // Completed

    For testing variations

    // Simulate network latency with random jitter
    baseLatency := 100 * time.Millisecond
    jitterRange := 50 * time.Millisecond

    latency := ro.Pipe(
    ro.RandFloat64(),
    ro.Map(func(f float64) time.Duration {
    jitter := time.Duration(f * float64(jitterRange))
    return baseLatency + jitter
    }),
    )

    sub := latency.Subscribe(ro.PrintObserver[time.Duration]())
    defer sub.Unsubscribe()

    // Next: <duration between 100-150ms>
    // Completed
    Similar:
    Prototype:
    func RandFloat64()