Fixed voice disconnecting
This commit is contained in:
		
							parent
							
								
									4781e75a11
								
							
						
					
					
						commit
						d5ffbff3be
					
				| @ -42,9 +42,12 @@ VoiceClientConnection::~VoiceClientConnection() { | |||||||
| 		lock_guard write_queue_lock(this->write_queue_lock); | 		lock_guard write_queue_lock(this->write_queue_lock); | ||||||
| 		this->write_queue.clear(); | 		this->write_queue.clear(); | ||||||
| 	} | 	} | ||||||
| 	{ | 
 | ||||||
| 		lock_guard write_queue_lock(this->write_prepare_queue_lock); | 	for(auto& category : this->write_preprocess_queues) { | ||||||
| 		this->write_prepare_queue.clear(); | 		lock_guard work_lock{category.work_lock}; | ||||||
|  | 		lock_guard queue_lock{category.queue_lock}; | ||||||
|  | 
 | ||||||
|  | 		category.queue.clear(); | ||||||
| 	} | 	} | ||||||
| 	this->client = nullptr; | 	this->client = nullptr; | ||||||
| 	memtrack::freed<VoiceClientConnection>(this); | 	memtrack::freed<VoiceClientConnection>(this); | ||||||
| @ -451,7 +454,8 @@ unique_ptr<protocol::ClientPacket> VoiceClientConnection::next_reassembled_packe | |||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| void VoiceClientConnection::sendPacket(const shared_ptr<protocol::ServerPacket>& original_packet, bool copy, bool prepare_directly) { | void VoiceClientConnection::sendPacket(const shared_ptr<protocol::ServerPacket>& original_packet, bool copy, bool prepare_directly) { | ||||||
| 	if(this->client->state == ConnectionState::DISCONNECTED) return; | 	if(this->client->state == ConnectionState::DISCONNECTED) | ||||||
|  | 		return; | ||||||
| 
 | 
 | ||||||
| 	shared_ptr<protocol::ServerPacket> packet; | 	shared_ptr<protocol::ServerPacket> packet; | ||||||
| 	if(copy) { | 	if(copy) { | ||||||
| @ -459,16 +463,23 @@ void VoiceClientConnection::sendPacket(const shared_ptr<protocol::ServerPacket>& | |||||||
| 		if(original_packet->getListener()) | 		if(original_packet->getListener()) | ||||||
| 			packet->setListener(std::move(original_packet->getListener())); | 			packet->setListener(std::move(original_packet->getListener())); | ||||||
| 		packet->memory_state.flags = original_packet->memory_state.flags; | 		packet->memory_state.flags = original_packet->memory_state.flags; | ||||||
| 	} else | 	} else { | ||||||
| 		packet = original_packet; | 		packet = original_packet; | ||||||
|  | 	} | ||||||
| 
 | 
 | ||||||
|  | 	auto type = WritePreprocessCategory::from_type(packet->type().type()); | ||||||
|  | 	auto& queue = this->write_preprocess_queues[type]; | ||||||
| 	if(prepare_directly) { | 	if(prepare_directly) { | ||||||
| 		vector<pipes::buffer> buffers; | 		vector<pipes::buffer> buffers; | ||||||
| 		this->prepare_process_count++; | 		this->prepare_process_count++; | ||||||
| 		if(!this->prepare_packet_for_write(buffers, packet)) { | 
 | ||||||
| 			logError(this->client->getServerId(), "{} Dropping packet!", CLIENT_STR_LOG_PREFIX_(this->client)); | 		{ | ||||||
| 			this->prepare_process_count--; | 			unique_lock work_lock{queue.work_lock}; | ||||||
| 			return; | 			if(!this->prepare_packet_for_write(buffers, packet, work_lock)) { | ||||||
|  | 				logError(this->client->getServerId(), "{} Dropping packet!", CLIENT_STR_LOG_PREFIX_(this->client)); | ||||||
|  | 				this->prepare_process_count--; | ||||||
|  | 				return; | ||||||
|  | 			} | ||||||
| 		} | 		} | ||||||
| 
 | 
 | ||||||
| 		/* enqueue buffers for write */ | 		/* enqueue buffers for write */ | ||||||
| @ -478,13 +489,16 @@ void VoiceClientConnection::sendPacket(const shared_ptr<protocol::ServerPacket>& | |||||||
| 		} | 		} | ||||||
| 		this->prepare_process_count--; /* we're now done preparing */ | 		this->prepare_process_count--; /* we're now done preparing */ | ||||||
| 	} else { | 	} else { | ||||||
| 		lock_guard prepare_queue_lock(this->write_prepare_queue_lock); | 		lock_guard queue_lock{queue.queue_lock}; | ||||||
| 		this->write_prepare_queue.push_back(packet); | 		queue.queue.push_back(packet); | ||||||
|  | 		queue.has_work = true; | ||||||
| 	} | 	} | ||||||
| 	this->triggerWrite(); | 	this->triggerWrite(); | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| bool VoiceClientConnection::prepare_packet_for_write(vector<pipes::buffer> &result, const shared_ptr<ServerPacket> &packet) { | bool VoiceClientConnection::prepare_packet_for_write(vector<pipes::buffer> &result, const shared_ptr<ServerPacket> &packet, std::unique_lock<std::mutex>& work_lock) { | ||||||
|  | 	assert(work_lock.owns_lock()); | ||||||
|  | 
 | ||||||
| 	string error = "success"; | 	string error = "success"; | ||||||
| 
 | 
 | ||||||
| 	if(packet->type().compressable() && !packet->memory_state.fragment_entry) { | 	if(packet->type().compressable() && !packet->memory_state.fragment_entry) { | ||||||
| @ -498,7 +512,7 @@ bool VoiceClientConnection::prepare_packet_for_write(vector<pipes::buffer> &resu | |||||||
| 	std::vector<shared_ptr<ServerPacket>> fragments; | 	std::vector<shared_ptr<ServerPacket>> fragments; | ||||||
| 	fragments.reserve((size_t) (packet->data().length() / packet->type().max_length()) + 1); | 	fragments.reserve((size_t) (packet->data().length() / packet->type().max_length()) + 1); | ||||||
| 
 | 
 | ||||||
| 	if(packet->data().length() > packet->type().max_length()){ | 	if(packet->data().length() > packet->type().max_length()) { | ||||||
| 		if(!packet->type().fragmentable()) { | 		if(!packet->type().fragmentable()) { | ||||||
| 			logError(this->client->getServerId(), "{} We've tried to send a too long, not fragmentable, packet. Dropping packet of type {} with length {}", CLIENT_STR_LOG_PREFIX_(this->client), packet->type().name(), packet->data().length()); | 			logError(this->client->getServerId(), "{} We've tried to send a too long, not fragmentable, packet. Dropping packet of type {} with length {}", CLIENT_STR_LOG_PREFIX_(this->client), packet->type().name(), packet->data().length()); | ||||||
| 			return false; | 			return false; | ||||||
| @ -534,21 +548,21 @@ bool VoiceClientConnection::prepare_packet_for_write(vector<pipes::buffer> &resu | |||||||
| 
 | 
 | ||||||
| 		if(packet->getListener()) | 		if(packet->getListener()) | ||||||
| 			fragments.back()->setListener(std::move(packet->getListener())); //Move the listener to the last :)
 | 			fragments.back()->setListener(std::move(packet->getListener())); //Move the listener to the last :)
 | ||||||
| 	} else | 	} else { | ||||||
| 		fragments.push_back(packet); | 		fragments.push_back(packet); | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
| 	result.reserve(fragments.size()); | 	result.reserve(fragments.size()); | ||||||
| 
 | 
 | ||||||
| 	/* apply packet ids */ | 	/* apply packet ids */ | ||||||
| 	{ | 	for(const auto& fragment : fragments) { | ||||||
| 		lock_guard id_lock(this->packet_id_manager_lock); | 		if(!fragment->memory_state.id_branded) | ||||||
| 		for(const auto& fragment : fragments) { | 			fragment->applyPacketId(this->packet_id_manager); | ||||||
| 			if(!fragment->memory_state.id_branded) |  | ||||||
| 				fragment->applyPacketId(this->packet_id_manager); |  | ||||||
| 		} |  | ||||||
| 	} | 	} | ||||||
|  | 	work_lock.unlock(); /* the rest could be unordered */ | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| 	auto statistics = this->client && this->client->getServer() ? this->client->connectionStatistics : nullptr; | 	auto statistics = this->client ? this->client->connectionStatistics : nullptr; | ||||||
| 	for(const auto& fragment : fragments) { | 	for(const auto& fragment : fragments) { | ||||||
| 		if(!this->crypt_handler.progressPacketOut(fragment.get(), error, false)){ | 		if(!this->crypt_handler.progressPacketOut(fragment.get(), error, false)){ | ||||||
| 			logError(this->client->getServerId(), "{} Failed to encrypt packet. Error: {}", CLIENT_STR_LOG_PREFIX_(this->client), error); | 			logError(this->client->getServerId(), "{} Failed to encrypt packet. Error: {}", CLIENT_STR_LOG_PREFIX_(this->client), error); | ||||||
| @ -565,50 +579,50 @@ bool VoiceClientConnection::prepare_packet_for_write(vector<pipes::buffer> &resu | |||||||
| 	return true; | 	return true; | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| #ifdef VC_USE_READ_QUEUE | bool VoiceClientConnection::preprocess_write_packets() { | ||||||
| bool VoiceClientConnection::handleNextDatagram() { | 	std::shared_ptr<ServerPacket> packet{nullptr}; | ||||||
| 
 | 	vector<pipes::buffer> buffers{}; | ||||||
| 	bool flag_empty; | 	bool flag_more{false}; | ||||||
| 	pipes::buffer buffer; |  | ||||||
| 	{ |  | ||||||
| 		lock_guard<recursive_mutex> queue_lock(this->queueLock); |  | ||||||
| 		if(this->readQueue.empty()) return false; |  | ||||||
| 		buffer = std::move(this->readQueue.front()); |  | ||||||
| 		this->readQueue.pop_front(); |  | ||||||
| 		flag_empty = this->readQueue.empty(); |  | ||||||
| 	}; |  | ||||||
| 
 |  | ||||||
| 	try { |  | ||||||
| 		this->handleDatagramReceived(buffer); |  | ||||||
| 	} catch (std::exception& e) { |  | ||||||
| 		logCritical(this->client->getServerId(), "Handling of raw packet thrown an uncaught exception! (Message: " + string(e.what()) + ")"); |  | ||||||
| 	} |  | ||||||
| 	return flag_empty; |  | ||||||
| } |  | ||||||
| #endif |  | ||||||
| 
 |  | ||||||
| bool VoiceClientConnection::prepare_write_packets() { |  | ||||||
| 	/* get the next packet to prepare */ |  | ||||||
| 	unique_lock prepare_queue_lock(this->write_prepare_queue_lock); |  | ||||||
| 	if(this->write_prepare_queue.empty()) |  | ||||||
| 		return false; |  | ||||||
| 
 | 
 | ||||||
| 	prepare_process_count++; /* we're not preparing a packet */ | 	prepare_process_count++; /* we're not preparing a packet */ | ||||||
| 	auto packet = move(this->write_prepare_queue.front()); | 	for(auto& category : this->write_preprocess_queues) { | ||||||
| 	this->write_prepare_queue.pop_front(); | 		if(!category.has_work) continue; | ||||||
| 	auto flag_more = !this->write_prepare_queue.empty(); | 		else if(packet) { | ||||||
| 	prepare_queue_lock.unlock(); | 			flag_more = true; | ||||||
|  | 			break; | ||||||
|  | 		} | ||||||
| 
 | 
 | ||||||
| 	/* prepare the next packet */ | 		unique_lock work_lock{category.work_lock, try_to_lock}; | ||||||
| 	vector<pipes::buffer> buffers; | 		if(!work_lock) continue; /* This particular category will already be processed */ | ||||||
| 	if(!this->prepare_packet_for_write(buffers, packet)) { | 
 | ||||||
| 		logError(this->client->getServerId(), "{} Dropping packet!", CLIENT_STR_LOG_PREFIX_(this->client)); | 		{ | ||||||
| 		this->prepare_process_count--; | 			lock_guard buffer_lock{category.queue_lock}; | ||||||
| 		return flag_more; | 			if(category.queue.empty()) { | ||||||
|  | 				category.has_work = false; | ||||||
|  | 				continue; | ||||||
|  | 			} | ||||||
|  | 
 | ||||||
|  | 			packet = std::move(category.queue.front()); | ||||||
|  | 			category.queue.pop_front(); | ||||||
|  | 			category.has_work = !category.queue.empty(); | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		if(!this->prepare_packet_for_write(buffers, packet, work_lock)) { | ||||||
|  | 			logError(this->client->getServerId(), "{} Dropping packet!", CLIENT_STR_LOG_PREFIX_(this->client)); | ||||||
|  | 			if(flag_more) | ||||||
|  | 				break; | ||||||
|  | 			else | ||||||
|  | 				continue; /* find out if we have more */ | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		if(flag_more) | ||||||
|  | 			break; | ||||||
|  | 		else | ||||||
|  | 			continue; /* find out if we have more */ | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	/* enqueue buffers for write */ | 	/* enqueue buffers for write */ | ||||||
| 	{ | 	if(!buffers.empty()) { | ||||||
| 		lock_guard write_queue_lock(this->write_queue_lock); | 		lock_guard write_queue_lock(this->write_queue_lock); | ||||||
| 		this->write_queue.insert(this->write_queue.end(), buffers.begin(), buffers.end()); | 		this->write_queue.insert(this->write_queue.end(), buffers.begin(), buffers.end()); | ||||||
| 	} | 	} | ||||||
| @ -646,12 +660,18 @@ int VoiceClientConnection::pop_write_buffer(pipes::buffer& target) { | |||||||
| 
 | 
 | ||||||
| bool VoiceClientConnection::wait_empty_write_and_prepare_queue(chrono::time_point<chrono::system_clock> until) { | bool VoiceClientConnection::wait_empty_write_and_prepare_queue(chrono::time_point<chrono::system_clock> until) { | ||||||
| 	while(true) { | 	while(true) { | ||||||
| 		{ | 		for(auto& queue : this->write_preprocess_queues) { | ||||||
| 			lock_guard prepare_lock{this->write_prepare_queue_lock}; | 			{ | ||||||
| 			if(!this->write_prepare_queue.empty()) | 				lock_guard lock{queue.queue_lock}; | ||||||
| 				goto _wait; | 				if(!queue.queue.empty()) | ||||||
| 			if(this->prepare_process_count != 0) | 					goto _wait; | ||||||
| 				goto _wait; | 			} | ||||||
|  | 
 | ||||||
|  | 			{ | ||||||
|  | 				unique_lock lock{queue.work_lock, try_to_lock}; | ||||||
|  | 				if(!lock.owns_lock()) | ||||||
|  | 					goto _wait; | ||||||
|  | 			} | ||||||
| 		} | 		} | ||||||
| 		{ | 		{ | ||||||
| 			lock_guard buffer_lock{this->write_queue_lock}; | 			lock_guard buffer_lock{this->write_queue_lock}; | ||||||
| @ -672,12 +692,16 @@ bool VoiceClientConnection::wait_empty_write_and_prepare_queue(chrono::time_poin | |||||||
| } | } | ||||||
| 
 | 
 | ||||||
| void VoiceClientConnection::reset() { | void VoiceClientConnection::reset() { | ||||||
|  | 	for(auto& queue : this->write_preprocess_queues) { | ||||||
|  | 		{ | ||||||
|  | 			lock_guard lock{queue.queue_lock}; | ||||||
|  | 			queue.queue.clear(); | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
| 	this->acknowledge_handler.reset(); | 	this->acknowledge_handler.reset(); | ||||||
| 	this->crypt_handler.reset(); | 	this->crypt_handler.reset(); | ||||||
| 	{ | 	this->packet_id_manager.reset(); | ||||||
| 		lock_guard manager_lock(this->packet_id_manager_lock); |  | ||||||
| 		this->packet_id_manager.reset(); |  | ||||||
| 	} |  | ||||||
| 
 | 
 | ||||||
| 	{ | 	{ | ||||||
| 		lock_guard buffer_lock(this->packet_buffer_lock); | 		lock_guard buffer_lock(this->packet_buffer_lock); | ||||||
|  | |||||||
| @ -55,7 +55,7 @@ namespace ts { | |||||||
| 			     * Split packets waiting in write_process_queue and moves the final buffers to writeQueue. | 			     * Split packets waiting in write_process_queue and moves the final buffers to writeQueue. | ||||||
| 			     * @returns true when there are more packets to prepare | 			     * @returns true when there are more packets to prepare | ||||||
| 			     */ | 			     */ | ||||||
|                 bool prepare_write_packets(); |                 bool preprocess_write_packets(); | ||||||
|                 /* return 2 => Nothing | 1 => More and buffer is set | 0 => Buffer is set, nothing more */ |                 /* return 2 => Nothing | 1 => More and buffer is set | 0 => Buffer is set, nothing more */ | ||||||
|                 int pop_write_buffer(pipes::buffer& /* buffer */); |                 int pop_write_buffer(pipes::buffer& /* buffer */); | ||||||
| 
 | 
 | ||||||
| @ -84,22 +84,64 @@ namespace ts { | |||||||
| 			    std::unique_ptr<protocol::ClientPacket> next_reassembled_packet(std::unique_lock<std::recursive_timed_mutex>& /* packet channel execute lock */, bool& /* have more */); | 			    std::unique_ptr<protocol::ClientPacket> next_reassembled_packet(std::unique_lock<std::recursive_timed_mutex>& /* packet channel execute lock */, bool& /* have more */); | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| #ifdef VC_USE_READ_QUEUE | 			    /* ---------- Write declarations ---------- */ | ||||||
|                 std::deque<pipes::buffer> readQueue; |  | ||||||
| #endif |  | ||||||
| 
 |  | ||||||
|                 spin_lock write_queue_lock; /* queue access isn't for long in general */ |                 spin_lock write_queue_lock; /* queue access isn't for long in general */ | ||||||
| 			    std::deque<pipes::buffer> write_queue; | 			    std::deque<pipes::buffer> write_queue; | ||||||
| 
 | 
 | ||||||
| 			    spin_lock write_prepare_queue_lock; /* preprocess queue access isn't for long in general */ | 			    struct WritePreprocessCategory { | ||||||
|                 std::deque<std::shared_ptr<protocol::ServerPacket>> write_prepare_queue; | 			    	enum value { | ||||||
|  | 			    		PING_PONG = 0, //Ping/Pongs
 | ||||||
|  | 					    ACK = 2, | ||||||
|  | 			    		VOICE_WHISPER = 1, //Voice/Whisper
 | ||||||
|  | 			    		COMMAND = 3, | ||||||
|  | 			    		INIT = 4, | ||||||
| 
 | 
 | ||||||
| 			    spin_lock packet_id_manager_lock;  /* packet id's must be generated in order; Calculating the ID should also not be take too much time */ | 					    MAX = INIT | ||||||
|  | 			    	}; | ||||||
|  | 
 | ||||||
|  | 			    	inline static value from_type(protocol::PacketType type) { | ||||||
|  | 			    		switch(type) { | ||||||
|  | 			    			case protocol::PING: | ||||||
|  | 						    case protocol::PONG: | ||||||
|  | 								return value::PING_PONG; | ||||||
|  | 
 | ||||||
|  | 						    case protocol::VOICE: | ||||||
|  | 						    case protocol::VOICE_WHISPER: | ||||||
|  | 							    return value::VOICE_WHISPER; | ||||||
|  | 
 | ||||||
|  | 						    case protocol::ACK: | ||||||
|  | 						    case protocol::ACK_LOW: | ||||||
|  | 							    return value::ACK; | ||||||
|  | 
 | ||||||
|  | 						    case protocol::COMMAND: | ||||||
|  | 						    case protocol::COMMAND_LOW: | ||||||
|  | 							    return value::COMMAND; | ||||||
|  | 
 | ||||||
|  | 						    default: | ||||||
|  | 							    return value::INIT; | ||||||
|  | 			    		} | ||||||
|  | 			    	} | ||||||
|  | 			    }; | ||||||
|  | 
 | ||||||
|  | 			    struct WritePreprocessQueue { | ||||||
|  | 				    int _zero1{0}; | ||||||
|  | 			    	bool has_work{false}; | ||||||
|  | 			    	std::mutex work_lock{}; | ||||||
|  | 
 | ||||||
|  | 			    	spin_lock queue_lock{}; | ||||||
|  | 				    std::deque<std::shared_ptr<protocol::ServerPacket>> queue{}; | ||||||
|  | 
 | ||||||
|  | 				    int _zero{0}; | ||||||
|  | 			    }; | ||||||
|  |                 std::array<WritePreprocessQueue, WritePreprocessCategory::MAX> write_preprocess_queues{}; | ||||||
|  | 
 | ||||||
|  |                 /* ---------- Processing ---------- */ | ||||||
|  |                 /* automatically locked because packets of the same kind should be lock their "work_lock" from their WritePreprocessQueue object */ | ||||||
| 			    protocol::PacketIdManager packet_id_manager; | 			    protocol::PacketIdManager packet_id_manager; | ||||||
| 
 | 
 | ||||||
|                 /* this function is thread save :) */ |                 /* this function is thread save :) */ | ||||||
|                 std::atomic<uint8_t> prepare_process_count{0}; /* current thread count preparing a packet */ |                 std::atomic<uint8_t> prepare_process_count{0}; /* current thread count preparing a packet */ | ||||||
|                 bool prepare_packet_for_write(std::vector<pipes::buffer> &/* buffers which need to be transferred */, const std::shared_ptr<protocol::ServerPacket> &/* the packet */); |                 bool prepare_packet_for_write(std::vector<pipes::buffer> &/* buffers which need to be transferred */, const std::shared_ptr<protocol::ServerPacket> &/* the packet */, std::unique_lock<std::mutex>& /* work lock */); | ||||||
| 
 | 
 | ||||||
| 		        std::recursive_mutex packet_buffer_lock; | 		        std::recursive_mutex packet_buffer_lock; | ||||||
| 		        packet_buffers_t _packet_buffers; | 		        packet_buffers_t _packet_buffers; | ||||||
|  | |||||||
| @ -482,7 +482,7 @@ void VoiceServer::handleMessageWrite(int fd, short events, void *_event_handle) | |||||||
| 			auto client_ptr = &*client; | 			auto client_ptr = &*client; | ||||||
| 
 | 
 | ||||||
| 			TIMING_STEP(timings, "client get"); | 			TIMING_STEP(timings, "client get"); | ||||||
| 			more_to_prepare = connection->prepare_write_packets(); | 			more_to_prepare = connection->preprocess_write_packets(); | ||||||
| 			TIMING_STEP(timings, "client prepare"); | 			TIMING_STEP(timings, "client prepare"); | ||||||
| 
 | 
 | ||||||
| 			while(system_clock::now() <= write_timeout) { | 			while(system_clock::now() <= write_timeout) { | ||||||
|  | |||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user