/*
* Copyright (c) 2015-2017, Ieshen Zheng <ieshen.zheng at 163 dot com>
* Copyright (c) 2020, Nick <heronr1 at gmail dot com>
* Copyright (c) 2020-2021, Bjorn Svensson <bjorn.a.svensson at est dot tech>
* Copyright (c) 2020-2021, Viktor Söderqvist <viktor.soderqvist at est dot tech>
* Copyright (c) 2021, Red Hat
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of Redis nor the names of its contributors may be used
* to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#define _XOPEN_SOURCE 600
#include <assert.h>
#include <ctype.h>
#include <errno.h>
#include <hiredis/alloc.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "adlist.h"
#include "command.h"
#include "dict.h"
#include "hiarray.h"
#include "hircluster.h"
#include "hiutil.h"
#include "win32.h"
// Cluster errors are offset by 100 to be sufficiently out of range of
// standard Redis errors
#define REDIS_ERR_CLUSTER_TOO_MANY_RETRIES 100
#define REDIS_ERROR_MOVED "MOVED"
#define REDIS_ERROR_ASK "ASK"
#define REDIS_ERROR_TRYAGAIN "TRYAGAIN"
#define REDIS_ERROR_CLUSTERDOWN "CLUSTERDOWN"
#define REDIS_STATUS_OK "OK"
#define REDIS_COMMAND_CLUSTER_NODES "CLUSTER NODES"
#define REDIS_COMMAND_CLUSTER_SLOTS "CLUSTER SLOTS"
#define REDIS_COMMAND_ASKING "ASKING"
#define IP_PORT_SEPARATOR ':'
#define PORT_CPORT_SEPARATOR '@'
#define CLUSTER_ADDRESS_SEPARATOR ","
#define CLUSTER_DEFAULT_MAX_RETRY_COUNT 5
#define NO_RETRY -1
#define CRLF "\x0d\x0a"
#define CRLF_LEN (sizeof("\x0d\x0a") - 1)
#define SLOTMAP_UPDATE_THROTTLE_USEC 1000000
#define SLOTMAP_UPDATE_ONGOING INT64_MAX
/* Groups an async cluster context with a command, a reply callback, a retry
 * counter and an opaque privdata pointer.
 * NOTE(review): presumably one instance per in-flight async command; the
 * users of this struct are outside this part of the file. */
typedef struct cluster_async_data {
redisClusterAsyncContext *acc;
struct cmd *command;
redisClusterCallbackFn *callback;
int retry_count;
void *privdata;
} cluster_async_data;
typedef enum CLUSTER_ERR_TYPE {
CLUSTER_NOT_ERR = 0,
CLUSTER_ERR_MOVED,
CLUSTER_ERR_ASK,
CLUSTER_ERR_TRYAGAIN,
CLUSTER_ERR_CLUSTERDOWN,
CLUSTER_ERR_SENTINEL
} CLUSTER_ERR_TYPE;
/* Forward declarations of static helpers so that they can be referenced
 * before their definitions. */
static void freeRedisClusterNode(redisClusterNode *node);
static void cluster_slot_destroy(cluster_slot *slot);
static void cluster_open_slot_destroy(copen_slot *oslot);
static int updateNodesAndSlotmap(redisClusterContext *cc, dict *nodes);
static int updateSlotMapAsync(redisClusterAsyncContext *acc,
redisAsyncContext *ac);
void listClusterNodeDestructor(void *val) { freeRedisClusterNode(val); }
void listClusterSlotDestructor(void *val) { cluster_slot_destroy(val); }
/* Dict hash callback for sds keys: hashes all bytes of the sds string. */
unsigned int dictSdsHash(const void *key) {
    size_t key_len = sdslen((char *)key);

    return dictGenHashFunction((unsigned char *)key, key_len);
}
/* Dict key-compare callback for sds keys.
 * Returns 1 when both keys have the same length and the same bytes,
 * otherwise 0. privdata is unused. */
int dictSdsKeyCompare(void *privdata, const void *key1, const void *key2) {
    DICT_NOTUSED(privdata);

    int len_a = sdslen((sds)key1);
    int len_b = sdslen((sds)key2);

    /* Different lengths can never match; only then compare the bytes. */
    return (len_a == len_b) && (memcmp(key1, key2, len_a) == 0);
}
void dictSdsDestructor(void *privdata, void *val) {
DICT_NOTUSED(privdata);
sdsfree(val);
}
void dictClusterNodeDestructor(void *privdata, void *val) {
DICT_NOTUSED(privdata);
freeRedisClusterNode(val);
}
/* Cluster node hash table
 * Maps a node address (1.2.3.4:6379) to a redisClusterNode.
 * Has ownership of the redisClusterNode memory: values are released through
 * dictClusterNodeDestructor and the sds keys through dictSdsDestructor.
 */
dictType clusterNodesDictType = {
dictSdsHash, /* hash function */
NULL, /* key dup */
NULL, /* val dup */
dictSdsKeyCompare, /* key compare */
dictSdsDestructor, /* key destructor */
dictClusterNodeDestructor /* val destructor */
};
/* Referenced cluster node hash table
 * Maps a node id (437c719f5.....) to a redisClusterNode.
 * No ownership of the redisClusterNode memory: there is no value destructor,
 * only the sds keys are freed (through dictSdsDestructor).
 */
dictType clusterNodesRefDictType = {
dictSdsHash, /* hash function */
NULL, /* key dup */
NULL, /* val dup */
dictSdsKeyCompare, /* key compare */
dictSdsDestructor, /* key destructor */
NULL /* val destructor */
};
/* List value destructor: destroys a struct cmd held by a list. */
void listCommandFree(void *command) {
    command_destroy((struct cmd *)command);
}
/* -----------------------------------------------------------------------------
* Key space handling
* -------------------------------------------------------------------------- */
/* Compute the hash slot (0..16383) of a key: the 14 least significant bits
 * of the crc16 of the key.
 *
 * When the key contains a non-empty {...} section, only the bytes between the
 * first '{' and the first '}' following it are hashed. This allows forcing
 * several keys into the same slot. In every other case (no '{', no matching
 * '}', or an empty "{}") the whole key is hashed. */
static unsigned int keyHashSlot(char *key, int keylen) {
    int lbrace = 0;
    int rbrace;

    /* Locate the first '{'. */
    while (lbrace < keylen && key[lbrace] != '{') {
        lbrace++;
    }
    if (lbrace == keylen) {
        /* Base case: no hash tag, hash the whole key. */
        return crc16(key, keylen) & 0x3FFF;
    }

    /* Locate the first '}' to the right of the '{'. */
    rbrace = lbrace + 1;
    while (rbrace < keylen && key[rbrace] != '}') {
        rbrace++;
    }
    if (rbrace == keylen || rbrace == lbrace + 1) {
        /* No '}' or nothing between {}: hash the whole key. */
        return crc16(key, keylen) & 0x3FFF;
    }

    /* Hash only what is enclosed by the braces. */
    return crc16(key + lbrace + 1, rbrace - lbrace - 1) & 0x3FFF;
}
/* Store an error code and its description in the cluster context.
 *
 * cc   - context to update; nothing happens when NULL.
 * type - error code stored in cc->err.
 * str  - description copied (and truncated if needed) into cc->errstr.
 *        May only be NULL for REDIS_ERR_IO, in which case the description
 *        is derived from errno. */
static void __redisClusterSetError(redisClusterContext *cc, int type,
                                   const char *str) {
    if (cc == NULL) {
        return;
    }

    cc->err = type;

    if (str == NULL) {
        /* Only REDIS_ERR_IO may lack a description! */
        assert(type == REDIS_ERR_IO);
        strerror_r(errno, cc->errstr, sizeof(cc->errstr));
        return;
    }

    /* Copy at most what fits, always leaving room for the terminator. */
    const size_t capacity = sizeof(cc->errstr) - 1;
    size_t len = strlen(str);
    if (len > capacity) {
        len = capacity;
    }
    memcpy(cc->errstr, str, len);
    cc->errstr[len] = '\0';
}
/* Classify a reply.
 *
 * Returns REDIS_ERR when reply is NULL, CLUSTER_NOT_ERR when the reply is not
 * an error reply, one of CLUSTER_ERR_MOVED / CLUSTER_ERR_ASK /
 * CLUSTER_ERR_TRYAGAIN / CLUSTER_ERR_CLUSTERDOWN when the error text starts
 * with the corresponding prefix (and is longer than it), and
 * CLUSTER_ERR_SENTINEL for any other error reply. */
static int cluster_reply_error_type(redisReply *reply) {
    static const struct {
        const char *prefix;
        int type;
    } known_errors[] = {
        {REDIS_ERROR_MOVED, CLUSTER_ERR_MOVED},
        {REDIS_ERROR_ASK, CLUSTER_ERR_ASK},
        {REDIS_ERROR_TRYAGAIN, CLUSTER_ERR_TRYAGAIN},
        {REDIS_ERROR_CLUSTERDOWN, CLUSTER_ERR_CLUSTERDOWN},
    };
    const size_t known_count = sizeof(known_errors) / sizeof(known_errors[0]);

    if (reply == NULL) {
        return REDIS_ERR;
    }
    if (reply->type != REDIS_REPLY_ERROR) {
        return CLUSTER_NOT_ERR;
    }

    for (size_t i = 0; i < known_count; i++) {
        const char *prefix = known_errors[i].prefix;
        size_t prefix_len = strlen(prefix);

        /* The reply must be strictly longer than the prefix to match. */
        if ((int)prefix_len < reply->len &&
            memcmp(reply->str, prefix, prefix_len) == 0) {
            return known_errors[i].type;
        }
    }
    return CLUSTER_ERR_SENTINEL;
}
/* Allocate a cluster node structure with every field zeroed.
 * Returns NULL when the allocation fails. */
static redisClusterNode *createRedisClusterNode(void) {
    /* calloc guarantees that all fields start out as zero/NULL */
    redisClusterNode *node = hi_calloc(1, sizeof(*node));

    return node;
}
/* Release a cluster node and everything it owns: its strings, its sync and
 * async hiredis contexts, its slot and slave lists and its open slots.
 * Passing NULL is a no-op. */
static void freeRedisClusterNode(redisClusterNode *node) {
    if (node == NULL) {
        return;
    }

    sdsfree(node->name);
    sdsfree(node->addr);
    sdsfree(node->host);

    redisFree(node->con);
    if (node->acon != NULL) {
        /* Detach this cluster node from the async context first, so that
         * redisAsyncFree() won't attempt to update the pointer via its
         * dataCleanup and unlinkAsyncContextAndNode() */
        node->acon->data = NULL;
        redisAsyncFree(node->acon);
    }

    if (node->slots != NULL) {
        listRelease(node->slots);
    }
    if (node->slaves != NULL) {
        listRelease(node->slaves);
    }

    /* The open slot arrays store pointers; destroy each element before
     * destroying the array itself. */
    if (node->migrating) {
        while (hiarray_n(node->migrating)) {
            copen_slot **elem = hiarray_pop(node->migrating);
            cluster_open_slot_destroy(*elem);
        }
        hiarray_destroy(node->migrating);
    }
    if (node->importing) {
        while (hiarray_n(node->importing)) {
            copen_slot **elem = hiarray_pop(node->importing);
            cluster_open_slot_destroy(*elem);
        }
        hiarray_destroy(node->importing);
    }

    hi_free(node);
}
/* Allocate a zeroed slot range and, when node is not NULL, append it to the
 * node's slot list (creating the list on demand). The node must be a master.
 * Returns the new slot, or NULL on allocation failure. */
static cluster_slot *cluster_slot_create(redisClusterNode *node) {
    cluster_slot *slot = hi_calloc(1, sizeof(*slot));
    if (slot == NULL) {
        return NULL;
    }
    slot->node = node;

    /* A slot without a node is handed back unattached. */
    if (node == NULL) {
        return slot;
    }

    ASSERT(node->role == REDIS_ROLE_MASTER);

    if (node->slots == NULL) {
        node->slots = listCreate();
        if (node->slots == NULL) {
            goto fail;
        }
        node->slots->free = listClusterSlotDestructor;
    }
    if (listAddNodeTail(node->slots, slot) == NULL) {
        goto fail;
    }
    return slot;

fail:
    cluster_slot_destroy(slot);
    return NULL;
}
/* Attach an existing slot range to a master node: the slot is appended to the
 * node's slot list (created on demand) and slot->node is set.
 * Returns REDIS_OK on success; REDIS_ERR when an argument is NULL, the node
 * is not a master, or an allocation fails (the slot is then left unattached
 * and still belongs to the caller). */
static int cluster_slot_ref_node(cluster_slot *slot, redisClusterNode *node) {
    if (slot == NULL || node == NULL || node->role != REDIS_ROLE_MASTER) {
        return REDIS_ERR;
    }

    list *slots = node->slots;
    if (slots == NULL) {
        slots = listCreate();
        if (slots == NULL) {
            return REDIS_ERR;
        }
        slots->free = listClusterSlotDestructor;
        node->slots = slots;
    }

    if (listAddNodeTail(slots, slot) == NULL) {
        return REDIS_ERR;
    }

    slot->node = node;
    return REDIS_OK;
}
/* Release a slot range. The fields are cleared before the memory is freed.
 * Passing NULL is a no-op, matching freeRedisClusterNode(). */
static void cluster_slot_destroy(cluster_slot *slot) {
    if (slot == NULL) {
        return;
    }
    slot->start = 0;
    slot->end = 0;
    slot->node = NULL;
    hi_free(slot);
}
/* Allocate an open (migrating or importing) slot description.
 *
 * slot_num    - the slot number.
 * migrate     - stored as-is in the migrate field.
 * remote_name - name of the other node; duplicated, the caller keeps its copy.
 * node        - node the open slot belongs to (not owned).
 *
 * Returns the new object or NULL on allocation failure. */
static copen_slot *cluster_open_slot_create(uint32_t slot_num, int migrate,
                                            sds remote_name,
                                            redisClusterNode *node) {
    copen_slot *oslot = hi_calloc(1, sizeof(*oslot));
    if (oslot == NULL) {
        return NULL;
    }

    oslot->remote_name = sdsdup(remote_name);
    if (oslot->remote_name == NULL) {
        hi_free(oslot);
        return NULL;
    }

    oslot->slot_num = slot_num;
    oslot->migrate = migrate;
    oslot->node = node;
    return oslot;
}
/* Release an open slot description together with its remote_name string.
 * Passing NULL is a no-op, matching freeRedisClusterNode(). */
static void cluster_open_slot_destroy(copen_slot *oslot) {
    if (oslot == NULL) {
        return;
    }
    oslot->slot_num = 0;
    oslot->migrate = 0;
    oslot->node = NULL;
    sdsfree(oslot->remote_name);
    oslot->remote_name = NULL;
    hi_free(oslot);
}
/**
 * Handle password authentication in the synchronous API.
 *
 * Sends AUTH (with the username when one is configured) on context c.
 * Returns REDIS_OK when no password is configured or the server accepted the
 * credentials. Returns REDIS_ERR when an argument is NULL, no reply was
 * received, or the server answered with an error; in the latter two cases the
 * error is stored in cc.
 */
static int authenticate(redisClusterContext *cc, redisContext *c) {
    if (cc == NULL || c == NULL) {
        return REDIS_ERR;
    }

    // Nothing to do without a configured password
    if (cc->password == NULL) {
        return REDIS_OK;
    }

    redisReply *reply =
        (cc->username != NULL)
            ? redisCommand(c, "AUTH %s %s", cc->username, cc->password)
            : redisCommand(c, "AUTH %s", cc->password);

    int status = REDIS_OK;
    if (reply == NULL) {
        __redisClusterSetError(cc, REDIS_ERR_OTHER,
                               "Command AUTH reply error (NULL)");
        status = REDIS_ERR;
    } else if (reply->type == REDIS_REPLY_ERROR) {
        __redisClusterSetError(cc, REDIS_ERR_OTHER, reply->str);
        status = REDIS_ERR;
    }

    freeReplyObject(reply);
    return status;
}
/**
 * Return a new node with the "cluster slots" command reply.
 *
 * host_elem - reply element holding the node host (non-empty string).
 * port_elem - reply element holding the node port (valid, positive integer).
 * role      - role stored in the node; a master gets an empty slot list.
 *
 * The returned node has addr ("<host>:<port>"), host, port and role set and
 * is owned by the caller. Returns NULL when an element is NULL (no error is
 * set in that case), when an element is malformed, or on out of memory; the
 * latter two store an error in cc.
 */
static redisClusterNode *node_get_with_slots(redisClusterContext *cc,
                                             redisReply *host_elem,
                                             redisReply *port_elem,
                                             uint8_t role) {
    redisClusterNode *node = NULL;

    if (host_elem == NULL || port_elem == NULL) {
        return NULL;
    }

    if (host_elem->type != REDIS_REPLY_STRING || host_elem->len <= 0) {
        __redisClusterSetError(cc, REDIS_ERR_OTHER,
                               "Command(cluster slots) reply error: "
                               "node ip is not string.");
        goto error;
    }

    if (port_elem->type != REDIS_REPLY_INTEGER || port_elem->integer <= 0) {
        __redisClusterSetError(cc, REDIS_ERR_OTHER,
                               "Command(cluster slots) reply error: "
                               "node port is not integer.");
        goto error;
    }

    if (!hi_valid_port((int)port_elem->integer)) {
        __redisClusterSetError(cc, REDIS_ERR_OTHER,
                               "Command(cluster slots) reply error: "
                               "node port is not valid.");
        goto error;
    }

    node = createRedisClusterNode();
    if (node == NULL) {
        goto oom;
    }

    if (role == REDIS_ROLE_MASTER) {
        node->slots = listCreate();
        if (node->slots == NULL) {
            goto oom;
        }
        node->slots->free = listClusterSlotDestructor;
    }

    node->addr = sdsnewlen(host_elem->str, host_elem->len);
    if (node->addr == NULL) {
        goto oom;
    }
    /* "%i" consumes an int while the reply integer is a long long; the port
     * was range-checked above, so the narrowing cast is safe. */
    node->addr = sdscatfmt(node->addr, ":%i", (int)port_elem->integer);
    if (node->addr == NULL) {
        goto oom;
    }

    node->host = sdsnewlen(host_elem->str, host_elem->len);
    if (node->host == NULL) {
        goto oom;
    }

    node->name = NULL;
    node->port = (int)port_elem->integer;
    node->role = role;

    return node;

oom:
    __redisClusterSetError(cc, REDIS_ERR_OOM, "Out of memory");
    // passthrough

error:
    /* Releases everything the node owns, including the slot list that may
     * already have been created for a master. NULL is accepted. */
    freeRedisClusterNode(node);
    return NULL;
}
/**
 * Return a new node with the "cluster nodes" command reply.
 *
 * node_infos - the space separated fields of one reply line. Ownership of the
 *              <id> and <ip:port@cport> fields (index 0 and 1) is moved to
 *              the node; those entries are set to NULL.
 * info_count - number of fields; at least 8 are required.
 * role       - role stored in the node; a master gets an empty slot list.
 *
 * Returns NULL when there are too few fields (no error is set), when the
 * address lacks a port separator, or on out of memory.
 */
static redisClusterNode *node_get_with_nodes(redisClusterContext *cc,
                                             sds *node_infos, int info_count,
                                             uint8_t role) {
    redisClusterNode *node = NULL;
    char *sep = NULL;

    if (info_count < 8) {
        return NULL;
    }

    node = createRedisClusterNode();
    if (node == NULL) {
        goto oom;
    }

    if (role == REDIS_ROLE_MASTER) {
        node->slots = listCreate();
        if (node->slots == NULL) {
            goto oom;
        }
        node->slots->free = listClusterSlotDestructor;
    }

    /* Field <id>: taken over as the node name. */
    node->name = node_infos[0];
    node_infos[0] = NULL; /* Ownership moved */

    /* Field <ip:port@cport...>: cut off "@cport..." since addr is used as a
     * dict key which should be <ip>:<port>. */
    sep = strchr(node_infos[1], PORT_CPORT_SEPARATOR);
    if (sep != NULL) {
        sdsrange(node_infos[1], 0, sep - node_infos[1] - 1 /* skip @ */);
    }
    node->addr = node_infos[1];
    node_infos[1] = NULL; /* Ownership moved */

    node->role = role;

    /* The host is everything in front of the last ':'. */
    sep = strrchr(node->addr, IP_PORT_SEPARATOR);
    if (sep == NULL) {
        __redisClusterSetError(
            cc, REDIS_ERR_OTHER,
            "server address is incorrect, port separator missing.");
        goto error;
    }
    node->host = sdsnewlen(node->addr, sep - node->addr);
    if (node->host == NULL) {
        goto oom;
    }

    /* The port is everything behind the separator. */
    sep++;
    node->port = hi_atoi(sep, strlen(sep));

    return node;

oom:
    __redisClusterSetError(cc, REDIS_ERR_OOM, "Out of memory");
    // passthrough

error:
    freeRedisClusterNode(node);
    return NULL;
}
/* For every node in nodes_t that has a node with the same address in
 * nodes_f, exchange the hiredis contexts of the two nodes when the node in
 * nodes_f has one. This carries established connections from nodes_f over to
 * nodes_t. The data pointer of each swapped async context is updated to point
 * at its new node. Does nothing when either dict is NULL. */
static void cluster_nodes_swap_ctx(dict *nodes_f, dict *nodes_t) {
    if (nodes_f == NULL || nodes_t == NULL) {
        return;
    }

    dictIterator iter;
    dictInitIterator(&iter, nodes_t);

    for (dictEntry *entry_t = dictNext(&iter); entry_t != NULL;
         entry_t = dictNext(&iter)) {
        redisClusterNode *to = dictGetEntryVal(entry_t);
        if (to == NULL) {
            continue;
        }

        dictEntry *entry_f = dictFind(nodes_f, to->addr);
        if (entry_f == NULL) {
            continue;
        }
        redisClusterNode *from = dictGetEntryVal(entry_f);

        /* Synchronous context */
        if (from->con != NULL) {
            redisContext *held = from->con;
            from->con = to->con;
            to->con = held;
        }

        /* Asynchronous context: also re-link the context to its new node. */
        if (from->acon != NULL) {
            redisAsyncContext *held = from->acon;
            from->acon = to->acon;
            to->acon = held;

            to->acon->data = to;
            if (from->acon) {
                from->acon->data = from;
            }
        }
    }
}
/* Register a node under its master's name and link masters with their slaves.
 *
 * nodes       - in/out: name-to-node dict using clusterNodesRefDictType (no
 *               ownership of the values); created on first use when *nodes
 *               is NULL.
 * node        - the node to register (master or slave).
 * master_name - the name used as the dict key.
 *
 * When the name is not yet known, the node is stored under a copy of the
 * name. Otherwise the outcome depends on the roles of the new node and the
 * node already stored:
 *  - both are masters: error "two masters have the same name";
 *  - new node is a master, stored node is a slave: the master takes over the
 *    stored node's slaves and the stored node itself as its slaves, and
 *    replaces the stored node in the dict;
 *  - new node is a slave: it is appended to the stored node's slave list.
 *
 * Returns REDIS_OK on success, REDIS_ERR on invalid arguments (no error is
 * set for those), on the conflicts above, or on out of memory. */
static int cluster_master_slave_mapping_with_name(redisClusterContext *cc,
dict **nodes,
redisClusterNode *node,
sds master_name) {
int ret;
dictEntry *di;
redisClusterNode *node_old;
listNode *lnode;
if (node == NULL || master_name == NULL) {
return REDIS_ERR;
}
/* Create the name dict lazily. */
if (*nodes == NULL) {
*nodes = dictCreate(&clusterNodesRefDictType, NULL);
if (*nodes == NULL) {
goto oom;
}
}
di = dictFind(*nodes, master_name);
if (di == NULL) {
/* First node seen with this name: store it under a copy of the name. */
sds key = sdsnewlen(master_name, sdslen(master_name));
if (key == NULL) {
goto oom;
}
ret = dictAdd(*nodes, key, node);
if (ret != DICT_OK) {
sdsfree(key);
goto oom;
}
} else {
node_old = dictGetEntryVal(di);
if (node_old == NULL) {
__redisClusterSetError(cc, REDIS_ERR_OTHER, "dict get value null");
return REDIS_ERR;
}
if (node->role == REDIS_ROLE_MASTER &&
node_old->role == REDIS_ROLE_MASTER) {
__redisClusterSetError(cc, REDIS_ERR_OTHER,
"two masters have the same name");
return REDIS_ERR;
} else if (node->role == REDIS_ROLE_MASTER &&
node_old->role == REDIS_ROLE_SLAVE) {
/* A slave was stored as placeholder before its master was seen. */
if (node->slaves == NULL) {
node->slaves = listCreate();
if (node->slaves == NULL) {
goto oom;
}
node->slaves->free = listClusterNodeDestructor;
}
if (node_old->slaves != NULL) {
/* Move the values to the master's list; clearing the free callback
 * keeps listDelNode()/listRelease() from freeing the moved nodes. */
node_old->slaves->free = NULL;
while (listLength(node_old->slaves) > 0) {
lnode = listFirst(node_old->slaves);
if (listAddNodeHead(node->slaves, lnode->value) == NULL) {
goto oom;
}
listDelNode(node_old->slaves, lnode);
}
listRelease(node_old->slaves);
node_old->slaves = NULL;
}
/* The placeholder slave itself becomes a slave of the master. */
if (listAddNodeHead(node->slaves, node_old) == NULL) {
goto oom;
}
/* From now on the name refers to the master. */
dictSetHashVal(*nodes, di, node);
} else if (node->role == REDIS_ROLE_SLAVE) {
/* Append the slave to the stored node's slave list. */
if (node_old->slaves == NULL) {
node_old->slaves = listCreate();
if (node_old->slaves == NULL) {
goto oom;
}
node_old->slaves->free = listClusterNodeDestructor;
}
if (listAddNodeTail(node_old->slaves, node) == NULL) {
goto oom;
}
} else {
NOT_REACHED();
}
}
return REDIS_OK;
oom:
__redisClusterSetError(cc, REDIS_ERR_OOM, "Out of memory");
return REDIS_ERR;
}
/**
 * Parse the "cluster slots" command reply to nodes dict.
 *
 * reply - the reply: an array with one element per slot range, each being an
 *         array of <start>, <end>, <master>, [<replica>...], where every node
 *         is an array starting with <ip> and <port>.
 * flags - replicas are only added to their master when
 *         HIRCLUSTER_FLAG_ADD_SLAVE is set.
 *
 * Returns a dict (clusterNodesDictType) mapping "<ip>:<port>" to the master
 * nodes, which the caller owns. Returns NULL when reply is NULL (no error is
 * set), when the reply is malformed, or on out of memory.
 */
dict *parse_cluster_slots(redisClusterContext *cc, redisReply *reply,
                          int flags) {
    int ret;
    cluster_slot *slot = NULL;
    dict *nodes = NULL;
    dictEntry *den;
    redisReply *elem_slots;
    redisReply *elem_slots_begin, *elem_slots_end;
    redisReply *elem_nodes;
    redisReply *elem_ip, *elem_port;
    redisClusterNode *master = NULL, *slave;
    uint32_t i, idx;

    if (reply == NULL) {
        return NULL;
    }

    nodes = dictCreate(&clusterNodesDictType, NULL);
    if (nodes == NULL) {
        goto oom;
    }

    if (reply->type != REDIS_REPLY_ARRAY || reply->elements <= 0) {
        __redisClusterSetError(cc, REDIS_ERR_OTHER,
                               "Command(cluster slots) reply error: "
                               "reply is not an array.");
        goto error;
    }

    for (i = 0; i < reply->elements; i++) {
        elem_slots = reply->element[i];
        if (elem_slots->type != REDIS_REPLY_ARRAY || elem_slots->elements < 3) {
            __redisClusterSetError(cc, REDIS_ERR_OTHER,
                                   "Command(cluster slots) reply error: "
                                   "first sub_reply is not an array.");
            goto error;
        }

        /* The slot stays owned by this function until it has been attached
         * to its master, after which `slot` is reset to NULL. */
        slot = cluster_slot_create(NULL);
        if (slot == NULL) {
            goto oom;
        }

        // one slots region
        for (idx = 0; idx < elem_slots->elements; idx++) {
            if (idx == 0) {
                elem_slots_begin = elem_slots->element[idx];
                if (elem_slots_begin->type != REDIS_REPLY_INTEGER) {
                    __redisClusterSetError(
                        cc, REDIS_ERR_OTHER,
                        "Command(cluster slots) reply error: "
                        "slot begin is not an integer.");
                    goto error;
                }
                slot->start = (int)(elem_slots_begin->integer);
            } else if (idx == 1) {
                elem_slots_end = elem_slots->element[idx];
                if (elem_slots_end->type != REDIS_REPLY_INTEGER) {
                    __redisClusterSetError(
                        cc, REDIS_ERR_OTHER,
                        "Command(cluster slots) reply error: "
                        "slot end is not an integer.");
                    goto error;
                }

                slot->end = (int)(elem_slots_end->integer);

                if (slot->start > slot->end) {
                    __redisClusterSetError(
                        cc, REDIS_ERR_OTHER,
                        "Command(cluster slots) reply error: "
                        "slot begin is bigger than slot end.");
                    goto error;
                }
            } else {
                elem_nodes = elem_slots->element[idx];
                if (elem_nodes->type != REDIS_REPLY_ARRAY ||
                    elem_nodes->elements < 2) {
                    __redisClusterSetError(
                        cc, REDIS_ERR_OTHER,
                        "Command(cluster slots) reply error: "
                        "nodes sub_reply is not an correct array.");
                    goto error;
                }

                elem_ip = elem_nodes->element[0];
                elem_port = elem_nodes->element[1];

                if (elem_ip == NULL || elem_port == NULL ||
                    elem_ip->type != REDIS_REPLY_STRING ||
                    elem_port->type != REDIS_REPLY_INTEGER) {
                    __redisClusterSetError(
                        cc, REDIS_ERR_OTHER,
                        "Command(cluster slots) reply error: "
                        "master ip or port is not correct.");
                    goto error;
                }

                // this is master.
                if (idx == 2) {
                    sds address = sdsnewlen(elem_ip->str, elem_ip->len);
                    if (address == NULL) {
                        goto oom;
                    }
                    /* The reply integer is a long long, which requires the
                     * 64 bit "%I" specifier ("%i" consumes an int). */
                    address = sdscatfmt(address, ":%I", elem_port->integer);
                    if (address == NULL) {
                        goto oom;
                    }

                    den = dictFind(nodes, address);
                    sdsfree(address);
                    // master already exists, break to the next slots region.
                    if (den != NULL) {
                        master = dictGetEntryVal(den);
                        ret = cluster_slot_ref_node(slot, master);
                        if (ret != REDIS_OK) {
                            goto oom;
                        }

                        slot = NULL;
                        break;
                    }

                    master = node_get_with_slots(cc, elem_ip, elem_port,
                                                 REDIS_ROLE_MASTER);
                    if (master == NULL) {
                        goto error;
                    }

                    sds key = sdsnewlen(master->addr, sdslen(master->addr));
                    if (key == NULL) {
                        freeRedisClusterNode(master);
                        goto oom;
                    }

                    /* On success the dict owns the master. */
                    ret = dictAdd(nodes, key, master);
                    if (ret != DICT_OK) {
                        sdsfree(key);
                        freeRedisClusterNode(master);
                        goto oom;
                    }

                    ret = cluster_slot_ref_node(slot, master);
                    if (ret != REDIS_OK) {
                        goto oom;
                    }

                    slot = NULL;
                } else if (flags & HIRCLUSTER_FLAG_ADD_SLAVE) {
                    slave = node_get_with_slots(cc, elem_ip, elem_port,
                                                REDIS_ROLE_SLAVE);
                    if (slave == NULL) {
                        goto error;
                    }

                    if (master->slaves == NULL) {
                        master->slaves = listCreate();
                        if (master->slaves == NULL) {
                            freeRedisClusterNode(slave);
                            goto oom;
                        }

                        master->slaves->free = listClusterNodeDestructor;
                    }

                    if (listAddNodeTail(master->slaves, slave) == NULL) {
                        freeRedisClusterNode(slave);
                        goto oom;
                    }
                }
            }
        }
    }

    return nodes;

oom:
    __redisClusterSetError(cc, REDIS_ERR_OOM, "Out of memory");
    // passthrough

error:
    if (nodes != NULL) {
        dictRelease(nodes);
    }
    /* Only a slot that was not yet attached to a master is still ours. */
    if (slot != NULL) {
        cluster_slot_destroy(slot);
    }
    return NULL;
}
/**
 * Parse the "cluster nodes" command reply to nodes dict.
 *
 * str, str_len - the reply text. Only lines terminated by '\n' are parsed.
 *                Each line is split on spaces into at least 8 fields, of
 *                which this function uses: [0] id, [1] address, [2] flags
 *                (role), [3] master id, [8..] slots.
 * flags        - HIRCLUSTER_FLAG_ADD_SLAVE also registers slave nodes with
 *                their master; HIRCLUSTER_FLAG_ADD_OPENSLOT records
 *                "[slot->-id]" (migrating) and "[slot-<-id]" (importing)
 *                entries on the master.
 *
 * Lines whose address starts with ":0" are skipped. Slot entries that are
 * not a valid slot or slot range are ignored.
 *
 * Returns a dict (clusterNodesDictType) mapping "<ip>:<port>" to the master
 * nodes, which the caller owns. Returns NULL when the reply is malformed,
 * contains no slot range at all, or on out of memory.
 */
dict *parse_cluster_nodes(redisClusterContext *cc, char *str, int str_len,
                          int flags) {
    int ret;
    dict *nodes = NULL;
    dict *nodes_name = NULL;
    redisClusterNode *master, *slave;
    cluster_slot *slot;
    char *pos, *start, *end, *line_start, *line_end;
    char *role;
    int role_len;
    int slot_start, slot_end, slot_ranges_found = 0;
    sds *part = NULL, *slot_start_end = NULL;
    int count_part = 0, count_slot_start_end = 0;
    int k;
    int len;

    nodes = dictCreate(&clusterNodesDictType, NULL);
    if (nodes == NULL) {
        goto oom;
    }

    start = str;
    end = start + str_len;

    line_start = start;

    for (pos = start; pos < end; pos++) {
        if (*pos == '\n') {
            line_end = pos - 1;
            len = line_end - line_start;

            part = sdssplitlen(line_start, len + 1, " ", 1, &count_part);
            if (part == NULL) {
                goto oom;
            }

            if (count_part < 8) {
                __redisClusterSetError(cc, REDIS_ERR_OTHER,
                                       "split cluster nodes error");
                goto error;
            }

            // if the address string starts with ":0", skip this node.
            if (sdslen(part[1]) >= 2 && memcmp(part[1], ":0", 2) == 0) {
                sdsfreesplitres(part, count_part);
                count_part = 0;
                part = NULL;

                start = pos + 1;
                line_start = start;
                pos = start;

                continue;
            }

            /* The role follows an optional "myself," prefix in the flags. */
            if (sdslen(part[2]) >= 7 && memcmp(part[2], "myself,", 7) == 0) {
                role_len = sdslen(part[2]) - 7;
                role = part[2] + 7;
            } else {
                role_len = sdslen(part[2]);
                role = part[2];
            }

            // add master node
            if (role_len >= 6 && memcmp(role, "master", 6) == 0) {
                master = node_get_with_nodes(cc, part, count_part,
                                             REDIS_ROLE_MASTER);
                if (master == NULL) {
                    goto error;
                }

                sds key = sdsnewlen(master->addr, sdslen(master->addr));
                if (key == NULL) {
                    freeRedisClusterNode(master);
                    goto oom;
                }

                ret = dictAdd(nodes, key, master);
                if (ret != DICT_OK) {
                    // Key already exists, but possibly an OOM error
                    __redisClusterSetError(
                        cc, REDIS_ERR_OTHER,
                        "The address already exists in the nodes");
                    sdsfree(key);
                    freeRedisClusterNode(master);
                    goto error;
                }

                if (flags & HIRCLUSTER_FLAG_ADD_SLAVE) {
                    ret = cluster_master_slave_mapping_with_name(
                        cc, &nodes_name, master, master->name);
                    if (ret != REDIS_OK) {
                        /* The master is owned by `nodes` since dictAdd()
                         * succeeded; dictRelease(nodes) in the error path
                         * frees it. Freeing it here as well would be a
                         * double free. */
                        goto error;
                    }
                }

                /* Fields from index 8 describe slots, slot ranges and open
                 * slots of this master. */
                for (k = 8; k < count_part; k++) {
                    slot_start_end = sdssplitlen(part[k], sdslen(part[k]), "-",
                                                 1, &count_slot_start_end);

                    if (slot_start_end == NULL) {
                        goto oom;
                    }

                    if (count_slot_start_end == 1) {
                        slot_start = hi_atoi(slot_start_end[0],
                                             sdslen(slot_start_end[0]));
                        slot_end = slot_start;
                    } else if (count_slot_start_end == 2) {
                        slot_start = hi_atoi(slot_start_end[0],
                                             sdslen(slot_start_end[0]));
                        slot_end = hi_atoi(slot_start_end[1],
                                           sdslen(slot_start_end[1]));
                    } else {
                        // add open slot for master
                        if (flags & HIRCLUSTER_FLAG_ADD_OPENSLOT &&
                            count_slot_start_end == 3 &&
                            sdslen(slot_start_end[0]) > 1 &&
                            sdslen(slot_start_end[1]) == 1 &&
                            sdslen(slot_start_end[2]) > 1 &&
                            slot_start_end[0][0] == '[' &&
                            slot_start_end[2][sdslen(slot_start_end[2]) - 1] ==
                                ']') {

                            copen_slot *oslot, **oslot_elem;

                            /* Strip the enclosing '[' and ']'. */
                            sdsrange(slot_start_end[0], 1, -1);
                            sdsrange(slot_start_end[2], 0, -2);

                            if (slot_start_end[1][0] == '>') {
                                oslot = cluster_open_slot_create(
                                    hi_atoi(slot_start_end[0],
                                            sdslen(slot_start_end[0])),
                                    1, slot_start_end[2], master);
                                if (oslot == NULL) {
                                    __redisClusterSetError(
                                        cc, REDIS_ERR_OTHER,
                                        "create open slot error");
                                    goto error;
                                }

                                if (master->migrating == NULL) {
                                    master->migrating =
                                        hiarray_create(1, sizeof(oslot));
                                    if (master->migrating == NULL) {
                                        cluster_open_slot_destroy(oslot);
                                        goto oom;
                                    }
                                }

                                oslot_elem = hiarray_push(master->migrating);
                                if (oslot_elem == NULL) {
                                    cluster_open_slot_destroy(oslot);
                                    goto oom;
                                }

                                *oslot_elem = oslot;
                            } else if (slot_start_end[1][0] == '<') {
                                oslot = cluster_open_slot_create(
                                    hi_atoi(slot_start_end[0],
                                            sdslen(slot_start_end[0])),
                                    0, slot_start_end[2], master);
                                if (oslot == NULL) {
                                    __redisClusterSetError(
                                        cc, REDIS_ERR_OTHER,
                                        "create open slot error");
                                    goto error;
                                }

                                if (master->importing == NULL) {
                                    master->importing =
                                        hiarray_create(1, sizeof(oslot));
                                    if (master->importing == NULL) {
                                        cluster_open_slot_destroy(oslot);
                                        goto oom;
                                    }
                                }

                                oslot_elem = hiarray_push(master->importing);
                                if (oslot_elem == NULL) {
                                    cluster_open_slot_destroy(oslot);
                                    goto oom;
                                }

                                *oslot_elem = oslot;
                            }
                        }

                        /* Not a plain slot or range: skipped below. */
                        slot_start = -1;
                        slot_end = -1;
                    }

                    sdsfreesplitres(slot_start_end, count_slot_start_end);
                    count_slot_start_end = 0;
                    slot_start_end = NULL;

                    if (slot_start < 0 || slot_end < 0 ||
                        slot_start > slot_end ||
                        slot_end >= REDIS_CLUSTER_SLOTS) {
                        continue;
                    }
                    slot_ranges_found += 1;

                    /* The new slot is appended to the master's slot list. */
                    slot = cluster_slot_create(master);
                    if (slot == NULL) {
                        goto oom;
                    }

                    slot->start = (uint32_t)slot_start;
                    slot->end = (uint32_t)slot_end;
                }

            }
            // add slave node
            else if ((flags & HIRCLUSTER_FLAG_ADD_SLAVE) &&
                     (role_len >= 5 && memcmp(role, "slave", 5) == 0)) {
                slave =
                    node_get_with_nodes(cc, part, count_part, REDIS_ROLE_SLAVE);
                if (slave == NULL) {
                    goto error;
                }

                ret = cluster_master_slave_mapping_with_name(cc, &nodes_name,
                                                             slave, part[3]);
                if (ret != REDIS_OK) {
                    /* A failed mapping did not store the slave anywhere, so
                     * it is still owned here. */
                    freeRedisClusterNode(slave);
                    goto error;
                }
            }

            sdsfreesplitres(part, count_part);
            count_part = 0;
            part = NULL;

            start = pos + 1;
            line_start = start;
            pos = start;
        }
    }

    if (slot_ranges_found == 0) {
        __redisClusterSetError(cc, REDIS_ERR_OTHER, "No slot information");
        goto error;
    }

    /* The name dict only references nodes and is no longer needed. */
    if (nodes_name != NULL) {
        dictRelease(nodes_name);
    }

    return nodes;

oom:
    __redisClusterSetError(cc, REDIS_ERR_OOM, "Out of memory");
    // passthrough

error:
    sdsfreesplitres(part, count_part);
    sdsfreesplitres(slot_start_end, count_slot_start_end);
    if (nodes != NULL) {
        dictRelease(nodes);
    }
    if (nodes_name != NULL) {
        dictRelease(nodes_name);
    }
    return NULL;
}
/* Sends CLUSTER SLOTS or CLUSTER NODES to the node with context c.
 * Which command is sent depends on HIRCLUSTER_FLAG_ROUTE_USE_SLOTS in
 * cc->flags. Returns REDIS_OK when the command was written to the socket,
 * otherwise REDIS_ERR with the error stored in cc. */
static int clusterUpdateRouteSendCommand(redisClusterContext *cc,
                                         redisContext *c) {
    const char *cmd = (cc->flags & HIRCLUSTER_FLAG_ROUTE_USE_SLOTS ?
                           REDIS_COMMAND_CLUSTER_SLOTS :
                           REDIS_COMMAND_CLUSTER_NODES);
    if (redisAppendCommand(c, cmd) != REDIS_OK) {
        const char *msg = (cc->flags & HIRCLUSTER_FLAG_ROUTE_USE_SLOTS ?
                               "Command (cluster slots) send error." :
                               "Command (cluster nodes) send error.");
        __redisClusterSetError(cc, c->err, msg);
        return REDIS_ERR;
    }
    /* Flush buffer to socket. */
    if (redisBufferWrite(c, NULL) == REDIS_ERR) {
        /* Propagate the hiredis error; callers rely on every failure
         * leaving an error in the cluster context. */
        __redisClusterSetError(cc, c->err, c->errstr);
        return REDIS_ERR;
    }

    return REDIS_OK;
}
/* Receives and handles a CLUSTER SLOTS reply from node with context c.
 * An array reply is parsed and applied as the new slot map. Returns REDIS_OK
 * when the slot map was updated, otherwise REDIS_ERR. */
static int handleClusterSlotsReply(redisClusterContext *cc, redisContext *c) {
    redisReply *reply = NULL;

    if (redisGetReply(c, (void **)&reply) != REDIS_OK) {
        /* Only a socket timeout keeps the hiredis error code. */
        if (c->err == REDIS_ERR_TIMEOUT) {
            __redisClusterSetError(
                cc, c->err,
                "Command (cluster slots) reply error (socket timeout)");
        } else {
            __redisClusterSetError(
                cc, REDIS_ERR_OTHER,
                "Command (cluster slots) reply error (NULL).");
        }
        return REDIS_ERR;
    }

    if (reply->type == REDIS_REPLY_ARRAY) {
        dict *nodes = parse_cluster_slots(cc, reply, cc->flags);
        freeReplyObject(reply);
        return updateNodesAndSlotmap(cc, nodes);
    }

    /* Unexpected reply type: forward a server error message as-is. */
    const char *msg =
        (reply->type == REDIS_REPLY_ERROR)
            ? reply->str
            : "Command (cluster slots) reply error: type is not array.";
    __redisClusterSetError(cc, REDIS_ERR_OTHER, msg);
    freeReplyObject(reply);
    return REDIS_ERR;
}
/* Receives and handles a CLUSTER NODES reply from node with context c.
 * A string reply is parsed and applied as the new slot map. Returns REDIS_OK
 * when the slot map was updated, otherwise REDIS_ERR. */
static int handleClusterNodesReply(redisClusterContext *cc, redisContext *c) {
    redisReply *reply = NULL;

    if (redisGetReply(c, (void **)&reply) != REDIS_OK) {
        /* Only a socket timeout keeps the hiredis error code. */
        if (c->err == REDIS_ERR_TIMEOUT) {
            __redisClusterSetError(
                cc, c->err,
                "Command (cluster nodes) reply error (socket timeout)");
        } else {
            __redisClusterSetError(
                cc, REDIS_ERR_OTHER,
                "Command (cluster nodes) reply error (NULL).");
        }
        return REDIS_ERR;
    }

    if (reply->type == REDIS_REPLY_STRING) {
        dict *nodes =
            parse_cluster_nodes(cc, reply->str, reply->len, cc->flags);
        freeReplyObject(reply);
        return updateNodesAndSlotmap(cc, nodes);
    }

    /* Unexpected reply type: forward a server error message as-is. */
    const char *msg =
        (reply->type == REDIS_REPLY_ERROR)
            ? reply->str
            : "Command(cluster nodes) reply error: type is not string.";
    __redisClusterSetError(cc, REDIS_ERR_OTHER, msg);
    freeReplyObject(reply);
    return REDIS_ERR;
}
/* Receives and handles a CLUSTER SLOTS or CLUSTER NODES reply from node with
 * context c. The expected reply kind follows from
 * HIRCLUSTER_FLAG_ROUTE_USE_SLOTS in cc->flags. */
static int clusterUpdateRouteHandleReply(redisClusterContext *cc,
                                         redisContext *c) {
    const int use_slots = (cc->flags & HIRCLUSTER_FLAG_ROUTE_USE_SLOTS) != 0;

    return use_slots ? handleClusterSlotsReply(cc, c)
                     : handleClusterNodesReply(cc, c);
}
/**
 * Update route with the "cluster nodes" or "cluster slots" command reply.
 *
 * Opens a temporary connection to ip:port using the timeouts configured in
 * cc, invokes the on_connect callback when one is set, initiates TLS when
 * configured, authenticates, then requests and applies the slot map. The
 * temporary connection is always closed before returning.
 *
 * Returns REDIS_OK when the slot map was updated, otherwise REDIS_ERR.
 */
static int cluster_update_route_by_addr(redisClusterContext *cc, const char *ip,
                                        int port) {
    redisContext *c = NULL;
    int status = REDIS_ERR;

    if (cc == NULL) {
        return REDIS_ERR;
    }

    if (ip == NULL || port <= 0) {
        __redisClusterSetError(cc, REDIS_ERR_OTHER, "Ip or port error!");
        goto done;
    }

    redisOptions options = {0};
    REDIS_OPTIONS_SET_TCP(&options, ip, port);
    options.connect_timeout = cc->connect_timeout;
    options.command_timeout = cc->command_timeout;

    c = redisConnectWithOptions(&options);
    if (c == NULL) {
        __redisClusterSetError(cc, REDIS_ERR_OOM, "Out of memory");
        return REDIS_ERR;
    }

    /* The callback is told about failed connection attempts as well. */
    if (cc->on_connect) {
        cc->on_connect(c, c->err ? REDIS_ERR : REDIS_OK);
    }

    if (c->err) {
        __redisClusterSetError(cc, c->err, c->errstr);
        goto done;
    }

    if (cc->ssl && cc->ssl_init_fn(c, cc->ssl) != REDIS_OK) {
        __redisClusterSetError(cc, c->err, c->errstr);
        goto done;
    }

    /* Each step stores its own error in cc; stop at the first failure. */
    if (authenticate(cc, c) == REDIS_OK &&
        clusterUpdateRouteSendCommand(cc, c) == REDIS_OK &&
        clusterUpdateRouteHandleReply(cc, c) == REDIS_OK) {
        status = REDIS_OK;
    }

done:
    redisFree(c);
    return status;
}
/* Update known cluster nodes with a new collection of redisClusterNodes.
 * Will also update the slot-to-node lookup table for the new nodes.
 *
 * nodes - the new node dict. When it is not NULL this function takes
 *         ownership: on success it becomes cc->nodes, on failure it is
 *         released.
 *
 * Hiredis contexts of nodes that exist in both the old and the new dict are
 * carried over to the new nodes. On success the event callback, when set, is
 * invoked with HIRCLUSTER_EVENT_SLOTMAP_UPDATED, and additionally with
 * HIRCLUSTER_EVENT_READY when the route version reached 1.
 *
 * Returns REDIS_OK on success. Returns REDIS_ERR when nodes is NULL (no error
 * is set here), when a node is not a master, when a slot range is invalid or
 * held by more than one node, or on out of memory. */
static int updateNodesAndSlotmap(redisClusterContext *cc, dict *nodes) {
if (nodes == NULL) {
return REDIS_ERR;
}
/* Create a slot to redisClusterNode lookup table */
redisClusterNode **table;
table = hi_calloc(REDIS_CLUSTER_SLOTS, sizeof(redisClusterNode *));
if (table == NULL) {
goto oom;
}
dictIterator di;
dictInitIterator(&di, nodes);
dictEntry *de;
while ((de = dictNext(&di))) {
redisClusterNode *master = dictGetEntryVal(de);
if (master->role != REDIS_ROLE_MASTER) {
__redisClusterSetError(cc, REDIS_ERR_OTHER,
"Node role must be master");
goto error;
}
/* A master without slots contributes nothing to the table. */
if (master->slots == NULL) {
continue;
}
listIter li;
listRewind(master->slots, &li);
listNode *ln;
while ((ln = listNext(&li))) {
cluster_slot *slot = listNodeValue(ln);
if (slot->start > slot->end || slot->end >= REDIS_CLUSTER_SLOTS) {
__redisClusterSetError(cc, REDIS_ERR_OTHER,
"Slot region for node is invalid");
goto error;
}
/* Claim every slot of the range; a slot may only have one owner. */
for (uint32_t i = slot->start; i <= slot->end; i++) {
if (table[i] != NULL) {
__redisClusterSetError(cc, REDIS_ERR_OTHER,
"Different node holds same slot");
goto error;
}
table[i] = master;
}
}
}
/* Update slot-to-node table before changing cc->nodes since
 * removal of nodes might trigger user callbacks which may
 * send commands, which depend on the slot-to-node table. */
if (cc->table != NULL) {
hi_free(cc->table);
}
cc->table = table;
cc->route_version++;
// Move all hiredis contexts in cc->nodes to nodes
cluster_nodes_swap_ctx(cc->nodes, nodes);
/* Replace cc->nodes before releasing the old dict since
 * the release procedure might access cc->nodes. */
dict *oldnodes = cc->nodes;
cc->nodes = nodes;
if (oldnodes != NULL) {
dictRelease(oldnodes);
}
if (cc->event_callback != NULL) {
cc->event_callback(cc, HIRCLUSTER_EVENT_SLOTMAP_UPDATED,
cc->event_privdata);
if (cc->route_version == 1) {
/* Special event the first time the slotmap was updated. */
cc->event_callback(cc, HIRCLUSTER_EVENT_READY, cc->event_privdata);
}
}
cc->need_update_route = 0;
return REDIS_OK;
oom:
__redisClusterSetError(cc, REDIS_ERR_OOM, "Out of memory");
// passthrough
error:
/* Neither the new table nor the new nodes were installed: release both. */
hi_free(table);
dictRelease(nodes);
return REDIS_ERR;
}
/* Refresh the slotmap by asking the known nodes, one after the other, until
 * one of them delivers a usable slotmap.
 *
 * Returns REDIS_OK as soon as one node succeeds (any earlier error in `cc`
 * is cleared), otherwise REDIS_ERR. */
int redisClusterUpdateSlotmap(redisClusterContext *cc) {
    if (cc == NULL) {
        return REDIS_ERR;
    }
    if (cc->nodes == NULL) {
        __redisClusterSetError(cc, REDIS_ERR_OTHER, "no server address");
        return REDIS_ERR;
    }

    /* Becomes non-zero once at least one node was actually queried; the
     * failed query is then responsible for the error stored in `cc`. */
    int attempted = 0;

    dictIterator it;
    dictInitIterator(&it, cc->nodes);
    for (dictEntry *entry = dictNext(&it); entry != NULL;
         entry = dictNext(&it)) {
        redisClusterNode *candidate = dictGetEntryVal(entry);
        if (candidate == NULL || candidate->host == NULL) {
            continue;
        }

        if (cluster_update_route_by_addr(cc, candidate->host,
                                         candidate->port) != REDIS_OK) {
            attempted = 1;
            continue;
        }

        /* Success: drop the error left behind by previously tried nodes. */
        if (cc->err) {
            cc->err = 0;
            memset(cc->errstr, '\0', strlen(cc->errstr));
        }
        return REDIS_OK;
    }

    if (!attempted) {
        __redisClusterSetError(cc, REDIS_ERR_OTHER, "no valid server address");
    }
    return REDIS_ERR;
}
/* Allocate a zero-initialized cluster context that uses the default retry
 * limit. Returns NULL when the allocation fails. */
redisClusterContext *redisClusterContextInit(void) {
    redisClusterContext *ctx = hi_calloc(1, sizeof(redisClusterContext));

    if (ctx != NULL) {
        ctx->max_retry_count = CLUSTER_DEFAULT_MAX_RETRY_COUNT;
    }
    return ctx;
}
/* Release a cluster context together with everything it owns. A NULL
 * context is accepted and ignored.
 *
 * The HIRCLUSTER_EVENT_FREE_CONTEXT event is delivered first, before any
 * member of the context has been released. */
void redisClusterFree(redisClusterContext *cc) {
    if (cc == NULL) {
        return;
    }

    if (cc->event_callback != NULL) {
        cc->event_callback(cc, HIRCLUSTER_EVENT_FREE_CONTEXT,
                           cc->event_privdata);
    }

    if (cc->connect_timeout != NULL) {
        hi_free(cc->connect_timeout);
        cc->connect_timeout = NULL;
    }

    if (cc->command_timeout != NULL) {
        hi_free(cc->command_timeout);
        cc->command_timeout = NULL;
    }

    if (cc->table != NULL) {
        hi_free(cc->table);
        cc->table = NULL;
    }

    if (cc->nodes != NULL) {
        /* Detach the dict from the context before releasing it, because the
         * release procedure might access cc->nodes. Freeing a node and its
         * hiredis context executes all pending callbacks; with cc->nodes
         * cleared, a pending slotmap update command callback cannot trigger
         * additional slotmap updates. */
        dict *detached = cc->nodes;
        cc->nodes = NULL;
        dictRelease(detached);
    }

    if (cc->requests != NULL) {
        listRelease(cc->requests);
    }

    if (cc->username != NULL) {
        hi_free(cc->username);
        cc->username = NULL;
    }

    if (cc->password != NULL) {
        hi_free(cc->password);
        cc->password = NULL;
    }

    hi_free(cc);
}
/* Fetch the initial slotmap for a context whose node addresses were added
 * beforehand. Returns REDIS_OK on success. When no address is known, or the
 * slotmap update fails, REDIS_ERR is returned. */
static int _redisClusterConnect2(redisClusterContext *cc) {
    int has_nodes = (cc->nodes != NULL && dictSize(cc->nodes) != 0);

    if (!has_nodes) {
        __redisClusterSetError(cc, REDIS_ERR_OTHER,
                               "servers address does not set up");
        return REDIS_ERR;
    }

    return redisClusterUpdateSlotmap(cc);
}
/* Register the nodes listed in `addrs` and, if that worked, fetch the
 * slotmap. The context passed in is always returned; the return value of
 * the slotmap update is not inspected here, so callers need to check the
 * error state of the context. */
static redisClusterContext *_redisClusterConnect(redisClusterContext *cc,
                                                 const char *addrs) {
    if (redisClusterSetOptionAddNodes(cc, addrs) == REDIS_OK) {
        redisClusterUpdateSlotmap(cc);
    }
    return cc;
}
/* Create a cluster context using `flags` and connect it to the nodes listed
 * in `addrs`. Returns NULL only when the context could not be allocated. */
redisClusterContext *redisClusterConnect(const char *addrs, int flags) {
    redisClusterContext *ctx = redisClusterContextInit();

    if (ctx != NULL) {
        ctx->flags = flags;
        ctx = _redisClusterConnect(ctx, addrs);
    }
    return ctx;
}
/* Create a cluster context using `flags`, configure `tv` as connect timeout
 * and connect to the nodes listed in `addrs`.
 *
 * Returns NULL when memory could not be allocated. Otherwise the context is
 * returned; callers need to check its error state to find out whether
 * connecting succeeded. */
redisClusterContext *redisClusterConnectWithTimeout(const char *addrs,
                                                    const struct timeval tv,
                                                    int flags) {
    redisClusterContext *cc;

    cc = redisClusterContextInit();
    if (cc == NULL) {
        return NULL;
    }

    cc->flags = flags;

    if (cc->connect_timeout == NULL) {
        cc->connect_timeout = hi_malloc(sizeof(struct timeval));
        if (cc->connect_timeout == NULL) {
            /* The context is not handed to the caller, so it has to be
             * released here to avoid leaking it. */
            redisClusterFree(cc);
            return NULL;
        }
    }

    memcpy(cc->connect_timeout, &tv, sizeof(struct timeval));

    return _redisClusterConnect(cc, addrs);
}
/* Add a single node, given as "<host>:<port>", to the set of known nodes.
 * The last port separator in the string splits host from port. An address
 * that is already known is accepted without any change.
 *
 * Returns REDIS_OK on success, otherwise REDIS_ERR. When `cc` is not NULL
 * the failure reason is stored in `cc`. */
int redisClusterSetOptionAddNode(redisClusterContext *cc, const char *addr) {
    dictEntry *node_entry;
    redisClusterNode *node = NULL;
    int port, ret;
    sds ip = NULL;

    if (cc == NULL) {
        return REDIS_ERR;
    }

    /* A NULL address must not reach strrchr() below. */
    if (addr == NULL) {
        __redisClusterSetError(cc, REDIS_ERR_OTHER,
                               "server address is missing");
        return REDIS_ERR;
    }

    if (cc->nodes == NULL) {
        cc->nodes = dictCreate(&clusterNodesDictType, NULL);
        if (cc->nodes == NULL) {
            goto oom;
        }
    }

    sds addr_sds = sdsnew(addr);
    if (addr_sds == NULL) {
        goto oom;
    }
    node_entry = dictFind(cc->nodes, addr_sds);
    sdsfree(addr_sds);

    if (node_entry == NULL) {
        char *p;
        if ((p = strrchr(addr, IP_PORT_SEPARATOR)) == NULL) {
            __redisClusterSetError(
                cc, REDIS_ERR_OTHER,
                "server address is incorrect, port separator missing.");
            return REDIS_ERR;
        }
        // p includes separator

        if (p - addr <= 0) { /* length until separator */
            __redisClusterSetError(
                cc, REDIS_ERR_OTHER,
                "server address is incorrect, address part missing.");
            return REDIS_ERR;
        }

        ip = sdsnewlen(addr, p - addr);
        if (ip == NULL) {
            goto oom;
        }
        p++; // remove separator character

        size_t port_len = strlen(p);
        if (port_len == 0) {
            __redisClusterSetError(
                cc, REDIS_ERR_OTHER,
                "server address is incorrect, port part missing.");
            goto error;
        }

        port = hi_atoi(p, port_len);
        if (port <= 0) {
            __redisClusterSetError(cc, REDIS_ERR_OTHER,
                                   "server port is incorrect");
            goto error;
        }

        node = createRedisClusterNode();
        if (node == NULL) {
            goto oom;
        }

        node->addr = sdsnew(addr);
        if (node->addr == NULL) {
            goto oom;
        }

        /* `ip` stays the handle used for cleanup until the node has been
         * stored in the dict. */
        node->host = ip;
        node->port = port;

        sds key = sdsnewlen(node->addr, sdslen(node->addr));
        if (key == NULL) {
            goto oom;
        }
        ret = dictAdd(cc->nodes, key, node);
        if (ret != DICT_OK) {
            sdsfree(key);
            goto oom;
        }
    }

    return REDIS_OK;

oom:
    __redisClusterSetError(cc, REDIS_ERR_OOM, "Out of memory");
    // passthrough
error:
    sdsfree(ip);
    if (node != NULL) {
        sdsfree(node->addr);
        hi_free(node);
    }
    return REDIS_ERR;
}
/* Add several nodes at once. `addrs` holds "<host>:<port>" entries joined by
 * CLUSTER_ADDRESS_SEPARATOR. Processing stops at the first entry that is
 * rejected.
 *
 * Returns REDIS_OK when every entry was accepted, otherwise REDIS_ERR. When
 * `cc` is not NULL the failure reason is stored in `cc`. */
int redisClusterSetOptionAddNodes(redisClusterContext *cc, const char *addrs) {
    int ret;
    sds *address = NULL;
    int address_count = 0;
    int i;

    if (cc == NULL) {
        return REDIS_ERR;
    }

    /* A NULL string must not reach strlen() below; report it the same way
     * as a string without any address. */
    if (addrs == NULL) {
        __redisClusterSetError(cc, REDIS_ERR_OTHER,
                               "invalid server addresses (example format: "
                               "127.0.0.1:1234,127.0.0.2:5678)");
        return REDIS_ERR;
    }

    address = sdssplitlen(addrs, strlen(addrs), CLUSTER_ADDRESS_SEPARATOR,
                          strlen(CLUSTER_ADDRESS_SEPARATOR), &address_count);
    if (address == NULL) {
        __redisClusterSetError(cc, REDIS_ERR_OOM, "Out of memory");
        return REDIS_ERR;
    }

    if (address_count <= 0) {
        __redisClusterSetError(cc, REDIS_ERR_OTHER,
                               "invalid server addresses (example format: "
                               "127.0.0.1:1234,127.0.0.2:5678)");
        sdsfreesplitres(address, address_count);
        return REDIS_ERR;
    }

    for (i = 0; i < address_count; i++) {
        ret = redisClusterSetOptionAddNode(cc, address[i]);
        if (ret != REDIS_OK) {
            sdsfreesplitres(address, address_count);
            return REDIS_ERR;
        }
    }

    sdsfreesplitres(address, address_count);

    return REDIS_OK;
}
/* Deprecated: the option has no effect. Only the context pointer is
 * checked; REDIS_ERR is returned for NULL, REDIS_OK otherwise. */
int redisClusterSetOptionConnectBlock(redisClusterContext *cc) {
    return (cc == NULL) ? REDIS_ERR : REDIS_OK;
}
/* Deprecated: the option has no effect. Only the context pointer is
 * checked; REDIS_ERR is returned for NULL, REDIS_OK otherwise. */
int redisClusterSetOptionConnectNonBlock(redisClusterContext *cc) {
    return (cc == NULL) ? REDIS_ERR : REDIS_OK;
}
/**
 * Configure a username used during authentication, see
 * the Redis AUTH command.
 * Disabled by default. Can be disabled again by providing an
 * empty string or a null pointer.
 *
 * Returns REDIS_OK on success. Returns REDIS_ERR when `cc` is NULL or when
 * the username could not be copied; in the latter case an out of memory
 * error is stored in `cc` and the previously configured username is kept.
 */
int redisClusterSetOptionUsername(redisClusterContext *cc,
                                  const char *username) {
    if (cc == NULL) {
        return REDIS_ERR;
    }

    // Disabling option
    if (username == NULL || username[0] == '\0') {
        hi_free(cc->username);
        cc->username = NULL;
        return REDIS_OK;
    }

    /* Duplicate before releasing the current value, so that a failed
     * allocation does not silently drop the configured username. */
    char *copy = hi_strdup(username);
    if (copy == NULL) {
        __redisClusterSetError(cc, REDIS_ERR_OOM, "Out of memory");
        return REDIS_ERR;
    }

    hi_free(cc->username);
    cc->username = copy;

    return REDIS_OK;
}
/**
 * Configure a password used when connecting to password-protected
 * Redis instances. (See Redis AUTH command)
 * Providing an empty string or a null pointer disables the use of a
 * password.
 *
 * Returns REDIS_OK on success. Returns REDIS_ERR when `cc` is NULL or when
 * the password could not be copied; in the latter case an out of memory
 * error is stored in `cc` and the previously configured password is kept.
 */
int redisClusterSetOptionPassword(redisClusterContext *cc,
                                  const char *password) {
    if (cc == NULL) {
        return REDIS_ERR;
    }

    // Disabling use of password
    if (password == NULL || password[0] == '\0') {
        hi_free(cc->password);
        cc->password = NULL;
        return REDIS_OK;
    }

    /* Duplicate before releasing the current value, so that a failed
     * allocation does not silently drop the configured password. */
    char *copy = hi_strdup(password);
    if (copy == NULL) {
        __redisClusterSetError(cc, REDIS_ERR_OOM, "Out of memory");
        return REDIS_ERR;
    }

    hi_free(cc->password);
    cc->password = copy;

    return REDIS_OK;
}
/* Set the HIRCLUSTER_FLAG_ADD_SLAVE flag on the context.
 * Returns REDIS_ERR for a NULL context, REDIS_OK otherwise. */
int redisClusterSetOptionParseSlaves(redisClusterContext *cc) {
    if (cc != NULL) {
        cc->flags = cc->flags | HIRCLUSTER_FLAG_ADD_SLAVE;
        return REDIS_OK;
    }
    return REDIS_ERR;
}
/* Set the HIRCLUSTER_FLAG_ADD_OPENSLOT flag on the context.
 * Returns REDIS_ERR for a NULL context, REDIS_OK otherwise. */
int redisClusterSetOptionParseOpenSlots(redisClusterContext *cc) {
    if (cc != NULL) {
        cc->flags = cc->flags | HIRCLUSTER_FLAG_ADD_OPENSLOT;
        return REDIS_OK;
    }
    return REDIS_ERR;
}
/* Set the HIRCLUSTER_FLAG_ROUTE_USE_SLOTS flag on the context.
 * Returns REDIS_ERR for a NULL context, REDIS_OK otherwise. */
int redisClusterSetOptionRouteUseSlots(redisClusterContext *cc) {
    if (cc != NULL) {
        cc->flags = cc->flags | HIRCLUSTER_FLAG_ROUTE_USE_SLOTS;
        return REDIS_OK;
    }
    return REDIS_ERR;
}
/* Store `tv` as the connect timeout of the context. The storage for the
 * timeout is allocated on first use.
 * Returns REDIS_OK on success, REDIS_ERR for a NULL context or when the
 * allocation fails (an out of memory error is then stored in `cc`). */
int redisClusterSetOptionConnectTimeout(redisClusterContext *cc,
                                        const struct timeval tv) {
    if (cc == NULL) {
        return REDIS_ERR;
    }

    if (cc->connect_timeout == NULL) {
        cc->connect_timeout = hi_malloc(sizeof(struct timeval));
    }
    if (cc->connect_timeout == NULL) {
        __redisClusterSetError(cc, REDIS_ERR_OOM, "Out of memory");
        return REDIS_ERR;
    }

    *cc->connect_timeout = tv;
    return REDIS_OK;
}
/* Store `tv` as the command timeout of the context and apply it to the
 * contexts of all currently known nodes and their slaves. Nothing is done
 * when the same timeout is configured already.
 * Returns REDIS_OK on success, REDIS_ERR for a NULL context or when the
 * allocation fails (an out of memory error is then stored in `cc`). */
int redisClusterSetOptionTimeout(redisClusterContext *cc,
                                 const struct timeval tv) {
    if (cc == NULL) {
        return REDIS_ERR;
    }

    /* Unchanged timeout: leave the context and all nodes untouched. */
    if (cc->command_timeout != NULL &&
        cc->command_timeout->tv_sec == tv.tv_sec &&
        cc->command_timeout->tv_usec == tv.tv_usec) {
        return REDIS_OK;
    }

    if (cc->command_timeout == NULL) {
        cc->command_timeout = hi_malloc(sizeof(struct timeval));
        if (cc->command_timeout == NULL) {
            __redisClusterSetError(cc, REDIS_ERR_OOM, "Out of memory");
            return REDIS_ERR;
        }
    }
    memcpy(cc->command_timeout, &tv, sizeof(struct timeval));

    /* Set timeout on already connected nodes */
    if (cc->nodes == NULL || dictSize(cc->nodes) == 0) {
        return REDIS_OK;
    }

    dictIterator node_it;
    dictInitIterator(&node_it, cc->nodes);
    for (dictEntry *entry = dictNext(&node_it); entry != NULL;
         entry = dictNext(&node_it)) {
        redisClusterNode *primary = dictGetEntryVal(entry);

        if (primary->acon) {
            redisAsyncSetTimeout(primary->acon, tv);
        }
        /* Blocking contexts in error state are skipped. */
        if (primary->con && primary->con->err == 0) {
            redisSetTimeout(primary->con, tv);
        }

        if (primary->slaves == NULL || listLength(primary->slaves) == 0) {
            continue;
        }

        listIter replica_it;
        listRewind(primary->slaves, &replica_it);
        for (listNode *item = listNext(&replica_it); item != NULL;
             item = listNext(&replica_it)) {
            redisClusterNode *replica = listNodeValue(item);

            if (replica->acon) {
                redisAsyncSetTimeout(replica->acon, tv);
            }
            if (replica->con && replica->con->err == 0) {
                redisSetTimeout(replica->con, tv);
            }
        }
    }

    return REDIS_OK;
}
/* Configure the retry limit of the context. Only positive values are
 * accepted. Returns REDIS_OK on success, REDIS_ERR for a NULL context or a
 * non-positive count. */
int redisClusterSetOptionMaxRetry(redisClusterContext *cc,
                                  int max_retry_count) {
    if (cc != NULL && max_retry_count > 0) {
        cc->max_retry_count = max_retry_count;
        return REDIS_OK;
    }
    return REDIS_ERR;
}
/* Connect a context that was prepared with the redisClusterSetOption*
 * functions. Returns REDIS_ERR for a NULL context, otherwise the result of
 * fetching the slotmap. */
int redisClusterConnect2(redisClusterContext *cc) {
    if (cc != NULL) {
        /* This API (legacy) allows reconnecting an async context, so a
         * shutdown flag set earlier is cleared. */
        cc->flags = cc->flags & ~HIRCLUSTER_FLAG_SHUTDOWN;
        return _redisClusterConnect2(cc);
    }
    return REDIS_ERR;
}
/* Return the blocking hiredis context of `node`, connecting or reconnecting
 * when needed.
 *
 * - A node without a context gets a new connection (including the optional
 *   TLS setup and authentication). NULL is returned when that fails, when
 *   `node` is NULL, or when the node has no usable host/port.
 * - A node whose existing context is in error state is reconnected. The
 *   context is returned even if the reconnect failed, so callers have to
 *   check c->err. */
redisContext *ctx_get_by_node(redisClusterContext *cc, redisClusterNode *node) {
    redisContext *c = NULL;
    if (node == NULL) {
        return NULL;
    }

    c = node->con;
    if (c != NULL) {
        if (c->err) {
            redisReconnect(c);

            if (cc->on_connect) {
                cc->on_connect(c, c->err ? REDIS_ERR : REDIS_OK);
            }

            if (c->err) {
                /* The reconnect failed. Report that error and skip the TLS
                 * setup and authentication, which cannot succeed and would
                 * only replace the original error. */
                __redisClusterSetError(cc, c->err, c->errstr);
            } else if (cc->ssl && cc->ssl_init_fn(c, cc->ssl) != REDIS_OK) {
                /* Never authenticate after a failed TLS setup. */
                __redisClusterSetError(cc, c->err, c->errstr);
            } else {
                authenticate(cc, c); // err and errstr handled in function
            }
        }

        return c;
    }

    if (node->host == NULL || node->port <= 0) {
        return NULL;
    }

    redisOptions options = {0};
    REDIS_OPTIONS_SET_TCP(&options, node->host, node->port);
    options.connect_timeout = cc->connect_timeout;
    options.command_timeout = cc->command_timeout;

    c = redisConnectWithOptions(&options);
    if (c == NULL) {
        __redisClusterSetError(cc, REDIS_ERR_OOM, "Out of memory");
        return NULL;
    }

    if (cc->on_connect) {
        cc->on_connect(c, c->err ? REDIS_ERR : REDIS_OK);
    }

    if (c->err) {
        __redisClusterSetError(cc, c->err, c->errstr);
        redisFree(c);
        return NULL;
    }

    if (cc->ssl && cc->ssl_init_fn(c, cc->ssl) != REDIS_OK) {
        __redisClusterSetError(cc, c->err, c->errstr);
        redisFree(c);
        return NULL;
    }

    if (authenticate(cc, c) != REDIS_OK) {
        redisFree(c);
        return NULL;
    }

    /* The node owns the context from here on. */
    node->con = c;

    return c;
}
/* Look up the node serving `slot_num` in the slot-to-node table.
 * Returns NULL when `cc` is NULL, or, with an error stored in `cc`, when
 * the slot is out of range, no slotmap is available, or the slot is not
 * served by any node. */
static redisClusterNode *node_get_by_table(redisClusterContext *cc,
                                           uint32_t slot_num) {
    const char *problem = NULL;

    if (cc == NULL) {
        return NULL;
    }

    if (slot_num >= REDIS_CLUSTER_SLOTS) {
        problem = "invalid slot";
    } else if (cc->table == NULL) {
        problem = "slotmap not available";
    } else if (cc->table[slot_num] == NULL) {
        problem = "slot not served by any node";
    }

    if (problem != NULL) {
        __redisClusterSetError(cc, REDIS_ERR_OTHER, problem);
        return NULL;
    }

    return cc->table[slot_num];
}
/* Helper for the redisClusterAppendCommand* family of functions.
 *
 * Appends the formatted command to the output buffer of the context that
 * belongs to the node serving command->slot_num. With this family of
 * functions the reply (or replies in pub/sub) must be retrieved by the
 * caller through a separate get-reply call.
 *
 * Returns REDIS_OK on success, otherwise REDIS_ERR. */
static int __redisClusterAppendCommand(redisClusterContext *cc,
                                       struct cmd *command) {
    if (cc == NULL || command == NULL) {
        return REDIS_ERR;
    }

    redisClusterNode *target =
        node_get_by_table(cc, (uint32_t)command->slot_num);
    if (target == NULL) {
        return REDIS_ERR;
    }

    redisContext *conn = ctx_get_by_node(cc, target);
    if (conn == NULL) {
        return REDIS_ERR;
    }

    /* The command is only appended to a context without a pending error;
     * in both failure cases the error of the context is copied to `cc`. */
    if (conn->err || redisAppendFormattedCommand(conn, command->cmd,
                                                 command->clen) != REDIS_OK) {
        __redisClusterSetError(cc, conn->err, conn->errstr);
        return REDIS_ERR;
    }

    return REDIS_OK;
}
/* Helper for the redisClusterGetReply* family of functions.
 *
 * Reads the next reply from the blocking context of `node` into `*reply`.
 * A MOVED reply schedules a slotmap update. When the context is in error
 * state the retry counter is advanced, and exceeding the retry limit
 * schedules a slotmap update as well.
 *
 * Returns REDIS_OK when a reply was read, otherwise REDIS_ERR. */
static int __redisClusterGetReplyFromNode(redisClusterContext *cc,
                                          redisClusterNode *node,
                                          void **reply) {
    if (cc == NULL || node == NULL || reply == NULL) {
        return REDIS_ERR;
    }

    redisContext *conn = node->con;
    if (conn == NULL) {
        return REDIS_ERR;
    }

    if (conn->err) {
        if (cc->need_update_route == 0) {
            cc->retry_count += 1;
            if (cc->retry_count > cc->max_retry_count) {
                cc->need_update_route = 1;
                cc->retry_count = 0;
            }
        }
        __redisClusterSetError(cc, conn->err, conn->errstr);
        return REDIS_ERR;
    }

    if (redisGetReply(conn, reply) != REDIS_OK) {
        __redisClusterSetError(cc, conn->err, conn->errstr);
        return REDIS_ERR;
    }

    if (cluster_reply_error_type(*reply) == CLUSTER_ERR_MOVED) {
        cc->need_update_route = 1;
    }

    return REDIS_OK;
}
/* Read the next reply from the node that serves `slot_num`.
 * Returns REDIS_ERR for invalid arguments or when no node serves the slot,
 * otherwise the result of reading the reply from that node. */
static int __redisClusterGetReply(redisClusterContext *cc, int slot_num,
                                  void **reply) {
    if (cc == NULL || slot_num < 0 || reply == NULL) {
        return REDIS_ERR;
    }

    redisClusterNode *owner = node_get_by_table(cc, (uint32_t)slot_num);

    return (owner == NULL) ? REDIS_ERR
                           : __redisClusterGetReplyFromNode(cc, owner, reply);
}
/* Parses a MOVED or ASK error reply and returns the destination node. A
 * destination that is not known yet is added to cc->nodes as a master.
 *
 * The slot is returned by pointer, if provided. A slot number outside the
 * range of valid cluster slots is reported as -1, so the value is either
 * negative or safe to use as an index into the slot table.
 *
 * Returns NULL, with the error stored in `cc`, when the reply cannot be
 * parsed or memory runs out. */
