NAME
AnyEvent::RabbitMQ::Simple - Easy to use asynchronous AMQP client
VERSION
version 0.02
SYNOPSIS
use strict;
use warnings;
use AnyEvent::RabbitMQ::Simple;
# create main loop
my $loop = AE::cv;
my $rmq = AnyEvent::RabbitMQ::Simple->new(
    host       => '127.0.0.1',
    port       => 5672,
    user       => 'username',
    pass       => 'password',
    vhost      => '/',
    timeout    => 1,
    tls        => 0,
    verbose    => 0,
    confirm_publish => 1,
    prefetch_count => 10,
    failure_cb => sub {
        my ($event, $details, $why) = @_;
        if ( ref $why ) {
            my $method_frame = $why->method_frame;
            $why = $method_frame->reply_text;
        }
        $loop->croak("[ERROR] $event($details): $why" );
    },
    # routing layout
    # [========== exchanges ===================] [===== queues ==============]
    # [             (type/routing key)         ] [        (routing key) ]
    #  logger ----------> stats -------------->   stats-logs
    #   |(fanout)           (direct)                (mail.stats)
    #   |  |
    #   |  | \----------> errors ------------->   ftp-error-logs
    #   |  |              | (topic:*.error.#)       (ftp.error.#)
    #   |  |              |
    #   |  |              \------------------->   mail-error-logs
    #   |  |                                        (mail.error.#)
    #   |  |
    #   |   \-----------> info --------------->   info-logs
    #   |                   (topic:*.info.#)        (*.info.#)
    #   |
    #    \------------------------------------>   debug-queue
    # declare exchanges
    exchanges => [
        'logger' => {
            durable => 0,
            type => 'fanout',
            internal => 0,
            auto_delete => 1,
        },
        'stats' => {
            durable => 0,
            type => 'direct',
            internal => 0,
            auto_delete => 1,
        },
        'errors' => {
            durable => 0,
            type => 'topic',
            internal => 0,
            auto_delete => 1,
        },
        'info' => {
            durable => 0,
            type => 'topic',
            internal => 0,
            auto_delete => 1,
        },
    ],
    # declare queues
    queues => [
        'debug-queue' => {
            durable => 0,
            auto_delete => 1,
        },
        'stats-logs' => {
            durable => 0,
            auto_delete => 1,
        },
        'ftp-error-logs' => {
            durable => 0,
            auto_delete => 1,
        },
        'mail-error-logs' => {
            durable => 0,
            auto_delete => 1,
        },
        'info-logs' => {
            durable => 0,
            auto_delete => 1,
        },
    ],
    # exchange to exchange bindings, with optional routing key
    bind_exchanges => [
        { 'stats'   =>   'logger'                 },
        { 'errors'  => [ 'logger', '*.error.#' ]  },
        { 'info'    => [ 'logger', '*.info.#'  ]  },
    ],
    # queue to exchange bindings, with optional routing key
    bind_queues => [
        { 'debug-queue'     =>   'logger'                   },
        { 'ftp-error-logs'  => [ 'errors', 'ftp.error.#'  ] },
        { 'mail-error-logs' => [ 'errors', 'mail.error.#' ] },
        { 'info-logs'       => [ 'info',   'info.#'       ] },
        { 'stats-logs'      => [ 'stats',  'mail.stats'   ] },
    ],
);
# publisher timer
my $t;
# connect and set up channel
my $conn = $rmq->connect();
$conn->cb(
    sub {
        print "waiting for channel..\n";
        my $channel = shift->recv or $loop->croak("Could not open channel");
        print "************* consuming\n";
        for my $q ( qw( debug-queue ftp-error-logs mail-error-logs info-logs stats-logs ) ) {
            consume($channel, $q);
        }
        print "************* starting publishing\n";
        $t = AE::timer 0, 1.0, sub { publish($channel, "message prepared at ". scalar(localtime) ) };
    }
);
# consumes from requested queue
sub consume {
    my ($channel, $queue) = @_;
    my $consumer_tag;
    $channel->consume(
        queue => $queue,
        no_ack => 0,
        on_success => sub {
            my $frame = shift;
            $consumer_tag = $frame->method_frame->consumer_tag;
            print "************* consuming from $queue with $consumer_tag\n";
        },
        on_consume => sub {
            my $res = shift;
            my $body = $res->{body}->payload;
            print "+++++++++++++ consumed($queue): $body\n";
            $channel->ack(
                delivery_tag => $res->{deliver}->method_frame->delivery_tag
            );
        },
        on_failure => sub {
            print "************* failed to consume($queue)\n";
        }
    );
}
# randomly generates routing key and message body
sub publish {
    my ($channel, $msg) = @_;
    unless ( $channel->is_open ) {
        warn "Cannot publish, channel closed";
        return;
    }
    my @system = qw( mail ftp web );
    my @levels = qw( debug info error stats );
    my $routing_key = $system[rand @system] .'.'. $levels[ rand @levels ];
    $msg = sprintf("[%s] %s", uc($routing_key), $msg);
    print "\n------- publishing: $msg\n";
    $channel->publish(
        routing_key => $routing_key,
        exchange => 'logger',
        body => $msg,
        on_ack => sub {
            print "------- published: $msg\n";
        },
        on_return => sub {
            print "************* failed to publish: $msg\n";
        }
    );
}
# wait forever or die on error
my $done = $loop->recv;
DESCRIPTION
This module is meant to simplify the process of setting up the RabbitMQ channel, so you can start publishing and/or consuming messages without chaining on_success callbacks.
METHODS
new
my $rmq = AnyEvent::RabbitMQ::Simple->new(
    ...
);
Returns configured object using following parameters:
host
my $rmq = AnyEvent::RabbitMQ::Simple->new(
    host => '127.0.0.1', # default
    ...
);
Host IP.
port
my $rmq = AnyEvent::RabbitMQ::Simple->new(
    port => 5672, # default
    ...
);
Port number.
vhost
my $rmq = AnyEvent::RabbitMQ::Simple->new(
    vhost => '/', # default
    ...
);
Virtual host namespace.
user
my $rmq = AnyEvent::RabbitMQ::Simple->new(
    user => 'guest', # default
    ...
);
User name.
pass
my $rmq = AnyEvent::RabbitMQ::Simple->new(
    pass => 'guest', # default
    ...
);
Password.
tune
my $rmq = AnyEvent::RabbitMQ::Simple->new(
    tune => {
        heartbeat => $connection_heartbeat,
        channel_max => $max_channel_number,
        frame_max => $max_frame_size
    },
    ...
);
Optional connection tuning options.
timeout
my $rmq = AnyEvent::RabbitMQ::Simple->new(
    timeout => 0, # default
    ...
);
Connection timeout.
tls
my $rmq = AnyEvent::RabbitMQ::Simple->new(
    tls => 0, # default
    ...
);
Use TLS.
verbose
my $rmq = AnyEvent::RabbitMQ::Simple->new(
    verbose => 0, # default
    ...
);
Turn on protocol debug.
confirm_publish
my $rmq = AnyEvent::RabbitMQ::Simple->new(
    confirm_publish => 0, # default
    ...
);
Turn on confirm mode on channel. If set it enables the on_ack callback of channel's publish method.
prefetch_count
my $rmq = AnyEvent::RabbitMQ::Simple->new(
    prefetch_count => 0, # default
    ...
);
Specify the number of prefetched messages when consuming from the channel.
exchange
my $rmq = AnyEvent::RabbitMQ::Simple->new(
    exchange => 'name_of_exchange',
    ...
);
Optional name of exchange to declare with its default configuration options.
See "declare_exchange (%args)" in AnyEvent::RabbitMQ::Channel for details.
exchanges
my $rmq = AnyEvent::RabbitMQ::Simple->new(
    exchanges => [
        'name_of_exchange' => {
            durable => 1,
            type => 'fanout',
            ... # other exchange configuration parameters
        },
        ...
    ],
    ...
);
Optional list of exchanges to declare with their configuration options.
See "declare_exchange (%args)" in AnyEvent::RabbitMQ::Channel for details.
queue
my $rmq = AnyEvent::RabbitMQ::Simple->new(
    queue => 'name_of_queue',
    ...
);
Optional name of queue to declare with its default configuration options.
If no queues were declared or empty name has been specified a unique generated queue name will be available:
my $gen_queue = $rmq->gen_queue;
See "declare_queue" in AnyEvent::RabbitMQ::Channel for details.
queues
my $rmq = AnyEvent::RabbitMQ::Simple->new(
    queues => [
        'name_of_queue' => {
            durable => 1,
            no_ack => 0,
            ... # other queue configuration parameters
        },
        ...
    ],
    ...
);
Optional list of queues to declare with their configuration options.
See "declare_queue" in AnyEvent::RabbitMQ::Channel for details.
bind_exchanges
my $rmq = AnyEvent::RabbitMQ::Simple->new(
    bind_exchanges => [
        # without routing key
        { 'destination1' => 'source' },
        # with routing key
        { 'destination2'  => [ 'source', 'routing_key' ]  },
        ...
    ],
    ...
);
Optional list of exchange-to-exchange bindings.
See "bind_exchange" in AnyEvent::RabbitMQ::Channel for details.
bind_queues
my $rmq = AnyEvent::RabbitMQ::Simple->new(
    bind_queues => [
        # without routing key
        { 'queue1' => 'exchange' },
        # with routing key
        { 'queue2'  => [ 'exchange', 'routing_key' ]  },
        ...
    ],
    ...
);
Optional list of queue-to-exchange bindings.
See "bind_queue" in AnyEvent::RabbitMQ::Channel for details.
failure_cb
my $rmq = AnyEvent::RabbitMQ::Simple->new(
    failure_cb => sub {
        my ($event, $details, $why) = @_;
        if ( ref $why ) {
            my $method_frame = $why->method_frame;
            $why = $method_frame->reply_text;
        }
        $loop->croak("[ERROR] $event($details): $why" );
    },
    ...
);
Required catch-all error handling callback. The value of $event is one of:
- ConnectOnFailure
 - ConnectOnReadFailure
 - ConnectOnReturn
 - ConnectOnClose
 - OpenChannelOnFailure
 - OpenChannelOnReturn
 - OpenChannelOnClose
 - DeclareExchangeOnFailure
 - 
