NAME
AnyEvent::Task - Client/server-based asynchronous worker pool
SYNOPSIS 1: PASSWORD HASHING
Server
use AnyEvent::Task;
use Authen::Passphrase::BlowfishCrypt;
my $dev_urandom;
my $server = AnyEvent::Task::Server->new(
listen => ['unix/', '/tmp/anyevent-task.socket'],
setup => sub {
open($dev_urandom, '<', '/dev/urandom') || die "open urandom: $!";
},
interface => {
hash => sub {
my ($plaintext) = @_;
read($dev_urandom, my $salt, 16) == 16 || die "bad read from urandom";
return Authen::Passphrase::BlowfishCrypt->new(cost => 10,
salt => $salt,
passphrase => $plaintext)
->as_crypt;
},
verify => sub {
my ($crypted, $plaintext) = @_;
return Authen::Passphrase::BlowfishCrypt->from_crypt($crypted)
->match($plaintext);
},
},
);
$server->run; # or AE::cv->recv
Client
use AnyEvent::Task::Client;
my $client = AnyEvent::Task::Client->new(
connect => ['unix/', '/tmp/anyevent-task.socket'],
);
my $checkout = $client->checkout( timeout => 5, );
$checkout->hash('secret',
sub {
my ($checkout, $crypted) = @_;
die "hashing process died: $@" if defined $@;
print "Hashed password is $crypted\n";
$checkout->verify($crypted,
sub {
my ($checkout, $result) = @_;
print "Verify result is $result\n";
});
});
SYNOPSIS 2: DBI
Server
use AnyEvent::Task::Server;
use DBI;
my $dbh;
my $server = AnyEvent::Task::Server->new(
listen => ['unix/', '/tmp/anyevent-task.socket'],
setup => sub {
$dbh = DBI->connect(...);
},
interface => sub {
my ($method, @args) = @_;
$args[0] = $dbh->prepare_cached($args[0]) if defined $args[0];
$dbh->$method(@args);
},
);
$server->run; # or AE::cv->recv
Client
use AnyEvent::Task::Client;
my $dbh_pool = AnyEvent::Task::Client->new(
connect => ['unix/', '/tmp/anyevent-task.socket'],
);
my $username = 'jimmy';
my $dbh = $dbh_pool->checkout;
$dbh->selectrow_hashref(q{ SELECT email FROM user WHERE username = ? },
undef, $username,
sub {
my ($dbh, $row) = @_;
die "DB lookup failed: $@" if defined $@;
print "User's email is $row->{email}\n";
## Use same $dbh here if using transactions
});
DESCRIPTION
WARNING: This module's API may change without warning. Also, the docs
are somewhat incomplete and out of date. I will be fixing this soonish.
The synopsis makes this module sound much more complicated than it
actually is. AnyEvent::Task is a fork-on-demand but persistent-worker
server (AnyEvent::Task::Server) combined with an asynchronous interface
to a request queue and pooled-worker client (AnyEvent::Task::Client).
Both client and server are of course built with AnyEvent because it's
awesome. However, workers can't use AnyEvent (yet).
A server is started with "AnyEvent::Task::Server->new". This should at
least be passed the "listen" and "interface" arguments. Keep the
returned server object around for as long as you want the server to be
running. "interface" is the code that should handle each request. See
the interface section below for its specification. A "setup" coderef can
be passed in to run some code when a new worker is forked. A
"checkout_done" coderef can be passed in to run some code whenever a
checkout is released (see below).
A client is started with "AnyEvent::Task::Client->new". You only need to
pass "connect" to this. Keep the returned client object around as long
as you wish the client to be connected.
After both the server and client are initialised, each process must
enter AnyEvent's "main loop" in some way, possibly just "AE::cv->recv".
In the client process, you may call the "checkout" method on the client
object. This checkout object can be used to run code on a remote worker
process in a non-blocking manner. The "checkout" method doesn't require
any arguments, but "timeout" is recommended.
You can treat a checkout object either as an object that proxies its
method calls to a worker process, or as a coderef that does the same.
Pass the arguments for the call to the checkout, followed by a callback
as the final argument. This callback is invoked once the worker process
has returned the results. It is normally passed two arguments: the
checkout object and the return value. If an exception was thrown inside
the worker, only the checkout object is passed in and $@ is set to the
error message.
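For instance, a coderef-style invocation with error handling might look
like the following sketch (it assumes a $client connected as in the
synopsis, and a server whose interface handles a hypothetical
"get_stuff" method):

    my $checkout = $client->checkout(timeout => 10);

    $checkout->get_stuff('some-id',
      sub {
        my ($checkout, $result) = @_;

        ## On worker exceptions, only $checkout is passed in and $@ is set
        die "worker error: $@" if defined $@;

        print "Worker returned: $result\n";
      });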
INTERFACE
There are two formats possible for the "interface" option when creating
a server. The first (and most general) is a coderef. This coderef will
be passed the list of arguments that were sent when the checkout was
called in the client process (without the trailing callback of course).
As described above, you can use a checkout object as a coderef or as an
object with methods. If the checkout is invoked as an object, the method
name is prepended to the arguments passed to "interface":
interface => sub {
my ($method, @args) = @_;
},
If the checkout is invoked as a coderef, method is omitted:
interface => sub {
my (@args) = @_;
},
The second format possible for "interface" is a hash ref. This is a
minor short-cut for method dispatch: the method invoked on the checkout
object selects, by key, which coderef is run in the worker:
interface => {
method1 => sub {
my (@args) = @_;
},
method2 => sub {
my (@args) = @_;
},
},
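To illustrate the correspondence from the client side (a sketch;
"method1" is just a placeholder name):

    ## Method-style invocation: with a coderef interface the worker
    ## receives ('method1', 1, 2); with a hash-ref interface it
    ## dispatches to the method1 coderef, which receives (1, 2).
    $checkout->method1(1, 2, sub {
      my ($checkout, $result) = @_;
      print "got: $result\n";
    });

    ## Coderef-style invocation: the interface coderef receives just
    ## (1, 2). This form is only meaningful with a coderef interface.
    $checkout->(1, 2, sub {
      my ($checkout, $result) = @_;
      print "got: $result\n";
    });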
Note that since the protocol between the client and the worker process
is JSON-based, all arguments and return values must be serializable to
JSON. This includes most Perl scalars like strings, a limited range of
numerical types, and hash/list constructs with no cyclical references.
A future backwards compatible RPC protocol may use Storable or something
else, although note that you can already serialise an object with
Storable manually, send the resulting string over the existing protocol,
and then deserialise it in the worker.
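A sketch of that workaround (Storable's output is binary, so it is
base64-encoded here to keep it JSON-safe; $some_object and $payload are
illustrative names):

    use Storable qw(freeze thaw);
    use MIME::Base64 qw(encode_base64 decode_base64);

    ## Client side: serialise an object into a JSON-safe string
    my $payload = encode_base64(freeze($some_object));

    ## Worker side (inside the interface coderef): reconstruct it
    my $reconstructed = thaw(decode_base64($payload));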
STARTING THE SERVER
Technically, running the server and the client in the same process is
possible, but is highly discouraged since the server will "fork()" when
the client desires a worker process. When this happens, all descriptors
in use by the client and server are duped into the worker process. This
will at least interfere with cleaning up (closing) these descriptors in
the client. So after a "fork()" the worker should close all descriptors
except for its connection to the client and a pipe to the server which
is used in order to detect a server shutdown (and then gracefully exit).
Also, forking a busy client may be memory-inefficient.
Since it's more of a bother than it's worth to run the server and the
client in the same process, there is an alternate server constructor,
"AnyEvent::Task::Server::fork_task_server". It can be passed the same
arguments as the regular "new" constructor:
## my ($keepalive_pipe, $pid) =
AnyEvent::Task::Server::fork_task_server(
listen => ['unix/', '/tmp/anyevent-task.socket'],
interface => sub {
return "Hello from PID $$";
},
);
The only differences between this and the regular constructor are that
this will fork a process which becomes the server, and that it will
install a "keep-alive" pipe between the server and the client. This
keep-alive pipe will be used by the server to detect when the
client/parent process exits.
If "AnyEvent::Task::Server::fork_task_server" is called in a void
context, then the reference to this keep-alive pipe is pushed onto
@AnyEvent::Task::Server::children_sockets. Otherwise, the keep-alive
pipe and the server's PID are returned. Closing the pipe will terminate
the worker gracefully. Killing the PID will attempt to terminate the
worker immediately.
Since this constructor forks and requires using AnyEvent in both the
parent and child processes, it is important that you not install any
AnyEvent watchers before calling it. The usual caveats about forking
AnyEvent applications apply (see AnyEvent docs).
DESIGN
The first thing to realise is that each client maintains a "pool" of
connections to worker processes. Every time a checkout is issued, it is
placed into a first-come, first-serve queue. Once a worker process
becomes available, it is associated with that checkout until that
checkout is garbage collected. Each checkout also maintains a queue of
requests, so that as soon as this worker process is allocated, the
requests are also filled on a first-come, first-serve basis.
"timeout" can be passed as a keyword argument to "checkout". Once a
request is queued up on that checkout, a timer of "timeout" seconds
(default is 30, undef means infinity) is started. If the request
completes during this timeframe, the timer is cancelled. If the timer
expires however, the worker connection is terminated and an exception is
thrown in the dynamic context of the callback (see Callback::Frame).
FIXME: document this better.
Note that since timeouts are associated with a checkout, the client
process can be started before the server and as long as the server is
started within "timeout" seconds, no requests will be lost. The client
will continually try to acquire worker processes until a server is
available, and once one is available it will attempt to fill all queued
checkouts. Because of this, you should usually install a Callback::Frame
"catch" block to handle timeout errors gracefully (log something and
send error message to the client if applicable).
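Such a block might be sketched as follows (it assumes the
Callback::Frame API of "frame(code => ..., catch => ...)" and a
$dbh_pool client as in the DBI synopsis):

    use Callback::Frame;

    frame(
      name => 'user email lookup',
      code => sub {
        my $dbh = $dbh_pool->checkout(timeout => 5);
        $dbh->selectrow_hashref(q{ SELECT email FROM user WHERE username = ? },
                                undef, $username,
          sub {
            my ($dbh, $row) = @_;
            die "DB lookup failed: $@" if defined $@;
            print "User's email is $row->{email}\n";
          });
      },
      catch => sub {
        ## $@ contains the error, e.g. a checkout timeout
        warn "request failed: $@";
        ## log and/or send an error response to the client here
      },
    )->();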
Additionally, because of checkout queuing the maximum number of worker
processes a client should attempt to obtain can be limited with the
"max_workers" argument when creating a client object. If there are more
live checkouts than "max_workers", the remaining checkouts will have to
wait until one of the other checkouts becomes available. Note that
typically a request is issued as soon as the checkout is created and in
this case the timer starts then, meaning that some checkouts may never
be serviced if the system can't handle the load.
The "min_workers" argument can be used to "pre-fork" some "hot-standby"
worker processes when creating the client. The default is 2, though note
that this may change (FIXME: consider if the default should be 0).
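For example, a client that pre-forks two hot-standby workers and caps
the pool at ten (a sketch using the argument names described above):

    my $client = AnyEvent::Task::Client->new(
      connect     => ['unix/', '/tmp/anyevent-task.socket'],
      min_workers => 2,   ## pre-fork two hot-standby workers
      max_workers => 10,  ## further checkouts queue beyond this limit
    );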
COMPARISON WITH HTTP
Why a custom protocol, client, and server? Can't we just use something
like HTTP?
It depends.
AnyEvent::Task clients send discrete messages and receive ordered,
discrete replies from workers, much like HTTP. The AnyEvent::Task
protocol can be extended in a backwards compatible manner like HTTP.
AnyEvent::Task communication can be pipelined (and possibly in the
future even compressed), like HTTP.
AnyEvent::Task servers (currently) all obey a very specific
implementation policy: They are kind of like CGI servers in that each
process is guaranteed to be handling only one connection at once so it
can perform blocking operations without worrying about holding up other
connections.
Actually, since a single process can handle many requests in a row, the
AnyEvent::Task server is more like a FastCGI server, except that while a
client holds a checkout, it is guaranteed an exclusive lock on that
process. With a FastCGI server, it is assumed that requests are
stateless so you can't necessarily be sure you'll get the same process
for two consecutive requests. In fact, if an error is thrown in the
FastCGI handler you may never get the same process back again.
Probably the most fundamental difference between the AnyEvent::Task
protocol and HTTP is that in AnyEvent::Task, the client is the dominant
protocol orchestrator whereas in HTTP it is the server.
In AnyEvent::Task, the client manages the worker pool and the client
decides if/when the worker process should terminate. In the normal case,
a client will just return the worker to its worker pool. A worker can
request a shutdown when its parent server dies but can't outright refuse
to accept commands until the client is good and ready.
Client processes can be started and checkouts can be obtained before the
server is even started. The client will continue to try to obtain worker
processes until either the server starts or the checkout in question
times out.
The client decides the timeout for each checkout and different clients
can have different timeouts while connecting to the same server.
The client even decides how many minimum and maximum workers it will run
at once. The server is really just a simple on-demand-forking server and
most of the sophistication is in the asynchronous client.
SEE ALSO
<The AnyEvent::Task github repo>
There's about a million CPAN modules that do similar things.
This module is designed to be used in a non-blocking, process-based
program on unix. Depending on your exact requirements you might find
something else useful: Parallel::ForkManager, Thread::Pool, an HTTP
server of some kind, &c.
If you're into AnyEvent, AnyEvent::DBI and AnyEvent::Worker (based on
AnyEvent::DBI), and AnyEvent::ForkObject send and receive commands from
worker processes similar to this module. AnyEvent::Worker::Pool also has
an implementation of a worker pool. AnyEvent::Gearman can interface with
Gearman services.
If you're into POE there is POE::Component::Pool::DBI, POEx::WorkerPool,
POE::Component::ResourcePool, POE::Component::PreforkDispatch,
Cantella::Worker, &c.
BUGS
This module is still being developed and there are still some important
FIXMEs remaining. Please sit tight.
AUTHOR
Doug Hoyte, "<doug@hcsw.org>"
COPYRIGHT & LICENSE
Copyright 2012 Doug Hoyte.
This module is licensed under the same terms as perl itself.