1
0
mirror of https://github.com/f4exb/sdrangel.git synced 2025-07-22 18:55:45 -04:00

Remote Output and Remote Source more fixes

This commit is contained in:
f4exb 2021-12-12 10:44:58 +01:00
parent f7f5f4b2dd
commit 96411edd3c
27 changed files with 259 additions and 291 deletions

View File

@ -113,9 +113,6 @@ public:
static const char* const m_channelIdURI; static const char* const m_channelIdURI;
static const char* const m_channelId; static const char* const m_channelId;
signals:
void dataBlockAvailable(RemoteDataBlock *dataBlock);
private: private:
DeviceAPI *m_deviceAPI; DeviceAPI *m_deviceAPI;
QThread *m_thread; QThread *m_thread;

View File

@ -47,7 +47,7 @@ void RemoteSinkFifo::reset()
m_writeHead = 0; m_writeHead = 0;
} }
RemoteDataBlock *RemoteSinkFifo::getDataBlock() RemoteDataFrame *RemoteSinkFifo::getDataFrame()
{ {
QMutexLocker mutexLocker(&m_mutex); QMutexLocker mutexLocker(&m_mutex);
m_servedHead = m_writeHead; m_servedHead = m_writeHead;
@ -62,18 +62,18 @@ RemoteDataBlock *RemoteSinkFifo::getDataBlock()
return &m_data[m_servedHead]; return &m_data[m_servedHead];
} }
unsigned int RemoteSinkFifo::readDataBlock(RemoteDataBlock **dataBlock) unsigned int RemoteSinkFifo::readDataFrame(RemoteDataFrame **dataFrame)
{ {
QMutexLocker mutexLocker(&m_mutex); QMutexLocker mutexLocker(&m_mutex);
if (calculateRemainder() == 0) if (calculateRemainder() == 0)
{ {
*dataBlock = nullptr; *dataFrame = nullptr;
return 0; return 0;
} }
else else
{ {
*dataBlock = &m_data[m_readHead]; *dataFrame = &m_data[m_readHead];
m_readHead = m_readHead < m_size - 1 ? m_readHead + 1 : 0; m_readHead = m_readHead < m_size - 1 ? m_readHead + 1 : 0;
return calculateRemainder(); return calculateRemainder();
} }
@ -92,4 +92,4 @@ unsigned int RemoteSinkFifo::calculateRemainder()
} else { } else {
return m_size - (m_readHead - m_servedHead); return m_size - (m_readHead - m_servedHead);
} }
} }

View File

@ -34,15 +34,15 @@ public:
void resize(unsigned int size); void resize(unsigned int size);
void reset(); void reset();
RemoteDataBlock *getDataBlock(); RemoteDataFrame *getDataFrame();
unsigned int readDataBlock(RemoteDataBlock **dataBlock); unsigned int readDataFrame(RemoteDataFrame **dataFrame);
unsigned int getRemainder(); unsigned int getRemainder();
signals: signals:
void dataBlockServed(); void dataBlockServed();
private: private:
std::vector<RemoteDataBlock> m_data; std::vector<RemoteDataFrame> m_data;
int m_size; int m_size;
int m_readHead; //!< index of last data block processed int m_readHead; //!< index of last data block processed
int m_servedHead; //!< index of last data block served int m_servedHead; //!< index of last data block served

View File

@ -57,37 +57,37 @@ RemoteSinkSender::~RemoteSinkSender()
m_socket->deleteLater(); m_socket->deleteLater();
} }
RemoteDataBlock *RemoteSinkSender::getDataBlock() RemoteDataFrame *RemoteSinkSender::getDataFrame()
{ {
return m_fifo.getDataBlock(); return m_fifo.getDataFrame();
} }
void RemoteSinkSender::handleData() void RemoteSinkSender::handleData()
{ {
RemoteDataBlock *dataBlock; RemoteDataFrame *dataFrame;
unsigned int remainder = m_fifo.getRemainder(); unsigned int remainder = m_fifo.getRemainder();
while (remainder != 0) while (remainder != 0)
{ {
remainder = m_fifo.readDataBlock(&dataBlock); remainder = m_fifo.readDataFrame(&dataFrame);
if (dataBlock) { if (dataFrame) {
sendDataBlock(dataBlock); sendDataFrame(dataFrame);
} }
} }
} }
void RemoteSinkSender::sendDataBlock(RemoteDataBlock *dataBlock) void RemoteSinkSender::sendDataFrame(RemoteDataFrame *dataFrame)
{ {
CM256::cm256_encoder_params cm256Params; //!< Main interface with CM256 encoder CM256::cm256_encoder_params cm256Params; //!< Main interface with CM256 encoder
CM256::cm256_block descriptorBlocks[256]; //!< Pointers to data for CM256 encoder CM256::cm256_block descriptorBlocks[256]; //!< Pointers to data for CM256 encoder
RemoteProtectedBlock fecBlocks[256]; //!< FEC data RemoteProtectedBlock fecBlocks[256]; //!< FEC data
uint16_t frameIndex = dataBlock->m_txControlBlock.m_frameIndex; uint16_t frameIndex = dataFrame->m_txControlBlock.m_frameIndex;
int nbBlocksFEC = dataBlock->m_txControlBlock.m_nbBlocksFEC; int nbBlocksFEC = dataFrame->m_txControlBlock.m_nbBlocksFEC;
m_address.setAddress(dataBlock->m_txControlBlock.m_dataAddress); m_address.setAddress(dataFrame->m_txControlBlock.m_dataAddress);
uint16_t dataPort = dataBlock->m_txControlBlock.m_dataPort; uint16_t dataPort = dataFrame->m_txControlBlock.m_dataPort;
RemoteSuperBlock *txBlockx = dataBlock->m_superBlocks; RemoteSuperBlock *txBlockx = dataFrame->m_superBlocks;
if ((nbBlocksFEC == 0) || !m_cm256p) // Do not FEC encode if ((nbBlocksFEC == 0) || !m_cm256p) // Do not FEC encode
{ {
@ -141,5 +141,5 @@ void RemoteSinkSender::sendDataBlock(RemoteDataBlock *dataBlock)
} }
} }
dataBlock->m_txControlBlock.m_processed = true; dataFrame->m_txControlBlock.m_processed = true;
} }

View File

@ -36,7 +36,7 @@
#include "remotesinkfifo.h" #include "remotesinkfifo.h"
class RemoteDataBlock; class RemoteDataFrame;
class CM256; class CM256;
class QUdpSocket; class QUdpSocket;
@ -47,7 +47,7 @@ public:
RemoteSinkSender(); RemoteSinkSender();
~RemoteSinkSender(); ~RemoteSinkSender();
RemoteDataBlock *getDataBlock(); RemoteDataFrame *getDataFrame();
private: private:
RemoteSinkFifo m_fifo; RemoteSinkFifo m_fifo;
@ -57,7 +57,7 @@ private:
QHostAddress m_address; QHostAddress m_address;
QUdpSocket *m_socket; QUdpSocket *m_socket;
void sendDataBlock(RemoteDataBlock *dataBlock); void sendDataFrame(RemoteDataFrame *dataFrame);
private slots: private slots:
void handleData(); void handleData();

View File

