#include "perl-couchbase.h"
#include <libcouchbase/views.h>
#include <libcouchbase/n1ql.h>

/*
 * Populate the slots shared by every row-based request (views and N1QL).
 *
 * The request array is sized to PLCB_VHIDX_MAX and receives:
 *   - ROWBUF / RAWROWS: references to two new, empty arrays;
 *   - PARENT: a new reference to the parent's selfobj;
 *   - SELFREF: a weakened reference back to the request itself, so the
 *     request does not keep itself alive through this slot.
 */
static void
rowreq_init_common(PLCB_t *parent, AV *req)
{
    AV *rowbuf = newAV();
    AV *rawrows = newAV();
    SV *weak_self;

    av_fill(req, PLCB_VHIDX_MAX);

    av_store(req, PLCB_VHIDX_ROWBUF, newRV_noinc((SV *)rowbuf));
    av_store(req, PLCB_VHIDX_RAWROWS, newRV_noinc((SV *)rawrows));
    av_store(req, PLCB_VHIDX_PARENT, newRV_inc(parent->selfobj));

    /* Weaken before storing so the self-reference is not a strong cycle */
    weak_self = newRV_inc((SV *)req);
    sv_rvweaken(weak_self);
    av_store(req, PLCB_VHIDX_SELFREF, weak_self);
}

/*
 * Recover the PLCB_t pointer for a request.
 *
 * Reads the reference stored in the PARENT slot, dereferences it, and
 * converts the referent's unsigned integer value back into a pointer.
 * The slot is expected to have been filled by rowreq_init_common().
 */
static PLCB_t *
parent_from_req(AV *req)
{
    SV *parent_rv = *av_fetch(req, PLCB_VHIDX_PARENT, 0);
    SV *ptr_holder = SvRV(parent_rv);

    return NUM2PTR(PLCB_t*, SvUV(ptr_holder));
}

/*
 * Invoke the request's private Perl callback (the PRIVCB slot).
 *
 * req     - the request array holding the callback
 * reqrv   - reference to the request; always passed as the first argument
 * rowsrv  - optional reference to the array of pending rows. When non-NULL
 *           it is passed as the second argument and the array it refers to
 *           is emptied once the callback returns.
 *
 * The callback runs under G_EVAL; an exception raised inside it is reported
 * with warn() rather than propagated. If no callback has been stored in the
 * PRIVCB slot, a warning is emitted and nothing is called (the pending rows
 * are left untouched).
 */
static void
invoke_row(AV *req, SV *reqrv, SV *rowsrv)
{
    /* av_fetch() returns NULL for a slot that was never stored to */
    SV **methp = av_fetch(req, PLCB_VHIDX_PRIVCB, 0);
    dSP;

    if (!methp) {
        /* Bail out before touching the Perl stack; nothing to unwind */
        warn("No row callback installed for request");
        return;
    }

    ENTER;
    SAVETMPS;
    PUSHMARK(SP);

    /* First arg */
    XPUSHs(reqrv);

    if (rowsrv) {
        XPUSHs(rowsrv);
    }

    PUTBACK;
    call_sv(*methp, G_DISCARD|G_EVAL);
    SPAGAIN;

    if (SvTRUE(ERRSV)) {
        warn("Got error in %s", SvPV_nolen(ERRSV));
    }

    /* Rows have been handed to the callback; drop them from the buffer */
    if (rowsrv) {
        av_clear((AV *)SvRV(rowsrv));
    }

    FREETMPS;
    LEAVE;
}

/*
 * Wrap a buffer:length pair as a new SV owned by the caller.
 *
 * s, n - the bytes to copy. The bytes are copied and the resulting SV is
 *        flagged as UTF-8 (the flag is set without validating the bytes).
 *
 * Returns a new string SV when s is non-NULL and n is non-zero; otherwise a
 * new, distinct undefined SV. In both cases the caller owns one reference.
 *
 * A fresh undef is returned rather than the shared &PL_sv_undef because the
 * result is stored into arrays and hashes, where &PL_sv_undef is treated
 * specially (non-existent or read-only elements, non-creatable hash values).
 */
static SV *
sv_from_rowdata(const char *s, size_t n)
{
    if (s && n) {
        SV *ret = newSVpvn(s, n);
        SvUTF8_on(ret);
        return ret;
    }

    /* newSVpvn() with a NULL buffer yields a new undefined SV */
    return newSVpvn(NULL, 0);
}

/*
 * Build the Perl representation of one view row.
 *
 * parent - owning handle, forwarded to the plcb_* document helpers
 * resp   - the row response from libcouchbase
 *
 * Returns a new reference to a hash with the entries "key", "value",
 * "geometry" and "id". When the response carries an attached document
 * response (resp->docresp), the hash also gets a "__doc__" entry: a
 * reference to an array holding the document ID, the status of the
 * document fetch and, if that fetch succeeded, the converted value and CAS.
 */
