/*
 * This program connects to a Redis node and then reads commands from stdin, such
 * as "SET foo bar", one per line and prints the results to stdout.
 *
 * The behaviour is similar to that of clusterclient_async.c, but it sends the
 * next command after receiving a reply from the previous command. It also works
 * for standalone Redis nodes (without cluster mode), and uses the
 * redisClusterAsyncCommandToNode function to send the command to the first node.
 * If it receives any I/O error, the program performs a reconnect.
 */

#include "adapters/libevent.h"
#include "hircluster.h"
#include "test_utils.h"
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Unfortunately there is no error code for this error to match */
#define REDIS_ENOCLUSTER "ERR This instance has cluster support disabled"

void sendNextCommand(evutil_socket_t, short, void *);

void connectToRedis(redisClusterAsyncContext *acc) {
    /* reset Redis context in case of reconnect */
    redisClusterAsyncDisconnect(acc);

    int status = redisClusterConnect2(acc->cc);
    if (status == REDIS_OK) {
        // cluster mode
    } else if (acc->cc->err && strcmp(acc->cc->errstr, REDIS_ENOCLUSTER) == 0) {
        printf("[no cluster]\n");
        acc->cc->err = 0;
        memset(acc->cc->errstr, '\0', strlen(acc->cc->errstr));
    } else {
        printf("Connect error: %s\n", acc->cc->errstr);
        exit(-1);
    }
}

void replyCallback(redisClusterAsyncContext *acc, void *r, void *privdata) {
    UNUSED(privdata);
    redisReply *reply = (redisReply *)r;

    if (reply == NULL) {
        if (acc->err == REDIS_ERR_IO || acc->err == REDIS_ERR_EOF) {
            printf("[reconnect]\n");
            connectToRedis(acc);
        } else if (acc->err) {
            printf("error: %s\n", acc->errstr);
        } else {
            printf("unknown error\n");
        }
    } else {
        printf("%s\n", reply->str);
    }

    // schedule reading from stdin and sending next command
    event_base_once(acc->adapter, -1, EV_TIMEOUT, sendNextCommand, acc, NULL);
}

void sendNextCommand(evutil_socket_t fd, short kind, void *arg) {
    UNUSED(fd);
    UNUSED(kind);
    redisClusterAsyncContext *acc = arg;

    char command[256];
    if (fgets(command, 256, stdin)) {
        size_t len = strlen(command);
        if (command[len - 1] == '\n') // Chop trailing line break
            command[len - 1] = '\0';

        dictIterator di;
        dictInitIterator(&di, acc->cc->nodes);

        dictEntry *de = dictNext(&di);
        assert(de);
        redisClusterNode *node = dictGetEntryVal(de);
        assert(node);

        // coverity[tainted_scalar]
        int status = redisClusterAsyncCommandToNode(acc, node, replyCallback,
                                                    NULL, command);
        ASSERT_MSG(status == REDIS_OK, acc->errstr);
    } else {
        // disconnect if nothing is left to read from stdin
        redisClusterAsyncDisconnect(acc);
    }
}

int main(int argc, char **argv) {
    if (argc <= 1) {
        fprintf(stderr, "Usage: %s HOST:PORT\n", argv[0]);
        exit(1);
    }
    const char *initnode = argv[1];

    redisClusterAsyncContext *acc = redisClusterAsyncContextInit();
    assert(acc);
    redisClusterSetOptionAddNodes(acc->cc, initnode);
    redisClusterSetOptionRouteUseSlots(acc->cc);

    struct event_base *base = event_base_new();
    int status = redisClusterLibeventAttach(acc, base);
    assert(status == REDIS_OK);

    connectToRedis(acc);
    // schedule reading from stdin and sending next command
    event_base_once(acc->adapter, -1, EV_TIMEOUT, sendNextCommand, acc, NULL);

    event_base_dispatch(base);

    redisClusterAsyncFree(acc);
    event_base_free(base);
    return 0;
}