NAME

EV::MariaDB - Async MariaDB/MySQL client using libmariadb and EV

SYNOPSIS

use EV;
use EV::MariaDB;

my $m = EV::MariaDB->new(
    host       => 'localhost',
    user       => 'root',
    password   => '',
    database   => 'test',
    on_connect => sub { print "connected\n" },
    on_error   => sub { warn "error: $_[0]\n" },
);

# simple query (with column metadata)
$m->query("select * from users", sub {
    my ($rows, $err, $fields) = @_;
    if ($err) { warn $err; return }
    print join(", ", @$fields), "\n";  # column names
    for my $row (@$rows) {
        print join(", ", @$row), "\n";
    }
});

# prepared statement
$m->prepare("select * from users where id = ?", sub {
    my ($stmt, $err) = @_;
    die $err if $err;
    $m->execute($stmt, [42], sub {
        my ($rows, $err, $fields) = @_;
        # ...
        $m->close_stmt($stmt, sub { });
    });
});

# pipelined queries (all sent before reading results)
for my $id (1..100) {
    $m->q("select * from t where id = $id", sub {
        my ($rows, $err) = @_;
        # callbacks fire in order
    });
}

EV::run;

DESCRIPTION

EV::MariaDB is an asynchronous MariaDB/MySQL client that integrates with the EV event loop. It uses the MariaDB Connector/C non-blocking API to perform all database operations without blocking the event loop.

Key features:

  • Fully asynchronous connect, query, and prepared statement execution

  • Query pipelining via mysql_send_query/mysql_read_query_result for high throughput

  • Prepared statements with automatic buffer management

  • Column metadata (field names) returned with query results

  • Streaming row-by-row results via query_stream

  • Async transaction control (commit, rollback, autocommit)

  • Connection utility operations (ping, reset, change_user, select_db, set_charset)

  • BLOB/TEXT streaming via send_long_data

  • Async graceful close via close_async

  • Multi-result set support for multi-statement queries

CONSTRUCTOR

new

my $m = EV::MariaDB->new(%args);

Creates a new EV::MariaDB object. If host or user is provided, connects immediately (asynchronously).

Connection parameters:

host => $hostname

Server hostname. Default: localhost. Note: localhost may connect via Unix socket; use 127.0.0.1 to force TCP.

port => $port

Server port. Default: 3306.

user => $username

Username for authentication.

password => $password

Password for authentication.

database => $dbname

Default database. Also accepts db as an alias.

unix_socket => $path

Path to Unix domain socket.

Callbacks:

on_connect => sub { }

Called when the connection is established. No arguments. Exceptions thrown inside this handler are caught and re-emitted as warnings to protect the event loop.

on_error => sub { my ($message) = @_ }

Called on connection-level errors. Default: sub { die @_ }. Note: exceptions thrown inside this handler are caught and re-emitted as warnings to protect the event loop.

Connection options:

connect_timeout => $seconds
read_timeout => $seconds
write_timeout => $seconds
compress => 1

Enable protocol compression.

multi_statements => 1

Allow multiple SQL statements per query string.

charset => $name

Character set name (e.g., utf8mb4).

init_command => $sql

SQL statement executed automatically after connecting.

ssl_key => $path
ssl_cert => $path
ssl_ca => $path
ssl_cipher => $list
ssl_verify_server_cert => 1

SSL/TLS connection options.

Event loop:

loop => $ev_loop

EV loop to use. Default: EV::default_loop.

METHODS

All asynchronous methods take a callback as the last argument. The callback convention is ($result, $error): on success $error is undef; on failure $result is undef and $error contains the error message.

connect

$m->connect($host, $user, $password, $database, $port, $unix_socket);

Connects to the server. Called automatically by new when host or user is provided. Use this for deferred connection:

my $m = EV::MariaDB->new(
    on_connect => sub { ... },
    on_error   => sub { ... },
);
$m->connect('localhost', 'root', '', 'test', 3306);

Dies if a connection is in progress or already established. $port defaults to 3306. $unix_socket is optional (pass undef or omit).

query

$m->query($sql, sub { my ($result, $err, $fields) = @_ });

Executes a SQL query. The callback receives:

  • For select: ($arrayref_of_arrayrefs, undef, $field_names)

    $field_names is an arrayref of column name strings.

  • For DML (insert/update/delete): ($affected_rows, undef)

  • On error: (undef, $error_message)

Queries are pipelined: multiple calls to query before the event loop runs will be sent as a batch, with results read back in order. May be called after connect has been initiated (even before it completes); queries are buffered and sent once connected. Also safe to call while a utility operation (ping, select_db, etc.) is active - the query is buffered and executed when the operation completes. Dies if connect has not been called at all.

Note: Result strings are returned as byte strings without Perl's internal UTF-8 flag set, regardless of the connection charset. Use Encode::decode_utf8 if you need character semantics on UTF-8 data.

prepare

$m->prepare($sql, sub { my ($stmt, $err) = @_ });

Prepares a server-side statement. The callback receives an opaque statement handle or an error. Pass the handle to execute, close_stmt, and stmt_reset.

execute

$m->execute($stmt, \@params, sub { my ($result, $err, $fields) = @_ });

Executes a prepared statement with the given parameters. Parameters are type-detected: integers bind as BIGINT (unsigned integers are flagged accordingly), floats as DOUBLE, all others as STRING. Pass undef for NULL. The callback receives results in the same format as query (including $fields for SELECT results).

Pass undef instead of \@params to skip parameter binding and use previously bound parameters (see bind_params and send_long_data).

close_stmt

$m->close_stmt($stmt, sub { my ($ok, $err) = @_ });

Closes a prepared statement, freeing server resources.

