#define PERL_NO_GET_CONTEXT
#include "EXTERN.h"
#include "perl.h"
#include "XSUB.h"
#include "ppport.h"

#include <stdio.h>
#include <string.h>

#include <pzk.h>
#include <pzk_channel.h>
#include <pzk_pipe_dispatcher.h>
#include <pzk_interrupt_dispatcher.h>
#include <pzk_transaction.h>
#include <pzk_xs_utils.h>
#include <zookeeper/zookeeper.h>
#include <zookeeper/zookeeper_version.h>


MODULE = ZooKeeper PACKAGE = ZooKeeper 

BOOT:
{
    zoo_set_debug_level(0);
    zoo_set_log_stream(NULL);
}

void
trace(SV* self, int level, const char* path=NULL)
    PPCODE:
        if (level > ZOO_LOG_LEVEL_DEBUG) {
            level = ZOO_LOG_LEVEL_DEBUG;
        }
        zoo_set_debug_level(level);

        FILE* stream = path ? fopen(path, "a") : NULL;
        zoo_set_log_stream(stream);

void
_xs_init(SV* self)
    PPCODE:
        if (!SvROK(self) || (SvTYPE(SvRV(self)) != SVt_PVHV)) XSRETURN_EMPTY;
        pzk_t* pzk = new_pzk(NULL);
        sv_magic(SvRV(self), Nullsv, PERL_MAGIC_ext, (const char*) pzk, 0);

void
connect(pzk, hosts, recv_timeout, _watcher=NULL, clientid=NULL, flags=0)
        pzk_t*            pzk
        const char*       hosts
        SV*               _watcher
        int               recv_timeout
        const clientid_t* clientid
        int               flags
    PPCODE:
        pzk_watcher_t* watcher = (pzk_watcher_t*) unsafe_tied_object_to_ptr(aTHX_ _watcher);
        watcher_fn cb = watcher ? pzk_watcher_cb : NULL;
        zhandle_t* handle = zookeeper_init(hosts, cb, recv_timeout, clientid, (void*) watcher, flags);
        if (!handle) throw_zerror(aTHX_ errno, "Error initializing ZooKeeper handle for '%s': %s", hosts, strerror(errno));

        pzk->handle = handle;

int
is_unrecoverable(pzk_t* pzk)
    CODE:
        RETVAL = is_unrecoverable(pzk->handle);
    OUTPUT:
        RETVAL

void
close(SV* self)
    PPCODE:
        pzk_t* pzk = (pzk_t*) unsafe_tied_object_to_ptr(aTHX_ self);
        if (pzk) pzk->close(pzk);

void
_xs_destroy(SV* self)
    PPCODE:
        pzk_t* pzk = (pzk_t*) unsafe_tied_object_to_ptr(aTHX_ self);
        if (pzk) pzk->destroy(pzk);

void
_set_watcher(pzk_t* pzk, SV* _watcher=NULL)
    PPCODE:
        pzk_watcher_t* watcher = (pzk_watcher_t*) unsafe_tied_object_to_ptr(aTHX_ _watcher);
        zoo_set_watcher(pzk->handle, pzk_watcher_cb);
        zoo_set_context(pzk->handle, (void*) watcher);

int
state(pzk_t* pzk)
    CODE:
        RETVAL = zoo_state(pzk->handle);
    OUTPUT:
        RETVAL

void
add_auth(pzk_t* pzk, char* scheme, char* credential=NULL, SV* _watcher=NULL)
    PPCODE:
        pzk_watcher_t* watcher = (pzk_watcher_t*) unsafe_tied_object_to_ptr(aTHX_ _watcher);
        void_completion_t cb = watcher ? pzk_auth_cb : NULL;
        int rc = zoo_add_auth(pzk->handle, scheme, credential, strlen(credential), cb, (void*) watcher);
        if (rc != ZOK) throw_zerror(aTHX_ rc, "Error trying to authenticate: %s", zerror(rc));

