NAME
MCE::Shared::Condvar - Condvar helper class
VERSION
This document describes MCE::Shared::Condvar version 1.869
DESCRIPTION
This helper class for MCE::Shared provides a Scalar, Mutex, and primitives for conditional locking.
SYNOPSIS
    use MCE::Shared;

    my $cv = MCE::Shared->condvar( 0 );

    # OO interface

    $val = $cv->set( $val );
    $val = $cv->get();
    $len = $cv->len();

    # conditional locking primitives

    $cv->lock();
    $cv->unlock();

    $cv->broadcast(0.05);     # delay before broadcasting
    $cv->broadcast();

    $cv->signal(0.05);        # delay before signaling
    $cv->signal();

    $cv->timedwait(2.5);
    $cv->wait();

    # included, sugar methods without having to call set/get explicitly

    $len = $cv->append( $string );     #   $val .= $string, returns the new length
    $val = $cv->decr();                # --$val
    $val = $cv->decrby( $number );     #   $val -= $number
    $val = $cv->getdecr();             #   $val--
    $val = $cv->getincr();             #   $val++
    $val = $cv->incr();                # ++$val
    $val = $cv->incrby( $number );     #   $val += $number
    $old = $cv->getset( $new );        #   $o = $v, $v = $n, $o
EXAMPLE
The following example demonstrates barrier synchronization.
    use MCE;
    use MCE::Shared;
    use Time::HiRes qw(usleep);   # provides usleep, used in barrier_sync

    my $num_workers = 8;
    my $count = MCE::Shared->condvar(0);
    my $state = MCE::Shared->scalar('ready');

    my $microsecs = ( $^O =~ /mswin|mingw|msys|cygwin/i ) ? 0 : 200;

    # The lock is released upon entering ->broadcast, ->signal, ->timedwait,
    # and ->wait. For performance reasons, the condition variable is *not*
    # re-locked prior to exiting the call. Therefore, obtain the lock when
    # synchronization is desired subsequently.

    sub barrier_sync {
        usleep($microsecs) while $state->get eq 'down';

        $count->lock;
        $state->set('up'), $count->incr;

        if ($count->get == $num_workers) {
            $count->decr, $state->set('down');
            $count->broadcast;
        }
        else {
            $count->wait while $state->get eq 'up';
            $count->lock;
            $state->set('ready') if $count->decr == 0;
            $count->unlock;
        }
    }

    sub user_func {
        my $id = MCE->wid;
        for (1 .. 400) {
            MCE->print("$_: $id\n");
            barrier_sync();   # made possible by MCE::Shared::Condvar
            # MCE->sync();    # same thing via the MCE-Core API
        }
    }

    my $mce = MCE->new(
        max_workers => $num_workers,
        user_func   => \&user_func
    )->run;

    # Time taken from a 2.6 GHz machine running Mac OS X.
    # threads::shared:   0.207s  Perl threads
    # forks::shared:    36.426s  child processes
    # MCE::Shared:       0.353s  child processes
    # MCE Sync:          0.062s  child processes
API DOCUMENTATION
MCE::Shared::Condvar->new ( )
Called by MCE::Shared for constructing a shared-condvar object.
MCE::Shared->condvar ( [ value ] )
Constructs a new condition variable. Its value defaults to 0 when value is not specified.
set ( value )
Sets the value associated with the cv object. The new value is returned in scalar context.

    $val = $cv->set( 10 );
    $cv->set( 10 );
get
Returns the value associated with the cv object.

    $val = $cv->get;
len
Returns the length of the value. It returns undef if the value is not defined.

    $len = $cv->len;
lock
Attempts to grab the lock and waits if it is not available. Multiple calls to $cv->lock by the same process or thread are safe. The mutex remains locked until $cv->unlock is called.

    $cv->lock;
unlock
Releases the lock. A lock held by an exiting process or thread is released automatically.

    $cv->unlock;
signal ( [ floating_seconds ] )
Releases a held lock on the variable. Then, unblocks one process or thread that's waiting on that variable. The variable is *not* locked upon return.

Optionally, delay floating_seconds before signaling.

    $count->signal;
    $count->signal( 0.5 );
broadcast ( [ floating_seconds ] )
The broadcast method works similarly to signal. It releases a held lock on the variable. Then, unblocks all the processes or threads that are blocked in a condition wait on the variable, rather than only one. The variable is *not* locked upon return.

Optionally, delay floating_seconds before broadcasting.

    $count->broadcast;
    $count->broadcast( 0.5 );
wait
Releases a held lock on the variable. Then, waits until another process or thread does a signal or broadcast for the same variable. The variable is *not* locked upon return.

    $count->wait() while $state->get() eq "bar";
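
As a minimal sketch of how wait and signal pair up, the script below has a parent wait for a worker to set a flag. It is not taken from the module's own examples: the flag convention (0 = not ready, 1 = ready), the variable names, and the use of MCE::Hobo for the worker are assumptions made for illustration. Both sides hold the lock while touching the flag, so that the flag does not change between the parent's check and its call to wait.

```perl
use strict;
use warnings;
use MCE::Hobo;
use MCE::Shared;

# Hypothetical flag convention for this sketch: 0 = not ready, 1 = ready.
my $ready = MCE::Shared->condvar(0);

# Start the manager explicitly on systems lacking IO::FDPass (see LIMITATIONS).
MCE::Shared->start() unless $INC{'IO/FDPass.pm'};

my $worker = MCE::Hobo->create(sub {
    $ready->lock;
    $ready->set(1);
    $ready->signal;    # releases the lock, then wakes one waiter, if any
});

$ready->lock;
if ($ready->get) {
    $ready->unlock;    # the worker already set the flag; nothing to wait for
}
else {
    $ready->wait;      # releases the lock and blocks until signaled
}

# The lock is not held here, whichever branch ran.
my $flag = $ready->get;
$worker->join;

print "flag: $flag\n";
```

Because wait does not re-acquire the lock, any further synchronized access after this point would need its own call to lock.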
timedwait ( floating_seconds )
Releases a held lock on the variable. Then, waits until another process or thread does a signal or broadcast for the same variable, or until floating_seconds have elapsed.

A false value is returned if the timeout is reached, and a true value otherwise. In either case, the variable is *not* locked upon return.

    $count->timedwait( 10 ) while $state->get() eq "foo";
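
The return value can be used to tell a timeout apart from a wake-up. The following is a small sketch under the assumption that no other process signals the variable, so the call should run into its timeout; the 0.2 second value and the variable names are arbitrary choices for illustration.

```perl
use strict;
use warnings;
use MCE::Shared;

my $cv = MCE::Shared->condvar(0);

# Start the manager explicitly on systems lacking IO::FDPass (see LIMITATIONS).
MCE::Shared->start() unless $INC{'IO/FDPass.pm'};

$cv->lock;

# Nothing signals $cv in this script, so the wait is expected to time out.
my $signaled = $cv->timedwait(0.2);

# The lock was released by timedwait and is not held at this point.
print $signaled ? "signaled\n" : "timed out\n";
```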
SUGAR METHODS
This module is equipped with sugar methods to not have to call set and get explicitly. In shared context, the benefit is atomicity and reduction in inter-process communication.

The API resembles a subset of the Redis primitives http://redis.io/commands#strings without the key argument.
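
To illustrate the atomicity point, here is a hedged sketch that counts with two separate shared variables: one updated through incr alone, the other through get and set guarded by lock and unlock. The worker count, iteration count, and variable names are assumptions chosen for the example. The two styles are kept on separate variables because an unguarded incr could otherwise slip in between another worker's get and set.

```perl
use strict;
use warnings;
use MCE::Hobo;
use MCE::Shared;

my $atomic  = MCE::Shared->condvar(0);   # updated with incr only
my $guarded = MCE::Shared->condvar(0);   # updated with get/set under the lock

# Start the manager explicitly on systems lacking IO::FDPass (see LIMITATIONS).
MCE::Shared->start() unless $INC{'IO/FDPass.pm'};

my @workers = map {
    MCE::Hobo->create(sub {
        for (1 .. 50) {
            # A single call; no explicit locking is needed.
            $atomic->incr;

            # The same effect spelled out: several calls, guarded by the lock.
            $guarded->lock;
            $guarded->set( $guarded->get + 1 );
            $guarded->unlock;
        }
    });
} 1 .. 4;

$_->join for @workers;

# 4 workers x 50 iterations each
my ($fast, $slow) = ($atomic->get, $guarded->get);
print "incr: $fast, lock/get/set/unlock: $slow\n";
```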
append ( value )
Appends a value at the end of the current value and returns its new length.
    $len = $cv->append( "foo" );
decr
Decrements the value by one and returns its new value.
    $num = $cv->decr;
decrby ( number )
Decrements the value by the given number and returns its new value.
    $num = $cv->decrby( 2 );
getdecr
Decrements the value by one and returns its old value.
    $old = $cv->getdecr;
getincr
Increments the value by one and returns its old value.
    $old = $cv->getincr;
getset ( value )
Sets the value and returns its old value.
    $old = $cv->getset( "baz" );
incr
Increments the value by one and returns its new value.
    $num = $cv->incr;
incrby ( number )
Increments the value by the given number and returns its new value.
    $num = $cv->incrby( 2 );
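
The sugar methods differ mainly in whether they return the new value, the old value, or a length. The short walk-through below applies several of them in sequence to one variable; the starting value of 5 and the variable names are arbitrary, and the comments restate the return values as documented above.

```perl
use strict;
use warnings;
use MCE::Shared;

my $cv = MCE::Shared->condvar(5);

# Start the manager explicitly on systems lacking IO::FDPass (see LIMITATIONS).
MCE::Shared->start() unless $INC{'IO/FDPass.pm'};

my $new  = $cv->incr;            # value becomes 6;      returns the new value
my $old  = $cv->getincr;         # value becomes 7;      returns the old value
my $diff = $cv->decrby(3);       # value becomes 4;      returns the new value
my $prev = $cv->getset("ab");    # value becomes "ab";   returns the old value
my $len  = $cv->append("cd");    # value becomes "abcd"; returns the new length

print join(", ", $new, $old, $diff, $prev, $len, $cv->get), "\n";
```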
CHAMENEOS DEMONSTRATION
The MCE example is derived from the chameneos example by Jonathan DePeri and Andrew Rodland.
    use 5.010;
    use strict;
    use warnings;

    use MCE::Hobo;
    use MCE::Shared;

    die 'No argument given' if not @ARGV;

    my $start = time;
    my %color = ( blue => 1, red => 2, yellow => 4 );
    my ( @colors, @complement );

    @colors[values %color] = keys %color;

    for my $triple (
        [qw(blue blue blue)],
        [qw(red red red)],
        [qw(yellow yellow yellow)],
        [qw(blue red yellow)],
        [qw(blue yellow red)],
        [qw(red blue yellow)],
        [qw(red yellow blue)],
        [qw(yellow red blue)],
        [qw(yellow blue red)],
    ) {
        $complement[ $color{$triple->[0]} | $color{$triple->[1]} ] =
            $color{$triple->[2]};
    }

    my @numbers = qw(zero one two three four five six seven eight nine);

    sub display_complements
    {
        for my $i (1, 2, 4) {
            for my $j (1, 2, 4) {
                print "$colors[$i] + $colors[$j] -> $colors[ $complement[$i | $j] ]\n";
            }
        }
        print "\n";
    }

    sub num2words
    {
        join ' ', '', map $numbers[$_], split //, shift;
    }

    # Construct condvars and queues first before other shared objects or in
    # any order when IO::FDPass is installed, used by MCE::Shared::Server.

    my $meetings = MCE::Shared->condvar();

    tie my @creatures, 'MCE::Shared';
    tie my $first, 'MCE::Shared', undef;
    tie my @met, 'MCE::Shared';
    tie my @met_self, 'MCE::Shared';

    sub chameneos
    {
        my $id = shift;

        while (1) {
            $meetings->lock();

            unless ($meetings->get()) {
                $meetings->unlock();
                last;
            }

            if (defined $first) {
                $creatures[$first] = $creatures[$id] =
                    $complement[ $creatures[$first] | $creatures[$id] ];

                $met_self[$first]++ if ($first == $id);
                $met[$first]++;
                $met[$id]++;
                $meetings->decr();
                $first = undef;

                # Unlike threads::shared (condvar) which retains the lock
                # while in the scope, MCE::Shared signal and wait methods
                # must be called prior to leaving the block, due to lock
                # being released upon return.

                $meetings->signal();
            }
            else {
                $first = $id;
                $meetings->wait();   # ditto ^^
            }
        }
    }

    sub pall_mall
    {
        my $N = shift;
        @creatures = map $color{$_}, @_;
        my @threads;

        print " ", join(" ", @_);
        $meetings->set($N);

        for (0 .. $#creatures) {
            $met[$_] = $met_self[$_] = 0;
            push @threads, MCE::Hobo->create(\&chameneos, $_);
        }
        for (@threads) {
            $_->join();
        }

        $meetings->set(0);

        for (0 .. $#creatures) {
            print "\n$met[$_]", num2words($met_self[$_]);
            $meetings->incrby($met[$_]);
        }

        print "\n", num2words($meetings->get()), "\n\n";
    }

    display_complements();

    pall_mall($ARGV[0], qw(blue red yellow));
    pall_mall($ARGV[0], qw(blue red yellow red yellow blue red yellow red blue));

    printf "duration: %0.03f\n", time - $start;
CREDITS
The conditional locking feature is inspired by threads::shared.
LIMITATIONS
Perl must have IO::FDPass for constructing a shared condvar or queue while the shared-manager process is running. For platforms where IO::FDPass isn't available, construct condvar and queue before other classes. On systems without IO::FDPass, the start of the manager process is delayed until other classes are shared or until it is started explicitly.
    use MCE::Shared;

    my $has_IO_FDPass = $INC{'IO/FDPass.pm'} ? 1 : 0;

    my $cv  = MCE::Shared->condvar();
    my $que = MCE::Shared->queue();

    MCE::Shared->start() unless $has_IO_FDPass;
Regarding mce_open, IO::FDPass is needed for constructing a shared-handle from a non-shared handle not yet available inside the shared-manager process. The workaround is to have the non-shared handle made before the shared-manager is started. Passing a file by reference is fine for the three STD* handles.
    # The shared-manager knows of \*STDIN, \*STDOUT, \*STDERR.

    mce_open my $shared_in,  "<",  \*STDIN;    # ok
    mce_open my $shared_out, ">>", \*STDOUT;   # ok
    mce_open my $shared_err, ">>", \*STDERR;   # ok

    mce_open my $shared_fh1, "<",  "/path/to/sequence.fasta";   # ok
    mce_open my $shared_fh2, ">>", "/path/to/results.log";      # ok

    mce_open my $shared_fh,  ">>", \*NON_SHARED_FH;   # requires IO::FDPass
The IO::FDPass module is known to work reliably on most platforms. Install 1.1 or later to be rid of the limitations described above.

    perl -MIO::FDPass -le "print 'Cheers! Perl has IO::FDPass.'"
AUTHOR
Mario E. Roy, <marioeroy AT gmail DOT com>