NAME
Parallel::DataPipe - parallel data processing conveyor
SYNOPSIS
    use Parallel::DataPipe;
    Parallel::DataPipe::run {
        input_iterator => [1..100],
        process_data => sub { "$_:$$" },
        number_of_data_processors => 100,
        merge_data => sub { print "$_\n" },
    };
DESCRIPTION
If you have a long-running script that processes data item by item (taking some data as input and producing processed data as output, e.g. aggregation, web crawling, etc.), here is good news for you:
You can speed it up 4-20 times with minimal effort. Modern computers (even modern smartphones ;) ) have multiple CPU cores: 2, 4, 8, even 24! They also have a huge amount of memory: memory is cheap now. So they are ready for parallel data processing. This module gives you an easy and flexible way to use that power.
Well, it is not the first approach to parallelism in Perl. You could write an efficient crawler using a single core and a framework like Coro::LWP or AnyEvent::HTTP::LWP. You can also elegantly use all your CPU cores for parallel processing using Parallel::Loops. So what are the benefits of this module?
1) because it uses input_iterator, it does not have to know all the input data before starting parallel processing;
2) because it uses merge_data, processed data is ready for use in the main thread immediately.
Together, 1) and 2) remove the need for memory to store all data items before and after the parallel work, and they allow collecting, processing and using processed data to run in parallel.
If you don't want to overload your database with multiple simultaneous queries, make queries only within input_iterator, then process the results in process_data and flush them with merge_data. On the other hand, you usually win if you make queries in process_data and run a lot of data processors. This keeps your CPUs fully loaded. It's no surprise that DB servers usually serve N simultaneous queries faster than N queries one by one. Run tests and you will know.
To (re)write your script so that it uses all the processing power of your server, you have to find out:
1) how to obtain the source/input data. I call it the input iterator. It can be either an array of identifiers/URLs or a reference to a subroutine which returns the next portion of data, or undef if there is no more data to process.
2) how to process the data, i.e. a method which receives an input item and produces an output item. I call it the process_data subroutine. The good news is that the item which is processed and then returned can be any scalar value in Perl, including references to arrays and hashes. It can be anything that Storable can freeze and then thaw.
3) how to use the processed data. I call it merge_data. In the example above it just prints an item, but you could do buffered inserts into a database, send email, etc.
Take into account that 1) and 3) are executed in the main script thread, while all of the work in 2) is done in parallel forked processes. So in 1) and 3) it's better not to do things that block execution and leave the hungry dogs of 2) without meat to eat. So (still) this approach pays off when you know that the bottleneck in your script is the CPU in the processing step. Of course, that's not the case for some web crawling tasks, unless you do some heavy calculations.
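A subroutine-style input iterator, as described in 1), can be sketched as a closure. This is only a minimal sketch and not code from the module: make_batch_iterator and the batch size of 3 are hypothetical names and values chosen for illustration. The closure returns the next portion of data on each call and undef when nothing is left.

```perl
use strict;
use warnings;

# Hypothetical helper (not part of Parallel::DataPipe): wraps a list of
# identifiers in a closure that hands out one batch per call.
sub make_batch_iterator {
    my ($batch_size, @ids) = @_;
    return sub {
        return undef unless @ids;                  # undef signals end of input
        return [ splice @ids, 0, $batch_size ];    # next portion, as an array ref
    };
}

my $next = make_batch_iterator(3, 1 .. 7);

# Drain the iterator the way a consumer would: stop at the first undef.
my @batches;
while (defined(my $batch = $next->())) {
    push @batches, join ',', @$batch;
}
print "$_\n" for @batches;
```

A closure like $next could presumably be passed as input_iterator => $next, in which case each batch would reach process_data as an array reference in $_. Because the closure only holds the identifiers that are still unread, the input never has to be materialized as processed items in advance.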
run
This is the subroutine which covers the magic of parallelizing data processing. It receives parameters with these keys via a hash ref.
input_iterator - reference to an array or a subroutine which should return the data item to be processed. In the case of a subroutine, it should return undef to signal EOF.
process_data - reference to a subroutine which processes data items. They are passed via the $_ variable. It should then return the processed data. This subroutine is executed in a forked process, so don't use any shared resources inside it. You can update the child's state, but that will not affect the parent's state.
merge_data - reference to a subroutine which receives, in $_, a data item which has been processed and is now going to be merged. This subroutine is executed in the parent thread, so you can rely on the changes it makes after process_data completion.
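The remark that a child can update its own state without affecting the parent follows from how fork works. The sketch below uses only core Perl and does not call Parallel::DataPipe; the variable names are illustrative. It shows why anything process_data needs to hand back has to travel through its return value rather than through a shared variable.

```perl
use strict;
use warnings;

my $counter = 0;

pipe(my $reader, my $writer) or die "pipe failed: $!";

my $pid = fork();
die "fork failed: $!" unless defined $pid;

if ($pid == 0) {
    # Child: this increment changes only the child's copy of $counter.
    close $reader;
    $counter++;
    print {$writer} "$counter\n";
    close $writer;
    exit 0;
}

# Parent: read what the child saw, then look at its own copy.
close $writer;
my $seen_by_child = <$reader>;
chomp $seen_by_child;
close $reader;
waitpid($pid, 0);

print "child saw $seen_by_child, parent still has $counter\n";
```

The parent's $counter stays at 0, which is why accumulation (counters, totals, database handles used for inserts) belongs in merge_data, the part that runs in the parent.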
These parameters are optional and have reasonable defaults, so change them only if you know what you are doing.
number_of_data_processors - (optional) number of parallel data processors. If you don't specify it, the module tries to find out the number of CPU cores and creates the same number of data processor children. It looks for the NUMBER_OF_PROCESSORS environment variable, which is set under Windows NT. If this environment variable is not found, it looks at /proc/cpuinfo, which is available on Linux and similar Unix systems. It makes sense to set number_of_data_processors explicitly, possibly to more than the number of CPU cores, if you want to use all slave DB servers in your environment and querying the DB servers takes more time than processing the returned data. Otherwise it's optimal to have number_of_data_processors equal to the number of CPU cores.
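The lookup order described above can be sketched roughly as follows. This is not the module's own code: guess_cpu_count is a hypothetical helper, and the fallback to 1 when nothing can be detected is an assumption of this sketch.

```perl
use strict;
use warnings;

# Hypothetical helper, not the module's own code: guess the CPU core count
# following the lookup order described in the text.
sub guess_cpu_count {
    # Windows NT sets this environment variable.
    my $env = $ENV{NUMBER_OF_PROCESSORS};
    return $env if defined $env && $env =~ /^[1-9]\d*$/;

    # Linux-style systems list one "processor : N" entry per logical core.
    if (open my $fh, '<', '/proc/cpuinfo') {
        my $count = grep { /^processor\s*:/ } <$fh>;
        close $fh;
        return $count if $count > 0;
    }

    return 1;    # assumption: fall back to a single worker if detection fails
}

my $cores = guess_cpu_count();
print "using $cores data processors\n";
```

On a system where neither source is available, passing number_of_data_processors explicitly avoids relying on any such guess.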
freeze, thaw - you can use an alternative serializer. For example, if you know that you are working with arrays of words (0..65535), you can use freeze => sub {pack('S*',@{$_[0]})} and thaw => sub {[unpack('S*',$_[0])]}, which will reduce the number of bytes exchanged between processes. But do it only as a last-resort optimization. In fact, the automatic choice is quite good and efficient. It uses encode_sereal and decode_sereal if the Sereal module is found. Otherwise it uses Storable freeze and thaw.
How It Works
1. The main thread (parent) forks number_of_data_processors children for processing data.
2. As soon as data comes from input_iterator, it sends it to the next child using serialization and a pipe mechanism.
3. The child deserializes it, processes it, serializes the result and puts it into the pipe for the parent.
4. The parent first fills up all the pipes to the children with data and then starts to expect processed data on the pipes from the children.
5. If it receives data from a child, it sends the processed data to the merge_data subroutine and puts a new portion of unprocessed data into that child's pipe (step 2).
6. This conveyor works until the input data ends (end of the input_iterator array, or the input_iterator sub returned undef).
7. In the end, the parent expects processed data from all busy children and passes it to merge_data.
8. After all the children have sent their processed data, they are killed and run returns to the caller.
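The steps above can be sketched with core modules only. This is a simplified illustration, not the module's implementation: run_conveyor, send_frame, recv_frame and read_exact are hypothetical names, the length-prefixed framing is an assumption of this sketch, and children are stopped with an empty frame and then reaped instead of being killed.

```perl
use strict;
use warnings;
use Storable qw(freeze thaw);
use IO::Select;
use IO::Handle;

# A frame is a 4-byte length followed by the payload; length 0 means "stop".
sub send_frame {
    my ($fh, $bytes) = @_;
    print {$fh} pack('N', length $bytes), $bytes or die "write failed: $!";
}

# Read exactly $len bytes, looping because sysread may return fewer.
sub read_exact {
    my ($fh, $len) = @_;
    my $buf = '';
    while (length($buf) < $len) {
        my $got = sysread($fh, $buf, $len - length($buf), length($buf));
        die "unexpected end of pipe" unless $got;
    }
    return $buf;
}

sub recv_frame {
    my ($fh) = @_;
    my $len = unpack 'N', read_exact($fh, 4);
    return $len ? read_exact($fh, $len) : undef;
}

sub run_conveyor {
    my ($workers, $process, $merge, @input) = @_;
    my $select = IO::Select->new;
    my (%task_pipe, @pids);

    for my $n (1 .. $workers) {                  # step 1: fork the children
        pipe(my $task_r,   my $task_w)   or die "pipe: $!";
        pipe(my $result_r, my $result_w) or die "pipe: $!";
        my $pid = fork();
        die "fork: $!" unless defined $pid;
        if ($pid == 0) {                         # step 3: the child's loop
            close $task_w;
            close $result_r;
            $result_w->autoflush(1);
            while (defined(my $frame = recv_frame($task_r))) {
                local $_ = thaw($frame)->[0];    # item arrives in $_
                my $out = $process->();
                send_frame($result_w, freeze([$out]));
            }
            exit 0;
        }
        close $task_r;
        close $result_w;
        $task_w->autoflush(1);
        $task_pipe{ fileno $result_r } = $task_w;
        $select->add($result_r);
        push @pids, $pid;
    }

    # Step 2: send the next item to a child, or stop it once input has ended.
    my $feed = sub {
        my ($result_r) = @_;
        my $task_w = $task_pipe{ fileno $result_r };
        if (@input) {
            send_frame($task_w, freeze([ shift @input ]));
            return 1;                            # this child is busy
        }
        send_frame($task_w, '');                 # empty frame: child exits
        $select->remove($result_r);
        return 0;
    };

    my $busy = 0;
    $busy += $feed->($_) for $select->handles;   # step 4: prime every child

    while ($busy) {                              # steps 5-7: merge and refill
        for my $result_r ($select->can_read) {
            local $_ = thaw(recv_frame($result_r))->[0];
            $merge->();
            $busy += $feed->($result_r) - 1;
        }
    }
    waitpid($_, 0) for @pids;                    # step 8: reap the children
}

my @merged;
run_conveyor(3, sub { $_ * $_ }, sub { push @merged, $_ }, 1 .. 10);
print join(',', sort { $a <=> $b } @merged), "\n";
```

Results arrive in whatever order the children finish, so the usage example sorts them before printing. Each child holds at most one outstanding item, which keeps the pipes small and the parent free to merge as soon as any child is done.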
SEE ALSO
DEPENDENCIES
These should all be in perl's core:
use Storable qw(freeze thaw);
use IO::Select;
use POSIX ":sys_wait_h";
For tests:
use Test::More tests => 21;
use Time::HiRes qw(time);
If found, it uses the Sereal module for serialization instead of Storable, as the former is more efficient.
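A rough sketch of that "Sereal if available, otherwise Storable" choice is shown below. It is not the module's code: the variable names are hypothetical, and the sketch uses the object interface of Sereal::Encoder and Sereal::Decoder rather than the encode_sereal and decode_sereal functions the documentation mentions. It also round-trips the pack-based pair suggested for the freeze and thaw parameters.

```perl
use strict;
use warnings;
use Storable ();

# Pick a freeze/thaw pair: Sereal when it can be loaded, Storable otherwise.
my ($freeze, $thaw, $backend);
if (eval { require Sereal::Encoder; require Sereal::Decoder; 1 }) {
    my $encoder = Sereal::Encoder->new;
    my $decoder = Sereal::Decoder->new;
    $backend = 'Sereal';
    $freeze  = sub { $encoder->encode($_[0]) };
    $thaw    = sub { $decoder->decode($_[0]) };
}
else {
    $backend = 'Storable';
    $freeze  = \&Storable::freeze;    # expects a reference
    $thaw    = \&Storable::thaw;
}

# Round trip of a nested item, as it would travel between processes.
my $item = { url => 'http://example.com/', words => [ 3, 1, 2 ] };
my $copy = $thaw->($freeze->($item));
print "$backend round trip: $copy->{url} @{ $copy->{words} }\n";

# Custom pair for arrays of 16-bit words, as suggested for freeze/thaw.
my $freeze_words = sub { pack 'S*', @{ $_[0] } };
my $thaw_words   = sub { [ unpack 'S*', $_[0] ] };
my $words = $thaw_words->($freeze_words->([ 0, 513, 65535 ]));
print "packed round trip: @$words\n";
```

The pack-based pair writes two bytes per word and nothing else, which is where the saving over a general-purpose serializer comes from, at the cost of handling only that one data shape.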
BUGS
For all bugs, small and big, and also to share what you think about this module, please send email to okharch@gmail.com.
SOURCE REPOSITORY
See the git source on GitHub: https://github.com/okharch/Parallel-DataPipe
COPYRIGHT
Copyright (c) 2013 Oleksandr Kharchenko <okharch@gmail.com>
All rights reserved. This library is free software; you can redistribute it and/or modify it under the same terms as Perl itself.
AUTHOR
Oleksandr Kharchenko <okharch@gmail.com>