NAME

Minion::Backend::mysql - MySQL backend

VERSION

version 1.008

SYNOPSIS

use Mojolicious::Lite;

plugin Minion => {mysql => 'mysql://user@127.0.0.1/minion_jobs'};

# Slow task
app->minion->add_task(poke_mojo => sub {
  my $job = shift;
  $job->app->ua->get('mojolicio.us');
  $job->app->log->debug('We have poked mojolicio.us for a visitor');
});

# Perform job in a background worker process
get '/' => sub {
  my $c = shift;
  $c->minion->enqueue('poke_mojo');
  $c->render(text => 'We will poke mojolicio.us for you soon.');
};

app->start;

DESCRIPTION

Minion::Backend::mysql is a backend for Minion based on Mojo::mysql. All necessary tables will be created automatically with a set of migrations named minion. This backend requires at least v5.6.5 of MySQL.

ATTRIBUTES

Minion::Backend::mysql inherits all attributes from Minion::Backend and implements the following new ones.

mysql

my $mysql   = $backend->mysql;
$backend = $backend->mysql(Mojo::mysql->new);

Mojo::mysql object used to store all data.

no_txn

If true, "enqueue" will not wrap its insertions in a transaction when a job has parent jobs. Without a transaction, the job could be dequeued before its parent relationships are written to the database. However, MySQL does not support nested transactions (despite supporting something almost exactly like them...), so if you run your tests inside a transaction that is rolled back when the test completes, set this attribute to disable the inner transaction.
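A minimal sketch of how a test might apply this, assuming no_txn can be set like any other attribute accessor. The helper name prepare_backend_for_test is hypothetical.

```perl
use strict;
use warnings;

# Hypothetical test helper: turn off the enqueue transaction so a test
# that already runs inside its own transaction does not try to nest one.
# Assumes $backend->no_txn($bool) works as a setter.
sub prepare_backend_for_test {
  my $backend = shift;
  $backend->no_txn(1);
  return $backend;
}
```

Outside of tests the attribute is best left false, so that a job and its parent relationships are written together.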

METHODS

Minion::Backend::mysql inherits all methods from Minion::Backend and implements the following new ones.

dequeue

my $job_info = $backend->dequeue($worker_id, 0.5);
my $job_info = $backend->dequeue($worker_id, 0.5, {queues => ['important']});

Wait a given amount of time in seconds for a job, dequeue it and transition it from inactive to active state, or return undef if the queues were empty.

These options are currently available:

min_priority
min_priority => 3

Do not dequeue jobs with a lower priority.

queues
queues => ['important']

One or more queues to dequeue jobs from, defaults to default.

tasks
tasks => ['foo', 'bar']

One or more tasks to dequeue jobs for, defaults to all tasks.

These fields are currently available:

args
args => ['foo', 'bar']

Job arguments.

id
id => '10023'

Job ID.

retries
retries => 3

Number of times job has been retried.

task
task => 'foo'

Task name.
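The fields above are enough to drive one dequeue-and-perform cycle by hand. The following is a sketch only: the helper perform_one and the $tasks hash (task name to code reference) are hypothetical, and real workers are normally driven by Minion itself rather than by calling the backend directly.

```perl
use strict;
use warnings;

# Dequeue at most one job and run it with a plain code reference.
# $tasks is a hypothetical hash of task name => code reference.
# Returns the id of the processed job, or undef if nothing was dequeued.
sub perform_one {
  my ($backend, $worker_id, $tasks, $options) = @_;

  # Wait up to half a second for a job
  my $job = $backend->dequeue($worker_id, 0.5, $options || {});
  return undef unless $job;

  my ($id, $retries) = @{$job}{qw(id retries)};
  my $code = $tasks->{$job->{task}};

  # Run the task and record the outcome with finish_job or fail_job
  my $result;
  my $ok = eval {
    die "Unknown task \"$job->{task}\"\n" unless $code;
    $result = $code->(@{$job->{args}});
    1;
  };
  if ($ok) { $backend->finish_job($id, $retries, $result) }
  else     { $backend->fail_job($id, $retries, $@) }

  return $id;
}
```

Passing {queues => ['important']} as the last argument would restrict the lookup to that queue, as in the second call shown at the top of this section.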

dispatch_schedules

my $dispatched = $backend->dispatch_schedules;

Enqueue jobs for all schedules whose firing time has been reached, advance their firing times to the next match, and return information about each dispatch as an array reference. An advisory lock is held in the database for the duration of the dispatch cycle so that multiple workers ticking at the same time will not produce duplicate enqueues.

Each entry contains id, job and name.
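As a small illustration of working with the returned array reference, the sketch below formats one log line per dispatch. It uses only the three documented keys; the helper name describe_dispatches is hypothetical.

```perl
use strict;
use warnings;

# Turn the array reference returned by dispatch_schedules into log lines.
# Only the documented keys (id, job, name) are used.
sub describe_dispatches {
  my $dispatched = shift;
  return map {
    sprintf 'schedule "%s" (id %s) enqueued job %s', @{$_}{qw(name id job)}
  } @$dispatched;
}
```

A caller could then write something like: print "$_\n" for describe_dispatches($backend->dispatch_schedules);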

enqueue

my $job_id = $backend->enqueue('foo');
my $job_id = $backend->enqueue(foo => [@args]);
my $job_id = $backend->enqueue(foo => [@args] => {priority => 1});

Enqueue a new job with inactive state.

These options are currently available:

attempts
attempts => 25

Number of times performing this job will be attempted, with a delay based on "backoff" in Minion after the first attempt, defaults to 1.

delay
delay => 10

Delay job for this many seconds (from now).

expire
expire => 300

Job is valid for this many seconds (from now) before it expires.

lax
lax => 1

Existing jobs this job depends on may also have transitioned to the failed state to allow for it to be processed, defaults to false.

notes
notes => {foo => 'bar', baz => [1, 2, 3]}

Hash reference with arbitrary metadata for this job.

parents
parents => [$id1, $id2, $id3]

One or more existing jobs this job depends on, and that need to have transitioned to the state finished before it can be processed.

priority
priority => 5

Job priority, defaults to 0.

queue
queue => 'important'

Queue to put job in, defaults to default.
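The parents option can be used to build a simple chain of dependent jobs. The sketch below enqueues each step with the previous job as its only parent; the helper enqueue_chain and the task names in the usage note are hypothetical.

```perl
use strict;
use warnings;

# Enqueue a list of tasks so that each job depends on the one before it.
# Each element of @steps is [task_name, \@args]; args may be omitted.
# Returns the job ids in the order they were enqueued.
sub enqueue_chain {
  my ($backend, @steps) = @_;
  my @ids;
  for my $step (@steps) {
    my ($task, $args) = @$step;

    # The first job has no parents, every later job waits for its predecessor
    my $options = @ids ? {parents => [$ids[-1]]} : {};
    push @ids, $backend->enqueue($task, $args || [], $options);
  }
  return @ids;
}
```

For example, enqueue_chain($backend, [fetch => [$url]], ['parse'], ['store']) would enqueue three jobs, where parse waits for fetch and store waits for parse. Adding lax => 1 to the options would let a job run even when its parent has failed.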

fail_job

my $bool = $backend->fail_job($job_id, $retries);
my $bool = $backend->fail_job($job_id, $retries, 'Something went wrong!');
my $bool = $backend->fail_job(
  $job_id, $retries, {msg => 'Something went wrong!'});

Transition from active to failed state.

finish_job

my $bool = $backend->finish_job($job_id, $retries);
my $bool = $backend->finish_job($job_id, $retries, 'All went well!');
my $bool = $backend->finish_job($job_id, $retries, {msg => 'All went well!'});

Transition from active to finished state.

job_info

my $job_info = $backend->job_info($job_id);

Get information about a job or return undef if job does not exist.

# Check job state
my $state = $backend->job_info($job_id)->{state};

# Get job result
my $result = $backend->job_info($job_id)->{result};

These fields are currently available:

args
args => ['foo', 'bar']

Job arguments.

created
created => 784111777

Time job was created.

delayed
delayed => 784111777

Time job was delayed to.

finished
finished => 784111777

Time job was finished.

priority
priority => 3

Job priority.

queue
queue => 'important'

Queue name.

result
result => 'All went well!'

Job result.

retried
retried => 784111777

Time job has been retried.

retries
retries => 3

Number of times job has been retried.

started
started => 784111777

Time job was started.

state
state => 'inactive'

Current job state, usually active, failed, finished or inactive.

task
task => 'foo'

Task name.

worker
worker => '154'

Id of worker that is processing the job.
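The time fields can be combined to see how long a job waited and ran. This sketch assumes, as the sample values suggest, that created, started and finished are epoch seconds; the helper name job_timings is hypothetical.

```perl
use strict;
use warnings;

# Derive simple timings from a job_info hash reference.
# A value is undef when the job has not reached that point yet.
sub job_timings {
  my $info = shift;
  my ($created, $started, $finished) = @{$info}{qw(created started finished)};
  return {
    # Seconds between creation and start (includes any requested delay)
    waited => defined($started) ? $started - $created : undef,
    # Seconds between start and finish
    ran => (defined($started) && defined($finished)) ? $finished - $started : undef,
  };
}
```

Since job_info returns undef for an unknown job, check the return value before passing it to a helper like this one.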

list_jobs

my $batch = $backend->list_jobs($offset, $limit);
my $batch = $backend->list_jobs($offset, $limit, {state => 'inactive'});

Returns the same information as "job_info" but in batches.

These options are currently available:

state
state => 'inactive'

List only jobs in this state.

task
task => 'test'

List only jobs for this task.

list_schedules

my $results = $backend->list_schedules($offset, $limit);
my $results = $backend->list_schedules($offset, $limit, {names => ['daily']});

Returns information about schedules in batches.

# Get the total number of results (without limit)
my $num = $backend->list_schedules(0, 100)->{total};

# Check next firing time
my $results = $backend->list_schedules(0, 1, {names => ['daily']});
my $next    = $results->{schedules}[0]{next_run};

These options are currently available:

before
before => 23

List only schedules before this id.

ids
ids => ['23', '24']

List only schedules with these ids.

names
names => ['foo', 'bar']

List only schedules with these names.

These fields are currently available:

args
args => ['foo', 'bar']

Job arguments used for each enqueued job.

attempts
attempts => 25

Number of attempts each enqueued job will get.

created
created => 784111777

Epoch time the schedule was created.

cron
cron => '0 9 * * 1-5'

Cron expression.

expire
expire => 300

Expiration in seconds for each enqueued job.

id
id => 23

Schedule id.

last_job
last_job => '10025'

Id of the most recently enqueued job, or undef if the schedule has not fired yet.

last_run
last_run => 784111777

Epoch time the schedule last fired, or undef if it has not fired yet.

lax
lax => 0

Lax dependency setting for each enqueued job.

name
name => 'daily'

Schedule name.

next_run
next_run => 784111777

Epoch time the schedule will fire next.

notes
notes => {foo => 'bar'}

Hash reference with arbitrary metadata applied to each enqueued job.

paused
paused => 0

True if the schedule is paused and will not fire.

priority
priority => 0

Priority of each enqueued job.

queue
queue => 'default'

Queue each enqueued job is placed in.

task
task => 'foo'

Task name.
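Because results come back in batches, collecting every schedule means walking the offset. The sketch below relies only on the schedules and total keys used in the examples above; the helper name all_schedules and its default page size are hypothetical.

```perl
use strict;
use warnings;

# Collect every schedule by walking the result set one page at a time.
# Relies on the "schedules" and "total" keys of the returned hash reference.
sub all_schedules {
  my ($backend, $page_size, $options) = @_;
  $page_size ||= 100;
  my @all;
  while (1) {
    # The number of rows collected so far is the next offset
    my $results = $backend->list_schedules(scalar @all, $page_size, $options || {});
    my $page = $results->{schedules} || [];
    push @all, @$page;
    last if !@$page || @all >= $results->{total};
  }
  return \@all;
}
```

Offsets can shift if schedules are created or removed while paging, so treat the result as a snapshot rather than an exact listing.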

list_workers

my $results = $backend->list_workers($offset, $limit);
my $results = $backend->list_workers($offset, $limit, {ids => [23]});

Returns information about workers in batches.

# Get the total number of results (without limit)
my $num = $backend->list_workers(0, 100)->{total};

# Check worker host
my $results = $backend->list_workers(0, 1, {ids => [$worker_id]});
my $host    = $results->{workers}[0]{host};

These options are currently available:

before
before => 23

List only workers before this id.

ids
ids => ['23', '24']

List only workers with these ids.

These fields are currently available:

id
id => 22

Worker id.

host
host => 'localhost'

Worker host.

jobs
jobs => ['10023', '10024', '10025', '10029']

Ids of jobs the worker is currently processing.

notified
notified => 784111777

Epoch time worker sent the last heartbeat.

pid
pid => 12345

Process id of worker.

started
started => 784111777

Epoch time worker was started.

status
status => {queues => ['default', 'important']}

Hash reference with whatever status information the worker would like to share.
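The notified field makes it possible to spot workers that have stopped sending heartbeats. A minimal sketch follows; the helper name stale_worker_ids and the choice of threshold are assumptions, and the "repair" method already handles this kind of cleanup for you.

```perl
use strict;
use warnings;

# Return the ids of workers whose last heartbeat is older than $max_age
# seconds. $results is the hash reference returned by list_workers.
# $now defaults to the current epoch time and exists mainly for testing.
sub stale_worker_ids {
  my ($results, $max_age, $now) = @_;
  $now //= time;
  return [
    map  { $_->{id} }
    grep { $now - $_->{notified} > $max_age } @{$results->{workers}}
  ];
}
```

For example, stale_worker_ids($backend->list_workers(0, 100), 300) lists workers among the first 100 that have been silent for more than five minutes.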

new

my $backend = Minion::Backend::mysql->new('mysql://mysql@/test');

Construct a new Minion::Backend::mysql object.

note

my $bool = $backend->note($job_id, {mojo => 'rocks', minion => 'too'});

Change one or more metadata fields for a job. Setting a value to undef will remove the field.

pause_schedule

my $bool = $backend->pause_schedule('daily');

Pause a schedule by name so it stops firing until resumed. Returns true on success, false if the schedule does not exist.

receive

my $commands = $backend->receive($worker_id);

Receive remote control commands for worker.

register_worker

my $worker_id = $backend->register_worker;
my $worker_id = $backend->register_worker($worker_id);

Register worker or send heartbeat to show that this worker is still alive.

remove_job

my $bool = $backend->remove_job($job_id);

Remove failed, finished or inactive job from queue.

repair

$backend->repair;

Repair worker registry and job queue if necessary.

reset

$backend->reset;

Reset job queue.

resume_schedule

my $bool = $backend->resume_schedule('daily');

Resume a previously paused schedule. Returns true on success, false if the schedule does not exist.
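Together with "pause_schedule", this allows a schedule to be held back while some maintenance code runs. The sketch below is one way to pair the two calls; the helper name with_schedule_paused is hypothetical.

```perl
use strict;
use warnings;

# Pause a schedule, run $code, then resume the schedule even if $code dies.
# Returns undef if the schedule does not exist, otherwise an array
# reference with whatever $code returned.
sub with_schedule_paused {
  my ($backend, $name, $code) = @_;
  return undef unless $backend->pause_schedule($name);

  my @result;
  my $ok  = eval { @result = $code->(); 1 };
  my $err = $@;

  # Always resume, then rethrow the original error if there was one
  $backend->resume_schedule($name);
  die $err unless $ok;
  return \@result;
}
```

Note that this resumes the schedule unconditionally, so a schedule that was already paused before the call ends up resumed. Check the paused field from "list_schedules" first if that matters.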

retry_job

my $bool = $backend->retry_job($job_id, $retries);
my $bool = $backend->retry_job($job_id, $retries, {delay => 10});

Transition from failed or finished state back to inactive.

These options are currently available:

delay
delay => 10

Delay job for this many seconds (from now).

parents
parents => [$id1, $id2, $id3]

Jobs this job depends on.

priority
priority => 5

Job priority.

queue
queue => 'important'

Queue to put job in.
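The delay option can be used to space out repeated retries. In this sketch the delay doubles with every retry; the formula is an illustrative choice and not the library's own backoff policy, and the helper name retry_with_delay is hypothetical.

```perl
use strict;
use warnings;

# Retry a job with a delay that grows with the number of retries so far.
# The doubling formula is illustrative only: $base, 2 * $base, 4 * $base, ...
sub retry_with_delay {
  my ($backend, $job_id, $retries, $base) = @_;
  $base ||= 10;
  my $delay = $base * 2**$retries;
  return $backend->retry_job($job_id, $retries, {delay => $delay});
}
```

With the default base of 10 seconds, a job that has already been retried three times would be delayed by 80 seconds.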

schedule

my $id = $backend->schedule('daily', '0 4 * * *', 'cleanup');
my $id = $backend->schedule('daily', '0 4 * * *', 'cleanup', [@args]);
my $id = $backend->schedule(
  'daily', '0 4 * * *', 'cleanup', [@args], {priority => 5});

Create or replace a schedule by unique name. Updating a schedule with the same cron expression preserves its current firing time; changing the expression recomputes it.

These options are currently available:

attempts
attempts => 25

Number of times performing each enqueued job will be attempted, defaults to 1.

expire
expire => 300

Each enqueued job is valid for this many seconds before it expires.

lax
lax => 1

Existing jobs each enqueued job depends on may also have transitioned to the failed state, defaults to false.

notes
notes => {foo => 'bar'}

Hash reference with arbitrary metadata applied to each enqueued job.

priority
priority => 5

Priority of each enqueued job, defaults to 0.

queue
queue => 'important'

Queue to put each enqueued job in, defaults to default.
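Since schedules are created or replaced by unique name, they can be kept in a plain data structure and applied repeatedly, for example at application startup. This is a sketch; the helper name sync_schedules and the entry layout are hypothetical.

```perl
use strict;
use warnings;

# Create or replace several schedules from a list of hash references.
# Each entry has name, cron and task, plus optional args and options.
# Returns a hash reference mapping schedule name to schedule id.
sub sync_schedules {
  my ($backend, @entries) = @_;
  my %ids;
  for my $entry (@entries) {
    $ids{$entry->{name}} = $backend->schedule(
      @{$entry}{qw(name cron task)},
      $entry->{args}    || [],
      $entry->{options} || {},
    );
  }
  return \%ids;
}
```

Re-running this with unchanged cron expressions should leave the current firing times alone, as described above.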

stats

my $stats = $backend->stats;

Get statistics for jobs and workers.

unregister_worker

$backend->unregister_worker($worker_id);

Unregister worker.

worker_info

my $worker_info = $backend->worker_info($worker_id);

Get information about a worker or return undef if worker does not exist.

# Check worker host
my $host = $backend->worker_info($worker_id)->{host};

These fields are currently available:

host
host => 'localhost'

Worker host.

jobs
jobs => ['10023', '10024', '10025', '10029']

Ids of jobs the worker is currently processing.

notified
notified => 784111777

Last time worker sent a heartbeat.

pid
pid => 12345

Process id of worker.

started
started => 784111777

Time worker was started.

PREFETCH AND JOB VOLUME

Currently, the query to look for the next job to run is slower than it could be. Between features like dependencies, priorities, retries, and expiration, the query is complicated and requires some expensive data manipulation. It still generally runs in less than one second even for hundreds of thousands of jobs. But if jobs take less than one second to process, a worker ends up spending most of its time looking up new jobs, and the effort spent putting the pending jobs in the right order is wasted.

As of v1.007, this backend prefetches a number of jobs in one query and feeds them to worker processes as they ask. If a prefetched job has already been claimed, the worker tries to claim the next one.

If you are not seeing the throughput you expect, and your jobs are shorter than one second, you can change the $Minion::Backend::mysql::PREFETCH value to be higher. A good goal is for a worker to perform work for a few seconds before going back to the database. The prefetch cache expires after 30 seconds (which can be adjusted with $Minion::Backend::mysql::PREFETCH_EXPIRY).
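As a rough illustration of the sizing advice above, the sketch below estimates a prefetch value from the average job duration. The helper suggest_prefetch, its parameters and the sample numbers are hypothetical, and it assumes the prefetched jobs are shared by all of a worker's processes.

```perl
use strict;
use warnings;

# Estimate a prefetch size: enough jobs to keep $processes worker
# processes busy for about $busy_seconds between database lookups.
sub suggest_prefetch {
  my ($avg_job_seconds, $processes, $busy_seconds) = @_;
  my $n = int($processes * $busy_seconds / $avg_job_seconds + 0.5);
  return $n < 1 ? 1 : $n;
}

{
  # Silences "used only once" when the backend module is not loaded here
  no warnings 'once';

  # Package variables named in the text above; set them before workers start.
  # Sample numbers: jobs of 0.1 seconds, 4 processes, about 3 seconds of work.
  $Minion::Backend::mysql::PREFETCH        = suggest_prefetch(0.1, 4, 3);
  $Minion::Backend::mysql::PREFETCH_EXPIRY = 30;    # the expiry stated above
}
```

Measure throughput before and after changing the value, since a larger prefetch also means more jobs that other workers may already have claimed by the time they are handed out.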

ERRORS

DBD::mysql::st execute failed: Table '*.minion_workers' doesn't exist

This may happen when the SQL create/upgrade scripts fail to run completely due to permission errors. Re-running with the environment variable MOJO_MIGRATIONS_DEBUG=1 should produce the error message returned by the database.

A common reason for the database install to fail on MySQL >= 8 is that the user installing the database does not have the SUPER privilege needed to create functions when binary logging is enabled. The error reads: DBD::mysql::st execute failed: You do not have the SUPER privilege and binary logging is enabled. See the MySQL documentation for Stored Program Binary Logging for more information about this problem and how to correct it.

SEE ALSO

Minion, Minion::Guide, https://minion.pm, Mojolicious::Guides, https://mojolicious.org.

AUTHORS

  • Brian Medley <bpmedley@cpan.org>

  • Doug Bell <preaction@cpan.org>

CONTRIBUTORS

  • a-leelan <40534142+a-leelan@users.noreply.github.com>

  • Alexander Nalobin <nalobin@reg.ru>

  • Dmitry Krylov <pentabion@gmail.com>

  • Hu Yin <huyin8@gmail.com>

  • Jason A. Crome <jcrome@empoweredbenefits.com>

  • Larry Leszczynski <larryl@cpan.org>

  • Olaf Alders <olaf@wundersolutions.com>

  • Paul Cochrane <paul@liekut.de>

  • Peter Joh <peter.joh@grantstreet.com>

  • Sergey Andreev <40195653+saintserge@users.noreply.github.com>

  • Zoffix Znet <cpan@zoffix.com>

COPYRIGHT AND LICENSE

This software is copyright (c) 2021 by Doug Bell and Brian Medley.

This is free software; you can redistribute it and/or modify it under the same terms as the Perl 5 programming language system itself.