#define PERL_NO_GET_CONTEXT
#include "EXTERN.h"
#include "perl.h"
#include "XSUB.h"

#include "ppport.h"

#include <libwebsockets.h>

#include <poll.h>
#include <unistd.h>

#include <arpa/inet.h>

#define DEBUG 0

#include "xshelper/xshelper.h"

#include "nlws.h"
#include "nlws_frame.h"
#include "nlws_courier.h"
#include "nlws_perl_loop.h"
#include "nlws_context.h"
#include "nlws_logger.h"

// Warns that the given object SV is being destroyed during Perl's
// global destruction phase.
//
// The expansion has no trailing semicolon of its own, so that
// `WARN_DESTROY_AT_DESTRUCT(sv);` is exactly one statement and stays
// safe inside an unbraced if/else.
#define WARN_DESTROY_AT_DESTRUCT(sv) \
    warn("Destroying %" SVf " at global destruction!\n", (sv))

// LOG_FUNC prints the name of the enclosing function to stderr when
// DEBUG is nonzero; otherwise it expands to nothing.
#if DEBUG
//#include <execinfo.h>
#define LOG_FUNC fprintf(stderr, "%s\n", __func__)
#else
#define LOG_FUNC
#endif

// NLWS_LWS_HAS_EXTENSIONS is false when LWS_WITHOUT_EXTENSIONS is defined
// (presumably by the libwebsockets headers) and true otherwise.
#if defined(LWS_WITHOUT_EXTENSIONS)
#   define NLWS_LWS_HAS_EXTENSIONS false
#else
#   define NLWS_LWS_HAS_EXTENSIONS true
#endif

// _LWS_HAS_PMD is 1 when NLWS_LWS_HAS_PMD is defined (presumably by the
// build configuration) and 0 otherwise. It gates the permessage-deflate
// setup and is exported to Perl as HAS_PMD.
#ifdef NLWS_LWS_HAS_PMD
#define _LWS_HAS_PMD 1
#else
#define _LWS_HAS_PMD 0
#endif

// True when a WebSocket close code should be reported as a failure,
// i.e. it is neither "no status" nor "normal closure".
// The argument is parenthesized so that compound expressions (e.g. a
// ternary) are compared as a whole.
#define WS_CLOSE_IS_FAILURE(code) \
    (((code) != LWS_CLOSE_STATUS_NOSTATUS) && ((code) != LWS_CLOSE_STATUS_NORMAL))

// State behind a Net::Libwebsockets::WebSocket::Pause object.
typedef struct {
    // Reference to the paused courier; released in Pause's DESTROY.
    SV* courier_sv;
} pause_t;

// Settles a deferred exactly once by asking the loop object to run
// `schedule_destroy_and_finish(deferred, methname, payload)`.
//
// loop_sv      - Perl loop object on which the method is called.
// deferred_svp - slot holding our reference to the deferred; it is set
//                to NULL here to mark the deferred as finished.
// methname     - "resolve" or "reject" (passed through verbatim).
// payload      - value for the deferred. Callers hand over a freshly
//                created SV; this function is responsible for it.
//
// Croaks if the slot is already NULL (i.e. already finished).
static inline void _finish_deferred_sv (pTHX_ SV* loop_sv, SV** deferred_svp, const char* methname, SV* payload) {
    if (DEBUG) warn("finishing deferred (payload=%p)\n", payload);

    SV* deferred_sv = *deferred_svp;

    if (!deferred_sv) {
        // The payload was handed to us; don’t leak it when we bail out.
        SvREFCNT_dec(payload);

        croak("Can’t %s(); already finished!", methname);
    }

    // The deferred’s callbacks might execute synchronously and also
    // might depend on the referent pointer being NULL as an indicator
    // that the deferred was already finished.
    //
    *deferred_svp = NULL;

    // NULL-terminated argument list. The helper presumably takes
    // ownership of each element (TODO confirm in xshelper).
    SV* deferred_args[] = {
        newSVsv(deferred_sv),
        newSVpv(methname, 0),
        payload,
        NULL,
    };

    // The copy above keeps the referent alive; drop our own reference.
    SvREFCNT_dec(deferred_sv);

    xsh_call_object_method_void( aTHX_
        loop_sv,
        "schedule_destroy_and_finish",
        deferred_args
    );
}

// Builds an error object by calling
// Net::Libwebsockets::X->create(type, @args).
//
// type      - error type name passed as the first argument to create().
// argscount - number of SV* values that follow in the variadic list.
// ...       - exactly `argscount` SV* arguments, forwarded in order.
//
// Returns whatever the `create` method returns.
SV* _create_err_obj (pTHX_ const char* type, unsigned argscount, ...) {
    // Layout: [ type, arg 1 .. arg N, NULL ]
    SV* ctor_args[argscount + 2];

    va_list ap;
    va_start(ap, argscount);

    ctor_args[0] = newSVpv(type, 0);

    unsigned slot = 1;
    while (slot <= argscount) {
        ctor_args[slot] = va_arg(ap, SV*);
        slot++;
    }

    ctor_args[slot] = NULL;

    va_end(ap);

    SV* class_name = newSVpvs("Net::Libwebsockets::X");

    // load_module() gets its own copy of the name; ours is mortalized
    // below and used as the invocant.
    load_module(PERL_LOADMOD_NOIMPORT, newSVsv(class_name), NULL);

    return xsh_call_object_method_scalar( aTHX_
        sv_2mortal(class_name),
        "create",
        ctor_args
    );
}

