Some changes

This commit is contained in:
WolverinDEV 2020-03-11 10:19:08 +01:00
parent c002be7307
commit 58666b8906
16 changed files with 608 additions and 583 deletions

View File

@ -49,7 +49,6 @@ set(SERVER_SOURCE_FILES
src/client/voice/VoiceClientHandschake.cpp
src/client/voice/VoiceClientCommandHandler.cpp
src/client/voice/VoiceClientPacketHandler.cpp
src/client/voice/VoiceClientView.cpp
src/TS3ServerClientManager.cpp
src/VirtualServer.cpp
src/TS3ServerHeartbeat.cpp
@ -161,7 +160,7 @@ if (COMPILE_WEB_CLIENT)
src/client/web/WSWebClient.cpp
src/client/web/SampleHandler.cpp
src/client/web/VoiceBridge.cpp
src/client/command_handler/helpers.h src/music/PlaylistPermissions.cpp src/music/PlaylistPermissions.h src/lincense/LicenseService.cpp src/lincense/LicenseService.h src/groups/GroupManager.cpp src/groups/GroupManager.h src/groups/GroupAssignmentManager.cpp src/groups/GroupAssignmentManager.h src/groups/Group.cpp src/groups/Group.h src/services/VirtualServerInformation.cpp src/services/VirtualServerInformation.h src/vserver/VirtualServerManager.cpp src/vserver/VirtualServerManager.h src/services/VirtualServerBroadcastService.cpp src/services/VirtualServerBroadcastService.h src/server/udp-server/UDPServer.cpp src/server/udp-server/UDPServer.h src/client/voice/PacketEncoder.cpp src/client/voice/PacketEncoder.h src/client/voice/PacketDecoder.cpp src/client/voice/PacketDecoder.h)
src/client/command_handler/helpers.h src/music/PlaylistPermissions.cpp src/music/PlaylistPermissions.h src/lincense/LicenseService.cpp src/lincense/LicenseService.h src/groups/GroupManager.cpp src/groups/GroupManager.h src/groups/GroupAssignmentManager.cpp src/groups/GroupAssignmentManager.h src/groups/Group.cpp src/groups/Group.h src/services/VirtualServerInformation.cpp src/services/VirtualServerInformation.h src/vserver/VirtualServerManager.cpp src/vserver/VirtualServerManager.h src/services/VirtualServerBroadcastService.cpp src/services/VirtualServerBroadcastService.h src/server/udp-server/UDPServer.cpp src/server/udp-server/UDPServer.h src/client/voice/PacketEncoder.cpp src/client/voice/PacketEncoder.h src/client/voice/PacketDecoder.cpp src/client/voice/PacketDecoder.h src/client/voice/PingHandler.cpp src/client/voice/PingHandler.h)
endif ()
add_executable(PermHelper helpers/permgen.cpp)

View File

@ -12,4 +12,285 @@
#include "../../ConnectionStatistics.h"
using namespace ts;
using namespace ts::server::server::udp;
using namespace ts::server::server::udp;
PacketDecoder::PacketDecoder(ts::connection::CryptHandler *crypt_handler)
: crypt_handler_{crypt_handler} {
memtrack::allocated<PacketDecoder>(this);
}
PacketDecoder::~PacketDecoder() {
memtrack::freed<PacketDecoder>(this);
this->reset();
}
void PacketDecoder::reset() {
std::lock_guard buffer_lock(this->packet_buffer_lock);
for(auto& buffer : this->_command_fragment_buffers)
buffer.reset();
}
bool PacketDecoder::decode_incoming_data(const pipes::buffer_view &buffer) {
std::string error{};
bool needs_command_reassemble{false};
auto result = this->decode_incoming_data_(error, needs_command_reassemble, buffer);
if(result != PacketDecodeResult::SUCCESS)
if(auto callback{this->callback_decode_failed}; callback)
callback(this->callback_argument, result, error);
return needs_command_reassemble;
}
PacketDecodeResult PacketDecoder::decode_incoming_data_(std::string& error, bool& commands_pending, const pipes::buffer_view &buffer) {
#ifdef FUZZING_TESTING_INCOMMING
#ifdef FIZZING_TESTING_DISABLE_HANDSHAKE
if (this->client->state == ConnectionState::CONNECTED) {
#endif
if ((rand() % FUZZING_TESTING_DROP_MAX) < FUZZING_TESTING_DROP) {
debugMessage(this->client->getServerId(), "{}[FUZZING] Dropping incoming packet of length {}", CLIENT_STR_LOG_PREFIX_(this->client), buffer.length());
return;
}
#ifdef FIZZING_TESTING_DISABLE_HANDSHAKE
}
#endif
#endif
ClientPacketParser packet_parser{buffer};
if(!packet_parser.valid())
return PacketDecodeResult::INVALID_PACKET;
assert(packet_parser.type() >= 0 && packet_parser.type() < this->incoming_generation_estimators.size());
packet_parser.set_estimated_generation(this->incoming_generation_estimators[packet_parser.type()].visit_packet(packet_parser.packet_id()));
auto is_command = packet_parser.type() == protocol::COMMAND || packet_parser.type() == protocol::COMMAND_LOW;
/* pretest if the packet is worth the effort of decoding it */
if(is_command) {
/* handle the order stuff */
auto& fragment_buffer = this->_command_fragment_buffers[PacketDecoder::command_fragment_buffer_index(packet_parser.type())];
unique_lock queue_lock(fragment_buffer.buffer_lock);
auto result = fragment_buffer.accept_index(packet_parser.packet_id());
if(result != 0) { /* packet index is ahead buffer index */
error = "pid: " + std::to_string(packet_parser.packet_id()) + ",";
error += "bidx: " + std::to_string(fragment_buffer.current_index()) + ",";
error += "bcap: " + std::to_string(fragment_buffer.capacity());
if(result == -1) { /* underflow */
/* we've already got the packet, but the client dosn't know that so we've to send the acknowledge again */
this->callback_send_acknowledge(packet_parser.packet_id(), packet_parser.type() == protocol::COMMAND_LOW);
return PacketDecodeResult::DUPLICATED_PACKET;
}
return PacketDecodeResult::BUFFER_OVERFLOW;
}
}
//NOTICE I found out that the Compressed flag is set if the packet contains an audio header
/* decrypt the packet if needed */
if(packet_parser.is_encrypted()) {
CryptHandler::key_t crypt_key{};
CryptHandler::nonce_t crypt_nonce{};
auto data = (uint8_t*) packet_parser.mutable_data_ptr();
bool use_default_key{!this->protocol_encrypted}, decrypt_result;
decrypt_packet:
if(use_default_key) {
crypt_key = CryptHandler::default_key;
crypt_nonce = CryptHandler::default_nonce;
} else {
if(!this->crypt_handler_->generate_key_nonce(true, packet_parser.type(), packet_parser.packet_id(), packet_parser.estimated_generation(), crypt_key, crypt_nonce))
return PacketDecodeResult::DECRYPT_KEY_GEN_FAILED;
}
decrypt_result = this->crypt_handler_->decrypt(
data + ClientPacketParser::kHeaderOffset, ClientPacketParser::kHeaderLength,
data + ClientPacketParser::kPayloadOffset, packet_parser.payload_length(),
data,
crypt_key, crypt_nonce,
error
);
if(!decrypt_result) {
if(packet_parser.packet_id() < 10 && packet_parser.estimated_generation() == 0) {
if(use_default_key) {
return PacketDecodeResult::DECRYPT_FAILED;
} else {
use_default_key = true;
goto decrypt_packet;
}
} else {
return PacketDecodeResult::DECRYPT_FAILED;
}
}
packet_parser.set_decrypted();
}
if(auto statistics{this->statistics_}; statistics)
statistics->logIncomingPacket(stats::ConnectionStatistics::category::from_type(packet_parser.type()), buffer.length());
#ifdef LOG_INCOMPING_PACKET_FRAGMENTS
debugMessage(lstream << CLIENT_LOG_PREFIX << "Recived packet. PacketId: " << packet->packetId() << " PacketType: " << packet->type().name() << " Flags: " << packet->flags() << " - " << packet->data() << endl);
#endif
if(is_command) {
auto& fragment_buffer = this->_command_fragment_buffers[command_fragment_buffer_index(packet_parser.type())];
CommandFragment fragment_entry{
packet_parser.packet_id(),
packet_parser.estimated_generation(),
packet_parser.flags(),
(uint32_t) packet_parser.payload_length(),
packet_parser.payload().own_buffer()
};
{
unique_lock queue_lock(fragment_buffer.buffer_lock);
if(!fragment_buffer.insert_index(packet_parser.packet_id(), std::move(fragment_entry)))
return PacketDecodeResult::COMMAND_INSTERT_FAILED;
}
this->callback_send_acknowledge(this->callback_argument, packet_parser.packet_id(), packet_parser.type() == protocol::COMMAND_LOW);
commands_pending = true;
} else {
this->callback_decoded_packet(this->callback_argument, packet_parser);
}
}
bool PacketDecoder::verify_encryption(const pipes::buffer_view &buffer) {
ClientPacketParser packet_parser{buffer};
if(!packet_parser.valid() || !packet_parser.is_encrypted()) return false;
assert(packet_parser.type() >= 0 && packet_parser.type() < this->incoming_generation_estimators.size());
return this->crypt_handler_->verify_encryption(buffer, packet_parser.packet_id(), this->incoming_generation_estimators[packet_parser.type()].generation());
}
CommandReassembleResult PacketDecoder::reassemble_command(pipes::buffer &result, bool &is_command_low) {
bool more_commands_pending{false};
command_fragment_buffer_t* buffer{nullptr};
unique_lock<std::recursive_timed_mutex> buffer_lock; /* general buffer lock */
{
//FIXME: Currently command low packets cant be handled if there is a command packet stuck in reassemble queue
/* handle commands before command low packets */
for(auto& buf : this->_command_fragment_buffers) {
unique_lock ring_lock(buf.buffer_lock, try_to_lock);
if(!ring_lock.owns_lock()) continue;
if(buf.front_set()) {
if(!buffer) { /* lets still test for reexecute */
buffer_lock = move(ring_lock);
buffer = &buf;
} else {
more_commands_pending = true;
break;
}
}
}
}
if(!buffer)
return CommandReassembleResult::NO_COMMANDS_PENDING;
uint8_t packet_flags{0};
pipes::buffer payload{};
/* lets find out if we've to reassemble the packet */
auto& first_buffer = buffer->slot_value(0);
if(first_buffer.packet_flags & PacketFlag::Fragmented) {
uint16_t sequence_length{1};
size_t total_payload_length{first_buffer.payload_length};
do {
if(sequence_length >= buffer->capacity())
return CommandReassembleResult::SEQUENCE_LENGTH_TOO_LONG;
if(!buffer->slot_set(sequence_length))
return CommandReassembleResult::NO_COMMANDS_PENDING; /* we need more packets */
auto& packet = buffer->slot_value(sequence_length++);
total_payload_length += packet.payload_length;
if(packet.packet_flags & PacketFlag::Fragmented) {
/* yep we find the end */
break;
}
} while(true);
/* ok we have all fragments lets reassemble */
/*
* Packet sequence could never be so long. If it is so then the data_length() returned an invalid value.
* We're checking it here because we dont want to make a huge allocation
*/
assert(total_payload_length < 512 * 1024 * 1024);
pipes::buffer packet_buffer{total_payload_length};
char* packet_buffer_ptr = &packet_buffer[0];
size_t packet_count{0};
packet_flags = buffer->slot_value(0).packet_flags;
while(packet_count < sequence_length) {
auto fragment = buffer->pop_front();
memcpy(packet_buffer_ptr, fragment.payload.data_ptr(), fragment.payload_length);
packet_buffer_ptr += fragment.payload_length;
packet_count++;
}
#ifndef _NDEBUG
if((packet_buffer_ptr - 1) != &packet_buffer[packet_buffer.length() - 1]) {
logCritical(0,
"Buffer over/underflow: packet_buffer_ptr != &packet_buffer[packet_buffer.length() - 1]; packet_buffer_ptr := {}; packet_buffer.end() := {}",
(void*) packet_buffer_ptr,
(void*) &packet_buffer[packet_buffer.length() - 1]
);
}
#endif
payload = packet_buffer;
} else {
auto packet = buffer->pop_front();
packet_flags = packet.packet_flags;
payload = packet.payload;
}
more_commands_pending |= buffer->front_set(); /* set the more flag if we have more to process */
buffer_lock.unlock();
if(packet_flags & PacketFlag::Compressed) {
std::string error{};
auto decompressed_size = compression::qlz_decompressed_size(payload.data_ptr(), payload.length());
auto uncompressed_buffer = buffer::allocate_buffer(decompressed_size);
if(!compression::qlz_decompress_payload(payload.data_ptr(), uncompressed_buffer.data_ptr(), &decompressed_size))
return CommandReassembleResult::COMMAND_DECOMPRESS_FAILED;
payload = uncompressed_buffer.range(0, decompressed_size);
}
result = std::move(payload);
return more_commands_pending ? CommandReassembleResult::MORE_COMMANDS_PENDING : CommandReassembleResult::SUCCESS;
}
void PacketDecoder::force_insert_command(const pipes::buffer_view &buffer) {
CommandFragment fragment_entry{
0,
0,
PacketFlag::Unencrypted,
(uint32_t) buffer.length(),
buffer.own_buffer()
};
{
auto& fragment_buffer = this->_command_fragment_buffers[command_fragment_buffer_index(protocol::COMMAND)];
unique_lock queue_lock(fragment_buffer.buffer_lock);
fragment_buffer.push_front(std::move(fragment_entry));
}
}
void PacketDecoder::register_initiv_packet() {
auto& fragment_buffer = this->_command_fragment_buffers[command_fragment_buffer_index(protocol::COMMAND)];
unique_lock buffer_lock(fragment_buffer.buffer_lock);
fragment_buffer.set_full_index_to(1); /* the first packet (0) is already the clientinitiv packet */
}

