#include "Streamer.h"

namespace panda { namespace unievent {

Streamer::Streamer (const IInputSP& input, const IOutputSP& output, size_t max_buf, const LoopSP& loop)
    : loop(loop), input(input), output(output), max_buf(max_buf)
{
    assert(input); assert(output);
    input->streamer = output->streamer = this;
    thr_buf = max_buf * 2 / 3;
}

void Streamer::start () {
    auto err = input->start(loop);
    if (err) return finish_event(nest_error(streamer_errc::read_error, err));

    err = output->start(loop);
    if (err) {
        input->stop();
        return finish_event(nest_error(streamer_errc::write_error, err));
    }

    started = true;
}

void Streamer::stop () {
    finish(make_error_code(std::errc::operation_canceled));
}

void Streamer::handle_read (const string& data, const ErrorCode& err) {
    if (err) return finish(nest_error(streamer_errc::read_error, err));

    nread++;
    auto werr = output->write(data);
    if (werr) return finish(nest_error(streamer_errc::write_error, werr));

    if (max_buf && output->write_queue_size() >= max_buf) {
        input->stop_reading();
        inread = false;
    }
}

void Streamer::handle_write (const ErrorCode& err) {
    if (err) return finish(nest_error(streamer_errc::write_error, err));
    nwrite++;

    if (eof && nwrite == nread) return finish();

    if (!inread && output->write_queue_size() <= thr_buf) {
        auto rerr = input->start_reading();
        if (rerr) return finish(nest_error(streamer_errc::read_error, rerr));
        inread = true;
    }
}

void Streamer::handle_eof () {
    if (nwrite == nread) return finish();
    eof = true;
}

void Streamer::finish (const ErrorCode& err) {
    if (!started) return;
    input->stop();
    output->stop();

    started = false;
    inread = true;
    eof = false;
    nread = nwrite = 0;

    finish_event(err);
}

}}