NAME

Thread::Conveyor::Monitored - monitor a belt for specific content

SYNOPSIS

use Thread::Conveyor::Monitored;
my $mbelt = Thread::Conveyor::Monitored->new(
 {
  monitor => sub { print "monitoring value $_[0]\n" }, # required
  pre => sub { print "prepare monitoring\n" },         # optional
  post => sub { print "stop monitoring\n" },           # optional
  belt => $belt,   # use existing belt, create new if not specified
  exit => 'exit',  # defaults to undef

  checkpoint => sub { print "checkpointing\n" },
  frequency => 1000,

  optimize => 'memory', # optimization
  maxboxes => 50,       # specify throttling
  minboxes => 25,       # parameters
 }
);

$mbelt->put( "foo",['listref'],{key => 'hashref'} );
$mbelt->put( undef ); # exit value by default
$mbelt->shutdown;

$mthread = $mbelt->thread;
$mtid = $mbelt->tid;

$belt = $mbelt->belt;

@post = $mthread->join; # optional, wait for monitor thread to end

$belt = Thread::Conveyor::Monitored->belt; # "pre", "monitor", "post"

DESCRIPTION

                *** A note of CAUTION ***

This module only functions on Perl versions 5.8.0 and later.
And then only when threads are enabled with -Dusethreads.
It is of no use with any version of Perl before 5.8.0 or
without threads enabled.

                *************************

The Thread::Conveyor::Monitored module implements a single worker thread that takes boxes of values off a belt created with Thread::Conveyor and checks the boxes for specific content.

It can be used to simply log the values that are placed on the belt, to output warnings only when a certain value is encountered in a box, or to create a safe sandbox for Perl modules that are not thread-safe yet.

The action performed in the thread is determined by a name of, or a reference to, a subroutine. This subroutine is called for every box of values obtained from the belt.

Any number of threads can safely put boxes with values and references on the belt.

Optional checkpointing allows you to check and save intermediate status.

CLASS METHODS

new

$mbelt = Thread::Conveyor::Monitored->new(
 {
  pre => \&pre,
  monitor => 'monitor',
  post => \&module::post,
  belt => $belt,   # use existing belt, create new if not specified
  exit => 'exit',  # defaults to undef

  checkpoint => \&checkpoint,
  frequency => 1000,

  optimize => 'memory',
  maxboxes => 50,
  minboxes => 25,
 },
 @parameters
);

The new method starts a monitoring thread on an existing belt or on a new (empty) belt. It returns the instantiated Thread::Conveyor::Monitored object.

The first input parameter is a reference to a hash that must at least contain the "monitor" key, with either the name of a subroutine or a subroutine reference as its value.

The other input parameters are optional. If specified, they are passed to the "pre" routine, which is executed once when the monitoring is started.

The following field must be specified in the hash reference:

monitor
monitor => 'monitor_the_belt',	# assume caller's namespace

or:

monitor => 'Package::monitor_the_belt',

or:

monitor => \&SomeOther::monitor_the_belt,

or:

monitor => sub {print "anonymous sub monitoring the belt\n"},

The "monitor" field specifies the subroutine to be executed for each set of values that is removed from the belt. It must be specified as either the name of a subroutine or as a reference to an (anonymous) subroutine.

The specified subroutine should expect the following parameters to be passed:

1..N  set of values obtained from the box on the belt

What the subroutine does with the values is entirely up to the developer.
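The four ways of specifying a subroutine shown above could be normalized to a code reference roughly as follows. This is only a sketch of one possible approach, not the module's actual code: resolve_sub and its parameters are hypothetical names, and the rule that a bare name is looked up in the caller's package is taken from the comments in the examples above.

```perl
use strict;
use warnings;

# Hypothetical helper: turn a subroutine name or a code reference into a
# code reference, qualifying bare names with the given package.
sub resolve_sub {
    my ($sub, $package) = @_;
    return $sub if ref $sub eq 'CODE';      # already a code reference
    $sub = $package . '::' . $sub           # bare name: assume caller's namespace
        unless $sub =~ /::/;
    no strict 'refs';                       # symbolic lookup by name
    die "No such subroutine: $sub\n" unless defined &{$sub};
    return \&{$sub};
}

sub monitor_the_belt { return "saw @_" }

my $by_name = resolve_sub( 'monitor_the_belt', __PACKAGE__ );
my $by_ref  = resolve_sub( \&monitor_the_belt, __PACKAGE__ );
print $by_name->( 'foo', 'bar' ), "\n";
```

Both calls end up with a reference to the same subroutine, so the rest of the code only ever has to deal with code references.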

The following fields are optional in the hash reference:

pre
pre => 'prepare_monitoring',		# assume caller's namespace

or:

pre => 'Package::prepare_monitoring',

or:

pre => \&SomeOther::prepare_monitoring,

or:

pre => sub {print "anonymous sub preparing the monitoring\n"},

The "pre" field specifies the subroutine to be executed once when the monitoring of the belt is started. It must be specified as either the name of a subroutine or as a reference to an (anonymous) subroutine.

The specified subroutine should expect the following parameters to be passed:

1..N  any extra parameters that were passed with the call to new.

post
post => 'stop_monitoring',		# assume caller's namespace

or:

post => 'Package::stop_monitoring',

or:

post => \&SomeOther::stop_monitoring,

or:

post => sub {print "anonymous sub when stopping the monitoring\n"},

The "post" field specifies the subroutine to be executed once when the monitoring of the belt is stopped. It must be specified as either the name of a subroutine or as a reference to an (anonymous) subroutine.

The specified subroutine should expect the following parameters to be passed:

1..N  any extra parameters that were passed with the call to new.

Any values returned by the "post" routine can be obtained with the join method on the thread object.

belt
belt => $belt,  # create new one if not specified

The "belt" field specifies the Thread::Conveyor object that should be monitored. A new Thread::Conveyor object will be created if it is not specified.

exit
exit => 'exit',   # defaults to undef

The "exit" field specifies the value that will cause the monitoring thread to cease monitoring. The "undef" value will be assumed if it is not specified. This value should be put in a box on the belt to have the monitoring thread stop.
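The interplay of the "pre", "monitor", "post" and "exit" fields can be pictured with a single-threaded model like the one below. It is a sketch under assumptions, not the module's implementation: run_monitor is a hypothetical name, the belt is replaced by a plain list of boxes, and the exit value is assumed to be compared against the first value in a box.

```perl
use strict;
use warnings;

# Single-threaded sketch of the monitoring life cycle. The real module
# does this work in a separate thread; run_monitor() is hypothetical.
sub run_monitor {
    my ($field, $boxes, @parameters) = @_;
    my $exit = $field->{exit};                      # undef if not specified

    $field->{pre}->(@parameters) if $field->{pre};  # once, at the start

    for my $box (@$boxes) {
        my ($first) = @$box;
        # Stop at the exit value; an undef exit value needs its own test.
        last if defined($exit)
            ? ( defined($first) && $first eq $exit )
            : !defined($first);
        $field->{monitor}->(@$box);                 # once per box
    }

    # Once, at the end; these are the values join would hand back.
    return $field->{post} ? $field->{post}->(@parameters) : ();
}

my @seen;
my @result = run_monitor(
    {
        monitor => sub { push @seen, "@_" },
        post    => sub { return ( 'done', scalar @seen ) },
        exit    => 'exit',
    },
    [ ['foo'], [ 'bar', 'baz' ], ['exit'], ['never seen'] ],
);
print "@result\n";
```

In this model the box after the exit value is never handed to the "monitor" routine, which mirrors the statement that the exit value makes the monitoring thread stop.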

checkpoint
checkpoint => 'checkpointing',			# assume caller's namespace

or:

checkpoint => 'Package::checkpointing',

or:

checkpoint => \&SomeOther::checkpointing,

or:

checkpoint => sub {print "anonymous sub to do checkpointing\n"},

The "checkpoint" field specifies the subroutine to be executed every time a checkpoint should be made (e.g. for saving or updating status). It must be specified as either the name of a subroutine or as a reference to an (anonymous) subroutine.

No checkpointing will occur by default. The frequency of checkpointing can be specified with the "frequency" field.

The specified subroutine should not expect any parameters to be passed. Any values returned by the checkpointing routine will be lost.

frequency
frequency => 100,                             # default = 1000

The "frequency" field specifies the number of boxes that should have been monitored before the "checkpoint" routine is called. If a checkpoint routine is specified but no frequency field is specified, then a frequency of 1000 will be assumed.

This field has no meaning if no checkpoint routine is specified with the "checkpoint" field. The default frequency can be changed with the frequency method.
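The relation between the "checkpoint" and "frequency" fields amounts to counting boxes. The following single-threaded sketch illustrates that idea only; monitor_with_checkpoint is a hypothetical name, and resetting the counter after each checkpoint is an assumption about how the counting might be done.

```perl
use strict;
use warnings;

# Hypothetical sketch: call $checkpoint after every $frequency boxes.
sub monitor_with_checkpoint {
    my ($monitor, $checkpoint, $frequency, @boxes) = @_;
    $frequency = 1000 unless defined $frequency;    # documented default
    my $count = 0;
    for my $box (@boxes) {
        $monitor->(@$box);
        next unless $checkpoint;                    # no checkpointing by default
        next if ++$count < $frequency;
        $checkpoint->();                            # no parameters, result ignored
        $count = 0;
    }
    return;
}

my ( $monitored, $checkpoints ) = ( 0, 0 );
my @boxes = map { [$_] } 1 .. 10;
monitor_with_checkpoint(
    sub { $monitored++ },
    sub { $checkpoints++ },
    3,
    @boxes,
);
print "$monitored boxes, $checkpoints checkpoints\n";
```

With ten boxes and a frequency of 3, the checkpoint routine runs after the third, sixth and ninth box.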

optimize
optimize => 'cpu', # default: 'memory'

The "optimize" field specifies which implementation of the belt will be selected if there is no existing belt specified with the "belt" field. Currently there are two choices: "cpu" and "memory". By default, the "memory" optimization will be selected if no specific optimization is specified.

You can call the class method optimize to change the default optimization.

maxboxes
maxboxes => 50,

maxboxes => undef,  # disable throttling

The "maxboxes" field specifies the maximum number of boxes that can be sitting on the belt to be handled (throttling). If a new put would exceed this amount, putting of boxes will be halted until the number of boxes waiting to be handled has become at least as low as the amount specified with the "minboxes" field.

Fifty boxes will be assumed for the "maxboxes" field if it is not specified. If you do not want to have any throttling, you can specify the value "undef" for the field. But beware! If you do not have throttling active, you may wind up using excessive amounts of memory for storing all of the boxes that have not been handled yet.

The maxboxes method can be called to change the throttling settings during the lifetime of the object.

minboxes
minboxes => 25, # default: maxboxes / 2

The "minboxes" field specifies the number of boxes to which the belt must have drained before the putting of boxes is allowed again (throttling).

If throttling is active and the "minboxes" field is not specified, then half of the "maxboxes" value will be assumed.

The minboxes method can be called to change the throttling settings during the lifetime of the object.
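The way "maxboxes" and "minboxes" work together is a form of hysteresis: putting stops at the high mark and only resumes at the low mark. The model below is a single-threaded sketch of that rule as described above, not the module's code. The name make_throttle is hypothetical, and rounding half of "maxboxes" down with int is an assumption.

```perl
use strict;
use warnings;

# Hypothetical model of the throttling rule: once the belt holds maxboxes
# boxes, putting is halted until the belt has drained to minboxes.
sub make_throttle {
    my ($maxboxes, $minboxes) = @_;
    return sub { 1 } unless defined $maxboxes;      # undef disables throttling
    $minboxes = int( $maxboxes / 2 ) unless defined $minboxes;
    my $halted = 0;
    return sub {
        my ($waiting) = @_;                         # boxes currently on the belt
        $halted = 1 if $waiting >= $maxboxes;       # a new put would exceed the maximum
        $halted = 0 if $waiting <= $minboxes;       # drained far enough
        return !$halted;                            # true: put may proceed
    };
}

my $may_put = make_throttle( 50, 25 );
for my $waiting ( 10, 49, 50, 40, 26, 25, 30 ) {
    printf "%2d boxes waiting: %s\n", $waiting,
        $may_put->($waiting) ? 'put allowed' : 'put halted';
}
```

Note that 40 and 26 waiting boxes still report "put halted" after the maximum was reached, while 30 boxes are fine again once the belt has been down to 25.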

belt

$belt = Thread::Conveyor::Monitored->belt; # only within "pre" and "monitor"

The class method "belt" returns the Thread::Conveyor::xxx object that this thread is monitoring. It is available within the "pre" and "monitor" subroutines only.

frequency

Thread::Conveyor::Monitored->frequency( 100 );

$frequency = Thread::Conveyor::Monitored->frequency;

The "frequency" class method allows you to specify the default frequency that will be used when a checkpoint routine is specified with the "checkpoint" field. The default frequency is set to 1000 if no other value has been previously specified.
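A class method that both sets and returns a class-wide default, with a per-object field taking precedence, is a common Perl pattern. The sketch below shows that pattern in isolation. My::Monitored is a hypothetical stand-in package, and nothing here is taken from the module's source.

```perl
use strict;
use warnings;

package My::Monitored;    # hypothetical package, for illustration only

my $FREQUENCY = 1000;     # class-wide default

# Class method: set the default when a value is passed, always return it.
sub frequency {
    my $class = shift;
    $FREQUENCY = shift if @_;
    return $FREQUENCY;
}

# Constructor sketch: a "frequency" field overrides the class default,
# and the default only applies when a "checkpoint" routine is given.
sub new {
    my ($class, $field) = @_;
    my %self = %$field;
    $self{frequency} = $class->frequency
        if $self{checkpoint} and !defined $self{frequency};
    return bless \%self, $class;
}

package main;

My::Monitored->frequency(100);
my $object = My::Monitored->new( { checkpoint => sub { } } );
print $object->{frequency}, "\n";
```

Objects created before the default is changed keep the value they were created with, because the default is copied into the object at construction time.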

optimize

Thread::Conveyor::Monitored->optimize( 'cpu' );

$optimize = Thread::Conveyor::Monitored->optimize;

The "optimize" class method allows you to specify the default optimization type that will be used if no "optimize" field has been explicitly specified with a call to new. It returns the current default type of optimization.

Currently two types of optimization can be selected:

memory

Attempt to use as little memory as possible. Currently, this is achieved by starting a separate thread which hosts an unshared array. This uses the "Thread::Conveyor::Thread" sub-class.

cpu

Attempt to use as little CPU as possible. Currently, this is achieved by using a shared array (using the "Thread::Conveyor::Array" sub-class), encapsulated in a hash reference if throttling is activated (then also using the "Thread::Conveyor::Throttled" sub-class).

OBJECT METHODS

put

$mbelt->put( $scalar,[],{} );
$mbelt->put( 'exit' ); # stop monitoring

The "put" method freezes all specified parameters in a box and puts it on the belt. The monitoring thread will stop monitoring if the "exit" value is put in the box.

Please note that if you need to be very efficient, it may be worthwhile to extract the actual belt object first and use that to put boxes on the belt. The monitored "put" method is in fact only a gateway to the actual belt that is inside this object.

maxboxes

$mbelt->maxboxes( 100 );
$maxboxes = $mbelt->maxboxes;

The "maxboxes" method returns the maximum number of boxes that can be on the belt before throttling sets in. The input value, if specified, specifies the new maximum number of boxes that may be on the belt. Throttling will be switched off if the value undef is specified.

Specifying the "maxboxes" field when creating the object with new is equivalent to calling this method.

The minboxes method can be called to specify the minimum number of boxes that must be on the belt before the putting of boxes is allowed again after reaching the maximum number of boxes. By default, half of the "maxboxes" value is assumed.

minboxes

$mbelt->minboxes( 50 );
$minboxes = $mbelt->minboxes;

The "minboxes" method returns the minimum number of boxes that must be on the belt before the putting of boxes is allowed again after reaching the maximum number of boxes. The input value, if specified, specifies the new minimum number of boxes that must be on the belt.

Specifying the "minboxes" field when creating the object with new is equivalent to calling this method.

The maxboxes method can be called to set the maximum number of boxes that may be on the belt before the putting of boxes will be halted.

belt

$belt = $mbelt->belt;

The "belt" instance method returns the Thread::Conveyor::xxx object that is being monitored.

frequency

$frequency = $mbelt->frequency;

The "frequency" instance method returns the frequency with which the checkpoint routine is being called. Returns undef if no checkpointing is being done.

shutdown

$mbelt->shutdown;

@from_monitor_thread = $mbelt->shutdown;

The "shutdown" method performs an orderly shutdown of the belt. It waits until all of the boxes on the belt have been removed before it returns.

Whatever was returned by the "post" routine of the monitoring thread will also be returned by the "shutdown" method.

thread

$mthread = $mbelt->thread;

The "thread" method returns the thread object that is monitoring the contents of the belt.

tid

$tid = $mbelt->tid;

The "tid" method returns the thread id of the thread object that is monitoring the contents of the belt.

CAVEATS

You cannot remove any boxes from the belt, as that is done by the monitoring thread. Therefore, the methods "take", "take_dontwait", "peek" and "peek_dontwait" are disabled on this object.

Passing unshared values between threads is accomplished by freezing the specified values using Storable when putting the boxes on the belt and thawing the values when the box is taken off the belt. This allows for great flexibility at the expense of more CPU usage. Unfortunately it also limits what can be passed, as e.g. code references and blessed objects cannot (yet) be frozen and therefore cannot be passed.
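The freeze and thaw round trip can be tried on its own with Storable's freeze and thaw functions. The snippet below only illustrates the general technique with example values; how the module lays out a box internally is not shown here.

```perl
use strict;
use warnings;
use Storable qw(freeze thaw);

# Freeze a box of values into a single string, as when putting it on a belt.
my @values = ( 'foo', ['listref'], { key => 'hashref' } );
my $frozen = freeze( \@values );

# Thaw it again, as when taking the box off the belt: the result is a deep copy.
my @thawed = @{ thaw($frozen) };
print "$thawed[0] $thawed[1][0] $thawed[2]{key}\n";

# Code references cannot be frozen with Storable's default settings.
my $ok = eval { freeze( [ sub { 1 } ] ); 1 };
print "freezing a code reference failed\n" unless $ok;
```

Because the thawed values are copies, changes made to them in the monitoring thread are not visible to the thread that put the box on the belt.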

AUTHOR

Elizabeth Mattijsen, <liz@dijkmat.nl>.

Please report bugs to <perlbugs@dijkmat.nl>.

COPYRIGHT

Copyright (c) 2002 Elizabeth Mattijsen <liz@dijkmat.nl>. All rights reserved. This program is free software; you can redistribute it and/or modify it under the same terms as Perl itself.

SEE ALSO

threads, threads::shared, Thread::Conveyor, Storable.