From 5fc5e4269cef01fac73397fdfa53a0753fdf3a1d Mon Sep 17 00:00:00 2001 From: "Charles J. Cliffe" Date: Sun, 30 Nov 2014 23:33:55 -0500 Subject: [PATCH 1/3] Demodulator worker thread test --- CMakeLists.txt | 2 + src/demod/DemodulatorThread.cpp | 34 +++++++-- src/demod/DemodulatorThread.h | 12 +++- src/demod/DemodulatorWorkerThread.cpp | 78 ++++++++++++++++++++ src/demod/DemodulatorWorkerThread.h | 100 ++++++++++++++++++++++++++ src/sdr/SDRThread.h | 2 +- src/visual/WaterfallCanvas.cpp | 6 +- 7 files changed, 222 insertions(+), 12 deletions(-) create mode 100644 src/demod/DemodulatorWorkerThread.cpp create mode 100644 src/demod/DemodulatorWorkerThread.h diff --git a/CMakeLists.txt b/CMakeLists.txt index f2390be..b20606b 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -92,6 +92,7 @@ SET (cubicsdr_sources src/sdr/SDRThread.cpp src/sdr/SDRPostThread.cpp src/demod/DemodulatorThread.cpp + src/demod/DemodulatorWorkerThread.cpp src/demod/DemodulatorMgr.cpp src/audio/AudioThread.cpp src/util/Gradient.cpp @@ -113,6 +114,7 @@ SET (cubicsdr_headers src/sdr/SDRThread.h src/sdr/SDRPostThread.h src/demod/DemodulatorThread.h + src/demod/DemodulatorWorkerThread.h src/demod/DemodulatorMgr.h src/audio/AudioThread.h src/util/Gradient.h diff --git a/src/demod/DemodulatorThread.cpp b/src/demod/DemodulatorThread.cpp index 7588521..e1d1160 100644 --- a/src/demod/DemodulatorThread.cpp +++ b/src/demod/DemodulatorThread.cpp @@ -4,7 +4,7 @@ DemodulatorThread::DemodulatorThread(DemodulatorThreadInputQueue* pQueue) : inputQueue(pQueue), visOutQueue(NULL), terminated(false), initialized(false), audio_resampler(NULL), resample_ratio(1), audio_resample_ratio( - 1), resampler(NULL), commandQueue(NULL), fir_filter(NULL) { + 1), resampler(NULL), commandQueue(NULL), fir_filter(NULL), audioInputQueue(NULL) { float kf = 0.5; // modulation factor fdem = freqdem_create(kf); @@ -13,6 +13,11 @@ DemodulatorThread::DemodulatorThread(DemodulatorThreadInputQueue* pQueue) : nco_shift = nco_crcf_create(LIQUID_VCO); shift_freq = 0; + workerQueue = new DemodulatorThreadWorkerCommandQueue; + workerResults = new DemodulatorThreadWorkerResultQueue; + workerThread = new DemodulatorWorkerThread(workerQueue,workerResults); + + t_Worker = new std::thread(&DemodulatorWorkerThread::threadMain,workerThread); } void DemodulatorThread::initialize() { @@ -66,6 +71,9 @@ void DemodulatorThread::initialize() { } DemodulatorThread::~DemodulatorThread() { + delete workerThread; + delete workerQueue; + delete workerResults; } void DemodulatorThread::threadMain() { @@ -79,29 +87,40 @@ void DemodulatorThread::threadMain() { DemodulatorThreadIQData inp; inputQueue->pop(inp); + bool bandwidthChanged = false; + DemodulatorThreadParameters bandwidthParams = params; + if (!commandQueue->empty()) { bool paramsChanged = false; while (!commandQueue->empty()) { DemodulatorThreadCommand command; commandQueue->pop(command); switch (command.cmd) { - case DemodulatorThreadCommand::SDR_THREAD_CMD_SET_BANDWIDTH: + case DemodulatorThreadCommand::DEMOD_THREAD_CMD_SET_BANDWIDTH: if (command.int_value < 3000) { command.int_value = 3000; } if (command.int_value > SRATE) { command.int_value = SRATE; } - params.bandwidth = command.int_value; - paramsChanged = true; + bandwidthParams.bandwidth = command.int_value; + bandwidthChanged = true; break; - case DemodulatorThreadCommand::SDR_THREAD_CMD_SET_FREQUENCY: + case DemodulatorThreadCommand::DEMOD_THREAD_CMD_SET_FREQUENCY: params.frequency = command.int_value; break; } } - if (paramsChanged) { + if (bandwidthChanged) { +// DemodulatorWorkerThreadCommand command(DemodulatorWorkerThreadCommand::DEMOD_WORKER_THREAD_CMD_BUILD_FILTERS); +// command.audioSampleRate = bandwidthParams.audioSampleRate; +// command.bandwidth = bandwidthParams.bandwidth; +// command.frequency = bandwidthParams.frequency; +// command.inputRate = bandwidthParams.inputRate; +// +// workerQueue->push(command); + initialize(); while (!inputQueue->empty()) { // catch up inputQueue->pop(inp); @@ -206,4 +225,7 @@ void DemodulatorThread::terminate() { terminated = true; DemodulatorThreadIQData inp; // push dummy to nudge queue inputQueue->push(inp); + + workerThread->terminate(); + t_Worker->join(); } diff --git a/src/demod/DemodulatorThread.h b/src/demod/DemodulatorThread.h index 9198af4..3fefd13 100644 --- a/src/demod/DemodulatorThread.h +++ b/src/demod/DemodulatorThread.h @@ -14,6 +14,7 @@ #include "AudioThread.h" #include "ThreadQueue.h" #include "CubicSDRDefs.h" +#include "DemodulatorWorkerThread.h" enum DemodulatorType { DEMOD_TYPE_NULL, DEMOD_TYPE_AM, DEMOD_TYPE_FM, DEMOD_TYPE_LSB, DEMOD_TYPE_USB @@ -22,11 +23,11 @@ enum DemodulatorType { class DemodulatorThreadCommand { public: enum DemodulatorThreadCommandEnum { - SDR_THREAD_CMD_NULL, SDR_THREAD_CMD_SET_BANDWIDTH, SDR_THREAD_CMD_SET_FREQUENCY + DEMOD_THREAD_CMD_NULL, DEMOD_THREAD_CMD_SET_BANDWIDTH, DEMOD_THREAD_CMD_SET_FREQUENCY }; DemodulatorThreadCommand() : - cmd(cmd), int_value(SDR_THREAD_CMD_NULL) { + cmd(DEMOD_THREAD_CMD_NULL), int_value(0) { } @@ -106,6 +107,7 @@ typedef ThreadQueue DemodulatorThreadInputQueue; typedef ThreadQueue DemodulatorThreadOutputQueue; typedef ThreadQueue DemodulatorThreadCommandQueue; + class DemodulatorThread { public: @@ -158,4 +160,10 @@ protected: std::atomic terminated; std::atomic initialized; + + DemodulatorWorkerThread *workerThread; + std::thread *t_Worker; + + DemodulatorThreadWorkerCommandQueue *workerQueue; + DemodulatorThreadWorkerResultQueue *workerResults; }; diff --git a/src/demod/DemodulatorWorkerThread.cpp b/src/demod/DemodulatorWorkerThread.cpp new file mode 100644 index 0000000..84c44d6 --- /dev/null +++ b/src/demod/DemodulatorWorkerThread.cpp @@ -0,0 +1,78 @@ +#include "DemodulatorWorkerThread.h" +#include "CubicSDRDefs.h" +#include + +DemodulatorWorkerThread::DemodulatorWorkerThread(DemodulatorThreadWorkerCommandQueue* in, DemodulatorThreadWorkerResultQueue* out) : + terminated(false), commandQueue(in), resultQueue(out) { + +} + +DemodulatorWorkerThread::~DemodulatorWorkerThread() { +} + +void DemodulatorWorkerThread::threadMain() { + + std::cout << "Demodulator worker thread started.." << std::endl; + + while (!terminated) { + bool filterChanged = false; + DemodulatorWorkerThreadCommand filterCommand; + DemodulatorWorkerThreadCommand command; + DemodulatorWorkerThreadResult result; + + bool done = false; + while (!done) { + commandQueue->pop(command); + switch (command.cmd) { + case DemodulatorWorkerThreadCommand::DEMOD_WORKER_THREAD_CMD_BUILD_FILTERS: + filterChanged = true; + filterCommand = command; + break; + } + done = commandQueue->empty(); + } + + if (filterChanged) { + result.cmd = DemodulatorWorkerThreadResult::DEMOD_WORKER_THREAD_RESULT_FILTERS; + + result.resample_ratio = (float) (filterCommand.bandwidth) / (float) filterCommand.inputRate; + result.audio_resample_ratio = (float) (filterCommand.audioSampleRate) / (float) filterCommand.bandwidth; + + float fc = 0.5 * ((double) filterCommand.bandwidth / (double) filterCommand.inputRate); // filter cutoff frequency + + if (fc <= 0) { + fc = 0; + } + + if (fc >= 0.5) { + fc = 0.5; + } + + float ft = 0.05f; // filter transition + float As = 60.0f; // stop-band attenuation [dB] + float mu = 0.0f; // fractional timing offset + + // estimate required filter length and generate filter + unsigned int h_len = estimate_req_filter_len(ft, As); + float h[h_len]; + liquid_firdes_kaiser(h_len, fc, As, mu, h); + + result.fir_filter = firfilt_crcf_create(h, h_len); + + result.resampler = msresamp_crcf_create(result.resample_ratio, As); + result.audio_resampler = msresamp_crcf_create(result.audio_resample_ratio, As); + // msresamp_crcf_print(audio_resampler); + + resultQueue->push(result); + } + + } + + std::cout << "Demodulator worker thread done." << std::endl; +} + +void DemodulatorWorkerThread::terminate() { + terminated = true; + DemodulatorWorkerThreadCommand inp; // push dummy to nudge queue + commandQueue->push(inp); +} diff --git a/src/demod/DemodulatorWorkerThread.h b/src/demod/DemodulatorWorkerThread.h new file mode 100644 index 0000000..9911972 --- /dev/null +++ b/src/demod/DemodulatorWorkerThread.h @@ -0,0 +1,100 @@ +#pragma once + +#include +#include +#include "wx/wxprec.h" + +#ifndef WX_PRECOMP +#include "wx/wx.h" +#endif + +#include "wx/thread.h" + +#include "liquid/liquid.h" +#include "AudioThread.h" +#include "ThreadQueue.h" +#include "CubicSDRDefs.h" + +class DemodulatorWorkerThreadResult { +public: + enum DemodulatorThreadResultEnum { + DEMOD_WORKER_THREAD_RESULT_NULL, DEMOD_WORKER_THREAD_RESULT_FILTERS + }; + + DemodulatorWorkerThreadResult() : + cmd(DEMOD_WORKER_THREAD_RESULT_NULL), audioSampleRate(0), bandwidth(0), inputRate(0), fir_filter(NULL), resampler(NULL), resample_ratio( + 0), audio_resampler(NULL), audio_resample_ratio(0) { + + } + + DemodulatorWorkerThreadResult(DemodulatorThreadResultEnum cmd) : + cmd(cmd), audioSampleRate(0), bandwidth(0), inputRate(0), fir_filter(NULL), resampler(NULL), resample_ratio(0), audio_resampler(NULL), audio_resample_ratio( + 0) { + + } + + firfilt_crcf fir_filter; + msresamp_crcf resampler; + float resample_ratio; + msresamp_crcf audio_resampler; + float audio_resample_ratio; + + unsigned int inputRate; + unsigned int bandwidth; + unsigned int audioSampleRate; + + DemodulatorThreadResultEnum cmd; +}; + +class DemodulatorWorkerThreadCommand { +public: + enum DemodulatorThreadCommandEnum { + DEMOD_WORKER_THREAD_CMD_NULL, DEMOD_WORKER_THREAD_CMD_BUILD_FILTERS + }; + + DemodulatorWorkerThreadCommand() : + cmd(DEMOD_WORKER_THREAD_CMD_NULL), frequency(0), inputRate(0), bandwidth(0), audioSampleRate(0) { + + } + + DemodulatorWorkerThreadCommand(DemodulatorThreadCommandEnum cmd) : + cmd(cmd), frequency(0), inputRate(0), bandwidth(0), audioSampleRate(0) { + + } + + unsigned int frequency; + unsigned int inputRate; + unsigned int bandwidth; + unsigned int audioSampleRate; + + DemodulatorThreadCommandEnum cmd; +}; + +typedef ThreadQueue DemodulatorThreadWorkerCommandQueue; +typedef ThreadQueue DemodulatorThreadWorkerResultQueue; + +class DemodulatorWorkerThread { +public: + + DemodulatorWorkerThread(DemodulatorThreadWorkerCommandQueue* in, DemodulatorThreadWorkerResultQueue* out); + ~DemodulatorWorkerThread(); + + void threadMain(); + + void setCommandQueue(DemodulatorThreadWorkerCommandQueue *tQueue) { + commandQueue = tQueue; + } + + void setResultQueue(DemodulatorThreadWorkerResultQueue *tQueue) { + resultQueue = tQueue; + } + + void terminate(); + +protected: + + DemodulatorThreadWorkerCommandQueue *commandQueue; + DemodulatorThreadWorkerResultQueue *resultQueue; + + std::atomic terminated; +}; diff --git a/src/sdr/SDRThread.h b/src/sdr/SDRThread.h index bdec3cf..67e4b16 100644 --- a/src/sdr/SDRThread.h +++ b/src/sdr/SDRThread.h @@ -21,7 +21,7 @@ public: SDR_THREAD_CMD_TUNE }; - SDRThreadCommand() : cmd(cmd), int_value(SDR_THREAD_CMD_NULL) { + SDRThreadCommand() : cmd(SDR_THREAD_CMD_NULL), int_value(0) { } diff --git a/src/visual/WaterfallCanvas.cpp b/src/visual/WaterfallCanvas.cpp index bb4dc92..942da14 100644 --- a/src/visual/WaterfallCanvas.cpp +++ b/src/visual/WaterfallCanvas.cpp @@ -221,7 +221,7 @@ void WaterfallCanvas::mouseMoved(wxMouseEvent& event) { } DemodulatorThreadCommand command; - command.cmd = DemodulatorThreadCommand::SDR_THREAD_CMD_SET_BANDWIDTH; + command.cmd = DemodulatorThreadCommand::DEMOD_THREAD_CMD_SET_BANDWIDTH; activeDemodulatorBandwidth = activeDemodulatorBandwidth + bwDiff; if (activeDemodulatorBandwidth < 1000) { activeDemodulatorBandwidth = 1000; @@ -242,7 +242,7 @@ void WaterfallCanvas::mouseMoved(wxMouseEvent& event) { } DemodulatorThreadCommand command; - command.cmd = DemodulatorThreadCommand::SDR_THREAD_CMD_SET_FREQUENCY; + command.cmd = DemodulatorThreadCommand::DEMOD_THREAD_CMD_SET_FREQUENCY; activeDemodulatorFrequency = activeDemodulatorFrequency + bwDiff; command.int_value = activeDemodulatorFrequency; @@ -327,7 +327,7 @@ void WaterfallCanvas::mouseReleased(wxMouseEvent& event) { int freq = center_freq - (int) (0.5 * (float) SRATE) + (int) ((float) pos * (float) SRATE); DemodulatorThreadCommand command; - command.cmd = DemodulatorThreadCommand::SDR_THREAD_CMD_SET_FREQUENCY; + command.cmd = DemodulatorThreadCommand::DEMOD_THREAD_CMD_SET_FREQUENCY; command.int_value = freq; demod->getCommandQueue()->push(command); From 38b1393c44e246e5d83c1b3049debb1dc62865a2 Mon Sep 17 00:00:00 2001 From: "Charles J. Cliffe" Date: Mon, 1 Dec 2014 01:14:32 -0500 Subject: [PATCH 2/3] Worker results.. --- src/demod/DemodulatorThread.cpp | 50 ++++++++++++++++++++++++++------- 1 file changed, 40 insertions(+), 10 deletions(-) diff --git a/src/demod/DemodulatorThread.cpp b/src/demod/DemodulatorThread.cpp index e1d1160..1fd4d92 100644 --- a/src/demod/DemodulatorThread.cpp +++ b/src/demod/DemodulatorThread.cpp @@ -113,21 +113,51 @@ void DemodulatorThread::threadMain() { } if (bandwidthChanged) { -// DemodulatorWorkerThreadCommand command(DemodulatorWorkerThreadCommand::DEMOD_WORKER_THREAD_CMD_BUILD_FILTERS); -// command.audioSampleRate = bandwidthParams.audioSampleRate; -// command.bandwidth = bandwidthParams.bandwidth; -// command.frequency = bandwidthParams.frequency; -// command.inputRate = bandwidthParams.inputRate; + std::cout << "Requesting new filters from worker.." << std::endl; + DemodulatorWorkerThreadCommand command(DemodulatorWorkerThreadCommand::DEMOD_WORKER_THREAD_CMD_BUILD_FILTERS); + command.audioSampleRate = bandwidthParams.audioSampleRate; + command.bandwidth = bandwidthParams.bandwidth; + command.frequency = bandwidthParams.frequency; + command.inputRate = bandwidthParams.inputRate; // -// workerQueue->push(command); + workerQueue->push(command); - initialize(); - while (!inputQueue->empty()) { // catch up - inputQueue->pop(inp); - } +// params = bandwidthParams; +// initialize(); +// while (!inputQueue->empty()) { // catch up +// inputQueue->pop(inp); +// } } } + if (!workerResults->empty()) { + while (!workerResults->empty()) { + DemodulatorWorkerThreadResult result; + workerResults->pop(result); + + switch (result.cmd) { + case DemodulatorWorkerThreadResult::DEMOD_WORKER_THREAD_RESULT_FILTERS: + std::cout << "New filters arrived from worker.." << std::endl; + firfilt_crcf_destroy(fir_filter); + msresamp_crcf_destroy(resampler); + msresamp_crcf_destroy(audio_resampler); + + fir_filter = result.fir_filter; + resampler = result.resampler; + audio_resampler = result.audio_resampler; + + resample_ratio = result.resample_ratio; + audio_resample_ratio = result.audio_resample_ratio; + + params.audioSampleRate = result.audioSampleRate; + params.bandwidth = result.bandwidth; + params.inputRate = result.inputRate; + + break; + } + } + } + if (!initialized) { continue; } From 746eca8d3e6b128e2c1a518f50102cf77870ae2f Mon Sep 17 00:00:00 2001 From: "Charles J. Cliffe" Date: Mon, 1 Dec 2014 02:10:36 -0500 Subject: [PATCH 3/3] Demod worker now handles filter reconstruction Reduces audio jittering and only generates the last queued filter to save redundant regeneration during dragging. --- src/demod/DemodulatorThread.cpp | 68 ++++++++++++--------------- src/demod/DemodulatorWorkerThread.cpp | 9 ++-- 2 files changed, 35 insertions(+), 42 deletions(-) diff --git a/src/demod/DemodulatorThread.cpp b/src/demod/DemodulatorThread.cpp index 1fd4d92..42aa7e1 100644 --- a/src/demod/DemodulatorThread.cpp +++ b/src/demod/DemodulatorThread.cpp @@ -15,9 +15,9 @@ DemodulatorThread::DemodulatorThread(DemodulatorThreadInputQueue* pQueue) : workerQueue = new DemodulatorThreadWorkerCommandQueue; workerResults = new DemodulatorThreadWorkerResultQueue; - workerThread = new DemodulatorWorkerThread(workerQueue,workerResults); + workerThread = new DemodulatorWorkerThread(workerQueue, workerResults); - t_Worker = new std::thread(&DemodulatorWorkerThread::threadMain,workerThread); + t_Worker = new std::thread(&DemodulatorWorkerThread::threadMain, workerThread); } void DemodulatorThread::initialize() { @@ -113,51 +113,16 @@ void DemodulatorThread::threadMain() { } if (bandwidthChanged) { - std::cout << "Requesting new filters from worker.." << std::endl; DemodulatorWorkerThreadCommand command(DemodulatorWorkerThreadCommand::DEMOD_WORKER_THREAD_CMD_BUILD_FILTERS); command.audioSampleRate = bandwidthParams.audioSampleRate; command.bandwidth = bandwidthParams.bandwidth; command.frequency = bandwidthParams.frequency; command.inputRate = bandwidthParams.inputRate; -// + workerQueue->push(command); - -// params = bandwidthParams; -// initialize(); -// while (!inputQueue->empty()) { // catch up -// inputQueue->pop(inp); -// } } } - if (!workerResults->empty()) { - while (!workerResults->empty()) { - DemodulatorWorkerThreadResult result; - workerResults->pop(result); - - switch (result.cmd) { - case DemodulatorWorkerThreadResult::DEMOD_WORKER_THREAD_RESULT_FILTERS: - std::cout << "New filters arrived from worker.." << std::endl; - firfilt_crcf_destroy(fir_filter); - msresamp_crcf_destroy(resampler); - msresamp_crcf_destroy(audio_resampler); - - fir_filter = result.fir_filter; - resampler = result.resampler; - audio_resampler = result.audio_resampler; - - resample_ratio = result.resample_ratio; - audio_resample_ratio = result.audio_resample_ratio; - - params.audioSampleRate = result.audioSampleRate; - params.bandwidth = result.bandwidth; - params.inputRate = result.inputRate; - - break; - } - } - } - if (!initialized) { continue; } @@ -246,6 +211,33 @@ void DemodulatorThread::threadMain() { visOutQueue->push(ati); } } + + if (!workerResults->empty()) { + while (!workerResults->empty()) { + DemodulatorWorkerThreadResult result; + workerResults->pop(result); + + switch (result.cmd) { + case DemodulatorWorkerThreadResult::DEMOD_WORKER_THREAD_RESULT_FILTERS: + firfilt_crcf_destroy(fir_filter); + msresamp_crcf_destroy(resampler); + msresamp_crcf_destroy(audio_resampler); + + fir_filter = result.fir_filter; + resampler = result.resampler; + audio_resampler = result.audio_resampler; + + resample_ratio = result.resample_ratio; + audio_resample_ratio = result.audio_resample_ratio; + + params.audioSampleRate = result.audioSampleRate; + params.bandwidth = result.bandwidth; + params.inputRate = result.inputRate; + + break; + } + } + } } std::cout << "Demodulator thread done." << std::endl; diff --git a/src/demod/DemodulatorWorkerThread.cpp b/src/demod/DemodulatorWorkerThread.cpp index 84c44d6..f002737 100644 --- a/src/demod/DemodulatorWorkerThread.cpp +++ b/src/demod/DemodulatorWorkerThread.cpp @@ -18,7 +18,6 @@ void DemodulatorWorkerThread::threadMain() { bool filterChanged = false; DemodulatorWorkerThreadCommand filterCommand; DemodulatorWorkerThreadCommand command; - DemodulatorWorkerThreadResult result; bool done = false; while (!done) { @@ -33,7 +32,7 @@ void DemodulatorWorkerThread::threadMain() { } if (filterChanged) { - result.cmd = DemodulatorWorkerThreadResult::DEMOD_WORKER_THREAD_RESULT_FILTERS; + DemodulatorWorkerThreadResult result(DemodulatorWorkerThreadResult::DEMOD_WORKER_THREAD_RESULT_FILTERS); result.resample_ratio = (float) (filterCommand.bandwidth) / (float) filterCommand.inputRate; result.audio_resample_ratio = (float) (filterCommand.audioSampleRate) / (float) filterCommand.bandwidth; @@ -58,10 +57,12 @@ void DemodulatorWorkerThread::threadMain() { liquid_firdes_kaiser(h_len, fc, As, mu, h); result.fir_filter = firfilt_crcf_create(h, h_len); - result.resampler = msresamp_crcf_create(result.resample_ratio, As); result.audio_resampler = msresamp_crcf_create(result.audio_resample_ratio, As); - // msresamp_crcf_print(audio_resampler); + + result.audioSampleRate = filterCommand.audioSampleRate; + result.bandwidth = filterCommand.bandwidth; + result.inputRate = filterCommand.inputRate; resultQueue->push(result); }