AudioThread & demod worker IOThread queues, cleanup

This commit is contained in:
Charles J. Cliffe 2015-07-30 19:30:46 -04:00
parent 5bbcf7aa11
commit 10e35002f1
12 changed files with 56 additions and 53 deletions

View File

@ -51,23 +51,25 @@ bool CubicSDR::OnInit() {
ppm = 0; ppm = 0;
directSamplingMode = 0; directSamplingMode = 0;
// Visual Data
iqVisualQueue = new DemodulatorThreadInputQueue();
iqVisualQueue->set_max_num_items(1);
audioVisualQueue = new DemodulatorThreadOutputQueue(); audioVisualQueue = new DemodulatorThreadOutputQueue();
audioVisualQueue->set_max_num_items(1); audioVisualQueue->set_max_num_items(1);
// I/Q Data
iqPostDataQueue = new SDRThreadIQDataQueue;
threadCmdQueueSDR = new SDRThreadCommandQueue(); threadCmdQueueSDR = new SDRThreadCommandQueue();
sdrThread = new SDRThread(); sdrThread = new SDRThread();
sdrThread->setInputQueue("SDRCommandQueue",threadCmdQueueSDR);
sdrThread->setOutputQueue("IQDataOutput",iqPostDataQueue);
sdrPostThread = new SDRPostThread(); sdrPostThread = new SDRPostThread();
sdrPostThread->setNumVisSamples(16384 * 2); sdrPostThread->setNumVisSamples(16384 * 2);
sdrPostThread->setInputQueue("IQDataInput", iqPostDataQueue);
iqPostDataQueue = new SDRThreadIQDataQueue; sdrPostThread->setOutputQueue("IQVisualDataOut", iqVisualQueue);
iqVisualQueue = new DemodulatorThreadInputQueue;
iqVisualQueue->set_max_num_items(1);
sdrThread->setInputQueue("SDRCommandQueue",threadCmdQueueSDR);
sdrThread->setOutputQueue("IQDataOutput",iqPostDataQueue);
sdrPostThread->setIQDataInQueue(iqPostDataQueue);
sdrPostThread->setIQVisualQueue(iqVisualQueue);
std::vector<SDRDeviceInfo *>::iterator devs_i; std::vector<SDRDeviceInfo *>::iterator devs_i;

View File

@ -11,9 +11,9 @@ std::map<int, AudioThread *> AudioThread::deviceController;
std::map<int, int> AudioThread::deviceSampleRate; std::map<int, int> AudioThread::deviceSampleRate;
std::map<int, std::thread *> AudioThread::deviceThread; std::map<int, std::thread *> AudioThread::deviceThread;
AudioThread::AudioThread(AudioThreadInputQueue *inputQueue, DemodulatorThreadCommandQueue* threadQueueNotify) : IOThread(), AudioThread::AudioThread() : IOThread(),
currentInput(NULL), inputQueue(inputQueue), gain( currentInput(NULL), inputQueue(NULL), gain(
1.0), threadQueueNotify(threadQueueNotify), sampleRate(0), nBufferFrames(1024) { 1.0), threadQueueNotify(NULL), sampleRate(0), nBufferFrames(1024) {
audioQueuePtr.store(0); audioQueuePtr.store(0);
underflowCount.store(0); underflowCount.store(0);
@ -316,7 +316,7 @@ void AudioThread::setupDevice(int deviceId) {
} }
if (deviceController.find(parameters.deviceId) == deviceController.end()) { if (deviceController.find(parameters.deviceId) == deviceController.end()) {
deviceController[parameters.deviceId] = new AudioThread(NULL, NULL); deviceController[parameters.deviceId] = new AudioThread();
deviceController[parameters.deviceId]->setInitOutputDevice(parameters.deviceId, sampleRate); deviceController[parameters.deviceId]->setInitOutputDevice(parameters.deviceId, sampleRate);
deviceController[parameters.deviceId]->bindThread(this); deviceController[parameters.deviceId]->bindThread(this);
@ -377,6 +377,9 @@ void AudioThread::run() {
std::cout << "Audio thread started." << std::endl; std::cout << "Audio thread started." << std::endl;
inputQueue = (AudioThreadInputQueue *)getInputQueue("AudioDataInput");
threadQueueNotify = (DemodulatorThreadCommandQueue*)getOutputQueue("NotifyQueue");
while (!terminated) { while (!terminated) {
AudioThreadCommand command; AudioThreadCommand command;
cmdQueue.pop(command); cmdQueue.pop(command);

View File

@ -49,7 +49,6 @@ typedef ThreadQueue<AudioThreadCommand> AudioThreadCommandQueue;
class AudioThread : public IOThread { class AudioThread : public IOThread {
public: public:
AudioThreadInput *currentInput; AudioThreadInput *currentInput;
AudioThreadInputQueue *inputQueue; AudioThreadInputQueue *inputQueue;
std::atomic_uint audioQueuePtr; std::atomic_uint audioQueuePtr;
@ -59,7 +58,7 @@ public:
std::atomic_int outputDevice; std::atomic_int outputDevice;
std::atomic<float> gain; std::atomic<float> gain;
AudioThread(AudioThreadInputQueue *inputQueue, DemodulatorThreadCommandQueue* threadQueueNotify); AudioThread();
~AudioThread(); ~AudioThread();
static void enumerateDevices(std::vector<RtAudio::DeviceInfo> &devs); static void enumerateDevices(std::vector<RtAudio::DeviceInfo> &devs);

View File

@ -26,7 +26,7 @@ DemodulatorInstance::DemodulatorInstance() :
demodulatorPreThread = new DemodulatorPreThread(); demodulatorPreThread = new DemodulatorPreThread();
demodulatorPreThread->setInputQueue("IQDataInput",threadQueueDemod); demodulatorPreThread->setInputQueue("IQDataInput",threadQueueDemod);
demodulatorPreThread->setOutputQueue("IQDataOut",threadQueuePostDemod); demodulatorPreThread->setOutputQueue("IQDataOutput",threadQueuePostDemod);
demodulatorPreThread->setOutputQueue("NotifyQueue",threadQueueNotify); demodulatorPreThread->setOutputQueue("NotifyQueue",threadQueueNotify);
demodulatorPreThread->setInputQueue("CommandQueue",threadQueueCommand); demodulatorPreThread->setInputQueue("CommandQueue",threadQueueCommand);
@ -37,9 +37,12 @@ DemodulatorInstance::DemodulatorInstance() :
demodulatorThread->setInputQueue("IQDataInput",threadQueuePostDemod); demodulatorThread->setInputQueue("IQDataInput",threadQueuePostDemod);
demodulatorThread->setInputQueue("ControlQueue",threadQueueControl); demodulatorThread->setInputQueue("ControlQueue",threadQueueControl);
demodulatorThread->setOutputQueue("NotifyQueue",threadQueueNotify); demodulatorThread->setOutputQueue("NotifyQueue",threadQueueNotify);
demodulatorThread->setOutputQueue("AudioDataOut", audioInputQueue); demodulatorThread->setOutputQueue("AudioDataOutput", audioInputQueue);
audioThread = new AudioThread();
audioThread->setInputQueue("AudioDataInput", audioInputQueue);
audioThread->setOutputQueue("NotifyQueue", threadQueueNotify);
audioThread = new AudioThread(audioInputQueue, threadQueueNotify);
currentDemodType = demodulatorThread->getDemodulatorType(); currentDemodType = demodulatorThread->getDemodulatorType();
} }

View File

@ -7,7 +7,7 @@
DemodulatorMgr::DemodulatorMgr() : DemodulatorMgr::DemodulatorMgr() :
activeDemodulator(NULL), lastActiveDemodulator(NULL), activeVisualDemodulator(NULL), lastBandwidth(DEFAULT_DEMOD_BW), lastDemodType( activeDemodulator(NULL), lastActiveDemodulator(NULL), activeVisualDemodulator(NULL), lastBandwidth(DEFAULT_DEMOD_BW), lastDemodType(
DEFAULT_DEMOD_TYPE), lastGain(1.0), lastSquelch(0), lastSquelchEnabled(false), lastStereo(false) { DEFAULT_DEMOD_TYPE), lastSquelchEnabled(false), lastSquelch(0), lastGain(1.0), lastStereo(false) {
} }

View File

@ -17,7 +17,10 @@ DemodulatorPreThread::DemodulatorPreThread() : IOThread(), iqResampler(NULL), iq
workerQueue = new DemodulatorThreadWorkerCommandQueue; workerQueue = new DemodulatorThreadWorkerCommandQueue;
workerResults = new DemodulatorThreadWorkerResultQueue; workerResults = new DemodulatorThreadWorkerResultQueue;
workerThread = new DemodulatorWorkerThread(workerQueue, workerResults);
workerThread = new DemodulatorWorkerThread();
workerThread->setInputQueue("WorkerCommandQueue",workerQueue);
workerThread->setOutputQueue("WorkerResultQueue",workerResults);
} }
void DemodulatorPreThread::initialize() { void DemodulatorPreThread::initialize() {
@ -93,7 +96,7 @@ void DemodulatorPreThread::run() {
ReBuffer<DemodulatorThreadPostIQData> buffers; ReBuffer<DemodulatorThreadPostIQData> buffers;
iqInputQueue = (DemodulatorThreadInputQueue*)getInputQueue("IQDataInput"); iqInputQueue = (DemodulatorThreadInputQueue*)getInputQueue("IQDataInput");
iqOutputQueue = (DemodulatorThreadPostInputQueue*)getOutputQueue("IQDataOut"); iqOutputQueue = (DemodulatorThreadPostInputQueue*)getOutputQueue("IQDataOutput");
threadQueueNotify = (DemodulatorThreadCommandQueue*)getOutputQueue("NotifyQueue"); threadQueueNotify = (DemodulatorThreadCommandQueue*)getOutputQueue("NotifyQueue");
commandQueue = ( DemodulatorThreadCommandQueue*)getInputQueue("CommandQueue"); commandQueue = ( DemodulatorThreadCommandQueue*)getInputQueue("CommandQueue");

View File

@ -70,7 +70,7 @@ void DemodulatorThread::run() {
std::cout << "Demodulator thread started.." << std::endl; std::cout << "Demodulator thread started.." << std::endl;
iqInputQueue = (DemodulatorThreadPostInputQueue*)getInputQueue("IQDataInput"); iqInputQueue = (DemodulatorThreadPostInputQueue*)getInputQueue("IQDataInput");
audioOutputQueue = (AudioThreadInputQueue*)getOutputQueue("AudioDataOut"); audioOutputQueue = (AudioThreadInputQueue*)getOutputQueue("AudioDataOutput");
threadQueueControl = (DemodulatorThreadControlCommandQueue *)getInputQueue("ControlQueue"); threadQueueControl = (DemodulatorThreadControlCommandQueue *)getInputQueue("ControlQueue");
threadQueueNotify = (DemodulatorThreadCommandQueue*)getOutputQueue("NotifyQueue"); threadQueueNotify = (DemodulatorThreadCommandQueue*)getOutputQueue("NotifyQueue");

View File

@ -2,8 +2,8 @@
#include "CubicSDRDefs.h" #include "CubicSDRDefs.h"
#include <vector> #include <vector>
DemodulatorWorkerThread::DemodulatorWorkerThread(DemodulatorThreadWorkerCommandQueue* in, DemodulatorThreadWorkerResultQueue* out) : IOThread(), DemodulatorWorkerThread::DemodulatorWorkerThread() : IOThread(),
commandQueue(in), resultQueue(out) { commandQueue(NULL), resultQueue(NULL) {
} }
DemodulatorWorkerThread::~DemodulatorWorkerThread() { DemodulatorWorkerThread::~DemodulatorWorkerThread() {
@ -13,6 +13,9 @@ void DemodulatorWorkerThread::run() {
std::cout << "Demodulator worker thread started.." << std::endl; std::cout << "Demodulator worker thread started.." << std::endl;
commandQueue = (DemodulatorThreadWorkerCommandQueue *)getInputQueue("WorkerCommandQueue");
resultQueue = (DemodulatorThreadWorkerResultQueue *)getOutputQueue("WorkerResultQueue");
while (!terminated) { while (!terminated) {
bool filterChanged = false; bool filterChanged = false;
DemodulatorWorkerThreadCommand filterCommand; DemodulatorWorkerThreadCommand filterCommand;

View File

@ -73,7 +73,7 @@ typedef ThreadQueue<DemodulatorWorkerThreadResult> DemodulatorThreadWorkerResult
class DemodulatorWorkerThread : public IOThread { class DemodulatorWorkerThread : public IOThread {
public: public:
DemodulatorWorkerThread(DemodulatorThreadWorkerCommandQueue* in, DemodulatorThreadWorkerResultQueue* out); DemodulatorWorkerThread();
~DemodulatorWorkerThread(); ~DemodulatorWorkerThread();
void run(); void run();

View File

@ -50,16 +50,6 @@ void SDRPostThread::removeDemodulator(DemodulatorInstance *demod) {
busy_demod.unlock(); busy_demod.unlock();
} }
void SDRPostThread::setIQDataInQueue(SDRThreadIQDataQueue* iqDataQueue) {
iqDataInQueue = iqDataQueue;
}
void SDRPostThread::setIQDataOutQueue(DemodulatorThreadInputQueue* iqDataQueue) {
iqDataOutQueue = iqDataQueue;
}
void SDRPostThread::setIQVisualQueue(DemodulatorThreadInputQueue *iqVisQueue) {
iqVisualQueue = iqVisQueue;
}
void SDRPostThread::setNumVisSamples(int num_vis_samples_in) { void SDRPostThread::setNumVisSamples(int num_vis_samples_in) {
num_vis_samples = num_vis_samples_in; num_vis_samples = num_vis_samples_in;
} }
@ -90,6 +80,10 @@ void SDRPostThread::run() {
std::cout << "SDR post-processing thread started.." << std::endl; std::cout << "SDR post-processing thread started.." << std::endl;
iqDataInQueue = (SDRThreadIQDataQueue*)getInputQueue("IQDataInput");
iqDataOutQueue = (DemodulatorThreadInputQueue*)getOutputQueue("IQDataOutput");
iqVisualQueue = (DemodulatorThreadInputQueue*)getOutputQueue("IQVisualDataOut");
ReBuffer<DemodulatorThreadIQData> buffers; ReBuffer<DemodulatorThreadIQData> buffers;
std::vector<liquid_float_complex> fpData; std::vector<liquid_float_complex> fpData;
std::vector<liquid_float_complex> dataOut; std::vector<liquid_float_complex> dataOut;
@ -97,7 +91,7 @@ void SDRPostThread::run() {
while (!terminated) { while (!terminated) {
SDRThreadIQData *data_in; SDRThreadIQData *data_in;
iqDataInQueue.load()->pop(data_in); iqDataInQueue->pop(data_in);
// std::lock_guard < std::mutex > lock(data_in->m_mutex); // std::lock_guard < std::mutex > lock(data_in->m_mutex);
if (data_in && data_in->data.size()) { if (data_in && data_in->data.size()) {
@ -123,16 +117,16 @@ void SDRPostThread::run() {
iirfilt_crcf_execute_block(dcFilter, &fpData[0], dataSize, &dataOut[0]); iirfilt_crcf_execute_block(dcFilter, &fpData[0], dataSize, &dataOut[0]);
if (iqDataOutQueue.load() != NULL) { if (iqDataOutQueue != NULL) {
DemodulatorThreadIQData *pipeDataOut = new DemodulatorThreadIQData; DemodulatorThreadIQData *pipeDataOut = new DemodulatorThreadIQData;
pipeDataOut->frequency = data_in->frequency; pipeDataOut->frequency = data_in->frequency;
pipeDataOut->sampleRate = data_in->sampleRate; pipeDataOut->sampleRate = data_in->sampleRate;
pipeDataOut->data.assign(dataOut.begin(), dataOut.end()); pipeDataOut->data.assign(dataOut.begin(), dataOut.end());
iqDataOutQueue.load()->push(pipeDataOut); iqDataOutQueue->push(pipeDataOut);
} }
if (iqVisualQueue.load() != NULL && iqVisualQueue.load()->empty()) { if (iqVisualQueue != NULL && iqVisualQueue->empty()) {
visualDataOut->busy_rw.lock(); visualDataOut->busy_rw.lock();
@ -147,7 +141,7 @@ void SDRPostThread::run() {
visualDataOut->sampleRate = data_in->sampleRate; visualDataOut->sampleRate = data_in->sampleRate;
visualDataOut->data.assign(dataOut.begin(), dataOut.begin() + num_vis_samples); visualDataOut->data.assign(dataOut.begin(), dataOut.begin() + num_vis_samples);
iqVisualQueue.load()->push(visualDataOut); iqVisualQueue->push(visualDataOut);
visualDataOut->busy_rw.unlock(); visualDataOut->busy_rw.unlock();
} }
@ -227,9 +221,9 @@ void SDRPostThread::run() {
buffers.purge(); buffers.purge();
if (iqVisualQueue.load() && !iqVisualQueue.load()->empty()) { if (iqVisualQueue && !iqVisualQueue->empty()) {
DemodulatorThreadIQData *visualDataDummy; DemodulatorThreadIQData *visualDataDummy;
iqVisualQueue.load()->pop(visualDataDummy); iqVisualQueue->pop(visualDataDummy);
} }
delete visualDataOut; delete visualDataOut;
@ -240,5 +234,5 @@ void SDRPostThread::run() {
void SDRPostThread::terminate() { void SDRPostThread::terminate() {
terminated = true; terminated = true;
SDRThreadIQData *dummy = new SDRThreadIQData; SDRThreadIQData *dummy = new SDRThreadIQData;
iqDataInQueue.load()->push(dummy); iqDataInQueue->push(dummy);
} }

View File

@ -11,10 +11,6 @@ public:
void bindDemodulator(DemodulatorInstance *demod); void bindDemodulator(DemodulatorInstance *demod);
void removeDemodulator(DemodulatorInstance *demod); void removeDemodulator(DemodulatorInstance *demod);
void setIQDataInQueue(SDRThreadIQDataQueue* iqDataQueue);
void setIQDataOutQueue(DemodulatorThreadInputQueue* iqDataQueue);
void setIQVisualQueue(DemodulatorThreadInputQueue* iqVisQueue);
void setNumVisSamples(int num_vis_samples_in); void setNumVisSamples(int num_vis_samples_in);
int getNumVisSamples(); int getNumVisSamples();
@ -25,9 +21,9 @@ public:
void terminate(); void terminate();
protected: protected:
std::atomic<SDRThreadIQDataQueue *> iqDataInQueue; SDRThreadIQDataQueue *iqDataInQueue;
std::atomic<DemodulatorThreadInputQueue *> iqDataOutQueue; DemodulatorThreadInputQueue *iqDataOutQueue;
std::atomic<DemodulatorThreadInputQueue *> iqVisualQueue; DemodulatorThreadInputQueue *iqVisualQueue;
std::mutex busy_demod; std::mutex busy_demod;
std::vector<DemodulatorInstance *> demodulators; std::vector<DemodulatorInstance *> demodulators;

View File

@ -25,7 +25,7 @@ EVT_ENTER_WINDOW(ModeSelectorCanvas::OnMouseEnterWindow)
wxEND_EVENT_TABLE() wxEND_EVENT_TABLE()
ModeSelectorCanvas::ModeSelectorCanvas(wxWindow *parent, int *attribList) : ModeSelectorCanvas::ModeSelectorCanvas(wxWindow *parent, int *attribList) :
InteractiveCanvas(parent, attribList), currentSelection(-1), numChoices(0) { InteractiveCanvas(parent, attribList), numChoices(0), currentSelection(-1) {
glContext = new ModeSelectorContext(this, &wxGetApp().GetContext(this)); glContext = new ModeSelectorContext(this, &wxGetApp().GetContext(this));
} }