NAME
Net::Async::AMQP::Channel - represents a single channel in an MQ connection
VERSION
version 2.000
SYNOPSIS
use IO::Async::Loop;
use Net::Async::AMQP;
my $loop = IO::Async::Loop->new;
$loop->add(my $amqp = Net::Async::AMQP->new);
$amqp->connect(
host => 'localhost',
username => 'guest',
password => 'guest',
)->then(sub {
shift->open_channel->publish(
type => 'application/json'
)
});
DESCRIPTION
Each Net::Async::AMQP::Channel instance represents a virtual channel for communicating with the MQ server.
Channels are layered over the TCP protocol and most of the common AMQP frames operate at channel level - typically you'd connect to the server, open one channel for one-shot requests such as binding/declaring/publishing, and a further channel for every consumer.
Since any error typically results in a closed channel, it's not recommended to have multiple consumers on the same channel if there's any chance the Basic.Consume request will fail.
METHODS
confirm_mode
Switches confirmation mode on for this channel. In confirm mode, all messages must be ACKed explicitly after delivery.
Note that this is an irreversible operation - once confirm mode has been enabled on a channel, closing that channel and reopening is the only way to turn off confirm mode again.
Returns a Future which will resolve with this channel once complete.
$ch->confirm_mode ==> $ch
nowait_from_args
If we have a wait
argument, then return the inverse of that.
Otherwise, return zero.
exchange_declare
Declares a new exchange.
Returns a Future which will resolve with this channel once complete.
$ch->exchange_declare(
exchange => 'some_exchange',
type => 'fanout',
autodelete => 1,
) ==> $ch
exchange_bind
Binds an exchange to another exchange. This is a RabbitMQ-specific extension.
queue_declare
Returns a Future which will resolve with the new Net::Async::AMQP::Queue instance, the number of messages in the queue, and the number of consumers.
$ch->queue_declare(
queue => 'some_queue',
) ==> ($q, $message_count, $consumer_count)
publish
Publishes a message on this channel.
Returns a Future which will resolve with the channel instance once the server has confirmed publishing is complete.
$ch->publish(
exchange => 'some_exchange',
routing_key => 'some.rkey.here',
type => 'some_type',
) ==> $ch
Some named parameters currently accepted - note that this list is likely to expand in future:
ack - we default to ACK mode, so set this to 0 to turn off explicit server ACK on message routing/delivery
immediate - if set, will cause a failure if the message could not be routed immediately to a consumer
mandatory - if set, will require that the message ends up in a queue (i.e. will fail messages sent to an exchange that do not have an appropriate binding)
content_type - defaults to application/binary
content_encoding - defaults to undef (none)
timestamp - the message timestamp, defaults to epoch time
expiration - use this to set per-message expiry, see https://www.rabbitmq.com/ttl.html
priority - defaults to undef (none), use this to take advantage of RabbitMQ 3.5+ priority support
reply_to - which queue to reply to (used for RPC, default undef)
correlation_id - unique message ID (used for RPC, default undef)
delivery_mode - whether to persist message (default 1, don't persist - set to 2 for persistent, see also "durable" flag for queues)
qos
Changes QOS settings on the channel. Probably most useful for limiting the number of messages that can be delivered to us before we have to ACK/NAK to proceed.
Returns a Future which will resolve with the channel instance once the operation is complete.
$ch->qos(
prefetch_count => 5,
prefetch_size => 1048576,
) ==> $ch
ack
Acknowledge a specific delivery.
Returns a Future which will resolve with the channel instance once the operation is complete.
$ch->ack(
delivery_tag => 123,
) ==> $ch
nack
Negative acknowledgement for a specific delivery.
Returns a Future which will resolve with the channel instance once the operation is complete.
$ch->nack(
delivery_tag => 123,
) ==> $ch
reject
Reject a specific delivery.
Returns a Future which will resolve with the channel instance once the operation is complete.
$ch->nack(
delivery_tag => 123,
) ==> $ch
Example output:
'method_id' => 40,
'reply_code' => 404,
'class_id' => 60,
'reply_text' => 'NOT_FOUND - no exchange \'invalidchan\' in vhost \'vhost\''
on_close
Called when the channel has been closed.
send_frame
Proxy frame sending requests to the parent Net::Async::AMQP instance.
close
Ask the server to close this channel.
Returns a Future which will resolve with the channel instance once the operation is complete.
$ch->close(
code => 404,
text => 'something went wrong',
) ==> $ch
push_pending
remove_pending
Removes a coderef from the pending event handler.
Returns $self
.
next_pending
Retrieves the next pending handler for the given incoming frame type (see "amqp_frame_type" in Net::Async::AMQP::Utils), and calls it.
Takes the following parameters:
$frame - the frame itself
Returns $self.
METHODS - Accessors
amqp
The parent Net::Async::AMQP instance.
bus
Event bus. Used for sharing channel-specific events.
future
The underlying Future for this channel.
Will resolve to the Net::Async::Channel instance once the channel is open.
id
This channel ID.
closed
Returns true if the channel has been closed, 1 if not (which could mean it is either not yet open, or that it is open and has not yet been closed by either side).
closure_protection
Helper method for marking any outstanding requests as failed when the channel closes.
Takes a Future, returns a Future (probably the same one).
INHERITED METHODS
- IO::Async::Notifier
-
add_child, adopt_future, can_event, children, configure_unknown, debug_printf, get_loop, invoke_error, invoke_event, loop, make_event_cb, maybe_invoke_event, maybe_make_event_cb, new, notifier_name, parent, remove_child, remove_from_parent
AUTHOR
Tom Molesworth <TEAM@cpan.org>
LICENSE
Licensed under the same terms as Perl itself, with additional licensing terms for the MQ spec to be found in share/amqp0-9-1.extended.xml
('a worldwide, perpetual, royalty-free, nontransferable, nonexclusive license to (i) copy, display, distribute and implement the Advanced Messaging Queue Protocol ("AMQP") Specification').