NAME
Coro::ProcessPool - an asynchronous process pool
SYNOPSIS
use Coro::ProcessPool;
my $pool = Coro::ProcessPool->new(
max_procs => 4,
max_reqs => 100,
);
my $double = sub { $_[0] * 2 };
# Process in sequence
my %result;
foreach my $i (1 .. 1000) {
$result{$i} = $pool->process($double, [$i]);
}
# Process as a batch
my @results = $pool->map($double, 1 .. 1000);
# Defer waiting for result
my %deferred = map { $_ => $pool->defer($double, [$_]) } 1 .. 1000;
foreach my $i (keys %deferred) {
print "$i = " . $deferred{$i}->() . "\n";
}
# Use a "task class", implementing 'new' and 'run'
my $result = $pool->process('Task::Doubler', [21]);
$pool->shutdown;
DESCRIPTION
Processes tasks using a pool of external Perl processes.
ATTRIBUTES
max_procs
The maximum number of processes to run within the process pool. Defaults to the number of CPUs on the system.
max_reqs
The maximum number of tasks a worker process may run before being terminated and replaced with a fresh process. This is useful for tasks that might leak memory over time.
PRIVATE ATTRIBUTES
procs_lock
Semaphore used to control access to the worker processes. Starts incremented to the number of processes (max_procs).
num_procs
Running total of processes that are currently running.
procs
Array holding the Coro::Process objects.
inbox
Coro::Channel instance used to queue tasks from the queue method.
inbox_worker
Coro thread monitoring the inbox.
is_running
Boolean which signals to the instance that the shutdown method has been called.
METHODS
capacity
Returns the number of free worker processes.
shutdown
Shuts down all processes and resets state on the process pool. After calling this method, the pool is effectively in a new state and may be used normally.
process($f, $args)
Processes code ref $f in a child process from the pool. If $args is provided, it is an array ref of arguments that will be passed to $f. Returns the result of calling $f->(@$args).
Alternately, $f may be the name of a class implementing the methods new and run, in which case the result is equivalent to calling $f->new(@$args)->run(). Note that the include path for worker processes is identical to that of the calling process.
This call will yield until the results become available. If all processes are busy, this method will block until one becomes available. Processes are spawned as needed, up to max_procs, from this method. Also note that the use of max_reqs can cause this method to yield while a new process is spawned.
map($f, @args)
Applies $f to each value in @args in turn and returns a list of the results. Although the order in which each argument is processed is not guaranteed, the results are guaranteed to be in the same order as @args, even if calling $f returns a list itself (in which case, the results of that calculation are flattened into the list returned by map).
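The flattening behaviour can be pictured with Perl's built-in map, used here as a local stand-in for the pool method. This is only a sketch of the documented result shape; the $pair function is hypothetical, and no worker processes are involved.

```perl
use strict;
use warnings;

# Hypothetical task: returns a two-element list for each input.
my $pair = sub { my $n = shift; return ($n, $n * 2) };

my @args = (1, 2, 3);

# Local stand-in for $pool->map($pair, @args): each call's list is
# flattened into the result, in the same order as @args.
my @results = map { $pair->($_) } @args;

print "@results\n";
```

With three arguments and two values per call, the result holds six values, grouped in argument order.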
defer($f, $args)
Similar to process, but returns immediately. The return value is a code reference that, when called, returns the results of calling $f->(@$args).
my $deferred = $pool->defer($coderef, [ $x, $y, $z ]);
my $result = $deferred->();
queue($f, $args, $callback)
Queues the execution of $f->(@$args) and returns immediately. If $callback is specified and is a code ref, it will be called with the result of the executed code once the task is processed. Note that the callback is not provided with any identifying information about the task being executed. That is the responsibility of the caller. It is also the caller's responsibility to coordinate any code that depends on the result, for example using a condition variable.
use AnyEvent;
my $cv = AnyEvent->condvar;
sub make_callback {
my $n = shift;
return sub {
my $result = shift;
print "2 * $n = $result\n";
$cv->send;
}
}
$pool->queue($doubler_function, [21], make_callback(21));
$cv->recv;
A NOTE ABOUT IMPORTS AND CLOSURES
Code refs are serialized using Storable to pass them to the worker processes. Once deserialized in the pool process, these functions can no longer see the stack as it is in the parent process. Therefore, imports and variables external to the function are unavailable.
Something like this will not work:
use Foo;
my $foo = Foo->new();
my $result = $pool->process(sub {
return $foo->bar; # $foo not found
});
Nor will this:
use Foo;
my $result = $pool->process(sub {
my $foo = Foo->new; # Foo not found
return $foo->bar;
});
The correct way to do this is to import from within the function:
my $result = $pool->process(sub {
require Foo;
my $foo = Foo->new();
return $foo->bar;
});
...or to pass in external variables that are needed by the function:
use Foo;
my $foo = Foo->new();
my $result = $pool->process(sub { $_[0]->bar }, [ $foo ]);
Use versus require
The use statement is run at compile time, whereas require is evaluated at runtime. Because of this, the use of use in code passed directly to the process method can fail because the use statement has already been evaluated when the calling code was compiled.
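The compile-time versus runtime difference can be observed in a single process, without a pool. The sketch below uses two core modules, chosen arbitrarily for illustration, and assumes nothing else in the program has already loaded them. Neither sub is ever called, yet the module named in the use statement should already appear in %INC, while the one named in require should not.

```perl
use strict;
use warnings;

# Neither sub is ever called.
sub with_use     { use Time::Local (); return }
sub with_require { require Text::ParseWords; return }

# 'use' ran when this file was compiled, so its module is in %INC.
my $use_loaded = exists $INC{'Time/Local.pm'} ? 1 : 0;

# 'require' runs only when its sub executes, so its module is absent.
my $require_loaded = exists $INC{'Text/ParseWords.pm'} ? 1 : 0;

print "use: $use_loaded, require: $require_loaded\n";
```

This is the same reason a use inside a code ref has no effect in the worker: the loading happened once, in the process that compiled the code.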
This will not work:
$pool->process(sub {
use Foo;
my $foo = Foo->new();
});
This will work:
$pool->process(sub {
require Foo;
my $foo = Foo->new();
});
If use is necessary (for example, to import a method or transform the calling code via import), it is recommended to move the code into its own module, which can then be called in the anonymous routine:
package Bar;
use Foo;
sub dostuff {
...
}
Then, in your caller:
$pool->process(sub {
require Bar;
Bar::dostuff();
});
If it's a problem...
Use the task class method if the loading requirements are causing headaches:
my $result = $pool->process('Task::Class', [@args]);
COMPATIBILITY
Coro::ProcessPool will likely break on Win32 due to missing support for non-blocking file descriptors (Win32 can only call select and poll on actual network sockets). Without rewriting this as a network server, which would impact performance and be really annoying, it is likely this module will not support Win32 in the near future.
The following modules will get you started if you wish to explore a synchronous process pool on Windows:
AUTHOR
Jeff Ober <jeffober@gmail.com>
LICENSE
BSD License