Skip to main content
Help improve this documentation

This documentation is still new and evolving. If you spot any mistakes, unclear explanations, or missing details, please open an issue.

Your feedback helps us improve!

I/O operators

This page lists all operators available in the io sub-package of ro.

Install

First, add the sub-package to your project:

go get -u github.com/samber/ro/plugins/io
  • NewIOReader: Creates an observable that reads data from an io.Reader in chunks.

    import (
    "io"
    "strings"

    "github.com/samber/ro"
    roio "github.com/samber/ro/plugins/io"
    )

    data := strings.NewReader("Hello, World!")
    obs := roio.NewIOReader(data)

    sub := obs.Subscribe(ro.PrintObserver[[]byte]())
    defer sub.Unsubscribe()

    // Next: [72 101 108 108 111 44 32 87 111 114 108 100 33]
    // Completed
    Prototype:
    func NewIOReader(reader io.Reader)
  • NewPrompt: Creates an observable that prompts the user for input.

    import (
    "github.com/samber/ro"
    roio "github.com/samber/ro/plugins/io"
    )

    obs := roio.NewPrompt("Enter your name: ")

    sub := obs.Subscribe(ro.NewObserver(
    func(input []byte) {
    println("You entered:", string(input))
    },
    func(err error) {
    println("Error:", err.Error())
    },
    func() {
    println("Input completed")
    },
    ))
    defer sub.Unsubscribe()

    // (User sees prompt: "Enter your name: ")
    // User types: "Alice"
    // You entered: Alice
    Prototype:
    func NewPrompt(prompt string)
  • NewIOReaderLine: Creates an observable that reads lines from an io.Reader.

    import (
    "strings"

    "github.com/samber/ro"
    roio "github.com/samber/ro/plugins/io"
    )

    data := strings.NewReader("line1\nline2\nline3")
    obs := roio.NewIOReaderLine(data)

    sub := obs.Subscribe(ro.PrintObserver[[]byte]())
    defer sub.Unsubscribe()

    // Next: [108 105 110 101 49]
    // Next: [108 105 110 101 50]
    // Next: [108 105 110 101 51]
    // Completed
    Prototype:
    func NewIOReaderLine(reader io.Reader)
  • NewStdReader: Creates an observable that reads data from standard input.

    import (
    "github.com/samber/ro"
    roio "github.com/samber/ro/plugins/io"
    )

    obs := roio.NewStdReader()

    sub := obs.Subscribe(ro.PrintObserver[[]byte]())
    defer sub.Unsubscribe()

    // Reads from stdin when data is available
    // Next: [104 101 108 108 111] // if user types "hello"
    // Completed
    Prototype:
    func NewStdReader()
  • NewStdReaderLine: Creates an observable that reads lines from standard input.

    import (
    "github.com/samber/ro"
    roio "github.com/samber/ro/plugins/io"
    )

    obs := roio.NewStdReaderLine()

    sub := obs.Subscribe(ro.PrintObserver[[]byte]())
    defer sub.Unsubscribe()

    // Reads lines from stdin when user enters data
    // Next: [104 101 108 108 111] // if user types "hello"
    // Completed
    Prototype:
    func NewStdReaderLine()
  • NewIOWriter: Creates an operator that writes byte slices to an io.Writer and emits the total count of written bytes.

    import (
    "bytes"

    "github.com/samber/ro"
    roio "github.com/samber/ro/plugins/io"
    )

    var buf bytes.Buffer
    obs := ro.Pipe[[]byte, int](
    ro.Just([]byte("Hello, "), []byte("World!")),
    roio.NewIOWriter(&buf),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Next: 13
    // Completed
    Similar:
    Prototype:
    func NewIOWriter(writer io.Writer)
  • NewStdWriter: Creates an operator that writes byte slices to standard output and emits the count of written bytes.

    import (
    "github.com/samber/ro"
    roio "github.com/samber/ro/plugins/io"
    )

    obs := ro.Pipe[[]byte, int](
    ro.Just([]byte("Hello, World!")),
    roio.NewStdWriter(),
    )

    sub := obs.Subscribe(ro.PrintObserver[int]())
    defer sub.Unsubscribe()

    // Hello, World! (written to stdout)
    // Next: 13
    // Completed
    Similar:
    Prototype:
    func NewStdWriter()