diff --git a/src/demod/DemodulatorThread.h b/src/demod/DemodulatorThread.h index c713596..bc4e2b0 100644 --- a/src/demod/DemodulatorThread.h +++ b/src/demod/DemodulatorThread.h @@ -25,8 +25,8 @@ public: void onBindOutput(std::string name, ThreadQueueBasePtr threadQueue); - void run(); - void terminate(); + virtual void run(); + virtual void terminate(); void setMuted(bool state); bool isMuted(); diff --git a/src/process/FFTVisualDataThread.cpp b/src/process/FFTVisualDataThread.cpp index e302c1b..0dd398e 100644 --- a/src/process/FFTVisualDataThread.cpp +++ b/src/process/FFTVisualDataThread.cpp @@ -84,3 +84,9 @@ void FFTVisualDataThread::run() { // std::cout << "FFT visual data thread done." << std::endl; } +void FFTVisualDataThread::terminate() { + IOThread::terminate(); + fftDistrib.flushQueues(); + wproc.flushQueues(); +} + diff --git a/src/process/FFTVisualDataThread.h b/src/process/FFTVisualDataThread.h index 70b31a9..549950b 100644 --- a/src/process/FFTVisualDataThread.h +++ b/src/process/FFTVisualDataThread.h @@ -17,6 +17,8 @@ public: SpectrumVisualProcessor *getProcessor(); virtual void run(); + + virtual void terminate(); protected: FFTDataDistributor fftDistrib; diff --git a/src/process/SpectrumVisualDataThread.cpp b/src/process/SpectrumVisualDataThread.cpp index 1e954a2..5841f52 100644 --- a/src/process/SpectrumVisualDataThread.cpp +++ b/src/process/SpectrumVisualDataThread.cpp @@ -28,3 +28,7 @@ void SpectrumVisualDataThread::run() { // std::cout << "Spectrum visual data thread done." << std::endl; } +void SpectrumVisualDataThread::terminate() { + IOThread::terminate(); + sproc.flushQueues(); +} \ No newline at end of file diff --git a/src/process/SpectrumVisualDataThread.h b/src/process/SpectrumVisualDataThread.h index c6efc6c..0c3eccf 100644 --- a/src/process/SpectrumVisualDataThread.h +++ b/src/process/SpectrumVisualDataThread.h @@ -13,6 +13,8 @@ public: SpectrumVisualProcessor *getProcessor(); virtual void run(); + + virtual void terminate(); protected: SpectrumVisualProcessor sproc; diff --git a/src/process/VisualProcessor.h b/src/process/VisualProcessor.h index 13e8966..6fae52a 100644 --- a/src/process/VisualProcessor.h +++ b/src/process/VisualProcessor.h @@ -82,6 +82,17 @@ public: outputs.erase(i); } } + //Flush all queues, either input or outputs clearing their accumulated messages. + //this is purposefully non-blocking call. + void flushQueues() { + //DO NOT take the busy_update, we want a never blocking op how imperfect it could be. + input->flush(); + + for (auto single_output : outputs) { + + single_output->flush(); + } + } //Call process() repeateadly until all available 'input' data is consumed. void run() { diff --git a/src/sdr/SoapySDRThread.cpp b/src/sdr/SoapySDRThread.cpp index ecd1d54..7513612 100644 --- a/src/sdr/SoapySDRThread.cpp +++ b/src/sdr/SoapySDRThread.cpp @@ -564,6 +564,15 @@ void SDRThread::run() { std::cout << "SDR thread done." << std::endl; } +void SDRThread::terminate() { + IOThread::terminate(); + + SDRThreadIQDataQueuePtr iqDataOutQueue = std::static_pointer_cast(getOutputQueue("IQDataOutput")); + + if (iqDataOutQueue != nullptr) { + iqDataOutQueue->flush(); + } +} SDRDeviceInfo *SDRThread::getDevice() { return deviceInfo.load(); diff --git a/src/sdr/SoapySDRThread.h b/src/sdr/SoapySDRThread.h index 83472ff..c72c5a9 100644 --- a/src/sdr/SoapySDRThread.h +++ b/src/sdr/SoapySDRThread.h @@ -61,6 +61,7 @@ public: enum SDRThreadState { SDR_THREAD_MESSAGE, SDR_THREAD_INITIALIZED, SDR_THREAD_FAILED}; virtual void run(); + virtual void terminate(); SDRDeviceInfo *getDevice(); void setDevice(SDRDeviceInfo *dev);