/*
* This program connects to a cluster and then reads commands from stdin, such
* as "SET foo bar", one per line and prints the results to stdout.
*
* The behaviour is the same as that of clusterclient.c, but the asynchronous
* API of the library is used rather than the synchronous API.
* The following action commands can alter the default behaviour:
*
* !async - Send multiple commands and then wait for their responses.
* Will send all following commands until EOF or the command `!sync`
*
* !sync - Send a single command and wait for its response before sending next
* command. This is the default behaviour.
*
* !resend - Resend a failed command from its reply callback.
* Will resend all following failed commands until EOF.
*
* !sleep - Sleep a second. Can be used to allow timers to timeout.
* Currently not supported while in !async mode.
*
* !all - Send each command to all nodes in the cluster.
* Will send following commands using the `..ToNode()` API and a
* cluster node iterator to send each command to all known nodes.
*
* !disconnect - Disconnect the client.
*
* An example input of first sending 2 commands and waiting for their responses,
* before sending a single command and waiting for its response:
*
* !async
* SET dual-1 command
* SET dual-2 command
* !sync
* SET single command
*
*/
#include "adapters/libevent.h"
#include "hircluster.h"
#include "test_utils.h"
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
/* Size of the buffer a line from stdin is read into, and of each entry in
 * cmd_history. */
#define CMD_SIZE 256
/* Number of entries in cmd_history; sendNextCommand asserts that num_running
 * is below this before storing a command. */
#define HISTORY_DEPTH 16
/* Copies of sent commands, indexed by the command id that is handed to the
 * reply callback as privdata. Read back when a failed command is resent. */
char cmd_history[HISTORY_DEPTH][CMD_SIZE];
/* Count of outstanding commands: incremented for each successful send and
 * decremented by replyCallback when a command is done. */
int num_running = 0;
/* Non-zero once the `!resend` action has been read. */
int resend_failed_cmd = 0;
/* Non-zero once the `!all` action has been read. */
int send_to_all = 0;
/* Forward declaration: replyCallback refers to sendNextCommand before it is
 * defined. */
void sendNextCommand(evutil_socket_t, short, void *);
/*
 * Print one reply on its own line on stdout.
 *
 * Error, status, string, verbatim and bignum replies are printed through
 * their string member, integer replies through their numeric value. Any other
 * reply type is reported as unhandled together with its type number.
 */
void printReply(const redisReply *reply) {
    const int type = reply->type;

    if (type == REDIS_REPLY_INTEGER) {
        printf("%lld\n", reply->integer);
        return;
    }

    if (type == REDIS_REPLY_ERROR || type == REDIS_REPLY_STATUS ||
        type == REDIS_REPLY_STRING || type == REDIS_REPLY_VERB ||
        type == REDIS_REPLY_BIGNUM) {
        printf("%s\n", reply->str);
        return;
    }

    printf("Unhandled reply type: %d\n", type);
}
/*
 * Reply callback used for every command this program sends.
 *
 * acc      - the async cluster context the command was sent on.
 * r        - the reply (a redisReply), or NULL when the command failed.
 * privdata - the command id, an index into cmd_history, cast to a pointer.
 *
 * A reply is printed with printReply(); a failure prints the context's error
 * string, or "unknown error" when no error is set. When resending is enabled
 * (resend_failed_cmd), a failed command is sent again and remains counted as
 * outstanding. When the last outstanding command is done, a new call to
 * sendNextCommand() is scheduled on the event base.
 */
