NAME
Data::ReqRep::Shared - High-performance shared-memory request/response IPC for Linux
SYNOPSIS
use Data::ReqRep::Shared;
# Server: create channel
my $srv = Data::ReqRep::Shared->new('/tmp/rr.shm', 1024, 64, 4096);
# path, req_capacity, resp_slots, resp_data_max
# Server loop
while (my ($req, $id) = $srv->recv_wait) {
$srv->reply($id, process($req));
}
# Client: open existing channel
my $cli = Data::ReqRep::Shared::Client->new('/tmp/rr.shm');
# Synchronous
my $resp = $cli->req("hello");
# With timeout (single deadline covers send + wait)
my $resp = $cli->req_wait("hello", 5.0);
# Asynchronous (multiple in-flight)
my $id1 = $cli->send("req1");
my $id2 = $cli->send("req2");
my $r1 = $cli->get_wait($id1);
my $r2 = $cli->get_wait($id2);
# Integer variant (lock-free, 1.5x faster)
use Data::ReqRep::Shared::Int;
my $srv = Data::ReqRep::Shared::Int->new($path, 1024, 64);
my $cli = Data::ReqRep::Shared::Int::Client->new($path);
my $resp = $cli->req(42);
DESCRIPTION
Shared-memory request/response channel for interprocess communication on Linux. Multiple clients send requests, multiple workers process them, responses are routed back to the correct requester. All through a single shared-memory file -- no broker process, no socket pairs per connection.
Linux-only. Requires 64-bit Perl.
Architecture
Request queue -- bounded MPMC ring buffer. Str variant uses a futex mutex with circular arena for variable-length data. Int variant uses a lock-free Vyukov MPMC queue.
Response slots -- fixed pool with per-slot futex for targeted wakeup and a generation counter for ABA-safe cancel/recycle.
Flow: client acquires a response slot, pushes a request (carrying the slot ID), server pops the request, writes the response to that slot, client reads it and releases the slot.
Variants
-
Variable-length byte string requests and responses. Mutex-protected request queue with circular arena. Supports UTF-8 flag preservation.
my $srv = Data::ReqRep::Shared->new($path, $cap, $slots, $resp_size); my $srv = Data::ReqRep::Shared->new($path, $cap, $slots, $resp_size, $arena); -
Single int64 request and response values. Lock-free Vyukov MPMC request queue. 1.5x faster single-process. No arena, no mutex on the request path.
my $srv = Data::ReqRep::Shared::Int->new($path, $cap, $slots);
Both variants share the same response slot infrastructure, the same generation-counter ABA protection, the same eventfd integration, and the same crash recovery mechanisms.
Constructors
Server (creates or opens the channel):
->new($path, ...) # file-backed
->new(undef, ...) # anonymous (fork-inherited)
->new_memfd($name, ...) # memfd (fd-passing or fork)
->new_from_fd($fd) # open from memfd fd
Client (opens existing channel):
->new($path)
->new_from_fd($fd)
Constructor arguments for Str: $path, $req_cap, $resp_slots, $resp_size [, $arena]. For Int: $path, $req_cap, $resp_slots.
Server API
my ($data, $id) = $srv->recv; # non-blocking
my ($data, $id) = $srv->recv_wait; # blocking
my ($data, $id) = $srv->recv_wait($secs); # with timeout
Returns ($request_data, $id) or empty list. For Int, $data is an integer.
my $ok = $srv->reply($id, $response);
Writes response and wakes the client. Returns false if the slot was cancelled or recycled (generation mismatch).
Batch (Str only):
my @pairs = $srv->recv_multi($n); # up to $n under one lock
my @pairs = $srv->recv_wait_multi($n, $timeout);
my @pairs = $srv->drain;
my @pairs = $srv->drain($max);
Returns flat list ($data1, $id1, $data2, $id2, ...).
Management:
$srv->clear; $srv->sync; $srv->unlink;
$srv->size; $srv->capacity; $srv->is_empty;
$srv->resp_slots; $srv->resp_size; $srv->stats;
$srv->path; $srv->memfd;
eventfd (see "Event Loop Integration"):
$srv->eventfd; $srv->eventfd_set($fd);
$srv->eventfd_consume; $srv->notify;
$srv->fileno; # current request eventfd (-1 if none)
$srv->reply_eventfd; $srv->reply_eventfd_set($fd);
$srv->reply_eventfd_consume; $srv->reply_notify;
$srv->reply_fileno; # current reply eventfd (-1 if none)
Client API
Synchronous:
my $resp = $cli->req($data); # infinite wait
my $resp = $cli->req_wait($data, $secs); # single deadline
Asynchronous:
my $id = $cli->send($data); # non-blocking
my $id = $cli->send_wait($data, $secs); # blocking
my $resp = $cli->get($id); # non-blocking
my $resp = $cli->get_wait($id, $secs); # blocking
$cli->cancel($id); # abandon request
cancel releases the slot only if the reply hasn't arrived yet. If it has (state is READY), cancel is a no-op -- call get() to drain.
Convenience (Str only):
my $id = $cli->send_notify($data); # send + eventfd signal
my $id = $cli->send_wait_notify($data);
Status:
$cli->pending; $cli->size; $cli->capacity;
$cli->is_empty; $cli->resp_slots; $cli->resp_size;
$cli->stats; $cli->path; $cli->memfd;
eventfd (see "Event Loop Integration"):
$cli->eventfd; $cli->eventfd_set($fd);
$cli->eventfd_consume; $cli->fileno;
$cli->notify; # signal request eventfd
$cli->req_eventfd_set($fd); $cli->req_fileno;
Event Loop Integration (eventfd)
Two eventfds for bidirectional notification. Both are opt-in -- send/reply do not signal automatically.
# Request notification (client -> server)
my $req_fd = $srv->eventfd; # create
$srv->eventfd_consume; # drain in callback
$cli->notify; # signal (or send_notify)
$cli->req_eventfd_set($fd); # set inherited fd
# Reply notification (server -> client)
my $rep_fd = $srv->reply_eventfd;
$srv->reply_notify; # signal after reply
$cli->eventfd; # create (maps to reply fd)
$cli->eventfd_consume; # drain in callback
$cli->eventfd_set($fd); # set inherited fd
For cross-process use, create both eventfds before fork() so child inherits the fds:
my $srv = Data::ReqRep::Shared->new($path, 1024, 64, 4096);
my $req_fd = $srv->eventfd;
my $rep_fd = $srv->reply_eventfd;
if (fork() == 0) {
my $cli = Data::ReqRep::Shared::Client->new($path);
$cli->req_eventfd_set($req_fd);
$cli->eventfd_set($rep_fd);
$cli->send_notify($data); # wakes server
# EV::io $rep_fd for reply ...
exit;
}
# parent = server
my $w = EV::io $req_fd, EV::READ, sub {
$srv->eventfd_consume;
while (my ($req, $id) = $srv->recv) {
$srv->reply($id, process($req));
}
$srv->reply_notify;
};
Crash Safety
Stale mutex -- if a process dies holding the request queue mutex, other processes detect it via PID tracking and recover within 2 seconds.
Stale response slots -- if a client dies while holding a slot (ACQUIRED or READY state), the slot is reclaimed automatically during the next slot acquisition scan.
ABA protection -- response slot IDs carry a generation counter. A cancelled-and-reacquired slot has a different generation, so stale
reply/get/cancelcalls are safely rejected.
Tuning
req_cap-- request queue capacity (power of 2). Higher for bursty workloads (1024-4096), lower for steady-state (64-256). Memory: 24 bytes/slot + arena (Str) or 24 bytes/slot (Int).resp_slots-- max concurrent in-flight requests across all clients. One slot per outstanding async request. For synchronousreq(), one per client suffices. Memory: 64 bytes/slot (Int) or (32 +resp_sizerounded up to 64) bytes/slot (Str).resp_size-- max response payload bytes (Str only). Fixed per slot. Responses exceeding this croak. Pick the 99th percentile.arena-- request data arena bytes (Str only, defaultreq_cap * 256). Increase for large requests. Monitorarena_usedinstats().
Benchmarks
Linux x86_64. Run perl -Mblib bench/vs.pl 50000 to reproduce.
SINGLE-PROCESS ECHO (200K iterations)
ReqRep::Int (lock-free) 1.8M req/s
ReqRep::Str (12B, mutex) 1.2M req/s
ReqRep::Str batch (100x) 1.4M req/s
CROSS-PROCESS ECHO (50K iterations, 12B payload)
Pipe pair (1:1) 240K req/s
Unix socketpair (1:1) 222K req/s
ReqRep::Int 202K req/s *
ReqRep::Str 177K req/s *
IPC::Msg (SysV) 165K req/s
TCP loopback 115K req/s
MCE::Channel 96K req/s
Socketpair via broker 82K req/s
Forks::Queue (Shmem) 5K req/s
* = MPMC with per-request reply routing. Pipes and sockets are faster for simple 1:1 echo but require dedicated fd pairs per client-worker connection and cannot do MPMC without a broker (which halves throughput).
SEE ALSO
Data::Queue::Shared, Data::PubSub::Shared, Data::Buffer::Shared, Data::HashMap::Shared
AUTHOR
vividsnow
LICENSE
This is free software; you can redistribute it and/or modify it under the same terms as Perl itself.