NAME

EV::Nats::ObjectStore - Chunked object store on top of NATS JetStream

SYNOPSIS

use EV;
use EV::Nats;
use EV::Nats::JetStream;
use EV::Nats::ObjectStore;

my $nats = EV::Nats->new(host => '127.0.0.1');
my $js   = EV::Nats::JetStream->new(nats => $nats);
my $os   = EV::Nats::ObjectStore->new(js => $js, bucket => 'files');

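$os->create_bucket({}, sub {
    my ($bucket_info, $err) = @_;
    return warn "create_bucket failed: $err\n" if $err;
    $os->put('report.pdf', $pdf_data, sub {
        my ($info, $err) = @_;
        return warn "put failed: $err\n" if $err;
        print "stored: $info->{size} bytes in $info->{chunks} chunks\n";
        $os->get('report.pdf', sub {
            my ($data, $err, $meta) = @_;
            return warn "get failed: $err\n" if $err;
            print "got $meta->{size} bytes back\n";
        });
    });
});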

EV::run;

DESCRIPTION

An object-store bucket is a JetStream stream named OBJ_<bucket> with two subject groups:

  • $O.<bucket>.C.<nuid> - opaque chunks for one object (one chunk per stream message; the nuid is generated per object).

  • $O.<bucket>.M.<encoded-name> - last-write-wins metadata describing an object (name, size, chunk count, SHA-256 digest).

put splits the input into chunks, publishes each chunk via js_publish for durability, then writes a metadata entry. get fetches the metadata, reads the chunks back via STREAM.MSG.GET, and verifies the digest.
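The chunk-then-verify round trip described above can be illustrated without a server. This is a minimal sketch, not the module's implementation: split_chunks and verify_chunks are hypothetical helper names, and the hex form of the digest is an assumption (the text only says a SHA-256 digest is stored in the metadata).

```perl
use strict;
use warnings;
use Digest::SHA qw(sha256_hex);

# Hypothetical helper: split $data into pieces of at most $chunk_size bytes.
sub split_chunks {
    my ($data, $chunk_size) = @_;
    my @chunks;
    for (my $off = 0; $off < length($data); $off += $chunk_size) {
        push @chunks, substr($data, $off, $chunk_size);
    }
    return @chunks;
}

# Hypothetical helper: reassemble chunks and compare with the stored digest.
# Returns ($data, undef) on success or (undef, "digest mismatch") on failure.
sub verify_chunks {
    my ($meta, @chunks) = @_;
    my $data = join '', @chunks;
    return (undef, 'digest mismatch')
        unless sha256_hex($data) eq $meta->{digest};
    return ($data, undef);
}

my $payload = 'x' x 300_000;
my @chunks  = split_chunks($payload, 128 * 1024);   # default chunk size
my %meta    = (
    size   => length($payload),
    chunks => scalar(@chunks),
    digest => sha256_hex($payload),   # assumed hex encoding
);
my ($data, $err) = verify_chunks(\%meta, @chunks);
```

Because the digest covers the whole object rather than each chunk, a single damaged chunk is enough to make the reassembled data fail verification.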

METHODS

All callbacks fire on the EV loop, not synchronously.

new(js => $js, bucket => $name, [chunk_size => $bytes], [timeout => $timeout])

js is the EV::Nats::JetStream context and bucket is the bucket name. chunk_size defaults to 128 KiB; timeout defaults to the timeout of $js.

create_bucket(\%opts, $cb)

Provision the underlying stream. Recognised \%opts:

  • max_bytes - bucket-wide storage cap.

  • max_age - per-message TTL in nanoseconds.

  • replicas - cluster replication factor.

Callback: ($info, $err).
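Because max_age is expressed in nanoseconds, a small conversion helper avoids mistakes by a factor of a thousand or a million. The sketch below only builds the options hash; days_to_ns is a hypothetical helper, the values are illustrative, and a 64-bit Perl is assumed so the product stays an exact integer.

```perl
use strict;
use warnings;

# Hypothetical helper: convert whole days to nanoseconds.
sub days_to_ns {
    my ($days) = @_;
    return $days * 24 * 60 * 60 * 1_000_000_000;
}

my %opts = (
    max_bytes => 1024 * 1024 * 1024,   # 1 GiB cap for the whole bucket
    max_age   => days_to_ns(7),        # messages expire after 7 days
    replicas  => 3,                    # illustrative replication factor
);

# With a connected $os as in the SYNOPSIS, the hash would be passed as:
#   $os->create_bucket(\%opts, sub { my ($info, $err) = @_; ... });
```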

delete_bucket($cb)

Tear down the underlying stream. Callback: ($info, $err).

put($name, $data, $cb)

Store $data under $name, automatically chunked. Each chunk is published with JetStream ack; the metadata entry is written last so a partial upload doesn't surface a half-stored object. Callback: ($info, $err) where $info is { name, size, chunks, seq }.

get($name, $cb)

Retrieve a previously-stored object. Callback: ($data, $err, $meta). $data is undef if the object does not exist or has been deleted (the tombstone is recognised). On digest mismatch, $data is undef and $err is "digest mismatch".
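A get callback therefore has three outcomes to tell apart. The following sketch shows one way to do so; classify_get is a hypothetical helper, and it assumes that $err is left undefined when the object is simply absent or tombstoned, which the text above does not state explicitly.

```perl
use strict;
use warnings;

# Hypothetical helper: map the ($data, $err) pair from a get callback
# onto one of three outcomes.
sub classify_get {
    my ($data, $err) = @_;
    return 'error'   if defined $err;     # e.g. "digest mismatch"
    return 'missing' if !defined $data;   # absent or tombstoned (assumed: no $err)
    return 'ok';                          # includes a zero-length object
}

# Intended use inside a callback:
#   $os->get('report.pdf', sub {
#       my ($data, $err, $meta) = @_;
#       my $outcome = classify_get($data, $err);
#       ...
#   });
```

Testing with defined rather than plain truthiness matters here: an empty string is a valid object body but is false in boolean context.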

delete($name, [$cb])

Looks up the object's nuid via metadata, purges all chunks under $O.<bucket>.C.<nuid> via STREAM.PURGE, then publishes a KV-Operation: PURGE tombstone on the metadata subject followed by a flush fence. Callback: ($ok, $err); $err is set if the chunk purge or flush failed.

info($name, $cb)

Fetch only the metadata entry for an object, without downloading chunks. Callback: (\%meta, $err); \%meta is undef if the object does not exist or was deleted (the tombstone is recognised). This is the recommended way to tell live objects from deleted ones in a list result.

list($cb)

List the names of all objects that have a metadata entry in the bucket. Callback: (\@names, $err). Tombstoned (deleted) entries still appear in the listing; call info on each name to filter them out.
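Filtering a listing through info can be sketched as a small fan-out helper. filter_live is a hypothetical name, not part of this module; it relies only on the documented info($name, $cb) signature and its (\%meta, $err) callback, and it issues one info request per name at once, which may be too aggressive for very large buckets.

```perl
use strict;
use warnings;

# Hypothetical helper: given names from list(), keep those for which
# info() yields metadata. $store is any object with info($name, $cb).
# $done receives (\@live_names, $first_err).
sub filter_live {
    my ($store, $names, $done) = @_;
    my $pending = @$names;
    return $done->([], undef) unless $pending;

    my (%live, $first_err);
    for my $name (@$names) {
        $store->info($name, sub {
            my ($meta, $err) = @_;
            $first_err //= $err;
            $live{$name} = 1 if defined $meta;
            return if --$pending;
            # Last reply arrived: report names in their original order.
            $done->([ grep { $live{$_} } @$names ], $first_err);
        });
    }
    return;
}

# Intended use:
#   $os->list(sub {
#       my ($names, $err) = @_;
#       return warn "list failed: $err\n" if $err;
#       filter_live($os, $names, sub { my ($live, $err) = @_; ... });
#   });
```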

status($cb)

Fetch a snapshot of the bucket's state, passed to the callback as a hashref:

{ bucket => $name, bytes => $n, sealed => 0|1 }

sealed reflects the underlying stream's config.sealed flag. This client never seals a stream on its own, so the value is 0 unless the stream was sealed out-of-band.

Callback: (\%status, $err).

SEE ALSO

EV::Nats, EV::Nats::JetStream, EV::Nats::KV, NATS Object Store.