NAME

PAGI::Tutorial - A comprehensive guide to building async web applications with PAGI

DESCRIPTION

This tutorial introduces PAGI (Perl Asynchronous Gateway Interface), a modern web framework specification for Perl that brings async/await patterns to web development. Whether you're building real-time applications with WebSockets, streaming data with Server-Sent Events, or just want better concurrency handling, PAGI provides the foundation you need.

PART 1: GETTING STARTED

1.1 Introduction

What is PAGI?

PAGI (Perl Asynchronous Gateway Interface) is a specification and framework for building asynchronous web applications in Perl. It's a spiritual successor to PSGI that embraces modern async/await patterns using Future::AsyncAwait and provides first-class support for WebSockets, Server-Sent Events, and HTTP/1.1.

Unlike traditional synchronous web frameworks, PAGI allows your application to handle multiple concurrent connections efficiently without blocking, making it ideal for:

  • WebSocket applications (real-time chat, live updates)

  • Server-Sent Events (SSE) for streaming data

  • Long-polling and streaming responses

  • High-concurrency applications

  • Applications that make multiple I/O calls per request

Why Async?

Traditional synchronous web frameworks handle one request at a time per process. When your application waits for a database query, an external API call, or file I/O, the entire process is blocked and can't handle other requests.

The Traditional Solution: Forking Servers

Perl web applications have traditionally used pre-forking servers like Starman to achieve concurrency. Starman spawns multiple worker processes, each handling one request at a time. This approach works well and has served the Perl community for years:

Approach        | Forking (Starman)           | Async (PAGI)
----------------|-----------------------------|--------------------------
Concurrency     | Multiple processes          | Single process, event loop
Memory          | Each worker copies app      | Shared memory, lower usage
Connections     | Limited by worker count     | Thousands per process
WebSocket/SSE   | Impractical (ties up worker)| Natural fit
Blocking I/O    | Fine (isolated per worker)  | Must use async libraries
CPU-bound work  | Fine (isolated per worker)  | Blocks event loop (use workers)
Debugging       | Simpler (sequential code)   | Trickier (async flow)
Existing code   | Works as-is                 | May need async adapters

When to use which:

  • Starman/forking - CPU-heavy work, existing sync codebases, simple request/response apps, blocking database drivers

  • PAGI/async - WebSockets, SSE, long-polling, high connection counts, I/O-bound apps, real-time features

Many applications benefit from both: use PAGI for real-time endpoints (WebSocket, SSE) and forking for traditional request/response. PAGI also supports worker mode (--workers N) which combines both approaches.

The Async Advantage

Asynchronous programming allows your application to handle multiple requests concurrently in a single process. When one request is waiting for I/O, the event loop can switch to another request, dramatically improving resource utilization and response times.

This is especially important for:

  • Real-time bidirectional communication (WebSockets)

  • Keeping connections open for streaming (SSE)

  • Applications that make multiple external API calls

  • High-traffic applications where blocking I/O is a bottleneck

PAGI vs PSGI

If you're familiar with PSGI, here are the key differences:

Feature          | PSGI                    | PAGI
-----------------|-------------------------|---------------------------
Execution Model  | Synchronous             | Asynchronous (async/await)
Application Sig  | sub { $env }            | async sub { $scope, $receive, $send }
Request Metadata | %env hash               | %scope hash
Request Body     | $env->{'psgi.input'}    | await $receive->()
Response         | [\@status, \@headers, \@body] | await $send->({...})
WebSocket        | Not supported           | First-class support
SSE              | Hacky streaming         | First-class support
Backpressure     | No explicit control     | Explicit via Futures

The async model means you can write code that looks synchronous (thanks to async/await) while getting all the benefits of non-blocking I/O.

Prerequisites

