NAME

Async::Redis - Async Redis client using Future::IO

SYNOPSIS

use Async::Redis;
use Future::AsyncAwait;

# For standalone scripts: configure Future::IO first
use Future::IO;
Future::IO->load_best_impl;  # Selects UV, IO::Async, etc.

my $redis = Async::Redis->new(
    host => 'localhost',
    port => 6379,
);

(async sub {
    await $redis->connect;

    # Simple commands
    await $redis->set('key', 'value');
    my $value = await $redis->get('key');

    # Pipelining for efficiency
    my $pipeline = $redis->pipeline;
    $pipeline->set('k1', 'v1');
    $pipeline->set('k2', 'v2');
    $pipeline->get('k1');
    my $results = await $pipeline->execute;

    # PubSub
    my $sub = await $redis->subscribe('channel');
    while (my $msg = await $sub->next_message) {
        print "Received: $msg->{message}\n";
    }
})->()->await;

Important: If you're embedding Async::Redis in a larger application (web framework, existing event loop, etc.), see "EVENT LOOP CONFIGURATION" for how to properly configure Future::IO. Libraries should never call load_best_impl - only your application's entry point should.

DESCRIPTION

Async::Redis is an asynchronous Redis client built on Future::IO, providing a modern, non-blocking interface for Redis operations.

Key features:

  • Full async/await support via Future::AsyncAwait

  • Event loop agnostic (IO::Async, AnyEvent, UV, etc.)

  • Automatic reconnection with exponential backoff

  • Connection pooling with health checks

  • Pipelining and auto-pipelining

  • PubSub with automatic subscription replay on reconnect

  • Transaction support (MULTI/EXEC/WATCH)

  • TLS/SSL connections

  • OpenTelemetry observability integration

  • Fork-safe for pre-fork servers (Starman, etc.)

  • Full RESP2 protocol support

  • Safe concurrent commands on single connection

CONCURRENT COMMANDS

Async::Redis safely handles multiple concurrent commands on a single connection using a response queue pattern. When you fire multiple async commands without explicitly awaiting them:

my @futures = (
    $redis->set('k1', 'v1'),
    $redis->set('k2', 'v2'),
    $redis->get('k1'),
);
my @results = await Future->needs_all(@futures);

Each command is registered in an inflight queue before being sent to Redis. A single reader coroutine processes responses in FIFO order, matching each response to the correct waiting future. This prevents response mismatch bugs that can occur when multiple coroutines race to read from the socket.

For high-throughput scenarios, consider using:

  • Explicit pipelines - $redis->pipeline batches commands for a single network round-trip

  • Auto-pipeline - auto_pipeline => 1 automatically batches commands within an event loop tick

  • Connection pools - Async::Redis::Pool for parallel execution across multiple connections

CONSTRUCTOR

new

my $redis = Async::Redis->new(%options);

Creates a new Redis client instance. Does not connect immediately.

Options:

host => $hostname

Redis server hostname. Default: 'localhost'

port => $port

Redis server port. Default: 6379

uri => $uri

Connection URI (e.g., 'redis://user:pass@host:port/db'). If provided, overrides host, port, password, database options.

password => $password

Authentication password.

username => $username

Authentication username (Redis 6+ ACL).

database => $db

Database number to SELECT after connect. Default: 0

tls => $bool | \%options

Enable TLS/SSL connection. Can be a boolean or hashref with options:

tls => {
    ca_file   => '/path/to/ca.crt',
    cert_file => '/path/to/client.crt',
    key_file  => '/path/to/client.key',
    verify    => 1,  # verify server certificate
}
connect_timeout => $seconds

Connection timeout. Default: 10

request_timeout => $seconds

Per-request timeout for commands. Default: 5

Blocking commands (BLPOP, BRPOP, etc.) automatically extend this timeout based on their server-side timeout plus blocking_timeout_buffer.

blocking_timeout_buffer => $seconds

Extra time added to blocking command timeouts. Default: 2

For example, BLPOP key 30 gets a deadline of 30 + 2 = 32 seconds.

reconnect => $bool