void replyCallback(redisClusterAsyncContext *acc, void *r, void *privdata) {
    redisReply *reply = (redisReply *)r;
    intptr_t cmd_id = (intptr_t)privdata; /* Id to corresponding cmd */

    if (reply == NULL) {
        if (acc->err) {
            printf("error: %s\n", acc->errstr);
        } else {
            printf("unknown error\n");
        }

        if (resend_failed_cmd) {
            printf("resend '%s'\n", cmd_history[cmd_id]);
            if (redisClusterAsyncCommand(acc, replyCallback, (void *)cmd_id,
                                         cmd_history[cmd_id]) == REDIS_OK) {
                /* The command is outstanding again; keep it counted. */
                return;
            }
            /* The resend was not accepted, so no further callback will come
             * for this command. Fall through and count it as done; otherwise
             * num_running never reaches zero and stdin is never read again. */
            printf("send error\n");
        }
    } else {
        printReply(reply);
    }

    if (--num_running == 0) {
        /* Schedule a read from stdin and send next command */
        event_base_once(acc->adapter, -1, EV_TIMEOUT, sendNextCommand, acc,
                        NULL);
    }
}
/*
 * Read lines from stdin and send the next command(s).
 *
 * Used as an event callback: fd and kind are ignored, arg is the
 * redisClusterAsyncContext to send on.
 *
 * Empty lines and lines starting with '#' are skipped. Lines starting with
 * '!' are actions (!sleep, !async, !sync, !resend, !all, !disconnect) and are
 * never sent. Any other line is stored in cmd_history and sent as a command.
 *
 * By default the function returns after sending one command; replyCallback
 * schedules the next call once no command is outstanding. After `!async`,
 * commands keep being sent until `!sync` or EOF. On EOF the client is
 * disconnected.
 *
 * NOTE(review): a line longer than CMD_SIZE - 1 characters is split by fgets
 * and its remainder is handled as a separate line.
 */
void sendNextCommand(evutil_socket_t fd, short kind, void *arg) {
    UNUSED(fd);
    UNUSED(kind);
    redisClusterAsyncContext *acc = arg;
    int async = 0;
    char cmd[CMD_SIZE];

    while (fgets(cmd, CMD_SIZE, stdin)) {
        size_t len = strlen(cmd);
        /* len is 0 when the line begins with a NUL byte; the guard keeps
         * cmd[len - 1] from indexing outside the buffer in that case. */
        if (len > 0 && cmd[len - 1] == '\n') /* Chop trailing line break */
            cmd[len - 1] = '\0';

        if (cmd[0] == '\0') /* Skip empty lines */
            continue;
        if (cmd[0] == '#') /* Skip comments */
            continue;

        if (cmd[0] == '!') {
            if (strcmp(cmd, "!sleep") == 0) {
                ASSERT_MSG(async == 0, "!sleep in !async not supported");
                /* Come back to this function after one second. */
                struct timeval timeout = {1, 0};
                event_base_once(acc->adapter, -1, EV_TIMEOUT, sendNextCommand,
                                acc, &timeout);
                return;
            }
            if (strcmp(cmd, "!async") == 0) /* Enable async send */
                async = 1;
            if (strcmp(cmd, "!sync") == 0) { /* Disable async send */
                if (async)
                    return; /* We are done sending commands */
            }
            if (strcmp(cmd, "!resend") == 0) /* Enable resend of failed cmd */
                resend_failed_cmd = 1;
            if (strcmp(cmd, "!all") == 0) { /* Enable send to all nodes */
                ASSERT_MSG(resend_failed_cmd == 0,
                           "!all in !resend not supported");
                send_to_all = 1;
            }
            if (strcmp(cmd, "!disconnect") == 0)
                redisClusterAsyncDisconnect(acc);

            continue; /* Skip line */
        }

        /* Copy command string to history buffer */
        assert(num_running < HISTORY_DEPTH);
        strcpy(cmd_history[num_running], cmd);

        if (send_to_all) {
            /* Send the command once to every node the iterator yields; each
             * send counts as its own outstanding command. */
            nodeIterator ni;
            initNodeIterator(&ni, acc->cc);

            redisClusterNode *node;
            while ((node = nodeNext(&ni)) != NULL) {
                int status = redisClusterAsyncCommandToNode(
                    acc, node, replyCallback, (void *)((intptr_t)num_running),
                    cmd);
                ASSERT_MSG(status == REDIS_OK, acc->errstr);
                num_running++;
            }
        } else {
            int status = redisClusterAsyncCommand(
                acc, replyCallback, (void *)((intptr_t)num_running), cmd);
            if (status == REDIS_OK) {
                num_running++;
            } else {
                printf("error: %s\n", acc->errstr);

                /* Schedule a read from stdin and handle next command. */
                event_base_once(acc->adapter, -1, EV_TIMEOUT, sendNextCommand,
                                acc, NULL);
            }
        }

        if (async)
            continue; /* Send next command as well */

        return;
    }

    /* Disconnect if nothing is left to read from stdin */
    redisClusterAsyncDisconnect(acc);
}
/*
 * Cluster event callback: prints "Event: <name>" on stdout for the given
 * event. Events other than slotmap-updated, ready and free-context are
 * printed as "unknown". The context and privdata arguments are not used.
 */
