From Code to Community: Sponsoring The Perl and Raku Conference 2025 Learn more

#include "../lib/test.h"
#include <panda/unievent/Streamer.h>
#include <deque>
namespace {
struct TestInput : Streamer::IInput {
TimerSP timer;
size_t size;
size_t speed;
int start_reading_cnt = 0;
int stop_reading_cnt = 0;
TestInput (size_t size, size_t speed) : size(size), speed(speed) {}
ErrorCode start (const LoopSP& loop) override {
//printf("start\n");
timer = new Timer(loop);
timer->start(1);
timer->event.add([this](auto){
//printf("on read\n");
if (!size) {
this->handle_eof();
timer->stop();
return;
}
if (speed > size) speed = size;
this->handle_read(string(speed, 'x'), {});
size -= speed;
});
return {};
}
ErrorCode start_reading () override {
//printf("start reading\n");
timer->start(1);
start_reading_cnt++;
return {};
}
void stop_reading () override {
//printf("stop reading %d\n", stop_reading_cnt);
timer->stop();
stop_reading_cnt++;
}
void stop () override {
//printf("stop\n");
timer->stop();
}
};
struct TestOutput : Streamer::IOutput {
size_t speed;
TimerSP timer;
std::deque<size_t> bufs;
TestOutput (size_t speed) : speed(speed) {}
ErrorCode start (const LoopSP& loop) override {
//printf("writer start\n");
timer = new Timer(loop);
timer->event.add([this](auto){
bufs.pop_front();
//printf("on write que left %d\n", write_queue_size());
this->handle_write({});
this->_write();
});
return {};
}
void stop () override {
//printf("writer stop\n");
timer->stop();
}
ErrorCode write (const string& data) override {
auto len = data.length();
//printf("writer write %d, que=%d\n", len, write_queue_size());
bufs.push_back(len);
if (bufs.size() == 1) _write();
return {};
}
void _write () {
if (!bufs.size()) return;
auto len = bufs.front();
size_t tmt = len / speed;
//printf("writer _write\n");
timer->once(tmt);
}
size_t write_queue_size () const override {
size_t que = 0;
for (auto n : bufs) { que += n; }
return que;
}
};
}