NAME
EV::Pg - asynchronous PostgreSQL client using libpq and EV
SYNOPSIS
use v5.10;
use EV;
use EV::Pg;
my $pg = EV::Pg->new(
conninfo => 'dbname=mydb',
on_error => sub { die "PG error: $_[0]\n" },
);
$pg->on_connect(sub {
$pg->query_params(
'select $1::int + $2::int', [10, 20],
sub {
my ($rows, $err) = @_;
die $err if $err;
say $rows->[0][0]; # 30
EV::break;
},
);
});
EV::run;
DESCRIPTION
EV::Pg is a non-blocking PostgreSQL client built on top of libpq and the EV event loop. It drives the libpq async API (PQsendQuery, PQconsumeInput, PQgetResult) through ev_io watchers on the libpq socket, so the event loop never blocks on database I/O.
Features: parameterized queries, prepared statements, pipeline mode, single-row and chunked rows (libpq >= 17), COPY IN/OUT, LISTEN/NOTIFY, async cancel (libpq >= 17), structured error fields, protocol tracing, and notice handling.
CALLBACKS
Query callbacks always receive a single positional argument on success and (undef, $error_message) on error, so
my ($result, $err) = @_;
works for every shape: $result is the success payload, $err is defined only on error. The shape of $result depends on the query:
- SELECT (or single-row / chunked mode)
-
\@rows-- arrayref of rows; each row is an arrayref of column values with SQL NULL mapping to Perlundef. - INSERT / UPDATE / DELETE
-
$cmd_tuples-- the string fromPQcmdTuples(e.g."1","0"). - PREPARE / close_prepared / close_portal
-
""-- always an empty string (these commands return no row count). - describe_prepared / describe_portal
-
\%meta-- hashref withnfields,nparams, and (when non-zero)fields(arrayref of{name, type}hashes) andparamtypes(arrayref of OIDs). - COPY
-
"COPY_IN","COPY_OUT", or"COPY_BOTH"-- a string tag identifying the COPY direction. - pipeline_sync
-
1.
Exceptions thrown inside callbacks are caught and reported via warn so that one bad callback does not derail the rest of the queue.
CONSTRUCTOR
new
my $pg = EV::Pg->new(%args);
Returns a new EV::Pg object. If conninfo or conninfo_params is supplied, an asynchronous connect starts immediately; otherwise call connect later.
Recognized arguments:
- conninfo
-
libpq connection string passed to
connect. - conninfo_params
-
Hashref of connection parameters (e.g.
{ host => 'localhost', dbname => 'mydb', port => 5432 }), passed toconnect_params. Mutually exclusive withconninfo. - expand_dbname
-
When true together with
conninfo_params, thedbnamevalue is itself parsed as a connection string -- sodbname => 'postgresql://host/db?sslmode=require'works. - on_connect
-
Fires once with no arguments when the handshake completes.
- on_error
-
Fires as
($error_message)on connection-level errors. Defaults tosub { die @_ }; pass an explicit handler to keep the loop alive. - on_notify
-
Fires as
($channel, $payload, $backend_pid)for LISTEN/NOTIFY messages. - on_notice
-
Fires as
($message)for server NOTICE/WARNING messages. - on_drain
-
Fires with no arguments when the libpq send buffer has been fully flushed during a COPY -- use it to resume sending after
put_copy_datareturned 0. - keep_alive
-
When true, the connection keeps
EV::runalive even with an empty callback queue. See "keep_alive". - loop
-
An EV loop object. Defaults to
EV::default_loop.
Unknown arguments produce a carp warning and are otherwise ignored.
CONNECTION METHODS
connect
$pg->connect($conninfo);
Starts an asynchronous connection from a libpq connection string. on_connect fires on success, on_error on failure.
connect_params
$pg->connect_params(\%params);
$pg->connect_params(\%params, $expand_dbname);
Like connect but takes a hashref of keyword/value parameters. When $expand_dbname is true, the dbname entry may itself be a connection string or URI.
reset
$pg->reset;
Drops the current connection and reconnects with the same parameters. Pending callbacks fire with (undef, "connection reset") first. Alias: reconnect.
finish
$pg->finish;
Closes the connection. Pending callbacks fire with (undef, "connection finished"). Alias: disconnect.
is_connected
my $bool = $pg->is_connected;
True if the handshake has completed and the connection is ready for queries. False during connect, after finish, and after a fatal error.
status
my $st = $pg->status;
libpq connection status: CONNECTION_OK or CONNECTION_BAD. Returns CONNECTION_BAD when not connected.
QUERY METHODS
query
$pg->query($sql, sub { my ($result, $err) = @_; });
Sends a simple query. Multi-statement strings (e.g. "SELECT 1; SELECT 2") are accepted, but only the final result reaches the callback -- intermediate results are silently discarded, and because PostgreSQL stops at the first error, errors always arrive as that final result. Not allowed in pipeline mode -- use query_params there. Alias: q.
query_params
$pg->query_params($sql, \@params, sub { my ($result, $err) = @_; });
Sends a parameterized query. Parameters are referenced in SQL as $1, $2, etc.; undef elements become SQL NULL.
Values are sent in PostgreSQL's text format. Embedded NUL bytes cause the call to croak (text-format params cannot legally contain NULs) -- pass binary data through escape_bytea if you need a bytea column. Alias: qp.
prepare
$pg->prepare($name, $sql, sub { my ($result, $err) = @_; });
Creates a prepared statement at the protocol level (no SQL PREPARE parsing). The callback receives "" on success. Alias: prep.
query_prepared
$pg->query_prepared($name, \@params, sub { my ($result, $err) = @_; });
Executes a prepared statement created by prepare. Same param rules as query_params. Alias: qx.
describe_prepared
$pg->describe_prepared($name, sub { my ($meta, $err) = @_; });
Describes a prepared statement. The callback receives a hashref with keys nfields and nparams. When nfields is non-zero, a fields key is also present (arrayref of {name, type} hashes). When nparams is non-zero, a paramtypes key is also present (arrayref of OIDs).
describe_portal
$pg->describe_portal($name, sub { my ($meta, $err) = @_; });
Describes a portal. The callback receives the same hashref structure as describe_prepared.
set_single_row_mode
my $ok = $pg->set_single_row_mode;
Switches the most recently sent query into single-row mode. Must be called immediately after a send method (query, query_params, ...) and before the event loop delivers any results -- a 0 return means no query was in the right async state and should be treated as a programmer error rather than a runtime condition.
The query callback then fires once per row with a single-row \@rows (e.g. [[$col0, $col1, ...]]), and once more at the end with an empty \@rows as the completion sentinel.
set_chunked_rows_mode
my $ok = $pg->set_chunked_rows_mode($chunk_size);
Like set_single_row_mode but delivers up to $chunk_size rows per callback (requires libpq >= 17), reducing per-callback overhead for large result sets. Same call-timing constraint and same trailing empty-rows completion sentinel.
close_prepared
$pg->close_prepared($name, sub { my ($result, $err) = @_; });
Closes (deallocates) a prepared statement at protocol level (requires libpq >= 17). The callback receives an empty string ("") on success. Works in pipeline mode, unlike DEALLOCATE SQL.
close_portal
$pg->close_portal($name, sub { my ($result, $err) = @_; });
Closes a portal at protocol level (requires libpq >= 17). The callback receives an empty string ("") on success.
cancel
my $err = $pg->cancel;
Sends a cancel request using the legacy PQcancel API. Blocks the event loop for one network round trip; prefer cancel_async on libpq >= 17. Returns undef on success or an error string on failure.
cancel_async
$pg->cancel_async(sub { my ($r, $err) = @_; });
Sends a non-blocking cancel request using the PQcancelConn API (requires libpq >= 17). The callback receives (1) on success or (undef, $errmsg) on failure. Croaks if a cancel is already in progress.
pending_count
my $n = $pg->pending_count;
Number of callbacks currently in the queue (queries sent but not yet delivered).
keep_alive
$pg->keep_alive(1);
my $bool = $pg->keep_alive;
When true, the read watcher keeps EV::run alive even when the callback queue is empty. Required when waiting for server-side NOTIFY events via on_notify -- without this flag the loop would exit as soon as the LISTEN query completes. Getter/setter.
skip_pending
$pg->skip_pending;
Drops every queued callback, invoking each with (undef, "skipped"). Any in-flight server results are drained and discarded; the connection remains usable for new queries.
PIPELINE METHODS
Pipeline mode lets you send multiple queries without waiting for individual results, then receive the results in order after a sync point. Inside a pipeline you must use query_params or query_prepared -- query is rejected.
enter_pipeline
$pg->enter_pipeline;
Switches the connection into pipeline mode. Croaks if there are unfinished results outstanding.
exit_pipeline
$pg->exit_pipeline;
Returns to normal mode. Croaks if the pipeline is not idle.
pipeline_sync
$pg->pipeline_sync(sub { my ($r, $err) = @_; });
Sends a pipeline sync point. The callback fires with (1) after all preceding queries in the batch have completed, or (undef, $errmsg) if the connection drops first. Alias: sync.
send_pipeline_sync
$pg->send_pipeline_sync(sub { my ($r, $err) = @_; });
Like pipeline_sync but does not flush the send buffer (requires libpq >= 17). Useful for batching multiple sync points before a single manual flush via send_flush_request.
send_flush_request
$pg->send_flush_request;
Asks the server to deliver results for queries sent so far -- the manual companion to send_pipeline_sync. Alias: flush.
pipeline_status
my $st = $pg->pipeline_status;
One of PQ_PIPELINE_OFF, PQ_PIPELINE_ON, or PQ_PIPELINE_ABORTED.
COPY METHODS
A COPY command runs in two phases: the query callback first fires with a string tag ("COPY_IN" / "COPY_OUT" / "COPY_BOTH") to signal that streaming has started, then fires a second time with the final command result (or error) when the stream ends. See eg/copy_in.pl and eg/copy_out.pl.
put_copy_data
my $rc = $pg->put_copy_data($data);
Sends a chunk during COPY IN. Returns 1 on success (data buffered or flushed), 0 if the send buffer is full (wait for writability via on_drain, then retry), or -1 on error.
put_copy_end
my $rc = $pg->put_copy_end;
my $rc = $pg->put_copy_end($errmsg);
Ends a COPY IN. With $errmsg aborts the COPY server-side. Same return convention as put_copy_data.
get_copy_data
my $row = $pg->get_copy_data;
Retrieves the next row during COPY OUT. Returns the row bytes, the integer -1 when the stream is complete, or undef if nothing is currently buffered (call again after the next read).
HANDLER METHODS
Each handler is a getter/setter: pass a coderef to install it (returning the new value), pass undef to clear it, or call without arguments to read the current handler.
on_connect
Fires once with no arguments after the handshake completes.
on_error
Fires as ($error_message) for connection-level errors (handshake failure, lost socket, libpq protocol errors). Per-query errors come through the query callback, not here.
on_notify
Fires as ($channel, $payload, $backend_pid) for each LISTEN/NOTIFY message.
on_notice
Fires as ($message) for server NOTICE/WARNING messages.
on_drain
Fires with no arguments when the libpq send buffer has been fully flushed during a COPY -- use it to resume put_copy_data after a 0 return.
CONNECTION INFO
String accessors (db, user, host, hostaddr, port, error_message, parameter_status, ssl_attribute) return undef when not connected. Integer accessors return a default value (typically 0 or -1). Methods that require an active connection (client_encoding, set_client_encoding, set_error_verbosity, set_error_context_visibility, conninfo) croak otherwise.
error_message
my $msg = $pg->error_message;
Last error message from libpq. Alias: errstr.
transaction_status
my $st = $pg->transaction_status;
One of PQTRANS_IDLE, PQTRANS_ACTIVE, PQTRANS_INTRANS, PQTRANS_INERROR, PQTRANS_UNKNOWN. Alias: txn_status.
parameter_status
my $val = $pg->parameter_status($name);
Server parameter value (e.g. "server_version", "client_encoding", "server_encoding").
backend_pid
my $pid = $pg->backend_pid;
Backend process ID. Alias: pid.
server_version
my $ver = $pg->server_version;
Server version as a packed integer: MMmmpp (major * 10000 + minor * 100 + patch) for releases before 10, MM0000 + patch from 10 onwards. PostgreSQL 18.0 returns 180000; 17.5 returns 170005.
protocol_version
my $ver = $pg->protocol_version;
Frontend/backend protocol version (typically 3).
db
my $dbname = $pg->db;
Database name.
user
my $user = $pg->user;
Connected user name.
host
my $host = $pg->host;
Server host as supplied to connect (may be a hostname or socket dir).
hostaddr
my $addr = $pg->hostaddr;
Server IP address.
port
my $port = $pg->port;
Server port.
socket
my $fd = $pg->socket;
Underlying socket file descriptor (for advanced uses such as installing your own watcher). Returns -1 when not connected.
ssl_in_use
my $bool = $pg->ssl_in_use;
True if the connection is encrypted with SSL.
ssl_attribute
my $val = $pg->ssl_attribute($name);
SSL attribute (e.g. "protocol", "cipher", "key_bits").
ssl_attribute_names
my $names = $pg->ssl_attribute_names;
Arrayref of available SSL attribute names, or undef if the connection does not use SSL.
client_encoding
my $enc = $pg->client_encoding;
Current client encoding name (e.g. "UTF8").
set_client_encoding
$pg->set_client_encoding($encoding);
Sets the client encoding (e.g. "UTF8", "SQL_ASCII"). This is a synchronous (blocking) call that stalls the event loop for one server round trip, so it is best invoked right after on_connect fires and before any queries are dispatched. Croaks if there are pending queries or on failure.
set_error_verbosity
my $old = $pg->set_error_verbosity($level);
Sets error verbosity. $level is one of PQERRORS_TERSE, PQERRORS_DEFAULT, PQERRORS_VERBOSE, or PQERRORS_SQLSTATE. Returns the previous setting.
set_error_context_visibility
my $old = $pg->set_error_context_visibility($level);
Sets error context visibility. $level is one of PQSHOW_CONTEXT_NEVER, PQSHOW_CONTEXT_ERRORS (default), or PQSHOW_CONTEXT_ALWAYS. Returns the previous setting.
error_fields
my $fields = $pg->error_fields;
Returns a hashref of structured error fields from the most recent PGRES_FATAL_ERROR result, or undef if no fatal error has been seen. Persists until the next fatal error; successful queries do not clear it. Each key is present only when the corresponding field is non-NULL in the server response:
sqlstate severity primary
detail hint position
context schema table
column datatype constraint
internal_position internal_query
source_file source_line source_function
result_meta
my $meta = $pg->result_meta;
Returns a hashref of metadata for the most recent query result, or undef if no result has been delivered. Refreshed by every successful result (including commands with no columns) but not by errors, COPY, or pipeline sync results -- so after an error this returns metadata for the last successful query and you should check $err before relying on it. Cleared by reset/finish.
Keys:
nfields number of columns
cmd_status command status string (e.g. "SELECT 3", "INSERT 0 1")
inserted_oid OID of inserted row -- only present for single-row
INSERTs that generated an OID (legacy WITH OIDS
tables); absent for normal INSERTs and other commands
fields arrayref of column metadata hashrefs:
name, type (OID), ftable (OID), ftablecol,
fformat (0=text, 1=binary), fsize, fmod
conninfo
my $info = $pg->conninfo;
Returns a hashref of the connection parameters actually used by the live connection (keyword => value pairs).
connection_used_password
my $bool = $pg->connection_used_password;
Returns 1 if the connection authenticated with a password.
connection_used_gssapi
my $bool = $pg->connection_used_gssapi;
Returns 1 if the connection used GSSAPI authentication.
connection_needs_password
my $bool = $pg->connection_needs_password;
Returns 1 if the server requested a password during authentication.
trace
$pg->trace($filename);
Enables libpq protocol tracing, writing the wire-level frontend/backend exchange to $filename. Croaks if the file cannot be opened.
untrace
$pg->untrace;
Stops tracing and closes the trace file. Safe to call when tracing is not active.
set_trace_flags
$pg->set_trace_flags($flags);
Sets the trace output style. $flags is a bitmask of PQTRACE_SUPPRESS_TIMESTAMPS and/or PQTRACE_REGRESS_MODE (handy when diffing traces).
UTILITY METHODS
escape_literal
my $quoted = $pg->escape_literal($string);
Quotes and escapes a string for safe interpolation into SQL (wraps the value in single quotes and doubles internal quotes). Alias: quote.
escape_identifier
my $quoted = $pg->escape_identifier($string);
Quotes and escapes an identifier for safe interpolation into SQL (wraps in double quotes). Alias: quote_id.
escape_bytea
my $escaped = $pg->escape_bytea($binary);
Escapes binary bytes into the textual bytea form expected by the server (the \x... hex notation). Pair with unescape_bytea to go the other way.
encrypt_password
my $hashed = $pg->encrypt_password($password, $user);
my $hashed = $pg->encrypt_password($password, $user, $algorithm);
Hashes a password client-side (so the cleartext never reaches the server) ready to be passed to ALTER ROLE ... PASSWORD. $algorithm is optional; when omitted the server's password_encryption setting decides (typically "scram-sha-256").
unescape_bytea
my $binary = EV::Pg->unescape_bytea($escaped);
Class method. Decodes the textual bytea form back to raw bytes.
lib_version
my $ver = EV::Pg->lib_version;
Class method. Returns the libpq version as an integer (same encoding as server_version; e.g. 170000 for libpq 17.0).
conninfo_parse
my $params = EV::Pg->conninfo_parse($conninfo);
Class method. Parses a connection string and returns a hashref of the recognized keyword/value pairs, croaking if the string is malformed. Handy for validating connection strings before connecting.
ALIASES
Short aliases for common methods:
q query
qp query_params
qx query_prepared
prep prepare
reconnect reset
disconnect finish
flush send_flush_request (libpq >= 17)
sync pipeline_sync
quote escape_literal
quote_id escape_identifier
errstr error_message
txn_status transaction_status
pid backend_pid
EXPORT TAGS
:status PGRES_* result status constants
:conn CONNECTION_OK, CONNECTION_BAD
:transaction PQTRANS_* transaction status constants
:pipeline PQ_PIPELINE_* pipeline status constants
:verbosity PQERRORS_* verbosity constants
:context PQSHOW_CONTEXT_* context visibility constants
:trace PQTRACE_* trace flag constants
:all all of the above
BENCHMARK
500k queries over Unix socket, PostgreSQL 18, libpq 18:
Workload EV::Pg sequential EV::Pg pipeline DBD::Pg sync DBD::Pg async+EV
SELECT 83,998 q/s 144,939 q/s 73,195 q/s 65,966 q/s
INSERT 67,053 q/s 85,701 q/s 60,127 q/s 58,329 q/s
UPSERT 37,360 q/s 43,019 q/s 40,278 q/s 40,173 q/s
Sequential mode uses prepared statements (parse once, bind+execute per call). Pipeline mode batches queries with pipeline_sync every 1000 queries. See bench/bench.pl to reproduce.
REQUIREMENTS
libpq >= 14 (PostgreSQL client library) and EV. A handful of features -- chunked rows mode, close_prepared/close_portal, send_pipeline_sync/send_flush_request, and cancel_async -- require libpq >= 17 and degrade gracefully when not available (the methods are simply not defined).
SEE ALSO
EV, DBD::Pg, Mojo::Pg, AnyEvent::Pg
LICENSE
This is free software; you can redistribute it and/or modify it under the same terms as the Perl 5 programming language system itself.