View File

@ -36,18 +36,59 @@ namespace ts::server::server::udp {
};
static_assert(sizeof(CommandFragment) == 8 + sizeof(pipes::buffer));
enum struct PacketDecodeResult {
SUCCESS,
INVALID_PACKET,
DUPLICATED_PACKET, /* error message contains debug properties */
BUFFER_OVERFLOW, /* error message contains debug properties */
DECRYPT_KEY_GEN_FAILED,
DECRYPT_FAILED, /* has custom message */
COMMAND_INSTERT_FAILED
};
enum struct CommandReassembleResult {
SUCCESS,
MORE_COMMANDS_PENDING, /* equal with success */
NO_COMMANDS_PENDING,
COMMAND_TOO_LARGE, /* this is a fatal error to the connection */
COMMAND_DECOMPRESS_FAILED,
SEQUENCE_LENGTH_TOO_LONG /* unrecoverable error */
};
class PacketDecoder {
typedef protocol::PacketRingBuffer<CommandFragment, 32, CommandFragment> command_fragment_buffer_t;
typedef std::array<command_fragment_buffer_t, 2> command_packet_reassembler;
public:
typedef std::function<void(const protocol::ClientPacketParser&)> callback_decoded_packet_t;
//typedef std::function<void(const protocol::ClientPacketParser&)> callback_decoded_packet_t;
//typedef std::function<void(uint16_t /* packet id */, bool /* is command low */)> callback_send_acknowledge_t;
//typedef std::function<void(PacketDecodeResult /* error */, const std::string& /* custom message */)> callback_decode_failed_t;
/* gets better optimized out */
typedef void(*callback_decoded_packet_t)(void* /* cb argument */, const protocol::ClientPacketParser&);
typedef void(*callback_send_acknowledge_t)(void* /* cb argument */, uint16_t /* packet id */, bool /* is command low */);
typedef void(*callback_decode_failed_t)(void* /* cb argument */, PacketDecodeResult /* error */, const std::string& /* custom message */);
PacketDecoder(connection::CryptHandler* /* crypt handler */, connection::CompressionHandler* /* compress handler */, connection::AcknowledgeManager* /* acknowledge handler */);
explicit PacketDecoder(connection::CryptHandler* /* crypt handler */);
~PacketDecoder();
void reset();
void decode_incoming_data(const pipes::buffer_view &/* buffer */);
bool verify_encryption(const pipes::buffer_view& /* full packet */);
/* true if commands might be pending */
bool decode_incoming_data(const pipes::buffer_view &/* buffer */);
/* This method is not thread save! Only one concurrent call supported */
CommandReassembleResult reassemble_command(pipes::buffer& /* result */, bool& /* is command low */);
void force_insert_command(const pipes::buffer_view& /* payload */);
void register_initiv_packet();
[[nodiscard]] inline std::shared_ptr<stats::ConnectionStatistics> get_statistics() { return this->statistics_; }
inline void set_statistics(const std::shared_ptr<stats::ConnectionStatistics>& stats) { this->statistics_ = stats; }
@ -55,18 +96,25 @@ namespace ts::server::server::udp {
[[nodiscard]] inline bool is_protocol_encrypted() const { return this->protocol_encrypted; }
void set_protocol_encrypted(bool flag) { this->protocol_encrypted = flag; }
callback_decoded_packet_t callback_decoded_packet{};
void* callback_argument{nullptr};
callback_decoded_packet_t callback_decoded_packet{[](auto, auto&){}}; /* needs to be valid all the time! */
callback_send_acknowledge_t callback_send_acknowledge{[](auto, auto, auto){}}; /* needs to be valid all the time! */
callback_decode_failed_t callback_decode_failed{nullptr};
private:
bool protocol_encrypted{false};
std::shared_ptr<stats::ConnectionStatistics> statistics_{nullptr};
connection::CryptHandler* crypt_handler_{nullptr};
connection::CompressionHandler* compress_handler_{nullptr};
connection::AcknowledgeManager* acknowledge_handler_{nullptr};
std::array<protocol::generation_estimator, 9> incoming_generation_estimators{}; /* implementation is thread save */
std::recursive_mutex packet_buffer_lock;
command_packet_reassembler _command_fragment_buffers;
static inline uint8_t command_fragment_buffer_index(uint8_t packet_index) {
return packet_index & 0x1U; /* use 0 for command and 1 for command low */
}
PacketDecodeResult decode_incoming_data_(std::string& /* error */, bool& /* needs command reassemble */, const pipes::buffer_view &/* buffer */);
};
}

