This documentation is still new and evolving. If you spot any mistakes, unclear explanations, or missing details, please open an issue.
Your feedback helps us improve!
Connectable operators
This page lists all connectable operators available in the core package of ro.
Share
Creates a new Observable that multicasts (shares) the original Observable. This allows multiple subscribers to share the same underlying subscription.
// Without Share - each subscriber gets separate executionsource := ro.Interval(100 * time.Millisecond).Take[int64](5)obs1 := sourceobs2 := sourcesub1 := obs1.Subscribe(ro.OnNext(func(value int64) {fmt.Printf("Sub1: %d\n", value)}))sub2 := obs2.Subscribe(ro.OnNext(func(value int64) {fmt.Printf("Sub2: %d\n", value)}))time.Sleep(600 * time.Millisecond)sub1.Unsubscribe()sub2.Unsubscribe()//// Sub1: 0// Sub2: 0// Sub1: 1// Sub2: 1// ... (each subscriber gets all values independently)With Share - shared execution
// With Share - subscribers share the same executionsource := ro.Pipe[int64, int64](ro.Interval(100 * time.Millisecond),ro.Take[int64](5),ro.Share[int64](), // Share the observable)sub1 := source.Subscribe(ro.OnNext(func(value int64) {fmt.Printf("Shared Sub1: %d\n", value)}))// Second subscriber subscribes latertime.Sleep(250 * time.Millisecond)sub2 := source.Subscribe(ro.OnNext(func(value int64) {fmt.Printf("Shared Sub2: %d\n", value)}))time.Sleep(500 * time.Millisecond)sub1.Unsubscribe()sub2.Unsubscribe()//// Shared Sub1: 0// Shared Sub1: 1// Shared Sub2: 2 (sub2 starts here)// Shared Sub1: 2// Shared Sub2: 3// Shared Sub1: 3// Shared Sub2: 4// Shared Sub1: 4// (both subscribers share the same sequence)With expensive operations
// Simulate expensive API callexpensiveOperation := func() Observable[string] {return ro.Defer(func() Observable[string] {fmt.Println("Expensive API call started...")time.Sleep(100 * time.Millisecond)return ro.Just("api_result_1", "api_result_2")})}// Without Share - each subscriber triggers separate API callwithoutShare := expensiveOperation()sub1 := withoutShare.Subscribe(ro.PrintObserver[string]())sub2 := withoutShare.Subscribe(ro.PrintObserver[string]())time.Sleep(200 * time.Millisecond)sub1.Unsubscribe()sub2.Unsubscribe()//// Expensive API call started...// Expensive API call started...// Next: api_result_1, Next: api_result_2 (twice)// With Share - API call sharedwithShare := ro.Pipe[string, string](expensiveOperation(),Share[string](),)sub3 := withShare.Subscribe(ro.PrintObserver[string]())sub4 := withShare.Subscribe(ro.PrintObserver[string]())time.Sleep(200 * time.Millisecond)sub3.Unsubscribe()sub4.Unsubscribe()//// Expensive API call started...// Next: api_result_1, Next: api_result_2 (once, shared)With error handling
source := ro.Pipe[int, int](Defer(func() Observable[int] {fmt.Println("Source execution started...")return ro.Pipe[int, int](ro.Just(1, 2, 3),ro.MapErr(func(i int) (int, error) {if i == 3 {return 0, errors.New("something went wrong")}return i, nil}),)}),Share[int](),)sub1 := source.Subscribe(ro.NewObserver(func(value int) {fmt.Printf("Sub1: %d\n", value)},func(err error) {fmt.Printf("Sub1 Error: %v\n", err)},func() {fmt.Println("Sub1 completion")},))sub2 := source.Subscribe(ro.NewObserver(func(value int) {fmt.Printf("Sub2: %d\n", value)},func(err error) {fmt.Printf("Sub2 Error: %v\n", err)},func() {fmt.Println("Sub2 completion")},))time.Sleep(100 * time.Millisecond)sub1.Unsubscribe()sub2.Unsubscribe()//// Source execution started...// Sub1: 1// Sub2: 1// Sub1: 2// Sub2: 2// Sub1 Error: something went wrong// Sub2 Error: something went wrongWith hot observable
// Create a hot observable (starts immediately)hotSource := ro.Pipe[int64, int64](ro.Interval(100 * time.Millisecond),Take[int64](10),ro.Share[int64](), // Make it hot and shareable)// Multiple subscribers can join at different timesvar subs []*Subscriptionfor i := 0; i < 3; i++ {go func(idx int) {time.Sleep(time.Duration(idx) * 150 * time.Millisecond)sub := hotSource.Subscribe(ro.OnNext(func(value int64) {fmt.Printf("Subscriber %d: %d\n", idx, value)}))time.Sleep(500 * time.Millisecond)sub.Unsubscribe()}(i)}time.Sleep(1200 * time.Millisecond)// Each subscriber starts at different times but gets shared values// from the point they subscribe onwardWith reference counting
```go
source := ro.Pipe[int64, int64](
	ro.Interval(100 * time.Millisecond),
	ro.Share[int64](),
)

fmt.Println("Creating first subscription...")
sub1 := source.Subscribe(ro.PrintObserver[int64]())
time.Sleep(250 * time.Millisecond)

fmt.Println("Creating second subscription...")
sub2 := source.Subscribe(ro.PrintObserver[int64]())
time.Sleep(250 * time.Millisecond)

fmt.Println("Unsubscribing first...")
sub1.Unsubscribe()
time.Sleep(250 * time.Millisecond)

fmt.Println("Unsubscribing second...")
sub2.Unsubscribe()

fmt.Println("All subscriptions done")

// The shared observable manages reference counting automatically
// Values are emitted while at least one subscription is active
```

