2019-06-26 22:11:22 +02:00
# pragma once
# include <chrono>
# include <memory>
# include <list>
# include <cstring>
# include <ThreadPool/Mutex.h>
# include <sstream>
# include "Packet.h"
# include "../misc/queue.h"
# include <cassert>
# include <utility>
# ifndef NO_LOG
# include <log/LogUtils.h>
# endif
namespace ts {
namespace buffer {
/**
 * Byte buffer backed by malloc()/free() with a cursor (`index`) and a TAILQ link (`tail`).
 *
 * The destructor releases the storage. Copy construction and copy assignment create an
 * independent deep copy of the bytes and of `index`; the list linkage (`tail`) is not copied.
 */
struct RawBuffer {
    public:
        /** Creates an empty buffer: no allocation, `buffer == nullptr`, `length == 0`. */
        RawBuffer() : RawBuffer(0) { }

        /**
         * Allocates `length` uninitialized bytes (nothing is allocated for `length == 0`).
         * Members are initialized in declaration order (buffer, length, index).
         */
        RawBuffer(size_t length)
                : buffer(length > 0 ? (char*) malloc(length) : nullptr), length(length), index(0) { }

        /** Deep copies the bytes and the cursor of `other`. */
        RawBuffer(const RawBuffer& other) : RawBuffer(other.length) {
            /* guard against a failed allocation on either side before touching the memory */
            if(other.length > 0 && this->buffer && other.buffer)
                memcpy(this->buffer, other.buffer, this->length);
            this->index = other.index;
        }

        /**
         * Deep copy assignment.
         * Without it the implicitly generated operator would copy the raw pointer, which leaks
         * the previous allocation and makes both objects free() the same block.
         */
        RawBuffer& operator=(const RawBuffer& other) {
            if(this == &other) return *this;

            char* copy = nullptr;
            if(other.length > 0 && other.buffer) {
                copy = (char*) malloc(other.length);
                if(copy) memcpy(copy, other.buffer, other.length);
            }

            free(this->buffer); /* free(nullptr) is a no-op */
            this->buffer = copy;
            this->length = other.length;
            this->index = other.index;
            return *this;
        }

        virtual ~RawBuffer() {
            if(buffer)
                free(buffer);
            this->buffer = nullptr;
        }

        /**
         * Resizes the buffer to `length` bytes, keeping the leading bytes that fit.
         *
         * Only min(old length, new length) bytes are copied, so growing the buffer never reads
         * past the end of the old allocation; bytes beyond that are uninitialized.
         * `length == 0` releases the storage. `index` is left untouched.
         * If the allocation fails the buffer stays unchanged.
         */
        void slice(size_t length) {
            char* resized = nullptr;
            if(length > 0) {
                resized = (char*) malloc(length);
                if(!resized) return;
            }

            const size_t preserved = length < this->length ? length : this->length;
            if(resized && this->buffer && preserved > 0)
                memcpy(resized, this->buffer, preserved);

            free(this->buffer);
            this->buffer = resized;
            this->length = length;
        }

        char* buffer = nullptr; /* owned storage, may be nullptr */
        size_t length = 0;      /* size of `buffer` in bytes */
        size_t index = 0;       /* cursor into `buffer`; maintained by the users of this struct */

