NAME

Net::Async::AMQP::Channel - represents a single channel in an MQ connection

VERSION

version 2.000

SYNOPSIS

use IO::Async::Loop;
use Net::Async::AMQP;
my $loop = IO::Async::Loop->new;
$loop->add(my $amqp = Net::Async::AMQP->new);
$amqp->connect(
  host => 'localhost',
  username => 'guest',
  password => 'guest',
)->then(sub {
 shift->open_channel->publish(
  type => 'application/json'
 )
});

DESCRIPTION

Each Net::Async::AMQP::Channel instance represents a virtual channel for communicating with the MQ server.

Channels are layered over the TCP protocol and most of the common AMQP frames operate at channel level - typically you'd connect to the server, open one channel for one-shot requests such as binding/declaring/publishing, and a further channel for every consumer.

Since any error typically results in a closed channel, it's not recommended to have multiple consumers on the same channel if there's any chance the Basic.Consume request will fail.

METHODS

confirm_mode

Switches confirmation mode on for this channel. In confirm mode, all messages must be ACKed explicitly after delivery.

Note that this is an irreversible operation - once confirm mode has been enabled on a channel, closing that channel and reopening is the only way to turn off confirm mode again.

Returns a Future which will resolve with this channel once complete.

$ch->confirm_mode ==> $ch

nowait_from_args

If we have a wait argument, then return the inverse of that.

Otherwise, return zero.

exchange_declare

Declares a new exchange.

Returns a Future which will resolve with this channel once complete.

$ch->exchange_declare(
 exchange   => 'some_exchange',
 type       => 'fanout',
 autodelete => 1,
) ==> $ch

exchange_bind

Binds an exchange to another exchange. This is a RabbitMQ-specific extension.

queue_declare

Returns a Future which will resolve with the new Net::Async::AMQP::Queue instance, the number of messages in the queue, and the number of consumers.

$ch->queue_declare(
 queue      => 'some_queue',
) ==> ($q, $message_count, $consumer_count)

publish

Publishes a message on this channel.

Returns a Future which will resolve with the channel instance once the server has confirmed publishing is complete.

$ch->publish(
 exchange => 'some_exchange',
 routing_key => 'some.rkey.here',
 type => 'some_type',
) ==> $ch

Some named parameters currently accepted - note that this list is likely to expand in future:

  • ack - we default to ACK mode, so set this to 0 to turn off explicit server ACK on message routing/delivery

  • immediate - if set, will cause a failure if the message could not be routed immediately to a consumer

  • mandatory - if set, will require that the message ends up in a queue (i.e. will fail messages sent to an exchange that do not have an appropriate binding)

  • content_type - defaults to application/binary

  • content_encoding - defaults to undef (none)

  • timestamp - the message timestamp, defaults to epoch time

  • expiration - use this to set per-message expiry, see https://www.rabbitmq.com/ttl.html

  • priority - defaults to undef (none), use this to take advantage of RabbitMQ 3.5+ priority support

  • reply_to - which queue to reply to (used for RPC, default undef)

  • correlation_id - unique message ID (used for RPC, default undef)

  • delivery_mode - whether to persist message (default 1, don't persist - set to 2 for persistent, see also "durable" flag for queues)

qos

Changes QOS settings on the channel. Probably most useful for limiting the number of messages that can be delivered to us before we have to ACK/NAK to proceed.

Returns a Future which will resolve with the channel instance once the operation is complete.

$ch->qos(
 prefetch_count => 5,
 prefetch_size  => 1048576,
) ==> $ch

ack

Acknowledge a specific delivery.

Returns a Future which will resolve with the channel instance once the operation is complete.

$ch->ack(
 delivery_tag => 123,
) ==> $ch

nack

Negative acknowledgement for a specific delivery.

Returns a Future which will resolve with the channel instance once the operation is complete.

$ch->nack(
 delivery_tag => 123,
) ==> $ch

reject

Reject a specific delivery.

Returns a Future which will resolve with the channel instance once the operation is complete.

$ch->nack(
 delivery_tag => 123,
) ==> $ch

Example output:

'method_id' => 40,
'reply_code' => 404,
'class_id' => 60,
'reply_text' => 'NOT_FOUND - no exchange \'invalidchan\' in vhost \'vhost\''

on_close

Called when the channel has been closed.

send_frame

Proxy frame sending requests to the parent Net::Async::AMQP instance.

close

Ask the server to close this channel.

Returns a Future which will resolve with the channel instance once the operation is complete.

$ch->close(
 code => 404,
 text => 'something went wrong',
) ==> $ch

push_pending

remove_pending

Removes a coderef from the pending event handler.

Returns $self .

next_pending

Retrieves the next pending handler for the given incoming frame type (see "amqp_frame_type" in Net::Async::AMQP::Utils), and calls it.

Takes the following parameters:

  • $frame - the frame itself

Returns $self.

METHODS - Accessors

amqp

The parent Net::Async::AMQP instance.

bus

Event bus. Used for sharing channel-specific events.

future

The underlying Future for this channel.

Will resolve to the Net::Async::Channel instance once the channel is open.

id

This channel ID.

closed

Returns true if the channel has been closed, 1 if not (which could mean it is either not yet open, or that it is open and has not yet been closed by either side).

closure_protection

Helper method for marking any outstanding requests as failed when the channel closes.

Takes a Future, returns a Future (probably the same one).

INHERITED METHODS

IO::Async::Notifier

add_child, adopt_future, can_event, children, configure_unknown, debug_printf, get_loop, invoke_error, invoke_event, loop, make_event_cb, maybe_invoke_event, maybe_make_event_cb, new, notifier_name, parent, remove_child, remove_from_parent

AUTHOR

Tom Molesworth <TEAM@cpan.org>

LICENSE

Licensed under the same terms as Perl itself, with additional licensing terms for the MQ spec to be found in share/amqp0-9-1.extended.xml ('a worldwide, perpetual, royalty-free, nontransferable, nonexclusive license to (i) copy, display, distribute and implement the Advanced Messaging Queue Protocol ("AMQP") Specification').