NAME

MCE - Many-Core Engine for Perl. Provides parallel processing capabilities.

VERSION

This document describes MCE version 1.003

SYNOPSIS

use MCE;

## A new instance shown with all available options.

my $mce = MCE->new(

   tmp_dir      => $tmp_dir,

       ## Default is $MCE::Signal::tmp_dir which points to
       ## $ENV{TEMP} if defined. Otherwise, tmp_dir points
       ## to /tmp.

   input_data   => $input_file,    ## Default is undef

       ## input_data => '/path/to/file' for input file
       ## input_data => \@array for input array
       ## input_data => \*FILE_HNDL for file handle
       ## input_data => \$scalar to read a scalar like a file

   chunk_size   => 2000,           ## Default is 500

       ## Values up to 8192 denote a number of records. Values
       ## greater than 8192 denote a number of bytes; MCE reads
       ## until the end of the current record before calling
       ## user_func.

       ## chunk_size =>     1,     ## Consists of 1 record
       ## chunk_size =>  1000,     ## Consists of 1000 records
       ## chunk_size => 16384,     ## Approximately 16384 bytes
       ## chunk_size => 50000,     ## Approximately 50000 bytes

   max_workers  => 8,              ## Default is 2
   use_slurpio  => 1,              ## Default is 0
   use_threads  => 1,              ## Default is 0 or 1

       ## Number of workers to spawn; whether to enable slurpio
       ## when reading files (passes the raw chunk to the user
       ## function); and whether to use threads instead of
       ## forking new workers. use_threads defaults to 1 only
       ## when the script contains 'use threads' or 'use forks'
       ## and 'use_threads' is not specified. When using
       ## threads, load the thread modules before MCE; MCE
       ## should be the very last module loaded.

   job_delay    => 0.035,          ## Default is undef
   spawn_delay  => 0.150,          ## Default is undef
   submit_delay => 0.001,          ## Default is undef

       ## Time, in fractional seconds, to wait before spawning
       ## workers, before starting a job (run), and before
       ## submitting params to workers.

   user_begin   => \&user_begin,   ## Default is undef
   user_func    => \&user_func,    ## Default is undef
   user_end     => \&user_end,     ## Default is undef

       ## Think of user_begin, user_func, user_end like the awk
       ## scripting language:
       ##    awk 'BEGIN { ... } { ... } END { ... }'

       ## Each MCE worker calls user_begin once per job, then
       ## calls user_func repeatedly until no chunks remain.
       ## Afterwards, the worker calls user_end.

   user_error   => \&user_error,   ## Default is undef
   user_output  => \&user_output,  ## Default is undef

       ## When workers call the following functions, MCE passes
       ## the data to user_error/user_output if defined:
       ## $self->sendto('stderr', 'Sending to STDERR');
       ## $self->sendto('stdout', 'Sending to STDOUT');

   stderr_file  => 'err_file',     ## Default is STDERR
   stdout_file  => 'out_file',     ## Default is STDOUT

       ## Or send output to a file. user_error/user_output,
       ## when defined, take precedence.

   flush_stderr => 1,              ## Default is 0
   flush_stdout => 1,              ## Default is 0

       ## Flush standard error or output immediately.
);

## run() calls spawn(), kicks off the job (workers call user_begin,
## user_func, and user_end), and shuts down the workers afterwards.
## run() takes an optional argument; the default, 1, shuts down
## workers automatically after processing.

$mce->run();
$mce->run(0);                      ## 0 disables auto-shutdown

## Or, spawn workers early.

$mce->spawn();

## Process data arrays and/or input files with the same pool of
## workers. process() calls run(0) internally.

$mce->process(\@input_data_1);     ## Process arrays
$mce->process(\@input_data_2);
$mce->process(\@input_data_n);

$mce->process('input_file_1');     ## Process files
$mce->process('input_file_2');
$mce->process('input_file_n');

## Shutdown workers afterwards.

$mce->shutdown();
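The chunk_size rule in the synopsis (values up to 8192 count records, larger values count bytes, and a byte-sized chunk is extended to the end of the current record) can be modelled in plain Perl. This is a minimal sketch, assuming newline-terminated records held in an in-memory string rather than a file; chunk_unit and read_byte_chunk are hypothetical helpers, not part of MCE's API.

```perl
use strict;
use warnings;

# Hypothetical helper: classify a chunk_size value as described above.
sub chunk_unit {
    my ($chunk_size) = @_;
    return $chunk_size <= 8192 ? 'records' : 'bytes';
}

# Hypothetical helper: take roughly $size bytes starting at $pos, then keep
# going until the end of the current "\n"-terminated record.
# Returns the chunk and the position where the next chunk starts.
sub read_byte_chunk {
    my ($data_ref, $pos, $size) = @_;
    my $len = length $$data_ref;
    return ('', $len) if $pos >= $len;

    my $end = $pos + $size;
    if ($end < $len) {
        # Extend to just past the next newline (or to the end of the data).
        my $nl = index($$data_ref, "\n", $end - 1);
        $end = $nl == -1 ? $len : $nl + 1;
    }
    else {
        $end = $len;
    }
    return (substr($$data_ref, $pos, $end - $pos), $end);
}
```

Calling read_byte_chunk repeatedly with the returned position walks the string chunk by chunk, and every chunk ends on a record boundary even when the requested size falls mid-record.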

SYNTAX FOR USER_BEGIN & USER_END

## Both user_begin and user_end functions, if specified, behave
## similarly to awk 'BEGIN { ... } { ... } END { ... }'.

## Each worker calls this once prior to processing.

sub user_begin {                   ## Optional via user_begin option

   my $self = shift;

   $self->{wk_total_rows} = 0;     ## Prefix variables with wk_
}

## And once after completion.

sub user_end {                     ## Optional via user_end option

   my $self = shift;

   printf "## %d: Processed %d rows\n",
      $self->wid(), $self->{wk_total_rows};
}

SYNTAX FOR USER_FUNC (with use_slurpio => 0 option)

## MCE passes a reference to an array containing the chunk data.

sub user_func {

   my ($self, $chunk_ref, $chunk_id) = @_;

   foreach my $row ( @{ $chunk_ref } ) {
      print $row;
      $self->{wk_total_rows} += 1;
   }
}
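To make the begin/func/end call order concrete, the single-process sketch below drives the three callbacks the way the text describes: user_begin once, user_func once per chunk, then user_end. The driver run_one_worker and the plain hash standing in for $self are hypothetical; real MCE workers run as separate processes or threads and receive their chunks from the main process.

```perl
use strict;
use warnings;

# Hypothetical model of one worker's lifecycle; not MCE's implementation.
sub run_one_worker {
    my ($self, $chunks, %cb) = @_;
    $cb{begin}->($self) if $cb{begin};
    my $chunk_id = 0;
    for my $chunk_ref (@$chunks) {
        $cb{func}->($self, $chunk_ref, ++$chunk_id);
    }
    $cb{end}->($self) if $cb{end};
    return $self;
}

my @log;
my $worker = run_one_worker(
    {},                                  # plain hash plays the role of $self
    [ [ "a\n", "b\n" ], [ "c\n" ] ],     # two chunks of records
    begin => sub {
        my ($self) = @_;
        $self->{wk_total_rows} = 0;
        push @log, 'begin';
    },
    func => sub {
        my ($self, $chunk_ref, $chunk_id) = @_;
        $self->{wk_total_rows} += scalar @$chunk_ref;
        push @log, "func:$chunk_id";
    },
    end => sub {
        my ($self) = @_;
        push @log, 'end:' . $self->{wk_total_rows};
    },
);
```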

SYNTAX FOR USER_FUNC (with use_slurpio => 1 option)

## MCE passes a reference to a scalar containing the raw chunk data.

sub user_func {

   my ($self, $chunk_ref, $chunk_id) = @_;

   my $count = () = $$chunk_ref =~ /abc/g;   ## Count every 'abc'
}
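The count idiom in user_func depends on the /g modifier: a /g match in list context returns every match, and assigning that list to an empty list yields the number of matches. Without /g the count never exceeds 1. The sketch below, using a made-up raw chunk, shows both forms and also splits the slurped chunk back into individual records.

```perl
use strict;
use warnings;

my $chunk     = "abc 1\nxyz 2\nabcabc 3\n";   # hypothetical raw chunk (slurpio => 1)
my $chunk_ref = \$chunk;

my $count_all = () = $$chunk_ref =~ /abc/g;   # every occurrence
my $count_one = () = $$chunk_ref =~ /abc/;    # at most 1 without /g

# Split the raw chunk into records, keeping each trailing newline.
my @records = split /(?<=\n)/, $$chunk_ref;
```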

SYNTAX FOR USER_ERROR & USER_OUTPUT

## MCE directs $self->sendto('stderr', ...) and $self->sendto('stdout', ...)
## calls to these functions, one at a time. This is handy for filtering,
## modifying, and/or sending the data elsewhere.

sub user_error {                   ## Optional via user_error option

   my $error = shift;

   print LOGERR $error;            ## LOGERR must already be open
}

sub user_output {                  ## Optional via user_output option

   my $output = shift;

   print LOGOUT $output;           ## LOGOUT must already be open
}
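The user_error and user_output examples print to LOGERR and LOGOUT, which are not opened in the snippet. One way to supply them, sketched here with lexical filehandles and temporary files (the log locations are assumptions), is to open each log once before calling run(). Since the text says MCE hands these callbacks the data serially, a single handle per log should suffice.

```perl
use strict;
use warnings;
use File::Temp qw(tempfile);
use IO::Handle;

# Hypothetical log destinations; substitute real paths as needed.
my ($err_fh, $err_path) = tempfile(UNLINK => 1);
my ($out_fh, $out_path) = tempfile(UNLINK => 1);
$_->autoflush(1) for $err_fh, $out_fh;

sub user_error  { my $error  = shift; print {$err_fh} $error;  }
sub user_output { my $output = shift; print {$out_fh} $output; }

# MCE would normally invoke these; call them directly to demonstrate.
user_output("processed chunk 1\n");
user_error("warning: bad record\n");
```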

DESCRIPTION

Many-Core Engine (MCE) for Perl helps improve performance by putting all available cores to work. One immediate benefit is that MCE does not fork a new worker process for each element in an array. Instead, MCE follows a bank queuing model: imagine the line of customers as the data and the bank tellers as the parallel workers. MCE extends that model by handing the next n elements from the input stream, as a single chunk, to the next available worker.
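A minimal sketch of the chunking half of that model, assuming an in-memory array: an iterator hands out the next n elements together with a 1-based chunk ID, the same shape as the ($chunk_ref, $chunk_id) arguments user_func receives. make_chunk_iterator is a hypothetical name; in the model described above, each idle worker would be the one asking for the next chunk.

```perl
use strict;
use warnings;

# Hypothetical iterator: returns ($chunk_ref, $chunk_id) until input runs out.
sub make_chunk_iterator {
    my ($data_ref, $chunk_size) = @_;
    my ($pos, $chunk_id) = (0, 0);
    return sub {
        return if $pos >= @$data_ref;
        my $end = $pos + $chunk_size - 1;
        $end = $#$data_ref if $end > $#$data_ref;
        my @chunk = @{$data_ref}[$pos .. $end];
        $pos = $end + 1;
        return (\@chunk, ++$chunk_id);
    };
}

# Drain the iterator sequentially; in MCE, each idle worker makes this request.
my $next = make_chunk_iterator([1 .. 7], 3);
my @seen;
while (my ($chunk_ref, $chunk_id) = $next->()) {
    push @seen, "$chunk_id:@$chunk_ref";
}
```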

MCE EXAMPLE WITH CHUNK_SIZE => 1

## Suppose a long-running computation must be applied to every element
## of an array, in parallel, across a pool of workers.

use MCE;

my @input_data  = (0 .. 18000 - 1);
my $max_workers = 3;
my $order_id    = 1;
my %result;

## Callback function for displaying results. The logic below shows how
## one can display results immediately while still preserving output
## order. The %result hash is a temporary cache to store results
## for out-of-order replies.

sub display_result {

   my ($wk_result, $chunk_id) = @_;
   $result{$chunk_id} = $wk_result;

   while (1) {
      last unless exists $result{$order_id};

      printf "i: %d sqrt(i): %f\n",
         $input_data[$order_id - 1], $result{$order_id};

      delete $result{$order_id};
      $order_id++;
   }
}

## Compute via MCE.

my $mce = MCE->new(
   input_data  => \@input_data,
   max_workers => $max_workers,
   chunk_size  => 1,

   user_func => sub {

      my ($self, $chunk_ref, $chunk_id) = @_;
      my $wk_result = sqrt($chunk_ref->[0]);

      $self->do('display_result', $wk_result, $chunk_id);
   }
);

$mce->run();
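The ordering logic in display_result is a reorder buffer: results are cached by chunk ID and emitted only once every lower ID has arrived. The sketch below replays that logic with a made-up arrival order (3, 1, 2, 5, 4) and collects results instead of printing them, so the effect is easy to check. accept_result is a hypothetical stand-in for display_result.

```perl
use strict;
use warnings;

my $order_id = 1;   # next chunk ID allowed to be emitted
my %result;         # cache for results that arrived early
my @emitted;

sub accept_result {
    my ($wk_result, $chunk_id) = @_;
    $result{$chunk_id} = $wk_result;

    # Flush every consecutive result starting at $order_id.
    while (exists $result{$order_id}) {
        push @emitted, delete $result{$order_id};
        $order_id++;
    }
}

# Simulated out-of-order replies from workers.
accept_result("r$_", $_) for 3, 1, 2, 5, 4;
```

Each result is held only until its predecessors arrive, so the cache stays small as long as workers finish chunks in roughly increasing order.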

FOREACH SUGAR METHOD

## Compute via MCE. Foreach implies chunk_size => 1.

my $mce = MCE->new(
   max_workers => 3
);

## Worker calls code block passing a reference to an array containing
## one item. Use $chunk_ref->[0] to retrieve the single element.

$mce->foreach(\@input_data, sub {

   my ($self, $chunk_ref, $chunk_id) = @_;
   my $wk_result = sqrt($chunk_ref->[0]);

   $self->do('display_result', $wk_result, $chunk_id);
});

MCE EXAMPLE WITH CHUNK_SIZE => 500

## Chunking reduces overhead many times over. Instead of passing a single
## item from @input_data, MCE sends a chunk of $chunk_size items to the
## next available worker.

my @input_data  = (0 .. 385000 - 1);
my $max_workers = 3;
my $chunk_size  = 500;
my $order_id    = 1;
my %result;

## Callback function for displaying results.

sub display_result {

   my ($wk_result, $chunk_id) = @_;
   $result{$chunk_id} = $wk_result;

   while (1) {
      last unless exists $result{$order_id};
      my $i = ($order_id - 1) * $chunk_size;

      for ( @{ $result{$order_id} } ) {
         printf "i: %d sqrt(i): %f\n", $input_data[$i++], $_;
      }

      delete $result{$order_id};
      $order_id++;
   }
}

## Compute via MCE.

my $mce = MCE->new(
   input_data  => \@input_data,
   max_workers => $max_workers,
   chunk_size  => $chunk_size,

   user_func => sub {

      my ($self, $chunk_ref, $chunk_id) = @_;
      my @wk_result;

      for ( @{ $chunk_ref } ) {
         push @wk_result, sqrt($_);
      }

      $self->do('display_result', \@wk_result, $chunk_id);
   }
);

$mce->run();
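With chunk_size => 500, chunk N covers input indices (N - 1) * 500 through N * 500 - 1, which is why display_result starts $i at ($order_id - 1) * $chunk_size. The short computation below (helper names are hypothetical) checks that mapping and counts the chunks, and therefore the display_result callbacks, for 385,000 elements at chunk_size 500 versus 1.

```perl
use strict;
use warnings;

# Number of chunks needed to cover $n items (integer ceiling division).
sub chunk_count {
    my ($n, $size) = @_;
    return int(($n + $size - 1) / $size);
}

# First and last input index covered by a 1-based chunk ID.
sub chunk_range {
    my ($chunk_id, $size, $n) = @_;
    my $first = ($chunk_id - 1) * $size;
    my $last  = $first + $size - 1;
    $last = $n - 1 if $last > $n - 1;   # the final chunk may be short
    return ($first, $last);
}

my $chunks_500 = chunk_count(385_000, 500);   # one callback per chunk
my $chunks_1   = chunk_count(385_000, 1);
```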

FORCHUNK SUGAR METHOD

## Compute via MCE.

my $mce = MCE->new(
   max_workers => $max_workers,
   chunk_size  => $chunk_size
);

## Below, $chunk_ref is a reference to an array containing the next
## $chunk_size items from @input_data.

$mce->forchunk(\@input_data, sub {

   my ($self, $chunk_ref, $chunk_id) = @_;
   my @wk_result;

   for ( @{ $chunk_ref } ) {
      push @wk_result, sqrt($_);
   }

   $self->do('display_result', \@wk_result, $chunk_id);
});

LAST & NEXT METHODS

## Both last and next methods work inside foreach, forchunk,
## and user_func code blocks.

## ->last: Worker immediately exits the chunking loop

my @list = (1..80);

$mce->forchunk(\@list, { chunk_size => 2 }, sub {

   my ($self, $chunk_ref, $chunk_id) = @_;

   $self->last if ($chunk_id > 4);

   my @output = ();

   for my $rec ( @{ $chunk_ref } ) {
      push @output, $rec, "\n";
   }

   $self->sendto('stdout', \@output);
});

-- Output (each chunk above consists of 2 elements)

1
2
3
4
5
6
7
8

## ->next: Worker starts the next iteration of the chunking loop

my @list = (1..80);

$mce->forchunk(\@list, { chunk_size => 4 }, sub {

   my ($self, $chunk_ref, $chunk_id) = @_;

   $self->next if ($chunk_id < 20);

   my @output = ();

   for my $rec ( @{ $chunk_ref } ) {
      push @output, $rec, "\n";
   }

   $self->sendto('stdout', \@output);
});

-- Output (each chunk above consists of 4 elements)

77
78
79
80
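Both outputs follow from the chunk arithmetic. With chunk_size => 2, chunks 1 to 4 hold 1..8 before last stops the loop at chunk 5; with chunk_size => 4, next skips chunks 1 to 19, and chunk 20 holds 77..80. The sequential loop below reproduces both results using Perl's own last and next. It only models the control flow: run_chunks is a hypothetical helper, and real MCE workers process chunks concurrently, so the relative order of their output is not guaranteed in general.

```perl
use strict;
use warnings;

# Sequential model of the chunking loop. $filter receives the chunk ID and
# returns 'last' (stop), 'next' (skip this chunk), or anything else (keep).
sub run_chunks {
    my ($list, $chunk_size, $filter) = @_;
    my @out;
    my $chunk_id = 0;
    for (my $i = 0; $i < @$list; $i += $chunk_size) {
        my $end = $i + $chunk_size - 1;
        $end = $#$list if $end > $#$list;
        my @chunk = @{$list}[$i .. $end];
        $chunk_id++;
        my $action = $filter->($chunk_id);
        last if $action eq 'last';
        next if $action eq 'next';
        push @out, @chunk;
    }
    return @out;
}

my @list       = (1 .. 80);
my @after_last = run_chunks(\@list, 2, sub { $_[0] > 4  ? 'last' : 'keep' });
my @after_next = run_chunks(\@list, 4, sub { $_[0] < 20 ? 'next' : 'keep' });
```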

MISCELLANEOUS METHODS

## Notifies workers to abort after processing the current chunk. The
## abort method is only meaningful when processing input_data.

   $self->abort();

## Worker exits current job.

   $self->exit();

## Returns the ID of the worker.

   $self->wid();

DO & SENDTO METHODS

MCE can serialize data transfers from worker processes through helper functions. The main MCE thread processes these transfers one at a time. MCE uses the Storable Perl module to pass data from a worker process to the main MCE thread. In addition, the callback function can optionally return a reply.

[ $reply = ] $self->do('callback_func' [, $arg1, $arg2, ...]);

## Passing arguments to a callback function using references & scalar:

sub callback_func {  
   my ($array_ref, $hash_ref, $scalar_ref, $scalar) = @_;
   ...
}

$self->do('main::callback_func', \@a, \%h, \$s, 'hello');
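Because arguments travel through Storable, the callback receives copies of the worker's data, not aliases to it. The round trip below applies Storable's freeze and thaw to the same argument shapes as the callback_func example. That MCE calls exactly these two functions internally is an assumption; the copy semantics shown hold for any serialization-based transfer.

```perl
use strict;
use warnings;
use Storable qw(freeze thaw);

my @a = (1, 2, 3);
my %h = (key => 'value');
my $s = 'scalar data';

# Serialize the argument list on the "worker" side...
my $frozen = freeze([ \@a, \%h, \$s, 'hello' ]);

# ...and rebuild it on the "main" side before calling the callback.
my ($array_ref, $hash_ref, $scalar_ref, $scalar) = @{ thaw($frozen) };

# The callback sees equal data but separate copies: changes do not flow back.
push @$array_ref, 4;
```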

## MCE detects whether the caller wants no return value (void), a list,
## a hash, or a scalar.

$self->do('callback_func' [, ...]);
my @array  = $self->do('callback_func' [, ...]);
my %hash   = $self->do('callback_func' [, ...]);
my $scalar = $self->do('callback_func' [, ...]);
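Perl exposes the caller's context through wantarray, which is presumably how do() distinguishes these cases (an assumption about MCE's internals). Assigning to a hash is list context, so wantarray reports the same value for the @array and %hash forms; the hash form works because the returned key/value list is assigned into the hash. note_context below is a hypothetical example function.

```perl
use strict;
use warnings;

my @contexts;

# Records which context the caller requested, as a dispatcher could.
sub note_context {
    my $ctx = !defined(wantarray()) ? 'void'
            : wantarray()           ? 'list'
            :                         'scalar';
    push @contexts, $ctx;
    return (a => 1, b => 2);   # a key/value list
}

note_context();                  # void
my @array  = note_context();     # list
my %hash   = note_context();     # also list context
my $scalar = note_context();     # scalar
```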

## Send content to STDOUT or STDERR. Only one level of scalar values is
## supported for arrays.

$self->sendto('stdout', \@array);
$self->sendto('stdout', \$scalar);
$self->sendto('stdout', $scalar);

$self->sendto('stderr', \@array);
$self->sendto('stderr', \$scalar);
$self->sendto('stderr', $scalar);

## Append content to the end of a file. Only one level of scalar values is
## supported for arrays.

$self->sendto('file', \@array, '/path/to/file');
$self->sendto('file', \$scalar, '/path/to/file');
$self->sendto('file', $scalar, '/path/to/file');
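The three sendto('file', ...) forms accept an array reference, a scalar reference, or a plain scalar, and append to the named file. A plain-Perl equivalent of that append step, useful for seeing what ends up on disk, is sketched below. append_content is a hypothetical name, the temporary file is an assumption, and the sketch does not reproduce the serialization across workers that MCE provides.

```perl
use strict;
use warnings;
use File::Temp qw(tempfile);

# Hypothetical helper mirroring the three accepted argument forms.
sub append_content {
    my ($data, $path) = @_;
    open my $fh, '>>', $path or die "Cannot open $path: $!";
    if    (ref $data eq 'ARRAY')  { print {$fh} @$data; }   # one level of scalars
    elsif (ref $data eq 'SCALAR') { print {$fh} $$data; }
    else                          { print {$fh} $data;  }
    close $fh or die "Cannot close $path: $!";
}

my (undef, $path) = tempfile(UNLINK => 1);
append_content(["a\n", "b\n"], $path);
append_content(\"c\n",         $path);
append_content("d\n",          $path);
```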

EXAMPLES

MCE comes with examples of real-world use cases, ranging from parallelizing something as small as cat (try it with -n) to grepping for patterns and aggregating word counts.

cat.pl    Concatenation script, similar to the cat binary.
egrep.pl  Egrep script, similar to the egrep binary.
wc.pl     Word count script, similar to the wc binary.

findnull.pl
          A parallel script that reports lines containing
          null fields. It is many times faster than the egrep
          binary. Try it against a large file containing very
          long lines.

scaling_pings.pl
          Performs ping tests and reports failing IPs to
          standard output.

widefinder.pl
          An implementation using MCE. As fast as MMAP IO when
          the file resides in the OS file system cache, and 2x
          to 3x faster when reading directly from disk.

foreach.pl
forchunk.pl
          These take the same sqrt example from Parallel::Loops
          and measure the overhead of the engine. Each number
          below is the size of @input that can be submitted,
          with results displayed, within 1 second.

          Parallel::Loops:     600  Forking each @input is expensive
          MCE foreach....:  18,000  Sends result after each @input
          MCE forchunk...: 385,000  Chunking reduces overhead 

REQUIREMENTS

Perl 5.8.0 or later

SEE ALSO

MCE::Signal

SOURCE

The source is hosted at: http://code.google.com/p/many-core-engine-perl/

AUTHOR

Mario E. Roy, <marioeroy AT gmail DOT com>

COPYRIGHT AND LICENSE

Copyright (C) 2012 by Mario E. Roy

MCE is free software; you can redistribute it and/or modify it under the same terms as Perl itself.