#include "EXTERN.h"
#include "perl.h"
#include "XSUB.h"
#include "perl-couchbase.h"
void plcb_evloop_wait_unref(PLCB_t *obj) {
if (obj->wait_for_kv == 0 && obj->wait_for_views == 0) {
lcb_breakout(obj->instance);
}
}
static int
chain_endure(PLCB_t *obj, AV *resobj, const lcb_RESPSTORE *resp)
{
lcb_MULTICMD_CTX *mctx = NULL;
lcb_CMDENDURE dcmd = { 0 };
lcb_durability_opts_t dopts = { 0 };
char persist_to = 0, replicate_to = 0;
lcb_error_t err = LCB_SUCCESS;
SV *optsv;
optsv = *av_fetch(resobj, PLCB_RETIDX_OPTIONS, 1);
if (!SvIOK(optsv)) {
return 0;
}
PLCB_GETDURABILITY(SvUVX(optsv), persist_to, replicate_to);
if (persist_to == 0 && replicate_to == 0) {
return 0;
}
if (persist_to < 0 || replicate_to < 0) {
dopts.v.v0.cap_max = 1;
}
dopts.v.v0.persist_to = persist_to;
dopts.v.v0.replicate_to = replicate_to;
LCB_CMD_SET_KEY(&dcmd, resp->key, resp->nkey);
dcmd.cas = resp->cas;
mctx = lcb_endure3_ctxnew(obj->instance, &dopts, &err);
if (mctx == NULL) {
plcb_doc_set_err(obj, resobj, err);
return 0;
}
err = mctx->addcmd(mctx, (lcb_CMDBASE *)&dcmd);
if (err != LCB_SUCCESS) {
mctx->fail(mctx);
return 0;
}
lcb_sched_enter(obj->instance);
err = mctx->done(mctx, resp->cookie);
if (err != LCB_SUCCESS) {
plcb_doc_set_err(obj, resobj, err);
return 0;
}
lcb_sched_leave(obj->instance);
return 1;
}
static void
call_helper(AV *resobj, int cbtype, const lcb_RESPBASE *resp)
{
dSP;
const char *methname;
ENTER;
SAVETMPS;
PUSHMARK(SP);
XPUSHs(sv_2mortal(newRV_inc((SV*)resobj)));
if (cbtype == LCB_CALLBACK_STATS) {
const lcb_RESPSTATS *sresp = (const void *)resp;
/** Call as statshelper($doc,$server,$key,$value); */
XPUSHs(sv_2mortal(newSVpv(sresp->server, 0)));
XPUSHs(sv_2mortal(newSVpvn(sresp->key, sresp->nkey)));
if (sresp->value) {
XPUSHs(sv_2mortal(newSVpvn(sresp->value, sresp->nvalue)));
}
methname = PLCB_STATS_PLHELPER;
} else if (cbtype == LCB_CALLBACK_OBSERVE) {
const lcb_RESPOBSERVE *oresp = (const void *)resp;
/** Call as obshelper($doc,$status,$cas,$ismaster) */
XPUSHs(sv_2mortal(newSVuv(oresp->status)));
XPUSHs(sv_2mortal(plcb_sv_from_u64_new(&oresp->cas)));
XPUSHs(oresp->ismaster ? &PL_sv_yes : &PL_sv_no);
methname = PLCB_OBS_PLHELPER;
} else {
return;
}
PUTBACK;
call_pv(methname, G_DISCARD|G_EVAL);
SPAGAIN;
if (SvTRUE(ERRSV)) {
warn("Got error in %s: %s", methname, SvPV_nolen(ERRSV));
}
FREETMPS;
LEAVE;
}
static void
call_async(plcb_OPCTX *ctx, AV *resobj)
{
SV *cv = ctx->u.callback;
dSP;
if (cv == NULL || SvOK(cv) == 0) {
warn("Context does not have a callback (%p)!", cv);
return;
}
if ((ctx->flags & PLCB_OPCTXf_IMPLICIT) == 0) {
if (ctx->nremaining && (ctx->flags & PLCB_OPCTXf_CALLEACH) == 0) {
return; /* Still have ops. Only call once they're all complete */
}
}
ENTER;
SAVETMPS;
PUSHMARK(SP);
XPUSHs(sv_2mortal(newRV_inc((SV*)resobj)));
PUTBACK;
call_sv(cv, G_DISCARD);
FREETMPS;
LEAVE;
if (ctx->nremaining == 0 && (ctx->flags & PLCB_OPCTXf_CALLDONE)) {
ENTER;
SAVETMPS;
PUSHMARK(SP);
call_sv(cv, G_DISCARD);
FREETMPS;
LEAVE;
}
}
/* This callback is only ever called for single operation, single key results */
static void
callback_common(lcb_t instance, int cbtype, const lcb_RESPBASE *resp)
{
AV *resobj = NULL;
PLCB_t *parent;
SV *ctxrv = (SV *)resp->cookie;
plcb_OPCTX *ctx = NUM2PTR(plcb_OPCTX*, SvIVX(SvRV(ctxrv)));
if (cbtype == LCB_CALLBACK_STATS || cbtype == LCB_CALLBACK_OBSERVE ||
cbtype == LCB_CALLBACK_HTTP) {
HE *tmp;
hv_iterinit(ctx->docs);
tmp = hv_iternext(ctx->docs);
if (tmp && HeVAL(tmp) && SvROK(HeVAL(tmp))) {
resobj = (AV *)SvRV(HeVAL(tmp));
}
} else {
SV **tmp = hv_fetch(ctx->docs, resp->key, resp->nkey, 0);
if (tmp && SvROK(*tmp)) {
resobj = (AV*)SvRV(*tmp);
}
}
if (!resobj) {
warn("Couldn't find matching object!");
return;
}
parent = (PLCB_t *)lcb_get_cookie(instance);
plcb_doc_set_err(parent, resobj, resp->rc);
switch (cbtype) {
case LCB_CALLBACK_GET: {
const lcb_RESPGET *gresp = (const lcb_RESPGET *)resp;
if (resp->rc == LCB_SUCCESS) {
SV *newval = plcb_convert_retrieval(parent,
resobj, gresp->value, gresp->nvalue, gresp->itmflags);
av_store(resobj, PLCB_RETIDX_VALUE, newval);
plcb_doc_set_cas(parent, resobj, &resp->cas);
}
break;
}
case LCB_CALLBACK_TOUCH:
case LCB_CALLBACK_REMOVE:
case LCB_CALLBACK_UNLOCK:
case LCB_CALLBACK_STORE:
case LCB_CALLBACK_ENDURE:
if (resp->cas && resp->rc == LCB_SUCCESS) {
plcb_doc_set_cas(parent, resobj, &resp->cas);
}
if (cbtype == LCB_CALLBACK_STORE && resp->rc == LCB_SUCCESS &&
chain_endure(parent, resobj, (const lcb_RESPSTORE *)resp)) {
return; /* Will be handled already */
}
break;
case LCB_CALLBACK_COUNTER: {
const lcb_RESPCOUNTER *cresp = (const lcb_RESPCOUNTER*)resp;
plcb_doc_set_numval(parent, resobj, cresp->value, resp->cas);
break;
}
case LCB_CALLBACK_STATS: {
const lcb_RESPSTATS *sresp = (const void *)resp;
if (sresp->server) {
call_helper(resobj, cbtype, (const lcb_RESPBASE *)sresp);
return;
}
break;
}
case LCB_CALLBACK_OBSERVE: {
const lcb_RESPOBSERVE *oresp = (const lcb_RESPOBSERVE*)resp;
if (oresp->nkey) {
call_helper(resobj, cbtype, (const lcb_RESPBASE*)oresp);
return;
}
break;
}
case LCB_CALLBACK_HTTP: {
const lcb_RESPHTTP *htresp = (const lcb_RESPHTTP *)resp;
/* Store the CAS */
av_store(resobj, PLCB_HTIDX_STATUS, newSViv(htresp->htstatus));
if (htresp->headers) {
const char * const * hdr_cur;
HV *headers = newHV();
av_store(resobj, PLCB_HTIDX_HEADERS, newRV_noinc((SV*)headers));
for (hdr_cur = htresp->headers; *hdr_cur; hdr_cur += 2) {
const char *hdrkey = hdr_cur[0];
const char *hdrval = hdr_cur[1];
SV *valsv = newSVpv(hdrval, 0);
hv_store(headers, hdrkey, 0, valsv, 0);
}
}
if (htresp->nbody) {
lcb_U32 fmtflags = PLCB_CF_JSON;
SV *body;
SV *fmtspec = *av_fetch(resobj, PLCB_RETIDX_VALUE, 1);
if (SvIOK(fmtspec)) {
fmtflags = SvUV(fmtspec);
}
body = plcb_convert_retrieval_ex(parent, resobj,
htresp->body, htresp->nbody, fmtflags,
PLCB_CONVERT_NOCUSTOM);
av_store(resobj, PLCB_RETIDX_VALUE, body);
}
break;
}
default:
abort();
break;
}
ctx->nremaining--;
if (parent->async) {
call_async(ctx, resobj);
} else if (ctx->flags & PLCB_OPCTXf_WAITONE) {
av_push(ctx->u.ctxqueue, newRV_inc( (SV* )resobj));
plcb_kv_waitdone(parent);
}
if (!ctx->nremaining) {
SvREFCNT_dec(ctxrv);
plcb_kv_waitdone(parent);
plcb_opctx_clear(parent);
}
}
static void
bootstrap_callback(lcb_t instance, lcb_error_t status)
{
dSP;
PLCB_t *obj = (PLCB_t*) lcb_get_cookie(instance);
if (!obj->async) {
return;
}
if (!obj->conncb) {
warn("Object %p does not have a connect callback!", obj);
return;
}
printf("Invoking callback for connect..!\n");
ENTER;SAVETMPS;PUSHMARK(SP);
XPUSHs(sv_2mortal(newRV_inc(obj->selfobj)));
XPUSHs(sv_2mortal(newSViv(status)));
PUTBACK;
call_sv(obj->conncb, G_DISCARD);
SPAGAIN;
FREETMPS;LEAVE;
SvREFCNT_dec(obj->conncb); obj->conncb = NULL;
}
void
plcb_callbacks_setup(PLCB_t *object)
{
lcb_t o = object->instance;
lcb_install_callback3(o, LCB_CALLBACK_GET, callback_common);
lcb_install_callback3(o, LCB_CALLBACK_GETREPLICA, callback_common);
lcb_install_callback3(o, LCB_CALLBACK_STORE, callback_common);
lcb_install_callback3(o, LCB_CALLBACK_TOUCH, callback_common);
lcb_install_callback3(o, LCB_CALLBACK_REMOVE, callback_common);
lcb_install_callback3(o, LCB_CALLBACK_COUNTER, callback_common);
lcb_install_callback3(o, LCB_CALLBACK_UNLOCK, callback_common);
lcb_install_callback3(o, LCB_CALLBACK_ENDURE, callback_common);
lcb_install_callback3(o, LCB_CALLBACK_STATS, callback_common);
lcb_install_callback3(o, LCB_CALLBACK_OBSERVE, callback_common);
lcb_install_callback3(o, LCB_CALLBACK_HTTP, callback_common);
lcb_set_bootstrap_callback(o, bootstrap_callback);
}