NAME

PAGI::Endpoint::SSE - Class-based Server-Sent Events endpoint handler

SYNOPSIS

package MyApp::Notifications;
use parent 'PAGI::Endpoint::SSE';
use Future::AsyncAwait;

sub keepalive_interval { 30 }

async sub on_connect {
    my ($self, $sse) = @_;
    my $user_id = $sse->stash->{user_id};

    # Send welcome event
    await $sse->send_event(
        event => 'connected',
        data  => { user_id => $user_id },
    );

    # Handle reconnection
    if (my $last_id = $sse->last_event_id) {
        await send_missed_events($sse, $last_id);
    }

    # Subscribe to notifications
    subscribe($user_id, sub {
        my ($event) = @_;
        $sse->try_send_json($event);
    });
}

sub on_disconnect {
    my ($self, $sse) = @_;
    unsubscribe($sse->stash->{user_id});
}

# Use with PAGI server
my $app = MyApp::Notifications->to_app;

DESCRIPTION

PAGI::Endpoint::SSE provides a class-based approach to handling Server-Sent Events connections with lifecycle hooks.

LIFECYCLE METHODS

on_connect

async sub on_connect {
    my ($self, $sse) = @_;
    await $sse->send_event(data => 'Hello!');
}

Called when a client connects. The SSE stream is automatically started before this is called. Use this to send initial events and set up subscriptions.

on_disconnect

sub on_disconnect {
    my ($self, $sse) = @_;
    # Cleanup subscriptions
}

Called when connection closes. This is synchronous (not async).

CLASS METHODS

keepalive_interval

sub keepalive_interval { 30 }

Seconds between keepalive pings. Set to 0 to disable (default). Keepalives prevent proxy timeouts on idle connections.

sse_class

sub sse_class { 'PAGI::SSE' }

Override to use a custom SSE wrapper.

to_app

my $app = MyEndpoint->to_app;

Returns a PAGI-compatible async coderef.

RECIPES

Multi-Process Broadcasting with Redis

The simple in-memory subscriber pattern only works with a single process:

my %subscribers;  # Lost when worker dies, not shared between workers

For multi-process deployments (e.g., pagi-server --workers 4), use Redis pub/sub as a message bus between workers. Each worker keeps its own local subscriber hash with real connection objects, and Redis broadcasts messages between workers.

package MyApp::Events;
use parent 'PAGI::Endpoint::SSE';
use Future::AsyncAwait;
use JSON::MaybeXS qw(encode_json decode_json);

my %subscribers;  # Local to this process
my $redis;        # Redis connection

# Call this once at server startup (e.g., in lifespan handler)
sub setup_redis {
    my ($redis_url) = @_;
    $redis = Redis::Async->new(server => $redis_url);

    # Subscribe to channel - forward to local connections
    $redis->subscribe('events', sub {
        my ($message) = @_;
        my $data = decode_json($message);
        _local_broadcast($data);
    });
}

# Broadcast to local process connections only
sub _local_broadcast {
    my ($message) = @_;
    for my $sse (values %subscribers) {
        $sse->try_send_json($message);
    }
}

# Public API: publish to Redis (all workers receive it)
sub broadcast {
    my ($message) = @_;
    $redis->publish('events', encode_json($message));
}

# Track local connections
my $sub_id = 0;

async sub on_connect {
    my ($self, $sse) = @_;
    my $id = ++$sub_id;
    $subscribers{$id} = $sse;
    $sse->stash->{sub_id} = $id;

    await $sse->send_event(
        event => 'connected',
        data  => { subscriber_id => $id },
    );
}

sub on_disconnect {
    my ($self, $sse) = @_;
    delete $subscribers{$sse->stash->{sub_id}};
}

Now when any worker calls broadcast(), the message goes to Redis, and every worker (including itself) receives it and forwards to their local SSE connections.

Setup Redis in your lifespan handler:

my $app = async sub {
    my ($scope, $receive, $send) = @_;

    if ($scope->{type} eq 'lifespan') {
        my $event = await $receive->();
        if ($event->{type} eq 'lifespan.startup') {
            MyApp::Events::setup_redis('redis://localhost:6379');
            await $send->({ type => 'lifespan.startup.complete' });
        }
        # ... shutdown handling
        return;
    }

    # ... route to SSE endpoint
};

SEE ALSO

PAGI::SSE, PAGI::Endpoint::HTTP, PAGI::Endpoint::WebSocket