NAME
EV::Nats - High-performance asynchronous NATS client using EV
SYNOPSIS
use EV::Nats;
my $nats = EV::Nats->new(
host => '127.0.0.1',
port => 4222,
on_error => sub { warn "nats error: @_" },
on_connect => sub { warn "connected to NATS" },
);
# Subscribe
my $sid = $nats->subscribe('foo.>', sub {
my ($subject, $payload, $reply) = @_;
print "[$subject] $payload\n";
});
# Subscribe with queue group
$nats->subscribe('worker.>', sub {
my ($subject, $payload, $reply) = @_;
}, 'workers');
# Publish
$nats->publish('foo.bar', 'hello world');
# Request/reply
$nats->request('service.echo', 'ping', sub {
my ($response, $err) = @_;
die $err if $err;
print "reply: $response\n";
}, 5000); # 5s timeout
# Unsubscribe
$nats->unsubscribe($sid);
EV::run;
DESCRIPTION
EV::Nats is a high-performance asynchronous NATS client that implements the NATS client protocol in pure XS with EV event loop integration. No external C NATS library is required.
Features:
Full NATS client protocol (PUB, SUB, UNSUB, MSG, HMSG)
Request/reply with automatic inbox management
Queue group subscriptions for load balancing
Wildcard subjects (
*and>)Headers support (HPUB/HMSG)
Automatic PING/PONG keep-alive
Automatic reconnection with subscription and queue group restore
Fire-and-forget publish (no callback overhead)
Token, user/pass authentication
TCP keepalive and connect timeout
Write coalescing via ev_prepare (batches writes per event loop iteration)
O(1) subscription lookup via hash table
Graceful drain (unsubscribe all, flush, then disconnect)
Server pool with cluster URL failover from INFO connect_urls
Optional TLS via OpenSSL (auto-detected at build time)
Reconnect jitter to prevent thundering herd
Per-connection stats counters (msgs/bytes in/out)
JetStream API (EV::Nats::JetStream)
Key-Value store (EV::Nats::KV)
Object store with chunking (EV::Nats::ObjectStore)
NKey/JWT authentication (Ed25519 via OpenSSL)
Slow consumer detection with configurable threshold
Publish batching API (
batch)Lame duck mode (leaf node graceful shutdown) notification
Note: DNS resolution via getaddrinfo is blocking. Use numeric IP addresses for latency-sensitive applications.
METHODS
new(%options)
Create a new EV::Nats instance. Connects automatically if host is given.
my $nats = EV::Nats->new(
host => '127.0.0.1',
port => 4222,
on_error => sub { die @_ },
);
Options:
- host => 'Str'
- port => 'Int' (default 4222)
-
Server hostname and port. If
hostis provided, connection starts immediately. - on_error => $cb->($errstr)
-
Error callback. Default:
croak. - on_connect => $cb->()
-
Called when connection is fully established (after CONNECT/PONG handshake).
- on_disconnect => $cb->()
-
Called on disconnect.
- user => 'Str'
- pass => 'Str'
-
Username/password authentication. Values are JSON-escaped in the CONNECT command.
- token => 'Str'
-
Token authentication.
- name => 'Str'
-
Client name sent in CONNECT.
- verbose => $bool (default 0)
-
Request +OK acknowledgments from server.
- pedantic => $bool (default 0)
-
Enable strict subject checking.
- echo => $bool (default 1)
-
Receive messages published by this client.
- no_responders => $bool (default 0)
-
Enable no-responders notification for requests.
- reconnect => $bool (default 0)
-
Enable automatic reconnection.
- reconnect_delay => $ms (default 2000)
-
Delay between reconnect attempts.
- max_reconnect_attempts => $num (default 60)
-
Maximum reconnect attempts. 0 = unlimited.
- connect_timeout => $ms
-
Connection timeout. 0 = no timeout.
- ping_interval => $ms (default 120000)
-
Interval for client-initiated PING. 0 = disabled.
- max_pings_outstanding => $num (default 2)
-
Max unanswered PINGs before declaring stale connection.
- priority => $num (-2 to +2)
-
EV watcher priority.
- keepalive => $seconds
-
TCP keepalive interval.
- path => 'Str'
-
Unix socket path. Mutually exclusive with
host. - loop => EV::Loop
-
EV loop to use. Default:
EV::default_loop.
connect($host, [$port])
Connect to NATS server. Port defaults to 4222.
connect_unix($path)
Connect via Unix domain socket.
disconnect
Graceful disconnect.
is_connected
Returns true if connected.
publish($subject, [$payload], [$reply_to])
Publish a message. Alias: pub.
$nats->publish('foo', 'hello');
$nats->publish('foo', 'hello', 'reply.subject');
hpublish($subject, $headers, [$payload], [$reply_to])
Publish with headers. Alias: hpub.
$nats->hpublish('foo', "NATS/1.0\r\nX-Key: val\r\n\r\n", 'body');
subscribe($subject, $cb, [$queue_group])
Subscribe to a subject. Returns subscription ID. Alias: sub.
my $sid = $nats->subscribe('foo.*', sub {
my ($subject, $payload, $reply, $headers) = @_;
});
Queue groups are preserved across reconnects.
Callback receives:
$subject- actual subject the message was published to$payload- message body$reply- reply-to subject (undef if none)$headers- raw headers string (only for HMSG)
subscribe_max($subject, $cb, $max_msgs, [$queue_group])
Subscribe and auto-unsubscribe after $max_msgs messages in one call.
unsubscribe($sid, [$max_msgs])
Unsubscribe. With $max_msgs, auto-unsubscribes after receiving that many messages. Auto-unsub state is restored on reconnect. Alias: unsub.
request($subject, $payload, $cb, [$timeout_ms])
Request/reply. Uses automatic inbox subscription. Alias: req.
$nats->request('service', 'data', sub {
my ($response, $err) = @_;
die $err if $err;
print "got: $response\n";
}, 5000);
Callback receives ($response, $error). Error is set on timeout ("request timeout") or no responders ("no responders").
drain([$cb])
Graceful shutdown: sends UNSUB for all subscriptions, flushes pending writes with a PING fence, fires $cb when the server confirms with PONG, then disconnects. No new messages will be received after drain is initiated.
$nats->drain(sub {
print "drained, safe to exit\n";
});
ping
Send PING to server.
flush
Send PING as a write fence; the subsequent PONG guarantees all prior messages were processed by the server.
server_info
Returns raw INFO JSON string from server.
max_payload([$limit])
Get/set max payload size.
waiting_count
Number of writes queued locally (during connect/reconnect).
skip_waiting
Cancel all waiting writes.
reconnect($enable, [$delay_ms], [$max_attempts])
Configure reconnection.
reconnect_enabled
Returns true if reconnect is enabled.
connect_timeout([$ms])
Get/set connect timeout.
ping_interval([$ms])
Get/set PING interval.
max_pings_outstanding([$num])
Get/set max outstanding PINGs.
priority([$num])
Get/set EV watcher priority.
keepalive([$seconds])
Get/set TCP keepalive.
batch($coderef)
Batch multiple publishes into a single write. Suppresses per-publish write scheduling; all buffered data is flushed after the coderef returns.
$nats->batch(sub {
$nats->publish("foo.$_", "msg-$_") for 1..1000;
});
slow_consumer($bytes_threshold, [$cb])
Enable slow consumer detection. When the write buffer exceeds $bytes_threshold bytes, $cb is called with the current buffer size.
$nats->slow_consumer(1024*1024, sub {
my ($pending_bytes) = @_;
warn "slow consumer: ${pending_bytes}B pending\n";
});
on_lame_duck([$cb])
Get/set callback for lame duck mode. Fired when the server signals it's shutting down (leaf node / rolling restart). Use this to migrate to another server.
nkey_seed($seed)
Set NKey seed for Ed25519 authentication (requires OpenSSL at build time). The seed is a base32-encoded NATS NKey. The server nonce from INFO is automatically signed during CONNECT.
$nats->nkey_seed('SUAM...');
Or via constructor: nkey_seed => 'SUAM...'.
jwt($token)
Set user JWT for authentication. Combined with nkey_seed for NATS decentralized auth.
tls($enable, [$ca_file], [$skip_verify])
Configure TLS (requires OpenSSL at build time).
$nats->tls(1); # system CA
$nats->tls(1, '/path/to/ca.pem'); # custom CA
$nats->tls(1, undef, 1); # skip verification
Or via constructor: tls => 1, tls_ca_file => $path.
stats
Returns a hash of connection statistics:
my %s = $nats->stats;
# msgs_in, msgs_out, bytes_in, bytes_out
reset_stats
Reset all stats counters to zero.
on_error([$cb])
on_connect([$cb])
on_disconnect([$cb])
Get/set handler callbacks.
BENCHMARKS
Measured on Linux with TCP loopback, Perl 5.40, nats-server 2.12, 100-byte payloads (bench/benchmark.pl):
100K msgs 200K msgs
PUB fire-and-forget 4.7M 5.0M msgs/sec
PUB + SUB (loopback) 1.8M 1.6M msgs/sec
PUB + SUB (8B payload) 2.2M 1.9M msgs/sec
REQ/REP (pipelined, 128) 334K msgs/sec
Connected-path publish appends directly to the write buffer with no per-message allocation. Write coalescing via ev_prepare batches all publishes per event-loop iteration into a single write() syscall.
Run perl bench/benchmark.pl for full results. Set BENCH_MESSAGES, BENCH_PAYLOAD, BENCH_HOST, BENCH_PORT to customize.
NATS PROTOCOL
This module implements the NATS client protocol directly in XS. The protocol is text-based with CRLF-delimited control lines and binary payloads.
Connection flow: server sends INFO, client sends CONNECT + PING, server responds with PONG to confirm. All subscriptions (including queue groups and auto-unsub state) are automatically restored on reconnect.
Request/reply uses a single wildcard inbox subscription (_INBOX.<random>.*) for all requests, with unique suffixes per request.
CAVEATS
DNS resolution via
getaddrinfois blocking. Use numeric IP addresses for latency-sensitive applications.TLS requires OpenSSL headers at build time (auto-detected).
NKey auth requires OpenSSL with Ed25519 support (1.1.1+).
The module handles all data as bytes. Encode UTF-8 strings before passing them.
ENVIRONMENT
- TEST_NATS_HOST, TEST_NATS_PORT
-
Set these to run the test suite against a NATS server (default: 127.0.0.1:4222).
SEE ALSO
EV, NATS protocol, nats-server
AUTHOR
vividsnow
LICENSE
This library is free software; you can redistribute it and/or modify it under the same terms as Perl itself.