From 6406a0ba50c0558f1de09726bef2c3ce3fde5d15 Mon Sep 17 00:00:00 2001 From: f4exb Date: Sun, 16 Sep 2018 11:23:24 +0200 Subject: [PATCH] SDRDaemonSink: separate files for UDPSinkFECWorker --- .../samplesink/sdrdaemonsink/CMakeLists.txt | 2 + .../samplesink/sdrdaemonsink/udpsinkfec.cpp | 140 +-------------- plugins/samplesink/sdrdaemonsink/udpsinkfec.h | 103 ----------- .../sdrdaemonsink/udpsinkfecworker.cpp | 161 ++++++++++++++++++ .../sdrdaemonsink/udpsinkfecworker.h | 124 ++++++++++++++ 5 files changed, 288 insertions(+), 242 deletions(-) create mode 100644 plugins/samplesink/sdrdaemonsink/udpsinkfecworker.cpp create mode 100644 plugins/samplesink/sdrdaemonsink/udpsinkfecworker.h diff --git a/plugins/samplesink/sdrdaemonsink/CMakeLists.txt b/plugins/samplesink/sdrdaemonsink/CMakeLists.txt index a2e7e9e29..d6e7e4c2c 100644 --- a/plugins/samplesink/sdrdaemonsink/CMakeLists.txt +++ b/plugins/samplesink/sdrdaemonsink/CMakeLists.txt @@ -18,6 +18,7 @@ set(sdrdaemonsink_SOURCES sdrdaemonsinksettings.cpp sdrdaemonsinkthread.cpp udpsinkfec.cpp + udpsinkfecworker.cpp UDPSocket.cpp ) @@ -28,6 +29,7 @@ set(sdrdaemonsink_HEADERS sdrdaemonsinksettings.h sdrdaemonsinkthread.h udpsinkfec.h + udpsinkfecworker.h UDPSocket.h ) diff --git a/plugins/samplesink/sdrdaemonsink/udpsinkfec.cpp b/plugins/samplesink/sdrdaemonsink/udpsinkfec.cpp index 404644728..8916fa443 100644 --- a/plugins/samplesink/sdrdaemonsink/udpsinkfec.cpp +++ b/plugins/samplesink/sdrdaemonsink/udpsinkfec.cpp @@ -22,9 +22,7 @@ #include #include "udpsinkfec.h" - -MESSAGE_CLASS_DEFINITION(UDPSinkFECWorker::MsgUDPFECEncodeAndSend, Message) -MESSAGE_CLASS_DEFINITION(UDPSinkFECWorker::MsgConfigureRemoteAddress, Message) +#include "udpsinkfecworker.h" UDPSinkFEC::UDPSinkFEC() : @@ -199,139 +197,3 @@ void UDPSinkFEC::write(const SampleVector::iterator& begin, uint32_t sampleChunk } } -UDPSinkFECWorker::UDPSinkFECWorker() : - m_running(false), - m_remotePort(9090) -{ - m_cm256Valid = m_cm256.isInitialized(); - connect(&m_inputMessageQueue, SIGNAL(messageEnqueued()), this, SLOT(handleInputMessages()), Qt::DirectConnection); -} - -UDPSinkFECWorker::~UDPSinkFECWorker() -{ - disconnect(&m_inputMessageQueue, SIGNAL(messageEnqueued()), this, SLOT(handleInputMessages())); - m_inputMessageQueue.clear(); -} - -void UDPSinkFECWorker::pushTxFrame(SDRDaemonSuperBlock *txBlocks, - uint32_t nbBlocksFEC, - uint32_t txDelay, - uint16_t frameIndex) -{ - //qDebug("UDPSinkFECWorker::pushTxFrame. %d", m_inputMessageQueue.size()); - m_inputMessageQueue.push(MsgUDPFECEncodeAndSend::create(txBlocks, nbBlocksFEC, txDelay, frameIndex)); -} - -void UDPSinkFECWorker::setRemoteAddress(const QString& address, uint16_t port) -{ - m_inputMessageQueue.push(MsgConfigureRemoteAddress::create(address, port)); -} - -void UDPSinkFECWorker::process() -{ - m_running = true; - - qDebug("UDPSinkFECWorker::process: started"); - - while (m_running) - { - usleep(250000); - } - - qDebug("UDPSinkFECWorker::process: stopped"); - emit finished(); -} - -void UDPSinkFECWorker::stop() -{ - m_running = false; -} - -void UDPSinkFECWorker::handleInputMessages() -{ - Message* message; - - while ((message = m_inputMessageQueue.pop()) != 0) - { - if (MsgUDPFECEncodeAndSend::match(*message)) - { - MsgUDPFECEncodeAndSend *sendMsg = (MsgUDPFECEncodeAndSend *) message; - encodeAndTransmit(sendMsg->getTxBlocks(), sendMsg->getFrameIndex(), sendMsg->getNbBlocsFEC(), sendMsg->getTxDelay()); - } - else if (MsgConfigureRemoteAddress::match(*message)) - { - qDebug("UDPSinkFECWorker::handleInputMessages: %s", message->getIdentifier()); - MsgConfigureRemoteAddress *addressMsg = (MsgConfigureRemoteAddress *) message; - m_remoteAddress = addressMsg->getAddress(); - m_remotePort = addressMsg->getPort(); - } - - delete message; - } -} - -void UDPSinkFECWorker::encodeAndTransmit(SDRDaemonSuperBlock *txBlockx, uint16_t frameIndex, uint32_t nbBlocksFEC, uint32_t txDelay) -{ - CM256::cm256_encoder_params cm256Params; //!< Main interface with CM256 encoder - CM256::cm256_block descriptorBlocks[256]; //!< Pointers to data for CM256 encoder - SDRDaemonProtectedBlock fecBlocks[256]; //!< FEC data - - if ((nbBlocksFEC == 0) || !m_cm256Valid) - { - for (unsigned int i = 0; i < UDPSinkFEC::m_nbOriginalBlocks; i++) - { - m_socket.SendDataGram((const void *) &txBlockx[i], (int) UDPSinkFEC::m_udpSize, m_remoteAddress.toStdString(), (uint32_t) m_remotePort); - //m_udpSocket->writeDatagram((const char *) &txBlockx[i], (int) UDPSinkFEC::m_udpSize, m_remoteAddress, m_remotePort); - usleep(txDelay); - } - } - else - { - cm256Params.BlockBytes = sizeof(SDRDaemonProtectedBlock); - cm256Params.OriginalCount = UDPSinkFEC::m_nbOriginalBlocks; - cm256Params.RecoveryCount = nbBlocksFEC; - - - // Fill pointers to data - for (int i = 0; i < cm256Params.OriginalCount + cm256Params.RecoveryCount; ++i) - { - if (i >= cm256Params.OriginalCount) { - memset((char *) &txBlockx[i].m_protectedBlock, 0, sizeof(SDRDaemonProtectedBlock)); - } - - txBlockx[i].m_header.m_frameIndex = frameIndex; - txBlockx[i].m_header.m_blockIndex = i; - txBlockx[i].m_header.m_sampleBytes = (SDR_RX_SAMP_SZ <= 16 ? 2 : 4); - txBlockx[i].m_header.m_sampleBits = SDR_RX_SAMP_SZ; - descriptorBlocks[i].Block = (void *) &(txBlockx[i].m_protectedBlock); - descriptorBlocks[i].Index = txBlockx[i].m_header.m_blockIndex; - } - - // Encode FEC blocks - if (m_cm256.cm256_encode(cm256Params, descriptorBlocks, fecBlocks)) - { - qDebug("UDPSinkFECWorker::encodeAndTransmit: CM256 encode failed. No transmission."); - return; - } - - // Merge FEC with data to transmit - for (int i = 0; i < cm256Params.RecoveryCount; i++) - { - txBlockx[i + cm256Params.OriginalCount].m_protectedBlock = fecBlocks[i]; - } - - // Transmit all blocks - - for (int i = 0; i < cm256Params.OriginalCount + cm256Params.RecoveryCount; i++) - { -#ifdef SDRDAEMON_PUNCTURE - if (i == SDRDAEMON_PUNCTURE) { - continue; - } -#endif - - m_socket.SendDataGram((const void *) &txBlockx[i], (int) UDPSinkFEC::m_udpSize, m_remoteAddress.toStdString(), (uint32_t) m_remotePort); - usleep(txDelay); - } - } -} diff --git a/plugins/samplesink/sdrdaemonsink/udpsinkfec.h b/plugins/samplesink/sdrdaemonsink/udpsinkfec.h index 308e5db39..2aea27916 100644 --- a/plugins/samplesink/sdrdaemonsink/udpsinkfec.h +++ b/plugins/samplesink/sdrdaemonsink/udpsinkfec.h @@ -25,16 +25,10 @@ #include #include -#include "cm256.h" - #include "dsp/dsptypes.h" #include "util/CRC64.h" -#include "util/messagequeue.h" -#include "util/message.h" #include "channel/sdrdaemondatablock.h" -#include "UDPSocket.h" - class UDPSinkFECWorker; class UDPSinkFEC : public QObject @@ -105,101 +99,4 @@ private: UDPSinkFECWorker *m_udpWorker; }; - -class UDPSinkFECWorker : public QObject -{ - Q_OBJECT -public: - class MsgUDPFECEncodeAndSend : public Message - { - MESSAGE_CLASS_DECLARATION - public: - SDRDaemonSuperBlock *getTxBlocks() const { return m_txBlockx; } - uint32_t getNbBlocsFEC() const { return m_nbBlocksFEC; } - uint32_t getTxDelay() const { return m_txDelay; } - uint16_t getFrameIndex() const { return m_frameIndex; } - - static MsgUDPFECEncodeAndSend* create( - SDRDaemonSuperBlock *txBlocks, - uint32_t nbBlocksFEC, - uint32_t txDelay, - uint16_t frameIndex) - { - return new MsgUDPFECEncodeAndSend(txBlocks, nbBlocksFEC, txDelay, frameIndex); - } - - private: - SDRDaemonSuperBlock *m_txBlockx; - uint32_t m_nbBlocksFEC; - uint32_t m_txDelay; - uint16_t m_frameIndex; - - MsgUDPFECEncodeAndSend( - SDRDaemonSuperBlock *txBlocks, - uint32_t nbBlocksFEC, - uint32_t txDelay, - uint16_t frameIndex) : - m_txBlockx(txBlocks), - m_nbBlocksFEC(nbBlocksFEC), - m_txDelay(txDelay), - m_frameIndex(frameIndex) - {} - }; - - class MsgConfigureRemoteAddress : public Message - { - MESSAGE_CLASS_DECLARATION - public: - const QString& getAddress() const { return m_address; } - uint16_t getPort() const { return m_port; } - - static MsgConfigureRemoteAddress* create(const QString& address, uint16_t port) - { - return new MsgConfigureRemoteAddress(address, port); - } - - private: - QString m_address; - uint16_t m_port; - - MsgConfigureRemoteAddress(const QString& address, uint16_t port) : - m_address(address), - m_port(port) - {} - }; - - UDPSinkFECWorker(); - ~UDPSinkFECWorker(); - - void pushTxFrame(SDRDaemonSuperBlock *txBlocks, - uint32_t nbBlocksFEC, - uint32_t txDelay, - uint16_t frameIndex); - void setRemoteAddress(const QString& address, uint16_t port); - void stop(); - - MessageQueue m_inputMessageQueue; //!< Queue for asynchronous inbound communication - -signals: - void finished(); - -public slots: - void process(); - -private slots: - void handleInputMessages(); - -private: - void encodeAndTransmit(SDRDaemonSuperBlock *txBlockx, uint16_t frameIndex, uint32_t nbBlocksFEC, uint32_t txDelay); - - volatile bool m_running; - CM256 m_cm256; //!< CM256 library object - bool m_cm256Valid; //!< true if CM256 library is initialized correctly - UDPSocket m_socket; - QString m_remoteAddress; - uint16_t m_remotePort; -}; - - - #endif /* PLUGINS_SAMPLESINK_SDRDAEMONSINK_UDPSINKFEC_H_ */ diff --git a/plugins/samplesink/sdrdaemonsink/udpsinkfecworker.cpp b/plugins/samplesink/sdrdaemonsink/udpsinkfecworker.cpp new file mode 100644 index 000000000..7ced93f6d --- /dev/null +++ b/plugins/samplesink/sdrdaemonsink/udpsinkfecworker.cpp @@ -0,0 +1,161 @@ +/////////////////////////////////////////////////////////////////////////////////// +// Copyright (C) 2017 Edouard Griffiths, F4EXB // +// // +// This program is free software; you can redistribute it and/or modify // +// it under the terms of the GNU General Public License as published by // +// the Free Software Foundation as version 3 of the License, or // +// // +// This program is distributed in the hope that it will be useful, // +// but WITHOUT ANY WARRANTY; without even the implied warranty of // +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // +// GNU General Public License V3 for more details. // +// // +// You should have received a copy of the GNU General Public License // +// along with this program. If not, see . // +/////////////////////////////////////////////////////////////////////////////////// + +#include "udpsinkfecworker.h" + +MESSAGE_CLASS_DEFINITION(UDPSinkFECWorker::MsgUDPFECEncodeAndSend, Message) +MESSAGE_CLASS_DEFINITION(UDPSinkFECWorker::MsgConfigureRemoteAddress, Message) + +UDPSinkFECWorker::UDPSinkFECWorker() : + m_running(false), + m_remotePort(9090) +{ + m_cm256Valid = m_cm256.isInitialized(); + connect(&m_inputMessageQueue, SIGNAL(messageEnqueued()), this, SLOT(handleInputMessages()), Qt::DirectConnection); +} + +UDPSinkFECWorker::~UDPSinkFECWorker() +{ + disconnect(&m_inputMessageQueue, SIGNAL(messageEnqueued()), this, SLOT(handleInputMessages())); + m_inputMessageQueue.clear(); +} + +void UDPSinkFECWorker::pushTxFrame(SDRDaemonSuperBlock *txBlocks, + uint32_t nbBlocksFEC, + uint32_t txDelay, + uint16_t frameIndex) +{ + //qDebug("UDPSinkFECWorker::pushTxFrame. %d", m_inputMessageQueue.size()); + m_inputMessageQueue.push(MsgUDPFECEncodeAndSend::create(txBlocks, nbBlocksFEC, txDelay, frameIndex)); +} + +void UDPSinkFECWorker::setRemoteAddress(const QString& address, uint16_t port) +{ + m_inputMessageQueue.push(MsgConfigureRemoteAddress::create(address, port)); +} + +void UDPSinkFECWorker::process() +{ + m_running = true; + + qDebug("UDPSinkFECWorker::process: started"); + + while (m_running) + { + usleep(250000); + } + + qDebug("UDPSinkFECWorker::process: stopped"); + emit finished(); +} + +void UDPSinkFECWorker::stop() +{ + m_running = false; +} + +void UDPSinkFECWorker::handleInputMessages() +{ + Message* message; + + while ((message = m_inputMessageQueue.pop()) != 0) + { + if (MsgUDPFECEncodeAndSend::match(*message)) + { + MsgUDPFECEncodeAndSend *sendMsg = (MsgUDPFECEncodeAndSend *) message; + encodeAndTransmit(sendMsg->getTxBlocks(), sendMsg->getFrameIndex(), sendMsg->getNbBlocsFEC(), sendMsg->getTxDelay()); + } + else if (MsgConfigureRemoteAddress::match(*message)) + { + qDebug("UDPSinkFECWorker::handleInputMessages: %s", message->getIdentifier()); + MsgConfigureRemoteAddress *addressMsg = (MsgConfigureRemoteAddress *) message; + m_remoteAddress = addressMsg->getAddress(); + m_remotePort = addressMsg->getPort(); + } + + delete message; + } +} + +void UDPSinkFECWorker::encodeAndTransmit(SDRDaemonSuperBlock *txBlockx, uint16_t frameIndex, uint32_t nbBlocksFEC, uint32_t txDelay) +{ + CM256::cm256_encoder_params cm256Params; //!< Main interface with CM256 encoder + CM256::cm256_block descriptorBlocks[256]; //!< Pointers to data for CM256 encoder + SDRDaemonProtectedBlock fecBlocks[256]; //!< FEC data + + if ((nbBlocksFEC == 0) || !m_cm256Valid) + { + for (unsigned int i = 0; i < SDRDaemonNbOrginalBlocks; i++) + { + m_socket.SendDataGram((const void *) &txBlockx[i], (int) SDRDaemonUdpSize, m_remoteAddress.toStdString(), (uint32_t) m_remotePort); + //m_udpSocket->writeDatagram((const char *) &txBlockx[i], (int) UDPSinkFEC::m_udpSize, m_remoteAddress, m_remotePort); + usleep(txDelay); + } + } + else + { + cm256Params.BlockBytes = sizeof(SDRDaemonProtectedBlock); + cm256Params.OriginalCount = SDRDaemonNbOrginalBlocks; + cm256Params.RecoveryCount = nbBlocksFEC; + + + // Fill pointers to data + for (int i = 0; i < cm256Params.OriginalCount + cm256Params.RecoveryCount; ++i) + { + if (i >= cm256Params.OriginalCount) { + memset((char *) &txBlockx[i].m_protectedBlock, 0, sizeof(SDRDaemonProtectedBlock)); + } + + txBlockx[i].m_header.m_frameIndex = frameIndex; + txBlockx[i].m_header.m_blockIndex = i; + txBlockx[i].m_header.m_sampleBytes = (SDR_RX_SAMP_SZ <= 16 ? 2 : 4); + txBlockx[i].m_header.m_sampleBits = SDR_RX_SAMP_SZ; + descriptorBlocks[i].Block = (void *) &(txBlockx[i].m_protectedBlock); + descriptorBlocks[i].Index = txBlockx[i].m_header.m_blockIndex; + } + + // Encode FEC blocks + if (m_cm256.cm256_encode(cm256Params, descriptorBlocks, fecBlocks)) + { + qDebug("UDPSinkFECWorker::encodeAndTransmit: CM256 encode failed. No transmission."); + return; + } + + // Merge FEC with data to transmit + for (int i = 0; i < cm256Params.RecoveryCount; i++) + { + txBlockx[i + cm256Params.OriginalCount].m_protectedBlock = fecBlocks[i]; + } + + // Transmit all blocks + + for (int i = 0; i < cm256Params.OriginalCount + cm256Params.RecoveryCount; i++) + { +#ifdef SDRDAEMON_PUNCTURE + if (i == SDRDAEMON_PUNCTURE) { + continue; + } +#endif + + m_socket.SendDataGram((const void *) &txBlockx[i], (int) SDRDaemonUdpSize, m_remoteAddress.toStdString(), (uint32_t) m_remotePort); + usleep(txDelay); + } + } +} + + + + diff --git a/plugins/samplesink/sdrdaemonsink/udpsinkfecworker.h b/plugins/samplesink/sdrdaemonsink/udpsinkfecworker.h new file mode 100644 index 000000000..6ff53d30f --- /dev/null +++ b/plugins/samplesink/sdrdaemonsink/udpsinkfecworker.h @@ -0,0 +1,124 @@ +/////////////////////////////////////////////////////////////////////////////////// +// Copyright (C) 2017 Edouard Griffiths, F4EXB // +// // +// This program is free software; you can redistribute it and/or modify // +// it under the terms of the GNU General Public License as published by // +// the Free Software Foundation as version 3 of the License, or // +// // +// This program is distributed in the hope that it will be useful, // +// but WITHOUT ANY WARRANTY; without even the implied warranty of // +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // +// GNU General Public License V3 for more details. // +// // +// You should have received a copy of the GNU General Public License // +// along with this program. If not, see . // +/////////////////////////////////////////////////////////////////////////////////// + +#ifndef PLUGINS_SAMPLESINK_SDRDAEMONSINK_UDPSINKFECWORKER_H_ +#define PLUGINS_SAMPLESINK_SDRDAEMONSINK_UDPSINKFECWORKER_H_ + +#include + +#include "cm256.h" + +#include "util/messagequeue.h" +#include "util/message.h" +#include "channel/sdrdaemondatablock.h" + +#include "UDPSocket.h" + +class UDPSinkFECWorker : public QObject +{ + Q_OBJECT +public: + class MsgUDPFECEncodeAndSend : public Message + { + MESSAGE_CLASS_DECLARATION + public: + SDRDaemonSuperBlock *getTxBlocks() const { return m_txBlockx; } + uint32_t getNbBlocsFEC() const { return m_nbBlocksFEC; } + uint32_t getTxDelay() const { return m_txDelay; } + uint16_t getFrameIndex() const { return m_frameIndex; } + + static MsgUDPFECEncodeAndSend* create( + SDRDaemonSuperBlock *txBlocks, + uint32_t nbBlocksFEC, + uint32_t txDelay, + uint16_t frameIndex) + { + return new MsgUDPFECEncodeAndSend(txBlocks, nbBlocksFEC, txDelay, frameIndex); + } + + private: + SDRDaemonSuperBlock *m_txBlockx; + uint32_t m_nbBlocksFEC; + uint32_t m_txDelay; + uint16_t m_frameIndex; + + MsgUDPFECEncodeAndSend( + SDRDaemonSuperBlock *txBlocks, + uint32_t nbBlocksFEC, + uint32_t txDelay, + uint16_t frameIndex) : + m_txBlockx(txBlocks), + m_nbBlocksFEC(nbBlocksFEC), + m_txDelay(txDelay), + m_frameIndex(frameIndex) + {} + }; + + class MsgConfigureRemoteAddress : public Message + { + MESSAGE_CLASS_DECLARATION + public: + const QString& getAddress() const { return m_address; } + uint16_t getPort() const { return m_port; } + + static MsgConfigureRemoteAddress* create(const QString& address, uint16_t port) + { + return new MsgConfigureRemoteAddress(address, port); + } + + private: + QString m_address; + uint16_t m_port; + + MsgConfigureRemoteAddress(const QString& address, uint16_t port) : + m_address(address), + m_port(port) + {} + }; + + UDPSinkFECWorker(); + ~UDPSinkFECWorker(); + + void pushTxFrame(SDRDaemonSuperBlock *txBlocks, + uint32_t nbBlocksFEC, + uint32_t txDelay, + uint16_t frameIndex); + void setRemoteAddress(const QString& address, uint16_t port); + void stop(); + + MessageQueue m_inputMessageQueue; //!< Queue for asynchronous inbound communication + +signals: + void finished(); + +public slots: + void process(); + +private slots: + void handleInputMessages(); + +private: + void encodeAndTransmit(SDRDaemonSuperBlock *txBlockx, uint16_t frameIndex, uint32_t nbBlocksFEC, uint32_t txDelay); + + volatile bool m_running; + CM256 m_cm256; //!< CM256 library object + bool m_cm256Valid; //!< true if CM256 library is initialized correctly + UDPSocket m_socket; + QString m_remoteAddress; + uint16_t m_remotePort; +}; + +#endif /* PLUGINS_SAMPLESINK_SDRDAEMONSINK_UDPSINKFECWORKER_H_ */