#include "fmacros.h"
#include <stdlib.h>
#include <string.h>
#include <strings.h>
#include <assert.h>
#include <ctype.h>
#include <errno.h>
#include "async.h"
#include "net.h"
#include "dict.c"
#include "sds.h"
/* Invoke the context's ev.addRead hook with ev.data, if a hook is set.
 * Expands to a single statement, so it is safe in unbraced if/else.
 * Every line of the definition is joined with a backslash so the whole
 * do/while belongs to the macro. */
#define _EL_ADD_READ(ctx) do { \
        if ((ctx)->ev.addRead) (ctx)->ev.addRead((ctx)->ev.data); \
    } while(0)
/* Invoke the context's ev.delRead hook with ev.data, if a hook is set.
 * Expands to a single statement, so it is safe in unbraced if/else.
 * Every line of the definition is joined with a backslash so the whole
 * do/while belongs to the macro. */
#define _EL_DEL_READ(ctx) do { \
        if ((ctx)->ev.delRead) (ctx)->ev.delRead((ctx)->ev.data); \
    } while(0)
/* Invoke the context's ev.addWrite hook with ev.data, if a hook is set.
 * Expands to a single statement, so it is safe in unbraced if/else.
 * Every line of the definition is joined with a backslash so the whole
 * do/while belongs to the macro. */
#define _EL_ADD_WRITE(ctx) do { \
        if ((ctx)->ev.addWrite) (ctx)->ev.addWrite((ctx)->ev.data); \
    } while(0)
/* Invoke the context's ev.delWrite hook with ev.data, if a hook is set.
 * Expands to a single statement, so it is safe in unbraced if/else.
 * Every line of the definition is joined with a backslash so the whole
 * do/while belongs to the macro. */
#define _EL_DEL_WRITE(ctx) do { \
        if ((ctx)->ev.delWrite) (ctx)->ev.delWrite((ctx)->ev.data); \
    } while(0)
/* Invoke the context's ev.cleanup hook with ev.data, if a hook is set.
 * The expansion deliberately carries no trailing semicolon: the caller
 * supplies it, which keeps `if (x) _EL_CLEANUP(ctx); else ...` valid. */
#define _EL_CLEANUP(ctx) do { \
        if ((ctx)->ev.cleanup) (ctx)->ev.cleanup((ctx)->ev.data); \
    } while(0)
/* Forward declaration of the helper that __redisAsyncCommand() calls with
 * the already-formatted command buffer and its length in bytes.
 * No definition is visible in this file; presumably it is provided by the
 * core (synchronous) part of the library -- confirm. */
void
__redisAppendCommand(redisContext *c,
char
*cmd,
size_t
len);
/* Hash callback used by callbackDict: treats the key as an sds string and
 * hashes sdslen(key) bytes of it with dictGenHashFunction(). */
static unsigned int callbackHash(const void *key) {
    char *name = (char*)key;

    return dictGenHashFunction((unsigned char*)name, sdslen(name));
}
/* Value-duplication callback used by callbackDict.
 *
 * Returns a heap-allocated, byte-wise copy of the redisCallback pointed to
 * by `src`; the copy is released by callbackValDestructor(). `privdata` is
 * unused. Returns NULL when the allocation fails instead of copying into a
 * NULL pointer. */
static void *callbackValDup(void *privdata, const void *src) {
    redisCallback *dup;

    ((void) privdata);

    dup = malloc(sizeof(*dup));
    if (dup == NULL)
        return NULL;

    memcpy(dup, src, sizeof(*dup));
    return dup;
}
/* Key-comparison callback used by callbackDict.
 *
 * Both keys are sds strings. Returns 1 when they have the same length and
 * identical bytes, 0 otherwise. `privdata` is unused. */
static int callbackKeyCompare(void *privdata, const void *key1, const void *key2) {
    int len1 = sdslen((sds)key1);
    int len2 = sdslen((sds)key2);

    (void)privdata;

    /* Short-circuit keeps memcmp() from running on keys of unequal length. */
    return (len1 == len2) && (memcmp(key1, key2, len1) == 0);
}
static
void
callbackKeyDestructor(
void
*privdata,
void
*key) {
((
void
) privdata);
sdsfree((sds)key);
}
/* Value-destructor callback used by callbackDict: releases the heap copy
 * produced by callbackValDup() with free(). `privdata` is unused. */
