#include "perl-couchbase.h"
#include "plcb-util.h"
#include <libcouchbase/vbucket.h>

static int PLCB_connect(PLCB_t* self);

void plcb_cleanup(PLCB_t *object)
{
    plcb_opctx_clear(object);
    SvREFCNT_dec(object->cachectx);

    if (object->instance) {
        lcb_destroy(object->instance);
        object->instance = NULL;
    }

    #define _free_cv(fld) if (object->fld) { SvREFCNT_dec(object->fld); object->fld = NULL; }
    _free_cv(cv_serialize); _free_cv(cv_deserialize);
    _free_cv(cv_jsonenc); _free_cv(cv_jsondec);
    _free_cv(cv_customenc); _free_cv(cv_customdec);
    #undef _free_cv
}

/*Construct a new libcouchbase object*/
static SV *
PLCB_construct(const char *pkg, HV *hvopts)
{
    lcb_t instance;
    lcb_error_t err;
    struct lcb_create_st cr_opts = { 0 };

    SV *blessed_obj;
    SV *iops_impl = NULL;
    SV *conncb = NULL;

    PLCB_t *object;
    plcb_OPTION options[] = {
        PLCB_KWARG("connstr", CSTRING, &cr_opts.v.v3.connstr),
        PLCB_KWARG("password", CSTRING, &cr_opts.v.v3.passwd),
        PLCB_KWARG("io", SV, &iops_impl),
        PLCB_KWARG("on_connect", CV, &conncb),
        { NULL }
    };

    cr_opts.version = 3;
    plcb_extract_args((SV*)hvopts, options);

    if (iops_impl && SvTYPE(iops_impl) != SVt_NULL) {
        plcb_IOPROCS *ioprocs;
        /* Validate */
        if (!sv_derived_from(iops_impl, PLCB_IOPROCS_CLASS)) {
            die("io must be a valid " PLCB_IOPROCS_CLASS);
        }
        if (!conncb) {
            die("Connection callback must be specified in async mode");
        }
        ioprocs = NUM2PTR(plcb_IOPROCS* , SvIV(SvRV(iops_impl)));
        cr_opts.v.v3.io = ioprocs->iops_ptr;
    }

    err = lcb_create(&instance, &cr_opts);

    if (!instance) {
        die("Failed to create instance: %s", lcb_strerror(NULL, err));
    }

    Newxz(object, 1, PLCB_t);
    lcb_set_cookie(instance, object);
    object->instance = instance;

    if (iops_impl) {
        object->ioprocs = newRV_inc(SvRV(iops_impl));
        object->conncb = newRV_inc(SvRV(conncb));
        object->async = 1;
    }

    plcb_callbacks_setup(object);

    #define get_stash_assert(stashname, target) \
        if (! (object->target = gv_stashpv(stashname, 0)) ) { \
            die("Couldn't load '%s'", stashname); \
        }

    get_stash_assert(PLCB_RET_CLASSNAME, ret_stash);
    get_stash_assert(PLCB_OPCTX_CLASSNAME, opctx_sync_stash);
    get_stash_assert(PLCB_VIEWHANDLE_CLASS, view_stash);
    get_stash_assert(PLCB_N1QLHANDLE_CLASS, n1ql_stash);

    blessed_obj = newSV(0);
    sv_setiv(newSVrv(blessed_obj, PLCB_BKT_CLASSNAME), PTR2IV(object));

    object->selfobj = SvRV(blessed_obj);
    return blessed_obj;
}

static int
PLCB_connect(PLCB_t *object)
{
    lcb_error_t err;
    lcb_t instance = object->instance;

    if (object->connected) {
        warn("Already connected");
        return 1;

    } else {
        if ((err = lcb_connect(instance)) != LCB_SUCCESS) {
            goto GT_ERR;
        }
        if (object->async) {
            return 0;
        }

        lcb_wait(instance);
        if ((err = lcb_get_bootstrap_status(instance)) != LCB_SUCCESS) {
            goto GT_ERR;
        }
        object->connected = 1;
        return 1;
    }

    GT_ERR:
    die("Couldn't connect: 0x%x (%s)", err, lcb_strerror(NULL, err));
    return 0;
}

