NAME
Net::AMQP::RabbitMQ - interact with RabbitMQ over AMQP using librabbitmq
SYNOPSIS
use Net::AMQP::RabbitMQ;
my $mq = Net::AMQP::RabbitMQ->new();
$mq->connect("localhost", { user => "guest", password => "guest" });
$mq->channel_open(1);
$mq->publish(1, "queuename", "Hi there!");
my $gotten = $mq->get(1, "queuename");
print $gotten->{body} . "\n";
$mq->disconnect();
VERSION COMPATIBILITY
This module was forked from Net::RabbitMQ version 0.2.6 which uses an older version of librabbitmq, and doesn't work correctly with newer versions of RabbitMQ. The main change between this module and the original is this library uses a newer, unforked, version of librabbitmq. Version 0.7.1 to be precise.
This means this module only works with the AMQP 0.9.1 protocol, so requires RabbitMQ version 2+. Also, since the version of librabbitmq used is not a custom fork, it means this module doesn't support the basic_return callback method.
DESCRIPTION
Net::AMQP::RabbitMQ
is a fork of Net::RabbitMQ
that uses a newer version of librabbitmq and fixes some bugs. It provides a simple wrapper around the librabbitmq library that allows connecting, declaring exchanges and queues, binding and unbinding queues, publishing, consuming and receiving events.
Error handling in this module is primarily achieve by Perl_croak
(die). You should be making good use of eval around these methods to ensure that you appropriately catch the errors.
METHODS
All methods, unless specifically stated, return nothing on success and die on failure.
Failure to be connected is a fatal failure for most methods.
new()
Creates a new Net::AMQP::RabbitMQ object.
connect( $hostname, $options )
$hostname
is the host to which a connection will be attempted.
$options
is an optional hash respecting the following keys:
{
user => $user, #default 'guest'
password => $password, #default 'guest'
port => $port, #default 5672
vhost => $vhost, #default '/'
channel_max => $cmax, #default 0
frame_max => $fmax, #default 131072
heartbeat => $hearbeat, #default 0
timeout => $seconds #default undef (no timeout)
ssl => 1 | 0 #default 0
ssl_verify_host => 1 | 0 #default 1
ssl_cacert => $caert_path #needed for ssl
ssl_init => 1 | 0 #default 1, initilise the openssl library
}
You probably don't want to touch ssl_init
, unless you know what it does.
For now there is no option to disable ssl peer checking, meaning to use ssl
, ssl_cacert
is required.
SSL NOTE
if the connection is cut when using ssl, openssl will throw a SIGPIPE
, you should catch this or perl will exit with error code 141
$SIG{PIPE} = 'IGNORE';
disconnect()
Causes the connection to RabbitMQ to be torn down.
get_server_properties()
Get a hashref of server properties (these may vary, you should Data::Dumper
to inspect). They will be provided by the RabbitMQ server to which you are connected.
get_client_properties()
Get a hashref of server properties (these may vary, you should Data::Dumper
to inspect).
is_connected()
Returns true if a valid socket connection appears to exist, false otherwise.
channel_open($channel)
$channel
is a positive integer describing the channel you which to open.
channel_close($channel)
$channel
is a positive integer describing the channel you which to close.
get_channel_max()
Returns the maximum allowed channel number.
exchange_declare($channel, $exchange, $options, $arguments)
$channel
is a channel that has been opened with channel_open
.
$exchange
is the name of the exchange to be instantiated.
$options
is an optional hash respecting the following keys:
{
exchange_type => $type, #default 'direct'
passive => $boolean, #default 0
durable => $boolean, #default 0
auto_delete => $boolean, #default 1
}
$arguments
is an optional hash of additional arguments to the RabbitMQ server, such as:
{
# exchange to try if no routes apply on this exchange
alternate_exchange => 'alternate_exchange_name',
}
exchange_delete($channel, $exchange, $options)
$channel
is a channel that has been opened with channel_open
.
$exchange
is the name of the exchange to be deleted.
$options
is an optional hash respecting the following keys:
{
if_unused => $boolean, #default 1
}
exchange_bind($channel, $destination, $source, $routing_key, $arguments)
$channel
is a channel that has been opened with channel_open
.
$destination
is a previously declared exchange, $source
is yet another previously declared exchange, and $routing_key
is the routing key that will bind the specified source exchange to the specified destination exchange.
$arguments
is an optional hash which will be passed to the server. When binding to an exchange of type headers
, this can be used to only receive messages with the supplied header values.
exchange_unbind($channel, $destination, $source, $routing_key, $arguments)
$channel
is a channel that has been opened with channel_open
.
$destination
is a previously declared exchange, $source
is yet another previously declared exchange, and $routing_key
is the routing key that will unbind the specified source exchange from the specified destination exchange.
$arguments
is an optional hash which will be passed to the server. When binding to an exchange of type headers
, this can be used to only receive messages with the supplied header values.
queue_declare($channel, $queuename, $options, $arguments)
$channel
is a channel that has been opened with channel_open
.
$queuename
is the name of the queuename to be instantiated. If $queuename
is undef or an empty string, then an auto generated queuename will be used.
$options
is an optional hash respecting the following keys:
{
passive => $boolean, #default 0
durable => $boolean, #default 0
exclusive => $boolean, #default 0
auto_delete => $boolean, #default 1
}
$arguments
is an optional hash which will be passed to the server when the queue is created. This can be used for creating mirrored queues by using the x-ha-policy header.
In scalar context, this method returns the queuename declared (important for retrieving the autogenerated queuename in the event that one was requested).
In array context, this method returns three items: queuename, the number of message waiting on the queue, and the number of consumers bound to the queue.
queue_bind($channel, $queuename, $exchange, $routing_key, $arguments)
$channel
is a channel that has been opened with channel_open
.
$queuename
is a previously declared queue, $exchange
is a previously declared exchange, and $routing_key
is the routing key that will bind the specified queue to the specified exchange.
$arguments
is an optional hash which will be passed to the server. When binding to an exchange of type headers
, this can be used to only receive messages with the supplied header values.
queue_unbind($channel, $queuename, $exchange, $routing_key, $arguments)
This is like the queue_bind
with respect to arguments. This command unbinds the queue from the exchange. The $routing_key
and $arguments
must match the values supplied when the binding was created.
If this fails, you must reopen the channel
queue_delete($channel, $queuename, $options)
Deletes the queue
$options
is an optional hash respecting the following keys:
{
if_unused => $boolean, #default 1
if_empty => $boolean, #default 1
}
If this fails, you must reopen the channel
publish($channel, $routing_key, $body, $options, $props)
$channel
is a channel that has been opened with channel_open
.
$routing_key
is the name of the routing key for this message.
$body
is the payload to enqueue.
$options
is an optional hash respecting the following keys:
{
exchange => $exchange, #default 'amq.direct'
mandatory => $boolean, #default 0
immediate => $boolean, #default 0
force_utf8_in_header_strings => $boolean, #default 0
}
The force_utf8_in_header_strings
option causes all headers which look like strings to be treated as UTF-8. In an attempt to make this a non-breaking change, this option is disabled by default. However, for all headers beginning with x-
, those are treated as UTF-8 regardless of this option (per spec).
$props
is an optional hash (the AMQP 'props') respecting the following keys:
{
content_type => $string,
content_encoding => $string,
correlation_id => $string,
reply_to => $string,
expiration => $string,
message_id => $string,
type => $string,
user_id => $string,
app_id => $string,
delivery_mode => $integer,
priority => $integer,
timestamp => $integer,
headers => $headers # This should be a hashref of keys and values.
}
consume($channel, $queuename, $options)
$channel
is a channel that has been opened with channel_open
.
$queuename
is the name of the queue from which we'd like to consume.
$options
is an optional hash respecting the following keys:
{
consumer_tag => $tag, #absent by default
no_local => $boolean, #default 0
no_ack => $boolean, #default 1
exclusive => $boolean, #default 0
}
The consumer_tag is returned. This command does not return AMQP frames, it simply notifies RabbitMQ that messages for this queue should be delivered down the specified channel.
recv($timeout)
$timeout
is a positive integer, specifying the number of milliseconds to wait for a message. If you do not provide a timeout (or set it to 0), then this call will block until it receives a message. If you set it to -1 it will return immediately (waiting 0 ms).
This command receives and reconstructs AMQP frames and returns a hash containing the following information:
{
body => 'Magic Transient Payload', # the reconstructed body
routing_key => 'nr_test_q', # route the message took
exchange => 'nr_test_x', # exchange used
delivery_tag => 1, # (used for acks)
redelivered => $boolean # if message is redelivered
consumer_tag => 'c_tag', # tag from consume()
props => $props, # hashref sent in
}
$props
is the hash sent by publish() respecting the following keys:
{
content_type => $string,
content_encoding => $string,
correlation_id => $string,
reply_to => $string,
expiration => $string,
message_id => $string,
type => $string,
user_id => $string,
app_id => $string,
delivery_mode => $integer,
priority => $integer,
timestamp => $integer,
}
If you provide a timeout, then the recv()
method returns undef
if the timeout expires before a message is received from the server.
cancel($channel, $consumer_tag)
$channel
is a channel that has been opened with channel_open
.
$consumer_tag
is a tag previously passed to consume()
or one that was generated automatically as a result of calling consume()
without an explicit tag.
This method returns true or false indicating whether we got the expected "cancel-ok" response from the server.
get($channel, $queuename, $options)
This command runs an amqp_basic_get which returns undef immediately if no messages are available on the queue and returns a hash as follows if a message is available:
{
body => 'Magic Transient Payload', # the reconstructed body
routing_key => 'nr_test_q', # route the message took
exchange => 'nr_test_x', # exchange used
content_type => 'foo', # (only if specified)
delivery_tag => 1, # (used for acks)
redelivered => 0, # if message is redelivered
message_count => 0, # message count
# Not all of these will be present. Consult the RabbitMQ reference for more details.
props => {
content_type => 'text/plain',
content_encoding => 'none',
correlation_id => '123',
reply_to => 'somequeue',
expiration => 1000,
message_id => 'ABC',
type => 'notmytype',
user_id => 'guest',
app_id => 'idd',
delivery_mode => 1,
priority => 2,
timestamp => 1271857990,
headers => {
unsigned_integer => 12345,
signed_integer => -12345,
double => 3.141,
string => "string here",
# The x-death header is a special header for dead-lettered messages (rejected or timed out).
'x-death' => [
{
time => 1271857954,
exchange => $exchange,
queue => $exchange,
reason => 'expired',
'routing-keys' => [q{}],
},
],
},
},
}
$channel
is a channel that has been opened with channel_open
.
$queuename
is the name of the queue from which we'd like to consume.
$options
is an optional hash respecting the following keys:
{
no_ack => $boolean, #default 1
}
ack($channel, $delivery_tag, $multiple = 0)
$channel
is a channel that has been opened with channel_open
.
$delivery_tag
the delivery tag seen from a returned frame from the recv
method.
$multiple
specifies if multiple are to be acknowledged at once.
purge($channel, $queuename)
$channel
is a channel that has been opened with channel_open
.
$queuename
is the queue to be purged.
reject($channel, $delivery_tag, $requeue = 0)
$channel
is a channel that has been opened with channel_open
.
$delivery_tag
the delivery tag seen from a returned frame from the recv
method.
$requeue
specifies if the message should be requeued.
tx_select($channel)
$channel
is a channel that has been opened with channel_open
.
Start a server-side (tx) transaction over $channel.
tx_commit($channel)
$channel
is a channel that has been opened with channel_open
.
Commit a server-side (tx) transaction over $channel.
tx_rollback($channel)
$channel
is a channel that has been opened with channel_open
.
Rollback a server-side (tx) transaction over $channel.
basic_qos($channel, $options)
$channel
is a channel that has been opened with channel_open
.
$options
is an optional hash respecting the following keys:
{
prefetch_count => $cnt, #default 0
prefetch_size => $size, #default 0
global => $bool, #default 0
}
Set quality of service flags on the current $channel.
heartbeat()
Send a heartbeat frame. If you've connected with a heartbeat parameter, you must send a heartbeat periodically matching connection parameter or the server may snip the connection.
WARNING AND ERROR MESSAGES
Fatal Errors
It should be noted that almost all errors in this library are considered fatal, insomuch as they trigger a croak()
. In these errors, if it appears that somehow the connection has been closed by the remote host, or otherwise invalidated, the socket will also be closed and should be re-opened before any additional calls are made.
AUTHORS
Theo Schlossnagle <jesus@omniti.com>
Mark Ellis <markellis@cpan.org>
Michael Stemle, Jr. <themanchicken@gmail.com>
Dave Rolsky <autarch@urth.org>
Slaven Rezić
Armand Leclercq
Daniel W Burke
Dávid Kovács
Alexey Sheynuk
Karen Etheridge <ether@cpan.org>
Eric Brine <ikegami@cpan.org>
LICENSE
This software is licensed under the Mozilla Public License. See the LICENSE file in the top distribution directory for the full license text.
librabbitmq is licensed under the MIT License. See the LICENSE-MIT file in the top distribution directory for the full license text.