static redisClusterNode *getNodeFromRedirectReply(redisClusterContext *cc,
                                                  redisReply *reply,
                                                  int *slotptr) {
    redisClusterNode *node = NULL;
    sds *part = NULL;
    int part_len = 0;
    int port;
    char *p;

    /* Expecting ["ASK" | "MOVED", "<slot>", "<endpoint>:<port>"] */
    part = sdssplitlen(reply->str, reply->len, " ", 1, &part_len);
    if (part == NULL) {
        goto oom;
    }
    if (part_len != 3) {
        __redisClusterSetError(cc, REDIS_ERR_OTHER, "failed to parse redirect");
        goto done;
    }

    /* Parse slot if requested. */
    if (slotptr != NULL) {
        int slot = hi_atoi(part[1], sdslen(part[1]));
        /* The value comes from the server; never hand out a slot that lies
         * beyond the slot table. */
        if (slot >= REDIS_CLUSTER_SLOTS) {
            slot = -1;
        }
        *slotptr = slot;
    }

    /* Find the last occurrence of the port separator since
     * IPv6 addresses can contain ':' */
    if ((p = strrchr(part[2], IP_PORT_SEPARATOR)) == NULL) {
        __redisClusterSetError(cc, REDIS_ERR_OTHER,
                               "port separator missing in redirect");
        goto done;
    }
    // p includes separator

    /* Empty endpoint not supported yet */
    if (p - part[2] == 0) {
        __redisClusterSetError(cc, REDIS_ERR_OTHER,
                               "endpoint missing in redirect");
        goto done;
    }

    dictEntry *de = dictFind(cc->nodes, part[2]);
    if (de != NULL) {
        node = de->val;
        goto done;
    }

    /* Validate the port before a node is created, so that a malformed
     * redirect does not leave an unusable node behind in cc->nodes. */
    port = hi_atoi(p + 1, strlen(p + 1));
    if (port <= 0) {
        __redisClusterSetError(cc, REDIS_ERR_OTHER,
                               "invalid port in redirect");
        goto done;
    }

    /* Add this node since it was unknown */
    node = createRedisClusterNode();
    if (node == NULL) {
        goto oom;
    }
    node->role = REDIS_ROLE_MASTER;
    node->addr = part[2];
    part[2] = NULL; /* Memory ownership moved */

    /* `p` still points at the separator inside node->addr. */
    node->host = sdsnewlen(node->addr, p - node->addr);
    if (node->host == NULL) {
        goto oom;
    }
    node->port = port;

    sds key = sdsnewlen(node->addr, sdslen(node->addr));
    if (key == NULL) {
        goto oom;
    }

    if (dictAdd(cc->nodes, key, node) != DICT_OK) {
        sdsfree(key);
        goto oom;
    }

done:
    sdsfreesplitres(part, part_len);
    return node;

oom:
    __redisClusterSetError(cc, REDIS_ERR_OOM, "Out of memory");
    sdsfreesplitres(part, part_len);
    if (node != NULL) {
        sdsfree(node->addr);
        sdsfree(node->host);
        hi_free(node);
    }

    return NULL;
}
/* Send `command` to the node serving command->slot_num and return the
 * reply.
 *
 * MOVED and ASK redirects are followed, and TRYAGAIN/CLUSTERDOWN replies
 * cause the command to be sent again, until cc->max_retry_count is
 * exceeded. A slotmap update that has been scheduled, or that is triggered
 * by a MOVED redirect, is pipelined on a connection used here and its reply
 * is handled before returning.
 *
 * Returns the reply, or NULL on failure. */
static void *redis_cluster_command_execute(redisClusterContext *cc,
                                           struct cmd *command) {
    void *reply = NULL;
    redisClusterNode *node;
    redisContext *c = NULL;
    int error_type;
    /* Context on which a slotmap request is outstanding, if any. */
    redisContext *c_updating_route = NULL;

retry:

    node = node_get_by_table(cc, (uint32_t)command->slot_num);
    if (node == NULL) {
        /* Update the slotmap since the slot is not served. */
        if (redisClusterUpdateSlotmap(cc) != REDIS_OK) {
            goto error;
        }
        node = node_get_by_table(cc, (uint32_t)command->slot_num);
        if (node == NULL) {
            /* Return error since the slot is still not served. */
            goto error;
        }
    }

    c = ctx_get_by_node(cc, node);
    if (c == NULL || c->err) {
        /* Failed to connect. Maybe there was a failover and this node is gone.
         * Update slotmap to find out. */
        if (redisClusterUpdateSlotmap(cc) != REDIS_OK) {
            goto error;
        }

        node = node_get_by_table(cc, (uint32_t)command->slot_num);
        if (node == NULL) {
            goto error;
        }
        c = ctx_get_by_node(cc, node);
        if (c == NULL) {
            goto error;
        } else if (c->err) {
            __redisClusterSetError(cc, c->err, c->errstr);
            goto error;
        }
    }

moved_retry:
ask_retry:

    if (redisAppendFormattedCommand(c, command->cmd, command->clen) !=
        REDIS_OK) {
        __redisClusterSetError(cc, c->err, c->errstr);
        goto error;
    }

    /* If update slotmap has been scheduled, do that in the same pipeline. */
    if (cc->need_update_route && c_updating_route == NULL) {
        if (clusterUpdateRouteSendCommand(cc, c) == REDIS_OK) {
            c_updating_route = c;
        }
    }

    if (redisGetReply(c, &reply) != REDIS_OK) {
        __redisClusterSetError(cc, c->err, c->errstr);
        /* We may need to update the slotmap if this node is removed from the
         * cluster, but the current request may have already timed out so we
         * schedule it for later. */
        if (c->err != REDIS_ERR_OOM)
            cc->need_update_route = 1;
        goto error;
    }

    error_type = cluster_reply_error_type(reply);
    if (error_type > CLUSTER_NOT_ERR && error_type < CLUSTER_ERR_SENTINEL) {
        cc->retry_count++;
        if (cc->retry_count > cc->max_retry_count) {
            __redisClusterSetError(cc, REDIS_ERR_CLUSTER_TOO_MANY_RETRIES,
                                   "too many cluster retries");
            goto error;
        }

        int slot = -1;
        switch (error_type) {
        case CLUSTER_ERR_MOVED:
            node = getNodeFromRedirectReply(cc, reply, &slot);
            freeReplyObject(reply);
            reply = NULL;

            if (node == NULL) {
                /* Failed to parse redirect. Specific error already set. */
                goto error;
            }

            /* Update the slot mapping entry for this slot. The slot number
             * was sent by the server, so it is range checked before it is
             * used as an index. */
            if (slot >= 0 && slot < REDIS_CLUSTER_SLOTS && cc->table != NULL) {
                cc->table[slot] = node;
            }

            if (c_updating_route == NULL) {
                if (clusterUpdateRouteSendCommand(cc, c) == REDIS_OK) {
                    /* Deferred update route using the node that sent the
                     * redirect. */
                    c_updating_route = c;
                } else if (redisClusterUpdateSlotmap(cc) == REDIS_OK) {
                    /* Synchronous update route successful using new
                     * connection. */
                    cc->err = 0;
                    cc->errstr[0] = '\0';

                    /* A slotmap update installs a new set of nodes and
                     * releases the previous one, so `node` must not be used
                     * any longer. Look the node up again in the new table. */
                    node = node_get_by_table(cc, (uint32_t)command->slot_num);
                    if (node == NULL) {
                        goto error;
                    }
                } else {
                    /* Failed to update route. Specific error already set. */
                    goto error;
                }
            }

            c = ctx_get_by_node(cc, node);
            if (c == NULL) {
                goto error;
            } else if (c->err) {
                __redisClusterSetError(cc, c->err, c->errstr);
                goto error;
            }

            goto moved_retry;

            break;
        case CLUSTER_ERR_ASK:
            node = getNodeFromRedirectReply(cc, reply, NULL);
            if (node == NULL) {
                goto error;
            }

            freeReplyObject(reply);
            reply = NULL;

            c = ctx_get_by_node(cc, node);
            if (c == NULL) {
                goto error;
            } else if (c->err) {
                __redisClusterSetError(cc, c->err, c->errstr);
                goto error;
            }

            /* The redirect target has to receive ASKING before the command
             * itself is sent again. */
            reply = redisCommand(c, REDIS_COMMAND_ASKING);
            if (reply == NULL) {
                __redisClusterSetError(cc, c->err, c->errstr);
                goto error;
            }

            freeReplyObject(reply);
            reply = NULL;

            goto ask_retry;

            break;
        case CLUSTER_ERR_TRYAGAIN:
        case CLUSTER_ERR_CLUSTERDOWN:
            freeReplyObject(reply);
            reply = NULL;
            goto retry;

            break;
        default:

            break;
        }
    }

    goto done;

error:
    if (reply) {
        freeReplyObject(reply);
        reply = NULL;
    }

done:
    if (c_updating_route) {
        /* Deferred CLUSTER SLOTS or CLUSTER NODES in progress. Wait for the
         * reply and handle it. */
        if (clusterUpdateRouteHandleReply(cc, c_updating_route) != REDIS_OK) {
            /* Clear error and update synchronously using another node. */
            cc->err = 0;
            cc->errstr[0] = '\0';
            if (redisClusterUpdateSlotmap(cc) != REDIS_OK) {
                /* Clear the reply to indicate failure. */
                freeReplyObject(reply);
                reply = NULL;
            }
        }
    }

    return reply;
}
/* Split a multi-key command (MGET, DEL, EXISTS or MSET) into one
 * sub-command per hash slot.
 *
 * For every key, command->frag_seq records the sub-command that carries the
 * key. The sub-commands are serialized and appended to `commands`. When all
 * keys hash to the same slot no splitting is needed: the single sub-command
 * is dropped again and `command` itself gets that slot assigned.
 *
 * Returns the slot of the last key, or -1 on failure. Except for NULL
 * arguments, a failure stores an error in `cc`. */
static int command_pre_fragment(redisClusterContext *cc, struct cmd *command,
                                hilist *commands) {
    struct keypos *kp, *sub_kp;
    uint32_t key_count;
    uint32_t i, j;
    uint32_t idx;
    uint32_t key_len;
    int slot_num = -1;
    struct cmd *sub_command;
    struct cmd **sub_commands = NULL;
    char num_str[12];
    uint8_t num_str_len;
    /* Serialization parameters that depend on the command type. `header` is
     * the text that follows the argument count: CRLF and the command name as
     * bulk string. */
    const char *header = NULL;
    int header_len = 0;
    int per_key_overhead = 0;
    int is_mset;

    if (command == NULL || commands == NULL) {
        goto done;
    }

    is_mset = (command->type == CMD_REQ_REDIS_MSET);
    if (command->type == CMD_REQ_REDIS_MGET) {
        header = "\r\n$4\r\nmget\r\n";
    } else if (command->type == CMD_REQ_REDIS_DEL) {
        header = "\r\n$3\r\ndel\r\n";
    } else if (command->type == CMD_REQ_REDIS_EXISTS) {
        header = "\r\n$6\r\nexists\r\n";
    } else if (is_mset) {
        header = "\r\n$4\r\nmset\r\n";
    }
    if (header != NULL) {
        header_len = (int)strlen(header);
        /* Bytes per key that the first loop has not counted in clen:
         * '$' + CRLF + CRLF for a plain key. For MSET only '$' + CRLF,
         * because the bytes following the key are part of remain_len. */
        per_key_overhead = is_mset ? 3 : 5;
    }

    key_count = hiarray_n(command->keys);

    sub_commands = hi_calloc(REDIS_CLUSTER_SLOTS, sizeof(*sub_commands));
    if (sub_commands == NULL) {
        goto oom;
    }

    command->frag_seq = hi_calloc(key_count, sizeof(*command->frag_seq));
    if (command->frag_seq == NULL) {
        goto oom;
    }

    // Fill sub_command with key, slot and command length (clen, only keylength)
    for (i = 0; i < key_count; i++) {
        kp = hiarray_get(command->keys, i);

        slot_num = keyHashSlot(kp->start, kp->end - kp->start);
        if (slot_num < 0 || slot_num >= REDIS_CLUSTER_SLOTS) {
            __redisClusterSetError(cc, REDIS_ERR_OTHER,
                                   "keyHashSlot return error");
            /* Sub-commands created so far are not in `commands` yet and
             * have to be destroyed here. */
            goto error;
        }

        if (sub_commands[slot_num] == NULL) {
            sub_commands[slot_num] = command_get();
            if (sub_commands[slot_num] == NULL) {
                goto oom;
            }
        }

        command->frag_seq[i] = sub_command = sub_commands[slot_num];

        sub_command->narg++;

        sub_kp = hiarray_push(sub_command->keys);
        if (sub_kp == NULL) {
            goto oom;
        }

        sub_kp->start = kp->start;
        sub_kp->end = kp->end;

        // Number of characters in key
        key_len = (uint32_t)(kp->end - kp->start);

        sub_command->clen += key_len + uint_len(key_len);

        sub_command->slot_num = slot_num;

        if (is_mset) {
            uint32_t len = 0;
            char *p;

            /* Skip ahead to the length field of the value that follows the
             * key, then read that length. */
            p = sub_kp->end + 1;
            while (!isdigit((unsigned char)*p)) {
                p++;
            }
            for (; isdigit((unsigned char)*p); p++) {
                len = len * 10 + (uint32_t)(*p - '0');
            }

            /* remain_len covers everything between the end of the key and
             * the end of the value, including the terminating CRLF. */
            len += CRLF_LEN * 2;
            len += (p - sub_kp->end);
            sub_kp->remain_len = len;
            sub_command->clen += len;
        }
    }

    /* prepend command header */
    for (i = 0; i < REDIS_CLUSTER_SLOTS; i++) {
        sub_command = sub_commands[i];
        if (sub_command == NULL) {
            continue;
        }

        idx = 0;
        if (header == NULL) {
            NOT_REACHED();
        } else {
            //"*%d\r\n$<n>\r\n<name>\r\n"
            sub_command->clen += per_key_overhead * sub_command->narg;
            if (is_mset) {
                /* Every key is accompanied by a value. */
                sub_command->narg *= 2;
            }
            sub_command->narg++; /* the command name */

            hi_itoa(num_str, sub_command->narg);
            num_str_len = (uint8_t)strlen(num_str);

            sub_command->clen += 1 + header_len + num_str_len;

            sub_command->cmd =
                hi_calloc(sub_command->clen, sizeof(*sub_command->cmd));
            if (sub_command->cmd == NULL) {
                goto oom;
            }

            sub_command->cmd[idx++] = '*';
            memcpy(sub_command->cmd + idx, num_str, num_str_len);
            idx += num_str_len;
            memcpy(sub_command->cmd + idx, header, header_len);
            idx += header_len;

            for (j = 0; j < hiarray_n(sub_command->keys); j++) {
                kp = hiarray_get(sub_command->keys, j);
                key_len = (uint32_t)(kp->end - kp->start);
                hi_itoa(num_str, key_len);
                num_str_len = (uint8_t)strlen(num_str);

                sub_command->cmd[idx++] = '$';
                memcpy(sub_command->cmd + idx, num_str, num_str_len);
                idx += num_str_len;
                memcpy(sub_command->cmd + idx, CRLF, CRLF_LEN);
                idx += CRLF_LEN;

                if (is_mset) {
                    /* Key and value are copied in one go; the framing
                     * between and after them is part of remain_len. */
                    memcpy(sub_command->cmd + idx, kp->start,
                           key_len + kp->remain_len);
                    idx += key_len + kp->remain_len;
                } else {
                    memcpy(sub_command->cmd + idx, kp->start, key_len);
                    idx += key_len;
                    memcpy(sub_command->cmd + idx, CRLF, CRLF_LEN);
                    idx += CRLF_LEN;
                }
            }
        }

        sub_command->type = command->type;

        if (listAddNodeTail(commands, sub_command) == NULL) {
            goto oom;
        }
        /* Owned by `commands` from here on. */
        sub_commands[i] = NULL;
    }

done:
    hi_free(sub_commands);

    /* All keys belong to the same slot: send the original command. */
    if (slot_num >= 0 && commands != NULL && listLength(commands) == 1) {
        listNode *list_node = listFirst(commands);
        listDelNode(commands, list_node);
        if (command->frag_seq) {
            hi_free(command->frag_seq);
            command->frag_seq = NULL;
        }

        command->slot_num = slot_num;
    }
    return slot_num;

oom:
    __redisClusterSetError(cc, REDIS_ERR_OOM, "Out of memory");
    // passthrough
error:
    if (sub_commands != NULL) {
        for (i = 0; i < REDIS_CLUSTER_SLOTS; i++) {
            command_destroy(sub_commands[i]);
        }
    }
    hi_free(sub_commands);
    return -1; // failing slot_num
}
/* Combine the replies of the sub-commands in `commands` into a single reply
 * for the fragmented `command`.
 *
 * - MGET: an array with one element per key of `command`, in key order; the
 *   elements are taken over from the replies of the sub-commands.
 * - DEL and EXISTS: the sum of the integer replies.
 * - MSET: a status reply holding REDIS_STATUS_OK.
 *
 * Returns NULL when a sub-command has no reply, when a reply is not of the
 * expected kind, or when memory runs out. An error reply of a sub-command
 * is returned as it is. */
static void *command_post_fragment(redisClusterContext *cc, struct cmd *command,
                                   hilist *commands) {
    redisReply *merged = NULL;
    long long total = 0;
    listNode *item;
    listIter it;

    /* First pass: validate every partial reply and sum up integer replies. */
    listRewind(commands, &it);
    for (item = listNext(&it); item != NULL; item = listNext(&it)) {
        struct cmd *fragment = item->value;
        redisReply *partial = fragment->reply;
        int valid;

        if (partial == NULL) {
            return NULL;
        }
        if (partial->type == REDIS_REPLY_ERROR) {
            return partial;
        }

        if (command->type == CMD_REQ_REDIS_MGET) {
            valid = (partial->type == REDIS_REPLY_ARRAY);
        } else if (command->type == CMD_REQ_REDIS_DEL ||
                   command->type == CMD_REQ_REDIS_EXISTS) {
            valid = (partial->type == REDIS_REPLY_INTEGER);
            if (valid) {
                total += partial->integer;
            }
        } else if (command->type == CMD_REQ_REDIS_MSET) {
            valid = (partial->type == REDIS_REPLY_STATUS &&
                     partial->len == 2 &&
                     strcmp(partial->str, REDIS_STATUS_OK) == 0);
        } else {
            NOT_REACHED();
            valid = 1;
        }

        if (!valid) {
            __redisClusterSetError(cc, REDIS_ERR_OTHER, "reply type error");
            return NULL;
        }
    }

    /* Second pass: build the combined reply. */
    merged = hi_calloc(1, sizeof(*merged));
    if (merged == NULL) {
        goto oom;
    }

    if (command->type == CMD_REQ_REDIS_MGET) {
        int pos;
        uint32_t key_count = hiarray_n(command->keys);

        merged->type = REDIS_REPLY_ARRAY;
        merged->elements = key_count;
        merged->element = hi_calloc(key_count, sizeof(*merged->element));
        if (merged->element == NULL) {
            goto oom;
        }

        /* Walk the keys backwards: each key takes the last remaining
         * element of the reply that belongs to its sub-command. */
        for (pos = key_count - 1; pos >= 0; pos--) {
            redisReply *partial = command->frag_seq[pos]->reply;

            if (partial == NULL) {
                freeReplyObject(merged);
                __redisClusterSetError(cc, REDIS_ERR_OTHER,
                                       "sub reply is null");
                return NULL;
            }

            if (partial->type == REDIS_REPLY_STRING) {
                merged->element[pos] = partial;
            } else if (partial->type == REDIS_REPLY_ARRAY) {
                if (partial->elements == 0) {
                    freeReplyObject(merged);
                    __redisClusterSetError(cc, REDIS_ERR_OTHER,
                                           "sub reply elements error");
                    return NULL;
                }

                /* Shrinking the count detaches the element from the
                 * partial reply. */
                merged->element[pos] = partial->element[partial->elements - 1];
                partial->elements--;
            }
        }
    } else if (command->type == CMD_REQ_REDIS_DEL ||
               command->type == CMD_REQ_REDIS_EXISTS) {
        merged->type = REDIS_REPLY_INTEGER;
        merged->integer = total;
    } else if (command->type == CMD_REQ_REDIS_MSET) {
        uint32_t ok_len = strlen(REDIS_STATUS_OK);

        merged->type = REDIS_REPLY_STATUS;
        merged->str = hi_malloc((ok_len + 1) * sizeof(char));
        if (merged->str == NULL) {
            goto oom;
        }
        merged->len = ok_len;
        /* Copies the text including its terminating null character. */
        memcpy(merged->str, REDIS_STATUS_OK, ok_len + 1);
    } else {
        NOT_REACHED();
    }

    return merged;

oom:
    freeReplyObject(merged);
    __redisClusterSetError(cc, REDIS_ERR_OOM, "Out of memory");
    return NULL;
}
/*
 * Parse the command and determine the slot it has to be sent to. A command
 * with several keys is split into sub-commands by slot.
 *
 * Returns slot_num.
 * A slot_num < 0 or >= REDIS_CLUSTER_SLOTS means that this function failed.
 * Otherwise, if more than one sub-command was created, slot_num is the slot
 * number of the last sub-command.
 */
