NAME
MCE::Hobo - A threads-like parallelization module
VERSION
This document describes MCE::Hobo version 1.836
SYNOPSIS
use MCE::Hobo;
MCE::Hobo->init(
max_workers => 'auto', # default undef, unlimited
hobo_timeout => 20, # default undef, no timeout
posix_exit => 1, # default undef, CORE::exit
on_start => sub {
my ( $pid, $ident ) = @_;
...
},
on_finish => sub {
my ( $pid, $exit, $ident, $signal, $error, @ret ) = @_;
...
}
);
MCE::Hobo->create( sub { print "Hello from hobo\n" } )->join();
sub parallel {
my ($arg1) = @_;
print "Hello again, $arg1\n" if defined($arg1);
print "Hello again, $_\n"; # same thing
}
MCE::Hobo->create( \&parallel, $_ ) for 1 .. 3;
my @hobos = MCE::Hobo->list();
my @running = MCE::Hobo->list_running();
my @joinable = MCE::Hobo->list_joinable();
my $count = MCE::Hobo->pending();
# Joining is orderly: hobo1 is joined first, then hobo2, then hobo3.
$_->join() for @hobos;
# Joining occurs immediately as hobo(s) complete execution.
1 while MCE::Hobo->wait_one();
my $hobo = mce_async { foreach (@files) { ... } };
$hobo->join();
if ( my $err = $hobo->error() ) {
warn "Hobo error: $err\n";
}
# Get a hobo's object
$hobo = MCE::Hobo->self();
# Get a hobo's ID
$pid = MCE::Hobo->pid(); # $$
$pid = $hobo->pid();
$pid = MCE::Hobo->tid(); # tid is an alias for pid
$pid = $hobo->tid();
# Test hobo objects
if ( $hobo1 == $hobo2 ) {
...
}
# Give other hobos a chance to run
MCE::Hobo->yield();
MCE::Hobo->yield(0.05);
# Return context, wantarray aware
my ($value1, $value2) = $hobo->join();
my $value = $hobo->join();
# Check hobo's state
if ( $hobo->is_running() ) {
sleep 1;
}
if ( $hobo->is_joinable() ) {
$hobo->join();
}
# Send a signal to a hobo
$hobo->kill('SIGUSR1');
# Exit a hobo
MCE::Hobo->exit(0);
MCE::Hobo->exit(0, @ret); # MCE::Hobo 1.827+
DESCRIPTION
A Hobo is a migratory worker inside the machine that carries the asynchronous gene. Hobos are equipped with threads-like capability for running code asynchronously. Unlike threads, each hobo is a unique process to the underlying OS. The IPC is managed by MCE::Shared, which runs on all the major platforms including Cygwin.
MCE::Hobo 1.807 through 1.816 made an exception on the Windows platform, spawning threads instead of child processes. For consistency, the 1.817 release reverts to spawning children on all supported platforms.
MCE::Hobo may be used standalone or together with MCE, including running alongside threads.
use MCE::Hobo;
use MCE::Shared;
# synopsis: head -20 file.txt | perl script.pl
my $ifh = MCE::Shared->handle( "<", \*STDIN ); # shared
my $ofh = MCE::Shared->handle( ">", \*STDOUT );
my $ary = MCE::Shared->array();
sub parallel_task {
my ( $id ) = @_;
while ( <$ifh> ) {
printf {$ofh} "[ %4d ] %s", $., $_;
# $ary->[ $. - 1 ] = "[ ID $id ] read line $.\n"; # dereferencing
$ary->set( $. - 1, "[ ID $id ] read line $.\n" ); # faster via OO
}
}
my $hobo1 = MCE::Hobo->new( "parallel_task", 1 );
my $hobo2 = MCE::Hobo->new( \&parallel_task, 2 );
my $hobo3 = MCE::Hobo->new( sub { parallel_task(3) } );
$_->join for MCE::Hobo->list(); # ditto: MCE::Hobo->wait_all();
# search array (total one round-trip via IPC)
my @vals = $ary->vals( "val =~ / ID 2 /" );
print {*STDERR} join("", @vals);
API DOCUMENTATION
- $hobo = MCE::Hobo->create( FUNCTION, ARGS )
- $hobo = MCE::Hobo->new( FUNCTION, ARGS )
-
Creates a new hobo that begins execution with FUNCTION as the entry point, passing the optional ARGS as its list of parameters. It returns the corresponding MCE::Hobo object, or undef if hobo creation failed.
FUNCTION may be the name of a function, an anonymous subroutine, or a code ref.
my $hobo = MCE::Hobo->create( "func_name", ... );
# or
my $hobo = MCE::Hobo->create( sub { ... }, ... );
# or
my $hobo = MCE::Hobo->create( \&func, ... );
- $hobo = MCE::Hobo->create( { options }, FUNCTION, ARGS )
- $hobo = MCE::Hobo->create( IDENT, FUNCTION, ARGS )
-
Options, excluding ident, may be specified globally via the init function. Otherwise, ident, hobo_timeout, and posix_exit may be set uniquely per hobo.
The ident option, available since 1.827, is used by the callback functions on_start and on_finish to identify the started and finished process, respectively.
my $hobo1 = MCE::Hobo->create( { posix_exit => 1 }, sub {
   ...
} );
$hobo1->join;
my $hobo2 = MCE::Hobo->create( { hobo_timeout => 3 }, sub {
   sleep 1 for ( 1 .. 9 );
} );
$hobo2->join;
if ( $hobo2->error() eq "Hobo timed out\n" ) {
   ...
}
The new() method is an alias for create().
- mce_async { BLOCK } ARGS;
- mce_async { BLOCK };
-
mce_async runs the block asynchronously, similarly to MCE::Hobo->create(). It returns the hobo object, or undef if hobo creation failed.
my $hobo = mce_async { foreach (@files) { ... } };
$hobo->join();
if ( my $err = $hobo->error() ) {
   warn("Hobo error: $err\n");
}
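The ARGS form passes its arguments to the block through @_, in the same way create() passes ARGS to its entry point. A minimal sketch, assuming MCE::Hobo is installed; the numbers are arbitrary:

```perl
use strict;
use warnings;
use MCE::Hobo;

# ARGS listed after the block are passed to it through @_,
# just as ARGS are passed to the entry point of create().
my $hobo = mce_async {
   my ( $x, $y ) = @_;
   return $x * $y;
} 6, 7;

# join() hands the block's return value back to the parent.
my $product = $hobo->join();

print "product: $product\n";
```

Because join() is called in scalar context here, it returns the single value produced by the block.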
- $hobo->join()
-
This waits for the corresponding hobo to complete its execution. In non-void context, join() returns the value(s) of the entry point function.
The context (void, scalar or list) for the return value(s) of join is determined at the time of joining and is mostly wantarray aware.
my $hobo1 = MCE::Hobo->create( sub {
   my @res = qw(foo bar baz);
   return (@res);
});
my @res1 = $hobo1->join();  # ( foo, bar, baz )
my $res1 = $hobo1->join();  #   baz
my $hobo2 = MCE::Hobo->create( sub {
   return 'foo';
});
my @res2 = $hobo2->join();  # ( foo )
my $res2 = $hobo2->join();  #   foo
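When the entry point dies, the hobo still has to be joined; afterwards its error() method returns the $@ text captured by the hobo's eval. A minimal sketch, assuming MCE::Hobo is installed; the die message is hypothetical:

```perl
use strict;
use warnings;
use MCE::Hobo;

# The entry point runs inside an eval, so dying is captured
# in the hobo rather than being fatal to the parent.
my $hobo = MCE::Hobo->create( sub { die "cannot open input\n" } );

# Join first so the hobo is reaped, then inspect its $@.
$hobo->join();

my $err = $hobo->error();
print "hobo failed: $err" if defined $err;
```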
- $hobo1->equal( $hobo2 )
-
Tests whether two hobo objects refer to the same hobo. Hobo comparison is based on process IDs. The == and != operators are overloaded to perform this comparison.
if ( $hobo1 == $hobo2 ) {
   print("Hobos are the same\n");
}
# or
if ( $hobo1 != $hobo2 ) {
   print("Hobos differ\n");
}
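A small sketch of the overloaded operators next to the equal() method, assuming MCE::Hobo is installed; two trivial hobos are spawned only to have distinct objects to compare:

```perl
use strict;
use warnings;
use MCE::Hobo;

my $hobo1 = MCE::Hobo->create( sub { 1 } );
my $hobo2 = MCE::Hobo->create( sub { 2 } );
my $copy  = $hobo1;   # a second reference to the same hobo object

# == and != compare the underlying process IDs.
my $same   = ( $hobo1 == $copy  ) ? 1 : 0;
my $differ = ( $hobo1 != $hobo2 ) ? 1 : 0;
my $by_pid = $hobo1->equal( $hobo2 ) ? 1 : 0;

print "same: $same, differ: $differ, equal: $by_pid\n";

MCE::Hobo->wait_all();
```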
- $hobo->error()
-
Hobos are executed in an eval context. This method returns undef if the hobo terminates normally. Otherwise, it returns the value of $@ associated with the hobo's execution status in its eval context.
- $hobo->exit()
-
This sends 'SIGQUIT' to the hobo, notifying it to exit. It returns the hobo object to allow for method chaining. Be sure to join the hobo, immediately or later, so as not to leave a zombie (defunct) process.
$hobo->exit()->join();
...
$hobo->join(); # later
- MCE::Hobo->exit( 0 )
- MCE::Hobo->exit( 0, @ret )
-
A hobo can exit at any time by calling MCE::Hobo->exit(). Otherwise, the behavior is the same as exit(status) when called from the main process. Since 1.827, a worker may optionally return data to be transmitted to the parent process.
- MCE::Hobo->finish()
-
This class method is called automatically by END, but may be called explicitly. An error is emitted via croak if there are active hobos not yet joined.
MCE::Hobo->create( 'task1', $_ ) for 1 .. 4;
$_->join for MCE::Hobo->list();
MCE::Hobo->create( 'task2', $_ ) for 1 .. 4;
$_->join for MCE::Hobo->list();
MCE::Hobo->create( 'task3', $_ ) for 1 .. 4;
$_->join for MCE::Hobo->list();
MCE::Hobo->finish();
- MCE::Hobo->init( options )
-
The init function accepts a list of MCE::Hobo options.
MCE::Hobo->init(
   max_workers => 'auto',   # default undef, unlimited
   hobo_timeout => 20,      # default undef, no timeout
   posix_exit => 1,         # default undef, CORE::exit
   on_start => sub {
      my ( $pid, $ident ) = @_;
      ...
   },
   on_finish => sub {
      my ( $pid, $exit, $ident, $signal, $error, @ret ) = @_;
      ...
   }
);
# Identification given as option or 1st argument.
# Current API available since 1.827.
for my $key ( 'aa' .. 'zz' ) {
   MCE::Hobo->create( { ident => $key }, sub { ... } );
   MCE::Hobo->create( $key, sub { ... } );
}
MCE::Hobo->wait_all;
Set max_workers if you want to limit the number of workers; creating a hobo then waits automatically for an available slot. Specify auto to obtain the number of logical cores via MCE::Util::get_ncpu().
Set hobo_timeout, in number of seconds, if you want the hobo process to terminate after some time. The default is 0 for no timeout.
Set posix_exit to avoid all END and destructor processing. Constructing MCE::Hobo inside a thread implies 1, as does the presence of CGI, FCGI, Coro, Curses, Gearman::Util, Gearman::XS, LWP::UserAgent, Mojo::IOLoop, Prima, STFL, Tk, Wx, or Win32::GUI.
The callback options on_start and on_finish are called in the parent process after a hobo starts and later when it terminates, respectively. The arguments for the subroutines were inspired by Parallel::ForkManager.
The parameters for on_start are the following:
  - pid of the process
  - identification (ident option or 1st arg to create)
The parameters for on_finish are the following:
  - pid of the process
  - program exit code
  - identification (ident option or 1st arg to create)
  - exit signal id
  - error message from eval inside MCE::Hobo
  - returned data
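The sketch below wires on_finish to collect, per ident, the exit code and returned data in the parent. It assumes MCE::Hobo 1.827 or later (for ident and for MCE::Hobo->exit with data); the idents 'normal' and 'early' and the returned strings are hypothetical:

```perl
use strict;
use warnings;
use MCE::Hobo;

my %finished;   # ident => [ exit code, returned data ... ]

MCE::Hobo->init(
   on_finish => sub {
      my ( $pid, $exit, $ident, $signal, $error, @ret ) = @_;
      $finished{$ident} = [ $exit, @ret ];   # runs in the parent
   }
);

# ident given as the 1st argument to create(); returns normally.
MCE::Hobo->create( 'normal', sub { return ( 'a', 'b' ) } );

# ident given as an option; this hobo leaves early via exit with data.
MCE::Hobo->create( { ident => 'early' }, sub {
   MCE::Hobo->exit( 0, 'bailed out' );
   return 'not reached';
} );

# Reaping the hobos triggers on_finish in the parent process.
MCE::Hobo->wait_all();

for my $ident ( sort keys %finished ) {
   my ( $exit, @ret ) = @{ $finished{$ident} };
   print "$ident: exit=$exit ret=@ret\n";
}
```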
- $hobo->is_running()
-
Returns true if a hobo is still running.
- $hobo->is_joinable()
-
Returns true if the hobo has finished running and not yet joined.
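A minimal polling sketch, assuming MCE::Hobo is installed and using Time::HiRes for fractional sleeps; the half-second task is a stand-in for real work:

```perl
use strict;
use warnings;
use MCE::Hobo;
use Time::HiRes qw( sleep );

my $hobo = MCE::Hobo->create( sub { sleep 0.5; return 'done' } );

# Right after creation the hobo is still busy sleeping.
my $was_running = $hobo->is_running() ? 1 : 0;

# Poll without blocking; the parent could do other work here.
my $polls = 0;
until ( $hobo->is_joinable() ) {
   $polls++;
   sleep 0.05;
}

# Finished and not yet joined, so join() returns without blocking.
my $result = $hobo->join();
print "was_running=$was_running polls=$polls result=$result\n";
```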
- $hobo->kill( 'SIG...' )
-
Sends the specified signal to the hobo. Returns the hobo object to allow for method chaining. As with exit, be sure to join the hobo eventually, if not immediately, so as not to leave a zombie (defunct) process.
$hobo->kill('SIG...')->join();
The following is a parallel demonstration comparing MCE::Shared against Redis and Redis::Fast on a Fedora 23 VM. Joining begins after all workers have been notified to quit.
use Time::HiRes qw(time sleep);   # sleep too, for fractional seconds
use Redis;
use Redis::Fast;
use MCE::Hobo;
use MCE::Shared;
my $redis = Redis->new();
my $rfast = Redis::Fast->new();
my $array = MCE::Shared->array();
sub parallel_redis {
   my ($_redis) = @_;
   my ($count, $quit, $len) = (0, 0);
   # instead, use a flag to exit loop
   $SIG{'QUIT'} = sub { $quit = 1 };
   while () {
      $len = $_redis->rpush('list', $count++);
      last if $quit;
   }
   $count;
}
sub parallel_array {
   my ($count, $quit, $len) = (0, 0);
   # do not exit from inside handler
   $SIG{'QUIT'} = sub { $quit = 1 };
   while () {
      $len = $array->push($count++);
      last if $quit;
   }
   $count;
}
sub benchmark_this {
   my ($desc, $num_hobos, $timeout, $code, @args) = @_;
   my ($start, $total) = (time(), 0);
   MCE::Hobo->new($code, @args) for 1..$num_hobos;
   sleep $timeout;
   # joining is not immediate; ok
   $_->kill('QUIT') for MCE::Hobo->list();
   # joining later; ok
   $total += $_->join() for MCE::Hobo->list();
   printf "$desc <> duration: %0.03f secs, count: $total\n",
      time() - $start;
   sleep 0.2;
}
benchmark_this('Redis ', 8, 5.0, \&parallel_redis, $redis);
benchmark_this('Redis::Fast', 8, 5.0, \&parallel_redis, $rfast);
benchmark_this('MCE::Shared', 8, 5.0, \&parallel_array);
- MCE::Hobo->list()
-
Returns a list of all hobos not yet joined.
@hobos = MCE::Hobo->list();
- MCE::Hobo->list_running()
-
Returns a list of all hobos that are still running.
@hobos = MCE::Hobo->list_running();
- MCE::Hobo->list_joinable()
-
Returns a list of all hobos that have completed running and are thus ready to be joined without blocking.
@hobos = MCE::Hobo->list_joinable();
- MCE::Hobo->max_workers([ N ])
-
Getter and setter for max_workers. Specify a number, or 'auto' to acquire the total number of cores via MCE::Util::get_ncpu. Specify a false value to remove the limit.
API available since 1.835.
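A sketch of limiting concurrency with max_workers, assuming MCE::Hobo 1.835 or later. Because create() may join finished hobos itself while waiting for a free slot, completions are counted in an on_finish callback rather than through join(); the short sleep is a placeholder for real work:

```perl
use strict;
use warnings;
use MCE::Hobo;
use Time::HiRes qw( sleep );

my $finished = 0;

MCE::Hobo->init(
   on_finish => sub { $finished++ },   # called in the parent per reaped hobo
);

# At most two hobos run at a time; create() waits for a free slot.
MCE::Hobo->max_workers(2);
my $limit = MCE::Hobo->max_workers();

MCE::Hobo->create( sub { sleep 0.2; return $_[0] }, $_ ) for 1 .. 4;

MCE::Hobo->wait_all();
print "limit=$limit finished=$finished\n";

# A false value removes the limit again.
MCE::Hobo->max_workers(0);
```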
- MCE::Hobo->pending()
-
Returns a count of all hobos not yet joined.
$count = MCE::Hobo->pending();
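A minimal sketch, assuming MCE::Hobo is installed, showing that list() and pending() track hobos until they are joined, whether or not they are still running:

```perl
use strict;
use warnings;
use MCE::Hobo;
use Time::HiRes qw( sleep );

MCE::Hobo->create( sub { sleep 0.2; return $_[0] }, $_ ) for 1 .. 3;

# All three hobos are counted until they are joined.
my $before     = MCE::Hobo->pending();
my @not_joined = MCE::Hobo->list();

$_->join() for @not_joined;

my $after = MCE::Hobo->pending();
print "pending before=$before after=$after\n";
```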
- $hobo->result()
-
Returns the result obtained by join, wait_one, or wait_all. If the process has not yet exited, waits for the corresponding hobo to complete its execution.
use MCE::Hobo;
use Time::HiRes qw(sleep);
sub task {
   my ($id) = @_;
   sleep $id * 0.333;
   return $id;
}
MCE::Hobo->create('task', $_) for ( reverse 1 .. 3 );
# 1 while MCE::Hobo->wait_one();
while ( my $hobo = MCE::Hobo->wait_one() ) {
   my $err = $hobo->error() // 'no error';
   my $res = $hobo->result();
   my $pid = $hobo->pid();
   print "[$pid] $err : $res\n";
}
Like join described above, the context (void, scalar or list) for the return value(s) is determined at the time result is called and is mostly wantarray aware.
my $hobo1 = MCE::Hobo->create( sub {
   my @res = qw(foo bar baz);
   return (@res);
});
my @res1 = $hobo1->result();  # ( foo, bar, baz )
my $res1 = $hobo1->result();  #   baz
my $hobo2 = MCE::Hobo->create( sub {
   return 'foo';
});
my @res2 = $hobo2->result();  # ( foo )
my $res2 = $hobo2->result();  #   foo
- MCE::Hobo->self()
-
Class method that allows a hobo to obtain its own MCE::Hobo object.
- $hobo->pid()
- $hobo->tid()
-
Returns the ID of the hobo.
pid: $$  process id
tid: $$  alias for pid
- MCE::Hobo->pid()
- MCE::Hobo->tid()
-
Class methods that allow a hobo to obtain its own ID.
pid: $$  process id
tid: $$  alias for pid
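A minimal sketch, assuming MCE::Hobo is installed, that returns the IDs seen from inside a hobo so the parent can compare them with $hobo->pid():

```perl
use strict;
use warnings;
use MCE::Hobo;

my $hobo = MCE::Hobo->create( sub {
   my $me = MCE::Hobo->self();   # this hobo's own object
   # Each of these reports the hobo's process ID, i.e. $$.
   return ( $$, $me->pid(), MCE::Hobo->pid(), MCE::Hobo->tid() );
} );

my @ids = $hobo->join();   # list context: all four IDs
print "parent sees ", $hobo->pid(), "; hobo reported @ids\n";
```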
- MCE::Hobo->wait_one()
- MCE::Hobo->wait_all()
-
Meaningful for the manager process only. Waits for one or all hobos to complete execution, then returns the corresponding hobo(s). If no hobo exists, returns undef or an empty list for wait_one and wait_all, respectively.
The waitone and waitall methods are aliases, available since 1.827 for backwards compatibility.
use MCE::Hobo;
use Time::HiRes qw(sleep);
sub task {
   my $id = shift;
   sleep $id * 0.333;
   return $id;
}
MCE::Hobo->create('task', $_) for ( reverse 1 .. 3 );
# join, traditional use case
$_->join() for MCE::Hobo->list();
# wait_one, simplistic use case
1 while MCE::Hobo->wait_one();
# wait_one
while ( my $hobo = MCE::Hobo->wait_one() ) {
   my $err = $hobo->error() // 'no error';
   my $res = $hobo->result();
   my $pid = $hobo->pid();
   print "[$pid] $err : $res\n";
}
# wait_all
my @hobos = MCE::Hobo->wait_all();
for ( @hobos ) {
   my $err = $_->error() // 'no error';
   my $res = $_->result();
   my $pid = $_->pid();
   print "[$pid] $err : $res\n";
}
- MCE::Hobo->yield( [ floating_seconds ] )
-
Prior API till 1.826.
Let this hobo yield CPU time to other hobos. By default, the class method calls sleep(0.008) on UNIX and sleep(0.015) on Windows, including Cygwin.
MCE::Hobo->yield();
MCE::Hobo->yield(0.05);
# total run time: 0.25 seconds, sleep occurs in parallel
MCE::Hobo->create( sub { MCE::Hobo->yield(0.25) } ) for 1 .. 4;
MCE::Hobo->wait_all();
Current API available since 1.827.
Give other hobos a chance to run, optionally for the given time. Yield behaves similarly to MCE's interval option: it throttles hobos from running too fast. A demonstration is provided in the next section for fetching URLs in parallel.
# total run time: 1.00 second
MCE::Hobo->create( sub { MCE::Hobo->yield(0.25) } ) for 1 .. 4;
MCE::Hobo->wait_all();
PARALLEL HTTP GET DEMONSTRATION USING ANYEVENT
This demonstration constructs two queues and two handles, starts the shared-manager process if needed, and spawns four hobo workers. For this demonstration, 64 URLs are chunked per job. In reality, one may run with 200 workers and chunk 300 URLs on a 24-way box.
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# perl demo.pl -- all output
# perl demo.pl >/dev/null -- mngr/hobo output
# perl demo.pl 2>/dev/null -- show results only
#
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
use strict;
use warnings;
use AnyEvent;
use AnyEvent::HTTP;
use Time::HiRes qw( time );
use MCE::Hobo;
use MCE::Shared;
# Construct two queues, input and return.
my $que = MCE::Shared->queue();
my $ret = MCE::Shared->queue();
# Construct shared handles to serialize output from many hobos
# writing simultaneously. This prevents garbled output.
mce_open my $OUT, ">>", \*STDOUT or die "open error: $!";
mce_open my $ERR, ">>", \*STDERR or die "open error: $!";
# Spawn workers early for minimum memory consumption.
MCE::Hobo->create({ posix_exit => 1 }, 'task', $_) for 1 .. 4;
# Obtain or generate input data for hobos to process.
my ( $count, @urls ) = ( 0 );
push @urls, map { "http://127.0.0.$_/" } 1..254;
push @urls, map { "http://192.168.0.$_/" } 1..254; # 508 URLs total
while ( @urls ) {
my @chunk = splice(@urls, 0, 64);
$que->enqueue( { ID => ++$count, INPUT => \@chunk } );
}
# So that workers leave the loop after consuming the queue.
$que->end();
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Loop for the manager process. The manager may do other work if
# need be and periodically check $ret->pending() not shown here.
#
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
my $start = time;
printf {$ERR} "Mngr - entering loop\n";
while ( $count ) {
my ( $result, $failed ) = $ret->dequeue( 2 );
# Remove ID from result, so not treated as a URL item.
printf {$ERR} "Mngr - received job %s\n", delete $result->{ID};
# Display the URL and the size captured.
foreach my $url ( keys %{ $result } ) {
printf {$OUT} "%s: %d\n", $url, length($result->{$url})
if $result->{$url}; # url has content
}
# Display URLs the hobo worker could not reach.
if ( @{ $failed } ) {
foreach my $url ( @{ $failed } ) {
print {$OUT} "Failed: $url\n";
}
}
# Decrement the count.
$count--;
}
MCE::Hobo->wait_all();
printf {$ERR} "Mngr - exiting loop\n\n";
printf {$ERR} "Duration: %0.3f seconds\n\n", time - $start;
exit;
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Hobos enqueue two items ( $result and $failed ) per each job
# for the manager process. Likewise, the manager process dequeues
# two items above. Optionally, Hobos add ID to result.
#
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
sub task {
my ( $id ) = @_;
printf {$ERR} "Hobo $id entering loop\n";
while ( my $job = $que->dequeue() ) {
my ( $result, $failed ) = ( { ID => $job->{ID} }, [ ] );
# Walk URLs, provide a hash and array refs for data.
printf {$ERR} "Hobo $id running job $job->{ID}\n";
walk( $job, $result, $failed );
# Send results to the manager process.
$ret->enqueue( $result, $failed );
}
printf {$ERR} "Hobo $id exiting loop\n";
}
sub walk {
my ( $job, $result, $failed ) = @_;
# Yielding is critical when running an event loop in parallel.
# Not doing so means that the app may reach contention points
# with the firewall and likely impose unnecessary hardship at
# the OS level. The idea here is not to have multiple workers
# initiate HTTP requests to a batch of URLs at the same time.
# Yielding in 1.827+ behaves more like scatter for the worker
# to run solo in a fraction of time.
MCE::Hobo->yield( 0.03 ); # MCE::Hobo 1.827
my $cv = AnyEvent->condvar();
# Populate the hash ref for URLs it could reach.
# Do not mix AnyEvent timeout and Hobo timeout.
# Choose to do the event timeout if available.
foreach my $url ( @{ $job->{INPUT} } ) {
$cv->begin();
http_get $url, timeout => 2, sub {
my ( $data, $headers ) = @_;
$result->{$url} = $data;
$cv->end();
};
}
$cv->recv();
# Populate the array ref for URLs it could not reach.
foreach my $url ( @{ $job->{INPUT} } ) {
push @{ $failed }, $url unless (exists $result->{ $url });
}
return;
}
__END__
$ perl demo.pl
Hobo 1 entering loop
Hobo 2 entering loop
Hobo 3 entering loop
Mngr - entering loop
Hobo 2 running job 2
Hobo 3 running job 3
Hobo 1 running job 1
Hobo 4 entering loop
Hobo 4 running job 4
Hobo 2 running job 5
Mngr - received job 2
Hobo 3 running job 6
Mngr - received job 3
Hobo 1 running job 7
Mngr - received job 1
Hobo 4 running job 8
Mngr - received job 4
http://192.168.0.1/: 3729
Hobo 2 exiting loop
Mngr - received job 5
Hobo 3 exiting loop
Mngr - received job 6
Hobo 1 exiting loop
Mngr - received job 7
Hobo 4 exiting loop
Mngr - received job 8
Mngr - exiting loop
Duration: 4.131 seconds
CROSS-PLATFORM TEMPLATE FOR BINARY EXECUTABLE
Making an executable is possible with the PAR::Packer module. On the Windows platform, threads, threads::shared, and exiting via threads are necessary for the binary to exit successfully.
# https://metacpan.org/pod/PAR::Packer
# https://metacpan.org/pod/pp
#
# pp -o demo.exe demo.pl
# ./demo.exe
use strict;
use warnings;
use if $^O eq "MSWin32", "threads";
use if $^O eq "MSWin32", "threads::shared";
# Include minimum dependencies for MCE::Hobo.
# Add other modules required by your application here.
use Storable ();
use Time::HiRes ();
# use IO::FDPass (); # optional: for condvar, handle, queue
# use Sereal (); # optional: for faster serialization
use MCE::Hobo;
use MCE::Shared;
# For PAR to work on the Windows platform, one must include manually
# any shared modules used by the application.
# use MCE::Shared::Array; # for MCE::Shared->array
# use MCE::Shared::Cache; # for MCE::Shared->cache
# use MCE::Shared::Condvar; # for MCE::Shared->condvar
# use MCE::Shared::Handle; # for MCE::Shared->handle, mce_open
# use MCE::Shared::Hash; # for MCE::Shared->hash
# use MCE::Shared::Minidb; # for MCE::Shared->minidb
# use MCE::Shared::Ordhash; # for MCE::Shared->ordhash
# use MCE::Shared::Queue; # for MCE::Shared->queue
# use MCE::Shared::Scalar; # for MCE::Shared->scalar
# Et cetera. Only load modules needed for your application.
use MCE::Shared::Sequence; # for MCE::Shared->sequence
my $seq = MCE::Shared->sequence( 1, 9 );
sub task {
my ( $id ) = @_;
while ( defined ( my $num = $seq->next() ) ) {
print "$id: $num\n";
sleep 1;
}
}
sub main {
MCE::Hobo->new( \&task, $_ ) for 1 .. 3;
MCE::Hobo->wait_all();
}
# Main must run inside a thread on the Windows platform or workers
# will fail during exit, causing the exe to crash. The reason is
# that PAR or a dependency isn't multi-process safe.
( $^O eq "MSWin32" ) ? threads->create(\&main)->join() : main();
threads->exit(0) if $INC{"threads.pm"};
CREDITS
The inspiration for MCE::Hobo comes from wanting threads-like behavior for processes. Both can run side-by-side, including safe use by MCE workers. Likewise, the documentation resembles that of threads.
The inspiration for wait_all and wait_one comes from the Parallel::WorkUnit module.
SEE ALSO
AUTHOR
Mario E. Roy, <marioeroy AT gmail DOT com>