diff --git a/src/CubicSDR.cpp b/src/CubicSDR.cpp index 4e4931c..1abcaee 100644 --- a/src/CubicSDR.cpp +++ b/src/CubicSDR.cpp @@ -393,18 +393,19 @@ int CubicSDR::OnExit() { std::cout << "Terminating SDR thread.." << std::endl; sdrThread->terminate(); sdrThread->isTerminated(3000); - - if (t_SDR) { - t_SDR->join(); - delete t_SDR; - t_SDR = nullptr; - } - + std::cout << "Terminating SDR post-processing thread.." << std::endl; sdrPostThread->terminate(); + //Wait for termination for sdrPostThread second:: since it is doing + //mostly blocking push() to the other threads, they must stay alive + //so that sdrPostThread can complete a processing loop and die. + sdrPostThread->isTerminated(3000); + std::cout << "Terminating All Demodulators.." << std::endl; demodMgr.terminateAll(); + //wait for effective death of all demodulators before continuing. + demodMgr.garbageCollect(true); std::cout << "Terminating Visual Processor threads.." << std::endl; spectrumVisualThread->terminate(); @@ -413,18 +414,28 @@ int CubicSDR::OnExit() { } //Wait nicely - sdrPostThread->isTerminated(1000); spectrumVisualThread->isTerminated(1000); if (demodVisualThread) { demodVisualThread->isTerminated(1000); } - //Then join the thread themselves + //Then join the thread themselves: + if (t_SDR) { + t_SDR->join(); + } + t_PostSDR->join(); - if (t_DemodVisual) t_DemodVisual->join(); + + if (t_DemodVisual) { + t_DemodVisual->join(); + } + t_SpectrumVisual->join(); - //Now only we can delete + //Now only we can delete: + delete t_SDR; + t_SDR = nullptr; + delete sdrThread; sdrThread = nullptr; diff --git a/src/demod/DemodulatorMgr.cpp b/src/demod/DemodulatorMgr.cpp index a17bf75..2eeffb1 100644 --- a/src/demod/DemodulatorMgr.cpp +++ b/src/demod/DemodulatorMgr.cpp @@ -278,29 +278,36 @@ DemodulatorInstance *DemodulatorMgr::getLastDemodulatorWith(const std::string& t return nullptr; } -void DemodulatorMgr::garbageCollect() { +void DemodulatorMgr::garbageCollect(boolean forcedGC) { std::lock_guard < std::mutex > lock(deleted_demods_busy); - std::vector::iterator it = demods_deleted.begin(); + while (!demods_deleted.empty()) { - while (it != demods_deleted.end()) { + std::vector::iterator it = demods_deleted.begin(); + //make 1 pass over + while (it != demods_deleted.end()) { - if ((*it)->isTerminated()) { + if ((*it)->isTerminated()) { - DemodulatorInstance *deleted = (*it); + DemodulatorInstance *deleted = (*it); - std::cout << "Garbage collected demodulator instance '" << deleted->getLabel() << "'... " << std::endl << std::flush; - demods_deleted.erase(it); - delete deleted; + std::cout << "Garbage collected demodulator instance '" << deleted->getLabel() << "'... " << std::endl << std::flush; + it = demods_deleted.erase(it); + delete deleted; - //only garbage collect 1 demod at a time. - return; - } - else { - it++; - } - } //end while + //only garbage collect 1 demod at a time. + if (!forcedGC) { + return; + } + } + else { + it++; + } + } //end while + //stupid busy-wait loop + std::this_thread::sleep_for(std::chrono::milliseconds(5)); + } //end while not empty } void DemodulatorMgr::updateLastState() { diff --git a/src/demod/DemodulatorMgr.h b/src/demod/DemodulatorMgr.h index d9db719..42032dc 100644 --- a/src/demod/DemodulatorMgr.h +++ b/src/demod/DemodulatorMgr.h @@ -69,7 +69,11 @@ public: DemodulatorInstance *loadInstance(DataNode *node); //to be called periodically to cleanup removed demodulators. - void garbageCollect(); + //if forcedGC = true, the methods waits until + //all deleted demodulators are effectively GCed. + //else: (default) the method test for effective termination + //and GC one demod per call. + void garbageCollect(boolean forcedGC = false); private: diff --git a/src/demod/DemodulatorPreThread.cpp b/src/demod/DemodulatorPreThread.cpp index f7a6217..5e90785 100644 --- a/src/demod/DemodulatorPreThread.cpp +++ b/src/demod/DemodulatorPreThread.cpp @@ -282,7 +282,7 @@ void DemodulatorPreThread::run() { iqOutputQueue->flush(); - buffers.purge(); + iqInputQueue->flush(); } void DemodulatorPreThread::setDemodType(std::string demodType) { diff --git a/src/demod/DemodulatorThread.cpp b/src/demod/DemodulatorThread.cpp index 5767c08..b687d44 100644 --- a/src/demod/DemodulatorThread.cpp +++ b/src/demod/DemodulatorThread.cpp @@ -342,8 +342,6 @@ void DemodulatorThread::run() { // Purge any unused inputs, with a non-blocking pop iqInputQueue->flush(); audioOutputQueue->flush(); - - outputBuffers.purge(); // std::cout << "Demodulator thread done." << std::endl; } diff --git a/src/demod/DemodulatorWorkerThread.cpp b/src/demod/DemodulatorWorkerThread.cpp index b7e6daa..bcab93e 100644 --- a/src/demod/DemodulatorWorkerThread.cpp +++ b/src/demod/DemodulatorWorkerThread.cpp @@ -33,7 +33,7 @@ void DemodulatorWorkerThread::run() { //Beware of the subtility here, //we are waiting for the first command to show up (blocking!) //then consuming the commands until done. - while (!done) { + while (!done && !stopping) { if (!commandQueue->pop(command, HEARTBEAT_CHECK_PERIOD_MICROS)) { continue; } @@ -51,7 +51,7 @@ void DemodulatorWorkerThread::run() { break; } done = commandQueue->empty(); - } + } //end while done. if ((makeDemod || filterChanged) && !stopping) { DemodulatorWorkerThreadResult result(DemodulatorWorkerThreadResult::DEMOD_WORKER_THREAD_RESULT_FILTERS); diff --git a/src/process/FFTVisualDataThread.cpp b/src/process/FFTVisualDataThread.cpp index c0c820e..cfd8c08 100644 --- a/src/process/FFTVisualDataThread.cpp +++ b/src/process/FFTVisualDataThread.cpp @@ -76,6 +76,9 @@ void FFTVisualDataThread::run() { wproc.run(); } } + + pipeIQDataIn->flush(); + pipeFFTDataOut->flush(); // std::cout << "FFT visual data thread done." << std::endl; } diff --git a/src/sdr/SDRPostThread.cpp b/src/sdr/SDRPostThread.cpp index 92283ca..ad561d0 100644 --- a/src/sdr/SDRPostThread.cpp +++ b/src/sdr/SDRPostThread.cpp @@ -225,8 +225,9 @@ void SDRPostThread::run() { iqVisualQueue->flush(); } - // buffers.purge(); - // visualDataBuffers.purge(); + iqDataInQueue->flush(); + iqDataOutQueue->flush(); + iqActiveDemodVisualQueue->flush(); // std::cout << "SDR post-processing thread done." << std::endl; } diff --git a/src/sdr/SoapySDRThread.cpp b/src/sdr/SoapySDRThread.cpp index a24fa98..23d1488 100644 --- a/src/sdr/SoapySDRThread.cpp +++ b/src/sdr/SoapySDRThread.cpp @@ -377,8 +377,7 @@ void SDRThread::readLoop() { updateSettings(); readStream(iqDataOutQueue); } - - buffers.purge(); + iqDataOutQueue->flush(); } void SDRThread::updateGains() {