SV*
create(pzk_t* pzk, char* path, SV* value_sv, int buffer_len, struct ACL_vector* acl=NULL, int flags=0)
    CODE:
        char* buffer; Newxz(buffer, buffer_len + 1, char);
        size_t value_len = -1;
        char* value      = SvOK(value_sv) ? SvPV(value_sv, value_len) : NULL;

        int rc = zoo_create(pzk->handle, path, value, value_len, acl, flags, buffer, buffer_len);
        RETVAL = newSVpv(buffer, 0);
        Safefree(buffer);
        if (rc != ZOK) throw_zerror(aTHX_ rc, "Error creating node '%s': %s", path, zerror(rc));
    OUTPUT:
        RETVAL

void
delete(pzk_t* pzk, char* path, int version=-1)
    PPCODE:
        int rc = zoo_delete(pzk->handle, path, version);
        if (rc != ZOK) throw_zerror(aTHX_ rc, "Error deleting node '%s': %s", path, zerror(rc));

SV*
exists(pzk_t* pzk, char* path, SV* _watcher=NULL)
    CODE:
        int rc;
        struct Stat stat; Zero(&stat, 1, struct Stat);
        pzk_watcher_t* watcher = (pzk_watcher_t*) unsafe_tied_object_to_ptr(aTHX_ _watcher);
        if (watcher) {
            rc = zoo_wexists(pzk->handle, path, pzk_watcher_cb, (void*) watcher, &stat);
        } else {
            rc = zoo_exists(pzk->handle, path, 0, &stat);
        }

        if (rc == ZOK) {
            RETVAL = stat_to_sv(aTHX_ &stat);
        } else if (rc == ZNONODE) {
            RETVAL = &PL_sv_undef;
        } else {
            throw_zerror(aTHX_ rc, "Error checking existence of node '%s': %s", path, zerror(rc));
        }
    OUTPUT:
        RETVAL

void
get_children(pzk_t* pzk, char* path, SV* _watcher=NULL)
    PPCODE:
        int rc;
        struct String_vector strings; Zero(&strings, 1, struct String_vector);
        pzk_watcher_t* watcher = (pzk_watcher_t*) unsafe_tied_object_to_ptr(aTHX_ _watcher);
        if (watcher) {
            rc = zoo_wget_children(pzk->handle, path, pzk_watcher_cb, (void*) watcher, &strings);
        } else {
            rc = zoo_get_children(pzk->handle, path, 0, &strings);
        }

        if (rc == ZOK) {
            int32_t size = strings.count;
            int i; for (i = 0; i < size; i++) {
                ST(i) = sv_2mortal(newSVpv(strings.data[i], 0));
            }
            deallocate_String_vector(&strings);
            XSRETURN(size);
        } else {
            deallocate_String_vector(&strings);
            throw_zerror(aTHX_ rc, "Error getting children for node '%s': %s", path, zerror(rc));
        }

void
get(pzk_t* pzk, char* path, int buffer_len, SV* _watcher=NULL)
    PPCODE:
        int rc;
        char* buffer; Newxz(buffer, buffer_len + 1, char);
        struct Stat stat; Zero(&stat, 1, struct Stat);
        pzk_watcher_t* watcher = (pzk_watcher_t*) unsafe_tied_object_to_ptr(aTHX_ _watcher);
        if (watcher) {
            rc = zoo_wget(pzk->handle, path, pzk_watcher_cb, watcher, buffer, &buffer_len, &stat);
        } else {
            rc = zoo_get(pzk->handle, path, 0, buffer, &buffer_len, &stat);
        }

        if (rc != ZOK) throw_zerror(aTHX_ rc, "Error getting data for node '%s': %s", path, zerror(rc));

        ST(0) = buffer_len == -1 ? &PL_sv_undef : newSVpv(buffer, buffer_len);
        Safefree(buffer);
        if (GIMME_V == G_ARRAY) {
            ST(1) = sv_2mortal(stat_to_sv(aTHX_ &stat));
            XSRETURN(2);
        } else {
            XSRETURN(1);
        }

