#include "rdkafkaxs.h"
#include "ppport.h"

#define ERRSTR_SIZE 1024

rd_kafka_topic_partition_list_t*
krd_parse_topic_partition_list(pTHX_ AV* tplist) {
    char errstr[ERRSTR_SIZE];
    rd_kafka_topic_partition_list_t* tpar;

    int tplen = av_len(tplist)+1;
    tpar = rd_kafka_topic_partition_list_new(tplen);
    int i;
    for (i=0; i<tplen; i++) {
        SV** elemr = av_fetch(tplist, i, 0);
        if (elemr == NULL)
            continue;
        SV* conf = *elemr;
        if (!SvROK(conf) || strncmp(sv_reftype(SvRV(conf), 0), "HASH", 5) != 0) {
            strncpy(errstr, "elements of topic partition list expected to be hashes", ERRSTR_SIZE);
            goto CROAK;
        }
        HV* confhv = (HV*)SvRV(conf);
        SV** topicsv = hv_fetch(confhv, "topic", 5, 0);
        if (topicsv == NULL) {
            snprintf(errstr, ERRSTR_SIZE, "topic is not specified for element %d of the list", i);
            goto CROAK;
        }
        STRLEN len;
        char* topic = SvPV(*topicsv, len);
        SV** partitionsv = hv_fetch(confhv, "partition", 9, 0);
        if (partitionsv == NULL) {
            snprintf(errstr, ERRSTR_SIZE, "partition is not specified for element %d of the list", i);
            goto CROAK;
        }
        int32_t partition = SvIV(*partitionsv);
        rd_kafka_topic_partition_t* tp = rd_kafka_topic_partition_list_add(tpar, topic, partition);
        hv_iterinit(confhv);
        HE* he;
        while ((he = hv_iternext(confhv)) != NULL) {
            char* key = HePV(he, len);
            SV* val = HeVAL(he);
            if (strncmp(key, "topic", 6) == 0 || strncmp(key, "partition", 10) == 0) {
                /* this we already handled */
                ;
            } else if (strncmp(key, "offset", 7) == 0) {
                tp->offset = SvIV(val);
            } else if (strncmp(key, "metadata", 9) == 0) {
                tp->metadata = SvPV(val, len);
                tp->metadata_size = len;
            } else {
                snprintf(errstr, ERRSTR_SIZE, "unknown option %s for element %d of the list", key, i);
                goto CROAK;
            }
        }
    }
    return tpar;

CROAK:
    rd_kafka_topic_partition_list_destroy(tpar);
    croak("%s", errstr);
    return NULL;
}

AV* krd_expand_topic_partition_list(pTHX_ rd_kafka_topic_partition_list_t* tpar) {
    AV* tplist = newAV();
    int i;
    for (i = 0; i < tpar->cnt; i++) {
        rd_kafka_topic_partition_t* elem = &(tpar->elems[i]);
        HV* tp = newHV();
        hv_stores(tp, "topic", newSVpv(elem->topic, 0));
        hv_stores(tp, "partition", newSViv(elem->partition));
        hv_stores(tp, "offset", newSViv(elem->offset));
        if(elem->metadata_size > 0) {
            hv_stores(tp, "metadata", newSVpvn(elem->metadata, elem->metadata_size));
        }
        av_push(tplist, newRV_noinc((SV*)tp));
    }
    return tplist;
}

rd_kafka_conf_t* krd_parse_config(pTHX_ rdkafka_t *krd, HV* params) {
    char errstr[ERRSTR_SIZE];
    rd_kafka_conf_t* krdconf;
    rd_kafka_conf_res_t res;
    HE *he;

    krdconf = rd_kafka_conf_new();
    rd_kafka_conf_set_opaque(krdconf, (void *)krd);
    hv_iterinit(params);
    while ((he = hv_iternext(params)) != NULL) {
        STRLEN len;
        char* key = HePV(he, len);
        SV* val = HeVAL(he);
        if (strncmp(key, "default_topic_config", len) == 0) {
            if (!SvROK(val) || strncmp(sv_reftype(SvRV(val), 0), "HASH", 5) != 0) {
                strncpy(errstr, "default_topic_config must be a hash reference", ERRSTR_SIZE);
                goto CROAK;
            }
            rd_kafka_topic_conf_t* topconf = krd_parse_topic_config(aTHX_ (HV*)SvRV(val), errstr);
            if (topconf == NULL) goto CROAK;
            rd_kafka_conf_set_default_topic_conf(krdconf, topconf);
        } else {
            /* set named configuration property */
            char *strval = SvPV(val, len);
            res = rd_kafka_conf_set(
                    krdconf,
                    key,
                    strval,
                    errstr,
                    ERRSTR_SIZE);
            if (res != RD_KAFKA_CONF_OK)
                goto CROAK;
        }
    }

    return krdconf;

CROAK:
    rd_kafka_conf_destroy(krdconf);
    croak("%s", errstr);
    return NULL;
}

rd_kafka_topic_conf_t* krd_parse_topic_config(pTHX_ HV *params, char* errstr) {
    rd_kafka_topic_conf_t* topconf = rd_kafka_topic_conf_new();
    rd_kafka_conf_res_t res;
    HE *he;

    hv_iterinit(params);
    while ((he = hv_iternext(params)) != NULL) {
        STRLEN len;
        char* key = HePV(he, len);
        SV* val = HeVAL(he);
        char *strval = SvPV(val, len);
        res = rd_kafka_topic_conf_set(
                topconf,
                key,
                strval,
                errstr,
                ERRSTR_SIZE);
        if (res != RD_KAFKA_CONF_OK) {
            rd_kafka_topic_conf_destroy(topconf);
            return NULL;
        }
    }

    return topconf;
}