NAME

Queue::DBI - A queueing module with an emphasis on safety, using DBI as a storage system for queued data.

VERSION

Version 2.6.1

SYNOPSIS

This module allows you to safely use a queueing system by preventing backtracking, infinite loops and data loss.

An emphasis of this distribution is to provide an extremely reliable dequeueing mechanism without having to use transactions.

use Queue::DBI;
my $queue = Queue::DBI->new(
	'queue_name'      => $queue_name,
	'database_handle' => $dbh,
	'cleanup_timeout' => 3600,
	'verbose'         => 1,
);

$queue->enqueue( $data );

while ( my $queue_element = $queue->next() )
{
	next
		unless $queue_element->lock();

	eval {
		# Do some work
		process( $queue_element->{'email'} );
	};
	if ( $@ )
	{
		# Something failed, we clear the lock but don't delete the record in the
		# queue so that we can try again next time
		$queue_element->requeue();
	}
	else
	{
		# All good, remove definitively the element
		$queue_element->success();
	}
}

# Requeue items that have been locked for more than 6 hours
$queue->cleanup( 6 * 3600 );

SUPPORTED DATABASES

This distribution currently supports:

  • SQLite

  • MySQL

  • PostgreSQL

Please contact me if you need support for another database type, I'm always glad to add extensions if you can help me with testing.

METHODS

new()

Create a new Queue::DBI object.

my $queue = Queue::DBI->new(
	'queue_name'        => $queue_name,
	'database_handle'   => $dbh,
	'cleanup_timeout'   => 3600,
	'verbose'           => 1,
	'max_requeue_count' => 5,
);

# Custom table names (optional).
my $queue = Queue::DBI->new(
	'queue_name'                => $queue_name,
	'database_handle'           => $dbh,
	'cleanup_timeout'           => 3600,
	'verbose'                   => 1,
	'max_requeue_count'         => 5,
	'queues_table_name'         => $custom_queues_table_name,
	'queue_elements_table_name' => $custom_queue_elements_table_name,
);

Parameters:

  • 'queue_name'

    Mandatory, the name of the queue elements will be added to / removed from.

  • 'database handle'

    Mandatory, a DBI object.

  • 'cleanup_timeout'

    Optional, if set to an integer representing a time in seconds, the module will automatically make available again elements that have been locked longuer than that time.

  • 'verbose'

    Optional, control the verbosity of the warnings in the code. 0 will not display any warning; 1 will only give one line warnings about the current operation; 2 will also usually output the SQL queries performed.

  • 'max_requeue_count'

    By default, Queue:::DBI will retrieve again the queue elements that were requeued without limit to the number of times they have been requeued. Use this option to specify how many times an element can be requeued before it is ignored when retrieving elements.

  • 'queues_table_name'

    By default, Queue::DBI uses a table named 'queues' to store the queue definitions. This allows using your own name, if you want to support separate queuing systems or legacy systems.

  • 'queue_elements_table_name'

    By default, Queue::DBI uses a table named 'queue_elements' to store the queued data. This allows using your own name, if you want to support separate queuing systems or legacy systems.

  • 'lifetime'

    By default, Queue:::DBI will fetch elements regardless of how old they are. Use this option to specify how old (in seconds) an element can be and still be retrieved for processing.

get_queue_id()

Returns the queue ID corresponding to the current queue object.

my $queue_id = $queue->get_queue_id();

count()

Returns the number of elements in the queue.

my $elements_count = $queue->count();

Optional parameter:

  • exclude_locked_elements

    Exclude locked elements from the count. Default 0.

my $unlocked_elements_count = $queue->count(
	exclude_locked_elements => 1
);

enqueue()

Adds a new element at the end of the current queue.

my $queue_element_id = $queue->enqueue( $data );

The data passed can be a scalar or a reference to a complex data structure. There is no limitation on the type of data that can be stored as it is serialized for storage in the database.

next()

Retrieves the next element from the queue and returns it in the form of a Queue::DBI::Element object.

my $queue_element = $queue->next();

while ( my $queue_element = $queue->next() )
{
	# [...]
}

Additionally, for testing purposes, a list of IDs to use when trying to retrieve elements can be specified using 'search_in_ids':

my $queue_item = $queue->next( 'search_in_ids' => [ 123, 124, 125 ] );

retrieve_batch()

Retrieves a batch of elements from the queue and returns them in an arrayref.

This method requires an integer to be passed as parameter to indicate the maximum size of the batch to be retrieved.

my $queue_elements = $queue->retrieve_batch( 500 );

foreach ( @$queue_elements )
{
	# [...]
}

Additionally, for testing purposes, a list of IDs to use when trying to retrieve elements can be specified using 'search_in_ids':

my $queue_items = $queue->retrieve_batch(
	10,
	'search_in_ids' => [ 123, 124, 125 ],
);

get_element_by_id()

Retrieves a queue element using a queue element ID, ignoring any lock placed on that element.

