NAME

Async::Redis::Cookbook - Practical Async::Redis recipes

SYNOPSIS

This cookbook shows small, copyable Async::Redis patterns. The examples are written for use inside an async sub:

  • $redis is a connected Async::Redis client.

  • $host and $port are the Redis endpoint.

  • $prefix is a unique key prefix for the example.

  • Future, Future::AsyncAwait, Future::IO, Async::Redis, and Async::Redis::Pool are loaded where needed.

The die checks are intentional. They keep the cookbook examples executable by t/00-pod/cookbook-examples.t so the documentation does not drift away from the implementation.

RECIPES

Connect And Ping

Create a client, connect explicitly, use it, then disconnect when done.

my $client = Async::Redis->new(
    host            => $host,
    port            => $port,
    connect_timeout => 2,
);

await $client->connect;
my $pong = await $client->ping;
die "expected PONG" unless $pong eq 'PONG';

$client->disconnect;

Basic GET/SET

Most command methods return a Future. Inside an async sub, await the result.

await $redis->set("$prefix:hello", 'world');

my $value = await $redis->get("$prefix:hello");
die "unexpected value: $value" unless $value eq 'world';

Run Independent Commands Concurrently

Start independent commands before awaiting them, then use Future->needs_all to wait for all results. Responses are matched to the right Futures in command order by Async::Redis.

my @futures = (
    $redis->set("$prefix:k1", 'v1'),
    $redis->set("$prefix:k2", 'v2'),
    $redis->incr("$prefix:counter"),
    $redis->get("$prefix:k1"),
);

my ($set1, $set2, $count, $value) = await Future->needs_all(@futures);

die "first SET failed"  unless $set1 eq 'OK';
die "second SET failed" unless $set2 eq 'OK';
die "INCR failed"      unless $count == 1;
die "GET failed"       unless $value eq 'v1';

Batch A Known Set Of Commands With A Pipeline

Use an explicit pipeline when you already know the full batch of commands.

my $pipe = $redis->pipeline;

$pipe->set("$prefix:pipe:a", 'one');
$pipe->set("$prefix:pipe:b", 'two');
$pipe->get("$prefix:pipe:a");
$pipe->incr("$prefix:pipe:counter");

my $results = await $pipe->execute;

die "pipeline SET a failed" unless $results->[0] eq 'OK';
die "pipeline SET b failed" unless $results->[1] eq 'OK';
die "pipeline GET failed"   unless $results->[2] eq 'one';
die "pipeline INCR failed"  unless $results->[3] == 1;

Use Auto-Pipeline For Burst Writes

Use auto_pipeline => 1 when application code naturally fires many commands in the same event-loop tick. Async::Redis batches them without changing the command API.

my $batched = Async::Redis->new(
    host          => $host,
    port          => $port,
    auto_pipeline => 1,
);
await $batched->connect;

my @writes = map {
    $batched->set("$prefix:auto:$_", "value-$_", ex => 60)
} 1 .. 25;

await Future->needs_all(@writes);

my $value = await $batched->get("$prefix:auto:25");
die "auto-pipeline write missing" unless $value eq 'value-25';

$batched->disconnect;

Use A Dedicated Connection For BLPOP Workers

Blocking Redis commands block their Redis connection, not the Perl process. Give each worker its own connection so other clients can keep doing useful work.

my $worker = Async::Redis->new(host => $host, port => $port);
await $worker->connect;

my $pop_f = $worker->blpop("$prefix:jobs", 2);

await Future::IO->sleep(0);
await $redis->rpush("$prefix:jobs", 'job-1');

my $popped = await $pop_f;
die "BLPOP did not return a list/key pair"
    unless ref($popped) eq 'ARRAY' && @$popped == 2;
die "wrong queue" unless $popped->[0] eq "$prefix:jobs";
die "wrong job"   unless $popped->[1] eq 'job-1';

$worker->disconnect;

Read Pub/Sub Messages With The Iterator API

Iterator mode is direct: await next whenever you are ready for another message.

my $publisher = Async::Redis->new(host => $host, port => $port);
my $subscriber = Async::Redis->new(host => $host, port => $port);
await Future->needs_all($publisher->connect, $subscriber->connect);

my $sub = await $subscriber->subscribe("$prefix:events");
my $next = $sub->next;

await $publisher->publish("$prefix:events", 'hello');
my $msg = await $next;

