NAME

EV::Nats::KV - Key-Value store on top of NATS JetStream

SYNOPSIS

use EV;
use EV::Nats;
use EV::Nats::JetStream;
use EV::Nats::KV;

my $nats = EV::Nats->new(host => '127.0.0.1');
my $js   = EV::Nats::JetStream->new(nats => $nats);
my $kv   = EV::Nats::KV->new(js => $js, bucket => 'config');

$kv->create_bucket({}, sub {
    $kv->put('app.setting', 'on', sub {
        $kv->get('app.setting', sub {
            my ($val, $err) = @_;
            print "got: $val\n";
        });
    });
});

# Live updates
$kv->watch('app.>', sub {
    my ($key, $value, $op) = @_;
    print "$op $key = $value\n";
});

EV::run;

DESCRIPTION

A KV bucket is a JetStream stream named KV_<bucket> with subjects $KV.<bucket>.>, history of 1, rollup headers allowed, and deletes denied (purge tombstones are used instead). This module wraps that convention: put/get become single calls that hide the underlying js_publish + STREAM.MSG.GET dance.

METHODS

All callbacks fire on the EV loop, not synchronously.

new(js => $js, bucket => $name, [timeout => $ms])

The bucket need not exist yet; call "create_bucket" first to provision it. timeout defaults to the timeout of $js.

get($key, $cb)

Fetch the latest value for $key. Callback: ($value, $err). $value is undef if the key does not exist, or has been deleted or purged (the tombstone is recognised and surfaces as a clean miss).

put($key, $value, $cb)

Set $key to $value. Callback: ($seq, $err), where $seq is the JetStream sequence number assigned by the server.

create($key, $value, $cb)

Like put, but only succeeds if $key does not yet exist. Uses Nats-Expected-Last-Subject-Sequence: 0; concurrent creators see a wrong-last-sequence error from the server. Callback: ($seq, $err).

delete($key, [$cb])

Mark $key as deleted by publishing a KV-Operation: DEL tombstone, followed by a flush fence so a subsequent get won't race the publish. Callback: ($ok, $err); $err is set if the connection dropped before the PONG arrived.

purge($key, [$cb])

Like delete, but emits Nats-Rollup: sub too -- the prior history of $key is rolled up and replaced by the tombstone, freeing storage. Callback: ($ok, $err).

keys($cb)

List all keys currently stored in the bucket. Callback: (\@keys, $err). Tombstoned keys are not filtered out -- check with get if needed.

watch($pattern, $cb)

Subscribe to live changes. $pattern is a NATS subject suffix relative to the bucket (e.g. > for all keys, app.> for everything under app.). Callback receives ($key, $value, $op) where $op is PUT, DEL, or PURGE. Returns the underlying subscription id; pass to "unsubscribe" in EV::Nats to stop.

create_bucket(\%opts, $cb)

Provision the underlying stream. Recognised \%opts:

  • max_history - default 1; how many revisions to keep per key.

  • max_bytes - bucket-wide storage cap.

  • max_age - per-message TTL in nanoseconds.

  • replicas - cluster replication factor.

Callback: ($info, $err).

delete_bucket($cb)

Tear down the underlying stream. Callback: ($info, $err).

status($cb)

Returns a snapshot hashref:

{ bucket => $name, values => $count, bytes => $n, history => $h }

Callback: (\%status, $err).

SEE ALSO

EV::Nats, EV::Nats::JetStream, EV::Nats::ObjectStore, NATS KV documentation.