This documentation is still new and evolving. If you spot any mistakes, unclear explanations, or missing details, please open an issue.
Your feedback helps us improve!
I/O operators
This page lists all operators available in the io sub-package of ro.
Install
First, add the sub-package to your project:
go get -u github.com/samber/ro/plugins/io
NewIOReader
Creates an observable that reads data from an io.Reader in chunks.
import (
"io"
"strings"
"github.com/samber/ro"
roio "github.com/samber/ro/plugins/io"
)
data := strings.NewReader("Hello, World!")
obs := roio.NewIOReader(data)
sub := obs.Subscribe(ro.PrintObserver[[]byte]())
defer sub.Unsubscribe()
// Next: [72 101 108 108 111 44 32 87 111 114 108 100 33]
// Completed
Prototype: func NewIOReader(reader io.Reader)
NewPrompt
Creates an observable that prompts the user for input.
import (
"github.com/samber/ro"
roio "github.com/samber/ro/plugins/io"
)
obs := roio.NewPrompt("Enter your name: ")
sub := obs.Subscribe(ro.NewObserver(
func(input []byte) {
println("You entered:", string(input))
},
func(err error) {
println("Error:", err.Error())
},
func() {
println("Input completed")
},
))
defer sub.Unsubscribe()
// (User sees prompt: "Enter your name: ")
// User types: "Alice"
// You entered: Alice
Prototype: func NewPrompt(prompt string)
NewIOReaderLine
Creates an observable that reads lines from an io.Reader.
import (
"strings"
"github.com/samber/ro"
roio "github.com/samber/ro/plugins/io"
)
data := strings.NewReader("line1\nline2\nline3")
obs := roio.NewIOReaderLine(data)
sub := obs.Subscribe(ro.PrintObserver[[]byte]())
defer sub.Unsubscribe()
// Next: [108 105 110 101 49]
// Next: [108 105 110 101 50]
// Next: [108 105 110 101 51]
// Completed
Prototype: func NewIOReaderLine(reader io.Reader)
NewStdReader
Creates an observable that reads data from standard input.
import (
"github.com/samber/ro"
roio "github.com/samber/ro/plugins/io"
)
obs := roio.NewStdReader()
sub := obs.Subscribe(ro.PrintObserver[[]byte]())
defer sub.Unsubscribe()
// Reads from stdin when data is available
// Next: [104 101 108 108 111] // if user types "hello"
// Completed
Prototype: func NewStdReader()
NewStdReaderLine
Creates an observable that reads lines from standard input.
import (
"github.com/samber/ro"
roio "github.com/samber/ro/plugins/io"
)
obs := roio.NewStdReaderLine()
sub := obs.Subscribe(ro.PrintObserver[[]byte]())
defer sub.Unsubscribe()
// Reads lines from stdin when user enters data
// Next: [104 101 108 108 111] // if user types "hello"
// Completed
Prototype: func NewStdReaderLine()
NewIOWriter
Creates an operator that writes byte slices to an io.Writer and emits the total number of bytes written.
import (
"bytes"
"github.com/samber/ro"
roio "github.com/samber/ro/plugins/io"
)
var buf bytes.Buffer
obs := ro.Pipe[[]byte, int](
ro.Just([]byte("Hello, "), []byte("World!")),
roio.NewIOWriter(&buf),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Next: 13
// Completed
Prototype: func NewIOWriter(writer io.Writer)
NewStdWriter
Creates an operator that writes byte slices to standard output and emits the number of bytes written.
import (
"github.com/samber/ro"
roio "github.com/samber/ro/plugins/io"
)
obs := ro.Pipe[[]byte, int](
ro.Just([]byte("Hello, World!")),
roio.NewStdWriter(),
)
sub := obs.Subscribe(ro.PrintObserver[int]())
defer sub.Unsubscribe()
// Hello, World! (written to stdout)
// Next: 13
// Completed
Prototype: func NewStdWriter()