static int command_format_by_slot(redisClusterContext *cc, struct cmd *command,
                                  hilist *commands) {
    if (cc == NULL || commands == NULL || command == NULL ||
        command->cmd == NULL || command->clen <= 0) {
        return -1;
    }

    redis_parse_cmd(command);
    if (command->result == CMD_PARSE_ENOMEM) {
        __redisClusterSetError(cc, REDIS_ERR_OOM, "Out of memory");
        return -1;
    }
    if (command->result != CMD_PARSE_OK) {
        __redisClusterSetError(cc, REDIS_ERR_PROTOCOL, command->errstr);
        return -1;
    }

    int n_keys = hiarray_n(command->keys);
    if (n_keys <= 0) {
        __redisClusterSetError(
            cc, REDIS_ERR_OTHER,
            "No keys in command(must have keys for redis cluster mode)");
        return -1;
    }

    /* Several keys: the command may have to be split by slot. */
    if (n_keys != 1) {
        return command_pre_fragment(cc, command, commands);
    }

    /* Exactly one key: its hash slot is the slot of the command. */
    struct keypos *only_key = hiarray_get(command->keys, 0);
    int slot = keyHashSlot(only_key->start, only_key->end - only_key->start);
    command->slot_num = slot;
    return slot;
}
/* Deprecated function, replaced with redisClusterSetOptionMaxRetry().
 * A NULL context or a non-positive count is ignored. */
void redisClusterSetMaxRedirect(redisClusterContext *cc, int max_retry_count) {
    if (cc != NULL && max_retry_count > 0) {
        cc->max_retry_count = max_retry_count;
    }
}
/* Register the connect callback of the cluster context.
 * The callback can only be set once: returns REDIS_ERR when one is already
 * registered, REDIS_OK otherwise. */
int redisClusterSetConnectCallback(redisClusterContext *cc,
                                   void(fn)(const redisContext *c,
                                            int status)) {
    if (cc->on_connect != NULL) {
        return REDIS_ERR;
    }
    cc->on_connect = fn;
    return REDIS_OK;
}
/* Register the event callback and its private data on the cluster context.
 * The callback can only be set once: returns REDIS_ERR when one is already
 * registered, REDIS_OK otherwise. */
int redisClusterSetEventCallback(redisClusterContext *cc,
                                 void(fn)(const redisClusterContext *cc,
                                          int event, void *privdata),
                                 void *privdata) {
    if (cc->event_callback != NULL) {
        return REDIS_ERR;
    }
    cc->event_callback = fn;
    cc->event_privdata = privdata;
    return REDIS_OK;
}
/* Execute an already formatted command and return its reply.
 *
 * The command is first analysed by command_format_by_slot(). When the
 * subcommand list stays empty the command itself is executed. Otherwise each
 * subcommand in the list is executed in turn and command_post_fragment()
 * builds the reply that is returned.
 *
 * The cmd buffer is only borrowed: command->cmd is reset to NULL before the
 * command object is destroyed (presumably so that command_destroy() leaves
 * the caller's buffer alone).
 *
 * Returns the reply, or NULL on failure. cc->retry_count is reset to 0 on
 * every path that got past the initial cc check. */
void *redisClusterFormattedCommand(redisClusterContext *cc, char *cmd,
int len) {
redisReply *reply = NULL;
int slot_num;
struct cmd *command = NULL, *sub_command;
hilist *commands = NULL;
listNode *list_node;
if (cc == NULL) {
return NULL;
}
/* Clear any error left over from a previous call. */
if (cc->err) {
cc->err = 0;
memset(cc->errstr, '\0', strlen(cc->errstr));
}
command = command_get();
if (command == NULL) {
goto oom;
}
command->cmd = cmd;
command->clen = len;
/* List that receives the subcommands of a multi-slot command. */
commands = listCreate();
if (commands == NULL) {
goto oom;
}
commands->free = listCommandFree;
slot_num = command_format_by_slot(cc, command, commands);
if (slot_num < 0) {
goto error;
} else if (slot_num >= REDIS_CLUSTER_SLOTS) {
__redisClusterSetError(cc, REDIS_ERR_OTHER, "slot_num is out of range");
goto error;
}
// all keys belong to one slot
if (listLength(commands) == 0) {
reply = redis_cluster_command_execute(cc, command);
goto done;
}
ASSERT(listLength(commands) != 1);
/* Execute the subcommands one by one. A NULL reply aborts with an error,
 * while an error reply is returned to the caller as is. */
listIter li;
listRewind(commands, &li);
while ((list_node = listNext(&li)) != NULL) {
sub_command = list_node->value;
reply = redis_cluster_command_execute(cc, sub_command);
if (reply == NULL) {
goto error;
} else if (reply->type == REDIS_REPLY_ERROR) {
goto done;
}
/* Keep the reply on the subcommand for command_post_fragment(). */
sub_command->reply = reply;
}
reply = command_post_fragment(cc, command, commands);
done:
/* Detach the borrowed buffer before destroying the command. */
command->cmd = NULL;
command_destroy(command);
if (commands != NULL) {
listRelease(commands);
}
cc->retry_count = 0;
return reply;
oom:
__redisClusterSetError(cc, REDIS_ERR_OOM, "Out of memory");
// passthrough
error:
if (command != NULL) {
/* Detach the borrowed buffer before destroying the command. */
command->cmd = NULL;
command_destroy(command);
}
if (commands != NULL) {
listRelease(commands);
}
cc->retry_count = 0;
return NULL;
}
/* Format a command from a printf-like format string and a va_list, execute it
 * through redisClusterFormattedCommand() and return the reply.
 * Returns NULL when cc is NULL, when formatting fails or when the execution
 * fails. The formatted buffer is released before returning. */
void *redisClustervCommand(redisClusterContext *cc, const char *format,
                           va_list ap) {
    char *formatted;
    int formatted_len;
    void *result;

    if (cc == NULL) {
        return NULL;
    }

    formatted_len = redisvFormatCommand(&formatted, format, ap);
    switch (formatted_len) {
    case -1:
        __redisClusterSetError(cc, REDIS_ERR_OOM, "Out of memory");
        return NULL;
    case -2:
        __redisClusterSetError(cc, REDIS_ERR_OTHER, "Invalid format string");
        return NULL;
    default:
        break;
    }

    result = redisClusterFormattedCommand(cc, formatted, formatted_len);
    hi_free(formatted);
    return result;
}
/* Variadic front end of redisClustervCommand(): collects the arguments into a
 * va_list and forwards them. Returns whatever redisClustervCommand() returns. */
void *redisClusterCommand(redisClusterContext *cc, const char *format, ...) {
    va_list args;
    void *result;

    va_start(args, format);
    result = redisClustervCommand(cc, format, args);
    va_end(args);

    return result;
}
/* Send a command, given as format string and va_list, to one specific node
 * and wait for its reply.
 *
 * When cc->need_update_route is set, a slotmap update request is pipelined on
 * the same connection and its reply is handled after the command reply has
 * been read.
 *
 * Returns the reply, or NULL on failure with the error copied to cc (when
 * ctx_get_by_node() returns NULL the function returns NULL without setting
 * an error itself). */
void *redisClustervCommandToNode(redisClusterContext *cc,
redisClusterNode *node, const char *format,
va_list ap) {
redisContext *c;
int ret;
void *reply;
int updating_slotmap = 0;
c = ctx_get_by_node(cc, node);
if (c == NULL) {
return NULL;
} else if (c->err) {
/* Propagate the connection error to the cluster context. */
__redisClusterSetError(cc, c->err, c->errstr);
return NULL;
}
/* Clear any error left over from a previous call. */
if (cc->err) {
cc->err = 0;
memset(cc->errstr, '\0', sizeof(cc->errstr));
}
ret = redisvAppendCommand(c, format, ap);
if (ret != REDIS_OK) {
__redisClusterSetError(cc, c->err, c->errstr);
return NULL;
}
if (cc->need_update_route) {
/* Pipeline slotmap update on the same connection. */
if (clusterUpdateRouteSendCommand(cc, c) == REDIS_OK) {
updating_slotmap = 1;
}
}
if (redisGetReply(c, &reply) != REDIS_OK) {
__redisClusterSetError(cc, c->err, c->errstr);
/* Any failure except out-of-memory requests a slotmap update. */
if (c->err != REDIS_ERR_OOM)
cc->need_update_route = 1;
return NULL;
}
if (updating_slotmap) {
/* Handle reply from pipelined CLUSTER SLOTS or CLUSTER NODES. */
if (clusterUpdateRouteHandleReply(cc, c) != REDIS_OK) {
/* Ignore error. Update will be triggered on the next command. */
cc->err = 0;
cc->errstr[0] = '\0';
}
}
return reply;
}
/* Variadic front end of redisClustervCommandToNode(): collects the arguments
 * into a va_list and forwards them together with the target node. */
void *redisClusterCommandToNode(redisClusterContext *cc, redisClusterNode *node,
                                const char *format, ...) {
    va_list args;
    void *result;

    va_start(args, format);
    result = redisClustervCommandToNode(cc, node, format, args);
    va_end(args);

    return result;
}
/* Build a command from an argument vector, execute it through
 * redisClusterFormattedCommand() and return the reply.
 * Returns NULL with REDIS_ERR_OOM set when the command cannot be formatted.
 * The formatted buffer is released before returning. */
void *redisClusterCommandArgv(redisClusterContext *cc, int argc,
                              const char **argv, const size_t *argvlen) {
    char *formatted;
    int formatted_len = redisFormatCommandArgv(&formatted, argc, argv, argvlen);

    if (formatted_len == -1) {
        __redisClusterSetError(cc, REDIS_ERR_OOM, "Out of memory");
        return NULL;
    }

    void *result = redisClusterFormattedCommand(cc, formatted, formatted_len);
    hi_free(formatted);
    return result;
}
/* Queue an already formatted command for pipelined execution.
 *
 * The command is analysed by command_format_by_slot(). Either the command
 * itself or each of its subcommands is appended with
 * __redisClusterAppendCommand(), and the command object is then added to
 * cc->requests so that redisClusterGetReply() can find it later.
 *
 * The cmd buffer is only borrowed: command->cmd is reset to NULL before the
 * command is stored or destroyed.
 *
 * Returns REDIS_OK on success, REDIS_ERR on failure. */
int redisClusterAppendFormattedCommand(redisClusterContext *cc, char *cmd,
int len) {
int slot_num;
struct cmd *command = NULL, *sub_command;
hilist *commands = NULL;
listNode *list_node;
/* Create the list of outstanding requests on first use. */
if (cc->requests == NULL) {
cc->requests = listCreate();
if (cc->requests == NULL) {
goto oom;
}
cc->requests->free = listCommandFree;
}
command = command_get();
if (command == NULL) {
goto oom;
}
command->cmd = cmd;
command->clen = len;
/* List that receives the subcommands of a multi-slot command. */
commands = listCreate();
if (commands == NULL) {
goto oom;
}
commands->free = listCommandFree;
slot_num = command_format_by_slot(cc, command, commands);
if (slot_num < 0) {
goto error;
} else if (slot_num >= REDIS_CLUSTER_SLOTS) {
__redisClusterSetError(cc, REDIS_ERR_OTHER, "slot_num is out of range");
goto error;
}
// Append command(s)
if (listLength(commands) == 0) {
// All keys belong to one slot
if (__redisClusterAppendCommand(cc, command) != REDIS_OK) {
goto error;
}
} else {
// Keys belong to different slots
ASSERT(listLength(commands) != 1);
listIter li;
listRewind(commands, &li);
while ((list_node = listNext(&li)) != NULL) {
sub_command = list_node->value;
if (__redisClusterAppendCommand(cc, sub_command) != REDIS_OK) {
goto error;
}
}
}
/* Hand the subcommand list over to the command, or drop it when unused. */
if (listLength(commands) > 0) {
command->sub_commands = commands;
} else {
listRelease(commands);
}
/* The list is either owned by the command or released: forget it here so
 * that the error path below does not release it again. */
commands = NULL;
/* Detach the borrowed buffer before the command is stored. */
command->cmd = NULL;
if (listAddNodeTail(cc->requests, command) == NULL) {
goto oom;
}
return REDIS_OK;
oom:
__redisClusterSetError(cc, REDIS_ERR_OOM, "Out of memory");
// passthrough
error:
if (command != NULL) {
command->cmd = NULL;
command_destroy(command);
}
if (commands != NULL) {
listRelease(commands);
}
/* Attention: the subcommands that were already appended to the nodes
   should probably be removed again here.
   This is currently not handled. */
return REDIS_ERR;
}
/* Format a command from a printf-like format string and a va_list and queue
 * it through redisClusterAppendFormattedCommand().
 * Returns REDIS_ERR when formatting fails, otherwise the result of the append.
 * The formatted buffer is released before returning. */
int redisClustervAppendCommand(redisClusterContext *cc, const char *format,
                               va_list ap) {
    char *formatted;
    int formatted_len = redisvFormatCommand(&formatted, format, ap);

    switch (formatted_len) {
    case -1:
        __redisClusterSetError(cc, REDIS_ERR_OOM, "Out of memory");
        return REDIS_ERR;
    case -2:
        __redisClusterSetError(cc, REDIS_ERR_OTHER, "Invalid format string");
        return REDIS_ERR;
    default:
        break;
    }

    int status = redisClusterAppendFormattedCommand(cc, formatted, formatted_len);
    hi_free(formatted);
    return status;
}
/* Variadic front end of redisClustervAppendCommand().
 * Returns REDIS_ERR when cc or format is NULL, otherwise the result of
 * redisClustervAppendCommand(). */
int redisClusterAppendCommand(redisClusterContext *cc, const char *format,
                              ...) {
    int status = REDIS_ERR;

    if (cc != NULL && format != NULL) {
        va_list args;

        va_start(args, format);
        status = redisClustervAppendCommand(cc, format, args);
        va_end(args);
    }
    return status;
}
/* Queue a command, given as format string and va_list, for pipelined
 * execution on one specific node.
 *
 * The formatted command is appended to the output buffer of the node's
 * hiredis context, and a command object carrying the formatted buffer and
 * the node address is added to cc->requests.
 *
 * Returns REDIS_OK on success, REDIS_ERR on failure. */
int redisClustervAppendCommandToNode(redisClusterContext *cc,
redisClusterNode *node, const char *format,
va_list ap) {
redisContext *c;
struct cmd *command = NULL;
char *cmd = NULL;
int len;
/* Create the list of outstanding requests on first use. */
if (cc->requests == NULL) {
cc->requests = listCreate();
if (cc->requests == NULL)
goto oom;
cc->requests->free = listCommandFree;
}
c = ctx_get_by_node(cc, node);
if (c == NULL) {
return REDIS_ERR;
} else if (c->err) {
/* Propagate the connection error to the cluster context. */
__redisClusterSetError(cc, c->err, c->errstr);
return REDIS_ERR;
}
len = redisvFormatCommand(&cmd, format, ap);
if (len == -1) {
goto oom;
} else if (len == -2) {
__redisClusterSetError(cc, REDIS_ERR_OTHER, "Invalid format string");
return REDIS_ERR;
}
// Append the command to the outgoing hiredis buffer
if (redisAppendFormattedCommand(c, cmd, len) != REDIS_OK) {
__redisClusterSetError(cc, c->err, c->errstr);
hi_free(cmd);
return REDIS_ERR;
}
// Keep the command in the outstanding request list
command = command_get();
if (command == NULL) {
/* No command object took over cmd yet, so release it here. */
hi_free(cmd);
goto oom;
}
/* From here on the formatted buffer is attached to the command object. */
command->cmd = cmd;
command->clen = len;
/* Remember the target by address; redisClusterGetReply() looks the node
 * up again using this address. */
command->node_addr = sdsnew(node->addr);
if (command->node_addr == NULL)
goto oom;
if (listAddNodeTail(cc->requests, command) == NULL)
goto oom;
return REDIS_OK;
oom:
/* command is still NULL when the failure happened before command_get(). */
command_destroy(command);
__redisClusterSetError(cc, REDIS_ERR_OOM, "Out of memory");
return REDIS_ERR;
}
/* Variadic front end of redisClustervAppendCommandToNode().
 * Returns REDIS_ERR when cc, node or format is NULL, otherwise the result of
 * redisClustervAppendCommandToNode(). */
int redisClusterAppendCommandToNode(redisClusterContext *cc,
                                    redisClusterNode *node, const char *format,
                                    ...) {
    int status = REDIS_ERR;

    if (cc != NULL && node != NULL && format != NULL) {
        va_list args;

        va_start(args, format);
        status = redisClustervAppendCommandToNode(cc, node, format, args);
        va_end(args);
    }
    return status;
}
/* Build a command from an argument vector and queue it through
 * redisClusterAppendFormattedCommand().
 * Returns REDIS_ERR with REDIS_ERR_OOM set when the command cannot be
 * formatted, otherwise the result of the append. The formatted buffer is
 * released before returning. */
int redisClusterAppendCommandArgv(redisClusterContext *cc, int argc,
                                  const char **argv, const size_t *argvlen) {
    char *formatted;
    int formatted_len = redisFormatCommandArgv(&formatted, argc, argv, argvlen);

    if (formatted_len == -1) {
        __redisClusterSetError(cc, REDIS_ERR_OOM, "Out of memory");
        return REDIS_ERR;
    }

    int status = redisClusterAppendFormattedCommand(cc, formatted, formatted_len);
    hi_free(formatted);
    return status;
}
/* Write the output buffer of every node that has a usable context.
 * Nodes for which ctx_get_by_node() yields no context are skipped.
 * Returns REDIS_ERR when cc or its node dict is NULL or when a write fails,
 * REDIS_OK otherwise. */
static int redisClusterSendAll(redisClusterContext *cc) {
    if (cc == NULL || cc->nodes == NULL) {
        return REDIS_ERR;
    }

    int write_done = 0;
    dictIterator it;
    dictEntry *entry;

    dictInitIterator(&it, cc->nodes);
    for (entry = dictNext(&it); entry != NULL; entry = dictNext(&it)) {
        redisClusterNode *node = dictGetEntryVal(entry);
        if (node == NULL) {
            continue;
        }

        redisContext *ctx = ctx_get_by_node(cc, node);
        if (ctx == NULL) {
            continue;
        }

        /* Keep writing until hiredis reports that the buffer is drained. */
        for (;;) {
            if (redisBufferWrite(ctx, &write_done) == REDIS_ERR) {
                return REDIS_ERR;
            }
            if (write_done) {
                break;
            }
        }
    }
    return REDIS_OK;
}
/* Clear the error of the cluster context and free the synchronous hiredis
 * context of every node.
 * Returns REDIS_ERR when cc or its node dict is NULL, REDIS_OK otherwise. */
static int redisClusterClearAll(redisClusterContext *cc) {
    if (cc == NULL) {
        return REDIS_ERR;
    }

    if (cc->err) {
        cc->err = 0;
        memset(cc->errstr, '\0', strlen(cc->errstr));
    }

    if (cc->nodes == NULL) {
        return REDIS_ERR;
    }

    dictIterator it;
    dictEntry *entry;

    dictInitIterator(&it, cc->nodes);
    while ((entry = dictNext(&it)) != NULL) {
        redisClusterNode *node = dictGetEntryVal(entry);

        if (node != NULL && node->con != NULL) {
            redisFree(node->con);
            node->con = NULL;
        }
    }
    return REDIS_OK;
}
/* Fetch the reply of the oldest queued (pipelined) request.
 *
 * The first entry of cc->requests decides how the reply is read:
 *  - slot_num >= 0: the reply is read via __redisClusterGetReply().
 *  - node_addr set: the node is looked up by address and the reply is read
 *    via __redisClusterGetReplyFromNode().
 *  - otherwise: one reply per subcommand is read and command_post_fragment()
 *    builds the reply stored in *reply.
 * The request is removed from the list on every path that got as far as
 * inspecting it, including the error path.
 *
 * Returns REDIS_OK with *reply == NULL when the list is empty. Returns
 * REDIS_ERR when cc or reply is NULL, when nothing was ever queued, or on
 * failure. */
