#pragma once
#include "Queue.h"
#include "Timer.h"
#include "forward.h"
#include "Request.h"
#include "StreamFilter.h"
#include "BackendHandle.h"
#include "SslContext.h"
#include "ssl/forward.h"
#include "backend/StreamImpl.h"
#include <panda/excepted.h>

namespace panda { namespace unievent {

struct IStreamListener {
    virtual StreamSP create_connection (const StreamSP&)                                             { return {}; }
    virtual void     on_establish      (const StreamSP&, const StreamSP&, const ErrorCode&)          {}
    virtual void     on_connection     (const StreamSP&, const StreamSP&, const ErrorCode&)          {}
    virtual void     on_establish      (const StreamSP&, const ErrorCode&, const ConnectRequestSP&)  {}
    virtual void     on_connect        (const StreamSP&, const ErrorCode&, const ConnectRequestSP&)  {}
    virtual void     on_read           (const StreamSP&, string&, const ErrorCode&)                  {}
    virtual void     on_write          (const StreamSP&, const ErrorCode&, const WriteRequestSP&)    {}
    virtual void     on_shutdown       (const StreamSP&, const ErrorCode&, const ShutdownRequestSP&) {}
    virtual void     on_eof            (const StreamSP&)                                             {}
};

struct IStreamSelfListener : IStreamListener {
    virtual StreamSP create_connection ()                                           { return {}; }
    virtual void     on_establish      (const StreamSP&, const ErrorCode&)          {}
    virtual void     on_connection     (const StreamSP&, const ErrorCode&)          {}
    virtual void     on_establish      (const ErrorCode&, const ConnectRequestSP&)  {}
    virtual void     on_connect        (const ErrorCode&, const ConnectRequestSP&)  {}
    virtual void     on_read           (string&, const ErrorCode&)                  {}
    virtual void     on_write          (const ErrorCode&, const WriteRequestSP&)    {}
    virtual void     on_shutdown       (const ErrorCode&, const ShutdownRequestSP&) {}
    virtual void     on_eof            ()                                           {}

    StreamSP create_connection (const StreamSP&)                                                     override { return create_connection(); }
    void     on_establish      (const StreamSP&, const StreamSP& cli, const ErrorCode& err)          override { on_establish(cli, err); }
    void     on_connection     (const StreamSP&, const StreamSP& cli, const ErrorCode& err)          override { on_connection(cli, err); }
    void     on_establish      (const StreamSP&, const ErrorCode& err, const ConnectRequestSP& req)  override { on_establish(err, req); }
    void     on_connect        (const StreamSP&, const ErrorCode& err, const ConnectRequestSP& req)  override { on_connect(err, req); }
    void     on_read           (const StreamSP&, string& buf, const ErrorCode& err)                  override { on_read(buf, err); }
    void     on_write          (const StreamSP&, const ErrorCode& err, const WriteRequestSP& req)    override { on_write(err, req); }
    void     on_shutdown       (const StreamSP&, const ErrorCode& err, const ShutdownRequestSP& req) override { on_shutdown(err, req); }
    void     on_eof            (const StreamSP&)                                                     override { on_eof(); }
};

struct Stream : virtual BackendHandle, protected backend::IStreamImplListener {
    using Filters         = IntrusiveChain<StreamFilterSP>;
    using conn_factory_fn = function<StreamSP(const StreamSP&)>;
    using connection_fptr = void(const StreamSP& handle, const StreamSP& client, const ErrorCode& err);
    using connection_fn   = function<connection_fptr>;
    using connect_fptr    = void(const StreamSP& handle, const ErrorCode& err, const ConnectRequestSP& req);
    using connect_fn      = function<connect_fptr>;
    using read_fptr       = void(const StreamSP& handle, string& buf, const ErrorCode& err);
    using read_fn         = function<read_fptr>;
    using write_fptr      = void(const StreamSP& handle, const ErrorCode& err, const WriteRequestSP& req);
    using write_fn        = function<write_fptr>;
    using shutdown_fptr   = void(const StreamSP& handle, const ErrorCode& err, const ShutdownRequestSP& req);
    using shutdown_fn     = function<shutdown_fptr>;
    using eof_fptr        = void(const StreamSP& handle);
    using eof_fn          = function<eof_fptr>;
    using run_fptr        = void(const StreamSP& handle);
    using run_fn          = function<run_fptr>;

    static const int DEFAULT_BACKLOG = 128;

