NAME
EV::Pg - asynchronous PostgreSQL client using libpq and EV
SYNOPSIS
use v5.10;
use EV;
use EV::Pg;
my $pg = EV::Pg->new(
    conninfo => 'dbname=mydb',
    on_error => sub { die "PG error: $_[0]\n" },
);
$pg->on_connect(sub {
    $pg->query_params(
        'select $1::int + $2::int', [10, 20],
        sub {
            my ($rows, $err) = @_;
            die $err if $err;
            say $rows->[0][0];  # 30
            EV::break;
        },
    );
});
EV::run;
DESCRIPTION
EV::Pg is a non-blocking PostgreSQL client that integrates with the EV event loop. It drives the libpq async API (PQsendQuery, PQconsumeInput, PQgetResult) via ev_io watchers on the libpq socket, so the event loop never blocks on database I/O.
Features: parameterized queries, prepared statements, pipeline mode, single-row mode, chunked rows (libpq >= 17), COPY IN/OUT, LISTEN/NOTIFY, async cancel (libpq >= 17), structured error fields, protocol tracing, and notice handling.
CALLBACKS
Query callbacks receive ($result) on success and (undef, $error) on error:
- SELECT / single-row mode: (\@rows), where each row is an arrayref of column values. SQL NULL columns map to Perl undef.
- INSERT / UPDATE / DELETE: ($cmd_tuples), the string returned by PQcmdTuples (e.g. "1", "0").
- Describe: (\%meta) with keys nfields, nparams, and (when non-zero) fields (arrayref of {name, type} hashes) and paramtypes (arrayref of OIDs).
- COPY: ("COPY_IN"), ("COPY_OUT"), or ("COPY_BOTH").
- Pipeline sync: (1).
- Error: (undef, $error_message).
Exceptions thrown inside callbacks are caught and emitted as warnings.
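Because one callback signature covers several result shapes, it can help to see them told apart in code. The following is a minimal sketch, not part of EV::Pg: describe_result is a hypothetical helper that inspects the callback arguments described above and returns a short label.

```perl
use strict;
use warnings;

# Classify the arguments passed to an EV::Pg query callback.
# describe_result is a hypothetical helper, not an EV::Pg method.
sub describe_result {
    my ($result, $err) = @_;

    return "error: $err" if defined $err;
    return 'no result'   unless defined $result;

    # SELECT and single-row mode deliver an arrayref of rows.
    return 'rows: ' . scalar(@$result) if ref $result eq 'ARRAY';

    # describe_prepared / describe_portal deliver a hashref.
    return "describe: $result->{nfields} fields, $result->{nparams} params"
        if ref $result eq 'HASH';

    return "copy state: $result" if $result =~ /\ACOPY_(?:IN|OUT|BOTH)\z/;

    # Anything else is a plain scalar: a PQcmdTuples string,
    # "" from prepare, or 1 from a pipeline sync.
    return "command result: '$result'";
}
```

A callback can then start with something like `warn describe_result(@_), "\n";` while debugging.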
CONSTRUCTOR
new
my $pg = EV::Pg->new(%args);
Arguments:
- conninfo: libpq connection string. If provided, connect is called immediately.
- conninfo_params: Hashref of connection parameters (e.g. { host => 'localhost', dbname => 'mydb', port => '5432' }). Alternative to conninfo. If provided, connect_params is called immediately.
- expand_dbname: If true and conninfo_params is used, the dbname value is parsed as a connection string (allowing dbname => 'postgresql://...').
- on_connect: Callback invoked (with no arguments) when the connection is established.
- on_error: Callback invoked as ($error_message) on connection-level errors. Defaults to sub { die @_ }.
- on_notify: Callback invoked as ($channel, $payload, $backend_pid) on LISTEN/NOTIFY messages.
- on_notice: Callback invoked as ($message) on PostgreSQL notice/warning messages.
- on_drain: Callback invoked (with no arguments) when the send buffer has been flushed during COPY IN. Useful for resuming put_copy_data after it returns 0.
- loop: An EV loop object. Defaults to EV::default_loop.
CONNECTION METHODS
connect
$pg->connect($conninfo);
Initiates an asynchronous connection. The on_connect handler fires on success; on_error fires on failure.
connect_params
$pg->connect_params(\%params);
$pg->connect_params(\%params, $expand_dbname);
Initiates an asynchronous connection using keyword/value parameters instead of a connection string. $expand_dbname allows the dbname parameter to contain a full connection URI.
reset
$pg->reset;
Drops the current connection and reconnects using the original conninfo. Pending callbacks receive (undef, "connection reset"). Alias: reconnect.
finish
$pg->finish;
Closes the connection. Pending callbacks receive (undef, "connection finished"). Alias: disconnect.
is_connected
my $bool = $pg->is_connected;
Returns 1 if connected and ready for queries.
status
my $st = $pg->status;
Returns the libpq connection status (CONNECTION_OK or CONNECTION_BAD).
QUERY METHODS
query
$pg->query($sql, sub { my ($result, $err) = @_; });
Sends a simple query. Not allowed in pipeline mode -- use query_params instead. Multi-statement strings (e.g. "SELECT 1; SELECT 2") are supported but only the last result is delivered to the callback. PostgreSQL stops executing after the first error, so errors always appear as the last result.
query_params
$pg->query_params($sql, \@params, sub { my ($result, $err) = @_; });
Sends a parameterized query. Parameters are referenced in SQL as $1, $2, etc. undef values are sent as SQL NULL.
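As an illustration of the undef-to-NULL mapping, here is a small sketch. The insert_user helper, the users table and its columns are all hypothetical; $pg is assumed to be a connected EV::Pg object.

```perl
use strict;
use warnings;

# Insert one row. $email may be undef, which is sent as SQL NULL.
# The "users" table and its columns are hypothetical.
sub insert_user {
    my ($pg, $name, $email, $done) = @_;

    $pg->query_params(
        # Single quotes keep Perl from interpolating $1 and $2.
        'insert into users (name, email) values ($1, $2)',
        [ $name, $email ],
        sub {
            my ($cmd_tuples, $err) = @_;
            return $done->(undef, $err) if defined $err;
            $done->($cmd_tuples);   # PQcmdTuples string for the INSERT
        },
    );
}
```

Calling `insert_user($pg, 'alice', undef, sub { ... })` would store a NULL email rather than an empty string.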
prepare
$pg->prepare($name, $sql, sub { my ($result, $err) = @_; });
Creates a prepared statement. The callback receives an empty string ("") on success. Alias: prep.
query_prepared
$pg->query_prepared($name, \@params, sub { my ($result, $err) = @_; });
Executes a prepared statement. Alias: qx.
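The prepare and query_prepared calls are typically chained, since the statement must exist before it can be executed. A minimal sketch, assuming a connected $pg; the helper name prepared_sum and the statement name add_ints are illustrative choices only.

```perl
use strict;
use warnings;

# Prepare a statement, then execute it once the prepare has completed.
# "add_ints" is an arbitrary statement name chosen for this sketch.
sub prepared_sum {
    my ($pg, $x, $y, $done) = @_;

    $pg->prepare('add_ints', 'select $1::int + $2::int', sub {
        my (undef, $err) = @_;          # success delivers ""
        return $done->(undef, $err) if defined $err;

        $pg->query_prepared('add_ints', [ $x, $y ], sub {
            my ($rows, $qerr) = @_;
            return $done->(undef, $qerr) if defined $qerr;
            $done->($rows->[0][0]);     # first column of the first row
        });
    });
}
```

In real code the prepare step would normally run once per connection, with only query_prepared repeated; preparing the same name twice is an error on the server side.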
describe_prepared
$pg->describe_prepared($name, sub { my ($meta, $err) = @_; });
Describes a prepared statement. The callback receives a hashref with keys nfields and nparams. When nfields is non-zero, a fields key is also present (arrayref of {name, type} hashes). When nparams is non-zero, a paramtypes key is also present (arrayref of OIDs).
describe_portal
$pg->describe_portal($name, sub { my ($meta, $err) = @_; });
Describes a portal. The callback receives the same hashref structure as describe_prepared.
set_single_row_mode
my $ok = $pg->set_single_row_mode;
Switches the most recently sent query to single-row mode. Returns 1 on success, 0 on failure (e.g. no query pending). The callback then fires once per row with (\@rows), where the arrayref contains exactly one row (e.g. [[$col1, $col2, ...]]), and one final time with an empty arrayref to signal completion.
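The per-row and completion callbacks can be separated with a thin wrapper. This is a sketch under the behaviour described above, assuming a connected $pg outside pipeline mode; stream_rows is a hypothetical helper name.

```perl
use strict;
use warnings;

# Stream a result set row by row instead of buffering it.
# $on_row is called once per row; $on_end once, with an error
# string on failure or undef on normal completion.
sub stream_rows {
    my ($pg, $sql, $on_row, $on_end) = @_;

    $pg->query($sql, sub {
        my ($rows, $err) = @_;
        return $on_end->($err)  if defined $err;
        return $on_end->(undef) unless @$rows;   # empty arrayref: finished
        $on_row->($_) for @$rows;                # one row per invocation
    });

    # Applies to the query just sent; 0 means the switch failed.
    $pg->set_single_row_mode
        or warn "could not enable single-row mode\n";
}
```

The same wrapper should work with set_chunked_rows_mode in place of set_single_row_mode, since the loop over @$rows does not assume exactly one row per call.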
set_chunked_rows_mode
my $ok = $pg->set_chunked_rows_mode($chunk_size);
Switches the most recently sent query to chunked rows mode, delivering up to $chunk_size rows at a time (requires libpq >= 17). Like single-row mode, but with lower per-callback overhead for large result sets. Returns 1 on success, 0 on failure.
close_prepared
$pg->close_prepared($name, sub { my ($result, $err) = @_; });
Closes (deallocates) a prepared statement at protocol level (requires libpq >= 17). The callback receives an empty string ("") on success. Works in pipeline mode, unlike DEALLOCATE SQL.
close_portal
$pg->close_portal($name, sub { my ($result, $err) = @_; });
Closes a portal at protocol level (requires libpq >= 17). The callback receives an empty string ("") on success.
cancel
my $err = $pg->cancel;
Sends a cancel request using the legacy PQcancel API. This is a blocking call. Returns undef on success, an error string on failure.
cancel_async
$pg->cancel_async(sub { my ($err) = @_; });
Sends an asynchronous cancel request using libpq's PGcancelConn-based API (requires libpq >= 17). The callback receives no arguments on success, or an error string on failure. Croaks if libpq was built without async cancel support (LIBPQ_HAS_ASYNC_CANCEL).
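Since cancel_async croaks when async cancel is unavailable, code that must run against older libpq builds can fall back to the blocking call. A minimal sketch, assuming a connected $pg; cancel_query is a hypothetical helper.

```perl
use strict;
use warnings;

# Cancel the running query, preferring the non-blocking API.
# $done receives undef on success or an error string on failure.
sub cancel_query {
    my ($pg, $done) = @_;

    my $started = eval {
        $pg->cancel_async(sub {
            my ($err) = @_;
            $done->($err);
        });
        1;
    };
    return if $started;

    # cancel_async croaked (for example, libpq without async cancel
    # support): fall back to the blocking call, which stalls the
    # event loop until the request has been sent.
    $done->($pg->cancel);
}
```

Note that this sketch treats any exception from cancel_async as "unsupported"; stricter code could inspect $@ before falling back.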
pending_count
my $n = $pg->pending_count;
Returns the number of callbacks in the queue.
skip_pending
$pg->skip_pending;
Cancels all pending callbacks, invoking each with (undef, "skipped").
PIPELINE METHODS
enter_pipeline
$pg->enter_pipeline;
Enters pipeline mode. Queries are batched and sent without waiting for individual results.
exit_pipeline
$pg->exit_pipeline;
Exits pipeline mode. Croaks if the pipeline is not idle (has pending queries).
pipeline_sync
$pg->pipeline_sync(sub { my ($ok) = @_; });
Sends a pipeline sync point. The callback fires with (1) when all preceding queries have completed. Alias: sync.
send_pipeline_sync
$pg->send_pipeline_sync(sub { my ($ok) = @_; });
Like pipeline_sync but does not flush the send buffer (requires libpq >= 17). Useful for batching multiple sync points before a single manual flush via send_flush_request.
send_flush_request
$pg->send_flush_request;
Sends a flush request, asking the server to deliver results for queries sent so far. Alias: flush.
pipeline_status
my $st = $pg->pipeline_status;
Returns PQ_PIPELINE_OFF, PQ_PIPELINE_ON, or PQ_PIPELINE_ABORTED.
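The pipeline methods above combine into a simple batch pattern: enter, queue queries, sync, exit. The sketch below assumes a connected $pg; run_batch is a hypothetical helper, and it assumes the pipeline is idle by the time the sync callback runs, which is why exit_pipeline is wrapped in eval.

```perl
use strict;
use warnings;

# Send a batch of parameter sets through one pipeline.
# $done receives an arrayref holding one [$result, $err] entry
# per query, in the order the queries were sent.
sub run_batch {
    my ($pg, $sql, $param_sets, $done) = @_;
    my @results;

    $pg->enter_pipeline;
    for my $params (@$param_sets) {
        # query() is not allowed in pipeline mode; use query_params.
        $pg->query_params($sql, $params, sub { push @results, [@_] });
    }

    $pg->pipeline_sync(sub {
        # Assumption: no queries are pending once the sync fires.
        # exit_pipeline croaks otherwise, hence the eval.
        eval { $pg->exit_pipeline; 1 }
            or warn "exit_pipeline failed: $@";
        $done->(\@results);
    });
}
```

For large batches, a sync point every N queries (the benchmark below uses 1000) keeps a single failure from aborting the whole batch.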
COPY METHODS
put_copy_data
my $ok = $pg->put_copy_data($data);
Sends data to the server during a COPY IN operation. Returns 1 on success (data flushed or flush scheduled), 0 if the send buffer is full (wait for writability and retry), or -1 on error.
put_copy_end
my $ok = $pg->put_copy_end;
my $ok = $pg->put_copy_end($errmsg);
Ends a COPY IN operation. Pass an error message to abort the COPY. Returns 1 on success, 0 if the send buffer is full (retry after writability), or -1 on error.
get_copy_data
my $row = $pg->get_copy_data;
Retrieves a row during COPY OUT. Returns the row data as a string, -1 when the COPY is complete, or undef if no data is available yet.
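For COPY IN, the return codes of put_copy_data and put_copy_end imply a small retry loop. The following sketch shows one way to write it, assuming a connected $pg that is already in the COPY_IN state; pump_copy_in and its status strings are hypothetical, not part of EV::Pg.

```perl
use strict;
use warnings;

# Push queued COPY IN chunks until the send buffer fills or the
# queue is empty. Returns 'blocked' (call again once the buffer
# has drained), 'done', or 'error'.
sub pump_copy_in {
    my ($pg, $queue) = @_;

    while (@$queue) {
        my $rc = $pg->put_copy_data($queue->[0]);
        return 'error'   if $rc < 0;
        return 'blocked' if $rc == 0;   # buffer full: keep chunk queued
        shift @$queue;                  # chunk accepted
    }

    # Queue drained: finish the COPY. A 0 here also means "retry".
    my $rc = $pg->put_copy_end;
    return $rc < 0 ? 'error' : $rc == 0 ? 'blocked' : 'done';
}
```

A typical arrangement would call pump_copy_in when the query callback delivers "COPY_IN", install `$pg->on_drain(sub { pump_copy_in($pg, \@queue) })` to resume after a 'blocked' result, and clear the handler with `$pg->on_drain(undef)` once 'done' or 'error' is returned.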
HANDLER METHODS
Each handler method is a getter/setter. Called with an argument, it sets the handler and returns the new value (or undef if cleared). Called without arguments, it returns the current handler.
on_connect
Called with no arguments on successful connection.
on_error
Called as ($error_message) on connection-level errors.
on_notify
Called as ($channel, $payload, $backend_pid) on LISTEN/NOTIFY.
on_notice
Called as ($message) on server notice/warning messages.
on_drain
Called with no arguments when the libpq send buffer has been fully flushed during a COPY IN operation. Use this to resume sending data after put_copy_data returns 0 (buffer full).
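As an example of a handler setter in use, the sketch below wires on_notify to a single channel and issues the matching LISTEN. It assumes a connected $pg; listen_on is a hypothetical helper, and it replaces any on_notify handler already installed.

```perl
use strict;
use warnings;

# Subscribe to one NOTIFY channel and route its messages to $handler,
# which is called as ($payload, $backend_pid).
sub listen_on {
    my ($pg, $channel, $handler) = @_;

    $pg->on_notify(sub {
        my ($chan, $payload, $backend_pid) = @_;
        $handler->($payload, $backend_pid) if $chan eq $channel;
    });

    # LISTEN takes an identifier, which cannot be sent as a bound
    # parameter, so it is escaped instead.
    my $sql = 'listen ' . $pg->escape_identifier($channel);
    $pg->query($sql, sub {
        my (undef, $err) = @_;
        warn "LISTEN failed: $err\n" if defined $err;
    });
}
```

Usage might look like `listen_on($pg, 'jobs', sub { my ($payload) = @_; ... })`, with another session running `NOTIFY jobs, 'payload'`.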
CONNECTION INFO
String accessors (db, user, host, port, error_message, parameter_status, ssl_attribute) return undef when not connected. Integer accessors return a default value (typically 0 or -1). Methods that require an active connection (client_encoding, set_client_encoding, set_error_verbosity, set_error_context_visibility, conninfo) croak when not connected.
error_message
Last error message. Alias: errstr.
transaction_status
Returns PQTRANS_IDLE, PQTRANS_ACTIVE, PQTRANS_INTRANS, PQTRANS_INERROR, or PQTRANS_UNKNOWN. Alias: txn_status.
parameter_status
my $val = $pg->parameter_status($name);
Returns a server parameter (e.g. "server_version", "client_encoding").
backend_pid
Backend process ID. Alias: pid.
server_version
Server version as an integer (e.g. 180000 for 18.0).
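The integer encoding can be turned back into a dotted version string with a little arithmetic. A minimal sketch; format_server_version is a hypothetical helper that takes the value returned by server_version.

```perl
use strict;
use warnings;

# Convert the integer from server_version into a dotted string.
sub format_server_version {
    my ($v) = @_;

    if ($v >= 100000) {
        # PostgreSQL 10 and later: major * 10000 + minor.
        return sprintf '%d.%d', int($v / 10000), $v % 10000;
    }

    # Before 10 the major version had two parts, e.g. 90605 is 9.6.5.
    return sprintf '%d.%d.%d',
        int($v / 10000), int($v / 100) % 100, $v % 100;
}
```

For instance, `format_server_version($pg->server_version)` would turn 180000 into "18.0".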
protocol_version
Protocol version (typically 3).
db
Database name.
user
Connected user name.
host
Server host.
hostaddr
Server IP address.
port
Server port.
socket
The underlying file descriptor.
ssl_in_use
Returns 1 if the connection uses SSL.
ssl_attribute
my $val = $pg->ssl_attribute($name);
Returns an SSL attribute (e.g. "protocol", "cipher").
ssl_attribute_names
my $names = $pg->ssl_attribute_names;
Returns an arrayref of available SSL attribute names, or undef if the connection does not use SSL.
client_encoding
Returns the current client encoding name.
set_client_encoding
$pg->set_client_encoding($encoding);
Sets the client encoding (e.g. "UTF8", "SQL_ASCII"). This is a synchronous (blocking) call that stalls the event loop for one server round trip. Best called right after on_connect fires, before any queries are dispatched. Croaks if there are pending queries or on failure.
set_error_verbosity
my $old = $pg->set_error_verbosity($level);
Sets error verbosity. Returns the previous setting.
set_error_context_visibility
my $old = $pg->set_error_context_visibility($level);
Sets error context visibility. $level is one of PQSHOW_CONTEXT_NEVER, PQSHOW_CONTEXT_ERRORS (default), or PQSHOW_CONTEXT_ALWAYS. Returns the previous setting.
error_fields
my $fields = $pg->error_fields;
Returns a hashref of structured error fields from the most recent PGRES_FATAL_ERROR result, or undef if no error has occurred. Keys (present only when non-NULL in the server response):
sqlstate severity primary detail
hint position context schema
table column datatype constraint
internal_position internal_query
source_file source_line source_function
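The structured fields make it straightforward to build log messages or branch on SQLSTATE. The sketch below uses only keys from the list above; error_summary is a hypothetical helper and the fallback strings are arbitrary.

```perl
use strict;
use warnings;

# Build a one-line summary from error_fields. Returns undef when
# no error has been recorded on the connection.
sub error_summary {
    my ($pg) = @_;
    my $f = $pg->error_fields or return;

    # Keys are present only when the server supplied them.
    my $msg = sprintf '%s [%s]: %s',
        $f->{severity} // 'ERROR',
        $f->{sqlstate} // '?????',
        $f->{primary}  // 'unknown error';
    $msg .= " (detail: $f->{detail})" if defined $f->{detail};
    $msg .= " (hint: $f->{hint})"     if defined $f->{hint};
    return $msg;
}
```

A query callback could call `error_summary($pg)` when its $err argument is defined, for example to distinguish a unique violation (SQLSTATE 23505) from other failures.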
result_meta
my $meta = $pg->result_meta;
Returns a hashref of metadata from the most recent query result, or undef if no result has been delivered. Keys:
nfields number of columns
cmd_status command status string (e.g. "SELECT 3", "INSERT 0 1")
inserted_oid OID of inserted row (only present when valid)
fields arrayref of column metadata hashrefs:
name, type (OID), ftable (OID), ftablecol,
fformat (0=text, 1=binary), fsize, fmod
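One use of the column metadata is converting positional rows into hashes keyed by column name. This is a sketch, assuming result_meta already describes the current result when it is called from inside the query callback; rows_as_hashes is a hypothetical helper.

```perl
use strict;
use warnings;

# Turn arrayref rows into hashrefs keyed by column name.
# Intended to be called from inside the query callback, while
# result_meta still describes the result that produced $rows.
sub rows_as_hashes {
    my ($pg, $rows) = @_;
    my $meta  = $pg->result_meta or return [];
    my @names = map { $_->{name} } @{ $meta->{fields} || [] };

    return [
        map {
            my %h;
            @h{@names} = @$_;   # hash slice: name => value per column
            \%h;
        } @$rows
    ];
}
```

Duplicate column names (for example from an unaliased join) would collapse into one key, so aliasing columns in the SQL is advisable with this approach.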
conninfo
my $info = $pg->conninfo;
Returns a hashref of the connection parameters actually used by the live connection (keyword => value pairs).
connection_used_password
my $bool = $pg->connection_used_password;
Returns 1 if the connection authenticated with a password.
connection_used_gssapi
my $bool = $pg->connection_used_gssapi;
Returns 1 if the connection used GSSAPI authentication.
connection_needs_password
my $bool = $pg->connection_needs_password;
Returns 1 if the server requested a password during authentication.
trace
$pg->trace($filename);
Enables libpq protocol tracing to the specified file. Useful for debugging wire-level issues.
untrace
$pg->untrace;
Disables protocol tracing and closes the trace file.
set_trace_flags
$pg->set_trace_flags($flags);
Sets trace output flags (requires libpq >= 14). $flags is a bitmask of PQTRACE_SUPPRESS_TIMESTAMPS and PQTRACE_REGRESS_MODE.
UTILITY METHODS
escape_literal
my $quoted = $pg->escape_literal($string);
Returns a string literal escaped for use in SQL. Alias: quote.
escape_identifier
my $quoted = $pg->escape_identifier($string);
Returns an identifier escaped for use in SQL. Alias: quote_id.
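Identifiers such as table and column names cannot be passed as bound parameters, which is where escape_identifier comes in. A minimal sketch, assuming a connected $pg; select_by_column is a hypothetical helper, and the value itself is still sent as a parameter rather than escaped.

```perl
use strict;
use warnings;

# Build and send a SELECT whose table and column names are only
# known at run time. Returns the SQL text for logging.
sub select_by_column {
    my ($pg, $table, $column, $value, $cb) = @_;

    # The literal $1 in the format string is the SQL placeholder.
    my $sql = sprintf 'select * from %s where %s = $1',
        $pg->escape_identifier($table),
        $pg->escape_identifier($column);

    $pg->query_params($sql, [$value], $cb);
    return $sql;
}
```

escape_literal is the counterpart for values that must be embedded in the SQL text itself, for instance in statements that do not accept parameters.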
escape_bytea
my $escaped = $pg->escape_bytea($binary);
Escapes binary data for use in a bytea column.
encrypt_password
my $hash = $pg->encrypt_password($password, $user);
my $hash = $pg->encrypt_password($password, $user, $algorithm);
Encrypts a password for use with ALTER ROLE ... PASSWORD. $algorithm is optional; defaults to the server's password_encryption setting (typically "scram-sha-256").
unescape_bytea
my $binary = EV::Pg->unescape_bytea($escaped);
Class method. Unescapes bytea data.
lib_version
my $ver = EV::Pg->lib_version;
Class method. Returns the libpq version as an integer.
conninfo_parse
my $params = EV::Pg->conninfo_parse($conninfo);
Class method. Parses a connection string and returns a hashref of the recognized keyword/value pairs. Croaks if the string is invalid. Useful for validating connection strings before connecting.
ALIASES
Short aliases for common methods:
q query
qp query_params
qx query_prepared
prep prepare
reconnect reset
disconnect finish
flush send_flush_request
sync pipeline_sync
quote escape_literal
quote_id escape_identifier
errstr error_message
txn_status transaction_status
pid backend_pid
EXPORT TAGS
:status PGRES_* result status constants
:conn CONNECTION_OK, CONNECTION_BAD
:transaction PQTRANS_* transaction status constants
:pipeline PQ_PIPELINE_* pipeline status constants
:verbosity PQERRORS_* verbosity constants
:context PQSHOW_CONTEXT_* context visibility constants
:trace PQTRACE_* trace flag constants
:all all of the above
BENCHMARK
500k queries over Unix socket, PostgreSQL 18, libpq 18:
Workload  EV::Pg sequential  EV::Pg pipeline  DBD::Pg sync  DBD::Pg async+EV
SELECT    73,109 q/s         124,092 q/s      56,496 q/s    48,744 q/s
INSERT    58,534 q/s         84,467 q/s       39,068 q/s    41,559 q/s
UPSERT    26,342 q/s         34,223 q/s       28,134 q/s    27,155 q/s
Sequential mode uses prepared statements (parse once, bind+execute per call). Pipeline mode batches queries with pipeline_sync every 1000 queries. See bench/bench.pl to reproduce.
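To put the pipeline figures in perspective, the ratio of pipeline to sequential throughput can be computed directly from the table. The script below is only arithmetic on the numbers quoted above; pipeline_speedup is a hypothetical helper.

```perl
use strict;
use warnings;

# Queries per second, copied from the benchmark table.
my %qps = (
    SELECT => { sequential => 73_109, pipeline => 124_092 },
    INSERT => { sequential => 58_534, pipeline => 84_467 },
    UPSERT => { sequential => 26_342, pipeline => 34_223 },
);

# Pipeline throughput relative to sequential mode for one workload.
sub pipeline_speedup {
    my ($workload) = @_;
    my $row = $qps{$workload} or return;
    return $row->{pipeline} / $row->{sequential};
}

printf "%-7s %.2fx\n", $_, pipeline_speedup($_)
    for qw(SELECT INSERT UPSERT);
```

On these numbers, pipelining gives roughly 1.7x for SELECT, 1.4x for INSERT and 1.3x for UPSERT over sequential EV::Pg.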
REQUIREMENTS
libpq >= 14 (PostgreSQL client library) and EV. Some features (chunked rows, close prepared/portal, no-flush pipeline sync, async cancel) require libpq >= 17.
SEE ALSO
EV, DBD::Pg, Mojo::Pg, AnyEvent::Pg
LICENSE
This is free software; you can redistribute it and/or modify it under the same terms as the Perl 5 programming language system itself.