#include "Stream.h"
#include "ssl/SslFilter.h"

namespace panda { namespace unievent {

using ssl::SslFilter;

#define HOLD_ON(what) StreamSP __hold = what; (void)__hold;

#define INVOKE(h, f, fm, hm, ...) do { \
    if (f) f->fm(__VA_ARGS__);      \
    else   h->hm(__VA_ARGS__);      \
} while(0)

#define REQUEST_REQUIRE_WRITE_STATE do {                        \
    if (!handle->out_connected())                               \
        return delay([this]{                                    \
            cancel(make_error_code(std::errc::not_connected));  \
        });                                                     \
} while(0)

Stream::~Stream () {
    panda_log_dtor();
}

string Stream::buf_alloc (size_t cap) noexcept {
    try {
        return buf_alloc_callback ? buf_alloc_callback(cap) : string(cap);
    } catch (...) {
        return {};
    }
}

// ===================== CONNECTION ===============================
excepted<void, ErrorCode> Stream::listen (connection_fn callback, int backlog) {
    panda_log_info("Stream::listen, backlog " << backlog);
    invoke_sync(&StreamFilter::listen);
    if (callback) connection_event.add(callback);
    auto code = impl()->listen(backlog);
    if (code) {
        panda_log_info("Stream::listen error:" << code);
        return make_unexpected(ErrorCode(errc::listen_error, code));
    } else {
        set_listening();
        return {};
    }
}

void Stream::handle_connection (const std::error_code& err) {
    panda_log_info("handle_connection err: " << err << " this: " << this);
    HOLD_ON(this);
    if (err) INVOKE(this, _filters.back(), handle_connection, finalize_handle_connection, nullptr, err, nullptr);
    else     accept();
}

void Stream::accept () {
    StreamSP self = this;
    StreamSP client;
    if (connection_factory) client = connection_factory(self);
    else {
        if (_listener) client = _listener->create_connection(self);
        if (!client)   client = create_connection();
    }
    accept(client);
}

void Stream::accept (const StreamSP& client) {
    panda_log_debug("accept client " << client << ", this: " << this);
    auto err = impl()->accept(client->impl());
    if (!err) {
        client->set_connecting();
        client->set_established();
    }

    if (_listener) _listener->on_establish(this, client, err);

    // filters may delay handle_connection() and make subrequests
    // creating dummy AcceptRequest follows 2 purposes: holding the only client reference and delaying users requests until handle_connection() is done
    AcceptRequestSP areq;
    if (_filters.size()) {
        areq = new AcceptRequest(client);
        client->queue.push(areq);
    }
    INVOKE(this, _filters.back(), handle_connection, finalize_handle_connection, client, err, areq);
}

void Stream::finalize_handle_connection (const StreamSP& client, const ErrorCode& connection_err, const AcceptRequestSP& req) {
    ErrorCode err;
    if (!connection_err) {
        auto read_start_err = client->set_connect_result(true);
        if (read_start_err) err = ErrorCode(errc::read_start_error, read_start_err);
    } else {
        if (client) client->set_connect_result(false);
        err = connection_err;
    }
    panda_log_debug("finalize_handle_connection err: " << err << "client: " << client << ", this: " << this);

    if (req && client) client->queue.done(req, []{});
    StreamSP self = this;

    // call on client stream as well (may be useful)
    if (client) {
        client->connection_event(self, client, err);
        if (client->_listener) client->_listener->on_connection(self, client, err);
    }

    connection_event(self, client, err);
    if (_listener) _listener->on_connection(self, client, err);
}

// ===================== CONNECT ===============================
void ConnectRequest::exec () {
    panda_log_debug("ConnectRequest::exec " << this);
    handle->set_connecting();

    if (timeout) {
        _timer = new Timer(handle->loop());
        _timer->event.add([this](const TimerSP&){ cancel(make_error_code(std::errc::timed_out)); });
        _timer->once(timeout);
    }
}

void ConnectRequest::handle_event (const ErrorCode& err) {
    panda_log_debug("ConnectRequest::handle_event " << this);
    if (!err) handle->set_established();
    StreamSP stream = handle;
    ConnectRequestSP self = this;
    if (handle->_listener) handle->_listener->on_establish(stream, err, self);
    INVOKE(stream, last_filter, handle_connect, finalize_handle_connect, err, self);
}

void ConnectRequest::notify (const ErrorCode& err) { handle->notify_on_connect(err, this); }

void Stream::finalize_handle_connect (const ErrorCode& connect_err, const ConnectRequestSP& req) {
    ErrorCode err;
    if (!connect_err) {
        auto read_start_err = set_connect_result(true);
        if (read_start_err) err = ErrorCode(errc::read_start_error, read_start_err);
    } else {
        set_connect_result(false);
        err = connect_err;
    }
    panda_log_debug("finalize_handle_connect err: " << err << "req: " << req << ", this: " << this);

    if (req->_timer) {
        req->_timer->pause();
        req->_timer->event.remove_all();
    }

    // if we are already canceling queue now, do not start recursive cancel
    if (!err || queue.canceling()) {
        queue.done(req, [=]{ notify_on_connect(err, req); });
    }
    else { // cancel everything till the end of queue, but call connect callback with actual status(err), not with ECANCELED
        queue.cancel([&]{
            // temporary variable for lambda fixes some kind of bug in clang 8 compiler on FreeBSD: somewhy dtor of "req" variable is called twice
            auto l = [=]{ notify_on_connect(err, req); };
            queue.done(req, l);
        }, [&]{
            _reset();
        });
    }
}