static void
get_converter_pointers(PLCB_t *object, int type, SV ***cv_encode, SV ***cv_decode)
{
    if (type == PLCB_CONVERTERS_CUSTOM) {
        *cv_encode = &object->cv_customenc;
        *cv_decode = &object->cv_customdec;
    } else if (type == PLCB_CONVERTERS_JSON) {
        *cv_encode = &object->cv_jsonenc;
        *cv_decode = &object->cv_jsondec;
    } else if (type == PLCB_CONVERTERS_STORABLE) {
        *cv_encode = &object->cv_serialize;
        *cv_decode = &object->cv_deserialize;
    } else {
        die("Unrecognized converter type %d", type);
    }
}


/* lcb_cntl() APIs */
static void
PLCB__cntl_set(PLCB_t *object, int setting, int type, SV *value)
{
    lcb_error_t err;
    void *p = NULL;
    union {
        float floatval;
        int intval;
        unsigned uintval;
        size_t sizeval;
        uint32_t u32val;
    } u;
    p = &u;

    if (!SvOK(value)) {
        if (SvTYPE(value) == SVt_PVMG && SvREFCNT(value) == 1) {
            /* Workaround for local $cb->settings->{foo} with older perls */
            return;
        }
        die("Passed empty value");
    }

    if (type == PLCB_SETTING_INT) {
        u.intval = SvIV(value);
    } else if (type == PLCB_SETTING_UINT) {
        u.uintval = SvUV(value);
    } else if (type == PLCB_SETTING_U32) {
        u.u32val = SvUV(value);
    } else if (type == PLCB_SETTING_SIZE) {
        u.sizeval = SvUV(value);
    } else if (type == PLCB_SETTING_TIMEOUT) {
        u.u32val = SvNV(value) * 1000000;
    } else if (type == PLCB_SETTING_STRING) {
        p = SvPV_nolen(value);
    } else {
        die("Unrecognized type code %d", type);
    }
    err = lcb_cntl(object->instance, LCB_CNTL_SET, setting, p);
    if (err != LCB_SUCCESS) {
        warn("Failed to set setting=%d, type=%d", setting, type);
    }
}

static SV *
PLCB__cntl_get(PLCB_t *object, int setting, int type)
{
    lcb_error_t err;
    union {
        float floatval;
        int intval;
        unsigned uintval;
        size_t sizeval;
        uint32_t u32val;
        const char *strval;
    } u;

    memset(&u, 0, sizeof u);

    err = lcb_cntl(object->instance, LCB_CNTL_GET, setting, &u);
    if (err != LCB_SUCCESS) {
        warn("Couldn't get setting=%d, type=%d: %s", setting, type, lcb_strerror(NULL, err));
        SvREFCNT_inc(&PL_sv_undef);
        return &PL_sv_undef;
    }

    if (type == PLCB_SETTING_INT) {
        return newSViv(u.intval);
    } else if (type == PLCB_SETTING_UINT) {
        return newSVuv(u.uintval);
    } else if (type == PLCB_SETTING_U32) {
        return newSVuv(u.u32val);
    } else if (type == PLCB_SETTING_SIZE) {
        return newSVuv(u.sizeval);
    } else if (type == PLCB_SETTING_TIMEOUT) {
        return newSVnv((float)u.u32val / 1000000.0);
    } else if (type == PLCB_SETTING_STRING) {
        return newSVpv(u.strval ? u.strval : "", 0);
    } else {
        die("Unknown type %d", type);
        return NULL;
    }
}

#define dPLCB_INPUTS \
    SV *options = &PL_sv_undef; SV *ctx = &PL_sv_undef;

#define FILL_EXTRA_PARAMS() \
    if (items > 4) { croak_xs_usage(cv, "bucket, doc [, options, ctx ]"); } \
    if (items >= 3) { options = ST(2); } \
    if (items >= 4) { ctx = ST(3); }

MODULE = Couchbase PACKAGE = Couchbase::Bucket    PREFIX = PLCB_