static void callbackValDestructor(void *privdata, void *val) {
    (void)privdata;
    free(val);
}
/* Callback table handed to dictCreate() for the channel and pattern
 * subscription dictionaries. The initializer is positional; the slot labels
 * below are inferred from the callbacks placed in them -- confirm against
 * the dictType definition. */
static
dictType callbackDict = {
callbackHash, /* hashes the sds key */
NULL, /* no callback in this slot (presumably key duplication -- confirm) */
callbackValDup, /* heap-copies the redisCallback value */
callbackKeyCompare, /* sds key equality */
callbackKeyDestructor, /* sdsfree()s the key */
callbackValDestructor /* free()s the copied redisCallback */
};
/* Turn a freshly connected redisContext into a redisAsyncContext.
 *
 * The context block is grown in place with realloc(), so on success the
 * pointer passed in as `c` must no longer be used; use &ac->c instead.
 * All asynchronous fields are reset, the REDIS_CONNECTED flag is cleared
 * and the two subscription dictionaries are created.
 *
 * Returns the new async context, or NULL when `c` is NULL or an allocation
 * fails. On failure `c` is left untouched and still owned by the caller:
 * the dictionaries are created before the realloc() precisely so that no
 * failure can happen after the original block may have moved. */
static redisAsyncContext *redisAsyncInitialize(redisContext *c) {
    redisAsyncContext *ac;
    dict *channels = NULL;
    dict *patterns = NULL;

    if (c == NULL)
        return NULL;

    channels = dictCreate(&callbackDict, NULL);
    if (channels == NULL)
        goto oom;

    patterns = dictCreate(&callbackDict, NULL);
    if (patterns == NULL)
        goto oom;

    ac = realloc(c, sizeof(redisAsyncContext));
    if (ac == NULL)
        goto oom;

    c = &(ac->c);

    /* The regular connect functions will always set the flag REDIS_CONNECTED.
     * For the async API, we want to wait until the first write event is
     * received up before setting this flag, so reset it here. */
    c->flags &= ~REDIS_CONNECTED;

    ac->err = 0;
    ac->errstr = NULL;
    ac->data = NULL;

    ac->ev.data = NULL;
    ac->ev.addRead = NULL;
    ac->ev.delRead = NULL;
    ac->ev.addWrite = NULL;
    ac->ev.delWrite = NULL;
    ac->ev.cleanup = NULL;

    ac->onConnect = NULL;
    ac->onDisconnect = NULL;

    ac->replies.head = NULL;
    ac->replies.tail = NULL;
    ac->sub.invalid.head = NULL;
    ac->sub.invalid.tail = NULL;
    ac->sub.channels = channels;
    ac->sub.patterns = patterns;

    return ac;

oom:
    if (channels != NULL)
        dictRelease(channels);
    if (patterns != NULL)
        dictRelease(patterns);
    return NULL;
}
/* Mirror the error state of the embedded redisContext onto the async
 * context: copies the error code and makes ac->errstr refer to the embedded
 * context's errstr. */
static void __redisAsyncCopyError(redisAsyncContext *ac) {
    ac->err = ac->c.err;
    ac->errstr = ac->c.errstr;
}
/* Start a non-blocking connection to `ip`:`port` and wrap it in an async
 * context.
 *
 * Returns the async context with err/errstr copied from the underlying
 * context, so the caller can inspect ac->err for a connection error.
 * Returns NULL when no context could be allocated. */
redisAsyncContext *redisAsyncConnect(const char *ip, int port) {
    redisContext *c;
    redisAsyncContext *ac;

    c = redisConnectNonBlock(ip, port);
    if (c == NULL)
        return NULL;

    ac = redisAsyncInitialize(c);
    if (ac == NULL) {
        /* Assumes a NULL return leaves `c` valid and still ours to free. */
        redisFree(c);
        return NULL;
    }

    __redisAsyncCopyError(ac);
    return ac;
}
/* Start a non-blocking connection to the Unix socket at `path` and wrap it
 * in an async context.
 *
 * Returns the async context with err/errstr copied from the underlying
 * context, so the caller can inspect ac->err for a connection error.
 * Returns NULL when no context could be allocated. */
