NAME

App::Basis::Queue

SYNOPSIS

use App::Basis::Queue;

my $dsn = "dbi:SQLite:/location/of/sqlite_db.sqlite3" ;
my $dbh = DBI->connect( $dsn, "", "",
    { RaiseError => 1, PrintError => 0, } )
    or die "Could not connect to DB $dsn" ;

my $queue = App::Basis::Queue->new( dbh => $dbh) ;

# save some application audit data for later processing
$queue->add(
    queue => '/invoice/pay',
    data => {
        ip => 12.12.12.12,
        session_id => 12324324345,
        client_id => 248296432984,
        amount => 250.45,
        reply => '/payments/made'
        },
) ;

# in another process, we want to process that data

use App::Basis::Queue;

# for the example this will be paying an invoice
sub processing_callback {
    my ( $queue, $qname, $record, $params ) = @_;

    # call the payment system
    # pay_money( $params->{auth}, $record->{client_id}, $record->{amount}) ;

    # chatter back that the payment has been made, assume it worked
    $queue->pub( queue => $record->{reply},
        data => {
        client_id => $record->{ client_id},
        success => 1,
        }
    ) ;
}

my $dsn = "dbi:SQLite:/location/of/sqlite_db.sqlite3" ;
my $dbh = DBI->connect( $dsn, "", "",
    { RaiseError => 1, PrintError => 0, } )
    or die "Could not connect to DB $dsn" ;
my $queue = App::Basis::Queue->new( dbh => $dbh) ;
$queue->process(
     queue => 'app_start',
     count => 10,
     callback => \&processing_callback,
     callback_params => { auth => 'sometoken:12345'}
) ;

# for pubsub we do

use App::Basis::Queue;

my $dsn = "dbi:SQLite:/location/of/sqlite_db.sqlite3" ;
my $dbh = DBI->connect( $dsn, "", "",
    { RaiseError => 1, PrintError => 0, } )
    or die "Could not connect to DB $dsn" ;
