2019-07-17 19:37:18 +02:00
# include <misc/endianness.h>
# include <algorithm>
# include <log/LogUtils.h>
# include "../../server/VoiceServer.h"
# include <misc/memtracker.h>
# include <protocol/Packet.h>
# include <ThreadPool/Timer.h>
# include "VoiceClientConnection.h"
# include "src/client/ConnectedClient.h"
# include "VoiceClient.h"
//#define LOG_AUTO_ACK_AUTORESPONSE
//#define FUZZING_TESTING_INCOMMING
//#define FUZZING_TESTING_OUTGOING
2020-01-27 02:21:39 +01:00
//#define FIZZING_TESTING_DISABLE_HANDSHAKE
# define FUZZING_TESTING_DROP 8
2019-07-17 19:37:18 +02:00
# define FUZZING_TESTING_DROP_MAX 10
//#define CONNECTION_NO_STATISTICS
# define QLZ_COMPRESSION_LEVEL 1
# include "qlz/QuickLZ.h"
using namespace std ;
using namespace std : : chrono ;
using namespace ts ;
using namespace ts : : connection ;
using namespace ts : : protocol ;
using namespace ts : : server ;
/**
 * Builds the connection state object for the given voice client.
 *
 * The new instance is reported to the memory tracker, the crypt handler is
 * reset and the allocation is written to the debug log.
 *
 * @param client the voice client owning this connection; it is dereferenced
 *               here to resolve the server id used for logging.
 */
VoiceClientConnection::VoiceClientConnection(VoiceClient* client) : client(client) {
    memtrack::allocated<VoiceClientConnection>(this);

    this->crypt_handler.reset();

    debugMessage(
            client->getServer()->getServerId(),
            " Allocated new voice client connection at {} ",
            static_cast<void*>(this)
    );
}
/**
 * Drops every buffer that is still queued for writing or preprocessing,
 * detaches from the client and reports the deallocation to the memory tracker.
 */