PROTOTYPES: DISABLE

SV *
PLCB_construct(const char *pkg, HV *options)

int
PLCB_connect(PLCB_t *object)

SV *
PLCB__codec_common(PLCB_t *object, int type, ...)
    ALIAS:
    _encoder = 1
    _decoder = 2

    PREINIT:
    SV **cv_encode = NULL, **cv_decode = NULL, **target = NULL;

    CODE:
    get_converter_pointers(object, type, &cv_encode, &cv_decode);
    target = ix == 1 ? cv_encode : cv_decode;

    if (items == 2) {
        if (*target) {
            RETVAL = newRV_inc(*target);
        } else {
            RETVAL = &PL_sv_undef; SvREFCNT_inc(&PL_sv_undef);
        }
    } else {
        SV *tmpsv = ST(2);
        SV *to_decref = *target;

        RETVAL = &PL_sv_undef;
        if (tmpsv != &PL_sv_undef) {
            if (SvROK(tmpsv) == 0 || SvTYPE(SvRV(tmpsv)) != SVt_PVCV) {
                die("Argument passed must be undef or CODE reference");
            }
            *target = SvRV(tmpsv);
            SvREFCNT_inc(*target);
        } else {
            *target = NULL;
        }
        SvREFCNT_dec(to_decref);
        SvREFCNT_inc(RETVAL);
    }
    OUTPUT: RETVAL

void
PLCB__cntl_set(PLCB_t *object, int setting, int type, SV *value)

SV *
PLCB__cntl_get(PLCB_t *object, int setting, int type)

void
PLCB_DESTROY(PLCB_t *object)
    CODE:
    plcb_cleanup(object);
    Safefree(object);

SV *
PLCB__get(PLCB_t *self, SV *doc, ...)
    ALIAS:
    get = PLCB_CMD_GET
    get_and_touch = PLCB_CMD_GAT
    get_and_lock = PLCB_CMD_LOCK
    touch = PLCB_CMD_TOUCH

    PREINIT:
    plcb_SINGLEOP opinfo = { ix };
    dPLCB_INPUTS

    CODE:
    FILL_EXTRA_PARAMS()

    plcb_opctx_initop(&opinfo, self, doc, ctx, options);
    RETVAL = PLCB_op_get(self, &opinfo);
    OUTPUT: RETVAL
    
SV *
PLCB__store(PLCB_t *self, SV *doc, ...)
    ALIAS:
    upsert = PLCB_CMD_SET
    insert = PLCB_CMD_ADD
    replace = PLCB_CMD_REPLACE
    append_bytes = PLCB_CMD_APPEND
    prepend_bytes = PLCB_CMD_PREPEND

    PREINIT:
    plcb_SINGLEOP opinfo = { 0 };
    dPLCB_INPUTS
    
    CODE:
    FILL_EXTRA_PARAMS()
    opinfo.cmdbase = ix;
    plcb_opctx_initop(&opinfo, self, doc, ctx, options);

    
    RETVAL = PLCB_op_set(self, &opinfo);
    OUTPUT: RETVAL
    
SV *
PLCB_remove(PLCB_t *self, SV *doc, ...)
    PREINIT:
    plcb_SINGLEOP opinfo = { PLCB_CMD_REMOVE };
    dPLCB_INPUTS

    CODE:
    FILL_EXTRA_PARAMS()
    plcb_opctx_initop(&opinfo, self, doc, ctx, options);
    
    RETVAL = PLCB_op_remove(self, &opinfo);
    OUTPUT: RETVAL


SV *
PLCB_unlock(PLCB_t *self, SV *doc, ...)
    PREINIT:
    plcb_SINGLEOP opinfo = { PLCB_CMD_UNLOCK };
    dPLCB_INPUTS

    CODE:
    FILL_EXTRA_PARAMS()
    plcb_opctx_initop(&opinfo, self, doc, ctx, options);
    RETVAL = PLCB_op_unlock(self, &opinfo);
    OUTPUT: RETVAL

