NAME

Data::Queue::Shared - High-performance shared-memory MPMC queues for Linux

SYNOPSIS

use Data::Queue::Shared;

# Integer queue (lock-free Vyukov MPMC)
my $q = Data::Queue::Shared::Int->new('/tmp/myq.shm', 1024);
$q->push(42);
my $val = $q->pop;              # non-blocking, undef if empty

# Blocking pop (waits for data)
$val = $q->pop_wait;            # infinite wait
$val = $q->pop_wait(1.5);       # 1.5 second timeout

# Batch operations
my $pushed = $q->push_multi(1, 2, 3, 4, 5);
my @vals   = $q->pop_multi(10); # pop up to 10

# Anonymous queue (fork-inherited, no filesystem)
my $anon = Data::Queue::Shared::Int->new(undef, 1024);

# memfd-backed queue (shareable via fd passing)
my $mq  = Data::Queue::Shared::Str->new_memfd("my_queue", 1024);
my $fd  = $mq->memfd;           # pass via SCM_RIGHTS or fork
my $mq2 = Data::Queue::Shared::Str->new_from_fd($fd);

# String queue (mutex-protected, circular arena)
my $sq = Data::Queue::Shared::Str->new('/tmp/strq.shm', 1024);
$sq->push("hello world");
my $msg = $sq->pop;

# With explicit arena size (default: capacity * 256); the size only
# applies when the file is created, so use a fresh path
my $big = Data::Queue::Shared::Str->new('/tmp/bigstrq.shm', 1024, 1048576);

# Multiprocess: child opens the same file-backed Int queue as $q
if (fork() == 0) {
    my $child = Data::Queue::Shared::Int->new('/tmp/myq.shm', 1024);
    $child->push(99);
    exit;
}
wait;
print $q->pop;  # 99

DESCRIPTION

Data::Queue::Shared provides bounded MPMC (multi-producer, multi-consumer) queues stored in shared memory (mmap(MAP_SHARED)), backed by a file, an anonymous mapping, or a memfd, enabling efficient multiprocess data sharing on Linux.

Linux-only. Requires 64-bit Perl.

Variants

Data::Queue::Shared::Int - int64 values, lock-free (16 bytes/slot)

Uses the Vyukov bounded MPMC algorithm: push and pop are lock-free (CAS-based). Well suited to integer job IDs, counters, and indices.
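The slot-sequence protocol at the heart of the Vyukov algorithm can be sketched in a few lines of Perl. This is a single-threaded model under stated assumptions: VyukovModel, enqueue and dequeue are hypothetical names, and the atomic loads and compare-and-swap of the real algorithm are replaced by plain variable access. It shows only how per-slot sequence numbers encode "free", "ready" and "full/empty", not the module's C implementation.

```perl
use strict;
use warnings;

# Single-threaded model of a Vyukov bounded MPMC ring (hypothetical class).
# The real algorithm advances head/tail with compare-and-swap and accesses
# slot sequence numbers atomically; plain Perl variables stand in here.
package VyukovModel;

sub new {
    my ($class, $cap) = @_;              # $cap must be a power of 2
    return bless {
        cap  => $cap,
        mask => $cap - 1,
        seq  => [ 0 .. $cap - 1 ],       # slot i starts with sequence i
        val  => [],
        head => 0,                       # next position to dequeue
        tail => 0,                       # next position to enqueue
    }, $class;
}

sub enqueue {
    my ($self, $v) = @_;
    my $pos  = $self->{tail};
    my $slot = $pos & $self->{mask};
    # seq == pos: slot is free for this lap. seq < pos: the consumer from
    # the previous lap has not released it yet, so the queue is full.
    # (Concurrent code would CAS tail from $pos to $pos + 1 and retry.)
    return 0 if $self->{seq}[$slot] < $pos;
    $self->{tail}       = $pos + 1;
    $self->{val}[$slot] = $v;
    $self->{seq}[$slot] = $pos + 1;      # publish: slot is now readable
    return 1;
}

sub dequeue {
    my ($self) = @_;
    my $pos  = $self->{head};
    my $slot = $pos & $self->{mask};
    # seq == pos + 1: a producer has published this slot. Anything smaller
    # means no data yet, so the queue is empty (undef in scalar context).
    return if $self->{seq}[$slot] < $pos + 1;
    $self->{head} = $pos + 1;
    my $v = $self->{val}[$slot];
    $self->{seq}[$slot] = $pos + $self->{cap};   # release slot for next lap
    return $v;
}

package main;

my $m = VyukovModel->new(2);
print join(',', $m->enqueue(10), $m->enqueue(20), $m->enqueue(30)), "\n";  # 1,1,0
print $m->dequeue(), "\n";   # 10
```

Because each slot carries its own sequence number, producers and consumers only contend on the head or tail counter, never on a shared lock.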

Data::Queue::Shared::Int32 - int32 values, lock-free (8 bytes/slot)
Data::Queue::Shared::Int16 - int16 values, lock-free (8 bytes/slot)

Compact variants with 32-bit Vyukov sequence numbers. Each slot is half the size of an Int slot, so twice as many slots fit in a cache line. Same lock-free algorithm and same API as Int. Values outside the type's range are silently truncated to the low 32 or 16 bits (standard C cast semantics).
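What "silently truncated" means for a stored value can be modelled in plain Perl. This sketch assumes the two's-complement wraparound that GCC and Clang document for out-of-range conversions to signed types (reduction modulo 2^N); as_int32 and as_int16 are hypothetical helpers, not part of the module, and a 64-bit Perl is assumed, as the module requires.

```perl
use strict;
use warnings;

# Model of storing a Perl integer into an int32/int16 slot: keep the low
# 32 (or 16) bits, then reinterpret them as a signed two's-complement value.
sub as_int32 {
    my ($v) = @_;
    my $low = $v & 0xFFFF_FFFF;            # low 32 bits, unsigned
    return $low >= 2**31 ? $low - 2**32 : $low;
}

sub as_int16 {
    my ($v) = @_;
    my $low = $v & 0xFFFF;                  # low 16 bits, unsigned
    return $low >= 2**15 ? $low - 2**16 : $low;
}

printf "%d -> int32 %d, int16 %d\n", $_, as_int32($_), as_int16($_)
    for 42, -1, 40_000, 2**31, 2**32 + 5;
```

For example, pushing 40000 into an Int16 queue would read back as -25536 under this model, so range-check values before pushing if that matters.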

Data::Queue::Shared::Str - byte string values, mutex-protected

Uses a futex-based mutex with a circular arena for variable-length string storage. The UTF-8 flag of each string is preserved. Well suited to messages, serialized data, and filenames.

Features

  • File-backed mmap for cross-process sharing

  • Lock-free MPMC for integer queues (Vyukov algorithm)

  • Futex-based blocking wait with timeout (no busy-spin)

  • PID-based stale lock recovery (dead process detection)

  • Batch push/pop operations

  • Circular arena for zero-fragmentation string storage

  • Optional keyword API via XS::Parse::Keyword (zero method-dispatch overhead)

Constructor

# Int queue
my $q = Data::Queue::Shared::Int->new($path, $capacity);

# Str queue
my $q = Data::Queue::Shared::Str->new($path, $capacity);
my $q = Data::Queue::Shared::Str->new($path, $capacity, $arena_bytes);

Creates or opens a shared queue backed by file $path. $capacity is rounded up to the next power of 2. When opening an existing file, parameters are read from the stored header. Multiple processes can open the same file simultaneously.
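The capacity rounding rule can be checked with a small helper. next_pow2 is hypothetical and only mirrors the rule stated above; power-of-2 sizes are the usual choice for ring buffers because a slot index can then be computed as pos & (capacity - 1) instead of a modulo.

```perl
use strict;
use warnings;

# Round a requested capacity up to the next power of 2 (hypothetical
# helper mirroring the documented rule; not part of the module's API).
sub next_pow2 {
    my ($n) = @_;
    my $p = 1;
    $p <<= 1 while $p < $n;    # double until we reach or pass $n
    return $p;
}

printf "%d -> %d\n", $_, next_pow2($_) for 1, 1000, 1024, 1025;
```

So a queue requested with a capacity of 1000 would hold 1024 elements, and $q->capacity would report the rounded value.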

Pass undef for $path to create an anonymous queue using MAP_SHARED|MAP_ANONYMOUS. Anonymous queues are shared with child processes via fork() but cannot be opened by unrelated processes.

memfd Constructor

my $q = Data::Queue::Shared::Int->new_memfd($name, $capacity);
my $q = Data::Queue::Shared::Str->new_memfd($name, $capacity);
my $q = Data::Queue::Shared::Str->new_memfd($name, $cap, $arena);

Creates a queue backed by memfd_create(2). There is no filesystem path; the backing memory is identified only by a file descriptor. Use memfd() to retrieve the fd and pass it to other processes via SCM_RIGHTS (Unix domain socket fd passing) or fork() inheritance.

my $q2 = Data::Queue::Shared::Int->new_from_fd($fd);
my $q2 = Data::Queue::Shared::Str->new_from_fd($fd);

Opens a queue from a received memfd. The fd is dup'd internally, so the caller's copy can be closed afterwards.

my $fd = $q->memfd;    # backing fd (-1 if file-backed/anonymous)

For Str queues (created with new or new_memfd), $arena_bytes sets the size of the string storage arena (default: $capacity * 256, minimum 4096, maximum 4GB). Strings are stored in a circular arena, so the total bytes of all queued strings cannot exceed the arena size. Individual strings are limited to ~2GB.
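The default arena size is easiest to reason about with a worked number: for a capacity of 1024 it is 262144 bytes (256 KiB), which averages 256 bytes per string when the queue is full. The helper below is a hypothetical model of the documented default; clamping to the 4096-byte minimum and 4GB maximum, rather than rejecting out-of-range sizes, is an assumption.

```perl
use strict;
use warnings;
use List::Util qw(max min);

# Hypothetical model of the documented default arena size for a Str
# queue: $capacity * 256 bytes, kept within [4096 bytes, 4 GB].
# Clamping (rather than rejecting) out-of-range values is an assumption.
use constant ARENA_MIN => 4096;
use constant ARENA_MAX => 4 * 1024**3;

sub default_arena_bytes {
    my ($capacity) = @_;
    return min(ARENA_MAX, max(ARENA_MIN, $capacity * 256));
}

printf "capacity %6d -> arena %d bytes\n", $_, default_arena_bytes($_)
    for 8, 1024, 65536;
```

If typical messages are larger than about 256 bytes, pass an explicit $arena_bytes so a full queue does not run out of arena space first.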

API

Core operations

my $ok  = $q->push($value);             # non-blocking, false if full
my $val = $q->pop;                       # non-blocking, undef if empty
my $ok  = $q->push_wait($value);         # blocking, infinite wait
my $ok  = $q->push_wait($value, $secs);  # blocking with timeout
my $val = $q->pop_wait;                  # blocking, infinite wait
my $val = $q->pop_wait($secs);           # blocking with timeout
my $val = $q->peek;                      # read front without consuming

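peek returns the front element without removing it (undef if empty). For Int, this is a best-effort snapshot: under concurrent MPMC access, the value may be torn between pushes and pops and another consumer may pop it first. For Str, the read is consistent because it is taken under the mutex, although another consumer may still pop the element afterwards.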

Deque operations (Str only)

my $ok  = $q->push_front($value);             # non-blocking push to front
my $ok  = $q->push_front_wait($value);        # blocking push to front
my $ok  = $q->push_front_wait($value, $secs); # with timeout
my $val = $q->pop_back;                       # non-blocking pop from back
my $val = $q->pop_back_wait;                  # blocking pop from back
my $val = $q->pop_back_wait($secs);           # with timeout

push_front inserts at the head, which is useful for requeueing a failed job so it is retried next. pop_back removes from the tail, which is useful for work stealing or undo. These methods are not available for Int, Int32, or Int16 (the Vyukov algorithm is strictly FIFO).
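The naming of the deque operations maps directly onto Perl's own array operations. This is a single-process analogy using a plain array, not shared memory; the comments pair each array operation with the Str queue method it corresponds to.

```perl
use strict;
use warnings;

# Plain-array analogy for the Str deque operations (no shared memory):
# push -> push, pop -> shift, push_front -> unshift, pop_back -> pop.
my @q;
push @q, 'job1', 'job2';          # $q->push(...)
my $job = shift @q;               # $q->pop         (takes 'job1')
unshift @q, $job;                 # $q->push_front  (requeue after a failure)
my $newest = pop @q;              # $q->pop_back    (takes 'job2')
print "$job $newest @q\n";        # job1 job2 job1
```

A requeued job therefore goes back to the front and is the next item a normal pop returns.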

Batch operations

my $n  = $q->push_multi(@values);          # non-blocking, returns pushed count
my @v  = $q->pop_multi($count);            # non-blocking, pop up to $count
my $n  = $q->push_wait_multi($timeout, @values);  # blocking batch push
my @v  = $q->pop_wait_multi($n, $timeout); # block for >=1, grab up to $n
my @v  = $q->drain;                        # pop all elements
my @v  = $q->drain($max);                  # pop up to $max elements

pop_wait_multi blocks until at least one element is available (or timeout), then grabs up to $n elements non-blocking. Returns empty list on timeout.

push_wait_multi pushes all values, blocking if the queue is full. $timeout is seconds (-1 = infinite, 0 = try once).
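The timeout convention (-1 waits forever, 0 tries once, a positive value gives up after that many seconds) can be illustrated with a small retry helper. retry_with_timeout is hypothetical, and it polls with a short sleep purely to keep the sketch simple; the module itself blocks on a futex rather than spinning.

```perl
use strict;
use warnings;
use Time::HiRes qw(time sleep);

# Hypothetical helper illustrating the timeout convention:
#   negative -> wait forever, 0 -> try once, N > 0 -> give up after N seconds.
# Polling with sleep is a simplification; the module waits on a futex.
sub retry_with_timeout {
    my ($timeout, $try) = @_;
    my $deadline = $timeout > 0 ? time() + $timeout : undef;
    while (1) {
        return 1 if $try->();
        return 0 if $timeout == 0;                             # try once
        return 0 if defined $deadline && time() >= $deadline;  # timed out
        sleep 0.001;
    }
}

my $attempts = 0;
my $ok = retry_with_timeout(-1, sub { ++$attempts >= 3 });
print "ok=$ok after $attempts attempts\n";
```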

Status

my $n   = $q->size;         # approximate for Int (lock-free), exact for Str
my $cap = $q->capacity;     # max elements
my $ok  = $q->is_empty;
my $ok  = $q->is_full;

Management

$q->clear;                  # remove all elements
$q->sync;                   # msync — flush to disk for crash durability
$q->unlink;                 # remove backing file
Class->unlink($path);       # class method form
my $p = $q->path;           # backing file path
my $s = $q->stats;          # diagnostic hashref

Stats keys: size, capacity, mmap_size, push_ok, pop_ok, push_full, pop_empty, recoveries, push_waiters, pop_waiters. Str queues additionally include arena_cap and arena_used. All counters are approximate under concurrent access (diagnostic only). push_waiters/pop_waiters show currently blocked producers/consumers.

Event Loop Integration (eventfd)

my $fd = $q->eventfd;           # create eventfd, returns fd number
$q->eventfd_set($fd);           # use an existing fd (e.g. inherited via fork)
my $fd = $q->fileno;            # current eventfd (-1 if none)
$q->notify;                     # signal eventfd (call after push)
$q->eventfd_consume;            # drain notification counter

Notification is opt-in: push does not write to the eventfd automatically. Call notify explicitly after pushing. This gives full control over batching (push N items, notify once) and avoids any overhead when eventfd is not used.
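Why the watcher below loops until pop returns undef follows from how a Linux eventfd counter behaves in its default (non-semaphore) mode: each write adds to a counter, a read returns the whole counter and resets it to zero, and the fd polls readable while the counter is non-zero. The sketch models that in plain Perl; EventfdModel and its methods are hypothetical stand-ins, and the mapping of add to notify and consume to eventfd_consume is an assumption based on their descriptions above.

```perl
use strict;
use warnings;

# Plain-Perl model of eventfd counter semantics (non-semaphore mode).
# EventfdModel is a hypothetical stand-in; no real fd is involved.
package EventfdModel;
sub new      { return bless { count => 0 }, shift }
sub add      { $_[0]{count} += $_[1] // 1; return }          # ~ $q->notify
sub readable { return $_[0]{count} > 0 }
sub consume  { my $c = $_[0]{count}; $_[0]{count} = 0; return $c }  # ~ eventfd_consume

package main;

my @queue;                    # stands in for the shared queue
my $efd = EventfdModel->new;
push @queue, 1 .. 5;          # push N items...
$efd->add;                    # ...then notify once

# One wakeup: reset the counter, then pop until the queue is empty.
my @got;
if ($efd->readable) {
    $efd->consume;
    push @got, shift @queue while @queue;
}
printf "handled %d items, still readable: %s\n",
    scalar @got, $efd->readable ? 'yes' : 'no';
```

One notification can stand for many items, so a handler that popped only one item per wakeup would leave the rest stranded until the next notify.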

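use EV;
use Data::Queue::Shared;

my $path = '/tmp/evq.shm';
my $q  = Data::Queue::Shared::Str->new($path, 1024);
my $fd = $q->eventfd;

sub process { print "got: $_[0]\n" }   # placeholder job handler

my $w = EV::io $fd, EV::READ, sub {
    $q->eventfd_consume;               # reset the counter first
    while (defined(my $item = $q->pop)) {
        process($item);                # drain everything queued
    }
};

# Producer side (normally another process):
$q->push("job 1");
$q->notify;   # wake the EV watcher
EV::run;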

For cross-process notification, create the eventfd before fork(). Child processes inherit the fd and should call eventfd_set($fd) on their queue handle. Writes from any process sharing the fd will wake all event-loop watchers.

Crash Safety

If a process dies while holding the Str queue mutex, other processes detect the stale lock within 2 seconds via PID tracking and automatically recover. The integer queues (Int, Int32, Int16) are lock-free and require no crash recovery for normal push/pop operations.
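A PID liveness check of the kind stale-lock recovery relies on can be written with kill 0, which performs the existence and permission checks without delivering a signal. owner_is_dead is a hypothetical helper, not the module's C code; note that pure PID checks share a known weakness, since a dead owner's PID may eventually be reused by an unrelated process.

```perl
use strict;
use warnings;
use Errno qw(ESRCH);

# Hypothetical liveness test for a recorded lock-owner PID.
sub owner_is_dead {
    my ($pid) = @_;
    return 0 if kill(0, $pid);     # process exists and we may signal it
    return $! == ESRCH ? 1 : 0;    # ESRCH: no such process; EPERM: alive, other user
}

my $pid = fork() // die "fork: $!";
exit 0 if $pid == 0;               # child exits immediately
waitpid($pid, 0);                  # reap it so the PID is really gone
printf "self dead? %d, child dead? %d\n", owner_is_dead($$), owner_is_dead($pid);
```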

Keyword API

When XS::Parse::Keyword is installed at build time, keyword forms are available that bypass method dispatch:

use Data::Queue::Shared::Int;    # activates q_int_* keywords

q_int_push $q, $value;
my $val = q_int_pop $q;
my $val = q_int_peek $q;
my $n   = q_int_size $q;

Replace int with int32, int16, or str for other variants. Keywords are lexically scoped and are activated only by use at compile time; loading the module with require at runtime does not enable them.

BENCHMARKS

Throughput versus other Perl queue/IPC modules, 200K items, single process and cross-process, Linux x86_64. Run perl -Mblib bench/vs.pl 200000 to reproduce.

SINGLE-PROCESS INTEGER PUSH+POP (interleaved)
                           Rate
Data::Queue::Shared::Int  5.0M/s
MCE::Queue                1.8M/s
POSIX::RT::MQ             806K/s
IPC::Msg (SysV)           802K/s
IPC::Transit               58K/s
Forks::Queue (Shmem)       11K/s

SINGLE-PROCESS STRING PUSH+POP (~50B, interleaved)
                           Rate
Data::Queue::Shared::Str  2.6M/s
MCE::Queue                1.5M/s
POSIX::RT::MQ             990K/s
IPC::Msg (SysV)           857K/s
Forks::Queue (Shmem)       11K/s

BATCH PUSH+POP (100 per batch, integers)
                           Rate
Shared::Int push_multi    14.9M/s
MCE::Queue                 4.5M/s

CROSS-PROCESS (1 producer + 1 consumer, integers)
                           Rate
Shared::Int               6.0M/s
MCE::Queue                4.1M/s
POSIX::RT::MQ             1.2M/s
IPC::Msg (SysV)           956K/s
Forks::Queue (Shmem)        4K/s

Key takeaways:

  • 2.8x faster than MCE::Queue for single-process integer ops

  • 1.5x faster than MCE::Queue for cross-process integers

  • ~6x faster than kernel IPC (POSIX mq / SysV msgq) for single-process integer ops, 5x to 6x cross-process

  • 3.3x faster than MCE::Queue for batch ops (100 items per batch)

  • True concurrent MPMC (MCE::Queue is workers-to-manager only)
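The speedup ratios above can be rechecked from the tables with a few lines of Perl. The rates are copied from the benchmark tables in this section; nothing is re-measured.

```perl
use strict;
use warnings;

# Rates in items/second, copied from the benchmark tables above.
my %single = (shared => 5.0e6,  mce => 1.8e6, posix_mq => 806e3, sysv => 802e3);
my %cross  = (shared => 6.0e6,  mce => 4.1e6, posix_mq => 1.2e6, sysv => 956e3);
my %batch  = (shared => 14.9e6, mce => 4.5e6);

printf "%-28s %.1fx\n", @$_ for
    [ 'single-process vs MCE',      $single{shared} / $single{mce} ],
    [ 'cross-process vs MCE',       $cross{shared}  / $cross{mce} ],
    [ 'single-process vs POSIX mq', $single{shared} / $single{posix_mq} ],
    [ 'single-process vs SysV',     $single{shared} / $single{sysv} ],
    [ 'cross-process vs POSIX mq',  $cross{shared}  / $cross{posix_mq} ],
    [ 'cross-process vs SysV',      $cross{shared}  / $cross{sysv} ],
    [ 'batch vs MCE',               $batch{shared}  / $batch{mce} ];
```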

AUTHOR

vividsnow

LICENSE

This is free software; you can redistribute it and/or modify it under the same terms as Perl itself.