NAME
Thread::Conveyor::Monitored - monitor a belt for specific content
SYNOPSIS
use Thread::Conveyor::Monitored;
my ($belt,$thread) = Thread::Conveyor::Monitored->new(
{
monitor => sub { print "monitoring value $_[0]\n" }, # is a must
pre => sub { print "prepare monitoring\n" }, # optional
post => sub { print "stop monitoring\n" }, # optional
belt => $belt, # use existing belt, create new if not specified
exit => 'exit', # defaults to undef
maxboxes => 50, # specify throttling
minboxes => 25, # parameters
}
);
$belt->put( "foo",['listref'],{'hashref'} );
$belt->put( undef ); # exit value by default
@post = $thread->join; # optional, wait for monitor thread to end
$belt = Thread::Conveyor::Monitored->belt; # "pre", "do", "post"
DESCRIPTION
*** A note of CAUTION ***
This module only functions on Perl versions 5.8.0 and later.
And then only when threads are enabled with -Dusethreads.
It is of no use with any version of Perl before 5.8.0 or
without threads enabled.
*************************
The Thread::Conveyor::Monitored
module implements a single worker thread that takes of boxes of values from a belt created with Thread::Conveyor and which checks the boxes for specific content.
It can be used for simply logging actions that are placed on the belt. Or only output warnings if a certain value is encountered in a box. Or create a safe sandbox for Perl modules that are not thread-safe yet.
The action performed in the thread, is determined by a name or reference to a subroutine. This subroutine is called for every box of values obtained from the belt.
Any number of threads can safely put boxes with values and reference on the belt.
CLASS METHODS
new
($belt,$thread) = Thread::Conveyor::Monitored->new(
{
pre => \&pre,
monitor => 'monitor',
post => \&module::post,
belt => $belt, # use existing belt, create new if not specified
exit => 'exit', # defaults to undef
}
);
The new
function creates a monitoring function on an existing or on a new (empty) belt. It returns the instantiated Thread::Conveyor::Monitored object in scalar context: in that case, the monitoring thread will be detached and will continue until the exit value is put in a box on the belt. In list context, the thread object is also returned, which can be used to wait for the thread to be really finished using the join()
method.
The first input parameter is a reference to a hash that should at least contain the "monitor" key with a subroutine reference.
The other input parameters are optional. If specified, they are passed to the the "pre" routine which is executed once when the monitoring is started.
The following field must be specified in the hash reference:
- do
-
monitor => 'monitor_the_belt', # assume caller's namespace
or:
monitor => 'Package::monitor_the_belt',
or:
monitor => \&SomeOther::monitor_the_belt,
or:
monitor => sub {print "anonymous sub monitoring the belt\n"},
The "monitor" field specifies the subroutine to be executed for each set of values that is removed from the belt. It must be specified as either the name of a subroutine or as a reference to a (anonymous) subroutine.
The specified subroutine should expect the following parameters to be passed:
1..N set of values obtained from the box on the belt
What the subroutine does with the values, is entirely up to the developer.
The following fields are optional in the hash reference:
- pre
-
pre => 'prepare_monitoring', # assume caller's namespace
or:
pre => 'Package::prepare_monitoring',
or:
pre => \&SomeOther::prepare_monitoring,
or:
pre => sub {print "anonymous sub preparing the monitoring\n"},
The "pre" field specifies the subroutine to be executed once when the monitoring of the belt is started. It must be specified as either the name of a subroutine or as a reference to a (anonymous) subroutine.
The specified subroutine should expect the following parameters to be passed:
1..N any extra parameters that were passed with the call to L<new>.
- post
-
post => 'stop_monitoring', # assume caller's namespace
or:
post => 'Package::stop_monitoring',
or:
post => \&SomeOther::stop_monitoring,
or:
post => sub {print "anonymous sub when stopping the monitoring\n"},
The "post" field specifies the subroutine to be executed once when the monitoring of the belt is stopped. It must be specified as either the name of a subroutine or as a reference to a (anonymous) subroutine.
The specified subroutine should expect the following parameters to be passed:
1..N any parameters that were passed with the call to L<new>.
Any values returned by the "post" routine, can be obtained with the
join
method on the thread object. - belt
-
belt => $belt, # create new one if not specified
The "belt" field specifies the Thread::Conveyor object that should be monitored. A new Thread::Conveyor object will be created if it is not specified.
- exit
-
exit => 'exit', # defaults to undef
The "exit" field specifies the value that will cause the monitoring thread to seize monitoring. The "undef" value will be assumed if it is not specified. This value should be put in a box on the belt to have the monitoring thread stop.
- maxboxes
-
maxboxes => 50, maxboxes => undef, # disable throttling
The "maxboxes" field specifies the maximum number of boxes that can be sitting on the belt to be handled (throttling). If a new put would exceed this amount, putting of boxes will be halted until the number of boxes waiting to be handled has become at least as low as the amount specified with the "minboxes" field.
Fifty boxes will be assumed for the "maxboxes" field if it is not specified. If you do not want to have any throttling, you can specify the value "undef" for the field. But beware! If you do not have throttling active, you may wind up using excessive amounts of memory used for storing all of the boxes that have not been handled yet.
The maxboxes method can be called to change the throttling settings during the lifetime of the object.
- minboxes
-
minboxes => 25, # default: maxboxes / 2
The "minboxes" field specified the minimum number of boxes that can be waiting on the belt to be handled before the putting of boxes is allowed again (throttling).
If throttling is active and the "minboxes" field is not specified, then half of the "maxboxes" value will be assumed.
The minboxes method can be called to change the throttling settings during the lifetime of the object.
belt
$belt = Thread::Conveyor::Monitored->belt; # only within "pre" and "do"
The class method "belt" returns the Thread::Conveyor object for which this thread is monitoring. It is available within the "pre" and "do" subroutine only.
OBJECT METHODS
put
$belt->put( $scalar,[],{} );
$belt->put( 'exit' ); # stop monitoring
The "put" method freezes all specified parameters in a box and puts it on the belt. The monitoring thread will stop monitoring if the "exit" value is put in the box.
maxboxes
$belt->maxboxes( 100 );
$maxboxes = $belt->maxboxes;
The "maxboxes" method returns the maximum number of boxes that can be on the belt before throttling sets in. The input value, if specified, specifies the new maximum number of boxes that may be on the belt. Throttling will be switched off if the value undef is specified.
Specifying the "maxboxes" field when creating the object with new is equivalent to calling this method.
The minboxes method can be called to specify the minimum number of boxes that must be on the belt before the putting of boxes is allowed again after reaching the maximum number of boxes. By default, half of the "maxboxes" value is assumed.
minboxes
$belt->minboxes( 50 );
$minboxes = $belt->minboxes;
The "minboxes" method returns the minimum number of boxes that must be on the belt before the putting of boxes is allowed again after reaching the maximum number of boxes. The input value, if specified, specifies the new minimum number of boxes that must be on the belt.
Specifying the "minboxes" field when creating the object with new is equivalent to calling this method.
The maxboxes method can be called to set the maximum number of boxes that may be on the belt before the putting of boxes will be halted.
CAVEATS
You cannot remove any boxes from the belt, as that is done by the monitoring thread. Therefore, the methods "take", "take_dontwait", "peek" and "peek_dontwait" are disabled on this object.
Passing unshared values between threads is accomplished by freezing the specified values using Storable
when putting the boxes on the belt and thawing the values when the box is taken off the belt. This allows for great flexibility at the expense of more CPU usage. Unfortunately it also limits what can be passed, as e.g. code references and blessed objects can not (yet) be frozen and therefore not be passed.
AUTHOR
Elizabeth Mattijsen, <liz@dijkmat.nl>.
Please report bugs to <perlbugs@dijkmat.nl>.
COPYRIGHT
Copyright (c) 2002 Elizabeth Mattijsen <liz@dijkmat.nl>. All rights reserved. This program is free software; you can redistribute it and/or modify it under the same terms as Perl itself.