#include <EXTERN.h>
#include <perl.h>
#include <XSUB.h>

#define NEED_newRV_noinc
#define NEED_sv_2pv_flags
#include "ppport.h"

#include "rdb_parser.h"

/*
 * Allocates and initializes a new parser.
 *
 * master      - if it is a reference, the referent is remembered WITHOUT
 *               taking a reference count on it; otherwise undef is stored
 * error_class - class name used for error objects; the parser keeps its own
 *               copy plus a second copy with "::new" appended
 * utf8        - stored as the parser's utf8 flag
 *
 * Returns the new parser in the RDBP_CLEAN state with an empty buffer and an
 * empty callback queue. Release it with rdb_parser__free().
 */
RDB_parser* rdb_parser__init(SV* master, SV* error_class, int utf8) {
    RDB_parser *parser;

    /* Zero-filled allocation, so fields that are not assigned below
     * (mblk_len, mblk_level, bulk_len, ...) never hold indeterminate values */
    Newxz(parser, 1, RDB_parser);
    if (parser == NULL) {
        croak("Couldn't allocate memory for parser");
    }

    if (SvROK(master)) {
        parser->master = SvRV(master);
    }
    else {
        parser->master = &PL_sv_undef;
    }

    parser->utf8 = utf8;
    parser->callbacks = newAV();
    parser->default_cb = NULL;
    parser->mblk_reply = NULL;
    parser->mblk_store = NULL;
    parser->buffer = newSVpvn("", 0);
    parser->state = RDBP_CLEAN;

    parser->error_class = newSVsv(error_class);
    /* "<error_class>::new" is what gets handed to call_sv() when an error
     * object has to be built */
    parser->error_class_constructor = newSVsv(error_class);
    sv_catpv(parser->error_class_constructor, "::new");

    return parser;
}

/*
 * Drops every reference the parser holds (callback queue, buffer, error class
 * strings, default callback, partially built multi-bulk reply and the list of
 * saved multi-bulk states) and then frees the parser structure itself.
 */
void rdb_parser__free(RDB_parser *parser) {
    struct rdbp_mblk_store *node, *following;

    SvREFCNT_dec(parser->callbacks);
    SvREFCNT_dec(parser->buffer);
    SvREFCNT_dec(parser->error_class);
    SvREFCNT_dec(parser->error_class_constructor);

    if (parser->default_cb != NULL) {
        SvREFCNT_dec(parser->default_cb);
    }
    if (parser->mblk_reply != NULL) {
        SvREFCNT_dec(parser->mblk_reply);
    }

    /* release each saved multi-bulk state; remember the successor before the
     * node is freed */
    for (node = parser->mblk_store; node != NULL; node = following) {
        following = node->next;
        SvREFCNT_dec(node->mblk_reply);
        Safefree(node);
    }

    Safefree(parser);
}

/*
 * Delivers the same reply to every pending callback.
 *
 * Callbacks are shifted off parser->callbacks one by one; once the queue is
 * empty the default callback (if any) is detached from the parser and called
 * as well. Each callback is invoked in void context with two arguments: a new
 * reference to parser->master and a fresh copy of reply. The loop ends when
 * the queue is empty and no default callback is left.
 */
void rdb_parser__propagate_reply(RDB_parser *parser, SV *reply) {
    SV *cb;

    while (1) {
        if(av_len(parser->callbacks) >= 0) {
            cb = av_shift(parser->callbacks);
            sv_2mortal(cb);
        }
        else if (parser->default_cb != NULL) {
            cb = parser->default_cb;
            parser->default_cb = NULL;
            /* The parser gives up its reference here, so it has to be
             * released eventually; mortalizing (outside the SAVETMPS scope
             * below) keeps the callback alive for the call and frees it
             * afterwards instead of leaking it. */
            sv_2mortal(cb);
        }
        else {
            break;
        }

        {
            dSP;
            ENTER;
            SAVETMPS;
            PUSHMARK(SP);
            XPUSHs(sv_2mortal(newRV_inc(parser->master)));
            XPUSHs(sv_2mortal(newSVsv(reply)));
            PUTBACK;
            call_sv(cb, G_VOID|G_DISCARD);
            FREETMPS;
            LEAVE;
        }
    }
}

