NAME
PAGI::SSE - Convenience wrapper for PAGI Server-Sent Events connections
SYNOPSIS
use PAGI::SSE;
use Future::AsyncAwait;
# Simple notification stream
async sub app {
my ($scope, $receive, $send) = @_;
my $sse = PAGI::SSE->new($scope, $receive, $send);
# Enable keepalive for proxy compatibility
$sse->keepalive(25);
# Cleanup on disconnect
$sse->on_close(sub {
remove_subscriber($sse->stash->{sub_id});
});
# Handle reconnection
if (my $last_id = $sse->last_event_id) {
my @missed = get_events_since($last_id);
for my $event (@missed) {
await $sse->send_event(%$event);
}
}
# Subscribe to updates
$sse->stash->{sub_id} = add_subscriber(sub {
my ($event) = @_;
$sse->try_send_json($event);
});
# Wait for disconnect
await $sse->run;
}
DESCRIPTION
PAGI::SSE wraps the raw PAGI SSE protocol to provide a clean, high-level API inspired by Starlette. It eliminates protocol boilerplate and provides:
Multiple send methods (send, send_json, send_event)
Connection state tracking (is_started, is_closed)
Cleanup callback registration (on_close)
Safe send methods for broadcast scenarios (try_send_*)
Reconnection support (last_event_id)
Keepalive timer for proxy compatibility
Iteration helper (each)
Per-connection storage (stash)
CONSTRUCTOR
new
my $sse = PAGI::SSE->new($scope, $receive, $send);
Creates a new SSE wrapper. Requires:
$scope- PAGI scope hashref withtype ='sse'>$receive- Async coderef returning Futures for events$send- Async coderef for sending events
Dies if scope type is not 'sse'.
SCOPE ACCESSORS
scope, path, raw_path, query_string, scheme, http_version
my $path = $sse->path; # /events
my $qs = $sse->query_string; # token=abc
header, headers, header_all
my $auth = $sse->header('authorization');
my @cookies = $sse->header_all('cookie');
last_event_id
my $id = $sse->last_event_id; # From Last-Event-ID header
Returns the Last-Event-ID header sent by reconnecting clients. Use this to replay missed events.
stash
$sse->stash->{client_id} = $id;
my $user = $sse->stash->{user};
Returns the per-request stash hashref. The stash lives in the request scope and is shared across all middleware, handlers, and subrouters processing the same request.
Note: For worker-level state (database connections, config), use $sse->state to access application state injected by PAGI::Lifespan.
path_param
my $channel = $sse->path_param('channel');
Returns a path parameter by name. Path parameters are captured from the URL path by a router and stored in $scope->{path_params}.
path_params
my $params = $sse->path_params;
Returns hashref of all path parameters from scope.
state
my $state = $sse->state;
my $db = $sse->state->{db};
Returns the application state hashref injected by PAGI::Lifespan. This contains worker-level shared state like database connections and configuration. Returns empty hashref if no state was injected.
LIFECYCLE METHODS
start
await $sse->start;
await $sse->start(status => 200, headers => [...]);
Starts the SSE stream. Called automatically on first send. Idempotent - only sends sse.start once.
close
$sse->close;
Marks connection as closed and runs on_close callbacks.
run
await $sse->run;
Waits for client disconnect. Use this at the end of your handler to keep the connection open.
CONNECTION STATE ACCESSORS
is_started, is_closed, connection_state
if ($sse->is_started) { ... }
if ($sse->is_closed) { ... }
my $state = $sse->connection_state; # 'pending', 'started', 'closed'
SEND METHODS
send
await $sse->send("Hello world");
Sends a data-only event.
send_json
await $sse->send_json({ type => 'update', data => $payload });
JSON-encodes data before sending.
send_event
await $sse->send_event(
data => $data, # Required (auto JSON-encodes refs)
event => 'notification', # Optional event type
id => 'msg-123', # Optional event ID
retry => 5000, # Optional reconnect hint (ms)
);
Sends a full SSE event with all fields.
try_send, try_send_json, try_send_event
my $ok = await $sse->try_send_json($data);
if (!$ok) {
# Client disconnected
}
Returns true on success, false on failure. Does not throw. Useful for broadcasting to multiple clients.
KEEPALIVE
keepalive
$sse->keepalive(30); # Ping every 30 seconds
$sse->keepalive(30, ':ping'); # Custom comment text
$sse->keepalive(0); # Disable
Sends periodic comment pings to prevent proxy timeouts. Requires an event loop (auto-created if needed).
ITERATION
each
# Simple iteration
await $sse->each(\@items, async sub {
my ($item) = @_;
await $sse->send_json($item);
});
# With transformer - return event spec
await $sse->each(\@items, async sub {
my ($item, $index) = @_;
return {
data => $item,
event => 'item',
id => $index,
};
});
# Coderef iterator
await $sse->each($iterator_sub, async sub { ... });
Iterates over items, calling callback for each. If callback returns a hashref, sends it as an event.
every
await $sse->every(1, async sub {
await $sse->send_event(
event => 'tick',
data => { ts => time },
);
});
Calls the callback every $interval seconds until client disconnects. Useful for periodic updates.
EVENT CALLBACKS
on_close
$sse->on_close(sub {
my ($sse) = @_;
cleanup_resources();
});
Registers cleanup callback. Runs on disconnect or close(). Multiple callbacks run in registration order.
on_error
$sse->on_error(sub {
my ($sse, $error) = @_;
warn "SSE error: $error";
});
Registers error callback.
EXAMPLE: LIVE DASHBOARD
async sub dashboard_sse {
my ($scope, $receive, $send) = @_;
my $sse = PAGI::SSE->new($scope, $receive, $send);
$sse->keepalive(25);
# Send initial state
await $sse->send_event(
event => 'connected',
data => { time => time() },
);
# Subscribe to metrics
my $sub_id = subscribe_metrics(sub {
my ($metrics) = @_;
$sse->try_send_event(
event => 'metrics',
data => $metrics,
);
});
$sse->on_close(sub {
unsubscribe_metrics($sub_id);
});
await $sse->run;
}
SEE ALSO
PAGI::WebSocket - Similar wrapper for WebSocket connections
PAGI::Server - PAGI protocol server
AUTHOR
PAGI Contributors