NAME

Async::Redis - Async Redis client using Future::IO

SYNOPSIS

use Async::Redis;
use Future::AsyncAwait;

# Future::IO 0.23+ has a built-in poll-based impl that works
# out of the box. For IO::Async or UV, require the impl directly:
# require Future::IO::Impl::IOAsync;  # if using IO::Async
# require Future::IO::Impl::UV;       # if using UV

my $redis = Async::Redis->new(
    host => 'localhost',
    port => 6379,
);

(async sub {
    await $redis->connect;

    # Simple commands
    await $redis->set('key', 'value');
    my $value = await $redis->get('key');

    # Pipelining for efficiency
    my $pipeline = $redis->pipeline;
    $pipeline->set('k1', 'v1');
    $pipeline->set('k2', 'v2');
    $pipeline->get('k1');
    my $results = await $pipeline->execute;

    # PubSub
    my $sub = await $redis->subscribe('channel');
    while (my $msg = await $sub->next_message) {
        print "Received: $msg->{message}\n";
    }
})->()->await;

Important: If you're embedding Async::Redis in a larger application (web framework, existing event loop, etc.), see "EVENT LOOP CONFIGURATION" for how to properly configure Future::IO. Libraries should never configure the Future::IO backend - only your application's entry point should.

DESCRIPTION

Async::Redis is an asynchronous Redis client built on Future::IO, providing a modern, non-blocking interface for Redis operations.

Key features:

  • Full async/await support via Future::AsyncAwait

  • Event loop agnostic (IO::Async, AnyEvent, UV, etc.)

  • Automatic reconnection with exponential backoff

  • Connection pooling with health checks

  • Pipelining and auto-pipelining

  • PubSub with automatic subscription replay on reconnect

    When a connection drops during pub/sub mode and reconnect is enabled, all subscriptions are automatically re-established. Use $subscription->on_reconnect(sub { ... }) to be notified when this happens (e.g., to re-poll state that may have changed during the outage).

  • Transaction support (MULTI/EXEC/WATCH)

  • TLS/SSL connections

  • OpenTelemetry observability integration

  • Fork-safe for pre-fork servers (Starman, etc.)

  • Full RESP2 protocol support

  • Safe concurrent commands on single connection

CONCURRENT COMMANDS

Async::Redis safely handles multiple concurrent commands on a single connection using a response queue pattern. When you fire multiple async commands without explicitly awaiting them:

my @futures = (
    $redis->set('k1', 'v1'),
    $redis->set('k2', 'v2'),
    $redis->get('k1'),
);
my @results = await Future->needs_all(@futures);

Each command is registered in an inflight queue before being sent to Redis. A single reader coroutine processes responses in FIFO order, matching each response to the correct waiting future. This prevents response mismatch bugs that can occur when multiple coroutines race to read from the socket.

For high-throughput scenarios, consider using:

  • Explicit pipelines - $redis->pipeline batches commands for a single network round-trip

  • Auto-pipeline - auto_pipeline => 1 automatically batches commands within an event loop tick

  • Connection pools - Async::Redis::Pool for parallel execution across multiple connections

CONSTRUCTOR

new

my $redis = Async::Redis->new(%options);

Creates a new Redis client instance. Does not connect immediately.

Options:

host => $hostname

Redis server hostname. Default: 'localhost'

port => $port

Redis server port. Default: 6379

uri => $uri

Connection URI (e.g., 'redis://user:pass@host:port/db'). If provided, overrides host, port, password, database options.

path => $path

Unix domain socket path. When provided, host and port are ignored. Also available via redis+unix:// URIs.

password => $password

Authentication password.

username => $username

Authentication username (Redis 6+ ACL).

database => $db

Database number to SELECT after connect. Default: 0

tls => $bool | \%options

Enable TLS/SSL connection. Can be a boolean or hashref with options:

tls => {
    ca_file   => '/path/to/ca.crt',
    cert_file => '/path/to/client.crt',
    key_file  => '/path/to/client.key',
    verify    => 1,  # verify server certificate
    verify_hostname => 1,  # verify certificate name/IP
}
connect_timeout => $seconds

Connection timeout. Default: 10

request_timeout => $seconds

Per-request timeout for commands. Default: 5

Blocking commands (BLPOP, BRPOP, etc.) automatically extend this timeout based on their server-side timeout plus blocking_timeout_buffer. Blocking commands with a Redis timeout of 0 block indefinitely and do not get a client-side request deadline.

blocking_timeout_buffer => $seconds

Extra time added to blocking command timeouts. Default: 2

