NAME
IO::Lambda - non-blocking I/O in lambda style
DESCRIPTION
This module is another attempt to fight the horrors of non-blocking I/O programming. The simplicity of sequential programming is only available when one employs threads, coroutines, or co-processes. Otherwise, state machines have to be built, often quite complex ones, which does not help the clarity of the code. This module uses closures to achieve the clarity of sequential programming with single-process, single-thread, non-blocking I/O.
SYNOPSIS
This is a fairly large document, so depending on your reading tastes, you may either read it all from here (it begins with code examples, then more code examples, then the explanation of basic concepts, and finally gets to the complex ones), or skip directly to the fun part, "Stream IO", where functional style mixes with I/O. Also note that io and lambda are synonyms.
Read line by line from a filehandle
Given that $filehandle is non-blocking, the following code creates a lambda that reads from it until EOF or an error occurs. getline (see "Stream IO" below) returns a lambda that reads a single line from a filehandle.
use IO::Lambda qw(:all);
sub my_reader
{
    my $filehandle = shift;
    io {
        context getline, $filehandle, \(my $buf = '');
        tail {
            my ( $string, $error) = @_;
            if ( $error) {
                warn "error: $error\n";
            } else {
                print $string;
                return again;
            }
        }
    }
}
Assume we have two non-blocking socket connections and want to read from both of them simultaneously. The following code creates a lambda that runs two readers in parallel:
sub my_reader_all
{
    my @filehandles = @_;
    lambda {
        context map { my_reader($_) } @filehandles;
        tails { print "all is finished\n" };
    }
}
my_reader_all( $socket1, $socket2)-> wait;
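For comparison, here is a rough sketch of the hand-written multiplexing loop that my_reader_all spares you, using only core Perl's IO::Select (no IO::Lambda involved). read_all_select is a made-up name, two pipes stand in for the sockets, and select() on pipes assumes a POSIX system. The point is the manual bookkeeping of which handles are still open, which the lambda version expresses as two independent readers.

```perl
use strict;
use warnings;
use IO::Select;

# Hypothetical helper: read every handle in @fhs until EOF, multiplexing
# them with select(). Returns a hash mapping fileno => data read.
sub read_all_select {
    my @fhs = @_;
    my $sel = IO::Select->new(@fhs);
    my %data;
    $data{ fileno $_ } = '' for @fhs;
    while ($sel->count) {
        for my $fh ($sel->can_read) {      # blocks until a handle is ready
            my $key = fileno $fh;
            my $n   = sysread($fh, $data{$key}, 1024, length $data{$key});
            if (!defined $n) {             # I/O error: report and drop it
                warn "error: $!\n";
                $sel->remove($fh);
            } elsif ($n == 0) {            # EOF: stop watching this handle
                $sel->remove($fh);
            }
        }
    }
    return %data;
}

# Two pipes stand in for the two non-blocking sockets of the example.
pipe(my $r1, my $w1) or die "pipe: $!";
pipe(my $r2, my $w2) or die "pipe: $!";
print {$w1} "first\n";
print {$w2} "second\n";
close $w1;
close $w2;

my %got = read_all_select($r1, $r2);
print "all is finished\n";
```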
Non-blocking HTTP
Create a lambda that implements a simple HTTP request over a socket:
use IO::Lambda qw(:all);
use IO::Socket;
use HTTP::Request;
sub talk
{
    my $req = shift;
    my $socket = IO::Socket::INET-> new( PeerAddr => 'www.perl.com', PeerPort => 80);
    lambda {
        context $socket;
        write {
            # connected
            print $socket "GET ", $req-> uri, "\r\n\r\n";
            my $buf = '';
            read {
                sysread $socket, $buf, 1024, length($buf) or return $buf;
                again; # wait for reading and re-do the block
            }
        }
    }
}
Connect and talk to the remote host:
my $request = HTTP::Request-> new( GET => 'http://www.perl.com');
my $q = talk( $request );
print $q-> wait; # will print content of $buf
Run two parallel connections, explicitly waiting for each:
$q = lambda {
context talk($request);
tail { print shift };
context talk($request2);
tail { print shift };
};
$q-> wait;
Run two parallel connections, waiting for all of them at once:
$q = lambda {
context talk($request), talk($request2);
tails { print for @_ };
};
$q-> wait;
Teach our simple HTTP request to follow redirects by wrapping talk(). talk_redirect() has exactly the same properties as talk() does:
use HTTP::Response;
sub talk_redirect
{
    my $req = shift;
    lambda {
        context talk( $req);
        tail {
            my $res = HTTP::Response-> parse( shift );
            return $res unless $res-> code == 302;
            # follow the redirect: the new address is in the Location header
            $req-> uri( $res-> header('Location'));
            context talk( $req);
            again;
        }
    }
}
Working example
use strict;
use IO::Lambda qw(:lambda);
use IO::Socket::INET;
sub get
{
    my ( $socket, $url) = @_;
    lambda {
        context $socket;
        write {
            print $socket "GET $url HTTP/1.0\r\n\r\n";
            my $buf = '';
            read {
                my $n = sysread( $socket, $buf, 1024, length($buf));
                return "read error:$!" unless defined $n;
                return $buf unless $n;
                again;
            }
        }
    }
}
sub get_parallel
{
    my @hosts = @_;
    lambda {
        context map { get(
            IO::Socket::INET-> new(
                PeerAddr => $_,
                PeerPort => 80
            ), '/index.html') } @hosts;
        tails {
            join("\n\n\n", @_ )
        }
    }
}
print get_parallel('www.perl.com', 'www.google.com')-> wait;
See the tests and examples in the eg/ directory for more.
API
Events and states
A lambda is an IO::Lambda object that waits for I/O and timeout events, and for events generated when other lambdas finish. On each such event a callback is executed. The result of the execution is saved and passed on to the next callback when the next event arrives.
The life cycle of a lambda goes through three states: passive, waiting, and stopped. A lambda that has just been created, or was later reset with the reset call, is in the passive state. When the lambda is started, the only callback associated with it is executed:
$q = lambda { print "hello world!\n" };
# not printed anything yet
$q-> wait; # <- and here it is printed
Lambdas are usually not started explicitly; the function that waits for a lambda also starts it. wait, the synchronous waiter, and tail/tails, the asynchronous ones, start passive lambdas when called. A lambda is finished when there are no more events to listen to. The example lambda above finishes right after the print statement.
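To make the three states concrete, the following deliberately tiny model in plain Perl mimics the behaviour just described: the object stays passive until started, waits while it has pending events, and stops afterwards. ToyLambda and its methods are hypothetical and only illustrate the life cycle; they are not how IO::Lambda is implemented.

```perl
use strict;
use warnings;

package ToyLambda;

# Hypothetical toy object with the passive -> waiting -> stopped life cycle.
sub new {
    my ($class, $cb) = @_;
    return bless { cb => $cb, mode => 'passive', pending => [], result => [] }, $class;
}

sub mode { $_[0]{mode} }

# Register a follow-up callback; stands in for an I/O or timer event.
sub later { push @{ $_[0]{pending} }, $_[1]; return }

# Run the single initial callback; stay 'waiting' if it registered events.
sub start {
    my $self = shift;
    die "not passive\n" unless $self->{mode} eq 'passive';
    $self->{mode}   = 'waiting';
    $self->{result} = [ $self->{cb}->($self) ];
    $self->{mode}   = 'stopped' unless @{ $self->{pending} };
    return $self;
}

# Synchronous waiter: starts a passive object, then dispatches events,
# passing each callback the previous callback's result.
sub wait {
    my $self = shift;
    $self->start if $self->{mode} eq 'passive';
    while (my $cb = shift @{ $self->{pending} }) {
        $self->{result} = [ $cb->(@{ $self->{result} }) ];
    }
    $self->{mode} = 'stopped';
    return @{ $self->{result} };
}

package main;

my $q = ToyLambda->new(sub { print "hello world!\n"; 1 });
my $before = $q->mode;     # 'passive': nothing printed yet
$q->wait;                  # prints here, then stops

my $r = ToyLambda->new(sub {
    my $self = shift;
    $self->later(sub { $_[0] + 1 });   # one pending "event"
    41;
});
$r->start;
my $during   = $r->mode;   # 'waiting': an event is still pending
my ($answer) = $r->wait;   # dispatches it: 41 + 1
```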
A lambda can listen to events by calling predicates, which internally subscribe the lambda object to the corresponding file handles, timers, and other lambdas. Only these three types of events exist, and together they constitute basically everything needed for building a state machine driven by external events, in particular by non-blocking I/O. Parameters are passed to predicates with an explicit context call, not by the perl subroutine call convention. In the example below, the lambda watches for file handle readability:
$q = lambda {
    context \*SOCKET;
    read { print "I'm readable!\n"; }
    # nothing is printed here yet
};
# and nothing is printed here yet either
Such a lambda, when started, switches to the waiting state and waits for the socket. The lambda finishes only after the callback associated with the read predicate is called.
Of course, new events can be created inside any callback, in any state. The state machine is thus built dynamically of sorts: it is not hard-coded in advance, but is built as the code that gets to each state is executed.
Events can be created either by explicitly calling predicates, or by restarting the last predicate with the again call. For example, the code
read { print 1; again if int rand 2 }
will print an indeterminate number of ones.
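Restarting can be pictured as a predicate that keeps re-arming the same callback for as long as the callback asks it to. In this hypothetical sketch, toy_again() plays the role of again and toy_read() the role of read, with simulated events instead of a real file handle; a counter replaces rand so that the run is repeatable.

```perl
use strict;
use warnings;

my $restart;                    # set by toy_again()

# Hypothetical stand-in for again(): ask the current predicate to re-arm.
sub toy_again { $restart = 1; return }

# Hypothetical predicate: fire $cb once per simulated event, re-arming it
# for as long as the callback calls toy_again().
sub toy_read {
    my $cb = shift;
    my @result;
    do {
        $restart = 0;
        @result  = $cb->();
    } while ($restart);
    return @result;
}

# Like the example above, but with a counter instead of rand(),
# so the number of ones is predictable: the callback restarts twice.
my $out   = '';
my $count = 0;
toy_read(sub { $out .= 1; toy_again() if ++$count < 3; $count });
```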
Contexts
Each lambda callback (further on, simply "lambda") executes in its own private context. Here, context means that all predicates register callbacks on an implicitly given lambda object, and keep the passed parameters on the context stack. The fact that the context is preserved between states helps to write terser code for a series of I/O calls:
context \*SOCKET;
write {
read {
}}
is actually a shorter form for
context \*SOCKET;
write {
context \*SOCKET; # <-- context here is retained from one frame up
read {
}}
And since the context is kept, the current lambda object, which is part of it, is kept as well. The code above is actually
my $self = this;
context \*SOCKET;
write {
this $self; # <-- object reference is retained here
context \*SOCKET;
read {
}}
this can be used if more than one lambda needs to be accessed, in which case
this $object;
context @context;
is the same as
this $object, @context;
which means that explicitly setting this always clears the context.
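The retention rule can be modelled with a package variable that each predicate snapshots when it is registered and restores before its callback runs. toy_context, toy_pred, and toy_run are hypothetical names; the sketch only shows why a predicate registered inside a callback sees the context of the frame above it, not how IO::Lambda stores contexts internally.

```perl
use strict;
use warnings;

our @CONTEXT;       # the "current context" of this toy model
my  @queue;         # registered [name, context snapshot, callback]

# With arguments, replace the current context; always return it.
sub toy_context { @CONTEXT = @_ if @_; return @CONTEXT }

# A predicate: snapshot the context active right now, run $cb later.
sub toy_pred {
    my ($name, $cb) = @_;
    push @queue, [ $name, [@CONTEXT], $cb ];
    return;
}

# Dispatcher: restore each snapshot before calling its callback, so that
# predicates registered inside the callback inherit it.
sub toy_run {
    my @trace;
    while (my $ev = shift @queue) {
        my ($name, $ctx, $cb) = @$ev;
        @CONTEXT = @$ctx;
        push @trace, "$name(@CONTEXT)";
        $cb->();
    }
    return @trace;
}

toy_context('SOCKET');
toy_pred(write => sub {
    toy_pred(read => sub { });   # no toy_context() call: inherits SOCKET
});
toy_context('OTHER');            # too late to affect the write above
my @trace = toy_run();
```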
Data and execution flow
A lambda is initially called with arguments passed from outside. These arguments can be stored using the call method; wait and tail also issue call internally, thus replacing any previous data stored by call. Inside the lambda these arguments are available as @_.
Whatever is returned by a predicate callback (including the lambda predicate) is passed as @_ to the next callback, or to the outside if the lambda is finished. The result of a finished lambda is available through the peek method, which returns either the whole array of data in list context, or the first item of the array otherwise. wait returns the same data as peek does.
When more than one lambda watches another lambda, the latter passes its last callback results to all the watchers. However, when a lambda creates more than one state deriving from the current state (a forking behaviour of sorts), each executed callback overwrites the stored results, and the next callback to run receives them, so constructions like
read { 1 + shift };
write { 2 + shift };
...
wait(0)
will eventually return 3, but whether it is computed as 1+2 or 2+1 is not known.
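A sketch of this chaining rule, assuming a dispatcher that stores each callback's return value and hands it as @_ to whichever callback fires next (the chain() helper is hypothetical). Running the two callbacks from the example in both orders shows why the final answer is 3 either way, while the intermediate result differs.

```perl
use strict;
use warnings;

# Hypothetical dispatcher: run callbacks in the given order; each one
# receives the previous result as @_. Returns the first value of every
# intermediate result, in order.
sub chain {
    my ($start, @callbacks) = @_;
    my @result = ($start);
    my @trace;
    for my $cb (@callbacks) {
        @result = $cb->(@result);
        push @trace, $result[0];
    }
    return @trace;
}

my $read  = sub { 1 + shift };
my $write = sub { 2 + shift };

my @read_first  = chain(0, $read,  $write);   # 1, then 1+2
my @write_first = chain(0, $write, $read);    # 2, then 2+1
```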
wait is not the only function that synchronises input and output data. The wait_for_all method waits for all lambdas, including the caller, to finish, and returns the collected results of all the objects in a single list. The wait_for_any method waits for at least one lambda from the list of passed lambdas (again, including the caller) to finish, and returns the list of finished objects as soon as possible.
Time
Timers and I/O timeouts can be given not only as timeout values, as is usual in event libraries, but also as deadlines in (fractional) seconds since the epoch. This decision, strange at first sight, actually helps a lot when the total execution time is to be tracked. For example, the following code reads as many bytes as it can from a socket within 5 seconds:
lambda {
    my $buf = '';
    context $socket, time + 5;
    read {
        if ( shift ) {
            return again if sysread $socket, $buf, 1024, length($buf);
        } else {
            print "oops! a timeout\n";
        }
        $buf;
    }
};
Rewriting the same code with read semantics that accept a relative timeout instead would be much less elegant:
lambda {
    my $buf = '';
    my $time_left = 5;
    my $now = time;
    context $socket, $time_left;
    read {
        if ( shift ) {
            if (sysread $socket, $buf, 1024, length($buf)) {
                $time_left -= (time - $now);
                $now = time;
                context $socket, $time_left;
                return again;
            }
        } else {
            print "oops! a timeout\n";
        }
        $buf;
    }
};
However, the exact opposite is true for sleep, where a relative timeout is usually the more natural form. The following two lines both sleep for 5 seconds:
lambda { context 5; sleep {} }
lambda { context time + 5; sleep {} }
Internally, timers use Time::HiRes::time, which gives a fractional number of seconds. The caller is not required to use it, however; with integer seconds, timeouts will simply be less precise and will jitter by plus or minus half a second.
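The deadline approach can be sketched with core Perl alone: compute one absolute deadline up front, and derive the remaining select() timeout from it on every iteration instead of decrementing a relative budget by hand, as the second example above has to. read_until_deadline is a hypothetical helper that blocks in IO::Select rather than running inside an event loop, and uses Time::HiRes::time for fractional seconds.

```perl
use strict;
use warnings;
use IO::Select;
use Time::HiRes qw(time);

# Hypothetical helper: read from $fh until EOF or until the absolute
# $deadline passes. Returns ($buffer, $timed_out).
sub read_until_deadline {
    my ($fh, $deadline) = @_;
    my $sel = IO::Select->new($fh);
    my $buf = '';
    while (1) {
        my $left = $deadline - time;       # derived, never accumulated
        return ($buf, 1) if $left <= 0;
        my @ready = $sel->can_read($left);
        return ($buf, 1) unless @ready;    # select() timed out
        my $n = sysread($fh, $buf, 1024, length $buf);
        return ($buf, 0) unless $n;        # EOF (0) or error (undef)
    }
}

pipe(my $r, my $w) or die "pipe: $!";
print {$w} "hello";
close $w;

my ($data, $timed_out) = read_until_deadline($r, time + 5);
```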
Predicates
All predicates receive their parameters from the context stack, or simply the context. The only parameter passed to them by a perl call is the callback itself. Predicates can also be called without a callback, in which case they pass on the data that would otherwise be passed as @_ to the callback. Thus, a predicate can be called either as
read { ... code ... }
or
&read(); # no callback
Predicates can be used either after explicit exporting:
use IO::Lambda qw(:lambda);
lambda { ... }
or by using the package syntax:
use IO::Lambda;
IO::Lambda::lambda { ... };
- lambda()
  Creates a new IO::Lambda object.
- io()
  Same as lambda.
- read($filehandle, $deadline = undef)
  Executes either when $filehandle becomes readable, or after $deadline. Passes one argument, which is either TRUE if the handle is readable, or FALSE if time has expired. If $deadline is undef, no timeout is registered, which means that the callback will never be called with FALSE.
- write($filehandle, $deadline = undef)
  Exactly the same as read, but executes when $filehandle becomes writable.
- readwrite($flags, $filehandle, $deadline = undef)
  Executes either when $filehandle satisfies any of the conditions in $flags, or after $deadline. $flags is a combination of three integer constants, IO_READ, IO_WRITE, and IO_EXCEPTION, that are imported with
      use IO::Lambda qw(:constants);
  Passes one argument, which is either a combination of the same IO_XXX flags, reporting which conditions the handle satisfied, or 0 if time has expired. If $deadline is undef, no timeout is registered, i.e. the callback will never receive 0.
- sleep($deadline)
  Executes after $deadline. $deadline cannot be undef.
- tail($lambda, @parameters)
  Issues $lambda-> call(@parameters), then waits for $lambda to complete.
- tails(@lambdas)
  Executes when all objects in @lambdas are finished; returns the collected, unordered results of the objects.
- tailo(@lambdas)
  Same as tails, but the results are ordered.
- again(@frame = ())
  Restarts the current state with the current context. All the predicates above, excluding lambda, are restartable with the again call (see start for restarting a lambda). The code
      context $obj1;
      tail {
          return if $null++;
          context $obj2;
          again;
      };
  is thus equivalent to
      context $obj1;
      tail {
          context $obj2;
          &tail();
      };
  again passes the current context to the predicate.
  If @frame is provided, it is treated as the result of a previous this_frame call. It contains data sufficient for restarting another call instead of the current one. See this_frame for details.
- context @ctx
  If called with no parameters, returns the current context; otherwise replaces the current context with @ctx. It is thus not possible (not that it is practical anyway) to clear the context with this call. If really needed, use the this(this) syntax.
- this $this, @ctx
  If called with no parameters, returns the current lambda. Otherwise, replaces both the current lambda and the current context. Can be useful either when juggling several lambdas, or as a convenience over my variables; for example,
      this lambda { ... };
      this-> wait;
  instead of
      my $q = lambda { ... };
      $q-> wait;
- this_frame(@frame)
  If called without parameters, returns the current callback frame, which can later be used in again. Otherwise, replaces the internal frame variables; this doesn't affect anything immediately, but will be used by again when it is called without parameters.
  This property is only used when the predicate inside which this_frame was fetched is restartable. Since it is not a requirement for a user-defined predicate to be restartable, this property is not universally useful.
  Example:
      context lambda { 1 };
      tail {
          return if 3 == shift;
          my @frame = this_frame;
          context lambda { 2 };
          tail {
              context lambda { 3 };
              again( @frame);
          }
      }
  The outermost tail callback will be called twice: the first time in the normal course of events, and the second time as a result of the again call. this_frame and again thus provide a kind of restartable continuation.
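A sketch of the difference between tails and tailo, assuming both simply count finished lambdas and fire once the last result arrives. make_collector and the simulated completion events are hypothetical, not IO::Lambda internals; the only difference between the two collectors is whether a result is stored in arrival order or in the slot of the lambda that produced it.

```perl
use strict;
use warnings;

# Hypothetical collector in the spirit of tails/tailo: it is told how many
# results to expect and calls $done once the last one has arrived.
sub make_collector {
    my ($expected, $ordered, $done) = @_;
    my (@results, $seen);
    return sub {
        my ($index, @value) = @_;
        if ($ordered) {
            $results[$index] = [@value];   # slot fixed by input position
        } else {
            push @results, [@value];       # arrival order
        }
        $done->(map { @$_ } @results) if ++$seen == $expected;
    };
}

my (@unordered, @ordered);
my $u = make_collector(3, 0, sub { @unordered = @_ });
my $o = make_collector(3, 1, sub { @ordered   = @_ });

# Simulated completions: lambda #1 finishes first, then #2, then #0.
for my $event ([1, 'b'], [2, 'c'], [0, 'a']) {
    $u->(@$event);
    $o->(@$event);
}
```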
Stream IO
The whole point of this module is to help build complex protocols in a clear, sequential programming style. Consider how perl's low-level sysread and syswrite relate to its higher-level readline, where the latter not only does the buffering, but also recognizes $/ as the input record separator. The section above described lower-level lambda I/O predicates that are only useful for sysread and syswrite; this section describes higher-level lambdas that relate to those low-level ones as the aforementioned readline relates to sysread.
All functions in this section return a lambda that does the actual work. Not unlike a class constructor that returns a newly created class instance, these functions return newly created lambdas. Therefore, these functions are documented here as having two sets of inputs and one output. For example, sysreader is a function that takes no parameters and always returns a new lambda, and this lambda, in turn, takes four parameters and returns two. This function is described as
# sysreader() :: ($fh,$buf,$length,$deadline) -> ($result,$error)
Since all stream I/O lambdas return the same set of scalars, the return type will further on be referred to as ioresult:
# ioresult :: ($result, $error)
# sysreader() :: ($fh,$buf,$length,$deadline) -> ioresult
The first scalar of ioresult is defined on success, and undefined otherwise. In the latter case, the second scalar contains the error, usually either $! or 'timeout' (if $deadline was set).
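As an IO::Lambda-free illustration of the ioresult convention, the hypothetical helper below performs one deadline-bounded sysread and reports the outcome as a ($result, $error) pair: the byte count on success, otherwise undef plus either $! or the string 'timeout'. Unlike a real sysreader lambda, it blocks in IO::Select instead of yielding to an event loop.

```perl
use strict;
use warnings;
use IO::Select;
use Time::HiRes qw(time);

# Hypothetical helper: one deadline-bounded read, reported ioresult-style.
# $deadline is absolute (seconds since the epoch), or undef for no timeout.
# Returns ($bytes_read, undef) on success (0 means EOF), or (undef, $error).
sub sysread_ioresult {
    my ($fh, $bufref, $length, $deadline) = @_;
    my $timeout;                           # undef: wait indefinitely
    if (defined $deadline) {
        $timeout = $deadline - time;
        $timeout = 0 if $timeout < 0;
    }
    my @ready = IO::Select->new($fh)->can_read($timeout);
    return (undef, 'timeout') unless @ready;
    my $n = sysread($fh, $$bufref, $length, length $$bufref);
    return defined $n ? ($n, undef) : (undef, "$!");
}

pipe(my $r, my $w) or die "pipe: $!";
print {$w} "abc";
close $w;
my $buf = '';
my ($result, $error) = sysread_ioresult($r, \$buf, 10, time + 5);
```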
- sysreader() :: ($fh, $buf, $length, $deadline) -> ioresult
  Creates a lambda that accepts all the parameters used by sysread (except $offset), plus $deadline. The lambda tries to read $length bytes from $fh into $buf when $fh becomes available for reading. If $deadline expires, it fails with the 'timeout' error. On a successful read it returns the number of bytes read; otherwise it fails with $!.
- syswriter() :: ($fh, $buf, $length, $offset, $deadline) -> ioresult
  Creates a lambda that accepts all the parameters used by syswrite, plus $deadline. The lambda tries to write $length bytes to $fh from $buf, starting at $offset, when $fh becomes available for writing. If $deadline expires, it fails with the 'timeout' error. On a successful write it returns the number of bytes written; otherwise it fails with $!.
- readbuf($reader = sysreader()) :: ($fh, $buf, $cond, $deadline) -> ioresult
  Creates a lambda that is able to perform buffered reads from $fh, using either a custom lambda $reader, or one newly generated by sysreader. When called, the lambda reads continually from $fh into $buf, and either fails on timeout, I/O error, or end of file, or succeeds when the $cond condition matches.
  The condition $cond is a "smart match" of sorts, and can be one of:
  - integer
    The lambda succeeds when exactly $cond bytes have been read from $fh.
  - regexp
    The lambda succeeds when $cond matches the content of $buf. Note that readbuf saves and restores the value of pos($$buf), so use of \G is encouraged here.
  - coderef :: ($buf -> BOOL)
    The lambda succeeds when the coderef, called with $buf, returns a true value.
  - undef
    The lambda succeeds on end of file. Note that for all other conditions, end of file is reported as an error, with the literal "eof" string.
- writebuf($writer) :: ($fh, $buf, $length, $offset, $deadline) -> ioresult
  Creates a lambda that is able to perform buffered writes to $fh, using either a custom lambda $writer, or one newly generated by syswriter. That lambda, in turn, continually writes $length bytes of $buf, starting at $offset, and either fails on timeout or I/O error, or succeeds when $length bytes have been written successfully.
- getline($reader) :: ($fh, $buf, $deadline) -> ioresult
  Same as readbuf, but succeeds when a string of bytes terminated by a newline has been read.
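To make the "smart match" rules concrete, here is a hypothetical matcher, cond_matches, written from the descriptions above rather than from IO::Lambda's source, together with a getline-style loop that feeds it simulated chunks. Three assumptions are flagged in the comments: the coderef receives a reference to the buffer, an integer condition is tested with >= (the real readbuf succeeds when exactly that many bytes are read), and the getline loop removes the returned line from the buffer.

```perl
use strict;
use warnings;

# Hypothetical matcher for readbuf-style conditions. $bufref is a
# reference to the buffer; $eof is true once the reader has hit EOF.
sub cond_matches {
    my ($cond, $bufref, $eof) = @_;
    return $eof ? 1 : 0 unless defined $cond;                # undef: wait for EOF
    return $cond->($bufref) ? 1 : 0 if ref $cond eq 'CODE';  # assumption: gets the ref
    return $$bufref =~ $cond ? 1 : 0 if ref $cond eq 'Regexp';
    return length($$bufref) >= $cond ? 1 : 0;                # assumption: >= count
}

# Hypothetical getline-style loop over simulated chunks: append chunks until
# the buffer holds a newline, then cut off the first line (assumption) and
# keep the rest. Running out of chunks first is reported as 'eof'.
sub toy_getline {
    my ($bufref, $chunks) = @_;
    until (cond_matches(qr/\n/, $bufref, !@$chunks)) {
        return (undef, 'eof') unless @$chunks;
        $$bufref .= shift @$chunks;
    }
    $$bufref =~ s/^(.*?\n)//s;
    my $line = $1;
    return ($line, undef);
}

my $buf    = '';
my @chunks = ("ab", "c\nde", "f\n");
my ($line1)       = toy_getline(\$buf, \@chunks);   # "abc\n"
my ($line2)       = toy_getline(\$buf, \@chunks);   # "def\n"
my ($line3, $err) = toy_getline(\$buf, \@chunks);   # undef, 'eof'
```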
Object API
This section lists the methods of the IO::Lambda class. Note that, by design, all lambda-style functionality is also available for object-style programming. Together with the fact that the lambda syntax is not exported by default, this leaves room for independent syntaxes, with or without lambdas, to be implemented on top of the object API without accessing the internals.
The object API is mostly targeted at developers who need to connect third-party asynchronous events with the lambda interface.
- new
  Creates a new IO::Lambda object in the passive state.
- watch_io($flags, $handle, $deadline, $callback)
  Registers an I/O event listener that calls $callback either when $handle satisfies the condition in $flags (a combination of the IO_READ, IO_WRITE, and IO_EXCEPTION bits), or after $deadline has passed. If $deadline is undef, the file handle is watched indefinitely.
  The callback is called with an integer set of IO_XXX flags as its first parameter, or 0 if it timed out. The other parameters, as with the other callbacks, are the result of the last called callback. The result of the callback is stored and passed on to the next callback.
- watch_timer($deadline, $callback)
  Registers a timer listener that calls $callback after $deadline.
- watch_lambda($lambda, $callback)
  Registers a listener that calls $callback after $lambda, an IO::Lambda object, is finished. If $lambda is in the passive state, it is started first.
- is_stopped
  Reports whether the lambda is stopped or not.
- is_waiting
  Reports whether the lambda has any registered callbacks left or not.
- is_passive
  Reports whether the lambda hasn't been run yet, either after new or after reset.
- is_active
  Reports whether the lambda has been run.
- reset
  Cancels all watchers and switches the lambda to the passive state. If there are any lambdas that watch this object, these are called first.
- peek
  At any given time, returns the stored data: either the data passed in by call, if the lambda is in the passive state, or the stored result of the latest executed callback.
- start
  Starts a passive lambda. Can be used to effectively restart the whole lambda; the only requirement is that the lambda has no pending events.
- call @args
  Stores @args internally, to be passed on to the first callback. Only works in the passive state; croaks otherwise. If called multiple times, arguments from the previous calls are overwritten.
- terminate @args
  Cancels all watchers and resets the lambda to the stopped state. If there are any lambdas that watch this object, these are notified first. @args is stored and available for later calls by peek.
- destroy
  Cancels all watchers and resets the lambda to the stopped state. Does the same, recursively, to all lambdas that the caller lambda watches. Useful where explicit, long-lived lambdas shouldn't be subject to global destruction, which kills objects in random order; destroy kills them in some order, at least.
- wait @args
  Waits for the caller lambda to finish; returns the result of peek. If the object was in the passive state, calls call(@args); otherwise @args are not used.
- wait_for_all @lambdas
  Waits for the caller lambda and @lambdas to finish. Returns the collection of peek results for all objects. The results are unordered.
- wait_for_any @lambdas
  Waits for at least one lambda, from the list of the caller lambda and @lambdas, to finish. Returns the list of finished objects.
- yield
  Runs one round of event dispatching. Returns 1 if there are more events in the internal queues, 0 otherwise.
- run
  Enters the event loop and doesn't exit until there are no registered events. Can also be called as a package method.
- bind @args
  Creates an event record that contains the lambda and @args, and returns it. The lambda won't finish until this event is returned with resolve. bind can be called several times on a single lambda; each event requires an individual resolve.
- resolve $event
  Removes $event from the internal waiting list. If the lambda has no more events to wait for, it notifies any lambdas that wait for it, and then stops.
  Note that resolve doesn't provide any means to call associated callbacks, which is intentional.
- override $PREDICATE [ $STATE ] $CODEREF
  Installs $CODEREF as an overriding hook for a predicate (tail, read, write, etc.), possibly with a named state. Whenever a lambda calls one of these predicates, the hook is called instead; it should be able to analyze the call and either pass it on or deny it further processing.
  $STATE, if omitted, is equivalent to '*', which means that checks on the lambda state are omitted too. Setting $STATE to undef is allowed, though, and matches when the lambda state is also undefined (which it is by default).
  More than one override handler can be stacked; if $CODEREF is undef, the last registered hook is removed.
  Example:
      my $q = lambda { ... tail { ... }};
      $q-> override( tail => sub {
          if ( stars_are_aligned()) { # hypothetical condition
              # pass
              this-> super;
          } else {
              # deny and rewrite result
              return tail { 'not right' }
          }
      });
  See also state and super.
- super
  Analogous to Perl's native SUPER, but on the predicate level: this method is to be called from overridden predicates to call the original predicate.
- state $STATE
  Helper function for explicit naming of predicate calls. The function stores the $STATE string on the current lambda, so that an eventual override that needs to hook internal states of the lambda can use the string to identify a particular state.
  The rule of thumb is to use it when a lambda contains more than one predicate of a certain type; for example, the code
      tail { tail { ... }}
  is better written as
      state A => tail { state B => tail { ... }}
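The bind/resolve contract amounts to counting outstanding events. The hypothetical ToyBinder class below models only that bookkeeping: one event record per bind, removal on resolve, and a notification followed by a stop once nothing is left. Like resolve itself, it never calls any callbacks associated with the events.

```perl
use strict;
use warnings;

package ToyBinder;

# Hypothetical model of bind/resolve bookkeeping; not IO::Lambda code.
sub new {
    my ($class, %opt) = @_;
    return bless { events => {}, next_id => 0, stopped => 0,
                   on_stop => $opt{on_stop} || sub {} }, $class;
}

# Create an event record holding the object and @args, and return it.
sub bind {
    my ($self, @args) = @_;
    my $event = { id => $self->{next_id}++, owner => $self, args => [@args] };
    $self->{events}{ $event->{id} } = 1;
    return $event;
}

# Drop $event from the waiting list; once none remain, notify, then stop.
sub resolve {
    my ($self, $event) = @_;
    delete $self->{events}{ $event->{id} } or die "unknown event\n";
    return if %{ $self->{events} };    # still waiting for other events
    $self->{on_stop}->($self);         # notify watchers first...
    $self->{stopped} = 1;              # ...then stop
    return;
}

sub is_stopped { $_[0]{stopped} }

package main;

my $notified = 0;
my $obj = ToyBinder->new(on_stop => sub { $notified++ });
my $e1  = $obj->bind('first');
my $e2  = $obj->bind('second');
$obj->resolve($e1);
my $after_one = $obj->is_stopped;      # still waiting for $e2
$obj->resolve($e2);
```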
SEE ALSO
The package contains backends for other libraries that benefit from asynchronous I/O, but does not necessarily list those libraries as explicit dependencies. If you need these backends, install their dependencies separately:
- IO::Lambda::SNMP requires SNMP.
- The pid lambda in IO::Lambda::Signal requires a functioning POSIX::waitpid.
- IO::Lambda::HTTP::Authen::NTLM requires Authen::NTLM.
BENCHMARKS
Single-process TCP client and server; the server echoes back everything sent by the client. 500 connections are created sequentially, each instructed to send a single line to the server and then destroyed.
                           2.4GHz x86-64 linux   1.2GHz win32
Lambda using select        0.694 sec             6.364 sec
Lambda using AnyEvent      0.684 sec             7.031 sec
Raw sockets using select   0.145 sec             4.141 sec
POE using select           5.349 sec             14.887 sec
LICENSE AND COPYRIGHT
Copyright (c) 2008 capmon ApS. All rights reserved.
This library is free software; you can redistribute it and/or modify it under the same terms as Perl itself.
AUTHOR
Dmitry Karasik, <dmitry@karasik.eu.org>.