NAME

PAGI::Cookbook - worked, runnable recipes for the PAGI protocol

DESCRIPTION

A gallery of complete recipes for "how do I do X in PAGI?". Where PAGI::Tutorial teaches the protocol as a linear path, the cookbook is for dipping in: each recipe is a self-contained application you can read top to bottom and run.

Every recipe is raw PAGI -- a single async sub over $scope, $receive, and $send, with no helper modules -- so what you see is the protocol itself, not a framework on top of it. Each one corresponds to a runnable application under the distribution's examples/ directory.

The shape of an application

A PAGI application is a file that returns an async sub. Here is the smallest complete one, in full:

use strict;
use warnings;
use Future::AsyncAwait;

my $app = async sub {
    my ($scope, $receive, $send) = @_;
    die "Unsupported scope type: $scope->{type}" if $scope->{type} ne 'http';

    await $send->({
        type    => 'http.response.start',
        status  => 200,
        headers => [ [ 'content-type', 'text/plain' ] ],
    });
    await $send->({
        type => 'http.response.body',
        body => 'Hello from PAGI',
        more => 0,
    });
};

$app;   # the server loads this file and uses the returned coderef

That is the complete examples/01-hello-http application. The recipes below show the async sub for each scenario; wrap any of them in the same header and trailing $app; to get a runnable file. Recipes that sleep also need a use Future::IO; line, and a couple need use Encode; as well -- noted where relevant.

Running a recipe

You need a PAGI server; these instructions use pagi-server from the PAGI-Server distribution, but any conforming server works:

pagi-server --app examples/05-sse-broadcaster/app.pl --port 5000

START HERE: A FRAMEWORK ON PAGI

Example: examples/mini-framework

The best way to understand PAGI's value is to see how little stands between the protocol and a real web framework. examples/mini-framework defines Nano -- routing, path parameters, and genuinely non-blocking async dispatch -- in about fifty lines of plain Perl, and an application that uses it never touches the protocol at all:

my $app = Nano->new;

$app->get('/' => sub { "Hello from a web framework built on PAGI!\n" });

$app->get('/hello/:name' => sub {
    my ($req) = @_;
    "Hello, $req->{params}{name}!\n";
});

# An async handler awaits slow work without blocking other requests:
$app->get('/slow/:secs' => async sub {
    my ($req) = @_;
    await Future::IO->sleep( $req->{params}{secs} );
    "Done.\n";
});

$app->to_app;   # turn the routes into a PAGI app

Read examples/mini-framework/README.md for the full walkthrough of how Nano is built -- it is the single best argument for building on PAGI.
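To make "routing with path parameters" concrete, here is a minimal sketch of how a pattern such as /hello/:name could be compiled to a regex and matched against a request path. It is not Nano's actual source (the README covers that); compile_route and match_route are hypothetical names used only for illustration.

```perl
use strict;
use warnings;

# Hypothetical helper: turn '/hello/:name' into an anchored regex plus the
# ordered list of parameter names it captures.
sub compile_route {
    my ($pattern) = @_;
    my @names;
    my $re = '';
    for my $piece ( split m{(:\w+)}, $pattern ) {
        if ( $piece =~ /^:(\w+)$/ ) {
            push @names, $1;
            $re .= '([^/]+)';          # :name matches exactly one path segment
        }
        else {
            $re .= quotemeta $piece;   # literal text, regex-escaped
        }
    }
    return { regex => qr/\A$re\z/, names => \@names };
}

# Hypothetical helper: return a hashref of parameters on a match, or
# nothing when the path does not match the route.
sub match_route {
    my ( $route, $path ) = @_;
    my @values = $path =~ $route->{regex} or return;
    my %params;
    @params{ @{ $route->{names} } } = @values;
    return \%params;
}

my $route  = compile_route('/hello/:name');
my $params = match_route( $route, '/hello/World' );
print "name = $params->{name}\n";   # prints "name = World"
```

A framework would keep one compiled route per registered handler and try them in order against $scope->{path}.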

HTTP

Hello, HTTP

Example: examples/01-hello-http

The complete app is shown above under "The shape of an application". The two things to take away: an HTTP response is one http.response.start event followed by one or more http.response.body events, and more => 0 marks the final body. Header names are lower-case, and bodies are bytes (encode text yourself -- see "UTF-8 round trips").

Reading a request body

Example: examples/03-request-body

The request body arrives as a stream of http.request events. Keep calling $receive, accumulating each chunk, until an event arrives whose more is false:

my $app = async sub {
    my ($scope, $receive, $send) = @_;
    die "Unsupported scope type: $scope->{type}" if $scope->{type} ne 'http';

    my $body = '';
    while (1) {
        my $event = await $receive->();
        last if $event->{type} ne 'http.request';   # e.g. http.disconnect
        $body .= $event->{body} // '';
        last unless $event->{more};                 # more => 0 ends the body
    }

    my $message = length $body ? "You sent: $body" : "No body provided";

    await $send->({
        type    => 'http.response.start',
        status  => 200,
        headers => [ [ 'content-type', 'text/plain' ] ],
    });
    await $send->({ type => 'http.response.body', body => $message, more => 0 });
};

The body bytes are raw and undecoded; the application decides how to interpret them. Even when you do not need the body, drain it before responding.
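The advice to drain can be packaged as a small helper. This is a minimal sketch, not part of PAGI: drain_body is a hypothetical name, and the fake $receive here only stands in for a server so that the snippet runs on its own.

```perl
use strict;
use warnings;
use Future;
use Future::AsyncAwait;

# Hypothetical helper: read and discard http.request events until the body
# ends (more is false) or a different event, such as http.disconnect,
# arrives. Resolves to the number of body bytes discarded.
async sub drain_body {
    my ($receive) = @_;
    my $discarded = 0;
    while (1) {
        my $event = await $receive->();
        last if $event->{type} ne 'http.request';
        $discarded += length( $event->{body} // '' );
        last unless $event->{more};
    }
    return $discarded;
}

# Stand-in for a server: a $receive that replays a fixed list of events.
my @queue = (
    { type => 'http.request', body => 'abc', more => 1 },
    { type => 'http.request', body => 'de',  more => 0 },
);
my $receive = sub { Future->done( shift @queue ) };

my $count = drain_body($receive)->get;
print "drained $count bytes\n";   # prints "drained 5 bytes"
```

Inside an application you would simply write await drain_body($receive); before sending the response.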

Streaming a response

Example: examples/02-streaming-response (needs use Future::IO;)

To stream, send several http.response.body events with more => 1 and a final one with more => 0. Declaring trailers => 1 on the start event lets you finish with an http.response.trailers event. Hold a pending $receive Future and check is_ready so you stop early if the client leaves:

my $app = async sub {
    my ($scope, $receive, $send) = @_;
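    die "Unsupported scope type: $scope->{type}" if $scope->{type} ne 'http';

    # Drain the request body first, so the pending $receive below can only
    # complete with http.disconnect, not a leftover http.request event.
    while (1) {
        my $event = await $receive->();
        last if $event->{type} ne 'http.request';
        last unless $event->{more};
    }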

    await $send->({
        type     => 'http.response.start',
        status   => 200,
        headers  => [ [ 'content-type', 'text/plain' ] ],
        trailers => 1,
    });

    my @chunks     = ( "Chunk 1\n", "Chunk 2\n", "Chunk 3\n" );
    my $disconnect = $receive->();   # completes when http.disconnect arrives

    for my $i (0 .. $#chunks) {
        last if $disconnect->is_ready;          # client went away
        my $more = ($i < $#chunks) ? 1 : 0;
        await $send->({ type => 'http.response.body', body => $chunks[$i], more => $more });
        await Future::IO->sleep(1) if $more;    # pace the stream
    }

    await $send->({
        type    => 'http.response.trailers',
        headers => [ [ 'x-stream-complete', '1' ] ],
    });
};

UTF-8 round trips

Example: examples/12-utf8 (needs use Encode qw(decode_utf8 encode_utf8);)

PAGI is precise about where text is decoded for you and where you get raw bytes. This complete handler echoes a ?text=... query parameter back with a character count:

my $app = async sub {
    my ($scope, $receive, $send) = @_;
    die "Unsupported scope type: $scope->{type}" if $scope->{type} ne 'http';

    # $scope->{path} is already UTF-8-decoded per the spec. The query string
    # is raw bytes: percent-decode it, then decode UTF-8 yourself.
    my $text = '';
    if ( ( $scope->{query_string} // '' ) =~ /(?:^|&)text=([^&]*)/ ) {   # anchor so "context=" does not match
        my $raw = $1;
        $raw =~ s/%([0-9A-Fa-f]{2})/chr hex $1/eg;
        $text = decode_utf8($raw);
    }

    my $reply = "You sent: $text (" . length($text) . " characters)\n";
    my $bytes = encode_utf8($reply);   # responses go out as bytes

    await $send->({
        type    => 'http.response.start',
        status  => 200,
        headers => [
            [ 'content-type',   'text/plain; charset=utf-8' ],
            [ 'content-length', length $bytes ],   # BYTE count, not characters
        ],
    });
    await $send->({ type => 'http.response.body', body => $bytes, more => 0 });
};

The full example wraps this in an HTML page that also exercises the path and a form body. See the UTF-8 sections of PAGI::Spec::Www and PAGI for the complete rules.
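The character-versus-byte distinction is easy to check in isolation. The sketch here uses only the core Encode module; decode_query_value is a hypothetical helper name that applies the percent-decode and decode_utf8 steps to a single value.

```perl
use strict;
use warnings;
use Encode qw(decode_utf8 encode_utf8);

# Hypothetical helper: percent-decode a raw query value (bytes), then
# decode those bytes as UTF-8 to get a character string.
sub decode_query_value {
    my ($raw) = @_;
    $raw =~ s/%([0-9A-Fa-f]{2})/chr hex $1/eg;
    return decode_utf8($raw);
}

my $text  = decode_query_value('caf%C3%A9');   # "cafe" with an acute e, as characters
my $bytes = encode_utf8($text);                # back to bytes for the wire

printf "%d characters, %d bytes\n", length $text, length $bytes;
# prints "4 characters, 5 bytes"
```

The content-length header must carry the second number, which is why the recipe measures the encoded string rather than the decoded one.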

Detecting client disconnect

When a client goes away mid-response -- closes the tab, drops the network -- you want to stop work, not keep streaming into a dead connection. Every scope type delivers a disconnect event (http.disconnect, websocket.disconnect, sse.disconnect) to $receive, so the universal way to notice is to receive it. Note that sending after a disconnect is simply a no-op -- it never raises -- so you detect disconnection through these mechanisms, not by checking the result of $send.

For HTTP, PAGI additionally gives you a connection-state object at $scope->{'pagi.connection'}. It exists because calling $receive on an HTTP scope just to poll for disconnect would consume request-body data; the object lets you check without touching the queue. It offers a cheap synchronous check, a cleanup callback, and a Future you can race against:

use Future;
use Future::IO;

my $app = async sub {
    my ($scope, $receive, $send) = @_;
    my $conn = $scope->{'pagi.connection'};

    await $send->({
        type    => 'http.response.start',
        status  => 200,
        headers => [ [ 'content-type', 'text/plain' ] ],
    });

    $conn->on_disconnect(sub { warn "client went away\n" });   # fires only on an abnormal drop

    for my $i (1 .. 100) {
        last unless $conn->is_connected;   # cheap check before each step
        await $send->({ type => 'http.response.body', body => "line $i\n", more => 1 });
        await Future::IO->sleep(1);
    }
    await $send->({ type => 'http.response.body', body => '', more => 0 });
};

To cancel a long-running task the moment the client leaves, race it against $conn->disconnect_future -- wait_any resolves as soon as either Future completes, and cancels the other:

my $result = await Future->wait_any( $expensive_work, $conn->disconnect_future );
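The cancellation behaviour can be seen with two plain Futures and no server. In this sketch $work and $gone are stand-ins for your task and for $conn->disconnect_future, and the 'client closed' result value is an assumption made for illustration.

```perl
use strict;
use warnings;
use Future;

# Stand-ins: in a real app $work would be your task's Future and $gone
# would be $conn->disconnect_future.
my $work = Future->new;
my $gone = Future->new;

my $cancelled = 0;
$work->on_cancel( sub { $cancelled = 1 } );   # where a task would release resources

my $race = Future->wait_any( $work, $gone );

$gone->done('client closed');   # simulate the client leaving first

print "race finished: ",  ( $race->is_done      ? 'yes' : 'no' ), "\n";   # yes
print "work cancelled: ", ( $work->is_cancelled ? 'yes' : 'no' ), "\n";   # yes
```

For this to save real work, the task's Future has to honour cancellation, for example by stopping a timer or closing a handle in an on_cancel callback.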

on_disconnect fires only when something goes wrong -- the client drops, a timeout fires, the server aborts. Its counterpart on_complete fires only when the response was fully delivered. Exactly one of the two runs per request, so they are the natural place to put rollback-vs-commit or abort-vs-finalize logic without juggling a flag yourself:

my $txn = $db->begin_work;
$conn->on_complete(sub   { $txn->commit   });   # response fully delivered
$conn->on_disconnect(sub { $txn->rollback });   # client left, or aborted

WebSocket and SSE do not get a pagi.connection object: their $receive queues carry messages, not a body you must preserve, so awaiting the websocket.disconnect / sse.disconnect event is already the clean way to notice -- as the WebSocket and SSE recipes do. (The PAGI::WebSocket and PAGI::SSE helper objects in PAGI-Tools wrap this for you.)

The full connection-state interface (is_connected, disconnect_reason, on_disconnect, on_complete, disconnect_future) is specified in "Connection State" in PAGI::Spec::Www.

WEBSOCKET

Echo server

Example: examples/04-websocket-echo

A WebSocket scope opens with a websocket.connect event that you answer with websocket.accept; then you loop, receiving websocket.receive frames and sending websocket.send, until websocket.disconnect. Unlike HTTP, one scope spans the whole connection:

my $app = async sub {
    my ($scope, $receive, $send) = @_;
    die "Unsupported scope type: $scope->{type}" if $scope->{type} ne 'websocket';

    my $event = await $receive->();
    die "Expected websocket.connect" if $event->{type} ne 'websocket.connect';
    await $send->({ type => 'websocket.accept' });

    while (1) {
        my $frame = await $receive->();
        if ($frame->{type} eq 'websocket.receive') {
            if (defined $frame->{text}) {
                await $send->({ type => 'websocket.send', text => "echo: $frame->{text}" });
            }
            elsif (defined $frame->{bytes}) {
                await $send->({ type => 'websocket.send', bytes => $frame->{bytes} });
            }
        }
        elsif ($frame->{type} eq 'websocket.disconnect') {
            last;
        }
    }
};

Rejecting a handshake with a custom response

Sending websocket.close before websocket.accept rejects a handshake with a bare 403. To reject with a custom response -- a 401 with a WWW-Authenticate header, a 429, a redirect -- check that the server advertises the websocket.http.response extension, then send a websocket.http.response.start + websocket.http.response.body pair instead of websocket.accept:

my $app = async sub {
    my ($scope, $receive, $send) = @_;
    die "Unsupported scope type: $scope->{type}" if $scope->{type} ne 'websocket';

    await $receive->();   # websocket.connect

    if (!authorized($scope)) {                       # your auth check
        if ($scope->{extensions}{'websocket.http.response'}) {
            await $send->({
                type    => 'websocket.http.response.start',
                status  => 401,
                headers => [ [ 'content-type',     'application/json' ],
                             [ 'www-authenticate', 'Bearer' ] ],
            });
            await $send->({
                type => 'websocket.http.response.body',
                body => '{"error":"unauthorized"}',
            });
        }
        else {
            await $send->({ type => 'websocket.close' });   # bare 403 fallback
        }
        return;
    }

    await $send->({ type => 'websocket.accept' });
    # ... normal session ...
};

These events are valid only before websocket.accept: the server writes the HTTP response and closes, so the WebSocket never opens. The PAGI::WebSocket helper in PAGI-Tools wraps this as $ws->deny(status => 401, headers => [...], body => ...), with the same fallback. See "WebSocket Denial Response (extension)" in PAGI::Spec::Www.

SERVER-SENT EVENTS

Broadcasting events

Example: examples/05-sse-broadcaster (needs use Future::IO;)

Open the stream with sse.start, then push sse.send events, each carrying an event name and a data payload. Hold a pending $receive Future and check is_ready on it so you stop when the client closes:

my $app = async sub {
    my ($scope, $receive, $send) = @_;
    die "Unsupported scope type: $scope->{type}" if $scope->{type} ne 'sse';

    await $send->({
        type    => 'sse.start',
        status  => 200,
        headers => [ [ 'content-type', 'text/event-stream' ] ],
    });

    my $disconnect = $receive->();   # completes on sse.disconnect

    my @events = (
        { event => 'tick', data => '1' },
        { event => 'tick', data => '2' },
        { event => 'done', data => 'finished' },
    );

    for my $msg (@events) {
        last if $disconnect->is_ready;
        await Future::IO->sleep(1);
        await $send->({ type => 'sse.send', %$msg });
    }
};

Using Future::IO->sleep rather than sleep keeps the handler non-blocking and loop-agnostic.
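For orientation when watching the stream with curl, this is roughly what each sse.send looks like on the wire. It is a sketch of the standard text/event-stream framing, not PAGI server code: sse_frame is a hypothetical name, and a real server may emit further fields.

```perl
use strict;
use warnings;

# Hypothetical illustration of text/event-stream framing: an optional
# "event:" line, one "data:" line per line of payload, then a blank line
# that ends the event.
sub sse_frame {
    my (%msg) = @_;
    my $out = '';
    $out .= "event: $msg{event}\n" if defined $msg{event};
    $out .= "data: $_\n" for split /\n/, $msg{data}, -1;
    return $out . "\n";
}

print sse_frame( event => 'tick', data => '1' );
```

A multi-line payload becomes several data: lines, which the client joins back together with newlines.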

FLOW CONTROL

Keeping a slow client current instead of stale

Example: examples/13-flow-control (the same pattern over SSE, easy to run with curl)

Every streaming scope -- http, websocket, and sse -- may carry a flow-control handle at $scope->{'pagi.transport'}. It reports how many bytes the server has queued for the client but not yet written to the network, so you can make your own delivery decisions instead of only blocking until the buffer drains. It is the server-side analogue of the browser's WebSocket.bufferedAmount -- a primitive many async stacks do not expose at all.

The handle is optional: a server that cannot measure its outbound buffer omits it, so guard for undef. The first use is conflation -- on a high-frequency feed, skip an update when the client is already behind, because it will get the next, fresher one. A sparse-but-current feed beats a dense-but-lagging one:

my $app = async sub {
    my ($scope, $receive, $send) = @_;
    die "Unsupported scope type: $scope->{type}" if $scope->{type} ne 'websocket';

    await $receive->();                          # websocket.connect
    await $send->({ type => 'websocket.accept' });

    my $transport = $scope->{'pagi.transport'};  # undef if unsupported

    while (1) {
        my $update = await next_update();        # your high-frequency feed

        # Skip this tick if the client is falling behind. Threshold relative
        # to the server's own ceiling, not a hard-coded byte count.
        if ($transport) {
            my $ceiling = $transport->high_water_mark // 65536;
            next if $transport->buffered_amount > $ceiling / 2;
        }

        await $send->({ type => 'websocket.send', text => $update });
    }
};

buffered_amount is a synchronous, non-blocking read; high_water_mark and low_water_mark report the band at which the server engages and releases backpressure. (The PAGI-Tools helpers wrap the same handle as $ws->buffered_amount and $ws->is_writable.)

Pausing a producer you do not control

Conflation works when you drive the loop and can choose to skip. But sometimes an external source -- a pub/sub subscription, a database change feed -- pushes events at you faster than a slow client drains, and you cannot self-pace by awaiting $send. For that the handle offers two edge-triggered callbacks: on_high_water fires once when the buffer crosses up to the high mark, and on_drain fires once when it falls back below the low mark. Use them to pause and resume the source:

my $app = async sub {
    my ($scope, $receive, $send) = @_;
    die "Unsupported scope type: $scope->{type}" if $scope->{type} ne 'sse';

    await $send->({
        type    => 'sse.start',
        status  => 200,
        headers => [ [ 'content-type', 'text/event-stream' ] ],
    });

    my $feed = subscribe_to_feed();         # your external event source
    $feed->on_event(sub {
        my ($data) = @_;
        $send->({ type => 'sse.send', data => $data })->retain;   # not awaited; retain keeps the Future alive
    });

    # The feed ignores the client's pace, so let the transport throttle it:
    # pause when the buffer fills, resume when it drains. Each fires once per
    # crossing, then re-arms.
    my $transport = $scope->{'pagi.transport'};
    if ($transport) {
        $transport->on_high_water(sub { $feed->pause });
        $transport->on_drain(sub     { $feed->resume });
    }

    await $receive->();   # park here until sse.disconnect
    $feed->cancel;
};

The two callbacks form a hysteresis cycle -- pause at the high mark, resume at the low mark, with a gap between so a buffer hovering near one threshold does not flap. They are the push-model counterpart to the synchronous reads: reach for the reads when you drive the loop, the callbacks when something else does.
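A toy model makes the edge-triggered, once-per-crossing behaviour easy to see. This is not the server's implementation: make_band, its arguments, and the exact boundary comparisons are assumptions made for illustration.

```perl
use strict;
use warnings;

# Toy model of an edge-triggered high/low water band. Returns a closure
# that is fed the current buffer level and fires a callback only when the
# level crosses a mark, not on every reading beyond it.
sub make_band {
    my (%args) = @_;
    my $above = 0;   # true while we are in the "paused" state
    return sub {
        my ($buffered) = @_;
        if ( !$above && $buffered >= $args{high} ) {
            $above = 1;
            $args{on_high_water}->();
        }
        elsif ( $above && $buffered <= $args{low} ) {
            $above = 0;
            $args{on_drain}->();
        }
    };
}

my @log;
my $observe = make_band(
    high          => 100,
    low           => 25,
    on_high_water => sub { push @log, 'pause' },
    on_drain      => sub { push @log, 'resume' },
);

# Buffer level over time: rises, hovers near the high mark, drains, rises again.
$observe->($_) for 10, 90, 120, 95, 110, 60, 20, 40, 130;

print "@log\n";   # prints "pause resume pause"
```

The readings of 95 and 110 fire nothing because the band is already in the paused state; only dropping to the low mark re-arms the high-water callback.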

The full interface (buffered_amount, high_water_mark, low_water_mark, on_high_water, on_drain) is specified in "Transport Flow Control" in PAGI::Spec::Www.

APPLICATION LIFECYCLE

Lifespan and shared state

Example: examples/06-lifespan-state

The lifespan scope runs once per process, around startup and shutdown. Reply to each event, and use it to build state that later requests read from $scope->{state}. This complete app handles both scope types:

my $app = async sub {
    my ($scope, $receive, $send) = @_;

    if ($scope->{type} eq 'lifespan') {
        my $state = $scope->{state} //= {};
        while (1) {
            my $event = await $receive->();
            if ($event->{type} eq 'lifespan.startup') {
                $state->{greeting} = 'Hello from lifespan';
                await $send->({ type => 'lifespan.startup.complete' });
            }
            elsif ($event->{type} eq 'lifespan.shutdown') {
                await $send->({ type => 'lifespan.shutdown.complete' });
                last;
            }
        }
        return;
    }

    die "Unsupported scope type: $scope->{type}" if $scope->{type} ne 'http';

    # Drain the request body, then reply using the shared state.
    while (1) {
        my $event = await $receive->();
        last if $event->{type} ne 'http.request';
        last unless $event->{more};
    }

    my $greeting = ( $scope->{state} // {} )->{greeting} // 'Hello';
    await $send->({
        type    => 'http.response.start',
        status  => 200,
        headers => [ [ 'content-type', 'text/plain' ] ],
    });
    await $send->({ type => 'http.response.body', body => "$greeting via shared state", more => 0 });
};

Not every server or run drives lifespan, so an app should still work when the startup event never arrives (hence the // {} and // 'Hello' fallbacks).

Each request gets a shallow copy of state, so this read-only pattern -- set a value at startup, read it in requests -- is exactly what it is built for. To share something mutable, store a single object or container at startup and mutate it through that shared reference; never reassign a top-level state key from a request, which changes only that request's copy. See "Lifespan State" in PAGI::Spec::Lifespan.
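The difference between reassigning a key and mutating through a shared reference is plain Perl and can be tried without a server. In this sketch the { %state } copy stands in for whatever the server does per request; that detail is an assumption.

```perl
use strict;
use warnings;

# What lifespan startup built (stand-in for the server's state hash).
my %state = ( greeting => 'Hello', counters => { hits => 0 } );

# A shallow copy: a new top-level hash holding the same inner references.
my $request_state = {%state};

$request_state->{counters}{hits}++;        # shared: same inner hashref
$request_state->{greeting} = 'Goodbye';    # local: only this copy's key changes

print "hits: $state{counters}{hits}\n";    # prints "hits: 1"
print "greeting: $state{greeting}\n";      # prints "greeting: Hello"
```

The increment is visible to later requests because every copy points at the same counters hash, while the reassigned greeting disappears with the request.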

SERVER EXTENSIONS

Forcing a flush: the fullflush extension

Example: examples/07-extension-fullflush

Servers advertise optional capabilities in $scope->{extensions}; always check for one before using it. fullflush pushes buffered bytes to the client immediately:

my $app = async sub {
    my ($scope, $receive, $send) = @_;
    die "Unsupported scope type: $scope->{type}" if $scope->{type} ne 'http';

    my $supports_fullflush = exists $scope->{extensions}{fullflush};

    await $send->({
        type    => 'http.response.start',
        status  => 200,
        headers => [ [ 'content-type', 'text/plain' ] ],
    });

    my @chunks = ( "Line 1\n", "Line 2\n", "Line 3\n" );
    for my $i (0 .. $#chunks) {
        my $is_last = ( $i == $#chunks );
        await $send->({ type => 'http.response.body', body => $chunks[$i], more => $is_last ? 0 : 1 });
        await $send->({ type => 'http.fullflush' }) if $supports_fullflush && !$is_last;
    }
};

The extension mechanism is described in PAGI::Spec::Extensions.

Inspecting the TLS connection

Example: examples/08-tls-introspection

When the connection is over TLS and the server supports the extension, $scope->{extensions}{tls} carries the connection's security metadata; the key is absent for cleartext connections:

my $app = async sub {
    my ($scope, $receive, $send) = @_;
    die "Unsupported scope type: $scope->{type}" if $scope->{type} ne 'http';

    my $tls  = $scope->{extensions}{tls};
    my $body = $tls
        ? sprintf("TLS version 0x%04x; client cert: %s\n",
                  $tls->{tls_version} // 0, $tls->{client_cert_name} // '(none)')
        : "Connection is not using TLS\n";

    await $send->({
        type    => 'http.response.start',
        status  => 200,
        headers => [ [ 'content-type', 'text/plain' ] ],
    });
    await $send->({ type => 'http.response.body', body => $body, more => 0 });
};

The full example pretty-prints more fields (cipher suite, certificate chain) as JSON. See PAGI::Spec::Tls for the complete set.
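If you would rather show a protocol name than a hex number, a small lookup does it. This sketch assumes tls_version holds the numeric TLS wire version, as the 0x%04x format in the recipe suggests; tls_version_name is a hypothetical helper, so check PAGI::Spec::Tls for the field's exact definition.

```perl
use strict;
use warnings;

# Standard TLS protocol version numbers as they appear on the wire.
my %TLS_NAME = (
    0x0301 => 'TLS 1.0',
    0x0302 => 'TLS 1.1',
    0x0303 => 'TLS 1.2',
    0x0304 => 'TLS 1.3',
);

# Hypothetical helper: map a numeric version to a readable name.
sub tls_version_name {
    my ($version) = @_;
    return '(unknown)' unless defined $version;
    return $TLS_NAME{$version} // sprintf 'unrecognized (0x%04x)', $version;
}

print tls_version_name(0x0303), "\n";   # prints "TLS 1.2"
```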

PUTTING IT TOGETHER

A real-time job runner

Example: examples/11-job-runner

The capstone example is too large to inline -- it spans several modules -- but it is worth reading once you are comfortable with the recipes above. It combines all three application protocols in one app sharing a single job queue: a REST API over HTTP to create and cancel jobs, per-job progress over SSE, and a live queue dashboard over WebSocket. It is the most complete demonstration of raw PAGI.

Note: it drives IO::Async timers directly, so it runs only under an IO::Async-based server such as PAGI::Server -- see its README.md for how it could be made loop-agnostic with Future::IO.

INTEROP

Running a PSGI app under PAGI

You do not have to rewrite a PSGI application to use PAGI. Wrap it once with PAGI::App::WrapPSGI (from the PAGI-Tools distribution) -- PAGI::App::WrapPSGI->new( psgi_app => $app )->to_app -- and it runs on any conforming PAGI server, with native async, WebSocket, or SSE handlers dispatched beside it. A runnable demo ships with PAGI-Tools at examples/09-psgi-bridge. See PAGI::PSGI for the full migration guide.

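SEE ALSO

PAGI::Tutorial, PAGI::Spec::Www, PAGI::Spec::Lifespan, PAGI::Spec::Extensions, PAGI::Spec::Tls, PAGI::PSGI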

AUTHOR

John Napiorkowski <jjnapiork@cpan.org>

COPYRIGHT AND LICENSE

This software is licensed under the same terms as Perl itself.