Similar:

Prototype:

```go
func Share[T any]()
```

ShareWithConfig
Creates a shared Observable with customizable configuration. Allows fine-grained control over subject selection and reset behavior.
config := ShareConfig[string]{Connector: func() Subject[string] { return ro.NewPublishSubject[string]() },ResetOnError: true,ResetOnComplete: true,ResetOnRefCountZero: true,}obs := ro.Pipe[string, string](ro.Just("hello", "world"),ro.ShareWithConfig(config),)sub1 := obs.Subscribe(ro.PrintObserver[string]())sub2 := obs.Subscribe(ro.PrintObserver[string]())time.Sleep(100 * time.Millisecond)sub1.Unsubscribe()sub2.Unsubscribe()With custom subject
// Custom subject that logs all operationstype LoggingSubject struct {*PublishSubject[string]id string}func NewLoggingSubject(id string) *LoggingSubject {return &LoggingSubject{PublishSubject: ro.NewPublishSubject[string](),id: id,}}config := ShareConfig[string]{Connector: func() Subject[string] {subject := ro.NewLoggingSubject("custom-subject")fmt.Printf("Created new logging subject: %s\n", subject.id)return subject},ResetOnError: true,}obs := ro.Pipe[string, string](ro.Just("data1", "data2"),ro.ShareWithConfig(config),)sub := obs.Subscribe(ro.PrintObserver[string]())time.Sleep(50 * time.Millisecond)sub.Unsubscribe()With error reset behavior
config := ShareConfig[int]{Connector: func() Subject[int] { return ro.NewPublishSubject[int]() },ResetOnError: true, // Reset on errorResetOnComplete: false,}source := ro.Pipe[int, int](ro.Defer(func() Observable[int] {fmt.Println("Starting new source execution...")return ro.Pipe[int, int](ro.Just(1, 2, 3),ro.MapErr(func(i int) (int, error) {if i == 3 {return 0, errors.New("test error")}return i, nil}),)}),ro.ShareWithConfig(config),)// First subscriber - will trigger errorsub1 := source.Subscribe(ro.NewObserver(func(value int) {fmt.Printf("Sub1: %d\n", value)},func(err error) {fmt.Printf("Sub1 Error: %v\n", err)},func() {fmt.Println("Sub1 completion")},))time.Sleep(100 * time.Millisecond)// Second subscriber - gets fresh source due to ResetOnErrorsub2 := source.Subscribe(ro.NewObserver(func(value int) {fmt.Printf("Sub2: %d\n", value)},func(err error) {fmt.Printf("Sub2 Error: %v\n", err)},func() {fmt.Println("Sub2 completion")},))time.Sleep(100 * time.Millisecond)sub1.Unsubscribe()sub2.Unsubscribe()//// Starting new source execution...// Sub1: 1// Sub1: 2// Sub1 Error: test error// Starting new source execution... (ResetOnError triggered)// Sub2: 1// Sub2: 2// Sub2 Error: test errorWith completion reset behavior
config := ShareConfig[string]{Connector: func() Subject[string] { return ro.NewPublishSubject[string]() },ResetOnError: false,ResetOnComplete: true, // Reset on completion}source := ro.Pipe[string, string](ro.Defer(func() Observable[string] {fmt.Println("New source execution...")return ro.Just("once", "twice")}),ro.ShareWithConfig(config),)sub1 := source.Subscribe(ro.PrintObserver[string]())time.Sleep(50 * time.Millisecond)sub1.Unsubscribe()// Second subscriber gets fresh source due to ResetOnCompletesub2 := source.Subscribe(ro.PrintObserver[string]())time.Sleep(50 * time.Millisecond)sub2.Unsubscribe()//// New source execution...// Next: once, Next: twice// New source execution... (ResetOnComplete triggered)// Next: once, Next: twiceWith ref count zero behavior
config := ShareConfig[int]{Connector: func() Subject[int] { return ro.NewPublishSubject[int]() },ResetOnError: false,ResetOnComplete: false,ResetOnRefCountZero: true, // Reset when no subscribers left}source := ro.Pipe[int64, int64](ro.Defer(func() Observable[int64] {fmt.Println("Source created...")return ro.Interval(100 * time.Millisecond).ro.Take[int64](5)}),ro.ShareWithConfig(config),)fmt.Println("First subscriber...")sub1 := source.Subscribe(ro.OnNext(func(value int64) {fmt.Printf("Sub1: %d\n", value)}))time.Sleep(350 * time.Millisecond)fmt.Println("Unsubscribing first...")sub1.Unsubscribe()time.Sleep(100 * time.Millisecond)fmt.Println("Second subscriber...")sub2 := source.Subscribe(ro.OnNext(func(value int64) {fmt.Printf("Sub2: %d\n", value)}))time.Sleep(350 * time.Millisecond)fmt.Println("Unsubscribing second...")sub2.Unsubscribe()//// First subscriber...// Source created...// Sub1: 0, Sub1: 1, Sub1: 2// Unsubscribing first...// Second subscriber...// Source created... (ResetOnRefCountZero triggered)// Sub2: 0, Sub2: 1, Sub2: 2// Unsubscribing second...With BehaviorSubject for initial value
config := ShareConfig[int]{Connector: func() Subject[int] {// Use BehaviorSubject with initial valuereturn ro.NewBehaviorSubject[int](42)},ResetOnError: false,ResetOnComplete: false,}obs := ro.Pipe[int, int](ro.Just(1, 2, 3),ro.ShareWithConfig(config),)// First subscriber gets immediate initial valuesub1 := obs.Subscribe(ro.OnNext(func(value int) {fmt.Printf("Sub1: %d\n", value)}))time.Sleep(50 * time.Millisecond)// Second subscriber gets current valuesub2 := obs.Subscribe(ro.OnNext(func(value int) {fmt.Printf("Sub2: %d\n", value)}))time.Sleep(50 * time.Millisecond)sub1.Unsubscribe()sub2.Unsubscribe()//// Sub1: 42 (initial value from BehaviorSubject)// Sub1: 1, Sub1: 2, Sub1: 3// Sub2: 3 (current value)With ReplaySubject for caching
```go
config := ShareConfig[string]{
	Connector: func() Subject[string] {
		// Use ReplaySubject to cache last 2 values
		return ro.NewReplaySubject[string](2)
	},
	ResetOnError:    false,
	ResetOnComplete: false,
}

obs := ro.Pipe[string, string](
	ro.Just("first", "second", "third"),
	ro.ShareWithConfig(config),
)

sub1 := obs.Subscribe(ro.PrintObserver[string]())
time.Sleep(50 * time.Millisecond)
sub1.Unsubscribe()

// Second subscriber gets replayed values
sub2 := obs.Subscribe(ro.PrintObserver[string]())
time.Sleep(50 * time.Millisecond)
sub2.Unsubscribe()

//
// First subscriber: first, second, third
// Second subscriber: second, third (replayed from cache)
```

Similar:

Prototype:

```go
func ShareWithConfig[T any](config ShareConfig[T])
```

ShareReplay
Creates a shared Observable that replays a specified number of items to future subscribers.
// Create source that emits values over timesource := ro.Pipe[int64, int64](ro.Interval(100 * time.Millisecond),ro.Take[int64](5),ro.ShareReplay[int64](2), // Cache last 2 values)// First subscribersub1 := source.Subscribe(ro.OnNext(func(value int64) {fmt.Printf("Sub1: %d\n", value)}))time.Sleep(350 * time.Millisecond) // Let first 3-4 values emit// Second subscriber joins later and gets replayed valuessub2 := source.Subscribe(ro.OnNext(func(value int64) {fmt.Printf("Sub2: %d\n", value)}))time.Sleep(300 * time.Millisecond)sub1.Unsubscribe()sub2.Unsubscribe()//// Sub1: 0// Sub1: 1// Sub1: 2// Sub2: 1 (replayed from cache)// Sub2: 2 (replayed from cache)// Sub1: 3// Sub2: 3// Sub1: 4// Sub2: 4With bufferSize 1 (latest value only)
source := ro.Pipe[string, string](ro.Just("first", "second", "third", "fourth"),ro.ShareReplay[string](1), // Cache only latest value)sub1 := source.Subscribe(ro.PrintObserver[string]())time.Sleep(50 * time.Millisecond)sub1.Unsubscribe()// Second subscriber gets only the last valuesub2 := source.Subscribe(ro.PrintObserver[string]())time.Sleep(50 * time.Millisecond)sub2.Unsubscribe()//// First subscriber: first, second, third, fourth// Second subscriber: fourth (only last value replayed)With bufferSize 0 (no replay, just sharing)
source := ro.Pipe[int64, int64](ro.Interval(50 * time.Millisecond),ro.Take[int64](3),ro.ShareReplay[int64](0), // No replay, just sharing)sub1 := source.Subscribe(ro.OnNext(func(value int64) {fmt.Printf("Sub1: %d\n", value)}))time.Sleep(125 * time.Millisecond) // After 2-3 valuessub2 := source.Subscribe(ro.OnNext(func(value int64) {fmt.Printf("Sub2: %d\n", value)}))time.Sleep(100 * time.Millisecond)sub1.Unsubscribe()sub2.Unsubscribe()//// Sub1: 0// Sub1: 1// Sub2: 2 (sub2 starts here, no replay)// Sub1: 2With expensive operation caching
// Simulate expensive API callexpensiveAPI := func() Observable[string] {return ro.Defer(func() Observable[string] {fmt.Println("๐ Expensive API call...")time.Sleep(100 * time.Millisecond)return ro.Just("result1", "result2", "result3")})}// Cache the last 2 resultscachedAPI := ro.Pipe[string, string](expensiveAPI(),ro.ShareReplay[string](2),)// First subscriber triggers API callsub1 := cachedAPI.Subscribe(ro.PrintObserver[string]())time.Sleep(200 * time.Millisecond)sub1.Unsubscribe()// Subsequent subscribers get cached results (no new API call)sub2 := cachedAPI.Subscribe(ro.PrintObserver[string]())time.Sleep(50 * time.Millisecond)sub2.Unsubscribe()sub3 := cachedAPI.Subscribe(ro.PrintObserver[string]())time.Sleep(50 * time.Millisecond)sub3.Unsubscribe()//// ๐ Expensive API call...// Sub1: result1, result2, result3// Sub2: result2, result3 (from cache)// Sub3: result2, result3 (from cache)With error handling
source := ro.Pipe[int, int](Defer(func() Observable[int] {fmt.Println("Source execution...")return ro.Pipe[int, int](ro.Just(1, 2, 3),ro.MapErr(func(i int) (int, error) {if i == 3 {return 0, errors.New("api failure")}return i, nil}),)}),ro.ShareReplay[int](2),)sub1 := source.Subscribe(ro.NewObserver(func(value int) {fmt.Printf("Sub1: %d\n", value)},func(err error) {fmt.Printf("Sub1 Error: %v\n", err)},func() {fmt.Println("Sub1 completion")},))time.Sleep(100 * time.Millisecond)// Second subscriber gets replayed values before errorsub2 := source.Subscribe(ro.NewObserver(func(value int) {fmt.Printf("Sub2: %d\n", value)},func(err error) {fmt.Printf("Sub2 Error: %v\n", err)},func() {fmt.Println("Sub2 completion")},))time.Sleep(100 * time.Millisecond)sub1.Unsubscribe()sub2.Unsubscribe()//// Source execution...// Sub1: 1// Sub1: 2// Sub2: 1 (replayed)// Sub2: 2 (replayed)// Sub1 Error: api failure// Sub2 Error: api failureWith hot observable and late subscribers
// Create a hot observable with replayhotSource := ro.Pipe[int64, int64](ro.Interval(100 * time.Millisecond),ro.Take[int64](8),ro.ShareReplay[int64](3), // Cache last 3 values)// Simulate subscribers joining at different timesgo func() {time.Sleep(0 * time.Millisecond)sub := hotSource.Subscribe(ro.OnNext(func(value int64) {fmt.Printf("Early sub: %d\n", value)}))time.Sleep(500 * time.Millisecond)sub.Unsubscribe()}()go func() {time.Sleep(250 * time.Millisecond)sub := hotSource.Subscribe(ro.OnNext(func(value int64) {fmt.Printf("Middle sub: %d\n", value)}))time.Sleep(400 * time.Millisecond)sub.Unsubscribe()}()go func() {time.Sleep(500 * time.Millisecond)sub := hotSource.Subscribe(ro.OnNext(func(value int64) {fmt.Printf("Late sub: %d\n", value)}))time.Sleep(400 * time.Millisecond)sub.Unsubscribe()}()time.Sleep(1200 * time.Millisecond)// Shows replay behavior for late subscribersWith large buffer for complete history
```go
source := ro.Pipe[string, string](
	ro.Just("apple", "banana", "cherry", "date", "elderberry"),
	ro.ShareReplay[string](10), // Large enough for all values
)

sub1 := source.Subscribe(ro.PrintObserver[string]())
time.Sleep(50 * time.Millisecond)
sub1.Unsubscribe()

// Subscribers joining later get complete history
sub2 := source.Subscribe(ro.PrintObserver[string]())
time.Sleep(50 * time.Millisecond)
sub2.Unsubscribe()

sub3 := source.Subscribe(ro.PrintObserver[string]())
time.Sleep(50 * time.Millisecond)
sub3.Unsubscribe()

// All subscribers get the complete sequence
// due to large replay buffer
```

Similar:

Prototype:

```go
func ShareReplay[T any](bufferSize int)
```

ShareReplayWithConfig
Creates a shared Observable with replay functionality and custom configuration. Provides control over buffer size and reset behavior.
config := ShareReplayConfig{ResetOnRefCountZero: true, // Reset when no subscribers left}source := ro.Pipe[string, string](ro.Just("first", "second", "third"),ro.ShareReplayWithConfig[string](2, config), // Cache last 2 values)sub1 := source.Subscribe(ro.PrintObserver[string]())time.Sleep(50 * time.Millisecond)sub1.Unsubscribe()// Second subscriber gets replayed valuessub2 := source.Subscribe(ro.PrintObserver[string]())time.Sleep(50 * time.Millisecond)sub2.Unsubscribe()//// Sub1: first, second, third// Sub2: second, third (replayed from cache)With ResetOnRefCountZero disabled
config := ShareReplayConfig{ResetOnRefCountZero: false, // Keep cache even when no subscribers}source := ro.Pipe[int, int](ro.Defer(func() ro.Observable[int] {fmt.Println("๐ Creating new source...")return ro.Just(1, 2, 3, 4, 5)}),ro.ShareReplayWithConfig[int](3, config), // Cache last 3 values)// First subscriber triggers source creationsub1 := source.Subscribe(ro.PrintObserver[int]())time.Sleep(50 * time.Millisecond)sub1.Unsubscribe()// Wait a bit, then subscribe againtime.Sleep(100 * time.Millisecond)fmt.Println("Subscribing again...")// Second subscriber gets cached values (no new source creation)sub2 := source.Subscribe(ro.PrintObserver[int]())time.Sleep(50 * time.Millisecond)sub2.Unsubscribe()//// ๐ Creating new source...// Sub1: 1, 2, 3, 4, 5// Subscribing again...// Sub2: 3, 4, 5 (replayed from persistent cache)With ResetOnRefCountZero enabled
config := ShareReplayConfig{ResetOnRefCountZero: true, // Reset cache when no subscribers}source := ro.Pipe[string, string](ro.Defer(func() ro.Observable[string] {fmt.Println("๐ New source execution...")return ro.Just("hello", "world", "again")}),ro.ShareReplayWithConfig[string](2, config),)// First subscribersub1 := source.Subscribe(ro.PrintObserver[string]())time.Sleep(50 * time.Millisecond)sub1.Unsubscribe()// Cache resets after all unsubscribetime.Sleep(50 * time.Millisecond)// Second subscriber triggers new source creationsub2 := source.Subscribe(ro.PrintObserver[string]())time.Sleep(50 * time.Millisecond)sub2.Unsubscribe()//// ๐ New source execution...// Sub1: hello, world, again// ๐ New source execution... (cache was reset)// Sub2: hello, world, againWith large buffer and persistent cache
config := ShareReplayConfig{ResetOnRefCountZero: false, // Keep cache forever}source := ro.Pipe[string, string](ro.Just("data1", "data2", "data3", "data4", "data5"),ro.ShareReplayWithConfig[string](10, config), // Large buffer)sub1 := source.Subscribe(ro.PrintObserver[string]())time.Sleep(50 * time.Millisecond)sub1.Unsubscribe()// Multiple subscribers can get complete historyfor i := 2; i <= 4; i++ {sub := source.Subscribe(ro.PrintObserver[string]())time.Sleep(50 * time.Millisecond)sub.Unsubscribe()}// All subscribers get the complete sequence// due to persistent large cacheWith expensive operation and persistent cache
```go
config := ShareReplayConfig{
	ResetOnRefCountZero: false, // Cache expensive results
}

expensiveOperation := func() ro.Observable[string] {
	return ro.Defer(func() ro.Observable[string] {
		fmt.Println("💸 Expensive database query...")
		time.Sleep(200 * time.Millisecond)
		return ro.Just("user1", "user2", "user3")
	})
}

// Cache the expensive operation results
cachedUsers := ro.Pipe[string, string](
	expensiveOperation(),
	ro.ShareReplayWithConfig[string](5, config),
)

// Multiple subscribers over time without re-querying
for i := 1; i <= 3; i++ {
	time.Sleep(300 * time.Millisecond)
	fmt.Printf("Query %d:\n", i)
	sub := cachedUsers.Subscribe(ro.PrintObserver[string]())
	time.Sleep(50 * time.Millisecond)
	sub.Unsubscribe()
}

//
// 💸 Expensive database query...
// Query 1: user1, user2, user3
// Query 2: user1, user2, user3 (from cache)
// Query 3: user1, user2, user3 (from cache)
```

With real-time data stream
config := ShareReplayConfig{ResetOnRefCountZero: false, // Keep latest data available}// Simulate real-time price updatespriceStream := ro.Pipe[int64, float64](ro.Interval(1 * time.Second),ro.Map(func(_ int64) float64 {return 100 + rand.Float64()*10 // Price between 100-110}),ro.ShareReplayWithConfig[float64](1, config), // Keep only latest price)// Multiple price checkersfor i := 1; i <= 3; i++ {go func(checkerID int) {time.Sleep(time.Duration(checkerID) * 500 * time.Millisecond)sub := priceStream.Subscribe(ro.OnNext(func(price float64) {fmt.Printf("Checker %d: Price $%.2f\n", checkerID, price)}))time.Sleep(2 * time.Second)sub.Unsubscribe()}(i)}time.Sleep(4 * time.Second)// Each checker gets the latest available price// when they subscribeWith error handling and persistent cache
config := ShareReplayConfig{ResetOnRefCountZero: false, // Keep error state too}source := ro.Pipe[int, int](ro.Defer(func() ro.Observable[int] {fmt.Println("๐ Attempting operation...")if rand.Intn(3) == 0 {return ro.Throw[int](errors.New("random failure"))}return ro.Just(42, 84, 126)}),ro.ShareReplayWithConfig[int](3, config),)// Multiple attempts may get cached error or successfor i := 1; i <= 3; i++ {time.Sleep(200 * time.Millisecond)fmt.Printf("Attempt %d:\n", i)sub := source.Subscribe(ro.NewObserver(func(value int) {fmt.Printf(" Success: %d\n", value)},func(err error) {fmt.Printf(" Error: %v\n", err)},func() {fmt.Println(" Completed")},))time.Sleep(50 * time.Millisecond)sub.Unsubscribe()}// If first attempt fails, subsequent attempts get cached error// If first succeeds, subsequent attempts get cached successWith buffer management
config := ShareReplayConfig{ResetOnRefCountZero: true,}// Stream with varying data ratesdataStream := ro.Pipe[int64, int64](ro.Interval(100 * time.Millisecond),ro.Take[int64](20),ro.ShareReplayWithConfig[int64](5, config), // Keep last 5 values)// Simulate periodic subscribersfor i := 0; i < 4; i++ {go func(batch int) {time.Sleep(time.Duration(batch) * 300 * time.Millisecond)fmt.Printf("Batch %d subscribing:\n", batch+1)sub := dataStream.Subscribe(ro.OnNext(func(value int64) {fmt.Printf(" B%d: %d\n", batch+1, value)}))time.Sleep(400 * time.Millisecond)sub.Unsubscribe()fmt.Printf("Batch %d done\n", batch+1)}(i)}time.Sleep(1500 * time.Millisecond)// Shows how each batch gets replayed last 5 values// from when they subscribedSimilar:Prototype:func ShareReplayWithConfig[T any](bufferSize int, config ShareReplayConfig)