static SV*
make_views_row(PLCB_t *parent, const lcb_RESPVIEWQUERY *resp)
{
    HV *rowdata = newHV();
    SV *docid = sv_from_rowdata(resp->docid, resp->ndocid);

    /* Key, Value, Doc ID, Geo, Doc */
    hv_stores(rowdata, "key", sv_from_rowdata(resp->key, resp->nkey));
    hv_stores(rowdata, "value", sv_from_rowdata(resp->value, resp->nvalue));
    hv_stores(rowdata, "geometry", sv_from_rowdata(resp->geometry, resp->ngeometry));
    hv_stores(rowdata, "id", docid);

    if (resp->docresp) {
        const lcb_RESPGET *docresp = resp->docresp;
        AV *docav = newAV();

        hv_stores(rowdata, "__doc__", newRV_noinc((SV*)docav));

        /* The ID SV is shared with the "id" entry, hence the extra ref */
        av_store(docav, PLCB_RETIDX_KEY, SvREFCNT_inc(docid));

        /*
         * Record the status of the document fetch itself, not the status
         * of the row: a row can be delivered successfully while fetching
         * its document failed.
         */
        plcb_doc_set_err(parent, docav, docresp->rc);

        if (docresp->rc == LCB_SUCCESS) {
            SV *docval = plcb_convert_getresp(parent, docav, docresp);
            av_store(docav, PLCB_RETIDX_VALUE, docval);
            plcb_doc_set_cas(parent, docav, &docresp->cas);
        }
    }
    return newRV_noinc((SV *)rowdata);
}

/*
 * Build the Perl representation of one N1QL row: the raw row bytes wrapped
 * by sv_from_rowdata(). The caller owns the returned SV.
 */
static SV *
make_n1ql_row(const lcb_RESPN1QL *resp)
{
    SV *rowsv = sv_from_rowdata(resp->row, resp->nrow);

    return rowsv;
}

/*
 * Shared row handler for view and N1QL responses.
 *
 * obj      - library instance (unused here)
 * resp     - the response; its cookie is the request array
 * meta     - on the final response, the buffer stored in the META slot
 * nmeta    - length of meta
 * htresp   - optional HTTP response; supplies the HTCODE slot when present
 * is_n1ql  - non-zero to interpret resp as a N1QL response, zero for views
 *
 * Non-final responses are converted to a row and appended to the RAWROWS
 * array; the Perl callback is invoked once the highest index in that array
 * reaches 1. The final response flushes whatever is pending, records
 * completion state (ISDONE, RC, META, optionally HTCODE), invokes the
 * callback once more without rows, and releases the reference on the
 * request that was taken when the query was issued.
 */
static void
common_callback(lcb_t obj, const lcb_RESPBASE *resp,
    const char *meta, size_t nmeta, const lcb_RESPHTTP *htresp,
    int is_n1ql)
{
    AV *req = resp->cookie;
    SV *self_rv = *av_fetch(req, PLCB_VHIDX_SELFREF, 0);
    SV *pending_rv = *av_fetch(req, PLCB_VHIDX_RAWROWS, 0);
    AV *pending = (AV *)SvRV(pending_rv);
    PLCB_t *plobj = parent_from_req(req);

    plcb_views_waitdone(plobj);

    if (!(resp->rflags & LCB_RESP_F_FINAL)) {
        /* Ordinary row: buffer it, then hand the buffer to Perl if due */
        SV *row = is_n1ql
            ? make_n1ql_row((const lcb_RESPN1QL *)resp)
            : make_views_row(plobj, (const lcb_RESPVIEWQUERY *)resp);

        av_push(pending, row);
        if (av_len(pending) >= 1) {
            invoke_row(req, self_rv, pending_rv);
        }
        return;
    }

    /* Final response: the library handle is no longer usable */
    av_store(req, PLCB_VHIDX_VHANDLE, SvREFCNT_inc(&PL_sv_undef));

    /* Flush any remaining rows.. */
    invoke_row(req, self_rv, pending_rv);

    av_store(req, PLCB_VHIDX_ISDONE, SvREFCNT_inc(&PL_sv_yes));
    av_store(req, PLCB_VHIDX_RC, newSViv(resp->rc));
    av_store(req, PLCB_VHIDX_META, sv_from_rowdata(meta, nmeta));

    if (htresp) {
        av_store(req, PLCB_VHIDX_HTCODE, newSViv(htresp->htstatus));
    }

    /* Notify once more, with no rows, now that the state is complete */
    invoke_row(req, self_rv, NULL);

    /* Balances the reference taken for the callback at query time */
    SvREFCNT_dec(req);
}

/*
 * libcouchbase view-row callback. Forwards to common_callback() in view
 * mode, passing the response's value buffer as the metadata candidate.
 */
static void
viewrow_callback(lcb_t obj, int ct, const lcb_RESPVIEWQUERY *resp)
{
    const lcb_RESPBASE *base = (lcb_RESPBASE*)resp;

    common_callback(obj, base, resp->value, resp->nvalue, resp->htresp, 0);
}

/*
 * libcouchbase N1QL row callback. Forwards to common_callback() in N1QL
 * mode, passing the response's row buffer as the metadata candidate.
 */
