NAME
PAGI::Test::SSE - Server-Sent Events connection for testing PAGI applications
SYNOPSIS
use PAGI::Test::Client;
my $client = PAGI::Test::Client->new(app => $sse_app);
# Callback style (auto-close)
$client->sse('/events', sub {
    my ($sse) = @_;
    my $event = $sse->receive_event;
    is $event->{event}, 'connected';
    is $event->{data}, '{"subscriber_id":1}';
});
# Explicit style
my $sse = $client->sse('/events');
my $event = $sse->receive_event;
is $event->{event}, 'update';
$sse->close;
# JSON convenience
my $sse = $client->sse('/events');
my $data = $sse->receive_json;
is $data->{subscriber_id}, 1;
$sse->close;
DESCRIPTION
PAGI::Test::SSE provides a test client for Server-Sent Events (SSE) connections in PAGI applications. It handles the SSE protocol handshake and event reception, making it easy to test SSE endpoints without starting a real server.
This module is typically used via PAGI::Test::Client's sse method rather than directly.
SSE is a unidirectional protocol where the server sends events to the client. Unlike WebSocket, the client cannot send messages back (except for disconnect).
This module is a simplified in-process model of an SSE connection. It is well-suited to application-level event testing, but it does not emulate transport timing, buffering, or wire-format behavior.
CONSTRUCTOR
new
my $sse = PAGI::Test::SSE->new(
    app   => $app,    # Required: PAGI app coderef
    scope => $scope,  # Required: SSE scope hashref
);
Creates a new SSE test connection. Typically you don't call this directly; use PAGI::Test::Client's sse method instead.
METHODS
receive_event
my $event = $sse->receive_event;
my $event = $sse->receive_event(timeout => 10);
Waits for and returns the next event from the server. Returns a hashref with the following fields:
- event: The event type (optional). If not specified in the server message, this will be undef.
- data: The event data (required). This is the raw string data sent by the server.
- id: The event ID (optional). Can be used for reconnection logic.
- retry: The retry time in milliseconds (optional). Indicates how long the client should wait before reconnecting.
Returns undef if the connection is closed. Throws an exception if timeout is reached.
Current limitation: this method does not actually block or wait for the timeout duration. If no queued SSE event is immediately available, it throws an exception right away.
Example:
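my $event = $sse->receive_event;
# $event is undef once the connection is closed, and the event type is
# optional, so guard both before comparing.
if ($event && ($event->{event} // '') eq 'update') {
    say "Received update: $event->{data}";
}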
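Because receive_event currently throws as soon as no queued event is available, a test can collect everything the app has already sent by reading until the call dies or returns undef. The following is a minimal sketch under that assumption. The drain_events helper and the Local::QueueSSE stand-in are hypothetical names used only to keep the example self-contained; they are not part of PAGI.

```perl
use strict;
use warnings;

# Hypothetical stand-in for a PAGI::Test::SSE object so the sketch runs on
# its own: it returns queued events and dies when the queue is empty.
package Local::QueueSSE {
    sub new {
        my ($class, @events) = @_;
        return bless { queue => [@events] }, $class;
    }
    sub receive_event {
        my ($self) = @_;
        die "no SSE event available\n" unless @{ $self->{queue} };
        return shift @{ $self->{queue} };
    }
}

# Hypothetical helper: read events until receive_event dies (nothing queued)
# or returns undef (connection closed).
sub drain_events {
    my ($sse) = @_;
    my @events;
    while (1) {
        my $event = eval { $sse->receive_event };
        last unless defined $event;
        push @events, $event;
    }
    return @events;
}

my $sse = Local::QueueSSE->new(
    { event => 'connected', data => '{"subscriber_id":1}' },
    { event => 'update',    data => '{"count":42}', id => 'msg-1' },
);
my @events = drain_events($sse);
print scalar(@events), " events drained\n";   # prints "2 events drained"
```

Note that the eval also swallows exceptions unrelated to an empty queue, so a real test may want to inspect $@ before treating the failure as "no more events".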
receive_json
my $data = $sse->receive_json;
my $data = $sse->receive_json(timeout => 10);
Waits for an event, extracts the data field, decodes it as JSON, and returns the resulting Perl data structure. Dies if the data is not valid JSON.
This is a convenience method equivalent to:
my $event = $sse->receive_event;
my $data = decode_json($event->{data});
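When a test needs to distinguish a closed connection from a malformed payload, the two steps can be performed by hand. The sketch below assumes JSON::PP (a core module) as the decoder; which JSON module PAGI::Test::SSE uses internally is not stated here, and decode_event_data is a hypothetical helper name.

```perl
use strict;
use warnings;
use JSON::PP qw(decode_json);

# Hypothetical helper: decode the data field of an event hashref.
# Returns undef for an undef event (closed connection) and dies with a
# descriptive message when the payload is not valid JSON.
sub decode_event_data {
    my ($event) = @_;
    return undef unless defined $event;
    my $data;
    my $ok = eval { $data = decode_json($event->{data}); 1 };
    die "SSE event data is not valid JSON: $@" unless $ok;
    return $data;
}

my $data = decode_event_data(
    { event => 'connected', data => '{"subscriber_id":1}' },
);
print "subscriber_id = $data->{subscriber_id}\n";   # prints "subscriber_id = 1"
```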
Example:
my $data = $sse->receive_json;
is $data->{subscriber_id}, 1;
LIMITATIONS
This helper does not simulate real SSE wire formatting, chunking, buffering, or transport timing.
The receive timeout arguments are advisory only at present; receive methods check the current queue immediately rather than waiting asynchronously.
For keepalive behavior, disconnect timing, or full transport-level testing, test against PAGI::Server.
close
$sse->close;
Closes the SSE connection. This sends an sse.disconnect event to the application, allowing it to clean up resources.
is_closed
if ($sse->is_closed) {
    say "Connection closed";
}
Returns true if the SSE connection has been closed.
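With the explicit style, a failing assertion or an exception can skip the call to close. One way to guard against that is a small wrapper that always closes the connection, sketched below. The with_sse helper and the Local::ClosableSSE stand-in are hypothetical and exist only so the example runs without PAGI; the wrapper relies on nothing beyond the close and is_closed methods described above.

```perl
use strict;
use warnings;

# Hypothetical stand-in exposing only close and is_closed.
package Local::ClosableSSE {
    sub new       { my ($class) = @_; return bless { closed => 0 }, $class }
    sub close     { my ($self) = @_; $self->{closed} = 1; return }
    sub is_closed { my ($self) = @_; return $self->{closed} }
}

# Hypothetical helper: run $code with the connection, close the connection
# afterwards whether or not $code died, then rethrow any error.
sub with_sse {
    my ($sse, $code) = @_;
    my $ok  = eval { $code->($sse); 1 };
    my $err = $@;
    $sse->close unless $sse->is_closed;
    die $err unless $ok;
    return;
}

my $sse = Local::ClosableSSE->new;
eval { with_sse($sse, sub { die "assertion failed\n" }) };
print "closed after failure: ", ($sse->is_closed ? "yes" : "no"), "\n";
# prints "closed after failure: yes"
```

The callback style of PAGI::Test::Client's sse method already closes automatically, so a wrapper like this is only relevant when the connection object is managed by hand.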
INTERNAL METHODS
_start
$sse->_start;
Internal method called by PAGI::Test::Client to start the SSE connection, send the initial scope to the app, and wait for the sse.start event.
SSE PROTOCOL
This module implements the PAGI SSE protocol:
1. App sends sse.start event with status and headers
2. App sends sse.send events with event/data/id/retry fields
3. Test sends sse.disconnect event when connection is closed
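The ordering of these messages can be expressed as a small check over a list of event type names. This is only an illustrative sketch: is_valid_sse_sequence is a hypothetical helper, and treating sse.send as "zero or more" and sse.disconnect as optional is an assumption rather than something the protocol description above states.

```perl
use strict;
use warnings;

# Hypothetical checker: true when the type names follow the order
# sse.start, then zero or more sse.send, then an optional sse.disconnect.
sub is_valid_sse_sequence {
    my (@types) = @_;
    my $joined = join ' ', @types;
    return $joined =~ /\Asse\.start(?: sse\.send)*(?: sse\.disconnect)?\z/ ? 1 : 0;
}

for my $sequence (
    [qw(sse.start sse.send sse.send sse.disconnect)],
    [qw(sse.send sse.start)],
) {
    my $verdict = is_valid_sse_sequence(@$sequence) ? 'valid' : 'invalid';
    print join(', ', @$sequence), ": $verdict\n";
}
```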
EXAMPLE
use Test2::V0;
use PAGI::Test::Client;
use Future::AsyncAwait;
# Simple SSE app that sends a few events
my $sse_app = async sub {
    my ($scope, $receive, $send) = @_;
    die "Expected sse scope" unless $scope->{type} eq 'sse';
    await $send->({
        type => 'sse.start',
        status => 200,
        headers => [],
    });
    await $send->({
        type => 'sse.send',
        event => 'connected',
        data => '{"subscriber_id":1}',
    });
    await $send->({
        type => 'sse.send',
        event => 'update',
        data => '{"count":42}',
        id => 'msg-1',
    });
};
# Test it
my $client = PAGI::Test::Client->new(app => $sse_app);
$client->sse('/events', sub {
    my ($sse) = @_;
    my $event1 = $sse->receive_event;
    is $event1->{event}, 'connected', 'first event type';
    my $event2 = $sse->receive_event;
    is $event2->{event}, 'update', 'second event type';
    is $event2->{id}, 'msg-1', 'event id';
});
SEE ALSO
PAGI::Test::Client, PAGI::Test::Response, PAGI::Test::WebSocket
AUTHOR
PAGI Contributors