NAME

EV::Nats::JetStream - JetStream API for EV::Nats

SYNOPSIS

use EV::Nats;
use EV::Nats::JetStream;

my $nats = EV::Nats->new(host => '127.0.0.1', ...);
my $js = EV::Nats::JetStream->new(nats => $nats);

# Create stream
$js->stream_create({
    name     => 'ORDERS',
    subjects => ['orders.>'],
}, sub {
    my ($info, $err) = @_;
    die $err if $err;
    print "stream created: $info->{config}{name}\n";
});

# Publish with ack
$js->js_publish('orders.new', '{"item":"widget"}', sub {
    my ($ack, $err) = @_;
    die $err if $err;
    print "published: seq=$ack->{seq}\n";
});

METHODS

new(%opts)

my $js = EV::Nats::JetStream->new(
    nats    => $nats,
    prefix  => '$JS.API',   # default
    timeout => 5000,         # ms
);

stream_create($config, $cb)

stream_update($config, $cb)

stream_delete($name, $cb)

stream_info($name, $cb)

stream_list($cb)

stream_purge($name, $cb)

consumer_create($stream, $config, $cb)

consumer_delete($stream, $consumer, $cb)

consumer_info($stream, $consumer, $cb)

consumer_list($stream, $cb)

js_publish($subject, $payload, $cb)

Publish with JetStream acknowledgment.

fetch($stream, $consumer, \%opts, $cb)

Pull messages from a consumer. Options: batch, expires.