Sponsoring The Perl Toolchain Summit 2025: Help make this important event another success Learn more

use Test::Most 0.22;
use Test::RedisDB;
use RedisDB;
# Launch a throw-away redis-server for this test file; when that is not
# possible there is nothing to test, so the whole file is skipped.
my $server = Test::RedisDB->new
  or plan( skip_all => "Can't start redis-server" );

# Client connection used by the main flow below and, after fork, by the
# publishing child process.
my $redis = $server->redisdb_client;
# Publisher process. fork yields 0 in the child and undef on failure, so both
# cases enter this block; the failure case dies immediately.
if ( !( my $child_pid = fork ) ) {
    defined $child_pid or die "Couldn't fork: $!";

    # give it some time to subscribe
    sleep 1;

    my @before_unsubscribe = (

        # this nobody should receive
        [ "no_channel", "message" ],

        # parent should unsubscribe from abc, foo, and news.* after this
        [ "abc", "abc_pass" ],
        [ "bar", "bar message" ],
    );
    for my $pair (@before_unsubscribe) {
        $redis->publish( @{$pair} );
    }

    # give it some time to unsubscribe
    sleep 1;

    my @after_unsubscribe = (
        [ "abc",        "It sholdn't receive this message" ],
        [ "news.test",  "Test unfortunately failed" ],
        [ "bar",        "bar message" ],
        [ "boo",        "boo message" ],
        [ "other.quit", "quit" ],
    );
    for my $pair (@after_unsubscribe) {
        $redis->publish( @{$pair} );
    }

    # wait for first subscription loop to exit
    sleep 1;
    $redis->publish( "quit", "quit" );

    # drop the client object before leaving the child
    $redis = 0;
    exit 0;
}
# Per-channel message counters, incremented by boo_cb and def_cb.
my %counts;

is( $redis->set( "running test", "subscribe.t" ), "OK", "Successfully set value in DB" );
$redis->set( "__test__", "__test__", RedisDB::IGNORE_REPLY );

# Abort the test instead of hanging if a subscription loop never returns.
$SIG{ALRM} = sub { die "subscription loop didn't exit" };
alarm 7;

# First loop: channels/patterns with a dedicated callback are followed by
# that callback; everything else is routed to def_cb.
my @channels = ( abc => \&abc_cb, 'foo', 'bar' );
my @patterns = ( 'news.*' => \&news_cb, 'other.*' );
$redis->subscription_loop(
    subscribe        => \@channels,
    psubscribe       => \@patterns,
    default_callback => \&def_cb,
);
pass("Left first subscription loop");

# Second loop: a single channel whose callback unsubscribes from it on the
# first message.
my $leave_on_quit = sub {
    my ($client) = @_;
    $client->unsubscribe('quit');
};
$redis->subscription_loop( subscribe => [ quit => $leave_on_quit ] );
pass("Left second subscription loop");
alarm 0;

