#pragma once
#include <panda/log.h>
#include <panda/unievent/Stream.h>
#include <panda/protocol/websocket/Parser.h>
#include <panda/error.h>

#include "Error.h"
#include "panda/net/sockaddr.h"
#include "panda/protocol/websocket/utils.h"

namespace panda { namespace unievent { namespace websocket {

// Short names for the protocol-level websocket types used throughout this header.
using Opcode    = protocol::websocket::Opcode;
using Message   = protocol::websocket::Message;
using MessageSP = protocol::websocket::MessageSP;
namespace CloseCode = protocol::websocket::CloseCode;

// Forward declaration: Builder (below) refers to Connection before it is defined.
struct Connection;
using ConnectionSP = iptr<Connection>;

// Fluent helper for composing and sending one outgoing message on a Connection.
//
// The protocol-level MessageBuilder is inherited privately; only the pieces
// re-exported below are reachable by users. Instances can only be created by
// Connection (private constructor + friend), see Connection::message().
struct Builder : private protocol::websocket::MessageBuilder {
    // Re-expose the base-class overloads of opcode()/deflate() alongside the
    // chaining setters declared below.
    using MessageBuilder::opcode;
    using MessageBuilder::deflate;

    // Completion callback: receives the connection, the error code and the
    // write request.
    using send_fn = function<void(const ConnectionSP&, const ErrorCode&, const WriteRequestSP&)>;

    // Move-only in practice: declaring the move constructor suppresses the implicit copy operations.
    Builder (Builder&&);

    // Sends a single payload. Defined out of line (not visible in this header).
    // NOTE(review): presumably follows the same "not connected" contract as the
    // iterator overloads below - confirm against the implementation.
    WriteRequestSP send (string_view payload, const send_fn& callback = {});

    // Chaining setters: forward to MessageBuilder and return *this.
    Builder& opcode  (Opcode value) { MessageBuilder::opcode(value); return *this; }
    Builder& deflate (bool value)   { MessageBuilder::deflate(value); return *this; }

    // Sends the range [begin, end); enabled when dereferencing B yields something
    // convertible to string_view. Returns nullptr (after invoking the callback with
    // errc::WRITE_ERROR) when the connection is not connected.
    template <class B, class E, class T = decltype(*std::declval<B>()), class = std::enable_if_t<std::is_convertible<T, string_view>::value>>
    WriteRequestSP send (B&& begin, E&& end, const send_fn& callback = {});

    // Same constraint and "not connected" contract as send(begin, end), but the
    // data is produced by MessageBuilder::send_multiframe.
    template <class B, class E, class T = decltype(*std::declval<B>()), class = std::enable_if_t<std::is_convertible<T, string_view>::value>>
    WriteRequestSP send_multiframe (B&& begin, E&& end, const send_fn& callback = {});

    // Overload for ranges of containers: enabled when the elements of the
    // dereferenced item (via its begin()) are convertible to string_view.
    template <class B, class E, class T = decltype(*((*std::declval<B>()).begin()))>
    std::enable_if_t<std::is_convertible<T, string_view>::value, WriteRequestSP>
    send_multiframe (B&& begin, E&& end, const send_fn& callback = {});

protected:
    friend struct Connection;
    // Connection this builder sends through; held by reference, so the builder
    // must not outlive it.
    Connection& _connection;

private:
    // Only Connection (a friend) can construct a Builder.
    Builder (Connection& connection);
};

// Abstract base of a websocket connection that sits on top of a unievent Stream.
//
// It exposes the user-facing event dispatchers, the convenience senders and the
// close() entry points. The class is abstract (pure virtual destructor);
// init() and set_stream() are protected, so wiring the parser, loop and stream
// is the job of a derived class.
struct Connection : Refcnt, protected IStreamSelfListener {
    using Parser = protocol::websocket::Parser;
    using message_fptr    = void(const ConnectionSP&, const MessageSP&);
    using message_fn      = function<message_fptr>;
    using error_fptr      = void(const ConnectionSP&, const ErrorCode&);
    using error_fn        = function<error_fptr>;
    using close_fptr      = void(const ConnectionSP&, uint16_t, const string&);
    using close_fn        = function<close_fptr>;
    using peer_close_fptr = void(const ConnectionSP&, const MessageSP&);
    using peer_close_fn   = function<peer_close_fptr>;
    using ping_fptr       = void(const ConnectionSP&, const MessageSP&);
    using ping_fn         = function<ping_fptr>;
    using pong_fptr       = ping_fptr;
    using pong_fn         = function<pong_fptr>;
    using send_fn         = Builder::send_fn;

    // Parser configuration extended with connection-level settings.
    struct Config : Parser::Config {
        uint64_t shutdown_timeout = 5000; //ms
    };

    enum class State { INITIAL, TCP_CONNECTING, CONNECTING, CONNECTED, HALT };

