#include "perl-couchbase.h"
SV *
plcb_opctx_new(PLCB_t *parent, int flags)
{
plcb_OPCTX *ctx;
SV *blessed;
if (parent->curctx) {
ctx = NUM2PTR(plcb_OPCTX*,SvIVX(SvRV(parent->curctx)));
if (ctx->nremaining == 0) {
plcb_opctx_clear(parent);
} else {
die("Existing context found. Existing context must be waited for or cleared");
}
}
if (parent->cachectx && (flags & PLCB_OPCTXf_IMPLICIT)) {
blessed = parent->cachectx;
parent->cachectx = NULL;
ctx = NUM2PTR(plcb_OPCTX*,SvIVX(SvRV(blessed)));
} else {
Newxz(ctx, 1, plcb_OPCTX);
ctx->docs = newHV();
ctx->parent = newRV_inc(parent->selfobj);
sv_rvweaken(ctx->parent);
blessed = newRV_noinc(newSViv(PTR2IV(ctx)));
sv_bless(blessed, parent->opctx_sync_stash);
}
ctx->flags = flags;
ctx->nremaining = 0;
parent->curctx = blessed;
SvREFCNT_inc(parent->curctx);
lcb_sched_enter(parent->instance);
return blessed;
}
void
plcb_opctx_clear(PLCB_t *parent)
{
plcb_OPCTX *ctx;
if (!parent->curctx) {
return;
}
if (!SvROK(parent->curctx)) {
SvREFCNT_dec(parent->curctx);
parent->curctx = NULL;
return;
}
ctx = NUM2PTR(plcb_OPCTX*,SvIVX(SvRV(parent->curctx)));
hv_clear(ctx->docs);
if (ctx->multi) {
ctx->multi->fail(ctx->multi);
ctx->multi = NULL;
}
if ((ctx->flags & PLCB_OPCTXf_IMPLICIT) && parent->cachectx == NULL) {
parent->cachectx = parent->curctx;
} else {
SvREFCNT_dec(parent->curctx);
}
parent->curctx = NULL;
}
void
plcb_opctx_initop(plcb_SINGLEOP *so, PLCB_t *parent, SV *doc, SV *ctx, SV *options)
{
if (!plcb_doc_isa(parent, doc)) {
die("Must pass a " PLCB_RET_CLASSNAME);
}
so->docrv = doc;
so->docav = (AV *)SvRV(doc);
so->opctx = ctx;
so->parent = parent;
plcb_doc_set_err(parent, so->docav, -1);
/* Extract options for this command */
if (options && SvTYPE(options) != SVt_NULL) {
if (SvROK(options) == 0 || SvTYPE(SvRV(options)) != SVt_PVHV) {
die("options must be undef or a HASH reference");
}
so->cmdopts = options;
}
if (ctx && SvTYPE(ctx) != SVt_NULL) {
if (SvRV(so->opctx) != SvRV(parent->curctx)) {
die("Got a different context than current!");
}
so->opctx = parent->curctx;
} else {
so->opctx = plcb_opctx_new(parent, PLCB_OPCTXf_IMPLICIT);
/* If we get an error, don't leave the pointer dangling */
SAVEFREESV(so->opctx);
}
so->cookie = so->opctx;
so->ctxptr = NUM2PTR(plcb_OPCTX*, SvIVX(SvRV(so->opctx)));
}
SV *
plcb_opctx_return(plcb_SINGLEOP *so, lcb_error_t err)
{
/* Figure out what type of context we are */
int haserr = 0;
SV *retval;
SV *ksv;
HE *ent;
plcb_OPCTX *ctx = NUM2PTR(plcb_OPCTX*, SvIVX(SvRV(so->opctx)));
if (err != LCB_SUCCESS) {
plcb_doc_set_err(so->parent, so->docav, err);
if (ctx->flags & PLCB_OPCTXf_IMPLICIT) {
lcb_sched_fail(so->parent->instance);
}
warn("Couldn't schedule operation. Code 0x%x (%s)\n", err, lcb_strerror(NULL, err));
haserr = 1;
goto GT_RET;
}
/* Get the key */
if (so->cmdbase == PLCB_CMD_STATS || so->cmdbase == PLCB_CMD_OBSERVE ||
so->cmdbase == PLCB_CMD_HTTP) {
ksv = &PL_sv_yes;
} else {
ksv = *av_fetch(so->docav, PLCB_RETIDX_KEY, 1);
}
ent = hv_fetch_ent(ctx->docs, ksv, 1, 0);
if (SvOK(HeVAL(ent))) {
die("Found duplicate item inside batch context");
} else {
SvREFCNT_dec(HeVAL(ent));
HeVAL(ent) = newRV_inc((SV*)so->docav);
}
/* Increment remaining count on the context */
ctx->nremaining++;
if (ctx->flags & PLCB_OPCTXf_IMPLICIT) {
SvREFCNT_inc(so->opctx); /* Undo SAVEFREESV */
lcb_sched_leave(so->parent->instance);
if (so->parent->async) {
/* Clear this context right now */
SvREFCNT_dec(so->parent->curctx);
so->parent->curctx = NULL;
goto GT_RET;
}
plcb_kv_wait(so->parent);
/* See if we have an error */
if (plcb_doc_get_err(so->docav) != LCB_SUCCESS) {
haserr = 1;
}
}
GT_RET:
if (haserr) {
retval = &PL_sv_no;
} else if (so->parent->async || (ctx->flags & PLCB_OPCTXf_IMPLICIT) == 0) {
retval = so->opctx;
} else {
retval = &PL_sv_yes;
}
SvREFCNT_inc(retval);
return retval;
}
void
plcb_opctx_submit(PLCB_t *parent, plcb_OPCTX *ctx)
{
lcb_error_t err = LCB_SUCCESS;
if (ctx->multi) {
err = ctx->multi->done(ctx->multi, parent->curctx);
ctx->multi = NULL;
if (err != LCB_SUCCESS) {
die("Couldn't submit multi context: Code=0x%x", err);
}
}
lcb_sched_leave(parent->instance);
}