#pragma once #include #include #include #include #include #include #include "Packet.h" #include "../misc/queue.h" #include #include #ifndef NO_LOG #include #endif namespace ts { namespace buffer { struct RawBuffer { public: RawBuffer() : RawBuffer(0) {} RawBuffer(size_t length) : index(0), length(length) { if(length > 0) buffer = (char *) malloc(length); else buffer = nullptr; this->length = length; this->index = 0; } RawBuffer(const RawBuffer &other) : RawBuffer(other.length) { if(other.length > 0) memcpy(this->buffer, other.buffer, this->length); this->index = other.index; } virtual ~RawBuffer() { if(buffer) free(buffer); this->buffer = nullptr; } void slice(size_t length) { char *oldBuff = this->buffer; this->buffer = (char *) malloc(length); memcpy(this->buffer, oldBuff, length); this->length = length; free(oldBuff); } char *buffer = nullptr; size_t length = 0; size_t index = 0; TAILQ_ENTRY(ts::buffer::RawBuffer) tail; }; template struct SortedBufferQueue { SortedBufferQueue(ts::protocol::PacketTypeInfo type, bool ignoreOrder) : _type(std::move(type)), ignoreOrder(ignoreOrder) { this->current.index = 0; } SortedBufferQueue(const SortedBufferQueue &ref) = delete; SortedBufferQueue(SortedBufferQueue&&ref) = delete; ~SortedBufferQueue() = default; ts::protocol::PacketTypeInfo type() { return this->_type; } void skipPacket(){ threads::MutexLock lock(this->lock); this->current.index++; } size_t available(){ threads::MutexLock lock(this->lock); if(this->ignoreOrder) return this->packets.size(); uint16_t index = 0; while(true) { if(!this->find_packet(this->current.index + index)) return index; else index++; } } std::shared_ptr find_packet(uint32_t pktId){ pktId &= 0xFFFF; threads::MutexLock lock(this->lock); for(const auto& elm : this->packets) if(elm->packetId() == pktId) return elm; return nullptr; } std::shared_ptr peekNext(uint32_t index) { threads::MutexLock lock(this->lock); if(this->ignoreOrder) { if(this->packets.size() > index) return this->packets[index]; else return nullptr; } return this->find_packet(this->current.index + index); } void pop_packets(int32_t count = -1) { if(count == -1) count = 1; threads::MutexLock lock(this->lock); if(this->ignoreOrder) { while(count-- > 0 && !this->packets.empty()) this->packets.pop_front(); return; } auto until = this->current.index + count; while(this->current.index < until) { for(auto it = this->packets.begin(); it != this->packets.end(); it++) { if((*it)->packetId() == this->current.packet_id) { this->packets.erase(it); break; } } this->current.index++; } } bool push_pack(const std::shared_ptr& pkt){ threads::MutexLock lock(this->lock); if(this->ignoreOrder) { this->packets.push_back(pkt); if(this->current.packet_id > pkt->packetId()) { if(this->current.packet_id > 0xFF00 && pkt->packetId() < 0xFF) { this->current.packet_id = pkt->packetId(); this->current.generation++; } } else this->current.packet_id = pkt->packetId(); return true; } if(this->current.packet_id > pkt->packetId()) { if(this->current.packet_id < 0xFF00 || pkt->packetId() > 0xFF) { #ifndef NO_LOG debugMessage(0, "Invalid packed pushpack! Current index {} (generation {}) Packet index {}", this->current.packet_id, this->current.generation, pkt->packetId()); #endif return false; } } this->packets.push_back(pkt); return true; } void reset(){ this->current.index = 0; } std::unique_lock try_lock_queue() { threads::MutexTryLock lock(this->lock); if(!lock) return {}; return std::unique_lock(this->lock); } std::unique_lock try_lock_execute() { threads::MutexTryLock lock(this->execute_lock); if(!lock) return {}; return std::unique_lock(this->execute_lock); } uint16_t current_packet_id() { return this->current.packet_id; } uint16_t current_generation_id() { return this->current.generation; } uint16_t calculate_generation(uint16_t packetId) { if(packetId >= this->current.packet_id) return this->current.generation; if(packetId < 0xFF && this->current.packet_id > 0xFF00) return this->current.generation + 1; return this->current.generation; } union PacketPair { uint32_t index; struct { uint16_t packet_id; uint16_t generation; }; }; PacketPair current{0}; private: ts::protocol::PacketTypeInfo _type; bool ignoreOrder = false; std::deque> packets{}; std::recursive_mutex lock; std::recursive_mutex execute_lock; }; struct size { enum value : uint8_t { unset, min, Bytes_512 = min, Bytes_1024, Bytes_1536, max }; static inline size_t byte_length(value size) { switch (size) { case Bytes_512: return 512; case Bytes_1024: return 1024; case Bytes_1536: return 1536; default: return 0; } } }; //typedef std::unique_ptr buffer_t; typedef pipes::buffer buffer_t; extern buffer_t allocate_buffer(size::value /* size */); inline buffer_t allocate_buffer(size_t length) { pipes::buffer result; if(length <= 512) result = allocate_buffer(size::Bytes_512); else if(length <= 1024) result = allocate_buffer(size::Bytes_1024); else if(length <= 1536) result = allocate_buffer(size::Bytes_1536); else { return pipes::buffer{length}; } result.resize(length); return result; } struct cleaninfo { size_t bytes_freed_internal; size_t bytes_freed_buffer; }; struct cleanmode { enum value { CHUNKS = 0x01, BLOCKS = 0x02, CHUNKS_BLOCKS = 0x03 }; }; extern cleaninfo cleanup_buffers(cleanmode::value /* mode */); struct meminfo { size_t bytes_buffer = 0; size_t bytes_buffer_used = 0; size_t bytes_internal = 0; size_t nodes = 0; size_t nodes_full = 0; }; extern meminfo buffer_memory(); } }