die "wrong channel" unless $msg->{channel} eq "$prefix:events";
die "wrong data"    unless $msg->{data} eq 'hello';

await $sub->unsubscribe;
$publisher->disconnect;
$subscriber->disconnect;

Read Pub/Sub Messages With A Callback

Callback mode is useful for listeners that should run in the background. Always install on_error so fatal listener errors are visible in your application.

my $publisher = Async::Redis->new(host => $host, port => $port);
my $subscriber = Async::Redis->new(host => $host, port => $port);
await Future->needs_all($publisher->connect, $subscriber->connect);

my $sub = await $subscriber->subscribe("$prefix:callback");
my $done = Future->new;

$sub->on_error(sub {
    my ($sub, $err) = @_;
    $done->fail($err) unless $done->is_ready;
});

$sub->on_message(sub {
    my ($sub, $msg) = @_;
    $done->done($msg->{data}) unless $done->is_ready;
});

await $publisher->publish("$prefix:callback", 'callback-data');

my $data = await Future->wait_any(
    $done,
    Future::IO->sleep(2)->then(sub { Future->fail('callback timed out') }),
);
die "wrong callback data" unless $data eq 'callback-data';

await $sub->unsubscribe;
$publisher->disconnect;
$subscriber->disconnect;

Use MULTI/EXEC With The Callback API

The callback receives a transaction collector. Commands queued on $tx do not return per-command Futures; Redis returns the results from EXEC.

my $results = await $redis->multi(async sub {
    my ($tx) = @_;
    $tx->set("$prefix:tx:counter", 0);
    $tx->incr("$prefix:tx:counter");
    $tx->incr("$prefix:tx:counter");
    $tx->get("$prefix:tx:counter");
});

die "transaction SET failed" unless $results->[0] eq 'OK';
die "first INCR failed"      unless $results->[1] == 1;
die "second INCR failed"     unless $results->[2] == 2;
die "transaction GET failed" unless $results->[3] eq '2';

Use WATCH With watch_multi

watch_multi reads watched values, lets you queue a transaction, and returns undef if Redis aborts because another client changed a watched key.

await $redis->set("$prefix:balance", 100);

my $results = await $redis->watch_multi(["$prefix:balance"], async sub {
    my ($tx, $watched) = @_;
    my $current = $watched->{"$prefix:balance"};

    die "unexpected watched value" unless $current eq '100';

    $tx->decrby("$prefix:balance", 25);
    $tx->get("$prefix:balance");
});

die "WATCH transaction aborted" unless defined $results;
die "DECRBY failed" unless $results->[0] == 75;
die "GET failed"    unless $results->[1] eq '75';

Register And Run Lua Scripts Explicitly

Register scripts with define_command, then run them explicitly with run_script. Script names are per client instance; they are not installed as Perl methods.

$redis->define_command(cookbook_incrby => {
    keys => 1,
    lua  => <<'LUA',
        local current = tonumber(redis.call('GET', KEYS[1]) or 0)
        local next_value = current + tonumber(ARGV[1])
        redis.call('SET', KEYS[1], next_value)
        return next_value
LUA
});

my $result = await $redis->run_script(
    'cookbook_incrby',
    "$prefix:lua:counter",
    7,
);

die "script returned wrong value" unless $result == 7;

Use A Pool With with

with is the preferred pool API because it releases the connection even when your callback dies.

my $pool = Async::Redis::Pool->new(
    host => $host,
    port => $port,
    min  => 1,
    max  => 2,
);

my $value = await $pool->with(async sub {
    my ($conn) = @_;
    await $conn->set("$prefix:pool:key", 'pooled');
    return await $conn->get("$prefix:pool:key");
});

die "pool returned wrong value" unless $value eq 'pooled';

my $stats = $pool->stats;
die "pool leaked active connection" unless $stats->{active} == 0;

$pool->shutdown;

PITFALLS

  • Do not share one connection between a BLPOP worker and unrelated commands. The connection is blocked until Redis replies.

  • Do not mix next and on_message on the same subscription. Callback mode is sticky.

  • Do not return dirty connections to a pool. Use $pool->with(...) or let the pool destroy/clean dirty connections on release.

  • Do not rely on prefix as a security boundary. Use Redis ACLs or separate databases for tenant isolation.

SEE ALSO

Async::Redis, Async::Redis::Pool, Async::Redis::Subscription, Async::Redis::Script