NAME

EV::Nats - High-performance asynchronous NATS client using EV

SYNOPSIS

use EV::Nats;

my $nats = EV::Nats->new(
    host       => '127.0.0.1',
    port       => 4222,
    on_error   => sub { warn "nats error: @_" },
    on_connect => sub { warn "connected to NATS" },
);

# Subscribe
my $sid = $nats->subscribe('foo.>', sub {
    my ($subject, $payload, $reply) = @_;
    print "[$subject] $payload\n";
});

# Subscribe with queue group
$nats->subscribe('worker.>', sub {
    my ($subject, $payload, $reply) = @_;
}, 'workers');

# Publish
$nats->publish('foo.bar', 'hello world');

# Request/reply
$nats->request('service.echo', 'ping', sub {
    my ($response, $err) = @_;
    die $err if $err;
    print "reply: $response\n";
}, 5000);  # 5s timeout

# Unsubscribe
$nats->unsubscribe($sid);

EV::run;

DESCRIPTION

EV::Nats is a high-performance asynchronous NATS client that implements the NATS client protocol in pure XS with EV event loop integration. No external C NATS library is required.

Features:

  • Full NATS client protocol (PUB, SUB, UNSUB, MSG, HMSG)

  • Request/reply with automatic inbox management

  • Queue group subscriptions for load balancing

  • Wildcard subjects (* and >)

  • Headers support (HPUB/HMSG)

  • Automatic PING/PONG keep-alive

  • Automatic reconnection with subscription and queue group restore

  • Fire-and-forget publish (no callback overhead)

  • Token, user/pass authentication

  • TCP keepalive and connect timeout

  • Write coalescing via ev_prepare (batches writes per event loop iteration)

  • O(1) subscription lookup via hash table

  • Graceful drain (unsubscribe all, flush, then disconnect)

  • Server pool with cluster URL failover from INFO connect_urls

  • Optional TLS via OpenSSL (auto-detected at build time)

  • Reconnect jitter to prevent thundering herd

  • Per-connection stats counters (msgs/bytes in/out)

  • JetStream API (EV::Nats::JetStream)

  • Key-Value store (EV::Nats::KV)

  • Object store with chunking (EV::Nats::ObjectStore)

  • NKey/JWT authentication (Ed25519 via OpenSSL)

  • Slow consumer detection with configurable threshold

  • Publish batching API (batch)

  • Lame duck mode (leaf node graceful shutdown) notification
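
The wildcard subjects listed above follow standard NATS semantics: * matches exactly one dot-separated token, and > matches one or more trailing tokens. The sketch below is only an illustration of which subjects a pattern would receive. subject_matches is a hypothetical pure-Perl helper, not part of EV::Nats; the server does the real matching.

```perl
use strict;
use warnings;

# Hypothetical helper, not part of EV::Nats: returns true if a message
# published to $subject would be delivered to a subscription on $pattern.
sub subject_matches {
    my ($pattern, $subject) = @_;
    my @p = split /\./, $pattern, -1;
    my @s = split /\./, $subject, -1;
    for my $i (0 .. $#p) {
        # '>' must be the last token and needs at least one token to match
        return $i == $#p && @s > $i if $p[$i] eq '>';
        return 0 if $i > $#s;           # subject ran out of tokens
        next if $p[$i] eq '*';          # '*' matches exactly one token
        return 0 if $p[$i] ne $s[$i];   # literal tokens must be equal
    }
    return @p == @s;                    # no leftover subject tokens
}

for my $case (['foo.*', 'foo.bar'], ['foo.*', 'foo.bar.baz'],
              ['foo.>', 'foo.bar.baz'], ['foo.>', 'foo']) {
    my ($pattern, $subject) = @$case;
    my $verdict = subject_matches($pattern, $subject) ? 'match' : 'no match';
    printf "%-8s %-12s %s\n", $pattern, $subject, $verdict;
}
```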

Note: DNS resolution via getaddrinfo is blocking. Use numeric IP addresses for latency-sensitive applications.

METHODS

new(%options)

Create a new EV::Nats instance. Connects automatically if host is given.

my $nats = EV::Nats->new(
    host     => '127.0.0.1',
    port     => 4222,
    on_error => sub { die @_ },
);

Options:

host => 'Str'
port => 'Int' (default 4222)

Server hostname and port. If host is provided, connection starts immediately.

on_error => $cb->($errstr)

Error callback, called with the error string. If no callback is set, errors croak.

on_connect => $cb->()

Called when connection is fully established (after CONNECT/PONG handshake).

on_disconnect => $cb->()

Called on disconnect.

user => 'Str'
pass => 'Str'

Username/password authentication. Values are JSON-escaped in the CONNECT command.

token => 'Str'

Token authentication.

name => 'Str'

Client name sent in CONNECT.

verbose => $bool (default 0)

Request +OK acknowledgments from server.

pedantic => $bool (default 0)

Enable strict subject checking.

echo => $bool (default 1)

Receive messages published by this client.

no_responders => $bool (default 0)

Enable no-responders notification for requests.

reconnect => $bool (default 0)

Enable automatic reconnection.

reconnect_delay => $ms (default 2000)

Delay between reconnect attempts.

max_reconnect_attempts => $num (default 60)

Maximum reconnect attempts. 0 = unlimited.

connect_timeout => $ms

Connection timeout in milliseconds. 0 = no timeout.

ping_interval => $ms (default 120000)

Interval for client-initiated PING. 0 = disabled.

max_pings_outstanding => $num (default 2)

Max unanswered PINGs before declaring stale connection.

priority => $num (-2 to +2)

EV watcher priority.

keepalive => $seconds

TCP keepalive interval.

path => 'Str'

Unix socket path. Mutually exclusive with host.

loop => EV::Loop

EV loop to use. Default: EV::default_loop.

connect($host, [$port])

Connect to NATS server. Port defaults to 4222.

connect_unix($path)

Connect via Unix domain socket.

disconnect

Graceful disconnect.

is_connected

Returns true if connected.

publish($subject, [$payload], [$reply_to])

Publish a message. Alias: pub.

$nats->publish('foo', 'hello');
$nats->publish('foo', 'hello', 'reply.subject');

hpublish($subject, $headers, [$payload], [$reply_to])

Publish with headers. Alias: hpub.

$nats->hpublish('foo', "NATS/1.0\r\nX-Key: val\r\n\r\n", 'body');
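
As the example above shows, $headers is a raw block: a NATS/1.0 version line, then Name: value lines, each terminated by CRLF, then an empty line. A minimal sketch of building that block from name/value pairs follows. build_headers is a hypothetical helper, not part of EV::Nats, and its validation rules are an assumption.

```perl
use strict;
use warnings;

# Hypothetical helper, not part of EV::Nats: serialize name/value pairs
# into the raw header block that hpublish takes as its second argument.
sub build_headers {
    my (@pairs) = @_;                    # a list, so order and repeats are kept
    die "odd number of arguments" if @pairs % 2;
    my $block = "NATS/1.0\r\n";
    while (my ($name, $value) = splice @pairs, 0, 2) {
        die "invalid header name"  if $name eq '' || $name =~ /[\s:]/;
        die "invalid header value" if $value =~ /[\r\n]/;
        $block .= "$name: $value\r\n";
    }
    return $block . "\r\n";              # the empty line ends the block
}

my $headers = build_headers('X-Key' => 'val', 'X-Trace' => 'abc123');
# $nats->hpublish('foo', $headers, 'body');
```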

subscribe($subject, $cb, [$queue_group])

Subscribe to a subject. Returns subscription ID. Alias: sub.

my $sid = $nats->subscribe('foo.*', sub {
    my ($subject, $payload, $reply, $headers) = @_;
});

Queue groups are preserved across reconnects.

Callback receives:

$subject - actual subject the message was published to
$payload - message body
$reply - reply-to subject (undef if none)
$headers - raw headers string (only for HMSG)
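
Because $headers arrives as a raw string, the callback has to split it itself. One possible sketch, assuming the block has the same layout that hpublish takes (version line, Name: value lines, CRLF separators), is shown below. parse_headers is a hypothetical helper, not part of EV::Nats.

```perl
use strict;
use warnings;

# Hypothetical helper, not part of EV::Nats: split a raw header block into
# its first line and a hash of name => [values] (a name may repeat).
sub parse_headers {
    my ($raw) = @_;
    my ($status, @lines) = split /\r\n/, $raw;
    my %h;
    for my $line (@lines) {
        next unless length $line;
        my ($name, $value) = split /:\s*/, $line, 2;
        push @{ $h{$name} }, $value // '';
    }
    return ($status, \%h);
}

my ($status, $h) = parse_headers("NATS/1.0\r\nX-Key: val\r\n\r\n");
print "$status X-Key=$h->{'X-Key'}[0]\n";
```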

subscribe_max($subject, $cb, $max_msgs, [$queue_group])

Subscribe and auto-unsubscribe after $max_msgs messages in one call.

unsubscribe($sid, [$max_msgs])

Unsubscribe. With $max_msgs, auto-unsubscribes after receiving that many messages. Auto-unsub state is restored on reconnect. Alias: unsub.

request($subject, $payload, $cb, [$timeout_ms])

Request/reply. Uses automatic inbox subscription. Alias: req.

$nats->request('service', 'data', sub {
    my ($response, $err) = @_;
    die $err if $err;
    print "got: $response\n";
}, 5000);

Callback receives ($response, $error). Error is set on timeout ("request timeout") or no responders ("no responders").
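
The two error strings allow different handling: a timeout may be worth retrying, while "no responders" usually is not. The sketch below retries only on timeout. request_with_retry and its tries/timeout_ms options are hypothetical and not part of EV::Nats; the helper relies only on the request signature documented above, and matching the error with a regex is an assumption about the exact string.

```perl
use strict;
use warnings;

# Hypothetical helper, not part of EV::Nats: retry a request on timeout.
# $nats only needs a request($subject, $payload, $cb, $timeout_ms) method.
sub request_with_retry {
    my ($nats, $subject, $payload, $cb, %opt) = @_;
    my $tries   = $opt{tries}      // 3;
    my $timeout = $opt{timeout_ms} // 5000;

    $nats->request($subject, $payload, sub {
        my ($response, $err) = @_;
        if ($err && $err =~ /request timeout/ && $tries > 1) {
            # Only timeouts are retried; "no responders" is reported at once.
            return request_with_retry($nats, $subject, $payload, $cb,
                tries => $tries - 1, timeout_ms => $timeout);
        }
        $cb->($response, $err);   # success, or the final error
    }, $timeout);
}

# request_with_retry($nats, 'service', 'data', sub {
#     my ($response, $err) = @_;
#     warn "giving up: $err\n" if $err;
# }, tries => 3, timeout_ms => 2000);
```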

drain([$cb])

Graceful shutdown: sends UNSUB for all subscriptions, flushes pending writes with a PING fence, fires $cb when the server confirms with PONG, then disconnects. No new messages will be received after drain is initiated.

$nats->drain(sub {
    print "drained, safe to exit\n";
});

ping

Send PING to server.

flush

Send PING as a write fence; the subsequent PONG guarantees all prior messages were processed by the server.

server_info

Returns raw INFO JSON string from server.

max_payload([$limit])

Get/set max payload size.

waiting_count

Number of writes queued locally (during connect/reconnect).

skip_waiting

Cancel all waiting writes.

reconnect($enable, [$delay_ms], [$max_attempts])

Configure reconnection.

reconnect_enabled

Returns true if reconnect is enabled.

connect_timeout([$ms])

Get/set connect timeout.

ping_interval([$ms])

Get/set PING interval.

max_pings_outstanding([$num])

Get/set max outstanding PINGs.

priority([$num])

Get/set EV watcher priority.

keepalive([$seconds])

Get/set TCP keepalive.

batch($coderef)

Batch multiple publishes into a single write. Suppresses per-publish write scheduling; all buffered data is flushed after the coderef returns.

$nats->batch(sub {
    $nats->publish("foo.$_", "msg-$_") for 1..1000;
});

slow_consumer($bytes_threshold, [$cb])

Enable slow consumer detection. When the write buffer exceeds $bytes_threshold bytes, $cb is called with the current buffer size.

$nats->slow_consumer(1024*1024, sub {
    my ($pending_bytes) = @_;
    warn "slow consumer: ${pending_bytes}B pending\n";
});

on_lame_duck([$cb])

Get/set callback for lame duck mode. Fired when the server signals it's shutting down (leaf node / rolling restart). Use this to migrate to another server.

nkey_seed($seed)

Set NKey seed for Ed25519 authentication (requires OpenSSL at build time). The seed is a base32-encoded NATS NKey. The server nonce from INFO is automatically signed during CONNECT.

$nats->nkey_seed('SUAM...');

Or via constructor: nkey_seed => 'SUAM...'.

jwt($token)

Set the user JWT for authentication. Combine with nkey_seed for NATS decentralized auth.

tls($enable, [$ca_file], [$skip_verify])

Configure TLS (requires OpenSSL at build time).

$nats->tls(1);                           # system CA
$nats->tls(1, '/path/to/ca.pem');        # custom CA
$nats->tls(1, undef, 1);                 # skip verification

Or via constructor: tls => 1, tls_ca_file => $path.

stats

Returns a hash of connection statistics:

my %s = $nats->stats;
# msgs_in, msgs_out, bytes_in, bytes_out
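
Since the counters are cumulative, throughput has to be derived from two snapshots. A minimal sketch follows. stats_rate is a hypothetical helper, not part of EV::Nats, and the snapshot values are invented purely for illustration.

```perl
use strict;
use warnings;

# Hypothetical helper, not part of EV::Nats: per-second rates between two
# snapshots, each taken with  my %s = $nats->stats;
sub stats_rate {
    my ($before, $after, $seconds) = @_;
    die "interval must be positive" unless $seconds > 0;
    return map { $_ => ($after->{$_} - $before->{$_}) / $seconds }
           qw(msgs_in msgs_out bytes_in bytes_out);
}

# Invented snapshots, five seconds apart.
my %t0 = (msgs_in => 100, msgs_out => 50,  bytes_in => 10_000, bytes_out => 5_000);
my %t1 = (msgs_in => 600, msgs_out => 150, bytes_in => 60_000, bytes_out => 15_000);

my %rate = stats_rate(\%t0, \%t1, 5);
printf "%.0f msgs/s in, %.0f bytes/s in\n", $rate{msgs_in}, $rate{bytes_in};
```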

reset_stats

Reset all stats counters to zero.

on_error([$cb])

on_connect([$cb])

on_disconnect([$cb])

Get/set handler callbacks.

BENCHMARKS

Measured on Linux with TCP loopback, Perl 5.40, nats-server 2.12, 100-byte payloads (bench/benchmark.pl):

                            100K msgs        200K msgs
PUB fire-and-forget         4.7M msgs/sec    5.0M msgs/sec
PUB + SUB (loopback)        1.8M msgs/sec    1.6M msgs/sec
PUB + SUB (8B payload)      2.2M msgs/sec    1.9M msgs/sec
REQ/REP (pipelined, 128)    334K msgs/sec    -

Connected-path publish appends directly to the write buffer with no per-message allocation. Write coalescing via ev_prepare batches all publishes per event-loop iteration into a single write() syscall.

Run perl bench/benchmark.pl for full results. Set BENCH_MESSAGES, BENCH_PAYLOAD, BENCH_HOST, BENCH_PORT to customize.

NATS PROTOCOL

This module implements the NATS client protocol directly in XS. The protocol is text-based with CRLF-delimited control lines and binary payloads.
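
To make the framing concrete, here is a sketch of a PUB frame as defined by the public NATS protocol: a control line with the subject, an optional reply subject, and the payload length in bytes, followed by the payload and a closing CRLF. frame_pub is a hypothetical pure-Perl illustration; EV::Nats builds its frames in XS.

```perl
use strict;
use warnings;

# Illustration only: how a PUB frame looks on the wire.
# frame_pub is hypothetical and not part of EV::Nats.
sub frame_pub {
    my ($subject, $payload, $reply_to) = @_;
    $payload //= '';
    # The length field counts bytes, so wide characters are rejected here.
    die "payload must be bytes" if $payload =~ /[^\x00-\xff]/;
    my @control = ('PUB', $subject);
    push @control, $reply_to if defined $reply_to;
    push @control, length $payload;
    return join(' ', @control) . "\r\n$payload\r\n";
}

my $frame = frame_pub('foo.bar', 'hello world');
(my $shown = $frame) =~ s/\r\n/\\r\\n/g;   # make the CRLFs visible
print "$shown\n";
```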

Connection flow: server sends INFO, client sends CONNECT + PING, server responds with PONG to confirm. All subscriptions (including queue groups and auto-unsub state) are automatically restored on reconnect.
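
The client's half of that handshake can be sketched as a CONNECT line carrying a JSON object, followed by PING. The field names below (verbose, pedantic, echo, name, user, pass, auth_token) come from the public NATS protocol; which fields EV::Nats actually sends is an assumption, and connect_frame is a hypothetical helper. JSON::PP does the escaping here, whereas the module does it in XS.

```perl
use strict;
use warnings;
use JSON::PP ();

# Illustration only: build the CONNECT + PING bytes sent after INFO arrives.
# connect_frame is hypothetical and not part of EV::Nats.
sub connect_frame {
    my (%opt) = @_;
    my $bool = sub { $_[0] ? JSON::PP::true() : JSON::PP::false() };
    my %fields = (
        verbose  => $bool->($opt{verbose}),
        pedantic => $bool->($opt{pedantic}),
        echo     => $bool->($opt{echo} // 1),
    );
    $fields{name}       = $opt{name}  if defined $opt{name};
    $fields{user}       = $opt{user}  if defined $opt{user};
    $fields{pass}       = $opt{pass}  if defined $opt{pass};
    $fields{auth_token} = $opt{token} if defined $opt{token};

    # canonical() sorts the keys so the output is stable.
    my $json = JSON::PP->new->canonical->encode(\%fields);
    return "CONNECT $json\r\nPING\r\n";   # the server answers PING with PONG
}

my $frame = connect_frame(name => 'demo', user => 'alice', pass => 's3"cret');
print $frame;
```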

Request/reply uses a single wildcard inbox subscription (_INBOX.<random>.*) for all requests, with unique suffixes per request.
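
The shared-inbox scheme can be pictured as one table of pending callbacks keyed by the per-request suffix. The sketch below is a pure-Perl illustration of that idea, not the module's XS code; the names (new_reply_subject, dispatch_reply), the counter-based suffix, and the use of rand for the prefix are all assumptions.

```perl
use strict;
use warnings;

# Illustration only, not the module's XS code: one wildcard inbox and one
# pending-callback table keyed by the unique suffix of each request.
my @alphabet = ('a' .. 'z', 0 .. 9);
my $prefix   = '_INBOX.' . join '', map { $alphabet[rand @alphabet] } 1 .. 16;
my %pending;
my $next_id = 0;

# Register a callback and return the reply subject to put in the request.
sub new_reply_subject {
    my ($cb) = @_;
    my $suffix = ++$next_id;
    $pending{$suffix} = $cb;
    return "$prefix.$suffix";
}

# Would be called for every message arriving on "$prefix.*".
sub dispatch_reply {
    my ($subject, $payload) = @_;
    return 0 unless index($subject, "$prefix.") == 0;
    my $suffix = substr $subject, length($prefix) + 1;
    my $cb = delete $pending{$suffix} or return 0;   # unknown or already answered
    $cb->($payload, undef);
    return 1;
}

my $reply_to = new_reply_subject(sub { print "reply: $_[0]\n" });
dispatch_reply($reply_to, 'pong');
```

With this layout only one SUB is ever sent for replies, and each request costs a single hash insert and delete.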

CAVEATS

  • DNS resolution via getaddrinfo is blocking, so resolving a hostname stalls the event loop until it returns. Use numeric IP addresses for latency-sensitive applications.

  • TLS requires OpenSSL headers at build time (auto-detected).

  • NKey auth requires OpenSSL with Ed25519 support (1.1.1+).

  • The module handles all data as bytes. Encode character strings (for example to UTF-8) before passing them, and decode received payloads yourself.
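
A minimal sketch of the byte-handling caveat, using the core Encode module: encode character strings before publishing and decode payloads after receiving. The subject name in the commented-out call is an arbitrary example.

```perl
use strict;
use warnings;
use Encode qw(encode decode);

my $text  = "h\x{e9}llo w\x{f6}rld";       # character string with two accented letters
my $bytes = encode('UTF-8', $text);        # byte string, safe to hand to publish

# $nats->publish('greetings', $bytes);     # publish the bytes, not $text

# In a subscribe callback, turn the payload back into characters.
my $roundtrip = decode('UTF-8', $bytes);

printf "%d characters, %d bytes, round trip %s\n",
    length($text), length($bytes), ($roundtrip eq $text ? 'ok' : 'failed');
```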

ENVIRONMENT

TEST_NATS_HOST, TEST_NATS_PORT

Set these to run the test suite against a NATS server (default: 127.0.0.1:4222).

SEE ALSO

EV, NATS protocol, nats-server

AUTHOR

vividsnow

LICENSE

This library is free software; you can redistribute it and/or modify it under the same terms as Perl itself.