Net::Async::AMQP - provides client interface to AMQP using IO::Async


version 0.031


use IO::Async::Loop;
use Net::Async::AMQP;
my $loop = IO::Async::Loop->new;
$loop->add(my $amqp = Net::Async::AMQP->new);
# Connection details are passed to connect()
$amqp->connect(
  host => 'localhost',
  user => 'guest',
  pass => 'guest',
);


Provides an asynchronous client for the AMQP 0.9.1 protocol, built on IO::Async. Note that the API may change before the stable 1.000 release; "SEE ALSO" lists some alternative modules if you're looking for something that has been around for longer.

If you want a higher-level API which manages channels and connections, try Net::Async::AMQP::ConnectionManager.

AMQP support

The following AMQP features are supported:

  • Queue declare, bind, delete

  • Exchange declare, delete

  • Consumer setup and cancellation

  • Message publishing

  • Explicit ACK

  • QoS

  • SSL

RabbitMQ-specific features

RabbitMQ provides some additional features:

  • Exchange-to-exchange binding

  • Server flow control notification

  • Consumer cancellation notification

Missing features

The following features are not currently implemented. Raise a request via RT or by email ("AUTHOR") if you want any of them:

  • Transactions

  • Flow control

  • SASL auth



Defines the mechanism used for authentication. Currently only AMQPLAIN is supported.


Length of header used in payload messages. Defined by the AMQP standard as 8 bytes.


Largest amount of data we'll attempt to send in a single frame. Actual frame limit will be negotiated with the remote server. Defaults to 262144.


Maximum number of channels to request. Defaults to the AMQP limit (65535). The value is an unsigned 16-bit integer, so it cannot be set any higher.


Interval in seconds between heartbeat frames, or zero to disable. Can be overridden by setting PERL_AMQP_HEARTBEAT_INTERVAL in the environment; the default is 0 (disabled).
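As a rough sketch of that override, assuming the default simply falls back to 0 when the environment variable is unset or empty (the helper name heartbeat_interval_default is hypothetical and not part of this module):

```perl
use strict;
use warnings;

# Hypothetical helper: use the environment value if present, otherwise 0 (disabled).
sub heartbeat_interval_default {
    my $value = $ENV{PERL_AMQP_HEARTBEAT_INTERVAL};
    return defined($value) && length($value) ? $value + 0 : 0;
}

{
    # Simulate a user setting the variable for this process only
    local $ENV{PERL_AMQP_HEARTBEAT_INTERVAL} = 30;
    print heartbeat_interval_default(), "\n";   # 30
}
```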



$Net::Async::AMQP::XML_SPEC defines the path to the AMQP XML spec, which Net::AMQP uses to create methods and handlers for the appropriate version of the MQ protocol.

Defaults to an extended version of the 0.9.1 protocol as used by RabbitMQ. This is the amqp0-9-1.extended.xml file in the distribution sharedir (see File::ShareDir).

Normally, you should be able to ignore this. If you want to load an alternative spec, note that (a) this is global rather than per-instance, and (b) it needs to be set before you use this module.

BEGIN { $Net::Async::AMQP::XML_SPEC = '/tmp/amqp.xml' }
use Net::Async::AMQP;

Once loaded, this module will not attempt to apply the spec again.


The default parameters to use for "connect". Changing these values is permitted, but do not delete entries from the hash or add new ones.

Passing parameters directly to "connect" is much safer; please do that instead.



Set up variables. Takes the following optional named parameters:

  • heartbeat_interval - interval in seconds between heartbeat messages, default is set by the "HEARTBEAT_INTERVAL" constant

  • max_channels - how many channels to allow on this connection, default is defined by the "MAX_CHANNELS" constant

Returns the new instance.


Event bus. Used for sharing global events such as connection closure.


Takes the following parameters:

  • port - the AMQP port, defaults to 5672, can be a service name if preferred

  • host - host to connect to, defaults to localhost

  • local_host - our local IP to connect from

  • user - which user to connect as, defaults to guest

  • pass - the password for this user, defaults to guest

  • ssl - true if you want to connect over SSL

  • SSL_* - SSL-specific parameters, see IO::Async::SSL and IO::Socket::SSL for details

Returns $self.


Called once the underlying TCP connection has been established.

Returns nothing of importance.


Called whenever there's data available to be read.


Called when the TCP connection is closed.


Sends initial startup header and applies listener for the Connection::Start message.

Returns $self.


Applies listener for the Connection::Tune message, used for determining max frame size and heartbeat settings.

Returns $self.


Establish a new connection to a vhost - this is called after tuning is complete, and must happen before any channel connections are attempted.

Returns $self.


Applies listener for the Connection::OpenOk message, which triggers the connected event.

Returns $self.


Returns the next available channel ready for "open_channel". Note that whatever it reports will be completely wrong if you've manually specified a channel anywhere, so don't do that.

If channels have been closed on this connection, those IDs will be reused in preference to handing out a new ID.
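The reuse behaviour described above can be sketched in plain Perl. This is an illustration only, not the module's implementation: the names allocate_channel_id and release_channel_id are hypothetical, and the sketch assumes IDs start at 1, since channel 0 is reserved for the connection itself in AMQP.

```perl
use strict;
use warnings;

my @released;               # IDs freed by closed channels
my $highest      = 0;       # highest ID handed out so far
my $max_channels = 65535;   # AMQP limit (unsigned 16-bit)

# Hypothetical allocator: prefer a released ID, otherwise hand out a new one.
sub allocate_channel_id {
    return shift @released if @released;
    die "No channels available\n" if $highest >= $max_channels;
    return ++$highest;
}

# Hypothetical counterpart, called when a channel is closed.
sub release_channel_id {
    my ($id) = @_;
    push @released, $id;
}

my @ids = map { allocate_channel_id() } 1 .. 3;   # 1, 2, 3
release_channel_id(2);
print allocate_channel_id(), "\n";                # 2 (reused)
print allocate_channel_id(), "\n";                # 4 (new)
```

An allocator like this only knows about IDs it handed out itself, which is why manually chosen channel IDs would make its answers unreliable.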


Returns a new ::Channel instance, populating the map of assigned channels in the process. Takes a single parameter:


Opens a new channel.

Returns the new Net::Async::AMQP::Channel instance.


Close the connection.

Returns a Future which will resolve with $self when the connection is closed.



Retrieves the next pending handler for the given incoming frame type (see "amqp_frame_type" in Net::Async::AMQP::Utils), and calls it.

Takes the following parameters:

  • $type - the frame type, such as 'Connection::OpenOk'

  • $frame - the frame itself

Returns $self.
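To illustrate the idea of per-type pending handlers, here is a minimal standalone sketch. It assumes a simple first-in, first-out queue of coderefs for each frame type; the names push_handler and dispatch_next are hypothetical and do not reflect the module's internals.

```perl
use strict;
use warnings;

my %pending;   # frame type => queue of coderefs

# Hypothetical: register a handler to run for the next frame of this type.
sub push_handler {
    my ($type, $code) = @_;
    push @{ $pending{$type} }, $code;
}

# Hypothetical: take the oldest handler for this type and call it with the frame.
# Returns 1 if a handler ran, 0 if none was waiting.
sub dispatch_next {
    my ($type, $frame) = @_;
    my $code = shift @{ $pending{$type} || [] }
        or return 0;
    $code->($frame);
    return 1;
}

my @seen;
push_handler('Connection::OpenOk' => sub { push @seen, "opened: $_[0]" });
dispatch_next('Connection::OpenOk', 'frame-1');   # handler runs
dispatch_next('Connection::OpenOk', 'frame-2');   # no handler left, nothing happens
print "$_\n" for @seen;                           # opened: frame-1
```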

METHODS - Accessors


The current host.


Virtual host.


Port number. Usually 5672.


MQ user.


Maximum number of bytes allowed in any given frame. This is the value negotiated with the remote server.


Maximum number of channels. This is whatever we ended up with after initial negotiation.
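For both limits, one plausible negotiation rule is to take the lower of the two proposals, treating 0 as "no limit" as AMQP 0.9.1 does in Connection.Tune. The sketch below shows that rule only; it is an assumption about how such values could be agreed, not a description of this module's code, and negotiate_limit is a hypothetical name.

```perl
use strict;
use warnings;
use List::Util qw(min);

# Assumed rule: ignore 0 ("no limit"), otherwise take the lower proposal.
sub negotiate_limit {
    my ($ours, $theirs) = @_;
    my @limits = grep { $_ } $ours, $theirs;
    return @limits ? min(@limits) : 0;
}

print negotiate_limit(262144, 131072), "\n";   # 131072
print negotiate_limit(65535, 0), "\n";         # 65535
```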


Timestamp of the last frame we received from the remote. Used for handling heartbeats.


Returns the current IO::Async::Stream for the AMQP connection.


Future for the current incoming message (received in two or more parts: the header then all body chunks).

METHODS - Internal

The following methods are intended for internal use. They are documented for completeness but should not normally be needed outside this library.


Current maximum interval between frames.


How many times we allow the remote to miss the frame-sending deadline in a row before we give up and close the connection. Defined by the protocol, should be 3x heartbeats.


Enable both heartbeat timers.


Resets our side of the heartbeat timer.

This is used to ensure we send data at least once every "heartbeat_interval" seconds.


Timer for tracking frames we've received.


Timer for tracking when we're due to send out something.


Called when heartbeats are enabled and we've had no response from the server for 3 heartbeat intervals (see "missed_heartbeats_allowed"). We'd expect some frame from the remote, even if just a heartbeat frame, at least once every heartbeat interval, so if this triggers then we're likely dealing with a dead or heavily loaded server.

This will invoke the "heartbeat_failure event" then close the connection.
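The timing rule can be written out as a small check. This is a sketch under the assumption that failure means strictly more than interval × allowed misses seconds of silence; heartbeat_expired and its named parameters are hypothetical.

```perl
use strict;
use warnings;

# Hypothetical check: has the remote been silent for longer than allowed?
sub heartbeat_expired {
    my (%args) = @_;
    return 0 unless $args{interval};   # heartbeats disabled
    my $deadline = $args{last_frame_time}
                 + $args{interval} * $args{missed_allowed};
    return $args{now} > $deadline ? 1 : 0;
}

my %common = (last_frame_time => 60, interval => 10, missed_allowed => 3);
print heartbeat_expired(%common, now => 100), "\n";   # 1 (silent for 40s, limit 30s)
print heartbeat_expired(%common, now => 85), "\n";    # 0 (silent for 25s)
```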


Sends the heartbeat frame.


Adds the given handler(s) to the pending handler list for the given type(s).

Takes one or more of the following parameter pairs:

Returns $self.


Removes a coderef from the pending event handler.

Returns $self.


Writes data to the server.

Returns a Future which will resolve to an empty list when done.


Process a single incoming frame.

Takes the following parameters:

Returns $self.


Splits a message into separate frames.

Takes the $payload as a scalar containing byte data, and the following parameters:

  • exchange - where we're sending the message

  • routing_key - other part of message destination

Additionally, the following headers can be passed:

  • content_type

  • content_encoding

  • headers

  • delivery_mode

  • priority

  • correlation_id

  • reply_to

  • expiration

  • message_id

  • timestamp

  • type

  • user_id

  • app_id

  • cluster_id

Returns a list of frames suitable for passing to "send_frame".
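The body-splitting step can be pictured as cutting the payload into chunks that fit within the negotiated frame size. The sketch below covers only that chunking and does not build real AMQP frames. It assumes each chunk may use the frame size minus an 8-byte overhead; body_chunks is a hypothetical helper.

```perl
use strict;
use warnings;

# Hypothetical helper: split byte data into chunks of at most
# ($frame_max - $overhead) bytes each.
sub body_chunks {
    my ($payload, $frame_max, $overhead) = @_;
    my $chunk_size = $frame_max - $overhead;
    die "Frame size too small\n" if $chunk_size < 1;
    return unpack "(a$chunk_size)*", $payload;
}

my @chunks = body_chunks('x' x 1000, 408, 8);
print scalar(@chunks), " chunks\n";                     # 3 chunks
print join(',', map { length $_ } @chunks), "\n";       # 400,400,200
```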


Send a single frame.

Takes the $frame instance followed by these optional named parameters:

  • channel - which channel we should send on

Returns a Future which will resolve to an empty list when the frame has been written (this does not guarantee that the server has received it).


Byte string representing the header bytes we should send on initial TCP connect. Net::AMQP uses AMQP\x01\x01\x09\x01, which does not appear to comply with AMQP 0.9.1 section 4.2.2.
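For comparison, the protocol header laid out in AMQP 0.9.1 section 4.2.2 is the literal "AMQP" followed by the bytes 0, 0, 9, 1. A minimal sketch of building and inspecting that byte string follows; whether this module returns exactly this value is an assumption.

```perl
use strict;
use warnings;

# "AMQP", then a zero byte, then major 0, minor 9, revision 1
my $header = 'AMQP' . pack('C4', 0, 0, 9, 1);

printf "%d bytes: %s\n",
    length($header),
    join(' ', map { sprintf '%02x', $_ } unpack('C*', $header));
# 8 bytes: 41 4d 51 50 00 00 09 01
```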


Returns a new IO::Async::Future instance.

Supports optional named parameters for setting label etc.


The following events may be raised by this class - use "subscribe_to_event" in Mixin::Event::Dispatch to watch for them:

 $amqp->bus->subscribe_to_event(
   heartbeat_failure => sub {
     my ($ev, $last) = @_;
     print "Heartbeat failure detected\n";
   }
 );

connected event

Called after the connection has been opened.

close event

Called after the remote has closed the connection.

heartbeat_failure event

Raised if heartbeats are enabled and we receive no data from the remote for more than 3 heartbeat intervals.

unexpected_frame event

If we receive an unsolicited frame from the server this event will be raised:

 $amqp->bus->subscribe_to_event(
   unexpected_frame => sub {
     my ($ev, $type, $frame) = @_;
     warn "Frame type $type received: $frame\n";
   }
 );




add_child, adopt_future, can_event, children, configure_unknown, debug_printf, get_loop, invoke_error, invoke_event, loop, make_event_cb, maybe_invoke_event, maybe_make_event_cb, new, notifier_name, parent, remove_child, remove_from_parent


Tom Molesworth


Licensed under the same terms as Perl itself, with additional licensing terms for the MQ spec to be found in share/amqp0-9-1.extended.xml ('a worldwide, perpetual, royalty-free, nontransferable, nonexclusive license to (i) copy, display, distribute and implement the Advanced Messaging Queue Protocol ("AMQP") Specification').