Enable automatic reconnection. Default: 0

reconnect_delay => $seconds

Initial reconnect delay. Default: 0.1

reconnect_delay_max => $seconds

Maximum reconnect delay. Default: 60

reconnect_jitter => $ratio

Jitter ratio for reconnect delays. Default: 0.25

on_connect => $coderef

Callback when connection established.

on_disconnect => $coderef

Callback when connection lost.

on_error => $coderef

Callback for connection errors.

prefix => $prefix

Key prefix applied to all commands.

client_name => $name

CLIENT SETNAME value sent on connect.

debug => $bool

Enable debug logging.

otel_tracer => $tracer

OpenTelemetry tracer for span creation.

otel_meter => $meter

OpenTelemetry meter for metrics.

METHODS

connect

await $redis->connect;

Establish connection to Redis server. Returns a Future that resolves to the Redis client instance.

disconnect

$redis->disconnect;

Close connection gracefully.

command

my $result = await $redis->command('GET', 'key');

Execute arbitrary Redis command.

Redis Commands

All standard Redis commands are available as methods. See https://redis.io/docs/latest/commands/ for the complete Redis command reference.

# Strings
await $redis->set('key', 'value');
await $redis->set('key', 'value', ex => 300);  # with 5min expiry
await $redis->set('key', 'value', nx => 1);    # only if not exists
my $value = await $redis->get('key');
await $redis->incr('counter');
await $redis->incrby('counter', 5);
await $redis->mset('k1', 'v1', 'k2', 'v2');
my $values = await $redis->mget('k1', 'k2');
await $redis->append('key', ' more');
await $redis->setex('key', 60, 'value');       # set with 60s expiry

# Hashes
await $redis->hset('user:1', 'name', 'Alice', 'email', 'alice@example.com');
my $name = await $redis->hget('user:1', 'name');
my $user = await $redis->hgetall('user:1');    # returns hashref
await $redis->hincrby('user:1', 'visits', 1);
my $exists = await $redis->hexists('user:1', 'name');
await $redis->hdel('user:1', 'email');

# Lists
await $redis->lpush('queue', 'job1', 'job2');
await $redis->rpush('queue', 'job3');
my $job = await $redis->lpop('queue');
my $job = await $redis->rpop('queue');
my $job = await $redis->blpop('queue', 5);     # blocking pop, 5s timeout
my $items = await $redis->lrange('queue', 0, -1);
my $len = await $redis->llen('queue');

# Sets
await $redis->sadd('tags', 'perl', 'redis', 'async');
await $redis->srem('tags', 'async');
my $members = await $redis->smembers('tags');
my $is_member = await $redis->sismember('tags', 'perl');
my $common = await $redis->sinter('tags1', 'tags2');

# Sorted Sets
await $redis->zadd('leaderboard', 100, 'alice', 85, 'bob');
await $redis->zincrby('leaderboard', 10, 'alice');
my $top = await $redis->zrange('leaderboard', 0, 9, 'WITHSCORES');
my $rank = await $redis->zrank('leaderboard', 'alice');
my $score = await $redis->zscore('leaderboard', 'alice');

# Keys
my $exists = await $redis->exists('key');
await $redis->expire('key', 300);
my $ttl = await $redis->ttl('key');
await $redis->del('key1', 'key2');
await $redis->rename('old', 'new');
my $type = await $redis->type('key');
my $keys = await $redis->keys('user:*');       # use SCAN in production

pipeline

my $pipeline = $redis->pipeline;
$pipeline->set('k1', 'v1');
$pipeline->incr('counter');
my $results = await $pipeline->execute;

Create a pipeline for batched command execution. All commands are sent in a single network round-trip.

subscribe

my $sub = await $redis->subscribe('channel1', 'channel2');

Subscribe to channels. Returns a Async::Redis::Subscription object.

psubscribe

my $sub = await $redis->psubscribe('chan:*');

Subscribe to pattern. Returns a Subscription object.

multi

my $results = await $redis->multi(async sub {
    my ($tx) = @_;
    $tx->set('k1', 'v1');
    $tx->incr('counter');
});

