From Code to Community: Sponsoring The Perl and Raku Conference 2025 Learn more

NAME

AnyEvent::RabbitMQ::Simple - Easy to use asynchronous AMQP client

VERSION

version 0.02

SYNOPSIS

use strict;
# create main loop
my $loop = AE::cv;
my $rmq = AnyEvent::RabbitMQ::Simple->new(
host => '127.0.0.1',
port => 5672,
user => 'username',
pass => 'password',
vhost => '/',
timeout => 1,
tls => 0,
verbose => 0,
confirm_publish => 1,
prefetch_count => 10,
failure_cb => sub {
my ($event, $details, $why) = @_;
if ( ref $why ) {
my $method_frame = $why->method_frame;
$why = $method_frame->reply_text;
}
$loop->croak("[ERROR] $event($details): $why" );
},
# routing layout
# [========== exchanges ===================] [===== queues ==============]
# [ (type/routing key) ] [ (routing key) ]
# logger ----------> stats --------------> stats-logs
# |(fanout) (direct) (mail.stats)
# | |
# | | \----------> errors -------------> ftp-error-logs
# | | | (topic:*.error.#) (ftp.error.#)
# | | |
# | | \-------------------> mail-error-logs
# | | (mail.error.#)
# | |
# | \-----------> info ---------------> info-logs
# | (topic:*.info.#) (*.info.#)
# |
# \------------------------------------> debug-queue
# declare exchanges
exchanges => [
'logger' => {
durable => 0,
type => 'fanout',
internal => 0,
auto_delete => 1,
},
'stats' => {
durable => 0,
type => 'direct',
internal => 0,
auto_delete => 1,
},
'errors' => {
durable => 0,
type => 'topic',
internal => 0,
auto_delete => 1,
},
'info' => {
durable => 0,
type => 'topic',
internal => 0,
auto_delete => 1,
},
],
# declare queues
queues => [
'debug-queue' => {
durable => 0,
auto_delete => 1,
},
'stats-logs' => {
durable => 0,
auto_delete => 1,
},
'ftp-error-logs' => {
durable => 0,
auto_delete => 1,
},
'mail-error-logs' => {
durable => 0,
auto_delete => 1,
},
'info-logs' => {
durable => 0,
auto_delete => 1,
},
],
# exchange to exchange bindings, with optional routing key
bind_exchanges => [
{ 'stats' => 'logger' },
{ 'errors' => [ 'logger', '*.error.#' ] },
{ 'info' => [ 'logger', '*.info.#' ] },
],
# queue to exchange bindings, with optional routing key
bind_queues => [
{ 'debug-queue' => 'logger' },
{ 'ftp-error-logs' => [ 'errors', 'ftp.error.#' ] },
{ 'mail-error-logs' => [ 'errors', 'mail.error.#' ] },
{ 'info-logs' => [ 'info', 'info.#' ] },
{ 'stats-logs' => [ 'stats', 'mail.stats' ] },
],
);
# publisher timer
my $t;
# connect and set up channel
my $conn = $rmq->connect();
$conn->cb(
sub {
print "waiting for channel..\n";
my $channel = shift->recv or $loop->croak("Could not open channel");
print "************* consuming\n";
for my $q ( qw( debug-queue ftp-error-logs mail-error-logs info-logs stats-logs ) ) {
consume($channel, $q);
}
print "************* starting publishing\n";
$t = AE::timer 0, 1.0, sub { publish($channel, "message prepared at ". scalar(localtime) ) };
}
);
# consumes from requested queue
sub consume {
my ($channel, $queue) = @_;
my $consumer_tag;
$channel->consume(
queue => $queue,
no_ack => 0,
on_success => sub {
my $frame = shift;
$consumer_tag = $frame->method_frame->consumer_tag;
print "************* consuming from $queue with $consumer_tag\n";
},
on_consume => sub {
my $res = shift;
my $body = $res->{body}->payload;
print "+++++++++++++ consumed($queue): $body\n";
$channel->ack(
delivery_tag => $res->{deliver}->method_frame->delivery_tag
);
},
on_failure => sub {
print "************* failed to consume($queue)\n";
}
);
}
# randomly generates routing key and message body
sub publish {
my ($channel, $msg) = @_;
unless ( $channel->is_open ) {
warn "Cannot publish, channel closed";
return;
}
my @system = qw( mail ftp web );
my @levels = qw( debug info error stats );
my $routing_key = $system[rand @system] .'.'. $levels[ rand @levels ];
$msg = sprintf("[%s] %s", uc($routing_key), $msg);
print "\n------- publishing: $msg\n";
$channel->publish(
routing_key => $routing_key,
exchange => 'logger',
body => $msg,
on_ack => sub {
print "------- published: $msg\n";
},
on_return => sub {
print "************* failed to publish: $msg\n";
}
);
}
# wait forever or die on error
my $done = $loop->recv;

DESCRIPTION

This module is meant to simplify the process of setting up the RabbitMQ channel, so you can start publishing and/or consuming messages without chaining on_success callbacks.

METHODS

new

my $rmq = AnyEvent::RabbitMQ::Simple->new(
...
);

Returns configured object using following parameters:

host

my $rmq = AnyEvent::RabbitMQ::Simple->new(
host => '127.0.0.1', # default
...
);

Host IP.

port

my $rmq = AnyEvent::RabbitMQ::Simple->new(
port => 5672, # default
...
);

Port number.

vhost

