NAME

Async::Redis::Subscription - PubSub subscription handler

SYNOPSIS

my $sub = await $redis->subscribe('channel1', 'channel2');

while (my $msg = await $sub->next) {
    say "Channel: $msg->{channel}";
    say "Data: $msg->{data}";
}

await $sub->unsubscribe('channel1');
await $sub->unsubscribe;  # all remaining

DESCRIPTION

Manages Redis PubSub subscriptions with async iterator pattern.

MESSAGE STRUCTURE

{
    type    => 'message',      # or 'pmessage', 'smessage'
    channel => 'channel_name',
    pattern => 'pattern',      # defined for pmessage, undef otherwise
    data    => 'payload',
}

The pattern key is always present. It is defined for pmessage frames (the matching glob pattern) and undef for message and smessage frames. Consumers do not need exists $msg->{pattern} checks.

next() always returns real pub/sub messages. Reconnection is transparent.

RECONNECTION

When reconnect is enabled on the Redis connection, subscriptions are automatically re-established after a connection drop. To be notified:

$sub->on_reconnect(sub {
    my ($sub) = @_;
    warn "Reconnected, may have lost messages";
    # re-poll state, log, etc.
});

Messages published while the connection was down are lost (Redis pub/sub has no persistence).

CALLBACK-DRIVEN DELIVERY

As an alternative to the await $sub->next iterator, you can register a callback to receive messages:

my $sub = await $redis->subscribe('chat');
$sub->on_message(sub {
    my ($sub, $msg) = @_;
    # $msg has the same shape as next() returns:
    #   { type => 'message'|'pmessage'|'smessage',
    #     channel => ...,
    #     pattern => ...,  # defined for pmessage, undef otherwise
    #     data    => ... }
});

Callback mode is designed for fire-and-forget listeners — background dispatchers, websocket gateways, channel-layer middleware — where the iterator pattern's requirement to be inside an awaited async sub is awkward or triggers Future::AsyncAwait "lost its returning future" warnings.

Exclusivity

Once on_message is set on a Subscription, it is callback-mode for the rest of its lifetime. Calls to $sub->next will croak. This is sticky — there is no way to switch back. If you need iterator mode, construct a new Subscription.

Signature

$sub->on_message(sub {
    my ($subscription, $message) = @_;
    ...
});

The callback receives the $subscription itself as its first argument (consistent with on_reconnect), and the message hashref as its second. The return value is normally ignored; if the return is a Future, see "Backpressure".

Backpressure

If your callback returns a Future, the driver waits for that Future to resolve before reading the next frame:

$sub->on_message(async sub {
    my ($sub, $msg) = @_;
    await store_to_database($msg);    # driver waits before next read
});

Synchronous callbacks (or callbacks returning non-Future values) do not block the driver. This gives consumers opt-in backpressure with no default overhead.

If the returned Future fails, the failure is routed to on_error.

Fatal error handling

$sub->on_error(sub {
    my ($sub, $err) = @_;
    ...
});

on_error fires when the underlying read encounters an error that cannot be recovered by reconnect (e.g., reconnect is disabled, or reconnect itself failed). After on_error fires, the subscription is closed and the driver stops.

If on_error is not registered, fatal errors die. Silent death of a pub/sub consumer is a debugging nightmare; loud-by-default prevents it. If you genuinely want to swallow errors, register an explicit no-op: $sub->on_error(sub { }).

Callback exceptions (dying inside on_message) are also routed to on_error; the callback-died message is prepended to the error string.

Ordering guarantee

Callbacks fire in the order frames arrive on the connection. No concurrent invocation (Perl is single-threaded and the driver runs on the event loop). After a reconnect, on_reconnect always fires before any post-reconnect on_message.

Re-entrancy

Inside an on_message callback you may safely:

  • Call $sub->subscribe(...) — the new channel is added cleanly; messages on it arrive via the same callback.

  • Call $sub->on_message($new_cb) — the current message is dispatched to the previously-installed handler; the next frame uses the new handler.

  • die — routed to on_error.

Backpressure and Redis server limits

Synchronous callbacks provide backpressure by blocking the driver loop: while your callback runs, the driver doesn't read the next frame, so TCP fills, Redis's output buffer grows. But Redis enforces client-output-buffer-limit pubsub (defaulting to 32mb 8mb 60 in recent versions) — if your subscriber cannot keep up for sustained periods, Redis will disconnect you. There is no amount of client-side buffering that changes this: the limit is on the server.

If your processing is genuinely slow, return a Future from your callback (enabling opt-in backpressure above) AND consider moving the expensive work to a worker pool so the callback can return quickly. Long synchronous processing in pub/sub callbacks is an anti-pattern at scale regardless of client.