#include "easyxs/easyxs.h"

#include <stdbool.h>
#include <unistd.h>

#define MY_CXT_KEY "Promise::XS::_guts" XS_VERSION

#define BASE_CLASS "Promise::XS"

#define PROMISE_CLASS "Promise::XS::Promise"
#define PROMISE_CLASS_TYPE Promise__XS__Promise

#define DEFERRED_CLASS "Promise::XS::Deferred"
#define DEFERRED_CLASS_TYPE Promise__XS__Deferred

#define CONVERTER_CR_NAME "_convert_to_our_promise"

#ifdef PL_phase
#define PXS_IS_GLOBAL_DESTRUCTION PL_phase == PERL_PHASE_DESTRUCT
#else
#define PXS_IS_GLOBAL_DESTRUCTION PL_dirty
#endif

#define RESULT_IS_RESOLVED(result) (result->state == XSPR_RESULT_RESOLVED)
#define RESULT_IS_REJECTED(result) (result->state == XSPR_RESULT_REJECTED)

#define UNUSED(x) (void)(x)

#define DEBUG_AWAITABLE 0
#if DEBUG_AWAITABLE
#   define _DO_DEBUG_AWAITABLE() fprintf(stderr, "# %s\n", __func__)
#else
#   define _DO_DEBUG_AWAITABLE()
#endif

#define _MAX_RECURSION 254

/* We could look here at the full stack depth
   (PL_stack_sp - PL_stack_base), but we only really care about
   our *own* recursion, not the overall Perl stack.
*/

#define _CROAK_IF_LOOKS_LIKE_INFINITE_RECURSION \
    dMY_CXT; \
    if (MY_CXT.callback_depth > _MAX_RECURSION) { \
        croak("Exceeded %u callbacks; infinite recursion detected!", _MAX_RECURSION); \
    }

typedef enum {
    _DEFER_NONE = 0,
    _DEFER_ANYEVENT,
    _DEFER_IOASYNC,
    _DEFER_MOJO,
} event_system_t;

typedef struct xspr_callback_s xspr_callback_t;
typedef struct xspr_promise_s xspr_promise_t;
typedef struct xspr_result_s xspr_result_t;
typedef struct xspr_callback_queue_s xspr_callback_queue_t;

typedef enum {
    XSPR_STATE_NONE,
    XSPR_STATE_PENDING,
    XSPR_STATE_FINISHED,
} xspr_promise_state_t;

typedef enum {
    XSPR_RESULT_NONE,
    XSPR_RESULT_RESOLVED,
    XSPR_RESULT_REJECTED,
    XSPR_RESULT_BOTH
} xspr_result_state_t;

typedef enum {
    // from then() or catch()
    XSPR_CALLBACK_PERL,

    // from finally()
    XSPR_CALLBACK_FINALLY,


    // from a promise returned from a then() or catch() callback
    XSPR_CALLBACK_CHAIN,

    // from a promise returned from a finally() callback
    XSPR_CALLBACK_FINALLY_CHAIN
} xspr_callback_type_t;

struct xspr_callback_s {
    xspr_callback_type_t type;

    union {
        struct {
            SV* on_resolve;
            SV* on_reject;
            xspr_promise_t* next;
        } perl;

        struct {
            SV* on_finally;
            xspr_promise_t* next;
        } finally;

        xspr_promise_t* chain;

        struct {
            xspr_result_t* original_result;
            xspr_promise_t* chain_promise;
        } finally_chain;
    };
};

struct xspr_result_s {
    xspr_result_state_t state;
    SV** results;
    int count;
    int refs;
    bool rejection_should_warn;
};

struct xspr_promise_s {
    xspr_promise_state_t state;
    pid_t detect_leak_pid;
    int refs;
    union {
        struct {
            xspr_callback_t** callbacks;
            int callbacks_count;
        } pending;
        struct {
            xspr_result_t *result;
        } finished;
    };

    /* For async/await: */
    SV* on_ready_immediate;
    SV* self_sv_ref;
};

struct xspr_callback_queue_s {
    xspr_promise_t* origin;
    xspr_callback_t* callback;
    xspr_callback_queue_t* next;
};

xspr_callback_t* xspr_callback_new_perl(pTHX_ SV* on_resolve, SV* on_reject, xspr_promise_t* next);
xspr_callback_t* xspr_callback_new_chain(pTHX_ xspr_promise_t* chain);
xspr_callback_t* xspr_callback_new_finally_chain(pTHX_ xspr_result_t* original_result, xspr_promise_t* next_promise);
void xspr_callback_process(pTHX_ xspr_callback_t* callback, xspr_promise_t* origin);
void xspr_callback_free(pTHX_ xspr_callback_t* callback);

xspr_promise_t* xspr_promise_new(pTHX);
void xspr_promise_then(pTHX_ xspr_promise_t* promise, xspr_callback_t* callback);
void xspr_promise_finish(pTHX_ xspr_promise_t* promise, xspr_result_t *result);
void xspr_promise_incref(pTHX_ xspr_promise_t* promise);
void xspr_promise_decref(pTHX_ xspr_promise_t* promise);

xspr_result_t* xspr_result_new(pTHX_ xspr_result_state_t state, unsigned count);
xspr_result_t* pxs_result_clone(pTHX_ xspr_result_t* old);
xspr_result_t* xspr_result_from_error(pTHX_ const char *error);
void xspr_result_incref(pTHX_ xspr_result_t* result);
void xspr_result_decref(pTHX_ xspr_result_t* result);

xspr_result_t* xspr_invoke_perl(pTHX_ SV* perl_fn, SV** inputs, unsigned input_count);
xspr_promise_t* xspr_promise_from_sv(pTHX_ SV* input);


typedef struct {
    xspr_callback_queue_t* queue_head;
    xspr_callback_queue_t* queue_tail;
    int in_flush;
    int backend_scheduled;
    unsigned char callback_depth;
#ifdef USE_ITHREADS
    tTHX owner;
#endif
    SV* pxs_flush_cr;
    HV* pxs_base_stash;
    HV* pxs_promise_stash;
    HV* pxs_deferred_stash;
    SV* deferral_cr;
    SV* deferral_arg;
    event_system_t event_system;
    SV* stop_cr;
} my_cxt_t;

typedef struct {
    xspr_promise_t* promise;
} DEFERRED_CLASS_TYPE;

typedef struct {
    xspr_promise_t* promise;
} PROMISE_CLASS_TYPE;

//----------------------------------------------------------------------

START_MY_CXT

