NAME

Atomic::Pipe - Send atomic messages from multiple writers across a POSIX pipe.

DESCRIPTION

Normally if you write to a pipe from multiple processes/threads, the messages will come mixed together unpredictably. Some messages may be interrupted by parts of messages from other writers. This module takes advantage of some POSIX specifications to allow multiple writers to send arbitrary data down a pipe in atomic chunks to avoid the issue.

NOTE: This only works for POSIX-compliant pipes on POSIX-compliant systems. Some features may also be unavailable on older systems or on some platforms.

See also https://man7.org/linux/man-pages/man7/pipe.7.html, which states:

POSIX.1 says that write(2)s of less than PIPE_BUF bytes must be
atomic: the output data is written to the pipe as a contiguous
sequence.  Writes of more than PIPE_BUF bytes may be nonatomic: the
kernel may interleave the data with data written by other processes.
POSIX.1 requires PIPE_BUF to be at least 512 bytes.  (On Linux,
PIPE_BUF is 4096 bytes.) [...]
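The limit quoted above can be queried at run time. This is a minimal sketch using core Perl's POSIX module (fpathconf() with the _PC_PIPE_BUF name) on a plain pipe. It only shows where the number comes from. It does not necessarily reflect how Atomic::Pipe itself obtains its PIPE_BUF value.

```perl
use strict;
use warnings;
use POSIX ();

pipe(my $rh, my $wh) or die "Could not create pipe: $!";

# Ask the system for the atomic-write limit that applies to this pipe.
# fpathconf() returns undef if the limit cannot be determined.
my $pipe_buf = POSIX::fpathconf(fileno($wh), POSIX::_PC_PIPE_BUF());

if (defined $pipe_buf) {
    print "PIPE_BUF for this pipe: $pipe_buf bytes\n";
}
else {
    print "PIPE_BUF could not be determined: $!\n";
}
```

Any write of at most this many bytes to the pipe is atomic. Larger writes may be interleaved with other writers.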

Under the hood this module splits your message into chunks slightly smaller than the PIPE_BUF limit. Each chunk is sent as a single atomic write, prefixed with a header made up of the process id it came from, the thread id it came from, a chunk id (in descending order, so if there are 3 chunks the first will have id 2, the second 1, and the final chunk is always 0, which lets the reader flush the message because it knows it is done), and the length of the data section that follows.
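The splitting step described above can be sketched in plain Perl. This is an illustration only. The header layout (four 32-bit unsigned integers packed with 'NNNN') and the helper name split_into_chunks are hypothetical assumptions made for this sketch; they are NOT the wire format or API that Atomic::Pipe actually uses.

```perl
use strict;
use warnings;

# Hypothetical header for this sketch only (NOT Atomic::Pipe's real format):
# pid, tid, chunk id, data length, each a 32-bit unsigned integer.
use constant HEADER_FMT => 'NNNN';
use constant HEADER_LEN => length pack(HEADER_FMT, 0, 0, 0, 0);    # 16 bytes

# Split $data into chunks that each fit in a single atomic write of at most
# $pipe_buf bytes, header included.
sub split_into_chunks {
    my ($data, $pipe_buf, $pid, $tid) = @_;
    my $max_data = $pipe_buf - HEADER_LEN;
    die "PIPE_BUF too small for the header\n" if $max_data < 1;

    my @pieces;
    for (my $off = 0; $off < length $data; $off += $max_data) {
        push @pieces, substr($data, $off, $max_data);
    }
    @pieces = ('') unless @pieces;    # an empty message is still one chunk

    # Chunk ids count down, so the last chunk always has id 0 and the
    # reader knows the message is complete when it sees it.
    my $id = $#pieces;
    return map { pack(HEADER_FMT, $pid, $tid, $id--, length $_) . $_ } @pieces;
}
```

For example, with a 64 byte limit each chunk carries at most 48 bytes of data, so a 100 byte message becomes 3 chunks with ids 2, 1 and 0. Each chunk would then be written with one syswrite() call so the kernel treats it as a single atomic write.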

On the receiving end this module reads chunks and reassembles them based on the header data, so the reader always gets complete messages. Note that message order is not guaranteed when messages are sent from multiple processes or threads, though all messages from any given thread/process should arrive in order.
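The reassembly step can be sketched the same way: buffer each writer's data under its pid/tid pair and emit the message when the chunk with id 0 arrives. This is a minimal sketch, not Atomic::Pipe's implementation. It assumes a hypothetical header of four 32-bit unsigned integers (pid, tid, chunk id, data length) packed with 'NNNN', and the function name reassemble is also hypothetical.

```perl
use strict;
use warnings;

# Hypothetical header for this sketch only: pid, tid, chunk id, data length.
use constant HEADER_FMT => 'NNNN';
use constant HEADER_LEN => length pack(HEADER_FMT, 0, 0, 0, 0);    # 16 bytes

# Rebuild complete messages from chunks that may be interleaved between
# writers. Chunks from the same pid/tid are assumed to arrive in order.
sub reassemble {
    my @chunks = @_;
    my (%partial, @complete);

    for my $chunk (@chunks) {
        my ($pid, $tid, $id, $len) = unpack(HEADER_FMT, $chunk);
        my $key = "$pid:$tid";    # one buffer per writer

        $partial{$key} .= substr($chunk, HEADER_LEN, $len);

        # Chunk id 0 is always the final chunk, so the message is done.
        push @complete, delete $partial{$key} if $id == 0;
    }

    return @complete;
}
```

Because every chunk was written atomically, a chunk is never cut in half by another writer, so keying the buffers by writer is enough to untangle interleaved messages. Messages come out in the order their final chunks arrive, which is why cross-writer order is not guaranteed.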

SYNOPSIS

use Atomic::Pipe;

my ($r, $w) = Atomic::Pipe->pair;

# $chunks will be set to the number of atomic chunks the message was split
# into. It is fine to ignore the return value; it will always be an
# integer of 1 or larger.
my $chunks = $w->write_message("Hello");

# $msg now contains "Hello";
my $msg = $r->read_message;

# Note, you can set the reader to be non-blocking:
$r->blocking(0);

# Writer too (but it buffers unwritten items until your next write_burst(),
# write_message(), or flush(), or does a blocking write when the pipe
# instance is destroyed).
$w->blocking(0);

# $msg2 will be undef as no messages were sent, and blocking is turned off.
my $msg2 = $r->read_message;

Fork example from tests:

use Test2::V0;
use Test2::Require::RealFork;
use Test2::IPC;
use Atomic::Pipe;

my ($r, $w) = Atomic::Pipe->pair;

# For simplicity, let the system reap child processes automatically.
$SIG{CHLD} = 'IGNORE';

# Forks and runs your coderef in the child, then exits the child.
sub worker(&) {
    my ($code) = @_;
    my $pid = fork // die "Could not fork: $!";
    return $pid if $pid;    # parent returns immediately
    $code->();              # child does the work...
    exit 0;                 # ...and exits
}

worker { is($w->write_message("aa" x $w->PIPE_BUF), 3, "$$ Wrote 3 chunks") };
worker { is($w->write_message("bb" x $w->PIPE_BUF), 3, "$$ Wrote 3 chunks") };
worker { is($w->write_message("cc" x $w->PIPE_BUF), 3, "$$ Wrote 3 chunks") };

my @messages = ();
push @messages => $r->read_message for 1 .. 3;

is(
    [sort @messages],
    [sort(('aa' x $w->PIPE_BUF), ('bb' x $w->PIPE_BUF), ('cc' x $w->PIPE_BUF))],
    "Got all 3 long messages, not mangled or mixed, order not guaranteed"
);

done_testing;

MIXED DATA MODE

Mixed data mode is a special use-case for Atomic::Pipe. In this mode the assumption is that the writer end of the pipe is used as STDOUT or STDERR, so a lot of random non-atomic prints can happen on the writer end of the pipe. The special case is when you want to send atomic chunks of data inline with the random prints, and later extract the data from the noise. The atomic nature of messages and bursts makes this possible.

Please note that mixed data mode makes use of 3 ASCII control characters:

SHIFT OUT (0x0E)
SHIFT IN (0x0F)
DATA LINK ESCAPE (0x10)

If the random prints include SHIFT OUT then they will confuse the read-side parser and it will not be possible to extract data, in fact reading from the pipe will become quite unpredictable. In practice this is unlikely to cause issues, but printing a binary file or random noise could do it.

A burst may not include SHIFT IN, as the SHIFT IN control character marks the end of a burst. A burst may also not start with the DATA LINK ESCAPE control character, as that is used to mark the start of a data-message.

Data-messages may contain any data/characters/bytes, as the messages include a length, so an embedded SHIFT IN will not terminate them early.
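The two burst rules above are easy to check before sending. This is a minimal sketch; the helper name burst_is_valid is hypothetical and not part of Atomic::Pipe's API. It only encodes the constraints stated here: no SHIFT IN anywhere, and no DATA LINK ESCAPE as the first byte.

```perl
use strict;
use warnings;

# The control characters the burst rules refer to.
use constant SHIFT_IN         => "\x0F";    # ASCII SI, marks the end of a burst
use constant DATA_LINK_ESCAPE => "\x10";    # ASCII DLE, marks the start of a data-message

# Returns true if $burst satisfies the mixed data mode burst rules:
# it must not contain SHIFT IN and must not begin with DATA LINK ESCAPE.
sub burst_is_valid {
    my ($burst) = @_;
    return 0 if index($burst, SHIFT_IN) >= 0;
    return 0 if substr($burst, 0, 1) eq DATA_LINK_ESCAPE;
    return 1;
}
```

Payloads that may break these rules (binary data, arbitrary user input) are better sent with write_message(), since data-messages carry a length and are not subject to either restriction.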

use Atomic::Pipe;
use IO::Handle;    # provides autoflush() on older perls

# Create a pair in mixed-data mode
my ($r, $w) = Atomic::Pipe->pair(mixed_data_mode => 1);

# Open STDOUT to the write handle
open(STDOUT, '>&', $w->{wh}) or die "Could not clone write handle: $!";

# For sanity, do not let plain prints sit in the STDOUT buffer
STDOUT->autoflush(1);

print "A line!\n";

print "Start a line ..."; # Note no "\n"

# Any number of newlines is fine; the burst is sent and received as a whole.
$w->write_burst("This is a burst message\n\n\n");

# Data will be broken into atomic chunks and sent
my $payload = "Some arbitrary data";
$w->write_message($payload);

print "... Finish the line we started earlier\n";

my ($type, $data) = $r->get_line_burst_or_data;
# Type: 'line'
# Data: "A line!\n"

($type, $data) = $r->get_line_burst_or_data;
# Type: 'burst'
# Data: "This is a burst message\n\n\n"

($type, $data) = $r->get_line_burst_or_data;
# Type: 'message'
# Data: $payload

($type, $data) = $r->get_line_burst_or_data;
# Type: 'line'
# Data: "Start a line ...... Finish the line we started earlier\n"

# mixed-data mode is always non-blocking
($type, $data) = $r->get_line_burst_or_data;
# Type: undef
# Data: undef

You can also turn on mixed-data mode after construction, but you must do so on both ends:

$r->set_mixed_data_mode();
$w->set_mixed_data_mode();

Doing so will make the pipe non-blocking, and will make all bursts/messages include the necessary control characters.

METHODS

CLASS METHODS

OBJECT METHODS

PRIMARY INTERFACE

RESIZING THE PIPE BUFFER

On some newer Linux systems it is possible to get and set the pipe buffer size. On supported systems these methods let you do that; on other systems they are no-ops, and any that return a value will return undef.

Note: This has nothing to do with the similarly named PIPE_BUF, which cannot be changed. It only affects how much data can sit in a pipe before the writers block; it does not affect the maximum size of atomic writes.
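On Linux, the pipe buffer size is exposed through the fcntl() commands F_GETPIPE_SZ and F_SETPIPE_SZ. The sketch below uses core Perl's fcntl() and mirrors the behaviour described above (undef on unsupported systems). It makes a few assumptions. The numeric values 1032 and 1031 are taken from <linux/fcntl.h> and hard-coded because older perls' Fcntl module may not export them. The helpers get_pipe_size and set_pipe_size are hypothetical names, not Atomic::Pipe's methods.

```perl
use strict;
use warnings;

# Linux-only fcntl commands, hard-coded from <linux/fcntl.h> (assumption:
# running on Linux; these numbers mean nothing elsewhere).
use constant F_SETPIPE_SZ => 1031;
use constant F_GETPIPE_SZ => 1032;

# Returns the pipe's buffer size in bytes, or undef if unsupported/failed.
sub get_pipe_size {
    my ($fh) = @_;
    return undef unless $^O eq 'linux';
    my $size = fcntl($fh, F_GETPIPE_SZ, 0);
    return defined $size ? $size + 0 : undef;    # "0 but true" becomes 0
}

# Requests a new buffer size; returns the size actually set (the kernel may
# round it up), or undef if unsupported/failed.
sub set_pipe_size {
    my ($fh, $size) = @_;
    return undef unless $^O eq 'linux';
    my $new = fcntl($fh, F_SETPIPE_SZ, $size);
    return defined $new ? $new + 0 : undef;
}
```

Unprivileged processes cannot grow a pipe beyond /proc/sys/fs/pipe-max-size, so set_pipe_size() can fail even on Linux. None of this changes PIPE_BUF.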

SPLITTING THE PIPE INTO READER AND WRITER

If you used Atomic::Pipe->new(), you now need to split the single object into a reader and a writer. These methods help you do that.
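For background, splitting a pipe usually happens right after fork(): each process closes the end it does not use. This is a minimal sketch of that general POSIX pattern using only core Perl's pipe() and fork(). It is not Atomic::Pipe's implementation, and it does not use the module's reader/writer methods.

```perl
use strict;
use warnings;

pipe(my $rh, my $wh) or die "Could not create pipe: $!";

my $pid = fork // die "Could not fork: $!";
if ($pid == 0) {
    # Child: writer only. Close the read end it does not need.
    close $rh;
    print {$wh} "hello from $$\n";
    close $wh;
    exit 0;
}

# Parent: reader only. Closing our copy of the write end means the reader
# sees EOF once every writer has closed theirs.
close $wh;
my $line = <$rh>;
waitpid($pid, 0);
```

If the parent kept its write end open, a read waiting for EOF would block forever, because the kernel would still count the parent as a potential writer.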

MIXED DATA MODE METHODS

SOURCE

The source code repository for Atomic-Pipe can be found at http://github.com/exodist/Atomic-Pipe.

MAINTAINERS

AUTHORS

COPYRIGHT

Copyright 2020 Chad Granum exodist7@gmail.com.

This program is free software; you can redistribute it and/or modify it under the same terms as Perl itself.

See http://dev.perl.org/licenses/