my $rmq = AnyEvent::RabbitMQ::Simple->new(
vhost => '/', # default
...
);

Virtual host namespace.

user

my $rmq = AnyEvent::RabbitMQ::Simple->new(
user => 'guest', # default
...
);

User name.

pass

my $rmq = AnyEvent::RabbitMQ::Simple->new(
pass => 'guest', # default
...
);

Password.

tune

my $rmq = AnyEvent::RabbitMQ::Simple->new(
tune => {
heartbeat => $connection_heartbeat,
channel_max => $max_channel_number,
frame_max => $max_frame_size
},
...
);

Optional connection tuning options.

timeout

my $rmq = AnyEvent::RabbitMQ::Simple->new(
timeout => 0, # default
...
);

Connection timeout.

tls

my $rmq = AnyEvent::RabbitMQ::Simple->new(
tls => 0, # default
...
);

Use TLS.

verbose

my $rmq = AnyEvent::RabbitMQ::Simple->new(
verbose => 0, # default
...
);

Turn on protocol debug.

confirm_publish

my $rmq = AnyEvent::RabbitMQ::Simple->new(
confirm_publish => 0, # default
...
);

Turn on confirm mode on channel. If set it enables the on_ack callback of channel's publish method.

prefetch_count

my $rmq = AnyEvent::RabbitMQ::Simple->new(
prefetch_count => 0, # default
...
);

Specify the number of prefetched messages when consuming from the channel.

exchange

my $rmq = AnyEvent::RabbitMQ::Simple->new(
exchange => 'name_of_exchange',
...
);

Optional name of exchange to declare with its default configuration options.

See "declare_exchange (%args)" in AnyEvent::RabbitMQ::Channel for details.

exchanges

my $rmq = AnyEvent::RabbitMQ::Simple->new(
exchanges => [
'name_of_exchange' => {
durable => 1,
type => 'fanout',
... # other exchange configuration parameters
},
...
],
...
);

Optional list of exchanges to declare with their configuration options.

See "declare_exchange (%args)" in AnyEvent::RabbitMQ::Channel for details.

queue

my $rmq = AnyEvent::RabbitMQ::Simple->new(
queue => 'name_of_queue',
...
);

Optional name of queue to declare with its default configuration options.

If no queues were declared or empty name has been specified a unique generated queue name will be available:

my $gen_queue = $rmq->gen_queue;

See "declare_queue" in AnyEvent::RabbitMQ::Channel for details.

queues

my $rmq = AnyEvent::RabbitMQ::Simple->new(
queues => [
'name_of_queue' => {
durable => 1,
no_ack => 0,
... # other queue configuration parameters
},
...
],
...
);

Optional list of queues to declare with their configuration options.

See "declare_queue" in AnyEvent::RabbitMQ::Channel for details.

bind_exchanges

my $rmq = AnyEvent::RabbitMQ::Simple->new(
bind_exchanges => [
# without routing key
{ 'destination1' => 'source' },
# with routing key
{ 'destination2' => [ 'source', 'routing_key' ] },
...
],
...
);

Optional list of exchange-to-exchange bindings.

See "bind_exchange" in AnyEvent::RabbitMQ::Channel for details.

bind_queues

my $rmq = AnyEvent::RabbitMQ::Simple->new(
bind_queues => [
# without routing key
{ 'queue1' => 'exchange' },
# with routing key
{ 'queue2' => [ 'exchange', 'routing_key' ] },
...
],
...
);

Optional list of queue-to-exchange bindings.

See "bind_queue" in AnyEvent::RabbitMQ::Channel for details.

failure_cb

my $rmq = AnyEvent::RabbitMQ::Simple->new(
failure_cb => sub {
my ($event, $details, $why) = @_;
if ( ref $why ) {
my $method_frame = $why->method_frame;
$why = $method_frame->reply_text;
}
$loop->croak("[ERROR] $event($details): $why" );
},
...
);

Required catch-all error handling callback. The value of $event is one of:

ConnectOnFailure
ConnectOnReadFailure
ConnectOnReturn
ConnectOnClose
OpenChannelOnFailure
OpenChannelOnReturn
OpenChannelOnClose
DeclareExchangeOnFailure

Value of $details has following format: name:$name_of_exchange.

BindExchangeOnFailure

Value of $details has following format: source:$name_of_source_exchange, destination:$name_of_destination_exchange.

DeclareQueueOnFailure

Value of $details has following format: name:$name_of_queue.

BindQueueOnFailure

Value of $details has following format: queue:$name_of_queue, exchange:$name_of_exchange.

ConfirmChannelOnFailure
QosChannelOnFailure

connect

my $conn = $rmq->connect();
$conn->cb(
sub {
my $channel = shift->recv or $loop->croak("Could not open channel");
...
}
);

Returns the AnyEvent condvar that returns AnyEvent::RabbitMQ::Channel object after all the configuration steps were successful.

disconnect

$rmq->disconnect();

Disconnects from RabbitMQ server.

gen_queue

my $gen_queue = $rmq->gen_queue;

Name of the generated queue if no queues were declared (or queue with empty name has been specified).

SEE ALSO

AUTHOR

Alex J. G. Burzyński <ajgb@cpan.org>

COPYRIGHT AND LICENSE

This software is copyright (c) 2016 by Alex J. G. Burzyński <ajgb@cpan.org>.

This is free software; you can redistribute it and/or modify it under the same terms as the Perl 5 programming language system itself.