This method is mostly useful when doing a lock on an element and then calling success/requeue asynchroneously.

This method requires a queue element ID to be passed as parameter.

my $queue_element = $queue->get_element_by_id( 123456 );

cleanup()

Requeue items that have been locked for more than the time in seconds specified as parameter.

Returns the items requeued so that a specific action can be taken on them.

my $elements = $queue->cleanup( $time_in_seconds );
foreach my $element ( @$elements )
{
	# $element is a Queue::DBI::Element object
}

purge()

Remove (permanently, caveat emptor!) queue elements based on how many times they've been requeued or how old they are, and return the number of elements deleted.

# Remove permanently elements that have been requeued more than 10 times.
my $deleted_elements_count = $queue->purge( max_requeue_count => 10 );

# Remove permanently elements that were created over an hour ago.
my $deleted_elements_count = $queue->purge( lifetime => 3600 );

Important: locked elements are not purged even if they match the criteria, as they are presumed to be currently in process and purging them would create unexpected failures in the application processing them.

Also note that max_requeue_count and lifetime cannot be combined.

ACCESSORS

get_max_requeue_count()

Return how many times an element can be requeued before it is ignored when retrieving elements.

my $max_requeue_count = $queue->get_max_requeue_count();

set_max_requeue_count()

Set the number of time an element can be requeued before it is ignored when retrieving elements. Set it to undef to disable the limit.

# Don't keep pulling the element if it has been requeued more than 5 times.
$queue->set_max_requeue_count( 5 );+

# Retry without limit.
$queue->set_max_requeue_count( undef );

get_lifetime()

Return how old an element can be before it is ignored when retrieving elements.

# Find how old an element can be before the queue will stop retrieving it.
my $lifetime = $queue->get_lifetime();

set_lifetime()

Set how old an element can be before it is ignored when retrieving elements.

Set it to undef to reset Queue::DBI back to its default behavior of retrieving elements without time limit.

# Don't pull queue elements that are more than an hour old.
$queue->set_lifetime( 3600 );

# Pull elements without time limit.
$queue->set_lifetime( undef );

get_verbose()

Return the verbosity level, which is used in the module to determine when and what type of debugging statements / information should be warned out.

See set_verbose() for the possible values this function can return.

warn 'Verbose' if $queue->get_verbose();

warn 'Very verbose' if $queue->get_verbose() > 1;

set_verbose()

Control the verbosity of the warnings in the code:

  • 0 will not display any warning;

  • 1 will only give one line warnings about the current operation;

  • 2 will also usually output the SQL queries performed.

$queue->set_verbose(1); # turn on verbose information

$queue->set_verbose(2); # be extra verbose

$queue->set_verbose(0); # quiet now!

INTERNAL METHODS

freeze()

Serialize an element to store it in a SQL "text" column.

my $frozen_data = $queue->freeze( $data );

thaw()

Deserialize an element which was stored a SQL "text" column.

my $thawed_data = $queue->thaw( $frozen_data );

DEPRECATED METHODS

create_tables()

Please use create_tables() in Queue::DBI::Admin instead.

Here is an example that shows how to refactor your call to this deprecated function:

# Load the admin module.
use Queue::DBI::Admin;

# Create the object which will allow managing the queues.
my $queues_admin = Queue::DBI::Admin->new(
	database_handle => $dbh,
);

# Create the tables required by Queue::DBI to store the queues and data.
$queues_admin->create_tables(
	drop_if_exist => $boolean,
);

lifetime()

Please use get_lifetime() and set_lifetime() instead.

verbose()

Please use get_verbose() and set_verbose() instead.

max_requeue_count()

Please use get_max_requeue_count() and set_max_requeue_count() instead.

INTERNAL METHODS

get_dbh()

Returns the database handle used for this queue.

my $dbh = $queue->get_dbh();

get_queues_table_name()

Returns the name of the table used to store queue definitions.

my $queues_table_name = $queue->get_queues_table_name();

get_queue_elements_table_name()

Returns the name of the table used to store queue definitions.

my $queue_elements_table_name = $queue->get_queue_elements_table_name();

BUGS

Please report any bugs or feature requests through the web interface at https://github.com/guillaumeaubert/Queue-DBI/issues/new. I will be notified, and then you'll automatically be notified of progress on your bug as I make changes.

SUPPORT

You can find documentation for this module with the perldoc command.

perldoc Queue::DBI

You can also look for information at:

AUTHOR

Guillaume Aubert, <aubertg at cpan.org>.

ACKNOWLEDGEMENTS

I originally developed this project for ThinkGeek (http://www.thinkgeek.com/). Thanks for allowing me to open-source it!

COPYRIGHT & LICENSE

Copyright 2009-2014 Guillaume Aubert.

This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License version 3 as published by the Free Software Foundation.

This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License along with this program. If not, see http://www.gnu.org/licenses/