For example, BLPOP key 30 gets a deadline of 30 + 2 = 32 seconds.

reconnect => $bool

Enable automatic reconnection. Default: 0

reconnect_delay => $seconds

Initial reconnect delay. Default: 0.1

reconnect_delay_max => $seconds

Maximum reconnect delay. Default: 60

reconnect_jitter => $ratio

Jitter ratio for reconnect delays. Default: 0.25

reconnect_max_attempts => $int

Maximum number of reconnect attempts before _reconnect gives up and dies with an Async::Redis::Error::Disconnected. Default: 10. Set to 0 for unlimited retries (not recommended in production: an unreachable Redis will loop with exponential backoff forever, giving consumers no way to distinguish "reconnecting" from "broken").

When the cap is exceeded, the failure propagates through _reconnect_pubsub to any active Async::Redis::Subscription's read loop, where it routes to on_error (or dies loudly if no on_error is registered) per the existing Subscription fatal-error contract.

on_connect => $coderef

Callback when connection is established. Called as $coderef->($redis).

on_disconnect => $coderef

Callback when a live connection is intentionally closed or lost. Called as $coderef->($redis, $reason).

on_error => $coderef

Callback for connection/read errors before they are propagated. Called as $coderef->($redis, $error).

prefix => $prefix

Key prefix applied to supported key-bearing commands. See "PREFIX LIMITATIONS".

client_name => $name

CLIENT SETNAME value sent on connect.

pipeline_depth => $int

Maximum commands allowed in an explicit pipeline. Default: 10000

auto_pipeline => $bool

If true, commands issued in the same event-loop tick are automatically batched and sent as one pipeline. Default: 0

message_queue_depth => $int

Maximum number of locally queued pub/sub messages before the reader applies backpressure. Must be at least 1. Default: 1

debug => $bool | $coderef

Enable debug logging. A true non-coderef logs to STDERR. A coderef receives ($direction, $data).

otel_tracer => $tracer

OpenTelemetry tracer for span creation.

otel_meter => $meter

OpenTelemetry meter for metrics.

otel_include_args => $bool

Include command arguments in OpenTelemetry span statements. Default: 0. See "OPENTELEMETRY ARGUMENTS".

otel_redact => $bool

Apply built-in credential redaction when command arguments are included in logs or spans. Default: 1.

PREFIX LIMITATIONS

The prefix option is a convenience for namespacing keys; it is not a hard security boundary. Key extraction is driven by a hand-maintained map covering common Redis commands. As of 0.002000, the following commands have incomplete or missing key extraction: BITFIELD, LMPOP, BLMPOP, ZINTERCARD, LCS, GEORADIUS_RO, GEORADIUSBYMEMBER_RO. Calls to these commands will not have prefixes applied. For multi-tenant isolation, use Redis ACLs or separate Redis databases rather than relying on prefix.

TLS HOSTNAME VERIFICATION

Starting in 0.002000, TLS connections verify the server certificate's hostname (or IP SAN) by default when verify is on. If you connect by hostname, the certificate must have a matching CN or SAN. If you connect by IP literal, the certificate must have a matching IP SAN.

For deployments where the certificate does not match the connected hostname/IP (common when connecting to internal IPs with hostname-only certs), set tls => { verify_hostname => 0 } to skip hostname identity while still verifying the CA chain.

METHODS

connect

await $redis->connect;

Establish connection to Redis server. Returns a Future that resolves to the Redis client instance.

disconnect

$redis->disconnect;

Close connection gracefully.

is_connected

my $ok = $redis->is_connected;

Return true when the client currently has an open Redis connection.

ping

my $pong = await $redis->ping;

Send PING and return Redis's response.

command

my $result = await $redis->command('GET', 'key');

Execute arbitrary Redis command.

Redis Commands

All standard Redis commands are available as methods. See https://redis.io/docs/latest/commands/ for the complete Redis command reference.

# Strings
await $redis->set('key', 'value');
await $redis->set('key', 'value', ex => 300);  # with 5min expiry
await $redis->set('key', 'value', nx => 1);    # only if not exists
my $value = await $redis->get('key');
await $redis->incr('counter');
await $redis->incrby('counter', 5);
await $redis->mset('k1', 'v1', 'k2', 'v2');
my $values = await $redis->mget('k1', 'k2');
await $redis->append('key', ' more');
await $redis->setex('key', 60, 'value');       # set with 60s expiry

