diff --git a/src/AppFrame.cpp b/src/AppFrame.cpp index f44105f..a5b15ff 100644 --- a/src/AppFrame.cpp +++ b/src/AppFrame.cpp @@ -1645,6 +1645,7 @@ bool AppFrame::loadSession(std::string fileName) { } wxGetApp().getDemodMgr().setActiveDemodulator(nullptr, false); + wxGetApp().getDemodMgr().terminateAll(); try { diff --git a/src/CubicSDR.cpp b/src/CubicSDR.cpp index e30b1bc..a87d927 100644 --- a/src/CubicSDR.cpp +++ b/src/CubicSDR.cpp @@ -254,6 +254,7 @@ bool CubicSDR::OnInit() { sdrPostThread = new SDRPostThread(); sdrPostThread->setInputQueue("IQDataInput", pipeSDRIQData); + sdrPostThread->setOutputQueue("IQVisualDataOutput", pipeIQVisualData); sdrPostThread->setOutputQueue("IQDataOutput", pipeWaterfallIQVisualData); sdrPostThread->setOutputQueue("IQActiveDemodVisualDataOutput", pipeDemodIQVisualData); diff --git a/src/audio/AudioThread.cpp b/src/audio/AudioThread.cpp index cd49d1b..f1c2038 100644 --- a/src/audio/AudioThread.cpp +++ b/src/audio/AudioThread.cpp @@ -8,6 +8,7 @@ #include #include + std::map AudioThread::deviceController; std::map AudioThread::deviceSampleRate; std::map AudioThread::deviceThread; @@ -19,15 +20,11 @@ AudioThread::AudioThread() : IOThread(), underflowCount.store(0); active.store(false); outputDevice.store(-1); - gain.store(1.0); - - vBoundThreads = new std::vector; - boundThreads.store(vBoundThreads); + gain = 1.0; } AudioThread::~AudioThread() { - boundThreads.store(nullptr); - delete vBoundThreads; + } std::recursive_mutex & AudioThread::getMutex() @@ -39,8 +36,8 @@ void AudioThread::bindThread(AudioThread *other) { std::lock_guard lock(m_mutex); - if (std::find(boundThreads.load()->begin(), boundThreads.load()->end(), other) == boundThreads.load()->end()) { - boundThreads.load()->push_back(other); + if (std::find(boundThreads.begin(), boundThreads.end(), other) == boundThreads.end()) { + boundThreads.push_back(other); } } @@ -49,9 +46,9 @@ void AudioThread::removeThread(AudioThread *other) { std::lock_guard lock(m_mutex); std::vector::iterator i; - i = std::find(boundThreads.load()->begin(), boundThreads.load()->end(), other); - if (i != boundThreads.load()->end()) { - boundThreads.load()->erase(i); + i = std::find(boundThreads.begin(), boundThreads.end(), other); + if (i != boundThreads.end()) { + boundThreads.erase(i); } } @@ -85,7 +82,7 @@ static int audioCallback(void *outputBuffer, void * /* inputBuffer */, unsigned std::cout << "Audio buffer underflow.." << (src->underflowCount++) << std::endl; } - if (src->boundThreads.load()->empty()) { + if (src->boundThreads.empty()) { return 0; } @@ -93,9 +90,9 @@ static int audioCallback(void *outputBuffer, void * /* inputBuffer */, unsigned double peak = 0.0; //for all boundThreads - for (size_t j = 0; j < src->boundThreads.load()->size(); j++) { + for (size_t j = 0; j < src->boundThreads.size(); j++) { - AudioThread *srcmix = (*(src->boundThreads.load()))[j]; + AudioThread *srcmix = src->boundThreads[j]; //lock every single boundThread srcmix in succession the time we process //its audio samples. @@ -276,6 +273,7 @@ void AudioThread::enumerateDevices(std::vector &devs) { } void AudioThread::setDeviceSampleRate(int deviceId, int sampleRate) { + if (deviceController.find(deviceId) != deviceController.end()) { AudioThreadCommand refreshDevice; @@ -286,16 +284,17 @@ void AudioThread::setDeviceSampleRate(int deviceId, int sampleRate) { } void AudioThread::setSampleRate(int sampleRate) { + + std::lock_guard lock(m_mutex); + if (deviceController[outputDevice.load()] == this) { deviceSampleRate[outputDevice.load()] = sampleRate; dac.stopStream(); dac.closeStream(); - - std::lock_guard lock(m_mutex); - for (size_t j = 0; j < boundThreads.load()->size(); j++) { - AudioThread *srcmix = (*(boundThreads.load()))[j]; + for (size_t j = 0; j < boundThreads.size(); j++) { + AudioThread *srcmix = boundThreads[j]; srcmix->setSampleRate(sampleRate); } @@ -318,10 +317,15 @@ void AudioThread::setSampleRate(int sampleRate) { } int AudioThread::getSampleRate() { + std::lock_guard lock(m_mutex); + return this->sampleRate; } void AudioThread::setupDevice(int deviceId) { + + std::lock_guard lock(m_mutex); + parameters.deviceId = deviceId; parameters.nChannels = 2; parameters.firstChannel = 0; @@ -373,6 +377,8 @@ void AudioThread::setupDevice(int deviceId) { } int AudioThread::getOutputDevice() { + std::lock_guard lock(m_mutex); + if (outputDevice == -1) { return dac.getDefaultOutputDevice(); } @@ -380,6 +386,9 @@ int AudioThread::getOutputDevice() { } void AudioThread::setInitOutputDevice(int deviceId, int sampleRate) { + + std::lock_guard lock(m_mutex); + outputDevice = deviceId; if (sampleRate == -1) { if (deviceSampleRate.find(parameters.deviceId) != deviceSampleRate.end()) { @@ -415,6 +424,7 @@ void AudioThread::run() { //Infinite loop, witing for commands or for termination while (!stopping) { AudioThreadCommand command; + cmdQueue.pop(command); if (command.cmd == AudioThreadCommand::AUDIO_THREAD_CMD_SET_DEVICE) { @@ -470,10 +480,14 @@ void AudioThread::terminate() { } bool AudioThread::isActive() { + std::lock_guard lock(m_mutex); + return active; } void AudioThread::setActive(bool state) { + + std::lock_guard lock(m_mutex); AudioThreadInput *dummy; if (state && !active && inputQueue) { @@ -500,6 +514,9 @@ AudioThreadCommandQueue *AudioThread::getCommandQueue() { } void AudioThread::setGain(float gain_in) { + + std::lock_guard lock(m_mutex); + if (gain < 0.0) { gain = 0.0; } @@ -510,5 +527,8 @@ void AudioThread::setGain(float gain_in) { } float AudioThread::getGain() { + + std::lock_guard lock(m_mutex); + return gain; } diff --git a/src/audio/AudioThread.h b/src/audio/AudioThread.h index 61008f9..65388c7 100644 --- a/src/audio/AudioThread.h +++ b/src/audio/AudioThread.h @@ -57,7 +57,7 @@ public: std::atomic_bool initialized; std::atomic_bool active; std::atomic_int outputDevice; - std::atomic gain; + float gain; AudioThread(); ~AudioThread(); @@ -88,7 +88,7 @@ private: AudioThreadCommandQueue cmdQueue; int sampleRate; - //The own m_mutex protecting this AudioThread + //The own m_mutex protecting this AudioThread, in particular boundThreads std::recursive_mutex m_mutex; public: @@ -103,7 +103,8 @@ public: static std::map deviceThread; static void deviceCleanup(); static void setDeviceSampleRate(int deviceId, int sampleRate); - std::atomic *> boundThreads; - std::vector *vBoundThreads; + + //protected by m_mutex + std::vector boundThreads; }; diff --git a/src/demod/DemodulatorInstance.cpp b/src/demod/DemodulatorInstance.cpp index 4852f9c..5b6c37f 100644 --- a/src/demod/DemodulatorInstance.cpp +++ b/src/demod/DemodulatorInstance.cpp @@ -152,7 +152,6 @@ bool DemodulatorInstance::isTerminated() { bool demodTerminated = demodulatorThread->isTerminated(); bool preDemodTerminated = demodulatorPreThread->isTerminated(); - //Cleanup the worker threads, if the threads are indeed terminated if (audioTerminated) { @@ -168,7 +167,6 @@ bool DemodulatorInstance::isTerminated() { if (demodTerminated) { if (t_Demod) { - #ifdef __APPLE__ pthread_join(t_Demod, nullptr); #else @@ -185,8 +183,8 @@ bool DemodulatorInstance::isTerminated() { } if (preDemodTerminated) { - - if (t_PreDemod) { + + if (t_PreDemod) { #ifdef __APPLE__ pthread_join(t_PreDemod, NULL); @@ -195,10 +193,9 @@ bool DemodulatorInstance::isTerminated() { delete t_PreDemod; #endif t_PreDemod = nullptr; - } + } } - bool terminated = audioTerminated && demodTerminated && preDemodTerminated; return terminated; diff --git a/src/demod/DemodulatorMgr.cpp b/src/demod/DemodulatorMgr.cpp index fd2abb9..61be65c 100644 --- a/src/demod/DemodulatorMgr.cpp +++ b/src/demod/DemodulatorMgr.cpp @@ -136,13 +136,13 @@ void DemodulatorMgr::deleteThread(DemodulatorInstance *demod) { i = std::find(demods.begin(), demods.end(), demod); if (activeDemodulator == demod) { - activeDemodulator = NULL; + activeDemodulator = nullptr; } if (lastActiveDemodulator == demod) { - lastActiveDemodulator = NULL; + lastActiveDemodulator = nullptr; } if (activeVisualDemodulator == demod) { - activeVisualDemodulator = NULL; + activeVisualDemodulator = nullptr; } if (i != demods.end()) { @@ -150,6 +150,7 @@ void DemodulatorMgr::deleteThread(DemodulatorInstance *demod) { } //Ask for termination + demod->setActive(false); demod->terminate(); //Do not cleanup immediatly @@ -200,27 +201,28 @@ bool DemodulatorMgr::anyDemodulatorsAt(long long freq, int bandwidth) { void DemodulatorMgr::setActiveDemodulator(DemodulatorInstance *demod, bool temporary) { - std::lock_guard < std::recursive_mutex > lock(demods_busy); + if (!temporary) { - if (activeDemodulator != NULL) { - lastActiveDemodulator = activeDemodulator; + if (activeDemodulator.load() != nullptr) { + lastActiveDemodulator = activeDemodulator.load(); updateLastState(); } else { lastActiveDemodulator = demod; } updateLastState(); #if USE_HAMLIB - if (wxGetApp().rigIsActive() && wxGetApp().getRigThread()->getFollowModem() && lastActiveDemodulator) { - wxGetApp().getRigThread()->setFrequency(lastActiveDemodulator->getFrequency(),true); + if (wxGetApp().rigIsActive() && wxGetApp().getRigThread()->getFollowModem() && lastActiveDemodulator.load()) { + wxGetApp().getRigThread()->setFrequency(lastActiveDemodulator.load()->getFrequency(),true); } #endif } else { + std::lock_guard < std::recursive_mutex > lock(demods_busy); garbageCollect(); ReBufferGC::garbageCollect(); } - if (activeVisualDemodulator) { - activeVisualDemodulator->setVisualOutputQueue(NULL); + if (activeVisualDemodulator.load()) { + activeVisualDemodulator.load()->setVisualOutputQueue(nullptr); } if (demod) { demod->setVisualOutputQueue(wxGetApp().getAudioVisualQueue()); @@ -238,7 +240,7 @@ void DemodulatorMgr::setActiveDemodulator(DemodulatorInstance *demod, bool tempo } DemodulatorInstance *DemodulatorMgr::getActiveDemodulator() { - if (activeDemodulator && !activeDemodulator->isActive()) { + if (activeDemodulator.load() && !activeDemodulator.load()->isActive()) { activeDemodulator = getLastActiveDemodulator(); } return activeDemodulator; @@ -262,8 +264,6 @@ void DemodulatorMgr::garbageCollect() { std::cout << "Garbage collected demodulator instance " << deleted->getLabel() << std::endl; delete deleted; - - return; } } @@ -273,27 +273,28 @@ void DemodulatorMgr::garbageCollect() { void DemodulatorMgr::updateLastState() { std::lock_guard < std::recursive_mutex > lock(demods_busy); + if (std::find(demods.begin(), demods.end(), lastActiveDemodulator) == demods.end()) { - if (activeDemodulator && activeDemodulator->isActive()) { - lastActiveDemodulator = activeDemodulator; - } else if (activeDemodulator && !activeDemodulator->isActive()){ - activeDemodulator = NULL; - lastActiveDemodulator = NULL; + if (activeDemodulator.load() && activeDemodulator.load()->isActive()) { + lastActiveDemodulator = activeDemodulator.load(); + } else if (activeDemodulator.load() && !activeDemodulator.load()->isActive()){ + activeDemodulator = nullptr; + lastActiveDemodulator = nullptr; } } - if (lastActiveDemodulator && !lastActiveDemodulator->isActive()) { - lastActiveDemodulator = NULL; + if (lastActiveDemodulator.load() && !lastActiveDemodulator.load()->isActive()) { + lastActiveDemodulator = nullptr; } - if (lastActiveDemodulator) { - lastBandwidth = lastActiveDemodulator->getBandwidth(); - lastDemodType = lastActiveDemodulator->getDemodulatorType(); - lastDemodLock = lastActiveDemodulator->getDemodulatorLock()?true:false; - lastSquelchEnabled = lastActiveDemodulator->isSquelchEnabled(); - lastSquelch = lastActiveDemodulator->getSquelchLevel(); - lastGain = lastActiveDemodulator->getGain(); - lastModemSettings[lastDemodType] = lastActiveDemodulator->readModemSettings(); + if (lastActiveDemodulator.load()) { + lastBandwidth = lastActiveDemodulator.load()->getBandwidth(); + lastDemodType = lastActiveDemodulator.load()->getDemodulatorType(); + lastDemodLock = lastActiveDemodulator.load()->getDemodulatorLock()?true:false; + lastSquelchEnabled = lastActiveDemodulator.load()->isSquelchEnabled(); + lastSquelch = lastActiveDemodulator.load()->getSquelchLevel(); + lastGain = lastActiveDemodulator.load()->getGain(); + lastModemSettings[lastDemodType] = lastActiveDemodulator.load()->readModemSettings(); } } diff --git a/src/demod/DemodulatorMgr.h b/src/demod/DemodulatorMgr.h index 09b72ce..a993f84 100644 --- a/src/demod/DemodulatorMgr.h +++ b/src/demod/DemodulatorMgr.h @@ -53,15 +53,17 @@ public: void setLastModemSettings(std::string, ModemSettings); void updateLastState(); - + private: + void garbageCollect(); std::vector demods; std::vector demods_deleted; - DemodulatorInstance *activeDemodulator; - DemodulatorInstance *lastActiveDemodulator; - DemodulatorInstance *activeVisualDemodulator; + + std::atomic activeDemodulator; + std::atomic lastActiveDemodulator; + std::atomic activeVisualDemodulator; int lastBandwidth; std::string lastDemodType; diff --git a/src/demod/DemodulatorPreThread.cpp b/src/demod/DemodulatorPreThread.cpp index da18d32..e4286fe 100644 --- a/src/demod/DemodulatorPreThread.cpp +++ b/src/demod/DemodulatorPreThread.cpp @@ -66,6 +66,7 @@ void DemodulatorPreThread::run() { while (!stopping) { DemodulatorThreadIQData *inp; + iqInputQueue->pop(inp); if (frequencyChanged.load()) { @@ -205,7 +206,11 @@ void DemodulatorPreThread::run() { resamp->modemKit = cModemKit; resamp->sampleRate = currentBandwidth; - iqOutputQueue->push(resamp); + if (!iqOutputQueue->push(resamp)) { + resamp->setRefCount(0); + std::cout << "DemodulatorPreThread::run() cannot push resamp into iqOutputQueue, is full !" << std::endl; + std::this_thread::yield(); + } } inp->decRefCount(); @@ -335,7 +340,10 @@ int DemodulatorPreThread::getAudioSampleRate() { void DemodulatorPreThread::terminate() { IOThread::terminate(); DemodulatorThreadIQData *inp = new DemodulatorThreadIQData; // push dummy to nudge queue - iqInputQueue->push(inp); + if (!iqInputQueue->push(inp)) { + delete inp; + } + DemodulatorWorkerThreadCommand command(DemodulatorWorkerThreadCommand::DEMOD_WORKER_THREAD_CMD_NULL); workerQueue->push(command); diff --git a/src/demod/DemodulatorThread.cpp b/src/demod/DemodulatorThread.cpp index bc2fb9f..e2853a7 100644 --- a/src/demod/DemodulatorThread.cpp +++ b/src/demod/DemodulatorThread.cpp @@ -74,6 +74,7 @@ void DemodulatorThread::run() { while (!stopping) { DemodulatorThreadPostIQData *inp; + iqInputQueue->pop(inp); // std::lock_guard < std::mutex > lock(inp->m_mutex); @@ -238,13 +239,23 @@ void DemodulatorThread::run() { ati_vis->type = 0; } - localAudioVisOutputQueue->push(ati_vis); + if (!localAudioVisOutputQueue->push(ati_vis)) { + ati_vis->setRefCount(0); + std::cout << "DemodulatorThread::run() cannot push ati_vis into localAudioVisOutputQueue, is full !" << std::endl; + std::this_thread::yield(); + } } if (ati != nullptr) { if (!muted.load() && (!wxGetApp().getSoloMode() || (demodInstance == wxGetApp().getDemodMgr().getLastActiveDemodulator()))) { - audioOutputQueue->push(ati); + + if (!audioOutputQueue->push(ati)) { + ati->decRefCount(); + std::cout << "DemodulatorThread::run() cannot push ati into audioOutputQueue, is full !" << std::endl; + std::this_thread::yield(); + } + } else { ati->setRefCount(0); } @@ -297,7 +308,9 @@ void DemodulatorThread::run() { void DemodulatorThread::terminate() { IOThread::terminate(); DemodulatorThreadPostIQData *inp = new DemodulatorThreadPostIQData; // push dummy to nudge queue - iqInputQueue->push(inp); + if (!iqInputQueue->push(inp)) { + delete inp; + } } bool DemodulatorThread::isMuted() { diff --git a/src/process/VisualProcessor.h b/src/process/VisualProcessor.h index a2a33a5..2b3bf2c 100644 --- a/src/process/VisualProcessor.h +++ b/src/process/VisualProcessor.h @@ -86,11 +86,10 @@ protected: output->setRefCount(outputs.size()); for (outputs_i = outputs.begin(); outputs_i != outputs.end(); outputs_i++) { - if ((*outputs_i)->full()) { + + if (!(*outputs_i)->push(output)) { output->decRefCount(); - } else { - (*outputs_i)->push(output); - } + } } } @@ -107,12 +106,16 @@ template class VisualDataDistributor : public VisualProcessor { protected: void process() { - while (!VisualProcessor::input->empty()) { + OutputDataType *inp; + while (VisualProcessor::input->try_pop(inp)) { + if (!VisualProcessor::isAnyOutputEmpty()) { + if (inp) { + inp->decRefCount(); + } return; } - OutputDataType *inp; - VisualProcessor::input->pop(inp); + if (inp) { VisualProcessor::distribute(inp); } @@ -125,12 +128,15 @@ template class VisualDataReDistributor : public VisualProcessor { protected: void process() { - while (!VisualProcessor::input->empty()) { + OutputDataType *inp; + while (VisualProcessor::input->try_pop(inp)) { + if (!VisualProcessor::isAnyOutputEmpty()) { + if (inp) { + inp->decRefCount(); + } return; } - OutputDataType *inp; - VisualProcessor::input->pop(inp); if (inp) { OutputDataType *outp = buffers.getBuffer(); diff --git a/src/sdr/SDRPostThread.cpp b/src/sdr/SDRPostThread.cpp index 2a2c6a7..40cda6d 100644 --- a/src/sdr/SDRPostThread.cpp +++ b/src/sdr/SDRPostThread.cpp @@ -86,7 +86,7 @@ void SDRPostThread::updateActiveDemodulators() { nRunDemods = 0; long long centerFreq = wxGetApp().getFrequency(); - + for (demod_i = demodulators.begin(); demod_i != demodulators.end(); demod_i++) { DemodulatorInstance *demod = *demod_i; DemodulatorThreadInputQueue *demodQueue = demod->getIQInputDataPipe(); @@ -108,7 +108,9 @@ void SDRPostThread::updateActiveDemodulators() { DemodulatorThreadIQData *dummyDataOut = new DemodulatorThreadIQData; dummyDataOut->frequency = frequency; dummyDataOut->sampleRate = sampleRate; - demodQueue->push(dummyDataOut); + if (!demodQueue->push(dummyDataOut)) { + delete dummyDataOut; + } } // follow if follow mode @@ -119,6 +121,7 @@ void SDRPostThread::updateActiveDemodulators() { } else if (!demod->isActive()) { // in range, activate if not activated demod->setActive(true); if (wxGetApp().getDemodMgr().getLastActiveDemodulator() == NULL) { + wxGetApp().getDemodMgr().setActiveDemodulator(demod); } } @@ -181,8 +184,6 @@ void SDRPostThread::run() { iqDataOutQueue = static_cast(getOutputQueue("IQDataOutput")); iqVisualQueue = static_cast(getOutputQueue("IQVisualDataOutput")); iqActiveDemodVisualQueue = static_cast(getOutputQueue("IQActiveDemodVisualDataOutput")); - - iqDataInQueue->set_max_num_items(0); while (!stopping) { SDRThreadIQData *data_in; @@ -212,17 +213,16 @@ void SDRPostThread::run() { } } + //Only update the list of demodulators here if (doUpdate) { updateActiveDemodulators(); } } //end while - //TODO: Why only 1 element was removed before ? + //Be safe, remove as many elements as possible DemodulatorThreadIQData *visualDataDummy; while (iqVisualQueue && iqVisualQueue->try_pop(visualDataDummy)) { - //nothing - //TODO: What about the refcounts ? - + visualDataDummy->decRefCount(); } // buffers.purge(); @@ -234,7 +234,9 @@ void SDRPostThread::run() { void SDRPostThread::terminate() { IOThread::terminate(); SDRThreadIQData *dummy = new SDRThreadIQData; - iqDataInQueue->push(dummy); + if (!iqDataInQueue->push(dummy)) { + delete dummy; + } } void SDRPostThread::runSingleCH(SDRThreadIQData *data_in) { @@ -295,19 +297,34 @@ void SDRPostThread::runSingleCH(SDRThreadIQData *data_in) { iirfilt_crcf_execute_block(dcFilter, &data_in->data[0], dataSize, &demodDataOut->data[0]); if (doDemodVisOut) { - iqActiveDemodVisualQueue->push(demodDataOut); + if (!iqActiveDemodVisualQueue->push(demodDataOut)) { + demodDataOut->decRefCount(); + std::cout << "SDRPostThread::runSingleCH() cannot push demodDataOut into iqActiveDemodVisualQueue, is full !" << std::endl; + std::this_thread::yield(); + } } if (doIQDataOut) { - iqDataOutQueue->push(demodDataOut); + if (!iqDataOutQueue->push(demodDataOut)) { + demodDataOut->decRefCount(); + std::cout << "SDRPostThread::runSingleCH() cannot push demodDataOut into iqDataOutQueue, is full !" << std::endl; + std::this_thread::yield(); + } } if (doVisOut) { - iqVisualQueue->push(demodDataOut); + if (!iqVisualQueue->push(demodDataOut)) { + demodDataOut->decRefCount(); + std::cout << "SDRPostThread::runSingleCH() cannot push demodDataOut into iqVisualQueue, is full !" << std::endl; + std::this_thread::yield(); + } } for (size_t i = 0; i < nRunDemods; i++) { - runDemods[i]->getIQInputDataPipe()->push(demodDataOut); + if (!runDemods[i]->getIQInputDataPipe()->push(demodDataOut)) { + demodDataOut->decRefCount(); + std::this_thread::yield(); + } } } } @@ -345,9 +362,19 @@ void SDRPostThread::runPFBCH(SDRThreadIQData *data_in) { iqDataOut->sampleRate = data_in->sampleRate; iqDataOut->data.assign(data_in->data.begin(), data_in->data.begin() + dataSize); - iqDataOutQueue->push(iqDataOut); - if (doVis) { - iqVisualQueue->push(iqDataOut); + if (!iqDataOutQueue->push(iqDataOut)) { + std::cout << "SDRPostThread::runPFBCH() cannot push iqDataOut into iqDataOutQueue, is full !" << std::endl; + iqDataOut->decRefCount(); + std::this_thread::yield(); + } + + + if (doVis) { + if (!iqVisualQueue->push(iqDataOut)) { + std::cout << "SDRPostThread::runPFBCH() cannot push iqDataOut into iqVisualQueue, is full !" << std::endl; + iqDataOut->decRefCount(); + std::this_thread::yield(); + } } } @@ -443,13 +470,21 @@ void SDRPostThread::runPFBCH(SDRThreadIQData *data_in) { } if (doDemodVis) { - iqActiveDemodVisualQueue->push(demodDataOut); + if (!iqActiveDemodVisualQueue->push(demodDataOut)) { + std::cout << "SDRPostThread::runPFBCH() cannot push demodDataOut into iqActiveDemodVisualQueue, is full !" << std::endl; + demodDataOut->decRefCount(); + std::this_thread::yield(); + } } for (size_t j = 0; j < nRunDemods; j++) { if (demodChannel[j] == i) { DemodulatorInstance *demod = runDemods[j]; - demod->getIQInputDataPipe()->push(demodDataOut); + + if (!demod->getIQInputDataPipe()->push(demodDataOut)) { + demodDataOut->decRefCount(); + std::this_thread::yield(); + } } } } diff --git a/src/sdr/SoapySDRThread.cpp b/src/sdr/SoapySDRThread.cpp index 81f4df2..a413cf4 100644 --- a/src/sdr/SoapySDRThread.cpp +++ b/src/sdr/SoapySDRThread.cpp @@ -204,7 +204,7 @@ void SDRThread::readStream(SDRThreadIQDataQueue* iqDataOutQueue) { } } - if (n_read > 0 && !stopping) { + if (n_read > 0 && !stopping && !iqDataOutQueue->full()) { SDRThreadIQData *dataOut = buffers.getBuffer(); if (iq_swap.load()) { @@ -222,7 +222,16 @@ void SDRThread::readStream(SDRThreadIQDataQueue* iqDataOutQueue) { dataOut->dcCorrected = hasHardwareDC.load(); dataOut->numChannels = numChannels.load(); - iqDataOutQueue->push(dataOut); + if (!iqDataOutQueue->push(dataOut)) { + //The rest of the system saturates, + //finally the push didn't suceeded, recycle dataOut immediatly. + dataOut->setRefCount(0); + + std::cout << "SDRThread::readStream(): iqDataOutQueue output queue is full, discard processing ! " << std::endl; + + //saturation, let a chance to the other threads to consume the existing samples + std::this_thread::yield(); + } } } diff --git a/src/util/ThreadQueue.h b/src/util/ThreadQueue.h index 4d1f901..5d0c52f 100644 --- a/src/util/ThreadQueue.h +++ b/src/util/ThreadQueue.h @@ -68,11 +68,13 @@ public: bool push(const value_type& item) { std::lock_guard < std::mutex > lock(m_mutex); - if (m_max_num_items.load() > 0 && m_queue.size() > m_max_num_items.load()) + if (m_max_num_items.load() > 0 && m_queue.size() > m_max_num_items.load()) { + m_condition.notify_all(); return false; + } m_queue.push(item); - m_condition.notify_one(); + m_condition.notify_all(); return true; } @@ -84,11 +86,13 @@ public: bool push(const value_type&& item) { std::lock_guard < std::mutex > lock(m_mutex); - if (m_max_num_items.load() > 0 && m_queue.size() > m_max_num_items.load()) + if (m_max_num_items.load() > 0 && m_queue.size() > m_max_num_items.load()) { + m_condition.notify_all(); return false; + } m_queue.push(item); - m_condition.notify_one(); + m_condition.notify_all(); return true; }