NAME
EV::Nats::JetStream - JetStream API client for EV::Nats
SYNOPSIS
use EV;
use EV::Nats;
use EV::Nats::JetStream;
my $nats = EV::Nats->new(host => '127.0.0.1');
my $js = EV::Nats::JetStream->new(nats => $nats);
$js->stream_create({ name => 'ORDERS', subjects => ['orders.>'] },
    sub {
        my ($info, $err) = @_;
        die $err if $err;
        $js->js_publish('orders.new', '{"item":"widget"}', sub {
            my ($ack, $err) = @_;
            die $err if $err;
            print "stored at seq=$ack->{seq}\n";
        });
    });
EV::run;
DESCRIPTION
Thin async wrapper over the JetStream $JS.API.* request/reply endpoints. Each method is a single request whose callback is invoked with the decoded JSON response (or an error string). The $nats connection passed to "new" handles all the actual I/O.
EV::Nats::KV and EV::Nats::ObjectStore build on top of this module -- see those for higher-level KV / blob APIs.
METHODS
All methods are async. Callbacks fire on the EV loop.
new(%opts)
my $js = EV::Nats::JetStream->new(
    nats    => $nats,
    prefix  => '$JS.API',   # default API subject prefix
    timeout => 5000,        # ms; default 5000
);
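The prefix option is mostly relevant when the server uses a JetStream domain, where the API subjects live under $JS.<domain>.API rather than $JS.API. A minimal sketch of how a prefix and endpoint tokens combine into a request subject; api_subject is a hypothetical helper for illustration, not a function of this module, and "hub" is an example domain name:

```perl
use strict;
use warnings;

# Hypothetical helper: join an API prefix and endpoint tokens into a subject.
sub api_subject {
    my ($prefix, @tokens) = @_;
    return join '.', $prefix, @tokens;
}

# Single quotes keep Perl from interpolating "$JS" as a variable.
my $default = api_subject('$JS.API',     'STREAM', 'INFO', 'ORDERS');
my $domain  = api_subject('$JS.hub.API', 'STREAM', 'INFO', 'ORDERS');

print "$default\n$domain\n";
```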
Stream management
stream_create($config, $cb)
Create a stream. $config is passed verbatim as the StreamConfig request body. Callback: ($info, $err).
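Because $config is sent as-is, its keys follow the JetStream StreamConfig field names, and durations such as max_age are expressed in nanoseconds. A sketch of a somewhat fuller config than the one in the SYNOPSIS; the specific limits are arbitrary example values, and the JSON encoding step is only there to show the resulting request body shape, not how this module serializes it:

```perl
use strict;
use warnings;
use JSON::PP ();

my $NS_PER_SEC = 1_000_000_000;

my %config = (
    name         => 'ORDERS',
    subjects     => ['orders.>'],
    storage      => 'file',                     # or 'memory'
    retention    => 'limits',                   # or 'interest' / 'workqueue'
    max_msgs     => 100_000,                    # example limit
    max_age      => 24 * 3600 * $NS_PER_SEC,    # one day, in nanoseconds
    num_replicas => 1,
);

# canonical() sorts keys so the printed body is stable between runs.
my $json = JSON::PP->new->canonical->encode(\%config);
print "$json\n";
```

The same hashref would be handed to stream_create or stream_update as the first argument.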
stream_update($config, $cb)
Update an existing stream. Same shape as stream_create.
stream_delete($name, $cb)
Delete the stream by name.
stream_info($name, [\%opts], $cb)
Fetch stream config and state. Optional \%opts may include subjects_filter (e.g. '>') to populate state.subjects; without it the server omits that field for performance.
stream_list($cb)
List all streams' info.
stream_purge($name, $cb)
Purge all messages from the stream.
stream_msg_get($stream, \%opts, $cb)
Fetch a single message from $stream. \%opts selects the message:
{ seq => $n } # by sequence number
{ last_by_subj => $subject } # latest matching subject
{ next_by_subj => $subject, seq => $start } # next at-or-after $start
The message body and headers in the response are base64-encoded under $resp->{message}{data} and $resp->{message}{hdrs}.
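Decoding those two fields needs only the core MIME::Base64 module. A minimal sketch, assuming the response has the shape described above; decode_stored_message is a hypothetical helper, and the $resp literal stands in for what the callback would receive:

```perl
use strict;
use warnings;
use MIME::Base64 qw(decode_base64 encode_base64);

# Hypothetical helper: return ($body, $raw_headers) from a decoded response.
# $raw_headers is undef when the stored message has no headers.
sub decode_stored_message {
    my ($resp) = @_;
    my $msg = $resp->{message} or return;
    my $body = decode_base64($msg->{data} // '');
    my $hdrs = defined $msg->{hdrs} ? decode_base64($msg->{hdrs}) : undef;
    return ($body, $hdrs);
}

# Stand-in response, built here only so the sketch runs without a server.
my $resp = {
    message => {
        subject => 'orders.new',
        seq     => 1,
        data    => encode_base64('{"item":"widget"}', ''),
        hdrs    => encode_base64("NATS/1.0\r\nX-Trace: abc\r\n\r\n", ''),
    },
};

my ($body, $hdrs) = decode_stored_message($resp);
print "$body\n";
```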
Consumer management
consumer_create($stream, $config, $cb)
Create a consumer (push or pull). $config is the consumer config hashref; durable_name makes it durable, ack_policy controls ack semantics.
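As with streams, the config keys follow the JetStream ConsumerConfig field names. A sketch of a durable pull consumer config, assuming the usual server rule that a consumer without deliver_subject is pull-mode; pull_consumer_config and its argument names are hypothetical, and the 30-second ack_wait is an example value:

```perl
use strict;
use warnings;

my $NS_PER_SEC = 1_000_000_000;

# Hypothetical helper: build a durable pull-consumer config hashref.
sub pull_consumer_config {
    my (%args) = @_;
    return {
        durable_name => $args{name},
        ack_policy   => 'explicit',                          # 'none' | 'all' | 'explicit'
        ack_wait     => ($args{ack_wait_s} // 30) * $NS_PER_SEC,   # nanoseconds
        max_deliver  => $args{max_deliver} // -1,            # -1 = unlimited
        # Only include filter_subject when a filter was requested.
        (defined $args{filter} ? (filter_subject => $args{filter}) : ()),
    };
}

my $config = pull_consumer_config(name => 'worker', filter => 'orders.new');
print join(', ', map { "$_=$config->{$_}" } sort keys %$config), "\n";
```

The resulting hashref would be passed as $config to consumer_create.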
consumer_delete($stream, $consumer, $cb)
consumer_info($stream, $consumer, $cb)
consumer_list($stream, $cb)
Publishing and fetching
js_publish($subject, $payload, $cb)
Publish with JetStream acknowledgment. Callback: ($ack, $err) where $ack is { stream, seq, duplicate }.
fetch($stream, $consumer, \%opts, $cb)
Pull messages from a pull-mode consumer. Options:
batch
Maximum number of messages to fetch (default 1).
expires
Server-side wait time in nanoseconds (default 5_000_000_000 = 5s).
no_wait
If true, return immediately if no messages are currently available.
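Since expires is in nanoseconds, it is easy to pass a value that is off by several orders of magnitude. A small sketch that builds the options hashref from seconds; fetch_opts and its wait_seconds argument are hypothetical conveniences, not part of this module:

```perl
use strict;
use warnings;

# Hypothetical helper: build fetch options, taking the wait time in seconds.
sub fetch_opts {
    my (%args) = @_;
    my %opts = (
        batch   => $args{batch} // 1,
        expires => int(($args{wait_seconds} // 5) * 1_000_000_000),
    );
    $opts{no_wait} = 1 if $args{no_wait};
    return \%opts;
}

my $opts = fetch_opts(batch => 10, wait_seconds => 2.5);
print "batch=$opts->{batch} expires=$opts->{expires}\n";
```

The returned hashref would go in the \%opts position of fetch.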
Callback: (\@messages, $err). Each message is a hashref:
{
    subject => 'orders.new',
    payload => '...',
    reply   => '$JS.ACK....',   # for explicit ack/nak/wpi
    headers => "...",           # raw NATS/1.0 header block, or undef
}
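The headers field is left unparsed. A minimal sketch of turning the raw block into a hash, assuming the standard NATS layout: a NATS/1.0 line (optionally carrying a status code), then CRLF-separated "Name: value" lines. parse_nats_headers is a hypothetical helper, not part of this module:

```perl
use strict;
use warnings;

# Hypothetical helper: parse a raw NATS/1.0 header block.
# Returns { status => $code_or_undef, headers => { name => [values...] } },
# or nothing when there is no header block.
sub parse_nats_headers {
    my ($raw) = @_;
    return unless defined $raw && length $raw;

    my ($first, @lines) = split /\r\n/, $raw;
    my %out = (status => undef, headers => {});

    # The first line is "NATS/1.0", optionally followed by a 3-digit status.
    $out{status} = $1 if $first =~ m{^NATS/1\.0\s+(\d{3})};

    for my $line (@lines) {
        next unless $line =~ /^([^:]+):\s*(.*)$/;
        push @{ $out{headers}{$1} }, $2;    # a header name may repeat
    }
    return \%out;
}

my $parsed = parse_nats_headers("NATS/1.0\r\nNats-Msg-Id: a1\r\nX-Tag: a\r\nX-Tag: b\r\n\r\n");
print "X-Tag: @{ $parsed->{headers}{'X-Tag'} }\n";
```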
To acknowledge a message:
$nats->publish($msg->{reply}, '+ACK');   # success
$nats->publish($msg->{reply}, '-NAK');   # negative -- redeliver after ack_wait
$nats->publish($msg->{reply}, '+WPI');   # work-in-progress -- extend ack_wait
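One way to apply this to a whole fetched batch is sketched below: run a handler per message, then ack on success and nak when the handler dies. process_batch is a hypothetical helper; it relies only on the publish call shown above. The Local::RecordingNats package and the reply subjects are stand-ins so the sketch runs without a server:

```perl
use strict;
use warnings;

# Hypothetical helper: ack messages the handler accepts, nak the rest.
sub process_batch {
    my ($nats, $messages, $handler) = @_;
    for my $msg (@$messages) {
        my $ok = eval { $handler->($msg); 1 };
        warn "handler failed for $msg->{subject}: $@" unless $ok;
        $nats->publish($msg->{reply}, $ok ? '+ACK' : '-NAK');
    }
}

{
    # Demo-only stand-in for a connection: records what would be published.
    package Local::RecordingNats;
    sub new     { bless { sent => [] }, shift }
    sub publish { my ($self, @args) = @_; push @{ $self->{sent} }, [@args] }
}

my $nats  = Local::RecordingNats->new;
my @batch = (
    { subject => 'orders.new', payload => 'good', reply => 'reply.1' },
    { subject => 'orders.new', payload => 'bad',  reply => 'reply.2' },
);
process_batch($nats, \@batch, sub { die "bad payload\n" if $_[0]{payload} eq 'bad' });
print "$_->[0] $_->[1]\n" for @{ $nats->{sent} };
```

With a real connection, the same call would sit inside the fetch callback, after checking $err.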
INTERNAL
These are exposed for sibling modules (EV::Nats::KV, EV::Nats::ObjectStore) -- not part of the end-user API and subject to change.
decode_json_or_error($json)
Decode $json. Returns ($decoded, $error_or_undef); when set, the error string already includes a "JSON decode error: " prefix. Gates on $@ rather than on the decoded value, so falsy-but-valid JSON (null, 0, false, "") is reported as a clean decode rather than a failure.
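The distinction matters because a check like "decode or fail" would misreport null or 0 as errors. A sketch of the same idea using the core JSON::PP module; safe_decode is a hypothetical stand-in written for illustration, not the module's actual implementation:

```perl
use strict;
use warnings;
use JSON::PP ();

# allow_nonref lets top-level scalars such as null or 0 decode.
my $codec = JSON::PP->new->utf8->allow_nonref;

# Hypothetical stand-in: success is decided by $@, never by the value.
sub safe_decode {
    my ($json) = @_;
    local $@;
    my $decoded = eval { $codec->decode($json) };
    return (undef, "JSON decode error: $@") if $@;
    return ($decoded, undef);
}

for my $input ('null', '0', 'false', '""', '{"seq":') {
    my ($value, $err) = safe_decode($input);
    printf "%-8s => %s\n", $input, defined $err ? 'error' : 'ok';
}
```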
msg_is_tombstone($msg)
True if a STREAM.MSG.GET response message carries a KV-Operation: DEL or KV-Operation: PURGE header. $msg is the decoded message hash from the response. Used by EV::Nats::KV and EV::Nats::ObjectStore to surface deleted/purged entries as clean misses rather than as malformed payloads.
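A rough sketch of such a check, assuming the message hash carries its headers base64-encoded under hdrs as described for stream_msg_get; looks_like_tombstone is a hypothetical illustration and may differ from what msg_is_tombstone actually does:

```perl
use strict;
use warnings;
use MIME::Base64 qw(decode_base64 encode_base64);

# Hypothetical illustration: true when the header block marks a KV delete or purge.
sub looks_like_tombstone {
    my ($msg) = @_;
    return 0 unless defined $msg->{hdrs};
    my $raw = decode_base64($msg->{hdrs});
    # /m lets ^ and $ match per header line; \s* absorbs the trailing CR.
    return $raw =~ /^KV-Operation:\s*(?:DEL|PURGE)\s*$/m ? 1 : 0;
}

my $deleted = { hdrs => encode_base64("NATS/1.0\r\nKV-Operation: DEL\r\n\r\n", '') };
my $live    = { data => encode_base64('value', '') };

print "deleted: ", looks_like_tombstone($deleted), "\n";
print "live: ",    looks_like_tombstone($live), "\n";
```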
SEE ALSO
EV::Nats, EV::Nats::KV, EV::Nats::ObjectStore, JetStream API reference.