diff --git a/src/demod/DemodulatorWorkerThread.cpp b/src/demod/DemodulatorWorkerThread.cpp index a2bd35d..00010f1 100644 --- a/src/demod/DemodulatorWorkerThread.cpp +++ b/src/demod/DemodulatorWorkerThread.cpp @@ -9,6 +9,9 @@ //50 ms #define HEARTBEAT_CHECK_PERIOD_MICROS (50 * 1000) +//1s +#define MAX_BLOCKING_DURATION_MICROS (1000 * 1000) + DemodulatorWorkerThread::DemodulatorWorkerThread() : IOThread(), commandQueue(nullptr), resultQueue(nullptr), cModem(nullptr), cModemKit(nullptr) { } @@ -108,7 +111,7 @@ void DemodulatorWorkerThread::run() { result.modemName = cModemName; //VSO: blocking push - resultQueue->push(result); + resultQueue->push(result, MAX_BLOCKING_DURATION_MICROS, "resultQueue"); } } // std::cout << "Demodulator worker thread done." << std::endl; diff --git a/src/process/ScopeVisualProcessor.cpp b/src/process/ScopeVisualProcessor.cpp index a1eefac..8b204b3 100644 --- a/src/process/ScopeVisualProcessor.cpp +++ b/src/process/ScopeVisualProcessor.cpp @@ -5,6 +5,10 @@ #include #include +//2s +#define MAX_BLOCKING_DURATION_MICROS (2000 * 1000) + + ScopeVisualProcessor::ScopeVisualProcessor(): outputBuffers("ScopeVisualProcessorBuffers") { scopeEnabled.store(true); spectrumEnabled.store(true); @@ -116,7 +120,7 @@ void ScopeVisualProcessor::process() { } renderData->spectrum = false; - distribute(renderData); + distribute(renderData, MAX_BLOCKING_DURATION_MICROS, "renderData"); } if (spectrumEnabled) { @@ -212,7 +216,7 @@ void ScopeVisualProcessor::process() { renderData->fft_size = fftSize/2; renderData->spectrum = true; - distribute(renderData); + distribute(renderData, MAX_BLOCKING_DURATION_MICROS, "renderData"); } } //end if try_pop() } diff --git a/src/process/SpectrumVisualProcessor.cpp b/src/process/SpectrumVisualProcessor.cpp index de20052..bc29899 100644 --- a/src/process/SpectrumVisualProcessor.cpp +++ b/src/process/SpectrumVisualProcessor.cpp @@ -7,6 +7,10 @@ //50 ms #define HEARTBEAT_CHECK_PERIOD_MICROS (50 * 1000) +//2s +#define MAX_BLOCKING_DURATION_MICROS (2000 * 1000) + + SpectrumVisualProcessor::SpectrumVisualProcessor() : outputBuffers("SpectrumVisualProcessorBuffers") { lastInputBandwidth = 0; lastBandwidth = 0; @@ -592,7 +596,7 @@ void SpectrumVisualProcessor::process() { output->centerFreq = centerFreq; output->bandwidth = bandwidth; - distribute(output); + distribute(output, MAX_BLOCKING_DURATION_MICROS, "output"); } } diff --git a/src/sdr/SDRPostThread.cpp b/src/sdr/SDRPostThread.cpp index a332bcb..7ad2b35 100644 --- a/src/sdr/SDRPostThread.cpp +++ b/src/sdr/SDRPostThread.cpp @@ -12,6 +12,9 @@ //50 ms #define HEARTBEAT_CHECK_PERIOD_MICROS (50 * 1000) +//1s +#define MAX_BLOCKING_DURATION_MICROS (1000 * 1000) + SDRPostThread::SDRPostThread() : IOThread(), buffers("SDRPostThreadBuffers"), visualDataBuffers("SDRPostThreadVisualDataBuffers"), frequency(0) { iqDataInQueue = NULL; iqDataOutQueue = NULL; @@ -203,10 +206,6 @@ void SDRPostThread::run() { } } - if (data_in) { - //nothing - } - bool doUpdate = false; for (size_t j = 0; j < nRunDemods; j++) { DemodulatorInstance *demod = runDemods[j]; @@ -296,22 +295,22 @@ void SDRPostThread::runSingleCH(SDRThreadIQData *data_in) { if (doDemodVisOut) { //VSO: blocking push - iqActiveDemodVisualQueue->push(demodDataOut); + iqActiveDemodVisualQueue->push(demodDataOut, MAX_BLOCKING_DURATION_MICROS, "runSingleCH() iqActiveDemodVisualQueue"); } if (doIQDataOut) { //VSO: blocking push - iqDataOutQueue->push(demodDataOut); + iqDataOutQueue->push(demodDataOut, MAX_BLOCKING_DURATION_MICROS,"runSingleCH() iqDataOutQueue"); } if (doVisOut) { //VSO: blocking push - iqVisualQueue->push(demodDataOut); + iqVisualQueue->push(demodDataOut, MAX_BLOCKING_DURATION_MICROS, "runSingleCH() iqVisualQueue"); } for (size_t i = 0; i < nRunDemods; i++) { //VSO: blocking push - runDemods[i]->getIQInputDataPipe()->push(demodDataOut); + runDemods[i]->getIQInputDataPipe()->push(demodDataOut, MAX_BLOCKING_DURATION_MICROS, "runSingleCH() runDemods[i]->getIQInputDataPipe()"); } } } @@ -334,7 +333,7 @@ void SDRPostThread::runPFBCH(SDRThreadIQData *data_in) { dataOut.resize(outSize); } - if (iqDataOutQueue != NULL && !iqDataOutQueue->full()) { + if (iqDataOutQueue != nullptr && !iqDataOutQueue->full()) { DemodulatorThreadIQDataPtr iqDataOut = visualDataBuffers.getBuffer(); bool doVis = false; @@ -348,11 +347,11 @@ void SDRPostThread::runPFBCH(SDRThreadIQData *data_in) { iqDataOut->data.assign(data_in->data.begin(), data_in->data.begin() + dataSize); //VSO: blocking push - iqDataOutQueue->push(iqDataOut); + iqDataOutQueue->push(iqDataOut, MAX_BLOCKING_DURATION_MICROS, "runPFBCH() iqDataOutQueue"); if (doVis) { //VSO: blocking push - iqVisualQueue->push(iqDataOut); + iqVisualQueue->push(iqDataOut, MAX_BLOCKING_DURATION_MICROS, "runPFBCH() iqVisualQueue"); } } @@ -448,14 +447,14 @@ void SDRPostThread::runPFBCH(SDRThreadIQData *data_in) { if (doDemodVis) { //VSO: blocking push - iqActiveDemodVisualQueue->push(demodDataOut); + iqActiveDemodVisualQueue->push(demodDataOut, MAX_BLOCKING_DURATION_MICROS, "runPFBCH() iqActiveDemodVisualQueue"); } for (size_t j = 0; j < nRunDemods; j++) { if (demodChannel[j] == i) { DemodulatorInstance *demod = runDemods[j]; //VSO: blocking push - demod->getIQInputDataPipe()->push(demodDataOut); + demod->getIQInputDataPipe()->push(demodDataOut, MAX_BLOCKING_DURATION_MICROS, "runPFBCH() demod->getIQInputDataPipe()"); } } } diff --git a/src/sdr/SoapySDRThread.cpp b/src/sdr/SoapySDRThread.cpp index f361562..ece5e94 100644 --- a/src/sdr/SoapySDRThread.cpp +++ b/src/sdr/SoapySDRThread.cpp @@ -357,6 +357,7 @@ int SDRThread::readStream(SDRThreadIQDataQueuePtr iqDataOutQueue) { //The rest of the system saturates, //finally the push didn't suceeded. + readStreamCode = -32; std::cout << "SDRThread::readStream(): 3.2 iqDataOutQueue output queue is full, discard processing of the batch..." << std::endl; //saturation, let a chance to the other threads to consume the existing samples @@ -364,6 +365,7 @@ int SDRThread::readStream(SDRThreadIQDataQueuePtr iqDataOutQueue) { } } else { + readStreamCode = -31; std::cout << "SDRThread::readStream(): 3.1 iqDataOutQueue output queue is full, discard processing of the batch..." << std::endl; //saturation, let a chance to the other threads to consume the existing samples std::this_thread::yield(); @@ -374,9 +376,7 @@ int SDRThread::readStream(SDRThreadIQDataQueuePtr iqDataOutQueue) { void SDRThread::readLoop() { - - #define STREAM_READ_WATCHDOG_S (2) - + SDRThreadIQDataQueuePtr iqDataOutQueue = std::static_pointer_cast( getOutputQueue("IQDataOutput")); if (iqDataOutQueue == nullptr) { @@ -384,34 +384,13 @@ void SDRThread::readLoop() { } updateGains(); - - auto streamWatchDog = std::chrono::steady_clock::now(); - + while (!stopping.load()) { updateSettings(); - if (readStream(iqDataOutQueue) > 0) { - // record the date of the last good read. - streamWatchDog = std::chrono::steady_clock::now(); - } + readStream(iqDataOutQueue); - auto now = std::chrono::steady_clock::now(); - - //check watchdog value: if the date is too old, deinit end init the device. - std::chrono::duration diff = now - streamWatchDog; - - if (diff.count() > STREAM_READ_WATCHDOG_S) { - - std::cout << "SDRThread::readStream(): Restarting stream after too many read erros..." << std::endl << std::flush; - - deinit(); - init(); - - streamWatchDog = std::chrono::steady_clock::now(); - - std::cout << "SDRThread::readStream(): stream restarted." << std::endl << std::flush; - } } //End while iqDataOutQueue->flush();