NAME

PAGI::SSE - Convenience wrapper for PAGI Server-Sent Events connections

SYNOPSIS

use PAGI::SSE;
use Future::AsyncAwait;

# Simple notification stream
async sub app {
    my ($scope, $receive, $send) = @_;

    my $sse = PAGI::SSE->new($scope, $receive, $send);

    # Enable keepalive for proxy compatibility
    await $sse->keepalive(25);

    # Cleanup on disconnect - with reason for logging
    $sse->on_close(sub {
        my ($sse, $reason) = @_;
        remove_subscriber($sse->stash->{sub_id});
        log_disconnect($reason);  # 'client_closed', 'write_error', etc.
    });

    # Handle reconnection
    if (my $last_id = $sse->last_event_id) {
        my @missed = get_events_since($last_id);
        for my $event (@missed) {
            await $sse->send_event(%$event);
        }
    }

    # Subscribe to updates
    $sse->stash->{sub_id} = add_subscriber(sub {
        my ($event) = @_;
        $sse->try_send_json($event);
    });

    # Wait for disconnect
    await $sse->run;
}

DESCRIPTION

PAGI::SSE wraps the raw PAGI SSE protocol to provide a clean, high-level API inspired by Starlette. It eliminates protocol boilerplate and provides:

  • Multiple send methods (send, send_json, send_event)

  • Connection state tracking (is_started, is_closed)

  • Cleanup callback registration (on_close)

  • Safe send methods for broadcast scenarios (try_send_*)

  • Reconnection support (last_event_id)

  • Keepalive timer for proxy compatibility

  • Iteration helper (each)

  • Per-connection storage (stash)

CONSTRUCTOR

new

my $sse = PAGI::SSE->new($scope, $receive, $send);

Creates a new SSE wrapper. Requires:

  • $scope - PAGI scope hashref with type => 'sse'

  • $receive - Async coderef returning Futures for events

  • $send - Async coderef for sending events

Dies if scope type is not 'sse'.

Per-scope caching: The SSE object is cached in $scope->{'pagi.sse'}. Calling new() more than once with the same scope returns the same SSE object, so connection state (is_started, is_closed, registered callbacks) stays consistent across code paths that each construct a wrapper from that scope.
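The caching behaviour described above can be sketched in plain Perl. This is only an illustration of the pattern, not PAGI::SSE's actual implementation; the package name Demo::ScopeCached and the scope key 'demo.cached' are hypothetical.

```perl
use strict;
use warnings;

package Demo::ScopeCached {
    # Hypothetical class: illustrates per-scope caching only.
    sub new {
        my ($class, $scope) = @_;

        # Reuse the object already attached to this scope, if any.
        return $scope->{'demo.cached'} if $scope->{'demo.cached'};

        my $self = bless { closed => 0 }, $class;
        $scope->{'demo.cached'} = $self;
        return $self;
    }

    sub close     { $_[0]{closed} = 1 }
    sub is_closed { $_[0]{closed} }
}

my $scope  = { type => 'sse' };
my $first  = Demo::ScopeCached->new($scope);
my $second = Demo::ScopeCached->new($scope);

# Closing through one handle is visible through the other,
# because both handles are the same object.
$first->close;
print $second->is_closed ? "shared state\n" : "separate state\n";
```

Because the object lives in the scope, middleware and the final handler can each construct a wrapper without coordinating with one another.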

SCOPE ACCESSORS

scope, path, raw_path, query_string, scheme, http_version

my $path = $sse->path;              # /events
my $qs = $sse->query_string;        # token=abc

header, headers, header_all

my $auth = $sse->header('authorization');
my @cookies = $sse->header_all('cookie');

last_event_id

my $id = $sse->last_event_id;       # From Last-Event-ID header

Returns the Last-Event-ID header sent by reconnecting clients. Use this to replay missed events.
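A replay helper such as the get_events_since call in the SYNOPSIS might look like the sketch below. It assumes numeric, monotonically increasing event IDs and an unbounded in-memory history; record_event, @history and the ID scheme are all hypothetical and not part of PAGI::SSE.

```perl
use strict;
use warnings;

# Hypothetical in-memory history; a real application would likely use
# a bounded buffer or a persistent store.
my @history;
my $next_id = 1;

# Record an event and assign it the next numeric ID.
sub record_event {
    my (%event) = @_;
    my $stored = { %event, id => $next_id++ };
    push @history, $stored;
    return $stored;
}

# Return every stored event newer than the ID the client last saw.
# An absent or non-numeric ID replays the whole history.
sub get_events_since {
    my ($last_id) = @_;
    return @history unless defined $last_id && $last_id =~ /\A\d+\z/;
    return grep { $_->{id} > $last_id } @history;
}

record_event(event => 'tick', data => "n=$_") for 1 .. 3;

my @missed = get_events_since(1);
print join(', ', map { "$_->{id}:$_->{data}" } @missed), "\n";
```

Each stored hash carries data, event and id keys, so it can be flattened straight into send_event(%$event) as the SYNOPSIS does.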

stash

$sse->stash->{client_id} = $id;
my $user = $sse->stash->{user};

Returns the per-request stash hashref. The stash lives in the request scope and is shared across all middleware, handlers, and subrouters processing the same request.

Note: For worker-level state (database connections, config), use $sse->state to access application state injected by PAGI::Lifespan.

path_param

my $channel = $sse->path_param('channel');

Returns a path parameter by name. Path parameters are captured from the URL path by a router and stored in $scope->{path_params}.
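To show where these values come from, here is a minimal sketch of a router step that fills $scope->{path_params} from named regex captures. The match_route helper and the route pattern are hypothetical; real routers used with PAGI may work differently.

```perl
use strict;
use warnings;

# Hypothetical matcher: returns a hashref of named captures,
# or undef when the path does not match.
sub match_route {
    my ($pattern, $path) = @_;
    return unless $path =~ $pattern;
    return +{ %+ };    # copy the named captures out of the magic %+ hash
}

my $scope = { type => 'sse', path => '/events/news' };

# A router would typically store the captures in the scope before
# dispatching to the handler.
$scope->{path_params} =
    match_route(qr{\A/events/(?<channel>[^/]+)\z}, $scope->{path});

print $scope->{path_params}{channel}, "\n";
```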

path_params

my $params = $sse->path_params;

Returns a hashref of all path parameters from the scope.

state

my $state = $sse->state;
my $db = $sse->state->{db};

Returns the application state hashref injected by PAGI::Lifespan. This contains worker-level shared state such as database connections and configuration. Returns an empty hashref if no state was injected.

LIFECYCLE METHODS

start

await $sse->start;
await $sse->start(status => 200, headers => [...]);

Starts the SSE stream. Called automatically on first send. Idempotent - only sends sse.start once.

close

$sse->close;

Marks the connection as closed and runs the on_close callbacks.

run

await $sse->run;

Waits for client disconnect. Use this at the end of your handler to keep the connection open.

CONNECTION STATE ACCESSORS

is_started, is_closed, connection_state

if ($sse->is_started) { ... }
if ($sse->is_closed) { ... }
my $state = $sse->connection_state;    # 'pending', 'started', 'closed'

disconnect_reason

my $reason = $sse->disconnect_reason;

Returns the reason for disconnect, if available. Common values:

  • client_closed - Client closed connection normally

  • write_error - Failed to write data (network error)

  • send_timeout - Send operation timed out

  • idle_timeout - Connection closed due to inactivity

Returns undef if connection is still open or reason is unknown.
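Since the reason may be undef, code that compares or logs it should supply a default first. The sketch below shows one way to do that; the split into routine and abnormal reasons is a hypothetical policy, and disconnect_log_level is not part of PAGI::SSE.

```perl
use strict;
use warnings;

# Hypothetical policy: which documented reasons count as routine.
my %routine = map { $_ => 1 } qw(client_closed idle_timeout);

# Map a disconnect reason to a log level, tolerating undef.
sub disconnect_log_level {
    my ($reason) = @_;
    $reason //= 'unknown';
    return $routine{$reason} ? 'info' : 'warn';
}

for my $reason ('client_closed', 'write_error', undef) {
    printf "%-13s => %s\n", $reason // 'unknown', disconnect_log_level($reason);
}
```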

SEND METHODS

send

await $sse->send("Hello world");

Sends a data-only event.

send_json

await $sse->send_json({ type => 'update', data => $payload });

JSON-encodes data before sending.

send_event

await $sse->send_event(
    data  => $data,              # Required (auto JSON-encodes refs)
    event => 'notification',     # Optional event type
    id    => 'msg-123',          # Optional event ID
    retry => 5000,               # Optional reconnect hint (ms)
);

Sends a full SSE event with all fields.
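For reference, the fields above correspond to lines in the text/event-stream wire format. The sketch below renders one event as text so the mapping is visible. It is not how PAGI::SSE or the server encodes events; format_sse_event is a hypothetical helper that does no validation of the event or id values.

```perl
use strict;
use warnings;
use JSON::PP ();

# Hypothetical helper: renders one event in text/event-stream format.
sub format_sse_event {
    my (%args) = @_;
    die "data is required\n" unless defined $args{data};

    # Mirror the documented behaviour: references are JSON-encoded.
    my $data = ref $args{data}
        ? JSON::PP->new->canonical->encode($args{data})
        : $args{data};

    my @lines;
    push @lines, "event: $args{event}" if defined $args{event};
    push @lines, "id: $args{id}"       if defined $args{id};
    push @lines, "retry: $args{retry}" if defined $args{retry};

    # Each line of the payload needs its own "data:" field.
    my @payload = split /\r\n|\n|\r/, $data, -1;
    @payload = ('') unless @payload;
    push @lines, map { "data: $_" } @payload;

    # A blank line terminates the event.
    return join("\n", @lines) . "\n\n";
}

print format_sse_event(
    event => 'notification',
    id    => 'msg-123',
    data  => { ok => 1 },
);
```

Multi-line payloads are the usual stumbling block when hand-writing this format: every line must be prefixed separately, which is one reason to use the send methods instead.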

try_send, try_send_json, try_send_event

my $ok = await $sse->try_send_json($data);
if (!$ok) {
    # Client disconnected
}

Each method returns a Future that resolves to true on success and false on failure, and does not throw. Useful when broadcasting to multiple clients, where one failed send should not abort the rest.

KEEPALIVE

keepalive

await $sse->keepalive(30);              # Ping every 30 seconds
await $sse->keepalive(30, 'ping');      # Custom comment text
await $sse->keepalive(0);               # Disable

Sends an sse.keepalive event to the server, which then sends periodic SSE comments to keep the connection alive and prevent proxy timeouts. The server manages the timer internally, so this method does not depend on a particular event loop.
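In the text/event-stream format, a comment is a line beginning with a colon, which clients ignore. The sketch below shows what such a line looks like; the exact bytes a PAGI server emits are an assumption here, and format_sse_comment is a hypothetical helper.

```perl
use strict;
use warnings;

# Hypothetical helper: renders an SSE comment, which clients ignore.
sub format_sse_comment {
    my ($text) = @_;
    $text //= '';
    $text =~ s/[\r\n]+/ /g;    # a comment must stay on one line
    return ": $text\n\n";
}

print format_sse_comment('ping');
```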

ITERATION

each

# Simple iteration
await $sse->each(\@items, async sub {
    my ($item) = @_;
    await $sse->send_json($item);
});

# With transformer - return event spec
await $sse->each(\@items, async sub {
    my ($item, $index) = @_;
    return {
        data  => $item,
        event => 'item',
        id    => $index,
    };
});

# Coderef iterator
await $sse->each($iterator_sub, async sub { ... });

Iterates over the items, calling the callback for each one. If the callback returns a hashref, it is sent as an event.

every

# Send metrics every 2 seconds
await $sse->every(2, async sub {
    await $sse->send_event(
        event => 'metrics',
        data  => get_current_metrics(),
    );
});

Periodically executes a callback with a delay between iterations. The loop continues until the connection closes or the callback throws.

Requires Future::IO - this method will croak if Future::IO is not installed. Install with: cpanm Future::IO

Parameters:

  • $interval - Seconds between iterations (required, must be > 0)

  • $callback - Async coderef to execute (required)

The callback is executed first, then the method sleeps for the interval before the next iteration. If the callback throws, the connection is closed and on_close callbacks are run.

EVENT CALLBACKS

on_close

$sse->on_close(sub {
    my ($sse, $reason) = @_;
    $reason //= 'unknown';    # reason may be undef
    if ($reason eq 'client_closed') {
        # Normal disconnect
        cleanup_resources();
    } else {
        # Error condition
        log_error("Unexpected disconnect: $reason");
    }
});

Registers a cleanup callback that runs on disconnect or on close(). Multiple callbacks run in registration order.

Callbacks receive two arguments:

  • $sse - The SSE connection object

  • $reason - Why the connection closed (see "disconnect_reason")

on_error

$sse->on_error(sub {
    my ($sse, $error) = @_;
    warn "SSE error: $error";
});

Registers an error callback. The callback receives the SSE connection object and the error.

EXAMPLE: LIVE DASHBOARD

async sub dashboard_sse {
    my ($scope, $receive, $send) = @_;

    my $sse = PAGI::SSE->new($scope, $receive, $send);

    await $sse->keepalive(25);

    # Send initial state
    await $sse->send_event(
        event => 'connected',
        data  => { time => time() },
    );

    # Subscribe to metrics
    my $sub_id = subscribe_metrics(sub {
        my ($metrics) = @_;
        $sse->try_send_event(
            event => 'metrics',
            data  => $metrics,
        );
    });

    $sse->on_close(sub {
        my ($sse, $reason) = @_;
        $reason //= 'unknown';    # reason may be undef
        unsubscribe_metrics($sub_id);
        # Log abnormal disconnects for debugging
        warn "SSE client disconnected: $reason"
            unless $reason eq 'client_closed';
    });

    await $sse->run;
}
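The subscribe_metrics and unsubscribe_metrics helpers in this example are left to the application. A minimal in-process sketch is shown below, assuming a single worker and synchronous callbacks; all three function names and the publish_metrics entry point are hypothetical.

```perl
use strict;
use warnings;

my %subscribers;
my $next_sub_id = 1;

# Register a callback and return an ID for later removal.
sub subscribe_metrics {
    my ($callback) = @_;
    my $id = $next_sub_id++;
    $subscribers{$id} = $callback;
    return $id;
}

# Remove a callback; returns true if it was registered.
sub unsubscribe_metrics {
    my ($id) = @_;
    return defined(delete $subscribers{$id});
}

# Deliver one metrics hashref to every current subscriber.
sub publish_metrics {
    my ($metrics) = @_;
    # Iterate over a snapshot of the keys so callbacks may unsubscribe safely.
    for my $id (sort { $a <=> $b } keys %subscribers) {
        my $callback = $subscribers{$id} or next;
        $callback->($metrics);
    }
    return scalar keys %subscribers;
}

my @seen;
my $id = subscribe_metrics(sub { push @seen, $_[0]{cpu} });
publish_metrics({ cpu => 0.42 });
unsubscribe_metrics($id);
publish_metrics({ cpu => 0.99 });
print "received: @seen\n";
```

A registry like this only reaches connections in the same process; fanning out across workers would need an external message bus.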

SEE ALSO

PAGI::WebSocket - Similar wrapper for WebSocket connections

PAGI::Server - PAGI protocol server

AUTHOR

PAGI Contributors