NAME

Linux::Event::Stream - Buffered, backpressure-aware I/O for nonblocking file descriptors

SYNOPSIS

use v5.36;
use Linux::Event;
use Linux::Event::Stream;

my $loop = Linux::Event->new;

# Raw bytes mode (TCP/pipes): on_read receives arbitrary chunks.
my $stream = Linux::Event::Stream->new(
  loop => $loop,
  fh   => $fh,

  on_read  => sub ($stream, $bytes, $data) {
    # Called whenever bytes are received.
    # If you call $stream->close or $stream->close_after_drain here,
    # on_read will not be invoked again (on_close still fires once).
  },

  on_error => sub ($stream, $errno, $data) {
    # Called on fatal I/O error. Stream will close immediately after this.
  },

  on_close => sub ($stream, $data) {
    # Called exactly once when the stream closes.
  },

  high_watermark => 1_048_576,  # bytes
  low_watermark  =>   262_144,  # bytes

  read_size         => 8192,
  max_read_per_tick => 0,

  data => $user_data,
);

$stream->write("hello\n");
$loop->run;

# Framed/message mode: on_message receives complete messages.
my $framed = Linux::Event::Stream->new(
  loop       => $loop,
  fh         => $fh,
  codec      => 'line',
  on_message => sub ($stream, $line, $data) {
    # Called for each complete line ("\n" removed by the default line codec).
    $stream->write_message("echo: $line");
  },
);

DESCRIPTION

Linux::Event::Stream provides buffered, backpressure-aware I/O on top of Linux::Event watchers.

It wraps a nonblocking file descriptor and adds:

  • Write buffering

  • High/low watermark backpressure tracking (hysteresis latch)

  • Graceful close-after-drain support

  • Optional read throttling

Stream does not create sockets or modify the event loop. It is a small policy layer over a file descriptor.

In raw mode, Stream delivers arbitrary byte chunks via on_read.

In framed/message mode, Stream buffers incoming bytes internally and uses a codec to emit complete messages via on_message.

CONSTRUCTOR

new(%args)

Required arguments:

loop

A Linux::Event loop instance.

fh

A filehandle or socket. It will be placed into nonblocking mode.

Optional arguments:

on_read => sub ($stream, $bytes, $data)

Raw bytes mode.

Called whenever bytes are read.

This callback may call close or close_after_drain; once it does, on_read is not invoked again.

codec

Framed/message mode.

Required when using on_message.

May be either a codec object (a hashref with decode and encode coderefs) or one of the built-in aliases:

  • line - newline-delimited messages

  • netstring - <len>:<payload>, (the trailing comma is part of the frame)

  • u32be - 32-bit big-endian length prefix
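As an illustration of the three wire formats, the sketch below builds one frame of each kind by hand. The helper names are hypothetical and are not part of Stream. It assumes that payloads are byte strings, that the line format terminates each message with "\n", and that the u32be prefix counts payload bytes only.

```perl
use strict;
use warnings;

# Hypothetical helpers, for illustration only. Stream provides its own
# codecs behind the 'line', 'netstring' and 'u32be' aliases.

# line: payload followed by a newline
sub frame_line {
  my ($payload) = @_;
  return $payload . "\n";
}

# netstring: decimal byte length, colon, payload, trailing comma
sub frame_netstring {
  my ($payload) = @_;
  return length($payload) . ':' . $payload . ',';
}

# u32be: 4-byte big-endian unsigned length (assumed to count the payload
# only), followed by the payload
sub frame_u32be {
  my ($payload) = @_;
  return pack('N', length $payload) . $payload;
}

# Show each frame as hex so the length prefix and delimiters are visible.
for my $pair ([line => \&frame_line], [netstring => \&frame_netstring], [u32be => \&frame_u32be]) {
  my ($name, $framer) = @$pair;
  printf "%-10s %s\n", $name, unpack('H*', $framer->('hello'));
}
```

For the payload "hello", the netstring frame is "5:hello," and the u32be frame is the four bytes 00 00 00 05 followed by "hello".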

on_message => sub ($stream, $msg, $data)

Framed/message mode.

Enables internal read buffering and emits complete messages produced by the codec.

When on_message is provided, on_read is not used.

max_inbuf

Framed/message mode.

Maximum buffered undecoded bytes (default 1MB). If exceeded, Stream fires on_error with errno 0, sets last_error, and closes.

on_error => sub ($stream, $errno, $data)

Called when a fatal I/O error occurs.

After on_error fires, the stream closes immediately (any buffered data is discarded) and then on_close fires.

on_close => sub ($stream, $data)

Called exactly once when the stream closes (for any reason).

high_watermark

Defaults to 1MB.

If pending buffered bytes exceed this value, is_write_blocked becomes true.

low_watermark

Defaults to 256KB.

When pending buffered bytes drop below this value, is_write_blocked becomes false.

If low_watermark is greater than high_watermark, it is clamped to high_watermark.

read_size

Maximum bytes per sysread() call (default 8192).

max_read_per_tick

Limit total bytes read per loop tick. 0 means unlimited.

data

Opaque user data passed to callbacks.

close_fh

If true, the underlying filehandle is closed when the stream closes.

By default, Stream does not take ownership of the filehandle and leaves it open.

METHODS

write($bytes)

Queues bytes for sending and returns true if the bytes were accepted for buffering.

If there is no pending buffered data, Stream attempts an immediate syswrite() before buffering the remainder.

A true return value does not imply that the peer has received the bytes; delivery is asynchronous.

buffered_bytes

Returns the number of bytes currently buffered and not yet written to the file descriptor.

is_write_blocked

Returns true if Stream is in a backpressured state due to the configured watermarks.

This is a hysteresis latch: it becomes true above high_watermark and becomes false below low_watermark.
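A producer can use the latch to stop queueing once the stream is backpressured. The following is a minimal sketch, not part of the module: the helper name pump is hypothetical, and it relies only on write and is_write_blocked as documented here.

```perl
use strict;
use warnings;

# Hypothetical helper: writes chunks from @$queue until the queue is empty,
# a write is refused, or the stream reports backpressure.
# Returns the number of chunks handed to the stream.
sub pump {
  my ($stream, $queue) = @_;
  my $sent = 0;
  while (@$queue && !$stream->is_write_blocked) {
    # Leave the chunk on the queue if write() does not accept it.
    last unless $stream->write($queue->[0]);
    shift @$queue;
    $sent++;
  }
  return $sent;
}
```

Chunks that remain in the queue can be sent by calling pump again once is_write_blocked returns false. This documentation does not describe a drain notification, so when to retry is left to the caller.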

pause_read

Disables read notifications.

resume_read

Re-enables read notifications (unless the stream is closing).

close

Immediately closes the stream.

Buffered data is discarded.

close_after_drain

Stops reading and closes once all buffered data has been written.

is_closed

Returns true once closed.

is_closing

Returns true if close_after_drain has been requested (the stream is closing but may still have buffered bytes to write).

write_message($msg)

Framed/message mode.

Encodes $msg using the configured codec and then writes the resulting bytes.

Returns true if the encoded bytes were accepted for buffering.

last_error

Returns the last non-EOF error recorded by the stream.

For I/O errors, on_error receives a non-zero errno.

For codec/buffer errors in framed mode, on_error receives errno 0 and last_error contains a string such as "codec:...".
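An on_error handler can use the errno value to tell the two cases apart. The helper below is a hypothetical sketch (its name and message wording are not part of Stream); it formats a log line from the errno passed to on_error and the value of last_error.

```perl
use strict;
use warnings;

# Hypothetical helper: turns the ($errno, last_error) pair into a log line.
sub describe_stream_error {
  my ($errno, $last_error) = @_;
  if ($errno) {
    # Assigning a number to $! makes "$!" yield the system error message.
    local $! = $errno;
    return "I/O error $errno: $!";
  }
  # errno 0: codec or buffer error; the detail is in last_error.
  return 'stream error: ' . (defined $last_error ? $last_error : 'unknown');
}

# Possible use inside a constructor call:
#   on_error => sub {
#     my ($stream, $errno, $data) = @_;
#     warn describe_stream_error($errno, $stream->last_error), "\n";
#   },
```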

CODEC CONTRACT

In framed/message mode, Stream expects a codec object that is a hashref (or a blessed hashref) with two coderefs:

  • $codec->{decode}->($codec, \$inbuf, \@out)

    Consumes from $$inbuf (in place) and pushes zero or more decoded messages onto @out. Returns true on success, or (0, $err) on failure.

  • $codec->{encode}->($codec, $msg)

    Returns a byte string to write for the given message.

Stream validates the codec once at construction time and calls these coderefs directly on the hot path to avoid per-message method dispatch overhead.
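As an example of the contract, here is a sketch of a custom codec for NUL-delimited messages. The codec itself, its max_len field, and the error text are hypothetical; only the decode and encode calling conventions come from the contract above.

```perl
use strict;
use warnings;

# Hypothetical NUL-delimited codec. max_len is an extra field used only by
# this codec's own decode routine.
my $nul_codec = {
  max_len => 16,

  decode => sub {
    my ($codec, $inbuf, $out) = @_;
    while (1) {
      my $pos = index($$inbuf, "\0");
      if ($pos < 0) {
        # No complete message yet. Fail only if the partial tail is
        # already longer than any valid message.
        return (0, 'message too long') if length($$inbuf) > $codec->{max_len};
        return 1;
      }
      return (0, 'message too long') if $pos > $codec->{max_len};
      push @$out, substr($$inbuf, 0, $pos);   # emit the message
      substr($$inbuf, 0, $pos + 1, '');       # consume it and its delimiter
    }
  },

  encode => sub {
    my ($codec, $msg) = @_;
    # The caller must not pass messages that contain a NUL byte.
    return $msg . "\0";
  },
};

# Exercising the codec directly, the way the contract describes the calls:
my $inbuf = "one\0two\0thr";
my @out;
my ($ok, $err) = $nul_codec->{decode}->($nul_codec, \$inbuf, \@out);
# @out now holds ('one', 'two'); $inbuf still holds the partial "thr".
```

Such a hashref would be passed as codec => $nul_codec together with on_message. Whether Stream tolerates extra keys such as max_len is an assumption here.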

BACKPRESSURE SEMANTICS

Watermarks operate on pending buffered bytes (buffered_bytes).

is_write_blocked transitions:

  • false -> true when buffered_bytes > high_watermark

  • true -> false when buffered_bytes < low_watermark

The transition back to false happens in the write-ready drain path, as Stream flushes buffered data to the descriptor; it does not necessarily coincide with any read event.
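The two transitions can be modeled as a small state function. This is a stand-alone sketch of the rule as stated above, not Stream's actual code; the function name is hypothetical, and the thresholds are the documented defaults.

```perl
use strict;
use warnings;

# Hypothetical model of the hysteresis latch: given the current state and
# the number of buffered bytes, return the next state.
sub next_blocked {
  my ($blocked, $buffered, $high, $low) = @_;
  return 1 if !$blocked && $buffered > $high;   # false -> true
  return 0 if  $blocked && $buffered < $low;    # true  -> false
  return $blocked;                              # otherwise unchanged
}

my ($high, $low) = (1_048_576, 262_144);
my $blocked = 0;
for my $buffered (500_000, 1_048_576, 1_048_577, 600_000, 262_144, 262_143) {
  $blocked = next_blocked($blocked, $buffered, $high, $low);
  printf "%9d bytes buffered -> %s\n",
    $buffered, $blocked ? 'blocked' : 'not blocked';
}
# States in order: not blocked, not blocked, blocked, blocked, blocked,
# not blocked. Between the two watermarks the previous state is kept.
```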

INTEGRATION NOTES

Stream is intended to be composed by higher-level modules (for example, a future Linux::Event::Listen connection wrapper) to provide consistent buffered I/O and backpressure behavior.

AUTHOR

Joshua S. Day

LICENSE

Same terms as Perl itself.