NAME

Thread::Queue::Duplex - thread-safe request/response queue with identifiable elements

SYNOPSIS

use Thread::Queue::Duplex;
#
#	create new queue, and require that there be
#	registered listeners for an enqueue operation
#	to succeed, and limit the max pending requests
#	to 20
#
my $q = Thread::Queue::Duplex->new(ListenerRequired => 1, MaxPending => 20);
#
#	register as a listener
#
$q->listen();
#
#	unregister as a listener
#
$q->ignore();
#
#	wait for a listener to register
#
$q->wait_for_listener($timeout);
#
#	change the max pending limit
#
$q->set_max_pending($limit);
#
#	enqueue elements, returning a unique queue ID
#	(used in the client)
#
my $id = $q->enqueue("foo", "bar");
#
#	enqueue elements, and wait for a response
#	(used in the client)
#
my $resp = $q->enqueue_and_wait("foo", "bar");
#
#	enqueue elements, and wait for a response
#	until $timeout secs (used in the client)
#
my $resp = $q->enqueue_and_wait_until($timeout, "foo", "bar");
#
#	enqueue elements at head of queue, returning a 
#	unique queue ID (used in the client)
#
my $id = $q->enqueue_urgent("foo", "bar");
#
#	enqueue elements at head of queue and wait for response
#
my $resp = $q->enqueue_urgent_and_wait("foo", "bar");
#
#	enqueue elements at head of queue and wait for 
#	response until $timeout secs
#
my $resp = $q->enqueue_urgent_and_wait_until($timeout, "foo", "bar");
#
#	enqueue elements for simplex operation (no response)
#	returning the queue object
#
$q->enqueue_simplex("foo", "bar");

$q->enqueue_simplex_urgent("foo", "bar");
#
#	dequeue next available element (used in the server),
#	waiting indefinitely for an element to be made available
#	returns shared arrayref, first element is unique ID,
#	which may be undef for simplex requests
#
my $foo = $q->dequeue;
#
#	dequeue next available element (used in the server),
#	returns undef if no element immediately available
#	otherwise, returns shared arrayref, first element is unique ID,
#	which may be undef for simplex requests
#
my $foo = $q->dequeue_nb;
#
#	dequeue next available element (used in the server),
#	returns undef if no element available within $timeout
#	seconds; otherwise, returns shared arrayref, first 
#	element is unique ID, which may be undef for simplex requests
#
my $foo = $q->dequeue_until($timeout);
#
#	dequeue next available element (used in the server),
#	but only if marked urgent; otherwise, returns undef
#
my $foo = $q->dequeue_urgent();
#
#	returns number of items still in queue
#
my $left = $q->pending;
#
#	maps a response for the
#	queued element identified by $id;
#
$q->respond($id, @list);
#
#	tests for a response to the queued 
#	element identified by $id; returns undef if
#	not yet available, else returns the queue object
#
my $result = $q->ready($id);
#
#	returns list of available response ID's;
#	if list provided, only returns ID's from the list.
#	Returns undef if none available.
#	In scalar context, returns only first available;
#	Else a list of available IDs
#
my @ids = $q->available();
#
#	wait for and return the response for the 
#	specified unique identifier
#	(dequeue_response is alias)
#
my $result = $q->wait($id);
my $result = $q->dequeue_response($id);
#
#	waits up to $timeout seconds for a response to 
#	the queued element identified by $id; returns undef if
#	not available within $timeout, else returns the response
#
my $result = $q->wait_until($id, $timeout);
#
#	wait for a response to the queued 
#	elements listed in @ids, returning a hashref of
#	the first available response(s), keyed by id
#
my $result = $q->wait_any(@ids);
#
#	wait up to $timeout seconds for a response to
#	the queued elements listed in @ids, returning 
#	a hashref of the first available response(s), keyed by id
#	Returns undef if none available in $timeout seconds
#
my $result = $q->wait_any_until($timeout, @ids);
#
#	wait for responses to all the queued 
#	elements listed in @ids, returning a hashref of
#	the response(s), keyed by id
#
my $result = $q->wait_all(@ids);
#
#	wait up to $timeout seconds for responses to
#	all the queued elements listed in @ids, returning 
#	a hashref of the response(s), keyed by id
#	Returns undef if all responses not recv'd 
#	in $timeout seconds
#
my $result = $q->wait_all_until($timeout, @ids);
#
#	mark an existing request
#
$q->mark($id, 'CANCEL');
#
#	test if a request is marked
#
print "Marked for cancel!"
	if $q->marked($id, 'CANCEL');
#
#	cancel specific operations
#
my $result = $q->cancel(@ids);
#
#	cancel all operations
#
my $result = $q->cancel_all();
#
#	test if specified request has been cancelled
#
my $result = $q->cancelled($id);
#
#	(class-level method) wait for an event on
#	any of the listed queue objects. Returns a
#	list of queues which have events pending
#
my $result = Thread::Queue::Duplex->wait_any( 
	[ $q1 ], [ $q2, @ids ]);
#
#	(class-level method) wait up to $timeout seconds
#	for an event on any of the listed queue objects.
#	Returns undef if none available in $timeout seconds,
#	otherwise, returns a list of queues with events pending
#
my $result = Thread::Queue::Duplex->wait_any_until(
	$timeout, [ $q1 ], [ $q2, @ids ]);
#
#	(class-level method) wait for events on all the listed
#	queue objects. Returns the list of queue objects.
#
my $result = Thread::Queue::Duplex->wait_all( 
	[ $q1 ], [ $q2, @ids ]);
#
#	(class-level method) wait up to $timeout seconds for
#	events on all the listed queue objects.
#	Returns empty list if all listed queues do not have
#	an event in $timeout seconds, otherwise returns
#	the list of queues
#
my $result = Thread::Queue::Duplex->wait_all_until(
	$timeout, [ $q1 ], [ $q2, @ids ]);

DESCRIPTION

A mapped queue, similar to Thread::Queue, except that as elements are queued, they are assigned unique identifiers, which are used to identify responses returned from the dequeueing thread. This class provides a simple RPC-like mechanism between multiple client and server threads, so that a single server thread can safely multiplex requests from multiple client threads. Note that simplex versions of the enqueue methods are provided which do not assign unique identifiers, and are used for requests to which no response is required/expected.

In addition, elements are inspected as they are enqueued/dequeued to determine if they are Thread::Queue::Queueable (aka TQQ) objects, and, if so, the onEnqueue() or onDequeue() methods are called to permit any additional class-specific marshalling/unmarshalling to be performed. NOTE: Thread::Queue::Duplex (aka TQD) is itself a Thread::Queue::Queueable object, thus permitting TQD objects to be passed between threads.

Various wait() methods are provided to permit waiting on individual responses, any or all of a list of responses, and time-limited waits for each. Additionally, class-level versions of the wait() methods are provided to permit a thread to simultaneously wait for either enqueue or response events on any of a number of queues.

A mark() method is provided to permit out-of-band information to be applied to pending requests. A responder may test for marks via the marked() method prior to respond()ing to a request. An application may specify a mark value, which the responder can test for; if no explicit mark value is given, the value 1 is used.
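The mark()/marked() handshake described above might look like the following. This is a minimal sketch that assumes the module is installed and behaves as documented here; the 'slow_job' request name and the 'skipped'/'done' response values are purely illustrative, and only scalar arguments are queued to keep the example simple.

```perl
use strict;
use warnings;
use threads;
use Thread::Queue::Duplex;

my $q = Thread::Queue::Duplex->new();

# Client side: queue a request, then tag it before any server picks it up.
my $id = $q->enqueue('slow_job', 42);
$q->mark($id, 'CANCEL');

# Server side: test for the mark before doing the work.
my $server = threads->create(sub {
    my $call = $q->dequeue;
    my ($req_id, $name, @args) = @$call;
    if ($q->marked($req_id, 'CANCEL')) {
        $q->respond($req_id, 'skipped');      # illustrative response value
    }
    else {
        $q->respond($req_id, 'done', @args);
    }
});

my $resp = $q->wait($id);    # assumed to be an arrayref holding the response list
$server->join;
print "response: @$resp\n";
```

Unlike cancel(), the mark leaves the request in place, so the client still receives a response and the server decides how to react to the tag.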

cancel() and cancel_all() methods are provided to explicitly cancel one or more requests, and invoke the onCancel() method of any Thread::Queue::Queueable objects in the request. Cancelling will result in one of the following:

marking the request as cancelled if it has not yet been dequeued (note that it cannot be spliced from the queue due to threads::shared's lack of support for array splicing)
removal and discarding of the response from the response map if the request has already been processed
if the request is in progress, the responder will detect the cancellation when it attempts to respond(), and the response will be discarded

listen() and ignore() methods are provided so that server-side threads can register/unregister as listeners on the queue; the constructor accepts a "ListenerRequired" attribute argument. If set, then any enqueue() operation will fail and return undef if there are no registered listeners. This feature provides some safeguard against "stuck" requestor threads when the responder(s) have shutdown for some reason. In addition, a wait_for_listener() method is provided to permit an initiating thread to wait until another thread registers as a listener.
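A minimal sketch of the listener handshake follows, assuming the module is installed and behaves as documented here. The 'ping', 'pong', and 'stop' messages and the 10 second timeout are hypothetical choices for illustration, not part of the module.

```perl
use strict;
use warnings;
use threads;
use Thread::Queue::Duplex;

my $q = Thread::Queue::Duplex->new(ListenerRequired => 1);

# No listener has registered yet, so enqueue() is documented to return undef.
my $early = $q->enqueue('ping');
print "rejected: no listener\n" unless defined $early;

my $server = threads->create(sub {
    $q->listen();                      # register as a listener
    while (my $call = $q->dequeue) {
        my ($id, $cmd) = @$call;
        last if $cmd eq 'stop';        # hypothetical shutdown message
        $q->respond($id, 'pong');
    }
    $q->ignore();                      # unregister before exiting
});

# Give the server up to 10 seconds to register, then retry the request.
$q->wait_for_listener(10);
my $resp = $q->enqueue_and_wait('ping');
print "got: $resp->[0]\n" if $resp;

$q->enqueue_simplex('stop');           # simplex: no response expected
$server->join;
```

The server calls ignore() only after the shutdown message, so the listener count stays non-zero for as long as the client may still enqueue requests.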

The constructor also accepts a MaxPending attribute that specifies the maximum number of requests that may be pending in the queue before further enqueue operations block. Note that responses are not counted in this limit.

Thread::Queue::Duplex objects encapsulate

a shared array, used as the queue (same as Thread::Queue)
a shared scalar, used to provide unique identifier sequence numbers
a shared hash, aka the mapping hash, used to return responses to enqueued elements, using the generated unique identifier as the hash key
a listener count, incremented each time listen() is called, decremented each time ignore() is called, and, if the "listener required" flag has been set on construction, tested for each enqueue() call.
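To make the roles of these structures concrete, here is a toy sketch built only on the core threads and threads::shared modules. It is not the module's implementation: the toy_* function names are hypothetical, and listeners, MaxPending, marks, and cancellation are all left out.

```perl
use strict;
use warnings;
use threads;
use threads::shared;

my @queue     :shared;        # the queue of pending requests
my $next_id   :shared = 0;    # unique identifier sequence
my %responses :shared;        # mapping hash: identifier => response

sub toy_enqueue {
    my @request = @_;
    my $id;
    { lock($next_id); $id = ++$next_id; }
    my @element :shared = ($id, @request);   # identifier first, then the request
    lock(@queue);
    push @queue, \@element;
    cond_signal(@queue);                     # wake one waiting server
    return $id;
}

sub toy_dequeue {
    lock(@queue);
    cond_wait(@queue) until @queue;
    return shift @queue;
}

sub toy_respond {
    my ($id, @result) = @_;
    my @resp :shared = @result;
    lock(%responses);
    $responses{$id} = \@resp;
    cond_broadcast(%responses);              # wake every waiting client
}

sub toy_wait {
    my ($id) = @_;
    lock(%responses);
    cond_wait(%responses) until exists $responses{$id};
    return delete $responses{$id};
}

my $server = threads->create(sub {
    my ($id, $x, $y) = @{ toy_dequeue() };
    toy_respond($id, $x + $y);
});

my $id   = toy_enqueue(2, 3);
my $resp = toy_wait($id);
$server->join;
print "request $id => $resp->[0]\n";         # prints "request 1 => 5"
```

Because each client waits on its own identifier in the mapping hash, several clients can share one queue without receiving each other's responses.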

A normal processing sequence for Thread::Queue::Duplex might be:

#
#	Thread A (the client):
#
	# ... marshal parameters for a coroutine ...
	my $id = $q->enqueue('function_name', \@paramlist);
	my $results = $q->dequeue_response($id);
	# ... process $results ...
#
#	Thread B (the server):
#
	while (1) {
		my $call = $q->dequeue;
		my ($id, $func, @params) = @$call;
		$q->respond($id, $self->$func(@params));
	}

FUNCTIONS AND METHODS

new([ListenerRequired => $val, MaxPending => $limit])

The new function creates a new empty queue, and associated mapping hash. If a "true" ListenerRequired value is provided, then all enqueue operations require that at least one thread has registered as a listener via listen(). If the MaxPending value is a non-zero value, the number of pending requests will be limited to $limit, and any further attempt to queue a request will block until the pending count drops below $limit. This limit may be applied or modified later via the set_max_pending() method (see below).

enqueue(@request)

Creates a shared array, pushes a unique identifier onto it, followed by the elements of @request, then pushes the shared arrayref onto the queue. Returns the unique identifier, or undef if ListenerRequired was set and no listener is registered.

enqueue_and_wait(@request)

Same as enqueue, except that it waits for and returns the response, rather than returning immediately with the request ID.

enqueue_and_wait_until($timeout, @request)

Same as enqueue, except that it waits up to $timeout seconds for a response, returning the response, rather than returning immediately with the request ID.

enqueue_urgent(@request)

Same as enqueue, but adds the element to head of queue, rather than tail.

enqueue_urgent_and_wait(@request)

Same as enqueue_and_wait, but adds the element to head of queue, rather than tail.

enqueue_urgent_and_wait_until($timeout, @request)

Same as enqueue_and_wait_until, but adds the element to head of queue, rather than tail.

enqueue_simplex(@request)

Same as enqueue, but does not allocate an identifier, nor expect a response.

enqueue_simplex_urgent(@request)

Same as enqueue_simplex, but adds the element to head of queue, rather than tail.

dequeue

Waits indefinitely for an element to become available in the queue, then removes and returns it.

dequeue_nb

The dequeue_nb method is identical to dequeue(), except it will return undef immediately if there are no elements currently in the queue.

dequeue_until($timeout)

Identical to dequeue(), except it accepts a $timeout parameter specifying a duration (in seconds) to wait for an available element. If no element is available within the $timeout, it returns undef.

dequeue_urgent

Identical to dequeue_nb(), except it only returns the next available element if it has been queued via either enqueue_urgent() or enqueue_simplex_urgent(). Useful for servers which poll for events, e.g., for external aborts of long-running operations.
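As a sketch of the polling pattern mentioned above, assuming the module is installed and behaves as documented here: a worker handles one long job in steps and checks for an urgent message between steps. The 'crunch' and 'abort' message names, the step count, and the sleep intervals are hypothetical.

```perl
use strict;
use warnings;
use threads;
use Thread::Queue::Duplex;

my $q = Thread::Queue::Duplex->new();

my $worker = threads->create(sub {
    my $call = $q->dequeue;                    # the long-running job
    my ($id, $job, $steps) = @$call;
    my $done = 0;
    for my $step (1 .. $steps) {
        # Poll between steps; undef means no urgent element is queued.
        my $urgent = $q->dequeue_urgent();
        last if $urgent && $urgent->[1] eq 'abort';
        select(undef, undef, undef, 0.1);      # stand-in for one unit of real work
        $done = $step;
    }
    $q->respond($id, $done);
});

my $id = $q->enqueue('crunch', 50);
sleep 1;                                       # let the worker make some progress
$q->enqueue_simplex_urgent('abort');           # simplex: the abort needs no reply

my $resp = $q->wait($id);
$worker->join;
print "completed $resp->[0] of 50 steps\n";
```

The abort is sent as a simplex request, so its first element (the identifier) is undef and the message name is found at index 1.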

pending

Returns the number of items still in the queue.
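The dequeue variants and pending() can be exercised from a single thread, which makes the shape of a dequeued element easy to see. This is a minimal sketch assuming the module is installed and behaves as documented here; the 'lookup' and 'log' request names and the response values are illustrative.

```perl
use strict;
use warnings;
use threads;
use Thread::Queue::Duplex;

my $q = Thread::Queue::Duplex->new();

# Nothing is queued yet, so the non-blocking form returns undef at once.
print "queue is empty\n" unless defined $q->dequeue_nb;

my $id = $q->enqueue('lookup', 'alice');    # duplex: gets an identifier
$q->enqueue_simplex('log', 'hello');        # simplex: no identifier
print "pending: ", $q->pending, "\n";

my $first  = $q->dequeue_nb;                # expected shape: [ $id, 'lookup', 'alice' ]
my $second = $q->dequeue_until(2);          # expected shape: [ undef, 'log', 'hello' ]

$q->respond($first->[0], 'found');          # answers the duplex request
$q->respond($second->[0], 'unused');        # undef id: documented as silently ignored

my $resp = $q->wait($id);                   # the response has already been posted
print "response: $resp->[0]\n";
```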

set_max_pending($limit)

Set the maximum number of requests that may be queued without blocking the requestor.

respond($id [, LIST ])

Creates a new element in the mapping hash, keyed by $id, with a value set to a shared arrayref containing LIST. If $id is undef, the operation is silently ignored (in order to gracefully support simplex requests).

ready($id)

Tests for a response to a uniquely identified previously enqueue'd LIST. Returns undef if no response is available, otherwise returns the Thread::Queue::Duplex object.

available([@ids])

Returns a list of available response IDs; if @ids is provided, only IDs from that list are returned. Returns undef if none are available. In scalar context, returns only the first available ID; otherwise, returns a list of available IDs.

wait($id) aka dequeue_response($id)

Waits indefinitely for a response to a uniquely identified previously enqueue'd LIST. Returns the returned result.

wait_until($id, $timeout)

Waits up to $timeout seconds for a response to a uniquely identified previously enqueue'd LIST. Returns undef if no response is available in the specified $timeout duration, otherwise, returns the result.

wait_any(@ids) [instance method form]

Wait indefinitely for a response to any of the previously enqueue'd elements specified in the supplied @ids. Returns a hashref of available responses keyed by their identifiers.

wait_any_until($timeout, @ids) [instance method form]

Wait up to $timeout seconds for a response to any of the previously enqueue'd elements specified in the supplied @ids. Returns a hashref of available responses keyed by their identifiers, or undef if none are available within $timeout seconds.

wait_all(@ids) [instance method form]

Wait indefinitely for a response to all the previously enqueue'd elements specified in the supplied @ids. Returns a hashref of responses keyed by their identifiers.

wait_all_until($timeout, @ids) [instance method form]

Wait up to $timeout seconds for a response to all the previously enqueue'd elements specified in the supplied @ids. Returns a hashref of responses keyed by their identifiers, or undef if not all responses are available within $timeout seconds.
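The instance forms of these methods suit a client that fans out several requests and then collects the answers. The following is a minimal sketch, assuming the module is installed and behaves as documented here; the 'square' and 'stop' messages are hypothetical, and each response is assumed to arrive as an arrayref holding the list passed to respond().

```perl
use strict;
use warnings;
use threads;
use Thread::Queue::Duplex;

my $q = Thread::Queue::Duplex->new();

my $server = threads->create(sub {
    while (my $call = $q->dequeue) {
        my ($id, $cmd, @args) = @$call;
        last if $cmd eq 'stop';              # hypothetical shutdown message
        $q->respond($id, $args[0] ** 2);
    }
});

# Queue three requests, remembering the identifier of each.
my @ids = map { $q->enqueue('square', $_) } 1 .. 3;

# Block until every request has been answered.
my $results = $q->wait_all(@ids);
for my $id (@ids) {
    print "request $id => $results->{$id}[0]\n";
}

$q->enqueue_simplex('stop');
$server->join;
```

Swapping wait_all() for wait_any() would return as soon as the first response is posted, with only the available identifiers present in the hashref.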

wait_any(@queue_refs) [class method form]

Wait indefinitely for an event on any of the listed queue objects. Returns a list of queues which have events pending. @queue_refs elements may be either TQD objects, or arrayrefs whose first element is a TQD object, and the remaining elements are queue'd element identifiers. For bare TQD elements and arrayref elements with no identifiers, wait_any waits for an enqueue event on the queue; otherwise, it waits for a response event for any of the specified identifiers.

NOTE: In order for the class method form of these wait_XXX() functions to behave properly, the "main" application should "use Thread::Queue::Duplex;" in order to install the class-level shared variable used for signalling events across all TQD instances. Failure to do so could cause a segregation of TQD objects created in threads descended from different parent threads (due to the perl interpreter cloning when threads are created).

Also note that only enqueue and response events are detected; cancel events are not reported by these class methods.

Finally, note that there is no guarantee that the queue objects returned by the class-level wait() methods will still have events pending on them when they are returned, since multiple threads may be notified of an event on the same queue, but one thread may have handled the event before the other thread(s).
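A server that drains two queues with the class method form might be sketched as follows, assuming the module is installed, behaves as documented here, and returns the queue objects themselves from the class-level wait_any(). The request contents are illustrative. Because a returned queue may no longer have an event pending, the sketch uses dequeue_nb() rather than a blocking dequeue().

```perl
use strict;
use warnings;
use threads;
use Thread::Queue::Duplex;    # loaded in the main program, per the note above

my $q1 = Thread::Queue::Duplex->new();
my $q2 = Thread::Queue::Duplex->new();

my $server = threads->create(sub {
    my $served = 0;
    while ($served < 2) {
        # Bare queue objects: wait for an enqueue event on either queue.
        my @ready = Thread::Queue::Duplex->wait_any($q1, $q2);
        for my $q (@ready) {
            my $call = $q->dequeue_nb or next;    # the event may already be gone
            my ($id, @request) = @$call;
            $q->respond($id, scalar @request);    # reply with the argument count
            $served++;
        }
    }
});

my $id1 = $q1->enqueue('a');
my $id2 = $q2->enqueue('b', 'c');

my $r1 = $q1->wait($id1);
my $r2 = $q2->wait($id2);
$server->join;
print "q1 saw $r1->[0] argument(s), q2 saw $r2->[0]\n";
```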

wait_any_until($timeout, @queue_refs) [class method form]

Wait up to $timeout seconds for an event on any of the listed queue objects. Returns undef if none available in $timeout seconds, otherwise, returns a list of queues with events pending. @queue_refs is the same as for wait_any().

wait_all(@queue_refs) [class method form]

Wait indefinitely for events on all the listed queue objects. Returns the list of queue objects.

wait_all_until($timeout, @queue_refs) [class method form]

Wait up to $timeout seconds for events on all the listed queue objects. Returns an empty list if not all of the listed queues have an event within $timeout seconds, otherwise returns the list of queue objects.

mark($id [ , $value ])

Marks the specified request with the given value. If the request has not been processed, it will be marked with the specified $value; if no $value is given, the mark is set to 1. The request may later be tested for a given mark via the marked($id [, $value ]) method. This is useful for out-of-band tagging of requests, e.g., to flag a request as cancelled while still retaining its response.

Note that mark values are not cumulative, i.e., if multiple marks are applied, only the most recent $value is retained.

marked($id [, $value ])

Tests if the specified request has been marked with the given $value. If no $value is specified, simply tests if the request is marked. Returns true or false.

get_mark($id)

Returns the current mark value of a request, if any.

cancel(@ids)

Cancels all the requests identified in @ids. If a response to a cancelled request has already been posted to the queue response map (i.e., the request has already been serviced), the response is removed from the map, the onCancel() method is invoked on each Thread::Queue::Queueable object in the response, and the response is discarded.

If a response to a cancelled request has not yet been posted to the queue response map, an empty entry is added to the queue response map. (Note: threads::shared doesn't permit splicing shared arrays yet, so we can't remove the request from the queue).

When a server thread attempts to dequeue[_nb|_until]() a cancelled request, the request is discarded and the dequeue operation is retried. If the cancelled request has already been dequeued, the server thread will detect the cancellation when it attempts to respond() to the request, invoke the onCancel() method on any Thread::Queue::Queueable objects in the response, and then discard the response.

Note that, since simplex requests do not have an identifier, there is no way to explicitly cancel a specific simplex request.

cancel_all()

Cancels all current requests and responses, using the cancel() algorithm above, plus cancels all simplex requests still in the queue.

Note: In-progress requests (i.e., requests which have been removed from the queue, but do not yet have an entry in the response map) will not be cancelled.
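The simplest cancellation case, a request cancelled before any server has dequeued it, can be sketched in a single thread. This assumes the module is installed and behaves as documented here; the 'report' request is illustrative.

```perl
use strict;
use warnings;
use threads;
use Thread::Queue::Duplex;

my $q = Thread::Queue::Duplex->new();

# Queue a request, then cancel it before any server has dequeued it.
my $id = $q->enqueue('report', 2005);
$q->cancel($id);

my $was_cancelled = $q->cancelled($id);
print "request $id was cancelled\n" if $was_cancelled;

# Per the description of cancel(), a dequeue discards the cancelled request
# and retries, so the non-blocking form should find nothing to return.
my $call = $q->dequeue_nb;
print "nothing left to serve\n" unless defined $call;
```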

SEE ALSO

Thread::Queue::Queueable, threads, threads::shared, Thread::Queue

AUTHOR, COPYRIGHT, & LICENSE

Dean Arnold, Presicient Corp. darnold@presicient.com

Copyright(C) 2005, Presicient Corp., USA

Permission is granted to use this software under the same terms as Perl itself. Refer to the Perl Artistic License for details.