NAME

PAGI::WebSocket - Convenience wrapper for PAGI WebSocket connections

SYNOPSIS

use PAGI::WebSocket;
use PAGI::Stash;    # needed by callback_app below
use Future::AsyncAwait;

# Simple echo server
async sub app {
    my ($scope, $receive, $send) = @_;

    my $ws = PAGI::WebSocket->new($scope, $receive, $send);
    await $ws->accept;

    await $ws->each_text(async sub {
        my ($text) = @_;
        await $ws->send_text("Echo: $text");
    });
}

# JSON API with cleanup
async sub json_app {
    my ($scope, $receive, $send) = @_;

    my $ws = PAGI::WebSocket->new($scope, $receive, $send);
    await $ws->accept(subprotocol => 'json');

    my $user_id = generate_id();

    # Cleanup runs on any disconnect
    $ws->on_close(async sub {
        my ($code, $reason) = @_;
        await remove_user($user_id);
        log_disconnect($user_id, $code);
    });

    await $ws->each_json(async sub {
        my ($data) = @_;

        if (($data->{type} // '') eq 'ping') {
            await $ws->send_json({ type => 'pong' });
        }
    });
}

# Callback-based style (alternative to iteration)
async sub callback_app {
    my ($scope, $receive, $send) = @_;

    my $ws = PAGI::WebSocket->new($scope, $receive, $send);
    await $ws->accept;

    my $stash = PAGI::Stash->new($ws);
    $stash->set(user => 'anonymous');

    $ws->on(message => sub {
        my ($data) = @_;
        $ws->send_text("Echo: $data");
    });

    $ws->on(error => sub {
        my ($error) = @_;
        warn "WebSocket error: $error";
    });

    $ws->on(close => sub {
        print "User disconnected\n";
    });

    await $ws->run;
}

DESCRIPTION

PAGI::WebSocket wraps the raw PAGI WebSocket protocol to provide a clean, high-level API inspired by Starlette's WebSocket class. It eliminates protocol boilerplate and provides:

  • Typed send/receive methods (text, bytes, JSON)

  • Connection state tracking (is_connected, is_closed, close_code)

  • Cleanup and error callback registration (on_close, on_error)

  • Safe send methods for broadcast scenarios (try_send_*, send_*_if_connected)

  • Message iteration helpers (each_text, each_json)

  • Callback-based event handling (on, run)

  • Per-connection storage (via PAGI::Stash)

CONSTRUCTOR

new

my $ws = PAGI::WebSocket->new($scope, $receive, $send);

Creates a new WebSocket wrapper. Requires:

  • $scope - PAGI scope hashref with type => 'websocket'

  • $receive - Async coderef returning Futures for events

  • $send - Async coderef for sending events

Dies if scope type is not 'websocket'.

Per-scope caching: The WebSocket object is cached in $scope->{'pagi.websocket'}. Calling new() again with the same scope returns the same object, so connection state (is_connected, is_closed) and registered callbacks stay consistent across code paths that each construct a wrapper from that scope.
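
The effect of caching on the scope can be sketched with a stand-in class. This is a minimal illustration of the pattern, not PAGI::WebSocket's source: the class name My::ScopeCached and the key 'my.wrapper' are hypothetical.

```perl
use strict;
use warnings;
use Scalar::Util qw(refaddr);

# Hypothetical stand-in class, NOT PAGI::WebSocket's source.
package My::ScopeCached {
    sub new {
        my ($class, $scope) = @_;
        # Return the wrapper already attached to this scope, or attach a new one.
        $scope->{'my.wrapper'} //= bless({ state => 'connecting' }, $class);
        return $scope->{'my.wrapper'};
    }
}

my %scope  = (type => 'websocket');
my $first  = My::ScopeCached->new(\%scope);
my $second = My::ScopeCached->new(\%scope);

# A change made through one handle is visible through the other.
$first->{state} = 'connected';
print "same object\n" if refaddr($first) == refaddr($second);
print "state via second handle: $second->{state}\n";
```

Because both calls hand back one object, middleware and the final handler can each call new() without losing callbacks registered by the other.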

SCOPE ACCESSORS

scope, path, raw_path, query_string, scheme, http_version

my $path = $ws->path;              # /chat/room1
my $qs = $ws->query_string;        # token=abc
my $scheme = $ws->scheme;          # ws or wss

Standard PAGI scope properties with sensible defaults.

subprotocols

my $protos = $ws->subprotocols;    # ['chat', 'json']

Returns arrayref of requested subprotocols.

client, server

my $client = $ws->client;          # ['192.168.1.1', 54321]

Client and server address info.

header, headers, header_all

my $origin = $ws->header('origin');
my $all_cookies = $ws->header_all('cookie');
my $hmv = $ws->headers;            # Hash::MultiValue

Case-insensitive header access.
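
One common use of header() is checking the Origin header against an allow-list. The sketch below is an illustration under assumptions: origin_allowed and My::FakeWS are hypothetical names, and it assumes header() returns undef when the header is absent. The fake object exists only so the helper can be tried without a server.

```perl
use strict;
use warnings;

# Hypothetical helper: true if the handshake's Origin header is on the allow-list.
sub origin_allowed {
    my ($ws, @allowed) = @_;
    my $origin = $ws->header('origin');    # documented accessor
    return 0 unless defined $origin;       # assumption: undef when header is absent
    return scalar grep { lc($origin) eq lc($_) } @allowed;
}

# Tiny stand-in exposing only header(); NOT the real PAGI::WebSocket class.
package My::FakeWS {
    sub new    { my ($class, %headers) = @_; return bless { %headers }, $class }
    sub header { my ($self, $name) = @_; return $self->{ lc $name } }
}

my $ws = My::FakeWS->new(origin => 'https://example.com');
print origin_allowed($ws, 'https://example.com') ? "allowed\n" : "rejected\n";
```

In a real handler the same helper would take the PAGI::WebSocket object directly.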

Per-Connection Shared State

See PAGI::Stash for per-connection shared state:

use PAGI::Stash;
my $stash = PAGI::Stash->new($ws);

state

my $db = $ws->state->{db};
my $config = $ws->state->{config};

Application state hashref injected by PAGI::Lifespan. Read-only access to shared application state. Returns empty hashref if not set.

Note: This is separate from PAGI::Stash (per-connection data) and connection_state (internal WebSocket state).

path_param

my $id = $ws->path_param('id');

Returns a path parameter by name. Path parameters are captured from the URL path by a router and stored in $scope->{path_params}.

# Route: /chat/:room
my $room = $ws->path_param('room');

path_params

my $params = $ws->path_params;  # { room => 'general', id => '42' }

Returns hashref of all path parameters from scope.

query_params

my $params = $ws->query_params;  # Hash::MultiValue
my $params = $ws->query_params(strict => 1);  # Die on invalid UTF-8
my $params = $ws->query_params(raw => 1);     # Skip UTF-8 decoding

Get query parameters as Hash::MultiValue.

Options:

  • strict - If true, die on invalid UTF-8 sequences. Default: false (invalid bytes replaced with U+FFFD).

  • raw - If true, skip UTF-8 decoding entirely and return raw bytes. Default: false.
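
The difference between the default, strict, and raw modes can be illustrated with the core Encode module. This is a sketch of the behaviour described above, not PAGI's implementation; decode_param is a hypothetical helper.

```perl
use strict;
use warnings;
use Encode qw(decode);

# Hypothetical helper mirroring the three modes; NOT PAGI's actual code.
sub decode_param {
    my ($bytes, %opts) = @_;
    return $bytes if $opts{raw};    # raw: hand the bytes back untouched
    my $check = $opts{strict} ? Encode::FB_CROAK() : Encode::FB_DEFAULT();
    my $copy  = $bytes;             # Encode may modify its input when CHECK is set
    return decode('UTF-8', $copy, $check);
}

my $bad     = "caf\xFFe";                                   # \xFF is never valid UTF-8
my $lenient = decode_param($bad);                           # invalid byte replaced
my $raw     = decode_param($bad, raw => 1);                 # bytes unchanged
my $strict  = eval { decode_param($bad, strict => 1) };     # dies, so eval gives undef

printf "lenient has U+FFFD: %s\n", ($lenient =~ /\x{FFFD}/ ? 'yes' : 'no');
printf "strict died: %s\n",        (defined $strict ? 'no' : 'yes');
```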

query

my $value = $ws->query('user');
my $value = $ws->query('page', strict => 1);
my $value = $ws->query('id', raw => 1);

Shortcut for $ws->query_params(%opts)->get($name). Accepts the same strict and raw options as query_params.

raw_query_params

my $params = $ws->raw_query_params;

Returns query params without UTF-8 decoding. Equivalent to $ws->query_params(raw => 1).

raw_query

my $value = $ws->raw_query('user');

Returns a single query param without UTF-8 decoding. Equivalent to $ws->query($name, raw => 1).

LIFECYCLE METHODS

accept

await $ws->accept;
await $ws->accept(subprotocol => 'chat');
await $ws->accept(headers => [['x-custom', 'value']]);

Accepts the WebSocket connection. Optionally specify a subprotocol to use and additional response headers.

close

await $ws->close;
await $ws->close(1000, 'Normal closure');
await $ws->close(4000, 'Custom reason');

Closes the connection. Default code is 1000 (normal closure). Idempotent - calling multiple times only sends close once.

STATE ACCESSORS

is_connected, is_closed, connection_state

if ($ws->is_connected) { ... }
if ($ws->is_closed) { ... }
my $state = $ws->connection_state; # 'connecting', 'connected', 'closed'

close_code, close_reason

my $code = $ws->close_code;        # 1000, 1001, etc.
my $reason = $ws->close_reason;    # 'Normal closure'

Available after the connection closes. Defaults: code 1005 (the RFC 6455 "no status received" code) and an empty reason.
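
When logging disconnects, it can help to turn close_code into a readable label. The table below lists codes defined by RFC 6455; describe_close_code is a hypothetical helper, not part of PAGI::WebSocket.

```perl
use strict;
use warnings;

# Close codes defined by RFC 6455, section 7.4.1.
my %CLOSE_CODE = (
    1000 => 'normal closure',
    1001 => 'going away',
    1002 => 'protocol error',
    1003 => 'unsupported data',
    1005 => 'no status received',
    1006 => 'abnormal closure',
    1007 => 'invalid payload data',
    1008 => 'policy violation',
    1009 => 'message too big',
    1010 => 'mandatory extension',
    1011 => 'internal error',
);

# Hypothetical helper for log messages.
sub describe_close_code {
    my ($code) = @_;
    return $CLOSE_CODE{$code} if exists $CLOSE_CODE{$code};
    return 'application-defined' if $code >= 4000 && $code <= 4999;  # private use range
    return 'registered'          if $code >= 3000 && $code <= 3999;  # IANA-registered range
    return 'unknown';
}

print describe_close_code(1005), "\n";    # label for the documented default code
```

Inside an on_close callback this could be called as describe_close_code($code).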

SEND METHODS

send_text, send_bytes, send_json

await $ws->send_text("Hello!");
await $ws->send_bytes("\x00\x01\x02");
await $ws->send_json({ action => 'greet', name => 'Alice' });

Send a message. Dies if connection is closed.

try_send_text, try_send_bytes, try_send_json

my $sent = await $ws->try_send_json($data);
if (!$sent) {
    # Client disconnected
    cleanup_user($id);
}

Returns true if sent, false if failed or closed. Does not throw. Useful for broadcasting to multiple clients.

send_text_if_connected, send_bytes_if_connected, send_json_if_connected

await $ws->send_json_if_connected($data);

Silent no-op if connection is closed. Useful for fire-and-forget.

RECEIVE METHODS

receive

my $event = await $ws->receive;

Returns raw PAGI event hashref, or undef on disconnect.

receive_text, receive_bytes

my $text = await $ws->receive_text;
my $bytes = await $ws->receive_bytes;

Waits for specific frame type, skipping others. Returns undef on disconnect.

receive_json

my $data = await $ws->receive_json;

Receives text frame and decodes as JSON. Dies on invalid JSON.
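
If a malformed frame should be skipped rather than end the handler, one option is to read the frame with receive_text and decode it separately. The sketch below uses the core JSON::PP module; decode_json_or_undef is a hypothetical helper, and it assumes text frames arrive as character strings and that valid messages are JSON objects.

```perl
use strict;
use warnings;
use JSON::PP ();

# Hypothetical helper: decode a text frame, returning undef instead of dying.
sub decode_json_or_undef {
    my ($text) = @_;
    my $data = eval { JSON::PP->new->decode($text) };
    return $data;    # undef when decoding failed
}

# In a handler, $text would come from `await $ws->receive_text`.
my $data = decode_json_or_undef('{"type":"ping"}');
print "type: $data->{type}\n" if $data;
print "bad frame ignored\n" unless defined decode_json_or_undef('{oops');
```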

ITERATION HELPERS

each_message, each_text, each_bytes, each_json

await $ws->each_text(async sub {
    my ($text) = @_;
    await $ws->send_text("Got: $text");
});

await $ws->each_json(async sub {
    my ($data) = @_;
    if (($data->{type} // '') eq 'ping') {
        await $ws->send_json({ type => 'pong' });
    }
});

Loops until disconnect, calling callback for each message. Exceptions in callback propagate to caller.

EVENT CALLBACKS

on_close

# Simple sync callback
$ws->on_close(sub {
    my ($code, $reason) = @_;
    print "Disconnected: $code\n";
});

# Async callback for cleanup that needs await
$ws->on_close(async sub {
    my ($code, $reason) = @_;
    await cleanup_resources();
});

Registers cleanup callback that runs on disconnect or close(). Callbacks can be regular subs or async subs — async results are automatically awaited. Multiple callbacks run in registration order. Exceptions are caught and warned but don't prevent other callbacks.

Returns $self for chaining.

Circular reference note: If your callback captures $ws in a closure, use Scalar::Util::weaken to avoid a memory leak:

use Scalar::Util qw(weaken);
my $weak_ws = $ws;
weaken($weak_ws);
$ws->on_close(sub { $weak_ws->... if $weak_ws });

The callback arrays are cleared after firing, so cycles via closures are broken at connection close, but weaken prevents the object from being kept alive until that point.

on_error

$ws->on_error(sub {
    my ($error) = @_;
    warn "WebSocket error: $error";
});

# Async callback — return value is awaited automatically
$ws->on_error(async sub {
    my ($error) = @_;
    await log_error_async($error);
});

Registers error callback. Called when exceptions occur in message handlers during run(). Callbacks can be regular subs or async subs — async results are automatically awaited. Multiple callbacks run in registration order. Exceptions in callbacks are caught and warned but do not prevent other callbacks.

If no error handlers are registered, errors are warned to STDERR.

Returns $self for chaining.

on_message

$ws->on_message(sub {
    my ($data, $event) = @_;
    # $data is text or bytes, $event is the raw PAGI event hashref
    # Check $event->{text} vs $event->{bytes} to distinguish frame type
});

Registers a message callback for use with run(). Multiple callbacks can be registered and all will be called for each message.

Returns $self for chaining.

on

# Generic Socket.IO-style event registration
$ws->on(message => sub { my ($data, $event) = @_; ... });
$ws->on(close   => sub { my ($code, $reason) = @_; ... });
$ws->on(error   => sub { my ($error) = @_; ... });

# Methods return $self, so calls can be chained
$ws->on(message => sub { ... })
   ->on(close   => sub { ... })
   ->on(error   => sub { ... });

Generic event registration. Dispatches to on_message, on_close, or on_error based on the event name. Dies for unknown event types.

Returns $self for chaining.

run

# Register callbacks first
$ws->on(message => sub { my ($data) = @_; ... });
$ws->on(close => sub { ... });

# Enter event loop
await $ws->run;

Callback-based event loop (alternative to each_* iteration). Runs until disconnect, dispatching messages to registered callbacks. Errors in callbacks are caught and passed to error handlers.

KEEPALIVE

WebSocket keepalive uses protocol-level ping/pong frames (RFC 6455). The server sends ping frames automatically; clients respond with pong frames without any application code needed.

keepalive

await $ws->keepalive(30);       # Ping every 30 seconds
await $ws->keepalive(30, 20);   # Ping every 30s, expect pong within 20s
await $ws->keepalive(0);        # Disable keepalive

Enables or disables WebSocket protocol-level keepalive by sending a websocket.keepalive event to the server. The server handles the timer and ping/pong frames.

Arguments:

  • $interval - Seconds between ping frames. Use 0 to disable.

  • $timeout - (Optional) Seconds to wait for a pong response. If no pong is received within this time, the connection is closed with code 1006 and the application receives a disconnect event with reason => 'keepalive timeout'.

Common intervals:

  • 25 - Fits under the 30-second idle timeout that many proxies use

  • 55 - Fits only where the proxy idle timeout is 60 seconds or longer

Returns $self for chaining.
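
The common intervals above follow one rule of thumb: ping a few seconds before the proxy's idle timeout would expire. A minimal sketch of that arithmetic, where keepalive_interval_for and its 5-second default margin are assumptions for illustration:

```perl
use strict;
use warnings;

# Hypothetical helper: pick a ping interval a few seconds under the proxy's idle timeout.
sub keepalive_interval_for {
    my ($proxy_idle_timeout, $margin) = @_;
    $margin //= 5;
    my $interval = $proxy_idle_timeout - $margin;
    return $interval > 0 ? $interval : 1;    # never return 0, which would disable keepalive
}

print keepalive_interval_for(30), "\n";    # 25
print keepalive_interval_for(60), "\n";    # 55
```

The result would then be passed as the first argument to keepalive.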

COMPLETE EXAMPLE

use PAGI::WebSocket;
use Future::AsyncAwait;

my %connections;

async sub chat_app {
    my ($scope, $receive, $send) = @_;

    my $ws = PAGI::WebSocket->new($scope, $receive, $send);
    await $ws->accept;

    my $user_id = generate_id();
    $connections{$user_id} = $ws;

    $ws->on_close(async sub {
        delete $connections{$user_id};
        await broadcast({ type => 'leave', user => $user_id });
    });

    await broadcast({ type => 'join', user => $user_id });

    await $ws->each_json(async sub {
        my ($data) = @_;
        $data->{from} = $user_id;
        await broadcast($data);
    });
}

async sub broadcast {
    my ($data) = @_;
    for my $ws (values %connections) {
        await $ws->try_send_json($data);
    }
}

SEE ALSO

PAGI::Request - Similar convenience wrapper for HTTP requests

PAGI::Server - PAGI protocol server

AUTHOR

PAGI Contributors
