Parallel::DataPipe - parallel data processing conveyor
use Parallel::DataPipe;
Parallel::DataPipe::run {
    input_iterator => [1..100],
    process_data => sub { "$_:$$" },
    number_of_data_processors => 100,
    merge_data => sub { print "$_\n" },
};
If you have a long-running script that processes data item by item (taking some data as input and producing processed data as output, e.g. aggregation, web crawling, etc.), you can speed it up 4-20 times using the parallel data pipe conveyor. Modern computers (even modern smartphones ;) ) have multiple CPU cores: 2, 4, 8, even 24! They also have a huge amount of memory: memory is cheap now. So they are ready for parallel data processing. This module gives you an easy and flexible way to use that power.
So what are the benefits of this module?
1) Because it uses input_iterator, it does not have to know all the input data before starting parallel processing.
2) Because it uses merge_data, processed data is ready for use in the main thread immediately.
Together, 1) and 2) remove the need for memory to store data items before and after the parallel work, and allow you to parallelize collecting, processing and using the processed data.
If you don't want to overload your database with multiple simultaneous queries, make queries only within input_iterator, then process the data in process_data, and then flush it with merge_data. On the other hand, you usually win if you make queries in process_data and use a lot of data processors, possibly even more than the number of physical cores if the database queries take a long time and the results then take little time to process.
It's no surprise that DB servers usually serve N simultaneous queries faster than N queries one by one.
Run tests and you will know.
To (re)write your script to use all the processing power of your server, you have to find out:
1) The method to obtain source/input data. I call it the input iterator. It can be either an array with some identifiers/URLs or a reference to a subroutine that returns the next portion of data, or undef if there is no more data to process.
2) How to process data, i.e. the method that receives an input item and produces an output item. I call it the process_data subroutine. The good news is that the item that is processed and then returned can be any scalar value in Perl, including references to arrays and hashes. It can be anything that Storable can freeze and then thaw.
3) How to use the processed data. I call it merge_data. In the example above it just prints an item, but you could do buffered inserts into a database, send email, etc.
Take into account that 1) and 3) are executed in the main script thread, while all the work in 2) is done in parallel forked processes. So in 1) and 3) it's better not to do things that block execution and leave the hungry dogs of 2) without meat to eat. So (still) this approach pays off if you know that the bottleneck in your script is CPU on the processing step. Of course, that's not the case for some web crawling tasks, unless you do some heavy calculations.
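The three pieces described above can be tried out without any forking. The following is a minimal sketch, not the module's implementation: the data, the squaring step and the sequential driver loop are all made up for illustration. It only shows the calling convention of each piece: the input sub returns undef at the end, and the processing and merge subs see their item in $_.

```perl
use strict;
use warnings;

# Hypothetical input: a closure that hands out one item per call
# and returns undef when the data runs out.
my @ids   = (1 .. 5);
my $input = sub { return shift @ids };

# Processing step: receives the item in $_ and returns the result.
my $process = sub { return $_ * $_ };

# Merge step: receives each processed item in $_ (here it only collects them).
my @results;
my $output = sub { push @results, $_ };

# Sequential stand-in for the conveyor, handy for testing the callbacks
# before any parallelism is involved.
while (defined(my $item = $input->())) {
    my $processed = do { local $_ = $item; $process->() };
    local $_ = $processed;
    $output->();
}

print join(',', @results), "\n";
```

Once the callbacks behave as expected in a loop like this, the same three code references are the kind of values the input, process and output parameters expect.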
This is the subroutine that covers the magic of parallelizing data processing. It receives parameters with these keys via a hash ref.
input - reference to an array or a subroutine that should return a data item to be processed. In the case of a subroutine, it should return undef to signal EOF. In the case of an array, it is used as a queue, i.e. shift(@$array) until there are no data items left. This behaviour was introduced in 0.06. You can also use these aliases: input_iterator, queue, data.
Note: in versions before 0.06 the parameter was input_iterator, and if it referred to an array, the array remained untouched, while the new behaviour is to treat this parameter as a queue. 0.06 supports the old behaviour only for input_iterator, while in the future it will behave as a queue to make life easier.
process - reference to a subroutine that processes data items. They are passed via the $_ variable. It should then return the processed data. This subroutine is executed in a forked process, so don't use any shared resources inside it. You can update the child's state, but it will not affect the parent's state. You can also use these aliases: process_data, process, processor, map.
These parameters are optional and have reasonable defaults, so change them only if you know what you are doing.
output - optional. Either a reference to a subroutine or an array that receives processed data items. The subroutine can use $_ or $_[0] to access the data item and $_[1] to access the item number. This subroutine is executed in the parent thread, so you can rely on changes that it makes. If you don't specify this parameter, the array of processed data is returned as the subroutine's result. You can use these aliases for this parameter: merge_data, output_iterator, output, output_queue, output_data, merge, reduce.
number_of_data_processors - (optional) number of parallel data processors. If you don't specify it, the module tries to find out the number of CPU cores and creates the same number of data processor children. It looks for the NUMBER_OF_PROCESSORS environment variable, which is set under Windows NT. If this environment variable is not found, it looks in /proc/cpuinfo, which is available under Unix environments. It makes sense to set an explicit number_of_data_processors, possibly greater than the number of CPU cores, if you want to use all slave DB servers in your environment and querying the DB servers takes more time than processing the returned data. Otherwise it's optimal to have number_of_data_processors equal to the number of CPU cores.
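The lookup order described for number_of_data_processors can be sketched as below. This is an illustration only, not the module's code: guess_cpu_cores is a hypothetical helper name, and the fallback value of 2 is an assumption for systems where neither source is available.

```perl
use strict;
use warnings;

# Hypothetical helper (not part of the module's API): guess the number of
# CPU cores following the lookup order described in the text.
sub guess_cpu_cores {
    # Set on Windows NT-family systems.
    my $env = $ENV{NUMBER_OF_PROCESSORS};
    return $env if defined $env && $env =~ /^\d+$/ && $env > 0;

    # On Linux, /proc/cpuinfo has one "processor : N" line per logical core.
    if (open my $fh, '<', '/proc/cpuinfo') {
        my $count = grep { /^processor\s*:/ } <$fh>;
        close $fh;
        return $count if $count > 0;
    }

    # Assumed fallback when neither source is available.
    return 2;
}

printf "using %d data processors\n", guess_cpu_cores();
```

When the work is dominated by waiting on a database rather than by CPU, a value larger than this guess may be passed explicitly, as discussed above.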
freeze, thaw - you can use an alternative serializer. For example, if you know that you are working with arrays of words (0..65535), you can use freeze => sub {pack('S*',@{$_[0]})} and thaw => sub {[unpack('S*',$_[0])]}, which will reduce the number of bytes exchanged between processes. But do this only as a last-resort optimization. In fact, the automatic choice is quite good and efficient. It uses encode_sereal and decode_sereal if the Sereal module is found. Otherwise it uses Storable's freeze and thaw.
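To see what the pack-based pair buys, the two subs from the text can be exercised on their own and compared against Storable. This is a small sketch with made-up sample data; it uses only core modules and does not involve the pipeline itself.

```perl
use strict;
use warnings;
use Storable qw(freeze thaw);

# Custom serializer pair from the text: pack 16-bit unsigned words.
my $freeze = sub { pack('S*', @{ $_[0] }) };
my $thaw   = sub { [ unpack('S*', $_[0]) ] };

# Sample data (illustrative only): every value fits in 0..65535.
my $words = [ 0, 1, 255, 4096, 65535 ];

my $packed    = $freeze->($words);
my $roundtrip = $thaw->($packed);

# Compare payload sizes with the Storable default.
printf "pack: %d bytes, Storable: %d bytes\n",
    length($packed), length(freeze($words));
print "round trip ok\n" if "@$roundtrip" eq "@$words";
```

The packed form takes exactly two bytes per word. The price is that it silently corrupts anything outside 0..65535, which is one reason to treat it as a last resort.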
Note: run also has an undocumented prototype for calling, (\@\$), i.e.
my @x2 = Parallel::DataPipe::run([1..100],sub {$_*2});
This prototype is not guaranteed to be supported. Use it at your own risk.
HOW IT WORKS
1) The main thread (parent) forks number_of_data_processors children for processing data.
2) As soon as data comes from input_iterator, the parent sends it to the next child using the pipe mechanism.
3) The child processes the data and returns the result to the parent using a pipe.
4) The parent first fills all the pipes to the children with data and then starts to expect processed data on the pipes from the children.
5) When it receives a result from a child, it sends the processed data to the merge_data subroutine and starts loop 2) again.
6) Loop 2) continues until the input data ends (the end of the input_iterator array is reached, or the input_iterator sub returns undef).
7) In the end, the parent expects processed data from all busy children and passes the processed data to merge_data.
8) After all the children have sent their processed data, they are killed and run returns to the caller.
Note: If input_iterator or process_data returns a reference, the data is serialized/deserialized before/after the pipe. That way you have full control over whether data is serialized on IPC.
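The steps above can be sketched with core Perl alone. This is a simplified illustration, not the module's source, and it makes several assumptions: items are plain newline-free scalars sent one per line (so no serialization is needed), each child has at most one item in flight, results are read from the busy children in the order the work was handed out rather than from whichever child finishes first, and children are stopped by closing their pipe instead of being killed.

```perl
use strict;
use warnings;
use IO::Handle;

my $workers = 3;                  # assumed number of data processors
my @input   = (1 .. 10);          # input queue
my $process = sub { $_ * 2 };     # runs inside the children
my @merged;                       # filled by the parent (the merge step)

# 1) Fork the children, each with one pipe per direction.
my @kids;
for (1 .. $workers) {
    pipe(my $child_in,   my $to_child)  or die "pipe: $!";
    pipe(my $from_child, my $child_out) or die "pipe: $!";
    my $pid = fork();
    die "fork: $!" unless defined $pid;
    if ($pid == 0) {
        # Child: drop the parent's ends, including those inherited
        # from earlier iterations, so EOF is seen reliably later.
        close $to_child; close $from_child;
        for my $k (@kids) { close $k->{to}; close $k->{from} }
        $child_out->autoflush(1);
        # 3) Process each item and send the result back.
        while (defined(my $line = <$child_in>)) {
            chomp $line;
            local $_ = $line;
            print {$child_out} $process->(), "\n";
        }
        exit 0;
    }
    close $child_in; close $child_out;
    $to_child->autoflush(1);
    push @kids, { pid => $pid, to => $to_child, from => $from_child };
}

my @idle = @kids;    # children waiting for work
my @busy;            # children with an item in flight, oldest first

# 5) Read one result and hand it to the merge step.
my $collect = sub {
    my $kid  = shift @busy;
    my $line = readline($kid->{from});
    defined $line or die "child $kid->{pid} went away";
    chomp $line;
    push @merged, $line;
    push @idle, $kid;
};

# 2), 4), 6) Fill the pipes first, then alternate collecting and sending.
while (@input) {
    $collect->() unless @idle;
    my $kid = shift @idle;
    print { $kid->{to} } shift(@input), "\n";
    push @busy, $kid;
}

# 7) Drain the results still in flight.
$collect->() while @busy;

# 8) Closing the pipes lets the children see EOF; then reap them.
for my $kid (@kids) {
    close $kid->{to};
    waitpid $kid->{pid}, 0;
}

print "@merged\n";
```

Because results are collected in hand-out order, this sketch happens to preserve input order; a version that reads from whichever child is ready first would not.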
IO::Pipely - pipes that work almost everywhere
POE - portable multitasking and networking framework for any event loop
Only core modules are used.
If found, the Sereal module is used for serialization instead of Storable, as the former is more efficient.
For all bugs please send an email to
See the git source on github
Copyright (c) 2013 Oleksandr Kharchenko <>
All rights reserved. This library is free software; you can redistribute it and/or modify it under the same terms as Perl itself.
Oleksandr Kharchenko <>