/*
 * Looks for the first CRLF ("\r\n") pair inside the first `length` bytes of
 * `start`.
 *
 * Returns the offset of the '\r', i.e. the length of the line without its
 * terminator, or -1 if no complete CRLF pair is present.
 */
static
long _line_length(char *start, size_t length) {
    size_t i;

    /* "i + 1 < length" keeps the start[i + 1] read in bounds and, unlike
     * "i < length - 1", does not wrap around when length is 0 */
    for (i = 0; i + 1 < length; i++) {
        if (start[i] == '\r' && start[i + 1] == '\n') {
            return (long)i;
        }
    }

    return -1;
}

/*
 * Reads one CRLF-terminated line from the front of the buffer.
 *
 * On success returns a new SV holding the line without its terminator and
 * removes the line together with the CRLF from the buffer. Returns NULL, and
 * leaves the buffer as it is, when the buffer does not contain a complete
 * line yet.
 */
static
SV* _read_line(SV *buffer) {
    char *pv;
    long len;
    SV *line = NULL;

    pv  = SvPVX(buffer);
    len = _line_length(pv, sv_len(buffer));
    if (len >= 0) {
        line = newSVpvn(pv, len);
        /* + 2 skips the "\r\n" that follows the line */
        sv_chop(buffer, pv + len + 2);
    }

    return line;
}

/*
 * Reads a CRLF-terminated line holding an integer from the front of the
 * buffer.
 *
 * Returns a new integer SV and removes the line from the buffer, or returns
 * NULL when no complete line is available yet.
 */
static
SV* _read_number(SV *buffer) {
    char *data = SvPVX(buffer);
    long eol = _line_length(data, sv_len(buffer));
    SV *result;

    if (eol < 0)
        return NULL;

    /* overwrite the '\r' so atol() sees a NUL-terminated string */
    data[eol] = 0;
    result = newSViv(atol(data));
    sv_chop(buffer, data + eol + 2);

    return result;
}

/*
 * Reads a CRLF-terminated line holding a length from the front of the
 * buffer and removes that line from the buffer.
 *
 * Returns the parsed value, or -2 when the buffer does not contain a
 * complete line yet (the buffer is left untouched in that case).
 */
static
long _read_length(SV *buffer) {
    char *data = SvPVX(buffer);
    long eol = _line_length(data, sv_len(buffer));
    long value;

    if (eol < 0)
        return -2;

    /* overwrite the '\r' so atol() sees a NUL-terminated string */
    data[eol] = 0;
    value = atol(data);
    sv_chop(buffer, data + eol + 2);

    return value;
}

/*
 * Builds an error object from the message by calling the parser's error
 * class constructor (parser->error_class_constructor) in scalar context with
 * two arguments: parser->error_class and msg.
 *
 * msg is made mortal here, so the caller must not release it afterwards.
 * Returns a new SV that is a copy of the value returned by the constructor.
 * Croaks if the call does not report exactly one return value.
 */
static
SV* _create_rdb_error(RDB_parser *parser, SV *msg) {
    int count;
    SV* err;

    dSP;
    ENTER;
    SAVETMPS;
    PUSHMARK(SP);
    XPUSHs(parser->error_class);
    XPUSHs(sv_2mortal(msg));
    PUTBACK;
    count = call_sv(parser->error_class_constructor, G_SCALAR);
    if (count != 1)
        croak("Expected single return value from new, but got many");
    SPAGAIN;
    /* copy the result before FREETMPS so the returned SV does not depend on
     * the value that is popped off the stack */
    err = newSVsv(POPs);
    PUTBACK;
    FREETMPS;
    LEAVE;

    return err;
}

/*
 * Pushes the state of the multi-bulk reply currently being built (its array
 * and the stored mblk_len counter) onto the parser's mblk_store list
 * and clears parser->mblk_reply.
 */
static
void _mblk_status_store(RDB_parser *parser) {
    struct rdbp_mblk_store *saved;

    Newx(saved, 1, struct rdbp_mblk_store);

    saved->mblk_len   = parser->mblk_len;
    saved->mblk_reply = parser->mblk_reply;
    saved->next       = parser->mblk_store;

    parser->mblk_store = saved;
    parser->mblk_reply = NULL;
}

/*
 * Pops the most recently saved multi-bulk state off the parser's mblk_store
 * list and makes it current again (mblk_len and mblk_reply are restored).
 * Croaks if there is no saved state.
 */
