NAME
Net::AMQP::RabbitMQ::Batch - simple batch processing of messages for RabbitMQ.
SYNOPSIS
my $rb = Net::AMQP::RabbitMQ::Batch->new('localhost', { user => 'guest', password => 'guest' }) or croak;
$rb->process({
    from_queue  => 'test_in',
    routing_key => 'test_out',
    handler     => \&msg_handler,
    batch       => {
        size          => 10,        # batch size
        timeout       => 2,         #
        ignore_size   => 0          # ignore in/out batches size mismatch
    },
    ignore_errors => 0,             # ignore handler errors
    publish_options => {
        exchange => 'exchange_out', # exchange name, default is 'amq.direct'
    },
});
sub msg_handler {
    my $messages = shift;
    # work with 10 messages
    return $messages;
}
DESCRIPTION
Assume you read messages from a queue, process them and publish. But you would like to do it in batches, processing many messages at once.
This module:
gets messages from in queue and publish them by routing key
uses your handler to batch process messages
keeps persistency - if processing fails, nothing lost from input queue, nothing published
USAGE
Define a messages handler:
sub msg_handler {
    my $messages = shift;
    # works with hashref of messages
    return $messages;
}
$messages is an arrayref of message objects:
{
    body => 'Magic Transient Payload', # the reconstructed body
    routing_key => 'nr_test_q',        # route the message took
    delivery_tag => 1,                 # (used for acks)
    ....
    # Not all of these will be present. Consult the RabbitMQ reference for more details.
    props => { ... }
}
Handler should return arrayref of message objects (only body is required):
[
    { body => 'Processed message' },
    ...
]
Connect to RabbitMQ:
my $rb = Net::AMQP::RabbitMQ::Batch->new('localhost', { user => 'guest', password => 'guest' }) or croak;
And process a batch:
$rb->process({
    from_queue  => 'test_in',
    routing_key => 'test_out',
    handler     => \&msg_handler,
    batch       => { size => 10 }
});
You might like to wrap it with while(1) {...} loop. See process_in_batches.pl or process_in_forked_batches.pl for example.
METHODS
process()
Known Issues
Can not set infinity timeout (use very long int)
No individual messages processing possible
No tests yet which is very sad :(
AUTHORS
Alex Svetkin
LICENSE
MIT