NAME
PAGI::WebSocket - Convenience wrapper for PAGI WebSocket connections
SYNOPSIS
use PAGI::WebSocket;
use PAGI::Stash;    # needed by callback_app below
use Future::AsyncAwait;
# Simple echo server
async sub app {
my ($scope, $receive, $send) = @_;
my $ws = PAGI::WebSocket->new($scope, $receive, $send);
await $ws->accept;
await $ws->each_text(async sub {
my ($text) = @_;
await $ws->send_text("Echo: $text");
});
}
# JSON API with cleanup
async sub json_app {
my ($scope, $receive, $send) = @_;
my $ws = PAGI::WebSocket->new($scope, $receive, $send);
await $ws->accept(subprotocol => 'json');
my $user_id = generate_id();
# Cleanup runs on any disconnect
$ws->on_close(async sub {
my ($code, $reason) = @_;
await remove_user($user_id);
log_disconnect($user_id, $code);
});
await $ws->each_json(async sub {
my ($data) = @_;
if ($data->{type} eq 'ping') {
await $ws->send_json({ type => 'pong' });
}
});
}
# Callback-based style (alternative to iteration)
async sub callback_app {
my ($scope, $receive, $send) = @_;
my $ws = PAGI::WebSocket->new($scope, $receive, $send);
await $ws->accept;
my $stash = PAGI::Stash->new($ws);
$stash->set(user => 'anonymous');
$ws->on(message => sub {
my ($data) = @_;
$ws->send_text("Echo: $data");
});
$ws->on(error => sub {
my ($error) = @_;
warn "WebSocket error: $error";
});
$ws->on(close => sub {
print "User disconnected\n";
});
await $ws->run;
}
DESCRIPTION
PAGI::WebSocket wraps the raw PAGI WebSocket protocol in a high-level API inspired by Starlette's WebSocket class. It removes protocol boilerplate and provides:
Typed send/receive methods (text, bytes, JSON)
Connection state tracking (is_connected, is_closed, close_code)
Cleanup and error callback registration (on_close, on_error)
Safe send methods for broadcast scenarios (try_send_*, send_*_if_connected)
Message iteration helpers (each_text, each_json)
Callback-based event handling (on, run)
Per-connection storage (via PAGI::Stash)
CONSTRUCTOR
new
my $ws = PAGI::WebSocket->new($scope, $receive, $send);
Creates a new WebSocket wrapper. Requires:
$scope - PAGI scope hashref with type => 'websocket'
$receive - Async coderef returning Futures for events
$send - Async coderef for sending events
Dies if scope type is not 'websocket'.
Singleton pattern: The WebSocket object is cached in $scope->{'pagi.websocket'}. If you call new() multiple times with the same scope, you get the same WebSocket object back. This ensures consistent state (is_connected, is_closed, callbacks) across multiple code paths that may create WebSocket objects from the same scope.
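The caching behaviour described above can be illustrated with a minimal sketch. My::ScopeCached and the 'my.cached' key are hypothetical stand-ins chosen for illustration; this is not the PAGI::WebSocket source, only one way such per-scope caching could work.

```perl
use strict;
use warnings;

# Hypothetical stand-in class, not the PAGI::WebSocket source.
package My::ScopeCached {
    sub new {
        my ($class, $scope) = @_;
        die "scope type must be 'websocket'\n"
            unless ($scope->{type} // '') eq 'websocket';
        # Reuse the object already cached in the scope, or create and cache one.
        return $scope->{'my.cached'} //= bless({ callbacks => [] }, $class);
    }
}

my $scope  = { type => 'websocket' };
my $first  = My::ScopeCached->new($scope);
my $second = My::ScopeCached->new($scope);                  # same object as $first
my $other  = My::ScopeCached->new({ type => 'websocket' }); # different scope, new object
```

Because both calls with $scope return the same object, callbacks registered through $first are visible through $second as well.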
SCOPE ACCESSORS
scope, path, raw_path, query_string, scheme, http_version
my $path = $ws->path; # /chat/room1
my $qs = $ws->query_string; # token=abc
my $scheme = $ws->scheme; # ws or wss
Standard PAGI scope properties with sensible defaults.
subprotocols
my $protos = $ws->subprotocols; # ['chat', 'json']
Returns arrayref of requested subprotocols.
client, server
my $client = $ws->client; # ['192.168.1.1', 54321]
Client and server address info.
header, headers, header_all
my $origin = $ws->header('origin');
my $all_cookies = $ws->header_all('cookie');
my $hmv = $ws->headers; # Hash::MultiValue
Case-insensitive header access.
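As a rough illustration of case-insensitive lookup, the sketch below assumes the scope stores headers as an arrayref of [name, value] pairs. That layout, the helper names, and the choice to return the last occurrence are assumptions made for this example, not statements about the real accessors.

```perl
use strict;
use warnings;

# Assumed scope layout: headers as an arrayref of [name, value] pairs.
my $scope = {
    headers => [
        ['Origin', 'https://example.com'],
        ['Cookie', 'a=1'],
        ['cookie', 'b=2'],
    ],
};

# All values for a header name, compared case-insensitively.
sub header_all {
    my ($scope, $name) = @_;
    return map { $_->[1] }
           grep { lc($_->[0]) eq lc($name) } @{ $scope->{headers} };
}

# Single value; this sketch picks the last occurrence (undef if absent).
sub header {
    my ($scope, $name) = @_;
    my @values = header_all($scope, $name);
    return $values[-1];
}

my $origin  = header($scope, 'ORIGIN');
my @cookies = header_all($scope, 'Cookie');
```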
Per-Connection Shared State
See PAGI::Stash for per-connection shared state:
use PAGI::Stash;
my $stash = PAGI::Stash->new($ws);
state
my $db = $ws->state->{db};
my $config = $ws->state->{config};
Application state hashref injected by PAGI::Lifespan. Read-only access to shared application state. Returns empty hashref if not set.
Note: This is separate from PAGI::Stash (per-connection data) and connection_state (internal WebSocket state).
path_param
my $id = $ws->path_param('id');
Returns a path parameter by name. Path parameters are captured from the URL path by a router and stored in $scope->{path_params}.
# Route: /chat/:room
my $room = $ws->path_param('room');
path_params
my $params = $ws->path_params; # { room => 'general', id => '42' }
Returns hashref of all path parameters from scope.
query_params
my $params = $ws->query_params; # Hash::MultiValue
my $params = $ws->query_params(strict => 1); # Die on invalid UTF-8
my $params = $ws->query_params(raw => 1); # Skip UTF-8 decoding
Get query parameters as Hash::MultiValue.
Options:
strict - If true, die on invalid UTF-8 sequences. Default: false (invalid bytes replaced with U+FFFD).
raw - If true, skip UTF-8 decoding entirely and return raw bytes. Default: false.
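The three decoding modes can be reproduced with the core Encode module. The helper decode_value below is hypothetical and only models the decoding step for one already-extracted value; how PAGI::WebSocket implements these options internally is not shown in this document.

```perl
use strict;
use warnings;
use Encode ();

# Hypothetical helper: decode one raw query value according to the options.
sub decode_value {
    my ($bytes, %opts) = @_;
    return $bytes if $opts{raw};    # raw: hand back the bytes untouched
    if ($opts{strict}) {
        # strict: die on malformed input; LEAVE_SRC keeps $bytes unmodified
        return Encode::decode('UTF-8', $bytes,
                              Encode::FB_CROAK | Encode::LEAVE_SRC);
    }
    # default: malformed bytes are replaced with U+FFFD
    return Encode::decode('UTF-8', $bytes);
}

my $bad       = "caf\xFF";    # \xFF can never appear in valid UTF-8
my $lenient   = decode_value($bad);
my $raw       = decode_value($bad, raw => 1);
my $strict_ok = eval { decode_value($bad, strict => 1); 1 };
```

Here $lenient contains a replacement character, $raw is byte-for-byte identical to the input, and $strict_ok stays false because the strict call dies.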
query
my $value = $ws->query('user');
my $value = $ws->query('page', strict => 1);
my $value = $ws->query('id', raw => 1);
Shortcut for $ws->query_params(%opts)->get($name). Accepts the same strict and raw options as query_params.
raw_query_params
my $params = $ws->raw_query_params;
Returns query params without UTF-8 decoding. Equivalent to $ws->query_params(raw => 1).
raw_query
my $value = $ws->raw_query('user');
Returns a single query param without UTF-8 decoding. Equivalent to $ws->query($name, raw => 1).
LIFECYCLE METHODS
accept
await $ws->accept;
await $ws->accept(subprotocol => 'chat');
await $ws->accept(headers => [['x-custom', 'value']]);
Accepts the WebSocket connection. Optionally specify a subprotocol to use and additional response headers.
close
await $ws->close;
await $ws->close(1000, 'Normal closure');
await $ws->close(4000, 'Custom reason');
Closes the connection. Default code is 1000 (normal closure). Idempotent - calling multiple times only sends close once.
STATE ACCESSORS
is_connected, is_closed, connection_state
if ($ws->is_connected) { ... }
if ($ws->is_closed) { ... }
my $state = $ws->connection_state; # 'connecting', 'connected', 'closed'
close_code, close_reason
my $code = $ws->close_code; # 1000, 1001, etc.
my $reason = $ws->close_reason; # 'Normal closure'
Available after connection closes. Defaults: code=1005, reason=''.
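The state transitions, the idempotent close, and the documented defaults can be modelled with a small stand-in class. My::ConnState is hypothetical and synchronous; it only mirrors the behaviour described above and is not the module's implementation.

```perl
use strict;
use warnings;

# Hypothetical, synchronous model of the documented connection states.
package My::ConnState {
    sub new    { return bless({ state => 'connecting', sent => 0 }, $_[0]) }
    sub accept { $_[0]{state} = 'connected'; return $_[0] }

    # Idempotent: only the first call records a code and "sends" a close.
    sub close {
        my ($self, $code, $reason) = @_;
        return $self if $self->{state} eq 'closed';
        @{$self}{qw(state close_code close_reason)}
            = ('closed', $code // 1000, $reason // '');
        $self->{sent}++;
        return $self;
    }

    sub connection_state { return $_[0]{state} }
    sub is_connected     { return $_[0]{state} eq 'connected' }
    sub is_closed        { return $_[0]{state} eq 'closed' }
    sub close_code       { return $_[0]{close_code} // 1005 }   # default when no code is known
    sub close_reason     { return $_[0]{close_reason} // '' }
}

my $conn = My::ConnState->new;      # 'connecting'
$conn->accept;                      # 'connected'
$conn->close(4000, 'Custom reason');
$conn->close(1000);                 # ignored: already closed
```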
SEND METHODS
send_text, send_bytes, send_json
await $ws->send_text("Hello!");
await $ws->send_bytes("\x00\x01\x02");
await $ws->send_json({ action => 'greet', name => 'Alice' });
Send a message. Dies if connection is closed.
try_send_text, try_send_bytes, try_send_json
my $sent = await $ws->try_send_json($data);
if (!$sent) {
# Client disconnected
cleanup_user($id);
}
Returns true if sent, false if failed or closed. Does not throw. Useful for broadcasting to multiple clients.
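The difference between the throwing and non-throwing variants comes down to wrapping the send in an eval. The sketch below uses plain hashes as hypothetical clients and a synchronous send_or_die in place of the real async methods, to show how a boolean result lets a broadcast loop prune dead connections.

```perl
use strict;
use warnings;

# Hypothetical send that dies once the peer is gone, like send_* above.
sub send_or_die {
    my ($client, $msg) = @_;
    die "connection closed\n" if $client->{closed};
    push @{ $client->{outbox} }, $msg;
    return 1;
}

# Non-throwing variant: true on success, false on any failure.
sub try_send {
    my ($client, $msg) = @_;
    return eval { send_or_die($client, $msg); 1 } ? 1 : 0;
}

my %clients = (
    alice => { closed => 0, outbox => [] },
    bob   => { closed => 1, outbox => [] },
);

# Broadcast, dropping every client whose send failed.
for my $id (sort keys %clients) {
    delete $clients{$id} unless try_send($clients{$id}, 'hello');
}
```

After the loop only alice remains registered, and one failed recipient did not interrupt delivery to the others.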
send_text_if_connected, send_bytes_if_connected, send_json_if_connected
await $ws->send_json_if_connected($data);
Silent no-op if connection is closed. Useful for fire-and-forget.
RECEIVE METHODS
receive
my $event = await $ws->receive;
Returns raw PAGI event hashref, or undef on disconnect.
receive_text, receive_bytes
my $text = await $ws->receive_text;
my $bytes = await $ws->receive_bytes;
Waits for the next frame of the requested type, skipping frames of other types. Returns undef on disconnect.
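The skipping behaviour can be pictured as a filter over the incoming event stream. In this sketch the event type names 'websocket.receive' and 'websocket.disconnect' and the array used as a queue are assumptions for illustration; only the text and bytes keys are mentioned elsewhere in this document.

```perl
use strict;
use warnings;

# Assumed event shapes: text frames carry {text}, binary frames {bytes},
# and a 'websocket.disconnect' event ends the stream.
my @queue = (
    { type => 'websocket.receive', bytes => "\x00\x01" },
    { type => 'websocket.receive', text  => 'hello' },
    { type => 'websocket.disconnect', code => 1000 },
);

# Return the next text payload, skipping other frames; undef on disconnect.
sub next_text {
    my ($queue) = @_;
    while (my $event = shift @$queue) {
        return undef if $event->{type} eq 'websocket.disconnect';
        return $event->{text} if defined $event->{text};
    }
    return undef;    # queue exhausted
}

my $first  = next_text(\@queue);    # binary frame is skipped
my $second = next_text(\@queue);    # disconnect reached
```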
receive_json
my $data = await $ws->receive_json;
Receives text frame and decodes as JSON. Dies on invalid JSON.
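Since invalid JSON raises an exception, callers that want to keep the connection open for malformed input need to trap it. The sketch below shows the pattern with the core JSON::PP module; safe_decode is a hypothetical helper, and the JSON backend PAGI::WebSocket actually uses is not stated here.

```perl
use strict;
use warnings;
use JSON::PP qw(decode_json);

# Hypothetical helper: returns (1, $data) on success, (0, undef) on bad JSON.
# A separate success flag is used because a valid document may decode to undef.
sub safe_decode {
    my ($text) = @_;
    my $data;
    my $ok = eval { $data = decode_json($text); 1 };
    return $ok ? (1, $data) : (0, undef);
}

my ($ok_good, $ping) = safe_decode('{"type":"ping"}');
my ($ok_bad,  $junk) = safe_decode('{not json');
```

The same eval wrapper can be placed around receive_json itself when a single bad message should not end the handler.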
ITERATION HELPERS
each_message, each_text, each_bytes, each_json
await $ws->each_text(async sub {
my ($text) = @_;
await $ws->send_text("Got: $text");
});
await $ws->each_json(async sub {
my ($data) = @_;
if ($data->{type} eq 'ping') {
await $ws->send_json({ type => 'pong' });
}
});
Loops until disconnect, calling callback for each message. Exceptions in callback propagate to caller.
EVENT CALLBACKS
on_close
# Simple sync callback
$ws->on_close(sub {
my ($code, $reason) = @_;
print "Disconnected: $code\n";
});
# Async callback for cleanup that needs await
$ws->on_close(async sub {
my ($code, $reason) = @_;
await cleanup_resources();
});
Registers cleanup callback that runs on disconnect or close(). Callbacks can be regular subs or async subs — async results are automatically awaited. Multiple callbacks run in registration order. Exceptions are caught and warned but don't prevent other callbacks.
Returns $self for chaining.
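The callback semantics for on_close (registration order, isolation of failures, clearing after firing) can be sketched in a few lines. This is a synchronous model with hypothetical names; it omits the automatic awaiting of async callbacks that the real method performs.

```perl
use strict;
use warnings;

my @on_close;    # registered callbacks, in registration order
my @log;

push @on_close, sub { push @log, "first: $_[0]" };
push @on_close, sub { die "cleanup failed\n" };
push @on_close, sub { push @log, "third: $_[0]" };

# Fire every callback once; a failure is warned about and the rest still run.
sub fire_close {
    my ($code, $reason) = @_;
    my @callbacks = splice @on_close;    # empties the list before firing
    for my $cb (@callbacks) {
        eval { $cb->($code, $reason); 1 }
            or warn "on_close callback failed: $@";
    }
    return;
}

fire_close(1001, 'going away');
```

The second callback dies, a warning is printed, and the third callback still runs. A second call to fire_close would do nothing because the list is already empty.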
Circular reference note: If your callback captures $ws in a closure, use Scalar::Util::weaken to avoid a memory leak:
use Scalar::Util qw(weaken);
my $weak_ws = $ws;
weaken($weak_ws);
$ws->on_close(sub { $weak_ws->... if $weak_ws });
The callback arrays are cleared after firing, so cycles via closures are broken at connection close, but weaken prevents the object from being kept alive until that point.
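The effect of weaken can be observed directly with a destructor counter. My::Conn below is a hypothetical class that stores callbacks the way a connection object might; the point is only that a weakly captured object is freed as soon as its last strong reference goes away.

```perl
use strict;
use warnings;
use Scalar::Util qw(weaken);

my $destroyed = 0;

# Hypothetical object that keeps its callbacks, so closures can form cycles.
package My::Conn {
    sub new      { return bless({ on_close => [] }, $_[0]) }
    sub on_close { push @{ $_[0]{on_close} }, $_[1]; return $_[0] }
    sub id       { return 42 }
    sub DESTROY  { $destroyed++ }
}

{
    my $conn = My::Conn->new;
    my $weak = $conn;
    weaken($weak);
    # The closure holds only a weak reference, so no cycle is created.
    $conn->on_close(sub { return $weak ? $weak->id : undef });
}
# $conn went out of scope above and was destroyed immediately.
```

If the closure captured $conn directly, the object would reference the closure and the closure would reference the object, so DESTROY would not run when the block ends.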
on_error
$ws->on_error(sub {
my ($error) = @_;
warn "WebSocket error: $error";
});
# Async callback — return value is awaited automatically
$ws->on_error(async sub {
my ($error) = @_;
await log_error_async($error);
});
Registers error callback. Called when exceptions occur in message handlers during run(). Callbacks can be regular subs or async subs — async results are automatically awaited. Multiple callbacks run in registration order. Exceptions in callbacks are caught and warned but do not prevent other callbacks.
If no error handlers are registered, errors are warned to STDERR.
Returns $self for chaining.
on_message
$ws->on_message(sub {
my ($data, $event) = @_;
# $data is text or bytes, $event is the raw PAGI event hashref
# Check $event->{text} vs $event->{bytes} to distinguish frame type
});
Registers a message callback for use with run(). Multiple callbacks can be registered and all will be called for each message.
Returns $self for chaining.
on
# Generic Socket.IO-style event registration
$ws->on(message => sub { my ($data, $event) = @_; ... });
$ws->on(close => sub { my ($code, $reason) = @_; ... });
$ws->on(error => sub { my ($error) = @_; ... });
# Methods return $self, so calls can be chained
$ws->on(message => sub { ... })
->on(close => sub { ... })
->on(error => sub { ... });
Generic event registration. Dispatches to on_message, on_close, or on_error based on the event name. Dies for unknown event types.
Returns $self for chaining.
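One common way to implement this kind of generic registration is a lookup table from event name to method name. The sketch below uses a hypothetical My::Events class and an invented error message; it mirrors the documented behaviour (dispatch, die on unknown names, chaining) without claiming to match the module's code.

```perl
use strict;
use warnings;

# Hypothetical class showing name-based dispatch to on_* methods.
package My::Events {
    sub new        { return bless({ message => [], close => [], error => [] }, $_[0]) }
    sub on_message { push @{ $_[0]{message} }, $_[1]; return $_[0] }
    sub on_close   { push @{ $_[0]{close} },   $_[1]; return $_[0] }
    sub on_error   { push @{ $_[0]{error} },   $_[1]; return $_[0] }

    my %method_for = (
        message => 'on_message',
        close   => 'on_close',
        error   => 'on_error',
    );

    sub on {
        my ($self, $event, $cb) = @_;
        my $method = $method_for{$event}
            or die "Unknown event type: $event\n";
        return $self->$method($cb);    # each on_* returns $self, so calls chain
    }
}

my $events = My::Events->new;
$events->on(message => sub { })->on(close => sub { })->on(message => sub { });
my $rejected = !eval { $events->on(open => sub { }); 1 };
```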
run
# Register callbacks first
$ws->on(message => sub { my ($data) = @_; ... });
$ws->on(close => sub { ... });
# Enter event loop
await $ws->run;
Callback-based event loop (alternative to each_* iteration). Runs until disconnect, dispatching messages to registered callbacks. Errors in callbacks are caught and passed to error handlers.
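The dispatch loop can be approximated synchronously as follows. The event type names and the array-backed queue are assumptions for illustration, and the real run() is asynchronous and also fires close callbacks; this sketch covers only message dispatch and error routing.

```perl
use strict;
use warnings;

my (@on_message, @on_error, @seen, @errors);

push @on_message, sub {
    my ($data) = @_;
    die "bad input\n" if $data eq 'boom';
    push @seen, $data;
};
push @on_error, sub { push @errors, $_[0] };

# Dispatch each message to every handler; route failures to error handlers.
sub run {
    my ($queue) = @_;
    while (my $event = shift @$queue) {
        last if $event->{type} eq 'websocket.disconnect';    # assumed type name
        my $data = $event->{text} // $event->{bytes};
        for my $cb (@on_message) {
            next if eval { $cb->($data, $event); 1 };
            my $err = $@;
            if (@on_error) { $_->($err) for @on_error }
            else           { warn "WebSocket error: $err" }
        }
    }
    return;
}

run([
    { type => 'websocket.receive', text => 'a' },
    { type => 'websocket.receive', text => 'boom' },
    { type => 'websocket.receive', text => 'b' },
    { type => 'websocket.disconnect' },
]);
```

The failing message is reported to the error handler and the loop carries on with the next message.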
KEEPALIVE
WebSocket keepalive uses protocol-level ping/pong frames (RFC 6455). The server sends ping frames automatically; clients respond with pong frames without any application code needed.
keepalive
await $ws->keepalive(30); # Ping every 30 seconds
await $ws->keepalive(30, 20); # Ping every 30s, expect pong within 20s
await $ws->keepalive(0); # Disable keepalive
Enables or disables WebSocket protocol-level keepalive by sending a websocket.keepalive event to the server. The server handles the timer and ping/pong frames.
Arguments:
$interval - Seconds between ping frames. Use 0 to disable.
$timeout - (Optional) Seconds to wait for pong response. If no pong is received within this time, the connection is closed with code 1006 and the application receives a disconnect event with reason => 'keepalive timeout'.
Common intervals:
Returns $self for chaining.
COMPLETE EXAMPLE
use PAGI::WebSocket;
use Future::AsyncAwait;
my %connections;
async sub chat_app {
my ($scope, $receive, $send) = @_;
my $ws = PAGI::WebSocket->new($scope, $receive, $send);
await $ws->accept;
my $user_id = generate_id();
$connections{$user_id} = $ws;
$ws->on_close(async sub {
delete $connections{$user_id};
await broadcast({ type => 'leave', user => $user_id });
});
await broadcast({ type => 'join', user => $user_id });
await $ws->each_json(async sub {
my ($data) = @_;
$data->{from} = $user_id;
await broadcast($data);
});
}
async sub broadcast {
my ($data) = @_;
for my $ws (values %connections) {
await $ws->try_send_json($data);
}
}
SEE ALSO
PAGI::Request - Similar convenience wrapper for HTTP requests
PAGI::Server - PAGI protocol server
AUTHOR
PAGI Contributors