no
strict
'refs'
;
use
Socket
qw( MSG_NOSIGNAL PF_INET PF_UNIX IPPROTO_TCP SOCK_STREAM )
;
use
Errno
qw( EINPROGRESS EWOULDBLOCK EISCONN )
;
debug no_rehash stats compress_threshold compress_enable stat_callback
readonly select_timeout namespace namespace_len servers active buckets
pref_ip
bucketcount _single_sock _stime
connect_timeout cb_connect_fail
parser_class
buck2sock buck2sock_generation
}
;
use
vars
qw($VERSION $HAVE_ZLIB $FLAG_NOSIGNAL $HAVE_SOCKET6)
;
$VERSION
=
"1.30"
;
BEGIN {
$HAVE_ZLIB
=
eval
"use Compress::Zlib (); 1;"
;
$HAVE_SOCKET6
=
eval
"use Socket6 qw(AF_INET6 PF_INET6); 1;"
;
}
my
$HAVE_XS
=
eval
"use Cache::Memcached::GetParserXS; 1;"
;
$HAVE_XS
= 0
if
$ENV
{NO_XS};
my
$parser_class
=
$HAVE_XS
?
"Cache::Memcached::GetParserXS"
:
"Cache::Memcached::GetParser"
;
if
(
$ENV
{XS_DEBUG}) {
print
"using parser: $parser_class\n"
;
}
$FLAG_NOSIGNAL
= 0;
eval
{
$FLAG_NOSIGNAL
= MSG_NOSIGNAL; };
my
%host_dead
;
my
%cache_sock
;
my
$socket_cache_generation
= 1;
my
$PROTO_TCP
;
our
$SOCK_TIMEOUT
= 2.6;
sub
new {
my
Cache::Memcached
$self
=
shift
;
$self
= fields::new(
$self
)
unless
ref
$self
;
my
$args
= (
@_
== 1) ?
shift
: {
@_
};
$self
->{
'buck2sock'
}= [];
$self
->{
'buck2sock_generation'
} = 0;
$self
->set_servers(
$args
->{
'servers'
});
$self
->{
'debug'
} =
$args
->{
'debug'
} || 0;
$self
->{
'no_rehash'
} =
$args
->{
'no_rehash'
};
$self
->{
'stats'
} = {};
$self
->{
'pref_ip'
} =
$args
->{
'pref_ip'
} || {};
$self
->{
'compress_threshold'
} =
$args
->{
'compress_threshold'
};
$self
->{
'compress_enable'
} = 1;
$self
->{
'stat_callback'
} =
$args
->{
'stat_callback'
} ||
undef
;
$self
->{
'readonly'
} =
$args
->{
'readonly'
};
$self
->{
'parser_class'
} =
$args
->{
'parser_class'
} ||
$parser_class
;
$self
->{
'connect_timeout'
} =
$args
->{
'connect_timeout'
} || 0.25;
$self
->{
'select_timeout'
} =
$args
->{
'select_timeout'
} || 1.0;
$self
->{namespace} =
$args
->{namespace} ||
''
;
$self
->{namespace_len} =
length
$self
->{namespace};
return
$self
;
}
sub
set_pref_ip {
my
Cache::Memcached
$self
=
shift
;
$self
->{
'pref_ip'
} =
shift
;
}
sub
set_servers {
my
Cache::Memcached
$self
=
shift
;
my
(
$list
) =
@_
;
$self
->{
'servers'
} =
$list
|| [];
$self
->{
'active'
} =
scalar
@{
$self
->{
'servers'
}};
$self
->{
'buckets'
} =
undef
;
$self
->{
'bucketcount'
} = 0;
$self
->init_buckets;
$self
->{
'buck2sock_generation'
} = 0;
$self
->{
'_single_sock'
} =
undef
;
if
(@{
$self
->{
'servers'
}} == 1) {
$self
->{
'_single_sock'
} =
$self
->{
'servers'
}[0];
}
return
$self
;
}
sub
set_cb_connect_fail {
my
Cache::Memcached
$self
=
shift
;
$self
->{
'cb_connect_fail'
} =
shift
;
}
sub
set_connect_timeout {
my
Cache::Memcached
$self
=
shift
;
$self
->{
'connect_timeout'
} =
shift
;
}
sub
set_debug {
my
Cache::Memcached
$self
=
shift
;
my
(
$dbg
) =
@_
;
$self
->{
'debug'
} =
$dbg
|| 0;
}
sub
set_readonly {
my
Cache::Memcached
$self
=
shift
;
my
(
$ro
) =
@_
;
$self
->{
'readonly'
} =
$ro
;
}
sub
set_norehash {
my
Cache::Memcached
$self
=
shift
;
my
(
$val
) =
@_
;
$self
->{
'no_rehash'
} =
$val
;
}
sub
set_compress_threshold {
my
Cache::Memcached
$self
=
shift
;
my
(
$thresh
) =
@_
;
$self
->{
'compress_threshold'
} =
$thresh
;
}
sub
enable_compress {
my
Cache::Memcached
$self
=
shift
;
my
(
$enable
) =
@_
;
$self
->{
'compress_enable'
} =
$enable
;
}
sub
forget_dead_hosts {
my
Cache::Memcached
$self
=
shift
;
%host_dead
= ();
$socket_cache_generation
++;
return
1;
}
sub
set_stat_callback {
my
Cache::Memcached
$self
=
shift
;
my
(
$stat_callback
) =
@_
;
$self
->{
'stat_callback'
} =
$stat_callback
;
}
my
%sock_map
;
sub
_dead_sock {
my
(
$self
,
$sock
,
$ret
,
$dead_for
) =
@_
;
if
(
my
$ipport
=
$sock_map
{
$sock
}) {
my
$now
=
time
();
$host_dead
{
$ipport
} =
$now
+
$dead_for
if
$dead_for
;
delete
$cache_sock
{
$ipport
};
delete
$sock_map
{
$sock
};
}
$socket_cache_generation
++;
return
$ret
;
}
sub
_close_sock {
my
(
$self
,
$sock
) =
@_
;
if
(
my
$ipport
=
$sock_map
{
$sock
}) {
close
$sock
;
delete
$cache_sock
{
$ipport
};
delete
$sock_map
{
$sock
};
}
$socket_cache_generation
++;
return
1;
}
sub
_connect_sock {
my
(
$sock
,
$sin
,
$timeout
) =
@_
;
$timeout
= 0.25
if
not
defined
$timeout
;
if
(
$timeout
) {
IO::Handle::blocking(
$sock
, 0);
}
else
{
IO::Handle::blocking(
$sock
, 1);
}
my
$ret
=
connect
(
$sock
,
$sin
);
if
(!
$ret
&&
$timeout
&& $!==EINPROGRESS) {
my
$win
=
''
;
vec
(
$win
,
fileno
(
$sock
), 1) = 1;
if
(
select
(
undef
,
$win
,
undef
,
$timeout
) > 0) {
$ret
=
connect
(
$sock
,
$sin
);
$ret
= 1
if
!
$ret
&& $!==EISCONN;
}
}
unless
(
$timeout
) {
IO::Handle::blocking(
$sock
, 0);
}
return
$ret
;
}
sub
sock_to_host {
my
Cache::Memcached
$self
=
ref
$_
[0] ?
shift
:
undef
;
my
$host
=
$_
[0];
return
$cache_sock
{
$host
}
if
$cache_sock
{
$host
};
my
$now
=
time
();
my
(
$ip
,
$port
) =
$host
=~ /(.*):(\d+)$/;
if
(
defined
(
$ip
)) {
$ip
=~ s/[\[\]]//g;
}
return
undef
if
$host_dead
{
$host
} &&
$host_dead
{
$host
} >
$now
;
my
$sock
;
my
$connected
= 0;
my
$sin
;
my
$proto
=
$PROTO_TCP
||=
getprotobyname
(
'tcp'
);
if
(
index
(
$host
,
'/'
) != 0 )
{
if
(
$self
&&
$self
->{pref_ip}{
$ip
}) {
my
$prefip
=
$self
->{pref_ip}{
$ip
};
if
(
$HAVE_SOCKET6
&&
index
(
$prefip
,
':'
) != -1) {
no
strict
'subs'
;
socket
(
$sock
, PF_INET6, SOCK_STREAM,
$proto
);
$sock_map
{
$sock
} =
$host
;
$sin
= Socket6::pack_sockaddr_in6(
$port
,
Socket6::inet_pton(AF_INET6,
$prefip
));
}
else
{
socket
(
$sock
, PF_INET, SOCK_STREAM,
$proto
);
$sock_map
{
$sock
} =
$host
;
$sin
= Socket::sockaddr_in(
$port
, Socket::inet_aton(
$prefip
));
}
if
(_connect_sock(
$sock
,
$sin
,
$self
->{connect_timeout})) {
$connected
= 1;
}
else
{
if
(
my
$cb
=
$self
->{cb_connect_fail}) {
$cb
->(
$prefip
);
}
close
$sock
;
}
}
unless
(
$connected
) {
if
(
$HAVE_SOCKET6
&&
index
(
$ip
,
':'
) != -1) {
no
strict
'subs'
;
socket
(
$sock
, PF_INET6, SOCK_STREAM,
$proto
);
$sock_map
{
$sock
} =
$host
;
$sin
= Socket6::pack_sockaddr_in6(
$port
,
Socket6::inet_pton(AF_INET6,
$ip
));
}
else
{
socket
(
$sock
, PF_INET, SOCK_STREAM,
$proto
);
$sock_map
{
$sock
} =
$host
;
$sin
= Socket::sockaddr_in(
$port
, Socket::inet_aton(
$ip
));
}
my
$timeout
=
$self
?
$self
->{connect_timeout} : 0.25;
unless
(_connect_sock(
$sock
,
$sin
,
$timeout
)) {
my
$cb
=
$self
?
$self
->{cb_connect_fail} :
undef
;
$cb
->(
$ip
)
if
$cb
;
return
_dead_sock(
$self
,
$sock
,
undef
, 20 +
int
(
rand
(10)));
}
}
}
else
{
socket
(
$sock
, PF_UNIX, SOCK_STREAM, 0);
$sock_map
{
$sock
} =
$host
;
$sin
= Socket::sockaddr_un(
$host
);
my
$timeout
=
$self
?
$self
->{connect_timeout} : 0.25;
unless
(_connect_sock(
$sock
,
$sin
,
$timeout
)) {
my
$cb
=
$self
?
$self
->{cb_connect_fail} :
undef
;
$cb
->(
$host
)
if
$cb
;
return
_dead_sock(
$self
,
$sock
,
undef
, 20 +
int
(
rand
(10)));
}
}
my
$old
=
select
(
$sock
);
$| = 1;
select
(
$old
);
$cache_sock
{
$host
} =
$sock
;
return
$sock
;
}
sub
get_sock {
my
Cache::Memcached
$self
=
$_
[0];
my
$key
=
$_
[1];
return
$self
->sock_to_host(
$self
->{
'_single_sock'
})
if
$self
->{
'_single_sock'
};
return
undef
unless
$self
->{
'active'
};
my
$hv
=
ref
$key
?
int
(
$key
->[0]) : _hashfunc(
$key
);
my
$real_key
=
ref
$key
?
$key
->[1] :
$key
;
my
$tries
= 0;
while
(
$tries
++ < 20) {
my
$host
=
$self
->{
'buckets'
}->[
$hv
%
$self
->{
'bucketcount'
}];
my
$sock
=
$self
->sock_to_host(
$host
);
return
$sock
if
$sock
;
return
undef
if
$self
->{
'no_rehash'
};
$hv
+= _hashfunc(
$tries
.
$real_key
);
}
return
undef
;
}
sub
init_buckets {
my
Cache::Memcached
$self
=
shift
;
return
if
$self
->{
'buckets'
};
my
$bu
=
$self
->{
'buckets'
} = [];
foreach
my
$v
(@{
$self
->{
'servers'
}}) {
if
(
ref
$v
eq
"ARRAY"
) {
for
(1..
$v
->[1]) {
push
@$bu
,
$v
->[0]; }
}
else
{
push
@$bu
,
$v
;
}
}
$self
->{
'bucketcount'
} =
scalar
@{
$self
->{
'buckets'
}};
}
sub
disconnect_all {
my
Cache::Memcached
$self
=
shift
;
my
$sock
;
foreach
$sock
(
values
%cache_sock
) {
close
$sock
;
}
%cache_sock
= ();
$socket_cache_generation
++;
}
sub
_write_and_read {
my
Cache::Memcached
$self
=
shift
;
my
(
$sock
,
$line
,
$check_complete
) =
@_
;
my
$res
;
my
(
$ret
,
$offset
) = (
undef
, 0);
$check_complete
||=
sub
{
return
(
rindex
(
$ret
,
"\r\n"
) + 2 ==
length
(
$ret
));
};
my
$state
= 0;
my
(
$rin
,
$rout
,
$win
,
$wout
);
my
$nfound
;
my
$copy_state
= -1;
local
$SIG
{
'PIPE'
} =
"IGNORE"
unless
$FLAG_NOSIGNAL
;
while
(1) {
if
(
$copy_state
!=
$state
) {
last
if
$state
==2;
(
$rin
,
$win
) = (
''
,
''
);
vec
(
$rin
,
fileno
(
$sock
), 1) = 1
if
$state
==1;
vec
(
$win
,
fileno
(
$sock
), 1) = 1
if
$state
==0;
$copy_state
=
$state
;
}
$nfound
=
select
(
$rout
=
$rin
,
$wout
=
$win
,
undef
,
$self
->{
'select_timeout'
});
last
unless
$nfound
;
if
(
vec
(
$wout
,
fileno
(
$sock
), 1)) {
$res
=
send
(
$sock
,
$line
,
$FLAG_NOSIGNAL
);
next
if
not
defined
$res
and $!==EWOULDBLOCK;
unless
(
$res
> 0) {
$self
->_close_sock(
$sock
);
return
undef
;
}
if
(
$res
==
length
(
$line
)) {
$state
= 1;
}
else
{
substr
(
$line
, 0,
$res
,
''
);
}
}
if
(
vec
(
$rout
,
fileno
(
$sock
), 1)) {
$res
=
sysread
(
$sock
,
$ret
, 255,
$offset
);
next
if
!
defined
(
$res
) and $!==EWOULDBLOCK;
if
(
$res
== 0) {
$self
->_close_sock(
$sock
);
return
undef
;
}
$offset
+=
$res
;
$state
= 2
if
$check_complete
->(\
$ret
);
}
}
unless
(
$state
== 2) {
$self
->_dead_sock(
$sock
);
return
undef
;
}
return
$ret
;
}
sub
delete
{
my
Cache::Memcached
$self
=
shift
;
my
(
$key
,
$time
) =
@_
;
return
0
if
!
$self
->{
'active'
} ||
$self
->{
'readonly'
};
my
$stime
= Time::HiRes::
time
()
if
$self
->{
'stat_callback'
};
my
$sock
=
$self
->get_sock(
$key
);
return
0
unless
$sock
;
$self
->{
'stats'
}->{
"delete"
}++;
$key
=
ref
$key
?
$key
->[1] :
$key
;
$time
=
$time
?
" $time"
:
""
;
my
$cmd
=
"delete $self->{namespace}$key$time\r\n"
;
my
$res
= _write_and_read(
$self
,
$sock
,
$cmd
);
if
(
$self
->{
'stat_callback'
}) {
my
$etime
= Time::HiRes::
time
();
$self
->{
'stat_callback'
}->(
$stime
,
$etime
,
$sock
,
'delete'
);
}
return
defined
$res
&&
$res
eq
"DELETED\r\n"
;
}
*remove
= \
&delete
;
sub
add {
_set(
"add"
,
@_
);
}
sub
replace {
_set(
"replace"
,
@_
);
}
sub
set {
_set(
"set"
,
@_
);
}
sub
append {
_set(
"append"
,
@_
);
}
sub
prepend {
_set(
"prepend"
,
@_
);
}
sub
_set {
my
$cmdname
=
shift
;
my
Cache::Memcached
$self
=
shift
;
my
(
$key
,
$val
,
$exptime
) =
@_
;
return
0
if
!
$self
->{
'active'
} ||
$self
->{
'readonly'
};
my
$stime
= Time::HiRes::
time
()
if
$self
->{
'stat_callback'
};
my
$sock
=
$self
->get_sock(
$key
);
return
0
unless
$sock
;
my
$app_or_prep
=
$cmdname
eq
'append'
||
$cmdname
eq
'prepend'
? 1 : 0;
$self
->{
'stats'
}->{
$cmdname
}++;
my
$flags
= 0;
$key
=
ref
$key
?
$key
->[1] :
$key
;
if
(
ref
$val
) {
die
"append or prepend cannot take a reference"
if
$app_or_prep
;
local
$Carp::CarpLevel
= 2;
$val
= Storable::nfreeze(
$val
);
$flags
|= F_STORABLE;
}
warn
"value for memkey:$key is not defined"
unless
defined
$val
;
my
$len
=
length
(
$val
);
if
(
$self
->{
'compress_threshold'
} &&
$HAVE_ZLIB
&&
$self
->{
'compress_enable'
} &&
$len
>=
$self
->{
'compress_threshold'
} && !
$app_or_prep
) {
my
$c_val
= Compress::Zlib::memGzip(
$val
);
my
$c_len
=
length
(
$c_val
);
if
(
$c_len
<
$len
*(1 - COMPRESS_SAVINGS)) {
$val
=
$c_val
;
$len
=
$c_len
;
$flags
|= F_COMPRESS;
}
}
$exptime
=
int
(
$exptime
|| 0);
local
$SIG
{
'PIPE'
} =
"IGNORE"
unless
$FLAG_NOSIGNAL
;
my
$line
=
"$cmdname $self->{namespace}$key $flags $exptime $len\r\n$val\r\n"
;
my
$res
= _write_and_read(
$self
,
$sock
,
$line
);
if
(
$self
->{
'debug'
} &&
$line
) {
chop
$line
;
chop
$line
;
print
STDERR
"Cache::Memcache: $cmdname $self->{namespace}$key = $val ($line)\n"
;
}
if
(
$self
->{
'stat_callback'
}) {
my
$etime
= Time::HiRes::
time
();
$self
->{
'stat_callback'
}->(
$stime
,
$etime
,
$sock
,
$cmdname
);
}
return
defined
$res
&&
$res
eq
"STORED\r\n"
;
}
sub
incr {
_incrdecr(
"incr"
,
@_
);
}
sub
decr {
_incrdecr(
"decr"
,
@_
);
}
sub
_incrdecr {
my
$cmdname
=
shift
;
my
Cache::Memcached
$self
=
shift
;
my
(
$key
,
$value
) =
@_
;
return
undef
if
!
$self
->{
'active'
} ||
$self
->{
'readonly'
};
my
$stime
= Time::HiRes::
time
()
if
$self
->{
'stat_callback'
};
my
$sock
=
$self
->get_sock(
$key
);
return
undef
unless
$sock
;
$key
=
$key
->[1]
if
ref
$key
;
$self
->{
'stats'
}->{
$cmdname
}++;
$value
= 1
unless
defined
$value
;
my
$line
=
"$cmdname $self->{namespace}$key $value\r\n"
;
my
$res
= _write_and_read(
$self
,
$sock
,
$line
);
if
(
$self
->{
'stat_callback'
}) {
my
$etime
= Time::HiRes::
time
();
$self
->{
'stat_callback'
}->(
$stime
,
$etime
,
$sock
,
$cmdname
);
}
return
undef
unless
defined
$res
&&
$res
=~ /^(\d+)/;
return
$1;
}
sub
get {
my
Cache::Memcached
$self
=
$_
[0];
my
$key
=
$_
[1];
my
$r
=
$self
->get_multi(
$key
);
my
$kval
=
ref
$key
?
$key
->[1] :
$key
;
Encode::_utf8_off(
$kval
)
if
Encode::is_utf8(
$kval
);
return
$r
->{
$kval
};
}
sub
get_multi {
my
Cache::Memcached
$self
=
shift
;
return
{}
unless
$self
->{
'active'
};
$self
->{
'_stime'
} = Time::HiRes::
time
()
if
$self
->{
'stat_callback'
};
$self
->{
'stats'
}->{
"get_multi"
}++;
my
%val
;
my
%sock_keys
;
my
$sock
;
if
(
$self
->{
'_single_sock'
}) {
$sock
=
$self
->sock_to_host(
$self
->{
'_single_sock'
});
unless
(
$sock
) {
return
{};
}
foreach
my
$key
(
@_
) {
my
$kval
=
ref
$key
?
$key
->[1] :
$key
;
push
@{
$sock_keys
{
$sock
}},
$kval
;
}
}
else
{
my
$bcount
=
$self
->{
'bucketcount'
};
my
$sock
;
if
(
$self
->{
'buck2sock_generation'
} !=
$socket_cache_generation
) {
$self
->{
'buck2sock_generation'
} =
$socket_cache_generation
;
$self
->{
'buck2sock'
} = [];
}
KEY:
foreach
my
$key
(
@_
) {
my
(
$hv
,
$real_key
) =
ref
$key
?
(
int
(
$key
->[0]),
$key
->[1]) :
((crc32(
$key
) >> 16) & 0x7fff,
$key
);
my
$tries
;
while
(1) {
my
$bucket
=
$hv
%
$bcount
;
$sock
=
$self
->{
'buck2sock'
}->[
$bucket
] ||
$self
->sock_to_host(
$self
->{buckets}[
$bucket
]);
if
(
$sock
) {
$self
->{
'buck2sock'
}->[
$bucket
] =
$sock
;
last
;
}
next
KEY
if
$tries
++ >= 20;
$hv
+= _hashfunc(
$tries
.
$real_key
);
}
push
@{
$sock_keys
{
$sock
}},
$real_key
;
}
}
$self
->{
'stats'
}->{
"get_keys"
} +=
@_
;
$self
->{
'stats'
}->{
"get_socks"
} +=
keys
%sock_keys
;
local
$SIG
{
'PIPE'
} =
"IGNORE"
unless
$FLAG_NOSIGNAL
;
_load_multi(
$self
, \
%sock_keys
, \
%val
);
if
(
$self
->{
'debug'
}) {
while
(
my
(
$k
,
$v
) =
each
%val
) {
print
STDERR
"MemCache: got $k = $v\n"
;
}
}
return
\
%val
;
}
sub
_load_multi {
my
Cache::Memcached
$self
;
my
(
$sock_keys
,
$ret
);
(
$self
,
$sock_keys
,
$ret
) =
@_
;
my
%reading
;
my
%writing
;
my
%buf
;
my
%parser
;
my
$active_changed
= 1;
my
$dead
=
sub
{
my
$sock
=
shift
;
print
STDERR
"killing socket $sock\n"
if
$self
->{
'debug'
} >= 2;
delete
$reading
{
$sock
};
delete
$writing
{
$sock
};
if
(
my
$p
=
$parser
{
$sock
}) {
my
$key
=
$p
->current_key;
delete
$ret
->{
$key
}
if
$key
;
}
if
(
$self
->{
'stat_callback'
}) {
my
$etime
= Time::HiRes::
time
();
$self
->{
'stat_callback'
}->(
$self
->{
'_stime'
},
$etime
,
$sock
,
'get_multi'
);
}
close
$sock
;
$self
->_dead_sock(
$sock
);
};
my
$finalize
=
sub
{
my
$map
=
$_
[0];
$map
= {
@_
}
unless
ref
$map
;
while
(
my
(
$k
,
$flags
) =
each
%$map
) {
chop
$ret
->{
$k
};
chop
$ret
->{
$k
};
$ret
->{
$k
} = Compress::Zlib::memGunzip(
$ret
->{
$k
})
if
$HAVE_ZLIB
&&
$flags
& F_COMPRESS;
if
(
$flags
& F_STORABLE) {
eval
{
$ret
->{
$k
} = Storable::thaw(
$ret
->{
$k
});
};
if
($@) {
delete
$ret
->{
$k
};
}
}
}
};
foreach
(
keys
%$sock_keys
) {
my
$ipport
=
$sock_map
{
$_
} or
die
"No map found matching for $_"
;
my
$sock
=
$cache_sock
{
$ipport
} or
die
"No sock found for $ipport"
;
print
STDERR
"processing socket $_\n"
if
$self
->{
'debug'
} >= 2;
$writing
{
$_
} =
$sock
;
if
(
$self
->{namespace}) {
$buf
{
$_
} =
join
(
" "
,
'get'
, (
map
{
"$self->{namespace}$_"
} @{
$sock_keys
->{
$_
}}),
"\r\n"
);
}
else
{
$buf
{
$_
} =
join
(
" "
,
'get'
, @{
$sock_keys
->{
$_
}},
"\r\n"
);
}
$parser
{
$_
} =
$self
->{parser_class}->new(
$ret
,
$self
->{namespace_len},
$finalize
);
}
my
$read
=
sub
{
my
$sockstr
=
"$_[0]"
;
my
$p
=
$parser
{
$sockstr
} or
die
;
my
$rv
=
$p
->parse_from_sock(
$_
[0]);
if
(
$rv
> 0) {
delete
$reading
{
$sockstr
};
}
elsif
(
$rv
< 0) {
$dead
->(
$_
[0]);
}
return
$rv
;
};
my
$write
=
sub
{
my
(
$sock
,
$sockstr
) = (
$_
[0],
"$_[0]"
);
my
$res
;
$res
=
send
(
$sock
,
$buf
{
$sockstr
},
$FLAG_NOSIGNAL
);
return
0
if
not
defined
$res
and $!==EWOULDBLOCK;
unless
(
$res
> 0) {
$dead
->(
$sock
);
return
1;
}
if
(
$res
==
length
(
$buf
{
$sockstr
})) {
$buf
{
$sockstr
} =
""
;
delete
$writing
{
$sockstr
};
$reading
{
$sockstr
} =
$sock
;
return
1;
}
else
{
substr
(
$buf
{
$sockstr
}, 0,
$res
,
''
);
}
return
0;
};
my
(
$rin
,
$rout
,
$win
,
$wout
);
my
$nfound
;
while
(1) {
if
(
$active_changed
) {
last
unless
%reading
or
%writing
;
(
$rin
,
$win
) = (
''
,
''
);
foreach
(
values
%reading
) {
vec
(
$rin
,
fileno
(
$_
), 1) = 1;
}
foreach
(
values
%writing
) {
vec
(
$win
,
fileno
(
$_
), 1) = 1;
}
$active_changed
= 0;
}
$nfound
=
select
(
$rout
=
$rin
,
$wout
=
$win
,
undef
,
$self
->{
'select_timeout'
});
last
unless
$nfound
;
foreach
(
values
%writing
) {
if
(
vec
(
$wout
,
fileno
(
$_
), 1)) {
$active_changed
= 1
if
$write
->(
$_
);
}
}
foreach
(
values
%reading
) {
if
(
vec
(
$rout
,
fileno
(
$_
), 1)) {
$active_changed
= 1
if
$read
->(
$_
);
}
}
}
foreach
(
values
%writing
) {
$dead
->(
$_
);
}
foreach
(
values
%reading
) {
$dead
->(
$_
);
}
return
;
}
sub
_hashfunc {
return
(crc32(
$_
[0]) >> 16) & 0x7fff;
}
sub
flush_all {
my
Cache::Memcached
$self
=
shift
;
my
$success
= 1;
my
@hosts
= @{
$self
->{
'buckets'
}};
foreach
my
$host
(
@hosts
) {
my
$sock
=
$self
->sock_to_host(
$host
);
my
@res
=
$self
->run_command(
$sock
,
"flush_all\r\n"
);
$success
= 0
unless
(
scalar
@res
== 1 && ((
$res
[0] ||
""
) eq
"OK\r\n"
));
}
return
$success
;
}
sub
run_command {
my
Cache::Memcached
$self
=
shift
;
my
(
$sock
,
$cmd
) =
@_
;
return
()
unless
$sock
;
my
$ret
;
my
$line
=
$cmd
;
while
(
my
$res
= _write_and_read(
$self
,
$sock
,
$line
)) {
undef
$line
;
$ret
.=
$res
;
last
if
$ret
=~ /(?:OK|END|ERROR)\r\n$/;
}
chop
$ret
;
chop
$ret
;
return
map
{
"$_\r\n"
}
split
(/\r\n/,
$ret
);
}
sub
stats {
my
Cache::Memcached
$self
=
shift
;
my
(
$types
) =
@_
;
return
0
unless
$self
->{
'active'
};
return
0
unless
!
ref
(
$types
) ||
ref
(
$types
) eq
'ARRAY'
;
if
(!
ref
(
$types
)) {
if
(!
$types
) {
$types
= [
qw( misc malloc self )
];
}
else
{
$types
= [
$types
];
}
}
my
$stats_hr
= { };
if
(
grep
/^self$/,
@$types
) {
$stats_hr
->{
'self'
} = \%{
$self
->{
'stats'
} };
}
my
%misc_keys
=
map
{
$_
=> 1 }
qw/ bytes bytes_read bytes_written
cmd_get cmd_set connection_structures curr_items
get_hits get_misses
total_connections total_items
/
;
my
@hosts
= @{
$self
->{
'buckets'
}};
HOST:
foreach
my
$host
(
@hosts
) {
my
$sock
=
$self
->sock_to_host(
$host
);
next
HOST
unless
$sock
;
TYPE:
foreach
my
$typename
(
grep
!/^self$/,
@$types
) {
my
$type
=
$typename
eq
'misc'
?
""
:
" $typename"
;
my
$lines
= _write_and_read(
$self
,
$sock
,
"stats$type\r\n"
,
sub
{
my
$bref
=
shift
;
return
$$bref
=~ /^(?:END|ERROR)\r?\n/m;
});
unless
(
$lines
) {
$self
->_dead_sock(
$sock
);
next
HOST;
}
$lines
=~ s/\0//g;
my
@lines
=
split
(/\r?\n/,
$lines
);
if
(
$typename
=~ /^(malloc|sizes|misc)$/) {
foreach
my
$line
(
@lines
) {
my
(
$key
,
$value
) =
$line
=~ /^(?:STAT )?(\w+)\s(.*)/;
if
(
$key
) {
$stats_hr
->{
'hosts'
}{
$host
}{
$typename
}{
$key
} =
$value
;
}
$stats_hr
->{
'total'
}{
$key
} +=
$value
if
$typename
eq
'misc'
&&
$key
&&
$misc_keys
{
$key
};
$stats_hr
->{
'total'
}{
"malloc_$key"
} +=
$value
if
$typename
eq
'malloc'
&&
$key
;
}
}
else
{
$lines
=~ s/^END\r?\n//m;
$stats_hr
->{
'hosts'
}{
$host
}{
$typename
} ||=
""
;
$stats_hr
->{
'hosts'
}{
$host
}{
$typename
} .=
"$lines"
;
}
}
}
return
$stats_hr
;
}
sub
stats_reset {
my
Cache::Memcached
$self
=
shift
;
my
(
$types
) =
@_
;
return
0
unless
$self
->{
'active'
};
HOST:
foreach
my
$host
(@{
$self
->{
'buckets'
}}) {
my
$sock
=
$self
->sock_to_host(
$host
);
next
HOST
unless
$sock
;
my
$ok
= _write_and_read(
$self
,
$sock
,
"stats reset"
);
unless
(
defined
$ok
&&
$ok
eq
"RESET\r\n"
) {
$self
->_dead_sock(
$sock
);
}
}
return
1;
}
1;