NAME

EV::Pg - asynchronous PostgreSQL client using libpq and EV

SYNOPSIS

use v5.10;
use EV;
use EV::Pg;

my $pg = EV::Pg->new(
    conninfo   => 'dbname=mydb',
    on_error   => sub { die "PG error: $_[0]\n" },
);
$pg->on_connect(sub {
    $pg->query_params(
        'select $1::int + $2::int', [10, 20],
        sub {
            my ($rows, $err) = @_;
            die $err if $err;
            say $rows->[0][0];  # 30
            EV::break;
        },
    );
});
EV::run;

DESCRIPTION

EV::Pg is a non-blocking PostgreSQL client built on top of libpq and the EV event loop. It drives the libpq async API (PQsendQuery, PQconsumeInput, PQgetResult) through ev_io watchers on the libpq socket, so the event loop never blocks on database I/O.

Features: parameterized queries, prepared statements, pipeline mode, single-row mode, chunked rows mode (libpq >= 17), COPY IN/OUT, LISTEN/NOTIFY, async cancel (libpq >= 17), structured error fields, protocol tracing, and notice handling.

CALLBACKS

Query callbacks always receive a single positional argument on success and (undef, $error_message) on error, so

my ($result, $err) = @_;

works for every shape: $result is the success payload, $err is defined only on error. The shape of $result depends on the query:

SELECT (or single-row / chunked mode)

\@rows -- arrayref of rows; each row is an arrayref of column values with SQL NULL mapping to Perl undef.

INSERT / UPDATE / DELETE

$cmd_tuples -- the string from PQcmdTuples (e.g. "1", "0").

PREPARE / close_prepared / close_portal

"" -- always an empty string (these commands return no row count).

describe_prepared / describe_portal

\%meta -- hashref with nfields, nparams, and (when non-zero) fields (arrayref of {name, type} hashes) and paramtypes (arrayref of OIDs).

COPY

"COPY_IN", "COPY_OUT", or "COPY_BOTH" -- a string tag identifying the COPY direction.

pipeline_sync

1.

Exceptions thrown inside callbacks are caught and reported via warn so that one bad callback does not derail the rest of the queue.
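As a rough illustration of the shapes listed above, the sketch below labels a callback payload by inspecting the two arguments. The helper name describe_payload is hypothetical and not part of EV::Pg; it relies only on the conventions described in this section. Note that the 1 delivered by pipeline_sync cannot be told apart from a command-tuples string of "1" by value alone, so the caller has to know which kind of request it sent.

```perl
use strict;
use warnings;

# Hypothetical helper, not part of EV::Pg: label a callback payload
# according to the shapes documented in the CALLBACKS section.
sub describe_payload {
    my ($result, $err) = @_;
    return "error: $err"                     if defined $err;
    return 'undefined'                       unless defined $result;
    return 'rows (' . scalar(@$result) . ')' if ref $result eq 'ARRAY';
    return 'describe metadata'               if ref $result eq 'HASH';
    return "copy started: $result"           if $result =~ /\ACOPY_(?:IN|OUT|BOTH)\z/;
    return 'no row count'                    if $result eq '';
    # A digit string is either PQcmdTuples output or the 1 from pipeline_sync.
    return "count or sync: $result"          if $result =~ /\A\d+\z/;
    return "other: $result";
}

print describe_payload([[30]]), "\n";         # rows (1)
print describe_payload(undef, 'boom'), "\n";  # error: boom
```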

CONSTRUCTOR

new

my $pg = EV::Pg->new(%args);

Returns a new EV::Pg object. If conninfo or conninfo_params is supplied, an asynchronous connect starts immediately; otherwise call connect later.

Recognized arguments:

conninfo

libpq connection string passed to connect.

conninfo_params

Hashref of connection parameters (e.g. { host => 'localhost', dbname => 'mydb', port => 5432 }), passed to connect_params. Mutually exclusive with conninfo.

expand_dbname

When true together with conninfo_params, the dbname value is itself parsed as a connection string -- so dbname => 'postgresql://host/db?sslmode=require' works.

on_connect

Fires once with no arguments when the handshake completes.

on_error

Fires as ($error_message) on connection-level errors. Defaults to sub { die @_ }; pass an explicit handler to keep the loop alive.

on_notify

Fires as ($channel, $payload, $backend_pid) for LISTEN/NOTIFY messages.

on_notice

Fires as ($message) for server NOTICE/WARNING messages.

on_drain

Fires with no arguments when the libpq send buffer has been fully flushed during a COPY -- use it to resume sending after put_copy_data returned 0.

keep_alive

When true, the connection keeps EV::run alive even with an empty callback queue. See "keep_alive".

loop

An EV loop object. Defaults to EV::default_loop.

Unknown arguments produce a carp warning and are otherwise ignored.

CONNECTION METHODS

connect

$pg->connect($conninfo);

Starts an asynchronous connection from a libpq connection string. on_connect fires on success, on_error on failure.

connect_params

$pg->connect_params(\%params);
$pg->connect_params(\%params, $expand_dbname);

Like connect but takes a hashref of keyword/value parameters. When $expand_dbname is true, the dbname entry may itself be a connection string or URI.

reset

$pg->reset;

Drops the current connection and reconnects with the same parameters. Pending callbacks fire with (undef, "connection reset") first. Alias: reconnect.

finish

$pg->finish;

Closes the connection. Pending callbacks fire with (undef, "connection finished"). Alias: disconnect.

is_connected

my $bool = $pg->is_connected;

True if the handshake has completed and the connection is ready for queries. False during connect, after finish, and after a fatal error.

status

my $st = $pg->status;

libpq connection status: CONNECTION_OK or CONNECTION_BAD. Returns CONNECTION_BAD when not connected.

QUERY METHODS

query

$pg->query($sql, sub { my ($result, $err) = @_; });

Sends a simple query. Multi-statement strings (e.g. "SELECT 1; SELECT 2") are accepted, but only the final result reaches the callback; intermediate results are silently discarded. Because PostgreSQL stops at the first error, an error always arrives as that final result. Not allowed in pipeline mode -- use query_params there. Alias: q.

query_params

$pg->query_params($sql, \@params, sub { my ($result, $err) = @_; });

Sends a parameterized query. Parameters are referenced in SQL as $1, $2, etc.; undef elements become SQL NULL.

Values are sent in PostgreSQL's text format. Embedded NUL bytes cause the call to croak (text-format params cannot legally contain NULs) -- pass binary data through escape_bytea if you need a bytea column. Alias: qp.

prepare

$pg->prepare($name, $sql, sub { my ($result, $err) = @_; });

Creates a prepared statement at the protocol level (no SQL PREPARE parsing). The callback receives "" on success. Alias: prep.

query_prepared

$pg->query_prepared($name, \@params, sub { my ($result, $err) = @_; });

Executes a prepared statement created by prepare. Same parameter rules as query_params. Alias: qx.

describe_prepared

$pg->describe_prepared($name, sub { my ($meta, $err) = @_; });

Describes a prepared statement. The callback receives a hashref with keys nfields and nparams. When nfields is non-zero, a fields key is also present (arrayref of {name, type} hashes). When nparams is non-zero, a paramtypes key is also present (arrayref of OIDs).

describe_portal

$pg->describe_portal($name, sub { my ($meta, $err) = @_; });

Describes a portal. The callback receives the same hashref structure as describe_prepared.

set_single_row_mode

my $ok = $pg->set_single_row_mode;

Switches the most recently sent query into single-row mode. Must be called immediately after a send method (query, query_params, ...) and before the event loop delivers any results. A 0 return means no query was in the right async state; treat it as a programmer error rather than a runtime condition.

The query callback then fires once per row with a single-row \@rows (e.g. [[$col0, $col1, ...]]), and once more at the end with an empty \@rows as the completion sentinel.
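The per-row callbacks and the trailing empty sentinel can be handled along the following lines. This is a minimal sketch: stream_rows is a hypothetical helper name, $pg is assumed to be a connected EV::Pg object, and the guard against callbacks arriving after an error is a defensive assumption rather than documented behaviour.

```perl
use strict;
use warnings;

# Hypothetical helper: stream a query row by row in single-row mode.
# $pg is assumed to be a connected EV::Pg object.
sub stream_rows {
    my ($pg, $sql, $params, $on_row, $on_done) = @_;
    my $finished = 0;
    $pg->query_params($sql, $params, sub {
        my ($rows, $err) = @_;
        return if $finished;                 # defensive: ignore anything after the end
        if (defined $err) {
            $finished = 1;
            return $on_done->($err);
        }
        if (@$rows) {
            $on_row->($_) for @$rows;        # one row here, several in chunked mode
        }
        else {
            $finished = 1;                   # empty \@rows is the completion sentinel
            $on_done->();
        }
    });
    # Must follow the send call directly, before the loop delivers results.
    $pg->set_single_row_mode
        or die "set_single_row_mode failed: no query in the right state\n";
    return;
}
```

Because the row handler iterates over whatever \@rows it receives, the same callback should also cope with set_chunked_rows_mode, which uses the same sentinel.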

set_chunked_rows_mode

my $ok = $pg->set_chunked_rows_mode($chunk_size);

Like set_single_row_mode but delivers up to $chunk_size rows per callback (requires libpq >= 17), reducing per-callback overhead for large result sets. Same call-timing constraint and same trailing empty-rows completion sentinel.

close_prepared

$pg->close_prepared($name, sub { my ($result, $err) = @_; });

Closes (deallocates) a prepared statement at protocol level (requires libpq >= 17). The callback receives an empty string ("") on success. Works in pipeline mode, unlike DEALLOCATE SQL.

close_portal

$pg->close_portal($name, sub { my ($result, $err) = @_; });

Closes a portal at protocol level (requires libpq >= 17). The callback receives an empty string ("") on success.

cancel

my $err = $pg->cancel;

Sends a cancel request using the legacy PQcancel API. Blocks the event loop for one network round trip; prefer cancel_async on libpq >= 17. Returns undef on success or an error string on failure.

cancel_async

$pg->cancel_async(sub { my ($r, $err) = @_; });

Sends a non-blocking cancel request using libpq's PGcancelConn API (requires libpq >= 17). The callback receives (1) on success or (undef, $errmsg) on failure. Croaks if a cancel is already in progress.

pending_count

my $n = $pg->pending_count;

Number of callbacks currently in the queue (queries sent but not yet delivered).

keep_alive

$pg->keep_alive(1);
my $bool = $pg->keep_alive;

When true, the read watcher keeps EV::run alive even when the callback queue is empty. Required when waiting for server-side NOTIFY events via on_notify -- without this flag the loop would exit as soon as the LISTEN query completes. Getter/setter.
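A LISTEN subscription that relies on this flag might be wired up as follows. This is only a sketch: listen_on is a hypothetical helper name, $pg is assumed to be a connected EV::Pg object, and installing on_notify here replaces any handler that was set before.

```perl
use strict;
use warnings;

# Hypothetical helper: subscribe to one channel and keep the loop alive.
# $pg is assumed to be a connected EV::Pg object.
sub listen_on {
    my ($pg, $channel, $handler) = @_;
    $pg->keep_alive(1);    # otherwise the loop may exit once LISTEN completes
    $pg->on_notify(sub {
        my ($chan, $payload, $backend_pid) = @_;
        $handler->($payload, $backend_pid) if $chan eq $channel;
    });
    # Channel names are identifiers, so quote them instead of interpolating raw text.
    $pg->query('LISTEN ' . $pg->escape_identifier($channel), sub {
        my ($result, $err) = @_;
        warn "LISTEN failed: $err\n" if defined $err;
    });
    return;
}
```

To let EV::run return again, clear the flag with $pg->keep_alive(0) once notifications are no longer needed.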

skip_pending

$pg->skip_pending;

Drops every queued callback, invoking each with (undef, "skipped"). Any in-flight server results are drained and discarded; the connection remains usable for new queries.

PIPELINE METHODS

Pipeline mode lets you send multiple queries without waiting for individual results, then receive the results in order after a sync point. Inside a pipeline you must use query_params or query_prepared -- query is rejected.

enter_pipeline

$pg->enter_pipeline;

Switches the connection into pipeline mode. Croaks if there are unfinished results outstanding.

exit_pipeline

$pg->exit_pipeline;

Returns to normal mode. Croaks if the pipeline is not idle.

pipeline_sync

$pg->pipeline_sync(sub { my ($r, $err) = @_; });

Sends a pipeline sync point. The callback fires with (1) after all preceding queries in the batch have completed, or (undef, $errmsg) if the connection drops first. Alias: sync.
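Putting enter_pipeline, query_params, and pipeline_sync together, a batch could be sent roughly like this. The helper name run_batch and the shape of its result list are assumptions made for illustration; $pg is assumed to be a connected, idle EV::Pg object.

```perl
use strict;
use warnings;

# Hypothetical helper: send several parameterized queries as one pipelined batch.
# $pg is assumed to be a connected EV::Pg object with no results outstanding.
sub run_batch {
    my ($pg, $jobs, $on_done) = @_;    # $jobs: arrayref of [$sql, \@params]
    my @results;
    $pg->enter_pipeline;
    for my $i (0 .. $#$jobs) {
        my ($sql, $params) = @{ $jobs->[$i] };
        # query() is rejected in pipeline mode, so query_params is used.
        $pg->query_params($sql, $params, sub {
            my ($result, $err) = @_;
            $results[$i] = { result => $result, error => $err };
        });
    }
    # The sync callback fires after every query queued above has completed.
    $pg->pipeline_sync(sub {
        my ($ok, $err) = @_;
        $on_done->(\@results, $err);
    });
    return;
}
```

The sketch leaves the connection in pipeline mode; the caller is expected to call exit_pipeline once the pipeline is idle.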

send_pipeline_sync

$pg->send_pipeline_sync(sub { my ($r, $err) = @_; });

Like pipeline_sync but does not flush the send buffer (requires libpq >= 17). Useful for batching multiple sync points before a single manual flush via send_flush_request.

send_flush_request

$pg->send_flush_request;

Asks the server to deliver results for queries sent so far (requires libpq >= 17) -- the manual companion to send_pipeline_sync. Alias: flush.

pipeline_status

my $st = $pg->pipeline_status;

One of PQ_PIPELINE_OFF, PQ_PIPELINE_ON, or PQ_PIPELINE_ABORTED.

COPY METHODS

A COPY command runs in two phases: the query callback first fires with a string tag ("COPY_IN" / "COPY_OUT" / "COPY_BOTH") to signal that streaming has started, then fires a second time with the final command result (or error) when the stream ends. See eg/copy_in.pl and eg/copy_out.pl.

put_copy_data

my $rc = $pg->put_copy_data($data);

Sends a chunk during COPY IN. Returns 1 on success (data buffered or flushed), 0 if the send buffer is full (wait for writability via on_drain, then retry), or -1 on error.

put_copy_end

my $rc = $pg->put_copy_end;
my $rc = $pg->put_copy_end($errmsg);

Ends a COPY IN. With $errmsg aborts the COPY server-side. Same return convention as put_copy_data.
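The two-phase callback and the 0 / on_drain retry convention might be combined as in the sketch below. The helper name copy_in_lines is hypothetical, $pg is assumed to be a connected EV::Pg object, and each element of the line list is assumed to be a complete COPY text row ending in a newline. How EV::Pg reports a -1 from put_copy_data beyond error_message is not stated here, so the sketch simply finishes with that message.

```perl
use strict;
use warnings;

# Hypothetical helper: stream lines into a "COPY ... FROM STDIN" statement.
# $pg is assumed to be a connected EV::Pg object.
sub copy_in_lines {
    my ($pg, $sql, $lines, $on_done) = @_;
    my @queue    = @$lines;
    my $ended    = 0;    # true once put_copy_end has been accepted
    my $finished = 0;    # true once $on_done has been called
    my $finish = sub {
        return if $finished++;
        $pg->on_drain(undef);            # clear the handler installed below
        $on_done->(@_);
    };
    my $pump = sub {
        return if $ended || $finished;
        while (@queue) {
            my $rc = $pg->put_copy_data($queue[0]);
            return if $rc == 0;          # send buffer full: retry from on_drain
            return $finish->(undef, $pg->error_message // 'put_copy_data failed')
                if $rc < 0;
            shift @queue;                # chunk accepted
        }
        $ended = 1 if $pg->put_copy_end != 0;   # 0 means retry after on_drain
    };
    $pg->on_drain($pump);
    $pg->query($sql, sub {
        my ($result, $err) = @_;
        return $finish->(undef, $err) if defined $err;
        return $pump->() if $result eq 'COPY_IN';   # phase 1: streaming may begin
        $finish->($result);                         # phase 2: final command result
    });
    return;
}
```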

get_copy_data

my $row = $pg->get_copy_data;

Retrieves the next row during COPY OUT. Returns the row bytes, the integer -1 when the stream is complete, or undef if nothing is currently buffered (call again after the next read).

HANDLER METHODS

Each handler is a getter/setter: pass a coderef to install it (returning the new value), pass undef to clear it, or call without arguments to read the current handler.

on_connect

Fires once with no arguments after the handshake completes.

on_error

Fires as ($error_message) for connection-level errors (handshake failure, lost socket, libpq protocol errors). Per-query errors come through the query callback, not here.

on_notify

Fires as ($channel, $payload, $backend_pid) for each LISTEN/NOTIFY message.

on_notice

Fires as ($message) for server NOTICE/WARNING messages.

on_drain

Fires with no arguments when the libpq send buffer has been fully flushed during a COPY -- use it to resume put_copy_data after a 0 return.

CONNECTION INFO

String accessors (db, user, host, hostaddr, port, error_message, parameter_status, ssl_attribute) return undef when not connected. Integer accessors return a default value (typically 0 or -1) when not connected. Methods that require an active connection (client_encoding, set_client_encoding, set_error_verbosity, set_error_context_visibility, conninfo) croak otherwise.

error_message

my $msg = $pg->error_message;

Last error message from libpq. Alias: errstr.

transaction_status

my $st = $pg->transaction_status;

One of PQTRANS_IDLE, PQTRANS_ACTIVE, PQTRANS_INTRANS, PQTRANS_INERROR, PQTRANS_UNKNOWN. Alias: txn_status.

parameter_status

my $val = $pg->parameter_status($name);

Server parameter value (e.g. "server_version", "client_encoding", "server_encoding").

backend_pid

my $pid = $pg->backend_pid;

Backend process ID. Alias: pid.

server_version

my $ver = $pg->server_version;

Server version as a packed integer: MMmmpp (major * 10000 + minor * 100 + patch) for releases before 10, MM0000 + patch from 10 onwards. PostgreSQL 18.0 returns 180000; 17.5 returns 170005.
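The packed encoding can be unpacked with plain integer arithmetic. The helper below is a hypothetical convenience, not an EV::Pg method; it returns (major, patch) from version 10 onwards and (major, minor, patch) for older releases.

```perl
use strict;
use warnings;

# Hypothetical helper: split the packed version integer into its parts.
sub decode_pg_version {
    my ($v) = @_;
    my $major = int($v / 10000);
    return ($major, $v % 10000) if $major >= 10;       # MM0000 + patch
    return ($major, int($v / 100) % 100, $v % 100);    # MMmmpp
}

print join('.', decode_pg_version(170005)), "\n";   # 17.5
print join('.', decode_pg_version(90624)),  "\n";   # 9.6.24
```

The same decoding applies to the value returned by lib_version, which uses the same encoding.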

protocol_version

my $ver = $pg->protocol_version;

Frontend/backend protocol version (typically 3).

db

my $dbname = $pg->db;

Database name.

user

my $user = $pg->user;

Connected user name.

host

my $host = $pg->host;

Server host as supplied to connect (may be a hostname or a Unix socket directory).

hostaddr

my $addr = $pg->hostaddr;

Server IP address.

port

my $port = $pg->port;

Server port.

socket

my $fd = $pg->socket;

Underlying socket file descriptor (for advanced uses such as installing your own watcher). Returns -1 when not connected.

ssl_in_use

my $bool = $pg->ssl_in_use;

True if the connection is encrypted with SSL.

ssl_attribute

my $val = $pg->ssl_attribute($name);

SSL attribute (e.g. "protocol", "cipher", "key_bits").

ssl_attribute_names

my $names = $pg->ssl_attribute_names;

Arrayref of available SSL attribute names, or undef if the connection does not use SSL.

client_encoding

my $enc = $pg->client_encoding;

Current client encoding name (e.g. "UTF8").

set_client_encoding

$pg->set_client_encoding($encoding);

Sets the client encoding (e.g. "UTF8", "SQL_ASCII"). This is a synchronous (blocking) call that stalls the event loop for one server round trip, so it is best invoked right after on_connect fires and before any queries are dispatched. Croaks if there are pending queries or on failure.

set_error_verbosity

my $old = $pg->set_error_verbosity($level);

Sets error verbosity. $level is one of PQERRORS_TERSE, PQERRORS_DEFAULT, PQERRORS_VERBOSE, or PQERRORS_SQLSTATE. Returns the previous setting.

set_error_context_visibility

my $old = $pg->set_error_context_visibility($level);

Sets error context visibility. $level is one of PQSHOW_CONTEXT_NEVER, PQSHOW_CONTEXT_ERRORS (default), or PQSHOW_CONTEXT_ALWAYS. Returns the previous setting.

error_fields

my $fields = $pg->error_fields;

Returns a hashref of structured error fields from the most recent PGRES_FATAL_ERROR result, or undef if no fatal error has been seen. Persists until the next fatal error; successful queries do not clear it. Each key is present only when the corresponding field is non-NULL in the server response:

sqlstate            severity            primary
detail              hint                position
context             schema              table
column              datatype            constraint
internal_position   internal_query
source_file         source_line         source_function
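Since only non-NULL fields appear as keys, code that formats the hashref should test each key before using it. A minimal sketch follows, with the hypothetical helper name format_error_fields and an output layout chosen purely for illustration:

```perl
use strict;
use warnings;

# Hypothetical helper: render the hashref from error_fields as one line.
# Keys are checked individually because absent fields are simply missing.
sub format_error_fields {
    my ($f) = @_;
    return 'no fatal error recorded' unless $f;
    my $line = sprintf '%s [%s] %s',
        $f->{severity} // 'ERROR',
        $f->{sqlstate} // '?????',
        $f->{primary}  // '';
    for my $key (qw(detail hint constraint table column)) {
        $line .= "; $key: $f->{$key}" if defined $f->{$key};
    }
    return $line;
}
```

Because the fields persist across later successful queries, a call such as format_error_fields($pg->error_fields) is best made only inside a callback whose $err is defined.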

result_meta

my $meta = $pg->result_meta;

Returns a hashref of metadata for the most recent query result, or undef if no result has been delivered. Refreshed by every successful result (including commands with no columns) but not by errors, COPY, or pipeline sync results -- so after an error this returns metadata for the last successful query and you should check $err before relying on it. Cleared by reset/finish.

Keys:

nfields       number of columns
cmd_status    command status string (e.g. "SELECT 3", "INSERT 0 1")
inserted_oid  OID of inserted row -- only present for single-row
                INSERTs that generated an OID (legacy WITH OIDS
                tables); absent for normal INSERTs and other commands
fields        arrayref of column metadata hashrefs:
                name, type (OID), ftable (OID), ftablecol,
                fformat (0=text, 1=binary), fsize, fmod
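One use of the fields key is to turn positional rows into hashes keyed by column name. The sketch below assumes the hashref layout described above; rows_as_hashes is a hypothetical helper name, and it assumes result_meta has already been refreshed for the result in hand, so check $err before calling it.

```perl
use strict;
use warnings;

# Hypothetical helper: pair each row with the column names from result_meta.
sub rows_as_hashes {
    my ($rows, $meta) = @_;
    my @names = map { $_->{name} } @{ $meta->{fields} || [] };
    my @out;
    for my $row (@$rows) {
        my %h;
        @h{@names} = @$row;    # hash slice: column name => value (NULL stays undef)
        push @out, \%h;
    }
    return \@out;
}
```

Columns that share a name collapse into a single key, so give duplicate columns distinct aliases in the SQL when using this approach.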

conninfo

my $info = $pg->conninfo;

Returns a hashref of the connection parameters actually used by the live connection (keyword => value pairs).

connection_used_password

my $bool = $pg->connection_used_password;

Returns 1 if the connection authenticated with a password.

connection_used_gssapi

my $bool = $pg->connection_used_gssapi;

Returns 1 if the connection used GSSAPI authentication.

connection_needs_password

my $bool = $pg->connection_needs_password;

Returns 1 if the server requested a password during authentication.

trace

$pg->trace($filename);

Enables libpq protocol tracing, writing the wire-level frontend/backend exchange to $filename. Croaks if the file cannot be opened.

untrace

$pg->untrace;

Stops tracing and closes the trace file. Safe to call when tracing is not active.

set_trace_flags

$pg->set_trace_flags($flags);

Sets the trace output style. $flags is a bitmask of PQTRACE_SUPPRESS_TIMESTAMPS and/or PQTRACE_REGRESS_MODE (handy when diffing traces).

UTILITY METHODS

escape_literal

my $quoted = $pg->escape_literal($string);

Quotes and escapes a string for safe interpolation into SQL (wraps the value in single quotes and doubles internal quotes). Alias: quote.

escape_identifier

my $quoted = $pg->escape_identifier($string);

Quotes and escapes an identifier for safe interpolation into SQL (wraps in double quotes). Alias: quote_id.

escape_bytea

my $escaped = $pg->escape_bytea($binary);

Escapes binary bytes into the textual bytea form expected by the server (the \x... hex notation). Pair with unescape_bytea to go the other way.

encrypt_password

my $hashed = $pg->encrypt_password($password, $user);
my $hashed = $pg->encrypt_password($password, $user, $algorithm);

Hashes a password client-side (so the cleartext never reaches the server) ready to be passed to ALTER ROLE ... PASSWORD. $algorithm is optional; when omitted the server's password_encryption setting decides (typically "scram-sha-256").
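A password change built on this method might look like the sketch below. The helper name set_role_password is hypothetical and $pg is assumed to be a connected EV::Pg object. The statement is assembled with escape_identifier and escape_literal on the assumption that utility statements such as ALTER ROLE do not accept bind parameters.

```perl
use strict;
use warnings;

# Hypothetical helper: change a role's password without sending the cleartext.
# $pg is assumed to be a connected EV::Pg object.
sub set_role_password {
    my ($pg, $role, $cleartext, $cb) = @_;
    # Hash locally; only the hashed form appears in the SQL text.
    my $hashed = $pg->encrypt_password($cleartext, $role);
    my $sql = sprintf 'ALTER ROLE %s PASSWORD %s',
        $pg->escape_identifier($role),
        $pg->escape_literal($hashed);
    $pg->query($sql, $cb);    # $cb receives ($result, $err) as usual
    return;
}
```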

unescape_bytea

my $binary = EV::Pg->unescape_bytea($escaped);

Class method. Decodes the textual bytea form back to raw bytes.

lib_version

my $ver = EV::Pg->lib_version;

Class method. Returns the libpq version as an integer (same encoding as server_version; e.g. 170000 for libpq 17.0).

conninfo_parse

my $params = EV::Pg->conninfo_parse($conninfo);

Class method. Parses a connection string and returns a hashref of the recognized keyword/value pairs, croaking if the string is malformed. Handy for validating connection strings before connecting.

ALIASES

Short aliases for common methods:

q           query
qp          query_params
qx          query_prepared
prep        prepare
reconnect   reset
disconnect  finish
flush       send_flush_request  (libpq >= 17)
sync        pipeline_sync
quote       escape_literal
quote_id    escape_identifier
errstr      error_message
txn_status  transaction_status
pid         backend_pid

EXPORT TAGS

:status       PGRES_* result status constants
:conn         CONNECTION_OK, CONNECTION_BAD
:transaction  PQTRANS_* transaction status constants
:pipeline     PQ_PIPELINE_* pipeline status constants
:verbosity    PQERRORS_* verbosity constants
:context      PQSHOW_CONTEXT_* context visibility constants
:trace        PQTRACE_* trace flag constants
:all          all of the above

BENCHMARK

500k queries over Unix socket, PostgreSQL 18, libpq 18:

Workload   EV::Pg sequential  EV::Pg pipeline  DBD::Pg sync  DBD::Pg async+EV
SELECT          83,998 q/s      144,939 q/s     73,195 q/s      65,966 q/s
INSERT          67,053 q/s       85,701 q/s     60,127 q/s      58,329 q/s
UPSERT          37,360 q/s       43,019 q/s     40,278 q/s      40,173 q/s

Sequential mode uses prepared statements (parse once, bind+execute per call). Pipeline mode batches queries with pipeline_sync every 1000 queries. See bench/bench.pl to reproduce.

REQUIREMENTS

libpq >= 14 (PostgreSQL client library) and EV. A handful of features -- chunked rows mode, close_prepared/close_portal, send_pipeline_sync/send_flush_request, and cancel_async -- require libpq >= 17 and degrade gracefully when not available (the methods are simply not defined).
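Because the libpq 17 methods are absent rather than stubbed, their availability can be probed with can. The following sketch falls back to the blocking cancel when cancel_async is missing; cancel_running_query is a hypothetical helper name and $pg is assumed to be a connected EV::Pg object.

```perl
use strict;
use warnings;

# Hypothetical helper: cancel the running query, preferring the async API.
# $pg is assumed to be a connected EV::Pg object.
sub cancel_running_query {
    my ($pg, $cb) = @_;
    if ($pg->can('cancel_async')) {
        # Built against libpq >= 17: non-blocking cancel.
        $pg->cancel_async($cb);
    }
    else {
        # Older libpq: blocking cancel, which returns undef on success.
        my $err = $pg->cancel;
        defined $err ? $cb->(undef, $err) : $cb->(1);
    }
    return;
}
```

Either way the callback sees (1) on success or (undef, $errmsg) on failure. Keep in mind that cancel_async croaks if a cancel is already in progress.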

SEE ALSO

EV, DBD::Pg, Mojo::Pg, AnyEvent::Pg

LICENSE

This is free software; you can redistribute it and/or modify it under the same terms as the Perl 5 programming language system itself.