# Hashes
await $redis->hset('user:1', 'name', 'Alice', 'email', 'alice@example.com');
my $name = await $redis->hget('user:1', 'name');
my $user = await $redis->hgetall('user:1');    # returns hashref
await $redis->hincrby('user:1', 'visits', 1);
my $exists = await $redis->hexists('user:1', 'name');
await $redis->hdel('user:1', 'email');

# Lists
await $redis->lpush('queue', 'job1', 'job2');
await $redis->rpush('queue', 'job3');
my $left = await $redis->lpop('queue');
my $right = await $redis->rpop('queue');
my $popped = await $redis->blpop('queue', 5);  # ['queue', 'job'] or undef
my $items = await $redis->lrange('queue', 0, -1);
my $len = await $redis->llen('queue');

# Sets
await $redis->sadd('tags', 'perl', 'redis', 'async');
await $redis->srem('tags', 'async');
my $members = await $redis->smembers('tags');
my $is_member = await $redis->sismember('tags', 'perl');
my $common = await $redis->sinter('tags1', 'tags2');

# Sorted Sets
await $redis->zadd('leaderboard', 100, 'alice', 85, 'bob');
await $redis->zincrby('leaderboard', 10, 'alice');
my $top = await $redis->zrange('leaderboard', 0, 9, 'WITHSCORES');
my $rank = await $redis->zrank('leaderboard', 'alice');
my $score = await $redis->zscore('leaderboard', 'alice');

# Keys
my $exists = await $redis->exists('key');
await $redis->expire('key', 300);
my $ttl = await $redis->ttl('key');
await $redis->del('key1', 'key2');
await $redis->rename('old', 'new');
my $type = await $redis->type('key');
my $keys = await $redis->keys('user:*');       # use SCAN in production

pipeline

my $pipeline = $redis->pipeline;
$pipeline->set('k1', 'v1');
$pipeline->incr('counter');
my $results = await $pipeline->execute;

Create a pipeline for batched command execution. All commands are sent in a single network round-trip.

subscribe

my $sub = await $redis->subscribe('channel1', 'channel2');

Subscribe to channels. Returns a Async::Redis::Subscription object.

psubscribe

my $sub = await $redis->psubscribe('chan:*');

Subscribe to patterns. Returns a Async::Redis::Subscription object.

ssubscribe

my $sub = await $redis->ssubscribe('shard-channel');

Subscribe to Redis 7 sharded pub/sub channels. Returns a Async::Redis::Subscription object.

publish

my $receivers = await $redis->publish('channel', 'message');

Publish a regular pub/sub message.

spublish

my $receivers = await $redis->spublish('shard-channel', 'message');

Publish a Redis 7 sharded pub/sub message.

multi

my $results = await $redis->multi(async sub {
    my ($tx) = @_;
    $tx->set('k1', 'v1');
    $tx->incr('counter');
});

Execute a transaction with callback.

watch

await $redis->watch('key1', 'key2');

Watch keys for a manual optimistic transaction. Redis clears watched keys on EXEC, DISCARD, or UNWATCH.

unwatch

await $redis->unwatch;

Clear all watched keys on the current connection.

multi_start

await $redis->multi_start;

Start a manual MULTI transaction and mark the connection dirty until exec or discard.

exec

my $results = await $redis->exec;

Execute a manual transaction. Returns undef if Redis aborts because a watched key changed.

discard

await $redis->discard;

Abort a manual transaction. Redis also clears any active watched keys.

watch_multi

my $results = await $redis->watch_multi(['key'], async sub {
    my ($tx, $values) = @_;
    $tx->set('key', $values->{key} + 1);
});

Watch keys and execute transaction atomically. Returns undef if watched keys were modified by another client.

script

my $script = $redis->script('return redis.call("get", KEYS[1])');
my $result = await $script->run(['mykey']);

Create a Lua script object with automatic EVALSHA optimization. See Async::Redis::Script for details.

define_command

$redis->define_command(my_command => {
    keys        => 1,               # Number of KEYS (or 'dynamic')
    lua         => 'return ...',    # Lua script code
    description => 'Does X',        # Optional documentation
});

Register a named Lua script for reuse. The script is automatically cached and uses EVALSHA for efficiency. Script names are kept in this Redis object's registry only; they are not installed as Perl methods. Execute registered scripts with run_script.

Options:

  • keys - Number of KEYS the script expects. Use 'dynamic' if variable (first arg to run_script will be the key count).

  • lua - The Lua script source code.

  • description - Optional description for documentation.

run_script

my $result = await $redis->run_script('my_command', @keys, @args);

