A lot of updates (Speed improvement)
This commit is contained in:
@@ -32,23 +32,16 @@ using namespace ts::server;
|
||||
VoiceClientConnection::VoiceClientConnection(VoiceClient* client) : client(client) {
|
||||
memtrack::allocated<VoiceClientConnection>(this);
|
||||
|
||||
this->acknowledge_handler.destroy_packet = [](void* packet) {
|
||||
reinterpret_cast<OutgoingServerPacket*>(packet)->unref();
|
||||
};
|
||||
|
||||
this->crypt_handler.reset();
|
||||
debugMessage(client->getServer()->getServerId(), "Allocated new voice client connection at {}", (void*) this);
|
||||
}
|
||||
|
||||
VoiceClientConnection::~VoiceClientConnection() {
|
||||
/* locking here should be useless, but just to ensure! */
|
||||
{
|
||||
lock_guard write_queue_lock(this->write_queue_lock);
|
||||
this->write_queue.clear();
|
||||
}
|
||||
|
||||
for(auto& category : this->write_preprocess_queues) {
|
||||
lock_guard work_lock{category.work_lock};
|
||||
lock_guard queue_lock{category.queue_lock};
|
||||
|
||||
category.queue.clear();
|
||||
}
|
||||
this->reset();
|
||||
this->client = nullptr;
|
||||
memtrack::freed<VoiceClientConnection>(this);
|
||||
}
|
||||
@@ -96,6 +89,12 @@ void VoiceClientConnection::handle_incoming_datagram(const pipes::buffer_view& b
|
||||
#endif
|
||||
|
||||
auto is_command = packet_parser.type() == protocol::COMMAND || packet_parser.type() == protocol::COMMAND_LOW;
|
||||
|
||||
/* in previous versions we checked if the arrived packet is "worth decoding".
|
||||
* But since in general a command buffer underflow is much more unlikely, especially because most packets are not even command packets,
|
||||
* it's better we just skip that step and decode it anyways */
|
||||
|
||||
#if 0
|
||||
/* pretest if the packet is worth the effort of decoding it */
|
||||
if(is_command) {
|
||||
/* handle the order stuff */
|
||||
@@ -120,6 +119,7 @@ void VoiceClientConnection::handle_incoming_datagram(const pipes::buffer_view& b
|
||||
return;
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
//NOTICE I found out that the Compressed flag is set if the packet contains an audio header
|
||||
|
||||
@@ -194,7 +194,19 @@ void VoiceClientConnection::handle_incoming_datagram(const pipes::buffer_view& b
|
||||
unique_lock queue_lock(fragment_buffer.buffer_lock);
|
||||
|
||||
if(!fragment_buffer.insert_index(packet_parser.packet_id(), std::move(fragment_entry))) {
|
||||
logTrace(this->client->getServerId(), "{} Failed to insert command packet into command packet buffer.", CLIENT_STR_LOG_PREFIX_(this->client));
|
||||
auto ignore_type = fragment_buffer.accept_index(packet_parser.packet_id());
|
||||
debugMessage(this->client->getServerId(), "{} Dropping command packet because command assembly buffer has an {} ({}|{}|{})",
|
||||
CLIENT_STR_LOG_PREFIX_(this->client),
|
||||
ignore_type == -1 ? "underflow" : "overflow",
|
||||
fragment_buffer.capacity(),
|
||||
fragment_buffer.current_index(),
|
||||
packet_parser.packet_id()
|
||||
);
|
||||
|
||||
if(ignore_type == -1) { /* underflow */
|
||||
/* we've already got the packet, but the client dosn't know that so we've to send the acknowledge again */
|
||||
this->client->sendAcknowledge(packet_parser.packet_id(), packet_parser.type() == protocol::COMMAND_LOW);
|
||||
}
|
||||
return;
|
||||
}
|
||||
}
|
||||
@@ -204,8 +216,10 @@ void VoiceClientConnection::handle_incoming_datagram(const pipes::buffer_view& b
|
||||
if(voice_server)
|
||||
voice_server->schedule_command_handling(this->client);
|
||||
} else {
|
||||
if(packet_parser.type() == protocol::VOICE || packet_parser.type() == protocol::VOICE_WHISPER)
|
||||
if(packet_parser.type() == protocol::VOICE)
|
||||
this->client->handlePacketVoice(packet_parser);
|
||||
else if(packet_parser.type() == protocol::VOICE_WHISPER)
|
||||
this->client->handlePacketVoiceWhisper(packet_parser);
|
||||
else if(packet_parser.type() == protocol::ACK || packet_parser.type() == protocol::ACK_LOW)
|
||||
this->client->handlePacketAck(packet_parser);
|
||||
else if(packet_parser.type() == protocol::PING || packet_parser.type() == protocol::PONG)
|
||||
@@ -376,273 +390,155 @@ bool VoiceClientConnection::next_reassembled_command(unique_lock<std::recursive_
|
||||
logTrace(this->client->getServerId(), "{} Command packet has a too large compressed size. Dropping packet.", CLIENT_STR_LOG_PREFIX_(this->client));
|
||||
return false;
|
||||
}
|
||||
auto buffer = buffer::allocate_buffer(decompressed_size);
|
||||
if(!compression::qlz_decompress_payload(payload.data_ptr(), buffer.data_ptr(), &decompressed_size)) {
|
||||
auto decompress_buffer = buffer::allocate_buffer(decompressed_size);
|
||||
if(!compression::qlz_decompress_payload(payload.data_ptr(), decompress_buffer.data_ptr(), &decompressed_size)) {
|
||||
logTrace(this->client->getServerId(), "{} Failed to decompress received command. Dropping packet.", CLIENT_STR_LOG_PREFIX_(this->client));
|
||||
return false;
|
||||
}
|
||||
|
||||
payload = buffer.range(0, decompressed_size);
|
||||
payload = decompress_buffer.range(0, decompressed_size);
|
||||
}
|
||||
|
||||
result = std::move(payload);
|
||||
return have_more;
|
||||
}
|
||||
|
||||
|
||||
void VoiceClientConnection::sendPacket(const shared_ptr<protocol::ServerPacket>& original_packet, bool copy, bool prepare_directly) {
|
||||
if(this->client->state == ConnectionState::DISCONNECTED)
|
||||
return;
|
||||
|
||||
shared_ptr<protocol::ServerPacket> packet;
|
||||
if(copy) {
|
||||
packet = protocol::ServerPacket::from_buffer(original_packet->buffer().dup(buffer::allocate_buffer(original_packet->buffer().length())));
|
||||
if(original_packet->getListener())
|
||||
packet->setListener(std::move(original_packet->getListener()));
|
||||
packet->memory_state.flags = original_packet->memory_state.flags;
|
||||
bool VoiceClientConnection::prepare_outgoing_packet(ts::protocol::OutgoingServerPacket *packet) {
|
||||
if(packet->type_and_flags & PacketFlag::Unencrypted) {
|
||||
this->crypt_handler.write_default_mac(packet->mac);
|
||||
} else {
|
||||
packet = original_packet;
|
||||
}
|
||||
CryptHandler::key_t crypt_key{};
|
||||
CryptHandler::nonce_t crypt_nonce{};
|
||||
std::string error{};
|
||||
|
||||
auto type = WritePreprocessCategory::from_type(packet->type().type());
|
||||
auto& queue = this->write_preprocess_queues[type];
|
||||
if(prepare_directly) {
|
||||
vector<pipes::buffer> buffers;
|
||||
this->prepare_process_count++;
|
||||
|
||||
{
|
||||
unique_lock work_lock{queue.work_lock};
|
||||
if(!this->prepare_packet_for_write(buffers, packet, work_lock)) {
|
||||
logError(this->client->getServerId(), "{} Dropping packet!", CLIENT_STR_LOG_PREFIX_(this->client));
|
||||
this->prepare_process_count--;
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
/* enqueue buffers for write */
|
||||
{
|
||||
lock_guard write_queue_lock(this->write_queue_lock);
|
||||
this->write_queue.insert(this->write_queue.end(), buffers.begin(), buffers.end());
|
||||
}
|
||||
this->prepare_process_count--; /* we're now done preparing */
|
||||
} else {
|
||||
lock_guard queue_lock{queue.queue_lock};
|
||||
queue.queue.push_back(packet);
|
||||
queue.has_work = true;
|
||||
}
|
||||
this->triggerWrite();
|
||||
}
|
||||
|
||||
bool VoiceClientConnection::prepare_packet_for_write(vector<pipes::buffer> &result, const shared_ptr<ServerPacket> &packet, std::unique_lock<std::mutex>& work_lock) {
|
||||
assert(work_lock.owns_lock());
|
||||
|
||||
string error = "success";
|
||||
|
||||
if(packet->type().compressable() && !packet->memory_state.fragment_entry) {
|
||||
packet->enable_flag(PacketFlag::Compressed);
|
||||
if(!this->compress_handler.progressPacketOut(packet.get(), error)) {
|
||||
logError(this->getClient()->getServerId(), "{} Could not compress outgoing packet.\nThis could cause fatal failed for the client.\nError: {}", error);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
std::vector<shared_ptr<ServerPacket>> fragments;
|
||||
fragments.reserve((size_t) (packet->data().length() / packet->type().max_length()) + 1);
|
||||
|
||||
if(packet->data().length() > packet->type().max_length()) {
|
||||
if(!packet->type().fragmentable()) {
|
||||
logError(this->client->getServerId(), "{} We've tried to send a too long, not fragmentable, packet. Dropping packet of type {} with length {}", CLIENT_STR_LOG_PREFIX_(this->client), packet->type().name(), packet->data().length());
|
||||
return false;
|
||||
}
|
||||
|
||||
{ //Split packets
|
||||
auto buffer = packet->data();
|
||||
|
||||
const auto max_length = packet->type().max_length();
|
||||
while(buffer.length() > max_length * 2) {
|
||||
fragments.push_back(make_shared<ServerPacket>(packet->type(), buffer.view(0, max_length).dup(buffer::allocate_buffer(max_length))));
|
||||
buffer = buffer.range((size_t) max_length);
|
||||
}
|
||||
|
||||
if(buffer.length() > max_length) { //Divide rest by 2
|
||||
fragments.push_back(make_shared<ServerPacket>(packet->type(), buffer.view(0, buffer.length() / 2).dup(buffer::allocate_buffer(buffer.length() / 2))));
|
||||
buffer = buffer.range(buffer.length() / 2);
|
||||
}
|
||||
fragments.push_back(make_shared<ServerPacket>(packet->type(), buffer));
|
||||
|
||||
for(const auto& frag : fragments) {
|
||||
frag->setFragmentedEntry(true);
|
||||
frag->enable_flag(PacketFlag::NewProtocol);
|
||||
}
|
||||
}
|
||||
|
||||
assert(fragments.size() >= 2);
|
||||
fragments.front()->enable_flag(PacketFlag::Fragmented);
|
||||
if(packet->has_flag(PacketFlag::Compressed))
|
||||
fragments.front()->enable_flag(PacketFlag::Compressed);
|
||||
|
||||
fragments.back()->enable_flag(PacketFlag::Fragmented);
|
||||
|
||||
if(packet->getListener())
|
||||
fragments.back()->setListener(std::move(packet->getListener())); //Move the listener to the last :)
|
||||
} else {
|
||||
fragments.push_back(packet);
|
||||
}
|
||||
|
||||
result.reserve(fragments.size());
|
||||
|
||||
/* apply packet ids */
|
||||
for(const auto& fragment : fragments) {
|
||||
if(!fragment->memory_state.id_branded)
|
||||
fragment->applyPacketId(this->packet_id_manager);
|
||||
|
||||
if(fragment->type().type() == protocol::PacketType::COMMAND_LOW || fragment->type().type() == protocol::PacketType::COMMAND)
|
||||
this->packet_statistics().send_command(fragment->type().type(), fragment->packetId() | fragment->generationId() << 16U);
|
||||
}
|
||||
|
||||
work_lock.unlock(); /* the rest could be unordered */
|
||||
|
||||
|
||||
CryptHandler::key_t crypt_key{};
|
||||
CryptHandler::nonce_t crypt_nonce{};
|
||||
auto statistics = this->client ? this->client->connectionStatistics : nullptr;
|
||||
for(const auto& fragment : fragments) {
|
||||
if(fragment->has_flag(PacketFlag::Unencrypted)) {
|
||||
this->crypt_handler.write_default_mac(fragment->mac().data_ptr());
|
||||
if(!this->client->crypto.protocol_encrypted) {
|
||||
crypt_key = CryptHandler::default_key;
|
||||
crypt_nonce = CryptHandler::default_nonce;
|
||||
} else {
|
||||
if(!this->client->crypto.protocol_encrypted) {
|
||||
crypt_key = CryptHandler::default_key;
|
||||
crypt_nonce = CryptHandler::default_nonce;
|
||||
} else {
|
||||
if(!this->crypt_handler.generate_key_nonce(false, fragment->type().type(), fragment->packetId(), fragment->generationId(), crypt_key, crypt_nonce)) {
|
||||
logError(this->client->getServerId(), "{} Failed to generate crypt key/nonce for sending a packet. This should never happen! Dropping packet.", CLIENT_STR_LOG_PREFIX_(this->client));
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
auto crypt_result = this->crypt_handler.encrypt(fragment->header().data_ptr(), fragment->header().length(),
|
||||
fragment->data().data_ptr(), fragment->data().length(),
|
||||
fragment->mac().data_ptr(),
|
||||
crypt_key, crypt_nonce, error);
|
||||
if(!crypt_result){
|
||||
logError(this->client->getServerId(), "{} Failed to encrypt packet. Error: {}", CLIENT_STR_LOG_PREFIX_(this->client), error);
|
||||
if(!this->crypt_handler.generate_key_nonce(false, (uint8_t) packet->packet_type(), packet->packet_id(), packet->generation, crypt_key, crypt_nonce)) {
|
||||
logError(this->client->getServerId(), "{} Failed to generate crypt key/nonce for sending a packet. This should never happen! Dropping packet.", CLIENT_STR_LOG_PREFIX_(this->client));
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
#ifndef CONNECTION_NO_STATISTICS
|
||||
if(statistics) {
|
||||
auto category = stats::ConnectionStatistics::category::from_type(fragment->type());
|
||||
statistics->logOutgoingPacket(category, fragment->length() + 96); /* 96 for the UDP packet overhead */
|
||||
auto crypt_result = this->crypt_handler.encrypt((char*) packet->packet_data() + ServerPacketP::kHeaderOffset, ServerPacketP::kHeaderLength,
|
||||
packet->payload, packet->payload_size,
|
||||
packet->mac,
|
||||
crypt_key, crypt_nonce, error);
|
||||
if(!crypt_result){
|
||||
logError(this->client->getServerId(), "{} Failed to encrypt packet. Error: {}", CLIENT_STR_LOG_PREFIX_(this->client), error);
|
||||
return false;
|
||||
}
|
||||
#endif
|
||||
this->acknowledge_handler.process_packet(*fragment);
|
||||
result.push_back(fragment->buffer());
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool VoiceClientConnection::preprocess_write_packets() {
|
||||
std::shared_ptr<ServerPacket> packet{nullptr};
|
||||
vector<pipes::buffer> buffers{};
|
||||
bool flag_more{false};
|
||||
VoiceClientConnection::WBufferPopResult VoiceClientConnection::pop_write_buffer(protocol::OutgoingServerPacket *&result) {
|
||||
if(this->client->state == ConnectionState::DISCONNECTED)
|
||||
return WBufferPopResult::DRAINED;
|
||||
|
||||
prepare_process_count++; /* we're not preparing a packet */
|
||||
for(auto& category : this->write_preprocess_queues) {
|
||||
if(!category.has_work) continue;
|
||||
else if(packet) {
|
||||
flag_more = true;
|
||||
break;
|
||||
}
|
||||
|
||||
unique_lock work_lock{category.work_lock, try_to_lock};
|
||||
if(!work_lock) continue; /* This particular category will already be processed */
|
||||
|
||||
{
|
||||
lock_guard buffer_lock{category.queue_lock};
|
||||
if(category.queue.empty()) {
|
||||
category.has_work = false;
|
||||
continue;
|
||||
bool need_prepare_packet{false}, more_packets{false};
|
||||
{
|
||||
std::lock_guard wlock{this->write_queue_mutex};
|
||||
if(this->resend_queue_head) {
|
||||
result = this->resend_queue_head;
|
||||
if(result->next) {
|
||||
assert(this->resend_queue_tail != &result->next);
|
||||
this->resend_queue_head = result->next;
|
||||
} else {
|
||||
assert(this->resend_queue_tail == &result->next);
|
||||
this->resend_queue_head = nullptr;
|
||||
this->resend_queue_tail = &this->resend_queue_head;
|
||||
}
|
||||
|
||||
packet = std::move(category.queue.front());
|
||||
category.queue.pop_front();
|
||||
category.has_work = !category.queue.empty();
|
||||
flag_more = category.has_work;
|
||||
} else if(this->write_queue_head) {
|
||||
result = this->write_queue_head;
|
||||
if(result->next) {
|
||||
assert(this->write_queue_tail != &result->next);
|
||||
this->write_queue_head = result->next;
|
||||
} else {
|
||||
assert(this->write_queue_tail == &result->next);
|
||||
this->write_queue_head = nullptr;
|
||||
this->write_queue_tail = &this->write_queue_head;
|
||||
}
|
||||
need_prepare_packet = true;
|
||||
} else {
|
||||
return WBufferPopResult::DRAINED;
|
||||
}
|
||||
|
||||
if(!this->prepare_packet_for_write(buffers, packet, work_lock)) {
|
||||
logError(this->client->getServerId(), "{} Dropping packet!", CLIENT_STR_LOG_PREFIX_(this->client));
|
||||
if(flag_more)
|
||||
break;
|
||||
else
|
||||
continue; /* find out if we have more */
|
||||
}
|
||||
|
||||
if(flag_more)
|
||||
break;
|
||||
else
|
||||
continue; /* find out if we have more */
|
||||
result->next = nullptr;
|
||||
more_packets = this->resend_queue_head != nullptr || this->write_queue_head != nullptr;
|
||||
}
|
||||
|
||||
/* enqueue buffers for write */
|
||||
if(!buffers.empty()) {
|
||||
lock_guard write_queue_lock(this->write_queue_lock);
|
||||
this->write_queue.insert(this->write_queue.end(), buffers.begin(), buffers.end());
|
||||
}
|
||||
this->prepare_process_count--; /* we're now done preparing */
|
||||
|
||||
return flag_more;
|
||||
if(need_prepare_packet)
|
||||
this->prepare_outgoing_packet(result);
|
||||
return more_packets ? WBufferPopResult::MORE_AVAILABLE : WBufferPopResult::DRAINED;
|
||||
}
|
||||
|
||||
int VoiceClientConnection::pop_write_buffer(pipes::buffer& target) {
|
||||
if(this->client->state == DISCONNECTED)
|
||||
return 2;
|
||||
void VoiceClientConnection::execute_resend(const std::chrono::system_clock::time_point &now, std::chrono::system_clock::time_point &next) {
|
||||
std::deque<std::shared_ptr<connection::AcknowledgeManager::Entry>> buffers{};
|
||||
std::string error{};
|
||||
|
||||
lock_guard wqlock{this->write_queue_lock};
|
||||
size_t size = this->write_queue.size();
|
||||
if(size == 0)
|
||||
return 2;
|
||||
if (this->acknowledge_handler.execute_resend(now, next, buffers, error) < 0) {
|
||||
debugMessage(client->getServerId(), "{} Failed to execute packet resend: {}", CLIENT_STR_LOG_PREFIX_(this->client), error);
|
||||
|
||||
target = std::move(this->write_queue.front());
|
||||
this->write_queue.pop_front();
|
||||
|
||||
#ifdef FUZZING_TESTING_OUTGOING
|
||||
#ifdef FIZZING_TESTING_DISABLE_HANDSHAKE
|
||||
if (this->client->state == ConnectionState::CONNECTED) {
|
||||
#endif
|
||||
if ((rand() % FUZZING_TESTING_DROP_MAX) < FUZZING_TESTING_DROP) {
|
||||
debugMessage(this->client->getServerId(), "{}[FUZZING] Dropping outgoing packet", CLIENT_STR_LOG_PREFIX_(this->client));
|
||||
return 0;
|
||||
if(this->client->state == ConnectionState::CONNECTED) {
|
||||
this->client->disconnect(ViewReasonId::VREASON_TIMEOUT, config::messages::timeout::packet_resend_failed, nullptr, true);
|
||||
} else {
|
||||
this->client->close_connection(system_clock::now() + seconds(1));
|
||||
}
|
||||
#ifdef FIZZING_TESTING_DISABLE_HANDSHAKE
|
||||
} else if(!buffers.empty()) {
|
||||
size_t send_count{0};
|
||||
{
|
||||
lock_guard wlock{this->write_queue_mutex};
|
||||
for(auto& buffer : buffers) {
|
||||
auto packet = (protocol::OutgoingServerPacket*) buffer->packet_ptr;
|
||||
if(packet->next) continue; /* still in write queue (this shall not happen very often) */
|
||||
if(&packet->next == this->write_queue_tail || &packet->next == this->resend_queue_tail) continue;
|
||||
|
||||
packet->ref(); /* for the write queue again */
|
||||
*this->resend_queue_tail = packet;
|
||||
this->resend_queue_tail = &packet->next;
|
||||
|
||||
send_count++;
|
||||
this->packet_statistics().send_command((protocol::PacketType) buffer->packet_type, buffer->packet_full_id);
|
||||
}
|
||||
}
|
||||
logTrace(client->getServerId(), "{} Resending {} packets. Send actually {} packets.", CLIENT_STR_LOG_PREFIX_(client), buffers.size(), send_count);
|
||||
this->triggerWrite();
|
||||
}
|
||||
}
|
||||
|
||||
void VoiceClientConnection::encrypt_write_queue() {
|
||||
OutgoingServerPacket* packets_head, *packets_tail;
|
||||
{
|
||||
std::lock_guard wlock{this->write_queue_mutex};
|
||||
packets_head = this->write_queue_head;
|
||||
this->write_queue_head = nullptr;
|
||||
this->write_queue_tail = &this->write_queue_head;
|
||||
}
|
||||
if(!packets_head) return;
|
||||
|
||||
auto packet = packets_head;
|
||||
while(packet) {
|
||||
this->prepare_outgoing_packet(packet);
|
||||
packets_tail = packet;
|
||||
packet = packet->next;
|
||||
}
|
||||
|
||||
{
|
||||
std::lock_guard wlock{this->write_queue_mutex};
|
||||
*this->resend_queue_tail = packets_head;
|
||||
this->resend_queue_tail = &packets_tail->next;
|
||||
}
|
||||
#endif
|
||||
#endif
|
||||
return size > 1;
|
||||
}
|
||||
|
||||
bool VoiceClientConnection::wait_empty_write_and_prepare_queue(chrono::time_point<chrono::system_clock> until) {
|
||||
while(true) {
|
||||
for(auto& queue : this->write_preprocess_queues) {
|
||||
{
|
||||
lock_guard lock{queue.queue_lock};
|
||||
if(!queue.queue.empty())
|
||||
goto _wait;
|
||||
}
|
||||
|
||||
{
|
||||
unique_lock lock{queue.work_lock, try_to_lock};
|
||||
if(!lock.owns_lock())
|
||||
goto _wait;
|
||||
}
|
||||
}
|
||||
{
|
||||
lock_guard buffer_lock{this->write_queue_lock};
|
||||
if(!this->write_queue.empty())
|
||||
std::lock_guard wlock{this->write_queue_mutex};
|
||||
if(this->write_queue_head)
|
||||
goto _wait;
|
||||
if(this->prepare_process_count != 0)
|
||||
|
||||
if(this->resend_queue_head)
|
||||
goto _wait;
|
||||
}
|
||||
break;
|
||||
@@ -657,11 +553,25 @@ bool VoiceClientConnection::wait_empty_write_and_prepare_queue(chrono::time_poin
|
||||
}
|
||||
|
||||
void VoiceClientConnection::reset() {
|
||||
for(auto& queue : this->write_preprocess_queues) {
|
||||
{
|
||||
lock_guard lock{queue.queue_lock};
|
||||
queue.queue.clear();
|
||||
{
|
||||
std::lock_guard wlock{this->write_queue_mutex};
|
||||
auto head = this->write_queue_head;
|
||||
while(head) {
|
||||
auto next = head->next;
|
||||
head->unref();
|
||||
head = next;
|
||||
}
|
||||
this->write_queue_head = nullptr;
|
||||
this->write_queue_tail = &this->write_queue_head;
|
||||
|
||||
head = this->resend_queue_head;
|
||||
while(head) {
|
||||
auto next = head->next;
|
||||
head->unref();
|
||||
head = next;
|
||||
}
|
||||
this->resend_queue_head = nullptr;
|
||||
this->resend_queue_tail = &this->resend_queue_head;
|
||||
}
|
||||
|
||||
this->acknowledge_handler.reset();
|
||||
@@ -701,4 +611,165 @@ void VoiceClientConnection::register_initiv_packet() {
|
||||
auto& fragment_buffer = this->_command_fragment_buffers[command_fragment_buffer_index(protocol::COMMAND)];
|
||||
unique_lock buffer_lock(fragment_buffer.buffer_lock);
|
||||
fragment_buffer.set_full_index_to(1); /* the first packet (0) is already the clientinitiv packet */
|
||||
}
|
||||
|
||||
void VoiceClientConnection::send_packet(protocol::OutgoingServerPacket *packet) {
|
||||
uint32_t full_id;
|
||||
{
|
||||
std::lock_guard id_lock{this->packet_id_mutex};
|
||||
full_id = this->packet_id_manager.generate_full_id(packet->packet_type());
|
||||
}
|
||||
packet->set_packet_id(full_id & 0xFFFFU);
|
||||
packet->generation = full_id >> 16U;
|
||||
|
||||
{
|
||||
std::lock_guard qlock{this->write_queue_mutex};
|
||||
*this->write_queue_tail = packet;
|
||||
this->write_queue_tail = &packet->next;
|
||||
}
|
||||
|
||||
auto statistics = this->client ? this->client->connectionStatistics : nullptr;
|
||||
if(statistics) {
|
||||
auto category = stats::ConnectionStatistics::category::from_type(packet->packet_type());
|
||||
statistics->logOutgoingPacket(category, packet->packet_length() + 96); /* 96 for the UDP packet overhead */
|
||||
}
|
||||
|
||||
this->triggerWrite();
|
||||
}
|
||||
|
||||
void VoiceClientConnection::send_packet(protocol::PacketType type, protocol::PacketFlag::PacketFlags flag, const void *payload, size_t payload_size) {
|
||||
auto packet = protocol::allocate_outgoing_packet(payload_size);
|
||||
|
||||
packet->type_and_flags = (uint8_t) type | (uint8_t) flag;
|
||||
memcpy(packet->payload, payload, payload_size);
|
||||
|
||||
this->send_packet(packet);
|
||||
}
|
||||
|
||||
#define MAX_COMMAND_PACKET_PAYLOAD_LENGTH (487)
|
||||
void VoiceClientConnection::send_command(const std::string_view &command, bool low, std::unique_ptr<threads::Future<bool>> ack_listener) {
|
||||
bool own_data_buffer{false};
|
||||
void* own_data_buffer_ptr; /* imutable! */
|
||||
|
||||
const char* data_buffer{command.data()};
|
||||
size_t data_length{command.length()};
|
||||
|
||||
uint8_t head_pflags{0};
|
||||
PacketType ptype{low ? PacketType::COMMAND_LOW : PacketType::COMMAND};
|
||||
protocol::OutgoingServerPacket *packets_head{nullptr};
|
||||
protocol::OutgoingServerPacket *packets_tail{nullptr};
|
||||
/* only compress "long" commands */
|
||||
if(command.size() > 100) {
|
||||
size_t max_compressed_payload_size = compression::qlz_compressed_size(command.data(), command.length());
|
||||
auto compressed_buffer = ::malloc(max_compressed_payload_size);
|
||||
|
||||
size_t compressed_size{max_compressed_payload_size};
|
||||
if(!compression::qlz_compress_payload(command.data(), command.length(), compressed_buffer, &compressed_size)) {
|
||||
logCritical(0, "Failed to compress command packet. Dropping packet");
|
||||
::free(compressed_buffer);
|
||||
return;
|
||||
}
|
||||
|
||||
/* we don't need to make the command longer than it is */
|
||||
if(compressed_size < command.length()) {
|
||||
own_data_buffer = true;
|
||||
data_buffer = (char*) compressed_buffer;
|
||||
own_data_buffer_ptr = compressed_buffer;
|
||||
data_length = compressed_size;
|
||||
head_pflags |= PacketFlag::Compressed;
|
||||
} else {
|
||||
::free(compressed_buffer);
|
||||
}
|
||||
}
|
||||
|
||||
uint8_t ptype_and_flags{(uint8_t) ((uint8_t) ptype | (uint8_t) PacketFlag::NewProtocol)};
|
||||
if(data_length > MAX_COMMAND_PACKET_PAYLOAD_LENGTH) {
|
||||
auto chunk_count = (size_t) ceil((float) data_length / (float) MAX_COMMAND_PACKET_PAYLOAD_LENGTH);
|
||||
auto chunk_size = (size_t) ceil((float) data_length / (float) chunk_count);
|
||||
auto packet = protocol::allocate_outgoing_packet(chunk_size);
|
||||
packets_head = packet;
|
||||
while(true) {
|
||||
packet->type_and_flags = ptype_and_flags;
|
||||
|
||||
auto bytes = min(chunk_size, data_length);
|
||||
memcpy(packet->payload, data_buffer, bytes);
|
||||
|
||||
data_length -= bytes;
|
||||
if(data_length == 0)
|
||||
break;
|
||||
data_buffer += bytes;
|
||||
|
||||
packet->next = protocol::allocate_outgoing_packet(bytes);
|
||||
packet = packet->next;
|
||||
}
|
||||
packets_tail = packet;
|
||||
} else {
|
||||
auto packet = protocol::allocate_outgoing_packet(data_length);
|
||||
packet->type_and_flags = ptype_and_flags;
|
||||
|
||||
memcpy(packet->payload, data_buffer, data_length);
|
||||
|
||||
packets_head = packet;
|
||||
packets_tail = packet;
|
||||
}
|
||||
|
||||
|
||||
{
|
||||
std::lock_guard id_lock{this->packet_id_mutex};
|
||||
uint32_t full_id;
|
||||
auto head = packets_head;
|
||||
while(head) {
|
||||
full_id = this->packet_id_manager.generate_full_id(ptype);
|
||||
|
||||
head->set_packet_id(full_id & 0xFFFFU);
|
||||
head->generation = full_id >> 16U;
|
||||
head = head->next;
|
||||
}
|
||||
}
|
||||
|
||||
/* if head = tail, fragmented will not be enabled (2x xored) */
|
||||
packets_head->type_and_flags |= head_pflags;
|
||||
packets_head->type_and_flags ^= PacketFlag::Fragmented;
|
||||
packets_tail->type_and_flags ^= PacketFlag::Fragmented;
|
||||
|
||||
/* do this before the next ptr might get modified due to the write queue */
|
||||
auto statistics = this->client ? this->client->connectionStatistics : nullptr;
|
||||
/* general stats */
|
||||
if(statistics) {
|
||||
auto head = packets_head;
|
||||
while(head) {
|
||||
statistics->logOutgoingPacket(stats::ConnectionStatistics::category::COMMAND, head->packet_length() + 96); /* 96 for the UDP overhead */
|
||||
head = head->next;
|
||||
}
|
||||
}
|
||||
|
||||
/* loss stats */
|
||||
{
|
||||
auto head = packets_head;
|
||||
while(head) {
|
||||
auto full_packet_id = (uint32_t) (head->generation << 16U) | head->packet_id();
|
||||
this->packet_statistics_.send_command(head->packet_type(), full_packet_id);
|
||||
|
||||
/* increase a reference for the ack handler */
|
||||
head->ref();
|
||||
|
||||
/* Even thou the packet is yet unencrypted, it will be encrypted with the next write. The next write will be before the next resend because the next ptr must be null in order to resend a packet */
|
||||
if(head == packets_tail)
|
||||
this->acknowledge_handler.process_packet(ptype, full_packet_id, head, std::move(ack_listener));
|
||||
else
|
||||
this->acknowledge_handler.process_packet(ptype, full_packet_id, head, nullptr);
|
||||
|
||||
head = head->next;
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
std::lock_guard qlock{this->write_queue_mutex};
|
||||
*this->write_queue_tail = packets_head;
|
||||
this->write_queue_tail = &packets_tail->next;
|
||||
}
|
||||
this->triggerWrite();
|
||||
|
||||
if(own_data_buffer)
|
||||
::free(own_data_buffer_ptr);
|
||||
}
|
||||
Reference in New Issue
Block a user