static
void _mblk_status_fetch(RDB_parser *parser) {
    struct rdbp_mblk_store *top = parser->mblk_store;

    if (top == NULL) {
        croak("Already at the upper level of multi-bulk reply");
    }

    parser->mblk_store = top->next;
    parser->mblk_reply = top->mblk_reply;
    parser->mblk_len   = top->mblk_len;

    Safefree(top);
}

/*
 * Adds a new value to the multi-bulk reply that is currently being built.
 *
 * If the current level still expects more items, the counter is decremented,
 * the state is set to RDBP_WAIT_BUCKS and 1 is returned. If the current level
 * is complete and it is nested, it is wrapped into a reference, the enclosing
 * level is restored and the reference is added to it in the same way.
 * Returns 0 once the outermost level has received its last item.
 */
static
int _mblk_item(RDB_parser *parser, SV *value) {
    SV *item = value;

    for (;;) {
        av_push(parser->mblk_reply, item);

        if (parser->mblk_len > 1) {
            /* more items expected at this level */
            parser->mblk_len--;
            parser->state = RDBP_WAIT_BUCKS;
            return 1;
        }

        if (!(parser->mblk_level > 1)) {
            /* outermost level is complete */
            return 0;
        }

        /* nested level is complete: it becomes an item of its parent */
        parser->mblk_level--;
        item = newRV_noinc((SV *)(parser->mblk_reply));
        _mblk_status_fetch(parser);
    }
}

/*
 * Handles a freshly parsed value: checks whether it completes the current
 * reply and, if so, invokes the callback for it.
 *
 * Inside a multi-bulk reply the value is first added to the reply being
 * built; if more items are expected, 0 is returned and no callback is called.
 * Otherwise the parser state is reset to RDBP_CLEAN and the reply (the value
 * itself, or a reference to the finished multi-bulk array) is passed, together
 * with a new reference to parser->master, to the first queued callback or,
 * if the queue is empty, to the default callback. The default callback stays
 * installed.
 *
 * Ownership of value passes to this function.
 * Returns 1 if the reply was completed, 0 otherwise.
 * Croaks if there is neither a queued nor a default callback.
 */
static
int _reply_completed(RDB_parser *parser, SV *value) {
    SV *reply, *cb;

    if (parser->mblk_level) {
        if (_mblk_item(parser, value))
            return 0;
        reply = newRV_noinc((SV *)(parser->mblk_reply));
        parser->mblk_reply = NULL;
    }
    else
        reply = value;

    parser->state = RDBP_CLEAN;

    {
        dSP;
        ENTER;
        SAVETMPS;
        /* mortalize before looking for a callback, so the reply is released
         * rather than leaked if we croak below */
        sv_2mortal(reply);
        if (av_len(parser->callbacks) >= 0) {
            cb = av_shift(parser->callbacks);
            sv_2mortal(cb);
        }
        else if (parser->default_cb != NULL) {
            cb = parser->default_cb;
        }
        else croak("No callbacks in the queue and no default callback set");
        PUSHMARK(SP);
        XPUSHs(sv_2mortal(newRV_inc(parser->master)));
        XPUSHs(reply);
        PUTBACK;
        call_sv(cb, G_VOID|G_DISCARD);
        FREETMPS;
        LEAVE;
    }

    return 1;
}

/*
 * Tries to parse one reply from the front of parser->buffer.
 *
 * The parser is a resumable state machine: consumed bytes are removed from
 * the buffer and parser->state records what is expected next, so the function
 * can be called again after more data has been appended to the buffer.
 *
 * In the RDBP_CLEAN state the first byte selects the reply type:
 *   '+' line, '-' error, ':' number, '$' bulk, '*' multi-bulk.
 * Items of a multi-bulk reply start with the same markers (RDBP_WAIT_BUCKS).
 *
 * Returns 1 when a complete reply was parsed and handed to a callback,
 * 0 when the buffer does not yet contain enough data.
 * Croaks on an unknown type marker and, when parser->utf8 is set, on a bulk
 * value that is not valid UTF-8.
 */
