<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.1//EN" "http://www.w3.org/TR/xhtml11/DTD/xhtml11.dtd">
<html xmlns="http://www.w3.org/1999/xhtml" xml:lang="en">
<head>
	<title>Thread::Queue::Duplex</title>
<link rel="stylesheet" type="text/css" href="../../podstyle.css" /><meta http-equiv="Content-Type" content="text/html; charset=UTF-8" />
</head>
<body>
<div class="box">
  <h1 class="t1">Thread::Queue::Duplex</h1>
  <table>
    <tr>
      <td class="label">Description</td>
      <td class="cell">Thread-safe request/response queue with identifiable elements</td>
    </tr>
  </table>
</div>
<div>
<a href="../../Thread/Queue/Duplex.html">Classdocs</a>
</div>


<div class="pod">
<!-- INDEX START -->
<h3 id="TOP">Index</h3>
<ul>
	<li><a href="#NAME">NAME</a></li>
	<li><a href="#SYNOPSIS">SYNOPSIS</a></li>
	<li><a href="#DESCRIPTION">DESCRIPTION</a></li>
	<li><a href="#METHODS">METHODS</a></li>
	<li><a href="#SEE_ALSO">SEE ALSO</a></li>
	<li><a href="#AUTHOR_COPYRIGHT_amp_LICENSE">AUTHOR, COPYRIGHT, &amp; LICENSE</a></li>
</ul>
<hr />
<!-- INDEX END -->

<h1 id="NAME">NAME <a href="#TOP" class="toplink"><img alt="^" src="../../up.gif" /></a></h1>

<p>Thread::Queue::Duplex - thread-safe request/response queue with identifiable elements</p>

<h1 id="SYNOPSIS">SYNOPSIS <a href="#TOP" class="toplink"><img alt="^" src="../../up.gif" /></a></h1>

<pre>	use Thread::Queue::Duplex;
	#
	#	create new queue, and require that there be
	#	registered listeners for an enqueue operation
	#	to succeed, and limit the max pending requests
	#	to 20
	#
	my $q = Thread::Queue::Duplex-&gt;new(ListenerRequired =&gt; 1, MaxPending =&gt; 20);
	#
	#	register as a listener
	#
	$q-&gt;listen();
	#
	#	unregister as a listener
	#
	$q-&gt;ignore();
	#
	#	wait for a listener to register
	#
	$q-&gt;wait_for_listener($timeout);
	#
	#	change the max pending limit
	#
	$q-&gt;set_max_pending($limit);
	#
	#	enqueue elements, returning a unique queue ID
	#	(used in the client)
	#
	my $id = $q-&gt;enqueue(&quot;foo&quot;, &quot;bar&quot;);
	#
	#	enqueue elements, and wait for a response
	#	(used in the client)
	#
	my $resp = $q-&gt;enqueue_and_wait(&quot;foo&quot;, &quot;bar&quot;);
	#
	#	enqueue elements, and wait for a response
	#	until $timeout secs (used in the client)
	#
	my $resp = $q-&gt;enqueue_and_wait_until($timeout, &quot;foo&quot;, &quot;bar&quot;);
	#
	#	enqueue elements at head of queue, returning a
	#	unique queue ID (used in the client)
	#
	my $id = $q-&gt;enqueue_urgent(&quot;foo&quot;, &quot;bar&quot;);
	#
	#	enqueue elements at head of queue and wait for response
	#
	my $resp = $q-&gt;enqueue_urgent_and_wait(&quot;foo&quot;, &quot;bar&quot;);
	#
	#	enqueue elements at head of queue and wait for
	#	response until $timeout secs
	#
	my $resp = $q-&gt;enqueue_urgent_and_wait_until($timeout, &quot;foo&quot;, &quot;bar&quot;);
	#
	#	enqueue elements for simplex operation (no response)
	#	returning the queue object
	#
	$q-&gt;enqueue_simplex(&quot;foo&quot;, &quot;bar&quot;);
	#
	#	enqueue elements at head of queue for simplex
	#	operation (no response)
	#
	$q-&gt;enqueue_simplex_urgent(&quot;foo&quot;, &quot;bar&quot;);
	#
	#	dequeue next available element (used in the server),
	#	waiting indefinitely for an element to be made available
	#	returns shared arrayref, first element is unique ID,
	#	which may be undef for simplex requests
	#
	my $foo = $q-&gt;dequeue;
	#
	#	dequeue next available element (used in the server),
	#	returns undef if no element immediately available
	#	otherwise, returns shared arrayref, first element is unique ID,
	#	which may be undef for simplex requests
	#
	my $foo = $q-&gt;dequeue_nb;
	#
	#	dequeue next available element (used in the server),
	#	returns undef if no element available within $timeout
	#	seconds; otherwise, returns shared arrayref, first
	#	element is unique ID, which may be undef for simplex requests
	#
	my $foo = $q-&gt;dequeue_until($timeout);
	#
	#	dequeue next available element (used in the server),
	#	but only if marked urgent; otherwise, returns undef
	#
	my $foo = $q-&gt;dequeue_urgent();
	#
	#	returns number of items still in queue
	#
	my $left = $q-&gt;pending;
	#
	#	maps a response for the
	#	queued element identified by $id;
	#
	$q-&gt;respond($id, @list);
	#
	#	tests for a response to the queued
	#	element identified by $id; returns undef if
	#	not yet available, else returns the queue object
	#
	my $result = $q-&gt;ready($id);
	#
	#	returns list of available response IDs;
	#	if a list is provided, only returns IDs from that list.
	#	Returns undef if none available.
	#	In scalar context, returns only the first available ID;
	#	otherwise, returns a list of available IDs
	#
	my @ids = $q-&gt;available();
	#
	#	wait for and return the response for the
	#	specified unique identifier
	#	(dequeue_response is alias)
	#
	my $result = $q-&gt;wait($id);
	my $result = $q-&gt;dequeue_response($id);
	#
	#	waits up to $timeout seconds for a response to
	#	the queued element identified by $id; returns undef if
	#	not available within $timeout, else returns the queue object
	#
	my $result = $q-&gt;wait_until($id, $timeout);
	#
	#	wait for a response to the queued
	#	elements listed in @ids, returning a hashref of
	#	the first available response(s), keyed by id
	#
	my $result = $q-&gt;wait_any(@ids);
	#
	#	wait up to $timeout seconds for a response to
	#	the queued elements listed in @ids, returning
	#	a hashref of the first available response(s), keyed by id
	#	Returns undef if none available in $timeout seconds
	#
	my $result = $q-&gt;wait_any_until($timeout, @ids);
	#
	#	wait for responses to all the queued
	#	elements listed in @ids, returning a hashref of
	#	the response(s), keyed by id
	#
	my $result = $q-&gt;wait_all(@ids);
	#
	#	wait up to $timeout seconds for responses to
	#	all the queued elements listed in @ids, returning
	#	a hashref of the response(s), keyed by id
	#	Returns undef if not all responses are received
	#	within $timeout seconds
	#
	my $result = $q-&gt;wait_all_until($timeout, @ids);
	#
	#	mark an existing request
	#
	$q-&gt;mark($id, 'CANCEL');
	#
	#	test if a request is marked
	#
	print &quot;Marked for cancel!&quot;
		if $q-&gt;marked($id, 'CANCEL');
	#
	#	cancel specific operations
	#
	my $result = $q-&gt;cancel(@ids);
	#
	#	cancel all operations
	#
	my $result = $q-&gt;cancel_all();
	#
	#	test if specified request has been cancelled
	#
	my $result = $q-&gt;cancelled($id);
	#
	#	(class-level method) wait for an event on
	#	any of the listed queue objects. Returns a
	#	list of queues which have events pending
	#
	my $result = Thread::Queue::Duplex-&gt;wait_any(
		[ $q1 ], [ $q2, @ids ]);
	#
	#	(class-level method) wait up to $timeout seconds
	#	for an event on any of the listed queue objects.
	#	Returns undef if none available in $timeout seconds,
	#	otherwise, returns a list of queues with events pending
	#
	my $result = Thread::Queue::Duplex-&gt;wait_any_until(
		$timeout, [ $q1 ], [ $q2, @ids ]);
	#
	#	(class-level method) wait for events on all the listed
	#	queue objects. Returns the list of queue objects.
	#
	my $result = Thread::Queue::Duplex-&gt;wait_all(
		[ $q1 ], [ $q2, @ids ]);
	#
	#	(class-level method) wait up to $timeout seconds for
	#	events on all the listed queue objects.
	#	Returns empty list if all listed queues do not have
	#	an event in $timeout seconds, otherwise returns
	#	the list of queues
	#
	my $result = Thread::Queue::Duplex-&gt;wait_all_until(
		$timeout, [ $q1 ], [ $q2, @ids ]);

</pre><h1 id="DESCRIPTION">DESCRIPTION <a href="#TOP" class="toplink"><img alt="^" src="../../up.gif" /></a></h1>

<p>A mapped queue, similar to <a href="http://search.cpan.org/perldoc?Thread%3A%3AQueue">Thread::Queue</a>, except that as elements
are queued, they are assigned unique identifiers, which are used
to identify responses returned from the dequeueing thread. This
class provides a simple RPC-like mechanism between multiple client
and server threads, so that a single server thread can safely
multiplex requests from multiple client threads. <strong>Note</strong> that
simplex versions of the enqueue methods are provided which
do not assign unique identifiers, and are used for requests
to which no response is required/expected.</p>

<p>In addition, elements are inspected as they are enqueued/dequeued to determine
if they are <a href="Queueable.pod.html">Thread::Queue::Queueable</a> (<i>aka TQQ</i>) objects, and, if so,
the onEnqueue() or onDequeue() methods are called to permit any
additional class-specific marshalling/unmarshalling to be performed.
Thread::Queue::Duplex (<i>aka TQD</i>) is itself a
<a href="Queueable.pod.html">Thread::Queue::Queueable</a> object, thus permitting TQD
objects to be passed between threads.</p>

<p><strong>NOTE:</strong> Thread::Queue::Duplex does
<strong>not</strong> perform any default marshalling of complex structures; it is the
responsibility of the application to do one of the following:</p>

<ul>
	<li>use <a href="http://search.cpan.org/perldoc?threads%3A%3Ashared">threads::shared</a> objects for all queued structures</li>
	<li>implement its own application specific marshalling via, e.g.,
<a href="http://search.cpan.org/perldoc?Storable">Storable</a></li>
	<li>implement a <a href="Queueable.pod.html">Thread::Queue::Queueable</a> wrapper class for
the structure</li>
</ul>
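
<p>As a minimal sketch of the first two options, using only core modules: <code>shared_clone()</code> from threads::shared produces a shared deep copy of a structure, while <code>freeze()</code> and <code>thaw()</code> from Storable flatten it to a plain scalar and rebuild it. The <code>%request</code> payload is hypothetical, and the <code>enqueue()</code> calls appear only as comments to show where each form would be queued.</p>

<pre>	use strict;
	use warnings;
	use threads;
	use threads::shared;
	use Storable qw(freeze thaw);

	# hypothetical request payload, for illustration only
	my %request = (op =&gt; 'lookup', keys =&gt; [ 1, 2, 3 ]);

	# option 1: a shared deep copy that any thread can read
	my $shared = shared_clone(\%request);
	# $q-&gt;enqueue($shared);

	# option 2: flatten to a plain scalar before queueing,
	# then rebuild the structure after dequeueing
	my $frozen = freeze(\%request);
	# $q-&gt;enqueue($frozen);
	my $copy = thaw($frozen);

	print join(',', @{ $copy-&gt;{keys} }), &quot;\n&quot;;
</pre>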

<p>Various <code>wait()</code> methods are provided to permit waiting on individual
responses, any or all of a list of responses, and time-limited waits
for each. Additionally, class-level versions of the <code>wait()</code> methods
are provided to permit a thread to simultaneously wait for either
enqueue or response events on any of a number of queues, or
on objects implementing <a href="TQDContainer.pod.html">Thread::Queue::TQDContainer</a>.</p>
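
<p>A hedged sketch of waiting on several responses at once, using only methods listed in the SYNOPSIS. It assumes the constructor accepts <code>MaxPending</code> on its own; the shape of each response value is not specified here, so the sketch only counts the entries in the returned hashref.</p>

<pre>	use strict;
	use warnings;
	use threads;
	use Thread::Queue::Duplex;

	# assumption: MaxPending may be given without ListenerRequired
	my $q = Thread::Queue::Duplex-&gt;new(MaxPending =&gt; 20);

	# server thread: answer two requests, then exit
	my $server = threads-&gt;create(sub {
		for (1 .. 2) {
			my ($id, @args) = @{ $q-&gt;dequeue };
			$q-&gt;respond($id, scalar @args);
		}
	});

	# client: issue both requests, then block until both are answered
	my @ids = ($q-&gt;enqueue('a'), $q-&gt;enqueue('b', 'c'));
	my $responses = $q-&gt;wait_all(@ids);	# hashref keyed by request id
	$server-&gt;join;

	print scalar(keys %$responses), &quot; responses received\n&quot;;
</pre>

<p>Replacing <code>wait_all()</code> with <code>wait_any()</code> would return as soon as the first of the listed responses is available.</p>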

<p>A <code>mark()</code> method is provided to permit out-of-band information
to be applied to pending requests. A responder may test for marks
via the <code>marked()</code> method prior to <code>respond()</code>ing to a request.
An application may specify a mark value, which the responder can
test for; if no explicit mark value is given, the value 1 is used.</p>
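
<p>The following sketch shows one way a responder might honor a mark. It runs client and server steps in a single thread for brevity, assumes a mark set before the request is dequeued remains visible to <code>marked()</code> afterwards, and uses hypothetical request and response values.</p>

<pre>	use strict;
	use warnings;
	use threads;
	use Thread::Queue::Duplex;

	# assumption: MaxPending may be given without ListenerRequired
	my $q = Thread::Queue::Duplex-&gt;new(MaxPending =&gt; 20);

	# client side: queue a request, then flag it out-of-band
	my $id = $q-&gt;enqueue('long_job');
	$q-&gt;mark($id, 'CANCEL');

	# server side: check for the mark before doing the work
	my ($req_id, @args) = @{ $q-&gt;dequeue };
	if ($q-&gt;marked($req_id, 'CANCEL')) {
		$q-&gt;respond($req_id, 'skipped');
	}
	else {
		$q-&gt;respond($req_id, 'done');
	}
</pre>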

<p><code>cancel()</code> and <code>cancel_all()</code> methods are provided to
explicitly cancel one or more requests, and invoke the
<code>onCancel()</code> method of any <a href="Queueable.pod.html">Thread::Queue::Queueable</a> objects
in the request. Cancelling will result in one of</p>

<ul>
	<li>marking the request as cancelled, if
it has not yet been dequeued (note that it cannot be
spliced from the queue due to <code>threads::shared</code>'s lack
of support for array splicing)</li>
	<li>removing and discarding the response from the response map,
if the request has already been processed</li>
	<li>discarding the response when the responder attempts to
<code>respond()</code> and detects the cancellation, if the request
is in progress</li>
</ul>
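
<p>A small sketch of the first case, cancelling a request that has not yet been dequeued. It assumes <code>cancelled()</code> reports true for such a request, which this page does not state explicitly, so the message is printed only if that holds.</p>

<pre>	use strict;
	use warnings;
	use threads;
	use Thread::Queue::Duplex;

	# assumption: MaxPending may be given without ListenerRequired
	my $q = Thread::Queue::Duplex-&gt;new(MaxPending =&gt; 20);

	# queue a (hypothetical) request, then cancel it before any
	# server thread has dequeued it
	my $id = $q-&gt;enqueue('slow_lookup', 'some_key');
	$q-&gt;cancel($id);

	print &quot;request $id is cancelled\n&quot;
		if $q-&gt;cancelled($id);
</pre>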

<p><code>listen()</code> and <code>ignore()</code> methods are provided so that
server-side threads can register/unregister as listeners
on the queue; the constructor accepts a &quot;ListenerRequired&quot;
attribute argument. If set, then any <code>enqueue()</code>
operation will fail and return undef if there are no
registered listeners. This feature provides some safeguard
against &quot;stuck&quot; requestor threads when the responder(s)
have shut down for some reason. In addition, a <code>wait_for_listener()</code>
method is provided to permit an initiating thread to wait
until another thread registers as a listener.</p>
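
<p>The listener handshake described above might look like the following sketch. The 5 second timeout and the <code>'ping'</code>/<code>'pong'</code> values are arbitrary choices for illustration, and the return value of <code>wait_for_listener()</code> is not relied upon since it is not described here.</p>

<pre>	use strict;
	use warnings;
	use threads;
	use Thread::Queue::Duplex;

	my $q = Thread::Queue::Duplex-&gt;new(ListenerRequired =&gt; 1, MaxPending =&gt; 20);

	# no listener has registered yet, so enqueue() should return undef
	my $id = $q-&gt;enqueue('ping');
	print &quot;request refused: no listener\n&quot; unless defined $id;

	# server thread: register, answer one request, unregister
	my $server = threads-&gt;create(sub {
		$q-&gt;listen();
		my ($req_id, @args) = @{ $q-&gt;dequeue };
		$q-&gt;respond($req_id, 'pong');
		$q-&gt;ignore();
	});

	# wait up to 5 seconds for the server to register, then retry
	$q-&gt;wait_for_listener(5);
	$id = $q-&gt;enqueue('ping');
	defined $id or die &quot;no listener registered in time\n&quot;;
	my $response = $q-&gt;wait($id);
	$server-&gt;join;
	print &quot;response received\n&quot;;
</pre>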

<p>The constructor also accepts a <code>MaxPending</code> attribute
that specifies the maximum number of requests that may
be pending in the queue before an enqueue operation will block.
Note that responses are not counted in this limit.</p>

<p><code>Thread::Queue::Duplex</code> objects encapsulate</p>

<ul>
	<li>a shared array, used as the queue (same as <a href="http://search.cpan.org/perldoc?Thread%3A%3AQueue">Thread::Queue</a>)</li>
	<li>a shared scalar, used to provide unique identifier sequence
numbers</li>
	<li>a shared hash, <i>aka</i> the mapping hash, used to return responses
to enqueued elements, using the generated unique identifier as the hash key</li>
	<li>a listener count, incremented each time <code>listen()</code> is called,
decremented each time <code>ignore()</code> is called, and, if
the &quot;listener required&quot; flag has been set on construction, tested
for each <code>enqueue()</code> call.</li>
</ul>
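
<p>To make the roles of the first three pieces concrete, here is a deliberately simplified illustration built only on core <code>threads::shared</code> primitives. It is not the module's actual implementation: the sub names are hypothetical, and listeners, marks, cancellation, and pending limits are all omitted.</p>

<pre>	use strict;
	use warnings;
	use threads;
	use threads::shared;

	my @queue     :shared;		# the request queue
	my $next_id   :shared = 0;	# identifier sequence
	my %responses :shared;		# mapping hash: id =&gt; response

	# queue a request and return its unique identifier
	sub enqueue {
		my @elems = @_;
		lock(@queue);
		my $id = ++$next_id;
		push @queue, shared_clone([ $id, @elems ]);
		cond_signal(@queue);
		return $id;
	}

	# block until a request is available, then return it
	sub dequeue {
		lock(@queue);
		cond_wait(@queue) until @queue;
		return shift @queue;
	}

	# store a response under the request's identifier
	sub respond {
		my ($id, $value) = @_;
		lock(%responses);
		$responses{$id} = $value;
		cond_broadcast(%responses);
	}

	# block until the response for $id arrives, then remove and return it
	sub wait_for {
		my ($id) = @_;
		lock(%responses);
		cond_wait(%responses) until exists $responses{$id};
		return delete $responses{$id};
	}

	# server thread: handle a single request
	my $server = threads-&gt;create(sub {
		my ($id, @args) = @{ dequeue() };
		respond($id, join('+', @args));
	});

	my $id     = enqueue('foo', 'bar');
	my $result = wait_for($id);
	$server-&gt;join;
	print &quot;request $id =&gt; $result\n&quot;;
</pre>

<p>Because each response is stored under its own key, several client threads can wait on the same hash without receiving each other's results.</p>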

<p>A normal processing sequence for Thread::Queue::Duplex might be:</p>

<pre>	#
	#	Thread A (the client):
	#
		# ...marshal parameters for a coroutine...
		my $id = $q-&gt;enqueue('function_name', \@paramlist);
		my $results = $q-&gt;dequeue_response($id);
		# ...process $results...
	#
	#	Thread B (the server):
	#
		while (1) {
			my $call = $q-&gt;dequeue;
			my ($id, $func, @params) = @$call;
			$q-&gt;respond($id, $self-&gt;$func(@params));
		}

</pre><h1 id="METHODS">METHODS <a href="#TOP" class="toplink"><img alt="^" src="../../up.gif" /></a></h1>

<p>Refer to the classdocs for summary and detailed method descriptions.</p>

<h1 id="SEE_ALSO">SEE ALSO <a href="#TOP" class="toplink"><img alt="^" src="../../up.gif" /></a></h1>

<p><a href="Queueable.pod.html">Thread::Queue::Queueable</a>, <a href="TQDContainer.pod.html">Thread::Queue::TQDContainer</a>,
<a href="http://search.cpan.org/perldoc?threads">threads</a>, <a href="http://search.cpan.org/perldoc?threads%3A%3Ashared">threads::shared</a>, <a href="http://search.cpan.org/perldoc?Thread%3A%3AQueue">Thread::Queue</a>,
<a href="http://search.cpan.org/perldoc?Thread%3A%3AQueue%3A%3AMultiplex">Thread::Queue::Multiplex</a>, <a href="http://search.cpan.org/perldoc?Thread%3A%3AApartment">Thread::Apartment</a></p>

<h1 id="AUTHOR_COPYRIGHT_amp_LICENSE">AUTHOR, COPYRIGHT, &amp; LICENSE <a href="#TOP" class="toplink"><img alt="^" src="../../up.gif" /></a></h1>

<p>Dean Arnold, Presicient Corp. <a href="mailto:darnold@presicient.com">darnold@presicient.com</a></p>

<p>Copyright(C) 2005,2006, Presicient Corp., USA</p>

<p>Licensed under the Academic Free License version 2.1, as specified in the
License.txt file included in this software package, or at OpenSource.org
<a href="http://www.opensource.org/licenses/afl-2.1.php">http://www.opensource.org/licenses/afl-2.1.php</a>.</p>


</div><div class="footer">generated by <a href="http://search.cpan.org/perldoc?Pod%3A%3AProjectDocs">Pod::ProjectDocs</a></div></body>
</html>