redisAsyncContext *redisAsyncConnectUnix(const char *path) {
    redisContext *c;
    redisAsyncContext *ac;

    c = redisConnectUnixNonBlock(path);
    if (c == NULL)
        return NULL;

    ac = redisAsyncInitialize(c);
    if (ac == NULL) {
        /* Assumes a NULL return leaves `c` valid and still ours to free. */
        redisFree(c);
        return NULL;
    }

    __redisAsyncCopyError(ac);
    return ac;
}
/* Install the connect callback on `ac`.
 *
 * The callback can be set only once: returns REDIS_ERR when one is already
 * installed. On success the callback is stored, the ev.addWrite hook is
 * triggered, and REDIS_OK is returned. */
int redisAsyncSetConnectCallback(redisAsyncContext *ac, redisConnectCallback *fn) {
    if (ac->onConnect != NULL)
        return REDIS_ERR;

    ac->onConnect = fn;

    /* The common way to detect an established connection is to wait for
     * the first write event to be fired, so request write notification. */
    _EL_ADD_WRITE(ac);
    return REDIS_OK;
}
/* Install the disconnect callback on `ac`.
 *
 * The callback can be set only once: returns REDIS_ERR when one is already
 * installed, REDIS_OK after storing `fn` otherwise. */
int redisAsyncSetDisconnectCallback(redisAsyncContext *ac, redisDisconnectCallback *fn) {
    if (ac->onDisconnect != NULL)
        return REDIS_ERR;

    ac->onDisconnect = fn;
    return REDIS_OK;
}
/* Append a copy of `source` to the tail of the callback list `list`.
 *
 * A new node is heap-allocated; when `source` is NULL the node is zeroed
 * (so its function pointer and link are not left indeterminate). The node's
 * `next` link is always reset to NULL before it is linked in.
 *
 * Returns REDIS_OK on success, REDIS_ERR when the node cannot be allocated
 * (the list is left unchanged in that case). */
static int __redisPushCallback(redisCallbackList *list, redisCallback *source) {
    redisCallback *cb;

    cb = malloc(sizeof(*cb));
    if (cb == NULL)
        return REDIS_ERR;

    if (source != NULL)
        memcpy(cb, source, sizeof(*cb));
    else
        memset(cb, 0, sizeof(*cb));
    cb->next = NULL;

    /* Store the node at the end of the list. */
    if (list->head == NULL)
        list->head = cb;
    if (list->tail != NULL)
        list->tail->next = cb;
    list->tail = cb;
    return REDIS_OK;
}
/* Remove the first node from the callback list `list`.
 *
 * When `target` is non-NULL the node's contents are copied into it before
 * the node is freed. Returns REDIS_OK when a node was removed, REDIS_ERR
 * when the list was empty (in which case `target` is not written). */
static int __redisShiftCallback(redisCallbackList *list, redisCallback *target) {
    redisCallback *front = list->head;

    if (front == NULL)
        return REDIS_ERR;

    /* Unlink the node; an emptied list must not keep a dangling tail. */
    list->head = front->next;
    if (list->tail == front)
        list->tail = NULL;

    /* Copy the callback out of the node before releasing it. */
    if (target != NULL)
        memcpy(target, front, sizeof(*front));

    free(front);
    return REDIS_OK;
}
/* Invoke `cb` with `reply`, if the callback has a function set.
 *
 * The REDIS_IN_CALLBACK flag is raised on the context for the duration of
 * the call and cleared again afterwards. */
