From 4367736c77e934bd1a2f05247c2673a1786a6b05 Mon Sep 17 00:00:00 2001 From: WolverinDEV Date: Sun, 7 Feb 2021 12:27:27 +0100 Subject: [PATCH] Moved some voice udp utils to the shared teaspeak part --- CMakeLists.txt | 14 +- src/PermissionManager.cpp | 1 + src/lookup/ipv4.h | 60 +++--- src/lookup/ipv6.h | 64 +++--- src/misc/spin_mutex.h | 3 +- src/protocol/Packet.cpp | 50 ++++- src/protocol/Packet.h | 159 ++++++++++++--- src/protocol/PacketDecoder.cpp | 312 ++++++++++++++++++++++++++++++ src/protocol/PacketDecoder.h | 94 +++++++++ src/protocol/PacketStatistics.cpp | 124 ++++++++++++ src/protocol/PacketStatistics.h | 68 +++++++ src/protocol/PingHandler.cpp | 76 ++++++++ src/protocol/PingHandler.h | 46 +++++ src/protocol/RawCommand.cpp | 19 ++ src/protocol/RawCommand.h | 48 +++++ 15 files changed, 1042 insertions(+), 96 deletions(-) create mode 100644 src/protocol/PacketDecoder.cpp create mode 100644 src/protocol/PacketDecoder.h create mode 100644 src/protocol/PacketStatistics.cpp create mode 100644 src/protocol/PacketStatistics.h create mode 100644 src/protocol/PingHandler.cpp create mode 100644 src/protocol/PingHandler.h create mode 100644 src/protocol/RawCommand.cpp create mode 100644 src/protocol/RawCommand.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 70e9ae1..ea3e68b 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -137,6 +137,10 @@ set(SOURCE_FILES src/protocol/ringbuffer.cpp src/protocol/AcknowledgeManager.cpp src/protocol/PacketLossCalculator.cpp + src/protocol/RawCommand.cpp + src/protocol/PacketDecoder.cpp + src/protocol/PingHandler.cpp + src/protocol/PacketStatistics.cpp ) set(HEADER_FILES @@ -234,18 +238,14 @@ target_link_libraries(TeaSpeak PUBLIC dl ) -find_package(mysql) -if(mysql) +if (TEASPEAK_SERVER) + find_package(mysql REQUIRED) message("Found MySQL") target_link_libraries(TeaSpeak PUBLIC mysql::client::static - ) + ) target_compile_options(TeaSpeak PRIVATE "-Wall" "-DHAVE_MYSQL_H") -else() - message("Building without MySQL Support") -endif() -if (TEASPEAK_SERVER) target_link_libraries(TeaSpeak PUBLIC CXXTerminal::static) endif () diff --git a/src/PermissionManager.cpp b/src/PermissionManager.cpp index 70a0134..cb26a63 100644 --- a/src/PermissionManager.cpp +++ b/src/PermissionManager.cpp @@ -2,6 +2,7 @@ #include #include "misc/memtracker.h" #include "./PermissionManager.h" +#include "./BasicChannel.h" using namespace std; using namespace ts; diff --git a/src/lookup/ipv4.h b/src/lookup/ipv4.h index f174f2e..5e625c3 100644 --- a/src/lookup/ipv4.h +++ b/src/lookup/ipv4.h @@ -3,32 +3,44 @@ #include #include "./ip.h" -namespace lookup::ipv4_impl { - union uaddress_t { - struct { - uint32_t address; - uint16_t port; +namespace lookup { + namespace ipv4_impl { + union uaddress_t { + struct { + uint32_t address{0}; + uint16_t port{0}; + }; + + uint64_t value; }; - uint64_t value; - }; + struct converter { + constexpr inline void operator()(uaddress_t& result, const sockaddr_in& addr) { + result.address = addr.sin_addr.s_addr; + result.port = addr.sin_port; + } + }; - struct converter { - constexpr inline void operator()(uaddress_t& result, const sockaddr_in& addr) { - result.address = addr.sin_addr.s_addr; - result.port = addr.sin_port; - } - }; + struct comparator { + constexpr inline bool operator()(const uaddress_t& a, const uaddress_t& b) { + return a.value == b.value; + } + }; - struct comparator { - constexpr inline bool operator()(const uaddress_t& a, const uaddress_t& b) { - return a.value == b.value; - } - }; + struct hash { + constexpr inline uint8_t operator()(const sockaddr_in& address) { + return (address.sin_addr.s_addr & 0xFFU) ^ (address.sin_port); + } + }; + } - struct hash { - constexpr inline uint8_t operator()(const sockaddr_in& address) { - return (address.sin_addr.s_addr & 0xFFU) ^ (address.sin_port); - } - }; -} \ No newline at end of file + template + using ip_v4 = ip_vx< + T, + sockaddr_in, + ipv4_impl::uaddress_t, + ipv4_impl::converter, + ipv4_impl::comparator, + ipv4_impl::hash + >; +} diff --git a/src/lookup/ipv6.h b/src/lookup/ipv6.h index 30c61a9..4a7174f 100644 --- a/src/lookup/ipv6.h +++ b/src/lookup/ipv6.h @@ -3,37 +3,49 @@ #include #include "./ip.h" -namespace lookup::ipv6_impl { - struct address_t { - union { - uint64_t address_u64[ 2]; +namespace lookup { + namespace ipv6_impl { + struct address_t { + union { + uint64_t address_u64[ 2]; + }; + + uint16_t port; }; - uint16_t port; - }; + struct converter { + constexpr inline void operator()(address_t& result, const sockaddr_in6& addr) { + auto addr_ptr = (uint64_t*) &addr.sin6_addr; - struct converter { - constexpr inline void operator()(address_t& result, const sockaddr_in6& addr) { - auto addr_ptr = (uint64_t*) &addr.sin6_addr; + result.address_u64[0] = addr_ptr[0]; + result.address_u64[1] = addr_ptr[1]; - result.address_u64[0] = addr_ptr[0]; - result.address_u64[1] = addr_ptr[1]; + result.port = addr.sin6_port; + } + }; - result.port = addr.sin6_port; - } - }; + struct comparator { + constexpr inline bool operator()(const address_t& a, const address_t& b) { + return a.address_u64[0] == b.address_u64[0] && a.address_u64[1] == b.address_u64[1] && a.port == b.port; + } + }; - struct comparator { - constexpr inline bool operator()(const address_t& a, const address_t& b) { - return a.address_u64[0] == b.address_u64[0] && a.address_u64[1] == b.address_u64[1] && a.port == b.port; - } - }; + struct hash { + constexpr inline uint8_t operator()(const sockaddr_in6& address) { + auto addr_ptr = (uint8_t*) &address.sin6_addr; - struct hash { - constexpr inline uint8_t operator()(const sockaddr_in6& address) { - auto addr_ptr = (uint8_t*) &address.sin6_addr; + return (uint8_t) (addr_ptr[8] ^ addr_ptr[9]) ^ (uint8_t) (addr_ptr[15] ^ address.sin6_port); + } + }; + } - return (uint8_t) (addr_ptr[8] ^ addr_ptr[9]) ^ (uint8_t) (addr_ptr[15] ^ address.sin6_port); - } - }; -} \ No newline at end of file + template + using ip_v6 = ip_vx< + T, + sockaddr_in6, + ipv6_impl::address_t, + ipv6_impl::converter, + ipv6_impl::comparator, + ipv6_impl::hash + >; +} diff --git a/src/misc/spin_mutex.h b/src/misc/spin_mutex.h index 8fe5db6..c2e78e7 100644 --- a/src/misc/spin_mutex.h +++ b/src/misc/spin_mutex.h @@ -22,8 +22,9 @@ class spin_mutex { uint8_t round = 0; while (locked.load(std::memory_order_relaxed)) { //Yield when we're using this lock for a longer time, which we usually not doing - if(round++ % 8 == 0) + if(round++ % 8 == 0) { std::this_thread::yield(); + } } } diff --git a/src/protocol/Packet.cpp b/src/protocol/Packet.cpp index 1a8dba9..8647f19 100644 --- a/src/protocol/Packet.cpp +++ b/src/protocol/Packet.cpp @@ -4,7 +4,6 @@ #include #include -#include #include #include "./Packet.h" #include "../misc/endianness.h" @@ -23,19 +22,33 @@ namespace ts { uint8_t ServerPacketParser::flags() const { return (uint8_t) this->_buffer[ClientPacketParser::kHeaderOffset + 2] & 0xF0U; } } + void construct_ocp(protocol::OutgoingClientPacket* packet) { + new (packet) protocol::OutgoingClientPacket{}; + } + + void deconstruct_ocp(protocol::OutgoingClientPacket* packet) { + packet->~OutgoingClientPacket(); + } + + void reset_ocp(protocol::OutgoingClientPacket* packet, size_t payload_size) { + packet->next = nullptr; + packet->payload_size = payload_size; + packet->type_and_flags_ = 0; + packet->generation = 0; + } + void construct_osp(protocol::OutgoingServerPacket* packet) { - new (&packet->ref_count) std::atomic{}; + new (packet) protocol::OutgoingServerPacket{}; } void deconstruct_osp(protocol::OutgoingServerPacket* packet) { - packet->ref_count.~atomic(); + packet->~OutgoingServerPacket(); } void reset_osp(protocol::OutgoingServerPacket* packet, size_t payload_size) { packet->next = nullptr; packet->payload_size = payload_size; - packet->type_and_flags = 0; - + packet->type_and_flags_ = 0; packet->generation = 0; } @@ -78,7 +91,7 @@ namespace ts { return bentry; } - void protocol::OutgoingServerPacket::object_freed() { + void protocol::OutgoingServerPacket::free_object() { auto bentry = (OSPBukkitEntry*) bosp_from_osp(this); if(bentry->extra_allocated) { destroy_bosp(bentry); @@ -98,8 +111,8 @@ namespace ts { sdp_count++; } - protocol::OutgoingServerPacket* protocol::allocate_outgoing_packet(size_t payload_size) { - if(BUKKIT_ENTRY_SIZE > payload_size) { + protocol::OutgoingServerPacket* protocol::allocate_outgoing_server_packet(size_t payload_size) { + if(BUKKIT_ENTRY_SIZE >= payload_size) { std::lock_guard block{osp_mutex}; if(osp_head) { assert(sdp_count > 0); @@ -140,12 +153,12 @@ namespace ts { return result; } #else - void protocol::OutgoingServerPacket::object_freed() { + void protocol::OutgoingServerPacket::free_object() { deconstruct_osp(this); ::free(this); } - protocol::OutgoingServerPacket* protocol::allocate_outgoing_packet(size_t payload_size) { + protocol::OutgoingServerPacket* protocol::allocate_outgoing_server_packet(size_t payload_size) { auto base_size = sizeof(protocol::OutgoingServerPacket) - 1; auto full_size = base_size + payload_size; auto result = (protocol::OutgoingServerPacket*) malloc(full_size); @@ -157,4 +170,21 @@ namespace ts { return result; } #endif + + void protocol::OutgoingClientPacket::free_object() { + deconstruct_ocp(this); + ::free(this); + } + + protocol::OutgoingClientPacket* protocol::allocate_outgoing_client_packet(size_t payload_size) { + auto base_size = sizeof(protocol::OutgoingClientPacket) - 1; + auto full_size = base_size + payload_size; + auto result = (protocol::OutgoingClientPacket*) malloc(full_size); + + construct_ocp(result); + reset_ocp(result, payload_size); + result->ref_count++; + + return result; + } } \ No newline at end of file diff --git a/src/protocol/Packet.h b/src/protocol/Packet.h index 2bb8caa..e113c59 100644 --- a/src/protocol/Packet.h +++ b/src/protocol/Packet.h @@ -98,7 +98,9 @@ namespace ts::protocol { [[nodiscard]] inline pipes::buffer_view buffer() const { return this->_buffer; } [[nodiscard]] inline pipes::buffer_view mac() const { return this->_buffer.view(0, 8); } + [[nodiscard]] virtual pipes::buffer_view header() const = 0; [[nodiscard]] virtual pipes::buffer_view payload() const = 0; + [[nodiscard]] virtual void* payload_ptr_mut() = 0; [[nodiscard]] virtual size_t payload_length() const = 0; [[nodiscard]] inline uint32_t full_packet_id() const { return this->packet_id() | (uint32_t) ((uint32_t) this->estimated_generation() << 16U); } @@ -145,8 +147,9 @@ namespace ts::protocol { if(this->_buffer.length() < kPayloadOffset) return false; return this->type() <= 8; } - + [[nodiscard]] inline pipes::buffer_view header() const override { return this->_buffer.view(kHeaderOffset, kHeaderLength); } [[nodiscard]] inline pipes::buffer_view payload() const override { return this->_buffer.view(kPayloadOffset); } + [[nodiscard]] inline void* payload_ptr_mut() override { return (char*) this->mutable_data_ptr() + kPayloadOffset; }; [[nodiscard]] inline size_t payload_length() const override { return this->_buffer.length() - kPayloadOffset; } [[nodiscard]] uint16_t client_id() const; @@ -170,7 +173,9 @@ namespace ts::protocol { return this->type() <= 8; } + [[nodiscard]] inline pipes::buffer_view header() const override { return this->_buffer.view(kHeaderOffset, kHeaderLength); } [[nodiscard]] inline pipes::buffer_view payload() const override { return this->_buffer.view(kPayloadOffset); } + [[nodiscard]] inline void* payload_ptr_mut() override { return (char*) this->mutable_data_ptr() + kPayloadOffset; }; [[nodiscard]] inline size_t payload_length() const override { return this->_buffer.length() - kPayloadOffset; } [[nodiscard]] uint16_t packet_id() const override; @@ -178,29 +183,16 @@ namespace ts::protocol { [[nodiscard]] uint8_t flags() const override; }; - struct OutgoingServerPacket { + struct OutgoingPacket { public: + OutgoingPacket() = default; + virtual ~OutgoingPacket() = default; + /* general info */ - std::atomic ref_count; + std::atomic ref_count{0}; size_t payload_size; - - OutgoingServerPacket* next; /* used within the write/process queue */ uint16_t generation; - /* actual packet data */ - uint8_t mac[8]; - uint8_t packet_id_bytes[2]; - uint8_t type_and_flags; - uint8_t payload[1]; /* variable size */ - - [[nodiscard]] inline const void* packet_data() const { - return this->mac; - } - - [[nodiscard]] inline size_t packet_length() const { - return this->payload_size + (8 + 2 + 1); - } - inline auto ref() { auto count = ++ref_count; assert(count > 1); @@ -208,29 +200,140 @@ namespace ts::protocol { } inline void unref() { - if(--this->ref_count == 0) - this->object_freed(); + if(--this->ref_count == 0) { + this->free_object(); + } } /* some helper methods */ inline void set_packet_id(uint16_t id) { - this->packet_id_bytes[0] = id >> 8U; - this->packet_id_bytes[1] = id & 0xFFU; + auto packet_id_bytes = this->packet_id_bytes(); + packet_id_bytes[0] = id >> 8U; + packet_id_bytes[1] = id & 0xFFU; } [[nodiscard]] inline uint16_t packet_id() const { - return (uint16_t) (this->packet_id_bytes[0] << 8U) | this->packet_id_bytes[1]; + auto packet_id_bytes = this->packet_id_bytes(); + return (uint16_t) (packet_id_bytes[0] << 8U) | packet_id_bytes[1]; } [[nodiscard]] inline auto packet_type() const { - return (PacketType) (this->type_and_flags & 0xFU); + auto type_and_flags = this->type_and_flags(); + return (PacketType) (type_and_flags & 0xFU); } - private: - void object_freed(); + + /** + * @returns a pointer to the beginning of the packet including the packet header + */ + [[nodiscard]] virtual const void* packet_data() const = 0; + + /** + * @returns the full packet length including the packet header + */ + [[nodiscard]] virtual size_t packet_length() const = 0; + [[nodiscard]] virtual uint8_t type_and_flags() const = 0; + + [[nodiscard]] virtual OutgoingPacket* next_in_queue() const = 0; + virtual void set_next_in_queue(OutgoingPacket*) = 0; + protected: + [[nodiscard]] virtual const uint8_t* packet_id_bytes() const = 0; + [[nodiscard]] virtual uint8_t* packet_id_bytes() = 0; + virtual void free_object() = 0; + }; + + struct OutgoingClientPacket : public OutgoingPacket { + public: + OutgoingClientPacket() = default; + virtual ~OutgoingClientPacket() = default; + + OutgoingClientPacket* next; /* used within the write/process queue */ + + /* actual packet data */ + uint8_t mac[8]; + uint8_t packet_id_bytes_[2]; + uint8_t client_id_bytes[2]; + uint8_t type_and_flags_; + uint8_t payload[1]; /* variable size */ + + [[nodiscard]] inline const void* packet_data() const override { + return this->mac; + } + + [[nodiscard]] inline size_t packet_length() const override { + return this->payload_size + (8 + 2 + 2 + 1); + } + + [[nodiscard]] inline uint8_t type_and_flags() const override { + return this->type_and_flags_; + } + + [[nodiscard]] inline OutgoingPacket* next_in_queue() const override { + return this->next; + } + + inline void set_next_in_queue(OutgoingPacket* packet) override { + this->next = dynamic_cast(packet); + assert(!packet || this->next); + } + protected: + [[nodiscard]] inline const uint8_t* packet_id_bytes() const override { + return this->packet_id_bytes_; + } + + [[nodiscard]] inline uint8_t* packet_id_bytes() override { + return this->packet_id_bytes_; + } + + void free_object() override; + }; + + struct OutgoingServerPacket : public OutgoingPacket { + public: + virtual ~OutgoingServerPacket() = default; + + OutgoingServerPacket* next; /* used within the write/process queue */ + + /* actual packet data */ + uint8_t mac[8]; + uint8_t packet_id_bytes_[2]; + uint8_t type_and_flags_; + uint8_t payload[1]; /* variable size */ + + [[nodiscard]] inline uint8_t type_and_flags() const override { + return this->type_and_flags_; + } + + [[nodiscard]] inline const void* packet_data() const override { + return this->mac; + } + + [[nodiscard]] inline size_t packet_length() const override { + return this->payload_size + (8 + 2 + 1); + } + + [[nodiscard]] inline OutgoingPacket* next_in_queue() const override { + return this->next; + } + + inline void set_next_in_queue(OutgoingPacket* packet) override { + this->next = dynamic_cast(packet); + assert(!packet || this->next); + } + protected: + [[nodiscard]] inline const uint8_t* packet_id_bytes() const override { + return this->packet_id_bytes_; + } + + [[nodiscard]] inline uint8_t* packet_id_bytes() override { + return this->packet_id_bytes_; + } + + void free_object() override; }; /* This will allocate a new outgoing packet. To delete just unref the packet! */ - OutgoingServerPacket* allocate_outgoing_packet(size_t /* payload size */); + OutgoingServerPacket* allocate_outgoing_server_packet(size_t /* payload size */); + OutgoingClientPacket* allocate_outgoing_client_packet(size_t /* payload size */); inline PacketFlags& operator|=(PacketFlags& flags, const PacketFlag& flag) { flags |= (uint8_t) flag; diff --git a/src/protocol/PacketDecoder.cpp b/src/protocol/PacketDecoder.cpp new file mode 100644 index 0000000..176690a --- /dev/null +++ b/src/protocol/PacketDecoder.cpp @@ -0,0 +1,312 @@ +// +// Created by WolverinDEV on 10/03/2020. +// + +#include "PacketDecoder.h" + +#include "../misc/memtracker.h" +#include "./AcknowledgeManager.h" +#include "./CompressionHandler.h" +#include "./CryptHandler.h" + +#ifdef FEATURE_LOGGING +#include +#endif + +using namespace ts; +using namespace ts::protocol; +using namespace ts::connection; + +PacketDecoder::PacketDecoder(ts::connection::CryptHandler *crypt_handler) + : crypt_handler_{crypt_handler} { + memtrack::allocated(this); +} + +PacketDecoder::~PacketDecoder() { + memtrack::freed(this); + this->reset(); +} + +void PacketDecoder::reset() { + std::lock_guard buffer_lock(this->packet_buffer_lock); + for(auto& buffer : this->_command_fragment_buffers) { + buffer.reset(); + } +} + +PacketProcessResult PacketDecoder::process_incoming_data(PacketParser &packet_parser, std::string& error) { +#ifdef FUZZING_TESTING_INCOMMING + if(rand() % 100 < 20) { + return PacketProcessResult::FUZZ_DROPPED; + } + + #ifdef FIZZING_TESTING_DISABLE_HANDSHAKE + if (this->client->state == ConnectionState::CONNECTED) { + #endif + if ((rand() % FUZZING_TESTING_DROP_MAX) < FUZZING_TESTING_DROP) { + debugMessage(this->client->getServerId(), "{}[FUZZING] Dropping incoming packet of length {}", CLIENT_STR_LOG_PREFIX_(this->client), buffer.length()); + return; + } + #ifdef FIZZING_TESTING_DISABLE_HANDSHAKE + } + #endif +#endif + auto result = this->decode_incoming_packet(error, packet_parser); + if(result != PacketProcessResult::SUCCESS) { + return result; + } + +#ifdef LOG_INCOMPING_PACKET_FRAGMENTS + debugMessage(lstream << CLIENT_LOG_PREFIX << "Recived packet. PacketId: " << packet->packetId() << " PacketType: " << packet->type().name() << " Flags: " << packet->flags() << " - " << packet->data() << endl); +#endif + auto is_command = packet_parser.type() == protocol::COMMAND || packet_parser.type() == protocol::COMMAND_LOW; + if(is_command) { + auto& fragment_buffer = this->_command_fragment_buffers[command_fragment_buffer_index(packet_parser.type())]; + CommandFragment fragment_entry{ + packet_parser.packet_id(), + packet_parser.estimated_generation(), + + packet_parser.flags(), + (uint32_t) packet_parser.payload_length(), + packet_parser.payload().own_buffer() + }; + + std::unique_lock queue_lock(fragment_buffer.buffer_lock); + + auto insert_result = fragment_buffer.insert_index2(packet_parser.full_packet_id(), std::move(fragment_entry)); + if(insert_result != 0) { + queue_lock.unlock(); + + error = "pid: " + std::to_string(packet_parser.packet_id()) + ", "; + error += "bidx: " + std::to_string(fragment_buffer.current_index()) + ", "; + error += "bcap: " + std::to_string(fragment_buffer.capacity()); + + if(insert_result == -2) { + return PacketProcessResult::DUPLICATED_PACKET; + } else if(insert_result == -1) { + this->callback_send_acknowledge(this->callback_argument, packet_parser.packet_id(), packet_parser.type() == protocol::COMMAND_LOW); + return PacketProcessResult::BUFFER_UNDERFLOW; + } else if(insert_result == 1) { + return PacketProcessResult::BUFFER_OVERFLOW; + } + + assert(false); + return PacketProcessResult::UNKNOWN_ERROR; + } + + this->callback_send_acknowledge(this->callback_argument, packet_parser.packet_id(), packet_parser.type() == protocol::COMMAND_LOW); + + ReassembledCommand* command{nullptr}; + CommandReassembleResult assemble_result; + do { + if(!queue_lock.owns_lock()) { + queue_lock.lock(); + } + + assemble_result = this->try_reassemble_ordered_packet(fragment_buffer, queue_lock, command); + + if(assemble_result == CommandReassembleResult::SUCCESS || assemble_result == CommandReassembleResult::MORE_COMMANDS_PENDING) { + this->callback_decoded_command(this->callback_argument, command); + } + + if(command) { + /* ownership hasn't transferred */ + ReassembledCommand::free(command); + command = nullptr; + } + + switch (assemble_result) { + case CommandReassembleResult::NO_COMMANDS_PENDING: + case CommandReassembleResult::SUCCESS: + case CommandReassembleResult::MORE_COMMANDS_PENDING: + break; + + case CommandReassembleResult::SEQUENCE_LENGTH_TOO_LONG: + return PacketProcessResult::COMMAND_BUFFER_OVERFLOW; + + case CommandReassembleResult::COMMAND_TOO_LARGE: + return PacketProcessResult::COMMAND_TOO_LARGE; + + case CommandReassembleResult::COMMAND_DECOMPRESS_FAILED: + return PacketProcessResult::COMMAND_DECOMPRESS_FAILED; + + default: + assert(false); + break; + } + } while(assemble_result == CommandReassembleResult::MORE_COMMANDS_PENDING); + } else { + this->callback_decoded_packet(this->callback_argument, packet_parser); + } + + return PacketProcessResult::SUCCESS; +} + +PacketProcessResult PacketDecoder::decode_incoming_packet(std::string& error, PacketParser &packet_parser) { + assert(packet_parser.type() >= 0 && packet_parser.type() < this->incoming_generation_estimators.size()); + + auto& generation_estimator = this->incoming_generation_estimators[packet_parser.type()]; + { + std::lock_guard glock{this->incoming_generation_estimator_lock}; + packet_parser.set_estimated_generation(generation_estimator.visit_packet(packet_parser.packet_id())); + } + + /* decrypt the packet if needed */ + if(packet_parser.is_encrypted()) { + CryptHandler::key_t crypt_key{}; + CryptHandler::nonce_t crypt_nonce{}; + + bool use_default_key{!this->crypt_handler_->encryption_initialized()}, decrypt_result; + + decrypt_packet: + if(use_default_key) { + crypt_key = CryptHandler::kDefaultKey; + crypt_nonce = CryptHandler::kDefaultNonce; + } else { + if(!this->crypt_handler_->generate_key_nonce(true, packet_parser.type(), packet_parser.packet_id(), packet_parser.estimated_generation(), crypt_key, crypt_nonce)) { + return PacketProcessResult::DECRYPT_KEY_GEN_FAILED; + } + } + + auto mac = packet_parser.mac(); + auto header = packet_parser.header(); + auto payload = packet_parser.payload_ptr_mut(); + decrypt_result = this->crypt_handler_->decrypt( + header.data_ptr(), header.length(), + payload, packet_parser.payload_length(), + mac.data_ptr(), + crypt_key, crypt_nonce, + error + ); + + if(!decrypt_result) { + if(packet_parser.packet_id() < 10 && packet_parser.estimated_generation() == 0) { + if(use_default_key) { + return PacketProcessResult::DECRYPT_FAILED; + } else { + use_default_key = true; + goto decrypt_packet; + } + } else { + return PacketProcessResult::DECRYPT_FAILED; + } + } + packet_parser.set_decrypted(); + } + + return PacketProcessResult::SUCCESS; +} + +bool PacketDecoder::verify_encryption_client_packet(const pipes::buffer_view &buffer) { + ClientPacketParser packet_parser{buffer}; + if(!packet_parser.valid() || !packet_parser.is_encrypted()) { + return false; + } + + assert(packet_parser.type() >= 0 && packet_parser.type() < this->incoming_generation_estimators.size()); + return this->crypt_handler_->verify_encryption(buffer, packet_parser.packet_id(), this->incoming_generation_estimators[packet_parser.type()].generation()); +} + +void PacketDecoder::register_initiv_packet() { + auto& fragment_buffer = this->_command_fragment_buffers[command_fragment_buffer_index(protocol::COMMAND)]; + std::unique_lock buffer_lock(fragment_buffer.buffer_lock); + fragment_buffer.set_full_index_to(1); /* the first packet (0) is already the clientinitiv packet */ +} + +CommandReassembleResult PacketDecoder::try_reassemble_ordered_packet( + command_fragment_buffer_t &buffer, + std::unique_lock &buffer_lock, + ReassembledCommand *&assembled_command) { + assert(buffer_lock.owns_lock()); + + if(!buffer.front_set()) { + return CommandReassembleResult::NO_COMMANDS_PENDING; + } + + uint8_t packet_flags; + + std::unique_ptr rcommand{nullptr, ReassembledCommand::free}; + + /* lets find out if we've to reassemble the packet */ + auto& first_buffer = buffer.slot_value(0); + if(first_buffer.packet_flags & PacketFlag::Fragmented) { + uint16_t sequence_length{1}; + size_t total_payload_length{first_buffer.payload_length}; + do { + if(sequence_length >= buffer.capacity()) { + return CommandReassembleResult::SEQUENCE_LENGTH_TOO_LONG; + } + + if(!buffer.slot_set(sequence_length)) { + return CommandReassembleResult::NO_COMMANDS_PENDING; /* we need more packets */ + } + + auto& packet = buffer.slot_value(sequence_length++); + total_payload_length += packet.payload_length; + if(packet.packet_flags & PacketFlag::Fragmented) { + /* yep we find the end */ + break; + } + } while(true); + + /* ok we have all fragments lets reassemble */ + /* + * Packet sequence could never be so long. If it is so then the data_length() returned an invalid value. + * We're checking it here because we dont want to make a huge allocation + */ + assert(total_payload_length < 512 * 1024 * 1024); + + rcommand.reset(ReassembledCommand::allocate(total_payload_length)); + char* packet_buffer_ptr = rcommand->command(); + size_t packet_count{0}; + + packet_flags = buffer.slot_value(0).packet_flags; + while(packet_count < sequence_length) { + auto fragment = buffer.pop_front(); + memcpy(packet_buffer_ptr, fragment.payload.data_ptr(), fragment.payload_length); + + packet_buffer_ptr += fragment.payload_length; + packet_count++; + } + + /* We don't have log functions for our TeaClient */ +#if !defined(_NDEBUG) && defined(FEATURE_LOGGING) + if((packet_buffer_ptr - 1) != &rcommand->command()[rcommand->length() - 1]) { + logCritical(0, + "Buffer over/underflow: packet_buffer_ptr != &packet_buffer[packet_buffer.length() - 1]; packet_buffer_ptr := {}; packet_buffer.end() := {}", + (void*) packet_buffer_ptr, + (void*) &rcommand->command()[rcommand->length() - 1] + ); + } +#endif + } else { + auto packet = buffer.pop_front(); + packet_flags = packet.packet_flags; + + rcommand.reset(ReassembledCommand::allocate(packet.payload_length)); + memcpy(rcommand->command(), packet.payload.data_ptr(), packet.payload_length); + } + + auto more_commands_pending = buffer.front_set(); /* set the more flag if we have more to process */ + buffer_lock.unlock(); + + if(packet_flags & PacketFlag::Compressed) { + std::string error{}; + + auto compressed_command = std::move(rcommand); + auto decompressed_size = compression::qlz_decompressed_size(compressed_command->command(), compressed_command->length()); + if(decompressed_size > 64 * 1024 * 1024) { + return CommandReassembleResult::COMMAND_TOO_LARGE; + } + + rcommand.reset(ReassembledCommand::allocate(decompressed_size)); + if(!compression::qlz_decompress_payload(compressed_command->command(), rcommand->command(), &decompressed_size)) { + return CommandReassembleResult::COMMAND_DECOMPRESS_FAILED; + } + + rcommand->set_length(decompressed_size); + } + + assembled_command = rcommand.release(); + return more_commands_pending ? CommandReassembleResult::MORE_COMMANDS_PENDING : CommandReassembleResult::SUCCESS; +} \ No newline at end of file diff --git a/src/protocol/PacketDecoder.h b/src/protocol/PacketDecoder.h new file mode 100644 index 0000000..a98647e --- /dev/null +++ b/src/protocol/PacketDecoder.h @@ -0,0 +1,94 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include "./RawCommand.h" + +namespace ts::connection { + class CryptHandler; +} + +namespace ts::stats { + class ConnectionStatistics; +} + +namespace ts::protocol { + enum struct PacketProcessResult { + SUCCESS, + UNKNOWN_ERROR, + + FUZZ_DROPPED, + + DUPLICATED_PACKET, /* error message contains debug properties */ + BUFFER_OVERFLOW, /* error message contains debug properties */ + BUFFER_UNDERFLOW, /* error message contains debug properties */ + + COMMAND_BUFFER_OVERFLOW, /* can cause a total connection drop */ + COMMAND_SEQUENCE_LENGTH_TOO_LONG, /* unrecoverable error */ + COMMAND_TOO_LARGE, + COMMAND_DECOMPRESS_FAILED, + + DECRYPT_KEY_GEN_FAILED, + DECRYPT_FAILED, /* has custom message */ + }; + + enum struct CommandReassembleResult { + SUCCESS, + + MORE_COMMANDS_PENDING, /* equal with success */ + NO_COMMANDS_PENDING, + + COMMAND_TOO_LARGE, /* this is a fatal error to the connection */ + COMMAND_DECOMPRESS_FAILED, + + SEQUENCE_LENGTH_TOO_LONG /* unrecoverable error */ + }; + + class PacketDecoder { + using CommandFragment = command::CommandFragment; + using ReassembledCommand = command::ReassembledCommand; + + typedef protocol::FullPacketRingBuffer command_fragment_buffer_t; + typedef std::array command_packet_reassembler; + public: + /* direct function calls are better optimized out */ + typedef void(*callback_decoded_packet_t)(void* /* cb argument */, const protocol::PacketParser&); + typedef void(*callback_decoded_command_t)(void* /* cb argument */, ReassembledCommand*& /* command */); /* must move the command, else it gets freed*/ + typedef void(*callback_send_acknowledge_t)(void* /* cb argument */, uint16_t /* packet id */, bool /* is command low */); + + explicit PacketDecoder(connection::CryptHandler* /* crypt handler */); + ~PacketDecoder(); + + void reset(); + + bool verify_encryption_client_packet(const pipes::buffer_view& /* full packet */); + + /* true if commands might be pending */ + PacketProcessResult process_incoming_data(protocol::PacketParser &/* packet */, std::string& /* error detail */); + void register_initiv_packet(); + + void* callback_argument{nullptr}; + callback_decoded_packet_t callback_decoded_packet{[](auto, auto&){}}; /* needs to be valid all the time! */ + callback_decoded_command_t callback_decoded_command{[](auto, auto&){}}; /* needs to be valid all the time! */ + callback_send_acknowledge_t callback_send_acknowledge{[](auto, auto, auto){}}; /* needs to be valid all the time! */ + private: + connection::CryptHandler* crypt_handler_{nullptr}; + + spin_mutex incoming_generation_estimator_lock{}; + std::array incoming_generation_estimators{}; /* implementation is thread save */ + + std::recursive_mutex packet_buffer_lock; + command_packet_reassembler _command_fragment_buffers; + + static inline uint8_t command_fragment_buffer_index(uint8_t packet_index) { + return packet_index & 0x1U; /* use 0 for command and 1 for command low */ + } + + PacketProcessResult decode_incoming_packet(std::string &error /* error */, protocol::PacketParser &packet_parser/* packet */); + CommandReassembleResult try_reassemble_ordered_packet(command_fragment_buffer_t& /* buffer */, std::unique_lock& /* buffer lock */, ReassembledCommand*& /* command */); + }; +} diff --git a/src/protocol/PacketStatistics.cpp b/src/protocol/PacketStatistics.cpp new file mode 100644 index 0000000..a5267f8 --- /dev/null +++ b/src/protocol/PacketStatistics.cpp @@ -0,0 +1,124 @@ +// +// Created by WolverinDEV on 06/04/2020. +// + +#include +#include "./PacketStatistics.h" + +using namespace ts::protocol; + +void PacketStatistics::received_packet(ts::protocol::PacketType type, uint32_t pid) { + std::lock_guard lock{this->data_mutex}; + switch (type) { + case protocol::PacketType::VOICE: + this->calculator_voice.packet_received(pid); + return; + case protocol::PacketType::VOICE_WHISPER: + this->calculator_voice_whisper.packet_received(pid); + return; + + case protocol::PacketType::COMMAND: + case protocol::PacketType::COMMAND_LOW: + return; + + case protocol::PacketType::ACK: + this->calculator_ack.packet_received(pid); + return; + case protocol::PacketType::ACK_LOW: + this->calculator_ack_low.packet_received(pid); + return; + case protocol::PacketType::PING: + this->calculator_ping.packet_received(pid); + return; + + default: + /* some invalid packet lul */ + return; + } +} + +void PacketStatistics::send_command(ts::protocol::PacketType type, uint32_t pid) { + std::lock_guard lock{this->data_mutex}; + if(type == protocol::PacketType::COMMAND) + this->calculator_command.packet_send(pid); + else if(type == protocol::PacketType::COMMAND_LOW) + this->calculator_command_low.packet_send(pid); +} + +void PacketStatistics::received_acknowledge(ts::protocol::PacketType type, uint32_t pid) { + std::lock_guard lock{this->data_mutex}; + if(type == protocol::PacketType::ACK) + this->calculator_command.ack_received(pid); + else if(type == protocol::PacketType::ACK_LOW) + this->calculator_command_low.ack_received(pid); +} + +PacketStatistics::PacketLossReport PacketStatistics::loss_report() const { + PacketStatistics::PacketLossReport result{}; + + result.received_voice = this->calculator_voice.received_packets() + this->calculator_voice_whisper.received_packets(); + result.lost_voice = this->calculator_voice.lost_packets() + this->calculator_voice_whisper.lost_packets(); + + result.received_keep_alive = this->calculator_ping.received_packets(); + result.lost_keep_alive = this->calculator_ping.lost_packets(); + + result.received_control = this->calculator_command.received_packets() + this->calculator_command_low.received_packets(); + result.lost_control = this->calculator_command.lost_packets() + this->calculator_command_low.lost_packets(); + //result.lost_control -= this->calculator_ack.lost_packets() + this->calculator_ack_low.lost_packets(); /* subtract the lost acks (command received but ack got lost) */ + + result.received_control += this->calculator_ack.received_packets() + this->calculator_ack_low.received_packets(); + //result.lost_control += this->calculator_ack.lost_packets() + this->calculator_ack_low.lost_packets(); /* this cancels out the line above */ + return result; +} + +void PacketStatistics::tick() { + auto now = std::chrono::system_clock::now(); + if(now + std::chrono::seconds{15} > this->last_short) { + this->last_short = now; + + std::lock_guard lock{this->data_mutex}; + this->calculator_command.short_stats(); + this->calculator_command_low.short_stats(); + + this->calculator_ack.short_stats(); + this->calculator_ack_low.short_stats(); + + this->calculator_voice.short_stats(); + this->calculator_voice_whisper.short_stats(); + + this->calculator_ping.short_stats(); + } +} + +void PacketStatistics::reset() { + std::lock_guard lock{this->data_mutex}; + this->calculator_command.reset(); + this->calculator_command_low.reset(); + + this->calculator_ack.reset(); + this->calculator_ack_low.reset(); + + this->calculator_voice.reset(); + this->calculator_voice_whisper.reset(); + + this->calculator_ping.reset(); +} + +void PacketStatistics::reset_offsets() { + std::lock_guard lock{this->data_mutex}; + this->calculator_command.reset_offsets(); + this->calculator_command_low.reset_offsets(); + + this->calculator_ack.reset_offsets(); + this->calculator_ack_low.reset_offsets(); + + this->calculator_voice.reset_offsets(); + this->calculator_voice_whisper.reset_offsets(); + + this->calculator_ping.reset_offsets(); +} + +float PacketStatistics::current_packet_loss() const { + auto report = this->loss_report(); + return report.total_loss(); +} \ No newline at end of file diff --git a/src/protocol/PacketStatistics.h b/src/protocol/PacketStatistics.h new file mode 100644 index 0000000..26449fc --- /dev/null +++ b/src/protocol/PacketStatistics.h @@ -0,0 +1,68 @@ +#pragma once + +#include "../misc/spin_mutex.h" +#include "./PacketLossCalculator.h" +#include "./Packet.h" + +namespace ts::protocol { + class PacketStatistics { + public: + struct PacketLossReport { + uint32_t lost_voice{0}; + uint32_t lost_control{0}; + uint32_t lost_keep_alive{0}; + + uint32_t received_voice{0}; + uint32_t received_control{0}; + uint32_t received_keep_alive{0}; + + [[nodiscard]] inline float voice_loss() const { + const auto total_packets = this->received_voice + this->lost_voice; + if(total_packets == 0) return 0; + return this->lost_voice / (float) total_packets; + } + [[nodiscard]] inline float control_loss() const { + const auto total_packets = this->received_control + this->lost_control; + //if(total_packets == 0) return 0; /* not possible so remove this to speed it up */ + return this->lost_control / (float) total_packets; + } + [[nodiscard]] inline float keep_alive_loss() const { + const auto total_packets = this->received_keep_alive + this->lost_keep_alive; + if(total_packets == 0) return 0; + return this->lost_keep_alive / (float) total_packets; + } + + [[nodiscard]] inline float total_loss() const { + const auto total_lost = this->lost_voice + this->lost_control + this->lost_keep_alive; + const auto total_received = this->received_control + this->received_voice + this->received_keep_alive; + //if(total_received + total_lost == 0) return 0; /* not possible to speed this up */ + return total_lost / (float) (total_lost + total_received); + } + }; + + [[nodiscard]] PacketLossReport loss_report() const; + [[nodiscard]] float current_packet_loss() const; + + void send_command(protocol::PacketType /* type */, uint32_t /* packet id */); + void received_acknowledge(protocol::PacketType /* type */, uint32_t /* packet id */); + + void received_packet(protocol::PacketType /* type */, uint32_t /* packet id */); + void tick(); + void reset(); + void reset_offsets(); + private: + std::chrono::system_clock::time_point last_short{}; + + spin_mutex data_mutex{}; + protocol::UnorderedPacketLossCalculator calculator_voice_whisper{}; + protocol::UnorderedPacketLossCalculator calculator_voice{}; + + protocol::UnorderedPacketLossCalculator calculator_ack_low{}; + protocol::UnorderedPacketLossCalculator calculator_ack{}; + + protocol::UnorderedPacketLossCalculator calculator_ping{}; + + protocol::CommandPacketLossCalculator calculator_command{}; + protocol::CommandPacketLossCalculator calculator_command_low{}; + }; +} \ No newline at end of file diff --git a/src/protocol/PingHandler.cpp b/src/protocol/PingHandler.cpp new file mode 100644 index 0000000..4297f6f --- /dev/null +++ b/src/protocol/PingHandler.cpp @@ -0,0 +1,76 @@ +// +// Created by WolverinDEV on 11/03/2020. +// + +#include "./PingHandler.h" + +using namespace ts::server::server::udp; + +void PingHandler::reset() { + this->last_ping_id = 0; + this->current_ping_ = std::chrono::milliseconds{0}; + + this->last_recovery_command_send = std::chrono::system_clock::time_point{}; + this->last_command_acknowledge_ = std::chrono::system_clock::time_point{}; + + this->last_response_ = std::chrono::system_clock::time_point{}; + this->last_request_ = std::chrono::system_clock::time_point{}; +} + +void PingHandler::received_pong(uint16_t ping_id) { + if(this->last_ping_id != ping_id) return; + + auto now = std::chrono::system_clock::now(); + this->current_ping_ = std::chrono::floor(now - this->last_request_); + + this->last_response_ = now; + this->last_command_acknowledge_ = now; /* That's here for purpose!*/ +} + +void PingHandler::received_command_acknowledged() { + this->last_command_acknowledge_ = std::chrono::system_clock::now(); +} + +void PingHandler::tick(const std::chrono::system_clock::time_point& now) { + if(this->last_request_ + PingHandler::kPingRequestInterval < now) { + this->send_ping_request(); /* may update last_response_ */ + } + + if(this->last_response_ + PingHandler::kPingTimeout < now) { + if(this->last_recovery_command_send + PingHandler::kRecoveryRequestInterval < now) { + this->send_recovery_request(); + } + + if(this->last_command_acknowledge_ + PingHandler::kRecoveryTimeout < now) { + if(auto callback{this->callback_time_outed}; callback) { + callback(this->callback_argument); + } + } + } +} + +void PingHandler::send_ping_request() { + auto now = std::chrono::system_clock::now(); + if(this->last_response_.time_since_epoch().count() == 0) { + this->last_response_ = now; + } + + this->last_request_ = now; + + if(auto callback{this->callback_send_ping}; callback) { + callback(this->callback_argument, this->last_ping_id); + } +} + +void PingHandler::send_recovery_request() { + auto now = std::chrono::system_clock::now(); + if(this->last_command_acknowledge_.time_since_epoch().count() == 0) { + this->last_command_acknowledge_ = now; + } + + this->last_recovery_command_send = now; + + if(auto callback{this->callback_send_recovery_command}; callback) { + callback(this->callback_argument); + } +} diff --git a/src/protocol/PingHandler.h b/src/protocol/PingHandler.h new file mode 100644 index 0000000..b4187a7 --- /dev/null +++ b/src/protocol/PingHandler.h @@ -0,0 +1,46 @@ +#pragma once + +#include +#include + +namespace ts::server::server::udp { + class PingHandler { + public: + typedef void(*callback_time_outed_t)(void* /* cb data */); + typedef void(*callback_send_ping_t)(void* /* cb data */, uint16_t& /* ping id */); + typedef void(*callback_send_recovery_command_t)(void* /* cb data */); + + void reset(); + + void tick(const std::chrono::system_clock::time_point&); + void received_pong(uint16_t /* ping id */); + void received_command_acknowledged(); + + [[nodiscard]] inline std::chrono::milliseconds current_ping() const { return this->current_ping_; } + [[nodiscard]] inline std::chrono::system_clock::time_point last_ping_response() const { return this->last_response_; } + [[nodiscard]] inline std::chrono::system_clock::time_point last_command_acknowledged() const { return this->last_command_acknowledge_; } + + void* callback_argument{nullptr}; + callback_send_ping_t callback_send_ping{nullptr}; + callback_send_recovery_command_t callback_send_recovery_command{nullptr}; + callback_time_outed_t callback_time_outed{nullptr}; + private: + constexpr static std::chrono::milliseconds kPingRequestInterval{1000}; + constexpr static std::chrono::milliseconds kPingTimeout{15 * 1000}; + + constexpr static std::chrono::milliseconds kRecoveryRequestInterval{1000}; + constexpr static std::chrono::milliseconds kRecoveryTimeout{15 * 1000}; + + std::chrono::milliseconds current_ping_{0}; + + uint16_t last_ping_id{0}; + std::chrono::system_clock::time_point last_response_{}; + std::chrono::system_clock::time_point last_request_{}; + + std::chrono::system_clock::time_point last_command_acknowledge_{}; + std::chrono::system_clock::time_point last_recovery_command_send{}; + + void send_ping_request(); + void send_recovery_request(); + }; +} diff --git a/src/protocol/RawCommand.cpp b/src/protocol/RawCommand.cpp new file mode 100644 index 0000000..f692ba5 --- /dev/null +++ b/src/protocol/RawCommand.cpp @@ -0,0 +1,19 @@ +// +// Created by WolverinDEV on 28/01/2021. +// + +#include "./RawCommand.h" + +using namespace ts::command; + +ReassembledCommand *ReassembledCommand::allocate(size_t size) { + auto instance = (ReassembledCommand*) malloc(sizeof(ReassembledCommand) + size); + instance->length_ = size; + instance->capacity_ = size; + instance->next_command = nullptr; + return instance; +} + +void ReassembledCommand::free(ReassembledCommand *command) { + ::free(command); +} \ No newline at end of file diff --git a/src/protocol/RawCommand.h b/src/protocol/RawCommand.h new file mode 100644 index 0000000..0e0c421 --- /dev/null +++ b/src/protocol/RawCommand.h @@ -0,0 +1,48 @@ +#pragma once + +#include +#include +#include + +namespace ts::command { + struct CommandFragment { + uint16_t packet_id{0}; + uint16_t packet_generation{0}; + + uint8_t packet_flags{0}; + uint32_t payload_length : 24; + pipes::buffer payload{}; + + CommandFragment() : payload_length{0} { } + CommandFragment(uint16_t packetId, uint16_t packetGeneration, uint8_t packetFlags, uint32_t payloadLength, pipes::buffer payload) + : packet_id{packetId}, packet_generation{packetGeneration}, packet_flags{packetFlags}, payload_length{payloadLength}, payload{std::move(payload)} {} + + CommandFragment& operator=(const CommandFragment&) = default; + CommandFragment(const CommandFragment& other) = default; + CommandFragment(CommandFragment&&) = default; + }; + static_assert(sizeof(CommandFragment) == 8 + sizeof(pipes::buffer)); + + struct ReassembledCommand { + public: + static ReassembledCommand* allocate(size_t /* command length */); + static void free(ReassembledCommand* /* command */); + + [[nodiscard]] inline size_t length() const { return this->length_; } + inline void set_length(size_t length) { assert(this->capacity_ >= length); this->length_ = length; } + + [[nodiscard]] inline size_t capacity() const { return this->capacity_; } + + [[nodiscard]] inline const char* command() const { return (const char*) this + sizeof(ReassembledCommand); } + [[nodiscard]] inline char* command() { return (char*) this + sizeof(ReassembledCommand); } + + [[nodiscard]] inline std::string_view command_view() const { return std::string_view{this->command(), this->length()}; } + + mutable ReassembledCommand* next_command; /* nullptr by default */ + private: + explicit ReassembledCommand() = default; + + size_t capacity_; + size_t length_; + }; +} \ No newline at end of file