Execute a transaction with callback.

watch

await $redis->watch('key1', 'key2');

Watch keys for transaction.

watch_multi

my $results = await $redis->watch_multi(['key'], async sub {
    my ($tx, $values) = @_;
    $tx->set('key', $values->{key} + 1);
});

Watch keys and execute transaction atomically. Returns undef if watched keys were modified by another client.

script

my $script = $redis->script('return redis.call("get", KEYS[1])');
my $result = await $script->run(['mykey']);

Create a Lua script object with automatic EVALSHA optimization. See Async::Redis::Script for details.

define_command

$redis->define_command(my_command => {
    keys        => 1,               # Number of KEYS (or 'dynamic')
    lua         => 'return ...',    # Lua script code
    description => 'Does X',        # Optional documentation
    install     => 1,               # Optional: install as method
});

Register a named Lua script for reuse. The script is automatically cached and uses EVALSHA for efficiency.

Options:

  • keys - Number of KEYS the script expects. Use 'dynamic' if variable (first arg to run_script will be the key count).

  • lua - The Lua script source code.

  • description - Optional description for documentation.

  • install - If true, install as a method on the Async::Redis class.

run_script

my $result = await $redis->run_script('my_command', @keys, @args);

Execute a registered script by name. For scripts with fixed key count, pass keys then args. For dynamic scripts, pass key count first:

# Fixed keys (keys => 2)
await $redis->run_script('two_key_script', 'key1', 'key2', 'arg1');

# Dynamic keys
await $redis->run_script('dynamic_script', 2, 'key1', 'key2', 'arg1');

get_script

my $script = $redis->get_script('my_command');

Get a registered script object by name. Returns undef if not found.

list_scripts

my @names = $redis->list_scripts;

List all registered script names.

preload_scripts

my $count = await $redis->preload_scripts;

Load all registered scripts to Redis server. Useful before pipeline execution to ensure EVALSHA will succeed.

LUA SCRIPTING

Async::Redis provides comprehensive support for Redis Lua scripting with automatic EVALSHA optimization.

Quick Start

# Define a reusable script
$redis->define_command(atomic_incr => {
    keys => 1,
    lua  => <<'LUA',
        local current = tonumber(redis.call('GET', KEYS[1]) or 0)
        local result = current + tonumber(ARGV[1])
        redis.call('SET', KEYS[1], result)
        return result
LUA
});

# Use it
my $result = await $redis->run_script('atomic_incr', 'counter', 5);

Pipeline Integration

Registered scripts work in pipelines:

my $pipe = $redis->pipeline;
$pipe->run_script('atomic_incr', 'counter:a', 1);
$pipe->run_script('atomic_incr', 'counter:b', 1);
$pipe->set('other:key', 'value');
my $results = await $pipe->execute;

Scripts are automatically preloaded before pipeline execution.

Method Installation

For frequently used scripts, install as methods:

$redis->define_command(cache_get => {
    keys    => 1,
    lua     => 'return redis.call("GET", KEYS[1])',
    install => 1,
});

# Now call directly
my $value = await $redis->cache_get('my:key');

EVALSHA Optimization

Scripts automatically use EVALSHA (by SHA1 hash) for efficiency. If the script isn't cached on the server, it falls back to EVAL and caches for future calls. This is transparent to your code.

scan_iter

my $iter = $redis->scan_iter(match => 'user:*', count => 100);
while (my $keys = await $iter->next) {
    for my $key (@$keys) { ... }
}

Create an iterator for SCAN. Also available: hscan_iter, sscan_iter, zscan_iter.

CONNECTION POOLING

For high-throughput applications, use Async::Redis::Pool:

use Async::Redis::Pool;

my $pool = Async::Redis::Pool->new(
    host => 'localhost',
    min  => 2,
    max  => 10,
);

# Use with() for automatic acquire/release
my $result = await $pool->with(sub {
    my ($conn) = @_;
    return $conn->get('key');
});

ERROR HANDLING

Errors are thrown as exception objects:

use Try::Tiny;

