# Compile-time setup: publish the module version and hand it to XSLoader so
# the compiled part of this package is loaded before the rest of the file is
# compiled.
BEGIN {
    our $VERSION = '0.08';
    XSLoader::load(__PACKAGE__, $VERSION);
}
# Build the closure that is installed as the client's on-connect hook.
#
# Arguments:
#   $self     - the client object; the closure only keeps a weak reference
#   $on_conn  - optional user callback, invoked last with $self
#   $password - accepted but not used anywhere in this sub
#               NOTE(review): no AUTH is issued from here - confirm whether
#               authentication is handled elsewhere.
#   $name     - optional connection name, or a code ref that returns one
#
# Returns a code ref. When invoked it:
#   1. unless the client is in subscriber mode, sets the connection name and
#      re-selects the database remembered in the instance data;
#   2. resets the subscription bookkeeping and re-issues every recorded
#      subscribe/psubscribe with its callbacks;
#   3. calls $on_conn->($self) when a user callback was supplied.
sub _new_on_connect_cb {
    my ($self, $on_conn, $password, $name) = @_;
    weaken $self;

    return sub {
        unless ($self->is_subscriber) {
            if (defined $name) {
                # try without catch: a failure while naming the connection
                # is deliberately ignored.
                try {
                    my $client_name =
                        ref($name) eq 'CODE' ? $name->($self) : $name;
                    $self->client_setname($client_name)
                        if defined $client_name;
                };
            }

            my $state = $self->__get_data;
            $self->select($state->{current_database})
                if defined $state->{current_database};
        }

        # Take the old subscription table aside and start from a clean state;
        # __subscription_cmd repopulates it as subscriptions are confirmed.
        my $previous = $self->__get_data->{subscribers};
        $self->__get_data->{subscribers} = {};
        $self->__get_data->{cbs}         = undef;

        for my $topic (CORE::keys %$previous) {
            next unless $topic =~ /(p?message):(.*)$/;
            my ($kind, $channel) = ($1, $2);

            my ($prefix, $command) =
                $kind eq 'message' ? ('', 'subscribe') : ('p', 'psubscribe');

            for my $callback (@{ $previous->{$topic} }) {
                $self->__subscription_cmd($prefix, 0, $command, $channel,
                    $callback);
            }
        }

        defined $on_conn and $on_conn->($self);
    };
}
# Constructor.
#
# Named arguments read by this sub:
#   sock, server, sentinels (array ref) + service, on_connect, password, name,
#   reconnect, every, cnx_timeout, read_timeout, write_timeout,
#   sentinels_cnx_timeout, sentinels_read_timeout, sentinels_write_timeout,
#   no_sentinels_list_update, no_auto_connect_on_new.
#
# When neither 'sock' nor 'server' is given, $ENV{REDIS_SERVER} is consulted:
# a leading '/' or a 'unix:' prefix selects a UNIX socket, anything else
# (optionally prefixed with 'tcp:') is used as the server address.
#
# Connection target precedence: sock, then sentinels, then server
# (default '127.0.0.1:6379').
#
# Croaks when 'sentinels' is not an array ref or 'service' is missing while
# sentinels are used. Unless no_auto_connect_on_new is true, connect() is
# called before the object is returned.
sub new {
    my $class = shift;
    my %args  = @_;
    my $self  = $class->_new;

    # Environment fallback, only when the caller did not choose a target.
    if ($ENV{REDIS_SERVER} && !$args{sock} && !$args{server}) {
        if ($ENV{REDIS_SERVER} =~ m!^/!) {
            $args{sock} = $ENV{REDIS_SERVER};
        }
        elsif ($ENV{REDIS_SERVER} =~ m!^unix:(.+)!) {
            $args{sock} = $1;
        }
        elsif ($ENV{REDIS_SERVER} =~ m!^(tcp:)?(.+)!) {
            $args{server} = $2;
        }
    }

    my $on_conn  = $args{on_connect};
    my $password = $args{password};
    my $name     = $args{name};
    $self->__set_on_connect(
        $self->_new_on_connect_cb($on_conn, $password, $name)
    );

    $self->__set_data({
        subscribers              => {},
        sentinels_cnx_timeout    => $args{sentinels_cnx_timeout},
        sentinels_read_timeout   => $args{sentinels_read_timeout},
        sentinels_write_timeout  => $args{sentinels_write_timeout},
        no_sentinels_list_update => $args{no_sentinels_list_update},
    });

    if ($args{sock}) {
        $self->__connection_info_unix($args{sock});
    }
    elsif ($args{sentinels}) {
        my $sentinels = $args{sentinels};
        ref $sentinels eq 'ARRAY'
            or croak("'sentinels' param must be an ArrayRef");
        defined($self->__get_data->{service} = $args{service})
            or croak("Need 'service' name when using 'sentinels'!");
        $self->__get_data->{sentinels} = $sentinels;

        # The callback is stored on $self, so it must not hold a strong
        # reference back to $self (that would form a reference cycle).
        my $weak_self = $self;
        weaken $weak_self;

        my $on_build_sock = sub {
            my $data      = $weak_self->__get_data;
            my $sentinels = $data->{sentinels};

            # NOTE(review): $status records why a sentinel was skipped but is
            # not reported anywhere in the code visible here.
            my $status;

            foreach my $sentinel_address (@$sentinels) {
                # The sentinels_* keys always exist in the instance data
                # (they are stored unconditionally above), so the defaults
                # have to be chosen with defined(), not exists().
                my $sentinel = eval {
                    Redis::Fast::Sentinel->new(
                        server      => $sentinel_address,
                        cnx_timeout => (
                            defined $data->{sentinels_cnx_timeout}
                            ? $data->{sentinels_cnx_timeout}
                            : 0.1
                        ),
                        read_timeout => (
                            defined $data->{sentinels_read_timeout}
                            ? $data->{sentinels_read_timeout}
                            : 1
                        ),
                        write_timeout => (
                            defined $data->{sentinels_write_timeout}
                            ? $data->{sentinels_write_timeout}
                            : 1
                        ),
                    )
                } or next;

                my $server_address =
                    $sentinel->get_service_address($data->{service});
                unless (defined $server_address) {
                    $status ||= "Sentinels don't know this service";
                    next;
                }
                if ($server_address eq 'IDONTKNOW') {
                    $status = "service is configured in one Sentinel, but was never reached";
                    next;
                }

                # This sentinel knows the service: use the address it reports.
                my ($server, $port) = split /:/, $server_address;
                $weak_self->__connection_info($server, $port);

                if (!$data->{no_sentinels_list_update}) {
                    # Rank the answering sentinel first, keep the others in
                    # their current order, then append sentinels it reports
                    # that are not in the list yet.
                    my $idx = 2;
                    my %h = (
                        (map { $_ => $idx++ } @{ $data->{sentinels} }),
                        $sentinel_address => 1,
                    );
                    $data->{sentinels} = [
                        (sort { $h{$a} <=> $h{$b} } CORE::keys %h),
                        grep { !$h{$_} }
                        map  { +{ @$_ }->{name} }
                        $sentinel->sentinel(sentinels => $data->{service})
                    ];
                }

                # The service address is resolved; do not query the
                # remaining sentinels.
                last;
            }
        };
        $self->__set_on_build_sock($on_build_sock);
    }
    else {
        my ($server, $port) =
            split /:/, ($args{server} || '127.0.0.1:6379');
        $self->__connection_info($server, $port);
    }

    # NOTE(review): -1 presumably means "no timeout" to the setters - confirm.
    $self->__set_reconnect($args{reconnect}         || 0);
    $self->__set_every($args{every}                 || 1000);
    $self->__set_cnx_timeout($args{cnx_timeout}     || -1);
    $self->__set_read_timeout($args{read_timeout}   || -1);
    $self->__set_write_timeout($args{write_timeout} || -1);

    $self->connect unless $args{no_auto_connect_on_new};

    return $self;
}
our $AUTOLOAD;

