NAME

PAGI::Request::BodyStream - Streaming body consumption for PAGI requests

SYNOPSIS

use PAGI::Request::BodyStream;
use Future::AsyncAwait;

# Basic streaming
my $stream = PAGI::Request::BodyStream->new(receive => $receive);

while (!$stream->is_done) {
    my $chunk = await $stream->next_chunk;
    last unless defined $chunk;
    print "Got chunk: ", length($chunk), " bytes\n";
}

# With size limit
my $stream = PAGI::Request::BodyStream->new(
    receive   => $receive,
    max_bytes => 1024 * 1024,  # 1MB limit
);

# With UTF-8 decoding
my $stream = PAGI::Request::BodyStream->new(
    receive => $receive,
    decode  => 'UTF-8',
);

# Stream to file
await $stream->stream_to_file('/tmp/upload.dat');

# Stream to custom sink
await $stream->stream_to(async sub ($chunk) {
    # Process chunk
    print STDERR "Processing: ", length($chunk), " bytes\n";
});

DESCRIPTION

PAGI::Request::BodyStream provides streaming body consumption for large request bodies. This is useful when you need to process request data incrementally without loading the entire body into memory.

The stream is pull-based: you call next_chunk() to receive the next chunk of data. The stream handles:

  • Size limits with customizable error messages

  • UTF-8 decoding with proper handling of incomplete sequences at chunk boundaries

  • Client disconnect detection

  • Efficient file streaming using async I/O

Important: Streaming is mutually exclusive with buffered body methods like body(), json(), form() in PAGI::Request. Once you start streaming, you cannot use those methods.

CONSTRUCTOR

new

my $stream = PAGI::Request::BodyStream->new(
    receive    => $receive,      # Required: PAGI receive callback
    max_bytes  => 10485760,      # Optional: max body size
    decode     => 'UTF-8',       # Optional: decode chunks from UTF-8
    strict     => 1,             # Optional: strict UTF-8 (croak on invalid)
    loop       => $loop,         # Optional: IO::Async::Loop instance
    limit_name => 'body_size',   # Optional: name for limit error message
);

Creates a new body stream.

  • receive - Required. The PAGI receive callback.

  • max_bytes - Optional. Maximum number of bytes to read. An exception is thrown if the limit is exceeded.

  • decode - Optional. Encoding to decode chunks from (typically 'UTF-8'). Without it, chunks are returned as raw bytes.

  • strict - Optional. If true, throw on invalid UTF-8. If false (default), use replacement characters.

  • loop - Optional. IO::Async::Loop instance for async file operations. If not provided, a new loop will be created when needed.

  • limit_name - Optional. Name to use in error message when max_bytes is exceeded (default: 'max_bytes').
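The difference between strict and non-strict decoding can be illustrated with the core Encode module. This is only a sketch of the two behaviours, not the module's actual implementation; the helper name decode_bytes is hypothetical.

```perl
use strict;
use warnings;
use Encode ();

# Hypothetical helper: decode raw bytes as UTF-8, strictly or leniently.
sub decode_bytes {
    my ($bytes, $strict) = @_;
    my $copy = $bytes;    # Encode may modify its input when CHECK is set
    return $strict
        ? Encode::decode('UTF-8', $copy, Encode::FB_CROAK)  # dies on malformed input
        : Encode::decode('UTF-8', $copy);                   # substitutes U+FFFD
}

my $bad = "caf\xFF";      # the byte 0xFF never appears in valid UTF-8

my $lenient   = decode_bytes($bad, 0);
my $strict_ok = eval { decode_bytes($bad, 1); 1 };

printf "lenient: %d characters, last is U+%04X\n",
    length $lenient, ord substr($lenient, -1);
print "strict: ", ($strict_ok ? "decoded" : "threw an exception"), "\n";
```

Lenient decoding keeps the stream flowing at the cost of silently altered data, so strict mode is the safer choice when the body must round-trip exactly.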

METHODS

next_chunk

my $chunk = await $stream->next_chunk;

Returns a Future that resolves to the next chunk of data, or undef when the stream is exhausted or the client disconnects.

If decode was specified in the constructor, chunks are decoded from the specified encoding into character strings. UTF-8 decoding properly handles incomplete multi-byte sequences at chunk boundaries.

Throws an exception if max_bytes is exceeded.
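The size check can be pictured as a running byte counter compared against max_bytes on every chunk. The sketch below is an assumption about how such accounting might look: the helper make_limit_checker, the wording of the error message, and the choice to allow a body of exactly max_bytes are all hypothetical.

```perl
use strict;
use warnings;

# Hypothetical stand-in for the stream's byte accounting.
sub make_limit_checker {
    my (%args) = @_;
    my $max   = $args{max_bytes};
    my $name  = $args{limit_name} // 'max_bytes';
    my $total = 0;

    return sub {
        my ($chunk) = @_;
        $total += length $chunk;          # raw byte count, before any decoding
        die "$name exceeded: $total > $max bytes\n"
            if defined $max && $total > $max;
        return $total;
    };
}

my $check = make_limit_checker(max_bytes => 10, limit_name => 'body_size');
$check->('12345');                        # 5 bytes so far
$check->('12345');                        # exactly 10 bytes, still allowed here
my $ok  = eval { $check->('x'); 1 };      # 11 bytes: the checker dies
my $err = $@;
print "error: $err" unless $ok;
```

Counting per chunk means an oversized body is rejected as soon as the limit is crossed, rather than after the whole body has been received.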

bytes_read

my $total = $stream->bytes_read;

Returns the total number of raw bytes read so far (before any decoding).

is_done

if ($stream->is_done) { ... }

Returns true if the stream has been exhausted (no more chunks available).

error

my $error = $stream->error;

Returns the error that occurred during streaming, or undef if there was none.

stream_to_file

await $stream->stream_to_file($path);

Streams the entire request body to a file using async I/O. Returns a Future that resolves to the number of bytes written.

This is efficient for large uploads as it doesn't load the entire body into memory.

Note: stream_to_file cannot be combined with the decode option, because decoding would corrupt binary data. Use stream_to() with a custom handler if you need decoded chunks written to a file.
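One way to write decoded chunks to a file is a sink that re-encodes each chunk before writing. The sketch below uses only core modules and feeds the sink by hand; make_utf8_file_sink is a hypothetical helper, and the blocking print is an assumption made for brevity (in a real handler, blocking writes would stall the event loop).

```perl
use strict;
use warnings;
use Encode ();
use File::Temp ();

# Hypothetical helper: build a sink that re-encodes decoded (character)
# chunks as UTF-8 bytes and writes them to $path. Returns the sink and a
# closer to call once the stream is finished.
sub make_utf8_file_sink {
    my ($path) = @_;
    open my $fh, '>:raw', $path or die "Cannot open $path: $!";
    my $sink = sub {
        my ($chunk) = @_;
        print {$fh} Encode::encode('UTF-8', $chunk)
            or die "Write to $path failed: $!";
    };
    my $close = sub { close $fh or die "Cannot close $path: $!" };
    return ($sink, $close);
}

my $tmp = File::Temp->new;    # temporary file, removed when $tmp is destroyed
my ($sink, $close) = make_utf8_file_sink($tmp->filename);

# Stand-ins for the decoded chunks a stream would deliver.
$sink->($_) for "caf\x{e9} ", "\x{20ac}5\n";
$close->();
```

With a real stream, the sink would be called from the callback passed to stream_to(), and the closer once the returned Future has resolved.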

stream_to

await $stream->stream_to(async sub ($chunk) {
    # Process chunk
});

Streams the entire request body to a custom sink callback. The callback receives each chunk and can be async (return a Future).

Returns a Future that resolves to the number of bytes processed.

INTERNAL METHODS

_decode_chunk

Internal method to decode a chunk with proper handling of incomplete UTF-8 sequences at boundaries.
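The boundary handling can be approximated with the buffer-carrying idiom from the Encode documentation: decode with FB_QUIET and keep whatever bytes were left unprocessed for the next chunk. This is a sketch of the technique under that assumption, not the actual body of _decode_chunk; make_chunk_decoder is a hypothetical name.

```perl
use strict;
use warnings;
use Encode ();

# Hypothetical decoder factory: returns a closure that decodes one chunk at
# a time and carries any trailing partial character over to the next call.
sub make_chunk_decoder {
    my $pending = '';                     # undecoded bytes from earlier chunks
    return sub {
        my ($chunk) = @_;
        $pending .= $chunk;
        # FB_QUIET stops at the first sequence it cannot decode and leaves
        # the unprocessed bytes in $pending.
        return Encode::decode('UTF-8', $pending, Encode::FB_QUIET);
    };
}

my $decode = make_chunk_decoder();

# The UTF-8 bytes of "caf\x{e9} \x{20ac}5", split in the middle of both
# multi-byte characters.
my @chunks  = ("caf\xC3", "\xA9 \xE2\x82", "\xAC5");
my $decoded = join '', map { $decode->($_) } @chunks;

printf "decoded %d characters from %d chunks\n", length $decoded, scalar @chunks;
```

This sketch only covers sequences that are incomplete. Bytes that are genuinely invalid would stay in the buffer forever, so a full implementation also has to reject or replace them, depending on the strict setting.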

EXAMPLES

Processing Large Uploads

async sub upload_handler ($scope, $receive, $send) {
    my $stream = PAGI::Request::BodyStream->new(
        receive   => $receive,
        max_bytes => 100 * 1024 * 1024,  # 100MB limit
    );

    # Stream directly to file
    my $bytes = await $stream->stream_to_file('/uploads/data.bin');

    await $send->({
        type => 'http.response.start',
        status => 200,
        headers => [['content-type', 'text/plain']],
    });
    await $send->({
        type => 'http.response.body',
        body => "Uploaded $bytes bytes\n",
    });
}

Line-by-Line Processing

async sub process_csv ($scope, $receive, $send) {
    my $stream = PAGI::Request::BodyStream->new(
        receive => $receive,
        decode  => 'UTF-8',
    );

    my $line_buffer = '';
    my $line_count = 0;

    while (!$stream->is_done) {
        my $chunk = await $stream->next_chunk;
        last unless defined $chunk;

        $line_buffer .= $chunk;

        # Process complete lines
        while ($line_buffer =~ s/^(.*?)\n//) {
            my $line = $1;
            $line_count++;
            # Process $line...
        }
    }

    # Process final line if no trailing newline
    $line_count++ if length($line_buffer);

    # Send response...
}

Custom Processing with Backpressure

async sub hash_upload ($scope, $receive, $send) {
    use Digest::SHA;

    my $stream = PAGI::Request::BodyStream->new(receive => $receive);
    my $sha = Digest::SHA->new(256);

    my $bytes = await $stream->stream_to(async sub ($chunk) {
        $sha->add($chunk);

        # Simulate slow processing (backpressure)
        await some_slow_operation($chunk);
    });

    my $digest = $sha->hexdigest;

    # Send digest response...
}
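The hashing example relies on Digest::SHA accepting data incrementally. A minimal check with hand-made chunks, standing in for what the sink would receive, shows that adding chunks one at a time yields the same digest as hashing the whole body at once. Chunks should be raw bytes (no decode option), since Digest::SHA rejects strings containing wide characters.

```perl
use strict;
use warnings;
use Digest::SHA qw(sha256_hex);

my @chunks = ("hello ", "wor", "ld");     # stand-ins for body chunks

# Incremental hashing, as the sink callback does for each chunk.
my $sha = Digest::SHA->new(256);
$sha->add($_) for @chunks;
my $incremental = $sha->hexdigest;

# One-shot hashing of the fully buffered body, for comparison.
my $one_shot = sha256_hex(join '', @chunks);

print $incremental eq $one_shot ? "digests match\n" : "digests differ\n";
```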

ERROR HANDLING

The stream throws exceptions in these cases:

  • max_bytes exceeded - Request body too large

  • UTF-8 decoding errors (when strict => 1)

  • File I/O errors during stream_to_file

Always wrap stream operations in eval/try-catch:

use Syntax::Keyword::Try;

try {
    await $stream->stream_to_file($path);
}
catch ($e) {
    # Handle error
    await send_error($send, 400, "Upload failed: $e");
}

SEE ALSO

PAGI::Request, PAGI::Util::AsyncFile, Future::AsyncAwait

AUTHOR

PAGI Contributors