NAME
Net::AMQP::RabbitMQ - interact with RabbitMQ over AMQP using librabbitmq
SYNOPSIS
use Net::AMQP::RabbitMQ;
my $mq = Net::AMQP::RabbitMQ->new();
$mq->connect("localhost", { user => "guest", password => "guest" });
$mq->channel_open(1);
$mq->queue_declare(1, "queuename");
$mq->publish(1, "queuename", "Hi there!");
my $gotten = $mq->get(1, "queuename");
print $gotten->{body} . "\n";
$mq->disconnect();
DESCRIPTION
Net::AMQP::RabbitMQ
provides a simple wrapper around the librabbitmq library that allows connecting, declaring exchanges and queues, binding and unbinding queues, publishing, consuming and receiving events.
Error handling in this module is primarily achieve by Perl_croak
(die). You should be making good use of eval
around these methods to ensure that you appropriately catch the errors.
INSTALLATION
cpanm Net::AMQP::RabbitMQ
or cpan Net::AMQP::RabbitMQ
Note that the Net::AMQP::RabbitMQ
module includes the associated librabbitmq C library. Thus there is no need to install this separately beforehand.
METHODS
All methods, unless specifically stated, return nothing on success and die on failure.
Failure to be connected is a fatal failure for most methods.
new()
Creates a new Net::AMQP::RabbitMQ object.
connect( $hostname, $options )
Connect to RabbitMQ server.
$hostname
is the host to which a connection will be attempted.
$options
is an optional hash respecting the following keys:
{
user => $user, #default 'guest'
password => $password, #default 'guest'
port => $port, #default 5672
vhost => $vhost, #default '/'
channel_max => $cmax, #default 0
frame_max => $fmax, #default 131072
heartbeat => $heartbeat, #default 0
timeout => $seconds, #default undef (no timeout)
ssl => 1 | 0, #default 0
ssl_verify_host => 1 | 0, #default 1
ssl_cacert => $caert_path, #needed for ssl
ssl_cert => $cert_path, #client cert.pem and key.pem when using ssl certificate chains
ssl_key => $key_path #(with RabbitMQ's fail_if_no_peer_cert = true)
}
For now there is no option to disable ssl peer checking, meaning to use ssl
, ssl_cacert
is required.
SSL NOTE
if the connection is cut when using ssl, openssl will throw a SIGPIPE
, you should catch this or perl will exit with error code 141
$SIG{PIPE} = 'IGNORE';
disconnect()
Disconnect from the RabbitMQ server.
get_server_properties()
Get a reference to hash (hashref) of server properties. These may vary, you should use Data::Dumper
to inspect. Properties will be provided for the RabbitMQ server to which you are connected.
get_client_properties()
Get a reference to hash (hashref) of client properties. These may vary, you should use Data::Dumper
to inspect.
is_connected()
Returns true if a valid socket connection appears to exist, false otherwise.
channel_open($channel)
Open an AMQP channel on the connection.
$channel
is a positive integer describing the channel you which to open.
channel_close($channel)
Close the specified channel.
$channel
is a positive integer describing the channel you which to close.
get_channel_max()
Returns the maximum allowed channel number.
exchange_declare($channel, $exchange, $options, $arguments)
Declare an AMQP exchange on the RabbitMQ server unless it already exists. Bad things will happen if the exchange name already exists and different parameters are provided.
$channel
is a channel that has been opened with channel_open
.
$exchange
is the name of the exchange to be instantiated.
$options
is an optional hash respecting the following keys:
{
exchange_type => $type, #default 'direct'
passive => $boolean, #default 0
durable => $boolean, #default 0
auto_delete => $boolean, #default 0
}
Note that the default for the auto_delete
option is different for exchange_declare
and for queue_declare
.
$arguments
is an optional hash of additional arguments to the RabbitMQ server, such as:
{
# exchange to try if no routes apply on this exchange
alternate_exchange => 'alternate_exchange_name',
}
exchange_delete($channel, $exchange, $options)
Delete a AMQP exchange on the RabbitMQ server.
$channel
is a channel that has been opened with channel_open
.
$exchange
is the name of the exchange to be deleted.
$options
is an optional hash respecting the following keys:
{
if_unused => $boolean, #default 1
}
exchange_bind($channel, $destination, $source, $routing_key, $arguments)
Bind a source exchange to a destination exchange with a given routing key and/or parameters.
$channel
is a channel that has been opened with channel_open
.
$destination
is a previously declared exchange, $source
is yet another previously declared exchange, and $routing_key
is the routing key that will bind the specified source exchange to the specified destination exchange.
$arguments
is an optional hash which will be passed to the server. When binding to an exchange of type headers
, this can be used to only receive messages with the supplied header values.
exchange_unbind($channel, $destination, $source, $routing_key, $arguments)
Remove a binding between source and destination exchanges.
$channel
is a channel that has been opened with channel_open
.
$destination
is a previously declared exchange, $source
is yet another previously declared exchange, and $routing_key
is the routing key that will unbind the specified source exchange from the specified destination exchange.
$arguments
is an optional hash which will be passed to the server. When binding to an exchange of type headers
, this can be used to only receive messages with the supplied header values.
queue_declare($channel, $queuename, $options, $arguments)
Declare an AMQP queue on the RabbitMQ server.
In scalar context, this method returns the queuename declared (important for retrieving the auto-generated queuename in the event that one was requested).
In array context, this method returns three items: queuename, the number of message waiting on the queue, and the number of consumers bound to the queue.
$channel
is a channel that has been opened with channel_open
.
$queuename
is the name of the queuename to be instantiated. If $queuename
is undef or an empty string, then an auto generated queuename will be used.
$options
is an optional hash respecting the following keys:
{
passive => $boolean, #default 0
durable => $boolean, #default 0
exclusive => $boolean, #default 0
auto_delete => $boolean, #default 1
}
Note that the default for the auto_delete
option is different for exchange_declare
and for queue_declare
.
$arguments
is an optional hash which will be passed to the server when the queue is created. This can be used for creating mirrored queues by using the x-ha-policy header.
queue_bind($channel, $queuename, $exchange, $routing_key, $arguments)
Bind the specified queue to the specified exchange with a routing key.
$channel
is a channel that has been opened with channel_open
.
$queuename
is a previously declared queue, $exchange
is a previously declared exchange, and $routing_key
is the routing key that will bind the specified queue to the specified exchange.
$arguments
is an optional hash which will be passed to the server. When binding to an exchange of type headers
, this can be used to only receive messages with the supplied header values.
queue_unbind($channel, $queuename, $exchange, $routing_key, $arguments)
Remove a binding between a queue and an exchange. If this fails, you must reopen the channel.
This is like the queue_bind
with respect to arguments. This command unbinds the queue from the exchange. The $routing_key
and $arguments
must match the values supplied when the binding was created.
queue_delete($channel, $queuename, $options)
Delete a specified queue. If this fails, you must reopen the channel.
$options
is an optional hash respecting the following keys:
{
if_unused => $boolean, #default 1
if_empty => $boolean, #default 1
}
publish($channel, $routing_key, $body, $options, $props)
Publish a message to an exchange.
$channel
is a channel that has been opened with channel_open
.
$routing_key
is the name of the routing key for this message.
$body
is the payload to enqueue.
$options
is an optional hash respecting the following keys:
{
exchange => $exchange, #default 'amq.direct'
mandatory => $boolean, #default 0
immediate => $boolean, #default 0
force_utf8_in_header_strings => $boolean, #default 0
}
The force_utf8_in_header_strings
option causes all headers which look like strings to be treated as UTF-8. In an attempt to make this a non-breaking change, this option is disabled by default. However, for all headers beginning with x-
, those are treated as UTF-8 regardless of this option (per spec).
$props
is an optional hash (the AMQP 'props') respecting the following keys:
{
content_type => $string,
content_encoding => $string,
correlation_id => $string,
reply_to => $string,
expiration => $string,
message_id => $string,
type => $string,
user_id => $string,
app_id => $string,
delivery_mode => $integer,
priority => $integer,
timestamp => $integer,
headers => $headers # This should be a hashref of keys and values.
}
consume($channel, $queuename, $options)
Put the channel into consume mode.
The consumer_tag
is returned. This command does not return AMQP messages, for that the recv
method should be used.
$channel
is a channel that has been opened with channel_open
.
$queuename
is the name of the queue from which we'd like to consume.
$options
is an optional hash respecting the following keys:
{
consumer_tag => $tag, #absent by default
no_local => $boolean, #default 0
no_ack => $boolean, #default 1
exclusive => $boolean, #default 0
}
recv($timeout)
Receive AMQP messages.
This method returns a reference to a hash (hashref) containing the following information:
{
body => 'Magic Transient Payload', # the reconstructed body
routing_key => 'nr_test_q', # route the message took
exchange => 'nr_test_x', # exchange used
delivery_tag => 1, # (used for acks)
redelivered => $boolean # if message is redelivered
consumer_tag => 'c_tag', # tag from consume()
props => $props, # hashref sent in
}
$props
is the hash sent by publish() respecting the following keys:
{
content_type => $string,
content_encoding => $string,
correlation_id => $string,
reply_to => $string,
expiration => $string,
message_id => $string,
type => $string,
user_id => $string,
app_id => $string,
delivery_mode => $integer,
priority => $integer,
timestamp => $integer,
}
$timeout
is a positive integer, specifying the number of milliseconds to wait for a message. If you do not provide a timeout (or set it to 0), then this call will block until it receives a message. If you set it to -1 it will return immediately (waiting 0 ms).
If you provide a timeout, then the recv
method returns undef
if the timeout expires before a message is received from the server.
cancel($channel, $consumer_tag)
Take the channel out of consume mode previously enabled with consume
.
This method returns true or false indicating whether we got the expected "cancel-ok" response from the server.
$channel
is a channel that has been opened with channel_open
.
$consumer_tag
is a tag previously passed to consume()
or one that was generated automatically as a result of calling consume()
without an explicit tag.
get($channel, $queuename, $options)
Get a message from the specified queue (via amqp_basic_get()
).
The method returns undef
immediately if no messages are available on the queue. If a message is available a reference to a hash (hashref) is returned with the following contents:
{
body => 'Magic Transient Payload', # the reconstructed body
routing_key => 'nr_test_q', # route the message took
exchange => 'nr_test_x', # exchange used
content_type => 'foo', # (only if specified)
delivery_tag => 1, # (used for acks)
redelivered => 0, # if message is redelivered
message_count => 0, # message count
# Not all of these will be present. Consult the RabbitMQ reference for more details.
props => {
content_type => 'text/plain',
content_encoding => 'none',
correlation_id => '123',
reply_to => 'somequeue',
expiration => 1000,
message_id => 'ABC',
type => 'notmytype',
user_id => 'guest',
app_id => 'idd',
delivery_mode => 1,
priority => 2,
timestamp => 1271857990,
headers => {
unsigned_integer => 12345,
signed_integer => -12345,
double => 3.141,
string => "string here",
# The x-death header is a special header for dead-lettered messages (rejected or timed out).
'x-death' => [
{
time => 1271857954,
exchange => $exchange,
queue => $exchange,
reason => 'expired',
'routing-keys' => [q{}],
},
],
},
},
}
$channel
is a channel that has been opened with channel_open
.
$queuename
is the name of the queue from which we'd like to consume.
$options
is an optional hash respecting the following keys:
{
no_ack => $boolean, #default 1
}
ack($channel, $delivery_tag, $multiple = 0)
Acknowledge a message.
$channel
is a channel that has been opened with channel_open
.
$delivery_tag
the delivery tag seen from a returned message from the recv
method.
$multiple
specifies if multiple are to be acknowledged at once. If $multiple
is non-zero, the broker will operate on all messages delivered with a delivery tag less than or equal to $delivery_tag
.
nack($channel, $delivery_tag, $multiple = 0)
Negatively acknowledge a message.
$channel
is a channel that has been opened with channel_open
.
$delivery_tag
the delivery tag seen from a returned message from the recv
method.
$multiple
specifies if multiple are to be acknowledged at once. If $multiple
is non-zero, the broker will operate on all messages delivered with a delivery tag less than or equal to $delivery_tag
.
reject($channel, $delivery_tag, $requeue = 0)
Reject a message with the specified delivery tag.
$channel
is a channel that has been opened with channel_open
.
$delivery_tag
the delivery tag seen from a returned message from the recv
method.
$requeue
specifies if the message should be requeued.
purge($channel, $queuename)
Purge all messages from the specified queue.
$channel
is a channel that has been opened with channel_open
.
$queuename
is the queue to be purged.
tx_select($channel)
Start a server-side (tx) transaction over $channel.
$channel
is a channel that has been opened with channel_open
.
tx_commit($channel)
Commit a server-side (tx) transaction over $channel.
$channel
is a channel that has been opened with channel_open
.
tx_rollback($channel)
Rollback a server-side (tx) transaction over $channel.
$channel
is a channel that has been opened with channel_open
.
get_rpc_timeout()
Return the RPC timeout on the current connection.
The value returned will be either undef
, if the RPC timeout is unlimited, or a hashref with tv_sec
for the number of seconds and tv_usec
for the number of microseconds.
set_rpc_timeout({ tv_sec => SECONDS, tv_usec => MICROSECONDS })
Set the RPC timeout for the current connection, using the seconds (tv_sec
) and microseconds (tv_usec
) provided. The arguments supplied can be either in the form of a hash or a hashref, so all of the following are valid:
$mq->set_rpc_timeout(tv_sec => 10, tv_usec => 500000)
$mq->set_rpc_timeout( { tv_sec => 10, tv_usec => 500000 } )
$mq->set_rpc_timeout(tv_sec => 10)
$mq->set_rpc_timeout(tv_usec => 500000)
In order to remove the time limit for RPC calls, simply pass undef
.
$mq->set_rpc_timeout( undef )
basic_qos($channel, $options)
Set quality of service flags on the current $channel.
$channel
is a channel that has been opened with channel_open
.
$options
is an optional hash respecting the following keys:
{
prefetch_count => $cnt, #default 0
prefetch_size => $size, #default 0
global => $bool, #default 0
}
heartbeat()
Send a heartbeat. If you've connected with a heartbeat parameter, you must send a heartbeat periodically matching connection parameter or the server may snip the connection.
Note that since recv
blocks for up to $timeout
milliseconds, it automatically handles sending heartbeats for you while active.
has_ssl
Returns true if the module was compiled with SSL support, false otherwise
confirm_select($channel)
Put the $channel
into select mode so that publisher confirmations will be sent by the broker.
$channel
is the channel number you wish to put into select mode.
Note that there is presently no way to disable select mode on a channel, so in order to cancel select mode you will need to close the channel and open another one.
publisher_confirm_wait($timeout)
Wait for a publisher confirm from the broker. If no publisher confirm has appeared before the timeout expires, undef
is returned.
$timeout
is an E<integer> representing the amount of time, in seconds, to wait for a confirmation. If a positive timeout is not specified or is specified as zero, this call will block until a response is received. If you specify a negative value for the timeout, it will time out immediately.
When a response is received, a hashref will be returned in the appropriate format for the method returned.
For a `basic.ack` response:
{
channel => 2,
method => 'basic.ack',
delivery_tag => 12,
multiple => 0,
}
For a `basic.nack` response:
{
channel => 2,
method => 'basic.nack',
delivery_tag => 12,
multiple => 0,
requeue => 1,
}
For a `basic.reject` response:
{
channel => 2,
method => 'basic.reject',
delivery_tag => 12,
requeue => 1,
}
channel
-
This is the channel for which the publisher confirmation was received.
method
-
The method received from the broker, which will always be one of
basic.ack
,basic.nack
, orbasic.reject
. delivery_tag
-
A numeric value identifying a message. This is a sequential integer set by the broker for messages delivered in order.
For example, if you publish one message, that message will have
delivery_tag
ofn
. When you publish another message, that message will have adelivery_tag
ofn+1
. multiple
-
Both
basic.ack
andbasic.nack
can be sent once for multiple messages. This boolean field, when true, indicates that all messages up to the currentdelivery_tag
since the last response have been confirmed using the same message.So if you publish three messages, and you get a single
basic.ack
with themultiple
field set to1
, then you know that all three of those messages have confirmed with the same method. requeue
-
For both
basic.nack
andbasic.reject
, a message can be requeued by whichever consumer received the message. If you receive a confirmation with this set to1
, then you know that the message(s) have already been requeued.
WARNING AND ERROR MESSAGES
Fatal Errors
It should be noted that almost all errors in this library are considered fatal, insomuch as they trigger a croak()
. In these errors, if it appears that somehow the connection has been closed by the remote host, or otherwise invalidated, the socket will also be closed and should be re-opened before any additional calls are made.
EXAMPLES
Simple publisher
use Net::AMQP::RabbitMQ;
my $channel = 1;
my $exchange = "MyExchange.x"; # This exchange must exist already
my $routing_key = "foobar";
my $mq = Net::AMQP::RabbitMQ->new();
$mq->connect("localhost", { user => "guest", password => "guest" });
$mq->channel_open(1);
$mq->publish($channel, $routing_key, "Message Here", { exchange => $exchange });
$mq->disconnect();
Simple consumer
use Net::AMQP::RabbitMQ;
use Data::Dumper;
my $channel = 1;
my $exchange = "MyExchange.x"; # This exchange must exist already
my $routing_key = "foobar";
my $mq = Net::AMQP::RabbitMQ->new();
$mq->connect("localhost", { user => "guest", password => "guest" });
$mq->channel_open($channel);
# Declare queue, letting the server auto-generate one and collect the name
my $queuename = $mq->queue_declare($channel, "");
# Bind the new queue to the exchange using the routing key
$mq->queue_bind($channel, $queuename, $exchange, $routing_key);
# Request that messages be sent and receive them until interrupted
$mq->consume($channel, $queuename);
while ( my $message = $mq->recv(0) )
{
print "Received message:\n";
print Dumper($message);
}
$mq->disconnect();
Using QOS
use Net::AMQP::RabbitMQ;
my $channel = 1;
my $exchange = "MyExchange.x"; # This exchange must exist already
my $routing_key = "foobar";
my $mq = Net::AMQP::RabbitMQ->new();
$mq->connect("localhost", { user => "guest", password => "guest" });
$mq->channel_open($channel);
# Prefetch 5 messages per window
$mq->basic_qos( $channel, { prefetch_count => 5 });
RUNNING THE TEST SUITE
This module is tested with private RabbitMQ services, and for security and compliance reasons it is no longer possible to expose this to the public.
You can create your own free instance to use with testing at https://www.cloudamqp.com/.
There are separate variables for the ssl and none ssl host/user/password/port, as well as the admin capabilities. In order to run the full test suite, you must have the management module enabled.
NOTE ON TESTS: The full set of tests (especially the xt
tests) can take quite some time, and may only work on GNU/Linux environments. By "quite some time," I mean that they may take more than two hours depending on your RMQ server's capacity.
These are the environment variables which control test behavior:
- MQHOST
-
Hostname or IP address of the RabbitMQ server to connect to.
- MQUSERNAME
-
Username for authentication.
- MQPASSWORD
-
Password for authentication.
- MQPORT
-
Port of the RabbitMQ server to connect to (defaults to 5672)
- MQVHOST
-
Vhost to use.
- MQSSL
-
Whether the tests should run with SSL enabled (defaults to false, but see also
MQSKIPSSL
). - MQSKIPSSL
-
Whether the SSL tests should be skipped entirely. This option exists because the SSL tests used to ignore
MQSSL
, and to maintain backwards compatibility, still do. - MQSSLHOST
-
Hostname or IP address of the RabbitMQ server to connect to.
- MQSSLUSERNAME
-
Username for authentication.
- MQSSLPASSWORD
-
Password for authentication.
- MQSSLPORT
-
Port of the RabbitMQ server to connect to (defaults to 5671)
- MQSSLCACERT
-
Path to the certificate file for SSL-enabled connections.
- MQSSLVERIFYHOST
-
Whether SSL hostname verification should be enabled (defaults to true).
- MQSSLVHOST
-
Vhost to use when in SSL mode.
- MQADMINPROTOCOL
-
Protocol to use for accessing the admin. Defaults to https
- MQADMINPORT
-
Port to use for accessing the admin interface. Defaults to 443
- MQADMINCACERT
-
CA certificate to use for the admin port. There is no default.
VERSION COMPATIBILITY
This module was forked from Net::RabbitMQ version 0.2.6 which uses an older version of librabbitmq, and doesn't work correctly with newer versions of RabbitMQ. The main change between this module and the original is this library uses a newer, unforked, version of librabbitmq.
This means this module only works with the AMQP 0.9.1 protocol, so requires RabbitMQ version 2+. Also, since the version of librabbitmq used is not a custom fork, it means this module doesn't support the basic_return callback method.
This module has been tested with OpenSSL up to version 3.3.1.
Please note that legacy versions of OpenSSL may or may not work, but are indeed unsupported. Only currently-supported versions of OpenSSL will be supported.
AUTHORS
Mike "manchicken" Stemle, Jr. <hello@mikestemle.com>
Theo Schlossnagle <jesus@omniti.com>
Mark Ellis <markellis@cpan.org>
Dave Rolsky <autarch@urth.org>
Slaven Rezić
Armand Leclercq
Daniel W Burke
Dávid Kovács
Alexey Sheynuk
Karen Etheridge <ether@cpan.org>
Eric Brine <ikegami@cpan.org>
Peter Valdemar Mørch <pmorch@cpan.org>
LICENSE
This software is licensed under the Mozilla Public License. See the LICENSE file in the top distribution directory for the full license text.
librabbitmq is licensed under the MIT License. See the LICENSE-MIT file in the top distribution directory for the full license text.
1 POD Error
The following errors were encountered while parsing the POD:
- Around line 580:
Unknown E content in E<integer>