diff --git a/plugins/samplesink/sdrdaemonsink/sdrdaemonsinkthread.cpp b/plugins/samplesink/sdrdaemonsink/sdrdaemonsinkthread.cpp index 9c9d01816..bbe966cb8 100644 --- a/plugins/samplesink/sdrdaemonsink/sdrdaemonsinkthread.cpp +++ b/plugins/samplesink/sdrdaemonsink/sdrdaemonsinkthread.cpp @@ -48,6 +48,7 @@ SDRdaemonSinkThread::~SDRdaemonSinkThread() void SDRdaemonSinkThread::startWork() { qDebug() << "SDRdaemonSinkThread::startWork: "; + m_udpSinkFEC.start(); m_maxThrottlems = 0; m_startWaitMutex.lock(); m_elapsedTimer.start(); @@ -62,6 +63,7 @@ void SDRdaemonSinkThread::stopWork() qDebug() << "SDRdaemonSinkThread::stopWork"; m_running = false; wait(); + m_udpSinkFEC.stop(); } void SDRdaemonSinkThread::setSamplerate(int samplerate) diff --git a/plugins/samplesink/sdrdaemonsink/udpsinkfec.cpp b/plugins/samplesink/sdrdaemonsink/udpsinkfec.cpp index 8916fa443..cd1b6fb0f 100644 --- a/plugins/samplesink/sdrdaemonsink/udpsinkfec.cpp +++ b/plugins/samplesink/sdrdaemonsink/udpsinkfec.cpp @@ -34,33 +34,39 @@ UDPSinkFEC::UDPSinkFEC() : m_txBlockIndex(0), m_txBlocksIndex(0), m_frameCount(0), - m_sampleIndex(0) + m_sampleIndex(0), + m_udpWorker(0), + m_remoteAddress("127.0.0.1"), + m_remotePort(9090) { memset((char *) m_txBlocks, 0, 4*256*sizeof(SDRDaemonSuperBlock)); memset((char *) &m_superBlock, 0, sizeof(SDRDaemonSuperBlock)); m_currentMetaFEC.init(); m_bufMeta = new uint8_t[m_udpSize]; m_buf = new uint8_t[m_udpSize]; - m_udpThread = new QThread(); - m_udpWorker = new UDPSinkFECWorker(); - - m_udpWorker->moveToThread(m_udpThread); - - connect(m_udpThread, SIGNAL(started()), m_udpWorker, SLOT(process())); - connect(m_udpWorker, SIGNAL(finished()), m_udpThread, SLOT(quit())); - - m_udpThread->start(); } UDPSinkFEC::~UDPSinkFEC() { - m_udpWorker->stop(); - m_udpThread->wait(); - delete[] m_buf; delete[] m_bufMeta; - delete m_udpWorker; - delete m_udpThread; +} + +void UDPSinkFEC::start() +{ + m_udpWorker = new UDPSinkFECWorker(); + m_udpWorker->setRemoteAddress(m_remoteAddress, m_remotePort); + m_udpWorker->startStop(true); +} + +void UDPSinkFEC::stop() +{ + if (m_udpWorker) + { + m_udpWorker->startStop(false); + m_udpWorker->deleteLater(); + m_udpWorker = 0; + } } void UDPSinkFEC::setTxDelay(float txDelayRatio) @@ -93,7 +99,12 @@ void UDPSinkFEC::setSampleRate(uint32_t sampleRate) void UDPSinkFEC::setRemoteAddress(const QString& address, uint16_t port) { qDebug() << "UDPSinkFEC::setRemoteAddress: address: " << address << " port: " << port; - m_udpWorker->setRemoteAddress(address, port); + m_remoteAddress = address; + m_remotePort = port; + + if (m_udpWorker) { + m_udpWorker->setRemoteAddress(m_remoteAddress, m_remotePort); + } } void UDPSinkFEC::write(const SampleVector::iterator& begin, uint32_t sampleChunkSize) @@ -183,7 +194,9 @@ void UDPSinkFEC::write(const SampleVector::iterator& begin, uint32_t sampleChunk int nbBlocksFEC = m_nbBlocksFEC; int txDelay = m_txDelay; - m_udpWorker->pushTxFrame(m_txBlocks[m_txBlocksIndex], nbBlocksFEC, txDelay, m_frameCount); + if (m_udpWorker) { + m_udpWorker->pushTxFrame(m_txBlocks[m_txBlocksIndex], nbBlocksFEC, txDelay, m_frameCount); + } m_txBlocksIndex = (m_txBlocksIndex + 1) % 4; m_txBlockIndex = 0; diff --git a/plugins/samplesink/sdrdaemonsink/udpsinkfec.h b/plugins/samplesink/sdrdaemonsink/udpsinkfec.h index 2aea27916..36055522b 100644 --- a/plugins/samplesink/sdrdaemonsink/udpsinkfec.h +++ b/plugins/samplesink/sdrdaemonsink/udpsinkfec.h @@ -23,7 +23,6 @@ #include #include #include -#include #include "dsp/dsptypes.h" #include "util/CRC64.h" @@ -46,6 +45,9 @@ public: /** Destroy UDP sink */ ~UDPSinkFEC(); + void start(); + void stop(); + /** * Write IQ samples */ @@ -95,8 +97,9 @@ private: uint16_t m_frameCount; //!< transmission frame count int m_sampleIndex; //!< Current sample index in protected block data - QThread *m_udpThread; UDPSinkFECWorker *m_udpWorker; + QString m_remoteAddress; + uint16_t m_remotePort; }; #endif /* PLUGINS_SAMPLESINK_SDRDAEMONSINK_UDPSINKFEC_H_ */ diff --git a/plugins/samplesink/sdrdaemonsink/udpsinkfecworker.cpp b/plugins/samplesink/sdrdaemonsink/udpsinkfecworker.cpp index 7ced93f6d..57693afb8 100644 --- a/plugins/samplesink/sdrdaemonsink/udpsinkfecworker.cpp +++ b/plugins/samplesink/sdrdaemonsink/udpsinkfecworker.cpp @@ -18,6 +18,7 @@ MESSAGE_CLASS_DEFINITION(UDPSinkFECWorker::MsgUDPFECEncodeAndSend, Message) MESSAGE_CLASS_DEFINITION(UDPSinkFECWorker::MsgConfigureRemoteAddress, Message) +MESSAGE_CLASS_DEFINITION(UDPSinkFECWorker::MsgStartStop, Message) UDPSinkFECWorker::UDPSinkFECWorker() : m_running(false), @@ -29,8 +30,45 @@ UDPSinkFECWorker::UDPSinkFECWorker() : UDPSinkFECWorker::~UDPSinkFECWorker() { - disconnect(&m_inputMessageQueue, SIGNAL(messageEnqueued()), this, SLOT(handleInputMessages())); - m_inputMessageQueue.clear(); +} + +void UDPSinkFECWorker::startStop(bool start) +{ + MsgStartStop *msg = MsgStartStop::create(start); + m_inputMessageQueue.push(msg); +} + +void UDPSinkFECWorker::startWork() +{ + qDebug("UDPSinkFECWorker::startWork"); + m_startWaitMutex.lock(); + start(); + while(!m_running) + m_startWaiter.wait(&m_startWaitMutex, 100); + m_startWaitMutex.unlock(); +} + +void UDPSinkFECWorker::stopWork() +{ + qDebug("UDPSinkFECWorker::stopWork"); + m_running = false; + wait(); +} + +void UDPSinkFECWorker::run() +{ + m_running = true; + m_startWaiter.wakeAll(); + + qDebug("UDPSinkFECWorker::process: started"); + + while (m_running) + { + sleep(1); + } + m_running = false; + + qDebug("UDPSinkFECWorker::process: stopped"); } void UDPSinkFECWorker::pushTxFrame(SDRDaemonSuperBlock *txBlocks, @@ -47,26 +85,6 @@ void UDPSinkFECWorker::setRemoteAddress(const QString& address, uint16_t port) m_inputMessageQueue.push(MsgConfigureRemoteAddress::create(address, port)); } -void UDPSinkFECWorker::process() -{ - m_running = true; - - qDebug("UDPSinkFECWorker::process: started"); - - while (m_running) - { - usleep(250000); - } - - qDebug("UDPSinkFECWorker::process: stopped"); - emit finished(); -} - -void UDPSinkFECWorker::stop() -{ - m_running = false; -} - void UDPSinkFECWorker::handleInputMessages() { Message* message; @@ -85,6 +103,17 @@ void UDPSinkFECWorker::handleInputMessages() m_remoteAddress = addressMsg->getAddress(); m_remotePort = addressMsg->getPort(); } + else if (MsgStartStop::match(*message)) + { + MsgStartStop* notif = (MsgStartStop*) message; + qDebug("DaemonSinkThread::handleInputMessages: MsgStartStop: %s", notif->getStartStop() ? "start" : "stop"); + + if (notif->getStartStop()) { + startWork(); + } else { + stopWork(); + } + } delete message; } diff --git a/plugins/samplesink/sdrdaemonsink/udpsinkfecworker.h b/plugins/samplesink/sdrdaemonsink/udpsinkfecworker.h index 6ff53d30f..0a9aa7a48 100644 --- a/plugins/samplesink/sdrdaemonsink/udpsinkfecworker.h +++ b/plugins/samplesink/sdrdaemonsink/udpsinkfecworker.h @@ -17,7 +17,9 @@ #ifndef PLUGINS_SAMPLESINK_SDRDAEMONSINK_UDPSINKFECWORKER_H_ #define PLUGINS_SAMPLESINK_SDRDAEMONSINK_UDPSINKFECWORKER_H_ -#include +#include +#include +#include #include "cm256.h" @@ -27,7 +29,7 @@ #include "UDPSocket.h" -class UDPSinkFECWorker : public QObject +class UDPSinkFECWorker : public QThread { Q_OBJECT public: @@ -89,31 +91,50 @@ public: {} }; + class MsgStartStop : public Message { + MESSAGE_CLASS_DECLARATION + + public: + bool getStartStop() const { return m_startStop; } + + static MsgStartStop* create(bool startStop) { + return new MsgStartStop(startStop); + } + + protected: + bool m_startStop; + + MsgStartStop(bool startStop) : + Message(), + m_startStop(startStop) + { } + }; + UDPSinkFECWorker(); ~UDPSinkFECWorker(); + void startStop(bool start); + void pushTxFrame(SDRDaemonSuperBlock *txBlocks, uint32_t nbBlocksFEC, uint32_t txDelay, uint16_t frameIndex); void setRemoteAddress(const QString& address, uint16_t port); - void stop(); MessageQueue m_inputMessageQueue; //!< Queue for asynchronous inbound communication -signals: - void finished(); - -public slots: - void process(); - private slots: void handleInputMessages(); private: + void startWork(); + void stopWork(); + void run(); void encodeAndTransmit(SDRDaemonSuperBlock *txBlockx, uint16_t frameIndex, uint32_t nbBlocksFEC, uint32_t txDelay); - volatile bool m_running; + QMutex m_startWaitMutex; + QWaitCondition m_startWaiter; + bool m_running; CM256 m_cm256; //!< CM256 library object bool m_cm256Valid; //!< true if CM256 library is initialized correctly UDPSocket m_socket;