NAME

EV::Nats - High-performance asynchronous NATS client using EV

SYNOPSIS

use EV;
use EV::Nats;

my $nats = EV::Nats->new(
    host       => '127.0.0.1',
    port       => 4222,
    reconnect  => 1,
    on_error   => sub { warn "nats: $_[0]\n" },
    on_connect => sub { warn "connected\n" },
);

# Subscribe (plain or queue group)
my $sid = $nats->subscribe('foo.>', sub {
    my ($subject, $payload, $reply, $headers) = @_;
    print "[$subject] $payload\n";
});
$nats->subscribe('work.>', sub { ... }, 'workers');

# Publish (fire-and-forget) and headered publish
$nats->publish('foo.bar', 'hello world');
$nats->hpublish('foo.bar', "NATS/1.0\r\nX-Trace: 42\r\n\r\n", 'body');

# Request / reply
$nats->request('service.echo', 'ping', sub {
    my ($response, $err) = @_;
    die $err if $err;
    print "reply: $response\n";
}, 5000);                       # 5s timeout

$nats->unsubscribe($sid);
EV::run;

DESCRIPTION

EV::Nats is an async NATS client that implements the protocol directly in XS on top of EV. There is no external C library dependency.

Protocol

Full NATS client protocol (PUB, SUB, UNSUB, MSG, HMSG, PING/PONG), including headered publish/receive, wildcard subjects (*, >), queue groups, and request/reply with an automatic shared inbox subscription.

Connectivity

TCP and Unix-domain sockets; TCP keepalive; connect timeout; auto reconnect with exponential backoff and jitter; subscription and auto-unsub state restored on reconnect; cluster failover from INFO connect_urls; lame-duck-mode (leaf node graceful shutdown) callback; graceful drain.

Auth

Token, user/pass, NKey/JWT (Ed25519 via OpenSSL).

TLS

Optional, auto-detected at build time. STARTTLS-style upgrade after INFO; full hostname verification (DNS or IP literal) by default; opt-out tls_skip_verify; custom CA via tls_ca_file.

Performance

Write coalescing via ev_prepare (one write() per loop iteration); O(1) subscription lookup; per-publish allocation-free fast path; explicit batch mode for tight loops; per-connection stats counters.

Higher-level APIs

EV::Nats::JetStream, EV::Nats::KV, EV::Nats::ObjectStore.

Note: DNS resolution via getaddrinfo is blocking. Use numeric IP addresses for latency-sensitive applications.

METHODS

new(%options)

Create an EV::Nats instance. If host or path is supplied, connection is initiated immediately and the on_connect callback fires once the CONNECT/PONG handshake completes.

my $nats = EV::Nats->new(
    host       => '127.0.0.1',
    port       => 4222,
    reconnect  => 1,
    on_error   => sub { warn "nats: $_[0]\n" },
    on_connect => sub { warn "ready\n" },
);

Connection options

host => Str

Server hostname (numeric IP recommended; see "CAVEATS"). When set, connection starts immediately.

port => Int (default 4222)

Server port.

path => Str

Unix-domain socket path. Mutually exclusive with host.

connect_timeout => Int (ms; 0 = none)

How long to wait for the TCP/TLS handshake before giving up.

keepalive => Int (seconds)

If set, enables SO_KEEPALIVE with this idle interval.

priority => Int (-2 .. +2)

EV watcher priority for the I/O watchers on this connection.

loop => EV::Loop (default EV::default_loop)

The EV loop to attach watchers to.

name => Str

Client name advertised in CONNECT.

Auth options

user => Str / pass => Str

Username/password authentication. JSON-escaped in CONNECT.

token => Str

Token authentication.

nkey_seed => Str

NATS NKey seed (the SU... form). Requires the build to have OpenSSL (EV::Nats::HAS_NKEY).

jwt => Str

User JWT, paired with nkey_seed for decentralized auth. See also "creds_file".

tls => Bool / tls_ca_file => Str / tls_skip_verify => Bool

See "tls" for details.

