NAME

DR::TarantoolQueue::Worker - template for workers

SYNOPSIS

my $worker = DR::TarantoolQueue::Worker->new(
    count       => 10,      # defaults to 1
    queue       => $queue,
);

sub process {
    my ($task) = @_;

    # ... do something with the task
}

$worker->run(\&process);

DESCRIPTION

  • The process function can throw an exception. In that case the task will be buried (if the process function hasn't changed the task status yet).

  • If the process function didn't change the task status (didn't call "ack" or "release" in DR::TarantoolQueue::Task), the worker calls "ack" in DR::TarantoolQueue::Task.

  • The run method catches SIGTERM and SIGINT, waits until all running process functions are done, and then returns.

  • The worker uses the default tube and space of the queue, so you have to define them in your queue or in the worker.
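The first two rules above can be sketched in plain Perl. This is a minimal illustration, not the module's actual code: MockTask, its status names, and handle_task are hypothetical stand-ins introduced only for this example.

```perl
use strict;
use warnings;

# Hypothetical stand-in for a task; NOT the real DR::TarantoolQueue::Task.
package MockTask;
sub new     { my ($class) = @_; return bless { status => 'taken' }, $class }
sub status  { $_[0]{status} }
sub ack     { $_[0]{status} = 'ack' }
sub bury    { $_[0]{status} = 'buried' }
sub release { $_[0]{status} = 'ready' }

package main;

# Hypothetical helper: run the process function on one task and apply
# the documented rule. Returns the resulting task status.
sub handle_task {
    my ($task, $process) = @_;

    # eval traps an exception thrown by the process function
    my $ok = eval { $process->($task); 1 };

    # The process function already changed the status: leave it alone
    return $task->status unless $task->status eq 'taken';

    if ($ok) { $task->ack }     # finished without touching the status
    else     { $task->bury }    # threw an exception
    return $task->status;
}
```

For example, handle_task(MockTask->new, sub { die "boom" }) leaves the mock task buried, while a process function that simply returns leaves it acked.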

ATTRIBUTES

count

Number of process functions that can run at the same time. The default value is 1. The attribute only has an effect if both your process function and your queue use Coro.

queue

Reference to your queue.

space & tube

Space and tube of the queue to take tasks from.

restart

The function that will be called when restart_limit is reached.

restart_limit

How many tasks can be processed before the worker is restarted.

If restart_limit is 0, the restart mechanism is disabled.

If the restart callback isn't defined, the restart mechanism is disabled.

Each processed task increments a common task counter. When the counter reaches restart_limit, the worker doesn't take new tasks and calls the restart function. After the restart the worker continues to process tasks.

In the restart callback the user can call "exec" in perlfunc or "exit" in perlfunc to avoid memory leaks.

DR::TarantoolQueue::Worker->new(
    restart_limit   => 100,
    restart         => sub { exec perl => $0 },
    queue           => $q,
    count           => 10
)->run(sub { ... });
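The counting rule described above can be sketched as a small closure. This is only an illustration under stated assumptions: make_restart_counter is a hypothetical helper, not part of DR::TarantoolQueue::Worker, and resetting the counter after each restart is an assumption the text does not confirm.

```perl
use strict;
use warnings;

# Hypothetical helper: returns a closure to call once per processed task.
# The closure returns 1 when it fired the restart callback, 0 otherwise.
sub make_restart_counter {
    my ($limit, $restart) = @_;
    my $count = 0;

    return sub {
        # Mechanism is disabled when the limit is 0 or no callback is given
        return 0 unless $limit and $restart;

        # Limit not reached yet: keep taking tasks
        return 0 if ++$count < $limit;

        $count = 0;       # assumption: counting starts over after a restart
        $restart->();
        return 1;
    };
}
```

With a limit of 3, the callback fires on the third and sixth processed task; with a limit of 0 or an undefined callback it never fires.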

PRIVATE ATTRIBUTES

timeout

Timeout for queue.take.

is_run

True means that the workers are running.

is_stopping

True means that the workers are stopping (by SIGTERM/SIGINT/stop).

METHODS

run(CODEREF[, CODEREF])

Run workers. Two arguments:

process function

The function will receive three arguments:

  • task
  • queue
  • task number

debug function

The function can be used to show internal debug messages.
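A minimal sketch of the two callbacks, assuming nothing beyond the argument list given above. The arguments passed to the debug function are not documented here, so the sketch simply joins whatever it receives; @log is a hypothetical in-memory sink used only for illustration.

```perl
use strict;
use warnings;

my @log;    # hypothetical sink for messages, for illustration only

# Process function: receives the task, the queue, and the task number.
sub process {
    my ($task, $queue, $task_number) = @_;
    push @log, "processing task #$task_number";
    return;
}

# Debug function: its arguments are an assumption, so it accepts any list.
sub debug {
    push @log, join ' ', 'debug:', map { defined $_ ? $_ : 'undef' } @_;
    return;
}

# With a configured $worker, both callbacks would be passed to run:
# $worker->run(\&process, \&debug);
```

Keeping the callbacks as named subs makes them easy to call directly in tests without a running queue.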

stop

Stop the worker cycle.