diff --git a/src/AppFrame.cpp b/src/AppFrame.cpp index beb8761..c07a95e 100644 --- a/src/AppFrame.cpp +++ b/src/AppFrame.cpp @@ -806,11 +806,11 @@ void AppFrame::OnMenu(wxCommandEvent& event) { #endif else if (event.GetId() == wxID_SDR_START_STOP) { if (!wxGetApp().getSDRThread()->isTerminated()) { - wxGetApp().stopDevice(true); + wxGetApp().stopDevice(true, 2000); } else { SDRDeviceInfo *dev = wxGetApp().getDevice(); if (dev != nullptr) { - wxGetApp().setDevice(dev); + wxGetApp().setDevice(dev, 0); } } } else if (event.GetId() == wxID_LOW_PERF) { @@ -1645,6 +1645,7 @@ bool AppFrame::loadSession(std::string fileName) { } wxGetApp().getDemodMgr().setActiveDemodulator(nullptr, false); + wxGetApp().getDemodMgr().terminateAll(); try { diff --git a/src/CubicSDR.cpp b/src/CubicSDR.cpp index c926d8f..a87d927 100644 --- a/src/CubicSDR.cpp +++ b/src/CubicSDR.cpp @@ -133,8 +133,8 @@ long long strToFrequency(std::string freqStr) { } -CubicSDR::CubicSDR() : appframe(NULL), m_glContext(NULL), frequency(0), offset(0), ppm(0), snap(1), sampleRate(DEFAULT_SAMPLE_RATE), - sdrThread(NULL), sdrPostThread(NULL), spectrumVisualThread(NULL), demodVisualThread(NULL), pipeSDRIQData(NULL), pipeIQVisualData(NULL), pipeAudioVisualData(NULL), t_SDR(NULL), t_PostSDR(NULL) { +CubicSDR::CubicSDR() : frequency(0), offset(0), ppm(0), snap(1), sampleRate(DEFAULT_SAMPLE_RATE),agcMode(false) + { sampleRateInitialized.store(false); agcMode.store(true); soloMode.store(false); @@ -254,6 +254,7 @@ bool CubicSDR::OnInit() { sdrPostThread = new SDRPostThread(); sdrPostThread->setInputQueue("IQDataInput", pipeSDRIQData); + sdrPostThread->setOutputQueue("IQVisualDataOutput", pipeIQVisualData); sdrPostThread->setOutputQueue("IQDataOutput", pipeWaterfallIQVisualData); sdrPostThread->setOutputQueue("IQActiveDemodVisualDataOutput", pipeDemodIQVisualData); @@ -289,22 +290,24 @@ int CubicSDR::OnExit() { stopRig(); } #endif - - demodMgr.terminateAll(); - + + //The thread feeding them all should be terminated first, so: std::cout << "Terminating SDR thread.." << std::endl; sdrThread->terminate(); - sdrThread->isTerminated(1000); + sdrThread->isTerminated(3000); if (t_SDR) { t_SDR->join(); delete t_SDR; t_SDR = nullptr; } - + std::cout << "Terminating SDR post-processing thread.." << std::endl; sdrPostThread->terminate(); - + + std::cout << "Terminating All Demodulators.." << std::endl; + demodMgr.terminateAll(); + std::cout << "Terminating Visual Processor threads.." << std::endl; spectrumVisualThread->terminate(); demodVisualThread->terminate(); @@ -542,16 +545,11 @@ void CubicSDR::setSampleRate(long long rate_in) { } } -void CubicSDR::stopDevice(bool store) { - if (store) { - stoppedDev = sdrThread->getDevice(); - } else { - stoppedDev = nullptr; - } - sdrThread->setDevice(nullptr); - +void CubicSDR::stopDevice(bool store, int waitMsForTermination) { + + //Firt we must stop the threads sdrThread->terminate(); - sdrThread->isTerminated(1000); + sdrThread->isTerminated(waitMsForTermination); if (t_SDR) { t_SDR->join(); @@ -559,6 +557,15 @@ void CubicSDR::stopDevice(bool store) { t_SDR = nullptr; } + //Only now we can nullify devices + if (store) { + stoppedDev = sdrThread->getDevice(); + } + else { + stoppedDev = nullptr; + } + + sdrThread->setDevice(nullptr); } void CubicSDR::reEnumerateDevices() { @@ -568,10 +575,10 @@ void CubicSDR::reEnumerateDevices() { t_SDREnum = new std::thread(&SDREnumerator::threadMain, sdrEnum); } -void CubicSDR::setDevice(SDRDeviceInfo *dev) { +void CubicSDR::setDevice(SDRDeviceInfo *dev, int waitMsForTermination) { sdrThread->terminate(); - sdrThread->isTerminated(1000); + sdrThread->isTerminated(waitMsForTermination); if (t_SDR) { t_SDR->join(); diff --git a/src/CubicSDR.h b/src/CubicSDR.h index 767ae4f..07b28cb 100644 --- a/src/CubicSDR.h +++ b/src/CubicSDR.h @@ -98,8 +98,8 @@ public: long long getSampleRate(); std::vector *getDevices(); - void setDevice(SDRDeviceInfo *dev); - void stopDevice(bool store); + void setDevice(SDRDeviceInfo *dev, int waitMsForTermination); + void stopDevice(bool store, int waitMsForTermination); SDRDeviceInfo * getDevice(); ScopeVisualProcessor *getScopeProcessor(); @@ -173,10 +173,10 @@ public: private: int FilterEvent(wxEvent& event); - AppFrame *appframe; + AppFrame *appframe = nullptr; AppConfig config; - PrimaryGLContext *m_glContext; - std::vector *devs; + PrimaryGLContext *m_glContext = nullptr; + std::vector *devs = nullptr; DemodulatorMgr demodMgr; @@ -186,27 +186,31 @@ private: std::atomic_llong sampleRate; std::atomic_bool agcMode; - SDRThread *sdrThread; - SDREnumerator *sdrEnum; - SDRPostThread *sdrPostThread; - SpectrumVisualDataThread *spectrumVisualThread; - SpectrumVisualDataThread *demodVisualThread; + SDRThread *sdrThread = nullptr; + SDREnumerator *sdrEnum = nullptr; + SDRPostThread *sdrPostThread = nullptr; + SpectrumVisualDataThread *spectrumVisualThread = nullptr; + SpectrumVisualDataThread *demodVisualThread = nullptr; - SDRThreadIQDataQueue* pipeSDRIQData; - DemodulatorThreadInputQueue* pipeIQVisualData; - DemodulatorThreadOutputQueue* pipeAudioVisualData; - DemodulatorThreadInputQueue* pipeDemodIQVisualData; - DemodulatorThreadInputQueue* pipeWaterfallIQVisualData; - DemodulatorThreadInputQueue* pipeActiveDemodIQVisualData; + SDRThreadIQDataQueue* pipeSDRIQData = nullptr; + DemodulatorThreadInputQueue* pipeIQVisualData = nullptr; + DemodulatorThreadOutputQueue* pipeAudioVisualData = nullptr; + DemodulatorThreadInputQueue* pipeDemodIQVisualData = nullptr; + DemodulatorThreadInputQueue* pipeWaterfallIQVisualData = nullptr; + DemodulatorThreadInputQueue* pipeActiveDemodIQVisualData = nullptr; ScopeVisualProcessor scopeProcessor; - SDRDevicesDialog *deviceSelectorDialog; + SDRDevicesDialog *deviceSelectorDialog = nullptr; SoapySDR::Kwargs streamArgs; SoapySDR::Kwargs settingArgs; - std::thread *t_SDR, *t_SDREnum, *t_PostSDR, *t_SpectrumVisual, *t_DemodVisual; + std::thread *t_SDR = nullptr; + std::thread *t_SDREnum = nullptr; + std::thread *t_PostSDR = nullptr; + std::thread *t_SpectrumVisual = nullptr; + std::thread *t_DemodVisual = nullptr; std::atomic_bool devicesReady; std::atomic_bool devicesFailed; std::atomic_bool deviceSelectorOpen; @@ -224,8 +228,8 @@ private: std::atomic_bool soloMode; SDRDeviceInfo *stoppedDev; #ifdef USE_HAMLIB - RigThread* rigThread; - std::thread *t_Rig; + RigThread* rigThread = nullptr; + std::thread *t_Rig = nullptr; #endif }; diff --git a/src/audio/AudioThread.cpp b/src/audio/AudioThread.cpp index 3910be5..f1c2038 100644 --- a/src/audio/AudioThread.cpp +++ b/src/audio/AudioThread.cpp @@ -6,44 +6,54 @@ #include "DemodulatorThread.h" #include "DemodulatorInstance.h" #include +#include + std::map AudioThread::deviceController; std::map AudioThread::deviceSampleRate; std::map AudioThread::deviceThread; AudioThread::AudioThread() : IOThread(), - currentInput(NULL), inputQueue(NULL), nBufferFrames(1024), sampleRate(0) { + currentInput(nullptr), inputQueue(nullptr), nBufferFrames(1024), sampleRate(0) { audioQueuePtr.store(0); underflowCount.store(0); active.store(false); outputDevice.store(-1); - gain.store(1.0); - - vBoundThreads = new std::vector; - boundThreads.store(vBoundThreads); + gain = 1.0; } AudioThread::~AudioThread() { - boundThreads.store(nullptr); - delete vBoundThreads; + +} + +std::recursive_mutex & AudioThread::getMutex() +{ + return m_mutex; } void AudioThread::bindThread(AudioThread *other) { - if (std::find(boundThreads.load()->begin(), boundThreads.load()->end(), other) == boundThreads.load()->end()) { - boundThreads.load()->push_back(other); + + std::lock_guard lock(m_mutex); + + if (std::find(boundThreads.begin(), boundThreads.end(), other) == boundThreads.end()) { + boundThreads.push_back(other); } } void AudioThread::removeThread(AudioThread *other) { + + std::lock_guard lock(m_mutex); + std::vector::iterator i; - i = std::find(boundThreads.load()->begin(), boundThreads.load()->end(), other); - if (i != boundThreads.load()->end()) { - boundThreads.load()->erase(i); + i = std::find(boundThreads.begin(), boundThreads.end(), other); + if (i != boundThreads.end()) { + boundThreads.erase(i); } } void AudioThread::deviceCleanup() { + std::map::iterator i; for (i = deviceController.begin(); i != deviceController.end(); i++) { @@ -53,53 +63,67 @@ void AudioThread::deviceCleanup() { static int audioCallback(void *outputBuffer, void * /* inputBuffer */, unsigned int nBufferFrames, double /* streamTime */, RtAudioStreamStatus status, void *userData) { - AudioThread *src = (AudioThread *) userData; - float *out = (float*) outputBuffer; + + float *out = (float*)outputBuffer; + + //Zero output buffer in all cases: this allow to mute audio if no AudioThread data is + //actually active. memset(out, 0, nBufferFrames * 2 * sizeof(float)); + AudioThread *src = (AudioThread *) userData; + + std::lock_guard lock(src->getMutex()); + if (src->isTerminated()) { return 1; } if (status) { - std::cout << "Audio buffer underflow.." << (src->underflowCount++) << std::endl; + std::cout << "Audio buffer underflow.." << (src->underflowCount++) << std::endl; } - if (src->boundThreads.load()->empty()) { - return 0; + if (src->boundThreads.empty()) { + return 0; } - float peak = 0.0; + + double peak = 0.0; + + //for all boundThreads + for (size_t j = 0; j < src->boundThreads.size(); j++) { + + AudioThread *srcmix = src->boundThreads[j]; + + //lock every single boundThread srcmix in succession the time we process + //its audio samples. + std::lock_guard lock(srcmix->getMutex()); - for (size_t j = 0; j < src->boundThreads.load()->size(); j++) { - AudioThread *srcmix = (*(src->boundThreads.load()))[j]; if (srcmix->isTerminated() || !srcmix->inputQueue || srcmix->inputQueue->empty() || !srcmix->isActive()) { continue; } if (!srcmix->currentInput) { srcmix->audioQueuePtr = 0; - if (srcmix->isTerminated() || srcmix->inputQueue->empty()) { - continue; - } - srcmix->inputQueue->pop(srcmix->currentInput); - if (srcmix->isTerminated()) { + + if (!srcmix->inputQueue->try_pop(srcmix->currentInput)) { continue; } + continue; } if (srcmix->currentInput->sampleRate != src->getSampleRate()) { - while (srcmix->inputQueue->size()) { - srcmix->inputQueue->pop(srcmix->currentInput); + + while (srcmix->inputQueue->try_pop(srcmix->currentInput)) { + if (srcmix->currentInput) { if (srcmix->currentInput->sampleRate == src->getSampleRate()) { break; } srcmix->currentInput->decRefCount(); } - srcmix->currentInput = NULL; - } + srcmix->currentInput = nullptr; + } //end while srcmix->audioQueuePtr = 0; @@ -114,37 +138,35 @@ static int audioCallback(void *outputBuffer, void * /* inputBuffer */, unsigned srcmix->audioQueuePtr = 0; if (srcmix->currentInput) { srcmix->currentInput->decRefCount(); - srcmix->currentInput = NULL; + srcmix->currentInput = nullptr; } - if (srcmix->isTerminated() || srcmix->inputQueue->empty()) { + + if (!srcmix->inputQueue->try_pop(srcmix->currentInput)) { continue; - } - srcmix->inputQueue->pop(srcmix->currentInput); - if (srcmix->isTerminated()) { - continue; - } + } } continue; } - float mixPeak = srcmix->currentInput->peak * srcmix->gain; + double mixPeak = srcmix->currentInput->peak * srcmix->gain; if (srcmix->currentInput->channels == 1) { + for (unsigned int i = 0; i < nBufferFrames; i++) { + if (srcmix->audioQueuePtr >= srcmix->currentInput->data.size()) { srcmix->audioQueuePtr = 0; if (srcmix->currentInput) { srcmix->currentInput->decRefCount(); - srcmix->currentInput = NULL; + srcmix->currentInput = nullptr; } - if (srcmix->isTerminated() || srcmix->inputQueue->empty()) { + + if (!srcmix->inputQueue->try_pop(srcmix->currentInput)) { break; } - srcmix->inputQueue->pop(srcmix->currentInput); - if (srcmix->isTerminated()) { - break; - } - float srcPeak = srcmix->currentInput->peak * srcmix->gain; + + + double srcPeak = srcmix->currentInput->peak * srcmix->gain; if (mixPeak < srcPeak) { mixPeak = srcPeak; } @@ -158,25 +180,25 @@ static int audioCallback(void *outputBuffer, void * /* inputBuffer */, unsigned } } else { for (int i = 0, iMax = srcmix->currentInput->channels * nBufferFrames; i < iMax; i++) { + if (srcmix->audioQueuePtr >= srcmix->currentInput->data.size()) { srcmix->audioQueuePtr = 0; if (srcmix->currentInput) { srcmix->currentInput->decRefCount(); - srcmix->currentInput = NULL; + srcmix->currentInput = nullptr; } - if (srcmix->isTerminated() || srcmix->inputQueue->empty()) { + + if (!srcmix->inputQueue->try_pop(srcmix->currentInput)) { break; } - srcmix->inputQueue->pop(srcmix->currentInput); - if (srcmix->isTerminated()) { - break; - } - float srcPeak = srcmix->currentInput->peak * srcmix->gain; + + double srcPeak = srcmix->currentInput->peak * srcmix->gain; if (mixPeak < srcPeak) { mixPeak = srcPeak; } } if (srcmix->currentInput && srcmix->currentInput->data.size()) { + out[i] = out[i] + srcmix->currentInput->data[srcmix->audioQueuePtr] * srcmix->gain; } srcmix->audioQueuePtr++; @@ -186,11 +208,15 @@ static int audioCallback(void *outputBuffer, void * /* inputBuffer */, unsigned peak += mixPeak; } + //normalize volume if (peak > 1.0) { + float invPeak = (float)(1.0 / peak); + for (unsigned int i = 0; i < nBufferFrames * 2; i++) { - out[i] /= peak; + out[i] *= invPeak; } } + return 0; } @@ -247,6 +273,8 @@ void AudioThread::enumerateDevices(std::vector &devs) { } void AudioThread::setDeviceSampleRate(int deviceId, int sampleRate) { + + if (deviceController.find(deviceId) != deviceController.end()) { AudioThreadCommand refreshDevice; refreshDevice.cmd = AudioThreadCommand::AUDIO_THREAD_CMD_SET_SAMPLE_RATE; @@ -256,14 +284,17 @@ void AudioThread::setDeviceSampleRate(int deviceId, int sampleRate) { } void AudioThread::setSampleRate(int sampleRate) { + + std::lock_guard lock(m_mutex); + if (deviceController[outputDevice.load()] == this) { deviceSampleRate[outputDevice.load()] = sampleRate; dac.stopStream(); dac.closeStream(); - for (size_t j = 0; j < boundThreads.load()->size(); j++) { - AudioThread *srcmix = (*(boundThreads.load()))[j]; + for (size_t j = 0; j < boundThreads.size(); j++) { + AudioThread *srcmix = boundThreads[j]; srcmix->setSampleRate(sampleRate); } @@ -286,10 +317,15 @@ void AudioThread::setSampleRate(int sampleRate) { } int AudioThread::getSampleRate() { + std::lock_guard lock(m_mutex); + return this->sampleRate; } void AudioThread::setupDevice(int deviceId) { + + std::lock_guard lock(m_mutex); + parameters.deviceId = deviceId; parameters.nChannels = 2; parameters.firstChannel = 0; @@ -323,6 +359,7 @@ void AudioThread::setupDevice(int deviceId) { deviceThread[parameters.deviceId] = new std::thread(&AudioThread::threadMain, deviceController[parameters.deviceId]); } else if (deviceController[parameters.deviceId] == this) { + //Attach callback dac.openStream(¶meters, NULL, RTAUDIO_FLOAT32, sampleRate, &nBufferFrames, &audioCallback, (void *) this, &opts); dac.startStream(); } else { @@ -340,6 +377,8 @@ void AudioThread::setupDevice(int deviceId) { } int AudioThread::getOutputDevice() { + std::lock_guard lock(m_mutex); + if (outputDevice == -1) { return dac.getDefaultOutputDevice(); } @@ -347,6 +386,9 @@ int AudioThread::getOutputDevice() { } void AudioThread::setInitOutputDevice(int deviceId, int sampleRate) { + + std::lock_guard lock(m_mutex); + outputDevice = deviceId; if (sampleRate == -1) { if (deviceSampleRate.find(parameters.deviceId) != deviceSampleRate.end()) { @@ -379,8 +421,10 @@ void AudioThread::run() { inputQueue = static_cast(getInputQueue("AudioDataInput")); + //Infinite loop, witing for commands or for termination while (!stopping) { AudioThreadCommand command; + cmdQueue.pop(command); if (command.cmd == AudioThreadCommand::AUDIO_THREAD_CMD_SET_DEVICE) { @@ -391,20 +435,26 @@ void AudioThread::run() { } } - // Drain any remaining inputs - if (inputQueue) while (!inputQueue->empty()) { - AudioThreadInput *ref; - inputQueue->pop(ref); + //Thread termination, prevent fancy things to happen, lock the whole thing: + //This way audioThreadCallback is rightly protected from thread termination + std::lock_guard lock(m_mutex); + + // Drain any remaining inputs, with a non-blocking pop + AudioThreadInput *ref; + while (inputQueue && inputQueue->try_pop(ref)) { + if (ref) { ref->decRefCount(); } - } + } //end while + //Nullify currentInput... if (currentInput) { currentInput->setRefCount(0); currentInput = nullptr; } + //Stop if (deviceController[parameters.deviceId] != this) { deviceController[parameters.deviceId]->removeThread(this); } else { @@ -430,10 +480,14 @@ void AudioThread::terminate() { } bool AudioThread::isActive() { + std::lock_guard lock(m_mutex); + return active; } void AudioThread::setActive(bool state) { + + std::lock_guard lock(m_mutex); AudioThreadInput *dummy; if (state && !active && inputQueue) { @@ -444,8 +498,9 @@ void AudioThread::setActive(bool state) { // Activity state changing, clear any inputs if(inputQueue) { - while (!inputQueue->empty()) { // flush queue - inputQueue->pop(dummy); + + while (inputQueue->try_pop(dummy)) { // flush queue, non-blocking pop + if (dummy) { dummy->decRefCount(); } @@ -459,6 +514,9 @@ AudioThreadCommandQueue *AudioThread::getCommandQueue() { } void AudioThread::setGain(float gain_in) { + + std::lock_guard lock(m_mutex); + if (gain < 0.0) { gain = 0.0; } @@ -469,5 +527,8 @@ void AudioThread::setGain(float gain_in) { } float AudioThread::getGain() { + + std::lock_guard lock(m_mutex); + return gain; } diff --git a/src/audio/AudioThread.h b/src/audio/AudioThread.h index aa664ac..65388c7 100644 --- a/src/audio/AudioThread.h +++ b/src/audio/AudioThread.h @@ -57,7 +57,7 @@ public: std::atomic_bool initialized; std::atomic_bool active; std::atomic_int outputDevice; - std::atomic gain; + float gain; AudioThread(); ~AudioThread(); @@ -88,7 +88,13 @@ private: AudioThreadCommandQueue cmdQueue; int sampleRate; + //The own m_mutex protecting this AudioThread, in particular boundThreads + std::recursive_mutex m_mutex; + public: + //give access to the this AudioThread lock + std::recursive_mutex& getMutex(); + void bindThread(AudioThread *other); void removeThread(AudioThread *other); @@ -97,7 +103,8 @@ public: static std::map deviceThread; static void deviceCleanup(); static void setDeviceSampleRate(int deviceId, int sampleRate); - std::atomic *> boundThreads; - std::vector *vBoundThreads; + + //protected by m_mutex + std::vector boundThreads; }; diff --git a/src/demod/DemodulatorInstance.cpp b/src/demod/DemodulatorInstance.cpp index 4852f9c..5b6c37f 100644 --- a/src/demod/DemodulatorInstance.cpp +++ b/src/demod/DemodulatorInstance.cpp @@ -152,7 +152,6 @@ bool DemodulatorInstance::isTerminated() { bool demodTerminated = demodulatorThread->isTerminated(); bool preDemodTerminated = demodulatorPreThread->isTerminated(); - //Cleanup the worker threads, if the threads are indeed terminated if (audioTerminated) { @@ -168,7 +167,6 @@ bool DemodulatorInstance::isTerminated() { if (demodTerminated) { if (t_Demod) { - #ifdef __APPLE__ pthread_join(t_Demod, nullptr); #else @@ -185,8 +183,8 @@ bool DemodulatorInstance::isTerminated() { } if (preDemodTerminated) { - - if (t_PreDemod) { + + if (t_PreDemod) { #ifdef __APPLE__ pthread_join(t_PreDemod, NULL); @@ -195,10 +193,9 @@ bool DemodulatorInstance::isTerminated() { delete t_PreDemod; #endif t_PreDemod = nullptr; - } + } } - bool terminated = audioTerminated && demodTerminated && preDemodTerminated; return terminated; diff --git a/src/demod/DemodulatorMgr.cpp b/src/demod/DemodulatorMgr.cpp index fd2abb9..61be65c 100644 --- a/src/demod/DemodulatorMgr.cpp +++ b/src/demod/DemodulatorMgr.cpp @@ -136,13 +136,13 @@ void DemodulatorMgr::deleteThread(DemodulatorInstance *demod) { i = std::find(demods.begin(), demods.end(), demod); if (activeDemodulator == demod) { - activeDemodulator = NULL; + activeDemodulator = nullptr; } if (lastActiveDemodulator == demod) { - lastActiveDemodulator = NULL; + lastActiveDemodulator = nullptr; } if (activeVisualDemodulator == demod) { - activeVisualDemodulator = NULL; + activeVisualDemodulator = nullptr; } if (i != demods.end()) { @@ -150,6 +150,7 @@ void DemodulatorMgr::deleteThread(DemodulatorInstance *demod) { } //Ask for termination + demod->setActive(false); demod->terminate(); //Do not cleanup immediatly @@ -200,27 +201,28 @@ bool DemodulatorMgr::anyDemodulatorsAt(long long freq, int bandwidth) { void DemodulatorMgr::setActiveDemodulator(DemodulatorInstance *demod, bool temporary) { - std::lock_guard < std::recursive_mutex > lock(demods_busy); + if (!temporary) { - if (activeDemodulator != NULL) { - lastActiveDemodulator = activeDemodulator; + if (activeDemodulator.load() != nullptr) { + lastActiveDemodulator = activeDemodulator.load(); updateLastState(); } else { lastActiveDemodulator = demod; } updateLastState(); #if USE_HAMLIB - if (wxGetApp().rigIsActive() && wxGetApp().getRigThread()->getFollowModem() && lastActiveDemodulator) { - wxGetApp().getRigThread()->setFrequency(lastActiveDemodulator->getFrequency(),true); + if (wxGetApp().rigIsActive() && wxGetApp().getRigThread()->getFollowModem() && lastActiveDemodulator.load()) { + wxGetApp().getRigThread()->setFrequency(lastActiveDemodulator.load()->getFrequency(),true); } #endif } else { + std::lock_guard < std::recursive_mutex > lock(demods_busy); garbageCollect(); ReBufferGC::garbageCollect(); } - if (activeVisualDemodulator) { - activeVisualDemodulator->setVisualOutputQueue(NULL); + if (activeVisualDemodulator.load()) { + activeVisualDemodulator.load()->setVisualOutputQueue(nullptr); } if (demod) { demod->setVisualOutputQueue(wxGetApp().getAudioVisualQueue()); @@ -238,7 +240,7 @@ void DemodulatorMgr::setActiveDemodulator(DemodulatorInstance *demod, bool tempo } DemodulatorInstance *DemodulatorMgr::getActiveDemodulator() { - if (activeDemodulator && !activeDemodulator->isActive()) { + if (activeDemodulator.load() && !activeDemodulator.load()->isActive()) { activeDemodulator = getLastActiveDemodulator(); } return activeDemodulator; @@ -262,8 +264,6 @@ void DemodulatorMgr::garbageCollect() { std::cout << "Garbage collected demodulator instance " << deleted->getLabel() << std::endl; delete deleted; - - return; } } @@ -273,27 +273,28 @@ void DemodulatorMgr::garbageCollect() { void DemodulatorMgr::updateLastState() { std::lock_guard < std::recursive_mutex > lock(demods_busy); + if (std::find(demods.begin(), demods.end(), lastActiveDemodulator) == demods.end()) { - if (activeDemodulator && activeDemodulator->isActive()) { - lastActiveDemodulator = activeDemodulator; - } else if (activeDemodulator && !activeDemodulator->isActive()){ - activeDemodulator = NULL; - lastActiveDemodulator = NULL; + if (activeDemodulator.load() && activeDemodulator.load()->isActive()) { + lastActiveDemodulator = activeDemodulator.load(); + } else if (activeDemodulator.load() && !activeDemodulator.load()->isActive()){ + activeDemodulator = nullptr; + lastActiveDemodulator = nullptr; } } - if (lastActiveDemodulator && !lastActiveDemodulator->isActive()) { - lastActiveDemodulator = NULL; + if (lastActiveDemodulator.load() && !lastActiveDemodulator.load()->isActive()) { + lastActiveDemodulator = nullptr; } - if (lastActiveDemodulator) { - lastBandwidth = lastActiveDemodulator->getBandwidth(); - lastDemodType = lastActiveDemodulator->getDemodulatorType(); - lastDemodLock = lastActiveDemodulator->getDemodulatorLock()?true:false; - lastSquelchEnabled = lastActiveDemodulator->isSquelchEnabled(); - lastSquelch = lastActiveDemodulator->getSquelchLevel(); - lastGain = lastActiveDemodulator->getGain(); - lastModemSettings[lastDemodType] = lastActiveDemodulator->readModemSettings(); + if (lastActiveDemodulator.load()) { + lastBandwidth = lastActiveDemodulator.load()->getBandwidth(); + lastDemodType = lastActiveDemodulator.load()->getDemodulatorType(); + lastDemodLock = lastActiveDemodulator.load()->getDemodulatorLock()?true:false; + lastSquelchEnabled = lastActiveDemodulator.load()->isSquelchEnabled(); + lastSquelch = lastActiveDemodulator.load()->getSquelchLevel(); + lastGain = lastActiveDemodulator.load()->getGain(); + lastModemSettings[lastDemodType] = lastActiveDemodulator.load()->readModemSettings(); } } diff --git a/src/demod/DemodulatorMgr.h b/src/demod/DemodulatorMgr.h index 09b72ce..a993f84 100644 --- a/src/demod/DemodulatorMgr.h +++ b/src/demod/DemodulatorMgr.h @@ -53,15 +53,17 @@ public: void setLastModemSettings(std::string, ModemSettings); void updateLastState(); - + private: + void garbageCollect(); std::vector demods; std::vector demods_deleted; - DemodulatorInstance *activeDemodulator; - DemodulatorInstance *lastActiveDemodulator; - DemodulatorInstance *activeVisualDemodulator; + + std::atomic activeDemodulator; + std::atomic lastActiveDemodulator; + std::atomic activeVisualDemodulator; int lastBandwidth; std::string lastDemodType; diff --git a/src/demod/DemodulatorPreThread.cpp b/src/demod/DemodulatorPreThread.cpp index 69c9e34..e4286fe 100644 --- a/src/demod/DemodulatorPreThread.cpp +++ b/src/demod/DemodulatorPreThread.cpp @@ -66,6 +66,7 @@ void DemodulatorPreThread::run() { while (!stopping) { DemodulatorThreadIQData *inp; + iqInputQueue->pop(inp); if (frequencyChanged.load()) { @@ -205,17 +206,20 @@ void DemodulatorPreThread::run() { resamp->modemKit = cModemKit; resamp->sampleRate = currentBandwidth; - iqOutputQueue->push(resamp); + if (!iqOutputQueue->push(resamp)) { + resamp->setRefCount(0); + std::cout << "DemodulatorPreThread::run() cannot push resamp into iqOutputQueue, is full !" << std::endl; + std::this_thread::yield(); + } } inp->decRefCount(); - if (!stopping && !workerResults->empty()) { - while (!workerResults->empty()) { - DemodulatorWorkerThreadResult result; - workerResults->pop(result); - - switch (result.cmd) { + DemodulatorWorkerThreadResult result; + //process all worker results until + while (!stopping && workerResults->try_pop(result)) { + + switch (result.cmd) { case DemodulatorWorkerThreadResult::DEMOD_WORKER_THREAD_RESULT_FILTERS: if (result.iqResampler) { if (iqResampler) { @@ -258,20 +262,19 @@ void DemodulatorPreThread::run() { break; default: break; - } } - } + } //end while if ((cModem != nullptr) && modemSettingsChanged.load()) { cModem->writeSettings(modemSettingsBuffered); modemSettingsBuffered.clear(); modemSettingsChanged.store(false); } - } + } //end while stopping - while (!iqOutputQueue->empty()) { - DemodulatorThreadPostIQData *tmp; - iqOutputQueue->pop(tmp); + DemodulatorThreadPostIQData *tmp; + while (iqOutputQueue->try_pop(tmp)) { + tmp->decRefCount(); } buffers.purge(); @@ -337,7 +340,10 @@ int DemodulatorPreThread::getAudioSampleRate() { void DemodulatorPreThread::terminate() { IOThread::terminate(); DemodulatorThreadIQData *inp = new DemodulatorThreadIQData; // push dummy to nudge queue - iqInputQueue->push(inp); + if (!iqInputQueue->push(inp)) { + delete inp; + } + DemodulatorWorkerThreadCommand command(DemodulatorWorkerThreadCommand::DEMOD_WORKER_THREAD_CMD_NULL); workerQueue->push(command); diff --git a/src/demod/DemodulatorThread.cpp b/src/demod/DemodulatorThread.cpp index 391c092..e2853a7 100644 --- a/src/demod/DemodulatorThread.cpp +++ b/src/demod/DemodulatorThread.cpp @@ -74,6 +74,7 @@ void DemodulatorThread::run() { while (!stopping) { DemodulatorThreadPostIQData *inp; + iqInputQueue->pop(inp); // std::lock_guard < std::mutex > lock(inp->m_mutex); @@ -238,55 +239,67 @@ void DemodulatorThread::run() { ati_vis->type = 0; } - localAudioVisOutputQueue->push(ati_vis); + if (!localAudioVisOutputQueue->push(ati_vis)) { + ati_vis->setRefCount(0); + std::cout << "DemodulatorThread::run() cannot push ati_vis into localAudioVisOutputQueue, is full !" << std::endl; + std::this_thread::yield(); + } } if (ati != nullptr) { if (!muted.load() && (!wxGetApp().getSoloMode() || (demodInstance == wxGetApp().getDemodMgr().getLastActiveDemodulator()))) { - audioOutputQueue->push(ati); + + if (!audioOutputQueue->push(ati)) { + ati->decRefCount(); + std::cout << "DemodulatorThread::run() cannot push ati into audioOutputQueue, is full !" << std::endl; + std::this_thread::yield(); + } + } else { ati->setRefCount(0); } } - if (!threadQueueControl->empty()) { - while (!threadQueueControl->empty()) { - DemodulatorThreadControlCommand command; - threadQueueControl->pop(command); - - switch (command.cmd) { - case DemodulatorThreadControlCommand::DEMOD_THREAD_CMD_CTL_SQUELCH_ON: - squelchEnabled = true; - break; - case DemodulatorThreadControlCommand::DEMOD_THREAD_CMD_CTL_SQUELCH_OFF: - squelchEnabled = false; - break; - default: - break; - } + DemodulatorThreadControlCommand command; + + //empty command queue, execute commands + while (threadQueueControl->try_pop(command)) { + + switch (command.cmd) { + case DemodulatorThreadControlCommand::DEMOD_THREAD_CMD_CTL_SQUELCH_ON: + squelchEnabled = true; + break; + case DemodulatorThreadControlCommand::DEMOD_THREAD_CMD_CTL_SQUELCH_OFF: + squelchEnabled = false; + break; + default: + break; } } + inp->decRefCount(); } // end while !stopping - // Purge any unused inputs - while (!iqInputQueue->empty()) { - DemodulatorThreadPostIQData *ref; - iqInputQueue->pop(ref); + // Purge any unused inputs, with a non-blocking pop + DemodulatorThreadPostIQData *ref; + while (iqInputQueue->try_pop(ref)) { + if (ref) { // May have other consumers; just decrement ref->decRefCount(); } } - while (!audioOutputQueue->empty()) { - AudioThreadInput *ref; - audioOutputQueue->pop(ref); - if (ref) { // Originated here; set RefCount to 0 - ref->setRefCount(0); + + AudioThreadInput *ref_audio; + while (audioOutputQueue->try_pop(ref_audio)) { + + if (ref_audio) { // Originated here; set RefCount to 0 + ref_audio->setRefCount(0); } } + outputBuffers.purge(); // std::cout << "Demodulator thread done." << std::endl; @@ -295,7 +308,9 @@ void DemodulatorThread::run() { void DemodulatorThread::terminate() { IOThread::terminate(); DemodulatorThreadPostIQData *inp = new DemodulatorThreadPostIQData; // push dummy to nudge queue - iqInputQueue->push(inp); + if (!iqInputQueue->push(inp)) { + delete inp; + } } bool DemodulatorThread::isMuted() { diff --git a/src/demod/DemodulatorWorkerThread.cpp b/src/demod/DemodulatorWorkerThread.cpp index 4fa600b..c5c59e8 100644 --- a/src/demod/DemodulatorWorkerThread.cpp +++ b/src/demod/DemodulatorWorkerThread.cpp @@ -24,8 +24,12 @@ void DemodulatorWorkerThread::run() { DemodulatorWorkerThreadCommand command; bool done = false; + //Beware of the subtility here, + //we are waiting for the first command to show up (blocking!) + //then consuming the commands until done. while (!done) { commandQueue->pop(command); + switch (command.cmd) { case DemodulatorWorkerThreadCommand::DEMOD_WORKER_THREAD_CMD_BUILD_FILTERS: filterChanged = true; diff --git a/src/forms/SDRDevices/SDRDevices.cpp b/src/forms/SDRDevices/SDRDevices.cpp index d22e5b9..df81fb8 100644 --- a/src/forms/SDRDevices/SDRDevices.cpp +++ b/src/forms/SDRDevices/SDRDevices.cpp @@ -314,7 +314,7 @@ void SDRDevicesDialog::OnUseSelected( wxMouseEvent& event) { devConfig->setStreamOpts(streamArgs); wxGetApp().setDeviceArgs(settingArgs); wxGetApp().setStreamArgs(streamArgs); - wxGetApp().setDevice(dev); + wxGetApp().setDevice(dev,0); Close(); } @@ -483,7 +483,7 @@ void SDRDevicesDialog::doRefreshDevices() { editId = nullptr; removeId = nullptr; dev = nullptr; - wxGetApp().stopDevice(false); + wxGetApp().stopDevice(false, 2000); devTree->DeleteAllItems(); devTree->Disable(); m_propertyGrid->Clear(); diff --git a/src/process/FFTDataDistributor.cpp b/src/process/FFTDataDistributor.cpp index c4c5632..140a50f 100644 --- a/src/process/FFTDataDistributor.cpp +++ b/src/process/FFTDataDistributor.cpp @@ -17,6 +17,7 @@ unsigned int FFTDataDistributor::getLinesPerSecond() { } void FFTDataDistributor::process() { + while (!input->empty()) { if (!isAnyOutputEmpty()) { return; diff --git a/src/process/ScopeVisualProcessor.cpp b/src/process/ScopeVisualProcessor.cpp index 97715d4..9815a77 100644 --- a/src/process/ScopeVisualProcessor.cpp +++ b/src/process/ScopeVisualProcessor.cpp @@ -68,10 +68,10 @@ void ScopeVisualProcessor::process() { if (!isOutputEmpty()) { return; } - if (!input->empty()) { - AudioThreadInput *audioInputData; - input->pop(audioInputData); - + AudioThreadInput *audioInputData; + + if (input->try_pop(audioInputData)) { + if (!audioInputData) { return; } @@ -271,5 +271,5 @@ void ScopeVisualProcessor::process() { } else { delete audioInputData; //->decRefCount(); } - } + } //end if try_pop() } diff --git a/src/process/VisualProcessor.h b/src/process/VisualProcessor.h index a2a33a5..2b3bf2c 100644 --- a/src/process/VisualProcessor.h +++ b/src/process/VisualProcessor.h @@ -86,11 +86,10 @@ protected: output->setRefCount(outputs.size()); for (outputs_i = outputs.begin(); outputs_i != outputs.end(); outputs_i++) { - if ((*outputs_i)->full()) { + + if (!(*outputs_i)->push(output)) { output->decRefCount(); - } else { - (*outputs_i)->push(output); - } + } } } @@ -107,12 +106,16 @@ template class VisualDataDistributor : public VisualProcessor { protected: void process() { - while (!VisualProcessor::input->empty()) { + OutputDataType *inp; + while (VisualProcessor::input->try_pop(inp)) { + if (!VisualProcessor::isAnyOutputEmpty()) { + if (inp) { + inp->decRefCount(); + } return; } - OutputDataType *inp; - VisualProcessor::input->pop(inp); + if (inp) { VisualProcessor::distribute(inp); } @@ -125,12 +128,15 @@ template class VisualDataReDistributor : public VisualProcessor { protected: void process() { - while (!VisualProcessor::input->empty()) { + OutputDataType *inp; + while (VisualProcessor::input->try_pop(inp)) { + if (!VisualProcessor::isAnyOutputEmpty()) { + if (inp) { + inp->decRefCount(); + } return; } - OutputDataType *inp; - VisualProcessor::input->pop(inp); if (inp) { OutputDataType *outp = buffers.getBuffer(); diff --git a/src/sdr/SDRPostThread.cpp b/src/sdr/SDRPostThread.cpp index f730375..40cda6d 100644 --- a/src/sdr/SDRPostThread.cpp +++ b/src/sdr/SDRPostThread.cpp @@ -86,7 +86,7 @@ void SDRPostThread::updateActiveDemodulators() { nRunDemods = 0; long long centerFreq = wxGetApp().getFrequency(); - + for (demod_i = demodulators.begin(); demod_i != demodulators.end(); demod_i++) { DemodulatorInstance *demod = *demod_i; DemodulatorThreadInputQueue *demodQueue = demod->getIQInputDataPipe(); @@ -108,7 +108,9 @@ void SDRPostThread::updateActiveDemodulators() { DemodulatorThreadIQData *dummyDataOut = new DemodulatorThreadIQData; dummyDataOut->frequency = frequency; dummyDataOut->sampleRate = sampleRate; - demodQueue->push(dummyDataOut); + if (!demodQueue->push(dummyDataOut)) { + delete dummyDataOut; + } } // follow if follow mode @@ -119,6 +121,7 @@ void SDRPostThread::updateActiveDemodulators() { } else if (!demod->isActive()) { // in range, activate if not activated demod->setActive(true); if (wxGetApp().getDemodMgr().getLastActiveDemodulator() == NULL) { + wxGetApp().getDemodMgr().setActiveDemodulator(demod); } } @@ -181,8 +184,6 @@ void SDRPostThread::run() { iqDataOutQueue = static_cast(getOutputQueue("IQDataOutput")); iqVisualQueue = static_cast(getOutputQueue("IQVisualDataOutput")); iqActiveDemodVisualQueue = static_cast(getOutputQueue("IQActiveDemodVisualDataOutput")); - - iqDataInQueue->set_max_num_items(0); while (!stopping) { SDRThreadIQData *data_in; @@ -212,14 +213,16 @@ void SDRPostThread::run() { } } + //Only update the list of demodulators here if (doUpdate) { updateActiveDemodulators(); } - } + } //end while - if (iqVisualQueue && !iqVisualQueue->empty()) { - DemodulatorThreadIQData *visualDataDummy; - iqVisualQueue->pop(visualDataDummy); + //Be safe, remove as many elements as possible + DemodulatorThreadIQData *visualDataDummy; + while (iqVisualQueue && iqVisualQueue->try_pop(visualDataDummy)) { + visualDataDummy->decRefCount(); } // buffers.purge(); @@ -231,7 +234,9 @@ void SDRPostThread::run() { void SDRPostThread::terminate() { IOThread::terminate(); SDRThreadIQData *dummy = new SDRThreadIQData; - iqDataInQueue->push(dummy); + if (!iqDataInQueue->push(dummy)) { + delete dummy; + } } void SDRPostThread::runSingleCH(SDRThreadIQData *data_in) { @@ -292,19 +297,34 @@ void SDRPostThread::runSingleCH(SDRThreadIQData *data_in) { iirfilt_crcf_execute_block(dcFilter, &data_in->data[0], dataSize, &demodDataOut->data[0]); if (doDemodVisOut) { - iqActiveDemodVisualQueue->push(demodDataOut); + if (!iqActiveDemodVisualQueue->push(demodDataOut)) { + demodDataOut->decRefCount(); + std::cout << "SDRPostThread::runSingleCH() cannot push demodDataOut into iqActiveDemodVisualQueue, is full !" << std::endl; + std::this_thread::yield(); + } } if (doIQDataOut) { - iqDataOutQueue->push(demodDataOut); + if (!iqDataOutQueue->push(demodDataOut)) { + demodDataOut->decRefCount(); + std::cout << "SDRPostThread::runSingleCH() cannot push demodDataOut into iqDataOutQueue, is full !" << std::endl; + std::this_thread::yield(); + } } if (doVisOut) { - iqVisualQueue->push(demodDataOut); + if (!iqVisualQueue->push(demodDataOut)) { + demodDataOut->decRefCount(); + std::cout << "SDRPostThread::runSingleCH() cannot push demodDataOut into iqVisualQueue, is full !" << std::endl; + std::this_thread::yield(); + } } for (size_t i = 0; i < nRunDemods; i++) { - runDemods[i]->getIQInputDataPipe()->push(demodDataOut); + if (!runDemods[i]->getIQInputDataPipe()->push(demodDataOut)) { + demodDataOut->decRefCount(); + std::this_thread::yield(); + } } } } @@ -342,9 +362,19 @@ void SDRPostThread::runPFBCH(SDRThreadIQData *data_in) { iqDataOut->sampleRate = data_in->sampleRate; iqDataOut->data.assign(data_in->data.begin(), data_in->data.begin() + dataSize); - iqDataOutQueue->push(iqDataOut); - if (doVis) { - iqVisualQueue->push(iqDataOut); + if (!iqDataOutQueue->push(iqDataOut)) { + std::cout << "SDRPostThread::runPFBCH() cannot push iqDataOut into iqDataOutQueue, is full !" << std::endl; + iqDataOut->decRefCount(); + std::this_thread::yield(); + } + + + if (doVis) { + if (!iqVisualQueue->push(iqDataOut)) { + std::cout << "SDRPostThread::runPFBCH() cannot push iqDataOut into iqVisualQueue, is full !" << std::endl; + iqDataOut->decRefCount(); + std::this_thread::yield(); + } } } @@ -440,13 +470,21 @@ void SDRPostThread::runPFBCH(SDRThreadIQData *data_in) { } if (doDemodVis) { - iqActiveDemodVisualQueue->push(demodDataOut); + if (!iqActiveDemodVisualQueue->push(demodDataOut)) { + std::cout << "SDRPostThread::runPFBCH() cannot push demodDataOut into iqActiveDemodVisualQueue, is full !" << std::endl; + demodDataOut->decRefCount(); + std::this_thread::yield(); + } } for (size_t j = 0; j < nRunDemods; j++) { if (demodChannel[j] == i) { DemodulatorInstance *demod = runDemods[j]; - demod->getIQInputDataPipe()->push(demodDataOut); + + if (!demod->getIQInputDataPipe()->push(demodDataOut)) { + demodDataOut->decRefCount(); + std::this_thread::yield(); + } } } } diff --git a/src/sdr/SoapySDRThread.cpp b/src/sdr/SoapySDRThread.cpp index db450f9..a413cf4 100644 --- a/src/sdr/SoapySDRThread.cpp +++ b/src/sdr/SoapySDRThread.cpp @@ -162,6 +162,7 @@ void SDRThread::readStream(SDRThreadIQDataQueue* iqDataOutQueue) { int nElems = numElems.load(); int mtElems = mtuElems.load(); + //If overflow occured on the previous readStream(), transfer it in inpBuffer now if (numOverflow > 0) { int n_overflow = numOverflow; if (n_overflow > nElems) { @@ -176,9 +177,18 @@ void SDRThread::readStream(SDRThreadIQDataQueue* iqDataOutQueue) { } } + //attempt readStream() at most nElems, by mtElems-sized chunks, append inpBuffer. while (n_read < nElems && !stopping) { int n_requested = nElems-n_read; + int n_stream_read = device->readStream(stream, buffs, mtElems, flags, timeNs); + + //if the n_stream_read <= 0, bail out from reading. + if (n_stream_read <= 0) { + break; + } + + //sucess read beyond nElems, with overflow if ((n_read + n_stream_read) > nElems) { memcpy(&inpBuffer.data[n_read], buffs[0], n_requested * sizeof(float) * 2); numOverflow = n_stream_read-n_requested; @@ -194,7 +204,7 @@ void SDRThread::readStream(SDRThreadIQDataQueue* iqDataOutQueue) { } } - if (n_read > 0 && !stopping) { + if (n_read > 0 && !stopping && !iqDataOutQueue->full()) { SDRThreadIQData *dataOut = buffers.getBuffer(); if (iq_swap.load()) { @@ -212,7 +222,16 @@ void SDRThread::readStream(SDRThreadIQDataQueue* iqDataOutQueue) { dataOut->dcCorrected = hasHardwareDC.load(); dataOut->numChannels = numChannels.load(); - iqDataOutQueue->push(dataOut); + if (!iqDataOutQueue->push(dataOut)) { + //The rest of the system saturates, + //finally the push didn't suceeded, recycle dataOut immediatly. + dataOut->setRefCount(0); + + std::cout << "SDRThread::readStream(): iqDataOutQueue output queue is full, discard processing ! " << std::endl; + + //saturation, let a chance to the other threads to consume the existing samples + std::this_thread::yield(); + } } } diff --git a/src/util/ThreadQueue.h b/src/util/ThreadQueue.h index 4d1f901..5d0c52f 100644 --- a/src/util/ThreadQueue.h +++ b/src/util/ThreadQueue.h @@ -68,11 +68,13 @@ public: bool push(const value_type& item) { std::lock_guard < std::mutex > lock(m_mutex); - if (m_max_num_items.load() > 0 && m_queue.size() > m_max_num_items.load()) + if (m_max_num_items.load() > 0 && m_queue.size() > m_max_num_items.load()) { + m_condition.notify_all(); return false; + } m_queue.push(item); - m_condition.notify_one(); + m_condition.notify_all(); return true; } @@ -84,11 +86,13 @@ public: bool push(const value_type&& item) { std::lock_guard < std::mutex > lock(m_mutex); - if (m_max_num_items.load() > 0 && m_queue.size() > m_max_num_items.load()) + if (m_max_num_items.load() > 0 && m_queue.size() > m_max_num_items.load()) { + m_condition.notify_all(); return false; + } m_queue.push(item); - m_condition.notify_one(); + m_condition.notify_all(); return true; } diff --git a/src/visual/ScopeCanvas.cpp b/src/visual/ScopeCanvas.cpp index 2aa6e22..b289703 100644 --- a/src/visual/ScopeCanvas.cpp +++ b/src/visual/ScopeCanvas.cpp @@ -97,11 +97,10 @@ bool ScopeCanvas::getShowDb() { void ScopeCanvas::OnPaint(wxPaintEvent& WXUNUSED(event)) { wxPaintDC dc(this); const wxSize ClientSize = GetClientSize(); - - while (!inputData.empty()) { - ScopeRenderData *avData; - inputData.pop(avData); - + + ScopeRenderData *avData; + while (inputData.try_pop(avData)) { + if (!avData->spectrum) { scopePanel.setMode(avData->mode); diff --git a/src/visual/SpectrumCanvas.cpp b/src/visual/SpectrumCanvas.cpp index 678b0c1..8a31c85 100644 --- a/src/visual/SpectrumCanvas.cpp +++ b/src/visual/SpectrumCanvas.cpp @@ -51,11 +51,9 @@ void SpectrumCanvas::OnPaint(wxPaintEvent& WXUNUSED(event)) { wxPaintDC dc(this); const wxSize ClientSize = GetClientSize(); - if (!visualDataQueue.empty()) { - SpectrumVisualData *vData; - - visualDataQueue.pop(vData); - + SpectrumVisualData *vData; + if (visualDataQueue.try_pop(vData)) { + if (vData) { spectrumPanel.setPoints(vData->spectrum_points); spectrumPanel.setPeakPoints(vData->spectrum_hold_points); diff --git a/src/visual/WaterfallCanvas.cpp b/src/visual/WaterfallCanvas.cpp index 5d0e208..26816f4 100644 --- a/src/visual/WaterfallCanvas.cpp +++ b/src/visual/WaterfallCanvas.cpp @@ -95,8 +95,8 @@ void WaterfallCanvas::processInputQueue() { if (lpsIndex >= targetVis) { while (lpsIndex >= targetVis) { SpectrumVisualData *vData; - if (!visualDataQueue.empty()) { - visualDataQueue.pop(vData); + + if (visualDataQueue.try_pop(vData)) { if (vData) { if (vData->spectrum_points.size() == fft_size * 2) { @@ -912,11 +912,13 @@ void WaterfallCanvas::updateCenterFrequency(long long freq) { void WaterfallCanvas::setLinesPerSecond(int lps) { std::lock_guard < std::mutex > lock(tex_update); + linesPerSecond = lps; - while (!visualDataQueue.empty()) { - SpectrumVisualData *vData; - visualDataQueue.pop(vData); + //empty all + SpectrumVisualData *vData; + while (visualDataQueue.try_pop(vData)) { + if (vData) { vData->decRefCount(); }