NAME
EV::ClickHouse - Async ClickHouse client using EV
SYNOPSIS
use EV;
use EV::ClickHouse;
# Discrete parameters
my $ch = EV::ClickHouse->new(
host => '127.0.0.1',
port => 9000,
protocol => 'native', # or 'http'
user => 'default',
password => '',
database => 'default',
settings => { max_threads => 4 }, # connection-level defaults
on_connect => sub { print "connected\n" },
on_error => sub { warn "error: $_[0]\n" },
);
# Or via URI: clickhouse[+native]://user:pass@host:port/db?key=val
my $ch = EV::ClickHouse->new(
uri => 'clickhouse+native://default:@127.0.0.1:9000/default',
on_connect => sub { ... },
);
# select
$ch->query("select number from system.numbers limit 3", sub {
my ($rows, $err) = @_;
die $err if $err;
print "row: @$_\n" for @$rows; # row: 0 / row: 1 / row: 2
});
# Per-query settings + parameterized values (no string interpolation)
$ch->query(
"select {x:UInt32} + {y:UInt32} as sum",
{ params => { x => 40, y => 2 }, max_execution_time => 30 },
sub { my ($rows, $err) = @_; print $rows->[0][0], "\n" }, # 42
);
# insert - arrayref of rows (no TSV escaping needed)
$ch->insert("my_table", [
[1, "hello\tworld"], # embedded tab is fine
[2, undef], # null
[3, [10, 20]], # Array column
], sub { my (undef, $err) = @_; warn "insert: $err" if $err });
# insert - pre-formatted TSV string
$ch->insert("my_table", "1\tfoo\n2\tbar\n", sub { ... });
# Raw HTTP response body (HTTP only)
$ch->query("select * from t format CSV", { raw => 1 }, sub {
my ($body, $err) = @_;
print $body;
});
EV::run;
DESCRIPTION
EV::ClickHouse is an asynchronous ClickHouse client that integrates with the EV event loop. It speaks both the ClickHouse HTTP protocol (port 8123) and the native TCP protocol (port 9000) directly in XS, with no external ClickHouse client library linked. zlib is required; OpenSSL (for TLS) and liblz4 (for native compression) are optional and detected at build time.
Features
- HTTP and native TCP protocols, with the same Perl API
- gzip compression (HTTP) and LZ4 compression with CityHash checksums (native)
- TLS/SSL via OpenSSL, with optional tls_skip_verify for self-signed certs and tls_ca_file for additional roots
- Connection URIs (clickhouse[+native]://user:pass@host:port/db), including bracketed IPv6 literals
- Per-query and connection-level ClickHouse settings; parameterized queries via params; external tables (native) via external
- Auto-reconnect with exponential backoff; queued (unsent) queries are preserved across reconnects
- Keepalive pings for idle native connections; graceful drain; query cancellation and skip_pending
- Streaming results via on_data per-block callback (native); on_progress for native progress packets
- Raw HTTP response mode for CSV / JSONEachRow / Parquet / etc.
- 35+ ClickHouse types including Int/UInt 8..256, Float32/64, BFloat16, Decimal32/64/128/256, UUID, IPv4/IPv6, Nullable, Array, Tuple, Map, LowCardinality (with cross-block dictionaries), SimpleAggregateFunction, Nested, Geo (Point/Ring/LineString/Polygon and the Multi variants), and JSON / Object('json') with auto-flattened hashref leaves (Int64/Float64/Bool/String + Array variants).
- Opt-in decode of Date/DateTime, Decimal, and Enum columns; named-rows (hashref) mode
CONSTRUCTOR
new
my $ch = EV::ClickHouse->new(%args);
The connection is initiated immediately; new returns before it
completes. Queries issued before on_connect fires are queued and
dispatched once the connection is ready.
Connection parameters:
-
uri => $uri_string
Single-string connection target: clickhouse[+native]://user:pass@host:port/database?key=value. The +native suffix selects the native protocol; otherwise HTTP is used. Hostnames, IPv4 addresses, and bracketed IPv6 literals are all accepted (e.g. clickhouse://[::1]:9000/db). Query-string values are merged into the constructor arguments. Discrete host, port, etc. arguments override the URI.
-
host => $hostname
Server hostname. Default: 127.0.0.1.
Note: DNS resolution is blocking unless EV::cares is installed. With EV::cares available, hostnames are resolved off-loop at construct time (the constructor returns immediately, queries queue until the resolved address is connected). Falls back to blocking getaddrinfo otherwise.
-
hosts => [$h1, $h2, ...]
Multi-host failover list. Each entry is host, host:port, or a bracketed-IPv6 literal. On a connect-phase failure (refused, timeout, ServerHello stall), the client advances to the next host in round-robin order; pair with auto_reconnect => 1 for automatic recovery. The single host argument is honoured as a fallback when hosts isn't given.
-
port => $port
Server port. Default: 8123 (HTTP), 9000 (native).
-
protocol => 'http' | 'native'
Protocol to use. Default: http.
-
user => $username
Username. Default: default.
-
password => $password
Password. Default: empty.
-
database => $dbname
Default database. Default: default. The shorter alias db is also accepted.
-
tls => 0 | 1
Enable TLS. Default: 0. Requires the module to be built with OpenSSL (otherwise the constructor croaks).
-
tls_ca_file => $path
Additional CA certificate file for TLS verification, used alongside the system trust store.
-
tls_cert_file => $path, tls_key_file => $path
PEM-encoded client certificate and matching private key for mutual TLS (mTLS). Both must be set together. The client certificate is sent during the TLS handshake; the server's trust chain decides whether to accept it. Required by managed ClickHouse offerings (Aiven, Altinity Cloud) that enforce cert-based auth. The private key must match the public key in the certificate; the constructor errors out at handshake time with "TLS client cert / private key mismatch" otherwise.
-
tls_skip_verify => 0 | 1
Skip TLS certificate verification. Default: 0. Useful in development with self-signed certs; do not use in production.
-
loop => $ev_loop
EV event loop object. Default: EV::default_loop.
Callbacks:
-
on_connect => sub { }
Called once the connection is fully established (after the native ServerHello, or after the TCP/TLS handshake for HTTP).
-
on_error => sub { my ($message) = @_ }
Called on connection-level errors (DNS failure, socket error, TLS failure, read/write errors, etc.). Default: sub { die @_ }. Per-query errors are delivered to the query's own callback as the second argument; they do not invoke on_error.
When a connection drops mid-flight, on_error fires first with the underlying cause, and on_disconnect fires immediately after as the state machine tears the socket down. If auto_reconnect is set, the reconnect attempt happens after on_disconnect returns.
It is safe to call reset (or reconnect) from inside on_error - the freshly-armed socket survives the outer teardown that would otherwise close it. Use this for custom recovery logic (e.g. switching to a backup host on specific errors).
-
on_progress => sub { my ($rows, $bytes, $total_rows, $written_rows, $written_bytes) = @_ }
Called on native protocol progress packets. Not fired for HTTP.
-
on_disconnect => sub { }
Called when an established connection closes (by finish, server disconnect, or mid-flight error). Only fires if on_connect had previously fired - it does not fire for connect-phase failures (refused, timeout, ServerHello stall) since no connection was ever established. Fires after internal state has been reset, so it is safe to queue new queries or call reset from inside the handler.
-
on_trace => sub { my ($message) = @_ }
Debug trace callback. Called with internal state-machine messages (connect, dispatch, disconnect). Useful for diagnosing protocol issues.
-
on_failover => sub { my ($old_host, $old_port, $new_host, $new_port, $msg) = @_ }
Multi-host only. Fires after the failover wrapper rotates to the next host in the hosts => [...] list, with the old and new (host, port) pair plus the triggering error message. Use it for metrics ("which host am I on?") or to log host transitions. Fires before the user's on_error.
Options:
-
compress => 0 | 1
Enable compression: gzip on HTTP (request and response), LZ4 with CityHash checksums on the native protocol. Default: 0. Native compression requires liblz4 at build time.
-
session_id => $id
HTTP session id for stateful operations (temporary tables, SET, etc.). Native protocol has stateful sessions intrinsically; this option is HTTP-only.
-
connect_timeout => $seconds
TCP/TLS connection timeout. 0 (default) means no timeout. Floating point allowed.
-
query_timeout => $seconds
Default per-query timeout applied to every query and insert. The query callback receives a timeout error if exceeded. Override per-call via the query_timeout key in the settings hashref.
-
max_query_size => $bytes
Client-side guard: croak before sending any query whose SQL text exceeds this many bytes. 0 (default) disables the check. Useful as a last-resort defense against accidentally sending unbounded strings.
-
max_recv_buffer => $bytes
Defensive ceiling on the response. The cap applies to the raw recv buffer (every protocol), the chunked-decoded body (HTTP), and the gzip-decompressed body (HTTP), so the same upper bound applies to the user-visible payload regardless of transport encoding. On overflow the query callback receives an appropriate error ("recv buffer overflow", "chunked response too large", or "gzip body exceeds max_recv_buffer") and the connection is torn down so no subsequent query can slip past the cap on the same socket. 0 (default) keeps the historical no-cap behaviour (still bounded internally by a hard 128 MB ceiling on compressed paths). Recommended in production when the schema is constrained and you want a hard upper bound (e.g. 128 * 1024 * 1024 for 128 MB).
-
http_basic_auth => 0 | 1
HTTP only. When set, send credentials as Authorization: Basic base64(user:password) instead of the default X-ClickHouse-User / X-ClickHouse-Key header pair. Use this when the connection passes through an HTTP gateway (nginx, Envoy, ...) that strips the X-ClickHouse-* headers but forwards Basic auth verbatim. Default: 0.
-
auto_reconnect => 0 | 1
Reconnect automatically on connection loss. Default: 0. When enabled, queued (unsent) queries are preserved across reconnects; in-flight queries receive an error.
The reconnect path covers TCP/TLS connect failures, connect_timeout or query_timeout expiry, and any clean server-side EOF (idle or mid-request). Mid-query I/O errors (ECONNRESET / EPIPE) and a malformed native ServerHello are not retried - they typically indicate a misconfigured peer or client-side bug that retry would only loop on. Combine with reconnect_max_attempts for an explicit ceiling.
-
settings => \%hash
ClickHouse settings applied to every query and insert. Per-call settings (see "query", "insert") override these.
settings => { async_insert => 1, max_threads => 4 }
-
keepalive => $seconds
Send a keepalive request every N seconds while the connection is idle: a native CLIENT_PING on the native protocol or a GET /ping on HTTP (some load balancers / NATs drop idle HTTP connections after a few seconds; TCP-level keepalive is too coarse). Default: 0 (disabled).
-
reconnect_delay => $seconds
Initial delay for the auto_reconnect exponential backoff. Each failed attempt doubles the delay, capped at reconnect_max_delay. Default: 0 (immediate retry, no backoff).
-
reconnect_max_delay => $seconds
Backoff ceiling. Default: 0, meaning no explicit cap; the implementation still bounds the backoff exponent at 20 doublings, so with reconnect_delay = 0.5 the worst case is roughly 6 days. Setting an explicit ceiling is recommended in production.
-
reconnect_jitter => $fraction
Multiplicative jitter applied to each backoff delay: the actual sleep is uniformly random in [delay, delay * (1 + jitter)]. 0 (default) disables. Set to 0.1 - 0.5 when many clients reconnect against a shared cluster - without jitter, every replica restart causes a synchronised reconnect storm at the same backoff intervals. Jitter is applied after reconnect_max_delay clamping, then re-clamped, so the ceiling is never exceeded.
-
reconnect_max_attempts => $N
Cap the total number of reconnect attempts before giving up. Once the cap is reached, on_error fires with the message "max reconnect attempts exceeded" and no further attempts are made (the user can manually call reset later). Default: 0 (unlimited retries; be careful with permanent failures like wrong host).
-
progress_period => $seconds
Coalesce on_progress packets so the callback fires at most once per N seconds, with the per-field counters accumulated over the interval. Useful for big SELECTs where the server can emit hundreds of progress packets per second. Default: 0 (fire on every packet).
-
query_log_comment => 1 | $string
Prepend a SQL block comment to every query for system.query_log traceability. 1 auto-generates ev_ch user=$ENV{USER} pid=$$; a string is taken literally. Omit (or pass a falsy value) to disable. Embedded */ sequences are escaped to keep the comment well-formed.
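The reconnect_delay, reconnect_max_delay, and reconnect_jitter options above interact, so it can help to look at the resulting schedule offline. The following is a minimal sketch in plain Perl, not the module's implementation: backoff_delay is a hypothetical helper, and it assumes attempts are counted from 0 and that attempt 0 waits exactly reconnect_delay.

```perl
use strict;
use warnings;

# Hypothetical helper, not part of EV::ClickHouse: models the documented
# reconnect backoff rules so a schedule can be inspected without a server.
# Assumption: attempts are numbered from 0 and attempt 0 waits reconnect_delay.
sub backoff_delay {
    my ($attempt, %opt) = @_;
    my $base   = $opt{reconnect_delay}     // 0;
    my $max    = $opt{reconnect_max_delay} // 0;
    my $jitter = $opt{reconnect_jitter}    // 0;

    # The exponent is bounded at 20 doublings even without an explicit cap.
    my $doublings = $attempt > 20 ? 20 : $attempt;
    my $delay     = $base * 2**$doublings;

    # Clamp, add multiplicative jitter, then clamp again so the ceiling holds.
    $delay = $max if $max > 0 && $delay > $max;
    $delay += $delay * $jitter * rand() if $jitter > 0;
    $delay = $max if $max > 0 && $delay > $max;
    return $delay;
}

for my $attempt (0 .. 6) {
    printf "attempt %d: %.2fs\n", $attempt,
        backoff_delay($attempt, reconnect_delay => 0.5, reconnect_max_delay => 10);
}
```

With reconnect_max_delay left at 0 the sketch tops out at 0.5 * 2**20 seconds, which matches the "roughly 6 days" figure quoted above.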
Decode options (native protocol only):
These shape how column values are returned. All are opt-in and default
to 0, which returns raw numeric forms for stable round-tripping.
-
decode_datetime => 0 | 1
Return Date, Date32, DateTime, and DateTime64 as formatted strings (e.g. "2024-01-15", "2024-01-15 10:30:00") instead of raw integers. Uses UTC; columns with an explicit timezone (DateTime('America/New_York')) are converted to that zone.
-
decode_decimal => 0 | 1
Return Decimal32/Decimal64/Decimal128 as scaled floating-point numbers instead of unscaled integers. Note: at large precisions, double loses bits, so leave disabled if you need exact arithmetic.
-
decode_enum => 0 | 1
Return Enum8/Enum16 as string labels instead of numeric codes.
-
named_rows => 0 | 1
Return each row as a hashref keyed by column name instead of an arrayref.
my $ch = EV::ClickHouse->new(named_rows => 1, ...);
$ch->query("select 1 as n", sub {
my ($rows, $err) = @_;
print $rows->[0]{n}; # 1
});
METHODS
query
$ch->query($sql, sub { my ($rows, $err) = @_ });
$ch->query($sql, \%settings, sub { my ($rows, $err) = @_ });
Executes a SQL statement. The callback receives:
- ($arrayref_of_arrayrefs) for select with at least one row
- (undef) for DDL/DML on success and for select with zero rows (both protocols). When in doubt, treat undef and [] equivalently with my @rows = @{$rows // []};.
- (undef, $error_message) on error (server exception or connection error)
The optional \%settings hashref passes per-query ClickHouse settings
(max_execution_time, max_threads, async_insert, etc.), overriding
connection-level defaults.
The following keys are intercepted by the client and not sent verbatim to the server:
-
params => \%hash
Parameterized values for {name:Type} placeholders in the SQL. Encoding and quoting is the server's job, so values do not need escaping:
$ch->query(
"select * from t where id = {id:UInt64} and name = {n:String}",
{ params => { id => 42, n => "O'Brien" } },
sub { ... },
);
Works on both protocols (HTTP uses URL-encoded param_* query string; native uses dedicated wire fields).
-
query_id => $string
Set the protocol-level query identifier. Retrievable later via "last_query_id".
-
raw => 1
HTTP only. The callback receives the raw response body as a scalar string instead of parsed rows. Use with an explicit format clause:
$ch->query("select * from t format CSV", { raw => 1 }, sub {
my ($body, $err) = @_;
});
Croaks if used with the native protocol.
-
query_timeout => $seconds
Per-query timeout, overriding the connection-level query_timeout.
-
on_data => sub { my ($rows) = @_; ... }
Native protocol only. A code ref called for each data block as it arrives, for streaming large result sets. Rows are delivered incrementally and not accumulated, so the final callback receives (undef) rather than all rows. The final callback always fires on completion or error, even if no data block was emitted (empty result, server-side error before the first block).
$ch->query("select * from big_table",
{ on_data => sub { my ($rows) = @_; process_batch($rows) } },
sub { my (undef, $err) = @_; warn $err if $err },
);
-
external => \%tables
Native protocol only. Ships one or more in-memory data blocks that the query can reference as tables, JOIN against, or filter with IN - without creating a server-side temporary table. Each entry maps a table name to { structure => [...], data => [...] }:
$ch->query(
"select u.id, u.name from users u where u.id in _wanted",
{ external => {
_wanted => {
structure => [ id => 'UInt64' ],
data => [ [7], [42], [911] ],
},
} },
sub { my ($rows, $err) = @_; ... },
);
structure is a flat list of name => type pairs (ClickHouse type names, e.g. UInt64, String, Float64); data is an arrayref of row arrayrefs, encoded with the same type machinery as "insert". An empty data arrayref is a valid zero-row table. Several external tables may be supplied at once. Croaks on the HTTP protocol or on a malformed spec (odd structure list, non-arrayref row, or a column type that cannot be encoded).
Native protocol type notes: values come back as typed Perl scalars.
By default Date/DateTime are integers (days since epoch / Unix
timestamps); enable decode_datetime for strings. Enum values are
numeric codes; decode_enum returns labels. Decimal values are
unscaled integers; decode_decimal scales them to floats.
SimpleAggregateFunction is transparently decoded as its inner type.
Nested columns become arrays of tuples. LowCardinality works
correctly across multi-block results with shared dictionaries.
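To make the raw forms above concrete, here is a minimal sketch that converts them by hand. The helper names are hypothetical and not part of the module, the conversions assume UTC, and decimal_to_string assumes the unscaled value fits a native Perl integer. It formats the decimal as a string so no precision is lost to a double.

```perl
use strict;
use warnings;
use POSIX qw(strftime);

# Hypothetical helpers for the raw (decode_* disabled) forms described above.

# Date arrives as days since the Unix epoch.
sub date_from_days {
    my ($days) = @_;
    return strftime('%Y-%m-%d', gmtime($days * 86_400));
}

# DateTime arrives as a Unix timestamp; formatted here in UTC.
sub datetime_from_epoch {
    my ($epoch) = @_;
    return strftime('%Y-%m-%d %H:%M:%S', gmtime($epoch));
}

# Decimal arrives as an unscaled integer; place the decimal point by hand
# instead of dividing, so the digits stay exact.
sub decimal_to_string {
    my ($unscaled, $scale) = @_;
    return "$unscaled" if $scale == 0;
    my $sign   = $unscaled < 0 ? '-' : '';
    my $digits = sprintf '%0*d', $scale + 1, abs $unscaled;
    substr($digits, -$scale, 0, '.');
    return $sign . $digits;
}

print date_from_days(19_737), "\n";
print datetime_from_epoch(1_705_314_600), "\n";
print decimal_to_string(12_345, 2), "\n";
```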
insert
$ch->insert($table, $data, sub { my (undef, $err) = @_ });
$ch->insert($table, $data, \%settings, sub { my (undef, $err) = @_ });
$data may be either:
- A pre-formatted TabSeparated string (tabs separate columns, newlines separate rows, with the standard ClickHouse escapes).
- An arrayref of arrayrefs (rows of column values).
When using arrayrefs, no TSV escaping is needed: undef maps to null
and strings may contain tabs and newlines freely.
Nested arrayrefs (Array/Tuple columns) and hashrefs (Map columns) are supported only on the native protocol, where the encoder has the column type from the server's sample block. On HTTP the same call croaks rather than silently producing malformed TSV; use the native protocol or pre-serialise nested types into ClickHouse TSV literal form.
# Native: nested types encode directly.
$ch->insert("my_table", [
[1, "hello\tworld"], # embedded tab
[2, undef], # null
[3, [10, 20]], # Array column (native only)
[4, { a => 1, b => 2 }], # Map column (native only)
], sub { ... });
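When passing a pre-formatted TabSeparated string instead, the escaping is the caller's job. A minimal sketch of building such a string follows; tsv_field and tsv_rows are hypothetical helpers, not part of the module, and they handle only scalar values and undef (no nested types).

```perl
use strict;
use warnings;

# Hypothetical helpers, not part of EV::ClickHouse: build a TabSeparated
# payload from scalar values, applying the ClickHouse backslash escapes.
my %TSV_ESCAPE = (
    "\\" => '\\\\',
    "\t" => '\t',
    "\n" => '\n',
    "\r" => '\r',
    "\0" => '\0',
);

sub tsv_field {
    my ($value) = @_;
    return '\N' unless defined $value;    # TabSeparated spelling of NULL
    (my $escaped = $value) =~ s/([\\\t\n\r\0])/$TSV_ESCAPE{$1}/g;
    return $escaped;
}

sub tsv_rows {
    my (@rows) = @_;
    return join '', map { join("\t", map { tsv_field($_) } @$_) . "\n" } @rows;
}

my $data = tsv_rows([1, "hello\tworld"], [2, undef]);
print $data;
```

The resulting $data string can then be handed to insert in place of the arrayref, as in the pre-formatted form shown in the SYNOPSIS.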
The optional \%settings hashref works exactly as in "query",
including query_id, query_timeout, and params. Two extra
flags are recognised here:
-
idempotent => 1 | $token
Auto-mints (or uses the supplied) insert_deduplication_token, so a reconnect-driven retry of the same insert doesn't double-write. Falsy values are a no-op.
-
async_insert => 1
Enables ClickHouse server-side insert batching by setting async_insert=1, wait_for_async_insert=0. Both sub-settings can be overridden by passing them explicitly.
ping
$ch->ping(sub { my ($result, $err) = @_ });
Send a no-op round trip to verify the connection is alive. On success
$result is true, $err is undef. On error: (undef, $error).
is_healthy
$ch->is_healthy(sub { my ($ok, $err) = @_ });
$ch->is_healthy(sub { ... }, $timeout_seconds);
Bounded health probe: wraps "ping" with a deadline (default 5s). The
callback receives (1, undef) on a successful round trip, or
(0, $msg) on ping error or timeout. Failure does not tear down the
connection; recovery (reset, host rotation, etc.) is the caller's
choice. Useful for L4 load-balancer probes and self-monitoring loops.
ping_round_trip
$ch->ping_round_trip(sub {
my ($seconds, $err) = @_;
die "ping: $err" if $err;
printf "rtt = %.3fms\n", $seconds * 1000;
});
Issue a single PING and report wall-clock latency in seconds. Lighter
than installing "track_query_durations" for a one-shot probe;
the callback receives (undef, $err) on transport failure. Pairs well with
"is_healthy" for health-check endpoints that want both liveness and
latency.
slow_query_log
my $prev = $ch->slow_query_log(0.1, sub {
my ($qid, $rows, $bytes, $code, $dur, $err) = @_;
warn sprintf("SLOW %.3fs %s\n", $dur, $qid // '?');
});
Filtered variant of "on_query_complete" that fires only when the
query took at least $threshold seconds. Returns the previous
on_query_complete so the caller can restore it. The previous
handler is also chained on every call, so installing this on top of
existing instrumentation is safe.
server_setting
$ch->server_setting('max_threads', sub {
my ($value, $err) = @_;
warn "max_threads = $value\n";
});
Looks one value up from system.settings. Convenient one-liner for
"what's the server's effective $x?". Returns undef via the
callback if the setting name isn't present on this server.
row_count
$ch->row_count('events', sub { ... });
$ch->row_count('events', "ts > now() - interval 1 hour", sub { ... });
Runs select count() from $table [where $where]. $where is interpolated
literally; use parameterized predicates via the "query" params
mechanism for user-supplied filters. The callback receives the row count,
or (undef, $err) on error.
table_size
$ch->table_size('events', sub {
my ($info, $err) = @_;
# $info = { rows => N, bytes_on_disk => N, data_uncompressed_bytes => N }
});
Sums system.parts for the (optionally database-qualified) table
and returns a hashref. Active parts only - does not count detached
parts. Suitable for ops dashboards; not authoritative for per-row
billing (parts may double-count rows during MERGE).
ddl
$ch->ddl("create table t (n UInt32) engine=Memory", sub {
my (undef, $err) = @_; die "ddl: $err" if $err;
});
Strict variant of "query" for DDL/DML. Identical wire behaviour; the separate name is a readability marker for migration scripts so the intent of each call is obvious at the call site.
dictionary_reload
$ch->dictionary_reload('my_dict', $cb);
Shortcut for system reload dictionary my_dict. The dictionary
name is validated against [A-Za-z_][A-Za-z0-9_]*(?:\.[A-Za-z_][A-Za-z0-9_]*)?
before splicing into the SQL.
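The same kind of check can be applied in application code before a name reaches any SQL string. A minimal sketch, using the pattern quoted above with explicit anchors; reload_dictionary_sql is a hypothetical helper and only builds the statement text.

```perl
use strict;
use warnings;

# Pattern quoted from the text above, anchored so the whole name must match.
my $IDENT = qr/\A[A-Za-z_][A-Za-z0-9_]*(?:\.[A-Za-z_][A-Za-z0-9_]*)?\z/;

# Hypothetical helper: returns the statement only for names that pass.
sub reload_dictionary_sql {
    my ($name) = @_;
    die "invalid dictionary name\n"
        unless defined $name && $name =~ $IDENT;
    return "system reload dictionary $name";
}

print reload_dictionary_sql('db.my_dict'), "\n";
```

Names with more than one dot, leading digits, whitespace, or punctuation are rejected before any SQL is assembled.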
refresh_view
$ch->refresh_view('mv_aggregated_hourly', $cb);
Shortcut for system refresh view mv_aggregated_hourly (requires
ClickHouse 23.12+). Same name validation as "dictionary_reload".
wait_mutation
$ch->query("alter table events update tag = 'x' where id = 7", sub {
my (undef, $err) = @_;
die $err if $err;
$ch->wait_mutation('events', sub {
my ($info, $err) = @_;
die $err if $err; # a mutation failed, or timed out
print "mutation done\n";
}, poll => 0.5, timeout => 30);
});
ALTER TABLE ... UPDATE/DELETE runs asynchronously on the server;
the statement returns before the mutation has been applied.
wait_mutation polls system.mutations until the table has no
incomplete mutations, then fires $cb->({ pending => 0 }).
A mutation that keeps failing stays incomplete with a
latest_fail_reason - once that reason persists across consecutive
polls (a single transient failure that the mutation's next retry
clears is tolerated) it is surfaced as
$cb->(undef, "wait_mutation: ..."). Options:
- poll - seconds between polls (default 1).
- timeout - give up after N seconds, delivering a timeout error. Omitted by default (polls indefinitely).
- mutation_id - wait only for one specific mutation rather than every incomplete mutation on the table.
Table-name validation matches "for_table"; a db.table name also
filters system.mutations by database.
parse_uri
my $parsed = EV::ClickHouse->parse_uri(
'clickhouse+native://u:p@host:9000/db?compress=1'
);
# $parsed = {
# protocol => 'native', host => 'host', port => 9000,
# user => 'u', password => 'p', database => 'db',
# compress => '1',
# };
my $ch = EV::ClickHouse->new(%$parsed, on_connect => sub { ... });
Class method that parses a ClickHouse URI into a hash, returning
undef if the URI doesn't match the expected shape. Lets tooling
validate user-supplied URIs without opening a connection. Query-
string keys are flattened to top-level args so the result drops
straight into "new"; matches the inline URI parser there.
is_retryable_error
EV::ClickHouse->is_retryable_error($code) # class method
$ch->is_retryable_error($code) # also works on instance
Returns true if the given ClickHouse error code (as reported by
"last_error_code" or the per-query $err argument's prefix) is a
common transient failure that warrants automatic retry: timeouts,
network errors, memory pressure, replica catch-up, keeper exceptions,
etc. The list is curated against ClickHouse's
src/Common/ErrorCodes.cpp; expect the set to grow conservatively.
$ch->query($sql, sub {
my ($r, $err) = @_;
if ($err && EV::ClickHouse->is_retryable_error($ch->last_error_code)) {
schedule_retry($sql);
}
});
server_supports
$ch->server_supports($feature_name)
Returns true if the live native server's protocol revision is high enough to support the given feature. Feature names map to documented protocol-revision thresholds so user code can branch cleanly on capability instead of hard-coding revision numbers. Supported names:
block_info           51903  block_info packet in DATA blocks
server_display_name  54372  ServerHello carries display name
version_patch        54401  ServerHello carries patch version
progress_writes      54420  Progress packets include write counters
server_timezone      54423  Server timezone string in ServerHello
addendum             54458  Native ClientHello addendum block
HTTP connections have no protocol revision (server_revision is 0),
so server_supports returns false on HTTP for any feature. Unknown
feature names also return false. Use server_revision directly if you
need the raw integer.
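The lookup described above amounts to a threshold comparison. A minimal sketch of that logic in plain Perl, using the thresholds from the table; revision_supports is a hypothetical stand-in that takes the revision as an argument, not the module's method.

```perl
use strict;
use warnings;

# Thresholds copied from the table above; the helper itself is hypothetical.
my %MIN_REVISION = (
    block_info          => 51903,
    server_display_name => 54372,
    version_patch       => 54401,
    progress_writes     => 54420,
    server_timezone     => 54423,
    addendum            => 54458,
);

sub revision_supports {
    my ($revision, $feature) = @_;
    my $min = $MIN_REVISION{$feature};
    return 0 unless defined $min;        # unknown feature name
    return $revision >= $min ? 1 : 0;    # revision 0 (HTTP) is always false
}

print revision_supports(54_401, 'version_patch') ? "yes\n" : "no\n";
```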
for_table
$ch->for_table('events', sub {
my ($info, $err) = @_;
die $err if $err;
for my $col (@{ $info->{columns} }) {
printf "%-20s %s\n", $col->{name}, $col->{type};
}
});
Schema introspection: issues describe table $name and delivers
{ columns => [{name=>..., type=>...}, ...] } to the
callback. Useful for generic insert pipelines that need column types
without hard-coding them. $name may be table or db.table;
non-identifier characters are rejected up-front.
iterate
my $it = $ch->iterate("select number from numbers(1_000_000)");
while (my $batch = $it->next($timeout)) {
process($_) for @$batch;
}
die $it->error if $it->error;
Native protocol only - relies on the per-block on_data hook and
will croak if invoked on an HTTP connection.
Synchronous-feeling pull iterator over a streaming select. Internally
wraps the native on_data per-block callback and drives the EV loop
from inside ->next until the next block arrives, the query
completes, or the optional timeout (seconds) expires. Useful for
procedural ETL / export code that doesn't fit a callback shape.
->error, ->is_done, and ->cancel are also
available on the returned iterator object.
on_log
$ch->on_log(sub {
my ($entry) = @_;
# $entry: { event_time, host_name, query_id, thread_id,
# priority, source, text }
printf "[CH %s] %s\n", $entry->{priority}, $entry->{text};
});
Native protocol only. Fires once per row inside any SERVER_LOG
packet the server emits. Useful for surfacing
send_logs_level => 'information' server-side trace events to
the application's own log stream without polling system.text_log.
The row hash keys mirror the server-side log block schema; missing
keys (older revisions) come through as undef.
on_query_start
$ch->on_query_start(sub {
my ($query_id) = @_;
log_metric_start($query_id);
});
Optional connection-level hook that fires the moment a query is
dispatched to the wire (after the query_id has been resolved, before
the first send byte). Symmetric with "on_query_complete"; useful for
deriving accurate "query in flight" durations without depending on
the per-query callback closure. Keepalive PINGs are suppressed, the
same as for on_query_complete. Also accepted as a constructor
argument.
on_query_complete
$ch->on_query_complete(sub {
my ($query_id, $rows, $bytes, $error_code, $duration_s, $err) = @_;
log_metric(...);
});
Optional connection-level hook that fires after every query (success or error). Arguments: query_id (or undef), profile_rows, profile_bytes, last_error_code, wall-clock duration in seconds, error message (or undef). Useful for statsd/Prometheus-style instrumentation. Also accepted as a constructor argument.
A per-query override may be passed in the \%settings hashref of
"query" or "insert". When set, it replaces (does not augment)
the connection-level handler for that single call, so per-query
instrumentation doesn't double-count against global metrics:
$ch->query(
$sql,
{ on_query_complete => sub {
my ($qid, $rows, $bytes, $code, $dur, $err) = @_;
record_slow_query($qid, $dur);
} },
$cb,
);
insert_streamer
my $s = $ch->insert_streamer('events',
batch_size => 5_000,
settings => { query_id => 'ingest-1' }, # optional
on_batch_error => sub { warn "batch err: $_[0]" }, # per-failure
);
while (my $row = next_event()) {
$s->push_row($row);
}
$s->finish(sub {
my (undef, $err) = @_;
die "ingest failed: $err" if $err;
});
Buffered streaming insert for ETL workloads. Rows are buffered until
batch_size is reached, then dispatched as a single insert().
Dispatches are serialised; push_row keeps buffering while a batch is
in flight (the native protocol cannot pipeline INSERTs). finish
flushes the remaining buffer and fires its callback once all batches
complete; if any batch failed the first error is delivered as
$err. The streamer also offers buffered_count and in_flight
accessors for backpressure logic.
$streamer->reset discards any rows still in the local buffer
and clears the sticky error so the streamer can be reused after a
permanent error (e.g. a schema fix). Does not touch the underlying
$ch - any batch already on the wire still completes normally. Any
callback registered via finish or await_drain that has not yet
fired is invoked with a 'streamer reset' error rather than being
silently dropped.
high_water + on_high_water trigger a one-shot notification when
the buffered row count crosses the watermark, intended as a hint to
slow the producer. Set high_water below batch_size; if
high_water > batch_size, the buffer drains via batch_size
flushes before the watermark is reached and on_high_water never
fires. When high_water == batch_size the watermark and the flush
threshold coincide: on_high_water fires once per batch, right after
the flush dispatches (so in_flight is already 1). The notification
re-arms only after the buffer drops below high_water.
$streamer->await_drain($cb) registers a callback that fires
once the buffer drops to low_water (default high_water / 2; 0
when high_water isn't set). Pairs with on_high_water to close
the backpressure loop:
my $s = $ch->insert_streamer($table,
batch_size => 5_000,
high_water => 10_000,
low_water => 3_000,
on_high_water => sub { $producer->pause },
);
$s->await_drain(sub { $producer->resume });
Fires synchronously if the buffer is already at/below low_water.
Re-arms each call: register again from inside the callback if you
want to keep watching. The callback receives one argument: undef
on a normal drain, or an error string if the streamer was reset
before the buffer drained (so my ($err) = @_ distinguishes them).
Named-row mode: pass columns => [@col_names] at construction
to accept hashref rows instead of positional arrayrefs. The streamer
reorders each pushed hash into the declared column order, so producer
code does not have to know where each column lives in the table.
my $s = $ch->insert_streamer('events',
columns => [qw(ts user_id action payload)],
batch_size => 5_000,
);
$s->push_row({ user_id => 7, action => 'click', ts => time });
$s->push_row([ 1735, 7, 'view', '...' ]); # arrayref still works
Hash keys missing from a row become undef; extra keys are ignored.
Mixing arrayref and hashref pushes is allowed.
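The reordering described above can be pictured as a hash slice over the declared column list. A minimal sketch follows; row_in_column_order is a hypothetical helper illustrating the documented behaviour, not the streamer's actual code.

```perl
use strict;
use warnings;

# Hypothetical helper mirroring the documented named-row behaviour.
sub row_in_column_order {
    my ($columns, $row) = @_;
    return $row if ref $row eq 'ARRAY';    # positional rows pass through
    return [ @{$row}{@$columns} ];         # hash slice: missing keys give undef
}

my @columns = qw(ts user_id action payload);
my $ordered = row_in_column_order(\@columns,
    { user_id => 7, action => 'click', ts => 1735, ignored => 'x' });

print join(',', map { defined $_ ? $_ : 'undef' } @$ordered), "\n";
```

The slice yields undef for payload, which is absent from the hash, and never looks at the ignored key, matching the two rules stated above.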
$streamer->columns_from_table($cb) looks up the target table's
column list via "for_table" and stores it as the streamer's named-row
columns, so callers can construct a streamer without knowing the schema
in advance. The callback fires once the lookup completes (undef on
success, an error string on failure).
my $s = $ch->insert_streamer('events');
$s->columns_from_table(sub {
my ($err) = @_;
die "describe: $err" if $err;
$s->push_row({ ts => time, user_id => 7, action => 'click' });
...
});
insert_iter
$ch->insert_iter('events', sub {
# producer: return next row (arrayref or hashref) or undef when done
return undef unless my $row = next_event();
return $row;
}, sub {
my (undef, $err) = @_;
die "ingest: $err" if $err;
}, batch_size => 5_000, columns => [qw(ts user_id action payload)]);
Generator-driven insert. Internally wraps "insert_streamer" with an
EV::idle pump that calls $producer repeatedly, respecting
high_water for backpressure. undef from the producer signals
end-of-stream and triggers finish. %opts is forwarded to
"insert_streamer".
kill_query
$ch->kill_query($query_id, sub {
my ($rows, $err) = @_;
warn "kill: $err" if $err;
});
Shortcut for kill query where query_id = '...' sync. Validates
$query_id against [A-Za-z0-9_-]+ before interpolating into the
SQL. Pass async => 1 to send ASYNC instead of SYNC when
fire-and-forget semantics are wanted.
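As an illustration of the validate-then-interpolate step, here is a small sketch in plain Perl. build_kill_sql is a hypothetical name, and anchoring the pattern with \A and \z is an assumption of this sketch; the module's own check may be written differently.

```perl
use strict;
use warnings;

# Hypothetical helper mirroring the validation described above; the real
# method lives inside EV::ClickHouse and may differ in detail.
sub build_kill_sql {
    my ($query_id, %opt) = @_;
    # Assumption: the pattern is anchored, so quotes, whitespace, and a
    # trailing newline are all rejected before any interpolation happens.
    die "invalid query_id\n"
        unless defined $query_id && $query_id =~ /\A[A-Za-z0-9_-]+\z/;
    my $mode = $opt{async} ? 'async' : 'sync';
    return "kill query where query_id = '$query_id' $mode";
}

print build_kill_sql('etl-2024_01'), "\n";
print build_kill_sql('etl-2024_01', async => 1), "\n";
print eval { build_kill_sql("x' or 1=1 --"); 1 } ? "accepted\n" : "rejected\n";
```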
cancel_by_query_id
$ch->cancel_by_query_id($qid);
Race-safe local cancel: only triggers "cancel" if the connection's
current in-flight query (last_query_id) matches $qid, so the
caller can't accidentally kill a different query that has already
started in the meantime. Returns 1 if it cancelled, 0 if the id no
longer matched.
retry
$ch->retry($sql,
retries => 3,
backoff => 0.5, # initial delay; doubles each attempt
jitter => 0.25, # add 0..25% random jitter to each wait
cb => sub { my ($rows, $err) = @_; ... },
);
Retry a select (or any read-only statement) on the same connection
with exponential backoff, but only when the server-side error is in
the "is_retryable_error" set (timeouts, memory limits, replica catch-up,
etc.). The callback fires exactly once with the final result - either
the first success or the error from the last attempt. Per-attempt
settings => \%h are honoured. jitter => $fraction adds
up to $fraction * current_wait randomness on top of each
exponential step, so the spread scales with the backoff window.
Not for insert - a partial-write that the server logged but didn't
acknowledge will be re-applied; use idempotent => 1 on the
underlying "insert" instead.
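To make the backoff arithmetic concrete, the following sketch computes the wait before each attempt. retry_delay is a hypothetical helper; it assumes the first wait equals backoff and that the random factor is drawn from [0,1), which matches the description above but is not taken from the module source. The random value is passed in so the numbers are reproducible.

```perl
use strict;
use warnings;

# Hypothetical helper: delay before retry number $attempt (0-based).
# $rand stands in for a random draw in [0,1).
sub retry_delay {
    my ($attempt, $backoff, $jitter, $rand) = @_;
    my $wait = $backoff * 2 ** $attempt;       # exponential step
    return $wait + $rand * $jitter * $wait;    # jitter scales with the step
}

# Show the lower and upper bound of each wait for backoff 0.5, jitter 0.25.
for my $attempt (0 .. 2) {
    printf "attempt %d: %.3fs .. %.3fs\n", $attempt,
        retry_delay($attempt, 0.5, 0.25, 0),
        retry_delay($attempt, 0.5, 0.25, 1);
}
```

In real use the last argument would be rand(); the spread then widens with each step instead of staying a fixed number of milliseconds.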
insert_async
$ch->insert_async('events', \@rows, sub { ... });
$ch->insert_async('events', \@rows, sub { ... }, wait_for_flush => 0);
Ergonomic wrapper around server-side
async_insert => 1. Defaults to wait_for_flush => 1
(the callback fires after the batch has flushed); pass 0 for
fire-and-forget. Pass additional settings via settings => \%h.
insert_aggregated
$ch->insert_aggregated(
'metrics_agg',
agg_col => { func => 'uniqExact', args => ['UInt64'] },
key_cols => [qw(ts site)],
rows => [[1700000000, 'a', 42], ...],
cb => sub { my (undef, $err) = @_; ... },
);
Generic client-side state serialization for AggregateFunction
columns isn't feasible (each aggregator has its own binary format),
so this helper builds
insert into t (cols) select k as k, funcState(cast(v as T)) as agg union all ...
- one single-row select per row in rows, wrapping each
per-row value with the ${func}State combinator so the server
constructs the aggregate state. Validates table, column, and
function names; per-row values are quoted via a small SQL-literal
helper (numbers raw, strings escape-quoted, undef becomes NULL).
Only scalar leaf values are supported (no nested arrays).
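The shape of the generated statement is easier to see with a toy builder. The sketch below is an assumption-laden illustration: sql_literal and state_select are hypothetical names, the backslash escaping rule is assumed, and the column names ts, site, and agg are made up for the example. Function and type names are taken as already validated.

```perl
use strict;
use warnings;

# Hypothetical stand-in for the SQL-literal helper described above.
sub sql_literal {
    my ($v) = @_;
    return 'NULL' unless defined $v;
    die "only scalar leaf values are supported\n" if ref $v;
    return $v if $v =~ /\A-?\d+(?:\.\d+)?\z/;    # plain numbers stay raw
    (my $s = $v) =~ s/([\\'])/\\$1/g;             # assumed escaping rule
    return "'$s'";
}

# One single-row select for a (ts, site, value) row; $func and $type are
# assumed to have been validated by the caller.
sub state_select {
    my ($func, $type, $ts, $site, $value) = @_;
    return sprintf 'select %s as ts, %s as site, %sState(cast(%s as %s)) as agg',
        sql_literal($ts), sql_literal($site), $func, sql_literal($value), $type;
}

my @rows = ([1700000000, 'a', 42], [1700000060, "o'brien", 7]);
print join(" union all\n", map { state_select('uniqExact', 'UInt64', @$_) } @rows), "\n";
```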
bind_ident
my $sql = "select * from " . EV::ClickHouse->bind_ident($table);
Backtick-quote an identifier safely for SQL splicing. Accepts simple
or dotted (db.table) identifiers in which each part matches
[A-Za-z_][A-Za-z0-9_]*; anything else croaks. The regex rejects
backticks outright so no escaping is needed inside the result.
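A rough model of the check, for readers who want to see why no escaping is needed. quote_ident is a hypothetical re-implementation; quoting each dotted part separately and allowing at most one dot are assumptions of this sketch, and the real method croaks rather than dies.

```perl
use strict;
use warnings;

# Hypothetical re-implementation for illustration only;
# EV::ClickHouse->bind_ident is the real entry point.
sub quote_ident {
    my ($ident) = @_;
    my $part = qr/[A-Za-z_][A-Za-z0-9_]*/;
    die "invalid identifier\n"
        unless defined $ident && $ident =~ /\A$part(?:\.$part)?\z/;
    # Assumption: each dotted part is quoted on its own.
    return join '.', map { "`$_`" } split /\./, $ident;
}

print quote_ident('events'), "\n";
print quote_ident('analytics.events'), "\n";
print eval { quote_ident('events`; drop table x'); 1 } ? "accepted\n" : "rejected\n";
```

Since the accepted alphabet contains no backtick, quote, or whitespace, the quoted result cannot be broken out of.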
track_query_durations
$ch->track_query_durations(1024);
say $ch->query_duration_p(0.95);
Install a fixed-size ring buffer of recent query durations (sourced
from on_query_complete's $duration_s argument). Composes with a
user-supplied on_query_complete (preserved, called first). Pass
0 to disable and restore the previous handler. Subsequent calls
replace the previous ring size.
query_duration_p($p) returns the $p-quantile in seconds ($p
in [0,1]); query_duration_count returns the number of samples
currently buffered.
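The ring-plus-quantile idea can be modelled in pure Perl as below. DurationRing is a hypothetical class, and the nearest-rank quantile rule is an assumption; the module may interpolate differently.

```perl
use strict;
use warnings;

# Hypothetical pure-Perl model of a fixed-size ring of query durations.
{
    package DurationRing;
    use POSIX qw(ceil);

    sub new {
        my ($class, $size) = @_;
        return bless { size => $size, buf => [], next => 0 }, $class;
    }

    # Store one sample, overwriting the oldest slot once the ring is full.
    sub add {
        my ($self, $seconds) = @_;
        $self->{buf}[ $self->{next} ] = $seconds;
        $self->{next} = ($self->{next} + 1) % $self->{size};
        return;
    }

    sub count { return scalar @{ $_[0]{buf} } }

    # Nearest-rank quantile over the buffered samples (an assumed definition).
    sub quantile {
        my ($self, $p) = @_;
        my @sorted = sort { $a <=> $b } @{ $self->{buf} };
        return undef unless @sorted;
        my $rank = ceil($p * @sorted);
        $rank = 1 if $rank < 1;
        return $sorted[ $rank - 1 ];
    }
}

my $ring = DurationRing->new(3);
$ring->add($_) for 0.5, 0.1, 0.3, 0.2;    # the first sample is overwritten
printf "n=%d p50=%.1f max=%.1f\n",
    $ring->count, $ring->quantile(0.5), $ring->quantile(1);
```

Memory stays bounded by the ring size, at the cost of sorting the buffer on each quantile read.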
pending_queries
for my $q (@{ $ch->pending_queries }) {
printf "%s %s age=%.3fs\n",
$q->{state}, $q->{query_id} // '-', $q->{age};
}
Snapshot of pending queries: returns arrayref of hashrefs. The head
of the in-flight queue (if any) appears first with
state => 'in_flight', query_id => last_query_id, and
age => seconds since dispatch. Queued entries follow with
state => 'queued' and age => 0 (they have no dispatch
time yet). SQL/settings are not retained after enqueue and so are
not included.
dump_state
my $h = $ch->dump_state;
# { connected, connecting, dns_pending, pending_count,
# callback_depth, send_len/pos/cap, recv_len/cap, fd,
# protocol, server_revision, reconnect_attempts,
# host, port, send_count, compress, tls }
Read-only diagnostic snapshot of internal struct state. Intended for debugging stuck connections; field set may shift between releases (don't script against it in production).
for_json_paths
$ch->for_json_paths('events', 'payload', sub {
my ($paths, $err) = @_;
for my $p (@$paths) { say "$p->{path} : $p->{type}" }
});
Discovers the dynamic JSON path layout of a JSON/Object('json')
column. Internally walks the Map(String, String) returned by
JSONAllPathsWithTypes(col) with a single arrayJoin(mapKeys(m))
(the map alias is preserved so each path's type is correlated via a
second lookup), dedupes, sorts by path, and returns
[ { path => 'a.b.c', type => 'Int64' }, ... ]. Useful for
monitoring schema drift on weakly-typed columns.
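For the schema-drift use case, two snapshots of the returned list can be compared with a small helper. This is a sketch under stated assumptions: diff_paths is a hypothetical function, and it relies only on the { path, type } shape shown above.

```perl
use strict;
use warnings;

# Hypothetical helper: compare two path listings shaped like the $paths
# arrayref delivered to the for_json_paths callback.
sub diff_paths {
    my ($old, $new) = @_;
    my %old = map { ($_->{path} => $_->{type}) } @$old;
    my %new = map { ($_->{path} => $_->{type}) } @$new;
    my %diff = (added => [], removed => [], retyped => []);
    for my $path (sort keys %new) {
        if    (!exists $old{$path})        { push @{ $diff{added} },   $path }
        elsif ($old{$path} ne $new{$path}) { push @{ $diff{retyped} }, $path }
    }
    push @{ $diff{removed} }, grep { !exists $new{$_} } sort keys %old;
    return \%diff;
}

my $yesterday = [ { path => 'a.b', type => 'Int64' },   { path => 'user', type => 'String' } ];
my $today     = [ { path => 'a.b', type => 'Float64' }, { path => 'tags', type => 'Array(String)' } ];
my $d = diff_paths($yesterday, $today);
print "$_: @{ $d->{$_} }\n" for qw(added removed retyped);
```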
EV::ClickHouse::Pool
my $pool = EV::ClickHouse::Pool->new(
host => 'ch', port => 9000, protocol => 'native',
size => 8, # other %args pass through to ::new
);
$pool->query($sql, $cb);
$pool->insert($table, $data, $cb);
$pool->drain(sub { ... }); # all connections drained
$pool->finish;
Built-in connection pool. Each member is an independent
EV::ClickHouse with its own auto_reconnect, send queue, and
in-flight callback queue, so a hung query on one connection doesn't
block the others. Dispatch picks the least-busy connection; ties are
broken round-robin.
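One plausible way to model "least busy, ties broken round-robin" is to scan the members starting just after the previous pick and keep the first strictly smaller load. The sketch below is an assumption about the policy, not the Pool's code; pick_member and its arguments are hypothetical.

```perl
use strict;
use warnings;

# Hypothetical model: @$pending holds each member's pending count and
# $$cursor remembers the index of the previous pick (-1 before the first).
sub pick_member {
    my ($pending, $cursor) = @_;
    my $n = @$pending;
    my ($best, $best_load);
    for my $step (1 .. $n) {
        my $idx = ($$cursor + $step) % $n;    # start just after the last pick
        if (!defined $best_load || $pending->[$idx] < $best_load) {
            ($best, $best_load) = ($idx, $pending->[$idx]);
        }
    }
    $$cursor = $best;
    return $best;
}

my $cursor = -1;
my @idle   = (0, 0, 0);
print join(' ', map { pick_member(\@idle, \$cursor) } 1 .. 4), "\n";
```

Starting the scan after the previous pick is what spreads equal-load members evenly instead of always favouring index 0.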
The Pool exposes per-pick dispatch via query, insert, ping,
for_table, iterate, insert_streamer; aggregate stats via
size, pending_count, conns (the underlying connection list);
and broadcast lifecycle methods drain, finish, cancel,
skip_pending, reset (each affects every member because the state
they touch is owned per connection, not per query). The broadcast
cancel, skip_pending, and reset methods wrap each per-member
call in eval so a member that croaks doesn't abort the broadcast;
per-member errors are silently discarded (the surviving members still
receive the call). Iterate conns yourself if you need per-member
error handling.
$pool->with_each(sub { my ($conn, $idx) = @_; ... }) calls
$cb once per member, passing the connection object and its index.
Each per-member call is wrapped in eval so a single croak does not
abort the iteration; per-member errors are silently discarded - wrap
the body yourself if you need them. Useful for one-off per-member
work that doesn't justify a new broadcast method (e.g. resetting a
counter, asking each member for last_error_code, kicking off a
custom probe).
Queries that need server-side state (temporary tables, session variables) must use a single connection, not a Pool, since successive calls may land on different members.
$pool->with_session(sub { my ($conn, $release) = @_; ... })
checks out a least-busy member and "pins" it for the duration of the
callback: while pinned, _pick avoids that member when other
callers request a connection (it remains selectable as a fallback if
every other member is unavailable). The callback must call
$release->() when its multi-query sequence completes - typically
from the innermost query's callback so the pin lasts across the
async chain.
$pool->with_session(sub {
my ($ch, $release) = @_;
$ch->query("create temporary table t (n UInt32)", sub {
$ch->query("insert into t values (1),(2),(3)", sub {
$ch->query("select sum(n) from t", sub {
my ($rows) = @_;
say $rows->[0][0];
$release->();
});
});
});
});
$pool->query_to($idx, $sql, $cb) /
$pool->insert_to($idx, $table, $data, $cb) force-routes a
call to a specific member without going through _pick. Circuit
breaker observation still applies (success/failure is recorded
against that member). Useful for replica-targeted DDL, S3 ingest
that has to land on a chosen node, or sticky-affinity reads.
$pool->nominate($idx) returns the underlying connection so
subsequent calls bypass the pool entirely. Use sparingly - calls
made directly on the nominated connection don't update the
circuit-breaker state.
$pool->hedged_query($sql, hedge => 2, $cb) dispatches
the same select to hedge distinct random members and resolves
with whichever returns first. The callback receives
($rows, undef, $member_idx) on success (so callers can attribute
wins per member) or (undef, $err) if every member fails.
Extra completions after the winner are silently discarded.
Recommended for tail-latency-sensitive selects on replicated tables.
Do not use for insert - would silently double-write when the
server's dedupe window misses.
$pool->fan_out($sql, $cb) sends the same select to every
member and collects per-member results into one arrayref:
$pool->fan_out("select hostName(), uptime()", sub {
for my $r (@{ $_[0] }) {
printf "[%d] err=%s rows=%s\n",
$r->{member}, $r->{err} // '-',
$r->{rows} ? scalar @{$r->{rows}} : '-';
}
});
Useful for shard-aware diagnostics (per-replica lag, distinct
system.* values across the pool). Errors are per-member, not
aggregated - the callback always fires with a complete list. Pass
settings => \%h for per-query options.
Circuit breaker: pass circuit_threshold => N at construction
to enable per-member fail-fast. After N consecutive query/insert/ping
errors on a given member, that member is excluded from _pick for
circuit_cooldown seconds (default 30). A successful callback resets
the per-member fail counter. If every member is dead at pick time the
breaker is bypassed so the next attempt still has a chance to recover.
Inspect with $pool->circuit_state which returns one
{ fails => N, dead_until => $epoch, alive => 0|1 }
hashref per member.
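The breaker rules above can be read as a small state machine. The sketch below models them with plain hashes; new_breaker, record_result, is_alive, and eligible are hypothetical helpers, the clock is passed in explicitly, and whether the real counter is cleared when a member trips is not stated in this document.

```perl
use strict;
use warnings;

# Hypothetical per-member breaker state; field names follow circuit_state.
sub new_breaker { return { fails => 0, dead_until => 0 } }

# Record one callback outcome at time $now.
sub record_result {
    my ($br, $ok, $now, $threshold, $cooldown) = @_;
    if ($ok) { $br->{fails} = 0; return }
    if (++$br->{fails} >= $threshold) {
        $br->{dead_until} = $now + $cooldown;    # exclude for the cooldown
    }
    return;
}

sub is_alive { my ($br, $now) = @_; return $now >= $br->{dead_until} ? 1 : 0 }

# Indices eligible for a pick; if every member is dead the breaker is bypassed.
sub eligible {
    my ($breakers, $now) = @_;
    my @alive = grep { is_alive($breakers->[$_], $now) } 0 .. $#$breakers;
    return @alive ? @alive : (0 .. $#$breakers);
}

my @members = (new_breaker(), new_breaker());
record_result($members[0], 0, 100, 3, 30) for 1 .. 3;    # three failures in a row
print "eligible at t=120: @{[ eligible(\@members, 120) ]}\n";
print "eligible at t=130: @{[ eligible(\@members, 130) ]}\n";
```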
Graceful shutdown: $pool->shutdown($grace_seconds, $cb)
drains every member, then calls finish on each. If $grace_seconds
elapses before every member drains, members still in flight are
force-finished and $cb receives the string
"Pool::shutdown timed out after Ns". On a clean shutdown $cb
receives undef. $grace_seconds may be 0 (or undef) to wait
indefinitely. The callback fires exactly once.
$SIG{TERM} = sub { $pool->shutdown(10, sub { EV::break }) };
LIFECYCLE
finish
$ch->finish;
Close the connection. Pending queries receive an error callback. Aliased
as disconnect.
reset
$ch->reset;
Disconnect and immediately reconnect using the original parameters.
Aliased as reconnect.
drain
$ch->drain(sub { ... });
Register a callback to fire once all pending queries (queued + in-flight) have completed. If nothing is pending, the callback fires synchronously. The classic graceful-shutdown pattern:
$ch->query("select 1", sub { ... });
$ch->query("select 2", sub { ... });
$ch->drain(sub {
$ch->finish;
EV::break;
});
cancel
$ch->cancel;
Cancel the currently in-flight query. Native protocol sends CLIENT_CANCEL
and waits for the server's EndOfStream/Exception; HTTP closes the connection
(use auto_reconnect or call "reset" to recover). The query's callback
receives an error.
skip_pending
$ch->skip_pending;
Drop every pending operation: each queued and in-flight callback is invoked
with (undef, $error_message). If a request was on the wire, the connection
is torn down; call "reset" (or rely on auto_reconnect) before issuing
new queries.
ACCESSORS
All per-query accessors (column_names, column_types, last_query_id,
last_error_code, last_totals, last_extremes, profile_rows,
profile_bytes, profile_rows_before_limit) are reset at the moment a
new query is dispatched (queued or sent), not when its callback fires.
It is always safe to read them inside the query's own callback. Reading
them after dispatching a subsequent query but before its callback fires
returns the initial state (0 or undef), never the previous query's
data. Connection-level accessors (is_connected, server_info,
server_version, server_timezone, pending_count) are unaffected.
-
is_connected
True if the connection is established.
-
current_host
The host the connection is presently pointed at as a string. After a multi-host failover rotation, this reflects the new target rather than the originally-supplied one.
-
current_port
The port the connection is presently pointed at as an integer.
-
server_revision
The native protocol revision the server reports in its ServerHello, as a positive integer (e.g. 54459). 0 before the handshake completes and for HTTP connections (which have no native handshake). Use "server_supports" for named-capability checks; this raw integer is the escape hatch when you need to compare against a specific revision number from the ClickHouse source.
-
pending_count
Number of pending operations (queued + in-flight).
-
server_info
Full server identification string (e.g. "ClickHouse 24.1.0 (revision 54459)"), populated from the native ServerHello. undef for HTTP connections.
-
server_version
Server version (e.g. "24.1.0"). Native only; undef for HTTP.
-
server_timezone
Server timezone (e.g. "UTC", "Europe/Moscow"). Native only; undef for HTTP.
-
column_names
Arrayref of column names from the most recent native query result, or undef if no query has run. Native protocol only - HTTP responses do not carry column metadata.
$ch->query("select 1 as foo, 2 as bar", sub {
my $names = $ch->column_names; # ['foo', 'bar']
});
-
column_types
Arrayref of ClickHouse type strings from the most recent native query (e.g. ['UInt32', 'String', 'Nullable(DateTime)']). Native protocol only - undef on HTTP.
-
last_query_id
query_id of the most recently dispatched query, or undef. Set via { query_id => 'my-id' } in the settings hash of "query"/"insert".
-
last_tls_error
The most recent OpenSSL error string captured during TLS context setup or handshake (e.g. certificate verify failed, key values mismatch), or undef if no TLS error has occurred. Always undef when built without OpenSSL. Useful for surfacing the actual crypto reason to the operator after a connection has failed - the on_error message itself only names the failing call site (e.g. SSL_connect failed).
-
last_error_code
ClickHouse error code (integer) of the most recent server-side exception, or 0 if no error. The top-level code is reported even when the exception is a chain. Useful for distinguishing retryable errors (e.g. 202 = TOO_MANY_SIMULTANEOUS_QUERIES) from permanent ones (60 = UNKNOWN_TABLE, 516 = AUTHENTICATION_FAILED).
-
last_totals
Arrayref of totals rows from the last query that used with totals, or undef. Native only.
-
last_extremes
Arrayref of extremes rows from the last native query, or undef.
-
profile_rows_before_limit
Rows that would have been returned without limit. Useful for pagination UIs. Native only.
-
profile_rows
Total rows processed by the last query. Populated from the native ProfileInfo packet on the native protocol, or from X-ClickHouse-Summary (read_rows) on HTTP.
-
profile_bytes
Total bytes processed by the last query. Populated from the native ProfileInfo packet on the native protocol, or from X-ClickHouse-Summary (read_bytes) on HTTP.
ALIASES
q -> query
ddl -> query
reconnect -> reset
disconnect -> finish
REQUIREMENTS
- Perl 5.14 or newer
- EV 4.11 or newer (event loop)
- zlib (required)
- OpenSSL (optional, for TLS; auto-detected at build time)
- liblz4 (optional, for native protocol compression; auto-detected)
TROUBLESHOOTING
-
AUTHENTICATION_FAILED on the first query
The native handshake authenticates lazily; the first query is what surfaces a bad user/password. Check the server's users.xml and the URI form clickhouse://user:pass@host:port/db.
-
DateTime returns a number, not a string
DateTime/Date decode to raw integers (Unix epoch / days since epoch) by default for stable round-tripping. Pass decode_datetime => 1 to get ISO-formatted strings.
-
ClickHouse error UNKNOWN_DATABASE on connect
The database argument is sent as the default; the server must already have that database. Use database => 'default' while bootstrapping.
-
Insert silently dropped (counts don't match)
Likely insert_deduplication_token dedupe; either you're reusing a token across distinct batches, or the table is ReplicatedMergeTree with the default dedupe window. See eg/idempotent_insert.pl.
-
Hangs on connect when host is a hostname
Without EV::cares, DNS resolution falls back to blocking getaddrinfo. Install EV::cares for non-blocking lookup; otherwise use an IP literal or a local caching resolver (nscd / systemd-resolved).
-
connect_timeout doesn't fire
It does across TCP connect, TLS handshake, and native ServerHello. If the timer doesn't fire, the underlying issue is usually a synchronous DNS stall (see above) which happens before start_connect arms the timer; install EV::cares to push DNS off the loop.
-
Per-query query_timeout is ignored
Set it inside the \%settings hashref, not as a top-level argument: $ch->query($sql, { query_timeout => 5 }, $cb).
-
Which host am I currently pointed at after failover?
$ch->current_host and $ch->current_port reflect the live target after a multi-host rotation. Use on_failover => sub { ... } to get notified at the moment of each rotation.
-
How do I retry only on transient errors?
EV::ClickHouse->is_retryable_error($code) returns true for the common transient codes (timeouts, network errors, replica catch-up, keeper exceptions, ...). Inspect $ch->last_error_code from inside your query callback and schedule a retry only when the predicate fires - permanent errors (auth failures, missing tables) won't qualify.
Sample skeleton:
$ch->query($sql, sub {
my ($r, $err) = @_;
if ($err && EV::ClickHouse->is_retryable_error($ch->last_error_code)) {
schedule_retry($sql);
}
elsif ($err) { warn "permanent: $err" }
});
-
Idempotent insert silently drops some rows
idempotent => 1 auto-mints insert_deduplication_token; if your producer issues the SAME logical batch twice (e.g. retry after a transient network blip) only the first write lands, by design. To force two distinct logical batches through, either pass an explicit idempotent => $token per batch or omit the option for fresh inserts. See eg/idempotent_insert.pl.
-
on_data vs iterate - which should I pick?
on_data => sub { } in the per-query settings is the lowest-overhead streaming path: each native data block is delivered as soon as the parser has it, no per-row allocation overhead beyond the batch arrayref. iterate is a synchronous-feeling pull wrapper around the same machinery - useful when the surrounding code is procedural (ETL scripts, exporters) and a callback shape doesn't fit. Both are native-only.
-
Connection in front of nginx / reverse proxy strips X-ClickHouse-* headers
Pass http_basic_auth => 1 to send the credentials as Authorization: Basic ... instead. Most HTTP gateways forward Authorization verbatim while filtering proprietary headers.
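For the DateTime entry above: when the raw integers are kept (the default), they can also be formatted on the client. The helpers below are a minimal sketch with hypothetical names; they assume UTC output and ignore the column's timezone, so decode_datetime => 1 remains the better choice when the server timezone matters.

```perl
use strict;
use warnings;
use POSIX qw(strftime);

# Hypothetical client-side formatters for the default raw integers.
# Assumption: values are rendered in UTC; the server timezone is ignored.
sub format_datetime {
    my ($epoch) = @_;                 # DateTime: Unix seconds
    return strftime('%Y-%m-%d %H:%M:%S', gmtime($epoch));
}

sub format_date {
    my ($days) = @_;                  # Date: days since 1970-01-01
    return strftime('%Y-%m-%d', gmtime($days * 86_400));
}

print format_datetime(1_704_067_200), "\n";
print format_date(19_723), "\n";
```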
TUNING
-
Native vs HTTP
Native (port 9000) is typically 2-5x faster for insert and select-of-many-rows because rows ship as binary columns instead of TSV text. Use HTTP only when the network path requires HTTPS-only or when you need raw => 1 CSV / JSONEachRow / Parquet bodies.
-
compress => 1
Enables LZ4 (native) or gzip (HTTP). LZ4 cost is small and saves ~50-70% on text-heavy columns. Gzip is heavier; turn on only if you're bandwidth-bound.
-
insert_streamer batch_size
Default 10_000 is a good baseline. Smaller (1k-2k) reduces memory pressure on the producer; larger (50k-100k) reduces server-side merge cost on MergeTree. Match to your row width: ~1 MB per batch is a sweet spot.
-
keepalive
Enable on long-lived idle connections (HTTP behind a load balancer or NAT, or a native connection that may sit minutes between queries). 15-30s is typical.
-
reconnect_max_attempts
Always set in production. Default is unlimited; a permanent failure (wrong host, wrong port, dead server) will spin on_error forever otherwise.
-
progress_period
Coalesce on_progress packets to one fire per N seconds. Big SELECTs can emit hundreds per second; throttle to 1-5s for monitoring dashboards.
-
Pull-iterator vs on_data
on_data has lower per-block overhead. iterate trades that for a synchronous-feeling API; use it when the surrounding code is procedural.
-
EV::ClickHouse::Pool
A Pool fans concurrent queries across N independent connections, so a slow query on one doesn't head-of-line-block the others. Use it for read-mostly fan-out; do not use it for queries that depend on session-level state (temporary tables, set) since each query may land on a different connection.
Performance tuning checklist
-
- Pick the right protocol
Native (port 9000) beats HTTP (port 8123) for almost all workloads. HTTP is only required for HTTPS-fronted ingress, the raw mode that returns RowBinary / JSONEachRow / Parquet bodies unparsed, or gateway authentication that strips proprietary CH headers (see http_basic_auth).
-
- Tune batch_size for INSERTs
Aim for ~1 MB per batch. ClickHouse merges every block into a part on disk, so 1k blocks of 1k rows each is dramatically slower than 1 block of 1M rows because of merge amplification. insert_streamer with batch_size => $rows_for_1MB + high_water backpressure is the production-grade default.
-
- Cap max_recv_buffer
Without a cap, a runaway select (or a buggy upstream that returns gigabytes) will grow the recv buffer until the process is OOM-killed. Set max_recv_buffer => 64 * 1024 * 1024 (64 MB) and let the parser tear the connection down with a clean error if exceeded - the caller's on_error can decide whether to retry or surface to the user.
-
- Watch for head-of-line blocking
A single EV::ClickHouse serialises queries. Use EV::ClickHouse::Pool when concurrent queries should run in parallel (read-mostly workloads, dashboard fan-out). For latency-sensitive SELECTs against replicated tables, $pool->hedged_query sends the request to N members and resolves with the first reply, shaving tail latency at the cost of extra server work.
-
- Measure latencies cheaply
$ch->track_query_durations(1024) installs a fixed-size ring buffer of recent query durations; subsequent $ch->query_duration_p(0.95) reports the p95. Useful for in-process histograms when you don't want to wire up a metrics backend just for one connection. Composes with a user-supplied on_query_complete (which is preserved and called first).
-
- Use server_supports for capability gating
Don't hard-code server_revision >= 54420; ask $ch->server_supports('progress_writes'). The capability table maps human-readable feature names to revisions and is updated when the client revision changes, so callers don't have to track protocol numbers.
-
- Inspect via dump_state and pending_queries
When a connection seems stuck, $ch->dump_state returns a hashref snapshot (fd state, send/recv buffer pos, callback depth, pending_count, ...) and $ch->pending_queries lists the in-flight + queued entries with their query_ids and age. Both are read-only debug accessors - safe to call from a signal-handler-style dump path.
-
- Don't fight the freelist
The XS layer keeps freelists for both cb_queue and send_queue entries, so allocating callbacks is essentially free after warm-up. The implication: avoid wrapping the connection in heavy wrappers that clone the connection per call - there is no per-call setup cost worth amortising away.
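For the batch_size item in the checklist above, a rough way to arrive at $rows_for_1MB is to measure a sample of rows. The sketch below uses a hypothetical rows_per_batch helper and assumes the tab-joined text length is a usable proxy for row width; binary native encoding will differ, so treat the result as a starting point.

```perl
use strict;
use warnings;

# Hypothetical helper: estimate rows per batch from a sample of rows.
# Assumption: tab-joined text length (plus newline) approximates row width.
sub rows_per_batch {
    my ($sample, $target_bytes) = @_;
    die "sample must not be empty\n" unless @$sample;
    my $bytes = 0;
    for my $row (@$sample) {
        $bytes += 1 + length(join("\t", map { $_ // '' } @$row));
    }
    my $avg  = $bytes / @$sample;
    my $rows = int($target_bytes / $avg);
    return $rows < 1 ? 1 : $rows;
}

my @sample = ([1700000000, 7, 'click', '/landing'], [1700000001, 8, 'view', '/pricing']);
print rows_per_batch(\@sample, 1_000_000), "\n";
```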
ARCHITECTURE
The client is a single state machine driven by an EV event loop. Each connection holds: a TCP fd (non-blocking), a send buffer, a receive buffer, a callback queue (next-in-line per protocol), and a pending send queue (buffered before connect).
State transitions:
Connect TCP --> [TLS handshake] --> [Native ServerHello]
--> Connected --> { dispatch from send_queue;
parse response; deliver via cb_queue }
The connect_timeout timer covers all three pre-Connected stages.
auto_reconnect re-runs the chain via schedule_reconnect.
Two key invariants:
- Native protocol is strictly request/response. Only one query is in-flight per connection at a time. insert_streamer serialises batches against this constraint.
- callback_depth guards against self being freed mid-callback. Every callback dispatch increments it; check_destroyed defers the final Safefree until depth returns to zero.
For deeper detail (state-machine table, queue semantics) see CLAUDE.md
in the source distribution.
TYPES
Per-column wire format and Perl-side gotchas. All numeric types
round-trip stable raw values by default; opt into string forms via
decode_datetime, decode_decimal, decode_enum.
-
Integers
Int8..Int64 / UInt8..UInt64: native Perl IV/UV. Int128/UInt128/Int256/UInt256 return decimal string representations on platforms with __int128 (Int128/UInt128) or always for the 256-bit forms.
-
Floats
Float32/Float64 round-trip exactly within IEEE-754 limits. NaN/+Inf/-Inf are preserved.
-
BFloat16
Top 16 bits of a Float32. Encoded by truncation; decoded by zero-extension. Suitable for ML feature columns; not for accounting.
-
Decimal32/64/128
Decoded as IV (raw integer) or NV (scaled to N decimal digits if decode_decimal => 1). Decimal128 over very long precision may lose trailing digits in the NV form; pass decode_decimal => 0 and divide yourself with Math::BigInt for exact arithmetic.
-
Decimal256
Returns raw 32 LE bytes. Decode with Math::BigInt (see eg/decimal_bigmath.pl).
-
Date / Date32 / DateTime / DateTime64
Default: integer (days since epoch / Unix seconds). With decode_datetime: YYYY-MM-DD or YYYY-MM-DD HH:MM:SS or YYYY-MM-DD HH:MM:SS.ffffff. DateTime carries a timezone string; the formatted output uses it.
-
Bool
Decoded as 0/1. Encoded from any truthy/falsy SV. ClickHouse stores internally as UInt8 0/1.
-
String / FixedString
Bytes-in, bytes-out. No UTF-8 transformation.
-
UUID
Canonical hex form xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx. Encode accepts the same.
-
IPv4 / IPv6
Dotted-quad / canonical IPv6 strings.
-
Enum8 / Enum16
Default: integer code. With decode_enum => 1: label string.
-
Nullable(T)
undef in Perl maps to null; otherwise the inner type's encoding.
-
Array(T)
Perl arrayref of inner-type values.
-
Tuple(T1, T2, ...)
Perl arrayref ordered as the type declaration. Named tuples (Tuple(a Int32, b String)) are still arrayref-positional; parse the name from column_types if you need it.
-
Map(K, V)
Perl hashref. Keys are stringified.
-
LowCardinality(T)
Transparent: encodes/decodes as the inner type. Cross-block dictionaries are managed internally.
-
SimpleAggregateFunction / AggregateFunction
Decoded as the inner declared type (correct for sum/min/max/avg-ish functions). For complex states (quantile, uniqExact, ...) wrap the select with finalizeAggregation(col) server-side.
-
Geo (Point/Ring/LineString/MultiLineString/Polygon/MultiPolygon)
Decoded as the underlying nested arrayref/tuple shape.
-
JSON / Object('json')
Decoded as a Perl hashref with dotted-path leaves auto-unflattened to nested hashes. Encode accepts arbitrarily-nested hashrefs; supported leaf kinds are Int64, Float64, Bool (recognised JSON::PP::Boolean classes or SvIsBOOL), String, and Array(<those>).
-
Variant / Dynamic
Recognised by the type parser, but the wire format is per-server-version and not implemented here. Selecting a Variant(...) or Dynamic column raises a clean decode error and tears the connection down (this is safer than guessing the framing and corrupting every subsequent column). Wrap with toString(col) or CAST(col AS String) server-side to read the value as its JSON representation.
-
Interval (Second/Minute/Hour/Day/Week/Month/Quarter/Year)
Decoded as Int64 (the unit count). The unit is implicit from the column type.
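For the Decimal256 entry above, turning the raw 32 little-endian bytes into an exact decimal string might look like the following. This is a sketch, not the contents of eg/decimal_bigmath.pl: decimal256_to_string is a hypothetical name, the scale is supplied by the caller, and a two's-complement layout is assumed.

```perl
use strict;
use warnings;
use Math::BigInt;

# Hypothetical decoder for the raw 32-byte little-endian value.
# Assumption: the integer is stored in two's complement.
sub decimal256_to_string {
    my ($bytes, $scale) = @_;
    die "expected 32 bytes\n" unless length($bytes) == 32;
    # Reverse to big-endian, then parse the hex digits as an unsigned integer.
    my $n = Math::BigInt->new('0x' . unpack('H*', scalar reverse $bytes));
    # A set top bit means the value is negative: subtract 2**256.
    $n -= Math::BigInt->new(2) ** 256 if ord(substr($bytes, 31, 1)) & 0x80;
    my $sign   = $n->is_neg ? '-' : '';
    my $digits = $n->copy->babs->bstr;
    return "$sign$digits" unless $scale;
    # Left-pad so at least one digit remains before the decimal point.
    $digits = ('0' x ($scale + 1 - length($digits))) . $digits
        if length($digits) <= $scale;
    substr($digits, -$scale, 0, '.');
    return "$sign$digits";
}

my $raw = "\x39\x30" . ("\0" x 30);    # 12345 stored little-endian
print decimal256_to_string($raw, 2), "\n";
```

Working on the digit string avoids any floating-point step, so no precision is lost regardless of scale.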
COOKBOOK
The eg/ directory in the source distribution carries runnable
patterns for the common production shapes. Each one is self-contained
and reads top-to-bottom.
-
eg/etl_pipeline.pl
Producer + Pool + "insert_streamer" with high_water backpressure and idempotent tokens. The reliable-ingest baseline.
-
eg/health_probe.pl
Periodic "is_healthy" probe with bounded timeout, transition logging, and automatic "reset" on failure. Drop-in for self-monitoring.
-
eg/circuit_breaker.pl
Pool with circuit_threshold + circuit_cooldown shielding the rotation from a sticky bad member. Demonstrates circuit_state introspection.
-
eg/csv_export.pl
Streams a multi-million-row select to a CSV file via the per-block on_data hook (no full-result buffering). Mirrors the equivalent "iterate" form in a comment.
-
eg/migration_runner.pl
Apply numbered SQL migration files in order, recording successes in a _migrations table and using idempotent on the registry insert so a partial apply doesn't leave the registry out of sync.
-
eg/failover.pl + eg/pool.pl
Multi-host failover and built-in connection pool - the reliability primitives the cookbook recipes layer on top of.
-
eg/async_dns.pl
Constructor returns immediately even for hostnames; queries queue behind EV::cares resolution.
-
eg/idempotent_insert.pl
Auto-minted insert deduplication tokens that survive a reconnect-driven retry without double-writing.
-
eg/external_tables.pl
Ships a client-side data block with a query as an external table - an IN filter against an id set, and a JOIN against a client lookup table - in a single round trip.
EV::ClickHouse::Error
Lightweight error class. Callbacks always receive ($result, $err_msg),
with the error as a plain string (preserved for compatibility); this object is opt-in via
EV::ClickHouse::Error->from_ch($ch, $err) when callers want
structured access to the code, symbolic name, and retryability of the
last server-side exception.
my $e = EV::ClickHouse::Error->from_ch($ch, $err) or return;
if ($e->is_retryable) { schedule_retry() }
elsif ($e->code == 60) { # UNKNOWN_TABLE
warn "table missing: $e"; # stringifies to msg
}
-
new(message => $msg, code => $code)
Plain constructor; rarely used directly.
-
from_ch($ch, $err)
Build from the ($ch, $err) pair available in callbacks. code comes from $ch->last_error_code. Returns undef if $err is empty (so the idiom my $e = ...->from_ch(...) or return works).
-
message / code / name / is_retryable
Field accessors. name looks up the symbolic name for the code (e.g. UNKNOWN_TABLE for 60); returns undef for codes not in the table. is_retryable consults the same list "is_retryable_error" uses.
-
EV::ClickHouse::Error->code_name($code)
Class-method lookup of the symbolic name for a numeric code.
-
EV::ClickHouse::Error->known_codes
Sorted list of all numeric codes the symbolic-name table covers. The table is informational only - codes outside it are still valid ClickHouse errors, just unnamed by this module.
The object overloads stringification to return the message, so legacy
callsites that string-compare or interpolate $err keep working
verbatim when the error is wrapped.
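The stringification behaviour relies on Perl's standard overload pragma. The sketch below shows the mechanism with a stand-in class; Sketch::Error is hypothetical and is not EV::ClickHouse::Error itself, and the message text is made up for the example.

```perl
use strict;
use warnings;

# Hypothetical stand-in class, not EV::ClickHouse::Error itself.
{
    package Sketch::Error;
    # '""' handles interpolation; fallback => 1 lets eq and =~ reuse it.
    use overload '""' => sub { $_[0]{message} }, fallback => 1;

    sub new  { my ($class, %f) = @_; return bless {%f}, $class }
    sub code { return $_[0]{code} }
}

my $err = Sketch::Error->new(message => 'Table default.t does not exist', code => 60);
print "query failed: $err\n";                           # interpolates the message
print "missing table\n" if $err =~ /does not exist/;    # legacy string match
print "code ", $err->code, "\n";                        # structured access
```

With fallback enabled, code written against plain error strings does not need to know whether it received a string or an object.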
SEE ALSO
- EV - the underlying event loop.
- EV::cares - optional async DNS resolver picked up automatically when installed.
- https://clickhouse.com/docs/en/interfaces/tcp - native binary protocol reference.
- https://clickhouse.com/docs/en/interfaces/http - HTTP interface.
- https://clickhouse.com/docs/en/operations/server-configuration-parameters/settings - server-side settings forwarded via the settings hash.
AUTHOR
vividsnow
LICENSE
This library is free software; you can redistribute it and/or modify it under the same terms as Perl itself.