static void __redisRunCallback(redisAsyncContext *ac, redisCallback *cb, redisReply *reply) {
    redisContext *c = &(ac->c);

    if (cb->fn == NULL)
        return;

    c->flags |= REDIS_IN_CALLBACK;
    cb->fn(ac, reply, cb->privdata);
    c->flags &= ~REDIS_IN_CALLBACK;
}
/* Tear down an async context and release everything it owns.
 *
 * Order of operations:
 *   1. every pending reply callback is run with a NULL reply;
 *   2. every callback queued on sub.invalid is run with a NULL reply;
 *   3. every channel and then every pattern subscription callback is run
 *      with a NULL reply, and the corresponding dictionary is released;
 *   4. the ev.cleanup hook is triggered;
 *   5. if the context is flagged REDIS_CONNECTED and a disconnect callback
 *      is installed, it is called -- with REDIS_OK when the teardown was
 *      requested through REDIS_FREEING, otherwise with REDIS_OK/REDIS_ERR
 *      depending on whether ac->err is zero;
 *   6. the embedded context is released with redisFree(). */
static void __redisAsyncFree(redisAsyncContext *ac) {
    redisContext *c = &(ac->c);
    redisCallback pending;
    dictIterator *iter;
    dictEntry *entry;

    /* Execute pending callbacks with NULL reply. */
    while (__redisShiftCallback(&ac->replies, &pending) == REDIS_OK) {
        __redisRunCallback(ac, &pending, NULL);
    }

    /* Execute callbacks for invalid commands. */
    while (__redisShiftCallback(&ac->sub.invalid, &pending) == REDIS_OK) {
        __redisRunCallback(ac, &pending, NULL);
    }

    /* Run channel subscription callbacks with NULL reply, then drop the dict. */
    for (iter = dictGetIterator(ac->sub.channels); (entry = dictNext(iter)) != NULL; ) {
        __redisRunCallback(ac, dictGetEntryVal(entry), NULL);
    }
    dictReleaseIterator(iter);
    dictRelease(ac->sub.channels);

    /* Same for pattern subscriptions. */
    for (iter = dictGetIterator(ac->sub.patterns); (entry = dictNext(iter)) != NULL; ) {
        __redisRunCallback(ac, dictGetEntryVal(entry), NULL);
    }
    dictReleaseIterator(iter);
    dictRelease(ac->sub.patterns);

    /* Signal the event library to clean up. */
    _EL_CLEANUP(ac);

    /* Execute the disconnect callback. When redisAsyncFree() initiated the
     * teardown, the status is always REDIS_OK. */
    if (ac->onDisconnect && (c->flags & REDIS_CONNECTED)) {
        int status = REDIS_OK;

        if (!(c->flags & REDIS_FREEING))
            status = (ac->err == 0) ? REDIS_OK : REDIS_ERR;
        ac->onDisconnect(ac, status);
    }

    /* Clean up the embedded context itself. */
    redisFree(c);
}
/* Request destruction of the async context.
 *
 * The context is flagged REDIS_FREEING. When called from inside a callback
 * (REDIS_IN_CALLBACK set) the actual teardown is not performed here;
 * otherwise __redisAsyncFree() runs immediately. */
void redisAsyncFree(redisAsyncContext *ac) {
    redisContext *c = &(ac->c);

    c->flags |= REDIS_FREEING;
    if (c->flags & REDIS_IN_CALLBACK)
        return;

    __redisAsyncFree(ac);
}
/* Disconnect and free the async context.
 *
 * The error state of the embedded context is first copied onto `ac`.
 * For a clean disconnect (ac->err == 0) the pending reply list is expected
 * to be empty; this is checked with a side-effect-free assert so that debug
 * and NDEBUG builds behave the same. For a disconnect caused by an error
 * the context is flagged REDIS_DISCONNECTING before the teardown; since
 * __redisAsyncCommand() rejects commands on a disconnecting context, this
 * keeps the callbacks run during teardown from queueing new commands. */
static void __redisAsyncDisconnect(redisAsyncContext *ac) {
    redisContext *c = &(ac->c);

    /* Make sure error is accessible if there is any. */
    __redisAsyncCopyError(ac);

    if (ac->err == 0) {
        /* For clean disconnects, there should be no pending callbacks. */
        assert(ac->replies.head == NULL);
    } else {
        /* Disconnection is caused by an error: refuse new commands. */
        c->flags |= REDIS_DISCONNECTING;
    }

    /* For non-clean disconnects, __redisAsyncFree() will execute pending
     * callbacks with a NULL reply. */
    __redisAsyncFree(ac);
}
/* Request a clean disconnect.
 *
 * The context is flagged REDIS_DISCONNECTING. The disconnect is carried out
 * right away only when not inside a callback and no reply callbacks are
 * pending; otherwise only the flag is set here. */
