NAME

Parallel::WorkUnit - Provide multi-paradigm forking with ability to pass back data

VERSION

version 2.243480

SYNOPSIS

#
# Standard Interface
#
my $wu = Parallel::WorkUnit->new();
$wu->async( sub { ... }, \&callback );

$wu->waitall();


#
# Limiting Maximum Parallelization
#
$wu->max_children(5);
$wu->queue( sub { ... }, \&callback );
$wu->queueall( \@data, sub { ... }, \&callback );
$wu->waitall();


#
# Ordered Responses
#
my $wu = Parallel::WorkUnit->new();
$wu->async( sub { ... } );

@results = $wu->waitall();

#
# Spawning off X number of workers
# (Ordered Response paradigm shown with 10 children)
#
my $wu = Parallel::WorkUnit->new();
$wu->asyncs( 10, sub { ... } );

@results = $wu->waitall();


#
# AnyEvent Interface (only usable if AnyEvent is installed)
#
use AnyEvent;
my $wu = Parallel::WorkUnit->new();

$wu->use_anyevent(1);
$wu->async( sub { ... }, \&callback );
$wu->waitall();  # Not strictly necessary, callbacks happen within event loop


#
# Just spawn something into another process, don't capture return
# values, don't allow waiting on process, etc.
#
my $wu = Parallel::WorkUnit->new();
$wu->start( sub { ... } );

DESCRIPTION

This is a very simple forking implementation of parallelism, with the ability to pass data back from the asynchronous child process in a relatively efficient way (with the limitation that the data is serialized and passed back over a pipe). It was designed to be very simple for a developer to use, with the ability to pass reasonably large amounts of data back to the parent process.

This module is also designed to work with AnyEvent when desired, but it does not require AnyEvent to be installed for other functionality to work.

There are many other Parallel::* modules on CPAN. It is worth any developer's time to look through them and choose the one that best fits the task.

ATTRIBUTES

use_anyevent

$wu->use_anyevent(1);

If set to a true value, creates AnyEvent watchers for each asynchronous or queued job. The equivalent of an AnyEvent condition variable recv(), used when all processes finish executing, is the waitall() method. However, the processes are integrated into a standard AnyEvent loop, so it isn't strictly necessary to call waitall(). In addition, a call to waitall() will also run other events pending in the AnyEvent event loop.
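
As a sketch of the event-loop integration described above, the following waits on an AnyEvent condition variable instead of calling waitall(). It assumes AnyEvent is installed. The names $cv and $answer are illustrative and not part of the module.

```perl
use strict;
use warnings;
use AnyEvent;
use Parallel::WorkUnit;

my $wu = Parallel::WorkUnit->new();
$wu->use_anyevent(1);

# The condition variable is signalled from the callback, which
# runs in the parent from within the AnyEvent event loop.
my $cv = AnyEvent->condvar;
$wu->async(
    sub { return 6 * 7 },                             # runs in the child
    sub { my ($result) = @_; $cv->send($result) },    # runs in the parent
);

# recv() runs the event loop until send() is called.
my $answer = $cv->recv;
```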

The default value is false.

max_children

$wu->max_children(5);
$wu->max_children(undef);

say "Max number of children: " . $wu->max_children();

If set to a value other than zero or undef, limits the number of outstanding queue children (created by the queue() or queueall() method) that can be executing at any given time.

This defaults to 5.

This attribute does not impact the async() method's ability to create children, but these children will count against the limit used by queue() or queueall().

Calling without any parameters returns the current value of max_children.

METHODS

new

Creates a new Parallel::WorkUnit object. Optionally takes a list of key/value pairs, as would be used to build a hash. The key max_children is accepted; if present (and not undef), it limits the number of spawned subprocesses that can be active when using the queue() or queueall() methods. Defaults to 5. See the max_children attribute for additional information.
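
For example, a minimal sketch of passing the limit at construction time (the value 3 is arbitrary and $limit is an illustrative name):

```perl
use strict;
use warnings;
use Parallel::WorkUnit;

# Allow at most three queue()/queueall() children to run at once.
my $wu = Parallel::WorkUnit->new( max_children => 3 );

# Read the limit back through the attribute accessor.
my $limit = $wu->max_children();
```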

async( sub { ... }, \&callback )

$wu->async( sub { return 1 }, \&callback );

# To get back results in "ordered" return mode
$wu->async( sub { return 1 } );
@results = $wu->waitall();

Spawns work on a new forked process. The forked process inherits all Perl state from the parent process, as would be expected with a standard fork() call. After the fork, the child shares nothing with the parent other than the return value of the work done.

The work is specified either as a subroutine reference or an anonymous sub (sub { ... }) and should return a scalar. Any scalar that Storable's freeze() method can deal with is acceptable (for instance, a hash reference or undef).

The result is serialized and streamed back to the parent process via a pipe. The parent, in a waitall() call, will call the callback function (if provided) with the unserialized return value.
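
The general pattern described above (fork, serialize, stream over a pipe, deserialize) can be sketched with core Perl alone. This illustrates the technique only and is not the module's actual implementation; every variable name here is hypothetical.

```perl
use strict;
use warnings;
use POSIX ();
use Storable qw(freeze thaw);

# Create the pipe before forking so both processes hold an end of it.
pipe( my $reader, my $writer ) or die "pipe failed: $!";
binmode $reader;
binmode $writer;

my $pid = fork();
die "fork failed: $!" unless defined $pid;

if ( $pid == 0 ) {
    # Child: do the work, serialize the result, write it to the pipe.
    close $reader;
    my $value = { answer => 6 * 7, list => [ 1, 2, 3 ] };

    # Freezing a reference to the value lets plain scalars and undef
    # travel the same way as hash or array references.
    print {$writer} freeze( \$value );
    close $writer;        # flushes the buffered data
    POSIX::_exit(0);      # leave without running the parent's END blocks
}

# Parent: read until end-of-file, reap the child, then deserialize.
close $writer;
my $frozen = do { local $/; <$reader> };
close $reader;
waitpid( $pid, 0 );

my $result = ${ thaw($frozen) };
```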

If a callback is not provided, the parent, in the waitall() call, will gather these results and return them as an ordered list.

In all modes, should the child process die, the parent process will also die (inside the waitall() method).

The PID of the child is returned to the parent process when this method is executed.

The max_children attribute is not examined in this method - you can spawn a new child regardless of the number of children already spawned. However, children started with this method still count against the limit used by queue() or queueall().
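
The two return modes described above might be used as follows. This is a sketch with made-up work (summing integers, upper-casing words); the variable names are illustrative, and a separate object is used for each mode to keep them apart.

```perl
use strict;
use warnings;
use Parallel::WorkUnit;

# Callback mode: each callback runs in the parent, inside waitall().
my $wu = Parallel::WorkUnit->new();
my %sum_of;
for my $n ( 10, 20 ) {
    $wu->async(
        sub { return { n => $n, sum => $n * ( $n + 1 ) / 2 } },    # child
        sub { my ($r) = @_; $sum_of{ $r->{n} } = $r->{sum} },      # parent
    );
}
$wu->waitall();

# Ordered mode: no callback, so waitall() returns the results as a
# list ordered by the async() calls.
my $ordered = Parallel::WorkUnit->new();
for my $word (qw(alpha beta gamma)) {
    $ordered->async( sub { return uc $word } );
}
my @results = $ordered->waitall();
```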

asyncs( $children, sub { ... }, \&callback )

$wu->asyncs( 10, sub { return 1 }, \&callback );

# To get back results in "ordered" return mode
$wu->asyncs( 10, sub { return 1 } );
@results = $wu->waitall();

Added in 1.117.

This functions similarly to the async() method, with a couple of key differences.

First, it takes an additional parameter, $children, that specifies the number of child processes to spawn. Like the async() method, the children are spawned immediately, regardless of the value of the max_children attribute.

In addition, when the sub/coderef is executed, it is called with a single parameter representing the child number in that particular instance (from zero to $children-1).

Returns the number of children spawned.

See async() for more details on how this function works.
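
As a short sketch of the child-number parameter, each worker below labels its result with the number it was given. The label format and variable names are illustrative only.

```perl
use strict;
use warnings;
use Parallel::WorkUnit;

my $wu = Parallel::WorkUnit->new();

# Spawn four children; each receives its own child number (0 to 3).
my $spawned = $wu->asyncs(
    4,
    sub {
        my ($child) = @_;
        return "worker $child";
    }
);

# No callback was given, so the results come back from waitall().
my @results = $wu->waitall();
```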

waitall()

Called from the parent process while waiting for the children to exit. This method handles children that die() or return a serializable data structure. When all children have returned, this method returns.

If a child dies unexpectedly, this method will die() and propagate a modified exception.

In the standard (not ordered) mode, i.e. where a callback was passed to async(), this will return nothing.

In the ordered mode, i.e. where no callbacks were provided to async(), this will return the results of the async calls in an ordered list. The list will be ordered by the order in which the async calls were executed.
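
Because waitall() dies when a child dies, a caller that wants to continue can trap the exception. The sketch below uses a plain eval block; the error text in the child is made up, and the exact wording of the propagated message is not assumed.

```perl
use strict;
use warnings;
use Parallel::WorkUnit;

my $wu = Parallel::WorkUnit->new();

# This child fails instead of returning a value.
$wu->async( sub { die "could not open input\n" } );

# waitall() re-throws a modified form of the child's exception in
# the parent, so wrap it in eval to handle the failure.
my @results = eval { $wu->waitall() };
my $error   = $@;

warn "a child failed: $error" if $error;
```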

waitone()

This method functions similarly to waitall(), but only waits for a single child process. It will return after any one child exits.

If this method is called when there are no processes executing, it will simply return undef. Otherwise, it will wait and then return 1.

wait($pid)

This functions similarly to waitone(), but waits only for a specific PID. See the waitone() documentation above for details.

If wait() is called on a process that is already done executing, it simply returns. Otherwise, it waits until the child process's work unit is complete and executes the callback routine, then returns.
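
A sketch of waiting on one specific child, using the PID returned by async(). The labels and the $record callback are illustrative.

```perl
use strict;
use warnings;
use Parallel::WorkUnit;

my $wu = Parallel::WorkUnit->new();
my @done;
my $record = sub { my ($label) = @_; push @done, $label };

my $first  = $wu->async( sub { return 'first' },  $record );
my $second = $wu->async( sub { return 'second' }, $record );

# Block until the first child's work unit is complete and its
# callback has run.
$wu->wait($first);
my $first_seen = grep { $_ eq 'first' } @done;

# Collect whatever is still outstanding.
$wu->waitall();

# With nothing left to wait for, waitone() returns undef.
my $idle = $wu->waitone();
```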

count()

This method returns the number of currently outstanding processes (either running or waiting to send their output).
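
For instance, a sketch that checks the count before and after collecting results. It assumes use_anyevent is left at its default of false, so finished children stay outstanding until the parent waits on them.

```perl
use strict;
use warnings;
use Parallel::WorkUnit;

my $wu = Parallel::WorkUnit->new();
$wu->async( sub { return 1 } ) for 1 .. 3;

# All three children are outstanding until the parent waits on them,
# whether they are still running or already have output ready.
my $outstanding = $wu->count();

$wu->waitall();

# Nothing remains once every child has been collected.
my $remaining = $wu->count();
```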

queue( sub { ... }, \&callback )

Spawns work on a new forked process, doing so immediately if fewer than max_children are running. If max_children are already running, the work is started once a slot becomes available.

This method should be treated as nearly identical to async(), with the only difference being the above behavior (limiting to max_children) and not returning a PID. Instead, a value of 1 is returned if the process is immediately started, undef otherwise.

The result is serialized and streamed back to the parent process via a pipe. The parent, in a waitall() call, will call the callback function (if provided) with the unserialized return value.

If a callback is not provided, the parent, in the waitall() call, will gather these results and return them as an ordered list.
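
A sketch of queueing more work than the limit allows. The limit of 2 and the multiplication are arbitrary, and @started simply records the return values described above.

```perl
use strict;
use warnings;
use Parallel::WorkUnit;

my $wu = Parallel::WorkUnit->new( max_children => 2 );

# Queue four jobs; at most two children run at any one time.
my @started;
for my $n ( 1 .. 4 ) {
    # queue() returns 1 if the job started immediately, undef if it
    # had to wait for a free slot.
    push @started, $wu->queue( sub { return $n * 10 } );
}

# waitall() starts the waiting jobs as slots free up, then returns
# the gathered results because no callback was given.
my @results = $wu->waitall();
```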

queueall( $data, sub { ... }, \&callback )

Added in 2.243450.

The $data parameter should be a reference to an array.

Spawns work on a new forked process, for each element in @$data, doing so immediately if fewer than max_children are running. If max_children are already running, the work is started once a slot becomes available.

This method should be treated as nearly identical to queue(), as if queue() were called within a for loop and the work subroutine took one parameter (the element of the data to operate on).

The result is serialized and streamed back to the parent process via a pipe. The parent, in a waitall() call, will call the callback function (if provided) with the unserialized return value.

If a callback is not provided, the parent, in the waitall() call, will gather these results and return them as an ordered list.

This always returns 1 on enqueuing the processes.
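
As a sketch, the following processes a list of names with at most two children at a time. The file names are made up and nothing is read from disk; the work is only a length calculation.

```perl
use strict;
use warnings;
use Parallel::WorkUnit;

my $wu    = Parallel::WorkUnit->new( max_children => 2 );
my @files = qw(a.log b.log c.log d.log);

my %length_of;
$wu->queueall(
    \@files,
    sub {
        my ($name) = @_;    # one element of @files per child
        return [ $name, length $name ];
    },
    sub {
        my ($pair) = @_;    # runs in the parent for each result
        $length_of{ $pair->[0] } = $pair->[1];
    },
);
$wu->waitall();
```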

start( sub { ... } )

Added in 1.191810.

Spawns work on a new forked process, doing so immediately regardless of how many other children are running.

This method is similar to async(), but unlike async(), no provision to receive a return value or wait on the child is made. This is somewhat similar to start in Raku, but differs in that this starts a subprocess, not a new thread, so there is no shared data (changes to data in the child process will not be seen in the parent process).

Note that the child inherits all open file descriptors.

Note also that the child process will be part of the same process group as the parent process. Additional work is required to daemonize the child.
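
Since start() offers no return value and no way to wait, any result has to reach the parent through a side effect. The sketch below uses a scratch file for that purpose. The file, the polling loop, and the variable names are assumptions made for illustration, not something the module provides.

```perl
use strict;
use warnings;
use File::Temp qw(tempfile);
use Parallel::WorkUnit;

# Hypothetical scratch file used only to observe the child's work.
my ( $fh, $path ) = tempfile();
close $fh;

my $wu = Parallel::WorkUnit->new();

# No callback and no return value: the child communicates only by
# writing to the scratch file.
$wu->start(
    sub {
        open my $out, '>', $path or die "cannot write $path: $!";
        print {$out} "finished\n";
        close $out;
    }
);

# The parent cannot wait on this child, so poll briefly for output.
my $tries = 0;
while ( !( -s $path ) && $tries++ < 50 ) {
    select( undef, undef, undef, 0.1 );    # sleep for 0.1 seconds
}

my $size = -s $path;
unlink $path;
```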

CAVEATS

This module uses Storable to serialize objects, except for objects that have FREEZE and THAW methods defined, in which case those methods are used. As a result, it cannot serialize some objects, such as REGEXP (sometimes), CODE, or OBJECT (i.e. Corinna objects created with the class statement in Perl 5.38+), unless these objects also have FREEZE and THAW methods defined.
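
The CODE limitation can be seen with Storable directly, independent of this module. In the sketch below (variable names are illustrative), a plain data structure survives a freeze and thaw round trip, while one containing a code reference is rejected under Storable's default settings.

```perl
use strict;
use warnings;
use Storable qw(freeze thaw);

# Plain data (hashes, arrays, scalars) round-trips without trouble.
my $copy = thaw( freeze( { name => 'job', ids => [ 1, 2, 3 ] } ) );

# A code reference inside the structure makes freeze() die, so a
# work unit returning such a value could not be passed back as-is.
my $frozen = eval { freeze( { handler => sub { 1 } } ) };
my $error  = $@;
```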

AUTHOR

Joelle Maslak <jmaslak@antelope.net>

COPYRIGHT AND LICENSE

This software is copyright (c) 2015-2024 by Joelle Maslak.

This is free software; you can redistribute it and/or modify it under the same terms as the Perl 5 programming language system itself.