NAME
PAGI::Context::SSE - SSE context with protocol operations
SYNOPSIS
use PAGI::Context;
use Future::AsyncAwait;
# Simple notification stream — $ctx is all you need
async sub notifications {
my ($scope, $receive, $send) = @_;
my $ctx = PAGI::Context->new($scope, $receive, $send);
await $ctx->keepalive(25);
# Replay missed events on reconnect
if (my $last_id = $ctx->last_event_id) {
for my $evt (get_events_since($last_id)) {
await $ctx->send_event(%$evt);
}
}
# Periodic metrics push
await $ctx->every(2, async sub {
await $ctx->send_event(
event => 'metrics',
data => get_metrics(),
);
});
}
# Event-dispatched with channels
async sub dashboard {
my ($scope, $receive, $send) = @_;
my $ctx = PAGI::Context->new($scope, $receive, $send);
await $ctx->start;
await $ctx->keepalive(25);
my $reason = await $ctx
->on('metrics.update', async sub {
my ($ctx, $event) = @_;
await $ctx->send_json($event);
})
->on('alert.fired', async sub {
my ($ctx, $event) = @_;
await $ctx->send_event(
event => 'alert',
data => $event,
);
})
->on_error(sub {
my ($ctx, $err, $source) = @_;
warn "[$source] $err";
})
->run;
}
DESCRIPTION
Returned by PAGI::Context->new(...) when $scope->{type} is 'sse'. Inherits all shared methods from PAGI::Context (scope accessors, stash, session, event dispatcher) and adds SSE protocol operations delegated to PAGI::SSE.
This means handler code can use a single $ctx object for everything: dispatching events with on()/run(), sending SSE events with send_json() or send_event(), and reading query params with query_param().
Underlying object access
my $sse = $ctx->sse;
The underlying PAGI::SSE object is still available if you need direct access. In most cases you should not need it.
Methods NOT delegated
The following PAGI::SSE methods are not available on $ctx. They are intentionally omitted because PAGI::Context has its own versions with different semantics:
on()— On$ctx, this is the generic event dispatcher ("on" in PAGI::Context) that accepts any event type string. On PAGI::SSE, it only dispatchescloseanderror.on_error()— On$ctx, callbacks receive($ctx, $error, $source). On PAGI::SSE, they receive($sse, $error).on_close()— PAGI::SSE-specific callback registration. Use$ctx->on('sse.disconnect', ...)instead.run()— On$ctx, this is the generic event dispatch loop ("run" in PAGI::Context). On PAGI::SSE, it only waits for disconnect.
CONNECTION LIFECYCLE
start
await $ctx->start;
await $ctx->start(status => 200, headers => [...]);
Starts the SSE stream. Called automatically on first send. Idempotent — only sends sse.start once.
close
$ctx->close;
Marks connection as closed and runs on_close callbacks.
SEND METHODS
send
await $ctx->send("Hello world");
Sends a data-only SSE event.
send_json
await $ctx->send_json({ type => 'update', data => $payload });
JSON-encodes data before sending.
send_event
await $ctx->send_event(
data => $data, # Required (auto JSON-encodes refs)
event => 'notification', # Optional event type
id => 'msg-123', # Optional event ID
retry => 5000, # Optional reconnect hint (ms)
);
Sends a full SSE event with all fields.
send_comment
await $ctx->send_comment('keepalive');
Sends an SSE comment (does not trigger onmessage in browsers).
try_send, try_send_json, try_send_comment, try_send_event
my $ok = await $ctx->try_send_json($data);
Returns true on success, false on failure. Does not throw.
ITERATION HELPERS
each
await $ctx->each(\@items, async sub {
my ($item, $index) = @_;
await $ctx->send_json($item);
});
Iterates over items (arrayref or coderef iterator), calling callback for each.
every
await $ctx->every(2, async sub {
await $ctx->send_json(get_metrics());
});
Periodic callback execution with interval delay. Requires Future::IO.
STATE INSPECTION
is_started
if ($ctx->is_started) { ... }
True after start() or first send.
is_closed
if ($ctx->is_closed) { ... }
True after close or disconnect.
last_event_id
my $id = $ctx->last_event_id;
Returns the Last-Event-ID header sent by reconnecting clients.
QUERY PARAMETERS
query_param
my $value = $ctx->query_param('channel');
my $value = $ctx->query_param('channel', strict => 1);
Returns a single query parameter value.
query_params
my $params = $ctx->query_params; # Hash::MultiValue
All query parameters as Hash::MultiValue.
raw_query_param, raw_query_params
my $val = $ctx->raw_query_param('name');
my $params = $ctx->raw_query_params;
Skip UTF-8 decoding.
HEADER EXTRAS
header_all
my @accepts = $ctx->header_all('accept');
All values for a multi-value header.
http_version
my $ver = $ctx->http_version; # '1.1' or '2'
KEEPALIVE
keepalive
await $ctx->keepalive(25); # Comment every 25s
await $ctx->keepalive(25, 'ping'); # Custom comment text
Enables SSE keepalive comments for proxy compatibility.
SEE ALSO
PAGI::Context, PAGI::SSE, PAGI::Context::WebSocket, PAGI::Context::HTTP
1 POD Error
The following errors were encountered while parsing the POD:
- Around line 77:
Non-ASCII character seen before =encoding in '—'. Assuming UTF-8