Skip to main content
Help improve this documentation

This documentation is still new and evolving. If you spot any mistakes, unclear explanations, or missing details, please open an issue.

Your feedback helps us improve!

Filtering operatorsโ€‹

This page lists all filtering operations, available in the core package of ro.

  • Emits only those items from an Observable that pass a predicate test.

    obs := ro.Pipe(
    ro.Just(1, 2, 3, 4, 5),
    ro.Filter(func(i int) bool {
    return i%2 == 0
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Next: 2
    // Next: 4
    // Completed

    With context

    obs := ro.Pipe(
    ro.Just(1, 2, 3, 4, 5),
    ro.FilterWithContext(func(ctx context.Context, i int) (context.Context, bool) {
    return ctx, i%2 == 0
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Next: 2
    // Next: 4
    // Completed

    With index

    obs := ro.Pipe(
    ro.Just(1, 2, 3, 4, 5),
    ro.FilterI(func(i int, index int64) bool {
    return index > 1 // Skip first two elements
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Next: 3
    // Next: 4
    // Next: 5
    // Completed

    With index and context

    obs := ro.Pipe(
    ro.Just(1, 2, 3, 4, 5),
    ro.FilterIWithContext(func(ctx context.Context, i int, index int64) (context.Context, bool) {
    return ctx, index > 1 && i%2 == 0
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Next: 4
    // Completed
    Prototypes:
    func Filter[T any](predicate func(item T) bool)
    func FilterWithContext[T any](predicate func(ctx context.Context, item T) (context.Context, bool))
    func FilterI[T any](predicate func(item T, index int64) bool)
    func FilterIWithContext[T any](predicate func(ctx context.Context, item T, index int64) (context.Context, bool))
  • Emits only the first n items emitted by an Observable. If the count is greater than the number of items emitted by the source Observable, Take will emit all items. If the count is zero, Take will not emit any items.

    obs := ro.Pipe(
    ro.Just(1, 2, 3, 4, 5),
    ro.Take[int](3),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Next: 1
    // Next: 2
    // Next: 3
    // Completed

    Edge case: Taking more items than available

    obs := ro.Pipe(
    ro.Just(1, 2, 3),
    ro.Take[int](5),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Next: 1
    // Next: 2
    // Next: 3
    // Completed

    Edge case: Taking zero items

    obs := ro.Pipe(
    ro.Just(1, 2, 3, 4, 5),
    ro.Take[int](0),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Completed
    Prototype:
    func Take[T any](count int64)
  • Emits items emitted by an Observable so long as a specified condition is true, then completes.

    obs := ro.Pipe(
    ro.Just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10),
    ro.TakeWhile(func(i int) bool {
    return i < 5
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Next: 1
    // Next: 2
    // Next: 3
    // Next: 4
    // Completed

    With context

    obs := ro.Pipe(
    ro.Just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10),
    ro.TakeWhileWithContext(func(ctx context.Context, i int) (context.Context, bool) {
    return ctx, i < 5
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Next: 1
    // Next: 2
    // Next: 3
    // Next: 4
    // Completed

    With index

    obs := ro.Pipe(
    ro.Just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10),
    ro.TakeWhileI(func(i int, index int64) bool {
    return index < 3 // Take first 4 elements (index 0, 1, 2, 3)
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Next: 1
    // Next: 2
    // Next: 3
    // Next: 4
    // Completed

    With index and context

    obs := ro.Pipe(
    ro.Just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10),
    ro.TakeWhileIWithContext(func(ctx context.Context, i int, index int64) (context.Context, bool) {
    return ctx, i < 5 && index < 3
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Next: 1
    // Next: 2
    // Next: 3
    // Next: 4
    // Completed

    Edge case: Never completes if condition always true

    // In practice, this would complete when the source completes
    obs := ro.Pipe(
    ro.Just(1, 2, 3, 4, 5),
    ro.TakeWhile(func(i int) bool {
    return true // Always true
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Next: 1
    // Next: 2
    // Next: 3
    // Next: 4
    // Next: 5
    // Completed

    Edge case: Completes immediately if condition always false

    obs := ro.Pipe(
    ro.Just(1, 2, 3, 4, 5),
    ro.TakeWhile(func(i int) bool {
    return false // Always false
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Completed
    Prototypes:
    func TakeWhile[T any](predicate func(item T) bool)
    func TakeWhileWithContext[T any](predicate func(ctx context.Context, item T) (context.Context, bool))
    func TakeWhileI[T any](predicate func(item T, index int64) bool)
    func TakeWhileIWithContext[T any](predicate func(ctx context.Context, item T, index int64) (context.Context, bool))
  • Emits only the first item (or the first item that satisfies a predicate) from an Observable sequence.

    obs := ro.Pipe(
    ro.Just(1, 2, 3, 4, 5),
    ro.First[int](func(i int) bool {
    return true // Match all items, returns first
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Next: 1
    // Completed

    First with predicate

    obs := ro.Pipe(
    ro.Just(1, 2, 3, 4, 5),
    ro.First[int](func(i int) bool {
    return i > 3 // Find first item greater than 3
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Next: 4
    // Completed

    With context

    obs := ro.Pipe(
    ro.Just(1, 2, 3, 4, 5),
    ro.FirstWithContext(func(ctx context.Context, i int) (context.Context, bool) {
    return ctx, i > 3
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Next: 4
    // Completed

    With index

    obs := ro.Pipe(
    ro.Just(1, 2, 3, 4, 5),
    ro.FirstI(func(i int, index int64) bool {
    return index > 2 // Find first item with index > 2
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Next: 4 (index 3)
    // Completed

    With index and context

    obs := ro.Pipe(
    ro.Just(1, 2, 3, 4, 5),
    ro.FirstIWithContext(func(ctx context.Context, i int, index int64) (context.Context, bool) {
    return ctx, i > 3 && index > 2
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Next: 4
    // Completed

    Edge case: No matching items

    obs := ro.Pipe(
    ro.Just(1, 2, 3, 4, 5),
    ro.First[int](func(i int) bool {
    return i > 10 // No items match
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // No Next values, completes without emitting
    // Completed
    Similar:
    Prototypes:
    func First[T any](predicate func(item T) bool)
    func FirstWithContext[T any](predicate func(ctx context.Context, item T) (context.Context, bool))
    func FirstI[T any](predicate func(item T, index int64) bool)
    func FirstIWithContext[T any](predicate func(ctx context.Context, item T, index int64) (context.Context, bool))
  • Emits only the last item (or the last item that satisfies a predicate) from an Observable sequence.

    obs := ro.Pipe(
    ro.Just(1, 2, 3, 4, 5),
    ro.Last[int](func(i int) bool {
    return true // Match all items, returns last
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Next: 5
    // Completed

    Last with predicate

    obs := ro.Pipe(
    ro.Just(1, 2, 3, 4, 5),
    ro.Last[int](func(i int) bool {
    return i < 4 // Find last item less than 4
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Next: 3
    // Completed

    With context

    obs := ro.Pipe(
    ro.Just(1, 2, 3, 4, 5),
    ro.LastWithContext(func(ctx context.Context, i int) (context.Context, bool) {
    return ctx, i < 4
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Next: 3
    // Completed

    With index

    obs := ro.Pipe(
    ro.Just(1, 2, 3, 4, 5),
    ro.LastI(func(i int, index int64) bool {
    return index < 3 // Find last item with index < 3
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Next: 4 (index 3)
    // Completed

    With index and context

    obs := ro.Pipe(
    ro.Just(1, 2, 3, 4, 5),
    ro.LastIWithContext(func(ctx context.Context, i int, index int64) (context.Context, bool) {
    return ctx, i < 4 && index < 3
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Next: 3
    // Completed

    Edge case: No matching items

    obs := ro.Pipe(
    ro.Just(1, 2, 3, 4, 5),
    ro.Last[int](func(i int) bool {
    return i > 10 // No items match
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // No Next values, completes without emitting
    // Completed

    Edge case: Single item

    obs := ro.Pipe(
    ro.Just(42),
    ro.Last[int](func(i int) bool {
    return i > 0
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Next: 42
    // Completed
    Prototypes:
    func Last[T any](predicate func(item T) bool)
    func LastWithContext[T any](predicate func(ctx context.Context, item T) (context.Context, bool))
    func LastI[T any](predicate func(item T, index int64) bool)
    func LastIWithContext[T any](predicate func(ctx context.Context, item T, index int64) (context.Context, bool))
  • Emits only the first item emitted by an Observable. If the source Observable is empty, Head will emit an error.

    obs := ro.Pipe(
    ro.Just("first", "second", "third"),
    ro.Head(),
    )

    sub := obs.Subscribe(ro.PrintObserver[string]())
    defer sub.Unsubscribe()

    // Next: first
    // Completed

    With empty observable

    obs := ro.Pipe(
    ro.Empty[string](),
    ro.Head(),
    )

    sub := obs.Subscribe(ro.PrintObserver[string]())
    defer sub.Unsubscribe()

    // Error: head of empty observable

    With single item

    obs := ro.Pipe(
    ro.Just("only one"),
    ro.Head(),
    )

    sub := obs.Subscribe(ro.PrintObserver[string]())
    defer sub.Unsubscribe()

    // Next: only one
    // Completed

    With numbers

    obs := ro.Pipe(
    ro.Just(10, 20, 30, 40, 50),
    ro.Head(),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Next: 10
    // Completed

    With time-based emissions

    obs := ro.Pipe(
    ro.Interval(100*time.Millisecond),
    ro.Head(),
    )

    sub := obs.Subscribe(ro.PrintObserver[int64]())
    time.Sleep(200 * time.Millisecond)
    sub.Unsubscribe()

    // Next: 0 (emitted immediately)
    // Completed

    With error in source

    obs := ro.Pipe(
    ro.Throw[string](errors.New("source error")),
    ro.Head(),
    )

    sub := obs.Subscribe(ro.PrintObserver[string]())
    defer sub.Unsubscribe()

    // Error: source error
    Prototype:
    func Head[T any]()
  • Emits only the last item emitted by an Observable. If the source Observable is empty, Tail will emit an error.

    obs := ro.Pipe(
    ro.Just("first", "second", "third"),
    ro.Tail(),
    )

    sub := obs.Subscribe(ro.PrintObserver[string]())
    defer sub.Unsubscribe()

    // Next: third
    // Completed

    With single item

    obs := ro.Pipe(
    ro.Just("only one"),
    ro.Tail(),
    )

    sub := obs.Subscribe(ro.PrintObserver[string]())
    defer sub.Unsubscribe()

    // Next: only one
    // Completed

    With empty observable

    obs := ro.Pipe(
    ro.Empty[string](),
    ro.Tail(),
    )

    sub := obs.Subscribe(ro.PrintObserver[string]())
    defer sub.Unsubscribe()

    // Error: tail of empty observable

    With numbers

    obs := ro.Pipe(
    ro.Just(10, 20, 30, 40, 50),
    ro.Tail(),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Next: 50
    // Completed

    With time-based emissions

    obs := ro.Pipe(
    ro.Interval(100*time.Millisecond),
    ro.Tail(),
    ro.Take(3),
    )

    sub := obs.Subscribe(ro.PrintObserver[int64]())
    time.Sleep(400 * time.Millisecond)
    sub.Unsubscribe()

    // Next: 2 (last value before completion)
    // Completed

    With error in source

    obs := ro.Pipe(
    ro.Throw[string](errors.New("source error")),
    ro.Tail(),
    )

    sub := obs.Subscribe(ro.PrintObserver[string]())
    defer sub.Unsubscribe()

    // Error: source error

    With large number of items

    obs := ro.Pipe(
    ro.Just(makeRange(1, 1000)...), // Emits 1 through 1000
    ro.Tail(),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Next: 1000
    // Completed
    Prototype:
    func Tail[T any]()
  • Returns an observable sequence that contains only distinct elements.

    obs := ro.Pipe(
    ro.Just(1, 2, 2, 3, 1, 4, 3, 5),
    ro.Distinct[int](),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Next: 1
    // Next: 2
    // Next: 3
    // Next: 4
    // Next: 5
    // Completed

    With strings

    obs := ro.Pipe(
    ro.Just("apple", "banana", "apple", "cherry", "banana"),
    ro.Distinct[string](),
    )

    sub := obs.Subscribe(ro.PrintObserver[string]())
    defer sub.Unsubscribe()

    // Next: apple
    // Next: banana
    // Next: cherry
    // Completed

    With custom types

    type Person struct {
    ID int
    Name string
    }

    obs := ro.Pipe(
    ro.Just(
    Person{1, "Alice"},
    Person{2, "Bob"},
    Person{1, "Alice"}, // Duplicate
    Person{3, "Charlie"},
    ),
    ro.Distinct[Person](),
    )

    sub := obs.Subscribe(ro.PrintObserver[Person]())
    defer sub.Unsubscribe()

    // Next: {1 Alice}
    // Next: {2 Bob}
    // Next: {3 Charlie}
    // Completed

    Empty sequence

    obs := ro.Pipe(
    ro.Empty[int](),
    ro.Distinct[int](),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Completed (no values emitted)
    Prototype:
    func Distinct[T comparable]()
  • Emits only the nth item emitted by an Observable. If the source Observable emits fewer than n items, ElementAt will emit an error.

    obs := ro.Pipe(
    ro.Just("apple", "banana", "cherry", "date"),
    ro.ElementAt(2),
    )

    sub := obs.Subscribe(ro.PrintObserver[string]())
    defer sub.Unsubscribe()

    // Next: cherry
    // Completed

    With zero-based indexing

    obs := ro.Pipe(
    ro.Just("first", "second", "third"),
    ro.ElementAt(0),
    )

    sub := obs.Subscribe(ro.PrintObserver[string]())
    defer sub.Unsubscribe()

    // Next: first
    // Completed

    With out of bounds index

    obs := ro.Pipe(
    ro.Just("a", "b", "c"),
    ro.ElementAt(5),
    )

    sub := obs.Subscribe(ro.PrintObserver[string]())
    defer sub.Unsubscribe()

    // Error: element at index 5 not found

    With numbers

    obs := ro.Pipe(
    ro.Just(10, 20, 30, 40, 50),
    ro.ElementAt(3),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Next: 40
    // Completed

    With time-based observable

    obs := ro.Pipe(
    ro.Interval(100*time.Millisecond),
    ro.ElementAt(5),
    )

    sub := obs.Subscribe(ro.PrintObserver[int64]())
    time.Sleep(800 * time.Millisecond)
    sub.Unsubscribe()

    // Next: 5 (emitted after 500ms)
    // Completed
    Prototype:
    func ElementAt[T any](nth int)
  • IgnoreElementsโ€‹

    Ignores all elements emitted by the source observable, only allowing completion or error notifications to pass through.

    obs := ro.Pipe(
    ro.Just(1, 2, 3, 4, 5),
    ro.IgnoreElements[int](),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Completed (no values emitted)

    With error handling

    obs := ro.Pipe(
    ro.Concat(
    ro.Just(1, 2, 3),
    ro.Throw[int](errors.New("something went wrong")),
    ),
    ro.IgnoreElements[int](),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Error: something went wrong

    For side effects only

    // Use IgnoreElements when you only care about side effects
    obs := ro.Pipe(
    ro.Just(1, 2, 3, 4, 5),
    ro.Tap(func(n int) {
    fmt.Printf("Processing item: %d\n", n)
    }),
    ro.IgnoreElements[int](),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Processing item: 1
    // Processing item: 2
    // Processing item: 3
    // Processing item: 4
    // Processing item: 5
    // Completed

    With delayed completion

    obs := ro.Pipe(
    ro.Just(1, 2, 3),
    ro.Delay(100*time.Millisecond),
    ro.IgnoreElements[int](),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Completed after 100ms (no values emitted)
    Prototype:
    func IgnoreElements[T any]()
  • ElementAtOrDefaultโ€‹

    Emits only the nth item emitted by an Observable. If the source Observable emits fewer than n items, ElementAtOrDefault will emit a fallback value.

    obs := ro.Pipe(
    ro.Just("a", "b", "c"),
    ro.ElementAtOrDefault(5, "fallback"),
    )

    sub := obs.Subscribe(ro.PrintObserver[string]())
    defer sub.Unsubscribe()

    // Next: fallback
    // Completed

    Within bounds (emits the nth item)

    obs := ro.Pipe(
    ro.Just("first", "second", "third"),
    ro.ElementAtOrDefault(1, "fallback"),
    )

    sub := obs.Subscribe(ro.PrintObserver[string]())
    defer sub.Unsubscribe()

    // Next: second
    // Completed

    With zero-based indexing and fallback

    obs := ro.Pipe(
    ro.Just("apple"),
    ro.ElementAtOrDefault(0, "no fruit"),
    )

    sub := obs.Subscribe(ro.PrintObserver[string]())
    defer sub.Unsubscribe()

    // Next: apple
    // Completed

    With numbers and fallback

    obs := ro.Pipe(
    ro.Just(1, 2, 3),
    ro.ElementAtOrDefault(10, 999),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Next: 999
    // Completed

    With empty observable

    obs := ro.Pipe(
    ro.Empty[string](),
    ro.ElementAtOrDefault(0, "default value"),
    )

    sub := obs.Subscribe(ro.PrintObserver[string]())
    defer sub.Unsubscribe()

    // Next: default value
    // Completed

    With negative index (will panic)

    // This will panic because nth cannot be negative
    defer func() {
    if r := recover(); r != nil {
    fmt.Println("Recovered from panic:", r)
    }
    }()

    obs := ro.Pipe(
    ro.Just("a", "b"),
    ro.ElementAtOrDefault(-1, "fallback"), // This will panic
    )

    sub := obs.Subscribe(ro.PrintObserver[string]())
    defer sub.Unsubscribe()
    Prototype:
    func ElementAtOrDefault[T any](nth int64, fallback T)
  • Bypasses a specified number of elements in an observable sequence and then returns the remaining elements.

    obs := ro.Pipe(
    ro.Just(1, 2, 3, 4, 5),
    ro.Skip(2),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Next: 3
    // Next: 4
    // Next: 5
    // Completed

    Skip more than available

    obs := ro.Pipe(
    ro.Just(1, 2, 3),
    ro.Skip(5),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Completed (no values emitted)

    Skip zero

    obs := ro.Pipe(
    ro.Just(1, 2, 3, 4, 5),
    ro.Skip(0),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Next: 1
    // Next: 2
    // Next: 3
    // Next: 4
    // Next: 5
    // Completed

    With infinite sequence

    obs := ro.Pipe(
    ro.Pipe(
    ro.Interval(100*time.Millisecond),
    ro.Take[int64](10),
    ),
    ro.Skip(3),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Next: 3 (after 400ms)
    // Next: 4 (after 500ms)
    // Next: 5 (after 600ms)
    // ...
    // Next: 9 (after 1000ms)
    // Completed
    Prototype:
    func Skip(count int64)
  • Bypasses elements in an observable sequence as long as a specified condition is true, and then returns the remaining elements.

    obs := ro.Pipe(
    ro.Just(1, 2, 3, 4, 5),
    ro.SkipWhile(func(i int) bool {
    return i < 3
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Next: 3
    // Next: 4
    // Next: 5
    // Completed

    With context

    obs := ro.Pipe(
    ro.Just("apple", "banana", "cherry", "date"),
    ro.SkipWhileWithContext(func(ctx context.Context, fruit string) bool {
    return strings.HasPrefix(fruit, "a") || strings.HasPrefix(fruit, "b")
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[string]())
    defer sub.Unsubscribe()

    // Next: cherry
    // Next: date
    // Completed

    With index

    obs := ro.Pipe(
    ro.Just(10, 20, 30, 40, 50),
    ro.SkipWhileI(func(item int, index int64) bool {
    return index < 2 // Skip first 2 items regardless of value
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Next: 30
    // Next: 40
    // Next: 50
    // Completed

    With index and context

    obs := ro.Pipe(
    ro.Just("a", "b", "c", "d", "e"),
    ro.SkipWhileIWithContext(func(ctx context.Context, item string, index int64) bool {
    return index < 3 && item != "d"
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[string]())
    defer sub.Unsubscribe()

    // Next: d
    // Next: e
    // Completed

    When condition never becomes false

    obs := ro.Pipe(
    ro.Just(1, 2, 3, 4, 5),
    ro.SkipWhile(func(i int) bool {
    return i > 0 // Always true
    }),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Completed (no values emitted)
    Prototypes:
    func SkipWhile[T any](predicate func(item T) bool)
    func SkipWhileWithContext[T any](predicate func(ctx context.Context, item T) bool)
    func SkipWhileI[T any](predicate func(item T, index int64) bool)
    func SkipWhileIWithContext[T any](predicate func(ctx context.Context, item T, index int64) bool)
  • Returns a specified number of contiguous elements from the end of an observable sequence.

    obs := ro.Pipe(
    ro.Just(1, 2, 3, 4, 5),
    ro.TakeLast(2),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Next: 4
    // Next: 5
    // Completed

    Take all elements

    obs := ro.Pipe(
    ro.Just(1, 2, 3),
    ro.TakeLast(5),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Next: 1
    // Next: 2
    // Next: 3
    // Completed

    TakeLast with zero

    obs := ro.Pipe(
    ro.Just(1, 2, 3, 4, 5),
    ro.TakeLast(0),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Completed (no values emitted)

    With strings

    obs := ro.Pipe(
    ro.Just("apple", "banana", "cherry", "date", "elderberry"),
    ro.TakeLast(3),
    )

    sub := obs.Subscribe(ro.PrintObserver[string]())
    defer sub.Unsubscribe()

    // Next: cherry
    // Next: date
    // Next: elderberry
    // Completed

    Memory usage note

    // TakeLast must buffer all elements to determine the last N
    // Use with caution on very long or infinite sequences
    obs := ro.Pipe(
    ro.Pipe(
    ro.Interval(100*time.Millisecond),
    ro.Take[int64](10),
    ),
    ro.TakeLast(3),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Next: 7
    // Next: 8
    // Next: 9
    // Completed
    Prototype:
    func TakeLast(count int64)