static void
n1ql_callback(lcb_t obj, int ct, const lcb_RESPN1QL *resp)
{
    const lcb_RESPBASE *base = (lcb_RESPBASE *)resp;

    common_callback(obj, base, resp->row, resp->nrow, resp->htresp, 1);
}

/*
 * Create a view request object and issue the query.
 *
 * parent  - owning handle; supplies the library instance and the stash the
 *           new object is blessed into
 * ddoc    - design document name
 * view    - view name
 * options - query options string, passed through to the library
 * flags   - command flags, passed through to the library unchanged
 *
 * Returns a blessed reference to the new request. On success an extra
 * reference to the request is held on behalf of the row callback and the
 * library's view handle is stored in the VHANDLE slot. Dies if the query
 * cannot be scheduled.
 */
SV *
PLCB__viewhandle_new(PLCB_t *parent,
    const char *ddoc, const char *view, const char *options, int flags)
{
    lcb_CMDVIEWQUERY cmd = { 0 };
    lcb_VIEWHANDLE vh = NULL;
    lcb_error_t rc;
    AV *req = newAV();
    SV *blessed;

    rowreq_init_common(parent, req);
    blessed = newRV_noinc((SV*)req);
    sv_bless(blessed, parent->view_stash);

    lcb_view_query_initcmd(&cmd, ddoc, view, options, viewrow_callback);
    cmd.cmdflags = flags; /* Trust lcb on this */
    cmd.handle = &vh;

    rc = lcb_view_query(parent->instance, req, &cmd);
    if (rc != LCB_SUCCESS) {
        /* Release the half-built object before raising the exception */
        SvREFCNT_dec(blessed);
        die("Couldn't issue view query: (0x%x): %s", rc, lcb_strerror(NULL, rc));
    }

    SvREFCNT_inc(req); /* For the callback */
    av_store(req, PLCB_VHIDX_VHANDLE, newSVuv(PTR2UV(vh)));

    return blessed;
}

/*
 * Wait on the parent handle of the given request.
 *
 * pp - reference to the request array. Its parent is looked up and passed
 *      to plcb_views_wait().
 */
void
PLCB__viewhandle_fetch(SV *pp)
{
    AV *request = (AV *)SvRV(pp);
    PLCB_t *owner = parent_from_req(request);

    plcb_views_wait(owner);
}

/*
 * Cancel an in-flight view request.
 *
 * pp - reference to the request array.
 *
 * Does nothing unless the VHANDLE slot still holds an integer handle.
 * Otherwise the handle is cancelled, the slot is reset to undef, the
 * request is marked done, and one reference on the request is released
 * (the one held on behalf of the callback).
 */
void
PLCB__viewhandle_stop(SV *pp)
{
    AV *req = (AV *)SvRV(pp);
    PLCB_t *parent = parent_from_req(req);
    SV **slot = av_fetch(req, PLCB_VHIDX_VHANDLE, 0);
    lcb_VIEWHANDLE vh;

    /* No slot, or no live handle in it: nothing to cancel */
    if (!slot || !SvIOK(*slot)) {
        return;
    }

    vh = NUM2PTR(lcb_VIEWHANDLE, SvUV(*slot));
    lcb_view_cancel(parent->instance, vh);

    av_store(req, PLCB_VHIDX_VHANDLE, SvREFCNT_inc(&PL_sv_undef));
    av_store(req, PLCB_VHIDX_ISDONE, SvREFCNT_inc(&PL_sv_yes));
    SvREFCNT_dec((SV *)req);
}

/*
 * Create a N1QL request object and issue the query.
 *
 * parent - owning handle; supplies the library instance and the stash the
 *          new object is blessed into
 * params - N1QL parameters, encoded into the command
 * host   - optional host; applied to the command only when non-empty
 *
 * Returns a blessed reference to the new request. On success an extra
 * reference to the request is held on behalf of the row callback. Dies if
 * the parameters cannot be encoded or the query cannot be scheduled.
 */
SV *
PLCB__n1qlhandle_new(PLCB_t *parent, lcb_N1QLPARAMS *params, const char *host)
{
    lcb_CMDN1QL cmd = { 0 };
    lcb_error_t rc;
    AV *req;
    SV *blessed;

    /* Encode first: nothing has been allocated yet if this fails */
    rc = lcb_n1p_mkcmd(params, &cmd);
    if (rc != LCB_SUCCESS) {
        die("Error encoding N1QL parameters: %s", lcb_strerror(NULL, rc));
    }

    cmd.callback = n1ql_callback;
    if (host && *host) {
        cmd.host = host;
    }

    req = newAV();
    rowreq_init_common(parent, req);
    blessed = newRV_noinc((SV*)req);
    sv_bless(blessed, parent->n1ql_stash);

    rc = lcb_n1ql_query(parent->instance, req, &cmd);
    if (rc != LCB_SUCCESS) {
        /* Release the half-built object before raising the exception */
        SvREFCNT_dec(blessed);
        die("Couldn't issue N1QL query: (0x%x): %s", rc, lcb_strerror(NULL, rc));
    }

    /* Held for the callback until the final response arrives */
    SvREFCNT_inc(req);

    return blessed;
}