Execute a registered script by name. For scripts with fixed key count, pass keys then args. For dynamic scripts, pass key count first:

# Fixed keys (keys => 2)
await $redis->run_script('two_key_script', 'key1', 'key2', 'arg1');

# Dynamic keys
await $redis->run_script('dynamic_script', 2, 'key1', 'key2', 'arg1');

get_script

my $script = $redis->get_script('my_command');

Get a registered script object by name. Returns undef if not found.

list_scripts

my @names = $redis->list_scripts;

List all registered script names.

preload_scripts

my $count = await $redis->preload_scripts;

Load all registered scripts to Redis server. Useful before pipeline execution to ensure EVALSHA will succeed.

script_load / script_exists / script_flush / script_kill

my $sha = await $redis->script_load($lua);
my $flags = await $redis->script_exists($sha1, $sha2);
await $redis->script_flush('ASYNC');
await $redis->script_kill;

Thin wrappers around Redis SCRIPT subcommands.

is_dirty

my $dirty = $redis->is_dirty;

Return true if the connection has state that makes it unsafe to return to a pool: active transaction, watched keys, pub/sub mode, or pending responses.

in_multi / watching / in_pubsub / inflight_count

State accessors used by Async::Redis::Pool and useful for diagnostics.

LUA SCRIPTING

Async::Redis provides comprehensive support for Redis Lua scripting with automatic EVALSHA optimization.

Quick Start

# Define a reusable script
$redis->define_command(atomic_incr => {
    keys => 1,
    lua  => <<'LUA',
        local current = tonumber(redis.call('GET', KEYS[1]) or 0)
        local result = current + tonumber(ARGV[1])
        redis.call('SET', KEYS[1], result)
        return result
LUA
});

# Use it
my $result = await $redis->run_script('atomic_incr', 'counter', 5);

Pipeline Integration

Registered scripts work in pipelines:

my $pipe = $redis->pipeline;
$pipe->run_script('atomic_incr', 'counter:a', 1);
$pipe->run_script('atomic_incr', 'counter:b', 1);
$pipe->set('other:key', 'value');
my $results = await $pipe->execute;

Scripts are automatically preloaded before pipeline execution.

EVALSHA Optimization

Scripts automatically use EVALSHA (by SHA1 hash) for efficiency. If the script isn't cached on the server, it falls back to EVAL and caches for future calls. This is transparent to your code.

scan_iter

my $iter = $redis->scan_iter(match => 'user:*', count => 100);
while (my $keys = await $iter->next) {
    for my $key (@$keys) { ... }
}

Create an iterator for SCAN. Also available:

my $hash_iter = $redis->hscan_iter('hash', match => 'field:*');
my $set_iter  = $redis->sscan_iter('set', count => 100);
my $zset_iter = $redis->zscan_iter('zset');

Iterators return batches. ZSCAN batches are the Redis flat member/score list.

CONNECTION POOLING

For high-throughput applications, use Async::Redis::Pool:

use Async::Redis::Pool;

my $pool = Async::Redis::Pool->new(
    host => 'localhost',
    min  => 2,
    max  => 10,
);

# Use with() for automatic acquire/release
my $result = await $pool->with(sub {
    my ($conn) = @_;
    return $conn->get('key');
});

ERROR HANDLING

Errors are thrown as exception objects:

eval {
    await $redis->get('key');
    1;
} or do {
    my $error = $@;
    if (ref($error) && $error->isa('Async::Redis::Error::Connection')) {
        # Connection error
    } elsif (ref($error) && $error->isa('Async::Redis::Error::Timeout')) {
        # Timeout error
    } elsif (ref($error) && $error->isa('Async::Redis::Error::Redis')) {
        # Redis error (e.g., WRONGTYPE)
    }
};

Exception classes:

Async::Redis::Error::Connection

Connection-related errors (refused, reset, etc.)

Async::Redis::Error::Timeout

Timeout errors (connect, request, read).

Async::Redis::Error::Protocol

Protocol parsing errors.

Async::Redis::Error::Redis

Errors returned by Redis (WRONGTYPE, ERR, etc.)

Async::Redis::Error::Disconnected

Operation attempted on disconnected client.

FORK SAFETY

Async::Redis is fork-safe. When a fork is detected, the child process will automatically invalidate its connection state and reconnect when needed. The parent retains ownership of the original connection.

EVENT LOOP CONFIGURATION

Async::Redis uses Future::IO for event loop abstraction, making it compatible with IO::Async, UV, AnyEvent, and other event loops. However, Async::Redis does not choose which event loop to use - that's the application's responsibility.

