mirror of
https://github.com/cjcliffe/CubicSDR.git
synced 2026-06-27 22:14:26 -04:00
Make ReBuffer refcount management automatic, using std::shared_ptr:
- No longer need to call setRefCount() / decRefCount() - Restore old ThreadBlockingQueue using std::deque, for std::shared_ptr correct behaviour.
This commit is contained in:
@@ -3,7 +3,7 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <vector>
|
||||
#include <deque>
|
||||
#include <mutex>
|
||||
#include <thread>
|
||||
#include <cstdint>
|
||||
@@ -29,24 +29,22 @@ class ThreadQueueBase {
|
||||
template<typename T>
|
||||
class ThreadBlockingQueue : public ThreadQueueBase {
|
||||
|
||||
typedef typename std::vector<T>::value_type value_type;
|
||||
typedef typename std::vector<T>::size_type size_type;
|
||||
typedef typename std::deque<T>::value_type value_type;
|
||||
typedef typename std::deque<T>::size_type size_type;
|
||||
|
||||
public:
|
||||
|
||||
/*! Create safe blocking queue. */
|
||||
ThreadBlockingQueue() {
|
||||
//at least 1 (== Exchanger)
|
||||
m_circular_buffer.resize(MIN_ITEM_NB);
|
||||
m_max_num_items = MIN_ITEM_NB;
|
||||
};
|
||||
|
||||
//Copy constructor
|
||||
ThreadBlockingQueue(const ThreadBlockingQueue& sq) {
|
||||
std::lock_guard < std::mutex > lock(sq.m_mutex);
|
||||
m_circular_buffer = sq.m_circular_buffer;
|
||||
m_head = sq.m_head;
|
||||
m_tail = sq.m_tail;
|
||||
m_size = sq.m_size;
|
||||
m_queue = sq.m_queue;
|
||||
m_max_num_items = sq.m_max_num_items;
|
||||
}
|
||||
|
||||
/*! Destroy safe queue. */
|
||||
@@ -62,11 +60,10 @@ public:
|
||||
void set_max_num_items(unsigned int max_num_items) {
|
||||
std::lock_guard < std::mutex > lock(m_mutex);
|
||||
|
||||
if (max_num_items > (unsigned int)m_circular_buffer.size()) {
|
||||
if (max_num_items > m_max_num_items) {
|
||||
//Only raise the existing max size, never reduce it
|
||||
//for simplification sake at runtime.
|
||||
m_circular_buffer.resize(max_num_items);
|
||||
//m_head and m_tail stays valid for the new size.
|
||||
m_max_num_items = max_num_items;
|
||||
m_cond_not_full.notify_all();
|
||||
}
|
||||
}
|
||||
@@ -86,14 +83,14 @@ public:
|
||||
if (timeout == BLOCKING_INFINITE_TIMEOUT) {
|
||||
m_cond_not_full.wait(lock, [this]() // Lambda funct
|
||||
{
|
||||
return m_size < m_circular_buffer.size();
|
||||
return m_queue.size() < m_max_num_items;
|
||||
});
|
||||
} else if (timeout <= NON_BLOCKING_TIMEOUT && m_size >= m_circular_buffer.size()) {
|
||||
} else if (timeout <= NON_BLOCKING_TIMEOUT && m_queue.size() >= m_max_num_items) {
|
||||
// if the value is below a threshold, consider it is a try_push()
|
||||
return false;
|
||||
}
|
||||
else if (false == m_cond_not_full.wait_for(lock, std::chrono::microseconds(timeout),
|
||||
[this]() { return m_size < m_circular_buffer.size(); })) {
|
||||
[this]() { return m_queue.size() < m_max_num_items; })) {
|
||||
std::thread::id currentThreadId = std::this_thread::get_id();
|
||||
std::cout << "WARNING: Thread 0x" << std::hex << currentThreadId << std::dec <<
|
||||
" (" << currentThreadId << ") executing {" << typeid(*this).name() << "}.push() has failed with timeout > " <<
|
||||
@@ -101,11 +98,7 @@ public:
|
||||
return false;
|
||||
}
|
||||
|
||||
//m_tail is already the next valid place an item can be put
|
||||
m_circular_buffer[m_tail] = item;
|
||||
m_tail = nextIndex(m_tail, (int)m_circular_buffer.size());
|
||||
m_size++;
|
||||
|
||||
m_queue.push_back(item);
|
||||
m_cond_not_empty.notify_all();
|
||||
return true;
|
||||
}
|
||||
@@ -118,14 +111,11 @@ public:
|
||||
bool try_push(const value_type& item) {
|
||||
std::lock_guard < std::mutex > lock(m_mutex);
|
||||
|
||||
if (m_size >= m_circular_buffer.size()) {
|
||||
if (m_queue.size() >= m_max_num_items) {
|
||||
return false;
|
||||
}
|
||||
|
||||
//m_tail is already the next valid place an item can be put
|
||||
m_circular_buffer[m_tail] = item;
|
||||
m_tail = nextIndex(m_tail, (int)m_circular_buffer.size());
|
||||
m_size++;
|
||||
m_queue.push_back(item);
|
||||
m_cond_not_empty.notify_all();
|
||||
return true;
|
||||
}
|
||||
@@ -142,14 +132,14 @@ public:
|
||||
if (timeout == BLOCKING_INFINITE_TIMEOUT) {
|
||||
m_cond_not_empty.wait(lock, [this]() // Lambda funct
|
||||
{
|
||||
return m_size > 0;
|
||||
return !m_queue.empty();
|
||||
});
|
||||
} else if (timeout <= NON_BLOCKING_TIMEOUT && m_size == 0) {
|
||||
} else if (timeout <= NON_BLOCKING_TIMEOUT && m_queue.empty()) {
|
||||
// if the value is below a threshold, consider it is try_pop()
|
||||
return false;
|
||||
}
|
||||
else if (false == m_cond_not_empty.wait_for(lock, std::chrono::microseconds(timeout),
|
||||
[this]() { return m_size > 0; })) {
|
||||
[this]() { return !m_queue.empty(); })) {
|
||||
std::thread::id currentThreadId = std::this_thread::get_id();
|
||||
std::cout << "WARNING: Thread 0x" << std::hex << currentThreadId << std::dec <<
|
||||
" (" << currentThreadId << ") executing {" << typeid(*this).name() << "}.pop() has failed with timeout > " <<
|
||||
@@ -157,9 +147,8 @@ public:
|
||||
return false;
|
||||
}
|
||||
|
||||
item = m_circular_buffer[m_head];
|
||||
m_head = nextIndex(m_head, (int)m_circular_buffer.size());
|
||||
m_size--;
|
||||
item = m_queue.front();
|
||||
m_queue.pop_front();
|
||||
m_cond_not_full.notify_all();
|
||||
return true;
|
||||
}
|
||||
@@ -172,13 +161,12 @@ public:
|
||||
bool try_pop(value_type& item) {
|
||||
std::lock_guard < std::mutex > lock(m_mutex);
|
||||
|
||||
if (m_size == 0) {
|
||||
if (m_queue.empty()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
item = m_circular_buffer[m_head];
|
||||
m_head = nextIndex(m_head, (int)m_circular_buffer.size());
|
||||
m_size--;
|
||||
item = m_queue.front();
|
||||
m_queue.pop_front();
|
||||
m_cond_not_full.notify_all();
|
||||
return true;
|
||||
}
|
||||
@@ -190,7 +178,7 @@ public:
|
||||
*/
|
||||
size_type size() const {
|
||||
std::lock_guard < std::mutex > lock(m_mutex);
|
||||
return m_size;
|
||||
return m_queue.size();
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -199,7 +187,7 @@ public:
|
||||
*/
|
||||
bool empty() const {
|
||||
std::lock_guard < std::mutex > lock(m_mutex);
|
||||
return m_size == 0;
|
||||
return m_queue.empty();
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -208,7 +196,7 @@ public:
|
||||
*/
|
||||
bool full() const {
|
||||
std::lock_guard < std::mutex > lock(m_mutex);
|
||||
return (m_size >= m_circular_buffer.size());
|
||||
return (m_queue.size() >= m_max_num_items);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -216,9 +204,7 @@ public:
|
||||
*/
|
||||
void flush() {
|
||||
std::lock_guard < std::mutex > lock(m_mutex);
|
||||
m_head = 0;
|
||||
m_tail = 0;
|
||||
m_size = 0;
|
||||
m_queue.clear();
|
||||
m_cond_not_full.notify_all();
|
||||
}
|
||||
|
||||
@@ -230,25 +216,22 @@ public:
|
||||
if (this != &sq) {
|
||||
std::lock_guard < std::mutex > lock1(m_mutex);
|
||||
std::lock_guard < std::mutex > lock2(sq.m_mutex);
|
||||
m_circular_buffer.swap(sq.m_circular_buffer);
|
||||
m_queue.swap(sq.m_queue);
|
||||
std::swap(m_max_num_items, sq.m_max_num_items);
|
||||
|
||||
std::swap(m_head, sq.m_head);
|
||||
std::swap(m_tail, sq.m_tail);
|
||||
std::swap(m_size, sq.m_size);
|
||||
|
||||
if (m_size > 0) {
|
||||
if (!m_queue.empty()) {
|
||||
m_cond_not_empty.notify_all();
|
||||
}
|
||||
|
||||
if (sq.m_size > 0) {
|
||||
if (!sq.m_queue.empty()) {
|
||||
sq.m_cond_not_empty.notify_all();
|
||||
}
|
||||
|
||||
if (m_size < m_circular_buffer.size()) {
|
||||
if (!m_queue.full()) {
|
||||
m_cond_not_full.notify_all();
|
||||
}
|
||||
|
||||
if (sq.m_size < sq.m_circular_buffer.size()) {
|
||||
if (!sq.m_queue.full()) {
|
||||
sq.m_cond_not_full.notify_all();
|
||||
}
|
||||
}
|
||||
@@ -260,17 +243,14 @@ public:
|
||||
std::lock_guard < std::mutex > lock1(m_mutex);
|
||||
std::lock_guard < std::mutex > lock2(sq.m_mutex);
|
||||
|
||||
m_circular_buffer = sq.m_circular_buffer;
|
||||
m_queue = sq.m_queue;
|
||||
m_max_num_items = sq.m_max_num_items;
|
||||
|
||||
m_head = sq.m_head;
|
||||
m_tail = sq.m_tail;
|
||||
m_size = sq.m_size;
|
||||
|
||||
if (m_size > 0) {
|
||||
if (!m_queue.empty()) {
|
||||
m_cond_not_empty.notify_all();
|
||||
}
|
||||
|
||||
if (m_size < m_circular_buffer.size()) {
|
||||
if (!m_queue.full()) {
|
||||
m_cond_not_full.notify_all();
|
||||
}
|
||||
}
|
||||
@@ -278,27 +258,13 @@ public:
|
||||
}
|
||||
|
||||
private:
|
||||
/// use a circular buffer structure to prevent allocations / reallocations (fixed array + modulo)
|
||||
std::vector<T> m_circular_buffer;
|
||||
|
||||
/**
|
||||
* The 'head' index of the element at the head of the deque, 'tail'
|
||||
* the next (valid !) index at which an element can be pushed.
|
||||
* m_head == m_tail means empty.
|
||||
*/
|
||||
int m_head = 0, m_tail = 0;
|
||||
|
||||
//hold the current number of elements.
|
||||
size_type m_size = 0;
|
||||
|
||||
//
|
||||
inline int nextIndex(int index, int mod) const {
|
||||
return (index + 1 == mod) ? 0 : index + 1;
|
||||
}
|
||||
//TODO: use a circular buffer structure ? (fixed array + modulo)
|
||||
std::deque<T> m_queue;
|
||||
|
||||
mutable std::mutex m_mutex;
|
||||
std::condition_variable m_cond_not_empty;
|
||||
std::condition_variable m_cond_not_full;
|
||||
size_t m_max_num_items = MIN_ITEM_NB;
|
||||
};
|
||||
|
||||
/*! Swaps the contents of two ThreadBlockingQueue objects. (external operator) */
|
||||
|
||||
Reference in New Issue
Block a user