/* Process a single callback */
void xspr_callback_process(pTHX_ xspr_callback_t* callback, xspr_promise_t* origin)
{
    ASSUME(origin->state == XSPR_STATE_FINISHED);

    if (callback->type == XSPR_CALLBACK_CHAIN) {
        xspr_promise_finish(aTHX_ callback->chain, origin->finished.result);

    } else if (callback->type == XSPR_CALLBACK_FINALLY_CHAIN) {
        xspr_promise_finish(aTHX_
            callback->finally_chain.chain_promise,
            RESULT_IS_REJECTED(origin->finished.result) ? origin->finished.result : callback->finally_chain.original_result
        );

    } else if (callback->type == XSPR_CALLBACK_PERL || callback->type == XSPR_CALLBACK_FINALLY) {
        SV* callback_fn;
        xspr_promise_t* next_promise;

        if (callback->type == XSPR_CALLBACK_FINALLY) {
            callback_fn = callback->finally.on_finally;
            next_promise = callback->finally.next;

            /* A finally() “catches” its parent promise, even as it
               rethrows any failure from it. */
            if (callback_fn && SvOK(callback_fn)) {
                origin->finished.result->rejection_should_warn = false;
            }
        } else {
            next_promise = callback->perl.next;

            if (RESULT_IS_RESOLVED(origin->finished.result)) {
                callback_fn = callback->perl.on_resolve;
            } else if (RESULT_IS_REJECTED(origin->finished.result)) {
                callback_fn = callback->perl.on_reject;

                if (callback_fn && SvOK(callback_fn)) {
                    origin->finished.result->rejection_should_warn = false;
                }

            } else {
                callback_fn = NULL; /* Be quiet, bad compiler! */
                ASSUME(0);
            }
        }

        if (callback_fn != NULL) {
            xspr_result_t* callback_result;

            if (callback->type == XSPR_CALLBACK_FINALLY) {
                callback_result = xspr_invoke_perl(aTHX_ callback_fn, NULL, 0);
            }
            else {
                callback_result = xspr_invoke_perl(aTHX_
                    callback_fn,
                    origin->finished.result->results,
                    origin->finished.result->count
                );
            }

            if (next_promise == NULL) {
                if (callback->type == XSPR_CALLBACK_FINALLY && RESULT_IS_RESOLVED(callback_result) && RESULT_IS_REJECTED(origin->finished.result)) {

                    /* This handles the case where finally() is called in
                       void context and the parent promise rejects. In this
                       case we need an unhandled-rejection warning right
                       away since, given the absence of a next_promise,
                       by definition we have an unhandled rejection.
                    */
                    xspr_result_decref(aTHX_ callback_result);
                    callback_result = pxs_result_clone( aTHX_ origin->finished.result );
                }
            }
            else {
                bool finish_promise = true;

                if (callback_result->count > 0 && callback_result->state == XSPR_RESULT_RESOLVED) {
                    xspr_promise_t* promise = xspr_promise_from_sv(aTHX_ callback_result->results[0]);

                    if (promise != NULL) {

                        if (callback_result->count > 1) {
                            warn( BASE_CLASS ": %d extra response(s) returned after promise! Treating promise like normal return.", callback_result->count - 1 );
                        }
                        else if (promise == next_promise) {
                            finish_promise = false;

                            /* This is an extreme corner case the A+ spec made us implement: we need to reject
                            * cases where the promise created from then() is passed back to its own callback */
                            xspr_result_t* chain_error = xspr_result_from_error(aTHX_ "TypeError");
                            xspr_promise_finish(aTHX_ next_promise, chain_error);

                            xspr_result_decref(aTHX_ chain_error);
                        }
                        else {
                            finish_promise = false;

                            /* Fairly normal case: we returned a promise from the callback */
                            xspr_callback_t* chainback;

                            if (callback->type == XSPR_CALLBACK_FINALLY) {
                                chainback = xspr_callback_new_finally_chain(aTHX_ origin->finished.result, next_promise);
                            }
                            else {
                                chainback = xspr_callback_new_chain(aTHX_ next_promise);
                            }

                            xspr_promise_then(aTHX_ promise, chainback);
                        }

                        xspr_promise_decref(aTHX_ promise);
                    }
                }

                if (finish_promise) {
                    xspr_result_t* final_result;
                    bool final_result_needs_decref = false;;

                    if ((callback->type == XSPR_CALLBACK_FINALLY) && RESULT_IS_RESOLVED(callback_result)) {
                        final_result = origin->finished.result;

                        if (RESULT_IS_REJECTED(final_result)) {

                            // If finally()’s callback succeeds, it takes
                            // on the resolution status of the “parent”
                            // promise. If that promise rejected, then,
                            // the finally’s promise also rejects. Notably,
                            // the finally’s promise should STILL trigger
                            // an unhandled-rejection warning, even if the
                            // parent’s rejection is eventually handled.
                            final_result = pxs_result_clone(aTHX_ final_result);
                            final_result_needs_decref = true;
                        }
                    }
                    else {
                        final_result = callback_result;
                    }

                    xspr_promise_finish(aTHX_ next_promise, final_result);

                    if (final_result_needs_decref) {
                        xspr_result_decref(aTHX_ final_result);
                    }
                }
            }

            xspr_result_decref(aTHX_ callback_result);

        } else if (next_promise) {
            /* No callback, so we're just passing the result along. */
            xspr_result_t* result = origin->finished.result;
            xspr_promise_finish(aTHX_ next_promise, result);
        }

    } else {
        ASSUME(0);
    }
}

/* Frees the xspr_callback_t structure */
void xspr_callback_free(pTHX_ xspr_callback_t *callback)
{
    if (callback->type == XSPR_CALLBACK_CHAIN) {
        xspr_promise_decref(aTHX_ callback->chain);

    } else if (callback->type == XSPR_CALLBACK_PERL) {
        SvREFCNT_dec(callback->perl.on_resolve);
        SvREFCNT_dec(callback->perl.on_reject);
        if (callback->perl.next != NULL)
            xspr_promise_decref(aTHX_ callback->perl.next);

    } else if (callback->type == XSPR_CALLBACK_FINALLY) {
        SvREFCNT_dec(callback->finally.on_finally);
        if (callback->finally.next != NULL)
            xspr_promise_decref(aTHX_ callback->finally.next);

    } else if (callback->type == XSPR_CALLBACK_FINALLY_CHAIN) {
        xspr_promise_decref(aTHX_ callback->finally_chain.chain_promise);
        xspr_result_decref(aTHX_ callback->finally_chain.original_result);

    } else {
        ASSUME(0);
    }

    Safefree(callback);
}

/* Process the queue until it's empty */
void xspr_queue_flush(pTHX)
{
    dMY_CXT;

    if (MY_CXT.in_flush) {
        /* XXX: is there a reasonable way to trigger this? */
        warn("Rejecting request to flush promises queue: already processing");
        return;
    }
    MY_CXT.in_flush = 1;

    while (MY_CXT.queue_head != NULL) {
        /* Save some typing... */
        xspr_callback_queue_t *cur = MY_CXT.queue_head;

        /* Process the callback. This could trigger some Perl code, meaning we
         * could end up with additional queue entries after this */
        xspr_callback_process(aTHX_ cur->callback, cur->origin);

        /* Free-ing the callback structure could theoretically trigger DESTROY subs,
         * enqueueing new callbacks, so we can't assume the loop ends here! */
        MY_CXT.queue_head = cur->next;
        if (cur->next == NULL) {
            MY_CXT.queue_tail = NULL;
        }

        /* Destroy the structure */
        xspr_callback_free(aTHX_ cur->callback);
        xspr_promise_decref(aTHX_ cur->origin);
        Safefree(cur);
    }

    MY_CXT.in_flush = 0;
    MY_CXT.backend_scheduled = 0;
}

