#include "streamer.h"
#include <panda/unievent/streamer/File.h>
TEST_PREFIX("streamer-file: ", "[streamer-file]")
using namespace panda::unievent::streamer;
namespace {
// FileInput wrapper used by the tests below: it counts how many times
// stop_reading() is invoked and otherwise defers to FileInput.
struct TestFileInput : FileInput {
    // Number of stop_reading() calls observed on this object so far.
    int stop_reading_cnt = 0;

    // Reuse the base class constructors unchanged.
    using FileInput::FileInput;

    void stop_reading () override {
        // Record the call first, then let the base class do the real work.
        ++stop_reading_cnt;
        FileInput::stop_reading();
    }
};
// Reads the file at `path` through the synchronous Fs API and returns its
// content as a single string.
//
// The file is read in chunks of `chunk_size` bytes; a chunk shorter than
// `chunk_size` ends the loop. Failures of Fs::open / Fs::read surface through
// `.value()` on their results. The descriptor is closed on every exit path
// (previously it was never closed, leaking one descriptor per call).
static string read_file (string_view path) {
    const size_t chunk_size = 1000000;

    auto fd = Fs::open(path, Fs::OpenFlags::RDONLY).value();

    // Scope guard: closes the descriptor when the function returns or unwinds.
    // A close failure is deliberately ignored, there is nothing useful a test
    // helper could do about it.
    using fd_type = decltype(fd);
    struct FdGuard {
        fd_type handle;
        ~FdGuard () { Fs::close(handle).nevermind(); }
    } guard{fd};

    string ret, buf;
    do {
        buf = Fs::read(fd, chunk_size).value();
        ret += buf;
    } while (buf.length() == chunk_size); // short chunk => stop reading
    return ret;
}
}
// Streams the fixture file into a TestOutput and expects the streamer to
// finish without an error and without stop_reading() ever being called on
// the input.
TEST("normal input") {
    // NOTE(review): arguments look like (timeout, expected happens() count) - confirm against AsyncTest.
    AsyncTest test(3000, 1);

    auto i    = new TestFileInput("tests/streamer/file.txt", 10000);
    auto sink = new TestOutput(20000);
    StreamerSP streamer = new Streamer(i, sink, 100000, test.loop);

    streamer->start();
    streamer->finish_event.add([&](const ErrorCode& err) {
        // Surface the error text before the assertion below reports the failure.
        if (err) {
            WARN(err);
        }
        CHECK_FALSE(err);
        test.happens();
    });

    test.run();

    // The input must never have been asked to stop reading in this scenario.
    CHECK(i->stop_reading_cnt == 0);
}
// Streams the fixture file into a TestOutput and expects the streamer to
// finish without an error after stop_reading() has been called on the input
// at least once.
// NOTE(review): presumably the smaller TestOutput/Streamer arguments make the
// output lag behind the input so that reading gets paused - confirm.
TEST("pause input") {
    // NOTE(review): arguments look like (timeout, expected happens() count) - confirm against AsyncTest.
    AsyncTest test(3000, 1);

    auto i    = new TestFileInput("tests/streamer/file.txt", 30000);
    auto sink = new TestOutput(2000);
    StreamerSP streamer = new Streamer(i, sink, 50000, test.loop);

    streamer->start();
    streamer->finish_event.add([&](const ErrorCode& err) {
        // Surface the error text before the assertion below reports the failure.
        if (err) {
            WARN(err);
        }
        CHECK_FALSE(err);
        test.happens();
    });

    test.run();

    // At least one stop_reading() call must have been recorded.
    CHECK(i->stop_reading_cnt > 0);
}
// Streams the fixture file into a FileOutput writing to a temporary file,
// then checks that the temporary file's content equals the fixture's content.
TEST("normal output") {
    // Make sure the directory for the temporary file exists; the result is ignored.
    Fs::mkpath("tests/var/streamer").nevermind();

    // NOTE(review): arguments look like (timeout, expected happens() count) - confirm against AsyncTest.
    AsyncTest test(3000, 1);

    const char* orig_file = "tests/streamer/file.txt";
    const char* tmp_file  = "tests/var/streamer/fout.txt";

    auto source = new TestFileInput(orig_file, 10000);
    auto sink   = new FileOutput(tmp_file);
    StreamerSP streamer = new Streamer(source, sink, 100000, test.loop);

    streamer->start();
    streamer->finish_event.add([&](const ErrorCode& err) {
        // Surface the error text before the assertion below reports the failure.
        if (err) {
            WARN(err);
        }
        CHECK_FALSE(err);
        test.happens();
    });

    test.run();

    // Load both files, drop the temporary one, and only then compare.
    auto s1 = read_file(orig_file);
    auto s2 = read_file(tmp_file);
    remove(tmp_file);

    CHECK((s1 == s2));
}