int rdb_parser__parse_reply(RDB_parser *parser) {
    char op;
    char *pv;
    SV *line, *err, *bulk, *mblk;
    long length;

    if (sv_len(parser->buffer) == 0) return 0;

    if (parser->state == RDBP_CLEAN) {
        parser->mblk_level = 0;

        /* remove first character from the buffer */
        pv = SvPVX(parser->buffer);
        op = *pv;
        sv_chop(parser->buffer, pv + 1);

        if (op == '+')
            parser->state = RDBP_READ_LINE;
        else if (op == '-')
            parser->state = RDBP_READ_ERROR;
        else if (op == ':')
            parser->state = RDBP_READ_NUMBER;
        else if (op == '$')
            parser->state = RDBP_READ_BULK_LEN;
        else if (op == '*') {
            parser->state = RDBP_READ_MBLK_LEN;
            parser->mblk_level = 1;
        }
        else {
            croak("Got invalid reply");
        }
    }

    while (1) {
        /* every state below needs at least two bytes (a marker plus data,
         * or a CRLF terminator) */
        if (sv_len(parser->buffer) < 2) return 0;

        if (parser->state == RDBP_READ_LINE) {
            line = _read_line(parser->buffer);
            if (line == NULL) return 0;
            if (_reply_completed(parser, line)) return 1;
        }
        else if (parser->state == RDBP_READ_ERROR) {
            line = _read_line(parser->buffer);
            if (line == NULL) return 0;
            /* the error message is wrapped into an error object */
            err = _create_rdb_error(parser, line);
            if (_reply_completed(parser, err)) return 1;
        }
        else if (parser->state == RDBP_READ_NUMBER) {
            line = _read_number(parser->buffer);
            if (line == NULL) return 0;
            if (_reply_completed(parser, line)) return 1;
        }
        else if (parser->state == RDBP_READ_BULK_LEN) {
            length = _read_length(parser->buffer);
            if (length >= 0) {
                parser->state = RDBP_READ_BULK;
                parser->bulk_len = length;
            }
            else if (length == -1) {
                /* length -1 yields an undefined value */
                if (_reply_completed(parser, newSVpvn(NULL, 0))) return 1;
            }
            else return 0;
        }
        else if (parser->state == RDBP_READ_BULK) {
            /* wait until the payload and its trailing CRLF have arrived */
            if (sv_len(parser->buffer) < 2 + parser->bulk_len) return 0;
            pv = SvPVX(parser->buffer);
            bulk = newSVpvn(pv, parser->bulk_len);
            sv_chop(parser->buffer, pv + parser->bulk_len + 2);
            if (parser->utf8) {
                if (!sv_utf8_decode(bulk)) {
                    /* nothing owns bulk yet, so release it before croaking
                     * to avoid leaking it */
                    SvREFCNT_dec(bulk);
                    croak("Received invalid UTF-8 string from the server");
                }
            }
            if (_reply_completed(parser, bulk)) return 1;
        }
        else if (parser->state == RDBP_READ_MBLK_LEN) {
            length = _read_length(parser->buffer);
            if (length > 0) {
                parser->mblk_len = length;
                parser->state = RDBP_WAIT_BUCKS;
                parser->mblk_reply = newAV();
            }
            else if (length == 0 || length == -1) {
                /* length 0 yields a reference to an empty array, -1 an
                 * undefined value; either way this level is already
                 * complete, so step back to the enclosing level (if any)
                 * and treat the result as an ordinary value */
                mblk = (length == 0) ? newRV_noinc((SV *)newAV()) : newSVpvn(NULL, 0);
                parser->mblk_level--;
                if (parser->mblk_level > 0)
                    _mblk_status_fetch(parser);
                if (_reply_completed(parser, mblk)) return 1;
            }
            else return 0;
        }
        else if (parser->state == RDBP_WAIT_BUCKS) {
            /* remove first character from the buffer */
            pv = SvPVX(parser->buffer);
            op = *pv;
            sv_chop(parser->buffer, pv + 1);

            if (op == '$') parser->state = RDBP_READ_BULK_LEN;
            else if (op == ':') parser->state = RDBP_READ_NUMBER;
            else if (op == '+') parser->state = RDBP_READ_LINE;
            else if (op == '-') parser->state = RDBP_READ_ERROR;
            else if (op == '*') {
                /* nested multi-bulk: save the state of the enclosing level
                 * before starting a new one */
                parser->state = RDBP_READ_MBLK_LEN;
                parser->mblk_level++;
                _mblk_status_store(parser);
            }
            else croak("Invalid multi-bulk reply. Expected [$:+-*] but got something else");
        }
    }

    return 0;
}