/* Add a callback invocation into the queue for the given origin promise.
 * Takes ownership of the callback structure */
void xspr_queue_add(pTHX_ xspr_callback_t* callback, xspr_promise_t* origin)
{
    dMY_CXT;

    xspr_callback_queue_t* entry;
    Newxz(entry, 1, xspr_callback_queue_t);
    entry->origin = origin;
    xspr_promise_incref(aTHX_ entry->origin);
    entry->callback = callback;

    if (MY_CXT.queue_head == NULL) {
        ASSUME(MY_CXT.queue_tail == NULL);
        /* Empty queue, so now it's just us */
        MY_CXT.queue_head = entry;
        MY_CXT.queue_tail = entry;

    } else {
        ASSUME(MY_CXT.queue_tail != NULL);
        /* Existing queue, add to the tail */
        MY_CXT.queue_tail->next = entry;
        MY_CXT.queue_tail = entry;
    }
}

void _call_with_1_or_2_args( pTHX_ SV* cb, SV* maybe_arg0, SV* arg1 ) {
    // --- Almost all copy-paste from “perlcall” … blegh!
    dSP;

    ENTER;
    SAVETMPS;

    PUSHMARK(SP);

    if (maybe_arg0) {
        EXTEND(SP, 2);
        PUSHs(maybe_arg0);
    }
    else {
        EXTEND(SP, 1);
    }

    PUSHs( arg1 );
    PUTBACK;

    call_sv(cb, G_VOID);

    FREETMPS;
    LEAVE;

    return;
}

void _call_pv_with_args( pTHX_ const char* subname, SV** args, unsigned argscount )
{
    // --- Almost all copy-paste from “perlcall” … blegh!
    dSP;

    ENTER;
    SAVETMPS;

    PUSHMARK(SP);
    EXTEND(SP, argscount);

    unsigned i;
    for (i=0; i<argscount; i++) {
        PUSHs(args[i]);
    }

    PUTBACK;

    call_pv(subname, G_VOID);

    FREETMPS;
    LEAVE;

    return;
}

void xspr_queue_maybe_schedule(pTHX)
{
    dMY_CXT;
    if (MY_CXT.queue_head == NULL || MY_CXT.backend_scheduled || MY_CXT.in_flush) {
        return;
    }

    MY_CXT.backend_scheduled = 1;
    /* We trust our backends to be sane, so little guarding against errors here */

    if (!MY_CXT.pxs_flush_cr) {
        HV *stash = gv_stashpv(DEFERRED_CLASS, 0);
        GV* method_gv = gv_fetchmethod_autoload(stash, "___flush", FALSE);
        if (method_gv != NULL && isGV(method_gv) && GvCV(method_gv) != NULL) {
            MY_CXT.pxs_flush_cr = newRV_inc( (SV*)GvCV(method_gv) );
        }
        else {
            ASSUME(0);
        }
    }

    _call_with_1_or_2_args(aTHX_ MY_CXT.deferral_cr, MY_CXT.deferral_arg, MY_CXT.pxs_flush_cr);
}

/* Invoke the user's perl code. We need to be really sure this doesn't return early via croak/next/etc. */
xspr_result_t* xspr_invoke_perl(pTHX_ SV* perl_fn, SV** inputs, unsigned input_count)
{
    dSP;
    unsigned count, i;
    xspr_result_t* result;

    if (!SvROK(perl_fn)) {
        return xspr_result_from_error(aTHX_ "promise callbacks need to be a CODE reference");
    }

    ENTER;
    SAVETMPS;

    PUSHMARK(SP);
    EXTEND(SP, input_count);
    for (i = 0; i < input_count; i++) {
        PUSHs(inputs[i]);
    }
    PUTBACK;

    /* Clear $_ so that callbacks don't end up talking to each other by accident */
    SAVE_DEFSV;
    DEFSV_set(sv_newmortal());

    count = call_sv(perl_fn, G_EVAL | G_ARRAY);

    SPAGAIN;

    if (SvTRUE(ERRSV)) {
        result = xspr_result_new(aTHX_ XSPR_RESULT_REJECTED, 1);
        result->results[0] = newSVsv(ERRSV);
    } else {
        result = xspr_result_new(aTHX_ XSPR_RESULT_RESOLVED, count);
        for (i = 0; i < count; i++) {
            result->results[count-i-1] = SvREFCNT_inc(POPs);
        }
    }
    PUTBACK;

    FREETMPS;
    LEAVE;

    return result;
}

/* Increments the ref count for xspr_result_t */
void xspr_result_incref(pTHX_ xspr_result_t* result)
{
    result->refs++;
}

/* Decrements the ref count for the xspr_result_t, freeing the structure if needed */
void xspr_result_decref(pTHX_ xspr_result_t* result)
{
    if (--(result->refs) == 0) {
        if (RESULT_IS_REJECTED(result) && result->rejection_should_warn) {
            SV* warn_args[result->count];

            // Dupe the results to warn about:
            Copy(result->results, warn_args, result->count, SV*);

            _call_pv_with_args(aTHX_ "Promise::XS::Promise::_warn_unhandled", warn_args, result->count);
        }

        unsigned i;
        for (i = 0; i < result->count; i++) {
            SvREFCNT_dec(result->results[i]);
        }
        Safefree(result->results);
        Safefree(result);
    }
}

void xspr_immediate_process(pTHX_ xspr_callback_t* callback, xspr_promise_t* promise)
{
    dMY_CXT;

    MY_CXT.callback_depth++;

    xspr_callback_process(aTHX_ callback, promise);

    MY_CXT.callback_depth--;

    /* Destroy the structure */
    xspr_callback_free(aTHX_ callback);
}

#define _XSPR_FREE_ON_READY_IMMEDIATE(promise) \
    SvREFCNT_dec(SvRV(promise->on_ready_immediate)); \
    SvREFCNT_dec(promise->on_ready_immediate);

