package Data::Queue::Persistent;

use 5.008004;
use strict;
use warnings;
use Carp qw / croak /;
use DBI;

our $VERSION = '0.13';

our $schema = q{
    CREATE TABLE %s (
                     qkey VARCHAR(255) NOT NULL,
                     idx INTEGER UNSIGNED NOT NULL,
                     value BLOB,
                     PRIMARY KEY (qkey, idx)
                     )
    };

sub new {
    my ($class, %opts) = @_;

    my $dsn = delete $opts{dsn};
    my $dbh = delete $opts{dbh};

    croak "No DSN or database handle passed to Data::Queue::Persistent->new"
        unless $dsn || $dbh;

    my $username = delete $opts{username};
    my $pass = delete $opts{pass};

    my $cache = delete $opts{cache} || 0;

    my $key = delete $opts{id} or croak "No queue id defined";

    my $table = delete $opts{table} || 'persistent_queue';

    my $noload = delete $opts{noload};
    my $max_size = delete $opts{max_size};

    # connect to db
    if ($dsn) {
        $dbh = DBI->connect($dsn, $username, $pass)
            or croak "Could not connect to database";
    }

    my $self = {
        cache      => $cache,
        dbh        => $dbh,
        q          => [],
        key        => $key,
        table_name => $table,
        max_size   => $max_size,
    };

    bless $self, $class;
    $self->init;
    $self->load if $self->caching && ! $noload;

    return $self;
}

sub table_name {
    my $self = shift;
    return $self->dbh->quote_identifier($self->{table_name});
}

sub dbh { $_[0]->{dbh} }
sub key { $_[0]->{key} }
sub q { $_[0]->{q} }
sub caching { $_[0]->{cache} }
sub max_size { $_[0]->{max_size} }

# returns how many items are in the queue
sub length {
    my $self = CORE::shift();

    return (scalar @{$self->{q}}) if $self->caching;

    my $table = $self->table_name;
    my ($length) = $self->dbh->selectrow_array("SELECT COUNT(idx) FROM $table WHERE qkey=?",
                                               undef, $self->key);
    die $self->dbh->errstr if $self->dbh->err;

    return $length || 0;
}

sub _max_idx {
    my $self = CORE::shift();

    # TODO: cache max index

    my $table = $self->table_name;
    my ($idx) = $self->dbh->selectrow_array("SELECT MAX(idx) FROM $table WHERE qkey=?",
                                               undef, $self->key);
    die $self->dbh->errstr if $self->dbh->err;

    return defined $idx ? $idx + 1 : 0;
}

# do a sql statement and die if it fails
sub do {
    my ($self, $sql, @vals) = @_;

    $self->dbh->do($sql, undef, @vals);
    croak $self->dbh->errstr if $self->dbh->err;
}

# initialize the storage
sub init {
    my ($self) = @_;

    croak "No table name defined" unless $self->table_name;

    # don't do anything if table already exists
    return if $self->table_exists;

    # table doesn't exist, create it
    my $sql = sprintf($schema, $self->table_name);
    $self->do($sql);
}

# load data from db
sub load {
    my $self = CORE::shift();

    my $table = $self->table_name or croak "No table name defined";
    die "Table $table does not exist." unless $self->table_exists;

    my $rows = $self->dbh->selectall_arrayref("SELECT value FROM $table WHERE qkey=? ORDER BY idx", undef, $self->key);
    die $self->dbh->errstr if $self->dbh->err;

    return unless $rows && @$rows;
    $self->absorb_rows(@$rows);
}

sub absorb_rows {
    my ($self, @rows) = @_;
    push @{$self->{q}}, map { $_->[0] } @rows;
}

# delete everything from the queue
sub empty {
    my ($self) = @_;

    my $table = $self->table_name;
    $self->do("DELETE FROM $table WHERE qkey=?", $self->key);

    $self->{q} = [] if $self->caching;
}

sub table_exists {
    my $self = CORE::shift();
    # get table info, see if our table exists
    my @tables = $self->dbh->tables(undef, undef, $self->{table_name}, "TABLE");
    my $table = $self->{table_name};

    $table = $self->dbh->quote_identifier($self->{table_name})
        if $self->dbh->get_info(29); # quote if the db driver uses table name quoting

    return grep { $_ =~ m/$table$/ } @tables;
}

# add @vals to the queue
*add = \&unshift;
sub unshift {
    my ($self, @vals) = @_;

    my $idx = $self->_max_idx;

    my $key = $self->dbh->quote($self->key);
    my $table = $self->table_name;
    my $dbh = $self->dbh;

    $dbh->begin_work;
    my $sth = $dbh->prepare(qq[ INSERT INTO $table (qkey, idx, value) VALUES ($key, ?, ?) ]);

    foreach my $val (@vals) {
        push @{$self->{q}}, $val if $self->caching;

        $sth->execute($idx++, $val);
        if ($dbh->err) {
            die $dbh->errstr;
            $dbh->rollback;
        }
    }

    $dbh->commit;

    # truncate queue to max_size
    my $max_size = $self->max_size;
    my $length = $self->length;
    $self->shift($length - $max_size) if defined $max_size && $length > $max_size;
}