void Stream::notify_on_connect (const ErrorCode& err, const ConnectRequestSP& req) {
    StreamSP self = this;
    req->event(self, err, req);
    connect_event(self, err, req);
    if (_listener) _listener->on_connect(self, err, req);
}

// ===================== READ ===============================
std::error_code Stream::_read_start () {
    if (reading() || !established()) return {};
    auto err = impl()->read_start();
    if (!err) set_reading(true);
    return err;
}

void Stream::read_stop () {
    set_wantread(false);
    if (!reading()) return;
    impl()->read_stop();
    set_reading(false);
}

void Stream::handle_read (string& buf, const std::error_code& err) {
    if (err == std::errc::connection_reset) return handle_eof(); // sometimes (when we were WRITING) read with error occurs instead of EOF
    if (flags & IGNORE_READ) return;
    HOLD_ON(this);
    INVOKE(this, _filters.back(), handle_read, finalize_handle_read, buf, err);
}

void Stream::finalize_handle_read (string& buf, const ErrorCode& err) {
    StreamSP self = this;
    read_event(self, buf, err);
    if (_listener) _listener->on_read(self, buf, err);
}

// ===================== WRITE ===============================
void Stream::write (const WriteRequestSP& req) {
    panda_log_debug("WriteRequest::exec req: " << req << ", this: " << this);
    for (const auto& buf : req->bufs) _wq_size += buf.length();
    req->set(this);
    queue.push(req);
}

void WriteRequest::exec () {
    panda_log_debug([&]{
        size_t sum = 0;
        for (const auto b : bufs) {
            sum += b.size();
        }
        log << "WriteRequest::exec " << this << " " << sum << " bytes";
    });
    REQUEST_REQUIRE_WRITE_STATE;
    last_filter = handle->_filters.front();
    for (const auto& buf : bufs) handle->_wq_size -= buf.length();
    INVOKE(handle, last_filter, write, finalize_write, this);
}

void Stream::finalize_write (const WriteRequestSP& req) {
    panda_log_debug("WriteRequest::finalize_write " << this);
    auto err = impl()->write(req->bufs, req->impl());
    if (err) req->delay([=]{ req->cancel(err); });
}

void WriteRequest::handle_event (const ErrorCode& err) {
    panda_log_debug("WriteRequest::handle_event " << this << ", err" << err);
    if (err & std::errc::broken_pipe) handle->clear_out_connected();
    HOLD_ON(handle);
    INVOKE(handle, last_filter, handle_write, finalize_handle_write, err, this);
}

void WriteRequest::notify (const ErrorCode& err) { handle->notify_on_write(err, this); }

void Stream::finalize_handle_write (const ErrorCode& err, const WriteRequestSP& req) {
    panda_log_debug("finalize_handle_write err: " << err << ", request" << req << ", this" << this);
    queue.done(req, [=]{ notify_on_write(err, req); });
}

void Stream::notify_on_write (const ErrorCode& err, const WriteRequestSP& req) {
    StreamSP self = this;
    req->event(self, err, req);
    write_event(self, err, req);
    if (_listener) _listener->on_write(self, err, req);
}

// ===================== EOF ===============================
void Stream::handle_eof () {
    if (!established()) return;
    clear_in_connected();
    HOLD_ON(this);
    INVOKE(this, _filters.back(), handle_eof, finalize_handle_eof);
}

void Stream::finalize_handle_eof () {
    StreamSP self = this;
    eof_event(self);
    if (_listener) _listener->on_eof(self);
}

// ===================== SHUTDOWN ===============================
void Stream::shutdown (const ShutdownRequestSP& req) {
    panda_log_debug("shutdown req: " << req << ", this:" << this);
    req->set(this);
    req->timed_out = false;

    if (req->timeout) {
        req->timer = new Timer(loop());
        auto reqp = req.get();
        req->timer->event.add([this, reqp](const TimerSP&){
            // if we have any requests not completed before shutdown - cancel it with status "cancelled" and then cancel shutdown request with status "timed out"
            // otherwise, just cancel shutdown request with status "timed out"
            // notice that we don't cancel all the queue, but only everything before shutdown request. next request after shutdown request will start running right now
            auto prev_req = intrusive_chain_prev(RequestSP(reqp));
            if (prev_req) {
                reqp->timed_out = true; // needed for calling with correct err if somebody calls reset() in previous requests handlers
                queue.cancel([]{}, [reqp] {
                    reqp->cancel(make_error_code(std::errc::timed_out));
                }, prev_req);
            } else {
                reqp->cancel(make_error_code(std::errc::timed_out));
            }
        });
        req->timer->once(req->timeout);
    }

    queue.push(req);
}