/* Transitions a promise from pending to finished, using the given result */
void xspr_promise_finish(pTHX_ xspr_promise_t* promise, xspr_result_t* result)
{
    dMY_CXT;

    ASSUME(promise->state == XSPR_STATE_PENDING);
    xspr_callback_t** pending_callbacks = promise->pending.callbacks;
    int count = promise->pending.callbacks_count;

    promise->state = XSPR_STATE_FINISHED;
    promise->finished.result = result;
    xspr_result_incref(aTHX_ promise->finished.result);

    /* fprintf(stderr, "finishing p=%p (%d callbacks)\n", promise, count); */

    /* For async/await: */
    if (promise->on_ready_immediate != NULL) {
        xspr_invoke_perl(aTHX_ promise->on_ready_immediate, NULL, 0);

        _XSPR_FREE_ON_READY_IMMEDIATE(promise);
        promise->on_ready_immediate = NULL;
    }

    unsigned i;
    for (i = 0; i < count; i++) {

        // If any of this promise’s callbacks has an on_reject, then
        // the promise’s result is rejection-handled.
        if (pending_callbacks[i]->type == XSPR_CALLBACK_PERL && RESULT_IS_REJECTED(result) && result->rejection_should_warn) {
            SV* on_reject = pending_callbacks[i]->perl.on_reject;
            if (on_reject && SvOK(on_reject)) {
                result->rejection_should_warn = false;
            }
        }

        if (MY_CXT.deferral_cr) {
            xspr_queue_add(aTHX_ pending_callbacks[i], promise);
        }
        else {
            xspr_immediate_process(aTHX_ pending_callbacks[i], promise);
        }
    }

    if (promise->self_sv_ref != NULL) {

        // After we set self_sv_ref, Future::AsyncAwait manipulates
        // things a bit such that WEAKREF is set on the reference and
        // the referent’s refcount is decremented. Thus, we can forgo
        // the reference-count decrement here. We still check for the
        // WEAKREF flag, though, just in case something changed.
        //
        if (!SvWEAKREF(promise->self_sv_ref)) {
            SvREFCNT_dec(SvRV(promise->self_sv_ref));
        }

        SvREFCNT_dec(promise->self_sv_ref);
        promise->self_sv_ref = NULL;
    }

    if (MY_CXT.deferral_cr) {
        xspr_queue_maybe_schedule(aTHX);
    }

    Safefree(pending_callbacks);
}

/* Create a new xspr_result_t object with the given number of item slots */
xspr_result_t* xspr_result_new(pTHX_ xspr_result_state_t state, unsigned count)
{
    xspr_result_t* result;
    Newxz(result, 1, xspr_result_t);
    Newxz(result->results, count, SV*);
    result->rejection_should_warn = true;
    result->state = state;
    result->refs = 1;
    result->count = count;
    return result;
}

xspr_result_t* pxs_result_clone(pTHX_ xspr_result_t* old)
{
    xspr_result_t* new = xspr_result_new(aTHX_ old->state, old->count);

    unsigned i;
    for (i=0; i<old->count; i++) {
        new->results[i] = SvREFCNT_inc( old->results[i] );
    }

    return new;
}

xspr_result_t* xspr_result_from_error(pTHX_ const char *error)
{
    xspr_result_t* result = xspr_result_new(aTHX_ XSPR_RESULT_REJECTED, 1);
    result->results[0] = newSVpv(error, 0);
    return result;
}

/* Increments the ref count for xspr_promise_t */
void xspr_promise_incref(pTHX_ xspr_promise_t* promise)
{
    (promise->refs)++;
}

/* Decrements the ref count for the xspr_promise_t, freeing the structure if needed */
void xspr_promise_decref(pTHX_ xspr_promise_t *promise)
{
    if (--(promise->refs) == 0) {
        if (promise->state == XSPR_STATE_PENDING) {
            /* XXX: is this a bad thing we should warn for? */
            int count = promise->pending.callbacks_count;
            xspr_callback_t **callbacks = promise->pending.callbacks;
            int i;
            for (i = 0; i < count; i++) {
                xspr_callback_free(aTHX_ callbacks[i]);
            }
            Safefree(callbacks);

        } else if (promise->state == XSPR_STATE_FINISHED) {
            xspr_result_decref(aTHX_ promise->finished.result);

        } else {
            ASSUME(0);
        }

        if (promise->on_ready_immediate != NULL) {
            _XSPR_FREE_ON_READY_IMMEDIATE(promise);
        }

        Safefree(promise);
    }
}

/* Creates a new promise. It's that simple. */
xspr_promise_t* xspr_promise_new(pTHX)
{
    xspr_promise_t* promise;
    Newxz(promise, 1, xspr_promise_t);

    *promise = (xspr_promise_t) {
        .refs = 1,
        .state = XSPR_STATE_PENDING,
    };

    return promise;
}

xspr_callback_t* xspr_callback_new_perl(pTHX_ SV* on_resolve, SV* on_reject, xspr_promise_t* next)
{
    xspr_callback_t* callback;
    Newxz(callback, 1, xspr_callback_t);
    callback->type = XSPR_CALLBACK_PERL;
    if (SvOK(on_resolve))
        callback->perl.on_resolve = newSVsv(on_resolve);
    if (SvOK(on_reject))
        callback->perl.on_reject = newSVsv(on_reject);
    callback->perl.next = next;
    if (next)
        xspr_promise_incref(aTHX_ callback->perl.next);
    return callback;
}

xspr_callback_t* xspr_callback_new_finally(pTHX_ SV* on_finally, xspr_promise_t* next)
{
    xspr_callback_t* callback;
    Newxz(callback, 1, xspr_callback_t);
    callback->type = XSPR_CALLBACK_FINALLY;
    if (SvOK(on_finally))
        callback->finally.on_finally = newSVsv(on_finally);
    callback->finally.next = next;
    if (next)
        xspr_promise_incref(aTHX_ callback->finally.next);
    return callback;
}

xspr_callback_t* xspr_callback_new_chain(pTHX_ xspr_promise_t* chain)
{
    xspr_callback_t* callback;
    Newxz(callback, 1, xspr_callback_t);
    callback->type = XSPR_CALLBACK_CHAIN;
    callback->chain = chain;
    xspr_promise_incref(aTHX_ chain);
    return callback;
}

xspr_callback_t* xspr_callback_new_finally_chain(pTHX_ xspr_result_t* original_result, xspr_promise_t* next_promise)
{
    xspr_callback_t* callback;
    Newxz(callback, 1, xspr_callback_t);
    callback->type = XSPR_CALLBACK_FINALLY_CHAIN;

    /*
    callback->finally_chain.original_result = original_result;
    xspr_result_incref(aTHX_ original_result);
    */
    callback->finally_chain.original_result = pxs_result_clone(aTHX_ original_result);

    callback->finally_chain.chain_promise = next_promise;
    xspr_promise_incref(aTHX_ next_promise);

    return callback;
}

/* Adds a then to the promise. Takes ownership of the callback */
void xspr_promise_then(pTHX_ xspr_promise_t* promise, xspr_callback_t* callback)
{
    dMY_CXT;

    if (promise->state == XSPR_STATE_PENDING) {
        promise->pending.callbacks_count++;
        Renew(promise->pending.callbacks, promise->pending.callbacks_count, xspr_callback_t*);
        promise->pending.callbacks[promise->pending.callbacks_count-1] = callback;

    } else if (promise->state == XSPR_STATE_FINISHED) {

        if (MY_CXT.deferral_cr) {
            xspr_queue_add(aTHX_ callback, promise);
            xspr_queue_maybe_schedule(aTHX);
        }
        else {
            xspr_immediate_process(aTHX_ callback, promise);
        }
    } else {
        ASSUME(0);
    }
}