View File

@ -57,13 +57,13 @@ bool PacketEncoder::encode_packet(const std::shared_ptr<protocol::ServerPacket>
auto encode_result = this->encode_packet_(error, buffers, packet, work_lock);
if(encode_result != PacketEncodeResult::SUCCESS) {
if(auto callback{this->callback_encode_failed}; callback)
callback(original_packet, encode_result, error);
callback(this->callback_argument, original_packet, encode_result, error);
goto sync_cleanup_exit;
}
}
if(auto callback{this->callback_encoded_buffers}; callback)
callback(buffers);
callback(this->callback_argument, buffers);
sync_cleanup_exit:
this->process_count--; /* we're now done preparing */
@ -110,7 +110,7 @@ bool PacketEncoder::do_encode() {
if(auto errc = this->encode_packet_(error, buffers, packet, work_lock); errc != PacketEncodeResult::SUCCESS) {
if(auto callback{this->callback_encode_failed}; callback)
callback(packet, errc, error);
callback(this->callback_argument, packet, errc, error);
if(flag_more)
break;
else
@ -126,7 +126,7 @@ bool PacketEncoder::do_encode() {
/* enqueue buffers for write */
if(!buffers.empty()) {
if(auto callback{this->callback_encoded_buffers}; callback)
callback(buffers);
callback(this->callback_argument, buffers);
}
this->process_count--; /* we're now done preparing */

View File

@ -68,8 +68,11 @@ namespace ts::server::server::udp {
sync = 0x02 /* directly process the packet */
};
typedef std::function<void(const std::vector<pipes::buffer>& /* buffers */)> callback_encoded_buffers_t;
typedef std::function<void(const std::shared_ptr<protocol::ServerPacket> &/* the packet */, PacketEncodeResult& /* error */, std::string& /* custom message */)> callback_encode_failed_t;
//typedef std::function<void(const std::vector<pipes::buffer>& /* buffers */)> callback_encoded_buffers_t;
//typedef std::function<void(const std::shared_ptr<protocol::ServerPacket> &/* the packet */, PacketEncodeResult /* error */, std::string& /* custom message */)> callback_encode_failed_t;
/* gets better optimized out */
typedef void(*callback_encoded_buffers_t)(void* /* cb argument */, const std::vector<pipes::buffer>& /* buffers */);
typedef void(*callback_encode_failed_t)(void* /* cb argument */, const std::shared_ptr<protocol::ServerPacket> &/* the packet */, PacketEncodeResult /* error */, const std::string& /* custom message */);
PacketEncoder(connection::CryptHandler* /* crypt handler */, connection::CompressionHandler* /* compress handler */, connection::AcknowledgeManager* /* acknowledge handler */);
~PacketEncoder();
@ -86,6 +89,7 @@ namespace ts::server::server::udp {
[[nodiscard]] inline bool is_protocol_encrypted() const { return this->protocol_encrypted; }
void set_protocol_encrypted(bool flag) { this->protocol_encrypted = flag; }
void* callback_argument{nullptr};
callback_encoded_buffers_t callback_encoded_buffers{};
callback_encode_failed_t callback_encode_failed{};
private:

View File

@ -0,0 +1,7 @@
//
// Created by WolverinDEV on 11/03/2020.
//
#include "PingHandler.h"
using namespace ts::server::server::udp;

View File

@ -0,0 +1,35 @@
#pragma once
#include <cstdint>
#include <chrono>
namespace ts::server::server::udp {
class PingHandler {
public:
typedef void(*callback_time_outed_t)(void* /* cb data */);
typedef void(*callback_send_ping_t)(void* /* cb data */, uint16_t& /* ping id */);
typedef void(*callback_send_recovery_command_t)(void* /* cb data */);
void reset();
void tick();
void received_ping(uint16_t /* ping id */);
void command_packet_acknowledged();
[[nodiscard]] inline std::chrono::milliseconds current_ping() const { return this->current_ping_; }
void* callback_argument{nullptr};
callback_send_ping_t callback_send_ping{nullptr};
callback_send_recovery_command_t callback_send_recovery_command{nullptr};
callback_time_outed_t callback_time_outed{nullptr};
private:
uint16_t last_ping_id{0};
std::chrono::milliseconds current_ping_{0};;
std::chrono::system_clock::time_point last_response_{};
std::chrono::system_clock::time_point last_request_{};
std::chrono::system_clock::time_point last_command_packet_{};
std::chrono::system_clock::time_point last_recovery_command_send{};
};
}

View File

@ -82,6 +82,7 @@ void VoiceClient::sendAcknowledge(uint16_t packetId, bool low) {
void VoiceClient::tick(const std::chrono::system_clock::time_point &time) {
SpeakingClient::tick(time);
this->connection->tick();
{
ALARM_TIMER(A1, "VoiceClient::tick", milliseconds(3));
if(this->state == ConnectionState::CONNECTED) {

View File

@ -59,7 +59,6 @@ namespace ts {
/* Note: Order is only guaranteed if progressDirectly is on! */
virtual void sendCommand0(const std::string_view& /* data */, bool low = false, bool progressDirectly = false, std::unique_ptr<threads::Future<bool>> listener = nullptr);
virtual void sendAcknowledge(uint16_t packetId, bool low = false);
connection::VoiceClientConnection* getConnection(){ return connection; }
std::shared_ptr<VoiceServer> getVoiceServer(){ return voice_server; }
@ -74,11 +73,6 @@ namespace ts {
virtual void tick(const std::chrono::system_clock::time_point &time) override;
void handlePacketCommand(const pipes::buffer_view&);
void handlePacketAck(const protocol::ClientPacketParser&);
void handlePacketVoice(const protocol::ClientPacketParser&);
void handlePacketPing(const protocol::ClientPacketParser&);
void handlePacketInit(const protocol::ClientPacketParser&);
//Handshake helpers
@ -89,9 +83,6 @@ namespace ts {
protected:
virtual command_result handleCommand(Command &command) override;
//Some helper method
void sendPingRequest();
//Ping/pong
uint16_t lastPingId = 0;
std::chrono::milliseconds ping = std::chrono::milliseconds(0);
@ -119,7 +110,6 @@ namespace ts {
struct {
bool client_init = false;
bool new_protocol = false;
bool protocol_encrypted = false;
uint32_t client_time = 0;
std::string alpha;

View File

@ -1,12 +1,11 @@
#include <log/LogUtils.h>
#include <misc/endianness.h>
#include <misc/base64.h>
#include <ThreadPool/Timer.h>
#include <openssl/sha.h>
#include <misc/digest.h>
#include <src/client/SpeakingClient.h>
#include "../../InstanceHandler.h"
#include "../../geo/GeoLocation.h"
#include "VoiceClient.h"
using namespace std;

View File

@ -5,22 +5,8 @@
#include <misc/memtracker.h>
#include <protocol/Packet.h>
#include <ThreadPool/Timer.h>
#include "VoiceClientConnection.h"
#include "src/client/ConnectedClient.h"
#include "VoiceClient.h"
//#define LOG_AUTO_ACK_AUTORESPONSE
//#define FUZZING_TESTING_INCOMMING
//#define FUZZING_TESTING_OUTGOING
//#define FIZZING_TESTING_DISABLE_HANDSHAKE
#define FUZZING_TESTING_DROP 8
#define FUZZING_TESTING_DROP_MAX 10
//#define CONNECTION_NO_STATISTICS
#define QLZ_COMPRESSION_LEVEL 1
#include "qlz/QuickLZ.h"
#include "./VoiceClientConnection.h"
#include "./VoiceClient.h"
using namespace std;
using namespace std::chrono;
@ -31,23 +17,47 @@ using namespace ts::server;
VoiceClientConnection::VoiceClientConnection(VoiceClient* client) :
packet_encoder_{&this->crypt_handler, &this->compress_handler, &this->acknowledge_handler},
packet_decoder_{&this->crypt_handler, &this->compress_handler, &this->acknowledge_handler} {
packet_decoder_{&this->crypt_handler} {
memtrack::allocated<VoiceClientConnection>(this);
this->packet_encoder_.callback_encoded_buffers = std::bind(&VoiceClientConnection::handle_encoded_buffers, this, std::placeholders::_1);
this->packet_encoder_.callback_encode_failed = std::bind(&VoiceClientConnection::handle_encode_error, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);
this->packet_encoder_.callback_argument = this;
this->packet_encoder_.callback_encoded_buffers = [](auto _this, const auto& a1) {
reinterpret_cast<VoiceClientConnection*>(_this)->handle_encoded_buffers(a1);
};
this->packet_encoder_.callback_encode_failed = [](auto _this, const auto& a1, auto a2, const auto& a3) {
reinterpret_cast<VoiceClientConnection*>(_this)->handle_encode_error(a1, a2, a3);
};
this->packet_decoder_.callback_argument = this;
this->packet_decoder_.callback_decoded_packet = [](auto _this, const auto& a1) {
reinterpret_cast<VoiceClientConnection*>(_this)->handle_decoded_packet(a1);
};
this->packet_decoder_.callback_decode_failed = [](auto _this, auto a1, const auto& a2) {
reinterpret_cast<VoiceClientConnection*>(_this)->handle_decode_error(a1, a2);
};
this->packet_decoder_.callback_send_acknowledge = [](auto _this, auto a1, auto a2) {
reinterpret_cast<VoiceClientConnection*>(_this)->send_packet_acknowledge(a1, a2);
};
this->ping_handler_.callback_argument = this;
this->ping_handler_.callback_send_ping = [](auto _this, auto& a1) {
reinterpret_cast<VoiceClientConnection*>(_this)->send_packet_ping(a1);
};
//TODO: The two other callbacks!
this->server_id = client->getServerId();
this->client_handle = client;
this->crypt_handler.reset();
debugMessage(client->getServer()->getServerId(), "Allocated new voice client connection at {}", (void*) this);
debugMessage(this->server_id, "Allocated new voice client connection at {}", (void*) this);
}
VoiceClientConnection::~VoiceClientConnection() {
debugMessage(this->server_id, "Deleted voice client connection at {}", (void*) this);
/* locking here should be useless, but just to ensure! */
{
lock_guard write_queue_lock(this->write_queue_lock);
lock_guard wqlock(this->write_queue_lock);
this->write_queue.clear();
}
@ -71,435 +81,187 @@ void VoiceClientConnection::register_client_for_write() {
//Message handle methods
void VoiceClientConnection::handle_incoming_datagram(const pipes::buffer_view& buffer) {
#ifdef FUZZING_TESTING_INCOMMING
#ifdef FIZZING_TESTING_DISABLE_HANDSHAKE
if (this->client->state == ConnectionState::CONNECTED) {
#endif
if ((rand() % FUZZING_TESTING_DROP_MAX) < FUZZING_TESTING_DROP) {
debugMessage(this->client->getServerId(), "{}[FUZZING] Dropping incoming packet of length {}", CLIENT_STR_LOG_PREFIX_(this->client), buffer.length());
return;
}
#ifdef FIZZING_TESTING_DISABLE_HANDSHAKE
}
#endif
#endif
auto command_pending = this->packet_decoder_.decode_incoming_data(buffer);
if(command_pending) {
std::shared_lock clock{this->client_mutex};
if(!this->client_handle) return; //TODO: Warn etc?
ClientPacketParser packet_parser{buffer};
if(!packet_parser.valid()) {
logTrace(this->client->getServerId(), "{} Received invalid packet. Dropping.", CLIENT_STR_LOG_PREFIX_(this->client));
return;
}
assert(packet_parser.type() >= 0 && packet_parser.type() < this->incoming_generation_estimators.size());
packet_parser.set_estimated_generation(this->incoming_generation_estimators[packet_parser.type()].visit_packet(packet_parser.packet_id()));
auto is_command = packet_parser.type() == protocol::COMMAND || packet_parser.type() == protocol::COMMAND_LOW;
/* pretest if the packet is worth the effort of decoding it */
if(is_command) {
/* handle the order stuff */
auto& fragment_buffer = this->_command_fragment_buffers[command_fragment_buffer_index(packet_parser.type())];
unique_lock queue_lock(fragment_buffer.buffer_lock);
auto result = fragment_buffer.accept_index(packet_parser.packet_id());
if(result != 0) { /* packet index is ahead buffer index */
debugMessage(this->client->getServerId(), "{} Dropping command packet because command assembly buffer has an {} ({}|{}|{})",
CLIENT_STR_LOG_PREFIX_(this->client),
result == -1 ? "underflow" : "overflow",
fragment_buffer.capacity(),
fragment_buffer.current_index(),
packet_parser.packet_id()
);
if(result == -1) { /* underflow */
/* we've already got the packet, but the client dosn't know that so we've to send the acknowledge again */
if(this->client->crypto.protocol_encrypted)
this->client->sendAcknowledge(packet_parser.packet_id(), packet_parser.type() == protocol::COMMAND_LOW);
}
return;
}
}
//NOTICE I found out that the Compressed flag is set if the packet contains an audio header
if(this->client->state == ConnectionState::INIT_LOW && packet_parser.type() != protocol::INIT1)
return;
/* decrypt the packet if needed */
if(packet_parser.is_encrypted()) {
std::string error;
CryptHandler::key_t crypt_key{};
CryptHandler::nonce_t crypt_nonce{};
auto data = (uint8_t*) packet_parser.mutable_data_ptr();
bool use_default_key{!this->client->crypto.protocol_encrypted}, decrypt_result;
decrypt_packet:
if(use_default_key) {
crypt_key = CryptHandler::default_key;
crypt_nonce = CryptHandler::default_nonce;
} else {
if(!this->crypt_handler.generate_key_nonce(true, packet_parser.type(), packet_parser.packet_id(), packet_parser.estimated_generation(), crypt_key, crypt_nonce)) {
logError(this->client->getServerId(), "{} Failed to generate crypt key/nonce. This should never happen! Dropping packet.", CLIENT_STR_LOG_PREFIX_(this->client));
return;
}
}
decrypt_result = this->crypt_handler.decrypt(
data + ClientPacketParser::kHeaderOffset, ClientPacketParser::kHeaderLength,
data + ClientPacketParser::kPayloadOffset, packet_parser.payload_length(),
data,
crypt_key, crypt_nonce,
error
);
if(!decrypt_result) {
if(!this->client->crypto.client_init) {
if(use_default_key) {
logTrace(this->client->getServerId(), "{} Failed to decrypt packet with default key ({}). Dropping packet.", CLIENT_STR_LOG_PREFIX_(this->client), error);
return;
} else {
logTrace(this->client->getServerId(), "{} Failed to decrypt packet ({}). Trying with default key.", CLIENT_STR_LOG_PREFIX_(this->client), error);
use_default_key = true;
goto decrypt_packet;
}
} else {
logTrace(this->client->getServerId(), "{} Failed to decrypt packet ({}). Dropping packet.", CLIENT_STR_LOG_PREFIX_(this->client), error);
return;
}
}
packet_parser.set_decrypted();
} else if(is_command && this->client->state != ConnectionState::INIT_HIGH) {
logTrace(this->client->getServerId(), "{} Voice client {}/{} tried to send a unencrypted command packet. Dropping packet.", CLIENT_STR_LOG_PREFIX_(this->client), client->getDisplayName(), this->client->getLoggingPeerIp());
return;
}
#ifndef CONNECTION_NO_STATISTICS
if(this->client && this->client->getServer())
this->client->connectionStatistics->logIncomingPacket(stats::ConnectionStatistics::category::from_type(packet_parser.type()), buffer.length());
#endif
#ifdef LOG_INCOMPING_PACKET_FRAGMENTS
debugMessage(lstream << CLIENT_LOG_PREFIX << "Recived packet. PacketId: " << packet->packetId() << " PacketType: " << packet->type().name() << " Flags: " << packet->flags() << " - " << packet->data() << endl);
#endif
if(is_command) {
auto& fragment_buffer = this->_command_fragment_buffers[command_fragment_buffer_index(packet_parser.type())];
CommandFragment fragment_entry{
packet_parser.packet_id(),
packet_parser.estimated_generation(),
packet_parser.flags(),
(uint32_t) packet_parser.payload_length(),
packet_parser.payload().own_buffer()
};
{
unique_lock queue_lock(fragment_buffer.buffer_lock);
if(!fragment_buffer.insert_index(packet_parser.packet_id(), std::move(fragment_entry))) {
logTrace(this->client->getServerId(), "{} Failed to insert command packet into command packet buffer.", CLIENT_STR_LOG_PREFIX_(this->client));
return;
}
}
this->client->sendAcknowledge(packet_parser.packet_id(), packet_parser.type() == protocol::COMMAND_LOW);
auto voice_server = this->client->voice_server;
auto voice_server = this->client_handle->voice_server;
if(voice_server)
voice_server->schedule_command_handling(this->client);
} else {
if(packet_parser.type() == protocol::VOICE || packet_parser.type() == protocol::VOICE_WHISPER)
this->client->handlePacketVoice(packet_parser);
else if(packet_parser.type() == protocol::ACK || packet_parser.type() == protocol::ACK_LOW)
this->client->handlePacketAck(packet_parser);
else if(packet_parser.type() == protocol::PING || packet_parser.type() == protocol::PONG)
this->client->handlePacketPing(packet_parser);
else {
logError(this->client->getServerId(), "{} Received hand decoded packet, but we've no method to handle it. Dropping packet.", CLIENT_STR_LOG_PREFIX_(this->client));
}
voice_server->schedule_command_handling(this->client_handle);
}
}
bool VoiceClientConnection::verify_encryption(const pipes::buffer_view &buffer /* incl. mac etc */) {
ClientPacketParser packet_parser{buffer};
if(!packet_parser.valid() || !packet_parser.is_encrypted()) return false;
bool VoiceClientConnection::verify_encryption(const pipes::buffer_view &buffer) {
return this->packet_decoder_.verify_encryption(buffer);
}
assert(packet_parser.type() >= 0 && packet_parser.type() < this->incoming_generation_estimators.size());
return this->crypt_handler.verify_encryption(buffer, packet_parser.packet_id(), this->incoming_generation_estimators[packet_parser.type()].generation());
void VoiceClientConnection::handle_decoded_packet(const ts::protocol::ClientPacketParser &packet) {
auto packet_type = packet.type();
if(packet_type == PacketType::VOICE ) {
std::shared_lock clock{this->client_mutex};
if(!this->client_handle) return; //TODO: Warn etc?
this->client_handle->handlePacketVoice(packet.payload(), (packet.flags() & PacketFlag::Compressed) > 0, (packet.flags() & PacketFlag::Fragmented) > 0);
} else if(packet_type == PacketType::VOICE_WHISPER) {
std::shared_lock clock{this->client_mutex};
if(!this->client_handle) return; //TODO: Warn etc?
this->client_handle->handlePacketVoice(packet.payload(), (packet.flags() & PacketFlag::Compressed) > 0, (packet.flags() & PacketFlag::Fragmented) > 0);
} else if(packet_type == PacketType::ACK || packet_type == PacketType::ACK_LOW) {
string error{};
if(!this->acknowledge_handler.process_acknowledge(packet.type(), packet.payload(), error))
debugMessage(this->server_id, "{} Failed to handle acknowledge: {}", this->client_log_prefix(), error);
} else if(packet_type == PacketType::PING) {
/* just send a pong response */
char buffer[2];
le2be16(packet.packet_id(), buffer);
auto pkt = make_shared<ServerPacket>(PacketTypeInfo::Pong, pipes::buffer_view{buffer, 2});
pkt->enable_flag(PacketFlag::Unencrypted);
this->send_packet(pkt);
} else if(packet_type == PacketType::PONG) {
if(packet.payload_length() < 2) return;
uint16_t ping_id = be2le16((char*) packet.payload().data_ptr());
//TODO: Ping handler handle ping
} else if(packet_type == PacketType::COMMAND || packet_type == PacketType::COMMAND_LOW) {
logCritical(this->server_id, "{} Received command packet within handle_decoded_packet callback.", this->client_log_prefix());
} else if(packet_type == PacketType::INIT1) {
logCritical(this->server_id, "{} Received init packet within handle_decoded_packet callback.", this->client_log_prefix());
}
}
void VoiceClientConnection::handle_decode_error(ts::server::server::udp::PacketDecodeResult error, const std::string &message) {
using PacketDecodeResult = ts::server::server::udp::PacketDecodeResult;
switch (error) {
case PacketDecodeResult::DECRYPT_FAILED:
logWarning(this->server_id, "{} Dropping incoming packet. Failed to decrypt packet ({}).", this->client_log_prefix(), message);
break;
case PacketDecodeResult::DECRYPT_KEY_GEN_FAILED:
logWarning(this->server_id, "{} Dropping incoming packet. Failed to generate crypto key for packet.", this->client_log_prefix());
break;
case PacketDecodeResult::BUFFER_OVERFLOW:
logWarning(this->server_id, "{} Dropping incoming packet because queue has a buffer overflow ({}).", this->client_log_prefix(), message);
break;
case PacketDecodeResult::COMMAND_INSTERT_FAILED:
logWarning(this->server_id, "{} Dropping incoming packet because we failed to register the command packet.", this->client_log_prefix());
break;
#if 0
case PacketDecodeResult::DUPLICATED_PACKET:
logWarning(this->server_id, "{} Dropping incoming packet because it has already be processed.", this->client_log_prefix());
break;
case PacketDecodeResult::INVALID_PACKET:
logWarning(this->server_id, "{} Dropping incoming packet because its invalid.", this->client_log_prefix());
break;
#else
case PacketDecodeResult::INVALID_PACKET:
case PacketDecodeResult::DUPLICATED_PACKET:
#endif
case PacketDecodeResult::SUCCESS:
break;
}
}
void VoiceClientConnection::send_packet_acknowledge(uint16_t packet_id, bool command_low) {
char buffer[2];
le2be16(packet_id, buffer);
auto packet = make_shared<protocol::ServerPacket>(command_low ? protocol::PacketTypeInfo::AckLow : protocol::PacketTypeInfo::Ack, pipes::buffer_view{buffer, 2});
packet->enable_flag(PacketFlag::Unencrypted);
if(!command_low) packet->enable_flag(protocol::PacketFlag::NewProtocol);
this->send_packet(packet);
#ifdef PKT_LOG_ACK
logTrace(this->getServerId(), "{}[Acknowledge][Server -> Client] Sending acknowledge for {}", CLIENT_STR_LOG_PREFIX, packetId);
#endif
}
void VoiceClientConnection::send_packet_ping(uint16_t& ping_id) {
auto packet = make_shared<ServerPacket>(PacketTypeInfo::Ping, pipes::buffer_view{});
packet->enable_flag(PacketFlag::Unencrypted);
this->send_packet(packet, false, true); /* prepare directly so the packet gets a packet id */
ping_id = packet->packetId();
}
void VoiceClientConnection::execute_handle_command_packets(const std::chrono::system_clock::time_point& /* scheduled */) {
if(this->client->state >= ConnectionState::DISCONNECTING || !this->client->getServer())
if((int) this->connection_state_ >= (int) ClientConnectionState::DISCONNECTING)
return;
//TODO: Remove the buffer_execute_lock and use the one within the this->client->handlePacketCommand method
unique_lock<std::recursive_timed_mutex> buffer_execute_lock;
std::shared_lock clock{this->client_handle};
if(!this->client_handle) return; //TODO: Warn etc?
using CommandReassembleResult = ts::server::server::udp::CommandReassembleResult;
pipes::buffer payload{};
uint16_t packet_id{};
auto reexecute_handle = this->next_reassembled_command(buffer_execute_lock, payload, packet_id);
bool command_low{false};
auto command_status = this->packet_decoder_.reassemble_command(payload, command_low);
switch (command_status) {
case CommandReassembleResult::SUCCESS:
case CommandReassembleResult::MORE_COMMANDS_PENDING:
break;
if(!payload.empty()){
auto startTime = system_clock::now();
try {
this->client->handlePacketCommand(payload);
} catch (std::exception& ex) {
logCritical(this->client->getServerId(), "{} Exception reached root tree! {}", CLIENT_STR_LOG_PREFIX_(this->client), ex.what());
}
case CommandReassembleResult::NO_COMMANDS_PENDING:
return;
auto end = system_clock::now();
if(end - startTime > milliseconds(10)) {
logError(this->client->getServerId(),
"{} Handling of command packet needs more than 10ms ({}ms)",
CLIENT_STR_LOG_PREFIX_(this->client),
duration_cast<milliseconds>(end - startTime).count()
);
}
}
if(buffer_execute_lock.owns_lock())
buffer_execute_lock.unlock();
auto voice_server = this->client->voice_server;
if(voice_server && reexecute_handle)
this->client->voice_server->schedule_command_handling(this->client);
}
/* buffer_execute_lock: lock for in order execution */
bool VoiceClientConnection::next_reassembled_command(unique_lock<std::recursive_timed_mutex>& buffer_execute_lock, pipes::buffer& result, uint16_t& packet_id) {
command_fragment_buffer_t* buffer{nullptr};
unique_lock<std::recursive_timed_mutex> buffer_lock; /* general buffer lock */
bool have_more{false};
{
//FIXME: Currently command low packets cant be handeled if there is a command packet stuck in reassamble
/* handle commands before command low packets */
for(auto& buf : this->_command_fragment_buffers) {
unique_lock ring_lock(buf.buffer_lock, try_to_lock);
if(!ring_lock.owns_lock()) continue;
if(buf.front_set()) {
if(!buffer) { /* lets still test for reexecute */
buffer_execute_lock = unique_lock(buf.execute_lock, try_to_lock);
if(!buffer_execute_lock.owns_lock()) continue;
buffer_lock = move(ring_lock);
buffer = &buf;
} else {
have_more = true;
break;
}
}
}
case CommandReassembleResult::COMMAND_DECOMPRESS_FAILED:
case CommandReassembleResult::COMMAND_TOO_LARGE:
case CommandReassembleResult::SEQUENCE_LENGTH_TOO_LONG:
//TODO: Shutdown connection?
logError(this->server_id, "{} Failed to reassemble next command ({}). This will cause the connection to fail.", this->client_log_prefix(), (int) command_status);
return;
}
if(!buffer)
return false; /* we've no packets */
uint8_t packet_flags{0};
pipes::buffer payload{};
/* lets find out if we've to reassemble the packet */
auto& first_buffer = buffer->slot_value(0);
packet_id = first_buffer.packet_id;
if(first_buffer.packet_flags & PacketFlag::Fragmented) {
uint16_t sequence_length{1};
size_t total_payload_length{first_buffer.payload_length};
do {
if(sequence_length >= buffer->capacity()) {
logError(this->client->getServerId(), "{} Command fragment buffer is full, and there is not fragmented packet end. Dropping full buffer which will probably cause a connection loss.", CLIENT_STR_LOG_PREFIX_(this->client));
buffer->clear();
return false; /* we've nothing to handle */
}
if(!buffer->slot_set(sequence_length))
return false; /* we need more packets */
auto& packet = buffer->slot_value(sequence_length++);
total_payload_length += packet.payload_length;
if(packet.packet_flags & PacketFlag::Fragmented) {
/* yep we find the end */
break;
}
} while(true);
/* ok we have all fragments lets reassemble */
/*
* Packet sequence could never be so long. If it is so then the data_length() returned an invalid value.
* We're checking it here because we dont want to make a huge allocation
*/
assert(total_payload_length < 512 * 1024 * 1024);
pipes::buffer packet_buffer{total_payload_length};
char* packet_buffer_ptr = &packet_buffer[0];
size_t packet_count{0};
packet_flags = buffer->slot_value(0).packet_flags;
while(packet_count < sequence_length) {
auto fragment = buffer->pop_front();
memcpy(packet_buffer_ptr, fragment.payload.data_ptr(), fragment.payload_length);
packet_buffer_ptr += fragment.payload_length;
packet_count++;
}
#ifndef _NDEBUG
if((packet_buffer_ptr - 1) != &packet_buffer[packet_buffer.length() - 1]) {
logCritical(this->client->getServer()->getServerId(),
"Buffer over/underflow: packet_buffer_ptr != &packet_buffer[packet_buffer.length() - 1]; packet_buffer_ptr := {}; packet_buffer.end() := {}",
(void*) packet_buffer_ptr,
(void*) &packet_buffer[packet_buffer.length() - 1]
);
}
#endif
payload = packet_buffer;
} else {
auto packet = buffer->pop_front();
packet_flags = packet.packet_flags;
payload = packet.payload;
auto startTime = system_clock::now();
try {
this->client_handle->handlePacketCommand(payload);
} catch (std::exception& ex) {
logCritical(this->server_id, "{} An exception has been thrown within command handling, which reached to root handler. This should not happen! (Message: {})", this->client_log_prefix(), ex.what());
}
have_more |= buffer->front_set(); /* set the more flag if we have more to process */
buffer_lock.unlock();
if(packet_flags & PacketFlag::Compressed) {
std::string error{};
auto decompressed_size = compression::qlz_decompressed_size(payload.data_ptr(), payload.length());
auto buffer = buffer::allocate_buffer(decompressed_size);
if(!compression::qlz_decompress_payload(payload.data_ptr(), buffer.data_ptr(), &decompressed_size)) {
logTrace(this->client->getServerId(), "{} Failed to decompress received command. Dropping packet.", CLIENT_STR_LOG_PREFIX_(this->client));
return false;
}
payload = buffer.range(0, decompressed_size);
auto end = system_clock::now();
if(end - startTime > milliseconds(10)) {
auto index = payload.find(" ");
std::string command{};
if(index == std::string::npos)
command = payload.string();
else
command = payload.view(0, index).string();
logWarning(this->server_id, "{} Command handling of command \"{}\" required more than 10ms ({}ms)",this->client_log_prefix(),
command,
std::chrono::duration_cast<std::chrono::milliseconds>(end - startTime).count()
);
}
result = std::move(payload);
return have_more;
if(command_status == CommandReassembleResult::MORE_COMMANDS_PENDING) {
auto voice_server = this->client_handle->voice_server;
if(voice_server)
voice_server->schedule_command_handling(this->client_handle);
}
}
void VoiceClientConnection::send_packet(const shared_ptr<protocol::ServerPacket>& original_packet, bool copy, bool prepare_directly) {
if(this->client->state == ConnectionState::DISCONNECTED)
if(this->connection_state_ == ClientConnectionState::DISCONNECTED)
return;
using EncodeFlags = server::server::udp::PacketEncoder::EncodeFlags;
int flags{EncodeFlags::none};
if(!copy)
flags |= EncodeFlags::no_copy;
flags |= (unsigned) EncodeFlags::no_copy;
if(prepare_directly)
flags |= EncodeFlags::sync;
flags |= (unsigned) EncodeFlags::sync;
if(this->packet_encoder_.encode_packet(original_packet, (EncodeFlags) flags))
this->register_client_for_write();
}
#if 0
bool VoiceClientConnection::encode_packet(vector<pipes::buffer> &result, const shared_ptr<ServerPacket> &packet, std::unique_lock<std::mutex>& work_lock) {
assert(work_lock.owns_lock());
string error = "success";
if(packet->type().compressable() && !packet->memory_state.fragment_entry) {
packet->enable_flag(PacketFlag::Compressed);
if(!this->compress_handler.progressPacketOut(packet.get(), error)) {
logError(this->getClient()->getServerId(), "{} Could not compress outgoing packet.\nThis could cause fatal failed for the client.\nError: {}", error);
return false;
}
}
std::vector<shared_ptr<ServerPacket>> fragments;
fragments.reserve((size_t) (packet->data().length() / packet->type().max_length()) + 1);
if(packet->data().length() > packet->type().max_length()) {
if(!packet->type().fragmentable()) {
logError(this->client->getServerId(), "{} We've tried to send a too long, not fragmentable, packet. Dropping packet of type {} with length {}", CLIENT_STR_LOG_PREFIX_(this->client), packet->type().name(), packet->data().length());
return false;
}
{ //Split packets
auto buffer = packet->data();
const auto max_length = packet->type().max_length();
while(buffer.length() > max_length * 2) {
fragments.push_back(make_shared<ServerPacket>(packet->type(), buffer.view(0, max_length).dup(buffer::allocate_buffer(max_length))));
buffer = buffer.range((size_t) max_length);
}
if(buffer.length() > max_length) { //Divide rest by 2
fragments.push_back(make_shared<ServerPacket>(packet->type(), buffer.view(0, buffer.length() / 2).dup(buffer::allocate_buffer(buffer.length() / 2))));
buffer = buffer.range(buffer.length() / 2);
}
fragments.push_back(make_shared<ServerPacket>(packet->type(), buffer));
for(const auto& frag : fragments) {
frag->setFragmentedEntry(true);
frag->enable_flag(PacketFlag::NewProtocol);
}
}
assert(fragments.size() >= 2);
fragments.front()->enable_flag(PacketFlag::Fragmented);
if(packet->has_flag(PacketFlag::Compressed))
fragments.front()->enable_flag(PacketFlag::Compressed);
fragments.back()->enable_flag(PacketFlag::Fragmented);
if(packet->getListener())
fragments.back()->setListener(std::move(packet->getListener())); //Move the listener to the last :)
} else {
fragments.push_back(packet);
}
result.reserve(fragments.size());
/* apply packet ids */
for(const auto& fragment : fragments) {
if(!fragment->memory_state.id_branded)
fragment->applyPacketId(this->packet_id_manager);
}
work_lock.unlock(); /* the rest could be unordered */
CryptHandler::key_t crypt_key{};
CryptHandler::nonce_t crypt_nonce{};
auto statistics = this->client ? this->client->connectionStatistics : nullptr;
for(const auto& fragment : fragments) {
if(fragment->has_flag(PacketFlag::Unencrypted)) {
this->crypt_handler.write_default_mac(fragment->mac().data_ptr());
} else {
if(!this->client->crypto.protocol_encrypted) {
crypt_key = CryptHandler::default_key;
crypt_nonce = CryptHandler::default_nonce;
} else {
if(!this->crypt_handler.generate_key_nonce(false, fragment->type().type(), fragment->packetId(), fragment->generationId(), crypt_key, crypt_nonce)) {
logError(this->client->getServerId(), "{} Failed to generate crypt key/nonce for sending a packet. This should never happen! Dropping packet.", CLIENT_STR_LOG_PREFIX_(this->client));
return false;
}
}
auto crypt_result = this->crypt_handler.encrypt(fragment->header().data_ptr(), fragment->header().length(),
fragment->data().data_ptr(), fragment->data().length(),
fragment->mac().data_ptr(),
crypt_key, crypt_nonce, error);
if(!crypt_result){
logError(this->client->getServerId(), "{} Failed to encrypt packet. Error: {}", CLIENT_STR_LOG_PREFIX_(this->client), error);
return false;
}
}
#ifndef CONNECTION_NO_STATISTICS
if(statistics)
statistics->logOutgoingPacket(*fragment);
#endif
this->acknowledge_handler.process_packet(*fragment);
result.push_back(fragment->buffer());
}
return true;
}
#endif
void VoiceClientConnection::handle_encode_error(const shared_ptr<protocol::ServerPacket> &packet,
ts::server::server::udp::PacketEncodeResult &result, std::string &message) {
ts::server::server::udp::PacketEncodeResult result, const std::string &message) {
using PacketEncodeResult = ts::server::server::udp::PacketEncodeResult;
switch (result) {
case PacketEncodeResult::PACKET_TOO_LARGE:
@ -596,42 +358,23 @@ bool VoiceClientConnection::wait_empty_write_and_prepare_queue(chrono::time_poin
void VoiceClientConnection::reset() {
this->packet_encoder_.reset();
this->packet_decoder_.reset();
this->acknowledge_handler.reset();
this->crypt_handler.reset();
{
lock_guard buffer_lock(this->packet_buffer_lock);
for(auto& buffer : this->_command_fragment_buffers)
buffer.reset();
}
}
void VoiceClientConnection::force_insert_command(const pipes::buffer_view &buffer) {
CommandFragment fragment_entry{
0,
0,
this->packet_decoder_.force_insert_command(buffer);
PacketFlag::Unencrypted,
(uint32_t) buffer.length(),
buffer.own_buffer()
};
std::shared_lock clock{this->client_handle};
if(!this->client_handle) return; //TODO: Warn etc?
{
auto& fragment_buffer = this->_command_fragment_buffers[command_fragment_buffer_index(protocol::COMMAND)];
unique_lock queue_lock(fragment_buffer.buffer_lock);
fragment_buffer.push_front(std::move(fragment_entry));
}
auto voice_server = this->client->voice_server;
auto voice_server = this->client_handle->voice_server;
if(voice_server)
voice_server->schedule_command_handling(this->client);
voice_server->schedule_command_handling(this->client_handle);
}
void VoiceClientConnection::register_initiv_packet() {
auto& fragment_buffer = this->_command_fragment_buffers[command_fragment_buffer_index(protocol::COMMAND)];
unique_lock buffer_lock(fragment_buffer.buffer_lock);
fragment_buffer.set_full_index_to(1); /* the first packet (0) is already the clientinitiv packet */
void VoiceClientConnection::tick() {
//TODO: Tick ping handler
}

View File

@ -17,6 +17,7 @@
#include <protocol/generation.h>
#include "./PacketEncoder.h"
#include "./PacketDecoder.h"
#include "./PingHandler.h"
//#define LOG_ACK_SYSTEM
#ifdef LOG_ACK_SYSTEM
@ -33,24 +34,6 @@ namespace ts {
}
namespace connection {
struct CommandFragment {
uint16_t packet_id{0};
uint16_t packet_generation{0};
uint8_t packet_flags{0};
uint32_t payload_length : 24;
pipes::buffer payload{};
CommandFragment() { this->payload_length = 0; }
CommandFragment(uint16_t packetId, uint16_t packetGeneration, uint8_t packetFlags, uint32_t payloadLength, pipes::buffer payload)
: packet_id{packetId}, packet_generation{packetGeneration}, packet_flags{packetFlags}, payload_length{payloadLength}, payload{std::move(payload)} {}
CommandFragment& operator=(const CommandFragment&) = default;
CommandFragment(const CommandFragment& other) = default;
CommandFragment(CommandFragment&&) = default;
};
static_assert(sizeof(CommandFragment) == 8 + sizeof(pipes::buffer));
enum struct WriteBufferStatus {
EMPTY,
BUFFERS_LEFT,
@ -60,13 +43,17 @@ namespace ts {
UNSET
};
enum struct ClientConnectionState {
INITIALITZING, /* crypto setup */
CONNECTED, /* basic connection has been established */
DISCONNECTING, /* connection is already disconnecting */
DISCONNECTED /* connection has been (maybe successfully) closed */
};
class VoiceClientConnection {
friend class server::VoiceServer;
friend class server::VoiceClient;
public:
typedef protocol::PacketRingBuffer<CommandFragment, 32, CommandFragment> command_fragment_buffer_t;
typedef std::array<command_fragment_buffer_t, 2> command_packet_reassembler;
explicit VoiceClientConnection(server::VoiceClient*);
virtual ~VoiceClientConnection();
@ -81,15 +68,20 @@ namespace ts {
*/
bool encode_packets();
[[nodiscard]] inline server::server::udp::PacketEncoder& packet_encoder() { return this->packet_encoder_; }
[[nodiscard]] inline server::server::udp::PacketDecoder& packet_decoder() { return this->packet_decoder_; }
/* return 2 => Nothing | 1 => More and buffer is set | 0 => Buffer is set, nothing more */
[[nodiscard]] WriteBufferStatus pop_write_buffer(pipes::buffer& /* buffer */);
bool wait_empty_write_and_prepare_queue(std::chrono::time_point<std::chrono::system_clock> until = std::chrono::time_point<std::chrono::system_clock>());
void reset();
void tick();
void force_insert_command(const pipes::buffer_view& /* payload */);
void register_initiv_packet();
void send_packet_acknowledge(uint16_t /* packet id */, bool /* is command low */);
void send_packet_ping(uint16_t& /* ping id */);
protected:
void handle_incoming_datagram(const pipes::buffer_view &buffer);
bool verify_encryption(const pipes::buffer_view& /* full packet */);
@ -100,34 +92,30 @@ namespace ts {
std::shared_mutex client_mutex{};
server::VoiceClient* client_handle{nullptr};
ClientConnectionState connection_state_{ClientConnectionState::INITIALITZING};
CryptHandler crypt_handler{};
CompressionHandler compress_handler{};
AcknowledgeManager acknowledge_handler{};
//Handle stuff
void execute_handle_command_packets(const std::chrono::system_clock::time_point& /* scheduled */);
bool next_reassembled_command(std::unique_lock<std::recursive_timed_mutex> &buffer_execute_lock /* packet channel execute lock */, pipes::buffer & /* buffer*/, uint16_t& /* packet id */);
/* ---------- Write declarations ---------- */
spin_lock write_queue_lock; /* queue access isn't for long in general */
std::deque<pipes::buffer> write_queue;
server::server::udp::PacketEncoder packet_encoder_;
server::server::udp::PacketDecoder packet_decoder_;
server::server::udp::PingHandler ping_handler_{};
//Handle stuff
[[nodiscard]] std::string client_log_prefix();
void execute_handle_command_packets(const std::chrono::system_clock::time_point& /* scheduled */);
/* will be called on the IO thread or if sync has been set directly in any thread */
void handle_encode_error(const std::shared_ptr<protocol::ServerPacket> &/* the packet */, ts::server::server::udp::PacketEncodeResult& /* error */, std::string& /* custom message */);
void handle_encode_error(const std::shared_ptr<protocol::ServerPacket> &/* the packet */, ts::server::server::udp::PacketEncodeResult /* error */, const std::string& /* custom message */);
void handle_encoded_buffers(const std::vector<pipes::buffer>& /* buffers */);
/* will be called on the IO thread */
void handle_decoded_packet(const protocol::ClientPacketParser&);
void handle_decode_error();
static inline uint8_t command_fragment_buffer_index(uint8_t packet_index) {
return packet_index & 0x1U; /* use 0 for command and 1 for command low */
}
[[nodiscard]] std::string client_log_prefix();
void handle_decode_error(ts::server::server::udp::PacketDecodeResult /* error */, const std::string& /* custom message */);
};
}
}

View File

@ -69,8 +69,8 @@ ts::command_result VoiceClient::handleCommandClientInitIv(Command& command) {
state_lock.unlock();
this->connection->reset();
this->connection->register_initiv_packet();
this->crypto.protocol_encrypted = false;
this->connection->packet_decoder().register_initiv_packet();
this->connection->packet_decoder().set_protocol_encrypted(false);
bool use_teaspeak = command.hasParm("teaspeak");
if(use_teaspeak ? !config::server::clients::teaspeak : !config::server::clients::teamspeak)
@ -175,7 +175,9 @@ ts::command_result VoiceClient::handleCommandClientInitIv(Command& command) {
logError(this->server->getServerId(), "Could not setup shared secret! (" + error + ")");
return ts::command_result{error::vs_critical};
}
this->crypto.protocol_encrypted = true;
auto& decoder = this->connection->packet_decoder();
decoder.set_protocol_encrypted(true);
}
}
return ts::command_result{error::ok};
@ -190,7 +192,9 @@ ts::command_result VoiceClient::handleCommandClientEk(Command& cmd) {
this->connection->getCryptHandler()->setupSharedSecretNew(this->crypto.alpha, this->crypto.beta, (char*) private_key.data(), client_key.data());
this->connection->acknowledge_handler.reset();
this->crypto.protocol_encrypted = true;
this->sendAcknowledge(1); //Send the encrypted acknowledge (most the times the second packet; If not we're going into the resend loop)
auto& decoder = this->connection->packet_decoder();
decoder.set_protocol_encrypted(true);
this->connection->send_packet_acknowledge(1, false); //Send the encrypted acknowledge (most the times the second packet; If not we're going into the resend loop)
return ts::command_result{error::ok};
}

View File

@ -1,6 +1,4 @@
#include <tommath.h>
#include <misc/endianness.h>
#include <algorithm>
#include <log/LogUtils.h>
#include "../web/WebClient.h"
#include "VoiceClient.h"
@ -11,10 +9,6 @@ using namespace ts::server;
using namespace ts::protocol;
//#define PKT_LOG_PING
/* should never happen! */
void VoiceClient::handlePacketInit(const ts::protocol::ClientPacketParser &) {}
//TODO Packet handlers -> move back to voice client?
void VoiceClient::handlePacketCommand(const pipes::buffer_view& command_string) {
std::unique_ptr<Command> command;
command_result result{};
@ -40,48 +34,4 @@ void VoiceClient::handlePacketCommand(const pipes::buffer_view& command_string)
handle_error:
this->notifyError(result);
result.release_details();
}
void VoiceClient::handlePacketPing(const protocol::ClientPacketParser& packet) {
if (packet.type() == protocol::PONG) {
if(packet.payload_length() < 2) return;
uint16_t id = be2le16((char*) packet.payload().data_ptr());
if (this->lastPingId == id) {
#ifdef PKT_LOG_PING
logMessage(this->getServerId(), "{}[Ping] Got a valid pong for ping {}. Required time: {}", CLIENT_STR_LOG_PREFIX, id, duration_cast<microseconds>(system_clock::now() - this->lastPingRequest).count() / 1000.f);
#endif
this->lastPingResponse = system_clock::now();
this->ping = std::chrono::duration_cast<std::chrono::milliseconds>(this->lastPingResponse - this->lastPingRequest);
}
#ifdef PKT_LOG_PING
else {
logMessage(this->getServerId(), "{}[Ping] Got invalid pong. (Responded pong id {} but expected {})", CLIENT_STR_LOG_PREFIX, packet->packetId(), this->lastPingId);
}
#endif
return;
}
#ifdef PKT_LOG_PING
logMessage(this->getServerId(), "{}[Ping] Sending pong for client requested ping {}", CLIENT_STR_LOG_PREFIX, packet->packetId());
#endif
char buffer[2];
le2be16(packet.packet_id(), buffer);
auto pkt = make_shared<ServerPacket>(PacketTypeInfo::Pong, pipes::buffer_view{buffer, 2});
pkt->enable_flag(PacketFlag::Unencrypted);
this->connection->send_packet(pkt);
}
void VoiceClient::handlePacketVoice(const protocol::ClientPacketParser& packet) {
if (packet.type() == protocol::VOICE) {
SpeakingClient::handlePacketVoice(packet.payload(), (packet.flags() & PacketFlag::Compressed) > 0, (packet.flags() & PacketFlag::Fragmented) > 0);
} else if(packet.type() == protocol::VOICE_WHISPER) {
SpeakingClient::handlePacketVoiceWhisper(packet.payload(), (packet.flags() & PacketFlag::NewProtocol) > 0);
}
}
void VoiceClient::handlePacketAck(const protocol::ClientPacketParser& packet) {
string error{};
if(!this->connection->acknowledge_handler.process_acknowledge(packet.type(), packet.payload(), error))
debugMessage(this->getServerId(), "{} Failed to handle acknowledge: {}", CLIENT_STR_LOG_PREFIX, error);
}

View File

@ -1,24 +0,0 @@
#include <Definitions.h>
#include <log/LogUtils.h>
#include "../../InstanceHandler.h"
#include "VoiceClient.h"
using namespace std;
using namespace ts::server;
using namespace ts::protocol;
extern InstanceHandler* serverInstance;
void VoiceClient::sendPingRequest() {
this->lastPingRequest = std::chrono::system_clock::now();
auto packet = make_shared<ServerPacket>(PacketTypeInfo::Ping, pipes::buffer_view{});
packet->enable_flag(PacketFlag::Unencrypted);
this->connection->send_packet(packet, false, true); /* prepare directly so the packet gets a packet id */
this->lastPingId = packet->packetId();
#ifdef PKT_LOG_PING
logMessage(this->getServerId(), "{}[Ping] Sending a ping request with it {}", CLIENT_STR_LOG_PREFIX, this->lastPingId);
#endif
}

2
shared

@ -1 +1 @@
Subproject commit 2ffa12489d4c7b16789ec2a93d82d02ee412b264
Subproject commit 62292af022798db2aba9ae5aa69aebbb849fb75a