Protocol options

verbose => Bool (default 0)

Request +OK acknowledgments after each command.

pedantic => Bool (default 0)

Server-side strict subject checking.

echo => Bool (default 1)

Receive messages this client itself publishes.

no_responders => Bool (default 0)

Ask the server to send a 503 status reply when a request has no responders, surfaced as the "no responders" error in request.

ping_interval => Int (ms, default 120000; 0 = disabled)

Client-initiated PING interval for keep-alive.

max_pings_outstanding => Int (default 2)

Maximum unacked PINGs before the connection is declared stale.

Reconnect options

reconnect => Bool (default 0)

Enable automatic reconnection.

reconnect_delay => Int (ms, default 2000)

Initial delay between reconnect attempts; subsequent attempts use exponential backoff with jitter, capped by max_reconnect_delay.

max_reconnect_delay => Int (ms, default 30000)

Upper bound on the backoff delay.

max_reconnect_attempts => Int (default 60; 0 = unlimited)

Give up after this many consecutive failures.

Callback options

All callbacks fire on the EV loop, never inline.

on_connect => sub { }

Called after the CONNECT/PONG handshake completes.

on_disconnect => sub { }

Called when the connection drops, before any auto-reconnect attempt.

on_error => sub { my ($err) = @_ }

Receives a string. If unset, errors croak.

on_lame_duck => sub { }

Called once when the server signals lame-duck-mode shutdown via INFO ldm:true.

on_slow_consumer => sub { my ($pending_bytes) = @_ }

See "slow_consumer".

connect($host, [$port])

Initiate a TCP connection. Port defaults to 4222. Croaks if already connected or in the middle of connecting; otherwise returns immediately and signals completion via on_connect.

connect_unix($path)

Initiate a Unix-domain-socket connection. Same async semantics as "connect".

disconnect

Cancel any pending reconnect, drop queued writes, close the socket, and fire on_disconnect. intentional_disconnect is set so no auto-reconnect is scheduled. For a clean shutdown that flushes pending writes first, see "drain".

is_connected

True if the CONNECT/PONG handshake has completed and no disconnect or reconnect is in progress.

publish($subject, [$payload], [$reply_to])

Publish a message. Alias: pub.

$nats->publish('foo', 'hello');
$nats->publish('foo', 'hello', 'reply.subject');

hpublish($subject, $headers, [$payload], [$reply_to])

Publish with headers. Alias: hpub.

$nats->hpublish('foo', "NATS/1.0\r\nX-Key: val\r\n\r\n", 'body');

subscribe($subject, $cb, [$queue_group])

Subscribe to a subject. Returns subscription ID. Alias: sub.

my $sid = $nats->subscribe('foo.*', sub {
    my ($subject, $payload, $reply, $headers) = @_;
});

Queue groups are preserved across reconnects.

Callback receives:

$subject - actual subject the message was published to
$payload - message body
$reply - reply-to subject (undef if none)
$headers - raw headers string (only for HMSG)

subscribe_max($subject, $cb, $max_msgs, [$queue_group])

Convenience: "subscribe" followed by an auto-unsubscribe after $max_msgs messages have been delivered.

unsubscribe($sid, [$max_msgs])

Unsubscribe. With $max_msgs, the server is told to deliver that many more messages and then drop the subscription. The auto-unsub state is restored on reconnect (so the partial count survives a disconnect). Alias: unsub.

request($subject, $payload, $cb, [$timeout_ms])

Request/reply. Uses automatic inbox subscription. Alias: req.

$nats->request('service', 'data', sub {
    my ($response, $err) = @_;
    die $err if $err;
    print "got: $response\n";
}, 5000);

Callback receives ($response, $error). For replies that include NATS message headers (HMSG), a third argument $headers with the raw header block is also passed. Error is set on timeout ("request timeout") or no responders ("no responders").

drain([$cb])

Graceful shutdown: sends UNSUB for all subscriptions, flushes pending writes with a PING fence, fires $cb when the server confirms with PONG, then disconnects. No new messages will be received after drain is initiated.

