From bc287a4c3340f725a1d351744e3c58efabb2f130 Mon Sep 17 00:00:00 2001 From: f4exb Date: Fri, 21 Aug 2015 08:54:28 +0200 Subject: [PATCH] Deep redesign: debug session #2 phase #2: fixed multi-threading of channelizers --- include-gpl/dsp/dspcommands.h | 17 +++-- include-gpl/dsp/dspengine.h | 4 +- include/dsp/threadedsamplesink.h | 17 ++--- plugins/channel/am/amdemod.cpp | 7 +- plugins/channel/am/amdemodgui.cpp | 6 +- plugins/channel/am/amdemodgui.h | 2 + plugins/channel/chanalyzer/chanalyzergui.cpp | 7 +- plugins/channel/chanalyzer/chanalyzergui.h | 2 + plugins/channel/lora/lorademodgui.cpp | 6 +- plugins/channel/lora/lorademodgui.h | 2 + plugins/channel/nfm/nfmdemodgui.cpp | 6 +- plugins/channel/nfm/nfmdemodgui.h | 2 + plugins/channel/ssb/ssbdemodgui.cpp | 6 +- plugins/channel/ssb/ssbdemodgui.h | 2 + plugins/channel/tcpsrc/tcpsrcgui.cpp | 7 +- plugins/channel/tcpsrc/tcpsrcgui.h | 2 + plugins/channel/wfm/wfmdemodgui.cpp | 6 +- plugins/channel/wfm/wfmdemodgui.h | 2 + sdrbase/dsp/channelizer.cpp | 6 +- sdrbase/dsp/dspcommands.cpp | 4 +- sdrbase/dsp/dspengine.cpp | 39 +++------- sdrbase/dsp/threadedsamplesink.cpp | 78 +++++++++++--------- 22 files changed, 129 insertions(+), 101 deletions(-) diff --git a/include-gpl/dsp/dspcommands.h b/include-gpl/dsp/dspcommands.h index 1983ce1e1..0ab9c9b86 100644 --- a/include-gpl/dsp/dspcommands.h +++ b/include-gpl/dsp/dspcommands.h @@ -25,6 +25,7 @@ class SampleSource; class SampleSink; +class ThreadedSampleSink; class AudioFifo; class SDRANGELOVE_API DSPPing : public Message { @@ -105,28 +106,28 @@ private: SampleSink* m_sampleSink; }; -class SDRANGELOVE_API DSPAddThreadedSink : public Message { +class SDRANGELOVE_API DSPAddThreadedSampleSink : public Message { MESSAGE_CLASS_DECLARATION public: - DSPAddThreadedSink(SampleSink* sampleSink) : Message(), m_sampleSink(sampleSink) { } + DSPAddThreadedSampleSink(ThreadedSampleSink* threadedSampleSink) : Message(), m_threadedSampleSink(threadedSampleSink) { } - SampleSink* getSampleSink() const { return m_sampleSink; } + ThreadedSampleSink* getThreadedSampleSink() const { return m_threadedSampleSink; } private: - SampleSink* m_sampleSink; + ThreadedSampleSink* m_threadedSampleSink; }; -class SDRANGELOVE_API DSPRemoveThreadedSink : public Message { +class SDRANGELOVE_API DSPRemoveThreadedSampleSink : public Message { MESSAGE_CLASS_DECLARATION public: - DSPRemoveThreadedSink(SampleSink* sampleSink) : Message(), m_sampleSink(sampleSink) { } + DSPRemoveThreadedSampleSink(ThreadedSampleSink* threadedSampleSink) : Message(), m_threadedSampleSink(threadedSampleSink) { } - SampleSink* getSampleSink() const { return m_sampleSink; } + ThreadedSampleSink* getThreadedSampleSink() const { return m_threadedSampleSink; } private: - SampleSink* m_sampleSink; + ThreadedSampleSink* m_threadedSampleSink; }; class SDRANGELOVE_API DSPAddAudioSink : public Message { diff --git a/include-gpl/dsp/dspengine.h b/include-gpl/dsp/dspengine.h index cd9350241..d46607698 100644 --- a/include-gpl/dsp/dspengine.h +++ b/include-gpl/dsp/dspengine.h @@ -67,8 +67,8 @@ public: void addSink(SampleSink* sink); //!< Add a sample sink void removeSink(SampleSink* sink); //!< Remove a sample sink - void addThreadedSink(SampleSink* sink); //!< Add a sample sink that will run on its own thread - void removeThreadedSink(SampleSink* sink); //!< Remove a sample sink that runs on its own thread + void addThreadedSink(ThreadedSampleSink* sink); //!< Add a sample sink that will run on its own thread + void removeThreadedSink(ThreadedSampleSink* sink); //!< Remove a sample sink that runs on its own thread void addAudioSink(AudioFifo* audioFifo); //!< Add the audio sink void removeAudioSink(AudioFifo* audioFifo); //!< Remove the audio sink diff --git a/include/dsp/threadedsamplesink.h b/include/dsp/threadedsamplesink.h index ef8aafea4..38e12ccf7 100644 --- a/include/dsp/threadedsamplesink.h +++ b/include/dsp/threadedsamplesink.h @@ -19,7 +19,6 @@ #define INCLUDE_THREADEDSAMPLESINK_H #include -#include #include "samplesink.h" #include "dsp/samplefifo.h" #include "util/messagequeue.h" @@ -27,15 +26,16 @@ #include "util/syncmessenger.h" class SampleSink; +class QThread; /** * This class is a wrapper for SampleSink that runs the SampleSink object in its own thread */ -class SDRANGELOVE_API ThreadedSampleSink : public QThread { +class SDRANGELOVE_API ThreadedSampleSink : public QObject { Q_OBJECT public: - ThreadedSampleSink(SampleSink* sampleSink); + ThreadedSampleSink(SampleSink* sampleSink, QObject *parent = 0); ~ThreadedSampleSink(); const SampleSink *getSink() const { return m_sampleSink; } @@ -45,20 +45,19 @@ public: void start(); //!< this thread start() void stop(); //!< this thread exit() and wait() - bool sendWaitSink(Message& cmd); //!< Send message to sink synchronously + bool handleSinkMessage(Message& cmd); //!< Send message to sink synchronously void feed(SampleVector::const_iterator begin, SampleVector::const_iterator end, bool positiveOnly); //!< Feed sink with samples QString getSampleSinkObjectName() const; protected: + QThread *m_thread; //!< The thead object SyncMessenger m_syncMessenger; //!< Used to process messages synchronously with the thread SampleSink* m_sampleSink; + SampleFifo m_sampleFifo; -private: - void run(); //!< this thread run() method - -private slots: - void handleSynchronousMessages(); //!< Handle synchronous messages with the thread +protected slots: + void handleData(); }; #endif // INCLUDE_THREADEDSAMPLESINK_H diff --git a/plugins/channel/am/amdemod.cpp b/plugins/channel/am/amdemod.cpp index 7fd09dc9a..6280c6f6d 100644 --- a/plugins/channel/am/amdemod.cpp +++ b/plugins/channel/am/amdemod.cpp @@ -124,11 +124,11 @@ void AMDemod::feed(SampleVector::const_iterator begin, SampleVector::const_itera { uint res = m_audioFifo->write((const quint8*)&m_audioBuffer[0], m_audioBufferFill, 1); - /* FIXME: Not necessarily bad, There is a race between threads but generally it works i.e. samples are not lost + // FIXME: Not necessarily bad, There is a race between threads but generally it works i.e. samples are not lost if (res != m_audioBufferFill) { qDebug("AMDemod::feed: %u/%u audio samples lost", m_audioBufferFill - res, m_audioBufferFill); - }*/ + } m_audioBufferFill = 0; } @@ -141,7 +141,8 @@ void AMDemod::feed(SampleVector::const_iterator begin, SampleVector::const_itera { uint res = m_audioFifo->write((const quint8*)&m_audioBuffer[0], m_audioBufferFill, 1); - /* SAme remark as above + // Same remark as above + /* if (res != m_audioBufferFill) { qDebug("AMDemod::feed: %u samples written vs %u requested", res, m_audioBufferFill); diff --git a/plugins/channel/am/amdemodgui.cpp b/plugins/channel/am/amdemodgui.cpp index 8d9adc509..de10d8921 100644 --- a/plugins/channel/am/amdemodgui.cpp +++ b/plugins/channel/am/amdemodgui.cpp @@ -204,8 +204,9 @@ AMDemodGUI::AMDemodGUI(PluginAPI* pluginAPI, QWidget* parent) : m_audioFifo = new AudioFifo(4, 48000); m_amDemod = new AMDemod(m_audioFifo, 0); m_channelizer = new Channelizer(m_amDemod); + m_threadedChannelizer = new ThreadedSampleSink(m_channelizer, this); DSPEngine::instance()->addAudioSink(m_audioFifo); - DSPEngine::instance()->addThreadedSink(m_channelizer); + DSPEngine::instance()->addThreadedSink(m_threadedChannelizer); m_channelMarker = new ChannelMarker(this); m_channelMarker->setColor(Qt::yellow); @@ -222,7 +223,8 @@ AMDemodGUI::~AMDemodGUI() { m_pluginAPI->removeChannelInstance(this); DSPEngine::instance()->removeAudioSink(m_audioFifo); - DSPEngine::instance()->removeThreadedSink(m_channelizer); + DSPEngine::instance()->removeThreadedSink(m_threadedChannelizer); + delete m_threadedChannelizer; delete m_channelizer; delete m_amDemod; delete m_audioFifo; diff --git a/plugins/channel/am/amdemodgui.h b/plugins/channel/am/amdemodgui.h index 836ab85b6..325139c99 100644 --- a/plugins/channel/am/amdemodgui.h +++ b/plugins/channel/am/amdemodgui.h @@ -8,6 +8,7 @@ class PluginAPI; class ChannelMarker; class AudioFifo; +class ThreadedSampleSink; class Channelizer; class AMDemod; @@ -51,6 +52,7 @@ private: bool m_doApplySettings; AudioFifo* m_audioFifo; + ThreadedSampleSink* m_threadedChannelizer; Channelizer* m_channelizer; AMDemod* m_amDemod; diff --git a/plugins/channel/chanalyzer/chanalyzergui.cpp b/plugins/channel/chanalyzer/chanalyzergui.cpp index eddc5ec54..8825f703c 100644 --- a/plugins/channel/chanalyzer/chanalyzergui.cpp +++ b/plugins/channel/chanalyzer/chanalyzergui.cpp @@ -1,6 +1,7 @@ #include #include #include "ui_chanalyzergui.h" +#include "dsp/threadedsamplesink.h" #include "dsp/channelizer.h" #include "dsp/spectrumscopecombovis.h" #include "dsp/spectrumvis.h" @@ -283,8 +284,9 @@ ChannelAnalyzerGUI::ChannelAnalyzerGUI(PluginAPI* pluginAPI, QWidget* parent) : m_spectrumScopeComboVis = new SpectrumScopeComboVis(m_spectrumVis, m_scopeVis); m_channelAnalyzer = new ChannelAnalyzer(m_spectrumScopeComboVis); m_channelizer = new Channelizer(m_channelAnalyzer); + m_threadedChannelizer = new ThreadedSampleSink(m_channelizer, this); connect(m_channelizer, SIGNAL(inputSampleRateChanged()), this, SLOT(channelSampleRateChanged())); - DSPEngine::instance()->addThreadedSink(m_channelizer); + DSPEngine::instance()->addThreadedSink(m_threadedChannelizer); ui->deltaFrequency->setColorMapper(ColorMapper(ColorMapper::ReverseGold)); @@ -315,7 +317,8 @@ ChannelAnalyzerGUI::ChannelAnalyzerGUI(PluginAPI* pluginAPI, QWidget* parent) : ChannelAnalyzerGUI::~ChannelAnalyzerGUI() { m_pluginAPI->removeChannelInstance(this); - DSPEngine::instance()->removeThreadedSink(m_channelizer); + DSPEngine::instance()->removeThreadedSink(m_threadedChannelizer); + delete m_threadedChannelizer; delete m_channelizer; delete m_channelAnalyzer; delete m_spectrumVis; diff --git a/plugins/channel/chanalyzer/chanalyzergui.h b/plugins/channel/chanalyzer/chanalyzergui.h index feda0f4d9..22c54c91f 100644 --- a/plugins/channel/chanalyzer/chanalyzergui.h +++ b/plugins/channel/chanalyzer/chanalyzergui.h @@ -8,6 +8,7 @@ class PluginAPI; class ChannelMarker; //class AudioFifo; +class ThreadedSampleSink; class Channelizer; class ChannelAnalyzer; class SpectrumScopeComboVis; @@ -56,6 +57,7 @@ private: int m_rate; int m_spanLog2; + ThreadedSampleSink* m_threadedChannelizer; Channelizer* m_channelizer; ChannelAnalyzer* m_channelAnalyzer; SpectrumScopeComboVis* m_spectrumScopeComboVis; diff --git a/plugins/channel/lora/lorademodgui.cpp b/plugins/channel/lora/lorademodgui.cpp index 2321f01c9..583f850c8 100644 --- a/plugins/channel/lora/lorademodgui.cpp +++ b/plugins/channel/lora/lorademodgui.cpp @@ -155,7 +155,8 @@ LoRaDemodGUI::LoRaDemodGUI(PluginAPI* pluginAPI, QWidget* parent) : m_spectrumVis = new SpectrumVis(ui->glSpectrum); m_LoRaDemod = new LoRaDemod(m_spectrumVis); m_channelizer = new Channelizer(m_LoRaDemod); - DSPEngine::instance()->addThreadedSink(m_channelizer); + m_threadedChannelizer = new ThreadedSampleSink(m_channelizer); + DSPEngine::instance()->addThreadedSink(m_threadedChannelizer); ui->glSpectrum->setCenterFrequency(16000); ui->glSpectrum->setSampleRate(32000); @@ -180,7 +181,8 @@ LoRaDemodGUI::LoRaDemodGUI(PluginAPI* pluginAPI, QWidget* parent) : LoRaDemodGUI::~LoRaDemodGUI() { m_pluginAPI->removeChannelInstance(this); - DSPEngine::instance()->removeThreadedSink(m_channelizer); + DSPEngine::instance()->removeThreadedSink(m_threadedChannelizer); + delete m_threadedChannelizer; delete m_channelizer; delete m_LoRaDemod; delete m_spectrumVis; diff --git a/plugins/channel/lora/lorademodgui.h b/plugins/channel/lora/lorademodgui.h index 5314b8c2a..4198d9a3d 100644 --- a/plugins/channel/lora/lorademodgui.h +++ b/plugins/channel/lora/lorademodgui.h @@ -8,6 +8,7 @@ class PluginAPI; class ChannelMarker; +class ThreadedSampleSink; class Channelizer; class LoRaDemod; class SpectrumVis; @@ -47,6 +48,7 @@ private: bool m_basicSettingsShown; bool m_doApplySettings; + ThreadedSampleSink* m_threadedChannelizer; Channelizer* m_channelizer; LoRaDemod* m_LoRaDemod; SpectrumVis* m_spectrumVis; diff --git a/plugins/channel/nfm/nfmdemodgui.cpp b/plugins/channel/nfm/nfmdemodgui.cpp index 68cf97286..8f3f56a7d 100644 --- a/plugins/channel/nfm/nfmdemodgui.cpp +++ b/plugins/channel/nfm/nfmdemodgui.cpp @@ -236,8 +236,9 @@ NFMDemodGUI::NFMDemodGUI(PluginAPI* pluginAPI, QWidget* parent) : //ui->deltaFrequency->setBold(true); m_channelizer = new Channelizer(m_nfmDemod); + m_threadedChannelizer = new ThreadedSampleSink(m_channelizer, this); DSPEngine::instance()->addAudioSink(m_audioFifo); - DSPEngine::instance()->addThreadedSink(m_channelizer); + DSPEngine::instance()->addThreadedSink(m_threadedChannelizer); m_channelMarker = new ChannelMarker(this); m_channelMarker->setColor(Qt::red); @@ -254,7 +255,8 @@ NFMDemodGUI::~NFMDemodGUI() { m_pluginAPI->removeChannelInstance(this); DSPEngine::instance()->removeAudioSink(m_audioFifo); - DSPEngine::instance()->removeThreadedSink(m_channelizer); + DSPEngine::instance()->removeThreadedSink(m_threadedChannelizer); + delete m_threadedChannelizer; delete m_channelizer; delete m_nfmDemod; delete m_audioFifo; diff --git a/plugins/channel/nfm/nfmdemodgui.h b/plugins/channel/nfm/nfmdemodgui.h index 67b9a2330..3ccb721a5 100644 --- a/plugins/channel/nfm/nfmdemodgui.h +++ b/plugins/channel/nfm/nfmdemodgui.h @@ -9,6 +9,7 @@ class PluginAPI; class ChannelMarker; class AudioFifo; +class ThreadedSampleSink; class Channelizer; class NFMDemod; @@ -54,6 +55,7 @@ private: bool m_doApplySettings; AudioFifo* m_audioFifo; + ThreadedSampleSink* m_threadedChannelizer; Channelizer* m_channelizer; NFMDemod* m_nfmDemod; diff --git a/plugins/channel/ssb/ssbdemodgui.cpp b/plugins/channel/ssb/ssbdemodgui.cpp index df3451313..9a52521fb 100644 --- a/plugins/channel/ssb/ssbdemodgui.cpp +++ b/plugins/channel/ssb/ssbdemodgui.cpp @@ -259,8 +259,9 @@ SSBDemodGUI::SSBDemodGUI(PluginAPI* pluginAPI, QWidget* parent) : m_spectrumVis = new SpectrumVis(ui->glSpectrum); m_ssbDemod = new SSBDemod(m_audioFifo, m_spectrumVis); m_channelizer = new Channelizer(m_ssbDemod); + m_threadedChannelizer = new ThreadedSampleSink(m_channelizer, this); DSPEngine::instance()->addAudioSink(m_audioFifo); - DSPEngine::instance()->addThreadedSink(m_channelizer); + DSPEngine::instance()->addThreadedSink(m_threadedChannelizer); ui->deltaFrequency->setColorMapper(ColorMapper(ColorMapper::ReverseGold)); @@ -289,7 +290,8 @@ SSBDemodGUI::~SSBDemodGUI() { m_pluginAPI->removeChannelInstance(this); DSPEngine::instance()->removeAudioSink(m_audioFifo); - DSPEngine::instance()->removeThreadedSink(m_channelizer); + DSPEngine::instance()->removeThreadedSink(m_threadedChannelizer); + delete m_threadedChannelizer; delete m_channelizer; delete m_ssbDemod; delete m_spectrumVis; diff --git a/plugins/channel/ssb/ssbdemodgui.h b/plugins/channel/ssb/ssbdemodgui.h index fe7753f47..5aa26c251 100644 --- a/plugins/channel/ssb/ssbdemodgui.h +++ b/plugins/channel/ssb/ssbdemodgui.h @@ -8,6 +8,7 @@ class PluginAPI; class ChannelMarker; class AudioFifo; +class ThreadedSampleSink; class Channelizer; class SSBDemod; class SpectrumVis; @@ -54,6 +55,7 @@ private: int m_spanLog2; AudioFifo* m_audioFifo; + ThreadedSampleSink* m_threadedChannelizer; Channelizer* m_channelizer; SSBDemod* m_ssbDemod; SpectrumVis* m_spectrumVis; diff --git a/plugins/channel/tcpsrc/tcpsrcgui.cpp b/plugins/channel/tcpsrc/tcpsrcgui.cpp index 116a094d9..821590d6b 100644 --- a/plugins/channel/tcpsrc/tcpsrcgui.cpp +++ b/plugins/channel/tcpsrc/tcpsrcgui.cpp @@ -1,6 +1,7 @@ #include "tcpsrcgui.h" #include "plugin/pluginapi.h" #include "tcpsrc.h" +#include "dsp/threadedsamplesink.h" #include "dsp/channelizer.h" #include "dsp/spectrumvis.h" #include "dsp/dspengine.h" @@ -176,7 +177,8 @@ TCPSrcGUI::TCPSrcGUI(PluginAPI* pluginAPI, QWidget* parent) : m_spectrumVis = new SpectrumVis(ui->glSpectrum); m_tcpSrc = new TCPSrc(m_pluginAPI->getMainWindowMessageQueue(), this, m_spectrumVis); m_channelizer = new Channelizer(m_tcpSrc); - DSPEngine::instance()->addThreadedSink(m_channelizer); + m_threadedChannelizer = new ThreadedSampleSink(m_channelizer, this); + DSPEngine::instance()->addThreadedSink(m_threadedChannelizer); ui->glSpectrum->setCenterFrequency(0); ui->glSpectrum->setSampleRate(ui->sampleRate->text().toInt()); @@ -200,7 +202,8 @@ TCPSrcGUI::TCPSrcGUI(PluginAPI* pluginAPI, QWidget* parent) : TCPSrcGUI::~TCPSrcGUI() { m_pluginAPI->removeChannelInstance(this); - DSPEngine::instance()->removeThreadedSink(m_channelizer); + DSPEngine::instance()->removeThreadedSink(m_threadedChannelizer); + delete m_threadedChannelizer; delete m_channelizer; delete m_tcpSrc; delete m_spectrumVis; diff --git a/plugins/channel/tcpsrc/tcpsrcgui.h b/plugins/channel/tcpsrc/tcpsrcgui.h index 846589739..c4ebcb4c3 100644 --- a/plugins/channel/tcpsrc/tcpsrcgui.h +++ b/plugins/channel/tcpsrc/tcpsrcgui.h @@ -8,6 +8,7 @@ class PluginAPI; class ChannelMarker; +class ThreadedSampleSink; class Channelizer; class TCPSrc; class SpectrumVis; @@ -59,6 +60,7 @@ private: bool m_doApplySettings; // RF path + ThreadedSampleSink* m_threadedChannelizer; Channelizer* m_channelizer; SpectrumVis* m_spectrumVis; diff --git a/plugins/channel/wfm/wfmdemodgui.cpp b/plugins/channel/wfm/wfmdemodgui.cpp index ebe7b8f3b..c654ab08d 100644 --- a/plugins/channel/wfm/wfmdemodgui.cpp +++ b/plugins/channel/wfm/wfmdemodgui.cpp @@ -214,8 +214,9 @@ WFMDemodGUI::WFMDemodGUI(PluginAPI* pluginAPI, QWidget* parent) : m_audioFifo = new AudioFifo(4, 250000); // TODO: check. Room for 1s FIFO at max rate m_wfmDemod = new WFMDemod(m_audioFifo, 0); m_channelizer = new Channelizer(m_wfmDemod); + m_threadedChannelizer = new ThreadedSampleSink(m_channelizer, this); DSPEngine::instance()->addAudioSink(m_audioFifo); - DSPEngine::instance()->addThreadedSink(m_channelizer); + DSPEngine::instance()->addThreadedSink(m_threadedChannelizer); m_channelMarker = new ChannelMarker(this); m_channelMarker->setColor(Qt::blue); @@ -232,7 +233,8 @@ WFMDemodGUI::~WFMDemodGUI() { m_pluginAPI->removeChannelInstance(this); DSPEngine::instance()->removeAudioSink(m_audioFifo); - DSPEngine::instance()->removeThreadedSink(m_channelizer); + DSPEngine::instance()->removeThreadedSink(m_threadedChannelizer); + delete m_threadedChannelizer; delete m_channelizer; delete m_wfmDemod; delete m_audioFifo; diff --git a/plugins/channel/wfm/wfmdemodgui.h b/plugins/channel/wfm/wfmdemodgui.h index f018caf2a..e35929502 100644 --- a/plugins/channel/wfm/wfmdemodgui.h +++ b/plugins/channel/wfm/wfmdemodgui.h @@ -8,6 +8,7 @@ class PluginAPI; class ChannelMarker; class AudioFifo; +class ThreadedSampleSink; class Channelizer; class WFMDemod; @@ -51,6 +52,7 @@ private: bool m_doApplySettings; AudioFifo* m_audioFifo; + ThreadedSampleSink* m_threadedChannelizer; Channelizer* m_channelizer; WFMDemod* m_wfmDemod; diff --git a/sdrbase/dsp/channelizer.cpp b/sdrbase/dsp/channelizer.cpp index f89f8e545..938b4bd34 100644 --- a/sdrbase/dsp/channelizer.cpp +++ b/sdrbase/dsp/channelizer.cpp @@ -2,6 +2,7 @@ #include "dsp/inthalfbandfilter.h" #include "dsp/dspcommands.h" +#include #include MESSAGE_CLASS_DEFINITION(Channelizer::MsgChannelizerNotification, Message) @@ -14,7 +15,8 @@ Channelizer::Channelizer(SampleSink* sampleSink) : m_currentOutputSampleRate(0), m_currentCenterFrequency(0) { - setObjectName("Channelizer"); + QString name = "Channelizer(" + m_sampleSink->objectName() + ")"; + setObjectName(name); } Channelizer::~Channelizer() @@ -55,7 +57,7 @@ void Channelizer::start() { if(m_sampleSink != NULL) { - qDebug() << "Channelizer::start"; + qDebug() << "Channelizer::start: thread: " << thread(); m_sampleSink->start(); } } diff --git a/sdrbase/dsp/dspcommands.cpp b/sdrbase/dsp/dspcommands.cpp index ba39c2ac2..f6f8a19b4 100644 --- a/sdrbase/dsp/dspcommands.cpp +++ b/sdrbase/dsp/dspcommands.cpp @@ -27,8 +27,8 @@ MESSAGE_CLASS_DEFINITION(DSPGetErrorMessage, Message) MESSAGE_CLASS_DEFINITION(DSPSetSource, Message) MESSAGE_CLASS_DEFINITION(DSPAddSink, Message) MESSAGE_CLASS_DEFINITION(DSPRemoveSink, Message) -MESSAGE_CLASS_DEFINITION(DSPAddThreadedSink, Message) -MESSAGE_CLASS_DEFINITION(DSPRemoveThreadedSink, Message) +MESSAGE_CLASS_DEFINITION(DSPAddThreadedSampleSink, Message) +MESSAGE_CLASS_DEFINITION(DSPRemoveThreadedSampleSink, Message) MESSAGE_CLASS_DEFINITION(DSPAddAudioSink, Message) MESSAGE_CLASS_DEFINITION(DSPRemoveAudioSink, Message) //MESSAGE_CLASS_DEFINITION(DSPConfigureSpectrumVis, Message) diff --git a/sdrbase/dsp/dspengine.cpp b/sdrbase/dsp/dspengine.cpp index 442e5a8e1..e9624dbe6 100644 --- a/sdrbase/dsp/dspengine.cpp +++ b/sdrbase/dsp/dspengine.cpp @@ -131,17 +131,17 @@ void DSPEngine::removeSink(SampleSink* sink) m_syncMessenger.sendWait(cmd); } -void DSPEngine::addThreadedSink(SampleSink* sink) +void DSPEngine::addThreadedSink(ThreadedSampleSink* sink) { qDebug() << "DSPEngine::addThreadedSink: " << sink->objectName().toStdString().c_str(); - DSPAddThreadedSink cmd(sink); + DSPAddThreadedSampleSink cmd(sink); m_syncMessenger.sendWait(cmd); } -void DSPEngine::removeThreadedSink(SampleSink* sink) +void DSPEngine::removeThreadedSink(ThreadedSampleSink* sink) { qDebug() << "DSPEngine::removeThreadedSink: " << sink->objectName().toStdString().c_str(); - DSPRemoveThreadedSink cmd(sink); + DSPRemoveThreadedSampleSink cmd(sink); m_syncMessenger.sendWait(cmd); } @@ -418,7 +418,7 @@ DSPEngine::State DSPEngine::gotoInit() for (ThreadedSampleSinks::const_iterator it = m_threadedSampleSinks.begin(); it != m_threadedSampleSinks.end(); ++it) { qDebug() << " - initializing ThreadedSampleSink(" << (*it)->getSampleSinkObjectName().toStdString().c_str() << ")"; - (*it)->sendWaitSink(notif); + (*it)->handleSinkMessage(notif); } // pass data to listeners @@ -580,32 +580,17 @@ void DSPEngine::handleSynchronousMessages() m_sampleSinks.remove(sink); } - else if (DSPAddThreadedSink::match(*message)) + else if (DSPAddThreadedSampleSink::match(*message)) { - SampleSink* sink = ((DSPAddThreadedSink*) message)->getSampleSink(); - ThreadedSampleSink *threadedSink = new ThreadedSampleSink(sink); + ThreadedSampleSink *threadedSink = ((DSPAddThreadedSampleSink*) message)->getThreadedSampleSink(); m_threadedSampleSinks.push_back(threadedSink); threadedSink->start(); } - else if (DSPRemoveThreadedSink::match(*message)) + else if (DSPRemoveThreadedSampleSink::match(*message)) { - SampleSink* sink = ((DSPRemoveThreadedSink*) message)->getSampleSink(); - ThreadedSampleSinks::iterator threadedSinkIt = m_threadedSampleSinks.begin(); - - for (; threadedSinkIt != m_threadedSampleSinks.end(); ++threadedSinkIt) - { - if ((*threadedSinkIt)->getSink() == sink) - { - break; - } - } - - if (threadedSinkIt != m_threadedSampleSinks.end()) - { - (*threadedSinkIt)->stop(); - m_threadedSampleSinks.remove(*threadedSinkIt); - delete (*threadedSinkIt); - } + ThreadedSampleSink* threadedSink = ((DSPRemoveThreadedSampleSink*) message)->getThreadedSampleSink(); + threadedSink->stop(); + m_threadedSampleSinks.remove(threadedSink); } else if (DSPAddAudioSink::match(*message)) { @@ -684,7 +669,7 @@ void DSPEngine::handleSourceMessages() for (ThreadedSampleSinks::const_iterator it = m_threadedSampleSinks.begin(); it != m_threadedSampleSinks.end(); ++it) { qDebug() << "DSPEngine::handleSourceMessages: forward message to ThreadedSampleSink(" << (*it)->getSampleSinkObjectName().toStdString().c_str() << ")"; - (*it)->sendWaitSink(*message); + (*it)->handleSinkMessage(*message); } // forward changes to listeners on DSP output queue diff --git a/sdrbase/dsp/threadedsamplesink.cpp b/sdrbase/dsp/threadedsamplesink.cpp index 95ec7a872..7d532dcf4 100644 --- a/sdrbase/dsp/threadedsamplesink.cpp +++ b/sdrbase/dsp/threadedsamplesink.cpp @@ -1,60 +1,59 @@ #include #include +#include #include "dsp/threadedsamplesink.h" #include "dsp/dspcommands.h" #include "util/message.h" -ThreadedSampleSink::ThreadedSampleSink(SampleSink* sampleSink) : +ThreadedSampleSink::ThreadedSampleSink(SampleSink* sampleSink, QObject *parent) : m_sampleSink(sampleSink) { - moveToThread(this); + QString name = "ThreadedSampleSink(" + m_sampleSink->objectName() + ")"; + setObjectName(name); + + qDebug() << "ThreadedSampleSink::ThreadedSampleSink: " << name; + + m_thread = new QThread(parent); + moveToThread(m_thread); // FIXME: the intermediate FIFO should be handled within the sink. Define a new type of sink that is compatible with threading + m_sampleSink->moveToThread(m_thread); + m_sampleFifo.moveToThread(m_thread); + connect(&m_sampleFifo, SIGNAL(dataReady()), this, SLOT(handleData())); + m_sampleFifo.setSize(262144); + + qDebug() << "ThreadedSampleSink::ThreadedSampleSink: thread: " << thread() << " m_thread: " << m_thread; } ThreadedSampleSink::~ThreadedSampleSink() { - wait(); + delete m_thread; } void ThreadedSampleSink::start() { qDebug() << "ThreadedSampleSink::start"; - DSPPing cmd; - QThread::start(); - m_syncMessenger.sendWait(cmd); + m_thread->start(); + m_sampleSink->start(); } void ThreadedSampleSink::stop() { qDebug() << "ThreadedSampleSink::stop"; - exit(); - wait(); -} + m_sampleSink->stop(); + m_thread->exit(); + m_thread->wait(); + m_sampleFifo.readCommit(m_sampleFifo.fill()); -void ThreadedSampleSink::run() -{ - qDebug() << "ThreadedSampleSink::run"; - connect(&m_syncMessenger, SIGNAL(messageSent()), this, SLOT(handleSynchronousMessages()), Qt::QueuedConnection); - m_syncMessenger.done(); // Release start() that is waiting in calling trhead - exec(); } void ThreadedSampleSink::feed(SampleVector::const_iterator begin, SampleVector::const_iterator end, bool positiveOnly) { - m_sampleSink->feed(begin, end, positiveOnly); + // m_sampleSink->feed(begin, end, positiveOnly); + m_sampleFifo.write(begin, end); } -bool ThreadedSampleSink::sendWaitSink(Message& cmd) +bool ThreadedSampleSink::handleSinkMessage(Message& cmd) { - m_syncMessenger.sendWait(cmd); -} - -void ThreadedSampleSink::handleSynchronousMessages() -{ - qDebug() << "ThreadedSampleSink::handleSynchronousMessages"; - Message *message = m_syncMessenger.getMessage(); - qDebug() << " - message: " << message->getIdentifier(); - m_sampleSink->handleMessage(*message); // just delegate to the sink - m_syncMessenger.done(); + return m_sampleSink->handleMessage(cmd); } QString ThreadedSampleSink::getSampleSinkObjectName() const @@ -63,33 +62,44 @@ QString ThreadedSampleSink::getSampleSinkObjectName() const } -/* -void ThreadedSampleSink::handleData() +void ThreadedSampleSink::handleData() // FIXME: Move it to the new threadable sink class { bool positiveOnly = false; - while((m_sampleFifo.fill() > 0) && (m_messageQueue.countPending() == 0)) { + while ((m_sampleFifo.fill() > 0) && (m_sampleSink->getInputMessageQueue()->size() == 0)) + { SampleVector::iterator part1begin; SampleVector::iterator part1end; SampleVector::iterator part2begin; SampleVector::iterator part2end; - size_t count = m_sampleFifo.readBegin(m_sampleFifo.fill(), &part1begin, &part1end, &part2begin, &part2end); + std::size_t count = m_sampleFifo.readBegin(m_sampleFifo.fill(), &part1begin, &part1end, &part2begin, &part2end); // first part of FIFO data - if(count > 0) { + + if (count > 0) + { // handle data if(m_sampleSink != NULL) + { m_sampleSink->feed(part1begin, part1end, positiveOnly); + } + m_sampleFifo.readCommit(part1end - part1begin); } + // second part of FIFO data (used when block wraps around) - if(part2begin != part2end) { + + if(part2begin != part2end) + { // handle data if(m_sampleSink != NULL) + { m_sampleSink->feed(part2begin, part2end, positiveOnly); + } + m_sampleFifo.readCommit(part2end - part2begin); } } -}*/ +}