Reflex::Role::Streaming - add streaming I/O behavior to a class


version 0.056


use Moose;

has socket => ( is => 'rw', isa => 'FileHandle', required => 1 );

with 'Reflex::Role::Streaming' => {
	handle    => 'socket',
	cb_data   => 'on_socket_data',    # default
	cb_error  => 'on_socket_error',   # default
	cb_closed => 'on_socket_closed',  # default
};

sub on_socket_data {
	my ($self, $arg) = @_;
	# $arg->{data} contains the raw octets received from the socket.
}

sub on_socket_error {
	my ($self, $arg) = @_;
	print "$arg->{errfun} error $arg->{errnum}: $arg->{errstr}\n";
	$self->stopped();  # overriding callbacks should call stopped()
}

sub on_socket_closed {
	my $self = shift;
	print "Connection closed.\n";
	$self->stopped();  # overriding callbacks should call stopped()
}


Reflex::Role::Streaming is a Moose parameterized role that adds streaming I/O behavior to Reflex-based classes. In the SYNOPSIS, a filehandle named "socket" is turned into a non-blocking I/O (NBIO) stream by the addition of Reflex::Role::Streaming.

See Reflex::Stream if you prefer runtime composition with objects, or you just find Moose syntax difficult to handle.

Required Role Parameters


The handle parameter must contain the name of the attribute that contains the handle to stream. The name indirection allows the role to generate methods that are unique to the handle. For example, a handle named "XYZ" would generate these methods by default:

cb_closed   => "on_XYZ_closed",
cb_data     => "on_XYZ_data",
cb_error    => "on_XYZ_error",
method_put  => "put_XYZ",

This naming convention allows the role to be used for more than one handle in the same class. Each handle will have its own name, so the mixed-in methods associated with each handle will also be unique.
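The naming convention can be sketched in plain Perl. The default_names() helper below is hypothetical and is not part of Reflex; it only mirrors the defaults listed above to show that two handles, here assumed to be named "client" and "server", end up with non-colliding method names.

```perl
use strict;
use warnings;

# Hypothetical helper, not part of Reflex. It only reproduces the
# default naming convention described in this document.
sub default_names {
	my ($handle) = @_;
	return (
		cb_closed  => "on_${handle}_closed",
		cb_data    => "on_${handle}_data",
		cb_error   => "on_${handle}_error",
		method_put => "put_${handle}",
	);
}

# Two handles in one class would get two distinct sets of names.
my %client = default_names('client');
my %server = default_names('server');

print "$_ => $client{$_}, $server{$_}\n" for sort keys %client;
```

Because every generated name embeds the handle name, none of the "client" names can clash with the "server" names.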

Optional Role Parameters


cb_closed names the $self method that will be called whenever handle has reached the end of readable data. For sockets, this means the remote endpoint has closed or shut down for writing.

cb_closed is by default the catenation of "on_", the handle name, and "_closed". A handle named "XYZ" will by default trigger on_XYZ_closed() callbacks. The role defines a default callback that will emit a "closed" event and call stopped(), which is provided by Reflex::Role::Collectible.

Currently the second parameter to the cb_closed callback, the hashref of named values, contains no values of note.

When overriding this callback, please be sure to call stopped(), which is provided by Reflex::Role::Collectible. Calling stopped() is vital for collectible objects to be released from memory when managed by Reflex::Collection.


cb_data names the $self method that will be called whenever the stream for handle has provided new data. By default, it's the catenation of "on_", the handle name, and "_data". A handle named "XYZ" will by default trigger on_XYZ_data() callbacks. The role defines a default callback that will emit a "data" event with cb_data()'s parameters.

All Reflex parameterized role callbacks are invoked with two parameters: $self and an anonymous hashref of named values specific to the callback. cb_data callbacks include a single named value, data, that contains the raw octets received from the filehandle.


cb_error names the $self method that will be called whenever the stream produces an error. By default, this method will be the catenation of "on_", the handle name, and "_error", as in on_XYZ_error() if the handle is named "XYZ". The role defines a default callback that will emit an "error" event with cb_error()'s parameters, then will call stopped() so that streams managed by Reflex::Collection will be automatically cleaned up after stopping.

cb_error callbacks receive two parameters, $self and an anonymous hashref of named values specific to the callback. Reflex error callbacks include three standard values. errfun contains a single-word description of the function that failed. errnum contains the numeric value of $! at the time of failure. errstr holds the stringified version of $!.

Values of $! are passed as parameters since the global variable may change before the callback can be invoked.
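The reason for copying $! can be shown in plain Perl, without Reflex. This is a minimal sketch: capture_error() is a hypothetical helper, and the path passed to open() is assumed not to exist. The hashref keeps both the numeric and the string form of $! even after the global has been overwritten.

```perl
use strict;
use warnings;

# Hypothetical helper, not part of Reflex. It snapshots $! into the
# three standard values described above.
sub capture_error {
	my ($errfun) = @_;
	return {
		errfun => $errfun,
		errnum => ($! + 0),   # numeric value of $!
		errstr => "$!",       # stringified value of $!
	};
}

my $arg;
# The path is assumed not to exist, so open() should fail.
unless (open my $fh, '<', '/nonexistent/path/for/demo') {
	$arg = capture_error('open');
}

# Simulate later code changing the global before a callback runs.
$! = 0;

# The captured values still describe the original failure.
printf "%s error %d: %s\n", @{$arg}{qw(errfun errnum errstr)} if $arg;
```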

When overriding this callback, please be sure to call stopped(), which is provided by Reflex::Role::Collectible. Calling stopped() is vital for collectible objects to be released from memory when managed by Reflex::Collection.


method_put defines the name of a method that will put data octets into the stream's output buffer. The default name is "put_" followed by the streaming handle's name, such as put_XYZ().

The put method takes an array of strings of raw octets:

my $pending_count = $self->put_XYZ(
	"raw octets here", " and some more"
);

The put method will try to flush the stream's output buffer immediately. Any data that cannot be flushed will remain in the buffer. The streaming code will attempt to flush it later when the stream becomes writable again.

The put method returns the number of buffered octets waiting to be flushed, or zero if the buffer has been synchronously flushed. If the synchronous syswrite() fails, it will invoke cb_error and return undef.
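The return-value contract described above can be approximated in plain Perl. This is only a sketch under stated assumptions, not the role's actual implementation: put_sketch() and the single $out_buffer are hypothetical, a pipe stands in for the stream, and the error path simply returns undef where the role would also invoke cb_error.

```perl
use strict;
use warnings;

# Hypothetical output buffer. The real role keeps its own per-handle state.
my $out_buffer = '';

# Hypothetical stand-in for a generated put method.
sub put_sketch {
	my ($fh, @chunks) = @_;
	$out_buffer .= join '', @chunks;

	# Try to flush immediately.
	my $written = syswrite($fh, $out_buffer);

	# On failure the role would invoke cb_error; the sketch only
	# returns undef.
	return unless defined $written;

	# Keep whatever could not be written for a later flush.
	substr($out_buffer, 0, $written, '');
	return length $out_buffer;
}

pipe(my $reader, my $writer) or die "pipe failed: $!";

my $pending = put_sketch($writer, "raw octets here", " and some more");

defined sysread($reader, my $got, 1024) or die "sysread failed: $!";
print "pending=$pending got=$got\n";
```

A small write to an empty pipe is normally accepted in full, so the pending count is expected to be zero here; a non-zero count would mean some octets are still buffered.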


TODO - I'm sure there are some.


Reflex, Reflex::Role::Readable, Reflex::Role::Writable, Reflex::Stream