// Settles the connection’s “done” deferred after the WebSocket closed.
//
// If `code` counts as a failure (see WS_CLOSE_IS_FAILURE) the deferred
// is rejected with a "WebSocketClose" error object built from the code
// and reason; otherwise it is resolved with an array reference
// [ code, reason ].
//
// code      - close status code.
// reasonlen - length in bytes of `reason`.
// reason    - close reason bytes; the resulting SV is flagged as UTF-8.
void _on_ws_close (pTHX_ my_perl_context_t* my_perl_context, nlws_abstract_loop_t* myloop_p, uint16_t code, size_t reasonlen, const U8* reason) {
    LOG_FUNC;

    const char* deferred_method;
    SV* promise_value;

    SV* code_sv = newSVuv(code);
    SV* reason_sv = newSVpvn_flags(
        (const char *) reason,
        reasonlen,
        SVf_UTF8
    );

    if (WS_CLOSE_IS_FAILURE(code)) {
        deferred_method = "reject";

        promise_value = _create_err_obj( aTHX_
            "WebSocketClose",
            2,
            code_sv,
            reason_sv
        );
    }
    else {
        deferred_method = "resolve";

        // av_push() takes over our reference to each SV, so nothing is
        // left for us to free. (av_make() would have copied them and
        // leaked the originals.)
        AV* code_reason = newAV();
        av_push(code_reason, code_sv);
        av_push(code_reason, reason_sv);

        promise_value = newRV_noinc((SV*) code_reason);
    }

    _finish_deferred_sv( aTHX_
        myloop_p->perlobj,
        &my_perl_context->done_d,
        deferred_method,
        promise_value
    );
}

// Rejects the connection’s “done” deferred with a "ConnectionFailed"
// error object whose single argument is the given reason text.
//
// reasonlen / reason - the error description and its byte length.
void _on_ws_error (pTHX_ my_perl_context_t* my_perl_context, nlws_abstract_loop_t* myloop_p, size_t reasonlen, const char* reason) {
    LOG_FUNC;

    SV* reason_sv = newSVpvn(reason, reasonlen);

    SV* failure = _create_err_obj( aTHX_ "ConnectionFailed", 1, reason_sv );

    _finish_deferred_sv( aTHX_
        myloop_p->perlobj,
        &my_perl_context->done_d,
        "reject",
        failure
    );
}

// Delivers a complete incoming message to every callback registered on
// the courier for the current message type (text or binary).
//
// msgsv - the message; this function takes over the caller’s reference.
//         The last callback receives `msgsv` itself, and every earlier
//         callback receives a copy.
void _on_ws_message(pTHX_ my_perl_context_t* my_perl_context, SV* msgsv) {
    courier_t* courier = my_perl_context->courier;

    // Because of the assert() below these initializations aren’t
    // strictly needed, but they keep non-assert builds well-defined.
    unsigned cbcount = 0;
    SV** cbs = NULL;

    switch (my_perl_context->message_type) {
        case NET_LWS_MESSAGE_TYPE_TEXT:
            cbcount = courier->on_text_count;
            cbs = courier->on_text;
            break;

        case NET_LWS_MESSAGE_TYPE_BINARY:
            cbcount = courier->on_binary_count;
            cbs = courier->on_binary;
            break;

        default:
            assert(0);
    }

    if (!cbcount) {
        // Nobody is listening, so no callback will consume the message;
        // release it here rather than leak it.
        SvREFCNT_dec(msgsv);
        return;
    }

    // NULL-terminated argument list, reused for each callback.
    SV* cbargs[] = { NULL, NULL };

    for (unsigned c=0; c<cbcount; c++) {
        cbargs[0] = (c == cbcount-1) ? msgsv : newSVsv(msgsv);
        xsh_call_sv_trap_void(cbs[c], cbargs, "Callback error: ");
    }
}