stmt_reset

$m->stmt_reset($stmt, sub { my ($ok, $err) = @_ });

Resets a prepared statement (clears errors and unbinds parameters) without closing it.

ping

$m->ping(sub { my ($ok, $err) = @_ });

Checks if the connection is alive.

select_db

$m->select_db($dbname, sub { my ($ok, $err) = @_ });

Changes the default database.

change_user

$m->change_user($user, $password, $db_or_undef, sub { my ($ok, $err) = @_ });

Changes the user and optionally the database. Pass undef for $db to keep the current database.

reset_connection

$m->reset_connection(sub { my ($ok, $err) = @_ });

Resets session state (variables, temporary tables, etc.) without reconnecting. Equivalent to COM_RESET_CONNECTION.

set_charset

$m->set_charset($charset, sub { my ($ok, $err) = @_ });

Changes the connection character set asynchronously (e.g., utf8mb4).

commit

$m->commit(sub { my ($ok, $err) = @_ });

Commits the current transaction.

rollback

$m->rollback(sub { my ($ok, $err) = @_ });

Rolls back the current transaction.

autocommit

$m->autocommit($mode, sub { my ($ok, $err) = @_ });

Enables ($mode = 1) or disables ($mode = 0) autocommit mode.

query_stream

$m->query_stream($sql, sub {
    my ($row, $err) = @_;
    if ($err) { warn $err; return }
    if (!defined $row) { print "done\n"; return }
    # process $row (arrayref)
});

Executes a SELECT query and streams results row-by-row using mysql_use_result/mysql_fetch_row. The callback is invoked once per row with ($arrayref), once at EOF with (undef), or on error with (undef, $error_message). Unlike query, results are not buffered in memory - suitable for large result sets.

This is an exclusive operation: no other queries can be queued while streaming is active.

close_async

$m->close_async(sub { my ($ok, $err) = @_ });

Gracefully closes the connection asynchronously (sends COM_QUIT without blocking the event loop). After completion, is_connected returns false. Use finish for immediate synchronous close.

send_long_data

$m->send_long_data($stmt, $param_idx, $data, sub { my ($ok, $err) = @_ });

Sends long parameter data (BLOB/TEXT) for a prepared statement. Can be called multiple times for the same parameter to send data in chunks. Must be called after bind_params and before execute.

Typical workflow:

$m->prepare("INSERT INTO t VALUES (?, ?)", sub {
    my ($stmt) = @_;
    $m->bind_params($stmt, [1, ""]);  # bind all params first
    $m->send_long_data($stmt, 1, $blob_data, sub {
        $m->execute($stmt, undef, sub {  # undef = skip re-binding
            # ...
        });
    });
});

bind_params

$m->bind_params($stmt, \@params);

Synchronously binds parameters to a prepared statement without executing it. Required before send_long_data. Parameter types are auto-detected the same way as in execute.

reset

$m->reset;

Disconnects and reconnects using the original connection parameters. Cancels all pending operations. Dies if no prior connection exists.

finish

$m->finish;

Disconnects from the server. Cancels all pending operations, invoking their callbacks with an error.

escape

my $escaped = $m->escape($string);

Escapes a string for safe use in SQL, respecting the connection's character set. Warns if the string has Perl's UTF-8 flag set but the connection charset is not utf8/utf8mb4.

skip_pending

$m->skip_pending;

Cancels all pending, queued, and in-flight operations, invoking their callbacks with (undef, "skipped"). If an async operation is active or sent queries are awaiting results, the connection is closed (use reset to reconnect). Queued but unsent queries are cancelled without closing the connection.

on_connect

$m->on_connect(sub { ... });   # set handler
my $cb = $m->on_connect;       # get handler

Get or set the connect handler. When called with a CODE reference, sets the handler. When called without arguments, returns the current handler (or undef if unset).

on_error

$m->on_error(sub { my ($msg) = @_ });   # set handler
my $cb = $m->on_error;                  # get handler

Get or set the error handler. When called with a CODE reference, sets the handler. When called without arguments, returns the current handler (or undef if unset).

ACCESSORS

is_connected

Returns true if connected to the server.

error_message

Last error message, or undef.

error_number

Last error number (0 if no error).

sqlstate

SQLSTATE code (5-character string) for the last error.

insert_id

auto_increment value from the last insert.

warning_count

Number of warnings from the last query.

info

Additional info about the last query (e.g., rows matched for update), or undef.

server_version

Server version as an integer (e.g., 110206 for 11.2.6).

server_info

Server version string.

thread_id

Connection thread ID.

host_info

String describing connection type and host.

character_set_name

Current character set name.

socket

File descriptor of the connection socket.

pending_count

Number of pending operations (queued + in-flight).

CLASS METHODS

lib_version
EV::MariaDB->lib_version;

Client library version as an integer.

lib_info
EV::MariaDB->lib_info;

Client library version string.

ALIASES

q          -> query
prep       -> prepare
reconnect  -> reset
disconnect -> finish
errstr     -> error_message
errno      -> error_number

PIPELINING

When multiple queries are submitted before the event loop processes I/O, EV::MariaDB pipelines them: all queries are sent to the server before reading any results. This reduces round-trip overhead and can achieve 2-3x higher throughput than sequential execution.

# all 100 queries are pipelined
for (1..100) {
    $m->q("select $_", sub { ... });
}

The maximum pipeline depth is 64 queries. Additional queries are buffered and sent as earlier results are received.

SEE ALSO

EV, DBD::MariaDB, AnyEvent::MySQL

AUTHOR

vividsnow

LICENSE

This library is free software; you can redistribute it and/or modify it under the same terms as Perl itself.