void redisAsyncDisconnect(redisAsyncContext *ac) {
    redisContext *c = &(ac->c);

    c->flags |= REDIS_DISCONNECTING;

    if (c->flags & REDIS_IN_CALLBACK)
        return;
    if (ac->replies.head != NULL)
        return;

    __redisAsyncDisconnect(ac);
}
/* Find the callback for a reply received while the context is subscribed.
 *
 * For an array reply, element[0] is the message kind and element[1] the
 * channel or pattern name. A kind starting with 'p'/'P' selects the pattern
 * dictionary, anything else the channel dictionary. When a callback is
 * registered under that name it is copied into `dstcb`. If the kind is
 * "unsubscribe" / "punsubscribe" the entry is removed, and when the integer
 * in element[2] is 0 (presumably the number of remaining subscriptions)
 * the REDIS_SUBSCRIBED flag is cleared.
 *
 * For any other reply type the next callback queued on sub.invalid is
 * shifted into `dstcb`.
 *
 * `dstcb` is left untouched when no callback is found. Always returns
 * REDIS_OK. */
static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply, redisCallback *dstcb) {
    redisContext *c = &(ac->c);
    dict *callbacks;
    dictEntry *de;
    int pvariant;
    char *stype;
    sds sname;

    if (reply->type == REDIS_REPLY_ARRAY) {
        assert(reply->elements >= 2);
        assert(reply->element[0]->type == REDIS_REPLY_STRING);
        stype = reply->element[0]->str;

        /* <ctype.h> functions need an unsigned char value; a plain char
         * holding a byte >= 0x80 may be negative. */
        pvariant = (tolower((unsigned char)stype[0]) == 'p') ? 1 : 0;

        if (pvariant)
            callbacks = ac->sub.patterns;
        else
            callbacks = ac->sub.channels;

        /* Locate the right callback. */
        assert(reply->element[1]->type == REDIS_REPLY_STRING);
        sname = sdsnewlen(reply->element[1]->str, reply->element[1]->len);
        de = dictFind(callbacks, sname);
        if (de != NULL) {
            memcpy(dstcb, dictGetEntryVal(de), sizeof(*dstcb));

            /* If this is an unsubscribe message, remove it. */
            if (strcasecmp(stype + pvariant, "unsubscribe") == 0) {
                dictDelete(callbacks, sname);

                /* The count lives in the third element; make sure it exists
                 * before dereferencing it. */
                assert(reply->elements >= 3);
                assert(reply->element[2]->type == REDIS_REPLY_INTEGER);
                if (reply->element[2]->integer == 0)
                    c->flags &= ~REDIS_SUBSCRIBED;
            }
        }
        sdsfree(sname);
    } else {
        /* Shift callback for invalid commands. */
        __redisShiftCallback(&ac->sub.invalid, dstcb);
    }
    return REDIS_OK;
}
/* Drain all replies currently available on the context and dispatch each
 * one to its callback.
 *
 * For every reply, the next callback is shifted from the pending list. When
 * that list is empty the reply is either an unexpected error (stored on the
 * context, after which the connection is torn down), a pub/sub message
 * (callback looked up by __redisGetSubscribeCallback()), or a MONITOR line
 * (the callback held from the previous reply is reused).
 *
 * The context may be freed by this function (disconnect, error, or a
 * callback calling redisAsyncFree()); callers must not touch `ac` after it
 * returns unless they know none of those happened. */
