NAME
Async::Redis::Cookbook - Practical Async::Redis recipes
SYNOPSIS
This cookbook shows small, copyable Async::Redis patterns. The examples are written for use inside an async sub:
* $redis is a connected Async::Redis client.
* $host and $port are the Redis endpoint.
* $prefix is a unique key prefix for the example.
* Future, Future::AsyncAwait, Future::IO, Async::Redis, and Async::Redis::Pool are loaded where needed.
The die checks are intentional. They keep the cookbook examples executable by t/00-pod/cookbook-examples.t so the documentation does not drift away from the implementation.
RECIPES
Connect And Ping
Create a client, connect explicitly, use it, then disconnect when done.
my $client = Async::Redis->new(
    host => $host,
    port => $port,
    connect_timeout => 2,
);
await $client->connect;
my $pong = await $client->ping;
die "expected PONG" unless $pong eq 'PONG';
$client->disconnect;
Basic GET/SET
Most command methods return a Future. Inside an async sub, await the result.
await $redis->set("$prefix:hello", 'world');
my $value = await $redis->get("$prefix:hello");
die "unexpected value: $value" unless $value eq 'world';
Run Independent Commands Concurrently
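Start independent commands before awaiting them, then use Future->needs_all to wait for all results. Async::Redis matches each response to its Future in command order. Commands on one connection still execute in the order they were sent, which is why the GET below sees the value written by the first SET.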
my @futures = (
    $redis->set("$prefix:k1", 'v1'),
    $redis->set("$prefix:k2", 'v2'),
    $redis->incr("$prefix:counter"),
    $redis->get("$prefix:k1"),
);
my ($set1, $set2, $count, $value) = await Future->needs_all(@futures);
die "first SET failed" unless $set1 eq 'OK';
die "second SET failed" unless $set2 eq 'OK';
die "INCR failed" unless $count == 1;
die "GET failed" unless $value eq 'v1';
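The in-order matching described above can be pictured with a toy model. This is a minimal sketch in core Perl only, not the Async::Redis implementation: ToyConnection, send_command, and receive_reply are hypothetical names, and plain callbacks stand in for Futures. It assumes only that Redis answers commands on one connection in the order they were sent.

```perl
use strict;
use warnings;

# Toy model only: this is NOT how Async::Redis is implemented internally.
package ToyConnection;

sub new { bless { pending => [], wire => [] }, shift }

# "Send" a command: record it on the wire and queue a callback for its reply.
sub send_command {
    my ($self, $on_reply, @command) = @_;
    push @{ $self->{wire} },    [@command];
    push @{ $self->{pending} }, $on_reply;
    return;
}

# A reply arrived. Replies come back in send order, so the oldest
# pending callback is the one that owns this reply.
sub receive_reply {
    my ($self, $reply) = @_;
    my $on_reply = shift @{ $self->{pending} }
        or die "reply received with no command pending\n";
    $on_reply->($reply);
    return;
}

package main;

my $conn = ToyConnection->new;
my %got;

# Three commands are started before any reply is read.
$conn->send_command(sub { $got{set}  = shift }, 'SET',  'k1', 'v1');
$conn->send_command(sub { $got{incr} = shift }, 'INCR', 'counter');
$conn->send_command(sub { $got{get}  = shift }, 'GET',  'k1');

# Simulated server replies, delivered in command order.
$conn->receive_reply($_) for 'OK', 1, 'v1';
```

Because the pending queue is first-in, first-out, no request identifiers are needed to pair a reply with its caller.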
Batch A Known Set Of Commands With A Pipeline
Use an explicit pipeline when you already know the full batch of commands.
my $pipe = $redis->pipeline;
$pipe->set("$prefix:pipe:a", 'one');
$pipe->set("$prefix:pipe:b", 'two');
$pipe->get("$prefix:pipe:a");
$pipe->incr("$prefix:pipe:counter");
my $results = await $pipe->execute;
die "pipeline SET a failed" unless $results->[0] eq 'OK';
die "pipeline SET b failed" unless $results->[1] eq 'OK';
die "pipeline GET failed" unless $results->[2] eq 'one';
die "pipeline INCR failed" unless $results->[3] == 1;
Use Auto-Pipeline For Burst Writes
Use auto_pipeline => 1 when application code naturally fires many commands in the same event-loop tick. Async::Redis batches them without changing the command API.
my $batched = Async::Redis->new(
    host => $host,
    port => $port,
    auto_pipeline => 1,
);
await $batched->connect;
my @writes = map {
    $batched->set("$prefix:auto:$_", "value-$_", ex => 60)
} 1 .. 25;
await Future->needs_all(@writes);
my $value = await $batched->get("$prefix:auto:25");
die "auto-pipeline write missing" unless $value eq 'value-25';
$batched->disconnect;
Use A Dedicated Connection For BLPOP Workers
Blocking Redis commands block their Redis connection, not the Perl process. Give each worker its own connection so other clients can keep doing useful work.
my $worker = Async::Redis->new(host => $host, port => $port);
await $worker->connect;
my $pop_f = $worker->blpop("$prefix:jobs", 2);
await Future::IO->sleep(0);
await $redis->rpush("$prefix:jobs", 'job-1');
my $popped = await $pop_f;
die "BLPOP did not return a list/key pair"
    unless ref($popped) eq 'ARRAY' && @$popped == 2;
die "wrong queue" unless $popped->[0] eq "$prefix:jobs";
die "wrong job" unless $popped->[1] eq 'job-1';
$worker->disconnect;
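A long-running worker usually wraps the single BLPOP above in a loop. The following is a minimal sketch, not part of Async::Redis: drain_queue, $handler, and $max_jobs are hypothetical names introduced for illustration. It assumes that blpop resolves to a [key, value] pair as shown above, and (an unverified assumption) that a timeout resolves to undef rather than failing the Future.

```perl
use strict;
use warnings;
use Future;
use Future::AsyncAwait;

# Hypothetical helper, not an Async::Redis API.
#   $worker   - a connected client dedicated to this loop
#   $queue    - the list key to pop from
#   $handler  - coderef called with each job payload
#   $max_jobs - upper bound so the sketch always terminates
async sub drain_queue {
    my ($worker, $queue, $handler, $max_jobs) = @_;
    my $handled = 0;
    while ($handled < $max_jobs) {
        # Assumption: a BLPOP timeout resolves to undef.
        my $popped = await $worker->blpop($queue, 2);
        last unless defined $popped;

        my ($key, $job) = @$popped;
        $handler->($job);
        $handled++;
    }
    return $handled;
}
```

Inside an async sub this could be called as await drain_queue($worker, "$prefix:jobs", sub { ... }, 100). Stopping on the first timeout makes this a "drain until idle" loop; a worker that should wait indefinitely would retry instead of leaving the loop.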
Read Pub/Sub Messages With The Iterator API
Iterator mode is direct: await next whenever you are ready for another message.
my $publisher = Async::Redis->new(host => $host, port => $port);
my $subscriber = Async::Redis->new(host => $host, port => $port);
await Future->needs_all($publisher->connect, $subscriber->connect);
my $sub = await $subscriber->subscribe("$prefix:events");
my $next = $sub->next;
await $publisher->publish("$prefix:events", 'hello');
my $msg = await $next;
die "wrong channel" unless $msg->{channel} eq "$prefix:events";
die "wrong data" unless $msg->{data} eq 'hello';
await $sub->unsubscribe;
$publisher->disconnect;
$subscriber->disconnect;
Read Pub/Sub Messages With A Callback
Callback mode is useful for listeners that should run in the background. Always install on_error so fatal listener errors are visible in your application.
my $publisher = Async::Redis->new(host => $host, port => $port);
my $subscriber = Async::Redis->new(host => $host, port => $port);
await Future->needs_all($publisher->connect, $subscriber->connect);
my $sub = await $subscriber->subscribe("$prefix:callback");
my $done = Future->new;
$sub->on_error(sub {
    my ($sub, $err) = @_;
    $done->fail($err) unless $done->is_ready;
});
$sub->on_message(sub {
    my ($sub, $msg) = @_;
    $done->done($msg->{data}) unless $done->is_ready;
});
await $publisher->publish("$prefix:callback", 'callback-data');
my $data = await Future->wait_any(
    $done,
    Future::IO->sleep(2)->then(sub { Future->fail('callback timed out') }),
);
die "wrong callback data" unless $data eq 'callback-data';
await $sub->unsubscribe;
$publisher->disconnect;
$subscriber->disconnect;
Use MULTI/EXEC With The Callback API
The callback receives a transaction collector. Commands queued on $tx do not return per-command Futures; Redis returns the results from EXEC.
my $results = await $redis->multi(async sub {
    my ($tx) = @_;
    $tx->set("$prefix:tx:counter", 0);
    $tx->incr("$prefix:tx:counter");
    $tx->incr("$prefix:tx:counter");
    $tx->get("$prefix:tx:counter");
});
die "transaction SET failed" unless $results->[0] eq 'OK';
die "first INCR failed" unless $results->[1] == 1;
die "second INCR failed" unless $results->[2] == 2;
die "transaction GET failed" unless $results->[3] eq '2';
Use WATCH With watch_multi
watch_multi reads watched values, lets you queue a transaction, and returns undef if Redis aborts because another client changed a watched key.
await $redis->set("$prefix:balance", 100);
my $results = await $redis->watch_multi(["$prefix:balance"], async sub {
    my ($tx, $watched) = @_;
    my $current = $watched->{"$prefix:balance"};
    die "unexpected watched value" unless $current eq '100';
    $tx->decrby("$prefix:balance", 25);
    $tx->get("$prefix:balance");
});
die "WATCH transaction aborted" unless defined $results;
die "DECRBY failed" unless $results->[0] == 75;
die "GET failed" unless $results->[1] eq '75';
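Since an aborted transaction shows up as undef rather than as an exception, callers often retry it a bounded number of times. The following is a minimal sketch of that pattern, not an Async::Redis API: retry_watch, $attempt, and $max_tries are hypothetical names. It assumes only what the recipe above states, that the Future resolves to the EXEC results or to undef on abort.

```perl
use strict;
use warnings;
use Future;
use Future::AsyncAwait;

# Hypothetical helper, not part of Async::Redis.
#   $attempt   - coderef returning a Future that resolves to the
#                transaction results, or undef if the WATCH aborted
#   $max_tries - give up after this many aborted attempts
async sub retry_watch {
    my ($attempt, $max_tries) = @_;
    my $tries = 0;
    while ($tries < $max_tries) {
        $tries++;
        my $results = await $attempt->();
        return $results if defined $results;   # committed
        # undef means another client changed a watched key: try again
    }
    die "transaction still aborted after $max_tries tries\n";
}
```

With the recipe above, $attempt would be a sub that calls $redis->watch_multi(...) with the same key list and callback, so each retry re-reads the watched values before queuing commands. A production version might also sleep briefly between attempts to reduce contention.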
Register And Run Lua Scripts Explicitly
Register scripts with define_command, then run them explicitly with run_script. Script names are per client instance; they are not installed as Perl methods.
$redis->define_command(cookbook_incrby => {
    keys => 1,
    lua => <<'LUA',
local current = tonumber(redis.call('GET', KEYS[1]) or 0)
local next_value = current + tonumber(ARGV[1])
redis.call('SET', KEYS[1], next_value)
return next_value
LUA
});
my $result = await $redis->run_script(
    'cookbook_incrby',
    "$prefix:lua:counter",
    7,
);
die "script returned wrong value" unless $result == 7;
Use A Pool With with
with is the preferred pool API because it releases the connection even when your callback dies.
my $pool = Async::Redis::Pool->new(
    host => $host,
    port => $port,
    min => 1,
    max => 2,
);
my $value = await $pool->with(async sub {
    my ($conn) = @_;
    await $conn->set("$prefix:pool:key", 'pooled');
    return await $conn->get("$prefix:pool:key");
});
die "pool returned wrong value" unless $value eq 'pooled';
my $stats = $pool->stats;
die "pool leaked active connection" unless $stats->{active} == 0;
$pool->shutdown;
PITFALLS
* Do not share one connection between a BLPOP worker and unrelated commands. The connection is blocked until Redis replies.
* Do not mix next and on_message on the same subscription. Callback mode is sticky.
* Do not return dirty connections to a pool. Use $pool->with(...) or let the pool destroy/clean dirty connections on release.
* Do not rely on prefix as a security boundary. Use Redis ACLs or separate databases for tenant isolation.
SEE ALSO
Async::Redis, Async::Redis::Pool, Async::Redis::Subscription, Async::Redis::Script