NAME

Thread::Conveyor::Monitored - monitor a belt for specific content

SYNOPSIS

use Thread::Conveyor::Monitored;
my ($belt,$thread) = Thread::Conveyor::Monitored->new(
 {
  monitor => sub { print "monitoring value $_[0]\n" }, # is a must
  pre => sub { print "prepare monitoring\n" },         # optional
  post => sub { print "stop monitoring\n" },           # optional
  belt => $belt,   # use existing belt, create new if not specified
  exit => 'exit',  # defaults to undef

  maxboxes => 50,  # specify throttling
  minboxes => 25,  # parameters
 }
);

$belt->put( "foo",['listref'],{ hash => 'ref' } );
$belt->put( undef ); # exit value by default

@post = $thread->join; # optional, wait for monitor thread to end

$belt = Thread::Conveyor::Monitored->belt; # "pre", "monitor", "post"

DESCRIPTION

                *** A note of CAUTION ***

This module only functions on Perl versions 5.8.0 and later.
And then only when threads are enabled with -Dusethreads.
It is of no use with any version of Perl before 5.8.0 or
without threads enabled.

                *************************

The Thread::Conveyor::Monitored module implements a single worker thread that takes boxes of values off a belt created with Thread::Conveyor and checks the boxes for specific content.

It can be used simply to log actions that are placed on the belt, to output warnings only if a certain value is encountered in a box, or to create a safe sandbox for Perl modules that are not thread-safe yet.

The action performed in the thread is determined by a name of, or a reference to, a subroutine. This subroutine is called for every box of values obtained from the belt.

Any number of threads can safely put boxes with values and references on the belt.

CLASS METHODS

new

($belt,$thread) = Thread::Conveyor::Monitored->new(
 {
  pre => \&pre,
  monitor => 'monitor',
  post => \&module::post,
  belt => $belt,   # use existing belt, create new if not specified
  exit => 'exit',  # defaults to undef
 }
);

The new method creates a monitoring thread on an existing or on a new (empty) belt. In scalar context it returns the instantiated Thread::Conveyor::Monitored object; the monitoring thread is then detached and continues until the exit value is put in a box on the belt. In list context the thread object is returned as well, so that the join() method can be used to wait until the thread has really finished.

The first input parameter is a reference to a hash that should at least contain the "monitor" key with a subroutine name or reference.

The other input parameters are optional. If specified, they are passed to the "pre" routine, which is executed once when the monitoring is started.

The following field must be specified in the hash reference:

monitor
monitor => 'monitor_the_belt',	# assume caller's namespace

or:

monitor => 'Package::monitor_the_belt',

or:

monitor => \&SomeOther::monitor_the_belt,

or:

monitor => sub {print "anonymous sub monitoring the belt\n"},

The "monitor" field specifies the subroutine to be executed for each set of values that is removed from the belt. It must be specified as either the name of a subroutine or as a reference to an (anonymous) subroutine.

The specified subroutine should expect the following parameters to be passed:

1..N  set of values obtained from the box on the belt

What the subroutine does with the values is entirely up to the developer.
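The accepted forms of the "monitor" field (an unqualified name, a package-qualified name, or a code reference) can be illustrated with a minimal sketch. The helper resolve_sub and the package Logger are hypothetical names introduced here for illustration; this is not necessarily how Thread::Conveyor::Monitored resolves names internally.

```perl
use strict;
use warnings;

# Hypothetical helper, not part of Thread::Conveyor::Monitored: turn a
# subroutine name or a code reference into a code reference.
sub resolve_sub {
    my ( $spec, $namespace ) = @_;

    # A code reference (named or anonymous) is used as is.
    return $spec if ref($spec) eq 'CODE';

    # An unqualified name is assumed to live in the given namespace.
    my $name = $spec =~ /::/ ? $spec : "${namespace}::$spec";

    no strict 'refs';    # symbolic lookup by name is intended here
    die "no such subroutine: $name\n" unless defined &{$name};
    return \&{$name};
}

package Logger;
sub monitor_the_belt { return "Logger saw @_" }

package main;
sub monitor_the_belt { return "main saw @_" }

my $by_name   = resolve_sub( 'monitor_the_belt',         __PACKAGE__ );
my $qualified = resolve_sub( 'Logger::monitor_the_belt', __PACKAGE__ );
my $by_ref    = resolve_sub( sub { "anonymous saw @_" }, __PACKAGE__ );

# Each resolved subroutine receives the values from one box.
print $_->('foo'), "\n" for $by_name, $qualified, $by_ref;
```

Passing a code reference avoids any ambiguity about which namespace a name is looked up in.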

The following fields are optional in the hash reference:

pre
pre => 'prepare_monitoring',		# assume caller's namespace

or:

pre => 'Package::prepare_monitoring',

or:

pre => \&SomeOther::prepare_monitoring,

or:

pre => sub {print "anonymous sub preparing the monitoring\n"},

The "pre" field specifies the subroutine to be executed once when the monitoring of the belt is started. It must be specified as either the name of a subroutine or as a reference to an (anonymous) subroutine.

The specified subroutine should expect the following parameters to be passed:

1..N  any extra parameters that were passed with the call to new.

post
post => 'stop_monitoring',		# assume caller's namespace

or:

post => 'Package::stop_monitoring',

or:

post => \&SomeOther::stop_monitoring,

or:

post => sub {print "anonymous sub when stopping the monitoring\n"},

The "post" field specifies the subroutine to be executed once when the monitoring of the belt is stopped. It must be specified as either the name of a subroutine or as a reference to an (anonymous) subroutine.

The specified subroutine should expect the following parameters to be passed:

1..N  any parameters that were passed with the call to new.

Any values returned by the "post" routine can be obtained with the join method on the thread object.

belt
belt => $belt,  # create new one if not specified

The "belt" field specifies the Thread::Conveyor object that should be monitored. A new Thread::Conveyor object will be created if it is not specified.

exit
exit => 'exit',   # defaults to undef

The "exit" field specifies the value that will cause the monitoring thread to cease monitoring. The "undef" value will be assumed if it is not specified. This value should be put in a box on the belt to have the monitoring thread stop.

maxboxes
maxboxes => 50,

or:

maxboxes => undef,  # disable throttling

The "maxboxes" field specifies the maximum number of boxes that can be sitting on the belt to be handled (throttling). If a new put would exceed this amount, putting of boxes will be halted until the number of boxes waiting to be handled has become at least as low as the amount specified with the "minboxes" field.

Fifty boxes will be assumed for the "maxboxes" field if it is not specified. If you do not want to have any throttling, you can specify the value "undef" for the field. But beware! If you do not have throttling active, you may wind up using excessive amounts of memory for storing all of the boxes that have not been handled yet.

The maxboxes method can be called to change the throttling settings during the lifetime of the object.

minboxes
minboxes => 25, # default: maxboxes / 2

The "minboxes" field specifies the number of waiting boxes to which the belt must drain before the putting of boxes is allowed again (throttling).

If throttling is active and the "minboxes" field is not specified, then half of the "maxboxes" value will be assumed.

The minboxes method can be called to change the throttling settings during the lifetime of the object.
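The interplay of "maxboxes" and "minboxes" described above can be modelled with a small, self-contained sketch. The function make_throttle is a hypothetical name, and the code is only a model of the documented rule, not the module's implementation. Rounding half of "maxboxes" down with int() is an assumption made here for odd values.

```perl
use strict;
use warnings;

# Hypothetical model of the throttling rule; not the module's own code.
# Returns a closure that reports whether a put is allowed, given the
# number of boxes currently waiting on the belt.
sub make_throttle {
    my ( $maxboxes, $minboxes ) = @_;

    # Default documented above: half of maxboxes (int() is an assumption).
    $minboxes = int( $maxboxes / 2 )
      if defined $maxboxes and !defined $minboxes;

    my $halted = 0;
    return sub {
        my ($waiting) = @_;
        return 1 unless defined $maxboxes;    # undef disables throttling

        if ($halted) {
            # Resume only once the belt has drained to minboxes or fewer.
            $halted = 0 if $waiting <= $minboxes;
        }
        else {
            # Halt when one more box would exceed maxboxes.
            $halted = 1 if $waiting >= $maxboxes;
        }
        return $halted ? 0 : 1;
    };
}

my $may_put = make_throttle(50);    # minboxes defaults to 25
for my $waiting ( 49, 50, 40, 26, 25 ) {
    printf "%2d boxes waiting: %s\n", $waiting,
      $may_put->($waiting) ? 'put allowed' : 'put halted';
}
```

The gap between the two values keeps producers from being stopped and restarted on every single box once the belt is full.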

belt

$belt = Thread::Conveyor::Monitored->belt; # only within "pre" and "monitor"

The class method "belt" returns the Thread::Conveyor object for which this thread is monitoring. It is available within the "pre" and "monitor" subroutines only.

OBJECT METHODS

put

$belt->put( $scalar,[],{} );
$belt->put( 'exit' ); # stop monitoring

The "put" method freezes all specified parameters in a box and puts it on the belt. The monitoring thread will stop monitoring if the "exit" value is put in the box.

maxboxes

$belt->maxboxes( 100 );
$maxboxes = $belt->maxboxes;

The "maxboxes" method returns the maximum number of boxes that can be on the belt before throttling sets in. If an input value is specified, it becomes the new maximum number of boxes that may be on the belt. Throttling is switched off if the value undef is specified.

Specifying the "maxboxes" field when creating the object with new is equivalent to calling this method.

The minboxes method can be called to specify the number of boxes to which the belt must drain, after the maximum number of boxes has been reached, before the putting of boxes is allowed again. By default, half of the "maxboxes" value is assumed.

minboxes

$belt->minboxes( 50 );
$minboxes = $belt->minboxes;

The "minboxes" method returns the number of boxes to which the belt must drain, after the maximum number of boxes has been reached, before the putting of boxes is allowed again. If an input value is specified, it becomes the new minimum number of boxes.

Specifying the "minboxes" field when creating the object with new is equivalent to calling this method.

The maxboxes method can be called to set the maximum number of boxes that may be on the belt before the putting of boxes will be halted.

CAVEATS

You cannot remove any boxes from the belt, as that is done by the monitoring thread. Therefore, the methods "take", "take_dontwait", "peek" and "peek_dontwait" are disabled on this object.

Passing unshared values between threads is accomplished by freezing the specified values using Storable when putting the boxes on the belt and thawing the values when the box is taken off the belt. This allows for great flexibility at the expense of more CPU usage. Unfortunately it also limits what can be passed: code references and blessed objects, for example, cannot (yet) be frozen and therefore cannot be passed.
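The freeze and thaw round trip can be tried directly with Storable, without any threads involved. The following is a minimal sketch using only Storable's own freeze and thaw functions. The variable names are illustrative, and the way the module wraps the values internally may differ.

```perl
use strict;
use warnings;
use Storable qw(freeze thaw);

# Values as they might be handed to put(); Storable needs a reference,
# so the whole list is wrapped in an array reference before freezing.
my @box    = ( 'foo', ['listref'], { hash => 'ref' } );
my $frozen = freeze( \@box );

# The receiving side gets an independent deep copy of the values.
my @thawed = @{ thaw($frozen) };
print "thawed ", scalar(@thawed), " values; hash value is $thawed[2]{hash}\n";

# With Storable's default settings a code reference cannot be frozen,
# so the attempt is wrapped in eval to catch the exception.
my $ok = eval { freeze( [ sub { 1 } ] ); 1 };
print $ok ? "code reference frozen\n" : "code reference rejected: $@";
```

Because the receiving thread works on a copy, changes it makes to a thawed structure are not visible to the thread that put the box on the belt.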

AUTHOR

Elizabeth Mattijsen, <liz@dijkmat.nl>.

Please report bugs to <perlbugs@dijkmat.nl>.

COPYRIGHT

Copyright (c) 2002 Elizabeth Mattijsen <liz@dijkmat.nl>. All rights reserved. This program is free software; you can redistribute it and/or modify it under the same terms as Perl itself.

SEE ALSO

threads, threads::shared, Thread::Conveyor, Storable.