    buf_alloc_fn                        buf_alloc_callback;
    conn_factory_fn                     connection_factory;
    CallbackDispatcher<connection_fptr> connection_event;
    CallbackDispatcher<connect_fptr>    connect_event;
    CallbackDispatcher<read_fptr>       read_event;
    CallbackDispatcher<write_fptr>      write_event;
    CallbackDispatcher<shutdown_fptr>   shutdown_event;
    CallbackDispatcher<eof_fptr>        eof_event;

    IStreamListener* event_listener () const             { return _listener; }
    void             event_listener (IStreamListener* l) { _listener = l; }

    string buf_alloc (size_t cap) noexcept override;

    bool   readable         () const { return impl()->readable(); }
    bool   writable         () const { return impl()->writable(); }
    bool   listening        () const { return flags & LISTENING; }
    bool   connecting       () const { return flags & CONNECTING; }
    bool   established      () const { return flags & ESTABLISHED; }
    bool   connected        () const { return flags & (IN_CONNECTED | OUT_CONNECTED); }
    bool   in_connected     () const { return flags & IN_CONNECTED; }
    bool   out_connected    () const { return flags & OUT_CONNECTED; }
    bool   wantread         () const { return !(flags & DONTREAD); }
    bool   shutting_down    () const { return flags & SHUTTING; }
    bool   is_shut_down     () const { return flags & SHUT; }
    size_t write_queue_size () const { return _wq_size + impl()->write_queue_size(); }

    excepted<void, ErrorCode>         listen (int backlog) { return listen(nullptr, backlog); }
    virtual excepted<void, ErrorCode> listen (connection_fn callback = nullptr, int backlog = DEFAULT_BACKLOG);

    virtual void   write (const WriteRequestSP&);
    WriteRequestSP write (const string& buf, write_fn callback = nullptr); /*INLINE*/
    template <class It>
    WriteRequestSP write (const It& begin, const It& end, write_fn callback = nullptr); /*INLINE*/
    template <class Range, typename = typename std::enable_if<std::is_convertible<decltype(*std::declval<Range>().begin()), string>::value>::type>
    WriteRequestSP write (const Range& range, write_fn callback = nullptr); /*INLINE*/

    virtual void shutdown (const ShutdownRequestSP&);
    /*INL*/ void shutdown (shutdown_fn callback = {});
    /*INL*/ void shutdown (uint64_t timeout, shutdown_fn callback = {});

    template <class T> void run_in_order (T&& code);

    excepted<void, ErrorCode> read_start () {
        set_wantread(true);
        flags &= ~IGNORE_READ;
        auto err = _read_start();
        if (err) return make_unexpected(ErrorCode(err));
        return {};
    }

    void read_stop ();

    void read_ignore () { flags |= IGNORE_READ; }

    virtual void disconnect ();

    void reset () override;
    void clear () override;

    void use_ssl (const SslContext& context);
    void use_ssl (const SSL_METHOD* method = nullptr);
    void no_ssl  ();

    SSL* get_ssl   () const;
    bool is_secure () const;

    void add_filter    (const StreamFilterSP&, bool force = false);
    void remove_filter (const StreamFilterSP&, bool force = false);

    template <typename F>
    iptr<F>        get_filter ()                 const { return static_pointer_cast<F>(get_filter(F::TYPE)); }
    StreamFilterSP get_filter (const void* type) const;

    void push_ahead_filter  (const StreamFilterSP& filter, bool force = false) {
        if (!force) _check_change_filters();
        _filters.insert(_filters.begin(), filter);
    }

    void push_behind_filter (const StreamFilterSP& filter, bool force = false) {
        if (!force) _check_change_filters();
        _filters.insert(_filters.end(), filter);
    }

    Filters& filters () { return _filters; }

    excepted<fh_t, ErrorCode> fileno () const { return _impl ? handle_fd_excepted(impl()->fileno()) : fh_t(); }

    excepted<int,  ErrorCode> recv_buffer_size () const { return make_excepted(impl()->recv_buffer_size()); }
    excepted<int,  ErrorCode> send_buffer_size () const { return make_excepted(impl()->send_buffer_size()); }
    excepted<void, ErrorCode> recv_buffer_size (int value) { return make_excepted(impl()->recv_buffer_size(value)); }
    excepted<void, ErrorCode> send_buffer_size (int value) { return make_excepted(impl()->send_buffer_size(value)); }
    
    virtual excepted<net::SockAddr, ErrorCode> sockaddr () const = 0;
    virtual excepted<net::SockAddr, ErrorCode> peeraddr () const = 0;

protected:
    Queue queue;

    Stream () : flags(), _wq_size(), _listener() {
        panda_log_ctor();
    }

    virtual void accept ();
    virtual void accept (const StreamSP& client);

    virtual StreamSP create_connection () = 0;

    void set_listening   () { flags |= LISTENING; }
    void set_connecting  () { flags |= CONNECTING; }
    void set_established () { flags |= ESTABLISHED; }

    std::error_code set_connect_result (bool ok) {
        flags &= ~CONNECTING;
        if (ok) {
            flags |= IN_CONNECTED|OUT_CONNECTED|ESTABLISHED;
            if (wantread()) return _read_start();
        }
        return {};
    }

    void set_wantread (bool on) { on ? (flags &= ~DONTREAD) : (flags |= DONTREAD); }
    void set_reading  (bool on) { on ? (flags |= READING) : (flags &= ~READING); }
    void set_shutting ()        { flags |= SHUTTING; }

    void clear_out_connected () {
        flags &= ~OUT_CONNECTED;
        if (!(flags & IN_CONNECTED)) flags &= ~ESTABLISHED;
    }

    void clear_in_connected () {
        flags &= ~IN_CONNECTED;
        if (!(flags & OUT_CONNECTED)) flags &= ~ESTABLISHED;
    }

    void set_shutdown (bool ok) {
        flags &= ~SHUTTING;
        flags |= SHUT;
        if (ok) clear_out_connected();
    }

    virtual void on_reset () {}

    ~Stream ();

private:
    friend StreamFilter; friend ConnectRequest; friend WriteRequest; friend ShutdownRequest;
    friend struct DisconnectRequest; friend AcceptRequest; friend RunInOrderRequest;

    static const uint32_t LISTENING     = 1;
    static const uint32_t CONNECTING    = 2;
    static const uint32_t ESTABLISHED   = 4;  // physically connected
    static const uint32_t IN_CONNECTED  = 8;  // logically connected for reading (connected and eof not received)
    static const uint32_t OUT_CONNECTED = 16; // logically connected for writing (connected and shutdown not done)
    static const uint32_t DONTREAD      = 32; // turn off incoming events completely (eof as well)
    static const uint32_t READING       = 64;
    static const uint32_t SHUTTING      = 128;
    static const uint32_t SHUT          = 256;
    static const uint32_t IGNORE_READ   = 512; // turn off only reading (eof is enabled)

    uint32_t         flags;
    Filters          _filters;
    size_t           _wq_size;
    IStreamListener* _listener;

    backend::StreamImpl* impl () const { return static_cast<backend::StreamImpl*>(BackendHandle::impl()); }

    bool reading () const { return flags & READING; }

    template <class T, class...Args>
    void invoke_sync (T filter_method, Args&&...args) {
        if (_filters.size()) (_filters.front()->*filter_method)(std::forward<Args>(args)...);
    }

    void handle_connection          (const std::error_code&) override;
    void finalize_handle_connection (const StreamSP& client, const ErrorCode&, const AcceptRequestSP&);
    void finalize_handle_connect    (const ErrorCode&, const ConnectRequestSP&);
    void notify_on_connect          (const ErrorCode&, const ConnectRequestSP&);
    void handle_read                (string&, const std::error_code&) override;
    void finalize_handle_read       (string& buf, const ErrorCode&);
    void finalize_write             (const WriteRequestSP&);
    void finalize_handle_write      (const ErrorCode&, const WriteRequestSP&);
    void notify_on_write            (const ErrorCode&, const WriteRequestSP&);
    void handle_eof                 () override;
    void finalize_handle_eof        ();
    void finalize_shutdown          (const ShutdownRequestSP&);
    void finalize_handle_shutdown   (const ErrorCode&, const ShutdownRequestSP&);
    void notify_on_shutdown         (const ErrorCode&, const ShutdownRequestSP&);

    void _reset ();
    void _clear ();

    std::error_code _read_start ();

    void _check_change_filters () {
        if (connecting() || established()) throw Error("can't change stream filters when active");
    }
};

struct StreamRequest : Request {
protected:
    friend Stream; friend StreamFilter;
    Stream* handle;

    StreamRequest () : handle() {}

    void set (Stream* h) {
        handle = h;
        Request::set(h);
    }
};
using StreamRequestSP = iptr<StreamRequest>;


struct AcceptRequest : StreamRequest, AllocatedObject<AcceptRequest> {
    AcceptRequest (Stream* h) { set(h); }

