NAME

EV::Nats::JetStream - JetStream API client for EV::Nats

SYNOPSIS

use EV;
use EV::Nats;
use EV::Nats::JetStream;

my $nats = EV::Nats->new(host => '127.0.0.1');
my $js   = EV::Nats::JetStream->new(nats => $nats);

$js->stream_create({ name => 'ORDERS', subjects => ['orders.>'] }, sub {
    my ($info, $err) = @_;
    die $err if $err;
    $js->js_publish('orders.new', '{"item":"widget"}', sub {
        my ($ack, $err) = @_;
        die $err if $err;   # $ack is undefined when the publish fails
        print "stored at seq=$ack->{seq}\n";
    });
});

EV::run;

DESCRIPTION

Thin async wrapper over the JetStream $JS.API.* request/reply endpoints. Each method issues a single request; its callback receives the decoded JSON response, or an error string on failure. The $nats connection passed to "new" handles all the actual I/O.

EV::Nats::KV and EV::Nats::ObjectStore build on top of this module -- see those for higher-level KV / blob APIs.

METHODS

All methods are async. Callbacks fire on the EV loop.

new(%opts)

my $js = EV::Nats::JetStream->new(
    nats    => $nats,
    prefix  => '$JS.API',   # default API subject prefix
    timeout => 5000,        # ms; default 5000
);
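
As an illustration of what the prefix option affects, here is a minimal sketch of how JetStream API request subjects are generally formed: the prefix followed by dot-separated endpoint tokens. The helper name api_subject and the domain name "hub" are hypothetical, and whether this module composes subjects exactly this way is an assumption.

```perl
use strict;
use warnings;

# Hypothetical helper, not part of EV::Nats::JetStream: join an API prefix
# and the tokens of a JetStream endpoint into a request subject.
sub api_subject {
    my ($prefix, @tokens) = @_;
    return join '.', $prefix, @tokens;
}

# Single quotes keep Perl from interpolating "$JS" as a variable.
my $default = api_subject('$JS.API',     'STREAM', 'INFO', 'ORDERS');
my $domain  = api_subject('$JS.hub.API', 'STREAM', 'INFO', 'ORDERS');

print "$default\n";   # $JS.API.STREAM.INFO.ORDERS
print "$domain\n";    # $JS.hub.API.STREAM.INFO.ORDERS
```

A prefix of the form $JS.<domain>.API is how JetStream domains are usually addressed, which is the typical reason to override the default.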

Stream management

stream_create($config, $cb)

Create a stream. $config is passed verbatim as the StreamConfig request body. Callback: ($info, $err).

stream_update($config, $cb)

Update an existing stream. Same shape as stream_create.

stream_delete($name, $cb)

Delete the stream by name.

stream_info($name, [\%opts], $cb)

Fetch the stream's config and state. The optional \%opts may include subjects_filter (e.g. '>') to populate state.subjects; without it the server omits that field for performance.

stream_list($cb)

List info for all streams.

stream_purge($name, $cb)

Purge all messages from the stream.

stream_msg_get($stream, \%opts, $cb)

Fetch a single message from $stream. \%opts selects the message:

{ seq => $n }                                # by sequence number
{ last_by_subj => $subject }                 # latest message on $subject
{ next_by_subj => $subject, seq => $start }  # next message on $subject at or after seq $start

The message body and headers in the response are base64-encoded under $resp->{message}{data} and $resp->{message}{hdrs}.
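
A minimal sketch of decoding those two fields with the core MIME::Base64 module. The $resp hash below is a made-up stand-in for a real response, and decode_stored_msg is a hypothetical helper, not part of this module.

```perl
use strict;
use warnings;
use MIME::Base64 qw(encode_base64 decode_base64);

# Stand-in for a decoded STREAM.MSG.GET response; all values are made up.
my $resp = {
    message => {
        subject => 'orders.new',
        seq     => 1,
        data    => encode_base64('{"item":"widget"}', ''),
        hdrs    => encode_base64("NATS/1.0\r\nNats-Msg-Id: abc\r\n\r\n", ''),
    },
};

# Hypothetical helper: return the message with body and headers decoded.
sub decode_stored_msg {
    my ($msg) = @_;
    return {
        subject => $msg->{subject},
        seq     => $msg->{seq},
        # Either field may be missing (empty body, no headers), so guard both.
        payload => defined $msg->{data} ? decode_base64($msg->{data}) : '',
        headers => defined $msg->{hdrs} ? decode_base64($msg->{hdrs}) : undef,
    };
}

my $decoded = decode_stored_msg($resp->{message});
print "$decoded->{subject}: $decoded->{payload}\n";
```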

Consumer management

consumer_create($stream, $config, $cb)

Create a consumer (push or pull). $config is the consumer config hashref; durable_name makes it durable, ack_policy controls ack semantics.
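
For illustration, a sketch of a durable pull-consumer config. durable_name and ack_policy come from the description above; ack_wait (in nanoseconds) and filter_subject are standard JetStream consumer config fields, but the helper name, its arguments, and the 30-second fallback are assumptions made for this example.

```perl
use strict;
use warnings;
use JSON::PP ();

# Hypothetical helper: build a durable pull-consumer config hashref.
sub pull_consumer_config {
    my (%arg) = @_;
    return {
        durable_name => $arg{name},
        ack_policy   => 'explicit',   # every message must be acked individually
        ack_wait     => ($arg{ack_wait_s} // 30) * 1_000_000_000,   # seconds -> ns
        # Only include filter_subject when a filter was requested.
        (defined $arg{filter} ? (filter_subject => $arg{filter}) : ()),
    };
}

my $config = pull_consumer_config(name => 'worker', filter => 'orders.new');

# Show the JSON body such a config would serialize to.
print JSON::PP->new->canonical->encode($config), "\n";
```

A push consumer would additionally set deliver_subject; a pull consumer leaves it out.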

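consumer_delete($stream, $consumer, $cb)

Delete the named consumer from $stream.

consumer_info($stream, $consumer, $cb)

Fetch the named consumer's config and state.

consumer_list($stream, $cb)

List the consumers on $stream.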

Publishing and fetching

js_publish($subject, $payload, $cb)

Publish with JetStream acknowledgment. Callback: ($ack, $err) where $ack is { stream, seq, duplicate }.
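
A small sketch of a callback body that handles all three outcomes (error, duplicate, stored). describe_ack is a hypothetical helper and the sample values are made up; it is called directly here so the example runs without a server.

```perl
use strict;
use warnings;

# Hypothetical helper: turn the ($ack, $err) callback arguments into a message.
sub describe_ack {
    my ($ack, $err) = @_;
    return "publish failed: $err" if $err;
    return "duplicate of seq=$ack->{seq} in $ack->{stream}" if $ack->{duplicate};
    return "stored in $ack->{stream} at seq=$ack->{seq}";
}

print describe_ack({ stream => 'ORDERS', seq => 7 }, undef), "\n";
print describe_ack({ stream => 'ORDERS', seq => 7, duplicate => 1 }, undef), "\n";
print describe_ack(undef, 'timeout'), "\n";
```

Inside a real callback this would be used as `sub { print describe_ack(@_), "\n" }`.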

fetch($stream, $consumer, \%opts, $cb)

Pull messages from a pull-mode consumer. Options:

batch

Maximum number of messages to fetch (default 1).

expires

Server-side wait time in nanoseconds (default 5_000_000_000 = 5s).

no_wait

If true, return immediately if no messages are currently available.
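
Because expires is given in nanoseconds, a small conversion helper avoids counting zeros. The sketch below is hypothetical (fetch_opts and its wait_s argument are not part of this module) and, for simplicity, treats no_wait and expires as alternatives.

```perl
use strict;
use warnings;

# Hypothetical helper: build the \%opts hashref for fetch() from
# human-friendly arguments.
sub fetch_opts {
    my (%arg) = @_;
    my %opts = (batch => $arg{batch} // 1);
    if ($arg{no_wait}) {
        $opts{no_wait} = 1;
    }
    else {
        # Convert seconds to nanoseconds; falls back to 5 seconds.
        $opts{expires} = int(($arg{wait_s} // 5) * 1_000_000_000);
    }
    return \%opts;
}

my $opts = fetch_opts(batch => 10, wait_s => 2.5);
print "batch=$opts->{batch} expires=$opts->{expires}\n";
```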

Callback: (\@messages, $err). Each message is a hashref:

{
    subject => 'orders.new',
    payload => '...',
    reply   => '$JS.ACK....',  # for explicit ack/nak/wpi
    headers => "...",          # raw NATS/1.0 header block, or undef
}
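
Since headers arrives as a raw block, callers that need individual header values have to split it themselves. A minimal sketch, assuming the usual NATS/1.0 layout (a status line, then "Name: value" lines separated by CRLF); parse_headers is a hypothetical helper, not part of this module.

```perl
use strict;
use warnings;

# Hypothetical helper: parse a raw NATS/1.0 header block into a hash of
# arrayrefs (a header name may appear more than once).
sub parse_headers {
    my ($raw) = @_;
    return {} unless defined $raw;
    # The first line is the "NATS/1.0" status line; the rest are headers.
    my ($status_line, @lines) = split /\r\n/, $raw;
    my %headers;
    for my $line (@lines) {
        next unless $line =~ /^([^:]+):\s*(.*)$/;
        push @{ $headers{$1} }, $2;
    }
    return \%headers;
}

my $h = parse_headers("NATS/1.0\r\nNats-Stream: ORDERS\r\nX-Tag: a\r\nX-Tag: b\r\n\r\n");
print "stream=$h->{'Nats-Stream'}[0] tags=@{ $h->{'X-Tag'} }\n";
```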

To acknowledge a message:

$nats->publish($msg->{reply}, '+ACK');   # success
$nats->publish($msg->{reply}, '-NAK');   # negative -- ask the server to redeliver
$nats->publish($msg->{reply}, '+WPI');   # work in progress -- extend ack_wait

INTERNAL

These are exposed for sibling modules (EV::Nats::KV, EV::Nats::ObjectStore) -- not part of the end-user API and subject to change.

decode_json_or_error($json)

Decode $json. Returns ($decoded, $error_or_undef); the error string already includes a "JSON decode error: " prefix when set. Gates on $@ so falsy-but-valid JSON (null, 0, false, empty string) is reported as a clean decode rather than a failure.
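
To show why gating on $@ matters, here is a sketch of the same idea using the core JSON::PP module. It is not this module's implementation; sketch_decode_json is a hypothetical name and the choice of JSON::PP is an assumption.

```perl
use strict;
use warnings;
use JSON::PP ();

# Hypothetical re-creation of the technique: success is decided by $@,
# not by the truthiness of the decoded value.
sub sketch_decode_json {
    my ($json) = @_;
    local $@;
    my $decoded = eval { JSON::PP->new->utf8->allow_nonref->decode($json) };
    return (undef, "JSON decode error: $@") if $@;
    return ($decoded, undef);
}

for my $input ('0', 'null', '{"ok":true}', '{broken') {
    my ($value, $err) = sketch_decode_json($input);
    print defined $err ? "error for $input\n" : "decoded $input\n";
}
```

Testing `if (!$decoded)` instead would wrongly report '0' and 'null' as failures.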

msg_is_tombstone($msg)

True if a STREAM.MSG.GET response message carries a KV-Operation: DEL or KV-Operation: PURGE header. $msg is the decoded message hash from the response. Used by EV::Nats::KV and EV::Nats::ObjectStore to surface deleted/purged entries as clean misses rather than as malformed payloads.
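
A sketch of how such a check could look, assuming $msg->{hdrs} is still the base64-encoded header block described under stream_msg_get. The function name and the exact matching rule are assumptions, not this module's code.

```perl
use strict;
use warnings;
use MIME::Base64 qw(encode_base64 decode_base64);

# Hypothetical re-creation: true when the header block carries
# "KV-Operation: DEL" or "KV-Operation: PURGE".
sub sketch_is_tombstone {
    my ($msg) = @_;
    return 0 unless defined $msg->{hdrs};
    my $raw = decode_base64($msg->{hdrs});
    return $raw =~ /^KV-Operation:\s*(?:DEL|PURGE)\s*$/m ? 1 : 0;
}

my $deleted = { hdrs => encode_base64("NATS/1.0\r\nKV-Operation: DEL\r\n\r\n", '') };
my $live    = { data => encode_base64('value', '') };

print "deleted: ", sketch_is_tombstone($deleted), "\n";   # deleted: 1
print "live: ",    sketch_is_tombstone($live),    "\n";   # live: 0
```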

SEE ALSO

EV::Nats, EV::Nats::KV, EV::Nats::ObjectStore, JetStream API reference.