// libwebsockets protocol callback for the WebSocket client.
//
// wsi    - the LWS connection handle.
// reason - which event LWS is reporting.
// user   - our my_perl_context_t (set as `userdata` at connect time);
//          events that arrive without it are ignored.
// in/len - event-specific payload and its length.
//
// Returns 0 to continue, or -1 to have LWS close the connection.
static int
net_lws_wsclient_callback(
    struct lws *wsi,
    enum lws_callback_reasons reason,
    void *user,
    void *in,
    size_t len
) {
    my_perl_context_t* my_perl_context = user;

    if (DEBUG) fprintf(stderr, "LWS callback: %d\n", reason);

    if (!my_perl_context) {
        // It happens that we don’t need any of the callbacks where
        // this applies.

        if (DEBUG) fprintf(stderr, "--> no context given; skipping ...\n");

        return 0;
    }

    PERL_CONTEXT_FROM_STRUCT(my_perl_context);

    switch (reason) {

    case LWS_CALLBACK_WSI_DESTROY:
        // Drop the reference to the courier object that we took when
        // the connection was established.
        if (my_perl_context->courier_sv) {
            SvREFCNT_dec(my_perl_context->courier_sv);
        }

        break;

    case LWS_CALLBACK_CLIENT_APPEND_HANDSHAKE_HEADER: {
        // `in` points at the write cursor into the header buffer;
        // `len` is the space available after it.
        unsigned char **p = (unsigned char **)in;
        unsigned char *end = (*p) + len;

        // Flat list: key, value, key, value, ...
        AV* headers_av = (AV*) SvRV(my_perl_context->headers_ar);

        int headers_len = 1 + av_top_index(headers_av);

        int failed = 0;

        for (int h=0; h<headers_len; h += 2) {
            SV** key = av_fetch(headers_av, h, 0);
            assert(key);

            SV** value = av_fetch(headers_av, 1 + h, 0);
            assert(value);

            STRLEN valuelen;
            const U8* valuestr = (const U8*) SvPVbyte( *value, valuelen );

            // Assign to the flag declared outside the loop. Declaring a
            // new variable here would shadow it, and the failure check
            // after the loop would then never fire.
            failed = lws_add_http_header_by_name(
                wsi,
                (const U8*) xsh_sv_to_str(*key),
                valuestr,
                valuelen,
                p, end
            );

            if (failed) break;
        }

        SvREFCNT_dec(my_perl_context->headers_ar);

        if (failed) return -1;

        } break;

    case LWS_CALLBACK_CLIENT_ESTABLISHED: {
        courier_t* courier = nlws_create_courier(aTHX_ wsi);

        my_perl_context->courier = courier;

        SV* courier_sv = xsh_ptr_to_svrv(courier, gv_stashpv(COURIER_CLASS, FALSE));
        my_perl_context->courier_sv = courier_sv;

        // The callback gets its own copy of the courier reference.
        SV* args[] = {
            newSVsv(courier_sv),
            NULL,
        };

        xsh_call_sv_trap_void(
            my_perl_context->on_ready,
            args,
            "on_ready"
        );

        // on_ready is only called this once; release it.
        SvREFCNT_dec(my_perl_context->on_ready);

        } break;

    case LWS_CALLBACK_CLIENT_WRITEABLE: {
        courier_t* courier = my_perl_context->courier;

        // Idea taken from LWS’s lws-minimal-client-echo demo:
        // permessage-deflate requires that we forgo consuming the
        // item from the ring buffer until the next writeable.
        //
        if (courier->consume_pending_count) {
            courier->consume_pending_count -= lws_ring_consume(courier->ring, NULL, NULL, courier->consume_pending_count);
        }

        const frame_t *frame_p = lws_ring_get_element(courier->ring, NULL);

        if (frame_p) {
            int wrote = lws_write(
                wsi,
                LWS_PRE + frame_p->pre_plus_payload,
                frame_p->len,
                frame_p->flags
            );

            if (wrote < (int)frame_p->len) {
                warn("ERROR %d while writing to WebSocket!", wrote);
                return -1;
            }

            // Consume this frame on the next writeable, and ask for
            // that writeable so any further queued frames go out.
            courier->consume_pending_count++;
            lws_callback_on_writable(wsi);
        }

        // Don’t close until we’ve flushed the buffer:
        else if (courier->close_requested) {
            if (courier->close_status != LWS_CLOSE_STATUS_NOSTATUS) {
                lws_close_reason(
                    wsi,
                    courier->close_status,
                    courier->close_reason,
                    courier->close_reason_length
                );
            }

            return -1;
        }

        } break;

    case LWS_CALLBACK_CLIENT_CONNECTION_ERROR: {
        nlws_abstract_loop_t* myloop_p = (nlws_abstract_loop_t*) lws_evlib_wsi_to_evlib_pt(wsi);

        _on_ws_error(aTHX_ my_perl_context, myloop_p, len, in);
        } break;

    case LWS_CALLBACK_CLIENT_RECEIVE: {

        if (lws_is_first_fragment(wsi)) {

            my_perl_context->message_type = lws_frame_is_binary(wsi) ? NET_LWS_MESSAGE_TYPE_BINARY : NET_LWS_MESSAGE_TYPE_TEXT;

            // In this (generally prevalent) case we can create our SV
            // directly from the incoming frame.
            if (lws_is_final_fragment(wsi)) {
                _on_ws_message(aTHX_ my_perl_context, newSVpvn_flags(in, len, lws_frame_is_binary(wsi) ? 0 : SVf_UTF8));
                break;
            }

            my_perl_context->content_length = len;
        }
        else {
            my_perl_context->content_length += len;
        }

        // Grow the reassembly buffer and append this fragment at its end.
        Renew(my_perl_context->message_content, my_perl_context->content_length, char);
        Copy(
            in,
            my_perl_context->message_content + my_perl_context->content_length - len,
            len,
            char
        );

        if (lws_is_final_fragment(wsi)) {
            SV* msgsv = newSVpvn_flags(
                my_perl_context->message_content,
                my_perl_context->content_length,
                my_perl_context->message_type == NET_LWS_MESSAGE_TYPE_TEXT ? SVf_UTF8 : 0
            );

            _on_ws_message(aTHX_ my_perl_context, msgsv );
        }

        } break;

    case LWS_CALLBACK_WS_PEER_INITIATED_CLOSE: {
        courier_t* courier = my_perl_context->courier;

        // The payload is a 2-byte network-order status code followed
        // by the reason text. A close frame may also carry no payload
        // at all, in which case there is neither code nor reason.
        if (len < sizeof(uint16_t)) {
            courier->close_status = LWS_CLOSE_STATUS_NOSTATUS;
            courier->close_reason_length = 0;
            break;
        }

        // memcpy() rather than a pointer cast: `in` need not be
        // aligned for uint16_t.
        uint16_t net_code;
        memcpy(&net_code, in, sizeof(net_code));
        courier->close_status = ntohs(net_code);

        // Never copy more than the courier’s reason buffer can hold.
        size_t reason_len = len - sizeof(uint16_t);
        if (reason_len > MAX_CLOSE_REASON_LENGTH) {
            reason_len = MAX_CLOSE_REASON_LENGTH;
        }

        courier->close_reason_length = reason_len;

        memcpy(
            courier->close_reason,
            (const unsigned char *) in + sizeof(uint16_t),
            reason_len
        );

        } break;

    case LWS_CALLBACK_CLIENT_CLOSED: {

        courier_t* courier = my_perl_context->courier;

        nlws_abstract_loop_t* myloop_p = (nlws_abstract_loop_t*) lws_evlib_wsi_to_evlib_pt(wsi);

        // Report whatever close status/reason was recorded on the courier.
        _on_ws_close(aTHX_
            my_perl_context,
            myloop_p,
            courier->close_status,
            courier->close_reason_length,
            courier->close_reason
        );

        } break;

    default:
        break;
    }

    return 0;
}