SV *
PLCB_counter(PLCB_t *self, SV *doc, ...)
    PREINIT:
    plcb_SINGLEOP opinfo = { PLCB_CMD_COUNTER };
    dPLCB_INPUTS;
    CODE:
    FILL_EXTRA_PARAMS()
    plcb_opctx_initop(&opinfo, self, doc, ctx, options);
    RETVAL = PLCB_op_counter(self, &opinfo);
    OUTPUT: RETVAL


SV *
PLCB_endure(PLCB_t *self, SV *doc, ...)
    PREINIT:
    plcb_SINGLEOP opinfo = { PLCB_CMD_ENDURE };
    dPLCB_INPUTS;
    CODE:
    FILL_EXTRA_PARAMS()
    plcb_opctx_initop(&opinfo, self, doc, ctx, options);
    RETVAL = PLCB_op_endure(self, &opinfo);
    OUTPUT: RETVAL


SV *
PLCB__stats_common(PLCB_t *self, SV *doc, ...)
    ALIAS:
    _stats = PLCB_CMD_STATS
    _keystats = PLCB_CMD_KEYSTATS

    PREINIT:
    plcb_SINGLEOP opinfo = { ix };
    dPLCB_INPUTS

    CODE:
    FILL_EXTRA_PARAMS()
    plcb_opctx_initop(&opinfo, self, doc, ctx, options);
    RETVAL = PLCB_op_stats(self, &opinfo);
    OUTPUT: RETVAL


SV *
PLCB__observe(PLCB_t *self, SV *doc, ...)
    PREINIT:
    plcb_SINGLEOP opinfo = { PLCB_CMD_OBSERVE };
    dPLCB_INPUTS
    CODE:
    FILL_EXTRA_PARAMS()
    plcb_opctx_initop(&opinfo, self, doc, ctx, options);
    RETVAL = PLCB_op_observe(self, &opinfo);
    OUTPUT: RETVAL

SV *
PLCB__http(PLCB_t *self, SV *doc, ...)
    PREINIT:
    plcb_SINGLEOP opinfo = { PLCB_CMD_HTTP };
    dPLCB_INPUTS
    CODE:
    FILL_EXTRA_PARAMS()
    plcb_opctx_initop(&opinfo, self, doc, ctx, options);
    RETVAL = PLCB_op_http(self, &opinfo);
    OUTPUT: RETVAL

SV *
PLCB_cluster_nodes(PLCB_t *object)
    PREINIT:
    AV *retav;
    const char * const * server_nodes;

    CODE:
    server_nodes = lcb_get_server_list(object->instance);
    retav = newAV();
    RETVAL = newRV_noinc((SV*)retav);

    if (server_nodes) {
        const char * const *cur_node;
        for (cur_node = server_nodes; *cur_node; cur_node++) {
            av_push(retav, newSVpv(*cur_node, 0));
        }
    }

    OUTPUT: RETVAL
    
lcbvb_CONFIG *
PLCB_get_bucket_config(PLCB_t *object)
    PREINIT:
    lcbvb_CONFIG *orig, *cp;
    lcb_error_t err;
    char *tmpstr;

    CODE:
    err = lcb_cntl(object->instance, LCB_CNTL_GET, LCB_CNTL_VBCONFIG, &orig);
    if (err != LCB_SUCCESS) {
        die("Couldn't get config: %s", lcb_strerror(NULL, err));
    }
    if (orig == NULL) {
        die("Client does not have a config yet");
    }
    tmpstr = lcbvb_save_json(orig);
    if (!tmpstr) {
        die("Couldn't get JSON dump");
    }
    cp = lcbvb_create();
    if (!cp) {
        free(tmpstr);
        die("Couldn't allocate new config");
    }
    if (0 != lcbvb_load_json(cp, tmpstr)) {
        const char *err = lcbvb_get_error(cp);
        free(tmpstr);
        lcbvb_destroy(cp);
        die("Couldn't load new config: %s", err);
    }
    free(tmpstr);
    RETVAL = cp;
    OUTPUT: RETVAL

