NAME
PAGI::Request::BodyStream - Streaming body consumption for PAGI requests
SYNOPSIS
use PAGI::Request::BodyStream;
use Future::AsyncAwait;
# Basic streaming
my $stream = PAGI::Request::BodyStream->new(receive => $receive);
while (!$stream->is_done) {
    my $chunk = await $stream->next_chunk;
    last unless defined $chunk;
    print "Got chunk: ", length($chunk), " bytes\n";
}
# With size limit
my $stream = PAGI::Request::BodyStream->new(
    receive   => $receive,
    max_bytes => 1024 * 1024,  # 1MB limit
);
# With UTF-8 decoding
my $stream = PAGI::Request::BodyStream->new(
    receive => $receive,
    decode  => 'UTF-8',
);
# Stream to file
await $stream->stream_to_file('/tmp/upload.dat');
# Stream to custom sink
await $stream->stream_to(async sub ($chunk) {
    # Process chunk
    print STDERR "Processing: ", length($chunk), " bytes\n";
});
DESCRIPTION
PAGI::Request::BodyStream provides streaming body consumption for large request bodies. This is useful when you need to process request data incrementally without loading the entire body into memory.
The stream is pull-based: you call next_chunk() to receive the next chunk of data. The stream handles:
Size limits with customizable error messages
UTF-8 decoding with proper handling of incomplete sequences at chunk boundaries
Client disconnect detection
Efficient file streaming using async I/O
Important: Streaming is mutually exclusive with the buffered body methods in PAGI::Request, such as body(), json(), and form(). Once you start streaming, those methods can no longer be used.
CONSTRUCTOR
new
my $stream = PAGI::Request::BodyStream->new(
    receive    => $receive,      # Required: PAGI receive callback
    max_bytes  => 10_485_760,    # Optional: max body size in bytes
    decode     => 'UTF-8',       # Optional: decode body as UTF-8
    strict     => 1,             # Optional: strict UTF-8 (croak on invalid)
    loop       => $loop,         # Optional: IO::Async::Loop instance
    limit_name => 'body_size',   # Optional: name for limit error message
);
Creates a new body stream.
receive - Required. The PAGI receive callback.
max_bytes - Optional. Maximum number of bytes to read. An error is thrown if the limit is exceeded.
decode - Optional. Encoding to decode chunks from (typically 'UTF-8').
strict - Optional. If true, throw on invalid UTF-8. If false (the default), substitute replacement characters.
loop - Optional. IO::Async::Loop instance for async file operations. If not provided, a new loop is created when needed.
limit_name - Optional. Name used in the error message when max_bytes is exceeded (default: 'max_bytes').
METHODS
next_chunk
my $chunk = await $stream->next_chunk;
Returns a Future that resolves to the next chunk of data, or undef when the stream is exhausted or the client disconnects.
If decode was specified in the constructor, chunks are decoded from that encoding into Perl character strings. UTF-8 decoding correctly handles incomplete multi-byte sequences at chunk boundaries.
Throws an exception if max_bytes is exceeded.
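For instance, a guarded read loop might look like the following sketch. The 1MB limit and the 413 response are illustrative choices, and send_error is the same hypothetical helper used under ERROR HANDLING below:
use Syntax::Keyword::Try;

my $stream = PAGI::Request::BodyStream->new(
    receive   => $receive,
    max_bytes => 1024 * 1024,
);

my $body = '';
try {
    while (defined(my $chunk = await $stream->next_chunk)) {
        $body .= $chunk;    # next_chunk throws once max_bytes is exceeded
    }
}
catch ($e) {
    await send_error($send, 413, "Body too large: $e");
}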
bytes_read
my $total = $stream->bytes_read;
Returns the total number of raw bytes read so far (before any decoding).
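For example, bytes_read can drive simple progress logging from inside a stream_to sink (a sketch; the log output is illustrative):
await $stream->stream_to(async sub ($chunk) {
    printf STDERR "received %d bytes so far\n", $stream->bytes_read;
});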
is_done
if ($stream->is_done) { ... }
Returns true if the stream has been exhausted (no more chunks available).
error
my $error = $stream->error;
Returns any error that occurred during streaming, or undef.
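A sketch of distinguishing a clean end-of-stream from a recorded failure after the read loop, assuming conditions such as a client disconnect are reported via error() rather than thrown:
while (!$stream->is_done) {
    my $chunk = await $stream->next_chunk;
    last unless defined $chunk;
    # process $chunk...
}
if (my $err = $stream->error) {
    warn "stream ended abnormally: $err";
}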
stream_to_file
await $stream->stream_to_file($path);
Streams the entire request body to a file using async I/O. Returns a Future that resolves to the number of bytes written.
This is efficient for large uploads as it doesn't load the entire body into memory.
Note: stream_to_file cannot be combined with the decode option, as decoding would corrupt binary data. Use stream_to() with a custom handler if you need decoded chunks written to a file.
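A common pattern is to stream to a temporary path and only rename into place on success (a sketch; $final_path and the ".partial" suffix are illustrative):
use Syntax::Keyword::Try;

my $tmp_path = "$final_path.partial";
try {
    my $bytes = await $stream->stream_to_file($tmp_path);
    rename $tmp_path, $final_path or die "rename failed: $!";
}
catch ($e) {
    unlink $tmp_path;    # discard the partial upload
    die $e;
}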
stream_to
await $stream->stream_to(async sub ($chunk) {
    # Process chunk
});
Streams the entire request body to a custom sink callback. The callback receives each chunk and can be async (return a Future).
Returns a Future that resolves to the number of bytes processed.
INTERNAL METHODS
_decode_chunk
Internal method to decode a chunk with proper handling of incomplete UTF-8 sequences at boundaries.
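The boundary handling works in the spirit of the following sketch (illustrative only, not the module's actual code): Encode's FB_QUIET flag decodes as much as possible and leaves any truncated trailing sequence in the buffer for the next chunk.
use Encode qw(decode FB_QUIET);

my $pending = '';
sub decode_chunk ($chunk) {
    $pending .= $chunk;
    # FB_QUIET consumes the decodable prefix of $pending in place,
    # leaving an incomplete trailing sequence for the next call
    return decode('UTF-8', $pending, FB_QUIET);
}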
EXAMPLES
Processing Large Uploads
async sub upload_handler ($scope, $receive, $send) {
    my $stream = PAGI::Request::BodyStream->new(
        receive   => $receive,
        max_bytes => 100 * 1024 * 1024,  # 100MB limit
    );

    # Stream directly to file
    my $bytes = await $stream->stream_to_file('/uploads/data.bin');

    await $send->({
        type    => 'http.response.start',
        status  => 200,
        headers => [['content-type', 'text/plain']],
    });
    await $send->({
        type => 'http.response.body',
        body => "Uploaded $bytes bytes\n",
    });
}
Line-by-Line Processing
async sub process_csv ($scope, $receive, $send) {
    my $stream = PAGI::Request::BodyStream->new(
        receive => $receive,
        decode  => 'UTF-8',
    );

    my $line_buffer = '';
    my $line_count  = 0;

    while (!$stream->is_done) {
        my $chunk = await $stream->next_chunk;
        last unless defined $chunk;
        $line_buffer .= $chunk;

        # Process complete lines
        while ($line_buffer =~ s/^(.*?)\n//) {
            my $line = $1;
            $line_count++;
            # Process $line...
        }
    }

    # Count the final line if there is no trailing newline
    $line_count++ if length($line_buffer);

    # Send response...
}
Custom Processing with Backpressure
use Digest::SHA;

async sub hash_upload ($scope, $receive, $send) {
    my $stream = PAGI::Request::BodyStream->new(receive => $receive);
    my $sha    = Digest::SHA->new(256);

    my $bytes = await $stream->stream_to(async sub ($chunk) {
        $sha->add($chunk);
        # Simulate slow processing (backpressure)
        await some_slow_operation($chunk);
    });

    my $digest = $sha->hexdigest;
    # Send digest response...
}
ERROR HANDLING
The stream throws exceptions in these cases:
max_bytes exceeded - request body too large
UTF-8 decoding errors (when strict => 1)
File I/O errors during stream_to_file
Always wrap stream operations in a try/catch (or eval) block:
use Syntax::Keyword::Try;
try {
    await $stream->stream_to_file($path);
}
catch ($e) {
    # Handle error
    await send_error($send, 400, "Upload failed: $e");
}
SEE ALSO
PAGI::Request, PAGI::Util::AsyncFile, Future::AsyncAwait
AUTHOR
PAGI Contributors