my $queue = App::Basis::Queue->new( dbh => $dbh) ;
# for a system that wants to know when servers have started
$queue->publish( queue => '/chat/helo', data => { host => 'abc, msg => 'helo world') ;

# in another process

use App::Basis::Queue;
    my $dsn = "dbi:SQLite:/location/of/sqlite_db.sqlite3" ;
my $dbh = DBI->connect( $dsn, "", "",
    { RaiseError => 1, PrintError => 0, } )
    or die "Could not connect to DB $dsn" ;
my $queue = App::Basis::Queue->new( dbh => $dbh) ;

DESCRIPTION

Why have another queuing system? Well for me I wanted a queuing system that did not mean I needed to install and maintain another server (ie RabbitMQ). Something that could run against existing DBs (eg PostgreSQL). PGQ was an option, but as it throws away queued items if there is not a listener, then this was useless! Some of the Job/Worker systems required you to create classes and plugins to process the queue. Queue::DBI almost made the grade but only has one queue. Minon maybe could do what was needed but I did not find it in time.

I need multiple queues plus new requirement queue wildcards!

So I created this simple/basic system. You need to expire items, clean the queue and do things like that by hand, there is no automation. You process items in the queue in chunks, not via a nice iterator.

There is no queue polling per se you need to process the queue and try again when all are done, there can only be one consumer of a record which is a good thing, if you cannot process an item it can be marked as failed to be handled by a cleanup function you will need to create.

End of Life

I created this project mostly as a learning project, my requirements for what it does are changing which will involve client/server operations, a shared cache and locking system for the task clients, so I am going to leave this project parked and start something new

AUTHOR

kmulholland, moodfarm@cpan.org

See Also

Queue::DBI, AnyMQ::Queue, Minion

API

new

Create a new instance of a queue

Parameters

Hash of

dbh (required)

DBI database handle of database previously connected to

prefix

set a prefix name of the tables, allows you to have dev/test/live versions in the same database

debug (optional)

set basic STDERR debugging on or off

skip_table_check

don't check to see if the tables need creating

default_queue

optionally provide a default queue to work with

my $queue = App::Basis::Queue->new( dbh => $dbh ) ;

add

Add task data into a named queue. This creates a 'task' that needs to be processed.

Parameters

Hash of

queue

Name of the queue, wildcard NOT allowed

data

Data to store against the queue, can be a scalar, hashref or arrayref

Example usage

my $queue = App::Basis::Queue->new( dbh => $dbh) ;

# save some application audit data
$queue->add(
    queue => 'app_start',
    data => {
        ip => 12.12.12.12, session_id => 12324324345, client_id => 248296432984,
        appid => 2, app_name => 'twitter'
    },
) ;

push

Push simple data onto the end of a named queue.

Parameters

Hash of

queue

Name of the queue, wildcard NOT allowed

data

Data to store against the queue, can be a scalar, hashref or arrayref

Example usage

my $queue = App::Basis::Queue->new( dbh => $dbh) ;

# save some application audit data
$queue->push(
    queue => 'app_start',
    data => {
        ip => 12.12.12.12, session_id => 12324324345, client_id => 248296432984,
        appid => 2, app_name => 'twitter'
    },
) ;

pop

Remove the top item from the named queue - the oldest item on the queue

Parameters

Hash of

queue

Name of the queue, wildcards allowed

Returns

The message data only

Example usage

my $data = $queue->pop( queue => 'queue_name') ;

size

Get size of a SIMPLE queue

Parameters

Hash of

queue

Name of the queue, wildcards allowed

Example usage

my $count = $queue->size( queue => 'queue_name') ;
say "there are $count items in the queue" ;

# size can manage wildcards
$queue->size( queue => '/celestial/*') ;

process

Process up to 100 tasks from the named queue(s)

Parameters

Hash of

queue

Name of the queue, wildcards allowed

count

Number of items to process from the queue

callback

coderef to be called to each queue item, expects queue (object), queue_name and the data of the queue item (record)

A reference to the queue object is passed to the callback along with the name of the queue and the record that is to be procssed.

If the callback returns a non-zero value then the record will be marked as processed. If the callback returns a zero value, then the processing is assumed to have failed and the failure count will be incremented by 1. If the failue count matches our maximum allowed limit then the item will not be available for any further processing.

Example usage

sub processing_callback {
    my ( $queue, $qname, $record, $params ) = @_;
    # $params = { something => 'data'} ; from the process call

    return 1;
}

$queue->process(
    queue => 'queue_name',
    count => 5,
    callback => \&processing_callback
) ;

qname can contain wildcards and all matching queues will be scanned

# add things to different queues, but with a common root
$queue->add( queue => '/celestial/stars', data => { list: [ "sun", "alpha centuri"]}) ;
$queue->add( queue => '/celestial/planets', data => { list: [ "earth", "pluto", "mars"]}) ;

# process all the 'celestial' bodies queues
$queue->process( queue => '/celestial/*', count => 5,
    callback => \&processing_callback,
    callback_params => { something => 'data'}
) ;

process_failures, process_deadletters

Process up to 100 failed tasks from the queue

Parameters

Hash of

queue

Name of the queue, wildcards allowed

count

Number of items to process from the queue

callback

Coderef to be called to each queue item, expects queue (object), queue_name and the data of the queue item (record)

a refrence to the queue object is passed to the callback along with the name of the queue and the record that is to be procssed. As these are failures we are not interested in an value of the callback function.

Example usage

sub processing_failure_callback {
    my ( $queue, $qname, $record, $params ) = @_;
    # $params = { something => 'data'} ; from the process call

    # items before 2013 were completely wrong so we can delete
    if( $record->{added} < '2013-01-01') {
        $queue->delete_record( $record) ;
    } else {
        # failures in 2013 was down to a bad processing function
        $queue->reset_record( $record) ;
    }
}

$queue->process(
    queue => 'queue_name',
    count => 5,
    callback => \&processing_failure_callback,
    callback_params => { something => 'data'}
) ;

# again we can use wildcards here for queue names

# add things to different queues, but with a common root
$queue->add( queue => '/celestial/stars', data => { list: [ "sun", "alpha centuri"]}) ;
$queue->add( queue => '/celestial/planets', data => { list: [ "moon", "pluto", "mars"]}) ;
# process, obviously 'moon' will fail our planet processing
$queue->process(
    queue => 'queue_name',
    count => 5,
    callback => \&processing_callback,
    callback_params => { something => 'data'}

) ;

# process all the 'celestial' bodies queues for failures - probably will just have the moon in it
$queue->process_failures(
    queue => '/celestial/*',
    count => 5,
    callback => \&processing_failure_callback,
    callback_params => { something => 'data'}
) ;

queue_size

Get the count of unprocessed TASK items in the queue

Parameters

Hash of

queue

Name of the queue, wildcards allowed

Example usage

my $count = $queue->queue_size( queue => 'queue_name') ;
say "there are $count unprocessed items in the queue" ;

# queue size can manage wildcards
$queue->queue_size( queue => '/celestial/*') ;

list_queues

Qbtains a list of all the queues used by this database

Example usage

my $qlist = $queue->list_queues() ;
foreach my $q (@$qlist) {
    say $q ;
}

peek

Have a look at an unprocessed item in a TASK queue

Parameters

Hash of

queue

Name of the queue, wildcards allowed

position

position in the queue you want to peek at (head/start) or (tail/end) - defaults to head

count

number of items to peek, defaults to 1, max is 100 (PEEK_MAX)

Returns

Hashref with the following fields queue_name added activates expires data

Example usage

my $data = $queue->peek( queue => 'queue_name', position => 'head') ;

stats

Obtains stats about the task data in the queue, this may be time/processor intensive so use with care!

Parameters

Hash of

queue

Name of the queue, wildcards allowed

provides counts of unprocessed, processed, failures max process_failure, avg process_failure, earliest_added, latest_added, min_data_size, max_data_size, avg_data_size, total_records avg_elapsed, max_elapsed, min_elapsed

Example usage

my $stats = $queue->stats( queue => 'queue_name') ;
say "processed $stats->{processed}, failures $stats->{failure}, unprocessed $stats->{unprocessed}" ;

# for all matching wildcard queues
my $all_stats = $queue->stats( queue => '/celestial/*') ;

delete_record

Delete a single task record from the queue

Parameters

record

Hashref to a record fetched with process or process_failures/deadletters

Requires a data record which contains infomation we will use to determine the record

May be used in processing callback functions

Example usage

sub processing_callback {
    my ( $queue, $qname, $record, $params ) = @_;

    # lets remove records before 2013
    if( $record->{added) < '2013-01-01') {
        $queue->delete_record( $record) ;
    }
    return 1 ;
}

reset_record

Clear the failure flag from a failed task record

Parameters

record

Hashref of data fetched with process or process_failure/deadletters

Requires a data record which contains infomation we will use to determine the record

may be used in processing callback functions

Example usage

sub processing_callback {
    my ( $queue, $qname, $record, $params ) = @_;

    # allow partially failed (and failed) records to be processed
    if( $record->{process_failure) {
        $queue->reset_record( $record) ;
    }
    return 1 ;
}

publish

Publish some chatter data into a named queue.

Parameters

Hash of

queue

Name of the queue to publish to, wildcards NOT allowed

data

Hashref of the data to be published

persist (optional)

Flag to show that this data data should be persisited (0 or 1). This will become the only persistent record available until either it is replaced or expires.

expires (optional)

Time after which this data should be ignored. Accepts unix epoch time or parsable datetime string

Example usage

my $queue = App::Basis::Queue->new( dbh => $dbh) ;

# keep track of a bit of info
$queue->publish( queue => 'app_log',
    data => {
        ip => 12.12.12.12, session_id => 12324324345, client_id => 248296432984,
        appid => 2, app_name => 'twitter'
    }
) ;

subscribe

Subscribe to a named queue with a callback.

Parameters

Hash of

queue

Name of the queue, wildcard allowed

callback

Coderef to handle any matched events

after (optional)

Unix time after which to listen for events, defaults to now, if set will skip persistent item checks

persist (optional)

Include the most recent persistent item, if using a wild card, this will match all the queues and could find multiple persistent items

Example usage

my $queue = App::Basis::Queue->new( dbh => $dbh) ;

# keep track of a bit of info
$queue->subscribe( queue => 'app_logs/*', callback => \&handler) ;
$queue->listen() ;

listen

Listen to all subcribed channels. Loops forever unless told to stop. If there is a persistent message in a queue, this will be passed to the callback before the other records.

Parameters

Hash of

events (optional)

Minimum number of events to listen for, stop after this many, may stop after more - this is across ALL the subscriptions

datetime (optional)

Unix epoch time or parsable datetime when to stop listening

persist (optional)

Include the most recent persistent item, if subscribed using using a wild card, this will match all the queues and could find multiple persistent items

listen_delay (optional)

Override the class delay, obtain events at this rate

returns

Number of chatter events actually passed to ALL the handlers

Example usage

my $queue = App::Basis::Queue->new( dbh => $dbh) ;
$queue->subscribe( '/logs/*', \&handler) ;
$queue->listen() ;    # listening forever

# or listen until christmas, checking every 30s
$queue->subscribe( '/presents/*', \&handler) ;
$queue->listen( datetime => '2015-12-25', listen_delay => 30) ;

unsubscribe

Unsubscribe from a named queue.

Parameters

Hash of

queue

Name of the queue, wildcard allowed

Example usage

sub handler {
    state $counter = 0 ;
    my $q = shift ;             # we get the queue object
    # the queue trigger that matched, the actual queue name and the data
    my ($qmatch, $queue, $data) = @_ ;

    # we are only interested in 10 messages
    if( ++$counter > 10) {
        $q->unsubscribe( queue => $queue) ;
    } else {
        say Data::Dumper( $data) ;
    }
}

my $queue = App::Basis::Queue->new( dbh => $dbh) ;
$queue->subscribe( queue => '/logs/*', callback => \&handler) ;
$queue->listen() ;

purge_tasks

Purge will remove all processed task items and failures/deadletters (process_failure >= 5). These are completely removed from the database

Parameters

Hash of

queue

Name of the queue, wildcard allowed

before (optional)

Unix epoch or parsable datetime before which items should be purged

defaults to 'now'

Example usage

my $before = $queue->stats( queue => 'queue_name', before => '2015-11-24') ;
$queue->purge_tasks( queue => 'queue_name') ;
my $after = $queue->stats( queue  => 'queue_name') ;

say "removed " .( $before->{total_records} - $after->{total_records}) ;

purge_chatter

purge will remove all chatter messages. These are completely removed from the database

Parameters

Hash of

queue

Name of the queue, wildcard allowed

before (optional)

Unix epoch or parsable datetime before which items should be purged

defaults to 'now'

Example usage

my $del = $queue->purge_chatter( queue => 'queue_name', before => '2015-11-24') ;

say "removed $del messages" ;

remove_queue

Remove a queue and all of its records (task and chatter)

Parameters

Takes a hash of

queue

Name of the queue, wildcards allowed

Example usage

$queue->remove_queue( queue => 'queue_name') ;
my $after = $queue->list_queues() ;
# convert list into a hash for easier checking
my %a = map { $_ => 1} @after ;
say "queue removed" if( !$q->{queue_name}) ;

reset_failures, reset_deadletters

Clear any process_failure values from all unprocessed task items

Parameters

Hash of

queue

Name of the queue, wildcard allowed

Example usage

my $before = $queue->stats( queue => 'queue_name') ;
$queue->reset_failures( queue => 'queue_name') ;
my $after = $queue->stats( queue => 'queue_name') ;

say "reset " .( $after->{unprocessed} - $before->{unprocessed}) ;

remove_failures, remove_deadletters

Permanently delete task failures from the database

Parameters

Hash of

queue

Name of the queue, wildcard allowed

Example usage

$queue->remove_failues( queue => 'queue_name') ;
my $stats = $queue->stats( queue => 'queue_name') ;
say "failues left " .( $stats->{failures}) ;

remove_tables

If you never need to use the database again, it can be completely removed

Example usage

$queue_>remove_tables() ;