// Queues one complete (unfragmented) message on the courier’s ring
// buffer and asks LWS for a writeable callback so it gets sent.
//
// buf/len  - payload bytes; they are copied, leaving LWS_PRE bytes of
//            headroom in front of the copy.
// protocol - LWS write protocol (text or binary).
//
// Croaks if the ring buffer does not accept the frame.
void _courier_send( pTHX_ courier_t* courier, U8* buf, STRLEN len, enum lws_write_protocol protocol ) {

    frame_t outgoing = {
        .flags = lws_write_ws_flags(protocol, 1, 1),
        .len = len,
    };

    Newx(outgoing.pre_plus_payload, LWS_PRE + len, U8);

    Copy(buf, outgoing.pre_plus_payload + LWS_PRE, len, U8);

    const size_t inserted = lws_ring_insert( courier->ring, &outgoing, 1 );

    if (inserted == 0) {
        // The ring didn’t take the frame, so it’s still ours to free.
        nlws_destroy_frame(&outgoing);

        size_t free_nodes = lws_ring_get_count_free_elements(courier->ring);

        croak("Failed to add message to ring buffer! (%zu ring nodes free)", free_nodes);
    }

    lws_callback_on_writable(courier->wsi);
}

// Tells LWS to service a single file descriptor for the given poll
// event.
//
// lws_context_uv - the `struct lws_context *` encoded as an integer.
// fd             - the descriptor that is ready.
// event          - poll event bits, used as both requested and
//                  returned events.
static inline void _lws_service_fd (pTHX_ UV lws_context_uv, int fd, short event) {
    // Integer -> pointer round trip goes through uintptr_t.
    const uintptr_t context_addr = lws_context_uv;

    struct lws_pollfd ready = {
        .revents = event,
        .events = event,
        .fd = fd,
    };

    lws_service_fd((struct lws_context *) context_addr, &ready);
}

// Protocol table handed to LWS at context creation: a single protocol
// whose events are all handled by net_lws_wsclient_callback, followed
// by the terminating empty entry.
const struct lws_protocols wsclient_protocols[] = {
    {
        .name = NET_LWS_LOCAL_PROTOCOL_NAME,
        .callback = net_lws_wsclient_callback,
        // NOTE(review): the connect info supplies its own `userdata`,
        // so this size is presumably not what determines the callback's
        // `user` pointer — confirm against the LWS documentation.
        .per_session_data_size = sizeof(void*),
        .rx_buffer_size = 0,
    },
    { NULL }
};

// Fills `extensions` from a Perl array of [ name, client_offer ] pairs.
//
// extensions      - destination array; must have room for one entry
//                   per element of compressions_av.
// compressions_av - array of array references. Element 0 of each must
//                   be "deflate"; element 1 is the client offer string.
//
// Croaks on any other extension name. Does nothing unless the build
// has permessage-deflate support (_LWS_HAS_PMD).
//
// NOTE(review): the stored client_offer pointer comes from
// xsh_sv_to_str(), so it presumably stays valid only as long as the
// source SV does — confirm the caller keeps it alive.
void _populate_extensions (pTHX_ struct lws_extension* extensions, AV* compressions_av) {
#if _LWS_HAS_PMD
    const SSize_t entry_count = av_top_index(compressions_av) + 1;

    SSize_t idx = 0;

    while (idx < entry_count) {
        SV** entry_p = av_fetch(compressions_av, idx, FALSE);

        assert(entry_p);
        assert(*entry_p);
        assert(SvROK(*entry_p));
        assert(SVt_PVAV == SvTYPE(SvRV(*entry_p)));

        AV* entry_av = (AV*) SvRV(*entry_p);

        // Element 0: the extension’s short name.
        SV** name_p = av_fetch(entry_av, 0, FALSE);
        assert(name_p);
        assert(*name_p);
        assert(SvOK(*name_p));

        const int is_deflate = strEQ("deflate", xsh_sv_to_str(*name_p));

        if (!is_deflate) {
            croak("Bad extension name: %" SVf, *name_p);
        }

        // Element 1: what we offer to the server.
        SV** offer_p = av_fetch(entry_av, 1, FALSE);
        assert(offer_p);
        assert(*offer_p);
        assert(SvOK(*offer_p));

        extensions[idx] = (struct lws_extension) {
            .client_offer = xsh_sv_to_str(*offer_p),
            .callback = lws_extension_callback_pm_deflate,
            .name = "permessage-deflate",
        };

        idx++;
    }
#endif
}

/* ---------------------------------------------------------------------- */

MODULE = Net::Libwebsockets     PACKAGE = Net::Libwebsockets

PROTOTYPES: DISABLE

BOOT:
    {
        // Every constant below is installed into this one package.
        HV* nlws_stash = gv_stashpv("Net::Libwebsockets", FALSE);

        // ------------------------------------------------------------------
        // LWS config:
        newCONSTSUB(nlws_stash, "HAS_PMD", boolSV(_LWS_HAS_PMD));

        // ------------------------------------------------------------------
        // TLS:
        newCONSTSUB(nlws_stash, "LCCSCF_ALLOW_SELFSIGNED", newSVuv(LCCSCF_ALLOW_SELFSIGNED));
        newCONSTSUB(nlws_stash, "LCCSCF_SKIP_SERVER_CERT_HOSTNAME_CHECK", newSVuv(LCCSCF_SKIP_SERVER_CERT_HOSTNAME_CHECK));
        newCONSTSUB(nlws_stash, "LCCSCF_ALLOW_EXPIRED", newSVuv(LCCSCF_ALLOW_EXPIRED));
        newCONSTSUB(nlws_stash, "LCCSCF_ALLOW_INSECURE", newSVuv(LCCSCF_ALLOW_INSECURE));

        // ------------------------------------------------------------------
        // Logging:
        newCONSTSUB(nlws_stash, "LLL_ERR", newSVuv(LLL_ERR));
        newCONSTSUB(nlws_stash, "LLL_WARN", newSVuv(LLL_WARN));
        newCONSTSUB(nlws_stash, "LLL_NOTICE", newSVuv(LLL_NOTICE));
        newCONSTSUB(nlws_stash, "LLL_INFO", newSVuv(LLL_INFO));
        newCONSTSUB(nlws_stash, "LLL_DEBUG", newSVuv(LLL_DEBUG));
        newCONSTSUB(nlws_stash, "LLL_PARSER", newSVuv(LLL_PARSER));
        newCONSTSUB(nlws_stash, "LLL_HEADER", newSVuv(LLL_HEADER));
        newCONSTSUB(nlws_stash, "LLL_EXT", newSVuv(LLL_EXT));
        newCONSTSUB(nlws_stash, "LLL_CLIENT", newSVuv(LLL_CLIENT));
        newCONSTSUB(nlws_stash, "LLL_LATENCY", newSVuv(LLL_LATENCY));
        newCONSTSUB(nlws_stash, "LLL_USER", newSVuv(LLL_USER));
        newCONSTSUB(nlws_stash, "LLL_THREAD", newSVuv(LLL_THREAD));

        // ------------------------------------------------------------------
        // Privates:
        newCONSTSUB(nlws_stash, "_LCCSCF_USE_SSL", newSVuv(LCCSCF_USE_SSL));
        newCONSTSUB(nlws_stash, "_LWS_EV_READ", newSVuv(LWS_EV_READ));
        newCONSTSUB(nlws_stash, "_LWS_EV_WRITE", newSVuv(LWS_EV_WRITE));
    }

void
set_log_level(SV* level_sv)
    CODE:
        // Sets the global LWS log level bitmask. Croaks unless the
        // value lies in [0, 1 << LLL_COUNT).
        const IV level = xsh_sv_to_iv(level_sv);

        const int level_in_range = (level >= 0) && (level < (1 << LLL_COUNT));

        if (!level_in_range) {
            croak("%s: Invalid level: %" IVdf, xsh_PL_package, level);
        }

        lws_set_log_level(level, NULL);

# ----------------------------------------------------------------------
# Privates:

void
_lws_service_fd_read( UV lws_context_uv, int fd )
    CODE:
        // Have LWS service `fd` as readable.
        const short readable = POLLIN;

        _lws_service_fd(aTHX_ lws_context_uv, fd, readable);

void
_lws_service_fd_write( UV lws_context_uv, int fd )
    CODE:
        // Have LWS service `fd` as writable.
        const short writable = POLLOUT;

        _lws_service_fd(aTHX_ lws_context_uv, fd, writable);

int
_get_timeout( UV lws_context_uv )
    CODE:
        // Returns LWS’s adjusted poll timeout for the context whose
        // address is encoded in lws_context_uv, starting from
        // DEFAULT_POLL_TIMEOUT.
        const uintptr_t context_addr = lws_context_uv;

        RETVAL = lws_service_adjust_timeout(
            (struct lws_context *) context_addr,
            DEFAULT_POLL_TIMEOUT,
            0
        );

    OUTPUT:
        RETVAL

void
_lws_context_destroy( UV lws_context_uv )
    CODE:
        // Destroys the LWS context whose address is encoded in
        // lws_context_uv.
        //
        // Convert via uintptr_t, as the other context-taking functions
        // in this file do: UV may be wider than a pointer, and a direct
        // integer-to-pointer cast of a different size is not portable.
        uintptr_t lws_context_int = lws_context_uv;

        struct lws_context *context = (void *) lws_context_int;

        lws_context_destroy(context);

MODULE = Net::Libwebsockets     PACKAGE = Net::Libwebsockets::WebSocket::Client

PROTOTYPES: DISABLE

void
_new (SV* hostname, int port, SV* path, SV* compression_sv, SV* subprotocols_sv, SV* headers_ar, int tls_opts, unsigned ping_interval, unsigned ping_timeout, SV* loop_obj, SV* done_d, SV* on_ready_sv, SV* logger_obj)
    CODE:
        // Creates an LWS context bound to the given Perl loop object and
        // starts a WebSocket client connection on it.
        //
        // compression_sv  - reference to an array of extension specs
        //                   (see _populate_extensions).
        // subprotocols_sv - subprotocol string, or undef for none.
        // headers_ar      - array reference of extra handshake headers;
        //                   consumed by the protocol callback.
        // done_d          - deferred that is rejected here if context
        //                   creation fails.
        // on_ready_sv     - callback stored for the protocol callback.
        // logger_obj      - optional logger object wrapping a lws_log_cx_t.
        assert(SvROK(compression_sv));
        assert(SVt_PVAV == SvTYPE(SvRV(compression_sv)));

        AV* compressions_av = (AV*) SvRV(compression_sv);
        SSize_t compressions_len = 1 + av_top_index(compressions_av);

        struct lws_extension* extensions_p;

        if (NLWS_LWS_HAS_EXTENSIONS) {
            // One extra zeroed entry terminates the list.
            Newxz(extensions_p, 1 + compressions_len, struct lws_extension);

            _populate_extensions(aTHX_ extensions_p, compressions_av);
        }
        else {
            extensions_p = NULL;
        }

        lws_log_cx_t* log_cx;

        if (logger_obj && SvOK(logger_obj)) {
            log_cx = xsh_svrv_to_ptr(logger_obj);
        }
        else {
            log_cx = NULL;
        }

        // This struct gets copied; we defer bumping loop_obj’s refcount
        // until that time. See nlws_perl_loop.c.
        nlws_abstract_loop_t abstract_loop = {
            PERL_CONTEXT_IN_STRUCT
            .perlobj = loop_obj,
        };

        struct lws_context_creation_info info = {
            .options = LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT
                        | LWS_SERVER_OPTION_VALIDATE_UTF8,

            .extensions = extensions_p,

            .event_lib_custom = &evlib_custom,

            .foreign_loops = (void *[]) {
                &abstract_loop,
            },

            .port = CONTEXT_PORT_NO_LISTEN, /* we do not run any server */

            .protocols = wsclient_protocols,

            // Ref-counting of logger_obj happens via the logger’s
            // own reference count.
            .log_cx = log_cx,
        };

        // This function never croaks; it only rejects the promise.
        // We thus bump the promise’s refcount here.
        //
        SvREFCNT_inc(done_d);

        struct lws_context *context = lws_create_context(&info);
        if (!context) {
            if (extensions_p) Safefree(extensions_p);

            // _finish_deferred_sv() releases the reference taken above.
            _finish_deferred_sv( aTHX_
                loop_obj,
                &done_d,
                "reject",
                _create_err_obj( aTHX_
                    "General",
                    1,
                    newSVpvs("lws_create_context failed")
                )
            );

            return;
        }

        my_perl_context_t* my_perl_context;
        Newxz(my_perl_context, 1, my_perl_context_t);

        *my_perl_context = (my_perl_context_t) {
            PERL_CONTEXT_IN_STRUCT
            .pid = getpid(),

            .extensions = extensions_p,

            .courier = NULL,
            .courier_sv = NULL,

            // We bump the refcounts of these below:
            .on_ready = on_ready_sv,
            .done_d   = done_d,
            .headers_ar = headers_ar,

            .logger_obj = logger_obj,

            .lws_retry = (lws_retry_bo_t) {
                .retry_ms_table_count = 0,
                .conceal_count = 0,
                .secs_since_valid_ping = ping_interval,
                .secs_since_valid_hangup = ping_timeout,
            },
        };

        // The hostname serves as address, Host header and origin alike.
        const char* hostname_str = xsh_sv_to_str(hostname);

        struct lws_client_connect_info client = {
            .context = context,
            .port = port,

            .address = hostname_str,
            .path = xsh_sv_to_str(path),
            .host = hostname_str,
            .origin = hostname_str,
            .ssl_connection = tls_opts,
            .retry_and_idle_policy = &my_perl_context->lws_retry,

            // The callback’s `user`:
            .userdata = my_perl_context,

            .protocol = SvOK(subprotocols_sv) ? xsh_sv_to_str( subprotocols_sv) : NULL,
        };

        // NOTE(review): on this failure path my_perl_context is not
        // freed here (and its `extensions` field still points at the
        // buffer freed below), and the extra reference taken on done_d
        // is not released in this function. Presumably the LWS
        // connection-error callback and/or context teardown handle
        // these — confirm. Also confirm that freeing the extensions
        // before destroying the context that references them is safe.
        if (!lws_client_connect_via_info(&client)) {
            if (extensions_p) Safefree(extensions_p);
            lws_context_destroy(context);
            return;
        }

        // The connection attempt is underway; the protocol callback
        // releases these once it has used them.
        SvREFCNT_inc(on_ready_sv);
        SvREFCNT_inc(headers_ar);

# ----------------------------------------------------------------------

MODULE = Net::Libwebsockets     PACKAGE = Net::Libwebsockets::WebSocket::Pause

PROTOTYPES: DISABLE

void
DESTROY (SV* self_sv)
    CODE:
        // Ends one pause on the courier. When the last outstanding
        // pause goes away, receive flow is re-enabled on the connection.
        pause_t* pause_p = xsh_svrv_to_ptr(self_sv);

        SV* paused_courier_sv = pause_p->courier_sv;

        courier_t* courier = xsh_svrv_to_ptr(paused_courier_sv);

        courier->pauses--;

        if (courier->pauses == 0) {
            lws_rx_flow_control(courier->wsi, 1);
        }

        // Release the courier reference held by this pause, then the
        // pause’s own storage.
        SvREFCNT_dec(paused_courier_sv);

        Safefree(pause_p);

# ----------------------------------------------------------------------

MODULE = Net::Libwebsockets     PACKAGE = Net::Libwebsockets::WebSocket::Courier

PROTOTYPES: DISABLE

void
on_text (SV* self_sv, SV* cbref)
    CODE:
        // Registers an additional callback for incoming text messages.
        courier_t* courier = xsh_svrv_to_ptr(self_sv);

        // Store our own copy of the reference. `cbref` is the caller’s
        // scalar itself, so keeping that SV would make the registered
        // callback change if the caller later reassigns or undefs
        // their variable.
        SV* stored_cb = newSVsv(cbref);

        courier->on_text_count++;
        Renew(courier->on_text, courier->on_text_count, SV*);
        courier->on_text[courier->on_text_count - 1] = stored_cb;

void
on_binary (SV* self_sv, SV* cbref)
    CODE:
        // Registers an additional callback for incoming binary messages.
        courier_t* courier = xsh_svrv_to_ptr(self_sv);

        // Store our own copy of the reference. `cbref` is the caller’s
        // scalar itself, so keeping that SV would make the registered
        // callback change if the caller later reassigns or undefs
        // their variable.
        SV* stored_cb = newSVsv(cbref);

        courier->on_binary_count++;
        Renew(courier->on_binary, courier->on_binary_count, SV*);
        courier->on_binary[courier->on_binary_count - 1] = stored_cb;

void
send_text (SV* self_sv, SV* payload_sv)
    CODE:
        // Queues the payload, encoded as UTF-8, as a text message.
        courier_t* courier = xsh_svrv_to_ptr(self_sv);

        STRLEN payload_len;
        U8* payload = (U8*) SvPVutf8(payload_sv, payload_len);

        _courier_send(
            aTHX_
            courier,
            payload,
            payload_len,
            LWS_WRITE_TEXT
        );

void
send_binary (SV* self_sv, SV* payload_sv)
    CODE:
        // Queues the payload’s bytes as a binary message.
        courier_t* courier = xsh_svrv_to_ptr(self_sv);

        STRLEN payload_len;
        U8* payload = (U8*) SvPVbyte(payload_sv, payload_len);

        _courier_send(
            aTHX_
            courier,
            payload,
            payload_len,
            LWS_WRITE_BINARY
        );

SV*
pause (SV* self_sv)
    CODE:
        // Pauses receipt of messages and returns a Pause object;
        // receipt resumes once every outstanding Pause object has been
        // destroyed. Croaks in void context, since the returned object
        // would be destroyed immediately.
        if (GIMME_V == G_VOID) croak("Don’t call pause() in void context!");

        courier_t* courier = xsh_svrv_to_ptr(self_sv);

        pause_t* my_pause;
        Newx(my_pause, 1, pause_t);

        // Hold our own reference to the courier. `self_sv` is the
        // caller’s scalar itself; if we kept that SV and the caller
        // later cleared or reassigned their variable, the Pause’s
        // DESTROY would no longer find a courier behind it.
        my_pause->courier_sv = newSVsv(self_sv);

        // Only the first pause needs to stop the flow.
        if (!courier->pauses) {
            lws_rx_flow_control(courier->wsi, 0);
        }

        courier->pauses++;

        RETVAL = xsh_ptr_to_svrv(my_pause, gv_stashpv(PAUSE_CLASS, FALSE));

    OUTPUT:
        RETVAL

