NAME
PAGI::SSE - Convenience wrapper for PAGI Server-Sent Events connections
SYNOPSIS
use PAGI::SSE;
use Future::AsyncAwait;
# Simple notification stream
async sub app {
    my ($scope, $receive, $send) = @_;
    my $sse = PAGI::SSE->new($scope, $receive, $send);
    # Enable keepalive for proxy compatibility
    await $sse->keepalive(25);
    # Per-connection state
    use PAGI::Stash;
    my $stash = PAGI::Stash->new($sse);
    # Cleanup on disconnect - with reason for logging
    $sse->on_close(sub {
        my ($sse, $reason) = @_;
        remove_subscriber($stash->get('sub_id'));
        log_disconnect($reason); # 'client_closed', 'write_error', etc.
    });
    # Handle reconnection
    if (my $last_id = $sse->last_event_id) {
        my @missed = get_events_since($last_id);
        for my $event (@missed) {
            await $sse->send_event(%$event);
        }
    }
    # Subscribe to updates
    $stash->set(sub_id => add_subscriber(sub {
        my ($event) = @_;
        $sse->try_send_json($event);
    }));
    # Wait for disconnect
    await $sse->run;
}
DESCRIPTION
PAGI::SSE wraps the raw PAGI SSE protocol to provide a clean, high-level API inspired by Starlette. It eliminates protocol boilerplate and provides:
Multiple send methods (send, send_json, send_event)
Connection state tracking (is_started, is_closed)
Cleanup callback registration (on_close)
Safe send methods for broadcast scenarios (try_send_*)
Reconnection support (last_event_id)
Keepalive timer for proxy compatibility
Iteration helper (each)
Per-connection storage (via PAGI::Stash)
CONSTRUCTOR
new
my $sse = PAGI::SSE->new($scope, $receive, $send);
Creates a new SSE wrapper. Requires:
$scope - PAGI scope hashref with type => 'sse'
$receive - Async coderef returning Futures for events
$send - Async coderef for sending events
Dies if scope type is not 'sse'.
Singleton pattern: The SSE object is cached in $scope->{'pagi.sse'}. If you call new() multiple times with the same scope, you get the same SSE object back. This ensures consistent state (is_started, is_closed, callbacks) across multiple code paths that may create SSE objects from the same scope.
SCOPE ACCESSORS
scope, path, raw_path, query_string, scheme, http_version
my $path = $sse->path; # /events
my $qs = $sse->query_string; # token=abc
header, headers, header_all
my $auth = $sse->header('authorization');
my @cookies = $sse->header_all('cookie');
last_event_id
my $id = $sse->last_event_id; # From Last-Event-ID header
Returns the Last-Event-ID header sent by reconnecting clients. Use this to replay missed events.
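Replaying missed events needs a store that can answer "what came after this ID?". The following is a minimal sketch of such a store using only core Perl. It is not part of PAGI::SSE: record_event, get_events_since, @EVENT_LOG, and the retention limit of 100 are all hypothetical names and values chosen for illustration, and the sketch assumes event IDs are monotonically increasing integers.

```perl
use strict;
use warnings;

# Hypothetical in-memory replay buffer (not provided by PAGI::SSE).
my @EVENT_LOG;            # oldest first; each entry is { id => N, ... }
my $MAX_EVENTS = 100;     # assumed retention limit
my $NEXT_ID    = 1;       # assumed: ids are increasing integers

# Store an event, assign it the next id, and trim the oldest entries.
sub record_event {
    my (%event) = @_;
    my $stored = { %event, id => $NEXT_ID++ };
    push @EVENT_LOG, $stored;
    shift @EVENT_LOG while @EVENT_LOG > $MAX_EVENTS;
    return $stored;
}

# Return all stored events newer than the id the client last saw.
# A missing or non-numeric id is treated as "send everything retained".
sub get_events_since {
    my ($last_id) = @_;
    return @EVENT_LOG unless defined $last_id && $last_id =~ /\A\d+\z/;
    return grep { $_->{id} > $last_id } @EVENT_LOG;
}
```

Each returned hashref carries an id key, so a handler could pass it to send_event as %$event, as the SYNOPSIS does. A multi-worker deployment would need a shared store rather than a per-process array.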
Per-Connection Shared State
See PAGI::Stash for per-connection shared state:
use PAGI::Stash;
my $stash = PAGI::Stash->new($sse);
path_param
my $channel = $sse->path_param('channel');
Returns a path parameter by name. Path parameters are captured from the URL path by a router and stored in $scope->{path_params}.
path_params
my $params = $sse->path_params;
Returns a hashref of all path parameters from the scope.
query_params
my $params = $sse->query_params;
my $params = $sse->query_params(strict => 1);
my $params = $sse->query_params(raw => 1);
Returns query string parameters as a Hash::MultiValue. Handles URL decoding and UTF-8 decoding automatically.
Options:
strict => 1 - Croak on invalid UTF-8 sequences instead of replacing them with the substitution character.
raw => 1 - Skip UTF-8 decoding and return the raw bytes after URL decoding.
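To make the difference between the default and strict behaviours concrete, here is a small sketch using the core Encode module. Whether PAGI::SSE uses Encode internally is an assumption; the snippet only illustrates the two decoding policies described above, applied to an already URL-decoded byte string.

```perl
use strict;
use warnings;
use Encode qw(decode);

# Bytes as they might look after URL decoding: a valid UTF-8 "é" (C3 A9)
# followed by a byte (FF) that can never appear in UTF-8.
my $bytes = "caf\xC3\xA9 \xFF";

# Default policy: malformed input is replaced with U+FFFD.
my $lenient = decode('UTF-8', $bytes);

# Strict policy: malformed input raises an exception.
# LEAVE_SRC keeps decode() from modifying $bytes in place.
my $strict_ok = eval {
    decode('UTF-8', $bytes, Encode::FB_CROAK() | Encode::LEAVE_SRC());
    1;
};

# The "raw" policy corresponds to using $bytes as-is, with no decode step.
```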
raw_query_params
my $params = $sse->raw_query_params;
Shortcut for query_params(raw => 1).
query_param
my $value = $sse->query_param('name');
my $value = $sse->query_param('name', strict => 1);
Returns a single query parameter value by name. Accepts same options as query_params.
raw_query_param
my $value = $sse->raw_query_param('name');
Shortcut for query_param($name, raw => 1).
state
my $state = $sse->state;
my $db = $sse->state->{db};
Returns the application state hashref injected by PAGI::Lifespan. This contains worker-level shared state such as database connections and configuration. Returns an empty hashref if no state was injected.
LIFECYCLE METHODS
start
await $sse->start;
await $sse->start(status => 200, headers => [...]);
Starts the SSE stream. Called automatically on first send. Idempotent - only sends sse.start once.
close
$sse->close;
Marks the connection as closed and runs the on_close callbacks.
run
await $sse->run;
Waits for client disconnect. Use this at the end of your handler to keep the connection open.
CONNECTION STATE ACCESSORS
is_started, is_closed, connection_state
if ($sse->is_started) { ... }
if ($sse->is_closed) { ... }
my $state = $sse->connection_state; # 'pending', 'started', 'closed'
disconnect_reason
my $reason = $sse->disconnect_reason;
Returns the reason for disconnect, if available. Common values:
client_closed - Client closed connection normally
write_error - Failed to write data (network error)
send_timeout - Send operation timed out
idle_timeout - Connection closed due to inactivity
Returns undef if connection is still open or reason is unknown.
SEND METHODS
send
await $sse->send("Hello world");
Sends a data-only event.
send_json
await $sse->send_json({ type => 'update', data => $payload });
JSON-encodes data before sending.
send_event
await $sse->send_event(
    data => $data, # Required (auto JSON-encodes refs)
    event => 'notification', # Optional event type
    id => 'msg-123', # Optional event ID
    retry => 5000, # Optional reconnect hint (ms)
);
Sends a full SSE event with all fields.
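For orientation, the fields above correspond to the lines of the standard text/event-stream wire format that the client eventually receives. The sketch below shows that mapping. It is purely illustrative: format_sse_event is a hypothetical helper, not PAGI::SSE's or the server's actual code, and it assumes the data has already been serialized to a string.

```perl
use strict;
use warnings;

# Hypothetical formatter for one text/event-stream event.
# Not the module's implementation; shown only to illustrate the format.
sub format_sse_event {
    my (%f) = @_;
    my $out = '';
    $out .= "event: $f{event}\n" if defined $f{event};
    $out .= "id: $f{id}\n"       if defined $f{id};
    $out .= "retry: $f{retry}\n" if defined $f{retry};

    # Every line of the payload needs its own "data:" field.
    my @lines = split /\r\n|\r|\n/, ($f{data} // ''), -1;
    @lines = ('') unless @lines;    # empty payload still emits one data line
    $out .= "data: $_\n" for @lines;

    return $out . "\n";             # a blank line terminates the event
}

print format_sse_event(
    event => 'notification',
    id    => 'msg-123',
    retry => 5000,
    data  => "first line\nsecond line",
);
```

Multi-line payloads are the main reason to rely on a formatter instead of concatenating strings by hand: a bare newline inside the data would otherwise end the field early.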
try_send, try_send_json, try_send_event
my $ok = await $sse->try_send_json($data);
if (!$ok) {
    # Client disconnected
}
Returns true on success, false on failure. Does not throw. Useful for broadcasting to multiple clients.
KEEPALIVE
keepalive
await $sse->keepalive(30); # Ping every 30 seconds
await $sse->keepalive(30, 'ping'); # Custom comment text
await $sse->keepalive(0); # Disable
Sends an sse.keepalive event to the server, which then handles sending periodic SSE comments to keep the connection alive and prevent proxy timeouts. The server manages the timer internally - this method is loop-agnostic.
ITERATION
each
# Simple iteration
await $sse->each(\@items, async sub {
    my ($item) = @_;
    await $sse->send_json($item);
});
# With transformer - return event spec
await $sse->each(\@items, async sub {
    my ($item, $index) = @_;
    return {
        data => $item,
        event => 'item',
        id => $index,
    };
});
# Coderef iterator
await $sse->each($iterator_sub, async sub { ... });
Iterates over the items, calling the callback for each one. If the callback returns a hashref, it is sent as an event.
every
# Send metrics every 2 seconds
await $sse->every(2, async sub {
    await $sse->send_event(
        event => 'metrics',
        data => get_current_metrics(),
    );
});
Periodically executes a callback with a delay between iterations. The loop continues until the connection closes or the callback throws.
Requires Future::IO - this method will croak if Future::IO is not installed and configured. You must configure Future::IO in your app before using every().
For apps running under PAGI::Server (using pagi-server):
# app.pl
use Future::IO::Impl::IOAsync;
# ... rest of your app
For other setups, load the Future::IO implementation that matches your event loop:
# If using IO::Async (PAGI::Server)
use Future::IO::Impl::IOAsync;
# If using UV
use Future::IO::Impl::UV;
Parameters:
$interval - Seconds between iterations (required, must be > 0)
$callback - Async coderef to execute (required)
The callback is executed first, then the method sleeps for the interval before the next iteration. If the callback throws, the connection is closed and on_close callbacks are run.
EVENT CALLBACKS
on_close
$sse->on_close(sub {
    my ($sse, $reason) = @_;
    if ($reason eq 'client_closed') {
        cleanup_resources();
    } else {
        log_error("Unexpected disconnect: $reason");
    }
});
# Async callback: return value is awaited automatically
$sse->on_close(async sub {
    my ($sse, $reason) = @_;
    await cleanup_async($reason);
});
Registers a cleanup callback that runs on disconnect or close(). Callbacks can be regular subs or async subs; async results are awaited automatically. Multiple callbacks run in registration order. Exceptions are caught and warned about, but do not prevent the remaining callbacks from running.
Callbacks receive two arguments:
$sse - The SSE connection object (same as $self)
$reason - Why the connection closed (see "disconnect_reason")
Returns $self for chaining.
Circular reference note: Avoid capturing the outer $sse variable in your callback's closure. Use the $sse argument passed to the callback instead: it is the same object, but does not create an additional reference cycle. If you must capture it, use Scalar::Util::weaken:
use Scalar::Util qw(weaken);
my $weak_sse = $sse;
weaken($weak_sse);
$sse->on_close(sub { $weak_sse->... if $weak_sse });
The callback arrays are cleared after firing, so any cycle via a closure is broken at connection close, but weaken prevents the object from being kept alive until that point.
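The effect of a captured reference can be observed with a toy object. The sketch below uses only core Perl. ToyConn is a hypothetical stand-in that stores callbacks the way a connection object might; it is not PAGI::SSE and it never fires or clears its callbacks, so the strong-closure case stays alive for the rest of the program.

```perl
use strict;
use warnings;
use Scalar::Util qw(weaken);

# Hypothetical stand-in for a connection object that stores callbacks.
package ToyConn {
    my $destroyed = 0;
    sub new       { return bless { on_close => [] }, shift }
    sub on_close  { my ($self, $cb) = @_; push @{ $self->{on_close} }, $cb; return $self }
    sub DESTROY   { $destroyed++ }
    sub destroyed { return $destroyed }
}

# The closure captures $conn strongly: object -> callback -> object.
sub register_strong {
    my $conn = ToyConn->new;
    $conn->on_close(sub { $conn->{seen} = 1 });
    return;
}

# The closure captures only a weakened copy, so no cycle is formed.
sub register_weak {
    my $conn = ToyConn->new;
    my $weak = $conn;
    weaken($weak);
    $conn->on_close(sub { $weak->{seen} = 1 if $weak });
    return;
}

register_strong();
print "after strong capture: ", ToyConn::destroyed(), " destroyed\n";
register_weak();
print "after weak capture:   ", ToyConn::destroyed(), " destroyed\n";
```

In the strong case the object outlives the sub that created it because the stored closure still references it. In the weak case it is destroyed as soon as the last strong reference goes out of scope.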
on_error
$sse->on_error(sub {
    my ($sse, $error) = @_;
    warn "SSE error: $error";
});
# Async callback: return value is awaited automatically
$sse->on_error(async sub {
    my ($sse, $error) = @_;
    await log_error_async($error);
});
Registers an error callback, called when a try_send* method fails (e.g., because the client disconnected mid-write). Callbacks can be regular subs or async subs; async results are awaited automatically. Multiple callbacks run in registration order. Exceptions are caught and warned about, but do not prevent the remaining callbacks from running.
Callbacks receive two arguments:
$sse - The SSE connection object
$error - The error string
If no error handlers are registered, the error is warned to STDERR.
Returns $self for chaining.
on
$sse->on(close => sub { my ($sse, $reason) = @_; ... });
$sse->on(error => sub { my ($sse, $error) = @_; ... });
# Chaining
$sse->on(close => sub { ... })
    ->on(error => sub { ... });
Generic event dispatcher. Dispatches to on_close or on_error based on the event name. Dies if an unknown event name is given.
Returns $self for chaining.
EXAMPLE: LIVE DASHBOARD
async sub dashboard_sse {
    my ($scope, $receive, $send) = @_;
    my $sse = PAGI::SSE->new($scope, $receive, $send);
    await $sse->keepalive(25);
    # Send initial state
    await $sse->send_event(
        event => 'connected',
        data => { time => time() },
    );
    # Subscribe to metrics
    my $sub_id = subscribe_metrics(sub {
        my ($metrics) = @_;
        $sse->try_send_event(
            event => 'metrics',
            data => $metrics,
        );
    });
    $sse->on_close(sub {
        my ($sse, $reason) = @_;
        unsubscribe_metrics($sub_id);
        # Log abnormal disconnects for debugging
        warn "SSE client disconnected: $reason"
            unless $reason eq 'client_closed';
    });
    await $sse->run;
}
SEE ALSO
PAGI::WebSocket - Similar wrapper for WebSocket connections
PAGI::Server - PAGI protocol server
AUTHOR
PAGI Contributors