NAME
Thread::Conveyor::Monitored - monitor a belt for specific content
SYNOPSIS
use Thread::Conveyor::Monitored;
my $mbelt = Thread::Conveyor::Monitored->new(
{
monitor => sub { print "monitoring value $_[0]\n" }, # is a must
pre => sub { print "prepare monitoring\n" }, # optional
post => sub { print "stop monitoring\n" }, # optional
belt => $belt, # use existing belt, create new if not specified
exit => 'exit', # defaults to undef
checkpoint => sub { print "checkpointing\n" },
frequency => 1000,
optimize => 'memory', # optimization
maxboxes => 50, # specify throttling
minboxes => 25, # parameters
}
);
$mbelt->put( "foo",['listref'],{ hash => 'ref' } );
$mbelt->put( undef ); # exit value by default
$mbelt->shutdown;
$mthread = $mbelt->thread;
$mtid = $mbelt->tid;
$belt = $mbelt->belt;
@post = $mthread->join; # optional, wait for monitor thread to end
$belt = Thread::Conveyor::Monitored->belt; # "pre", "monitor", "post"
DESCRIPTION
*** A note of CAUTION ***
This module only functions on Perl versions 5.8.0 and later.
And then only when threads are enabled with -Dusethreads.
It is of no use with any version of Perl before 5.8.0 or
without threads enabled.
*************************
The Thread::Conveyor::Monitored module implements a single worker thread that takes boxes of values from a belt created with Thread::Conveyor and checks the boxes for specific content.
It can be used simply to log actions that are placed on the belt, to output warnings only if a certain value is encountered in a box, or to create a safe sandbox for Perl modules that are not thread-safe yet.
The action performed in the thread is determined by a name or reference to a subroutine. This subroutine is called for every box of values obtained from the belt.
Any number of threads can safely put boxes with values and references on the belt.
Optional checkpointing allows you to check and save intermediate status.
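The pattern described above can be sketched with core modules only. The following is a minimal illustration and not the module's implementation: it assumes a perl built with thread support, uses Thread::Queue as a stand-in for the belt, and the names $queue, $worker and @seen are made up for this example.

```perl
use strict;
use warnings;
use threads;
use Thread::Queue;

# Stand-in for the belt (assumption: the real belt is a Thread::Conveyor object)
my $queue = Thread::Queue->new;

# One worker thread handles every value, so the handling code itself
# never runs in more than one thread at a time
my $worker = threads->create( sub {
    my @seen;                                   # state set up once ("pre")
    while ( defined( my $value = $queue->dequeue ) ) {
        push @seen, $value;                     # per-value action ("monitor")
    }
    return scalar @seen;                        # final result ("post")
} );

# Any thread may add values; undef plays the role of the exit value
$queue->enqueue($_) for qw(foo bar baz);
$queue->enqueue(undef);

# Wait for the worker and pick up its result
my $count = $worker->join;
print "monitored $count values\n";
```

Because only the worker thread touches @seen, code that is not thread-safe can be confined to that one thread, which is the "sandbox" use mentioned above.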
CLASS METHODS
new
$mbelt = Thread::Conveyor::Monitored->new(
{
pre => \&pre,
monitor => 'monitor',
post => \&module::post,
belt => $belt, # use existing belt, create new if not specified
exit => 'exit', # defaults to undef
checkpoint => \&checkpoint,
frequency => 1000,
optimize => 'memory',
maxboxes => 50,
minboxes => 25,
},
@parameters
);
The "new" function creates a monitoring function on an existing or on a new (empty) belt. It returns the instantiated Thread::Conveyor::Monitored object.
The first input parameter is a reference to a hash that should at least contain the "monitor" key with a subroutine name or reference.
The other input parameters are optional. If specified, they are passed to the "pre" routine, which is executed once when the monitoring is started.
The following field must be specified in the hash reference:
- monitor
-
monitor => 'monitor_the_belt', # assume caller's namespace
or:
monitor => 'Package::monitor_the_belt',
or:
monitor => \&SomeOther::monitor_the_belt,
or:
monitor => sub {print "anonymous sub monitoring the belt\n"},
The "monitor" field specifies the subroutine to be executed for each set of values that is removed from the belt. It must be specified as either the name of a subroutine or as a reference to an (anonymous) subroutine.
The specified subroutine should expect the following parameters to be passed:
1..N set of values obtained from the box on the belt
What the subroutine does with the values is entirely up to the developer.
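The ways of specifying a subroutine shown above boil down to resolving either a name or a code reference. A rough sketch of such a resolution in plain Perl follows. Note that resolve_sub is a hypothetical helper written for this illustration; it is not part of the module's interface, and the way the module itself resolves names may differ.

```perl
use strict;
use warnings;

# Hypothetical helper: turn a subroutine name or code reference into a
# code reference, qualifying bare names with the given namespace
sub resolve_sub {
    my ( $spec, $namespace ) = @_;
    return $spec if ref($spec) eq 'CODE';
    my $name = $spec =~ /::/ ? $spec : $namespace . '::' . $spec;
    no strict 'refs';
    die "No such subroutine: $name\n" unless defined &{$name};
    return \&{$name};
}

sub monitor_the_belt          { return "main: @_" }
sub Package::monitor_the_belt { return "Package: @_" }

# A bare name, a fully qualified name and an anonymous subroutine
my $bare      = resolve_sub( 'monitor_the_belt',          __PACKAGE__ );
my $qualified = resolve_sub( 'Package::monitor_the_belt', __PACKAGE__ );
my $anonymous = resolve_sub( sub { return "anonymous: @_" }, __PACKAGE__ );

for my $code ( $bare, $qualified, $anonymous ) {
    my $result = $code->( 'foo', 'bar' );
    print "$result\n";
}
```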
The following fields are optional in the hash reference:
- pre
-
pre => 'prepare_monitoring', # assume caller's namespace
or:
pre => 'Package::prepare_monitoring',
or:
pre => \&SomeOther::prepare_monitoring,
or:
pre => sub {print "anonymous sub preparing the monitoring\n"},
The "pre" field specifies the subroutine to be executed once when the monitoring of the belt is started. It must be specified as either the name of a subroutine or as a reference to an (anonymous) subroutine.
The specified subroutine should expect the following parameters to be passed:
1..N any extra parameters that were passed with the call to new.
- post
-
post => 'stop_monitoring', # assume caller's namespace
or:
post => 'Package::stop_monitoring',
or:
post => \&SomeOther::stop_monitoring,
or:
post => sub {print "anonymous sub when stopping the monitoring\n"},
The "post" field specifies the subroutine to be executed once when the monitoring of the belt is stopped. It must be specified as either the name of a subroutine or as a reference to an (anonymous) subroutine.
The specified subroutine should expect the following parameters to be passed:
1..N any parameters that were passed with the call to new.
Any values returned by the "post" routine can be obtained with the "join" method on the thread object.
- belt
-
belt => $belt, # create new one if not specified
The "belt" field specifies the Thread::Conveyor object that should be monitored. A new Thread::Conveyor object will be created if it is not specified.
- exit
-
exit => 'exit', # defaults to undef
The "exit" field specifies the value that will cause the monitoring thread to cease monitoring. The "undef" value will be assumed if it is not specified. This value should be put in a box on the belt to have the monitoring thread stop.
- checkpoint
-
checkpoint => 'checkpointing', # assume caller's namespace
or:
checkpoint => 'Package::checkpointing',
or:
checkpoint => \&SomeOther::checkpointing,
or:
checkpoint => sub {print "anonymous sub to do checkpointing\n"},
The "checkpoint" field specifies the subroutine to be executed every time a checkpoint should be made (e.g. for saving or updating status). It must be specified as either the name of a subroutine or as a reference to an (anonymous) subroutine.
No checkpointing will occur by default. The frequency of checkpointing can be specified with the "frequency" field.
The specified subroutine should not expect any parameters to be passed. Any values returned by the checkpointing routine will be lost.
- frequency
-
frequency => 100, # default = 1000
The "frequency" field specifies the number of boxes that should have been monitored before the "checkpoint" routine is called. If a checkpoint routine is specified but no frequency field is specified, then a frequency of 1000 will be assumed.
This field has no meaning if no checkpoint routine is specified with the "checkpoint" field. The default frequency can be changed with the frequency method.
- optimize
-
optimize => 'cpu', # default: 'memory'
The "optimize" field specifies which implementation of the belt will be selected if there is no existing belt specified with the "belt" field. Currently there are two choices: 'cpu' and 'memory'. By default, the "memory" optimization will be selected if no specific optimization is specified.
You can call the class method optimize to change the default optimization.
- maxboxes
-
maxboxes => 50,
maxboxes => undef, # disable throttling
The "maxboxes" field specifies the maximum number of boxes that can be sitting on the belt to be handled (throttling). If a new put would exceed this amount, putting of boxes will be halted until the number of boxes waiting to be handled has become at least as low as the amount specified with the "minboxes" field.
Fifty boxes will be assumed for the "maxboxes" field if it is not specified. If you do not want to have any throttling, you can specify the value "undef" for the field. But beware! If you do not have throttling active, you may wind up using excessive amounts of memory for storing all of the boxes that have not been handled yet.
The maxboxes method can be called to change the throttling settings during the lifetime of the object.
- minboxes
-
minboxes => 25, # default: maxboxes / 2
The "minboxes" field specifies the number of boxes to which the belt must have drained before the putting of boxes is allowed again (throttling).
If throttling is active and the "minboxes" field is not specified, then half of the "maxboxes" value will be assumed.
The minboxes method can be called to change the throttling settings during the lifetime of the object.
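The interplay of "maxboxes" and "minboxes" can be illustrated with a small single-threaded simulation. This is only a sketch of the rule as described above, not the module's code: the array @belt and the counter $halts are invented for the example, and the draining that the monitoring thread would do concurrently is simulated inline.

```perl
use strict;
use warnings;

my $maxboxes = 50;
my $minboxes = 25;    # the documented default: half of maxboxes

my @belt;             # boxes waiting to be handled
my $halts = 0;        # number of times putting had to wait

for my $box ( 1 .. 120 ) {
    if ( @belt >= $maxboxes ) {    # this put would exceed maxboxes
        $halts++;
        # In the real module the monitoring thread drains the belt
        # concurrently; here we simply drain until minboxes is reached
        shift @belt while @belt > $minboxes;
    }
    push @belt, $box;
}

print "halted $halts times, ", scalar(@belt), " boxes left on the belt\n";
```

The gap between the two values means a halted producer is resumed only after a substantial part of the belt has been handled, rather than stopping and starting for every single box.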
belt
$belt = Thread::Conveyor::Monitored->belt; # only within "pre" and "monitor"
The class method "belt" returns the Thread::Conveyor::xxx object that this thread is monitoring. It is available within the "pre" and "monitor" subroutines only.
frequency
Thread::Conveyor::Monitored->frequency( 100 );
$frequency = Thread::Conveyor::Monitored->frequency;
The "frequency" class method allows you to specify the default frequency that will be used when a checkpoint routine is specified with the "checkpoint" field. The default frequency is set to 1000 if no other value has been previously specified.
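To make the meaning of the frequency concrete, the following plain Perl sketch counts boxes and calls a checkpoint routine once per $frequency boxes. It is an illustration under assumptions: the loop and the variables $monitored and $checkpoints are invented here, and whether the module performs an additional checkpoint when monitoring stops is not covered by this sketch.

```perl
use strict;
use warnings;

my $frequency   = 1000;    # the documented default
my $monitored   = 0;       # boxes handled so far
my $checkpoints = 0;       # how often the checkpoint routine ran

my $checkpoint = sub { $checkpoints++ };    # called without parameters

for my $box ( 1 .. 2500 ) {
    # ... the "monitor" routine would handle $box here ...
    $monitored++;
    $checkpoint->() if $monitored % $frequency == 0;
}

print "$checkpoints checkpoints after $monitored boxes\n";
```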
optimize
Thread::Conveyor::Monitored->optimize( 'cpu' );
$optimize = Thread::Conveyor::Monitored->optimize;
The "optimize" class method allows you to specify the default optimization type that will be used if no "optimize" field has been explicitly specified with a call to new. It returns the current default type of optimization.
Currently two types of optimization can be selected:
- memory
-
Attempt to use as little memory as possible. Currently, this is achieved by starting a separate thread which hosts an unshared array. This uses the "Thread::Conveyor::Thread" sub-class.
- cpu
-
Attempt to use as little CPU as possible. Currently, this is achieved by using a shared array (using the "Thread::Conveyor::Array" sub-class), encapsulated in a hash reference if throttling is activated (then also using the "Thread::Conveyor::Throttled" sub-class).
OBJECT METHODS
put
$mbelt->put( $scalar,[],{} );
$mbelt->put( 'exit' ); # stop monitoring
The "put" method freezes all specified parameters in a box and puts it on the belt. The monitoring thread will stop monitoring if the "exit" value is put in the box.
Please note that if you need to be very efficient, it may be worthwhile to extract the actual belt object first and use that to put boxes on the belt. The monitored "put" method is in fact only a gateway to the actual belt that is inside this object.
maxboxes
$mbelt->maxboxes( 100 );
$maxboxes = $mbelt->maxboxes;
The "maxboxes" method returns the maximum number of boxes that can be on the belt before throttling sets in. The input value, if specified, specifies the new maximum number of boxes that may be on the belt. Throttling will be switched off if the value undef is specified.
Specifying the "maxboxes" field when creating the object with new is equivalent to calling this method.
The minboxes method can be called to specify the minimum number of boxes that must be on the belt before the putting of boxes is allowed again after reaching the maximum number of boxes. By default, half of the "maxboxes" value is assumed.
minboxes
$mbelt->minboxes( 50 );
$minboxes = $mbelt->minboxes;
The "minboxes" method returns the minimum number of boxes that must be on the belt before the putting of boxes is allowed again after reaching the maximum number of boxes. The input value, if specified, specifies the new minimum number of boxes that must be on the belt.
Specifying the "minboxes" field when creating the object with new is equivalent to calling this method.
The maxboxes method can be called to set the maximum number of boxes that may be on the belt before the putting of boxes will be halted.
belt
$belt = $mbelt->belt;
The "belt" instance method returns the Thread::Conveyor::xxx object that is being monitored.
frequency
$frequency = $mbelt->frequency;
The "frequency" instance method returns the frequency with which the checkpoint routine is being called. Returns undef if no checkpointing is being done.
shutdown
$mbelt->shutdown;
@from_monitor_thread = $mbelt->shutdown;
The "shutdown" method performs an orderly shutdown of the belt. It waits until all of the boxes on the belt have been removed before it returns.
Whatever was returned by the "post" routine of the monitoring thread will also be returned by the "shutdown" method.
thread
$mthread = $mbelt->thread;
The "thread" method returns the thread object that is monitoring the contents of the belt.
tid
$tid = $mbelt->tid;
The "tid" method returns the thread id of the thread object that is monitoring the contents of the belt.
CAVEATS
You cannot remove any boxes from the belt, as that is done by the monitoring thread. Therefore, the methods "take", "take_dontwait", "peek" and "peek_dontwait" are disabled on this object.
Passing unshared values between threads is accomplished by freezing the specified values using Storable when putting the boxes on the belt and thawing the values when the box is taken off the belt. This allows for great flexibility at the expense of more CPU usage. Unfortunately it also limits what can be passed, as e.g. code references and blessed objects cannot (yet) be frozen and therefore cannot be passed.
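The freeze and thaw round trip, and the limitation for code references, can be tried out with Storable directly. The sketch below only demonstrates Storable's default behaviour outside of any thread or belt; the variable names are chosen for the example.

```perl
use strict;
use warnings;
use Storable qw(freeze thaw);

# Plain values, list references and hash references survive the round trip
my $frozen = freeze( [ 'foo', ['listref'], { hash => 'ref' } ] );
my @values = @{ thaw($frozen) };
print "thawed ", scalar(@values), " values\n";

# With Storable's default settings, freezing a code reference dies
my $ok = eval { freeze( [ sub { 1 } ] ); 1 };
print "code reference could not be frozen\n" unless $ok;
```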
AUTHOR
Elizabeth Mattijsen, <liz@dijkmat.nl>.
Please report bugs to <perlbugs@dijkmat.nl>.
COPYRIGHT
Copyright (c) 2002 Elizabeth Mattijsen <liz@dijkmat.nl>. All rights reserved. This program is free software; you can redistribute it and/or modify it under the same terms as Perl itself.