This documentation is still new and evolving. If you spot any mistakes, unclear explanations, or missing details, please open an issue.
Your feedback helps us improve!
Creation operatorsโ
This page lists all creation operators, available in the core package of ro.
Justโ
Creates an Observable that emits a specific sequence of values.
obs := ro.Just(1, 2, 3, 4, 5)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Next: 1
// Next: 2
// Next: 3
// Next: 4
// Next: 5
// CompletedJust with no values
obs := ro.Just[int]() // Equivalent to ro.Empty[int]()
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// CompletedJust with complex types
type Person struct {
Name string
Age int
}
obs := ro.Just(
Person{"Alice", 25},
Person{"Bob", 30},
Person{"Charlie", 35},
)
sub := obs.Subscribe(ro.PrintObserver[Person]())
defer sub.Unsubscribe()
// Next: {Alice 25}
// Next: {Bob 30}
// Next: {Charlie 35}
// CompletedVariant:Prototypes:func Just[T any](values ...T)
func Of[T any](values ...T)Rangeโ
Creates an Observable that emits a sequence of numbers within a specified range.
obs := ro.Range(1, 5)
sub := obs.Subscribe(ro.PrintObserver[int64]())
defer sub.Unsubscribe()
// Next: 1
// Next: 2
// Next: 3
// Next: 4
// Next: 5
// CompletedNegative range
obs := ro.Range(5, 1)
sub := obs.Subscribe(ro.PrintObserver[int64]())
defer sub.Unsubscribe()
// Next: 5
// Next: 4
// Next: 3
// Next: 2
// Next: 1
// CompletedSingle value range
obs := ro.Range(5, 5)
sub := obs.Subscribe(ro.PrintObserver[int64]())
defer sub.Unsubscribe()
// Next: 5
// CompletedPrototype:func Range(start int64, end int64)
RangeWithIntervalโ
Creates an Observable that emits a sequence of numbers within a specified range with a time interval between emissions.
obs := ro.RangeWithInterval(1, 5, 100*time.Millisecond)
sub := obs.Subscribe(ro.PrintObserver[int64]())
defer sub.Unsubscribe()
// Next: 1 (after 0ms)
// Next: 2 (after 100ms)
// Next: 3 (after 200ms)
// Next: 4 (after 300ms)
// Next: 5 (after 400ms)
// CompletedNegative range with interval
obs := ro.RangeWithInterval(5, 1, 100*time.Millisecond)
sub := obs.Subscribe(ro.PrintObserver[int64]())
defer sub.Unsubscribe()
// Next: 5 (after 0ms)
// Next: 4 (after 100ms)
// Next: 3 (after 200ms)
// Next: 2 (after 300ms)
// Next: 1 (after 400ms)
// CompletedPrototype:func RangeWithInterval(start int64, end int64, interval time.Duration)
RangeWithStepโ
Creates an Observable that emits a sequence of numbers within a specified range with a custom step.
obs := ro.RangeWithStep(1, 10, 2)
sub := obs.Subscribe(ro.PrintObserver[float64]())
defer sub.Unsubscribe()
// Next: 1
// Next: 3
// Next: 5
// Next: 7
// Next: 9
// CompletedFractional range
obs := ro.RangeWithStep(0.5, 2.5, 0.5)
sub := obs.Subscribe(ro.PrintObserver[float64]())
defer sub.Unsubscribe()
// Next: 0.5
// Next: 1
// Next: 1.5
// Next: 2
// Next: 2.5
// CompletedNegative step
obs := ro.RangeWithStep(10, 1, -2)
sub := obs.Subscribe(ro.PrintObserver[float64]())
defer sub.Unsubscribe()
// Next: 10
// Next: 8
// Next: 6
// Next: 4
// Next: 2
// CompletedPrototype:func RangeWithStep(start float64, end float64, step float64)
RangeWithStepAndIntervalโ
Creates an Observable that emits a sequence of numbers within a specified range with a custom step and time interval between emissions.
obs := ro.RangeWithStepAndInterval(1, 10, 2, 100*time.Millisecond)
sub := obs.Subscribe(ro.PrintObserver[float64]())
defer sub.Unsubscribe()
// Next: 1 (after 0ms)
// Next: 3 (after 100ms)
// Next: 5 (after 200ms)
// Next: 7 (after 300ms)
// Next: 9 (after 400ms)
// CompletedFractional range with interval
obs := ro.RangeWithStepAndInterval(0.5, 2.5, 0.5, 100*time.Millisecond)
sub := obs.Subscribe(ro.PrintObserver[float64]())
defer sub.Unsubscribe()
// Next: 0.5 (after 0ms)
// Next: 1 (after 100ms)
// Next: 1.5 (after 200ms)
// Next: 2 (after 300ms)
// Next: 2.5 (after 400ms)
// CompletedNegative step with interval
obs := ro.RangeWithStepAndInterval(10, 1, -2, 100*time.Millisecond)
sub := obs.Subscribe(ro.PrintObserver[float64]())
defer sub.Unsubscribe()
// Next: 10 (after 0ms)
// Next: 8 (after 100ms)
// Next: 6 (after 200ms)
// Next: 4 (after 300ms)
// Next: 2 (after 400ms)
// CompletedPrototype:func RangeWithStepAndInterval(start float64, end float64, step float64, interval time.Duration)
Intervalโ
Creates an Observable that emits sequential numbers every specified interval of time.
obs := ro.Interval(100 * time.Millisecond)
sub := obs.Subscribe(ro.PrintObserver[int64]())
time.Sleep(550 * time.Millisecond)
sub.Unsubscribe()
// Next: 0 (after 100ms)
// Next: 1 (after 200ms)
// Next: 2 (after 300ms)
// Next: 3 (after 400ms)
// Next: 4 (after 500ms)Using Interval for periodic operations
obs := ro.Interval(1 * time.Second)
sub := obs.Subscribe(ro.PrintObserver[int64]())
time.Sleep(3500 * time.Millisecond)
sub.Unsubscribe()
// Next: 0 (after 1 second)
// Next: 1 (after 2 seconds)
// Next: 2 (after 3 seconds)Practical example: Heartbeat simulation
ticker := ro.Interval(500 * time.Millisecond)
heartbeat := ro.Pipe(ticker, ro.Map(func(i int64) string {
return "โค๏ธ"
}))
sub := heartbeat.Subscribe(ro.PrintObserver[string]())
time.Sleep(2200 * time.Millisecond)
sub.Unsubscribe()
// Next: "โค๏ธ" (after 500ms)
// Next: "โค๏ธ" (after 1000ms)
// Next: "โค๏ธ" (after 1500ms)
// Next: "โค๏ธ" (after 2000ms)With Take for limited emissions
obs := ro.Pipe(
ro.Interval(100*time.Millisecond),
ro.Take[int64](5),
)
sub := obs.Subscribe(ro.PrintObserver[int64]())
defer sub.Unsubscribe()
// Next: 0
// Next: 1
// Next: 2
// Next: 3
// Next: 4
// CompletedEdge case: Very short interval
obs := ro.Interval(1 * time.Millisecond)
sub := obs.Subscribe(ro.PrintObserver[int64]())
time.Sleep(10 * time.Millisecond)
sub.Unsubscribe()
// Will emit rapidly based on system scheduling
// Next: 0, 1, 2, 3, 4, 5, 6, 7, 8, 9...Similar:Prototype:func Interval(interval time.Duration)
IntervalWithInitialโ
Creates an Observable that emits sequential numbers starting after an initial delay, then continuing at specified intervals.
obs := ro.IntervalWithInitial(200*time.Millisecond, 100*time.Millisecond)
sub := obs.Subscribe(ro.PrintObserver[int64]())
time.Sleep(650 * time.Millisecond)
sub.Unsubscribe()
// Next: 0 (after 200ms - initial delay)
// Next: 1 (after 300ms)
// Next: 2 (after 400ms)
// Next: 3 (after 500ms)
// Next: 4 (after 600ms)Long initial delay with short interval
obs := ro.IntervalWithInitial(1*time.Second, 200*time.Millisecond)
sub := obs.Subscribe(ro.PrintObserver[int64]())
time.Sleep(2000 * time.Millisecond)
sub.Unsubscribe()
// Next: 0 (after 1000ms - initial delay)
// Next: 1 (after 1200ms)
// Next: 2 (after 1400ms)
// Next: 3 (after 1600ms)
// Next: 4 (after 1800ms)
// Next: 5 (after 2000ms)Short initial delay with long interval
obs := ro.IntervalWithInitial(100*time.Millisecond, 1*time.Second)
sub := obs.Subscribe(ro.PrintObserver[int64]())
time.Sleep(2500 * time.Millisecond)
sub.Unsubscribe()
// Next: 0 (after 100ms - initial delay)
// Next: 1 (after 1100ms)
// Next: 2 (after 2100ms)Practical example: Delayed heartbeat
heartbeat := ro.IntervalWithInitial(1*time.Second, 500*time.Millisecond)
sub := heartbeat.Subscribe(ro.PrintObserver[int64]())
time.Sleep(3000 * time.Millisecond)
sub.Unsubscribe()
// Next: 0 (after 1000ms - initial delay before heartbeat starts)
// Next: 1 (after 1500ms)
// Next: 2 (after 2000ms)
// Next: 3 (after 2500ms)
// Next: 4 (after 3000ms)With Take for limited emissions
obs := ro.Pipe(
ro.IntervalWithInitial(200*time.Millisecond, 100*time.Millisecond),
ro.Take[int64](5),
)
sub := obs.Subscribe(ro.PrintObserver[int64]())
defer sub.Unsubscribe()
// Next: 0 (after 200ms)
// Next: 1 (after 300ms)
// Next: 2 (after 400ms)
// Next: 3 (after 500ms)
// Next: 4 (after 600ms)
// CompletedEdge case: Zero initial delay
obs := ro.IntervalWithInitial(0, 100*time.Millisecond)
sub := obs.Subscribe(ro.PrintObserver[int64]())
time.Sleep(550 * time.Millisecond)
sub.Unsubscribe()
// Behaves like regular Interval
// Next: 0 (immediately)
// Next: 1 (after 100ms)
// Next: 2 (after 200ms)
// Next: 3 (after 300ms)
// Next: 4 (after 400ms)
// Next: 5 (after 500ms)Edge case: Very long initial delay
obs := ro.IntervalWithInitial(5*time.Second, 1*time.Second)
sub := obs.Subscribe(ro.PrintObserver[int64]())
time.Sleep(8000 * time.Millisecond)
sub.Unsubscribe()
// Next: 0 (after 5000ms - long initial delay)
// Next: 1 (after 6000ms)
// Next: 2 (after 7000ms)Prototype:func IntervalWithInitial(initial time.Duration, interval time.Duration)
Timerโ
Creates an Observable that emits a single value (0) after a specified delay, then completes.
obs := ro.Timer(1 * time.Second)
sub := obs.Subscribe(ro.PrintObserver[int64]())
time.Sleep(1500 * time.Millisecond)
sub.Unsubscribe()
// Next: 0 (after 1000ms)
// CompletedShort delay
obs := ro.Timer(100 * time.Millisecond)
sub := obs.Subscribe(ro.PrintObserver[int64]())
time.Sleep(200 * time.Millisecond)
sub.Unsubscribe()
// Next: 0 (after 100ms)
// CompletedWith other operators
obs := ro.Pipe(
ro.Timer(500*time.Millisecond),
ro.Map(func(_ int64) string {
return "Timer fired!"
}),
)
sub := obs.Subscribe(ro.PrintObserver[string]())
time.Sleep(1000 * time.Millisecond)
sub.Unsubscribe()
// Next: "Timer fired!" (after 500ms)
// CompletedTimeout simulation
timeout := ro.Timer(2 * time.Second)
dataSource := ro.Just("data")
// Race between timeout and data
raceResult := ro.Race(timeout, dataSource)
sub := raceResult.Subscribe(ro.PrintObserver[any]())
defer sub.Unsubscribe()
// If data emits before timeout: Next: "data", Completed
// If timeout fires first: Next: 0, CompletedSimilar:Prototype:func Timer(d time.Duration)
Emptyโ
Creates an Observable that emits no items and immediately completes. This creation operator is very useful for unit tests.
obs := ro.Empty[int]()
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// No items emitted
// CompletedAs a source for other operators
obs := ro.Pipe(
ro.Empty[int](),
ro.DefaultIfEmpty(-1),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Next: -1 (default value since source is empty)
// CompletedFor unit tests
// pipeline.go
var pipeline ro.PipeOp(
ro.Map(func(x int) int {
return x*2
})
ro.Catch(func(err error) ro.Observable[int] {
return ro.Just(42)
}),
)
// pipeline_test.go
func TestMyPipeline(t *testing.T) {
// testing empty source
obs := pipeline(ro.Empty[int]())
values, err := ro.Collect(obs)
defer sub.Unsubscribe()
t.Assert(...)
// testing broken source
obs := pipeline(ro.Throw[int](errors.New("something went wrong")))
values, err = ro.Collect(obs)
defer sub.Unsubscribe()
t.Assert(...)
// testing inactive stream
obs := pipeline(ro.Never[int]())
values, err = ro.Collect(obs)
defer sub.Unsubscribe()
t.Assert(...)
}Prototype:func Empty[T any]()
Neverโ
Creates an Observable that never emits any items and never completes. This creation operator is very useful for unit tests.
obs := ro.Never[int]()
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// No items ever emitted
// Never completesWith timeout for testing
obs := ro.Never[string]()
sub := obs.Subscribe(ro.PrintObserver[string]())
time.Sleep(100 * time.Millisecond)
sub.Unsubscribe()
// No items emitted during sleep
// Never would have completed if we waited foreverContext timeout:
obs := ro.Never[string]()
ctx := context.WithTimeout(100 * time.Millisecond)
sub := obs.SubscribeWithContext(ctx, ro.PrintObserver[string]())
defer sub.Unsubscribe()
// No items emitted during sleep
// Never would have completed if we waited foreverFor long-running operations
// Simulate a long-running operation that may never complete
obs := ro.Never[bool]()
sub := obs.Subscribe(ro.PrintObserver[bool]())
// In a real app, you might want to add a timeout
// time.Sleep(5 * time.Second)
// sub.Unsubscribe()For unit tests
// pipeline.go
var pipeline ro.PipeOp(
ro.Map(func(x int) int {
return x*2
})
ro.Catch(func(err error) ro.Observable[int] {
return ro.Just(42)
}),
)
// pipeline_test.go
func TestMyPipeline(t *testing.T) {
// testing empty source
obs := pipeline(ro.Empty[int]())
values, err := ro.Collect(obs)
defer sub.Unsubscribe()
t.Assert(...)
// testing broken source
obs := pipeline(ro.Throw[int](errors.New("something went wrong")))
values, err = ro.Collect(obs)
defer sub.Unsubscribe()
t.Assert(...)
// testing inactive stream
obs := pipeline(ro.Never[int]())
values, err = ro.Collect(obs)
defer sub.Unsubscribe()
t.Assert(...)
}Prototype:func Never[T any]()
Throwโ
Creates an Observable that emits no items and immediately terminates with an error. This creation operator is very useful for unit tests.
obs := ro.Throw[int](errors.New("something went wrong"))
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Error: something went wrongWith custom error types
type CustomError struct {
Code int
Message string
}
func (e *CustomError) Error() string {
return fmt.Sprintf("Error %d: %s", e.Code, e.Message)
}
obs := ro.Throw[string](&CustomError{Code: 404, Message: "Not found"})
sub := obs.Subscribe(ro.PrintObserver[string]())
defer sub.Unsubscribe()
// Error: Error 404: Not foundFor unit tests
// pipeline.go
var pipeline ro.PipeOp(
ro.Map(func(x int) int {
return x*2
})
ro.Catch(func(err error) ro.Observable[int] {
return ro.Just(42)
}),
)
// pipeline_test.go
func TestMyPipeline(t *testing.T) {
// testing empty source
obs := pipeline(ro.Empty[int]())
values, err := ro.Collect(obs)
defer sub.Unsubscribe()
t.Assert(...)
// testing broken source
obs := pipeline(ro.Throw[int](errors.New("something went wrong")))
values, err = ro.Collect(obs)
defer sub.Unsubscribe()
t.Assert(...)
// testing inactive stream
obs := pipeline(ro.Never[int]())
values, err = ro.Collect(obs)
defer sub.Unsubscribe()
t.Assert(...)
}With Retry
obs := ro.Pipe(
ro.Throw[int](errors.New("network error")),
ro.Retry(3),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
time.Sleep(100 * time.Millisecond) // Give retry attempts time
sub.Unsubscribe()
// Will attempt retry 3 times before propagating the error
// Error: network errorWith error handling
obs := ro.Pipe(
ro.Throw[int](errors.New("original error")),
ro.Catch(func(err error) ro.Observable[int] {
return ro.Just(42) // fallback value
}),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Next: 42 (fallback from Catch)
// CompletedPrototype:func Throw[T any](err error)
Deferโ
Creates an Observable that calls the specified factory function for each subscriber to create a new Observable.
obs := ro.Defer(func() ro.Observable[int] {
return ro.Just(1, 2, 3)
})
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Next: 1
// Next: 2
// Next: 3
// CompletedFresh observable for each subscriber
obs := ro.Defer(func() ro.Observable[int] {
counter := 0
return ro.Pipe(
ro.Interval(100*time.Millisecond),
ro.Take[int64](3),
ro.Map(func(i int64) int {
counter++
return counter
}),
)
})
// Each subscriber gets a fresh observable
sub1 := obs.Subscribe(ro.PrintObserver[int]())
sub2 := obs.Subscribe(ro.PrintObserver[int]())
time.Sleep(500 * time.Millisecond)
sub1.Unsubscribe()
sub2.Unsubscribe()
// Next: 1
// Next: 2
// Next: 3 (each subscriber gets their own counter)
// CompletedWith dynamic content
obs := ro.Defer(func() ro.Observable[string] {
timestamp := time.Now().Format("15:04:05")
return ro.Just(fmt.Sprintf("Created at %s", timestamp))
})
sub1 := obs.Subscribe(ro.PrintObserver[string]())
time.Sleep(100 * time.Millisecond)
sub2 := obs.Subscribe(ro.PrintObserver[string]())
defer sub1.Unsubscribe()
defer sub2.Unsubscribe()
// Next: "Created at HH:MM:SS" (different times for sub1 and sub2)For resource cleanup
obs := ro.Defer(func() ro.Observable[string] {
file, err := os.Open("example.txt")
if err != nil {
return ro.Throw[string](err)
}
// Return observable that cleans up when done
return ro.Pipe(
ro.FromSlice([]string{"line1", "line2", "line3"}),
ro.Finally(func() {
file.Close()
}),
)
})
sub := obs.Subscribe(ro.PrintObserver[string]())
defer sub.Unsubscribe()
// Next: line1
// Next: line2
// Next: line3
// Completed (file closed automatically)Conditional observable creation
obs := ro.Defer(func() ro.Observable[int] {
if time.Now().Hour() < 12 {
return ro.Just(1) // Morning
} else if time.Now().Hour() < 18 {
return ro.Just(2) // Afternoon
} else {
return ro.Just(3) // Evening
}
})
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Next: 1, 2, or 3 depending on current time
// CompletedPrototype:func Defer[T any](factory func() Observable[T])
Futureโ
Creates an Observable from a promise-style function that can resolve with a value or reject with an error.
obs := ro.Future[string](func(resolve func(string), reject func(error)) {
go func() {
time.Sleep(100 * time.Millisecond)
resolve("Hello from future!")
}()
})
sub := obs.Subscribe(ro.PrintObserver[string]())
time.Sleep(200 * time.Millisecond)
sub.Unsubscribe()
// Next: "Hello from future!" (after 100ms)
// CompletedWith error rejection
obs := ro.Future[int](func(resolve func(int), reject func(error)) {
go func() {
time.Sleep(50 * time.Millisecond)
reject(errors.New("something went wrong"))
}()
})
sub := obs.Subscribe(ro.PrintObserver[int]())
time.Sleep(100 * time.Millisecond)
sub.Unsubscribe()
// Error: something went wrongHTTP request simulation
obs := ro.Future[string](func(resolve func(string), reject func(error)) {
go func() {
// Simulate HTTP request
time.Sleep(200 * time.Millisecond)
// Simulate response
resolve("{\"status\": \"ok\", \"data\": [1, 2, 3]}")
}()
})
sub := obs.Subscribe(ro.PrintObserver[string]())
time.Sleep(300 * time.Millisecond)
sub.Unsubscribe()
// Next: "{\"status\": \"ok\", \"data\": [1, 2, 3]}"
// CompletedWith conditional resolution
obs := ro.Future[int](func(resolve func(int), reject func(error)) {
go func() {
// Simulate random success/failure
time.Sleep(100 * time.Millisecond)
if rand.Intn(2) == 0 {
resolve(42)
} else {
reject(errors.New("random failure"))
}
}()
})
sub := obs.Subscribe(ro.PrintObserver[int]())
time.Sleep(200 * time.Millisecond)
sub.Unsubscribe()
// Either:
// Next: 42
// Completed
// Or:
// Error: random failureDatabase query simulation
obs := ro.Future[[]string](func(resolve func([]string), reject func(error)) {
go func() {
// Simulate database query
time.Sleep(150 * time.Millisecond)
// Simulate query results
resolve([]string{"Alice", "Bob", "Charlie"})
}()
})
sub := obs.Subscribe(ro.PrintObserver[[]string]())
time.Sleep(300 * time.Millisecond)
sub.Unsubscribe()
// Next: ["Alice", "Bob", "Charlie"]
// CompletedPrototype:func Future[T any](promise func(resolve func(T), reject func(error)))
Startโ
Creates an Observable that emits the result of an action function for each subscriber, running the action synchronously.
obs := ro.Start(func() int {
// perform work such as HTTP request, database query...
return 42
})
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Next: 42
// CompletedWith complex computation
obs := ro.Start(func() string {
time.Sleep(50 * time.Millisecond) // Simulate work
return fmt.Sprintf("Result: %d", 21*2)
})
sub := obs.Subscribe(ro.PrintObserver[string]())
defer sub.Unsubscribe()
// Next: "Result: 42" (after ~50ms)
// CompletedFresh execution for each subscriber
obs := ro.Start(func() int {
return rand.Intn(100) // Different random number each time
})
sub1 := obs.Subscribe(ro.PrintObserver[int]())
sub2 := obs.Subscribe(ro.PrintObserver[int]())
defer sub1.Unsubscribe()
defer sub2.Unsubscribe()
// Each subscriber gets a fresh execution
// sub1 might get: 73
// sub2 might get: 15With error handling
obs := ro.Start(func() string {
file, err := os.Open("nonexistent.txt")
if err != nil {
panic(err) // This will cause the observable to error
}
defer file.Close()
content, _ := io.ReadAll(file)
return string(content)
})
sub := obs.Subscribe(ro.PrintObserver[string]())
defer sub.Unsubscribe()
// Error: open nonexistent.txt: no such file or directoryFor expensive calculations
obs := ro.Start(func() []int {
// Simulate expensive calculation
result := make([]int, 0)
for i := 0; i < 1000; i++ {
result = append(result, i*i)
}
return result
})
sub := obs.Subscribe(ro.PrintObserver[[]int]())
defer sub.Unsubscribe()
// Next: [0, 1, 4, 9, 16, ..., 998001] (after calculation completes)
// CompletedWith external dependencies
obs := ro.Start(func() time.Time {
return time.Now() // Current time at execution
})
sub1 := obs.Subscribe(ro.PrintObserver[time.Time]())
time.Sleep(10 * time.Millisecond)
sub2 := obs.Subscribe(ro.PrintObserver[time.Time]())
defer sub1.Unsubscribe()
defer sub2.Unsubscribe()
// Each subscriber gets the time when their subscription started
// Different times for sub1 and sub2Prototype:func Start[T any](action func() T)
Repeatโ
Creates an Observable that repeats the source Observable sequence when it completes.
source := ro.Just(1, 2, 3)
obs := ro.Repeat(source)
sub := obs.Subscribe(ro.PrintObserver[int]())
time.Sleep(100 * time.Millisecond)
sub.Unsubscribe()
// Will repeat indefinitely:
// Next: 1, 2, 3, 1, 2, 3, 1, 2, 3, ...With Take for limited repetitions
source := ro.Just(1, 2, 3)
obs := ro.Pipe(
ro.Repeat(source),
ro.Take[int](8),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Next: 1, 2, 3, 1, 2, 3, 1, 2
// Completed (after taking 8 items)Repeat with interval
source := ro.Pipe(
ro.Just("tick"),
ro.RepeatWithInterval(source, 1*time.Second),
)
sub := obs.Subscribe(ro.PrintObserver[string]())
time.Sleep(3500 * time.Millisecond)
sub.Unsubscribe()
// Next: "tick" (immediately)
// Next: "tick" (after 1 second)
// Next: "tick" (after 2 seconds)
// Next: "tick" (after 3 seconds)Repeat complex sequence
obs := ro.Pipe(
ro.Interval(100*time.Millisecond),
ro.Take[int64](3), // 0, 1, 2
ro.Repeat(source),
)
sub := obs.Subscribe(ro.PrintObserver[int64]())
time.Sleep(1000 * time.Millisecond)
sub.Unsubscribe()
// Will repeat the sequence 0, 1, 2 indefinitely
// 0, 1, 2, 0, 1, 2, 0, 1, 2, ...Repeat with error handling
source := ro.Just(1, 2)
obs := ro.Pipe(
ro.Repeat(source),
ro.Take[int](5),
ro.Map(func(i int) int {
if i == 3 {
panic("error at 3")
}
return i
}),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
time.Sleep(100 * time.Millisecond)
sub.Unsubscribe()
// Next: 1, 2, 1, 2
// Error: error at 3Similar:Prototype:func Repeat[T any](source Observable[T])
RepeatWithIntervalโ
Creates an Observable that repeats the source Observable sequence after a specified interval when it completes.
source := ro.Just("tick")
obs := ro.RepeatWithInterval(source, 1*time.Second)
sub := obs.Subscribe(ro.PrintObserver[string]())
time.Sleep(3500 * time.Millisecond)
sub.Unsubscribe()
// Next: "tick" (immediately)
// Next: "tick" (after 1 second)
// Next: "tick" (after 2 seconds)
// Next: "tick" (after 3 seconds)With Take for limited repetitions
source := ro.Just(1, 2, 3)
obs := ro.Pipe(
RepeatWithInterval(source, 500*time.Millisecond),
ro.Take[int](10),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
time.Sleep(3000 * time.Millisecond)
sub.Unsubscribe()
// Next: 1, 2, 3 (immediately)
// Next: 1, 2, 3 (after 500ms)
// Next: 1, 2, 3 (after 1000ms)
// Next: 1 (after 1500ms)
// Completed (after taking 10 items)With complex sequences
source := ro.Pipe(
ro.Just("A", "B", "C"),
)
obs := ro.RepeatWithInterval(source, 800*time.Millisecond)
sub := obs.Subscribe(ro.PrintObserver[string]())
time.Sleep(3000 * time.Millisecond)
sub.Unsubscribe()
// Next: A, B, C (immediately)
// Next: A, B, C (after 800ms)
// Next: A, B, C (after 1600ms)
// Next: A, B, C (after 2400ms)For periodic polling
getTime := ro.Start(func() time.Time { return time.Now() })
obs := ro.RepeatWithInterval(getTime, 1*time.Second)
sub := obs.Subscribe(ro.PrintObserver[time.Time]())
time.Sleep(3500 * time.Millisecond)
sub.Unsubscribe()
// Next: <current time> (immediately)
// Next: <current time + 1s> (after 1 second)
// Next: <current time + 2s> (after 2 seconds)
// Next: <current time + 3s> (after 3 seconds)With error handling
source := ro.Just(1, 2)
obs := ro.RepeatWithInterval(source, 500*time.Millisecond)
sub := obs.Subscribe(ro.PrintObserver[int]())
time.Sleep(2000 * time.Millisecond)
sub.Unsubscribe()
// Will repeat successfully:
// Next: 1, 2 (immediately)
// Next: 1, 2 (after 500ms)
// Next: 1, 2 (after 1000ms)
// Next: 1, 2 (after 1500ms)Similar:Prototype:func RepeatWithInterval[T any](source Observable[T], interval time.Duration)
FromChannelโ
Creates an Observable that emits items from a channel until the channel is closed.
ch := make(chan string)
go func() {
ch <- "hello"
ch <- "world"
close(ch)
}()
obs := ro.FromChannel(ch)
sub := obs.Subscribe(ro.PrintObserver[string]())
defer sub.Unsubscribe()
// Next: "hello"
// Next: "world"
// CompletedWith buffered channel
ch := make(chan int, 3)
ch <- 1
ch <- 2
ch <- 3
close(ch)
obs := ro.FromChannel(ch)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Next: 1
// Next: 2
// Next: 3
// CompletedWith concurrent writes
ch := make(chan int)
go func() {
for i := 1; i <= 5; i++ {
ch <- i
time.Sleep(100 * time.Millisecond)
}
close(ch)
}()
obs := ro.FromChannel(ch)
sub := obs.Subscribe(ro.PrintObserver[int]())
time.Sleep(800 * time.Millisecond)
sub.Unsubscribe()
// Next: 1 (after ~100ms)
// Next: 2 (after ~200ms)
// Next: 3 (after ~300ms)
// Next: 4 (after ~400ms)
// Next: 5 (after ~500ms)
// CompletedWith multiple subscribers
ch := make(chan string)
go func() {
ch <- "shared"
ch <- "data"
close(ch)
}()
obs := ro.FromChannel(ch)
sub1 := obs.Subscribe(ro.PrintObserver[string]())
sub2 := obs.Subscribe(ro.PrintObserver[string]())
defer sub1.Unsubscribe()
defer sub2.Unsubscribe()
// Both subscribers compete for the same channel values
// One might get both values, the other gets noneWith error channel pattern
type Result struct {
Value string
Err error
}
ch := make(chan Result)
go func() {
ch <- Result{Value: "success"}
ch <- Result{Err: errors.New("failed")}
close(ch)
}()
obs := ro.Pipe(
ro.FromChannel(ch),
ro.Map(func(r Result) (string, error) {
if r.Err != nil {
return "", r.Err
}
return r.Value, nil
}),
)
sub := obs.Subscribe(ro.PrintObserver[string]())
defer sub.Unsubscribe()
// Next: "success"
// Error: failedSimilar:Prototype:func FromChannel[T any](ch <-chan T)
FromSliceโ
Creates an Observable that emits each item from a slice, then completes.
data := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
obs := ro.Pipe(
ro.FromSlice(data),
ro.Filter(func(n int) bool { return n%2 == 0 }),
ro.Map(func(n int) int { return n * n }),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Next: 4
// Next: 16
// Next: 36
// Next: 64
// Next: 100
// CompletedWith complex types
type Person struct {
Name string
Age int
}
people := []Person{
{"Alice", 25},
{"Bob", 30},
{"Charlie", 35},
}
obs := ro.FromSlice(people)
sub := obs.Subscribe(ro.PrintObserver[Person]())
defer sub.Unsubscribe()
// Next: {Alice 25}
// Next: {Bob 30}
// Next: {Charlie 35}
// CompletedFresh slice for each subscriber
obs := ro.Defer(func() ro.Observable[int] {
return ro.FromSlice([]int{
rand.Intn(100),
rand.Intn(100),
})
})
sub1 := obs.Subscribe(ro.PrintObserver[int]())
sub2 := obs.Subscribe(ro.PrintObserver[int]())
defer sub1.Unsubscribe()
defer sub2.Unsubscribe()
// Each subscriber gets potentially different random numbersSimilar:Prototype:func FromSlice[T any](slice []T)
RandIntNโ
Creates an Observable that emits a single random integer between 0 (inclusive) and n (exclusive).
obs := ro.RandIntN(100)
sub := obs.Subscribe(ro.PrintObserver[int64]())
defer sub.Unsubscribe()
// Next: <random number between 0-99>
// CompletedMultiple random numbers
obs := ro.Pipe(
ro.RandIntN(1000),
ro.RepeatWithInterval(ro.RandIntN(1000), 100*time.Millisecond),
ro.Take[int64](5),
)
sub := obs.Subscribe(ro.PrintObserver[int64]())
time.Sleep(1000 * time.Millisecond)
sub.Unsubscribe()
// Next: <random number 0-999> (immediately)
// Next: <random number 0-999> (after 100ms)
// Next: <random number 0-999> (after 200ms)
// Next: <random number 0-999> (after 300ms)
// Next: <random number 0-999> (after 400ms)
// CompletedWith different ranges
// D6 dice roll
diceObs := ro.Pipe(
ro.RandIntN(6), // 0-5, so add 1 for 1-6
ro.Map(func(n int64) int64 { return n + 1 }),
)
sub := diceObs.Subscribe(ro.PrintObserver[int64]())
defer sub.Unsubscribe()
// Next: <random number 1-6>
// CompletedFor simulation or testing
// Simulate random delays
delays := ro.Pipe(
ro.RandIntN(5000), // 0-4999ms
ro.Map(func(ms int64) time.Duration {
return time.Duration(ms) * time.Millisecond
}),
)
sub := delays.Subscribe(ro.PrintObserver[time.Duration]())
defer sub.Unsubscribe()
// Next: <random duration between 0-4999ms>
// CompletedWith error probability
shouldError := ro.Pipe(
ro.RandIntN(10), // 0-9
ro.Map(func(n int64) bool {
return n == 0 // 10% chance of error
}),
)
sub := shouldError.Subscribe(ro.PrintObserver[bool]())
defer sub.Unsubscribe()
// Next: true (10% chance) or false (90% chance)
// CompletedSimilar:Prototype:func RandIntN(n int64)
RandFloat64โ
Creates an Observable that emits a single random float64 value between 0.0 (inclusive) and 1.0 (exclusive).
obs := ro.RandFloat64()
sub := obs.Subscribe(ro.PrintObserver[float64]())
defer sub.Unsubscribe()
// Next: <random float between 0.0-1.0>
// CompletedMultiple random values
obs := ro.Pipe(
ro.RandFloat64(),
RepeatWithInterval(ro.RandFloat64(), 100*time.Millisecond),
ro.Take[float64](5),
)
sub := obs.Subscribe(ro.PrintObserver[float64]())
time.Sleep(1000 * time.Millisecond)
sub.Unsubscribe()
// Next: <random float 0.0-1.0> (immediately)
// Next: <random float 0.0-1.0> (after 100ms)
// Next: <random float 0.0-1.0> (after 200ms)
// Next: <random float 0.0-1.0> (after 300ms)
// Next: <random float 0.0-1.0> (after 400ms)
// CompletedFor probability calculations
// 75% chance of success
successChance := ro.Pipe(
ro.RandFloat64(),
ro.Map(func(f float64) bool {
return f < 0.75
}),
)
sub := successChance.Subscribe(ro.PrintObserver[bool]())
defer sub.Unsubscribe()
// Next: true (75% chance) or false (25% chance)
// CompletedCustom range mapping
// Random temperature between 15.0 and 25.0 degrees
temperature := ro.Pipe(
ro.RandFloat64(),
ro.Map(func(f float64) float64 {
return 15.0 + (f * 10.0) // Map 0-1 to 15-25
}),
)
sub := temperature.Subscribe(ro.PrintObserver[float64]())
defer sub.Unsubscribe()
// Next: <random float between 15.0-25.0>
// CompletedFor testing variations
// Simulate network latency with random jitter
baseLatency := 100 * time.Millisecond
jitterRange := 50 * time.Millisecond
latency := ro.Pipe(
ro.RandFloat64(),
ro.Map(func(f float64) time.Duration {
jitter := time.Duration(f * float64(jitterRange))
return baseLatency + jitter
}),
)
sub := latency.Subscribe(ro.PrintObserver[time.Duration]())
defer sub.Unsubscribe()
// Next: <duration between 100-150ms>
// CompletedSimilar:Prototype:func RandFloat64()