NAME

cPanel::TaskQueue::Recipes - some tasty uses of the cPanel::TaskQueue modules.

DESCRIPTION

This cookbook attempts to collect some of the normal modes of operation of the cPanel::TaskQueue modules. It describes some tasks you might want to accomplish and what you need to do to accomplish them.

RECIPES

Basics

This section describes some of the fundamentals of using the cPanel::TaskQueue modules.

Create a TaskQueue

Before using a cPanel::TaskQueue, you need to create one.

The first time you instantiate a cPanel::TaskQueue object, it creates the object and the cached version of the TaskQueue on disk.

use cPanel::TaskQueue;

my $q = cPanel::TaskQueue->new(
    {name=>'tasks', cache_dir=>'/var/taskqueue/'}
);

If the specified cache_dir is writable, a cache file will be created in this directory to hold the TaskQueue data.

I want a TaskQueue with different defaults.

The TaskQueue has several configurable parameters that you can use to customize the functioning of the queue. You change them by adding more named parameters to the new call when creating the cPanel::TaskQueue object. For example,

my $q = cPanel::TaskQueue->new(
    {name=>'tasks', cache_dir=>'/var/taskqueue/', max_running=>10}
);

changes the number of tasks that are allowed to run at the same time to 10 from the default of 2.

You can also set more than one of these parameters at a time. For example, if you need to modify all of the timeout values, you could do

my $q = cPanel::TaskQueue->new(
    {name=>'tasks', cache_dir=>'/var/taskqueue/',
    default_timeout => 30, max_timeout => 600,
    default_child_timeout => 30*60,}
);

This sets the default and maximum timeout values for an in-process task to 30 seconds and 10 minutes, respectively. It also sets the timeout value for a child task to 30 minutes.

I need to delay the queuing of tasks for some time in the future.

Before using a cPanel::TaskQueue::Scheduler, you need to create one.

The first time you instantiate a cPanel::TaskQueue::Scheduler object, it creates the object and the cached version of the Scheduler on disk.

use cPanel::TaskQueue::Scheduler;

my $s = cPanel::TaskQueue::Scheduler->new(
    {name=>'tasks', cache_dir=>'/var/taskqueue/'}
);

If the specified cache_dir is writable, a cache file will be created in this directory to hold the Scheduler data.

I need to access a TaskQueue defined elsewhere.

Let's say you want to access a TaskQueue that has been instantiated elsewhere (in this program or another). All you have to do is create a cPanel::TaskQueue object with the same name and cache_dir as the previous one to have access for queuing new tasks and such.

use cPanel::TaskQueue;

my $q = cPanel::TaskQueue->new(
    {name=>'tasks', cache_dir=>'/var/taskqueue/'}
);

This TaskQueue is equivalent to any other TaskQueue instantiated with the same name and cache_dir.

I need to access a Scheduler defined elsewhere.

Let's say you want to access a Scheduler that has been instantiated elsewhere (in this program or another). All you have to do is create a cPanel::TaskQueue::Scheduler object with the same name and cache_dir as the previous one to have access for scheduling new tasks and such.

use cPanel::TaskQueue::Scheduler;

my $s = cPanel::TaskQueue::Scheduler->new(
    {name=>'tasks', cache_dir=>'/var/taskqueue/'}
);

This Scheduler is equivalent to any other Scheduler instantiated with the same name and cache_dir.

Using a Scheduler Token

When working with the TaskQueue system, you may need to pass a Scheduler object to another process and access it there. (This is actually how the retry logic works.) We can't reliably serialize/deserialize a cPanel::TaskQueue::Scheduler object because of the potential for inconsistent handling of the locks, so the Scheduler provides a method to retrieve a Token that can be used to reconstruct an equivalent Scheduler object.

# in one place in the code.
my $token = $s->get_token();

# From some other place in the code or another process.
my $s2 = cPanel::TaskQueue::Scheduler->new( {token=>$token} );

At this point, $s and $s2 reference the same Scheduler file safely and can be treated as one instance of the object.

I want to add a command processor of my own.

The TaskQueue system would not be too useful without an ability to add more commands to process. You can add commands to the TaskQueue system with the cPanel::TaskQueue::register_task_processor class method. It associates a command name with a new processor object.

cPanel::TaskQueue->register_task_processor( 'doit', MyTasks::Doit->new() );

If there was no doit command registered, this associates the supplied new object as the processor for this command.

My task processor is too simple to need a full object.

Say you have a very simple command to process and don't want to create a class to process it. You can also call register_task_processor with a code reference and it will create the necessary wrapper to do what you want. It expects to pass the arguments from the command request to the coderef.

cPanel::TaskQueue->register_task_processor( 'echo', sub { print @_; } );

If you use this method, you obviously have to accept the default behavior for all of the other features of the Processor object.

I have too many tasks to register them individually.

Once you have more than a handful of cPanel::TaskQueue::Processors, registering them individually by hand is going to get tedious and error-prone. The cPanel::TaskQueue::PluginManager class solves this problem. This class expects packages defining cPanel::TaskQueue::Processor-derived objects to define a class method to_register that returns a list of pairs of command name and processing object. The PluginManager can use this information to register the classes automatically.

use cPanel::TaskQueue::PluginManager;
cPanel::TaskQueue::PluginManager::load_plugins(
    '/home/fred/MyTasks', 'TaskQueue::MyPlugins'
);

This loads all of the files from the namespace TaskQueue::MyPlugins in the directory /home/fred/MyTasks (or equivalently the *.pm files in the directory /home/fred/MyTasks/TaskQueue/MyPlugins) provided that /home/fred/MyTasks is part of perl's include path. Now, if you have all of your TaskProcessor plugins in a single directory and namespace, you can load them all at once.

I have so many tasks that one plugin namespace is not enough.

If your plugins exist in more than one group, you can call load_plugins once for each combination of root directory and namespace. However, the PluginManager provides another convenience method to make this easier: load_all_plugins.

use cPanel::TaskQueue::PluginManager;
cPanel::TaskQueue::PluginManager::load_all_plugins(
    directories => [ '/usr/lib/TaskPlugins', '/home/fred/MyTasks' ],
    namespaces => [ 'cPanel::TaskQueue::Plugin', 'TaskQueue::MyPlugins', 'TaskProcessors::Plugin' ]
);

As with load_plugins, each listed directory must be part of perl's include path. In each supplied directory, we attempt to load any plugins in each of the specified namespaces. In this example, that would be 6 directories in all.
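To make the count of six concrete, here is a minimal standalone sketch that lists the directories searched in the example above. It does not call the PluginManager. The helper name plugin_dirs is hypothetical, and the sketch assumes only the namespace-to-path mapping already described for load_plugins.

```perl
use strict;
use warnings;
use File::Spec ();

# Hypothetical helper (not part of cPanel::TaskQueue): pair every root
# directory with every namespace, turning 'A::B' into the subpath 'A/B'.
sub plugin_dirs {
    my ( $directories, $namespaces ) = @_;
    my @dirs;
    foreach my $dir ( @{$directories} ) {
        foreach my $ns ( @{$namespaces} ) {
            my @parts = split /::/, $ns;
            push @dirs, File::Spec->catdir( $dir, @parts );
        }
    }
    return @dirs;
}

my @searched = plugin_dirs(
    [ '/usr/lib/TaskPlugins', '/home/fred/MyTasks' ],
    [ 'cPanel::TaskQueue::Plugin', 'TaskQueue::MyPlugins', 'TaskProcessors::Plugin' ],
);

# One line per directory that would be examined for *.pm files.
print "$_\n" for @searched;
```

Two root directories and three namespaces yield six entries. A combination that does not exist on disk simply has nothing to load.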

How do I define a Plugin Module?

The only distinguishing feature of a TaskQueue plugin module is the class method to_register. This method must exist in the package declared by the module and it must return a list of array references. Each of these array references should contain two items: a command name and a cPanel::TaskQueue::Processor-derived object or code reference that can process this command.

package TaskQueue::MyPlugins::TestTasks;

{
    package TaskQueue::MyPlugins::TestTasks::SimpleChild;
    use base 'cPanel::TaskQueue::ChildProcessor';
    # define the rest of the class.
}

sub to_register {
    return (
        [ 'echo', sub { print "@_\n"; return; } ],
        [ 'sleep', sub { sleep $_[0]; return; } ],
        [ 'child', TaskQueue::MyPlugins::TestTasks::SimpleChild->new() ],
    );
}

Although people normally create one class per module, it is often useful in plugins to define a set of simple classes to support a set of related commands in a single module. The to_register method simplifies this use case.
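The shape of the to_register return value can be explored without the TaskQueue modules. The sketch below is an illustration only: the package My::DemoPlugin and the %handler_for table are hypothetical, and this is not how the PluginManager is implemented. It simply shows that a list of [ name, handler ] pairs is easy to consume.

```perl
use strict;
use warnings;

{
    # Hypothetical plugin package, for illustration only.
    package My::DemoPlugin;

    sub to_register {
        return (
            [ 'upper', sub { return uc( join ' ', @_ ); } ],
            [ 'count', sub { return scalar @_; } ],
        );
    }
}

# Build a command-name => handler table from the pairs. This only mimics
# the shape of the data; it is not the PluginManager's own code.
my %handler_for;
foreach my $pair ( My::DemoPlugin->to_register() ) {
    my ( $command, $handler ) = @{$pair};
    $handler_for{$command} = $handler;
}

print $handler_for{upper}->( 'hello', 'world' ), "\n";
print $handler_for{count}->( 'a', 'b', 'c' ), "\n";
```

Because each pair carries its own command name, one module can supply handlers for several related commands.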

Queuing Commands

This section focuses on using the TaskQueue-based code to execute a set of tasks. This section assumes that the appropriate cPanel::TaskQueue::Processor-derived objects have been created and registered already. If you need help with Processors, see "Defining Processors".

I need to queue a command.

You will need a cPanel::TaskQueue object to queue a command. If you do not have a cPanel::TaskQueue object, see "I need to access a TaskQueue defined elsewhere."

Assume you have a processor for the command notify that takes the name of a user and a message to send. You can use the following code (assuming $tq is your TaskQueue):

my $qid = $tq->queue_task( 'notify fred "Hi, Fred! This is your wake-up call."' );

The return value of queue_task is a queue_id which you could use to remove the task before it is processed. If the value is undef it usually means that the task was considered to be a duplicate, it has invalid arguments, or there is no processor for notify.

I need to schedule a command to be executed after a particular time.

You will need a cPanel::TaskQueue::Scheduler object to schedule a command. If you do not have a cPanel::TaskQueue::Scheduler object, see "I need to access a Scheduler defined elsewhere."

Assume you have a processor for the command notify that takes the name of a user and a message to send. If you want to queue the command on Feb 13, 2009 at 5:31:30 pm, you can do the following (assuming $ts is your TaskQueue::Scheduler):

my $sid = $ts->schedule_task(
    'notify fred "Hi, Fred! This is your wake-up call."',
    {at_time=>1234567890}    # 2009-02-13 23:31:30 UTC
);

The at_time parameter of this method is the time you want to execute the command in epoch seconds. The command will not be queued before that point. The actual time that the command is queued will be determined by the processing code.
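If you start from a calendar date rather than an epoch value, the core Time::Local module can do the conversion. This is a minimal sketch; it assumes the wall-clock time in the text is expressed at UTC-6, so that 5:31:30 pm corresponds to 23:31:30 UTC.

```perl
use strict;
use warnings;
use Time::Local qw(timegm);

# timegm( sec, min, hour, mday, mon, year ) treats its fields as UTC.
# The month is zero-based, so February is 1.
# Assumption: 5:31:30 pm local time at UTC-6 is 23:31:30 UTC.
my $at_time = timegm( 30, 31, 23, 13, 1, 2009 );

print "$at_time\n";
```

The resulting number is what you would pass as at_time. Use timelocal from the same module instead if you want the fields interpreted in the server's own time zone.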

I need to schedule a command to be executed after a delay.

You will need a cPanel::TaskQueue::Scheduler object to schedule a command. If you do not have a cPanel::TaskQueue::Scheduler object, see "I need to access a Scheduler defined elsewhere."

Assume you have a processor for the command notify that takes the name of a user and a message to send. If you want to queue the command in an hour, you can do the following (assuming $ts is your TaskQueue::Scheduler):

my $sid = $ts->schedule_task(
    'notify fred "Hi, Fred! This is your wake-up call."',
    {delay_seconds=>3600}
);

The delay_seconds parameter of this method is the number of seconds to delay before queuing the command. The command will be queued some time after the delay specified. The actual time that the command is queued is determined by the processing code.

I need to schedule a task that will retry if it doesn't complete.

You will need a cPanel::TaskQueue::Scheduler object to schedule a command. If you do not have a cPanel::TaskQueue::Scheduler object, see "I need to access a Scheduler defined elsewhere."

Assume you have a ChildProcessor for the command notify that takes the name of a user and a message to send. If you want to queue the command in an hour and have it retry up to 5 times, you can do the following (assuming $ts is your TaskQueue::Scheduler):

my $sid = $ts->schedule_task(
    'notify fred "Hi, Fred! This is your wake-up call."',
    {delay_seconds=>3600, attempts=>5}
);

The delay_seconds parameter of this method is the number of seconds to delay before queuing the command. The command will be queued some time after the delay specified. The actual time that the command is queued is determined by the processing code. If this task times out, the task will be rescheduled up to 4 more times, as specified by the attempts parameter.

Defining Processors

In order to have commands to queue, we must have code that executes those commands. The cPanel::TaskQueue::Processor class defines the interface for this functionality. The cPanel::TaskQueue::ChildProcessor class extends that interface to better handle long-running tasks. To add your own functionality, define a new class that derives from one of these and provides the extra functionality you need. The following sections should help in understanding what you might want to change in your subclasses.

I need a simple processor that executes some code quickly.

Derive your class from cPanel::TaskQueue::Processor and override the process_task method. You will also need to make certain your new command is registered with the TaskQueue system.

For example,

package TaskQueue::Messaging::Ping;
use strict;
use warnings;

use Messaging();
use base 'cPanel::TaskQueue::Processor';

sub process_task {
    my ($self, $task) = @_;

    my $target = $task->args()->[0];
    my $m = Messaging->new( $target );
    $m->ping();
    return;
}

This example assumes that there is a Messaging module that does all of the work, of course. This task will run in-process, so any further queue processing is blocked until it completes.

I need a command that only accepts certain valid arguments.

In general, the arguments to your command will require some level of validation. This validation is provided by overriding the is_valid_args method of cPanel::TaskQueue::Processor. For example, let's say that the notify command expects a user name and a message string. Your validation method might look like this:

sub is_valid_args {
    my ($self, $task) = @_;

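    # Reject anything other than exactly two arguments.
    return if 2 != @{$task->args()};
    # The first argument must look like a user name.
    return unless $task->get_arg( 0 ) =~ /^[-\w]+$/;
    return 1;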
}

The method verifies that we have both arguments and that the first looks like a valid user name.
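Validation logic like this can be exercised on its own, before any queue is involved. The following sketch is self-contained and makes two assumptions: My::StubTask is a hypothetical stand-in that offers only the two accessors used above, and My::NotifyCheck is a hypothetical class that holds just the validation method instead of deriving from cPanel::TaskQueue::Processor.

```perl
use strict;
use warnings;

{
    # Hypothetical stand-in for a task object; it offers only the two
    # accessors used by the validation method below.
    package My::StubTask;
    sub new     { my ( $class, @args ) = @_; return bless { args => [@args] }, $class; }
    sub args    { return $_[0]{args}; }
    sub get_arg { return $_[0]{args}[ $_[1] ]; }
}

{
    # Hypothetical class holding only the validation logic.
    package My::NotifyCheck;
    sub new { my ($class) = @_; return bless {}, $class; }

    sub is_valid_args {
        my ( $self, $task ) = @_;

        return if 2 != @{ $task->args() };
        return unless $task->get_arg(0) =~ /^[-\w]+$/;
        return 1;
    }
}

my $check = My::NotifyCheck->new();
foreach my $args ( [ 'fred', 'Wake up!' ], ['fred'], [ 'no spaces!', 'Hi' ] ) {
    my $ok = $check->is_valid_args( My::StubTask->new( @{$args} ) );
    printf "%-24s %s\n", join( ', ', @{$args} ), $ok ? 'valid' : 'rejected';
}
```

Only the first argument list passes: the second has too few arguments and the third has a user name containing characters outside the allowed set.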

I need a command that is smarter about squashing duplicates.

The default method of recognizing duplicates is to verify that the command name and all of the arguments are identical. If you want to consider similar commands identical (say notify and email are close enough) or you don't want to compare all of the arguments, then you need to override the is_dupe method of cPanel::TaskQueue::Processor.

For example, let's say that the notify and email commands are close enough that we want to treat them as duplicates. Your duplicate processing would then look like this:

sub is_dupe {
    my ($self, $a, $b) = @_;

    return unless $a->command() =~ /^(?:email|notify)$/;
    return unless $b->command() =~ /^(?:email|notify)$/;

    my $a_args = $a->args();
    my $b_args = $b->args();
    return unless @{$a_args} == @{$b_args};

    foreach my $i ( 0 .. $#{$a_args} ) {
        return unless $a_args->[$i] eq $b_args->[$i];
    }

    return 1;
}

So, we treat email and notify as the same for duplicate processing, but we still check all of the arguments to make sure they are the same.

I need a command that overrides some other commands in the queue.

Sometimes a command may invalidate the need for other commands in the queue. For example, a command to restart a server is not very useful if a command to shut down the server is in the queue behind it. To provide the ability to remove commands that have previously been queued in favor of a new command, you can override the overrides method of cPanel::TaskQueue::Processor.

For example, let's say we have a special feature in the notify command to send a message to ALL. This should obviously override a notification to any individual with the same message. The overrides method would then look like this:

sub overrides {
    my ($self, $new, $old) = @_;
    return unless $old->command() eq 'notify';
    return unless $new->get_arg( 0 ) eq 'ALL';

    return $old->get_arg( 1 ) eq $new->get_arg( 1 );
}

If the old command is not a notify or the current notify is not sent to ALL, we won't override. This version of the subroutine takes advantage of the fact that there are only two parameters to do a simple test of the message.
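To see the effect of such a test, the sketch below applies the same logic to a small list of queued notifications. It is an illustration under stated assumptions: My::StubTask is a hypothetical stand-in for a task object, the logic is written as a plain function rather than a method, and the grep only imitates the idea of removing overridden tasks rather than reproducing the queue's own code.

```perl
use strict;
use warnings;

{
    # Hypothetical minimal task: a command name plus positional arguments.
    package My::StubTask;
    sub new {
        my ( $class, $cmd, @args ) = @_;
        return bless { cmd => $cmd, args => [@args] }, $class;
    }
    sub command { return $_[0]{cmd}; }
    sub get_arg { return $_[0]{args}[ $_[1] ]; }
}

# Same test as the overrides method above, written as a plain function.
sub overrides {
    my ( $new, $old ) = @_;
    return unless $old->command() eq 'notify';
    return unless $new->get_arg(0) eq 'ALL';
    return $old->get_arg(1) eq $new->get_arg(1);
}

my @queued = (
    My::StubTask->new( 'notify', 'fred',   'Server restart at 5' ),
    My::StubTask->new( 'notify', 'wilma',  'Lunch?' ),
    My::StubTask->new( 'notify', 'barney', 'Server restart at 5' ),
);
my $new = My::StubTask->new( 'notify', 'ALL', 'Server restart at 5' );

# Keep only the queued tasks that the new task does not override.
my @kept = grep { !overrides( $new, $_ ) } @queued;
print join( ', ', map { $_->get_arg(0) } @kept ), "\n";
```

The notifications to fred and barney carry the same message as the broadcast, so only the message to wilma survives.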

I need a command that may take a long time to process, without stopping the queue.

Some tasks should probably run in the background. If your task is waiting on a network resource or server, we shouldn't tie up the queue-processing process while we wait. If this describes your task, derive from cPanel::TaskQueue::ChildProcessor instead of cPanel::TaskQueue::Processor and override _do_child_task instead of process_task.

For example, let's say you have a task that needs to make a request to a webserver to complete its work. You might implement that as follows:

package MyTasks::WebRequest;

use strict;
use warnings;
use base 'cPanel::TaskQueue::ChildProcessor';
use LWP::Simple ();
use Carp();

my %url_of = (
     # assume a hash of keywords to urls
);

sub is_valid_args {
    my ($self, $task) = @_;
    return exists $url_of{ $task->get_arg( 0 ) };
}

sub _do_child_task {
    my ($self, $task) = @_;

    if ( !LWP::Simple::get( $url_of{ $task->get_arg( 0 ) } ) ) {
        Carp::croak "Could not access site for '", $task->get_arg( 0 ), "'.\n";
    }

    return;
}

When the task is processed, a child process will be launched that will perform this function. Meanwhile, control will return to the queue-processing code which may be able to launch another task while this one runs.

I need a command that takes longer than normal to process.

Some background processes may need to run for a very long time. The default timeout for a child process is an hour, which should handle most jobs. However, if you need to run a very long process, such as a backup or data migration, an hour may not be enough. On the other hand, you might have a task that is guaranteed to complete in ten minutes if it is successful. You might want to reduce the timeout in that case.

To provide this ability, override the get_child_timeout method to return the appropriate value. A false return value uses the default timeout.

If you needed a 2 hour timeout, your get_child_timeout method would look like this:

sub get_child_timeout {
    my ($self) = @_;

    return 2 * 3600;
}

I want to specify the amount of time to delay a retry.

If a child process times out, the default retry is scheduled 15 minutes later. Depending on the task, this delay may not be enough (or may be too much). You can change this value by overriding the get_reschedule_delay method from cPanel::TaskQueue::ChildProcessor. This method is passed the $task as a parameter and should return the number of seconds until the task should be queued again. For example,

sub get_reschedule_delay {
    my ($self, $task) = @_;

    # wait two hours if we're down to our last try.
    return 2*3600 if 1 == $task->retries_remaining();
    # Otherwise try in one hour.
    return 3600; 
}

I want to retry on failure, not just on timeout.

If your process fails quickly but you would still like to retry, you can call the cPanel::TaskQueue::ChildProcessor::retry_task method directly. This method expects the task as a required parameter. It also accepts a reschedule delay that will override the get_reschedule_delay value.

As an example, let's say that the main code of the task is in the method doit, which throws an exception on error. You could add retry on error as follows:

sub _do_child_task {
    my ($self, $task) = @_;

    eval {
        doit();
    };
    if ($@) {
        my $ex = $@;

        # Specific retry on two errors
        return $self->retry_task( $task, 3600 ) if $ex =~ /try again later/;
        return $self->retry_task( $task, 300 )  if $ex =~ /unavailable/;

        # Let the unrecognized ones go through.
        Carp::croak $ex;
    }
}

This example not only shows the ability to retry, but also the ability to pick a different delay for different errors. We also allow exceptions we don't recognize to continue to the calling code, which we expect to handle them.

Queue Processing

I want to process the next command in the queue.

You will need a cPanel::TaskQueue object to process commands from that queue. If you do not have a cPanel::TaskQueue object, see "I need to access a TaskQueue defined elsewhere."

You process the next command in the queue as follows:

$tq->process_next_task();

This method returns true if the processing is complete or false if a child task was launched. If there are too many outstanding child processes, this method blocks until a task in the queue can be executed.

If there are no items in the queue, this method returns true.

I want to process the next command in the queue without blocking.

You will need a cPanel::TaskQueue object to process commands from that queue. If you do not have a cPanel::TaskQueue object, see "I need to access a TaskQueue defined elsewhere."

If you only want to process an item from the queue if there is a process slot open, you need to test the state of the processing queue before execution as follows:

if ( $tq->has_work_to_do() ) {
    $tq->process_next_task();
}

One problem with this approach occurs if more than one process is executing items out of the queue: there is a race condition between the test and the process methods. This is only one reason why you should avoid having more than one processing program.

I want to wait until the commands that are currently processing are complete.

You will need a cPanel::TaskQueue object to process commands from that queue. If you do not have a cPanel::TaskQueue object, see "I need to access a TaskQueue defined elsewhere."

For some uses, you might want to stop processing the queue and wait for all of the currently executing child tasks to complete before continuing.

$tq->finish_all_processing();

Warning: This method can block for an extremely long period of time depending on how the queue is configured and what tasks are currently being processed.

I want to know when the next scheduled task should start.

You will need a cPanel::TaskQueue::Scheduler object for this action. If you do not have a cPanel::TaskQueue::Scheduler object, see "I need to access a Scheduler defined elsewhere."

The scheduler always knows when the next scheduled task should run:

my $secs = $ts->seconds_until_next_task();

If this number is 0 or negative, the next scheduled command is ready to be queued.

I want to queue all scheduled tasks that are ready for processing.

The most useful thing to do with the scheduler is to queue any commands that are ready to be queued. You will need both a cPanel::TaskQueue::Scheduler object and a cPanel::TaskQueue object for this action. If you do not have a cPanel::TaskQueue::Scheduler object, see "I need to access a Scheduler defined elsewhere." If you do not have a cPanel::TaskQueue object, see "I need to access a TaskQueue defined elsewhere."

my $count = $ts->process_ready_tasks( $tq ); 

The return value is the number of tasks queued in this call. This is necessary because it is very possible that more than one task will be ready to be queued at any given point in time.

I need a processing loop to efficiently handle the Tasks in a TaskQueue.

When processing tasks in the cPanel::TaskQueue, we want to process the Tasks as quickly as possible without wasting a lot of CPU time if there is no work to do. Although the cPanel::TaskQueue class has methods for processing individual tasks, a continuous, efficient processing loop requires a little more code.

Given a cPanel::TaskQueue object in $queue, the following loop could be the core of a processing system.

while ( $is_running ) {
    eval {
        if ( $queue->has_work_to_do() ) {
            $queue->process_next_task();
        }
        else {
            sleep $wait;
        }
    };
    warn "Exception detected: $@" if $@;
}

This code assumes the variable $is_running is controlled by some other code to determine if the processing loop should stop. It also assumes $wait holds the number of seconds to wait between attempts to run tasks. The value of $wait is used to trade off between CPU load when there are no tasks to run and the speed with which the loop will recognize new tasks.

The eval statement protects the loop from stopping due to exceptions.

Each pass through the loop processes a Task if one is available. One feature of this design is that the loop does not sleep if there are Tasks to be processed. If you don't want to process Tasks as fast as possible, you need to modify the loop.

I need a processing loop to efficiently handle the Tasks in both the TaskQueue and a Scheduler.

When processing Tasks that might need scheduling, we need to use both a cPanel::TaskQueue and a cPanel::TaskQueue::Scheduler. Although the loop for processing Tasks without scheduling is pretty straightforward, this processing loop is a bit more finicky.

Given a cPanel::TaskQueue object in $queue and a cPanel::TaskQueue::Scheduler object in $sched, the following loop would perform the necessary processing.

while ( $is_running ) {
    eval {
        $sched->process_ready_tasks( $queue );
        if ( $queue->has_work_to_do() ) {
            $queue->process_next_task();
        }
        else {
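            my $wait = $sched->seconds_until_next_task();
            # Sleep only if no scheduled task is ready yet; otherwise fall
            # through and loop again immediately. (Calling next inside
            # eval would trigger an "Exiting eval via next" warning.)
            if ( !defined $wait or $wait > 0 ) {
                $wait = $wait_time if !$wait || $wait > $wait_time;
                sleep $wait;
            }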
        }
    };
    warn "Exception detected: $@" if $@;
}

This code assumes the variable $is_running is controlled by some other code to determine if the processing loop should stop. It also assumes $wait_time holds the number of seconds to wait between attempts to run tasks. The value of $wait_time is used to trade off between CPU load when there are no tasks to run and the speed with which the loop will recognize new tasks.

The eval statement protects the loop from stopping due to exceptions.

This loop is somewhat more complicated due to the need to service the Scheduler in a timely fashion. Any time we don't have something to process, we want to sleep the lesser of the default wait time and the time until the next scheduled event. We also want to schedule Tasks as soon as they are ready. It's also important to check the Scheduler on every pass through the loop; otherwise, a series of long-running Tasks could prevent us from scheduling Tasks that are now ready.
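The sleep calculation is the easiest part of this loop to get wrong, so it can help to isolate it. The sketch below is a standalone illustration: the helper name idle_sleep_seconds is hypothetical, and it assumes an undefined value means nothing is scheduled, which is how the loop above treats it.

```perl
use strict;
use warnings;

# Hypothetical helper: decide how long to sleep when the queue is idle.
# $until_next is the scheduler's estimate of seconds until the next task
# (undef is assumed to mean that nothing is scheduled); $wait_time is the
# default wait between passes.
sub idle_sleep_seconds {
    my ( $until_next, $wait_time ) = @_;

    return $wait_time if !defined $until_next;    # nothing scheduled
    return 0          if $until_next <= 0;        # a task is already due
    return $until_next < $wait_time ? $until_next : $wait_time;
}

foreach my $until_next ( undef, 0, 5, 120 ) {
    my $label = defined $until_next ? $until_next : 'undef';
    printf "until_next=%-5s sleep=%d\n", $label,
        idle_sleep_seconds( $until_next, 30 );
}
```

With a default wait of 30 seconds, the four cases sleep for 30, 0, 5, and 30 seconds respectively: never longer than the default, and never past the next scheduled task.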

BUGS AND LIMITATIONS

No bugs have been reported in these recipes.

SEE ALSO

cPanel::TaskQueue::Processor, cPanel::TaskQueue::Task, and cPanel::CacheFile

LICENCE AND COPYRIGHT

Copyright (c) 2010, cPanel, Inc. All rights reserved.

This module is free software; you can redistribute it and/or modify it under the same terms as Perl itself. See perlartistic.

DISCLAIMER OF WARRANTY

BECAUSE THIS SOFTWARE IS LICENSED FREE OF CHARGE, THERE IS NO WARRANTY FOR THE SOFTWARE, TO THE EXTENT PERMITTED BY APPLICABLE LAW. EXCEPT WHEN OTHERWISE STATED IN WRITING THE COPYRIGHT HOLDERS AND/OR OTHER PARTIES PROVIDE THE SOFTWARE "AS IS" WITHOUT WARRANTY OF ANY KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE ENTIRE RISK AS TO THE QUALITY AND PERFORMANCE OF THE SOFTWARE IS WITH YOU. SHOULD THE SOFTWARE PROVE DEFECTIVE, YOU ASSUME THE COST OF ALL NECESSARY SERVICING, REPAIR, OR CORRECTION.

IN NO EVENT UNLESS REQUIRED BY APPLICABLE LAW OR AGREED TO IN WRITING WILL ANY COPYRIGHT HOLDER, OR ANY OTHER PARTY WHO MAY MODIFY AND/OR REDISTRIBUTE THE SOFTWARE AS PERMITTED BY THE ABOVE LICENCE, BE LIABLE TO YOU FOR DAMAGES, INCLUDING ANY GENERAL, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT OF THE USE OR INABILITY TO USE THE SOFTWARE (INCLUDING BUT NOT LIMITED TO LOSS OF DATA OR DATA BEING RENDERED INACCURATE OR LOSSES SUSTAINED BY YOU OR THIRD PARTIES OR A FAILURE OF THE SOFTWARE TO OPERATE WITH ANY OTHER SOFTWARE), EVEN IF SUCH HOLDER OR OTHER PARTY HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGES.