#ifdef __cplusplus
extern "C" {
#endif
#define PERL_NO_GET_CONTEXT /* we want efficiency */
#include <EXTERN.h>
#include <perl.h>
#include <XSUB.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <time.h>
#include "adapters/libevent.h"
#include "hiredis_cluster/hircluster.h"
#ifdef __cplusplus
} /* extern "C" */
#endif
#define NEED_newRV_noinc
#define NEED_my_strlcpy
#include "ppport.h"
#define ONE_SECOND_TO_MICRO 1000000
#define NANO_SECOND_TO_MICRO 1000
#define MIN_ATTEMPT_TO_GET_RESULT 2
/* libevent adapter priority configuration
Uses 2 priority levels to ensure I/O events are processed before timeouts:
- Priority 0: I/O events (Redis responses) - highest priority
- Priority 1: Timer events (timeouts) - lower priority
EVENT_BASE_PRIORITY_NUMBER sets the total priorities for event_base_priority_init() */
#define EVENT_BASE_PRIORITY_NUMBER 2
#define DEBUG_MSG(fmt, ...) \
if (self->debug) { \
fprintf(stderr, "[%d][%d][%s:%d:%s]: ", getpid(), getppid(), __FILE__, __LINE__, __func__); \
fprintf(stderr, fmt, __VA_ARGS__); \
fprintf(stderr, "\n"); \
}
#define DEBUG_EVENT_BASE() \
if (self->debug) { \
event_base_dump_events(self->cluster_event_base, stderr); \
}
typedef struct redis_cluster_fast_reply_s {
SV *result;
SV *error;
} redis_cluster_fast_reply_t;
typedef struct cmd_reply_context_s {
void *self;
SV *result;
SV *error;
int done;
} cmd_reply_context_t;
typedef struct cmd_reply_context_pipeline_s {
void *self;
SV *result;
SV *error;
SV *cb;
} cmd_reply_context_pipeline_t;
typedef struct redis_cluster_fast_s {
redisClusterAsyncContext *acc;
struct event_base *cluster_event_base;
char *hostnames;
int debug;
int max_retry;
int use_cluster_slots;
int event_ready;
struct timeval connect_timeout;
struct timeval command_timeout;
int64_t discovery_timeout_usec;
pid_t pid;
int64_t pipeline_callback_remain;
} redis_cluster_fast_t, *Redis__Cluster__Fast;
int64_t get_usec_timestamp(void) {
struct timespec ts;
int status;
status = clock_gettime(CLOCK_MONOTONIC, &ts);
if (status < 0) {
return -1;
}
return (int64_t) ts.tv_sec * ONE_SECOND_TO_MICRO + (int64_t) (ts.tv_nsec / NANO_SECOND_TO_MICRO);
}
static redis_cluster_fast_reply_t
Redis__Cluster__Fast_decode_reply(pTHX_ Redis__Cluster__Fast self, redisReply *reply) {
redis_cluster_fast_reply_t res = {NULL, NULL};
switch (reply->type) {
case REDIS_REPLY_ERROR:
res.error = newSVpvn(reply->str, reply->len);
break;
case REDIS_REPLY_BIGNUM:
case REDIS_REPLY_DOUBLE:
case REDIS_REPLY_STATUS:
case REDIS_REPLY_STRING:
case REDIS_REPLY_VERB:
res.result = newSVpvn(reply->str, reply->len);
break;
case REDIS_REPLY_INTEGER:
case REDIS_REPLY_BOOL:
res.result = newSViv(reply->integer);
break;
case REDIS_REPLY_NIL:
res.result = &PL_sv_undef;
break;
case REDIS_REPLY_MAP:
case REDIS_REPLY_SET:
case REDIS_REPLY_ATTR: {
size_t i;
char *key;
HV *hv = newHV();
res.result = newRV_noinc((SV *) hv);
for (i = 0; i < reply->elements; i++) {
if (i % 2 == 0) {
key = reply->element[i]->str;
} else {
redis_cluster_fast_reply_t elem = {NULL, NULL};
elem = Redis__Cluster__Fast_decode_reply(aTHX_ self, reply->element[i]);
if (elem.result) {
hv_store(hv, key, strlen(key), SvREFCNT_inc(elem.result), 0);
} else {
hv_store(hv, key, strlen(key), newSV(0), 0);
}
if (elem.error && !res.error) {
res.error = elem.error;
}
}
}
break;
}
case REDIS_REPLY_PUSH:
case REDIS_REPLY_ARRAY: {
AV *av = newAV();
size_t i;
res.result = newRV_noinc((SV *) av);
for (i = 0; i < reply->elements; i++) {
redis_cluster_fast_reply_t elem = {NULL, NULL};
elem = Redis__Cluster__Fast_decode_reply(aTHX_ self, reply->element[i]);
if (elem.result) {
av_push(av, elem.result);
} else {
av_push(av, newSV(0));
}
if (elem.error && !res.error) {
res.error = elem.error;
}
}
break;
}
}
return res;
}
void replyCallback(redisClusterAsyncContext *cc, void *r, void *privdata) {
dTHX;
cmd_reply_context_t *reply_t;
Redis__Cluster__Fast self;
redisReply *reply;
reply_t = (cmd_reply_context_t *) privdata;
self = (Redis__Cluster__Fast) reply_t->self;
DEBUG_MSG("replycb %s", "start");
reply = (redisReply *) r;
if (reply) {
redis_cluster_fast_reply_t res;
res = Redis__Cluster__Fast_decode_reply(aTHX_ self, reply);
reply_t->result = res.result;
reply_t->error = res.error;
} else {
DEBUG_MSG("error: err=%d errstr=%s", cc->err, cc->errstr);
reply_t->error = newSVpvf("%s", cc->errstr);
}
reply_t->done = 1;
}
void replyCallbackPipeline(redisClusterAsyncContext *cc, void *r, void *privdata) {
dTHX;
cmd_reply_context_pipeline_t *reply_pipeline_t;
Redis__Cluster__Fast self;
redisReply *reply;
reply_pipeline_t = (cmd_reply_context_pipeline_t *) privdata;
self = (Redis__Cluster__Fast) reply_pipeline_t->self;
DEBUG_MSG("replycb pipeline %s", "start");
reply = (redisReply *) r;
if (reply) {
redis_cluster_fast_reply_t res;
res = Redis__Cluster__Fast_decode_reply(aTHX_ self, reply);
reply_pipeline_t->result = res.result;
reply_pipeline_t->error = res.error;
} else {
DEBUG_MSG("error: err=%d errstr=%s", cc->err, cc->errstr);
reply_pipeline_t->error = newSVpvf("%s", cc->errstr);
}
{
dSP;
ENTER;
SAVETMPS;
PUSHMARK(SP);
EXTEND(SP, 2);
PUSHs(reply_pipeline_t->result ? sv_2mortal(reply_pipeline_t->result) : &PL_sv_undef);
PUSHs(reply_pipeline_t->error ? sv_2mortal(reply_pipeline_t->error) : &PL_sv_undef);
PUTBACK;
call_sv(reply_pipeline_t->cb, G_DISCARD);
FREETMPS;
LEAVE;
}
SvREFCNT_dec(reply_pipeline_t->cb);
Safefree(reply_pipeline_t);
self->pipeline_callback_remain--;
DEBUG_MSG("pipeline callback remain: %ld", self->pipeline_callback_remain);
}
void eventCallback(const redisClusterContext *cc, int event, void *privdata) {
Redis__Cluster__Fast self = (Redis__Cluster__Fast) privdata;
DEBUG_MSG("event: %d", event);
if (event == HIRCLUSTER_EVENT_READY) {
self->event_ready = 1;
}
}
SV *Redis__Cluster__Fast_connect(pTHX_ Redis__Cluster__Fast self) {
DEBUG_MSG("%s", "start connect");
if (self->cluster_event_base && self->acc) {
return newSVpvf("%s", "already connected");
}
self->pipeline_callback_remain = 0;
self->pid = getpid();
self->acc = redisClusterAsyncContextInit();
if (redisClusterSetOptionAddNodes(self->acc->cc, self->hostnames) != REDIS_OK) {
return newSVpvf("failed to add nodes: %s", self->acc->cc->errstr);
}
if (redisClusterSetOptionConnectTimeout(self->acc->cc, self->connect_timeout) != REDIS_OK) {
return newSVpvf("failed to set connect timeout: %s", self->acc->cc->errstr);
}
if (redisClusterSetOptionTimeout(self->acc->cc, self->command_timeout) != REDIS_OK) {
return newSVpvf("failed to set command timeout: %s", self->acc->cc->errstr);
}
if (redisClusterSetOptionMaxRetry(self->acc->cc, self->max_retry) != REDIS_OK) {
return newSVpvf("%s", "failed to set max retry");
}
if (self->use_cluster_slots) {
DEBUG_MSG("%s", "use cluster slots");
if (redisClusterSetOptionRouteUseSlots(self->acc->cc) != REDIS_OK) {
return newSVpvf("%s", "failed to set redisClusterSetOptionRouteUseSlots");
}
}
self->cluster_event_base = event_base_new();
if (event_base_priority_init(self->cluster_event_base, EVENT_BASE_PRIORITY_NUMBER) != 0) {
return newSVpvf("%s", "failed to initialize event base priorities");
}
if (redisClusterLibeventAttach(self->acc, self->cluster_event_base) != REDIS_OK) {
return newSVpvf("%s", "failed to attach event base");
}
self->event_ready = 0;
if (redisClusterSetEventCallback(self->acc->cc, eventCallback, self) != REDIS_OK) {
return newSVpvf("%s", "failed to set event callback");
}
if (redisClusterAsyncConnect2(self->acc) != REDIS_OK) {
return newSVpvf("failed to connect async: %s", self->acc->cc->errstr);
}
DEBUG_MSG("%s", "done connect");
return NULL;
}
SV *Redis__Cluster__Fast_disconnect(pTHX_ Redis__Cluster__Fast self) {
if (self->cluster_event_base == NULL && self->acc == NULL) {
return NULL;
}
if (event_reinit(self->cluster_event_base) != 0) {
return newSVpvf("%s", "event reinit failed");
}
redisClusterAsyncDisconnect(self->acc);
if (event_base_dispatch(self->cluster_event_base) == -1) {
return newSVpvf("%s", "event_base_dispatch failed after forking");
}
event_base_free(self->cluster_event_base);
self->cluster_event_base = NULL;
redisClusterAsyncFree(self->acc);
self->acc = NULL;
return NULL;
}
SV *Redis__Cluster__Fast_wait_until_event_ready(pTHX_ Redis__Cluster__Fast self) {
int event_loop_error;
int count = 0;
int64_t timestamp_current, timeout_after;
timestamp_current = get_usec_timestamp();
if (timestamp_current < 0) {
return newSVpvf("%s", "failed to get current timestamp");
}
timeout_after = timestamp_current + self->discovery_timeout_usec;
DEBUG_MSG("%s", "start wait_until_event_ready");
while (!self->event_ready) {
DEBUG_EVENT_BASE();
if (count >= MIN_ATTEMPT_TO_GET_RESULT) {
timestamp_current = get_usec_timestamp();
if (timestamp_current < 0) {
return newSVpvf("%s", "failed to get current timestamp");
}
if (timestamp_current > timeout_after) {
return newSVpvf("%s", "Timeout. The cluster discovery timeout reached.");
}
}
event_loop_error = event_base_loop(self->cluster_event_base, EVLOOP_ONCE);
if (event_loop_error == -1) {
return newSVpvf("%s", "event_base_loop failed");
}
if (event_loop_error == 1) {
return newSVpvf("%s", "Timeout. All of the given hostnames are unreachable.");
};
count++;
}
DEBUG_MSG("%s", "done wait_until_event_ready");
return NULL;
}
cluster_node *get_node_by_random(pTHX_ Redis__Cluster__Fast self) {
cluster_node *selected;
cluster_node *candidate;
int node_count;
nodeIterator ni;
initNodeIterator(&ni, self->acc->cc);
/* Select a random node by reservoir sampling. */
node_count = 1;
if ((selected = nodeNext(&ni)) == NULL)
return NULL;
while ((candidate = nodeNext(&ni)) != NULL) {
node_count++;
if ((int) (Drand01() * node_count) == 0)
selected = candidate;
}
return selected;
}
void run_cmd_impl(pTHX_ Redis__Cluster__Fast self, int argc, const char **argv, size_t *argvlen,
cmd_reply_context_t *reply_t) {
int status, event_loop_error;
DEBUG_MSG("start: %s", *argv);
status = redisClusterAsyncCommandArgv(self->acc, replyCallback, reply_t, argc, argv, argvlen);
if (status != REDIS_OK) {
if (self->acc->err == REDIS_ERR_OTHER &&
strcmp(self->acc->errstr, "No keys in command(must have keys for redis cluster mode)") == 0) {
cluster_node *node;
DEBUG_MSG("not cluster command, fallback to CommandToNode: err=%d errstr=%s",
self->acc->err,
self->acc->errstr);
node = get_node_by_random(aTHX_ self);
if (node == NULL) {
reply_t->error = newSVpvf("%s", "No node found");
return;
}
status = redisClusterAsyncCommandArgvToNode(self->acc, node, replyCallback, reply_t, argc, argv, argvlen);
if (status != REDIS_OK) {
DEBUG_MSG("error: err=%d errstr=%s", self->acc->err, self->acc->errstr);
reply_t->error = newSVpvf("%s", self->acc->errstr);
return;
}
} else {
DEBUG_MSG("error: err=%d errstr=%s", self->acc->err, self->acc->errstr);
reply_t->error = newSVpvf("%s", self->acc->errstr);
return;
}
}
while (!reply_t->done) {
DEBUG_EVENT_BASE();
event_loop_error = event_base_loop(self->cluster_event_base, EVLOOP_ONCE);
if (event_loop_error != 0) {
reply_t->error = newSVpvf("%s %d", "event_base_loop failed", event_loop_error);
break;
}
}
}
void run_cmd_impl_pipeline(pTHX_ Redis__Cluster__Fast self, int argc, const char **argv, size_t *argvlen,
cmd_reply_context_t *reply_t, SV *cb) {
int status;
/* In pipeline mode, the callback are executed later, so it is necessary to create a dedicated reply context.
This context is freed when the callback is executed. */
cmd_reply_context_pipeline_t *reply_pipeline_t;
Newx(reply_pipeline_t, sizeof(cmd_reply_context_pipeline_t), cmd_reply_context_pipeline_t);
reply_pipeline_t->self = (void *) self;
reply_pipeline_t->result = NULL;
reply_pipeline_t->error = NULL;
reply_pipeline_t->cb = SvREFCNT_inc(cb);
DEBUG_MSG("start pipeline: %s", *argv);
status = redisClusterAsyncCommandArgv(self->acc, replyCallbackPipeline, reply_pipeline_t, argc, argv, argvlen);
if (status != REDIS_OK) {
if (self->acc->err == REDIS_ERR_OTHER &&
strcmp(self->acc->errstr, "No keys in command(must have keys for redis cluster mode)") == 0) {
cluster_node *node;
DEBUG_MSG("not cluster command, fallback to CommandToNode: err=%d errstr=%s",
self->acc->err,
self->acc->errstr);
node = get_node_by_random(aTHX_ self);
if (node == NULL) {
reply_t->error = newSVpvf("%s", "No node found");
goto error;
}
status = redisClusterAsyncCommandArgvToNode(self->acc, node, replyCallbackPipeline, reply_pipeline_t, argc, argv, argvlen);
if (status != REDIS_OK) {
DEBUG_MSG("error: err=%d errstr=%s", self->acc->err, self->acc->errstr);
reply_t->error = newSVpvf("%s", self->acc->errstr);
goto error;
}
} else {
DEBUG_MSG("error: err=%d errstr=%s", self->acc->err, self->acc->errstr);
reply_t->error = newSVpvf("%s", self->acc->errstr);
goto error;
}
}
self->pipeline_callback_remain++;
DEBUG_MSG("pipeline callback remain: %ld", self->pipeline_callback_remain);
return;
error:
SvREFCNT_dec(reply_pipeline_t->cb);
Safefree(reply_pipeline_t);
}
int Redis__Cluster__Fast_run_event_loop(pTHX_ Redis__Cluster__Fast self) {
int event_loop_error;
if (self->pipeline_callback_remain <= 0) {
return 0;
}
DEBUG_EVENT_BASE();
event_loop_error = event_base_loop(self->cluster_event_base, EVLOOP_NONBLOCK);
if (event_loop_error != 0) {
return -1;
}
return 1;
}
int Redis__Cluster__Fast_wait_one_response(pTHX_ Redis__Cluster__Fast self) {
int event_loop_error;
int64_t callback_remain_current = self->pipeline_callback_remain;
if (callback_remain_current <= 0) {
return 0;
}
while (self->pipeline_callback_remain == callback_remain_current) {
DEBUG_EVENT_BASE();
event_loop_error = event_base_loop(self->cluster_event_base, EVLOOP_ONCE);
if (event_loop_error != 0) {
return -1;
}
}
return 1;
}
int Redis__Cluster__Fast_wait_all_responses(pTHX_ Redis__Cluster__Fast self) {
int event_loop_error;
if (self->pipeline_callback_remain <= 0) {
return 0;
}
while (self->pipeline_callback_remain > 0) {
DEBUG_EVENT_BASE();
event_loop_error = event_base_loop(self->cluster_event_base, EVLOOP_ONCE);
if (event_loop_error != 0) {
return -1;
}
}
return 1;
}
void Redis__Cluster__Fast_run_cmd(pTHX_ Redis__Cluster__Fast self, int argc, const char **argv, size_t *argvlen,
cmd_reply_context_t *reply_t, SV *cb) {
reply_t->self = (void *) self;
reply_t->result = NULL;
reply_t->error = NULL;
reply_t->done = 0;
if (self->pid != getpid()) {
DEBUG_MSG("%s", "pid changed");
reply_t->error = Redis__Cluster__Fast_disconnect(aTHX_ self);
if (reply_t->error) {
return;
}
reply_t->error = Redis__Cluster__Fast_connect(aTHX_ self);
if (reply_t->error) {
return;
}
reply_t->error = Redis__Cluster__Fast_wait_until_event_ready(aTHX_ self);
if (reply_t->error) {
return;
}
}
if (cb) {
run_cmd_impl_pipeline(aTHX_ self, argc, argv, argvlen, reply_t, cb);
} else {
run_cmd_impl(aTHX_ self, argc, argv, argvlen, reply_t);
}
}
MODULE = Redis::Cluster::Fast PACKAGE = Redis::Cluster::Fast
PROTOTYPES: DISABLE
Redis::Cluster::Fast
_new(char* cls);
PREINIT:
redis_cluster_fast_t* self;
CODE:
Newxz(self, sizeof(redis_cluster_fast_t), redis_cluster_fast_t);
RETVAL = self;
OUTPUT:
RETVAL
void
__srandom(char *cls, unsigned int seed_value)
CODE:
srandom(seed_value);
int
__set_debug(Redis::Cluster::Fast self, int val)
CODE:
DEBUG_MSG("%s", "DEBUG true");
RETVAL = self->debug = val;
OUTPUT:
RETVAL
void
__set_servers(Redis::Cluster::Fast self, char* hostnames)
CODE:
if (self->hostnames) {
Safefree(self->hostnames);
self->hostnames = NULL;
}
if (hostnames) {
Newx(self->hostnames, strlen(hostnames) + 1, char);
my_strlcpy(self->hostnames, hostnames, strlen(hostnames) + 1);
DEBUG_MSG("%s %s", "set hostnames", self->hostnames);
}
void
__set_connect_timeout(Redis::Cluster::Fast self, double double_sec)
PREINIT:
int second, micro_second;
struct timeval timeout;
CODE:
second = (int) (double_sec);
micro_second = (int) (fmod(double_sec * ONE_SECOND_TO_MICRO, ONE_SECOND_TO_MICRO) + 0.999);
timeout.tv_sec = second;
timeout.tv_usec = micro_second;
self->connect_timeout = timeout;
DEBUG_MSG("connect timeout %d, %d", second, micro_second);
void
__set_command_timeout(Redis::Cluster::Fast self, double double_sec)
PREINIT:
int second, micro_second;
struct timeval timeout;
CODE:
second = (int) (double_sec);
micro_second = (int) (fmod(double_sec * ONE_SECOND_TO_MICRO, ONE_SECOND_TO_MICRO) + 0.999);
timeout.tv_sec = second;
timeout.tv_usec = micro_second;
self->command_timeout = timeout;
DEBUG_MSG("command timeout %d, %d", second, micro_second);
void
__set_max_retry(Redis::Cluster::Fast self, int max_retry)
CODE:
self->max_retry = max_retry;
DEBUG_MSG("max_retry %d", max_retry);
void
__set_route_use_slots(Redis::Cluster::Fast self, int use_slot)
CODE:
self->use_cluster_slots = use_slot;
void
__set_cluster_discovery_retry_timeout(Redis::Cluster::Fast self, double double_sec)
CODE:
self->discovery_timeout_usec = (int64_t) (double_sec * ONE_SECOND_TO_MICRO);
DEBUG_MSG("discovery timeout %ld", self->discovery_timeout_usec);
SV*
__connect(Redis::Cluster::Fast self)
CODE:
RETVAL = Redis__Cluster__Fast_connect(aTHX_ self);
if (RETVAL == NULL) {
RETVAL = &PL_sv_undef;
}
OUTPUT:
RETVAL
SV*
__disconnect(Redis::Cluster::Fast self)
CODE:
RETVAL = Redis__Cluster__Fast_disconnect(aTHX_ self);
if (RETVAL == NULL) {
RETVAL = &PL_sv_undef;
}
OUTPUT:
RETVAL
SV*
__wait_until_event_ready(Redis::Cluster::Fast self)
CODE:
RETVAL = Redis__Cluster__Fast_wait_until_event_ready(aTHX_ self);
if (RETVAL == NULL) {
RETVAL = &PL_sv_undef;
}
OUTPUT:
RETVAL
void
__std_cmd(Redis::Cluster::Fast self, ...)
PREINIT:
cmd_reply_context_t* result_context;
char** argv;
size_t* argvlen;
STRLEN len;
int argc, i;
SV* cb;
PPCODE:
if (!self->acc) {
croak("Not connected to any server");
}
cb = ST(items - 1);
if (SvROK(cb) && SvTYPE(SvRV(cb)) == SVt_PVCV) {
argc = items - 2;
} else {
cb = NULL;
argc = items - 1;
}
Newx(argv, sizeof(char*) * argc, char*);
Newx(argvlen, sizeof(size_t) * argc, size_t);
Newx(result_context, sizeof(cmd_reply_context_t), cmd_reply_context_t);
for (i = 0; i < argc; i++) {
argv[i] = SvPV(ST(i + 1), len);
argvlen[i] = len;
}
Redis__Cluster__Fast_run_cmd(aTHX_ self, argc, (const char **) argv, argvlen, result_context, cb);
ST(0) = result_context->result ?
sv_2mortal(result_context->result) : &PL_sv_undef;
ST(1) = result_context->error ?
sv_2mortal(result_context->error) : &PL_sv_undef;
Safefree(argv);
Safefree(argvlen);
Safefree(result_context);
XSRETURN(2);
int
__run_event_loop(Redis::Cluster::Fast self)
CODE:
RETVAL = Redis__Cluster__Fast_run_event_loop(aTHX_ self);
OUTPUT:
RETVAL
int
__wait_one_response(Redis::Cluster::Fast self)
CODE:
RETVAL = Redis__Cluster__Fast_wait_one_response(aTHX_ self);
OUTPUT:
RETVAL
int
__wait_all_responses(Redis::Cluster::Fast self)
CODE:
RETVAL = Redis__Cluster__Fast_wait_all_responses(aTHX_ self);
OUTPUT:
RETVAL
void
DESTROY(Redis::Cluster::Fast self)
CODE:
if (self->cluster_event_base) {
DEBUG_MSG("%s", "trying to free event_base");
if ((self->pid == getpid()) || (event_reinit(self->cluster_event_base) == 0)){
redisClusterAsyncDisconnect(self->acc);
if (event_base_dispatch(self->cluster_event_base) == -1) {
warn("event_base_dispatch failed.");
}
event_base_free(self->cluster_event_base);
self->cluster_event_base = NULL;
} else {
warn("event_reinit failed. Skip disconnecting and freeing event_base on destruction");
}
}
redisClusterAsyncFree(self->acc);
self->acc = NULL;
if (self->hostnames) {
DEBUG_MSG("%s", "free hostnames");
Safefree(self->hostnames);
self->hostnames = NULL;
}
DEBUG_MSG("%s", "done");
Safefree(self);