NAME
Data::Queue::Shared - High-performance shared-memory MPMC queues for Linux
SYNOPSIS
use Data::Queue::Shared;
# Integer queue (lock-free Vyukov MPMC)
my $q = Data::Queue::Shared::Int->new('/tmp/myq.shm', 1024);
# Anonymous queue (fork-inherited, no filesystem)
my $q = Data::Queue::Shared::Int->new(undef, 1024);
# memfd-backed queue (shareable via fd passing)
my $q = Data::Queue::Shared::Str->new_memfd("my_queue", 1024);
my $fd = $q->memfd; # pass via SCM_RIGHTS or fork
my $q2 = Data::Queue::Shared::Str->new_from_fd($fd);
$q->push(42);
my $val = $q->pop; # non-blocking, undef if empty
# Blocking pop (waits for data)
my $val = $q->pop_wait; # infinite wait
my $val = $q->pop_wait(1.5); # 1.5 second timeout
# Batch operations
my $pushed = $q->push_multi(1, 2, 3, 4, 5);
my @vals = $q->pop_multi(10); # pop up to 10
# String queue (mutex-protected, circular arena)
my $sq = Data::Queue::Shared::Str->new('/tmp/strq.shm', 1024);
$sq->push("hello world");
my $msg = $sq->pop;
# With explicit arena size (default: capacity * 256)
my $sq = Data::Queue::Shared::Str->new('/tmp/strq.shm', 1024, 1048576);
# Multiprocess
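my $pid = fork() // die "fork failed: $!";
if ($pid == 0) {
    # Child: open the same backing file and push one value
    my $child = Data::Queue::Shared::Int->new('/tmp/myq.shm', 1024);
    $child->push(99);
    exit;
}
waitpid($pid, 0);
print $q->pop; # 99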
DESCRIPTION
Data::Queue::Shared provides bounded MPMC (multi-producer, multi-consumer) queues stored in shared memory (mmap(MAP_SHARED)), enabling efficient multiprocess data sharing on Linux. Queues are normally file-backed; anonymous (fork-inherited) and memfd-backed queues are also supported.
Linux-only. Requires 64-bit Perl.
Variants
- Data::Queue::Shared::Int
Uses the Vyukov bounded MPMC algorithm. Push and pop are lock-free (CAS-based). Optimal for integer job IDs, counters, indices.
- Data::Queue::Shared::Int32, Data::Queue::Shared::Int16
Compact variants with 32-bit Vyukov sequence numbers. Each slot takes half the memory of an Int slot, which doubles cache density. Same lock-free algorithm and same API as Int. Values outside the type range are silently truncated (standard C cast semantics).
- Data::Queue::Shared::Str
Uses a futex-based mutex with a circular arena for variable-length string storage. Supports UTF-8 flag preservation. Optimal for messages, serialized data, filenames.
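To make the truncation rule for the compact variants concrete, here is a minimal pure-Perl sketch of what a C cast to a narrower signed integer typically does on two's-complement platforms such as Linux x86_64. The helper wrap_signed is hypothetical and not part of the module; it only models the arithmetic, assuming the compact variants store 32-bit and 16-bit signed values.

```perl
use strict;
use warnings;

# Hypothetical helper (not part of Data::Queue::Shared): models a C cast of a
# 64-bit integer to a signed integer that is $bits wide (two's complement).
sub wrap_signed {
    my ($value, $bits) = @_;
    my $low = $value & ((1 << $bits) - 1);   # keep only the low $bits bits
    # Reinterpret the top remaining bit as the sign bit.
    return $low >= (1 << ($bits - 1)) ? $low - (1 << $bits) : $low;
}

printf "%d -> %d\n", $_, wrap_signed($_, 16) for 42, 40_000, 70_000, -1;
```

Under these assumptions 42 and -1 survive a 16-bit queue unchanged, while 40000 comes back as -25536 and 70000 as 4464. If values may exceed the type range, validating them before push is safer than relying on the wrap.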
Features
File-backed mmap for cross-process sharing
Lock-free MPMC for integer queues (Vyukov algorithm)
Futex-based blocking wait with timeout (no busy-spin)
PID-based stale lock recovery (dead process detection)
Batch push/pop operations
Circular arena for zero-fragmentation string storage
Optional keyword API via XS::Parse::Keyword (zero method-dispatch overhead)
Constructor
# Int queue
my $q = Data::Queue::Shared::Int->new($path, $capacity);
# Str queue
my $q = Data::Queue::Shared::Str->new($path, $capacity);
my $q = Data::Queue::Shared::Str->new($path, $capacity, $arena_bytes);
Creates or opens a shared queue backed by file $path. $capacity is rounded up to the next power of 2. When opening an existing file, parameters are read from the stored header. Multiple processes can open the same file simultaneously.
Pass undef for $path to create an anonymous queue using MAP_SHARED|MAP_ANONYMOUS. Anonymous queues are shared with child processes via fork() but cannot be opened by unrelated processes.
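The capacity rounding described above can be sketched in pure Perl. The helper next_pow2 is hypothetical and only illustrates the arithmetic; it assumes that a capacity which is already a power of 2 is left unchanged.

```perl
use strict;
use warnings;

# Hypothetical helper (not part of Data::Queue::Shared):
# smallest power of two that is >= $n.
sub next_pow2 {
    my ($n) = @_;
    my $p = 1;
    $p <<= 1 while $p < $n;
    return $p;
}

printf "requested %d -> effective %d\n", $_, next_pow2($_) for 1000, 1024, 1025;
```

Under that assumption, requesting 1000 or 1024 slots yields 1024, and requesting 1025 yields 2048. Calling $q->capacity after construction is the reliable way to learn the effective value.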
memfd Constructor
my $q = Data::Queue::Shared::Int->new_memfd($name, $capacity);
my $q = Data::Queue::Shared::Str->new_memfd($name, $capacity);
my $q = Data::Queue::Shared::Str->new_memfd($name, $cap, $arena);
Creates a queue backed by memfd_create(2). There is no filesystem path; the backing memory is identified by a file descriptor. Use memfd() to retrieve the fd and pass it to other processes via SCM_RIGHTS (Unix domain socket fd passing) or fork() inheritance.
my $q2 = Data::Queue::Shared::Int->new_from_fd($fd);
my $q2 = Data::Queue::Shared::Str->new_from_fd($fd);
Opens a queue from a received memfd. The fd is dup'd internally.
my $fd = $q->memfd; # backing fd (-1 if file-backed/anonymous)
For Str queues, $arena_bytes sets the string storage arena size (default: $capacity * 256, minimum 4096, maximum 4GB). Strings are stored in a circular arena; total stored string bytes cannot exceed the arena capacity. Individual strings are limited to ~2GB.
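Because the arena bounds the total stored bytes, a Str queue can run out of arena space before it runs out of slots. The following sketch shows the arithmetic. Both helpers are hypothetical, and the estimate ignores any per-string bookkeeping the module may add inside the arena.

```perl
use strict;
use warnings;

# Hypothetical helpers, not part of Data::Queue::Shared.

# Documented default: capacity * 256 bytes, with a 4096-byte minimum.
sub default_arena_bytes {
    my ($capacity) = @_;
    my $bytes = $capacity * 256;
    return $bytes < 4096 ? 4096 : $bytes;
}

# Upper bound on how many strings of a given average length fit in the arena.
sub strings_that_fit {
    my ($arena_bytes, $avg_len) = @_;
    return int($arena_bytes / $avg_len);
}

my $capacity = 1024;
my $arena    = default_arena_bytes($capacity);
printf "default arena: %d bytes\n", $arena;
printf "1 KiB strings that fit: %d of %d slots\n",
    strings_that_fit($arena, 1024), $capacity;
printf "arena for a full queue of 1 KiB strings: %d bytes\n", $capacity * 1024;
```

With 1024 slots the default arena is 262144 bytes, enough for at most 256 strings of 1 KiB each. Holding 1024 such strings needs about 1048576 bytes, passed as the third constructor argument.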
API
Core operations
my $ok = $q->push($value); # non-blocking, false if full
my $val = $q->pop; # non-blocking, undef if empty
my $ok = $q->push_wait($value); # blocking, infinite wait
my $ok = $q->push_wait($value, $secs); # blocking with timeout
my $val = $q->pop_wait; # blocking, infinite wait
my $val = $q->pop_wait($secs); # blocking with timeout
my $val = $q->peek; # read front without consuming
peek returns the front element without removing it (undef if empty). For Int, this is a best-effort snapshot (racy in concurrent MPMC). For Str, this is exact (mutex-protected).
Deque operations (Str only)
my $ok = $q->push_front($value); # non-blocking push to front
my $ok = $q->push_front_wait($value); # blocking push to front
my $ok = $q->push_front_wait($val, $secs); # with timeout
my $val = $q->pop_back; # non-blocking pop from back
my $val = $q->pop_back_wait; # blocking pop from back
my $val = $q->pop_back_wait($timeout); # with timeout
push_front inserts at the head, which is useful for requeueing failed jobs. pop_back removes from the tail, which is useful for work-stealing or undo. Neither is available for Int queues, because the Vyukov algorithm is strictly FIFO.
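The requeueing use case could look roughly like the sketch below. The helper handle_one and its return values are hypothetical. It relies only on the documented pop and push_front methods of a Str queue, and treats a handler that dies as a failed job.

```perl
use strict;
use warnings;

# Hypothetical helper, not part of Data::Queue::Shared.
# Pops one job and runs $handler on it. If the handler dies, the job is
# pushed back to the head so it is the next one popped.
# Returns 'done', 'requeued', 'dropped' (queue was full on requeue),
# or undef if the queue was empty.
sub handle_one {
    my ($q, $handler) = @_;
    my $job = $q->pop;
    return unless defined $job;
    return 'done' if eval { $handler->($job); 1 };
    return $q->push_front($job) ? 'requeued' : 'dropped';
}
```

A worker would call handle_one($q, \&process_job) in a loop, where process_job is whatever job handler the application defines. In a concurrent setting another producer may fill the queue between the pop and the push_front, which is why the 'dropped' case is reported rather than ignored.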
Batch operations
my $n = $q->push_multi(@values); # non-blocking, returns pushed count
my @v = $q->pop_multi($count); # non-blocking, pop up to $count
my $n = $q->push_wait_multi($timeout, @values); # blocking batch push
my @v = $q->pop_wait_multi($n, $timeout); # block for >=1, grab up to $n
my @v = $q->drain; # pop all elements
my @v = $q->drain($max); # pop up to $max elements
pop_wait_multi blocks until at least one element is available (or the timeout expires), then pops up to $n elements without further blocking. It returns an empty list on timeout.
push_wait_multi pushes all values, blocking while the queue is full. $timeout is in seconds (-1 = infinite, 0 = try once).
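A consumer built on pop_wait_multi might be structured like this sketch. The helper consume_batches is hypothetical. It uses the documented behaviour that an empty list signals a timeout to end the loop.

```perl
use strict;
use warnings;

# Hypothetical helper, not part of Data::Queue::Shared.
# Consumes the queue in batches of up to $batch_size until no element
# arrives within $idle_secs. Returns the total number of items consumed.
sub consume_batches {
    my ($q, $batch_size, $idle_secs, $on_batch) = @_;
    my $total = 0;
    # List assignment in boolean context is false when the list is empty.
    while (my @items = $q->pop_wait_multi($batch_size, $idle_secs)) {
        $on_batch->(@items);
        $total += @items;
    }
    return $total;
}
```

For example, consume_batches($q, 100, 1.5, sub { handle($_) for @_ }) would process up to 100 items per wakeup and stop after 1.5 idle seconds, where handle is an application-defined function.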
Status
my $n = $q->size; # approximate for Int (lock-free), exact for Str
my $cap = $q->capacity; # max elements
my $ok = $q->is_empty;
my $ok = $q->is_full;
Management
$q->clear; # remove all elements
$q->sync; # msync — flush to disk for crash durability
$q->unlink; # remove backing file
Class->unlink($path); # class method form
my $p = $q->path; # backing file path
my $s = $q->stats; # diagnostic hashref
Stats keys: size, capacity, mmap_size, push_ok, pop_ok, push_full, pop_empty, recoveries, push_waiters, pop_waiters. Str queues additionally include arena_cap and arena_used. All counters are approximate under concurrent access (diagnostic only). push_waiters/pop_waiters show currently blocked producers/consumers.
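As an illustration of how the stats hashref might be used for monitoring, the sketch below derives a few ratios from the documented keys. The helper summarize_stats is hypothetical, and it assumes that push_full counts push attempts rejected because the queue was full.

```perl
use strict;
use warnings;

# Hypothetical helper, not part of Data::Queue::Shared.
# Takes a hashref shaped like the one returned by $q->stats.
sub summarize_stats {
    my ($s) = @_;
    my $attempts = $s->{push_ok} + $s->{push_full};
    return {
        fill_pct      => $s->{capacity} ? 100 * $s->{size} / $s->{capacity} : 0,
        push_fail_pct => $attempts ? 100 * $s->{push_full} / $attempts : 0,
        blocked       => $s->{push_waiters} + $s->{pop_waiters},
    };
}

# Hand-written sample values, for illustration only.
my $report = summarize_stats({
    size => 256, capacity => 1024, push_ok => 900, push_full => 100,
    push_waiters => 1, pop_waiters => 0,
});
printf "fill %.1f%%, failed pushes %.1f%%, blocked %d\n",
    @{$report}{qw(fill_pct push_fail_pct blocked)};
```

With a live queue the call would be summarize_stats($q->stats). Since the counters are approximate under concurrency, the result is suitable for dashboards and alerts, not for control flow.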
Event Loop Integration (eventfd)
my $fd = $q->eventfd; # create eventfd, returns fd number
$q->eventfd_set($fd); # use an existing fd (e.g. inherited via fork)
my $fd = $q->fileno; # current eventfd (-1 if none)
$q->notify; # signal eventfd (call after push)
$q->eventfd_consume; # drain notification counter
Notification is opt-in: push does not write to the eventfd automatically. Call notify explicitly after pushing. This gives full control over batching (push N items, notify once) and avoids any overhead when eventfd is not used.
use EV;
my $q = Data::Queue::Shared::Str->new($path, 1024);
my $fd = $q->eventfd;
my $w = EV::io $fd, EV::READ, sub {
    $q->eventfd_consume;
    while (defined(my $item = $q->pop)) {
        process($item);
    }
};
# Producer side:
$q->push($item);
$q->notify; # wake the EV watcher
EV::run;
For cross-process notification, create the eventfd before fork(). Child processes inherit the fd and should call eventfd_set($fd) on their queue handle. Writes from any process sharing the fd will wake all event-loop watchers.
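The "push N items, notify once" pattern can be wrapped in a small producer-side helper. The name push_and_notify is hypothetical. The sketch uses only the documented push_multi and notify methods and assumes an eventfd has already been attached to the queue handle.

```perl
use strict;
use warnings;

# Hypothetical helper, not part of Data::Queue::Shared.
# Pushes a batch without blocking, then signals the eventfd once.
# Returns the number of items actually pushed, which can be smaller
# than the batch if the queue fills up.
sub push_and_notify {
    my ($q, @items) = @_;
    my $pushed = $q->push_multi(@items);
    $q->notify if $pushed;   # skip the wakeup when nothing was queued
    return $pushed;
}
```

The caller should compare the return value with the batch size and retry or drop the remainder. The consumer still drains with a pop loop after eventfd_consume, so one notification per batch is enough.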
Crash Safety
If a process dies while holding the Str queue mutex, other processes detect the stale lock within 2 seconds via PID tracking and automatically recover. The Int queue is lock-free and requires no crash recovery for normal push/pop operations.
Keyword API
When XS::Parse::Keyword is installed at build time, keyword forms are available that bypass method dispatch:
use Data::Queue::Shared::Int; # activates q_int_* keywords
q_int_push $q, $value;
my $val = q_int_pop $q;
my $val = q_int_peek $q;
my $n = q_int_size $q;
Replace int with int32, int16, or str for other variants. Keywords are lexically scoped and require use (not require).
BENCHMARKS
Throughput versus other Perl queue/IPC modules, 200K items, single process and cross-process, Linux x86_64. Run perl -Mblib bench/vs.pl 200000 to reproduce.
SINGLE-PROCESS INTEGER PUSH+POP (interleaved)
                              Rate
Data::Queue::Shared::Int      5.0M/s
MCE::Queue                    1.8M/s
POSIX::RT::MQ                 806K/s
IPC::Msg (SysV)               802K/s
IPC::Transit                   58K/s
Forks::Queue (Shmem)           11K/s

SINGLE-PROCESS STRING PUSH+POP (~50B, interleaved)
                              Rate
Data::Queue::Shared::Str      2.6M/s
MCE::Queue                    1.5M/s
POSIX::RT::MQ                 990K/s
IPC::Msg (SysV)               857K/s
Forks::Queue (Shmem)           11K/s

BATCH PUSH+POP (100 per batch, integers)
                              Rate
Shared::Int push_multi       14.9M/s
MCE::Queue                    4.5M/s

CROSS-PROCESS (1 producer + 1 consumer, integers)
                              Rate
Shared::Int                   6.0M/s
MCE::Queue                    4.1M/s
POSIX::RT::MQ                 1.2M/s
IPC::Msg (SysV)               956K/s
Forks::Queue (Shmem)            4K/s
Key takeaways:
2.8x faster than MCE::Queue for single-process integer ops
1.5x faster than MCE::Queue for cross-process integers
About 6x faster than kernel IPC (POSIX mq / SysV msgq) for single-process integer ops
3.3x faster than MCE::Queue for batch integer ops (single mutex hold vs per-item)
True concurrent MPMC (MCE::Queue is workers-to-manager only)
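The speedup factors in the takeaways follow directly from the tables. This short sketch recomputes them from the published rates; the speedup helper is hypothetical, and the figures are copied from the benchmark tables above, not measured by the script.

```perl
use strict;
use warnings;

# Hypothetical helper: ratio of two rates, rounded to one decimal place.
sub speedup {
    my ($fast, $slow) = @_;
    return sprintf '%.1f', $fast / $slow;
}

# Rates in items per second, copied from the benchmark tables.
printf "single-process int vs MCE::Queue:    %sx\n", speedup(5.0e6,  1.8e6);
printf "cross-process int vs MCE::Queue:     %sx\n", speedup(6.0e6,  4.1e6);
printf "batch int vs MCE::Queue:             %sx\n", speedup(14.9e6, 4.5e6);
printf "single-process int vs POSIX::RT::MQ: %sx\n", speedup(5.0e6,  806e3);
```

The same helper can be applied to rates obtained by rerunning bench/vs.pl on other hardware.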
AUTHOR
vividsnow
LICENSE
This is free software; you can redistribute it and/or modify it under the same terms as Perl itself.