    // Event dispatchers users subscribe to.
    CallbackDispatcher<message_fptr>    message_event;
    CallbackDispatcher<error_fptr>      error_event;
    CallbackDispatcher<close_fptr>      close_event;
    CallbackDispatcher<peer_close_fptr> peer_close_event;
    CallbackDispatcher<ping_fptr>       ping_event;
    CallbackDispatcher<pong_fptr>       pong_event;

    // Starts in State::INITIAL with no stream, no loop and no parser attached.
    Connection () : _state(State::INITIAL), _error_state() {}

    void configure (const Config& conf);

    const LoopSP& loop() const { return _loop; }

    const StreamSP& stream () const { return _stream; }

    State state () const { return _state; }

    bool connected () const { return _state == State::CONNECTED; }
    // True while either the TCP connect or the subsequent CONNECTING phase is in progress.
    bool connecting () const { return _state == State::CONNECTING || _state == State::TCP_CONNECTING; }

    // Local / peer address of the underlying stream; a default-constructed
    // SockAddr is returned when no stream is attached.
    excepted<net::SockAddr, ErrorCode> sockaddr () const { return _stream ? _stream->sockaddr() : net::SockAddr{}; }
    excepted<net::SockAddr, ErrorCode> peeraddr () const { return _stream ? _stream->peeraddr() : net::SockAddr{}; }

    // Creates a Builder bound to this connection.
    Builder message () { return Builder(*this); }

    // Sends `payload` with Opcode::BINARY.
    void send_message (string_view payload, const send_fn& callback = {}) {
        message().opcode(Opcode::BINARY).send(payload, callback);
    }

    // Sends the range [begin, end) with Opcode::BINARY; enabled when *begin is
    // convertible to string_view.
    template <class B, class E, class T = decltype(*std::declval<B>()), class = std::enable_if_t<std::is_convertible<T, string_view>::value>>
    void send_message (B&& begin, E&& end, const send_fn& callback = {}) {
        message().opcode(Opcode::BINARY).send(std::forward<B>(begin), std::forward<E>(end), callback);
    }

    // Sends `payload` with Opcode::TEXT.
    void send_text (string_view payload, const send_fn& callback = {}) {
        message().opcode(Opcode::TEXT).send(payload, callback);
    }

    // Sends the range [begin, end) with Opcode::TEXT; enabled when *begin is
    // convertible to string_view.
    template <class B, class E, class T = decltype(*std::declval<B>()), class = std::enable_if_t<std::is_convertible<T, string_view>::value>>
    void send_text (B&& begin, E&& end, const send_fn& callback = {}) {
        message().opcode(Opcode::TEXT).send(std::forward<B>(begin), std::forward<E>(end), callback);
    }

    void send_ping (string_view payload = {});
    void send_pong (string_view payload = {});

    // Closes with `code`, using protocol::websocket::close_message(code) as the payload.
    void close (uint16_t code = uint16_t(CloseCode::DONE)) {
        close(code, protocol::websocket::close_message(code));
    }

    // Closes with an explicit payload. Does nothing while still in State::INITIAL.
    void close (uint16_t code, const string& payload) {
        if (_state == State::INITIAL) return;
        do_close(code, payload);
    }

    // Plain message/byte counters. NOTE(review): the code that increments them
    // is not in this header.
    struct Statistics : Refcnt {
        size_t msgs_in   = 0;
        size_t msgs_out  = 0;
        size_t bytes_in  = 0;
        size_t bytes_out = 0;

        Statistics() = default;
        Statistics(size_t msgs_in, size_t msgs_out, size_t bytes_in, size_t bytes_out)
            : msgs_in(msgs_in), msgs_out(msgs_out), bytes_in(bytes_in), bytes_out(bytes_out)
        {}

        // Zeroes all four counters.
        void reset() {
            msgs_in = msgs_out = bytes_in = bytes_out = 0;
        }

        // Returns the counters divided by `secs` (truncated back to size_t).
        // Precondition: secs > 0. Otherwise the quotient is not representable
        // as size_t and the conversion is undefined.
        Statistics normalized(double secs) const {
            return Statistics(
                static_cast<size_t>(msgs_in   / secs),
                static_cast<size_t>(msgs_out  / secs),
                static_cast<size_t>(bytes_in  / secs),
                static_cast<size_t>(bytes_out / secs)
            );
        }

        // Returns normalized(secs) and then resets the counters.
        Statistics fetch(double secs) {
            auto ret = normalized(secs);
            reset();
            return ret;
        }
    };
    using StatisticsSP = iptr<Statistics>;

    // Setter / getter for the statistics object attached to this connection.
    void stats_counter(const StatisticsSP& val) {stats = val;}
    StatisticsSP stats_counter() {return stats;}

    // Asks the parser whether a message with this opcode and length should be
    // deflated. Returns false when no parser has been attached yet (init() not
    // called) instead of dereferencing a null pointer.
    bool should_deflate(Opcode opcode, size_t payload_length) const {
        return parser && parser->should_deflate(opcode, payload_length);
    }

protected:
    State    _state;
    uint64_t shutdown_timeout = Config().shutdown_timeout;

