#include "panda/unievent/http/Server.h"
#include <panda/log.h>
#include <panda/unievent/test/AsyncTest.h>
#include <panda/unievent/websocket/Client.h>
#include <panda/unievent/websocket/Server.h>
#include <thread>
#include <openssl/ssl.h>
#include <catch2/catch_test_macros.hpp>

#define panda_log_module panda::unievent::websocket::panda_log_module

using namespace panda::unievent::websocket;
using Location = panda::unievent::http::Server::Location;
using panda::unievent::test::AsyncTest;
using panda::unievent::LoopSP;
using panda::unievent::StreamSP;
using panda::string;

struct Pair {
    ServerSP server;
    ClientSP client;
};

using panda::unievent::SslContext;
static SslContext get_server_context(string ca_name) {
    auto ctx = SSL_CTX_new(SSLv23_server_method());

    auto r = SslContext::attach(ctx);

    string path("tests/cert");
    string cert = path + "/" + ca_name + ".pem";
    string key = path + "/" + ca_name + ".key";
    int err;

    err = SSL_CTX_use_certificate_file(ctx, cert.c_str(), SSL_FILETYPE_PEM);
    assert(err);

    err = SSL_CTX_use_PrivateKey_file(ctx, key.c_str(), SSL_FILETYPE_PEM);
    assert(err);

    err = SSL_CTX_check_private_key(ctx);
    assert(err);

    err = SSL_CTX_load_verify_locations(ctx, cert.c_str(), nullptr);
    assert(err);

    SSL_CTX_set_verify(ctx, SSL_VERIFY_PEER | SSL_VERIFY_FAIL_IF_NO_PEER_CERT, nullptr);
    SSL_CTX_set_verify_depth(ctx, 4);
    return r;
}

static ServerSP make_server (LoopSP loop, uint16_t& port, bool secure = false) {
    ServerSP server = new Server(loop);
    Server::Config conf;
    Location loc;
    loc.host = "127.0.0.1";
    if (secure) {
        loc.ssl_ctx = get_server_context("ca");
    }
    conf.locations.push_back(loc);
    server->configure(conf);
    server->run();
    port = server->sockaddr()->port();
    return server;
}

static ClientConnectRequestSP make_connect(uint16_t port) {
    ClientConnectRequestSP req = new ClientConnectRequest();
    req->uri = new URI();
    req->uri->host("127.0.0.1");
    req->uri->scheme("ws");
    req->uri->port(port);
    return req;
}

static Pair make_pair (LoopSP loop) {
    uint16_t port;
    ServerSP server = make_server(loop, port);
    ClientSP client = new Client(loop);
    client->connect(make_connect(port));
    return {server, client};
}

TEST_CASE("send ranges compiles", "[uews]") {
    if (false) {
        ServerConnectionSP conn;

        string msgs[] = {"1", "2", "3"};
        auto rr = msgs | ::ranges::view::transform([](string& s) -> string& {
            return s;
        });
        conn->send_message(begin(rr), end(rr));
    }
}

TEST_CASE("on_read after close", "[uews]") {
    AsyncTest test(1000, {"connect", "close"});
    {
        auto p = make_pair(test.loop);

        ServerConnectionSP sconn;
        p.server->connection_event.add([&](auto, auto conn, auto) {
            sconn = conn;
            string msg = "123";
            conn->send_message(msg);
        });
        test.await(p.client->message_event, "connect");

        sconn->stream()->write(string(120 * 1024, '1'));

        size_t rcount = 0;
        p.client->stream()->read_event.add([&](auto client, auto&, auto& err){
            if (err) WARN(err);
            REQUIRE_FALSE(err);
            rcount++;
            client->shutdown();
            client->disconnect();
        });

        test.await(sconn->stream()->eof_event, "close");
        REQUIRE(rcount == 1);
    }

    test.run();
}