void redisProcessCallbacks(redisAsyncContext *ac) {
    redisContext *c = &(ac->c);
    redisCallback cb;
    void *reply = NULL;
    int status;
    int have_cb = 0;    /* set once `cb` holds a callback shifted from the list */

    /* Start from a known state: `cb` is read below on paths where no
     * callback was obtained for the current reply. */
    memset(&cb, 0, sizeof(cb));

    while ((status = redisGetReply(c, &reply)) == REDIS_OK) {
        if (reply == NULL) {
            /* When the connection is being disconnected and there are
             * no more replies, this is the cue to really disconnect. */
            if (c->flags & REDIS_DISCONNECTING && sdslen(c->obuf) == 0) {
                __redisAsyncDisconnect(ac);
                return;
            }

            /* In monitor mode, put the callback back for the next batch --
             * but only if one was actually taken off the list during this
             * call; otherwise it is still queued and `cb` is not a callback. */
            if ((c->flags & REDIS_MONITORING) && have_cb) {
                __redisPushCallback(&ac->replies, &cb);
            }

            /* When the connection is not being disconnected, simply stop
             * trying to get replies and wait for the next loop tick. */
            break;
        }

        /* Even if the context is subscribed, pending regular callbacks will
         * get a reply before pub/sub messages arrive. */
        if (__redisShiftCallback(&ac->replies, &cb) == REDIS_OK) {
            have_cb = 1;
        } else {
            /* A spontaneous error reply with no callback waiting: record it
             * on the context and disconnect. */
            if (((redisReply*)reply)->type == REDIS_REPLY_ERROR) {
                c->err = REDIS_ERR_OTHER;
                snprintf(c->errstr, sizeof(c->errstr), "%s", ((redisReply*)reply)->str);
                /* The message has been copied; release the reply object
                 * before the context (and its reader) goes away. */
                c->reader->fn->freeObject(reply);
                __redisAsyncDisconnect(ac);
                return;
            }

            /* No more regular callbacks and no errors: the context *must*
             * be subscribed or monitoring. */
            assert((c->flags & REDIS_SUBSCRIBED || c->flags & REDIS_MONITORING));
            if (c->flags & REDIS_SUBSCRIBED)
                __redisGetSubscribeCallback(ac, reply, &cb);
        }

        if (cb.fn != NULL) {
            __redisRunCallback(ac, &cb, reply);
            c->reader->fn->freeObject(reply);

            /* Proceed with freeing when redisAsyncFree() was called from
             * inside the callback. */
            if (c->flags & REDIS_FREEING) {
                __redisAsyncFree(ac);
                return;
            }
        } else {
            /* No callback for this reply: silently discard it. */
            c->reader->fn->freeObject(reply);
        }
    }

    /* Disconnect when there was an error reading the reply. */
    if (status != REDIS_OK)
        __redisAsyncDisconnect(ac);
}
/* Check whether the non-blocking connect has completed.
 *
 * Returns REDIS_OK when the socket reports no error (the context is then
 * flagged REDIS_CONNECTED and the connect callback is called with REDIS_OK)
 * or when the connect is still in progress (errno == EINPROGRESS; nothing
 * else happens). On any other socket error the connect callback is called
 * with REDIS_ERR, the context is disconnected and freed, and REDIS_ERR is
 * returned -- `ac` must not be used after that. */
static int __redisAsyncHandleConnect(redisAsyncContext *ac) {
    redisContext *c = &(ac->c);

    if (redisCheckSocketError(c, c->fd) != REDIS_ERR) {
        /* Mark context as connected. */
        c->flags |= REDIS_CONNECTED;
        if (ac->onConnect != NULL)
            ac->onConnect(ac, REDIS_OK);
        return REDIS_OK;
    }

    /* Try again later when connect(2) is still in progress. */
    if (errno == EINPROGRESS)
        return REDIS_OK;

    if (ac->onConnect != NULL)
        ac->onConnect(ac, REDIS_ERR);
    __redisAsyncDisconnect(ac);
    return REDIS_ERR;
}
/* Event-loop entry point for a readable socket.
 *
 * If the context is not yet flagged REDIS_CONNECTED, the connect state is
 * resolved first; the function returns early when that fails or the
 * connection is still pending. Otherwise data is read into the context: a
 * read error disconnects, a successful read re-triggers the ev.addRead hook
 * and dispatches any complete replies. */
