NAME

AnyEvent::ProcessPool - Asynchronously runs code concurrently in a pool of perl processes

VERSION

version 0.06_001

SYNOPSIS

use AnyEvent::ProcessPool;

my $pool = AnyEvent::ProcessPool->new(
  workers => 8,
  limit   => 10,
  include => ['lib', 'some/lib/path'],
);

my $condvar = $pool->async(sub{
  # do task type stuff...
});

# Block until result is ready
my $result = $condvar->recv;

DESCRIPTION

Executes code using a pool a forked Perl subprocesses. Supports configurable pool size, automatically restarting processes after a configurable number of requests, and closures (with the caveat that changes are not propagated back to the parent process).

CONSTRUCTOR

workers

Required attribute specifying the number of worker processes to launch. Defaults to the number of CPUs.

limit

Optional attribute that causes a worker process to be restarted after performing limit tasks. This can be useful when calling code which may be leaky. When unspecified or set to zero, worker processes will only be restarted if it unexpectedly fails.

include

An optional array ref of paths to add to the perl command string used to start the sub-process worker.

METHODS

async

Executes the supplied code ref in a worker sub-process. Remaining (optional) arguments are passed unchanged to the code ref in the worker process. Returns a condvar that will block and return the task result when recv is called on it.

Alternately, the name of a task class may be supplied. The class must implement the methods 'new' (as a constructor) and 'run'. When using a task class, the arguments will be passed to the constructor (new) and the result of 'run' will be returned.

# With an anonymous subroutine
my $cv = $pool->async(sub{ ... });

# With a code ref
my $cv = $pool->async(\&do_stuff);

# With optional parameter list
my $cv = $pool->async(sub{ ... }, $arg1, $arg2, ...);

# With a task class
my $cv = $pool->async('My::Task', $arg1, ...);

join

Blocks until all pending tasks have completed. This does not prevent new tasks from being queued while waiting (for example, in the callback of an already queued task's condvar).

PIPELINES

Pipelinelines are alternative way of using the process pool. See AnyEvent::ProcessPool::Pipeline for details.

use AnyEvent::ProcessPool::Pipeline;

pipeline workers => 4,
  in  { get_next_task() }
  out { do_stuff_with_result(shift->recv) };

DIAGNOSTICS

Task errors

Error messages resulting from a die or croak in task code executed in a worker process are rethrown in the parent process when the condition variable's recv method is called.

CAVEATS ON MSWIN32

In addition to the usual caveats with regard to emulated forking on Windows, the platform's poor support for pipes means resorting to a pair of TCP sockets instead (which works but is much, much, slower). See the notes for portable_socketpair and fork_call in AnyEvent::Util.

SEE ALSO

Parallel::ForkManager

Highly reliable, but somewhat arcane, blocking, and can be tricky to integrate into non-blocking code.

Coro::ProcessPool

Similar in function, but runs only under Coro (which as of 6.513 has experimental support for 5.22).

AUTHOR

Jeff Ober <sysread@fastmail.fm>

COPYRIGHT AND LICENSE

This software is copyright (c) 2017 by Jeff Ober.

This is free software; you can redistribute it and/or modify it under the same terms as the Perl 5 programming language system itself.