/* Returns a promise if the given SV is a thenable. Ownership handed to the caller! */
xspr_promise_t* xspr_promise_from_sv(pTHX_ SV* input)
{
    if (input == NULL || !sv_isobject(input)) {
        return NULL;
    }

    /* If we got one of our own promises: great, not much to do here! */
    if (sv_derived_from(input, PROMISE_CLASS)) {
        IV tmp = SvIV((SV*)SvRV(input));
        PROMISE_CLASS_TYPE* promise = INT2PTR(PROMISE_CLASS_TYPE*, tmp);
        xspr_promise_incref(aTHX_ promise->promise);
        return promise->promise;
    }

    /* Maybe we got another type of promise. Let's convert it */
    GV* method_gv = gv_fetchmethod_autoload(SvSTASH(SvRV(input)), "then", FALSE);
    if (method_gv != NULL && isGV(method_gv) && GvCV(method_gv) != NULL) {

        CV* converter_cv = get_cv(BASE_CLASS "::" CONVERTER_CR_NAME, 0);
        if (!converter_cv) croak("Need " CONVERTER_CR_NAME "!");

        SV* converter_svcv = newRV_inc((SV*) converter_cv);
        sv_2mortal(converter_svcv);

        xspr_result_t* new_result = xspr_invoke_perl(aTHX_ converter_svcv, &input, 1);
        if (new_result->state == XSPR_RESULT_RESOLVED &&
            new_result->results != NULL &&
            new_result->count == 1 &&
            SvROK(new_result->results[0]) &&
            sv_derived_from(new_result->results[0], PROMISE_CLASS)) {
            /* This is expected: our conversion function returned us one of our own promises */
            IV tmp = SvIV((SV*)SvRV(new_result->results[0]));
            PROMISE_CLASS_TYPE* new_promise = INT2PTR(PROMISE_CLASS_TYPE*, tmp);

            xspr_promise_t* promise = new_promise->promise;
            xspr_promise_incref(aTHX_ promise);

            xspr_result_decref(aTHX_ new_result);
            return promise;

        } else {
            xspr_promise_t* promise = xspr_promise_new(aTHX);
            xspr_promise_finish(aTHX_ promise, new_result);
            xspr_result_decref(aTHX_ new_result);
            return promise;
        }
    }

    /* We didn't get a promise. */
    return NULL;
}

DEFERRED_CLASS_TYPE* _get_deferred_from_sv(pTHX_ SV *self_sv) {
    SV *referent = SvRV(self_sv);
    return INT2PTR(DEFERRED_CLASS_TYPE*, SvUV(referent));
}

PROMISE_CLASS_TYPE* _get_promise_from_sv(pTHX_ SV *self_sv) {
    SV *referent = SvRV(self_sv);
    return INT2PTR(PROMISE_CLASS_TYPE*, SvUV(referent));
}

SV* _ptr_to_svrv(pTHX_ void* ptr, HV* stash) {
    SV* referent = newSVuv( PTR2UV(ptr) );
    SV* retval = newRV_noinc(referent);
    sv_bless(retval, stash);

    return retval;
}

static inline xspr_promise_t* create_promise(pTHX) {
    dMY_CXT;

    xspr_promise_t* promise = xspr_promise_new(aTHX);

    SV *detect_leak_perl = NULL;

    SV** dml_svgv = hv_fetchs( MY_CXT.pxs_base_stash, "DETECT_MEMORY_LEAKS", 0 );

    if (dml_svgv) {
        detect_leak_perl = GvSV(*dml_svgv);
    }

    promise->detect_leak_pid = detect_leak_perl && SvTRUE(detect_leak_perl) ? getpid() : 0;

    return promise;
}

/* Many promises are just thrown away after the final callback, no need to allocate a next promise for those */
static inline xspr_promise_t* create_next_promise_if_needed(pTHX_ SV* original, SV** stack_ptr) {
    if (GIMME_V != G_VOID) {
        PROMISE_CLASS_TYPE* next_promise;
        Newxz(next_promise, 1, PROMISE_CLASS_TYPE);

        xspr_promise_t* next = create_promise(aTHX);
        next_promise->promise = next;

        *stack_ptr = sv_newmortal();

        // This would be simpler, but let’s facilitate subclassing.
        // sv_setref_pv(*stack_ptr, PROMISE_CLASS, (void*)next_promise);

        sv_setref_pv(*stack_ptr, NULL, (void*)next_promise);
        sv_bless(*stack_ptr, SvSTASH(SvRV(original)));

        return next;
    }

    return NULL;
}

static inline void _warn_on_destroy_if_needed(pTHX_ xspr_promise_t* promise, SV* self_sv) {
    if (promise->detect_leak_pid && PXS_IS_GLOBAL_DESTRUCTION && promise->detect_leak_pid == getpid()) {
        warn( "======================================================================\nXXXXXX - %s survived until global destruction; memory leak likely!\n======================================================================\n", SvPV_nolen(self_sv) );
    }
}

static inline void _warn_weird_reject_if_needed( pTHX_ SV* self_sv, const char* funcname, I32 my_items ) {

    char *pkgname = NULL;

    HV *stash = (self_sv == NULL) ? NULL : SvSTASH( SvRV(self_sv) );

    if (stash != NULL) {
        pkgname = HvNAME(stash);
    }

    if (pkgname == NULL) pkgname = DEFERRED_CLASS;

    if (my_items == 0) {
        warn( "%s: Empty call to %s()", pkgname, funcname );
    }
    else {
        warn( "%s: %s() called with only uninitialized values (%" IVdf ")", pkgname, funcname, (IV) my_items);
    }
}

static inline void _resolve_promise(pTHX_ xspr_promise_t* promise_p, SV** args, I32 argslen) {
    xspr_result_t* result = xspr_result_new(aTHX_ XSPR_RESULT_RESOLVED, argslen);

    unsigned i;
    for (i = 0; i < argslen; i++) {
        result->results[i] = newSVsv(args[i]);
    }

    xspr_promise_finish(aTHX_ promise_p, result);
    xspr_result_decref(aTHX_ result);
}

static inline void _reject_promise(pTHX_ SV* self_sv, xspr_promise_t* promise_p, SV** args, I32 argslen) {
    xspr_result_t* result = xspr_result_new(aTHX_ XSPR_RESULT_REJECTED, argslen);

    bool has_defined = false;

    unsigned i;
    for (i = 0; i < argslen; i++) {
        result->results[i] = newSVsv(args[i]);

        if (!has_defined && SvOK(result->results[i])) {
            has_defined = true;
        }
    }

    if (!has_defined) {
        const char* funcname = (self_sv == NULL) ? "rejected" : "reject";

        _warn_weird_reject_if_needed( aTHX_ self_sv, funcname, argslen );
    }

    xspr_promise_finish(aTHX_ promise_p, result);
    xspr_result_decref(aTHX_ result);
}

