NAME
PAGI::WebSocket - Convenience wrapper for PAGI WebSocket connections
SYNOPSIS
use PAGI::WebSocket;
use Future::AsyncAwait;
# Simple echo server
async sub app {
my ($scope, $receive, $send) = @_;
my $ws = PAGI::WebSocket->new($scope, $receive, $send);
await $ws->accept;
await $ws->each_text(async sub {
my ($text) = @_;
await $ws->send_text("Echo: $text");
});
}
# JSON API with cleanup
async sub json_app {
my ($scope, $receive, $send) = @_;
my $ws = PAGI::WebSocket->new($scope, $receive, $send);
await $ws->accept(subprotocol => 'json');
my $user_id = generate_id();
# Cleanup runs on any disconnect
$ws->on_close(async sub {
my ($code, $reason) = @_;
await remove_user($user_id);
log_disconnect($user_id, $code);
});
await $ws->each_json(async sub {
my ($data) = @_;
if ($data->{type} eq 'ping') {
await $ws->send_json({ type => 'pong' });
}
});
}
# Callback-based style (alternative to iteration)
async sub callback_app {
my ($scope, $receive, $send) = @_;
my $ws = PAGI::WebSocket->new($scope, $receive, $send);
await $ws->accept;
$ws->stash->{user} = 'anonymous';
$ws->on(message => sub {
my ($data) = @_;
$ws->send_text("Echo: $data");
});
$ws->on(error => sub {
my ($error) = @_;
warn "WebSocket error: $error";
});
$ws->on(close => sub {
print "User disconnected\n";
});
await $ws->run;
}
DESCRIPTION
PAGI::WebSocket wraps the raw PAGI WebSocket protocol to provide a clean, high-level API inspired by Starlette's WebSocket class. It eliminates protocol boilerplate and provides:
Typed send/receive methods (text, bytes, JSON)
Connection state tracking (is_connected, is_closed, close_code)
Cleanup and error callback registration (on_close, on_error)
Safe send methods for broadcast scenarios (try_send_*, send_*_if_connected)
Message iteration helpers (each_text, each_json)
Callback-based event handling (on, run)
Per-connection storage (stash)
Timeout support for receives
CONSTRUCTOR
new
my $ws = PAGI::WebSocket->new($scope, $receive, $send);
Creates a new WebSocket wrapper. Requires:
$scope- PAGI scope hashref withtype ='websocket'>$receive- Async coderef returning Futures for events$send- Async coderef for sending events
Dies if scope type is not 'websocket'.
SCOPE ACCESSORS
scope, path, raw_path, query_string, scheme, http_version
my $path = $ws->path; # /chat/room1
my $qs = $ws->query_string; # token=abc
my $scheme = $ws->scheme; # ws or wss
Standard PAGI scope properties with sensible defaults.
subprotocols
my $protos = $ws->subprotocols; # ['chat', 'json']
Returns arrayref of requested subprotocols.
client, server
my $client = $ws->client; # ['192.168.1.1', 54321]
Client and server address info.
header, headers, header_all
my $origin = $ws->header('origin');
my $all_cookies = $ws->header_all('cookie');
my $hmv = $ws->headers; # Hash::MultiValue
Case-insensitive header access.
stash
$ws->stash->{user} = $user;
my $room = $ws->stash->{current_room};
Per-connection storage hashref. Useful for storing user data without external variables.
state
my $db = $ws->state->{db};
my $config = $ws->state->{config};
Application state hashref injected by PAGI::Lifespan. Read-only access to shared application state. Returns empty hashref if not set.
Note: This is separate from stash (per-connection data) and connection_state (internal WebSocket state).
path_param
my $id = $ws->path_param('id');
Returns a path parameter by name. Path parameters are captured from the URL path by a router and stored in $scope->{path_params}.
# Route: /chat/:room
my $room = $ws->path_param('room');
path_params
my $params = $ws->path_params; # { room => 'general', id => '42' }
Returns hashref of all path parameters from scope.
LIFECYCLE METHODS
accept
await $ws->accept;
await $ws->accept(subprotocol => 'chat');
await $ws->accept(headers => [['x-custom', 'value']]);
Accepts the WebSocket connection. Optionally specify a subprotocol to use and additional response headers.
close
await $ws->close;
await $ws->close(1000, 'Normal closure');
await $ws->close(4000, 'Custom reason');
Closes the connection. Default code is 1000 (normal closure). Idempotent - calling multiple times only sends close once.
STATE ACCESSORS
is_connected, is_closed, connection_state
if ($ws->is_connected) { ... }
if ($ws->is_closed) { ... }
my $state = $ws->connection_state; # 'connecting', 'connected', 'closed'
close_code, close_reason
my $code = $ws->close_code; # 1000, 1001, etc.
my $reason = $ws->close_reason; # 'Normal closure'
Available after connection closes. Defaults: code=1005, reason=''.
SEND METHODS
send_text, send_bytes, send_json
await $ws->send_text("Hello!");
await $ws->send_bytes("\x00\x01\x02");
await $ws->send_json({ action => 'greet', name => 'Alice' });
Send a message. Dies if connection is closed.
try_send_text, try_send_bytes, try_send_json
my $sent = await $ws->try_send_json($data);
if (!$sent) {
# Client disconnected
cleanup_user($id);
}
Returns true if sent, false if failed or closed. Does not throw. Useful for broadcasting to multiple clients.
send_text_if_connected, send_bytes_if_connected, send_json_if_connected
await $ws->send_json_if_connected($data);
Silent no-op if connection is closed. Useful for fire-and-forget.
RECEIVE METHODS
receive
my $event = await $ws->receive;
Returns raw PAGI event hashref, or undef on disconnect.
receive_text, receive_bytes
my $text = await $ws->receive_text;
my $bytes = await $ws->receive_bytes;
Waits for specific frame type, skipping others. Returns undef on disconnect.
receive_json
my $data = await $ws->receive_json;
Receives text frame and decodes as JSON. Dies on invalid JSON.
receive_with_timeout, receive_text_with_timeout, etc.
my $event = await $ws->receive_with_timeout(30); # 30 seconds
Returns undef on timeout (connection remains open).
ITERATION HELPERS
each_message, each_text, each_bytes, each_json
await $ws->each_text(async sub {
my ($text) = @_;
await $ws->send_text("Got: $text");
});
await $ws->each_json(async sub {
my ($data) = @_;
if ($data->{type} eq 'ping') {
await $ws->send_json({ type => 'pong' });
}
});
Loops until disconnect, calling callback for each message. Exceptions in callback propagate to caller.
EVENT CALLBACKS
on_close
# Simple sync callback
$ws->on_close(sub {
my ($code, $reason) = @_;
print "Disconnected: $code\n";
});
# Async callback for cleanup that needs await
$ws->on_close(async sub {
my ($code, $reason) = @_;
await cleanup_resources();
});
Registers cleanup callback that runs on disconnect or close(). Callbacks can be regular subs or async subs - async results are automatically awaited. Multiple callbacks run in registration order. Exceptions are caught and warned but don't prevent other callbacks.
on_error
$ws->on_error(sub {
my ($error) = @_;
warn "WebSocket error: $error";
});
Registers error callback. Called when exceptions occur in message handlers during run(). If no error handlers are registered, errors are warned to STDERR.
on_message, on
$ws->on_message(sub {
my ($data, $event) = @_;
# $data is text or bytes, $event is raw PAGI event
});
# Generic form (Socket.IO style)
$ws->on(message => sub { ... });
$ws->on(close => sub { ... });
$ws->on(error => sub { ... });
Registers message callback for use with run(). Multiple callbacks can be registered for each event type.
run
# Register callbacks first
$ws->on(message => sub { my ($data) = @_; ... });
$ws->on(close => sub { ... });
# Enter event loop
await $ws->run;
Callback-based event loop (alternative to each_* iteration). Runs until disconnect, dispatching messages to registered callbacks. Errors in callbacks are caught and passed to error handlers.
HEARTBEAT / KEEPALIVE
start_heartbeat
$ws->start_heartbeat(25); # Ping every 25 seconds
Starts sending periodic JSON ping messages to keep the connection alive. Useful for preventing proxy/NAT timeout on idle connections.
The ping message format is:
{ "type": "ping", "ts": <unix_timestamp> }
Common intervals:
Automatically stops when connection closes. Returns $self for chaining.
stop_heartbeat
$ws->stop_heartbeat;
Manually stops the heartbeat timer. Called automatically on connection close. Returns $self for chaining.
COMPLETE EXAMPLE
use PAGI::WebSocket;
use Future::AsyncAwait;
my %connections;
async sub chat_app {
my ($scope, $receive, $send) = @_;
my $ws = PAGI::WebSocket->new($scope, $receive, $send);
await $ws->accept;
my $user_id = generate_id();
$connections{$user_id} = $ws;
$ws->on_close(async sub {
delete $connections{$user_id};
await broadcast({ type => 'leave', user => $user_id });
});
await broadcast({ type => 'join', user => $user_id });
await $ws->each_json(async sub {
my ($data) = @_;
$data->{from} = $user_id;
await broadcast($data);
});
}
async sub broadcast {
my ($data) = @_;
for my $ws (values %connections) {
await $ws->try_send_json($data);
}
}
SEE ALSO
PAGI::Request - Similar convenience wrapper for HTTP requests
PAGI::Server - PAGI protocol server
AUTHOR
PAGI Contributors