diff --git a/src/audio/AudioThread.cpp b/src/audio/AudioThread.cpp index 97b44ee..1f89911 100644 --- a/src/audio/AudioThread.cpp +++ b/src/audio/AudioThread.cpp @@ -22,23 +22,23 @@ std::recursive_mutex AudioThread::m_device_mutex; AudioThread::AudioThread() : IOThread(), nBufferFrames(1024), sampleRate(0), controllerThread(nullptr) { - audioQueuePtr = 0; - underflowCount = 0; - active.store(false); - outputDevice.store(-1); + audioQueuePtr = 0; + underflowCount = 0; + active.store(false); + outputDevice.store(-1); gain = 1.0; } AudioThread::~AudioThread() { - std::lock_guard lock(m_mutex); - - if (controllerThread != nullptr) { + std::lock_guard lock(m_mutex); - controllerThread->join(); - delete controllerThread; - - controllerThread = nullptr; - } + if (controllerThread != nullptr) { + + controllerThread->join(); + delete controllerThread; + + controllerThread = nullptr; + } } std::recursive_mutex & AudioThread::getMutex() @@ -47,15 +47,15 @@ std::recursive_mutex & AudioThread::getMutex() } void AudioThread::attachControllerThread(std::thread* controllerThread_in) { - - //cleanup previous (should never happen) - if (controllerThread != nullptr) { - controllerThread->join(); - delete controllerThread; - } + //cleanup previous (should never happen) + if (controllerThread != nullptr) { - controllerThread = controllerThread_in; + controllerThread->join(); + delete controllerThread; + } + + controllerThread = controllerThread_in; } void AudioThread::bindThread(AudioThread *other) { @@ -68,8 +68,8 @@ void AudioThread::bindThread(AudioThread *other) { } void AudioThread::removeThread(AudioThread *other) { - - std::lock_guard lock(m_mutex); + + std::lock_guard lock(m_mutex); auto i = std::find(boundThreads.begin(), boundThreads.end(), other); @@ -80,15 +80,15 @@ void AudioThread::removeThread(AudioThread *other) { void AudioThread::deviceCleanup() { - std::lock_guard lock(m_device_mutex); - // only notify, let the thread die by itself. - for (auto i = deviceController.begin(); i != deviceController.end(); i++) { - i->second->terminate(); - } + std::lock_guard lock(m_device_mutex); + // only notify, let the thread die by itself. + for (auto i = deviceController.begin(); i != deviceController.end(); i++) { + i->second->terminate(); + } } static int audioCallback(void *outputBuffer, void * /* inputBuffer */, unsigned int nBufferFrames, double /* streamTime */, RtAudioStreamStatus status, - void *userData) { + void *userData) { float *out = (float*)outputBuffer; @@ -96,10 +96,10 @@ static int audioCallback(void *outputBuffer, void * /* inputBuffer */, unsigned //actually active. ::memset(out, 0, nBufferFrames * 2 * sizeof(float)); - //src in the controller thread: - AudioThread *src = (AudioThread *) userData; - - //by construction, src is a controller thread, from deviceController: + //src in the controller thread: + AudioThread *src = (AudioThread *)userData; + + //by construction, src is a controller thread, from deviceController: std::lock_guard lock(src->getMutex()); if (src->isTerminated()) { @@ -107,15 +107,15 @@ static int audioCallback(void *outputBuffer, void * /* inputBuffer */, unsigned } if (status) { - std::cout << "Audio buffer underflow.." << (src->underflowCount++) << std::endl << std::flush; + std::cout << "Audio buffer underflow.." << (src->underflowCount++) << std::endl << std::flush; } double peak = 0.0; - - //Process the bound threads audio: + + //Process the bound threads audio: for (size_t j = 0; j < src->boundThreads.size(); j++) { - AudioThread *srcmix = src->boundThreads[j]; + AudioThread *srcmix = src->boundThreads[j]; //lock every single boundThread srcmix in succession the time we process //its audio samples. @@ -127,23 +127,23 @@ static int audioCallback(void *outputBuffer, void * /* inputBuffer */, unsigned if (!srcmix->currentInput) { srcmix->audioQueuePtr = 0; - + if (!srcmix->inputQueue->try_pop(srcmix->currentInput)) { continue; } - + continue; } if (srcmix->currentInput->sampleRate != src->getSampleRate()) { while (srcmix->inputQueue->try_pop(srcmix->currentInput)) { - + if (srcmix->currentInput) { if (srcmix->currentInput->sampleRate == src->getSampleRate()) { break; } - + } srcmix->currentInput = nullptr; } //end while @@ -160,13 +160,13 @@ static int audioCallback(void *outputBuffer, void * /* inputBuffer */, unsigned if (!srcmix->inputQueue->empty()) { srcmix->audioQueuePtr = 0; if (srcmix->currentInput) { - + srcmix->currentInput = nullptr; } if (!srcmix->inputQueue->try_pop(srcmix->currentInput)) { continue; - } + } } continue; } @@ -180,7 +180,7 @@ static int audioCallback(void *outputBuffer, void * /* inputBuffer */, unsigned if (srcmix->audioQueuePtr >= srcmix->currentInput->data.size()) { srcmix->audioQueuePtr = 0; if (srcmix->currentInput) { - + srcmix->currentInput = nullptr; } @@ -188,7 +188,7 @@ static int audioCallback(void *outputBuffer, void * /* inputBuffer */, unsigned break; } - + double srcPeak = srcmix->currentInput->peak * srcmix->gain; if (mixPeak < srcPeak) { mixPeak = srcPeak; @@ -201,13 +201,14 @@ static int audioCallback(void *outputBuffer, void * /* inputBuffer */, unsigned } srcmix->audioQueuePtr++; } - } else { + } + else { for (int i = 0, iMax = srcmix->currentInput->channels * nBufferFrames; i < iMax; i++) { if (srcmix->audioQueuePtr >= srcmix->currentInput->data.size()) { srcmix->audioQueuePtr = 0; if (srcmix->currentInput) { - + srcmix->currentInput = nullptr; } @@ -296,58 +297,58 @@ void AudioThread::enumerateDevices(std::vector &devs) { } void AudioThread::setDeviceSampleRate(int deviceId, int sampleRate) { - - AudioThread* matchingControllerThread = nullptr; - //scope lock here to minimize the common unique static lock contention - { - std::lock_guard lock(m_device_mutex); + AudioThread* matchingControllerThread = nullptr; - if (deviceController.find(deviceId) != deviceController.end()) { + //scope lock here to minimize the common unique static lock contention + { + std::lock_guard lock(m_device_mutex); - matchingControllerThread = deviceController[deviceId]; - } - } + if (deviceController.find(deviceId) != deviceController.end()) { - //out-of-lock test - if (matchingControllerThread != nullptr) { + matchingControllerThread = deviceController[deviceId]; + } + } - AudioThreadCommand refreshDevice; - refreshDevice.cmd = AudioThreadCommand::AUDIO_THREAD_CMD_SET_SAMPLE_RATE; - refreshDevice.int_value = sampleRate; - //VSO : blocking push ! - matchingControllerThread->getCommandQueue()->push(refreshDevice); - } + //out-of-lock test + if (matchingControllerThread != nullptr) { + + AudioThreadCommand refreshDevice; + refreshDevice.cmd = AudioThreadCommand::AUDIO_THREAD_CMD_SET_SAMPLE_RATE; + refreshDevice.int_value = sampleRate; + //VSO : blocking push ! + matchingControllerThread->getCommandQueue()->push(refreshDevice); + } } void AudioThread::setSampleRate(int sampleRate) { - bool thisIsAController = false; + bool thisIsAController = false; - //scope lock here to minimize the common unique static lock contention - { - std::lock_guard lock(m_device_mutex); + //scope lock here to minimize the common unique static lock contention + { + std::lock_guard lock(m_device_mutex); - if (deviceController[outputDevice.load()] == this) { - thisIsAController = true; - deviceSampleRate[outputDevice.load()] = sampleRate; - } - } + if (deviceController[outputDevice.load()] == this) { + thisIsAController = true; + deviceSampleRate[outputDevice.load()] = sampleRate; + } + } std::lock_guard lock(m_mutex); if (thisIsAController) { - + dac.stopStream(); dac.closeStream(); - //Set bounded sample rate: + //Set bounded sample rate: for (size_t j = 0; j < boundThreads.size(); j++) { AudioThread *srcmix = boundThreads[j]; - // the controller thread is part of the boundedThreads, so prevent infinite recursion: - if (srcmix != this) { - srcmix->setSampleRate(sampleRate); - } + // the controller thread is part of the boundedThreads, so prevent infinite recursion: + if (srcmix != this) { + srcmix->setSampleRate(sampleRate); + } } //make a local copy, snapshot of the list of demodulators @@ -359,7 +360,7 @@ void AudioThread::setSampleRate(int sampleRate) { } } - dac.openStream(¶meters, NULL, RTAUDIO_FLOAT32, sampleRate, &nBufferFrames, &audioCallback, (void *) this, &opts); + dac.openStream(¶meters, NULL, RTAUDIO_FLOAT32, sampleRate, &nBufferFrames, &audioCallback, (void *)this, &opts); dac.startStream(); } @@ -374,8 +375,8 @@ int AudioThread::getSampleRate() { void AudioThread::setupDevice(int deviceId) { - //global lock to setup the device... - std::lock_guard lock(m_device_mutex); + //global lock to setup the device... + std::lock_guard lock(m_device_mutex); parameters.deviceId = deviceId; parameters.nChannels = 2; @@ -385,10 +386,10 @@ void AudioThread::setupDevice(int deviceId) { try { if (deviceController.find(outputDevice.load()) != deviceController.end()) { - //'this' is not the controller, so remove it from the bounded list: - //beware, we must take the controller mutex, because the audio callback may use the list of bounded - //threads at that moment: - std::lock_guard lock(deviceController[outputDevice.load()]->getMutex()); + //'this' is not the controller, so remove it from the bounded list: + //beware, we must take the controller mutex, because the audio callback may use the list of bounded + //threads at that moment: + std::lock_guard lock(deviceController[outputDevice.load()]->getMutex()); deviceController[outputDevice.load()]->removeThread(this); } @@ -400,39 +401,43 @@ void AudioThread::setupDevice(int deviceId) { if (deviceSampleRate.find(parameters.deviceId) != deviceSampleRate.end()) { sampleRate = deviceSampleRate[parameters.deviceId]; - } else { - std::cout << "Error, device sample rate wasn't initialized?" << std::endl; - return; -// sampleRate = AudioThread::getDefaultAudioSampleRate(); -// deviceSampleRate[parameters.deviceId] = sampleRate; + } + else { + std::cout << "Error, device sample rate wasn't initialized?" << std::endl; + return; + // sampleRate = AudioThread::getDefaultAudioSampleRate(); + // deviceSampleRate[parameters.deviceId] = sampleRate; } - //Create a new controller: + //Create a new controller: if (deviceController.find(parameters.deviceId) == deviceController.end()) { - //Create a new controller thread for parameters.deviceId: + //Create a new controller thread for parameters.deviceId: deviceController[parameters.deviceId] = new AudioThread(); deviceController[parameters.deviceId]->setInitOutputDevice(parameters.deviceId, sampleRate); deviceController[parameters.deviceId]->bindThread(this); - deviceController[parameters.deviceId]->attachControllerThread(new std::thread(&AudioThread::threadMain, deviceController[parameters.deviceId])); + deviceController[parameters.deviceId]->attachControllerThread(new std::thread(&AudioThread::threadMain, deviceController[parameters.deviceId])); - } else if (deviceController[parameters.deviceId] == this) { + } + else if (deviceController[parameters.deviceId] == this) { - //Attach callback - dac.openStream(¶meters, NULL, RTAUDIO_FLOAT32, sampleRate, &nBufferFrames, &audioCallback, (void *)this, &opts); - dac.startStream(); - } else { - //we are a bound thread, add ourselves to the controller deviceController[parameters.deviceId]. - //beware, we must take the controller mutex, because the audio callback may use the list of bounded - //threads at that moment: - std::lock_guard lock(deviceController[parameters.deviceId]->getMutex()); + //Attach callback + dac.openStream(¶meters, NULL, RTAUDIO_FLOAT32, sampleRate, &nBufferFrames, &audioCallback, (void *)this, &opts); + dac.startStream(); + } + else { + //we are a bound thread, add ourselves to the controller deviceController[parameters.deviceId]. + //beware, we must take the controller mutex, because the audio callback may use the list of bounded + //threads at that moment: + std::lock_guard lock(deviceController[parameters.deviceId]->getMutex()); deviceController[parameters.deviceId]->bindThread(this); } active = true; - } catch (RtAudioError& e) { + } + catch (RtAudioError& e) { e.printMessage(); return; } @@ -452,16 +457,17 @@ int AudioThread::getOutputDevice() { } void AudioThread::setInitOutputDevice(int deviceId, int sampleRate) { - - //global lock + + //global lock std::lock_guard lock(m_device_mutex); outputDevice = deviceId; if (sampleRate == -1) { if (deviceSampleRate.find(parameters.deviceId) != deviceSampleRate.end()) { - sampleRate = deviceSampleRate[deviceId]; + sampleRate = deviceSampleRate[deviceId]; } - } else { + } + else { deviceSampleRate[deviceId] = sampleRate; } this->sampleRate = sampleRate; @@ -470,12 +476,12 @@ void AudioThread::setInitOutputDevice(int deviceId, int sampleRate) { void AudioThread::run() { #ifdef __APPLE__ pthread_t tID = pthread_self(); // ID of this thread - int priority = sched_get_priority_max( SCHED_RR) - 1; - sched_param prio = {priority}; // scheduling priority of thread + int priority = sched_get_priority_max(SCHED_RR) - 1; + sched_param prio = { priority }; // scheduling priority of thread pthread_setschedparam(tID, SCHED_RR, &prio); #endif -// std::cout << "Audio thread initializing.." << std::endl; + // std::cout << "Audio thread initializing.." << std::endl; if (dac.getDeviceCount() < 1) { std::cout << "No audio devices found!" << std::endl; @@ -484,10 +490,10 @@ void AudioThread::run() { setupDevice((outputDevice.load() == -1) ? (dac.getDefaultOutputDevice()) : outputDevice.load()); -// std::cout << "Audio thread started." << std::endl; - + // std::cout << "Audio thread started." << std::endl; + inputQueue = std::static_pointer_cast(getInputQueue("AudioDataInput")); - + //Infinite loop, witing for commands or for termination while (!stopping) { AudioThreadCommand command; @@ -503,27 +509,28 @@ void AudioThread::run() { setSampleRate(command.int_value); } } //end while - + // Drain any remaining inputs, with a non-blocking pop if (inputQueue != nullptr) { inputQueue->flush(); } - //Nullify currentInput... - currentInput = nullptr; + //Nullify currentInput... + currentInput = nullptr; //Stop : this affects the device list , so must be protected globally. - std::lock_guard global_lock(m_device_mutex); + std::lock_guard global_lock(m_device_mutex); if (deviceController[parameters.deviceId] != this) { - //'this' is not the controller, so remove it from the bounded list: - //beware, we must take the controller mutex, because the audio callback may use the list of bounded - //threads at that moment: - std::lock_guard lock(deviceController[parameters.deviceId]->getMutex()); + //'this' is not the controller, so remove it from the bounded list: + //beware, we must take the controller mutex, because the audio callback may use the list of bounded + //threads at that moment: + std::lock_guard lock(deviceController[parameters.deviceId]->getMutex()); deviceController[parameters.deviceId]->removeThread(this); - } else { - // 'this' is a controller thread: + } + else { + // 'this' is a controller thread: try { if (dac.isStreamOpen()) { if (dac.isStreamRunning()) { @@ -531,12 +538,13 @@ void AudioThread::run() { } dac.closeStream(); } - } catch (RtAudioError& e) { + } + catch (RtAudioError& e) { e.printMessage(); } } -// std::cout << "Audio thread done." << std::endl; + // std::cout << "Audio thread done." << std::endl; } void AudioThread::terminate() { @@ -551,32 +559,33 @@ bool AudioThread::isActive() { void AudioThread::setActive(bool state) { - AudioThread* matchingControllerThread = nullptr; + AudioThread* matchingControllerThread = nullptr; - std::lock_guard lock(m_mutex); + std::lock_guard lock(m_mutex); - //scope lock here to minimize the common unique static lock contention - { - std::lock_guard lock(m_device_mutex); + //scope lock here to minimize the common unique static lock contention + { + std::lock_guard lock(m_device_mutex); - if (deviceController.find(parameters.deviceId) != deviceController.end()) { + if (deviceController.find(parameters.deviceId) != deviceController.end()) { - matchingControllerThread = deviceController[parameters.deviceId]; - } - } + matchingControllerThread = deviceController[parameters.deviceId]; + } + } if (matchingControllerThread == nullptr) { return; } if (state && !active && inputQueue) { - matchingControllerThread->bindThread(this); - } else if (!state && active) { - matchingControllerThread->removeThread(this); + matchingControllerThread->bindThread(this); + } + else if (!state && active) { + matchingControllerThread->removeThread(this); } // Activity state changing, clear any inputs - if(inputQueue) { + if (inputQueue) { inputQueue->flush(); } active = state; @@ -587,7 +596,7 @@ AudioThreadCommandQueue *AudioThread::getCommandQueue() { } void AudioThread::setGain(float gain_in) { - + if (gain_in < 0.0) { gain_in = 0.0; } diff --git a/src/audio/AudioThread.h b/src/audio/AudioThread.h index ec13edc..a977c05 100644 --- a/src/audio/AudioThread.h +++ b/src/audio/AudioThread.h @@ -21,20 +21,20 @@ public: int channels; float peak; int type; - bool is_squelch_active; + bool is_squelch_active; std::vector data; AudioThreadInput() : - frequency(0), inputRate(0), sampleRate(0), channels(0), peak(0), type(0), is_squelch_active(false) { + frequency(0), inputRate(0), sampleRate(0), channels(0), peak(0), type(0), is_squelch_active(false) { } - + AudioThreadInput(AudioThreadInput *copyFrom) { copy(copyFrom); } - + void copy(AudioThreadInput *copyFrom) { frequency = copyFrom->frequency; inputRate = copyFrom->inputRate; @@ -46,9 +46,9 @@ public: data.assign(copyFrom->data.begin(), copyFrom->data.end()); } - + virtual ~AudioThreadInput() { - + } }; @@ -65,7 +65,7 @@ public: }; AudioThreadCommand() : - cmd(AUDIO_THREAD_CMD_NULL), int_value(0) { + cmd(AUDIO_THREAD_CMD_NULL), int_value(0) { } AudioThreadCommandEnum cmd; @@ -81,15 +81,15 @@ typedef std::shared_ptr AudioThreadCommandQueuePtr; class AudioThread : public IOThread { public: - + AudioThread(); virtual ~AudioThread(); static void enumerateDevices(std::vector &devs); - void setInitOutputDevice(int deviceId, int sampleRate=-1); + void setInitOutputDevice(int deviceId, int sampleRate = -1); int getOutputDevice(); - + int getSampleRate(); virtual void run(); @@ -99,7 +99,7 @@ public: void setActive(bool state); void setGain(float gain_in); - + static std::map deviceSampleRate; AudioThreadCommandQueue *getCommandQueue(); @@ -110,8 +110,8 @@ public: static void deviceCleanup(); static void setDeviceSampleRate(int deviceId, int sampleRate); - // - void attachControllerThread(std::thread* controllerThread); + // + void attachControllerThread(std::thread* controllerThread); //fields below, only to be used by other AudioThreads ! size_t underflowCount; @@ -126,7 +126,7 @@ private: std::atomic_bool active; std::atomic_int outputDevice; - + RtAudio dac; unsigned int nBufferFrames; RtAudio::StreamOptions opts; @@ -134,8 +134,8 @@ private: AudioThreadCommandQueue cmdQueue; int sampleRate; - //if != nullptr, it mean AudioThread is a controller thread. - std::thread* controllerThread = nullptr; + //if != nullptr, it mean AudioThread is a controller thread. + std::thread* controllerThread = nullptr; //The own m_mutex protecting this AudioThread, in particular boundThreads std::recursive_mutex m_mutex; @@ -146,8 +146,8 @@ private: void bindThread(AudioThread *other); void removeThread(AudioThread *other); - static std::map deviceController; + static std::map deviceController; - //The mutex protecting static deviceController, deviceThread and deviceSampleRate access. - static std::recursive_mutex m_device_mutex; + //The mutex protecting static deviceController, deviceThread and deviceSampleRate access. + static std::recursive_mutex m_device_mutex; };