NAME
PAGI::SSE - Convenience wrapper for PAGI Server-Sent Events connections
SYNOPSIS
use PAGI::SSE;
use Future::AsyncAwait;
# Simple notification stream
async sub app {
my ($scope, $receive, $send) = @_;
my $sse = PAGI::SSE->new($scope, $receive, $send);
# Enable keepalive for proxy compatibility
await $sse->keepalive(25);
# Cleanup on disconnect - with reason for logging
$sse->on_close(sub {
my ($sse, $reason) = @_;
remove_subscriber($sse->stash->{sub_id});
log_disconnect($reason); # 'client_closed', 'write_error', etc.
});
# Handle reconnection
if (my $last_id = $sse->last_event_id) {
my @missed = get_events_since($last_id);
for my $event (@missed) {
await $sse->send_event(%$event);
}
}
# Subscribe to updates
$sse->stash->{sub_id} = add_subscriber(sub {
my ($event) = @_;
$sse->try_send_json($event);
});
# Wait for disconnect
await $sse->run;
}
DESCRIPTION
PAGI::SSE wraps the raw PAGI SSE protocol to provide a clean, high-level API inspired by Starlette. It eliminates protocol boilerplate and provides:
Multiple send methods (send, send_json, send_event)
Connection state tracking (is_started, is_closed)
Cleanup callback registration (on_close)
Safe send methods for broadcast scenarios (try_send_*)
Reconnection support (last_event_id)
Keepalive timer for proxy compatibility
Iteration helper (each)
Per-connection storage (stash)
CONSTRUCTOR
new
my $sse = PAGI::SSE->new($scope, $receive, $send);
Creates a new SSE wrapper. Requires:
$scope- PAGI scope hashref withtype => 'sse'$receive- Async coderef returning Futures for events$send- Async coderef for sending events
Dies if scope type is not 'sse'.
Singleton pattern: The SSE object is cached in $scope->{'pagi.sse'}. If you call new() multiple times with the same scope, you get the same SSE object back. This ensures consistent state (is_started, is_closed, callbacks) across multiple code paths that may create SSE objects from the same scope.
SCOPE ACCESSORS
scope, path, raw_path, query_string, scheme, http_version
my $path = $sse->path; # /events
my $qs = $sse->query_string; # token=abc
header, headers, header_all
my $auth = $sse->header('authorization');
my @cookies = $sse->header_all('cookie');
last_event_id
my $id = $sse->last_event_id; # From Last-Event-ID header
Returns the Last-Event-ID header sent by reconnecting clients. Use this to replay missed events.
stash
$sse->stash->{client_id} = $id;
my $user = $sse->stash->{user};
Returns the per-request stash hashref. The stash lives in the request scope and is shared across all middleware, handlers, and subrouters processing the same request.
Note: For worker-level state (database connections, config), use $sse->state to access application state injected by PAGI::Lifespan.
path_param
my $channel = $sse->path_param('channel');
Returns a path parameter by name. Path parameters are captured from the URL path by a router and stored in $scope->{path_params}.
path_params
my $params = $sse->path_params;
Returns hashref of all path parameters from scope.
state
my $state = $sse->state;
my $db = $sse->state->{db};
Returns the application state hashref injected by PAGI::Lifespan. This contains worker-level shared state like database connections and configuration. Returns empty hashref if no state was injected.
LIFECYCLE METHODS
start
await $sse->start;
await $sse->start(status => 200, headers => [...]);
Starts the SSE stream. Called automatically on first send. Idempotent - only sends sse.start once.
close
$sse->close;
Marks connection as closed and runs on_close callbacks.
run
await $sse->run;
Waits for client disconnect. Use this at the end of your handler to keep the connection open.
CONNECTION STATE ACCESSORS
is_started, is_closed, connection_state
if ($sse->is_started) { ... }
if ($sse->is_closed) { ... }
my $state = $sse->connection_state; # 'pending', 'started', 'closed'
disconnect_reason
my $reason = $sse->disconnect_reason;
Returns the reason for disconnect, if available. Common values:
client_closed- Client closed connection normallywrite_error- Failed to write data (network error)send_timeout- Send operation timed outidle_timeout- Connection closed due to inactivity
Returns undef if connection is still open or reason is unknown.
SEND METHODS
send
await $sse->send("Hello world");
Sends a data-only event.
send_json
await $sse->send_json({ type => 'update', data => $payload });
JSON-encodes data before sending.
send_event
await $sse->send_event(
data => $data, # Required (auto JSON-encodes refs)
event => 'notification', # Optional event type
id => 'msg-123', # Optional event ID
retry => 5000, # Optional reconnect hint (ms)
);
Sends a full SSE event with all fields.
try_send, try_send_json, try_send_event
my $ok = await $sse->try_send_json($data);
if (!$ok) {
# Client disconnected
}
Returns true on success, false on failure. Does not throw. Useful for broadcasting to multiple clients.
KEEPALIVE
keepalive
await $sse->keepalive(30); # Ping every 30 seconds
await $sse->keepalive(30, 'ping'); # Custom comment text
await $sse->keepalive(0); # Disable
Sends an sse.keepalive event to the server, which then handles sending periodic SSE comments to keep the connection alive and prevent proxy timeouts. The server manages the timer internally - this method is loop-agnostic.
ITERATION
each
# Simple iteration
await $sse->each(\@items, async sub {
my ($item) = @_;
await $sse->send_json($item);
});
# With transformer - return event spec
await $sse->each(\@items, async sub {
my ($item, $index) = @_;
return {
data => $item,
event => 'item',
id => $index,
};
});
# Coderef iterator
await $sse->each($iterator_sub, async sub { ... });
Iterates over items, calling callback for each. If callback returns a hashref, sends it as an event.
every
# Send metrics every 2 seconds
await $sse->every(2, async sub {
await $sse->send_event(
event => 'metrics',
data => get_current_metrics(),
);
});
Periodically executes a callback with a delay between iterations. The loop continues until the connection closes or the callback throws.
Requires Future::IO - this method will croak if Future::IO is not installed. Install with: cpanm Future::IO
Parameters:
$interval- Seconds between iterations (required, must be > 0)$callback- Async coderef to execute (required)
The callback is executed first, then the method sleeps for the interval before the next iteration. If the callback throws, the connection is closed and on_close callbacks are run.
EVENT CALLBACKS
on_close
$sse->on_close(sub {
my ($sse, $reason) = @_;
if ($reason eq 'client_closed') {
# Normal disconnect
cleanup_resources();
} else {
# Error condition
log_error("Unexpected disconnect: $reason");
}
});
Registers cleanup callback. Runs on disconnect or close(). Multiple callbacks run in registration order.
Callbacks receive two arguments:
$sse- The SSE connection object$reason- Why the connection closed (see "disconnect_reason")
on_error
$sse->on_error(sub {
my ($sse, $error) = @_;
warn "SSE error: $error";
});
Registers error callback.
EXAMPLE: LIVE DASHBOARD
async sub dashboard_sse {
my ($scope, $receive, $send) = @_;
my $sse = PAGI::SSE->new($scope, $receive, $send);
await $sse->keepalive(25);
# Send initial state
await $sse->send_event(
event => 'connected',
data => { time => time() },
);
# Subscribe to metrics
my $sub_id = subscribe_metrics(sub {
my ($metrics) = @_;
$sse->try_send_event(
event => 'metrics',
data => $metrics,
);
});
$sse->on_close(sub {
my ($sse, $reason) = @_;
unsubscribe_metrics($sub_id);
# Log abnormal disconnects for debugging
warn "SSE client disconnected: $reason"
unless $reason eq 'client_closed';
});
await $sse->run;
}
SEE ALSO
PAGI::WebSocket - Similar wrapper for WebSocket connections
PAGI::Server - PAGI protocol server
AUTHOR
PAGI Contributors