try {
    await $redis->get('key');
} catch {
    if ($_->isa('Async::Redis::Error::Connection')) {
        # Connection error
    } elsif ($_->isa('Async::Redis::Error::Timeout')) {
        # Timeout error
    } elsif ($_->isa('Async::Redis::Error::Redis')) {
        # Redis error (e.g., WRONGTYPE)
    }
};

Exception classes:

Async::Redis::Error::Connection

Connection-related errors (refused, reset, etc.)

Async::Redis::Error::Timeout

Timeout errors (connect, request, read).

Async::Redis::Error::Protocol

Protocol parsing errors.

Async::Redis::Error::Redis

Errors returned by Redis (WRONGTYPE, ERR, etc.)

Async::Redis::Error::Disconnected

Operation attempted on disconnected client.

FORK SAFETY

Async::Redis is fork-safe. When a fork is detected, the child process will automatically invalidate its connection state and reconnect when needed. The parent retains ownership of the original connection.

EVENT LOOP CONFIGURATION

Async::Redis uses Future::IO for event loop abstraction, making it compatible with IO::Async, UV, AnyEvent, and other event loops. However, Async::Redis does not choose which event loop to use - that's the application's responsibility.

The Golden Rule

Only executable scripts should configure Future::IO. Library modules (.pm files) should never call load_best_impl or load_impl because they don't know what event loop the application wants to use.

When you use Async::Redis inside a larger application, you are a "guest" in that application's event loop. The application (the "host") decides which Future::IO implementation to use, and all libraries must cooperate.

For Standalone Scripts

If you're writing a standalone script that uses Async::Redis directly, configure Future::IO at the top of your script:

#!/usr/bin/env perl
use strict;
use warnings;
use Future::IO;
Future::IO->load_best_impl;  # Auto-select best available

use Async::Redis;
my $redis = Async::Redis->new(host => 'localhost');
# ...

load_best_impl will select the best available backend, typically preferring UV if installed, then IO::Async, then others.

For IO::Async Applications

If your application uses IO::Async for its event loop:

use IO::Async::Loop;
use Future::IO;
Future::IO->load_impl('IOAsync');  # Explicitly use IO::Async

my $loop = IO::Async::Loop->new;

use Async::Redis;
my $redis = Async::Redis->new(host => 'localhost');

For UV Applications

If your application uses UV (libuv) for its event loop:

use UV;
use Future::IO;
Future::IO->load_impl('UV');  # Explicitly use UV

use Async::Redis;
my $redis = Async::Redis->new(host => 'localhost');

When Using Multiple Async Libraries

When combining Async::Redis with other Future::IO-based libraries (like web frameworks, database clients, etc.), all libraries will share the same Future::IO backend. This is by design - they're all cooperating within the same event loop.

The key is that the application configures Future::IO once, before loading any libraries that use it:

# Application startup
use Future::IO;
Future::IO->load_impl('IOAsync');  # Application's choice

# Now load libraries - they all use the configured backend
use Async::Redis;
use Some::Other::Async::Library;
use My::Web::Framework;

What Happens Without Configuration

If nothing explicitly configures Future::IO before the first async operation, Future::IO will auto-select an implementation. This can lead to unexpected behavior if different parts of your application assume different backends.

To avoid surprises, always configure Future::IO explicitly in your application's entry point.

Checking the Current Implementation

To see which Future::IO implementation is active:

use Future::IO;
print "Using: $Future::IO::IMPL\n";

OBSERVABILITY

OpenTelemetry integration is available:

use OpenTelemetry::SDK;

my $redis = Async::Redis->new(
    host        => 'localhost',
    otel_tracer => OpenTelemetry->tracer_provider->tracer('my-app'),
    otel_meter  => OpenTelemetry->meter_provider->meter('my-app'),
);

This enables:

  • Distributed tracing with spans per Redis command

  • Metrics: command latency, connection counts, errors

  • Automatic attribute extraction (command, database, etc.)

SEE ALSO

AUTHOR

John Napiorkowski

COPYRIGHT AND LICENSE

This is free software; you can redistribute it and/or modify it under the same terms as the Perl 5 programming language system itself.