From 05fce637bc032e97d0276b7bcb6af12bee2f6ca3 Mon Sep 17 00:00:00 2001 From: Jon Beniston Date: Tue, 12 Oct 2021 11:18:29 +0100 Subject: [PATCH] Add support for message pipes from features to channels --- sdrbase/channel/channelapi.h | 9 +++ sdrbase/pipes/messagepipes.cpp | 8 +-- sdrbase/pipes/messagepipes.h | 8 +-- sdrbase/pipes/messagepipescommon.h | 1 - sdrbase/pipes/messagepipesgcworker.cpp | 21 ++++--- sdrbase/pipes/messagepipesgcworker.h | 10 ++-- sdrbase/pipes/pipeendpoint.cpp | 76 +++++++++++++++++++------- sdrbase/pipes/pipeendpoint.h | 4 +- 8 files changed, 95 insertions(+), 42 deletions(-) diff --git a/sdrbase/channel/channelapi.h b/sdrbase/channel/channelapi.h index 9c504d429..e876b6505 100644 --- a/sdrbase/channel/channelapi.h +++ b/sdrbase/channel/channelapi.h @@ -130,6 +130,15 @@ public: virtual int getNbSourceStreams() const = 0; virtual qint64 getStreamCenterFrequency(int streamIndex, bool sinkElseSource) const = 0; + void handlePipeMessageQueue(MessageQueue* messageQueue) + { + Message* message; + + while ((message = messageQueue->pop()) != nullptr) { + m_channelMessageQueue.push(message); + } + } + protected: MessageQueue *m_guiMessageQueue; //!< Input message queue to the GUI MessageQueue m_channelMessageQueue; //!< Input message queue for inter plugin communication diff --git a/sdrbase/pipes/messagepipes.cpp b/sdrbase/pipes/messagepipes.cpp index 7977bd011..4dd5cf222 100644 --- a/sdrbase/pipes/messagepipes.cpp +++ b/sdrbase/pipes/messagepipes.cpp @@ -42,14 +42,14 @@ MessagePipes::~MessagePipes() } } -MessageQueue *MessagePipes::registerChannelToFeature(const PipeEndPoint *source, Feature *feature, const QString& type) +MessageQueue *MessagePipes::registerChannelToFeature(const PipeEndPoint *source, PipeEndPoint *dest, const QString& type) { - return m_registrations.registerProducerToConsumer(source, feature, type); + return m_registrations.registerProducerToConsumer(source, dest, type); } -MessageQueue *MessagePipes::unregisterChannelToFeature(const PipeEndPoint *source, Feature *feature, const QString& type) +MessageQueue *MessagePipes::unregisterChannelToFeature(const PipeEndPoint *source, PipeEndPoint *dest, const QString& type) { - MessageQueue *messageQueue = m_registrations.unregisterProducerToConsumer(source, feature, type); + MessageQueue *messageQueue = m_registrations.unregisterProducerToConsumer(source, dest, type); m_gcWorker->addMessageQueueToDelete(messageQueue); return messageQueue; } diff --git a/sdrbase/pipes/messagepipes.h b/sdrbase/pipes/messagepipes.h index 0848adc13..519f24698 100644 --- a/sdrbase/pipes/messagepipes.h +++ b/sdrbase/pipes/messagepipes.h @@ -30,7 +30,6 @@ #include "elementpipesregistrations.h" class PipeEndPoint; -class Feature; class MessagePipesGCWorker; class MessageQueue; @@ -43,12 +42,13 @@ public: MessagePipes& operator=(const MessagePipes&) = delete; ~MessagePipes(); - MessageQueue *registerChannelToFeature(const PipeEndPoint *source, Feature *feature, const QString& type); - MessageQueue *unregisterChannelToFeature(const PipeEndPoint *source, Feature *feature, const QString& type); + // FIXME: Names of these functions should probably change, as we now support channel or feature at either end + MessageQueue *registerChannelToFeature(const PipeEndPoint *source, PipeEndPoint *dest, const QString& type); + MessageQueue *unregisterChannelToFeature(const PipeEndPoint *source, PipeEndPoint *dest, const QString& type); QList* getMessageQueues(const PipeEndPoint *source, const QString& type); private: - ElementPipesRegistrations m_registrations; + ElementPipesRegistrations m_registrations; QThread m_gcThread; //!< Garbage collector thread MessagePipesGCWorker *m_gcWorker; //!< Garbage collector diff --git a/sdrbase/pipes/messagepipescommon.h b/sdrbase/pipes/messagepipescommon.h index 42e7dbb2c..2fda6a3bc 100644 --- a/sdrbase/pipes/messagepipescommon.h +++ b/sdrbase/pipes/messagepipescommon.h @@ -27,7 +27,6 @@ #include "elementpipescommon.h" class PipeEndPoint; -class Feature; class MessageQueue; class SDRBASE_API MessagePipesCommon diff --git a/sdrbase/pipes/messagepipesgcworker.cpp b/sdrbase/pipes/messagepipesgcworker.cpp index 572da409c..24b75ff60 100644 --- a/sdrbase/pipes/messagepipesgcworker.cpp +++ b/sdrbase/pipes/messagepipesgcworker.cpp @@ -24,26 +24,33 @@ bool MessagePipesGCWorker::MessagePipesGC::existsProducer(const PipeEndPoint *pipeEndPoint) { - // Not overly sure about casting to both types here, but currently safeish as the - // existing functions only use the pointer address - and I presume these - // may be pointers to deleted objects anyway? return MainCore::instance()->existsChannel((const ChannelAPI *)pipeEndPoint) || MainCore::instance()->existsFeature((const Feature *)pipeEndPoint); } -bool MessagePipesGCWorker::MessagePipesGC::existsConsumer(const Feature *feature) +bool MessagePipesGCWorker::MessagePipesGC::existsConsumer(const PipeEndPoint *pipeEndPoint) { - return MainCore::instance()->existsFeature(feature); + return MainCore::instance()->existsChannel((const ChannelAPI *)pipeEndPoint) + || MainCore::instance()->existsFeature((const Feature *)pipeEndPoint); } void MessagePipesGCWorker::MessagePipesGC::sendMessageToConsumer( const MessageQueue *messageQueue, MessagePipesCommon::ChannelRegistrationKey channelKey, - Feature *feature) + PipeEndPoint *pipeEndPoint) { MessagePipesCommon::MsgReportChannelDeleted *msg = MessagePipesCommon::MsgReportChannelDeleted::create( messageQueue, channelKey); - feature->getInputMessageQueue()->push(msg); + if (MainCore::instance()->existsFeature((const Feature *)pipeEndPoint)) // Use RTTI instead? + { + Feature *feature = (Feature *)pipeEndPoint; + feature->getInputMessageQueue()->push(msg); + } + else + { + ChannelAPI *channel = (ChannelAPI *)pipeEndPoint; + channel->getChannelMessageQueue()->push(msg); + } } MessagePipesGCWorker::MessagePipesGCWorker() : diff --git a/sdrbase/pipes/messagepipesgcworker.h b/sdrbase/pipes/messagepipesgcworker.h index 5b9ce1543..14b8e5f27 100644 --- a/sdrbase/pipes/messagepipesgcworker.h +++ b/sdrbase/pipes/messagepipesgcworker.h @@ -38,10 +38,10 @@ public: void setC2FRegistrations( QMutex *c2fMutex, QMap> *c2fQueues, - QMap> *c2fFeatures + QMap> *c2fPipeEndPoints ) { - m_messagePipesGC.setRegistrations(c2fMutex, c2fQueues, c2fFeatures); + m_messagePipesGC.setRegistrations(c2fMutex, c2fQueues, c2fPipeEndPoints); } void startWork(); @@ -50,12 +50,12 @@ public: bool isRunning() const { return m_running; } private: - class MessagePipesGC : public ElementPipesGC + class MessagePipesGC : public ElementPipesGC { private: virtual bool existsProducer(const PipeEndPoint *pipeEndPoint); - virtual bool existsConsumer(const Feature *feature); - virtual void sendMessageToConsumer(const MessageQueue *messageQueue, MessagePipesCommon::ChannelRegistrationKey key, Feature *feature); + virtual bool existsConsumer(const PipeEndPoint *pipeEndPoint); + virtual void sendMessageToConsumer(const MessageQueue *messageQueue, MessagePipesCommon::ChannelRegistrationKey key, PipeEndPoint *pipeEndPoint); }; MessagePipesGC m_messagePipesGC; diff --git a/sdrbase/pipes/pipeendpoint.cpp b/sdrbase/pipes/pipeendpoint.cpp index bf5ed4f2b..68066edd7 100644 --- a/sdrbase/pipes/pipeendpoint.cpp +++ b/sdrbase/pipes/pipeendpoint.cpp @@ -31,13 +31,14 @@ MESSAGE_CLASS_DEFINITION(PipeEndPoint::MsgReportPipes, Message) -QList PipeEndPoint::updateAvailablePipeSources(QString pipeName, QStringList pipeTypes, QStringList pipeURIs, Feature *destinationFeature) +QList PipeEndPoint::updateAvailablePipeSources(QString pipeName, QStringList pipeTypes, QStringList pipeURIs, PipeEndPoint *destination) { MainCore *mainCore = MainCore::instance(); MessagePipes& messagePipes = mainCore->getMessagePipes(); std::vector& deviceSets = mainCore->getDeviceSets(); QHash availablePipes; + // Source is a channel int deviceIndex = 0; for (std::vector::const_iterator it = deviceSets.begin(); it != deviceSets.end(); ++it, deviceIndex++) { @@ -55,14 +56,30 @@ QList PipeEndPoint::updateAvailablePipeSource { if (!availablePipes.contains(channel)) { - MessageQueue *messageQueue = messagePipes.registerChannelToFeature(channel, destinationFeature, pipeName); - QObject::connect( - messageQueue, - &MessageQueue::messageEnqueued, - destinationFeature, - [=](){ destinationFeature->handlePipeMessageQueue(messageQueue); }, - Qt::QueuedConnection - ); + MessageQueue *messageQueue = messagePipes.registerChannelToFeature(channel, destination, pipeName); + if (MainCore::instance()->existsFeature((const Feature *)destination)) + { + // Destination is feature + Feature *featureDest = (Feature *)destination; + QObject::connect( + messageQueue, + &MessageQueue::messageEnqueued, + featureDest, + [=](){ featureDest->handlePipeMessageQueue(messageQueue); }, + Qt::QueuedConnection + ); + } + else + { + // Destination is a channel + // Can't use Qt::QueuedConnection because ChannelAPI isn't a QObject + ChannelAPI *channelDest = (ChannelAPI *)destination; + QObject::connect( + messageQueue, + &MessageQueue::messageEnqueued, + [=](){ channelDest->handlePipeMessageQueue(messageQueue); } + ); + } } AvailablePipeSource availablePipe = @@ -79,6 +96,7 @@ QList PipeEndPoint::updateAvailablePipeSource } } + // Source is a feature std::vector& featureSets = mainCore->getFeatureeSets(); int featureIndex = 0; for (std::vector::const_iterator it = featureSets.begin(); it != featureSets.end(); ++it, featureIndex++) @@ -92,14 +110,30 @@ QList PipeEndPoint::updateAvailablePipeSource { if (!availablePipes.contains(feature)) { - MessageQueue *messageQueue = messagePipes.registerChannelToFeature(feature, destinationFeature, pipeName); - QObject::connect( - messageQueue, - &MessageQueue::messageEnqueued, - destinationFeature, - [=](){ destinationFeature->handlePipeMessageQueue(messageQueue); }, - Qt::QueuedConnection - ); + MessageQueue *messageQueue = messagePipes.registerChannelToFeature(feature, destination, pipeName); + if (MainCore::instance()->existsFeature((const Feature *)destination)) + { + // Destination is feature + Feature *featureDest = (Feature *)destination; + QObject::connect( + messageQueue, + &MessageQueue::messageEnqueued, + featureDest, + [=](){ featureDest->handlePipeMessageQueue(messageQueue); }, + Qt::QueuedConnection + ); + } + else + { + // Destination is a channel + // Can't use Qt::QueuedConnection because ChannelAPI isn't a QObject + ChannelAPI *channelDest = (ChannelAPI *)destination; + QObject::connect( + messageQueue, + &MessageQueue::messageEnqueued, + [=](){ channelDest->handlePipeMessageQueue(messageQueue); } + ); + } } AvailablePipeSource availablePipe = @@ -135,13 +169,17 @@ PipeEndPoint *PipeEndPoint::getPipeEndPoint(const QString name, const QList itr(availablePipeSources); - while (itr.hasNext()) { + while (itr.hasNext()) + { AvailablePipeSource p = itr.next(); - if ((p.m_setIndex == setIndex) && (p.m_index == index) && (id == p.m_id)) + if ((p.m_setIndex == setIndex) && (p.m_index == index) && (id == p.m_id)) { return p.m_source; + } } } else + { qDebug() << "PipeEndPoint::getPipeEndPoint: " << name << " is malformed"; + } return nullptr; } diff --git a/sdrbase/pipes/pipeendpoint.h b/sdrbase/pipes/pipeendpoint.h index df510f3c4..5de230b09 100644 --- a/sdrbase/pipes/pipeendpoint.h +++ b/sdrbase/pipes/pipeendpoint.h @@ -32,7 +32,7 @@ class Feature; class SDRBASE_API PipeEndPoint { public: - // Used by pipe sinks (features) to record details about available pipe sources (channels or features) + // Used by pipe sinks (channels or features) to record details about available pipe sources (channels or features) struct AvailablePipeSource { enum {RX, TX, Feature} m_type; @@ -92,7 +92,7 @@ public: protected: // Utility functions for pipe sinks to manage list of sources - QList updateAvailablePipeSources(QString pipeName, QStringList pipeTypes, QStringList pipeURIs, Feature *destinationFeature); + QList updateAvailablePipeSources(QString pipeName, QStringList pipeTypes, QStringList pipeURIs, PipeEndPoint *destination); PipeEndPoint *getPipeEndPoint(const QString name, const QList &availablePipeSources); };