VoiceClientConnection::~VoiceClientConnection() {
    /* Nobody should be using us anymore, so the locks below are purely defensive. */
    {
        std::lock_guard guard{this->write_queue_lock};
        this->write_queue.clear();
    }

    for (auto& preprocess_queue : this->write_preprocess_queues) {
        /* same acquisition order as before: work lock first, then the queue lock */
        std::lock_guard work_guard{preprocess_queue.work_lock};
        std::lock_guard queue_guard{preprocess_queue.queue_lock};
        preprocess_queue.queue.clear();
    }

    this->client = nullptr;
    memtrack::freed<VoiceClientConnection>(this);
}
void VoiceClientConnection : : triggerWrite ( ) {
2020-01-24 02:57:58 +01:00
if ( this - > client - > voice_server )
this - > client - > voice_server - > triggerWrite ( dynamic_pointer_cast < VoiceClient > ( this - > client - > _this . lock ( ) ) ) ;
2019-07-17 19:37:18 +02:00
}
# ifdef CLIENT_LOG_PREFIX
# undef CLIENT_LOG_PREFIX
# endif
# define CLIENT_LOG_PREFIX "[" << this->client->getPeerIp() << ":" << this->client->getPeerPort() << " | " << this->client->getDisplayName() << "]"
//Message handle methods
2020-01-27 02:21:39 +01:00
void VoiceClientConnection : : handle_incoming_datagram ( const pipes : : buffer_view & buffer ) {
2019-07-17 19:37:18 +02:00
# ifdef FUZZING_TESTING_INCOMMING
2020-01-24 02:57:58 +01:00
# ifdef FIZZING_TESTING_DISABLE_HANDSHAKE
if ( this - > client - > state = = ConnectionState : : CONNECTED ) {
# endif
if ( ( rand ( ) % FUZZING_TESTING_DROP_MAX ) < FUZZING_TESTING_DROP ) {
debugMessage ( this - > client - > getServerId ( ) , " {}[FUZZING] Dropping incoming packet of length {} " , CLIENT_STR_LOG_PREFIX_ ( this - > client ) , buffer . length ( ) ) ;
return ;
}
# ifdef FIZZING_TESTING_DISABLE_HANDSHAKE
}
# endif
2019-07-17 19:37:18 +02:00
# endif
2020-01-27 02:21:39 +01:00
IncomingClientPacketParser packet_parser { buffer } ;
if ( ! packet_parser . valid ( ) ) {
logTrace ( this - > client - > getServerId ( ) , " {} Received invalid packet. Dropping. " , CLIENT_STR_LOG_PREFIX_ ( this - > client ) ) ;
2020-01-24 02:57:58 +01:00
return ;
}
2020-01-27 02:21:39 +01:00
assert ( packet_parser . type ( ) > = 0 & & packet_parser . type ( ) < this - > incoming_generation_estimators . size ( ) ) ;
packet_parser . set_estimated_generation ( this - > incoming_generation_estimators [ packet_parser . type ( ) ] . visit_packet ( packet_parser . packet_id ( ) ) ) ;
2020-01-24 02:57:58 +01:00
2020-01-27 02:21:39 +01:00
auto is_command = packet_parser . type ( ) = = protocol : : COMMAND | | packet_parser . type ( ) = = protocol : : COMMAND_LOW ;
/* pretest if the packet is worth the effort of decoding it */
if ( is_command ) {
/* handle the order stuff */
auto & fragment_buffer = this - > _command_fragment_buffers [ command_fragment_buffer_index ( packet_parser . type ( ) ) ] ;
2020-01-24 02:57:58 +01:00
2020-01-27 02:21:39 +01:00
unique_lock queue_lock ( fragment_buffer . buffer_lock ) ;
auto result = fragment_buffer . accept_index ( packet_parser . packet_id ( ) ) ;
2020-01-24 02:57:58 +01:00
if ( result ! = 0 ) { /* packet index is ahead buffer index */
2020-01-27 13:02:22 +01:00
debugMessage ( this - > client - > getServerId ( ) , " {} Dropping command packet because command assembly buffer has an {} ({}|{}|{}) " ,
2020-01-27 02:21:39 +01:00
CLIENT_STR_LOG_PREFIX_ ( this - > client ) ,
2020-01-24 02:57:58 +01:00
result = = - 1 ? " underflow " : " overflow " ,
2020-01-27 02:21:39 +01:00
fragment_buffer . capacity ( ) ,
2020-01-27 13:02:22 +01:00
fragment_buffer . current_index ( ) ,
packet_parser . packet_id ( )
2020-01-24 02:57:58 +01:00
) ;
if ( result = = - 1 ) { /* underflow */
2020-01-27 02:21:39 +01:00
/* we've already got the packet, but the client dosn't know that so we've to send the acknowledge again */
if ( this - > client - > crypto . protocol_encrypted )
this - > client - > sendAcknowledge ( packet_parser . packet_id ( ) , packet_parser . type ( ) = = protocol : : COMMAND_LOW ) ;
2020-01-24 02:57:58 +01:00
}
return ;
}
}
//NOTICE I found out that the Compressed flag is set if the packet contains an audio header
2020-01-27 02:21:39 +01:00
if ( this - > client - > state = = ConnectionState : : INIT_LOW & & packet_parser . type ( ) ! = protocol : : INIT1 )
2020-01-24 02:57:58 +01:00
return ;
2020-01-27 02:21:39 +01:00
/* decrypt the packet if needed */
if ( packet_parser . is_encrypted ( ) ) {
std : : string error ;
CryptHandler : : key_t crypt_key { } ;
CryptHandler : : nonce_t crypt_nonce { } ;
auto data = ( uint8_t * ) packet_parser . mutable_data_ptr ( ) ;
bool use_default_key { ! this - > client - > crypto . protocol_encrypted } , decrypt_result ;
decrypt_packet :
if ( use_default_key ) {
crypt_key = CryptHandler : : default_key ;
crypt_nonce = CryptHandler : : default_nonce ;
2020-01-24 02:57:58 +01:00
} else {
2020-01-27 02:21:39 +01:00
if ( ! this - > crypt_handler . generate_key_nonce ( true , packet_parser . type ( ) , packet_parser . packet_id ( ) , packet_parser . estimated_generation ( ) , crypt_key , crypt_nonce ) ) {
logError ( this - > client - > getServerId ( ) , " {} Failed to generate crypt key/nonce. This should never happen! Dropping packet. " , CLIENT_STR_LOG_PREFIX_ ( this - > client ) ) ;
2020-01-24 02:57:58 +01:00
return ;
}
}
2020-01-27 02:21:39 +01:00
decrypt_result = this - > crypt_handler . decrypt (
data + IncomingClientPacketParser : : kHeaderOffset , IncomingClientPacketParser : : kHeaderLength ,
data + IncomingClientPacketParser : : kPayloadOffset , packet_parser . payload_length ( ) ,
data ,
crypt_key , crypt_nonce ,
error
) ;
if ( ! decrypt_result ) {
if ( ! this - > client - > crypto . client_init ) {
if ( use_default_key ) {
logTrace ( this - > client - > getServerId ( ) , " {} Failed to decrypt packet with default key ({}). Dropping packet. " , CLIENT_STR_LOG_PREFIX_ ( this - > client ) , error ) ;
return ;
} else {
logTrace ( this - > client - > getServerId ( ) , " {} Failed to decrypt packet ({}). Trying with default key. " , CLIENT_STR_LOG_PREFIX_ ( this - > client ) , error ) ;
use_default_key = true ;
goto decrypt_packet ;
}
} else {
logTrace ( this - > client - > getServerId ( ) , " {} Failed to decrypt packet ({}). Dropping packet. " , CLIENT_STR_LOG_PREFIX_ ( this - > client ) , error ) ;
return ;
}
2020-01-24 02:57:58 +01:00
}
2020-01-27 02:21:39 +01:00
packet_parser . set_decrypted ( ) ;
} else if ( is_command & & this - > client - > state ! = ConnectionState : : INIT_HIGH ) {
logTrace ( this - > client - > getServerId ( ) , " {} Voice client {}/{} tried to send a unencrypted command packet. Dropping packet. " , CLIENT_STR_LOG_PREFIX_ ( this - > client ) , client - > getDisplayName ( ) , this - > client - > getLoggingPeerIp ( ) ) ;
return ;
2020-01-24 02:57:58 +01:00
}
2019-07-17 19:37:18 +02:00
# ifndef CONNECTION_NO_STATISTICS
2020-01-24 02:57:58 +01:00
if ( this - > client & & this - > client - > getServer ( ) )
2020-01-27 02:21:39 +01:00
this - > client - > connectionStatistics - > logIncomingPacket ( stats : : ConnectionStatistics : : category : : from_type ( packet_parser . type ( ) ) , buffer . length ( ) ) ;
2019-07-17 19:37:18 +02:00
# endif
# ifdef LOG_INCOMPING_PACKET_FRAGMENTS
2020-01-24 02:57:58 +01:00
debugMessage ( lstream < < CLIENT_LOG_PREFIX < < " Recived packet. PacketId: " < < packet - > packetId ( ) < < " PacketType: " < < packet - > type ( ) . name ( ) < < " Flags: " < < packet - > flags ( ) < < " - " < < packet - > data ( ) < < endl ) ;
2019-07-17 19:37:18 +02:00
# endif
2020-01-27 02:21:39 +01:00
if ( is_command ) {
auto & fragment_buffer = this - > _command_fragment_buffers [ command_fragment_buffer_index ( packet_parser . type ( ) ) ] ;
CommandFragment fragment_entry {
packet_parser . packet_id ( ) ,
packet_parser . estimated_generation ( ) ,
2020-01-24 02:57:58 +01:00
2020-01-27 02:21:39 +01:00
packet_parser . flags ( ) ,
( uint32_t ) packet_parser . payload_length ( ) ,
packet_parser . payload ( ) . own_buffer ( )
} ;
{
unique_lock queue_lock ( fragment_buffer . buffer_lock ) ;
if ( ! fragment_buffer . insert_index ( packet_parser . packet_id ( ) , std : : move ( fragment_entry ) ) ) {
logTrace ( this - > client - > getServerId ( ) , " {} Failed to insert command packet into command packet buffer. " , CLIENT_STR_LOG_PREFIX_ ( this - > client ) ) ;
return ;
2020-01-24 02:57:58 +01:00
}
}
2020-01-27 02:21:39 +01:00
this - > client - > sendAcknowledge ( packet_parser . packet_id ( ) , packet_parser . type ( ) = = protocol : : COMMAND_LOW ) ;
2020-01-24 02:57:58 +01:00
2020-01-27 02:21:39 +01:00
auto voice_server = this - > client - > voice_server ;
if ( voice_server )
voice_server - > schedule_command_handling ( this - > client ) ;
} else {
if ( packet_parser . type ( ) = = protocol : : VOICE | | packet_parser . type ( ) = = protocol : : VOICE_WHISPER )
this - > client - > handlePacketVoice ( packet_parser ) ;
else if ( packet_parser . type ( ) = = protocol : : ACK | | packet_parser . type ( ) = = protocol : : ACK_LOW )
this - > client - > handlePacketAck ( packet_parser ) ;
else if ( packet_parser . type ( ) = = protocol : : PING | | packet_parser . type ( ) = = protocol : : PONG )
this - > client - > handlePacketPing ( packet_parser ) ;
else {
logError ( this - > client - > getServerId ( ) , " {} Received hand decoded packet, but we've no method to handle it. Dropping packet. " , CLIENT_STR_LOG_PREFIX_ ( this - > client ) ) ;
}
}
2019-07-17 19:37:18 +02:00
}
2020-01-27 02:21:39 +01:00
bool VoiceClientConnection : : verify_encryption ( const pipes : : buffer_view & buffer /* incl. mac etc */ ) {
IncomingClientPacketParser packet_parser { buffer } ;
if ( ! packet_parser . valid ( ) | | ! packet_parser . is_encrypted ( ) ) return false ;
2019-07-17 19:37:18 +02:00
2020-01-27 02:21:39 +01:00
assert ( packet_parser . type ( ) > = 0 & & packet_parser . type ( ) < this - > incoming_generation_estimators . size ( ) ) ;
return this - > crypt_handler . verify_encryption ( buffer , packet_parser . packet_id ( ) , this - > incoming_generation_estimators [ packet_parser . type ( ) ] . generation ( ) ) ;
2019-07-17 19:37:18 +02:00
}
2020-01-27 02:21:39 +01:00
void VoiceClientConnection : : execute_handle_command_packets ( const std : : chrono : : system_clock : : time_point & /* scheduled */ ) {
2020-02-01 14:32:16 +01:00
if ( this - > client - > state > = ConnectionState : : DISCONNECTING | | ! this - > client - > getServer ( ) )
2020-01-24 02:57:58 +01:00
return ;
2020-01-27 02:21:39 +01:00
//TODO: Remove the buffer_execute_lock and use the one within the this->client->handlePacketCommand method
2020-01-24 02:57:58 +01:00
unique_lock < std : : recursive_timed_mutex > buffer_execute_lock ;
2020-01-27 02:21:39 +01:00
pipes : : buffer payload { } ;
2020-01-27 13:02:22 +01:00
uint16_t packet_id { } ;
auto reexecute_handle = this - > next_reassembled_command ( buffer_execute_lock , payload , packet_id ) ;
2020-01-24 02:57:58 +01:00
2020-01-27 02:21:39 +01:00
if ( ! payload . empty ( ) ) {
2020-01-24 02:57:58 +01:00
auto startTime = system_clock : : now ( ) ;
try {
2020-01-27 02:21:39 +01:00
this - > client - > handlePacketCommand ( payload ) ;
2020-01-24 02:57:58 +01:00
} catch ( std : : exception & ex ) {
logCritical ( this - > client - > getServerId ( ) , " {} Exception reached root tree! {} " , CLIENT_STR_LOG_PREFIX_ ( this - > client ) , ex . what ( ) ) ;
}
auto end = system_clock : : now ( ) ;
if ( end - startTime > milliseconds ( 10 ) ) {
2020-01-27 02:21:39 +01:00
logError ( this - > client - > getServerId ( ) ,
" {} Handling of command packet needs more than 10ms ({}ms) " ,
CLIENT_STR_LOG_PREFIX_ ( this - > client ) ,
duration_cast < milliseconds > ( end - startTime ) . count ( )
) ;
2020-01-24 02:57:58 +01:00
}
}
if ( buffer_execute_lock . owns_lock ( ) )
buffer_execute_lock . unlock ( ) ;
auto voice_server = this - > client - > voice_server ;
if ( voice_server & & reexecute_handle )
2020-01-27 02:21:39 +01:00
this - > client - > voice_server - > schedule_command_handling ( this - > client ) ;
2019-07-17 19:37:18 +02:00
}
/* buffer_execute_lock: lock for in order execution */
2020-01-27 13:02:22 +01:00
bool VoiceClientConnection : : next_reassembled_command ( unique_lock < std : : recursive_timed_mutex > & buffer_execute_lock , pipes : : buffer & result , uint16_t & packet_id ) {
2020-01-27 02:21:39 +01:00
command_fragment_buffer_t * buffer { nullptr } ;
2020-01-24 02:57:58 +01:00
unique_lock < std : : recursive_timed_mutex > buffer_lock ; /* general buffer lock */
2020-01-27 02:21:39 +01:00
bool have_more { false } ;
2020-01-24 02:57:58 +01:00
{
2020-01-27 02:21:39 +01:00
//FIXME: Currently command low packets cant be handeled if there is a command packet stuck in reassamble
2020-01-24 02:57:58 +01:00
2020-01-27 02:21:39 +01:00
/* handle commands before command low packets */
for ( auto & buf : this - > _command_fragment_buffers ) {
2020-01-24 02:57:58 +01:00
unique_lock ring_lock ( buf . buffer_lock , try_to_lock ) ;
if ( ! ring_lock . owns_lock ( ) ) continue ;
if ( buf . front_set ( ) ) {
if ( ! buffer ) { /* lets still test for reexecute */
buffer_execute_lock = unique_lock ( buf . execute_lock , try_to_lock ) ;
if ( ! buffer_execute_lock . owns_lock ( ) ) continue ;
buffer_lock = move ( ring_lock ) ;
buffer = & buf ;
} else {
2020-01-27 02:21:39 +01:00
have_more = true ;
2020-01-24 02:57:58 +01:00
break ;
}
}
}
}
if ( ! buffer )
2020-01-27 02:21:39 +01:00
return false ; /* we've no packets */
2020-01-24 02:57:58 +01:00
2020-01-27 02:21:39 +01:00
uint8_t packet_flags { 0 } ;
pipes : : buffer payload { } ;
2020-01-24 02:57:58 +01:00
2020-01-27 02:21:39 +01:00
/* lets find out if we've to reassemble the packet */
2020-01-27 13:02:22 +01:00
auto & first_buffer = buffer - > slot_value ( 0 ) ;
packet_id = first_buffer . packet_id ;
if ( first_buffer . packet_flags & PacketFlag : : Fragmented ) {
uint16_t sequence_length { 1 } ;
size_t total_payload_length { first_buffer . payload_length } ;
2020-01-24 02:57:58 +01:00
do {
if ( sequence_length > = buffer - > capacity ( ) ) {
2020-01-27 02:21:39 +01:00
logError ( this - > client - > getServerId ( ) , " {} Command fragment buffer is full, and there is not fragmented packet end. Dropping full buffer which will probably cause a connection loss. " , CLIENT_STR_LOG_PREFIX_ ( this - > client ) ) ;
2020-01-24 02:57:58 +01:00
buffer - > clear ( ) ;
2020-01-27 02:21:39 +01:00
return false ; /* we've nothing to handle */
2020-01-24 02:57:58 +01:00
}
2020-01-27 02:21:39 +01:00
if ( ! buffer - > slot_set ( sequence_length ) )
return false ; /* we need more packets */
2020-01-24 02:57:58 +01:00
2020-01-27 02:21:39 +01:00
auto & packet = buffer - > slot_value ( sequence_length + + ) ;
total_payload_length + = packet . payload_length ;
if ( packet . packet_flags & PacketFlag : : Fragmented ) {
/* yep we find the end */
break ;
}
} while ( true ) ;
/* ok we have all fragments lets reassemble */
2020-01-24 02:57:58 +01:00
/*
* Packet sequence could never be so long . If it is so then the data_length ( ) returned an invalid value .
* We ' re checking it here because we dont want to make a huge allocation
*/
2020-01-27 02:21:39 +01:00
assert ( total_payload_length < 512 * 1024 * 1024 ) ;
2020-01-24 02:57:58 +01:00
2020-01-27 02:21:39 +01:00
pipes : : buffer packet_buffer { total_payload_length } ;
2020-01-24 02:57:58 +01:00
char * packet_buffer_ptr = & packet_buffer [ 0 ] ;
2020-01-27 13:02:22 +01:00
size_t packet_count { 0 } ;
2020-01-24 02:57:58 +01:00
2020-01-27 02:21:39 +01:00
packet_flags = buffer - > slot_value ( 0 ) . packet_flags ;
2020-01-24 02:57:58 +01:00
while ( packet_count < sequence_length ) {
2020-01-27 02:21:39 +01:00
auto fragment = buffer - > pop_front ( ) ;
memcpy ( packet_buffer_ptr , fragment . payload . data_ptr ( ) , fragment . payload_length ) ;
2020-01-24 02:57:58 +01:00
2020-01-27 02:21:39 +01:00
packet_buffer_ptr + = fragment . payload_length ;
packet_count + + ;
2020-01-24 02:57:58 +01:00
}
2020-01-27 02:21:39 +01:00
# ifndef _NDEBUG
2020-01-24 02:57:58 +01:00
if ( ( packet_buffer_ptr - 1 ) ! = & packet_buffer [ packet_buffer . length ( ) - 1 ] ) {
logCritical ( this - > client - > getServer ( ) - > getServerId ( ) ,
2020-01-27 02:21:39 +01:00
" Buffer over/underflow: packet_buffer_ptr != &packet_buffer[packet_buffer.length() - 1]; packet_buffer_ptr := {}; packet_buffer.end() := {} " ,
( void * ) packet_buffer_ptr ,
( void * ) & packet_buffer [ packet_buffer . length ( ) - 1 ]
2020-01-24 02:57:58 +01:00
) ;
}
2020-01-27 02:21:39 +01:00
# endif
2020-01-27 13:02:22 +01:00
payload = packet_buffer ;
2020-01-24 02:57:58 +01:00
} else {
2020-01-27 02:21:39 +01:00
auto packet = buffer - > pop_front ( ) ;
packet_flags = packet . packet_flags ;
payload = packet . payload ;
2020-01-24 02:57:58 +01:00
}
2020-01-27 02:21:39 +01:00
have_more | = buffer - > front_set ( ) ; /* set the more flag if we have more to process */
2020-01-24 02:57:58 +01:00
buffer_lock . unlock ( ) ;
2020-01-27 02:21:39 +01:00
if ( packet_flags & PacketFlag : : Compressed ) {
std : : string error { } ;
auto decompressed_size = compression : : qlz_decompressed_size ( payload . data_ptr ( ) , payload . length ( ) ) ;
auto buffer = buffer : : allocate_buffer ( decompressed_size ) ;
if ( ! compression : : qlz_decompress_payload ( payload . data_ptr ( ) , buffer . data_ptr ( ) , & decompressed_size ) ) {
logTrace ( this - > client - > getServerId ( ) , " {} Failed to decompress received command. Dropping packet. " , CLIENT_STR_LOG_PREFIX_ ( this - > client ) ) ;
return false ;
}
payload = buffer . range ( 0 , decompressed_size ) ;
2020-01-24 02:57:58 +01:00
}
2020-01-27 02:21:39 +01:00
result = std : : move ( payload ) ;
return have_more ;
2019-07-17 19:37:18 +02:00
}
void VoiceClientConnection : : sendPacket ( const shared_ptr < protocol : : ServerPacket > & original_packet , bool copy , bool prepare_directly ) {
2020-01-24 02:57:58 +01:00
if ( this - > client - > state = = ConnectionState : : DISCONNECTED )
return ;
shared_ptr < protocol : : ServerPacket > packet ;
if ( copy ) {
packet = protocol : : ServerPacket : : from_buffer ( original_packet - > buffer ( ) . dup ( buffer : : allocate_buffer ( original_packet - > buffer ( ) . length ( ) ) ) ) ;
if ( original_packet - > getListener ( ) )
packet - > setListener ( std : : move ( original_packet - > getListener ( ) ) ) ;
packet - > memory_state . flags = original_packet - > memory_state . flags ;
} else {
packet = original_packet ;
}
auto type = WritePreprocessCategory : : from_type ( packet - > type ( ) . type ( ) ) ;
auto & queue = this - > write_preprocess_queues [ type ] ;
if ( prepare_directly ) {
vector < pipes : : buffer > buffers ;
this - > prepare_process_count + + ;
{
unique_lock work_lock { queue . work_lock } ;
if ( ! this - > prepare_packet_for_write ( buffers , packet , work_lock ) ) {
logError ( this - > client - > getServerId ( ) , " {} Dropping packet! " , CLIENT_STR_LOG_PREFIX_ ( this - > client ) ) ;
this - > prepare_process_count - - ;
return ;
}
}
/* enqueue buffers for write */
{
lock_guard write_queue_lock ( this - > write_queue_lock ) ;
this - > write_queue . insert ( this - > write_queue . end ( ) , buffers . begin ( ) , buffers . end ( ) ) ;
}
this - > prepare_process_count - - ; /* we're now done preparing */
} else {
lock_guard queue_lock { queue . queue_lock } ;
queue . queue . push_back ( packet ) ;
queue . has_work = true ;
}
this - > triggerWrite ( ) ;
2019-07-17 19:37:18 +02:00
}
2019-10-22 18:39:52 +02:00
bool VoiceClientConnection : : prepare_packet_for_write ( vector < pipes : : buffer > & result , const shared_ptr < ServerPacket > & packet , std : : unique_lock < std : : mutex > & work_lock ) {
2020-01-24 02:57:58 +01:00
assert ( work_lock . owns_lock ( ) ) ;
string error = " success " ;
2020-02-01 14:32:16 +01:00
if ( packet - > type ( ) . compressable ( ) & & ! packet - > memory_state . fragment_entry ) {
2020-01-24 02:57:58 +01:00
packet - > enable_flag ( PacketFlag : : Compressed ) ;
if ( ! this - > compress_handler . progressPacketOut ( packet . get ( ) , error ) ) {
logError ( this - > getClient ( ) - > getServerId ( ) , " {} Could not compress outgoing packet. \n This could cause fatal failed for the client. \n Error: {} " , error ) ;
return false ;
}
}
std : : vector < shared_ptr < ServerPacket > > fragments ;
fragments . reserve ( ( size_t ) ( packet - > data ( ) . length ( ) / packet - > type ( ) . max_length ( ) ) + 1 ) ;
if ( packet - > data ( ) . length ( ) > packet - > type ( ) . max_length ( ) ) {
if ( ! packet - > type ( ) . fragmentable ( ) ) {
logError ( this - > client - > getServerId ( ) , " {} We've tried to send a too long, not fragmentable, packet. Dropping packet of type {} with length {} " , CLIENT_STR_LOG_PREFIX_ ( this - > client ) , packet - > type ( ) . name ( ) , packet - > data ( ) . length ( ) ) ;
return false ;
}
{ //Split packets
auto buffer = packet - > data ( ) ;
const auto max_length = packet - > type ( ) . max_length ( ) ;
while ( buffer . length ( ) > max_length * 2 ) {
fragments . push_back ( make_shared < ServerPacket > ( packet - > type ( ) , buffer . view ( 0 , max_length ) . dup ( buffer : : allocate_buffer ( max_length ) ) ) ) ;
buffer = buffer . range ( ( size_t ) max_length ) ;
}
if ( buffer . length ( ) > max_length ) { //Divide rest by 2
fragments . push_back ( make_shared < ServerPacket > ( packet - > type ( ) , buffer . view ( 0 , buffer . length ( ) / 2 ) . dup ( buffer : : allocate_buffer ( buffer . length ( ) / 2 ) ) ) ) ;
buffer = buffer . range ( buffer . length ( ) / 2 ) ;
}
fragments . push_back ( make_shared < ServerPacket > ( packet - > type ( ) , buffer ) ) ;
for ( const auto & frag : fragments ) {
frag - > setFragmentedEntry ( true ) ;
frag - > enable_flag ( PacketFlag : : NewProtocol ) ;
}
}
assert ( fragments . size ( ) > = 2 ) ;
fragments . front ( ) - > enable_flag ( PacketFlag : : Fragmented ) ;
if ( packet - > has_flag ( PacketFlag : : Compressed ) )
fragments . front ( ) - > enable_flag ( PacketFlag : : Compressed ) ;
fragments . back ( ) - > enable_flag ( PacketFlag : : Fragmented ) ;
if ( packet - > getListener ( ) )
fragments . back ( ) - > setListener ( std : : move ( packet - > getListener ( ) ) ) ; //Move the listener to the last :)
} else {
fragments . push_back ( packet ) ;
}
result . reserve ( fragments . size ( ) ) ;
/* apply packet ids */
for ( const auto & fragment : fragments ) {
if ( ! fragment - > memory_state . id_branded )
fragment - > applyPacketId ( this - > packet_id_manager ) ;
}
work_lock . unlock ( ) ; /* the rest could be unordered */
2020-01-27 02:21:39 +01:00
CryptHandler : : key_t crypt_key { } ;
CryptHandler : : nonce_t crypt_nonce { } ;
2020-01-24 02:57:58 +01:00
auto statistics = this - > client ? this - > client - > connectionStatistics : nullptr ;
for ( const auto & fragment : fragments ) {
2020-01-27 02:21:39 +01:00
if ( fragment - > has_flag ( PacketFlag : : Unencrypted ) ) {
this - > crypt_handler . write_default_mac ( fragment - > mac ( ) . data_ptr ( ) ) ;
} else {
if ( ! this - > client - > crypto . protocol_encrypted ) {
crypt_key = CryptHandler : : default_key ;
crypt_nonce = CryptHandler : : default_nonce ;
} else {
if ( ! this - > crypt_handler . generate_key_nonce ( false , fragment - > type ( ) . type ( ) , fragment - > packetId ( ) , fragment - > generationId ( ) , crypt_key , crypt_nonce ) ) {
logError ( this - > client - > getServerId ( ) , " {} Failed to generate crypt key/nonce for sending a packet. This should never happen! Dropping packet. " , CLIENT_STR_LOG_PREFIX_ ( this - > client ) ) ;
return false ;
}
}
auto crypt_result = this - > crypt_handler . encrypt ( fragment - > header ( ) . data_ptr ( ) , fragment - > header ( ) . length ( ) ,
fragment - > data ( ) . data_ptr ( ) , fragment - > data ( ) . length ( ) ,
fragment - > mac ( ) . data_ptr ( ) ,
crypt_key , crypt_nonce , error ) ;
if ( ! crypt_result ) {
logError ( this - > client - > getServerId ( ) , " {} Failed to encrypt packet. Error: {} " , CLIENT_STR_LOG_PREFIX_ ( this - > client ) , error ) ;
return false ;
}
2020-01-24 02:57:58 +01:00
}
2020-01-27 02:21:39 +01:00
2019-07-17 19:37:18 +02:00
# ifndef CONNECTION_NO_STATISTICS
2020-01-24 02:57:58 +01:00
if ( statistics )
statistics - > logOutgoingPacket ( * fragment ) ;
2019-07-17 19:37:18 +02:00
# endif
2020-01-24 02:57:58 +01:00
this - > acknowledge_handler . process_packet ( * fragment ) ;
result . push_back ( fragment - > buffer ( ) ) ;
}
2019-07-17 19:37:18 +02:00
2020-01-24 02:57:58 +01:00
return true ;
2019-07-17 19:37:18 +02:00
}
2019-10-22 18:39:52 +02:00
bool VoiceClientConnection : : preprocess_write_packets ( ) {
2020-01-24 02:57:58 +01:00
std : : shared_ptr < ServerPacket > packet { nullptr } ;
vector < pipes : : buffer > buffers { } ;
bool flag_more { false } ;
prepare_process_count + + ; /* we're not preparing a packet */
for ( auto & category : this - > write_preprocess_queues ) {
if ( ! category . has_work ) continue ;
else if ( packet ) {
flag_more = true ;
break ;
}
unique_lock work_lock { category . work_lock , try_to_lock } ;
if ( ! work_lock ) continue ; /* This particular category will already be processed */
{
lock_guard buffer_lock { category . queue_lock } ;
if ( category . queue . empty ( ) ) {
category . has_work = false ;
continue ;
}
packet = std : : move ( category . queue . front ( ) ) ;
category . queue . pop_front ( ) ;
category . has_work = ! category . queue . empty ( ) ;
flag_more = category . has_work ;
}
if ( ! this - > prepare_packet_for_write ( buffers , packet , work_lock ) ) {
logError ( this - > client - > getServerId ( ) , " {} Dropping packet! " , CLIENT_STR_LOG_PREFIX_ ( this - > client ) ) ;
if ( flag_more )
break ;
else
continue ; /* find out if we have more */
}
if ( flag_more )
break ;
else
continue ; /* find out if we have more */
}
/* enqueue buffers for write */
if ( ! buffers . empty ( ) ) {
lock_guard write_queue_lock ( this - > write_queue_lock ) ;
this - > write_queue . insert ( this - > write_queue . end ( ) , buffers . begin ( ) , buffers . end ( ) ) ;
}
this - > prepare_process_count - - ; /* we're now done preparing */
return flag_more ;
2019-07-17 19:37:18 +02:00
}
int VoiceClientConnection : : pop_write_buffer ( pipes : : buffer & target ) {
2020-01-24 02:57:58 +01:00
if ( this - > client - > state = = DISCONNECTED )
return 2 ;
2019-07-17 19:37:18 +02:00
2020-01-24 02:57:58 +01:00
lock_guard write_queue_lock ( this - > write_queue_lock ) ;
size_t size = this - > write_queue . size ( ) ;
if ( size = = 0 )
return 2 ;
2019-07-17 19:37:18 +02:00
2020-01-24 02:57:58 +01:00
target = std : : move ( this - > write_queue . front ( ) ) ;
this - > write_queue . pop_front ( ) ;
2019-07-17 19:37:18 +02:00
# ifdef FUZZING_TESTING_OUTGOING
2020-01-24 02:57:58 +01:00
# ifdef FIZZING_TESTING_DISABLE_HANDSHAKE
if ( this - > client - > state = = ConnectionState : : CONNECTED ) {
# endif
if ( ( rand ( ) % FUZZING_TESTING_DROP_MAX ) < FUZZING_TESTING_DROP ) {
debugMessage ( this - > client - > getServerId ( ) , " {}[FUZZING] Dropping outgoing packet " , CLIENT_STR_LOG_PREFIX_ ( this - > client ) ) ;
return 0 ;
}
# ifdef FIZZING_TESTING_DISABLE_HANDSHAKE
}
# endif
2019-07-17 19:37:18 +02:00
# endif
2020-01-24 02:57:58 +01:00
return size > 1 ;
2019-07-17 19:37:18 +02:00
}
bool VoiceClientConnection : : wait_empty_write_and_prepare_queue ( chrono : : time_point < chrono : : system_clock > until ) {
2020-01-24 02:57:58 +01:00
while ( true ) {
for ( auto & queue : this - > write_preprocess_queues ) {
{
lock_guard lock { queue . queue_lock } ;
if ( ! queue . queue . empty ( ) )
goto _wait ;
}
{
unique_lock lock { queue . work_lock , try_to_lock } ;
if ( ! lock . owns_lock ( ) )
goto _wait ;
}
}
{
lock_guard buffer_lock { this - > write_queue_lock } ;
if ( ! this - > write_queue . empty ( ) )
goto _wait ;
if ( this - > prepare_process_count ! = 0 )
goto _wait ;
}
break ;
_wait :
if ( until . time_since_epoch ( ) . count ( ) ! = 0 & & system_clock : : now ( ) > until )
return false ;
2020-02-01 14:32:16 +01:00
threads : : self : : sleep_for ( milliseconds ( 5 ) ) ;
}
2020-01-24 02:57:58 +01:00
return true ;
2019-07-17 19:37:18 +02:00
}
void VoiceClientConnection : : reset ( ) {
2020-01-24 02:57:58 +01:00
for ( auto & queue : this - > write_preprocess_queues ) {
{
lock_guard lock { queue . queue_lock } ;
queue . queue . clear ( ) ;
}
}
this - > acknowledge_handler . reset ( ) ;
this - > crypt_handler . reset ( ) ;
this - > packet_id_manager . reset ( ) ;
{
lock_guard buffer_lock ( this - > packet_buffer_lock ) ;
2020-01-27 02:21:39 +01:00
for ( auto & buffer : this - > _command_fragment_buffers )
2020-01-24 02:57:58 +01:00
buffer . reset ( ) ;
}
2019-07-17 19:37:18 +02:00
}
2020-01-27 02:21:39 +01:00
void VoiceClientConnection : : force_insert_command ( const pipes : : buffer_view & buffer ) {
CommandFragment fragment_entry {
0 ,
0 ,
PacketFlag : : Unencrypted ,
( uint32_t ) buffer . length ( ) ,
buffer . own_buffer ( )
} ;
{
auto & fragment_buffer = this - > _command_fragment_buffers [ command_fragment_buffer_index ( protocol : : COMMAND ) ] ;
unique_lock queue_lock ( fragment_buffer . buffer_lock ) ;
fragment_buffer . push_front ( std : : move ( fragment_entry ) ) ;
}
auto voice_server = this - > client - > voice_server ;
if ( voice_server )
voice_server - > schedule_command_handling ( this - > client ) ;
}
void VoiceClientConnection : : register_initiv_packet ( ) {
auto & fragment_buffer = this - > _command_fragment_buffers [ command_fragment_buffer_index ( protocol : : COMMAND ) ] ;
unique_lock buffer_lock ( fragment_buffer . buffer_lock ) ;
fragment_buffer . set_full_index_to ( 1 ) ; /* the first packet (0) is already the clientinitiv packet */
}