void
close (SV* self_sv, U16 code=LWS_CLOSE_STATUS_NOSTATUS, SV* reason_sv=NULL)
    CODE:
        // Requests that the connection be closed once all queued
        // messages have been written.
        //
        // code      - close status to send; defaults to “no status”.
        // reason_sv - optional reason text. It is sent as UTF-8 and
        //             limited to MAX_CLOSE_REASON_LENGTH bytes; longer
        //             text is truncated, with a warning.
        courier_t* courier = xsh_svrv_to_ptr(self_sv);

        if (reason_sv && SvOK(reason_sv)) {
            STRLEN full_len;
            U8* reason = (U8*) SvPVutf8(reason_sv, full_len);

            STRLEN kept_len = full_len;

            if (kept_len > MAX_CLOSE_REASON_LENGTH) {
                kept_len = MAX_CLOSE_REASON_LENGTH;

                // Don’t cut a multi-byte UTF-8 sequence in half: if the
                // first dropped byte is a continuation byte, back up to
                // the start of that character. Otherwise we’d send an
                // invalid UTF-8 close reason.
                while (kept_len > 0 && (reason[kept_len] & 0xC0) == 0x80) {
                    kept_len--;
                }

                warn("Truncating %zu-byte close reason (%.*s) to %d bytes …", full_len, (int) full_len, reason, (int) kept_len);
            }

            memcpy(courier->close_reason, reason, kept_len);
            courier->close_reason_length = kept_len;
        }
        else {
            courier->close_reason_length = 0;
        }

        courier->close_requested = true;
        courier->close_status = code;

        // Force a writable callback, which will trigger our close.
        lws_callback_on_writable(courier->wsi);

void
DESTROY (SV* self_sv)
    CODE:
        // Frees the courier. Warns first if this happens during global
        // destruction in the process that created the courier.
        courier_t* courier = xsh_svrv_to_ptr(self_sv);

        if (IS_GLOBAL_DESTRUCTION) {
            if (courier->pid == getpid()) {
                WARN_DESTROY_AT_DESTRUCT(self_sv);
            }
        }

        nlws_destroy_courier(aTHX_ courier);

# ----------------------------------------------------------------------

MODULE = Net::Libwebsockets     PACKAGE = Net::Libwebsockets::Logger

PROTOTYPES: DISABLE

SV*
_new (SV* classname_sv, SV* level_sv, SV* callback)
    CODE:
        // Creates a logger object wrapping a newly allocated LWS log
        // context.
        //
        // classname_sv - class into which the object is blessed.
        // level_sv     - log level bitmask; undef means “use the
        //                current global level”.
        // callback     - optional Perl callback that receives log
        //                output. Without it, output goes to stderr.
        //
        // NOTE(review): `perlobj` below stores the very reference that
        // is returned to the caller; whether it stays valid after the
        // caller copies the return value depends on how the logger code
        // uses it — confirm.
        U32 level = SvOK(level_sv)
            ? xsh_sv_to_uv(level_sv)
            : nlws_get_global_lwsl_level();

        lws_log_cx_t* log_cx_p;
        Newx(log_cx_p, 1, lws_log_cx_t);

        // Take a reference to the callback only if one was given.
        SV* emit_cb = NULL;

        if (callback && SvOK(callback)) {
            emit_cb = SvREFCNT_inc(callback);
        }

        nlws_logger_opaque_t* opaque;
        Newx(opaque, 1, nlws_logger_opaque_t);

        SV* self_sv = xsh_ptr_to_svrv(log_cx_p, gv_stashpv(SvPVbyte_nolen(classname_sv), FALSE));

        *opaque = (nlws_logger_opaque_t) {
            PERL_CONTEXT_IN_STRUCT
            .perlobj = self_sv,
            .pid = getpid(),
        };

        *log_cx_p = (lws_log_cx_t) {
            .opaque = opaque,
            .refcount_cb = nlws_logger_on_refcount_change,
            .lll_flags = level,
        };

        if (!emit_cb) {
            log_cx_p->u.emit = lwsl_emit_stderr;
        }
        else {
            opaque->callback = emit_cb;

            log_cx_p->lll_flags |= LLLF_LOG_CONTEXT_AWARE;
            log_cx_p->u.emit_cx = nlws_logger_emit;
        }

        RETVAL = self_sv;

    OUTPUT:
        RETVAL

void
DESTROY (SV* self_sv)
    CODE:
        // Frees the log context together with its opaque data and
        // releases the Perl callback, if any. Warns first if this
        // happens during global destruction in the creating process.
        lws_log_cx_t* log_cx_p = xsh_svrv_to_ptr(self_sv);

        nlws_logger_opaque_t* opaque = log_cx_p->opaque;

        if (opaque) {
            if (IS_GLOBAL_DESTRUCTION) {
                if (opaque->pid == getpid()) {
                    WARN_DESTROY_AT_DESTRUCT(self_sv);
                }
            }

            if (opaque->callback) {
                SvREFCNT_dec(opaque->callback);
            }

            Safefree(opaque);
        }

        Safefree(log_cx_p);