This documentation is still new and evolving. If you spot any mistakes, unclear explanations, or missing details, please open an issue.
Your feedback helps us improve!
Filtering operators
This page lists all filtering operators available in the core package of ro.
Filter
Emits only those items from an Observable that pass a predicate test.
obs := ro.Pipe(
ro.Just(1, 2, 3, 4, 5),
ro.Filter(func(i int) bool {
return i%2 == 0
}),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Next: 2
// Next: 4
// Completed
With context
obs := ro.Pipe(
ro.Just(1, 2, 3, 4, 5),
ro.FilterWithContext(func(ctx context.Context, i int) (context.Context, bool) {
return ctx, i%2 == 0
}),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Next: 2
// Next: 4
// Completed
With index
obs := ro.Pipe(
ro.Just(1, 2, 3, 4, 5),
ro.FilterI(func(i int, index int64) bool {
return index > 1 // Skip first two elements
}),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Next: 3
// Next: 4
// Next: 5
// Completed
With index and context
obs := ro.Pipe(
ro.Just(1, 2, 3, 4, 5),
ro.FilterIWithContext(func(ctx context.Context, i int, index int64) (context.Context, bool) {
return ctx, index > 1 && i%2 == 0
}),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Next: 4
// Completed
Prototypes:
func Filter[T any](predicate func(item T) bool)
func FilterWithContext[T any](predicate func(ctx context.Context, item T) (context.Context, bool))
func FilterI[T any](predicate func(item T, index int64) bool)
func FilterIWithContext[T any](predicate func(ctx context.Context, item T, index int64) (context.Context, bool))
Take
Emits only the first n items emitted by an Observable. If the count is greater than the number of items emitted by the source Observable, Take will emit all items. If the count is zero, Take will not emit any items.
obs := ro.Pipe(
ro.Just(1, 2, 3, 4, 5),
ro.Take[int](3),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Next: 1
// Next: 2
// Next: 3
// Completed
Edge case: Taking more items than available
obs := ro.Pipe(
ro.Just(1, 2, 3),
ro.Take[int](5),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Next: 1
// Next: 2
// Next: 3
// Completed
Edge case: Taking zero items
obs := ro.Pipe(
ro.Just(1, 2, 3, 4, 5),
ro.Take[int](0),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Completed
Prototype:
func Take[T any](count int64)
TakeWhile
Emits items emitted by an Observable so long as a specified condition is true, then completes.
obs := ro.Pipe(
ro.Just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10),
ro.TakeWhile(func(i int) bool {
return i < 5
}),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Next: 1
// Next: 2
// Next: 3
// Next: 4
// Completed
With context
obs := ro.Pipe(
ro.Just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10),
ro.TakeWhileWithContext(func(ctx context.Context, i int) (context.Context, bool) {
return ctx, i < 5
}),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Next: 1
// Next: 2
// Next: 3
// Next: 4
// Completed
With index
obs := ro.Pipe(
ro.Just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10),
ro.TakeWhileI(func(i int, index int64) bool {
return index < 4 // Take first 4 elements (index 0, 1, 2, 3)
}),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Next: 1
// Next: 2
// Next: 3
// Next: 4
// Completed
With index and context
obs := ro.Pipe(
ro.Just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10),
ro.TakeWhileIWithContext(func(ctx context.Context, i int, index int64) (context.Context, bool) {
return ctx, i < 5 && index < 4
}),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Next: 1
// Next: 2
// Next: 3
// Next: 4
// Completed
Edge case: Never completes early if condition always true
// In practice, this would complete when the source completes
obs := ro.Pipe(
ro.Just(1, 2, 3, 4, 5),
ro.TakeWhile(func(i int) bool {
return true // Always true
}),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Next: 1
// Next: 2
// Next: 3
// Next: 4
// Next: 5
// Completed
Edge case: Completes immediately if condition always false
obs := ro.Pipe(
ro.Just(1, 2, 3, 4, 5),
ro.TakeWhile(func(i int) bool {
return false // Always false
}),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Completed
Prototypes:
func TakeWhile[T any](predicate func(item T) bool)
func TakeWhileWithContext[T any](predicate func(ctx context.Context, item T) (context.Context, bool))
func TakeWhileI[T any](predicate func(item T, index int64) bool)
func TakeWhileIWithContext[T any](predicate func(ctx context.Context, item T, index int64) (context.Context, bool))
First
Emits only the first item (or the first item that satisfies a predicate) from an Observable sequence.
obs := ro.Pipe(
ro.Just(1, 2, 3, 4, 5),
ro.First[int](func(i int) bool {
return true // Match all items, returns first
}),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Next: 1
// Completed
First with predicate
obs := ro.Pipe(
ro.Just(1, 2, 3, 4, 5),
ro.First[int](func(i int) bool {
return i > 3 // Find first item greater than 3
}),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Next: 4
// Completed
With context
obs := ro.Pipe(
ro.Just(1, 2, 3, 4, 5),
ro.FirstWithContext(func(ctx context.Context, i int) (context.Context, bool) {
return ctx, i > 3
}),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Next: 4
// Completed
With index
obs := ro.Pipe(
ro.Just(1, 2, 3, 4, 5),
ro.FirstI(func(i int, index int64) bool {
return index > 2 // Find first item with index > 2
}),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Next: 4 (index 3)
// Completed
With index and context
obs := ro.Pipe(
ro.Just(1, 2, 3, 4, 5),
ro.FirstIWithContext(func(ctx context.Context, i int, index int64) (context.Context, bool) {
return ctx, i > 3 && index > 2
}),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Next: 4
// Completed
Edge case: No matching items
obs := ro.Pipe(
ro.Just(1, 2, 3, 4, 5),
ro.First[int](func(i int) bool {
return i > 10 // No items match
}),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// No Next values, completes without emitting
// Completed
Prototypes:
func First[T any](predicate func(item T) bool)
func FirstWithContext[T any](predicate func(ctx context.Context, item T) (context.Context, bool))
func FirstI[T any](predicate func(item T, index int64) bool)
func FirstIWithContext[T any](predicate func(ctx context.Context, item T, index int64) (context.Context, bool))
Last
Emits only the last item (or the last item that satisfies a predicate) from an Observable sequence.
obs := ro.Pipe(
ro.Just(1, 2, 3, 4, 5),
ro.Last[int](func(i int) bool {
return true // Match all items, returns last
}),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Next: 5
// Completed
Last with predicate
obs := ro.Pipe(
ro.Just(1, 2, 3, 4, 5),
ro.Last[int](func(i int) bool {
return i < 4 // Find last item less than 4
}),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Next: 3
// Completed
With context
obs := ro.Pipe(
ro.Just(1, 2, 3, 4, 5),
ro.LastWithContext(func(ctx context.Context, i int) (context.Context, bool) {
return ctx, i < 4
}),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Next: 3
// Completed
With index
obs := ro.Pipe(
ro.Just(1, 2, 3, 4, 5),
ro.LastI(func(i int, index int64) bool {
return index < 3 // Find last item with index < 3
}),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Next: 3 (index 2)
// Completed
With index and context
obs := ro.Pipe(
ro.Just(1, 2, 3, 4, 5),
ro.LastIWithContext(func(ctx context.Context, i int, index int64) (context.Context, bool) {
return ctx, i < 4 && index < 3
}),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Next: 3
// Completed
Edge case: No matching items
obs := ro.Pipe(
ro.Just(1, 2, 3, 4, 5),
ro.Last[int](func(i int) bool {
return i > 10 // No items match
}),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// No Next values, completes without emitting
// Completed
Edge case: Single item
obs := ro.Pipe(
ro.Just(42),
ro.Last[int](func(i int) bool {
return i > 0
}),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Next: 42
// Completed
Prototypes:
func Last[T any](predicate func(item T) bool)
func LastWithContext[T any](predicate func(ctx context.Context, item T) (context.Context, bool))
func LastI[T any](predicate func(item T, index int64) bool)
func LastIWithContext[T any](predicate func(ctx context.Context, item T, index int64) (context.Context, bool))
Head
Emits only the first item emitted by an Observable. If the source Observable is empty, Head will emit an error.
obs := ro.Pipe(
ro.Just("first", "second", "third"),
ro.Head(),
)
sub := obs.Subscribe(ro.PrintObserver[string]())
defer sub.Unsubscribe()
// Next: first
// Completed
With empty observable
obs := ro.Pipe(
ro.Empty[string](),
ro.Head(),
)
sub := obs.Subscribe(ro.PrintObserver[string]())
defer sub.Unsubscribe()
// Error: head of empty observable
With single item
obs := ro.Pipe(
ro.Just("only one"),
ro.Head(),
)
sub := obs.Subscribe(ro.PrintObserver[string]())
defer sub.Unsubscribe()
// Next: only one
// Completed
With numbers
obs := ro.Pipe(
ro.Just(10, 20, 30, 40, 50),
ro.Head(),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Next: 10
// Completed
With time-based emissions
obs := ro.Pipe(
ro.Interval(100*time.Millisecond),
ro.Head(),
)
sub := obs.Subscribe(ro.PrintObserver[int64]())
time.Sleep(200 * time.Millisecond)
sub.Unsubscribe()
// Next: 0 (emitted after 100ms)
// Completed
With error in source
obs := ro.Pipe(
ro.Throw[string](errors.New("source error")),
ro.Head(),
)
sub := obs.Subscribe(ro.PrintObserver[string]())
defer sub.Unsubscribe()
// Error: source error
Prototype:
func Head[T any]()
Tail
Emits only the last item emitted by an Observable. If the source Observable is empty, Tail will emit an error.
obs := ro.Pipe(
ro.Just("first", "second", "third"),
ro.Tail(),
)
sub := obs.Subscribe(ro.PrintObserver[string]())
defer sub.Unsubscribe()
// Next: third
// Completed
With single item
obs := ro.Pipe(
ro.Just("only one"),
ro.Tail(),
)
sub := obs.Subscribe(ro.PrintObserver[string]())
defer sub.Unsubscribe()
// Next: only one
// Completed
With empty observable
obs := ro.Pipe(
ro.Empty[string](),
ro.Tail(),
)
sub := obs.Subscribe(ro.PrintObserver[string]())
defer sub.Unsubscribe()
// Error: tail of empty observable
With numbers
obs := ro.Pipe(
ro.Just(10, 20, 30, 40, 50),
ro.Tail(),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Next: 50
// Completed
With time-based emissions
obs := ro.Pipe(
ro.Interval(100*time.Millisecond),
ro.Take[int64](3),
ro.Tail(),
)
sub := obs.Subscribe(ro.PrintObserver[int64]())
time.Sleep(400 * time.Millisecond)
sub.Unsubscribe()
// Next: 2 (last value before completion)
// Completed
With error in source
obs := ro.Pipe(
ro.Throw[string](errors.New("source error")),
ro.Tail(),
)
sub := obs.Subscribe(ro.PrintObserver[string]())
defer sub.Unsubscribe()
// Error: source error
With large number of items
obs := ro.Pipe(
ro.Just(makeRange(1, 1000)...), // Emits 1 through 1000
ro.Tail(),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Next: 1000
// Completed
Prototype:
func Tail[T any]()
Distinct
Returns an observable sequence that contains only distinct elements.
obs := ro.Pipe(
ro.Just(1, 2, 2, 3, 1, 4, 3, 5),
ro.Distinct[int](),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Next: 1
// Next: 2
// Next: 3
// Next: 4
// Next: 5
// Completed
With strings
obs := ro.Pipe(
ro.Just("apple", "banana", "apple", "cherry", "banana"),
ro.Distinct[string](),
)
sub := obs.Subscribe(ro.PrintObserver[string]())
defer sub.Unsubscribe()
// Next: apple
// Next: banana
// Next: cherry
// Completed
With custom types
type Person struct {
ID int
Name string
}
obs := ro.Pipe(
ro.Just(
Person{1, "Alice"},
Person{2, "Bob"},
Person{1, "Alice"}, // Duplicate
Person{3, "Charlie"},
),
ro.Distinct[Person](),
)
sub := obs.Subscribe(ro.PrintObserver[Person]())
defer sub.Unsubscribe()
// Next: {1 Alice}
// Next: {2 Bob}
// Next: {3 Charlie}
// Completed
Empty sequence
obs := ro.Pipe(
ro.Empty[int](),
ro.Distinct[int](),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Completed (no values emitted)
Prototype:
func Distinct[T comparable]()
ElementAt
Emits only the item at the given zero-based index n from an Observable. If the source Observable completes before emitting an item at that index, ElementAt will emit an error.
obs := ro.Pipe(
ro.Just("apple", "banana", "cherry", "date"),
ro.ElementAt(2),
)
sub := obs.Subscribe(ro.PrintObserver[string]())
defer sub.Unsubscribe()
// Next: cherry
// Completed
With zero-based indexing
obs := ro.Pipe(
ro.Just("first", "second", "third"),
ro.ElementAt(0),
)
sub := obs.Subscribe(ro.PrintObserver[string]())
defer sub.Unsubscribe()
// Next: first
// Completed
With out of bounds index
obs := ro.Pipe(
ro.Just("a", "b", "c"),
ro.ElementAt(5),
)
sub := obs.Subscribe(ro.PrintObserver[string]())
defer sub.Unsubscribe()
// Error: element at index 5 not found
With numbers
obs := ro.Pipe(
ro.Just(10, 20, 30, 40, 50),
ro.ElementAt(3),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Next: 40
// Completed
With time-based observable
obs := ro.Pipe(
ro.Interval(100*time.Millisecond),
ro.ElementAt(5),
)
sub := obs.Subscribe(ro.PrintObserver[int64]())
time.Sleep(800 * time.Millisecond)
sub.Unsubscribe()
// Next: 5 (emitted after 600ms)
// Completed
Similar:
Prototype:
func ElementAt[T any](nth int)
IgnoreElements
Ignores all elements emitted by the source observable, only allowing completion or error notifications to pass through.
obs := ro.Pipe(
ro.Just(1, 2, 3, 4, 5),
ro.IgnoreElements[int](),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Completed (no values emitted)
With error handling
obs := ro.Pipe(
ro.Concat(
ro.Just(1, 2, 3),
ro.Throw[int](errors.New("something went wrong")),
),
ro.IgnoreElements[int](),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Error: something went wrong
For side effects only
// Use IgnoreElements when you only care about side effects
obs := ro.Pipe(
ro.Just(1, 2, 3, 4, 5),
ro.Tap(func(n int) {
fmt.Printf("Processing item: %d\n", n)
}),
ro.IgnoreElements[int](),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Processing item: 1
// Processing item: 2
// Processing item: 3
// Processing item: 4
// Processing item: 5
// Completed
With delayed completion
obs := ro.Pipe(
ro.Just(1, 2, 3),
ro.Delay(100*time.Millisecond),
ro.IgnoreElements[int](),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Completed after 100ms (no values emitted)
Prototype:
func IgnoreElements[T any]()
ElementAtOrDefault
Emits only the item at the given zero-based index n from an Observable. If the source Observable completes before emitting an item at that index, ElementAtOrDefault will emit a fallback value.
obs := ro.Pipe(
ro.Just("a", "b", "c"),
ro.ElementAtOrDefault(5, "fallback"),
)
sub := obs.Subscribe(ro.PrintObserver[string]())
defer sub.Unsubscribe()
// Next: fallback
// Completed
Within bounds (emits the nth item)
obs := ro.Pipe(
ro.Just("first", "second", "third"),
ro.ElementAtOrDefault(1, "fallback"),
)
sub := obs.Subscribe(ro.PrintObserver[string]())
defer sub.Unsubscribe()
// Next: second
// Completed
With zero-based indexing and fallback
obs := ro.Pipe(
ro.Just("apple"),
ro.ElementAtOrDefault(0, "no fruit"),
)
sub := obs.Subscribe(ro.PrintObserver[string]())
defer sub.Unsubscribe()
// Next: apple
// Completed
With numbers and fallback
obs := ro.Pipe(
ro.Just(1, 2, 3),
ro.ElementAtOrDefault(10, 999),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Next: 999
// Completed
With empty observable
obs := ro.Pipe(
ro.Empty[string](),
ro.ElementAtOrDefault(0, "default value"),
)
sub := obs.Subscribe(ro.PrintObserver[string]())
defer sub.Unsubscribe()
// Next: default value
// Completed
With negative index (will panic)
// This will panic because nth cannot be negative
defer func() {
if r := recover(); r != nil {
fmt.Println("Recovered from panic:", r)
}
}()
obs := ro.Pipe(
ro.Just("a", "b"),
ro.ElementAtOrDefault(-1, "fallback"), // This will panic
)
sub := obs.Subscribe(ro.PrintObserver[string]())
defer sub.Unsubscribe()
Prototype:
func ElementAtOrDefault[T any](nth int64, fallback T)
Skip
Bypasses a specified number of elements in an observable sequence and then returns the remaining elements.
obs := ro.Pipe(
ro.Just(1, 2, 3, 4, 5),
ro.Skip(2),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Next: 3
// Next: 4
// Next: 5
// Completed
Skip more than available
obs := ro.Pipe(
ro.Just(1, 2, 3),
ro.Skip(5),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Completed (no values emitted)
Skip zero
obs := ro.Pipe(
ro.Just(1, 2, 3, 4, 5),
ro.Skip(0),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Next: 1
// Next: 2
// Next: 3
// Next: 4
// Next: 5
// Completed
With infinite sequence
obs := ro.Pipe(
ro.Pipe(
ro.Interval(100*time.Millisecond),
ro.Take[int64](10),
),
ro.Skip(3),
)
sub := obs.Subscribe(ro.PrintObserver[int64]())
defer sub.Unsubscribe()
// Next: 3 (after 400ms)
// Next: 4 (after 500ms)
// Next: 5 (after 600ms)
// ...
// Next: 9 (after 1000ms)
// Completed
Prototype:
func Skip(count int64)
SkipWhile
Bypasses elements in an observable sequence as long as a specified condition is true, and then returns the remaining elements.
obs := ro.Pipe(
ro.Just(1, 2, 3, 4, 5),
ro.SkipWhile(func(i int) bool {
return i < 3
}),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Next: 3
// Next: 4
// Next: 5
// Completed
With context
obs := ro.Pipe(
ro.Just("apple", "banana", "cherry", "date"),
ro.SkipWhileWithContext(func(ctx context.Context, fruit string) bool {
return strings.HasPrefix(fruit, "a") || strings.HasPrefix(fruit, "b")
}),
)
sub := obs.Subscribe(ro.PrintObserver[string]())
defer sub.Unsubscribe()
// Next: cherry
// Next: date
// Completed
With index
obs := ro.Pipe(
ro.Just(10, 20, 30, 40, 50),
ro.SkipWhileI(func(item int, index int64) bool {
return index < 2 // Skip first 2 items regardless of value
}),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Next: 30
// Next: 40
// Next: 50
// Completed
With index and context
obs := ro.Pipe(
ro.Just("a", "b", "c", "d", "e"),
ro.SkipWhileIWithContext(func(ctx context.Context, item string, index int64) bool {
return index < 3 && item != "d"
}),
)
sub := obs.Subscribe(ro.PrintObserver[string]())
defer sub.Unsubscribe()
// Next: d
// Next: e
// Completed
When condition never becomes false
obs := ro.Pipe(
ro.Just(1, 2, 3, 4, 5),
ro.SkipWhile(func(i int) bool {
return i > 0 // Always true
}),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Completed (no values emitted)
Prototypes:
func SkipWhile[T any](predicate func(item T) bool)
func SkipWhileWithContext[T any](predicate func(ctx context.Context, item T) bool)
func SkipWhileI[T any](predicate func(item T, index int64) bool)
func SkipWhileIWithContext[T any](predicate func(ctx context.Context, item T, index int64) bool)
TakeLast
Returns a specified number of contiguous elements from the end of an observable sequence.
obs := ro.Pipe(
ro.Just(1, 2, 3, 4, 5),
ro.TakeLast(2),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Next: 4
// Next: 5
// Completed
Take all elements
obs := ro.Pipe(
ro.Just(1, 2, 3),
ro.TakeLast(5),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Next: 1
// Next: 2
// Next: 3
// Completed
TakeLast with zero
obs := ro.Pipe(
ro.Just(1, 2, 3, 4, 5),
ro.TakeLast(0),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Completed (no values emitted)
With strings
obs := ro.Pipe(
ro.Just("apple", "banana", "cherry", "date", "elderberry"),
ro.TakeLast(3),
)
sub := obs.Subscribe(ro.PrintObserver[string]())
defer sub.Unsubscribe()
// Next: cherry
// Next: date
// Next: elderberry
// Completed
Memory usage note
// TakeLast must buffer all elements to determine the last N
// Use with caution on very long or infinite sequences
obs := ro.Pipe(
ro.Pipe(
ro.Interval(100*time.Millisecond),
ro.Take[int64](10),
),
ro.TakeLast(3),
)
sub := obs.Subscribe(ro.PrintObserver[int64]())
defer sub.Unsubscribe()
// Next: 7
// Next: 8
// Next: 9
// Completed
Prototype:
func TakeLast(count int64)