NAME
Async::Redis - Async Redis client using Future::IO
SYNOPSIS
use Async::Redis;
use Future::AsyncAwait;
# Future::IO 0.23+ has a built-in poll-based impl that works
# out of the box. For IO::Async or UV, require the impl directly:
# require Future::IO::Impl::IOAsync; # if using IO::Async
# require Future::IO::Impl::UV; # if using UV
my $redis = Async::Redis->new(
host => 'localhost',
port => 6379,
);
(async sub {
await $redis->connect;
# Simple commands
await $redis->set('key', 'value');
my $value = await $redis->get('key');
# Pipelining for efficiency
my $pipeline = $redis->pipeline;
$pipeline->set('k1', 'v1');
$pipeline->set('k2', 'v2');
$pipeline->get('k1');
my $results = await $pipeline->execute;
# PubSub
my $sub = await $redis->subscribe('channel');
while (my $msg = await $sub->next_message) {
print "Received: $msg->{message}\n";
}
})->()->await;
Important: If you're embedding Async::Redis in a larger application (web framework, existing event loop, etc.), see "EVENT LOOP CONFIGURATION" for how to properly configure Future::IO. Libraries should never configure the Future::IO backend - only your application's entry point should.
DESCRIPTION
Async::Redis is an asynchronous Redis client built on Future::IO, providing a modern, non-blocking interface for Redis operations.
Key features:
Full async/await support via Future::AsyncAwait
Event loop agnostic (IO::Async, AnyEvent, UV, etc.)
Automatic reconnection with exponential backoff
Connection pooling with health checks
Pipelining and auto-pipelining
PubSub with automatic subscription replay on reconnect
When a connection drops during pub/sub mode and reconnect is enabled, all subscriptions are automatically re-established. Use $subscription->on_reconnect(sub { ... }) to be notified when this happens (e.g., to re-poll state that may have changed during the outage).
Transaction support (MULTI/EXEC/WATCH)
TLS/SSL connections
OpenTelemetry observability integration
Fork-safe for pre-fork servers (Starman, etc.)
Full RESP2 protocol support
Safe concurrent commands on single connection
CONCURRENT COMMANDS
Async::Redis safely handles multiple concurrent commands on a single connection using a response queue pattern. When you fire multiple async commands without explicitly awaiting them:
my @futures = (
$redis->set('k1', 'v1'),
$redis->set('k2', 'v2'),
$redis->get('k1'),
);
my @results = await Future->needs_all(@futures);
Each command is registered in an inflight queue before being sent to Redis. A single reader coroutine processes responses in FIFO order, matching each response to the correct waiting future. This prevents response mismatch bugs that can occur when multiple coroutines race to read from the socket.
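The FIFO matching described above can be illustrated without a server. This is a minimal sketch, not the module's implementation: the names send_command and on_reply_from_server are hypothetical, and plain callbacks stand in for the waiting futures.

```perl
use strict;
use warnings;

# Hypothetical stand-in for the inflight queue: each entry remembers the
# command and a callback that plays the role of the waiting future.
my @inflight;
my %result;

sub send_command {
    my ($name, $on_reply) = @_;
    # Register the command before it is "sent".
    push @inflight, { name => $name, on_reply => $on_reply };
}

# The single reader: replies arrive in the order commands were sent,
# so the oldest inflight entry always owns the next reply.
sub on_reply_from_server {
    my ($reply) = @_;
    my $entry = shift @inflight or die "reply with no command waiting\n";
    $entry->{on_reply}->($reply);
}

send_command('SET k1 v1', sub { $result{set} = shift });
send_command('GET k1',    sub { $result{get} = shift });

on_reply_from_server('OK');
on_reply_from_server('v1');

print "set=$result{set} get=$result{get}\n";
```

Because only one place shifts entries off the queue, two callers can never consume each other's replies.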
For high-throughput scenarios, consider using:
Explicit pipelines - $redis->pipeline batches commands for a single network round-trip
Auto-pipeline - auto_pipeline => 1 automatically batches commands within an event loop tick
Connection pools - Async::Redis::Pool for parallel execution across multiple connections
CONSTRUCTOR
new
my $redis = Async::Redis->new(%options);
Creates a new Redis client instance. Does not connect immediately.
Options:
- host => $hostname
-
Redis server hostname. Default: 'localhost'
- port => $port
-
Redis server port. Default: 6379
- uri => $uri
-
Connection URI (e.g., 'redis://user:pass@host:port/db'). If provided, overrides host, port, password, database options.
- path => $path
-
Unix domain socket path. When provided, host and port are ignored. Also available via redis+unix:// URIs.
- password => $password
-
Authentication password.
- username => $username
-
Authentication username (Redis 6+ ACL).
- database => $db
-
Database number to SELECT after connect. Default: 0
- tls => $bool | \%options
-
Enable TLS/SSL connection. Can be a boolean or hashref with options:
tls => {
    ca_file         => '/path/to/ca.crt',
    cert_file       => '/path/to/client.crt',
    key_file        => '/path/to/client.key',
    verify          => 1,    # verify server certificate
    verify_hostname => 1,    # verify certificate name/IP
}
- connect_timeout => $seconds
-
Connection timeout. Default: 10
- request_timeout => $seconds
-
Per-request timeout for commands. Default: 5
Blocking commands (BLPOP, BRPOP, etc.) automatically extend this timeout based on their server-side timeout plus blocking_timeout_buffer. Blocking commands with a Redis timeout of 0 block indefinitely and do not get a client-side request deadline.
- blocking_timeout_buffer => $seconds
-
Extra time added to blocking command timeouts. Default: 2
For example, BLPOP key 30 gets a deadline of 30 + 2 = 32 seconds.
- reconnect => $bool
-
Enable automatic reconnection. Default: 0
- reconnect_delay => $seconds
-
Initial reconnect delay. Default: 0.1
- reconnect_delay_max => $seconds
-
Maximum reconnect delay. Default: 60
- reconnect_jitter => $ratio
-
Jitter ratio for reconnect delays. Default: 0.25
- reconnect_max_attempts => $int
-
Maximum number of reconnect attempts before _reconnect gives up and dies with an Async::Redis::Error::Disconnected. Default: 10. Set to 0 for unlimited retries (not recommended in production: an unreachable Redis will loop with exponential backoff forever, giving consumers no way to distinguish "reconnecting" from "broken").
When the cap is exceeded, the failure propagates through _reconnect_pubsub to any active Async::Redis::Subscription's read loop, where it routes to on_error (or dies loudly if no on_error is registered) per the existing Subscription fatal-error contract.
- on_connect => $coderef
-
Callback when connection is established. Called as $coderef->($redis).
- on_disconnect => $coderef
-
Callback when a live connection is intentionally closed or lost. Called as $coderef->($redis, $reason).
- on_error => $coderef
-
Callback for connection/read errors before they are propagated. Called as $coderef->($redis, $error).
- prefix => $prefix
-
Key prefix applied to supported key-bearing commands. See "PREFIX LIMITATIONS".
- client_name => $name
-
CLIENT SETNAME value sent on connect.
- pipeline_depth => $int
-
Maximum commands allowed in an explicit pipeline. Default: 10000
- auto_pipeline => $bool
-
If true, commands issued in the same event-loop tick are automatically batched and sent as one pipeline. Default: 0
- message_queue_depth => $int
-
Maximum number of locally queued pub/sub messages before the reader applies backpressure. Must be at least 1. Default: 1
- debug => $bool | $coderef
-
Enable debug logging. A true non-coderef logs to STDERR. A coderef receives ($direction, $data).
- otel_tracer => $tracer
-
OpenTelemetry tracer for span creation.
- otel_meter => $meter
-
OpenTelemetry meter for metrics.
- otel_include_args => $bool
-
Include command arguments in OpenTelemetry span statements. Default: 0. See "OPENTELEMETRY ARGUMENTS".
- otel_redact => $bool
-
Apply built-in credential redaction when command arguments are included in logs or spans. Default: 1.
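The arithmetic behind request_timeout, blocking_timeout_buffer and the reconnect_* options can be sketched as two small functions. The blocking deadline follows the rule stated above. The backoff formula (doubling per attempt, capped at reconnect_delay_max, symmetric jitter) is an assumption made for illustration, and both function names are hypothetical; the module's exact calculation may differ.

```perl
use strict;
use warnings;
use List::Util qw(min);

# Deadline for a blocking command: server-side timeout plus the buffer.
# A server-side timeout of 0 blocks indefinitely, so there is no deadline.
sub blocking_deadline {
    my ($server_timeout, $buffer) = @_;
    return if $server_timeout == 0;
    return $server_timeout + $buffer;
}

# ASSUMPTION: doubling backoff capped at delay_max, with symmetric
# jitter of +/- (delay * ratio). Attempts are counted from 1.
sub reconnect_delay {
    my (%opt) = @_;
    my $base   = min($opt{delay_max}, $opt{delay} * 2 ** ($opt{attempt} - 1));
    my $jitter = $base * $opt{jitter} * (2 * rand() - 1);
    return $base + $jitter;
}

printf "BLPOP key 30 -> %s second deadline\n", blocking_deadline(30, 2);
printf "attempt %d -> %.3fs\n", $_,
    reconnect_delay(attempt => $_, delay => 0.1, delay_max => 60, jitter => 0.25)
    for 1 .. 5;
```

Jitter spreads reconnect attempts from many clients so they do not all hit a recovering server at the same instant.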
PREFIX LIMITATIONS
The prefix option is a convenience for namespacing keys; it is not a hard security boundary. Key extraction is driven by a hand-maintained map covering common Redis commands. As of 0.002000, the following commands have incomplete or missing key extraction: BITFIELD, LMPOP, BLMPOP, ZINTERCARD, LCS, GEORADIUS_RO, GEORADIUSBYMEMBER_RO. Calls to these commands will not have prefixes applied. For multi-tenant isolation, use Redis ACLs or separate Redis databases rather than relying on prefix.
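To see why an incomplete key map leaves some commands unprefixed, consider this minimal sketch. The %KEY_POSITIONS table and apply_prefix function are hypothetical and far smaller than whatever map the module maintains; they only illustrate the mechanism.

```perl
use strict;
use warnings;

# HYPOTHETICAL key map: command name => indexes of the arguments that are keys.
my %KEY_POSITIONS = (
    GET    => [0],
    SET    => [0],
    RENAME => [0, 1],
);

sub apply_prefix {
    my ($prefix, $command, @args) = @_;
    my $positions = $KEY_POSITIONS{ uc $command }
        or return @args;    # command not in the map: nothing is prefixed
    for my $i (@$positions) {
        $args[$i] = $prefix . $args[$i] if $i < @args;
    }
    return @args;
}

print join(' ', apply_prefix('app:', 'SET', 'k', 'v')), "\n";
print join(' ', apply_prefix('app:', 'LMPOP', 2, 'a', 'b', 'LEFT')), "\n";
```

The second call passes through untouched, which is the silent failure mode that makes prefix unsuitable as an isolation boundary.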
TLS HOSTNAME VERIFICATION
Starting in 0.002000, TLS connections verify the server certificate's hostname (or IP SAN) by default when verify is on. If you connect by hostname, the certificate must have a matching CN or SAN. If you connect by IP literal, the certificate must have a matching IP SAN.
For deployments where the certificate does not match the connected hostname/IP (common when connecting to internal IPs with hostname-only certs), set tls => { verify_hostname => 0 } to skip hostname identity while still verifying the CA chain.
METHODS
connect
await $redis->connect;
Establish connection to Redis server. Returns a Future that resolves to the Redis client instance.
disconnect
$redis->disconnect;
Close connection gracefully.
is_connected
my $ok = $redis->is_connected;
Return true when the client currently has an open Redis connection.
ping
my $pong = await $redis->ping;
Send PING and return Redis's response.
command
my $result = await $redis->command('GET', 'key');
Execute arbitrary Redis command.
Redis Commands
All standard Redis commands are available as methods. See https://redis.io/docs/latest/commands/ for the complete Redis command reference.
# Strings
await $redis->set('key', 'value');
await $redis->set('key', 'value', ex => 300); # with 5min expiry
await $redis->set('key', 'value', nx => 1); # only if not exists
my $value = await $redis->get('key');
await $redis->incr('counter');
await $redis->incrby('counter', 5);
await $redis->mset('k1', 'v1', 'k2', 'v2');
my $values = await $redis->mget('k1', 'k2');
await $redis->append('key', ' more');
await $redis->setex('key', 60, 'value'); # set with 60s expiry
# Hashes
await $redis->hset('user:1', 'name', 'Alice', 'email', 'alice@example.com');
my $name = await $redis->hget('user:1', 'name');
my $user = await $redis->hgetall('user:1'); # returns hashref
await $redis->hincrby('user:1', 'visits', 1);
my $exists = await $redis->hexists('user:1', 'name');
await $redis->hdel('user:1', 'email');
# Lists
await $redis->lpush('queue', 'job1', 'job2');
await $redis->rpush('queue', 'job3');
my $left = await $redis->lpop('queue');
my $right = await $redis->rpop('queue');
my $popped = await $redis->blpop('queue', 5); # ['queue', 'job'] or undef
my $items = await $redis->lrange('queue', 0, -1);
my $len = await $redis->llen('queue');
# Sets
await $redis->sadd('tags', 'perl', 'redis', 'async');
await $redis->srem('tags', 'async');
my $members = await $redis->smembers('tags');
my $is_member = await $redis->sismember('tags', 'perl');
my $common = await $redis->sinter('tags1', 'tags2');
# Sorted Sets
await $redis->zadd('leaderboard', 100, 'alice', 85, 'bob');
await $redis->zincrby('leaderboard', 10, 'alice');
my $top = await $redis->zrange('leaderboard', 0, 9, 'WITHSCORES');
my $rank = await $redis->zrank('leaderboard', 'alice');
my $score = await $redis->zscore('leaderboard', 'alice');
# Keys
my $key_exists = await $redis->exists('key');
await $redis->expire('key', 300);
my $ttl = await $redis->ttl('key');
await $redis->del('key1', 'key2');
await $redis->rename('old', 'new');
my $type = await $redis->type('key');
my $keys = await $redis->keys('user:*'); # use SCAN in production
pipeline
my $pipeline = $redis->pipeline;
$pipeline->set('k1', 'v1');
$pipeline->incr('counter');
my $results = await $pipeline->execute;
Create a pipeline for batched command execution. All commands are sent in a single network round-trip.
subscribe
my $sub = await $redis->subscribe('channel1', 'channel2');
Subscribe to channels. Returns an Async::Redis::Subscription object.
psubscribe
my $sub = await $redis->psubscribe('chan:*');
Subscribe to patterns. Returns an Async::Redis::Subscription object.
ssubscribe
my $sub = await $redis->ssubscribe('shard-channel');
Subscribe to Redis 7 sharded pub/sub channels. Returns an Async::Redis::Subscription object.
publish
my $receivers = await $redis->publish('channel', 'message');
Publish a regular pub/sub message.
spublish
my $receivers = await $redis->spublish('shard-channel', 'message');
Publish a Redis 7 sharded pub/sub message.
multi
my $results = await $redis->multi(async sub {
my ($tx) = @_;
$tx->set('k1', 'v1');
$tx->incr('counter');
});
Execute a transaction with callback.
watch
await $redis->watch('key1', 'key2');
Watch keys for a manual optimistic transaction. Redis clears watched keys on EXEC, DISCARD, or UNWATCH.
unwatch
await $redis->unwatch;
Clear all watched keys on the current connection.
multi_start
await $redis->multi_start;
Start a manual MULTI transaction and mark the connection dirty until exec or discard.
exec
my $results = await $redis->exec;
Execute a manual transaction. Returns undef if Redis aborts because a watched key changed.
discard
await $redis->discard;
Abort a manual transaction. Redis also clears any active watched keys.
watch_multi
my $results = await $redis->watch_multi(['key'], async sub {
my ($tx, $values) = @_;
$tx->set('key', $values->{key} + 1);
});
Watch keys and execute transaction atomically. Returns undef if watched keys were modified by another client.
script
my $script = $redis->script('return redis.call("get", KEYS[1])');
my $result = await $script->run(['mykey']);
Create a Lua script object with automatic EVALSHA optimization. See Async::Redis::Script for details.
define_command
$redis->define_command(my_command => {
keys => 1, # Number of KEYS (or 'dynamic')
lua => 'return ...', # Lua script code
description => 'Does X', # Optional documentation
});
Register a named Lua script for reuse. The script is automatically cached and uses EVALSHA for efficiency. Script names are kept in this Redis object's registry only; they are not installed as Perl methods. Execute registered scripts with run_script.
Options:
keys - Number of KEYS the script expects. Use 'dynamic' if variable (first arg to run_script will be the key count).
lua - The Lua script source code.
description - Optional description for documentation.
run_script
my $result = await $redis->run_script('my_command', @keys, @args);
Execute a registered script by name. For scripts with fixed key count, pass keys then args. For dynamic scripts, pass key count first:
# Fixed keys (keys => 2)
await $redis->run_script('two_key_script', 'key1', 'key2', 'arg1');
# Dynamic keys
await $redis->run_script('dynamic_script', 2, 'key1', 'key2', 'arg1');
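The two calling conventions differ only in where the key count comes from. The following sketch shows one way the argument list could be split into KEYS and ARGV; split_script_args is a hypothetical helper written for illustration, not part of the module.

```perl
use strict;
use warnings;

# Split run_script-style arguments into KEYS and ARGV.
# $key_spec is either a fixed count or the string 'dynamic', in which
# case the first argument carries the key count.
sub split_script_args {
    my ($key_spec, @rest) = @_;
    my $count = $key_spec eq 'dynamic' ? (shift(@rest) // 0) : $key_spec;
    die "expected $count keys, got " . scalar(@rest) . " arguments\n"
        if $count > @rest;
    my @keys = splice @rest, 0, $count;
    return (\@keys, \@rest);
}

my ($keys, $args) = split_script_args(2, 'key1', 'key2', 'arg1');
print "fixed:   keys=@$keys args=@$args\n";

($keys, $args) = split_script_args('dynamic', 2, 'key1', 'key2', 'arg1');
print "dynamic: keys=@$keys args=@$args\n";
```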
get_script
my $script = $redis->get_script('my_command');
Get a registered script object by name. Returns undef if not found.
list_scripts
my @names = $redis->list_scripts;
List all registered script names.
preload_scripts
my $count = await $redis->preload_scripts;
Load all registered scripts to Redis server. Useful before pipeline execution to ensure EVALSHA will succeed.
script_load / script_exists / script_flush / script_kill
my $sha = await $redis->script_load($lua);
my $flags = await $redis->script_exists($sha1, $sha2);
await $redis->script_flush('ASYNC');
await $redis->script_kill;
Thin wrappers around Redis SCRIPT subcommands.
is_dirty
my $dirty = $redis->is_dirty;
Return true if the connection has state that makes it unsafe to return to a pool: active transaction, watched keys, pub/sub mode, or pending responses.
in_multi / watching / in_pubsub / inflight_count
State accessors used by Async::Redis::Pool and useful for diagnostics.
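The relationship between is_dirty and these accessors can be pictured as a simple predicate. This sketch uses a plain hash with the accessor names as keys; it assumes is_dirty is the logical OR of the four conditions listed for it, which matches the description but is not confirmed against the module's source.

```perl
use strict;
use warnings;

# HYPOTHETICAL state snapshot keyed by the accessor names above.
sub is_dirty {
    my ($state) = @_;
    return ( $state->{in_multi}
          || $state->{watching}
          || $state->{in_pubsub}
          || ($state->{inflight_count} // 0) > 0 ) ? 1 : 0;
}

my %idle     = (in_multi => 0, watching => 0, in_pubsub => 0, inflight_count => 0);
my %watching = (%idle, watching => 1);
my %busy     = (%idle, inflight_count => 3);

printf "idle=%d watching=%d busy=%d\n",
    is_dirty(\%idle), is_dirty(\%watching), is_dirty(\%busy);
```

A pool can use such a check to decide whether a released connection is safe to hand to the next caller.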
LUA SCRIPTING
Async::Redis provides comprehensive support for Redis Lua scripting with automatic EVALSHA optimization.
Quick Start
# Define a reusable script
$redis->define_command(atomic_incr => {
keys => 1,
lua => <<'LUA',
local current = tonumber(redis.call('GET', KEYS[1]) or 0)
local result = current + tonumber(ARGV[1])
redis.call('SET', KEYS[1], result)
return result
LUA
});
# Use it
my $result = await $redis->run_script('atomic_incr', 'counter', 5);
Pipeline Integration
Registered scripts work in pipelines:
my $pipe = $redis->pipeline;
$pipe->run_script('atomic_incr', 'counter:a', 1);
$pipe->run_script('atomic_incr', 'counter:b', 1);
$pipe->set('other:key', 'value');
my $results = await $pipe->execute;
Scripts are automatically preloaded before pipeline execution.
EVALSHA Optimization
Scripts automatically use EVALSHA (by SHA1 hash) for efficiency. If the script isn't cached on the server, it falls back to EVAL and caches for future calls. This is transparent to your code.
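The fallback can be simulated with a mock server to make the sequence of wire commands visible. Everything here is a sketch under stated assumptions: the server is a hash keyed by SHA1, a Perl coderef stands in for Lua execution, and server_evalsha, server_eval and run_script are hypothetical functions, not the module's internals.

```perl
use strict;
use warnings;
use Digest::SHA qw(sha1_hex);

# Mock server: a script cache keyed by SHA1, standing in for Redis.
my %server_cache;
my @wire_log;

sub server_evalsha {
    my ($sha) = @_;
    push @wire_log, 'EVALSHA';
    die "NOSCRIPT No matching script\n" unless exists $server_cache{$sha};
    return $server_cache{$sha}->();
}

sub server_eval {
    my ($source, $impl) = @_;
    push @wire_log, 'EVAL';
    $server_cache{ sha1_hex($source) } = $impl;    # EVAL also caches the script
    return $impl->();
}

# Client side: try EVALSHA first, fall back to EVAL only on NOSCRIPT.
sub run_script {
    my ($source, $impl) = @_;
    my $result = eval { server_evalsha(sha1_hex($source)) };
    return $result unless $@;
    die $@ unless $@ =~ /^NOSCRIPT/;
    return server_eval($source, $impl);
}

run_script('return 42', sub { 42 }) for 1 .. 2;
print "@wire_log\n";    # EVALSHA EVAL EVALSHA
```

Only the first call pays for sending the full script source; later calls send just the 40-character hash.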
scan_iter
my $iter = $redis->scan_iter(match => 'user:*', count => 100);
while (my $keys = await $iter->next) {
for my $key (@$keys) { ... }
}
Create an iterator for SCAN. Also available:
my $hash_iter = $redis->hscan_iter('hash', match => 'field:*');
my $set_iter = $redis->sscan_iter('set', count => 100);
my $zset_iter = $redis->zscan_iter('zset');
Iterators return batches. ZSCAN batches are the Redis flat member/score list.
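Since ZSCAN batches arrive as a flat member/score list, callers usually want to pair them up. A minimal sketch, with zscan_pairs as a hypothetical helper name and a hand-written batch in place of a real iterator result:

```perl
use strict;
use warnings;

# Turn a flat (member, score, member, score, ...) batch into pairs.
sub zscan_pairs {
    my ($batch) = @_;
    die "odd-length ZSCAN batch\n" if @$batch % 2;
    my @pairs;
    for (my $i = 0; $i < @$batch; $i += 2) {
        push @pairs, [ $batch->[$i], $batch->[ $i + 1 ] ];
    }
    return \@pairs;
}

my $pairs = zscan_pairs([ 'alice', '110', 'bob', '85' ]);
printf "%s => %s\n", @$_ for @$pairs;
```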
CONNECTION POOLING
For high-throughput applications, use Async::Redis::Pool:
use Async::Redis::Pool;
my $pool = Async::Redis::Pool->new(
host => 'localhost',
min => 2,
max => 10,
);
# Use with() for automatic acquire/release
my $result = await $pool->with(sub {
my ($conn) = @_;
return $conn->get('key');
});
ERROR HANDLING
Errors are thrown as exception objects:
eval {
await $redis->get('key');
1;
} or do {
my $error = $@;
if (ref($error) && $error->isa('Async::Redis::Error::Connection')) {
# Connection error
} elsif (ref($error) && $error->isa('Async::Redis::Error::Timeout')) {
# Timeout error
} elsif (ref($error) && $error->isa('Async::Redis::Error::Redis')) {
# Redis error (e.g., WRONGTYPE)
}
};
Exception classes:
- Async::Redis::Error::Connection
-
Connection-related errors (refused, reset, etc.)
- Async::Redis::Error::Timeout
-
Timeout errors (connect, request, read).
- Async::Redis::Error::Protocol
-
Protocol parsing errors.
- Async::Redis::Error::Redis
-
Errors returned by Redis (WRONGTYPE, ERR, etc.)
- Async::Redis::Error::Disconnected
-
Operation attempted on disconnected client.
FORK SAFETY
Async::Redis is fork-safe. When a fork is detected, the child process will automatically invalidate its connection state and reconnect when needed. The parent retains ownership of the original connection.
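A common way to detect a fork is to remember the process ID that opened the connection and compare it with $$ before each operation. The sketch below assumes that technique; the source does not say how detection is implemented, and new_state and check_fork are hypothetical names.

```perl
use strict;
use warnings;

# HYPOTHETICAL client state: remember which process opened the socket.
sub new_state { return { pid => $$, connected => 1 } }

# Called before each command: if the PID changed, we are in a forked
# child and must not reuse the parent's socket.
sub check_fork {
    my ($state) = @_;
    return 0 if $state->{pid} == $$;
    $state->{connected} = 0;    # drop inherited state; reconnect lazily
    $state->{pid}       = $$;
    return 1;
}

my $state = new_state();
print "same process: fork detected? ", check_fork($state), "\n";

$state->{pid} = -1;             # simulate state inherited from a parent
print "after fork:   fork detected? ", check_fork($state), "\n";
```

The child only marks its copy of the state as disconnected; it never closes the parent's connection on the parent's behalf.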
EVENT LOOP CONFIGURATION
Async::Redis uses Future::IO for event loop abstraction, making it compatible with IO::Async, UV, AnyEvent, and other event loops. However, Async::Redis does not choose which event loop to use - that's the application's responsibility.
Default (No Configuration Needed)
Future::IO 0.23+ includes a built-in poll-based implementation that works out of the box. For standalone scripts, you don't need to configure anything:
#!/usr/bin/env perl
use strict;
use warnings;
use Async::Redis;
my $redis = Async::Redis->new(host => 'localhost');
# Just works - Future::IO uses its built-in IO::Poll backend
The Golden Rule
Only executable scripts should configure Future::IO. Library modules (.pm files) should never configure the backend because they don't know what event loop the application wants to use.
For IO::Async Applications
If your application already uses IO::Async for its event loop, load the implementation directly:
use IO::Async::Loop;
require Future::IO::Impl::IOAsync;
my $loop = IO::Async::Loop->new;
use Async::Redis;
my $redis = Async::Redis->new(host => 'localhost');
Note: Use require rather than Future::IO->load_impl('IOAsync') for compatibility with Future::IO 0.22+ which gates load_impl on the newer poll API.
For UV Applications
If your application uses UV (libuv) for its event loop:
use UV;
require Future::IO::Impl::UV;
use Async::Redis;
my $redis = Async::Redis->new(host => 'localhost');
Checking the Current Implementation
To see which Future::IO implementation is active:
use Future::IO;
print "Using: $Future::IO::IMPL\n";
OBSERVABILITY
OpenTelemetry integration is available:
use OpenTelemetry::SDK;
my $redis = Async::Redis->new(
host => 'localhost',
otel_tracer => OpenTelemetry->tracer_provider->tracer('my-app'),
otel_meter => OpenTelemetry->meter_provider->meter('my-app'),
);
This enables:
Distributed tracing with spans per Redis command
Metrics: command latency, connection counts, errors
Automatic attribute extraction (command, database, etc.)
OPENTELEMETRY ARGUMENTS
Starting in 0.002000, command arguments are no longer included in spans by default. Redis values frequently contain session tokens, user IDs, and other PII; exporting them to a tracing backend is a privacy hazard. Pass otel_include_args => 1 to re-enable, and implement custom redaction for your data shapes before doing so.
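What custom redaction looks like depends on your data. As one hedged illustration, the sketch below keeps key names but hides values for a few commands. The %REDACT_FROM policy and redact_args function are hypothetical; they are not the module's built-in redaction and are not wired into any documented hook.

```perl
use strict;
use warnings;

# HYPOTHETICAL policy: index of the first argument to hide, per command.
my %REDACT_FROM = (
    AUTH => 0,    # every argument is a credential
    SET  => 1,    # keep the key, hide the value
    HSET => 1,    # keep the hash key, hide fields and values
);

sub redact_args {
    my ($command, @args) = @_;
    my $from = $REDACT_FROM{ uc $command };
    return @args unless defined $from;
    return map { $_ < $from ? $args[$_] : '[REDACTED]' } 0 .. $#args;
}

print join(' ', 'SET', redact_args('SET', 'session:42', 'secret-token')), "\n";
print join(' ', 'GET', redact_args('GET', 'session:42')), "\n";
```

Note that key names themselves can carry identifiers such as user IDs, so a stricter policy may need to mask parts of keys as well.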
MESSAGE QUEUE DEPTH
message_queue_depth limits the number of queued pubsub messages. Callback invocation is always serialized. With message_queue_depth => 1 (the default), one message may be queued while one callback is still processing; the reader pauses when that queue slot is full. Higher values allow more messages to buffer locally before the reader pauses.
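The pause-and-resume behaviour can be traced with a small simulation. This is an illustrative model only: arrays stand in for the socket and the local queue, and reader_step and driver_step are hypothetical names for the reader and the serialized callback driver.

```perl
use strict;
use warnings;

my $depth  = 1;                 # message_queue_depth
my @queue;                      # locally queued messages
my @socket = qw(m1 m2 m3);      # messages still unread on the "socket"
my @handled;

# Reader: pull from the socket only while the queue has a free slot.
sub reader_step {
    return 0 if @queue >= $depth || !@socket;    # paused, or nothing to read
    push @queue, shift @socket;
    return 1;
}

# Callback driver: handles one message at a time (serialized).
sub driver_step {
    return 0 unless @queue;
    push @handled, shift @queue;
    return 1;
}

reader_step();                          # m1 queued
my $paused = reader_step() ? 0 : 1;     # queue full: reader pauses
driver_step();                          # m1 handled, slot freed
reader_step();                          # m2 queued

print "paused=$paused handled=@handled queued=@queue unread=@socket\n";
```

While the reader is paused, unread messages stay in the socket buffer rather than accumulating in process memory.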
TASK LIFECYCLE
Async::Redis organizes all fire-and-forget background work (the socket reader, reconnect attempts, auto-pipeline submit batches, the pubsub callback driver) under a single per-client Future::Selector instance, following Paul Evans's client pattern from Sys::Async::Virt and IPC::MicroSocket.
Each background task is registered with the selector via $selector->add(data => $label, f => $task_future). Command execution awaits responses via $selector->run_until_ready($response_future), which pumps the selector and propagates any task failure to the awaiting caller. The practical guarantee: if a background task dies (including from a coding bug that escapes explicit fatal-error handling), awaiting callers see a typed failure rather than hanging forever.
This structure provides the five structured-concurrency properties articulated by the trio / asyncio.TaskGroup ecosystems:
GC safety - every background task is held by the selector.
Error propagation - any task's failure reaches callers awaiting the selector via run_until_ready.
Cancellation - socket closure propagates to pending I/O, which fails the owning task, which the selector propagates.
Scope cleanup - disconnect tears down state; remaining selector tasks unwind via their existing on_fail handlers.
Local reasoning - all concurrent work on one connection is owned by one place.
There is no user-facing API for the selector; it is internal machinery. Clients should not call $redis->{_tasks} directly.
Note: this codebase avoids Future->retain in favor of selector ownership. Any patch that introduces ->retain on an Async::Redis-owned Future should instead add the task to $self->{_tasks} so failure propagation and lifetime ownership are consistent.
KNOWN LIMITATIONS
Hostname resolution is synchronous. connect() calls inet_aton before the async connect, which blocks during DNS lookup. Not covered by connect_timeout.
IPv6 URI hosts are not yet supported.
Some generated wrappers expose mode-changing commands (HELLO, CLIENT REPLY, MONITOR, SYNC, PSYNC) that interact poorly with the response model. Avoid them unless you understand the protocol consequences.
SEE ALSO
Future::IO - The underlying async I/O abstraction
Future::AsyncAwait - Async/await syntax support
Async::Redis::Pool - Connection pooling
Async::Redis::Subscription - PubSub subscriptions
Async::Redis::Cookbook - Practical usage recipes
Redis - Synchronous Redis client
Net::Async::Redis - Another async Redis client
AUTHOR
John Napiorkowski
COPYRIGHT AND LICENSE
This is free software; you can redistribute it and/or modify it under the same terms as the Perl 5 programming language system itself.