int redisClusterGetReply(redisClusterContext *cc, void **reply) {
struct cmd *command, *sub_command;
hilist *commands = NULL;
listNode *list_command, *list_sub_command;
int slot_num;
void *sub_reply;
if (cc == NULL || reply == NULL)
return REDIS_ERR;
/* Start from a clean error state and an empty result. */
cc->err = 0;
cc->errstr[0] = '\0';
*reply = NULL;
if (cc->requests == NULL)
return REDIS_ERR; // No queued requests
list_command = listFirst(cc->requests);
// no more reply
if (list_command == NULL) {
*reply = NULL;
return REDIS_OK;
}
command = list_command->value;
if (command == NULL) {
__redisClusterSetError(cc, REDIS_ERR_OTHER,
"command in the requests list is null");
goto error;
}
/* slot_num is copied first because the list node is deleted before the
 * reply is read. */
slot_num = command->slot_num;
if (slot_num >= 0) {
/* Command was sent via single slot */
listDelNode(cc->requests, list_command);
return __redisClusterGetReply(cc, slot_num, reply);
} else if (command->node_addr) {
/* Command was sent to a single node */
dictEntry *de;
de = dictFind(cc->nodes, command->node_addr);
if (de != NULL) {
listDelNode(cc->requests, list_command);
return __redisClusterGetReplyFromNode(cc, dictGetEntryVal(de),
reply);
} else {
__redisClusterSetError(cc, REDIS_ERR_OTHER,
"command was sent to a now unknown node");
goto error;
}
}
/* Neither a slot nor a node address: the command must have subcommands. */
commands = command->sub_commands;
if (commands == NULL) {
__redisClusterSetError(cc, REDIS_ERR_OTHER,
"sub_commands in command is null");
goto error;
}
ASSERT(listLength(commands) != 1);
/* Collect one reply per subcommand. */
listIter li;
listRewind(commands, &li);
while ((list_sub_command = listNext(&li)) != NULL) {
sub_command = list_sub_command->value;
if (sub_command == NULL) {
__redisClusterSetError(cc, REDIS_ERR_OTHER, "sub_command is null");
goto error;
}
slot_num = sub_command->slot_num;
if (slot_num < 0) {
__redisClusterSetError(cc, REDIS_ERR_OTHER,
"sub_command slot_num is less then zero");
goto error;
}
if (__redisClusterGetReply(cc, slot_num, &sub_reply) != REDIS_OK) {
goto error;
}
/* Keep the reply on the subcommand for command_post_fragment(). */
sub_command->reply = sub_reply;
}
*reply = command_post_fragment(cc, command, commands);
if (*reply == NULL) {
goto error;
}
listDelNode(cc->requests, list_command);
return REDIS_OK;
error:
/* The failed request is dropped from the list as well. */
listDelNode(cc->requests, list_command);
return REDIS_ERR;
}
/**
 * Resets the cluster context after pipelining.
 *
 * When an error is set on the context, the node connections are cleared.
 * Otherwise the output buffers are flushed and replies are fetched until no
 * more are available; if fetching a reply fails (which is also the case when
 * no pipelined command was queued beforehand), the node connections are
 * cleared. Finally the request list is released and, when flagged as needed,
 * the slotmap is updated.
 */
void redisClusterReset(redisClusterContext *cc) {
    if (cc == NULL || cc->nodes == NULL) {
        return;
    }

    if (cc->err) {
        redisClusterClearAll(cc);
    } else {
        /* Write/flush each node's output buffer to its socket */
        redisClusterSendAll(cc);

        /* Expect a reply for each pipelined request */
        for (;;) {
            void *reply;

            if (redisClusterGetReply(cc, &reply) != REDIS_OK) {
                redisClusterClearAll(cc);
                break;
            }

            int drained = (reply == NULL);
            freeReplyObject(reply);
            if (drained) {
                break;
            }
        }
    }

    if (cc->requests) {
        listRelease(cc->requests);
        cc->requests = NULL;
    }

    if (cc->need_update_route) {
        /* On failure the specific error is already set and the flag is kept. */
        if (redisClusterUpdateSlotmap(cc) == REDIS_OK) {
            cc->need_update_route = 0;
        }
    }
}
/*############redis cluster async############*/
/* Store an error code and message on the async cluster context.
 * The message is truncated to fit acc->errstr and is always terminated.
 * When str is NULL the type must be REDIS_ERR_IO, and the message is derived
 * from errno instead. */
static void __redisClusterAsyncSetError(redisClusterAsyncContext *acc, int type,
                                        const char *str) {
    acc->err = type;

    if (str == NULL) {
        /* Only REDIS_ERR_IO may lack a description! */
        assert(type == REDIS_ERR_IO);
        strerror_r(errno, acc->errstr, sizeof(acc->errstr));
        return;
    }

    const size_t capacity = sizeof(acc->errstr) - 1;
    size_t copy_len = strlen(str);
    if (copy_len >= capacity) {
        copy_len = capacity;
    }
    memcpy(acc->errstr, str, copy_len);
    acc->errstr[copy_len] = '\0';
}
/* Allocate a zeroed async cluster context wrapping the given cluster context.
 * Returns NULL when cc is NULL or when the allocation fails. */
static redisClusterAsyncContext *
redisClusterAsyncInitialize(redisClusterContext *cc) {
    redisClusterAsyncContext *acc = NULL;

    if (cc != NULL) {
        acc = hi_calloc(1, sizeof(*acc));
    }
    if (acc == NULL) {
        return NULL;
    }

    acc->cc = cc;

    /* Mirror the error fields so that they can be read directly from the
     * async context, without going through the wrapped context. */
    // TODO: really needed?
    acc->err = cc->err;
    memcpy(acc->errstr, cc->errstr, 128);

    return acc;
}
/* Allocate a cluster_async_data object with all fields zeroed.
 * Returns NULL when the allocation fails. */
static cluster_async_data *cluster_async_data_create(void) {
    /* calloc guarantees that every field starts out as zero */
    cluster_async_data *cad = hi_calloc(1, sizeof(*cad));
    return cad;
}
/* Destroy the command held by cad and release cad itself. NULL is a no-op. */
static void cluster_async_data_free(cluster_async_data *cad) {
    if (cad != NULL) {
        command_destroy(cad->command);
        hi_free(cad);
    }
}
/* Cleanup hook installed as ac->dataCleanup: data is the cluster node stored
 * in ac->data, and its async context pointer is reset. NULL is a no-op. */
static void unlinkAsyncContextAndNode(void *data) {
    redisClusterNode *node = data;

    if (node != NULL) {
        node->acon = NULL;
    }
}
/* Get the async hiredis context of a node, connecting first when the node has
 * none.
 *
 * An existing context is returned when it has no error. When it has an error,
 * the error is copied to acc and NULL is returned. Otherwise a new connection
 * is set up: connect, optional SSL initialisation, optional AUTH, attaching
 * the event adapter and installing the connect/disconnect callbacks. The new
 * context is then linked with the node in both directions.
 *
 * Returns the context, or NULL on failure. A NULL node returns NULL without
 * setting an error; every other failure sets the error on acc. */
redisAsyncContext *actx_get_by_node(redisClusterAsyncContext *acc,
redisClusterNode *node) {
redisAsyncContext *ac;
int ret;
if (node == NULL) {
return NULL;
}
ac = node->acon;
if (ac != NULL) {
if (ac->c.err == 0) {
return ac;
} else {
/* The cluster node has a hiredis context with errors. Hiredis
 * will asynchronously destruct the context and unlink it from
 * the cluster node object. Return an error until done.
 * An example scenario is when sending a command from a command
 * callback, which has a NULL reply due to a disconnect. */
__redisClusterAsyncSetError(acc, ac->c.err, ac->c.errstr);
return NULL;
}
}
// No async context exists, perform a connect
if (node->host == NULL || node->port <= 0) {
__redisClusterAsyncSetError(acc, REDIS_ERR_OTHER,
"node host or port is error");
return NULL;
}
redisOptions options = {0};
REDIS_OPTIONS_SET_TCP(&options, node->host, node->port);
options.connect_timeout = acc->cc->connect_timeout;
options.command_timeout = acc->cc->command_timeout;
/* Record the time of this attempt; selectNode() compares it against its
 * throttle limit. */
node->lastConnectionAttempt = hi_usec_now();
ac = redisAsyncConnectWithOptions(&options);
if (ac == NULL) {
__redisClusterAsyncSetError(acc, REDIS_ERR_OOM, "Out of memory");
return NULL;
}
if (ac->err) {
__redisClusterAsyncSetError(acc, ac->err, ac->errstr);
redisAsyncFree(ac);
return NULL;
}
/* Set up SSL on the new connection when the cluster context has it. */
if (acc->cc->ssl &&
acc->cc->ssl_init_fn(&ac->c, acc->cc->ssl) != REDIS_OK) {
__redisClusterAsyncSetError(acc, ac->c.err, ac->c.errstr);
redisAsyncFree(ac);
return NULL;
}
// Authenticate when needed
if (acc->cc->password != NULL) {
/* The AUTH reply is not inspected: no callback is registered. */
if (acc->cc->username != NULL) {
ret = redisAsyncCommand(ac, NULL, NULL, "AUTH %s %s",
acc->cc->username, acc->cc->password);
} else {
ret =
redisAsyncCommand(ac, NULL, NULL, "AUTH %s", acc->cc->password);
}
if (ret != REDIS_OK) {
__redisClusterAsyncSetError(acc, ac->c.err, ac->c.errstr);
redisAsyncFree(ac);
return NULL;
}
}
/* Attach the context to the event library adapter, when one is set. */
if (acc->adapter) {
ret = acc->attach_fn(ac, acc->adapter);
if (ret != REDIS_OK) {
__redisClusterAsyncSetError(acc, REDIS_ERR_OTHER,
"Failed to attach event adapter");
redisAsyncFree(ac);
return NULL;
}
}
/* Install the connect callback; the const variant takes precedence. */
if (acc->onConnect) {
redisAsyncSetConnectCallback(ac, acc->onConnect);
}
#ifndef HIRCLUSTER_NO_NONCONST_CONNECT_CB
else if (acc->onConnectNC) {
redisAsyncSetConnectCallbackNC(ac, acc->onConnectNC);
}
#endif
if (acc->onDisconnect) {
redisAsyncSetDisconnectCallback(ac, acc->onDisconnect);
}
/* Link context and node; the cleanup hook resets node->acon again. */
ac->data = node;
ac->dataCleanup = unlinkAsyncContextAndNode;
node->acon = ac;
return ac;
}
/* Create a cluster context and wrap it in a new async cluster context.
 * Returns NULL on failure; the cluster context is freed again when the
 * wrapper cannot be created. */
redisClusterAsyncContext *redisClusterAsyncContextInit(void) {
    redisClusterContext *cc = redisClusterContextInit();
    if (cc == NULL) {
        return NULL;
    }

    redisClusterAsyncContext *acc = redisClusterAsyncInitialize(cc);
    if (acc == NULL) {
        redisClusterFree(cc);
    }
    return acc;
}
/* Connect to the cluster via redisClusterConnect() and wrap the resulting
 * cluster context in a new async cluster context.
 * Returns NULL on failure; the cluster context is freed again when the
 * wrapper cannot be created. */
redisClusterAsyncContext *redisClusterAsyncConnect(const char *addrs,
                                                   int flags) {
    redisClusterContext *cc = redisClusterConnect(addrs, flags);
    if (cc == NULL) {
        return NULL;
    }

    redisClusterAsyncContext *acc = redisClusterAsyncInitialize(cc);
    if (acc == NULL) {
        redisClusterFree(cc);
    }
    return acc;
}
/* Start an asynchronous slotmap update using any node.
 * An adapter to an async event library is required: returns REDIS_ERR when
 * none is attached, otherwise the result of updateSlotMapAsync(). */
int redisClusterAsyncConnect2(redisClusterAsyncContext *acc) {
    return (acc->adapter == NULL)
               ? REDIS_ERR
               : updateSlotMapAsync(acc, NULL /*any node*/);
}
/* Register the connect callback (const context variant).
 * Only one connect callback may be registered: returns REDIS_ERR when either
 * variant is already set, REDIS_OK otherwise. */
int redisClusterAsyncSetConnectCallback(redisClusterAsyncContext *acc,
                                        redisConnectCallback *fn) {
    int already_set = (acc->onConnect != NULL);
#ifndef HIRCLUSTER_NO_NONCONST_CONNECT_CB
    already_set = already_set || (acc->onConnectNC != NULL);
#endif
    if (already_set) {
        return REDIS_ERR;
    }
    acc->onConnect = fn;
    return REDIS_OK;
}
#ifndef HIRCLUSTER_NO_NONCONST_CONNECT_CB
/* Register the connect callback (non-const context variant).
 * Only one connect callback may be registered: returns REDIS_ERR when either
 * variant is already set, REDIS_OK otherwise. */
int redisClusterAsyncSetConnectCallbackNC(redisClusterAsyncContext *acc,
                                          redisConnectCallbackNC *fn) {
    if (acc->onConnectNC == NULL && acc->onConnect == NULL) {
        acc->onConnectNC = fn;
        return REDIS_OK;
    }
    return REDIS_ERR;
}
#endif
/* Register the disconnect callback.
 * The callback can only be set once: returns REDIS_ERR when one is already
 * registered, REDIS_OK otherwise. */
int redisClusterAsyncSetDisconnectCallback(redisClusterAsyncContext *acc,
                                           redisDisconnectCallback *fn) {
    if (acc->onDisconnect != NULL) {
        return REDIS_ERR;
    }
    acc->onDisconnect = fn;
    return REDIS_OK;
}
/* Reply callback function for CLUSTER SLOTS.
 * Records the time of this update attempt. A NULL reply triggers a new
 * attempt using any available node; otherwise the reply is parsed and the
 * nodes and slotmap are updated. */
void clusterSlotsReplyCallback(redisAsyncContext *ac, void *r, void *privdata) {
    UNUSED(ac);
    redisClusterAsyncContext *acc = privdata;
    redisReply *reply = r;

    acc->lastSlotmapUpdateAttempt = hi_usec_now();

    if (reply != NULL) {
        redisClusterContext *cc = acc->cc;
        dict *nodes = parse_cluster_slots(cc, reply, cc->flags);

        /* Ignore failures for now */
        (void)updateNodesAndSlotmap(cc, nodes);
    } else {
        /* Retry using available nodes */
        updateSlotMapAsync(acc, NULL);
    }
}
/* Reply callback function for CLUSTER NODES.
 * Records the time of this update attempt. A NULL reply triggers a new
 * attempt using any available node; otherwise the reply string is parsed and
 * the nodes and slotmap are updated. */
void clusterNodesReplyCallback(redisAsyncContext *ac, void *r, void *privdata) {
    UNUSED(ac);
    redisClusterAsyncContext *acc = privdata;
    redisReply *reply = r;

    acc->lastSlotmapUpdateAttempt = hi_usec_now();

    if (reply != NULL) {
        redisClusterContext *cc = acc->cc;
        dict *nodes =
            parse_cluster_nodes(cc, reply->str, reply->len, cc->flags);

        /* Ignore failures for now */
        (void)updateNodesAndSlotmap(cc, nodes);
    } else {
        /* Retry using available nodes */
        updateSlotMapAsync(acc, NULL);
    }
}
/* Non-zero when node n has an async context without error that carries the
 * REDIS_CONNECTED flag. The argument is evaluated several times, so it must
 * not have side effects. */
#define nodeIsConnected(n) \
((n)->acon != NULL && (n)->acon->err == 0 && \
(n)->acon->c.flags & REDIS_CONNECTED)
/* Select a node.
 * Primarily selects a connected node found close to a randomly picked index of
 * all known nodes. The random index should give a more even distribution of
 * selected nodes. If no connected node is found while iterating to this index
 * the remaining nodes are also checked until a connected node is found.
 * If no connected node is found a node for which a connect has not been attempted
 * within throttle-time, and is found near the picked index, is selected.
 *
 * Returns the selected node, or NULL when the dict is empty or no node
 * qualifies.
 */
static redisClusterNode *selectNode(dict *nodes) {
    redisClusterNode *node, *selected = NULL;

    /* Nothing to select from; this also keeps the modulo below from
     * dividing by zero. */
    if (dictSize(nodes) == 0) {
        return NULL;
    }

    dictIterator di;
    dictInitIterator(&di, nodes);

    int64_t throttleLimit = hi_usec_now() - SLOTMAP_UPDATE_THROTTLE_USEC;
    unsigned long currentIndex = 0;
    unsigned long checkIndex = random() % dictSize(nodes);

    dictEntry *de;
    while ((de = dictNext(&di)) != NULL) {
        node = dictGetEntryVal(de);

        if (nodeIsConnected(node)) {
            /* Keep any connected node */
            selected = node;
        } else if (node->lastConnectionAttempt < throttleLimit &&
                   (selected == NULL || (currentIndex < checkIndex &&
                                         !nodeIsConnected(selected)))) {
            /* Keep an accepted node when none is yet found, or
               any accepted node until the chosen index is reached */
            selected = node;
        }

        /* Return a found connected node when chosen index is reached. */
        if (currentIndex >= checkIndex && selected != NULL &&
            nodeIsConnected(selected))
            break;
        currentIndex += 1;
    }
    return selected;
}
/* Update the slot map by querying a cluster node. If ac is NULL, a node is
 * picked by selectNode() and connected to when needed.
 *
 * Returns REDIS_ERR without further action while an update is ongoing or the
 * client is shutting down. Returns REDIS_OK when the query was sent, in which
 * case the update is marked as ongoing. Any other failure records the current
 * time as the last attempt and returns REDIS_ERR. */
static int updateSlotMapAsync(redisClusterAsyncContext *acc,
                              redisAsyncContext *ac) {
    int status;

    /* Don't allow concurrent slot map updates. */
    if (acc->lastSlotmapUpdateAttempt == SLOTMAP_UPDATE_ONGOING) {
        return REDIS_ERR;
    }
    /* No slot map updates during a client shutdown. */
    if (acc->cc->flags & HIRCLUSTER_FLAG_SHUTDOWN) {
        return REDIS_ERR;
    }

    if (ac == NULL) {
        redisClusterNode *target;

        if (acc->cc->nodes == NULL) {
            __redisClusterAsyncSetError(acc, REDIS_ERR_OTHER, "no nodes added");
            goto failed;
        }

        target = selectNode(acc->cc->nodes);
        if (target == NULL) {
            goto failed;
        }

        /* Get hiredis context, connect if needed */
        ac = actx_get_by_node(acc, target);
        if (ac == NULL) {
            goto failed; /* Specific error already set */
        }
    }

    /* The configuration decides which command is used for the query. */
    if (acc->cc->flags & HIRCLUSTER_FLAG_ROUTE_USE_SLOTS) {
        status = redisAsyncCommand(ac, clusterSlotsReplyCallback, acc,
                                   REDIS_COMMAND_CLUSTER_SLOTS);
    } else {
        status = redisAsyncCommand(ac, clusterNodesReplyCallback, acc,
                                   REDIS_COMMAND_CLUSTER_NODES);
    }
    if (status != REDIS_OK) {
        goto failed;
    }

    acc->lastSlotmapUpdateAttempt = SLOTMAP_UPDATE_ONGOING;
    return REDIS_OK;

failed:
    acc->lastSlotmapUpdateAttempt = hi_usec_now();
    return REDIS_ERR;
}
/* Start a slotmap update unless one is already ongoing or the previous
 * attempt was made less than the throttle time ago. */
static void throttledUpdateSlotMapAsync(redisClusterAsyncContext *acc,
                                        redisAsyncContext *ac) {
    if (acc->lastSlotmapUpdateAttempt == SLOTMAP_UPDATE_ONGOING) {
        return;
    }
    if ((acc->lastSlotmapUpdateAttempt + SLOTMAP_UPDATE_THROTTLE_USEC) >=
        hi_usec_now()) {
        return;
    }
    updateSlotMapAsync(acc, ac);
}
/* Reply callback registered with hiredis for commands sent through the
 * cluster async API.
 *
 * ac is the hiredis context the reply arrived on, r the reply (NULL when
 * hiredis reports a failure) and privdata the cluster_async_data of the
 * request.
 *
 * A NULL reply copies the hiredis error to acc and may start a throttled
 * slotmap update. A redirect or retryable cluster error (MOVED, ASK,
 * TRYAGAIN, CLUSTERDOWN) resends the command, up to cc->max_retry_count
 * times, unless retries are disabled for the request or the client is
 * shutting down. In all other cases the user callback is invoked, with a
 * NULL reply when an error is set on acc, and the request data is freed.
 *
 * On the `error` path the request data is freed without invoking the user
 * callback. */
