NAME

EV::ClickHouse - Async ClickHouse client using EV

SYNOPSIS

use EV;
use EV::ClickHouse;

# Discrete parameters
my $ch = EV::ClickHouse->new(
    host       => '127.0.0.1',
    port       => 9000,
    protocol   => 'native',     # or 'http'
    user       => 'default',
    password   => '',
    database   => 'default',
    settings   => { max_threads => 4 },  # connection-level defaults
    on_connect => sub { print "connected\n" },
    on_error   => sub { warn "error: $_[0]\n" },
);

# Or via URI: clickhouse[+native]://user:pass@host:port/db?key=val
my $ch = EV::ClickHouse->new(
    uri        => 'clickhouse+native://default:@127.0.0.1:9000/default',
    on_connect => sub { ... },
);

# SELECT
$ch->query("SELECT number FROM system.numbers LIMIT 3", sub {
    my ($rows, $err) = @_;
    die $err if $err;
    print "row: @$_\n" for @$rows;     # row: 0 / row: 1 / row: 2
});

# Per-query settings + parameterized values (no string interpolation)
$ch->query(
    "SELECT {x:UInt32} + {y:UInt32} AS sum",
    { params => { x => 40, y => 2 }, max_execution_time => 30 },
    sub { my ($rows, $err) = @_; print $rows->[0][0], "\n" },  # 42
);

# INSERT - arrayref of rows (no TSV escaping needed)
$ch->insert("my_table", [
    [1, "hello\tworld"],   # embedded tab is fine
    [2, undef],            # NULL
    [3, [10, 20]],         # Array column
], sub { my (undef, $err) = @_; warn "insert: $err" if $err });

# INSERT - pre-formatted TSV string
$ch->insert("my_table", "1\tfoo\n2\tbar\n", sub { ... });

# Raw HTTP response body (HTTP only)
$ch->query("SELECT * FROM t FORMAT CSV", { raw => 1 }, sub {
    my ($body, $err) = @_;
    print $body;
});

EV::run;

DESCRIPTION

EV::ClickHouse is an asynchronous ClickHouse client that integrates with the EV event loop. It speaks both the ClickHouse HTTP protocol (port 8123) and the native TCP protocol (port 9000) directly in XS, with no external ClickHouse client library linked. zlib is required; OpenSSL (for TLS) and liblz4 (for native compression) are optional and detected at build time.

Features

  • HTTP and native TCP protocols, with the same Perl API

  • gzip compression (HTTP) and LZ4 compression with CityHash checksums (native)

  • TLS/SSL via OpenSSL, with optional tls_skip_verify for self-signed certs and tls_ca_file for additional roots

  • Connection URIs (clickhouse[+native]://user:pass@host:port/db), including bracketed IPv6 literals

  • Per-query and connection-level ClickHouse settings; parameterized queries via params

  • Auto-reconnect with exponential backoff; queued (unsent) queries are preserved across reconnects

  • Keepalive pings for idle native connections; graceful drain; query cancellation and skip_pending

  • Streaming results via on_data per-block callback (native); on_progress for native progress packets

  • Raw HTTP response mode for CSV / JSONEachRow / Parquet / etc.

  • 30+ ClickHouse types including Decimal128, UUID, IPv4/IPv6, Nullable, Array, Tuple, Map, LowCardinality (with cross-block dictionaries), SimpleAggregateFunction, Nested

  • Opt-in decode of Date/DateTime, Decimal, and Enum columns; named-rows (hashref) mode

CONSTRUCTOR

new

my $ch = EV::ClickHouse->new(%args);

The connection is initiated immediately; new returns before it completes. Queries issued before on_connect fires are queued and dispatched once the connection is ready.

Connection parameters:

uri => $uri_string

Single-string connection target: clickhouse[+native]://user:pass@host:port/database?key=value.

The +native suffix selects the native protocol; otherwise HTTP is used. Hostnames, IPv4 addresses, and bracketed IPv6 literals are all accepted (e.g. clickhouse+native://[::1]:9000/db). Query-string values are merged into the constructor arguments. Discrete host, port, etc. arguments override the URI.
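To make the anatomy of the URI concrete, here is a rough sketch of how such a string could be split into constructor-style arguments. parse_clickhouse_uri is a hypothetical helper written for this illustration; it is not the module's parser, and details such as percent-decoding or which query-string keys are accepted are not covered. The port defaults (9000 native, 8123 HTTP) are taken from the port option.

```perl
use strict;
use warnings;

# Hypothetical parser, for illustration only; the module's own URI
# handling may differ (for example in percent-decoding).
sub parse_clickhouse_uri {
    my ($uri) = @_;
    my ($native, $user, $pass, $host, $port, $db, $query) = $uri =~ m{
        \A clickhouse (\+native)? ://
        (?: ([^:\@/]*) (?: : ([^\@/]*) )? \@ )?   # optional user and password
        ( \[ [0-9A-Fa-f:.]+ \] | [^:/?\[\]]+ )    # bracketed IPv6 or host name
        (?: : (\d+) )?                            # optional port
        (?: / ([^?]*) )?                          # optional database
        (?: \? (.*) )?                            # optional key=value pairs
        \z
    }x or return;

    $host =~ s/\A\[|\]\z//g;                      # drop the IPv6 brackets
    my %args = (
        protocol => $native ? 'native' : 'http',
        host     => $host,
        port     => $port // ($native ? 9000 : 8123),
    );
    $args{user}     = $user if defined $user;
    $args{password} = $pass if defined $pass;
    $args{database} = $db   if defined $db && length $db;

    # Query-string pairs are merged in as further arguments.
    for my $pair (split /&/, $query // '') {
        my ($key, $value) = split /=/, $pair, 2;
        $args{$key} = $value // '';
    }
    return \%args;
}

my $args = parse_clickhouse_uri('clickhouse+native://default:@127.0.0.1:9000/default');
print "$args->{protocol} $args->{host}:$args->{port}/$args->{database}\n";
```

The helper returns undef for strings that do not look like a ClickHouse URI, so a caller can fall back to discrete arguments.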

host => $hostname

Server hostname. Default: 127.0.0.1.

Note: DNS resolution is currently blocking. For fully asynchronous behaviour, use an IP literal or a local caching resolver.

port => $port

Server port. Default: 8123 (HTTP), 9000 (native).

protocol => 'http' | 'native'

Protocol to use. Default: http.

user => $username

Username. Default: default.

password => $password

Password. Default: empty.

database => $dbname

Default database. Default: default. The shorter alias db is also accepted.

tls => 0 | 1

Enable TLS. Default: 0. Requires the module to be built with OpenSSL (otherwise the constructor croaks).

tls_ca_file => $path

Additional CA certificate file for TLS verification, used alongside the system trust store.

tls_skip_verify => 0 | 1

Skip TLS certificate verification. Default: 0. Useful in development with self-signed certs; do not use in production.

loop => $ev_loop

EV event loop object. Default: EV::default_loop.

Callbacks:

on_connect => sub { }

Called once the connection is fully established (after the native ServerHello, or after the TCP/TLS handshake for HTTP).

on_error => sub { my ($message) = @_ }

Called on connection-level errors (DNS failure, socket error, TLS failure, read/write errors, etc.). Default: sub { die @_ }. Per-query errors are delivered to the query's own callback as the second argument; they do not invoke on_error.

When a connection drops mid-flight, on_error fires first with the underlying cause, and on_disconnect fires immediately after as the state machine tears the socket down. If auto_reconnect is set, the reconnect attempt happens after on_disconnect returns.

on_progress => sub { my ($rows, $bytes, $total_rows, $written_rows, $written_bytes) = @_ }

Called on native protocol progress packets. Not fired for HTTP.

on_disconnect => sub { }

Called when the connection is closed (by finish, server disconnect, or error). Fires after internal state has been reset, so it is safe to queue new queries or call reset from inside the handler.

on_trace => sub { my ($message) = @_ }

Debug trace callback. Called with internal state-machine messages (connect, dispatch, disconnect). Useful for diagnosing protocol issues.

Options:

compress => 0 | 1

Enable compression: gzip on HTTP (request and response), LZ4 with CityHash checksums on the native protocol. Default: 0. Native compression requires liblz4 at build time.

session_id => $id

HTTP session id for stateful operations (temporary tables, SET, etc.). Native protocol has stateful sessions intrinsically; this option is HTTP-only.

connect_timeout => $seconds

TCP/TLS connection timeout. 0 (default) means no timeout. Floating point allowed.

query_timeout => $seconds

Default per-query timeout applied to every query and insert. The query callback receives a timeout error if exceeded. Override per-call via the query_timeout key in the settings hashref.

auto_reconnect => 0 | 1

Reconnect automatically on connection loss. Default: 0. When enabled, queued (unsent) queries are preserved across reconnects; in-flight queries receive an error.

settings => \%hash

ClickHouse settings applied to every query and insert. Per-call settings (see "query", "insert") override these.

settings => { async_insert => 1, max_threads => 4 }

keepalive => $seconds

Send a native protocol PING every N seconds while the connection is idle. Default: 0 (disabled). Native protocol only.

reconnect_delay => $seconds

Initial delay for the auto_reconnect exponential backoff. Each failed attempt doubles the delay, capped at reconnect_max_delay. Default: 0 (immediate retry, no backoff).

reconnect_max_delay => $seconds

Backoff ceiling. Default: 0, meaning no explicit cap; the implementation still bounds the backoff exponent at 20 doublings, so with reconnect_delay = 0.5 the worst case is 0.5 * 2**20 = 524288 seconds, roughly 6 days. Setting an explicit ceiling is recommended in production.
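The backoff arithmetic can be modelled in a few lines. backoff_delay is a hypothetical helper, not part of EV::ClickHouse, and it assumes the first retry waits exactly reconnect_delay seconds and each later one doubles it; the module's internal schedule may differ in detail.

```perl
use strict;
use warnings;

# Hypothetical model of the documented backoff: double per failed attempt,
# at most 20 doublings, with an optional ceiling (0 means no explicit cap).
sub backoff_delay {
    my ($initial, $max, $attempt) = @_;   # $attempt counts failures from 0
    my $doublings = $attempt > 20 ? 20 : $attempt;
    my $delay     = $initial * 2 ** $doublings;
    $delay = $max if $max > 0 && $delay > $max;
    return $delay;
}

# reconnect_delay = 0.5 with no ceiling: bounded by 0.5 * 2**20 seconds.
printf "uncapped worst case: %.2f days\n", backoff_delay(0.5, 0, 25) / 86_400;

# The same schedule with reconnect_max_delay = 30.
print join(', ', map { backoff_delay(0.5, 30, $_) } 0 .. 8), "\n";
# 0.5, 1, 2, 4, 8, 16, 30, 30, 30
```

With a 30 second ceiling the delay stops growing after the seventh attempt, which is usually what a long-running service wants.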

Decode options (native protocol only):

These shape how column values and rows are returned. All are opt-in and default to 0; with the defaults, values come back in raw numeric form and rows as arrayrefs, for stable round-tripping.

decode_datetime => 0 | 1

Return Date, Date32, DateTime, and DateTime64 as formatted strings (e.g. "2024-01-15", "2024-01-15 10:30:00") instead of raw integers. Uses UTC; columns with an explicit timezone (DateTime('America/New_York')) are converted to that zone.
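If decode_datetime is left off, the raw integers can be formatted by hand. The sketch below assumes raw Date values are days since 1970-01-01 and raw DateTime values are Unix timestamps, and renders both in UTC. format_date and format_datetime are hypothetical helper names; DateTime64 and per-column timezones are not handled here.

```perl
use strict;
use warnings;
use POSIX qw(strftime);

# Hypothetical helpers: Date as days since the epoch, DateTime as a Unix
# timestamp, both formatted in UTC.
sub format_date     { strftime('%Y-%m-%d',          gmtime($_[0] * 86_400)) }
sub format_datetime { strftime('%Y-%m-%d %H:%M:%S', gmtime($_[0])) }

print format_date(19_737), "\n";              # 2024-01-15
print format_datetime(1_705_314_600), "\n";   # 2024-01-15 10:30:00
```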

decode_decimal => 0 | 1

Return Decimal32/Decimal64/Decimal128 as scaled floating-point numbers instead of unscaled integers. Note: a double carries only about 15-17 significant decimal digits, so high-precision values lose their low-order digits; leave this disabled if you need exact arithmetic.
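When exact values matter, one option is to keep decode_decimal off and place the decimal point yourself with string operations. decimal_string is a hypothetical helper; it assumes the unscaled value stringifies to plain digits (how Decimal128 values are represented by the module is not stated here) and that you already know the column's scale, for instance from the column type.

```perl
use strict;
use warnings;

# Hypothetical helper: insert the decimal point into an unscaled integer
# (number or digit string) without ever converting it to a double.
sub decimal_string {
    my ($unscaled, $scale) = @_;
    my ($sign, $digits) = "$unscaled" =~ /\A(-?)(\d+)\z/
        or die "not a plain integer: $unscaled";
    return "$sign$digits" if $scale == 0;

    # Left-pad with zeros so at least one digit stays before the point.
    $digits = ('0' x ($scale + 1 - length $digits)) . $digits
        if length($digits) <= $scale;
    substr($digits, -$scale, 0, '.');
    return "$sign$digits";
}

print decimal_string(12345, 2), "\n";   # 123.45
print decimal_string(-5, 2), "\n";      # -0.05
```

The result is a string, so pass it to Math::BigFloat or a similar arbitrary-precision type if you need to compute with it.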

decode_enum => 0 | 1

Return Enum8/Enum16 as string labels instead of numeric codes.

named_rows => 0 | 1

Return each row as a hashref keyed by column name instead of an arrayref.

my $ch = EV::ClickHouse->new(named_rows => 1, ...);
$ch->query("SELECT 1 AS n", sub {
    my ($rows, $err) = @_;
    print $rows->[0]{n};  # 1
});

METHODS

query

$ch->query($sql, sub { my ($rows, $err) = @_ });
$ch->query($sql, \%settings, sub { my ($rows, $err) = @_ });

Executes a SQL statement. The callback receives:

  • ($arrayref_of_arrayrefs) for SELECT with at least one row

  • (undef) for DDL/DML on success and for SELECT with zero rows on the native protocol (HTTP returns an empty arrayref). When in doubt, treat undef and [] equivalently with my @rows = @{$rows // []};.

  • (undef, $error_message) on error (server exception or connection error)
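Because a successful call may hand back either undef or an empty arrayref, it can help to normalise the result once, in a wrapper. with_rows is a hypothetical helper, not part of the module; it would be used as $ch->query($sql, with_rows(sub { ... })) and should not be combined with raw mode, where the first argument is a response body rather than rows.

```perl
use strict;
use warnings;

# Hypothetical wrapper: the inner handler always sees an arrayref on
# success and (undef, $err) on failure.
sub with_rows {
    my ($handler) = @_;
    return sub {
        my ($rows, $err) = @_;
        return $handler->(undef, $err) if defined $err;
        return $handler->($rows // [], undef);
    };
}

my $cb = with_rows(sub {
    my ($rows, $err) = @_;
    print scalar(@$rows), " row(s)\n";
});
$cb->(undef);   # simulates a zero-row SELECT on the native protocol
```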

The optional \%settings hashref passes per-query ClickHouse settings (max_execution_time, max_threads, async_insert, etc.), overriding connection-level defaults.

The following keys are intercepted by the client and not sent verbatim to the server:

params => \%hash

Parameterized values for {name:Type} placeholders in the SQL. Encoding and quoting are the server's job, so values do not need escaping:

$ch->query(
    "SELECT * FROM t WHERE id = {id:UInt64} AND name = {n:String}",
    { params => { id => 42, n => "O'Brien" } },
    sub { ... },
);

Works on both protocols (HTTP uses URL-encoded param_* query string; native uses dedicated wire fields).
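For a feel of what the HTTP form looks like on the wire, the sketch below renders a params hash as param_<name>=<value> pairs with percent-encoding. param_query_string and uri_escape_bytes are hypothetical helpers written for this illustration; the module's actual encoder may order or escape things differently.

```perl
use strict;
use warnings;

# Percent-encode everything outside the RFC 3986 unreserved set.
# Assumes byte strings; encode character strings to UTF-8 first.
sub uri_escape_bytes {
    my ($s) = @_;
    $s =~ s/([^A-Za-z0-9\-._~])/sprintf('%%%02X', ord $1)/ge;
    return $s;
}

# Hypothetical helper: one param_<name>=<value> pair per key, sorted so
# the output is stable.
sub param_query_string {
    my ($params) = @_;
    return join '&',
        map { 'param_' . uri_escape_bytes($_) . '=' . uri_escape_bytes($params->{$_}) }
        sort keys %$params;
}

print param_query_string({ id => 42, n => "O'Brien" }), "\n";
# param_id=42&param_n=O%27Brien
```

The apostrophe travels as %27 and never touches the SQL text, which is why no manual quoting is needed.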

query_id => $string

Set the protocol-level query identifier. Retrievable later via "last_query_id".

raw => 1

HTTP only. The callback receives the raw response body as a scalar string instead of parsed rows. Use with an explicit FORMAT clause:

$ch->query("SELECT * FROM t FORMAT CSV", { raw => 1 }, sub {
    my ($body, $err) = @_;
});

Croaks if used with the native protocol.

query_timeout => $seconds

Per-query timeout, overriding the connection-level query_timeout.

on_data => sub { my ($rows) = @_; ... }

Native protocol only. A code ref called for each data block as it arrives, for streaming large result sets. Rows are delivered incrementally and not accumulated, so the final callback receives (undef) rather than all rows. The final callback always fires on completion or error, even if no data block was emitted (empty result, server-side error before the first block).

$ch->query("SELECT * FROM big_table",
    { on_data => sub { my ($rows) = @_; process_batch($rows) } },
    sub { my (undef, $err) = @_; warn $err if $err },
);
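Since blocks are not accumulated, any totals have to be kept by the on_data callback itself. The sketch below shows one way to do that with a closure. make_block_counter is a hypothetical helper, and the blocks are simulated by calling the callback directly; with a live connection the same code ref would be passed as { on_data => $on_data }.

```perl
use strict;
use warnings;

# Hypothetical helper: returns an on_data callback plus a getter. Only
# running totals are kept, so memory use does not grow with the result.
sub make_block_counter {
    my %stats = (blocks => 0, rows => 0);
    my $on_data = sub {
        my ($rows) = @_;
        $stats{blocks}++;
        $stats{rows} += @$rows;
    };
    return ($on_data, sub { +{%stats} });
}

# Simulate two blocks arriving from the server.
my ($on_data, $stats) = make_block_counter();
$on_data->([[1], [2], [3]]);
$on_data->([[4]]);
printf "%d rows in %d blocks\n", @{ $stats->() }{qw(rows blocks)};
# 4 rows in 2 blocks
```

The final query callback is then the natural place to read the totals, because it fires once after the last block.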

Native protocol type notes: values come back as typed Perl scalars.

  • Date/DateTime are integers by default (days since epoch / Unix timestamps); enable decode_datetime for strings.

  • Enum values are numeric codes; decode_enum returns labels.

  • Decimal values are unscaled integers; decode_decimal scales them to floats.

  • SimpleAggregateFunction is transparently decoded as its inner type.

  • Nested columns become arrays of tuples.

  • LowCardinality works correctly across multi-block results with shared dictionaries.

insert

$ch->insert($table, $data, sub { my (undef, $err) = @_ });
$ch->insert($table, $data, \%settings, sub { my (undef, $err) = @_ });

$data may be either:

  • A pre-formatted TabSeparated string (tabs separate columns, newlines separate rows, with the standard ClickHouse escapes).

  • An arrayref of arrayrefs (rows of column values).

When using arrayrefs, no TSV escaping is needed: undef maps to NULL and strings may contain tabs and newlines freely.

Nested arrayrefs (Array/Tuple columns) and hashrefs (Map columns) are supported only on the native protocol, where the encoder has the column type from the server's sample block. On HTTP the same call croaks rather than silently producing malformed TSV; use the native protocol or pre-serialise nested types into ClickHouse TSV literal form.

# Native: nested types encode directly.
$ch->insert("my_table", [
    [1, "hello\tworld"],   # embedded tab
    [2, undef],            # NULL
    [3, [10, 20]],         # Array column   (native only)
    [4, { a => 1, b => 2 }],  # Map column  (native only)
], sub { ... });
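For the pre-formatted string form, the escaping is the caller's responsibility. A minimal sketch of the scalar case follows, covering backslash, tab, newline, and carriage return, with undef written as \N. tsv_field and tsv_row are hypothetical helpers, and nested types (Array, Tuple, Map) are deliberately left out.

```perl
use strict;
use warnings;

# Hypothetical helpers for ClickHouse TabSeparated input. Only scalar
# fields are handled; undef becomes \N (NULL).
my %TSV_ESCAPE = ("\\" => "\\\\", "\t" => "\\t", "\n" => "\\n", "\r" => "\\r");

sub tsv_field {
    my ($value) = @_;
    return '\N' unless defined $value;
    (my $out = $value) =~ s/([\\\t\n\r])/$TSV_ESCAPE{$1}/g;
    return $out;
}

sub tsv_row { join("\t", map { tsv_field($_) } @_) . "\n" }

# Two rows, ready to pass as the $data string.
my $data = tsv_row(1, "hello\tworld") . tsv_row(2, undef);
print $data;
```

The resulting $data could then be passed as $ch->insert("my_table", $data, sub { ... }).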

The optional \%settings hashref works exactly as in "query", including query_id, query_timeout, and params.

ping

$ch->ping(sub { my ($result, $err) = @_ });

Send a no-op round trip to verify the connection is alive. On success $result is true, $err is undef. On error: (undef, $error).

finish

$ch->finish;

Close the connection. Pending queries receive an error callback. Aliased as disconnect.

reset

$ch->reset;

Disconnect and immediately reconnect using the original parameters. Aliased as reconnect.

drain

$ch->drain(sub { ... });

Register a callback to fire once all pending queries (queued + in-flight) have completed. If nothing is pending, the callback fires synchronously. The classic graceful-shutdown pattern:

$ch->query("SELECT 1", sub { ... });
$ch->query("SELECT 2", sub { ... });
$ch->drain(sub {
    $ch->finish;
    EV::break;
});

cancel

$ch->cancel;

Cancel the currently in-flight query. Native protocol sends CLIENT_CANCEL and waits for the server's EndOfStream/Exception; HTTP closes the connection (use auto_reconnect or call "reset" to recover). The query's callback receives an error.

skip_pending

$ch->skip_pending;

Drop every pending operation: each queued and in-flight callback is invoked with (undef, $error_message). If a request was on the wire, the connection is torn down; call "reset" (or rely on auto_reconnect) before issuing new queries.

ACCESSORS

is_connected

True if the connection is established.

pending_count

Number of pending operations (queued + in-flight).

server_info

Full server identification string (e.g. "ClickHouse 24.1.0 (revision 54459)"), populated from the native ServerHello. undef for HTTP connections.

server_version

Server version (e.g. "24.1.0"). Native only; undef for HTTP.

server_timezone

Server timezone (e.g. "UTC", "Europe/Moscow"). Native only; undef for HTTP.

column_names

Arrayref of column names from the most recent native query result, or undef if no query has run.

$ch->query("SELECT 1 AS foo, 2 AS bar", sub {
    my $names = $ch->column_names;  # ['foo', 'bar']
});

column_types

Arrayref of ClickHouse type strings from the most recent native query (e.g. ['UInt32', 'String', 'Nullable(DateTime)']).
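With the column names in hand, arrayref rows can be turned into hashrefs for a single query without enabling named_rows for the whole connection. rows_to_hashes is a hypothetical helper; in real use the first argument would come from $ch->column_names inside the query callback.

```perl
use strict;
use warnings;

# Hypothetical helper: pair every arrayref row with the column names.
# An undef row set is treated as empty.
sub rows_to_hashes {
    my ($names, $rows) = @_;
    return [
        map { my %row; @row{@$names} = @$_; \%row } @{ $rows // [] }
    ];
}

my $hashes = rows_to_hashes(['foo', 'bar'], [[1, 2], [3, 4]]);
print $hashes->[1]{bar}, "\n";   # 4
```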

last_query_id

query_id of the most recently dispatched query, or undef. Set via { query_id => 'my-id' } in the settings hash of "query"/"insert".

last_error_code

ClickHouse error code (integer) of the most recent server-side exception, or 0 if no error. The top-level code is reported even when the exception is a chain. Useful for distinguishing retryable errors (e.g. 202 = TOO_MANY_SIMULTANEOUS_QUERIES) from permanent ones (60 = UNKNOWN_TABLE, 516 = AUTHENTICATION_FAILED).
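A retry policy built on this accessor might look like the sketch below. classify_error_code is a hypothetical helper, and the two lookup tables contain only the codes named above; which codes are worth retrying is a decision for your own workload.

```perl
use strict;
use warnings;

# Codes named in the text above; extend these sets as needed.
my %RETRYABLE = (202 => 'TOO_MANY_SIMULTANEOUS_QUERIES');
my %PERMANENT = (60 => 'UNKNOWN_TABLE', 516 => 'AUTHENTICATION_FAILED');

# Hypothetical policy helper: maps a code to 'none', 'retry', 'fail',
# or 'unknown'.
sub classify_error_code {
    my ($code) = @_;
    return 'none'  if !$code;                  # 0: no server exception recorded
    return 'retry' if exists $RETRYABLE{$code};
    return 'fail'  if exists $PERMANENT{$code};
    return 'unknown';
}

print classify_error_code(202), "\n";   # retry
print classify_error_code(60), "\n";    # fail
```

Inside a query callback this would be called as classify_error_code($ch->last_error_code) once $err is set.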

last_totals

Arrayref of totals rows from the last query that used WITH TOTALS, or undef. Native only.

last_extremes

Arrayref of extremes rows from the last native query, or undef.

profile_rows_before_limit

Rows that would have been returned without LIMIT. Useful for pagination UIs. Native only.

profile_rows

Total rows processed by the last query (native ProfileInfo).

profile_bytes

Total bytes processed by the last query (native ProfileInfo).

ALIASES

q          -> query
reconnect  -> reset
disconnect -> finish

REQUIREMENTS

  • Perl 5.12 or newer

  • EV 4.11 or newer (event loop)

  • zlib (required)

  • OpenSSL (optional, for TLS; auto-detected at build time)

  • liblz4 (optional, for native protocol compression; auto-detected)

SEE ALSO

EV, https://clickhouse.com/docs

AUTHOR

vividsnow

LICENSE

This library is free software; you can redistribute it and/or modify it under the same terms as Perl itself.