From 8b703a1302e681f5cf0df4b99bb71a89638280a0 Mon Sep 17 00:00:00 2001 From: f4exb Date: Sun, 21 May 2017 04:19:12 +0200 Subject: [PATCH] SDRdaemonSink: added UDPSinkFEC class --- .../samplesink/sdrdaemonsink/CMakeLists.txt | 5 + .../sdrdaemonsink/sdrdaemonsinkgui.cpp | 16 +- .../sdrdaemonsink/sdrdaemonsinkgui.ui | 2 +- .../sdrdaemonsink/sdrdaemonsinkoutput.cpp | 56 ++-- .../sdrdaemonsink/sdrdaemonsinkoutput.h | 74 ++--- .../sdrdaemonsink/sdrdaemonsinkthread.cpp | 73 +++-- .../samplesink/sdrdaemonsink/udpsinkfec.cpp | 298 ++++++++++++++++++ plugins/samplesink/sdrdaemonsink/udpsinkfec.h | 248 +++++++++++++++ 8 files changed, 663 insertions(+), 109 deletions(-) create mode 100644 plugins/samplesink/sdrdaemonsink/udpsinkfec.cpp create mode 100644 plugins/samplesink/sdrdaemonsink/udpsinkfec.h diff --git a/plugins/samplesink/sdrdaemonsink/CMakeLists.txt b/plugins/samplesink/sdrdaemonsink/CMakeLists.txt index 71169dd28..3feb21a95 100644 --- a/plugins/samplesink/sdrdaemonsink/CMakeLists.txt +++ b/plugins/samplesink/sdrdaemonsink/CMakeLists.txt @@ -6,6 +6,7 @@ set(sdrdaemonsink_SOURCES # sdrdaemonsinkplugin.cpp sdrdaemonsinksettings.cpp # sdrdaemonsinkthread.cpp + udpsinkfec.cpp ) set(sdrdaemonsink_HEADERS @@ -14,6 +15,7 @@ set(sdrdaemonsink_HEADERS # sdrdaemonsinkplugin.h sdrdaemonsinksettings.h # sdrdaemonsinkthread.h + udpsinkfec.h ) set(sdrdaemonsink_FORMS @@ -23,6 +25,8 @@ set(sdrdaemonsink_FORMS include_directories( . ${CMAKE_CURRENT_BINARY_DIR} + ${CMAKE_SOURCE_DIR}/devices + ${CM256CC_INCLUDE_DIR} ) add_definitions(${QT_DEFINITIONS}) @@ -40,6 +44,7 @@ add_library(outputsdrdaemonsink SHARED target_link_libraries(outputsdrdaemonsink ${QT_LIBRARIES} sdrbase + ${CM256CC_LIBRARIES} ) qt5_use_modules(outputsdrdaemonsink Core Widgets) diff --git a/plugins/samplesink/sdrdaemonsink/sdrdaemonsinkgui.cpp b/plugins/samplesink/sdrdaemonsink/sdrdaemonsinkgui.cpp index 4d32a99b1..8fc81904f 100644 --- a/plugins/samplesink/sdrdaemonsink/sdrdaemonsinkgui.cpp +++ b/plugins/samplesink/sdrdaemonsink/sdrdaemonsinkgui.cpp @@ -65,7 +65,7 @@ FileSinkGui::FileSinkGui(DeviceSinkAPI *deviceAPI, QWidget* parent) : displaySettings(); - m_deviceSampleSink = new FileSinkOutput(m_deviceAPI, m_deviceAPI->getMainWindow()->getMasterTimer()); + m_deviceSampleSink = new SDRdaemonSinkOutput(m_deviceAPI, m_deviceAPI->getMainWindow()->getMasterTimer()); connect(m_deviceSampleSink->getOutputMessageQueueToGUI(), SIGNAL(messageEnqueued()), this, SLOT(handleSinkMessages())); m_deviceAPI->setSink(m_deviceSampleSink); @@ -130,15 +130,15 @@ bool FileSinkGui::deserialize(const QByteArray& data) bool FileSinkGui::handleMessage(const Message& message) { - if (FileSinkOutput::MsgReportFileSinkGeneration::match(message)) + if (SDRdaemonSinkOutput::MsgReportSDRdaemonSinkGeneration::match(message)) { - m_generation = ((FileSinkOutput::MsgReportFileSinkGeneration&)message).getAcquisition(); + m_generation = ((SDRdaemonSinkOutput::MsgReportSDRdaemonSinkGeneration&)message).getAcquisition(); updateWithGeneration(); return true; } - else if (FileSinkOutput::MsgReportFileSinkStreamTiming::match(message)) + else if (SDRdaemonSinkOutput::MsgReportSDRdaemonSinkStreamTiming::match(message)) { - m_samplesCount = ((FileSinkOutput::MsgReportFileSinkStreamTiming&)message).getSamplesCount(); + m_samplesCount = ((SDRdaemonSinkOutput::MsgReportSDRdaemonSinkStreamTiming&)message).getSamplesCount(); updateWithStreamTime(); return true; } @@ -207,7 +207,7 @@ void FileSinkGui::sendSettings() void FileSinkGui::updateHardware() { qDebug() << "FileSinkGui::updateHardware"; - FileSinkOutput::MsgConfigureFileSink* message = FileSinkOutput::MsgConfigureFileSink::create(m_settings); + SDRdaemonSinkOutput::MsgConfigureSDRdaemonSink* message = SDRdaemonSinkOutput::MsgConfigureSDRdaemonSink::create(m_settings); m_deviceSampleSink->getInputMessageQueue()->push(message); m_updateTimer.stop(); } @@ -301,7 +301,7 @@ void FileSinkGui::on_showFileDialog_clicked(bool checked) void FileSinkGui::configureFileName() { qDebug() << "FileSinkGui::configureFileName: " << m_fileName.toStdString().c_str(); - FileSinkOutput::MsgConfigureFileSinkName* message = FileSinkOutput::MsgConfigureFileSinkName::create(m_fileName); + SDRdaemonSinkOutput::MsgConfigureSDRdaemonSinkName* message = SDRdaemonSinkOutput::MsgConfigureSDRdaemonSinkName::create(m_fileName); m_deviceSampleSink->getInputMessageQueue()->push(message); } @@ -331,7 +331,7 @@ void FileSinkGui::tick() { if ((++m_tickCount & 0xf) == 0) { - FileSinkOutput::MsgConfigureFileSinkStreamTiming* message = FileSinkOutput::MsgConfigureFileSinkStreamTiming::create(); + SDRdaemonSinkOutput::MsgConfigureSDRdaemonSinkStreamTiming* message = SDRdaemonSinkOutput::MsgConfigureSDRdaemonSinkStreamTiming::create(); m_deviceSampleSink->getInputMessageQueue()->push(message); } } diff --git a/plugins/samplesink/sdrdaemonsink/sdrdaemonsinkgui.ui b/plugins/samplesink/sdrdaemonsink/sdrdaemonsinkgui.ui index 9534b4446..d6835c5d5 100644 --- a/plugins/samplesink/sdrdaemonsink/sdrdaemonsinkgui.ui +++ b/plugins/samplesink/sdrdaemonsink/sdrdaemonsinkgui.ui @@ -29,7 +29,7 @@ - FileSource + SDRdaemon Sink diff --git a/plugins/samplesink/sdrdaemonsink/sdrdaemonsinkoutput.cpp b/plugins/samplesink/sdrdaemonsink/sdrdaemonsinkoutput.cpp index af051f894..a12b14415 100644 --- a/plugins/samplesink/sdrdaemonsink/sdrdaemonsinkoutput.cpp +++ b/plugins/samplesink/sdrdaemonsink/sdrdaemonsinkoutput.cpp @@ -29,14 +29,14 @@ #include "sdrdaemonsinkoutput.h" #include "sdrdaemonsinkthread.h" -MESSAGE_CLASS_DEFINITION(FileSinkOutput::MsgConfigureFileSink, Message) -MESSAGE_CLASS_DEFINITION(FileSinkOutput::MsgConfigureFileSinkName, Message) -MESSAGE_CLASS_DEFINITION(FileSinkOutput::MsgConfigureFileSinkWork, Message) -MESSAGE_CLASS_DEFINITION(FileSinkOutput::MsgConfigureFileSinkStreamTiming, Message) -MESSAGE_CLASS_DEFINITION(FileSinkOutput::MsgReportFileSinkGeneration, Message) -MESSAGE_CLASS_DEFINITION(FileSinkOutput::MsgReportFileSinkStreamTiming, Message) +MESSAGE_CLASS_DEFINITION(SDRdaemonSinkOutput::MsgConfigureSDRdaemonSink, Message) +MESSAGE_CLASS_DEFINITION(SDRdaemonSinkOutput::MsgConfigureSDRdaemonSinkName, Message) +MESSAGE_CLASS_DEFINITION(SDRdaemonSinkOutput::MsgConfigureSDRdaemonSinkWork, Message) +MESSAGE_CLASS_DEFINITION(SDRdaemonSinkOutput::MsgConfigureSDRdaemonSinkStreamTiming, Message) +MESSAGE_CLASS_DEFINITION(SDRdaemonSinkOutput::MsgReportSDRdaemonSinkGeneration, Message) +MESSAGE_CLASS_DEFINITION(SDRdaemonSinkOutput::MsgReportSDRdaemonSinkStreamTiming, Message) -FileSinkOutput::FileSinkOutput(DeviceSinkAPI *deviceAPI, const QTimer& masterTimer) : +SDRdaemonSinkOutput::SDRdaemonSinkOutput(DeviceSinkAPI *deviceAPI, const QTimer& masterTimer) : m_deviceAPI(deviceAPI), m_settings(), m_fileSinkThread(0), @@ -47,12 +47,12 @@ FileSinkOutput::FileSinkOutput(DeviceSinkAPI *deviceAPI, const QTimer& masterTim { } -FileSinkOutput::~FileSinkOutput() +SDRdaemonSinkOutput::~SDRdaemonSinkOutput() { stop(); } -void FileSinkOutput::openFileStream() +void SDRdaemonSinkOutput::openFileStream() { if (m_ofstream.is_open()) { m_ofstream.close(); @@ -70,7 +70,7 @@ void FileSinkOutput::openFileStream() qDebug() << "FileSinkOutput::openFileStream: " << m_fileName.toStdString().c_str(); } -bool FileSinkOutput::start() +bool SDRdaemonSinkOutput::start() { QMutexLocker mutexLocker(&m_mutex); qDebug() << "FileSinkOutput::start"; @@ -93,13 +93,13 @@ bool FileSinkOutput::start() //applySettings(m_generalSettings, m_settings, true); qDebug("FileSinkOutput::start: started"); - MsgReportFileSinkGeneration *report = MsgReportFileSinkGeneration::create(true); // acquisition on + MsgReportSDRdaemonSinkGeneration *report = MsgReportSDRdaemonSinkGeneration::create(true); // acquisition on getOutputMessageQueueToGUI()->push(report); return true; } -void FileSinkOutput::stop() +void SDRdaemonSinkOutput::stop() { qDebug() << "FileSourceInput::stop"; QMutexLocker mutexLocker(&m_mutex); @@ -115,49 +115,49 @@ void FileSinkOutput::stop() m_ofstream.close(); } - MsgReportFileSinkGeneration *report = MsgReportFileSinkGeneration::create(false); // acquisition off + MsgReportSDRdaemonSinkGeneration *report = MsgReportSDRdaemonSinkGeneration::create(false); // acquisition off getOutputMessageQueueToGUI()->push(report); } -const QString& FileSinkOutput::getDeviceDescription() const +const QString& SDRdaemonSinkOutput::getDeviceDescription() const { return m_deviceDescription; } -int FileSinkOutput::getSampleRate() const +int SDRdaemonSinkOutput::getSampleRate() const { return m_settings.m_sampleRate; } -quint64 FileSinkOutput::getCenterFrequency() const +quint64 SDRdaemonSinkOutput::getCenterFrequency() const { return m_settings.m_centerFrequency; } -std::time_t FileSinkOutput::getStartingTimeStamp() const +std::time_t SDRdaemonSinkOutput::getStartingTimeStamp() const { return m_startingTimeStamp; } -bool FileSinkOutput::handleMessage(const Message& message) +bool SDRdaemonSinkOutput::handleMessage(const Message& message) { - if (MsgConfigureFileSinkName::match(message)) + if (MsgConfigureSDRdaemonSinkName::match(message)) { - MsgConfigureFileSinkName& conf = (MsgConfigureFileSinkName&) message; + MsgConfigureSDRdaemonSinkName& conf = (MsgConfigureSDRdaemonSinkName&) message; m_fileName = conf.getFileName(); openFileStream(); return true; } - else if (MsgConfigureFileSink::match(message)) + else if (MsgConfigureSDRdaemonSink::match(message)) { qDebug() << "FileSinkOutput::handleMessage: MsgConfigureFileSink"; - MsgConfigureFileSink& conf = (MsgConfigureFileSink&) message; + MsgConfigureSDRdaemonSink& conf = (MsgConfigureSDRdaemonSink&) message; applySettings(conf.getSettings(), false); return true; } - else if (MsgConfigureFileSinkWork::match(message)) + else if (MsgConfigureSDRdaemonSinkWork::match(message)) { - MsgConfigureFileSinkWork& conf = (MsgConfigureFileSinkWork&) message; + MsgConfigureSDRdaemonSinkWork& conf = (MsgConfigureSDRdaemonSinkWork&) message; bool working = conf.isWorking(); if (m_fileSinkThread != 0) @@ -174,13 +174,13 @@ bool FileSinkOutput::handleMessage(const Message& message) return true; } - else if (MsgConfigureFileSinkStreamTiming::match(message)) + else if (MsgConfigureSDRdaemonSinkStreamTiming::match(message)) { - MsgReportFileSinkStreamTiming *report; + MsgReportSDRdaemonSinkStreamTiming *report; if (m_fileSinkThread != 0) { - report = MsgReportFileSinkStreamTiming::create(m_fileSinkThread->getSamplesCount()); + report = MsgReportSDRdaemonSinkStreamTiming::create(m_fileSinkThread->getSamplesCount()); getOutputMessageQueueToGUI()->push(report); } @@ -192,7 +192,7 @@ bool FileSinkOutput::handleMessage(const Message& message) } } -void FileSinkOutput::applySettings(const FileSinkSettings& settings, bool force) +void SDRdaemonSinkOutput::applySettings(const FileSinkSettings& settings, bool force) { QMutexLocker mutexLocker(&m_mutex); bool forwardChange = false; diff --git a/plugins/samplesink/sdrdaemonsink/sdrdaemonsinkoutput.h b/plugins/samplesink/sdrdaemonsink/sdrdaemonsinkoutput.h index 95fa6a152..d72da0842 100644 --- a/plugins/samplesink/sdrdaemonsink/sdrdaemonsinkoutput.h +++ b/plugins/samplesink/sdrdaemonsink/sdrdaemonsinkoutput.h @@ -1,5 +1,5 @@ /////////////////////////////////////////////////////////////////////////////////// -// Copyright (C) 2016 Edouard Griffiths, F4EXB // +// Copyright (C) 2017 Edouard Griffiths, F4EXB // // // // This program is free software; you can redistribute it and/or modify // // it under the terms of the GNU General Public License as published by // @@ -14,8 +14,8 @@ // along with this program. If not, see . // /////////////////////////////////////////////////////////////////////////////////// -#ifndef INCLUDE_FILESINKOUTPUT_H -#define INCLUDE_FILESINKOUTPUT_H +#ifndef INCLUDE_SDRDAEMONSINKOUTPUT_H +#define INCLUDE_SDRDAEMONSINKOUTPUT_H #include #include @@ -27,130 +27,130 @@ #include "sdrdaemonsinksettings.h" -class FileSinkThread; +class SDRdaemonSinkThread; class DeviceSinkAPI; -class FileSinkOutput : public DeviceSampleSink { +class SDRdaemonSinkOutput : public DeviceSampleSink { public: - class MsgConfigureFileSink : public Message { + class MsgConfigureSDRdaemonSink : public Message { MESSAGE_CLASS_DECLARATION public: - const FileSinkSettings& getSettings() const { return m_settings; } + const SDRdaemonSinkSettings& getSettings() const { return m_settings; } - static MsgConfigureFileSink* create(const FileSinkSettings& settings) + static MsgConfigureSDRdaemonSink* create(const SDRdaemonSinkSettings& settings) { - return new MsgConfigureFileSink(settings); + return new MsgConfigureSDRdaemonSink(settings); } private: - FileSinkSettings m_settings; + SDRdaemonSinkSettings m_settings; - MsgConfigureFileSink(const FileSinkSettings& settings) : + MsgConfigureSDRdaemonSink(const SDRdaemonSinkSettings& settings) : Message(), m_settings(settings) { } }; - class MsgConfigureFileSinkName : public Message { + class MsgConfigureSDRdaemonSinkName : public Message { MESSAGE_CLASS_DECLARATION public: const QString& getFileName() const { return m_fileName; } - static MsgConfigureFileSinkName* create(const QString& fileName) + static MsgConfigureSDRdaemonSinkName* create(const QString& fileName) { - return new MsgConfigureFileSinkName(fileName); + return new MsgConfigureSDRdaemonSinkName(fileName); } private: QString m_fileName; - MsgConfigureFileSinkName(const QString& fileName) : + MsgConfigureSDRdaemonSinkName(const QString& fileName) : Message(), m_fileName(fileName) { } }; - class MsgConfigureFileSinkWork : public Message { + class MsgConfigureSDRdaemonSinkWork : public Message { MESSAGE_CLASS_DECLARATION public: bool isWorking() const { return m_working; } - static MsgConfigureFileSinkWork* create(bool working) + static MsgConfigureSDRdaemonSinkWork* create(bool working) { - return new MsgConfigureFileSinkWork(working); + return new MsgConfigureSDRdaemonSinkWork(working); } private: bool m_working; - MsgConfigureFileSinkWork(bool working) : + MsgConfigureSDRdaemonSinkWork(bool working) : Message(), m_working(working) { } }; - class MsgConfigureFileSinkStreamTiming : public Message { + class MsgConfigureSDRdaemonSinkStreamTiming : public Message { MESSAGE_CLASS_DECLARATION public: - static MsgConfigureFileSinkStreamTiming* create() + static MsgConfigureSDRdaemonSinkStreamTiming* create() { - return new MsgConfigureFileSinkStreamTiming(); + return new MsgConfigureSDRdaemonSinkStreamTiming(); } private: - MsgConfigureFileSinkStreamTiming() : + MsgConfigureSDRdaemonSinkStreamTiming() : Message() { } }; - class MsgReportFileSinkGeneration : public Message { + class MsgReportSDRdaemonSinkGeneration : public Message { MESSAGE_CLASS_DECLARATION public: bool getAcquisition() const { return m_acquisition; } - static MsgReportFileSinkGeneration* create(bool acquisition) + static MsgReportSDRdaemonSinkGeneration* create(bool acquisition) { - return new MsgReportFileSinkGeneration(acquisition); + return new MsgReportSDRdaemonSinkGeneration(acquisition); } protected: bool m_acquisition; - MsgReportFileSinkGeneration(bool acquisition) : + MsgReportSDRdaemonSinkGeneration(bool acquisition) : Message(), m_acquisition(acquisition) { } }; - class MsgReportFileSinkStreamTiming : public Message { + class MsgReportSDRdaemonSinkStreamTiming : public Message { MESSAGE_CLASS_DECLARATION public: std::size_t getSamplesCount() const { return m_samplesCount; } - static MsgReportFileSinkStreamTiming* create(std::size_t samplesCount) + static MsgReportSDRdaemonSinkStreamTiming* create(std::size_t samplesCount) { - return new MsgReportFileSinkStreamTiming(samplesCount); + return new MsgReportSDRdaemonSinkStreamTiming(samplesCount); } protected: std::size_t m_samplesCount; - MsgReportFileSinkStreamTiming(std::size_t samplesCount) : + MsgReportSDRdaemonSinkStreamTiming(std::size_t samplesCount) : Message(), m_samplesCount(samplesCount) { } }; - FileSinkOutput(DeviceSinkAPI *deviceAPI, const QTimer& masterTimer); - virtual ~FileSinkOutput(); + SDRdaemonSinkOutput(DeviceSinkAPI *deviceAPI, const QTimer& masterTimer); + virtual ~SDRdaemonSinkOutput(); virtual bool start(); virtual void stop(); @@ -165,16 +165,16 @@ public: private: DeviceSinkAPI *m_deviceAPI; QMutex m_mutex; - FileSinkSettings m_settings; + SDRdaemonSinkSettings m_settings; std::ofstream m_ofstream; - FileSinkThread* m_fileSinkThread; + SDRdaemonSinkThread* m_fileSinkThread; QString m_deviceDescription; QString m_fileName; std::time_t m_startingTimeStamp; const QTimer& m_masterTimer; void openFileStream(); - void applySettings(const FileSinkSettings& settings, bool force = false); + void applySettings(const SDRdaemonSinkSettings& settings, bool force = false); }; -#endif // INCLUDE_FILESINKOUTPUT_H +#endif // INCLUDE_SDRDAEMONSINKOUTPUT_H diff --git a/plugins/samplesink/sdrdaemonsink/sdrdaemonsinkthread.cpp b/plugins/samplesink/sdrdaemonsink/sdrdaemonsinkthread.cpp index 4c0745b68..b3c1f044d 100644 --- a/plugins/samplesink/sdrdaemonsink/sdrdaemonsinkthread.cpp +++ b/plugins/samplesink/sdrdaemonsink/sdrdaemonsinkthread.cpp @@ -121,7 +121,7 @@ void SDRdaemonSinkThread::setLog2Interpolation(int log2Interpolation) if (log2Interpolation != m_log2Interpolation) { - qDebug() << "FileSinkThread::setLog2Interpolation:" + qDebug() << "SDRdaemonSinkThread::setLog2Interpolation:" << " new:" << log2Interpolation << " old:" << m_log2Interpolation; @@ -187,44 +187,47 @@ void SDRdaemonSinkThread::tick() SampleVector::iterator readUntil; - m_sampleFifo->readAdvance(readUntil, m_samplesChunkSize); + m_sampleFifo->readAdvance(readUntil, m_samplesChunkSize); // pull samples SampleVector::iterator beginRead = readUntil - m_samplesChunkSize; m_samplesCount += m_samplesChunkSize; - if (m_log2Interpolation == 0) - { - m_ofstream->write(reinterpret_cast(&(*beginRead)), m_samplesChunkSize*sizeof(Sample)); - } - else - { - int chunkSize = std::min((int) m_samplesChunkSize, m_samplerate); + m_ofstream->write(reinterpret_cast(&(*beginRead)), m_samplesChunkSize*sizeof(Sample)); // send samples - switch (m_log2Interpolation) - { - case 1: - m_interpolators.interpolate2_cen(&beginRead, m_buf, chunkSize*(1<write(reinterpret_cast(m_buf), m_samplesChunkSize*(1<write(reinterpret_cast(&(*beginRead)), m_samplesChunkSize*sizeof(Sample)); // send samples +// } +// else +// { +// int chunkSize = std::min((int) m_samplesChunkSize, m_samplerate); +// +// switch (m_log2Interpolation) +// { +// case 1: +// m_interpolators.interpolate2_cen(&beginRead, m_buf, chunkSize*(1<write(reinterpret_cast(m_buf), m_samplesChunkSize*(1<. // +/////////////////////////////////////////////////////////////////////////////////// + +#include + +#include +#include +#include +#include + +#include "udpsinkfec.h" + +MESSAGE_CLASS_DEFINITION(UDPSinkFECWorker::MsgUDPFECEncodeAndSend, Message) +MESSAGE_CLASS_DEFINITION(UDPSinkFECWorker::MsgConfigureRemoteAddress, Message) + + +UDPSinkFEC::UDPSinkFEC() : + m_centerFrequency(100000), + m_sampleRate(48000), + m_sampleBytes(1), + m_sampleBits(8), + m_nbSamples(0), + m_nbBlocksFEC(0), + m_txDelay(0), + m_txBlockIndex(0), + m_txBlocksIndex(0), + m_frameCount(0), + m_sampleIndex(0) +{ + m_currentMetaFEC.init(); + m_bufMeta = new uint8_t[m_udpSize]; + m_buf = new uint8_t[m_udpSize]; + m_udpWorker = new UDPSinkFECWorker(); + + m_udpWorker->moveToThread(&m_udpThread); + connect(&(m_udpWorker->m_inputMessageQueue), SIGNAL(messageEnqueued()), m_udpWorker, SLOT(handleInputMessages())); + m_udpThread.start(); +} + +UDPSinkFEC::~UDPSinkFEC() +{ + disconnect(&(m_udpWorker->m_inputMessageQueue), SIGNAL(messageEnqueued()), m_udpWorker, SLOT(handleInputMessages())); + m_udpThread.exit(); + m_udpThread.wait(); + + delete[] m_buf; + delete[] m_bufMeta; + delete m_udpWorker; +} + +void UDPSinkFEC::setTxDelay(uint32_t txDelay) +{ + qDebug() << "UDPSinkFEC::setTxDelay: txDelay: " << txDelay; + m_txDelay = txDelay; +} + +void UDPSinkFEC::setNbBlocksFEC(uint32_t nbBlocksFEC) +{ + qDebug() << "UDPSinkFEC::setNbBlocksFEC: nbBlocksFEC: " << nbBlocksFEC; + m_nbBlocksFEC = nbBlocksFEC; +} + +void UDPSinkFEC::setRemoteAddress(const QString& address, uint16_t port) +{ + qDebug() << "UDPSinkFEC::setRemoteAddress: address: " << address << " port: " << port; + m_udpWorker->setRemoteAddress(address, port); +} + +void UDPSinkFEC::write(const SampleVector::iterator& begin, uint32_t sampleChunkSize) +{ + const SampleVector::iterator end = begin + sampleChunkSize; + SampleVector::iterator it = begin; + + while (it != end) + { + int inSamplesIndex = it - begin; + int inRemainingSamples = end - it; + + if (m_txBlockIndex == 0) // Tx block index 0 is a block with only meta data + { + struct timeval tv; + MetaDataFEC metaData; + + gettimeofday(&tv, 0); + + // create meta data TODO: semaphore + metaData.m_centerFrequency = m_centerFrequency; + metaData.m_sampleRate = m_sampleRate; + metaData.m_sampleBytes = m_sampleBytes; + metaData.m_sampleBits = m_sampleBits; + metaData.m_nbOriginalBlocks = m_nbOriginalBlocks; + metaData.m_nbFECBlocks = m_nbBlocksFEC; + metaData.m_tv_sec = tv.tv_sec; + metaData.m_tv_usec = tv.tv_usec; + + boost::crc_32_type crc32; + crc32.process_bytes(&metaData, 20); + + metaData.m_crc32 = crc32.checksum(); + + memset((void *) &m_superBlock, 0, m_udpSize); + + m_superBlock.header.frameIndex = m_frameCount; + m_superBlock.header.blockIndex = m_txBlockIndex; + memcpy((void *) &m_superBlock.protectedBlock, (const void *) &metaData, sizeof(MetaDataFEC)); + + if (!(metaData == m_currentMetaFEC)) + { + qDebug() << "UDPSinkFEC::write: meta: " + << "|" << metaData.m_centerFrequency + << ":" << metaData.m_sampleRate + << ":" << (int) (metaData.m_sampleBytes & 0xF) + << ":" << (int) metaData.m_sampleBits + << "|" << (int) metaData.m_nbOriginalBlocks + << ":" << (int) metaData.m_nbFECBlocks + << "|" << metaData.m_tv_sec + << ":" << metaData.m_tv_usec + << "|"; + + m_currentMetaFEC = metaData; + } + + m_txBlocks[m_txBlocksIndex][0] = m_superBlock; + m_txBlockIndex = 1; // next Tx block with data + } + + if (m_sampleIndex + inRemainingSamples < samplesPerBlock) // there is still room in the current super block + { + memcpy((void *) &m_superBlock.protectedBlock.m_samples[m_sampleIndex], + (const void *) &(*it), + inRemainingSamples * sizeof(Sample)); + m_sampleIndex += inRemainingSamples; + it = end; // all input samples are consumed + } + else // complete super block and initiate the next if not end of frame + { + memcpy((void *) &m_superBlock.protectedBlock.m_samples[m_sampleIndex], + (const void *) &(*it), + (samplesPerBlock - m_sampleIndex) * sizeof(Sample)); + it += samplesPerBlock - m_sampleIndex; + m_sampleIndex = 0; + + m_superBlock.header.frameIndex = m_frameCount; + m_superBlock.header.blockIndex = m_txBlockIndex; + m_txBlocks[m_txBlocksIndex][m_txBlockIndex] = m_superBlock; + + if (m_txBlockIndex == m_nbOriginalBlocks - 1) // frame complete + { + int nbBlocksFEC = m_nbBlocksFEC; + int txDelay = m_txDelay; + + // TODO: send blocks + m_udpWorker->pushTxFrame(m_txBlocks[m_txBlocksIndex], nbBlocksFEC, txDelay, m_frameCount); + //m_txThread = new std::thread(transmitUDP, this, m_txBlocks[m_txBlocksIndex], m_frameCount, nbBlocksFEC, txDelay, m_cm256Valid); + //transmitUDP(this, m_txBlocks[m_txBlocksIndex], m_frameCount, m_nbBlocksFEC, m_txDelay, m_cm256Valid); + + m_txBlocksIndex = (m_txBlocksIndex + 1) % 4; + m_txBlockIndex = 0; + m_frameCount++; + } + else + { + m_txBlockIndex++; + } + } + } +} + +UDPSinkFECWorker::UDPSinkFECWorker() : m_remotePort(9090) +{ + m_cm256Valid = m_cm256.isInitialized(); +} + +UDPSinkFECWorker::~UDPSinkFECWorker() +{ +} + +void UDPSinkFECWorker::pushTxFrame(const UDPSinkFEC::SuperBlock *txBlocks, + uint32_t nbBlocksFEC, + uint32_t txDelay, + uint16_t frameIndex) +{ + m_inputMessageQueue.push(MsgUDPFECEncodeAndSend::create(txBlocks, nbBlocksFEC, txDelay, frameIndex)); +} + +void UDPSinkFECWorker::setRemoteAddress(const QString& address, uint16_t port) +{ + m_inputMessageQueue.push(MsgConfigureRemoteAddress::create(address, port)); +} + +void UDPSinkFECWorker::handleInputMessages() +{ + Message* message; + + while ((message = m_inputMessageQueue.pop()) != 0) + { + if (MsgUDPFECEncodeAndSend::match(*message)) + { + MsgUDPFECEncodeAndSend *sendMsg = (MsgUDPFECEncodeAndSend *) message; + } + else if (MsgConfigureRemoteAddress::match(*message)) + { + MsgConfigureRemoteAddress *addressMsg = (MsgConfigureRemoteAddress *) message; + m_remoteAddress.setAddress(addressMsg->getAddress()); + m_remotePort = addressMsg->getPort(); + } + + delete message; + } +} + +void UDPSinkFECWorker::transmitUDP(UDPSinkFEC::SuperBlock *txBlockx, uint16_t frameIndex, int nbBlocksFEC, int txDelay) +{ + CM256::cm256_encoder_params cm256Params; //!< Main interface with CM256 encoder + CM256::cm256_block descriptorBlocks[256]; //!< Pointers to data for CM256 encoder + UDPSinkFEC::ProtectedBlock fecBlocks[256]; //!< FEC data + + if ((nbBlocksFEC == 0) || !m_cm256Valid) + { + for (int i = 0; i < UDPSinkFEC::m_nbOriginalBlocks; i++) + { + m_udpSocket.writeDatagram((const char *) &txBlockx[i], (int) UDPSinkFEC::m_udpSize, m_remoteAddress, m_remotePort); + usleep(txDelay); + } + } + else + { + cm256Params.BlockBytes = sizeof(UDPSinkFEC::ProtectedBlock); + cm256Params.OriginalCount = UDPSinkFEC::m_nbOriginalBlocks; + cm256Params.RecoveryCount = nbBlocksFEC; + + + // Fill pointers to data + for (int i = 0; i < cm256Params.OriginalCount + cm256Params.RecoveryCount; ++i) + { + if (i >= cm256Params.OriginalCount) { + memset((void *) &txBlockx[i].protectedBlock, 0, sizeof(UDPSinkFEC::ProtectedBlock)); + } + + txBlockx[i].header.frameIndex = frameIndex; + txBlockx[i].header.blockIndex = i; + descriptorBlocks[i].Block = (void *) &(txBlockx[i].protectedBlock); + descriptorBlocks[i].Index = txBlockx[i].header.blockIndex; + } + + // Encode FEC blocks + if (m_cm256.cm256_encode(cm256Params, descriptorBlocks, fecBlocks)) + { + qDebug() << "UDPSinkFECWorker::transmitUDP: CM256 encode failed. No transmission."; + return; + } + + // Merge FEC with data to transmit + for (int i = 0; i < cm256Params.RecoveryCount; i++) + { + txBlockx[i + cm256Params.OriginalCount].protectedBlock = fecBlocks[i]; + } + + // Transmit all blocks + for (int i = 0; i < cm256Params.OriginalCount + cm256Params.RecoveryCount; i++) + { +#ifdef SDRDAEMON_PUNCTURE + if (i == SDRDAEMON_PUNCTURE) { + continue; + } +#endif +// std::cerr << "UDPSinkFEC::transmitUDP:" +// << " i: " << i +// << " frameIndex: " << (int) m_txBlocks[i].header.frameIndex +// << " blockIndex: " << (int) m_txBlocks[i].header.blockIndex +// << " i.q:"; +// +// for (int j = 0; j < 10; j++) +// { +// std::cerr << " " << (int) m_txBlocks[i].protectedBlock.m_samples[j].m_real +// << "." << (int) m_txBlocks[i].protectedBlock.m_samples[j].m_imag; +// } +// +// std::cerr << std::endl; + + m_udpSocket.writeDatagram((const char *) &txBlockx[i], (int) UDPSinkFEC::m_udpSize, m_remoteAddress, m_remotePort); + usleep(txDelay); + } + } +} diff --git a/plugins/samplesink/sdrdaemonsink/udpsinkfec.h b/plugins/samplesink/sdrdaemonsink/udpsinkfec.h new file mode 100644 index 000000000..56133b897 --- /dev/null +++ b/plugins/samplesink/sdrdaemonsink/udpsinkfec.h @@ -0,0 +1,248 @@ +/////////////////////////////////////////////////////////////////////////////////// +// Copyright (C) 2017 Edouard Griffiths, F4EXB // +// // +// This program is free software; you can redistribute it and/or modify // +// it under the terms of the GNU General Public License as published by // +// the Free Software Foundation as version 3 of the License, or // +// // +// This program is distributed in the hope that it will be useful, // +// but WITHOUT ANY WARRANTY; without even the implied warranty of // +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // +// GNU General Public License V3 for more details. // +// // +// You should have received a copy of the GNU General Public License // +// along with this program. If not, see . // +/////////////////////////////////////////////////////////////////////////////////// + +#ifndef PLUGINS_SAMPLESINK_SDRDAEMONSINK_UDPSINKFEC_H_ +#define PLUGINS_SAMPLESINK_SDRDAEMONSINK_UDPSINKFEC_H_ + +#include +#include + +#include +#include +#include +#include +#include + +#include "cm256.h" + +#include "dsp/dsptypes.h" +#include "util/CRC64.h" +#include "util/messagequeue.h" +#include "util/message.h" + +class UDPSinkFECWorker; + +class UDPSinkFEC : public QObject +{ + Q_OBJECT +public: + static const uint32_t m_udpSize = 512; //!< Size of UDP block in number of bytes + static const uint32_t m_nbOriginalBlocks = 128; //!< Number of original blocks in a protected block sequence +#pragma pack(push, 1) + struct MetaDataFEC + { + uint32_t m_centerFrequency; //!< 4 center frequency in kHz + uint32_t m_sampleRate; //!< 8 sample rate in Hz + uint8_t m_sampleBytes; //!< 9 MSB(4): indicators, LSB(4) number of bytes per sample + uint8_t m_sampleBits; //!< 10 number of effective bits per sample + uint8_t m_nbOriginalBlocks; //!< 11 number of blocks with original (protected) data + uint8_t m_nbFECBlocks; //!< 12 number of blocks carrying FEC + uint32_t m_tv_sec; //!< 16 seconds of timestamp at start time of super-frame processing + uint32_t m_tv_usec; //!< 20 microseconds of timestamp at start time of super-frame processing + uint32_t m_crc32; //!< 24 CRC32 of the above + + bool operator==(const MetaDataFEC& rhs) + { + return (memcmp((const void *) this, (const void *) &rhs, 12) == 0); // Only the 12 first bytes are relevant + } + + void init() + { + memset((void *) this, 0, sizeof(MetaDataFEC)); + m_nbFECBlocks = -1; + } + }; + + struct Header + { + uint16_t frameIndex; + uint8_t blockIndex; + uint8_t filler; + }; + + static const int samplesPerBlock = (m_udpSize - sizeof(Header)) / sizeof(Sample); + + struct ProtectedBlock + { + Sample m_samples[samplesPerBlock]; + }; + + struct SuperBlock + { + Header header; + ProtectedBlock protectedBlock; + }; +#pragma pack(pop) + + /** + * Construct UDP sink + */ + UDPSinkFEC(); + + /** Destroy UDP sink */ + ~UDPSinkFEC(); + + /** + * Write IQ samples + */ + void write(const SampleVector::iterator& begin, uint32_t sampleChunkSize); + + /** Return the last error, or return an empty string if there is no error. */ + std::string error() + { + std::string ret(m_error); + m_error.clear(); + return ret; + } + + /** Set center frequency given in Hz */ + void setCenterFrequency(uint64_t centerFrequency) { m_centerFrequency = centerFrequency / 1000; } + + /** Set sample rate given in Hz */ + void setSampleRate(uint32_t sampleRate) { m_sampleRate = sampleRate; } + + void setSampleBytes(uint8_t sampleBytes) { m_sampleBytes = (sampleBytes & 0x0F) + (m_sampleBytes & 0xF0); } + void setSampleBits(uint8_t sampleBits) { m_sampleBits = sampleBits; } + + void setNbBlocksFEC(uint32_t nbBlocksFEC); + void setTxDelay(uint32_t txDelay); + void setRemoteAddress(const QString& address, uint16_t port); + + /** Return true if the stream is OK, return false if there is an error. */ + operator bool() const + { + return m_error.empty(); + } + +private: + std::string m_error; + + uint32_t m_centerFrequency; //!< center frequency in kHz + uint32_t m_sampleRate; //!< sample rate in Hz + uint8_t m_sampleBytes; //!< number of bytes per sample + uint8_t m_sampleBits; //!< number of effective bits per sample + uint32_t m_nbSamples; //!< total number of samples sent int the last frame + + QHostAddress m_ownAddress; + + CRC64 m_crc64; + uint8_t* m_bufMeta; + uint8_t* m_buf; + + MetaDataFEC m_currentMetaFEC; //!< Meta data for current frame + uint32_t m_nbBlocksFEC; //!< Variable number of FEC blocks + uint32_t m_txDelay; //!< Delay in microseconds (usleep) between each sending of an UDP datagram + SuperBlock m_txBlocks[4][256]; //!< UDP blocks to send with original data + FEC + SuperBlock m_superBlock; //!< current super block being built + int m_txBlockIndex; //!< Current index in blocks to transmit in the Tx row + int m_txBlocksIndex; //!< Current index of Tx blocks row + uint16_t m_frameCount; //!< transmission frame count + int m_sampleIndex; //!< Current sample index in protected block data + + QThread m_udpThread; + UDPSinkFECWorker *m_udpWorker; +}; + + +class UDPSinkFECWorker : public QObject +{ + Q_OBJECT +public: + class MsgUDPFECEncodeAndSend : public Message + { + MESSAGE_CLASS_DECLARATION + public: + const UDPSinkFEC::SuperBlock *getTxBlocks() const { return m_txBlockx; } + uint32_t getNbBlocsFEC() const { return m_nbBlocksFEC; } + uint32_t getTxDelay() const { return m_txDelay; } + uint16_t getFrameIndex() const { return m_frameIndex; } + + static MsgUDPFECEncodeAndSend* create( + const UDPSinkFEC::SuperBlock *txBlocks, + uint32_t nbBlocksFEC, + uint32_t txDelay, + uint16_t frameIndex) + { + return new MsgUDPFECEncodeAndSend(txBlocks, nbBlocksFEC, txDelay, frameIndex); + } + + private: + const UDPSinkFEC::SuperBlock *m_txBlockx; + uint32_t m_nbBlocksFEC; + uint32_t m_txDelay; + uint16_t m_frameIndex; + + MsgUDPFECEncodeAndSend( + const UDPSinkFEC::SuperBlock *txBlocks, + uint32_t nbBlocksFEC, + uint32_t txDelay, + uint16_t frameIndex) : + m_txBlockx(txBlocks), + m_nbBlocksFEC(nbBlocksFEC), + m_txDelay(txDelay), + m_frameIndex(frameIndex) + {} + }; + + class MsgConfigureRemoteAddress : public Message + { + MESSAGE_CLASS_DECLARATION + public: + const QString& getAddress() const { return m_address; } + uint16_t getPort() const { return m_port; } + + static MsgConfigureRemoteAddress* create(const QString& address, uint16_t port) + { + return new MsgConfigureRemoteAddress(address, port); + } + + private: + QString m_address; + uint16_t m_port; + + MsgConfigureRemoteAddress(const QString& address, uint16_t port) : + m_address(address), + m_port(port) + {} + }; + + UDPSinkFECWorker(); + ~UDPSinkFECWorker(); + + void pushTxFrame(const UDPSinkFEC::SuperBlock *txBlocks, + uint32_t nbBlocksFEC, + uint32_t txDelay, + uint16_t frameIndex); + void setRemoteAddress(const QString& address, uint16_t port); + + MessageQueue m_inputMessageQueue; //!< Queue for asynchronous inbound communication + +public slots: + void handleInputMessages(); + +private: + void transmitUDP(UDPSinkFEC::SuperBlock *txBlockx, uint16_t frameIndex, int nbBlocksFEC, int txDelay); + + QUdpSocket m_udpSocket; + CM256 m_cm256; //!< CM256 library object + bool m_cm256Valid; //!< true if CM256 library is initialized correctly + QHostAddress m_remoteAddress; + uint16_t m_remotePort; +}; + + + +#endif /* PLUGINS_SAMPLESINK_SDRDAEMONSINK_UDPSINKFEC_H_ */