NAME
EV::Nats::JetStream - JetStream API for EV::Nats
SYNOPSIS
use EV::Nats;
use EV::Nats::JetStream;
my $nats = EV::Nats->new(host => '127.0.0.1', ...);
my $js = EV::Nats::JetStream->new(nats => $nats);
# Create stream
$js->stream_create({
name => 'ORDERS',
subjects => ['orders.>'],
}, sub {
my ($info, $err) = @_;
die $err if $err;
print "stream created: $info->{config}{name}\n";
});
# Publish with ack
$js->js_publish('orders.new', '{"item":"widget"}', sub {
my ($ack, $err) = @_;
die $err if $err;
print "published: seq=$ack->{seq}\n";
});
METHODS
new(%opts)
my $js = EV::Nats::JetStream->new(
nats => $nats,
prefix => '$JS.API', # JetStream API subject prefix (this is the default)
timeout => 5000, # timeout in milliseconds
);
stream_create($config, $cb)
stream_update($config, $cb)
stream_delete($name, $cb)
stream_info($name, $cb)
stream_list($cb)
stream_purge($name, $cb)
consumer_create($stream, $config, $cb)
consumer_delete($stream, $consumer, $cb)
consumer_info($stream, $consumer, $cb)
consumer_list($stream, $cb)
js_publish($subject, $payload, $cb)
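Publish $payload to $subject and request a JetStream acknowledgment. The callback is invoked with ($ack, $err); on success, $ack->{seq} holds the sequence number reported in the acknowledgment.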
fetch($stream, $consumer, \%opts, $cb)
Pull messages from a consumer. The \%opts hash reference accepts the keys batch and expires.