NAME

Data::PubSub::Shared - High-performance shared-memory pub/sub for Linux

SYNOPSIS

use Data::PubSub::Shared;

# Publisher
my $ps = Data::PubSub::Shared::Int->new('/tmp/ps.shm', 1024);
$ps->publish(42);
$ps->publish_multi(1, 2, 3);

# Subscriber (same or different process)
my $ps2 = Data::PubSub::Shared::Int->new('/tmp/ps.shm', 1024);
my $sub  = $ps2->subscribe;       # future messages only
my $sub2 = $ps2->subscribe_all;  # from oldest available

# Polling
my $val = $sub->poll;            # non-blocking, undef if nothing
my @v   = $sub->poll_multi(10);  # batch poll
my @v   = $sub->drain;           # poll all available
my @v   = $sub->drain(100);      # poll up to 100
my $val = $sub->poll_wait;       # blocking, infinite wait
my $val = $sub->poll_wait(1.5);  # with timeout
my @v   = $sub->poll_wait_multi(10, 1.5);  # block for >=1, grab up to 10

# Combined publish + eventfd notify
$ps->publish_notify($value);

# Status
my $n = $sub->lag;               # messages behind
my $n = $sub->overflow_count;    # total messages skipped
$sub->reset;                     # skip to latest
$sub->reset_oldest;              # go back to oldest available

# String variant
my $ps = Data::PubSub::Shared::Str->new('/tmp/ps.shm', 1024);
$ps->publish("hello world");
my $sub = $ps->subscribe;
my $msg = $sub->poll;

# Anonymous (fork-inherited)
my $ps = Data::PubSub::Shared::Int->new(undef, 1024);

# memfd-backed (shareable via fd passing)
my $ps = Data::PubSub::Shared::Int->new_memfd("myps", 1024);
my $fd = $ps->memfd;
my $ps2 = Data::PubSub::Shared::Int->new_from_fd($fd);

# Multiprocess
if (fork() == 0) {
    my $child = Data::PubSub::Shared::Int->new('/tmp/ps.shm', 1024);
    my $sub = $child->subscribe;   # future only: misses anything published before this line
    while (defined(my $v = $sub->poll_wait(1))) {
        print "got: $v\n";
    }
    exit;
}
$ps->publish(99);
wait;

DESCRIPTION

Data::PubSub::Shared provides broadcast pub/sub over shared memory (mmap(MAP_SHARED)). Publishers write to a ring buffer; each subscriber independently reads with its own cursor. Messages are never consumed -- the ring overwrites old data when it wraps. Slow subscribers auto-recover by resetting to the oldest available position.

Linux-only. Requires 64-bit Perl.

Features

  • File-backed mmap for cross-process sharing

  • Lock-free MPMC publish for Int (atomic fetch-and-add)

  • Lock-free subscribers for both variants (seqlock-style)

  • Variable-length Str messages with circular arena

  • Futex-based blocking poll with timeout (no busy-spin)

  • PID-based stale lock recovery (Str mode)

  • Batch publish/poll operations (drain, poll_wait_multi)

  • Per-subscriber overflow counting

  • Optional keyword API via XS::Parse::Keyword

When to Use Int vs Str

Int is best for signaling, counters, indices, timestamps, or any integer-valued broadcast. Lock-free MPMC publish means multiple publishers never block each other.

Str is best for serialized messages, log lines, JSON payloads, or any variable-length data. Mutex-protected publish serializes concurrent publishers but subscribers remain lock-free.

Variants

Data::PubSub::Shared::Int - int64 values, lock-free MPMC publish

Uses atomic fetch-and-add for multi-publisher support. Each slot has a sequence number; subscribers verify data consistency via double-check (seqlock-style). Zero contention between publishers and subscribers.
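
The double-check can be illustrated with a small single-process model. This is only a sketch under assumptions: the slot layout (a seq field stamped with position + 1), the helper names write_slot and read_slot, and the $between hook that stands in for a racing publisher are all hypothetical and are not taken from the module's source.

```perl
use strict;
use warnings;

my $capacity = 4;    # power of two, so masking maps a position to a slot
my @slots = map { +{ seq => 0, value => 0 } } 1 .. $capacity;

# Publisher side (model): store the value, then stamp the slot with pos + 1.
sub write_slot {
    my ($pos, $value) = @_;
    my $slot = $slots[ $pos & ($capacity - 1) ];
    $slot->{seq}   = 0;           # mark the slot as being written
    $slot->{value} = $value;
    $slot->{seq}   = $pos + 1;    # make the value visible
    return 1;
}

# Reader side (model): returns ('ok', $value), ('empty') or ('overwritten').
# $between is a hypothetical hook that runs between the two sequence reads.
sub read_slot {
    my ($pos, $between) = @_;
    my $slot = $slots[ $pos & ($capacity - 1) ];
    my $seq1 = $slot->{seq};
    return ('empty')       if $seq1 < $pos + 1;    # not published yet
    return ('overwritten') if $seq1 > $pos + 1;    # ring already wrapped
    my $value = $slot->{value};
    $between->() if $between;
    my $seq2 = $slot->{seq};                       # the double-check
    return $seq1 == $seq2 ? ('ok', $value) : ('overwritten');
}
```

In this model a read is accepted only when the sequence number matches the expected position both before and after the value is copied, so a slot that is rewritten mid-read is reported rather than returned.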

Data::PubSub::Shared::Int32 - int32 values, lock-free, 8 bytes/slot
Data::PubSub::Shared::Int16 - int16 values, lock-free, 8 bytes/slot

Compact variants with 32-bit sequence numbers. Half the memory of Int (8 bytes/slot vs 16). Same lock-free MPMC algorithm and full API. Values outside the type range are silently truncated (standard C cast).
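
To make the truncation concrete, the sketch below reproduces in pure Perl the two's-complement wrap-around that a narrowing cast typically produces. The helper name wrap_signed is hypothetical and the code is not part of the module; it assumes a 64-bit Perl, as the module itself requires.

```perl
use strict;
use warnings;

# Wrap an integer to a signed two's-complement value that is $bits wide.
sub wrap_signed {
    my ($value, $bits) = @_;
    my $mask = (1 << $bits) - 1;      # keep only the low $bits bits
    my $sign = 1 << ($bits - 1);      # the sign bit for that width
    return (($value & $mask) ^ $sign) - $sign;
}

printf "%d -> int16 %d\n", $_, wrap_signed($_, 16) for 32767, 32768, 70000, -1;
printf "%d -> int32 %d\n", $_, wrap_signed($_, 32) for 2**31, 2**32 + 5;
```

For example, 70000 wraps to 4464 as an int16, and 2**31 wraps to -2147483648 as an int32, so callers that need range checking should do it before publishing.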

Data::PubSub::Shared::Str - byte string values, mutex-protected publish

Mutex-protected publish with variable-length messages stored in a circular arena (max capped at msg_size). Short messages use only the space they need. Subscribers read lock-free with seqlock-style double-check. UTF-8 flag preserved. Default max message size: 256 bytes.

Key Differences from Data::Queue::Shared

  • Broadcast: every subscriber sees every message (queues consume)

  • No backpressure: publish always succeeds (ring overwrites old data)

  • Multiple independent readers: each subscriber has its own cursor

  • Lock-free subscribers: subscribers never block publishers

Constructor

# Int
my $ps = Data::PubSub::Shared::Int->new($path, $capacity);

# Str
my $ps = Data::PubSub::Shared::Str->new($path, $capacity);
my $ps = Data::PubSub::Shared::Str->new($path, $capacity, $msg_size);

Creates or opens a shared pub/sub backed by file $path. $capacity is rounded up to the next power of 2. When opening an existing file, parameters are read from the stored header. Pass undef for $path for anonymous (fork-inherited) pub/sub.

For Str, $msg_size sets the maximum bytes per message (default: 256). Publishing a message larger than this croaks. A circular arena of capacity * (msg_size + 8) bytes is allocated automatically.
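
As a worked example of the sizing rules above, the following sketch computes the rounded capacity and the arena size. The helper names are hypothetical, and it assumes that a capacity which is already a power of 2 is left unchanged and that the arena formula uses the rounded capacity.

```perl
use strict;
use warnings;

# Smallest power of 2 that is >= $n (assumption: exact powers stay as they are).
sub next_pow2 {
    my ($n) = @_;
    my $p = 1;
    $p <<= 1 while $p < $n;
    return $p;
}

# Arena bytes for a Str pub/sub: capacity * (msg_size + 8).
sub arena_bytes {
    my ($capacity, $msg_size) = @_;
    $msg_size = 256 unless defined $msg_size;    # documented default
    return next_pow2($capacity) * ($msg_size + 8);
}

printf "capacity %d -> %d slots, arena %d bytes\n",
    1000, next_pow2(1000), arena_bytes(1000);
```

Under these assumptions, a requested capacity of 1000 with the default msg_size gives 1024 slots and 1024 * 264 = 270336 bytes of arena.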

memfd Constructor

my $ps = Data::PubSub::Shared::Int->new_memfd($name, $capacity);
my $ps2 = Data::PubSub::Shared::Int->new_from_fd($ps->memfd);

new_memfd creates a pub/sub backed by a memfd instead of a file. $ps->memfd returns its file descriptor, which can be shared via fd passing; new_from_fd opens the same pub/sub from that descriptor.

Publishing

$ps->publish($value);                 # returns true
my $n = $ps->publish_multi(@values);  # returns count published
$ps->publish_notify($value);          # publish + eventfd notify

Publish writes to the ring buffer and wakes any blocked subscribers. Int publish is lock-free (atomic fetch-and-add); publish_multi claims all slots in a single atomic operation (one fetch-add instead of N). Str publish is mutex-protected.

For Str, publish_multi holds the mutex for the entire batch (one lock/unlock cycle instead of N), which significantly improves throughput for batch string publishing.

publish_notify combines publish and notify in a single XS call, saving method dispatch overhead in the common non-batching case.

Management

$ps->clear;                  # reset ring: write_pos=0, all slots cleared
$ps->sync;                   # msync to disk
$ps->unlink;                 # remove backing file
Class->unlink($path);        # class method form
my $p = $ps->path;           # backing file path (undef for anon/memfd)
my $s = $ps->stats;          # diagnostic hashref

clear resets the ring buffer to its initial state: write_pos, stat_publish_ok, and all slot sequences are zeroed. Existing subscribers will need to call reset_oldest to see new messages. For Str mode, the arena write position is also reset.

Subscribing

my $sub = $ps->subscribe;       # future messages only
my $sub = $ps->subscribe_all;   # from oldest available

Creates a subscriber with its own cursor. Subscribers are process-local and cannot be shared between processes. Each process should create its own subscribers.

Subscriber API

Polling

my $val = $sub->poll;            # non-blocking, undef if empty
my @v   = $sub->poll_multi($n);  # batch, up to $n items
my @v   = $sub->drain;           # poll all available
my @v   = $sub->drain($max);     # poll up to $max
my $val = $sub->poll_wait;       # blocking, infinite wait
my $val = $sub->poll_wait($t);   # blocking with timeout (seconds)
my @v   = $sub->poll_wait_multi($n, $timeout);

drain returns all currently available messages in one call.

poll_wait_multi blocks until at least one message is available (or the timeout expires), then grabs up to $n messages without blocking. Returns an empty list on timeout.

Status

my $n = $sub->lag;             # messages behind write_pos
my $c = $sub->cursor;         # current read position
$sub->cursor($new_pos);       # seek to specific position
my $o = $sub->has_overflow;    # true if ring wrapped past us
my $n = $sub->overflow_count;  # total messages skipped due to overflow
my $p = $sub->write_pos;      # publisher's current position

overflow_count is a cumulative counter of messages skipped due to ring overflow. Each time poll auto-recovers, the counter increases by the number of messages that were lost.

Callback-based Polling

my $n = $sub->poll_cb(\&handler);    # call handler for each message
my @v = $sub->drain_notify;          # eventfd_consume + drain
my @v = $sub->drain_notify($max);    # eventfd_consume + drain up to $max

poll_cb calls the handler once per available message without returning to the caller between messages, which eliminates per-message method dispatch overhead. It returns the number of messages processed.

drain_notify combines eventfd_consume and drain in a single XS call. Designed for event-loop callbacks:

my $fd = $sub->fileno;    # the subscriber's eventfd
my $w = EV::io $fd, EV::READ, sub {
    my @msgs = $sub->drain_notify;
    process($_) for @msgs;
};

Subscribers inherit the handle's eventfd at creation time. Use $sub->eventfd_set($fd) to set it manually, or $sub->fileno to query it.

Cursor Management

$sub->reset;         # skip to current write_pos (future only)
$sub->reset_oldest;  # go back to oldest available

If a subscriber falls behind by more than capacity messages, poll auto-recovers by resetting to the oldest available position and returning the next available message. The skipped message count is added to overflow_count.
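
The recovery rule can be sketched with a pure-Perl, single-process model of the ring and one cursor. Everything here is hypothetical (ring_new, ring_publish, ring_subscribe, sub_poll and sub_lag are illustration-only names); the real module keeps this state in shared memory and is not implemented this way.

```perl
use strict;
use warnings;

sub ring_new {
    my ($capacity) = @_;
    return { capacity => $capacity, slots => [], write_pos => 0 };
}

# Publishing never fails: the newest value overwrites the oldest slot.
sub ring_publish {
    my ($ring, $value) = @_;
    $ring->{slots}[ $ring->{write_pos} % $ring->{capacity} ] = $value;
    $ring->{write_pos}++;
    return 1;
}

# Like subscribe: start at the current write position (future messages only).
sub ring_subscribe {
    my ($ring) = @_;
    return { ring => $ring, cursor => $ring->{write_pos}, overflow_count => 0 };
}

sub sub_lag {
    my ($sub) = @_;
    return $sub->{ring}{write_pos} - $sub->{cursor};
}

sub sub_poll {
    my ($sub) = @_;
    my $ring = $sub->{ring};
    return if $sub->{cursor} >= $ring->{write_pos};    # nothing new
    my $oldest = $ring->{write_pos} - $ring->{capacity};
    $oldest = 0 if $oldest < 0;
    if ($sub->{cursor} < $oldest) {                    # ring wrapped past us
        $sub->{overflow_count} += $oldest - $sub->{cursor};
        $sub->{cursor} = $oldest;                      # jump to oldest available
    }
    return $ring->{slots}[ $sub->{cursor}++ % $ring->{capacity} ];
}

my $ring = ring_new(4);
my $sub  = ring_subscribe($ring);
ring_publish($ring, $_) for 1 .. 10;
my $first = sub_poll($sub);
print "first=$first skipped=$sub->{overflow_count} lag=", sub_lag($sub), "\n";
# prints "first=7 skipped=6 lag=3"
```

In the model, a subscriber that sleeps through ten publishes on a four-slot ring loses the six oldest values, resumes at the seventh, and records the loss in its overflow counter.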

Event Loop Integration

my $fd = $ps->eventfd;
$ps->notify;            # after publish (opt-in)

use EV;
my $sub = $ps->subscribe;
my $w = EV::io $fd, EV::READ, sub {
    $ps->eventfd_consume;
    while (defined(my $v = $sub->poll)) { process($v) }
};

Crash Safety

Str mode uses a futex-based mutex with PID tracking. If a publisher dies while holding the mutex, other publishers detect the stale lock within 2 seconds and automatically recover. Int mode is lock-free and requires no crash recovery.
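
The general idea behind PID-based stale lock detection can be sketched in pure Perl. This is an illustration under assumptions, not the module's implementation: the lock record (a hash with an owner field) and the helper names pid_alive and recover_if_stale are hypothetical, and a real shared-memory mutex would take over the lock with an atomic compare-and-swap rather than a plain assignment.

```perl
use strict;
use warnings;
use Errno qw(EPERM);

# True if a process with this PID still exists. Signal 0 delivers nothing
# and only performs the existence and permission check.
sub pid_alive {
    my ($pid) = @_;
    return 1 if kill(0, $pid);
    return $! == EPERM ? 1 : 0;    # exists, but owned by another user
}

# Hypothetical lock record: { owner => PID of the holder, or 0 when free }.
# Returns true if the lock was stale and this process has taken it over.
sub recover_if_stale {
    my ($lock) = @_;
    return 0 unless $lock->{owner};              # lock is free
    return 0 if pid_alive($lock->{owner});       # holder is still running
    $lock->{owner} = $$;                         # holder died: take over
    return 1;
}
```

A waiter that has been blocked for a while would call something like recover_if_stale before going back to sleep, which is one way a bounded detection time such as the 2 seconds mentioned above could be achieved.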

Keyword API

use Data::PubSub::Shared::Int;

ps_int_publish $ps, $value;
my $val = ps_int_poll $sub;
my $n   = ps_int_lag $sub;

use Data::PubSub::Shared::Str;

ps_str_publish $ps, $value;
my $val = ps_str_poll $sub;

Replace int with int32, int16, or str for other variants. Keywords are lexically scoped and require XS::Parse::Keyword at build time.

BENCHMARKS

Single-process throughput, 1M items, Linux x86_64. Run perl -Mblib bench/throughput.pl to reproduce.

PUBLISH + POLL (interleaved)
Int     5.0M/s   (16 bytes/slot)
Int32   5.9M/s   (8 bytes/slot)
Int16   5.7M/s   (8 bytes/slot)
Str     2.5M/s   (~30B messages)

BATCH PUBLISH (100/batch)
Int publish_multi:    170M/s
Str publish_multi:     42M/s

Fan-out: publish throughput is independent of subscriber count (subscribers are lock-free and read-only).

AUTHOR

vividsnow

LICENSE

This is free software; you can redistribute it and/or modify it under the same terms as Perl itself.