$cb receives a single argument: undef on clean drain, or an error string (e.g. "disconnected") if the connection dropped before the PONG arrived.

$nats->drain(sub {
    my ($err) = @_;
    die "drain failed: $err" if $err;
    print "drained, safe to exit\n";
});

ping

Send PING to server.

flush([$cb])

Send PING as a write fence; the subsequent PONG guarantees all prior messages were processed by the server. If $cb is given, it is invoked when the PONG arrives. The callback receives a single argument: undef on success, or an error string (e.g. "disconnected") if the connection dropped before the PONG arrived.

creds_file($path)

Read a NATS .creds file and apply the embedded JWT and NKey seed via "jwt" and "nkey_seed". Apply this BEFORE connect so the credentials are available during the CONNECT handshake. Dies if the file is unreadable or missing either the USER JWT or USER NKEY SEED block.

new_inbox

Returns a fresh subject suitable for use as a private reply target (_INBOX.<rand>.<n>). Each call burns a slot from the same counter that "request" uses, so manual subscribers must treat the returned subject as opaque.

subscription_count

Returns the number of currently-registered subscriptions, including the implicit _INBOX.> subscription used by "request".

server_info

Returns the raw JSON string of the most recent INFO frame received from the server (or undef before the first INFO). Useful for inspecting server_id, version, cluster, connect_urls, etc.

max_payload([$limit])

Server-advertised maximum payload size in bytes. Returns the current value; with an argument, overrides it (publishes above this croak locally before reaching the wire).

waiting_count

Number of writes queued locally during connect or reconnect (i.e. publish/request calls made while the connection is not yet ready). They flush when the handshake completes.

skip_waiting

Drop all queued writes without sending them. Useful before disconnect if reconnect is enabled and you don't want stale publishes replayed.

reconnect($enable, [$delay_ms], [$max_attempts])

Configure reconnection. $delay_ms and $max_attempts are only written when supplied; omitted args leave the existing value unchanged.

reconnect_enabled

Returns true if reconnect is enabled.

connect_timeout([$ms])

Get/set connect timeout.

ping_interval([$ms])

Get/set PING interval.

max_pings_outstanding([$num])

Get/set max outstanding PINGs.

priority([$num])

Get/set EV watcher priority.

keepalive([$seconds])

Get/set TCP keepalive.

batch($coderef)

Batch multiple publishes into a single write. Suppresses per-publish write scheduling; all buffered data is flushed after the coderef returns.

$nats->batch(sub {
    $nats->publish("foo.$_", "msg-$_") for 1..1000;
});

slow_consumer($bytes_threshold, [$cb])

Enable slow consumer detection. When the write buffer exceeds $bytes_threshold bytes, $cb is called with the current buffer size.

$nats->slow_consumer(1024*1024, sub {
    my ($pending_bytes) = @_;
    warn "slow consumer: ${pending_bytes}B pending\n";
});

on_lame_duck([$cb])

Get/set the lame-duck callback. Fires once when the server signals shutdown (leaf node, rolling restart) via INFO ldm:true. Use this to migrate work to another server before the grace period elapses.

nkey_seed($seed)

Set the NKey seed (the SU... base32-encoded form) for Ed25519 authentication. Requires the build to have OpenSSL (see "HAS_NKEY" in EV::Nats). The server nonce from INFO is automatically signed during CONNECT. May also be passed to "new" as nkey_seed => ....

jwt($token)

Set the user JWT. Combine with "nkey_seed" for NATS decentralized auth. May also be passed to "new". See "creds_file" for the common case of loading both from a .creds file.

EV::Nats->nkey_generate_user_seed

Class method. Returns a fresh, valid NATS User NKey seed (the SU... form). Useful for tests and provisioning scripts that don't have the nk CLI available. Requires HAS_NKEY; croaks otherwise.

EV::Nats->nkey_public_from_seed($seed)