# Catch-all for commands that have no hand-written wrapper.
#
# The called method name is upper-cased and split on '_' to form the command
# words (e.g. client_setname => CLIENT SETNAME). A wrapper sub is generated,
# installed under the called name so later calls bypass AUTOLOAD, and then
# entered via goto with the original arguments.
#
# The generated wrapper confesses with "[name] error, " when __std_cmd
# reports an error. In list context an array-ref reply is flattened into a
# list; otherwise the reply is returned as is.
sub AUTOLOAD {
    (my $command = $AUTOLOAD) =~ s/.*://;
    my @command = split /_/, uc $command;

    my $method = sub {
        my $self = shift;
        $self->__is_valid_command($command);

        my ($ret, $error) = $self->__std_cmd(@command, @_);
        confess "[$command] $error, " if defined $error;

        return $ret unless wantarray && ref $ret eq 'ARRAY';
        return @$ret;
    };

    {
        # Symbolic glob assignment: install the wrapper under the called name.
        no strict 'refs';
        *$AUTOLOAD = $method;
    }

    goto &$method;
}
# Placeholder: always confesses with "not implemented".
# Callers pass ($self, $cb); both arguments are ignored.
sub __with_reconnect {
    confess("not implemented");
}
# KEYS command wrapper.
#
# Confesses when the client is in subscriber mode (via __is_valid_command)
# or when __keys reports an error. An array-ref reply is returned as a list
# (so scalar context yields the element count); any other reply is returned
# unchanged.
sub keys {
    my $self = shift;
    $self->__is_valid_command('keys');

    my ($reply, $failure) = $self->__keys(@_);
    confess "[keys] $failure, " if defined $failure;

    return ref $reply eq 'ARRAY' ? @$reply : $reply;
}
# PING command wrapper that never throws for command-level failures.
#
# Confesses when in subscriber mode (via __is_valid_command). Returns
# nothing when there is no socket, when the command reports an error, or when
# it throws; otherwise returns the reply in scalar context.
sub ping {
    my ($self) = @_;
    $self->__is_valid_command('ping');

    return unless $self->__sock;

    return scalar try {
        my ($reply, $failure) = $self->__std_cmd('ping');
        return defined $failure ? () : $reply;
    }
    catch {
        return;
    };
}
# INFO command wrapper.
#
# Confesses when the client is in subscriber mode (via __is_valid_command)
# or when __info reports an error. An array-ref reply is returned as a list;
# any other reply is returned unchanged.
sub info {
    my $self = shift;
    $self->__is_valid_command('info');

    my ($ret, $error) = $self->__info(@_);

    # The label names this command; it previously read "[keys]", copied
    # from the keys() wrapper.
    confess "[info] $error, " if defined $error;

    return $ret unless ref $ret eq 'ARRAY';
    return @$ret;
}
# QUIT wrapper: refuses to run in subscriber mode (__is_valid_command
# confesses), then forwards all remaining arguments to __quit and returns
# its result.
sub quit {
    my $self = shift;
    $self->__is_valid_command('quit');
    return $self->__quit(@_);
}
# SHUTDOWN wrapper: refuses to run in subscriber mode (__is_valid_command
# confesses), then forwards all remaining arguments to __shutdown and
# returns its result.
sub shutdown {
    my $self = shift;
    $self->__is_valid_command('shutdown');
    return $self->__shutdown(@_);
}
# SELECT wrapper that also remembers the chosen database.
#
# Sends SELECT with $database (plus any extra arguments), confesses on error,
# and only then records $database as current_database in the instance data.
# The on-connect callback reads that value to re-select the database.
# Returns the command reply.
sub select {
    my ($self, $database) = (shift, shift);
    $self->__is_valid_command('select');

    my ($reply, $failure) = $self->__std_cmd('SELECT', $database, @_);
    confess "[SELECT] $failure, " if defined $failure;

    $self->__get_data->{current_database} = $database;
    return $reply;
}
# Shared implementation behind subscribe/psubscribe/unsubscribe/punsubscribe.
#
# Arguments (positional):
#   $pr      - '' for channel commands, 'p' for pattern commands
#   $unsub   - true when unsubscribing
#   $command - command name to send
#   ...      - channel or pattern names, followed by the callback (code ref)
#
# Confesses when the last argument is not a code ref.
#
# Flow: wait for outstanding replies and for any pending subscription change
# to finish; when unsubscribing, reduce the list to the names that have no
# callback left; register the expected confirmations under the 'cbs' key of
# the instance data; send one command per name; then wait until 'cbs' has
# been cleared again.
sub __subscription_cmd {
    my $self = shift;
    my ($pr, $unsub, $command) = splice @_, 0, 3;
    my $cb = pop;
    weaken $self;

    confess("Missing required callback in call to $command(), ")
        unless ref($cb) eq 'CODE';

    $self->wait_all_responses;
    $self->__wait_for_event(1) while $self->__get_data->{cbs};

    my @subs = @_;
    @subs = $self->__process_unsubscribe_requests($cb, $pr, @subs)
        if $unsub;

    if (@subs) {
        # One expected confirmation per name, keyed like the subscribers table.
        my %pending;
        $pending{"${pr}message:$_"} = $cb for @subs;
        $self->__get_data->{cbs} = \%pending;

        foreach my $target (@subs) {
            $self->__send_subscription_cmd(
                $command,
                $target,
                $self->__subscription_callbak,
            );
        }

        while ($self->__get_data->{cbs}) {
            $self->__wait_for_event(1);
        }
    }
}
# Return the dispatcher passed along with every subscription command,
# creating and caching it under the 'callback' key of the instance data on
# first use.
#
# The dispatcher holds only a weak reference to $self. While subscription
# changes are pending ('cbs' is set) it forwards its arguments to
# __process_subscription_changes and clears 'cbs' once the pending table is
# empty; otherwise it forwards them to __process_pubsub_msg.
sub __subscription_callbak {
    my $self = shift;

    my $cached = $self->__get_data->{callback};
    return $cached if $cached;

    weaken $self;
    my $dispatcher = sub {
        my $pending = $self->__get_data->{cbs};
        if (!$pending) {
            $self->__process_pubsub_msg(@_);
        }
        else {
            $self->__process_subscription_changes($pending, @_);
            $self->__get_data->{cbs} = undef unless %$pending;
        }
    };

    return $self->__get_data->{callback} = $dispatcher;
}
# Subscribe to channels. Arguments are the channel names followed by the
# callback (a code ref); everything is forwarded to __subscription_cmd.
sub subscribe {
    my $self = shift;
    return $self->__subscription_cmd('', 0, 'subscribe', @_);
}
# Subscribe to patterns. Arguments are the patterns followed by the callback
# (a code ref); everything is forwarded to __subscription_cmd with the 'p'
# prefix.
sub psubscribe {
    my $self = shift;
    return $self->__subscription_cmd('p', 0, 'psubscribe', @_);
}
# Unsubscribe a callback from channels. Arguments are the channel names
# followed by the callback (a code ref); everything is forwarded to
# __subscription_cmd with the unsubscribe flag set.
sub unsubscribe {
    my $self = shift;
    return $self->__subscription_cmd('', 1, 'unsubscribe', @_);
}
# Unsubscribe a callback from patterns. Arguments are the patterns followed
# by the callback (a code ref); everything is forwarded to __subscription_cmd
# with the 'p' prefix and the unsubscribe flag set.
sub punsubscribe {
    my $self = shift;
    return $self->__subscription_cmd('p', 1, 'punsubscribe', @_);
}
# Remove $cb from the local callback lists of the given channels/patterns and
# report which of them have no callback left.
#
# Arguments:
#   $cb     - callback to remove (compared with 'ne', i.e. by stringified ref)
#   $pr     - '' or 'p'; prefixes the "message:<name>" key
#   @unsubs - channel or pattern names
#
# Names without a non-empty callback list are skipped untouched. Returns the
# names whose list became empty; their entries are deleted from the table.
sub __process_unsubscribe_requests {
    my ($self, $cb, $pr, @unsubs) = @_;
    my $registry = $self->__get_data->{subscribers};

    my @emptied;
    foreach my $name (@unsubs) {
        my $key     = "${pr}message:$name";
        my $current = $registry->{$key};
        next unless $current && @$current;

        my @remaining = grep { $_ ne $cb } @$current;
        if (@remaining) {
            $registry->{$key} = \@remaining;
            next;
        }

        delete $registry->{$key};
        push @emptied, $name;
    }

    return @emptied;
}
# Handle one reply received while subscription changes are pending.
#
# Arguments:
#   $expected - hash ref of pending confirmations ("[p]message:<name>" => cb)
#   $m        - reply as an array ref: [ type, name, ... ]
#   $error    - accepted but not used in this sub
#
# A 'message'/'pmessage' reply is handed to __process_pubsub_msg. For a
# (p)(un)subscribe reply the matching entry is removed from $expected and,
# unless it was an unsubscribe, its callback is appended to the subscribers
# table under the same key.
sub __process_subscription_changes {
    my ($self, $expected, $m, $error) = @_;
    my $subs = $self->__get_data->{subscribers};

    # Not a subscription change: dispatch it as a published message.
    if ($m->[0] =~ /^p?message$/) {
        $self->__process_pubsub_msg($m);
        return;
    }

    my ($prefix, $unsub) = $m->[0] =~ m/^(p)?(un)?subscribe$/;
    my $key = (defined $prefix ? $prefix : '') . "message:$m->[1]";

    my $cb = delete $expected->{$key};
    $unsub or push @{ $subs->{$key} }, $cb;
}
# Deliver one published message to the callbacks registered for it.
#
# $m is the reply as an array ref, [ type, name, ..., payload ], where type
# is 'message' or 'pmessage' and name is the subscribed channel or pattern.
# The payload is popped off $m, so the caller's array is modified.
#
# Each callback is invoked as $cb->($payload, $topic, $name). $topic is the
# element that follows the name once the payload is removed (present for
# four-element replies) and falls back to $name otherwise.
#
# Returns 1 after dispatching, or warns and returns 0 when no callback list
# exists for "<type>:<name>".
sub __process_pubsub_msg {
    my ($self, $m) = @_;
    my $subs = $self->__get_data->{subscribers};

    my $sub  = $m->[1];
    my $cbid = "$m->[0]:$sub";
    my $data = pop @$m;

    # Use a definedness test rather than '||': a channel literally named
    # "0" (or "") is a valid topic and must not be replaced by the pattern.
    my $topic = defined $m->[2] ? $m->[2] : $sub;

    if (!exists $subs->{$cbid}) {
        warn "Message for topic '$topic' ($cbid) without expected callback, ";
        return 0;
    }

    $_->($data, $topic, $sub) for @{ $subs->{$cbid} };

    return 1;
}
# Guard used by the command wrappers: confess when the client is currently in
# subscriber mode, naming the rejected command ($cmd) in the message.
sub __is_valid_command {
    my ($self, $cmd) = @_;

    $self->is_subscriber
        and confess("Cannot use command '$cmd' while in SUBSCRIBE mode, ");
}
1;
# (The module's POD documentation, 186 lines, is not shown in this listing.)