NAME
Async::Redis - Async Redis client using Future::IO
SYNOPSIS
use Async::Redis;
use Future::AsyncAwait;
# Use any Future::IO-compatible event loop
# IO::Async:
use IO::Async::Loop;
use Future::IO;
Future::IO->load_impl('IOAsync');
my $loop = IO::Async::Loop->new;
# Or UV: Future::IO->load_impl('UV');
# Or Glib: Future::IO->load_impl('Glib');
my $redis = Async::Redis->new(
host => 'localhost',
port => 6379,
);
(async sub {
await $redis->connect;
# Simple commands
await $redis->set('key', 'value');
my $value = await $redis->get('key');
# Pipelining for efficiency
my $pipeline = $redis->pipeline;
$pipeline->set('k1', 'v1');
$pipeline->set('k2', 'v2');
$pipeline->get('k1');
my $results = await $pipeline->execute;
# PubSub
my $sub = await $redis->subscribe('channel');
while (my $msg = await $sub->next_message) {
print "Received: $msg->{message}\n";
}
})->();
$loop->run;
DESCRIPTION
Async::Redis is an asynchronous Redis client built on Future::IO, providing a modern, non-blocking interface for Redis operations.
Key features:
Full async/await support via Future::AsyncAwait
Event loop agnostic (IO::Async, AnyEvent, UV, etc.)
Automatic reconnection with exponential backoff
Connection pooling with health checks
Pipelining and auto-pipelining
PubSub with automatic subscription replay on reconnect
Transaction support (MULTI/EXEC/WATCH)
TLS/SSL connections
OpenTelemetry observability integration
Fork-safe for pre-fork servers (Starman, etc.)
Full RESP2 protocol support
CONSTRUCTOR
new
my $redis = Async::Redis->new(%options);
Creates a new Redis client instance. Does not connect immediately.
Options:
- host => $hostname
-
Redis server hostname. Default: 'localhost'
- port => $port
-
Redis server port. Default: 6379
- uri => $uri
-
Connection URI (e.g., 'redis://user:pass@host:port/db'). If provided, overrides host, port, password, database options.
- password => $password
-
Authentication password.
- username => $username
-
Authentication username (Redis 6+ ACL).
- database => $db
-
Database number to SELECT after connect. Default: 0
- tls => $bool | \%options
-
Enable TLS/SSL connection. Can be a boolean or hashref with options:
tls => { ca_file => '/path/to/ca.crt', cert_file => '/path/to/client.crt', key_file => '/path/to/client.key', verify => 1, # verify server certificate } - connect_timeout => $seconds
-
Connection timeout. Default: 10
- read_timeout => $seconds
-
Read timeout. Default: 30
- request_timeout => $seconds
-
Per-request timeout. Default: 5
- reconnect => $bool
-
Enable automatic reconnection. Default: 0
- reconnect_delay => $seconds
-
Initial reconnect delay. Default: 0.1
- reconnect_delay_max => $seconds
-
Maximum reconnect delay. Default: 60
- reconnect_jitter => $ratio
-
Jitter ratio for reconnect delays. Default: 0.25
- on_connect => $coderef
-
Callback when connection established.
- on_disconnect => $coderef
-
Callback when connection lost.
- on_error => $coderef
-
Callback for connection errors.
- prefix => $prefix
-
Key prefix applied to all commands.
- client_name => $name
-
CLIENT SETNAME value sent on connect.
- debug => $bool
-
Enable debug logging.
- otel_tracer => $tracer
-
OpenTelemetry tracer for span creation.
- otel_meter => $meter
-
OpenTelemetry meter for metrics.
METHODS
connect
await $redis->connect;
Establish connection to Redis server. Returns a Future that resolves to the Redis client instance.
disconnect
$redis->disconnect;
Close connection gracefully.
command
my $result = await $redis->command('GET', 'key');
Execute arbitrary Redis command.
Redis Commands
All standard Redis commands are available as methods. See https://redis.io/docs/latest/commands/ for the complete Redis command reference.
# Strings
await $redis->set('key', 'value');
await $redis->set('key', 'value', ex => 300); # with 5min expiry
await $redis->set('key', 'value', nx => 1); # only if not exists
my $value = await $redis->get('key');
await $redis->incr('counter');
await $redis->incrby('counter', 5);
await $redis->mset('k1', 'v1', 'k2', 'v2');
my $values = await $redis->mget('k1', 'k2');
await $redis->append('key', ' more');
await $redis->setex('key', 60, 'value'); # set with 60s expiry
# Hashes
await $redis->hset('user:1', 'name', 'Alice', 'email', 'alice@example.com');
my $name = await $redis->hget('user:1', 'name');
my $user = await $redis->hgetall('user:1'); # returns hashref
await $redis->hincrby('user:1', 'visits', 1);
my $exists = await $redis->hexists('user:1', 'name');
await $redis->hdel('user:1', 'email');
# Lists
await $redis->lpush('queue', 'job1', 'job2');
await $redis->rpush('queue', 'job3');
my $job = await $redis->lpop('queue');
my $job = await $redis->rpop('queue');
my $job = await $redis->blpop('queue', 5); # blocking pop, 5s timeout
my $items = await $redis->lrange('queue', 0, -1);
my $len = await $redis->llen('queue');
# Sets
await $redis->sadd('tags', 'perl', 'redis', 'async');
await $redis->srem('tags', 'async');
my $members = await $redis->smembers('tags');
my $is_member = await $redis->sismember('tags', 'perl');
my $common = await $redis->sinter('tags1', 'tags2');
# Sorted Sets
await $redis->zadd('leaderboard', 100, 'alice', 85, 'bob');
await $redis->zincrby('leaderboard', 10, 'alice');
my $top = await $redis->zrange('leaderboard', 0, 9, 'WITHSCORES');
my $rank = await $redis->zrank('leaderboard', 'alice');
my $score = await $redis->zscore('leaderboard', 'alice');
# Keys
my $exists = await $redis->exists('key');
await $redis->expire('key', 300);
my $ttl = await $redis->ttl('key');
await $redis->del('key1', 'key2');
await $redis->rename('old', 'new');
my $type = await $redis->type('key');
my $keys = await $redis->keys('user:*'); # use SCAN in production
pipeline
my $pipeline = $redis->pipeline;
$pipeline->set('k1', 'v1');
$pipeline->incr('counter');
my $results = await $pipeline->execute;
Create a pipeline for batched command execution. All commands are sent in a single network round-trip.
subscribe
my $sub = await $redis->subscribe('channel1', 'channel2');
Subscribe to channels. Returns a Async::Redis::Subscription object.
psubscribe
my $sub = await $redis->psubscribe('chan:*');
Subscribe to pattern. Returns a Subscription object.
multi
my $results = await $redis->multi(async sub {
my ($tx) = @_;
$tx->set('k1', 'v1');
$tx->incr('counter');
});
Execute a transaction with callback.
watch
await $redis->watch('key1', 'key2');
Watch keys for transaction.
watch_multi
my $results = await $redis->watch_multi(['key'], async sub {
my ($tx, $values) = @_;
$tx->set('key', $values->{key} + 1);
});
Watch keys and execute transaction atomically. Returns undef if watched keys were modified by another client.
script
my $script = $redis->script('return redis.call("get", KEYS[1])');
my $result = await $script->run(['mykey']);
Create a Lua script object with automatic EVALSHA optimization.
scan_iter
my $iter = $redis->scan_iter(match => 'user:*', count => 100);
while (my $keys = await $iter->next) {
for my $key (@$keys) { ... }
}
Create an iterator for SCAN. Also available: hscan_iter, sscan_iter, zscan_iter.
CONNECTION POOLING
For high-throughput applications, use Async::Redis::Pool:
use Async::Redis::Pool;
my $pool = Async::Redis::Pool->new(
host => 'localhost',
min => 2,
max => 10,
);
# Use with() for automatic acquire/release
my $result = await $pool->with(sub {
my ($conn) = @_;
return $conn->get('key');
});
ERROR HANDLING
Errors are thrown as exception objects:
use Try::Tiny;
try {
await $redis->get('key');
} catch {
if ($_->isa('Async::Redis::Error::Connection')) {
# Connection error
} elsif ($_->isa('Async::Redis::Error::Timeout')) {
# Timeout error
} elsif ($_->isa('Async::Redis::Error::Redis')) {
# Redis error (e.g., WRONGTYPE)
}
};
Exception classes:
- Async::Redis::Error::Connection
-
Connection-related errors (refused, reset, etc.)
- Async::Redis::Error::Timeout
-
Timeout errors (connect, request, read).
- Async::Redis::Error::Protocol
-
Protocol parsing errors.
- Async::Redis::Error::Redis
-
Errors returned by Redis (WRONGTYPE, ERR, etc.)
- Async::Redis::Error::Disconnected
-
Operation attempted on disconnected client.
FORK SAFETY
Async::Redis is fork-safe. When a fork is detected, the child process will automatically invalidate its connection state and reconnect when needed. The parent retains ownership of the original connection.
OBSERVABILITY
OpenTelemetry integration is available:
use OpenTelemetry::SDK;
my $redis = Async::Redis->new(
host => 'localhost',
otel_tracer => OpenTelemetry->tracer_provider->tracer('my-app'),
otel_meter => OpenTelemetry->meter_provider->meter('my-app'),
);
This enables:
Distributed tracing with spans per Redis command
Metrics: command latency, connection counts, errors
Automatic attribute extraction (command, database, etc.)
SEE ALSO
Future::IO - The underlying async I/O abstraction
Future::AsyncAwait - Async/await syntax support
Async::Redis::Pool - Connection pooling
Async::Redis::Subscription - PubSub subscriptions
Redis - Synchronous Redis client
Net::Async::Redis - Another async Redis client
AUTHOR
John Googoo
COPYRIGHT AND LICENSE
This is free software; you can redistribute it and/or modify it under the same terms as the Perl 5 programming language system itself.