NAME
RabbitMQ::Consumer::Batcher - batch consumer of RMQ messages
SYNOPSIS
use AnyEvent;
use AnyEvent::RabbitMQ::PubSub;
use AnyEvent::RabbitMQ::PubSub::Consumer;
use RabbitMQ::Consumer::Batcher;
my ($rmq_connection, $channel) = AnyEvent::RabbitMQ::PubSub::connect(
host => 'localhost',
port => 5672,
user => 'guest',
pass => 'guest',
vhost => '/',
);
my $exchange = {
exchange => 'my_test_exchange',
type => 'topic',
durable => 0,
auto_delete => 1,
};
my $queue = {
queue => 'my_test_queue';
auto_delete => 1,
};
my $routing_key = 'my_rk';
my $consumer = AnyEvent::RabbitMQ::PubSub::Consumer->new(
channel => $channel,
exchange => $exchange,
queue => $queue,
routing_key => $routing_key,
);
$consumer->init(); #declares channel, queue and binding
my $batcher = RabbitMQ::Consumer::Batcher->new(
batch_size => $consumer->prefetch_count,
on_add => sub {
my ($batcher, $msg) = @_;
my $decode_payload = decode_payload($msg->{header}, $msg->{body}->payload());
return $decode_payload;
},
on_add_catch => sub {
my ($batcher, $msg, $exception) = @_;
if ($exception->$_isa('failure') && $exception->{payload}{stats_key}) {
$stats->increment($exception->{payload}{stats_key});
}
if ($exception->$_isa('failure') && $exception->{payload}{reject}) {
$batcher->reject($msg);
$log->error("consume failed - reject: $exception\n".$msg->{body}->payload());
}
else {
$batcher->reject_and_republish($msg);
$log->error("consume failed - republish: $exception");
}
},
on_batch_complete => sub {
my ($batcher, $batch) = @_;
path(...)->spew(join "\t", map { $_->value() } @$batch);
},
on_batch_complete_catch => sub {
my ($batcher, $batch, $exception) = @_;
$log->error("save messages to file failed: $exception");
}
);
my $cv = AnyEvent->condvar();
$consumer->consume($cv, $batcher->consume_code())->then(sub {
say 'Consumer was started...';
});
DESCRIPTION
If you need batch of messages from RabbitMQ - this module is for you.
This module work well with AnyEvent::RabbitMQ::PubSub::Consumer
Idea of this module is - in on_add phase is message validate and if is corrupted, can be reject. In on_batch_complete phase we manipulated with message which we don't miss. If is some problem in this phase, messages are republished..
METHODS
new(%attributes)
attributes
batch_size
Max batch size (trigger for on_batch_complete
)
batch_size
must be prefetch_count
or bigger!
this is required attribute
on_add
this callback are called after consume one single message. Is usefully for decoding for example.
return value of callback are used as value in batch item (RabbitMQ::Consumer::Batcher::Item)
default behaviour is payload of message is used as item in batch
return sub {
my($batcher, $msg) = @_;
return $msg->{body}->payload()
}
parameters which are give to callback:
$batcher
-
self instance of RabbitMQ::Consumer::Batcher
$msg
-
consumed message "on_consume" in AnyEvent::RabbitMQ::Channel
on_add_catch
this callback are called if on_add
callback throws
default behaviour do reject message
return sub {
my ($batcher, $msg, $exception) = @_;
$batcher->reject($msg);
}
parameters which are give to callback:
$batcher
-
self instance of RabbitMQ::Consumer::Batcher
$msg
-
consumed message "on_consume" in AnyEvent::RabbitMQ::Channel
$exception
-
exception string
on_batch_complete
this callback is triggered if batch is complete (count of items is batch_size
)
this is required attribute
parameters which are give to callback:
$batcher
-
self instance of RabbitMQ::Consumer::Batcher
$batch
-
batch is ArrayRef of RabbitMQ::Consumer::Batcher::Item
example on_batch_complete
CodeRef (item value are strings)
return sub {
my($batcher, $batch) = @_;
print join "\n", map { $_->value() } @$batch;
$batcher->ack($batch);
}
on_batch_complete_catch
this callback are called if on_batch_complete
callback throws
after this callback is batch reject_and_republish
If you need change reject_and_republish of batch to (for example) reject, you can do:
return sub {
my ($batcher, $batch, $exception) = @_;
$batcher->reject($batch);
#batch_clean must be called,
#because reject_and_republish after this exception handler will be called to...
$batcher->batch_clean();
}
parameters which are give to callback:
$batcher
-
self instance of RabbitMQ::Consumer::Batcher
$batch
-
ArrayRef of RabbitMQ::Consumer::Batcher::Items
$exception
-
exception string
consume_code()
return sub{}
for handling messages in consume
method of AnyEvent::RabbitMQ::PubSub::Consumer
$consumer->consume($cv, $batcher->consume_code());
ack(@items)
ack all @items
(instances of RabbitMQ::Consumer::Batcher::Item or RabbitMQ::Consumer::Batcher::Message)
reject(@items)
reject all @items
(instances of RabbitMQ::Consumer::Batcher::Item or RabbitMQ::Consumer::Batcher::Message)
reject_and_republish(@items)
reject and republish all @items
(instances of RabbitMQ::Consumer::Batcher::Item or RabbitMQ::Consumer::Batcher::Message)
contributing
for dependency use cpanfile...
for resolve dependency use Carton (or Carmel - is more experimental)
carton install
for run test use minil test
carton exec minil test
if you don't have perl environment, is best way use docker
docker run -it -v $PWD:/tmp/work -w /tmp/work avastsoftware/perl-extended carton install
docker run -it -v $PWD:/tmp/work -w /tmp/work avastsoftware/perl-extended carton exec minil test
warning
docker run default as root, all files which will be make in docker will be have root rights
one solution is change rights in docker
docker run -it -v $PWD:/tmp/work -w /tmp/work avastsoftware/perl-extended bash -c "carton install; chmod -R 0777 ."
or after docker command (but you must have root rights)
LICENSE
Copyright (C) Avast Software.
This library is free software; you can redistribute it and/or modify it under the same terms as Perl itself.
AUTHOR
Jan Seidl <seidl@avast.com>