NAME
EV::Nats::ObjectStore - Chunked object store on top of NATS JetStream
SYNOPSIS
use EV;
use EV::Nats;
use EV::Nats::JetStream;
use EV::Nats::ObjectStore;
my $nats = EV::Nats->new(host => '127.0.0.1');
my $js = EV::Nats::JetStream->new(nats => $nats);
my $os = EV::Nats::ObjectStore->new(js => $js, bucket => 'files');
$os->create_bucket({}, sub {
$os->put('report.pdf', $pdf_data, sub {
my ($info, $err) = @_;
print "stored: $info->{size} bytes in $info->{chunks} chunks\n";
$os->get('report.pdf', sub {
my ($data, $err, $meta) = @_;
print "got $meta->{size} bytes back\n";
});
});
});
EV::run;
DESCRIPTION
An object-store bucket is a JetStream stream named OBJ_<bucket> with two subject groups:
$O.<bucket>.C.<nuid>- opaque chunks for one object (one chunk per stream message; the nuid is generated per object).$O.<bucket>.M.<encoded-name>- last-write-wins metadata describing an object (name, size, chunk count, SHA-256 digest).
put chunks the input, publishes each chunk via js_publish for durability, then writes a metadata entry. get fetches the metadata, walks the chunks back via STREAM.MSG.GET, and verifies the digest.
METHODS
All callbacks fire on the EV loop, not synchronously.
new(js => $js, bucket => $name, [chunk_size => $bytes])
Default chunk_size is 128 KiB. timeout defaults to the timeout of $js.
create_bucket(\%opts, $cb)
Provision the underlying stream. Recognised \%opts:
max_bytes- bucket-wide storage cap.max_age- per-message TTL in nanoseconds.replicas- cluster replication factor.
Callback: ($info, $err).
delete_bucket($cb)
Tear down the underlying stream. Callback: ($info, $err).
put($name, $data, $cb)
Store $data under $name, automatically chunked. Each chunk is published with JetStream ack; the metadata entry is written last so a partial upload doesn't surface a half-stored object. Callback: ($info, $err) where $info is { name, size, chunks, seq }.
get($name, $cb)
Retrieve a previously-stored object. Callback: ($data, $err, $meta). $data is undef if the object does not exist or has been deleted (the tombstone is recognised). On digest mismatch, $data is undef and $err is "digest mismatch".
delete($name, [$cb])
Looks up the object's nuid via metadata, purges all chunks under $O.<bucket>.C.<nuid> via STREAM.PURGE, then publishes a KV-Operation: PURGE tombstone on the metadata subject followed by a flush fence. Callback: ($ok, $err); $err is set if the chunk purge or flush failed.
info($name, $cb)
Fetch only the metadata entry for an object, without downloading chunks. Callback: (\%meta, $err); \%meta is undef if the object does not exist or was deleted (the tombstone is recognised). This is the recommended way to filter live objects out of a "list" result.
list($cb)
List names of all live objects in the bucket. Callback: (\@names, $err). Tombstoned entries appear in the listing -- call info to filter.
status($cb)
Returns a snapshot hashref:
{ bucket => $name, bytes => $n, sealed => 0|1 }
sealed reflects the underlying stream's config.sealed flag; this client never seals on its own, so unless someone manually sealed the stream out-of-band the value is always 0.
Callback: (\%status, $err).
SEE ALSO
EV::Nats, EV::Nats::JetStream, EV::Nats::KV, NATS Object Store.