Class method. Derives the matching public key (the U... form) from a User NKey seed. Croaks on an invalid seed. Pair with "nkey_generate_user_seed" to provision the server with the public key while the client keeps the seed.

tls($enable, [$ca_file], [$skip_verify])

Configure TLS. Requires OpenSSL at build time (see "HAS_TLS" in EV::Nats).

$nats->tls(1);                           # system CA
$nats->tls(1, '/path/to/ca.pem');        # custom CA
$nats->tls(1, undef, 1);                 # skip verification

When verification is enabled (the default), the server certificate's SAN must match either the resolved IP literal or the DNS hostname passed to "connect". May also be passed to "new" as tls => 1, tls_ca_file => $path.

stats

Returns a hash of connection counters:

my %s = $nats->stats;
# ( msgs_in, msgs_out, bytes_in, bytes_out )

reset_stats

Zero all counters returned by "stats".

on_error([$cb])

on_connect([$cb])

on_disconnect([$cb])

Get/set the corresponding callback at runtime. With no argument, returns the current value (or undef). With an argument, replaces it; pass undef to clear.

BUILD-TIME FEATURES

EV::Nats::HAS_TLS

True if compiled with OpenSSL (TLS supported).

EV::Nats::HAS_NKEY

True if NKey/JWT signing is available (also requires OpenSSL).

BENCHMARKS

Measured on Linux with TCP loopback, Perl 5.40, nats-server 2.12, 100-byte payloads (bench/benchmark.pl):

                            100K msgs    200K msgs
PUB fire-and-forget         4.7M         5.0M msgs/sec
PUB + SUB (loopback)        1.8M         1.6M msgs/sec
PUB + SUB (8B payload)      2.2M         1.9M msgs/sec
REQ/REP (pipelined, 64)     334K               msgs/sec

Connected-path publish appends directly to the write buffer with no per-message allocation. Write coalescing via ev_prepare batches all publishes per event-loop iteration into a single write() syscall.

Run perl bench/benchmark.pl for full results. Set BENCH_MESSAGES, BENCH_PAYLOAD, BENCH_HOST, BENCH_PORT to customize.

NATS PROTOCOL

This module implements the NATS client protocol directly in XS. The protocol is text-based with CRLF-delimited control lines and binary payloads.

Connection flow: server sends INFO, client sends CONNECT + PING, server responds with PONG to confirm. All subscriptions (including queue groups and auto-unsub state) are automatically restored on reconnect.

Request/reply uses a single wildcard inbox subscription (_INBOX.<random>.*) for all requests, with unique suffixes per request.

CAVEATS

  • DNS resolution via getaddrinfo is blocking. Use numeric IP addresses for latency-sensitive applications.

  • TLS requires OpenSSL headers at build time (auto-detected).

  • NKey auth requires OpenSSL with Ed25519 support (1.1.1+).

  • The module handles all data as bytes. Encode UTF-8 strings before passing them.

  • Do not let the EV::Nats instance go out of scope (or be explicitly undef-ed) from inside a callback while that callback is still executing. The callback closure normally references $nats (via $nats->publish(...) etc.), which keeps it alive; if you write a callback that does not capture $nats and you undef the last outer reference inside that callback, Perl will run DESTROY mid-callback and free the underlying state. Any subsequent operation on $nats in that callback is undefined behavior.

  • Cluster URL discovery (the connect_urls field of INFO) is trusted by default. On failover the client connects to whatever hostnames the previous server advertised, and TLS hostname verification is performed against those names. Use a private CA (tls_ca_file) to restrict which certificates are acceptable, or do not enable tls on public-CA topologies where any holder of a valid cert could redirect clients.

ENVIRONMENT

TEST_NATS_HOST, TEST_NATS_PORT

Set these to run the test suite against a NATS server (default: 127.0.0.1:4222).

SEE ALSO

EV::Nats::JetStream, EV::Nats::KV, EV::Nats::ObjectStore, EV, NATS protocol, nats-server.

AUTHOR

vividsnow

LICENSE

This library is free software; you can redistribute it and/or modify it under the same terms as Perl itself.