        TAILQ_ENTRY(ts::buffer::RawBuffer) tail;
};
template < typename PktType >
struct SortedBufferQueue {
SortedBufferQueue ( ts : : protocol : : PacketTypeInfo type , bool ignoreOrder ) : _type ( std : : move ( type ) ) , ignoreOrder ( ignoreOrder ) {
this - > current . index = 0 ;
}
SortedBufferQueue ( const SortedBufferQueue & ref ) = delete ;
SortedBufferQueue ( SortedBufferQueue & & ref ) = delete ;
~ SortedBufferQueue ( ) = default ;
2020-01-24 02:49:59 +01:00
ts : : protocol : : PacketTypeInfo type ( ) { return this - > _type ; }
2019-06-26 22:11:22 +02:00
void skipPacket ( ) {
threads : : MutexLock lock ( this - > lock ) ;
this - > current . index + + ;
}
size_t available ( ) {
threads : : MutexLock lock ( this - > lock ) ;
if ( this - > ignoreOrder ) return this - > packets . size ( ) ;
uint16_t index = 0 ;
while ( true ) {
if ( ! this - > find_packet ( this - > current . index + index ) )
return index ;
else
index + + ;
}
}
std : : shared_ptr < PktType > find_packet ( uint32_t pktId ) {
pktId & = 0xFFFF ;
threads : : MutexLock lock ( this - > lock ) ;
for ( const auto & elm : this - > packets )
if ( elm - > packetId ( ) = = pktId )
return elm ;
return nullptr ;
}
2020-01-24 02:49:59 +01:00
std : : shared_ptr < PktType > peekNext ( uint32_t index ) {
threads : : MutexLock lock ( this - > lock ) ;
if ( this - > ignoreOrder ) {
if ( this - > packets . size ( ) > index )
return this - > packets [ index ] ;
else
return nullptr ;
}
return this - > find_packet ( this - > current . index + index ) ;
}
void pop_packets ( int32_t count = - 1 ) {
if ( count = = - 1 ) count = 1 ;
threads : : MutexLock lock ( this - > lock ) ;
if ( this - > ignoreOrder ) {
while ( count - - > 0 & & ! this - > packets . empty ( ) ) this - > packets . pop_front ( ) ;
return ;
}
auto until = this - > current . index + count ;
while ( this - > current . index < until ) {
for ( auto it = this - > packets . begin ( ) ; it ! = this - > packets . end ( ) ; it + + ) {
if ( ( * it ) - > packetId ( ) = = this - > current . packet_id ) {
this - > packets . erase ( it ) ;
break ;
}
}
this - > current . index + + ;
}
}
2019-06-26 22:11:22 +02:00
bool push_pack ( const std : : shared_ptr < PktType > & pkt ) {
threads : : MutexLock lock ( this - > lock ) ;
if ( this - > ignoreOrder ) {
this - > packets . push_back ( pkt ) ;
2020-01-24 02:49:59 +01:00
if ( this - > current . packet_id > pkt - > packetId ( ) ) {
if ( this - > current . packet_id > 0xFF00 & & pkt - > packetId ( ) < 0xFF ) {
this - > current . packet_id = pkt - > packetId ( ) ;
this - > current . generation + + ;
}
} else this - > current . packet_id = pkt - > packetId ( ) ;
2019-06-26 22:11:22 +02:00
return true ;
}
if ( this - > current . packet_id > pkt - > packetId ( ) ) {
2020-01-24 02:49:59 +01:00
if ( this - > current . packet_id < 0xFF00 | | pkt - > packetId ( ) > 0xFF ) {
2019-06-26 22:11:22 +02:00
# ifndef NO_LOG
2020-01-24 02:49:59 +01:00
debugMessage ( 0 , " Invalid packed pushpack! Current index {} (generation {}) Packet index {} " , this - > current . packet_id , this - > current . generation , pkt - > packetId ( ) ) ;
2019-06-26 22:11:22 +02:00
# endif
2020-01-24 02:49:59 +01:00
return false ;
}
2019-06-26 22:11:22 +02:00
}
this - > packets . push_back ( pkt ) ;
return true ;
}
void reset ( ) {
this - > current . index = 0 ;
}
2020-01-24 02:49:59 +01:00
std : : unique_lock < std : : recursive_mutex > try_lock_queue ( ) {
threads : : MutexTryLock lock ( this - > lock ) ;
if ( ! lock ) return { } ;
return std : : unique_lock ( this - > lock ) ;
}
2019-06-26 22:11:22 +02:00
2020-01-24 02:49:59 +01:00
std : : unique_lock < std : : recursive_mutex > try_lock_execute ( ) {
threads : : MutexTryLock lock ( this - > execute_lock ) ;
if ( ! lock ) return { } ;
return std : : unique_lock ( this - > execute_lock ) ;
}
2019-06-26 22:11:22 +02:00
2020-01-24 02:49:59 +01:00
uint16_t current_packet_id ( ) { return this - > current . packet_id ; }
uint16_t current_generation_id ( ) { return this - > current . generation ; }
2019-06-26 22:11:22 +02:00
2020-01-24 02:49:59 +01:00
uint16_t calculate_generation ( uint16_t packetId ) {
if ( packetId > = this - > current . packet_id ) return this - > current . generation ;
2019-06-26 22:11:22 +02:00
2020-01-24 02:49:59 +01:00
if ( packetId < 0xFF & & this - > current . packet_id > 0xFF00 )
return this - > current . generation + 1 ;
2019-06-26 22:11:22 +02:00
2020-01-24 02:49:59 +01:00
return this - > current . generation ;
}
2019-06-26 22:11:22 +02:00
union PacketPair {
2020-01-24 02:49:59 +01:00
uint32_t index ;
struct {
uint16_t packet_id ;
uint16_t generation ;
} ;
} ;
2019-06-26 22:11:22 +02:00
PacketPair current { 0 } ;
private :
ts : : protocol : : PacketTypeInfo _type ;
bool ignoreOrder = false ;
std : : deque < std : : shared_ptr < PktType > > packets { } ;
std : : recursive_mutex lock ;
std : : recursive_mutex execute_lock ;
} ;
/**
 * Size classes accepted by allocate_buffer(size::value).
 *
 * `min` aliases the smallest real class (Bytes_512) and `max` is one past the largest one,
 * so the real classes are the values in [min, max). `unset` is the zero value.
 */
struct size {
    enum value : uint8_t {
        unset,
        min,
        Bytes_512 = min,
        Bytes_1024,
        Bytes_1536,
        max
    };

