NAME

PAGI::SSE - Convenience wrapper for PAGI Server-Sent Events connections

SYNOPSIS

use PAGI::SSE;
use Future::AsyncAwait;

# Simple notification stream
async sub app {
    my ($scope, $receive, $send) = @_;

    my $sse = PAGI::SSE->new($scope, $receive, $send);

    # Enable keepalive for proxy compatibility
    $sse->keepalive(25);

    # Cleanup on disconnect
    $sse->on_close(sub {
        remove_subscriber($sse->stash->{sub_id});
    });

    # Handle reconnection
    if (my $last_id = $sse->last_event_id) {
        my @missed = get_events_since($last_id);
        for my $event (@missed) {
            await $sse->send_event(%$event);
        }
    }

    # Subscribe to updates
    $sse->stash->{sub_id} = add_subscriber(sub {
        my ($event) = @_;
        $sse->try_send_json($event);
    });

    # Wait for disconnect
    await $sse->run;
}

DESCRIPTION

PAGI::SSE wraps the raw PAGI SSE protocol to provide a clean, high-level API inspired by Starlette. It eliminates protocol boilerplate and provides:

  • Multiple send methods (send, send_json, send_event)

  • Connection state tracking (is_started, is_closed)

  • Cleanup callback registration (on_close)

  • Safe send methods for broadcast scenarios (try_send_*)

  • Reconnection support (last_event_id)

  • Keepalive timer for proxy compatibility

  • Iteration helpers (each, every)

  • Per-connection storage (stash)

CONSTRUCTOR

new

my $sse = PAGI::SSE->new($scope, $receive, $send);

Creates a new SSE wrapper. Requires:

  • $scope - PAGI scope hashref with type => 'sse'

  • $receive - Async coderef returning Futures for events

  • $send - Async coderef for sending events

Dies if scope type is not 'sse'.

SCOPE ACCESSORS

scope, path, raw_path, query_string, scheme, http_version

my $path = $sse->path;              # /events
my $qs = $sse->query_string;        # token=abc

header, headers, header_all

my $auth = $sse->header('authorization');
my @cookies = $sse->header_all('cookie');

last_event_id

my $id = $sse->last_event_id;       # From Last-Event-ID header

Returns the Last-Event-ID header sent by reconnecting clients. Use this to replay missed events.
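
The SYNOPSIS calls get_events_since without defining it. The following is a minimal sketch of such a replay helper, assuming an in-memory log and numeric, monotonically increasing event IDs. Both @event_log and the body of get_events_since are hypothetical and not part of PAGI::SSE.

```perl
use strict;
use warnings;

# Hypothetical in-memory event log; each entry is an argument hash
# suitable for $sse->send_event(%$event).
my @event_log = (
    { id => 1, event => 'update', data => 'first'  },
    { id => 2, event => 'update', data => 'second' },
    { id => 3, event => 'update', data => 'third'  },
);

# Return every logged event newer than the client's last seen ID.
# Assumption: IDs are numeric. A missing or non-numeric ID replays
# the whole log.
sub get_events_since {
    my ($last_id) = @_;
    return @event_log unless defined $last_id && $last_id =~ /\A\d+\z/;
    return grep { $_->{id} > $last_id } @event_log;
}

my @missed = get_events_since(1);
print join(',', map { $_->{id} } @missed), "\n";   # prints "2,3"
```

Replaying the whole log for an unrecognised ID is one possible policy. A real application might instead bound the log size or send a full-state snapshot.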

stash

$sse->stash->{client_id} = $id;
my $user = $sse->stash->{user};

Returns the per-request stash hashref. The stash lives in the request scope and is shared across all middleware, handlers, and subrouters processing the same request.

Note: For worker-level state (database connections, config), use $sse->state to access application state injected by PAGI::Lifespan.

path_param

my $channel = $sse->path_param('channel');

Returns a path parameter by name. Path parameters are captured from the URL path by a router and stored in $scope->{path_params}.
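
To illustrate where these values come from, here is a minimal sketch of how a router might capture parameters before storing them in $scope->{path_params}. The match_route helper and the ':name' pattern syntax are assumptions made for this example. They do not describe any particular PAGI router.

```perl
use strict;
use warnings;

# Hypothetical router helper: match $path against a pattern such as
# '/events/:channel'. Returns a hashref of captured parameters, or
# nothing when the path does not match.
sub match_route {
    my ($pattern, $path) = @_;

    my (@names, @parts);
    for my $segment (split m{/}, $pattern, -1) {
        if ($segment =~ /\A:(\w+)\z/) {
            push @names, $1;            # remember the parameter name
            push @parts, '([^/]+)';     # capture one path segment
        }
        else {
            push @parts, quotemeta $segment;
        }
    }
    my $regex = join '/', @parts;

    my @values = $path =~ m{\A$regex\z} or return;
    my %params;
    @params{@names} = @values;
    return \%params;
}

my $params = match_route('/events/:channel', '/events/news');
print $params->{channel}, "\n";   # prints "news"
```

A router built this way would place the returned hashref in $scope->{path_params}, where path_param('channel') can find it.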

path_params

my $params = $sse->path_params;

Returns a hashref of all path parameters from the scope.

state

my $state = $sse->state;
my $db = $sse->state->{db};

Returns the application state hashref injected by PAGI::Lifespan. This contains worker-level shared state such as database connections and configuration. Returns an empty hashref if no state was injected.

LIFECYCLE METHODS

start

await $sse->start;
await $sse->start(status => 200, headers => [...]);

Starts the SSE stream. Called automatically on first send. Idempotent - only sends sse.start once.

close

$sse->close;

Marks the connection as closed and runs the on_close callbacks.

run

await $sse->run;

Waits for client disconnect. Use this at the end of your handler to keep the connection open.

CONNECTION STATE ACCESSORS

is_started, is_closed, connection_state

if ($sse->is_started) { ... }
if ($sse->is_closed) { ... }
my $state = $sse->connection_state;    # 'pending', 'started', 'closed'

SEND METHODS

send

await $sse->send("Hello world");

Sends a data-only event.

send_json

await $sse->send_json({ type => 'update', data => $payload });

JSON-encodes data before sending.

send_event

await $sse->send_event(
    data  => $data,              # Required (auto JSON-encodes refs)
    event => 'notification',     # Optional event type
    id    => 'msg-123',          # Optional event ID
    retry => 5000,               # Optional reconnect hint (ms)
);

Sends a full SSE event with all fields.
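
For orientation, the sketch below shows how these four fields map onto the text/event-stream wire format defined by the HTML standard. The format_sse_event helper is hypothetical and uses only core modules. It illustrates the format and is not PAGI::SSE's own serializer, which may differ in detail.

```perl
use strict;
use warnings;
use JSON::PP ();

# Hypothetical helper, not part of PAGI::SSE: render one event in the
# text/event-stream wire format.
sub format_sse_event {
    my (%args) = @_;
    die "data is required\n" unless defined $args{data};

    # References are JSON-encoded, mirroring the "auto JSON-encodes refs"
    # note above (canonical key order keeps the output stable).
    my $data = ref $args{data}
        ? JSON::PP->new->canonical->encode($args{data})
        : $args{data};

    my @lines;
    push @lines, "event: $args{event}" if defined $args{event};
    push @lines, "id: $args{id}"       if defined $args{id};
    push @lines, "retry: $args{retry}" if defined $args{retry};

    # Each line of a multi-line payload needs its own "data:" field.
    my @chunks = split /\n/, $data, -1;
    @chunks = ('') unless @chunks;
    push @lines, map { "data: $_" } @chunks;

    # A blank line terminates the event.
    return join("\n", @lines) . "\n\n";
}

print format_sse_event(
    event => 'notification',
    id    => 'msg-123',
    retry => 5000,
    data  => { text => 'hi' },
);
```

On the client side, the event field selects which EventSource listener fires, id becomes the value sent back as Last-Event-ID on reconnect, and retry sets the reconnection delay in milliseconds.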

try_send, try_send_json, try_send_event

my $ok = await $sse->try_send_json($data);
if (!$ok) {
    # Client disconnected
}

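Each method resolves to true on success and false on failure instead of throwing. This is useful when broadcasting to many clients, where one disconnected client should not interrupt delivery to the others.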
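
The SYNOPSIS relies on add_subscriber and remove_subscriber without defining them. Below is a minimal sketch of such a registry with a broadcast function. All three functions are hypothetical application code, not part of PAGI::SSE. In a real handler each registered callback would typically call $sse->try_send_json($event).

```perl
use strict;
use warnings;

# Hypothetical registry backing add_subscriber/remove_subscriber.
my %subscribers;
my $next_id = 0;

# Register a callback and return an ID that can be used to remove it.
sub add_subscriber {
    my ($callback) = @_;
    my $id = ++$next_id;
    $subscribers{$id} = $callback;
    return $id;
}

# Remove a subscriber; returns 1 if it existed, 0 otherwise.
sub remove_subscriber {
    my ($id) = @_;
    return defined(delete $subscribers{$id}) ? 1 : 0;
}

# Deliver one event to every subscriber. A failing callback is reported
# but does not stop delivery to the rest. Returns the success count.
sub broadcast {
    my ($event) = @_;
    my $delivered = 0;
    for my $id (sort { $a <=> $b } keys %subscribers) {
        if (eval { $subscribers{$id}->($event); 1 }) {
            $delivered++;
        }
        else {
            warn "subscriber $id failed: $@";
        }
    }
    return $delivered;
}

my @seen;
my $id = add_subscriber(sub { push @seen, $_[0]{type} });
broadcast({ type => 'update' });
remove_subscriber($id);
broadcast({ type => 'ignored' });
print "@seen\n";   # prints "update"
```

Pairing add_subscriber with an on_close callback that calls remove_subscriber, as the SYNOPSIS does, keeps the registry from accumulating callbacks for clients that have gone away.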

KEEPALIVE

keepalive

$sse->keepalive(30);              # Ping every 30 seconds
$sse->keepalive(30, ':ping');     # Custom comment text
$sse->keepalive(0);               # Disable

Sends periodic comment pings to prevent proxy timeouts. Requires an event loop (auto-created if needed).
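
In the SSE wire format a comment is any line that begins with a colon; EventSource clients ignore it, so it keeps the connection active without triggering a client-side event. The sketch below renders such a frame. The format_sse_comment helper and its default text are hypothetical, and how PAGI::SSE itself normalizes the comment argument is not specified here.

```perl
use strict;
use warnings;

# Hypothetical helper: render keepalive text as an SSE comment frame.
sub format_sse_comment {
    my ($text) = @_;
    # Arbitrary default chosen for this example only.
    $text = 'keepalive' unless defined $text && length $text;
    $text =~ s/\A:\s*//;        # accept ":ping" as well as "ping"
    $text =~ s/[\r\n]+/ /g;     # a comment must stay on one line
    return ": $text\n\n";
}

print format_sse_comment(':ping');   # prints ": ping" and a blank line
```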

ITERATION

each

# Simple iteration
await $sse->each(\@items, async sub {
    my ($item) = @_;
    await $sse->send_json($item);
});

# With transformer - return event spec
await $sse->each(\@items, async sub {
    my ($item, $index) = @_;
    return {
        data  => $item,
        event => 'item',
        id    => $index,
    };
});

# Coderef iterator
await $sse->each($iterator_sub, async sub { ... });

Iterates over the items, calling the callback for each one. If the callback returns a hashref, it is sent as an event.

every

await $sse->every(1, async sub {
    await $sse->send_event(
        event => 'tick',
        data  => { ts => time },
    );
});

Calls the callback at the interval given by the first argument, in seconds (1 in the example above), until the client disconnects. Useful for periodic updates.

EVENT CALLBACKS

on_close

$sse->on_close(sub {
    my ($sse) = @_;
    cleanup_resources();
});

Registers a cleanup callback, which runs when the client disconnects or when close() is called. Multiple callbacks run in registration order.

on_error

$sse->on_error(sub {
    my ($sse, $error) = @_;
    warn "SSE error: $error";
});

Registers an error callback. As in the example above, the callback receives the SSE object and the error.

EXAMPLE: LIVE DASHBOARD

async sub dashboard_sse {
    my ($scope, $receive, $send) = @_;

    my $sse = PAGI::SSE->new($scope, $receive, $send);

    $sse->keepalive(25);

    # Send initial state
    await $sse->send_event(
        event => 'connected',
        data  => { time => time() },
    );

    # Subscribe to metrics
    my $sub_id = subscribe_metrics(sub {
        my ($metrics) = @_;
        $sse->try_send_event(
            event => 'metrics',
            data  => $metrics,
        );
    });

    $sse->on_close(sub {
        unsubscribe_metrics($sub_id);
    });

    await $sse->run;
}

SEE ALSO

PAGI::WebSocket - Similar wrapper for WebSocket connections

PAGI::Server - PAGI protocol server

AUTHOR

PAGI Contributors