From 5842bbe0676cb06c3947943fb5dd422aff0bef16 Mon Sep 17 00:00:00 2001 From: WolverinDEV Date: Fri, 24 Apr 2020 22:04:04 +0200 Subject: [PATCH] A lot of updates (Speed improvement) --- src/PermissionManager.h | 4 +- src/Properties.cpp | 4 +- src/Properties.h | 4 +- src/misc/{spin_lock.h => spin_mutex.h} | 2 +- src/protocol/AcknowledgeManager.cpp | 25 +++-- src/protocol/AcknowledgeManager.h | 16 +-- src/protocol/Packet.cpp | 136 +++++++++++++++++++++++++ src/protocol/Packet.h | 96 +++++++++++++---- src/sql/mysql/MySQL.h | 4 +- 9 files changed, 243 insertions(+), 48 deletions(-) rename src/misc/{spin_lock.h => spin_mutex.h} (96%) diff --git a/src/PermissionManager.h b/src/PermissionManager.h index af66e84..8fa1257 100644 --- a/src/PermissionManager.h +++ b/src/PermissionManager.h @@ -12,7 +12,7 @@ #include #include #include /* for memset */ -#include "./misc/spin_lock.h" +#include "misc/spin_mutex.h" #include "Definitions.h" #include "Variable.h" #include "spdlog/fmt/ostr.h" // must be included @@ -894,7 +894,7 @@ namespace ts { bool requires_db_save = false; ts_always_inline void trigger_db_update() { this->requires_db_save = true; } - spin_lock block_use_count_lock{}; + spin_mutex block_use_count_lock{}; int16_t block_use_count[BULK_COUNT]; PermissionContainerBulk* block_containers[BULK_COUNT]; diff --git a/src/Properties.cpp b/src/Properties.cpp index fe3f919..364818b 100644 --- a/src/Properties.cpp +++ b/src/Properties.cpp @@ -91,7 +91,7 @@ bool Properties::register_property_type(ts::property::PropertyType type, size_t for(int index = 0; index < bundle->length; index++) { auto& property = bundle->properties[index]; property.value.~string(); - property.value_lock.~spin_lock(); + property.value_lock.~spin_mutex(); property.casted_value.~any(); } ::free(bundle); @@ -104,7 +104,7 @@ bool Properties::register_property_type(ts::property::PropertyType type, size_t auto& property = ptr->properties[index]; new (&property.casted_value) any(); - new (&property.value_lock) spin_lock(); + new (&property.value_lock) spin_mutex(); new (&property.value) string(); property.description = &property::describe(type, index); property.flag_modified = false; diff --git a/src/Properties.h b/src/Properties.h index 897d878..85052b7 100644 --- a/src/Properties.h +++ b/src/Properties.h @@ -15,7 +15,7 @@ #include #include -#include "misc/spin_lock.h" +#include "misc/spin_mutex.h" #include "converters/converter.h" #ifdef NDEBUG @@ -684,7 +684,7 @@ namespace ts { class Properties; struct PropertyData { - spin_lock value_lock; + spin_mutex value_lock; std::any casted_value; std::string value; const property::PropertyDescription* description; diff --git a/src/misc/spin_lock.h b/src/misc/spin_mutex.h similarity index 96% rename from src/misc/spin_lock.h rename to src/misc/spin_mutex.h index b33ea9d..8fe5db6 100644 --- a/src/misc/spin_lock.h +++ b/src/misc/spin_mutex.h @@ -9,7 +9,7 @@ #define always_inline inline __attribute__((__always_inline__)) #endif -class spin_lock { +class spin_mutex { std::atomic_bool locked{false}; public: always_inline void lock() { diff --git a/src/protocol/AcknowledgeManager.cpp b/src/protocol/AcknowledgeManager.cpp index 7dfd683..249ffae 100644 --- a/src/protocol/AcknowledgeManager.cpp +++ b/src/protocol/AcknowledgeManager.cpp @@ -33,22 +33,21 @@ size_t AcknowledgeManager::awaiting_acknowledge() { return this->entries.size(); } -void AcknowledgeManager::process_packet(ts::protocol::BasicPacket &packet) { - if(!packet.type().requireAcknowledge()) return; +void AcknowledgeManager::process_packet(uint8_t type, uint32_t id, void *ptr, std::unique_ptr> ack) { + std::shared_ptr entry{new Entry{}, [&](Entry* entry){ + this->destroy_packet(entry->packet_ptr); + delete entry; + }}; + entry->acknowledge_listener = std::move(ack); - auto entry = make_shared(); - entry->acknowledge_listener = std::move(packet.getListener()); - - entry->buffer = packet.buffer(); + entry->packet_type = type; + entry->packet_full_id = id; + entry->packet_ptr = ptr; entry->resend_count = 0; entry->first_send = system_clock::now(); entry->next_resend = entry->first_send + std::chrono::milliseconds{(int64_t) ceil(this->rto)}; - entry->packet_type = packet.type().type(); - entry->packet_id = packet.packetId(); - entry->generation_id = packet.generationId(); - entry->acknowledged = false; entry->send_count = 1; { @@ -65,7 +64,7 @@ bool AcknowledgeManager::process_acknowledge(uint8_t packet_type, uint16_t targe { std::lock_guard lock{this->entry_lock}; for(auto it = this->entries.begin(); it != this->entries.end(); it++) { - if((*it)->packet_type == target_type && (*it)->packet_id == target_id) { + if((*it)->packet_type == target_type && (*it)->packet_full_id == target_id) { entry = *it; ack_listener = std::move(entry->acknowledge_listener); /* move it out so nobody else could call it as well */ @@ -110,7 +109,7 @@ ssize_t AcknowledgeManager::execute_resend(const system_clock::time_point& now , if(entry->next_resend <= now) { entry->next_resend = now + std::chrono::milliseconds{(int64_t) std::min(ceil(this->rto), 1500.f)}; need_resend.push_back(entry); - entry->resend_count++; + //entry->resend_count++; /* this MUST be incremented by the result handler (resend may fails) */ entry->send_count++; } if(next_resend > entry->next_resend) @@ -126,7 +125,7 @@ ssize_t AcknowledgeManager::execute_resend(const system_clock::time_point& now , for(const auto& packet : need_resend) { if(packet->resend_count > 15 && packet->first_send + seconds(15) < now) { //FIXME configurable - error = "Failed to receive acknowledge for packet " + to_string(packet->packet_id) + " of type " + PacketTypeInfo::fromid(packet->packet_type).name(); + error = "Failed to receive acknowledge for packet " + to_string(packet->packet_full_id) + " of type " + PacketTypeInfo::fromid(packet->packet_type).name(); return -1; } diff --git a/src/protocol/AcknowledgeManager.h b/src/protocol/AcknowledgeManager.h index 81d6416..28a892d 100644 --- a/src/protocol/AcknowledgeManager.h +++ b/src/protocol/AcknowledgeManager.h @@ -6,22 +6,22 @@ #define DEBUG_ACKNOWLEDGE namespace ts::connection { class VoiceClientConnection; + class AcknowledgeManager { public: struct Entry { - uint16_t packet_id{0}; - uint16_t generation_id{0}; - + uint32_t packet_full_id{0}; uint8_t packet_type{0xFF}; + uint8_t resend_count{0}; bool acknowledged : 1; uint8_t send_count : 7; - - pipes::buffer buffer; std::chrono::system_clock::time_point first_send; std::chrono::system_clock::time_point next_resend; std::unique_ptr> acknowledge_listener; + + void* packet_ptr; }; AcknowledgeManager(); @@ -30,8 +30,8 @@ namespace ts::connection { size_t awaiting_acknowledge(); void reset(); - void process_packet(ts::protocol::BasicPacket& /* packet */); - bool process_acknowledge(uint8_t packet_type, uint16_t /* packet id */, std::string& /* error */); + void process_packet(uint8_t /* packet type */, uint32_t /* full packet id */, void* /* packet ptr */, std::unique_ptr> /* ack listener */); + bool process_acknowledge(uint8_t /* packet type */, uint16_t /* packet id */, std::string& /* error */); ssize_t execute_resend( const std::chrono::system_clock::time_point& /* now */, @@ -43,6 +43,8 @@ namespace ts::connection { [[nodiscard]] inline auto current_rto() const { return this->rto; } [[nodiscard]] inline auto current_srtt() const { return this->srtt; } [[nodiscard]] inline auto current_rttvar() const { return this->rttvar; } + + void(*destroy_packet)(void* /* packet */); private: std::mutex entry_lock; std::deque> entries; diff --git a/src/protocol/Packet.cpp b/src/protocol/Packet.cpp index efeb9b9..af78482 100644 --- a/src/protocol/Packet.cpp +++ b/src/protocol/Packet.cpp @@ -5,6 +5,7 @@ #include #include #include +#include #include "Packet.h" #include "buffers.h" #include "misc/endianness.h" @@ -244,4 +245,139 @@ namespace ts { uint8_t ServerPacketParser::type() const { return (uint8_t) this->_buffer[ClientPacketParser::kHeaderOffset + 2] & 0xFU; } uint8_t ServerPacketParser::flags() const { return (uint8_t) this->_buffer[ClientPacketParser::kHeaderOffset + 2] & 0xF0U; } } + + void construct_osp(protocol::OutgoingServerPacket* packet) { + new (&packet->ref_count) std::atomic{}; + } + + void deconstruct_osp(protocol::OutgoingServerPacket* packet) { + packet->ref_count.~atomic(); + } + + void reset_osp(protocol::OutgoingServerPacket* packet, size_t payload_size) { + packet->next = nullptr; + packet->payload_size = payload_size; + + packet->generation = 0; + } + +#if 1 + #define BUKKIT_ENTRY_SIZE (1650) + #define BUKKIT_MAX_ENTRIES (3000) + + struct OSPBukkitEntry { + bool extra_allocated; + OSPBukkitEntry* next; + }; + + spin_mutex osp_mutex{}; + size_t sdp_count{0}; + OSPBukkitEntry* osp_head{nullptr}; + OSPBukkitEntry** osp_tail{&osp_head}; + + protocol::OutgoingServerPacket* osp_from_bosp(OSPBukkitEntry* bops) { + return reinterpret_cast((char*) bops + sizeof(OSPBukkitEntry)); + } + + OSPBukkitEntry* bosp_from_osp(protocol::OutgoingServerPacket* ops) { + return reinterpret_cast((char*) ops - sizeof(OSPBukkitEntry)); + } + + void destroy_bosp(OSPBukkitEntry* entry) { + deconstruct_osp(osp_from_bosp(entry)); + ::free(entry); + } + + OSPBukkitEntry* construct_bosp(size_t payload_size) { + auto base_size = sizeof(OSPBukkitEntry) + sizeof(protocol::OutgoingServerPacket) - 1; + auto full_size = base_size + payload_size; + auto bentry = (OSPBukkitEntry*) malloc(full_size); + + bentry->next = nullptr; + bentry->extra_allocated = false; + + construct_osp(osp_from_bosp(bentry)); + return bentry; + } + + void protocol::OutgoingServerPacket::object_freed() { + auto bentry = (OSPBukkitEntry*) bosp_from_osp(this); + if(bentry->extra_allocated) { + destroy_bosp(bentry); + return; + } + + std::unique_lock block{osp_mutex}; + if(sdp_count >= BUKKIT_MAX_ENTRIES) { + block.unlock(); + destroy_bosp(bentry); + return; + } + + assert(!bentry->next); + *osp_tail = bentry; + osp_tail = &bentry->next; + sdp_count++; + } + + protocol::OutgoingServerPacket* protocol::allocate_outgoing_packet(size_t payload_size) { + if(BUKKIT_ENTRY_SIZE > payload_size) { + std::lock_guard block{osp_mutex}; + if(osp_head) { + assert(sdp_count > 0); + sdp_count--; + auto entry = osp_head; + if(osp_head->next) { + assert(osp_tail != &osp_head->next); + osp_head = osp_head->next; + } else { + assert(osp_tail == &osp_head->next); + osp_head = nullptr; + osp_tail = &osp_head; + } + + entry->next = nullptr; + + auto result = osp_from_bosp(entry); + reset_osp(result, payload_size); + result->ref_count++; + return result; + } else if(sdp_count < BUKKIT_MAX_ENTRIES) { + auto entry = construct_bosp(BUKKIT_ENTRY_SIZE); + entry->extra_allocated = false; + + auto result = osp_from_bosp(entry); + reset_osp(result, payload_size); + result->ref_count++; + return result; + } + } + + auto entry = construct_bosp(payload_size); + entry->extra_allocated = true; + + auto result = osp_from_bosp(entry); + reset_osp(result, payload_size); + result->ref_count++; + return result; + } +#else + void protocol::OutgoingServerPacket::object_freed() { + //TODO: Bukkit list? + deconstruct_osp(this); + ::free(this); + } + + protocol::OutgoingServerPacket* protocol::allocate_outgoing_packet(size_t payload_size) { + auto base_size = sizeof(protocol::OutgoingServerPacket) - 1; + auto full_size = base_size + payload_size; + auto result = (protocol::OutgoingServerPacket*) malloc(full_size); + + construct_osp(result); + reset_osp(result, payload_size); + result->ref_count++; + + return result; + } +#endif } \ No newline at end of file diff --git a/src/protocol/Packet.h b/src/protocol/Packet.h index 97beac5..dce57ff 100644 --- a/src/protocol/Packet.h +++ b/src/protocol/Packet.h @@ -75,37 +75,37 @@ namespace ts { bool owns_data = false; }; - struct PacketIdManagerData { - PacketIdManagerData(){ - memset(this->packetCounter, 0, sizeof(uint32_t) * 16); - } - uint32_t packetCounter[16]{}; - }; - class PacketIdManager { public: - PacketIdManager() : data(new PacketIdManagerData){} + PacketIdManager() { + this->reset(); + } + ~PacketIdManager() = default; - PacketIdManager(const PacketIdManager& ref) : data(ref.data) {} - PacketIdManager(PacketIdManager&& ref) : data(std::move(ref.data)) {} + PacketIdManager(const PacketIdManager& ref) = delete; + PacketIdManager(PacketIdManager&& ref) = delete; - uint16_t nextPacketId(const PacketTypeInfo &type){ - return static_cast(data->packetCounter[type.type()]++ & 0xFFFF); + [[nodiscard]] uint16_t nextPacketId(const PacketTypeInfo &type){ + return static_cast(this->packetCounter[type.type()]++ & 0xFFFF); } - uint16_t currentPacketId(const PacketTypeInfo &type){ - return static_cast(data->packetCounter[type.type()] & 0xFFFF); + [[nodiscard]] uint16_t currentPacketId(const PacketTypeInfo &type){ + return static_cast(this->packetCounter[type.type()] & 0xFFFF); } - uint16_t generationId(const PacketTypeInfo &type){ - return static_cast((data->packetCounter[type.type()] >> 16) & 0xFFFF); + [[nodiscard]] uint16_t generationId(const PacketTypeInfo &type){ + return static_cast((this->packetCounter[type.type()] >> 16) & 0xFFFF); + } + + [[nodiscard]] uint32_t generate_full_id(const PacketType& type) { + return this->packetCounter[type]++; } void reset() { - memset(&data->packetCounter[0], 0, sizeof(uint32_t) * 16); + memset(&this->packetCounter[0], 0, sizeof(uint32_t) * 16); } private: - std::shared_ptr data; + uint32_t packetCounter[16]{}; }; namespace PacketFlag { @@ -383,12 +383,16 @@ namespace ts { void setPacketId(uint16_t, uint16_t) override; }; - class ServerPacketParser : public PacketParser { + class ServerPacketP { public: constexpr static auto kHeaderOffset = 8; constexpr static auto kHeaderLength = SERVER_HEADER_SIZE; constexpr static auto kPayloadOffset = kHeaderOffset + SERVER_HEADER_SIZE; + }; + + class ServerPacketParser : public PacketParser, public ServerPacketP { + public: explicit ServerPacketParser(pipes::buffer_view buffer) : PacketParser{std::move(buffer)} {} ServerPacketParser(const ServerPacketParser&) = delete; @@ -404,5 +408,59 @@ namespace ts { [[nodiscard]] uint8_t type() const override; [[nodiscard]] uint8_t flags() const override; }; + + struct OutgoingServerPacket { + public: + /* general info */ + std::atomic ref_count; + size_t payload_size; + + OutgoingServerPacket* next; /* used within the write/process queue */ + uint16_t generation; + + /* actual packet data */ + uint8_t mac[8]; + uint8_t packet_id_bytes[2]; + uint8_t type_and_flags; + uint8_t payload[1]; /* variable size */ + + [[nodiscard]] inline const void* packet_data() const { + return this->mac; + } + + [[nodiscard]] inline size_t packet_length() const { + return this->payload_size + (8 + 2 + 1); + } + + inline auto ref() { + auto count = ++ref_count; + assert(count > 1); + return count; + } + + inline void unref() { + if(--this->ref_count == 0) + this->object_freed(); + } + + /* some helper methods */ + inline void set_packet_id(uint16_t id) { + this->packet_id_bytes[0] = id >> 8U; + this->packet_id_bytes[1] = id & 0xFFU; + } + + [[nodiscard]] inline auto packet_id() const { + return (uint16_t) (this->packet_id_bytes[0] << 8U) | this->packet_id_bytes[1]; + } + + [[nodiscard]] inline auto packet_type() const { + return (PacketType) (this->type_and_flags & 0xF); + } + private: + void object_freed(); + }; + + /* This will allocate a new outgoing packet. To delete just unref the packet! */ + OutgoingServerPacket* allocate_outgoing_packet(size_t /* payload size */); } } \ No newline at end of file diff --git a/src/sql/mysql/MySQL.h b/src/sql/mysql/MySQL.h index d3071ce..327e235 100644 --- a/src/sql/mysql/MySQL.h +++ b/src/sql/mysql/MySQL.h @@ -6,7 +6,7 @@ #include #include "sql/SqlQuery.h" -#include "../../misc/spin_lock.h" +#include "misc/spin_mutex.h" #if defined(HAVE_MYSQL_MYSQL_H) #include @@ -31,7 +31,7 @@ namespace sql::mysql { struct Connection { MYSQL* handle = nullptr; - spin_lock used_lock; + spin_mutex used_lock; bool used = false; ~Connection();