NAME
AnyEvent::ProcessPool - Asynchronously runs code concurrently in a pool of perl processes
VERSION
version 0.07
SYNOPSIS
use AnyEvent::ProcessPool;
my $pool = AnyEvent::ProcessPool->new(
    workers => 8,
    limit   => 10,
    include => ['lib', 'some/lib/path'],
);
my $condvar = $pool->async(sub{
    # do task type stuff...
});
# Block until result is ready
my $result = $condvar->recv;
DESCRIPTION
Executes code using a pool of forked Perl subprocesses. Supports a configurable pool size, automatic restarting of worker processes after a configurable number of requests, and closures (with the caveat that changes made to closed-over variables in a worker are not propagated back to the parent process).
CONSTRUCTOR
workers
Optional attribute specifying the number of worker processes to launch. Defaults to the number of CPUs.
limit
Optional attribute that causes a worker process to be restarted after performing limit tasks. This can be useful when calling code that may leak memory or other resources. When unspecified or set to zero, a worker process is restarted only if it fails unexpectedly.
include
An optional array ref of paths to add to the perl command string used to start the sub-process worker.
METHODS
async
Executes the supplied code ref in a worker sub-process. Remaining (optional) arguments are passed unchanged to the code ref in the worker process. Returns a condvar; calling recv on it blocks until the task completes and then returns the task's result.
Alternatively, the name of a task class may be supplied. The class must implement the methods 'new' (as a constructor) and 'run'. When using a task class, the arguments are passed to the constructor (new) and the result of 'run' is returned.
# With an anonymous subroutine
my $cv = $pool->async(sub{ ... });
# With a code ref
my $cv = $pool->async(\&do_stuff);
# With optional parameter list
my $cv = $pool->async(sub{ ... }, $arg1, $arg2, ...);
# With a task class
my $cv = $pool->async('My::Task', $arg1, ...);
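As a rough illustration of the task class contract described above, the sketch below defines a hypothetical My::Task with the two required methods. The summing logic is invented for the example, and the assumption that the arguments given to async arrive in the constructor as a flat list is inferred from the description rather than confirmed against the module's source.

```perl
use strict;
use warnings;

# Hypothetical task class. Only the method names 'new' and 'run' come from
# the documentation; the summing logic is purely illustrative.
package My::Task;

sub new {
    my ($class, @args) = @_;
    # Assumption: arguments passed to async() arrive here as a flat list.
    return bless { numbers => [@args] }, $class;
}

sub run {
    my ($self) = @_;
    my $sum = 0;
    $sum += $_ for @{ $self->{numbers} };
    return $sum;
}

package main;

# Exercise the contract in-process, without a pool.
my $local_result = My::Task->new(2, 3)->run;

# With a pool, the equivalent call would presumably be:
#   my $cv     = $pool->async('My::Task', 2, 3);
#   my $result = $cv->recv;
```

Because the worker is a separate perl process, the class presumably has to be loadable there as well, which is where the include attribute may be needed.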
join
Blocks until all pending tasks have completed. This does not prevent new tasks from being queued while waiting (for example, in the callback of an already queued task's condvar).
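A minimal sketch of how join might be combined with async, assuming the constructor and method behavior described above. The worker count and the doubling task are arbitrary choices for illustration.

```perl
use strict;
use warnings;
use AnyEvent::ProcessPool;

my $pool = AnyEvent::ProcessPool->new(workers => 2);

# Queue several tasks; each receives its input as an argument rather than
# through a closure.
my @condvars = map { $pool->async(sub { $_[0] * 2 }, $_) } 1 .. 4;

# Block until everything queued so far has completed.
$pool->join;

# The tasks have finished, so these recv calls should return promptly.
my @results = map { scalar $_->recv } @condvars;
print "@results\n";
```

Calling join first is optional here, since each recv blocks on its own; it is mostly useful when the results are consumed in callbacks instead of being collected directly.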
PIPELINES
Pipelines are an alternative way of using the process pool. See AnyEvent::ProcessPool::Pipeline for details.
use AnyEvent::ProcessPool::Pipeline;
pipeline workers => 4,
    in  { get_next_task() }
    out { do_stuff_with_result(shift->recv) };
DIAGNOSTICS
Task errors
Error messages resulting from a die or croak in task code executed in a worker process are rethrown in the parent process when the condition variable's recv method is called.
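Since the error surfaces at recv, that call is the one to guard. The following is a sketch of one way to do so with a plain eval, assuming the behavior described above; the error message and variable names are illustrative.

```perl
use strict;
use warnings;
use AnyEvent::ProcessPool;

my $pool = AnyEvent::ProcessPool->new(workers => 1);

# The task dies inside the worker process.
my $cv = $pool->async(sub { die "something went wrong\n" });

# recv rethrows the error in the parent, so guard it with eval.
my $result;
my $ok    = eval { $result = $cv->recv; 1 };
my $error = $ok ? undef : $@;

warn "task failed: $error" if defined $error;
```

Testing the return value of eval, rather than inspecting $@ alone, avoids misreading a stale or cleared $@ as success.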
"AnyEvent::ProcessPool::Worker: ..." (warning)
When a worker sub-process emits output to STDERR, the process pool re-emits the message as a warning on its own STDERR.
"error launching worker process: ..."
Thrown when a worker sub-process fails to launch due to an execution error.
"worker terminated in response to signal: ..."
Thrown when a worker sub-process exits as a result of a signal received.
"worker terminated with non-zero exit status: ..."
Thrown when a worker sub-process terminates with a non-zero exit code. The worker will be automatically restarted.
INCOMPATIBILITIES
Will not work on MSWin32 (although Cygwin should be fine) due to lack of support for non-blocking writes to process pipes (see the notes in AnyEvent::Open3::Simple).
SEE ALSO
- Parallel::ForkManager
  Highly reliable, but somewhat arcane, blocking, and can be tricky to integrate into non-blocking code.
- Coro::ProcessPool
  Similar in function, but runs only under Coro (which as of 6.513 has experimental support for Perl 5.22).
AUTHOR
Jeff Ober <sysread@fastmail.fm>
COPYRIGHT AND LICENSE
This software is copyright (c) 2017 by Jeff Ober.
This is free software; you can redistribute it and/or modify it under the same terms as the Perl 5 programming language system itself.