NAME
Queue::DBI - A queueing module with an emphasis on safety, using DBI as a storage system for queued data.
VERSION
Version 1.7.0
SYNOPSIS
This module allows you to safely use a queueing system by preventing backtracking, infinite loops and data loss.
An emphasis of this distribution is to provide an extremely reliable dequeueing mechanism without having to use transactions.
use Queue::DBI;
my $queue = Queue::DBI->new(
'queue_name' => $queue_name,
'database_handle' => $dbh,
'cleanup_timeout' => 3600,
'verbose' => 1,
);
$queue->enqueue( $data );
while ( my $queue_element = $queue->next() )
{
next
unless $queue_element->lock();
eval {
# Do some work
process( $queue_element->{'email'} );
};
if ( $@ )
{
# Something failed, we clear the lock but don't delete the record in the
# queue so that we can try again next time
$queue_element->requeue();
}
else
{
# All good, permanently remove the element from the queue
$queue_element->success();
}
}
# Requeue items that have been locked for more than 6 hours
$queue->cleanup( 6 * 3600 );
METHODS
new()
Create a new Queue::DBI object.
my $queue = Queue::DBI->new(
'queue_name' => $queue_name,
'database_handle' => $dbh,
'cleanup_timeout' => 3600,
'verbose' => 1,
'max_requeue_count' => 5,
);
# Custom table names (optional).
my $queue = Queue::DBI->new(
'queue_name' => $queue_name,
'database_handle' => $dbh,
'cleanup_timeout' => 3600,
'verbose' => 1,
'max_requeue_count' => 5,
'queues_table_name' => $custom_queues_table_name,
'queue_elements_table_name' => $custom_queue_elements_table_name,
);
Parameters:
'queue_name'
Mandatory, the name of the queue that elements will be added to or removed from.
'database_handle'
Mandatory, a DBI / DBD::mysql database handle.
'cleanup_timeout'
Optional. If set to an integer number of seconds, the module will automatically make elements available again once they have been locked for longer than that time.
'verbose'
Optional, see verbose() for options.
'max_requeue_count'
Optional. By default, Queue::DBI keeps retrieving queue elements that were requeued, with no limit on the number of times they have been requeued. Use this option to specify how many times an element can be requeued before it is ignored when retrieving elements.
'queues_table_name'
By default, Queue::DBI uses a table named 'queues' to store the queue definitions. This allows using your own name, if you want to support separate queuing systems or legacy systems.
'queue_elements_table_name'
By default, Queue::DBI uses a table named 'queue_elements' to store the queued data. This allows using your own name, if you want to support separate queuing systems or legacy systems.
verbose()
Control the verbosity of the warnings in the code.
$queue->verbose(1); # turn on verbose information
$queue->verbose(2); # be extra verbose
$queue->verbose(0); # quiet now!
warn 'Verbose' if $queue->verbose(); # getter-style
warn 'Very verbose' if $queue->verbose() > 1;
0 will not display any warning, 1 will only give one line warnings about the current operation and 2 will also usually output the SQL queries performed.
max_requeue_count()
Sets the number of times an element can be requeued before it is ignored when retrieving elements. Set it to $Queue::DBI::UNLIMITED_RETRIES to reset Queue::DBI back to its default behavior of re-pulling elements without limit.
# Don't keep pulling the element if it has been requeued more than 5 times.
$queue->max_requeue_count( 5 );
# Keep pulling elements regardless of the number of times they have been
# requeued.
$queue->max_requeue_count( $Queue::DBI::UNLIMITED_RETRIES );
# Find out how many times an element can be requeued before it is ignored.
my $max_requeue_count = $queue->max_requeue_count();
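To make the effect of the limit concrete, here is a minimal sketch in plain Perl that does not use Queue::DBI or its SQL at all. The element hashes, the 'requeue_count' key, the $NO_LIMIT value and the is_retrievable() helper are all hypothetical. The sketch assumes that an element stays retrievable as long as it has not been requeued more than the limit, as the comments above suggest.

```perl
use strict;
use warnings;

# Hypothetical stand-in for "no limit". The real module exposes
# $Queue::DBI::UNLIMITED_RETRIES for that purpose.
my $NO_LIMIT = -1;

# Return 1 if the element should still be retrieved, 0 if it should be ignored.
sub is_retrievable
{
    my ( $element, $max_requeue_count ) = @_;

    return 1 if $max_requeue_count == $NO_LIMIT;
    return $element->{'requeue_count'} <= $max_requeue_count ? 1 : 0;
}

my @elements =
(
    { 'id' => 1, 'requeue_count' => 0 },
    { 'id' => 2, 'requeue_count' => 5 },
    { 'id' => 3, 'requeue_count' => 6 },
);

my @with_limit = map { $_->{'id'} } grep { is_retrievable( $_, 5 ) } @elements;
my @without_limit = map { $_->{'id'} } grep { is_retrievable( $_, $NO_LIMIT ) } @elements;

print "Limit of 5: @with_limit\n";
print "No limit: @without_limit\n";
```

With a limit of 5, the element that has been requeued 6 times is skipped, while removing the limit makes all three elements eligible again.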
get_queue_id()
Returns the queue ID corresponding to the current queue object.
my $queue_id = $queue->get_queue_id();
count()
Returns the number of elements in the queue.
enqueue()
Adds a new element at the end of the current queue.
my $queue_element_id = $queue->enqueue( $data );
The data passed can be a scalar or a reference to a complex data structure. There is no limitation on the type of data that can be stored as it is serialized for storage in the database.
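As an illustration of why both scalars and complex structures can be queued, the sketch below round-trips data through a text-safe encoding. It assumes serialization with Storable followed by Base64 encoding with MIME::Base64 (both core Perl modules); this is an assumption made for the example and not necessarily the exact encoding Queue::DBI applies internally. The encode_payload() and decode_payload() helpers are hypothetical.

```perl
use strict;
use warnings;

use MIME::Base64 ();
use Storable ();

# Hypothetical helper: turn any value into a string that fits in a text column.
# Storable::freeze() needs a reference, so we freeze a reference to the value;
# that way plain scalars and nested structures are handled the same way.
sub encode_payload
{
    my ( $data ) = @_;
    return MIME::Base64::encode_base64( Storable::freeze( \$data ) );
}

# Hypothetical helper: reverse encode_payload() and return the original value.
sub decode_payload
{
    my ( $encoded ) = @_;
    return ${ Storable::thaw( MIME::Base64::decode_base64( $encoded ) ) };
}

my $data =
{
    'email' => 'user@example.com',
    'tags'  => [ 'welcome', 'newsletter' ],
};

my $encoded = encode_payload( $data );
my $decoded = decode_payload( $encoded );

print "Email: $decoded->{'email'}\n";
print "Tags: @{ $decoded->{'tags'} }\n";
```

Because the encoded form is plain text, it can be stored in an ordinary database column regardless of how deeply nested the original structure is.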
next()
Retrieves the next element from the queue and returns it in the form of a Queue::DBI::Element object.
my $queue_element = $queue->next();
while ( my $queue_element = $queue->next() )
{
# [...]
}
Additionally, for testing purposes, a list of IDs to use when trying to retrieve elements can be specified using 'search_in_ids':
my $queue_item = $queue->next( 'search_in_ids' => [ 123, 124, 125 ] );
retrieve_batch()
Retrieves a batch of elements from the queue and returns them in an arrayref.
This method requires an integer to be passed as parameter to indicate the maximum size of the batch to be retrieved.
my $queue_elements = $queue->retrieve_batch( 500 );
foreach ( @$queue_elements )
{
# [...]
}
Additionally, for testing purposes, a list of IDs to use when trying to retrieve elements can be specified using 'search_in_ids':
my $queue_items = $queue->retrieve_batch(
10,
'search_in_ids' => [ 123, 124, 125 ],
);
get_element_by_id()
Retrieves a queue element using a queue element ID, ignoring any lock placed on that element.
This method is mostly useful when locking an element and then calling success/requeue asynchronously.
This method requires a queue element ID to be passed as parameter.
my $queue_element = $queue->get_element_by_id( 123456 );
cleanup()
Requeue items that have been locked for more than the time in seconds specified as parameter.
Returns the items requeued so that a specific action can be taken on them.
my $elements = $queue->cleanup( $time_in_seconds );
foreach my $element ( @$elements )
{
# $element is a Queue::DBI::Element object
}
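The selection rule described above can be sketched in plain Perl as follows. This only illustrates the time comparison and is not the module's implementation: the 'lock_time' key, the epoch-second timestamps and the find_stale_locks() helper are assumptions made for the example.

```perl
use strict;
use warnings;

# Hypothetical helper: return the elements whose lock is older than
# $time_in_seconds, relative to the $now timestamp (in epoch seconds).
sub find_stale_locks
{
    my ( $elements, $time_in_seconds, $now ) = @_;

    my $cutoff = $now - $time_in_seconds;
    return
    [
        grep { defined( $_->{'lock_time'} ) && $_->{'lock_time'} < $cutoff }
        @$elements
    ];
}

my $now = 1_000_000;
my $elements =
[
    { 'id' => 1, 'lock_time' => undef },            # not locked
    { 'id' => 2, 'lock_time' => $now - 600 },       # locked 10 minutes ago
    { 'id' => 3, 'lock_time' => $now - 7 * 3600 },  # locked 7 hours ago
];

# Same timeout as in the synopsis: 6 hours.
my $stale = find_stale_locks( $elements, 6 * 3600, $now );
print "Stale element IDs: ", join( ', ', map { $_->{'id'} } @$stale ), "\n";
```

Only the element locked 7 hours ago is older than the 6-hour timeout, so it is the only one that would be made available again.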
create_tables()
Creates the tables in the database that the database handle passed as parameter points to. This allows setting up Queue::DBI's underlying database structure quickly.
Queue::DBI::create_tables(
dbh => $dbh,
drop_if_exist => $boolean,
sqlite => $boolean,
);
By default, it won't drop any table but you can force that by setting 'drop_if_exist' to 1. 'sqlite' is also set to 0 by default, as this parameter is used only for testing.
INTERNAL METHODS
get_dbh()
Returns the database handle used for this queue.
my $dbh = $queue->get_dbh();
get_queues_table_name()
Returns the name of the table used to store queue definitions.
my $queues_table_name = $queue->get_queues_table_name();
get_queue_elements_table_name()
Returns the name of the table used to store queue elements.
my $queue_elements_table_name = $queue->get_queue_elements_table_name();
AUTHOR
Guillaume Aubert, <aubertg at cpan.org>.
BUGS
Please report any bugs or feature requests to bug-queue-dbi at rt.cpan.org, or through the web interface at http://rt.cpan.org/NoAuth/ReportBug.html?Queue=Queue-DBI. I will be notified, and then you'll automatically be notified of progress on your bug as I make changes.
SUPPORT
You can find documentation for this module with the perldoc command.
perldoc Queue::DBI
You can also look for information at:
RT: CPAN's request tracker
AnnoCPAN: Annotated CPAN documentation
CPAN Ratings
Search CPAN
ACKNOWLEDGEMENTS
Thanks to Geeknet, Inc. http://www.geek.net for funding the initial development of this code!
Thanks to Jamie McCarthy for the locking mechanism improvements in version 1.1.
COPYRIGHT & LICENSE
Copyright 2009-2011 Guillaume Aubert.
This program is free software; you can redistribute it and/or modify it under the terms of the Artistic License.
See http://dev.perl.org/licenses/ for more information.