NAME

Data::ReqRep::Shared - High-performance shared-memory request/response IPC for Linux

SYNOPSIS

use Data::ReqRep::Shared;

# Server: create channel
my $srv = Data::ReqRep::Shared->new('/tmp/rr.shm', 1024, 64, 4096);
#   path, req_capacity, resp_slots, resp_data_max

# Server loop
while (my ($req, $id) = $srv->recv_wait) {
    $srv->reply($id, process($req));
}

# Client: open existing channel
my $cli = Data::ReqRep::Shared::Client->new('/tmp/rr.shm');

# Synchronous
my $resp = $cli->req("hello");

# With timeout (single deadline covers send + wait)
my $resp = $cli->req_wait("hello", 5.0);

# Asynchronous (multiple in-flight)
my $id1 = $cli->send("req1");
my $id2 = $cli->send("req2");
my $r1  = $cli->get_wait($id1);
my $r2  = $cli->get_wait($id2);

# Integer variant (lock-free, 1.5x faster)
use Data::ReqRep::Shared::Int;
my $srv = Data::ReqRep::Shared::Int->new($path, 1024, 64);
my $cli = Data::ReqRep::Shared::Int::Client->new($path);
my $resp = $cli->req(42);

DESCRIPTION

Shared-memory request/response channel for interprocess communication on Linux. Multiple clients send requests, multiple workers process them, and responses are routed back to the correct requester -- all through a single shared-memory file, with no broker process and no per-connection socket pairs.

Linux-only. Requires 64-bit Perl.

Architecture

  • Request queue -- bounded MPMC ring buffer. Str variant uses a futex mutex with circular arena for variable-length data. Int variant uses a lock-free Vyukov MPMC queue.

  • Response slots -- fixed pool with per-slot futex for targeted wakeup and a generation counter for ABA-safe cancel/recycle.

Flow: client acquires a response slot, pushes a request (carrying the slot ID), server pops the request, writes the response to that slot, client reads it and releases the slot.
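
The same flow can be traced end to end with the async API. A minimal single-process sketch (both ends open the same file, as in the single-process benchmarks below; the path is illustrative):

    use Data::ReqRep::Shared;

    my $path = '/tmp/rr-flow.shm';
    my $srv  = Data::ReqRep::Shared->new($path, 1024, 64, 4096);
    my $cli  = Data::ReqRep::Shared::Client->new($path);

    my $id = $cli->send("ping");       # 1. acquire slot, push request (slot ID rides along)
    my ($req, $rid) = $srv->recv;      # 2. server pops request and slot ID
    $srv->reply($rid, "pong: $req");   # 3. server writes response into that slot
    my $resp = $cli->get($id);         # 4. client reads response, slot is released

    $srv->unlink;                      # remove the backing file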

Variants

Str -- Data::ReqRep::Shared / Data::ReqRep::Shared::Client

Variable-length byte string requests and responses. Mutex-protected request queue with circular arena. Supports UTF-8 flag preservation.

my $srv = Data::ReqRep::Shared->new($path, $cap, $slots, $resp_size);
my $srv = Data::ReqRep::Shared->new($path, $cap, $slots, $resp_size, $arena);

Int -- Data::ReqRep::Shared::Int / Data::ReqRep::Shared::Int::Client

Single int64 request and response values. Lock-free Vyukov MPMC request queue. 1.5x faster single-process. No arena, no mutex on the request path.

my $srv = Data::ReqRep::Shared::Int->new($path, $cap, $slots);

Both variants share the same response slot infrastructure, the same generation-counter ABA protection, the same eventfd integration, and the same crash recovery mechanisms.

Constructors

Server (creates or opens the channel):

->new($path, ...)             # file-backed
->new(undef, ...)             # anonymous (fork-inherited)
->new_memfd($name, ...)       # memfd (fd-passing or fork)
->new_from_fd($fd)            # open from memfd fd

Client (opens existing channel):

->new($path)
->new_from_fd($fd)

Constructor arguments for Str: $path, $req_cap, $resp_slots, $resp_size [, $arena]. For Int: $path, $req_cap, $resp_slots.
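
The memfd constructors allow attaching without a filesystem path. A fork-based sketch, assuming memfd() returns a plain fd number usable with new_from_fd:

    use Data::ReqRep::Shared;

    # Channel backed by a memfd; the fd is inherited across fork().
    my $srv = Data::ReqRep::Shared->new_memfd('rr-demo', 1024, 64, 4096);
    my $fd  = $srv->memfd;

    if (fork() == 0) {
        # Child: open the same channel from the inherited fd.
        my $cli = Data::ReqRep::Shared::Client->new_from_fd($fd);
        $cli->req("hello");
        exit;
    }

    my ($req, $id) = $srv->recv_wait;
    $srv->reply($id, "got: $req");
    wait;                              # reap the child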

Server API

my ($data, $id) = $srv->recv;              # non-blocking
my ($data, $id) = $srv->recv_wait;         # blocking
my ($data, $id) = $srv->recv_wait($secs);  # with timeout

Returns ($request_data, $id), or an empty list when no request is available (or the timeout expires). For Int, $data is an integer.

my $ok = $srv->reply($id, $response);

Writes response and wakes the client. Returns false if the slot was cancelled or recycled (generation mismatch).

Batch (Str only):

my @pairs = $srv->recv_multi($n);          # up to $n under one lock
my @pairs = $srv->recv_wait_multi($n, $timeout);
my @pairs = $srv->drain;
my @pairs = $srv->drain($max);

Returns flat list ($data1, $id1, $data2, $id2, ...).
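
Because the batch calls return a flat ($data, $id, ...) list, a splice loop pairs the elements back up. A sketch (process() is a placeholder for your handler):

    my @pairs = $srv->recv_wait_multi(100, 1.0);
    while (my ($data, $id) = splice @pairs, 0, 2) {
        $srv->reply($id, process($data));
    }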

Management:

$srv->clear;       $srv->sync;        $srv->unlink;
$srv->size;        $srv->capacity;    $srv->is_empty;
$srv->resp_slots;  $srv->resp_size;   $srv->stats;
$srv->path;        $srv->memfd;

eventfd (see "Event Loop Integration"):

$srv->eventfd;             $srv->eventfd_set($fd);
$srv->eventfd_consume;     $srv->notify;
$srv->fileno;              # current request eventfd (-1 if none)
$srv->reply_eventfd;       $srv->reply_eventfd_set($fd);
$srv->reply_eventfd_consume;  $srv->reply_notify;
$srv->reply_fileno;        # current reply eventfd (-1 if none)

Client API

Synchronous:

my $resp = $cli->req($data);                # infinite wait
my $resp = $cli->req_wait($data, $secs);    # single deadline

Asynchronous:

my $id   = $cli->send($data);               # non-blocking
my $id   = $cli->send_wait($data, $secs);   # blocking
my $resp = $cli->get($id);                  # non-blocking
my $resp = $cli->get_wait($id, $secs);      # blocking
$cli->cancel($id);                          # abandon request

cancel releases the slot only if the reply hasn't arrived yet. If it has (state is READY), cancel is a no-op -- call get() to drain.
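
A typical pattern bounds the wait, then cancels on timeout while accounting for a racing reply. A sketch, assuming get_wait/get return undef when no reply is available:

    my $id   = $cli->send($data);
    my $resp = $cli->get_wait($id, 2.0);    # bounded wait

    unless (defined $resp) {
        $cli->cancel($id);                  # frees the slot if still pending
        $resp = $cli->get($id);             # if cancel was a no-op (READY), drain the reply
    }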

Convenience (Str only):

my $id = $cli->send_notify($data);          # send + eventfd signal
my $id = $cli->send_wait_notify($data);

Status:

$cli->pending;     $cli->size;       $cli->capacity;
$cli->is_empty;    $cli->resp_slots; $cli->resp_size;
$cli->stats;       $cli->path;       $cli->memfd;

eventfd (see "Event Loop Integration"):

$cli->eventfd;             $cli->eventfd_set($fd);
$cli->eventfd_consume;     $cli->fileno;
$cli->notify;              # signal request eventfd
$cli->req_eventfd_set($fd);  $cli->req_fileno;

Event Loop Integration (eventfd)

Two eventfds for bidirectional notification. Both are opt-in -- send/reply do not signal automatically.

# Request notification (client -> server)
my $req_fd = $srv->eventfd;     # create
$srv->eventfd_consume;          # drain in callback
$cli->notify;                   # signal (or send_notify)
$cli->req_eventfd_set($fd);     # set inherited fd

# Reply notification (server -> client)
my $rep_fd = $srv->reply_eventfd;
$srv->reply_notify;             # signal after reply
$cli->eventfd;                  # create (maps to reply fd)
$cli->eventfd_consume;          # drain in callback
$cli->eventfd_set($fd);         # set inherited fd

For cross-process use, create both eventfds before fork() so the child inherits them:

my $srv = Data::ReqRep::Shared->new($path, 1024, 64, 4096);
my $req_fd = $srv->eventfd;
my $rep_fd = $srv->reply_eventfd;

if (fork() == 0) {
    my $cli = Data::ReqRep::Shared::Client->new($path);
    $cli->req_eventfd_set($req_fd);
    $cli->eventfd_set($rep_fd);
    $cli->send_notify($data);       # wakes server
    # EV::io $rep_fd for reply ...
    exit;
}

# parent = server
my $w = EV::io $req_fd, EV::READ, sub {
    $srv->eventfd_consume;
    while (my ($req, $id) = $srv->recv) {
        $srv->reply($id, process($req));
    }
    $srv->reply_notify;
};

Crash Safety

  • Stale mutex -- if a process dies holding the request queue mutex, other processes detect it via PID tracking and recover within 2 seconds.

  • Stale response slots -- if a client dies while holding a slot (ACQUIRED or READY state), the slot is reclaimed automatically during the next slot acquisition scan.

  • ABA protection -- response slot IDs carry a generation counter. A cancelled-and-reacquired slot has a different generation, so stale reply/get/cancel calls are safely rejected.

Tuning

  • req_cap -- request queue capacity (power of 2). Higher for bursty workloads (1024-4096), lower for steady-state (64-256). Memory: 24 bytes/slot + arena (Str) or 24 bytes/slot (Int).

  • resp_slots -- max concurrent in-flight requests across all clients. One slot per outstanding async request; for synchronous req(), one per client suffices. Memory: 64 bytes/slot (Int) or (32 + resp_size rounded up to 64) bytes/slot (Str).

  • resp_size -- max response payload bytes (Str only), fixed per slot. reply() croaks on responses larger than this. Pick the 99th percentile.

  • arena -- request data arena bytes (Str only, default req_cap * 256). Increase for large requests; monitor arena_used in stats().
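
To size the arena, watch arena_used (documented above) during a representative load. A sketch, assuming stats() returns a flat key/value list:

    my %s = $srv->stats;
    printf "queue %d/%d, arena_used %d bytes\n",
        $srv->size, $srv->capacity, $s{arena_used} // 0;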

Benchmarks

Linux x86_64. Run perl -Mblib bench/vs.pl 50000 to reproduce.

SINGLE-PROCESS ECHO (200K iterations)
ReqRep::Int (lock-free)    1.8M req/s
ReqRep::Str (12B, mutex)   1.2M req/s
ReqRep::Str batch (100x)   1.4M req/s

CROSS-PROCESS ECHO (50K iterations, 12B payload)
Pipe pair (1:1)            240K req/s
Unix socketpair (1:1)      222K req/s
ReqRep::Int                202K req/s  *
ReqRep::Str                177K req/s  *
IPC::Msg (SysV)            165K req/s
TCP loopback               115K req/s
MCE::Channel                96K req/s
Socketpair via broker       82K req/s
Forks::Queue (Shmem)         5K req/s

* = MPMC with per-request reply routing. Pipes and sockets are faster for simple 1:1 echo but require dedicated fd pairs per client-worker connection and cannot do MPMC without a broker (which halves throughput).

SEE ALSO

Data::Queue::Shared, Data::PubSub::Shared, Data::Buffer::Shared, Data::HashMap::Shared

AUTHOR

vividsnow

LICENSE

This is free software; you can redistribute it and/or modify it under the same terms as Perl itself.