# shift $count elements off the queue
*remove = \&shift;
sub shift {
    my ($self, $_count) = @_;

    my $count = defined $_count ? $_count : 1;
    $count += 0;

    if ($self->caching) {
        CORE::shift(@{$self->{q}}) for 1 .. $count;
    }

    my $table = $self->table_name;

    # begin transaction
    $self->dbh->begin_work;

    # get $count elements
    my $rows = $self->dbh->selectall_arrayref("SELECT idx, value FROM $table WHERE qkey = ? ORDER BY idx LIMIT $count",
                                              undef, $self->key);
    die $self->dbh->errstr if $self->dbh->err;

    my @idx = map { $_->[0] } @$rows;
    my @vals = map { $_->[1] } @$rows;

    return () unless @vals;

    # remove the retreived elements
    my $bindstr = join(',', map { '?' } @idx);
    $self->do("DELETE FROM $table WHERE qkey=? AND idx BETWEEN ? AND ?", $self->key, $idx[0], $idx[-1]);

    # commit transaction
    $self->dbh->commit;

    # return first element if no $count defined, otherwise return array of values
    return $vals[0] unless defined $_count;
    return @vals;
}

# retreive elements at an index
sub get {
    my ($self, $offset, $length, %opts) = @_;

    return $self->all unless defined $offset || defined $length;

    $length = -1 unless defined $length; # need to specify a limit when selecting an offset, this is wack
    $offset += 0;

    my $direction = $opts{reverse} ? "DESC" : '';

    my $table = $self->table_name;
    my $rows = $self->dbh->selectcol_arrayref("SELECT value FROM $table WHERE qkey = ? ORDER BY idx $direction LIMIT $length OFFSET $offset",
                                              undef, $self->key);
    die $self->dbh->errstr if $self->dbh->err;

    return wantarray ? @$rows : $rows->[0];
}

# returns all elements of the queue
sub all {
    my $self = CORE::shift();

    return @{$self->{q}} if $self->caching;

    my $valsref = $self->dbh->selectall_arrayref("SELECT value FROM " . $self->table_name . " WHERE qkey = ?",
                                                 undef, $self->key);
    return map { @$_ } @$valsref;
}

1;

__END__

=head1 NAME

Data::Queue::Persistent - Perisistent database-backed queue

=head1 SYNOPSIS

  use Data::Queue::Persistent;
  my $q = Data::Queue::Persistent->new(
    table  => 'persistent_queue', # name to save queues in
    dsn    => 'dbi:SQLite:dbname=queue.db', # dsn for database to save queues
    id     => 'testqueue', # queue identifier
    cache  => 1,
    noload => 1, # don't load saved queue automatically
    max_size => 100, # limit to 100 items
  );
  $q->add('first', 'second', 'third', 'fourth');
  $q->remove;      # returns 'first'
  $q->remove(2);   # returns ('second', 'third')
  $q->empty;       # removes everything

=head1 DESCRIPTION

This is a simple module to keep a persistent queue around. It is just
a normal implementation of a queue, except it is backed by a database
so that when your program exits the data won't disappear.

=head2 EXPORT

None by default.

=head2 Methods

=over 4

=item * new(%opts)

Creates a new persistent data queue object. This will also initialize
the database storage, and load the saved queue data if it already
exists.

Options:

dsn: DSN for database connection.

dbh: Already initialized DBI connection handle.

id: The ID of this queue. You can have multiple queues stored in the
same table, distinguished by their IDs.

user: The username for database connection (optional).

pass: The password for database connection (optional).

cache: Enable caching of the queue for speed. Not reccommended if
multiple instances of the queue will be used concurrently. Default is
0.

table: The table name to use ('persistent_queue' by default).

noload: Don't load queue data when initialized (only applicable if caching is used)

max_size: Limit the queue to max_size, with the oldest elements falling off

=item * add(@items)

Adds a list of items to the queue.

=item * remove($count)

Removes $count (1 by default) items from the queue and returns
them. Returns value if no $count specified, otherwise returns an array
of values.

=item * get([$offset[, $length]])

Gets $length elements starting at offset $offset

=item * all

Returns all elements in the queue. Does not modify the queue.

=item * length

Returns count of elements in the queue.

=item * empty

Removes all elements from the queue.

=item * unshift(@items)

Alias for C<add(@items)>.

=item * shift($count)

Alias for C<remove($count)>.

=back

=head1 SEE ALSO

Any data structures book.

=head1 AUTHOR

Mischa Spiegelmock, E<lt>mspiegelmock@gmail.comE<gt>

=head1 COPYRIGHT AND LICENSE

Copyright (C) 2007 by Mischa Spiegelmock

This library is free software; you can redistribute it and/or modify
it under the same terms as Perl itself, either Perl version 5.8.4 or,
at your option, any later version of Perl 5 you may have available.


=cut