NAME

PAGI::WebSocket - Convenience wrapper for PAGI WebSocket connections

SYNOPSIS

use PAGI::WebSocket;
use Future::AsyncAwait;

# Simple echo server
async sub app {
    my ($scope, $receive, $send) = @_;

    my $ws = PAGI::WebSocket->new($scope, $receive, $send);
    await $ws->accept;

    await $ws->each_text(async sub {
        my ($text) = @_;
        await $ws->send_text("Echo: $text");
    });
}

# JSON API with cleanup
async sub json_app {
    my ($scope, $receive, $send) = @_;

    my $ws = PAGI::WebSocket->new($scope, $receive, $send);
    await $ws->accept(subprotocol => 'json');

    my $user_id = generate_id();

    # Cleanup runs on any disconnect
    $ws->on_close(async sub {
        my ($code, $reason) = @_;
        await remove_user($user_id);
        log_disconnect($user_id, $code);
    });

    await $ws->each_json(async sub {
        my ($data) = @_;

        if ($data->{type} eq 'ping') {
            await $ws->send_json({ type => 'pong' });
        }
    });
}

# Callback-based style (alternative to iteration)
async sub callback_app {
    my ($scope, $receive, $send) = @_;

    my $ws = PAGI::WebSocket->new($scope, $receive, $send);
    await $ws->accept;

    $ws->stash->{user} = 'anonymous';

    $ws->on(message => sub {
        my ($data) = @_;
        $ws->send_text("Echo: $data");
    });

    $ws->on(error => sub {
        my ($error) = @_;
        warn "WebSocket error: $error";
    });

    $ws->on(close => sub {
        print "User disconnected\n";
    });

    await $ws->run;
}

DESCRIPTION

PAGI::WebSocket wraps the raw PAGI WebSocket protocol to provide a clean, high-level API inspired by Starlette's WebSocket class. It eliminates protocol boilerplate and provides:

  • Typed send/receive methods (text, bytes, JSON)

  • Connection state tracking (is_connected, is_closed, close_code)

  • Cleanup and error callback registration (on_close, on_error)

  • Safe send methods for broadcast scenarios (try_send_*, send_*_if_connected)

  • Message iteration helpers (each_text, each_json)

  • Callback-based event handling (on, run)

  • Per-connection storage (stash)

  • Timeout support for receives

CONSTRUCTOR

new

my $ws = PAGI::WebSocket->new($scope, $receive, $send);

Creates a new WebSocket wrapper. Requires:

  • $scope - PAGI scope hashref with type = 'websocket'>

  • $receive - Async coderef returning Futures for events

  • $send - Async coderef for sending events

Dies if scope type is not 'websocket'.

SCOPE ACCESSORS

scope, path, raw_path, query_string, scheme, http_version

my $path = $ws->path;              # /chat/room1
my $qs = $ws->query_string;        # token=abc
my $scheme = $ws->scheme;          # ws or wss

Standard PAGI scope properties with sensible defaults.

subprotocols

my $protos = $ws->subprotocols;    # ['chat', 'json']

Returns arrayref of requested subprotocols.

client, server

my $client = $ws->client;          # ['192.168.1.1', 54321]

Client and server address info.

header, headers, header_all

my $origin = $ws->header('origin');
my $all_cookies = $ws->header_all('cookie');
my $hmv = $ws->headers;            # Hash::MultiValue

Case-insensitive header access.

stash

$ws->stash->{user} = $user;
my $room = $ws->stash->{current_room};

Per-connection storage hashref. Useful for storing user data without external variables.

state

my $db = $ws->state->{db};
my $config = $ws->state->{config};

Application state hashref injected by PAGI::Lifespan. Read-only access to shared application state. Returns empty hashref if not set.

Note: This is separate from stash (per-connection data) and connection_state (internal WebSocket state).

path_param

my $id = $ws->path_param('id');

Returns a path parameter by name. Path parameters are captured from the URL path by a router and stored in $scope->{path_params}.

# Route: /chat/:room
my $room = $ws->path_param('room');

path_params

my $params = $ws->path_params;  # { room => 'general', id => '42' }

Returns hashref of all path parameters from scope.

LIFECYCLE METHODS

accept

await $ws->accept;
await $ws->accept(subprotocol => 'chat');
await $ws->accept(headers => [['x-custom', 'value']]);

Accepts the WebSocket connection. Optionally specify a subprotocol to use and additional response headers.

close

await $ws->close;
await $ws->close(1000, 'Normal closure');
await $ws->close(4000, 'Custom reason');

Closes the connection. Default code is 1000 (normal closure). Idempotent - calling multiple times only sends close once.

STATE ACCESSORS

is_connected, is_closed, connection_state

if ($ws->is_connected) { ... }
if ($ws->is_closed) { ... }
my $state = $ws->connection_state; # 'connecting', 'connected', 'closed'

close_code, close_reason

my $code = $ws->close_code;        # 1000, 1001, etc.
my $reason = $ws->close_reason;    # 'Normal closure'

