NAME
IO::Lambda - non-blocking I/O in lambda style
DESCRIPTION
This module is another attempt to fight the horrors of non-blocking I/O programming. The simplicity of sequential programming is available only when one employs threads, coroutines, or coprocesses. Otherwise state machines have to be built, often quite complex ones, which does not help the clarity of the code. This module uses closures to achieve the clarity of sequential programming with single-process, single-thread, non-blocking I/O.
SYNOPSIS
Basics
Prerequisite
    use IO::Lambda qw(:all);
Create an empty IO::Lambda object
    my $q = lambda {};
Wait for it to finish
    $q-> wait;
Create lambda object and get its value
    $q = lambda { 42 };
    print $q-> wait; # will print 42
Create pipeline of two lambda objects
    $q = lambda {
        context lambda { 42 };
        tail { 1 + shift };
    };
    print $q-> wait; # will print 43
Create pipeline that waits for 2 lambdas
    $q = lambda {
        context lambda { 2 }, lambda { 3 };
        tail { sort @_ }; # order is not guaranteed
    };
    print $q-> wait; # will print 23
Non-blocking I/O
Given a request, open a socket and create a lambda that implements a minimal HTTP exchange
    sub talk
    {
        my $req    = shift;
        # host and port are taken from the URI object of the request
        my $socket = IO::Socket::INET-> new(
            PeerAddr => $req-> uri-> host,
            PeerPort => $req-> uri-> port,
        );
        lambda {
            context $socket;
            write {
                # connected
                print $socket "GET ", $req-> uri, "\r\n\r\n";
                my $buf = '';
                read {
                    sysread $socket, $buf, 1024, length($buf) or return $buf;
                    again; # wait for reading and re-do the block
                }
            }
        }
    }
Connect and talk to the remote
    $request = HTTP::Request-> new( GET => 'http://www.perl.com');
    my $q = talk( $request );
    print $q-> wait; # will print content of $buf
Connect two parallel connections: by explicitly waiting for each
    $q = lambda {
        context talk($request);
        tail { print shift };
        context talk($request2);
        tail { print shift };
    };
    $q-> wait;
Connect two parallel connections: by waiting for all
    $q = lambda {
        context talk($request1), talk($request2);
        tail { print for @_ };
    };
    $q-> wait;
Teach our simple HTTP request to follow redirects by wrapping talk(). talk_redirect() will have exactly the same properties as talk() does.
    sub talk_redirect
    {
        my $req = shift;
        lambda {
            context talk( $req);
            tail {
                my $res = HTTP::Response-> parse( shift );
                return $res unless $res-> code == 302;
                # the redirect target is carried in the Location header
                $req-> uri( $res-> header('Location'));
                context talk( $req);
                again;
            }
        }
    }
Working example
    use strict;
    use IO::Lambda qw(:all);
    use IO::Socket::INET;
    my $q = lambda {
        my ( $socket, $url) = @_;
        context $socket;
        write {
            print $socket "GET $url HTTP/1.0\r\n\r\n";
            my $buf = '';
            read {
                my $n = sysread( $socket, $buf, 1024, length($buf));
                return "read error:$!" unless defined $n;
                return $buf unless $n;
                again;
            }
        }
    };
    print $q-> wait(
        IO::Socket::INET-> new(
            PeerAddr => 'www.perl.com',
            PeerPort => 80
        ),
        '/index.html'
    );
See tests and examples in directory eg/ for more.
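The read callback in the working example separates three outcomes of sysread: an error, end of input, and more data to come. The following is a minimal core-Perl sketch of that decision only, run against a local pipe instead of a socket. The helper read_step and the pipe setup are assumptions made for illustration and are not part of IO::Lambda.

```perl
use strict;
use warnings;

# Hypothetical helper: one pass of the read loop from the working example.
sub read_step {
    my ( $fh, $bufref ) = @_;
    my $n = sysread( $fh, $$bufref, 1024, length $$bufref );
    return 'error' unless defined $n;    # sysread failed, details are in $!
    return 'eof'   unless $n;            # zero bytes: the peer closed the handle
    return 'again';                      # data arrived, keep reading
}

# A pipe stands in for the socket so that the sketch is self-contained.
pipe( my $r, my $w ) or die "pipe: $!";
syswrite $w, "hello";
close $w;

my $buf = '';
my @steps;
while (1) {
    push @steps, read_step( $r, \$buf );
    last if $steps[-1] ne 'again';       # stop on eof or error
}
print "@steps: $buf\n";
```

In the lambda version the 'again' outcome corresponds to calling again, and the other two outcomes correspond to returning from the callback.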
API
Events and states
A lambda is an IO::Lambda object that waits for IO and timeout events, and also for events generated when other lambdas finish. On each event, the callback bound to that event is executed. The result of the callback is saved and passed on to the next callback.
A lambda can be in one of three states: passive, waiting, and stopped. A lambda that has just been created, or was later reset with a reset call, is in the passive state. When it is started, the only callback associated with the lambda is executed:
    $q = lambda { print "hello world!\n" };
    # nothing is printed yet
A lambda is never started explicitly; wait starts a passive lambda and waits for the caller lambda to finish. A lambda is finished when there are no more events to listen to. The example lambda above finishes as soon as it is started.
A lambda can listen to events by calling predicates, which internally subscribe the lambda object to the corresponding file handles, timers, or other lambdas. These three types of events are all that is needed to build a state machine driven by non-blocking IO. Parameters to be passed to predicates are stored on the stack with a context call; for example, the following code listens for a file handle to become readable:
    $q = lambda {
        context \*SOCKET;
        read { print "I'm readable!\n"; }
        # nothing is printed yet
    };
    # nothing is printed here either
This lambda, when started, switches to the waiting state and waits for the socket. Only after the callback associated with read has been called does the lambda finish.
Of course, new events can be created inside any callback, in any state. In this way lambdas resemble dynamic programming: the state machine is not given in advance, but is built as the code that defines it is executed.
Events can be created either by explicitly calling predicates, or by restarting the last predicate with an again call. For example, for the code
    read { int(rand 2) ? 0 : again }
it is impossible to tell in advance how many times the callback will be called.
Contexts
Each lambda executes in its own private context. The context here means that all predicates register callbacks on an implicitly given lambda object, and retain the parameters passed to them further on. This makes it possible, for example, to rely on the context being preserved across a series of IO calls,
    context \*SOCKET;
    write {
    read {
    }}
which is actually a shorter form of
    context \*SOCKET;
    write {
    context \*SOCKET; # <-- context here is retained from one frame up
    read {
    }}
Just as the parameters to predicates are stored in the context, the current lambda object is implicitly stored in the this property. The above code is actually
    my $self = this;
    context \*SOCKET;
    write {
    this $self; # <-- object reference is retained here
    context \*SOCKET;
    read {
    }}
this can be used if more than one lambda needs to be accessed. In that case,
    this $object;
    context @context;
is the same as
    this $object, @context;
which means that explicitly setting this always clears the context.
Data and execution flow
A lambda's first callback is called with arguments passed from outside. These arguments can be stored using the call method; wait also issues call internally, thus replacing any data previously stored by call. Inside the first callback these arguments are available as @_.
Whatever is returned by a predicate callback (including the lambda predicate) is passed as @_ to the next callback, or to the outside if the lambda is finished. The result of a finished lambda is available through the peek method, which returns the whole array of data in list context, or the first item of the array otherwise. wait returns the same data as peek does.
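The context-dependent return value described for peek and wait can be pictured with a small core-Perl sketch. The function peek_like is a hypothetical stand-in and not part of IO::Lambda; it only assumes that the stored results form a plain list.

```perl
use strict;
use warnings;

# Hypothetical stand-in for the documented behaviour of peek():
# the whole list in list context, the first item otherwise.
sub peek_like {
    my @stored = @_;    # pretend these are the stored callback results
    return wantarray ? @stored : $stored[0];
}

my @all   = peek_like( 2, 3 );    # list context: both items
my $first = peek_like( 2, 3 );    # scalar context: first item only
print "list: @all, scalar: $first\n";
```

This is also why print $q-> wait in the synopsis prints every returned item: print evaluates its arguments in list context.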
When more than one lambda watches another lambda, the latter passes the results of its last callback to the callbacks of all the watchers. However, when a lambda creates more than one state deriving from the current state, a 'forking' of sorts, the latest stored results get overwritten by whichever callback executes first, so constructions like
    read { 1 + shift };
    write { 2 + shift };
    ...
    wait(0)
will eventually return 3, but whether it is computed as 1+2 or 2+1 is never guaranteed.
wait is not the only function that synchronizes input and output. The wait_for_all method waits for all lambdas, including the caller, to finish. It returns the collected results of all the objects as a single list. The wait_for_any method waits for at least one lambda from the list of passed lambdas (again, including the caller) to finish. It returns the list of finished objects as soon as possible.
Time
Timers and I/O timeouts are given not as timeout values, as is usual in event libraries, but as deadlines in (fractional) seconds. This decision, strange at first sight, actually helps a lot when total execution time is to be tracked. For example, the following code reads as many bytes as possible from a socket within 5 seconds:
    lambda {
        my $buf = '';
        context $socket, time + 5;
        read {
            if ( shift ) {
                return again if sysread $socket, $buf, 1024, length($buf);
            } else {
                print "oops! a timeout\n";
            }
            $buf;
        }
    };
Internally, timers use Time::HiRes::time, which gives fractional seconds. However, the caller is not required to use it; if deadlines are computed with the built-in time, they are simply rounded to whole seconds.
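The advantage of deadlines over timeouts can be sketched in core Perl: a deadline is computed once, and every wait derives its own relative timeout from whatever is left of the budget. The helper remaining below is hypothetical and not part of IO::Lambda, and the 0.3 second budget and 0.1 second slices are arbitrary values chosen for illustration.

```perl
use strict;
use warnings;
use Time::HiRes qw(time);

# Hypothetical helper, not part of IO::Lambda: turn an absolute deadline
# into the relative timeout that select() expects.
sub remaining {
    my ($deadline) = @_;
    return undef unless defined $deadline;    # no deadline: wait forever
    my $left = $deadline - time;
    return $left > 0 ? $left : 0;             # never negative
}

my $deadline = time + 0.3;    # one budget for the whole loop
my $rounds   = 0;
while ( ( my $left = remaining($deadline) ) > 0 ) {
    # Wait in short slices; each slice uses only what is left of the budget.
    select( undef, undef, undef, $left < 0.1 ? $left : 0.1 );
    $rounds++;
}
print "budget spent after $rounds rounds\n";
```

With plain timeouts, each of those waits would have to subtract the elapsed time by hand to keep the total within the budget.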
Predicates
All predicates read their parameters from the context. The only parameter passed with the perl call is a callback. Predicates can be called without a callback, in which case they simply pass along the data that would otherwise be passed as @_ to the callback. So, a predicate can be called either as
    read { .. code ... }
or
    &read; # no callback
Predicates can be used only after they have been explicitly exported by
    use IO::Lambda qw(:all);
- lambda()
  Creates a new IO::Lambda object.
- read($filehandle, $deadline = undef)
  Executes either when $filehandle becomes readable, or after $deadline. Passes one argument, which is either TRUE if the handle is readable, or FALSE if the time has expired. If $deadline is undef, no timeout is registered, i.e. the callback will never be executed with FALSE.
- write($filehandle, $deadline = undef)
  Exactly the same as read, but executes when $filehandle becomes writable.
- io($flags, $filehandle, $deadline = undef)
  Executes either when $filehandle satisfies the condition passed in $flags, or after $deadline. $flags is a combination of three integer constants, IO_READ, IO_WRITE, and IO_EXCEPTION, that are imported with
      use IO::Lambda qw(:constants);
  Passes one argument, which is either a combination of the same IO_XXX flags, showing which conditions the handle satisfies, or 0 if the time has expired. If $deadline is undef, no timeout is registered, i.e. the callback will never be executed with 0.
- sleep($deadline)
  Executes after $deadline. $deadline cannot be undef.
- tail(@lambdas)
  Executes when all objects in @lambdas are finished, and passes the collected results of the lambdas to the callback. The result order is not guaranteed.
- again()
  Restarts the current state with the current context. All the predicates above, including lambda, are restartable. The code
      context $obj1;
      tail {
          return if $null++;
          context $obj2;
          again;
      };
  is thus equivalent to
      context $obj1;
      tail {
          context $obj2;
          &tail;
      };
- context @ctx
  If called with no parameters, returns the current context; otherwise replaces the current context with @ctx. It is thus not possible (not that it is practical anyway) to clear the context with this call. If really needed, use the this(this) syntax.
- this $this, @ctx
  If called with no parameters, returns the current lambda object. Otherwise, replaces both the current lambda and the current context. Can be useful either when juggling several lambdas, or as a convenience over my variables, for example,
      this lambda { ... };
      this-> wait;
  instead of
      my $q = lambda { ... };
      $q-> wait;
- restart $method, $callback
  The predicate used for declaring user-defined restartable predicates (a predicate is not required to be restartable, though).
  This predicate is mostly for internal use, and it is not really decided yet whether it will stay. See the module source code for usage details.
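The single argument that an io callback receives, a combination of IO_XXX flags or 0 on timeout, can be decoded with ordinary bit tests. The sketch below is core Perl only. The numeric values given to the constants are stand-ins assumed for illustration, and describe_flags is a hypothetical helper; real code should import the constants from IO::Lambda rather than define them.

```perl
use strict;
use warnings;

# Stand-in values, assumed for illustration only; real code should import
# the constants with: use IO::Lambda qw(:constants);
use constant {
    IO_READ      => 4,
    IO_WRITE     => 2,
    IO_EXCEPTION => 1,
};

# Hypothetical helper: describe the argument passed to an io callback.
sub describe_flags {
    my ($flags) = @_;
    return 'timeout' unless $flags;    # 0 means the deadline has passed
    my @names;
    push @names, 'readable'  if $flags & IO_READ;
    push @names, 'writable'  if $flags & IO_WRITE;
    push @names, 'exception' if $flags & IO_EXCEPTION;
    return join '+', @names;
}

print describe_flags( IO_READ | IO_WRITE ), "\n";
print describe_flags(0), "\n";
```

The same test applies to the first parameter of a watch_io callback, which is documented to carry the same set of flags.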
Object API
This section lists the methods of the IO::Lambda class. Note that all lambda-style functionality is, by design, also available for object-style programming; not that it looks nice or has much practical value, but still.
- new
  Creates a new IO::Lambda object in the passive state.
- watch_io($flags, $handle, $deadline, $callback)
  Registers an IO event listener that calls $callback either after $handle satisfies the condition given in $flags (a combination of the IO_READ, IO_WRITE, and IO_EXCEPTION bits), or after the $deadline time has passed. If $deadline is undef, the file handle is watched indefinitely.
  The callback is called with an integer set of IO_XXX flags as its first parameter, or 0 on timeout. The other parameters, as with the other callbacks, are the result of the last called callback. The result of the callback is stored and passed on to the next callback.
- watch_timer($deadline, $callback)
  Registers a timer listener that calls $callback after the $deadline time.
- watch_lambda($lambda, $callback)
  Registers a listener that calls $callback after $lambda, an IO::Lambda object, is finished. If $lambda is in the passive state, it is started first.
- is_stopped
  Reports whether the lambda is stopped or not.
- is_waiting
  Reports whether the lambda has any registered callbacks left or not.
- is_passive
  Reports whether the lambda has not been run yet, either after new or after reset.
- is_active
  Reports whether the lambda has been run.
- reset
  Cancels all watchers and puts the lambda back into the passive state. If there are any lambdas that watch this object, these are called first.
- peek
  At any given time, returns the stored data, which are either the arguments passed in by call if the lambda is in the passive state, or the stored result of the latest executed callback.
- call @args
  Stores @args internally to be passed to the first callback. Works only in the passive state, croaks otherwise. If called multiple times, arguments from the previous calls are discarded.
- terminate @args
  Cancels all watchers and puts the lambda into the stopped state. If there are any lambdas that watch this object, these are called first. @args are stored and available to later calls of peek.
- wait @args
  Waits for the caller lambda to finish and returns the result of peek. If the object was in the passive state, calls call(@args); otherwise @args are not used.
- wait_for_all @lambdas
  Waits for the caller lambda and @lambdas to finish. Returns the collection of peek results for all objects. The returned results are unordered.
- wait_for_any @lambdas
  Waits for at least one lambda from the list of the caller lambda and @lambdas to finish. Returns the list of finished objects.
- run
  Enters the event loop and does not exit until there are no registered events left. Can also be called as a package method.
SEE ALSO
LICENSE AND COPYRIGHT
Copyright (c) 2007 capmon ApS. All rights reserved.
This library is free software; you can redistribute it and/or modify it under the same terms as Perl itself.
AUTHOR
Dmitry Karasik, <dmitry@karasik.eu.org>.