SV* _promise_to_sv(pTHX_ xspr_promise_t* promise_p) {
    dMY_CXT;

    PROMISE_CLASS_TYPE* promise_ptr;
    Newxz(promise_ptr, 1, PROMISE_CLASS_TYPE);
    promise_ptr->promise = promise_p;

    return _ptr_to_svrv(aTHX_ promise_ptr, MY_CXT.pxs_promise_stash);
}

/* When Future::AsyncAwait creates a promise/future it does NOT
   hold a strong reference to that object. Consequently, we have to
   ensure that the object lasts until we’re done with it. So introduce
   a (temporary!) circular reference. */
#define _IMMORTALIZE_PROMISE_SV(promise_sv, promise_p) \
    do { \
        /* fprintf(stderr, "making immortal: sv=%p p=%p\n", promise_sv, promise_p); */ \
        promise_p->self_sv_ref = promise_sv; \
        SvREFCNT_inc(promise_sv); \
        SvREFCNT_inc(SvRV(promise_sv)); \
    } while (0)

static inline SV* _create_preresolved_promise(pTHX_ SV** args, I32 argslen, bool immortalize) {
    xspr_promise_t* promise_p = create_promise(aTHX);

    _resolve_promise(aTHX_ promise_p, args, argslen);

    SV* promise_sv = _promise_to_sv(aTHX_ promise_p);

    if (immortalize) _IMMORTALIZE_PROMISE_SV(promise_sv, promise_p);

    return promise_sv;
}

static inline SV* _create_prerejected_promise(pTHX_ SV** args, I32 argslen, bool immortalize) {
    xspr_promise_t* promise_p = create_promise(aTHX);

    _reject_promise(aTHX_ NULL, promise_p, args, argslen);

    SV* promise_sv = _promise_to_sv(aTHX_ promise_p);

    if (immortalize) _IMMORTALIZE_PROMISE_SV(promise_sv, promise_p);

    return promise_sv;
}

//----------------------------------------------------------------------
static SV* _get_nothing_cr_arg (pTHX) {
    return SvREFCNT_inc( get_sv("Promise::XS::Deferred::_NOTHING_CR", 0) );
}

static void _anyevent_wait_promise (pTHX_ SV* promise_sv) {
    SV* condvar = exs_call_method_scalar(
        sv_2mortal( newSVpvs("AnyEvent") ),
        "condvar",
        NULL
    );

    SV* catch_args[] = {
        _get_nothing_cr_arg(aTHX),
        NULL,
    };

    SV* caught = exs_call_method_scalar(
        promise_sv,
        "catch",
        catch_args
    );

    SV* finally_args[] = {
        SvREFCNT_inc(condvar),
        NULL,
    };

    exs_call_method_void(
        caught,
        "finally",
        finally_args
    );

    sv_2mortal(caught);

    exs_call_method_void(
        condvar,
        "recv",
        NULL
    );

    sv_2mortal(condvar);
}

static void _ioasync_wait_promise (pTHX_ SV* promise_sv, SV* loop_sv, SV* stop_cr) {
    SV* catch_args[] = {
        _get_nothing_cr_arg(aTHX),
        NULL,
    };

    SV* caught = exs_call_method_scalar(
        promise_sv,
        "catch",
        catch_args
    );

    SV* finally_args[] = { SvREFCNT_inc(stop_cr), NULL };

    exs_call_method_void(
        caught,
        "finally",
        finally_args
    );

    sv_2mortal(caught);

    exs_call_method_void(
        loop_sv,
        "run",
        NULL
    );
}

static void _mojo_wait_promise(pTHX_ SV* promise_sv, SV* stop_cr) {
    SV* catch_args[] = {
        _get_nothing_cr_arg(aTHX),
        NULL,
    };

    SV* caught = exs_call_method_scalar(
        promise_sv,
        "catch",
        catch_args
    );

    SV* finally_args[] = { SvREFCNT_inc(stop_cr), NULL };

    exs_call_method_void(
        caught,
        "finally",
        finally_args
    );

    sv_2mortal(caught);

    exs_call_method_void(
        sv_2mortal( newSVpvs("Mojo::IOLoop") ),
        "start",
        NULL
    );
}

//----------------------------------------------------------------------

MODULE = Promise::XS     PACKAGE = Promise::XS

BOOT:
{
    MY_CXT_INIT;
#ifdef USE_ITHREADS
    MY_CXT.owner = aTHX;
#endif
    MY_CXT.queue_head = NULL;
    MY_CXT.queue_tail = NULL;
    MY_CXT.in_flush = 0;
    MY_CXT.backend_scheduled = 0;
    MY_CXT.callback_depth = 0;

    MY_CXT.pxs_base_stash = gv_stashpv(BASE_CLASS, FALSE);
    MY_CXT.pxs_promise_stash = gv_stashpv(PROMISE_CLASS, FALSE);
    MY_CXT.pxs_deferred_stash = gv_stashpv(DEFERRED_CLASS, FALSE);

    MY_CXT.deferral_cr = NULL;
    MY_CXT.deferral_arg = NULL;
    MY_CXT.event_system = _DEFER_NONE;
    MY_CXT.stop_cr = NULL;
    MY_CXT.pxs_flush_cr = NULL;
}

# In some old thread-multi perls sv_dup_inc() wasn’t defined.

#if defined(USE_ITHREADS) && defined(sv_dup_inc)

# ithreads would seem to be a very bad idea in Promise-based code,
# but anyway ..

void
CLONE(...)
    PPCODE:

        SV* pxs_flush_cr = NULL;
        SV* deferral_cr = NULL;
        event_system_t event_system;
        SV* deferral_arg = NULL;
        SV* stop_cr = NULL;

        {
            dMY_CXT;

            CLONE_PARAMS params = {NULL, 0, MY_CXT.owner};

            if ( MY_CXT.pxs_flush_cr ) {
                pxs_flush_cr = sv_dup_inc( MY_CXT.pxs_flush_cr, &params );
            }

            if ( MY_CXT.deferral_cr ) {
                deferral_cr = sv_dup_inc( MY_CXT.deferral_cr, &params );
            }

            if ( MY_CXT.deferral_arg ) {
                deferral_arg = sv_dup_inc( MY_CXT.deferral_arg, &params );
            }

            event_system = MY_CXT.event_system;

            if ( MY_CXT.stop_cr ) {
                stop_cr = sv_dup_inc( MY_CXT.stop_cr, &params );
            }
        }

        {
            MY_CXT_CLONE;
            MY_CXT.owner = aTHX;

            // Clone SVs
            MY_CXT.pxs_flush_cr = pxs_flush_cr;
            MY_CXT.deferral_cr = deferral_cr;
            MY_CXT.deferral_arg = deferral_arg;
            MY_CXT.event_system = event_system;
            MY_CXT.stop_cr      = stop_cr;

            // Clone HVs
            MY_CXT.pxs_base_stash = gv_stashpv(BASE_CLASS, FALSE);
            MY_CXT.pxs_promise_stash = gv_stashpv(PROMISE_CLASS, FALSE);
            MY_CXT.pxs_deferred_stash = gv_stashpv(DEFERRED_CLASS, FALSE);
        }

        XSRETURN_UNDEF;

