NAME
AnyEvent::RabbitMQ::Simple - Easy to use asynchronous AMQP client
VERSION
version 0.02
SYNOPSIS
use
strict;
use
warnings;
# Create the main loop. The failure_cb below croaks on this condvar, so any
# setup error surfaces where the script finally calls $loop->recv.
my $loop = AE::cv;

my $rmq = AnyEvent::RabbitMQ::Simple->new(
    # connection settings
    host    => '127.0.0.1',
    port    => 5672,
    user    => 'username',
    pass    => 'password',
    vhost   => '/',
    timeout => 1,
    tls     => 0,
    verbose => 0,

    # channel settings
    confirm_publish => 1,
    prefetch_count  => 10,

    # catch-all error handler
    failure_cb => sub {
        my ($event, $details, $why) = @_;

        # $why is either a plain string or a frame object; for a frame,
        # report the reply text carried by its method frame.
        if ( ref $why ) {
            my $method_frame = $why->method_frame;
            $why = $method_frame->reply_text;
        }

        $loop->croak("[ERROR] $event($details): $why");
    },

    # routing layout
    #
    # [=============== exchanges ================] [======== queues ========]
    # [            (type/routing key)            ] [     (routing key)      ]
    #
    # logger -----+-----> stats -----------------> stats-logs
    # (fanout)    |       (direct)                 (mail.stats)
    #  |          |
    #  |          +-----> errors ------------+---> ftp-error-logs
    #  |          |       (topic:*.error.#)  |     (ftp.error.#)
    #  |          |                          |
    #  |          |                          \---> mail-error-logs
    #  |          |                                (mail.error.#)
    #  |          |
    #  |          \-----> info ------------------> info-logs
    #  |                  (topic:*.info.#)         (*.info.#)
    #  |
    #  \-----------------------------------------> debug-queue

    # declare exchanges
    exchanges => [
        'logger' => {
            durable     => 0,
            type        => 'fanout',
            internal    => 0,
            auto_delete => 1,
        },
        'stats' => {
            durable     => 0,
            type        => 'direct',
            internal    => 0,
            auto_delete => 1,
        },
        'errors' => {
            durable     => 0,
            type        => 'topic',
            internal    => 0,
            auto_delete => 1,
        },
        'info' => {
            durable     => 0,
            type        => 'topic',
            internal    => 0,
            auto_delete => 1,
        },
    ],

    # declare queues
    queues => [
        'debug-queue' => {
            durable     => 0,
            auto_delete => 1,
        },
        'stats-logs' => {
            durable     => 0,
            auto_delete => 1,
        },
        'ftp-error-logs' => {
            durable     => 0,
            auto_delete => 1,
        },
        'mail-error-logs' => {
            durable     => 0,
            auto_delete => 1,
        },
        'info-logs' => {
            durable     => 0,
            auto_delete => 1,
        },
    ],

    # exchange to exchange bindings, with optional routing key
    bind_exchanges => [
        { 'stats'  => 'logger' },
        { 'errors' => [ 'logger', '*.error.#' ] },
        { 'info'   => [ 'logger', '*.info.#' ] },
    ],

    # queue to exchange bindings, with optional routing key
    bind_queues => [
        { 'debug-queue'     => 'logger' },
        { 'ftp-error-logs'  => [ 'errors', 'ftp.error.#' ] },
        { 'mail-error-logs' => [ 'errors', 'mail.error.#' ] },
        # Published keys look like "<system>.<level>" (e.g. "mail.info"), so
        # the first word must be wildcarded; a plain 'info.#' never matches.
        { 'info-logs'       => [ 'info', '*.info.#' ] },
        { 'stats-logs'      => [ 'stats', 'mail.stats' ] },
    ],
);
# Publisher timer. Declared at file scope so the watcher created inside the
# callback below stays referenced after that callback returns.
my $t;

