#include <xs/unievent/Timer.h>
#include <xs/unievent/Stream.h>
#include <xs/typemap/expected.h>
#include <xs/unievent/Listener.h>
#include <xs/CallbackDispatcher.h>

using namespace xs;
using namespace xs::unievent;
using namespace panda::unievent;
using panda::string;

static PERL_ITHREADS_LOCAL struct {
    Simple create_connection = Simple::shared("create_connection");
    Simple on_establish      = Simple::shared("on_establish");
    Simple on_connection     = Simple::shared("on_connection");
    Simple on_connect        = Simple::shared("on_connect");
    Simple on_read           = Simple::shared("on_read");
    Simple on_write          = Simple::shared("on_write");
    Simple on_shutdown       = Simple::shared("on_shutdown");
    Simple on_eof            = Simple::shared("on_eof");
} cbn;

struct XSStreamListener : IStreamListener, XSListener {
    StreamSP create_connection (const StreamSP& h) override {
        auto ret = call<Scalar>(cbn.create_connection, xs::out(h));
        return ret ? xs::in<StreamSP>(ret) : StreamSP();
    }

    void on_establish (const StreamSP& h, const StreamSP& client, const ErrorCode& err) override {
        call(cbn.on_establish, xs::out(h), xs::out(client), xs::out(err));
    }

    void on_connection (const StreamSP& h, const StreamSP& client, const ErrorCode& err) override {
        call(cbn.on_connection, xs::out(h), xs::out(client), xs::out(err));
    }

    void on_establish (const StreamSP& h, const ErrorCode& err, const ConnectRequestSP& req) override {
        call(cbn.on_establish, xs::out(h), xs::out(err), xs::out(req));
    }

    void on_connect (const StreamSP& h, const ErrorCode& err, const ConnectRequestSP& req) override {
        call(cbn.on_connect, xs::out(h), xs::out(err), xs::out(req));
    }

    void on_read (const StreamSP& h, string& buf, const ErrorCode& err) override {
        call(cbn.on_read, xs::out(h), err ? Sv::undef : xs::out(buf), xs::out(err));
    }

    void on_write (const StreamSP& h, const ErrorCode& err, const WriteRequestSP& req) override {
        call(cbn.on_write, xs::out(h), xs::out(err), xs::out(req));
    }

    void on_shutdown (const StreamSP& h, const ErrorCode& err, const ShutdownRequestSP& req) override {
        call(cbn.on_shutdown, xs::out(h), xs::out(err), xs::out(req));
    }

    void on_eof (const StreamSP& h) override {
        call(cbn.on_eof, xs::out(h));
    }
};

MODULE = UniEvent::Stream                PACKAGE = UniEvent::Stream
PROTOTYPES: DISABLE

BOOT {
    Stash s(__PACKAGE__);
    s.inherit("UniEvent::Handle");

    xs::at_perl_destroy([]() {
        cbn.create_connection = nullptr;
        cbn.on_establish      = nullptr;
        cbn.on_connection     = nullptr;
        cbn.on_connect        = nullptr;
        cbn.on_read           = nullptr;
        cbn.on_write          = nullptr;
        cbn.on_shutdown       = nullptr;
        cbn.on_eof            = nullptr;
    });
}

bool Stream::readable ()

bool Stream::writable ()

bool Stream::listening ()

bool Stream::connecting ()

bool Stream::established ()

bool Stream::connected ()

bool Stream::wantread ()

bool Stream::shutting_down ()

bool Stream::is_shut_down ()

size_t Stream::write_queue_size ()

bool Stream::is_secure ()

void Stream::connection_factory (Sub callback) {
    THIS->connection_factory = [callback](const StreamSP& h) -> StreamSP {
        return xs::in<StreamSP>( callback.call(xs::out(h)) );
    };
}

XSCallbackDispatcher* Stream::connection_event () {
    RETVAL = XSCallbackDispatcher::create(THIS->connection_event);
}

void Stream::connection_callback (Stream::connection_fn cb) {
    THIS->connection_event.remove_all();
    if (cb) THIS->connection_event.add(cb);
}

XSCallbackDispatcher* Stream::connect_event () {
    RETVAL = XSCallbackDispatcher::create(THIS->connect_event);
}

void Stream::connect_callback (Stream::connect_fn cb) {
    THIS->connect_event.remove_all();
    if (cb) THIS->connect_event.add(cb);
}

XSCallbackDispatcher* Stream::read_event () {
    RETVAL = XSCallbackDispatcher::create(THIS->read_event);
}

void Stream::read_callback (Stream::read_fn cb) {
    THIS->read_event.remove_all();
    if (cb) THIS->read_event.add(cb);
}

