diff --git a/plugins/samplesink/remoteoutput/CMakeLists.txt b/plugins/samplesink/remoteoutput/CMakeLists.txt index ac420d660..09e9b0a20 100644 --- a/plugins/samplesink/remoteoutput/CMakeLists.txt +++ b/plugins/samplesink/remoteoutput/CMakeLists.txt @@ -16,7 +16,8 @@ set(remoteoutput_SOURCES remoteoutputwebapiadapter.cpp remoteoutputthread.cpp udpsinkfec.cpp - udpsinkfecworker.cpp + remoteoutputsender.cpp + remoteoutputfifo.cpp ) set(remoteoutput_HEADERS @@ -26,28 +27,27 @@ set(remoteoutput_HEADERS remoteoutputwebapiadapter.h remoteoutputthread.h udpsinkfec.h - udpsinkfecworker.h + remoteoutputsender.h + remoteoutputfifo.h ) include_directories( - ${CMAKE_SOURCE_DIR}/swagger/sdrangel/code/qt5/client + ${CMAKE_SOURCE_DIR}/swagger/sdrangel/code/qt5/client ${CMAKE_SOURCE_DIR}/devices - ${Boost_INCLUDE_DIRS} - ${CM256CC_INCLUDE_DIR} + ${Boost_INCLUDE_DIRS} + ${CM256CC_INCLUDE_DIR} ) if(NOT SERVER_MODE) set(remoteoutput_SOURCES ${remoteoutput_SOURCES} remoteoutputgui.cpp - remoteoutputgui.ui ) set(remoteoutput_HEADERS ${remoteoutput_HEADERS} remoteoutputgui.h ) - set(TARGET_NAME outputremote) set(TARGET_LIB "Qt5::Widgets") set(TARGET_LIB_GUI "sdrgui") @@ -68,12 +68,12 @@ if(ENABLE_EXTERNAL_LIBRARIES) endif() target_link_libraries(${TARGET_NAME} - Qt5::Core - ${TARGET_LIB} + Qt5::Core + ${TARGET_LIB} sdrbase ${TARGET_LIB_GUI} - swagger - ${CM256CC_LIBRARIES} + swagger + ${CM256CC_LIBRARIES} ) install(TARGETS ${TARGET_NAME} DESTINATION ${INSTALL_FOLDER}) diff --git a/plugins/samplesink/remoteoutput/remoteoutputfifo.cpp b/plugins/samplesink/remoteoutput/remoteoutputfifo.cpp new file mode 100644 index 000000000..8beee691e --- /dev/null +++ b/plugins/samplesink/remoteoutput/remoteoutputfifo.cpp @@ -0,0 +1,95 @@ +/////////////////////////////////////////////////////////////////////////////////// +// Copyright (C) 2019 Edouard Griffiths, F4EXB // +// // +// This program is free software; you can redistribute it and/or modify // +// it under the terms of the GNU General Public License as published by // +// the Free Software Foundation as version 3 of the License, or // +// (at your option) any later version. // +// // +// This program is distributed in the hope that it will be useful, // +// but WITHOUT ANY WARRANTY; without even the implied warranty of // +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // +// GNU General Public License V3 for more details. // +// // +// You should have received a copy of the GNU General Public License // +// along with this program. If not, see . // +/////////////////////////////////////////////////////////////////////////////////// + +#include "remoteoutputfifo.h" + +RemoteOutputFifo::RemoteOutputFifo(QObject *parent) : + QObject(parent) +{} + +RemoteOutputFifo::RemoteOutputFifo(unsigned int size, QObject *parent) : + QObject(parent) +{ + resize(size); +} + +RemoteOutputFifo::~RemoteOutputFifo() +{} + +void RemoteOutputFifo::resize(unsigned int size) +{ + QMutexLocker mutexLocker(&m_mutex); + m_size = size; + m_data.resize(m_size); + m_readHead = 0; + m_servedHead = 0; + m_writeHead = 0; +} + +void RemoteOutputFifo::reset() +{ + m_readHead = 0; + m_servedHead = 0; + m_writeHead = 0; +} + +RemoteDataBlock *RemoteOutputFifo::getDataBlock() +{ + QMutexLocker mutexLocker(&m_mutex); + m_servedHead = m_writeHead; + + if (m_writeHead < m_size - 1) { + m_writeHead++; + } else { + m_writeHead = 0; + } + + emit dataBlockServed(); + return &m_data[m_servedHead]; +} + +unsigned int RemoteOutputFifo::readDataBlock(RemoteDataBlock **dataBlock) +{ + QMutexLocker mutexLocker(&m_mutex); + + if (calculateRemainder() == 0) + { + *dataBlock = nullptr; + return 0; + } + else + { + *dataBlock = &m_data[m_readHead]; + m_readHead = m_readHead < m_size - 1 ? m_readHead + 1 : 0; + return calculateRemainder(); + } +} + +unsigned int RemoteOutputFifo::getRemainder() +{ + QMutexLocker mutexLocker(&m_mutex); + return calculateRemainder(); +} + +unsigned int RemoteOutputFifo::calculateRemainder() +{ + if (m_readHead <= m_servedHead) { + return m_servedHead - m_readHead; + } else { + return m_size - (m_readHead - m_servedHead); + } +} \ No newline at end of file diff --git a/plugins/samplesink/remoteoutput/remoteoutputfifo.h b/plugins/samplesink/remoteoutput/remoteoutputfifo.h new file mode 100644 index 000000000..113c779bf --- /dev/null +++ b/plugins/samplesink/remoteoutput/remoteoutputfifo.h @@ -0,0 +1,55 @@ +/////////////////////////////////////////////////////////////////////////////////// +// Copyright (C) 2019 Edouard Griffiths, F4EXB // +// // +// This program is free software; you can redistribute it and/or modify // +// it under the terms of the GNU General Public License as published by // +// the Free Software Foundation as version 3 of the License, or // +// (at your option) any later version. // +// // +// This program is distributed in the hope that it will be useful, // +// but WITHOUT ANY WARRANTY; without even the implied warranty of // +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // +// GNU General Public License V3 for more details. // +// // +// You should have received a copy of the GNU General Public License // +// along with this program. If not, see . // +/////////////////////////////////////////////////////////////////////////////////// + +#ifndef REMOTESINK_REMOTEOUTPUTFIFO_H_ +#define REMOTESINK_REMOTEOUTPUTFIFO_H_ + +#include + +#include +#include + +#include "channel/remotedatablock.h" + +class RemoteOutputFifo : public QObject { + Q_OBJECT +public: + RemoteOutputFifo(QObject *parent = nullptr); + RemoteOutputFifo(unsigned int size, QObject *parent = nullptr); + ~RemoteOutputFifo(); + void resize(unsigned int size); + void reset(); + + RemoteDataBlock *getDataBlock(); + unsigned int readDataBlock(RemoteDataBlock **dataBlock); + unsigned int getRemainder(); + +signals: + void dataBlockServed(); + +private: + std::vector m_data; + int m_size; + unsigned int m_readHead; //!< index of last data block processed + unsigned int m_servedHead; //!< index of last data block served + unsigned int m_writeHead; //!< index of next data block to serve + QMutex m_mutex; + + unsigned int calculateRemainder(); +}; + +#endif // REMOTESINK_REMOTEOUTPUTFIFO_H_ \ No newline at end of file diff --git a/plugins/samplesink/remoteoutput/remoteoutputplugin.cpp b/plugins/samplesink/remoteoutput/remoteoutputplugin.cpp index 2aa09ae6d..446f222b1 100644 --- a/plugins/samplesink/remoteoutput/remoteoutputplugin.cpp +++ b/plugins/samplesink/remoteoutput/remoteoutputplugin.cpp @@ -30,7 +30,7 @@ const PluginDescriptor RemoteOutputPlugin::m_pluginDescriptor = { QString("Remote device output"), - QString("4.12.0"), + QString("4.12.2"), QString("(c) Edouard Griffiths, F4EXB"), QString("https://github.com/f4exb/sdrangel"), true, diff --git a/plugins/samplesink/remoteoutput/remoteoutputsender.cpp b/plugins/samplesink/remoteoutput/remoteoutputsender.cpp new file mode 100644 index 000000000..87380fdf8 --- /dev/null +++ b/plugins/samplesink/remoteoutput/remoteoutputsender.cpp @@ -0,0 +1,165 @@ +/////////////////////////////////////////////////////////////////////////////////// +// Copyright (C) 2018-2019 Edouard Griffiths, F4EXB. // +// // +// Remote sink channel (Rx) UDP sender thread // +// // +// SDRangel can work as a detached SDR front end. With this plugin it can // +// sends the I/Q samples stream to another SDRangel instance via UDP. // +// It is controlled via a Web REST API. // +// // +// This program is free software; you can redistribute it and/or modify // +// it under the terms of the GNU General Public License as published by // +// the Free Software Foundation as version 3 of the License, or // +// (at your option) any later version. // +// // +// This program is distributed in the hope that it will be useful, // +// but WITHOUT ANY WARRANTY; without even the implied warranty of // +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // +// GNU General Public License V3 for more details. // +// // +// You should have received a copy of the GNU General Public License // +// along with this program. If not, see . // +/////////////////////////////////////////////////////////////////////////////////// + + +#include +#include +#include +#include + +#include + +#include "cm256cc/cm256.h" + +#include "channel/remotedatablock.h" +#include "remoteoutputsender.h" + +RemoteOutputSender::RemoteOutputSender() : + m_fifo(20, this), + m_udpSocket(nullptr), + m_remotePort(9090) +{ + qDebug("RemoteOutputSender::RemoteOutputSender"); + m_cm256p = m_cm256.isInitialized() ? &m_cm256 : nullptr; + m_udpSocket = new QUdpSocket(this); + + QObject::connect( + &m_fifo, + &RemoteOutputFifo::dataBlockServed, + this, + &RemoteOutputSender::handleData, + Qt::QueuedConnection + ); +} + +RemoteOutputSender::~RemoteOutputSender() +{ + qDebug("RemoteOutputSender::~RemoteOutputSender"); + delete m_udpSocket; +} + +void RemoteOutputSender::setDestination(const QString& address, uint16_t port) +{ + m_remoteAddress = address; + m_remotePort = port; + m_remoteHostAddress.setAddress(address); +} + +RemoteDataBlock *RemoteOutputSender::getDataBlock() +{ + return m_fifo.getDataBlock(); +} + +void RemoteOutputSender::handleData() +{ + RemoteDataBlock *dataBlock; + unsigned int remainder = m_fifo.getRemainder(); + + while (remainder != 0) + { + remainder = m_fifo.readDataBlock(&dataBlock); + + if (dataBlock) { + sendDataBlock(dataBlock); + } + } +} + +void RemoteOutputSender::sendDataBlock(RemoteDataBlock *dataBlock) +{ + CM256::cm256_encoder_params cm256Params; //!< Main interface with CM256 encoder + CM256::cm256_block descriptorBlocks[256]; //!< Pointers to data for CM256 encoder + RemoteProtectedBlock fecBlocks[256]; //!< FEC data + + uint16_t frameIndex = dataBlock->m_txControlBlock.m_frameIndex; + int nbBlocksFEC = dataBlock->m_txControlBlock.m_nbBlocksFEC; + int txDelay = dataBlock->m_txControlBlock.m_txDelay; + m_remoteHostAddress.setAddress(dataBlock->m_txControlBlock.m_dataAddress); + uint16_t dataPort = dataBlock->m_txControlBlock.m_dataPort; + RemoteSuperBlock *txBlockx = dataBlock->m_superBlocks; + + if ((nbBlocksFEC == 0) || !m_cm256p) // Do not FEC encode + { + if (m_udpSocket) + { + for (int i = 0; i < RemoteNbOrginalBlocks; i++) + { + // send block via UDP + m_udpSocket->writeDatagram((const char*)&txBlockx[i], (qint64 ) RemoteUdpSize, m_remoteHostAddress, dataPort); + std::this_thread::sleep_for(std::chrono::microseconds(txDelay)); + } + } + } + else + { + cm256Params.BlockBytes = sizeof(RemoteProtectedBlock); + cm256Params.OriginalCount = RemoteNbOrginalBlocks; + cm256Params.RecoveryCount = nbBlocksFEC; + + // Fill pointers to data + for (int i = 0; i < cm256Params.OriginalCount + cm256Params.RecoveryCount; ++i) + { + if (i >= cm256Params.OriginalCount) { + memset((void *) &txBlockx[i].m_protectedBlock, 0, sizeof(RemoteProtectedBlock)); + } + + txBlockx[i].m_header.m_frameIndex = frameIndex; + txBlockx[i].m_header.m_blockIndex = i; + txBlockx[i].m_header.m_sampleBytes = (SDR_RX_SAMP_SZ <= 16 ? 2 : 4); + txBlockx[i].m_header.m_sampleBits = SDR_RX_SAMP_SZ; + descriptorBlocks[i].Block = (void *) &(txBlockx[i].m_protectedBlock); + descriptorBlocks[i].Index = txBlockx[i].m_header.m_blockIndex; + } + + // Encode FEC blocks + if (m_cm256p->cm256_encode(cm256Params, descriptorBlocks, fecBlocks)) + { + qWarning("RemoteSinkSender::handleDataBlock: CM256 encode failed. Transmit without FEC."); + cm256Params.RecoveryCount = 0; + RemoteSuperBlock& superBlock = dataBlock->m_superBlocks[0]; // first block + RemoteMetaDataFEC *destMeta = (RemoteMetaDataFEC *) &superBlock.m_protectedBlock; + destMeta->m_nbFECBlocks = 0; + boost::crc_32_type crc32; + crc32.process_bytes(destMeta, sizeof(RemoteMetaDataFEC)-4); + destMeta->m_crc32 = crc32.checksum(); + } + + // Merge FEC with data to transmit + for (int i = 0; i < cm256Params.RecoveryCount; i++) { + txBlockx[i + cm256Params.OriginalCount].m_protectedBlock = fecBlocks[i]; + } + + // Transmit all blocks + if (m_udpSocket) + { + for (int i = 0; i < cm256Params.OriginalCount + cm256Params.RecoveryCount; i++) + { + // send block via UDP + m_udpSocket->writeDatagram((const char*)&txBlockx[i], (qint64 ) RemoteUdpSize, m_remoteHostAddress, dataPort); + std::this_thread::sleep_for(std::chrono::microseconds(txDelay)); + } + } + } + + dataBlock->m_txControlBlock.m_processed = true; +} diff --git a/plugins/samplesink/remoteoutput/remoteoutputsender.h b/plugins/samplesink/remoteoutput/remoteoutputsender.h new file mode 100644 index 000000000..a7ec2e369 --- /dev/null +++ b/plugins/samplesink/remoteoutput/remoteoutputsender.h @@ -0,0 +1,71 @@ +/////////////////////////////////////////////////////////////////////////////////// +// Copyright (C) 2018-2019 Edouard Griffiths, F4EXB. // +// // +// Remote sink channel (Rx) UDP sender thread // +// // +// SDRangel can work as a detached SDR front end. With this plugin it can // +// sends the I/Q samples stream to another SDRangel instance via UDP. // +// It is controlled via a Web REST API. // +// // +// This program is free software; you can redistribute it and/or modify // +// it under the terms of the GNU General Public License as published by // +// the Free Software Foundation as version 3 of the License, or // +// (at your option) any later version. // +// // +// This program is distributed in the hope that it will be useful, // +// but WITHOUT ANY WARRANTY; without even the implied warranty of // +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // +// GNU General Public License V3 for more details. // +// // +// You should have received a copy of the GNU General Public License // +// along with this program. If not, see . // +/////////////////////////////////////////////////////////////////////////////////// + +#ifndef REMOTEOUTPUT_REMOTEOUTPUTSENDER_H_ +#define REMOTEOUTPUT_REMOTEOUTPUTSENDER_H_ + +#include +#include +#include +#include + +#include "cm256cc/cm256.h" + +#include "util/message.h" +#include "util/messagequeue.h" + +#include "remoteoutputfifo.h" + +class RemoteDataBlock; +class CM256; +class QUdpSocket; + +class RemoteOutputSender : public QObject { + Q_OBJECT + +public: + RemoteOutputSender(); + ~RemoteOutputSender(); + + RemoteDataBlock *getDataBlock(); + void setDestination(const QString& address, uint16_t port); + +private: + RemoteOutputFifo m_fifo; + CM256 m_cm256; //!< CM256 library object + CM256 *m_cm256p; + bool m_cm256Valid; //!< true if CM256 library is initialized correctly + + QUdpSocket *m_udpSocket; + QString m_remoteAddress; + uint16_t m_remotePort; + QHostAddress m_remoteHostAddress; + + void sendDataBlock(RemoteDataBlock *dataBlock); + +private slots: + void handleData(); +}; + +#endif // REMOTEOUTPUT_REMOTEOUTPUTSENDER_H_ + diff --git a/plugins/samplesink/remoteoutput/remoteoutputthread.cpp b/plugins/samplesink/remoteoutput/remoteoutputthread.cpp index 26fe10fff..8e907b2ea 100644 --- a/plugins/samplesink/remoteoutput/remoteoutputthread.cpp +++ b/plugins/samplesink/remoteoutput/remoteoutputthread.cpp @@ -49,7 +49,7 @@ RemoteOutputThread::~RemoteOutputThread() void RemoteOutputThread::startWork() { qDebug() << "RemoteOutputThread::startWork: "; - m_udpSinkFEC.start(); + m_udpSinkFEC.startSender(); m_maxThrottlems = 0; m_startWaitMutex.lock(); m_elapsedTimer.start(); @@ -64,7 +64,7 @@ void RemoteOutputThread::stopWork() qDebug() << "RemoteOutputThread::stopWork"; m_running = false; wait(); - m_udpSinkFEC.stop(); + m_udpSinkFEC.stopSender(); } void RemoteOutputThread::setSamplerate(int samplerate) diff --git a/plugins/samplesink/remoteoutput/udpsinkfec.cpp b/plugins/samplesink/remoteoutput/udpsinkfec.cpp index 619833769..ed7152eb1 100644 --- a/plugins/samplesink/remoteoutput/udpsinkfec.cpp +++ b/plugins/samplesink/remoteoutput/udpsinkfec.cpp @@ -17,6 +17,7 @@ #include "udpsinkfec.h" +#include #include #include @@ -24,8 +25,7 @@ #include "util/timeutil.h" -#include "udpsinkfecworker.h" - +#include "remoteoutputsender.h" UDPSinkFEC::UDPSinkFEC() : m_sampleRate(48000), @@ -33,42 +33,42 @@ UDPSinkFEC::UDPSinkFEC() : m_nbBlocksFEC(0), m_txDelayRatio(0.0), m_txDelay(0), + m_dataBlock(nullptr), m_txBlockIndex(0), m_txBlocksIndex(0), m_frameCount(0), m_sampleIndex(0), - m_udpWorker(0), + m_remoteOutputSender(nullptr), + m_senderThread(nullptr), m_remoteAddress("127.0.0.1"), m_remotePort(9090) { - memset((char *) m_txBlocks, 0, 4*256*sizeof(RemoteSuperBlock)); memset((char *) &m_superBlock, 0, sizeof(RemoteSuperBlock)); m_currentMetaFEC.init(); - m_bufMeta = new uint8_t[m_udpSize]; - m_buf = new uint8_t[m_udpSize]; + + m_senderThread = new QThread(this); + m_remoteOutputSender = new RemoteOutputSender(); + m_remoteOutputSender->moveToThread(m_senderThread); } UDPSinkFEC::~UDPSinkFEC() { - delete[] m_buf; - delete[] m_bufMeta; + delete m_remoteOutputSender; + delete m_senderThread; } -void UDPSinkFEC::start() +void UDPSinkFEC::startSender() { - m_udpWorker = new UDPSinkFECWorker(); - m_udpWorker->setRemoteAddress(m_remoteAddress, m_remotePort); - m_udpWorker->startStop(true); + qDebug("UDPSinkFEC::startSender"); + m_remoteOutputSender->setDestination(m_remoteAddress, m_remotePort); + m_senderThread->start(); } -void UDPSinkFEC::stop() +void UDPSinkFEC::stopSender() { - if (m_udpWorker) - { - m_udpWorker->startStop(false); - m_udpWorker->deleteLater(); - m_udpWorker = 0; - } + qDebug("UDPSinkFEC::stopSender"); + m_senderThread->exit(); + m_senderThread->wait(); } void UDPSinkFEC::setTxDelay(float txDelayRatio) @@ -78,10 +78,14 @@ void UDPSinkFEC::setTxDelay(float txDelayRatio) // divided by sample rate gives the frame process time // divided by the number of actual blocks including FEC blocks gives the block (i.e. UDP block) process time m_txDelayRatio = txDelayRatio; - int samplesPerBlock = RemoteNbBytesPerBlock / (SDR_RX_SAMP_SZ <= 16 ? 4 : 8); - double delay = ((127*samplesPerBlock*txDelayRatio) / m_sampleRate)/(128 + m_nbBlocksFEC); + int samplesPerBlock = RemoteNbBytesPerBlock / sizeof(Sample); + double delay = m_sampleRate == 0 ? 1.0 : (127*samplesPerBlock*txDelayRatio) / m_sampleRate; + delay /= 128 + m_nbBlocksFEC; m_txDelay = delay * 1e6; - qDebug() << "UDPSinkFEC::setTxDelay: txDelay: " << txDelayRatio << " m_txDelay: " << m_txDelay << " us"; + qDebug() << "UDPSinkFEC::setTxDelay:" + << "txDelay:" << txDelayRatio + << "m_txDelay:" << m_txDelay << " us" + << "m_sampleRate:" << m_sampleRate; } void UDPSinkFEC::setNbBlocksFEC(uint32_t nbBlocksFEC) @@ -103,49 +107,49 @@ void UDPSinkFEC::setRemoteAddress(const QString& address, uint16_t port) qDebug() << "UDPSinkFEC::setRemoteAddress: address: " << address << " port: " << port; m_remoteAddress = address; m_remotePort = port; - - if (m_udpWorker) { - m_udpWorker->setRemoteAddress(m_remoteAddress, m_remotePort); - } + m_remoteOutputSender->setDestination(m_remoteAddress, m_remotePort); } void UDPSinkFEC::write(const SampleVector::iterator& begin, uint32_t sampleChunkSize) { const SampleVector::iterator end = begin + sampleChunkSize; - SampleVector::iterator it = begin; + + SampleVector::const_iterator it = begin; while (it != end) { + int inSamplesIndex = it - begin; int inRemainingSamples = end - it; - if (m_txBlockIndex == 0) // Tx block index 0 is a block with only meta data + if (m_txBlockIndex == 0) { RemoteMetaDataFEC metaData; - - uint64_t ts_usecs = TimeUtil::nowus(); + uint64_t nowus = TimeUtil::nowus(); metaData.m_centerFrequency = 0; // frequency not set by stream metaData.m_sampleRate = m_sampleRate; metaData.m_sampleBytes = (SDR_RX_SAMP_SZ <= 16 ? 2 : 4); metaData.m_sampleBits = SDR_RX_SAMP_SZ; - metaData.m_nbOriginalBlocks = m_nbOriginalBlocks; + metaData.m_nbOriginalBlocks = RemoteNbOrginalBlocks; metaData.m_nbFECBlocks = m_nbBlocksFEC; - metaData.m_tv_sec = ts_usecs / 1000000UL; - metaData.m_tv_usec = ts_usecs % 1000000UL; + metaData.m_tv_sec = nowus / 1000000UL; // tv.tv_sec; + metaData.m_tv_usec = nowus % 1000000UL; // tv.tv_usec; + + if (!m_dataBlock) { // on the very first cycle there is no data block allocated + m_dataBlock = m_remoteOutputSender->getDataBlock(); // ask a new block to sender + } boost::crc_32_type crc32; crc32.process_bytes(&metaData, sizeof(RemoteMetaDataFEC)-4); - metaData.m_crc32 = crc32.checksum(); + RemoteSuperBlock& superBlock = m_dataBlock->m_superBlocks[0]; // first block + superBlock.init(); + superBlock.m_header.m_frameIndex = m_frameCount; + superBlock.m_header.m_blockIndex = m_txBlockIndex; + superBlock.m_header.m_sampleBytes = (SDR_RX_SAMP_SZ <= 16 ? 2 : 4); + superBlock.m_header.m_sampleBits = SDR_RX_SAMP_SZ; - memset((char *) &m_superBlock, 0, sizeof(m_superBlock)); - - m_superBlock.m_header.m_frameIndex = m_frameCount; - m_superBlock.m_header.m_blockIndex = m_txBlockIndex; - m_superBlock.m_header.m_sampleBytes = (SDR_RX_SAMP_SZ <= 16 ? 2 : 4); - m_superBlock.m_header.m_sampleBits = SDR_RX_SAMP_SZ; - - RemoteMetaDataFEC *destMeta = (RemoteMetaDataFEC *) &m_superBlock.m_protectedBlock; + RemoteMetaDataFEC *destMeta = (RemoteMetaDataFEC *) &superBlock.m_protectedBlock; *destMeta = metaData; if (!(metaData == m_currentMetaFEC)) @@ -158,30 +162,28 @@ void UDPSinkFEC::write(const SampleVector::iterator& begin, uint32_t sampleChunk << "|" << (int) metaData.m_nbOriginalBlocks << ":" << (int) metaData.m_nbFECBlocks << "|" << metaData.m_tv_sec - << ":" << metaData.m_tv_usec - << "|"; + << ":" << metaData.m_tv_usec; m_currentMetaFEC = metaData; } - m_txBlocks[m_txBlocksIndex][0] = m_superBlock; m_txBlockIndex = 1; // next Tx block with data - } + } // block zero + // handle different sample sizes... int samplesPerBlock = RemoteNbBytesPerBlock / (SDR_RX_SAMP_SZ <= 16 ? 4 : 8); // two I or Q samples - if (m_sampleIndex + inRemainingSamples < samplesPerBlock) // there is still room in the current super block { - memcpy((char *) &m_superBlock.m_protectedBlock.buf[m_sampleIndex*sizeof(Sample)], - (const char *) &(*it), + memcpy((void *) &m_superBlock.m_protectedBlock.buf[m_sampleIndex*sizeof(Sample)], + (const void *) &(*(begin+inSamplesIndex)), inRemainingSamples * sizeof(Sample)); m_sampleIndex += inRemainingSamples; it = end; // all input samples are consumed } else // complete super block and initiate the next if not end of frame { - memcpy((char *) &m_superBlock.m_protectedBlock.buf[m_sampleIndex*sizeof(Sample)], - (const char *) &(*it), + memcpy((void *) &m_superBlock.m_protectedBlock.buf[m_sampleIndex*sizeof(Sample)], + (const void *) &(*(begin+inSamplesIndex)), (samplesPerBlock - m_sampleIndex) * sizeof(Sample)); it += samplesPerBlock - m_sampleIndex; m_sampleIndex = 0; @@ -190,18 +192,20 @@ void UDPSinkFEC::write(const SampleVector::iterator& begin, uint32_t sampleChunk m_superBlock.m_header.m_blockIndex = m_txBlockIndex; m_superBlock.m_header.m_sampleBytes = (SDR_RX_SAMP_SZ <= 16 ? 2 : 4); m_superBlock.m_header.m_sampleBits = SDR_RX_SAMP_SZ; - m_txBlocks[m_txBlocksIndex][m_txBlockIndex] = m_superBlock; + m_dataBlock->m_superBlocks[m_txBlockIndex] = m_superBlock; - if (m_txBlockIndex == m_nbOriginalBlocks - 1) // frame complete + if (m_txBlockIndex == RemoteNbOrginalBlocks - 1) // frame complete { - int nbBlocksFEC = m_nbBlocksFEC; - int txDelay = m_txDelay; + m_dataBlock->m_txControlBlock.m_frameIndex = m_frameCount; + m_dataBlock->m_txControlBlock.m_processed = false; + m_dataBlock->m_txControlBlock.m_complete = true; + m_dataBlock->m_txControlBlock.m_nbBlocksFEC = m_nbBlocksFEC; + m_dataBlock->m_txControlBlock.m_txDelay = m_txDelay; + m_dataBlock->m_txControlBlock.m_dataAddress = m_remoteAddress; + m_dataBlock->m_txControlBlock.m_dataPort = m_remotePort; - if (m_udpWorker) { - m_udpWorker->pushTxFrame(m_txBlocks[m_txBlocksIndex], nbBlocksFEC, txDelay, m_frameCount); - } + m_dataBlock = m_remoteOutputSender->getDataBlock(); // ask a new block to sender - m_txBlocksIndex = (m_txBlocksIndex + 1) % 4; m_txBlockIndex = 0; m_frameCount++; } diff --git a/plugins/samplesink/remoteoutput/udpsinkfec.h b/plugins/samplesink/remoteoutput/udpsinkfec.h index b20a893b1..dadac61d4 100644 --- a/plugins/samplesink/remoteoutput/udpsinkfec.h +++ b/plugins/samplesink/remoteoutput/udpsinkfec.h @@ -29,7 +29,8 @@ #include "dsp/dsptypes.h" #include "util/CRC64.h" -class UDPSinkFECWorker; +class QThread; +class RemoteOutputSender; class UDPSinkFEC : public QObject { @@ -46,8 +47,8 @@ public: /** Destroy UDP sink */ ~UDPSinkFEC(); - void start(); - void stop(); + void startSender(); + void stopSender(); /** * Write IQ samples @@ -80,25 +81,22 @@ private: uint32_t m_sampleRate; //!< sample rate in Hz uint32_t m_nbSamples; //!< total number of samples sent int the last frame - QHostAddress m_ownAddress; CRC64 m_crc64; - uint8_t* m_bufMeta; - uint8_t* m_buf; - RemoteMetaDataFEC m_currentMetaFEC; //!< Meta data for current frame uint32_t m_nbBlocksFEC; //!< Variable number of FEC blocks float m_txDelayRatio; //!< Delay in ratio of nominal frame period uint32_t m_txDelay; //!< Delay in microseconds (usleep) between each sending of an UDP datagram - RemoteSuperBlock m_txBlocks[4][256]; //!< UDP blocks to send with original data + FEC + RemoteDataBlock *m_dataBlock; RemoteSuperBlock m_superBlock; //!< current super block being built int m_txBlockIndex; //!< Current index in blocks to transmit in the Tx row int m_txBlocksIndex; //!< Current index of Tx blocks row uint16_t m_frameCount; //!< transmission frame count int m_sampleIndex; //!< Current sample index in protected block data - UDPSinkFECWorker *m_udpWorker; + RemoteOutputSender *m_remoteOutputSender; + QThread *m_senderThread; QString m_remoteAddress; uint16_t m_remotePort; };