A lot of updates (Speed improvement)

This commit is contained in:
WolverinDEV 2020-04-24 22:04:04 +02:00
parent 2725c57f2e
commit 5842bbe067
9 changed files with 243 additions and 48 deletions

View File

@ -12,7 +12,7 @@
#include <shared_mutex>
#include <cassert>
#include <cstring> /* for memset */
#include "./misc/spin_lock.h"
#include "misc/spin_mutex.h"
#include "Definitions.h"
#include "Variable.h"
#include "spdlog/fmt/ostr.h" // must be included
@ -894,7 +894,7 @@ namespace ts {
bool requires_db_save = false;
ts_always_inline void trigger_db_update() { this->requires_db_save = true; }
spin_lock block_use_count_lock{};
spin_mutex block_use_count_lock{};
int16_t block_use_count[BULK_COUNT];
PermissionContainerBulk<PERMISSIONS_BULK_ENTRY_COUNT>* block_containers[BULK_COUNT];

View File

@ -91,7 +91,7 @@ bool Properties::register_property_type(ts::property::PropertyType type, size_t
for(int index = 0; index < bundle->length; index++) {
auto& property = bundle->properties[index];
property.value.~string();
property.value_lock.~spin_lock();
property.value_lock.~spin_mutex();
property.casted_value.~any();
}
::free(bundle);
@ -104,7 +104,7 @@ bool Properties::register_property_type(ts::property::PropertyType type, size_t
auto& property = ptr->properties[index];
new (&property.casted_value) any();
new (&property.value_lock) spin_lock();
new (&property.value_lock) spin_mutex();
new (&property.value) string();
property.description = &property::describe(type, index);
property.flag_modified = false;

View File

@ -15,7 +15,7 @@
#include <any>
#include <array>
#include "misc/spin_lock.h"
#include "misc/spin_mutex.h"
#include "converters/converter.h"
#ifdef NDEBUG
@ -684,7 +684,7 @@ namespace ts {
class Properties;
struct PropertyData {
spin_lock value_lock;
spin_mutex value_lock;
std::any casted_value;
std::string value;
const property::PropertyDescription* description;

View File

@ -9,7 +9,7 @@
#define always_inline inline __attribute__((__always_inline__))
#endif
class spin_lock {
class spin_mutex {
std::atomic_bool locked{false};
public:
always_inline void lock() {

View File

@ -33,22 +33,21 @@ size_t AcknowledgeManager::awaiting_acknowledge() {
return this->entries.size();
}
void AcknowledgeManager::process_packet(ts::protocol::BasicPacket &packet) {
if(!packet.type().requireAcknowledge()) return;
void AcknowledgeManager::process_packet(uint8_t type, uint32_t id, void *ptr, std::unique_ptr<threads::Future<bool>> ack) {
std::shared_ptr<Entry> entry{new Entry{}, [&](Entry* entry){
this->destroy_packet(entry->packet_ptr);
delete entry;
}};
entry->acknowledge_listener = std::move(ack);
auto entry = make_shared<Entry>();
entry->acknowledge_listener = std::move(packet.getListener());
entry->buffer = packet.buffer();
entry->packet_type = type;
entry->packet_full_id = id;
entry->packet_ptr = ptr;
entry->resend_count = 0;
entry->first_send = system_clock::now();
entry->next_resend = entry->first_send + std::chrono::milliseconds{(int64_t) ceil(this->rto)};
entry->packet_type = packet.type().type();
entry->packet_id = packet.packetId();
entry->generation_id = packet.generationId();
entry->acknowledged = false;
entry->send_count = 1;
{
@ -65,7 +64,7 @@ bool AcknowledgeManager::process_acknowledge(uint8_t packet_type, uint16_t targe
{
std::lock_guard lock{this->entry_lock};
for(auto it = this->entries.begin(); it != this->entries.end(); it++) {
if((*it)->packet_type == target_type && (*it)->packet_id == target_id) {
if((*it)->packet_type == target_type && (*it)->packet_full_id == target_id) {
entry = *it;
ack_listener = std::move(entry->acknowledge_listener); /* move it out so nobody else could call it as well */
@ -110,7 +109,7 @@ ssize_t AcknowledgeManager::execute_resend(const system_clock::time_point& now ,
if(entry->next_resend <= now) {
entry->next_resend = now + std::chrono::milliseconds{(int64_t) std::min(ceil(this->rto), 1500.f)};
need_resend.push_back(entry);
entry->resend_count++;
//entry->resend_count++; /* this MUST be incremented by the result handler (resend may fails) */
entry->send_count++;
}
if(next_resend > entry->next_resend)
@ -126,7 +125,7 @@ ssize_t AcknowledgeManager::execute_resend(const system_clock::time_point& now ,
for(const auto& packet : need_resend) {
if(packet->resend_count > 15 && packet->first_send + seconds(15) < now) { //FIXME configurable
error = "Failed to receive acknowledge for packet " + to_string(packet->packet_id) + " of type " + PacketTypeInfo::fromid(packet->packet_type).name();
error = "Failed to receive acknowledge for packet " + to_string(packet->packet_full_id) + " of type " + PacketTypeInfo::fromid(packet->packet_type).name();
return -1;
}

View File

@ -6,22 +6,22 @@
#define DEBUG_ACKNOWLEDGE
namespace ts::connection {
class VoiceClientConnection;
class AcknowledgeManager {
public:
struct Entry {
uint16_t packet_id{0};
uint16_t generation_id{0};
uint32_t packet_full_id{0};
uint8_t packet_type{0xFF};
uint8_t resend_count{0};
bool acknowledged : 1;
uint8_t send_count : 7;
pipes::buffer buffer;
std::chrono::system_clock::time_point first_send;
std::chrono::system_clock::time_point next_resend;
std::unique_ptr<threads::Future<bool>> acknowledge_listener;
void* packet_ptr;
};
AcknowledgeManager();
@ -30,8 +30,8 @@ namespace ts::connection {
size_t awaiting_acknowledge();
void reset();
void process_packet(ts::protocol::BasicPacket& /* packet */);
bool process_acknowledge(uint8_t packet_type, uint16_t /* packet id */, std::string& /* error */);
void process_packet(uint8_t /* packet type */, uint32_t /* full packet id */, void* /* packet ptr */, std::unique_ptr<threads::Future<bool>> /* ack listener */);
bool process_acknowledge(uint8_t /* packet type */, uint16_t /* packet id */, std::string& /* error */);
ssize_t execute_resend(
const std::chrono::system_clock::time_point& /* now */,
@ -43,6 +43,8 @@ namespace ts::connection {
[[nodiscard]] inline auto current_rto() const { return this->rto; }
[[nodiscard]] inline auto current_srtt() const { return this->srtt; }
[[nodiscard]] inline auto current_rttvar() const { return this->rttvar; }
void(*destroy_packet)(void* /* packet */);
private:
std::mutex entry_lock;
std::deque<std::shared_ptr<Entry>> entries;

View File

@ -5,6 +5,7 @@
#include <cstring>
#include <iostream>
#include <bitset>
#include <src/misc/spin_mutex.h>
#include "Packet.h"
#include "buffers.h"
#include "misc/endianness.h"
@ -244,4 +245,139 @@ namespace ts {
uint8_t ServerPacketParser::type() const { return (uint8_t) this->_buffer[ClientPacketParser::kHeaderOffset + 2] & 0xFU; }
uint8_t ServerPacketParser::flags() const { return (uint8_t) this->_buffer[ClientPacketParser::kHeaderOffset + 2] & 0xF0U; }
}
void construct_osp(protocol::OutgoingServerPacket* packet) {
new (&packet->ref_count) std::atomic<uint16_t>{};
}
void deconstruct_osp(protocol::OutgoingServerPacket* packet) {
packet->ref_count.~atomic<uint16_t>();
}
void reset_osp(protocol::OutgoingServerPacket* packet, size_t payload_size) {
packet->next = nullptr;
packet->payload_size = payload_size;
packet->generation = 0;
}
#if 1
#define BUKKIT_ENTRY_SIZE (1650)
#define BUKKIT_MAX_ENTRIES (3000)
struct OSPBukkitEntry {
bool extra_allocated;
OSPBukkitEntry* next;
};
spin_mutex osp_mutex{};
size_t sdp_count{0};
OSPBukkitEntry* osp_head{nullptr};
OSPBukkitEntry** osp_tail{&osp_head};
protocol::OutgoingServerPacket* osp_from_bosp(OSPBukkitEntry* bops) {
return reinterpret_cast<protocol::OutgoingServerPacket*>((char*) bops + sizeof(OSPBukkitEntry));
}
OSPBukkitEntry* bosp_from_osp(protocol::OutgoingServerPacket* ops) {
return reinterpret_cast<OSPBukkitEntry*>((char*) ops - sizeof(OSPBukkitEntry));
}
void destroy_bosp(OSPBukkitEntry* entry) {
deconstruct_osp(osp_from_bosp(entry));
::free(entry);
}
OSPBukkitEntry* construct_bosp(size_t payload_size) {
auto base_size = sizeof(OSPBukkitEntry) + sizeof(protocol::OutgoingServerPacket) - 1;
auto full_size = base_size + payload_size;
auto bentry = (OSPBukkitEntry*) malloc(full_size);
bentry->next = nullptr;
bentry->extra_allocated = false;
construct_osp(osp_from_bosp(bentry));
return bentry;
}
void protocol::OutgoingServerPacket::object_freed() {
auto bentry = (OSPBukkitEntry*) bosp_from_osp(this);
if(bentry->extra_allocated) {
destroy_bosp(bentry);
return;
}
std::unique_lock block{osp_mutex};
if(sdp_count >= BUKKIT_MAX_ENTRIES) {
block.unlock();
destroy_bosp(bentry);
return;
}
assert(!bentry->next);
*osp_tail = bentry;
osp_tail = &bentry->next;
sdp_count++;
}
protocol::OutgoingServerPacket* protocol::allocate_outgoing_packet(size_t payload_size) {
if(BUKKIT_ENTRY_SIZE > payload_size) {
std::lock_guard block{osp_mutex};
if(osp_head) {
assert(sdp_count > 0);
sdp_count--;
auto entry = osp_head;
if(osp_head->next) {
assert(osp_tail != &osp_head->next);
osp_head = osp_head->next;
} else {
assert(osp_tail == &osp_head->next);
osp_head = nullptr;
osp_tail = &osp_head;
}
entry->next = nullptr;
auto result = osp_from_bosp(entry);
reset_osp(result, payload_size);
result->ref_count++;
return result;
} else if(sdp_count < BUKKIT_MAX_ENTRIES) {
auto entry = construct_bosp(BUKKIT_ENTRY_SIZE);
entry->extra_allocated = false;
auto result = osp_from_bosp(entry);
reset_osp(result, payload_size);
result->ref_count++;
return result;
}
}
auto entry = construct_bosp(payload_size);
entry->extra_allocated = true;
auto result = osp_from_bosp(entry);
reset_osp(result, payload_size);
result->ref_count++;
return result;
}
#else
void protocol::OutgoingServerPacket::object_freed() {
//TODO: Bukkit list?
deconstruct_osp(this);
::free(this);
}
protocol::OutgoingServerPacket* protocol::allocate_outgoing_packet(size_t payload_size) {
auto base_size = sizeof(protocol::OutgoingServerPacket) - 1;
auto full_size = base_size + payload_size;
auto result = (protocol::OutgoingServerPacket*) malloc(full_size);
construct_osp(result);
reset_osp(result, payload_size);
result->ref_count++;
return result;
}
#endif
}

View File

@ -75,37 +75,37 @@ namespace ts {
bool owns_data = false;
};
struct PacketIdManagerData {
PacketIdManagerData(){
memset(this->packetCounter, 0, sizeof(uint32_t) * 16);
}
uint32_t packetCounter[16]{};
};
class PacketIdManager {
public:
PacketIdManager() : data(new PacketIdManagerData){}
PacketIdManager() {
this->reset();
}
~PacketIdManager() = default;
PacketIdManager(const PacketIdManager& ref) : data(ref.data) {}
PacketIdManager(PacketIdManager&& ref) : data(std::move(ref.data)) {}
PacketIdManager(const PacketIdManager& ref) = delete;
PacketIdManager(PacketIdManager&& ref) = delete;
uint16_t nextPacketId(const PacketTypeInfo &type){
return static_cast<uint16_t>(data->packetCounter[type.type()]++ & 0xFFFF);
[[nodiscard]] uint16_t nextPacketId(const PacketTypeInfo &type){
return static_cast<uint16_t>(this->packetCounter[type.type()]++ & 0xFFFF);
}
uint16_t currentPacketId(const PacketTypeInfo &type){
return static_cast<uint16_t>(data->packetCounter[type.type()] & 0xFFFF);
[[nodiscard]] uint16_t currentPacketId(const PacketTypeInfo &type){
return static_cast<uint16_t>(this->packetCounter[type.type()] & 0xFFFF);
}
uint16_t generationId(const PacketTypeInfo &type){
return static_cast<uint16_t>((data->packetCounter[type.type()] >> 16) & 0xFFFF);
[[nodiscard]] uint16_t generationId(const PacketTypeInfo &type){
return static_cast<uint16_t>((this->packetCounter[type.type()] >> 16) & 0xFFFF);
}
[[nodiscard]] uint32_t generate_full_id(const PacketType& type) {
return this->packetCounter[type]++;
}
void reset() {
memset(&data->packetCounter[0], 0, sizeof(uint32_t) * 16);
memset(&this->packetCounter[0], 0, sizeof(uint32_t) * 16);
}
private:
std::shared_ptr<PacketIdManagerData> data;
uint32_t packetCounter[16]{};
};
namespace PacketFlag {
@ -383,12 +383,16 @@ namespace ts {
void setPacketId(uint16_t, uint16_t) override;
};
class ServerPacketParser : public PacketParser {
class ServerPacketP {
public:
constexpr static auto kHeaderOffset = 8;
constexpr static auto kHeaderLength = SERVER_HEADER_SIZE;
constexpr static auto kPayloadOffset = kHeaderOffset + SERVER_HEADER_SIZE;
};
class ServerPacketParser : public PacketParser, public ServerPacketP {
public:
explicit ServerPacketParser(pipes::buffer_view buffer) : PacketParser{std::move(buffer)} {}
ServerPacketParser(const ServerPacketParser&) = delete;
@ -404,5 +408,59 @@ namespace ts {
[[nodiscard]] uint8_t type() const override;
[[nodiscard]] uint8_t flags() const override;
};
struct OutgoingServerPacket {
public:
/* general info */
std::atomic<uint16_t> ref_count;
size_t payload_size;
OutgoingServerPacket* next; /* used within the write/process queue */
uint16_t generation;
/* actual packet data */
uint8_t mac[8];
uint8_t packet_id_bytes[2];
uint8_t type_and_flags;
uint8_t payload[1]; /* variable size */
[[nodiscard]] inline const void* packet_data() const {
return this->mac;
}
[[nodiscard]] inline size_t packet_length() const {
return this->payload_size + (8 + 2 + 1);
}
inline auto ref() {
auto count = ++ref_count;
assert(count > 1);
return count;
}
inline void unref() {
if(--this->ref_count == 0)
this->object_freed();
}
/* some helper methods */
inline void set_packet_id(uint16_t id) {
this->packet_id_bytes[0] = id >> 8U;
this->packet_id_bytes[1] = id & 0xFFU;
}
[[nodiscard]] inline auto packet_id() const {
return (uint16_t) (this->packet_id_bytes[0] << 8U) | this->packet_id_bytes[1];
}
[[nodiscard]] inline auto packet_type() const {
return (PacketType) (this->type_and_flags & 0xF);
}
private:
void object_freed();
};
/* This will allocate a new outgoing packet. To delete just unref the packet! */
OutgoingServerPacket* allocate_outgoing_packet(size_t /* payload size */);
}
}

View File

@ -6,7 +6,7 @@
#include <condition_variable>
#include "sql/SqlQuery.h"
#include "../../misc/spin_lock.h"
#include "misc/spin_mutex.h"
#if defined(HAVE_MYSQL_MYSQL_H)
#include <mysql/mysql.h>
@ -31,7 +31,7 @@ namespace sql::mysql {
struct Connection {
MYSQL* handle = nullptr;
spin_lock used_lock;
spin_mutex used_lock;
bool used = false;
~Connection();