void redisAsyncHandleRead(redisAsyncContext *ac) {
    redisContext *c = &(ac->c);

    /* The short-circuit matters: after a failed connect check `ac` has been
     * freed, so the flags must not be read again in that case. */
    if (!(c->flags & REDIS_CONNECTED) &&
        (__redisAsyncHandleConnect(ac) != REDIS_OK || !(c->flags & REDIS_CONNECTED)))
    {
        return;
    }

    if (redisBufferRead(c) == REDIS_ERR) {
        __redisAsyncDisconnect(ac);
        return;
    }

    /* Always re-schedule reads. */
    _EL_ADD_READ(ac);
    redisProcessCallbacks(ac);
}
/* Event-loop entry point for a writable socket.
 *
 * If the context is not yet flagged REDIS_CONNECTED, the connect state is
 * resolved first; the function returns early when that fails or the
 * connection is still pending. Otherwise buffered output is written: a
 * write error disconnects; on success the ev.addWrite hook is triggered
 * while redisBufferWrite() reports `done` as zero and ev.delWrite once it
 * reports non-zero, and the ev.addRead hook is always triggered. */
void redisAsyncHandleWrite(redisAsyncContext *ac) {
    redisContext *c = &(ac->c);
    int done = 0;

    /* The short-circuit matters: after a failed connect check `ac` has been
     * freed, so the flags must not be read again in that case. */
    if (!(c->flags & REDIS_CONNECTED) &&
        (__redisAsyncHandleConnect(ac) != REDIS_OK || !(c->flags & REDIS_CONNECTED)))
    {
        return;
    }

    if (redisBufferWrite(c, &done) == REDIS_ERR) {
        __redisAsyncDisconnect(ac);
        return;
    }

    /* Continue writing when not done, stop writing otherwise. */
    if (done) {
        _EL_DEL_WRITE(ac);
    } else {
        _EL_ADD_WRITE(ac);
    }

    /* Always schedule reads after writes. */
    _EL_ADD_READ(ac);
}
/* Locate the next bulk argument ("$<len>\r\n<data>\r\n") in a formatted
 * command, starting the search at `start`.
 *
 * On success `*str` points at the first data byte, `*len` holds the parsed
 * length, and the return value points just past the argument's trailing
 * "\r\n". Returns NULL (leaving `*str` and `*len` untouched) when no '$'
 * is found at or after `start`. A '$' header without a following '\r'
 * trips an assert. */
static char *nextArgument(char *start, char **str, size_t *len) {
    char *header = (start[0] == '$') ? start : strchr(start, '$');
    char *eol;
    char *data;

    if (header == NULL)
        return NULL;

    /* The decimal length follows the '$' marker. */
    *len = (int)strtol(header + 1, NULL, 10);

    eol = strchr(header, '\r');
    assert(eol);

    /* Skip the "\r\n" that terminates the length header. */
    data = eol + 2;
    *str = data;

    /* Skip the payload and its own trailing "\r\n". */
    return data + (*len) + 2;
}
/* Queue an already-formatted command on the async context and register the
 * callback that will receive its reply.
 *
 * `cmd`/`len` is the formatted command buffer; `fn`/`privdata` form the
 * callback. The command name (first bulk argument) decides the bookkeeping:
 *   - (p)subscribe with at least one argument: flag the context
 *     REDIS_SUBSCRIBED and register the callback under every channel or
 *     pattern name given;
 *   - (p)unsubscribe: rejected with REDIS_ERR unless currently subscribed;
 *     no callback is queued;
 *   - monitor: flag the context REDIS_MONITORING and queue the callback;
 *   - anything else: queue the callback on sub.invalid while subscribed,
 *     on the regular reply list otherwise.
 * The command is then appended to the context and the ev.addWrite hook is
 * triggered.
 *
 * Returns REDIS_ERR when the context is disconnecting or being freed, or
 * for an unsubscribe while not subscribed; REDIS_OK otherwise. */