    void exec         ()                 override {}
    void cancel       (const ErrorCode&) override { handle->queue.done(this, []{}); }
    void notify       (const ErrorCode&) override {}
    void handle_event (const ErrorCode&) override {}
};


struct ConnectRequest : StreamRequest {
    CallbackDispatcher<Stream::connect_fptr> event;
    
    const TimerSP& timeout_timer () const { return _timer; }

protected:
    friend Stream;
    uint64_t timeout;

    ConnectRequest (Stream::connect_fn callback = {}, uint64_t timeout = 0) : timeout(timeout) {
        panda_log_ctor();
        if (callback) event.add(callback);
    }

    backend::ConnectRequestImpl* impl () {
        if (!_impl) _impl = handle->impl()->new_connect_request(this);
        return static_cast<backend::ConnectRequestImpl*>(_impl);
    }

    void exec         () override = 0;
    void handle_event (const ErrorCode&) override;
    void notify       (const ErrorCode&) override;
    
private:
    TimerSP _timer;
};


struct WriteRequest : StreamRequest, AllocatedObject<WriteRequest> {
    CallbackDispatcher<Stream::write_fptr> event;
    std::vector<string> bufs;

    WriteRequest () {}

    WriteRequest (const string& data) {
        bufs.push_back(data);
    }

    template <class It>
    WriteRequest (const It& begin, const It& end) {
        bufs.reserve(std::distance(begin, end));
        for (auto it = begin; it != end; ++it) bufs.push_back(*it);
    }

    template <class Range, typename = decltype(*std::declval<Range>().begin())>
    WriteRequest (const Range& range) {
        bufs.reserve(range.size());
        for (auto iter = range.begin(); iter != range.end(); ++iter) bufs.push_back(*iter);
    }

private:
    friend Stream;

    backend::WriteRequestImpl* impl () {
        if (!_impl) _impl = handle->impl()->new_write_request(this);
        return static_cast<backend::WriteRequestImpl*>(_impl);
    }

    void exec         () override;
    void handle_event (const ErrorCode&) override;
    void notify       (const ErrorCode&) override;
};


struct ShutdownRequest : StreamRequest {
    CallbackDispatcher<Stream::shutdown_fptr> event;

    ShutdownRequest (Stream::shutdown_fn callback = {}, uint64_t timeout = 0) : timeout(timeout) {
        panda_log_ctor();
        if (callback) event.add(callback);
    }

private:
    friend Stream;

    uint64_t timeout;
    TimerSP  timer;
    bool     timed_out = false;

    backend::ShutdownRequestImpl* impl () {
        if (!_impl) _impl = handle->impl()->new_shutdown_request(this);
        return static_cast<backend::ShutdownRequestImpl*>(_impl);
    }

    void exec         () override;
    void handle_event (const ErrorCode&) override;
    void notify       (const ErrorCode&) override;
    void cancel       (const ErrorCode& err = make_error_code(std::errc::operation_canceled)) override;
};

struct RunInOrderRequest : StreamRequest {
    Stream::run_fn code;

    template <class T>
    RunInOrderRequest (T&& _code) {
        code = std::forward<T>(_code);
    }

    void exec         () override;
    void handle_event (const ErrorCode&) override;
    void notify       (const ErrorCode&) override;
};

inline WriteRequestSP Stream::write (const string& data, write_fn callback) {
    WriteRequestSP req = new WriteRequest(data);
    if (callback) req->event.add(callback);
    write(req);
    return req;
}

template <class It>
inline WriteRequestSP Stream::write (const It& begin, const It& end, write_fn callback) {
    WriteRequestSP req = new WriteRequest(begin, end);
    if (callback) req->event.add(callback);
    write(req);
    return req;
}

template <class Range, typename>
inline WriteRequestSP Stream::write (const Range& range, write_fn callback) {
    WriteRequestSP req = new WriteRequest(range);
    if (callback) req->event.add(callback);
    write(req);
    return req;
}

inline void Stream::shutdown (shutdown_fn callback)                   { shutdown(new ShutdownRequest(callback)); }
inline void Stream::shutdown (uint64_t timeout, shutdown_fn callback) { shutdown(new ShutdownRequest(callback, timeout)); }

template <class T>
inline void Stream::run_in_order (T&& code) {
    if (!queue.size()) {
        auto param = StreamSP(this);
        code(param);
        return;
    }
    RunInOrderRequestSP req = new RunInOrderRequest(std::forward<T>(code));
    req->set(this);
    queue.push(req);
}

}}