From 356f0e537876c2849ab2af04f550b28b20a97081 Mon Sep 17 00:00:00 2001 From: WolverinDEV Date: Sun, 7 Feb 2021 14:58:59 +0100 Subject: [PATCH] Fixed build for new shared library --- native/CMakeLists.txt | 4 +- native/serverconnection/CMakeLists.txt | 16 +- .../serverconnection/src/audio/AudioInput.cpp | 1 - .../serverconnection/src/audio/AudioInput.h | 3 +- .../src/audio/AudioOutput.cpp | 2 +- .../src/audio/driver/AudioDriver.cpp | 4 +- .../src/audio/js/AudioConsumer.cpp | 8 +- .../src/audio/js/AudioConsumer.h | 1 - .../src/connection/ProtocolHandler.cpp | 761 ++++++++---------- .../src/connection/ProtocolHandler.h | 40 +- .../connection/ProtocolHandlerCommands.cpp | 1 - .../src/connection/ProtocolHandlerCrypto.cpp | 31 +- .../src/connection/ProtocolHandlerPOW.cpp | 14 +- .../src/connection/ProtocolHandlerPackets.cpp | 48 +- .../src/connection/ServerConnection.cpp | 31 +- .../src/connection/audio/VoiceConnection.cpp | 20 +- .../src/connection/audio/VoiceConnection.h | 11 +- .../src/connection/ft/FileTransferManager.cpp | 24 +- native/serverconnection/src/logger.h | 5 +- native/serverconnection/src/thread_helper.cpp | 94 +++ native/serverconnection/src/thread_helper.h | 26 + 21 files changed, 578 insertions(+), 567 deletions(-) create mode 100644 native/serverconnection/src/thread_helper.cpp create mode 100644 native/serverconnection/src/thread_helper.h diff --git a/native/CMakeLists.txt b/native/CMakeLists.txt index 81731b1..73d17e0 100644 --- a/native/CMakeLists.txt +++ b/native/CMakeLists.txt @@ -24,8 +24,8 @@ function(setup_nodejs) set(NODEJS_URL "https://atom.io/download/atom-shell") set(NODEJS_VERSION "v8.0.0") - set(NODEJS_URL "https://nodejs.org/download/release/") - set(NODEJS_VERSION "v12.13.0") + #set(NODEJS_URL "https://nodejs.org/download/release/") + #set(NODEJS_VERSION "v12.13.0") find_package(NodeJS REQUIRED) diff --git a/native/serverconnection/CMakeLists.txt b/native/serverconnection/CMakeLists.txt index e57da70..7dacfe4 100644 --- a/native/serverconnection/CMakeLists.txt +++ b/native/serverconnection/CMakeLists.txt @@ -6,6 +6,7 @@ set(SOURCE_FILES src/EventLoop.cpp src/hwuid.cpp src/ring_buffer.cpp + src/thread_helper.cpp src/connection/ft/FileTransferManager.cpp src/connection/ft/FileTransferObject.cpp @@ -99,22 +100,17 @@ find_package(DataPipes REQUIRED) include_directories(${DataPipes_INCLUDE_DIR}) set(_CMAKE_FIND_LIBRARY_SUFFIXES ${CMAKE_FIND_LIBRARY_SUFFIXES}) +set(LIBEVENT_STATIC_LINK TRUE) +set(CMAKE_FIND_USE_PACKAGE_REGISTRY) find_package(Libevent REQUIRED) -include_directories(${LIBEVENT_INCLUDE_DIRS}) set(CMAKE_FIND_LIBRARY_SUFFIXES ${_CMAKE_FIND_LIBRARY_SUFFIXES}) find_package(TeaSpeak_SharedLib REQUIRED) include_directories(${TeaSpeak_SharedLib_INCLUDE_DIR}) -find_package(StringVariable REQUIRED) -include_directories(${StringVariable_INCLUDE_DIR}) - find_package(ed25519 REQUIRED) include_directories(${ed25519_INCLUDE_DIR}) -find_package(ThreadPool REQUIRED) -include_directories(${ThreadPool_INCLUDE_DIR}) - find_package(rnnoise REQUIRED) if (WIN32) @@ -141,11 +137,9 @@ set(REQUIRED_LIBRARIES ${TomCrypt_LIBRARIES_STATIC} ${TomMath_LIBRARIES_STATIC} - ${LIBEVENT_STATIC_LIBRARIES} + libevent::core - ${StringVariable_LIBRARIES_STATIC} DataPipes::core::static - ${ThreadPool_LIBRARIES_STATIC} ${soxr_LIBRARIES_STATIC} ${fvad_LIBRARIES_STATIC} ${opus_LIBRARIES_STATIC} @@ -154,6 +148,7 @@ set(REQUIRED_LIBRARIES rnnoise spdlog::spdlog_header_only + Nan::Helpers ) @@ -167,6 +162,7 @@ if (WIN32) set(REQUIRED_LIBRARIES ${REQUIRED_LIBRARIES} "Ws2_32.Lib") else() set(REQUIRED_LIBRARIES ${REQUIRED_LIBRARIES} + libevent::pthreads libstdc++fs.a asound jack.a diff --git a/native/serverconnection/src/audio/AudioInput.cpp b/native/serverconnection/src/audio/AudioInput.cpp index a3a333a..ae08785 100644 --- a/native/serverconnection/src/audio/AudioInput.cpp +++ b/native/serverconnection/src/audio/AudioInput.cpp @@ -1,6 +1,5 @@ #include #include -#include #include "./AudioInput.h" #include "./AudioReframer.h" #include "./AudioResampler.h" diff --git a/native/serverconnection/src/audio/AudioInput.h b/native/serverconnection/src/audio/AudioInput.h index 4865314..75949c6 100644 --- a/native/serverconnection/src/audio/AudioInput.h +++ b/native/serverconnection/src/audio/AudioInput.h @@ -5,7 +5,6 @@ #include #include #include -#include #include "AudioSamples.h" #include "driver/AudioDriver.h" @@ -25,7 +24,7 @@ namespace tc::audio { size_t const frame_size = 0; - spin_lock on_read_lock; /* locked to access the function */ + std::mutex on_read_lock{}; /* locked to access the function */ std::function on_read; private: AudioConsumer(AudioInput* handle, size_t channel_count, size_t sample_rate, size_t frame_size); diff --git a/native/serverconnection/src/audio/AudioOutput.cpp b/native/serverconnection/src/audio/AudioOutput.cpp index 47de383..63336fb 100644 --- a/native/serverconnection/src/audio/AudioOutput.cpp +++ b/native/serverconnection/src/audio/AudioOutput.cpp @@ -80,7 +80,7 @@ ssize_t AudioOutputSource::pop_samples(void *buffer, size_t samples) { //log_trace(category::audio, tr("Min: {}, Max: {}, Current: {}, Buffering: {} Required: {}, left: {}, will buffer in {}"), this->min_buffered_samples, this->max_buffered_samples, available_samples, this->buffering, samples, (int) available_samples - samples, this->will_buffer_in); if(this->will_buffer_in > 0) { - if(samples > this->will_buffer_in) { + if(samples > (size_t) this->will_buffer_in) { samples = this->will_buffer_in; this->buffering = true; this->fade_in_start = this->buffer.calculate_advanced_write_ptr(samples * sizeof(float) * this->channel_count); diff --git a/native/serverconnection/src/audio/driver/AudioDriver.cpp b/native/serverconnection/src/audio/driver/AudioDriver.cpp index 3771354..c168e6e 100644 --- a/native/serverconnection/src/audio/driver/AudioDriver.cpp +++ b/native/serverconnection/src/audio/driver/AudioDriver.cpp @@ -4,10 +4,10 @@ #include #include -#include -#include "./AudioDriver.h" #include "../../logger.h" +#include "../../thread_helper.h" #include "../AudioMerger.h" +#include "./AudioDriver.h" #ifdef HAVE_SOUNDIO #include "./SoundIO.h" diff --git a/native/serverconnection/src/audio/js/AudioConsumer.cpp b/native/serverconnection/src/audio/js/AudioConsumer.cpp index 50a664f..7f70c04 100644 --- a/native/serverconnection/src/audio/js/AudioConsumer.cpp +++ b/native/serverconnection/src/audio/js/AudioConsumer.cpp @@ -1,13 +1,13 @@ -#include -#include "AudioConsumer.h" -#include "AudioRecorder.h" -#include "AudioFilter.h" +#include "./AudioConsumer.h" +#include "./AudioRecorder.h" +#include "./AudioFilter.h" #include "../AudioInput.h" #include "../filter/Filter.h" #include "../filter/FilterVad.h" #include "../filter/FilterThreshold.h" #include "../filter/FilterState.h" #include "../../logger.h" +#include /* Must be last */ using namespace std; using namespace tc::audio; diff --git a/native/serverconnection/src/audio/js/AudioConsumer.h b/native/serverconnection/src/audio/js/AudioConsumer.h index a31302c..272ccfe 100644 --- a/native/serverconnection/src/audio/js/AudioConsumer.h +++ b/native/serverconnection/src/audio/js/AudioConsumer.h @@ -1,7 +1,6 @@ #pragma once #include -#include #include #include #include diff --git a/native/serverconnection/src/connection/ProtocolHandler.cpp b/native/serverconnection/src/connection/ProtocolHandler.cpp index 42ca92d..0b97d98 100644 --- a/native/serverconnection/src/connection/ProtocolHandler.cpp +++ b/native/serverconnection/src/connection/ProtocolHandler.cpp @@ -3,7 +3,6 @@ #endif #include "ProtocolHandler.h" -#include "ServerConnection.h" #include "Socket.h" #include "../logger.h" #include @@ -11,7 +10,6 @@ #include #include #include -#include using namespace std; using namespace std::chrono; @@ -19,8 +17,19 @@ using namespace tc::connection; using namespace ts::protocol; using namespace ts; -ProtocolHandler::ProtocolHandler(ServerConnection* handle) : handle(handle) { - this->compression_handler.max_packet_size = 128 * 1024; /* max 128Kb */ +ProtocolHandler::ProtocolHandler(ServerConnection* handle) : handle{handle}, packet_decoder{&this->crypt_handler} { + + this->packet_decoder.callback_argument = this; + this->packet_decoder.callback_decoded_packet = ProtocolHandler::callback_packet_decoded; + this->packet_decoder.callback_decoded_command = ProtocolHandler::callback_command_decoded; + this->packet_decoder.callback_send_acknowledge = ProtocolHandler::callback_send_acknowledge; + + this->acknowledge_handler.callback_data = this; + this->acknowledge_handler.callback_resend_failed = ProtocolHandler::callback_resend_failed; + this->acknowledge_handler.destroy_packet = [](void* pkt_ptr) { + auto packet = (OutgoingClientPacket*) pkt_ptr; + packet->unref(); + }; } ProtocolHandler::~ProtocolHandler() { @@ -50,21 +59,17 @@ void ProtocolHandler::reset() { this->crypto.initiv_command = ""; this->crypto.beta_length = 0; - if(this->crypto.identity.k) + if(this->crypto.identity.k) { ecc_free(&this->crypto.identity); + } memset(&this->crypto.identity, 0, sizeof(this->crypto.identity)); } - for(auto& buffer : this->_packet_buffers) { - lock_guard lock(buffer.buffer_lock); - buffer.reset(); - } this->crypt_setupped = false; - for(auto& calculator : this->incoming_generation_estimators) - calculator.reset(); this->_packet_id_manager.reset(); this->crypt_handler.reset(); + this->packet_decoder.reset(); this->ping.ping_received_timestamp = system_clock::time_point{}; @@ -81,9 +86,7 @@ void ProtocolHandler::connect() { { auto command = this->generate_client_initiv(); - auto packet = make_shared(PacketTypeInfo::Command, pipes::buffer_view{command.data(), command.size()}); - packet->enable_flag(PacketFlag::NewProtocol); - this->send_packet(packet); + this->send_command(command, false, nullptr); } } @@ -91,8 +94,7 @@ void ProtocolHandler::execute_tick() { auto now = system_clock::now(); if(this->connection_state < connection_state::DISCONNECTED) { if(!this->pow.last_buffer.empty() && this->pow.last_resend < now - seconds(1)) { - this->pow.last_resend = now; - this->send_packet(make_shared(PacketTypeInfo::Init1, PacketFlag::Unencrypted, this->pow.last_buffer)); + this->send_init1_buffer(); } if(this->connection_state == connection_state::INIT_LOW || this->connection_state == connection_state::INIT_HIGH) { @@ -129,6 +131,15 @@ void ProtocolHandler::execute_tick() { } } +void ProtocolHandler::send_init1_buffer() { + this->pow.last_resend = std::chrono::system_clock::now(); + auto packet = protocol::allocate_outgoing_client_packet(this->pow.last_buffer.length()); + memcpy(packet->payload, this->pow.last_buffer.data_ptr(), this->pow.last_buffer.length()); + packet->type_and_flags_ = protocol::PacketType::INIT1 | protocol::PacketFlag::Unencrypted; + *(uint16_t*) packet->packet_id_bytes_ = htons(101); + this->send_packet(packet, true); +} + void ProtocolHandler::execute_resend() { if(this->connection_state >= connection_state::DISCONNECTED) { return; @@ -139,520 +150,379 @@ void ProtocolHandler::execute_resend() { system_clock::time_point next = now + seconds(5); /* in real we're doing it all 500ms */ string error; - auto resended = this->acknowledge_handler.execute_resend(now, next, buffers, error); - if(resended < 0) { - log_error(category::connection, tr("Failed to receive acknowledge: {}"), error); - - this->handle->execute_callback_disconnect(tr("packet resend failed")); - this->handle->close_connection(); - return; - } - //log_trace(category::connection, tr("Resended {}"), resended); - + this->acknowledge_handler.execute_resend(now, next, buffers); auto socket = this->handle->get_socket(); if(socket) { for(const auto& buffer : buffers) { - socket->send_message(buffer->buffer); + auto packet = (ts::protocol::OutgoingClientPacket*) buffer->packet_ptr; + socket->send_message(pipes::buffer_view{packet->packet_data(), packet->packet_length()}); /* only control packets are getting resend */ - this->statistics_.control_bytes_send += buffer->buffer.length(); + this->statistics_.control_bytes_send += packet->packet_length(); } } this->handle->schedule_resend(next); } +void ProtocolHandler::callback_resend_failed(void *ptr_this, + const std::shared_ptr &) { + auto connection = reinterpret_cast(ptr_this); + log_error(category::connection, tr("Failed to receive acknowledge")); + + connection->handle->execute_callback_disconnect(tr("packet resend failed")); + connection->handle->close_connection(); +} + void ProtocolHandler::progress_packet(const pipes::buffer_view &buffer) { if(this->connection_state >= connection_state::DISCONNECTED) { log_warn(category::connection, tr("Dropping received packet. We're already disconnected.")); return; } - if(buffer.length() < ServerPacket::META_SIZE) { + protocol::ServerPacketParser packet_parser{buffer}; + if(!packet_parser.valid()) { log_error(category::connection, tr("Received a packet which is too small. ({})"), buffer.length()); return; } - auto packet = std::shared_ptr(ts::protocol::ServerPacket::from_buffer(buffer).release()); - auto packet_type = packet->type(); - auto packet_id = packet->packetId(); - auto ordered = packet_type.type() == protocol::COMMAND || packet_type.type() == protocol::COMMAND_LOW; - //log_trace(category::connection, tr("Received packet {} with id {}"), packet->type().name(), packet->packetId()); - - switch(packet_type.type()) { - case ts::protocol::PacketType::VOICE: - case ts::protocol::PacketType::VOICE_WHISPER: + switch (packet_parser.type()) { + case protocol::PacketType::VOICE: + case protocol::PacketType::VOICE_WHISPER: this->statistics_.voice_bytes_received += buffer.length(); break; - case ts::protocol::PacketType::COMMAND: - case ts::protocol::PacketType::COMMAND_LOW: + case protocol::PacketType::COMMAND: + case protocol::PacketType::COMMAND_LOW: this->statistics_.control_bytes_received += buffer.length(); break; - } - /* special handling */ - if(packet_type.type() == protocol::INIT1) { - this->handlePacketInit(packet); - return; - } - - if(packet_type.type() < 0 || packet_type.type() >= this->_packet_buffers.size()) { - log_error(category::connection, tr("Received packet with invalid type. ({})"), packet_type.type()); - return; - } - - auto& read_queue = this->_packet_buffers[packet_type.type()]; - auto& gen_calc = this->incoming_generation_estimators[packet_type.type()]; - packet->generationId(gen_calc.visit_packet(packet_id)); - auto gen = packet->generationId(); - if(ordered) { - unique_lock queue_lock(read_queue.buffer_lock); - auto result = read_queue.accept_index(packet_id); - if(result != 0) { /* packet index is ahead buffer index */ - log_error(category::connection, tr("Failed to register command packet ({}) (Index: {} Current index: {})"), result == -1 ? tr("underflow") : tr("overflow"), packet_id, read_queue.current_index()); - - if(result == -1) { /* underflow */ - /* we've already got the packet, but the client dosn't know that so we've to send the acknowledge again */ - if(packet->type() == PacketTypeInfo::Command || packet->type() == PacketTypeInfo::CommandLow) - this->send_acknowledge(packet->packetId(), packet->type() == PacketTypeInfo::CommandLow); - } + case protocol::PacketType::INIT1: + this->handlePacketInit(packet_parser); return; - } } - packet->setEncrypted(!packet->has_flag(PacketFlag::Unencrypted)); - if(packet->type() == PacketTypeInfo::Command || packet->type() == PacketTypeInfo::CommandLow){ - packet->setCompressed(packet->has_flag(PacketFlag::Compressed)); + std::string error{}; + auto decode_result = this->packet_decoder.process_incoming_data(packet_parser, error); + using PacketProcessResult = protocol::PacketProcessResult; + switch (decode_result) { + case PacketProcessResult::SUCCESS: + case PacketProcessResult::FUZZ_DROPPED: /* maybe some kind of log? */ + case PacketProcessResult::DECRYPT_FAILED: /* Silently drop this packet */ + case PacketProcessResult::DUPLICATED_PACKET: /* no action needed, acknowledge should be send already */ + break; + + case PacketProcessResult::DECRYPT_KEY_GEN_FAILED: + /* no action needed, acknowledge should be send */ + log_error(category::connection, tr("Failed to generate decrypt key. Dropping a packet."), buffer.length()); + break; + + case PacketProcessResult::BUFFER_OVERFLOW: + case PacketProcessResult::BUFFER_UNDERFLOW: + log_error(category::connection, tr("Dropping command packet because command assembly buffer has an {}: {}"), + decode_result == PacketProcessResult::BUFFER_UNDERFLOW ? "underflow" : "overflow", + error); + break; + + case PacketProcessResult::UNKNOWN_ERROR: + log_error(category::connection, tr("Having an unknown error while processing a incoming packet: {}"), + error); + goto disconnect_client; + + case PacketProcessResult::COMMAND_BUFFER_OVERFLOW: + log_error(category::connection, tr("Having a command buffer overflow. This might cause us to drop."), + error); + break; + + case PacketProcessResult::COMMAND_DECOMPRESS_FAILED: + log_error(category::connection, tr("Failed to decompress a command packet. Dropping command."), + error); + break; + + case PacketProcessResult::COMMAND_TOO_LARGE: + log_error(category::connection, tr("Received a too large command. Dropping command."), + error); + break; + + case PacketProcessResult::COMMAND_SEQUENCE_LENGTH_TOO_LONG: + log_error(category::connection, tr("Received a too long command sequence. Dropping command."), error); + break; + + default: + assert(false); + break; } - //NOTICE I found out that the Compressed flag is set if the packet contains an audio header + return; - if(packet->isEncrypted()) { - std::string error; - - ts::connection::CryptHandler::key_t crypt_key{}; - ts::connection::CryptHandler::nonce_t crypt_nonce{}; - - bool decrypt_result; - - if(!this->crypt_setupped) { - crypt_key = ts::connection::CryptHandler::default_key; - crypt_nonce = ts::connection::CryptHandler::default_nonce; - } else { - if(!this->crypt_handler.generate_key_nonce(false, packet_type.type(), packet->packetId(), packet->generationId(), crypt_key, crypt_nonce)) { - log_error(category::connection, tr("Failed to generate crypt key/nonce. This should never happen! Dropping packet.")); - return; - } - } - - auto mac_ptr = packet->mac().data_ptr(); - auto header_ptr = packet->header().data_ptr(); - auto data_ptr = packet->data().data_ptr(); - decrypt_result = this->crypt_handler.decrypt( - header_ptr, packet->header_length(), - data_ptr, packet->data_length(), - mac_ptr, - crypt_key, crypt_nonce, - error - ); - - if(!decrypt_result) { - if(!this->crypt_setupped) - log_error(category::connection, tr("Failed to decrypt packet ({}), with default key."), packet_type.name()); - else - log_trace(category::connection, tr("Failed to decrypt packet {}."), packet_type.name()); - return; - } - } - - if(packet->type() == PacketTypeInfo::Command || packet->type() == PacketTypeInfo::CommandLow){ - if(packet->has_flag(PacketFlag::Unencrypted)) { - log_warn(category::connection, tr("Received unencrypted command packet! Dropping packet.")); - return; - } - } - - { - unique_lock queue_lock(read_queue.buffer_lock); - - if(ordered) { /* ordered */ - //log_trace(category::connection, tr("Inserting packet {} with id {}"), packet->type().name(), packet->packetId()); - if(!read_queue.insert_index(packet_id, std::forward>(packet))) { - log_warn(category::connection, tr("Failed to insert ordered packet into queue. ({} | {} | {})"), packet_type.name(), read_queue.current_index(), packet_id); - return; - } - } else { - if(!read_queue.push_back(std::forward>(packet))) { - log_warn(category::connection, tr("Failed to insert unordered packet into queue. ({} | {} | {})"), packet_type.name(), read_queue.current_index(), packet_id); - return; - /* return; dont stop here because we've to progress the packets */ - } else { - read_queue.index_set(packet_id); /* may we've skipped one packet id */ - } - } - } - - /* only send an ack when we actually succeeded registering the packet */ - if(packet_type == PacketTypeInfo::Command || packet_type == PacketTypeInfo::CommandLow) - this->send_acknowledge(packet_id, packet_type == PacketTypeInfo::CommandLow); - - while(this->handle_packets()); + disconnect_client:; + /* TODO! */ } -bool ProtocolHandler::handle_packets() { - if(this->connection_state >= connection_state::DISCONNECTED) { +void ProtocolHandler::callback_packet_decoded(void *ptr_this, const ts::protocol::PacketParser &packet) { + auto connection = reinterpret_cast(ptr_this); + + if(connection->connection_state >= connection_state::DISCONNECTED) { log_warn(category::connection, tr("Don't handle received packets because we're already disconnected.")); - return false; + return; } - bool reexecute_handle = false; - shared_ptr current_packet = nullptr; + switch (packet.type()) { + case protocol::VOICE: + case protocol::VOICE_WHISPER: + connection->handlePacketVoice(packet); + break; - packet_buffer_t* buffer = nullptr; - unique_lock buffer_lock; - unique_lock buffer_execute_lock; - std::string error = "success"; + case protocol::ACK: + case protocol::ACK_LOW: + connection->handlePacketAck(packet); + break; + case protocol::PING: + case protocol::PONG: + connection->handlePacketPing(packet); + break; - { - auto base_index = this->_packet_buffers_index; - auto select_index = base_index; - auto max_index = this->_packet_buffers.size(); - for(size_t index = 0; index < max_index; index++) { - if(!buffer) select_index++; + case protocol::INIT1: + /* We've received an init1 packet here. The connection should not send that kind of packets... */ + break; - auto& buf = this->_packet_buffers[base_index++ % max_index]; - unique_lock ring_lock(buf.buffer_lock, try_to_lock); - if(!ring_lock.owns_lock()) { - log_debug(category::connection, tr("Skipping packet type {} for handling"), base_index++ % max_index); - continue; - } - - if(buf.front_set()) { - if(!buffer) { /* lets still test for reexecute */ - buffer_execute_lock = unique_lock(buf.execute_lock, try_to_lock); - if(!buffer_execute_lock.owns_lock()) { - log_debug(category::connection, tr("Skipping packet type {} for handling (already executed)"), base_index++ % max_index); - continue; - } - - buffer_lock = move(ring_lock); - buffer = &buf; - } else { - reexecute_handle |= true; - break; - } - } - } - this->_packet_buffers_index = select_index % max_index; /* garante that we will not hangup with commands! */ + default: + log_error(category::connection, tr("Received hand decoded packet, but we've no method to handle it. Dropping packet.")); + assert(false); + break; } - - if(buffer){ - uint16_t sequence_length = 0; - current_packet = buffer->slot_value(sequence_length++); - - if(current_packet) { - if(this->_packet_buffer_overflow[current_packet->type().type()]) { - auto& overflow_flag = this->_packet_buffer_overflow[current_packet->type().type()]; - - while(current_packet && !current_packet->has_flag(PacketFlag::Fragmented) && sequence_length < buffer->capacity()) - current_packet = buffer->slot_value(sequence_length++); - - while(buffer->front_set()) - if(buffer->pop_front() == current_packet) - break; - - overflow_flag = !current_packet || !current_packet->has_flag(PacketFlag::Fragmented); - if(!overflow_flag) { - log_info(category::connection, tr("Recovered successfully from too long packet.")); - } - return false; - } - if((current_packet->type() == PacketTypeInfo::Command || current_packet->type() == PacketTypeInfo::CommandLow) && current_packet->has_flag(PacketFlag::Fragmented)) { - do { - if(sequence_length >= buffer->capacity()) { - log_error(category::connection, tr("Received fragmented packets which have a too long order (> {}). Ignoring that command and dropping its segments."), buffer->capacity()); - while(buffer->front_set()) - buffer->pop_front(); - //TODO: Log to the client! - //this->handle->execute_callback_disconnect.call(tr("received a too long packet"), true); - //this->disconnect("received a too long packet"); - this->_packet_buffer_overflow[current_packet->type().type()] = true; - return false; - } - current_packet = buffer->slot_value(sequence_length++); - } while(current_packet && !current_packet->has_flag(PacketFlag::Fragmented)); - } - } else { - log_critical(category::connection, tr("buffer->slot_value(sequence_length++) returned nullptr!")); - //FIXME! - //logCritical(this->client->getServer()->getServerId(), "buffer->slot_value(sequence_length++) returned nullptr!") - }; - - if(current_packet) { //We could reconstruct a new packet! - if(sequence_length > 1) { //We have to merge - vector append; - append.reserve(sequence_length - 1); - - uint16_t packet_count = 0; - current_packet = buffer->pop_front(); - packet_count++; - do { - auto packet = buffer->pop_front(); - packet_count++; - if(!packet) { - log_critical(category::connection, tr("readQueue->peekNext(seqIndex++) => nullptr_t!")); - return false; - } - - append.push_back(packet->data()); - if(packet->has_flag(PacketFlag::Fragmented)) break; - } while(packet_count < sequence_length); - - if(packet_count != sequence_length) { - log_critical(category::connection, tr("seqIndex != index failed! seqIndex: {} seqLength: {} This may cause a application crash!"), packet_count, sequence_length); - sequence_length = packet_count; - current_packet = nullptr; - } else { - current_packet->append_data(append); - } - } else { - if(buffer->pop_front() != current_packet) { - log_critical(category::connection, tr("buffer->pop_front() != current_packet failed.")); - } - } - reexecute_handle |= buffer->front_set(); - buffer_lock.unlock(); //We got our packet so release it - - if(current_packet) { - if(!this->compression_handler.progressPacketIn(current_packet.get(), error)) { - log_error(category::connection, tr("Failed to decompress received packet. Error: {}"), error); - current_packet = nullptr; - } - } - } - } - - if(current_packet){ - auto startTime = chrono::system_clock::now(); - try { - if(current_packet->type() == PacketTypeInfo::Command || current_packet->type() == PacketTypeInfo::CommandLow) - this->handlePacketCommand(current_packet); - else if(current_packet->type() == PacketTypeInfo::Ack || current_packet->type() == PacketTypeInfo::AckLow) - this->handlePacketAck(current_packet); - else if(current_packet->type() == PacketTypeInfo::Voice || current_packet->type() == PacketTypeInfo::VoiceWhisper) - this->handlePacketVoice(current_packet); - else if(current_packet->type() == PacketTypeInfo::Ping || current_packet->type() == PacketTypeInfo::Pong) - this->handlePacketPing(current_packet); - } catch (std::exception& ex) { - log_critical(category::connection, tr("Exception reached root tree! {}"), ex.what()); - } - - auto end = chrono::system_clock::now(); - if(end - startTime > chrono::milliseconds(5)) { - if(current_packet->type() != PacketTypeInfo::Command && current_packet->type() != PacketTypeInfo::CommandLow) { - log_warn(category::connection, - tr("Handling of packet {} ({}) needed longer than expected. Handle time {}ms"), - current_packet->packetId(), current_packet->type().name(), duration_cast(end - startTime).count()); - } - } - } - if(buffer_execute_lock.owns_lock()) - buffer_execute_lock.unlock(); - - return reexecute_handle; } -bool ProtocolHandler::create_datagram_packets(std::vector &result, const std::shared_ptr &packet) { - string error = "success"; - - if(packet->type().compressable() && !packet->memory_state.fragment_entry) { - packet->enable_flag(PacketFlag::Compressed); - if(!this->compression_handler.progressPacketOut(packet.get(), error)) { - log_error(category::connection, tr("Could not compress outgoing packet.\nThis could cause fatal failed for the client.\nError: {}"), error); - return false; - } - } - if(packet->data().length() > packet->type().max_length()){ - if(!packet->type().fragmentable()) { - log_error(category::connection, tr("We've tried to send a too long, not fragmentable packet. Dropping packet of type {} with length {}"), packet->type().name(), packet->data().length()); - return false; - } - - std::vector> siblings; - siblings.reserve(8); - - - { //Split packets - auto buffer = packet->data(); - - const auto max_length = packet->type().max_length(); - while(buffer.length() > max_length * 2) { - siblings.push_back(make_shared(packet->type(), buffer.view(0, max_length).dup(ts::buffer::allocate_buffer(max_length)))); - buffer = buffer.range(max_length); - } - - if(buffer.length() > max_length) { //Divide rest by 2 - siblings.push_back(make_shared(packet->type(), buffer.view(0, buffer.length() / 2).dup(ts::buffer::allocate_buffer(buffer.length() / 2)))); - buffer = buffer.range(buffer.length() / 2); - } - siblings.push_back(make_shared(packet->type(), buffer)); - - for(const auto& frag : siblings) { - frag->setFragmentedEntry(true); - frag->enable_flag(PacketFlag::NewProtocol); - } - } - - assert(siblings.size() >= 2); - siblings.front()->enable_flag(PacketFlag::Fragmented); - if(packet->has_flag(PacketFlag::Compressed)) - siblings.front()->enable_flag(PacketFlag::Compressed); - - siblings.back()->enable_flag(PacketFlag::Fragmented); - - if(packet->getListener()) - siblings.back()->setListener(std::move(packet->getListener())); //Move the listener to the last :) - - result.reserve(siblings.size()); - for(const auto& frag : siblings) - create_datagram_packets(result, frag); - return true; +void ProtocolHandler::callback_command_decoded(void *ptr_this, ts::command::ReassembledCommand *&command) { + auto connection = reinterpret_cast(ptr_this); + if(connection->connection_state >= connection_state::DISCONNECTED) { + log_warn(category::connection, tr("Don't handle received command because we're already disconnected.")); + return; } - if(!packet->memory_state.id_branded) { - packet->clientId(this->client_id); - if(packet->type().type() == PacketType::INIT1) { - packet->applyPacketId(101, 0); - } else { - packet->applyPacketId(this->_packet_id_manager); + connection->handlePacketCommand(std::exchange(command, nullptr)); +} + +void ProtocolHandler::callback_send_acknowledge(void *ptr_this, uint16_t packet_id, bool low) { + auto connection = reinterpret_cast(ptr_this); + connection->send_acknowledge(packet_id, low); +} + +void ProtocolHandler::send_packet(ts::protocol::OutgoingClientPacket *packet, bool skip_id_branding) { + if(!skip_id_branding) { + uint32_t full_id; + { + std::lock_guard lock{this->packet_id_mutex}; + full_id = this->_packet_id_manager.generate_full_id(packet->packet_type()); } - //log_trace(category::connection, tr("Packet {} got packet id {}"), packet->type().name(), packet->packetId()); + packet->set_packet_id(full_id & 0xFFFFU); + packet->generation = full_id >> 16U; } - if(packet->has_flag(PacketFlag::Unencrypted)) { - this->crypt_handler.write_default_mac(packet->mac().data_ptr()); + *(uint16_t*) packet->client_id_bytes = htons(this->client_id); + packet->next = nullptr; + + auto socket = this->handle->get_socket(); + if(!socket) { + packet->unref(); + return; + } + + /* Since we assume that the packets gets written instantly we're setting the next ptr to null */ + if(packet->type_and_flags_ & PacketFlag::Unencrypted || !this->crypt_setupped) { + this->crypt_handler.write_default_mac(packet->mac); } else { ts::connection::CryptHandler::key_t crypt_key{}; ts::connection::CryptHandler::nonce_t crypt_nonce{}; if(!this->crypt_setupped) { - crypt_key = ts::connection::CryptHandler::default_key; - crypt_nonce = ts::connection::CryptHandler::default_nonce; + crypt_key = ts::connection::CryptHandler::kDefaultKey; + crypt_nonce = ts::connection::CryptHandler::kDefaultNonce; } else { - if(!this->crypt_handler.generate_key_nonce(true, packet->type().type(), packet->packetId(), packet->generationId(), crypt_key, crypt_nonce)) { - log_error(category::connection, tr("Failed to generate crypt key/nonce. Dropping packet"), error); - return false; + if(!this->crypt_handler.generate_key_nonce(true, packet->packet_type(), packet->packet_id(), packet->generation, crypt_key, crypt_nonce)) { + log_error(category::connection, tr("Failed to generate crypt key/nonce. Dropping packet")); + packet->unref(); + return; } } - auto crypt_result = this->crypt_handler.encrypt(packet->header().data_ptr(), packet->header().length(), - packet->data().data_ptr(), packet->data().length(), - packet->mac().data_ptr(), - crypt_key, crypt_nonce, error); + std::string error{}; + auto crypt_result = this->crypt_handler.encrypt( + (char*) packet->packet_data() + protocol::ClientPacketParser::kHeaderOffset, + protocol::ClientPacketParser::kHeaderLength, + packet->payload, packet->payload_size, + packet->mac, + crypt_key, crypt_nonce, error); + if(!crypt_result){ log_error(category::connection, tr("Failed to encrypt packet: {}"), error); - return false; + packet->unref(); + return; } } - /* -#ifndef CONNECTION_NO_STATISTICS - if(this->client && this->client->getServer()) - this->client->connectionStatistics->logOutgoingPacket(packet); -#endif - */ - result.push_back(packet->buffer()); + switch(packet->packet_type()) { + case PacketType::COMMAND: + case PacketType::COMMAND_LOW: + this->statistics_.control_bytes_send += packet->packet_length(); + break; - this->acknowledge_handler.process_packet(*packet); - return true; -} + case PacketType::VOICE: + case PacketType::VOICE_WHISPER: + this->statistics_.voice_bytes_send += packet->packet_length(); + break; -void ProtocolHandler::send_command(const ts::Command &cmd, const std::function &ack_callback) { - auto data = cmd.build(); - auto packet = make_shared(PacketTypeInfo::Command, pipes::buffer_view{data.data(), data.size()}); - if(ack_callback || true) { - auto begin = chrono::system_clock::now(); - packet->setListener(make_unique>()); - packet->getListener()->waitAndGetLater([ack_callback, begin](bool f) { - auto end = chrono::system_clock::now(); - if(ack_callback) - ack_callback(f); - - //log_trace(category::connection, tr("Time needed for command: {}ms. Success: {}"), chrono::duration_cast(end - begin).count(), f); - }); + default: + break; } - packet->enable_flag(PacketFlag::NewProtocol); - this->send_packet(packet); + /* TODO: Don't copy the packet for the socket. Instead just enqueue it. */ + socket->send_message(pipes::buffer_view{packet->packet_data(), packet->packet_length()}); + packet->unref(); } -void ProtocolHandler::send_packet(const std::shared_ptr &packet) { - std::vector result; - if(!this->create_datagram_packets(result, packet) || result.empty()) { - log_error(category::connection, tr("Failed to create datagram packets!")); - return; +#define MAX_COMMAND_PACKET_PAYLOAD_LENGTH (487) +void ProtocolHandler::send_command(const std::string_view &command, bool low, std::unique_ptr> ack_listener) { + bool own_data_buffer{false}; + void* own_data_buffer_ptr; /* immutable! */ + + const char* data_buffer{command.data()}; + size_t data_length{command.length()}; + + uint8_t head_pflags{0}; + protocol::PacketType ptype{low ? protocol::PacketType::COMMAND_LOW : protocol::PacketType::COMMAND}; + protocol::OutgoingClientPacket *packets_head{nullptr}; + protocol::OutgoingClientPacket **packets_tail{&packets_head}; + + /* only compress "long" commands */ + if(command.size() > 100) { + size_t max_compressed_payload_size = compression::qlz_compressed_size(command.data(), command.length()); + auto compressed_buffer = ::malloc(max_compressed_payload_size); + + size_t compressed_size{max_compressed_payload_size}; + if(!compression::qlz_compress_payload(command.data(), command.length(), compressed_buffer, &compressed_size)) { + //logCritical(0, "Failed to compress command packet. Dropping packet"); + /* TODO: Log! */ + ::free(compressed_buffer); + return; + } + + /* we don't need to make the command longer than it is */ + if(compressed_size < command.length()) { + own_data_buffer = true; + data_buffer = (char*) compressed_buffer; + own_data_buffer_ptr = compressed_buffer; + data_length = compressed_size; + head_pflags |= protocol::PacketFlag::Compressed; + } else { + ::free(compressed_buffer); + } + } + + uint8_t ptype_and_flags{(uint8_t) ((uint8_t) ptype | (uint8_t) protocol::PacketFlag::NewProtocol)}; + if(data_length > MAX_COMMAND_PACKET_PAYLOAD_LENGTH) { + auto chunk_count = (size_t) ceil((float) data_length / (float) MAX_COMMAND_PACKET_PAYLOAD_LENGTH); + auto chunk_size = (size_t) ceil((float) data_length / (float) chunk_count); + + while(true) { +#ifdef WIN32 + auto bytes = min(chunk_size, data_length); +#else + auto bytes = std::min(chunk_size, data_length); +#endif + auto packet = protocol::allocate_outgoing_client_packet(bytes); + packet->type_and_flags_ = ptype_and_flags; + memcpy(packet->payload, data_buffer, bytes); + + *packets_tail = packet; + packets_tail = &packet->next; + + data_length -= bytes; + if(data_length == 0) { + packet->type_and_flags_ |= protocol::PacketFlag::Fragmented; + break; + } + data_buffer += bytes; + } + packets_head->type_and_flags_ |= protocol::PacketFlag::Fragmented; + } else { + auto packet = protocol::allocate_outgoing_client_packet(data_length); + packet->type_and_flags_ = ptype_and_flags; + + memcpy(packet->payload, data_buffer, data_length); + + packets_head = packet; + packets_tail = &packet->next; } { - if(packet->type() == protocol::PacketTypeInfo::Command && this->connection_state == connection_state::CONNECTED && false) { - ts::Command cmd{"whoami"}; - auto data = cmd.build(); - auto p1 = make_shared(PacketTypeInfo::Command, pipes::buffer_view{data.data(), data.size()}); - if(!this->create_datagram_packets(result, p1)) - log_error(category::connection, tr("failed to encode trap")); - std::reverse(result.begin(), result.end()); + std::lock_guard id_lock{this->packet_id_mutex}; + + uint32_t full_id; + auto head = packets_head; + while(head) { + full_id = this->_packet_id_manager.generate_full_id(ptype); + + head->set_packet_id(full_id & 0xFFFFU); + head->generation = full_id >> 16U; + + head = head->next; + } + } + packets_head->type_and_flags_ |= head_pflags; + + /* ack handler */ + { + auto head = packets_head; + while(head) { + auto full_packet_id = (uint32_t) (head->generation << 16U) | head->packet_id(); + + /* increase a reference for the ack handler */ + head->ref(); + + /* Even thou the packet is yet unencrypted, it will be encrypted with the next write. The next write will be before the next resend because the next ptr must be null in order to resend a packet */ + if(&head->next == packets_tail) { + this->acknowledge_handler.process_packet(ptype, full_packet_id, head, std::move(ack_listener)); + } else { + this->acknowledge_handler.process_packet(ptype, full_packet_id, head, nullptr); + } + + head = head->next; } } - //log_trace(category::connection, tr("Split up {} {} to {} packets. Ack waiting: {}"), packet->packetId(), packet->type().name(), result.size(), this->acknowledge_handler.awaiting_acknowledge()); - auto socket = this->handle->get_socket(); - if(!socket) { - log_error(category::connection, tr("Failed to get socket!")); - return; + auto head = packets_head; + while(head) { + this->send_packet(head, true); } - size_t total_size{0}; - for(const auto& buffer : result) { - socket->send_message(buffer); - - total_size += buffer.length(); - } - - switch(packet->type().type()) { - case ts::protocol::PacketType::VOICE: - case ts::protocol::PacketType::VOICE_WHISPER: - this->statistics_.voice_bytes_send += total_size; - break; - - case ts::protocol::PacketType::COMMAND: - case ts::protocol::PacketType::COMMAND_LOW: - this->statistics_.control_bytes_send += total_size; - break; + if(own_data_buffer) { + ::free(own_data_buffer_ptr); } } +void ProtocolHandler::send_command(const ts::Command &cmd, bool low, std::unique_ptr> ack_callback) { + auto data = cmd.build(); + this->send_command(data, low, std::move(ack_callback)); +} + void ProtocolHandler::send_acknowledge(uint16_t packet_id, bool low) { - char buffer[2]; - le2be16(packet_id, buffer); - auto packet = make_shared(low ? protocol::PacketTypeInfo::AckLow : protocol::PacketTypeInfo::Ack, 0, pipes::buffer_view{buffer, 2}); - if(this->connection_state >= connection_state::CONNECTING) { - ;//packet->toggle(protocol::PacketFlag::NewProtocol, !low); - //LivingBots DDOS protection dont want a new protocol here! - } - this->send_packet(packet); + auto packet = protocol::allocate_outgoing_client_packet(2); + + packet->type_and_flags_ = (uint8_t) (low ? protocol::PacketType::ACK_LOW : protocol::PacketType::ACK) | + (uint8_t) (protocol::PacketFlag::Unencrypted | protocol::PacketFlag::NewProtocol); + + le2be16(packet_id, packet->payload); + this->send_packet(packet, false); } void ProtocolHandler::do_close_connection() { this->connection_state = connection_state::DISCONNECTED; - for(auto& buffer : this->_packet_buffers) { - lock_guard lock(buffer.buffer_lock); - buffer.clear(); - } } void ProtocolHandler::disconnect(const std::string &reason) { - if(this->connection_state >= connection_state::DISCONNECTING) + if(this->connection_state >= connection_state::DISCONNECTING) { return; + } this->connection_state = connection_state::DISCONNECTING; this->disconnect_timestamp = system_clock::now(); @@ -660,11 +530,12 @@ void ProtocolHandler::disconnect(const std::string &reason) { auto did = ++this->disconnect_id; Command cmd("clientdisconnect"); cmd["reasonmsg"] = reason; - this->send_command(cmd, [&, did](bool success){ + this->send_command(cmd, false, std::make_unique>([&, did](bool success) { /* if !success then we'll have prop already triggered the timeout and this here is obsolete */ - if(success && this->connection_state == connection_state::DISCONNECTING && this->disconnect_id == did) + if(success && this->connection_state == connection_state::DISCONNECTING && this->disconnect_id == did) { this->handle->close_connection(); - }); + } + })); } const ConnectionStatistics& ProtocolHandler::statistics() { diff --git a/native/serverconnection/src/connection/ProtocolHandler.h b/native/serverconnection/src/connection/ProtocolHandler.h index 2501a7a..afadda8 100644 --- a/native/serverconnection/src/connection/ProtocolHandler.h +++ b/native/serverconnection/src/connection/ProtocolHandler.h @@ -14,11 +14,12 @@ #include #include #include +#include +#include #include "ServerConnection.h" namespace ts::connection { class CryptionHandler; - class CompressionHandler; } namespace tc::connection { @@ -58,8 +59,6 @@ namespace tc::connection { }; class ProtocolHandler { - typedef ts::protocol::PacketRingBuffer packet_buffer_t; - typedef std::array packet_buffers_t; friend class ServerConnection; public: explicit ProtocolHandler(ServerConnection*); @@ -73,9 +72,10 @@ namespace tc::connection { const ConnectionStatistics& statistics(); void progress_packet(const pipes::buffer_view& /* buffer */); - bool handle_packets(); /* if true we have more left */ - void send_packet(const std::shared_ptr& /* packet */); - void send_command(const ts::Command& /* command */, const std::function & /* acknowledge callback */ = NULL); + + void send_packet(ts::protocol::OutgoingClientPacket* /* packet */, bool /* skip id branding */); /* will claim ownership */ + void send_command(const std::string_view& /* build command command */, bool /* command low */, std::unique_ptr> /* acknowledge listener */ = nullptr); + void send_command(const ts::Command&, bool /* command low */, std::unique_ptr> /* acknowledge listener */ = nullptr); void disconnect(const std::string& /* message */); void send_acknowledge(uint16_t /* packet id */, bool /* low */); @@ -89,13 +89,17 @@ namespace tc::connection { private: void do_close_connection(); /* only call from ServerConnection. Close all connections via ServerConnection! */ - void handlePacketCommand(const std::shared_ptr&); - void handlePacketAck(const std::shared_ptr&); - void handlePacketVoice(const std::shared_ptr&); - void handlePacketPing(const std::shared_ptr&); - void handlePacketInit(const std::shared_ptr&); + static void callback_packet_decoded(void*, const ts::protocol::PacketParser&); + static void callback_command_decoded(void*, ts::command::ReassembledCommand*&); + static void callback_send_acknowledge(void*, uint16_t, bool); + static void callback_resend_failed(void*, const std::shared_ptr&); - bool create_datagram_packets(std::vector &result, const std::shared_ptr &packet); + /* Ownership will be transfered */ + void handlePacketCommand(ts::command::ReassembledCommand* /* command */); + void handlePacketAck(const ts::protocol::PacketParser&); + void handlePacketVoice(const ts::protocol::PacketParser&); + void handlePacketPing(const ts::protocol::PacketParser&); + void handlePacketInit(const ts::protocol::PacketParser&); ServerConnection* handle; @@ -117,6 +121,7 @@ namespace tc::connection { pipes::buffer last_buffer; } pow; void pow_send_cookie_get(); + void send_init1_buffer(); struct { uint8_t alpha[10]; @@ -129,17 +134,14 @@ namespace tc::connection { } crypto; std::string generate_client_initiv(); - uint16_t client_id = 0; - ts::protocol::PacketIdManager _packet_id_manager; - packet_buffers_t _packet_buffers; - std::array _packet_buffer_overflow{false}; + uint16_t client_id{0}; - std::array incoming_generation_estimators{}; /* implementation is thread save */ - uint8_t _packet_buffers_index = 0; + std::mutex packet_id_mutex{}; + ts::protocol::PacketIdManager _packet_id_manager; bool crypt_setupped{false}; ts::connection::CryptHandler crypt_handler; - ts::connection::CompressionHandler compression_handler; + ts::protocol::PacketDecoder packet_decoder; ts::connection::AcknowledgeManager acknowledge_handler; ConnectionStatistics statistics_{}; diff --git a/native/serverconnection/src/connection/ProtocolHandlerCommands.cpp b/native/serverconnection/src/connection/ProtocolHandlerCommands.cpp index 44930f7..4cc9ecc 100644 --- a/native/serverconnection/src/connection/ProtocolHandlerCommands.cpp +++ b/native/serverconnection/src/connection/ProtocolHandlerCommands.cpp @@ -1,6 +1,5 @@ #include "ProtocolHandler.h" #include "../logger.h" -#include using namespace std; diff --git a/native/serverconnection/src/connection/ProtocolHandlerCrypto.cpp b/native/serverconnection/src/connection/ProtocolHandlerCrypto.cpp index eab2546..dfd707e 100644 --- a/native/serverconnection/src/connection/ProtocolHandlerCrypto.cpp +++ b/native/serverconnection/src/connection/ProtocolHandlerCrypto.cpp @@ -58,9 +58,9 @@ std::string ProtocolHandler::generate_client_initiv() { void ProtocolHandler::handleCommandInitIVExpend(ts::Command &cmd) { this->pow.last_buffer = pipes::buffer{}; - auto alpha = base64::decode(cmd["alpha"]); - auto beta = base64::decode(cmd["beta"]); - auto omega = base64::decode(cmd["omega"]); + auto alpha = base64::decode(cmd["alpha"].string()); + auto beta = base64::decode(cmd["beta"].string()); + auto omega = base64::decode(cmd["omega"].string()); if(alpha.length() != 10 || memcmp(alpha.data(), this->crypto.alpha, 10) != 0) { this->handle->call_connect_result.call(this->handle->errors.register_error(tr("alpha key missmatch")), true); @@ -87,7 +87,6 @@ void ProtocolHandler::handleCommandInitIVExpend(ts::Command &cmd) { log_error(category::connection, tr("Failed to setup crypto ({})"), error); return; } - this->crypt_setupped = true; if(this->server_type == server_type::UNKNOWN) { if(cmd[0].has("teaspeak") && cmd["teaspeak"].as()) { @@ -132,12 +131,12 @@ void ProtocolHandler::handleCommandInitIVExpend2(ts::Command &cmd) { if(&__ed_sha512_functions != &_ed_sha512_functions) _ed_sha512_functions = __ed_sha512_functions; - auto beta = base64::decode(cmd["beta"]); - auto omega = base64::decode(cmd["omega"]); - auto proof = base64::decode(cmd["proof"]); + auto beta = base64::decode(cmd["beta"].string()); + auto omega = base64::decode(cmd["omega"].string()); + auto proof = base64::decode(cmd["proof"].string()); - auto crypto_chain_data = base64::decode(cmd["l"]); - auto crypto_root = cmd[0].has("root") ? base64::decode(cmd["root"]) : string((char*) license::teamspeak::public_root, 32); + auto crypto_chain_data = base64::decode(cmd["l"].string()); + auto crypto_root = cmd[0].has("root") ? base64::decode(cmd["root"].string()) : string((char*) license::teamspeak::public_root, 32); auto crypto_hash = digest::sha256(crypto_chain_data); /* suspecius, tries the server to hide himself? We dont know */ @@ -200,14 +199,14 @@ void ProtocolHandler::handleCommandInitIVExpend2(ts::Command &cmd) { response["proof"] = base64::encode(sign_buffer, (unsigned long) sign_buffer_length); /* no need to send this because we're sending the clientinit as the begin packet along with the POW init */ //this->_packet_id_manager.nextPacketId(PacketTypeInfo::Command); /* skip the first because we've send our first command within the low level handshake packets */ - this->send_command(response, [&](bool success){ - if(success) { - /* trigger connected; because the connection has been established on protocol layer */ + this->send_command(response, false, std::make_unique>([&](bool success){ + if(success) { + /* trigger connected; because the connection has been established on protocol layer */ this->crypt_setupped = true; - this->handle->call_connect_result.call(0, true); - this->connection_state = connection_state::CONNECTING; - } - }); /* needs to be encrypted at the time! */ + this->handle->call_connect_result.call(0, true); + this->connection_state = connection_state::CONNECTING; + } + })); /* needs to be encrypted at the time! */ } diff --git a/native/serverconnection/src/connection/ProtocolHandlerPOW.cpp b/native/serverconnection/src/connection/ProtocolHandlerPOW.cpp index aa7ad19..5e596dd 100644 --- a/native/serverconnection/src/connection/ProtocolHandlerPOW.cpp +++ b/native/serverconnection/src/connection/ProtocolHandlerPOW.cpp @@ -43,10 +43,10 @@ inline bool solve_puzzle(mp_int& x, mp_int& n, mp_int& result, uint32_t level) { return true; } -void ProtocolHandler::handlePacketInit(const std::shared_ptr &packet) { +void ProtocolHandler::handlePacketInit(const ts::protocol::PacketParser &packet) { this->pow.last_response = system_clock::now(); - auto data = packet->data(); + auto data = packet.payload(); auto packet_state = static_cast(data[0]); if(packet_state == pow_state::COMMAND_RESET) { @@ -102,9 +102,7 @@ void ProtocolHandler::handlePacketInit(const std::shared_ptrpow.last_buffer = pipes::buffer_view{response_buffer, 25}.own_buffer(); - this->pow.last_resend = system_clock::now(); - - this->send_packet(make_shared(PacketTypeInfo::Init1, PacketFlag::Unencrypted, this->pow.last_buffer)); + this->send_init1_buffer(); } return; @@ -147,8 +145,7 @@ void ProtocolHandler::handlePacketInit(const std::shared_ptrpow.last_buffer = response_buffer; - this->pow.last_resend = system_clock::now(); - this->send_packet(make_shared(PacketTypeInfo::Init1, PacketFlag::Unencrypted, this->pow.last_buffer)); + this->send_init1_buffer(); } mp_clear_multi(&point_x, &point_n, &result, nullptr); @@ -174,6 +171,5 @@ void ProtocolHandler::pow_send_cookie_get() { memset(&response_buffer[13], 0, 8); this->pow.last_buffer = pipes::buffer_view{response_buffer, 21}.own_buffer(); - this->pow.last_resend = system_clock::now(); - this->send_packet(make_shared(PacketTypeInfo::Init1, PacketFlag::Unencrypted, this->pow.last_buffer)); + this->send_init1_buffer(); } \ No newline at end of file diff --git a/native/serverconnection/src/connection/ProtocolHandlerPackets.cpp b/native/serverconnection/src/connection/ProtocolHandlerPackets.cpp index be99b70..747278c 100644 --- a/native/serverconnection/src/connection/ProtocolHandlerPackets.cpp +++ b/native/serverconnection/src/connection/ProtocolHandlerPackets.cpp @@ -16,28 +16,34 @@ using namespace ts; //#define LOG_PING -void ProtocolHandler::handlePacketAck(const std::shared_ptr &ack) { - if(ack->data().length() < 2) return; +void ProtocolHandler::handlePacketAck(const ts::protocol::PacketParser &ack) { + if(ack.payload_length() < 2) { + return; + } string error; - auto id = be2le16(&ack->data()[0]); + auto id = be2le16(&ack.payload()[0]); //log_trace(category::connection, tr("Handle packet acknowledge for {}"), be2le16(&ack->data()[0])); - if(!this->acknowledge_handler.process_acknowledge(ack->type().type(), id, error)) { - log_warn(category::connection, tr("Failed to handle acknowledge {}: {}"), be2le16(&ack->data()[0]) ,error); + if(!this->acknowledge_handler.process_acknowledge(ack.type(), id, error)) { + log_warn(category::connection, tr("Failed to handle acknowledge {}: {}"), id, error); } } -void ProtocolHandler::handlePacketCommand(const std::shared_ptr &packet) { +void ProtocolHandler::handlePacketCommand(ts::command::ReassembledCommand* packet) { std::unique_ptr command; try { - command = make_unique(packet->asCommand()); + auto payload = packet->command_view(); + command = make_unique(Command::parse(payload, true, false)); } catch(const std::invalid_argument& ex) { log_error(category::connection, tr("Failed to parse command (invalid_argument): {}"), ex.what()); + ts::command::ReassembledCommand::free(packet); return; } catch(const std::exception& ex) { log_error(category::connection, tr("Failed to parse command (exception): {}"), ex.what()); + ts::command::ReassembledCommand::free(packet); return; } + ts::command::ReassembledCommand::free(packet); //log_trace(category::connection, tr("Handing command {}"), command->command()); if(command->command() == "initivexpand") { @@ -55,13 +61,13 @@ void ProtocolHandler::handlePacketCommand(const std::shared_ptrhandle->execute_pending_commands(); } -void ProtocolHandler::handlePacketVoice(const std::shared_ptr &packet) { +void ProtocolHandler::handlePacketVoice(const ts::protocol::PacketParser &packet) { this->handle->voice_connection->process_packet(packet); } -void ProtocolHandler::handlePacketPing(const std::shared_ptr &packet) { - if(packet->type() == PacketTypeInfo::Pong) { - uint16_t id = be2le16((char*) packet->data().data_ptr()); +void ProtocolHandler::handlePacketPing(const ts::protocol::PacketParser &packet) { + if(packet.type() == PacketType::VOICE) { + uint16_t id = be2le16((char*) packet.payload().data_ptr()); #ifdef LOG_PING cout << "Received pong (" << id << "|" << this->ping.ping_id << ")" << endl; #endif @@ -76,19 +82,21 @@ void ProtocolHandler::handlePacketPing(const std::shared_ptrpacketId(), buffer); - this->send_packet(make_shared(PacketTypeInfo::Pong, PacketFlag::Unencrypted, pipes::buffer_view{buffer, 2})); + auto response = allocate_outgoing_client_packet(2); + response->type_and_flags_ = PacketType::PONG | PacketFlag::Unencrypted; + le2be16(packet.packet_id(), response->payload); + this->send_packet(response, false); } } void ProtocolHandler::ping_send_request() { - auto packet = make_shared(PacketTypeInfo::Ping, pipes::buffer_view{}); - packet->enable_flag(PacketFlag::Unencrypted); - this->send_packet(packet); - assert(packet->memory_state.id_branded); + auto packet = allocate_outgoing_client_packet(0); + packet->type_and_flags_ = PacketType::PING | PacketFlag::Unencrypted; + + packet->ref(); + this->send_packet(packet, false); this->ping.ping_send_timestamp = chrono::system_clock::now(); - this->ping.ping_id = packet->packetId(); + this->ping.ping_id = packet->packet_id(); + packet->unref(); } \ No newline at end of file diff --git a/native/serverconnection/src/connection/ServerConnection.cpp b/native/serverconnection/src/connection/ServerConnection.cpp index a2cceca..01ea94f 100644 --- a/native/serverconnection/src/connection/ServerConnection.cpp +++ b/native/serverconnection/src/connection/ServerConnection.cpp @@ -276,7 +276,8 @@ NAN_METHOD(ServerConnection::connect) { this->protocol_handler->reset(); if(identity_key->IsString()) { auto& identity = this->protocol_handler->get_identity_key(); - auto key = base64::decode(*Nan::Utf8String(identity_key->ToString(Nan::GetCurrentContext()).ToLocalChecked())); + std::string identity_decoded{*Nan::Utf8String(identity_key->ToString(Nan::GetCurrentContext()).ToLocalChecked())}; + auto key = base64::decode(identity_decoded); if(ecc_import((u_char*) key.data(), (unsigned long) key.length(), &identity) != CRYPT_OK) { Nan::ThrowError(tr("failed to import identity")); return; @@ -528,7 +529,7 @@ NAN_METHOD(ServerConnection::send_command) { } } - this->protocol_handler->send_command(cmd); + this->protocol_handler->send_command(cmd, false); auto end = chrono::system_clock::now(); } NAN_METHOD(ServerConnection::_send_voice_data) { @@ -587,20 +588,20 @@ NAN_METHOD(ServerConnection::send_voice_data_raw) { static shared_ptr shuffle_cached_packet; #endif void ServerConnection::send_voice_data(const void *buffer, size_t buffer_length, uint8_t codec, bool head) { - auto _buffer = pipes::buffer{ts::protocol::ClientPacket::META_SIZE + buffer_length + 3}; - auto packet = ts::protocol::ClientPacket::from_buffer(_buffer); - memset(&_buffer[ts::protocol::ClientPacket::META_MAC_SIZE], 0, ts::protocol::ClientPacket::META_HEADER_SIZE); /* reset all header data */ - packet->type(ts::protocol::PacketTypeInfo::Voice); + auto packet = ts::protocol::allocate_outgoing_client_packet(buffer_length + 3); + packet->type_and_flags_ = ts::protocol::PacketType::VOICE; - auto data_buffer = packet->data(); - le2be16(this->voice_packet_id++, &data_buffer[0]); /* set voice packet id */ - data_buffer[2] = (uint8_t) codec; /* set voice codec */ - if(buffer_length > 0 && buffer) - memcpy(&data_buffer[3], buffer, buffer_length); + auto packet_payload = (uint8_t*) packet->payload; + *(uint16_t*) packet_payload = htons(this->voice_packet_id++); + packet_payload[2] = (uint8_t) codec; + if(buffer_length > 0 && buffer) { + memcpy(&packet_payload[3], buffer, buffer_length); + } - if(head) /* head packet */ - packet->enable_flag(ts::protocol::PacketFlag::Compressed); - packet->enable_flag(ts::protocol::PacketFlag::Unencrypted); + if(head) { + packet->type_and_flags_ |= ts::protocol::PacketFlag::Compressed; + } + packet->type_and_flags_ |= ts::protocol::PacketFlag::Unencrypted; #ifdef FUZZ_VOICE if((rand() % 10) < 2) { @@ -616,7 +617,7 @@ void ServerConnection::send_voice_data(const void *buffer, size_t buffer_length, shuffle_cached_packet = packet; } #else - this->protocol_handler->send_packet(std::shared_ptr(packet.release())); + this->protocol_handler->send_packet(packet, false); #endif } diff --git a/native/serverconnection/src/connection/audio/VoiceConnection.cpp b/native/serverconnection/src/connection/audio/VoiceConnection.cpp index d1f4998..c3a07ac 100644 --- a/native/serverconnection/src/connection/audio/VoiceConnection.cpp +++ b/native/serverconnection/src/connection/audio/VoiceConnection.cpp @@ -349,16 +349,18 @@ void VoiceConnection::delete_client(const std::shared_ptr &packet) { - if(packet->type() == PacketTypeInfo::Voice) { - if(packet->data().length() < 5) { +void VoiceConnection::process_packet(const ts::protocol::PacketParser &packet) { + if(packet.type() == ts::protocol::PacketType::VOICE) { + if(packet.payload_length() < 5) { //TODO log invalid voice packet return; } - auto packet_id = be2le16(&packet->data()[0]); - auto client_id = be2le16(&packet->data()[2]); - auto codec_id = (uint8_t) packet->data()[4]; - auto flag_head = packet->has_flag(PacketFlag::Compressed); + + auto payload = packet.payload(); + auto packet_id = be2le16(&payload[0]); + auto client_id = be2le16(&payload[2]); + auto codec_id = (uint8_t) payload[4]; + auto flag_head = packet.has_flag(ts::protocol::PacketFlag::Compressed); //container->voice_data = packet->data().length() > 5 ? packet->data().range(5) : pipes::buffer{}; //log_info(category::voice_connection, tr("Received voice packet from {}. Packet ID: {}"), client_id, packet_id); @@ -368,8 +370,8 @@ void VoiceConnection::process_packet(const std::shared_ptrdata().length() > 5) { - client->process_packet(packet_id, packet->data().range(5), (codec::value) codec_id, flag_head); + if(payload.length() > 5) { + client->process_packet(packet_id, payload.view(5), (codec::value) codec_id, flag_head); } else { client->process_packet(packet_id, pipes::buffer_view{nullptr, 0}, (codec::value) codec_id, flag_head); } diff --git a/native/serverconnection/src/connection/audio/VoiceConnection.h b/native/serverconnection/src/connection/audio/VoiceConnection.h index 8c2b2f0..5ee3ab4 100644 --- a/native/serverconnection/src/connection/audio/VoiceConnection.h +++ b/native/serverconnection/src/connection/audio/VoiceConnection.h @@ -4,14 +4,13 @@ #include #include #include +#include #include namespace tc { - namespace audio { - namespace recorder { - class AudioConsumerWrapper; - } - } + namespace audio::recorder { + class AudioConsumerWrapper; + } namespace connection { class ServerConnection; @@ -87,7 +86,7 @@ namespace tc { std::shared_ptr register_client(uint16_t /* client id */); void delete_client(const std::shared_ptr&); - void process_packet(const std::shared_ptr&); + void process_packet(const ts::protocol::PacketParser&); void set_encoder_codec(const uint8_t& /* codec */); uint8_t get_encoder_codec(); diff --git a/native/serverconnection/src/connection/ft/FileTransferManager.cpp b/native/serverconnection/src/connection/ft/FileTransferManager.cpp index a850083..91a3bdc 100644 --- a/native/serverconnection/src/connection/ft/FileTransferManager.cpp +++ b/native/serverconnection/src/connection/ft/FileTransferManager.cpp @@ -1,7 +1,6 @@ #include "FileTransferManager.h" #include "FileTransferObject.h" -#include #include #include #include @@ -527,11 +526,32 @@ void FileTransferManager::initialize() { this->event_io_thread = std::thread(&FileTransferManager::_execute_event_loop, this); } +bool save_join(std::thread &thread, bool rd) { + try { + if(thread.joinable()) + thread.join(); + } catch(const std::system_error& ex) { + if(ex.code() == std::errc::resource_deadlock_would_occur) { + if(rd) + return false; + throw; + } else if(ex.code() == std::errc::no_such_process) { + return false; + } else if(ex.code() == std::errc::invalid_argument) { + return false; + } else { + throw; + } + } + return true; +} + + void FileTransferManager::finalize() { this->event_io_canceled = true; event_base_loopbreak(this->event_io); - threads::save_join(this->event_io_thread, false); + save_join(this->event_io_thread, false); //TODO drop all file transfers! event_base_free(this->event_io); diff --git a/native/serverconnection/src/logger.h b/native/serverconnection/src/logger.h index 82ba37c..3c8123c 100644 --- a/native/serverconnection/src/logger.h +++ b/native/serverconnection/src/logger.h @@ -55,8 +55,9 @@ namespace tc_logger { template inline void log(category::value category, level::level_enum lvl, const char *fmt, const Args &... args) { - if (should_log(lvl)) - force_log(category, lvl, fmt, args...); + if (should_log(lvl)) { + force_log(category, lvl, fmt, args...); + } } template diff --git a/native/serverconnection/src/thread_helper.cpp b/native/serverconnection/src/thread_helper.cpp new file mode 100644 index 0000000..b3adbaf --- /dev/null +++ b/native/serverconnection/src/thread_helper.cpp @@ -0,0 +1,94 @@ +#include "./thread_helper.h" +#include +#include + +#ifdef WIN32 +#include +#else +#include +#endif + +std::string threads::name(std::thread &thread) { +#ifdef WIN32 + static std::string _empty{}; + return _empty; +#else + char buffer[255]; /* min 16 characters */ + pthread_getname_np(thread.native_handle(), buffer, 255); + return std::string{buffer}; +#endif +} + +bool threads::name(std::thread &thread, const std::string_view &name) { +#ifdef WIN32 + return false; +#else + char buffer[255]; /* min 16 characters */ + + memcpy(buffer, name.data(), name.length()); + buffer[name.length()] = '\0'; + buffer[16] = '\0'; /* cut of the name after 16 characters */ + + auto error = pthread_setname_np(thread.native_handle(), buffer); + return error == 0; +#endif +} + +bool threads::save_join(std::thread &thread, bool rd) { + try { + if(thread.joinable()) + thread.join(); + } catch(const std::system_error& ex) { + if(ex.code() == std::errc::resource_deadlock_would_occur) { + if(rd) + return false; + throw; + } else if(ex.code() == std::errc::no_such_process) { + return false; + } else if(ex.code() == std::errc::invalid_argument) { + return false; + } else { + throw; + } + } + return true; +} + +bool threads::timed_join(std::thread &thread, const std::chrono::nanoseconds &timeout) { +#ifdef WIN32 + auto result = WaitForSingleObject(thread.native_handle(), (DWORD) std::chrono::floor(timeout).count()); + if(result != 0) + return false; + if(thread.joinable()) + thread.join(); + return result; +#else + struct timespec ts{}; + if (clock_gettime(CLOCK_REALTIME, &ts) == -1) + return false; /* failed to get clock time */ + + auto seconds = std::chrono::floor(timeout); + auto nanoseconds = std::chrono::ceil(timeout) - seconds; + ts.tv_sec += seconds.count(); + ts.tv_nsec += nanoseconds.count(); + if(ts.tv_nsec >= 1e9) { + ts.tv_sec += 1; + ts.tv_nsec -= 1e9; + } + auto result = pthread_timedjoin_np(thread.native_handle(), nullptr, &ts); + if(result > 0 && result != ESRCH) return false; + + /* let the std lib set their flags */ + std::thread _empty{}; /* could be destroyed even with an "active" thread handle because we overwrote the std::terminate() function with a macro */ + _empty = std::move(thread); + + /* + * Undefined behaviour: + * We destroy everything in a non trivial class, + * But when we take a closer look its just a wrapper around the native_handle type which could be an DWORD or a pthread_t which are both trivial destructible! + */ + memset(&_empty, 0, sizeof(_empty)); // NOLINT(bugprone-undefined-memory-manipulation) + + return true; +#endif +} \ No newline at end of file diff --git a/native/serverconnection/src/thread_helper.h b/native/serverconnection/src/thread_helper.h new file mode 100644 index 0000000..f746f3e --- /dev/null +++ b/native/serverconnection/src/thread_helper.h @@ -0,0 +1,26 @@ +#pragma once + +#include +#include +#include + +namespace threads { + extern bool name(std::thread& /* thread */, const std::string_view& /* name */); + extern std::string name(std::thread& /* thread */); + + /* + * This function will not throw an error if the thread has already been joined. + * It returns true if join succeeded, false on any error (like thread has already be joined) + */ + extern bool save_join(std::thread& /* thread */, bool /* ignore resource deadlock */ = false); + + extern bool timed_join(std::thread& /* thread */, const std::chrono::nanoseconds& /* timeout */); + + template + inline bool timed_join(std::thread& thread, const std::chrono::time_point& timeout) { + auto now = Clock::now(); + if(now > timeout) + timeout = now; + return timed_join(thread, timeout - now); + } +}