use Carp;
use Mojo::JSON 'to_json';
use Mojo::Util 'monkey_patch';
use Scalar::Util 'weaken';
has [qw(dbh mysql)];
has results_class => 'Mojo::mysql::Results';
for my $name (qw(delete insert select update)) {
monkey_patch __PACKAGE__, $name, sub {
my $self = shift;
my @cb = ref $_[-1] eq 'CODE' ? pop : ();
return $self->query($self->mysql->abstract->$name(@_), @cb);
monkey_patch __PACKAGE__, "${name}_p", sub {
my $self = shift;
return $self->query_p($self->mysql->abstract->$name(@_));
my $self = shift;
return unless (my $mysql = $self->mysql) and (my $dbh = $self->dbh);
$mysql->_enqueue($dbh, $self->{handle});
sub backlog { scalar @{shift->{waiting} || []} }
sub begin {
my $self = shift;
my $tx = Mojo::mysql::Transaction->new(db => $self);
weaken $tx->{db};
return $tx;
sub disconnect {
my $self = shift;
sub pid { shift->_dbh_attr('mysql_thread_id') }
sub ping { shift->dbh->ping }
sub query {
my ($self, $query) = (shift, shift);
my $cb = ref $_[-1] eq 'CODE' ? pop : undef;
# Blocking
unless ($cb) {
Carp::confess('Cannot perform blocking query, while waiting for async response') if $self->backlog;
my $sth = $self->dbh->prepare($query);
local $sth->{HandleError} = sub { $_[0] = Carp::shortmess($_[0]); 0 };
_bind_params($sth, @_);
my $rv = $sth->execute;
my $res = $self->results_class->new(db => $self, is_blocking => 1, sth => $sth);
$res->{affected_rows} = defined $rv && $rv >= 0 ? 0 + $rv : undef;
return $res;
# Non-blocking
push @{$self->{waiting}}, {args => [@_], err => Carp::shortmess('__MSG__'), cb => $cb, query => $query};
$self->$_ for qw(_next _watch);
return $self;
sub query_p {
my $self = shift;
my $promise = Mojo::Promise->new;
$self->query(@_ => sub { $_[1] ? $promise->reject($_[1]) : $promise->resolve($_[2]) });
return $promise;
sub quote { shift->dbh->quote(shift) }
sub quote_id { shift->dbh->quote_identifier(shift) }
sub tables {
shift->query('show tables')->arrays->reduce(sub { push @$a, $b->[0]; $a }, []);
sub _bind_params {
my $sth = shift;
for my $i (0 .. $#_) {
my $param = $_[$i];
my %attrs;
if (ref $param eq 'HASH') {
if (exists $param->{json}) {
$param = to_json $param->{json};
elsif (exists $param->{type} && exists $param->{value}) {
($param, $attrs{TYPE}) = @$param{qw(value type)};
$sth->bind_param($i + 1, $param, \%attrs);
return $sth;
sub _cleanup_sth {
my $self = shift;
delete $self->{done_sth};
$_->{cb}($self, 'Premature connection close', undef) for @{delete $self->{waiting} || []};
sub _dbh_attr {
my $self = shift;
my $dbh = ref $self ? $self->dbh : shift;
my $name = shift;
$name =~ s!^mysql!{lc $dbh->{Driver}{Name}}!e;
return $dbh->{$name} = shift if @_;
return $dbh->{$name};
sub _next {
my $self = shift;
return unless my $next = $self->{waiting}[0];
return if $next->{sth};
my $dbh = $self->dbh;
my $flag = lc $dbh->{Driver}{Name} eq 'mariadb' ? 'mariadb_async' : 'async';
my $sth = $next->{sth} = $self->dbh->prepare($next->{query}, {$flag => 1});
_bind_params($sth, @{$next->{args}});
sub _unwatch {
Mojo::IOLoop->singleton->reactor->remove(delete $_[0]->{handle}) if $_[0]->{handle};
sub _watch {
my $self = shift;
return if $self->{handle};
my $dbh = $self->dbh;
my $driver = lc $dbh->{Driver}{Name};
my $ready_method = "${driver}_async_ready";
my $result_method = "${driver}_async_result";
my $fd = $driver eq 'mariadb' ? $dbh->mariadb_sockfd : $dbh->mysql_fd;
open $self->{handle}, '<&', $fd or die "Could not dup $driver fd: $!";
$self->{handle} => sub {
return unless my $waiting = $self->{waiting};
return unless @$waiting and $waiting->[0]{sth} and $waiting->[0]{sth}->$ready_method;
my ($cb, $err, $sth) = @{shift @$waiting}{qw(cb err sth)};
# Do not raise exceptions inside the event loop
my $rv = do { local $sth->{RaiseError} = 0; $sth->$result_method };
my $res = $self->results_class->new(db => $self, sth => $sth);
$err = undef if defined $rv;
$err =~ s!\b__MSG__\b!{$dbh->errstr}!e if defined $err;
$res->{affected_rows} = defined $rv && $rv >= 0 ? 0 + $rv : undef;
$self->$cb($err, $res);
$self->_unwatch unless $self->backlog;
)->watch($self->{handle}, 1, 0);
=encoding utf8
=head1 NAME
Mojo::mysql::Database - Database
use Mojo::mysql::Database;
my $db = Mojo::mysql::Database->new(mysql => $mysql, dbh => $dbh);
L<Mojo::mysql::Database> is a container for database handles used by L<Mojo::mysql>.
L<Mojo::mysql::Database> implements the following attributes.
=head2 dbh
my $dbh = $db->dbh;
$db = $db->dbh(DBI->new);
Database handle used for all queries.
=head2 mysql
my $mysql = $db->mysql;
$db = $db->mysql(Mojo::mysql->new);
L<Mojo::mysql> object this database belongs to.
=head2 results_class
$class = $db->results_class;
$db = $db->results_class("MyApp::Results");
Class to be used by L</"query">, defaults to L<Mojo::mysql::Results>. Note that
this class needs to have already been loaded before L</"query"> is called.
=head1 METHODS
L<Mojo::mysql::Database> inherits all methods from L<Mojo::EventEmitter> and
implements the following new ones.
=head2 backlog
my $num = $db->backlog;
Number of waiting non-blocking queries.
=head2 begin
my $tx = $db->begin;
Begin transaction and return L<Mojo::mysql::Transaction> object, which will
automatically roll back the transaction unless
L<Mojo::mysql::Transaction/"commit"> bas been called before it is destroyed.
# Add names in a transaction
eval {
my $tx = $db->begin;
$db->query('insert into names values (?)', 'Baerbel');
$db->query('insert into names values (?)', 'Wolfgang');
say $@ if $@;
=head2 delete
my $results = $db->delete($table, \%where);
Generate a C<DELETE> statement with L<Mojo::mysql/"abstract"> (usually an
L<SQL::Abstract> object) and execute it with L</"query">. You can also append a
callback to perform operations non-blocking.
$db->delete(some_table => sub {
my ($db, $err, $results) = @_;
Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
=head2 delete_p
my $promise = $db->delete_p($table, \%where, \%options);
Same as L</"delete">, but performs all operations non-blocking and returns a
L<Mojo::Promise> object instead of accepting a callback.
$db->delete_p('some_table')->then(sub {
my $results = shift;
})->catch(sub {
my $err = shift;
=head2 disconnect
Disconnect database handle and prevent it from getting cached again.
=head2 insert
my $results = $db->insert($table, \@values || \%fieldvals, \%options);
Generate an C<INSERT> statement with L<Mojo::mysql/"abstract"> (usually an
L<SQL::Abstract> object) and execute it with L</"query">. You can also append a
callback to perform operations non-blocking.
$db->insert(some_table => {foo => 'bar'} => sub {
my ($db, $err, $results) = @_;
Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
=head2 insert_p
my $promise = $db->insert_p($table, \@values || \%fieldvals, \%options);
Same as L</"insert">, but performs all operations non-blocking and returns a
L<Mojo::Promise> object instead of accepting a callback.
$db->insert_p(some_table => {foo => 'bar'})->then(sub {
my $results = shift;
})->catch(sub {
my $err = shift;
=head2 pid
my $pid = $db->pid;
Return the connection id of the backend server process.
=head2 ping
my $bool = $db->ping;
Check database connection.
=head2 query
my $results = $db->query('select * from foo');
my $results = $db->query('insert into foo values (?, ?, ?)', @values);
my $results = $db->query('insert into foo values (?)', {json => {bar => 'baz'}});
my $results = $db->query('insert into foo values (?)', {type => SQL_INTEGER, value => 42});
Execute a blocking statement and return a L<Mojo::mysql::Results> object with the
results. You can also append a callback to perform operation non-blocking.
$db->query('select * from foo' => sub {
my ($db, $err, $results) = @_;
Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
Hash reference arguments containing a value named C<json>, will be encoded to
JSON text with L<Mojo::JSON/"to_json">. To accomplish the reverse, you can use
the method L<Mojo::mysql::Results/"expand">, which automatically decodes data back
to Perl data structures.
$db->query('insert into foo values (x) values (?)', {json => {bar => 'baz'}});
$db->query('select * from foo')->expand->hash->{x}{bar}; # baz
Hash reference arguments containing values named C<type> and C<value> can be
used to bind specific L<DBI> data types (see L<DBI/"DBI Constants">) to
placeholders. This is needed to pass binary data in parameters; see
L<DBD::mysql/"mysql_enable_utf8"> for more information.
# Insert binary data
use DBI ':sql_types';
$db->query('insert into bar values (?)', {type => SQL_BLOB, value => $bytes});
=head2 query_p
my $promise = $db->query_p('select * from foo');
Same as L</"query">, but performs all operations non-blocking and returns a
L<Mojo::Promise> object instead of accepting a callback.
$db->query_p('insert into foo values (?, ?, ?)' => @values)->then(sub {
my $results = shift;
})->catch(sub {
my $err = shift;
=head2 quote
my $escaped = $db->quote($str);
Quote a string literal for use as a literal value in an SQL statement.
=head2 quote_id
my $escaped = $db->quote_id($id);
Quote an identifier (table name etc.) for use in an SQL statement.
=head2 select
my $results = $db->select($source, $fields, $where, $order);
Generate a C<SELECT> statement with L<Mojo::mysql/"abstract"> (usually an
L<SQL::Abstract> object) and execute it with L</"query">. You can also append a
callback to perform operations non-blocking.
$db->select(some_table => ['foo'] => sub {
my ($db, $err, $results) = @_;
Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
=head2 select_p
my $promise = $db->select_p($source, $fields, $where, $order);
Same as L</"select">, but performs all operations non-blocking and returns a
L<Mojo::Promise> object instead of accepting a callback.
$db->select_p(some_table => ['foo'] => {bar => 'yada'})->then(sub {
my $results = shift;
})->catch(sub {
my $err = shift;
=head2 update
my $results = $db->update($table, \%fieldvals, \%where);
Generate an C<UPDATE> statement with L<Mojo::mysql/"abstract"> (usually an
L<SQL::Abstract> object) and execute it with L</"query">. You can also append a
callback to perform operations non-blocking.
$db->update(some_table => {foo => 'baz'} => {foo => 'bar'} => sub {
my ($db, $err, $results) = @_;
Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
=head2 update_p
my $promise = $db->update_p($table, \%fieldvals, \%where, \%options);
Same as L</"update">, but performs all operations non-blocking and returns a
L<Mojo::Promise> object instead of accepting a callback.
$db->update_p(some_table => {foo => 'baz'} => {foo => 'bar'})->then(sub {
my $results = shift;
})->catch(sub {
my $err = shift;
=head2 tables
my $tables = $db->tables;
Return an array reference with table names for this database.
=head1 SEE ALSO