SV *
PLCB_batch(PLCB_t *object)
    PREINIT:
    SV *ctxrv = NULL;

    CODE:
    ctxrv = plcb_opctx_new(object, 0);
    RETVAL = newRV_inc(SvRV(ctxrv));

    lcb_sched_enter(object->instance);
    OUTPUT: RETVAL

SV *
PLCB_durability_batch(PLCB_t *object, SV *options)
    PREINIT:
    int persist_to = 0, replicate_to = 0, is_delete = 0;
    SV *ctxrv = NULL;
    plcb_OPCTX *opctx;
    lcb_error_t err;
    lcb_durability_opts_t dopts = { 0 };
    plcb_OPTION args[] = {
        PLCB_KWARG(PLCB_ARG_K_PERSIST, INT, &persist_to),
        PLCB_KWARG(PLCB_ARG_K_REPLICATE, INT, &replicate_to),
        PLCB_KWARG(PLCB_ARG_K_ISDELETE, BOOL, &is_delete),
        { NULL }
    };
    CODE:
    plcb_extract_args(options, args);
    ctxrv = plcb_opctx_new(object, 0);
    RETVAL = newRV_inc(SvRV(ctxrv));
    dopts.v.v0.persist_to = persist_to;
    dopts.v.v0.replicate_to = replicate_to;
    dopts.v.v0.check_delete = is_delete;
    if (replicate_to == -1 || persist_to == -1) {
        dopts.v.v0.cap_max = 1;
    }

    opctx = NUM2PTR(plcb_OPCTX*,SvIVX(SvRV(ctxrv)));
    assert(opctx->multi == NULL);
    opctx->multi = lcb_endure3_ctxnew(object->instance, &dopts, &err);
    if (!opctx->multi) {
        SvREFCNT_dec(RETVAL);
        die("Bad parameters for durability: 0x%x (%s)", err, lcb_strerror(NULL, err));
    }

    OUTPUT: RETVAL

void
PLCB__ctx_clear(PLCB_t *object)
    CODE:
    plcb_opctx_clear(object);


SV *
PLCB_user_data(PLCB_t *object, ...)
    PREINIT:
    CODE:
    if (items == 1) {
        RETVAL = object->udata;
    } else {
        SvREFCNT_dec(object->udata);
        object->udata = ST(1);
        SvREFCNT_inc(object->udata);
        RETVAL = &PL_sv_undef;
    }
    SvREFCNT_inc(RETVAL);
    OUTPUT: RETVAL

int
PLCB_connected(PLCB_t *object)
    CODE:
    RETVAL = object->connected;
    OUTPUT: RETVAL

MODULE = Couchbase PACKAGE = Couchbase::OpContext PREFIX = PLCB_ctx_

void
PLCB_ctx_wait_all(plcb_OPCTX *ctx)
    PREINIT:
    PLCB_t *parent;

    CODE:
    if (!parent) {
        die("Parent context is destroyed");
    }

    if (!parent->curctx) {
        die("Current context is not active");
    }

    if (!ctx->nremaining) {
        return;
    }
    /* Remove the 'wait_one' flag */
    ctx->flags &= ~PLCB_OPCTXf_WAITONE;
    plcb_opctx_submit(parent, ctx);
    lcb_wait3(parent->instance, LCB_WAIT_NOCHECK);


SV *
PLCB_ctx_wait_one(plcb_OPCTX *ctx)
    PREINIT:
    PLCB_t *parent;

    CODE:
    if (!parent) {
        die("Parent context is destroyed");
    }

    if (ctx->u.ctxqueue) {
        RETVAL = av_shift(ctx->u.ctxqueue);
        if (RETVAL != &PL_sv_undef) {
            goto GT_DONE;
        }
    }

    if (!ctx->nremaining) {
        RETVAL = &PL_sv_undef;
        SvREFCNT_inc(&PL_sv_undef);
        goto GT_DONE;
    }

    if (!ctx->u.ctxqueue) {
        ctx->u.ctxqueue = newAV();
    }

    ctx->flags |= PLCB_OPCTXf_WAITONE;
    plcb_opctx_submit(parent, ctx);
    lcb_wait3(parent->instance, LCB_WAIT_NOCHECK);
    RETVAL = av_shift(ctx->u.ctxqueue);

    GT_DONE: ;
    OUTPUT: RETVAL

