NAME

IO::Lambda - non-blocking I/O in lambda style

DESCRIPTION

This module is another attempt to fight the horrors of non-blocking I/O programming. The simplicity of sequential programming is normally available only with threads, coroutines, or coprocesses. Otherwise one has to build state machines, often quite complex ones, which does not help the clarity of the code. This module uses closures to achieve the clarity of sequential programming with single-process, single-thread, non-blocking I/O.

SYNOPSIS

Basics

Prerequisite

use IO::Lambda qw(:all);

Create an empty IO::Lambda object

my $q = lambda {};

Wait for it to finish

$q-> wait;

Create lambda object and get its value

$q = lambda { 42 };
print $q-> wait; # will print 42

Create pipeline of two lambda objects

    $q = lambda {
        context lambda { 42 };
        tail { 1 + shift };
    };
    print $q-> wait; # will print 43

Create pipeline that waits for 2 lambdas

    $q = lambda {
        context lambda { 2 }, lambda { 3 };
        tail { sort @_ }; # order is not guaranteed
    };
    print $q-> wait; # will print 23

Non-blocking I/O

Given a socket, create a lambda that implements http protocol

    sub talk
    {
        my $req    = shift;
        # host and port live on the URI object of an HTTP::Request
        my $socket = IO::Socket::INET-> new(
            PeerAddr => $req-> uri-> host,
            PeerPort => $req-> uri-> port,
        );

        lambda {
            context $socket;
            write {
                # connected
                print $socket "GET ", $req-> uri, "\r\n\r\n";
                my $buf = '';
                read {
                    sysread $socket, $buf, 1024, length($buf) or return $buf;
                    again; # wait for reading and re-do the block
                }
            }
        }
    }

Connect and talk to the remote

$request = HTTP::Request-> new( GET => 'http://www.perl.com');

my $q = talk( $request );
print $q-> wait; # will print content of $buf

Connect two parallel connections: by explicitly waiting for each

    $q = lambda {
        context talk($request1);
        tail { print shift };
        context talk($request2);
        tail { print shift };
    };
    $q-> wait;

Connect two parallel connections: by waiting for all

    $q = lambda {
        context talk($request1), talk($request2);
        tail { print for @_ };
    };
    $q-> wait;

Teach our simple http request to follow redirects by wrapping talk(). talk_redirect() will have exactly the same properties as talk() does:

    sub talk_redirect
    {
        my $req = shift;
        lambda {
            context talk( $req);
            tail {
                my $res = HTTP::Response-> parse( shift );
                return $res unless $res-> code == 302;

                # the redirect target is carried in the Location header
                $req-> uri( $res-> header('Location'));
                context talk( $req);
                again;
            }
        }
    }

Working example

    use strict;
    use IO::Lambda qw(:all);
    use IO::Socket::INET;
    my $q = lambda {
        my ( $socket, $url) = @_;
        context $socket;
        write {
            print $socket "GET $url HTTP/1.0\r\n\r\n";
            my $buf = '';
            read {
                my $n = sysread( $socket, $buf, 1024, length($buf));
                return "read error:$!" unless defined $n;
                return $buf unless $n;
                again;
            }
        }
    };
    print $q-> wait( 
        IO::Socket::INET-> new( 
            PeerAddr => 'www.perl.com', 
            PeerPort => 80 
        ),
        '/index.html'
    );

See tests and examples in directory eg/ for more.

API

Events and states

A lambda is an IO::Lambda object that waits for IO and timeout events, and for events generated when other lambdas are finished. On each such event a callback is executed. The result of the execution is saved and passed on to the next callback when the next event arrives.

The life cycle of a lambda goes through three states: passive, waiting, and stopped. A lambda that has just been created, or was later reset with the reset call, is in the passive state. When the lambda is started, the only callback associated with the lambda will be executed:

$q = lambda { print "hello world!\n" };
# nothing is printed yet
$q-> wait; # <- printed here

Lambdas are never started explicitly; wait will start a passive lambda, and will wait for the caller lambda to finish. A lambda is finished when there are no more events to listen to. The example lambda above will finish right after the print statement.

A lambda can listen to events by calling predicates, which internally subscribe the lambda object to the corresponding file handles, timers, and other lambdas. These three types of events are basically all that is needed to build a state machine driven by external events, in particular by non-blocking I/O. Parameters are passed to predicates with an explicit context call, not by the perl subroutine call convention. In the example below, a lambda watches for file handle readability:

    $q = lambda {
        context \*SOCKET;
        read { print "I'm readable!\n"; }
        # nothing is printed here yet
    };
    # and nothing is printed here either

Such a lambda, when started, will switch to the waiting state, that is, it will wait for the socket. The lambda will finish only after the callback associated with the read predicate is called.

Of course, new events can be created inside any callback, in any state. This style resembles dynamic programming of sorts: the state machine is not hard-coded in advance, but is built as the code that declares it gets executed.

The events can be created either by explicitly calling predicates, or by restarting the last predicate with again call. For example, code

read { int(rand 2) ? print 1 : again }

will print an indeterminate number of ones.

Contexts

Each lambda callback (further on, merely lambda) executes in its own, private context. The context here means that all predicates register callbacks on an implicitly given lambda object, and keep the passed parameters on the context stack. The fact that the context is preserved between states helps build terser code with a series of IO calls:

context \*SOCKET;
write {
read {
}}

is actually a shorter form for

context \*SOCKET;
write {
context \*SOCKET; # <-- context here is retained from one frame up
read {
}}

Just as the context is kept, so is the current lambda object, which is stored in the this property. The code above is actually

my $self = this;
context \*SOCKET;
write {
this $self;      # <-- object reference is retained here
context \*SOCKET;
read {
}}

this can be used if more than one lambda needs to be accessed. In which case,

this $object;
context @context;

is the same as

this $object, @context;

which means that explicitly setting this will always clear the context.

Data and execution flow

A lambda is initially called with arguments passed from outside. These arguments can be stored using the call method; wait also issues call internally, thus replacing any previous data stored by call. Inside the lambda these arguments are available as @_.

Whatever is returned by a predicate callback (including the lambda predicate) will be passed as @_ to the next callback, or to the outside if the lambda is finished. The result of a finished lambda is available through the peek method, which returns the whole array of data in list context, or the first item otherwise. wait returns the same data as peek does.
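The list-versus-scalar rule matters whenever a result is printed or assigned. The sketch below does not use IO::Lambda at all; result_like is a hypothetical stand-in that only mimics the documented rule with wantarray, assuming a stored result of (42, 43).

```perl
use strict;
use warnings;

# Hypothetical stand-in for peek/wait (not part of IO::Lambda):
# all items in list context, only the first item otherwise.
sub result_like {
    my @stored = (42, 43);    # assumed result of the last callback
    return wantarray ? @stored : $stored[0];
}

my @all   = result_like();    # list context: every stored item
my $first = result_like();    # scalar context: first item only

print "list: @all\n";
print "scalar: $first\n";
```

Note that print imposes list context on its arguments, so a statement such as print $q-> wait would receive every stored item.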

When more than one lambda watches for another lambda, the latter will get its last callback results passed to all the watchers. However, when a lambda creates more than one state that derive from the current state, a forking behaviour of sorts, the latest stored results will get overwritten by the first executed callback, so constructions like

read  { 1 + shift };
write { 2 + shift };
...
wait(0)

will eventually return 3, but whether it will be 1+2 or 2+1, is not known.
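Why the total is 3 in either case can be checked without any I/O. The following is a toy model only, assuming that each callback receives the previously stored result and that the initial stored value is 0; run_in_order is a hypothetical helper and not part of the module.

```perl
use strict;
use warnings;

# Hypothetical helper: feed the stored result through the callbacks,
# in the order in which their events happen to fire.
sub run_in_order {
    my ($stored, @callbacks) = @_;
    $stored = $_->($stored) for @callbacks;
    return $stored;
}

my $on_read  = sub { 1 + shift };
my $on_write = sub { 2 + shift };

my $read_first  = run_in_order(0, $on_read,  $on_write);   # 2 + (1 + 0)
my $write_first = run_in_order(0, $on_write, $on_read);    # 1 + (2 + 0)

print "$read_first $write_first\n";
```

Addition is commutative, so both orders agree here; callbacks that are not commutative would produce order-dependent results.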

wait is not the only function that synchronizes input and output data. wait_for_all method waits for all lambdas, including the caller, to finish. It returns collected results of all the objects in a single list. wait_for_any method waits for at least one lambda, from the list of passed lambdas (again, including the caller), to finish. It returns list of finished objects as soon as possible.

Time

Timers and I/O timeouts are given not as timeout values, as is usual in event libraries, but as deadlines in (fractional) seconds since the epoch. This decision, strange at first sight, actually helps a lot when the total execution time is to be tracked. For example, the following code reads as many bytes as possible from a socket within 5 seconds:

    lambda {
        my $buf = '';
        context $socket, time + 5;
        read {
            if ( shift ) {
                return again if sysread $socket, $buf, 1024, length($buf);
            } else {
                print "oops! a timeout\n";
            }
            $buf;
        }
    };

Rewriting it with read semantics that accepts time as timeout instead, is left as an exercise to the reader.

Internally, timers use Time::HiRes::time, which returns a fractional number of seconds. Callers are not required to do the same: deadlines computed from the integer built-in time work too, but such timeouts will be less precise, and will jitter by about half a second either way.
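For comparison, here is how the same deadline idea looks in a plain blocking loop built on the core modules IO::Select and Time::HiRes. It is only a sketch and does not use IO::Lambda; read_until is a hypothetical helper, and a pipe stands in for the socket. With timeout-based calls the remaining time has to be recomputed on every pass, whereas the deadline itself never changes.

```perl
use strict;
use warnings;
use IO::Select;
use Time::HiRes qw(time);

# Hypothetical helper: read from $fh until EOF or until the absolute
# $deadline (seconds since the epoch) has passed.
sub read_until {
    my ($fh, $deadline) = @_;
    my $buf = '';
    my $sel = IO::Select->new($fh);
    while (1) {
        my $left = $deadline - time();      # remaining time shrinks on each pass
        last if $left <= 0;                 # deadline reached
        my @ready = $sel->can_read($left);  # select() wants a timeout, not a deadline
        next unless @ready;                 # nothing yet: re-check the deadline
        my $n = sysread $fh, $buf, 1024, length $buf;
        last unless $n;                     # 0 is EOF, undef is an error
    }
    return $buf;
}

# A pipe stands in for the socket so the sketch needs no network.
pipe(my $reader, my $writer) or die "pipe: $!";
syswrite $writer, "hello";
close $writer;

my $data = read_until($reader, time() + 5);
print "got: $data\n";
```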

Predicates

All predicates receive their parameters from the context stack, or simply the context. The only parameter passed to them by a regular perl call is the callback itself. Predicates can also be called without a callback, in which case they simply pass on the data that would otherwise be passed as @_ to the callback. Thus, a predicate can be called either as

read { .. code ... }

or

&read; # no callback

Predicates can either be used after explicit exporting of them by

use IO::Lambda qw(:all);
lambda { ... }

or by using the package syntax,

use IO::Lambda;
IO::Lambda::lambda { ... };

lambda()

Creates a new IO::Lambda object.

read($filehandle, $deadline = undef)

Executes either when $filehandle becomes readable, or after $deadline. Passes one argument to the callback, which is TRUE if the handle is readable, or FALSE if the time has expired. If $deadline is undef, no timeout is registered, so the callback will never be executed with FALSE.

getline($filehandle, $$buffer, $deadline = undef)

Executes either when a line can be read from $filehandle, or after $deadline if the latter is defined. On success, returns the line read. Otherwise, returns undef and line describing the reason: either timeout, eof, or $! value.

Note: buffer must be shared for all $filehandle operations.

This is a higher-level predicate that is itself built from other predicates. You're welcome to study its source code.
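The reason the buffer must be shared can be seen in a small model of line extraction. This is not the module's source code; take_line is a hypothetical helper that removes one complete line from a buffer and leaves any partial data behind for the next read on the same handle.

```perl
use strict;
use warnings;

# Hypothetical helper: cut the first complete line out of the buffer,
# or return undef if no full line has arrived yet.
sub take_line {
    my ($bufref) = @_;
    return $$bufref =~ s/\A([^\n]*\n)// ? $1 : undef;
}

my $buffer = '';

$buffer .= "first li";                 # first chunk read from the handle
my $early = take_line(\$buffer);       # undef: the line is incomplete

$buffer .= "ne\nsecond line\nthi";     # second chunk completes two lines
my $line1 = take_line(\$buffer);
my $line2 = take_line(\$buffer);

print $line1, $line2;
print "left over: $buffer\n";          # partial third line stays buffered
```

If each operation used its own buffer, the leftover bytes of the partial third line would be lost.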

write($filehandle, $deadline = undef)

Exactly the same as read, but executes when $filehandle becomes writable.

io($flags, $filehandle, $deadline = undef)

Executes either when $filehandle satisfies any of the conditions in $flags, or after $deadline. $flags is a combination of three integer constants, IO_READ, IO_WRITE, and IO_EXCEPTION, which are imported with

use IO::Lambda qw(:constants);

Passes one argument, which is either a combination of the same IO_XXX flags reporting which conditions the handle satisfied, or 0 if the time has expired. If $deadline is undef, no timeout is registered, so the callback will never receive 0.
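Combining and testing such flags is ordinary bit arithmetic. The sketch below is self-contained, so it defines its own constants; the numeric values are placeholders chosen for illustration and are not necessarily the ones the module uses. Real code would import the constants as shown above. describe_flags is a hypothetical helper.

```perl
use strict;
use warnings;

# Placeholder values for illustration only (assumption); real code
# would get these names from IO::Lambda's :constants export tag.
use constant {
    IO_READ      => 4,
    IO_WRITE     => 2,
    IO_EXCEPTION => 1,
};

# Hypothetical helper: turn the argument an io callback receives
# into a readable description.
sub describe_flags {
    my ($flags) = @_;
    return 'timeout' unless $flags;    # 0 means the deadline expired
    my @names;
    push @names, 'read'      if $flags & IO_READ;
    push @names, 'write'     if $flags & IO_WRITE;
    push @names, 'exception' if $flags & IO_EXCEPTION;
    return join ',', @names;
}

my $requested = IO_READ | IO_WRITE;    # watch for either condition

print describe_flags($requested), "\n";
print describe_flags(0), "\n";
```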

sleep($deadline)

Executes after $deadline. $deadline cannot be undef.

tail(@lambdas)

Executes when all objects in @lambdas are finished, returns the collected, unordered results of the objects.

again()

Restarts the current state with the current context. All the predicates above, including lambda, are restartable. The code

context $obj1;
tail {
    return if $null++;
    context $obj2;
    again;
};

is thus equivalent to

context $obj1;
tail {
    context $obj2;
    &tail;
};

again passes the current context to the predicate.

context @ctx

If called with no parameters, returns the current context, otherwise replaces the current context with @ctx. It is thus not possible (not that it is practical anyway) to clear the context with this call. If really needed, use this(this) syntax.

this $this, @ctx

If called with no parameters, returns the current lambda. Otherwise, replaces both the current lambda and the current context. Can be useful either when juggling several lambdas, or as a convenience over my variables, for example,

this lambda { ... };
this-> wait;

instead of

my $q = lambda { ... };
$q-> wait;

restart $method, $callback

The predicate used for declaration of user-defined restartable predicates (it is not a requirement for a predicate to be restartable though).

This predicate is mostly for internal use, and it is not really decided yet if it will stay. See the module source code for details of the use.

Object API

This section lists methods of IO::Lambda class. Note that all lambda-style functionality is also available for object-style programming by design. Together with the fact that lambda syntax is not exported by default, it thus leaves a place for possible implementations of independent syntaxes, either with or without lambdas, on top of the object API, without accessing the internals.

new

Creates new IO::Lambda object in passive state.

watch_io($flags, $handle, $deadline, $callback)

Registers an IO event listener that will call $callback either after $handle satisfies the condition in $flags (a combination of IO_READ, IO_WRITE, and IO_EXCEPTION bits), or after the $deadline time has passed. If $deadline is undef, the file handle will be watched indefinitely.

The callback will be called with the first parameter being an integer set of IO_XXX flags, or 0 if timed out. The remaining parameters, as with other callbacks, will be the result of the last called callback. The result of the callback will be stored and passed on to the next callback.

watch_timer($deadline, $callback)

Registers a timer listener that will call $callback after $deadline time.

watch_lambda($lambda, $callback)

Registers a listener that will call $callback after $lambda, an IO::Lambda object, is finished. If $lambda is in the passive state, it will be started first.

is_stopped

Reports whether lambda is stopped or not.

is_waiting

Reports whether lambda has any registered callbacks left or not.

is_passive

Reports whether the lambda has not been run yet, that is, whether it is fresh after new or reset.

is_active

Reports if lambda was run.

reset

Cancels all watchers and switches the lambda to the passive state. If there are any lambdas that watch for this object, these will be called first.

peek

At any given time, returns stored data that are either passed in by call if the lambda is in the passive state, or stored result of execution of the latest callback.

call @args

Stores @args internally, to be passed on to the first callback. Only works in passive state, croaks otherwise. If called multiple times, arguments from the previous calls are overwritten.

terminate @args

Cancels all watchers and switches the lambda to the stopped state. If there are any lambdas that watch for this object, these will be called first. @args will be stored and available for later calls by peek.

wait @args

Waits for the caller lambda to finish, returns the result of peek. If the object was in passive state, calls call(@args), otherwise @args are not used.

wait_for_all @lambdas

Waits for caller lambda and @lambdas to finish. Returns collection of peek results for all objects. The return results are unordered.

wait_for_any @lambdas

Waits for at least one lambda from list of caller lambda and @lambdas to finish. Returns list of finished objects.

run

Enters the event loop and doesn't exit until there are no registered events. Can also be called as a package method.

bind @args

Creates an event record that contains the lambda and @args, and returns it. The lambda won't finish until this event is returned with resolve.

bind can be called several times on a single lambda; each event requires individual resolve.

resolve $event

Removes $event from the internal waiting list. If the lambda has no more events to wait for, notifies any lambdas that wait for the object, and then stops.

Note that resolve doesn't provide any means to call associated callbacks, which is intentional.

SEE ALSO

Coro, threads, POE.

LICENSE AND COPYRIGHT

Copyright (c) 2007 capmon ApS. All rights reserved.

This library is free software; you can redistribute it and/or modify it under the same terms as Perl itself.

AUTHOR

Dmitry Karasik, <dmitry@karasik.eu.org>.