NAME

Net::Async::AMQP::Queue - deal with queue-specific functionality

VERSION

version 1.000

METHODS

listen

Starts a consumer on this queue.

$q->listen(
 channel => $ch,
 ack => 1
)->then(sub {
 my ($q, $ctag) = @_;
 print "Queue $q has ctag $ctag\n";
 ...
})

Expects the following named parameters:

  • channel - which channel to listen on

  • ack (optional) - true to enable ACKs

  • consumer_tag (optional) - specific consumer tag

Returns a Future which resolves with ($queue, $consumer_tag) on completion. If this is cancelled before we receive the Basic.ConsumeOk acknowledgement from the server, we'll issue an explicit cancel.

cancel

Cancels the given consumer.

$q->cancel(
 consumer_tag => '...',
)->then(sub {
 my ($q, $ctag) = @_;
 print "Queue $q ctag $ctag cancelled\n";
 ...
})

Expects the following named parameters:

  • consumer_tag (optional) - specific consumer tag

Returns a Future which resolves with ($queue, $consumer_tag) on completion.

consumer

Similar to "listen", but applies the event handlers so you can just provide an on_message callback.

Takes the following extra named parameters:

  • on_message - callback for message handling

  • on_cancel - will be called if the consumer is cancelled (either by the server or client)

For server consumer cancellation notification, you'll need consumer_cancel_notifications:

$mq->connect(
 ...
 client_properties => {
  capabilities => {
   'consumer_cancel_notify' => Net::AMQP::Value->true
  },
 },
)

The on_message callback receives the following named parameters:

  • type

  • payload

  • consumer_tag

  • delivery_tag

  • routing_key

See examples/alternative-consumer.pl for a usage example.

bind_exchange

Binds this queue to an exchange.

$q->bind_exchange(
 channel => $ch,
 exchange => '',
)->then(sub {
 my ($q) = @_;
 print "Queue $q bound to default exchange\n";
 ...
})

Expects the following named parameters:

  • channel - which channel to perform the bind on

  • exchange - the exchange to bind, can be '' for default

  • routing_key (optional) - a routing key for the binding

Returns a Future which resolves with ($queue) on completion.

unbind_exchange

Unbinds this queue from an exchange.

$q->unbind_exchange(
 channel => $ch,
 exchange => '',
)->then(sub {
 my ($q) = @_;
 print "Queue $q unbound from default exchange\n";
 ...
})

Expects the following named parameters:

  • channel - which channel to perform the bind on

  • exchange - the exchange to bind, can be '' for default

  • routing_key (optional) - a routing key for the binding

Returns a Future which resolves with ($queue) on completion.

delete

Deletes this queue.

$q->delete(
 channel => $ch,
)->then(sub {
 my ($q) = @_;
 print "Queue $q deleted\n";
 ...
})

Expects the following named parameters:

  • channel - which channel to perform the bind on

Returns a Future which resolves with ($queue) on completion.

ACCESSORS

These are mostly intended for internal use only.

configure

Applies amqp or future value.

amqp

A weakref to the Net::Async::AMQP instance.

future

A ref to the Future representing the queue readiness.

queue_name

Sets or returns the queue name.

AUTHOR

Tom Molesworth <cpan@perlsite.co.uk>

LICENSE

Licensed under the same terms as Perl itself, with additional licensing terms for the MQ spec to be found in share/amqp0-9-1.extended.xml ('a worldwide, perpetual, royalty-free, nontransferable, nonexclusive license to (i) copy, display, distribute and implement the Advanced Messaging Queue Protocol ("AMQP") Specification').