NAME

PAGI::SSE - Convenience wrapper for PAGI Server-Sent Events connections

SYNOPSIS

use PAGI::SSE;
use Future::AsyncAwait;

# Simple notification stream
async sub app {
    my ($scope, $receive, $send) = @_;

    my $sse = PAGI::SSE->new($scope, $receive, $send);

    # Enable keepalive for proxy compatibility
    await $sse->keepalive(25);

    # Per-connection state
    use PAGI::Stash;
    my $stash = PAGI::Stash->new($sse);

    # Cleanup on disconnect - with reason for logging
    $sse->on_close(sub {
        my ($sse, $reason) = @_;
        remove_subscriber($stash->get('sub_id'));
        log_disconnect($reason);  # 'client_closed', 'write_error', etc.
    });

    # Handle reconnection
    if (my $last_id = $sse->last_event_id) {
        my @missed = get_events_since($last_id);
        for my $event (@missed) {
            await $sse->send_event(%$event);
        }
    }

    # Subscribe to updates
    $stash->set(sub_id => add_subscriber(sub {
        my ($event) = @_;
        $sse->try_send_json($event);
    }));

    # Wait for disconnect
    await $sse->run;
}

DESCRIPTION

PAGI::SSE wraps the raw PAGI SSE protocol to provide a clean, high-level API inspired by Starlette. It eliminates protocol boilerplate and provides:

  • Multiple send methods (send, send_json, send_event)

  • Connection state tracking (is_started, is_closed)

  • Cleanup callback registration (on_close)

  • Safe send methods for broadcast scenarios (try_send_*)

  • Reconnection support (last_event_id)

  • Keepalive timer for proxy compatibility

  • Iteration helper (each)

  • Per-connection storage (via PAGI::Stash)

CONSTRUCTOR

new

my $sse = PAGI::SSE->new($scope, $receive, $send);

Creates a new SSE wrapper. Requires:

  • $scope - PAGI scope hashref with type => 'sse'

  • $receive - Async coderef returning Futures for events

  • $send - Async coderef for sending events

Dies if scope type is not 'sse'.

Singleton pattern: The SSE object is cached in $scope->{'pagi.sse'}. If you call new() multiple times with the same scope, you get the same SSE object back. This ensures consistent state (is_started, is_closed, callbacks) across multiple code paths that may create SSE objects from the same scope.

SCOPE ACCESSORS

scope, path, raw_path, query_string, scheme, http_version

my $path = $sse->path;              # /events
my $qs = $sse->query_string;        # token=abc

header, headers, header_all

my $auth = $sse->header('authorization');
my @cookies = $sse->header_all('cookie');

last_event_id

my $id = $sse->last_event_id;       # From Last-Event-ID header

Returns the Last-Event-ID header sent by reconnecting clients. Use this to replay missed events.

Per-Connection Shared State

See PAGI::Stash for per-connection shared state:

use PAGI::Stash;
my $stash = PAGI::Stash->new($sse);

path_param

my $channel = $sse->path_param('channel');

Returns a path parameter by name. Path parameters are captured from the URL path by a router and stored in $scope->{path_params}.

path_params

my $params = $sse->path_params;

Returns hashref of all path parameters from scope.

query_params

my $params = $sse->query_params;
my $params = $sse->query_params(strict => 1);
my $params = $sse->query_params(raw => 1);

Returns query string parameters as a Hash::MultiValue. Handles URL decoding and UTF-8 decoding automatically.

Options:

strict => 1

Croak on invalid UTF-8 sequences instead of replacing with substitution character.

raw => 1

Skip UTF-8 decoding, return raw bytes after URL decoding.

raw_query_params

my $params = $sse->raw_query_params;

Shortcut for query_params(raw => 1).

query_param

my $value = $sse->query_param('name');
my $value = $sse->query_param('name', strict => 1);

Returns a single query parameter value by name. Accepts same options as query_params.

raw_query_param

my $value = $sse->raw_query_param('name');

Shortcut for query_param($name, raw => 1).

state

my $state = $sse->state;
my $db = $sse->state->{db};

Returns the application state hashref injected by PAGI::Lifespan. This contains worker-level shared state like database connections and configuration. Returns empty hashref if no state was injected.

LIFECYCLE METHODS

start

await $sse->start;
await $sse->start(status => 200, headers => [...]);

Starts the SSE stream. Called automatically on first send. Idempotent - only sends sse.start once.

close

$sse->close;

Marks connection as closed and runs on_close callbacks.

run

await $sse->run;

Waits for client disconnect. Use this at the end of your handler to keep the connection open.

CONNECTION STATE ACCESSORS

is_started, is_closed, connection_state

if ($sse->is_started) { ... }
if ($sse->is_closed) { ... }
my $state = $sse->connection_state;    # 'pending', 'started', 'closed'

disconnect_reason

my $reason = $sse->disconnect_reason;

Returns the reason for disconnect, if available. Common values:

  • client_closed - Client closed connection normally

  • write_error - Failed to write data (network error)

  • send_timeout - Send operation timed out

  • idle_timeout - Connection closed due to inactivity

Returns undef if connection is still open or reason is unknown.

SEND METHODS

send

await $sse->send("Hello world");

Sends a data-only event.

send_json

await $sse->send_json({ type => 'update', data => $payload });

JSON-encodes data before sending.

send_event

await $sse->send_event(
    data  => $data,              # Required (auto JSON-encodes refs)
    event => 'notification',     # Optional event type
    id    => 'msg-123',          # Optional event ID
    retry => 5000,               # Optional reconnect hint (ms)
);

