#include "adapters/libevent.h"
#include "hircluster.h"
#include "test_utils.h"
#include <assert.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#define CLUSTER_NODE "127.0.0.1:7000"
// Test of two pipelines using sync API
void test_pipeline(void) {
redisClusterContext *cc = redisClusterContextInit();
assert(cc);
int status;
status = redisClusterSetOptionAddNodes(cc, CLUSTER_NODE);
ASSERT_MSG(status == REDIS_OK, cc->errstr);
status = redisClusterConnect2(cc);
ASSERT_MSG(status == REDIS_OK, cc->errstr);
status = redisClusterAppendCommand(cc, "SET foo one");
ASSERT_MSG(status == REDIS_OK, cc->errstr);
status = redisClusterAppendCommand(cc, "SET bar two");
ASSERT_MSG(status == REDIS_OK, cc->errstr);
status = redisClusterAppendCommand(cc, "GET foo");
ASSERT_MSG(status == REDIS_OK, cc->errstr);
status = redisClusterAppendCommand(cc, "GET bar");
ASSERT_MSG(status == REDIS_OK, cc->errstr);
status = redisClusterAppendCommand(cc, "SUNION a b");
ASSERT_MSG(status == REDIS_OK, cc->errstr);
redisReply *reply;
redisClusterGetReply(cc, (void *)&reply); // reply for: SET foo one
CHECK_REPLY_OK(cc, reply);
freeReplyObject(reply);
redisClusterGetReply(cc, (void *)&reply); // reply for: SET bar two
CHECK_REPLY_OK(cc, reply);
freeReplyObject(reply);
redisClusterGetReply(cc, (void *)&reply); // reply for: GET foo
CHECK_REPLY_STR(cc, reply, "one");
freeReplyObject(reply);
redisClusterGetReply(cc, (void *)&reply); // reply for: GET bar
CHECK_REPLY_STR(cc, reply, "two");
freeReplyObject(reply);
redisClusterGetReply(cc, (void *)&reply); // reply for: SUNION a b
CHECK_REPLY_ERROR(cc, reply, "CROSSSLOT");
freeReplyObject(reply);
redisClusterFree(cc);
}
// Test of pipelines containing multi-node commands
void test_pipeline_with_multinode_commands(void) {
redisClusterContext *cc = redisClusterContextInit();
assert(cc);
int status;
status = redisClusterSetOptionAddNodes(cc, CLUSTER_NODE);
ASSERT_MSG(status == REDIS_OK, cc->errstr);
status = redisClusterConnect2(cc);
ASSERT_MSG(status == REDIS_OK, cc->errstr);
status = redisClusterAppendCommand(cc, "MSET key1 Hello key2 World key3 !");
ASSERT_MSG(status == REDIS_OK, cc->errstr);
status = redisClusterAppendCommand(cc, "MGET key1 key2 key3");
ASSERT_MSG(status == REDIS_OK, cc->errstr);
redisReply *reply;
redisClusterGetReply(cc, (void *)&reply);
CHECK_REPLY_OK(cc, reply);
freeReplyObject(reply);
redisClusterGetReply(cc, (void *)&reply);
CHECK_REPLY_ARRAY(cc, reply, 3);
CHECK_REPLY_STR(cc, reply->element[0], "Hello");
CHECK_REPLY_STR(cc, reply->element[1], "World");
CHECK_REPLY_STR(cc, reply->element[2], "!");
freeReplyObject(reply);
redisClusterFree(cc);
}
//------------------------------------------------------------------------------
// Async API
//------------------------------------------------------------------------------
typedef struct ExpectedResult {
int type;
char *str;
bool disconnect;
} ExpectedResult;
// Callback for Redis connects and disconnects
void callbackExpectOk(const redisAsyncContext *ac, int status) {
UNUSED(ac);
assert(status == REDIS_OK);
}
// Callback for async commands, verifies the redisReply
void commandCallback(redisClusterAsyncContext *cc, void *r, void *privdata) {
redisReply *reply = (redisReply *)r;
ExpectedResult *expect = (ExpectedResult *)privdata;
assert(reply != NULL);
assert(reply->type == expect->type);
assert(strcmp(reply->str, expect->str) == 0);
if (expect->disconnect) {
redisClusterAsyncDisconnect(cc);
}
}
// Test of two pipelines using async API
// In an asynchronous context, commands are automatically pipelined due to the
// nature of an event loop. Therefore, unlike the synchronous API, there is only
// a single way to send commands.
void test_async_pipeline(void) {
redisClusterAsyncContext *acc = redisClusterAsyncContextInit();
assert(acc);
redisClusterAsyncSetConnectCallback(acc, callbackExpectOk);
redisClusterAsyncSetDisconnectCallback(acc, callbackExpectOk);
redisClusterSetOptionAddNodes(acc->cc, CLUSTER_NODE);
int status;
status = redisClusterConnect2(acc->cc);
ASSERT_MSG(status == REDIS_OK, acc->errstr);
struct event_base *base = event_base_new();
status = redisClusterLibeventAttach(acc, base);
assert(status == REDIS_OK);
ExpectedResult r1 = {.type = REDIS_REPLY_STATUS, .str = "OK"};
status = redisClusterAsyncCommand(acc, commandCallback, &r1, "SET foo six");
ASSERT_MSG(status == REDIS_OK, acc->errstr);
ExpectedResult r2 = {.type = REDIS_REPLY_STATUS, .str = "OK"};
status = redisClusterAsyncCommand(acc, commandCallback, &r2, "SET bar ten");
ASSERT_MSG(status == REDIS_OK, acc->errstr);
ExpectedResult r3 = {.type = REDIS_REPLY_STRING, .str = "six"};
status = redisClusterAsyncCommand(acc, commandCallback, &r3, "GET foo");
ASSERT_MSG(status == REDIS_OK, acc->errstr);
ExpectedResult r4 = {.type = REDIS_REPLY_STRING, .str = "ten"};
status = redisClusterAsyncCommand(acc, commandCallback, &r4, "GET bar");
ASSERT_MSG(status == REDIS_OK, acc->errstr);
ExpectedResult r5 = {
.type = REDIS_REPLY_ERROR,
.str = "CROSSSLOT Keys in request don't hash to the same slot",
.disconnect = true};
status = redisClusterAsyncCommand(acc, commandCallback, &r5, "SUNION a b");
ASSERT_MSG(status == REDIS_OK, acc->errstr);
event_base_dispatch(base);
redisClusterAsyncFree(acc);
event_base_free(base);
}
int main(void) {
test_pipeline();
test_pipeline_with_multinode_commands();
test_async_pipeline();
// Asynchronous API does not support multi-key commands
return 0;
}