# Module version, declared as a string literal.
our
$VERSION
=
"2.57"
;
# Re-evaluate the version string as Perl code so that $VERSION ends up
# holding the value of the literal rather than the raw string.
$VERSION
=
eval
$VERSION
;
# Debug switch: the subs in this file emit their diagnostic warn() messages
# only when this package variable is true. Off by default.
our
$DEBUG
= 0;
# Index (within the argument list, where index 0 is the command name itself)
# of the argument that carries the key, for every command that can be routed
# by key. Almost all commands take the key as their first argument; the two
# exceptions are listed explicitly after the bulk list.
my %key_pos = (
    (
        map { ( $_ => 1 ) } qw(
          append bitcount bitpos blpop brpop brpoplpush
          decr decrby del dump
          exists expire expireat
          get getbit getrange getset
          hdel hexists hget hgetall hincrby hincrbyfloat hkeys hlen
          hmget hmset hscan hset hsetnx hvals
          incr incrby incrbyfloat
          lindex linsert llen lpop lpush lpushx lrange lrem lset ltrim
          mget move mset msetnx
          persist pexpire pexpireat pfadd pfcount pfmerge psetex pttl
          rename renamenx restore restore-asking
          rpop rpoplpush rpush rpushx
          sadd scard sdiff sdiffstore set setbit setex setnx setrange
          sinter sinterstore sismember smembers smove sort spop
          srandmember srem sscan strlen substr sunion sunionstore
          ttl type
          watch
          zadd zcard zcount zincrby zlexcount zrange zrangebylex
          zrangebyscore zrank zrem zremrangebylex zremrangebyrank
          zremrangebyscore zrevrange zrevrangebylex zrevrangebyscore
          zrevrank zscan zscore
        )
    ),

    # These commands take another argument before the key.
    bitop  => 2,
    object => 2,
);
# Constructor.
#
# Recognised parameters (passed as a key/value list):
#   startup_nodes           - stored as the initial node list (_nodes)
#   password                - stored and later handed to every node connection
#   no_slots_initialization - when true, the flag is recorded on the object
#
# After blessing, the slot table is initialized via _initialize_slots
# (which itself returns early when no_slots_initialization is set).
sub new {
    my $class  = shift;
    my %params = @_;

    my $self = bless {
        _slots       => [],
        _connections => {},
        _nodes       => $params{startup_nodes},
        _password    => $params{password},
    }, $class;

    if ( $params{no_slots_initialization} ) {
        $self->{no_slots_initialization} = 1;
    }

    $self->_initialize_slots;

    return $self;
}
# Rebuild the node list and the slot -> "host:port" table.
#
# Walks the currently known nodes and uses the first one that can be
# connected to and answers cluster_nodes without an error. From that node it
# takes the node list and the output of CLUSTER SLOTS; for every slot range
# the address found in element [2] of the range entry is recorded for each
# slot in the range.
#
# Does nothing when the object was created with no_slots_initialization.
# Dies (confess) when the node list is empty, when CLUSTER SLOTS returns an
# error, or when no node could provide a list of cluster nodes.
#
# On success the pending-refresh flag is cleared and cached connections to
# addresses that are no longer in the node list are dropped.
sub _initialize_slots {
    my $self = shift;

    return if $self->{no_slots_initialization};

    unless ( $self->{_nodes} and @{ $self->{_nodes} } ) {
        confess "list of cluster nodes is empty";
    }

    my %new_nodes;
    my $new_nodes;
    for my $node ( @{ $self->{_nodes} } ) {
        my $redis = _connect_to_node( $self, $node );
        next unless $redis;

        my $nodes = $redis->cluster_nodes;
        next if ref($nodes) =~ /^RedisDB::Error/;
        $new_nodes = $nodes;
        for (@$nodes) {
            $new_nodes{"$_->{host}:$_->{port}"}++;
        }

        my $slots = $redis->cluster('SLOTS');

        # ref() must be parenthesised: "ref $slots =~ /.../" parses as
        # ref( $slots =~ /.../ ) and would never detect an error reply.
        confess "got an error trying retrieve a list of cluster slots: $slots"
          if ref($slots) =~ /^RedisDB::Error/;

        for (@$slots) {
            my ( $ip, $port ) = @{ $_->[2] };
            my $node_key = "$ip:$port";
            for ( $_->[0] .. $_->[1] ) {
                $self->{_slots}[$_] = $node_key;
            }
        }

        # One good answer is enough.
        last;
    }

    unless ( $new_nodes and @$new_nodes ) {
        confess "couldn't get list of cluster nodes";
    }
    $self->{_nodes} = $new_nodes;

    # The table is fresh now, so a previously requested refresh is satisfied.
    # Without this the flag would stay set forever and every subsequent
    # command would trigger another full refresh.
    $self->{_refresh_slots} = 0;

    # Drop cached connections to addresses that left the cluster.
    for ( keys %{ $self->{_connections} } ) {
        delete $self->{_connections}{$_} unless $new_nodes{$_};
    }

    return;
}
# Execute a command on the node responsible for the command's key.
#
# Arguments: the command name followed by its arguments. The position of the
# key inside the argument list is looked up in %key_pos; commands that are
# not listed there are rejected.
#
# The key's slot selects the target node; if the slot table has no entry for
# it, the first known node is used. The command is then tried up to 10 times,
# reacting to the reply:
#   * MOVED        - the slot table entry is updated, a table refresh is
#                    requested for the next command, and the command is
#                    retried on the new node.
#   * ASK          - the command is retried on the indicated node, preceded
#                    by ASKING; the slot table is left untouched.
#   * DISCONNECTED - the cached connection is dropped; after a short sleep one
#                    reconnect is attempted, and if that fails as well the
#                    slot table is refreshed. If the slot still maps to the
#                    same node the error is returned, otherwise the command is
#                    retried on the slot's new node.
# Any other reply (including other error objects) is returned to the caller.
#
# Dies (confess) if the command has no known key position, if the key is
# empty, or if a MOVED reply reports a slot different from the one computed
# locally. Returns a DISCONNECTED error object if all attempts are used up.
sub execute {
    my $self = shift;
    my @args = @_;

    my $command = lc $args[0];
    confess "Command $command does not have key" unless $key_pos{$command};

    my $key = $args[ $key_pos{$command} ];
    confess "Key is not specified in: ", join " ", @args
      unless length $key;

    if ( $self->{_refresh_slots} ) {
        $self->_initialize_slots;
    }

    my $slot     = key_slot($key);
    my $node_key = $self->{_slots}[$slot]
      || "$self->{_nodes}[0]{host}:$self->{_nodes}[0]{port}";

    my $asking;
    my $last_connection;

    my $attempts = 10;
    while ( $attempts-- ) {
        my $redis = $self->{_connections}{$node_key};
        unless ($redis) {
            my ( $host, $port ) = split /:([^:]+)$/, $node_key;
            $redis = _connect_to_node( $self, { host => $host, port => $port } );
        }

        my $res;
        if ($redis) {

            # ASKING only applies to the single command that follows it.
            $redis->asking(RedisDB::IGNORE_REPLY) if $asking;
            $asking = 0;
            $res    = $redis->execute(@args);
        }
        else {
            $res = RedisDB::Error::DISCONNECTED->new(
                "Couldn't connect to redis server at $node_key");
        }

        if ( ref $res eq 'RedisDB::Error::MOVED' ) {
            if ( $res->{slot} ne $slot ) {
                confess
"Incorrectly computed slot for key '$key', ours $slot, theirs $res->{slot}";
            }
            warn "slot $slot moved to $res->{host}:$res->{port}" if $DEBUG;
            $node_key = $self->{_slots}[$slot] = "$res->{host}:$res->{port}";
            $self->{_refresh_slots} = 1;
            next;
        }
        elsif ( ref $res eq 'RedisDB::Error::ASK' ) {
            warn "asking $res->{host}:$res->{port} about slot $slot" if $DEBUG;
            $node_key = "$res->{host}:$res->{port}";
            $asking   = 1;
            next;
        }
        elsif ( ref $res eq 'RedisDB::Error::DISCONNECTED' ) {
            warn "$res" if $DEBUG;
            delete $self->{_connections}{$node_key};
            usleep 100_000;
            if ( $last_connection and $last_connection eq $node_key ) {

                # Reconnecting to the same node already failed once:
                # refresh the slot table and see who owns the slot now.
                warn "refreshing slots table" if $DEBUG;
                $self->_initialize_slots;

                my $fresh_key = $self->{_slots}[$slot];

                # Still the same (unreachable) node: give up.
                return $res if defined $fresh_key and $fresh_key eq $node_key;
                warn "got a new host for the slot" if $DEBUG;

                # Actually switch to the new owner; otherwise the loop would
                # keep hammering the dead node until attempts run out.
                $node_key = $fresh_key if defined $fresh_key;
            }
            else {
                warn "trying to reconnect" if $DEBUG;
                $last_connection = $node_key;
            }
            next;
        }

        return $res;
    }

    return RedisDB::Error::DISCONNECTED->new(
        "Couldn't send command after 10 attempts");
}
# Install one method per command listed in %key_pos. Each generated method
# forwards to execute(), inserting the command name right after the invocant.
foreach my $name ( keys %key_pos ) {
    my $method = sub {
        my $cluster = shift;
        return execute( $cluster, $name, @_ );
    };

    # Symbolic glob assignment needs strict refs relaxed.
    no strict 'refs';
    *{ __PACKAGE__ . "::$name" } = $method;
}
# Return a connection to some cluster node.
#
# The first value of the connection cache is used when it is true; otherwise
# the known nodes are tried in order and the first successful connection is
# returned. When nothing could be connected the last (false) result is
# returned.
sub random_connection {
    my $self = shift;

    my @cached = values %{ $self->{_connections} };
    my $picked = $cached[0];
    return $picked if $picked;

    foreach my $node ( @{ $self->{_nodes} } ) {
        $picked = _connect_to_node( $self, $node );
        return $picked if $picked;
    }

    return $picked;
}
# Create a new RedisDB object connected to the node that serves $slot.
#
# Extra key/value parameters are passed through to RedisDB->new, followed by
# the host and port of the node. A pending slot table refresh is performed
# first. Dies (confess) if the slot table has no entry for the slot.
sub node_for_slot {
    my $self   = shift;
    my $slot   = shift;
    my %params = @_;

    $self->_initialize_slots if $self->{_refresh_slots};

    my $node_key = $self->{_slots}[$slot];
    confess "Don't know master node for slot $slot" unless $node_key;

    my ( $host, $port ) = split /:([^:]+)$/, $node_key;
    my @options = ( %params, host => $host, port => $port );

    return RedisDB->new(@options);
}
# Create a new RedisDB object connected to the node that serves $key.
# The key is converted to its slot and the work is delegated to
# node_for_slot, passing the extra parameters along.
sub node_for_key {
    my $self   = shift;
    my $key    = shift;
    my %params = @_;

    my $slot = key_slot($key);

    return $self->node_for_slot( $slot, %params );
}
# Attach a new node to the cluster.
#
# Parameters:
#   $addr      - address of the new node, either a "host:port" string or a
#                hash reference with host and port
#   $master_id - optional; when given, the new node is configured to
#                replicate from the node with this id
#
# The new node is sent CLUSTER MEET for every known node; at least one of
# them has to answer OK. When $master_id is given, the sub waits (up to 10
# retries with growing delay) until the new node lists that id among its
# cluster nodes and then issues CLUSTER REPLICATE.
#
# Returns 'OK'. Dies (croak) if the node can't be connected to, if no MEET
# succeeded, if the master never shows up, or if REPLICATE returns an error.
sub add_new_node {
    my ( $self, $addr, $master_id ) = @_;
    $addr = _ensure_hash_address($addr);

    # _connect_to_node returns a false value on failure; fail with a clear
    # message instead of calling a method on undef below.
    my $redis = _connect_to_node( $self, $addr )
      or croak "couldn't connect to node $addr->{host}:$addr->{port}";

    my $ok;
    for my $node ( @{ $self->{_nodes} } ) {
        $redis->cluster(
            'MEET',
            $node->{host},
            $node->{port},
            sub {
                $ok++ if not ref $_[1] and $_[1] eq 'OK';
                warn $_[1] if ref $_[1];
            }
        );
    }

    # Wait for all the MEET replies queued above.
    $redis->mainloop;
    croak "failed to attach node to cluster" unless $ok;

    if ($master_id) {
        my $attempt = 0;
        my $nodes   = $redis->cluster_nodes;
        while ( not grep { $_->{node_id} eq $master_id } @$nodes ) {
            croak
              "failed to start replication from $master_id - node is not present"
              if $attempt++ >= 10;
            usleep 100_000 * $attempt;
            $nodes = $redis->cluster_nodes;
        }
        my $res = $redis->cluster( 'REPLICATE', $master_id );

        # ref() must be parenthesised: "ref $res =~ /.../" parses as
        # ref( $res =~ /.../ ) and would never detect an error reply.
        croak $res if ref($res) =~ /^RedisDB::Error/;
    }

    return 'OK';
}
# Move a slot, together with all of its keys, to another node.
#
# Parameters:
#   $slot - slot number
#   $dst  - destination node ("host:port" string or hash with host and port);
#           it has to be present in the current node list
#
# Steps: refresh the slot table, mark the slot as importing on the
# destination and as migrating on the source, move the keys in batches of up
# to 1000 with MIGRATE, and finally tell both nodes that the slot now belongs
# to the destination.
#
# Returns 1 on success and an empty list if the slot is already served by the
# destination. Dies (confess) on any failure.
sub migrate_slot {
    my ( $self, $slot, $dst ) = @_;

    $self->_initialize_slots;
    my $src_key = $self->{_slots}[$slot];
    confess "mapping for slot $slot is not defined" unless $src_key;

    $dst = $self->_get_node_info($dst)
      or confess "destination node is seems not a part of the cluster";
    my $dst_key = "$dst->{host}:$dst->{port}";
    warn "migrating slot $slot from $src_key to $dst_key" if $DEBUG;
    return if $src_key eq $dst_key;

    # The source must be a known node too, its node_id is needed below.
    my $src = $self->_get_node_info($src_key)
      or confess "source node $src_key is not in the list of cluster nodes";

    my $dst_redis = _connect_to_node( $self, $dst )
      or confess "couldn't connect to destination node";
    my $src_redis = _connect_to_node( $self, $src )
      or confess "couldn't connect to source node";

    # Destination first, then source.
    my $res =
      $dst_redis->cluster( 'setslot', $slot, 'importing', $src->{node_id} );
    confess "$res" unless "$res" eq 'OK';
    $res =
      $src_redis->cluster( 'setslot', $slot, 'migrating', $dst->{node_id} );
    confess "$res" unless "$res" eq 'OK';
    warn "set slots on dst/src nodes to importing/migrating state" if $DEBUG;

    my $migrated = 0;
    while (1) {
        my $keys = $src_redis->cluster( 'getkeysinslot', $slot, 1000 );

        # ref() must be parenthesised: "ref $keys =~ /.../" parses as
        # ref( $keys =~ /.../ ) and would never detect an error reply.
        confess "Migration failed: $keys" if ref($keys) =~ /^RedisDB::Error/;
        last unless @$keys;

        for (@$keys) {
            $res = $src_redis->migrate( $dst->{host}, $dst->{port}, $_, 0, 60 );
            confess "Migration failed: $res" unless "$res" eq 'OK';
            $migrated++;
        }
    }
    warn "migrated $migrated keys from the slot" if $DEBUG;

    # Both nodes have to be told that the slot now belongs to the
    # destination. Naming the source here would leave the source believing
    # it still owns the (now empty) slot.
    $res = $dst_redis->cluster( 'setslot', $slot, 'node', $dst->{node_id} );
    confess "$res" unless "$res" eq 'OK';
    $res = $src_redis->cluster( 'setslot', $slot, 'node', $dst->{node_id} );
    confess "$res" unless "$res" eq 'OK';
    warn "migration is finished" if $DEBUG;

    return 1;
}
# Remove a node from the cluster.
#
# Parameter:
#   $node - node to remove ("host:port" string or hash with host and port);
#           it has to be present in the current node list
#
# If the node is a master, its slots are first migrated to the other masters
# (filling each one up to an even share) and its slaves are reconfigured to
# replicate from the other masters. The node is then shut down and every
# remaining node is asked to forget it. Failures to reconfigure a slave or to
# forget the node are reported with warn and do not abort the operation.
#
# Returns 1. Dies (croak) if the node is unknown, or if it is a master and
# there is no other master that could take over.
sub remove_node {
    my ( $self, $node ) = @_;

    $self->_initialize_slots;
    $node = $self->_get_node_info($node)
      or croak "node to remove is not a part of the cluster";
    my $node_key = "$node->{host}:$node->{port}";

    if ( $node->{flags}{master} ) {
        my @masters;
        my @slaves;
        for ( @{ $self->{_nodes} } ) {
            if ( $_->{flags}{slave} ) {
                push @slaves, $_ if $_->{master_id} eq $node->{node_id};
                next;
            }
            next if $_->{node_id} eq $node->{node_id};
            push @masters, $_;
        }

        # The shares below are computed per remaining master.
        croak "can't remove the node: there are no other masters in the cluster"
          unless @masters;

        my @slots;
        my %slots_at;
        for my $i ( 0 .. 16383 ) {
            my $owner = $self->{_slots}[$i];
            next unless defined $owner;    # slot is not assigned to anyone
            push @slots, $i if $owner eq $node_key;
            $slots_at{$owner}++;
        }

        if ($DEBUG) {
            warn "Node to remove is a master with "
              . scalar(@slaves)
              . " slaves\nIt holds "
              . scalar(@slots)
              . " slots."
              . "\nThere are "
              . scalar(@masters)
              . " other masters in cluster\n";
        }

        my $slots_per_master  = int( 16384 / @masters + 1 );
        my $slaves_per_master = int( @slaves / @masters + 1 );

        for my $master (@masters) {
            my $key = "$master->{host}:$master->{port}";

            # Top the master up to its share; a master without any slots has
            # no entry in %slots_at.
            for ( ( $slots_at{$key} || 0 ) + 1 .. $slots_per_master ) {
                my $slot = shift @slots;
                last unless defined $slot;
                $self->migrate_slot( $slot, $master );
            }

            for ( 1 .. $slaves_per_master ) {
                my $slave = shift @slaves or last;
                my $redis = $self->_connect_to_node($slave) or next;
                my $res = $redis->cluster( 'replicate', $master->{node_id} );

                # ref() must be parenthesised: "ref $res =~ /.../" parses as
                # ref( $res =~ /.../ ) and would never detect an error.
                warn "Failed to reconfigure slave $slave->{host}:$slave->{port}"
                  . " to replicate from $master->{node_id}: $res"
                  if ref($res) =~ /^RedisDB::Error/;
            }
        }
    }

    # There may be no cached connection yet (e.g. for a slave that was never
    # talked to), so obtain one explicitly before dropping it from the cache.
    my $doomed = $self->_connect_to_node($node);
    delete $self->{_connections}{$node_key};
    if ($doomed) {
        $doomed->shutdown;
    }
    elsif ($DEBUG) {
        warn "couldn't connect to $node_key to shut it down";
    }

    my @nodes;
    for ( @{ $self->{_nodes} } ) {
        next if $_->{node_id} eq $node->{node_id};
        push @nodes, $_;
        my $redis = $self->_connect_to_node($_) or next;
        my $res = $redis->cluster( 'forget', $node->{node_id} );

        # Test the class of the reply, consistently with the other checks.
        warn "$_->{host}:$_->{port} could not forget the node: $res"
          if ref($res) =~ /^RedisDB::Error/;
    }
    $self->{_nodes} = \@nodes;

    return 1;
}
# Find the entry of the current node list that has the same host and port as
# $node ("host:port" string or hash with host and port).
# Returns the matching entry, or nothing if there is no such node.
sub _get_node_info {
    my ( $self, $node ) = @_;

    my $wanted = _ensure_hash_address($node);

    foreach my $known ( @{ $self->{_nodes} } ) {
        next unless $wanted->{host} eq $known->{host};
        next unless $wanted->{port} eq $known->{port};
        return $known;
    }

    return;
}
# Normalize a node address to a hash reference.
#
# A hash reference is returned unchanged. Anything else is treated as a
# string and split at its last colon into host and port; the result is a new
# hash with the keys host and port. Dies (croak) if either part is missing
# or false.
sub _ensure_hash_address {
    my ($addr) = @_;

    return $addr if ref $addr eq 'HASH';

    my ( $host, $port ) = split /:([^:]+)$/, $addr;
    if ( !$host or !$port ) {
        croak "invalid address spec: $addr";
    }

    return +{ host => $host, port => $port };
}
# Return a connection to the given node (hash with host and port), reusing
# the cached one when there is a true value in the cache for "host:port".
#
# Otherwise a new RedisDB object is created with raise_error disabled and the
# stored password. The object is cached and returned only if it has a
# _socket; if not, undef is cached and returned.
sub _connect_to_node {
    my ( $self, $node ) = @_;

    my $host_key = "$node->{host}:$node->{port}";

    my $cached = $self->{_connections}{$host_key};
    return $cached if $cached;

    my $redis = RedisDB->new(
        host        => $node->{host},
        port        => $node->{port},
        raise_error => 0,
        password    => $self->{_password},
    );

    my $usable = $redis->{_socket} ? $redis : undef;
    $self->{_connections}{$host_key} = $usable;

    return $usable;
}
# 256-entry lookup table used by crc16(): the table is indexed with the low
# 8 bits of (high byte of the running CRC xor the next input byte).
# NOTE(review): the values look like the standard CRC16-CCITT table for
# polynomial 0x1021 (entry 1 is 0x1021), as used for Redis Cluster key
# hashing - verify against the Redis Cluster specification before editing.
my
@crc16tab
= (
0x0000, 0x1021, 0x2042, 0x3063, 0x4084, 0x50a5, 0x60c6, 0x70e7,
0x8108, 0x9129, 0xa14a, 0xb16b, 0xc18c, 0xd1ad, 0xe1ce, 0xf1ef,
0x1231, 0x0210, 0x3273, 0x2252, 0x52b5, 0x4294, 0x72f7, 0x62d6,
0x9339, 0x8318, 0xb37b, 0xa35a, 0xd3bd, 0xc39c, 0xf3ff, 0xe3de,
0x2462, 0x3443, 0x0420, 0x1401, 0x64e6, 0x74c7, 0x44a4, 0x5485,
0xa56a, 0xb54b, 0x8528, 0x9509, 0xe5ee, 0xf5cf, 0xc5ac, 0xd58d,
0x3653, 0x2672, 0x1611, 0x0630, 0x76d7, 0x66f6, 0x5695, 0x46b4,
0xb75b, 0xa77a, 0x9719, 0x8738, 0xf7df, 0xe7fe, 0xd79d, 0xc7bc,
0x48c4, 0x58e5, 0x6886, 0x78a7, 0x0840, 0x1861, 0x2802, 0x3823,
0xc9cc, 0xd9ed, 0xe98e, 0xf9af, 0x8948, 0x9969, 0xa90a, 0xb92b,
0x5af5, 0x4ad4, 0x7ab7, 0x6a96, 0x1a71, 0x0a50, 0x3a33, 0x2a12,
0xdbfd, 0xcbdc, 0xfbbf, 0xeb9e, 0x9b79, 0x8b58, 0xbb3b, 0xab1a,
0x6ca6, 0x7c87, 0x4ce4, 0x5cc5, 0x2c22, 0x3c03, 0x0c60, 0x1c41,
0xedae, 0xfd8f, 0xcdec, 0xddcd, 0xad2a, 0xbd0b, 0x8d68, 0x9d49,
0x7e97, 0x6eb6, 0x5ed5, 0x4ef4, 0x3e13, 0x2e32, 0x1e51, 0x0e70,
0xff9f, 0xefbe, 0xdfdd, 0xcffc, 0xbf1b, 0xaf3a, 0x9f59, 0x8f78,
0x9188, 0x81a9, 0xb1ca, 0xa1eb, 0xd10c, 0xc12d, 0xf14e, 0xe16f,
0x1080, 0x00a1, 0x30c2, 0x20e3, 0x5004, 0x4025, 0x7046, 0x6067,
0x83b9, 0x9398, 0xa3fb, 0xb3da, 0xc33d, 0xd31c, 0xe37f, 0xf35e,
0x02b1, 0x1290, 0x22f3, 0x32d2, 0x4235, 0x5214, 0x6277, 0x7256,
0xb5ea, 0xa5cb, 0x95a8, 0x8589, 0xf56e, 0xe54f, 0xd52c, 0xc50d,
0x34e2, 0x24c3, 0x14a0, 0x0481, 0x7466, 0x6447, 0x5424, 0x4405,
0xa7db, 0xb7fa, 0x8799, 0x97b8, 0xe75f, 0xf77e, 0xc71d, 0xd73c,
0x26d3, 0x36f2, 0x0691, 0x16b0, 0x6657, 0x7676, 0x4615, 0x5634,
0xd94c, 0xc96d, 0xf90e, 0xe92f, 0x99c8, 0x89e9, 0xb98a, 0xa9ab,
0x5844, 0x4865, 0x7806, 0x6827, 0x18c0, 0x08e1, 0x3882, 0x28a3,
0xcb7d, 0xdb5c, 0xeb3f, 0xfb1e, 0x8bf9, 0x9bd8, 0xabbb, 0xbb9a,
0x4a75, 0x5a54, 0x6a37, 0x7a16, 0x0af1, 0x1ad0, 0x2ab3, 0x3a92,
0xfd2e, 0xed0f, 0xdd6c, 0xcd4d, 0xbdaa, 0xad8b, 0x9de8, 0x8dc9,
0x7c26, 0x6c07, 0x5c64, 0x4c45, 0x3ca2, 0x2c83, 0x1ce0, 0x0cc1,
0xef1f, 0xff3e, 0xcf5d, 0xdf7c, 0xaf9b, 0xbfba, 0x8fd9, 0x9ff8,
0x6e17, 0x7e36, 0x4e55, 0x5e74, 0x2e93, 0x3eb2, 0x0ed1, 0x1ef0,
);
# Compute the table-driven 16-bit CRC of a byte string, starting from 0.
#
# Dies if the string carries Perl's internal UTF-8 flag: the checksum is
# defined over bytes, so callers have to encode their strings first.
sub crc16 {
    my ($bytes) = @_;

    die "Can't compute crc16 for string with wide characters.\n"
      . "You should encode strings you pass to redis as bytes"
      if utf8::is_utf8($bytes);

    my $crc = 0;
    foreach my $code ( unpack 'C*', $bytes ) {

        # High byte of the running CRC mixed with the input byte selects the
        # table entry; the low byte of the CRC moves up into the high byte.
        my $index = ( ( $crc >> 8 ) ^ $code ) & 0x00FF;
        $crc = ( ( $crc << 8 ) & 0xFF00 ) ^ $crc16tab[$index];
    }

    return $crc;
}
# Compute the cluster slot (0 .. 16383) of a key.
#
# Hash tags: if the key contains a "{" that is followed somewhere by a "}",
# only the text between the FIRST "{" and the first "}" after it is hashed,
# provided that text is not empty. In every other case (no braces, or an
# empty "{}" at that position) the whole key is hashed. In particular
# "foo{}{bar}" is hashed as a whole, it is not reduced to "bar".
sub key_slot {
    my $key = shift;

    # [^}]* (rather than +) makes the match stop at the first "{", so an
    # empty tag there is seen and rejected instead of being skipped in
    # favour of a later "{...}".
    if ( $key =~ /\{([^}]*)\}/ and length $1 ) {
        $key = $1;
    }

    return crc16($key) & 16383;
}
1;