#include "Connection.h"
#include "panda/log/log.h"
#include "panda/protocol/websocket/Frame.h"
#include <panda/encode/base16.h>
#include <numeric>
#include <panda/unievent/Tcp.h>
#include <panda/unievent/Pipe.h>
namespace panda { namespace unievent { namespace websocket {
using protocol::websocket::ccfmt;
using protocol::websocket::Frame;
// Move constructor: hands the MessageBuilder part of `b` to the base-class move
// constructor and binds this builder to the same connection as `b`.
// `b._connection` is read after `std::move(b)`; that member belongs to Builder,
// not to the MessageBuilder base that is being moved from.
Builder::Builder (Builder&& b)
    : MessageBuilder(std::move(b))
    , _connection{b._connection}
{}
// Creates a builder bound to `connection`; the MessageBuilder base is initialised
// from the message builder produced by the connection's parser.
Builder::Builder (Connection& connection)
    : MessageBuilder(connection.parser->message())
    , _connection{connection}
{}
// Serialises `payload` with the underlying MessageBuilder and writes it to the
// connection's stream.
//
// payload  - message body to send.
// callback - optional completion handler. When the connection is connected it is
//            wrapped via Connection::wrap_send_fn and attached to the write request;
//            otherwise it is invoked immediately with errc::WRITE_ERROR and a fresh,
//            empty write request.
//
// Returns the write request that was queued, or nullptr if the connection is not
// connected.
WriteRequestSP Builder::send (string_view payload, const send_fn& callback) {
    if (_connection.connected()) {
        WriteRequestSP request = new WriteRequest(MessageBuilder::send(payload));
        if (callback) {
            request->event.add(_connection.wrap_send_fn(callback));
        }
        _connection.stream()->write(request);
        return request;
    }

    // Not connected: report the failure to the caller's callback (if any) and bail out.
    panda_log_warn("WS: writeing to closed connection");
    if (callback) {
        callback(&_connection, errc::WRITE_ERROR, new unievent::WriteRequest());
    }
    return nullptr;
}
// Applies `conf` to this connection: the parser is configured with it and the
// shutdown timeout is copied from it.
void Connection::configure (const Config& conf) {
    parser->configure(conf);
    shutdown_timeout = conf.shutdown_timeout;
}
// File-local helper: emits an info-level log line when the connection is used
// while it is not in the CONNECTED state (see send_ping / send_pong).
static void log_use_after_close () {
    panda_log_info("using websocket::Connection after close");
}
// Read callback: feeds the received bytes to the parser and dispatches every
// parsed message to the matching handler.
//
// buf - bytes received from the stream.
// err - read error, if any; reported through process_error as errc::READ_ERROR.
//
// Processing stops as soon as a message carries a protocol error, the peer sends
// a CLOSE frame, or a handler leaves the connection in a non-CONNECTED state.
void Connection::on_read (string& buf, const ErrorCode& err) {
    // Hold a reference to ourselves for the duration of the call (presumably so the
    // handlers invoked below cannot destroy the object mid-call).
    ConnectionSP hold = this; (void)hold;
    panda_log_debug("Websocket on read:" << log::escaped{buf});
    assert(_state == State::CONNECTED && parser->established());

    if (err) {
        process_error(nest_error(errc::READ_ERROR, err));
        return;
    }

    if (stats) {
        stats->bytes_in += buf.size();
        // NOTE(review): incremented once per read call, not once per parsed message - confirm this is intended.
        stats->msgs_in++;
    }

    auto messages = parser->get_messages(buf);
    for (const auto& msg : messages) {
        if (msg->error()) {
            panda_log_notice("protocol error: " << msg->error());
            process_error(nest_error(errc::READ_ERROR, msg->error()), parser->suggested_close_code());
            return;
        }

        const auto opcode = msg->opcode();
        if (opcode == Opcode::CLOSE) {
            panda_log_notice("connection closed by peer:" << ccfmt(msg->close_code(), msg->close_message()));
            process_peer_close(msg);
            return;
        }

        if (opcode == Opcode::PING) {
            on_ping(msg);
        } else if (opcode == Opcode::PONG) {
            on_pong(msg);
        } else {
            on_message(msg);
        }

        // A handler may have closed the connection; do not deliver further messages then.
        if (_state != State::CONNECTED) return;
    }
}
// Handles a data message: logs its payload chunks base16-encoded at verbose-debug
// level and then fires message_event.
void Connection::on_message (const MessageSP& msg) {
    panda_log_verbose_debug([&]{
        log << "websocket Connection::on_message: payload=\n";
        for (const auto& chunk : msg->payload) {
            log << encode::encode_base16(chunk);
        }
    });
    message_event(this, msg);
}
// Writes a PING frame carrying `payload` to the stream.
// If the connection is not in the CONNECTED state, nothing is sent and the misuse
// is only logged.
void Connection::send_ping (string_view payload) {
    if (_state != State::CONNECTED) {
        log_use_after_close();
        return;
    }
    _stream->write(parser->send_ping(payload));
}
// Writes a PONG frame carrying `payload` to the stream.
// If the connection is not in the CONNECTED state, nothing is sent and the misuse
// is only logged.
void Connection::send_pong (string_view payload) {
    if (_state != State::CONNECTED) {
        log_use_after_close();
        return;
    }
    _stream->write(parser->send_pong(payload));
}
// Handles a close initiated by the peer.
//
// msg - the peer's CLOSE message, or null when the close was detected without one
//       (on_eof passes nullptr).
//
// The on_peer_close handler runs first. If _error_state is still set afterwards,
// the connection is closed here: with the parser's suggested close code and the
// peer's close message when `msg` is present, or with CloseCode::ABNORMALLY otherwise.
void Connection::process_peer_close (const MessageSP& msg) {
    // Already in the initial state: we got here after a close, ignore everything.
    if (_state == State::INITIAL) return;

    _error_state = true;
    // Captured before the handler runs; do_close resets the parser.
    auto close_code = parser->suggested_close_code();
    on_peer_close(msg);

    // do_close clears _error_state, so a cleared flag presumably means the handler
    // already closed the connection itself.
    if (!_error_state) return;

    if (msg) {
        close(close_code, msg->close_message());
    } else {
        close(CloseCode::ABNORMALLY);
    }
}
// Peer-close hook: fires peer_close_event with this connection and the peer's
// close message (which may be null, see process_peer_close).
void Connection::on_peer_close (const MessageSP& msg) {
    peer_close_event(this, msg);
}
void Connection::on_ping (const MessageSP& msg) {
if (msg->payload_length() > Frame::MAX_CONTROL_PAYLOAD) {
panda_log_notice("something weird, ping payload is bigger than possible");
send_pong(); // send pong without payload
} else {
switch (msg->payload.size()) {
case 0: send_pong(); break;
case 1: send_pong(msg->payload.front()); break;
default:
string acc;
for (const auto& str : msg->payload) acc += str;
send_pong(acc);
}
}
ping_event(this, msg);
}
// PONG hook: fires pong_event with this connection and the received message.
void Connection::on_pong (const MessageSP& msg) {
    pong_event(this, msg);
}
// Reports an error to the on_error handler and closes the connection with `code`
// unless _error_state has been cleared by the time the handler returns.
//
// err  - the error to report (always logged, even when ignored below).
// code - close code to use when this function closes the connection.
void Connection::process_error (const ErrorCode& err, uint16_t code) {
    panda_log_notice("websocket error: " << err.message());

    // Already in the initial state: we got here after a close, ignore everything.
    const bool already_closed = (_state == State::INITIAL);
    if (already_closed) return;

    _error_state = true;
    on_error(err);

    // do_close clears _error_state, so a cleared flag presumably means the handler
    // already closed the connection itself.
    if (!_error_state) return;
    close(code);
}
// Error hook: fires error_event with this connection and the error.
void Connection::on_error (const ErrorCode& err) {
    error_event(this, err);
}
// End-of-stream callback: treated as a peer close without a CLOSE message.
void Connection::on_eof () {
    // Hold a reference to ourselves for the duration of the call.
    ConnectionSP hold = this; (void)hold;
    panda_log_notice("websocket on_eof");
    process_peer_close(nullptr);
}
void Connection::on_write (const ErrorCode& err, const WriteRequestSP& req) {
ConnectionSP hold = this; (void)hold;
panda_log_debug("websocket on_write: " << err);
if (err && !(err & std::errc::operation_canceled || err & std::errc::broken_pipe || err & std::errc::not_connected)) {
process_error(nest_error(errc::WRITE_ERROR, err));
} else if (stats) {
size_t size = std::accumulate(req->bufs.begin(), req->bufs.end(), size_t(0), [](size_t r, const string& s) {return r + s.size();});
stats->bytes_out += size;
stats->msgs_out++;
}
}
// Shutdown-completion callback.
//
// err - result of the shutdown request. If it is a timeout, the stream is reset.
//
// The stream pointer is checked before use: do_close sets _stream to null, and
// the destructor guards the same reset() call, so this callback must not assume
// that a stream is still attached.
void Connection::on_shutdown(const ErrorCode& err, const ShutdownRequestSP&) {
    // Hold a reference to ourselves for the duration of the call.
    ConnectionSP hold = this; (void)hold;
    panda_log_debug("websocket on_shutdown " << err);

    if (!(err & std::errc::timed_out)) return;
    if (_stream) _stream->reset();
}
// Performs the actual close of the connection.
//
// code    - close code to send to the peer and to report to on_close.
// payload - close message to send to the peer and to report to on_close.
//
// Steps, in order:
//   1. send a CLOSE frame if the stream is still readable, we were connected and
//      the parser has not sent one yet;
//   2. reset the parser and return to State::INITIAL;
//   3. detach the stream from this object and shut it down (or reset it if it was
//      not connected), then remove its event listener;
//   4. clear _error_state and, if we were connected on entry, call on_close.
void Connection::do_close (uint16_t code, const string& payload) {
panda_log_debug("Connection[close]: code=" << ccfmt(code, payload));
// Remember the state on entry; _state is overwritten below.
bool was_connected = connected();
// in_connected (not out_connected) is checked on purpose: it tells whether we are inside the eof callback
if (_stream && _stream->in_connected() && was_connected && !parser->send_closed()) {
_stream->write(parser->send_close(code, payload));
}
parser->reset();
_state = State::INITIAL;
// detach the stream from this object before shutting it down / resetting it, because that may call user callbacks which may start a new connection with a new stream
auto stream = std::move(_stream);
// explicitly null the member (presumably in case the moved-from pointer is not guaranteed to be empty)
_stream = nullptr;
if (stream) {
if (stream->connected()) {
// stop reading, request a shutdown bounded by shutdown_timeout, then disconnect
stream->read_stop();
stream->shutdown(shutdown_timeout);
stream->disconnect();
} else {
// will call on_connect / on_write etc with status cancelled and indirectly lead to calling connection failure callbacks
stream->reset();
}
// stop delivering events of the detached stream to its listener
stream->event_listener(nullptr);
}
// process_error / process_peer_close check this flag after their handlers to decide whether they still need to close
_error_state = false;
// only report a close if there was an established connection on entry
if (was_connected) on_close(code, payload);
}
// Close hook: fires close_event with this connection, the close code and the
// close message.
void Connection::on_close (uint16_t code, const string& payload) {
    close_event(this, code, payload);
}
// Destructor: if the connection is not in the initial state and still has a
// stream, the stream is reset so that pending callbacks (write/shutdown/etc.)
// get called.
// TODO: should be exception-guarded
Connection::~Connection () {
    const bool active = (_state != State::INITIAL);
    if (active && _stream) {
        _stream->reset();
    }
}
// Writes a one-line description of `conf` (its frame, message and handshake size
// limits) to `stream` and returns the stream.
std::ostream& operator<< (std::ostream& stream, const Connection::Config& conf) {
    stream << "Connection::Config{max_frame_size:" << conf.max_frame_size;
    stream << ", max_message_size:" << conf.max_message_size;
    stream << ", max_handshake_size:" << conf.max_handshake_size;
    return stream << "}";
}
namespace {

// Wrapper around a byte count; streaming it prints the count scaled to a binary
// unit (B, KiB, MiB, GiB, TiB).
struct PrettyBytes {
    size_t count;
};

// Prints `c.count` scaled to the largest fitting unit.
// The value is divided by 1024 (integer division, so the fraction is dropped)
// while it is greater than 1200 and a bigger unit is available; TiB is the
// largest unit, so huge values are printed as a large TiB number.
std::ostream& operator<< (std::ostream& stream, const PrettyBytes& c) {
    static const char* const UNITS[] = {" B", " KiB", " MiB", " GiB", " TiB"};
    const size_t last_unit = sizeof(UNITS) / sizeof(UNITS[0]) - 1;

    size_t scaled = c.count;
    size_t unit = 0;
    for (; scaled > 1200 && unit < last_unit; ++unit) {
        scaled /= 1024;
    }
    return stream << scaled << UNITS[unit];
}

}
// Writes a one-line summary of `c` to `stream` and returns the stream: message
// and byte counters for both directions combined, then for input, then for output.
// Byte counters are printed through PrettyBytes.
std::ostream& operator<< (std::ostream& stream, const Connection::Statistics& c) {
    stream << "total " << (c.msgs_in + c.msgs_out) << "pps(" << PrettyBytes{c.bytes_in + c.bytes_out} << "/s),";
    stream << "input " << c.msgs_in << "pps(" << PrettyBytes{c.bytes_in} << "/s),";
    stream << "output " << c.msgs_out << "pps(" << PrettyBytes{c.bytes_out} << "/s)";
    return stream;
}
}}}