diff --git a/server/CMakeLists.txt b/server/CMakeLists.txt index a330e99..405185c 100644 --- a/server/CMakeLists.txt +++ b/server/CMakeLists.txt @@ -49,7 +49,6 @@ set(SERVER_SOURCE_FILES src/client/voice/VoiceClientHandschake.cpp src/client/voice/VoiceClientCommandHandler.cpp src/client/voice/VoiceClientPacketHandler.cpp - src/client/voice/VoiceClientView.cpp src/TS3ServerClientManager.cpp src/VirtualServer.cpp src/TS3ServerHeartbeat.cpp @@ -161,7 +160,7 @@ if (COMPILE_WEB_CLIENT) src/client/web/WSWebClient.cpp src/client/web/SampleHandler.cpp src/client/web/VoiceBridge.cpp - src/client/command_handler/helpers.h src/music/PlaylistPermissions.cpp src/music/PlaylistPermissions.h src/lincense/LicenseService.cpp src/lincense/LicenseService.h src/groups/GroupManager.cpp src/groups/GroupManager.h src/groups/GroupAssignmentManager.cpp src/groups/GroupAssignmentManager.h src/groups/Group.cpp src/groups/Group.h src/services/VirtualServerInformation.cpp src/services/VirtualServerInformation.h src/vserver/VirtualServerManager.cpp src/vserver/VirtualServerManager.h src/services/VirtualServerBroadcastService.cpp src/services/VirtualServerBroadcastService.h src/server/udp-server/UDPServer.cpp src/server/udp-server/UDPServer.h src/client/voice/PacketEncoder.cpp src/client/voice/PacketEncoder.h src/client/voice/PacketDecoder.cpp src/client/voice/PacketDecoder.h) + src/client/command_handler/helpers.h src/music/PlaylistPermissions.cpp src/music/PlaylistPermissions.h src/lincense/LicenseService.cpp src/lincense/LicenseService.h src/groups/GroupManager.cpp src/groups/GroupManager.h src/groups/GroupAssignmentManager.cpp src/groups/GroupAssignmentManager.h src/groups/Group.cpp src/groups/Group.h src/services/VirtualServerInformation.cpp src/services/VirtualServerInformation.h src/vserver/VirtualServerManager.cpp src/vserver/VirtualServerManager.h src/services/VirtualServerBroadcastService.cpp src/services/VirtualServerBroadcastService.h src/server/udp-server/UDPServer.cpp src/server/udp-server/UDPServer.h src/client/voice/PacketEncoder.cpp src/client/voice/PacketEncoder.h src/client/voice/PacketDecoder.cpp src/client/voice/PacketDecoder.h src/client/voice/PingHandler.cpp src/client/voice/PingHandler.h) endif () add_executable(PermHelper helpers/permgen.cpp) diff --git a/server/src/client/voice/PacketDecoder.cpp b/server/src/client/voice/PacketDecoder.cpp index b445346..71e7b51 100644 --- a/server/src/client/voice/PacketDecoder.cpp +++ b/server/src/client/voice/PacketDecoder.cpp @@ -12,4 +12,285 @@ #include "../../ConnectionStatistics.h" using namespace ts; -using namespace ts::server::server::udp; \ No newline at end of file +using namespace ts::server::server::udp; + +PacketDecoder::PacketDecoder(ts::connection::CryptHandler *crypt_handler) + : crypt_handler_{crypt_handler} { + memtrack::allocated(this); +} + +PacketDecoder::~PacketDecoder() { + memtrack::freed(this); + this->reset(); +} + +void PacketDecoder::reset() { + std::lock_guard buffer_lock(this->packet_buffer_lock); + for(auto& buffer : this->_command_fragment_buffers) + buffer.reset(); +} + +bool PacketDecoder::decode_incoming_data(const pipes::buffer_view &buffer) { + std::string error{}; + bool needs_command_reassemble{false}; + + auto result = this->decode_incoming_data_(error, needs_command_reassemble, buffer); + if(result != PacketDecodeResult::SUCCESS) + if(auto callback{this->callback_decode_failed}; callback) + callback(this->callback_argument, result, error); + + return needs_command_reassemble; +} + +PacketDecodeResult PacketDecoder::decode_incoming_data_(std::string& error, bool& commands_pending, const pipes::buffer_view &buffer) { +#ifdef FUZZING_TESTING_INCOMMING + #ifdef FIZZING_TESTING_DISABLE_HANDSHAKE + if (this->client->state == ConnectionState::CONNECTED) { + #endif + if ((rand() % FUZZING_TESTING_DROP_MAX) < FUZZING_TESTING_DROP) { + debugMessage(this->client->getServerId(), "{}[FUZZING] Dropping incoming packet of length {}", CLIENT_STR_LOG_PREFIX_(this->client), buffer.length()); + return; + } + #ifdef FIZZING_TESTING_DISABLE_HANDSHAKE + } + #endif +#endif + + ClientPacketParser packet_parser{buffer}; + if(!packet_parser.valid()) + return PacketDecodeResult::INVALID_PACKET; + + assert(packet_parser.type() >= 0 && packet_parser.type() < this->incoming_generation_estimators.size()); + packet_parser.set_estimated_generation(this->incoming_generation_estimators[packet_parser.type()].visit_packet(packet_parser.packet_id())); + + auto is_command = packet_parser.type() == protocol::COMMAND || packet_parser.type() == protocol::COMMAND_LOW; + /* pretest if the packet is worth the effort of decoding it */ + if(is_command) { + /* handle the order stuff */ + auto& fragment_buffer = this->_command_fragment_buffers[PacketDecoder::command_fragment_buffer_index(packet_parser.type())]; + + unique_lock queue_lock(fragment_buffer.buffer_lock); + auto result = fragment_buffer.accept_index(packet_parser.packet_id()); + if(result != 0) { /* packet index is ahead buffer index */ + error = "pid: " + std::to_string(packet_parser.packet_id()) + ","; + error += "bidx: " + std::to_string(fragment_buffer.current_index()) + ","; + error += "bcap: " + std::to_string(fragment_buffer.capacity()); + + if(result == -1) { /* underflow */ + /* we've already got the packet, but the client dosn't know that so we've to send the acknowledge again */ + this->callback_send_acknowledge(packet_parser.packet_id(), packet_parser.type() == protocol::COMMAND_LOW); + return PacketDecodeResult::DUPLICATED_PACKET; + } + + return PacketDecodeResult::BUFFER_OVERFLOW; + } + } + + //NOTICE I found out that the Compressed flag is set if the packet contains an audio header + /* decrypt the packet if needed */ + if(packet_parser.is_encrypted()) { + CryptHandler::key_t crypt_key{}; + CryptHandler::nonce_t crypt_nonce{}; + + auto data = (uint8_t*) packet_parser.mutable_data_ptr(); + bool use_default_key{!this->protocol_encrypted}, decrypt_result; + + decrypt_packet: + if(use_default_key) { + crypt_key = CryptHandler::default_key; + crypt_nonce = CryptHandler::default_nonce; + } else { + if(!this->crypt_handler_->generate_key_nonce(true, packet_parser.type(), packet_parser.packet_id(), packet_parser.estimated_generation(), crypt_key, crypt_nonce)) + return PacketDecodeResult::DECRYPT_KEY_GEN_FAILED; + } + + decrypt_result = this->crypt_handler_->decrypt( + data + ClientPacketParser::kHeaderOffset, ClientPacketParser::kHeaderLength, + data + ClientPacketParser::kPayloadOffset, packet_parser.payload_length(), + data, + crypt_key, crypt_nonce, + error + ); + + if(!decrypt_result) { + if(packet_parser.packet_id() < 10 && packet_parser.estimated_generation() == 0) { + if(use_default_key) { + return PacketDecodeResult::DECRYPT_FAILED; + } else { + use_default_key = true; + goto decrypt_packet; + } + } else { + return PacketDecodeResult::DECRYPT_FAILED; + } + } + packet_parser.set_decrypted(); + } + + if(auto statistics{this->statistics_}; statistics) + statistics->logIncomingPacket(stats::ConnectionStatistics::category::from_type(packet_parser.type()), buffer.length()); + +#ifdef LOG_INCOMPING_PACKET_FRAGMENTS + debugMessage(lstream << CLIENT_LOG_PREFIX << "Recived packet. PacketId: " << packet->packetId() << " PacketType: " << packet->type().name() << " Flags: " << packet->flags() << " - " << packet->data() << endl); +#endif + if(is_command) { + auto& fragment_buffer = this->_command_fragment_buffers[command_fragment_buffer_index(packet_parser.type())]; + CommandFragment fragment_entry{ + packet_parser.packet_id(), + packet_parser.estimated_generation(), + + packet_parser.flags(), + (uint32_t) packet_parser.payload_length(), + packet_parser.payload().own_buffer() + }; + + { + unique_lock queue_lock(fragment_buffer.buffer_lock); + + if(!fragment_buffer.insert_index(packet_parser.packet_id(), std::move(fragment_entry))) + return PacketDecodeResult::COMMAND_INSTERT_FAILED; + } + + this->callback_send_acknowledge(this->callback_argument, packet_parser.packet_id(), packet_parser.type() == protocol::COMMAND_LOW); + commands_pending = true; + } else { + this->callback_decoded_packet(this->callback_argument, packet_parser); + } +} + +bool PacketDecoder::verify_encryption(const pipes::buffer_view &buffer) { + ClientPacketParser packet_parser{buffer}; + if(!packet_parser.valid() || !packet_parser.is_encrypted()) return false; + + assert(packet_parser.type() >= 0 && packet_parser.type() < this->incoming_generation_estimators.size()); + return this->crypt_handler_->verify_encryption(buffer, packet_parser.packet_id(), this->incoming_generation_estimators[packet_parser.type()].generation()); +} + +CommandReassembleResult PacketDecoder::reassemble_command(pipes::buffer &result, bool &is_command_low) { + bool more_commands_pending{false}; + command_fragment_buffer_t* buffer{nullptr}; + unique_lock buffer_lock; /* general buffer lock */ + + { + //FIXME: Currently command low packets cant be handled if there is a command packet stuck in reassemble queue + + /* handle commands before command low packets */ + for(auto& buf : this->_command_fragment_buffers) { + unique_lock ring_lock(buf.buffer_lock, try_to_lock); + if(!ring_lock.owns_lock()) continue; + + if(buf.front_set()) { + if(!buffer) { /* lets still test for reexecute */ + buffer_lock = move(ring_lock); + buffer = &buf; + } else { + more_commands_pending = true; + break; + } + } + } + } + + if(!buffer) + return CommandReassembleResult::NO_COMMANDS_PENDING; + + uint8_t packet_flags{0}; + pipes::buffer payload{}; + + /* lets find out if we've to reassemble the packet */ + auto& first_buffer = buffer->slot_value(0); + if(first_buffer.packet_flags & PacketFlag::Fragmented) { + uint16_t sequence_length{1}; + size_t total_payload_length{first_buffer.payload_length}; + do { + if(sequence_length >= buffer->capacity()) + return CommandReassembleResult::SEQUENCE_LENGTH_TOO_LONG; + + if(!buffer->slot_set(sequence_length)) + return CommandReassembleResult::NO_COMMANDS_PENDING; /* we need more packets */ + + auto& packet = buffer->slot_value(sequence_length++); + total_payload_length += packet.payload_length; + if(packet.packet_flags & PacketFlag::Fragmented) { + /* yep we find the end */ + break; + } + } while(true); + /* ok we have all fragments lets reassemble */ + + /* + * Packet sequence could never be so long. If it is so then the data_length() returned an invalid value. + * We're checking it here because we dont want to make a huge allocation + */ + assert(total_payload_length < 512 * 1024 * 1024); + + pipes::buffer packet_buffer{total_payload_length}; + char* packet_buffer_ptr = &packet_buffer[0]; + size_t packet_count{0}; + + packet_flags = buffer->slot_value(0).packet_flags; + while(packet_count < sequence_length) { + auto fragment = buffer->pop_front(); + memcpy(packet_buffer_ptr, fragment.payload.data_ptr(), fragment.payload_length); + + packet_buffer_ptr += fragment.payload_length; + packet_count++; + } + +#ifndef _NDEBUG + if((packet_buffer_ptr - 1) != &packet_buffer[packet_buffer.length() - 1]) { + logCritical(0, + "Buffer over/underflow: packet_buffer_ptr != &packet_buffer[packet_buffer.length() - 1]; packet_buffer_ptr := {}; packet_buffer.end() := {}", + (void*) packet_buffer_ptr, + (void*) &packet_buffer[packet_buffer.length() - 1] + ); + } +#endif + payload = packet_buffer; + } else { + auto packet = buffer->pop_front(); + packet_flags = packet.packet_flags; + payload = packet.payload; + } + + more_commands_pending |= buffer->front_set(); /* set the more flag if we have more to process */ + buffer_lock.unlock(); + + if(packet_flags & PacketFlag::Compressed) { + std::string error{}; + + auto decompressed_size = compression::qlz_decompressed_size(payload.data_ptr(), payload.length()); + auto uncompressed_buffer = buffer::allocate_buffer(decompressed_size); + if(!compression::qlz_decompress_payload(payload.data_ptr(), uncompressed_buffer.data_ptr(), &decompressed_size)) + return CommandReassembleResult::COMMAND_DECOMPRESS_FAILED; + + payload = uncompressed_buffer.range(0, decompressed_size); + } + + result = std::move(payload); + return more_commands_pending ? CommandReassembleResult::MORE_COMMANDS_PENDING : CommandReassembleResult::SUCCESS; +} + +void PacketDecoder::force_insert_command(const pipes::buffer_view &buffer) { + CommandFragment fragment_entry{ + 0, + 0, + + PacketFlag::Unencrypted, + (uint32_t) buffer.length(), + buffer.own_buffer() + }; + + + { + auto& fragment_buffer = this->_command_fragment_buffers[command_fragment_buffer_index(protocol::COMMAND)]; + unique_lock queue_lock(fragment_buffer.buffer_lock); + fragment_buffer.push_front(std::move(fragment_entry)); + } +} + +void PacketDecoder::register_initiv_packet() { + auto& fragment_buffer = this->_command_fragment_buffers[command_fragment_buffer_index(protocol::COMMAND)]; + unique_lock buffer_lock(fragment_buffer.buffer_lock); + fragment_buffer.set_full_index_to(1); /* the first packet (0) is already the clientinitiv packet */ +} \ No newline at end of file diff --git a/server/src/client/voice/PacketDecoder.h b/server/src/client/voice/PacketDecoder.h index e48064f..2fbab17 100644 --- a/server/src/client/voice/PacketDecoder.h +++ b/server/src/client/voice/PacketDecoder.h @@ -36,18 +36,59 @@ namespace ts::server::server::udp { }; static_assert(sizeof(CommandFragment) == 8 + sizeof(pipes::buffer)); + enum struct PacketDecodeResult { + SUCCESS, + + INVALID_PACKET, + DUPLICATED_PACKET, /* error message contains debug properties */ + + BUFFER_OVERFLOW, /* error message contains debug properties */ + + DECRYPT_KEY_GEN_FAILED, + DECRYPT_FAILED, /* has custom message */ + + COMMAND_INSTERT_FAILED + }; + + enum struct CommandReassembleResult { + SUCCESS, + + MORE_COMMANDS_PENDING, /* equal with success */ + NO_COMMANDS_PENDING, + + COMMAND_TOO_LARGE, /* this is a fatal error to the connection */ + COMMAND_DECOMPRESS_FAILED, + + SEQUENCE_LENGTH_TOO_LONG /* unrecoverable error */ + }; + class PacketDecoder { typedef protocol::PacketRingBuffer command_fragment_buffer_t; typedef std::array command_packet_reassembler; public: - typedef std::function callback_decoded_packet_t; + //typedef std::function callback_decoded_packet_t; + //typedef std::function callback_send_acknowledge_t; + //typedef std::function callback_decode_failed_t; + /* gets better optimized out */ + typedef void(*callback_decoded_packet_t)(void* /* cb argument */, const protocol::ClientPacketParser&); + typedef void(*callback_send_acknowledge_t)(void* /* cb argument */, uint16_t /* packet id */, bool /* is command low */); + typedef void(*callback_decode_failed_t)(void* /* cb argument */, PacketDecodeResult /* error */, const std::string& /* custom message */); - PacketDecoder(connection::CryptHandler* /* crypt handler */, connection::CompressionHandler* /* compress handler */, connection::AcknowledgeManager* /* acknowledge handler */); + explicit PacketDecoder(connection::CryptHandler* /* crypt handler */); ~PacketDecoder(); void reset(); - void decode_incoming_data(const pipes::buffer_view &/* buffer */); + bool verify_encryption(const pipes::buffer_view& /* full packet */); + + /* true if commands might be pending */ + bool decode_incoming_data(const pipes::buffer_view &/* buffer */); + + /* This method is not thread save! Only one concurrent call supported */ + CommandReassembleResult reassemble_command(pipes::buffer& /* result */, bool& /* is command low */); + void force_insert_command(const pipes::buffer_view& /* payload */); + + void register_initiv_packet(); [[nodiscard]] inline std::shared_ptr get_statistics() { return this->statistics_; } inline void set_statistics(const std::shared_ptr& stats) { this->statistics_ = stats; } @@ -55,18 +96,25 @@ namespace ts::server::server::udp { [[nodiscard]] inline bool is_protocol_encrypted() const { return this->protocol_encrypted; } void set_protocol_encrypted(bool flag) { this->protocol_encrypted = flag; } - callback_decoded_packet_t callback_decoded_packet{}; + void* callback_argument{nullptr}; + callback_decoded_packet_t callback_decoded_packet{[](auto, auto&){}}; /* needs to be valid all the time! */ + callback_send_acknowledge_t callback_send_acknowledge{[](auto, auto, auto){}}; /* needs to be valid all the time! */ + callback_decode_failed_t callback_decode_failed{nullptr}; private: bool protocol_encrypted{false}; std::shared_ptr statistics_{nullptr}; connection::CryptHandler* crypt_handler_{nullptr}; - connection::CompressionHandler* compress_handler_{nullptr}; - connection::AcknowledgeManager* acknowledge_handler_{nullptr}; std::array incoming_generation_estimators{}; /* implementation is thread save */ std::recursive_mutex packet_buffer_lock; command_packet_reassembler _command_fragment_buffers; + + static inline uint8_t command_fragment_buffer_index(uint8_t packet_index) { + return packet_index & 0x1U; /* use 0 for command and 1 for command low */ + } + + PacketDecodeResult decode_incoming_data_(std::string& /* error */, bool& /* needs command reassemble */, const pipes::buffer_view &/* buffer */); }; } \ No newline at end of file diff --git a/server/src/client/voice/PacketEncoder.cpp b/server/src/client/voice/PacketEncoder.cpp index 1418880..adf2357 100644 --- a/server/src/client/voice/PacketEncoder.cpp +++ b/server/src/client/voice/PacketEncoder.cpp @@ -57,13 +57,13 @@ bool PacketEncoder::encode_packet(const std::shared_ptr auto encode_result = this->encode_packet_(error, buffers, packet, work_lock); if(encode_result != PacketEncodeResult::SUCCESS) { if(auto callback{this->callback_encode_failed}; callback) - callback(original_packet, encode_result, error); + callback(this->callback_argument, original_packet, encode_result, error); goto sync_cleanup_exit; } } if(auto callback{this->callback_encoded_buffers}; callback) - callback(buffers); + callback(this->callback_argument, buffers); sync_cleanup_exit: this->process_count--; /* we're now done preparing */ @@ -110,7 +110,7 @@ bool PacketEncoder::do_encode() { if(auto errc = this->encode_packet_(error, buffers, packet, work_lock); errc != PacketEncodeResult::SUCCESS) { if(auto callback{this->callback_encode_failed}; callback) - callback(packet, errc, error); + callback(this->callback_argument, packet, errc, error); if(flag_more) break; else @@ -126,7 +126,7 @@ bool PacketEncoder::do_encode() { /* enqueue buffers for write */ if(!buffers.empty()) { if(auto callback{this->callback_encoded_buffers}; callback) - callback(buffers); + callback(this->callback_argument, buffers); } this->process_count--; /* we're now done preparing */ diff --git a/server/src/client/voice/PacketEncoder.h b/server/src/client/voice/PacketEncoder.h index a5d75c4..ca542a4 100644 --- a/server/src/client/voice/PacketEncoder.h +++ b/server/src/client/voice/PacketEncoder.h @@ -68,8 +68,11 @@ namespace ts::server::server::udp { sync = 0x02 /* directly process the packet */ }; - typedef std::function& /* buffers */)> callback_encoded_buffers_t; - typedef std::function &/* the packet */, PacketEncodeResult& /* error */, std::string& /* custom message */)> callback_encode_failed_t; + //typedef std::function& /* buffers */)> callback_encoded_buffers_t; + //typedef std::function &/* the packet */, PacketEncodeResult /* error */, std::string& /* custom message */)> callback_encode_failed_t; + /* gets better optimized out */ + typedef void(*callback_encoded_buffers_t)(void* /* cb argument */, const std::vector& /* buffers */); + typedef void(*callback_encode_failed_t)(void* /* cb argument */, const std::shared_ptr &/* the packet */, PacketEncodeResult /* error */, const std::string& /* custom message */); PacketEncoder(connection::CryptHandler* /* crypt handler */, connection::CompressionHandler* /* compress handler */, connection::AcknowledgeManager* /* acknowledge handler */); ~PacketEncoder(); @@ -86,6 +89,7 @@ namespace ts::server::server::udp { [[nodiscard]] inline bool is_protocol_encrypted() const { return this->protocol_encrypted; } void set_protocol_encrypted(bool flag) { this->protocol_encrypted = flag; } + void* callback_argument{nullptr}; callback_encoded_buffers_t callback_encoded_buffers{}; callback_encode_failed_t callback_encode_failed{}; private: diff --git a/server/src/client/voice/PingHandler.cpp b/server/src/client/voice/PingHandler.cpp new file mode 100644 index 0000000..a41e692 --- /dev/null +++ b/server/src/client/voice/PingHandler.cpp @@ -0,0 +1,7 @@ +// +// Created by WolverinDEV on 11/03/2020. +// + +#include "PingHandler.h" + +using namespace ts::server::server::udp; \ No newline at end of file diff --git a/server/src/client/voice/PingHandler.h b/server/src/client/voice/PingHandler.h new file mode 100644 index 0000000..34fe173 --- /dev/null +++ b/server/src/client/voice/PingHandler.h @@ -0,0 +1,35 @@ +#pragma once + +#include +#include + +namespace ts::server::server::udp { + class PingHandler { + public: + typedef void(*callback_time_outed_t)(void* /* cb data */); + typedef void(*callback_send_ping_t)(void* /* cb data */, uint16_t& /* ping id */); + typedef void(*callback_send_recovery_command_t)(void* /* cb data */); + + void reset(); + + void tick(); + void received_ping(uint16_t /* ping id */); + + void command_packet_acknowledged(); + + [[nodiscard]] inline std::chrono::milliseconds current_ping() const { return this->current_ping_; } + + void* callback_argument{nullptr}; + callback_send_ping_t callback_send_ping{nullptr}; + callback_send_recovery_command_t callback_send_recovery_command{nullptr}; + callback_time_outed_t callback_time_outed{nullptr}; + private: + uint16_t last_ping_id{0}; + std::chrono::milliseconds current_ping_{0};; + std::chrono::system_clock::time_point last_response_{}; + std::chrono::system_clock::time_point last_request_{}; + + std::chrono::system_clock::time_point last_command_packet_{}; + std::chrono::system_clock::time_point last_recovery_command_send{}; + }; +} \ No newline at end of file diff --git a/server/src/client/voice/VoiceClient.cpp b/server/src/client/voice/VoiceClient.cpp index 36b1193..d7fa6e3 100644 --- a/server/src/client/voice/VoiceClient.cpp +++ b/server/src/client/voice/VoiceClient.cpp @@ -82,6 +82,7 @@ void VoiceClient::sendAcknowledge(uint16_t packetId, bool low) { void VoiceClient::tick(const std::chrono::system_clock::time_point &time) { SpeakingClient::tick(time); + this->connection->tick(); { ALARM_TIMER(A1, "VoiceClient::tick", milliseconds(3)); if(this->state == ConnectionState::CONNECTED) { diff --git a/server/src/client/voice/VoiceClient.h b/server/src/client/voice/VoiceClient.h index ec84452..5a3d49d 100644 --- a/server/src/client/voice/VoiceClient.h +++ b/server/src/client/voice/VoiceClient.h @@ -59,7 +59,6 @@ namespace ts { /* Note: Order is only guaranteed if progressDirectly is on! */ virtual void sendCommand0(const std::string_view& /* data */, bool low = false, bool progressDirectly = false, std::unique_ptr> listener = nullptr); - virtual void sendAcknowledge(uint16_t packetId, bool low = false); connection::VoiceClientConnection* getConnection(){ return connection; } std::shared_ptr getVoiceServer(){ return voice_server; } @@ -74,11 +73,6 @@ namespace ts { virtual void tick(const std::chrono::system_clock::time_point &time) override; void handlePacketCommand(const pipes::buffer_view&); - void handlePacketAck(const protocol::ClientPacketParser&); - void handlePacketVoice(const protocol::ClientPacketParser&); - void handlePacketPing(const protocol::ClientPacketParser&); - void handlePacketInit(const protocol::ClientPacketParser&); - //Handshake helpers @@ -89,9 +83,6 @@ namespace ts { protected: virtual command_result handleCommand(Command &command) override; - //Some helper method - void sendPingRequest(); - //Ping/pong uint16_t lastPingId = 0; std::chrono::milliseconds ping = std::chrono::milliseconds(0); @@ -119,7 +110,6 @@ namespace ts { struct { bool client_init = false; bool new_protocol = false; - bool protocol_encrypted = false; uint32_t client_time = 0; std::string alpha; diff --git a/server/src/client/voice/VoiceClientCommandHandler.cpp b/server/src/client/voice/VoiceClientCommandHandler.cpp index a27499e..f4741cf 100644 --- a/server/src/client/voice/VoiceClientCommandHandler.cpp +++ b/server/src/client/voice/VoiceClientCommandHandler.cpp @@ -1,12 +1,11 @@ #include -#include #include #include #include +#include #include #include "../../InstanceHandler.h" -#include "../../geo/GeoLocation.h" #include "VoiceClient.h" using namespace std; diff --git a/server/src/client/voice/VoiceClientConnection.cpp b/server/src/client/voice/VoiceClientConnection.cpp index 4932b71..61d9121 100644 --- a/server/src/client/voice/VoiceClientConnection.cpp +++ b/server/src/client/voice/VoiceClientConnection.cpp @@ -5,22 +5,8 @@ #include #include #include -#include "VoiceClientConnection.h" -#include "src/client/ConnectedClient.h" -#include "VoiceClient.h" - - -//#define LOG_AUTO_ACK_AUTORESPONSE -//#define FUZZING_TESTING_INCOMMING -//#define FUZZING_TESTING_OUTGOING -//#define FIZZING_TESTING_DISABLE_HANDSHAKE -#define FUZZING_TESTING_DROP 8 -#define FUZZING_TESTING_DROP_MAX 10 - -//#define CONNECTION_NO_STATISTICS - -#define QLZ_COMPRESSION_LEVEL 1 -#include "qlz/QuickLZ.h" +#include "./VoiceClientConnection.h" +#include "./VoiceClient.h" using namespace std; using namespace std::chrono; @@ -31,23 +17,47 @@ using namespace ts::server; VoiceClientConnection::VoiceClientConnection(VoiceClient* client) : packet_encoder_{&this->crypt_handler, &this->compress_handler, &this->acknowledge_handler}, - packet_decoder_{&this->crypt_handler, &this->compress_handler, &this->acknowledge_handler} { + packet_decoder_{&this->crypt_handler} { memtrack::allocated(this); - this->packet_encoder_.callback_encoded_buffers = std::bind(&VoiceClientConnection::handle_encoded_buffers, this, std::placeholders::_1); - this->packet_encoder_.callback_encode_failed = std::bind(&VoiceClientConnection::handle_encode_error, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3); + this->packet_encoder_.callback_argument = this; + this->packet_encoder_.callback_encoded_buffers = [](auto _this, const auto& a1) { + reinterpret_cast(_this)->handle_encoded_buffers(a1); + }; + this->packet_encoder_.callback_encode_failed = [](auto _this, const auto& a1, auto a2, const auto& a3) { + reinterpret_cast(_this)->handle_encode_error(a1, a2, a3); + }; + + this->packet_decoder_.callback_argument = this; + this->packet_decoder_.callback_decoded_packet = [](auto _this, const auto& a1) { + reinterpret_cast(_this)->handle_decoded_packet(a1); + }; + this->packet_decoder_.callback_decode_failed = [](auto _this, auto a1, const auto& a2) { + reinterpret_cast(_this)->handle_decode_error(a1, a2); + }; + this->packet_decoder_.callback_send_acknowledge = [](auto _this, auto a1, auto a2) { + reinterpret_cast(_this)->send_packet_acknowledge(a1, a2); + }; + + this->ping_handler_.callback_argument = this; + this->ping_handler_.callback_send_ping = [](auto _this, auto& a1) { + reinterpret_cast(_this)->send_packet_ping(a1); + }; + //TODO: The two other callbacks! this->server_id = client->getServerId(); this->client_handle = client; this->crypt_handler.reset(); - debugMessage(client->getServer()->getServerId(), "Allocated new voice client connection at {}", (void*) this); + debugMessage(this->server_id, "Allocated new voice client connection at {}", (void*) this); } VoiceClientConnection::~VoiceClientConnection() { + debugMessage(this->server_id, "Deleted voice client connection at {}", (void*) this); + /* locking here should be useless, but just to ensure! */ { - lock_guard write_queue_lock(this->write_queue_lock); + lock_guard wqlock(this->write_queue_lock); this->write_queue.clear(); } @@ -71,435 +81,187 @@ void VoiceClientConnection::register_client_for_write() { //Message handle methods void VoiceClientConnection::handle_incoming_datagram(const pipes::buffer_view& buffer) { -#ifdef FUZZING_TESTING_INCOMMING - #ifdef FIZZING_TESTING_DISABLE_HANDSHAKE - if (this->client->state == ConnectionState::CONNECTED) { - #endif - if ((rand() % FUZZING_TESTING_DROP_MAX) < FUZZING_TESTING_DROP) { - debugMessage(this->client->getServerId(), "{}[FUZZING] Dropping incoming packet of length {}", CLIENT_STR_LOG_PREFIX_(this->client), buffer.length()); - return; - } - #ifdef FIZZING_TESTING_DISABLE_HANDSHAKE - } - #endif -#endif + auto command_pending = this->packet_decoder_.decode_incoming_data(buffer); + if(command_pending) { + std::shared_lock clock{this->client_mutex}; + if(!this->client_handle) return; //TODO: Warn etc? - ClientPacketParser packet_parser{buffer}; - if(!packet_parser.valid()) { - logTrace(this->client->getServerId(), "{} Received invalid packet. Dropping.", CLIENT_STR_LOG_PREFIX_(this->client)); - return; - } - assert(packet_parser.type() >= 0 && packet_parser.type() < this->incoming_generation_estimators.size()); - packet_parser.set_estimated_generation(this->incoming_generation_estimators[packet_parser.type()].visit_packet(packet_parser.packet_id())); - - auto is_command = packet_parser.type() == protocol::COMMAND || packet_parser.type() == protocol::COMMAND_LOW; - /* pretest if the packet is worth the effort of decoding it */ - if(is_command) { - /* handle the order stuff */ - auto& fragment_buffer = this->_command_fragment_buffers[command_fragment_buffer_index(packet_parser.type())]; - - unique_lock queue_lock(fragment_buffer.buffer_lock); - auto result = fragment_buffer.accept_index(packet_parser.packet_id()); - if(result != 0) { /* packet index is ahead buffer index */ - debugMessage(this->client->getServerId(), "{} Dropping command packet because command assembly buffer has an {} ({}|{}|{})", - CLIENT_STR_LOG_PREFIX_(this->client), - result == -1 ? "underflow" : "overflow", - fragment_buffer.capacity(), - fragment_buffer.current_index(), - packet_parser.packet_id() - ); - - if(result == -1) { /* underflow */ - /* we've already got the packet, but the client dosn't know that so we've to send the acknowledge again */ - if(this->client->crypto.protocol_encrypted) - this->client->sendAcknowledge(packet_parser.packet_id(), packet_parser.type() == protocol::COMMAND_LOW); - } - return; - } - } - - //NOTICE I found out that the Compressed flag is set if the packet contains an audio header - - if(this->client->state == ConnectionState::INIT_LOW && packet_parser.type() != protocol::INIT1) - return; - - /* decrypt the packet if needed */ - if(packet_parser.is_encrypted()) { - std::string error; - - CryptHandler::key_t crypt_key{}; - CryptHandler::nonce_t crypt_nonce{}; - - auto data = (uint8_t*) packet_parser.mutable_data_ptr(); - bool use_default_key{!this->client->crypto.protocol_encrypted}, decrypt_result; - - decrypt_packet: - if(use_default_key) { - crypt_key = CryptHandler::default_key; - crypt_nonce = CryptHandler::default_nonce; - } else { - if(!this->crypt_handler.generate_key_nonce(true, packet_parser.type(), packet_parser.packet_id(), packet_parser.estimated_generation(), crypt_key, crypt_nonce)) { - logError(this->client->getServerId(), "{} Failed to generate crypt key/nonce. This should never happen! Dropping packet.", CLIENT_STR_LOG_PREFIX_(this->client)); - return; - } - } - - decrypt_result = this->crypt_handler.decrypt( - data + ClientPacketParser::kHeaderOffset, ClientPacketParser::kHeaderLength, - data + ClientPacketParser::kPayloadOffset, packet_parser.payload_length(), - data, - crypt_key, crypt_nonce, - error - ); - - if(!decrypt_result) { - if(!this->client->crypto.client_init) { - if(use_default_key) { - logTrace(this->client->getServerId(), "{} Failed to decrypt packet with default key ({}). Dropping packet.", CLIENT_STR_LOG_PREFIX_(this->client), error); - return; - } else { - logTrace(this->client->getServerId(), "{} Failed to decrypt packet ({}). Trying with default key.", CLIENT_STR_LOG_PREFIX_(this->client), error); - use_default_key = true; - goto decrypt_packet; - } - } else { - logTrace(this->client->getServerId(), "{} Failed to decrypt packet ({}). Dropping packet.", CLIENT_STR_LOG_PREFIX_(this->client), error); - return; - } - } - packet_parser.set_decrypted(); - } else if(is_command && this->client->state != ConnectionState::INIT_HIGH) { - logTrace(this->client->getServerId(), "{} Voice client {}/{} tried to send a unencrypted command packet. Dropping packet.", CLIENT_STR_LOG_PREFIX_(this->client), client->getDisplayName(), this->client->getLoggingPeerIp()); - return; - } - -#ifndef CONNECTION_NO_STATISTICS - if(this->client && this->client->getServer()) - this->client->connectionStatistics->logIncomingPacket(stats::ConnectionStatistics::category::from_type(packet_parser.type()), buffer.length()); -#endif - -#ifdef LOG_INCOMPING_PACKET_FRAGMENTS - debugMessage(lstream << CLIENT_LOG_PREFIX << "Recived packet. PacketId: " << packet->packetId() << " PacketType: " << packet->type().name() << " Flags: " << packet->flags() << " - " << packet->data() << endl); -#endif - if(is_command) { - auto& fragment_buffer = this->_command_fragment_buffers[command_fragment_buffer_index(packet_parser.type())]; - CommandFragment fragment_entry{ - packet_parser.packet_id(), - packet_parser.estimated_generation(), - - packet_parser.flags(), - (uint32_t) packet_parser.payload_length(), - packet_parser.payload().own_buffer() - }; - - { - unique_lock queue_lock(fragment_buffer.buffer_lock); - - if(!fragment_buffer.insert_index(packet_parser.packet_id(), std::move(fragment_entry))) { - logTrace(this->client->getServerId(), "{} Failed to insert command packet into command packet buffer.", CLIENT_STR_LOG_PREFIX_(this->client)); - return; - } - } - this->client->sendAcknowledge(packet_parser.packet_id(), packet_parser.type() == protocol::COMMAND_LOW); - - auto voice_server = this->client->voice_server; + auto voice_server = this->client_handle->voice_server; if(voice_server) - voice_server->schedule_command_handling(this->client); - } else { - if(packet_parser.type() == protocol::VOICE || packet_parser.type() == protocol::VOICE_WHISPER) - this->client->handlePacketVoice(packet_parser); - else if(packet_parser.type() == protocol::ACK || packet_parser.type() == protocol::ACK_LOW) - this->client->handlePacketAck(packet_parser); - else if(packet_parser.type() == protocol::PING || packet_parser.type() == protocol::PONG) - this->client->handlePacketPing(packet_parser); - else { - logError(this->client->getServerId(), "{} Received hand decoded packet, but we've no method to handle it. Dropping packet.", CLIENT_STR_LOG_PREFIX_(this->client)); - } + voice_server->schedule_command_handling(this->client_handle); } } -bool VoiceClientConnection::verify_encryption(const pipes::buffer_view &buffer /* incl. mac etc */) { - ClientPacketParser packet_parser{buffer}; - if(!packet_parser.valid() || !packet_parser.is_encrypted()) return false; +bool VoiceClientConnection::verify_encryption(const pipes::buffer_view &buffer) { + return this->packet_decoder_.verify_encryption(buffer); +} - assert(packet_parser.type() >= 0 && packet_parser.type() < this->incoming_generation_estimators.size()); - return this->crypt_handler.verify_encryption(buffer, packet_parser.packet_id(), this->incoming_generation_estimators[packet_parser.type()].generation()); +void VoiceClientConnection::handle_decoded_packet(const ts::protocol::ClientPacketParser &packet) { + auto packet_type = packet.type(); + if(packet_type == PacketType::VOICE ) { + std::shared_lock clock{this->client_mutex}; + if(!this->client_handle) return; //TODO: Warn etc? + + this->client_handle->handlePacketVoice(packet.payload(), (packet.flags() & PacketFlag::Compressed) > 0, (packet.flags() & PacketFlag::Fragmented) > 0); + } else if(packet_type == PacketType::VOICE_WHISPER) { + std::shared_lock clock{this->client_mutex}; + if(!this->client_handle) return; //TODO: Warn etc? + + this->client_handle->handlePacketVoice(packet.payload(), (packet.flags() & PacketFlag::Compressed) > 0, (packet.flags() & PacketFlag::Fragmented) > 0); + } else if(packet_type == PacketType::ACK || packet_type == PacketType::ACK_LOW) { + string error{}; + if(!this->acknowledge_handler.process_acknowledge(packet.type(), packet.payload(), error)) + debugMessage(this->server_id, "{} Failed to handle acknowledge: {}", this->client_log_prefix(), error); + } else if(packet_type == PacketType::PING) { + /* just send a pong response */ + char buffer[2]; + le2be16(packet.packet_id(), buffer); + auto pkt = make_shared(PacketTypeInfo::Pong, pipes::buffer_view{buffer, 2}); + pkt->enable_flag(PacketFlag::Unencrypted); + this->send_packet(pkt); + } else if(packet_type == PacketType::PONG) { + if(packet.payload_length() < 2) return; + + uint16_t ping_id = be2le16((char*) packet.payload().data_ptr()); + //TODO: Ping handler handle ping + } else if(packet_type == PacketType::COMMAND || packet_type == PacketType::COMMAND_LOW) { + logCritical(this->server_id, "{} Received command packet within handle_decoded_packet callback.", this->client_log_prefix()); + } else if(packet_type == PacketType::INIT1) { + logCritical(this->server_id, "{} Received init packet within handle_decoded_packet callback.", this->client_log_prefix()); + } +} + +void VoiceClientConnection::handle_decode_error(ts::server::server::udp::PacketDecodeResult error, const std::string &message) { + using PacketDecodeResult = ts::server::server::udp::PacketDecodeResult; + switch (error) { + case PacketDecodeResult::DECRYPT_FAILED: + logWarning(this->server_id, "{} Dropping incoming packet. Failed to decrypt packet ({}).", this->client_log_prefix(), message); + break; + + case PacketDecodeResult::DECRYPT_KEY_GEN_FAILED: + logWarning(this->server_id, "{} Dropping incoming packet. Failed to generate crypto key for packet.", this->client_log_prefix()); + break; + + case PacketDecodeResult::BUFFER_OVERFLOW: + logWarning(this->server_id, "{} Dropping incoming packet because queue has a buffer overflow ({}).", this->client_log_prefix(), message); + break; + + case PacketDecodeResult::COMMAND_INSTERT_FAILED: + logWarning(this->server_id, "{} Dropping incoming packet because we failed to register the command packet.", this->client_log_prefix()); + break; + +#if 0 + case PacketDecodeResult::DUPLICATED_PACKET: + logWarning(this->server_id, "{} Dropping incoming packet because it has already be processed.", this->client_log_prefix()); + break; + + case PacketDecodeResult::INVALID_PACKET: + logWarning(this->server_id, "{} Dropping incoming packet because its invalid.", this->client_log_prefix()); + break; +#else + case PacketDecodeResult::INVALID_PACKET: + case PacketDecodeResult::DUPLICATED_PACKET: +#endif + case PacketDecodeResult::SUCCESS: + break; + } +} + +void VoiceClientConnection::send_packet_acknowledge(uint16_t packet_id, bool command_low) { + char buffer[2]; + le2be16(packet_id, buffer); + + auto packet = make_shared(command_low ? protocol::PacketTypeInfo::AckLow : protocol::PacketTypeInfo::Ack, pipes::buffer_view{buffer, 2}); + packet->enable_flag(PacketFlag::Unencrypted); + if(!command_low) packet->enable_flag(protocol::PacketFlag::NewProtocol); + this->send_packet(packet); +#ifdef PKT_LOG_ACK + logTrace(this->getServerId(), "{}[Acknowledge][Server -> Client] Sending acknowledge for {}", CLIENT_STR_LOG_PREFIX, packetId); +#endif +} + +void VoiceClientConnection::send_packet_ping(uint16_t& ping_id) { + auto packet = make_shared(PacketTypeInfo::Ping, pipes::buffer_view{}); + packet->enable_flag(PacketFlag::Unencrypted); + this->send_packet(packet, false, true); /* prepare directly so the packet gets a packet id */ + + ping_id = packet->packetId(); } void VoiceClientConnection::execute_handle_command_packets(const std::chrono::system_clock::time_point& /* scheduled */) { - if(this->client->state >= ConnectionState::DISCONNECTING || !this->client->getServer()) + if((int) this->connection_state_ >= (int) ClientConnectionState::DISCONNECTING) return; - //TODO: Remove the buffer_execute_lock and use the one within the this->client->handlePacketCommand method - unique_lock buffer_execute_lock; + std::shared_lock clock{this->client_handle}; + if(!this->client_handle) return; //TODO: Warn etc? + + using CommandReassembleResult = ts::server::server::udp::CommandReassembleResult; + pipes::buffer payload{}; - uint16_t packet_id{}; - auto reexecute_handle = this->next_reassembled_command(buffer_execute_lock, payload, packet_id); + bool command_low{false}; + auto command_status = this->packet_decoder_.reassemble_command(payload, command_low); + switch (command_status) { + case CommandReassembleResult::SUCCESS: + case CommandReassembleResult::MORE_COMMANDS_PENDING: + break; - if(!payload.empty()){ - auto startTime = system_clock::now(); - try { - this->client->handlePacketCommand(payload); - } catch (std::exception& ex) { - logCritical(this->client->getServerId(), "{} Exception reached root tree! {}", CLIENT_STR_LOG_PREFIX_(this->client), ex.what()); - } + case CommandReassembleResult::NO_COMMANDS_PENDING: + return; - auto end = system_clock::now(); - if(end - startTime > milliseconds(10)) { - logError(this->client->getServerId(), - "{} Handling of command packet needs more than 10ms ({}ms)", - CLIENT_STR_LOG_PREFIX_(this->client), - duration_cast(end - startTime).count() - ); - } - } - if(buffer_execute_lock.owns_lock()) - buffer_execute_lock.unlock(); - - auto voice_server = this->client->voice_server; - if(voice_server && reexecute_handle) - this->client->voice_server->schedule_command_handling(this->client); -} - -/* buffer_execute_lock: lock for in order execution */ -bool VoiceClientConnection::next_reassembled_command(unique_lock& buffer_execute_lock, pipes::buffer& result, uint16_t& packet_id) { - command_fragment_buffer_t* buffer{nullptr}; - unique_lock buffer_lock; /* general buffer lock */ - - bool have_more{false}; - { - //FIXME: Currently command low packets cant be handeled if there is a command packet stuck in reassamble - - /* handle commands before command low packets */ - for(auto& buf : this->_command_fragment_buffers) { - unique_lock ring_lock(buf.buffer_lock, try_to_lock); - if(!ring_lock.owns_lock()) continue; - - if(buf.front_set()) { - if(!buffer) { /* lets still test for reexecute */ - buffer_execute_lock = unique_lock(buf.execute_lock, try_to_lock); - if(!buffer_execute_lock.owns_lock()) continue; - - buffer_lock = move(ring_lock); - buffer = &buf; - } else { - have_more = true; - break; - } - } - } + case CommandReassembleResult::COMMAND_DECOMPRESS_FAILED: + case CommandReassembleResult::COMMAND_TOO_LARGE: + case CommandReassembleResult::SEQUENCE_LENGTH_TOO_LONG: + //TODO: Shutdown connection? + logError(this->server_id, "{} Failed to reassemble next command ({}). This will cause the connection to fail.", this->client_log_prefix(), (int) command_status); + return; } - if(!buffer) - return false; /* we've no packets */ - - uint8_t packet_flags{0}; - pipes::buffer payload{}; - - /* lets find out if we've to reassemble the packet */ - auto& first_buffer = buffer->slot_value(0); - packet_id = first_buffer.packet_id; - if(first_buffer.packet_flags & PacketFlag::Fragmented) { - uint16_t sequence_length{1}; - size_t total_payload_length{first_buffer.payload_length}; - do { - if(sequence_length >= buffer->capacity()) { - logError(this->client->getServerId(), "{} Command fragment buffer is full, and there is not fragmented packet end. Dropping full buffer which will probably cause a connection loss.", CLIENT_STR_LOG_PREFIX_(this->client)); - buffer->clear(); - return false; /* we've nothing to handle */ - } - - if(!buffer->slot_set(sequence_length)) - return false; /* we need more packets */ - - auto& packet = buffer->slot_value(sequence_length++); - total_payload_length += packet.payload_length; - if(packet.packet_flags & PacketFlag::Fragmented) { - /* yep we find the end */ - break; - } - } while(true); - /* ok we have all fragments lets reassemble */ - - /* - * Packet sequence could never be so long. If it is so then the data_length() returned an invalid value. - * We're checking it here because we dont want to make a huge allocation - */ - assert(total_payload_length < 512 * 1024 * 1024); - - pipes::buffer packet_buffer{total_payload_length}; - char* packet_buffer_ptr = &packet_buffer[0]; - size_t packet_count{0}; - - packet_flags = buffer->slot_value(0).packet_flags; - while(packet_count < sequence_length) { - auto fragment = buffer->pop_front(); - memcpy(packet_buffer_ptr, fragment.payload.data_ptr(), fragment.payload_length); - - packet_buffer_ptr += fragment.payload_length; - packet_count++; - } - -#ifndef _NDEBUG - if((packet_buffer_ptr - 1) != &packet_buffer[packet_buffer.length() - 1]) { - logCritical(this->client->getServer()->getServerId(), - "Buffer over/underflow: packet_buffer_ptr != &packet_buffer[packet_buffer.length() - 1]; packet_buffer_ptr := {}; packet_buffer.end() := {}", - (void*) packet_buffer_ptr, - (void*) &packet_buffer[packet_buffer.length() - 1] - ); - } -#endif - payload = packet_buffer; - } else { - auto packet = buffer->pop_front(); - packet_flags = packet.packet_flags; - payload = packet.payload; + auto startTime = system_clock::now(); + try { + this->client_handle->handlePacketCommand(payload); + } catch (std::exception& ex) { + logCritical(this->server_id, "{} An exception has been thrown within command handling, which reached to root handler. This should not happen! (Message: {})", this->client_log_prefix(), ex.what()); } - have_more |= buffer->front_set(); /* set the more flag if we have more to process */ - buffer_lock.unlock(); - - if(packet_flags & PacketFlag::Compressed) { - std::string error{}; - - auto decompressed_size = compression::qlz_decompressed_size(payload.data_ptr(), payload.length()); - auto buffer = buffer::allocate_buffer(decompressed_size); - if(!compression::qlz_decompress_payload(payload.data_ptr(), buffer.data_ptr(), &decompressed_size)) { - logTrace(this->client->getServerId(), "{} Failed to decompress received command. Dropping packet.", CLIENT_STR_LOG_PREFIX_(this->client)); - return false; - } - - payload = buffer.range(0, decompressed_size); + auto end = system_clock::now(); + if(end - startTime > milliseconds(10)) { + auto index = payload.find(" "); + std::string command{}; + if(index == std::string::npos) + command = payload.string(); + else + command = payload.view(0, index).string(); + logWarning(this->server_id, "{} Command handling of command \"{}\" required more than 10ms ({}ms)",this->client_log_prefix(), + command, + std::chrono::duration_cast(end - startTime).count() + ); } - result = std::move(payload); - return have_more; + if(command_status == CommandReassembleResult::MORE_COMMANDS_PENDING) { + auto voice_server = this->client_handle->voice_server; + if(voice_server) + voice_server->schedule_command_handling(this->client_handle); + } } void VoiceClientConnection::send_packet(const shared_ptr& original_packet, bool copy, bool prepare_directly) { - if(this->client->state == ConnectionState::DISCONNECTED) + if(this->connection_state_ == ClientConnectionState::DISCONNECTED) return; using EncodeFlags = server::server::udp::PacketEncoder::EncodeFlags; int flags{EncodeFlags::none}; if(!copy) - flags |= EncodeFlags::no_copy; + flags |= (unsigned) EncodeFlags::no_copy; if(prepare_directly) - flags |= EncodeFlags::sync; + flags |= (unsigned) EncodeFlags::sync; if(this->packet_encoder_.encode_packet(original_packet, (EncodeFlags) flags)) this->register_client_for_write(); } -#if 0 -bool VoiceClientConnection::encode_packet(vector &result, const shared_ptr &packet, std::unique_lock& work_lock) { - assert(work_lock.owns_lock()); - - string error = "success"; - - if(packet->type().compressable() && !packet->memory_state.fragment_entry) { - packet->enable_flag(PacketFlag::Compressed); - if(!this->compress_handler.progressPacketOut(packet.get(), error)) { - logError(this->getClient()->getServerId(), "{} Could not compress outgoing packet.\nThis could cause fatal failed for the client.\nError: {}", error); - return false; - } - } - - std::vector> fragments; - fragments.reserve((size_t) (packet->data().length() / packet->type().max_length()) + 1); - - if(packet->data().length() > packet->type().max_length()) { - if(!packet->type().fragmentable()) { - logError(this->client->getServerId(), "{} We've tried to send a too long, not fragmentable, packet. Dropping packet of type {} with length {}", CLIENT_STR_LOG_PREFIX_(this->client), packet->type().name(), packet->data().length()); - return false; - } - - { //Split packets - auto buffer = packet->data(); - - const auto max_length = packet->type().max_length(); - while(buffer.length() > max_length * 2) { - fragments.push_back(make_shared(packet->type(), buffer.view(0, max_length).dup(buffer::allocate_buffer(max_length)))); - buffer = buffer.range((size_t) max_length); - } - - if(buffer.length() > max_length) { //Divide rest by 2 - fragments.push_back(make_shared(packet->type(), buffer.view(0, buffer.length() / 2).dup(buffer::allocate_buffer(buffer.length() / 2)))); - buffer = buffer.range(buffer.length() / 2); - } - fragments.push_back(make_shared(packet->type(), buffer)); - - for(const auto& frag : fragments) { - frag->setFragmentedEntry(true); - frag->enable_flag(PacketFlag::NewProtocol); - } - } - - assert(fragments.size() >= 2); - fragments.front()->enable_flag(PacketFlag::Fragmented); - if(packet->has_flag(PacketFlag::Compressed)) - fragments.front()->enable_flag(PacketFlag::Compressed); - - fragments.back()->enable_flag(PacketFlag::Fragmented); - - if(packet->getListener()) - fragments.back()->setListener(std::move(packet->getListener())); //Move the listener to the last :) - } else { - fragments.push_back(packet); - } - - result.reserve(fragments.size()); - - /* apply packet ids */ - for(const auto& fragment : fragments) { - if(!fragment->memory_state.id_branded) - fragment->applyPacketId(this->packet_id_manager); - } - work_lock.unlock(); /* the rest could be unordered */ - - - CryptHandler::key_t crypt_key{}; - CryptHandler::nonce_t crypt_nonce{}; - auto statistics = this->client ? this->client->connectionStatistics : nullptr; - for(const auto& fragment : fragments) { - if(fragment->has_flag(PacketFlag::Unencrypted)) { - this->crypt_handler.write_default_mac(fragment->mac().data_ptr()); - } else { - if(!this->client->crypto.protocol_encrypted) { - crypt_key = CryptHandler::default_key; - crypt_nonce = CryptHandler::default_nonce; - } else { - if(!this->crypt_handler.generate_key_nonce(false, fragment->type().type(), fragment->packetId(), fragment->generationId(), crypt_key, crypt_nonce)) { - logError(this->client->getServerId(), "{} Failed to generate crypt key/nonce for sending a packet. This should never happen! Dropping packet.", CLIENT_STR_LOG_PREFIX_(this->client)); - return false; - } - } - - auto crypt_result = this->crypt_handler.encrypt(fragment->header().data_ptr(), fragment->header().length(), - fragment->data().data_ptr(), fragment->data().length(), - fragment->mac().data_ptr(), - crypt_key, crypt_nonce, error); - if(!crypt_result){ - logError(this->client->getServerId(), "{} Failed to encrypt packet. Error: {}", CLIENT_STR_LOG_PREFIX_(this->client), error); - return false; - } - } - -#ifndef CONNECTION_NO_STATISTICS - if(statistics) - statistics->logOutgoingPacket(*fragment); -#endif - this->acknowledge_handler.process_packet(*fragment); - result.push_back(fragment->buffer()); - } - - return true; -} -#endif - void VoiceClientConnection::handle_encode_error(const shared_ptr &packet, - ts::server::server::udp::PacketEncodeResult &result, std::string &message) { + ts::server::server::udp::PacketEncodeResult result, const std::string &message) { using PacketEncodeResult = ts::server::server::udp::PacketEncodeResult; switch (result) { case PacketEncodeResult::PACKET_TOO_LARGE: @@ -596,42 +358,23 @@ bool VoiceClientConnection::wait_empty_write_and_prepare_queue(chrono::time_poin void VoiceClientConnection::reset() { this->packet_encoder_.reset(); - + this->packet_decoder_.reset(); this->acknowledge_handler.reset(); this->crypt_handler.reset(); - - { - lock_guard buffer_lock(this->packet_buffer_lock); - for(auto& buffer : this->_command_fragment_buffers) - buffer.reset(); - } } void VoiceClientConnection::force_insert_command(const pipes::buffer_view &buffer) { - CommandFragment fragment_entry{ - 0, - 0, + this->packet_decoder_.force_insert_command(buffer); - PacketFlag::Unencrypted, - (uint32_t) buffer.length(), - buffer.own_buffer() - }; + std::shared_lock clock{this->client_handle}; + if(!this->client_handle) return; //TODO: Warn etc? - - { - auto& fragment_buffer = this->_command_fragment_buffers[command_fragment_buffer_index(protocol::COMMAND)]; - unique_lock queue_lock(fragment_buffer.buffer_lock); - fragment_buffer.push_front(std::move(fragment_entry)); - } - - auto voice_server = this->client->voice_server; + auto voice_server = this->client_handle->voice_server; if(voice_server) - voice_server->schedule_command_handling(this->client); + voice_server->schedule_command_handling(this->client_handle); } -void VoiceClientConnection::register_initiv_packet() { - auto& fragment_buffer = this->_command_fragment_buffers[command_fragment_buffer_index(protocol::COMMAND)]; - unique_lock buffer_lock(fragment_buffer.buffer_lock); - fragment_buffer.set_full_index_to(1); /* the first packet (0) is already the clientinitiv packet */ +void VoiceClientConnection::tick() { + //TODO: Tick ping handler } \ No newline at end of file diff --git a/server/src/client/voice/VoiceClientConnection.h b/server/src/client/voice/VoiceClientConnection.h index c45d8fa..43e4f4f 100644 --- a/server/src/client/voice/VoiceClientConnection.h +++ b/server/src/client/voice/VoiceClientConnection.h @@ -17,6 +17,7 @@ #include #include "./PacketEncoder.h" #include "./PacketDecoder.h" +#include "./PingHandler.h" //#define LOG_ACK_SYSTEM #ifdef LOG_ACK_SYSTEM @@ -33,24 +34,6 @@ namespace ts { } namespace connection { - struct CommandFragment { - uint16_t packet_id{0}; - uint16_t packet_generation{0}; - - uint8_t packet_flags{0}; - uint32_t payload_length : 24; - pipes::buffer payload{}; - - CommandFragment() { this->payload_length = 0; } - CommandFragment(uint16_t packetId, uint16_t packetGeneration, uint8_t packetFlags, uint32_t payloadLength, pipes::buffer payload) - : packet_id{packetId}, packet_generation{packetGeneration}, packet_flags{packetFlags}, payload_length{payloadLength}, payload{std::move(payload)} {} - - CommandFragment& operator=(const CommandFragment&) = default; - CommandFragment(const CommandFragment& other) = default; - CommandFragment(CommandFragment&&) = default; - }; - static_assert(sizeof(CommandFragment) == 8 + sizeof(pipes::buffer)); - enum struct WriteBufferStatus { EMPTY, BUFFERS_LEFT, @@ -60,13 +43,17 @@ namespace ts { UNSET }; + enum struct ClientConnectionState { + INITIALITZING, /* crypto setup */ + CONNECTED, /* basic connection has been established */ + DISCONNECTING, /* connection is already disconnecting */ + DISCONNECTED /* connection has been (maybe successfully) closed */ + }; + class VoiceClientConnection { friend class server::VoiceServer; friend class server::VoiceClient; public: - typedef protocol::PacketRingBuffer command_fragment_buffer_t; - typedef std::array command_packet_reassembler; - explicit VoiceClientConnection(server::VoiceClient*); virtual ~VoiceClientConnection(); @@ -81,15 +68,20 @@ namespace ts { */ bool encode_packets(); + [[nodiscard]] inline server::server::udp::PacketEncoder& packet_encoder() { return this->packet_encoder_; } + [[nodiscard]] inline server::server::udp::PacketDecoder& packet_decoder() { return this->packet_decoder_; } + /* return 2 => Nothing | 1 => More and buffer is set | 0 => Buffer is set, nothing more */ [[nodiscard]] WriteBufferStatus pop_write_buffer(pipes::buffer& /* buffer */); bool wait_empty_write_and_prepare_queue(std::chrono::time_point until = std::chrono::time_point()); void reset(); + void tick(); void force_insert_command(const pipes::buffer_view& /* payload */); - void register_initiv_packet(); + void send_packet_acknowledge(uint16_t /* packet id */, bool /* is command low */); + void send_packet_ping(uint16_t& /* ping id */); protected: void handle_incoming_datagram(const pipes::buffer_view &buffer); bool verify_encryption(const pipes::buffer_view& /* full packet */); @@ -100,34 +92,30 @@ namespace ts { std::shared_mutex client_mutex{}; server::VoiceClient* client_handle{nullptr}; + ClientConnectionState connection_state_{ClientConnectionState::INITIALITZING}; + CryptHandler crypt_handler{}; CompressionHandler compress_handler{}; AcknowledgeManager acknowledge_handler{}; - //Handle stuff - void execute_handle_command_packets(const std::chrono::system_clock::time_point& /* scheduled */); - bool next_reassembled_command(std::unique_lock &buffer_execute_lock /* packet channel execute lock */, pipes::buffer & /* buffer*/, uint16_t& /* packet id */); - - /* ---------- Write declarations ---------- */ spin_lock write_queue_lock; /* queue access isn't for long in general */ std::deque write_queue; server::server::udp::PacketEncoder packet_encoder_; server::server::udp::PacketDecoder packet_decoder_; + server::server::udp::PingHandler ping_handler_{}; + //Handle stuff + [[nodiscard]] std::string client_log_prefix(); + void execute_handle_command_packets(const std::chrono::system_clock::time_point& /* scheduled */); + /* will be called on the IO thread or if sync has been set directly in any thread */ - void handle_encode_error(const std::shared_ptr &/* the packet */, ts::server::server::udp::PacketEncodeResult& /* error */, std::string& /* custom message */); + void handle_encode_error(const std::shared_ptr &/* the packet */, ts::server::server::udp::PacketEncodeResult /* error */, const std::string& /* custom message */); void handle_encoded_buffers(const std::vector& /* buffers */); /* will be called on the IO thread */ void handle_decoded_packet(const protocol::ClientPacketParser&); - void handle_decode_error(); - - static inline uint8_t command_fragment_buffer_index(uint8_t packet_index) { - return packet_index & 0x1U; /* use 0 for command and 1 for command low */ - } - - [[nodiscard]] std::string client_log_prefix(); + void handle_decode_error(ts::server::server::udp::PacketDecodeResult /* error */, const std::string& /* custom message */); }; } } \ No newline at end of file diff --git a/server/src/client/voice/VoiceClientHandschake.cpp b/server/src/client/voice/VoiceClientHandschake.cpp index 8c2e9a0..cd96d11 100644 --- a/server/src/client/voice/VoiceClientHandschake.cpp +++ b/server/src/client/voice/VoiceClientHandschake.cpp @@ -69,8 +69,8 @@ ts::command_result VoiceClient::handleCommandClientInitIv(Command& command) { state_lock.unlock(); this->connection->reset(); - this->connection->register_initiv_packet(); - this->crypto.protocol_encrypted = false; + this->connection->packet_decoder().register_initiv_packet(); + this->connection->packet_decoder().set_protocol_encrypted(false); bool use_teaspeak = command.hasParm("teaspeak"); if(use_teaspeak ? !config::server::clients::teaspeak : !config::server::clients::teamspeak) @@ -175,7 +175,9 @@ ts::command_result VoiceClient::handleCommandClientInitIv(Command& command) { logError(this->server->getServerId(), "Could not setup shared secret! (" + error + ")"); return ts::command_result{error::vs_critical}; } - this->crypto.protocol_encrypted = true; + + auto& decoder = this->connection->packet_decoder(); + decoder.set_protocol_encrypted(true); } } return ts::command_result{error::ok}; @@ -190,7 +192,9 @@ ts::command_result VoiceClient::handleCommandClientEk(Command& cmd) { this->connection->getCryptHandler()->setupSharedSecretNew(this->crypto.alpha, this->crypto.beta, (char*) private_key.data(), client_key.data()); this->connection->acknowledge_handler.reset(); - this->crypto.protocol_encrypted = true; - this->sendAcknowledge(1); //Send the encrypted acknowledge (most the times the second packet; If not we're going into the resend loop) + + auto& decoder = this->connection->packet_decoder(); + decoder.set_protocol_encrypted(true); + this->connection->send_packet_acknowledge(1, false); //Send the encrypted acknowledge (most the times the second packet; If not we're going into the resend loop) return ts::command_result{error::ok}; } \ No newline at end of file diff --git a/server/src/client/voice/VoiceClientPacketHandler.cpp b/server/src/client/voice/VoiceClientPacketHandler.cpp index 0d63fdf..89b8b19 100644 --- a/server/src/client/voice/VoiceClientPacketHandler.cpp +++ b/server/src/client/voice/VoiceClientPacketHandler.cpp @@ -1,6 +1,4 @@ -#include #include -#include #include #include "../web/WebClient.h" #include "VoiceClient.h" @@ -11,10 +9,6 @@ using namespace ts::server; using namespace ts::protocol; //#define PKT_LOG_PING -/* should never happen! */ -void VoiceClient::handlePacketInit(const ts::protocol::ClientPacketParser &) {} - -//TODO Packet handlers -> move back to voice client? void VoiceClient::handlePacketCommand(const pipes::buffer_view& command_string) { std::unique_ptr command; command_result result{}; @@ -40,48 +34,4 @@ void VoiceClient::handlePacketCommand(const pipes::buffer_view& command_string) handle_error: this->notifyError(result); result.release_details(); -} - -void VoiceClient::handlePacketPing(const protocol::ClientPacketParser& packet) { - if (packet.type() == protocol::PONG) { - if(packet.payload_length() < 2) return; - - uint16_t id = be2le16((char*) packet.payload().data_ptr()); - if (this->lastPingId == id) { -#ifdef PKT_LOG_PING - logMessage(this->getServerId(), "{}[Ping] Got a valid pong for ping {}. Required time: {}", CLIENT_STR_LOG_PREFIX, id, duration_cast(system_clock::now() - this->lastPingRequest).count() / 1000.f); -#endif - this->lastPingResponse = system_clock::now(); - this->ping = std::chrono::duration_cast(this->lastPingResponse - this->lastPingRequest); - } -#ifdef PKT_LOG_PING - else { - logMessage(this->getServerId(), "{}[Ping] Got invalid pong. (Responded pong id {} but expected {})", CLIENT_STR_LOG_PREFIX, packet->packetId(), this->lastPingId); - } -#endif - return; - } - -#ifdef PKT_LOG_PING - logMessage(this->getServerId(), "{}[Ping] Sending pong for client requested ping {}", CLIENT_STR_LOG_PREFIX, packet->packetId()); -#endif - char buffer[2]; - le2be16(packet.packet_id(), buffer); - auto pkt = make_shared(PacketTypeInfo::Pong, pipes::buffer_view{buffer, 2}); - pkt->enable_flag(PacketFlag::Unencrypted); - this->connection->send_packet(pkt); -} - -void VoiceClient::handlePacketVoice(const protocol::ClientPacketParser& packet) { - if (packet.type() == protocol::VOICE) { - SpeakingClient::handlePacketVoice(packet.payload(), (packet.flags() & PacketFlag::Compressed) > 0, (packet.flags() & PacketFlag::Fragmented) > 0); - } else if(packet.type() == protocol::VOICE_WHISPER) { - SpeakingClient::handlePacketVoiceWhisper(packet.payload(), (packet.flags() & PacketFlag::NewProtocol) > 0); - } -} - -void VoiceClient::handlePacketAck(const protocol::ClientPacketParser& packet) { - string error{}; - if(!this->connection->acknowledge_handler.process_acknowledge(packet.type(), packet.payload(), error)) - debugMessage(this->getServerId(), "{} Failed to handle acknowledge: {}", CLIENT_STR_LOG_PREFIX, error); } \ No newline at end of file diff --git a/server/src/client/voice/VoiceClientView.cpp b/server/src/client/voice/VoiceClientView.cpp deleted file mode 100644 index 67b70df..0000000 --- a/server/src/client/voice/VoiceClientView.cpp +++ /dev/null @@ -1,24 +0,0 @@ -#include -#include -#include "../../InstanceHandler.h" -#include "VoiceClient.h" - -using namespace std; -using namespace ts::server; -using namespace ts::protocol; - -extern InstanceHandler* serverInstance; - -void VoiceClient::sendPingRequest() { - this->lastPingRequest = std::chrono::system_clock::now(); - - auto packet = make_shared(PacketTypeInfo::Ping, pipes::buffer_view{}); - packet->enable_flag(PacketFlag::Unencrypted); - this->connection->send_packet(packet, false, true); /* prepare directly so the packet gets a packet id */ - - this->lastPingId = packet->packetId(); - -#ifdef PKT_LOG_PING - logMessage(this->getServerId(), "{}[Ping] Sending a ping request with it {}", CLIENT_STR_LOG_PREFIX, this->lastPingId); -#endif -} \ No newline at end of file diff --git a/shared b/shared index 2ffa124..62292af 160000 --- a/shared +++ b/shared @@ -1 +1 @@ -Subproject commit 2ffa12489d4c7b16789ec2a93d82d02ee412b264 +Subproject commit 62292af022798db2aba9ae5aa69aebbb849fb75a