NAME
Async::Redis::Subscription - PubSub subscription handler
SYNOPSIS
my $sub = await $redis->subscribe('channel1', 'channel2');
while (my $msg = await $sub->next) {
say "Channel: $msg->{channel}";
say "Data: $msg->{data}";
}
await $sub->unsubscribe('channel1');
await $sub->unsubscribe; # all remaining
DESCRIPTION
Manages Redis PubSub subscriptions with async iterator pattern.
METHODS
next
my $msg = await $sub->next;
Return the next pub/sub message hashref, or undef after a clean close. Fatal subscription errors are rethrown to the caller. Cannot be used after on_message switches the subscription into callback mode.
next_message
my $msg = await $sub->next_message;
Backward-compatible wrapper around next. It returns the same message data using message instead of data:
{
channel => 'channel_name',
message => 'payload',
pattern => undef,
type => 'message',
}
on_reconnect
$sub->on_reconnect(sub {
my ($sub) = @_;
...
});
Set or get the callback fired after subscriptions are replayed on a reconnected socket.
on_message
$sub->on_message(sub {
my ($sub, $msg) = @_;
...
});
Set or get callback-driven delivery. See "CALLBACK-DRIVEN DELIVERY".
on_error
$sub->on_error(sub {
my ($sub, $err) = @_;
...
});
Set or get the fatal-error callback used by callback-driven delivery.
unsubscribe
await $sub->unsubscribe('channel1');
await $sub->unsubscribe; # all regular channels
Unsubscribe regular channels. With no arguments, unsubscribes all regular channels tracked by this subscription.
punsubscribe
await $sub->punsubscribe('prefix:*');
await $sub->punsubscribe; # all patterns
Unsubscribe pattern subscriptions.
sunsubscribe
await $sub->sunsubscribe('shard-channel');
await $sub->sunsubscribe; # all sharded channels
Unsubscribe sharded pub/sub channels.
channels / patterns / sharded_channels
Return the currently tracked regular channels, patterns, or sharded channels.
channel_count
Return the total number of tracked regular, pattern, and sharded subscriptions.
is_closed
Return true after the subscription has been closed.
MESSAGE STRUCTURE
{
type => 'message', # or 'pmessage', 'smessage'
channel => 'channel_name',
pattern => 'pattern', # defined for pmessage, undef otherwise
data => 'payload',
}
The pattern key is always present. It is defined for pmessage frames (the matching glob pattern) and undef for message and smessage frames. Consumers do not need exists $msg->{pattern} checks.
next() always returns real pub/sub messages. Reconnection is transparent.
RECONNECTION
When reconnect is enabled on the Redis connection, subscriptions are automatically re-established after a connection drop. To be notified:
$sub->on_reconnect(sub {
my ($sub) = @_;
warn "Reconnected, may have lost messages";
# re-poll state, log, etc.
});
Messages published while the connection was down are lost (Redis pub/sub has no persistence).
CALLBACK-DRIVEN DELIVERY
As an alternative to the await $sub->next iterator, you can register a callback to receive messages:
my $sub = await $redis->subscribe('chat');
$sub->on_message(sub {
my ($sub, $msg) = @_;
# $msg has the same shape as next() returns:
# { type => 'message'|'pmessage'|'smessage',
# channel => ...,
# pattern => ..., # defined for pmessage, undef otherwise
# data => ... }
});
Callback mode is designed for fire-and-forget listeners — background dispatchers, websocket gateways, channel-layer middleware — where the iterator pattern's requirement to be inside an awaited async sub is awkward or triggers Future::AsyncAwait "lost its returning future" warnings.
Exclusivity
Once on_message is set on a Subscription, it is callback-mode for the rest of its lifetime. Calls to $sub->next will croak. This is sticky — there is no way to switch back. If you need iterator mode, construct a new Subscription.
Signature
$sub->on_message(sub {
my ($subscription, $message) = @_;
...
});
The callback receives the $subscription itself as its first argument (consistent with on_reconnect), and the message hashref as its second. The return value is normally ignored; if the return is a Future, see "Backpressure".
Backpressure
If your callback returns a Future, the driver waits for that Future to resolve before reading the next frame:
$sub->on_message(async sub {
my ($sub, $msg) = @_;
await store_to_database($msg); # driver waits before next read
});
Synchronous callbacks (or callbacks returning non-Future values) do not block the driver. This gives consumers opt-in backpressure with no default overhead.
If the returned Future fails, the failure is routed to on_error.
Fatal error handling
$sub->on_error(sub {
my ($sub, $err) = @_;
...
});
on_error fires when the underlying read encounters an error that cannot be recovered by reconnect (e.g., reconnect is disabled, or reconnect itself failed). After on_error fires, the subscription is closed and the driver stops.
If on_error is not registered, fatal errors die. Silent death of a pub/sub consumer is a debugging nightmare; loud-by-default prevents it. If you genuinely want to swallow errors, register an explicit no-op: $sub->on_error(sub { }).
Callback exceptions (dying inside on_message) are also routed to on_error; the callback-died message is prepended to the error string.
Ordering guarantee
Callbacks fire in the order frames arrive on the connection. No concurrent invocation (Perl is single-threaded and the driver runs on the event loop). After a reconnect, on_reconnect always fires before any post-reconnect on_message.
Re-entrancy
Inside an on_message callback you may safely:
Call
$sub->subscribe(...)— the new channel is added cleanly; messages on it arrive via the same callback.Call
$sub->on_message($new_cb)— the current message is dispatched to the previously-installed handler; the next frame uses the new handler.die— routed toon_error.
Backpressure and Redis server limits
Synchronous callbacks provide backpressure by blocking the driver loop: while your callback runs, the driver doesn't read the next frame, so TCP fills, Redis's output buffer grows. But Redis enforces client-output-buffer-limit pubsub (defaulting to 32mb 8mb 60 in recent versions) — if your subscriber cannot keep up for sustained periods, Redis will disconnect you. There is no amount of client-side buffering that changes this: the limit is on the server.
If your processing is genuinely slow, return a Future from your callback (enabling opt-in backpressure above) AND consider moving the expensive work to a worker pool so the callback can return quickly. Long synchronous processing in pub/sub callbacks is an anti-pattern at scale regardless of client.
INTERNAL LIFECYCLE METHODS
The following methods are used by Async::Redis to manage subscription state. They are not part of the public API for end consumers, but are documented here for maintainers.
_close
Intentional teardown. Marks the subscription closed and wakes any blocked next() with undef. Clears the parent _subscription slot on the Async::Redis object with an identity guard — a stale _close call from an earlier subscription object cannot evict a newer one that has since taken the slot.
_fail_fatal($typed_error)
Unrecoverable failure. Marks the subscription closed with a typed error object. Any blocked next() call will die with that error. The error is preserved for callers who call next() after the fact. Routes through _close's identity guard for parent-slot clearing.
_pause_for_reconnect
Called before a reconnect attempt. Does not mark the subscription closed — the underlying reader has already exited due to the connection drop. Channel/pattern tracking hashes are left intact for replay.
_resume_after_reconnect
Async. Replays all tracked SUBSCRIBE, PSUBSCRIBE, and SSUBSCRIBE commands on the freshly reconnected socket. Sets in_pubsub=1 before sending replay commands so that racing message frames classify correctly (mirrors the timing of the initial subscribe). Fires on_reconnect after replay. Callback-mode subscriptions restart their callback driver; iterator-mode subscriptions continue to receive frames through the parent Redis connection's reader.