NAME
Parallel::MapReduce - MapReduce Infrastructure, multithreaded
SYNOPSIS
## THIS IS ALL STILL EXPERIMENTAL!!
## DO NOT USE FOR PRODUCTION!!
## LOOK AT THE ROADMAP AND FEEDBACK WHAT YOU FIND IMPORTANT!!
use Parallel::MapReduce;
my $mri = new Parallel::MapReduce (MemCacheds => [ '127.0.0.1:11211', .... ],
Workers => [ '10.0.10.1', '10.0.10.2', ...]);
my $A = {1 => 'this is something ',
2 => 'this is something else',
3 => 'something else completely'};
# apply MR algorithm (word count) on $A
my $B = $mri->mapreduce (
sub {
my ($k, $v) = (shift, shift);
return map { $_ => 1 } split /\s+/, $v;
},
sub {
my ($k, $v) = (shift, shift);
my $sum = 0;
map { $sum += $_ } @$v;
return $sum;
},
$A
);
# prefabricate mapreducer
my $string2lines = $mri->mapreducer (sub {...}, sub {...});
# apply it
my $B = &$string2lines ($A);
# pipeline it with some other mapreducer
my $pipeline = $mri->pipeline ($string2lines,
$lines2wordcounts);
# apply that
my $B = &$pipeline ($A);
ABSTRACT
The MapReduce framework allows a parallel, and possibly distributed computation of CPU intensive computations on several, if not many hosts.
For this purpose you will have to formulate your problem into one which only deals with list traversal (map) and list comprehension (reduce), something which is not unnatural for Perl programmers. In effect you end up with a hash-to-hash transform and this is exactly what this package implements.
This package implements MapReduce for local invocations, parallelized (but still local) invocations and for fully distributed invocations. For the latter it is not using a file system to propagate data, but instead a pool of memcached
servers.
DESCRIPTION
In a nutshell, the MapReduce algorithm is this in sequential form):
sub mapreduce {
my $mri = shift;
my $map = shift;
my $reduce = shift;
my $h1 = shift;
my %h3;
while (my ($k, $v) = each %$h1) {
my %h2 = &$map ($k => $v);
map { push @{ $h3{$_} }, $h2{$_} } keys %h2;
}
my %h4;
while (my ($k, $v) = each %h3) {
$h4{$k} = &$reduce ($k => $v);
}
return \%h4;
}
It is the task of the application programmer to determine the functions $map
and $reduce
, which when applied to the hash $h1
will produce the wanted result. The infrastructure $mri
is not used above, but it becomes relevant when the individual invocations of $map
and $reduce
are (a) parallelized or (b) are distributed. And this is what this package does.
- Master
-
This is the host where you initiate the computation and this is where the central algorithm will be executed.
- Workers
-
Each worker can execute either the
$map
function or the$reduce
over the subslice of the overall data. Workers can run local simply as subroutine (see Parallel::MapReduce::Worker, or can be a thread talking to a remote instance of a worker (see Parallel::MapReduce::Worker::SSH).When you create your MR infrastructure you can specify which kind of workers you want to use (via a
WorkerClass
in the constructor).NOTE: Feel free to propose more workers.
- Servers
-
To exchange hash data between master and workers and also between workers this package makes use of an existing
memcached
server pool (see http://www.danga.com/memcached/). Obviously, the more servers there are running, the merrier.NOTE: The (Debian-packaged) Perl client is somewhat flaky in multi-threaded environments. I made some work-arounds, but other options should be investigated.
INTERFACE
Constructor
$mri = new Parallel::MapReduce (...)
The constructor accepts the following options:
MemCacheds
(default: none)-
A list reference to IP:port strings detailing how the
memcached
can be reached. You must specify at least one. If you have nomemcached
running, your only option is to use Parallel::MapReduce::Testing instead. That runs the whole thing locally. Workers
(default: none)-
A list reference to IP addresses on which hosts the workers should be run. You can name one and the same IP address multiple times to rebalance the load.
For worker implementations which are not farmed out, the IP addresses do not count. But their number does.
WorkerClass
(default:Parallel::MapReduce::Worker
)-
Which worker implementation to be used.
Methods
- shutdown
-
$mri->shutdown
Especially when you use the SSH workers you should make sure that you terminate them properly. So better run this method if you do not want to have plenty of SSH sessions being left over.
- mapreduce
-
$A = $mri->mapreduce ($map_coderef, $reduce_coderef, $B)
This method applies to hash (reference)
$B
the MR algorithm. You have to pass in CODE refs to the map and the reduce function. The result a reference to a hash. - mapreducer
-
$op = $mri->mapreducer ($map_coderef, $reduce_coderef)
This method returns a prefabricated mapreducer (see SYNOPSIS). You also have to pass in CODE refs to the map and the reduce function.
- pipeline
-
$op = $mri->pipeline ($op1, $op2, ...)
This method takes a number of prefabricated mapreducers and pipelines them into one. That is returned.
NOTE: When a pipeline is executed the processor could be clever not to retrieve intermediate hashes. At the moment, though, this is still the case.
SEE ALSO
Parallel::MapReduce::Sequential, Parallel::MapReduce::Testing, Parallel::MapReduce::Worker, Log::Log4perl
COPYRIGHT AND LICENSE
Copyright 200[8] by Robert Barta, <drrho@cpan.org>
This library is free software; you can redistribute it and/or modify it under the same terms as Perl itself.