diff --git a/plugins/channeltx/remotesource/CMakeLists.txt b/plugins/channeltx/remotesource/CMakeLists.txt index 77738aa71..3bf447dc2 100644 --- a/plugins/channeltx/remotesource/CMakeLists.txt +++ b/plugins/channeltx/remotesource/CMakeLists.txt @@ -13,7 +13,7 @@ set(remotesource_SOURCES remotesource.cpp remotesourcebaseband.cpp remotesourcesource.cpp - remotesourcethread.cpp + remotesourceworker.cpp remotesourceplugin.cpp remotesourcesettings.cpp remotesourcewebapiadapter.cpp @@ -23,7 +23,7 @@ set(remotesource_HEADERS remotesource.h remotesourcebaseband.h remotesourcesource.h - remotesourcethread.h + remotesourceworker.h remotesourceplugin.h remotesourcesettings.h remotesourcewebapiadapter.cpp diff --git a/plugins/channeltx/remotesource/remotesourcesource.cpp b/plugins/channeltx/remotesource/remotesourcesource.cpp index d697347a9..4b43a7a8a 100644 --- a/plugins/channeltx/remotesource/remotesourcesource.cpp +++ b/plugins/channeltx/remotesource/remotesourcesource.cpp @@ -18,12 +18,12 @@ #include #include -#include "remotesourcethread.h" +#include "remotesourceworker.h" #include "remotesourcesource.h" RemoteSourceSource::RemoteSourceSource() : m_running(false), - m_sourceThread(nullptr), + m_sourceWorker(nullptr), m_nbCorrectableErrors(0), m_nbUncorrectableErrors(0), m_channelSampleRate(48000) @@ -90,9 +90,10 @@ void RemoteSourceSource::start() stop(); } - m_sourceThread = new RemoteSourceThread(&m_dataQueue); - m_sourceThread->startStop(true); - m_sourceThread->dataBind(m_settings.m_dataAddress, m_settings.m_dataPort); + m_sourceWorker = new RemoteSourceWorker(&m_dataQueue); + m_sourceWorker->moveToThread(&m_sourceWorkerThread); + startWorker(); + m_sourceWorker->dataBind(m_settings.m_dataAddress, m_settings.m_dataPort); m_running = true; } @@ -100,16 +101,29 @@ void RemoteSourceSource::stop() { qDebug("RemoteSourceSource::stop"); - if (m_sourceThread) + if (m_sourceWorker) { - m_sourceThread->startStop(false); - m_sourceThread->deleteLater(); - m_sourceThread = 0; + stopWorker(); + m_sourceWorker->deleteLater(); + m_sourceWorker = 0; } m_running = false; } +void RemoteSourceSource::startWorker() +{ + m_sourceWorker->startWork(); + m_sourceWorkerThread.start(); +} + +void RemoteSourceSource::stopWorker() +{ + m_sourceWorker->stopWork(); + m_sourceWorkerThread.quit(); + m_sourceWorkerThread.wait(); +} + void RemoteSourceSource::handleData() { RemoteDataBlock* dataBlock; @@ -238,8 +252,8 @@ void RemoteSourceSource::printMeta(const QString& header, RemoteMetaDataFEC *met void RemoteSourceSource::dataBind(const QString& dataAddress, uint16_t dataPort) { - if (m_sourceThread) { - m_sourceThread->dataBind(dataAddress, dataPort); + if (m_sourceWorker) { + m_sourceWorker->dataBind(dataAddress, dataPort); } m_settings.m_dataAddress = dataAddress; diff --git a/plugins/channeltx/remotesource/remotesourcesource.h b/plugins/channeltx/remotesource/remotesourcesource.h index 874593785..4d586251f 100644 --- a/plugins/channeltx/remotesource/remotesourcesource.h +++ b/plugins/channeltx/remotesource/remotesourcesource.h @@ -19,6 +19,7 @@ #define PLUGINS_CHANNELTX_REMOTESOURCE_REMOTESOURCESOURCE_H_ #include +#include #include "cm256cc/cm256.h" @@ -30,7 +31,7 @@ #include "remotesourcesettings.h" -class RemoteSourceThread; +class RemoteSourceWorker; class RemoteSourceSource : public QObject, public ChannelSampleSource { Q_OBJECT @@ -58,7 +59,8 @@ signals: private: bool m_running; - RemoteSourceThread *m_sourceThread; + RemoteSourceWorker *m_sourceWorker; + QThread m_sourceWorkerThread; RemoteDataQueue m_dataQueue; RemoteDataReadQueue m_dataReadQueue; CM256 m_cm256; @@ -76,6 +78,8 @@ private: bool m_interpolatorConsumed; Complex m_modSample; + void startWorker(); + void stopWorker(); void handleDataBlock(RemoteDataBlock *dataBlock); void printMeta(const QString& header, RemoteMetaDataFEC *metaData); void getSample(); diff --git a/plugins/channeltx/remotesource/remotesourcethread.cpp b/plugins/channeltx/remotesource/remotesourceworker.cpp similarity index 71% rename from plugins/channeltx/remotesource/remotesourcethread.cpp rename to plugins/channeltx/remotesource/remotesourceworker.cpp index 5a3f4c3e1..9dfa7e979 100644 --- a/plugins/channeltx/remotesource/remotesourcethread.cpp +++ b/plugins/channeltx/remotesource/remotesourceworker.cpp @@ -15,8 +15,6 @@ // along with this program. If not, see . // /////////////////////////////////////////////////////////////////////////////////// -#include "remotesourcethread.h" - #include #include #include @@ -24,98 +22,57 @@ #include #include "cm256cc/cm256.h" +#include "remotesourceworker.h" +MESSAGE_CLASS_DEFINITION(RemoteSourceWorker::MsgDataBind, Message) -MESSAGE_CLASS_DEFINITION(RemoteSourceThread::MsgStartStop, Message) -MESSAGE_CLASS_DEFINITION(RemoteSourceThread::MsgDataBind, Message) - -RemoteSourceThread::RemoteSourceThread(RemoteDataQueue *dataQueue, QObject* parent) : - QThread(parent), +RemoteSourceWorker::RemoteSourceWorker(RemoteDataQueue *dataQueue, QObject* parent) : + QObject(parent), m_running(false), m_dataQueue(dataQueue), m_address(QHostAddress::LocalHost), - m_socket(0) + m_socket(nullptr) { std::fill(m_dataBlocks, m_dataBlocks+4, (RemoteDataBlock *) 0); connect(&m_inputMessageQueue, SIGNAL(messageEnqueued()), this, SLOT(handleInputMessages()), Qt::QueuedConnection); } -RemoteSourceThread::~RemoteSourceThread() +RemoteSourceWorker::~RemoteSourceWorker() { - qDebug("RemoteSourceThread::~RemoteSourceThread"); + qDebug("RemoteSourceWorker::~RemoteSourceWorker"); } -void RemoteSourceThread::startStop(bool start) -{ - MsgStartStop *msg = MsgStartStop::create(start); - m_inputMessageQueue.push(msg); -} - -void RemoteSourceThread::dataBind(const QString& address, uint16_t port) +void RemoteSourceWorker::dataBind(const QString& address, uint16_t port) { MsgDataBind *msg = MsgDataBind::create(address, port); m_inputMessageQueue.push(msg); } -void RemoteSourceThread::startWork() +void RemoteSourceWorker::startWork() { - qDebug("RemoteSourceThread::startWork"); - m_startWaitMutex.lock(); + qDebug("RemoteSourceWorker::startWork"); m_socket = new QUdpSocket(this); - start(); - while(!m_running) - m_startWaiter.wait(&m_startWaitMutex, 100); - m_startWaitMutex.unlock(); + m_running = false; } -void RemoteSourceThread::stopWork() +void RemoteSourceWorker::stopWork() { - qDebug("RemoteSourceThread::stopWork"); + qDebug("RemoteSourceWorker::stopWork"); delete m_socket; - m_socket = 0; + m_socket = nullptr; m_running = false; - wait(); } -void RemoteSourceThread::run() -{ - qDebug("RemoteSourceThread::run: begin"); - m_running = true; - m_startWaiter.wakeAll(); - - while (m_running) - { - sleep(1); // Do nothing as everything is in the data handler (dequeuer) - } - - m_running = false; - qDebug("RemoteSourceThread::run: end"); -} - - -void RemoteSourceThread::handleInputMessages() +void RemoteSourceWorker::handleInputMessages() { Message* message; while ((message = m_inputMessageQueue.pop()) != 0) { - if (MsgStartStop::match(*message)) - { - MsgStartStop* notif = (MsgStartStop*) message; - qDebug("RemoteSourceThread::handleInputMessages: MsgStartStop: %s", notif->getStartStop() ? "start" : "stop"); - - if (notif->getStartStop()) { - startWork(); - } else { - stopWork(); - } - - delete message; - } - else if (MsgDataBind::match(*message)) + if (MsgDataBind::match(*message)) { MsgDataBind* notif = (MsgDataBind*) message; - qDebug("RemoteSourceThread::handleInputMessages: MsgDataBind: %s:%d", qPrintable(notif->getAddress().toString()), notif->getPort()); + qDebug("RemoteSourceWorker::handleInputMessages: MsgDataBind: %s:%d", qPrintable(notif->getAddress().toString()), notif->getPort()); if (m_socket) { @@ -127,7 +84,7 @@ void RemoteSourceThread::handleInputMessages() } } -void RemoteSourceThread::readPendingDatagrams() +void RemoteSourceWorker::readPendingDatagrams() { RemoteSuperBlock superBlock; qint64 size; @@ -160,7 +117,7 @@ void RemoteSourceThread::readPendingDatagrams() if (superBlock.m_header.m_frameIndex != frameIndex) { - //qDebug("RemoteSourceThread::readPendingDatagrams: push frame %u", frameIndex); + //qDebug("RemoteSourceWorker::readPendingDatagrams: push frame %u", frameIndex); m_dataQueue->push(m_dataBlocks[dataBlockIndex]); m_dataBlocks[dataBlockIndex] = new RemoteDataBlock(); m_dataBlocks[dataBlockIndex]->m_rxControlBlock.m_frameIndex = superBlock.m_header.m_frameIndex; @@ -183,7 +140,7 @@ void RemoteSourceThread::readPendingDatagrams() } else { - qWarning("RemoteSourceThread::readPendingDatagrams: wrong super block size not processing"); + qWarning("RemoteSourceWorker::readPendingDatagrams: wrong super block size not processing"); } } } diff --git a/plugins/channeltx/remotesource/remotesourcethread.h b/plugins/channeltx/remotesource/remotesourceworker.h similarity index 73% rename from plugins/channeltx/remotesource/remotesourcethread.h rename to plugins/channeltx/remotesource/remotesourceworker.h index 556e35f55..b98d455ba 100644 --- a/plugins/channeltx/remotesource/remotesourcethread.h +++ b/plugins/channeltx/remotesource/remotesourceworker.h @@ -1,5 +1,5 @@ /////////////////////////////////////////////////////////////////////////////////// -// Copyright (C) 2018-2019 Edouard Griffiths, F4EXB // +// Copyright (C) 2018-2020 Edouard Griffiths, F4EXB // // // // This program is free software; you can redistribute it and/or modify // // it under the terms of the GNU General Public License as published by // @@ -15,12 +15,10 @@ // along with this program. If not, see . // /////////////////////////////////////////////////////////////////////////////////// -#ifndef PLUGINS_CHANNELTX_REMOTESRC_REMOTESRCTHREAD_H_ -#define PLUGINS_CHANNELTX_REMOTESRC_REMOTESRCTHREAD_H_ +#ifndef PLUGINS_CHANNELTX_REMOTESRC_REMOTESRCWORKER_H_ +#define PLUGINS_CHANNELTX_REMOTESRC_REMOTESRCWORKER_H_ -#include -#include -#include +#include #include #include "util/message.h" @@ -30,28 +28,9 @@ class RemoteDataQueue; class RemoteDataBlock; class QUdpSocket; -class RemoteSourceThread : public QThread { +class RemoteSourceWorker : public QObject { Q_OBJECT public: - class MsgStartStop : public Message { - MESSAGE_CLASS_DECLARATION - - public: - bool getStartStop() const { return m_startStop; } - - static MsgStartStop* create(bool startStop) { - return new MsgStartStop(startStop); - } - - protected: - bool m_startStop; - - MsgStartStop(bool startStop) : - Message(), - m_startStop(startStop) - { } - }; - class MsgDataBind : public Message { MESSAGE_CLASS_DECLARATION @@ -75,15 +54,14 @@ public: } }; - RemoteSourceThread(RemoteDataQueue *dataQueue, QObject* parent = 0); - ~RemoteSourceThread(); + RemoteSourceWorker(RemoteDataQueue *dataQueue, QObject* parent = 0); + ~RemoteSourceWorker(); - void startStop(bool start); + void startWork(); + void stopWork(); void dataBind(const QString& address, uint16_t port); private: - QMutex m_startWaitMutex; - QWaitCondition m_startWaiter; volatile bool m_running; MessageQueue m_inputMessageQueue; @@ -95,11 +73,6 @@ private: static const uint32_t m_nbDataBlocks = 4; //!< number of data blocks in the ring buffer RemoteDataBlock *m_dataBlocks[m_nbDataBlocks]; //!< ring buffer of data blocks indexed by frame affinity - void startWork(); - void stopWork(); - - void run(); - private slots: void handleInputMessages(); void readPendingDatagrams(); @@ -107,4 +80,4 @@ private slots: -#endif /* PLUGINS_CHANNELTX_REMOTESRC_REMOTESRCTHREAD_H_ */ +#endif /* PLUGINS_CHANNELTX_REMOTESRC_REMOTESRCWORKER_H_ */