๐ฏ Subject
A Subject is a special type that extends both Observable
and Observer
, acting as a bridge or proxy that can multicast values to multiple observers. Subjects are the key to implementing hot observables and enabling event broadcasting patterns.
What is a Subject?โ
A Subject
is:
- An Observable: Can be subscribed to like any other
Observable
- An Observer: Can receive values through
Next
,Error
, andComplete
methods - A multicaster: Can broadcast values to multiple subscribers
- Hot by nature: Values are shared among all subscribers
The Subject interface combines both Observable and Observer:
type Subject[T any] interface {
Observable[T]
// Subscribe(destination Observer[T]) Subscription
// SubscribeWithContext(ctx context.Context, destination Observer[T]) Subscription
Observer[T]
// Next(value T)
// NextWithContext(ctx context.Context, value T)
// Error(err error)
// ErrorWithContext(ctx context.Context, err error)
// Complete()
// CompleteWithContext(ctx context.Context)
// IsClosed() bool
// HasThrown() bool
// IsCompleted() bool
HasObserver() bool
CountObservers() int
}
Subject Typesโ
samber/ro
provides four types of subjects, each with different behaviors:
1. PublishSubjectโ
PublishSubject emits only the values that were sent after the subscription. This is the most basic subject type, perfect for simple event broadcasting.
// Create a PublishSubject
subject := ro.NewPublishSubject[int]()
// Subscriber 1 - gets all future values
subject.Subscribe(ro.OnNext(func(n int) {
fmt.Println("Subscriber 1:", n)
}))
// Emit values
subject.Next(1)
subject.Next(2)
// Subscriber 2 - gets only values sent after subscription
subject.Subscribe(ro.OnNext(func(n int) {
fmt.Println("Subscriber 2:", n)
}))
subject.Next(3)
subject.Next(4)
subject.Complete()
// Output:
// Subscriber 1: 1
// Subscriber 1: 2
// Subscriber 1: 3
// Subscriber 2: 3
// Subscriber 1: 4
// Subscriber 2: 4
// Subscriber 1: Completed
// Subscriber 2: Completed
Use cases for PublishSubject:
- Event broadcasting systems
- Real-time notifications
- Chat applications
- Live data streams where only new values matter
2. BehaviorSubjectโ
BehaviorSubject emits the last value and all subsequent values to new subscribers. It requires an initial value and is ideal for state management scenarios.
// Create a BehaviorSubject with initial value
subject := ro.NewBehaviorSubject(42)
// Subscriber 1 - immediately gets the current value
subject.Subscribe(ro.NewObserver(
func(value int) {
fmt.Println("Subscriber 1 received:", value)
},
func(err error) {
fmt.Println("Subscriber 1 error:", err)
},
func() {
fmt.Println("Subscriber 1 completed")
},
))
// Emit new values
subject.Next(100)
subject.Next(200)
// Subscriber 2 - immediately gets the latest value
subject.Subscribe(ro.NewObserver(
func(value int) {
fmt.Println("Subscriber 2 received:", value)
},
func(err error) {
fmt.Println("Subscriber 2 error:", err)
},
func() {
fmt.Println("Subscriber 2 completed")
},
))
subject.Next(300)
subject.Complete()
// Output:
// Subscriber 1 received: 42
// Subscriber 1 received: 100
// Subscriber 1 received: 200
// Subscriber 2 received: 200
// Subscriber 1 received: 300
// Subscriber 2 received: 300
// Subscriber 1 completed
// Subscriber 2 completed
Use cases for BehaviorSubject:
- State management
- Configuration settings
- Current user information
- Any scenario where new subscribers need the current state
3. ReplaySubjectโ
ReplaySubject emits a specified number of previous values and all subsequent values to new subscribers. Use it when you need to provide context or history to new subscribers.
// Create a ReplaySubject with buffer size of 3
subject := ro.NewReplaySubject[string](3)
// Emit values
subject.Next("first")
subject.Next("second")
subject.Next("third")
subject.Next("fourth")
// Subscriber 1 - gets last 3 values
subject.Subscribe(ro.OnNext(func(s string) {
fmt.Println("Subscriber 1:", s)
}))
subject.Next("fifth")
// Subscriber 2 - gets last 3 values
subject.Subscribe(ro.OnNext(func(s string) {
fmt.Println("Subscriber 2:", s)
}))
subject.Complete()
// Output:
// Subscriber 1: second
// Subscriber 1: third
// Subscriber 1: fourth
// Subscriber 1: fifth
// Subscriber 2: third
// Subscriber 2: fourth
// Subscriber 2: fifth
// Subscriber 1: Completed
// Subscriber 2: Completed
Use cases for ReplaySubject:
- Chat history
- Stock price updates
- Game replay systems
- Notification history
- Any scenario where new users need context
4. AsyncSubjectโ
AsyncSubject emits only the last value and only when the sequence completes. This is perfect for async operations that produce a single final result.
// Create an AsyncSubject
subject := ro.NewAsyncSubject[float64]()
// Subscriber 1 - will receive nothing until completion
subject.Subscribe(ro.NewObserver(
func(value float64) {
fmt.Println("Subscriber 1 received:", value)
},
func(err error) {
fmt.Println("Subscriber 1 error:", err)
},
func() {
fmt.Println("Subscriber 1 completed")
},
))
// Emit multiple values
subject.Next(1.0)
subject.Next(2.0)
subject.Next(3.0)
// Subscriber 2 - will also receive only the final value
subject.Subscribe(ro.NewObserver(
func(value float64) {
fmt.Println("Subscriber 2 received:", value)
},
func(err error) {
fmt.Println("Subscriber 2 error:", err)
},
func() {
fmt.Println("Subscriber 2 completed")
},
))
// Complete to trigger emission
subject.Complete()
// Output:
// Subscriber 1 received: 3.0
// Subscriber 1 completed
// Subscriber 2 received: 3.0
// Subscriber 2 completed
Use cases for AsyncSubject:
- Asynchronous operations that return a single result
- HTTP requests
- Database queries
- File operations
- Any computation that produces one final result
Subject Lifecycle Managementโ
Checking Subject Stateโ
Monitor subject activity to debug issues or implement conditional logic based on subscriber presence.
subject := ro.NewPublishSubject[int]()
// Check if there are observers
fmt.Println("Has observers:", subject.HasObserver()) // false
fmt.Println("Observer count:", subject.CountObservers()) // 0
// Add subscribers
subject.Subscribe(ro.OnNext(func(n int) { fmt.Println(n) }))
// Check state again
fmt.Println("Has observers:", subject.HasObserver()) // true
fmt.Println("Observer count:", subject.CountObservers()) // 1
Unsubscribing from Subjectsโ
Manage individual subscriptions to control which observers receive values. This is useful for fine-grained resource management.
subject := ro.NewPublishSubject[string]()
// Subscribe and keep the subscription
subscription1 := subject.Subscribe(ro.OnNext(func(s string) {
fmt.Println("Subscriber 1:", s)
}))
subscription2 := subject.Subscribe(ro.OnNext(func(s string) {
fmt.Println("Subscriber 2:", s)
}))
subject.Next("hello")
subject.Next("world")
// Unsubscribe one observer
subscription1.Unsubscribe()
subject.Next("again")
// Output:
// Subscriber 1: hello
// Subscriber 2: hello
// Subscriber 1: world
// Subscriber 2: world
// Subscriber 2: again
Subject vs Observableโ
// Observable (cold) - each subscription gets independent values
observable := ro.Just(1, 2, 3)
// first Subscription
observable.Subscribe(ro.OnNext(func(n int) {
fmt.Println("Observable 1:", n)
}))
// second Subscription
observable.Subscribe(ro.OnNext(func(n int) {
fmt.Println("Observable 2:", n)
}))
// Output:
// Observable 1: 1
// Observable 1: 2
// Observable 1: 3
// Observable 2: 1
// Observable 2: 2
// Observable 2: 3
// Subject (hot) - subscriptions share the same values concurrently
subject := ro.NewPublishSubject[int]()
subject.Subscribe(ro.OnNext(func(n int) {
fmt.Println("Subject 1:", n)
}))
subject.Subscribe(ro.OnNext(func(n int) {
fmt.Println("Subject 2:", n)
}))
subject.Next(1)
subject.Next(2)
subject.Next(3)
// Output:
// Subject 1: 1
// Subject 2: 1
// Subject 1: 2
// Subject 2: 2
// Subject 1: 3
// Subject 2: 3
Advanced Subject Patternsโ
State Management with BehaviorSubjectโ
// Simple state management system
type AppState struct {
User string
Counter int
}
// Create state subject
stateSubject := ro.NewBehaviorSubject(AppState{
User: "guest",
Counter: 0,
})
// Component that listens to state changes
stateSubject.Subscribe(ro.OnNext(func(state AppState) {
fmt.Printf("UI Update: User=%s, Counter=%d\n", state.User, state.Counter)
}))
// Update state
updateState := func(user string, counter int) {
current := stateSubject.LastValue() // Requires implementation
stateSubject.Next(AppState{
User: user,
Counter: counter,
})
}
// Simulate state changes
updateState("alice", 1)
updateState("alice", 2)
updateState("bob", 1)
// Output:
// UI Update: User=guest, Counter=0
// UI Update: User=alice, Counter=1
// UI Update: User=alice, Counter=2
// UI Update: User=bob, Counter=1
Event Bus with PublishSubjectโ
// Simple event bus
type EventBus struct {
events map[string]ro.Subject[string]
mutex sync.Mutex
}
func NewEventBus() *EventBus {
return &EventBus{
events: make(map[string]ro.Subject[string]),
}
}
func (eb *EventBus) Publish(eventType, data string) {
eb.mutex.Lock()
defer eb.mutex.Unlock()
if subject, exists := eb.events[eventType]; exists {
subject.Next(data)
}
}
func (eb *EventBus) Subscribe(eventType string, observer ro.Observer[string]) ro.Subscription {
eb.mutex.Lock()
defer eb.mutex.Unlock()
if _, exists := eb.events[eventType]; !exists {
eb.events[eventType] = ro.NewPublishSubject[string]()
}
return eb.events[eventType].Subscribe(observer)
}
// Usage
eventBus := NewEventBus()
// Subscribe to events
userSub := eventBus.Subscribe("user.login", ro.OnNext(func(data string) {
fmt.Println("User logged in:", data)
}))
orderSub := eventBus.Subscribe("order.created", ro.OnNext(func(data string) {
fmt.Println("Order created:", data)
}))
// Publish events
eventBus.Publish("user.login", "alice")
eventBus.Publish("order.created", "order-123")
eventBus.Publish("user.login", "bob")
// Output:
// User logged in: alice
// Order created: order-123
// User logged in: bob
Chat System with ReplaySubjectโ
// Chat room with history
type ChatRoom struct {
messages ro.Subject[string]
users map[string]ro.Subject[string]
mutex sync.Mutex
}
func NewChatRoom() *ChatRoom {
return &ChatRoom{
messages: ro.NewReplaySubject[string](100), // Keep last 100 messages
users: make(map[string]ro.Subject[string]),
}
}
func (cr *ChatRoom) Join(username string) ro.Subject[string] {
cr.mutex.Lock()
defer cr.mutex.Unlock()
if _, exists := cr.users[username]; !exists {
cr.users[username] = ro.NewPublishSubject[string]()
cr.messages.Next(fmt.Sprintf("%s joined the chat", username))
}
return cr.users[username]
}
func (cr *ChatRoom) SendMessage(username, message string) {
fullMessage := fmt.Sprintf("%s: %s", username, message)
cr.messages.Next(fullMessage)
}
func (cr *ChatRoom) GetMessages() ro.Observable[string] {
return cr.messages
}
// Usage
chatRoom := NewChatRoom()
// User joins and gets message history
user1Messages := chatRoom.Join("alice")
user1Messages.Subscribe(ro.OnNext(func(msg string) {
fmt.Println("Alice sees:", msg)
}))
// Send some messages
chatRoom.SendMessage("alice", "Hello!")
chatRoom.SendMessage("alice", "Is anyone there?")
// Another user joins later and gets history
user2Messages := chatRoom.Join("bob")
user2Messages.Subscribe(ro.OnNext(func(msg string) {
fmt.Println("Bob sees:", msg)
}))
chatRoom.SendMessage("bob", "Hi Alice!")
Subject Best Practicesโ
1. Choose the Right Subject Typeโ
// Good: Use appropriate subject for the use case
currentState := ro.NewBehaviorSubject(initialState) // For current state
eventStream := ro.NewPublishSubject[Event]() // For new events only
messageHistory := ro.NewReplaySubject[Message](1000) // For history/caching
finalResult := ro.NewAsyncSubject[Result]() // For single async result
2. Manage Subject Lifecycleโ
// Good: Clean up subjects when done
func processEvents() {
subject := ro.NewPublishSubject[Event]()
defer subject.Complete()
subscription := subject.Subscribe(observer)
defer subscription.Unsubscribe()
// Process events...
}
// Risky: Subject might outlive its usefulness
var globalSubject ro.Subject[Event] // Could leak memory
3. Handle Errors Gracefullyโ
// Good: Handle errors in subject streams
subject := ro.NewPublishSubject[Data]()
subject.Subscribe(ro.NewObserver(
func(data Data) { /* process data */ },
func(err error) {
log.Printf("Subject error: %v", err)
// Implement recovery logic
},
func() { /* handle completion */ },
))
// Don't let errors go unhandled
subject.Error(fmt.Errorf("processing failed"))
4. Avoid Memory Leaksโ
// Good: Use bounded replay subjects
history := ro.NewReplaySubject[Event](1_000) // Reasonable buffer size
// Risky: Unbounded buffer could exhaust memory
unbounded := ro.NewReplaySubject[Event](1_000_000) // Too large
Subjects provide a powerful, reactive way to implement event-driven systems with built-in multicasting, lifecycle management, and composition with other reactive operators. They are essential for building complex, real-time applications in Go.