static void redisClusterAsyncCallback(redisAsyncContext *ac, void *r,
                                      void *privdata) {
    int ret;
    redisReply *reply = r;
    cluster_async_data *cad = privdata;
    redisClusterAsyncContext *acc;
    redisClusterContext *cc;
    redisAsyncContext *ac_retry = NULL;
    int error_type;
    redisClusterNode *node;
    struct cmd *command;

    if (cad == NULL) {
        goto error;
    }

    acc = cad->acc;
    if (acc == NULL) {
        goto error;
    }

    cc = acc->cc;
    if (cc == NULL) {
        goto error;
    }

    command = cad->command;
    if (command == NULL) {
        goto error;
    }

    if (reply == NULL) {
        /* Copy reply specific error from hiredis */
        __redisClusterAsyncSetError(acc, ac->err, ac->errstr);

        node = (redisClusterNode *)ac->data;
        if (node == NULL)
            goto done; /* Node already removed from topology */

        /* Start a slotmap update when the throttling allows */
        throttledUpdateSlotMapAsync(acc, NULL);
        goto done;
    }

    /* Skip retry handling when not expected, or during a client shutdown. */
    if (cad->retry_count == NO_RETRY || cc->flags & HIRCLUSTER_FLAG_SHUTDOWN)
        goto done;

    error_type = cluster_reply_error_type(reply);

    if (error_type > CLUSTER_NOT_ERR && error_type < CLUSTER_ERR_SENTINEL) {
        cad->retry_count++;
        if (cad->retry_count > cc->max_retry_count) {
            cad->retry_count = 0;
            __redisClusterAsyncSetError(acc, REDIS_ERR_CLUSTER_TOO_MANY_RETRIES,
                                        "too many cluster retries");
            goto done;
        }

        int slot = -1;
        switch (error_type) {
        case CLUSTER_ERR_MOVED:
            /* Initiate slot mapping update using the node that sent MOVED. */
            throttledUpdateSlotMapAsync(acc, ac);

            node = getNodeFromRedirectReply(cc, reply, &slot);
            if (node == NULL) {
                __redisClusterAsyncSetError(acc, cc->err, cc->errstr);
                goto done;
            }
            /* Update the slot mapping entry for this slot. */
            if (slot >= 0) {
                cc->table[slot] = node;
            }
            ac_retry = actx_get_by_node(acc, node);
            if (ac_retry == NULL) {
                /* Specific error already set. Without this check a NULL
                 * context would be passed on to the retry below. */
                goto done;
            }

            break;
        case CLUSTER_ERR_ASK:
            node = getNodeFromRedirectReply(cc, reply, NULL);
            if (node == NULL) {
                __redisClusterAsyncSetError(acc, cc->err, cc->errstr);
                goto done;
            }

            ac_retry = actx_get_by_node(acc, node);
            if (ac_retry == NULL) {
                /* Specific error already set */
                goto done;
            }

            /* ASKING is sent ahead of the retried command; its reply is
             * not inspected (no callback is registered). */
            ret = redisAsyncCommand(ac_retry, NULL, NULL, REDIS_COMMAND_ASKING);
            if (ret != REDIS_OK) {
                goto error;
            }

            break;
        case CLUSTER_ERR_TRYAGAIN:
        case CLUSTER_ERR_CLUSTERDOWN:
            /* Retry on the same connection. */
            ac_retry = ac;

            break;
        default:

            goto done;
            break;
        }

        goto retry;
    }

done:

    /* Deliver the result: no reply is handed out when an error is set. */
    if (acc->err) {
        cad->callback(acc, NULL, cad->privdata);
    } else {
        cad->callback(acc, r, cad->privdata);
    }

    /* Errors are only valid for the duration of the callback. */
    if (cc->err) {
        cc->err = 0;
        memset(cc->errstr, '\0', strlen(cc->errstr));
    }

    if (acc->err) {
        acc->err = 0;
        memset(acc->errstr, '\0', strlen(acc->errstr));
    }

    cluster_async_data_free(cad);

    return;

retry:

    /* Resend the stored command; cad is reused for the retried request. */
    ret = redisAsyncFormattedCommand(ac_retry, redisClusterAsyncCallback, cad,
                                     command->cmd, command->clen);
    if (ret != REDIS_OK) {
        goto error;
    }

    return;

error:

    cluster_async_data_free(cad);
}
/* Send an already formatted command asynchronously to the node that serves
 * the slot of its key(s).
 *
 * The command bytes are copied into a command object, which is stored in the
 * request data handed to redisClusterAsyncCallback(). Commands whose keys
 * map to different slots are rejected.
 *
 * fn and privdata are stored in the request data; redisClusterAsyncCallback()
 * invokes fn with privdata.
 *
 * Returns REDIS_OK when the command was handed to hiredis, REDIS_ERR
 * otherwise. Except for a NULL acc, the error is set on acc. */
int redisClusterAsyncFormattedCommand(redisClusterAsyncContext *acc,
redisClusterCallbackFn *fn,
void *privdata, char *cmd, int len) {
redisClusterContext *cc;
int status = REDIS_OK;
int slot_num;
redisClusterNode *node;
redisAsyncContext *ac;
struct cmd *command = NULL;
hilist *commands = NULL;
cluster_async_data *cad = NULL;
if (acc == NULL) {
return REDIS_ERR;
}
cc = acc->cc;
/* Don't accept new commands when the client is about to be shutdown. */
if (cc->flags & HIRCLUSTER_FLAG_SHUTDOWN) {
__redisClusterAsyncSetError(acc, REDIS_ERR_OTHER, "client closing");
return REDIS_ERR;
}
/* Clear any error left over from a previous call. */
if (cc->err) {
cc->err = 0;
memset(cc->errstr, '\0', strlen(cc->errstr));
}
if (acc->err) {
acc->err = 0;
memset(acc->errstr, '\0', strlen(acc->errstr));
}
command = command_get();
if (command == NULL) {
goto oom;
}
/* Keep a private copy of the command; redisClusterAsyncCallback() resends
 * it from command->cmd on a retry. */
command->cmd = hi_calloc(len, sizeof(*command->cmd));
if (command->cmd == NULL) {
goto oom;
}
memcpy(command->cmd, cmd, len);
command->clen = len;
/* List that receives the subcommands of a multi-slot command. */
commands = listCreate();
if (commands == NULL) {
goto oom;
}
commands->free = listCommandFree;
slot_num = command_format_by_slot(cc, command, commands);
if (slot_num < 0) {
/* The error was set on cc; mirror it on acc. */
__redisClusterAsyncSetError(acc, cc->err, cc->errstr);
goto error;
} else if (slot_num >= REDIS_CLUSTER_SLOTS) {
__redisClusterAsyncSetError(acc, REDIS_ERR_OTHER,
"slot_num is out of range");
goto error;
}
// not all keys belong to one slot
if (listLength(commands) > 0) {
ASSERT(listLength(commands) != 1);
__redisClusterAsyncSetError(
acc, REDIS_ERR_OTHER,
"Asynchronous API now not support multi-key command");
goto error;
}
node = node_get_by_table(cc, (uint32_t)slot_num);
if (node == NULL) {
/* Initiate a slotmap update since the slot is not served. */
throttledUpdateSlotMapAsync(acc, NULL);
/* node_get_by_table() has set the error on cc. */
__redisClusterAsyncSetError(acc, cc->err, cc->errstr);
goto error;
}
ac = actx_get_by_node(acc, node);
if (ac == NULL) {
/* Specific error already set */
goto error;
}
cad = cluster_async_data_create();
if (cad == NULL) {
goto oom;
}
cad->acc = acc;
cad->command = command;
command = NULL; /* Memory ownership moved. */
cad->callback = fn;
cad->privdata = privdata;
status = redisAsyncFormattedCommand(ac, redisClusterAsyncCallback, cad, cmd,
len);
if (status != REDIS_OK) {
__redisClusterAsyncSetError(acc, ac->err, ac->errstr);
goto error;
}
if (commands != NULL) {
listRelease(commands);
}
return REDIS_OK;
oom:
__redisClusterAsyncSetError(acc, REDIS_ERR_OOM, "Out of memory");
// passthrough
error:
/* cad and command may each be NULL here; once ownership has moved, the
 * command is released through cad. */
cluster_async_data_free(cad);
command_destroy(command);
if (commands != NULL) {
listRelease(commands);
}
return REDIS_ERR;
}
/* Send an already formatted command asynchronously to one specific node.
 *
 * The command bytes are copied into a command object, which is stored in the
 * request data handed to redisClusterAsyncCallback(). Retries are disabled
 * for commands sent this way (retry_count is set to NO_RETRY).
 *
 * fn and privdata are stored in the request data; redisClusterAsyncCallback()
 * invokes fn with privdata.
 *
 * Returns REDIS_OK when the command was handed to hiredis, REDIS_ERR
 * otherwise. Except for a NULL acc, the error is set on acc. */
int redisClusterAsyncFormattedCommandToNode(redisClusterAsyncContext *acc,
                                            redisClusterNode *node,
                                            redisClusterCallbackFn *fn,
                                            void *privdata, char *cmd,
                                            int len) {
    redisClusterContext *cc;
    redisAsyncContext *ac;
    int status;
    cluster_async_data *cad = NULL;
    struct cmd *command = NULL;

    if (acc == NULL) {
        return REDIS_ERR;
    }

    cc = acc->cc;

    /* Don't accept new commands when the client is about to be shutdown. */
    if (cc->flags & HIRCLUSTER_FLAG_SHUTDOWN) {
        __redisClusterAsyncSetError(acc, REDIS_ERR_OTHER, "disconnecting");
        return REDIS_ERR;
    }

    ac = actx_get_by_node(acc, node);
    if (ac == NULL) {
        /* Specific error already set */
        return REDIS_ERR;
    }

    /* Clear any error left over from a previous call. */
    if (cc->err) {
        cc->err = 0;
        memset(cc->errstr, '\0', strlen(cc->errstr));
    }

    if (acc->err) {
        acc->err = 0;
        memset(acc->errstr, '\0', strlen(acc->errstr));
    }

    command = command_get();
    if (command == NULL) {
        goto oom;
    }

    /* Keep a private copy of the command bytes in the command object. */
    command->cmd = hi_calloc(len, sizeof(*command->cmd));
    if (command->cmd == NULL) {
        goto oom;
    }
    memcpy(command->cmd, cmd, len);
    command->clen = len;

    cad = cluster_async_data_create();
    if (cad == NULL)
        goto oom;

    cad->acc = acc;
    cad->command = command;
    command = NULL; /* Memory ownership moved. */
    cad->callback = fn;
    cad->privdata = privdata;
    cad->retry_count = NO_RETRY;

    status = redisAsyncFormattedCommand(ac, redisClusterAsyncCallback, cad, cmd,
                                        len);
    if (status != REDIS_OK) {
        __redisClusterAsyncSetError(acc, ac->err, ac->errstr);
        goto error;
    }

    return REDIS_OK;

oom:
    __redisClusterAsyncSetError(acc, REDIS_ERR_OOM, "Out of memory");
    // passthrough

error:
    /* cad and command may each be NULL here; once ownership has moved, the
     * command is released through cad. */
    cluster_async_data_free(cad);
    command_destroy(command);
    return REDIS_ERR;
}
/* Format a command from a printf-like format string and a va_list and send it
 * through redisClusterAsyncFormattedCommand().
 * Returns REDIS_ERR when acc is NULL or formatting fails, otherwise the
 * result of the send. The formatted buffer is released before returning. */
int redisClustervAsyncCommand(redisClusterAsyncContext *acc,
                              redisClusterCallbackFn *fn, void *privdata,
                              const char *format, va_list ap) {
    char *formatted;
    int formatted_len;
    int status;

    if (acc == NULL) {
        return REDIS_ERR;
    }

    formatted_len = redisvFormatCommand(&formatted, format, ap);
    switch (formatted_len) {
    case -1:
        __redisClusterAsyncSetError(acc, REDIS_ERR_OOM, "Out of memory");
        return REDIS_ERR;
    case -2:
        __redisClusterAsyncSetError(acc, REDIS_ERR_OTHER,
                                    "Invalid format string");
        return REDIS_ERR;
    default:
        break;
    }

    status = redisClusterAsyncFormattedCommand(acc, fn, privdata, formatted,
                                               formatted_len);
    hi_free(formatted);
    return status;
}
/* Variadic front end of redisClustervAsyncCommand(): collects the arguments
 * into a va_list and forwards them. */
int redisClusterAsyncCommand(redisClusterAsyncContext *acc,
                             redisClusterCallbackFn *fn, void *privdata,
                             const char *format, ...) {
    va_list args;
    int status;

    va_start(args, format);
    status = redisClustervAsyncCommand(acc, fn, privdata, format, args);
    va_end(args);

    return status;
}
/* Format a command from a printf-like format string and send it
 * asynchronously to one specific node through
 * redisClusterAsyncFormattedCommandToNode().
 * Returns REDIS_ERR when acc is NULL or formatting fails, otherwise the
 * result of the send. The formatted buffer is released before returning. */
int redisClusterAsyncCommandToNode(redisClusterAsyncContext *acc,
                                   redisClusterNode *node,
                                   redisClusterCallbackFn *fn, void *privdata,
                                   const char *format, ...) {
    int ret;
    va_list ap;
    int len;
    char *cmd = NULL;

    /* Without a context there is nowhere to report an error to. */
    if (acc == NULL) {
        return REDIS_ERR;
    }

    /* Allocate cmd and encode the variadic command */
    va_start(ap, format);
    len = redisvFormatCommand(&cmd, format, ap);
    va_end(ap);

    if (len == -1) {
        __redisClusterAsyncSetError(acc, REDIS_ERR_OOM, "Out of memory");
        return REDIS_ERR;
    } else if (len == -2) {
        __redisClusterAsyncSetError(acc, REDIS_ERR_OTHER,
                                    "Invalid format string");
        return REDIS_ERR;
    }

    ret = redisClusterAsyncFormattedCommandToNode(acc, node, fn, privdata, cmd,
                                                  len);
    hi_free(cmd);
    return ret;
}
/* Build a command from an argument vector and send it through
 * redisClusterAsyncFormattedCommand().
 * Returns REDIS_ERR with REDIS_ERR_OOM set when the command cannot be
 * formatted, otherwise the result of the send. The formatted buffer is
 * released before returning. */
int redisClusterAsyncCommandArgv(redisClusterAsyncContext *acc,
                                 redisClusterCallbackFn *fn, void *privdata,
                                 int argc, const char **argv,
                                 const size_t *argvlen) {
    char *formatted;
    int formatted_len = redisFormatCommandArgv(&formatted, argc, argv, argvlen);

    if (formatted_len == -1) {
        __redisClusterAsyncSetError(acc, REDIS_ERR_OOM, "Out of memory");
        return REDIS_ERR;
    }

    int status = redisClusterAsyncFormattedCommand(acc, fn, privdata, formatted,
                                                   formatted_len);
    hi_free(formatted);
    return status;
}
/* Build a command from an argument vector and send it to one specific node
 * through redisClusterAsyncFormattedCommandToNode().
 * Returns REDIS_ERR with REDIS_ERR_OOM set when the command cannot be
 * formatted, otherwise the result of the send. The formatted buffer is
 * released before returning. */
int redisClusterAsyncCommandArgvToNode(redisClusterAsyncContext *acc,
                                       redisClusterNode *node,
                                       redisClusterCallbackFn *fn,
                                       void *privdata, int argc,
                                       const char **argv,
                                       const size_t *argvlen) {
    char *formatted;
    int formatted_len = redisFormatCommandArgv(&formatted, argc, argv, argvlen);

    if (formatted_len == -1) {
        __redisClusterAsyncSetError(acc, REDIS_ERR_OOM, "Out of memory");
        return REDIS_ERR;
    }

    int status = redisClusterAsyncFormattedCommandToNode(
        acc, node, fn, privdata, formatted, formatted_len);
    hi_free(formatted);
    return status;
}
/* Flag the cluster context as shutting down and call
 * redisAsyncDisconnect() on every node that currently holds an async
 * connection. A NULL acc is ignored. */
void redisClusterAsyncDisconnect(redisClusterAsyncContext *acc) {
    if (acc == NULL) {
        return;
    }

    redisClusterContext *cluster = acc->cc;

    /* The shutdown flag is raised even when there is no node table to
     * walk. */
    cluster->flags |= HIRCLUSTER_FLAG_SHUTDOWN;
    if (cluster->nodes == NULL) {
        return;
    }

    dictIterator it;
    dictEntry *entry;

    dictInitIterator(&it, cluster->nodes);
    for (entry = dictNext(&it); entry != NULL; entry = dictNext(&it)) {
        redisClusterNode *current = dictGetEntryVal(entry);
        redisAsyncContext *conn = current->acon;

        /* Nodes without an async connection are simply skipped. */
        if (conn != NULL) {
            redisAsyncDisconnect(conn);
        }
    }
}
/* Release an async cluster context: mark its cluster context as shutting
 * down, free that cluster context with redisClusterFree(), then free acc
 * itself. Passing NULL is a no-op. */
void redisClusterAsyncFree(redisClusterAsyncContext *acc) {
    if (acc != NULL) {
        redisClusterContext *cluster = acc->cc;

        /* Raise the flag before the teardown starts. */
        cluster->flags |= HIRCLUSTER_FLAG_SHUTDOWN;
        redisClusterFree(cluster);
        hi_free(acc);
    }
}
/* Prepare `iter` for walking the nodes currently held by `cc`.
 *
 * The dict iterator is positioned on cc->nodes, the current route version
 * is remembered so a later change can be detected, and the iterator is
 * given one retry (retries_left = 1). */
void redisClusterInitNodeIterator(redisClusterNodeIterator *iter,
                                  redisClusterContext *cc) {
    dictInitIterator(&iter->di, cc->nodes);
    iter->route_version = cc->route_version;
    iter->retries_left = 1;
    iter->cc = cc;
}
/* Get next node from the iterator.
 *
 * If the routing table has been updated since the iterator was initiated
 * (or last restarted), the iterator restarts from the beginning of the new
 * node table, as long as it has restarts left (iter->retries_left). After a
 * restart the iteration continues over the complete new table.
 *
 * Returns the next node, or NULL when all nodes have been visited or when
 * the routing table changed and no restarts are left. */
redisClusterNode *redisClusterNodeNext(redisClusterNodeIterator *iter) {
    if (iter->route_version != iter->cc->route_version) {
        /* The routing table has changed and the current dict iterator is
         * invalid, since the nodes dict has been recreated in the cluster
         * context. Without restarts left we must stop here rather than
         * touch the stale iterator. */
        if (iter->retries_left <= 0) {
            return NULL;
        }

        /* Re-init the dict iterator on the new table. The retry budget is
         * only consulted when a restart is needed, so consuming the last
         * restart does not cut the restarted iteration short. */
        dictInitIterator(&iter->di, iter->cc->nodes);
        iter->route_version = iter->cc->route_version;
        iter->retries_left--;
    }

    dictEntry *de = dictNext(&iter->di);
    if (de == NULL) {
        return NULL;
    }
    return dictGetEntryVal(de);
}
/* Return the hash slot of a NUL-terminated key string (the key may include
 * hash tags). The key length is measured with strlen() and the slot is
 * computed by keyHashSlot(). */
unsigned int redisClusterGetSlotByKey(char *key) {
    size_t key_len = strlen(key);

    return keyHashSlot(key, key_len);
}
/* Return the node responsible for a NUL-terminated key string (the key may
 * include hash tags). The slot comes from keyHashSlot() and is resolved to
 * a node with node_get_by_table(). */
redisClusterNode *redisClusterGetNodeByKey(redisClusterContext *cc, char *key) {
    size_t key_len = strlen(key);

    return node_get_by_table(cc, keyHashSlot(key, key_len));
}