NAME
EV::Nats::KV - Key-Value store on top of NATS JetStream
SYNOPSIS
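use strict;
use warnings;
use EV;
use EV::Nats;
use EV::Nats::JetStream;
use EV::Nats::KV;

my $nats = EV::Nats->new(host => '127.0.0.1');
my $js   = EV::Nats::JetStream->new(nats => $nats);
my $kv   = EV::Nats::KV->new(js => $js, bucket => 'config');

# Provision the bucket, then write a key and read it back.
$kv->create_bucket({}, sub {
    my ($info, $err) = @_;
    return warn "create_bucket failed: $err\n" if $err;
    $kv->put('app.setting', 'on', sub {
        my ($seq, $err) = @_;
        return warn "put failed: $err\n" if $err;
        $kv->get('app.setting', sub {
            my ($val, $err) = @_;
            return warn "get failed: $err\n" if $err;
            print "got: $val\n";
        });
    });
});

# Live updates
$kv->watch('app.>', sub {
    my ($key, $value, $op) = @_;
    # Guard against an undefined $value (assumed possible for DEL/PURGE events).
    print "$op $key = ", $value // '', "\n";
});

EV::run;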
DESCRIPTION
A KV bucket is a JetStream stream named KV_<bucket> with subjects $KV.<bucket>.>, history of 1, rollup headers allowed, and deletes denied (purge tombstones are used instead). This module wraps that convention: put/get become single calls that hide the underlying js_publish + STREAM.MSG.GET dance.
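The naming convention above can be sketched in plain Perl. The helper names below (kv_stream_name, kv_subject, kv_key_from_subject) are hypothetical and are not part of EV::Nats::KV; they only illustrate how a bucket and key map onto a stream name and subject.

```perl
use strict;
use warnings;

# Hypothetical helpers illustrating the KV naming convention;
# none of these are provided by EV::Nats::KV.

# Stream backing a bucket: KV_<bucket>
sub kv_stream_name {
    my ($bucket) = @_;
    return "KV_$bucket";
}

# Subject a key is stored under: $KV.<bucket>.<key>
sub kv_subject {
    my ($bucket, $key) = @_;
    return "\$KV.$bucket.$key";
}

# Inverse mapping: recover the key from a subject, or return nothing
# if the subject does not belong to this bucket.
sub kv_key_from_subject {
    my ($bucket, $subject) = @_;
    my $prefix = "\$KV.$bucket.";
    return unless index($subject, $prefix) == 0;
    return substr($subject, length $prefix);
}

print kv_stream_name('config'), "\n";             # KV_config
print kv_subject('config', 'app.setting'), "\n";  # $KV.config.app.setting
```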
METHODS
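All callbacks are invoked asynchronously from the EV loop, not from within the method call itself, so the loop must be running (EV::run) for them to fire.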
new(js => $js, bucket => $name, [timeout => $ms])
The bucket need not exist yet; call "create_bucket" first to provision it. timeout defaults to the timeout of $js.
get($key, $cb)
Fetch the latest value for $key. Callback: ($value, $err). $value is undef if the key does not exist, or has been deleted or purged (the tombstone is recognised and surfaces as a clean miss).
put($key, $value, $cb)
Set $key to $value. Callback: ($seq, $err), where $seq is the JetStream sequence number assigned by the server.
create($key, $value, $cb)
Like put, but only succeeds if $key does not yet exist. Uses Nats-Expected-Last-Subject-Sequence: 0; concurrent creators see a wrong-last-sequence error from the server. Callback: ($seq, $err).
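One way to use create is a "create, or read what is already there" pattern. The sketch below is an assumption-laden illustration: create_or_get is a hypothetical helper, it relies only on the create and get callback signatures documented here, and it treats any create failure as a possible conflict because the exact error text returned by the server is not specified in this document.

```perl
use strict;
use warnings;

# Hypothetical helper, not part of EV::Nats::KV.
# Calls $cb->($value, $created, $err):
#   $created is 1 if this call created the key, 0 otherwise.
sub create_or_get {
    my ($kv, $key, $value, $cb) = @_;
    $kv->create($key, $value, sub {
        my ($seq, $err) = @_;
        return $cb->($value, 1, undef) unless $err;   # we won the race

        # create failed: assume the key may already exist and read it back.
        $kv->get($key, sub {
            my ($existing, $get_err) = @_;
            return $cb->($existing, 0, undef) if defined $existing;
            # Nothing stored either: report the get error, else the original one.
            $cb->(undef, 0, $get_err || $err);
        });
    });
}
```

The helper takes any object that offers create and get with the documented signatures, so it can be exercised against a stub as well as a real EV::Nats::KV instance.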
delete($key, [$cb])
Mark $key as deleted by publishing a KV-Operation: DEL tombstone, followed by a flush fence so a subsequent get won't race the publish. Callback: ($ok, $err); $err is set if the connection dropped before the PONG arrived.
purge($key, [$cb])
Like delete, but emits Nats-Rollup: sub too -- the prior history of $key is rolled up and replaced by the tombstone, freeing storage. Callback: ($ok, $err).
keys($cb)
List all keys currently stored in the bucket. Callback: (\@keys, $err). Tombstoned keys are not filtered out -- check with get if needed.
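Since keys does not filter tombstones, a caller that wants only live keys can combine it with get. The following is a minimal sketch under that assumption; live_keys is a hypothetical helper built only on the keys and get callbacks documented here, and it issues one get per key, which may be costly for large buckets.

```perl
use strict;
use warnings;

# Hypothetical helper, not part of EV::Nats::KV.
# Calls $cb->(\@live_keys, $err) with only those keys whose get()
# yields a defined value, in the order keys() reported them.
sub live_keys {
    my ($kv, $cb) = @_;
    $kv->keys(sub {
        my ($keys, $err) = @_;
        return $cb->(undef, $err) if $err;

        my @keys = @{ $keys || [] };
        return $cb->([], undef) unless @keys;

        my $pending = @keys;      # outstanding get() calls
        my (%live, $failed);
        for my $key (@keys) {
            $kv->get($key, sub {
                my ($value, $get_err) = @_;
                return if $failed;                 # already reported an error
                if ($get_err) {
                    $failed = 1;
                    return $cb->(undef, $get_err);
                }
                $live{$key} = 1 if defined $value; # undef means deleted/purged
                $cb->([ grep { $live{$_} } @keys ], undef) if --$pending == 0;
            });
        }
    });
}
```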
watch($pattern, $cb)
Subscribe to live changes. $pattern is a NATS subject suffix relative to the bucket (e.g. > for all keys, app.> for everything under app.). Callback receives ($key, $value, $op) where $op is PUT, DEL, or PURGE. Returns the underlying subscription id; pass it to "unsubscribe" in EV::Nats to stop watching.
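A common use of watch is keeping a local mirror of the bucket. The sketch below shows only the event-handling part; apply_change is a hypothetical helper (not part of this module) that applies one ($key, $value, $op) event to a plain hash.

```perl
use strict;
use warnings;

# Hypothetical helper: apply a single watch event to a local cache hash.
sub apply_change {
    my ($cache, $key, $value, $op) = @_;
    if ($op eq 'PUT') {
        $cache->{$key} = $value;
    }
    elsif ($op eq 'DEL' || $op eq 'PURGE') {
        delete $cache->{$key};
    }
    return $cache;
}

my %cache;
apply_change(\%cache, 'app.setting', 'on',  'PUT');
apply_change(\%cache, 'app.old',     'x',   'PUT');
apply_change(\%cache, 'app.old',     undef, 'DEL');
print join(', ', map { "$_=$cache{$_}" } sort keys %cache), "\n";  # app.setting=on
```

With a real bucket, the helper could be wired in as the watch callback, for example $kv->watch('>', sub { apply_change(\%cache, @_) }), keeping the returned subscription id so the watch can later be stopped.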
create_bucket(\%opts, $cb)
Provision the underlying stream. Recognised \%opts:
max_history - default 1; how many revisions to keep per key.
max_bytes - bucket-wide storage cap.
max_age - per-message TTL in nanoseconds.
replicas - cluster replication factor.
Callback: ($info, $err).
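Because max_age is expressed in nanoseconds, it is easy to get the unit wrong. The sketch below builds an options hash using only the keys listed above; seconds_to_ns is a hypothetical helper, the chosen values are illustrative, and a Perl with 64-bit integers is assumed.

```perl
use strict;
use warnings;

# Hypothetical helper: max_age is documented in nanoseconds,
# so convert from a more readable number of seconds.
sub seconds_to_ns {
    my ($seconds) = @_;
    return $seconds * 1_000_000_000;
}

my %opts = (
    max_history => 5,                            # keep five revisions per key
    max_bytes   => 64 * 1024 * 1024,             # 64 MiB bucket-wide cap
    max_age     => seconds_to_ns(24 * 60 * 60),  # entries expire after one day
    replicas    => 1,                            # single-node, no replication
);

print "$_ = $opts{$_}\n" for sort keys %opts;
```

The resulting hash would then be passed by reference, as in $kv->create_bucket(\%opts, sub { my ($info, $err) = @_; ... }).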
delete_bucket($cb)
Tear down the underlying stream. Callback: ($info, $err).
status($cb)
Fetch a snapshot of the bucket, delivered to the callback as a hashref:
{ bucket => $name, values => $count, bytes => $n, history => $h }
Callback: (\%status, $err).
SEE ALSO
EV::Nats, EV::Nats::JetStream, EV::Nats::ObjectStore, NATS KV documentation.