NAME
Async::Redis - Async Redis client using Future::IO
SYNOPSIS
use Async::Redis;
use Future::AsyncAwait;
# For standalone scripts: configure Future::IO first
use Future::IO;
Future::IO->load_best_impl; # Selects UV, IO::Async, etc.
my $redis = Async::Redis->new(
host => 'localhost',
port => 6379,
);
(async sub {
await $redis->connect;
# Simple commands
await $redis->set('key', 'value');
my $value = await $redis->get('key');
# Pipelining for efficiency
my $pipeline = $redis->pipeline;
$pipeline->set('k1', 'v1');
$pipeline->set('k2', 'v2');
$pipeline->get('k1');
my $results = await $pipeline->execute;
# PubSub
my $sub = await $redis->subscribe('channel');
while (my $msg = await $sub->next_message) {
print "Received: $msg->{message}\n";
}
})->()->await;
Important: If you're embedding Async::Redis in a larger application (web framework, existing event loop, etc.), see "EVENT LOOP CONFIGURATION" for how to properly configure Future::IO. Libraries should never call load_best_impl - only your application's entry point should.
DESCRIPTION
Async::Redis is an asynchronous Redis client built on Future::IO, providing a modern, non-blocking interface for Redis operations.
Key features:
Full async/await support via Future::AsyncAwait
Event loop agnostic (IO::Async, AnyEvent, UV, etc.)
Automatic reconnection with exponential backoff
Connection pooling with health checks
Pipelining and auto-pipelining
PubSub with automatic subscription replay on reconnect
Transaction support (MULTI/EXEC/WATCH)
TLS/SSL connections
OpenTelemetry observability integration
Fork-safe for pre-fork servers (Starman, etc.)
Full RESP2 protocol support
Safe concurrent commands on single connection
CONCURRENT COMMANDS
Async::Redis safely handles multiple concurrent commands on a single connection using a response queue pattern. When you fire multiple async commands without explicitly awaiting them:
my @futures = (
$redis->set('k1', 'v1'),
$redis->set('k2', 'v2'),
$redis->get('k1'),
);
my @results = await Future->needs_all(@futures);
Each command is registered in an inflight queue before being sent to Redis. A single reader coroutine processes responses in FIFO order, matching each response to the correct waiting future. This prevents response mismatch bugs that can occur when multiple coroutines race to read from the socket.
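The FIFO matching described above can be illustrated without any network code. The sketch below is a hypothetical model, not Async::Redis internals: send_command, handle_reply and @inflight are names invented for this example, and plain callbacks stand in for futures.

```perl
use strict;
use warnings;

# Hypothetical illustration only; not Async::Redis internals.
my @inflight;   # pending commands, oldest first

# Register a command before "sending" it, remembering who waits for the reply.
sub send_command {
    my ($name, $on_reply) = @_;
    push @inflight, { name => $name, on_reply => $on_reply };
}

# The single reader: each reply belongs to the oldest pending command.
sub handle_reply {
    my ($reply) = @_;
    my $pending = shift @inflight
        or die "reply received with no command in flight\n";
    $pending->{on_reply}->($reply);
}

my %got;
send_command('SET k1 v1', sub { $got{set} = shift });
send_command('GET k1',    sub { $got{get} = shift });

# Redis answers in the order the commands were sent.
handle_reply('OK');
handle_reply('v1');

print "$_ => $got{$_}\n" for sort keys %got;
```

Because only one reader ever shifts from the queue, a reply can never be handed to the wrong waiter, which is the property the response queue pattern relies on.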
For high-throughput scenarios, consider using:
Explicit pipelines - $redis->pipeline batches commands for a single network round-trip
Auto-pipeline - auto_pipeline => 1 automatically batches commands within an event loop tick
Connection pools - Async::Redis::Pool for parallel execution across multiple connections
CONSTRUCTOR
new
my $redis = Async::Redis->new(%options);
Creates a new Redis client instance. Does not connect immediately.
Options:
- host => $hostname
-
Redis server hostname. Default: 'localhost'
- port => $port
-
Redis server port. Default: 6379
- uri => $uri
-
Connection URI (e.g., 'redis://user:pass@host:port/db'). If provided, overrides host, port, password, database options.
- password => $password
-
Authentication password.
- username => $username
-
Authentication username (Redis 6+ ACL).
- database => $db
-
Database number to SELECT after connect. Default: 0
- tls => $bool | \%options
-
Enable TLS/SSL connection. Can be a boolean or hashref with options:
tls => {
ca_file => '/path/to/ca.crt',
cert_file => '/path/to/client.crt',
key_file => '/path/to/client.key',
verify => 1, # verify server certificate
}
- connect_timeout => $seconds
-
Connection timeout. Default: 10
- request_timeout => $seconds
-
Per-request timeout for commands. Default: 5
Blocking commands (BLPOP, BRPOP, etc.) automatically extend this timeout based on their server-side timeout plus blocking_timeout_buffer.
- blocking_timeout_buffer => $seconds
-
Extra time added to blocking command timeouts. Default: 2
For example, BLPOP key 30 gets a deadline of 30 + 2 = 32 seconds.
- reconnect => $bool
-
Enable automatic reconnection. Default: 0
- reconnect_delay => $seconds
-
Initial reconnect delay. Default: 0.1
- reconnect_delay_max => $seconds
-
Maximum reconnect delay. Default: 60
- reconnect_jitter => $ratio
-
Jitter ratio for reconnect delays. Default: 0.25
- on_connect => $coderef
-
Callback when connection established.
- on_disconnect => $coderef
-
Callback when connection lost.
- on_error => $coderef
-
Callback for connection errors.
- prefix => $prefix
-
Key prefix applied to all commands.
- client_name => $name
-
CLIENT SETNAME value sent on connect.
- debug => $bool
-
Enable debug logging.
- otel_tracer => $tracer
-
OpenTelemetry tracer for span creation.
- otel_meter => $meter
-
OpenTelemetry meter for metrics.
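The timing options above (reconnect_delay, reconnect_delay_max, reconnect_jitter and blocking_timeout_buffer) can be made concrete with a little arithmetic. This is a sketch under stated assumptions: the doubling-per-attempt formula and the symmetric jitter are guesses at a typical exponential backoff, and reconnect_delay() and blocking_deadline() are hypothetical helpers, not functions exported by Async::Redis.

```perl
use strict;
use warnings;
use List::Util qw(min);

# Assumed formula: double the delay on each attempt, cap it, then apply
# +/- jitter. The module's real calculation may differ.
sub reconnect_delay {
    my (%o) = @_;
    my $base   = min($o{delay_max}, $o{delay} * 2 ** ($o{attempt} - 1));
    my $spread = $base * $o{jitter};
    return $base + $spread * (2 * rand() - 1);
}

# Deadline for a blocking command: server-side timeout plus the buffer.
sub blocking_deadline {
    my ($server_timeout, $buffer) = @_;
    return $server_timeout + $buffer;
}

# Walk through the documented defaults.
for my $attempt (1 .. 12) {
    my $d = reconnect_delay(
        attempt => $attempt, delay => 0.1, delay_max => 60, jitter => 0.25,
    );
    printf "attempt %2d: %.2fs\n", $attempt, $d;
}
printf "BLPOP key 30 deadline: %ds\n", blocking_deadline(30, 2);
```

With the defaults, the un-jittered delay under this assumed formula reaches the 60 second cap on the eleventh attempt; jitter spreads reconnecting clients out so they do not all retry at the same instant.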
METHODS
connect
await $redis->connect;
Establish connection to Redis server. Returns a Future that resolves to the Redis client instance.
disconnect
$redis->disconnect;
Close connection gracefully.
command
my $result = await $redis->command('GET', 'key');
Execute arbitrary Redis command.
Redis Commands
All standard Redis commands are available as methods. See https://redis.io/docs/latest/commands/ for the complete Redis command reference.
# Strings
await $redis->set('key', 'value');
await $redis->set('key', 'value', ex => 300); # with 5min expiry
await $redis->set('key', 'value', nx => 1); # only if not exists
my $value = await $redis->get('key');
await $redis->incr('counter');
await $redis->incrby('counter', 5);
await $redis->mset('k1', 'v1', 'k2', 'v2');
my $values = await $redis->mget('k1', 'k2');
await $redis->append('key', ' more');
await $redis->setex('key', 60, 'value'); # set with 60s expiry
# Hashes
await $redis->hset('user:1', 'name', 'Alice', 'email', 'alice@example.com');
my $name = await $redis->hget('user:1', 'name');
my $user = await $redis->hgetall('user:1'); # returns hashref
await $redis->hincrby('user:1', 'visits', 1);
my $exists = await $redis->hexists('user:1', 'name');
await $redis->hdel('user:1', 'email');
# Lists
await $redis->lpush('queue', 'job1', 'job2');
await $redis->rpush('queue', 'job3');
my $head = await $redis->lpop('queue');
my $tail = await $redis->rpop('queue');
my $popped = await $redis->blpop('queue', 5); # blocking pop, 5s timeout
my $items = await $redis->lrange('queue', 0, -1);
my $len = await $redis->llen('queue');
# Sets
await $redis->sadd('tags', 'perl', 'redis', 'async');
await $redis->srem('tags', 'async');
my $members = await $redis->smembers('tags');
my $is_member = await $redis->sismember('tags', 'perl');
my $common = await $redis->sinter('tags1', 'tags2');
# Sorted Sets
await $redis->zadd('leaderboard', 100, 'alice', 85, 'bob');
await $redis->zincrby('leaderboard', 10, 'alice');
my $top = await $redis->zrange('leaderboard', 0, 9, 'WITHSCORES');
my $rank = await $redis->zrank('leaderboard', 'alice');
my $score = await $redis->zscore('leaderboard', 'alice');
# Keys
my $exists = await $redis->exists('key');
await $redis->expire('key', 300);
my $ttl = await $redis->ttl('key');
await $redis->del('key1', 'key2');
await $redis->rename('old', 'new');
my $type = await $redis->type('key');
my $keys = await $redis->keys('user:*'); # use SCAN in production
pipeline
my $pipeline = $redis->pipeline;
$pipeline->set('k1', 'v1');
$pipeline->incr('counter');
my $results = await $pipeline->execute;
Create a pipeline for batched command execution. All commands are sent in a single network round-trip.
subscribe
my $sub = await $redis->subscribe('channel1', 'channel2');
Subscribe to channels. Returns an Async::Redis::Subscription object.
psubscribe
my $sub = await $redis->psubscribe('chan:*');
Subscribe to a pattern. Returns an Async::Redis::Subscription object.
multi
my $results = await $redis->multi(async sub {
my ($tx) = @_;
$tx->set('k1', 'v1');
$tx->incr('counter');
});
Execute a transaction (MULTI/EXEC). Commands queued on $tx inside the callback are committed together, and their results are returned.
watch
await $redis->watch('key1', 'key2');
Watch keys for transaction.
watch_multi
my $results = await $redis->watch_multi(['key'], async sub {
my ($tx, $values) = @_;
$tx->set('key', $values->{key} + 1);
});
Watch keys and execute transaction atomically. Returns undef if watched keys were modified by another client.
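Since watch_multi returns undef when a watched key changes, callers usually want a retry loop. The sketch below shows one way to write it, using only the watch_multi call shape documented above; incr_with_retry and its $max_tries parameter are hypothetical names introduced for illustration, and the code assumes a missing key should count as 0.

```perl
use strict;
use warnings;
use Future::AsyncAwait;

# Hypothetical helper: retry an optimistic increment until it commits.
async sub incr_with_retry {
    my ($redis, $key, $max_tries) = @_;
    my $tries = 0;
    while ($tries++ < $max_tries) {
        my $results = await $redis->watch_multi([$key], async sub {
            my ($tx, $values) = @_;
            $tx->set($key, ($values->{$key} // 0) + 1);
        });
        return $results if defined $results;   # transaction committed
        # undef: another client modified the key, so go around again
    }
    die "gave up on '$key' after $max_tries attempts\n";
}
```

A caller inside an async sub would write something like my $results = await incr_with_retry($redis, 'counter', 5);. Bounding the number of attempts keeps a heavily contended key from spinning forever.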
script
my $script = $redis->script('return redis.call("get", KEYS[1])');
my $result = await $script->run(['mykey']);
Create a Lua script object with automatic EVALSHA optimization. See Async::Redis::Script for details.
define_command
$redis->define_command(my_command => {
keys => 1, # Number of KEYS (or 'dynamic')
lua => 'return ...', # Lua script code
description => 'Does X', # Optional documentation
install => 1, # Optional: install as method
});
Register a named Lua script for reuse. The script is automatically cached and uses EVALSHA for efficiency.
Options:
keys - Number of KEYS the script expects. Use 'dynamic' if variable (first arg to run_script will be the key count).
lua - The Lua script source code.
description - Optional description for documentation.
install - If true, install as a method on the Async::Redis class.
run_script
my $result = await $redis->run_script('my_command', @keys, @args);
Execute a registered script by name. For scripts with fixed key count, pass keys then args. For dynamic scripts, pass key count first:
# Fixed keys (keys => 2)
await $redis->run_script('two_key_script', 'key1', 'key2', 'arg1');
# Dynamic keys
await $redis->run_script('dynamic_script', 2, 'key1', 'key2', 'arg1');
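The two calling conventions differ only in how the argument list is divided into KEYS and ARGV. The following sketch makes that split explicit; split_script_args is a hypothetical helper written for this example and is not part of the Async::Redis API.

```perl
use strict;
use warnings;

# Hypothetical helper: split run_script arguments into KEYS and ARGV.
sub split_script_args {
    my ($keys_spec, @rest) = @_;
    # For 'dynamic' scripts the first argument is the key count.
    my $nkeys = $keys_spec eq 'dynamic' ? shift @rest : $keys_spec;
    die "expected $nkeys keys, got " . scalar(@rest) . " arguments\n"
        if $nkeys > @rest;
    my @keys = splice @rest, 0, $nkeys;
    return (\@keys, \@rest);
}

# Fixed key count (keys => 2)
my ($keys, $args) = split_script_args(2, 'key1', 'key2', 'arg1');
print "fixed:   KEYS=@$keys ARGV=@$args\n";

# Dynamic key count: the leading 2 says how many keys follow
($keys, $args) = split_script_args('dynamic', 2, 'key1', 'key2', 'arg1');
print "dynamic: KEYS=@$keys ARGV=@$args\n";
```

Both calls end up with the same KEYS and ARGV; the dynamic form just carries the key count in the argument list instead of in the script definition.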
get_script
my $script = $redis->get_script('my_command');
Get a registered script object by name. Returns undef if not found.
list_scripts
my @names = $redis->list_scripts;
List all registered script names.
preload_scripts
my $count = await $redis->preload_scripts;
Load all registered scripts to Redis server. Useful before pipeline execution to ensure EVALSHA will succeed.
LUA SCRIPTING
Async::Redis provides comprehensive support for Redis Lua scripting with automatic EVALSHA optimization.
Quick Start
# Define a reusable script
$redis->define_command(atomic_incr => {
keys => 1,
lua => <<'LUA',
local current = tonumber(redis.call('GET', KEYS[1]) or 0)
local result = current + tonumber(ARGV[1])
redis.call('SET', KEYS[1], result)
return result
LUA
});
# Use it
my $result = await $redis->run_script('atomic_incr', 'counter', 5);
Pipeline Integration
Registered scripts work in pipelines:
my $pipe = $redis->pipeline;
$pipe->run_script('atomic_incr', 'counter:a', 1);
$pipe->run_script('atomic_incr', 'counter:b', 1);
$pipe->set('other:key', 'value');
my $results = await $pipe->execute;
Scripts are automatically preloaded before pipeline execution.
Method Installation
For frequently used scripts, install as methods:
$redis->define_command(cache_get => {
keys => 1,
lua => 'return redis.call("GET", KEYS[1])',
install => 1,
});
# Now call directly
my $value = await $redis->cache_get('my:key');
EVALSHA Optimization
Scripts automatically use EVALSHA (by SHA1 hash) for efficiency. If the script isn't cached on the server, it falls back to EVAL and caches for future calls. This is transparent to your code.
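The fallback described above follows a common pattern: try EVALSHA with the script's SHA1, and resend the full source with EVAL only when the server replies NOSCRIPT. The sketch below simulates that flow against an in-memory stand-in for the server's script cache; fake_evalsha, fake_eval and run_lua are hypothetical names, and nothing here talks to Redis or reflects the module's actual implementation.

```perl
use strict;
use warnings;
use Digest::SHA qw(sha1_hex);

# Stand-in for the server's script cache; not a real Redis connection.
my %server_cache;
my @wire_log;

sub fake_evalsha {
    my ($sha) = @_;
    push @wire_log, 'EVALSHA';
    die "NOSCRIPT No matching script\n" unless exists $server_cache{$sha};
    return "ran $sha";
}

sub fake_eval {
    my ($lua) = @_;
    push @wire_log, 'EVAL';
    my $sha = sha1_hex($lua);
    $server_cache{$sha} = $lua;      # EVAL caches the script server-side
    return "ran $sha";
}

# Try the cheap EVALSHA first; fall back to EVAL only on NOSCRIPT.
sub run_lua {
    my ($lua) = @_;
    my $result = eval { fake_evalsha(sha1_hex($lua)) };
    return $result if defined $result;
    die $@ unless $@ =~ /^NOSCRIPT/;
    return fake_eval($lua);
}

run_lua('return redis.call("GET", KEYS[1])') for 1 .. 3;
print join(' ', @wire_log), "\n";
```

Only the first call pays for sending the script body; later calls send just the 40-character hash.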
scan_iter
my $iter = $redis->scan_iter(match => 'user:*', count => 100);
while (my $keys = await $iter->next) {
for my $key (@$keys) { ... }
}
Create an iterator for SCAN. Also available: hscan_iter, sscan_iter, zscan_iter.
CONNECTION POOLING
For high-throughput applications, use Async::Redis::Pool:
use Async::Redis::Pool;
my $pool = Async::Redis::Pool->new(
host => 'localhost',
min => 2,
max => 10,
);
# Use with() for automatic acquire/release
my $result = await $pool->with(sub {
my ($conn) = @_;
return $conn->get('key');
});
ERROR HANDLING
Errors are thrown as exception objects:
use Syntax::Keyword::Try; # Try::Tiny blocks are plain subs, so await cannot be used inside them
try {
await $redis->get('key');
} catch ($e) {
if ($e->isa('Async::Redis::Error::Connection')) {
# Connection error
} elsif ($e->isa('Async::Redis::Error::Timeout')) {
# Timeout error
} elsif ($e->isa('Async::Redis::Error::Redis')) {
# Redis error (e.g., WRONGTYPE)
}
}
Exception classes:
- Async::Redis::Error::Connection
-
Connection-related errors (refused, reset, etc.)
- Async::Redis::Error::Timeout
-
Timeout errors (connect, request, read).
- Async::Redis::Error::Protocol
-
Protocol parsing errors.
- Async::Redis::Error::Redis
-
Errors returned by Redis (WRONGTYPE, ERR, etc.)
- Async::Redis::Error::Disconnected
-
Operation attempted on disconnected client.
FORK SAFETY
Async::Redis is fork-safe. When a fork is detected, the child process will automatically invalidate its connection state and reconnect when needed. The parent retains ownership of the original connection.
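One common way to detect a fork is to record the process ID when the connection is created and compare it with $$ before each command. The sketch below assumes that mechanism purely for illustration; the document does not say how Async::Redis detects forks, and invalidate_if_forked, owner_pid and connected are hypothetical names.

```perl
use strict;
use warnings;

# Hypothetical client state; Async::Redis's real fields may differ.
my %client = (owner_pid => $$, connected => 1);

# Run before each command. A PID that differs from the recorded owner means
# this state was inherited across fork(), so the socket must not be reused.
sub invalidate_if_forked {
    my ($c) = @_;
    return 0 if $c->{owner_pid} == $$;
    $c->{owner_pid} = $$;
    $c->{connected} = 0;   # next command triggers a fresh connection
    return 1;
}

print "same process, invalidated: ", invalidate_if_forked(\%client), "\n";

# Simulate the child's view after fork(): the recorded owner is another PID.
$client{owner_pid} = $$ + 1;
print "after fork, invalidated:   ", invalidate_if_forked(\%client), "\n";
print "connected: $client{connected}\n";
```

The parent never sees a PID mismatch, so its connection is left untouched, which matches the ownership rule stated above.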
EVENT LOOP CONFIGURATION
Async::Redis uses Future::IO for event loop abstraction, making it compatible with IO::Async, UV, AnyEvent, and other event loops. However, Async::Redis does not choose which event loop to use - that's the application's responsibility.
The Golden Rule
Only executable scripts should configure Future::IO. Library modules (.pm files) should never call load_best_impl or load_impl because they don't know what event loop the application wants to use.
When you use Async::Redis inside a larger application, you are a "guest" in that application's event loop. The application (the "host") decides which Future::IO implementation to use, and all libraries must cooperate.
For Standalone Scripts
If you're writing a standalone script that uses Async::Redis directly, configure Future::IO at the top of your script:
#!/usr/bin/env perl
use strict;
use warnings;
use Future::IO;
Future::IO->load_best_impl; # Auto-select best available
use Async::Redis;
my $redis = Async::Redis->new(host => 'localhost');
# ...
load_best_impl will select the best available backend, typically preferring UV if installed, then IO::Async, then others.
For IO::Async Applications
If your application uses IO::Async for its event loop:
use IO::Async::Loop;
use Future::IO;
Future::IO->load_impl('IOAsync'); # Explicitly use IO::Async
my $loop = IO::Async::Loop->new;
use Async::Redis;
my $redis = Async::Redis->new(host => 'localhost');
For UV Applications
If your application uses UV (libuv) for its event loop:
use UV;
use Future::IO;
Future::IO->load_impl('UV'); # Explicitly use UV
use Async::Redis;
my $redis = Async::Redis->new(host => 'localhost');
When Using Multiple Async Libraries
When combining Async::Redis with other Future::IO-based libraries (like web frameworks, database clients, etc.), all libraries will share the same Future::IO backend. This is by design - they're all cooperating within the same event loop.
The key is that the application configures Future::IO once, before loading any libraries that use it:
# Application startup
use Future::IO;
Future::IO->load_impl('IOAsync'); # Application's choice
# Now load libraries - they all use the configured backend
use Async::Redis;
use Some::Other::Async::Library;
use My::Web::Framework;
What Happens Without Configuration
If nothing explicitly configures Future::IO before the first async operation, Future::IO will auto-select an implementation. This can lead to unexpected behavior if different parts of your application assume different backends.
To avoid surprises, always configure Future::IO explicitly in your application's entry point.
Checking the Current Implementation
To see which Future::IO implementation is active:
use Future::IO;
print "Using: $Future::IO::IMPL\n";
OBSERVABILITY
OpenTelemetry integration is available:
use OpenTelemetry::SDK;
my $redis = Async::Redis->new(
host => 'localhost',
otel_tracer => OpenTelemetry->tracer_provider->tracer('my-app'),
otel_meter => OpenTelemetry->meter_provider->meter('my-app'),
);
This enables:
Distributed tracing with spans per Redis command
Metrics: command latency, connection counts, errors
Automatic attribute extraction (command, database, etc.)
SEE ALSO
Future::IO - The underlying async I/O abstraction
Future::AsyncAwait - Async/await syntax support
Async::Redis::Pool - Connection pooling
Async::Redis::Subscription - PubSub subscriptions
Redis - Synchronous Redis client
Net::Async::Redis - Another async Redis client
AUTHOR
John Napiorkowski
COPYRIGHT AND LICENSE
This is free software; you can redistribute it and/or modify it under the same terms as the Perl 5 programming language system itself.