NAME
Net::Async::AMQP::ConnectionManager - handle MQ connections
VERSION
version 0.020
SYNOPSIS
use IO::Async::Loop;
use Net::Async::AMQP;
use Net::Async::AMQP::ConnectionManager;

my $loop = IO::Async::Loop->new;
$loop->add(
    my $cm = Net::Async::AMQP::ConnectionManager->new
);
# Register the server details; connections are made on demand
$cm->add(
    host  => 'localhost',
    user  => 'guest',
    pass  => 'guest',
    vhost => 'vhost',
);
# Request a channel, declare an exchange and a queue, then bind them
$cm->request_channel->then(sub {
    my $ch = shift;
    Future->needs_all(
        $ch->declare_exchange(
            'exchange_name'
        ),
        $ch->declare_queue(
            'queue_name'
        ),
    )->transform(done => sub { $ch })
})->then(sub {
    my $ch = shift;
    $ch->bind_queue(
        'exchange_name',
        'queue_name',
        '*'
    )
})->get;
DESCRIPTION
Channel management
Each connection has N total available channels, recorded in a hash. The total number of channels per connection is negotiated via the initial AMQP Tune/TuneOk sequence on connection.
We also maintain two lists:
Unassigned channel - channels which were in use and have now been released.
Closed channel - whenever a channel is closed, its ID is pushed onto this list so that it can be reopened later without scanning the hash. Entries are arrayrefs of [$mq_conn, $id].
The highest-assigned ID is also recorded per connection. Channels are assigned in the following order:
if(have unassigned) {
    return shift unassigned
} elsif(have closed) {
    my $closed = shift closed;
    return $closed->{mq}->open_channel($closed->{id})
} elsif(my $next_id = $mq->next_id) {
    return $mq->open_channel($next_id)
} else {
    # no channel available on this connection
}
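The allocation order above can be modelled in plain Perl without a broker. This is a minimal sketch, not the module's actual code: Sketch::Connection, assign_channel and the $pool structure are hypothetical names introduced for illustration, and closed entries are modelled as arrayrefs of [$mq_conn, $id] as described in the list above.

```perl
use strict;
use warnings;

# Hypothetical stand-in for a connection: tracks the highest assigned
# channel ID against the negotiated per-connection channel limit.
package Sketch::Connection {
    sub new {
        my ($class, %args) = @_;
        return bless { max => $args{max}, highest => 0 }, $class;
    }
    # Next never-used channel ID, or undef once the limit is reached.
    sub next_id {
        my ($self) = @_;
        return undef if $self->{highest} >= $self->{max};
        return ++$self->{highest};
    }
    # A real implementation would open the channel on the wire here.
    sub open_channel {
        my ($self, $id) = @_;
        return { mq => $self, id => $id };
    }
}

package main;

# Prefer a released channel, then reopen a closed one, then open a new ID.
sub assign_channel {
    my ($pool, $mq) = @_;
    if (@{ $pool->{unassigned} }) {
        return shift @{ $pool->{unassigned} };
    } elsif (@{ $pool->{closed} }) {
        my ($conn, $id) = @{ shift @{ $pool->{closed} } };
        return $conn->open_channel($id);
    } elsif (defined(my $next_id = $mq->next_id)) {
        return $mq->open_channel($next_id);
    }
    return undef;    # nothing available on this connection
}

my $mq   = Sketch::Connection->new(max => 2);
my $pool = { unassigned => [], closed => [] };

my $first  = assign_channel($pool, $mq);             # opens ID 1
my $second = assign_channel($pool, $mq);             # opens ID 2
push @{ $pool->{closed} }, [ $mq, $first->{id} ];    # channel 1 was closed
my $third  = assign_channel($pool, $mq);             # reopens ID 1
my $fourth = assign_channel($pool, $mq);             # undef: connection full
push @{ $pool->{unassigned} }, $second;              # channel 2 released
my $fifth  = assign_channel($pool, $mq);             # reuses channel 2
```

In this model a released channel is always reused first, since it needs no further protocol traffic, and a new ID is only consumed once both lists are empty.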
Calling methods on the channel proxy establishes a reference cycle for the duration of the pending request. The cycle is not broken until all the callbacks for that request have completed.
The channel object does not expose any methods that allow altering QoS or other channel state settings. These must be requested on channel assignment. This does not necessarily mean that any QoS change will require allocation of a new channel.
Bypassing the proxy object to change QoS flags is not recommended.
Connection pool
Connections are established on demand.
METHODS
request_channel
Attempts to assign a channel with the given QoS settings.
Available QoS settings are:
prefetch_count - number of messages that can be delivered at a time
prefetch_size - total size of messages allowed before acknowledging
confirm_mode - explicit publish ack
Confirm mode isn't really QoS but it fits in with the others since it modifies the channel state (and once enabled, cannot be disabled without closing and reopening the channel).
Will resolve to a Net::Async::AMQP::ConnectionManager::Channel instance on success.
apply_qos
Set QoS on the given channel.
Expects the Net::Async::AMQP::Channel object as the first parameter, followed by the key/value pairs corresponding to the desired QoS settings:
prefetch_count - number of messages that can be delivered before ACK is required
Returns a Future which will resolve to the original Net::Async::AMQP::Channel instance.
request_connection
Attempts to connect to one of the known AMQP servers.
next_host
Returns the next AMQP host.
connect
Attempts a connection to an AMQP host.
mark_connection_full
Indicate that this connection has already allocated all available channels.
key_for_args
Returns a key that represents the given arguments.
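One plausible way to build such a key is to serialise the arguments in sorted order, so that the same settings always map to the same string regardless of the order in which they were passed. The helper below is a hypothetical sketch under that assumption (sketch_key_for_args is not part of this module, and the actual key format may differ):

```perl
use strict;
use warnings;

# Hypothetical helper, not the module's actual code: build a stable
# string key from QoS-style key/value arguments.
sub sketch_key_for_args {
    my (%args) = @_;
    # Sorting the keys makes the result independent of argument order.
    return join ',', map { "$_=$args{$_}" } sort keys %args;
}

my $key1 = sketch_key_for_args(prefetch_count => 10, confirm_mode => 1);
my $key2 = sketch_key_for_args(confirm_mode => 1, prefetch_count => 10);
# Both calls yield "confirm_mode=1,prefetch_count=10"
```

A key of this kind could be used to group channels that share the same settings, so that a request with matching QoS can reuse one of them.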
on_channel_close
Called when one of our channels has been closed.
release_channel
Releases the given channel back to our channel pool.
add
Adds connection details for an AMQP server to the pool.
exch
release_connection
Releases a connection. This is currently a no-op.
INHERITED METHODS
IO::Async::Notifier
add_child, adopt_future, can_event, children, configure, configure_unknown, debug_printf, get_loop, invoke_error, invoke_event, loop, make_event_cb, maybe_invoke_event, maybe_make_event_cb, new, notifier_name, parent, remove_child, remove_from_parent
AUTHOR
Tom Molesworth <cpan@perlsite.co.uk>
LICENSE
Licensed under the same terms as Perl itself, with additional licensing terms for the MQ spec to be found in share/amqp0-9-1.extended.xml
('a worldwide, perpetual, royalty-free, nontransferable, nonexclusive license to (i) copy, display, distribute and implement the Advanced Messaging Queue Protocol ("AMQP") Specification').