From 41ca39eab7a9c85211ae52647fb6910b2fb1c60c Mon Sep 17 00:00:00 2001 From: vsonnier Date: Wed, 1 Jun 2016 19:32:22 +0200 Subject: [PATCH 1/4] FIX audioVisOutputQueue nullptr crash --- src/demod/DemodulatorThread.cpp | 32 +++++++++++++++++++++++++------- src/demod/DemodulatorThread.h | 21 ++++++++++++--------- 2 files changed, 37 insertions(+), 16 deletions(-) diff --git a/src/demod/DemodulatorThread.cpp b/src/demod/DemodulatorThread.cpp index e7790d5..e8f4036 100644 --- a/src/demod/DemodulatorThread.cpp +++ b/src/demod/DemodulatorThread.cpp @@ -13,7 +13,9 @@ #include #endif -DemodulatorThread::DemodulatorThread(DemodulatorInstance *parent) : IOThread(), outputBuffers("DemodulatorThreadBuffers"), squelchLevel(-100), signalLevel(-100), squelchEnabled(false), cModem(nullptr), cModemKit(nullptr), iqInputQueue(NULL), audioOutputQueue(NULL), audioVisOutputQueue(NULL), threadQueueControl(NULL), threadQueueNotify(NULL) { +DemodulatorThread::DemodulatorThread(DemodulatorInstance *parent) + : IOThread(), outputBuffers("DemodulatorThreadBuffers"), squelchLevel(-100), + signalLevel(-100), squelchEnabled(false) { demodInstance = parent; muted.store(false); @@ -26,6 +28,10 @@ DemodulatorThread::~DemodulatorThread() { void DemodulatorThread::onBindOutput(std::string name, ThreadQueueBase *threadQueue) { if (name == "AudioVisualOutput") { + + //protects because it may be changed at runtime + std::lock_guard < std::mutex > lock(m_mutexAudioVisOutputQueue); + audioVisOutputQueue = (DemodulatorThreadOutputQueue *)threadQueue; } } @@ -119,7 +125,7 @@ void DemodulatorThread::run() { modemData.data.assign(inputData->begin(), inputData->end()); modemData.setRefCount(1); - AudioThreadInput *ati = NULL; + AudioThreadInput *ati = nullptr; ModemAnalog *modemAnalog = (cModem->getType() == "analog")?((ModemAnalog *)cModem):nullptr; ModemDigital *modemDigital = (cModem->getType() == "digital")?((ModemDigital *)cModem):nullptr; @@ -159,7 +165,7 @@ void DemodulatorThread::run() { } } - if (audioOutputQueue != NULL && ati && !squelched) { + if (audioOutputQueue != nullptr && ati && !squelched) { std::vector::iterator data_i; ati->peak = 0; for (data_i = ati->data.begin(); data_i != ati->data.end(); data_i++) { @@ -173,8 +179,17 @@ void DemodulatorThread::run() { ati = nullptr; } - if (ati && audioVisOutputQueue != NULL && audioVisOutputQueue->empty()) { + //At that point, capture the current state of audioVisOutputQueue in a local + //variable, and works with it with now on until the next while-turn. + DemodulatorThreadOutputQueue* localAudioVisOutputQueue = nullptr; + { + std::lock_guard < std::mutex > lock(m_mutexAudioVisOutputQueue); + localAudioVisOutputQueue = audioVisOutputQueue; + } + + if (ati && localAudioVisOutputQueue != nullptr && localAudioVisOutputQueue->empty()) { AudioThreadInput *ati_vis = audioVisBuffers.getBuffer(); + ati_vis->setRefCount(1); ati_vis->sampleRate = inp->sampleRate; ati_vis->inputRate = inp->sampleRate; @@ -230,11 +245,11 @@ void DemodulatorThread::run() { ati_vis->type = 0; } - audioVisOutputQueue->push(ati_vis); + localAudioVisOutputQueue->push(ati_vis); } - if (ati != NULL) { + if (ati != nullptr) { if (!muted.load() && (!wxGetApp().getSoloMode() || (demodInstance == wxGetApp().getDemodMgr().getLastActiveDemodulator()))) { audioOutputQueue->push(ati); } else { @@ -266,7 +281,10 @@ void DemodulatorThread::run() { outputBuffers.purge(); - if (audioVisOutputQueue && !audioVisOutputQueue->empty()) { + //Guard the cleanup of audioVisOutputQueue properly. + std::lock_guard < std::mutex > lock(m_mutexAudioVisOutputQueue); + + if (audioVisOutputQueue != nullptr && !audioVisOutputQueue->empty()) { AudioThreadInput *dummy_vis; audioVisOutputQueue->pop(dummy_vis); } diff --git a/src/demod/DemodulatorThread.h b/src/demod/DemodulatorThread.h index b95f7ac..cf3393e 100644 --- a/src/demod/DemodulatorThread.h +++ b/src/demod/DemodulatorThread.h @@ -40,8 +40,8 @@ protected: float abMagnitude(double alpha, double beta, float inphase, float quadrature); float linearToDb(float linear); - DemodulatorInstance *demodInstance; - ReBuffer outputBuffers; + DemodulatorInstance *demodInstance = nullptr; + ReBuffer outputBuffers = nullptr; std::atomic_bool muted; @@ -49,12 +49,15 @@ protected: std::atomic signalLevel; bool squelchEnabled, squelchBreak; - Modem *cModem; - ModemKit *cModemKit; + Modem *cModem = nullptr; + ModemKit *cModemKit = nullptr; - DemodulatorThreadPostInputQueue* iqInputQueue; - AudioThreadInputQueue *audioOutputQueue; - DemodulatorThreadOutputQueue* audioVisOutputQueue; - DemodulatorThreadControlCommandQueue *threadQueueControl; - DemodulatorThreadCommandQueue* threadQueueNotify; + DemodulatorThreadPostInputQueue* iqInputQueue = nullptr; + AudioThreadInputQueue *audioOutputQueue = nullptr; + DemodulatorThreadOutputQueue* audioVisOutputQueue = nullptr; + DemodulatorThreadControlCommandQueue *threadQueueControl = nullptr; + DemodulatorThreadCommandQueue* threadQueueNotify = nullptr; + + //protects the audioVisOutputQueue dynamic binding change at runtime (in DemodulatorMgr) + mutable std::mutex m_mutexAudioVisOutputQueue; }; From 357dcc967b861a3dce27072957f2571de1206a5a Mon Sep 17 00:00:00 2001 From: vsonnier Date: Wed, 1 Jun 2016 19:42:11 +0200 Subject: [PATCH 2/4] MISC 1: make IOThread input and output queues returned as ThreadQueueBase*, not void*, cleaner. then use static_cast for downcasts, because we know what we are doing --- src/IOThread.cpp | 4 ++-- src/IOThread.h | 4 ++-- src/audio/AudioThread.cpp | 4 ++-- src/demod/DemodulatorPreThread.cpp | 6 +++--- src/demod/DemodulatorThread.cpp | 10 +++++----- src/demod/DemodulatorWorkerThread.cpp | 4 ++-- src/process/FFTVisualDataThread.cpp | 4 ++-- src/sdr/SDRPostThread.cpp | 8 ++++---- src/sdr/SoapySDRThread.cpp | 2 +- 9 files changed, 23 insertions(+), 23 deletions(-) diff --git a/src/IOThread.cpp b/src/IOThread.cpp index 81345df..d57f5ba 100644 --- a/src/IOThread.cpp +++ b/src/IOThread.cpp @@ -50,7 +50,7 @@ void IOThread::setInputQueue(std::string qname, ThreadQueueBase *threadQueue) { this->onBindInput(qname, threadQueue); }; -void *IOThread::getInputQueue(std::string qname) { +ThreadQueueBase *IOThread::getInputQueue(std::string qname) { return input_queues[qname]; }; @@ -59,7 +59,7 @@ void IOThread::setOutputQueue(std::string qname, ThreadQueueBase *threadQueue) { this->onBindOutput(qname, threadQueue); }; -void *IOThread::getOutputQueue(std::string qname) { +ThreadQueueBase *IOThread::getOutputQueue(std::string qname) { return output_queues[qname]; }; diff --git a/src/IOThread.h b/src/IOThread.h index f67b81e..fd8c1d1 100644 --- a/src/IOThread.h +++ b/src/IOThread.h @@ -115,9 +115,9 @@ public: virtual void onBindInput(std::string name, ThreadQueueBase* threadQueue); void setInputQueue(std::string qname, ThreadQueueBase *threadQueue); - void *getInputQueue(std::string qname); + ThreadQueueBase *getInputQueue(std::string qname); void setOutputQueue(std::string qname, ThreadQueueBase *threadQueue); - void *getOutputQueue(std::string qname); + ThreadQueueBase *getOutputQueue(std::string qname); protected: std::map input_queues; diff --git a/src/audio/AudioThread.cpp b/src/audio/AudioThread.cpp index 2a02a03..23507c4 100644 --- a/src/audio/AudioThread.cpp +++ b/src/audio/AudioThread.cpp @@ -379,8 +379,8 @@ void AudioThread::run() { std::cout << "Audio thread started." << std::endl; - inputQueue = (AudioThreadInputQueue *)getInputQueue("AudioDataInput"); - threadQueueNotify = (DemodulatorThreadCommandQueue*)getOutputQueue("NotifyQueue"); + inputQueue = static_cast(getInputQueue("AudioDataInput")); + threadQueueNotify = static_cast(getOutputQueue("NotifyQueue")); while (!terminated) { AudioThreadCommand command; diff --git a/src/demod/DemodulatorPreThread.cpp b/src/demod/DemodulatorPreThread.cpp index 19161b1..6b128ea 100644 --- a/src/demod/DemodulatorPreThread.cpp +++ b/src/demod/DemodulatorPreThread.cpp @@ -56,9 +56,9 @@ void DemodulatorPreThread::run() { ReBuffer buffers("DemodulatorPreThreadBuffers"); - iqInputQueue = (DemodulatorThreadInputQueue*)getInputQueue("IQDataInput"); - iqOutputQueue = (DemodulatorThreadPostInputQueue*)getOutputQueue("IQDataOutput"); - threadQueueNotify = (DemodulatorThreadCommandQueue*)getOutputQueue("NotifyQueue"); + iqInputQueue = static_cast(getInputQueue("IQDataInput")); + iqOutputQueue = static_cast(getOutputQueue("IQDataOutput")); + threadQueueNotify = static_cast(getOutputQueue("NotifyQueue")); std::vector in_buf_data; std::vector out_buf_data; diff --git a/src/demod/DemodulatorThread.cpp b/src/demod/DemodulatorThread.cpp index e8f4036..6d642dd 100644 --- a/src/demod/DemodulatorThread.cpp +++ b/src/demod/DemodulatorThread.cpp @@ -32,7 +32,7 @@ void DemodulatorThread::onBindOutput(std::string name, ThreadQueueBase *threadQu //protects because it may be changed at runtime std::lock_guard < std::mutex > lock(m_mutexAudioVisOutputQueue); - audioVisOutputQueue = (DemodulatorThreadOutputQueue *)threadQueue; + audioVisOutputQueue = static_cast(threadQueue); } } @@ -69,10 +69,10 @@ void DemodulatorThread::run() { std::cout << "Demodulator thread started.." << std::endl; - iqInputQueue = (DemodulatorThreadPostInputQueue*)getInputQueue("IQDataInput"); - audioOutputQueue = (AudioThreadInputQueue*)getOutputQueue("AudioDataOutput"); - threadQueueControl = (DemodulatorThreadControlCommandQueue *)getInputQueue("ControlQueue"); - threadQueueNotify = (DemodulatorThreadCommandQueue*)getOutputQueue("NotifyQueue"); + iqInputQueue = static_cast(getInputQueue("IQDataInput")); + audioOutputQueue = static_cast(getOutputQueue("AudioDataOutput")); + threadQueueControl = static_cast(getInputQueue("ControlQueue")); + threadQueueNotify = static_cast(getOutputQueue("NotifyQueue")); ModemIQData modemData; diff --git a/src/demod/DemodulatorWorkerThread.cpp b/src/demod/DemodulatorWorkerThread.cpp index dd43aaf..3b1a28d 100644 --- a/src/demod/DemodulatorWorkerThread.cpp +++ b/src/demod/DemodulatorWorkerThread.cpp @@ -14,8 +14,8 @@ void DemodulatorWorkerThread::run() { std::cout << "Demodulator worker thread started.." << std::endl; - commandQueue = (DemodulatorThreadWorkerCommandQueue *)getInputQueue("WorkerCommandQueue"); - resultQueue = (DemodulatorThreadWorkerResultQueue *)getOutputQueue("WorkerResultQueue"); + commandQueue = static_cast(getInputQueue("WorkerCommandQueue")); + resultQueue = static_cast(getOutputQueue("WorkerResultQueue")); while (!terminated) { bool filterChanged = false; diff --git a/src/process/FFTVisualDataThread.cpp b/src/process/FFTVisualDataThread.cpp index 88df9e7..86f7caf 100644 --- a/src/process/FFTVisualDataThread.cpp +++ b/src/process/FFTVisualDataThread.cpp @@ -24,8 +24,8 @@ SpectrumVisualProcessor *FFTVisualDataThread::getProcessor() { } void FFTVisualDataThread::run() { - DemodulatorThreadInputQueue *pipeIQDataIn = (DemodulatorThreadInputQueue *)getInputQueue("IQDataInput"); - SpectrumVisualDataQueue *pipeFFTDataOut = (SpectrumVisualDataQueue *)getOutputQueue("FFTDataOutput"); + DemodulatorThreadInputQueue *pipeIQDataIn = static_cast(getInputQueue("IQDataInput")); + SpectrumVisualDataQueue *pipeFFTDataOut = static_cast(getOutputQueue("FFTDataOutput")); fftQueue.set_max_num_items(100); pipeFFTDataOut->set_max_num_items(100); diff --git a/src/sdr/SDRPostThread.cpp b/src/sdr/SDRPostThread.cpp index fe6f62c..8ae854a 100644 --- a/src/sdr/SDRPostThread.cpp +++ b/src/sdr/SDRPostThread.cpp @@ -161,10 +161,10 @@ void SDRPostThread::run() { std::cout << "SDR post-processing thread started.." << std::endl; - iqDataInQueue = (SDRThreadIQDataQueue*)getInputQueue("IQDataInput"); - iqDataOutQueue = (DemodulatorThreadInputQueue*)getOutputQueue("IQDataOutput"); - iqVisualQueue = (DemodulatorThreadInputQueue*)getOutputQueue("IQVisualDataOutput"); - iqActiveDemodVisualQueue = (DemodulatorThreadInputQueue*)getOutputQueue("IQActiveDemodVisualDataOutput"); + iqDataInQueue = static_cast(getInputQueue("IQDataInput")); + iqDataOutQueue = static_cast(getOutputQueue("IQDataOutput")); + iqVisualQueue = static_cast(getOutputQueue("IQVisualDataOutput")); + iqActiveDemodVisualQueue = static_cast(getOutputQueue("IQActiveDemodVisualDataOutput")); iqDataInQueue->set_max_num_items(0); diff --git a/src/sdr/SoapySDRThread.cpp b/src/sdr/SoapySDRThread.cpp index 66c60ee..e3d121e 100644 --- a/src/sdr/SoapySDRThread.cpp +++ b/src/sdr/SoapySDRThread.cpp @@ -216,7 +216,7 @@ void SDRThread::readStream(SDRThreadIQDataQueue* iqDataOutQueue) { } void SDRThread::readLoop() { - SDRThreadIQDataQueue* iqDataOutQueue = (SDRThreadIQDataQueue*) getOutputQueue("IQDataOutput"); + SDRThreadIQDataQueue* iqDataOutQueue = static_cast( getOutputQueue("IQDataOutput")); if (iqDataOutQueue == NULL) { return; From fc4fa3e74f10d3160df4268812e2c851ad090d51 Mon Sep 17 00:00:00 2001 From: vsonnier Date: Wed, 1 Jun 2016 19:46:45 +0200 Subject: [PATCH 3/4] MISC 2: Make ReferenceCounter refcount itself protected by the same mutex of the whole class, for state consistency --- src/IOThread.h | 16 +++++++++++----- src/audio/AudioThread.h | 2 +- src/demod/DemodDefs.h | 2 +- src/modules/modem/Modem.h | 2 +- 4 files changed, 14 insertions(+), 8 deletions(-) diff --git a/src/IOThread.h b/src/IOThread.h index fd8c1d1..eb4e94d 100644 --- a/src/IOThread.h +++ b/src/IOThread.h @@ -21,21 +21,27 @@ struct map_string_less : public std::binary_function lock(m_mutex); + refCount = rc; } void decRefCount() { - refCount.store(refCount.load()-1); + std::lock_guard < std::recursive_mutex > lock(m_mutex); + refCount--; } int getRefCount() { - return refCount.load(); + std::lock_guard < std::recursive_mutex > lock(m_mutex); + return refCount; } protected: - std::atomic_int refCount; + //this is a basic mutex for all ReferenceCounter derivatives operations INCLUDING the counter itself for consistency ! + mutable std::recursive_mutex m_mutex; + +private: + int refCount; }; diff --git a/src/audio/AudioThread.h b/src/audio/AudioThread.h index 5712288..57193f5 100644 --- a/src/audio/AudioThread.h +++ b/src/audio/AudioThread.h @@ -28,7 +28,7 @@ public: } ~AudioThreadInput() { - std::lock_guard < std::mutex > lock(m_mutex); + std::lock_guard < std::recursive_mutex > lock(m_mutex); } }; diff --git a/src/demod/DemodDefs.h b/src/demod/DemodDefs.h index b7d78a3..8dc5163 100644 --- a/src/demod/DemodDefs.h +++ b/src/demod/DemodDefs.h @@ -90,7 +90,7 @@ public: } ~DemodulatorThreadPostIQData() { - std::lock_guard < std::mutex > lock(m_mutex); + std::lock_guard < std::recursive_mutex > lock(m_mutex); } }; diff --git a/src/modules/modem/Modem.h b/src/modules/modem/Modem.h index 0172ea5..5a1c74e 100644 --- a/src/modules/modem/Modem.h +++ b/src/modules/modem/Modem.h @@ -32,7 +32,7 @@ public: } ~ModemIQData() { - std::lock_guard < std::mutex > lock(m_mutex); + std::lock_guard < std::recursive_mutex > lock(m_mutex); } }; From c3d949ddba1e9c5ade93a629a722a782c60e94a0 Mon Sep 17 00:00:00 2001 From: vsonnier Date: Wed, 1 Jun 2016 19:51:01 +0200 Subject: [PATCH 4/4] MISC 3: Make the whole BufferType life-cycle and recycling properly guarded against concurrent access --- src/IOThread.h | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/src/IOThread.h b/src/IOThread.h index eb4e94d..9d9232b 100644 --- a/src/IOThread.h +++ b/src/IOThread.h @@ -20,6 +20,7 @@ struct map_string_less : public std::binary_function lock(m_mutex); + + BufferType* buf = nullptr; for (outputBuffersI = outputBuffers.begin(); outputBuffersI != outputBuffers.end(); outputBuffersI++) { - if (!buf && (*outputBuffersI)->getRefCount() <= 0) { + if (buf == nullptr && (*outputBuffersI)->getRefCount() <= 0) { buf = (*outputBuffersI); - (*outputBuffersI)->setRefCount(0); + buf->setRefCount(0); } else if ((*outputBuffersI)->getRefCount() <= 0) { (*outputBuffersI)->decRefCount(); } } - if (buf) { + if (buf != nullptr) { if (outputBuffers.back()->getRefCount() < -REBUFFER_GC_LIMIT) { BufferType *ref = outputBuffers.back(); outputBuffers.pop_back(); @@ -87,6 +90,7 @@ public: } void purge() { + std::lock_guard < std::mutex > lock(m_mutex); while (!outputBuffers.empty()) { BufferType *ref = outputBuffers.front(); outputBuffers.pop_front(); @@ -97,6 +101,7 @@ private: std::string bufferId; std::deque outputBuffers; typename std::deque::iterator outputBuffersI; + mutable std::mutex m_mutex; };