SV*
set(pzk_t* pzk, char* path, SV* value_sv, int version=-1)
    CODE:
        struct Stat stat; Zero(&stat, 1, struct Stat);
        size_t value_len = -1;
        char* value      = SvOK(value_sv) ? SvPV(value_sv, value_len) : NULL;

        int rc = zoo_set2(pzk->handle, path, value, value_len, version, &stat);
        if (rc != ZOK) throw_zerror(aTHX_ rc, "Error setting data for node '%s': %s", path, zerror(rc));
        RETVAL = stat_to_sv(aTHX_ &stat);

    OUTPUT:
        RETVAL

void
get_acl(pzk_t* pzk, char* path)
    PPCODE:
        struct Stat stat; Zero(&stat, 1, struct Stat);
        struct ACL_vector acl; Zero(&acl, 1, struct ACL_vector);
        int rc = zoo_get_acl(pzk->handle, path, &acl, &stat);

        ST(0) = sv_2mortal(acl_vector_to_sv(aTHX_ &acl));
        deallocate_ACL_vector(&acl);

        if (rc != ZOK) throw_zerror(aTHX_ rc, "Error getting acl for node '%s': %s", path, zerror(rc));

        if (GIMME_V == G_ARRAY) {
            ST(1) = sv_2mortal(stat_to_sv(aTHX_ &stat));
            XSRETURN(2);
        } else {
            XSRETURN(1);
        }

void
set_acl(pzk_t* pzk, char* path, SV* acl_sv, int version=-1)
    PPCODE:
        struct ACL_vector* acl = sv_to_acl_vector(aTHX_ acl_sv);
        int rc = zoo_set_acl(pzk->handle, path, version, acl);
        Safefree(acl->data);
        Safefree(acl);
        if (rc != ZOK) throw_zerror(aTHX_ rc, "Error setting acl for node '%s': %s", path, zerror(rc));

const clientid_t*
client_id(pzk_t* pzk)
    CODE:
        RETVAL = zoo_client_id(pzk->handle);
    OUTPUT:
        RETVAL


MODULE = ZooKeeper PACKAGE = ZooKeeper::Channel

void
_xs_init(SV* self)
    PPCODE:
        if (!SvROK(self) || (SvTYPE(SvRV(self)) != SVt_PVHV)) XSRETURN_EMPTY;

        pzk_channel_t* channel = new_pzk_channel();
        sv_magic(SvRV(self), Nullsv, PERL_MAGIC_ext, (const char*) channel, 0);

void
DESTROY(SV* self)
    PPCODE:
        pzk_channel_t* channel = (pzk_channel_t*) unsafe_tied_object_to_ptr(aTHX_ self);
        if (channel) channel->destroy(channel);

int
size(pzk_channel_t* channel)
    CODE:
        RETVAL = channel->size;
    OUTPUT:
        RETVAL

void
send(pzk_channel_t* channel, ...)
    PPCODE:
        int i;
        for (i = 1; i < items; i++) {
            channel->push(channel, SvREFCNT_inc(ST(i)));
        }
        XSRETURN_YES;

SV*
recv(pzk_channel_t* channel)
    CODE:
        SV* element = (SV*) channel->shift(channel);
        if (!element) element = &PL_sv_undef;
        RETVAL = element;
    OUTPUT:
        RETVAL


MODULE = ZooKeeper PACKAGE = ZooKeeper::Dispatcher

SV*
recv_event(pzk_dispatcher_t* dispatcher)
    CODE:
        pzk_channel_t* channel = dispatcher->channel;
        if (!channel->size) XSRETURN_EMPTY;

        pzk_event_t* event = (pzk_event_t*) channel->shift(channel);
        RETVAL = event ? event_to_sv(aTHX_ event) : &PL_sv_undef;
        if (event) event->destroy(event);
    OUTPUT:
        RETVAL