TEST_CASE("destroying server&client in callbacks", "[uews]") {
    AsyncTest test(1000, 1);
    {
        auto p = make_pair(test.loop);
        auto srm = [&](auto...) {
            test.happens();
            CHECK(true);
            p.server->stop();
            p.server = nullptr;
        };
        auto crm = [&](auto...){
            test.happens();
            CHECK(true);
            p.client = nullptr;
            p.server->stop_listening();
        };

        SECTION("server - handshake_callback")  { p.server->handshake_callback = srm; }
        SECTION("server - connection_event")    { p.server->connection_event.add(srm); }
        SECTION("server - disconnection_event") { p.server->disconnection_event.add(srm); }

        p.server->connection_event.add([&](auto, auto conn, auto) {
            panda_log_info("s-conn");
            SECTION("server - message_event")    { conn->message_event.add(srm); }
            SECTION("server - close_event")      { conn->close_event.add(srm); }
            SECTION("server - peer_close_event") { conn->peer_close_event.add(srm); }
            SECTION("server - ping_event")       { conn->ping_event.add(srm); }
            SECTION("server - pong_event")       { conn->pong_event.add(srm); }

            conn->message_event.add([&](auto conn, auto) {
                conn->send_ping();
            });
            conn->ping_event.add([&](auto conn, auto) {
                string omsg = "epta";
                conn->send_message(omsg);
            });

            conn->error_event.add([&](auto, auto& err) { panda_log_info("s-err: " << err); });
            conn->message_event.add([&](auto...)       { panda_log_info("s-message"); });
            conn->close_event.add([&](auto...)         { panda_log_info("s-close"); });
            conn->peer_close_event.add([&](auto...)    { panda_log_info("s-peer-close"); });
            conn->ping_event.add([&](auto...)          { panda_log_info("s-ping"); });
            conn->pong_event.add([&](auto...)          { panda_log_info("s-pong"); });
        });

        bool ce = true;
        SECTION("client - connect_event") { p.client->connect_event.add(crm); ce = false; }

        if (ce) p.client->connect_event.add([&](auto client, auto res) {
            panda_log_info("c-conn");
            SECTION("client - message_event")    { client->message_event.add(crm); }
            SECTION("client - close_event")      { client->close_event.add(crm); }
            SECTION("client - ping_event")       { client->ping_event.add(crm); }
            SECTION("client - pong_event")       { client->pong_event.add(crm); }

            if (res->error()) return;

            string omsg = "nah";
            client->send_message(omsg);

            client->ping_event.add([&](auto client, auto) {
                client->send_ping();
            });
            client->message_event.add([&](auto client, auto) {
                client->close();
            });

            client->error_event.add([&](auto, auto& err) { panda_log_info("c-err: " << err); });
            client->message_event.add([&](auto...)       { panda_log_info("c-message"); });
            client->close_event.add([&](auto...)         { panda_log_info("c-close"); });
            client->peer_close_event.add([&](auto...)    { panda_log_info("c-peer-close"); });
            client->ping_event.add([&](auto...)          { panda_log_info("c-ping"); });
            client->pong_event.add([&](auto...)          { panda_log_info("c-pong"); });
        });

        test.run();
    }

    test.run(); // hang check
}

TEST_CASE("destroying server in error callback", "[uews]") {
    AsyncTest test(1000, 1);
    auto p = make_pair(test.loop);

    p.server->connection_event.add([&](auto, auto conn, auto) {
        conn->error_event.add([&](auto, auto&){
            test.happens();
            CHECK(true);
            //WARN("err = " << err.what());
            p.server->stop();
            p.server = nullptr;
        });
    });

    p.client->connect_event.add([&](auto client, auto) {
        client->stream()->write("fuck you bitch");
    });

    test.run();
    test.run(); // hang check
}

TEST_CASE("destroying client in error callback", "[uews]") {
    AsyncTest test(1000, 1);
    auto p = make_pair(test.loop);

    p.server->connection_event.add([&](auto, auto conn, auto) {
        conn->stream()->write("f1uck you dudefuck you dudefuck you dudefuck you dudefuck you dudefuck you dudefuck you dudefuck you");
    });

    p.client->error_event.add([&](auto, auto&){
        test.happens();
        CHECK(true);
        //WARN("err = " << err.what());
        p.server->stop_listening();
        p.client = nullptr;
    });

    test.run();
    test.run(); // hang check
}

TEST_CASE("cleanup on success", "[uews]") {
    AsyncTest test(1000, {"connect", "srecv", "crecv", "disconn"});
    {
        auto p = make_pair(test.loop);
        p.server->connection_event.add([&](auto, auto conn, auto) {
            conn->message_event.add([&](auto conn, auto msg) {
                test.happens("srecv");
                CHECK(msg->payload[0] == "nah");
                string omsg = "epta";
                conn->send_message(omsg);
            });
            conn->peer_close_event.add([&](auto...) {
                test.happens("disconn");
                SECTION("stop everything") {
                    p.server->stop();
                }
                SECTION("stop listening") {
                    p.server->stop_listening();
                }
                SECTION("stop loop") {
                    test.loop->stop();
                }
            });
        });

        p.client->connect_event.add([&](auto client, auto res) {
            CHECK(!res->error());
            test.happens("connect");
            string omsg = "nah";
            client->send_message(omsg);

            client->message_event.add([&](auto client, auto msg) {
                test.happens("crecv");
                CHECK(msg->payload[0] == "epta");
                client->close();
            });
        });

        test.run();
    }

    test.run(); // everything should be destroyed and not holding the loop
}

TEST_CASE("connect and close", "[uews]") {
    AsyncTest test(1000, 0);
    auto p = make_pair(test.loop);
    p.client->close();
    p.server = nullptr;
    test.run();
    SUCCEED("ok");
}

