NAME

Thread::Workers - Creates a boss thread which feeds a queue consumed by worker threads. The module is deliberately lightweight, with a limited feature set: its primary aim is simple boss/worker thread management with few dependencies.

This module is currently experimental and under active development. Its interface will be solidified over time, but it works as advertised now.

SYNOPSIS

use Thread::Workers;

my $pool = Thread::Workers->new();
$pool->set_boss_fetch_cb(\&function_returns_work);
$pool->set_boss_log_cb(\&function_processes_worker_returns);
$pool->set_worker_work_cb(\&function_does_work);
$pool->start_boss();
$pool->start_workers();
$pool->add_worker();
$pool->sleep_workers();
$pool->wake_workers();

# inside your own control loop:
# we have orders to increase the load! add 500 workers
for (1..500) {
  $pool->add_worker();
}

# time to clean up

$pool->stop_boss(); #signal boss thread to die
$pool->stop_workers(); #stop the workers, may leave unfinished items in queue.
# Or! 
$pool->stop_finish_work(); #gracefully stop boss and finish work queue, then shut down workers.

DESCRIPTION

Thread::Workers uses threads, Thread::Semaphore, and Thread::Queue to create a pool of workers which a boss thread supplies with work. The boss thread could be fed data from a socket listener in the main thread, or could run a routine that checks a database for work.
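The boss/worker pattern itself can be sketched with the core modules alone. This is a minimal illustration under stated assumptions, not Thread::Workers' own implementation: the main thread plays the boss, three worker threads pull items from a shared Thread::Queue, and results come back on a second queue. The doubling "work" is a placeholder, and the sketch needs a Perl built with thread support.

```perl
use strict;
use warnings;
use threads;
use Thread::Queue;

# Sketch only: boss/worker on top of threads and Thread::Queue.
# This is NOT the Thread::Workers implementation.
my $work_q = Thread::Queue->new();   # boss -> workers
my $log_q  = Thread::Queue->new();   # workers -> boss

sub worker_loop {
    # dequeue() blocks until an item is available; undef is the stop signal.
    while (defined(my $item = $work_q->dequeue())) {
        $log_q->enqueue($item * 2);   # placeholder "work": double the number
    }
    return;
}

my @workers = map { threads->create(\&worker_loop) } 1 .. 3;

# The main thread acts as the boss and feeds the work queue.
$work_q->enqueue(1 .. 10);
$work_q->enqueue(undef) for @workers;   # one stop signal per worker
$_->join() for @workers;

# Collect results without blocking; completion order across workers varies.
my @results;
while (defined(my $r = $log_q->dequeue_nb())) {
    push @results, $r;
}
my @sorted = sort { $a <=> $b } @results;
print "@sorted\n";
```

Because the stop signals are queued after the real items, FIFO ordering guarantees every item is processed before any worker exits.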

EXAMPLE

sub fetch_data {

my $work = get_data_from_a_data_source();
# if you have an array of items and wish each one to be processed separately,
# convert it to a hash keyed by position and return \%hash:
# my %hash = map { $_ => $work->[$_] } 0 .. $#{$work};
# the hash keys represent a 'priority' (queue order), so to speak.
# an array or a scalar put into the work queue is passed directly
# to a single worker to be processed. if you have a single hash item you wish
# to pass, do something like return [ %hash ]
return $work;
}
sub work_data {
my $work = shift;
# process the work.
# we can log a return by returning a value.
return do_something_with($work);
}
sub work_log {
my $log = shift; # this is an array of hashes. each array item is { workitem => $original_work_item, return => $return_from_worker };
do_something_with_the_log($log);
}
my $workers = Thread::Workers->new(threadinterval => 5, bossinterval => 5, totalthreads => 15);
$workers->set_boss_log_cb(\&work_log);
$workers->set_boss_fetch_cb(\&fetch_data);
$workers->set_worker_work_cb(\&work_data);
$workers->start_boss();
$workers->start_workers();

# your code would probably go on to do other things in its own loops.
# In my own code, I'm doing system monitoring and injecting some jobs locally,
# handling logging of the boss/worker subs, and other tasks.
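For the array-to-hash conversion mentioned in fetch_data above, a small helper can build the integer-keyed hash the boss expects. This is a sketch: items_to_work is a hypothetical name, and the literal list stands in for whatever your data source returns.

```perl
use strict;
use warnings;

# Hypothetical helper: turn an array reference of work items into a hash
# reference keyed by consecutive integers (0, 1, 2, ...), so the items are
# queued in their original array order.
sub items_to_work {
    my ($items) = @_;
    my %work = map { $_ => $items->[$_] } 0 .. $#{$items};
    return \%work;
}

my $work = items_to_work([ 'job a', 'job b', 'job c' ]);
for my $key (sort { $a <=> $b } keys %{$work}) {
    print "$key => $work->{$key}\n";
}
```

A fetch callback could then simply end with `return items_to_work($items);`.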

The boss thread will do the following actions when it receives work:

A scalar or an array reference is posted as a single item into the work queue. Your workers need to know how to deal with the data being presented. This is not the expected use case.

For a hash reference, the boss expects the keys to be unique integers. The integers determine the order in which the values are placed into the queue, one work item per key.

{
    0 => 'step 0',                                             # a simple scalar, maybe all one worker needs
    1 => { 'step all' => 'data' },                             # a hash reference
    2 => { cmd1 => { something => 'data', blah => 'blah' } },  # or some object or some array reference
}

This will create 3 separate "work items" to be placed into the queue.

If you need to feed your workers with a single block of data from a hash, you *must* assign it this way.

my $job_returning_from_boss_fetch = {
    0 => {
        cmd1 => {
            something => 'data', somethingelse => 'data', jobid => '121',  # whatever your worker callback is expecting
        },
    },  # or cmd1 => a scalar, an array reference, an object or something.
};
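The dispatch rules above can be modelled in plain Perl to check how a fetch result will be split into work items. This is a hypothetical sketch, not the module's code: split_work is an invented name, and a plain array stands in for the work queue.

```perl
use strict;
use warnings;

# Hypothetical model of the boss's dispatch rules (not Thread::Workers code):
#  - a plain hash reference is split into one work item per key, in numeric key order;
#  - anything else (scalar, array reference, blessed object) is a single work item.
sub split_work {
    my ($work) = @_;
    if (ref($work) eq 'HASH') {
        return map { $work->{$_} } sort { $a <=> $b } keys %{$work};
    }
    return ($work);
}

# The three-item example.
my @queue = split_work({
    0 => 'step 0',
    1 => { 'step all' => 'data' },
    2 => { cmd1 => { something => 'data', blah => 'blah' } },
});
print scalar(@queue), " work items\n";

# The single-block example: one key, so exactly one work item.
my @single = split_work({
    0 => { cmd1 => { something => 'data', somethingelse => 'data', jobid => '121' } },
});
print scalar(@single), " work item, jobid $single[0]{cmd1}{jobid}\n";
```

Sorting with `<=>` rather than the default string sort matters once there are ten or more keys, since a string sort would place key 10 before key 2.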

If a worker returns a value for a work item (say, the cmd1 job above), the boss passes it to the log queue as an array of hashes. Let's say the worker returned { timestamp => '201209030823', jobid => '121', return => 'success' }.

The log queue will have

[
    { job => 'cmd1', return => { timestamp => '201209030823', jobid => '121', return => 'success' } },  # value of return is the worker callback's return
]

Whether or not you set a log callback, the log queue is flushed at the end of every boss interval. Entries not handled by a log callback are lost: use it or lose it.
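Because unhandled log entries are discarded at each flush, the log callback is the place to persist worker returns. Below is a hypothetical work_log, assuming each entry has the job and return keys shown in the log queue example; it only formats the entries into lines, where real code might write them to a file or a database.

```perl
use strict;
use warnings;

# Hypothetical log callback. Assumes each entry looks like
# { job => ..., return => { timestamp => ..., jobid => ..., return => ... } }.
sub work_log {
    my ($log) = @_;               # array reference of hash references
    my @lines;
    for my $entry (@{$log}) {
        my $ret = $entry->{return};
        push @lines, sprintf('%s %s jobid=%s status=%s',
            $ret->{timestamp}, $entry->{job}, $ret->{jobid}, $ret->{return});
    }
    print "$_\n" for @lines;       # real code might append to a file or DB instead
    return @lines;
}

my @written = work_log([
    { job    => 'cmd1',
      return => { timestamp => '201209030823', jobid => '121', return => 'success' } },
]);
```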

Currently there is no signal to tell the boss to hand the items left in its queue back upstream, though the underlying Thread::Queue object can be accessed via $pool->{_queue}. Future revisions will include a callback for this ability.
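Until such a callback exists, unprocessed items could be drained from a Thread::Queue without blocking and handed back upstream. In this sketch a freshly created queue stands in for $pool->{_queue}, which is an internal field and may change between releases; drain_queue and requeue_upstream are hypothetical names. Workers may still be dequeuing concurrently, so stop them first if you need a complete snapshot.

```perl
use strict;
use warnings;
use threads;
use Thread::Queue;

# Stand-in for $pool->{_queue} (an internal field; treat it as unstable).
my $queue = Thread::Queue->new('item 1', 'item 2', 'item 3');

# Drain everything currently queued without blocking. dequeue_nb() returns
# undef when the queue is empty, so an undef item (e.g. a stop signal)
# also ends the loop.
sub drain_queue {
    my ($q) = @_;
    my @items;
    while (defined(my $item = $q->dequeue_nb())) {
        push @items, $item;
    }
    return @items;
}

my @leftover = drain_queue($queue);
print scalar(@leftover), " items to send back upstream\n";
# requeue_upstream(@leftover);   # hypothetical: your own re-feeding code
```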

SEE ALSO

threads

Thread::Queue

Thread::Semaphore

If this doesn't suit your needs, see the following projects:

Thread::Pool - very similar in goals to this project with a larger feature set.

Gearman::Client - client/server worker pool

TheSchwartz (or Helios) - DBI fed backend to pools of workers

Beanstalk::Client and/or Beanstalk::Pool - another client/server worker pool

Hopkins - "a better cronjob" with work queues, async driven backend

AUTHOR

Kal Aeolian, <kalielaeolian@gmail.com>

COPYRIGHT AND LICENSE

Copyright (C) 2012 by Kal Aeolian

This library is free software; you can redistribute it and/or modify it under the same terms as Perl itself, either Perl version 5.14.2 or, at your option, any later version of Perl 5 you may have available.