This documentation is still new and evolving. If you spot any mistakes, unclear explanations, or missing details, please open an issue.
Your feedback helps us improve!
Error handling operatorsโ
This page lists all error handling operations, available in the core package of ro.
Catchโ
Catches errors on the observable to be handled by returning a new observable.
obs := ro.Pipe(
ro.Just(1, 2, 3),
ro.MapErr(func(i int) (int, error) {
if i == 3 {
return 0, errors.New("number 3 is not allowed")
}
return i * 2, nil
}),
ro.Catch(func(err error) ro.Observable[int] {
fmt.Printf("Error: %v\n", err)
return ro.Just(99) // Fallback value
}),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Next: 2 (1*2)
// Next: 4 (2*2)
// Error: number 3 is not allowed
// Next: 99 (fallback value)
// CompletedWith retry logic
attempt := 0
obs := ro.Pipe(
ro.Defer(func() ro.Observable[int] {
attempt++
if attempt <= 2 {
return ro.Pipe(
ro.Just(1),
ro.Throw[int](errors.New("network error")),
)
}
return ro.Just(42)
}),
ro.Catch(func(err error) ro.Observable[int] {
fmt.Printf("Attempt %d failed: %v\n", attempt, err)
if attempt < 3 {
return ro.Empty[int]() // Stop this attempt, allow retry
}
return ro.Just(-1) // Final fallback
}),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Attempt 1 failed: network error
// Attempt 2 failed: network error
// Next: 42 (success on 3rd attempt)
// CompletedWith different error types
obs := ro.Pipe(
ro.Just("data1", "data2", "invalid"),
ro.MapErr(func(s string) (string, error) {
if s == "invalid" {
return "", errors.New("invalid data")
}
return strings.ToUpper(s), nil
}),
ro.Catch(func(err error) ro.Observable[string] {
if strings.Contains(err.Error(), "invalid") {
return ro.Just("DEFAULT") // Handle validation errors
}
return ro.Throw[string](err) // Re-throw other errors
}),
)
sub := obs.Subscribe(ro.PrintObserver[string]())
defer sub.Unsubscribe()
// Next: "DATA1"
// Next: "DATA2"
// Next: "DEFAULT"
// CompletedWith logging and fallback sequence
obs := ro.Pipe(
ro.Just(1, 2, 3, 4),
ro.MapErr(func(i int) (int, error) {
if i%2 == 0 {
return 0, fmt.Errorf("even number %d rejected", i)
}
return i, nil
}),
ro.Catch(func(err error) ro.Observable[int] {
log.Printf("Error caught: %v", err)
// Provide fallback sequence
return ro.FromSlice([]int{100, 200, 300})
}),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Next: 1
// Error caught: even number 2 rejected
// Next: 100
// Next: 200
// Next: 300
// CompletedPrototype:func Catch[T any](finally func(err error) Observable[T])
Retryโ
Retries the source observable sequence when it encounters an error. Retry uses infinite retries with default settings, while RetryWithConfig provides configurable retry behavior.
attempt := 0
obs := ro.Pipe(
ro.Defer(func() Observable[string] {
attempt++
if attempt < 3 {
return ro.Pipe(
ro.Just("data"),
ro.Throw[string](errors.New("temporary failure")),
)
}
return ro.Just("success!")
}),
ro.Retry[string](),
)
sub := obs.Subscribe(ro.PrintObserver[string]())
time.Sleep(100 * time.Millisecond) // Allow time for retries
sub.Unsubscribe()
// Next: "success!" (after 3 attempts)
// CompletedRetryWithConfig with limited retries
attempt := 0
obs := ro.Pipe(
ro.Defer(func() Observable[int] {
attempt++
if attempt == 1 {
return ro.Throw[int](errors.New("first attempt failed"))
}
return ro.Just(42)
}),
ro.RetryWithConfig[int](RetryConfig{
MaxRetries: 3,
Delay: 100 * time.Millisecond,
ResetOnSuccess: true,
}),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
time.Sleep(500 * time.Millisecond)
sub.Unsubscribe()
// Next: 42 (success on second attempt)
// CompletedWith exponential backoff
obs := ro.Pipe(
ro.Defer(func() Observable[string] {
return ro.Pipe(
ro.Just("api_data"),
ro.Throw[string](errors.New("rate limited")),
)
}),
ro.RetryWithConfig[string](RetryConfig{
MaxRetries: 5,
Delay: 1 * time.Second,
ResetOnSuccess: true,
}),
)
sub := obs.Subscribe(ro.PrintObserver[string]())
time.Sleep(6 * time.Second)
sub.Unsubscribe()
// Would retry up to 5 times with 1-second delays
// (assuming API continues to fail)With ResetOnSuccess behavior
successCount := 0
obs := ro.Pipe(
ro.Defer(func() Observable[int] {
successCount++
if successCount <= 2 {
return ro.Just(successCount)
}
return ro.Throw[int](errors.New("suddenly failed"))
}),
ro.RetryWithConfig[int](RetryConfig{
MaxRetries: 3,
Delay: 50 * time.Millisecond,
ResetOnSuccess: true, // Success resets retry counter
}),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
time.Sleep(1000 * time.Millisecond)
sub.Unsubscribe()
// Next: 1 (success, counter resets)
// Next: 2 (success, counter resets)
// Next: 3 (success, counter resets)
// (would retry 3 times after failure since counter reset after each success)Network request simulation
type Response struct {
Data string
Err error
}
simulateAPICall := func() Observable[Response] {
return ro.Defer(func() Observable[Response] {
// Simulate intermittent network failures
if rand.Intn(5) != 0 { // 80% failure rate
return ro.Just(Response{Err: errors.New("network timeout")})
}
return ro.Just(Response{Data: "api_response"})
})
}
obs := ro.Pipe(
simulateAPICall(),
ro.RetryWithConfig[Response](RetryConfig{
MaxRetries: 10,
Delay: 200 * time.Millisecond,
ResetOnSuccess: true,
}),
ro.Map(func(r Response) string {
if r.Err != nil {
return "error: " + r.Err.Error()
}
return "success: " + r.Data
}),
)
sub := obs.Subscribe(ro.PrintObserver[string]())
time.Sleep(3 * time.Second)
sub.Unsubscribe()
// Will keep retrying until successful or max retries reached
// Expected: "success: api_response" (eventually)Variant:Prototypes:func Retry[T any]()
func RetryWithConfig[T any](opts RetryConfig)ThrowIfEmptyโ
Throws an error if the source observable is empty, otherwise emits all items normally.
obs := ro.Pipe(
ro.Empty[int](),
ThrowIfro.Empty[int](func() error {
return errors.New("no data available")
}),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Error: no data availableWith data present
obs := ro.Pipe(
ro.Just(1, 2, 3),
ThrowIfro.Empty[int](func() error {
return errors.New("this won't be thrown")
}),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Next: 1
// Next: 2
// Next: 3
// Completed (no error thrown)With filtered data
obs := ro.Pipe(
ro.Just(1, 2, 3, 4, 5),
ro.Filter(func(i int) bool {
return i > 10 // No items match
}),
ThrowIfro.Empty[int](func() error {
return errors.New("no items found matching criteria")
}),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Error: no items found matching criteriaWith API response validation
type User struct {
ID int
Name string
}
fetchUsers := func() Observable[User] {
// Simulate empty API response
return ro.FromSlice([]User{})
}
obs := ro.Pipe(
fetchUsers(),
ThrowIfro.Empty[User](func() error {
return errors.New("no users found in database")
}),
)
sub := obs.Subscribe(ro.PrintObserver[User]())
defer sub.Unsubscribe()
// Error: no users found in databaseWith conditional throwing
shouldThrowError := true
obs := ro.Pipe(
ro.Empty[string](),
ThrowIfro.Empty[string](func() error {
if shouldThrowError {
return fmt.Errorf("empty sequence not allowed at %v", time.Now())
}
return nil // No error
}),
)
sub := obs.Subscribe(ro.PrintObserver[string]())
defer sub.Unsubscribe()
// Error: empty sequence not allowed at [current time]With retry mechanism
attempt := 0
getData := func() Observable[int] {
attempt++
if attempt < 3 {
return ro.Empty[int]() // Simulate empty response
}
return ro.Just(42) // Success on third attempt
}
obs := ro.Pipe(
ro.Defer(getData),
ThrowIfro.Empty[int](func() error {
return fmt.Errorf("attempt %d: no data available", attempt)
}),
ro.RetryWithConfig[int](RetryConfig{
MaxRetries: 5,
Delay: 100 * time.Millisecond,
}),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
time.Sleep(500 * time.Millisecond)
sub.Unsubscribe()
// Error: attempt 1: no data available
// Error: attempt 2: no data available
// Next: 42 (success on third attempt)
// CompletedPrototype:func ThrowIfro.Empty[T any](throw func() error)
OnErrorResumeNextWithโ
Begins emitting a second observable sequence if it encounters an error with the first observable.
primary := ro.Pipe(
ro.Just(1, 2, 3),
ro.MapErr(func(i int) (int, error) {
if i == 3 {
return 0, errors.New("error occurred")
}
return i, nil
}),
)
fallback := ro.Just(99, 100, 101)
obs := ro.Pipe(primary, ro.OnErrorResumeNextWith(fallback))
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Next: 1
// Next: 2
// Next: 99 (fallback starts)
// Next: 100
// Next: 101
// CompletedWith multiple fallback sequences
primary := ro.Pipe(
ro.Just("data1", "data2"),
ro.Throw[string](errors.New("primary failed")),
)
fallback1 := ro.Just("fallback1", "fallback2")
fallback2 := ro.Just("final1", "final2")
obs := ro.Pipe(primary, ro.OnErrorResumeNextWith(fallback1, fallback2))
sub := obs.Subscribe(ro.PrintObserver[string]())
defer sub.Unsubscribe()
// Next: "data1"
// Next: "data2"
// Next: "fallback1"
// Next: "fallback2"
// (fallback2 is ignored, only first fallback is used)With empty fallback
primary := ro.Throw[int](errors.New("always fails"))
fallback := ro.Empty[int]()
obs := ro.Pipe(primary, ro.OnErrorResumeNextWith(fallback))
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Completed (no items, no error)API fallback pattern
primaryAPI := func() Observable[string] {
return ro.Pipe(
ro.Just("user_data"),
ro.Throw[string](errors.New("API timeout")),
)
}
cacheAPI := func() Observable[string] {
return ro.Just("cached_data")
}
obs := ro.Pipe(
primaryAPI(),
ro.OnErrorResumeNextWith(cacheAPI()),
)
sub := obs.Subscribe(ro.PrintObserver[string]())
defer sub.Unsubscribe()
// Next: "user_data" (from primary before error)
// Next: "cached_data" (from fallback)
// CompletedDatabase connection fallback
connectPrimary := func() Observable[string] {
// Simulate primary database failure
return ro.Throw[string](errors.New("primary database unavailable"))
}
connectSecondary := func() Observable[string] {
return ro.Just("connected to secondary database")
}
obs := ro.Pipe(
connectPrimary(),
ro.OnErrorResumeNextWith(connectSecondary()),
)
sub := obs.Subscribe(ro.PrintObserver[string]())
defer sub.Unsubscribe()
// Next: "connected to secondary database"
// CompletedWith conditional fallback
shouldUseFallback := true
primary := ro.Pipe(
ro.Just(1, 2, 3),
ro.MapErr(func(i int) (int, error) {
if i == 3 && shouldUseFallback {
return 0, errors.New("switch to fallback")
}
return i, nil
}),
)
fallback := ro.Pipe(
ro.Just(4, 5, 6),
ro.Map(func(i int) int {
return i * 10
}),
)
obs := ro.Pipe(primary, ro.OnErrorResumeNextWith(fallback))
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Next: 1
// Next: 2
// Next: 40 (fallback: 4*10)
// Next: 50 (fallback: 5*10)
// Next: 60 (fallback: 6*10)
// CompletedSimilar:Prototype:func OnErrorResumeNextWith[T any](finally ...Observable[T])
OnErrorReturnโ
Emits a particular item when it encounters an error, then completes.
obs := ro.Pipe(
ro.Just(1, 2, 3),
ro.MapErr(func(i int) (int, error) {
if i == 3 {
return 0, errors.New("something went wrong")
}
return i, nil
}),
ro.OnErrorReturn(-1),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Next: 1
// Next: 2
// Next: -1 (error fallback)
// CompletedWith string fallback
obs := ro.Pipe(
ro.Just("apple", "banana", "invalid"),
ro.MapErr(func(s string) (string, error) {
if s == "invalid" {
return "", errors.New("invalid fruit")
}
return strings.ToUpper(s), nil
}),
ro.OnErrorReturn("UNKNOWN"),
)
sub := obs.Subscribe(ro.PrintObserver[string]())
defer sub.Unsubscribe()
// Next: "APPLE"
// Next: "BANANA"
// Next: "UNKNOWN" (error fallback)
// CompletedAPI request with default value
fetchUser := func(id int) Observable[string] {
return ro.Defer(func() Observable[string] {
if id == 999 {
return ro.Throw[string](errors.New("user not found"))
}
return ro.Just(fmt.Sprintf("User%d", id))
})
}
obs := ro.Pipe(
fetchUser(999),
ro.OnErrorReturn("Guest"),
)
sub := obs.Subscribe(ro.PrintObserver[string]())
defer sub.Unsubscribe()
// Next: "Guest" (error fallback)
// CompletedWith multiple error handling
obs := ro.Pipe(
ro.Just(1, 2, 3, 4, 5),
ro.MapErr(func(i int) (int, error) {
if i == 3 {
return 0, fmt.Errorf("error at %d", i)
}
return i * 10, nil
}),
ro.OnErrorReturn(-999),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Next: 10 (1*10)
// Next: 20 (2*10)
// Next: -999 (error fallback)
// CompletedConfiguration loading with default
loadConfig := func() Observable[string] {
return ro.Defer(func() Observable[string] {
// Simulate config file not found
return ro.Throw[string](errors.New("config.json not found"))
})
}
obs := ro.Pipe(
loadConfig(),
ro.OnErrorReturn("default_config"),
)
sub := obs.Subscribe(ro.PrintObserver[string]())
defer sub.Unsubscribe()
// Next: "default_config" (fallback to default)
// CompletedWith complex fallback object
type User struct {
ID int
Name string
}
fetchUser := func(id int) Observable[User] {
return ro.Defer(func() Observable[User] {
if id <= 0 {
return ro.Throw[User](errors.New("invalid user ID"))
}
return User{ID: id, Name: fmt.Sprintf("User%d", id)}
})
}
obs := ro.Pipe(
fetchUser(-1),
ro.OnErrorReturn(User{ID: 0, Name: "Anonymous"}),
)
sub := obs.Subscribe(ro.PrintObserver[User]())
defer sub.Unsubscribe()
// Next: {ID:0 Name:Anonymous} (error fallback)
// CompletedIn a processing pipeline
processData := func(data []int) Observable[int] {
return ro.Pipe(
ro.FromSlice(data),
ro.MapErr(func(i int) (int, error) {
if i < 0 {
return 0, fmt.Errorf("negative value: %d", i)
}
return i * 2, nil
}),
ro.OnErrorReturn(0), // Use 0 for negative values
)
}
obs := processData([]int{1, 2, -3, 4})
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Next: 2 (1*2)
// Next: 4 (2*2)
// Next: 0 (fallback for -3)
// Next: 8 (4*2)
// CompletedSimilar:Prototype:func OnErrorReturn[T any](finally T)
DoWhileโ
Emits values from the source observable, then repeats the sequence as long as the condition returns true.
counter := 0
obs := ro.Pipe(
ro.Just(1, 2, 3),
ro.DoWhile(func() bool {
counter++
return counter <= 3
}),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Next: 1, 2, 3 (1st iteration)
// Next: 1, 2, 3 (2nd iteration)
// Next: 1, 2, 3 (3rd iteration)
// CompletedDoWhileI with index
obs := ro.Pipe(
ro.Just("a", "b"),
ro.DoWhileI(func(index int64) bool {
return index < 2 // Repeat twice (index 0 and 1)
}),
)
sub := obs.Subscribe(ro.PrintObserver[string]())
defer sub.Unsubscribe()
// Next: "a", "b" (index 0)
// Next: "a", "b" (index 1)
// CompletedDoWhileWithContext with cancellation
ctx, cancel := context.WithCancel(context.Background())
obs := ro.Pipe(
ro.Just(1, 2),
ro.DoWhileWithContext(func(ctx context.Context) (context.Context, bool) {
select {
case <-ctx.Done():
return ctx, false
default:
return ctx, true // Continue repeating
}
}),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
// After some items...
cancel() // Stop repeating
defer sub.Unsubscribe()DoWhileIWithContext with index and context
ctx := context.Background()
obs := ro.Pipe(
ro.Just("x"),
ro.DoWhileIWithContext(func(ctx context.Context, index int64) (context.Context, bool) {
fmt.Printf("Iteration %d\n", index)
return ctx, index < 2 // Repeat for 2 iterations
}),
)
sub := obs.Subscribe(ro.PrintObserver[string]())
defer sub.Unsubscribe()
// Iteration 0
// Next: "x"
// Iteration 1
// Next: "x"
// CompletedRetry pattern with DoWhile
maxAttempts := 3
attempt := 0
shouldRetry := func() bool {
attempt++
return attempt <= maxAttempts
}
obs := ro.Pipe(
ro.Defer(func() ro.Observable[int] {
if attempt < maxAttempts {
return ro.Throw[int](errors.New("temporary failure"))
}
return ro.Just(42) // Success on final attempt
}),
ro.DoWhile(shouldRetry),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Would retry until maxAttempts reached
// Next: 42 (success on 3rd attempt)Polling with DoWhile
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
obs := ro.Pipe(
ro.Defer(func() ro.Observable[int] {
// Simulate checking for new data
if rand.Intn(10) == 0 {
return ro.Just(rand.Intn(100))
}
return ro.Empty[int]()
}),
ro.DoWhileWithContext(func(ctx context.Context) (context.Context, bool) {
select {
case <-ticker.C:
return ctx, true // Continue polling
case <-ctx.Done():
return ctx, false
}
}),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
time.Sleep(1 * time.Second)
sub.Unsubscribe()
// Emits random values as they become available
// Stops after 1 secondWith external state
type GameState struct {
Score int
Lives int
GameOver bool
}
game := &GameState{Lives: 3}
obs := ro.Pipe(
ro.Defer(func() ro.Observable[string] {
if game.Lives <= 0 {
game.GameOver = true
return ro.Just("Game Over")
}
action := fmt.Sprintf("Action - Lives: %d", game.Lives)
game.Lives--
return ro.Just(action)
}),
ro.DoWhile(func() bool {
return !game.GameOver
}),
)
sub := obs.Subscribe(ro.PrintObserver[string]())
defer sub.Unsubscribe()
// Next: "Action - Lives: 3"
// Next: "Action - Lives: 2"
// Next: "Action - Lives: 1"
// Next: "Game Over"
// CompletedSimilar:Prototypes:func DoWhile[T any](condition func() bool)
func DoWhileI[T any](condition func(index int64) bool)
func DoWhileWithContext[T any](condition func(context.Context) (context.Context, bool))
func DoWhileIWithContext[T any](condition func(context.Context, index int64) (context.Context, bool))Whileโ
Repeats the source observable as long as the condition returns true. Unlike DoWhile, While checks the condition before each iteration.
counter := 0
obs := ro.Pipe(
ro.Just(1, 2, 3),
ro.While(func() bool {
counter++
return counter <= 3
}),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Next: 1, 2, 3 (counter becomes 1, condition: 1 <= 3 = true)
// Next: 1, 2, 3 (counter becomes 2, condition: 2 <= 3 = true)
// Next: 1, 2, 3 (counter becomes 3, condition: 3 <= 3 = true)
// Completed (counter becomes 4, condition: 4 <= 3 = false)WhileI with index
obs := ro.Pipe(
ro.Just("a", "b"),
ro.WhileI(func(index int64) bool {
return index < 2 // Repeat twice (index 0 and 1)
}),
)
sub := obs.Subscribe(ro.PrintObserver[string]())
defer sub.Unsubscribe()
// Next: "a", "b" (index 0)
// Next: "a", "b" (index 1)
// Completed (index 2, condition false)WhileWithContext with cancellation
ctx, cancel := context.WithCancel(context.Background())
obs := ro.Pipe(
ro.Just(1, 2),
ro.WhileWithContext(func(ctx context.Context) (context.Context, bool) {
select {
case <-ctx.Done():
return ctx, false
default:
return ctx, true // Continue repeating
}
}),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
// Cancel after some iterations
cancel()
defer sub.Unsubscribe()WhileIWithContext with index and context
ctx := context.Background()
obs := ro.Pipe(
ro.Just("test"),
ro.WhileIWithContext(func(ctx context.Context, index int64) (context.Context, bool) {
fmt.Printf("Checking iteration %d\n", index)
return ctx, index < 3 // Repeat 3 times
}),
)
sub := obs.Subscribe(ro.PrintObserver[string]())
defer sub.Unsubscribe()
// Checking iteration 0
// Next: "test"
// Checking iteration 1
// Next: "test"
// Checking iteration 2
// Next: "test"
// CompletedConditional data generation
dataAvailable := true
obs := ro.Pipe(
ro.Defer(func() Observable[int] {
// Simulate data fetch
if !dataAvailable {
return ro.Empty[int]()
}
dataAvailable = rand.Intn(2) == 0 // Randomly set availability
return ro.Just(rand.Intn(100))
}),
ro.While(func() bool {
return dataAvailable
}),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Emits values while data is available
// Stops when dataAvailable becomes falsePolling with timeout
startTime := time.Now()
timeout := 2 * time.Second
obs := ro.Pipe(
ro.Defer(func() Observable[string] {
// Check if timeout reached
if time.Since(startTime) > timeout {
return ro.Empty[string]()
}
// Simulate checking for messages
if rand.Intn(5) == 0 {
return ro.Just("new message")
}
return ro.Empty[string]()
}),
ro.While(func() bool {
return time.Since(startTime) <= timeout
}),
)
sub := obs.Subscribe(ro.PrintObserver[string]())
time.Sleep(2500 * time.Millisecond)
sub.Unsubscribe()
// Polls for messages for 2 seconds
// Emits "new message" when availableRate-limited processing
processed := 0
maxItems := 10
obs := ro.Pipe(
ro.Defer(func() Observable[int] {
if processed >= maxItems {
return ro.Empty[int]()
}
processed++
return ro.Just(processed)
}),
ro.While(func() bool {
return processed < maxItems
}),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Next: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10
// CompletedWith external resource monitoring
type ResourceMonitor struct {
isActive bool
count int
}
monitor := &ResourceMonitor{isActive: true}
obs := ro.Pipe(
ro.Defer(func() Observable[string] {
if !monitor.isActive || monitor.count >= 5 {
return ro.Empty[string]()
}
monitor.count++
return ro.Just(fmt.Sprintf("Resource update %d", monitor.count))
}),
ro.While(func() bool {
return monitor.isActive && monitor.count < 5
}),
)
sub := obs.Subscribe(ro.PrintObserver[string]())
defer sub.Unsubscribe()
// Next: "Resource update 1"
// Next: "Resource update 2"
// Next: "Resource update 3"
// Next: "Resource update 4"
// Next: "Resource update 5"
// CompletedSimilar:Prototypes:func While[T any](condition func() bool)
func WhileI[T any](condition func(index int64) bool)
func WhileWithContext[T any](condition func(context.Context) (context.Context, bool))
func WhileIWithContext[T any](condition func(context.Context, index int64) (context.Context, bool))