NAME
Minion::Backend::mysql - MySQL backend
VERSION
version 1.008
SYNOPSIS
use Mojolicious::Lite;
plugin Minion => {mysql => 'mysql://user@127.0.0.1/minion_jobs'};
# Slow task
app->minion->add_task(poke_mojo => sub {
  my $job = shift;
  $job->app->ua->get('mojolicio.us');
  $job->app->log->debug('We have poked mojolicio.us for a visitor');
});
# Perform job in a background worker process
get '/' => sub {
  my $c = shift;
  $c->minion->enqueue('poke_mojo');
  $c->render(text => 'We will poke mojolicio.us for you soon.');
};
app->start;
DESCRIPTION
Minion::Backend::mysql is a backend for Minion based on Mojo::mysql. All necessary tables will be created automatically with a set of migrations named minion. This backend requires at least v5.6.5 of MySQL.
ATTRIBUTES
Minion::Backend::mysql inherits all attributes from Minion::Backend and implements the following new ones.
mysql
my $mysql = $backend->mysql;
$backend = $backend->mysql(Mojo::mysql->new);
Mojo::mysql object used to store all data.
no_txn
If true, "enqueue" will not wrap its insertions in a transaction when a job has parent jobs. Without a transaction, the job could be dequeued before its parent relationships are written to the database. This attribute exists mainly for testing: MySQL does not support nested transactions (despite supporting something almost exactly like them), so if your tests run inside a transaction that is rolled back when the test completes, set this attribute to disable the backend's own transaction.
METHODS
Minion::Backend::mysql inherits all methods from Minion::Backend and implements the following new ones.
dequeue
my $job_info = $backend->dequeue($worker_id, 0.5);
my $job_info = $backend->dequeue($worker_id, 0.5, {queues => ['important']});
Wait for job, dequeue it and transition from inactive to active state or return undef if queues were empty.
These options are currently available:
- min_priority
  min_priority => 3
  Do not dequeue jobs with a lower priority.
- queues
  queues => ['important']
  One or more queues to dequeue jobs from, defaults to default.
- tasks
  tasks => ['foo', 'bar']
  One or more tasks to dequeue jobs for, defaults to all tasks.
These fields are currently available:
- args
  args => ['foo', 'bar']
  Job arguments.
- id
  id => '10023'
  Job ID.
- retries
  retries => 3
  Number of times job has been retried.
- task
  task => 'foo'
  Task name.
dispatch_schedules
my $dispatched = $backend->dispatch_schedules;
Enqueue jobs for all schedules whose firing time has been reached, advance their firing times to the next match, and return information about each dispatch as an array reference. A MySQL advisory lock is held for the duration of the dispatch cycle so that multiple workers ticking at the same time will not produce duplicate enqueues.
Each entry contains id, job and name.
enqueue
my $job_id = $backend->enqueue('foo');
my $job_id = $backend->enqueue(foo => [@args]);
my $job_id = $backend->enqueue(foo => [@args] => {priority => 1});
Enqueue a new job with inactive state.
These options are currently available:
- attempts
  attempts => 25
  Number of times performing this job will be attempted, with a delay based on "backoff" in Minion after the first attempt, defaults to 1.
- delay
  delay => 10
  Delay job for this many seconds (from now).
- expire
  expire => 300
  Job is valid for this many seconds (from now) before it expires.
- lax
  lax => 1
  Existing jobs this job depends on may also have transitioned to the failed state to allow for it to be processed, defaults to false.
- notes
  notes => {foo => 'bar', baz => [1, 2, 3]}
  Hash reference with arbitrary metadata for this job.
- parents
  parents => [$id1, $id2, $id3]
  One or more existing jobs this job depends on, and that need to have transitioned to the state finished before it can be processed.
- priority
  priority => 5
  Job priority, defaults to 0.
- queue
  queue => 'important'
  Queue to put job in, defaults to default.
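The parents option can be used to build a simple pipeline in which each job waits for the one before it. The following is a minimal sketch under stated assumptions: enqueue_chain is a hypothetical helper (not part of this module), and $backend is assumed to be any object that provides the "enqueue" method described above.

```perl
use strict;
use warnings;

# Hypothetical helper: enqueue a list of [task, args] pairs so that each
# job lists the previously enqueued job as its only parent.
sub enqueue_chain {
  my ($backend, @steps) = @_;
  my @ids;
  for my $step (@steps) {
    my ($task, $args) = @$step;
    # The first job has no parent; every later job depends on the last id
    my %options = @ids ? (parents => [$ids[-1]]) : ();
    push @ids, $backend->enqueue($task => $args || [] => \%options);
  }
  return @ids;
}
```

Because lax defaults to false, a failed job in the chain would keep the jobs after it from being processed until it is retried and finishes.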
fail_job
my $bool = $backend->fail_job($job_id, $retries);
my $bool = $backend->fail_job($job_id, $retries, 'Something went wrong!');
my $bool = $backend->fail_job(
  $job_id, $retries, {msg => 'Something went wrong!'});
Transition from active to failed state.
finish_job
my $bool = $backend->finish_job($job_id, $retries);
my $bool = $backend->finish_job($job_id, $retries, 'All went well!');
my $bool = $backend->finish_job($job_id, $retries, {msg => 'All went well!'});
Transition from active to finished state.
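To show how "dequeue", "fail_job" and "finish_job" fit together, here is a rough sketch of a single processing step. It is only an illustration: real applications would normally let Minion's own worker drive the backend. The work_once helper and the handlers hash (task name to code reference) are hypothetical, and $worker_id is assumed to come from "register_worker".

```perl
use strict;
use warnings;

# Hypothetical helper: process at most one job and report the outcome
# to the backend. Returns nothing if no job was available.
sub work_once {
  my ($backend, $worker_id, $handlers) = @_;

  # Wait up to half a second; undef means the queues were empty
  my $job = $backend->dequeue($worker_id, 0.5) or return;

  my $handler = $handlers->{$job->{task}};
  return $backend->fail_job($job->{id}, $job->{retries}, 'Unknown task')
    unless $handler;

  # Run the handler, recording either its return value or the exception
  my $result;
  my $ok = eval { $result = $handler->(@{$job->{args}}); 1 };
  return $backend->fail_job($job->{id}, $job->{retries}, "$@") unless $ok;
  return $backend->finish_job($job->{id}, $job->{retries}, $result);
}
```

The retries value from the dequeued job is passed back unchanged, matching the signatures shown above.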
job_info
my $job_info = $backend->job_info($job_id);
Get information about a job or return undef if job does not exist.
# Check job state
my $state = $backend->job_info($job_id)->{state};
# Get job result
my $result = $backend->job_info($job_id)->{result};
These fields are currently available:
- args
  args => ['foo', 'bar']
  Job arguments.
- created
  created => 784111777
  Time job was created.
- delayed
  delayed => 784111777
  Time job was delayed to.
- finished
  finished => 784111777
  Time job was finished.
- priority
  priority => 3
  Job priority.
- queue
  queue => 'important'
  Queue name.
- result
  result => 'All went well!'
  Job result.
- retried
  retried => 784111777
  Time job has been retried.
- retries
  retries => 3
  Number of times job has been retried.
- started
  started => 784111777
  Time job was started.
- state
  state => 'inactive'
  Current job state, usually active, failed, finished or inactive.
- task
  task => 'foo'
  Task name.
- worker
  worker => '154'
  Id of worker that is processing the job.
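The state field makes it possible to poll for completion. The sketch below assumes a hypothetical wait_for_job helper and a $backend object providing "job_info" as described above. The timeout and polling interval are illustrative values only.

```perl
use strict;
use warnings;
use Time::HiRes qw(sleep time);

# Hypothetical helper: poll job_info until the job is finished or failed.
# Returns the job info hash, or nothing if the job does not exist or the
# timeout (in seconds) is reached first.
sub wait_for_job {
  my ($backend, $job_id, $timeout, $interval) = @_;
  $interval //= 0.25;
  my $deadline = time() + $timeout;
  while (1) {
    my $info = $backend->job_info($job_id) or return;
    return $info
      if $info->{state} eq 'finished' || $info->{state} eq 'failed';
    return if time() >= $deadline;
    sleep $interval;
  }
}
```

Each poll is a database query, so a short interval on many jobs at once would add noticeable load.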
list_jobs
my $batch = $backend->list_jobs($offset, $limit);
my $batch = $backend->list_jobs($offset, $limit, {state => 'inactive'});
Returns the same information as "job_info" but in batches.
These options are currently available:
- state
  state => 'inactive'
  List only jobs in this state.
- task
  task => 'test'
  List only jobs for this task.
list_schedules
my $results = $backend->list_schedules($offset, $limit);
my $results = $backend->list_schedules($offset, $limit, {names => ['daily']});
Returns information about schedules in batches.
# Get the total number of results (without limit)
my $num = $backend->list_schedules(0, 100)->{total};
# Check next firing time
my $results = $backend->list_schedules(0, 1, {names => ['daily']});
my $next = $results->{schedules}[0]{next_run};
These options are currently available:
- before
  before => 23
  List only schedules before this id.
- ids
  ids => ['23', '24']
  List only schedules with these ids.
- names
  names => ['foo', 'bar']
  List only schedules with these names.
These fields are currently available:
- args
  args => ['foo', 'bar']
  Job arguments used for each enqueued job.
- attempts
  attempts => 25
  Number of attempts each enqueued job will get.
- created
  created => 784111777
  Epoch time the schedule was created.
- cron
  cron => '0 9 * * 1-5'
  Cron expression.
- expire
  expire => 300
  Expiration in seconds for each enqueued job.
- id
  id => 23
  Schedule id.
- last_job
  last_job => '10025'
  Id of the most recently enqueued job, or undef if the schedule has not fired yet.
- last_run
  last_run => 784111777
  Epoch time the schedule last fired, or undef if it has not fired yet.
- lax
  lax => 0
  Lax dependency setting for each enqueued job.
- name
  name => 'daily'
  Schedule name.
- next_run
  next_run => 784111777
  Epoch time the schedule will fire next.
- notes
  notes => {foo => 'bar'}
  Hash reference with arbitrary metadata applied to each enqueued job.
- paused
  paused => 0
  True if the schedule is paused and will not fire.
- priority
  priority => 0
  Priority of each enqueued job.
- queue
  queue => 'default'
  Queue each enqueued job is placed in.
- task
  task => 'foo'
  Task name.
list_workers
my $results = $backend->list_workers($offset, $limit);
my $results = $backend->list_workers($offset, $limit, {ids => [23]});
Returns information about workers in batches.
# Get the total number of results (without limit)
my $num = $backend->list_workers(0, 100)->{total};
# Check worker host
my $results = $backend->list_workers(0, 1, {ids => [$worker_id]});
my $host = $results->{workers}[0]{host};
These options are currently available:
- before
  before => 23
  List only workers before this id.
- ids
  ids => ['23', '24']
  List only workers with these ids.
These fields are currently available:
- id
  id => 22
  Worker id.
- host
  host => 'localhost'
  Worker host.
- jobs
  jobs => ['10023', '10024', '10025', '10029']
  Ids of jobs the worker is currently processing.
- notified
  notified => 784111777
  Epoch time worker sent the last heartbeat.
- pid
  pid => 12345
  Process id of worker.
- started
  started => 784111777
  Epoch time worker was started.
- status
  status => {queues => ['default', 'important']}
  Hash reference with whatever status information the worker would like to share.
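Since results arrive in batches, collecting every worker means walking the offset until the reported total is reached. The following sketch assumes a hypothetical all_workers helper and relies only on the workers and total keys used in the examples above. The batch size of 100 is an arbitrary choice.

```perl
use strict;
use warnings;

# Hypothetical helper: fetch all workers by paging through list_workers.
sub all_workers {
  my ($backend, $batch_size) = @_;
  $batch_size ||= 100;
  my ($offset, @workers) = (0);
  while (1) {
    my $results = $backend->list_workers($offset, $batch_size);
    my $batch   = $results->{workers} || [];
    push @workers, @$batch;
    $offset += @$batch;
    # Stop on an empty batch or once every reported worker has been seen
    last if !@$batch || $offset >= $results->{total};
  }
  return \@workers;
}
```

Workers that register or unregister while paging is in progress may be missed or seen twice, so the result is best treated as a snapshot.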
new
my $backend = Minion::Backend::mysql->new('mysql://mysql@/test');
Construct a new Minion::Backend::mysql object.
note
my $bool = $backend->note($job_id, {mojo => 'rocks', minion => 'too'});
Change one or more metadata fields for a job. Setting a value to undef will remove the field.
pause_schedule
my $bool = $backend->pause_schedule('daily');
Pause a schedule by name so it stops firing until resumed. Returns true on success, false if the schedule does not exist.
receive
my $commands = $backend->receive($worker_id);
Receive remote control commands for worker.
register_worker
my $worker_id = $backend->register_worker;
my $worker_id = $backend->register_worker($worker_id);
Register worker or send heartbeat to show that this worker is still alive.
remove_job
my $bool = $backend->remove_job($job_id);
Remove failed, finished or inactive job from queue.
repair
$backend->repair;
Repair worker registry and job queue if necessary.
reset
$backend->reset;
Reset job queue.
resume_schedule
my $bool = $backend->resume_schedule('daily');
Resume a previously paused schedule. Returns true on success, false if the schedule does not exist.
retry_job
my $bool = $backend->retry_job($job_id, $retries);
my $bool = $backend->retry_job($job_id, $retries, {delay => 10});
Transition from failed or finished state back to inactive.
These options are currently available:
- delay
  delay => 10
  Delay job for this many seconds (from now).
- parents
  parents => [$id1, $id2, $id3]
  Jobs this job depends on.
- priority
  priority => 5
  Job priority.
- queue
  queue => 'important'
  Queue to put job in.
schedule
my $id = $backend->schedule('daily', '0 4 * * *', 'cleanup');
my $id = $backend->schedule('daily', '0 4 * * *', 'cleanup', [@args]);
my $id = $backend->schedule(
  'daily', '0 4 * * *', 'cleanup', [@args], {priority => 5});
Create or replace a schedule by unique name. Updating a schedule with the same cron expression preserves its current firing time; changing the expression recomputes it.
These options are currently available:
- attempts
  attempts => 25
  Number of times performing each enqueued job will be attempted, defaults to 1.
- expire
  expire => 300
  Each enqueued job is valid for this many seconds before it expires.
- lax
  lax => 1
  Existing jobs each enqueued job depends on may also have transitioned to the failed state, defaults to false.
- notes
  notes => {foo => 'bar'}
  Hash reference with arbitrary metadata applied to each enqueued job.
- priority
  priority => 5
  Priority of each enqueued job, defaults to 0.
- queue
  queue => 'important'
  Queue to put each enqueued job in, defaults to default.
stats
my $stats = $backend->stats;
Get statistics for jobs and workers.
unregister_worker
$backend->unregister_worker($worker_id);
Unregister worker.
worker_info
my $worker_info = $backend->worker_info($worker_id);
Get information about a worker or return undef if worker does not exist.
# Check worker host
my $host = $backend->worker_info($worker_id)->{host};
These fields are currently available:
- host
  host => 'localhost'
  Worker host.
- jobs
  jobs => ['10023', '10024', '10025', '10029']
  Ids of jobs the worker is currently processing.
- notified
  notified => 784111777
  Last time worker sent a heartbeat.
- pid
  pid => 12345
  Process id of worker.
- started
  started => 784111777
  Time worker was started.
PREFETCH AND JOB VOLUME
Currently, the query to look for the next job to run is slower than it could be. Between all the features like dependencies, priorities, retries, and expiration, the query is complicated and requires some expensive data manipulation. It still generally runs in less than one second even for hundreds of thousands of jobs, but if jobs take less than one second to process, a worker will end up spending most of its time looking up new jobs to run, and wasting all the effort it took to get those pending jobs in the right order.
With v1.007, this backend prefetches a number of jobs in one query. These jobs are then fed to worker processes as they ask. If a job is already claimed, the worker tries again to claim the next job.
If you are not seeing the throughput you expect, and your jobs are shorter than one second, you can change the $Minion::Backend::mysql::PREFETCH value to be higher. A good goal is for a worker to perform work for a few seconds before going back to the database. The prefetch cache expires after 30 seconds (which can be adjusted with $Minion::Backend::mysql::PREFETCH_EXPIRY).
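One way to pick a value is to divide the time a worker should stay busy by the average job duration. The sketch below is only a rule of thumb derived from the guidance above: prefetch_for is a hypothetical helper, and the measured average job time is assumed to come from your own monitoring.

```perl
use strict;
use warnings;
use POSIX qw(ceil);

# Hypothetical helper: number of jobs to prefetch so that one batch keeps
# a worker busy for roughly $target_seconds.
sub prefetch_for {
  my ($avg_job_seconds, $target_seconds) = @_;
  die "average job time must be positive\n" unless $avg_job_seconds > 0;
  my $count = ceil($target_seconds / $avg_job_seconds);
  return $count < 1 ? 1 : $count;
}
```

For jobs averaging a quarter of a second and a goal of five seconds of work per lookup, prefetch_for(0.25, 5) suggests 20. The result would be assigned to $Minion::Backend::mysql::PREFETCH after the backend module has been loaded.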
ERRORS
DBD::mysql::st execute failed: Table '*.minion_workers' doesn't exist
This may happen when the SQL create/upgrade scripts fail to run completely due to permission errors. Re-running with the environment variable MOJO_MIGRATIONS_DEBUG=1 should produce the error message returned by the database.
A common reason for the database install to fail on MySQL >= 8 is that the user installing the database does not have SUPER privileges needed to create functions when binlogs are enabled: (DBD::mysql::st execute failed: You do not have the SUPER privilege and binary logging is enabled). See the MySQL documentation for Stored Program Binary Logging for more information about this problem and how to correct it.
SEE ALSO
Minion, Minion::Guide, https://minion.pm, Mojolicious::Guides, https://mojolicious.org.
AUTHORS
Brian Medley <bpmedley@cpan.org>
Doug Bell <preaction@cpan.org>
CONTRIBUTORS
a-leelan <40534142+a-leelan@users.noreply.github.com>
Alexander Nalobin <nalobin@reg.ru>
Dmitry Krylov <pentabion@gmail.com>
Hu Yin <huyin8@gmail.com>
Jason A. Crome <jcrome@empoweredbenefits.com>
Larry Leszczynski <larryl@cpan.org>
Olaf Alders <olaf@wundersolutions.com>
Paul Cochrane <paul@liekut.de>
Peter Joh <peter.joh@grantstreet.com>
Sergey Andreev <40195653+saintserge@users.noreply.github.com>
Zoffix Znet <cpan@zoffix.com>
COPYRIGHT AND LICENSE
This software is copyright (c) 2021 by Doug Bell and Brian Medley.
This is free software; you can redistribute it and/or modify it under the same terms as the Perl 5 programming language system itself.