void ShutdownRequest::exec () {
    panda_log_debug("ShutdownRequest::exec " << this);
    REQUEST_REQUIRE_WRITE_STATE;
    last_filter = handle->_filters.front();
    INVOKE(handle, last_filter, shutdown, finalize_shutdown, this);
}

void Stream::finalize_shutdown (const ShutdownRequestSP& req) {
    panda_log_debug("finalize_shutdown " << this);
    set_shutting();
    impl()->shutdown(req->impl());
}

void ShutdownRequest::handle_event (const ErrorCode& err) {
    panda_log_debug("ShutdownRequest::handle_event " << this);
    HOLD_ON(handle);
    INVOKE(handle, last_filter, handle_shutdown, finalize_handle_shutdown, err, this);
}

void ShutdownRequest::notify (const ErrorCode& err) { handle->notify_on_shutdown(err, this); }

void ShutdownRequest::cancel (const ErrorCode& err) {
    if (timed_out && err & std::errc::operation_canceled) {
        timed_out = false;
        StreamRequest::cancel(make_error_code(std::errc::timed_out));
    }
    else StreamRequest::cancel(err);
}

void Stream::finalize_handle_shutdown (const ErrorCode& err, const ShutdownRequestSP& req) {
    panda_log_debug("finalize_handle_shutdown req: " << err << "req: " << req << ", this: " << this);
    set_shutdown(!err);
    req->timer = nullptr;
    queue.done(req, [=]{ notify_on_shutdown(err, req); });
}

void Stream::notify_on_shutdown (const ErrorCode& err, const ShutdownRequestSP& req) {
    StreamSP self = this;
    req->event(self, err, req);
    shutdown_event(self, err, req);
    if (_listener) _listener->on_shutdown(self, err, req);
}

// ===================== DISCONNECT/RESET/CLEAR ===============================
struct DisconnectRequest : StreamRequest {
    DisconnectRequest (Stream* h) { set(h); }

    void exec         ()                 override { handle->queue.done(this, [&]{ HOLD_ON(handle); handle->_reset(); }); }
    void cancel       (const ErrorCode&) override { handle->queue.done(this, []{}); }
    void handle_event (const ErrorCode&) override {}
    void notify       (const ErrorCode&) override {}
};

void Stream::disconnect () {
    if (!queue.size()) {
        HOLD_ON(this);
        _reset();
    }
    else if (queue.size() == 1 && connecting()) reset();
    else queue.push(new DisconnectRequest(this));
}

void Stream::reset () {
    HOLD_ON(this);
    queue.cancel([&]{ _reset(); });
}

void Stream::_reset () {
    BackendHandle::reset();
    invoke_sync(&StreamFilter::reset);
    flags &= DONTREAD; // clear flags except DONTREAD
    _wq_size = 0;
    on_reset();
}

void Stream::clear () {
    HOLD_ON(this);
    queue.cancel([&]{ _clear(); });
}

void Stream::_clear () {
    BackendHandle::clear();
    invoke_sync(&StreamFilter::reset);
    _filters.clear();
    flags              = 0;
    _wq_size           = 0;
    _listener          = nullptr;
    buf_alloc_callback = nullptr;
    connection_factory = nullptr;
    connection_event.remove_all();
    connect_event.remove_all();
    read_event.remove_all();
    write_event.remove_all();
    shutdown_event.remove_all();
    eof_event.remove_all();
    on_reset();
}

// ===================== FILTERS ADD/REMOVE ===============================
void Stream::add_filter (const StreamFilterSP& filter, bool force) {
    assert(filter);
    if (!force) _check_change_filters();
    auto it = _filters.begin();
    auto pos = it;
    bool found = false;
    while (it != _filters.end()) {
        if ((*it)->type() == filter->type()) {
            *it = filter;
            return;
        }
        if ((*it)->priority() > filter->priority() && !found) {
            pos = it;
            found = true;
        }
        it++;
    }
    if (found) _filters.insert(pos, filter);
    else _filters.push_back(filter);
}

void Stream::remove_filter (const StreamFilterSP& filter, bool force) {
    if (!force) _check_change_filters();
    _filters.erase(filter);
}

StreamFilterSP Stream::get_filter (const void* type) const {
    for (const auto& f : _filters) if (f->type() == type) return f;
    return {};
}

void Stream::use_ssl (const SslContext &context)  { add_filter(new SslFilter(this, context)); }
void Stream::use_ssl (const SSL_METHOD* method)   { add_filter(new SslFilter(this, method)); }

bool Stream::is_secure () const { return get_filter(SslFilter::TYPE); }

SSL* Stream::get_ssl() const {
    auto filter = get_filter<SslFilter>();
    return filter ? filter->get_ssl() : nullptr;
}

void Stream::no_ssl () {
    auto filter = get_filter<SslFilter>();
    if (!filter) return;
    remove_filter(filter);
}

// ===================== RUN IN ORDER REQUEST ===============================
void RunInOrderRequest::exec         ()                     { handle->queue.done(this, [this]{ code(handle); }); }
void RunInOrderRequest::handle_event (const ErrorCode& err) { assert(err); }
void RunInOrderRequest::notify       (const ErrorCode& err) { assert(err); }

}}