my
$server
= Test::RedisDB->new;
plan(
skip_all
=>
"Can't start redis-server"
)
unless
$server
;
my
$redis
=
$server
->redisdb_client;
unless
(
my
$pid
=
fork
) {
die
"Couldn't fork: $!"
unless
defined
$pid
;
sleep
1;
$redis
->publish(
"no_channel"
,
"message"
);
$redis
->publish(
"abc"
,
"abc_pass"
);
$redis
->publish(
"bar"
,
"bar message"
);
sleep
1;
$redis
->publish(
"abc"
,
"It sholdn't receive this message"
);
$redis
->publish(
"news.test"
,
"Test unfortunately failed"
);
$redis
->publish(
"bar"
,
"bar message"
);
$redis
->publish(
"boo"
,
"boo message"
);
$redis
->publish(
"other.quit"
,
"quit"
);
sleep
1;
$redis
->publish(
"quit"
,
"quit"
);
$redis
= 0;
exit
0;
}
my
%counts
;
is
$redis
->set(
"running test"
,
"subscribe.t"
),
"OK"
,
"Successfully set value in DB"
;
$redis
->set(
"__test__"
,
"__test__"
, RedisDB::IGNORE_REPLY);
$SIG
{ALRM} =
sub
{
die
"subscription loop didn't exit"
};
alarm
7;
$redis
->subscription_loop(
subscribe
=> [
abc
=> \
&abc_cb
,
'foo'
,
'bar'
],
psubscribe
=> [
'news.*'
=> \
&news_cb
,
'other.*'
],
default_callback
=> \
&def_cb
,
);
pass
"Left first subscription loop"
;
$redis
->subscription_loop(
subscribe
=> [
quit
=>
sub
{
$_
[0]->unsubscribe(
'quit'
) } ],
);
pass
"Left second subscription loop"
;
alarm
0;
eq_or_diff \
%counts
, {
bar
=> 2,
other
=> 1,
boo
=> 1 },
"Correct numbers of messages"
;
is
$redis
->get(
"running test"
),
"subscribe.t"
,
"execute available again"
;
sub
abc_cb {
my
(
$redis
,
$chan
,
$ptrn
,
$message
) =
@_
;
is
$chan
,
'abc'
,
"received message for abc"
;
is
$ptrn
,
undef
,
"no pattern defined"
;
is
$message
,
"abc_pass"
,
"message is abc_pass"
;
dies_ok {
$redis
->get(
"running test"
) }
"execute is not available in subscription mode"
;
dies_ok {
$redis
->send_command(
"get"
,
"running test"
) }
"send_command is not available in subscription mode"
;
eq_or_diff [
sort
$redis
->subscribed ], [
qw(abc bar foo)
],
"Subscribed to abc foo bar"
;
eq_or_diff [
sort
$redis
->psubscribed ], [
qw(news.* other.*)
],
"Psubscribed to news.* other.*"
;
$redis
->unsubscribe(
'abc'
,
'foo'
);
$redis
->punsubscribe(
'news.*'
);
$redis
->subscribe(
'boo'
, \
&boo_cb
);
return
;
}
sub
boo_cb {
my
(
$redis
,
$chan
,
$ptrn
,
$message
) =
@_
;
$counts
{boo}++;
is
$chan
,
'boo'
,
"received message for boo"
;
is
$ptrn
,
undef
,
"no pattern defined"
;
is
$message
,
"boo message"
,
"correct message for boo"
;
return
;
}
sub
news_cb {
fail
"news_cb callback sould not be invoked"
;
return
;
}
sub
def_cb {
my
(
$redis
,
$chan
,
$ptrn
,
$message
) =
@_
;
if
(
$chan
eq
'bar'
) {
$counts
{bar}++;
pass
"Correct channel"
;
is
$message
,
"bar message"
,
"message for bar"
;
is
$ptrn
,
undef
,
"bar is not pattern match"
;
}
elsif
(
$ptrn
&&
$ptrn
eq
'other.*'
) {
$counts
{other}++;
is
$chan
,
'other.quit'
,
"Received message for other.quit"
;
is
$message
,
'quit'
,
"Correct message for other.quit"
;
$redis
->unsubscribe;
$redis
->punsubscribe;
}
else
{
fail
"No messages for $chan should be received"
;
}
return
;
}
subtest
"subscriptions outside of subscription_loop"
=>
sub
{
my
$pub
=
$server
->redisdb_client;
my
$sub
=
$server
->redisdb_client;
my
$received
;
my
$cb
=
sub
{
$received
->{
$_
[1] } =
$_
[3] };
$sub
->subscribe(
'baz'
,
$cb
);
$sub
->psubscribe(
'un*'
,
$cb
);
my
$rep
=
$sub
->get_reply;
is
$rep
->[0],
'subscribe'
,
"got subscribe reply"
;
$rep
=
$sub
->get_reply;
is
$rep
->[0],
'psubscribe'
,
"got psubscribe reply"
;
ok !
$received
,
"p?subscribe messages didn't invoke callback"
;
dies_ok {
$sub
->get(
'key'
) }
"get is not allowed in subscription mode"
;
if
(
$pub
->version >= 2.008 ) {
subtest
"PUBSUB"
=>
sub
{
eq_or_diff
$pub
->pubsub(
'CHANNELS'
), [
'baz'
],
"Only baz channel is active"
;
eq_or_diff
$pub
->pubsub_channels, [
'baz'
],
"Only baz channel is active"
;
eq_or_diff
$pub
->pubsub_numsub(
'baz'
,
'boo'
), [
'baz'
, 1,
'boo'
, 0 ],
"baz has one subscriber, boo has none"
;
eq_or_diff
$pub
->pubsub_numpat, 1,
"one pattern subscriber"
;
};
}
else
{
diag
"Not testing PUBSUB command. Requires redis-server >= 2.8.0"
;
}
$pub
->publish(
'unexpected'
,
'msg 1'
);
$pub
->publish(
'baz'
,
'msg 2'
);
$rep
=
$sub
->get_reply;
eq_or_diff
$rep
, [
'pmessage'
,
'un*'
,
'unexpected'
,
'msg 1'
],
"got msg 1 from the unexpected channel"
;
is
$received
->{unexpected},
'msg 1'
,
"callback for unexpected channel was invoked"
;
$rep
=
$sub
->get_reply;
eq_or_diff
$rep
, [
'message'
,
'baz'
,
'msg 2'
],
"got msg 2 from the baz channel"
;
is
$received
->{baz},
'msg 2'
,
"callback for baz channel was invoked"
;
{
local
$SIG
{ALRM} =
sub
{
$pub
->publish(
'baz'
,
'msg 3'
);
delete
$SIG
{ALRM};
alarm
3; };
alarm
1;
$rep
=
$sub
->get_reply;
alarm
0;
eq_or_diff
$rep
, [
'message'
,
'baz'
,
'msg 3'
],
"got msg 3 from the baz channel"
;
is
$received
->{baz},
'msg 3'
,
"callback for baz channel was invoked"
;
}
$sub
->unsubscribe;
$sub
->punsubscribe;
};
subtest
"unsubscribe without psubscriptions (issue #18)"
=>
sub
{
my
$sub
=
$server
->redisdb_client;
$sub
->subscribe(
'foo'
);
$sub
->unsubscribe;
pass
"Unsubscribed"
;
$sub
->reset_connection;
$sub
->psubscribe(
'foo*'
);
$sub
->punsubscribe;
pass
"Punsubscribed"
;
};
subtest
"subscribe before starting subscription loop"
=>
sub
{
unless
(
my
$pid
=
fork
) {
die
"Couldn't fork: $!"
unless
defined
$pid
;
sleep
1;
$redis
->publish(
bar
=>
'bar message'
);
$redis
->publish(
'other.quit'
=>
'quit'
);
exit
0;
}
my
$sub
=
$server
->redisdb_client;
$sub
->subscribe(
'bar'
=> \
&def_cb
);
$counts
{bar} = 0;
$counts
{other} = 0;
$sub
->subscription_loop(
psubscribe
=> [
'other.*'
],
default_callback
=> \
&def_cb
,
);
is
$counts
{bar}, 1,
"got one message for bar"
;
is
$counts
{other}, 1,
"got one message for other"
;
};
done_testing;