diff --git a/sdrdaemon/channel/sdrdaemonchannelsink.cpp b/sdrdaemon/channel/sdrdaemonchannelsink.cpp
index 7a551fa0d..a10d6a771 100644
--- a/sdrdaemon/channel/sdrdaemonchannelsink.cpp
+++ b/sdrdaemon/channel/sdrdaemonchannelsink.cpp
@@ -70,7 +70,7 @@ SDRDaemonChannelSink::SDRDaemonChannelSink(DeviceSourceAPI *deviceAPI) :
SDRDaemonChannelSink::~SDRDaemonChannelSink()
{
m_dataBlockMutex.lock();
- if (m_dataBlock && !m_dataBlock->m_controlBlock.m_complete) {
+ if (m_dataBlock && !m_dataBlock->m_txControlBlock.m_complete) {
delete m_dataBlock;
}
m_dataBlockMutex.unlock();
@@ -171,13 +171,13 @@ void SDRDaemonChannelSink::feed(const SampleVector::const_iterator& begin, const
if (m_txBlockIndex == SDRDaemonNbOrginalBlocks - 1) // frame complete
{
m_dataBlockMutex.lock();
- m_dataBlock->m_controlBlock.m_frameIndex = m_frameCount;
- m_dataBlock->m_controlBlock.m_processed = false;
- m_dataBlock->m_controlBlock.m_complete = true;
- m_dataBlock->m_controlBlock.m_nbBlocksFEC = m_nbBlocksFEC;
- m_dataBlock->m_controlBlock.m_txDelay = m_txDelay;
- m_dataBlock->m_controlBlock.m_dataAddress = m_dataAddress;
- m_dataBlock->m_controlBlock.m_dataPort = m_dataPort;
+ m_dataBlock->m_txControlBlock.m_frameIndex = m_frameCount;
+ m_dataBlock->m_txControlBlock.m_processed = false;
+ m_dataBlock->m_txControlBlock.m_complete = true;
+ m_dataBlock->m_txControlBlock.m_nbBlocksFEC = m_nbBlocksFEC;
+ m_dataBlock->m_txControlBlock.m_txDelay = m_txDelay;
+ m_dataBlock->m_txControlBlock.m_dataAddress = m_dataAddress;
+ m_dataBlock->m_txControlBlock.m_dataPort = m_dataPort;
m_dataQueue.push(m_dataBlock);
m_dataBlock = new SDRDaemonDataBlock(); // create a new one immediately
diff --git a/sdrdaemon/channel/sdrdaemonchannelsinkthread.cpp b/sdrdaemon/channel/sdrdaemonchannelsinkthread.cpp
index e975dddcb..368a37e3c 100644
--- a/sdrdaemon/channel/sdrdaemonchannelsinkthread.cpp
+++ b/sdrdaemon/channel/sdrdaemonchannelsinkthread.cpp
@@ -94,11 +94,11 @@ bool SDRDaemonChannelSinkThread::handleDataBlock(SDRDaemonDataBlock& dataBlock)
CM256::cm256_block descriptorBlocks[256]; //!< Pointers to data for CM256 encoder
SDRDaemonProtectedBlock fecBlocks[256]; //!< FEC data
- uint16_t frameIndex = dataBlock.m_controlBlock.m_frameIndex;
- int nbBlocksFEC = dataBlock.m_controlBlock.m_nbBlocksFEC;
- int txDelay = dataBlock.m_controlBlock.m_txDelay;
- m_address.setAddress(dataBlock.m_controlBlock.m_dataAddress);
- uint16_t dataPort = dataBlock.m_controlBlock.m_dataPort;
+ uint16_t frameIndex = dataBlock.m_txControlBlock.m_frameIndex;
+ int nbBlocksFEC = dataBlock.m_txControlBlock.m_nbBlocksFEC;
+ int txDelay = dataBlock.m_txControlBlock.m_txDelay;
+ m_address.setAddress(dataBlock.m_txControlBlock.m_dataAddress);
+ uint16_t dataPort = dataBlock.m_txControlBlock.m_dataPort;
SDRDaemonSuperBlock *txBlockx = dataBlock.m_superBlocks;
if ((nbBlocksFEC == 0) || !m_cm256) // Do not FEC encode
@@ -158,7 +158,7 @@ bool SDRDaemonChannelSinkThread::handleDataBlock(SDRDaemonDataBlock& dataBlock)
}
}
- dataBlock.m_controlBlock.m_processed = true;
+ dataBlock.m_txControlBlock.m_processed = true;
return true;
}
diff --git a/sdrdaemon/channel/sdrdaemonchannelsource.cpp b/sdrdaemon/channel/sdrdaemonchannelsource.cpp
index e16e6baf9..cc1a5af32 100644
--- a/sdrdaemon/channel/sdrdaemonchannelsource.cpp
+++ b/sdrdaemon/channel/sdrdaemonchannelsource.cpp
@@ -20,6 +20,9 @@
// along with this program. If not, see . //
///////////////////////////////////////////////////////////////////////////////////
+#include
+#include
+
#include
#include "util/simpleserializer.h"
@@ -53,6 +56,7 @@ SDRDaemonChannelSource::SDRDaemonChannelSource(DeviceSinkAPI *deviceAPI) :
connect(&m_dataQueue, SIGNAL(dataBlockEnqueued()), this, SLOT(handleData()), Qt::QueuedConnection);
m_cm256p = m_cm256.isInitialized() ? &m_cm256 : 0;
+ m_currentMeta.init();
}
SDRDaemonChannelSource::~SDRDaemonChannelSource()
@@ -79,7 +83,7 @@ void SDRDaemonChannelSource::start()
stop();
}
- m_sourceThread = new SDRDaemonChannelSourceThread(&m_dataQueue, m_cm256p);
+ m_sourceThread = new SDRDaemonChannelSourceThread(&m_dataQueue);
m_sourceThread->startStop(true);
m_sourceThread->dataBind(m_dataAddress, m_dataPort);
m_running = true;
@@ -165,8 +169,91 @@ void SDRDaemonChannelSource::applySettings(const SDRDaemonChannelSourceSettings&
m_settings = settings;
}
-bool SDRDaemonChannelSource::handleDataBlock(SDRDaemonDataBlock& dataBlock __attribute__((unused)))
+bool SDRDaemonChannelSource::handleDataBlock(SDRDaemonDataBlock& dataBlock)
{
+ if (dataBlock.m_rxControlBlock.m_blockCount < SDRDaemonNbOrginalBlocks)
+ {
+ qWarning("SDRDaemonChannelSource::handleDataBlock: incomplete data block: not processing");
+ }
+ else
+ {
+ int blockCount = 0;
+
+ for (int blockIndex = 0; blockIndex < 256; blockIndex++)
+ {
+ if ((blockIndex == 0) && (dataBlock.m_rxControlBlock.m_metaRetrieved))
+ {
+ m_cm256DescriptorBlocks[blockCount].Index = 0;
+ m_cm256DescriptorBlocks[blockCount].Block = (void *) &(dataBlock.m_superBlocks[0].m_protectedBlock);
+ blockCount++;
+ }
+ else if (dataBlock.m_superBlocks[blockIndex].m_header.m_blockIndex != 0)
+ {
+ m_cm256DescriptorBlocks[blockCount].Index = dataBlock.m_superBlocks[blockIndex].m_header.m_blockIndex;
+ m_cm256DescriptorBlocks[blockCount].Block = (void *) &(dataBlock.m_superBlocks[blockIndex].m_protectedBlock);
+ blockCount++;
+ }
+ }
+
+ //qDebug("SDRDaemonChannelSource::handleDataBlock: frame: %u blocks: %d", dataBlock.m_rxControlBlock.m_frameIndex, blockCount);
+
+ // Need to use the CM256 recovery
+ if (m_cm256p &&(dataBlock.m_rxControlBlock.m_originalCount < SDRDaemonNbOrginalBlocks))
+ {
+ qDebug("SDRDaemonChannelSource::handleDataBlock: %d recovery blocks", dataBlock.m_rxControlBlock.m_recoveryCount);
+ CM256::cm256_encoder_params paramsCM256;
+ paramsCM256.BlockBytes = sizeof(SDRDaemonProtectedBlock); // never changes
+ paramsCM256.OriginalCount = SDRDaemonNbOrginalBlocks; // never changes
+
+ if (m_currentMeta.m_tv_sec == 0) {
+ paramsCM256.RecoveryCount = dataBlock.m_rxControlBlock.m_recoveryCount;
+ } else {
+ paramsCM256.RecoveryCount = m_currentMeta.m_nbFECBlocks;
+ }
+
+ if (m_cm256.cm256_decode(paramsCM256, m_cm256DescriptorBlocks)) // CM256 decode
+ {
+ qWarning() << "SDRDaemonChannelSource::handleDataBlock: decode CM256 error:"
+ << " m_originalCount: " << dataBlock.m_rxControlBlock.m_originalCount
+ << " m_recoveryCount: " << dataBlock.m_rxControlBlock.m_recoveryCount;
+ }
+ else
+ {
+ for (int ir = 0; ir < dataBlock.m_rxControlBlock.m_recoveryCount; ir++) // restore missing blocks
+ {
+ int recoveryIndex = SDRDaemonNbOrginalBlocks - dataBlock.m_rxControlBlock.m_recoveryCount + ir;
+ int blockIndex = m_cm256DescriptorBlocks[recoveryIndex].Index;
+ SDRDaemonProtectedBlock *recoveredBlock =
+ (SDRDaemonProtectedBlock *) m_cm256DescriptorBlocks[recoveryIndex].Block;
+ memcpy((void *) &(dataBlock.m_superBlocks[blockIndex].m_protectedBlock), recoveredBlock, sizeof(SDRDaemonProtectedBlock));
+ if ((blockIndex == 0) && !dataBlock.m_rxControlBlock.m_metaRetrieved) {
+ dataBlock.m_rxControlBlock.m_metaRetrieved = true;
+ }
+ }
+ }
+ }
+
+ // Validate block zero and retrieve its data
+ if (dataBlock.m_rxControlBlock.m_metaRetrieved)
+ {
+ SDRDaemonMetaDataFEC *metaData = (SDRDaemonMetaDataFEC *) &(dataBlock.m_superBlocks[0].m_protectedBlock);
+ boost::crc_32_type crc32;
+ crc32.process_bytes(metaData, 20);
+
+ if (crc32.checksum() == metaData->m_crc32)
+ {
+ if (!(m_currentMeta == *metaData)) {
+ printMeta("SDRDaemonChannelSource::handleDataBlock", metaData);
+ }
+
+ m_currentMeta = *metaData;
+ }
+ else
+ {
+ qWarning() << "SDRDaemonChannelSource::handleDataBlock: recovered meta: invalid CRC32";
+ }
+ }
+ }
//TODO: Push into R/W buffer
return true;
}
@@ -183,3 +270,17 @@ void SDRDaemonChannelSource::handleData()
}
}
}
+
+void SDRDaemonChannelSource::printMeta(const QString& header, SDRDaemonMetaDataFEC *metaData)
+{
+ qDebug() << header << ": "
+ << "|" << metaData->m_centerFrequency
+ << ":" << metaData->m_sampleRate
+ << ":" << (int) (metaData->m_sampleBytes & 0xF)
+ << ":" << (int) metaData->m_sampleBits
+ << ":" << (int) metaData->m_nbOriginalBlocks
+ << ":" << (int) metaData->m_nbFECBlocks
+ << "|" << metaData->m_tv_sec
+ << ":" << metaData->m_tv_usec
+ << "|";
+}
diff --git a/sdrdaemon/channel/sdrdaemonchannelsource.h b/sdrdaemon/channel/sdrdaemonchannelsource.h
index be1e73e71..433bf8250 100644
--- a/sdrdaemon/channel/sdrdaemonchannelsource.h
+++ b/sdrdaemon/channel/sdrdaemonchannelsource.h
@@ -29,6 +29,7 @@
#include "channel/channelsourceapi.h"
#include "channel/sdrdaemonchannelsourcesettings.h"
#include "channel/sdrdaemondataqueue.h"
+#include "channel/sdrdaemondatablock.h"
class ThreadedBasebandSampleSource;
class UpChannelizer;
@@ -97,8 +98,12 @@ private:
QString m_dataAddress;
uint16_t m_dataPort;
+ CM256::cm256_block m_cm256DescriptorBlocks[2*SDRDaemonNbOrginalBlocks]; //!< CM256 decoder descriptors (block addresses and block indexes)
+ SDRDaemonMetaDataFEC m_currentMeta;
+
void applySettings(const SDRDaemonChannelSourceSettings& settings, bool force = false);
bool handleDataBlock(SDRDaemonDataBlock& dataBlock);
+ void printMeta(const QString& header, SDRDaemonMetaDataFEC *metaData);
private slots:
void handleData();
diff --git a/sdrdaemon/channel/sdrdaemonchannelsourcethread.cpp b/sdrdaemon/channel/sdrdaemonchannelsourcethread.cpp
index f644a9523..533ace7d2 100644
--- a/sdrdaemon/channel/sdrdaemonchannelsourcethread.cpp
+++ b/sdrdaemon/channel/sdrdaemonchannelsourcethread.cpp
@@ -20,6 +20,8 @@
// along with this program. If not, see . //
///////////////////////////////////////////////////////////////////////////////////
+#include
+
#include
#include "channel/sdrdaemondataqueue.h"
@@ -31,14 +33,14 @@
MESSAGE_CLASS_DEFINITION(SDRDaemonChannelSourceThread::MsgStartStop, Message)
MESSAGE_CLASS_DEFINITION(SDRDaemonChannelSourceThread::MsgDataBind, Message)
-SDRDaemonChannelSourceThread::SDRDaemonChannelSourceThread(SDRDaemonDataQueue *dataQueue, CM256 *cm256, QObject* parent) :
+SDRDaemonChannelSourceThread::SDRDaemonChannelSourceThread(SDRDaemonDataQueue *dataQueue, QObject* parent) :
QThread(parent),
m_running(false),
m_dataQueue(dataQueue),
- m_cm256(cm256),
m_address(QHostAddress::LocalHost),
m_socket(0)
{
+ std::fill(m_dataBlocks, m_dataBlocks+4, (SDRDaemonDataBlock *) 0);
connect(&m_inputMessageQueue, SIGNAL(messageEnqueued()), this, SLOT(handleInputMessages()), Qt::QueuedConnection);
}
@@ -131,17 +133,69 @@ void SDRDaemonChannelSourceThread::handleInputMessages()
void SDRDaemonChannelSourceThread::readPendingDatagrams()
{
- char data[1024];
+ SDRDaemonSuperBlock superBlock;
+ qint64 size;
+
while (m_socket->hasPendingDatagrams())
{
QHostAddress sender;
quint16 senderPort = 0;
- qint64 pendingDataSize = m_socket->pendingDatagramSize();
- m_socket->readDatagram(data, pendingDataSize, &sender, &senderPort);
- qDebug("SDRDaemonChannelSourceThread::readPendingDatagrams: %lld bytes received from %s:%d",
- pendingDataSize,
- qPrintable(sender.toString()),
- senderPort);
+ //qint64 pendingDataSize = m_socket->pendingDatagramSize();
+ size = m_socket->readDatagram((char *) &superBlock, (long long int) sizeof(SDRDaemonSuperBlock), &sender, &senderPort);
+ if (size == sizeof(SDRDaemonSuperBlock))
+ {
+ unsigned int dataBlockIndex = superBlock.m_header.m_frameIndex % m_nbDataBlocks;
+
+ // create the first block for this index
+ if (m_dataBlocks[dataBlockIndex] == 0) {
+ m_dataBlocks[dataBlockIndex] = new SDRDaemonDataBlock();
+ }
+
+ if (m_dataBlocks[dataBlockIndex]->m_rxControlBlock.m_frameIndex < 0)
+ {
+ // initialize virgin block with the frame index
+ m_dataBlocks[dataBlockIndex]->m_rxControlBlock.m_frameIndex = superBlock.m_header.m_frameIndex;
+ }
+ else
+ {
+ // if the frame index is not the same for the same slot it means we are starting a new frame
+ uint32_t frameIndex = m_dataBlocks[dataBlockIndex]->m_rxControlBlock.m_frameIndex;
+
+ if (superBlock.m_header.m_frameIndex != frameIndex)
+ {
+ //qDebug("SDRDaemonChannelSourceThread::readPendingDatagrams: push frame %u", frameIndex);
+ m_dataQueue->push(m_dataBlocks[dataBlockIndex]);
+ m_dataBlocks[dataBlockIndex] = new SDRDaemonDataBlock();
+ m_dataBlocks[dataBlockIndex]->m_rxControlBlock.m_frameIndex = superBlock.m_header.m_frameIndex;
+ }
+ }
+
+ m_dataBlocks[dataBlockIndex]->m_superBlocks[superBlock.m_header.m_blockIndex] = superBlock;
+
+ if (superBlock.m_header.m_blockIndex == 0) {
+ m_dataBlocks[dataBlockIndex]->m_rxControlBlock.m_metaRetrieved = true;
+ }
+
+ if (superBlock.m_header.m_blockIndex < SDRDaemonNbOrginalBlocks) {
+ m_dataBlocks[dataBlockIndex]->m_rxControlBlock.m_originalCount++;
+ } else {
+ m_dataBlocks[dataBlockIndex]->m_rxControlBlock.m_recoveryCount++;
+ }
+
+ m_dataBlocks[dataBlockIndex]->m_rxControlBlock.m_blockCount++;
+
+// // if enough data blocks to decode push into data queue
+// if (m_dataBlocks[dataBlockIndex]->m_rxControlBlock.m_blockCount == SDRDaemonNbOrginalBlocks)
+// {
+// //qDebug("SDRDaemonChannelSourceThread::readPendingDatagrams: push frame %u", superBlock.m_header.m_frameIndex);
+// m_dataQueue->push(m_dataBlocks[dataBlockIndex]);
+// m_dataBlocks[dataBlockIndex] = new SDRDaemonDataBlock();
+// }
+ }
+ else
+ {
+ qWarning("SDRDaemonChannelSourceThread::readPendingDatagrams: wrong super block size not processing");
+ }
}
}
diff --git a/sdrdaemon/channel/sdrdaemonchannelsourcethread.h b/sdrdaemon/channel/sdrdaemonchannelsourcethread.h
index de4767578..37892d69b 100644
--- a/sdrdaemon/channel/sdrdaemonchannelsourcethread.h
+++ b/sdrdaemon/channel/sdrdaemonchannelsourcethread.h
@@ -33,7 +33,6 @@
class SDRDaemonDataQueue;
class SDRDaemonDataBlock;
-class CM256;
class QUdpSocket;
class SDRDaemonChannelSourceThread : public QThread {
@@ -81,7 +80,7 @@ public:
}
};
- SDRDaemonChannelSourceThread(SDRDaemonDataQueue *dataQueue, CM256 *cm256, QObject* parent = 0);
+ SDRDaemonChannelSourceThread(SDRDaemonDataQueue *dataQueue, QObject* parent = 0);
~SDRDaemonChannelSourceThread();
void startStop(bool start);
@@ -94,11 +93,13 @@ private:
MessageQueue m_inputMessageQueue;
SDRDaemonDataQueue *m_dataQueue;
- CM256 *m_cm256; //!< CM256 library object
QHostAddress m_address;
QUdpSocket *m_socket;
+ static const uint32_t m_nbDataBlocks = 4; //!< number of data blocks in the ring buffer
+ SDRDaemonDataBlock *m_dataBlocks[m_nbDataBlocks]; //!< ring buffer of data blocks indexed by frame affinity
+
void startWork();
void stopWork();
diff --git a/sdrdaemon/channel/sdrdaemondatablock.h b/sdrdaemon/channel/sdrdaemondatablock.h
index b8aec1a71..af7a635b0 100644
--- a/sdrdaemon/channel/sdrdaemondatablock.h
+++ b/sdrdaemon/channel/sdrdaemondatablock.h
@@ -57,7 +57,7 @@ struct SDRDaemonMetaDataFEC
m_sampleBytes = 0;
m_sampleBits = 0;
m_nbOriginalBlocks = 0;
- m_nbFECBlocks = -1;
+ m_nbFECBlocks = 0;
m_tv_sec = 0;
m_tv_usec = 0;
m_crc32 = 0;
@@ -125,6 +125,23 @@ struct SDRDaemonTxControlBlock
}
};
+struct SDRDaemonRxControlBlock
+{
+ int m_blockCount; //!< number of blocks received for this frame
+ int m_originalCount; //!< number of original blocks received
+ int m_recoveryCount; //!< number of recovery blocks received
+ bool m_metaRetrieved; //!< true if meta data (block zero) was retrieved
+ int m_frameIndex; //!< this frame index or -1 if unset
+
+ SDRDaemonRxControlBlock() {
+ m_blockCount = 0;
+ m_originalCount = 0;
+ m_recoveryCount = 0;
+ m_metaRetrieved = false;
+ m_frameIndex = -1;
+ }
+};
+
class SDRDaemonDataBlock
{
public:
@@ -134,7 +151,8 @@ public:
~SDRDaemonDataBlock() {
delete[] m_superBlocks;
}
- SDRDaemonTxControlBlock m_controlBlock;
+ SDRDaemonTxControlBlock m_txControlBlock;
+ SDRDaemonRxControlBlock m_rxControlBlock;
SDRDaemonSuperBlock *m_superBlocks;
};