NAME

PAGI::Request::MultipartStream - Pull-based streaming multipart/form-data engine

SYNOPSIS

use PAGI::Request::MultipartStream;
use Future::AsyncAwait;

# Usually obtained via $req->multipart_stream, not constructed directly:
my $stream = $req->multipart_stream;

while (defined(my $part = await $stream->next)) {
    if ($part->is_file) {
        await $part->stream_to_file($path);
    }
    else {
        my $value = await $part->value;  # raw bytes; you decode
    }
}

DESCRIPTION

A pull-based streaming parser for multipart/form-data request bodies. Each part of the body is exposed in turn as a PAGI::Request::Part via next, and the application decides where each part goes: you choose its sink (a file, an object store, an async transform) per part, rather than accepting the buffered, spool-each-upload-to-a-temp-file behaviour of form_params and upload in PAGI::Request.

Because you own the sink, it can be fully asynchronous: stream_to awaits a sink that returns a Future, so a slow downstream naturally backpressures the read. This is what the buffered multipart path cannot offer -- its spool to a temp file is blocking.

Internally this drives HTTP::MultiPartParser on demand, bridging its push-based callbacks onto an internal event queue that next and the part methods consume.

Mutually exclusive with the buffered body methods. An HTTP request body can only be consumed once. Once you create a multipart stream you cannot also call body/text/json/form_params/uploads, and a stream cannot be created if the body was already read; see "multipart_stream" in PAGI::Request.

CONSTRUCTOR

new

my $stream = PAGI::Request::MultipartStream->new(
    receive          => $receive,   # required: PAGI receive callback
    boundary         => $boundary,  # required: multipart boundary
    max_files        => 1000,       # optional limits (defaults shown)
    max_fields       => 1000,
    max_field_size   => 1024 * 1024,
    max_file_size    => 100 * 1024 * 1024,
    max_request_body => 1024 * 1024 * 1024,
);

Creates a new streaming multipart engine. Most applications do not call this directly -- they obtain a ready-built stream from "multipart_stream" in PAGI::Request, which extracts the boundary from the request's Content-Type and passes through the same limit options.

receive and boundary are required. The remaining options cap the body to bound memory and resource use:

  • max_files - Maximum number of file parts. Default: 1000.

  • max_fields - Maximum number of non-file (field) parts. Default: 1000.

  • max_field_size - Maximum size, in bytes, of any single field part. Default: 1 MiB (1024 * 1024).

  • max_file_size - Maximum size, in bytes, of any single file part. Default: 100 MiB (100 * 1024 * 1024).

  • max_request_body - Maximum total bytes read from the request body. Default: 1 GiB (1024 * 1024 * 1024). This is a per-stream defence-in-depth cap; the PAGI server's max_body_size is the primary aggregate limit on the request body.

METHODS

next

my $part = await $stream->next;

Returns a Future resolving to the next PAGI::Request::Part, or undef when the stream is exhausted (end of body).

Advancing past a part whose body you have not fully consumed auto-drains the remainder of that part first, so you can always loop on next without reading every part. To discard a part deliberately (and signal that intent), call $part->skip.

Croaks if a size or count limit is breached, or if the upload is truncated (see "LIMITS AND ERRORS").

NAME

PAGI::Request::Part - A single part of a streaming multipart request

DESCRIPTION

A value object representing one part yielded by PAGI::Request::MultipartStream. It carries the part's metadata (name, filename, headers) and provides the methods that consume the part's body: pull it chunk by chunk, buffer it whole, or drain it to a sink of your choosing.

A part's body must be consumed before the next part is fetched. Calling $stream->next while a part is only partially read drains the rest of the current part automatically.

CONSTRUCTOR

new

my $part = PAGI::Request::Part->new(stream => $stream, meta => \%meta);

Constructs a part bound to its owning stream. Parts are normally created by "next" in PAGI::Request::MultipartStream, not by application code.

METHODS

name

my $name = $part->name;

The part's form field name, taken from its Content-Disposition header.

filename

my $filename = $part->filename;

The part's filename from Content-Disposition, or undef for non-file (field) parts.

content_type

my $type = $part->content_type;

The part's Content-Type header. Defaults to text/plain if the part sent no Content-Type.

encoding

my $encoding = $part->encoding;

The part's Content-Transfer-Encoding header, or undef if not present.

headers

my $headers = $part->headers;

A hashref of all the part's headers, keyed by lower-cased header name.

is_file

if ($part->is_file) { ... }

True if the part has a filename (i.e. is a file upload), false otherwise.

type

my $type = $part->type;   # 'file' or 'field'

Returns the string 'file' for file parts and 'field' for non-file parts.

next_chunk

my $chunk = await $part->next_chunk;

Returns this part's next body chunk as raw bytes, or undef once the part's body is exhausted. This is the low-level primitive; value, stream_to, and stream_to_file are built on it.

value

my $bytes = await $part->value;

Buffers and returns the part's entire body as raw bytes -- no decoding is applied, so a text field encoded as UTF-8 (or any other charset) must be decoded by the caller. Intended for small field parts; the buffered size is bounded by the relevant per-part size limit (max_field_size for fields, max_file_size for files), which croaks if exceeded.

stream_to

my $count = await $part->stream_to($cb);
my $count = await $part->stream_to(async sub ($chunk) { await $sink->write($chunk) });

Drains the rest of the part to a sink callback, returning the number of bytes processed. The callback is invoked with each chunk of raw bytes and may be asynchronous: if it returns a Future, stream_to awaits it before reading the next chunk, giving the sink natural backpressure over the network read.

If the sink callback throws, the error poisons the stream -- a later $stream->next will croak -- and the failed part is not auto-drained, since the application has signalled it is aborting. The exception is re-thrown to the caller.

stream_to_file

my $count = await $part->stream_to_file($path);

Writes the part's body to a new file at $path, returning the number of bytes written. The file is opened with O_CREAT|O_EXCL|O_NOFOLLOW: the call refuses to overwrite an existing path or to follow a symlink, croaking instead. The result of close is checked. On any error -- a write failure, a limit breach, a truncated upload, or a failed close -- the partially written file is unlinked before the method croaks.

skip

await $part->skip;

Drains and discards any remaining body of this part. Use this to deliberately ignore a part; $stream->next would otherwise drain it for you anyway.

LIMITS AND ERRORS

Size and count limits are enforced as bytes arrive from the network, before your sink ever sees them, so an oversized part cannot stream partway into your sink before being rejected. A per-part size overflow names the offending part in its error message (for example, File part 'avatar' too large). Exceeding max_files, max_fields, or max_request_body likewise causes the next next/consume call to croak.

A client disconnect that truncates a part mid-stream croaks with an "Incomplete multipart upload" message (the closing boundary was never seen). By contrast, a complete body -- or an entirely empty one -- followed by a disconnect ends cleanly, with next simply returning undef.

SEE ALSO

PAGI::Request, PAGI::Request::BodyStream