@ -31,7 +31,7 @@ RemoteSinkSink::RemoteSinkSink() :
m_txBlockIndex(0), m_txBlockIndex(0),
m_frameCount(0), m_frameCount(0),
m_sampleIndex(0), m_sampleIndex(0),
m_dataBlock(nullptr), m_dataFrame(nullptr),
m_deviceCenterFrequency(0), m_deviceCenterFrequency(0),
m_frequencyOffset(0), m_frequencyOffset(0),
m_basebandSampleRate(48000), m_basebandSampleRate(48000),
@ -99,14 +99,14 @@ void RemoteSinkSink::feed(const SampleVector::const_iterator& begin, const Sampl
metaData.m_tv_sec = nowus / 1000000UL; // tv.tv_sec; metaData.m_tv_sec = nowus / 1000000UL; // tv.tv_sec;
metaData.m_tv_usec = nowus % 1000000UL; // tv.tv_usec; metaData.m_tv_usec = nowus % 1000000UL; // tv.tv_usec;
if (!m_dataBlock) { // on the very first cycle there is no data block allocated if (!m_dataFrame) { // on the very first cycle there is no data block allocated
m_dataBlock = m_remoteSinkSender->getDataBlock(); // ask a new block to sender m_dataFrame = m_remoteSinkSender->getDataFrame(); // ask a new block to sender
} }
boost::crc_32_type crc32; boost::crc_32_type crc32;
crc32.process_bytes(&metaData, sizeof(RemoteMetaDataFEC)-4); crc32.process_bytes(&metaData, sizeof(RemoteMetaDataFEC)-4);
metaData.m_crc32 = crc32.checksum(); metaData.m_crc32 = crc32.checksum();
RemoteSuperBlock& superBlock = m_dataBlock->m_superBlocks[0]; // first block RemoteSuperBlock& superBlock = m_dataFrame->m_superBlocks[0]; // first block
superBlock.init(); superBlock.init();
superBlock.m_header.m_frameIndex = m_frameCount; superBlock.m_header.m_frameIndex = m_frameCount;
superBlock.m_header.m_blockIndex = m_txBlockIndex; superBlock.m_header.m_blockIndex = m_txBlockIndex;
@ -156,18 +156,18 @@ void RemoteSinkSink::feed(const SampleVector::const_iterator& begin, const Sampl
m_superBlock.m_header.m_blockIndex = m_txBlockIndex; m_superBlock.m_header.m_blockIndex = m_txBlockIndex;
m_superBlock.m_header.m_sampleBytes = (SDR_RX_SAMP_SZ <= 16 ? 2 : 4); m_superBlock.m_header.m_sampleBytes = (SDR_RX_SAMP_SZ <= 16 ? 2 : 4);
m_superBlock.m_header.m_sampleBits = SDR_RX_SAMP_SZ; m_superBlock.m_header.m_sampleBits = SDR_RX_SAMP_SZ;
m_dataBlock->m_superBlocks[m_txBlockIndex] = m_superBlock; m_dataFrame->m_superBlocks[m_txBlockIndex] = m_superBlock;
if (m_txBlockIndex == RemoteNbOrginalBlocks - 1) // frame complete if (m_txBlockIndex == RemoteNbOrginalBlocks - 1) // frame complete
{ {
m_dataBlock->m_txControlBlock.m_frameIndex = m_frameCount; m_dataFrame->m_txControlBlock.m_frameIndex = m_frameCount;
m_dataBlock->m_txControlBlock.m_processed = false; m_dataFrame->m_txControlBlock.m_processed = false;
m_dataBlock->m_txControlBlock.m_complete = true; m_dataFrame->m_txControlBlock.m_complete = true;
m_dataBlock->m_txControlBlock.m_nbBlocksFEC = m_nbBlocksFEC; m_dataFrame->m_txControlBlock.m_nbBlocksFEC = m_nbBlocksFEC;
m_dataBlock->m_txControlBlock.m_dataAddress = m_dataAddress; m_dataFrame->m_txControlBlock.m_dataAddress = m_dataAddress;
m_dataBlock->m_txControlBlock.m_dataPort = m_dataPort; m_dataFrame->m_txControlBlock.m_dataPort = m_dataPort;
m_dataBlock = m_remoteSinkSender->getDataBlock(); // ask a new block to sender m_dataFrame = m_remoteSinkSender->getDataFrame(); // ask a new block to sender
m_txBlockIndex = 0; m_txBlockIndex = 0;
m_frameCount++; m_frameCount++;

View File

@ -55,7 +55,7 @@ private:
int m_sampleIndex; //!< Current sample index in protected block data int m_sampleIndex; //!< Current sample index in protected block data
RemoteSuperBlock m_superBlock; RemoteSuperBlock m_superBlock;
RemoteMetaDataFEC m_currentMetaFEC; RemoteMetaDataFEC m_currentMetaFEC;
RemoteDataBlock *m_dataBlock; RemoteDataFrame *m_dataFrame;
uint64_t m_deviceCenterFrequency; uint64_t m_deviceCenterFrequency;
int64_t m_frequencyOffset; int64_t m_frequencyOffset;

View File

@ -31,7 +31,7 @@ RemoteSourceSource::RemoteSourceSource() :
connect(&m_dataQueue, SIGNAL(dataBlockEnqueued()), this, SLOT(handleData()), Qt::QueuedConnection); connect(&m_dataQueue, SIGNAL(dataBlockEnqueued()), this, SLOT(handleData()), Qt::QueuedConnection);
m_cm256p = m_cm256.isInitialized() ? &m_cm256 : 0; m_cm256p = m_cm256.isInitialized() ? &m_cm256 : 0;
m_currentMeta.init(); m_currentMeta.init();
m_dataReadQueue.setSize(50); m_dataReadQueue.setSize(20);
applyChannelSettings(m_channelSampleRate, true); applyChannelSettings(m_channelSampleRate, true);
} }
@ -53,7 +53,8 @@ void RemoteSourceSource::pull(SampleVector::iterator begin, unsigned int nbSampl
void RemoteSourceSource::pullOne(Sample& sample) void RemoteSourceSource::pullOne(Sample& sample)
{ {
// m_dataReadQueue.readSample(sample, true); // true is scale for Tx m_dataReadQueue.readSample(sample, true); // true is scale for Tx
return;
Complex ci; Complex ci;
@ -129,18 +130,18 @@ void RemoteSourceSource::stopWorker()
void RemoteSourceSource::handleData() void RemoteSourceSource::handleData()
{ {
RemoteDataBlock* dataBlock; RemoteDataFrame* dataFrame;
while (m_running && ((dataBlock = m_dataQueue.pop()) != nullptr)) { while (m_running && ((dataFrame = m_dataQueue.pop()) != nullptr)) {
handleDataBlock(dataBlock); handleDataFrame(dataFrame);
} }
} }
void RemoteSourceSource::handleDataBlock(RemoteDataBlock* dataBlock) void RemoteSourceSource::handleDataFrame(RemoteDataFrame* dataFrame)
{ {
if (dataBlock->m_rxControlBlock.m_blockCount < RemoteNbOrginalBlocks) if (dataFrame->m_rxControlBlock.m_blockCount < RemoteNbOrginalBlocks)
{ {
qWarning("RemoteSourceSource::handleDataBlock: incomplete data block: not processing"); qWarning("RemoteSourceSource::handleDataFrame: incomplete data frame: not processing");
} }
else else
{ {
@ -148,69 +149,69 @@ void RemoteSourceSource::handleDataBlock(RemoteDataBlock* dataBlock)
for (int blockIndex = 0; blockIndex < 256; blockIndex++) for (int blockIndex = 0; blockIndex < 256; blockIndex++)
{ {
if ((blockIndex == 0) && (dataBlock->m_rxControlBlock.m_metaRetrieved)) if ((blockIndex == 0) && (dataFrame->m_rxControlBlock.m_metaRetrieved))
{ {
m_cm256DescriptorBlocks[blockCount].Index = 0; m_cm256DescriptorBlocks[blockCount].Index = 0;
m_cm256DescriptorBlocks[blockCount].Block = (void *) &(dataBlock->m_superBlocks[0].m_protectedBlock); m_cm256DescriptorBlocks[blockCount].Block = (void *) &(dataFrame->m_superBlocks[0].m_protectedBlock);
blockCount++; blockCount++;
} }
else if (dataBlock->m_superBlocks[blockIndex].m_header.m_blockIndex != 0) else if (dataFrame->m_superBlocks[blockIndex].m_header.m_blockIndex != 0)
{ {
m_cm256DescriptorBlocks[blockCount].Index = dataBlock->m_superBlocks[blockIndex].m_header.m_blockIndex; m_cm256DescriptorBlocks[blockCount].Index = dataFrame->m_superBlocks[blockIndex].m_header.m_blockIndex;
m_cm256DescriptorBlocks[blockCount].Block = (void *) &(dataBlock->m_superBlocks[blockIndex].m_protectedBlock); m_cm256DescriptorBlocks[blockCount].Block = (void *) &(dataFrame->m_superBlocks[blockIndex].m_protectedBlock);
blockCount++; blockCount++;
} }
} }
//qDebug("RemoteSourceSource::handleDataBlock: frame: %u blocks: %d", dataBlock.m_rxControlBlock.m_frameIndex, blockCount); //qDebug("RemoteSourceSource::handleDataFrame: frame: %u blocks: %d", dataFrame.m_rxControlBlock.m_frameIndex, blockCount);
// Need to use the CM256 recovery // Need to use the CM256 recovery
if (m_cm256p &&(dataBlock->m_rxControlBlock.m_originalCount < RemoteNbOrginalBlocks)) if (m_cm256p &&(dataFrame->m_rxControlBlock.m_originalCount < RemoteNbOrginalBlocks))
{ {
qDebug("RemoteSourceSource::handleDataBlock: %d recovery blocks", dataBlock->m_rxControlBlock.m_recoveryCount); qDebug("RemoteSourceSource::handleDataFrame: %d recovery blocks", dataFrame->m_rxControlBlock.m_recoveryCount);
CM256::cm256_encoder_params paramsCM256; CM256::cm256_encoder_params paramsCM256;
paramsCM256.BlockBytes = sizeof(RemoteProtectedBlock); // never changes paramsCM256.BlockBytes = sizeof(RemoteProtectedBlock); // never changes
paramsCM256.OriginalCount = RemoteNbOrginalBlocks; // never changes paramsCM256.OriginalCount = RemoteNbOrginalBlocks; // never changes
if (m_currentMeta.m_tv_sec == 0) { if (m_currentMeta.m_tv_sec == 0) {
paramsCM256.RecoveryCount = dataBlock->m_rxControlBlock.m_recoveryCount; paramsCM256.RecoveryCount = dataFrame->m_rxControlBlock.m_recoveryCount;
} else { } else {
paramsCM256.RecoveryCount = m_currentMeta.m_nbFECBlocks; paramsCM256.RecoveryCount = m_currentMeta.m_nbFECBlocks;
} }
// update counters // update counters
if (dataBlock->m_rxControlBlock.m_originalCount < RemoteNbOrginalBlocks - paramsCM256.RecoveryCount) { if (dataFrame->m_rxControlBlock.m_originalCount < RemoteNbOrginalBlocks - paramsCM256.RecoveryCount) {
m_nbUncorrectableErrors += RemoteNbOrginalBlocks - paramsCM256.RecoveryCount - dataBlock->m_rxControlBlock.m_originalCount; m_nbUncorrectableErrors += RemoteNbOrginalBlocks - paramsCM256.RecoveryCount - dataFrame->m_rxControlBlock.m_originalCount;
} else { } else {
m_nbCorrectableErrors += dataBlock->m_rxControlBlock.m_recoveryCount; m_nbCorrectableErrors += dataFrame->m_rxControlBlock.m_recoveryCount;
} }
if (m_cm256.cm256_decode(paramsCM256, m_cm256DescriptorBlocks)) // CM256 decode if (m_cm256.cm256_decode(paramsCM256, m_cm256DescriptorBlocks)) // CM256 decode
{ {
qWarning() << "RemoteSourceSource::handleDataBlock: decode CM256 error:" qWarning() << "RemoteSourceSource::handleDataFrame: decode CM256 error:"
<< " m_originalCount: " << dataBlock->m_rxControlBlock.m_originalCount << " m_originalCount: " << dataFrame->m_rxControlBlock.m_originalCount
<< " m_recoveryCount: " << dataBlock->m_rxControlBlock.m_recoveryCount; << " m_recoveryCount: " << dataFrame->m_rxControlBlock.m_recoveryCount;
} }
else else
{ {
for (int ir = 0; ir < dataBlock->m_rxControlBlock.m_recoveryCount; ir++) // restore missing blocks for (int ir = 0; ir < dataFrame->m_rxControlBlock.m_recoveryCount; ir++) // restore missing blocks
{ {
int recoveryIndex = RemoteNbOrginalBlocks - dataBlock->m_rxControlBlock.m_recoveryCount + ir; int recoveryIndex = RemoteNbOrginalBlocks - dataFrame->m_rxControlBlock.m_recoveryCount + ir;
int blockIndex = m_cm256DescriptorBlocks[recoveryIndex].Index; int blockIndex = m_cm256DescriptorBlocks[recoveryIndex].Index;
RemoteProtectedBlock *recoveredBlock = RemoteProtectedBlock *recoveredBlock =
(RemoteProtectedBlock *) m_cm256DescriptorBlocks[recoveryIndex].Block; (RemoteProtectedBlock *) m_cm256DescriptorBlocks[recoveryIndex].Block;
memcpy((void *) &(dataBlock->m_superBlocks[blockIndex].m_protectedBlock), recoveredBlock, sizeof(RemoteProtectedBlock)); memcpy((void *) &(dataFrame->m_superBlocks[blockIndex].m_protectedBlock), recoveredBlock, sizeof(RemoteProtectedBlock));
if ((blockIndex == 0) && !dataBlock->m_rxControlBlock.m_metaRetrieved) { if ((blockIndex == 0) && !dataFrame->m_rxControlBlock.m_metaRetrieved) {
dataBlock->m_rxControlBlock.m_metaRetrieved = true; dataFrame->m_rxControlBlock.m_metaRetrieved = true;
} }
} }
} }
} }
// Validate block zero and retrieve its data // Validate block zero and retrieve its data
if (dataBlock->m_rxControlBlock.m_metaRetrieved) if (dataFrame->m_rxControlBlock.m_metaRetrieved)
{ {
RemoteMetaDataFEC *metaData = (RemoteMetaDataFEC *) &(dataBlock->m_superBlocks[0].m_protectedBlock); RemoteMetaDataFEC *metaData = (RemoteMetaDataFEC *) &(dataFrame->m_superBlocks[0].m_protectedBlock);
boost::crc_32_type crc32; boost::crc_32_type crc32;
crc32.process_bytes(metaData, sizeof(RemoteMetaDataFEC)-4); crc32.process_bytes(metaData, sizeof(RemoteMetaDataFEC)-4);
@ -218,7 +219,7 @@ void RemoteSourceSource::handleDataBlock(RemoteDataBlock* dataBlock)
{ {
if (!(m_currentMeta == *metaData)) if (!(m_currentMeta == *metaData))
{ {
printMeta("RemoteSourceSource::handleDataBlock", metaData); printMeta("RemoteSourceSource::handleDataFrame", metaData);
if (m_currentMeta.m_sampleRate != metaData->m_sampleRate) { if (m_currentMeta.m_sampleRate != metaData->m_sampleRate) {
emit newRemoteSampleRate(metaData->m_sampleRate); emit newRemoteSampleRate(metaData->m_sampleRate);
@ -230,11 +231,11 @@ void RemoteSourceSource::handleDataBlock(RemoteDataBlock* dataBlock)
} }
else else
{ {
qWarning() << "RemoteSource::handleDataBlock: recovered meta: invalid CRC32"; qWarning() << "RemoteSource::handleDataFrame: recovered meta: invalid CRC32";
} }
} }
m_dataReadQueue.push(dataBlock); // Push into R/W buffer m_dataReadQueue.push(dataFrame); // Push into R/W buffer
} }
} }

View File

@ -80,7 +80,7 @@ private:
void startWorker(); void startWorker();
void stopWorker(); void stopWorker();
void handleDataBlock(RemoteDataBlock *dataBlock); void handleDataFrame(RemoteDataFrame *dataFrame);
void printMeta(const QString& header, RemoteMetaDataFEC *metaData); void printMeta(const QString& header, RemoteMetaDataFEC *metaData);
void getSample(); void getSample();

View File

@ -33,9 +33,12 @@ RemoteSourceWorker::RemoteSourceWorker(RemoteDataQueue *dataQueue, QObject* pare
m_address(QHostAddress::LocalHost), m_address(QHostAddress::LocalHost),
m_socket(this), m_socket(this),
m_mutex(QMutex::Recursive), m_mutex(QMutex::Recursive),
m_sampleRate(0) m_sampleRate(0),
m_udpReadBytes(0)
{ {
std::fill(m_dataBlocks, m_dataBlocks+4, (RemoteDataBlock *) 0); m_udpBuf = new char[RemoteUdpSize];
std::fill(m_dataFrames, m_dataFrames+4, (RemoteDataFrame *) 0);
connect(&m_inputMessageQueue, SIGNAL(messageEnqueued()), this, SLOT(handleInputMessages()), Qt::QueuedConnection); connect(&m_inputMessageQueue, SIGNAL(messageEnqueued()), this, SLOT(handleInputMessages()), Qt::QueuedConnection);
connect(&m_socket, SIGNAL(readyRead()),this, SLOT(recv())); connect(&m_socket, SIGNAL(readyRead()),this, SLOT(recv()));
#if QT_VERSION < QT_VERSION_CHECK(5, 15, 0) #if QT_VERSION < QT_VERSION_CHECK(5, 15, 0)
@ -106,87 +109,87 @@ void RemoteSourceWorker::handleInputMessages()
QMutexLocker mutexLocker(&m_mutex); QMutexLocker mutexLocker(&m_mutex);
MsgDataBind* notif = (MsgDataBind*) message; MsgDataBind* notif = (MsgDataBind*) message;
qDebug("RemoteSourceWorker::handleInputMessages: MsgDataBind: %s:%d", qPrintable(notif->getAddress().toString()), notif->getPort()); qDebug("RemoteSourceWorker::handleInputMessages: MsgDataBind: %s:%d", qPrintable(notif->getAddress().toString()), notif->getPort());
disconnect(&m_socket, SIGNAL(readyRead()), this, SLOT(readPendingDatagrams())); disconnect(&m_socket, SIGNAL(readyRead()), this, SLOT(dataReadyRead()));
m_socket.abort(); m_socket.abort();
m_socket.bind(notif->getAddress(), notif->getPort()); m_socket.bind(notif->getAddress(), notif->getPort());
connect(&m_socket, SIGNAL(readyRead()), this, SLOT(readPendingDatagrams())); connect(&m_socket, SIGNAL(readyRead()), this, SLOT(dataReadyRead()));
} }
} }
} }
void RemoteSourceWorker::readPendingDatagrams() void RemoteSourceWorker::dataReadyRead()
{ {
RemoteSuperBlock superBlock; m_udpReadBytes = 0;
qint64 size;
while (m_socket.hasPendingDatagrams()) while (m_socket.hasPendingDatagrams())
{ {
qint64 pendingDataSize = m_socket.pendingDatagramSize();
QHostAddress sender; QHostAddress sender;
quint16 senderPort = 0; m_udpReadBytes += m_socket.readDatagram(&m_udpBuf[m_udpReadBytes], pendingDataSize, &sender, nullptr);
//qint64 pendingDataSize = m_socket->pendingDatagramSize();
size = m_socket.readDatagram((char *) &superBlock, (long long int) sizeof(RemoteSuperBlock), &sender, &senderPort);
if (size == sizeof(RemoteSuperBlock)) if (m_udpReadBytes == RemoteUdpSize)
{ {
unsigned int dataBlockIndex = superBlock.m_header.m_frameIndex % m_nbDataBlocks; processData();
int blockIndex = superBlock.m_header.m_blockIndex; m_udpReadBytes = 0;
}
}
}
if (blockIndex == 0) // first block with meta data void RemoteSourceWorker::processData()
{ {
const RemoteMetaDataFEC *metaData = (const RemoteMetaDataFEC *) &superBlock.m_protectedBlock; RemoteSuperBlock *superBlock = (RemoteSuperBlock *) m_udpBuf;
uint32_t sampleRate = metaData->m_sampleRate; unsigned int dataBlockIndex = superBlock->m_header.m_frameIndex % m_nbDataFrames;
int blockIndex = superBlock->m_header.m_blockIndex;
if (m_sampleRate != sampleRate) if (blockIndex == 0) // first block with meta data
{ {
qDebug("RemoteSourceWorker::readPendingDatagrams: sampleRate: %u", sampleRate); const RemoteMetaDataFEC *metaData = (const RemoteMetaDataFEC *) &superBlock->m_protectedBlock;
m_socket.setSocketOption(QAbstractSocket::ReceiveBufferSizeSocketOption, getDataSocketBufferSize(sampleRate)); uint32_t sampleRate = metaData->m_sampleRate;
m_sampleRate = sampleRate;
}
}
// create the first block for this index if (m_sampleRate != sampleRate)
if (m_dataBlocks[dataBlockIndex] == 0) {
m_dataBlocks[dataBlockIndex] = new RemoteDataBlock();
}
if (m_dataBlocks[dataBlockIndex]->m_rxControlBlock.m_frameIndex < 0)
{
// initialize virgin block with the frame index
m_dataBlocks[dataBlockIndex]->m_rxControlBlock.m_frameIndex = superBlock.m_header.m_frameIndex;
}
else
{
// if the frame index is not the same for the same slot it means we are starting a new frame
uint32_t frameIndex = m_dataBlocks[dataBlockIndex]->m_rxControlBlock.m_frameIndex;
if (superBlock.m_header.m_frameIndex != frameIndex)
{
//qDebug("RemoteSourceWorker::readPendingDatagrams: push frame %u", frameIndex);
m_dataQueue->push(m_dataBlocks[dataBlockIndex]);
m_dataBlocks[dataBlockIndex] = new RemoteDataBlock();
m_dataBlocks[dataBlockIndex]->m_rxControlBlock.m_frameIndex = superBlock.m_header.m_frameIndex;
}
}
m_dataBlocks[dataBlockIndex]->m_superBlocks[superBlock.m_header.m_blockIndex] = superBlock;
if (superBlock.m_header.m_blockIndex == 0) {
m_dataBlocks[dataBlockIndex]->m_rxControlBlock.m_metaRetrieved = true;
}
if (superBlock.m_header.m_blockIndex < RemoteNbOrginalBlocks) {
m_dataBlocks[dataBlockIndex]->m_rxControlBlock.m_originalCount++;
} else {
m_dataBlocks[dataBlockIndex]->m_rxControlBlock.m_recoveryCount++;
}
m_dataBlocks[dataBlockIndex]->m_rxControlBlock.m_blockCount++;
}
else
{ {
qWarning("RemoteSourceWorker::readPendingDatagrams: wrong super block size not processing"); qDebug("RemoteSourceWorker::processData: sampleRate: %u", sampleRate);
m_socket.setSocketOption(QAbstractSocket::ReceiveBufferSizeSocketOption, getDataSocketBufferSize(sampleRate));
m_sampleRate = sampleRate;
} }
} }
// create the first block for this index
if (m_dataFrames[dataBlockIndex] == nullptr) {
m_dataFrames[dataBlockIndex] = new RemoteDataFrame();
}
if (m_dataFrames[dataBlockIndex]->m_rxControlBlock.m_frameIndex < 0)
{
// initialize virgin block with the frame index
m_dataFrames[dataBlockIndex]->m_rxControlBlock.m_frameIndex = superBlock->m_header.m_frameIndex;
}
else
{
// if the frame index is not the same for the same slot it means we are starting a new frame
uint32_t frameIndex = m_dataFrames[dataBlockIndex]->m_rxControlBlock.m_frameIndex;
if (superBlock->m_header.m_frameIndex != frameIndex)
{
m_dataQueue->push(m_dataFrames[dataBlockIndex]);
m_dataFrames[dataBlockIndex] = new RemoteDataFrame();
m_dataFrames[dataBlockIndex]->m_rxControlBlock.m_frameIndex = superBlock->m_header.m_frameIndex;
}
}
m_dataFrames[dataBlockIndex]->m_superBlocks[superBlock->m_header.m_blockIndex] = *superBlock;
if (superBlock->m_header.m_blockIndex == 0) {
m_dataFrames[dataBlockIndex]->m_rxControlBlock.m_metaRetrieved = true;
}
if (superBlock->m_header.m_blockIndex < RemoteNbOrginalBlocks) {
m_dataFrames[dataBlockIndex]->m_rxControlBlock.m_originalCount++;
} else {
m_dataFrames[dataBlockIndex]->m_rxControlBlock.m_recoveryCount++;
}
m_dataFrames[dataBlockIndex]->m_rxControlBlock.m_blockCount++;
} }
int RemoteSourceWorker::getDataSocketBufferSize(uint32_t inSampleRate) int RemoteSourceWorker::getDataSocketBufferSize(uint32_t inSampleRate)

View File

@ -26,7 +26,7 @@
#include "util/messagequeue.h" #include "util/messagequeue.h"
class RemoteDataQueue; class RemoteDataQueue;
class RemoteDataBlock; class RemoteDataFrame;
class RemoteSourceWorker : public QObject { class RemoteSourceWorker : public QObject {
Q_OBJECT Q_OBJECT
@ -71,18 +71,22 @@ private:
QUdpSocket m_socket; QUdpSocket m_socket;
QMutex m_mutex; QMutex m_mutex;
static const uint32_t m_nbDataBlocks = 4; //!< number of data blocks in the ring buffer static const uint32_t m_nbDataFrames = 4; //!< number of data frames in the ring buffer
RemoteDataBlock *m_dataBlocks[m_nbDataBlocks]; //!< ring buffer of data blocks indexed by frame affinity RemoteDataFrame *m_dataFrames[m_nbDataFrames]; //!< ring buffer of data frames indexed by frame affinity
uint32_t m_sampleRate; //!< current sample rate from meta data uint32_t m_sampleRate; //!< current sample rate from meta data
qint64 m_udpReadBytes;
char *m_udpBuf;
static int getDataSocketBufferSize(uint32_t inSampleRate); static int getDataSocketBufferSize(uint32_t inSampleRate);
void processData();
private slots: private slots:
void started(); void started();
void finished(); void finished();
void errorOccurred(QAbstractSocket::SocketError socketError); void errorOccurred(QAbstractSocket::SocketError socketError);
void handleInputMessages(); void handleInputMessages();
void readPendingDatagrams(); void dataReadyRead();
}; };

View File

@ -200,14 +200,11 @@ bool RemoteOutput::handleMessage(const Message& message)
MsgConfigureRemoteOutputWork& conf = (MsgConfigureRemoteOutputWork&) message; MsgConfigureRemoteOutputWork& conf = (MsgConfigureRemoteOutputWork&) message;
bool working = conf.isWorking(); bool working = conf.isWorking();
if (m_remoteOutputWorker != 0) if (m_remoteOutputWorker != nullptr)
{ {
if (working) if (working) {
{
m_remoteOutputWorker->startWork(); m_remoteOutputWorker->startWork();
} } else {
else
{
m_remoteOutputWorker->stopWork(); m_remoteOutputWorker->stopWork();
} }
} }
@ -221,8 +218,7 @@ bool RemoteOutput::handleMessage(const Message& message)
if (cmd.getStartStop()) if (cmd.getStartStop())
{ {
if (m_deviceAPI->initDeviceEngine()) if (m_deviceAPI->initDeviceEngine()) {
{
m_deviceAPI->startDeviceEngine(); m_deviceAPI->startDeviceEngine();
} }
} }
@ -241,8 +237,7 @@ bool RemoteOutput::handleMessage(const Message& message)
{ {
MsgConfigureRemoteOutputChunkCorrection& conf = (MsgConfigureRemoteOutputChunkCorrection&) message; MsgConfigureRemoteOutputChunkCorrection& conf = (MsgConfigureRemoteOutputChunkCorrection&) message;
if (m_remoteOutputWorker != 0) if (m_remoteOutputWorker != nullptr) {
{
m_remoteOutputWorker->setChunkCorrection(conf.getChunkCorrection()); m_remoteOutputWorker->setChunkCorrection(conf.getChunkCorrection());
} }
@ -472,9 +467,8 @@ void RemoteOutput::webapiFormatDeviceSettings(SWGSDRangel::SWGDeviceSettings& re
void RemoteOutput::webapiFormatDeviceReport(SWGSDRangel::SWGDeviceReport& response) void RemoteOutput::webapiFormatDeviceReport(SWGSDRangel::SWGDeviceReport& response)
{ {
uint64_t ts_usecs;
response.getRemoteOutputReport()->setBufferRwBalance(m_sampleSourceFifo.getRWBalance()); response.getRemoteOutputReport()->setBufferRwBalance(m_sampleSourceFifo.getRWBalance());
response.getRemoteOutputReport()->setSampleCount(m_remoteOutputWorker ? (int) m_remoteOutputWorker->getSamplesCount(ts_usecs) : 0); response.getRemoteOutputReport()->setSampleCount(m_remoteOutputWorker ? (int) m_remoteOutputWorker->getSamplesCount() : 0);
} }
void RemoteOutput::tick() void RemoteOutput::tick()
@ -615,7 +609,8 @@ void RemoteOutput::sampleRateCorrection(double remoteTimeDeltaUs, double timeDel
double chunkCorr = 50000 * deltaSR; // for 50ms chunk intervals (50000us) double chunkCorr = 50000 * deltaSR; // for 50ms chunk intervals (50000us)
m_chunkSizeCorrection += roundf(chunkCorr); m_chunkSizeCorrection += roundf(chunkCorr);
qDebug("RemoteOutput::sampleRateCorrection: %d (%f) samples", m_chunkSizeCorrection, chunkCorr); qDebug("RemoteOutput::sampleRateCorrection: remote: %u / %f us local: %u / %f us corr: %d (%f) samples",
remoteSampleCount, remoteTimeDeltaUs, sampleCount, timeDeltaUs, m_chunkSizeCorrection, chunkCorr);
MsgConfigureRemoteOutputChunkCorrection* message = MsgConfigureRemoteOutputChunkCorrection::create(m_chunkSizeCorrection); MsgConfigureRemoteOutputChunkCorrection* message = MsgConfigureRemoteOutputChunkCorrection::create(m_chunkSizeCorrection);
getInputMessageQueue()->push(message); getInputMessageQueue()->push(message);

View File

@ -47,7 +47,7 @@ void RemoteOutputFifo::reset()
m_writeHead = 0; m_writeHead = 0;
} }
RemoteDataBlock *RemoteOutputFifo::getDataBlock() RemoteDataFrame *RemoteOutputFifo::getDataFrame()
{ {
QMutexLocker mutexLocker(&m_mutex); QMutexLocker mutexLocker(&m_mutex);
m_servedHead = m_writeHead; m_servedHead = m_writeHead;
@ -62,18 +62,18 @@ RemoteDataBlock *RemoteOutputFifo::getDataBlock()
return &m_data[m_servedHead]; return &m_data[m_servedHead];
} }
unsigned int RemoteOutputFifo::readDataBlock(RemoteDataBlock **dataBlock) unsigned int RemoteOutputFifo::readDataFrame(RemoteDataFrame **dataFrame)
{ {
QMutexLocker mutexLocker(&m_mutex); QMutexLocker mutexLocker(&m_mutex);
if (calculateRemainder() == 0) if (calculateRemainder() == 0)
{ {
*dataBlock = nullptr; *dataFrame = nullptr;
return 0; return 0;
} }
else else
{ {
*dataBlock = &m_data[m_readHead]; *dataFrame = &m_data[m_readHead];
m_readHead = m_readHead < m_size - 1 ? m_readHead + 1 : 0; m_readHead = m_readHead < m_size - 1 ? m_readHead + 1 : 0;
return calculateRemainder(); return calculateRemainder();
} }
@ -92,4 +92,4 @@ unsigned int RemoteOutputFifo::calculateRemainder()
} else { } else {
return m_size - (m_readHead - m_servedHead); return m_size - (m_readHead - m_servedHead);
} }
} }

View File

@ -34,15 +34,15 @@ public:
void resize(unsigned int size); void resize(unsigned int size);
void reset(); void reset();
RemoteDataBlock *getDataBlock(); RemoteDataFrame *getDataFrame();
unsigned int readDataBlock(RemoteDataBlock **dataBlock); unsigned int readDataFrame(RemoteDataFrame **dataFrame);
unsigned int getRemainder(); unsigned int getRemainder();
signals: signals:
void dataBlockServed(); void dataBlockServed();
private: private:
std::vector<RemoteDataBlock> m_data; std::vector<RemoteDataFrame> m_data;
int m_size; int m_size;
int m_readHead; //!< index of last data block processed int m_readHead; //!< index of last data block processed
int m_servedHead; //!< index of last data block served int m_servedHead; //!< index of last data block served

View File

@ -73,9 +73,6 @@ RemoteOutputSinkGui::RemoteOutputSinkGui(DeviceUISet *deviceUISet, QWidget* pare
ui->setupUi(this); ui->setupUi(this);
ui->centerFrequency->setColorMapper(ColorMapper(ColorMapper::GrayGold));
ui->centerFrequency->setValueRange(7, 0, pow(10,7));
ui->sampleRate->setColorMapper(ColorMapper(ColorMapper::GrayGreenYellow)); ui->sampleRate->setColorMapper(ColorMapper(ColorMapper::GrayGreenYellow));
ui->sampleRate->setValueRange(7, 32000U, 9000000U); ui->sampleRate->setValueRange(7, 32000U, 9000000U);
@ -214,7 +211,7 @@ void RemoteOutputSinkGui::updateSampleRate()
void RemoteOutputSinkGui::displaySettings() void RemoteOutputSinkGui::displaySettings()
{ {
blockApplySettings(true); blockApplySettings(true);
ui->centerFrequency->setValue(m_deviceCenterFrequency / 1000); ui->centerFrequency->setText(QString("%L1").arg(m_deviceCenterFrequency));
ui->sampleRate->setValue(m_settings.m_sampleRate); ui->sampleRate->setValue(m_settings.m_sampleRate);
ui->nbFECBlocks->setValue(m_settings.m_nbFECBlocks); ui->nbFECBlocks->setValue(m_settings.m_nbFECBlocks);
@ -530,7 +527,7 @@ void RemoteOutputSinkGui::analyzeApiReply(const QJsonObject& jsonObject)
QJsonObject report = jsonObject["RemoteSourceReport"].toObject(); QJsonObject report = jsonObject["RemoteSourceReport"].toObject();
m_deviceCenterFrequency = report["deviceCenterFreq"].toInt() * 1000; m_deviceCenterFrequency = report["deviceCenterFreq"].toInt() * 1000;
m_deviceUISet->getSpectrum()->setCenterFrequency(m_deviceCenterFrequency); m_deviceUISet->getSpectrum()->setCenterFrequency(m_deviceCenterFrequency);
ui->centerFrequency->setValue(m_deviceCenterFrequency/1000); ui->centerFrequency->setText(QString("%L1").arg(m_deviceCenterFrequency));
int remoteRate = report["deviceSampleRate"].toInt(); int remoteRate = report["deviceSampleRate"].toInt();
ui->remoteRateText->setText(tr("%1k").arg((float)(remoteRate) / 1000)); ui->remoteRateText->setText(tr("%1k").arg((float)(remoteRate) / 1000));
int queueSize = report["queueSize"].toInt(); int queueSize = report["queueSize"].toInt();

View File

@ -115,36 +115,24 @@
</spacer> </spacer>
</item> </item>
<item> <item>
<widget class="ValueDial" name="centerFrequency" native="true"> <widget class="QLabel" name="centerFrequency">
<property name="enabled">
<bool>false</bool>
</property>
<property name="sizePolicy">
<sizepolicy hsizetype="Maximum" vsizetype="Maximum">
<horstretch>0</horstretch>
<verstretch>0</verstretch>
</sizepolicy>
</property>
<property name="minimumSize"> <property name="minimumSize">
<size> <size>
<width>32</width> <width>140</width>
<height>16</height> <height>0</height>
</size> </size>
</property> </property>
<property name="font"> <property name="font">
<font> <font>
<family>Liberation Mono</family> <family>Liberation Sans</family>
<pointsize>20</pointsize> <pointsize>14</pointsize>
</font> </font>
</property> </property>
<property name="cursor"> <property name="text">
<cursorShape>PointingHandCursor</cursorShape> <string>10,000,000,000</string>
</property> </property>
<property name="focusPolicy"> <property name="alignment">
<enum>Qt::StrongFocus</enum> <set>Qt::AlignRight|Qt::AlignTrailing|Qt::AlignVCenter</set>
</property>
<property name="toolTip">
<string>Record center frequency in kHz</string>
</property> </property>
</widget> </widget>
</item> </item>
@ -155,7 +143,7 @@
<item> <item>
<widget class="QLabel" name="freqUnits"> <widget class="QLabel" name="freqUnits">
<property name="text"> <property name="text">
<string> kHz</string> <string>Hz</string>
</property> </property>
</widget> </widget>
</item> </item>

View File

@ -63,43 +63,43 @@ void RemoteOutputSender::setDestination(const QString& address, uint16_t port)
m_remoteHostAddress.setAddress(address); m_remoteHostAddress.setAddress(address);
} }
RemoteDataBlock *RemoteOutputSender::getDataBlock() RemoteDataFrame *RemoteOutputSender::getDataFrame()
{ {
return m_fifo.getDataBlock(); return m_fifo.getDataFrame();
} }
void RemoteOutputSender::handleData() void RemoteOutputSender::handleData()
{ {
RemoteDataBlock *dataBlock; RemoteDataFrame *dataFrame;
unsigned int remainder = m_fifo.getRemainder(); unsigned int remainder = m_fifo.getRemainder();
while (remainder != 0) while (remainder != 0)
{ {
remainder = m_fifo.readDataBlock(&dataBlock); remainder = m_fifo.readDataFrame(&dataFrame);
if (dataBlock) { if (dataFrame) {
sendDataBlock(dataBlock); sendDataFrame(dataFrame);
} }
} }
} }
void RemoteOutputSender::sendDataBlock(RemoteDataBlock *dataBlock) void RemoteOutputSender::sendDataFrame(RemoteDataFrame *dataFrame)
{ {
CM256::cm256_encoder_params cm256Params; //!< Main interface with CM256 encoder CM256::cm256_encoder_params cm256Params; //!< Main interface with CM256 encoder
CM256::cm256_block descriptorBlocks[256]; //!< Pointers to data for CM256 encoder CM256::cm256_block descriptorBlocks[256]; //!< Pointers to data for CM256 encoder
RemoteProtectedBlock fecBlocks[256]; //!< FEC data RemoteProtectedBlock fecBlocks[256]; //!< FEC data
uint16_t frameIndex = dataBlock->m_txControlBlock.m_frameIndex; uint16_t frameIndex = dataFrame->m_txControlBlock.m_frameIndex;
int nbBlocksFEC = dataBlock->m_txControlBlock.m_nbBlocksFEC; int nbBlocksFEC = dataFrame->m_txControlBlock.m_nbBlocksFEC;
m_remoteHostAddress.setAddress(dataBlock->m_txControlBlock.m_dataAddress); m_remoteHostAddress.setAddress(dataFrame->m_txControlBlock.m_dataAddress);
uint16_t dataPort = dataBlock->m_txControlBlock.m_dataPort; uint16_t dataPort = dataFrame->m_txControlBlock.m_dataPort;
RemoteSuperBlock *txBlockx = dataBlock->m_superBlocks; RemoteSuperBlock *txBlockx = dataFrame->m_superBlocks;
if ((nbBlocksFEC == 0) || !m_cm256p) // Do not FEC encode if ((nbBlocksFEC == 0) || !m_cm256p) // Do not FEC encode
{ {
if (m_udpSocket) if (m_udpSocket)
{ {
for (int i = 0; i < RemoteNbOrginalBlocks; i++) { // send block via UDP for (int i = 0; i < RemoteNbOrginalBlocks; i++) { // send blocks via UDP
m_udpSocket->writeDatagram((const char*)&txBlockx[i], (qint64 ) RemoteUdpSize, m_remoteHostAddress, dataPort); m_udpSocket->writeDatagram((const char*)&txBlockx[i], (qint64 ) RemoteUdpSize, m_remoteHostAddress, dataPort);
} }
} }
@ -130,7 +130,7 @@ void RemoteOutputSender::sendDataBlock(RemoteDataBlock *dataBlock)
{ {
qWarning("RemoteSinkSender::handleDataBlock: CM256 encode failed. Transmit without FEC."); qWarning("RemoteSinkSender::handleDataBlock: CM256 encode failed. Transmit without FEC.");
cm256Params.RecoveryCount = 0; cm256Params.RecoveryCount = 0;
RemoteSuperBlock& superBlock = dataBlock->m_superBlocks[0]; // first block RemoteSuperBlock& superBlock = dataFrame->m_superBlocks[0]; // first block
RemoteMetaDataFEC *destMeta = (RemoteMetaDataFEC *) &superBlock.m_protectedBlock; RemoteMetaDataFEC *destMeta = (RemoteMetaDataFEC *) &superBlock.m_protectedBlock;
destMeta->m_nbFECBlocks = 0; destMeta->m_nbFECBlocks = 0;
boost::crc_32_type crc32; boost::crc_32_type crc32;
@ -146,11 +146,11 @@ void RemoteOutputSender::sendDataBlock(RemoteDataBlock *dataBlock)
// Transmit all blocks // Transmit all blocks
if (m_udpSocket) if (m_udpSocket)
{ {
for (int i = 0; i < cm256Params.OriginalCount + cm256Params.RecoveryCount; i++) { // send block via UDP for (int i = 0; i < cm256Params.OriginalCount + cm256Params.RecoveryCount; i++) { // send blocks via UDP
m_udpSocket->writeDatagram((const char*)&txBlockx[i], (qint64 ) RemoteUdpSize, m_remoteHostAddress, dataPort); m_udpSocket->writeDatagram((const char*)&txBlockx[i], (qint64 ) RemoteUdpSize, m_remoteHostAddress, dataPort);
} }
} }
} }
dataBlock->m_txControlBlock.m_processed = true; dataFrame->m_txControlBlock.m_processed = true;
} }

View File

@ -36,7 +36,7 @@
#include "remoteoutputfifo.h" #include "remoteoutputfifo.h"
class RemoteDataBlock; class RemoteDataFrame;
class CM256; class CM256;
class QUdpSocket; class QUdpSocket;
@ -47,7 +47,7 @@ public:
RemoteOutputSender(); RemoteOutputSender();
~RemoteOutputSender(); ~RemoteOutputSender();
RemoteDataBlock *getDataBlock(); RemoteDataFrame *getDataFrame();
void setDestination(const QString& address, uint16_t port); void setDestination(const QString& address, uint16_t port);
private: private:
@ -61,7 +61,7 @@ private:
uint16_t m_remotePort; uint16_t m_remotePort;
QHostAddress m_remoteHostAddress; QHostAddress m_remoteHostAddress;
void sendDataBlock(RemoteDataBlock *dataBlock); void sendDataFrame(RemoteDataFrame *dataFrame);
private slots: private slots:
void handleData(); void handleData();

View File

@ -120,6 +120,7 @@ void RemoteOutputWorker::tick()
SampleVector& data = m_sampleFifo->getData(); SampleVector& data = m_sampleFifo->getData();
unsigned int iPart1Begin, iPart1End, iPart2Begin, iPart2End; unsigned int iPart1Begin, iPart1End, iPart2Begin, iPart2End;
m_sampleFifo->read(m_samplesChunkSize, iPart1Begin, iPart1End, iPart2Begin, iPart2End); m_sampleFifo->read(m_samplesChunkSize, iPart1Begin, iPart1End, iPart2Begin, iPart2End);
m_samplesCount += m_samplesChunkSize;
if (iPart1Begin != iPart1End) if (iPart1Begin != iPart1End)
{ {

View File

@ -53,8 +53,8 @@ public:
bool isRunning() const { return m_running; } bool isRunning() const { return m_running; }
uint32_t getSamplesCount() const { return m_samplesCount; }
uint32_t getSamplesCount(uint64_t& ts_usecs) const; uint32_t getSamplesCount(uint64_t& ts_usecs) const;
void setSamplesCount(int samplesCount) { m_samplesCount = samplesCount; }
void setChunkCorrection(int chunkCorrection) { m_chunkCorrection = chunkCorrection; } void setChunkCorrection(int chunkCorrection) { m_chunkCorrection = chunkCorrection; }
void connectTimer(const QTimer& timer); void connectTimer(const QTimer& timer);

View File

@ -32,7 +32,7 @@ UDPSinkFEC::UDPSinkFEC() :
m_nbSamples(0), m_nbSamples(0),
m_nbBlocksFEC(0), m_nbBlocksFEC(0),
m_txDelayRatio(0.0), m_txDelayRatio(0.0),
m_dataBlock(nullptr), m_dataFrame(nullptr),
m_txBlockIndex(0), m_txBlockIndex(0),
m_txBlocksIndex(0), m_txBlocksIndex(0),
m_frameCount(0), m_frameCount(0),
@ -115,14 +115,14 @@ void UDPSinkFEC::write(const SampleVector::iterator& begin, uint32_t sampleChunk
metaData.m_tv_sec = nowus / 1000000UL; // tv.tv_sec; metaData.m_tv_sec = nowus / 1000000UL; // tv.tv_sec;
metaData.m_tv_usec = nowus % 1000000UL; // tv.tv_usec; metaData.m_tv_usec = nowus % 1000000UL; // tv.tv_usec;
if (!m_dataBlock) { // on the very first cycle there is no data block allocated if (!m_dataFrame) { // on the very first cycle there is no data block allocated
m_dataBlock = m_remoteOutputSender->getDataBlock(); // ask a new block to sender m_dataFrame = m_remoteOutputSender->getDataFrame(); // ask a new block to sender
} }
boost::crc_32_type crc32; boost::crc_32_type crc32;
crc32.process_bytes(&metaData, sizeof(RemoteMetaDataFEC)-4); crc32.process_bytes(&metaData, sizeof(RemoteMetaDataFEC)-4);
metaData.m_crc32 = crc32.checksum(); metaData.m_crc32 = crc32.checksum();
RemoteSuperBlock& superBlock = m_dataBlock->m_superBlocks[0]; // first block RemoteSuperBlock& superBlock = m_dataFrame->m_superBlocks[0]; // first block
superBlock.init(); superBlock.init();
superBlock.m_header.m_frameIndex = m_frameCount; superBlock.m_header.m_frameIndex = m_frameCount;
superBlock.m_header.m_blockIndex = m_txBlockIndex; superBlock.m_header.m_blockIndex = m_txBlockIndex;
@ -172,18 +172,18 @@ void UDPSinkFEC::write(const SampleVector::iterator& begin, uint32_t sampleChunk
m_superBlock.m_header.m_blockIndex = m_txBlockIndex; m_superBlock.m_header.m_blockIndex = m_txBlockIndex;
m_superBlock.m_header.m_sampleBytes = (SDR_RX_SAMP_SZ <= 16 ? 2 : 4); m_superBlock.m_header.m_sampleBytes = (SDR_RX_SAMP_SZ <= 16 ? 2 : 4);
m_superBlock.m_header.m_sampleBits = SDR_RX_SAMP_SZ; m_superBlock.m_header.m_sampleBits = SDR_RX_SAMP_SZ;
m_dataBlock->m_superBlocks[m_txBlockIndex] = m_superBlock; m_dataFrame->m_superBlocks[m_txBlockIndex] = m_superBlock;
if (m_txBlockIndex == RemoteNbOrginalBlocks - 1) // frame complete if (m_txBlockIndex == RemoteNbOrginalBlocks - 1) // frame complete
{ {
m_dataBlock->m_txControlBlock.m_frameIndex = m_frameCount; m_dataFrame->m_txControlBlock.m_frameIndex = m_frameCount;
m_dataBlock->m_txControlBlock.m_processed = false; m_dataFrame->m_txControlBlock.m_processed = false;
m_dataBlock->m_txControlBlock.m_complete = true; m_dataFrame->m_txControlBlock.m_complete = true;
m_dataBlock->m_txControlBlock.m_nbBlocksFEC = m_nbBlocksFEC; m_dataFrame->m_txControlBlock.m_nbBlocksFEC = m_nbBlocksFEC;
m_dataBlock->m_txControlBlock.m_dataAddress = m_remoteAddress; m_dataFrame->m_txControlBlock.m_dataAddress = m_remoteAddress;
m_dataBlock->m_txControlBlock.m_dataPort = m_remotePort; m_dataFrame->m_txControlBlock.m_dataPort = m_remotePort;
m_dataBlock = m_remoteOutputSender->getDataBlock(); // ask a new block to sender m_dataFrame = m_remoteOutputSender->getDataFrame(); // ask a new block to sender
m_txBlockIndex = 0; m_txBlockIndex = 0;
m_frameCount++; m_frameCount++;

View File

@ -86,7 +86,7 @@ private:
RemoteMetaDataFEC m_currentMetaFEC; //!< Meta data for current frame RemoteMetaDataFEC m_currentMetaFEC; //!< Meta data for current frame
uint32_t m_nbBlocksFEC; //!< Variable number of FEC blocks uint32_t m_nbBlocksFEC; //!< Variable number of FEC blocks
float m_txDelayRatio; //!< Delay in ratio of nominal frame period float m_txDelayRatio; //!< Delay in ratio of nominal frame period
RemoteDataBlock *m_dataBlock; RemoteDataFrame *m_dataFrame;
RemoteSuperBlock m_superBlock; //!< current super block being built RemoteSuperBlock m_superBlock; //!< current super block being built
int m_txBlockIndex; //!< Current index in blocks to transmit in the Tx row int m_txBlockIndex; //!< Current index in blocks to transmit in the Tx row
int m_txBlocksIndex; //!< Current index of Tx blocks row int m_txBlocksIndex; //!< Current index of Tx blocks row

View File

@ -157,13 +157,13 @@ struct RemoteRxControlBlock
} }
}; };
class RemoteDataBlock class RemoteDataFrame
{ {
public: public:
RemoteDataBlock() { RemoteDataFrame() {
m_superBlocks = new RemoteSuperBlock[256]; m_superBlocks = new RemoteSuperBlock[256]; //!< 128 original bloks + 128 possible recovery blocks
} }
~RemoteDataBlock() { ~RemoteDataFrame() {
delete[] m_superBlocks; delete[] m_superBlocks;
} }
RemoteTxControlBlock m_txControlBlock; RemoteTxControlBlock m_txControlBlock;

View File

@ -30,14 +30,13 @@
RemoteDataQueue::RemoteDataQueue(QObject* parent) : RemoteDataQueue::RemoteDataQueue(QObject* parent) :
QObject(parent), QObject(parent),
m_lock(QMutex::Recursive), m_lock(QMutex::Recursive),
m_queue(), m_queue()
m_count(0)
{ {
} }
RemoteDataQueue::~RemoteDataQueue() RemoteDataQueue::~RemoteDataQueue()
{ {
RemoteDataBlock* data; RemoteDataFrame* data;
while ((data = pop()) != nullptr) while ((data = pop()) != nullptr)
{ {
@ -46,14 +45,13 @@ RemoteDataQueue::~RemoteDataQueue()
} }
} }
void RemoteDataQueue::push(RemoteDataBlock* data, bool emitSignal) void RemoteDataQueue::push(RemoteDataFrame* data, bool emitSignal)
{ {
if (data) if (data)
{ {
m_lock.lock(); m_lock.lock();
m_queue.enqueue(data); m_queue.enqueue(data);
m_count++; // qDebug("RemoteDataQueue::push: %d", m_queue.size());
// qDebug("RemoteDataQueue::push: %d %d", m_count, m_queue.size());
m_lock.unlock(); m_lock.unlock();
} }
@ -62,14 +60,13 @@ void RemoteDataQueue::push(RemoteDataBlock* data, bool emitSignal)
} }
} }
RemoteDataBlock* RemoteDataQueue::pop() RemoteDataFrame* RemoteDataQueue::pop()
{ {
QMutexLocker locker(&m_lock); QMutexLocker locker(&m_lock);
if (m_queue.isEmpty()) { if (m_queue.isEmpty()) {
return nullptr; return nullptr;
} else { } else {
m_count--;
return m_queue.dequeue(); return m_queue.dequeue();
} }
} }

View File

@ -31,17 +31,17 @@
#include "export.h" #include "export.h"
class RemoteDataBlock; class RemoteDataFrame;
class SDRBASE_API RemoteDataQueue : public QObject { class SDRBASE_API RemoteDataQueue : public QObject {
Q_OBJECT Q_OBJECT
public: public:
RemoteDataQueue(QObject* parent = NULL); RemoteDataQueue(QObject* parent = nullptr);
~RemoteDataQueue(); ~RemoteDataQueue();
void push(RemoteDataBlock* dataBlock, bool emitSignal = true); //!< Push daa block onto queue void push(RemoteDataFrame* dataFrame, bool emitSignal = true); //!< Push data frame onto queue
RemoteDataBlock* pop(); //!< Pop message from queue RemoteDataFrame* pop(); //!< Pop frame from queue
int size(); //!< Returns queue size int size(); //!< Returns queue size
void clear(); //!< Empty queue void clear(); //!< Empty queue
@ -51,8 +51,7 @@ signals:
private: private:
QMutex m_lock; QMutex m_lock;
QQueue<RemoteDataBlock*> m_queue; QQueue<RemoteDataFrame*> m_queue;
int m_count;
}; };
#endif /* CHANNEL_REMOTEDATAQUEUE_H_ */ #endif /* CHANNEL_REMOTEDATAQUEUE_H_ */

View File

@ -28,17 +28,16 @@
const uint32_t RemoteDataReadQueue::MinimumMaxSize = 10; const uint32_t RemoteDataReadQueue::MinimumMaxSize = 10;
RemoteDataReadQueue::RemoteDataReadQueue() : RemoteDataReadQueue::RemoteDataReadQueue() :
m_dataBlock(nullptr), m_dataFrame(nullptr),
m_maxSize(MinimumMaxSize), m_maxSize(MinimumMaxSize),
m_blockIndex(1), m_blockIndex(1),
m_sampleIndex(0), m_sampleIndex(0),
m_sampleCount(0), m_sampleCount(0)
m_full(false)
{} {}
RemoteDataReadQueue::~RemoteDataReadQueue() RemoteDataReadQueue::~RemoteDataReadQueue()
{ {
RemoteDataBlock* data; RemoteDataFrame* data;
while ((data = pop()) != nullptr) while ((data = pop()) != nullptr)
{ {
@ -47,27 +46,18 @@ RemoteDataReadQueue::~RemoteDataReadQueue()
} }
} }
void RemoteDataReadQueue::push(RemoteDataBlock* dataBlock) void RemoteDataReadQueue::push(RemoteDataFrame* dataFrame)
{ {
if (length() >= m_maxSize) if (length() < m_maxSize) {
{ m_dataReadQueue.enqueue(dataFrame);
} else {
qWarning("RemoteDataReadQueue::push: queue is full"); qWarning("RemoteDataReadQueue::push: queue is full");
m_full = true; // stop filling the queue
RemoteDataBlock *data = m_dataReadQueue.takeLast();
delete data;
}
if (m_full) {
m_full = (length() > m_maxSize/10); // do not fill queue again before queue is half size
}
if (!m_full) {
m_dataReadQueue.enqueue(dataBlock);
} }
} }
RemoteDataBlock* RemoteDataReadQueue::pop() RemoteDataFrame* RemoteDataReadQueue::pop()
{ {
if (m_dataReadQueue.isEmpty()) if (m_dataReadQueue.isEmpty())
{ {
return nullptr; return nullptr;
@ -76,7 +66,6 @@ RemoteDataBlock* RemoteDataReadQueue::pop()
{ {
m_blockIndex = 1; m_blockIndex = 1;
m_sampleIndex = 0; m_sampleIndex = 0;
return m_dataReadQueue.dequeue(); return m_dataReadQueue.dequeue();
} }
} }
@ -91,13 +80,15 @@ void RemoteDataReadQueue::setSize(uint32_t size)
void RemoteDataReadQueue::readSample(Sample& s, bool scaleForTx) void RemoteDataReadQueue::readSample(Sample& s, bool scaleForTx)
{ {
// depletion/repletion state // depletion/repletion state
if (m_dataBlock == nullptr) if (m_dataFrame == nullptr)
{ {
if (length() >= m_maxSize/10) m_dataFrame = pop();
if (m_dataFrame)
{ {
qDebug("RemoteDataReadQueue::readSample: initial pop new block: queue size: %u", length()); qDebug("RemoteDataReadQueue::readSample: initial pop new frame: queue size: %u", length());
m_blockIndex = 1; m_blockIndex = 1;
m_dataBlock = m_dataReadQueue.dequeue(); m_sampleIndex = 0;
convertDataToSample(s, m_blockIndex, m_sampleIndex, scaleForTx); convertDataToSample(s, m_blockIndex, m_sampleIndex, scaleForTx);
m_sampleIndex++; m_sampleIndex++;
m_sampleCount++; m_sampleCount++;
@ -110,7 +101,7 @@ void RemoteDataReadQueue::readSample(Sample& s, bool scaleForTx)
return; return;
} }
int sampleSize = m_dataBlock->m_superBlocks[m_blockIndex].m_header.m_sampleBytes * 2; int sampleSize = m_dataFrame->m_superBlocks[m_blockIndex].m_header.m_sampleBytes * 2;
uint32_t samplesPerBlock = RemoteNbBytesPerBlock / sampleSize; uint32_t samplesPerBlock = RemoteNbBytesPerBlock / sampleSize;
if (m_sampleIndex < samplesPerBlock) if (m_sampleIndex < samplesPerBlock)
@ -132,28 +123,24 @@ void RemoteDataReadQueue::readSample(Sample& s, bool scaleForTx)
} }
else else
{ {
delete m_dataBlock; delete m_dataFrame;
m_dataBlock = nullptr; m_dataFrame = nullptr;
if (length() == 0) { m_dataFrame = pop();
qWarning("RemoteDataReadQueue::readSample: try to pop new block but queue is empty");
}
if (length() > 0) if (m_dataFrame)
{ {
m_blockIndex = 1; m_blockIndex = 1;
m_dataBlock = m_dataReadQueue.dequeue(); m_sampleIndex = 0;
convertDataToSample(s, m_blockIndex, m_sampleIndex, scaleForTx); convertDataToSample(s, m_blockIndex, m_sampleIndex, scaleForTx);
m_sampleIndex++; m_sampleIndex++;
m_sampleCount++; m_sampleCount++;
} }
else else
{ {
qWarning("RemoteDataReadQueue::readSample: try to pop new block but queue is empty");
s = Sample{0, 0}; s = Sample{0, 0};
} }
} }
} }
} }

View File

@ -29,7 +29,7 @@
#include "export.h" #include "export.h"
class RemoteDataBlock; class RemoteDataFrame;
struct Sample; struct Sample;
class SDRBASE_API RemoteDataReadQueue class SDRBASE_API RemoteDataReadQueue
@ -38,7 +38,7 @@ public:
RemoteDataReadQueue(); RemoteDataReadQueue();
~RemoteDataReadQueue(); ~RemoteDataReadQueue();
void push(RemoteDataBlock* dataBlock); //!< push block on the queue void push(RemoteDataFrame* dataFrame); //!< push frame on the queue
void readSample(Sample& s, bool scaleForTx = false); //!< Read sample from queue possibly scaling to Tx size void readSample(Sample& s, bool scaleForTx = false); //!< Read sample from queue possibly scaling to Tx size
uint32_t length() const { return m_dataReadQueue.size(); } //!< Returns queue length uint32_t length() const { return m_dataReadQueue.size(); } //!< Returns queue length
uint32_t size() const { return m_maxSize; } //!< Returns queue size (max length) uint32_t size() const { return m_maxSize; } //!< Returns queue size (max length)
@ -48,26 +48,25 @@ public:
static const uint32_t MinimumMaxSize; static const uint32_t MinimumMaxSize;
private: private:
QQueue<RemoteDataBlock*> m_dataReadQueue; QQueue<RemoteDataFrame*> m_dataReadQueue;
RemoteDataBlock *m_dataBlock; RemoteDataFrame *m_dataFrame;
uint32_t m_maxSize; uint32_t m_maxSize;
uint32_t m_blockIndex; uint32_t m_blockIndex;
uint32_t m_sampleIndex; uint32_t m_sampleIndex;
uint32_t m_sampleCount; //!< use a counter capped below 2^31 as it is going to be converted to an int in the web interface uint32_t m_sampleCount; //!< use a counter capped below 2^31 as it is going to be converted to an int in the web interface
bool m_full; //!< full condition was hit
RemoteDataBlock* pop(); //!< Pop block from the queue RemoteDataFrame* pop(); //!< Pop frame from the queue
inline void convertDataToSample(Sample& s, uint32_t blockIndex, uint32_t sampleIndex, bool scaleForTx) inline void convertDataToSample(Sample& s, uint32_t blockIndex, uint32_t sampleIndex, bool scaleForTx)
{ {
int sampleSize = m_dataBlock->m_superBlocks[blockIndex].m_header.m_sampleBytes * 2; // I/Q sample size in data block int sampleSize = m_dataFrame->m_superBlocks[blockIndex].m_header.m_sampleBytes * 2; // I/Q sample size in data block
int samplebits = m_dataBlock->m_superBlocks[blockIndex].m_header.m_sampleBits; // I or Q sample size in bits int samplebits = m_dataFrame->m_superBlocks[blockIndex].m_header.m_sampleBits; // I or Q sample size in bits
int32_t iconv, qconv; int32_t iconv, qconv;
if ((sizeof(Sample) == 4) && (sampleSize == 8)) // generally 24->16 bits if ((sizeof(Sample) == 4) && (sampleSize == 8)) // generally 24->16 bits
{ {
iconv = ((int32_t*) &(m_dataBlock->m_superBlocks[blockIndex].m_protectedBlock.buf[sampleIndex*sampleSize]))[0]; iconv = ((int32_t*) &(m_dataFrame->m_superBlocks[blockIndex].m_protectedBlock.buf[sampleIndex*sampleSize]))[0];
qconv = ((int32_t*) &(m_dataBlock->m_superBlocks[blockIndex].m_protectedBlock.buf[sampleIndex*sampleSize+4]))[0]; qconv = ((int32_t*) &(m_dataFrame->m_superBlocks[blockIndex].m_protectedBlock.buf[sampleIndex*sampleSize+4]))[0];
iconv >>= scaleForTx ? (SDR_TX_SAMP_SZ-SDR_RX_SAMP_SZ) : (samplebits-SDR_RX_SAMP_SZ); iconv >>= scaleForTx ? (SDR_TX_SAMP_SZ-SDR_RX_SAMP_SZ) : (samplebits-SDR_RX_SAMP_SZ);
qconv >>= scaleForTx ? (SDR_TX_SAMP_SZ-SDR_RX_SAMP_SZ) : (samplebits-SDR_RX_SAMP_SZ); qconv >>= scaleForTx ? (SDR_TX_SAMP_SZ-SDR_RX_SAMP_SZ) : (samplebits-SDR_RX_SAMP_SZ);
s.setReal(iconv); s.setReal(iconv);
@ -75,8 +74,8 @@ private:
} }
else if ((sizeof(Sample) == 8) && (sampleSize == 4)) // generally 16->24 bits else if ((sizeof(Sample) == 8) && (sampleSize == 4)) // generally 16->24 bits
{ {
iconv = ((int16_t*) &(m_dataBlock->m_superBlocks[blockIndex].m_protectedBlock.buf[sampleIndex*sampleSize]))[0]; iconv = ((int16_t*) &(m_dataFrame->m_superBlocks[blockIndex].m_protectedBlock.buf[sampleIndex*sampleSize]))[0];
qconv = ((int16_t*) &(m_dataBlock->m_superBlocks[blockIndex].m_protectedBlock.buf[sampleIndex*sampleSize+2]))[0]; qconv = ((int16_t*) &(m_dataFrame->m_superBlocks[blockIndex].m_protectedBlock.buf[sampleIndex*sampleSize+2]))[0];
iconv <<= scaleForTx ? (SDR_TX_SAMP_SZ-samplebits) : (SDR_RX_SAMP_SZ-samplebits); iconv <<= scaleForTx ? (SDR_TX_SAMP_SZ-samplebits) : (SDR_RX_SAMP_SZ-samplebits);
qconv <<= scaleForTx ? (SDR_TX_SAMP_SZ-samplebits) : (SDR_RX_SAMP_SZ-samplebits); qconv <<= scaleForTx ? (SDR_TX_SAMP_SZ-samplebits) : (SDR_RX_SAMP_SZ-samplebits);
s.setReal(iconv); s.setReal(iconv);
@ -84,7 +83,7 @@ private:
} }
else if ((sampleSize == 4) || (sampleSize == 8)) // generally 16->16 or 24->24 bits else if ((sampleSize == 4) || (sampleSize == 8)) // generally 16->16 or 24->24 bits
{ {
s = *((Sample*) &(m_dataBlock->m_superBlocks[blockIndex].m_protectedBlock.buf[sampleIndex*sampleSize])); s = *((Sample*) &(m_dataFrame->m_superBlocks[blockIndex].m_protectedBlock.buf[sampleIndex*sampleSize]));
} }
else // invalid size else // invalid size
{ {