    /**
     * Byte capacity of a size class.
     * @return 512, 1024 or 1536 for the real classes and 0 for everything else (`unset`, `max`).
     */
    static constexpr size_t byte_length(value size) {
        switch (size) {
            case Bytes_512:
                return 512;
            case Bytes_1024:
                return 1024;
            case Bytes_1536:
                return 1536;
            default:
                return 0;
        }
    }
};
/*
 * Buffer type handed out by allocate_buffer(): currently a plain pipes::buffer.
 * Earlier variant, kept for reference:
 *   typedef std::unique_ptr<pipes::buffer, void(*)(pipes::buffer*)> buffer_t;
 */
using buffer_t = pipes::buffer;
/**
 * Allocates a buffer of the given size class.
 * Declared here only; the definition lives in another translation unit.
 */
extern buffer_t allocate_buffer(size::value /* size */);

/**
 * Allocates a buffer holding exactly `length` bytes.
 *
 * Requests of up to 1536 bytes are served by the smallest fitting size class and then resized
 * to `length`; larger requests construct a pipes::buffer of `length` directly.
 * The thresholds come from size::byte_length() so they cannot drift from the size classes.
 */
inline buffer_t allocate_buffer(size_t length) {
    size::value size_class = size::unset;
    if(length <= size::byte_length(size::Bytes_512))
        size_class = size::Bytes_512;
    else if(length <= size::byte_length(size::Bytes_1024))
        size_class = size::Bytes_1024;
    else if(length <= size::byte_length(size::Bytes_1536))
        size_class = size::Bytes_1536;

    /* too large for every size class */
    if(size_class == size::unset)
        return pipes::buffer{length};

    pipes::buffer result = allocate_buffer(size_class);
    result.resize(length);
    return result;
}
2020-01-24 02:49:59 +01:00
/**
 * Result of cleanup_buffers(): two byte counters.
 * Neither field has a default initializer.
 */
struct cleaninfo {
    std::size_t bytes_freed_internal; /* presumably bookkeeping memory released - confirm against the implementation */
    std::size_t bytes_freed_buffer;   /* presumably buffer payload memory released - confirm against the implementation */
};
/**
 * Bit flags accepted by cleanup_buffers().
 * CHUNKS_BLOCKS is the combination of both single flags.
 */
struct cleanmode {
    enum value {
        CHUNKS        = 1 << 0,
        BLOCKS        = 1 << 1,
        CHUNKS_BLOCKS = CHUNKS | BLOCKS
    };
};
/**
 * Runs a buffer cleanup for the given mode and reports the result as a cleaninfo.
 * Declared here only; the definition lives in another translation unit.
 * NOTE(review): what exactly CHUNKS and BLOCKS release is not visible from this header.
 */
extern cleaninfo cleanup_buffers(cleanmode::value mode);
/**
 * Counters returned by buffer_memory(). Every field starts at zero.
 * NOTE(review): the exact meaning of each counter is defined by the implementation, which is
 * not part of this header.
 */
struct meminfo {
    std::size_t bytes_buffer{0};
    std::size_t bytes_buffer_used{0};
    std::size_t bytes_internal{0};

    std::size_t nodes{0};
    std::size_t nodes_full{0};
};
/**
 * Returns the current buffer memory counters as a meminfo.
 * Declared here only; the definition lives in another translation unit.
 */
extern meminfo buffer_memory(void);
2019-06-26 22:11:22 +02:00
}
}