    // Replaces the underlying stream: the previous stream (if any) stops
    // reporting to this object and the new one (if any) starts. Passing a null
    // stream just detaches.
    void set_stream (const StreamSP& s) {
        if (_stream) _stream->event_listener(nullptr);
        _stream = s;
        if (_stream) _stream->event_listener(this);
    }

    // Attaches the parser and the loop. The parser is stored as a raw pointer
    // and is not owned, so it must outlive this connection.
    void init (Parser& parser, const LoopSP& loop) {
        this->parser = &parser;
        this->_loop  = loop;
    }

    // Overridable hooks.
    virtual void on_message    (const MessageSP&);
    virtual void on_error      (const ErrorCode&);
    virtual void on_peer_close (const MessageSP&);
    virtual void on_ping       (const MessageSP&);
    virtual void on_pong       (const MessageSP&);
    virtual void on_close      (uint16_t code, const string& payload);

    virtual void do_close (uint16_t code, const string& payload);

    // IStreamSelfListener callbacks coming from the underlying stream.
    void on_read  (string&, const ErrorCode&) override;
    void on_eof   () override;
    void on_write (const ErrorCode&, const WriteRequestSP&) override;
    void on_shutdown (const ErrorCode&, const ShutdownRequestSP&) override;

    void process_error (const ErrorCode& err, uint16_t code = CloseCode::ABNORMALLY);

    // Pure virtual: Connection itself cannot be instantiated.
    virtual ~Connection () = 0;

private:
    friend struct Builder;
    LoopSP       _loop;
    StreamSP     _stream;
    Parser*      parser = nullptr; // set by init(); not owned
    StatisticsSP stats;
    bool         _error_state;

    // Adapts a user send_fn to Stream::write_fn: the stream argument is dropped
    // and this connection is passed instead.
    // NOTE(review): `this` is captured as a raw pointer, so the connection has to
    // stay alive until the write request completes - confirm this is guaranteed.
    Stream::write_fn wrap_send_fn (const send_fn& callback) {
        return [this, callback](auto&, auto& err, auto& wreq) { callback(this, err, wreq); };
    }

    void process_peer_close (const MessageSP&);
};

// Builds the range [begin, end) via MessageBuilder::send and queues it on the
// connection's stream. If the connection is not connected nothing is written:
// the callback (when set) is invoked with errc::WRITE_ERROR and a fresh, unsent
// WriteRequest, and nullptr is returned.
template <class B, class E, class, class>
WriteRequestSP Builder::send (B&& begin, E&& end, const send_fn& callback) {
    if (_connection.connected()) {
        WriteRequestSP request = new WriteRequest(MessageBuilder::send(std::forward<B>(begin), std::forward<E>(end)));
        // Completion is reported through the request's own event.
        if (callback) request->event.add(_connection.wrap_send_fn(callback));
        _connection.stream()->write(request);
        return request;
    }

    // Not connected: report the failure right away.
    if (callback) {
        WriteRequestSP unsent = new WriteRequest();
        callback(&_connection, errc::WRITE_ERROR, unsent);
    }
    return nullptr;
}

// Overload for ranges whose items are convertible to string_view. The data is
// produced by MessageBuilder::send_multiframe and queued on the connection's
// stream as one write request. If the connection is not connected nothing is
// written: the callback (when set) gets errc::WRITE_ERROR with a fresh, unsent
// WriteRequest, and nullptr is returned.
template <class B, class E, class, class>
WriteRequestSP Builder::send_multiframe (B&& begin, E&& end, const send_fn& callback) {
    if (_connection.connected()) {
        auto frames = MessageBuilder::send_multiframe(std::forward<B>(begin), std::forward<E>(end));
        WriteRequestSP request = new WriteRequest(frames.begin(), frames.end());
        // Completion is reported through the request's own event.
        if (callback) request->event.add(_connection.wrap_send_fn(callback));
        _connection.stream()->write(request);
        return request;
    }

    // Not connected: report the failure right away.
    if (callback) {
        WriteRequestSP unsent = new WriteRequest();
        callback(&_connection, errc::WRITE_ERROR, unsent);
    }
    return nullptr;
}

template <class B, class E, class T>
std::enable_if_t<std::is_convertible<T, string_view>::value, WriteRequestSP>
Builder::send_multiframe (B&& begin, E&& end, const send_fn& callback) {
    if (!_connection.connected()) {
        if (callback) callback(&_connection, errc::WRITE_ERROR, new unievent::WriteRequest());
        return nullptr;
    }
    auto vdata = MessageBuilder::send_multiframe(std::forward<B>(begin), std::forward<E>(end));
    WriteRequestSP req = new WriteRequest(vdata.begin(), vdata.end());
    if (callback) req->event.add(_connection.wrap_send_fn(callback));
    _connection.stream()->write(req);
    return req;
}

// Stream-output operators for Connection::Config and Connection::Statistics.
// Only declared here; the definitions (and the exact output format) live elsewhere.
std::ostream& operator<< (std::ostream& stream, const Connection::Config& conf);
std::ostream& operator<< (std::ostream& stream, const Connection::Statistics& conf);

}}}