SV *
PLCB_ctx__cbo(plcb_OPCTX *ctx)
    PREINIT:
    PLCB_t *parent;
    CODE:
    (void)parent;
    RETVAL = newRV_inc(SvRV(ctx->parent));
    OUTPUT: RETVAL

void
PLCB_ctx_set_callback(plcb_OPCTX *ctx, CV *cv)
    PREINIT:
    PLCB_t *parent;
    CODE:
    SvREFCNT_dec(ctx->u.callback);
    ctx->u.callback = newRV_inc((SV*)cv);

SV *
PLCB_ctx_get_callback(plcb_OPCTX *ctx)
    PREINIT:
    PLCB_t *parent;
    CODE:
    if (!ctx->u.callback) {
        RETVAL = &PL_sv_undef;
    } else {
        RETVAL = ctx->u.callback;
    }
    SvREFCNT_inc(RETVAL);
    OUTPUT: RETVAL


void
PLCB_ctx_DESTROY(plcb_OPCTX *ctx)
    PREINIT:
    PLCB_t *parent;
    CODE:

    SvREFCNT_dec(ctx->parent);
    SvREFCNT_dec(ctx->u.ctxqueue);
    SvREFCNT_dec(ctx->docs);
    Safefree(ctx);

MODULE = Couchbase PACKAGE = Couchbase    PREFIX = PLCB_

HV *
PLCB_lcb_version()
    PREINIT:
    lcb_U32 ivers;
    HV *ret;
    const char *tmp;

    CODE:
    ret = newHV();
    tmp = lcb_get_version(&ivers);
    
    (void)hv_stores(ret, "hex", newSVuv(ivers));
    (void)hv_stores(ret, "str", newSVpv(tmp, 0));
    if (lcb_cntl(NULL, LCB_CNTL_GET, LCB_CNTL_CHANGESET, &tmp) == LCB_SUCCESS) {
        (void)hv_stores(ret, "rev", newSVpv(tmp, 0));
    }
    
    RETVAL = ret;
    OUTPUT: RETVAL

IV
PLCB__get_errtype(int code)
    CODE:
    RETVAL = lcb_get_errtype(code);
    OUTPUT: RETVAL

SV *
PLCB_strerror(int code)
    PREINIT:
    const char *msg;
    unsigned len;
    CODE:
    msg = lcb_strerror(NULL, code);
    len = strlen(msg);
    RETVAL = newSVpvn_share(msg, len, 0);
    SvREADONLY_on(RETVAL);
    OUTPUT: RETVAL


SV *
PLCB__viewhandle_new(PLCB_t *obj, \
    const char *view, const char *design, const char *options, int flags)

void
PLCB__viewhandle_fetch(SV *vh)

void
PLCB__viewhandle_stop(SV *vh)

SV *
PLCB__n1qlhandle_new(PLCB_t *parent, lcb_N1QLPARAMS *params, const char *host)


BOOT:
/*XXX: DO NOT MODIFY WHITESPACE HERE. xsubpp is touchy*/
#define PLCB_BOOTSTRAP_DEPENDENCY(bootfunc) \
PUSHMARK(SP); \
mXPUSHs(newSVpv("Couchbase", sizeof("Couchbase")-1)); \
mXPUSHs(newSVpv(XS_VERSION, sizeof(XS_VERSION)-1)); \
PUTBACK; \
bootfunc(aTHX_ cv); \
SPAGAIN;
{
    plcb_define_constants();
    PLCB_BOOTSTRAP_DEPENDENCY(boot_Couchbase__BucketConfig);
    PLCB_BOOTSTRAP_DEPENDENCY(boot_Couchbase__IO);
    PLCB_BOOTSTRAP_DEPENDENCY(boot_Couchbase__N1QL__Params);
}
#undef PLCB_BOOTSTRAP_DEPENDENCY