NAME
Redis::JobQueue - Object interface for the creation, execution the job queues, as well as the status and outcome objectives
VERSION
This documentation refers to Redis::JobQueue
version 0.09
SYNOPSIS
#-- Common
use Redis::JobQueue qw( DEFAULT_SERVER DEFAULT_PORT );
my $connection_string = DEFAULT_SERVER.':'.DEFAULT_PORT;
my $jq = Redis::JobQueue->new( redis => $connection_string );
#-- Producer
my $job = $jq->add_job(
{
queue => 'xxx',
workload => \'Some stuff up to 512MB long',
expire => 12*60*60, # 12h,
}
);
#-- Worker
sub xxx {
my $job = shift;
my $workload = ${$job->workload};
# do something with workload;
$job->result( 'XXX JOB result comes here, up to 512MB long' );
}
while ( $job = $jq->get_next_job(
queue => 'xxx',
blocking => 1,
) )
{
$job->status( 'working' );
$jq->update_job( $job );
# do my stuff
xxx( $job );
$job->status( 'completed' );
$jq->update_job( $job );
}
#-- Consumer
my $id = $ARGV[0];
my $status = $jq->check_job_status( $id );
if ( $status eq 'completed' )
{
# now safe it from JobQueue, since it's completed
my $job = $jq->load_job( $id );
$jq->delete_job( $id );
print "Job result: ", ${$job->result}, "\n";
}
else
{
print "Job is not complete, has current '$status' status\n";
}
To see a brief but working code example of the Redis::JobQueue
package usage look at the "An Example" section.
To see a description of the used Redis::JobQueue
data structure (on Redis server) look at the "JobQueue data structure" section.
ABSTRACT
The Redis::JobQueue
package is a set of Perl modules which provides a simple job queue with Redis server capabilities.
DESCRIPTION
The user modules in this package provide an object oriented API. The job queues interface and the jobs are all represented by objects. This makes a simple and powerful interface to these services.
The main features of the package are:
Contains various reusable components that can be used separately or together.
Provides an object oriented model of communication.
Support the work with data structures on the Redis server.
Supports the automatic creation of job queue, job status monitoring, updating the job data set, obtaining a consistent job from the queue, remove job, the classification of possible errors.
Simple methods for organizing producer, worker and consumer clients.
CONSTRUCTOR
new( redis => $server, timeout => $timeout )
It generates a Redis::JobQueue
object to communicate with the Redis server and can be called as either a class method or an object method. If invoked with no arguments the constructor new
creates and returns a Redis::JobQueue
object that is configured to work with the default settings.
If invoked with the first argument being an object of Redis::JobQueue
class or Redis class, then the new object attribute values are taken from the object of the first argument. It does not create a new connection to the Redis server. A created object uses the default value "DEFAULT_TIMEOUT" when a Redis class object is passed to the new
constructor, as Redis class does not support the timeout attribute while waiting for a message from the queue.
new
optionally takes arguments. These arguments are in key-value pairs.
This example illustrates a new()
call with all the valid arguments:
my $jq = Redis::JobQueue->new(
redis => "$server:$port", # Default Redis local server and port
timeout => $timeout, # Maximum wait time (in seconds)
# you receive a message from the queue
# 0 for an unlimited wait time
);
The following examples illustrate other uses of the new
method:
$jq = Redis::JobQueue->new();
my $next_jq = Redis::JobQueue->new( $jq );
my $redis = Redis->new( redis => "$server:$port" );
$next_jq = Redis::JobQueue->new(
$redis,
timeout => $timeout,
);
An error will cause the program to halt (confess
) if an argument is not valid.
METHODS
An error will cause the program to halt (confess
) if an argument is not valid.
ATTENTION: In the Redis module the synchronous commands throw an exception on receipt of an error reply, or return a non-error reply directly.
add_job( $pre_job, LPUSH => 1 )
Adds a job to the queue on the Redis server. At the same time creates and returns a new Redis::JobQueue::Job object with a new unique identifier. Job status is set to "STATUS_CREATED".
The first argument should be an Redis::JobQueue::Job object or a reference to a hash describing Redis::JobQueue::Job object attributes.
add_job
optionally takes arguments. These arguments are in key-value pairs.
This example illustrates a add_job()
call with all the valid arguments:
my $pre_job = {
id => '4BE19672-C503-11E1-BF34-28791473A258',
queue => 'lovely_queue', # required
job => 'strong_job', # optional attribute
expire => 12*60*60,
status => 'created',
workload => \'Some stuff up to 512MB long',
result => \'JOB result comes here, up to 512MB long',
};
my $job = Redis::JobQueue::Job->new(
id => $pre_job->{id},
queue => $pre_job->{queue}, # required
job => $pre_job->{job}, # optional attribute
expire => $pre_job->{expire},
status => $pre_job->{status},
workload => $pre_job->{workload},
result => $pre_job->{result},
);
my $resulting_job = $jq->add_job( $job );
# or
$resulting_job = $jq->add_job(
$pre_job,
LPUSH => 1,
);
If used with a LPUSH
optional argument, the job is placed in front of the queue and not in its end (if the argument is true).
TTL job data sets on the Redis server in accordance with the expire
attribute of the Redis::JobQueue::Job object.
Method returns the object corresponding to the added job.
check_job_status( $job )
Status of the job is requested from the Redis server. Job ID is obtained from the argument. The argument can be either a string or a Redis::JobQueue::Job object.
Returns the status string or undef
when the job data does not exist. Returns undef
if the job is not on the Redis server.
The following examples illustrate uses of the check_job_status
method:
my $status = $jq->check_job_status( $id );
# or
$status = $jq->check_job_status( $job );
load_job( $job )
Jobs data are loaded from the Redis server. Job ID is obtained from the argument. The argument can be either a string or a Redis::JobQueue::Job object.
Method returns the object corresponding to the loaded job. Returns undef
if the job is not on the Redis server.
The following examples illustrate uses of the check_job_status
method:
$job = $jq->load_job( $id );
# or
$job = $jq->load_job( $job );
get_next_job( queue => $queue_name, $blocking => 1 )
Selects the first job identifier in the queue.
get_next_job
takes arguments in key-value pairs. You can specify a queue name or a reference to an array of names of queues. Queues from the list are processed in random order.
If the optional blocking
argument is true, then the get_next_job
method waits for a maximum period of time specified in the timeout
attribute of the queue object. Use timeout
= 0 for an unlimited wait time. By default, the result is returned immediately. If the optional blocking
argument is true, then all queues are processed in a single request to Redis server; otherwise, each queue is processed in a separate request. Default - blocking
is false (0).
Method returns the job object corresponding to the received job identifier. Returns the undef
if there is no job in the queue.
These examples illustrates a get_next_job
call with all the valid arguments:
$job = $jq->get_next_job(
queue => 'xxx',
blocking => 1,
);
# or
$job = $jq->get_next_job(
queue => [ 'aaa', 'bbb' ],
blocking => 1,
);
TTL job data for the job resets on the Redis server in accordance with the expire
attribute of the job object.
update_job( $job )
Saves the changes to the job data to the Redis server. Job ID is obtained from the argument. The argument can be a Redis::JobQueue::Job object.
Returns undef
if the job is not on the Redis server and the number of the attributes that were updated in the opposite case.
Changing the expire
attribute is ignored.
The following examples illustrate uses of the update_job
method:
$jq->update_job( $job );
TTL job data for the job resets on the Redis server in accordance with the expire
attribute of the job object.
delete_job( $job )
Deletes the job data in Redis server. Job ID is obtained from the argument. The argument can be either a string or a Redis::JobQueue::Job object.
Returns undef
if the job is not on the Redis server and true in the opposite case.
The following examples illustrate uses of the delete_job
method:
$jq->delete_job( $job );
# or
$jq->delete_job( $id );
Use this method immediately after receiving the results of the job for the early release of memory on the Redis server.
When the job is deleted, the data set on the Redis server are changed as follows:
All fields are removed (except
status
).status
field is set toSTATUS_DELETED
.For a hash of the data set TTL = 24h, if the job was
expire
= 0.Hash of the job data is automatically deleted in accordance with the established value of TTL (
expire
).
To see a description of the used Redis::JobQueue
data structure (on Redis server) look at the "JobQueue data structure" section.
get_jobs
Gets a list of job ids on the Redis server.
The following examples illustrate uses of the get_jobs
method:
@jobs = $jq->get_jobs;
quit
Ask the Redis server to close the connection.
The following examples illustrate uses of the quit
method:
$jq->quit;
timeout
The method of access to the timeout
attribute.
The method returns the current value of the attribute if called without arguments.
Non-negative integer value can be used to specify a new value of the maximum waiting time data from the queue (in the "get_next_job" method). Use timeout
= 0 for an unlimited wait time.
max_datasize
The method of access to the max_datasize
attribute.
The method returns the current value of the attribute if called without arguments.
Non-negative integer value can be used to specify a new value to the maximum size of data in the attributes of a Redis::JobQueue::Job object.
The max_datasize
attribute value is used in the constructor and operations data entry jobs on the Redis server.
The constructor uses the smaller of the values of 512MB and maxmemory
limit from a redis.conf
file.
last_errorcode
The method of access to the code of the last identified errors.
To see more description of the identified errors look at the "DIAGNOSTICS" section.
EXPORT
None by default.
Additional constants are available for import, which can be used to define some type of parameters.
These are the defaults:
DEFAULT_SERVER
-
Default Redis local server -
'localhost'
. DEFAULT_PORT
-
Default Redis server port - 6379.
DEFAULT_TIMEOUT
-
Maximum wait time (in seconds) you receive a message from the queue - 0 for an unlimited timeout.
NAMESPACE
-
Namespace name used keys on the Redis server -
'JobQueue'
. STATUS_CREATED
-
Text of the status of the job after it is created -
'_created_'
. STATUS_WORKING
-
Text of the status of the job at run-time -
'working'
. Must be set from the worker function. STATUS_COMPLETED
-
Text of the status of the job at the conclusion -
'completed'
. Must be set at the conclusion of the worker function. STATUS_DELETED
-
Text of the status of the job after removal -
'_deleted_'
. - Error codes are identified
-
To see more description of the identified errors look at the "DIAGNOSTICS" section.
These are the defaults:
Redis::JobQueue::EXPIRE_DELETED
-
TTL (24h) for a hash of the deleted data set, if the job was
expire
= 0.
DIAGNOSTICS
The method for the possible error to analyse: "last_errorcode".
A Redis error will cause the program to halt (confess
). In addition to errors in the Redis module detected errors "EMISMATCHARG", "EDATATOOLARGE", "EMAXMEMORYPOLICY", "EJOBDELETED". All recognizable errors in Redis::JobQueue
lead to the installation of the corresponding value in the "last_errorcode" and cause an exception (confess
). Unidentified errors cause an exception ("last_errorcode" remains equal to 0). The initial value of $@
is preserved.
The user has the choice:
Use the package methods and independently analyze the situation without the use of "last_errorcode".
Piece of code wrapped in
eval {...};
and analyze "last_errorcode" (look at the "An Example" section).
In "last_errortsode" recognizes the following:
ENOERROR
-
No error.
EMISMATCHARG
-
This means that you didn't give the right argument to a
new
or to other method. EDATATOOLARGE
-
This means that the data is too large.
ENETWORK
-
This means that an error in connection to Redis server was detected.
EMAXMEMORYLIMIT
-
This means that the command not allowed when used memory >
maxmemory
. EMAXMEMORYPOLICY
-
This means that the job was removed by
maxmemory-policy
. EJOBDELETED
-
This means that the job was removed prior to use.
EREDIS
-
This means that other Redis error message detected.
An Example
The example shows a possible treatment for possible errors.
#-- Common ---------------------------------------------------------------
use Redis::JobQueue qw(
DEFAULT_SERVER
DEFAULT_PORT
STATUS_CREATED
STATUS_WORKING
STATUS_COMPLETED
ENOERROR
EMISMATCHARG
EDATATOOLARGE
ENETWORK
EMAXMEMORYLIMIT
EMAXMEMORYPOLICY
EJOBDELETED
EREDIS
);
my $server = DEFAULT_SERVER.':'.DEFAULT_PORT; # the Redis Server
# A possible treatment for possible errors
sub exception {
my $jq = shift;
my $err = shift;
if ( $jq->last_errorcode == ENOERROR )
{
# For example, to ignore
return unless $err;
}
elsif ( $jq->last_errorcode == EMISMATCHARG )
{
# Necessary to correct the code
}
elsif ( $jq->last_errorcode == EDATATOOLARGE )
{
# You must use the control data length
}
elsif ( $jq->last_errorcode == ENETWORK )
{
# For example, sleep
#sleep 60;
# and return code to repeat the operation
#return "to repeat";
}
elsif ( $jq->last_errorcode == EMAXMEMORYLIMIT )
{
# For example, return code to restart the server
#return "to restart the redis server";
}
elsif ( $jq->last_errorcode == EMAXMEMORYPOLICY )
{
# For example, return code to recreate the job
my $id = $err =~ /^(\S+)/;
#return "to recreate $id";
}
elsif ( $jq->last_errorcode == EJOBDELETED )
{
# For example, return code to ignore
my $id = $err =~ /^(\S+)/;
#return "to ignore $id";
}
elsif ( $jq->last_errorcode == EREDIS )
{
# Independently analyze the $err
}
else
{
# Unknown error code
}
die $err if $err;
}
my $jq;
eval {
$jq = Redis::JobQueue->new(
redis => $server,
timeout => 1, # DEFAULT_TIMEOUT = 0 for an unlimited timeout
);
};
exception( $jq, $@ ) if $@;
#-- Producer -------------------------------------------------------------
#-- Adding new job
my $job;
eval {
$job = $jq->add_job(
{
queue => 'xxx',
workload => \'Some stuff up to 512MB long',
expire => 12*60*60,
} );
};
exception( $jq, $@ ) if $@;
print 'Added job ', $job->id, "\n" if $job;
eval {
$job = $jq->add_job(
{
queue => 'yyy',
workload => \'Some stuff up to 512MB long',
expire => 12*60*60,
} );
};
exception( $jq, $@ ) if $@;
print 'Added job ', $job->id, "\n" if $job;
#-- Worker ---------------------------------------------------------------
#-- Run your jobs
sub xxx {
my $job = shift;
my $workload = ${$job->workload};
# do something with workload;
print "XXX workload: $workload\n";
$job->result( 'XXX JOB result comes here, up to 512MB long' );
}
sub yyy {
my $job = shift;
my $workload = ${$job->workload};
# do something with workload;
print "YYY workload: $workload\n";
$job->result( \'YYY JOB result comes here, up to 512MB long' );
}
eval {
while ( my $job = $jq->get_next_job(
queue => [ 'xxx','yyy' ],
blocking => 1,
) )
{
my $id = $job->id;
my $status = $jq->check_job_status( $id );
print "Job '", $id, "' was '$status' status\n";
$job->status( STATUS_WORKING );
$jq->update_job( $job );
$status = $jq->check_job_status( $id );
print "Job '", $id, "' has new '$status' status\n";
# do my stuff
if ( $job->queue eq 'xxx' )
{
xxx( $job );
}
elsif ( $job->queue eq 'yyy' )
{
yyy( $job );
}
$job->status( STATUS_COMPLETED );
$jq->update_job( $job );
$status = $jq->check_job_status( $id );
print "Job '", $id, "' has last '$status' status\n";
}
};
exception( $jq, $@ ) if $@;
#-- Consumer -------------------------------------------------------------
#-- Check the job status
eval {
# For example:
# my $status = $jq->check_job_status( $ARGV[0] );
# or:
my @jobs = $jq->get_jobs;
foreach my $id ( @jobs )
{
my $status = $jq->check_job_status( $id );
print "Job '$id' has '$status' status\n";
}
};
exception( $jq, $@ ) if $@;
#-- Fetching the result
eval {
# For example:
# my $id = $ARGV[0];
# or:
my @jobs = $jq->get_jobs;
foreach my $id ( @jobs )
{
my $status = $jq->check_job_status( $id );
print "Job '$id' has '$status' status\n";
if ( $status eq STATUS_COMPLETED )
{
my $job = $jq->load_job( $id );
# now safe to compelete it from JobQueue, since it's completed
$jq->delete_job( $id );
print 'Job result: ', ${$job->result}, "\n";
}
else
{
print "Job is not complete, has current '$status' status\n";
}
}
};
exception( $jq, $@ ) if $@;
#-- Closes and cleans up -------------------------------------------------
eval { $jq->quit };
exception( $jq, $@ ) if $@;
JobQueue data structure
Using the currently selected database (default = 0).
While working on the Redis server creates and uses these data structures:
#-- To store the job data:
# HASH Namespace:id
For example:
$ redis-cli
redis 127.0.0.1:6379> KEYS JobQueue:*
1) "JobQueue:478B9C84-C5B8-11E1-A2C5-D35E0A986783"
2) "JobQueue:478C81B2-C5B8-11E1-B5B1-16670A986783"
3) "JobQueue:89116152-C5BD-11E1-931B-0A690A986783"
# | |
# Namespace |
# Job id (UUID)
...
redis 127.0.0.1:6379> hgetall JobQueue:478B9C84-C5B8-11E1-A2C5-D35E0A986783
1) "queue" # hash key
2) "xxx" # the key value
3) "job" # hash key
4) "Some description" # the key value
5) "workload" # hash key
6) "Some stuff up to 512MB long" # the key value
7) "expire" # hash key
8) "43200" # the key value
9) "status" # hash key
10) "_created_" # the key value
After you create ("add_job" method) or modify ("update_job" method) the data set are within the time specified expire
attribute (seconds). For example:
redis 127.0.0.1:6379> TTL JobQueue:478B9C84-C5B8-11E1-A2C5-D35E0A986783
(integer) 42062
Hash of the job data is deleted when you delete the job ("delete_job" method).
# -- To store the job queue (the list created but not yet requested jobs):
# LIST JobQueue:queue:queue_name:job_name
For example:
redis 127.0.0.1:6379> KEYS JobQueue:queue:*
...
4) "JobQueue:queue:xxx"
5) "JobQueue:queue:yyy"
# | | |
# Namespace | |
# Fixed key word |
# Queue name
...
redis 127.0.0.1:6379> LRANGE JobQueue:queue:xxx 0 -1
1) "478B9C84-C5B8-11E1-A2C5-D35E0A986783 1344070066"
2) "89116152-C5BD-11E1-931B-0A690A986783 1344070067"
# | |
# Job id (UUID) |
# Expire time (UTC)
...
Job queue will be created automatically when the data arrives to contain. Job queue will be deleted automatically in the exhaustion of its contents.
DEPENDENCIES
In order to install and use this package is desirable to use a Perl version 5.010 or later. Some modules within this package depend on other packages that are distributed separately from Perl. We recommend that you have the following packages installed before you install Redis::JobQueue
package:
Data::UUID
Mouse
Params::Util
Redis
Redis::JobQueue
package has the following optional dependencies:
Test::Distribution
Test::Exception
Test::Kwalitee
Test::Perl::Critic
Test::Pod
Test::Pod::Coverage
Test::RedisServer
Test::TCP
If the optional modules are missing, some "prereq" tests are skipped.
BUGS AND LIMITATIONS
The use of maxmemory-police all*
in the redis.conf
file could lead to a serious (but hard to detect) problem as Redis server may delete the job queue lists.
We strongly recommend using the option maxmemory
in the redis.conf
file if the data set may be large.
The Redis::JobQueue
package was written, tested, and found working on recent Linux distributions.
There are no known bugs in this package.
Please report problems to the "AUTHOR".
Patches are welcome.
MORE DOCUMENTATION
All modules contain detailed information on the interfaces they provide.
SEE ALSO
The basic operation of the Redis::JobQueue
package modules:
Redis::JobQueue - Object interface for creating, executing jobs queues, as well as monitoring the status and results of jobs.
Redis::JobQueue::Job - Object interface for jobs creating and manipulating.
Redis - Perl binding for Redis database.
AUTHOR
Sergey Gladkov, <sgladkov@trackingsoft.com>
CONTRIBUTORS
Alexander Solovey
Jeremy Jordan
Vlad Marchenko
COPYRIGHT AND LICENSE
Copyright (C) 2012-2013 by TrackingSoft LLC. All rights reserved.
This package is free software; you can redistribute it and/or modify it under the same terms as Perl itself. See perlartistic at http://dev.perl.org/licenses/artistic.html.
This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.