Skip to main content
Help improve this documentation

This documentation is still new and evolving. If you spot any mistakes, unclear explanations, or missing details, please open an issue.

Your feedback helps us improve!

Connectable operators

This page lists all connectable operators, available in the core package of ro.

  • Share

    Creates a new Observable that multicasts (shares) the original Observable. This allows multiple subscribers to share the same underlying subscription.

    // Without Share - each subscriber gets separate execution
    source := ro.Pipe(
    ro.Interval(100 * time.Millisecond),
    ro.Take[int64](5),
    )

    obs1 := source
    obs2 := source

    sub1 := obs1.Subscribe(ro.OnNext(func(value int64) {
    fmt.Printf("Sub1: %d\n", value)
    }))

    sub2 := obs2.Subscribe(ro.OnNext(func(value int64) {
    fmt.Printf("Sub2: %d\n", value)
    }))

    time.Sleep(600 * time.Millisecond)
    sub1.Unsubscribe()
    sub2.Unsubscribe()

    //
    // Sub1: 0
    // Sub2: 0
    // Sub1: 1
    // Sub2: 1
    // ... (each subscriber gets all values independently)

    With Share - shared execution

    // With Share - subscribers share the same execution
    source := ro.Pipe(
    ro.Interval(100 * time.Millisecond),
    ro.Take[int64](5),
    ro.Share[int64](), // Share the observable
    )

    sub1 := source.Subscribe(ro.OnNext(func(value int64) {
    fmt.Printf("Shared Sub1: %d\n", value)
    }))

    // Second subscriber subscribes later
    time.Sleep(250 * time.Millisecond)
    sub2 := source.Subscribe(ro.OnNext(func(value int64) {
    fmt.Printf("Shared Sub2: %d\n", value)
    }))

    time.Sleep(500 * time.Millisecond)
    sub1.Unsubscribe()
    sub2.Unsubscribe()

    //
    // Shared Sub1: 0
    // Shared Sub1: 1
    // Shared Sub2: 2 (sub2 starts here)
    // Shared Sub1: 2
    // Shared Sub2: 3
    // Shared Sub1: 3
    // Shared Sub2: 4
    // Shared Sub1: 4
    // (both subscribers share the same sequence)

    With expensive operations

    // Simulate expensive API call
    expensiveOperation := func() Observable[string] {
    return ro.Defer(func() Observable[string] {
    fmt.Println("Expensive API call started...")
    time.Sleep(100 * time.Millisecond)
    return ro.Just("api_result_1", "api_result_2")
    })
    }

    // Without Share - each subscriber triggers separate API call
    withoutShare := expensiveOperation()

    sub1 := withoutShare.Subscribe(ro.PrintObserver[string]())
    sub2 := withoutShare.Subscribe(ro.PrintObserver[string]())

    time.Sleep(200 * time.Millisecond)
    sub1.Unsubscribe()
    sub2.Unsubscribe()

    //
    // Expensive API call started...
    // Expensive API call started...
    // Next: api_result_1, Next: api_result_2 (twice)

    // With Share - API call shared
    withShare := ro.Pipe(
    expensiveOperation(),
    ro.Share[string](),
    )

    sub3 := withShare.Subscribe(ro.PrintObserver[string]())
    sub4 := withShare.Subscribe(ro.PrintObserver[string]())

    time.Sleep(200 * time.Millisecond)
    sub3.Unsubscribe()
    sub4.Unsubscribe()

    //
    // Expensive API call started...
    // Next: api_result_1, Next: api_result_2 (once, shared)

    With error handling

    source := ro.Pipe(
    ro.Defer(func() Observable[int] {
    fmt.Println("Source execution started...")
    return ro.Pipe(
    ro.Just(1, 2, 3),
    ro.MapErr(func(i int) (int, error) {
    if i == 3 {
    return 0, errors.New("something went wrong")
    }
    return i, nil
    }),
    )
    }),
    ro.Share[int](),
    )

    sub1 := source.Subscribe(ro.NewObserver(
    func(value int) {
    fmt.Printf("Sub1: %d\n", value)
    },
    func(err error) {
    fmt.Printf("Sub1 Error: %v\n", err)
    },
    func() {
    fmt.Println("Sub1 completion")
    },
    ))

    sub2 := source.Subscribe(ro.NewObserver(
    func(value int) {
    fmt.Printf("Sub2: %d\n", value)
    },
    func(err error) {
    fmt.Printf("Sub2 Error: %v\n", err)
    },
    func() {
    fmt.Println("Sub2 completion")
    },
    ))

    time.Sleep(100 * time.Millisecond)
    sub1.Unsubscribe()
    sub2.Unsubscribe()

    //
    // Source execution started...
    // Sub1: 1
    // Sub2: 1
    // Sub1: 2
    // Sub2: 2
    // Sub1 Error: something went wrong
    // Sub2 Error: something went wrong

    With hot observable

    // Create a hot observable (starts immediately)
    hotSource := ro.Pipe(
    ro.Interval(100 * time.Millisecond),
    ro.Take[int64](10),
    ro.Share[int64](), // Make it hot and shareable
    )

    // Multiple subscribers can join at different times
    var subs []*Subscription

    for i := 0; i < 3; i++ {
    go func(idx int) {
    time.Sleep(time.Duration(idx) * 150 * time.Millisecond)
    sub := hotSource.Subscribe(ro.OnNext(func(value int64) {
    fmt.Printf("Subscriber %d: %d\n", idx, value)
    }))
    time.Sleep(500 * time.Millisecond)
    sub.Unsubscribe()
    }(i)
    }

    time.Sleep(1200 * time.Millisecond)

    // Each subscriber starts at different times but gets shared values
    // from the point they subscribe onward

    With reference counting

    source := ro.Pipe(
    ro.Interval(100 * time.Millisecond),
    ro.Share[int64](),
    )

    fmt.Println("Creating first subscription...")
    sub1 := source.Subscribe(ro.PrintObserver[int64]())

    time.Sleep(250 * time.Millisecond)

    fmt.Println("Creating second subscription...")
    sub2 := source.Subscribe(ro.PrintObserver[int64]())

    time.Sleep(250 * time.Millisecond)

    fmt.Println("Unsubscribing first...")
    sub1.Unsubscribe()

    time.Sleep(250 * time.Millisecond)

    fmt.Println("Unsubscribing second...")
    sub2.Unsubscribe()

    fmt.Println("All subscriptions done")

    // The shared observable manages reference counting automatically
    // Values are emitted while at least one subscription is active
    Prototype:
    func Share[T any]()
  • ShareWithConfig

    Creates a shared Observable with customizable configuration. Allows fine-grained control over subject selection and reset behavior.

    config := ShareConfig[string]{
    Connector: func() Subject[string] { return ro.NewPublishSubject[string]() },
    ResetOnError: true,
    ResetOnComplete: true,
    ResetOnRefCountZero: true,
    }

    obs := ro.Pipe(
    ro.Just("hello", "world"),
    ro.ShareWithConfig(config),
    )

    sub1 := obs.Subscribe(ro.PrintObserver[string]())
    sub2 := obs.Subscribe(ro.PrintObserver[string]())

    time.Sleep(100 * time.Millisecond)
    sub1.Unsubscribe()
    sub2.Unsubscribe()

    With custom subject

    // Custom subject that logs all operations
    type LoggingSubject struct {
    *PublishSubject[string]
    id string
    }

    func NewLoggingSubject(id string) *LoggingSubject {
    return &LoggingSubject{
    PublishSubject: ro.NewPublishSubject[string](),
    id: id,
    }
    }

    config := ShareConfig[string]{
    Connector: func() Subject[string] {
    subject := NewLoggingSubject("custom-subject")
    fmt.Printf("Created new logging subject: %s\n", subject.id)
    return subject
    },
    ResetOnError: true,
    }

    obs := ro.Pipe(
    ro.Just("data1", "data2"),
    ro.ShareWithConfig(config),
    )

    sub := obs.Subscribe(ro.PrintObserver[string]())
    time.Sleep(50 * time.Millisecond)
    sub.Unsubscribe()

    With error reset behavior

    config := ShareConfig[int]{
    Connector: func() Subject[int] { return ro.NewPublishSubject[int]() },
    ResetOnError: true, // Reset on error
    ResetOnComplete: false,
    }

    source := ro.Pipe(
    ro.Defer(func() Observable[int] {
    fmt.Println("Starting new source execution...")
    return ro.Pipe(
    ro.Just(1, 2, 3),
    ro.MapErr(func(i int) (int, error) {
    if i == 3 {
    return 0, errors.New("test error")
    }
    return i, nil
    }),
    )
    }),
    ro.ShareWithConfig(config),
    )

    // First subscriber - will trigger error
    sub1 := source.Subscribe(ro.NewObserver(
    func(value int) {
    fmt.Printf("Sub1: %d\n", value)
    },
    func(err error) {
    fmt.Printf("Sub1 Error: %v\n", err)
    },
    func() {
    fmt.Println("Sub1 completion")
    },
    ))

    time.Sleep(100 * time.Millisecond)

    // Second subscriber - gets fresh source due to ResetOnError
    sub2 := source.Subscribe(ro.NewObserver(
    func(value int) {
    fmt.Printf("Sub2: %d\n", value)
    },
    func(err error) {
    fmt.Printf("Sub2 Error: %v\n", err)
    },
    func() {
    fmt.Println("Sub2 completion")
    },
    ))

    time.Sleep(100 * time.Millisecond)
    sub1.Unsubscribe()
    sub2.Unsubscribe()

    //
    // Starting new source execution...
    // Sub1: 1
    // Sub1: 2
    // Sub1 Error: test error
    // Starting new source execution... (ResetOnError triggered)
    // Sub2: 1
    // Sub2: 2
    // Sub2 Error: test error

    With completion reset behavior

    config := ShareConfig[string]{
    Connector: func() Subject[string] { return ro.NewPublishSubject[string]() },
    ResetOnError: false,
    ResetOnComplete: true, // Reset on completion
    }

    source := ro.Pipe(
    ro.Defer(func() Observable[string] {
    fmt.Println("New source execution...")
    return ro.Just("once", "twice")
    }),
    ro.ShareWithConfig(config),
    )

    sub1 := source.Subscribe(ro.PrintObserver[string]())
    time.Sleep(50 * time.Millisecond)
    sub1.Unsubscribe()

    // Second subscriber gets fresh source due to ResetOnComplete
    sub2 := source.Subscribe(ro.PrintObserver[string]())
    time.Sleep(50 * time.Millisecond)
    sub2.Unsubscribe()

    //
    // New source execution...
    // Next: once, Next: twice
    // New source execution... (ResetOnComplete triggered)
    // Next: once, Next: twice

    With ref count zero behavior

    config := ShareConfig[int64]{
    Connector: func() Subject[int64] { return ro.NewPublishSubject[int64]() },
    ResetOnError: false,
    ResetOnComplete: false,
    ResetOnRefCountZero: true, // Reset when no subscribers left
    }

    source := ro.Pipe(
    ro.Defer(func() Observable[int64] {
    fmt.Println("Source created...")
    return ro.Pipe(
    ro.Interval(100 * time.Millisecond),
    ro.Take[int64](5),
    )
    }),
    ro.ShareWithConfig(config),
    )

    fmt.Println("First subscriber...")
    sub1 := source.Subscribe(ro.OnNext(func(value int64) {
    fmt.Printf("Sub1: %d\n", value)
    }))

    time.Sleep(350 * time.Millisecond)
    fmt.Println("Unsubscribing first...")
    sub1.Unsubscribe()

    time.Sleep(100 * time.Millisecond)

    fmt.Println("Second subscriber...")
    sub2 := source.Subscribe(ro.OnNext(func(value int64) {
    fmt.Printf("Sub2: %d\n", value)
    }))

    time.Sleep(350 * time.Millisecond)
    fmt.Println("Unsubscribing second...")
    sub2.Unsubscribe()

    //
    // First subscriber...
    // Source created...
    // Sub1: 0, Sub1: 1, Sub1: 2
    // Unsubscribing first...
    // Second subscriber...
    // Source created... (ResetOnRefCountZero triggered)
    // Sub2: 0, Sub2: 1, Sub2: 2
    // Unsubscribing second...

    With BehaviorSubject for initial value

    config := ShareConfig[int]{
    Connector: func() Subject[int] {
    // Use BehaviorSubject with initial value
    return ro.NewBehaviorSubject[int](42)
    },
    ResetOnError: false,
    ResetOnComplete: false,
    }

    obs := ro.Pipe(
    ro.Just(1, 2, 3),
    ro.ShareWithConfig(config),
    )

    // First subscriber gets immediate initial value
    sub1 := obs.Subscribe(ro.OnNext(func(value int) {
    fmt.Printf("Sub1: %d\n", value)
    }))

    time.Sleep(50 * time.Millisecond)

    // Second subscriber gets current value
    sub2 := obs.Subscribe(ro.OnNext(func(value int) {
    fmt.Printf("Sub2: %d\n", value)
    }))

    time.Sleep(50 * time.Millisecond)
    sub1.Unsubscribe()
    sub2.Unsubscribe()

    //
    // Sub1: 42 (initial value from BehaviorSubject)
    // Sub1: 1, Sub1: 2, Sub1: 3
    // Sub2: 3 (current value)

    With ReplaySubject for caching

    config := ShareConfig[string]{
    Connector: func() Subject[string] {
    // Use ReplaySubject to cache last 2 values
    return ro.NewReplaySubject[string](2)
    },
    ResetOnError: false,
    ResetOnComplete: false,
    }

    obs := ro.Pipe(
    ro.Just("first", "second", "third"),
    ro.ShareWithConfig(config),
    )

    sub1 := obs.Subscribe(ro.PrintObserver[string]())
    time.Sleep(50 * time.Millisecond)
    sub1.Unsubscribe()

    // Second subscriber gets replayed values
    sub2 := obs.Subscribe(ro.PrintObserver[string]())
    time.Sleep(50 * time.Millisecond)
    sub2.Unsubscribe()

    //
    // First subscriber: first, second, third
    // Second subscriber: second, third (replayed from cache)
    Prototype:
    func ShareWithConfig[T any](config ShareConfig[T])
  • ShareReplay

    Creates a shared Observable that replays a specified number of items to future subscribers.

    // Create source that emits values over time
    source := ro.Pipe(
    ro.Interval(100 * time.Millisecond),
    ro.Take[int64](5),
    ro.ShareReplay[int64](2), // Cache last 2 values
    )

    // First subscriber
    sub1 := source.Subscribe(ro.OnNext(func(value int64) {
    fmt.Printf("Sub1: %d\n", value)
    }))

    time.Sleep(350 * time.Millisecond) // Let first 3-4 values emit

    // Second subscriber joins later and gets replayed values
    sub2 := source.Subscribe(ro.OnNext(func(value int64) {
    fmt.Printf("Sub2: %d\n", value)
    }))

    time.Sleep(300 * time.Millisecond)
    sub1.Unsubscribe()
    sub2.Unsubscribe()

    //
    // Sub1: 0
    // Sub1: 1
    // Sub1: 2
    // Sub2: 1 (replayed from cache)
    // Sub2: 2 (replayed from cache)
    // Sub1: 3
    // Sub2: 3
    // Sub1: 4
    // Sub2: 4

    With bufferSize 1 (latest value only)

    source := ro.Pipe(
    ro.Just("first", "second", "third", "fourth"),
    ro.ShareReplay[string](1), // Cache only latest value
    )

    sub1 := source.Subscribe(ro.PrintObserver[string]())
    time.Sleep(50 * time.Millisecond)
    sub1.Unsubscribe()

    // Second subscriber gets only the last value
    sub2 := source.Subscribe(ro.PrintObserver[string]())
    time.Sleep(50 * time.Millisecond)
    sub2.Unsubscribe()

    //
    // First subscriber: first, second, third, fourth
    // Second subscriber: fourth (only last value replayed)

    With bufferSize 0 (no replay, just sharing)

    source := ro.Pipe(
    ro.Interval(50 * time.Millisecond),
    ro.Take[int64](3),
    ro.ShareReplay[int64](0), // No replay, just sharing
    )

    sub1 := source.Subscribe(ro.OnNext(func(value int64) {
    fmt.Printf("Sub1: %d\n", value)
    }))

    time.Sleep(125 * time.Millisecond) // After 2-3 values

    sub2 := source.Subscribe(ro.OnNext(func(value int64) {
    fmt.Printf("Sub2: %d\n", value)
    }))

    time.Sleep(100 * time.Millisecond)
    sub1.Unsubscribe()
    sub2.Unsubscribe()

    //
    // Sub1: 0
    // Sub1: 1
    // Sub2: 2 (sub2 starts here, no replay)
    // Sub1: 2

    With expensive operation caching

    // Simulate expensive API call
    expensiveAPI := func() Observable[string] {
    return ro.Defer(func() Observable[string] {
    fmt.Println("🚀 Expensive API call...")
    time.Sleep(100 * time.Millisecond)
    return ro.Just("result1", "result2", "result3")
    })
    }

    // Cache the last 2 results
    cachedAPI := ro.Pipe(
    expensiveAPI(),
    ro.ShareReplay[string](2),
    )

    // First subscriber triggers API call
    sub1 := cachedAPI.Subscribe(ro.PrintObserver[string]())
    time.Sleep(200 * time.Millisecond)
    sub1.Unsubscribe()

    // Subsequent subscribers get cached results (no new API call)
    sub2 := cachedAPI.Subscribe(ro.PrintObserver[string]())
    time.Sleep(50 * time.Millisecond)
    sub2.Unsubscribe()

    sub3 := cachedAPI.Subscribe(ro.PrintObserver[string]())
    time.Sleep(50 * time.Millisecond)
    sub3.Unsubscribe()

    //
    // 🚀 Expensive API call...
    // Sub1: result1, result2, result3
    // Sub2: result2, result3 (from cache)
    // Sub3: result2, result3 (from cache)

    With error handling

    source := ro.Pipe(
    ro.Defer(func() Observable[int] {
    fmt.Println("Source execution...")
    return ro.Pipe(
    ro.Just(1, 2, 3),
    ro.MapErr(func(i int) (int, error) {
    if i == 3 {
    return 0, errors.New("api failure")
    }
    return i, nil
    }),
    )
    }),
    ro.ShareReplay[int](2),
    )

    sub1 := source.Subscribe(ro.NewObserver(
    func(value int) {
    fmt.Printf("Sub1: %d\n", value)
    },
    func(err error) {
    fmt.Printf("Sub1 Error: %v\n", err)
    },
    func() {
    fmt.Println("Sub1 completion")
    },
    ))

    time.Sleep(100 * time.Millisecond)

    // Second subscriber gets replayed values before error
    sub2 := source.Subscribe(ro.NewObserver(
    func(value int) {
    fmt.Printf("Sub2: %d\n", value)
    },
    func(err error) {
    fmt.Printf("Sub2 Error: %v\n", err)
    },
    func() {
    fmt.Println("Sub2 completion")
    },
    ))

    time.Sleep(100 * time.Millisecond)
    sub1.Unsubscribe()
    sub2.Unsubscribe()

    //
    // Source execution...
    // Sub1: 1
    // Sub1: 2
    // Sub2: 1 (replayed)
    // Sub2: 2 (replayed)
    // Sub1 Error: api failure
    // Sub2 Error: api failure

    With hot observable and late subscribers

    // Create a hot observable with replay
    hotSource := ro.Pipe(
    ro.Interval(100 * time.Millisecond),
    ro.Take[int64](8),
    ro.ShareReplay[int64](3), // Cache last 3 values
    )

    // Simulate subscribers joining at different times
    go func() {
    time.Sleep(0 * time.Millisecond)
    sub := hotSource.Subscribe(ro.OnNext(func(value int64) {
    fmt.Printf("Early sub: %d\n", value)
    }))
    time.Sleep(500 * time.Millisecond)
    sub.Unsubscribe()
    }()

    go func() {
    time.Sleep(250 * time.Millisecond)
    sub := hotSource.Subscribe(ro.OnNext(func(value int64) {
    fmt.Printf("Middle sub: %d\n", value)
    }))
    time.Sleep(400 * time.Millisecond)
    sub.Unsubscribe()
    }()

    go func() {
    time.Sleep(500 * time.Millisecond)
    sub := hotSource.Subscribe(ro.OnNext(func(value int64) {
    fmt.Printf("Late sub: %d\n", value)
    }))
    time.Sleep(400 * time.Millisecond)
    sub.Unsubscribe()
    }()

    time.Sleep(1200 * time.Millisecond)

    // Shows replay behavior for late subscribers

    With large buffer for complete history

    source := ro.Pipe(
    ro.Just("apple", "banana", "cherry", "date", "elderberry"),
    ro.ShareReplay[string](10), // Large enough for all values
    )

    sub1 := source.Subscribe(ro.PrintObserver[string]())
    time.Sleep(50 * time.Millisecond)
    sub1.Unsubscribe()

    // Subscribers joining later get complete history
    sub2 := source.Subscribe(ro.PrintObserver[string]())
    time.Sleep(50 * time.Millisecond)
    sub2.Unsubscribe()

    sub3 := source.Subscribe(ro.PrintObserver[string]())
    time.Sleep(50 * time.Millisecond)
    sub3.Unsubscribe()

    // All subscribers get the complete sequence
    // due to large replay buffer
    Prototype:
    func ShareReplay[T any](bufferSize int)
  • ShareReplayWithConfig

    Creates a shared Observable with replay functionality and custom configuration. Provides control over buffer size and reset behavior.

    config := ShareReplayConfig{
    ResetOnRefCountZero: true, // Reset when no subscribers left
    }

    source := ro.Pipe(
    ro.Just("first", "second", "third"),
    ro.ShareReplayWithConfig[string](2, config), // Cache last 2 values
    )

    sub1 := source.Subscribe(ro.PrintObserver[string]())
    time.Sleep(50 * time.Millisecond)
    sub1.Unsubscribe()

    // Second subscriber gets replayed values
    sub2 := source.Subscribe(ro.PrintObserver[string]())
    time.Sleep(50 * time.Millisecond)
    sub2.Unsubscribe()

    //
    // Sub1: first, second, third
    // Sub2: second, third (replayed from cache)

    With ResetOnRefCountZero disabled

    config := ShareReplayConfig{
    ResetOnRefCountZero: false, // Keep cache even when no subscribers
    }

    source := ro.Pipe(
    ro.Defer(func() ro.Observable[int] {
    fmt.Println("🔄 Creating new source...")
    return ro.Just(1, 2, 3, 4, 5)
    }),
    ro.ShareReplayWithConfig[int](3, config), // Cache last 3 values
    )

    // First subscriber triggers source creation
    sub1 := source.Subscribe(ro.PrintObserver[int]())
    time.Sleep(50 * time.Millisecond)
    sub1.Unsubscribe()

    // Wait a bit, then subscribe again
    time.Sleep(100 * time.Millisecond)
    fmt.Println("Subscribing again...")

    // Second subscriber gets cached values (no new source creation)
    sub2 := source.Subscribe(ro.PrintObserver[int]())
    time.Sleep(50 * time.Millisecond)
    sub2.Unsubscribe()

    //
    // 🔄 Creating new source...
    // Sub1: 1, 2, 3, 4, 5
    // Subscribing again...
    // Sub2: 3, 4, 5 (replayed from persistent cache)

    With ResetOnRefCountZero enabled

    config := ShareReplayConfig{
    ResetOnRefCountZero: true, // Reset cache when no subscribers
    }

    source := ro.Pipe(
    ro.Defer(func() ro.Observable[string] {
    fmt.Println("🔄 New source execution...")
    return ro.Just("hello", "world", "again")
    }),
    ro.ShareReplayWithConfig[string](2, config),
    )

    // First subscriber
    sub1 := source.Subscribe(ro.PrintObserver[string]())
    time.Sleep(50 * time.Millisecond)
    sub1.Unsubscribe()

    // Cache resets after all unsubscribe
    time.Sleep(50 * time.Millisecond)

    // Second subscriber triggers new source creation
    sub2 := source.Subscribe(ro.PrintObserver[string]())
    time.Sleep(50 * time.Millisecond)
    sub2.Unsubscribe()

    //
    // 🔄 New source execution...
    // Sub1: hello, world, again
    // 🔄 New source execution... (cache was reset)
    // Sub2: hello, world, again

    With large buffer and persistent cache

    config := ShareReplayConfig{
    ResetOnRefCountZero: false, // Keep cache forever
    }

    source := ro.Pipe(
    ro.Just("data1", "data2", "data3", "data4", "data5"),
    ro.ShareReplayWithConfig[string](10, config), // Large buffer
    )

    sub1 := source.Subscribe(ro.PrintObserver[string]())
    time.Sleep(50 * time.Millisecond)
    sub1.Unsubscribe()

    // Multiple subscribers can get complete history
    for i := 2; i <= 4; i++ {
    sub := source.Subscribe(ro.PrintObserver[string]())
    time.Sleep(50 * time.Millisecond)
    sub.Unsubscribe()
    }

    // All subscribers get the complete sequence
    // due to persistent large cache

    With expensive operation and persistent cache

    config := ShareReplayConfig{
    ResetOnRefCountZero: false, // Cache expensive results
    }

    expensiveOperation := func() ro.Observable[string] {
    return ro.Defer(func() ro.Observable[string] {
    fmt.Println("💸 Expensive database query...")
    time.Sleep(200 * time.Millisecond)
    return ro.Just("user1", "user2", "user3")
    })
    }

    // Cache the expensive operation results
    cachedUsers := ro.Pipe(
    expensiveOperation(),
    ro.ShareReplayWithConfig[string](5, config),
    )

    // Multiple subscribers over time without re-querying
    for i := 1; i <= 3; i++ {
    time.Sleep(300 * time.Millisecond)
    fmt.Printf("Query %d:\n", i)
    sub := cachedUsers.Subscribe(ro.PrintObserver[string]())
    time.Sleep(50 * time.Millisecond)
    sub.Unsubscribe()
    }

    //
    // 💸 Expensive database query...
    // Query 1: user1, user2, user3
    // Query 2: user1, user2, user3 (from cache)
    // Query 3: user1, user2, user3 (from cache)

    With real-time data stream

    config := ShareReplayConfig{
    ResetOnRefCountZero: false, // Keep latest data available
    }

    // Simulate real-time price updates
    priceStream := ro.Pipe(
    ro.Interval(1 * time.Second),
    ro.Map(func(_ int64) float64 {
    return 100 + rand.Float64()*10 // Price between 100-110
    }),
    ro.ShareReplayWithConfig[float64](1, config), // Keep only latest price
    )

    // Multiple price checkers
    for i := 1; i <= 3; i++ {
    go func(checkerID int) {
    time.Sleep(time.Duration(checkerID) * 500 * time.Millisecond)
    sub := priceStream.Subscribe(ro.OnNext(func(price float64) {
    fmt.Printf("Checker %d: Price $%.2f\n", checkerID, price)
    }))
    time.Sleep(2 * time.Second)
    sub.Unsubscribe()
    }(i)
    }

    time.Sleep(4 * time.Second)

    // Each checker gets the latest available price
    // when they subscribe

    With error handling and persistent cache

    config := ShareReplayConfig{
    ResetOnRefCountZero: false, // Keep error state too
    }

    source := ro.Pipe(
    ro.Defer(func() ro.Observable[int] {
    fmt.Println("🔄 Attempting operation...")
    if rand.Intn(3) == 0 {
    return ro.Throw[int](errors.New("random failure"))
    }
    return ro.Just(42, 84, 126)
    }),
    ro.ShareReplayWithConfig[int](3, config),
    )

    // Multiple attempts may get cached error or success
    for i := 1; i <= 3; i++ {
    time.Sleep(200 * time.Millisecond)
    fmt.Printf("Attempt %d:\n", i)
    sub := source.Subscribe(ro.NewObserver(
    func(value int) {
    fmt.Printf(" Success: %d\n", value)
    },
    func(err error) {
    fmt.Printf(" Error: %v\n", err)
    },
    func() {
    fmt.Println(" Completed")
    },
    ))
    time.Sleep(50 * time.Millisecond)
    sub.Unsubscribe()
    }

    // If first attempt fails, subsequent attempts get cached error
    // If first succeeds, subsequent attempts get cached success

    With buffer management

    config := ShareReplayConfig{
    ResetOnRefCountZero: true,
    }

    // Stream with varying data rates
    dataStream := ro.Pipe(
    ro.Interval(100 * time.Millisecond),
    ro.Take[int64](20),
    ro.ShareReplayWithConfig[int64](5, config), // Keep last 5 values
    )

    // Simulate periodic subscribers
    for i := 0; i < 4; i++ {
    go func(batch int) {
    time.Sleep(time.Duration(batch) * 300 * time.Millisecond)
    fmt.Printf("Batch %d subscribing:\n", batch+1)
    sub := dataStream.Subscribe(ro.OnNext(func(value int64) {
    fmt.Printf(" B%d: %d\n", batch+1, value)
    }))
    time.Sleep(400 * time.Millisecond)
    sub.Unsubscribe()
    fmt.Printf("Batch %d done\n", batch+1)
    }(i)
    }

    time.Sleep(1500 * time.Millisecond)

    // Shows how each batch gets replayed last 5 values
    // from when they subscribed
    Prototype:
    func ShareReplayWithConfig[T any](bufferSize int, config ShareReplayConfig)