int
send_event(dispatcher, type, state, path, watcher)
        pzk_dispatcher_t* dispatcher
        int               type
        int               state
        const char*       path
        pzk_watcher_t*    watcher
    CODE:
        if (!dispatcher) Perl_croak(aTHX_ "dispatcher has not yet been initialized by ZooKeeper::Dispatcher");
        pzk_watcher_cb(NULL, type, state, path, (void*) watcher);
    OUTPUT:
        RETVAL

void
DESTROY(SV* self)
    PPCODE:
        pzk_dispatcher_t* dispatcher = (pzk_dispatcher_t*) unsafe_tied_object_to_ptr(aTHX_ self);
        if (dispatcher) {
            pzk_event_t*   event;
            pzk_channel_t* channel = dispatcher->channel;
            while ((event = (pzk_event_t*) channel->shift(channel))) {
                event->destroy(event);
            }
        }


MODULE = ZooKeeper PACKAGE = ZooKeeper::Dispatcher::Pipe

void
_xs_init(SV* self, pzk_channel_t* channel)
    PPCODE:
        if (!SvROK(self) || (SvTYPE(SvRV(self)) != SVt_PVHV)) XSRETURN_EMPTY;

        pzk_pipe_dispatcher_t* dispatcher = new_pzk_pipe_dispatcher(channel);
        sv_magic(SvRV(self), Nullsv, PERL_MAGIC_ext, (const char*) dispatcher, 0);

int
fd(pzk_pipe_dispatcher_t* dispatcher)
    CODE:
        RETVAL = dispatcher->fd[0];
    OUTPUT:
        RETVAL

int
read_pipe(pzk_pipe_dispatcher_t* dispatcher)
    CODE:
        RETVAL = dispatcher->read_pipe(dispatcher);
    OUTPUT:
        RETVAL

void
DESTROY(SV* self)
    PPCODE:
        pzk_pipe_dispatcher_t* dispatcher = (pzk_pipe_dispatcher_t*) unsafe_tied_object_to_ptr(aTHX_ self);
        if (dispatcher) dispatcher->destroy(dispatcher);


MODULE = ZooKeeper PACKAGE = ZooKeeper::Dispatcher::Interrupt

void
_xs_init(SV* self, pzk_channel_t* channel, interrupt_fn func, void* arg)
    PPCODE:
        if (!SvROK(self) || (SvTYPE(SvRV(self)) != SVt_PVHV)) XSRETURN_EMPTY;

        pzk_interrupt_dispatcher_t* dispatcher = new_pzk_interrupt_dispatcher(channel, func, arg);
        sv_magic(SvRV(self), Nullsv, PERL_MAGIC_ext, (const char*) dispatcher, 0);

void
DESTROY(SV* self)
    PPCODE:
        pzk_interrupt_dispatcher_t* dispatcher = (pzk_interrupt_dispatcher_t*) unsafe_tied_object_to_ptr(aTHX_ self);
        if (dispatcher) dispatcher->destroy(dispatcher);


MODULE = ZooKeeper PACKAGE = ZooKeeper::Watcher

void
_xs_init(SV* self, pzk_dispatcher_t* dispatcher)
    PPCODE:
        if (!SvROK(self) || (SvTYPE(SvRV(self)) != SVt_PVHV)) XSRETURN_EMPTY;

        pzk_watcher_t* watcher; Newxz(watcher, 1, pzk_watcher_t);
        watcher->dispatcher = dispatcher;
        watcher->ctx        = (void*) sv_rvweaken(SvREFCNT_inc(self));
        sv_magic(SvRV(self), Nullsv, PERL_MAGIC_ext, (const char*) watcher, 0);

void
trigger(pzk_watcher_t* watcher, const char* path, int state, int type)
    PPCODE:
        pzk_watcher_cb(NULL, type, state, path, (void*) watcher);

void
DESTROY(SV* self)
    PPCODE:
        pzk_watcher_t* watcher = (pzk_watcher_t*) unsafe_tied_object_to_ptr(aTHX_ self);
        if (watcher) {
            SvREFCNT_dec(watcher->ctx);
            Safefree(watcher);
        }


MODULE = ZooKeeper PACKAGE = ZooKeeper::Constants