Value of
$detailshas following format:name:$name_of_exchange. - BindExchangeOnFailure
 - 
Value of
$detailshas following format:source:$name_of_source_exchange, destination:$name_of_destination_exchange. - DeclareQueueOnFailure
 - 
Value of
$detailshas following format:name:$name_of_queue. - BindQueueOnFailure
 - 
Value of
$detailshas following format:queue:$name_of_queue, exchange:$name_of_exchange. - ConfirmChannelOnFailure
 - QosChannelOnFailure
 
connect
my $conn = $rmq->connect();
$conn->cb(
    sub {
        my $channel = shift->recv or $loop->croak("Could not open channel");
        ...
    }
);
Returns the AnyEvent condvar that returns AnyEvent::RabbitMQ::Channel object after all the configuration steps were successful.
disconnect
$rmq->disconnect();
Disconnects from RabbitMQ server.
gen_queue
my $gen_queue = $rmq->gen_queue;
Name of the generated queue if no queues were declared (or queue with empty name has been specified).
SEE ALSO
AUTHOR
Alex J. G. Burzyński <ajgb@cpan.org>
COPYRIGHT AND LICENSE
This software is copyright (c) 2016 by Alex J. G. Burzyński <ajgb@cpan.org>.
This is free software; you can redistribute it and/or modify it under the same terms as the Perl 5 programming language system itself.