#endif /* USE_ITHREADS && defined(sv_dup_inc) */

SV *
resolved(...)
    CODE:
        RETVAL = _create_preresolved_promise(aTHX_ &(ST(0)), items, false);
    OUTPUT:
        RETVAL

SV *
rejected(...)
    CODE:
        RETVAL = _create_prerejected_promise(aTHX_ &(ST(0)), items, false);
    OUTPUT:
        RETVAL

#----------------------------------------------------------------------

MODULE = Promise::XS     PACKAGE = Promise::XS::Deferred

PROTOTYPES: DISABLE

BOOT:
    newCONSTSUB( gv_stashpvs(BASE_CLASS "::Deferred", FALSE), "_DEFER_ANYEVENT", newSVuv(_DEFER_ANYEVENT));
    newCONSTSUB( gv_stashpvs(BASE_CLASS "::Deferred", FALSE), "_DEFER_IOASYNC", newSVuv(_DEFER_IOASYNC));
    newCONSTSUB( gv_stashpvs(BASE_CLASS "::Deferred", FALSE), "_DEFER_MOJO", newSVuv(_DEFER_MOJO));

SV *
create()
    CODE:
        dMY_CXT;

        DEFERRED_CLASS_TYPE* deferred_ptr;
        Newxz(deferred_ptr, 1, DEFERRED_CLASS_TYPE);

        xspr_promise_t* promise = create_promise(aTHX);

        deferred_ptr->promise = promise;

        RETVAL = _ptr_to_svrv(aTHX_ deferred_ptr, MY_CXT.pxs_deferred_stash);
    OUTPUT:
        RETVAL

void
___set_deferral_generic(SV* deferral_cr, SV* deferral_arg, UV event_system, SV* stop_cr=NULL)
    CODE:
        dMY_CXT;

        // deferral_cr = SvRV(deferral_cr);

        if (MY_CXT.deferral_cr) {
            SvREFCNT_dec(MY_CXT.deferral_cr);
        }

        if (MY_CXT.deferral_arg) {
            SvREFCNT_dec(MY_CXT.deferral_arg);
        }

        if (MY_CXT.stop_cr) {
            SvREFCNT_dec(MY_CXT.stop_cr);
        }

        MY_CXT.deferral_cr = SvREFCNT_inc(deferral_cr);

        MY_CXT.deferral_arg = SvOK(deferral_arg) ? SvREFCNT_inc(deferral_arg) : NULL;

        MY_CXT.event_system = event_system;

        MY_CXT.stop_cr = stop_cr ? SvREFCNT_inc(stop_cr) : NULL;

# We don’t care if there are args or not.
void
___flush(...)
    CODE:
        UNUSED(items);
        xspr_queue_flush(aTHX);

SV*
promise(SV* self_sv)
    CODE:
        DEFERRED_CLASS_TYPE* self = _get_deferred_from_sv(aTHX_ self_sv);

        xspr_promise_incref(aTHX_ self->promise);

        RETVAL = _promise_to_sv(aTHX_ self->promise);
    OUTPUT:
        RETVAL

SV*
resolve(SV *self_sv, ...)
    CODE:
        DEFERRED_CLASS_TYPE* self = _get_deferred_from_sv(aTHX_ self_sv);

        if (self->promise->state != XSPR_STATE_PENDING) {
            croak("Cannot resolve deferred: not pending");
        }

        _resolve_promise(aTHX_ self->promise, &(ST(1)), items - 1);

        if (GIMME_V == G_VOID) {
            RETVAL = NULL;
        }
        else {
            SvREFCNT_inc(self_sv);
            RETVAL = self_sv;
        }
    OUTPUT:
        RETVAL

SV*
reject(SV *self_sv, ...)
    CODE:
        DEFERRED_CLASS_TYPE* self = _get_deferred_from_sv(aTHX_ self_sv);

        if (self->promise->state != XSPR_STATE_PENDING) {
            croak("Cannot reject deferred: not pending");
        }

        _reject_promise(aTHX_ self_sv, self->promise, &(ST(1)), items - 1);

        if (GIMME_V == G_VOID) {
            RETVAL = NULL;
        }
        else {
            SvREFCNT_inc(self_sv);
            RETVAL = self_sv;
        }
    OUTPUT:
        RETVAL

bool
is_pending(SV *self_sv)
    CODE:
        DEFERRED_CLASS_TYPE* self = _get_deferred_from_sv(aTHX_ self_sv);

        RETVAL = (self->promise->state == XSPR_STATE_PENDING);
    OUTPUT:
        RETVAL

SV*
clear_unhandled_rejection(SV *self_sv)
    CODE:
        DEFERRED_CLASS_TYPE* self = _get_deferred_from_sv(aTHX_ self_sv);

        if (self->promise->state == XSPR_STATE_FINISHED) {
            self->promise->finished.result->rejection_should_warn = false;
        }

        if (GIMME_V == G_VOID) {
            RETVAL = NULL;
        }
        else {
            SvREFCNT_inc(self_sv);
            RETVAL = self_sv;
        }
    OUTPUT:
        RETVAL

void
DESTROY(SV *self_sv)
    CODE:
        DEFERRED_CLASS_TYPE* self = _get_deferred_from_sv(aTHX_ self_sv);

        _warn_on_destroy_if_needed(aTHX_ self->promise, self_sv);

        xspr_promise_decref(aTHX_ self->promise);
        Safefree(self);

# ----------------------------------------------------------------------

MODULE = Promise::XS     PACKAGE = Promise::XS::Promise

PROTOTYPES: DISABLE

void
then(SV* self_sv, SV* on_resolve = NULL, SV* on_reject = NULL)
    PPCODE:
        _CROAK_IF_LOOKS_LIKE_INFINITE_RECURSION;

        PROMISE_CLASS_TYPE* self = _get_promise_from_sv(aTHX_ self_sv);

        xspr_promise_t* next;

        if (on_resolve == NULL) on_resolve = &PL_sv_undef;
        if (on_reject == NULL) on_reject = &PL_sv_undef;

        next = create_next_promise_if_needed(aTHX_ self_sv, &ST(0));

        xspr_callback_t* callback = xspr_callback_new_perl(aTHX_ on_resolve, on_reject, next);
        xspr_promise_then(aTHX_ self->promise, callback);

        XSRETURN(next ? 1 : 0);