TEST_CASE("connect timeout", "[uews]") {
    AsyncTest test(1000);
    using panda::net::SockAddr;
    using panda::uri::URI;

    panda::iptr<panda::Refcnt> server;
    panda::unievent::StreamSP sconn;
    bool error = true;
    const uint64_t TIME = 5;

    SockAddr addr;
    ClientSP client = new Client(test.loop);

    SECTION("blackhole") {
        addr = test.get_blackhole_addr();
    }
    SECTION("no ws") {
        panda::unievent::TcpSP tserver = new panda::unievent::Tcp(test.loop);
        tserver->bind("127.0.0.1", 0);
        tserver->listen(1);
        tserver->connection_event.add([&](auto, auto conn, auto) {
            sconn = conn;
        });
        addr = tserver->sockaddr().value();
        server = tserver;
    }
    SECTION("real") {
        error = false;
        uint16_t port = 0;
        auto wsserver = make_server(test.loop, port);
        addr = SockAddr::Inet4("127.0.0.1", port);
        test.expected.push_back("cb");
        client->connect_event.add([&](auto, auto) {
            test.happens("cb"); // to check it called once. Checking error is wrong in cases of slow connect or under stress
        });
        server = wsserver;
    }


    ClientConnectRequestSP req = new ClientConnectRequest();
    req->uri = new URI;
    req->uri->host(addr.ip());
    req->uri->port(addr.port());
    req->uri->scheme(panda::unievent::websocket::ws_scheme(false));
    req->connect_timeout = TIME;

    client->connect(req);

    test.expected.push_back("connect");
    auto tup = test.await(client->connect_event, "connect");
    if (error) {
        REQUIRE(std::get<1>(tup)->error().contains(make_error_code(std::errc::timed_out)));
    } else {
        test.wait(TIME + 1);
    }
}

TEST_CASE("last ref in connect timeout", "[errors]") {
    AsyncTest test(1000, {"timeout"});

    StreamSP sconn;
    panda::unievent::TcpSP tserver = new panda::unievent::Tcp(test.loop);
    tserver->bind("127.0.0.1", 0);
    tserver->listen(1);
    tserver->connection_event.add([&](auto, auto conn, auto) {
        sconn = conn;
    });
    auto addr = tserver->sockaddr().value();

    ClientSP client = new Client(test.loop);

    ClientConnectRequestSP req = new ClientConnectRequest();
    req->uri = new panda::uri::URI;
    req->uri->host(addr.ip());
    req->uri->port(addr.port());
    req->uri->scheme(panda::unievent::websocket::ws_scheme(false));
    req->connect_timeout = 10;
    client->connect(req);

    client->connect_event.add([&](const auto&...) {
        client.reset();
        test.happens("timeout");
        test.loop->stop();
    });

    test.run();
}

TEST_CASE("simple connect", "[errors]") {
    AsyncTest test(1000, {"connected"});

    uint16_t port = 0;
    auto server = make_server(test.loop, port);

    ClientSP client = new Client(test.loop);
    client->connect("127.0.0.1/path/abc", false, port);

    test.await(client->connect_event, "connected");
}

TEST_CASE("no close frame on_eof", "[errors]") {
    AsyncTest test(1000, {"connected", "eof", "close"});

    uint16_t port = 0;
    auto server = make_server(test.loop, port);
    ClientSP client = new Client(test.loop);
    client->connect("127.0.0.1", false, port);
    test.await(client->connect_event, "connected");

    client->stream()->shutdown(); // emulates broken connection

    client->close_event.add([&](auto...) {
        test.happens("close");
    });

    test.await(client->stream()->eof_event, "eof"); // close should be emulated with 1006 after eof received, if server answer it happens before
}

TEST_CASE("shutdown timeout", "[uews]") {
    AsyncTest test(1000, {"shutdown"});
    auto p = make_pair(test.loop);

    test.await(p.client->connect_event);
    Connection::Config conf;
    conf.shutdown_timeout = 1;
    p.client->configure(conf);

    auto stream = p.client->stream().get(); // stream via stream() getter will no longer be accessible after close()
    p.client->close();
    std::this_thread::sleep_for(std::chrono::milliseconds(5));
    {
        auto tup = test.await(stream->shutdown_event, "shutdown");
        REQUIRE(std::get<1>(tup) & std::errc::timed_out);
    }
    REQUIRE(p.client->refcnt() == 1);
}

TEST_CASE("no ssl in client", "[uews]") {
    AsyncTest test(1000, {"conn"});

    uint16_t port;
    ServerSP server = make_server(test.loop, port, true);
    ClientSP client = new Client(test.loop);
    client->connect(make_connect(port));

    auto ret = test.await(client->connect_event, "conn");
    auto response = std::get<1>(ret);
    REQUIRE(response->error());
}


TEST_CASE("connect on close", "[uews]") {
    AsyncTest test(1000, {"sconn", "conn", "conn"});

    uint16_t port;
    ServerSP server = make_server(test.loop, port);
    ClientSP client = new Client(test.loop);
    client->connect(make_connect(port));

    auto ret = test.await(server->connection_event, "sconn");
    test.await(client->connect_event, "conn");
    auto sconn = std::get<1>(ret);
    sconn->close();

    client->close_event.add([&](auto...) {
        client->close();
        client->connect(make_connect(port));
    });

    ret = test.await(server->connection_event, "conn");
}