#include "adapters/libevent.h"
#include "hircluster.h"
#include "test_utils.h"
#include <assert.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#define CLUSTER_NODE "127.0.0.1:7000"
void test_command_to_single_node(redisClusterContext *cc) {
redisReply *reply;
dictIterator di;
dictInitIterator(&di, cc->nodes);
dictEntry *de = dictNext(&di);
assert(de);
redisClusterNode *node = dictGetEntryVal(de);
assert(node);
reply = redisClusterCommandToNode(cc, node, "DBSIZE");
CHECK_REPLY(cc, reply);
CHECK_REPLY_TYPE(reply, REDIS_REPLY_INTEGER);
freeReplyObject(reply);
}
void test_command_to_all_nodes(redisClusterContext *cc) {
redisClusterNodeIterator ni;
redisClusterInitNodeIterator(&ni, cc);
redisClusterNode *node;
while ((node = redisClusterNodeNext(&ni)) != NULL) {
redisReply *reply;
reply = redisClusterCommandToNode(cc, node, "DBSIZE");
CHECK_REPLY(cc, reply);
CHECK_REPLY_TYPE(reply, REDIS_REPLY_INTEGER);
freeReplyObject(reply);
}
}
void test_transaction(redisClusterContext *cc) {
redisClusterNode *node = redisClusterGetNodeByKey(cc, "foo");
assert(node);
redisReply *reply;
reply = redisClusterCommandToNode(cc, node, "MULTI");
CHECK_REPLY_OK(cc, reply);
freeReplyObject(reply);
reply = redisClusterCommandToNode(cc, node, "SET foo 99");
CHECK_REPLY_QUEUED(cc, reply);
freeReplyObject(reply);
reply = redisClusterCommandToNode(cc, node, "INCR foo");
CHECK_REPLY_QUEUED(cc, reply);
freeReplyObject(reply);
reply = redisClusterCommandToNode(cc, node, "EXEC");
CHECK_REPLY_ARRAY(cc, reply, 2);
CHECK_REPLY_OK(cc, reply->element[0]);
CHECK_REPLY_INT(cc, reply->element[1], 100);
freeReplyObject(reply);
}
void test_streams(redisClusterContext *cc) {
redisReply *reply;
char *id;
/* Get the node that handles given stream */
redisClusterNode *node = redisClusterGetNodeByKey(cc, "mystream");
assert(node);
/* Preparation: remove old stream/key */
reply = redisClusterCommandToNode(cc, node, "DEL mystream");
CHECK_REPLY_TYPE(reply, REDIS_REPLY_INTEGER);
freeReplyObject(reply);
/* Query wrong node */
redisClusterNode *wrongNode = redisClusterGetNodeByKey(cc, "otherstream");
assert(node != wrongNode);
reply = redisClusterCommandToNode(cc, wrongNode, "XLEN mystream");
CHECK_REPLY_ERROR(cc, reply, "MOVED");
freeReplyObject(reply);
/* Verify stream length before adding entries */
reply = redisClusterCommandToNode(cc, node, "XLEN mystream");
CHECK_REPLY_INT(cc, reply, 0);
freeReplyObject(reply);
/* Add entries to a created stream */
reply = redisClusterCommandToNode(cc, node, "XADD mystream * name t800");
CHECK_REPLY_TYPE(reply, REDIS_REPLY_STRING);
freeReplyObject(reply);
reply = redisClusterCommandToNode(
cc, node, "XADD mystream * name Sara surname OConnor");
CHECK_REPLY_TYPE(reply, REDIS_REPLY_STRING);
id = strdup(reply->str); /* Keep this id for later inspections */
freeReplyObject(reply);
/* Verify stream length after adding entries */
reply = redisClusterCommandToNode(cc, node, "XLEN mystream");
CHECK_REPLY_INT(cc, reply, 2);
freeReplyObject(reply);
/* Modify the stream */
reply = redisClusterCommandToNode(cc, node, "XTRIM mystream MAXLEN 1");
CHECK_REPLY_INT(cc, reply, 1);
freeReplyObject(reply);
/* Verify stream length after modifying the stream */
reply = redisClusterCommandToNode(cc, node, "XLEN mystream");
CHECK_REPLY_INT(cc, reply, 1); /* 1 entry left */
freeReplyObject(reply);
/* Read from the stream */
reply =
redisClusterCommandToNode(cc, node, "XREAD COUNT 2 STREAMS mystream 0");
CHECK_REPLY_ARRAY(cc, reply, 1); /* Reply from a single stream */
/* Verify the reply from stream */
CHECK_REPLY_ARRAY(cc, reply->element[0], 2);
CHECK_REPLY_STR(cc, reply->element[0]->element[0], "mystream");
CHECK_REPLY_ARRAY(cc, reply->element[0]->element[1], 1); /* single entry */
/* Verify the entry, an array of id and field+value elements */
redisReply *entry = reply->element[0]->element[1]->element[0];
CHECK_REPLY_ARRAY(cc, entry, 2);
CHECK_REPLY_STR(cc, entry->element[0], id);
CHECK_REPLY_ARRAY(cc, entry->element[1], 4);
CHECK_REPLY_STR(cc, entry->element[1]->element[0], "name");
CHECK_REPLY_STR(cc, entry->element[1]->element[1], "Sara");
CHECK_REPLY_STR(cc, entry->element[1]->element[2], "surname");
CHECK_REPLY_STR(cc, entry->element[1]->element[3], "OConnor");
freeReplyObject(reply);
/* Delete the entry in stream */
reply = redisClusterCommandToNode(cc, node, "XDEL mystream %s", id);
CHECK_REPLY_INT(cc, reply, 1);
freeReplyObject(reply);
/* Blocking read of stream */
reply = redisClusterCommandToNode(
cc, node, "XREAD COUNT 2 BLOCK 200 STREAMS mystream 0");
CHECK_REPLY_NIL(cc, reply);
freeReplyObject(reply);
/* Create a consumer group */
reply = redisClusterCommandToNode(cc, node,
"XGROUP CREATE mystream mygroup1 0");
CHECK_REPLY_OK(cc, reply);
freeReplyObject(reply);
if (!redis_version_less_than(6, 2)) {
/* Create a consumer */
reply = redisClusterCommandToNode(
cc, node, "XGROUP CREATECONSUMER mystream mygroup1 myconsumer123");
CHECK_REPLY_INT(cc, reply, 1);
freeReplyObject(reply);
}
/* Blocking read of consumer group */
reply = redisClusterCommandToNode(cc, node,
"XREADGROUP GROUP mygroup1 myconsumer123 "
"COUNT 2 BLOCK 200 STREAMS mystream 0");
CHECK_REPLY_TYPE(reply, REDIS_REPLY_ARRAY);
freeReplyObject(reply);
free(id);
}
void test_pipeline_to_single_node(redisClusterContext *cc) {
int status;
redisReply *reply;
dictIterator di;
dictInitIterator(&di, cc->nodes);
dictEntry *de = dictNext(&di);
assert(de);
redisClusterNode *node = dictGetEntryVal(de);
assert(node);
status = redisClusterAppendCommandToNode(cc, node, "DBSIZE");
ASSERT_MSG(status == REDIS_OK, cc->errstr);
// Trigger send of pipeline commands
redisClusterGetReply(cc, (void *)&reply);
CHECK_REPLY(cc, reply);
CHECK_REPLY_TYPE(reply, REDIS_REPLY_INTEGER);
freeReplyObject(reply);
}
void test_pipeline_to_all_nodes(redisClusterContext *cc) {
redisClusterNodeIterator ni;
redisClusterInitNodeIterator(&ni, cc);
redisClusterNode *node;
while ((node = redisClusterNodeNext(&ni)) != NULL) {
int status = redisClusterAppendCommandToNode(cc, node, "DBSIZE");
ASSERT_MSG(status == REDIS_OK, cc->errstr);
}
// Get replies from 3 node cluster
redisReply *reply;
redisClusterGetReply(cc, (void *)&reply);
CHECK_REPLY(cc, reply);
CHECK_REPLY_TYPE(reply, REDIS_REPLY_INTEGER);
freeReplyObject(reply);
redisClusterGetReply(cc, (void *)&reply);
CHECK_REPLY(cc, reply);
CHECK_REPLY_TYPE(reply, REDIS_REPLY_INTEGER);
freeReplyObject(reply);
redisClusterGetReply(cc, (void *)&reply);
CHECK_REPLY(cc, reply);
CHECK_REPLY_TYPE(reply, REDIS_REPLY_INTEGER);
freeReplyObject(reply);
redisClusterGetReply(cc, (void *)&reply);
assert(reply == NULL);
}
void test_pipeline_transaction(redisClusterContext *cc) {
int status;
redisReply *reply;
redisClusterNode *node = redisClusterGetNodeByKey(cc, "foo");
assert(node);
status = redisClusterAppendCommandToNode(cc, node, "MULTI");
ASSERT_MSG(status == REDIS_OK, cc->errstr);
status = redisClusterAppendCommandToNode(cc, node, "SET foo 199");
ASSERT_MSG(status == REDIS_OK, cc->errstr);
status = redisClusterAppendCommandToNode(cc, node, "INCR foo");
ASSERT_MSG(status == REDIS_OK, cc->errstr);
status = redisClusterAppendCommandToNode(cc, node, "EXEC");
ASSERT_MSG(status == REDIS_OK, cc->errstr);
// Trigger send of pipeline commands
{
redisClusterGetReply(cc, (void *)&reply); // MULTI
CHECK_REPLY_OK(cc, reply);
freeReplyObject(reply);
redisClusterGetReply(cc, (void *)&reply); // SET
CHECK_REPLY_QUEUED(cc, reply);
freeReplyObject(reply);
redisClusterGetReply(cc, (void *)&reply); // INCR
CHECK_REPLY_QUEUED(cc, reply);
freeReplyObject(reply);
redisClusterGetReply(cc, (void *)&reply); // EXEC
CHECK_REPLY_ARRAY(cc, reply, 2);
CHECK_REPLY_OK(cc, reply->element[0]);
CHECK_REPLY_INT(cc, reply->element[1], 200);
freeReplyObject(reply);
}
}
//------------------------------------------------------------------------------
// Async API
//------------------------------------------------------------------------------
typedef struct ExpectedResult {
int type;
char *str;
size_t elements;
bool disconnect;
bool noreply;
char *errstr;
} ExpectedResult;
// Callback for Redis connects and disconnects
void callbackExpectOk(const redisAsyncContext *ac, int status) {
UNUSED(ac);
assert(status == REDIS_OK);
}
// Callback for async commands, verifies the redisReply
void commandCallback(redisClusterAsyncContext *cc, void *r, void *privdata) {
redisReply *reply = (redisReply *)r;
ExpectedResult *expect = (ExpectedResult *)privdata;
if (expect->noreply) {
assert(reply == NULL);
assert(strcmp(cc->errstr, expect->errstr) == 0);
} else {
assert(reply != NULL);
assert(reply->type == expect->type);
switch (reply->type) {
case REDIS_REPLY_ARRAY:
assert(reply->elements == expect->elements);
assert(reply->str == NULL);
break;
case REDIS_REPLY_INTEGER:
assert(reply->elements == 0);
assert(reply->str == NULL);
break;
case REDIS_REPLY_STATUS:
assert(strcmp(reply->str, expect->str) == 0);
assert(reply->elements == 0);
break;
default:
assert(0);
}
}
if (expect->disconnect)
redisClusterAsyncDisconnect(cc);
}
void test_async_to_single_node(void) {
int status;
redisClusterAsyncContext *acc = redisClusterAsyncContextInit();
assert(acc);
redisClusterAsyncSetConnectCallback(acc, callbackExpectOk);
redisClusterAsyncSetDisconnectCallback(acc, callbackExpectOk);
redisClusterSetOptionAddNodes(acc->cc, CLUSTER_NODE);
redisClusterSetOptionMaxRetry(acc->cc, 1);
redisClusterSetOptionRouteUseSlots(acc->cc);
status = redisClusterConnect2(acc->cc);
ASSERT_MSG(status == REDIS_OK, acc->errstr);
struct event_base *base = event_base_new();
status = redisClusterLibeventAttach(acc, base);
assert(status == REDIS_OK);
dictIterator di;
dictInitIterator(&di, acc->cc->nodes);
dictEntry *de = dictNext(&di);
assert(de);
redisClusterNode *node = dictGetEntryVal(de);
assert(node);
ExpectedResult r1 = {.type = REDIS_REPLY_INTEGER, .disconnect = true};
status = redisClusterAsyncCommandToNode(acc, node, commandCallback, &r1,
"DBSIZE");
ASSERT_MSG(status == REDIS_OK, acc->errstr);
event_base_dispatch(base);
redisClusterAsyncFree(acc);
event_base_free(base);
}
void test_async_formatted_to_single_node(void) {
int status;
redisClusterAsyncContext *acc = redisClusterAsyncContextInit();
assert(acc);
redisClusterAsyncSetConnectCallback(acc, callbackExpectOk);
redisClusterAsyncSetDisconnectCallback(acc, callbackExpectOk);
redisClusterSetOptionAddNodes(acc->cc, CLUSTER_NODE);
redisClusterSetOptionMaxRetry(acc->cc, 1);
redisClusterSetOptionRouteUseSlots(acc->cc);
status = redisClusterConnect2(acc->cc);
ASSERT_MSG(status == REDIS_OK, acc->errstr);
struct event_base *base = event_base_new();
status = redisClusterLibeventAttach(acc, base);
assert(status == REDIS_OK);
dictIterator di;
dictInitIterator(&di, acc->cc->nodes);
dictEntry *de = dictNext(&di);
assert(de);
redisClusterNode *node = dictGetEntryVal(de);
assert(node);
ExpectedResult r1 = {.type = REDIS_REPLY_INTEGER, .disconnect = true};
char command[] = "*1\r\n$6\r\nDBSIZE\r\n";
status = redisClusterAsyncFormattedCommandToNode(
acc, node, commandCallback, &r1, command, strlen(command));
ASSERT_MSG(status == REDIS_OK, acc->errstr);
event_base_dispatch(base);
redisClusterAsyncFree(acc);
event_base_free(base);
}
void test_async_command_argv_to_single_node(void) {
int status;
redisClusterAsyncContext *acc = redisClusterAsyncContextInit();
assert(acc);
redisClusterAsyncSetConnectCallback(acc, callbackExpectOk);
redisClusterAsyncSetDisconnectCallback(acc, callbackExpectOk);
redisClusterSetOptionAddNodes(acc->cc, CLUSTER_NODE);
redisClusterSetOptionMaxRetry(acc->cc, 1);
redisClusterSetOptionRouteUseSlots(acc->cc);
status = redisClusterConnect2(acc->cc);
ASSERT_MSG(status == REDIS_OK, acc->errstr);
struct event_base *base = event_base_new();
status = redisClusterLibeventAttach(acc, base);
assert(status == REDIS_OK);
dictIterator di;
dictInitIterator(&di, acc->cc->nodes);
dictEntry *de = dictNext(&di);
assert(de);
redisClusterNode *node = dictGetEntryVal(de);
assert(node);
ExpectedResult r1 = {.type = REDIS_REPLY_INTEGER, .disconnect = true};
status = redisClusterAsyncCommandArgvToNode(acc, node, commandCallback, &r1,
1, (const char *[]){"DBSIZE"},
(size_t[]){6});
ASSERT_MSG(status == REDIS_OK, acc->errstr);
event_base_dispatch(base);
redisClusterAsyncFree(acc);
event_base_free(base);
}
void test_async_to_all_nodes(void) {
int status;
redisClusterAsyncContext *acc = redisClusterAsyncContextInit();
assert(acc);
redisClusterAsyncSetConnectCallback(acc, callbackExpectOk);
redisClusterAsyncSetDisconnectCallback(acc, callbackExpectOk);
redisClusterSetOptionAddNodes(acc->cc, CLUSTER_NODE);
redisClusterSetOptionMaxRetry(acc->cc, 1);
redisClusterSetOptionRouteUseSlots(acc->cc);
status = redisClusterConnect2(acc->cc);
ASSERT_MSG(status == REDIS_OK, acc->errstr);
struct event_base *base = event_base_new();
status = redisClusterLibeventAttach(acc, base);
assert(status == REDIS_OK);
redisClusterNodeIterator ni;
redisClusterInitNodeIterator(&ni, acc->cc);
ExpectedResult r1 = {.type = REDIS_REPLY_INTEGER};
redisClusterNode *node;
while ((node = redisClusterNodeNext(&ni)) != NULL) {
status = redisClusterAsyncCommandToNode(acc, node, commandCallback, &r1,
"DBSIZE");
ASSERT_MSG(status == REDIS_OK, acc->errstr);
}
// Normal command to trigger disconnect
ExpectedResult r2 = {
.type = REDIS_REPLY_STATUS, .str = "OK", .disconnect = true};
status = redisClusterAsyncCommand(acc, commandCallback, &r2, "SET foo bar");
event_base_dispatch(base);
redisClusterAsyncFree(acc);
event_base_free(base);
}
void test_async_transaction(void) {
int status;
redisClusterAsyncContext *acc = redisClusterAsyncContextInit();
assert(acc);
redisClusterAsyncSetConnectCallback(acc, callbackExpectOk);
redisClusterAsyncSetDisconnectCallback(acc, callbackExpectOk);
redisClusterSetOptionAddNodes(acc->cc, CLUSTER_NODE);
redisClusterSetOptionMaxRetry(acc->cc, 1);
redisClusterSetOptionRouteUseSlots(acc->cc);
status = redisClusterConnect2(acc->cc);
ASSERT_MSG(status == REDIS_OK, acc->errstr);
struct event_base *base = event_base_new();
status = redisClusterLibeventAttach(acc, base);
assert(status == REDIS_OK);
redisClusterNode *node = redisClusterGetNodeByKey(acc->cc, "foo");
assert(node);
ExpectedResult r1 = {.type = REDIS_REPLY_STATUS, .str = "OK"};
status = redisClusterAsyncCommandToNode(acc, node, commandCallback, &r1,
"MULTI");
ASSERT_MSG(status == REDIS_OK, acc->errstr);
ExpectedResult r2 = {.type = REDIS_REPLY_STATUS, .str = "QUEUED"};
status = redisClusterAsyncCommandToNode(acc, node, commandCallback, &r2,
"SET foo 99");
ASSERT_MSG(status == REDIS_OK, acc->errstr);
ExpectedResult r3 = {.type = REDIS_REPLY_STATUS, .str = "QUEUED"};
status = redisClusterAsyncCommandToNode(acc, node, commandCallback, &r3,
"INCR foo");
ASSERT_MSG(status == REDIS_OK, acc->errstr);
/* The EXEC command will return an array with result from 2 queued commands. */
ExpectedResult r4 = {
.type = REDIS_REPLY_ARRAY, .elements = 2, .disconnect = true};
status = redisClusterAsyncCommandToNode(acc, node, commandCallback, &r4,
"EXEC ");
ASSERT_MSG(status == REDIS_OK, acc->errstr);
event_base_dispatch(base);
redisClusterAsyncFree(acc);
event_base_free(base);
}
int main(void) {
int status;
redisClusterContext *cc = redisClusterContextInit();
assert(cc);
redisClusterSetOptionAddNodes(cc, CLUSTER_NODE);
redisClusterSetOptionRouteUseSlots(cc);
redisClusterSetOptionMaxRetry(cc, 1);
status = redisClusterConnect2(cc);
ASSERT_MSG(status == REDIS_OK, cc->errstr);
load_redis_version(cc);
// Synchronous API
test_command_to_single_node(cc);
test_command_to_all_nodes(cc);
test_transaction(cc);
test_streams(cc);
// Pipeline API
test_pipeline_to_single_node(cc);
test_pipeline_to_all_nodes(cc);
test_pipeline_transaction(cc);
redisClusterFree(cc);
// Asynchronous API
test_async_to_single_node();
test_async_formatted_to_single_node();
test_async_command_argv_to_single_node();
test_async_to_all_nodes();
test_async_transaction();
return 0;
}