NAME
Linux::Event::Stream - Buffered, backpressure-aware I/O for nonblocking file descriptors
SYNOPSIS
    use v5.36;
    use Linux::Event;
    use Linux::Event::Stream;

    my $loop = Linux::Event->new;

    # Raw bytes mode (TCP/pipes): on_read receives arbitrary chunks.
    my $stream = Linux::Event::Stream->new(
        loop => $loop,
        fh   => $fh,
        on_read => sub ($stream, $bytes, $data) {
            # Called whenever bytes are received.
            # If you call $stream->close or $stream->close_after_drain here,
            # further callbacks will not be invoked.
        },
        on_error => sub ($stream, $errno, $data) {
            # Called on fatal I/O error. Stream will close immediately after this.
        },
        on_close => sub ($stream, $data) {
            # Called exactly once when the stream closes.
        },
        high_watermark    => 1_048_576, # bytes
        low_watermark     => 262_144,   # bytes
        read_size         => 8192,
        max_read_per_tick => 0,
        data              => $user_data,
    );

    $stream->write("hello\n");
    $loop->run;

    # Framed/message mode: on_message receives complete messages.
    my $framed = Linux::Event::Stream->new(
        loop  => $loop,
        fh    => $fh,
        codec => 'line',
        on_message => sub ($stream, $line, $data) {
            # Called for each complete line ("\n" removed by the default line codec).
            $stream->write_message("echo: $line");
        },
    );
DESCRIPTION
Linux::Event::Stream provides buffered, backpressure-aware I/O on top of Linux::Event watchers.
It wraps a nonblocking file descriptor and adds:
- Write buffering
- High/low watermark backpressure tracking (hysteresis latch)
- Graceful close-after-drain support
- Optional read throttling
Stream does not create sockets or modify the event loop. It is a small policy layer over a file descriptor.
In raw mode, Stream delivers arbitrary byte chunks via on_read.
In framed/message mode, Stream buffers incoming bytes internally and uses a codec to emit complete messages via on_message.
CONSTRUCTOR
new(%args)
Required arguments:
- loop
A Linux::Event loop instance.
- fh
A filehandle or socket. It will be placed into nonblocking mode.
Optional arguments:
- on_read => sub ($stream, $bytes, $data)
Raw bytes mode.
Called whenever bytes are read.
This callback may call close or close_after_drain.
- codec
Framed/message mode.
Required when using on_message.
May be either a codec object (a hashref with decode and encode coderefs), or one of the builtin aliases:
    line - newline-delimited messages
    netstring - <len>:<payload>,
    u32be - 32-bit big-endian length prefix
- on_message => sub ($stream, $msg, $data)
Framed/message mode.
Enables internal read buffering and emits complete messages produced by the codec.
When on_message is provided, on_read is not used.
- max_inbuf
Framed/message mode.
Maximum buffered undecoded bytes (default 1MB). If exceeded, Stream fires on_error with errno 0, sets last_error, and closes.
- on_error => sub ($stream, $errno, $data)
Called when a fatal I/O error occurs.
After firing on_error, the stream closes immediately (buffer is discarded) and then on_close fires.
- on_close => sub ($stream, $data)
Called exactly once when the stream closes (for any reason).
- high_watermark
Defaults to 1MB.
If pending buffered bytes exceed this value, is_write_blocked becomes true.
- low_watermark
Defaults to 256KB.
When pending buffered bytes drop below this value, is_write_blocked becomes false.
If low_watermark is greater than high_watermark, it is clamped to high_watermark.
- read_size
Maximum bytes per sysread() call (default 8192).
- max_read_per_tick
Limit total bytes read per loop tick. 0 means unlimited.
- data
Opaque user data passed to callbacks.
- close_fh
If true, the underlying filehandle is closed when the stream closes.
By default Stream does not assume ownership of the filehandle.
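The netstring and u32be aliases listed under codec name length-prefixed wire formats that are easy to reproduce by hand. The sketch below only illustrates the framing. frame_netstring and frame_u32be are hypothetical helpers, not part of Linux::Event::Stream, and the sketch assumes the builtin codecs follow the layouts given in the alias list.

```perl
use strict;
use warnings;

# Hypothetical helpers: they illustrate the wire formats only and assume
# $payload is already a byte string (not wide characters).

# netstring: decimal length, a colon, the payload, a trailing comma.
sub frame_netstring {
    my ($payload) = @_;
    return length($payload) . ':' . $payload . ',';
}

# u32be: 32-bit big-endian length prefix followed by the payload.
sub frame_u32be {
    my ($payload) = @_;
    return pack('N', length $payload) . $payload;
}

print frame_netstring("hello"), "\n";          # 5:hello,
print unpack('H*', frame_u32be("hi")), "\n";   # 000000026869
```

A length prefix lets the receiver know how many bytes to wait for before emitting a message, which is why these formats suit binary payloads that may themselves contain newlines.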
METHODS
write($bytes)
Queues bytes for sending and returns true if the bytes were accepted for buffering.
If there is no pending buffered data, Stream attempts an immediate syswrite() before buffering the remainder.
A true return value does not imply that the peer has received the bytes; delivery is asynchronous.
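Because write only reports that bytes were accepted for buffering, a producer that wants to bound memory can consult is_write_blocked between writes. The following is a minimal sketch under stated assumptions: pump is a hypothetical helper, $next_chunk is an assumed coderef that returns the next chunk or undef when the source is exhausted, and $stream is any object offering the write and is_write_blocked methods described in this document.

```perl
use strict;
use warnings;

# Hypothetical helper: feed chunks into a stream until it reports
# backpressure, the source runs dry, or a write is not accepted.
# Returns the number of bytes handed to write().
sub pump {
    my ($stream, $next_chunk) = @_;
    my $sent = 0;
    until ($stream->is_write_blocked) {
        my $chunk = $next_chunk->();
        last unless defined $chunk;          # source exhausted
        last unless $stream->write($chunk);  # not accepted (chunk is dropped here)
        $sent += length $chunk;
    }
    return $sent;
}
```

The caller would invoke pump again once is_write_blocked has returned to false. When and how to re-check is an application decision that this sketch leaves open.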
buffered_bytes
Returns the number of bytes currently buffered and not yet written to the file descriptor.
is_write_blocked
Returns true if Stream is in a backpressured state due to the configured watermarks.
This is a hysteresis latch: it becomes true above high_watermark and becomes false below low_watermark.
pause_read
Disables read notifications.
resume_read
Re-enables read notifications (unless the stream is closing).
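One common use of pause_read and resume_read is relaying data between two streams without letting the destination's buffer grow unbounded. The sketch below assumes two stream objects with the methods documented here. relay_chunk and maybe_resume are hypothetical helpers, and the point at which maybe_resume gets called is left to the application, since this document does not describe a drain notification.

```perl
use strict;
use warnings;

# Hypothetical helper, intended to be called from the source's on_read:
# forward the bytes and stop reading if the destination is backpressured.
sub relay_chunk {
    my ($src, $dst, $bytes) = @_;
    $dst->write($bytes) or return 0;   # destination did not accept the bytes
    $src->pause_read if $dst->is_write_blocked;
    return 1;
}

# Hypothetical helper: resume reading once the destination has drained
# below its low watermark. Returns true if reading was resumed.
sub maybe_resume {
    my ($src, $dst) = @_;
    return 0 if $dst->is_write_blocked;
    $src->resume_read;
    return 1;
}
```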
close
Immediately closes the stream.
Buffered data is discarded.
close_after_drain
Stops reading and closes once all buffered data has been written.
is_closed
Returns true once closed.
is_closing
Returns true if close_after_drain has been requested (the stream is closing but may still have buffered bytes to write).
write_message($msg)
Framed/message mode.
Encodes $msg using the configured codec and then writes the resulting bytes.
Returns true if the encoded bytes were accepted for buffering.
last_error
Returns the last non-EOF error recorded by the stream.
For I/O errors, on_error receives a non-zero errno.
For codec/buffer errors in framed mode, on_error receives errno 0 and last_error contains a string like codec:....
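An on_error callback can use this split to produce a readable message. The sketch below is one way to do it, not the module's own behavior: describe_error is a hypothetical helper, and it relies only on the errno convention and the last_error method described above.

```perl
use strict;
use warnings;

# Hypothetical helper for use inside on_error.
sub describe_error {
    my ($stream, $errno) = @_;
    if ($errno) {
        # Non-zero errno: let Perl render the system error text.
        local $! = $errno;
        return "I/O error: $!";
    }
    # errno 0: a codec/buffer error in framed mode; details are in last_error.
    my $detail = $stream->last_error;
    return 'stream error: ' . (defined $detail ? $detail : 'unknown');
}
```

It could be wired in as on_error => sub { my ($stream, $errno, $data) = @_; warn describe_error($stream, $errno), "\n" }.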
CODEC CONTRACT
In framed/message mode, Stream expects a codec object that is a hashref (or a blessed hashref) with two coderefs:
- $codec->{decode}->($codec, \$inbuf, \@out)
Consumes from $$inbuf (in place) and pushes zero or more decoded messages onto @out. Returns true on success, or (0, $err) on failure.
- $codec->{encode}->($codec, $msg)
Returns a byte string to write for the given message.
Stream validates the codec once at construction time and calls these coderefs directly on the hot path to avoid per-message method dispatch overhead.
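As an illustration of the contract, here is a minimal sketch of a custom codec for NUL-delimited messages. The name $nul_codec and the max_len field are assumptions made for this example. Only the decode and encode entries, their arguments, and their return conventions come from the contract above.

```perl
use strict;
use warnings;

my $nul_codec = {
    max_len => 65_536,   # hypothetical per-message limit, private to this codec

    # Consume complete messages from $$inbuf in place; leave any partial tail.
    decode => sub {
        my ($codec, $inbuf, $out) = @_;
        while ((my $pos = index($$inbuf, "\0")) >= 0) {
            return (0, 'message too long') if $pos > $codec->{max_len};
            push @$out, substr($$inbuf, 0, $pos);
            substr($$inbuf, 0, $pos + 1, '');   # drop message and delimiter
        }
        return (0, 'message too long') if length($$inbuf) > $codec->{max_len};
        return 1;
    },

    # Return the bytes to write for one message.
    encode => sub {
        my ($codec, $msg) = @_;
        return $msg . "\0";
    },
};

# Exercise the codec by hand, the way Stream is described as calling it.
my $inbuf = "one\0two\0thr";
my @out;
my ($ok, $err) = $nul_codec->{decode}->($nul_codec, \$inbuf, \@out);
print join(',', @out), " (pending: '$inbuf')\n" if $ok;   # one,two (pending: 'thr')
```

Such a hashref would be passed as codec => $nul_codec alongside on_message. The incomplete tail ("thr" here) stays in the buffer until more bytes arrive.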
BACKPRESSURE SEMANTICS
Watermarks operate on pending buffered bytes (buffered_bytes).
is_write_blocked transitions:
false -> true when buffered_bytes > high_watermark
true -> false when buffered_bytes < low_watermark
The transition back to false occurs inside the WRITE-ready drain path, not necessarily during a READ event on the peer.
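The latch behavior can be modeled in a few lines. This is a sketch of the transition rules stated above, not the module's internal code. next_blocked is a hypothetical function, and the watermark values are the documented defaults as written in the SYNOPSIS.

```perl
use strict;
use warnings;

# Hypothetical model of the is_write_blocked latch.
sub next_blocked {
    my ($blocked, $buffered, $high, $low) = @_;
    $low = $high if $low > $high;                 # documented clamp
    return 1 if !$blocked && $buffered > $high;   # false -> true
    return 0 if  $blocked && $buffered < $low;    # true -> false
    return $blocked;                              # otherwise unchanged
}

my $blocked = 0;
for my $buffered (500_000, 1_100_000, 600_000, 262_144, 200_000) {
    $blocked = next_blocked($blocked, $buffered, 1_048_576, 262_144);
    printf "%9d bytes buffered: %s\n", $buffered, $blocked ? 'blocked' : 'writable';
}
```

The five samples yield writable, blocked, blocked, blocked, writable. The stream stays blocked at 600_000 and at exactly 262_144 bytes because the release condition is strictly below low_watermark, which is what prevents rapid flapping around a single threshold.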
INTEGRATION NOTES
Stream is intended to be composed by higher-level modules (for example a future Linux::Event::Listen connection wrapper) to provide consistent buffered I/O and backpressure behavior.
AUTHOR
Joshua S. Day
LICENSE
Same terms as Perl itself.