NAME
Redis::JobQueue - Job queue management implemented using Redis server.
VERSION
This documentation refers to Redis::JobQueue version 1.02
SYNOPSIS
#-- Common
use Redis::JobQueue qw( DEFAULT_SERVER DEFAULT_PORT );
my $connection_string = DEFAULT_SERVER.':'.DEFAULT_PORT;
my $jq = Redis::JobQueue->new( redis => $connection_string );
#-- Producer
my $job = $jq->add_job(
{
queue => 'xxx',
workload => \'Some stuff',
expire => 12*60*60, # 12h,
}
);
#-- Worker
sub xxx {
my $job = shift;
my $workload = ${$job->workload};
# do something with workload;
$job->result( \'XXX JOB result comes here' ); # a reference, dereferenced by the consumer
}
while ( $job = $jq->get_next_job(
queue => 'xxx',
blocking => 1,
) )
{
$job->status( 'working' );
$jq->update_job( $job );
# do my stuff
xxx( $job );
$job->status( 'completed' );
$jq->update_job( $job );
}
#-- Consumer
my $id = $ARGV[0];
my $status = $jq->get_job_data( $id, 'status' );
if ( $status eq 'completed' )
{
# it is now safe to remove it from JobQueue, since it's completed
my $job = $jq->load_job( $id );
$jq->delete_job( $id );
print "Job result: ", ${$job->result}, "\n";
}
else
{
print "Job is not complete, has current '$status' status\n";
}
For a brief but working code example of Redis::JobQueue package usage, see the "An Example" section.
The data structures used by Redis::JobQueue on the Redis server are described in the "JobQueue data structure stored in Redis" section.
ABSTRACT
The Redis::JobQueue package is a set of Perl modules which allows creation of a simple job queue based on Redis server capabilities.
DESCRIPTION
The main features of the package are:
Supports the automatic creation of job queues, job status monitoring, updating the job data set, obtaining a consistent job from the queue, removing jobs, and the classification of possible errors.
Contains various reusable components that can be used separately or together.
Provides an object oriented API.
Supports storing arbitrary job-related data structures.
Offers simple methods for organizing producer, worker, and consumer clients.
Attributes
id - An id that uniquely identifies the job, scalar.
queue - Name of the queue in which the job is placed, scalar.
expire - How long (in seconds) the job data structures are kept in memory.
status - Job status, scalar. See the Redis::JobQueue::Job EXPORT section for the list of pre-defined statuses. Can also be set to any arbitrary value.
workload, result - User-set data structures which are serialized before being stored on the Redis server. Suitable for passing large amounts of data.
* - Any other custom-named field passed to the "constructor" or the "update_job" method is stored as a metadata scalar on the Redis server. Suitable for storing scalar values with fast access (serialized before being stored on the Redis server).
CONSTRUCTOR
new( redis => $server, timeout => $timeout )
Creates a new Redis::JobQueue object to communicate with the Redis server. If invoked without any arguments, the constructor new creates and returns a Redis::JobQueue object that is configured with the default settings and uses the local redis server.
Caveats related to connection with Redis server
Since Redis knows nothing about encoding, it forces a utf-8 flag on all data by default. If you want to store binary data in your job, you can disable automatic encoding by passing an option to Redis new: encoding => undef.
When the Redis connection is established with encoding => undef, non-serializable fields (like status or message) passed in UTF-8 cannot be transferred correctly to the Redis server.
By default the Redis::JobQueue constructor creates a connection to the Redis server with the encoding => undef argument. If a different encoding is desired, pass an established connection as an instance of the Redis class.
The "DEFAULT_TIMEOUT" value is used when a Redis class object is passed to the new constructor without an additional timeout argument.
This example illustrates a new()
call with all the valid arguments:
my $jq = Redis::JobQueue->new(
redis => "$server:$port", # Connection info for Redis which hosts queue
timeout => $timeout, # Default wait time (in seconds)
# for blocking call of get_next_job.
# Set 0 for unlimited wait time
);
The following examples illustrate other uses of the new
method:
$jq = Redis::JobQueue->new();
my $next_jq = Redis::JobQueue->new( $jq );
my $redis = Redis->new( server => "$server:$port" ); # the Redis module takes 'server', not 'redis'
$next_jq = Redis::JobQueue->new(
$redis,
timeout => $timeout,
);
An invalid argument causes die (confess).
METHODS
add_job( $job_data, LPUSH => 1 )
Adds a job to the queue on the Redis server.
The first argument should be either Redis::JobQueue::Job object (which is modified by the method) or a reference to a hash representing Redis::JobQueue::Job - in the latter case a new Redis::JobQueue::Job object is created.
Returns a Redis::JobQueue::Job object with a new unique identifier.
Job status is set to STATUS_CREATED.
add_job optionally takes arguments in key-value pairs.
The following example illustrates an add_job() call with all the valid arguments:
my $job_data = {
id => '4BE19672-C503-11E1-BF34-28791473A258',
queue => 'lovely_queue', # required
job => 'strong_job', # optional attribute
expire => 12*60*60,
status => 'created',
workload => \'Some stuff',
result => \'JOB result comes here',
};
my $job = Redis::JobQueue::Job->new( $job_data );
my $resulting_job = $jq->add_job( $job );
# or
$resulting_job = $jq->add_job(
$job_data, # a hash reference is accepted as well
LPUSH => 1,
);
If used with the optional argument LPUSH => 1, the job is placed at the front of the queue and will be returned by the next call to get_next_job.
TTL of job data on Redis server is set in accordance with the "expire" attribute of the Redis::JobQueue::Job object. Make sure it's higher than the time needed to process the other jobs in the queue.
get_job_data( $job, $data_key )
Data of the job is requested from the Redis server. First argument can be either a job ID or Redis::JobQueue::Job object.
Returns undef
when the job was not found on Redis server.
The method returns the job's data from the Redis server. See Redis::JobQueue::Job for the list of standard job data.
The method returns a reference to a hash of the standard job data if only the first argument is specified.
If given a key name $data_key, it returns the data corresponding to the key, or undef when the value is undefined or the key is not in the data or metadata.
The following examples illustrate uses of the get_job_data
method:
my $data_href = $jq->get_job_data( $id );
# or
$data_href = $jq->get_job_data( $job );
# or
my $data_key = 'foo';
my $data = $jq->get_job_data( $job->id, $data_key );
You can specify a list of names of key data or metadata. In this case it returns the corresponding list of data. For example:
my ( $status, $foo ) = $jq->get_job_data( $job->id, 'status', 'foo' );
See meta_data for more information about job metadata.
get_job_meta_fields( $job )
The list of names of metadata fields of the job is requested from the Redis server. First argument can be either a job ID or Redis::JobQueue::Job object.
Returns empty list when the job was not found on Redis server or the job does not have metadata.
The following examples illustrate uses of the get_job_meta_fields
method:
my @fields = $jq->get_job_meta_fields( $id );
# or
@fields = $jq->get_job_meta_fields( $job );
# or
@fields = $jq->get_job_meta_fields( $job->id );
See meta_data for more information about job metadata.
load_job( $job )
Loads job data from the Redis server. The argument is either job ID or a Redis::JobQueue::Job object.
Method returns the object corresponding to the loaded job. Returns undef
if the job is not found on the Redis server.
The following examples illustrate uses of the load_job
method:
$job = $jq->load_job( $id );
# or
$job = $jq->load_job( $job );
get_next_job( queue => $queue_name, blocking => 1 )
Selects the job identifier which is at the beginning of the queue.
get_next_job takes arguments in key-value pairs. You can specify a queue name or a reference to an array of queue names. Queues from the list are processed in random order.
By default, each queue is processed in a separate request, and the result is returned immediately if a job is found (waiting) in that queue. If no waiting job is found, undef is returned. If the optional blocking argument is true, all queues are processed in a single request to the Redis server, and if no job is found waiting in the queue(s), get_next_job execution is paused for up to timeout seconds or until a job becomes available in any of the listed queues.
Use timeout = 0 for an unlimited wait time. By default, blocking is false (0).
The method returns the job object corresponding to the received job identifier. Returns undef if there is no job in the queue.
These examples illustrate a get_next_job
call with all the valid arguments:
$job = $jq->get_next_job(
queue => 'xxx',
blocking => 1,
);
# or
$job = $jq->get_next_job(
queue => [ 'aaa', 'bbb' ],
blocking => 1,
);
The TTL of the job data is reset on the Redis server in accordance with the "expire" attribute of the job object.
update_job( $job )
Saves job data changes to the Redis server. Job ID is obtained from the argument, which can be a Redis::JobQueue::Job object.
Returns the number of attributes that were updated if the job was found on the Redis server, and undef if it was not. When you change a single attribute, the method returns 2 because the updated attribute also changes.
Changes to the "expire" attribute are ignored.
The following examples illustrate uses of the update_job
method:
$jq->update_job( $job );
The TTL of the job data is reset on the Redis server in accordance with the "expire" attribute of the job object.
delete_job( $job )
Deletes the job data in Redis server. The Job ID is obtained from the argument, which can be either a string or a Redis::JobQueue::Job object.
Returns true if the job and its metadata were successfully deleted from the Redis server, and false if the job or its metadata was not found.
The following examples illustrate uses of the delete_job
method:
$jq->delete_job( $job );
# or
$jq->delete_job( $id );
Use this method immediately after receiving the results of the job for the early release of memory on the Redis server.
For a description of the Redis::JobQueue data structure used on the Redis server, see the "JobQueue data structure stored in Redis" section.
get_job_ids
Gets a list of job IDs on the Redis server. These IDs are identifiers of job data structures, not only the identifiers which get derived from the queue by "get_next_job".
The following examples illustrate simple uses of the get_job_ids
method (IDs of all existing jobs):
@jobs = $jq->get_job_ids;
get_job_ids takes optional arguments in key-value pairs that restrict the returned identifiers. You can specify a queue name or job status (or a reference to an array of queue names or job statuses) to filter the list of identifiers.
You can also specify the queued argument (true or false).
When filtering by queue names with queued set to true, only the identifiers of jobs which have not yet been taken from the queues by "get_next_job" are returned.
Filtering by status returns the IDs of jobs whose status is exactly the same as the specified status.
The following examples illustrate uses of the get_job_ids
method:
# filtering the identifiers of jobs data structures
@ids = $jq->get_job_ids( queue => 'some_queue' );
# or
@ids = $jq->get_job_ids( queue => [ 'foo_queue', 'bar_queue' ] );
# or
@ids = $jq->get_job_ids( status => STATUS_COMPLETED );
# or
@ids = $jq->get_job_ids( status => [ STATUS_COMPLETED, STATUS_FAILED ] );
# only the IDs of jobs that are still waiting in the queue
@ids = $jq->get_job_ids( queued => 1, queue => 'some_queue' );
# etc.
server
Returns the address of the Redis server used by the queue (in the form 'host:port').
The following examples illustrate uses of the server
method:
$redis_address = $jq->server;
ping
This command is used to test if a connection is still alive.
Returns 1 if a connection is still alive or 0 otherwise.
The following examples illustrate uses of the ping
method:
$is_alive = $jq->ping;
quit
Close connection to the Redis server.
The following examples illustrate uses of the quit
method:
$jq->quit;
queue_status
Gets queue status from the Redis server. Queue name is obtained from the argument. The argument can be either a string representing a queue name or a Redis::JobQueue::Job object.
Returns a reference to a hash describing state of the queue or a reference to an empty hash when the queue wasn't found.
The following examples illustrate uses of the queue_status
method:
$qstatus = $jq->queue_status( $queue_name );
# or
$qstatus = $jq->queue_status( $job );
The returned hash contains the following information related to the queue:
length
The number of jobs in the active queue that are waiting to be selected by "get_next_job" and then processed.
all_jobs
The number of ALL jobs tagged with the queue, i.e. including those that were processed before and other jobs, not presently waiting in the active queue.
max_job_age
The age of the oldest job (the lifetime of the queue) in the active queue.
min_job_age
The age of the youngest job in the active queue.
lifetime
Time it currently takes for a job to pass through the queue.
The statistics are based on the jobs that have not yet been removed. Some fields may be missing if the status of the job prevents determining the desired information (e.g., there are no jobs in the queue).
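Because some fields may be missing from the returned hash, code that reports queue state has to guard every lookup. The following is a minimal sketch of such a guard, not part of the package: describe_queue_status is a hypothetical helper, and the sample hash stands in for the value returned by queue_status.

```perl
use strict;
use warnings;

# Build a one-line summary of a queue_status() hash,
# marking the fields that the server did not report.
sub describe_queue_status {
    my ( $queue_name, $qstatus ) = @_;

    # A reference to an empty hash means the queue was not found.
    return "$queue_name: queue not found" unless %$qstatus;

    # Field names documented for the returned hash.
    my @fields = qw( length all_jobs max_job_age min_job_age lifetime );

    my @parts;
    foreach my $field ( @fields ) {
        my $value = $qstatus->{ $field };
        push @parts, $field . '=' . ( defined $value ? $value : 'n/a' );
    }

    return "$queue_name: " . join( ', ', @parts );
}

# Sample data standing in for: my $qstatus = $jq->queue_status( 'xxx' );
my $sample_status = { length => 2, all_jobs => 5 };
print describe_queue_status( 'xxx', $sample_status ), "\n";
```

With a live connection the second argument would simply be the result of $jq->queue_status( $queue_name ).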
timeout
Accessor for the timeout attribute.
The method returns the current value of the attribute if called without arguments.
A non-negative integer value can be passed to set a new maximum waiting time for the queue (used by the "get_next_job" method). Use timeout = 0 for an unlimited wait time.
max_datasize
Accessor for the max_datasize attribute.
The method returns the current value of the attribute if called without arguments.
A non-negative integer value can be passed to set a new maximum size of data in the attributes of a Redis::JobQueue::Job object.
The check is performed before the data is passed to the Redis module, after any processing by the Storable module (the workload, result and meta_data attributes are serialized automatically).
The max_datasize attribute value is used in the constructor and in operations that write job data to the Redis server.
The constructor uses the smaller of 512MB and the maxmemory limit from the redis.conf file.
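Since the size check applies to the data after serialization, a producer may want to estimate that size before calling add_job. The sketch below is an illustration only: stored_size and fits_datasize are hypothetical helpers, and the use of Storable::nfreeze is an assumption about how references are serialized, so the real stored size may differ.

```perl
use strict;
use warnings;
use Storable qw( nfreeze );

# 512MB, the documented default of MAX_DATASIZE.
use constant MAX_DATASIZE => 512 * 1024 * 1024;

# Estimate the size in bytes of a value once it is serialized.
# Assumption: references go through Storable::nfreeze,
# plain scalars are measured as UTF-8 encoded bytes.
sub stored_size {
    my ( $value ) = @_;
    return 0 unless defined $value;

    my $data = ref( $value ) ? nfreeze( $value ) : "$value";
    utf8::encode( $data ) if utf8::is_utf8( $data );
    return length $data;
}

# True if the estimated size does not exceed the limit.
sub fits_datasize {
    my ( $value, $limit ) = @_;
    $limit = MAX_DATASIZE unless defined $limit;
    return stored_size( $value ) <= $limit;
}

my $sample_workload = { items => [ 1 .. 1000 ] };
printf "serialized workload: %d bytes, fits: %s\n",
    stored_size( $sample_workload ),
    fits_datasize( $sample_workload ) ? 'yes' : 'no';
```

A producer could pass $jq->max_datasize as the second argument of fits_datasize to compare against the limit that is actually configured.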
last_errorcode
Returns the code of the last identified error.
For a description of the identified errors, see the "DIAGNOSTICS" section.
last_error
Returns the error message of the last identified error.
For a description of the identified errors, see the "DIAGNOSTICS" section.
EXPORT
None by default.
Additional constants are available for import, which can be used to define some type of parameters.
These are the defaults:
Redis::JobQueue::MAX_DATASIZE - Maximum size of the data stored in workload, result: 512MB.
DEFAULT_SERVER - Default Redis local server: 'localhost'.
DEFAULT_PORT - Default Redis server port: 6379.
DEFAULT_TIMEOUT - Maximum time (in seconds) to wait for a message from the queue: 0 for an unlimited timeout.
NAMESPACE - Namespace name used for keys on the Redis server: 'JobQueue'.
NS_METADATA_SUFFIX - Namespace name suffix used for metadata keys on the Redis server: 'M'.
Error codes - For a description of the identified errors, see the "DIAGNOSTICS" section.
DIAGNOSTICS
The method to retrieve a possible error for analysis is "last_errorcode".
A Redis error causes the program to halt (confess). In addition to errors raised by the Redis module, the package detects the errors "E_MISMATCH_ARG", "E_DATA_TOO_LARGE" and "E_JOB_DELETED". All recognizable errors in Redis::JobQueue set the corresponding value in "last_errorcode" and cause an exception (confess). Unidentified errors cause an exception as well ("last_errorcode" remains equal to 0). The initial value of $@ is preserved.
The user has a choice:
Use the package methods and analyze the situation independently, without using "last_errorcode".
Wrap the piece of code in eval {...}; and analyze "last_errorcode" (see the "An Example" section).
"last_errorcode" recognizes the following:
E_NO_ERROR - No error.
E_MISMATCH_ARG - Invalid argument of new or of another method.
E_DATA_TOO_LARGE - The data is too large.
E_NETWORK - Error connecting to the Redis server.
E_MAX_MEMORY_LIMIT - Command failed because it requires more than the allowed memory, set in maxmemory.
E_JOB_DELETED - The job's data was removed.
E_REDIS - Another Redis error message was detected.
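The eval-based approach can be combined with a small retry helper for transient failures. The following is a sketch only: with_retry is a hypothetical function that is not provided by the package, and the stand-in action simulates a failure so that the block runs without a Redis server.

```perl
use strict;
use warnings;

# Run $action, retrying while $should_retry->( $error ) returns true.
# Rethrows the last error when the attempts are exhausted
# or when the error is not considered retryable.
sub with_retry {
    my ( $action, $should_retry, $max_attempts ) = @_;

    my $last_error;
    for my $attempt ( 1 .. $max_attempts ) {
        my $result;
        my $ok = eval { $result = $action->(); 1 };
        return $result if $ok;

        $last_error = $@;
        last unless $should_retry->( $last_error );
    }
    die $last_error;
}

# With a real queue the callbacks might look like this:
#   my $job = with_retry(
#       sub { $jq->add_job( $job_data ) },
#       sub { $jq->last_errorcode == E_NETWORK },
#       3,
#   );

# Stand-in action that fails twice before succeeding.
my $demo_calls = 0;
my $demo_value = with_retry(
    sub { die "network down\n" if ++$demo_calls < 3; return 'done' },
    sub { $_[0] =~ /network/ },
    5,
);
print "result '$demo_value' after $demo_calls calls\n";
```

Keeping the retry decision in a callback lets the caller base it on last_errorcode, as the second option above suggests, without the helper knowing anything about Redis::JobQueue.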
An Example
This example shows one way of handling possible errors.
#-- Common ---------------------------------------------------------------
use Redis::JobQueue qw(
DEFAULT_SERVER
DEFAULT_PORT
E_NO_ERROR
E_MISMATCH_ARG
E_DATA_TOO_LARGE
E_NETWORK
E_MAX_MEMORY_LIMIT
E_JOB_DELETED
E_REDIS
);
use Redis::JobQueue::Job qw(
STATUS_CREATED
STATUS_WORKING
STATUS_COMPLETED
);
my $server = DEFAULT_SERVER.':'.DEFAULT_PORT; # the Redis Server
# Example of error handling
sub exception {
my $jq = shift;
my $err = shift;
if ( $jq->last_errorcode == E_NO_ERROR )
{
# For example, to ignore
return unless $err;
}
elsif ( $jq->last_errorcode == E_MISMATCH_ARG )
{
# Necessary to correct the code
}
elsif ( $jq->last_errorcode == E_DATA_TOO_LARGE )
{
# You must use the control data length
}
elsif ( $jq->last_errorcode == E_NETWORK )
{
# For example, sleep
#sleep 60;
# and return code to repeat the operation
#return "to repeat";
}
elsif ( $jq->last_errorcode == E_JOB_DELETED )
{
# For example, return code to ignore
my ( $id ) = $err =~ /^(\S+)/; # list context, so the captured id is assigned
#return "to ignore $id";
}
elsif ( $jq->last_errorcode == E_REDIS )
{
# Independently analyze the $err
}
else
{
# Unknown error code
}
die $err if $err;
}
my $jq;
eval {
$jq = Redis::JobQueue->new(
redis => $server,
timeout => 1, # DEFAULT_TIMEOUT = 0 for an unlimited timeout
);
};
exception( $jq, $@ ) if $@;
#-- Producer -------------------------------------------------------------
#-- Adding new job
my $job;
eval {
$job = $jq->add_job(
{
queue => 'xxx',
workload => \'Some stuff',
expire => 12*60*60,
} );
};
exception( $jq, $@ ) if $@;
print 'Added job ', $job->id, "\n" if $job;
eval {
$job = $jq->add_job(
{
queue => 'yyy',
workload => \'Some stuff',
expire => 12*60*60,
} );
};
exception( $jq, $@ ) if $@;
print 'Added job ', $job->id, "\n" if $job;
#-- Worker ---------------------------------------------------------------
#-- Run your jobs
sub xxx {
my $job = shift;
my $workload = ${$job->workload};
# do something with workload;
print "XXX workload: $workload\n";
$job->result( \'XXX JOB result comes here' ); # a reference, dereferenced when the result is fetched
}
sub yyy {
my $job = shift;
my $workload = ${$job->workload};
# do something with workload;
print "YYY workload: $workload\n";
$job->result( \'YYY JOB result comes here' );
}
eval {
while ( my $job = $jq->get_next_job(
queue => [ 'xxx','yyy' ],
blocking => 1,
) )
{
my $id = $job->id;
my $status = $jq->get_job_data( $id, 'status' );
print "Job '", $id, "' was '$status' status\n";
$job->status( STATUS_WORKING );
$jq->update_job( $job );
$status = $jq->get_job_data( $id, 'status' );
print "Job '", $id, "' has new '$status' status\n";
# do my stuff
if ( $job->queue eq 'xxx' )
{
xxx( $job );
}
elsif ( $job->queue eq 'yyy' )
{
yyy( $job );
}
$job->status( STATUS_COMPLETED );
$jq->update_job( $job );
$status = $jq->get_job_data( $id, 'status' );
print "Job '", $id, "' has last '$status' status\n";
}
};
exception( $jq, $@ ) if $@;
#-- Consumer -------------------------------------------------------------
#-- Check the job status
eval {
# For example:
# my $status = $jq->get_job_data( $ARGV[0], 'status' );
# or:
my @ids = $jq->get_job_ids;
foreach my $id ( @ids )
{
my $status = $jq->get_job_data( $id, 'status' );
print "Job '$id' has '$status' status\n";
}
};
exception( $jq, $@ ) if $@;
#-- Fetching the result
eval {
# For example:
# my $id = $ARGV[0];
# or:
my @ids = $jq->get_job_ids;
foreach my $id ( @ids )
{
my $status = $jq->get_job_data( $id, 'status' );
print "Job '$id' has '$status' status\n";
if ( $status eq STATUS_COMPLETED )
{
my $job = $jq->load_job( $id );
# it is now safe to remove it from JobQueue, since it is completed
$jq->delete_job( $id );
print 'Job result: ', ${$job->result}, "\n";
}
else
{
print "Job is not complete, has current '$status' status\n";
}
}
};
exception( $jq, $@ ) if $@;
#-- Closes and cleans up -------------------------------------------------
eval { $jq->quit };
exception( $jq, $@ ) if $@;
JobQueue data structure stored in Redis
While working, the package creates and uses the following data structures on the Redis server:
#-- To store the job data:
# HASH Namespace:id
For example:
$ redis-cli
redis 127.0.0.1:6379> KEYS JobQueue:*
1) "JobQueue:478B9C84-C5B8-11E1-A2C5-D35E0A986783"
2) "JobQueue:478B9C84-C5B8-11E1-A2C5-D35E0A986783:M"
3) "JobQueue:478C81B2-C5B8-11E1-B5B1-16670A986783"
4) "JobQueue:478C81B2-C5B8-11E1-B5B1-16670A986783:M"
5) "JobQueue:89116152-C5BD-11E1-931B-0A690A986783"
6) "JobQueue:89116152-C5BD-11E1-931B-0A690A986783:M"
#      |                      |                   |
#  Namespace                  |                   |
#                       Job id (UUID)             |
#       Namespace name suffix used for metadata keys
...
redis 127.0.0.1:6379> hgetall JobQueue:478B9C84-C5B8-11E1-A2C5-D35E0A986783
1) "queue" # hash key
2) "xxx" # the key value
3) "job" # hash key
4) "Some description" # the key value
5) "workload" # hash key
6) "Some stuff" # the key value
7) "expire" # hash key
8) "43200" # the key value
9) "status" # hash key
10) "_created_" # the key value
#-- To store the job metadata:
# HASH Namespace:id:suffix
#
For example:
redis 127.0.0.1:6379> hgetall JobQueue:478B9C84-C5B8-11E1-A2C5-D35E0A986783:M
1) "foo" # hash key
2) "xxx" # the key value
3) "bar" # hash key
4) "yyy" # the key value
...
After a job is created ("add_job" method) or modified ("update_job" method), its data set is kept on the server for the time specified by the "expire" attribute (in seconds). For example:
redis 127.0.0.1:6379> TTL JobQueue:478B9C84-C5B8-11E1-A2C5-D35E0A986783
(integer) 42062
Hashes of the job data and metadata are deleted when you delete the job ("delete_job" method).
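The key layout shown above can be reproduced when inspecting the server by hand. The following sketch assumes the documented default values of NAMESPACE and NS_METADATA_SUFFIX; job_data_key and job_meta_key are hypothetical helper names, not functions of the package.

```perl
use strict;
use warnings;

# Documented defaults of the exported constants.
use constant NAMESPACE          => 'JobQueue';
use constant NS_METADATA_SUFFIX => 'M';

# Key of the hash that holds the job data: Namespace:id
sub job_data_key {
    my ( $id ) = @_;
    return join ':', NAMESPACE, $id;
}

# Key of the hash that holds the job metadata: Namespace:id:suffix
sub job_meta_key {
    my ( $id ) = @_;
    return join ':', NAMESPACE, $id, NS_METADATA_SUFFIX;
}

my $sample_id = '478B9C84-C5B8-11E1-A2C5-D35E0A986783';
print job_data_key( $sample_id ), "\n";
print job_meta_key( $sample_id ), "\n";
```

The two printed keys match the first pair of entries in the KEYS listing above.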
#-- To store the job queue (the list of created but not yet requested jobs):
# LIST JobQueue:queue:queue_name
For example:
redis 127.0.0.1:6379> KEYS JobQueue:queue:*
...
4) "JobQueue:queue:xxx"
5) "JobQueue:queue:yyy"
#      |       |    |
#  Namespace   |    |
#    Fixed key word |
#              Queue name
...
redis 127.0.0.1:6379> LRANGE JobQueue:queue:xxx 0 -1
1) "478B9C84-C5B8-11E1-A2C5-D35E0A986783 1344070066"
2) "89116152-C5BD-11E1-931B-0A690A986783 1344070067"
#                    |                       |
#              Job id (UUID)                 |
#                                    Expire time (UTC)
...
A job queue is created automatically when the first job is added to it, and deleted automatically once it becomes empty.
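Each element of a queue list pairs a job id with its expire time, as in the LRANGE output above. The sketch below splits such an element and checks whether it has expired; parse_queue_entry and is_expired are hypothetical helpers, and treating the second field as epoch seconds is an assumption based on the sample values.

```perl
use strict;
use warnings;

# Split a queue list element into the job id and the expire time.
sub parse_queue_entry {
    my ( $entry ) = @_;
    my ( $id, $expire_time ) = split ' ', $entry;
    return ( $id, $expire_time );
}

# True if the expire time of the entry has already passed.
# Assumption: the expire time is expressed in epoch seconds.
sub is_expired {
    my ( $entry, $now ) = @_;
    $now = time unless defined $now;
    my ( undef, $expire_time ) = parse_queue_entry( $entry );
    return $expire_time < $now;
}

my $sample_entry = '478B9C84-C5B8-11E1-A2C5-D35E0A986783 1344070066';
my ( $entry_id, $entry_expire ) = parse_queue_entry( $sample_entry );
print "job $entry_id expires at $entry_expire\n";
```

Passing an explicit $now to is_expired keeps the check deterministic, which is convenient when examining a saved LRANGE listing.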
DEPENDENCIES
Perl version 5.010 or later is recommended to install and use this package. Some modules within this package depend on other packages that are distributed separately from Perl. We recommend that you have the following packages installed before you install the Redis::JobQueue package:
Data::UUID
Digest::SHA1
List::MoreUtils
Mouse
Params::Util
Redis
Storable
Redis::JobQueue
package has the following optional dependencies:
Net::EmptyPort
Test::Deep
Test::Exception
Test::NoWarnings
Test::RedisServer
If the optional modules are missing, some "prereq" tests are skipped.
BUGS AND LIMITATIONS
Needs Redis server version 2.6 or higher as module uses Redis Lua scripting.
The use of maxmemory-policy all* in the redis.conf file could lead to a serious (but hard to detect) problem as the Redis server may delete the job queue lists.
It may not be possible to use this module with a cluster of Redis servers because the full names of some Redis keys may not be known at the time the Redis Lua script is called ('EVAL' or 'EVALSHA' command). So the Redis server may not be able to correctly forward the request to the appropriate node in the cluster.
We strongly recommend using the option maxmemory
in the redis.conf
file if the data set may be large.
The Redis::JobQueue
package was written, tested, and found working on recent Linux distributions.
There are no known bugs in this package.
Please report problems to the "AUTHOR".
Patches are welcome.
MORE DOCUMENTATION
All modules contain detailed information on the interfaces they provide.
SEE ALSO
The basic operation of the Redis::JobQueue
package modules:
Redis::JobQueue - Object interface for creating and executing job queues, as well as monitoring the status and results of jobs.
Redis::JobQueue::Job - Object interface for creating and manipulating jobs.
Redis - Perl binding for Redis database.
SOURCE CODE
Redis::JobQueue is hosted on GitHub: https://github.com/TrackingSoft/Redis-JobQueue
AUTHOR
Sergey Gladkov, <sgladkov@trackingsoft.com>
CONTRIBUTORS
Alexander Solovey
Jeremy Jordan
Vlad Marchenko
COPYRIGHT AND LICENSE
Copyright (C) 2012-2013 by TrackingSoft LLC.
This package is free software; you can redistribute it and/or modify it under the same terms as Perl itself. See perlartistic at http://dev.perl.org/licenses/artistic.html.
This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.