NAME
AnyEvent::ProcessPool - Asynchronously runs code concurrently in a pool of perl processes
VERSION
version 0.06_001
SYNOPSIS
use AnyEvent::ProcessPool;
my $pool = AnyEvent::ProcessPool->new(
workers => 8,
limit => 10,
include => ['lib', 'some/lib/path'],
);
my $condvar = $pool->async(sub{
# do task type stuff...
});
# Block until result is ready
my $result = $condvar->recv;
DESCRIPTION
Executes code using a pool a forked Perl subprocesses. Supports configurable pool size, automatically restarting processes after a configurable number of requests, and closures (with the caveat that changes are not propagated back to the parent process).
CONSTRUCTOR
workers
Required attribute specifying the number of worker processes to launch. Defaults to the number of CPUs.
limit
Optional attribute that causes a worker process to be restarted after performing limit
tasks. This can be useful when calling code which may be leaky. When unspecified or set to zero, worker processes will only be restarted if it unexpectedly fails.
include
An optional array ref of paths to add to the perl command string used to start the sub-process worker.
METHODS
async
Executes the supplied code ref in a worker sub-process. Remaining (optional) arguments are passed unchanged to the code ref in the worker process. Returns a condvar that will block and return the task result when recv
is called on it.
Alternately, the name of a task class may be supplied. The class must implement the methods 'new' (as a constructor) and 'run'. When using a task class, the arguments will be passed to the constructor (new) and the result of 'run' will be returned.
# With an anonymous subroutine
my $cv = $pool->async(sub{ ... });
# With a code ref
my $cv = $pool->async(\&do_stuff);
# With optional parameter list
my $cv = $pool->async(sub{ ... }, $arg1, $arg2, ...);
# With a task class
my $cv = $pool->async('My::Task', $arg1, ...);
join
Blocks until all pending tasks have completed. This does not prevent new tasks from being queued while waiting (for example, in the callback of an already queued task's condvar).
PIPELINES
Pipelinelines are alternative way of using the process pool. See AnyEvent::ProcessPool::Pipeline for details.
use AnyEvent::ProcessPool::Pipeline;
pipeline workers => 4,
in { get_next_task() }
out { do_stuff_with_result(shift->recv) };
DIAGNOSTICS
Task errors
Error messages resulting from a die
or croak
in task code executed in a worker process are rethrown in the parent process when the condition variable's recv
method is called.
CAVEATS ON MSWIN32
In addition to the usual caveats with regard to emulated forking on Windows, the platform's poor support for pipes means resorting to a pair of TCP sockets instead (which works but is much, much, slower). See the notes for portable_socketpair
and fork_call
in AnyEvent::Util.
SEE ALSO
- Parallel::ForkManager
-
Highly reliable, but somewhat arcane, blocking, and can be tricky to integrate into non-blocking code.
- Coro::ProcessPool
-
Similar in function, but runs only under Coro (which as of 6.513 has experimental support for 5.22).
AUTHOR
Jeff Ober <sysread@fastmail.fm>
COPYRIGHT AND LICENSE
This software is copyright (c) 2017 by Jeff Ober.
This is free software; you can redistribute it and/or modify it under the same terms as the Perl 5 programming language system itself.