# connect and set up channel
my $conn = $rmq->connect();
$conn->cb(
    sub {
        print "waiting for channel..\n";

        # The condvar delivers the configured channel; a false value is
        # reported through the main loop.
        my $channel = shift->recv
            or $loop->croak("Could not open channel");

        print "************* consuming\n";
        for my $q ( qw( debug-queue ftp-error-logs mail-error-logs info-logs stats-logs ) ) {
            consume( $channel, $q );
        }

        print "************* starting publishing\n";

        # AE::timer AFTER, INTERVAL, CALLBACK: first publish right away,
        # then one message per second.
        $t = AE::timer 0, 1.0, sub {
            publish( $channel, "message prepared at " . scalar(localtime) );
        };
    }
);
# Consumes from the requested queue.
#
# Arguments:
#   $channel - channel object delivered by the connect() condvar
#   $queue   - name of the queue to consume from
#
# Every delivered message is printed and then explicitly acknowledged,
# because the consumer is registered with no_ack => 0.
sub consume {
    my ($channel, $queue) = @_;

    # filled in once the broker confirms the subscription
    my $consumer_tag;

    $channel->consume(
        queue      => $queue,
        no_ack     => 0,
        on_success => sub {
            my $frame = shift;

            $consumer_tag = $frame->method_frame->consumer_tag;
            print "************* consuming from $queue with $consumer_tag\n";
        },
        on_consume => sub {
            my $res  = shift;
            my $body = $res->{body}->payload;

            print "+++++++++++++ consumed($queue): $body\n";

            # acknowledge this delivery
            $channel->ack(
                delivery_tag => $res->{deliver}->method_frame->delivery_tag
            );
        },
        on_failure => sub {
            print "************* failed to consume($queue)\n";
        }
    );
}
# Publishes $msg to the 'logger' exchange under a randomly chosen
# "<system>.<level>" routing key; the upper-cased key is prefixed to the body.
#
# Arguments:
#   $channel - channel object delivered by the connect() condvar
#   $msg     - message text
#
# Warns and returns without publishing if the channel is no longer open.
sub publish {
    my ($channel, $msg) = @_;

    unless ( $channel->is_open ) {
        warn "Cannot publish, channel closed";
        return;
    }

    my @system = qw( mail ftp web );
    my @levels = qw( debug info error stats );

    # rand @array yields a fractional index in [0, size), truncated by the
    # array subscript.
    my $routing_key = $system[ rand @system ] . '.' . $levels[ rand @levels ];

    $msg = sprintf( "[%s] %s", uc($routing_key), $msg );

    print "\n------- publishing: $msg\n";

    $channel->publish(
        routing_key => $routing_key,
        exchange    => 'logger',
        body        => $msg,
        on_ack      => sub {
            print "------- published: $msg\n";
        },
        on_return   => sub {
            print "************* failed to publish: $msg\n";
        }
    );
}
# Run the event loop: blocks on the main condvar and dies if an error
# was croaked on it.
my $done = $loop->recv();
DESCRIPTION
This module is meant to simplify the process of setting up the RabbitMQ channel, so you can start publishing and/or consuming messages without chaining on_success callbacks.
METHODS
new
my
$rmq
= AnyEvent::RabbitMQ::Simple->new(
...
);
Returns a configured object. The following parameters are accepted:
host
my
$rmq
= AnyEvent::RabbitMQ::Simple->new(
host
=>
'127.0.0.1'
,
# default
...
);
Host IP.
port
my
$rmq
= AnyEvent::RabbitMQ::Simple->new(
port
=> 5672,
# default
...
);
Port number.
vhost
my
$rmq
= AnyEvent::RabbitMQ::Simple->new(
vhost
=>
'/'
,
# default
...
);
Virtual host namespace.
user
my
$rmq
= AnyEvent::RabbitMQ::Simple->new(
user
=>
'guest'
,
# default
...
);
User name.
pass
my
$rmq
= AnyEvent::RabbitMQ::Simple->new(
pass
=>
'guest'
,
# default
...
);
Password.
tune
my
$rmq
= AnyEvent::RabbitMQ::Simple->new(
tune
=> {
heartbeat
=>
$connection_heartbeat
,
channel_max
=>
$max_channel_number
,
frame_max
=>
$max_frame_size
},
...
);
Optional connection tuning options.
timeout
my
$rmq
= AnyEvent::RabbitMQ::Simple->new(
timeout
=> 0,
# default
...
);
Connection timeout.
tls
my
$rmq
= AnyEvent::RabbitMQ::Simple->new(
tls
=> 0,
# default
...
);
Use TLS.
verbose
my
$rmq
= AnyEvent::RabbitMQ::Simple->new(
verbose
=> 0,
# default
...
);
Turn on protocol debug.
confirm_publish
my
$rmq
= AnyEvent::RabbitMQ::Simple->new(
confirm_publish
=> 0,
# default
...
);
Turn on confirm mode on the channel. If set, it enables the on_ack callback of the channel's publish method.
prefetch_count
my
$rmq
= AnyEvent::RabbitMQ::Simple->new(
prefetch_count
=> 0,
# default
...
);
Specify the number of prefetched messages when consuming from the channel.
exchange
my
$rmq
= AnyEvent::RabbitMQ::Simple->new(
exchange
=>
'name_of_exchange'
,
...
);
Optional name of exchange to declare with its default configuration options.
See "declare_exchange (%args)" in AnyEvent::RabbitMQ::Channel for details.
exchanges
my
$rmq
= AnyEvent::RabbitMQ::Simple->new(
exchanges
=> [
'name_of_exchange'
=> {
durable
=> 1,
type
=>
'fanout'
,
...
# other exchange configuration parameters
},
...
],
...
);
Optional list of exchanges to declare with their configuration options.
See "declare_exchange (%args)" in AnyEvent::RabbitMQ::Channel for details.
queue
my
$rmq
= AnyEvent::RabbitMQ::Simple->new(
queue
=>
'name_of_queue'
,
...
);
Optional name of queue to declare with its default configuration options.
If no queues were declared, or an empty name has been specified, a unique generated queue name will be available:
my
$gen_queue
=
$rmq
->gen_queue;
See "declare_queue" in AnyEvent::RabbitMQ::Channel for details.
queues
my
$rmq
= AnyEvent::RabbitMQ::Simple->new(
queues
=> [
'name_of_queue'
=> {
durable
=> 1,
no_ack
=> 0,
...
# other queue configuration parameters
},
...
],
...
);
Optional list of queues to declare with their configuration options.
See "declare_queue" in AnyEvent::RabbitMQ::Channel for details.
bind_exchanges
my
$rmq
= AnyEvent::RabbitMQ::Simple->new(
bind_exchanges
=> [
# without routing key
{
'destination1'
=>
'source'
},
# with routing key
{
'destination2'
=> [
'source'
,
'routing_key'
] },
...
],
...
);
Optional list of exchange-to-exchange bindings.
See "bind_exchange" in AnyEvent::RabbitMQ::Channel for details.
bind_queues
my
$rmq
= AnyEvent::RabbitMQ::Simple->new(
bind_queues
=> [
# without routing key
{
'queue1'
=>
'exchange'
},
# with routing key
{
'queue2'
=> [
'exchange'
,
'routing_key'
] },
...
],
...
);
Optional list of queue-to-exchange bindings.
See "bind_queue" in AnyEvent::RabbitMQ::Channel for details.
failure_cb
my
$rmq
= AnyEvent::RabbitMQ::Simple->new(
failure_cb
=>
sub
{
my
(
$event
,
$details
,
$why
) =
@_
;
if
(
ref
$why
) {
my
$method_frame
=
$why
->method_frame;
$why
=
$method_frame
->reply_text;
}
$loop
->croak(
"[ERROR] $event($details): $why"
);
},
...
);
Required catch-all error handling callback. The value of $event is one of:
- ConnectOnFailure
- ConnectOnReadFailure
- ConnectOnReturn
- ConnectOnClose
- OpenChannelOnFailure
- OpenChannelOnReturn
- OpenChannelOnClose
- DeclareExchangeOnFailure
  Value of $details has the following format: name:$name_of_exchange