static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, char *cmd, size_t len) {
    redisContext *c = &(ac->c);
    redisCallback cb;
    dict *cbdict;
    int pvariant, hasnext;
    char *cstr, *astr;
    size_t clen, alen;
    char *p;
    sds sname;

    /* Don't accept new commands when the connection is about to be closed. */
    if (c->flags & (REDIS_DISCONNECTING | REDIS_FREEING))
        return REDIS_ERR;

    /* Set up the callback; zero it first so no field is copied around in an
     * indeterminate state. */
    memset(&cb, 0, sizeof(cb));
    cb.fn = fn;
    cb.privdata = privdata;

    /* Find out which command will be appended. */
    p = nextArgument(cmd, &cstr, &clen);
    assert(p != NULL);
    hasnext = (p[0] == '$');

    /* <ctype.h> functions need an unsigned char value. */
    pvariant = (tolower((unsigned char)cstr[0]) == 'p') ? 1 : 0;
    cstr += pvariant;
    clen -= pvariant;

    if (hasnext && strncasecmp(cstr, "subscribe\r\n", 11) == 0) {
        c->flags |= REDIS_SUBSCRIBED;
        cbdict = pvariant ? ac->sub.patterns : ac->sub.channels;

        /* Add every channel/pattern. */
        while ((p = nextArgument(p, &astr, &alen)) != NULL) {
            sname = sdsnewlen(astr, alen);

            /* dictReplace() is expected to return 0 when the name was
             * already present and only the value was updated; the
             * dictionary then keeps its original key and never takes
             * ownership of `sname`, so release it here. */
            if (dictReplace(cbdict, sname, &cb) == 0)
                sdsfree(sname);
        }
    } else if (strncasecmp(cstr, "unsubscribe\r\n", 13) == 0) {
        /* It is only useful to call (P)UNSUBSCRIBE when the context is
         * subscribed to one or more channels or patterns. */
        if (!(c->flags & REDIS_SUBSCRIBED))
            return REDIS_ERR;

        /* (P)UNSUBSCRIBE does not have its own response: every channel or
         * pattern that is unsubscribed will receive a message. */
    } else if (strncasecmp(cstr, "monitor\r\n", 9) == 0) {
        /* Set monitor flag and push callback. */
        c->flags |= REDIS_MONITORING;
        __redisPushCallback(&ac->replies, &cb);
    } else {
        if (c->flags & REDIS_SUBSCRIBED)
            /* This will likely result in an error reply, but it needs to be
             * received and passed to the callback. */
            __redisPushCallback(&ac->sub.invalid, &cb);
        else
            __redisPushCallback(&ac->replies, &cb);
    }

    __redisAppendCommand(c, cmd, len);

    /* Always schedule a write when the write buffer is non-empty. */
    _EL_ADD_WRITE(ac);

    return REDIS_OK;
}
/* Format a command from a printf-style `format` and `ap`, and queue it on
 * the async context with `fn`/`privdata` as reply callback.
 *
 * Returns the status of queueing the command, or REDIS_ERR when the
 * formatter reports a negative length (which must not be passed on as a
 * size_t byte count). */
int redisvAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *format, va_list ap) {
    char *cmd;
    int len;
    int status;

    len = redisvFormatCommand(&cmd, format, ap);

    /* NOTE(review): assumes no buffer is handed back in `cmd` when the
     * formatter fails -- confirm against redisvFormatCommand(). */
    if (len < 0)
        return REDIS_ERR;

    status = __redisAsyncCommand(ac, fn, privdata, cmd, len);
    free(cmd);
    return status;
}
/* Variadic front end for redisvAsyncCommand(): formats a command from
 * `format` and the trailing arguments and queues it on the async context
 * with `fn`/`privdata` as reply callback. Returns that function's status. */
int redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *format, ...) {
    va_list args;
    int rc;

    va_start(args, format);
    rc = redisvAsyncCommand(ac, fn, privdata, format, args);
    va_end(args);

    return rc;
}
/* Format a command from an argument vector (`argc` entries in `argv`, with
 * lengths supplied through `argvlen`) and queue it on the async context
 * with `fn`/`privdata` as reply callback.
 *
 * Returns the status of queueing the command, or REDIS_ERR when the
 * formatter reports a negative length (which must not be passed on as a
 * size_t byte count). */
int redisAsyncCommandArgv(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, int argc, const char **argv, const size_t *argvlen) {
    char *cmd;
    int len;
    int status;

    len = redisFormatCommandArgv(&cmd, argc, argv, argvlen);

    /* NOTE(review): assumes no buffer is handed back in `cmd` when the
     * formatter fails -- confirm against redisFormatCommandArgv(). */
    if (len < 0)
        return REDIS_ERR;

    status = __redisAsyncCommand(ac, fn, privdata, cmd, len);
    free(cmd);
    return status;
}