# $Id: /mirror/coderepos/lang/perl/Queue-Q4M/trunk/lib/Queue/Q4M.pm 64066 2008-06-24T04:52:43.377517Z daisuke  $
#
# Copyright (c) 2008 Daisuke Maki <daisuke@endeworks.jp>
# All rights reserved.

package Queue::Q4M;
use Moose;

has 'connect_info' => (
    is => 'rw',
    isa => 'ArrayRef',
    required => 1,
);

has 'database' => (
    is => 'rw',
    isa => 'Str'
);

has 'sql_maker' => (
    is => 'rw',
    isa => 'SQL::Abstract',
    required => 1,
    default  => sub { SQL::Abstract->new }
);

has '_dbh' => (
    is => 'rw',
    isa => 'Maybe[DBI::db]',
);

has '_next_sth' => (
    is => 'rw',
    isa => 'Maybe[DBI::st]'
);

has '_next_args' => (
    is => 'rw',
    isa => 'ArrayRef',
    auto_deref => 1,
);

has '__table' => (
    is => 'rw',
    isa => 'Maybe[Str]'
);

has '__res' => (
    is => 'rw',
    isa => 'Maybe[Queue::Q4M::Result]'
);

__PACKAGE__->meta->make_immutable;

no Moose;

use DBI;
use SQL::Abstract;

our $VERSION = '0.00004';


sub BUILD
{
    my $self = shift;

    my $connect_info = $self->connect_info;

    # XXX This is a hack. Hopefully it will be fixed in q4m
    if (! $self->database ) {
        $connect_info->[0] =~ /(?:dbname|database)=([^;]+)/;
        my $database = $1;
        $self->database($1);
    }
    $self;
}

sub connect
{
    my $self = shift;
    if (! ref $self) {
        $self = $self->new(@_);
    }

    $self->_dbh( $self->_connect() );
    $self;
}

sub _connect
{
    my $self = shift;

    return DBI->connect(@{ $self->connect_info });
}

sub dbh
{
    my $self = shift;
    my $dbh = $self->_dbh;

    if (! $dbh || ! $dbh->ping) {
        $dbh = $self->_connect();
        $self->_dbh( $dbh );
    }
    return $dbh;
}