Default (No Configuration Needed)

Future::IO 0.23+ includes a built-in poll-based implementation that works out of the box. For standalone scripts, you don't need to configure anything:

#!/usr/bin/env perl
use strict;
use warnings;
use Async::Redis;

my $redis = Async::Redis->new(host => 'localhost');
# Just works - Future::IO uses its built-in IO::Poll backend

The Golden Rule

Only executable scripts should configure Future::IO. Library modules (.pm files) should never configure the backend because they don't know what event loop the application wants to use.

For IO::Async Applications

If your application already uses IO::Async for its event loop, load the implementation directly:

use IO::Async::Loop;
require Future::IO::Impl::IOAsync;

my $loop = IO::Async::Loop->new;

use Async::Redis;
my $redis = Async::Redis->new(host => 'localhost');

Note: Use require rather than Future::IO->load_impl('IOAsync') for compatibility with Future::IO 0.22+ which gates load_impl on the newer poll API.

For UV Applications

If your application uses UV (libuv) for its event loop:

use UV;
require Future::IO::Impl::UV;

use Async::Redis;
my $redis = Async::Redis->new(host => 'localhost');

Checking the Current Implementation

To see which Future::IO implementation is active:

use Future::IO;
print "Using: $Future::IO::IMPL\n";

OBSERVABILITY

OpenTelemetry integration is available:

use OpenTelemetry::SDK;

my $redis = Async::Redis->new(
    host        => 'localhost',
    otel_tracer => OpenTelemetry->tracer_provider->tracer('my-app'),
    otel_meter  => OpenTelemetry->meter_provider->meter('my-app'),
);

This enables:

  • Distributed tracing with spans per Redis command

  • Metrics: command latency, connection counts, errors

  • Automatic attribute extraction (command, database, etc.)

OPENTELEMETRY ARGUMENTS

Starting in 0.002000, command arguments are no longer included in spans by default. Redis values frequently contain session tokens, user IDs, and other PII; exporting them to a tracing backend is a privacy hazard. Pass otel_include_args => 1 to re-enable, and implement custom redaction for your data shapes before doing so.

MESSAGE QUEUE DEPTH

message_queue_depth limits the number of queued pubsub messages. Callback invocation is always serialized. With message_queue_depth => 1 (the default), one message may be queued while one callback is still processing; the reader pauses when that queue slot is full. Higher values allow more messages to buffer locally before the reader pauses.

TASK LIFECYCLE

Async::Redis organizes all fire-and-forget background work (the socket reader, reconnect attempts, auto-pipeline submit batches, the pubsub callback driver) under a single per-client Future::Selector instance, following Paul Evans's client pattern from Sys::Async::Virt and IPC::MicroSocket.

Each background task is registered with the selector via $selector->add(data => $label, f => $task_future). Command execution awaits responses via $selector->run_until_ready($response_future), which pumps the selector and propagates any task failure to the awaiting caller. The practical guarantee: if a background task dies (including from a coding bug that escapes explicit fatal-error handling), awaiting callers see a typed failure rather than hanging forever.

This structure provides the five structured-concurrency properties articulated by the trio / asyncio.TaskGroup ecosystems:

  • GC safety - every background task is held by the selector.

  • Error propagation - any task's failure reaches callers awaiting the selector via run_until_ready.

  • Cancellation - socket closure propagates to pending I/O, which fails the owning task, which the selector propagates.

  • Scope cleanup - disconnect tears down state; remaining selector tasks unwind via their existing on_fail handlers.

  • Local reasoning - all concurrent work on one connection is owned by one place.

There is no user-facing API for the selector; it is internal machinery. Clients should not call $redis->{_tasks} directly.

Note: the only use of Future->retain in this codebase is avoided in favor of selector ownership. Any patch that introduces ->retain on an Async::Redis-owned Future should instead add the task to $self->{_tasks} so failure propagation and lifetime ownership are consistent.

KNOWN LIMITATIONS

  • Hostname resolution is synchronous. connect() calls inet_aton before the async connect, which blocks during DNS lookup. Not covered by connect_timeout.

  • IPv6 URI hosts are not yet supported.

  • Some generated wrappers expose mode-changing commands (HELLO, CLIENT REPLY, MONITOR, SYNC, PSYNC) that interact poorly with the response model. Avoid them unless you understand the protocol consequences.

SEE ALSO

AUTHOR

John Napiorkowski

COPYRIGHT AND LICENSE

This is free software; you can redistribute it and/or modify it under the same terms as the Perl 5 programming language system itself.