NAME
Net::Async::AMQP - provides client interface to AMQP using IO::Async
VERSION
version 0.009
SYNOPSIS
use IO::Async::Loop;
use Net::Async::AMQP;
my $loop = IO::Async::Loop->new;
$loop->add(my $amqp = Net::Async::AMQP->new);
$amqp->connect(
host => 'localhost',
user => 'guest',
pass => 'guest',
on_connected => sub { ... }
);
$loop->run;
DESCRIPTION
Does AMQP things. Note that the API may change before the stable 1.000 release - "SEE ALSO" has some alternative modules if you're looking for something that has been around for longer.
If you want a higher-level API which manages channels and connections, try Net::Async::AMQP::ConnectionManager.
CONSTANTS
AUTH_MECH
Defines the mechanism used for authentication. Currently only AMQPLAIN is supported.
PAYLOAD_HEADER_LENGTH
Length of header used in payload messages. Defined by the AMQP standard.
MAX_FRAME_SIZE
Largest amount of data we'll attempt to send in a single frame. Actual frame limit will be negotiated with the remote server. Defaults to 262144.
MAX_CHANNELS
Maximum number of channels to request. Defaults to the AMQP limit (65535). Attempting to set this any higher will not end well, it's an unsigned 16-bit value.
DEBUG
Debugging flag - set PERL_AMQP_DEBUG
to 1 in the environment to enable informational messages to STDERR.
HEARTBEAT_INTERVAL
Interval in seconds between heartbeat frames, zero to disable. Can be overridden by PERL_AMQP_HEARTBEAT_INTERVAL
in the environment, default is 0 (disabled).
PACKAGE VARIABLES
$XML_SPEC
This defines the path to the AMQP XML spec, which Net::AMQP uses to create methods and handlers for the appropriate version of the MQ protocol.
Defaults to an extended version of the 0.9.1 protocol as used by RabbitMQ, this is found in the amqp0-9-1.extended.xml
distribution sharedir (see File::ShareDir).
Normally, you should be able to ignore this. If you want to load an alternative spec, note that (a) this is global, rather than per-instance, (b) it needs to be set before you use
this module.
BEGIN { $Net::Async::AMQP::XML_SPEC = '/tmp/amqp.xml' }
use Net::Async::AMQP;
%CONNECTION_DEFAULTS
The default parameters to use for "connect". Changing these values is permitted, but do not attempt to delete or add any entries from the hash.
Passing parameters directly to "connect" is much safer, please do that instead.
METHODS
configure
Set up variables. Takes the following optional named parameters:
heartbeat_interval - (optional) interval between heartbeat messages, default is set by the "HEARTBEAT_INTERVAL" constant
Returns the new instance.
bus
Event bus. Used for sharing global events such as connection closure.
connect
Takes the following parameters:
port - the AMQP port, defaults to 5672, can be a service name if preferred
host - host to connect to, defaults to localhost
local_host - our local IP to connect from
user - which user to connect as, defaults to guest
pass - the password for this user, defaults to guest
on_connected - callback for when we establish a connection
on_error - callback for any errors encountered during connection
Returns $self.
handle_heartbeat_failure
Called when heartbeats are enabled and we've had no response from the server for 3 heartbeat intervals. We'd expect some frame from the remote - even if just a heartbeat frame - at least once every heartbeat interval so if this triggers then we're likely dealing with a dead or heavily loaded server.
This will invoke the "heartbeat_failure event" then close the connection.
send_heartbeat
Sends the heartbeat frame.
post_connect
Sends initial startup header and applies listener for the Connection::Start message.
Returns $self.
setup_tuning
Applies listener for the Connection::Tune message, used for determining max frame size and heartbeat settings.
Returns $self.
open_connection
Establish a new connection to a vhost - this is called after tuning is complete, and must happen before any channel connections are attempted.
Returns $self.
setup_connection
Applies listener for the Connection::OpenOk message, which triggers the connected
event.
Returns $self.
next_channel
Returns the next available channel ready for "open_channel". Note that whatever it reports will be completely wrong if you've manually specified a channel anywhere, so don't do that.
open_channel
Opens a new channel.
Returns the new Net::Async::AMQP::Channel instance.
close
Close the connection.
Returns a Future which will resolve with $self
when the connection is closed.
next_pending
Retrieves the next pending handler for the given incoming frame type (see "get_frame_type"), and calls it.
Takes the following parameters:
$type - the frame type, such as 'Basic::ConnectOk'
$frame - the frame itself
Returns $self.
METHODS - Accessors
host
The current host.
vhost
Virtual host.
port
Port number. Usually 5672.
user
MQ user.
frame_max
Maximum number of bytes allowed in any given frame. This is the value negotiated with the remote server.
channel_max
Maximum number of channels. This is whatever we ended up with after initial negotiation.
last_frame_time
Timestamp of the last frame we received from the remote. Used for handling heartbeats.
stream
Returns the current IO::Async::Stream for the AMQP connection.
incoming_message
Future for the current incoming message (received in two or more parts: the header then all body chunks).
METHODS - Internal
The following methods are intended for internal use. They are documented for completeness but should not normally be needed outside this library.
push_pending
Adds the given handler(s) to the pending handler list for the given type(s).
Takes one or more of the following parameter pairs:
$type - the frame type, see "get_frame_type"
$code - the coderef to call, will be invoked once as follows when a matching frame is received:
$code->($self, $frame, @_)
Returns $self
.
remove_pending
Removes a coderef from the pending event handler.
Returns $self
.
write
Writes data to the server.
get_frame_type
Takes the following parameters:
$frame - the Net::AMQP::Frame instance
Returns string representing type, typically the base class with Net::AMQP::Protocol prefix removed.
process_frame
Process a single incoming frame.
Takes the following parameters:
$frame - the Net::AMQP::Frame instance
Returns $self.
split_payload
Splits a message into separate frames.
Takes the $payload as a scalar containing byte data, and the following parameters:
exchange - where we're sending the message
routing_key - other part of message destination
Returns list of frames suitable for passing to "send_frame".
send_frame
Send a single frame.
Takes the $frame instance followed by these optional named parameters:
channel - which channel we should send on
Returns $self.
reset_heartbeat
Resets our side of the heartbeat timer.
This is used to ensure we send data at least once every "heartbeat_interval" seconds.
future
Returns a new IO::Async::Future instance.
Supports optional named parameters for setting label etc.
EVENTS
The following events may be raised by this class - use "subscribe_to_event" in Mixin::Event::Dispatch to watch for them:
$mq->bus->subscribe_to_event(
heartbeat_failure => sub {
my ($ev, $last) = @_;
print "Heartbeat failure detected\n";
}
);
connected event
Called after the connection has been opened.
close event
Called after the remote has closed the connection.
heartbeat_failure event
Raised if we receive no data from the remote for more than 3 heartbeat intervals and heartbeats are enabled,
unexpected_frame event
If we receive an unsolicited frame from the server this event will be raised:
$mq->bus->subscribe_to_event(
unexpected_frame => sub {
my ($ev, $type, $frame) = @_;
warn "Frame type $type received: $frame\n";
}
)
SEE ALSO
Net::AMQP - this does all the hard work of converting the XML protocol specification into appropriate Perl methods and classes.
Net::AMQP::RabbitMQ - librabbitmq support
POE::Component::Client::AMQP - POE equivalent of this module
AUTHOR
Tom Molesworth <cpan@perlsite.co.uk>
LICENSE
Licensed under the same terms as Perl itself, with additional licensing terms for the MQ spec to be found in share/amqp0-9-1.extended.xml
('a worldwide, perpetual, royalty-free, nontransferable, nonexclusive license to (i) copy, display, distribute and implement the Advanced Messaging Queue Protocol ("AMQP") Specification').