sub next
{
    my $self = shift;
    my @args = @_;

    # First, undef any cached table name that we might have had
    $self->__table(undef);

    my @tables = 
        grep { !/^\d+$/ }
        map  {
            s/\[.*$//;
            $_
        }
        @args
    ;

    # Cache this statement handler so we don't unnecessarily create
    # string or handles
    my $sth = $self->_next_sth;
    if (! $sth || @args) {
        my $dbh = $self->_dbh;
        my $sql = sprintf(
            "SELECT queue_wait(%s)",
            join(',', (('?') x scalar(@args)))
        );
        my $timeout = $args[-1] =~ /^\d+$/ ? pop @args  : undef;
        my @binds   = map {
            # if no dot exists, then add database\. to the beginning
            if ( index('.', $_) < 0 ) {
                $_ = join('.', $self->database, $_);
            }
            $_
        } @args;
        if ($timeout) {
            push @binds, $timeout;
        }

        $sth = $dbh->prepare( $sql ) ;
        $self->_next_sth( $sth );
        $self->_next_args( \@binds );
    }

    my $rv = $sth->execute($self->_next_args);
    my ($index) = $sth->fetchrow_array;
    $sth->finish;

    my $table = ($rv > 0 && $index > 0) ? $tables[$index - 1] : undef;
    my $res = Queue::Q4M::Result->new(
        rv         => defined $table,
        table      => $table,
        on_release => sub { $self->__table(undef) }
    );

    if (defined $table) {
        $self->__table($table);
    }
    $self->__res($res);
    return $res;
}

sub _fetch_execute
{
    my $self = shift;
    my $table = (!@_ || (ref $_[0] && eval { ! $_[0]->isa('Queue::Q4M::Result') })) ? $self->__table : shift;
    $table or die "no table";

    my ($sql, @bind) = $self->sql_maker->select($table, @_);
    my $dbh = $self->dbh;
    my $sth = $dbh->prepare($sql);
    $sth->execute(@bind); # XXX - currently always empty
    return $sth;
}

*fetch = \&fetch_array;
sub fetch_array
{
    my $self = shift;
    my $sth  = $self->_fetch_execute(@_);
    my @ret  = $sth->fetchrow_array();
    $sth->finish;
    @ret;
}

sub fetch_arrayref
{
    my $self = shift;
    my $sth  = $self->_fetch_execute(@_);
    my $ret  = $sth->fetchrow_arrayref();
    $sth->finish;
    $ret;
}

sub fetch_hashref
{
    my $self = shift;
    my $sth  = $self->_fetch_execute(@_);
    my $ret  =  $sth->fetchrow_hashref();
    $sth->finish;
    $ret;
}

sub insert
{
    my $self  = shift;
    my $table = shift;

    my ($sql, @bind) = $self->sql_maker->insert($table, @_);
    my $dbh = $self->_dbh;
    my $sth = $dbh->prepare($sql);
    my $rv = $sth->execute(@bind);
    $sth->finish;
    return $rv;
}

sub disconnect
{
    my $self = shift;
    my $dbh  = $self->_dbh;
    if ($dbh) {
        $dbh->do("select queue_end()");
        $dbh->disconnect;
        $self->_dbh(undef);
    }
}

sub DEMOLISH
{
    my $self = shift;
    $self->disconnect;
}

package
    Queue::Q4M::Result;
use overload
    bool => \&as_bool,
    '""' => \&as_string,
    fallback => 1
;
use Scope::Guard;

sub new
{
    my $class = shift;
    my %args  = @_;
    return bless [ $args{rv}, $args{table}, Scope::Guard->new( $args{on_release} ) ], $class;
}

sub as_bool { $_[0]->[0] }
sub as_string { $_[0]->[1] }
sub DESTROY { $_[0]->[2]->dismiss(1) if $_[0]->[2] }

1;

__END__

=head1 NAME

Queue::Q4M - Simple Interface To q4m

=head1 SYNOPSIS

  use Queue::Q4M;

  my $q = Queue::Q4M->connect(
    connect_info => [
      'dbi:mysql:dbname=mydb',
      $username,
      $password
    ],
  );

  for (1..10) {
    $q->insert($table, \%fieldvals);
  }

  while ($q->next($table)) {
    my ($col1, $col2, $col3) = $q->fetch($table, \@fields);
    print "col1 = $col1, col2 = $col2, col3 = $col3\n";
  }

  while ($q->next($table)) {
    my $cols = $q->fetch_arrayref($table, \@fields);
    print "col1 = $cols->[0], col2 = $cols->[1], col3 = $cols->[2]\n";
  }

  while ($q->next($table)) {
    my $cols = $q->fetch_hashref($table, \@fields);
    print "col1 = $cols->{col1}, col2 = $cols->{col2}, col3 = $cols->{col3}\n";
  }

  # to use queue_wait(table_cond1,table_cond2,timeout)
  while (my $which = $q->next(@table_conds)) {
    # $which contains the table name
  }

  $q->disconnect;

=head1 DESCRIPTION

Queue::Q4M is a simple wrapper to q4m, which is an implementation of a queue
using mysql.

=head1 METHODS

=head2 new

Creates a new Queue::Q4M instance. Normally you should use connect() instead.

=head2 connect

Connects to the target database.

  my $q = Queue::Q4M->connect(
    connect_info => [
      'dbi:mysql:dbname=q4m',
    ]
  );

=head2 next($table_cond1[, $table_cond2, $table_cond3, ..., $timeout])

Blocks until the next item is available. This is equivalent to calling
queue_wait() on the given table.

  my $which = $q->next( $table_cond1, $table_cond2, $table_cond3 );

=head2 fetch

=head2 fetch_array

Fetches the next available row. Takes the list of columns to be fetched.

  my ($col1, $col2, $col3) = $q->fetch( $table, [ qw(col1 col2 col3) ] );

=head2 fetch_arrayref

Same as fetch_array, but fetches using fetchrow_arrayref()

  my $arrayref = $q->fetch_arrayref( $table, [ qw(col1 col2 col3) ] );

=head2 fetch_hashref

Same as fetch_array, but fetches using fetchrow_hashref()

  my $hashref = $q->fetch_hashref( $table, [ qw(col1 col2 col3) ] );

=head2 insert($table, \%field)

Inserts into the queue. The first argument should be a scalar specifying
a table name. The second argument is a hashref that specifies the mapping
between column names and their respective values.

  $q->insert($table, { col1 => $val1, col2 => $val2, col3 => $val3 });

For backwards compatibility, you may omit $table if you specified $table
in the constructor.

=head2 dbh

Returns the database handle after making sure that it's connected.

=head2 disconnect

Disconnects.

=head2 BUILD

=head2 DEMOLISH

These are defined as part of Moose infrastructure

=head1 AUTHOR

Copyright (c) 2008 Daisuke Maki E<lt>daisuke@endeworks.jpE<gt>

=head1 LICENSE

This program is free software; you can redistribute it and/or modify it
under the same terms as Perl itself.

See http://www.perl.com/perl/misc/Artistic.html

=cut