NAME

Async::Redis::Subscription - PubSub subscription handler

SYNOPSIS

my $sub = await $redis->subscribe('channel1', 'channel2');

while (my $msg = await $sub->next) {
    say "Channel: $msg->{channel}";
    say "Data: $msg->{data}";
}

await $sub->unsubscribe('channel1');
await $sub->unsubscribe;  # all remaining regular channels

DESCRIPTION

Manages Redis pub/sub subscriptions. Messages are delivered through an async iterator (next) or, alternatively, through a callback (on_message).

METHODS

next

my $msg = await $sub->next;

Return the next pub/sub message hashref, or undef after a clean close. Fatal subscription errors are rethrown to the caller. next cannot be used once on_message has switched the subscription into callback mode; it croaks instead.

next_message

my $msg = await $sub->next_message;

Backward-compatible wrapper around next. It returns the same message, but with the payload under the key message instead of data:

{
    channel => 'channel_name',
    message => 'payload',
    pattern => undef,
    type    => 'message',
}
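
The renaming between the two shapes can be sketched in plain Perl. The helper name to_legacy_shape is hypothetical and only illustrates the key mapping described above; it is not the module's implementation.

```perl
use strict;
use warnings;

# Hypothetical helper (not part of Async::Redis): convert a message in the
# next() shape (payload under 'data') to the next_message() shape
# (payload under 'message'). All other keys are carried over unchanged.
sub to_legacy_shape {
    my ($msg) = @_;
    return {
        type    => $msg->{type},
        channel => $msg->{channel},
        pattern => $msg->{pattern},
        message => $msg->{data},
    };
}

my $legacy = to_legacy_shape({
    type    => 'message',
    channel => 'channel_name',
    pattern => undef,
    data    => 'payload',
});
```

Code written against next_message can therefore be ported to next by reading $msg->{data} wherever it previously read $msg->{message}.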

on_reconnect

$sub->on_reconnect(sub {
    my ($sub) = @_;
    ...
});

Set or get the callback fired after subscriptions are replayed on a reconnected socket.

on_message

$sub->on_message(sub {
    my ($sub, $msg) = @_;
    ...
});

Set or get the message callback. Setting it switches the subscription to callback-driven delivery. See "CALLBACK-DRIVEN DELIVERY".

on_error

$sub->on_error(sub {
    my ($sub, $err) = @_;
    ...
});

Set or get the fatal-error callback used by callback-driven delivery.

unsubscribe

await $sub->unsubscribe('channel1');
await $sub->unsubscribe;  # all regular channels

Unsubscribe regular channels. With no arguments, unsubscribes all regular channels tracked by this subscription.

punsubscribe

await $sub->punsubscribe('prefix:*');
await $sub->punsubscribe;  # all patterns

Unsubscribe pattern subscriptions. With no arguments, unsubscribes all patterns tracked by this subscription.

sunsubscribe

await $sub->sunsubscribe('shard-channel');
await $sub->sunsubscribe;  # all sharded channels

Unsubscribe sharded pub/sub channels. With no arguments, unsubscribes all sharded channels tracked by this subscription.

channels / patterns / sharded_channels

Return the currently tracked regular channels, patterns, or sharded channels.

channel_count

Return the total number of tracked regular, pattern, and sharded subscriptions.

is_closed

Return true after the subscription has been closed.

MESSAGE STRUCTURE

{
    type    => 'message',      # or 'pmessage', 'smessage'
    channel => 'channel_name',
    pattern => 'pattern',      # defined for pmessage, undef otherwise
    data    => 'payload',
}

The pattern key is always present. It is defined for pmessage frames (the matching glob pattern) and undef for message and smessage frames. Consumers do not need exists $msg->{pattern} checks.

next() only ever returns real pub/sub messages. Reconnection is handled transparently and does not surface as a message.
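
As a minimal sketch of consuming this structure, the hypothetical formatter below (describe_message is an illustrative name, not part of the module) handles all three frame types. It relies only on the documented guarantee that pattern is always present, so a plain defined test is sufficient.

```perl
use strict;
use warnings;

# Hypothetical formatter: builds a log line from a message hashref.
# No exists() check is needed because the pattern key is always present.
sub describe_message {
    my ($msg) = @_;
    my $via = defined $msg->{pattern}
        ? sprintf(' (matched %s)', $msg->{pattern})
        : '';
    return sprintf('[%s] %s%s: %s',
        $msg->{type}, $msg->{channel}, $via, $msg->{data});
}

# Hand-built sample messages in the documented shape.
my @lines = map { describe_message($_) } (
    { type => 'message',  channel => 'news',      pattern => undef,    data => 'hello' },
    { type => 'pmessage', channel => 'news.tech', pattern => 'news.*', data => 'hi'    },
    { type => 'smessage', channel => 'shard-1',   pattern => undef,    data => 'yo'    },
);
```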

RECONNECTION

When reconnect is enabled on the Redis connection, subscriptions are automatically re-established after a connection drop. To be notified:

$sub->on_reconnect(sub {
    my ($sub) = @_;
    warn "Reconnected, may have lost messages";
    # re-poll state, log, etc.
});

Messages published while the connection was down are lost (Redis pub/sub has no persistence).

CALLBACK-DRIVEN DELIVERY

As an alternative to the await $sub->next iterator, you can register a callback to receive messages:

my $sub = await $redis->subscribe('chat');
$sub->on_message(sub {
    my ($sub, $msg) = @_;
    # $msg has the same shape as next() returns:
    #   { type => 'message'|'pmessage'|'smessage',
    #     channel => ...,
    #     pattern => ...,  # defined for pmessage, undef otherwise
    #     data    => ... }
});

Callback mode is designed for fire-and-forget listeners — background dispatchers, websocket gateways, channel-layer middleware — where the iterator pattern's requirement to be inside an awaited async sub is awkward or triggers Future::AsyncAwait "lost its returning future" warnings.

Exclusivity

Once on_message is set on a Subscription, it is callback-mode for the rest of its lifetime. Calls to $sub->next will croak. This is sticky — there is no way to switch back. If you need iterator mode, construct a new Subscription.

Signature

$sub->on_message(sub {
    my ($subscription, $message) = @_;
    ...
});

The callback receives the $subscription itself as its first argument (consistent with on_reconnect), and the message hashref as its second. The return value is normally ignored; if the return is a Future, see "Backpressure".

Backpressure

If your callback returns a Future, the driver waits for that Future to resolve before reading the next frame:

$sub->on_message(async sub {
    my ($sub, $msg) = @_;
    await store_to_database($msg);    # driver waits before next read
});

Synchronous callbacks (or callbacks returning non-Future values) add no extra wait: as soon as the callback returns, the driver reads the next frame. This gives consumers opt-in backpressure with no default overhead.

If the returned Future fails, the failure is routed to on_error.

Fatal error handling

$sub->on_error(sub {
    my ($sub, $err) = @_;
    ...
});

on_error fires when the underlying read encounters an error that cannot be recovered by reconnect (e.g., reconnect is disabled, or reconnect itself failed). After on_error fires, the subscription is closed and the driver stops.

If on_error is not registered, fatal errors die. Silent death of a pub/sub consumer is a debugging nightmare; loud-by-default prevents it. If you genuinely want to swallow errors, register an explicit no-op: $sub->on_error(sub { }).

Callback exceptions (dying inside on_message) are also routed to on_error; the callback-died message is prepended to the error string.

Ordering guarantee

Callbacks fire in the order frames arrive on the connection. They are never invoked concurrently (Perl is single-threaded and the driver runs on the event loop). After a reconnect, on_reconnect always fires before any post-reconnect on_message.

Re-entrancy

Inside an on_message callback you may safely:

  • Call $sub->subscribe(...) — the new channel is added cleanly; messages on it arrive via the same callback.

  • Call $sub->on_message($new_cb) — the current message is dispatched to the previously-installed handler; the next frame uses the new handler.

  • die — routed to on_error.
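
The handler-swap rule can be illustrated with a toy model. Local::FakeSubscription below is a hypothetical stand-in written only for this sketch so that it runs without a Redis server; its deliver method models the documented behaviour and is not how the real driver is implemented.

```perl
use strict;
use warnings;

# Hypothetical stand-in for a subscription. It is NOT
# Async::Redis::Subscription and exists only to make the sketch runnable.
{
    package Local::FakeSubscription;

    sub new {
        my ($class) = @_;
        return bless { cb => undef }, $class;
    }

    # Set-or-get accessor, mirroring the documented on_message style.
    sub on_message {
        my $self = shift;
        $self->{cb} = shift if @_;
        return $self->{cb};
    }

    # Models the documented rule: the handler installed when a frame arrives
    # handles that frame, even if it installs a replacement while running.
    sub deliver {
        my ($self, $msg) = @_;
        my $cb = $self->{cb};
        $cb->($self, $msg);
        return;
    }
}

my @log;

# Steady-state handler, installed after the first message.
my $steady = sub {
    my ($sub, $msg) = @_;
    push @log, "steady:$msg->{data}";
};

# First handler: processes one message, then swaps itself out.
my $first = sub {
    my ($sub, $msg) = @_;
    push @log, "first:$msg->{data}";
    $sub->on_message($steady);
};

my $sub = Local::FakeSubscription->new;
$sub->on_message($first);
$sub->deliver({ type => 'message', channel => 'chat', pattern => undef, data => $_ })
    for qw(a b c);
```

In this model the first message is seen by $first and every later message by $steady, which is the pattern a handshake-then-stream consumer would use.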

Backpressure and Redis server limits

Synchronous callbacks provide implicit backpressure by blocking the driver loop: while your callback runs, the driver does not read the next frame, so the TCP receive buffer fills and Redis's output buffer for the connection grows. Redis, however, enforces client-output-buffer-limit pubsub (defaulting to 32mb 8mb 60 in recent versions, that is, a 32 MB hard limit or 8 MB sustained for 60 seconds). If your subscriber cannot keep up for sustained periods, Redis will disconnect it. No amount of client-side buffering changes this: the limit is on the server.

If your processing is genuinely slow, return a Future from your callback (see "Backpressure") and consider moving the expensive work to a worker pool so the callback can return quickly. Long synchronous processing in pub/sub callbacks is an anti-pattern at scale regardless of client.
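
One way to keep the callback fast is to have it only hand the message off. The sketch below is an assumption-laden illustration, not a recommendation from the module: the in-memory @queue, the $MAX_QUEUED limit, and the drop-newest policy are all hypothetical choices, and the worker that drains the queue is not shown.

```perl
use strict;
use warnings;

# Hypothetical bounded hand-off queue. The callback only enqueues, so it
# returns quickly; a separate worker (not shown) would drain @queue.
my @queue;
my $MAX_QUEUED = 1000;    # assumed limit; tune for the workload
my $dropped    = 0;

# Callback with the documented on_message signature. It returns a plain
# empty value (not a Future), so the driver does not wait on it.
my $enqueue = sub {
    my ($sub, $msg) = @_;
    if (@queue >= $MAX_QUEUED) {
        $dropped++;       # assumed policy: drop the newest message and count it
        return;
    }
    push @queue, $msg;
    return;
};

# Registration would look like: $sub->on_message($enqueue);
# Here the callback is invoked directly to simulate 1005 deliveries.
$enqueue->(undef, { type => 'message', channel => 'jobs', pattern => undef, data => $_ })
    for 1 .. 1005;
```

Bounding the queue keeps memory use predictable; whether to drop, block, or return a Future when the queue is full depends on how much message loss the application can tolerate.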

INTERNAL LIFECYCLE METHODS

The following methods are used by Async::Redis to manage subscription state. They are not part of the public API for end consumers, but are documented here for maintainers.

_close

Intentional teardown. Marks the subscription closed and wakes any blocked next() with undef. Clears the parent _subscription slot on the Async::Redis object with an identity guard — a stale _close call from an earlier subscription object cannot evict a newer one that has since taken the slot.

_fail_fatal($typed_error)

Unrecoverable failure. Marks the subscription closed with a typed error object. Any blocked next() call will die with that error. The error is preserved for callers who call next() after the fact. Routes through _close's identity guard for parent-slot clearing.

_pause_for_reconnect

Called before a reconnect attempt. Does not mark the subscription closed — the underlying reader has already exited due to the connection drop. Channel/pattern tracking hashes are left intact for replay.

_resume_after_reconnect

Async. Replays all tracked SUBSCRIBE, PSUBSCRIBE, and SSUBSCRIBE commands on the freshly reconnected socket. Sets in_pubsub=1 before sending replay commands so that racing message frames classify correctly (mirrors the timing of the initial subscribe). Fires on_reconnect after replay. Callback-mode subscriptions restart their callback driver; iterator-mode subscriptions continue to receive frames through the parent Redis connection's reader.