NAME
Async::Redis - Async Redis client using Future::IO
SYNOPSIS
use Async::Redis;
use Future::AsyncAwait;
# Use any Future::IO-compatible event loop
# IO::Async:
use IO::Async::Loop;
use Future::IO;
Future::IO->load_impl('IOAsync');
my $loop = IO::Async::Loop->new;
# Or UV: Future::IO->load_impl('UV');
# Or Glib: Future::IO->load_impl('Glib');
my $redis = Async::Redis->new(
host => 'localhost',
port => 6379,
);
(async sub {
await $redis->connect;
# Simple commands
await $redis->set('key', 'value');
my $value = await $redis->get('key');
# Pipelining for efficiency
my $pipeline = $redis->pipeline;
$pipeline->set('k1', 'v1');
$pipeline->set('k2', 'v2');
$pipeline->get('k1');
my $results = await $pipeline->execute;
# PubSub
my $sub = await $redis->subscribe('channel');
while (my $msg = await $sub->next_message) {
print "Received: $msg->{message}\n";
}
})->()->retain; # keep the returned Future alive until the sub finishes
$loop->run;
DESCRIPTION
Async::Redis is an asynchronous Redis client built on Future::IO, providing a modern, non-blocking interface for Redis operations.
Key features:
Full async/await support via Future::AsyncAwait
Event loop agnostic (IO::Async, AnyEvent, UV, etc.)
Automatic reconnection with exponential backoff
Connection pooling with health checks
Pipelining and auto-pipelining
PubSub with automatic subscription replay on reconnect
Transaction support (MULTI/EXEC/WATCH)
TLS/SSL connections
OpenTelemetry observability integration
Fork-safe for pre-fork servers (Starman, etc.)
Full RESP2 protocol support
Safe concurrent commands on a single connection
CONCURRENT COMMANDS
Async::Redis safely handles multiple concurrent commands on a single connection using a response queue pattern. When you fire multiple async commands without explicitly awaiting them:
my @futures = (
$redis->set('k1', 'v1'),
$redis->set('k2', 'v2'),
$redis->get('k1'),
);
my @results = await Future->needs_all(@futures);
Each command is registered in an inflight queue before being sent to Redis. A single reader coroutine processes responses in FIFO order, matching each response to the correct waiting future. This prevents response mismatch bugs that can occur when multiple coroutines race to read from the socket.
For high-throughput scenarios, consider using:
Explicit pipelines - $redis->pipeline batches commands for a single network round-trip
Auto-pipeline - auto_pipeline => 1 automatically batches commands within an event loop tick
Connection pools - Async::Redis::Pool for parallel execution across multiple connections
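The response queue pattern described above can be illustrated without any network I/O. The following is a minimal sketch in core Perl, not the module's actual implementation; the names send_command, on_response, and @inflight are hypothetical and exist only for this illustration.

```perl
use strict;
use warnings;

# Hypothetical in-flight queue: one entry per command already written to the socket.
my @inflight;
my %results;

# Register a callback BEFORE the command is "sent", so a reply can never
# arrive without a waiting entry.
sub send_command {
    my ($name, $on_done) = @_;
    push @inflight, { name => $name, on_done => $on_done };
}

# The single reader: Redis replies in the order it received the commands,
# so each reply belongs to the oldest in-flight entry.
sub on_response {
    my ($reply) = @_;
    my $entry = shift @inflight
        or die "response received with no command in flight";
    $entry->{on_done}->($reply);
}

send_command('SET k1 v1', sub { $results{set_k1} = $_[0] });
send_command('SET k2 v2', sub { $results{set_k2} = $_[0] });
send_command('GET k1',    sub { $results{get_k1} = $_[0] });

# Simulated replies arriving from the server in FIFO order.
on_response($_) for 'OK', 'OK', 'v1';

print "$_ => $results{$_}\n" for sort keys %results;
```

Because only one reader ever shifts from the queue, two callers can never consume each other's replies, which is the mismatch the section warns about.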
CONSTRUCTOR
new
my $redis = Async::Redis->new(%options);
Creates a new Redis client instance. Does not connect immediately.
Options:
- host => $hostname
  Redis server hostname. Default: 'localhost'
- port => $port
  Redis server port. Default: 6379
- uri => $uri
  Connection URI (e.g., 'redis://user:pass@host:port/db'). If provided, overrides host, port, password, database options.
- password => $password
  Authentication password.
- username => $username
  Authentication username (Redis 6+ ACL).
- database => $db
  Database number to SELECT after connect. Default: 0
- tls => $bool | \%options
  Enable TLS/SSL connection. Can be a boolean or hashref with options:
  tls => {
      ca_file   => '/path/to/ca.crt',
      cert_file => '/path/to/client.crt',
      key_file  => '/path/to/client.key',
      verify    => 1, # verify server certificate
  }
- connect_timeout => $seconds
  Connection timeout. Default: 10
- read_timeout => $seconds
  Read timeout. Default: 30
- request_timeout => $seconds
  Per-request timeout. Default: 5
- reconnect => $bool
  Enable automatic reconnection. Default: 0
- reconnect_delay => $seconds
  Initial reconnect delay. Default: 0.1
- reconnect_delay_max => $seconds
  Maximum reconnect delay. Default: 60
- reconnect_jitter => $ratio
  Jitter ratio for reconnect delays. Default: 0.25
- on_connect => $coderef
  Callback when connection established.
- on_disconnect => $coderef
  Callback when connection lost.
- on_error => $coderef
  Callback for connection errors.
- prefix => $prefix
  Key prefix applied to all commands.
- client_name => $name
  CLIENT SETNAME value sent on connect.
- debug => $bool
  Enable debug logging.
- otel_tracer => $tracer
  OpenTelemetry tracer for span creation.
- otel_meter => $meter
  OpenTelemetry meter for metrics.
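To give a feel for how reconnect_delay, reconnect_delay_max, and reconnect_jitter interact, here is a rough sketch of an exponential backoff schedule using the documented defaults. The exact formula the module uses is not stated in this document; the doubling factor and the symmetric jitter below are assumptions, and reconnect_delay() is a hypothetical helper, not part of the API.

```perl
use strict;
use warnings;
use List::Util qw(min);

# Assumed formula: the delay doubles per attempt, is capped at the maximum,
# then is perturbed by up to +/- (jitter * delay).
sub reconnect_delay {
    my (%arg) = @_;
    my $base   = min($arg{delay} * 2 ** ($arg{attempt} - 1), $arg{delay_max});
    my $spread = $base * $arg{jitter};
    return $base + (rand(2) - 1) * $spread;   # uniform in [base - spread, base + spread)
}

for my $attempt (1 .. 12) {
    my $d = reconnect_delay(
        attempt   => $attempt,
        delay     => 0.1,    # reconnect_delay default
        delay_max => 60,     # reconnect_delay_max default
        jitter    => 0.25,   # reconnect_jitter default
    );
    printf "attempt %2d: wait about %.2fs\n", $attempt, $d;
}
```

Under these assumptions the cap of 60 seconds is reached at the eleventh attempt, and jitter keeps many clients from reconnecting at the same instant after a server restart.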
METHODS
connect
await $redis->connect;
Establish connection to Redis server. Returns a Future that resolves to the Redis client instance.
disconnect
$redis->disconnect;
Close connection gracefully.
command
my $result = await $redis->command('GET', 'key');
Execute arbitrary Redis command.
Redis Commands
All standard Redis commands are available as methods. See https://redis.io/docs/latest/commands/ for the complete Redis command reference.
# Strings
await $redis->set('key', 'value');
await $redis->set('key', 'value', ex => 300); # with 5min expiry
await $redis->set('key', 'value', nx => 1); # only if not exists
my $value = await $redis->get('key');
await $redis->incr('counter');
await $redis->incrby('counter', 5);
await $redis->mset('k1', 'v1', 'k2', 'v2');
my $values = await $redis->mget('k1', 'k2');
await $redis->append('key', ' more');
await $redis->setex('key', 60, 'value'); # set with 60s expiry
# Hashes
await $redis->hset('user:1', 'name', 'Alice', 'email', 'alice@example.com');
my $name = await $redis->hget('user:1', 'name');
my $user = await $redis->hgetall('user:1'); # returns hashref
await $redis->hincrby('user:1', 'visits', 1);
my $exists = await $redis->hexists('user:1', 'name');
await $redis->hdel('user:1', 'email');
# Lists
await $redis->lpush('queue', 'job1', 'job2');
await $redis->rpush('queue', 'job3');
my $first = await $redis->lpop('queue');
my $last = await $redis->rpop('queue');
my $job = await $redis->blpop('queue', 5); # blocking pop, 5s timeout
my $items = await $redis->lrange('queue', 0, -1);
my $len = await $redis->llen('queue');
# Sets
await $redis->sadd('tags', 'perl', 'redis', 'async');
await $redis->srem('tags', 'async');
my $members = await $redis->smembers('tags');
my $is_member = await $redis->sismember('tags', 'perl');
my $common = await $redis->sinter('tags1', 'tags2');
# Sorted Sets
await $redis->zadd('leaderboard', 100, 'alice', 85, 'bob');
await $redis->zincrby('leaderboard', 10, 'alice');
my $top = await $redis->zrange('leaderboard', 0, 9, 'WITHSCORES');
my $rank = await $redis->zrank('leaderboard', 'alice');
my $score = await $redis->zscore('leaderboard', 'alice');
# Keys
my $key_exists = await $redis->exists('key');
await $redis->expire('key', 300);
my $ttl = await $redis->ttl('key');
await $redis->del('key1', 'key2');
await $redis->rename('old', 'new');
my $type = await $redis->type('key');
my $keys = await $redis->keys('user:*'); # use SCAN in production
pipeline
my $pipeline = $redis->pipeline;
$pipeline->set('k1', 'v1');
$pipeline->incr('counter');
my $results = await $pipeline->execute;
Create a pipeline for batched command execution. All commands are sent in a single network round-trip.
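To show why a pipeline costs only one round-trip, the sketch below encodes two commands in the RESP2 wire format and joins them into a single buffer that could be written to the socket at once. It is an illustration in core Perl, not the module's encoder; encode_command() is a hypothetical helper and it assumes its arguments are byte strings.

```perl
use strict;
use warnings;

# Encode one command as a RESP2 array of bulk strings.
# Assumes byte strings, so length() equals the byte count.
sub encode_command {
    my @args = @_;
    return join '',
        '*' . scalar(@args) . "\r\n",
        map { '$' . length($_) . "\r\n" . $_ . "\r\n" } @args;
}

# A pipeline buffers the encoded commands and writes them in one go.
my @queued = (
    [ SET  => 'k1', 'v1' ],
    [ INCR => 'counter' ],
);
my $wire = join '', map { encode_command(@$_) } @queued;

# Show the buffer with CRLF made visible.
(my $printable = $wire) =~ s/\r\n/\\r\\n/g;
print "$printable\n";
```

The server answers with one reply per command, in the same order, which is how the results can be matched back to the queued commands.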
subscribe
my $sub = await $redis->subscribe('channel1', 'channel2');
Subscribe to one or more channels. Returns an Async::Redis::Subscription object.
psubscribe
my $sub = await $redis->psubscribe('chan:*');
Subscribe to all channels matching a glob-style pattern. Returns an Async::Redis::Subscription object.
multi
my $results = await $redis->multi(async sub {
my ($tx) = @_;
$tx->set('k1', 'v1');
$tx->incr('counter');
});
Execute a transaction. Commands issued on $tx inside the callback are run as one MULTI/EXEC block.
watch
await $redis->watch('key1', 'key2');
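Watch keys for changes ahead of a transaction. If a watched key is modified by another client before EXEC, the transaction is aborted.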
watch_multi
my $results = await $redis->watch_multi(['key'], async sub {
my ($tx, $values) = @_;
$tx->set('key', $values->{key} + 1);
});
Watch keys and execute transaction atomically. Returns undef if watched keys were modified by another client.
script
my $script = $redis->script('return redis.call("get", KEYS[1])');
my $result = await $script->run(['mykey']);
Create a Lua script object with automatic EVALSHA optimization.
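EVALSHA identifies a script by the SHA-1 hex digest of its source, so a client can try EVALSHA first and send the full source with EVAL only when the server replies NOSCRIPT. The sketch below shows that fallback against a simulated server. It is an assumption about how such an optimization typically works, not a description of the module's internals; fake_server() and run_script() are hypothetical.

```perl
use strict;
use warnings;
use Digest::SHA qw(sha1_hex);

my $source = 'return redis.call("get", KEYS[1])';
my $sha    = sha1_hex($source);   # the identifier EVALSHA expects

# Hypothetical stand-in for a Redis server's script cache.
my %script_cache;
sub fake_server {
    my ($cmd, $arg) = @_;
    if ($cmd eq 'EVALSHA') {
        return exists $script_cache{$arg} ? 'ran cached script' : 'NOSCRIPT';
    }
    # EVAL: the server caches the script under its SHA-1 as a side effect.
    $script_cache{ sha1_hex($arg) } = $arg;
    return 'ran script from source';
}

# Try the cheap call first; send the full source only on a cache miss.
sub run_script {
    my $reply = fake_server(EVALSHA => $sha);
    $reply = fake_server(EVAL => $source) if $reply eq 'NOSCRIPT';
    return $reply;
}

print run_script(), "\n";   # first call falls back to EVAL
print run_script(), "\n";   # second call is served by EVALSHA
```

After the first run, only the 40-character digest travels over the wire instead of the whole script body.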
scan_iter
my $iter = $redis->scan_iter(match => 'user:*', count => 100);
while (my $keys = await $iter->next) {
for my $key (@$keys) { ... }
}
Create an iterator for SCAN. Also available: hscan_iter, sscan_iter, zscan_iter.
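SCAN is cursor-based: each call returns a new cursor plus a batch of keys, and a returned cursor of 0 means the scan is complete. The sketch below mimics that contract with an in-memory key list to show why the iterator yields batches rather than single keys. fake_scan() and make_iter() are hypothetical; a real server may also return empty batches or repeat a key, which this simplified model does not reproduce.

```perl
use strict;
use warnings;

# Hypothetical stand-in for the server side of SCAN: returns the next cursor
# and a batch of keys; a returned cursor of 0 means the scan is complete.
my @keyspace = map { "user:$_" } 1 .. 7;
sub fake_scan {
    my ($cursor, $count) = @_;
    my $last = $cursor + $count - 1;
    $last = $#keyspace if $last > $#keyspace;
    my @batch = @keyspace[$cursor .. $last];
    my $next  = $last >= $#keyspace ? 0 : $last + 1;
    return ($next, \@batch);
}

# Iterator in the style of scan_iter: each call yields one batch (arrayref),
# and undef once the cursor has come back to 0.
sub make_iter {
    my ($count) = @_;
    my ($cursor, $done) = (0, 0);
    return sub {
        return if $done;
        ($cursor, my $batch) = fake_scan($cursor, $count);
        $done = 1 if $cursor == 0;
        return $batch;
    };
}

my $next = make_iter(3);
my @seen;
while (my $keys = $next->()) {
    push @seen, @$keys;
}
print scalar(@seen), " keys seen\n";
```

Unlike KEYS, each step touches only a bounded slice of the keyspace, so the server is never blocked for the duration of a full scan.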
CONNECTION POOLING
For high-throughput applications, use Async::Redis::Pool:
use Async::Redis::Pool;
my $pool = Async::Redis::Pool->new(
host => 'localhost',
min => 2,
max => 10,
);
# Use with() for automatic acquire/release
my $result = await $pool->with(sub {
my ($conn) = @_;
return $conn->get('key');
});
ERROR HANDLING
Errors are thrown as exception objects:
use Syntax::Keyword::Try; # Try::Tiny blocks are plain subs, so await cannot be used inside them
try {
await $redis->get('key');
}
catch ($e) {
if ($e->isa('Async::Redis::Error::Connection')) {
# Connection error
} elsif ($e->isa('Async::Redis::Error::Timeout')) {
# Timeout error
} elsif ($e->isa('Async::Redis::Error::Redis')) {
# Redis error (e.g., WRONGTYPE)
}
}
Exception classes:
- Async::Redis::Error::Connection
  Connection-related errors (refused, reset, etc.).
- Async::Redis::Error::Timeout
  Timeout errors (connect, request, read).
- Async::Redis::Error::Protocol
  Protocol parsing errors.
- Async::Redis::Error::Redis
  Errors returned by Redis (WRONGTYPE, ERR, etc.).
- Async::Redis::Error::Disconnected
  Operation attempted on disconnected client.
FORK SAFETY
Async::Redis is fork-safe. When a fork is detected, the child process will automatically invalidate its connection state and reconnect when needed. The parent retains ownership of the original connection.
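A common way to detect a fork is to record the process ID when the connection is opened and compare it with $$ before each use. The sketch below shows that technique in isolation. Whether Async::Redis uses exactly this mechanism is not stated here, so treat it as an assumption; My::ForkAwareConn and check_fork are hypothetical names.

```perl
use strict;
use warnings;

# Hypothetical connection wrapper that remembers which process opened it.
package My::ForkAwareConn {
    sub new {
        my ($class) = @_;
        return bless { owner_pid => $$, connected => 1 }, $class;
    }

    # Called before every command: if the PID changed, this process is a
    # fork child, so drop the inherited state and leave the parent's socket alone.
    sub check_fork {
        my ($self) = @_;
        return 0 if $self->{owner_pid} == $$;
        $self->{connected} = 0;      # forces a fresh connect on next use
        $self->{owner_pid} = $$;
        return 1;
    }
}

my $conn = My::ForkAwareConn->new;
print "same process: fork detected? ", $conn->check_fork, "\n";

# Simulate what a child sees: the recorded PID no longer matches $$.
$conn->{owner_pid} = -1;
print "after fork:   fork detected? ", $conn->check_fork, "\n";
```

The child must not send a QUIT or otherwise write to the inherited socket, since the parent is still using that same connection.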
OBSERVABILITY
OpenTelemetry integration is available:
use OpenTelemetry::SDK;
my $redis = Async::Redis->new(
host => 'localhost',
otel_tracer => OpenTelemetry->tracer_provider->tracer('my-app'),
otel_meter => OpenTelemetry->meter_provider->meter('my-app'),
);
This enables:
Distributed tracing with spans per Redis command
Metrics: command latency, connection counts, errors
Automatic attribute extraction (command, database, etc.)
SEE ALSO
Future::IO - The underlying async I/O abstraction
Future::AsyncAwait - Async/await syntax support
Async::Redis::Pool - Connection pooling
Async::Redis::Subscription - PubSub subscriptions
Redis - Synchronous Redis client
Net::Async::Redis - Another async Redis client
AUTHOR
John Googoo
COPYRIGHT AND LICENSE
This is free software; you can redistribute it and/or modify it under the same terms as the Perl 5 programming language system itself.