NAME

Net::RabbitMQ - interact with RabbitMQ over AMQP using librabbitmq

SYNOPSIS

use Net::RabbitMQ;
my $mq = Net::RabbitMQ->new();
$mq->connect("localhost", { user => "guest", password => "guest" });
$mq->channel_open(1);
$mq->publish(1, "queuename", "Hi there!");
$mq->disconnect();

DESCRIPTION

Net::RabbitMQ provides a simple wrapper around the librabbitmq library that allows connecting, delcaring exchanges and queues, binding and unbinding queues, publising, consuming and receiving events.

Error handling in this module is primarily achieve by Perl_croak (die). You should be making good use of eval around these methods to ensure that you appropriately catch the errors.

Methods

All methods, unless specifically stated, return nothing on success and die on failure.

new()

Creates a new Net::RabbitMQ object.

connect( $hostname, $options )

$hostname is the host to which a connection will be attempted.

$options is an optional hash respecting the following keys:

{
  user => $user,           #default 'guest'
  password => $password,   #default 'guest'
  port => $port,           #default 5672
  vhost => $vhost,         #default '/'
  channel_max => $cmax,    #default 0
  frame_max => $fmax,      #default 131072
  heartbeat => $hearbeat,  #default 0
}
disconnect()

Causes the connection to RabbitMQ to be torn down.

channel_open($channel)

$channel is a positive integer describing the channel you which to open.

channel_close($channel)

$channel is a positive integer describing the channel you which to close.

get_channel_max()

Returns the maximum allowed channel number.

exchange_declare($channel, $exchange, $options)

$channel is a channel that has been opened with channel_open.

$exchange is the name of the exchange to be instantiated.

$options is an optional hash respecting the following keys:

{
  exchange_type => $type,  #default 'direct'
  passive => $boolean,     #default 0
  durable => $boolean,     #default 0
  auto_delete => $boolean, #default 1
}
exchange_delete($channel, $exchange, $options)

$channel is a channel that has been opened with channel_open.

$exchange is the name of the exchange to be deleted.

$options is an optional hash respecting the following keys:

{
  if_unused => $boolean,   #default 1
  nowait => $boolean,      #default 0
}
queue_declare($channel, $queuename, $options)

$channel is a channel that has been opened with channel_open.

$queuename is the name of the queuename to be instantiated. If $queuename is undef or an empty string, then an auto generated queuename will be used.

$options is an optional hash respecting the following keys:

{
  passive => $boolean,     #default 0
  durable => $boolean,     #default 0
  exclusive => $boolean,   #default 0
  auto_delete => $boolean, #default 1
}

This method returns the queuename delcared (important for retrieving the autogenerated queuename in the event that one was requested).

queue_bind($channel, $queuename, $exchange, $routing_key)

$channel is a channel that has been opened with channel_open.

$queuename is a previously declared queue, $exchange is a previously declared exchange, and $routing_key is the routing key that will bind the specified queue to the specified exchange.

queue_unbind($channel, $queuename, $exchange, $routing_key)

This is like the queue_bind with respect to arguments. This command unbinds the queue from the exchange.

publish($channel, $routing_key, $body, $options, $props)

$channel is a channel that has been opened with channel_open.

$routing_key is the name of the routing key for this message.

$body is the payload to enqueue.

$options is an optional hash respecting the following keys:

{
  exchange => $exchange,   #default 'amq.direct'
  mandatory => $boolean,   #default 0
  immediate => $boolean,   #default 0
}

$props is an optional hash (the AMQP 'props') respecting the following keys: { content_type => $string, content_encoding => $string, correlation_id => $string, reply_to => $string, expiration => $string, message_id => $string, type => $string, user_id => $string, app_id => $string, delivery_mode => $integer, priority => $integer, timestamp => $integer, }

consume($channel, $queuename, $options)

$channel is a channel that has been opened with channel_open.

$queuename is the name of the queue from which we'd like to consume.

$options is an optional hash respecting the following keys:

{
  consumer_tag => $tag,    #absent by default
  no_local => $boolean,    #default 0
  no_ack => $boolean,      #default 1
  exclusive => $boolean,   #default 0
}

The consumer_tag is returned. This command does not return AMQP frames, it simply notifies RabbitMQ that messages for this queue should be delivered down the specified channel.

recv()

This command receives and reconstructs AMQP frames and returns a hash containing the following information:

{
  body => 'Magic Transient Payload', # the reconstructed body
  routing_key => 'nr_test_q',        # route the message took
  exchange => 'nr_test_x',           # exchange used
  delivery_tag => 1,                 # (used for acks)
  consumer_tag => 'c_tag',           # tag from consume()
  props => $props,                   # hashref sent in
}

$props is the hash sent by publish() respecting the following keys: { content_type => $string, content_encoding => $string, correlation_id => $string, reply_to => $string, expiration => $string, message_id => $string, type => $string, user_id => $string, app_id => $string, delivery_mode => $integer, priority => $integer, timestamp => $integer, }

get($channel, $queuename, $options)

$channel is a channel that has been opened with channel_open.

$queuename is the name of the queue from which we'd like to consume.

$options is an optional hash respecting the following keys:

This command runs an amqp_basic_get which returns undef immediately if no messages are available on the queue and returns a has as follows if a message is available.

{
  body => 'Magic Transient Payload', # the reconstructed body
  routing_key => 'nr_test_q',        # route the message took
  exchange => 'nr_test_x',           # exchange used
  content_type => 'foo',             # (only if specified)
  delivery_tag => 1,                 # (used for acks)
  redelivered => 0,                  # if message is redelivered
  message_count => 0,                # message count
}
ack($channel, $delivery_tag, $multiple = 0)

$channel is a channel that has been opened with channel_open.

$delivery_tag the delivery tag seen from a returned frame from the recv method.

$multiple specifies if multiple are to be acknowledged at once.

purge($channel, $queuename, $no_wait = 0)

$channel is a channel that has been opened with channel_open.

$queuename is the queue to be purged.

$no_wait a boolean specifying if the call should not wait for the server to acknowledge the acknoledgement.

tx_select($channel)

$channel is a channel that has been opened with channel_open.

Start a server-side (tx) transaction over $channel.

tx_commit($channel)

$channel is a channel that has been opened with channel_open.

Commit a server-side (tx) transaction over $channel.

tx_rollback($channel)

$channel is a channel that has been opened with channel_open.

Rollback a server-side (tx) transaction over $channel.

basic_qos($channel, $options)

$channel is a channel that has been opened with channel_open.

$options is an optional hash respecting the following keys:

{
  prefetch_count => $cnt,  #default 0
  prefetch_size  => $size, #default 0
  global         => $bool, #default 0
}

Set quality of service flags on the current $channel.

basic_return($channel, $subroutine)

$channel is a channel that has been opened with channel_open.

$subroutine is a perl coderef that takes two arguments:

$channel is the channel on which information is being returned.

$m the message which is a hash ref containing reply_code,
reply_text, exchange, and routing_key.