void
catch(SV* self_sv, SV* on_reject)
    PPCODE:
        _CROAK_IF_LOOKS_LIKE_INFINITE_RECURSION;

        PROMISE_CLASS_TYPE* self = _get_promise_from_sv(aTHX_ self_sv);

        xspr_promise_t* next = create_next_promise_if_needed(aTHX_ self_sv, &ST(0));

        xspr_callback_t* callback = xspr_callback_new_perl(aTHX_ &PL_sv_undef, on_reject, next);
        xspr_promise_then(aTHX_ self->promise, callback);

        XSRETURN(next ? 1 : 0);

void
finally(SV* self_sv, SV* on_finally)
    PPCODE:
        _CROAK_IF_LOOKS_LIKE_INFINITE_RECURSION;

        PROMISE_CLASS_TYPE* self = _get_promise_from_sv(aTHX_ self_sv);

        xspr_promise_t* next = create_next_promise_if_needed(aTHX_ self_sv, &ST(0));

        xspr_callback_t* callback = xspr_callback_new_finally(aTHX_ on_finally, next);
        xspr_promise_then(aTHX_ self->promise, callback);

        XSRETURN(next ? 1 : 0);

void
DESTROY(SV* self_sv)
    CODE:
        PROMISE_CLASS_TYPE* self = _get_promise_from_sv(aTHX_ self_sv);
        /* fprintf(stderr, "DESTROYing sv=%p, p=%p\n", self_sv, self->promise); */

        _warn_on_destroy_if_needed(aTHX_ self->promise, self_sv);

        xspr_promise_decref(aTHX_ self->promise);
        Safefree(self);

# ----------------------------------------------------------------------
# Future::AsyncAwait interface:
# ----------------------------------------------------------------------

SV*
AWAIT_NEW_DONE(...)
    CODE:
        _DO_DEBUG_AWAITABLE();
        UNUSED(items);
        RETVAL = _create_preresolved_promise(aTHX_ &(ST(1)), items - 1, true);
    OUTPUT:
        RETVAL

SV*
AWAIT_NEW_FAIL(...)
    CODE:
        _DO_DEBUG_AWAITABLE();
        UNUSED(items);
        RETVAL = _create_prerejected_promise(aTHX_ &(ST(1)), items - 1, true);
    OUTPUT:
        RETVAL

SV*
AWAIT_CLONE(...)
    CODE:
        _DO_DEBUG_AWAITABLE();
        UNUSED(items);

        xspr_promise_t* promise_p = create_promise(aTHX);

        RETVAL = _promise_to_sv(aTHX_ promise_p);

        _IMMORTALIZE_PROMISE_SV(RETVAL, promise_p);

        if (DEBUG_AWAITABLE) {
            fprintf(stderr, "#   SvREFCNT(RETVAL)=%d\n", SvREFCNT(RETVAL));
            fprintf(stderr, "#   SvREFCNT(SvRV(RETVAL))=%d\n", SvREFCNT(SvRV(RETVAL)));
            sv_dump(RETVAL);
        }
    OUTPUT:
        RETVAL

void
AWAIT_DONE(SV* self_sv, ...)
    CODE:
        _DO_DEBUG_AWAITABLE();
        PROMISE_CLASS_TYPE* self = _get_promise_from_sv(aTHX_ self_sv);
        _resolve_promise(aTHX_ self->promise, &ST(1), items - 1);

void
AWAIT_FAIL(SV* self_sv, ...)
    CODE:
        _DO_DEBUG_AWAITABLE();
        PROMISE_CLASS_TYPE* self = _get_promise_from_sv(aTHX_ self_sv);
        _reject_promise(aTHX_ NULL, self->promise, &ST(1), items - 1);

bool
AWAIT_IS_READY(SV *self_sv)
    CODE:
        _DO_DEBUG_AWAITABLE();
        PROMISE_CLASS_TYPE* self = _get_promise_from_sv(aTHX_ self_sv);

        RETVAL = (self->promise->state != XSPR_STATE_PENDING);
    OUTPUT:
        RETVAL

void
AWAIT_GET(SV *self_sv)
    PPCODE:
        _DO_DEBUG_AWAITABLE();
        DEFERRED_CLASS_TYPE* self = _get_deferred_from_sv(aTHX_ self_sv);

        ASSUME(self->promise->state == XSPR_STATE_FINISHED);

        SV** results = self->promise->finished.result->results;
        int result_count = self->promise->finished.result->count;

        if (RESULT_IS_RESOLVED(self->promise->finished.result)) {
            int i;

            if (!result_count) XSRETURN_EMPTY;

            switch (GIMME_V) {

                case G_ARRAY:
                    EXTEND(SP, result_count);

                    for (i=0; i<result_count; i++) {
                        PUSHs( sv_2mortal( newSVsv(results[i]) ) );
                    }

                    XSRETURN(result_count);

                case G_SCALAR:
                    EXTEND(SP, 1);
                    PUSHs( sv_2mortal( newSVsv(results[0]) ) );
                    XSRETURN(1);

                case G_VOID:
                    XSRETURN_EMPTY;

                default:
                    ASSUME(0);
            }
        }
        else {
            SV* err;
            if (result_count) {
                err = sv_2mortal( newSVsv( results[0] ) );
            }
            else {
                err = &PL_sv_undef;
            }

            croak_sv(err);
        }

void
AWAIT_CHAIN_CANCEL(...)
    CODE:
        _DO_DEBUG_AWAITABLE();
        UNUSED(items);

void
AWAIT_ON_CANCEL(...)
    CODE:
        _DO_DEBUG_AWAITABLE();
        UNUSED(items);

UV
AWAIT_IS_CANCELLED(...)
    CODE:
        _DO_DEBUG_AWAITABLE();
        UNUSED(items);
        RETVAL = 0;
    OUTPUT:
        RETVAL

void
AWAIT_ON_READY(SV *self_sv, SV* coderef)
    CODE:
        _DO_DEBUG_AWAITABLE();
        PROMISE_CLASS_TYPE* self = _get_promise_from_sv(aTHX_ self_sv);

        self->promise->on_ready_immediate = coderef;
        SvREFCNT_inc(coderef);
        SvREFCNT_inc(SvRV(coderef));

void
AWAIT_WAIT(SV* self_sv)
    PPCODE:
        _DO_DEBUG_AWAITABLE();
        dMY_CXT;

        switch (MY_CXT.event_system) {
            case _DEFER_ANYEVENT:
                _anyevent_wait_promise(aTHX_ self_sv);
                break;

            case _DEFER_IOASYNC:
                _ioasync_wait_promise(aTHX_ self_sv, MY_CXT.deferral_arg, MY_CXT.stop_cr);
                break;

            case _DEFER_MOJO:
                _mojo_wait_promise(aTHX_ self_sv, MY_CXT.stop_cr);
                break;

            default:
                croak(BASE_CLASS ": No event loop set up! Did you forget to call use_event()?");
        }

        PUSHMARK(SP);

        int count = call_method("AWAIT_GET", GIMME_V);
        XSRETURN(count);