NAME
App::Basis::Queue - Simple database backed FIFO queues
VERSION
version 0.2
SYNOPSIS
use App::Basis::Queue;
my $queue = App::Basis::Queue->new( dbh => $dbh) ;
# save some application audit data
$queue->add( 'app_start', {
ip => 12.12.12.12,
session_id => 12324324345,
client_id => 248296432984,
appid => 2,
app_name => 'twitter'
}) ;
# in another process, we want to process that data
my $queue = App::Basis::Queue->new( dbh => $dbh) ;
$queue->process( 'app_start', count => 10, callback => \&processing_callback) ;
DESCRIPTION
Why have another queuing system? Well for me I wanted a queuing system that did not mean I needed to install and maintain another server (ie RabbitMQ). Something that could run against existing DBs (eg PostgreSQL). PGQ was an option, but as it throws away queued items if there is not a listener, then this was useless! Some of the Job/Worker systems required you to create classes and plugins to process the queue. Queue::DBI almost made the grade but only has one queue. I need multiple queues!
So I created this simple/basic system. You need to expire items, clean the queue and do things like that by hand, there is no automation. You process items in the queue in chunks, not via a nice iterator.
There is no queue polling, there can only be one consumer of a record.
NAME
App::Basis::Queue
NOTES
I would use msgpack instead of JSON to store the data, but processing BLOBS in PostgreSQL is tricky.
To make the various inserts/queries work faster I cache the prepared statement handles against a key and the fields that are being inserted, this speeds up the inserts roughly by 3 times Adding 1000 records was taking 30+ seconds, now its more like 7 on PostgreSQL.
AUTHOR
kmulholland, moodfarm@cpan.org
VERSIONS
v0.1 2013-08-02, initial work
TODO
Currently the processing functions only process the earliest $MAX_PROCESS_ITEMS but by making use of the counter in the info table, then we could procss the entire table or at least a much bigger number and do it in chunks of $MAX_PROCESS_ITEMS
Processing could be by date
Add a method to move processed items to queue_name/processed and failures to queue_name/failures or add them to these queues when marking them as processed or failed, will need a number of other methods to be updated but keeps less items in the unprocessed queue
See Also
API
new
Create a new instance of a queue
prefix - set a prefix name of the tables, allows you to have dev/test/live versions in the same database debug - set basic STDERR debugging on or off skip_table_check - don't check to see if the tables need creating
my $queue = App::Basis::Queue->new( dbh => $dbh ) ;
add
Add some data into a named queue.
my $queue = App::Basis::Queue->new( dbh => $dbh) ;
# save some application audit data
$queue->add( 'app_start', {
ip => 12.12.12.12, session_id => 12324324345, client_id => 248296432984,
appid => 2, app_name => 'twitter'
}) ;
process
process up to 100 from the queue
a refrence to the queue object is passed to the callback along with the name of the queue and the record that is to be procssed.
If the callback returns a non-zero value then the record will be marked as processed. If the callback returns a zero value, then the processing is assumed to have failed and the failure count will be incremented by 1. If the failue count matches our maximum allowed limit then the item will not be available for any further processing.
sub processing_callback {
my ( $queue, $qname, $record ) = @_;
return 1;
}
$queue->process( 'queue_name', count => 5, callback => \&processing_callback) ;
process_failures
process up to 100 from the queue a refrence to the queue object is passed to the callback along with the name of the queue and the record that is to be procssed. As these are failures we are not interested in an value of the callback function.
sub processing_failure_callback {
my ( $queue, $qname, $record ) = @_;
# items before 2013 were completely wrong so we can delete
if( $record->{added} < '2013-01-01') {
$queue->delete_record( $record) ;
} else {
# failures in 2013 was down to a bad processing function
$queue->reset_record( $record) ;
}
}
$queue->process( 'queue_name', count => 5, callback => \&processing_callback) ;
queue_size
get the count of unprocessed items in the queue
my $count = $queue->queue_size( 'queue_name') ;
say "there are $count unprocessed items in the queue" ;
list_queues
obtains a list of all the queues used by this database
my $qlist = $queue->list_queues() ;
foreach my $q (@$qlist) {
say $q ;
}
stats
obtains stats about the data in the queue, this may be time/processor intensive so use with care!
provides counts of unprocessed, processed, failures max process_failure, avg process_failure, earliest_added, latest_added, min_data_size, max_data_size, avg_data_size, total_records avg_elapsed, max_elapsed, min_elapsed
my $stats = $queue->stats( 'queue_name') ;
say "processed $stats->{processed}, failures $stats->{failure}, unprocessed $stats->{unprocessed}" ;
delete_record
delete a single record from the queue requires a data record which contains infomation we will use to determine the record
may be used in processing callback functions
sub processing_callback {
my ( $queue, $qname, $record ) = @_;
# lets remove records before 2013
if( $record->{added) < '2013-01-01') {
$queue->delete_record( $record) ;
}
return 1 ;
}
reset_record
clear failure flag from a failed record requires a data record which contains infomation we will use to determine the record
may be used in processing callback functions
sub processing_callback {
my ( $queue, $qname, $record ) = @_;
# allow partially failed (and failed) records to be processed
if( $record->{process_failure) {
$queue->reset_record( $record) ;
}
return 1 ;
}
purge_queue
purge will remove all processed items and failures (process_failure >= 5). These are completely removed from the database
my $before = $queue->stats( 'queue_name') ;
$queue->purge_queue( 'queue_name') ;
my $after = $queue->stats( 'queue_name') ;
say "removed " .( $before->{total_records} - $after->{total_records}) ;
remove_queue
remove a queue and all of its records
$queue->remove_queue( 'queue_name') ;
my $after = $queue->list_queues() ;
# convert list into a hash for easier checking
my %a = map { $_ => 1} @after ;
say "queue removed" if( !$q->{queue_name}) ;
reset_failures
clear any process_failure values from all unprocessed items
my $before = $queue->stats( 'queue_name') ;
$queue->reset_failures( 'queue_name') ;
my $after = $queue->stats( 'queue_name') ;
say "reset " .( $after->{unprocessed} - $before->{unprocessed}) ;
remove_failues
permanently delete failures from the database
$queue->remove_failues( 'queue_name') ;
my $stats = $queue->stats( 'queue_name') ;
say "failues left " .( $stats->{failures}) ;
remove_tables
If you never need to use the database again, it can be completely removed
$queue_>remove_tables() ;
AUTHOR
Kevin Mulholland <moodfarm@cpan.org>
COPYRIGHT AND LICENSE
This software is copyright (c) 2014 by Kevin Mulholland.
This is free software; you can redistribute it and/or modify it under the same terms as the Perl 5 programming language system itself.