eq_or_diff( \%counts, { bar => 2, other => 1, boo => 1 }, "Correct numbers of messages" );
is( $redis->get("running test"), "subscribe.t", "execute available again" );
# Callback for the "abc" channel.
#
# Receives ( client, channel, pattern, message ). Checks the delivered
# message, verifies that ordinary commands die while subscribed, checks the
# current subscription lists, and then changes the subscriptions: drops
# abc, foo and news.*, and adds boo with its own callback.
sub abc_cb {
    my ( $client, $channel, $pattern, $payload ) = @_;

    is( $channel, 'abc',      "received message for abc" );
    is( $pattern, undef,      "no pattern defined" );
    is( $payload, "abc_pass", "message is abc_pass" );

    # ordinary commands must be refused while in subscription mode
    dies_ok { $client->get("running test") } "execute is not available in subscription mode";
    dies_ok { $client->send_command( "get", "running test" ) }
    "send_command is not available in subscription mode";

    # sort so the comparison does not depend on the order the client returns
    my @channels = sort $client->subscribed;
    my @patterns = sort $client->psubscribed;
    eq_or_diff( \@channels, [qw(abc bar foo)],      "Subscribed to abc foo bar" );
    eq_or_diff( \@patterns, [qw(news.* other.*)], "Psubscribed to news.* other.*" );

    $client->unsubscribe( 'abc', 'foo' );
    $client->punsubscribe('news.*');
    $client->subscribe( 'boo', \&boo_cb );

    return;
}
# Callback for the "boo" channel, which abc_cb subscribes to.
#
# Receives ( client, channel, pattern, message ); the client is not used.
# Counts the message in %counts and checks channel, pattern and payload.
sub boo_cb {
    my ( undef, $channel, $pattern, $payload ) = @_;

    ++$counts{boo};

    is( $channel, 'boo',         "received message for boo" );
    is( $pattern, undef,         "no pattern defined" );
    is( $payload, "boo message", "correct message for boo" );

    return;
}
# Callback registered for the "news.*" pattern.
#
# abc_cb punsubscribes from news.* before the publisher sends anything to a
# news.* channel, so any invocation of this callback is a test failure.
# Takes the usual callback arguments but ignores them; returns nothing.
sub news_cb {
    fail "news_cb callback should not be invoked";
    return;
}
# Default callback for channels and patterns without a dedicated callback.
#
# Receives ( client, channel, pattern, message ).
#   * "bar" messages are counted and checked.
#   * a message matched through the "other.*" pattern is counted, checked,
#     and then all channel and pattern subscriptions are dropped.
#   * anything else is reported as a failure.
sub def_cb {
    my ( $redis, $chan, $ptrn, $message ) = @_;

    if ( $chan eq 'bar' ) {
        ++$counts{bar};
        pass("Correct channel");
        is( $message, "bar message", "message for bar" );
        is( $ptrn,    undef,         "bar is not pattern match" );
        return;
    }

    unless ( $ptrn && $ptrn eq 'other.*' ) {
        fail("No messages for $chan should be received");
        return;
    }

    ++$counts{other};
    is( $chan,    'other.quit', "Received message for other.quit" );
    is( $message, 'quit',       "Correct message for other.quit" );

    # unsubscribe from everything
    $redis->unsubscribe;
    $redis->punsubscribe;

    return;
}
# Exercise subscribe/psubscribe and get_reply directly, without entering
# subscription_loop. One client publishes, a second one subscribes.
subtest "subscriptions outside of subscription_loop" => sub {
# two independent client connections to the same test server
my $pub = $server->redisdb_client;
my $sub = $server->redisdb_client;
# $received maps channel name => last message seen by the callback; it stays
# undef until the callback runs for the first time
my $received;
my $cb = sub { $received->{ $_[1] } = $_[3] };
$sub->subscribe( 'baz', $cb );
$sub->psubscribe( 'un*', $cb );
# the first two replies are the subscription confirmations; they must not
# trigger the message callback
my $rep = $sub->get_reply;
is $rep->[0], 'subscribe', "got subscribe reply";
$rep = $sub->get_reply;
is $rep->[0], 'psubscribe', "got psubscribe reply";
ok !$received, "p?subscribe messages didn't invoke callback";
dies_ok { $sub->get('key') } "get is not allowed in subscription mode";
# PUBSUB introspection is only checked on servers that report version >= 2.008
if ( $pub->version >= 2.008 ) {
subtest "PUBSUB" => sub {
eq_or_diff $pub->pubsub('CHANNELS'), ['baz'], "Only baz channel is active";
eq_or_diff $pub->pubsub_channels, ['baz'], "Only baz channel is active";
eq_or_diff $pub->pubsub_numsub( 'baz', 'boo' ), [ 'baz', 1, 'boo', 0 ],
"baz has one subscriber, boo has none";
eq_or_diff $pub->pubsub_numpat, 1, "one pattern subscriber";
};
}
else {
diag "Not testing PUBSUB command. Requires redis-server >= 2.8.0";
}
# one message delivered through the un* pattern, one through the baz channel
$pub->publish( 'unexpected', 'msg 1' );
$pub->publish( 'baz', 'msg 2' );
$rep = $sub->get_reply;
eq_or_diff $rep, [ 'pmessage', 'un*', 'unexpected', 'msg 1' ],
"got msg 1 from the unexpected channel";
is $received->{unexpected}, 'msg 1', "callback for unexpected channel was invoked";
$rep = $sub->get_reply;
eq_or_diff $rep, [ 'message', 'baz', 'msg 2' ], "got msg 2 from the baz channel";
is $received->{baz}, 'msg 2', "callback for baz channel was invoked";
{
# this also checks how recv in get_reply deals with interrupts
# The handler publishes msg 3 from inside the signal handler, removes
# itself and arms a second alarm. NOTE(review): presumably the second alarm
# is a watchdog, so that a get_reply which never returns ends the test
# instead of hanging -- confirm.
local $SIG{ALRM} = sub { $pub->publish( 'baz', 'msg 3' ); delete $SIG{ALRM}; alarm 3; };
alarm 1;
# nothing has been published at this point; msg 3 is only published by the
# ALRM handler above
$rep = $sub->get_reply;
alarm 0;
eq_or_diff $rep, [ 'message', 'baz', 'msg 3' ], "got msg 3 from the baz channel";
is $received->{baz}, 'msg 3', "callback for baz channel was invoked";
}
# drop all channel and pattern subscriptions of the subscriber
$sub->unsubscribe;
$sub->punsubscribe;
};
# Regression check: unsubscribing from everything must work when only one
# kind of subscription (channel or pattern) exists on the connection.
subtest "unsubscribe without psubscriptions (issue #18)" => sub {
    my $client = $server->redisdb_client;

    # channel subscription only, then a blanket unsubscribe
    $client->subscribe('foo');
    $client->unsubscribe;
    pass("Unsubscribed");

    # reset the connection, then pattern subscription only and a blanket
    # punsubscribe
    $client->reset_connection;
    $client->psubscribe('foo*');
    $client->punsubscribe;
    pass("Punsubscribed");
};
# Verify that a channel subscribed *before* entering subscription_loop is
# served by the loop, together with the pattern passed to the loop itself.
#
# A forked child publishes one message to "bar" and one to "other.quit";
# def_cb counts both and unsubscribes from everything on the second one,
# which makes the loop return.
subtest "subscribe before starting subscription loop" => sub {

    # fork returns undef on failure and 0 in the child
    my $pid = fork;
    die "Couldn't fork: $!" unless defined $pid;
    if ( $pid == 0 ) {

        # child: give the parent some time to subscribe, then publish
        sleep 1;
        $redis->publish( bar          => 'bar message' );
        $redis->publish( 'other.quit' => 'quit' );
        exit 0;
    }

    my $sub = $server->redisdb_client;
    $sub->subscribe( 'bar' => \&def_cb );
    $counts{bar}   = 0;
    $counts{other} = 0;

    # Same guard as the first subscription loop in this file: die instead of
    # hanging forever if the published messages never arrive.
    local $SIG{ALRM} = sub { die "subscription loop didn't exit" };
    alarm 7;
    $sub->subscription_loop(
        psubscribe       => ['other.*'],
        default_callback => \&def_cb,
    );
    alarm 0;

    # Reap the publisher so it does not linger as a zombie. It exits right
    # after its last publish, which the parent has already received here.
    waitpid $pid, 0;

    is $counts{bar},   1, "got one message for bar";
    is $counts{other}, 1, "got one message for other";
};
# No fixed plan is declared in this file; signal that all tests have run.
done_testing;