To use PAGI, you'll need:

  • Perl 5.16 or later (released May 2012) - required by Future::AsyncAwait

  • Future::AsyncAwait - Provides async/await syntax

  • IO::Async - Event loop and async I/O primitives

  • Basic understanding of Futures and async programming (we'll cover the essentials)

1.2 Installation

Installing PAGI from CPAN:

cpanm PAGI

Or from source:

git clone https://github.com/yourusername/PAGI.git
cd PAGI
cpanm --installdeps .

To include development dependencies (for testing):

cpanm --installdeps . --with-develop

Verify the installation:

pagi-server --version

1.3 Your First PAGI App

Let's create a simple "Hello, World!" application. Create a file called hello.pl:

use strict;
use warnings;
use Future::AsyncAwait;

async sub app {
    my ($scope, $receive, $send) = @_;

    # Only handle HTTP requests (server also sends lifespan events)
    die "Expected http scope" unless $scope->{type} eq 'http';

    # Send the response status and headers
    await $send->({
        type => 'http.response.start',
        status => 200,
        headers => [['content-type', 'text/plain']],
    });

    # Send the response body
    await $send->({
        type => 'http.response.body',
        body => 'Hello, World!',
    });
}

# Return a reference to the app subroutine
\&app;

Let's break down what's happening:

1. async sub app - Declares an asynchronous subroutine using Future::AsyncAwait
2. ($scope, $receive, $send) - The three parameters every PAGI app receives:
  • $scope - Hash reference containing request metadata (method, path, headers, etc.)

  • $receive - Async code reference for receiving events from the client

  • $send - Async code reference for sending events to the client

3. return unless $scope->{type} eq 'http' - The server calls your app for different event types (HTTP requests, WebSocket, lifespan). This check ensures we only handle HTTP requests.
4. await $send->({...}) - Sends events asynchronously, waiting for each to complete
5. \&app - Returns a reference to the application subroutine

Run your application:

pagi-server hello.pl --port 5000

Test it:

curl http://localhost:5000/

You should see:

Hello, World!

Try accessing it from a browser by visiting http://localhost:5000/.

Inspecting the Scope

The $scope hash contains metadata about the current connection - similar to PSGI's $env hash. It tells you everything about the request before you read the body: the HTTP method, path, headers, query string, client address, and more. Unlike PSGI where everything is in one flat hash, PAGI separates the metadata ($scope) from the body (read via $receive).

Let's modify the app to show what's in $scope:

use strict;
use warnings;
use Future::AsyncAwait;
use Data::Dumper;

async sub app {
    my ($scope, $receive, $send) = @_;

    die "Expected http scope" unless $scope->{type} eq 'http';

    my $body = Dumper($scope);

    await $send->({
        type => 'http.response.start',
        status => 200,
        headers => [['content-type', 'text/plain']],
    });

    await $send->({
        type => 'http.response.body',
        body => $body,
    });
}

\&app;

Run this and visit http://localhost:5000/test?foo=bar. You'll see the scope hash which includes:

  • type - Connection type (http, websocket, sse, lifespan)

  • method - HTTP method (GET, POST, etc.)

  • path - URL path (decoded UTF-8)

  • raw_path - URL path (raw bytes)

  • query_string - Query string (raw bytes)

  • headers - Array reference of [name, value] pairs

  • server - Server address and port

  • And more...

1.4 Understanding the Event Loop: Blocking vs Non-Blocking

Understanding the event loop is crucial to writing efficient PAGI applications. Let's visualize how it works.

The Event Loop

PAGI uses IO::Async::Loop to manage asynchronous operations. Here's a simplified view of how the event loop handles multiple connections:

Single-Threaded Event Loop

Time -->

Request A: [======API Call (async)======]----[Process]--[Send]
Request B:     [--DB Query (async)--]--------[Process]--------[Send]
Request C:         [Timer (async)]----[Process]----[Send]

Event Loop: [A][B][C][A][B][C][B][A][C][B][A][C][A][B]
            |  |  |  |  |  |  |  |  |  |  |  |  |  |
            Continuously switching between requests as they wait

When you use await, you're telling the event loop "I'm waiting for something, go handle other requests until this completes." The event loop can then switch to another request, making efficient use of CPU time.

Non-Blocking Operations (GOOD)

These operations yield control to the event loop:

use Future::AsyncAwait;
use IO::Async::Loop;
use IO::Async::Timer::Countdown;
use Net::Async::HTTP;

async sub good_app {
    my ($scope, $receive, $send) = @_;

    # IO::Async::Loop->new returns the singleton loop instance
    my $loop = IO::Async::Loop->new;

    # Good: Async timer (doesn't block)
    my $timer = IO::Async::Timer::Countdown->new(
        delay => 5,
        on_expire => sub { },
    );
    $loop->add($timer);
    $timer->start;
    await $timer->future;

    # Good: Async HTTP client (doesn't block)
    my $http = Net::Async::HTTP->new;
    $loop->add($http);
    my $response = await $http->GET('https://api.example.com/data');

    # Good: Async database query (using a hypothetical async DB driver)
    # my $row = await $db->selectrow_async('SELECT * FROM users WHERE id = ?', $user_id);

    await $send->({
        type => 'http.response.start',
        status => 200,
        headers => [['content-type', 'text/plain']],
    });

    await $send->({
        type => 'http.response.body',
        body => 'Non-blocking operations completed!',
    });
}

\&good_app;

Blocking Operations (BAD)

These operations block the entire process:

use Future::AsyncAwait;
use LWP::UserAgent;  # Synchronous HTTP client
use DBI;             # Most DBI drivers are synchronous

async sub bad_app {
    my ($scope, $receive, $send) = @_;

    # BAD: Blocks for 5 seconds, prevents other requests from being handled
    sleep 5;

    # BAD: Synchronous HTTP request blocks until complete
    my $ua = LWP::UserAgent->new;
    my $response = $ua->get('https://api.example.com/data');

    # BAD: Synchronous database query blocks
    # my $dbh = DBI->connect(...);
    # my $row = $dbh->selectrow_hashref('SELECT * FROM users WHERE id = ?', undef, $user_id);

    # BAD: Expensive CPU operation blocks (e.g., bcrypt password hashing)
    # my $hashed = bcrypt_hash($password);  # Takes ~100ms

    await $send->({
        type => 'http.response.start',
        status => 200,
        headers => [['content-type', 'text/plain']],
    });

    await $send->({
        type => 'http.response.body',
        body => 'Blocking operations completed (badly)!',
    });
}

\&bad_app;

Why Blocking is Bad

Let's do the math:

  • You have 1,000 concurrent connections

  • Each request needs to make a synchronous database query that takes 100ms

  • With a blocking operation, each connection is stuck for 100ms

  • 1,000 connections × 100ms = 100 seconds of total blocked time

  • With async operations, all 1,000 queries can be in-flight simultaneously

In a single-threaded event loop, one blocking operation prevents ALL other connections from making progress. A single sleep(5) will freeze your entire application for 5 seconds!

Solutions for Blocking Operations

When you must use blocking operations, you have three options:

1. Use Async Libraries

Prefer async alternatives when available:

2. Use IO::Async::Function

Offload blocking operations to a subprocess via IO::Async::Function:

use IO::Async::Loop;
use IO::Async::Function;

my $loop = IO::Async::Loop->new;  # Returns singleton

my $worker = IO::Async::Function->new(
    code => sub {
        my ($password) = @_;
        # This runs in a separate process (fork)
        return bcrypt_hash($password);
    },
);
$loop->add($worker);  # Forks the worker process here

# Call the blocking operation asynchronously
my $hashed = await $worker->call(args => [$password]);

Note: IO::Async::Function uses fork() to create worker processes. If you're already running the server in worker mode (--workers 4), each server worker may fork additional Function workers. Be mindful of total process count - too many forks leads to memory pressure and context-switching overhead. For high-volume blocking work, consider a dedicated job queue (Redis, RabbitMQ) instead.

3. Use Worker Mode

Run PAGI::Server with multiple worker processes (see section 1.5).

1.5 Worker Mode

PAGI::Server supports two deployment modes: single-process and multi-worker.

Single Process vs Worker Mode

Single Process Mode:

[Master Process]
  |
  +-- Runs event loop
  +-- Handles all connections
  +-- Good for: async apps, development, low-medium traffic


Worker Mode:

[Master Process]
  |
  +-- [Worker 1] (handles connections)
  +-- [Worker 2] (handles connections)
  +-- [Worker 3] (handles connections)
  +-- [Worker 4] (handles connections)

Good for: apps with blocking operations, high CPU usage, multi-core utilization

When to Use Worker Mode

Scenario                           | Single Process | Worker Mode
-----------------------------------|----------------|-------------
Pure async app (no blocking)       | Yes            | Yes
App uses some blocking operations  | No             | Yes
High CPU usage per request         | No             | Yes
WebSocket/SSE with shared state    | Yes            | Needs PubSub
Development/testing                | Yes            | Optional
Production (multi-core server)     | Optional       | Yes

Running with Workers

Start the server with multiple worker processes:

pagi-server hello.pl --port 5000 --workers 4

This creates:

  • 1 master process that accepts connections

  • 4 worker processes that handle requests

  • Automatic worker respawning if a worker crashes

The number of workers should typically match your CPU core count:

# Check your CPU core count and set workers accordingly
# On Linux: nproc   On macOS: sysctl -n hw.ncpu
pagi-server hello.pl --port 5000 --workers 4

Worker Isolation

Important: Each worker process is isolated. They do NOT share:

  • Memory (variables are per-worker)

  • Database connections

  • WebSocket connections

  • Application state

Wrong: Shared State in Memory

This will NOT work correctly across workers:

# DON'T DO THIS in worker mode!
my $counter = 0;  # Each worker has its own $counter

async sub app {
    my ($scope, $receive, $send) = @_;

    $counter++;  # Only increments in THIS worker

    await $send->({
        type => 'http.response.start',
        status => 200,
        headers => [['content-type', 'text/plain']],
    });

    await $send->({
        type => 'http.response.body',
        body => "Counter: $counter",  # Will show different values per worker
    });
}

\&app;

Right: External State Storage

Use a shared backend for state:

use Future::AsyncAwait;
use IO::Async::Loop;
use Net::Async::Redis;

async sub app {
    my ($scope, $receive, $send) = @_;

    my $loop = IO::Async::Loop->new;

    # Connect to Redis (shared across all workers)
    my $redis = Net::Async::Redis->new;
    $loop->add($redis);
    await $redis->connect(host => 'localhost', port => 6379);

    # Increment counter in Redis
    my $counter = await $redis->incr('request_counter');

    await $send->({
        type => 'http.response.start',
        status => 200,
        headers => [['content-type', 'text/plain']],
    });

    await $send->({
        type => 'http.response.body',
        body => "Counter: $counter",  # Accurate across all workers
    });
}

\&app;

For WebSocket applications that need to broadcast messages across workers, use PAGI::Middleware::PubSub with a Redis backend.

PART 2: UNDERSTANDING RAW PAGI

In Part 1, we covered the basics of PAGI applications and the event loop. Now we'll dive deeper into the raw PAGI protocol, exploring how to work directly with the three arguments ($scope, $receive, and $send) before introducing higher-level abstractions.

Understanding raw PAGI is important because:

  • It helps you understand what's happening under the hood

  • You can build custom middleware and utilities

  • You'll appreciate what frameworks like PAGI::Endpoint::Router do for you

  • You can handle edge cases that frameworks might not cover

2.1 The Three Arguments

Every PAGI application receives exactly three arguments:

async sub app {
    my ($scope, $receive, $send) = @_;
    # ... your code here
}

Let's examine each in detail.

$scope - Connection Metadata

$scope is a hash reference containing metadata about the connection and request. The contents vary depending on the connection type.

For HTTP requests (type => 'http'):

{
    type => 'http',
    method => 'GET',               # HTTP method
    path => '/users/123',          # URL path (decoded UTF-8)
    raw_path => '/users/123',      # URL path (raw bytes)
    query_string => 'foo=bar',     # Query string (raw bytes)
    headers => [                   # Array of [name, value] pairs
        ['host', 'localhost:5000'],
        ['user-agent', 'curl/7.68.0'],
        ['content-type', 'application/json'],
    ],
    server => ['127.0.0.1', 5000], # Server address and port
    client => ['127.0.0.1', 54321],# Client address and port
    scheme => 'http',              # 'http' or 'https'
    http_version => '1.1',         # HTTP version
    loop => $loop,                 # IO::Async::Loop instance
    extensions => { ... },         # PAGI extensions
}

For WebSocket connections (type => 'websocket'):

{
    type => 'websocket',
    path => '/ws',
    headers => [ ... ],
    subprotocols => ['chat', 'superchat'],  # Requested subprotocols
    # ... other fields similar to http
}

For SSE connections (type => 'sse'):

{
    type => 'sse',
    path => '/events',
    headers => [ ... ],
    # ... other fields similar to http
}

For lifespan events (type => 'lifespan'):

{
    type => 'lifespan',
}

$receive - Receiving Client Events

$receive is an asynchronous code reference that returns a Future. Call it to receive the next event from the client:

my $event = await $receive->();

The structure of $event depends on the connection type and what's happening.

For HTTP requests, you receive body chunks:

{
    type => 'http.request',
    body => '{"name":"John"}',  # Raw bytes
    more => 0,                  # 0 = last chunk (default), 1 = more coming
}

For WebSocket, you receive messages and disconnect events:

# Message received
{
    type => 'websocket.receive',
    text => 'Hello!',           # For text messages
    # OR
    bytes => "\x00\x01\x02",    # For binary messages
}

# Connection closed
{
    type => 'websocket.disconnect',
    code => 1000,               # Close code
    reason => 'Normal closure', # Close reason
}

For lifespan, you receive startup and shutdown events:

{ type => 'lifespan.startup' }
{ type => 'lifespan.shutdown' }

$send - Sending Events to Client

$send is an asynchronous code reference that sends events to the client. It returns a Future that completes when the event has been sent:

await $send->({ type => 'http.response.start', status => 200, headers => [...] });
await $send->({ type => 'http.response.body', body => 'Hello' });

The events you send depend on the connection type.

2.2 Scope Types

PAGI defines four scope types, each representing a different kind of connection or event.

http - Standard HTTP Requests

The most common type. Used for normal HTTP request/response cycles.

Check for it:

if ($scope->{type} eq 'http') {
    # Handle HTTP request
}

websocket - WebSocket Connections

Used for bidirectional real-time communication. WebSocket connections start as HTTP upgrade requests.

Check for it:

if ($scope->{type} eq 'websocket') {
    # Handle WebSocket connection
}

See PAGI::WebSocket for WebSocket utilities.

sse - Server-Sent Events

A PAGI extension for server-to-client streaming. SSE allows the server to push updates to the client over a long-lived HTTP connection.

Check for it:

if ($scope->{type} eq 'sse') {
    # Handle SSE connection
}

See PAGI::SSE for SSE utilities.

lifespan - Application Lifecycle Events

Used for application startup and shutdown events. This allows your application to initialize resources (database connections, caches, etc.) when it starts and clean them up when it shuts down.

Check for it:

if ($scope->{type} eq 'lifespan') {
    # Handle startup/shutdown
}

Note: PAGI::Endpoint::Router handles this via on_startup and on_shutdown callbacks.

2.3 HTTP Request/Response Cycle (Raw)

Let's build a complete HTTP request handler that reads the request body and echoes it back.

use strict;
use warnings;
use Future::AsyncAwait;

async sub app {
    my ($scope, $receive, $send) = @_;

    # Only handle HTTP requests
    die "Expected http scope" unless $scope->{type} eq 'http';

    # Read the entire request body
    # NOTE: This loop does NOT block the event loop! The 'await' keyword
    # yields control while waiting for data, allowing other requests to
    # be processed. The loop only runs when there's actually data.
    my $body = '';
    while (1) {
        my $event = await $receive->();
        $body .= $event->{body} if defined $event->{body};
        last unless $event->{more};  # Stop when no more chunks
    }

    # Prepare response body
    my $response_body = "You sent: " . (length($body) ? $body : "(empty)");

    # Send response start (status and headers)
    await $send->({
        type => 'http.response.start',
        status => 200,
        headers => [
            ['content-type', 'text/plain'],
            ['content-length', length($response_body)],
        ],
    });

    # Send response body (more => 0 is the default, so can be omitted)
    await $send->({
        type => 'http.response.body',
        body => $response_body,
    });
}

\&app;

Test it:

# Terminal 1: Start server
pagi-server echo.pl --port 5000

# Terminal 2: Send a request with body
curl -X POST -d "Hello, PAGI!" http://localhost:5000/

# Output: You sent: Hello, PAGI!

Key points:

  • Always check more to know when the request body is complete

  • Send http.response.start before sending any body

  • more => 0 is the default for final chunks (can be omitted)

  • The request body is raw bytes (you may need to decode it)

2.4 WebSocket (Raw)

WebSocket enables bidirectional real-time communication. Here's a simple echo server:

use strict;
use warnings;
use Future::AsyncAwait;

async sub app {
    my ($scope, $receive, $send) = @_;

    # Only handle WebSocket connections
    die "Expected websocket scope" unless $scope->{type} eq 'websocket';

    # Accept the WebSocket connection
    await $send->({
        type => 'websocket.accept',
        # Optional: specify a subprotocol from $scope->{subprotocols}
        # subprotocol => 'chat',
    });

    # Echo loop: receive messages and send them back
    # NOTE: This loop does NOT block! The 'await' yields to the event
    # loop while waiting, allowing other connections to be processed.
    while (1) {
        my $event = await $receive->();

        # Handle disconnection
        if ($event->{type} eq 'websocket.disconnect') {
            warn "Client disconnected: code=$event->{code}, reason=$event->{reason}\n";
            last;
        }

        # Echo text messages
        if (defined $event->{text}) {
            await $send->({
                type => 'websocket.send',
                text => "Echo: $event->{text}",
            });
        }
        # Echo binary messages
        elsif (defined $event->{bytes}) {
            await $send->({
                type => 'websocket.send',
                bytes => $event->{bytes},
            });
        }
    }
}

\&app;

Test it with a WebSocket client:

# JavaScript in browser console:
const ws = new WebSocket('ws://localhost:5000/');
ws.onmessage = (event) => console.log('Received:', event.data);
ws.onopen = () => ws.send('Hello, WebSocket!');

// You should see: Received: Echo: Hello, WebSocket!

Key points:

  • Must send websocket.accept before sending any messages

  • Check type eq 'websocket.disconnect' to detect when the client closes the connection

  • Messages can be text (text) or binary (bytes)

  • The receive loop blocks waiting for the next message

See PAGI::WebSocket for higher-level utilities that make WebSocket handling easier.

2.5 SSE (Raw)

Server-Sent Events allow the server to push updates to clients over HTTP. Here's a clock that sends the current time every second:

use strict;
use warnings;
use Future::AsyncAwait;
use IO::Async::Loop;
use IO::Async::Timer::Periodic;

async sub app {
    my ($scope, $receive, $send) = @_;

    # Only handle SSE connections
    die "Expected sse scope" unless $scope->{type} eq 'sse';

    my $loop = IO::Async::Loop->new;

    # Start the SSE response
    await $send->({
        type => 'sse.response.start',
        status => 200,
        headers => [
            ['cache-control', 'no-cache'],
            ['x-custom-header', 'example'],
        ],
    });

    # Create a timer that fires every second
    my $timer = IO::Async::Timer::Periodic->new(
        interval => 1,
        on_tick => async sub {
            # Send the current time as an SSE event
            await $send->({
                type => 'sse.response.body',
                data => scalar(localtime()),
                # Optional fields:
                # event => 'clock',     # Event type
                # id => '123',          # Event ID
                # retry => 5000,        # Retry interval in ms
            });
        },
    );

    # Add timer to the event loop
    $loop->add($timer);
    $timer->start;

    # Keep the connection open indefinitely
    # (In practice, you'd detect client disconnect)
    await $loop->loop_forever;
}

\&app;

Test it:

# Terminal 1: Start server
pagi-server clock.pl --port 5000

# Terminal 2: Connect with curl
curl http://localhost:5000/

# Output (streaming):
# data: Mon Dec 23 10:30:00 2025
#
# data: Mon Dec 23 10:30:01 2025
#
# data: Mon Dec 23 10:30:02 2025
# ...

JavaScript client:

const eventSource = new EventSource('http://localhost:5000/');
eventSource.onmessage = (event) => {
    console.log('Time:', event.data);
};

Key points:

  • Send sse.response.start first with headers

  • Send sse.response.body events with data field

  • Optional fields: event (event type), id (event ID), retry (reconnect interval)

  • Connection stays open for streaming

  • SSE is unidirectional (server to client only)

See PAGI::SSE for higher-level SSE utilities.

2.6 Streaming Responses

You can stream HTTP response bodies by sending multiple http.response.body events:

use strict;
use warnings;
use Future::AsyncAwait;
use IO::Async::Loop;
use IO::Async::Timer::Countdown;

async sub app {
    my ($scope, $receive, $send) = @_;

    die "Expected http scope" unless $scope->{type} eq 'http';

    my $loop = IO::Async::Loop->new;

    # Start response with chunked encoding
    await $send->({
        type => 'http.response.start',
        status => 200,
        headers => [
            ['content-type', 'text/plain'],
            ['transfer-encoding', 'chunked'],
        ],
    });

    # Send chunks over time
    for my $i (1..5) {
        await $send->({
            type => 'http.response.body',
            body => "Chunk $i\n",
            more => 1,  # More chunks coming
        });

        # Wait 1 second between chunks
        my $timer = IO::Async::Timer::Countdown->new(
            delay => 1,
            on_expire => sub { },
        );
        $loop->add($timer);
        $timer->start;
        await $timer->future;
    }

    # Send final chunk (more => 0 is the default, so can be omitted)
    await $send->({
        type => 'http.response.body',
        body => "Done!\n",
    });
}

\&app;

Test it:

curl http://localhost:5000/

# Output (appears gradually):
# Chunk 1
# Chunk 2
# Chunk 3
# Chunk 4
# Chunk 5
# Done!

Key points:

  • Use transfer-encoding: chunked or omit content-length

  • Set more => 1 on all chunks except the last

  • Omit more on the final chunk (defaults to 0)

  • Useful for large files, real-time data, or progressive rendering

2.7 Lifespan Protocol (Application Lifecycle)

The lifespan protocol allows your application to initialize resources on startup and clean them up on shutdown. The key feature is $scope->{state} - a hash that persists across all requests within a worker process.

Important: In worker mode (--workers N), each worker runs lifespan independently after forking. This means each worker initializes its own database connections, cache handles, etc. This is actually good - most database connections can't be shared across forks anyway.

use strict;
use warnings;
use Future::AsyncAwait;

async sub app {
    my ($scope, $receive, $send) = @_;

    # Handle lifespan events (startup then shutdown - a fixed sequence)
    if ($scope->{type} eq 'lifespan') {
        my $state = $scope->{state};  # Shared with all requests in this worker

        # Wait for startup event
        my $event = await $receive->();

        if ($event->{type} eq 'lifespan.startup') {
            eval {
                # Store resources in $state - available to all requests
                $state->{db} = MyDB->connect('dbi:Pg:dbname=myapp');
                $state->{cache} = Cache::Memcached->new(servers => ['localhost:11211']);
                warn "Application started successfully\n";
                await $send->({ type => 'lifespan.startup.complete' });
            };
            if ($@) {
                await $send->({ type => 'lifespan.startup.failed', message => $@ });
                return;
            }
        }

        # Wait for shutdown event
        $event = await $receive->();

        if ($event->{type} eq 'lifespan.shutdown') {
            eval {
                $state->{db}->disconnect if $state->{db};
                $state->{cache}->disconnect_all if $state->{cache};
                warn "Application shut down successfully\n";
                await $send->({ type => 'lifespan.shutdown.complete' });
            };
            if ($@) {
                await $send->({ type => 'lifespan.shutdown.failed', message => $@ });
            }
        }

        return;
    }

    # Handle HTTP requests
    if ($scope->{type} eq 'http') {
        my $state = $scope->{state};  # Same hash from lifespan!
        my $db = $state->{db};

        my $data = $db->selectrow_hashref('SELECT * FROM users LIMIT 1');

        await $send->({
            type => 'http.response.start',
            status => 200,
            headers => [['content-type', 'text/plain']],
        });

        await $send->({
            type => 'http.response.body',
            body => "Database connected: " . (defined $data ? 'yes' : 'no'),
        });
    }
}

\&app;

Key points:

  • Lifespan events are sent to the same application entry point

  • Handle lifespan.startup to initialize resources (database, cache, etc.)

  • Handle lifespan.shutdown to clean up resources

  • Must respond with complete or failed for each event

  • PAGI::Endpoint::Router provides on_startup and on_shutdown callbacks to make this easier

Example using PAGI::Endpoint::Router:

use PAGI::Endpoint::Router;

my $db;

my $router = PAGI::Endpoint::Router->new(
    on_startup => async sub ($scope) {
        $db = MyDB->connect('dbi:Pg:dbname=myapp');
        warn "Database connected\n";
    },
    on_shutdown => async sub ($scope) {
        $db->disconnect;
        warn "Database disconnected\n";
    },
);

# ... define routes ...

$router->to_app;

2.8 UTF-8 Handling

PAGI follows specific rules for UTF-8 encoding/decoding:

Request Paths

$scope->{path}       # UTF-8 decoded string (use this for display/matching)
$scope->{raw_path}   # Raw bytes (preserves original encoding)

PAGI uses Mojolicious-style path decoding: percent-encoded bytes are first URL-decoded, then UTF-8 decoded. If UTF-8 decoding fails (invalid byte sequences), the original URL-decoded bytes are preserved as-is.

Example:

# URL: /users/%E4%B8%AD%E6%96%87
$scope->{path} = '/users/中文'                    # (decoded)
$scope->{raw_path} = '/users/%E4%B8%AD%E6%96%87'  # (raw bytes)

# Invalid UTF-8: /users/%FF%FE
$scope->{path} = '/users/\xFF\xFE'                # (falls back to bytes)
$scope->{raw_path} = '/users/%FF%FE'              # (raw bytes)

Query Strings

Query strings are raw bytes. Use URI to parse them properly:

use strict;
use warnings;
use Future::AsyncAwait;
use URI;
use Encode qw(encode_utf8);

async sub app {
    my ($scope, $receive, $send) = @_;

    die "Expected http scope" unless $scope->{type} eq 'http';

    # Use URI to parse query string (handles decoding automatically)
    my $uri = URI->new('?' . ($scope->{query_string} // ''));
    my %params = $uri->query_form;

    my $name = $params{name} // 'World';

    await $send->({
        type => 'http.response.start',
        status => 200,
        headers => [['content-type', 'text/plain; charset=utf-8']],
    });

    await $send->({
        type => 'http.response.body',
        body => encode_utf8("Hello, $name!"),
    });
}

\&app;

Request Bodies

Request bodies are always raw bytes:

my $event = await $receive->();
my $raw_bytes = $event->{body};  # Raw bytes

# If you know it's UTF-8 JSON:
use Encode qw(decode_utf8);
use JSON::MaybeXS;

my $text = decode_utf8($raw_bytes);
my $data = decode_json($text);

Response Bodies

Response bodies must be encoded to bytes:

use Encode qw(encode_utf8);

my $text = "Hello, 世界!";  # Perl string (characters)
my $bytes = encode_utf8($text);  # Bytes

await $send->({
    type => 'http.response.body',
    body => $bytes,  # Must be bytes, not characters
});

Complete UTF-8 Example

use strict;
use warnings;
use Future::AsyncAwait;
use Encode qw(encode_utf8 decode_utf8);
use URI::Encode qw(uri_decode);

async sub app {
    my ($scope, $receive, $send) = @_;

    die "Expected http scope" unless $scope->{type} eq 'http';

    # 1. Path is already decoded
    my $path = $scope->{path};  # UTF-8 string

    # 2. Query string needs decoding
    my $query = $scope->{query_string} // '';
    my ($key, $value) = split /=/, $query, 2;
    $value = decode_utf8(uri_decode($value // '')) if defined $value;

    # 3. Request body is raw bytes
    # NOTE: This loop does NOT block - await yields to event loop
    my $body_bytes = '';
    while (1) {
        my $event = await $receive->();
        $body_bytes .= $event->{body} if defined $event->{body};
        last unless $event->{more};
    }
    my $body_text = decode_utf8($body_bytes);  # Decode to string

    # 4. Build response (as string)
    my $response = "Path: $path\n";
    $response .= "Param: $value\n" if defined $value;
    $response .= "Body: $body_text\n" if length $body_text;

    # 5. Encode response to bytes
    my $response_bytes = encode_utf8($response);

    await $send->({
        type => 'http.response.start',
        status => 200,
        headers => [
            ['content-type', 'text/plain; charset=utf-8'],
            ['content-length', length($response_bytes)],
        ],
    });

    await $send->({
        type => 'http.response.body',
        body => $response_bytes,  # Bytes, not string
    });
}

\&app;

Using High-Level Helpers

PAGI::Request and PAGI::Response handle UTF-8 encoding/decoding automatically:

use PAGI::Request;
use PAGI::Response;

async sub app {
    my ($scope, $receive, $send) = @_;

    die "Expected http scope" unless $scope->{type} eq 'http';

    my $req = PAGI::Request->new($scope, $receive);
    my $res = PAGI::Response->new($send);

    # Path is already decoded
    my $path = $req->path;

    # Query params are automatically decoded
    my $name = $req->query_parameters->get('name') // 'World';

    # Body is automatically decoded (if content-type indicates UTF-8)
    my $body = await $req->body;

    # Response automatically encodes to UTF-8 bytes
    await $res->text("Hello, $name!");
}

\&app;

Key points:

  • $scope->{path} is decoded UTF-8; $scope->{raw_path} is bytes

  • Query strings and request bodies are raw bytes (decode them yourself)

  • Response bodies must be encoded to bytes before sending

  • Use Encode for explicit encoding/decoding

  • PAGI::Request and PAGI::Response handle this automatically

NEXT STEPS

In Part 3, we'll cover:

PART 3: THE HELPER CLASSES

In Part 2, we worked directly with the raw PAGI protocol ($scope, $receive, $send). While powerful, this low-level approach requires a lot of boilerplate code for common tasks.

PAGI provides four helper classes that make web development much easier:

These classes handle UTF-8 encoding/decoding, body parsing, cookie handling, and more automatically.

3.1 PAGI::Response - Fluent Response Builder

PAGI::Response provides a fluent interface for building HTTP responses. Instead of manually constructing http.response.start and http.response.body events, you use chainable methods.

Creating a Response

use strict;
use warnings;
use Future::AsyncAwait;
use PAGI::Response;

async sub app {
    my ($scope, $receive, $send) = @_;

    die "Expected http scope" unless $scope->{type} eq 'http';

    # Create a response builder
    my $res = PAGI::Response->new($send);

    # Use it to send responses
    await $res->text('Hello, World!');
}

\&app;

The new() method takes the $send callback from your PAGI app.

Simple Responses

PAGI::Response provides methods for common response types:

use strict;
use warnings;
use Future::AsyncAwait;
use PAGI::Response;

async sub app {
    my ($scope, $receive, $send) = @_;

    die "Expected http scope" unless $scope->{type} eq 'http';

    my $res = PAGI::Response->new($send);
    my $path = $scope->{path};

    # Plain text
    if ($path eq '/text') {
        await $res->text('Hello, World!');
    }
    # HTML
    elsif ($path eq '/html') {
        await $res->html('<h1>Hello, World!</h1>');
    }
    # JSON
    elsif ($path eq '/json') {
        await $res->json({ message => 'Hello, World!' });
    }
    # Empty response (204 No Content)
    elsif ($path eq '/empty') {
        await $res->empty();
    }
    else {
        await $res->text('Unknown path');
    }
}

\&app;

Key points:

  • text() sets Content-Type to text/plain; charset=utf-8

  • html() sets Content-Type to text/html; charset=utf-8

  • json() sets Content-Type to application/json; charset=utf-8

  • All methods automatically encode strings to UTF-8 bytes

Status and Headers

Use chainable methods to set status and headers before sending:

use strict;
use warnings;
use Future::AsyncAwait;
use PAGI::Response;

async sub app {
    my ($scope, $receive, $send) = @_;

    die "Expected http scope" unless $scope->{type} eq 'http';

    my $res = PAGI::Response->new($send);

    # Fluent chaining: set status and headers, then send
    await $res->status(201)
              ->header('X-Request-ID', '12345')
              ->header('X-Custom', 'value')
              ->content_type('application/json')
              ->json({ created => 1 });
}

\&app;

Chainable methods:

  • status($code) - Set HTTP status (default: 200)

  • header($name, $value) - Add a response header

  • content_type($type) - Set Content-Type header

  • cookie($name, $value, %opts) - Set a cookie

Cookies

use strict;
use warnings;
use Future::AsyncAwait;
use PAGI::Response;

async sub app {
    my ($scope, $receive, $send) = @_;

    die "Expected http scope" unless $scope->{type} eq 'http';

    my $res = PAGI::Response->new($send);

    # Set a simple cookie
    $res->cookie('session_id', 'abc123');

    # Set a cookie with options
    $res->cookie('user_pref', 'dark_mode',
        expires  => time + 86400,      # 1 day from now
        path     => '/',
        domain   => '.example.com',
        secure   => 1,                 # HTTPS only
        httponly => 1,                 # No JavaScript access
        samesite => 'Strict',          # CSRF protection
    );

    # Delete a cookie
    $res->delete_cookie('old_session');

    await $res->text('Cookies set!');
}

\&app;

Redirects

use strict;
use warnings;
use Future::AsyncAwait;
use PAGI::Response;

async sub app {
    my ($scope, $receive, $send) = @_;

    die "Expected http scope" unless $scope->{type} eq 'http';

    my $res = PAGI::Response->new($send);
    my $path = $scope->{path};

    # Temporary redirect (302 Found - default)
    if ($path eq '/old-page') {
        await $res->redirect('/new-page');
    }
    # Permanent redirect (301 Moved Permanently)
    elsif ($path eq '/legacy') {
        await $res->redirect('/modern', 301);
    }
    # See Other (303 - used after POST)
    elsif ($path eq '/after-post') {
        await $res->redirect('/success', 303);
    }
    else {
        await $res->text('No redirect needed');
    }
}

\&app;

Note: redirect() sends the response immediately but does NOT stop Perl execution. Code after it still runs. Use return after redirect if you have more code below, or use control flow (if/elsif) as shown above.

Errors

use strict;
use warnings;
use Future::AsyncAwait;
use PAGI::Response;

async sub app {
    my ($scope, $receive, $send) = @_;

    die "Expected http scope" unless $scope->{type} eq 'http';

    my $res = PAGI::Response->new($send);
    my $path = $scope->{path};

    # Simple error
    if ($path eq '/gone') {
        await $res->status(404)->json({ error => 'Not Found' });
    }
    # Error with extra data
    elsif ($path eq '/invalid') {
        await $res->status(422)->json({
            error => 'Validation Failed',
            errors => [
                { field => 'email', message => 'Invalid format' },
                { field => 'age', message => 'Must be >= 18' },
            ],
        });
    }
    # Internal server error
    elsif ($path eq '/error') {
        await $res->status(500)->json({ error => 'Internal Server Error' });
    }
    else {
        await $res->text('OK');
    }
}

\&app;

The pattern $res->status(N)->json({...}) gives you full control over the error response structure, which is especially useful for APIs.

Streaming Responses

For large responses or real-time data, use stream():

use strict;
use warnings;
use Future::AsyncAwait;
use IO::Async::Loop;
use IO::Async::Timer::Countdown;
use PAGI::Response;

async sub app {
    my ($scope, $receive, $send) = @_;

    die "Expected http scope" unless $scope->{type} eq 'http';

    my $loop = IO::Async::Loop->new;
    my $res = PAGI::Response->new($send);

    # Stream response chunks
    await $res->stream(async sub {
        my ($writer) = @_;

        for my $i (1..5) {
            await $writer->write("Chunk $i\n");

            # Wait 1 second between chunks
            my $timer = IO::Async::Timer::Countdown->new(
                delay => 1,
                on_expire => sub { },
            );
            $loop->add($timer);
            $timer->start;
            await $timer->future;
        }

        # Close the stream
        await $writer->close();
    });
}

\&app;

The writer object has three methods:

  • write($chunk) - Send a chunk (returns a Future)

  • close() - Close the stream (returns a Future)

  • bytes_written() - Get total bytes written so far

See examples/02-streaming-response/ for complete examples.

File Downloads

Send files efficiently without loading them into memory:

use strict;
use warnings;
use Future::AsyncAwait;
use PAGI::Response;

async sub app {
    my ($scope, $receive, $send) = @_;

    die "Expected http scope" unless $scope->{type} eq 'http';

    my $res = PAGI::Response->new($send);

    # Send file with auto-detected MIME type
    await $res->send_file('/path/to/document.pdf');

    # Or with custom filename for download
    # await $res->send_file('/path/to/report.pdf',
    #     filename => 'Q4-Report.pdf',
    # );

    # Or display inline (in browser)
    # await $res->send_file('/path/to/image.jpg',
    #     inline => 1,
    # );
}

\&app;

PAGI::Response uses the PAGI protocol's file key, which allows PAGI::Server to use efficient zero-copy mechanisms like sendfile().

3.2 PAGI::Request - Request Parsing

PAGI::Request parses HTTP requests and provides convenient accessors for headers, query parameters, cookies, and request bodies.

Creating a Request

use strict;
use warnings;
use Future::AsyncAwait;
use PAGI::Request;
use PAGI::Response;

async sub app {
    my ($scope, $receive, $send) = @_;

    die "Expected http scope" unless $scope->{type} eq 'http';

    # Create request and response objects
    my $req = PAGI::Request->new($scope, $receive);
    my $res = PAGI::Response->new($send);

    await $res->text("You requested: " . $req->path);
}

\&app;

The new() method takes both $scope and $receive from your PAGI app.

Basic Properties

use strict;
use warnings;
use Future::AsyncAwait;
use PAGI::Request;
use PAGI::Response;

async sub app {
    my ($scope, $receive, $send) = @_;

    die "Expected http scope" unless $scope->{type} eq 'http';

    my $req = PAGI::Request->new($scope, $receive);
    my $res = PAGI::Response->new($send);

    my $info = join "\n", (
        "Method: " . $req->method,
        "Path: " . $req->path,
        "Query: " . $req->query_string,
        "Scheme: " . $req->scheme,
        "HTTP Version: " . $req->http_version,
        "Client: " . join(':', @{$req->client}),
    );

    await $res->text($info);
}

\&app;

Available methods:

  • method() - HTTP method (GET, POST, etc.)

  • path() - URL path (decoded UTF-8)

  • raw_path() - URL path (raw bytes)

  • query_string() - Query string (raw bytes)

  • scheme() - 'http' or 'https'

  • http_version() - HTTP version (e.g., '1.1')

  • client() - Client address and port arrayref

  • host() - Host header value

Headers

use strict;
use warnings;
use Future::AsyncAwait;
use PAGI::Request;
use PAGI::Response;

async sub app {
    my ($scope, $receive, $send) = @_;

    die "Expected http scope" unless $scope->{type} eq 'http';

    my $req = PAGI::Request->new($scope, $receive);
    my $res = PAGI::Response->new($send);

    # Get single header (case-insensitive)
    my $user_agent = $req->header('user-agent') // 'Unknown';

    # Get content type
    my $ct = $req->content_type;  # Just the type, without charset

    # Get content length
    my $len = $req->content_length;

    # Check authorization header
    my $auth = $req->header('authorization');

    # Get bearer token from Authorization: Bearer <token>
    my $token = $req->bearer_token;

    await $res->text("User-Agent: $user_agent");
}

\&app;

Query Parameters

Query parameters are automatically decoded from URL encoding and UTF-8:

use strict;
use warnings;
use Future::AsyncAwait;
use PAGI::Request;
use PAGI::Response;

async sub app {
    my ($scope, $receive, $send) = @_;

    die "Expected http scope" unless $scope->{type} eq 'http';

    my $req = PAGI::Request->new($scope, $receive);
    my $res = PAGI::Response->new($send);

    # Get single parameter
    my $page = $req->query('page') // '1';
    my $search = $req->query('q') // '';

    # Get all parameters as Hash::MultiValue
    my $params = $req->query_params;
    my @tags = $params->get_all('tag');  # For ?tag=foo&tag=bar

    # Get all as hashref (last value wins for duplicates)
    my $hash = $params->as_hashref;

    await $res->json({
        page => $page,
        search => $search,
        tags => \@tags,
    });
}

\&app;

Test it:

curl 'http://localhost:5000/?page=2&q=hello&tag=perl&tag=async'

Cookies

use strict;
use warnings;
use Future::AsyncAwait;
use PAGI::Request;
use PAGI::Response;

async sub app {
    my ($scope, $receive, $send) = @_;

    die "Expected http scope" unless $scope->{type} eq 'http';

    my $req = PAGI::Request->new($scope, $receive);
    my $res = PAGI::Response->new($send);

    # Get single cookie
    my $session_id = $req->cookie('session_id') // 'none';

    # Get all cookies as hashref
    my $cookies = $req->cookies;

    await $res->json({
        session_id => $session_id,
        all_cookies => $cookies,
    });
}

\&app;

Body Parsing

PAGI::Request provides async methods for parsing different body types:

use strict;
use warnings;
use Future::AsyncAwait;
use PAGI::Request;
use PAGI::Response;

async sub app {
    my ($scope, $receive, $send) = @_;

    die "Expected http scope" unless $scope->{type} eq 'http';

    my $req = PAGI::Request->new($scope, $receive);
    my $res = PAGI::Response->new($send);

    # Read entire body as raw bytes
    if ($req->path eq '/raw') {
        my $bytes = await $req->body;
        await $res->text("Received " . length($bytes) . " bytes");
    }
    # Parse JSON body
    elsif ($req->path eq '/json' && $req->is_json) {
        my $data = await $req->json;
        await $res->json({
            received => $data,
        });
    }
    # Parse URL-encoded form
    elsif ($req->path eq '/form' && $req->is_form) {
        my $form = await $req->form;
        my $name = $form->get('name');
        my $email = $form->get('email');

        await $res->json({
            name => $name,
            email => $email,
        });
    }
    else {
        await $res->text('Send POST data to /json or /form');
    }
}

\&app;

Available body methods:

  • body() - Raw bytes (entire body)

  • json() - Parse as JSON, returns hashref/arrayref

  • form() - Parse as application/x-www-form-urlencoded or multipart/form-data, returns Hash::MultiValue

Content-type predicates:

  • is_json() - True if Content-Type is application/json

  • is_form() - True if form data (urlencoded or multipart)

  • is_multipart() - True if multipart/form-data

Test JSON:

curl -X POST http://localhost:5000/json \
  -H 'Content-Type: application/json' \
  -d '{"name":"Alice","age":30}'

Test form:

curl -X POST http://localhost:5000/form \
  -d 'name=Bob&email=bob@example.com'

See examples/03-request-body/ for complete examples.

File Uploads

PAGI::Request handles multipart file uploads:

use strict;
use warnings;
use Future::AsyncAwait;
use PAGI::Request;
use PAGI::Response;

async sub app {
    my ($scope, $receive, $send) = @_;

    die "Expected http scope" unless $scope->{type} eq 'http';

    my $req = PAGI::Request->new($scope, $receive);
    my $res = PAGI::Response->new($send);

    return await $res->text('Send POST to /upload')
        unless $req->method eq 'POST' && $req->path eq '/upload';

    # Get single upload
    my $file = await $req->upload('avatar');

    if ($file && !$file->is_empty) {
        # Get upload metadata
        my $filename = $file->filename;
        my $content_type = $file->content_type;
        my $size = $file->size;

        # Save to disk
        await $file->save_to("/tmp/uploads/$filename");

        # Or get content in memory (careful with large files!)
        # my $content = await $file->content;

        await $res->json({
            uploaded => 1,
            filename => $filename,
            type => $content_type,
            size => $size,
        });
    } else {
        await $res->status(400)->json({ error => 'No file uploaded' });
    }
}

\&app;

For multiple files:

# Get all uploads for a field
my @files = await $req->upload_all('attachments');

for my $file (@files) {
    next if $file->is_empty;
    my $name = $file->filename;
    await $file->save_to("/tmp/uploads/$name");
}

Upload object methods (PAGI::Request::Upload):

  • filename() - Original filename

  • content_type() - MIME type

  • size() - File size in bytes

  • is_empty() - True if no file was uploaded

  • content() - Get content as bytes (async)

  • save_to($path) - Save to disk (async)

See examples/13-contact-form/ for a complete file upload example.

Request Stash

The stash is a per-request storage area for sharing data between middleware and handlers:

use strict;
use warnings;
use Future::AsyncAwait;
use PAGI::Request;
use PAGI::Response;

async sub app {
    my ($scope, $receive, $send) = @_;

    die "Expected http scope" unless $scope->{type} eq 'http';

    my $req = PAGI::Request->new($scope, $receive);
    my $res = PAGI::Response->new($send);

    # Store data in stash (e.g., from authentication middleware)
    $req->stash->{user} = { id => 123, name => 'Alice' };

    # Retrieve data later
    my $user = $req->stash->{user};

    await $res->json($user);
}

\&app;

The stash lives in $scope->{'pagi.stash'} and flows through middleware and subrouters.

3.3 PAGI::WebSocket - WebSocket Helper

PAGI::WebSocket simplifies WebSocket connections. Instead of manually handling websocket.accept, websocket.receive, and websocket.send events, you use high-level methods.

Creating a WebSocket

use strict;
use warnings;
use Future::AsyncAwait;
use PAGI::WebSocket;

async sub app {
    my ($scope, $receive, $send) = @_;

    die "Expected websocket scope" unless $scope->{type} eq 'websocket';

    # Create WebSocket helper
    my $ws = PAGI::WebSocket->new($scope, $receive, $send);

    # Accept the connection
    await $ws->accept;

    # Now you can send and receive messages
    await $ws->send_text("Welcome!");
}

\&app;

Echo Server

A simple echo server that receives text messages and sends them back:

use strict;
use warnings;
use Future::AsyncAwait;
use PAGI::WebSocket;

async sub app {
    my ($scope, $receive, $send) = @_;

    die "Expected websocket scope" unless $scope->{type} eq 'websocket';

    my $ws = PAGI::WebSocket->new($scope, $receive, $send);
    await $ws->accept;

    # Receive messages in a loop
    # NOTE: This loop does NOT block! The 'await' yields to the event
    # loop while waiting, allowing other connections to be processed.
    while (1) {
        my $message = await $ws->receive;

        # Check if connection closed
        last unless defined $message;

        # Echo back
        await $ws->send_text("Echo: $message");
    }
}

\&app;

See examples/04-websocket-echo/ and examples/websocket-echo-v2/ for complete examples.

Send and Receive

use strict;
use warnings;
use Future::AsyncAwait;
use PAGI::WebSocket;

async sub app {
    my ($scope, $receive, $send) = @_;

    die "Expected websocket scope" unless $scope->{type} eq 'websocket';

    my $ws = PAGI::WebSocket->new($scope, $receive, $send);
    await $ws->accept;

    # Send text message
    await $ws->send_text("Hello!");

    # Send binary message
    await $ws->send_bytes("\x00\x01\x02");

    # Receive any message (text or binary)
    my $msg = await $ws->receive;

    # Receive specifically text (waits for text message)
    my $text = await $ws->receive_text;

    # Receive specifically binary
    my $bytes = await $ws->receive_bytes;

    await $ws->close(1000, 'Goodbye');
}

\&app;

JSON Messages

use strict;
use warnings;
use Future::AsyncAwait;
use PAGI::WebSocket;

async sub app {
    my ($scope, $receive, $send) = @_;

    die "Expected websocket scope" unless $scope->{type} eq 'websocket';

    my $ws = PAGI::WebSocket->new($scope, $receive, $send);
    await $ws->accept;

    # Send JSON
    await $ws->send_json({ type => 'greeting', message => 'Hello!' });

    # Receive and parse JSON
    my $data = await $ws->receive_json;

    # $data is a hashref/arrayref
    my $type = $data->{type};
    my $payload = $data->{payload};

    await $ws->close(1000);
}

\&app;

Message Loop with Callbacks

Process each message with a callback:

use strict;
use warnings;
use Future::AsyncAwait;
use PAGI::WebSocket;

async sub app {
    my ($scope, $receive, $send) = @_;

    die "Expected websocket scope" unless $scope->{type} eq 'websocket';

    my $ws = PAGI::WebSocket->new($scope, $receive, $send);
    await $ws->accept;

    # Process each text message
    await $ws->each_text(async sub {
        my ($text) = @_;

        # Handle message
        warn "Received: $text\n";

        # Send response
        await $ws->send_text("Got: $text");
    });

    # Loop exits when connection closes
}

\&app;

Close Handling

use strict;
use warnings;
use Future::AsyncAwait;
use PAGI::WebSocket;

async sub app {
    my ($scope, $receive, $send) = @_;

    die "Expected websocket scope" unless $scope->{type} eq 'websocket';

    my $ws = PAGI::WebSocket->new($scope, $receive, $send);

    # Register close callback
    $ws->on_close(sub {
        my ($code, $reason) = @_;
        warn "Connection closed: $code - $reason\n";
    });

    # Register error callback
    $ws->on_error(sub {
        my ($error) = @_;
        warn "WebSocket error: $error\n";
    });

    await $ws->accept;

    # Your message handling code (await yields, doesn't block)
    while (1) {
        my $msg = await $ws->receive;
        last unless defined $msg;
        # ...
    }
}

\&app;

WebSocket Stash

Like PAGI::Request, WebSocket has a stash for per-connection data:

use strict;
use warnings;
use Future::AsyncAwait;
use PAGI::WebSocket;

async sub app {
    my ($scope, $receive, $send) = @_;

    die "Expected websocket scope" unless $scope->{type} eq 'websocket';

    my $ws = PAGI::WebSocket->new($scope, $receive, $send);
    await $ws->accept;

    # Store connection-specific data
    $ws->stash->{user_id} = 123;
    $ws->stash->{room} = 'lobby';

    # Use it later
    my $user_id = $ws->stash->{user_id};

    # Your message handling...
    await $ws->close(1000);
}

\&app;

See examples/websocket-chat-v2/ for a complete chat application using WebSocket stash.

3.4 PAGI::SSE - Server-Sent Events Helper

PAGI::SSE simplifies Server-Sent Events (SSE) for server-to-client streaming. SSE is perfect for live updates, dashboards, and notifications.

Creating an SSE Stream

use strict;
use warnings;
use Future::AsyncAwait;
use PAGI::SSE;

async sub app {
    my ($scope, $receive, $send) = @_;

    die "Expected sse scope" unless $scope->{type} eq 'sse';

    # Create SSE helper
    my $sse = PAGI::SSE->new($scope, $receive, $send);

    # Start the SSE response
    await $sse->start;

    # Send events
    await $sse->send_event("Hello from server!");
}

\&app;

Sending Events

use strict;
use warnings;
use Future::AsyncAwait;
use PAGI::SSE;

async sub app {
    my ($scope, $receive, $send) = @_;

    die "Expected sse scope" unless $scope->{type} eq 'sse';

    my $sse = PAGI::SSE->new($scope, $receive, $send);
    await $sse->start;

    # Simple event (data only)
    await $sse->send_event("Server time: " . scalar(localtime));

    # Event with type and ID
    await $sse->send_event(
        "User Alice logged in",
        event => 'user_login',    # Event type
        id => '42',                # Event ID
    );

    # Multiple lines of data
    await $sse->send_event(
        "Line 1\nLine 2\nLine 3",
        event => 'multiline',
    );
}

\&app;

The browser receives these events:

const eventSource = new EventSource('/events');

// Listen for all events
eventSource.onmessage = (e) => {
    console.log('Data:', e.data);
    console.log('ID:', e.lastEventId);
};

// Listen for specific event type
eventSource.addEventListener('user_login', (e) => {
    console.log('User logged in:', e.data);
});

JSON Events

Send structured data as JSON:

use strict;
use warnings;
use Future::AsyncAwait;
use PAGI::SSE;

async sub app {
    my ($scope, $receive, $send) = @_;

    die "Expected sse scope" unless $scope->{type} eq 'sse';

    my $sse = PAGI::SSE->new($scope, $receive, $send);
    await $sse->start;

    # Send JSON data
    await $sse->send_json({
        type => 'notification',
        message => 'You have a new message',
        count => 3,
    }, event => 'notification');
}

\&app;

Browser:

eventSource.addEventListener('notification', (e) => {
    const data = JSON.parse(e.data);
    console.log('Notifications:', data.count);
});

Periodic Updates

Send events on a schedule using every():

use strict;
use warnings;
use Future::AsyncAwait;
use PAGI::SSE;

async sub app {
    my ($scope, $receive, $send) = @_;

    die "Expected sse scope" unless $scope->{type} eq 'sse';

    my $sse = PAGI::SSE->new($scope, $receive, $send);
    await $sse->start;

    # Send updates every 2 seconds
    await $sse->every(2, async sub {
        my $time = scalar(localtime);
        await $sse->send_json({
            time => $time,
            random => int(rand(100)),
        }, event => 'update');
    });

    # This never returns (keeps sending until client disconnects)
}

\&app;

See examples/05-sse-broadcaster/ and examples/sse-dashboard/ for complete examples.

Keepalive

Keep connections alive by sending periodic comments:

use strict;
use warnings;
use Future::AsyncAwait;
use PAGI::SSE;

async sub app {
    my ($scope, $receive, $send) = @_;

    die "Expected sse scope" unless $scope->{type} eq 'sse';

    my $sse = PAGI::SSE->new($scope, $receive, $send);
    await $sse->start;

    # Send keepalive comment every 15 seconds
    $sse->keepalive(15);

    # Your event sending code...
    await $sse->every(5, async sub {
        await $sse->send_event("Update: " . time);
    });
}

\&app;

Keepalive prevents proxy timeouts on idle connections.

Close Detection

Detect when the client disconnects:

use strict;
use warnings;
use Future::AsyncAwait;
use PAGI::SSE;

async sub app {
    my ($scope, $receive, $send) = @_;

    die "Expected sse scope" unless $scope->{type} eq 'sse';

    my $sse = PAGI::SSE->new($scope, $receive, $send);

    # Register close callback
    $sse->on_close(sub {
        warn "Client disconnected\n";
    });

    await $sse->start;

    # Send events...
    await $sse->every(1, async sub {
        # This stops when client disconnects
        await $sse->send_event("tick: " . time);
    });
}

\&app;

PART 4: ROUTING

In Parts 1-3, we built PAGI applications using raw protocol handlers and helper classes. While these work well for simple applications, real-world applications need routing to handle multiple endpoints, path parameters, and different HTTP methods.

PAGI provides two routing approaches:

Both routers support HTTP, WebSocket, and SSE routing in a unified interface.

4.1 PAGI::App::Router - Functional Routing

PAGI::App::Router provides a lightweight functional approach to routing. You create a router instance, register routes, and convert it to a PAGI app.

Creating a Router

use strict;
use warnings;
use Future::AsyncAwait;
use PAGI::App::Router;
use PAGI::Request;
use PAGI::Response;

# Create router instance
my $router = PAGI::App::Router->new;

# Add routes
$router->get('/' => async sub {
    my ($scope, $receive, $send) = @_;
    my $res = PAGI::Response->new($send);
    await $res->text('Welcome!');
});

# Convert to PAGI app
my $app = $router->to_app;

# Return from app.pl
$app;

HTTP Method Routing

PAGI::App::Router provides methods for all standard HTTP verbs:

use strict;
use warnings;
use Future::AsyncAwait;
use PAGI::App::Router;
use PAGI::Request;
use PAGI::Response;

my $router = PAGI::App::Router->new;

# GET /users
$router->get('/users' => async sub {
    my ($scope, $receive, $send) = @_;
    my $res = PAGI::Response->new($send);
    await $res->json([
        { id => 1, name => 'Alice' },
        { id => 2, name => 'Bob' },
    ]);
});

# POST /users
$router->post('/users' => async sub {
    my ($scope, $receive, $send) = @_;
    my $req = PAGI::Request->new($scope, $receive);
    my $res = PAGI::Response->new($send);

    my $data = await $req->json;
    # ... create user ...

    await $res->status(201)->json({ id => 3, name => $data->{name} });
});

# PUT /users/:id
$router->put('/users/:id' => async sub {
    my ($scope, $receive, $send) = @_;
    # ... update user ...
});

# PATCH /users/:id
$router->patch('/users/:id' => async sub {
    my ($scope, $receive, $send) = @_;
    # ... partial update ...
});

# DELETE /users/:id
$router->delete('/users/:id' => async sub {
    my ($scope, $receive, $send) = @_;
    # ... delete user ...
});

# HEAD /users/:id
$router->head('/users/:id' => async sub {
    my ($scope, $receive, $send) = @_;
    # ... return headers only ...
});

# OPTIONS /users/:id
$router->options('/users/:id' => async sub {
    my ($scope, $receive, $send) = @_;
    # ... return allowed methods ...
});

$router->to_app;

Available methods: get(), post(), put(), patch(), delete(), head(), options().

Path Parameters

Use :param syntax for named parameters in routes:

use strict;
use warnings;
use Future::AsyncAwait;
use PAGI::App::Router;
use PAGI::Request;
use PAGI::Response;

my $router = PAGI::App::Router->new;

# Single parameter
$router->get('/users/:id' => async sub {
    my ($scope, $receive, $send) = @_;
    my $req = PAGI::Request->new($scope, $receive);
    my $res = PAGI::Response->new($send);

    # Access path parameter via $req->path_param()
    my $id = $req->path_param('id');

    await $res->json({ id => $id, name => 'User ' . $id });
});

# Multiple parameters
$router->get('/posts/:post_id/comments/:comment_id' => async sub {
    my ($scope, $receive, $send) = @_;
    my $req = PAGI::Request->new($scope, $receive);
    my $res = PAGI::Response->new($send);

    my $post_id = $req->path_param('post_id');
    my $comment_id = $req->path_param('comment_id');

    await $res->json({
        post_id => $post_id,
        comment_id => $comment_id,
    });
});

$router->to_app;

Parameters are stored in $scope->{path_params} and accessed via $req->path_param($name).

Wildcard Routes

Use *name syntax to capture multiple path segments:

use strict;
use warnings;
use Future::AsyncAwait;
use PAGI::App::Router;
use PAGI::Request;
use PAGI::Response;

my $router = PAGI::App::Router->new;

# Wildcard captures everything after /files/
$router->get('/files/*path' => async sub {
    my ($scope, $receive, $send) = @_;
    my $req = PAGI::Request->new($scope, $receive);
    my $res = PAGI::Response->new($send);

    # Access wildcard via $req->path_param()
    my $path = $req->path_param('path');

    # /files/docs/readme.txt -> path = 'docs/readme.txt'
    # /files/images/logo.png -> path = 'images/logo.png'

    await $res->text("File path: $path");
});

$router->to_app;

Wildcards match one or more path segments, while named parameters (:param) match a single segment.

WebSocket Routes

Route WebSocket connections by path:

use strict;
use warnings;
use Future::AsyncAwait;
use PAGI::App::Router;
use PAGI::WebSocket;

my $router = PAGI::App::Router->new;

# WebSocket echo server
$router->websocket('/ws' => async sub {
    my ($scope, $receive, $send) = @_;
    my $ws = PAGI::WebSocket->new($scope, $receive, $send);

    await $ws->accept;

    await $ws->each_text(async sub {
        my ($text) = @_;
        await $ws->send_text("Echo: $text");
    });
});

# WebSocket with path parameters
$router->websocket('/ws/chat/:room' => async sub {
    my ($scope, $receive, $send) = @_;
    my $ws = PAGI::WebSocket->new($scope, $receive, $send);

    # Access room parameter
    my $room = $ws->stash->{'pagi.params'}{room};

    await $ws->accept;
    await $ws->send_text("Joined room: $room");

    # ... chat logic ...
});

$router->to_app;

SSE Routes

Route Server-Sent Events by path:

use strict;
use warnings;
use Future::AsyncAwait;
use PAGI::App::Router;
use PAGI::SSE;

my $router = PAGI::App::Router->new;

# SSE clock
$router->sse('/events' => async sub {
    my ($scope, $receive, $send) = @_;
    my $sse = PAGI::SSE->new($scope, $receive, $send);

    await $sse->start;

    await $sse->every(1, async sub {
        await $sse->send_event("Time: " . time);
    });
});

# SSE with path parameters
$router->sse('/events/:channel' => async sub {
    my ($scope, $receive, $send) = @_;
    my $sse = PAGI::SSE->new($scope, $receive, $send);

    # Access channel parameter
    my $channel = $sse->stash->{'pagi.params'}{channel};

    await $sse->start;
    await $sse->send_event("Subscribed to: $channel");

    # ... streaming logic ...
});

$router->to_app;

Route-Level Middleware

Apply middleware to specific routes by passing an arrayref:

use strict;
use warnings;
use Future::AsyncAwait;
use PAGI::App::Router;
use PAGI::Request;
use PAGI::Response;

my $router = PAGI::App::Router->new;

# Define middleware
my $auth_mw = async sub {
    my ($scope, $receive, $send, $next) = @_;

    # Check authorization
    my $token = '';
    for my $h (@{$scope->{headers}}) {
        if (lc($h->[0]) eq 'authorization') {
            $token = $h->[1];
            last;
        }
    }

    unless ($token eq 'Bearer secret123') {
        my $res = PAGI::Response->new($send);
        await $res->status(401)->json({ error => 'Unauthorized' });
        return;  # Don't call $next->()
    }

    # Authorized - continue to handler
    await $next->();
};

# Apply middleware to specific routes
$router->get('/admin' => [$auth_mw] => async sub {
    my ($scope, $receive, $send) = @_;
    my $res = PAGI::Response->new($send);
    await $res->text('Admin panel');
});

# Multiple middleware
my $log_mw = async sub {
    my ($scope, $receive, $send, $next) = @_;
    warn "Request: $scope->{method} $scope->{path}\n";
    await $next->();
};

$router->post('/admin/users' => [$log_mw, $auth_mw] => async sub {
    my ($scope, $receive, $send) = @_;
    # ... handler ...
});

$router->to_app;

Middleware are executed in order ($log_mw runs before $auth_mw in the example above).

Mounting Sub-Applications

Use mount() to attach sub-applications or routers at a prefix:

use strict;
use warnings;
use Future::AsyncAwait;
use PAGI::App::Router;
use PAGI::Request;
use PAGI::Response;

# Create API router
my $api_router = PAGI::App::Router->new;
$api_router->get('/users' => async sub {
    my ($scope, $receive, $send) = @_;
    my $res = PAGI::Response->new($send);
    await $res->json([{ id => 1, name => 'Alice' }]);
});

# Create main router
my $main_router = PAGI::App::Router->new;

# Mount API router at /api prefix
# /api/users will be handled by $api_router
$main_router->mount('/api' => $api_router->to_app);

# Mount with middleware (applies to all sub-routes)
my $cors_mw = async sub {
    my ($scope, $receive, $send, $next) = @_;
    # Add CORS headers...
    await $next->();
};

$main_router->mount('/api' => [$cors_mw] => $api_router->to_app);

$main_router->to_app;

See examples/endpoint-demo/ for a complete routing example using PAGI::App::Router.

4.2 PAGI::Endpoint::Router - Class-Based Routing

PAGI::Endpoint::Router provides an object-oriented approach to routing. You define a class that extends PAGI::Endpoint::Router, implement a routes() method, and use method names as handlers.

Basic Router Class

package MyApp;
use parent 'PAGI::Endpoint::Router';
use strict;
use warnings;
use Future::AsyncAwait;

# Define routes
sub routes {
    my ($self, $r) = @_;

    $r->get('/' => 'home');
    $r->get('/about' => 'about');
    $r->post('/contact' => 'contact');
}

# Handler methods receive ($self, $req, $res) for HTTP
async sub home {
    my ($self, $req, $res) = @_;
    await $res->html('<h1>Welcome!</h1>');
}

async sub about {
    my ($self, $req, $res) = @_;
    await $res->html('<h1>About Us</h1>');
}

async sub contact {
    my ($self, $req, $res) = @_;
    my $data = await $req->json;
    # ... process contact form ...
    await $res->json({ success => 1 });
}

1;

# In app.pl:
# use MyApp;
# MyApp->to_app;

Handler methods are called with:

  • $self - Router instance (access $self->state for per-instance data)

  • $req - PAGI::Request object (already constructed)

  • $res - PAGI::Response object (already constructed)

Path Parameters

Access path parameters via $req->param():

package MyApp;
use parent 'PAGI::Endpoint::Router';
use strict;
use warnings;
use Future::AsyncAwait;

sub routes {
    my ($self, $r) = @_;

    $r->get('/users/:id' => 'get_user');
    $r->get('/posts/:post_id/comments/:comment_id' => 'get_comment');
}

async sub get_user {
    my ($self, $req, $res) = @_;

    my $id = $req->path_param('id');

    await $res->json({
        id => $id,
        name => "User $id",
    });
}

async sub get_comment {
    my ($self, $req, $res) = @_;

    my $post_id = $req->path_param('post_id');
    my $comment_id = $req->path_param('comment_id');

    await $res->json({
        post_id => $post_id,
        comment_id => $comment_id,
    });
}

1;

WebSocket Handlers

WebSocket handlers receive ($self, $ws):

package MyApp;
use parent 'PAGI::Endpoint::Router';
use strict;
use warnings;
use Future::AsyncAwait;

sub routes {
    my ($self, $r) = @_;

    $r->get('/' => 'home');
    $r->websocket('/ws' => 'handle_ws');
    $r->websocket('/ws/chat/:room' => 'handle_chat');
}

async sub home {
    my ($self, $req, $res) = @_;
    await $res->html('<h1>Home</h1>');
}

# WebSocket handler receives ($self, $ws)
async sub handle_ws {
    my ($self, $ws) = @_;

    await $ws->accept;

    await $ws->each_text(async sub {
        my ($text) = @_;
        await $ws->send_text("Echo: $text");
    });
}

# WebSocket with path parameters
async sub handle_chat {
    my ($self, $ws) = @_;

    # Access path parameter via $ws->path_param()
    my $room = $ws->path_param('room');

    await $ws->accept;
    await $ws->send_text("Joined room: $room");

    # ... chat logic ...
}

1;

SSE Handlers

SSE handlers receive ($self, $sse):

package MyApp;
use parent 'PAGI::Endpoint::Router';
use strict;
use warnings;
use Future::AsyncAwait;

sub routes {
    my ($self, $r) = @_;

    $r->get('/' => 'home');
    $r->sse('/events' => 'handle_events');
    $r->sse('/events/:channel' => 'handle_channel');
}

async sub home {
    my ($self, $req, $res) = @_;
    await $res->html('<h1>Home</h1>');
}

# SSE handler receives ($self, $sse)
async sub handle_events {
    my ($self, $sse) = @_;

    await $sse->start;

    await $sse->every(1, async sub {
        await $sse->send_event("Time: " . time);
    });
}

# SSE with path parameters
async sub handle_channel {
    my ($self, $sse) = @_;

    # Access path parameter
    my $channel = $sse->scope->{'pagi.params'}{channel};

    await $sse->start;
    await $sse->send_event("Channel: $channel");

    # ... streaming logic ...
}

1;

Lifecycle Hooks

Override on_startup and on_shutdown for application lifecycle management:

package MyApp;
use parent 'PAGI::Endpoint::Router';
use strict;
use warnings;
use Future::AsyncAwait;

# Called when application starts
async sub on_startup {
    my ($self) = @_;

    # Initialize resources
    $self->state->{db} = DBI->connect('dbi:SQLite:dbname=app.db');
    $self->state->{started_at} = time();

    warn "Application started\n";
}

# Called when application shuts down
async sub on_shutdown {
    my ($self) = @_;

    # Clean up resources
    $self->state->{db}->disconnect if $self->state->{db};

    warn "Application shut down\n";
}

sub routes {
    my ($self, $r) = @_;

    $r->get('/' => 'home');
}

async sub home {
    my ($self, $req, $res) = @_;

    # Access state initialized in on_startup
    my $db = $self->state->{db};
    my $uptime = time() - $self->state->{started_at};

    await $res->json({
        uptime => $uptime,
        database => defined($db) ? 'connected' : 'disconnected',
    });
}

1;

Note: $self->state is per-worker in multi-worker mode. For shared state, use an external store (Redis, database, etc.).

Per-Instance State

Use $self->state to store data that persists across requests:

package MyApp;
use parent 'PAGI::Endpoint::Router';
use strict;
use warnings;
use Future::AsyncAwait;

async sub on_startup {
    my ($self) = @_;

    # Initialize per-instance state
    $self->state->{request_count} = 0;
    $self->state->{config} = {
        app_name => 'MyApp',
        version => '1.0',
    };
}

sub routes {
    my ($self, $r) = @_;
    $r->get('/' => 'home');
    $r->get('/stats' => 'stats');
}

async sub home {
    my ($self, $req, $res) = @_;

    # Increment counter
    $self->state->{request_count}++;

    await $res->html('<h1>Welcome!</h1>');
}

async sub stats {
    my ($self, $req, $res) = @_;

    await $res->json({
        config => $self->state->{config},
        requests => $self->state->{request_count},
    });
}

1;

State is also available via $req->state, $ws->state, and $sse->state in handlers.

Route-Level Middleware with Method Names

Define middleware methods and reference them by name:

package MyApp;
use parent 'PAGI::Endpoint::Router';
use strict;
use warnings;
use Future::AsyncAwait;

sub routes {
    my ($self, $r) = @_;

    $r->get('/' => 'home');

    # Apply middleware by method name
    $r->get('/admin' => ['require_auth'] => 'admin_page');

    # Multiple middleware
    $r->post('/admin/users' => ['log_request', 'require_auth'] => 'create_user');
}

# Middleware method signature: ($self, $req, $res, $next)
async sub require_auth {
    my ($self, $req, $res, $next) = @_;

    my $token = $req->header('authorization');

    unless ($token && $token eq 'Bearer secret123') {
        await $res->status(401)->json({ error => 'Unauthorized' });
        return;  # Don't call $next->()
    }

    # Authorized - continue to handler
    await $next->();
}

async sub log_request {
    my ($self, $req, $res, $next) = @_;

    warn "Request: " . $req->method . " " . $req->path . "\n";

    await $next->();
}

async sub home {
    my ($self, $req, $res) = @_;
    await $res->html('<h1>Home</h1>');
}

async sub admin_page {
    my ($self, $req, $res) = @_;
    await $res->html('<h1>Admin Panel</h1>');
}

async sub create_user {
    my ($self, $req, $res) = @_;
    my $data = await $req->json;
    # ... create user ...
    await $res->status(201)->json({ success => 1 });
}

1;

Middleware methods receive ($self, $req, $res, $next) and must call await $next->() to continue to the handler.

Mounting Sub-Routers

Create modular applications by mounting sub-routers:

# lib/MyApp/API.pm
package MyApp::API;
use parent 'PAGI::Endpoint::Router';
use strict;
use warnings;
use Future::AsyncAwait;

sub routes {
    my ($self, $r) = @_;

    $r->get('/users' => 'list_users');
    $r->get('/users/:id' => 'get_user');
    $r->post('/users' => 'create_user');
}

async sub list_users {
    my ($self, $req, $res) = @_;
    await $res->json([
        { id => 1, name => 'Alice' },
        { id => 2, name => 'Bob' },
    ]);
}

async sub get_user {
    my ($self, $req, $res) = @_;
    my $id = $req->path_param('id');
    await $res->json({ id => $id, name => "User $id" });
}

async sub create_user {
    my ($self, $req, $res) = @_;
    my $data = await $req->json;
    await $res->status(201)->json({ id => 3, name => $data->{name} });
}

1;

# lib/MyApp.pm
package MyApp;
use parent 'PAGI::Endpoint::Router';
use strict;
use warnings;
use Future::AsyncAwait;
use MyApp::API;

sub routes {
    my ($self, $r) = @_;

    $r->get('/' => 'home');

    # Mount API router at /api prefix
    $r->mount('/api' => MyApp::API->to_app);
}

async sub home {
    my ($self, $req, $res) = @_;
    await $res->html('<h1>Home</h1>');
}

1;

# app.pl
# use MyApp;
# MyApp->to_app;

Now /api/users and /api/users/:id are handled by MyApp::API.

See examples/endpoint-router-demo/ for a comprehensive class-based routing example.

Complete Example

Here's a complete working example demonstrating all features:

package MyApp;
use parent 'PAGI::Endpoint::Router';
use strict;
use warnings;
use Future::AsyncAwait;

async sub on_startup {
    my ($self) = @_;
    $self->state->{started} = time();
    $self->state->{users} = [
        { id => 1, name => 'Alice' },
        { id => 2, name => 'Bob' },
    ];
}

sub routes {
    my ($self, $r) = @_;

    # HTTP routes
    $r->get('/' => 'home');
    $r->get('/users' => 'list_users');
    $r->get('/users/:id' => 'get_user');
    $r->post('/users' => 'create_user');

    # Protected route
    $r->get('/admin' => ['require_auth'] => 'admin');

    # WebSocket
    $r->websocket('/ws' => 'ws_echo');

    # SSE
    $r->sse('/events' => 'events');
}

async sub require_auth {
    my ($self, $req, $res, $next) = @_;
    my $token = $req->header('authorization');
    return await $res->status(401)->json({ error => 'Unauthorized' })
        unless $token && $token eq 'Bearer secret';
    await $next->();
}

async sub home {
    my ($self, $req, $res) = @_;
    await $res->html('<h1>Welcome to MyApp</h1>');
}

async sub list_users {
    my ($self, $req, $res) = @_;
    await $res->json($self->state->{users});
}

async sub get_user {
    my ($self, $req, $res) = @_;
    my $id = $req->path_param('id');
    my ($user) = grep { $_->{id} == $id } @{$self->state->{users}};
    return await $res->status(404)->json({ error => 'Not found' })
        unless $user;
    await $res->json($user);
}

async sub create_user {
    my ($self, $req, $res) = @_;
    my $data = await $req->json;
    my $user = { id => scalar(@{$self->state->{users}}) + 1, name => $data->{name} };
    push @{$self->state->{users}}, $user;
    await $res->status(201)->json($user);
}

async sub admin {
    my ($self, $req, $res) = @_;
    await $res->json({ message => 'Admin access granted' });
}

async sub ws_echo {
    my ($self, $ws) = @_;
    await $ws->accept;
    await $ws->each_text(async sub {
        my ($text) = @_;
        await $ws->send_text("Echo: $text");
    });
}

async sub events {
    my ($self, $sse) = @_;
    await $sse->start;
    await $sse->every(1, async sub {
        await $sse->send_json({ time => time });
    });
}

1;

Run it:

# app.pl
use MyApp;
MyApp->to_app;

# Terminal:
pagi-server app.pl --port 5000

Test it:

# HTTP
curl http://localhost:5000/
curl http://localhost:5000/users
curl http://localhost:5000/users/1
curl -X POST http://localhost:5000/users -H 'Content-Type: application/json' -d '{"name":"Charlie"}'

# Protected route
curl http://localhost:5000/admin
curl -H 'Authorization: Bearer secret' http://localhost:5000/admin

# WebSocket (JavaScript)
const ws = new WebSocket('ws://localhost:5000/ws');
ws.onmessage = (e) => console.log(e.data);
ws.send('Hello!');

# SSE (JavaScript)
const sse = new EventSource('http://localhost:5000/events');
sse.onmessage = (e) => console.log(JSON.parse(e.data));

PART 5: MIDDLEWARE

Middleware provides reusable functionality that wraps your application handlers. PAGI includes 30+ middleware components for logging, security, compression, sessions, and more.

5.1 Using Middleware with Builder

The PAGI::Middleware::Builder DSL lets you compose middleware into your application.

Global Middleware

Apply middleware to all routes using the builder function:

use PAGI::Middleware::Builder;
use PAGI::App::Router;

my $router = PAGI::App::Router->new;

$router->get('/' => async sub {
    my ($scope, $receive, $send) = @_;
    await $send->({
        type    => 'http.response.start',
        status  => 200,
        headers => [['content-type', 'text/plain']],
    });
    await $send->({
        type => 'http.response.body',
        body => 'Hello World',
    });
});

my $inner_app = $router->to_app;

# Wrap with middleware using builder
my $app = builder {
    enable 'AccessLog', format => 'combined';
    enable 'CORS', origins => '*';
    enable 'GZip', min_size => 1024;
    $inner_app;
};

Middleware executes in order: AccessLog -> CORS -> GZip -> app.

Multiple Middleware Example

my $app = builder {
    enable 'RequestId';                      # Add X-Request-Id header
    enable 'AccessLog', format => 'tiny';    # Log requests
    enable 'Runtime';                        # Add X-Runtime header
    enable 'CORS', origins => '*';           # Enable CORS
    enable 'SecurityHeaders';                # Add security headers
    enable 'GZip', min_size => 1024;         # Compress responses
    enable 'ErrorHandler', mode => 'development';  # Pretty errors
    $router->to_app;
};

5.2 Route-Level Middleware

Apply middleware to specific routes only:

use PAGI::App::Router;
use PAGI::Middleware::RateLimit;

my $router = PAGI::App::Router->new;

my $rate_limit = PAGI::Middleware::RateLimit->new(
    rate   => 10,
    period => 60,
);

# Apply rate limiting only to this route
$router->get('/api/heavy' => [$rate_limit] => async sub {
    my ($scope, $receive, $send) = @_;
    await $send->({
        type    => 'http.response.start',
        status  => 200,
        headers => [['content-type', 'text/plain']],
    });
    await $send->({
        type => 'http.response.body',
        body => 'Heavy resource',
    });
});

my $app = $router->to_app;

You can stack multiple middleware per route:

use PAGI::Middleware::CSRF;
use PAGI::Middleware::Session;

my $session = PAGI::Middleware::Session->new(secret => 'secret-key');
my $csrf = PAGI::Middleware::CSRF->new;

$router->post('/admin/delete' => [$session, $csrf] => async sub {
    # Handler has session and CSRF protection
    ...
});

5.3 Essential Middleware Reference

Logging & Debugging

  • PAGI::Middleware::AccessLog

    Log HTTP requests in Apache-style formats:

    enable 'AccessLog', format => 'combined';  # Apache combined log
    enable 'AccessLog', format => 'common';    # Apache common log
    enable 'AccessLog', format => 'tiny';      # Minimal format
  • PAGI::Middleware::RequestId

    Add unique request ID to each request:

    enable 'RequestId';  # Adds X-Request-Id header

    Access via $scope->{'pagi.request_id'}.

  • PAGI::Middleware::Runtime

    Add response time header:

    enable 'Runtime';  # Adds X-Runtime header in seconds

Security

  • PAGI::Middleware::CORS

    Enable Cross-Origin Resource Sharing:

    enable 'CORS',
        origins     => '*',                    # or ['https://example.com']
        methods     => ['GET', 'POST'],
        headers     => ['Content-Type'],
        credentials => 1,
        max_age     => 86400;
  • PAGI::Middleware::SecurityHeaders

    Add security-related HTTP headers:

    enable 'SecurityHeaders';  # Adds CSP, X-Frame-Options, etc.

    Includes:

    X-Frame-Options: DENY
    X-Content-Type-Options: nosniff
    X-XSS-Protection: 1; mode=block
    Referrer-Policy: strict-origin-when-cross-origin
  • PAGI::Middleware::CSRF

    Protect against Cross-Site Request Forgery:

    enable 'CSRF', cookie_name => '_csrf_token';

    Validates CSRF tokens on POST/PUT/PATCH/DELETE requests.

Sessions & Cookies

  • PAGI::Middleware::Cookie

    Parse request cookies:

    enable 'Cookie';

    Access via $scope->{'pagi.cookies'}.

  • PAGI::Middleware::Session

    Server-side sessions with signed cookies:

    enable 'Session',
        secret      => 'your-secret-key',
        cookie_name => 'session',
        max_age     => 86400;

    Access via $scope->{'pagi.session'}.

Performance

  • PAGI::Middleware::GZip

    Compress responses with gzip:

    enable 'GZip',
        min_size => 1024,               # Only compress if > 1KB
        types    => ['text/', 'application/json'];
  • PAGI::Middleware::ETag

    Automatic ETag generation and validation:

    enable 'ETag';

    Returns 304 Not Modified for matching If-None-Match headers.

  • PAGI::Middleware::RateLimit

    Rate limiting with configurable strategies:

    enable 'RateLimit',
        rate   => 100,                  # requests
        period => 60,                   # seconds
        by     => sub {                 # key generator
            my ($scope) = @_;
            return $scope->{client}[0];  # by IP
        };

Error Handling

  • PAGI::Middleware::ErrorHandler

    Catch and format errors:

    enable 'ErrorHandler', mode => 'development';  # Pretty HTML errors
    enable 'ErrorHandler', mode => 'production';   # JSON errors

5.4 Writing Custom Middleware

Create reusable middleware by extending PAGI::Middleware:

package MyApp::Middleware::Auth;
use parent 'PAGI::Middleware';
use Future::AsyncAwait;

sub new {
    my ($class, %args) = @_;
    return bless { secret => $args{secret} }, $class;
}

sub wrap {
    my ($self, $app) = @_;

    return async sub {
        my ($scope, $receive, $send) = @_;

        # Check auth header
        my $auth = $self->get_header($scope, 'Authorization');

        if ($auth && $auth =~ /^Bearer (.+)/) {
            my $token = $1;
            my $user = verify_token($token, $self->{secret});
            $scope->{'pagi.stash'}{user} = $user if $user;
        }

        # Call next middleware/app
        await $app->($scope, $receive, $send);
    };
}

sub get_header {
    my ($self, $scope, $name) = @_;
    $name = lc $name;
    for my $header (@{$scope->{headers} || []}) {
        return $header->[1] if lc($header->[0]) eq $name;
    }
    return undef;
}

sub verify_token {
    my ($token, $secret) = @_;
    # Token verification logic here
    return { id => 1, username => 'alice' };
}

1;

Use it with the builder:

my $app = builder {
    enable 'MyApp::Middleware::Auth', secret => 'secret-key';
    $router->to_app;
};

Or per-route:

my $auth = MyApp::Middleware::Auth->new(secret => 'secret-key');
$router->get('/admin' => [$auth] => $handler);

Middleware Patterns

  • Modify scope

    Add data for downstream handlers:

    sub wrap {
        my ($self, $app) = @_;
        return async sub {
            my ($scope, $receive, $send) = @_;
            $scope->{'pagi.stash'}{custom_data} = 'value';
            await $app->($scope, $receive, $send);
        };
    }
  • Intercept responses

    Modify outgoing events:

    sub wrap {
        my ($self, $app) = @_;
        return async sub {
            my ($scope, $receive, $send) = @_;
    
            my $wrapped_send = $self->intercept_send($send, async sub {
                my ($event, $original_send) = @_;
                if ($event->{type} eq 'http.response.start') {
                    push @{$event->{headers}}, ['x-custom', 'value'];
                }
                await $original_send->($event);
            });
    
            await $app->($scope, $receive, $wrapped_send);
        };
    }
  • Short-circuit requests

    Return early without calling inner app:

    sub wrap {
        my ($self, $app) = @_;
        return async sub {
            my ($scope, $receive, $send) = @_;
    
            # Check condition
            unless ($self->is_authorized($scope)) {
                await $send->({
                    type    => 'http.response.start',
                    status  => 403,
                    headers => [['content-type', 'text/plain']],
                });
                await $send->({
                    type => 'http.response.body',
                    body => 'Forbidden',
                });
                return;  # Don't call $app
            }
    
            await $app->($scope, $receive, $send);
        };
    }

PART 6: BUILT-IN APPLICATIONS

PAGI ships with several ready-to-use applications for common tasks. These can be mounted with routers or used standalone.

6.1 PAGI::App::File - Static Files

PAGI::App::File serves static files with security, caching, and streaming.

use strict;
use warnings;
use Future::AsyncAwait;
use PAGI::App::Router;
use PAGI::App::File;

my $router = PAGI::App::Router->new;

# Serve static files from ./public directory
my $static = PAGI::App::File->new(root => './public');
$router->mount('/static' => $static->to_app);

# Your API routes
$router->get('/api/hello' => async sub {
    my ($scope, $receive, $send) = @_;
    require PAGI::Response;
    my $res = PAGI::Response->new($send);
    await $res->json({ message => 'Hello!' });
});

$router->to_app;

Features:

  • Efficient streaming (large files don't consume memory)

  • ETag caching with 304 Not Modified support

  • HTTP Range requests for resume support

  • Automatic MIME type detection

  • Security: path traversal protection, symlink escape detection

  • Index files (index.html) for directories

Configuration options:

my $static = PAGI::App::File->new(
    root         => './public',               # Root directory
    default_type => 'application/octet-stream', # Fallback MIME type
    index        => ['index.html', 'index.htm'], # Index file names
);

See examples/sse-dashboard/ for a real-world example using PAGI::App::File.

6.2 PAGI::App::Healthcheck - Health Endpoints

PAGI::App::Healthcheck creates health check endpoints for load balancers and monitoring.

use strict;
use warnings;
use PAGI::App::Router;
use PAGI::App::Healthcheck;

my $router = PAGI::App::Router->new;

# Simple health endpoint
my $health = PAGI::App::Healthcheck->new(
    version => '1.0.0',
    checks => {
        database => sub {
            # Return true if healthy, false if not
            return $db && $db->ping;
        },
        cache => sub {
            return $redis && $redis->ping;
        },
    },
);

$router->mount('/health' => $health->to_app);

$router->to_app;

Response format (JSON):

# 200 OK when all checks pass:
{
    "status": "ok",
    "timestamp": 1703318400,
    "uptime": 86400,
    "version": "1.0.0",
    "checks": {
        "database": { "status": "ok" },
        "cache": { "status": "ok" }
    }
}

# 503 Service Unavailable when any check fails:
{
    "status": "error",
    "checks": {
        "database": { "status": "error", "message": "Connection refused" }
    }
}

6.3 PAGI::App::URLMap - Mount Applications

PAGI::App::URLMap routes requests to different apps based on URL prefix.

use strict;
use warnings;
use PAGI::App::URLMap;
use PAGI::App::File;

# Create applications
my $api_app = require './api.pl';
my $admin_app = require './admin.pl';
my $static = PAGI::App::File->new(root => './public')->to_app;

# Mount at different paths
my $urlmap = PAGI::App::URLMap->new;
$urlmap->mount('/api' => $api_app);
$urlmap->mount('/admin' => $admin_app);
$urlmap->mount('/static' => $static);

# Default handler for unmatched paths
$urlmap->mount('/' => sub {
    my ($scope, $receive, $send) = @_;
    # ...serve homepage
});

$urlmap->to_app;

Key features:

  • Path prefixes are stripped before passing to mounted apps

  • script_name is set so apps know their mount point

  • Longest prefix matches first

Alternative syntax with hashref:

$urlmap->map({
    '/api'    => $api_app,
    '/admin'  => $admin_app,
    '/'       => $main_app,
});

6.4 PAGI::App::Cascade - Try Apps in Sequence

PAGI::App::Cascade tries apps in order until one returns a non-404 response.

use strict;
use warnings;
use PAGI::App::Cascade;
use PAGI::App::File;

my $static = PAGI::App::File->new(root => './public')->to_app;
my $api = require './api.pl';
my $fallback = require './fallback.pl';

# Try static files first, then API, then fallback
my $app = PAGI::App::Cascade->new(
    apps => [$static, $api, $fallback],
    catch => [404, 405],  # Status codes to try next app on
);

$app->to_app;

Use cases:

  • Serve static files if they exist, otherwise route to dynamic handler

  • Try multiple microservices until one handles the request

  • Implement feature flags (new handler with old handler fallback)

6.5 PAGI::App::Proxy - Reverse Proxy

PAGI::App::Proxy forwards requests to backend servers.

use strict;
use warnings;
use PAGI::App::Router;
use PAGI::App::Proxy;

my $router = PAGI::App::Router->new;

# Proxy all /api requests to backend
my $proxy = PAGI::App::Proxy->new(
    backend => 'http://localhost:8080',
    timeout => 30,
    headers => {
        'X-Request-ID' => 'generated-id',
    },
);

$router->mount('/api' => $proxy->to_app);

$router->to_app;

The proxy automatically adds:

  • X-Forwarded-For (client IP)

  • X-Forwarded-Proto (http/https)

6.6 PAGI::App::WrapPSGI - Use Existing PSGI Apps

PAGI::App::WrapPSGI lets you use existing PSGI applications with PAGI.

use strict;
use warnings;
use PAGI::App::Router;
use PAGI::App::WrapPSGI;

# Your existing PSGI app (or from a framework like Dancer2)
my $legacy_psgi = sub {
    my ($env) = @_;
    return [200, ['Content-Type' => 'text/plain'], ['Hello from PSGI!']];
};

# Wrap it for PAGI
my $wrapped = PAGI::App::WrapPSGI->new(
    psgi_app => $legacy_psgi,
);

my $router = PAGI::App::Router->new;
$router->mount('/legacy' => $wrapped->to_app);

$router->to_app;

This is useful for:

  • Migrating from PSGI to PAGI incrementally

  • Using existing PSGI middleware or frameworks

  • Running legacy apps alongside new PAGI endpoints

PART 7: REAL-WORLD PATTERNS

7.1 Background Tasks

Web requests should respond quickly. Long-running work should be done in the background.

Pattern 1: Fire-and-Forget Async I/O

For non-blocking operations (async HTTP calls, async database queries), use ->retain() to prevent "lost future" warnings:

use strict;
use warnings;
use Future::AsyncAwait;
use PAGI::App::Router;
use PAGI::Response;

my $router = PAGI::App::Router->new;

# Simulated async email service
async sub send_welcome_email {
    my ($email) = @_;
    # In reality: await $http_client->post_async($email_api, ...);
    await IO::Async::Loop->new->delay_future(after => 2);
    warn "Email sent to $email\n";
}

$router->post('/signup' => async sub {
    my ($scope, $receive, $send) = @_;
    my $res = PAGI::Response->new($send);

    # Respond immediately
    await $res->status(201)->json({ status => 'created' });

    # Fire-and-forget: retain() prevents GC and warnings
    send_welcome_email('user@example.com')->retain();
});

$router->to_app;

Pattern 2: CPU-Bound Work in Subprocess

For blocking or CPU-intensive work, use IO::Async::Function to run in a child process:

use strict;
use warnings;
use Future::AsyncAwait;
use IO::Async::Function;
use IO::Async::Loop;
use PAGI::App::Router;
use PAGI::Response;

my $loop = IO::Async::Loop->new;

# Worker runs code in subprocess
my $pdf_worker = IO::Async::Function->new(
    code => sub {
        my ($data) = @_;
        # This blocking code runs in a CHILD PROCESS
        # It doesn't block the main event loop
        sleep 5;  # Simulate heavy work
        return "PDF generated for $data";
    },
);
$loop->add($pdf_worker);

my $router = PAGI::App::Router->new;

$router->post('/generate-pdf' => async sub {
    my ($scope, $receive, $send) = @_;
    my $res = PAGI::Response->new($send);

    # Option A: Wait for result (blocks this request only)
    my $result = await $pdf_worker->call(args => ['report']);
    await $res->json({ result => $result });

    # Option B: Fire-and-forget (respond immediately)
    # my $f = $pdf_worker->call(args => ['report']);
    # $f->on_done(sub { warn "PDF ready: $_[0]\n" });
    # $f->on_fail(sub { warn "PDF failed: $_[0]\n" });
    # $f->retain();
    # await $res->json({ status => 'processing' });
});

$router->to_app;

Warning: IO::Async::Function forks child processes. If running with --workers N, each server worker forks its own Function workers. With 4 server workers and 2 Function workers each, you have 12 processes. For high-volume CPU work, consider a dedicated job queue instead.

See examples/background-tasks/ for complete working examples.

7.2 Form Handling

URL-Encoded Forms

use strict;
use warnings;
use Future::AsyncAwait;
use PAGI::Request;
use PAGI::Response;

async sub app {
    my ($scope, $receive, $send) = @_;

    my $req = PAGI::Request->new($scope, $receive);
    my $res = PAGI::Response->new($send);

    if ($req->method eq 'POST') {
        my $form = await $req->form;
        my $username = $form->{username};
        my $password = $form->{password};

        # Validate and process...
        await $res->redirect('/dashboard');
    }
    else {
        await $res->html(<<'HTML');
<form method="POST">
    <input name="username" placeholder="Username">
    <input name="password" type="password" placeholder="Password">
    <button type="submit">Login</button>
</form>
HTML
    }
}

\&app;

File Uploads (Multipart)

use strict;
use warnings;
use Future::AsyncAwait;
use PAGI::Request;
use PAGI::Response;

async sub app {
    my ($scope, $receive, $send) = @_;

    my $req = PAGI::Request->new($scope, $receive);
    my $res = PAGI::Response->new($send);

    if ($req->method eq 'POST') {
        my $uploads = await $req->uploads;

        for my $upload (@$uploads) {
            my $filename = $upload->filename;
            my $size = $upload->size;
            my $type = $upload->content_type;

            # Save to disk
            $upload->save_to('/uploads/' . $filename);

            # Or read content
            # my $content = $upload->content;
        }

        await $res->json({ uploaded => scalar(@$uploads) });
    }
    else {
        await $res->html(<<'HTML');
<form method="POST" enctype="multipart/form-data">
    <input type="file" name="file" multiple>
    <button type="submit">Upload</button>
</form>
HTML
    }
}

\&app;

See examples/10-forms-and-uploads/ for complete examples.

7.3 Sessions

Use PAGI::Middleware::Session for session management:

use strict;
use warnings;
use Future::AsyncAwait;
use PAGI::App::Router;
use PAGI::Middleware::Session;
use PAGI::Request;
use PAGI::Response;

my $router = PAGI::App::Router->new;

# Add session middleware
$router->use(PAGI::Middleware::Session->new(
    secret => 'your-secret-key-here',
    cookie_name => 'session_id',
    expires => 86400,  # 1 day
));

$router->get('/login' => async sub {
    my ($scope, $receive, $send) = @_;
    my $req = PAGI::Request->new($scope, $receive);
    my $res = PAGI::Response->new($send);

    # Set session data
    $req->session->{user_id} = 123;
    $req->session->{username} = 'alice';

    await $res->text('Logged in!');
});

$router->get('/profile' => async sub {
    my ($scope, $receive, $send) = @_;
    my $req = PAGI::Request->new($scope, $receive);
    my $res = PAGI::Response->new($send);

    # Read session data
    my $username = $req->session->{username} // 'Guest';

    await $res->text("Hello, $username!");
});

$router->get('/logout' => async sub {
    my ($scope, $receive, $send) = @_;
    my $req = PAGI::Request->new($scope, $receive);
    my $res = PAGI::Response->new($send);

    # Clear session
    $req->session({});

    await $res->text('Logged out!');
});

$router->to_app;

7.4 Authentication

Basic Auth

use strict;
use warnings;
use Future::AsyncAwait;
use PAGI::App::Router;
use PAGI::Middleware::BasicAuth;
use PAGI::Response;

my $router = PAGI::App::Router->new;

# Protect routes with Basic Auth
$router->use(PAGI::Middleware::BasicAuth->new(
    realm => 'Admin Area',
    authenticator => sub {
        my ($username, $password) = @_;
        return $username eq 'admin' && $password eq 'secret';
    },
));

$router->get('/' => async sub {
    my ($scope, $receive, $send) = @_;
    my $res = PAGI::Response->new($send);
    await $res->text('Protected content');
});

$router->to_app;

Bearer Token (JWT)

use strict;
use warnings;
use Future::AsyncAwait;
use PAGI::App::Router;
use PAGI::Middleware::Auth::Bearer;
use PAGI::Request;
use PAGI::Response;

my $router = PAGI::App::Router->new;

# Verify JWT tokens
$router->use(PAGI::Middleware::Auth::Bearer->new(
    validate => sub {
        my ($token) = @_;
        # Return user data if valid, undef if not
        return decode_jwt($token);  # Your JWT decode logic
    },
));

$router->get('/api/me' => async sub {
    my ($scope, $receive, $send) = @_;
    my $req = PAGI::Request->new($scope, $receive);
    my $res = PAGI::Response->new($send);

    # User data from validated token
    my $user = $req->stash->{user};

    await $res->json({ user => $user });
});

$router->to_app;

7.5 PubSub for WebSocket

For real-time features with multiple WebSocket clients:

use strict;
use warnings;
use Future::AsyncAwait;
use PAGI::App::Router;
use PAGI::WebSocket;
use PAGI::Server::PubSub;

my $pubsub = PAGI::Server::PubSub->new;
my $router = PAGI::App::Router->new;

$router->websocket('/chat/:room' => async sub {
    my ($scope, $receive, $send) = @_;
    my $ws = PAGI::WebSocket->new($scope, $receive, $send);
    my $room = $scope->{'pagi.params'}{room};

    await $ws->accept;

    # Subscribe to room
    my $sub = $pubsub->subscribe($room, sub {
        my ($message) = @_;
        $ws->try_send_text($message);
    });

    # Broadcast received messages
    await $ws->each_text(sub {
        my ($text) = @_;
        $pubsub->publish($room, $text);
    });

    # Unsubscribe on disconnect
    $pubsub->unsubscribe($room, $sub);
});

$router->to_app;

Important: PAGI::Server::PubSub is in-memory only and works within a single process. For multi-worker or multi-server deployments, use Redis pub/sub or a message broker.

See examples/websocket-chat/ for a complete chat application example.

7.6 SSE Dashboard

Server-Sent Events are ideal for real-time dashboards:

use strict;
use warnings;
use Future::AsyncAwait;
use PAGI::App::Router;
use PAGI::SSE;

my $router = PAGI::App::Router->new;

$router->get('/events' => async sub {
    my ($scope, $receive, $send) = @_;

    my $sse = PAGI::SSE->new($scope, $receive, $send);
    await $sse->start;

    # Enable keepalive to prevent timeout
    $sse->keepalive(15);

    # Send updates every second
    await $sse->every(1, async sub {
        await $sse->send_json({
            cpu => rand(100),
            memory => rand(100),
            requests => int(rand(1000)),
            timestamp => time(),
        }, event => 'metrics');
    });
});

$router->to_app;

JavaScript client:

const events = new EventSource('/events');

events.addEventListener('metrics', (e) => {
    const data = JSON.parse(e.data);
    document.getElementById('cpu').textContent = data.cpu.toFixed(1) + '%';
    document.getElementById('memory').textContent = data.memory.toFixed(1) + '%';
});

See examples/sse-dashboard/ for a complete dashboard example.

7.7 TLS/HTTPS

PAGI::Server supports TLS natively:

pagi-server app.pl --port 443 \
    --tls-cert /path/to/cert.pem \
    --tls-key /path/to/key.pem

For development, generate a self-signed certificate:

openssl req -x509 -newkey rsa:4096 -keyout key.pem -out cert.pem \
    -days 365 -nodes -subj '/CN=localhost'

pagi-server app.pl --port 3443 \
    --tls-cert cert.pem --tls-key key.pem

Then access via https://localhost:3443 (browser will warn about self-signed cert).

See examples/11-tls-https/ for more TLS configuration options.

7.8 Testing PAGI Applications

Test PAGI apps directly without a running server:

use strict;
use warnings;
use Test2::V0;
use Future::AsyncAwait;

# Load your app
my $app = require './app.pl';

# Test helper to create scope and capture response
async sub test_request {
    my (%opts) = @_;

    my $scope = {
        type         => 'http',
        method       => $opts{method} // 'GET',
        path         => $opts{path} // '/',
        query_string => $opts{query} // '',
        headers      => $opts{headers} // [],
    };

    my @body_parts = defined $opts{body} ? ($opts{body}) : ();
    my $body_sent = 0;

    my $receive = async sub {
        if (@body_parts) {
            return {
                type => 'http.request',
                body => shift @body_parts,
                more => scalar(@body_parts) > 0,
            };
        }
        return { type => 'http.disconnect' };
    };

    my %response = (status => undef, headers => [], body => '');
    my $send = async sub {
        my ($event) = @_;
        if ($event->{type} eq 'http.response.start') {
            $response{status} = $event->{status};
            $response{headers} = $event->{headers};
        }
        elsif ($event->{type} eq 'http.response.body') {
            $response{body} .= $event->{body} // '';
        }
    };

    await $app->($scope, $receive, $send);
    return \%response;
}

# Tests
subtest 'GET /' => async sub {
    my $res = await test_request(path => '/');
    is $res->{status}, 200, 'status is 200';
    like $res->{body}, qr/Hello/, 'body contains Hello';
};

subtest 'POST /api/users' => async sub {
    my $res = await test_request(
        method => 'POST',
        path => '/api/users',
        headers => [['content-type', 'application/json']],
        body => '{"name":"Alice"}',
    );
    is $res->{status}, 201, 'status is 201';
};

done_testing;

PART 8: REFERENCE

8.1 Scope Reference

The $scope hashref contains connection metadata:

Accessing the Event Loop

The event loop is not part of $scope. To access it, use the IO::Async::Loop singleton:

use IO::Async::Loop;
my $loop = IO::Async::Loop->new;  # Returns cached singleton

This works because IO::Async::Loop caches the loop instance - calling ->new always returns the same loop that the server created at startup.

Common Fields (All Types)

$scope->{type}         # 'http', 'websocket', 'sse', 'lifespan'

HTTP Scope

$scope->{type}         # 'http'
$scope->{method}       # 'GET', 'POST', etc.
$scope->{path}         # URL path (decoded UTF-8)
$scope->{raw_path}     # URL path (raw bytes)
$scope->{query_string} # Query string (raw bytes)
$scope->{headers}      # Arrayref of [name, value] pairs
$scope->{scheme}       # 'http' or 'https'
$scope->{http_version} # '1.0' or '1.1'
$scope->{client}       # [ip, port] of client
$scope->{server}       # [ip, port] of server

WebSocket Scope

$scope->{type}         # 'websocket'
$scope->{path}         # URL path
$scope->{query_string} # Query string
$scope->{headers}      # Request headers
$scope->{subprotocols} # Arrayref of requested subprotocols
$scope->{client}       # [ip, port] of client

SSE Scope

$scope->{type}         # 'sse'
$scope->{path}         # URL path
$scope->{query_string} # Query string
$scope->{headers}      # Request headers
$scope->{client}       # [ip, port] of client

Lifespan Scope

$scope->{type}         # 'lifespan'

8.2 Event Reference

HTTP Events

Receive:

{ type => 'http.request', body => $bytes, more => 1 }
{ type => 'http.disconnect' }

Send:

{ type => 'http.response.start', status => 200, headers => [...] }
{ type => 'http.response.body', body => $bytes, more => 0 }

WebSocket Events

Receive:

{ type => 'websocket.connect' }
{ type => 'websocket.receive', text => $text }
{ type => 'websocket.receive', bytes => $bytes }
{ type => 'websocket.disconnect', code => 1000, reason => '' }

Send:

{ type => 'websocket.accept', subprotocol => 'optional' }
{ type => 'websocket.send', text => $text }
{ type => 'websocket.send', bytes => $bytes }
{ type => 'websocket.close', code => 1000, reason => '' }

SSE Events

Receive:

{ type => 'sse.connect' }
{ type => 'sse.disconnect' }

Send:

{ type => 'sse.response.start', status => 200, headers => [...] }
{ type => 'sse.response.body', data => $text, event => 'name', id => '1' }

Lifespan Events

Receive:

{ type => 'lifespan.startup' }
{ type => 'lifespan.shutdown' }

Send:

{ type => 'lifespan.startup.complete' }
{ type => 'lifespan.startup.failed', message => $error }
{ type => 'lifespan.shutdown.complete' }
{ type => 'lifespan.shutdown.failed', message => $error }

NEXT STEPS

You've covered the essentials! For more information:

  • Browse examples/ for complete working applications

  • Check individual module documentation (PAGI::Response, PAGI::WebSocket, etc.)

  • Read the PAGI specification in docs/pagi-spec.md

SEE ALSO

AUTHOR

PAGI Contributors

COPYRIGHT AND LICENSE

This software is copyright (c) 2025 by the PAGI contributors.

This is free software; you can redistribute it and/or modify it under the same terms as the Perl 5 programming language system itself.