NAME

Async::Redis - Async Redis client using Future::IO

SYNOPSIS

use Async::Redis;
use Future::AsyncAwait;

# Use any Future::IO-compatible event loop
# IO::Async:
use IO::Async::Loop;
use Future::IO;
Future::IO->load_impl('IOAsync');
my $loop = IO::Async::Loop->new;

# Or UV:   Future::IO->load_impl('UV');
# Or Glib: Future::IO->load_impl('Glib');

my $redis = Async::Redis->new(
    host => 'localhost',
    port => 6379,
);

(async sub {
    await $redis->connect;

    # Simple commands
    await $redis->set('key', 'value');
    my $value = await $redis->get('key');

    # Pipelining for efficiency
    my $pipeline = $redis->pipeline;
    $pipeline->set('k1', 'v1');
    $pipeline->set('k2', 'v2');
    $pipeline->get('k1');
    my $results = await $pipeline->execute;

    # PubSub
    my $sub = await $redis->subscribe('channel');
    while (my $msg = await $sub->next_message) {
        print "Received: $msg->{message}\n";
    }
})->()->retain;  # keep the returned Future alive while the loop runs

$loop->run;

DESCRIPTION

Async::Redis is an asynchronous Redis client built on Future::IO, providing a modern, non-blocking interface for Redis operations.

Key features:

  • Full async/await support via Future::AsyncAwait

  • Event loop agnostic (IO::Async, AnyEvent, UV, etc.)

  • Automatic reconnection with exponential backoff

  • Connection pooling with health checks

  • Pipelining and auto-pipelining

  • PubSub with automatic subscription replay on reconnect

  • Transaction support (MULTI/EXEC/WATCH)

  • TLS/SSL connections

  • OpenTelemetry observability integration

  • Fork-safe for pre-fork servers (Starman, etc.)

  • Full RESP2 protocol support

CONSTRUCTOR

new

my $redis = Async::Redis->new(%options);

Creates a new Redis client instance. Does not connect immediately.

Options:

host => $hostname

Redis server hostname. Default: 'localhost'

port => $port

Redis server port. Default: 6379

uri => $uri

Connection URI (e.g., 'redis://user:pass@host:port/db'). If provided, it overrides the host, port, password, and database options.

password => $password

Authentication password.

username => $username

Authentication username (Redis 6+ ACL).

database => $db

Database number to SELECT after connect. Default: 0

tls => $bool | \%options

Enable TLS/SSL connection. Can be a boolean or hashref with options:

tls => {
    ca_file   => '/path/to/ca.crt',
    cert_file => '/path/to/client.crt',
    key_file  => '/path/to/client.key',
    verify    => 1,  # verify server certificate
}

connect_timeout => $seconds

Connection timeout. Default: 10

read_timeout => $seconds

Read timeout. Default: 30

request_timeout => $seconds

Per-request timeout. Default: 5

reconnect => $bool

Enable automatic reconnection. Default: 0

reconnect_delay => $seconds

Initial reconnect delay. Default: 0.1

reconnect_delay_max => $seconds

Maximum reconnect delay. Default: 60

reconnect_jitter => $ratio

Jitter ratio for reconnect delays. Default: 0.25

on_connect => $coderef

Callback when connection established.

on_disconnect => $coderef

Callback when connection lost.

on_error => $coderef

Callback for connection errors.

prefix => $prefix

Key prefix applied to all commands.

client_name => $name

CLIENT SETNAME value sent on connect.

debug => $bool

Enable debug logging.

otel_tracer => $tracer

OpenTelemetry tracer for span creation.

otel_meter => $meter

OpenTelemetry meter for metrics.
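
The reconnect_delay, reconnect_delay_max, and reconnect_jitter options above together describe an exponential backoff schedule. The sketch below shows one plausible way such options could combine. The doubling factor, the symmetric jitter, and the helper name backoff_delay are assumptions made for illustration; they are not taken from the Async::Redis source.

```perl
use strict;
use warnings;
use List::Util qw(min);

# Hypothetical helper: delay before reconnect attempt $attempt (0-based).
# Assumes the delay doubles per attempt and jitter is applied symmetrically.
sub backoff_delay {
    my ($attempt, %opt) = @_;
    my $initial = $opt{reconnect_delay}     // 0.1;
    my $max     = $opt{reconnect_delay_max} // 60;
    my $jitter  = $opt{reconnect_jitter}    // 0.25;

    # Exponential growth, capped at the maximum.
    my $delay = min($max, $initial * 2 ** $attempt);

    # Spread the delay uniformly over [delay*(1-jitter), delay*(1+jitter)).
    my $offset = $delay * $jitter * (2 * rand() - 1);
    return $delay + $offset;
}

printf "attempt %d: %.3fs\n", $_, backoff_delay($_) for 0 .. 5;
```

Jitter keeps many clients that lost their connection at the same moment from reconnecting in lockstep. In this sketch the jitter is applied after the cap, so a jittered delay can exceed reconnect_delay_max by up to the jitter ratio.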

METHODS

connect

await $redis->connect;

Establishes a connection to the Redis server. Returns a Future that resolves to the Redis client instance.

disconnect

$redis->disconnect;

Close connection gracefully.

command

my $result = await $redis->command('GET', 'key');

Executes an arbitrary Redis command, given as the command name followed by its arguments.

Redis Commands

All standard Redis commands are available as methods. See https://redis.io/docs/latest/commands/ for the complete Redis command reference.

# Strings
await $redis->set('key', 'value');
await $redis->set('key', 'value', ex => 300);  # with 5min expiry
await $redis->set('key', 'value', nx => 1);    # only if not exists
my $value = await $redis->get('key');
await $redis->incr('counter');
await $redis->incrby('counter', 5);
await $redis->mset('k1', 'v1', 'k2', 'v2');
my $values = await $redis->mget('k1', 'k2');
await $redis->append('key', ' more');
await $redis->setex('key', 60, 'value');       # set with 60s expiry

# Hashes
await $redis->hset('user:1', 'name', 'Alice', 'email', 'alice@example.com');
my $name = await $redis->hget('user:1', 'name');
my $user = await $redis->hgetall('user:1');    # returns hashref
await $redis->hincrby('user:1', 'visits', 1);
my $has_name = await $redis->hexists('user:1', 'name');
await $redis->hdel('user:1', 'email');

# Lists
await $redis->lpush('queue', 'job1', 'job2');
await $redis->rpush('queue', 'job3');
my $first = await $redis->lpop('queue');
my $last  = await $redis->rpop('queue');
my $job   = await $redis->blpop('queue', 5);   # blocking pop, 5s timeout
my $items = await $redis->lrange('queue', 0, -1);
my $len = await $redis->llen('queue');

# Sets
await $redis->sadd('tags', 'perl', 'redis', 'async');
await $redis->srem('tags', 'async');
my $members = await $redis->smembers('tags');
my $is_member = await $redis->sismember('tags', 'perl');
my $common = await $redis->sinter('tags1', 'tags2');

# Sorted Sets
await $redis->zadd('leaderboard', 100, 'alice', 85, 'bob');
await $redis->zincrby('leaderboard', 10, 'alice');
my $top = await $redis->zrange('leaderboard', 0, 9, 'WITHSCORES');
my $rank = await $redis->zrank('leaderboard', 'alice');
my $score = await $redis->zscore('leaderboard', 'alice');

# Keys
my $exists = await $redis->exists('key');
await $redis->expire('key', 300);
my $ttl = await $redis->ttl('key');
await $redis->del('key1', 'key2');
await $redis->rename('old', 'new');
my $type = await $redis->type('key');
my $keys = await $redis->keys('user:*');       # use SCAN in production

pipeline

my $pipeline = $redis->pipeline;
$pipeline->set('k1', 'v1');
$pipeline->incr('counter');
my $results = await $pipeline->execute;

Create a pipeline for batched command execution. All commands are sent in a single network round-trip.
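
To illustrate why a pipeline needs only one round-trip, the sketch below encodes several commands in the RESP2 wire format and joins them into one buffer that could be written to the socket in a single call. It shows the protocol only; the helper names encode_command and encode_pipeline are hypothetical and do not describe how Async::Redis is implemented internally.

```perl
use strict;
use warnings;

# Encode one command as a RESP2 array of bulk strings.
sub encode_command {
    my @args = @_;
    my $out = '*' . scalar(@args) . "\r\n";
    for my $arg (@args) {
        my $bytes = "$arg";
        # Bulk string lengths count bytes, not characters.
        utf8::encode($bytes) if utf8::is_utf8($bytes);
        $out .= '$' . length($bytes) . "\r\n" . $bytes . "\r\n";
    }
    return $out;
}

# Concatenate several commands into one buffer for a single write.
sub encode_pipeline {
    my @commands = @_;
    return join '', map { encode_command(@$_) } @commands;
}

my $buffer = encode_pipeline(
    [ 'SET',  'k1', 'v1' ],
    [ 'INCR', 'counter' ],
);
print length($buffer), " bytes queued for a single write\n";
```

The server answers pipelined commands in the order they were sent, which is how a client can match each reply to its request.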

subscribe

my $sub = await $redis->subscribe('channel1', 'channel2');

Subscribe to channels. Returns an Async::Redis::Subscription object.

psubscribe

my $sub = await $redis->psubscribe('chan:*');

Subscribe to channels matching a pattern. Returns an Async::Redis::Subscription object.

multi

my $results = await $redis->multi(async sub {
    my ($tx) = @_;
    $tx->set('k1', 'v1');
    $tx->incr('counter');
});

Executes the commands queued inside the callback as a single MULTI/EXEC transaction.

watch

await $redis->watch('key1', 'key2');

Watches the given keys, so that a following transaction is aborted if any of them is modified before it executes.

watch_multi

my $results = await $redis->watch_multi(['key'], async sub {
    my ($tx, $values) = @_;
    $tx->set('key', $values->{key} + 1);
});

Watch keys and execute transaction atomically. Returns undef if watched keys were modified by another client.
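
Because watch_multi returns undef when a watched key changed, callers usually retry. The following is a minimal sketch of such a retry loop. It relies only on the behaviour described above; the helper name increment_with_retry and the default of five attempts are assumptions for illustration.

```perl
use strict;
use warnings;
use Future::AsyncAwait;

# Hypothetical helper: retry an optimistic increment until it commits.
async sub increment_with_retry {
    my ($redis, $key, $max_attempts) = @_;
    $max_attempts //= 5;

    my $attempt = 0;
    while ($attempt++ < $max_attempts) {
        my $results = await $redis->watch_multi([$key], async sub {
            my ($tx, $values) = @_;
            # Treat a missing key as 0 before incrementing.
            $tx->set($key, ($values->{$key} // 0) + 1);
        });
        return $results if defined $results;   # transaction committed
    }
    die "Gave up on '$key' after $max_attempts conflicting attempts\n";
}
```

Usage would look like: my $results = await increment_with_retry($redis, 'counter');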

script

my $script = $redis->script('return redis.call("get", KEYS[1])');
my $result = await $script->run(['mykey']);

Create a Lua script object with automatic EVALSHA optimization.
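
The EVALSHA optimization can be pictured as the pattern below, written by hand with the command method: try EVALSHA with the SHA1 digest of the script, and fall back to EVAL if the server has not cached it yet. This is a sketch, not the script object's actual implementation; the helper name run_script is hypothetical, and it assumes the error stringifies to a message containing NOSCRIPT.

```perl
use strict;
use warnings;
use Digest::SHA qw(sha1_hex);
use Future;
use Future::AsyncAwait;

# Hypothetical helper showing the EVALSHA-first pattern.
async sub run_script {
    my ($redis, $lua, $keys, $args) = @_;
    my @tail = (scalar @$keys, @$keys, @{ $args // [] });

    # Redis identifies cached scripts by the SHA1 hex digest of their source.
    my $sha = sha1_hex($lua);

    return await $redis->command('EVALSHA', $sha, @tail)->else(sub {
        my ($err) = @_;
        # Only a missing script justifies resending the full source.
        return Future->fail(@_) unless "$err" =~ /\bNOSCRIPT\b/;
        return $redis->command('EVAL', $lua, @tail);
    });
}
```

Sending the 40-character digest instead of the script body saves bandwidth on every call after the first.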

scan_iter

my $iter = $redis->scan_iter(match => 'user:*', count => 100);
while (my $keys = await $iter->next) {
    for my $key (@$keys) { ... }
}

Create an iterator for SCAN. Also available: hscan_iter, sscan_iter, zscan_iter.

CONNECTION POOLING

For high-throughput applications, use Async::Redis::Pool:

use Async::Redis::Pool;

my $pool = Async::Redis::Pool->new(
    host => 'localhost',
    min  => 2,
    max  => 10,
);

# Use with() for automatic acquire/release
my $result = await $pool->with(sub {
    my ($conn) = @_;
    return $conn->get('key');
});

ERROR HANDLING

Errors are thrown as exception objects:

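use Syntax::Keyword::Try;

# Run this inside an async sub. Try::Tiny is not suitable here: its blocks
# are plain anonymous subs, so await cannot be used inside them.
try {
    await $redis->get('key');
}
catch ($e) {
    if ($e->isa('Async::Redis::Error::Connection')) {
        # Connection error
    } elsif ($e->isa('Async::Redis::Error::Timeout')) {
        # Timeout error
    } elsif ($e->isa('Async::Redis::Error::Redis')) {
        # Redis error (e.g., WRONGTYPE)
    }
}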

Exception classes:

Async::Redis::Error::Connection

Connection-related errors (refused, reset, etc.)

Async::Redis::Error::Timeout

Timeout errors (connect, request, read).

Async::Redis::Error::Protocol

Protocol parsing errors.

Async::Redis::Error::Redis

Errors returned by Redis (WRONGTYPE, ERR, etc.)

Async::Redis::Error::Disconnected

Operation attempted on disconnected client.
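
The exception classes above lend themselves to a simple retry policy. The sketch below treats transport-level failures as retryable and everything else as final. That policy and the helper name is_retryable are assumptions for illustration; only the class names come from the list above.

```perl
use strict;
use warnings;
use Scalar::Util qw(blessed);

# Hypothetical policy: transport-level failures are worth retrying;
# errors reported by Redis itself or by the parser are not.
my @RETRYABLE = map { "Async::Redis::Error::$_" }
    qw(Connection Timeout Disconnected);

sub is_retryable {
    my ($err) = @_;
    return 0 unless blessed $err;   # plain string exceptions are never retried
    for my $class (@RETRYABLE) {
        return 1 if $err->isa($class);
    }
    return 0;
}
```

A catch block could then call is_retryable($e) to decide whether to reissue the command or rethrow the error.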

FORK SAFETY

Async::Redis is fork-safe. When a fork is detected, the child process will automatically invalidate its connection state and reconnect when needed. The parent retains ownership of the original connection.
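
A common way to detect a fork is to remember the process ID that opened the connection and compare it with the current one before each use. The sketch below illustrates that general technique. Whether Async::Redis detects forks this way is an assumption, and the class name ForkAwareHandle is hypothetical.

```perl
use strict;
use warnings;

package ForkAwareHandle;   # hypothetical name, for illustration only

sub new {
    my ($class) = @_;
    return bless { owner_pid => $$, connected => 1 }, $class;
}

# Call before each use. If the PID changed, this process is a forked child
# and must not touch the socket it inherited from the parent.
sub check_fork {
    my ($self) = @_;
    return 0 if $self->{owner_pid} == $$;
    $self->{connected} = 0;    # forget inherited state; reconnect lazily
    $self->{owner_pid} = $$;   # the child now owns its own state
    return 1;
}

package main;

my $handle = ForkAwareHandle->new;
print "forked since connect? ", ($handle->check_fork ? "yes" : "no"), "\n";
```

The child drops its state without closing the shared socket gracefully, so the parent's connection is left undisturbed.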

OBSERVABILITY

OpenTelemetry integration is available:

use OpenTelemetry::SDK;

my $redis = Async::Redis->new(
    host        => 'localhost',
    otel_tracer => OpenTelemetry->tracer_provider->tracer('my-app'),
    otel_meter  => OpenTelemetry->meter_provider->meter('my-app'),
);

This enables:

  • Distributed tracing with spans per Redis command

  • Metrics: command latency, connection counts, errors

  • Automatic attribute extraction (command, database, etc.)

SEE ALSO

AUTHOR

John Googoo

COPYRIGHT AND LICENSE

This is free software; you can redistribute it and/or modify it under the same terms as the Perl 5 programming language system itself.