- BindExchangeOnFailure
  Value of $details has the following format: source:$name_of_source_exchange, destination:$name_of_destination_exchange
- DeclareQueueOnFailure
  Value of $details has the following format: name:$name_of_queue
- BindQueueOnFailure
  Value of $details has the following format: queue:$name_of_queue, exchange:$name_of_exchange
- ConfirmChannelOnFailure
- QosChannelOnFailure
connect
my
$conn
=
$rmq
->
connect
();
$conn
->cb(
sub
{
my
$channel
=
shift
->
recv
or
$loop
->croak(
"Could not open channel"
);
...
}
);
Returns an AnyEvent condvar that delivers the AnyEvent::RabbitMQ::Channel object once all the configuration steps have completed successfully.
disconnect
$rmq
->disconnect();
Disconnects from RabbitMQ server.
gen_queue
my
$gen_queue
=
$rmq
->gen_queue;
Name of the generated queue if no queues were declared (or a queue with an empty name has been specified).
SEE ALSO
AUTHOR
Alex J. G. Burzyński <ajgb@cpan.org>
COPYRIGHT AND LICENSE
This software is copyright (c) 2016 by Alex J. G. Burzyński <ajgb@cpan.org>.
This is free software; you can redistribute it and/or modify it under the same terms as the Perl 5 programming language system itself.