void eventCallback(const redisClusterContext *cc, int event, void *privdata) {
    (void)cc;
    (void)privdata;

    const char *name = "unknown";
    if (event == HIRCLUSTER_EVENT_SLOTMAP_UPDATED) {
        name = "slotmap-updated";
    } else if (event == HIRCLUSTER_EVENT_READY) {
        name = "ready";
    } else if (event == HIRCLUSTER_EVENT_FREE_CONTEXT) {
        name = "free-context";
    }

    printf("Event: %s\n", name);
}
/*
 * Connect callback: prints the host and port of the connection on stdout,
 * prefixed with "failed to " when status is not REDIS_OK.
 */
void connectCallback(const redisAsyncContext *ac, int status) {
    const char *prefix = (status == REDIS_OK) ? "" : "failed to ";

    printf("Event: %sconnect to %s:%d\n", prefix, ac->c.tcp.host,
           ac->c.tcp.port);
}
/*
 * Disconnect callback: prints the host and port of the connection on stdout,
 * prefixed with "failed to " when status is not REDIS_OK.
 */
void disconnectCallback(const redisAsyncContext *ac, int status) {
    const char *prefix = (status == REDIS_OK) ? "" : "failed to ";

    printf("Event: %sdisconnect from %s:%d\n", prefix, ac->c.tcp.host,
           ac->c.tcp.port);
}
/*
 * Entry point.
 *
 * Usage: clusterclient_async [--use-cluster-nodes] [--events]
 *                            [--connection-events] HOST:PORT
 *
 * Parses the leading '-' options, configures an async cluster context with
 * HOST:PORT as initial node, connects, attaches the context to a libevent
 * base and runs the event loop. Commands are read from stdin by
 * sendNextCommand().
 *
 * Exits with 1 when HOST:PORT is missing and with 2 when the connect fails;
 * returns 0 after the event loop has finished.
 */
int main(int argc, char **argv) {
    int use_cluster_slots = 1; // Get topology via CLUSTER SLOTS
    int show_events = 0;
    int show_connection_events = 0;

    /* Options are all leading arguments that start with '-'. An unknown
     * option is reported on stderr but does not stop the program. */
    int optind;
    for (optind = 1; optind < argc && argv[optind][0] == '-'; optind++) {
        if (strcmp(argv[optind], "--use-cluster-nodes") == 0) {
            use_cluster_slots = 0; // Use the default CLUSTER NODES instead
        } else if (strcmp(argv[optind], "--events") == 0) {
            show_events = 1;
        } else if (strcmp(argv[optind], "--connection-events") == 0) {
            show_connection_events = 1;
        } else {
            fprintf(stderr, "Unknown argument: '%s'\n", argv[optind]);
        }
    }

    if (optind >= argc) {
        /* List every option that the loop above accepts. */
        fprintf(stderr,
                "Usage: clusterclient_async [--use-cluster-nodes] [--events] "
                "[--connection-events] HOST:PORT\n");
        exit(1);
    }
    const char *initnode = argv[optind];
    struct timeval timeout = {0, 500000};

    redisClusterAsyncContext *acc = redisClusterAsyncContextInit();
    assert(acc);
    redisClusterSetOptionAddNodes(acc->cc, initnode);
    redisClusterSetOptionTimeout(acc->cc, timeout);
    redisClusterSetOptionConnectTimeout(acc->cc, timeout);
    redisClusterSetOptionMaxRetry(acc->cc, 1);
    if (use_cluster_slots) {
        redisClusterSetOptionRouteUseSlots(acc->cc);
    }
    if (show_events) {
        redisClusterSetEventCallback(acc->cc, eventCallback, NULL);
    }
    if (show_connection_events) {
        redisClusterAsyncSetConnectCallback(acc, connectCallback);
        redisClusterAsyncSetDisconnectCallback(acc, disconnectCallback);
    }

    if (redisClusterConnect2(acc->cc) != REDIS_OK) {
        printf("Connect error: %s\n", acc->cc->errstr);
        exit(2);
    }

    struct event_base *base = event_base_new();
    /* Checked like the context allocation above, before the base is used. */
    assert(base);
    int status = redisClusterLibeventAttach(acc, base);
    assert(status == REDIS_OK);

    /* Schedule a read from stdin and send next command */
    event_base_once(acc->adapter, -1, EV_TIMEOUT, sendNextCommand, acc, NULL);

    event_base_dispatch(base);

    redisClusterAsyncFree(acc);
    event_base_free(base);
    return 0;
}