NAME

Net::Async::AMQP::ConnectionManager - handle MQ connections

VERSION

version 0.031

SYNOPSIS

 use IO::Async::Loop;
 use Net::Async::AMQP;
 use Net::Async::AMQP::ConnectionManager;
 my $loop = IO::Async::Loop->new;
 $loop->add(
  my $cm = Net::Async::AMQP::ConnectionManager->new
 );
 $cm->add(
   host  => 'localhost',
   user  => 'guest',
   pass  => 'guest',
   vhost => 'vhost',
 );
 $cm->request_channel->then(sub {
   my $ch = shift;
   Future->needs_all(
     $ch->declare_exchange(
       'exchange_name'
     ),
     $ch->declare_queue(
       'queue_name'
     ),
   )->transform(done => sub { $ch })
 })->then(sub {
   my $ch = shift;
   $ch->bind_queue(
     'exchange_name',
     'queue_name',
     '*'
   )
 })->get;

DESCRIPTION

Channel management

Each connection has N total available channels, recorded in a hash. The total number of channels per connection is negotiated via the initial AMQP Tune/TuneOk sequence on connection.

We also maintain lists:

  • Unassigned channel - these are channels which were in use and have now been released.

  • Closed channel - any time a channel is closed, its ID is pushed onto this list so that we can reopen it later without needing to scan the hash. Each entry is an arrayref of [$mq_conn, $id].

The highest assigned channel ID is also recorded per connection.

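 # Pseudocode for the channel allocation order
 if(have unassigned) {
   return shift unassigned
 } elsif(have closed) {
   # entries in the closed list are [$mq_conn, $id] arrayrefs
   my ($mq_conn, $id) = @{ shift closed };
   return $mq_conn->open_channel($id)
 } elsif(my $next_id = $mq->next_id) {
   return $mq->open_channel($next_id)
 } else {
   # every channel ID on this connection is already in use
 }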
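The allocation order above can be sketched in runnable form. This is only an illustration using plain data structures: Sketch::Connection and allocate_channel are hypothetical names that are not part of this distribution, and open_channel here returns a label rather than talking to a server.

```perl
use strict;
use warnings;

# Hypothetical stand-in for a connection: hands out channel IDs up to a
# fixed maximum. Not part of Net::Async::AMQP.
package Sketch::Connection {
    sub new {
        my ($class, %args) = @_;
        return bless { max => $args{max}, highest => 0 }, $class;
    }

    # Next never-used channel ID, or undef once the maximum is reached
    sub next_id {
        my ($self) = @_;
        return undef if $self->{highest} >= $self->{max};
        return ++$self->{highest};
    }

    # A real connection would open the channel on the server;
    # this sketch only returns a label for the given ID
    sub open_channel {
        my ($self, $id) = @_;
        return "channel $id";
    }
}

package main;

# $state holds two lists:
#   unassigned => [ $channel, ... ]
#   closed     => [ [$conn, $id], ... ]
sub allocate_channel {
    my ($state, $conn) = @_;

    # 1. Prefer a channel that was released and is still open
    if (@{ $state->{unassigned} }) {
        return shift @{ $state->{unassigned} };
    }

    # 2. Otherwise reopen a previously closed channel ID
    if (@{ $state->{closed} }) {
        my ($closed_conn, $id) = @{ shift @{ $state->{closed} } };
        return $closed_conn->open_channel($id);
    }

    # 3. Otherwise take the next unused ID on this connection
    if (defined(my $next_id = $conn->next_id)) {
        return $conn->open_channel($next_id);
    }

    # 4. Nothing available on this connection
    return undef;
}
```

In this sketch the caller decides what to do when undef comes back, for example by trying another connection.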

Calling methods on the channel proxy will establish a reference cycle for the duration of the pending request. This cycle will not be broken until all the callbacks for that request have completed.

The channel object does not expose any methods that allow altering QoS or other channel state settings. These must be requested on channel assignment. This does not necessarily mean that any QoS change will require allocation of a new channel.

Bypassing the proxy object to change QoS flags is not recommended.

Connection pool

Connections are established on demand.

METHODS

request_channel

Attempts to assign a channel with the given QoS settings.

Available QoS settings are:

  • prefetch_count - number of messages that can be delivered at a time

  • prefetch_size - total size of messages allowed before acknowledging

  • confirm_mode - enables publisher confirms, so that each publish is explicitly acknowledged by the server

Confirm mode isn't really QoS but it fits in with the others since it modifies the channel state (and once enabled, cannot be disabled without closing and reopening the channel).

Will resolve to a Net::Async::AMQP::ConnectionManager::Channel instance on success.
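A possible call shape, assuming the QoS settings are passed to request_channel as key/value pairs (the text above lists the setting names but does not show the calling convention). The helper name request_worker_channel is hypothetical, and $cm is expected to be a connection manager that has already been added to a loop and given at least one server via "add".

```perl
use strict;
use warnings;

# Hypothetical helper: request a channel with a prefetch limit and
# publisher confirms enabled.
# Assumption: request_channel accepts the QoS settings as key/value pairs.
sub request_worker_channel {
    my ($cm) = @_;
    return $cm->request_channel(
        prefetch_count => 10,    # limit on messages delivered at a time
        confirm_mode   => 1,     # explicit publish acknowledgement
    );
}
```

Since confirm mode cannot be switched off without closing and reopening the channel, it is requested here at assignment time rather than changed later.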

can_reopen_channels

A constant which indicates whether we can reopen channels. The AMQP 0.9.1 spec doesn't seem to explicitly allow this, but it works with RabbitMQ 3.4.3 (and probably older versions), so it's enabled by default.

channel_retry_count

Returns the channel retry count. The default is 10. Call "configure" with undef to retry indefinitely, or with 0 to avoid retrying at all:

 # Keep trying until it works
 $mq->configure(channel_retry_count => undef);
 # Don't retry at all
 $mq->configure(channel_retry_count => 0);
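The three cases (a number, undef, and 0) can be illustrated with a small synchronous loop. This is a sketch of the semantics only, not the module's code: with_retries is a hypothetical helper, and it assumes that a retry count of N means one initial attempt plus up to N retries.

```perl
use strict;
use warnings;

# Run $attempt until it returns a true value.
# $retry_count: undef = retry indefinitely, 0 = never retry,
# N = allow up to N retries after the first attempt (assumed meaning).
# $attempt is called with the attempt number, starting at 1.
sub with_retries {
    my ($retry_count, $attempt) = @_;
    my $tries = 0;
    while (1) {
        $tries++;
        my $result = $attempt->($tries);
        return $result if $result;

        # Stop once the first attempt and all permitted retries have failed
        last if defined($retry_count) && $tries > $retry_count;
    }
    return undef;
}
```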

connect_timeout

Returns the current connection timeout. A value of undef or zero means "no timeout".

apply_qos

Set QoS on the given channel.

Expects the Net::Async::AMQP::Channel object as the first parameter, followed by the key/value pairs corresponding to the desired QoS settings:

  • prefetch_count - number of messages that can be delivered before ACK is required

Returns a Future which will resolve to the original Net::Async::AMQP::Channel instance.

request_connection

Attempts to connect to one of the known AMQP servers.

next_host

Returns the next AMQP host.

connect

Attempts a connection to an AMQP host.

mark_connection_full

Indicate that this connection has already allocated all available channels.

key_for_args

Returns a key that represents the given arguments.
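One way such a key could be built is to sort the argument names and join them with their values, so that the same settings always give the same string regardless of the order they were passed in. The function below is a hypothetical sketch of that idea, not the implementation used by this module, and the key format is invented for illustration.

```perl
use strict;
use warnings;

# Build a stable string key from key/value settings.
# Names are sorted so that argument order does not affect the result;
# undefined values are rendered as an empty string.
sub sketch_key_for_args {
    my (%args) = @_;
    return join ',', map { "$_=" . ($args{$_} // '') } sort keys %args;
}
```

A key like this could be used to group channels that were requested with identical settings, although whether the module does so is an assumption here.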

on_channel_close

Called when one of our channels has been closed.

release_channel

Releases the given channel back to our channel pool.

connection_valid

Returns true if this connection is one we know about, false if it's closed or otherwise not usable.

add

Adds connection details for an AMQP server to the pool.

exch

release_connection

Releases a connection.

Currently this doesn't do anything.

INHERITED METHODS

IO::Async::Notifier

add_child, adopt_future, can_event, children, configure_unknown, debug_printf, get_loop, invoke_error, invoke_event, loop, make_event_cb, maybe_invoke_event, maybe_make_event_cb, new, notifier_name, parent, remove_child, remove_from_parent

AUTHOR

Tom Molesworth <cpan@perlsite.co.uk>

LICENSE

Licensed under the same terms as Perl itself, with additional licensing terms for the MQ spec to be found in share/amqp0-9-1.extended.xml ('a worldwide, perpetual, royalty-free, nontransferable, nonexclusive license to (i) copy, display, distribute and implement the Advanced Messaging Queue Protocol ("AMQP") Specification').