NAME
MongoDBx::Queue - A work queue implemented with MongoDB
VERSION
version 0.001
SYNOPSIS
use v5.10;
use MongoDB;
use MongoDBx::Queue;
my $connection = MongoDB::Connection->new( @parameters );
my $database = $connection->get_database("queue_db");
my $queue = MongoDBx::Queue->new( db => $database );
$queue->add_task( { msg => "Hello World" } );
$queue->add_task( { msg => "Goodbye World" } );
while ( my $task = $queue->reserve_task ) {
say $task->{msg};
$queue->remove_task( $task );
}
DESCRIPTION
ALPHA -- this is an early release and is still in development. Testing and feedback welcome.
MongoDBx::Queue implements a simple message queue using MongoDB as a backend.
On a single host with MongoDB, it provides a zero-configuration message service across local applications. Alternatively, it can use a MongoDB database cluster that provides replication and fail-over for an even more durable queue.
Features:
hash references, not objects
arbitrary message fields
stalled tasks can be timed-out
task rescheduling
Not yet implemented:
arbitrary scheduling on insertion
parameter checking
error handling
Warning: do not use with capped collections, as the queued messages will not meet the constraints required by a capped collection.
ATTRIBUTES
db
A MongoDB::Database object to hold the queue. Required.
name
A collection name for the queue. Defaults to 'queue'. The collection must only be used by MongoDBx::Queue or unpredictable awful things will happen.
safe
Boolean that controls whether 'safe' inserts/updates are done. Defaults to true.
METHODS
new
my $queue = MongoDBx::Queue->new( db => $database, @options );
Creates and returns a new queue object. The db
argument is required. Other attributes may be provided as well.
add_task
$queue->add_task( $hashref );
Adds a task to the queue. The hash reference will be shallow copied into the task. Keys must not start with underscores, which are reserved for MongoDBx::Queue.
reserve_task
$task = $queue->reserve_task;
Atomically marks and returns a task. The task is marked in the queue as in-progress so it can not be reserved again unless is is rescheduled or timed-out. The task returned is a hash reference containing the data added in add_task
, including private keys for use by MongoDBx::Queue.
Tasks are returned in insertion time order with a resolution of one second.
reschedule_task
$queue->reschedule_task( $task );
$queue->reschedule_task( $task, time() );
$queue->reschedule_task( $task, $when );
Releases the reservation on a task. If there is no second argument, the task keeps its original priority. If a second argument is provided, it sets a new insertion time for the task. The schedule is ordered by epoch seconds, so an arbitrary past or future time can be set and affects subsequent reservation order.
remove_task
$queue->remove_task( $task );
Removes a task from the queue (i.e. indicating the task has been processed).
apply_timeout
$queue->apply_timeout( $seconds );
Removes reservations that occurred more than $seconds
ago. If no argument is given, the timeout defaults to 120 seconds. The timeout should be set longer than the expected task processing time, so that only dead/hung tasks are returned to the active queue.
size
$queue->size;
Returns the number of tasks in the queue, including in-progress ones.
waiting
$queue->waiting;
Returns the number of tasks in the queue that have not been reserved.
SUPPORT
Bugs / Feature Requests
Please report any bugs or feature requests through the issue tracker at https://github.com/dagolden/mongodbx-queue/issues. You will be notified automatically of any progress on your issue.
Source Code
This is open source software. The code repository is available for public review and contribution under the terms of the license.
https://github.com/dagolden/mongodbx-queue
git clone git://github.com/dagolden/mongodbx-queue.git
AUTHOR
David Golden <dagolden@cpan.org>
COPYRIGHT AND LICENSE
This software is Copyright (c) 2012 by David Golden.
This is free software, licensed under:
The Apache License, Version 2.0, January 2004