NAME

EV::Pg - asynchronous PostgreSQL client using libpq and EV

SYNOPSIS

use v5.10;
use EV;
use EV::Pg;

my $pg = EV::Pg->new(
    conninfo   => 'dbname=mydb',
    on_error   => sub { die "PG error: $_[0]\n" },
);
$pg->on_connect(sub {
    $pg->query_params(
        'select $1::int + $2::int', [10, 20],
        sub {
            my ($rows, $err) = @_;
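            # Exceptions in callbacks are caught and turned into warnings,
            # so stop the loop explicitly instead of relying on die.
            if ($err) { warn "query failed: $err\n"; EV::break; return }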
            say $rows->[0][0];  # 30
            EV::break;
        },
    );
});
EV::run;

DESCRIPTION

EV::Pg is a non-blocking PostgreSQL client that integrates with the EV event loop. It drives the libpq async API (PQsendQuery, PQconsumeInput, PQgetResult) via ev_io watchers on the libpq socket, so the event loop never blocks on database I/O.

Features: parameterized queries, prepared statements, pipeline mode, single-row mode, chunked rows (libpq >= 17), COPY IN/OUT, LISTEN/NOTIFY, async cancel (libpq >= 17), structured error fields, protocol tracing, and notice handling.

CALLBACKS

Query callbacks receive ($result) on success and (undef, $error) on error. The shape of $result depends on the operation:

SELECT / single-row mode

(\@rows) where each row is an arrayref of column values. SQL NULL values map to Perl undef.

INSERT / UPDATE / DELETE

($cmd_tuples) -- the string returned by PQcmdTuples (e.g. "1", "0").

Describe

(\%meta) with keys nfields, nparams, and (when non-zero) fields (arrayref of {name, type} hashes) and paramtypes (arrayref of OIDs).

COPY

("COPY_IN"), ("COPY_OUT"), or ("COPY_BOTH").

Pipeline sync

(1).

Error

(undef, $error_message).

Exceptions thrown inside callbacks are caught and emitted as warnings.
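
Because every query callback shares the ($result, $err) convention, the error branch can be factored out once. The following is a minimal sketch; split_cb is a hypothetical helper, not part of EV::Pg.

```perl
use strict;
use warnings;

# Hypothetical helper (not part of EV::Pg): builds a query callback that
# routes the documented (undef, $error) case to $on_err and every other
# case to $on_ok.
sub split_cb {
    my ($on_ok, $on_err) = @_;
    return sub {
        my ($result, $err) = @_;
        # Check $err for definedness rather than testing $result: a
        # successful result can itself be false, e.g. "0" from
        # PQcmdTuples or "" from prepare.
        return $on_err->($err) if defined $err;
        return $on_ok->($result);
    };
}

# Usage, assuming $pg is a connected EV::Pg object:
#   $pg->query('select 1', split_cb(
#       sub { print "value: $_[0][0][0]\n" },
#       sub { warn "query failed: $_[0]\n" },
#   ));
```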

CONSTRUCTOR

new

my $pg = EV::Pg->new(%args);

Arguments:

conninfo

libpq connection string. If provided, connect is called immediately.

conninfo_params

Hashref of connection parameters (e.g. { host => 'localhost', dbname => 'mydb', port => '5432' }). Alternative to conninfo. If provided, connect_params is called immediately.

expand_dbname

If true and conninfo_params is used, the dbname value is parsed as a connection string (allowing dbname => 'postgresql://...').

on_connect

Callback invoked (with no arguments) when the connection is established.

on_error

Callback invoked as ($error_message) on connection-level errors. Defaults to sub { die @_ }.

on_notify

Callback invoked as ($channel, $payload, $backend_pid) on LISTEN/NOTIFY messages.

on_notice

Callback invoked as ($message) on PostgreSQL notice/warning messages.

on_drain

Callback invoked (with no arguments) when the send buffer has been flushed during COPY IN. Useful for resuming put_copy_data after it returns 0.

loop

An EV loop object. Defaults to EV::default_loop.

CONNECTION METHODS

connect

$pg->connect($conninfo);

Initiates an asynchronous connection. The on_connect handler fires on success; on_error fires on failure.

connect_params

$pg->connect_params(\%params);
$pg->connect_params(\%params, $expand_dbname);

Initiates an asynchronous connection using keyword/value parameters instead of a connection string. $expand_dbname allows the dbname parameter to contain a full connection URI.

reset

$pg->reset;

Drops the current connection and reconnects using the original conninfo. Pending callbacks receive (undef, "connection reset"). Alias: reconnect.

finish

$pg->finish;

Closes the connection. Pending callbacks receive (undef, "connection finished"). Alias: disconnect.

is_connected

my $bool = $pg->is_connected;

Returns 1 if connected and ready for queries.

status

my $st = $pg->status;

Returns the libpq connection status (CONNECTION_OK or CONNECTION_BAD).

QUERY METHODS

query

$pg->query($sql, sub { my ($result, $err) = @_; });

Sends a simple query. Not allowed in pipeline mode -- use query_params instead. Multi-statement strings (e.g. "SELECT 1; SELECT 2") are supported, but only the last result is delivered to the callback. PostgreSQL stops executing after the first error, so an error, if any, is always the last result. Alias: q.

query_params

$pg->query_params($sql, \@params, sub { my ($result, $err) = @_; });

Sends a parameterized query. Parameters are referenced in SQL as $1, $2, etc. undef values are sent as SQL NULL. Alias: qp.
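
As an illustration of the undef-to-NULL mapping, here is a small sketch of an insert with an optional column. The users table, its columns, and the insert_user helper are made up for the example; $pg is assumed to be a connected EV::Pg object.

```perl
use strict;
use warnings;

# Sketch only: table and column names are hypothetical.
sub insert_user {
    my ($pg, $name, $email, $done) = @_;
    $pg->query_params(
        'insert into users (name, email) values ($1, $2)',
        [ $name, $email ],          # an undef $email is sent as SQL NULL
        sub {
            my ($cmd_tuples, $err) = @_;
            return $done->(undef, $err) if defined $err;
            $done->($cmd_tuples);   # PQcmdTuples string, e.g. "1"
        },
    );
    return;
}

# Usage:
#   insert_user($pg, 'alice', undef, sub {
#       my ($n, $err) = @_;
#       warn defined $err ? "insert failed: $err\n" : "inserted $n row(s)\n";
#   });
```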

prepare

$pg->prepare($name, $sql, sub { my ($result, $err) = @_; });

Creates a prepared statement. The callback receives an empty string ("") on success. Alias: prep.

query_prepared

$pg->query_prepared($name, \@params, sub { my ($result, $err) = @_; });

Executes a prepared statement. Alias: qx.
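
A sketch of the prepare-once, execute-many pattern. The statement name add_ints and both helper subs are arbitrary choices for the example. Executions are chained one after another so the sketch does not depend on how EV::Pg queues overlapping queries outside pipeline mode.

```perl
use strict;
use warnings;

# Execute the prepared statement for the next parameter pair, then
# recurse from the callback until the list is empty.
sub run_next {
    my ($pg, $todo, $on_value) = @_;
    my $pair = shift @$todo or return;
    $pg->query_prepared('add_ints', $pair, sub {
        my ($rows, $err) = @_;
        return warn "execute failed: $err\n" if defined $err;
        $on_value->($rows->[0][0]);
        run_next($pg, $todo, $on_value);
    });
    return;
}

# $pg is assumed to be a connected EV::Pg object.
sub prepare_and_run {
    my ($pg, $pairs, $on_value) = @_;
    $pg->prepare('add_ints', 'select $1::int + $2::int', sub {
        my ($ok, $err) = @_;
        # prepare delivers "" on success, so test $err, not $ok.
        return warn "prepare failed: $err\n" if defined $err;
        run_next($pg, [@$pairs], $on_value);
    });
    return;
}

# Usage:
#   prepare_and_run($pg, [[10, 20], [1, 2]], sub { print "$_[0]\n" });
```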

describe_prepared

$pg->describe_prepared($name, sub { my ($meta, $err) = @_; });

Describes a prepared statement. The callback receives a hashref with keys nfields and nparams. When nfields is non-zero, a fields key is also present (arrayref of {name, type} hashes). When nparams is non-zero, a paramtypes key is also present (arrayref of OIDs).

describe_portal

$pg->describe_portal($name, sub { my ($meta, $err) = @_; });

Describes a portal. The callback receives the same hashref structure as describe_prepared.

set_single_row_mode

my $ok = $pg->set_single_row_mode;

Switches the most recently sent query to single-row mode. Returns 1 on success, 0 on failure (e.g. no query pending). The callback then fires once per row with an arrayref holding that single row (e.g. [[$col1, $col2, ...]]), followed by one final call with an empty arrayref ([]) that signals completion.
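
The per-row and final-empty calls can be separated in a small wrapper. This is a sketch under the behaviour described above; stream_rows, $on_row and $on_done are hypothetical names, and $pg is assumed to be a connected EV::Pg object.

```perl
use strict;
use warnings;

sub stream_rows {
    my ($pg, $sql, $params, $on_row, $on_done) = @_;
    my $streaming = 0;
    $pg->query_params($sql, $params, sub {
        my ($rows, $err) = @_;
        return $on_done->($err) if defined $err;
        $on_row->($_) for @$rows;
        # In single-row mode an empty arrayref marks completion. If the
        # mode switch failed, the whole result arrives in this one call.
        $on_done->() if !@$rows || !$streaming;
    });
    # The switch applies to the most recently sent query, so call it
    # right after sending.
    $streaming = $pg->set_single_row_mode ? 1 : 0;
    return;
}

# Usage:
#   stream_rows($pg, 'select * from big_table', [],
#       sub { my ($row) = @_; print join("\t", map { $_ // '' } @$row), "\n" },
#       sub { my ($err) = @_; warn "failed: $err\n" if defined $err },
#   );
```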

set_chunked_rows_mode

my $ok = $pg->set_chunked_rows_mode($chunk_size);

Switches the most recently sent query to chunked rows mode, delivering up to $chunk_size rows at a time (requires libpq >= 17). Like single-row mode, but with lower per-callback overhead for large result sets. Returns 1 on success, 0 on failure.
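
Since chunked rows mode needs libpq >= 17, code that must also run against older client libraries can fall back to single-row mode. A sketch, assuming lib_version reports the libpq in use and uses libpq's usual integer encoding (170000 for 17.0); enable_streaming is a hypothetical helper.

```perl
use strict;
use warnings;

# Call right after sending a query. Returns 'chunked', 'single', or
# undef if neither mode could be enabled.
sub enable_streaming {
    my ($pg, $chunk_size) = @_;
    # lib_version is documented as a class method, so it is invoked on
    # the object's class here.
    if (ref($pg)->lib_version >= 170_000) {
        return 'chunked' if $pg->set_chunked_rows_mode($chunk_size);
    }
    return 'single' if $pg->set_single_row_mode;
    return;
}

# Usage:
#   $pg->query_params($sql, \@params, $callback);
#   my $mode = enable_streaming($pg, 500)
#       or warn "result will arrive in one batch\n";
```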

close_prepared

$pg->close_prepared($name, sub { my ($result, $err) = @_; });

Closes (deallocates) a prepared statement at protocol level (requires libpq >= 17). The callback receives an empty string ("") on success. Works in pipeline mode, unlike DEALLOCATE SQL.

close_portal

$pg->close_portal($name, sub { my ($result, $err) = @_; });

Closes a portal at protocol level (requires libpq >= 17). The callback receives an empty string ("") on success.

cancel

my $err = $pg->cancel;

Sends a cancel request using the legacy PQcancel API. This is a blocking call. Returns undef on success, an error string on failure.

cancel_async

$pg->cancel_async(sub { my ($err) = @_; });

Sends an asynchronous cancel request using the PQcancelConn API (requires libpq >= 17). The callback receives no arguments on success, or an error string on failure. Croaks if libpq was built without async cancel support (LIBPQ_HAS_ASYNC_CANCEL).

pending_count

my $n = $pg->pending_count;

Returns the number of callbacks in the queue.

skip_pending

$pg->skip_pending;

Cancels all pending callbacks, invoking each with (undef, "skipped").

PIPELINE METHODS

enter_pipeline

$pg->enter_pipeline;

Enters pipeline mode. Queries are batched and sent without waiting for individual results.

exit_pipeline

$pg->exit_pipeline;

Exits pipeline mode. Croaks if the pipeline is not idle (has pending queries).

pipeline_sync

$pg->pipeline_sync(sub { my ($ok) = @_; });

Sends a pipeline sync point. The callback fires with (1) when all preceding queries have completed. Alias: sync.
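
A sketch of a pipelined batch built from the methods above. run_batch is a hypothetical helper, and $pg is assumed to be a connected EV::Pg object that is not yet in pipeline mode.

```perl
use strict;
use warnings;

sub run_batch {
    my ($pg, $sql, $param_sets, $done) = @_;
    my (@results, @errors);

    $pg->enter_pipeline;
    for my $params (@$param_sets) {
        # query() is not allowed in pipeline mode, so use query_params.
        $pg->query_params($sql, $params, sub {
            my ($res, $err) = @_;
            if (defined $err) { push @errors,  $err }
            else              { push @results, $res }
        });
    }
    # The sync callback fires with (1) once every query queued above
    # has completed.
    $pg->pipeline_sync(sub {
        $done->(\@results, \@errors);
    });
    return;
}

# Usage:
#   run_batch($pg, 'select $1::int * 2', [[1], [2], [3]], sub {
#       my ($results, $errors) = @_;
#       print scalar(@$results), " ok, ", scalar(@$errors), " failed\n";
#   });
```

The sketch leaves the connection in pipeline mode; exit_pipeline can be called once no queries are pending.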

send_pipeline_sync

$pg->send_pipeline_sync(sub { my ($ok) = @_; });

Like pipeline_sync but does not flush the send buffer (requires libpq >= 17). Useful for batching multiple sync points before a single manual flush via send_flush_request.

send_flush_request

$pg->send_flush_request;

Sends a flush request, asking the server to deliver results for queries sent so far. Alias: flush.

pipeline_status

my $st = $pg->pipeline_status;

Returns PQ_PIPELINE_OFF, PQ_PIPELINE_ON, or PQ_PIPELINE_ABORTED.

COPY METHODS

put_copy_data

my $ok = $pg->put_copy_data($data);

Sends data to the server during a COPY IN operation. Returns 1 on success (data flushed or flush scheduled), 0 if the send buffer is full (wait for writability and retry), or -1 on error.

put_copy_end

my $ok = $pg->put_copy_end;
my $ok = $pg->put_copy_end($errmsg);

Ends a COPY IN operation. Pass an error message to abort the COPY. Returns 1 on success, 0 if the send buffer is full (retry after writability), or -1 on error.
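
The return codes of put_copy_data and put_copy_end combine with on_drain into a simple backpressure loop. The sketch below assumes the query that started the COPY has already reported "COPY_IN", and that passing undef to on_drain clears the handler; copy_lines is a hypothetical helper.

```perl
use strict;
use warnings;

sub copy_lines {
    my ($pg, $lines, $done) = @_;
    my @queue = @$lines;

    # Clear the drain handler before reporting the outcome.
    my $finish = sub { $pg->on_drain(undef); $done->(@_) };

    my $pump = sub {
        while (@queue) {
            my $rc = $pg->put_copy_data($queue[0]);
            return $finish->('put_copy_data failed') if $rc < 0;
            return if $rc == 0;   # buffer full: resume from on_drain
            shift @queue;         # chunk accepted
        }
        my $rc = $pg->put_copy_end;
        return $finish->('put_copy_end failed') if $rc < 0;
        return if $rc == 0;       # retry the end marker on the next drain
        $finish->();              # success: $done is called with no arguments
    };

    $pg->on_drain($pump);
    $pump->();
    return;
}

# Usage, from inside the callback that received "COPY_IN":
#   copy_lines($pg, ["1\tfoo\n", "2\tbar\n"], sub {
#       my ($err) = @_;
#       warn "copy failed: $err\n" if defined $err;
#   });
```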

get_copy_data

my $row = $pg->get_copy_data;

Retrieves a row during COPY OUT. Returns the row data as a string, -1 when the COPY is complete, or undef if no data is available yet.
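
The three return values can be handled in one loop that reads whatever is buffered. This is a sketch for text-format COPY, where each row ends in a newline and therefore cannot be confused with the -1 completion marker; drain_copy_out is a hypothetical helper, and when to call it again after a 0 result is left to the caller.

```perl
use strict;
use warnings;

# Returns 1 when the COPY is complete, 0 when no more data is buffered
# yet and the caller should try again later.
sub drain_copy_out {
    my ($pg, $sink) = @_;
    while (1) {
        my $row = $pg->get_copy_data;
        return 0 unless defined $row;   # nothing available right now
        return 1 if $row eq '-1';       # completion marker
        $sink->($row);
    }
}

# Usage:
#   my $finished = drain_copy_out($pg, sub { print $_[0] });
```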

HANDLER METHODS

Each handler method is a getter/setter. Called with an argument, it sets the handler and returns the new value (or undef if cleared). Called without arguments, it returns the current handler.

on_connect

Called with no arguments on successful connection.

on_error

Called as ($error_message) on connection-level errors.

on_notify

Called as ($channel, $payload, $backend_pid) on LISTEN/NOTIFY.

on_notice

Called as ($message) on server notice/warning messages.

on_drain

Called with no arguments when the libpq send buffer has been fully flushed during a COPY IN operation. Use this to resume sending data after put_copy_data returns 0 (buffer full).
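
Because each handler method is both getter and setter, an existing handler can be wrapped rather than replaced. A minimal sketch; add_error_logger and $log are hypothetical names.

```perl
use strict;
use warnings;

# Installs an on_error wrapper that logs first and then delegates to
# whatever handler was set before. Returns the previous handler.
sub add_error_logger {
    my ($pg, $log) = @_;
    my $prev = $pg->on_error;      # getter: current handler
    $pg->on_error(sub {            # setter: install the wrapper
        my ($msg) = @_;
        $log->($msg);
        $prev->($msg) if $prev;
    });
    return $prev;
}

# Usage:
#   add_error_logger($pg, sub { warn "[pg] $_[0]\n" });
```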

CONNECTION INFO

String accessors (db, user, host, port, error_message, parameter_status, ssl_attribute) return undef when not connected. Integer accessors return a default value (typically 0 or -1). Methods that require an active connection (client_encoding, set_client_encoding, set_error_verbosity, set_error_context_visibility, conninfo) croak when not connected.

error_message

Last error message. Alias: errstr.

transaction_status

Returns PQTRANS_IDLE, PQTRANS_ACTIVE, PQTRANS_INTRANS, PQTRANS_INERROR, or PQTRANS_UNKNOWN. Alias: txn_status.

parameter_status

my $val = $pg->parameter_status($name);

Returns a server parameter (e.g. "server_version", "client_encoding").

backend_pid

Backend process ID. Alias: pid.

server_version

Server version as an integer (e.g. 180000 for 18.0).
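
The integer can be turned back into a dotted version string. A sketch based on PostgreSQL's numbering scheme (two-part versions from release 10 onward, three-part versions before that); format_server_version is a hypothetical helper.

```perl
use strict;
use warnings;

sub format_server_version {
    my ($v) = @_;
    # PostgreSQL 10 and later: major * 10000 + minor.
    return sprintf('%d.%d', int($v / 10_000), $v % 10_000)
        if $v >= 100_000;
    # Older servers: three-part versions, e.g. 90624 for 9.6.24.
    return sprintf('%d.%d.%d',
        int($v / 10_000), int($v / 100) % 100, $v % 100);
}

# Usage:
#   print format_server_version($pg->server_version), "\n";
```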

protocol_version

Protocol version (typically 3).

db

Database name.

user

Connected user name.

host

Server host.

hostaddr

Server IP address.

port

Server port.

socket

The underlying file descriptor.

ssl_in_use

Returns 1 if the connection uses SSL.

ssl_attribute

my $val = $pg->ssl_attribute($name);

Returns an SSL attribute (e.g. "protocol", "cipher").

ssl_attribute_names

my $names = $pg->ssl_attribute_names;

Returns an arrayref of available SSL attribute names, or undef if the connection does not use SSL.

client_encoding

Returns the current client encoding name.

set_client_encoding

$pg->set_client_encoding($encoding);

Sets the client encoding (e.g. "UTF8", "SQL_ASCII"). This is a synchronous (blocking) call that stalls the event loop for one server round trip. Best called right after on_connect fires, before any queries are dispatched. Croaks if there are pending queries or on failure.

set_error_verbosity

my $old = $pg->set_error_verbosity($level);

Sets error verbosity. Returns the previous setting.

set_error_context_visibility

my $old = $pg->set_error_context_visibility($level);

Sets error context visibility. $level is one of PQSHOW_CONTEXT_NEVER, PQSHOW_CONTEXT_ERRORS (default), or PQSHOW_CONTEXT_ALWAYS. Returns the previous setting.

error_fields

my $fields = $pg->error_fields;

Returns a hashref of structured error fields from the most recent PGRES_FATAL_ERROR result, or undef if no error has occurred. Keys (present only when non-NULL in the server response):

sqlstate    severity    primary     detail
hint        position    context     schema
table       column      datatype    constraint
internal_position       internal_query
source_file source_line source_function
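
The sqlstate key is usually the most useful field for deciding how to react to a failure. A sketch that maps a few standard SQLSTATE codes to labels; classify_error and the labels are hypothetical, and the sketch assumes error_fields is called right after the failing callback fires.

```perl
use strict;
use warnings;

sub classify_error {
    my ($pg) = @_;
    my $fields = $pg->error_fields or return 'unknown';
    my $state  = $fields->{sqlstate} // '';
    return 'unique_violation'      if $state eq '23505';
    return 'serialization_failure' if $state eq '40001';
    return 'deadlock'              if $state eq '40P01';
    return 'other';
}

# Usage, inside a query callback:
#   if (defined $err && classify_error($pg) eq 'serialization_failure') {
#       # retry the transaction
#   }
```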

result_meta

my $meta = $pg->result_meta;

Returns a hashref of metadata from the most recent query result, or undef if no result has been delivered. Keys:

nfields       number of columns
cmd_status    command status string (e.g. "SELECT 3", "INSERT 0 1")
inserted_oid  OID of inserted row (only present when valid)
fields        arrayref of column metadata hashrefs:
                name, type (OID), ftable (OID), ftablecol,
                fformat (0=text, 1=binary), fsize, fmod
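
The column names in fields can be combined with the positional rows to build hashes. A sketch, assuming result_meta is called from within the query callback so that it describes the same result; rows_to_hashes is a hypothetical helper.

```perl
use strict;
use warnings;

# Turns [[v1, v2], ...] into [{col1 => v1, col2 => v2}, ...] using the
# column names from result_meta.
sub rows_to_hashes {
    my ($pg, $rows) = @_;
    my $meta  = $pg->result_meta or return [];
    my @names = map { $_->{name} } @{ $meta->{fields} || [] };
    my @out;
    for my $row (@$rows) {
        my %h;
        @h{@names} = @$row;    # hash slice: pair names with values
        push @out, \%h;
    }
    return \@out;
}

# Usage:
#   $pg->query('select id, name from users', sub {
#       my ($rows, $err) = @_;
#       return warn "$err\n" if defined $err;
#       print "$_->{id}: $_->{name}\n" for @{ rows_to_hashes($pg, $rows) };
#   });
```

Columns with duplicate names overwrite each other in the hash, so this suits queries with distinct column aliases.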

conninfo

my $info = $pg->conninfo;

Returns a hashref of the connection parameters actually used by the live connection (keyword => value pairs).

connection_used_password

my $bool = $pg->connection_used_password;

Returns 1 if the connection authenticated with a password.

connection_used_gssapi

my $bool = $pg->connection_used_gssapi;

Returns 1 if the connection used GSSAPI authentication.

connection_needs_password

my $bool = $pg->connection_needs_password;

Returns 1 if the server requested a password during authentication.

trace

$pg->trace($filename);

Enables libpq protocol tracing to the specified file. Useful for debugging wire-level issues.

untrace

$pg->untrace;

Disables protocol tracing and closes the trace file.

set_trace_flags

$pg->set_trace_flags($flags);

Sets trace output flags (requires libpq >= 14). $flags is a bitmask of PQTRACE_SUPPRESS_TIMESTAMPS and PQTRACE_REGRESS_MODE.

UTILITY METHODS

escape_literal

my $quoted = $pg->escape_literal($string);

Returns a string literal escaped for use in SQL. Alias: quote.

escape_identifier

my $quoted = $pg->escape_identifier($string);

Returns an identifier escaped for use in SQL. Alias: quote_id.
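
Identifier escaping matters for statements such as LISTEN, where the channel name is an identifier and cannot be passed as a bind parameter. A sketch combining it with on_notify; listen_on is a hypothetical helper, and it replaces any on_notify handler already set.

```perl
use strict;
use warnings;

sub listen_on {
    my ($pg, $channel, $handler) = @_;
    $pg->on_notify(sub {
        my ($chan, $payload, $backend_pid) = @_;
        $handler->($payload, $backend_pid) if $chan eq $channel;
    });
    # Quote the channel name as an identifier before splicing it into SQL.
    $pg->query('LISTEN ' . $pg->escape_identifier($channel), sub {
        my ($res, $err) = @_;
        warn "LISTEN failed: $err\n" if defined $err;
    });
    return;
}

# Usage:
#   listen_on($pg, 'jobs', sub {
#       my ($payload, $pid) = @_;
#       print "job from backend $pid: $payload\n";
#   });
```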

escape_bytea

my $escaped = $pg->escape_bytea($binary);

Escapes binary data for use in a bytea column.

encrypt_password

my $hash = $pg->encrypt_password($password, $user);
my $hash = $pg->encrypt_password($password, $user, $algorithm);

Encrypts a password for use with ALTER ROLE ... PASSWORD. $algorithm is optional; defaults to the server's password_encryption setting (typically "scram-sha-256").
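
A sketch of changing a role's password without sending the clear text to the server. ALTER ROLE is a utility statement that does not take bind parameters, so the role name and hash are escaped and spliced into the SQL. set_role_password is a hypothetical helper, and the check for an undefined hash is a precaution since the failure behaviour of encrypt_password is not stated here.

```perl
use strict;
use warnings;

sub set_role_password {
    my ($pg, $role, $password, $done) = @_;
    # Passing the algorithm explicitly pins the hash format.
    my $hash = $pg->encrypt_password($password, $role, 'scram-sha-256');
    return $done->(undef, 'encrypt_password failed') unless defined $hash;

    my $sql = sprintf 'ALTER ROLE %s PASSWORD %s',
        $pg->escape_identifier($role),
        $pg->escape_literal($hash);
    $pg->query($sql, $done);
    return;
}

# Usage:
#   set_role_password($pg, 'app', 's3cret', sub {
#       my ($res, $err) = @_;
#       warn "password change failed: $err\n" if defined $err;
#   });
```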

unescape_bytea

my $binary = EV::Pg->unescape_bytea($escaped);

Class method. Unescapes bytea data.

lib_version

my $ver = EV::Pg->lib_version;

Class method. Returns the libpq version as an integer.

conninfo_parse

my $params = EV::Pg->conninfo_parse($conninfo);

Class method. Parses a connection string and returns a hashref of the recognized keyword/value pairs. Croaks if the string is invalid. Useful for validating connection strings before connecting.

ALIASES

Short aliases for common methods:

q           query
qp          query_params
qx          query_prepared
prep        prepare
reconnect   reset
disconnect  finish
flush       send_flush_request
sync        pipeline_sync
quote       escape_literal
quote_id    escape_identifier
errstr      error_message
txn_status  transaction_status
pid         backend_pid

EXPORT TAGS

:status       PGRES_* result status constants
:conn         CONNECTION_OK, CONNECTION_BAD
:transaction  PQTRANS_* transaction status constants
:pipeline     PQ_PIPELINE_* pipeline status constants
:verbosity    PQERRORS_* verbosity constants
:context      PQSHOW_CONTEXT_* context visibility constants
:trace        PQTRACE_* trace flag constants
:all          all of the above

BENCHMARK

500k queries over a Unix socket, PostgreSQL 18, libpq 18:

Workload   EV::Pg sequential  EV::Pg pipeline  DBD::Pg sync  DBD::Pg async+EV
SELECT          73,109 q/s      124,092 q/s     56,496 q/s      48,744 q/s
INSERT          58,534 q/s       84,467 q/s     39,068 q/s      41,559 q/s
UPSERT          26,342 q/s       34,223 q/s     28,134 q/s      27,155 q/s

Sequential mode uses prepared statements (parse once, bind+execute per call). Pipeline mode batches queries with pipeline_sync every 1000 queries. See bench/bench.pl to reproduce.

REQUIREMENTS

libpq >= 14 (PostgreSQL client library) and EV. Some features (chunked rows, close prepared/portal, no-flush pipeline sync, async cancel) require libpq >= 17.

SEE ALSO

EV, DBD::Pg, Mojo::Pg, AnyEvent::Pg

LICENSE

This is free software; you can redistribute it and/or modify it under the same terms as the Perl 5 programming language system itself.