XSCallbackDispatcher* Stream::write_event () {
    RETVAL = XSCallbackDispatcher::create(THIS->write_event);
}

void Stream::write_callback (Stream::write_fn cb) {
    THIS->write_event.remove_all();
    if (cb) THIS->write_event.add(cb);
}

XSCallbackDispatcher* Stream::shutdown_event () {
    RETVAL = XSCallbackDispatcher::create(THIS->shutdown_event);
}

void Stream::shutdown_callback (Stream::shutdown_fn cb) {
    THIS->shutdown_event.remove_all();
    if (cb) THIS->shutdown_event.add(cb);
}

XSCallbackDispatcher* Stream::eof_event () {
    RETVAL = XSCallbackDispatcher::create(THIS->eof_event);
}

void Stream::eof_callback (Stream::eof_fn cb) {
    THIS->eof_event.remove_all();
    if (cb) THIS->eof_event.add(cb);
}

Ref Stream::event_listener (Sv lst = Sv(), bool weak = false) {
    RETVAL = event_listener<XSStreamListener>(THIS, ST(0), lst, weak);
}

#// listen([$callback], [$backlog])
#// listen($backlog)
void Stream::listen (Sv cb_bl = Sv(), int backlog = Stream::DEFAULT_BACKLOG) {
    if (items == 2 && cb_bl.is_simple()) {
        backlog = SvIV(cb_bl);
        cb_bl.reset();
    }
    if (cb_bl) {
        auto fn = xs::in<Stream::connection_fn>(cb_bl);
        if (fn) THIS->connection_event.add(fn);
    }
    XSRETURN_EXPECTED(THIS->listen(backlog));
}

void Stream::read_start () {
    XSRETURN_EXPECTED(THIS->read_start());
}

void Stream::read_stop ()

WriteRequestSP Stream::write (Sv sv, Stream::write_fn cb = nullptr) {
    auto buf = sv2buf(sv);
    if (!buf) XSRETURN(0);

    WriteRequestSP req = make_backref<WriteRequest>(buf);
    if (cb) req->event.add(cb);
    THIS->write(req);
    RETVAL = req;
}

#// shutdown([$timeout], [$cb]) or shutdown($cb)
ShutdownRequestSP Stream::shutdown (Sv arg1 = {}, Sv arg2 = {}) {
    double timeout = 0;
    Stream::shutdown_fn cb;

    if (arg2) {
        timeout = SvNV(arg1);
        cb = xs::in<Stream::shutdown_fn>(arg2);
    }
    else if (arg1) {
        if (arg1.is_sub_ref()) cb = xs::in<Stream::shutdown_fn>(arg1);
        else                   timeout = SvNV(arg1);
    }

    ShutdownRequestSP req = make_backref<ShutdownRequest>(cb, timeout * 1000);
    THIS->shutdown(req);
    RETVAL = req;
}

void Stream::disconnect ()

void Stream::sockaddr () : ALIAS(peeraddr=1) {
    auto res = ix == 0 ? THIS->sockaddr() : THIS->peeraddr();
    XSRETURN_EXPECTED(res);
}

void Stream::use_ssl (SslContext ctx = NULL) {
    if (ctx) THIS->use_ssl(ctx);
    else THIS->use_ssl();
}

void Stream::no_ssl ()

void Stream::recv_buffer_size (SV* newval = {}) {
    if (newval) XSRETURN_EXPECTED(THIS->recv_buffer_size(xs::in<int>(newval)));
    else        XSRETURN_EXPECTED(THIS->recv_buffer_size());
}

void Stream::send_buffer_size (SV* newval = {}) {
    if (newval) XSRETURN_EXPECTED(THIS->send_buffer_size(xs::in<int>(newval)));
    else        XSRETURN_EXPECTED(THIS->send_buffer_size());
}

void Stream::run_in_order (Stream::run_fn cb)


MODULE = UniEvent::Stream                PACKAGE = UniEvent::Request::Connect
PROTOTYPES: DISABLE

BOOT {
    Stash stash(__PACKAGE__, GV_ADD);
    stash.inherit("UniEvent::Request");
}

TimerSP ConnectRequest::timeout_timer () {
    RETVAL = THIS->timeout_timer();
}


MODULE = UniEvent::Stream                PACKAGE = UniEvent::Request::Write
PROTOTYPES: DISABLE

BOOT {
    Stash stash(__PACKAGE__, GV_ADD);
    stash.inherit("UniEvent::Request");
}


MODULE = UniEvent::Stream                PACKAGE = UniEvent::Request::Shutdown
PROTOTYPES: DISABLE

BOOT {
    Stash stash(__PACKAGE__, GV_ADD);
    stash.inherit("UniEvent::Request");
}