This documentation is still new and evolving. If you spot any mistakes, unclear explanations, or missing details, please open an issue.
Your feedback helps us improve!
Connectable operators
This page lists all connectable operators available in the core package of ro.
Share
Creates a new Observable that multicasts (shares) the original Observable. This allows multiple subscribers to share the same underlying subscription.
// Without Share - each subscriber gets separate execution
source := ro.Pipe(
ro.Interval(100 * time.Millisecond),
ro.Take[int64](5),
)
obs1 := source
obs2 := source
sub1 := obs1.Subscribe(ro.OnNext(func(value int64) {
fmt.Printf("Sub1: %d\n", value)
}))
sub2 := obs2.Subscribe(ro.OnNext(func(value int64) {
fmt.Printf("Sub2: %d\n", value)
}))
time.Sleep(600 * time.Millisecond)
sub1.Unsubscribe()
sub2.Unsubscribe()
//
// Sub1: 0
// Sub2: 0
// Sub1: 1
// Sub2: 1
// ... (each subscriber gets all values independently)
With Share - shared execution
// With Share - subscribers share the same execution
source := ro.Pipe(
ro.Interval(100 * time.Millisecond),
ro.Take[int64](5),
ro.Share[int64](), // Share the observable
)
sub1 := source.Subscribe(ro.OnNext(func(value int64) {
fmt.Printf("Shared Sub1: %d\n", value)
}))
// Second subscriber subscribes later
time.Sleep(250 * time.Millisecond)
sub2 := source.Subscribe(ro.OnNext(func(value int64) {
fmt.Printf("Shared Sub2: %d\n", value)
}))
time.Sleep(500 * time.Millisecond)
sub1.Unsubscribe()
sub2.Unsubscribe()
//
// Shared Sub1: 0
// Shared Sub1: 1
// Shared Sub2: 2 (sub2 starts here)
// Shared Sub1: 2
// Shared Sub2: 3
// Shared Sub1: 3
// Shared Sub2: 4
// Shared Sub1: 4
// (both subscribers share the same sequence)
With expensive operations
// Simulate expensive API call
expensiveOperation := func() Observable[string] {
return ro.Defer(func() Observable[string] {
fmt.Println("Expensive API call started...")
time.Sleep(100 * time.Millisecond)
return ro.Just("api_result_1", "api_result_2")
})
}
// Without Share - each subscriber triggers separate API call
withoutShare := expensiveOperation()
sub1 := withoutShare.Subscribe(ro.PrintObserver[string]())
sub2 := withoutShare.Subscribe(ro.PrintObserver[string]())
time.Sleep(200 * time.Millisecond)
sub1.Unsubscribe()
sub2.Unsubscribe()
//
// Expensive API call started...
// Expensive API call started...
// Next: api_result_1, Next: api_result_2 (twice)
// With Share - API call shared
withShare := ro.Pipe(
expensiveOperation(),
ro.Share[string](),
)
sub3 := withShare.Subscribe(ro.PrintObserver[string]())
sub4 := withShare.Subscribe(ro.PrintObserver[string]())
time.Sleep(200 * time.Millisecond)
sub3.Unsubscribe()
sub4.Unsubscribe()
//
// Expensive API call started...
// Next: api_result_1, Next: api_result_2 (once, shared)
With error handling
source := ro.Pipe(
ro.Defer(func() Observable[int] {
fmt.Println("Source execution started...")
return ro.Pipe(
ro.Just(1, 2, 3),
ro.MapErr(func(i int) (int, error) {
if i == 3 {
return 0, errors.New("something went wrong")
}
return i, nil
}),
)
}),
ro.Share[int](),
)
sub1 := source.Subscribe(ro.NewObserver(
func(value int) {
fmt.Printf("Sub1: %d\n", value)
},
func(err error) {
fmt.Printf("Sub1 Error: %v\n", err)
},
func() {
fmt.Println("Sub1 completion")
},
))
sub2 := source.Subscribe(ro.NewObserver(
func(value int) {
fmt.Printf("Sub2: %d\n", value)
},
func(err error) {
fmt.Printf("Sub2 Error: %v\n", err)
},
func() {
fmt.Println("Sub2 completion")
},
))
time.Sleep(100 * time.Millisecond)
sub1.Unsubscribe()
sub2.Unsubscribe()
//
// Source execution started...
// Sub1: 1
// Sub2: 1
// Sub1: 2
// Sub2: 2
// Sub1 Error: something went wrong
// Sub2 Error: something went wrong
With hot observable
// Create a hot observable (starts immediately)
hotSource := ro.Pipe(
ro.Interval(100 * time.Millisecond),
ro.Take[int64](10),
ro.Share[int64](), // Make it hot and shareable
)
// Multiple subscribers can join at different times
var subs []*Subscription
for i := 0; i < 3; i++ {
go func(idx int) {
time.Sleep(time.Duration(idx) * 150 * time.Millisecond)
sub := hotSource.Subscribe(ro.OnNext(func(value int64) {
fmt.Printf("Subscriber %d: %d\n", idx, value)
}))
time.Sleep(500 * time.Millisecond)
sub.Unsubscribe()
}(i)
}
time.Sleep(1200 * time.Millisecond)
// Each subscriber starts at different times but gets shared values
// from the point they subscribe onward
With reference counting
source := ro.Pipe(
ro.Interval(100 * time.Millisecond),
ro.Share[int64](),
)
fmt.Println("Creating first subscription...")
sub1 := source.Subscribe(ro.PrintObserver[int64]())
time.Sleep(250 * time.Millisecond)
fmt.Println("Creating second subscription...")
sub2 := source.Subscribe(ro.PrintObserver[int64]())
time.Sleep(250 * time.Millisecond)
fmt.Println("Unsubscribing first...")
sub1.Unsubscribe()
time.Sleep(250 * time.Millisecond)
fmt.Println("Unsubscribing second...")
sub2.Unsubscribe()
fmt.Println("All subscriptions done")
// The shared observable manages reference counting automatically
// Values are emitted while at least one subscription is active
Similar:
Prototype:
func Share[T any]()
ShareWithConfig
Creates a shared Observable with customizable configuration. Allows fine-grained control over subject selection and reset behavior.
config := ShareConfig[string]{
Connector: func() Subject[string] { return ro.NewPublishSubject[string]() },
ResetOnError: true,
ResetOnComplete: true,
ResetOnRefCountZero: true,
}
obs := ro.Pipe(
ro.Just("hello", "world"),
ro.ShareWithConfig(config),
)
sub1 := obs.Subscribe(ro.PrintObserver[string]())
sub2 := obs.Subscribe(ro.PrintObserver[string]())
time.Sleep(100 * time.Millisecond)
sub1.Unsubscribe()
sub2.Unsubscribe()
With custom subject
// Custom subject that logs all operations
type LoggingSubject struct {
*PublishSubject[string]
id string
}
func NewLoggingSubject(id string) *LoggingSubject {
return &LoggingSubject{
PublishSubject: ro.NewPublishSubject[string](),
id: id,
}
}
config := ShareConfig[string]{
Connector: func() Subject[string] {
subject := NewLoggingSubject("custom-subject")
fmt.Printf("Created new logging subject: %s\n", subject.id)
return subject
},
ResetOnError: true,
}
obs := ro.Pipe(
ro.Just("data1", "data2"),
ro.ShareWithConfig(config),
)
sub := obs.Subscribe(ro.PrintObserver[string]())
time.Sleep(50 * time.Millisecond)
sub.Unsubscribe()
With error reset behavior
config := ShareConfig[int]{
Connector: func() Subject[int] { return ro.NewPublishSubject[int]() },
ResetOnError: true, // Reset on error
ResetOnComplete: false,
}
source := ro.Pipe(
ro.Defer(func() Observable[int] {
fmt.Println("Starting new source execution...")
return ro.Pipe(
ro.Just(1, 2, 3),
ro.MapErr(func(i int) (int, error) {
if i == 3 {
return 0, errors.New("test error")
}
return i, nil
}),
)
}),
ro.ShareWithConfig(config),
)
// First subscriber - will trigger error
sub1 := source.Subscribe(ro.NewObserver(
func(value int) {
fmt.Printf("Sub1: %d\n", value)
},
func(err error) {
fmt.Printf("Sub1 Error: %v\n", err)
},
func() {
fmt.Println("Sub1 completion")
},
))
time.Sleep(100 * time.Millisecond)
// Second subscriber - gets fresh source due to ResetOnError
sub2 := source.Subscribe(ro.NewObserver(
func(value int) {
fmt.Printf("Sub2: %d\n", value)
},
func(err error) {
fmt.Printf("Sub2 Error: %v\n", err)
},
func() {
fmt.Println("Sub2 completion")
},
))
time.Sleep(100 * time.Millisecond)
sub1.Unsubscribe()
sub2.Unsubscribe()
//
// Starting new source execution...
// Sub1: 1
// Sub1: 2
// Sub1 Error: test error
// Starting new source execution... (ResetOnError triggered)
// Sub2: 1
// Sub2: 2
// Sub2 Error: test error
With completion reset behavior
config := ShareConfig[string]{
Connector: func() Subject[string] { return ro.NewPublishSubject[string]() },
ResetOnError: false,
ResetOnComplete: true, // Reset on completion
}
source := ro.Pipe(
ro.Defer(func() Observable[string] {
fmt.Println("New source execution...")
return ro.Just("once", "twice")
}),
ro.ShareWithConfig(config),
)
sub1 := source.Subscribe(ro.PrintObserver[string]())
time.Sleep(50 * time.Millisecond)
sub1.Unsubscribe()
// Second subscriber gets fresh source due to ResetOnComplete
sub2 := source.Subscribe(ro.PrintObserver[string]())
time.Sleep(50 * time.Millisecond)
sub2.Unsubscribe()
//
// New source execution...
// Next: once, Next: twice
// New source execution... (ResetOnComplete triggered)
// Next: once, Next: twice
With ref count zero behavior
config := ShareConfig[int64]{
Connector: func() Subject[int64] { return ro.NewPublishSubject[int64]() },
ResetOnError: false,
ResetOnComplete: false,
ResetOnRefCountZero: true, // Reset when no subscribers left
}
source := ro.Pipe(
ro.Defer(func() Observable[int64] {
fmt.Println("Source created...")
return ro.Pipe(
ro.Interval(100 * time.Millisecond),
ro.Take[int64](5),
)
}),
ro.ShareWithConfig(config),
)
fmt.Println("First subscriber...")
sub1 := source.Subscribe(ro.OnNext(func(value int64) {
fmt.Printf("Sub1: %d\n", value)
}))
time.Sleep(350 * time.Millisecond)
fmt.Println("Unsubscribing first...")
sub1.Unsubscribe()
time.Sleep(100 * time.Millisecond)
fmt.Println("Second subscriber...")
sub2 := source.Subscribe(ro.OnNext(func(value int64) {
fmt.Printf("Sub2: %d\n", value)
}))
time.Sleep(350 * time.Millisecond)
fmt.Println("Unsubscribing second...")
sub2.Unsubscribe()
//
// First subscriber...
// Source created...
// Sub1: 0, Sub1: 1, Sub1: 2
// Unsubscribing first...
// Second subscriber...
// Source created... (ResetOnRefCountZero triggered)
// Sub2: 0, Sub2: 1, Sub2: 2
// Unsubscribing second...
With BehaviorSubject for initial value
config := ShareConfig[int]{
Connector: func() Subject[int] {
// Use BehaviorSubject with initial value
return ro.NewBehaviorSubject[int](42)
},
ResetOnError: false,
ResetOnComplete: false,
}
obs := ro.Pipe(
ro.Just(1, 2, 3),
ro.ShareWithConfig(config),
)
// First subscriber gets immediate initial value
sub1 := obs.Subscribe(ro.OnNext(func(value int) {
fmt.Printf("Sub1: %d\n", value)
}))
time.Sleep(50 * time.Millisecond)
// Second subscriber gets current value
sub2 := obs.Subscribe(ro.OnNext(func(value int) {
fmt.Printf("Sub2: %d\n", value)
}))
time.Sleep(50 * time.Millisecond)
sub1.Unsubscribe()
sub2.Unsubscribe()
//
// Sub1: 42 (initial value from BehaviorSubject)
// Sub1: 1, Sub1: 2, Sub1: 3
// Sub2: 3 (current value)
With ReplaySubject for caching
config := ShareConfig[string]{
Connector: func() Subject[string] {
// Use ReplaySubject to cache last 2 values
return ro.NewReplaySubject[string](2)
},
ResetOnError: false,
ResetOnComplete: false,
}
obs := ro.Pipe(
ro.Just("first", "second", "third"),
ro.ShareWithConfig(config),
)
sub1 := obs.Subscribe(ro.PrintObserver[string]())
time.Sleep(50 * time.Millisecond)
sub1.Unsubscribe()
// Second subscriber gets replayed values
sub2 := obs.Subscribe(ro.PrintObserver[string]())
time.Sleep(50 * time.Millisecond)
sub2.Unsubscribe()
//
// First subscriber: first, second, third
// Second subscriber: second, third (replayed from cache)
Similar:
Prototype:
func ShareWithConfig[T any](config ShareConfig[T])
ShareReplay
Creates a shared Observable that replays a specified number of items to future subscribers.
// Create source that emits values over time
source := ro.Pipe(
ro.Interval(100 * time.Millisecond),
ro.Take[int64](5),
ro.ShareReplay[int64](2), // Cache last 2 values
)
// First subscriber
sub1 := source.Subscribe(ro.OnNext(func(value int64) {
fmt.Printf("Sub1: %d\n", value)
}))
time.Sleep(350 * time.Millisecond) // Let first 3-4 values emit
// Second subscriber joins later and gets replayed values
sub2 := source.Subscribe(ro.OnNext(func(value int64) {
fmt.Printf("Sub2: %d\n", value)
}))
time.Sleep(300 * time.Millisecond)
sub1.Unsubscribe()
sub2.Unsubscribe()
//
// Sub1: 0
// Sub1: 1
// Sub1: 2
// Sub2: 1 (replayed from cache)
// Sub2: 2 (replayed from cache)
// Sub1: 3
// Sub2: 3
// Sub1: 4
// Sub2: 4
With bufferSize 1 (latest value only)
source := ro.Pipe(
ro.Just("first", "second", "third", "fourth"),
ro.ShareReplay[string](1), // Cache only latest value
)
sub1 := source.Subscribe(ro.PrintObserver[string]())
time.Sleep(50 * time.Millisecond)
sub1.Unsubscribe()
// Second subscriber gets only the last value
sub2 := source.Subscribe(ro.PrintObserver[string]())
time.Sleep(50 * time.Millisecond)
sub2.Unsubscribe()
//
// First subscriber: first, second, third, fourth
// Second subscriber: fourth (only last value replayed)
With bufferSize 0 (no replay, just sharing)
source := ro.Pipe(
ro.Interval(50 * time.Millisecond),
ro.Take[int64](3),
ro.ShareReplay[int64](0), // No replay, just sharing
)
sub1 := source.Subscribe(ro.OnNext(func(value int64) {
fmt.Printf("Sub1: %d\n", value)
}))
time.Sleep(125 * time.Millisecond) // After 2-3 values
sub2 := source.Subscribe(ro.OnNext(func(value int64) {
fmt.Printf("Sub2: %d\n", value)
}))
time.Sleep(100 * time.Millisecond)
sub1.Unsubscribe()
sub2.Unsubscribe()
//
// Sub1: 0
// Sub1: 1
// Sub2: 2 (sub2 starts here, no replay)
// Sub1: 2
With expensive operation caching
// Simulate expensive API call
expensiveAPI := func() Observable[string] {
return ro.Defer(func() Observable[string] {
fmt.Println("๐ Expensive API call...")
time.Sleep(100 * time.Millisecond)
return ro.Just("result1", "result2", "result3")
})
}
// Cache the last 2 results
cachedAPI := ro.Pipe(
expensiveAPI(),
ro.ShareReplay[string](2),
)
// First subscriber triggers API call
sub1 := cachedAPI.Subscribe(ro.PrintObserver[string]())
time.Sleep(200 * time.Millisecond)
sub1.Unsubscribe()
// Subsequent subscribers get cached results (no new API call)
sub2 := cachedAPI.Subscribe(ro.PrintObserver[string]())
time.Sleep(50 * time.Millisecond)
sub2.Unsubscribe()
sub3 := cachedAPI.Subscribe(ro.PrintObserver[string]())
time.Sleep(50 * time.Millisecond)
sub3.Unsubscribe()
//
// ๐ Expensive API call...
// Sub1: result1, result2, result3
// Sub2: result2, result3 (from cache)
// Sub3: result2, result3 (from cache)
With error handling
source := ro.Pipe(
ro.Defer(func() Observable[int] {
fmt.Println("Source execution...")
return ro.Pipe(
ro.Just(1, 2, 3),
ro.MapErr(func(i int) (int, error) {
if i == 3 {
return 0, errors.New("api failure")
}
return i, nil
}),
)
}),
ro.ShareReplay[int](2),
)
sub1 := source.Subscribe(ro.NewObserver(
func(value int) {
fmt.Printf("Sub1: %d\n", value)
},
func(err error) {
fmt.Printf("Sub1 Error: %v\n", err)
},
func() {
fmt.Println("Sub1 completion")
},
))
time.Sleep(100 * time.Millisecond)
// Second subscriber gets replayed values before error
sub2 := source.Subscribe(ro.NewObserver(
func(value int) {
fmt.Printf("Sub2: %d\n", value)
},
func(err error) {
fmt.Printf("Sub2 Error: %v\n", err)
},
func() {
fmt.Println("Sub2 completion")
},
))
time.Sleep(100 * time.Millisecond)
sub1.Unsubscribe()
sub2.Unsubscribe()
//
// Source execution...
// Sub1: 1
// Sub1: 2
// Sub2: 1 (replayed)
// Sub2: 2 (replayed)
// Sub1 Error: api failure
// Sub2 Error: api failure
With hot observable and late subscribers
// Create a hot observable with replay
hotSource := ro.Pipe(
ro.Interval(100 * time.Millisecond),
ro.Take[int64](8),
ro.ShareReplay[int64](3), // Cache last 3 values
)
// Simulate subscribers joining at different times
go func() {
time.Sleep(0 * time.Millisecond)
sub := hotSource.Subscribe(ro.OnNext(func(value int64) {
fmt.Printf("Early sub: %d\n", value)
}))
time.Sleep(500 * time.Millisecond)
sub.Unsubscribe()
}()
go func() {
time.Sleep(250 * time.Millisecond)
sub := hotSource.Subscribe(ro.OnNext(func(value int64) {
fmt.Printf("Middle sub: %d\n", value)
}))
time.Sleep(400 * time.Millisecond)
sub.Unsubscribe()
}()
go func() {
time.Sleep(500 * time.Millisecond)
sub := hotSource.Subscribe(ro.OnNext(func(value int64) {
fmt.Printf("Late sub: %d\n", value)
}))
time.Sleep(400 * time.Millisecond)
sub.Unsubscribe()
}()
time.Sleep(1200 * time.Millisecond)
// Shows replay behavior for late subscribers
With large buffer for complete history
source := ro.Pipe(
ro.Just("apple", "banana", "cherry", "date", "elderberry"),
ro.ShareReplay[string](10), // Large enough for all values
)
sub1 := source.Subscribe(ro.PrintObserver[string]())
time.Sleep(50 * time.Millisecond)
sub1.Unsubscribe()
// Subscribers joining later get complete history
sub2 := source.Subscribe(ro.PrintObserver[string]())
time.Sleep(50 * time.Millisecond)
sub2.Unsubscribe()
sub3 := source.Subscribe(ro.PrintObserver[string]())
time.Sleep(50 * time.Millisecond)
sub3.Unsubscribe()
// All subscribers get the complete sequence
// due to large replay buffer
Similar:
Prototype:
func ShareReplay[T any](bufferSize int)
ShareReplayWithConfig
Creates a shared Observable with replay functionality and custom configuration. Provides control over buffer size and reset behavior.
config := ShareReplayConfig{
ResetOnRefCountZero: true, // Reset when no subscribers left
}
source := ro.Pipe(
ro.Just("first", "second", "third"),
ro.ShareReplayWithConfig[string](2, config), // Cache last 2 values
)
sub1 := source.Subscribe(ro.PrintObserver[string]())
time.Sleep(50 * time.Millisecond)
sub1.Unsubscribe()
// Second subscriber gets replayed values
sub2 := source.Subscribe(ro.PrintObserver[string]())
time.Sleep(50 * time.Millisecond)
sub2.Unsubscribe()
//
// Sub1: first, second, third
// Sub2: second, third (replayed from cache)
With ResetOnRefCountZero disabled
config := ShareReplayConfig{
ResetOnRefCountZero: false, // Keep cache even when no subscribers
}
source := ro.Pipe(
ro.Defer(func() ro.Observable[int] {
fmt.Println("๐ Creating new source...")
return ro.Just(1, 2, 3, 4, 5)
}),
ro.ShareReplayWithConfig[int](3, config), // Cache last 3 values
)
// First subscriber triggers source creation
sub1 := source.Subscribe(ro.PrintObserver[int]())
time.Sleep(50 * time.Millisecond)
sub1.Unsubscribe()
// Wait a bit, then subscribe again
time.Sleep(100 * time.Millisecond)
fmt.Println("Subscribing again...")
// Second subscriber gets cached values (no new source creation)
sub2 := source.Subscribe(ro.PrintObserver[int]())
time.Sleep(50 * time.Millisecond)
sub2.Unsubscribe()
//
// ๐ Creating new source...
// Sub1: 1, 2, 3, 4, 5
// Subscribing again...
// Sub2: 3, 4, 5 (replayed from persistent cache)
With ResetOnRefCountZero enabled
config := ShareReplayConfig{
ResetOnRefCountZero: true, // Reset cache when no subscribers
}
source := ro.Pipe(
ro.Defer(func() ro.Observable[string] {
fmt.Println("๐ New source execution...")
return ro.Just("hello", "world", "again")
}),
ro.ShareReplayWithConfig[string](2, config),
)
// First subscriber
sub1 := source.Subscribe(ro.PrintObserver[string]())
time.Sleep(50 * time.Millisecond)
sub1.Unsubscribe()
// Cache resets after all unsubscribe
time.Sleep(50 * time.Millisecond)
// Second subscriber triggers new source creation
sub2 := source.Subscribe(ro.PrintObserver[string]())
time.Sleep(50 * time.Millisecond)
sub2.Unsubscribe()
//
// ๐ New source execution...
// Sub1: hello, world, again
// ๐ New source execution... (cache was reset)
// Sub2: hello, world, again
With large buffer and persistent cache
config := ShareReplayConfig{
ResetOnRefCountZero: false, // Keep cache forever
}
source := ro.Pipe(
ro.Just("data1", "data2", "data3", "data4", "data5"),
ro.ShareReplayWithConfig[string](10, config), // Large buffer
)
sub1 := source.Subscribe(ro.PrintObserver[string]())
time.Sleep(50 * time.Millisecond)
sub1.Unsubscribe()
// Multiple subscribers can get complete history
for i := 2; i <= 4; i++ {
sub := source.Subscribe(ro.PrintObserver[string]())
time.Sleep(50 * time.Millisecond)
sub.Unsubscribe()
}
// All subscribers get the complete sequence
// due to persistent large cache
With expensive operation and persistent cache
config := ShareReplayConfig{
ResetOnRefCountZero: false, // Cache expensive results
}
expensiveOperation := func() ro.Observable[string] {
return ro.Defer(func() ro.Observable[string] {
fmt.Println("💸 Expensive database query...")
time.Sleep(200 * time.Millisecond)
return ro.Just("user1", "user2", "user3")
})
}
// Cache the expensive operation results
cachedUsers := ro.Pipe(
expensiveOperation(),
ro.ShareReplayWithConfig[string](5, config),
)
// Multiple subscribers over time without re-querying
for i := 1; i <= 3; i++ {
time.Sleep(300 * time.Millisecond)
fmt.Printf("Query %d:\n", i)
sub := cachedUsers.Subscribe(ro.PrintObserver[string]())
time.Sleep(50 * time.Millisecond)
sub.Unsubscribe()
}
//
// 💸 Expensive database query...
// Query 1: user1, user2, user3
// Query 2: user1, user2, user3 (from cache)
// Query 3: user1, user2, user3 (from cache)
With real-time data stream
config := ShareReplayConfig{
ResetOnRefCountZero: false, // Keep latest data available
}
// Simulate real-time price updates
priceStream := ro.Pipe(
ro.Interval(1 * time.Second),
ro.Map(func(_ int64) float64 {
return 100 + rand.Float64()*10 // Price between 100-110
}),
ro.ShareReplayWithConfig[float64](1, config), // Keep only latest price
)
// Multiple price checkers
for i := 1; i <= 3; i++ {
go func(checkerID int) {
time.Sleep(time.Duration(checkerID) * 500 * time.Millisecond)
sub := priceStream.Subscribe(ro.OnNext(func(price float64) {
fmt.Printf("Checker %d: Price $%.2f\n", checkerID, price)
}))
time.Sleep(2 * time.Second)
sub.Unsubscribe()
}(i)
}
time.Sleep(4 * time.Second)
// Each checker gets the latest available price
// when they subscribe
With error handling and persistent cache
config := ShareReplayConfig{
ResetOnRefCountZero: false, // Keep error state too
}
source := ro.Pipe(
ro.Defer(func() ro.Observable[int] {
fmt.Println("๐ Attempting operation...")
if rand.Intn(3) == 0 {
return ro.Throw[int](errors.New("random failure"))
}
return ro.Just(42, 84, 126)
}),
ro.ShareReplayWithConfig[int](3, config),
)
// Multiple attempts may get cached error or success
for i := 1; i <= 3; i++ {
time.Sleep(200 * time.Millisecond)
fmt.Printf("Attempt %d:\n", i)
sub := source.Subscribe(ro.NewObserver(
func(value int) {
fmt.Printf(" Success: %d\n", value)
},
func(err error) {
fmt.Printf(" Error: %v\n", err)
},
func() {
fmt.Println(" Completed")
},
))
time.Sleep(50 * time.Millisecond)
sub.Unsubscribe()
}
// If first attempt fails, subsequent attempts get cached error
// If first succeeds, subsequent attempts get cached success
With buffer management
config := ShareReplayConfig{
ResetOnRefCountZero: true,
}
// Stream with varying data rates
dataStream := ro.Pipe(
ro.Interval(100 * time.Millisecond),
ro.Take[int64](20),
ro.ShareReplayWithConfig[int64](5, config), // Keep last 5 values
)
// Simulate periodic subscribers
for i := 0; i < 4; i++ {
go func(batch int) {
time.Sleep(time.Duration(batch) * 300 * time.Millisecond)
fmt.Printf("Batch %d subscribing:\n", batch+1)
sub := dataStream.Subscribe(ro.OnNext(func(value int64) {
fmt.Printf(" B%d: %d\n", batch+1, value)
}))
time.Sleep(400 * time.Millisecond)
sub.Unsubscribe()
fmt.Printf("Batch %d done\n", batch+1)
}(i)
}
time.Sleep(1500 * time.Millisecond)
// Shows how each batch gets replayed last 5 values
// from when they subscribed
Similar:
Prototype:
func ShareReplayWithConfig[T any](bufferSize int, config ShareReplayConfig)