Available after connection closes. Defaults: code=1005, reason=''.

SEND METHODS

send_text, send_bytes, send_json

await $ws->send_text("Hello!");
await $ws->send_bytes("\x00\x01\x02");
await $ws->send_json({ action => 'greet', name => 'Alice' });

Send a message. Dies if connection is closed.

try_send_text, try_send_bytes, try_send_json

my $sent = await $ws->try_send_json($data);
if (!$sent) {
    # Client disconnected
    cleanup_user($id);
}

Returns true if sent, false if failed or closed. Does not throw. Useful for broadcasting to multiple clients.

send_text_if_connected, send_bytes_if_connected, send_json_if_connected

await $ws->send_json_if_connected($data);

Silent no-op if connection is closed. Useful for fire-and-forget.

RECEIVE METHODS

receive

my $event = await $ws->receive;

Returns raw PAGI event hashref, or undef on disconnect.

receive_text, receive_bytes

my $text = await $ws->receive_text;
my $bytes = await $ws->receive_bytes;

Waits for specific frame type, skipping others. Returns undef on disconnect.

receive_json

my $data = await $ws->receive_json;

Receives text frame and decodes as JSON. Dies on invalid JSON.

receive_with_timeout, receive_text_with_timeout, etc.

my $event = await $ws->receive_with_timeout(30);  # 30 seconds

Returns undef on timeout (connection remains open).

ITERATION HELPERS

each_message, each_text, each_bytes, each_json

await $ws->each_text(async sub {
    my ($text) = @_;
    await $ws->send_text("Got: $text");
});

await $ws->each_json(async sub {
    my ($data) = @_;
    if ($data->{type} eq 'ping') {
        await $ws->send_json({ type => 'pong' });
    }
});

Loops until disconnect, calling callback for each message. Exceptions in callback propagate to caller.

EVENT CALLBACKS

on_close

# Simple sync callback
$ws->on_close(sub {
    my ($code, $reason) = @_;
    print "Disconnected: $code\n";
});

# Async callback for cleanup that needs await
$ws->on_close(async sub {
    my ($code, $reason) = @_;
    await cleanup_resources();
});

Registers cleanup callback that runs on disconnect or close(). Callbacks can be regular subs or async subs - async results are automatically awaited. Multiple callbacks run in registration order. Exceptions are caught and warned but don't prevent other callbacks.

on_error

$ws->on_error(sub {
    my ($error) = @_;
    warn "WebSocket error: $error";
});

Registers error callback. Called when exceptions occur in message handlers during run(). If no error handlers are registered, errors are warned to STDERR.

on_message, on

$ws->on_message(sub {
    my ($data, $event) = @_;
    # $data is text or bytes, $event is raw PAGI event
});

# Generic form (Socket.IO style)
$ws->on(message => sub { ... });
$ws->on(close => sub { ... });
$ws->on(error => sub { ... });

Registers message callback for use with run(). Multiple callbacks can be registered for each event type.

run

# Register callbacks first
$ws->on(message => sub { my ($data) = @_; ... });
$ws->on(close => sub { ... });

# Enter event loop
await $ws->run;

Callback-based event loop (alternative to each_* iteration). Runs until disconnect, dispatching messages to registered callbacks. Errors in callbacks are caught and passed to error handlers.

HEARTBEAT / KEEPALIVE

start_heartbeat

$ws->start_heartbeat(25);  # Ping every 25 seconds

Starts sending periodic JSON ping messages to keep the connection alive. Useful for preventing proxy/NAT timeout on idle connections.

The ping message format is:

{ "type": "ping", "ts": <unix_timestamp> }

Common intervals:

25 - Safe for most proxies (30s timeout common)
55 - Safe for aggressive proxies (60s timeout)

Automatically stops when connection closes. Returns $self for chaining.

stop_heartbeat

$ws->stop_heartbeat;

Manually stops the heartbeat timer. Called automatically on connection close. Returns $self for chaining.

COMPLETE EXAMPLE

use PAGI::WebSocket;
use Future::AsyncAwait;

my %connections;

async sub chat_app {
    my ($scope, $receive, $send) = @_;

    my $ws = PAGI::WebSocket->new($scope, $receive, $send);
    await $ws->accept;

    my $user_id = generate_id();
    $connections{$user_id} = $ws;

    $ws->on_close(async sub {
        delete $connections{$user_id};
        await broadcast({ type => 'leave', user => $user_id });
    });

    await broadcast({ type => 'join', user => $user_id });

    await $ws->each_json(async sub {
        my ($data) = @_;
        $data->{from} = $user_id;
        await broadcast($data);
    });
}

async sub broadcast {
    my ($data) = @_;
    for my $ws (values %connections) {
        await $ws->try_send_json($data);
    }
}

SEE ALSO

PAGI::Request - Similar convenience wrapper for HTTP requests

PAGI::Server - PAGI protocol server

AUTHOR

PAGI Contributors