From 72661d17bc59c5710e2d8c564d2315db41e8f3fc Mon Sep 17 00:00:00 2001 From: WolverinDEV Date: Tue, 18 Feb 2020 11:53:32 +0100 Subject: [PATCH] Fixed some crashes --- src/protocol/AcknowledgeManager.cpp | 90 +++++++++++++---------------- src/protocol/AcknowledgeManager.h | 78 ++++++++++++------------- 2 files changed, 77 insertions(+), 91 deletions(-) diff --git a/src/protocol/AcknowledgeManager.cpp b/src/protocol/AcknowledgeManager.cpp index 06a93f4..b2ee670 100644 --- a/src/protocol/AcknowledgeManager.cpp +++ b/src/protocol/AcknowledgeManager.cpp @@ -1,5 +1,5 @@ #include "AcknowledgeManager.h" -#include +#include #include using namespace ts; @@ -8,30 +8,27 @@ using namespace ts::protocol; using namespace std; using namespace std::chrono; -AcknowledgeManager::AcknowledgeManager() {} +AcknowledgeManager::AcknowledgeManager() = default; AcknowledgeManager::~AcknowledgeManager() { - { - lock_guard lock(this->entry_lock); - for(const auto& entry : this->entries) - if(entry->acknowledge_listener) - entry->acknowledge_listener->executionFailed("deleted"); - this->entries.clear(); - } + this->reset(); } void AcknowledgeManager::reset() { { - lock_guard lock(this->entry_lock); - for(const auto& entry : this->entries) + std::unique_lock lock{this->entry_lock}; + auto pending_entries = std::move(this->entries); + lock.unlock(); + + /* save because entries are not accessable anymore */ + for(const auto& entry : pending_entries) if(entry->acknowledge_listener) entry->acknowledge_listener->executionFailed("reset"); - this->entries.clear(); } } size_t AcknowledgeManager::awaiting_acknowledge() { - lock_guard lock(this->entry_lock); + std::lock_guard lock(this->entry_lock); return this->entries.size(); } @@ -54,7 +51,7 @@ void AcknowledgeManager::process_packet(ts::protocol::BasicPacket &packet) { entry->acknowledged = false; entry->send_count = 1; { - lock_guard lock(this->entry_lock); + std::lock_guard lock(this->entry_lock); this->entries.push_front(std::move(entry)); } } @@ -62,25 +59,18 @@ void AcknowledgeManager::process_packet(ts::protocol::BasicPacket &packet) { bool AcknowledgeManager::process_acknowledge(uint8_t packet_type, const pipes::buffer_view& payload, std::string& error) { if(payload.length() < 2) return false; - PacketType target_type = PacketType::UNDEFINED; - uint16_t target_id = 0; - - if(packet_type == protocol::ACK_LOW) target_type = PacketType::COMMAND_LOW; - else if(packet_type == protocol::ACK) target_type = PacketType::COMMAND; - target_id = be2le16((char*) payload.data_ptr()); - //debugMessage(0, "Got ack for {} {}", target_type, target_id); - - if(target_type == PacketType::UNDEFINED) { - error = "Invalid packet type (" + to_string(target_type) + ")"; - return false; - } + PacketType target_type{packet_type == protocol::ACK_LOW ? PacketType::COMMAND_LOW : PacketType::COMMAND}; + uint16_t target_id{be2le16((char*) payload.data_ptr())}; std::shared_ptr entry; + std::unique_ptr> ack_listener; { - lock_guard lock(this->entry_lock); + std::lock_guard lock{this->entry_lock}; for(auto it = this->entries.begin(); it != this->entries.end(); it++) { if((*it)->packet_type == target_type && (*it)->packet_id == target_id) { entry = *it; + ack_listener = std::move(entry->acknowledge_listener); /* move it out so nobody else could call it as well */ + entry->send_count--; if(entry->send_count == 0) this->entries.erase(it); @@ -100,46 +90,44 @@ bool AcknowledgeManager::process_acknowledge(uint8_t packet_type, const pipes::b } entry->acknowledged = true; - if(entry->acknowledge_listener) - entry->acknowledge_listener->executionSucceed(true); - entry->acknowledge_listener.reset(); - + if(ack_listener) ack_listener->executionSucceed(true); return true; } ssize_t AcknowledgeManager::execute_resend(const system_clock::time_point& now , std::chrono::system_clock::time_point &next_resend,std::deque& buffers, string& error) { - ssize_t resend_count = 0; + size_t resend_count{0}; - deque> need_resend; + vector> need_resend; { - deque> erase; + bool cleanup{false}; + std::lock_guard lock{this->entry_lock}; + need_resend.resize(this->entries.size()); - lock_guard lock(this->entry_lock); for (auto &entry : this->entries) { - if(!entry->acknowledged && entry->next_resend <= now) { - entry->resend_period = entry->resend_period + milliseconds((int) ceil(this->average_response * 2)); - if(entry->resend_period.count() > 1000) - entry->resend_period = milliseconds(1000); - else if(entry->resend_period.count() < 25) - entry->resend_period = milliseconds(25); - - entry->next_resend = now + entry->resend_period; - need_resend.push_front(entry); - } if(entry->acknowledged) { - if(entry->next_resend + entry->resend_period <= now) { //Timeout for may (more acknowledges) - erase.push_back(entry); + if(entry->next_resend + entry->resend_period <= now) { // Some resends are lost. So we just drop it after time + entry.reset(); + cleanup = true; } } else { + if(entry->next_resend <= now) { + entry->resend_period = entry->resend_period + milliseconds{(int) ceil(this->average_response * 2)}; + if(entry->resend_period.count() > 1000) + entry->resend_period = milliseconds(1000); + else if(entry->resend_period.count() < 25) + entry->resend_period = milliseconds(25); + + entry->next_resend = now + entry->resend_period; + need_resend.push_back(entry); + } if(next_resend > entry->next_resend) next_resend = entry->next_resend; } } - for(const auto& e : erase) { - auto it = find(this->entries.begin(), this->entries.end(), e); - if(it != this->entries.end()) - this->entries.erase(it); + if(cleanup) { + this->entries.erase(std::remove_if(this->entries.begin(), this->entries.end(), + [](const auto& entry) { return !entry; }), this->entries.end()); } } diff --git a/src/protocol/AcknowledgeManager.h b/src/protocol/AcknowledgeManager.h index 52e9c82..e3d3352 100644 --- a/src/protocol/AcknowledgeManager.h +++ b/src/protocol/AcknowledgeManager.h @@ -4,48 +4,46 @@ #include #define DEBUG_ACKNOWLEDGE -namespace ts { - namespace connection { - class VoiceClientConnection; - class AcknowledgeManager { - struct Entry { - uint16_t packet_id = 0; - uint8_t packet_type = 0xFF; - uint8_t resend_count = 0; - bool acknowledged : 1; - uint8_t send_count : 7; +namespace ts::connection { + class VoiceClientConnection; + class AcknowledgeManager { + struct Entry { + uint16_t packet_id = 0; + uint8_t packet_type = 0xFF; + uint8_t resend_count = 0; + bool acknowledged : 1; + uint8_t send_count : 7; - pipes::buffer buffer; - std::chrono::system_clock::time_point first_send; - std::chrono::system_clock::time_point next_resend; - std::chrono::milliseconds resend_period; + pipes::buffer buffer; + std::chrono::system_clock::time_point first_send; + std::chrono::system_clock::time_point next_resend; + std::chrono::milliseconds resend_period; - std::unique_ptr> acknowledge_listener; - }; - public: - AcknowledgeManager(); - virtual ~AcknowledgeManager(); - - size_t awaiting_acknowledge(); - void reset(); - - void process_packet(ts::protocol::BasicPacket& /* packet */); - bool process_acknowledge(uint8_t packet_type, const pipes::buffer_view& /* payload */, std::string& /* error */); - - ssize_t execute_resend( - const std::chrono::system_clock::time_point& /* now */, - std::chrono::system_clock::time_point& /* next resend */, - std::deque& /* buffers to resend */, - std::string& /* error */ - ); - private: - std::recursive_mutex entry_lock; - std::deque> entries; - - std::chrono::milliseconds resend_delay{500}; - - double average_response = 20; + std::unique_ptr> acknowledge_listener; }; - } + public: + AcknowledgeManager(); + virtual ~AcknowledgeManager(); + + size_t awaiting_acknowledge(); + void reset(); + + void process_packet(ts::protocol::BasicPacket& /* packet */); + bool process_acknowledge(uint8_t packet_type, const pipes::buffer_view& /* payload */, std::string& /* error */); + + ssize_t execute_resend( + const std::chrono::system_clock::time_point& /* now */, + std::chrono::system_clock::time_point& /* next resend */, + std::deque& /* buffers to resend */, + std::string& /* error */ + ); + private: + std::mutex entry_lock; + std::deque> entries; + + std::chrono::milliseconds resend_delay{500}; + + double average_response = 20; + }; } \ No newline at end of file