#include "streamer.h"
#include <panda/unievent/streamer/File.h>
#include <panda/unievent/streamer/Stream.h>
TEST_PREFIX("streamer-stream: ", "[streamer-stream]");
using namespace panda::unievent::streamer;
namespace {
struct TestStreamInput : StreamInput {
using StreamInput::StreamInput;
int stop_reading_cnt = 0;
void stop_reading () override {
stop_reading_cnt++;
StreamInput::stop_reading();
}
};
TcpP2P make_pair (const LoopSP& loop, size_t amount, size_t count) {
auto p = make_p2p(loop);
size_t cnt = 0;
TimerSP t = new Timer(loop);
t->event.add([=](auto) mutable {
p.client->write(string(amount, 'x'));
if (++cnt == count) {
t.reset();
p.client->disconnect();
}
});
t->start(1);
return p;
}
}
TEST("normal input") {
AsyncTest test(3000, 1);
auto p = make_pair(test.loop, 10, 10);
auto i = new TestStreamInput(p.sconn);
auto o = new TestOutput(20000);
StreamerSP s = new Streamer(i, o, 100000, test.loop);
s->start();
s->finish_event.add([&](const ErrorCode& err) {
if (err) WARN(err);
CHECK(!err);
test.happens();
test.loop->stop();
});
test.run();
CHECK(i->stop_reading_cnt == 0);
}
#ifndef _WIN32 // win32 has low timer resolution for this test
TEST("pause input") {
AsyncTest test(3000, 1);
auto p = make_pair(test.loop, 1000, 20);
auto i = new TestStreamInput(p.sconn);
auto o = new TestOutput(400);
StreamerSP s = new Streamer(i, o, 3000, test.loop);
s->start();
s->finish_event.add([&](const ErrorCode& err) {
if (err) WARN(err);
CHECK(!err);
test.happens();
test.loop->stop();
});
test.run();
CHECK(i->stop_reading_cnt > 0);
}
#endif
TEST("normal output") {
AsyncTest test(3000, 2);
auto p2 = make_p2p(test.loop);
auto p1 = make_pair(test.loop, 10000, 20);
auto i = new TestStreamInput(p1.sconn);
auto o = new StreamOutput(p2.sconn);
StreamerSP s = new Streamer(i, o, 50000, test.loop);
s->start();
s->finish_event.add([&](const ErrorCode& err) {
if (err) WARN(err);
CHECK(!err);
test.happens();
p2.sconn->disconnect();
});
string res;
p2.client->read_event.add([&](auto&, const string& data, auto...) {
res += data;
});
p2.client->eof_event.add([&](auto){
test.happens();
test.loop->stop();
});
test.run();
CHECK((res == string(200000, 'x')));
}
TEST("file in stream out with busy buffer") {
AsyncTest test(3000, 1);
auto p = make_p2p(test.loop);
string file = "tests/streamer/file.txt";
auto i = new FileInput(file, 10000);
auto o = new StreamOutput(p.sconn);
StreamerSP s = new Streamer(i, o, 100000, test.loop);
s->start();
size_t count = 0;
p.client->read_event.add([&count](auto&, auto& data, auto& err) {
if (err) throw err;
count += data.length();
});
p.client->eof_event.add([&](auto&) {
test.loop->stop();
});
s->finish_event.add([&](auto& err) {
CHECK(!err);
test.happens();
p.sconn->disconnect();
});
string ku = "ku-ku";
p.sconn->write(ku);
test.run();
auto res = Fs::stat(file).value();
CHECK(count == res.size + ku.length());
}