NAME

RabbitMQ::Consumer::Batcher - batch consumer of RMQ messages

SYNOPSIS

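use feature 'say';
use AnyEvent;
use AnyEvent::RabbitMQ::PubSub;
use AnyEvent::RabbitMQ::PubSub::Consumer;
use RabbitMQ::Consumer::Batcher;
use Safe::Isa;  # provides $_isa, used in on_add_catch below
use Path::Tiny; # provides path(), used in on_batch_complete below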

my ($rmq_connection, $channel) = AnyEvent::RabbitMQ::PubSub::connect(
    host  => 'localhost',
    port  => 5672,
    user  => 'guest',
    pass  => 'guest',
    vhost => '/',
);

my $exchange = {
    exchange    => 'my_test_exchange',
    type        => 'topic',
    durable     => 0,
    auto_delete => 1,
};

my $queue = {
    queue       => 'my_test_queue',
    auto_delete => 1,
};

my $routing_key = 'my_rk';

my $consumer = AnyEvent::RabbitMQ::PubSub::Consumer->new(
    channel        => $channel,
    exchange       => $exchange,
    queue          => $queue,
    routing_key    => $routing_key,
);
$consumer->init(); #declares channel, queue and binding

my $batcher = RabbitMQ::Consumer::Batcher->new(
    batch_size              => $consumer->prefetch_count,
    on_add                  => sub {
        my ($batcher, $msg) = @_;

        my $decode_payload = decode_payload($msg->{header}, $msg->{body}->payload());
        return $decode_payload;
    },
    on_add_catch            => sub {
        my ($batcher, $msg, $exception) = @_;

        if ($exception->$_isa('failure') && $exception->{payload}{stats_key}) {
            $stats->increment($exception->{payload}{stats_key});
        }

        if ($exception->$_isa('failure') && $exception->{payload}{reject}) {
            $batcher->reject($msg);
            $log->error("consume failed - reject: $exception\n".$msg->{body}->payload());
        }
        else {
            $batcher->reject_and_republish($msg);
            $log->error("consume failed - republish: $exception");
        }
    },
    on_batch_complete       => sub {
        my ($batcher, $batch) = @_;

        path(...)->spew(join "\t", map { $_->value() } @$batch);
    },
    on_batch_complete_catch => sub {
        my ($batcher, $batch, $exception) = @_;

        $log->error("save messages to file failed: $exception");
    }
);

my $cv = AnyEvent->condvar();
$consumer->consume($cv, $batcher->consume_code())->then(sub {
    say 'Consumer was started...';
});

DESCRIPTION

If you need to process RabbitMQ messages in batches, this module is for you.

This module works well with AnyEvent::RabbitMQ::PubSub::Consumer.

The idea is to split processing into two phases. In the on_add phase, each message is validated, and a corrupted message can be rejected. In the on_batch_complete phase, we work with the batch of messages that we do not want to lose. If a problem occurs in this phase, the messages are republished.
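The two phases can be illustrated with a small simulation in plain Perl. This is only a sketch of the idea, not the module's implementation: make_batcher and its add/rejected keys are hypothetical names invented for this example, and no RabbitMQ connection is involved.

```perl
use strict;
use warnings;

# Hypothetical stand-in for the batcher (NOT the real module):
# it validates every message in on_add and hands over full batches.
sub make_batcher {
    my (%args) = @_;
    my (@batch, @rejected);
    return {
        rejected => \@rejected,
        add      => sub {
            my ($raw) = @_;
            my $value;
            my $ok = eval { $value = $args{on_add}->($raw); 1 };
            if (!$ok) {
                # on_add threw: treat the message as corrupted
                push @rejected, $raw;
                return;
            }
            push @batch, $value;
            if (@batch >= $args{batch_size}) {
                # batch is full: pass a copy to on_batch_complete and start over
                $args{on_batch_complete}->([@batch]);
                @batch = ();
            }
        },
    };
}

my @flushed;
my $batcher = make_batcher(
    batch_size => 2,
    on_add     => sub {
        my ($raw) = @_;
        die "not a number\n" unless $raw =~ /\A\d+\z/;
        return $raw + 0;
    },
    on_batch_complete => sub {
        my ($batch) = @_;
        push @flushed, join ',', @$batch;
    },
);

$batcher->{add}->($_) for qw(1 x 2 3 4);

print "flushed: $_\n" for @flushed;
print "rejected: @{ $batcher->{rejected} }\n";
```

Here the invalid message is dropped in the first phase, so it never reaches a batch, while the valid ones are delivered in groups of batch_size.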

METHODS

new(%attributes)

attributes

batch_size

Maximum batch size (the trigger for on_batch_complete).

batch_size must be equal to or greater than prefetch_count!

This attribute is required.

on_add

This callback is called after each single message is consumed. It is useful for decoding, for example.

The return value of the callback is used as the value of the batch item (RabbitMQ::Consumer::Batcher::Item).

By default, the payload of the message is used as the item value in the batch:

return sub {
    my($batcher, $msg) = @_;
    return $msg->{body}->payload()
}

Parameters passed to the callback:

$batcher

the RabbitMQ::Consumer::Batcher instance itself

$msg

the consumed message (see "on_consume" in AnyEvent::RabbitMQ::Channel)
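As an illustration, an on_add callback that decodes and validates a JSON payload might look like the sketch below. It assumes the payload is JSON with an id field, which the module does not require. Mock::Body is a hypothetical stand-in for the real message body and imitates only its payload() method, so the example runs without a broker.

```perl
use strict;
use warnings;
use JSON::PP qw(decode_json);

# Hypothetical mock of the message body; the real object comes from
# AnyEvent::RabbitMQ. Only payload() is imitated here.
package Mock::Body {
    sub new     { my ($class, $payload) = @_; return bless { payload => $payload }, $class }
    sub payload { return $_[0]{payload} }
}

# Candidate on_add callback: decode the payload and validate it.
# Throwing here is what would hand the message to on_add_catch.
my $on_add = sub {
    my ($batcher, $msg) = @_;
    my $data = decode_json($msg->{body}->payload());
    die "missing id\n" unless defined $data->{id};
    return $data;    # becomes the value of the batch item
};

my $msg  = { body => Mock::Body->new('{"id":7,"name":"demo"}') };
my $item = $on_add->(undef, $msg);
print "id=$item->{id} name=$item->{name}\n";

my $bad = { body => Mock::Body->new('{"name":"no id"}') };
my $ok  = eval { $on_add->(undef, $bad); 1 };
print "second message failed validation: $@" unless $ok;
```

Returning the decoded structure means on_batch_complete receives ready-to-use data through each item's value().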

on_add_catch

This callback is called if the on_add callback throws.

By default, the message is rejected:

return sub {
    my ($batcher, $msg, $exception) = @_;

    $batcher->reject($msg);
}

Parameters passed to the callback:

$batcher

the RabbitMQ::Consumer::Batcher instance itself

$msg

the consumed message (see "on_consume" in AnyEvent::RabbitMQ::Channel)

$exception

the exception thrown by the on_add callback
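A handler can choose between reject and reject_and_republish depending on what was thrown. The following sketch assumes a hypothetical Permanent::Error exception class and a Mock::Batcher that only records which method was called; neither is part of this module. Only the reject and reject_and_republish method names come from the documentation.

```perl
use strict;
use warnings;
use Scalar::Util qw(blessed);

# Hypothetical recorder standing in for RabbitMQ::Consumer::Batcher.
package Mock::Batcher {
    sub new {
        my ($class) = @_;
        return bless { calls => [] }, $class;
    }
    sub reject               { my ($self) = @_; push @{ $self->{calls} }, 'reject' }
    sub reject_and_republish { my ($self) = @_; push @{ $self->{calls} }, 'reject_and_republish' }
}

# Hypothetical exception class marking failures that will never succeed.
package Permanent::Error {
    sub new { my ($class, %args) = @_; return bless { %args }, $class }
}

# Candidate on_add_catch callback: drop permanently broken messages,
# republish everything else so it can be retried.
my $on_add_catch = sub {
    my ($batcher, $msg, $exception) = @_;
    if (blessed($exception) && $exception->isa('Permanent::Error')) {
        $batcher->reject($msg);
    }
    else {
        $batcher->reject_and_republish($msg);
    }
};

my $batcher = Mock::Batcher->new();
$on_add_catch->($batcher, {}, Permanent::Error->new(reason => 'bad schema'));
$on_add_catch->($batcher, {}, "temporary failure\n");
print join(', ', @{ $batcher->{calls} }), "\n";
```

Checking blessed() first keeps the handler safe when the exception is a plain string rather than an object.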

on_batch_complete

This callback is triggered when the batch is complete (the number of items reaches batch_size).

This attribute is required.

Parameters passed to the callback:

$batcher

the RabbitMQ::Consumer::Batcher instance itself

$batch

an ArrayRef of RabbitMQ::Consumer::Batcher::Item objects

Example on_batch_complete CodeRef (the item values are strings):

return sub {
    my($batcher, $batch) = @_;

    print join "\n", map { $_->value() } @$batch;
    $batcher->ack($batch);
}

on_batch_complete_catch

This callback is called if the on_batch_complete callback throws.

After this callback returns, reject_and_republish is called on the batch.

If you need to change this from reject_and_republish to, for example, reject, you can do:

return sub {
    my ($batcher, $batch, $exception) = @_;

    $batcher->reject($batch);
    # batch_clean must be called, because reject_and_republish
    # will still be called after this exception handler
    $batcher->batch_clean();
}

Parameters passed to the callback:

$batcher

the RabbitMQ::Consumer::Batcher instance itself

$batch

an ArrayRef of RabbitMQ::Consumer::Batcher::Item objects

$exception

the exception thrown by the on_batch_complete callback

consume_code()

Returns a CodeRef for handling messages in the consume method of AnyEvent::RabbitMQ::PubSub::Consumer:

$consumer->consume($cv, $batcher->consume_code());

ack(@items)

Acks all @items (instances of RabbitMQ::Consumer::Batcher::Item or RabbitMQ::Consumer::Batcher::Message).

reject(@items)

Rejects all @items (instances of RabbitMQ::Consumer::Batcher::Item or RabbitMQ::Consumer::Batcher::Message).

reject_and_republish(@items)

Rejects and republishes all @items (instances of RabbitMQ::Consumer::Batcher::Item or RabbitMQ::Consumer::Batcher::Message).

CONTRIBUTING

Dependencies are declared in cpanfile.

To resolve the dependencies, use Carton (or Carmel, which is more experimental):

carton install

To run the tests, use minil test:

carton exec minil test

If you don't have a Perl environment, the easiest way is to use Docker:

docker run -it -v $PWD:/tmp/work -w /tmp/work avastsoftware/perl-extended carton install
docker run -it -v $PWD:/tmp/work -w /tmp/work avastsoftware/perl-extended carton exec minil test

warning

Docker runs as root by default, so all files created inside the container will be owned by root.

One solution is to change the permissions inside the container:

docker run -it -v $PWD:/tmp/work -w /tmp/work avastsoftware/perl-extended bash -c "carton install; chmod -R 0777 ."

or to change them after the docker command finishes (this requires root rights).

LICENSE

Copyright (C) Avast Software.

This library is free software; you can redistribute it and/or modify it under the same terms as Perl itself.

AUTHOR

Jan Seidl <seidl@avast.com>