Sends a full SSE event with all fields.

try_send, try_send_json, try_send_event

my $ok = await $sse->try_send_json($data);
if (!$ok) {
    # Client disconnected
}

Returns true on success, false on failure. Does not throw. Useful for broadcasting to multiple clients.

KEEPALIVE

keepalive

await $sse->keepalive(30);              # Ping every 30 seconds
await $sse->keepalive(30, 'ping');      # Custom comment text
await $sse->keepalive(0);               # Disable

Sends an sse.keepalive event to the server, which then handles sending periodic SSE comments to keep the connection alive and prevent proxy timeouts. The server manages the timer internally - this method is loop-agnostic.

ITERATION

each

# Simple iteration
await $sse->each(\@items, async sub {
    my ($item) = @_;
    await $sse->send_json($item);
});

# With transformer - return event spec
await $sse->each(\@items, async sub {
    my ($item, $index) = @_;
    return {
        data  => $item,
        event => 'item',
        id    => $index,
    };
});

# Coderef iterator
await $sse->each($iterator_sub, async sub { ... });

Iterates over items, calling callback for each. If callback returns a hashref, sends it as an event.

every

# Send metrics every 2 seconds
await $sse->every(2, async sub {
    await $sse->send_event(
        event => 'metrics',
        data  => get_current_metrics(),
    );
});

Periodically executes a callback with a delay between iterations. The loop continues until the connection closes or the callback throws.

Requires Future::IO - this method will croak if Future::IO is not installed and configured. You must configure Future::IO in your app before using every().

For apps running under PAGI::Server (using pagi-server):

# app.pl
use Future::IO::Impl::IOAsync;

# ... rest of your app

For apps running under other event loops:

# If using IO::Async (PAGI::Server)
use Future::IO::Impl::IOAsync;

# If using UV
use Future::IO::Impl::UV;

Parameters:

  • $interval - Seconds between iterations (required, must be > 0)

  • $callback - Async coderef to execute (required)

The callback is executed first, then the method sleeps for the interval before the next iteration. If the callback throws, the connection is closed and on_close callbacks are run.

EVENT CALLBACKS

on_close

$sse->on_close(sub {
    my ($sse, $reason) = @_;
    if ($reason eq 'client_closed') {
        cleanup_resources();
    } else {
        log_error("Unexpected disconnect: $reason");
    }
});

# Async callback — return value is awaited automatically
$sse->on_close(async sub {
    my ($sse, $reason) = @_;
    await cleanup_async($reason);
});

Registers cleanup callback. Runs on disconnect or close(). Callbacks can be regular subs or async subs — async results are automatically awaited. Multiple callbacks run in registration order. Exceptions are caught and warned but do not prevent other callbacks.

Callbacks receive two arguments:

  • $sse - The SSE connection object (same as $self)

  • $reason - Why the connection closed (see "disconnect_reason")

Returns $self for chaining.

Circular reference note: If your callback captures the $sse object in a closure, use the $sse argument instead — it is the same object but does not create an additional reference cycle. If you must capture it, use Scalar::Util::weaken:

use Scalar::Util qw(weaken);
my $weak_sse = $sse;
weaken($weak_sse);
$sse->on_close(sub { $weak_sse->... if $weak_sse });

The callback arrays are cleared after firing, so any cycle via a closure is broken at connection close, but weaken prevents the object from being kept alive until that point.

on_error

$sse->on_error(sub {
    my ($sse, $error) = @_;
    warn "SSE error: $error";
});

# Async callback — return value is awaited automatically
$sse->on_error(async sub {
    my ($sse, $error) = @_;
    await log_error_async($error);
});

Registers error callback. Called when a try_send* method fails (e.g., because the client disconnected mid-write). Callbacks can be regular subs or async subs — async results are automatically awaited. Multiple callbacks run in registration order. Exceptions are caught and warned but do not prevent other callbacks.

Callbacks receive two arguments:

  • $sse - The SSE connection object

  • $error - The error string

If no error handlers are registered, the error is warned to STDERR.

Returns $self for chaining.

on

$sse->on(close => sub { my ($sse, $reason) = @_; ... });
$sse->on(error => sub { my ($sse, $error)  = @_; ... });

# Chaining
$sse->on(close => sub { ... })
    ->on(error => sub { ... });

Generic event dispatcher. Dispatches to on_close or on_error based on the event name. Dies if an unknown event name is given.

Returns $self for chaining.

EXAMPLE: LIVE DASHBOARD

async sub dashboard_sse {
    my ($scope, $receive, $send) = @_;

    my $sse = PAGI::SSE->new($scope, $receive, $send);

    await $sse->keepalive(25);

    # Send initial state
    await $sse->send_event(
        event => 'connected',
        data  => { time => time() },
    );

    # Subscribe to metrics
    my $sub_id = subscribe_metrics(sub {
        my ($metrics) = @_;
        $sse->try_send_event(
            event => 'metrics',
            data  => $metrics,
        );
    });

    $sse->on_close(sub {
        my ($sse, $reason) = @_;
        unsubscribe_metrics($sub_id);
        # Log abnormal disconnects for debugging
        warn "SSE client disconnected: $reason"
            unless $reason eq 'client_closed';
    });

    await $sse->run;
}

SEE ALSO

PAGI::WebSocket - Similar wrapper for WebSocket connections

PAGI::Server - PAGI protocol server

AUTHOR

PAGI Contributors

1 POD Error

The following errors were encountered while parsing the POD:

Around line 1035:

Non-ASCII character seen before =encoding in '—'. Assuming UTF-8