<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.1//EN" "http://www.w3.org/TR/xhtml11/DTD/xhtml11.dtd">
<html xmlns="http://www.w3.org/1999/xhtml" xml:lang="en">
<head>
<title>Thread::Queue::Duplex</title>
<link rel="stylesheet" type="text/css" href="../../podstyle.css" /><meta http-equiv="Content-Type" content="text/html; charset=UTF-8" />
</head>
<body>
<div class="box">
<h1 class="t1">Thread::Queue::Duplex</h1>
<table>
<tr>
<td class="label">Description</td>
<td class="cell">Thread-safe request/response queue with identifiable elements</td>
</tr>
</table>
</div>
<div class="path">
<a href="../../index.html">Thread::Queue::Duplex</a> > Package Manuals >
Thread-Queue-Duplex
</div>
<div>
<a href="../../Thread/Queue/Duplex.html">Classdocs</a>
</div>
<div class="pod">
<!-- INDEX START -->
<h3 id="TOP">Index</h3>
<ul>
<li><a href="#NAME">NAME</a></li>
<li><a href="#SYNOPSIS">SYNOPSIS</a></li>
<li><a href="#DESCRIPTION">DESCRIPTION</a></li>
<li><a href="#METHODS">METHODS</a></li>
<li><a href="#SEE_ALSO">SEE ALSO</a></li>
<li><a href="#AUTHOR_COPYRIGHT_amp_LICENSE">AUTHOR, COPYRIGHT, & LICENSE</a></li>
</ul>
<hr />
<!-- INDEX END -->
<h1 id="NAME">NAME <a href="#TOP" class="toplink"><img alt="^" src="../../up.gif" /></a></h1>
<p>Thread::Queue::Duplex - thread-safe request/response queue with identifiable elements</p>
<h1 id="SYNOPSIS">SYNOPSIS <a href="#TOP" class="toplink"><img alt="^" src="../../up.gif" /></a></h1>
<pre> use Thread::Queue::Duplex;
#
# create new queue, and require that there be
# registered listeners for an enqueue operation
# to succeed, and limit the max pending requests
# to 20
#
my $q = Thread::Queue::Duplex->new(ListenerRequired => 1, MaxPending => 20);
#
# register as a listener
#
$q->listen();
#
# unregister as a listener
#
$q->ignore();
#
# wait for a listener to register
#
$q->wait_for_listener($timeout);
#
# change the max pending limit
#
$q->set_max_pending($limit);
#
# enqueue elements, returning a unique queue ID
# (used in the client)
#
my $id = $q->enqueue("foo", "bar");
#
# enqueue elements, and wait for a response
# (used in the client)
#
my $resp = $q->enqueue_and_wait("foo", "bar");
#
# enqueue elements, and wait for a response
# until $timeout secs (used in the client)
#
my $resp = $q->enqueue_and_wait_until($timeout, "foo", "bar");
#
# enqueue elements at head of queue, returning a
# unique queue ID (used in the client)
#
my $id = $q->enqueue_urgent("foo", "bar");
#
# enqueue elements at head of queue and wait for response
#
my $resp = $q->enqueue_urgent_and_wait("foo", "bar");
#
# enqueue elements at head of queue and wait for
# response until $timeout secs
#
my $resp = $q->enqueue_urgent_and_wait_until($timeout, "foo", "bar");
#
# enqueue elements for simplex operation (no response)
# returning the queue object
#
$q->enqueue_simplex("foo", "bar");
$q->enqueue_simplex_urgent("foo", "bar");
#
# dequeue next available element (used in the server),
# waiting indefinitely for an element to be made available
# returns shared arrayref, first element is unique ID,
# which may be undef for simplex requests
#
my $foo = $q->dequeue;
#
# dequeue next available element (used in the server),
# returns undef if no element immediately available
# otherwise, returns shared arrayref, first element is unique ID,
# which may be undef for simplex requests
#
my $foo = $q->dequeue_nb;
#
# dequeue next available element (used in the server),
# returned undef if no element available within $timeout
# seconds; otherwise, returns shared arrayref, first
# element is unique ID, which may be undef for simplex requests
#
my $foo = $q->dequeue_until($timeout);
#
# dequeue next available element (used in the server),
# but only if marked urgent; otherwise, returns undef
#
my $foo = $q->dequeue_urgent();
#
# returns number of items still in queue
#
my $left = $q->pending;
#
# maps a response for the
# queued element identified by $id;
#
$q->respond($id, @list);
#
# tests for a response to the queued
# element identified by $id; returns undef if
# not yet available, else returns the queue object
#
my $result = $q->ready($id);
#
# returns list of available response ID's;
# if list provided, only returns ID's from the list.
# Returns undef if none available.
# In scalar context, returns only first available;
# Else a list of available IDs
#
my @ids = $q->available();
#
# wait for and return the response for the
# specified unique identifier
# (dequeue_response is alias)
#
my $result = $q->wait($id);
my $result = $q->dequeue_response($id);
#
# waits up to $timeout seconds for a response to
# the queued element identified by $id; returns undef if
# not available within $timeout, else returns the queue object
#
my $result = $q->wait_until($id, $timeout);
#
# wait for a response to the queued
# elements listed in @ids, returning a hashref of
# the first available response(s), keyed by id
#
my $result = $q->wait_any(@ids);
#
# wait upto $timeout seconds for a response to
# the queued elements listed in @ids, returning
# a hashref of the first available response(s), keyed by id
# Returns undef if none available in $timeout seconds
#
my $result = $q->wait_any_until($timeout, @ids);
#
# wait for responses to all the queued
# elements listed in @ids, returning a hashref of
# the response(s), keyed by id
#
my $result = $q->wait_all(@ids);
#
# wait upto $timeout seconds for responses to
# all the queued elements listed in @ids, returning
# a hashref of the response(s), keyed by id
# Returns undef if all responses not recv'd
# in $timeout seconds
#
my $result = $q->wait_all_until($timeout, @ids);
#
# mark an existing request
#
$q->mark($id, 'CANCEL');
#
# test if a request is marked
#
print "Marked for cancel!"
if $q->marked($id, 'CANCEL');
#
# cancel specific operations
#
my $result = $q->cancel(@ids);
#
# cancel all operations
#
my $result = $q->cancel_all();
#
# test if specified request has been cancelled
#
my $result = $q->cancelled($id);
#
# (class-level method) wait for an event on
# any of the listed queue objects. Returns a
# list of queues which have events pending
#
my $result = Thread::Queue::Duplex->wait_any(
[ $q1 ], [ $q2, @ids ]);
#
# (class-level method) wait upto $timeout seconds
# for an event on any of the listed queue objects.
# Returns undef if none available in $timeout seconds,
# otherwise, returns a list of queues with events pending
#
my $result = Thread::Queue::Duplex->wait_any_until(
$timeout, [ $q1 ], [ $q2, @ids ]);
#
# (class-level method) wait for events on all the listed
# queue objects. Returns the list of queue objects.
#
my $result = Thread::Queue::Duplex->wait_all(
[ $q1 ], [ $q2, @ids ]);
#
# (class-level method) wait upto $timeout seconds for
# events on all the listed queue objects.
# Returns empty list if all listed queues do not have
# an event in $timeout seconds, otherwise returns
# the list of queues
#
my $result = Thread::Queue::Duplex->wait_all_until(
$timeout, [ $q1 ], [ $q2, @ids ]);
</pre><h1 id="DESCRIPTION">DESCRIPTION <a href="#TOP" class="toplink"><img alt="^" src="../../up.gif" /></a></h1>
<p>A mapped queue, similar to <a href="http://search.cpan.org/perldoc?Thread%3A%3AQueue">Thread::Queue</a>, except that as elements
are queued, they are assigned unique identifiers, which are used
to identify responses returned from the dequeueing thread. This
class provides a simple RPC-like mechanism between multiple client
and server threads, so that a single server thread can safely
multiplex requests from multiple client threads. <strong>Note</strong> that
simplex versions of the enqueue methods are provided which
do not assign unique identifiers, and are used for requests
to which no response is required/expected.</p>
<p>In addition, elements are inspected as they are enqueued/dequeued to determine
if they are <a href="Queueable.pod.html">Thread::Queue::Queueable</a> (<i>aka TQQ</i>) objects, and, if so,
the onEnqueue() or onDequeue() methods are called to permit any
additional class-specific marshalling/unmarshalling to be performed.
Thread::Queue::Duplex (<i>aka TQD</i>) is itself a
<a href="Queueable.pod.html">Thread::Queue::Queueable</a> object, thus permitting TQD
objects to be passed between threads.</p>
<p><strong>NOTE:</strong> Thread::Queue::Duplex does
<strong>not</strong> perform any default marshalling of complex structures; it is the
responsibility of the application to either</p>
<ul>
<li>use <a href="http://search.cpan.org/perldoc?threads%3A%3Ashared">threads::shared</a> objects for all queued structures</li>
<li>implement its own application specific marshalling via, e.g.,
<a href="http://search.cpan.org/perldoc?Storable">Storable</a></li>
<li>implement a <a href="Queueable.pod.html">Thread::Queue::Queueable</a> wrapper class for
the structure</li>
</ul>
<p>Various <code>wait()</code> methods are provided to permit waiting on individual
responses, any or all of a list of responses, and time-limited waits
for each. Additionally, class-level versions of the <code>wait()</code> methods
are provided to permit a thread to simultaneously wait for either
enqueue or response events on any of a number of queues, or
on objects implementing <a href="TQDContainer.pod.html">Thread::Queue::TQDContainer</a>.</p>
<p>A <code>mark()</code> method is provided to permit out-of-band information
to be applied to pending requests. A responder may test for marks
via the <code>marked()</code> method prior to <code>respond()</code>ing to a request.
An application may specify a mark value, which the responder can
test for; if no explicit mark value is given, the value 1 is used.</p>
<p><code>cancel()</code> and <code>cancel_all()</code> methods are provided to
explicitly cancel one or more requests, and invoke the
<code>onCancel()</code> method of any <a href="Queueable.pod.html">Thread::Queue::Queueable</a> objects
in the request. Cancelling will result in one of</p>
<ul>
<li>marking the request as cancelled if
it has not yet been dequeued (note that it cannot be
spliced from the queue due <code>threads::shared</code>'s lack
of support for array splicing)</li>
<li>removal and discarding of the response from the response map
if the request has already been processed</li>
<li>if the request is in progress, the responder will
detect the cancellation when it attempts to <code>respond()</code>,
and the response will be discarded</li>
</ul>
<p><code>listen()</code> and <code>ignore()</code> methods are provided so that
server-side threads can register/unregister as listeners
on the queue; the constructor accepts a "ListenerRequired"
attribute argument. If set, then any <code>enqueue()</code>
operation will fail and return undef if there are no
registered listeners. This feature provides some safeguard
against "stuck" requestor threads when the responder(s)
have shutdown for some reason. In addition, a <code>wait_for_listener()</code>
method is provided to permit an initiating thread to wait
until another thread registers as a listener.</p>
<p>The constructor also accepts a <code>MaxPending</code> attribute
that specifies the maximum number of requests that may
be pending in the queue before the operation will block.
Note that responses are not counted in this limit.</p>
<p><code>Thread::Queue::Duplex</code> objects encapsulate</p>
<ul>
<li>a shared array, used as the queue (same as <a href="http://search.cpan.org/perldoc?Thread%3A%3AQueue">Thread::Queue</a>)</li>
<li>a shared scalar, used to provide unique identifier sequence
numbers</li>
<li>a shared hash, <i>aka</i> the mapping hash, used to return responses
to enqueued elements, using the generated uniqiue identifier as the hash key</li>
<li>a listener count, incremented each time <code>listen()</code> is called,
decremented each time <code>ignore()</code> is called, and, if
the "listener required" flag has been set on construction, tested
for each <code>enqueue()</code> call.</li>
</ul>
<p>A normal processing sequence for Thread::Queue::Duplex might be:</p>
<pre> #
# Thread A (the client):
#
...marshal parameters for a coroutine...
my $id = $q->enqueue('function_name', \@paramlist);
my $results = $q->dequeue_response($id);
...process $results...
#
# Thread B (the server):
#
while (1) {
my $call = $q->dequeue;
my ($id, $func, @params) = @$call;
$q->respond($id, $self->$func(@params));
}
</pre><h1 id="METHODS">METHODS <a href="#TOP" class="toplink"><img alt="^" src="../../up.gif" /></a></h1>
<p>Refer to the classdocs for summary and detailed method descriptions.</p>
<h1 id="SEE_ALSO">SEE ALSO <a href="#TOP" class="toplink"><img alt="^" src="../../up.gif" /></a></h1>
<p><a href="Queueable.pod.html">Thread::Queue::Queueable</a>, <a href="TQDContainer.pod.html">Thread::Queue::TQDContainer</a>,
<a href="http://search.cpan.org/perldoc?threads">threads</a>, <a href="http://search.cpan.org/perldoc?threads%3A%3Ashared">threads::shared</a>, <a href="http://search.cpan.org/perldoc?Thread%3A%3AQueue">Thread::Queue</a>,
<a href="http://search.cpan.org/perldoc?Thread%3A%3AQueue%3A%3AMultiplex">Thread::Queue::Multiplex</a>, <a href="http://search.cpan.org/perldoc?Thread%3A%3AApartment">Thread::Apartment</a></p>
<h1 id="AUTHOR_COPYRIGHT_amp_LICENSE">AUTHOR, COPYRIGHT, & LICENSE <a href="#TOP" class="toplink"><img alt="^" src="../../up.gif" /></a></h1>
<p>Dean Arnold, Presicient Corp. <a href="#darnold_presicient_com">darnold@presicient.com</a></p>
<p>Copyright(C) 2005,2006, Presicient Corp., USA</p>
<p>Licensed under the Academic Free License version 2.1, as specified in the
License.txt file included in this software package, or at OpenSource.org
<a href="http://www.opensource.org/licenses/afl-2.1.php">http://www.opensource.org/licenses/afl-2.1.php</a>.</p>
</div><div class="footer">generated by <a href="http://search.cpan.org/perldoc?Pod%3A%3AProjectDocs">Pod::ProjectDocs</a></div></body>
</html>