BOOT:
{
    HV* stash = gv_stashpv("ZooKeeper::Constants", GV_ADDWARN);

    newCONSTSUB(stash, "ZOK",                      newSViv(ZOK));
    newCONSTSUB(stash, "ZSYSTEMERROR",             newSViv(ZSYSTEMERROR));
    newCONSTSUB(stash, "ZRUNTIMEINCONSISTENCY",    newSViv(ZRUNTIMEINCONSISTENCY));
    newCONSTSUB(stash, "ZDATAINCONSISTENCY",       newSViv(ZDATAINCONSISTENCY));
    newCONSTSUB(stash, "ZCONNECTIONLOSS",          newSViv(ZCONNECTIONLOSS));
    newCONSTSUB(stash, "ZMARSHALLINGERROR",        newSViv(ZMARSHALLINGERROR));
    newCONSTSUB(stash, "ZUNIMPLEMENTED",           newSViv(ZUNIMPLEMENTED));
    newCONSTSUB(stash, "ZOPERATIONTIMEOUT",        newSViv(ZOPERATIONTIMEOUT));
    newCONSTSUB(stash, "ZBADARGUMENTS",            newSViv(ZBADARGUMENTS));
    newCONSTSUB(stash, "ZINVALIDSTATE",            newSViv(ZINVALIDSTATE));
    newCONSTSUB(stash, "ZAPIERROR",                newSViv(ZAPIERROR));
    newCONSTSUB(stash, "ZNONODE",                  newSViv(ZNONODE));
    newCONSTSUB(stash, "ZNOAUTH",                  newSViv(ZNOAUTH));
    newCONSTSUB(stash, "ZBADVERSION",              newSViv(ZBADVERSION));
    newCONSTSUB(stash, "ZNOCHILDRENFOREPHEMERALS", newSViv(ZNOCHILDRENFOREPHEMERALS));
    newCONSTSUB(stash, "ZNODEEXISTS",              newSViv(ZNODEEXISTS));
    newCONSTSUB(stash, "ZNOTEMPTY",                newSViv(ZNOTEMPTY));
    newCONSTSUB(stash, "ZSESSIONEXPIRED",          newSViv(ZSESSIONEXPIRED));
    newCONSTSUB(stash, "ZINVALIDCALLBACK",         newSViv(ZINVALIDCALLBACK));
    newCONSTSUB(stash, "ZINVALIDACL",              newSViv(ZINVALIDACL));
    newCONSTSUB(stash, "ZAUTHFAILED",              newSViv(ZAUTHFAILED));
    newCONSTSUB(stash, "ZCLOSING",                 newSViv(ZCLOSING));
    newCONSTSUB(stash, "ZNOTHING",                 newSViv(ZNOTHING));

    newCONSTSUB(stash, "ZOO_EPHEMERAL", newSViv(ZOO_EPHEMERAL));
    newCONSTSUB(stash, "ZOO_SEQUENCE",  newSViv(ZOO_SEQUENCE));

    newCONSTSUB(stash, "ZOO_OPEN_ACL_UNSAFE", acl_vector_to_sv(aTHX_ &ZOO_OPEN_ACL_UNSAFE));
    newCONSTSUB(stash, "ZOO_READ_ACL_UNSAFE", acl_vector_to_sv(aTHX_ &ZOO_READ_ACL_UNSAFE));
    newCONSTSUB(stash, "ZOO_CREATOR_ALL_ACL", acl_vector_to_sv(aTHX_ &ZOO_CREATOR_ALL_ACL));

    newCONSTSUB(stash, "ZOO_PERM_READ",   newSViv(ZOO_PERM_READ));
    newCONSTSUB(stash, "ZOO_PERM_WRITE",  newSViv(ZOO_PERM_WRITE));
    newCONSTSUB(stash, "ZOO_PERM_CREATE", newSViv(ZOO_PERM_CREATE));
    newCONSTSUB(stash, "ZOO_PERM_DELETE", newSViv(ZOO_PERM_DELETE));
    newCONSTSUB(stash, "ZOO_PERM_ADMIN",  newSViv(ZOO_PERM_ADMIN));
    newCONSTSUB(stash, "ZOO_PERM_ALL",    newSViv(ZOO_PERM_ALL));

    newCONSTSUB(stash, "ZOO_CREATED_EVENT",     newSViv(ZOO_CREATED_EVENT));
    newCONSTSUB(stash, "ZOO_DELETED_EVENT",     newSViv(ZOO_DELETED_EVENT));
    newCONSTSUB(stash, "ZOO_CHANGED_EVENT",     newSViv(ZOO_CHANGED_EVENT));
    newCONSTSUB(stash, "ZOO_CHILD_EVENT",       newSViv(ZOO_CHILD_EVENT));
    newCONSTSUB(stash, "ZOO_SESSION_EVENT",     newSViv(ZOO_SESSION_EVENT));
    newCONSTSUB(stash, "ZOO_NOTWATCHING_EVENT", newSViv(ZOO_NOTWATCHING_EVENT));

    newCONSTSUB(stash, "ZOO_EXPIRED_SESSION_STATE", newSViv(ZOO_EXPIRED_SESSION_STATE));
    newCONSTSUB(stash, "ZOO_AUTH_FAILED_STATE",     newSViv(ZOO_AUTH_FAILED_STATE));
    newCONSTSUB(stash, "ZOO_CONNECTING_STATE",      newSViv(ZOO_CONNECTING_STATE));
    newCONSTSUB(stash, "ZOO_ASSOCIATING_STATE",     newSViv(ZOO_ASSOCIATING_STATE));
    newCONSTSUB(stash, "ZOO_CONNECTED_STATE",       newSViv(ZOO_CONNECTED_STATE));

    newCONSTSUB(stash, "ZOO_CREATE_OP",  newSViv(ZOO_CREATE_OP));
    newCONSTSUB(stash, "ZOO_DELETE_OP",  newSViv(ZOO_DELETE_OP));
    newCONSTSUB(stash, "ZOO_SETDATA_OP", newSViv(ZOO_SETDATA_OP));
    newCONSTSUB(stash, "ZOO_CHECK_OP",   newSViv(ZOO_CHECK_OP));
}

