NAME
MCE::Shared::Queue - Hybrid-queue helper class
VERSION
This document describes MCE::Shared::Queue version 1.002
SYNOPSIS
# non-shared
use MCE::Shared::Queue;
my $qu = MCE::Shared::Queue->new(
await => 1, fast => 0, queue => [ "." ]
);
# shared
use MCE::Shared;
use MCE::Shared::Queue;
my $qu = MCE::Shared->queue(
porder => $MCE::Shared::Queue::HIGHEST,
type => $MCE::Shared::Queue::FIFO,
fast => 0
);
# Possible values for "porder" and "type".
porder =>
$MCE::Shared::Queue::HIGHEST # Highest priority items dequeue first
$MCE::Shared::Queue::LOWEST # Lowest priority items dequeue first
type =>
$MCE::Shared::Queue::FIFO # First in, first out
$MCE::Shared::Queue::LIFO # Last in, first out
$MCE::Shared::Queue::LILO # Synonym for FIFO
$MCE::Shared::Queue::FILO # Synonym for LIFO
# Below, [ ... ] denotes optional parameters.
$qu->await( [ $pending_threshold ] );
$qu->clear();
$qu->enqueue( $item [, $item, ... ] );
$qu->enqueuep( $priority, $item [, $item, ... ] );
$item = $qu->dequeue();
@items = $qu->dequeue( $count );
$item = $qu->dequeue_nb();
@items = $qu->dequeue_nb( $count );
$qu->insert( $index, $item [, $item, ... ] );
$qu->insertp( $priority, $index, $item [, $item, ... ] );
$count = $qu->pending();
$item = $qu->peek( [ $index ] );
$item = $qu->peekp( $priority [, $index ] );
$item = $qu->peekh( [ $index ] );
@array = $qu->heap();
DESCRIPTION
Helper class for MCE::Shared.
This module is mostly compatible with MCE::Queue, except for the gather option, which is not supported in this context. It provides a queue interface supporting normal and priority queues.
Data from shared queues resides under the shared-manager process; otherwise, data resides locally.
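As a minimal sketch of how the normal and priority queues interact, the following uses a non-shared queue and drains it with the non-blocking dequeue_nb method. The item names and priority numbers are illustrative only; porder and type are set explicitly so the example does not depend on defaults.

```perl
use strict;
use warnings;
use MCE::Shared::Queue;

# Non-shared queue; porder and type are given explicitly for clarity.
my $q = MCE::Shared::Queue->new(
    porder => $MCE::Shared::Queue::HIGHEST,
    type   => $MCE::Shared::Queue::FIFO,
);

$q->enqueue( 'a', 'b' );    # normal queue
$q->enqueuep( 5, 'x' );     # priority queue, priority 5
$q->enqueuep( 9, 'y' );     # priority queue, priority 9

# Drain without blocking; dequeue_nb returns undef once the queue is empty.
my @order;
while ( defined( my $item = $q->dequeue_nb() ) ) {
    push @order, $item;
}

print "@order\n";    # y x a b
```

Priority items come out first (highest priority first with HIGHEST), followed by the normal items in FIFO order.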
API DOCUMENTATION
- new ( [ options ] )
-
Constructs a new object. Supported options are queue, porder, type, await, and fast.
# non-shared
use MCE::Shared::Queue;

$q1 = MCE::Shared::Queue->new();
$q2 = MCE::Shared::Queue->new( queue => [ 0, 1, 2 ] );
$q3 = MCE::Shared::Queue->new( porder => $MCE::Shared::Queue::HIGHEST );
$q4 = MCE::Shared::Queue->new( porder => $MCE::Shared::Queue::LOWEST );
$q5 = MCE::Shared::Queue->new( type => $MCE::Shared::Queue::FIFO );
$q6 = MCE::Shared::Queue->new( type => $MCE::Shared::Queue::LIFO );
$q7 = MCE::Shared::Queue->new( await => 1 );
$q8 = MCE::Shared::Queue->new( fast => 1 );

# shared
use MCE::Shared;
use MCE::Shared::Queue;

$q1 = MCE::Shared->queue();
$q2 = MCE::Shared->queue( queue => [ 0, 1, 2 ] );
$q3 = MCE::Shared->queue( porder => $MCE::Shared::Queue::HIGHEST );
$q4 = MCE::Shared->queue( porder => $MCE::Shared::Queue::LOWEST );
$q5 = MCE::Shared->queue( type => $MCE::Shared::Queue::FIFO );
$q6 = MCE::Shared->queue( type => $MCE::Shared::Queue::LIFO );
$q7 = MCE::Shared->queue( await => 1 );
$q8 = MCE::Shared->queue( fast => 1 );
The await option, when enabled, allows workers to block (semaphore-like) until the number of items pending is equal to or less than a threshold value. The await method is described below.
The fast option speeds up dequeues and is not enabled by default. It is beneficial for queues not calling ->clear or ->dequeue_nb and not altering the optional count value while running; e.g. ->dequeue($count). In short, do not enable fast if the count varies dynamically.
- await ( pending_threshold )
-
Waits until the queue drops down to threshold items. The await method is beneficial when wanting to throttle worker(s) appending to the queue. Perhaps consumers are running a bit behind and you want to prevent memory consumption from growing too high. Below, the number of items pending will never go above 20.
use Time::HiRes qw( sleep );
use MCE::Flow;
use MCE::Shared;

my $q = MCE::Shared->queue( await => 1, fast => 1 );
my ( $producers, $consumers ) = ( 1, 8 );

mce_flow {
    task_name   => [ 'producer', 'consumer' ],
    max_workers => [ $producers, $consumers ],
},
sub {
    ## producer
    for my $item ( 1 .. 100 ) {
        $q->enqueue($item);

        ## blocks until the # of items pending reaches <= 10
        if ($item % 10 == 0) {
            MCE->say( 'pending: '.$q->pending() );
            $q->await(10);
        }
    }

    ## notify consumers no more work
    $q->enqueue( (undef) x $consumers );
},
sub {
    ## consumers
    while (defined (my $next = $q->dequeue())) {
        MCE->say( MCE->task_wid().': '.$next );
        sleep 0.100;
    }
};
- clear ( )
-
Clears the queue of any items.
$q->clear;
- enqueue ( item [, item, ... ] )
-
Appends a list of items onto the end of the normal queue.
$q->enqueue( 'foo' );
$q->enqueue( 'bar', 'baz' );
- enqueuep ( priority, item [, item, ... ] )
-
Appends a list of items onto the end of the priority queue with priority.
$q->enqueuep( $priority, 'foo' );
$q->enqueuep( $priority, 'bar', 'baz' );
- dequeue ( [ count ] )
-
Returns the requested number of items (default 1) from the queue. Priority data will always dequeue first before any data from the normal queue.
$q->dequeue( 2 );
$q->dequeue; # default 1
The method will block if the queue contains zero items. If the queue contains fewer than the requested number of items, the method will not block, but return the remaining items and undef for up to the count requested. This is unlike Thread::Queue, which will block until the requested number of items are available.
The $count, used for requesting the number of items, is beneficial when workers are passing parameters through the queue. For this reason, always remember to dequeue using the same multiple for the count.
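The advice about dequeuing in a consistent multiple can be sketched as follows, assuming each unit of work is a (name, value) pair. The pair layout and the item names are hypothetical. A non-shared queue is used, and the loop checks pending first so that dequeue is never called on an empty queue.

```perl
use strict;
use warnings;
use MCE::Shared::Queue;

my $q = MCE::Shared::Queue->new();

# Each unit of work is a (name, value) pair, so enqueue in multiples of 2.
$q->enqueue( 'alpha', 1 );
$q->enqueue( 'beta', 2, 'gamma', 3 );

# Always dequeue with the same count so the pairs stay aligned.
my %seen;
while ( $q->pending() ) {
    my ( $name, $value ) = $q->dequeue(2);
    $seen{$name} = $value;
}

print join( ', ', map { "$_=$seen{$_}" } sort keys %seen ), "\n";
```

Mixing counts, such as dequeue(2) in one worker and dequeue(3) in another, would split a pair across two calls.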
- dequeue_nb ( [ count ] )
-
Returns the requested number of items (default 1) from the queue. Like with dequeue, priority data will always dequeue first. This method is non-blocking and will return undef in the absence of data from the queue.
$q->dequeue_nb( 2 );
$q->dequeue_nb; # default 1
- insert ( index, item [, item, ... ] )
-
Adds the list of items to the queue at the specified index position (0 is the head of the list). The head of the queue is that item which would be removed by a call to dequeue.
$q = MCE::Shared->queue( type => $MCE::Shared::Queue::FIFO );
$q->enqueue(1, 2, 3, 4);
$q->insert(1, 'foo', 'bar');
# Queue now contains: 1, foo, bar, 2, 3, 4

$q = MCE::Shared->queue( type => $MCE::Shared::Queue::LIFO );
$q->enqueue(1, 2, 3, 4);
$q->insert(1, 'foo', 'bar');
# Queue now contains: 1, 2, 3, foo, bar, 4
- insertp ( priority, index, item [, item, ... ] )
-
Adds the list of items to the queue at the specified index position with priority. The behavior is otherwise similar to $q->insert.
- pending ( )
-
Returns the number of items in the queue. The count includes both normal and priority data.
$q = MCE::Shared->queue();
$q->enqueuep(5, 'foo', 'bar');
$q->enqueue('sunny', 'day');
print $q->pending(), "\n";
# Output: 4
- peek ( [ index ] )
-
Returns an item from the normal queue, at the specified index, without dequeuing anything. It defaults to the head of the queue if index is not specified. The head of the queue is that item which would be removed by a call to dequeue. Negative index values are supported, similarly to arrays.
$q = MCE::Shared->queue( type => $MCE::Shared::Queue::FIFO );
$q->enqueue(1, 2, 3, 4, 5);
print $q->peek(1), ' ', $q->peek(-2), "\n";
# Output: 2 4

$q = MCE::Shared->queue( type => $MCE::Shared::Queue::LIFO );
$q->enqueue(1, 2, 3, 4, 5);
print $q->peek(1), ' ', $q->peek(-2), "\n";
# Output: 4 2
- peekp ( priority [, index ] )
-
Returns an item from the queue with priority, at the specified index, without dequeuing anything. It defaults to the head of the queue if index is not specified. The behavior is otherwise similar to $q->peek.
- peekh ( [ index ] )
-
Returns an item from the head of the heap or at the specified index.
$q = MCE::Shared->queue( porder => $MCE::Shared::Queue::HIGHEST );
$q->enqueuep(5, 'foo');
$q->enqueuep(6, 'bar');
$q->enqueuep(4, 'sun');
print $q->peekh(0), "\n";
# Output: 6

$q = MCE::Shared->queue( porder => $MCE::Shared::Queue::LOWEST );
$q->enqueuep(5, 'foo');
$q->enqueuep(6, 'bar');
$q->enqueuep(4, 'sun');
print $q->peekh(0), "\n";
# Output: 4
- heap ( )
-
Returns an array containing the heap data. Heap data consists of priority numbers, not the data.
@h = $q->heap; # $MCE::Shared::Queue::HIGHEST
# Heap contains: 6, 5, 4

@h = $q->heap; # $MCE::Shared::Queue::LOWEST
# Heap contains: 4, 5, 6
ACKNOWLEDGEMENTS
- List::BinarySearch
-
The bsearch_num_pos method was helpful for accommodating the highest and lowest order in MCE::Shared::Queue.
- POE::Queue::Array
-
For extra optimization, two if statements were adopted for checking if the item belongs at the end or head of the queue.
- List::Priority
-
MCE::Shared::Queue supports both normal and priority queues.
- Thread::Queue
-
Thread::Queue is used as a template for identifying and documenting the methods.
MCE::Shared::Queue is not fully compatible due to supporting normal and priority queues simultaneously; e.g.
$q->enqueue( $item [, $item, ... ] ); # normal queue
$q->enqueuep( $p, $item [, $item, ... ] ); # priority queue
$q->dequeue( [ $count ] ); # priority data dequeues first
$q->dequeue_nb( [ $count ] );
$q->pending(); # counts both normal/priority queues
LIMITATION
Perl must have the IO::FDPass module installed for constructing a shared queue or condvar while the shared-manager process is running.
For platforms where IO::FDPass is not feasible, construct queues or condvars first before other classes. The shared-manager process is delayed until sharing other classes or explicitly starting the process.
use MCE::Shared;
my $q1 = MCE::Shared->queue();
my $cv = MCE::Shared->condvar();
MCE::Shared->start();
AUTHOR
Mario E. Roy, <marioeroy AT gmail DOT com>