NAME
Thread::Pipeline - multithreaded pipeline manager
VERSION
version 0.003
SYNOPSIS
my %blocks = (
    map1   => { sub => \&mapper, num_threads => 2, main_input => 1, out => 'map2' },
    map2   => { sub => \&another_mapper, num_threads => 5, out => [ 'log', 'reduce' ] },
    reduce => { sub => \&reducer, need_finalize => 1, out => '_out' },
    log    => { sub => \&logger },
);
# create pipeline
my $pipeline = Thread::Pipeline->new( \%blocks );
# fill its input queue
for my $data_item ( @data_array ) {
    $pipeline->enqueue( $data_item );
}
# say that there's nothing more to process
$pipeline->no_more_data();
# get results from pipeline's output queue
my @results = $pipeline->get_results();
METHODS
new
my $pl = Thread::Pipeline->new( $blocks_description );
Constructor. Creates a pipeline object and initializes its blocks if a description is given.
The blocks description is either a hashref { $id => $descr, ... } or an arrayref [ $id => $lite_descr, ... ] (see add_block). For an arrayref, the constructor assumes a direct chain of blocks and automatically adds the 'main_input' and 'out' fields.
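The arrayref form can be pictured as an expansion into the hash form. The following is only a sketch of that expansion: expand_chain is a hypothetical helper and not part of Thread::Pipeline, the block ids and workers are made up, and sending the last block's output to '_out' is an assumption.

```perl
use strict;
use warnings;

# Hypothetical helper, NOT part of Thread::Pipeline: expands a chain
# description [ $id => $lite_descr, ... ] into the full hash form.
sub expand_chain {
    my ($chain) = @_;
    my @pairs = @$chain;
    my @ids   = @pairs[ grep { $_ % 2 == 0 } 0 .. $#pairs ];   # ids, in chain order
    my %lite  = @pairs;
    my %blocks;
    for my $i ( 0 .. $#ids ) {
        my %descr = %{ $lite{ $ids[$i] } };
        # The first block in the chain is the default target for enqueue().
        $descr{main_input} = 1 if $i == 0;
        # Each block feeds the next one; '_out' for the last block is an assumption.
        $descr{out} = $i < $#ids ? $ids[ $i + 1 ] : '_out';
        $blocks{ $ids[$i] } = \%descr;
    }
    return \%blocks;
}

# Illustrative two-block chain with made-up workers.
my $blocks = expand_chain( [
    parse => { sub => sub { lc $_[0] }, num_threads => 2 },
    count => { sub => sub { length $_[0] } },
] );
```

Under these assumptions, parse becomes the main input and sends to count, and count sends to the pipeline's main output.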
add_block
my %block_info = (
    sub         => \&worker_sub,
    num_threads => $num_of_threads,
    out         => $next_block_id,
);
$pl->add_block( $block_id => \%block_info );
Adds a new block to the pipeline. The block's worker threads and its incoming queue are created.
Block info is a hash with the following keys:
* sub - worker coderef (required)
* num_threads - number of parallel worker threads, default 1
* out - id of the block where processed data should be sent (or an arrayref of ids, as in the SYNOPSIS);
  use '_out' for the pipeline's main output
* main_input - marks this block as the default target for enqueue
* post_sub - code that runs when all threads have ended
* need_finalize - run the worker with undef when the queue is finished
The worker is a sub that is called with two parameters: &worker_sub($data, $pipeline). An undefined $data means that the input sequence has ended (see need_finalize).
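A minimal sketch of a worker for a block with need_finalize set. It assumes, and this is not stated above, that whatever the worker returns is what gets sent on to the out block. The summing logic is illustrative only.

```perl
use strict;
use warnings;

# Running total kept between calls. Perl threads each get their own copy
# of unshared variables, so this only makes sense with num_threads => 1.
my $total = 0;

# Illustrative worker: sums numbers and emits the total at end of input.
sub reducer {
    my ( $data, $pipeline ) = @_;
    if ( defined $data ) {
        $total += $data;
        return;        # nothing to pass on yet (assumed: empty return sends nothing)
    }
    return $total;     # undef input: the sequence has ended
}

# Calling the worker by hand, the way the pipeline is described as calling it.
reducer($_) for 1 .. 4;
my ($sum) = reducer(undef);
```

Without need_finalize the worker would, per the key list above, never be called with undef, so the total would never be emitted.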
enqueue
$pl->enqueue( $data, %opts );
Puts the data into a block's queue.
Options:
* block - id of block, default is pipeline's main input block
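As a sketch of how the block option might be used, the helper below sends flagged items straight to a 'log' block (the id used in the SYNOPSIS) and everything else to the main input. route_item and the error field are hypothetical names chosen for illustration.

```perl
use strict;
use warnings;

# Hypothetical helper: $pl is expected to be a Thread::Pipeline object,
# $item a hashref. Items flagged with {error} bypass the main input.
sub route_item {
    my ( $pl, $item ) = @_;
    my %opts = $item->{error} ? ( block => 'log' ) : ();
    # With no options, enqueue() falls back to the main input block.
    return $pl->enqueue( $item, %opts );
}
```

A caller would then write route_item( $pl, $record ) for each record instead of calling enqueue directly.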
no_more_data
$pl->no_more_data( %opts );
Tells the pipeline that there is nothing more to process.
get_results
my @result = $pl->get_results();
Waits for all pipeline operations to finish. Returns the content of the output queue.
get_threads_num
my $num = $pl->get_threads_num($block_id);
AUTHOR
liosha <liosha@cpan.org>
COPYRIGHT AND LICENSE
This software is copyright (c) 2012 by liosha.
This is free software; you can redistribute it and/or modify it under the same terms as the Perl 5 programming language system itself.