const char*
zerror(int c)

MODULE = ZooKeeper PACKAGE = ZooKeeper::Transaction

void
commit(SV* self, pzk_t* pzk, int count, SV* ops_sv)
    PPCODE:
        int i;
        zoo_op_result_t* results; Newxz(results, count, zoo_op_result_t);
        zoo_op_t* ops = sv_to_ops(aTHX_ ops_sv);

        int rc = zoo_multi(pzk->handle, count, ops, results);
        if (rc == ZMARSHALLINGERROR && is_unrecoverable(pzk->handle)) {
            // assume the error is ZINVALIDSTATE if handle is unrecoverable
            // internally Request_path_init checks the zhandle status
            //  and returns an error if it is unrecoverable
            // zoo_multi considers this a marshalling error, because it's generated
            //  while forming the request, which is misleading to the user
            rc = ZINVALIDSTATE;
        }

        // handle any system errors
        if (rc != ZOK) {
            int op_errors = 0;
            for (i = 0; i < count; i++) {
                if (results[i].err != ZOK) op_errors++;
            }
            if (!op_errors) {
                // if the multi call failed, but no ops had errors
                // assume this is a system error and throw an exception
                Safefree(results);
                free_ops(aTHX_ ops, count);
                throw_zerror(aTHX_ rc, "Error committing transaction: %s", zerror(rc));
            }
        }

        for (i = 0; i < count; i++) {
            if (rc == ZOK) {
                ST(i) = sv_2mortal(op_to_sv(aTHX_ ops[i]));
            } else {
                ST(i) = sv_2mortal(op_error_to_sv(aTHX_ results[i]));
            }
        }
        Safefree(results);
        free_ops(aTHX_ ops, count);
        XSRETURN(count);