diff --git a/native/serverconnection/src/EventLoop.h b/native/serverconnection/src/EventLoop.h index a7864e0..22a5974 100644 --- a/native/serverconnection/src/EventLoop.h +++ b/native/serverconnection/src/EventLoop.h @@ -12,6 +12,9 @@ namespace tc::event { class EventEntry { friend class EventExecutor; public: + EventEntry() = default; + virtual ~EventEntry() = default; + virtual void event_execute(const std::chrono::system_clock::time_point& /* scheduled timestamp */) = 0; virtual void event_execute_dropped(const std::chrono::system_clock::time_point& /* scheduled timestamp */) {} diff --git a/native/serverconnection/src/audio/codec/Converter.cpp b/native/serverconnection/src/audio/codec/Converter.cpp index 5bf11ed..6c0efa8 100644 --- a/native/serverconnection/src/audio/codec/Converter.cpp +++ b/native/serverconnection/src/audio/codec/Converter.cpp @@ -4,31 +4,78 @@ #include "Converter.h" +#include + +#define HAVE_CODEC_OPUS + +#ifdef HAVE_CODEC_OPUS + #include "./OpusConverter.h" +#endif + +using namespace tc::audio; using namespace tc::audio::codec; -Converter::Converter(size_t c, size_t s, size_t f) : _channels(c), _sample_rate(s), _frame_size(f) {} -Converter::~Converter() {} +[[nodiscard]] constexpr inline bool audio_codec_supported(const AudioCodec &codec) { + switch (codec) { -bool type::supported(value type) { #ifdef HAVE_CODEC_OPUS - if(type == type::opus) - return true; + case AudioCodec::OpusVoice: + case AudioCodec::OpusMusic: + return true; #endif #ifdef HAVE_CODEC_SPEEX - if(type == type::speex) - return true; + case AudioCodec::Speex: + return true; #endif #ifdef HAVE_CODEC_FLAC - if(type == type::flac) - return true; + case AudioCodec::Flac: + return true; #endif #ifdef HAVE_CODEC_CELT - if(type == type::celt) - return true; + case AudioCodec::Celt: + return true; #endif - return false; + default: + return false; + } +} + +bool codec::audio_decode_supported(const AudioCodec &codec) { + return audio_codec_supported(codec); +} + +bool codec::audio_encode_supported(const AudioCodec &codec) { + return audio_codec_supported(codec); +} + +std::unique_ptr codec::create_audio_decoder(const AudioCodec &codec) { + switch (codec) { + +#ifdef HAVE_CODEC_OPUS + case AudioCodec::OpusVoice: + case AudioCodec::OpusMusic: + return std::make_unique(codec); +#endif + + default: + return nullptr; + } +} + +std::unique_ptr codec::create_audio_encoder(const AudioCodec &codec) { + switch (codec) { + +#ifdef HAVE_CODEC_OPUS + case AudioCodec::OpusVoice: + case AudioCodec::OpusMusic: + return std::make_unique(codec); +#endif + + default: + return nullptr; + } } \ No newline at end of file diff --git a/native/serverconnection/src/audio/codec/Converter.h b/native/serverconnection/src/audio/codec/Converter.h index dc1e1ea..c155542 100644 --- a/native/serverconnection/src/audio/codec/Converter.h +++ b/native/serverconnection/src/audio/codec/Converter.h @@ -1,66 +1,122 @@ #pragma once #include - -#if !defined(ssize_t) && defined(WIN32) - #define ssize_t int64_t -#endif +#include +#include namespace tc::audio::codec { - namespace type { - enum value { - undefined, + enum struct AudioCodec { + Unknown, - /* supported */ - opus, - speex, + /* supported */ + OpusVoice, + OpusMusic, - /* unsupported */ - flac, - celt - }; + /* Not yet supported */ + Flac, - extern bool supported(value); + /* Removed in summer 2020 */ + SpeexNarrow, + SpeexWide, + SpeexUltraWide, + Celt, + }; + + class AudioEncoder; + class AudioDecoder; + + [[nodiscard]] extern bool audio_encode_supported(const AudioCodec& /* codec */); + [[nodiscard]] extern std::unique_ptr create_audio_encoder(const AudioCodec& /* codec */); + + [[nodiscard]] extern bool audio_decode_supported(const AudioCodec& /* codec */); + [[nodiscard]] extern std::unique_ptr create_audio_decoder(const AudioCodec& /* codec */); + + [[nodiscard]] constexpr inline std::optional audio_codec_to_protocol_id(const AudioCodec& codec) { + switch(codec) { + case AudioCodec::SpeexNarrow: + return std::make_optional(0); + + case AudioCodec::SpeexWide: + return std::make_optional(1); + + case AudioCodec::SpeexUltraWide: + return std::make_optional(2); + + case AudioCodec::Celt: + return std::make_optional(3); + + case AudioCodec::OpusVoice: + return std::make_optional(4); + + case AudioCodec::OpusMusic: + return std::make_optional(5); + + case AudioCodec::Flac: + return std::make_optional(6); + + default: + return std::nullopt; + } } - class Converter { - public: - Converter(size_t /* channels */, size_t /* sample rate */, size_t /* frame size */); - virtual ~Converter(); + [[nodiscard]] constexpr inline std::optional audio_codec_from_protocol_id(uint8_t id) { + switch(id) { + case 0: + return std::make_optional(AudioCodec::SpeexNarrow); - /* initialize parameters depend on the codec */ - virtual bool valid() = 0; - virtual void finalize() = 0; + case 1: + return std::make_optional(AudioCodec::SpeexWide); - virtual void reset_encoder() = 0; - virtual void reset_decoder() = 0; + case 2: + return std::make_optional(AudioCodec::SpeexUltraWide); - /** - * @return number of bytes written on success - */ - virtual ssize_t encode(std::string& /* error */, const void* /* source */, void* /* destination */, size_t /* destination byte length */, bool /* head package */) = 0; + case 3: + return std::make_optional(AudioCodec::Celt); - /** - * @return number of samples on success - */ - virtual ssize_t decode(std::string& /* error */, const void* /* source */, size_t /* source byte length */, void* /* destination */, bool /* fec decoding */) = 0; - virtual ssize_t decode_lost(std::string& /* error */, size_t /* packets */) = 0; + case 4: + return std::make_optional(AudioCodec::OpusVoice); - virtual size_t expected_encoded_length(size_t /* sample count */) = 0; - virtual size_t expected_decoded_length(const void* /* source */, size_t /* source byte length */) { - return this->bytes_per_frame(); - } + case 5: + return std::make_optional(AudioCodec::OpusMusic); - inline size_t channels() { return this->_channels; } - inline size_t sample_rate() { return this->_sample_rate; } - inline size_t frame_size() { return this->_frame_size; } + case 6: + return std::make_optional(AudioCodec::Flac); - inline size_t bytes_per_frame() { return this->_channels * this->_frame_size * 4; } - protected: - size_t _frame_size; - size_t _channels; - size_t _sample_rate; - }; + default: + return std::nullopt; + } + } + + [[nodiscard]] constexpr inline const char* audio_codec_name(const AudioCodec& codec) { + switch(codec) { + case AudioCodec::SpeexNarrow: + return "speex narrow"; + + case AudioCodec::SpeexWide: + return "speex wide"; + + case AudioCodec::SpeexUltraWide: + return "speex ultra wide"; + + case AudioCodec::Celt: + return "celt"; + + case AudioCodec::OpusVoice: + return "opus voice"; + + case AudioCodec::OpusMusic: + return "opus music"; + + case AudioCodec::Flac: + return "flac"; + + case AudioCodec::Unknown: + return "unknown"; + + default: + return "invalid"; + } + } struct EncoderBufferInfo { size_t sample_count{0}; @@ -104,5 +160,38 @@ namespace tc::audio::codec { [[nodiscard]] virtual bool encode(std::string& /* error */, void* /* target buffer */, size_t& /* target length */, const EncoderBufferInfo& /* buffer info */, const float* /* samples */) = 0; }; - class AudioDecoder {}; + struct DecodePayloadInfo { + /** + * Use a value of zero to indicate packet loss + */ + size_t byte_length{0}; + bool fec_decode{false}; + }; + + class AudioDecoder { + public: + explicit AudioDecoder() = default; + virtual ~AudioDecoder() = default; + + [[nodiscard]] virtual bool valid() const = 0; + [[nodiscard]] virtual bool initialize(std::string& /* error */) = 0; + virtual void reset_sequence() = 0; + + /** + * Get the codecs sample rate (will be the output rate) + */ + [[nodiscard]] virtual size_t sample_rate() const = 0; + + /** + * Get the codecs audio channel count. + */ + [[nodiscard]] virtual size_t channel_count() const = 0; + + /** + * @returns the expected sample count + */ + [[nodiscard]] virtual size_t expected_decoded_length(const void* /* payload */, size_t /* payload length */) const = 0; + + [[nodiscard]] virtual bool decode(std::string& /* error */, float* /* target buffer */, size_t& /* target sample count */, const DecodePayloadInfo& /* payload info */, const void* /* payload */) = 0; + }; } \ No newline at end of file diff --git a/native/serverconnection/src/audio/codec/OpusConverter.cpp b/native/serverconnection/src/audio/codec/OpusConverter.cpp index 74b768a..3fe3c44 100644 --- a/native/serverconnection/src/audio/codec/OpusConverter.cpp +++ b/native/serverconnection/src/audio/codec/OpusConverter.cpp @@ -4,162 +4,8 @@ using namespace std; using namespace tc::audio::codec; -OpusConverter::OpusConverter(size_t c, size_t s, size_t f) : Converter(c, s, f) { } -OpusConverter::~OpusConverter() = default; - -bool OpusConverter::valid() { - return this->encoder && this->decoder; -} - -bool OpusConverter::initialize(std::string &error, int application_type) { - lock_guard lock(this->coder_lock); - this->_application_type = application_type; - - if(!this->_initialize_encoder(error)) - return false; - - if(!this->_initialize_decoder(error)) { - this->reset_encoder(); - return false; - } - - return true; -} - -void OpusConverter::reset_encoder() { - lock_guard lock(this->coder_lock); - - log_info(category::audio, tr("Resetting encoder")); - auto result = opus_encoder_ctl(this->encoder, OPUS_RESET_STATE); - if(result != OPUS_OK) - log_warn(category::audio, tr("Failed to reset opus encoder. Opus result: {}"), result); -} - -void OpusConverter::reset_decoder() { - lock_guard lock(this->coder_lock); - - log_info(category::audio, tr("Resetting decoder")); - auto result = opus_decoder_ctl(this->decoder, OPUS_RESET_STATE); - if(result != OPUS_OK) - log_warn(category::audio, tr("Failed to reset opus decoder. Opus result: {}"), result); - this->fec_decoder_ = true; -} - - -void OpusConverter::finalize() { - lock_guard lock(this->coder_lock); - - if(this->encoder) opus_encoder_destroy(this->encoder); - this->encoder = nullptr; - - if(this->decoder) opus_decoder_destroy(this->decoder); - this->decoder = nullptr; -} - -ssize_t OpusConverter::encode(std::string &error, const void *source, void *target, size_t target_length, bool head_package) { - lock_guard lock(this->coder_lock); - - opus_encoder_ctl(encoder, OPUS_SET_PACKET_LOSS_PERC(head_package ? 100 : 15)); - auto result = opus_encode_float(this->encoder, (float*) source, (int) this->_frame_size, (uint8_t*) target, (opus_int32) target_length); - if(result < OPUS_OK) { - error = to_string(result) + "|" + opus_strerror(result); - return -1; - } - return result; -} - -ssize_t OpusConverter::decode(std::string &error, const void *source, size_t source_length, void *target, bool use_fec) { - lock_guard lock(this->coder_lock); - - auto result = opus_decode_float(this->decoder, (uint8_t*) source, (opus_int32) source_length, (float*) target, (int) this->_frame_size, use_fec ? 1 : 0); - if(result < OPUS_OK) { - error = to_string(result) + "|" + opus_strerror(result); - return -1; - } - return result; -} - -ssize_t OpusConverter::decode_lost(std::string &error, size_t packets) { - lock_guard lock(this->coder_lock); - - auto buffer = (float*) malloc(this->_frame_size * this->_channels * sizeof(float)); - while (packets-- > 0) { - auto result = opus_decode_float(this->decoder, nullptr, 0, buffer, (int) this->_frame_size, false); - if(result < OPUS_OK) - log_warn(category::audio, tr("Opus decode lost resulted in error: {}"), result); - } - this->fec_decoder_ = true; - free(buffer); - return 0; -} - -size_t OpusConverter::expected_encoded_length(size_t sample_count) { - //TODO calculate stuff - return 512; -} - -bool OpusConverter::_initialize_decoder(std::string &error) { - if(!this->_finalize_decoder(error)) - return false; - - int error_id = 0; - this->decoder = opus_decoder_create((opus_int32) this->_sample_rate, (int) this->_channels, &error_id); - if(!this->encoder || error_id) { - error = "failed to create decoder (" + to_string(error_id) + ")"; - return false; - } - return true; -} - -bool OpusConverter::_initialize_encoder(std::string &error) { - if(!this->_finalize_encoder(error)) - return false; - - int error_id = 0; - this->encoder = opus_encoder_create((opus_int32) this->_sample_rate, (int) this->_channels, this->_application_type, &error_id); - if(!this->encoder || error_id) { - error = "failed to create encoder (" + to_string(error_id) + ")"; - return false; - } - - error_id = opus_encoder_ctl(encoder, OPUS_SET_BITRATE(64000)); - if(error_id) { - error = "failed to set bitrate (" + to_string(error_id) + ")"; - return false; - } - - error_id = opus_encoder_ctl(encoder, OPUS_SET_INBAND_FEC(1)); - if(error_id) { - error = "failed to enable fec (" + to_string(error_id) + ")"; - return false; - } - - error_id = opus_encoder_ctl(encoder, OPUS_SET_PACKET_LOSS_PERC(15)); - if(error_id) { - error = "failed to assume a 15% packet loss (" + to_string(error_id) + ")"; - return false; - } - - return true; -} - -bool OpusConverter::_finalize_decoder(std::string &) { - if(this->decoder) { - opus_decoder_destroy(this->decoder); - this->decoder = nullptr; - } - return true; -} - -bool OpusConverter::_finalize_encoder(std::string &) { - if(this->encoder) { - opus_encoder_destroy(this->encoder); - this->encoder = nullptr; - } - return true; -} - -OpusAudioEncoder::OpusAudioEncoder(int application_type) : application_type_{application_type} {} +/* The opus encoder */ +OpusAudioEncoder::OpusAudioEncoder(AudioCodec target_codec) : target_codec_{target_codec} {} OpusAudioEncoder::~OpusAudioEncoder() noexcept { if(this->encoder) { opus_encoder_destroy(this->encoder); @@ -172,8 +18,23 @@ bool OpusAudioEncoder::valid() const { } bool OpusAudioEncoder::initialize(string &error) { - int error_id = 0; - this->encoder = opus_encoder_create((opus_int32) this->sample_rate(), (int) this->channel_count(), this->application_type_, &error_id); + int application_type; + switch(this->target_codec_) { + case AudioCodec::OpusVoice: + application_type = OPUS_APPLICATION_VOIP; + break; + + case AudioCodec::OpusMusic: + application_type = OPUS_APPLICATION_AUDIO; + break; + + default: + error = "target codec isn't opus"; + return false; + } + + int error_id{0}; + this->encoder = opus_encoder_create((opus_int32) this->sample_rate(), (int) this->channel_count(), application_type, &error_id); if(!this->encoder || error_id) { error = "failed to create encoder (" + to_string(error_id) + ")"; goto cleanup_error; @@ -219,7 +80,7 @@ size_t OpusAudioEncoder::sample_rate() const { } size_t OpusAudioEncoder::channel_count() const { - if(this->application_type() == OPUS_APPLICATION_AUDIO) { + if(this->target_codec_ == AudioCodec::OpusMusic) { return 2; } else { return 1; @@ -248,3 +109,70 @@ bool OpusAudioEncoder::encode(std::string& error, void *target_buffer, size_t &t target_size = (size_t) result; return true; } + +/* The Opus decoder */ +OpusAudioDecoder::OpusAudioDecoder(AudioCodec target_codec) : target_codec_{target_codec} {} +OpusAudioDecoder::~OpusAudioDecoder() noexcept { + if(this->decoder) { + opus_decoder_destroy(this->decoder); + this->decoder = nullptr; + } +}; + +bool OpusAudioDecoder::valid() const { + return this->decoder != nullptr; +} + +bool OpusAudioDecoder::initialize(string &error) { + int error_id{0}; + this->decoder = opus_decoder_create((opus_int32) this->sample_rate(), (int) this->channel_count(), &error_id); + if(!this->decoder || error_id) { + error = "failed to create decoder (" + to_string(error_id) + ")"; + return false; + } + + return true; +} + +void OpusAudioDecoder::reset_sequence() { + auto result = opus_decoder_ctl(this->decoder, OPUS_RESET_STATE); + if(result != OPUS_OK) { + log_warn(category::audio, tr("Failed to reset opus decoder. Opus result: {}"), result); + } +} + +size_t OpusAudioDecoder::sample_rate() const { + return 48000; +} + +size_t OpusAudioDecoder::channel_count() const { + if(this->target_codec_ == AudioCodec::OpusMusic) { + return 2; + } else { + return 1; + } +} + +size_t OpusAudioDecoder::expected_decoded_length(const void *payload, size_t payload_size) const { + auto result = opus_decoder_get_nb_samples(this->decoder, (uint8_t*) payload, payload_size); + if(result <= 0) { + return 0; + } + return (size_t) result; +} + +bool OpusAudioDecoder::decode(string &error, float *sample_buffer, size_t &sample_count, const DecodePayloadInfo &info, const void *payload) { + auto result = opus_decode_float(this->decoder, + info.byte_length == 0 ? nullptr : (uint8_t*) payload, (opus_int32) info.byte_length, + (float*) sample_buffer, (int) sample_count, + info.fec_decode ? 1 : 0 + ); + + if(result < 0) { + error = to_string(result) + "|" + opus_strerror(result); + return false; + } + + sample_count = (size_t) result; + return true; +} diff --git a/native/serverconnection/src/audio/codec/OpusConverter.h b/native/serverconnection/src/audio/codec/OpusConverter.h index ce37928..c57425e 100644 --- a/native/serverconnection/src/audio/codec/OpusConverter.h +++ b/native/serverconnection/src/audio/codec/OpusConverter.h @@ -5,42 +5,9 @@ #include namespace tc::audio::codec { - class OpusConverter : public Converter { - public: - OpusConverter(size_t /* channels */, size_t /* sample rate */, size_t /* frame size */); - virtual ~OpusConverter(); - - bool valid() override; - - bool initialize(std::string& /* error */, int /* application type */); - void finalize() override; - - void reset_encoder() override; - void reset_decoder() override; - - ssize_t encode(std::string & /* error */, const void * /* source */, void * /* target */, size_t /* target size */, bool /* head package */) override; - ssize_t decode(std::string & /* error */, const void * /* source */, size_t /* source size */, void *pVoid1, bool /* use fec */) override; - - ssize_t decode_lost(std::string &string, size_t /* packets */) override; - - size_t expected_encoded_length(size_t size) override; - private: - std::mutex coder_lock; - OpusDecoder* decoder = nullptr; - OpusEncoder* encoder = nullptr; - - bool fec_decoder_{false}; - int _application_type = 0; - - bool _finalize_encoder(std::string& /* error */); - bool _finalize_decoder(std::string& /* error */); - bool _initialize_encoder(std::string& /* error */); - bool _initialize_decoder(std::string& /* error */); - }; - class OpusAudioEncoder : public AudioEncoder { public: - explicit OpusAudioEncoder(int /* application type */); + explicit OpusAudioEncoder(AudioCodec /* target codec */); ~OpusAudioEncoder() override; bool valid() const override; @@ -57,10 +24,32 @@ namespace tc::audio::codec { bool encode(std::string&, void *, size_t &, const EncoderBufferInfo &, const float *) override; - [[nodiscard]] inline auto application_type() const { return this->application_type_; } - private: - int application_type_; + AudioCodec target_codec_; OpusEncoder* encoder{nullptr}; }; + + class OpusAudioDecoder : public AudioDecoder { + public: + explicit OpusAudioDecoder(AudioCodec /* target codec */); + ~OpusAudioDecoder() override; + + bool valid() const override; + + bool initialize(std::string &string) override; + + void reset_sequence() override; + + size_t sample_rate() const override; + + size_t channel_count() const override; + + size_t expected_decoded_length(const void *pVoid, size_t size) const override; + + bool decode(std::string &, float *, size_t &, const DecodePayloadInfo &, const void *) override; + + private: + AudioCodec target_codec_; + OpusDecoder* decoder{nullptr}; + }; } \ No newline at end of file diff --git a/native/serverconnection/src/connection/ProtocolHandler.h b/native/serverconnection/src/connection/ProtocolHandler.h index e7ea34b..f1052af 100644 --- a/native/serverconnection/src/connection/ProtocolHandler.h +++ b/native/serverconnection/src/connection/ProtocolHandler.h @@ -18,10 +18,6 @@ #include #include "ServerConnection.h" -namespace ts::connection { - class CryptionHandler; -} - namespace tc::connection { class ServerConnection; diff --git a/native/serverconnection/src/connection/audio/AudioSender.cpp b/native/serverconnection/src/connection/audio/AudioSender.cpp index cc40644..da7e6e6 100644 --- a/native/serverconnection/src/connection/audio/AudioSender.cpp +++ b/native/serverconnection/src/connection/audio/AudioSender.cpp @@ -4,7 +4,6 @@ #include "../../audio/AudioEventLoop.h" #include "../../audio/AudioMerger.h" #include "../../audio/AudioReframer.h" -#include "../../audio/codec/OpusConverter.h" using namespace std; using namespace tc; @@ -43,7 +42,7 @@ void VoiceSender::send_data(const float *data, size_t samples, size_t rate, size const auto aligned_frame_size{((sizeof(AudioFrame) + 3) / sizeof(float)) * sizeof(float)}; auto frame = (AudioFrame*) malloc(aligned_frame_size + samples * channels * sizeof(float)); - new (&frame) AudioFrame{}; + new (frame) AudioFrame{}; frame->sample_count = samples; frame->sample_rate = rate; @@ -65,7 +64,7 @@ void VoiceSender::send_data(const float *data, size_t samples, size_t rate, size void VoiceSender::send_stop() { auto frame = (AudioFrame*) malloc(sizeof(AudioFrame)); - new (&frame) AudioFrame{}; + new (frame) AudioFrame{}; frame->timestamp = chrono::system_clock::now(); { @@ -82,10 +81,6 @@ void VoiceSender::finalize() { this->handle = nullptr; } -void VoiceSender::set_codec(connection::codec::value target_codec) { - this->target_codec_ = target_codec; -} - void VoiceSender::event_execute(const std::chrono::system_clock::time_point &point) { static auto max_time = chrono::milliseconds(10); @@ -129,47 +124,52 @@ void VoiceSender::encode_raw_frame(const AudioFrame* frame) { if(frame->sample_rate == 0) { /* Audio sequence end */ this->audio_sequence_no = 0; - if(this->current_codec_.has_value()) { + + auto codec_protocol_id = audio::codec::audio_codec_to_protocol_id(this->current_codec); + if(codec_protocol_id.has_value()) { this->flush_current_codec(); + if(this->codec_encoder) { + this->codec_encoder->reset_sequence(); + } + auto server = this->handle->handle(); - server->send_voice_data(nullptr, 0, *this->current_codec_, false); + server->send_voice_data(nullptr, 0, *codec_protocol_id, false); } return; } - if(!this->current_codec_.has_value() || *this->current_codec_ != this->target_codec_) { + if(this->current_codec != this->target_codec_) { + auto codec_protocol_id = audio::codec::audio_codec_to_protocol_id(this->target_codec_); + if(!codec_protocol_id.has_value()) { + /* we can't send it so no need to initialize it */ + return; + } + this->flush_current_codec(); this->audio_sequence_no = 0; this->codec_resampler = nullptr; this->codec_reframer = nullptr; this->codec_encoder = nullptr; - this->current_codec_ = std::make_optional(this->target_codec_); + this->current_codec = this->target_codec_; + + if(!audio::codec::audio_encode_supported(this->current_codec)) { + log_warn(category::voice_connection, tr("Audio sender set to codec where encoding is not supported. Do not send any audio data.")); + return; + } + + this->codec_encoder = audio::codec::create_audio_encoder(this->current_codec); + if(!this->codec_encoder) { + log_error(category::voice_connection, tr("Failed to allocate new audio encoder for codec {}"), (uint32_t) this->target_codec_); + return; + } std::string error{}; - switch(this->target_codec_) { - case codec::OPUS_VOICE: - this->codec_encoder = std::make_unique(OPUS_APPLICATION_VOIP); - if(!this->codec_encoder->initialize(error)) { - log_error(category::voice_connection, tr("Failed to initialize target audio codec {}"), (uint32_t) this->target_codec_); - this->codec_encoder = nullptr; - return; - } - break; - - case codec::OPUS_MUSIC: - this->codec_encoder = std::make_unique(OPUS_APPLICATION_AUDIO); - if(!this->codec_encoder->initialize(error)) { - log_error(category::voice_connection, tr("Failed to initialize target audio codec {}"), (uint32_t) this->target_codec_); - this->codec_encoder = nullptr; - return; - } - break; - - default: - log_error(category::voice_connection, tr("Unknown target encode codec {}"), (uint32_t) this->target_codec_); - return; + if(!this->codec_encoder->initialize(error)) { + log_error(category::voice_connection, tr("Failed to initialize auto encoder (codec {}) {}"), (uint32_t) this->target_codec_, error); + this->codec_encoder = nullptr; + return; } } @@ -229,8 +229,11 @@ void VoiceSender::encode_raw_frame(const AudioFrame* frame) { constexpr static auto kMaxPacketSize{1500}; void VoiceSender::handle_network_frame(const float *sample_buffer, size_t sample_count, bool is_flush) { - assert(this->current_codec_.has_value()); assert(this->codec_encoder); + auto codec_protocol_id = audio::codec::audio_codec_to_protocol_id(this->current_codec); + if(!codec_protocol_id.has_value()) { + return; + } //log_trace(category::voice_connection, tr("Encoding audio chunk of {}/{} aka {}ms with codec {}"), // sample_count, this->codec_encoder->sample_rate(), sample_count * 1000 / this->codec_encoder->sample_rate(), *this->current_codec_); @@ -249,8 +252,13 @@ void VoiceSender::handle_network_frame(const float *sample_buffer, size_t sample return; } + if(!packet_size) { + /* No audio packet created */ + return; + } + auto server = this->handle->handle(); - server->send_voice_data(packet_buffer, packet_size, *this->current_codec_, buffer_info.head_sequence); + server->send_voice_data(packet_buffer, packet_size, *codec_protocol_id, buffer_info.head_sequence); } void VoiceSender::flush_current_codec() { diff --git a/native/serverconnection/src/connection/audio/AudioSender.h b/native/serverconnection/src/connection/audio/AudioSender.h index ff980ce..94985f0 100644 --- a/native/serverconnection/src/connection/audio/AudioSender.h +++ b/native/serverconnection/src/connection/audio/AudioSender.h @@ -3,7 +3,7 @@ #include #include #include -#include "VoiceClient.h" +#include "./VoiceClient.h" namespace tc { namespace audio { @@ -20,17 +20,17 @@ namespace tc { namespace connection { class VoiceConnection; class VoiceSender : public event::EventEntry { - template - friend inline std::shared_ptr<_Tp> std::static_pointer_cast(const std::shared_ptr<_Up>& __r) noexcept; friend class VoiceConnection; public: + using AudioCodec = audio::codec::AudioCodec; + explicit VoiceSender(VoiceConnection*); virtual ~VoiceSender(); void finalize(); - codec::value get_codec() { return this->current_codec_.value_or(this->target_codec_); } - void set_codec(codec::value value); + [[nodiscard]] inline auto target_codec() const { return this->target_codec_; } + inline void set_codec(const AudioCodec& target) { this->target_codec_ = target; } void send_data(const float* /* buffer */, size_t /* samples */, size_t /* sample rate */, size_t /* channels */); void send_stop(); @@ -61,8 +61,8 @@ namespace tc { bool voice_send_enabled{false}; /* Codec specific values */ - codec::value target_codec_{codec::OPUS_VOICE}; - std::optional current_codec_{}; + AudioCodec target_codec_{AudioCodec::Unknown}; + AudioCodec current_codec{AudioCodec::Unknown}; std::unique_ptr codec_encoder{}; std::unique_ptr codec_resampler{}; diff --git a/native/serverconnection/src/connection/audio/VoiceClient.cpp b/native/serverconnection/src/connection/audio/VoiceClient.cpp index d1a828c..ec13d33 100644 --- a/native/serverconnection/src/connection/audio/VoiceClient.cpp +++ b/native/serverconnection/src/connection/audio/VoiceClient.cpp @@ -14,57 +14,6 @@ extern tc::audio::AudioOutput* global_audio_output; #define DEBUG_PREMATURE_PACKETS -#ifdef WIN32 - #define _field_(name, value) value -#else - #define _field_(name, value) .name = value -#endif - -const codec::CodecInfo codec::info[6] = { - { - _field_(supported, false), - _field_(name, "speex_narrowband"), - _field_(new_converter, nullptr) - }, - { - _field_(supported, false), - _field_(name, "speex_wideband"), - _field_(new_converter, nullptr) - }, - { - _field_(supported, false), - _field_(name, "speex_ultra_wideband"), - _field_(new_converter, nullptr) - }, - { - _field_(supported, false), - _field_(name, "celt_mono"), - _field_(new_converter, nullptr) - }, - { - _field_(supported, true), - _field_(name, "opus_voice"), - _field_(new_converter, [](string& error) -> std::shared_ptr { - auto result = std::make_shared(1, 48000, 960); - if(!result->initialize(error, OPUS_APPLICATION_VOIP)) { - return nullptr; - } - return std::dynamic_pointer_cast(result); - }) - }, - { - _field_(supported, true), - _field_(name, "opus_music"), - _field_(new_converter, [](string& error) -> std::shared_ptr { - auto result = std::make_shared(2, 48000, 960); - if(!result->initialize(error, OPUS_APPLICATION_AUDIO)) { - return nullptr; - } - return std::dynamic_pointer_cast(result); - }) - } -}; - void VoiceClientWrap::do_wrap(const v8::Local &object) { this->Wrap(object); @@ -267,7 +216,7 @@ void VoiceClient::initialize() { void VoiceClient::execute_tick() { switch(this->state_) { case state::buffering: - if(this->_last_received_packet + chrono::milliseconds{250} < chrono::system_clock::now()) { + if(this->packet_queue.last_packet_timestamp + chrono::milliseconds{250} < chrono::system_clock::now()) { this->set_state(state::stopped); log_debug(category::audio, tr("Audio stop packet for client {} seems to be lost. Stopping playback."), this->client_id_); } @@ -341,9 +290,6 @@ void VoiceClient::finalize_js_object() { * @param clip_window The size how long the "overflow" counts * @return true if lower is less than upper */ - #ifdef max - #undef max - #endif inline constexpr bool packet_id_less(uint16_t lower, uint16_t upper, uint16_t window) { constexpr auto bounds = std::numeric_limits::max(); @@ -353,8 +299,9 @@ inline constexpr bool packet_id_less(uint16_t lower, uint16_t upper, uint16_t wi return true; } else if(upper > lower) { return true; + } else { + return false; } - return false; } else { if(lower >= upper) { return false; @@ -365,70 +312,48 @@ inline constexpr bool packet_id_less(uint16_t lower, uint16_t upper, uint16_t wi } inline constexpr uint16_t packet_id_diff(uint16_t lower, uint16_t upper) { - if(upper < lower) - return (uint16_t) (((uint32_t) upper | 0x10000U) - (uint32_t) lower); + if(upper < lower) { + return (uint16_t) (((uint32_t) upper | 0x10000U) - (uint32_t) lower); + } return upper - lower; } #define MAX_LOST_PACKETS (6) -#define TEMP_BUFFER_LENGTH 16384 -void VoiceClient::process_packet(uint16_t packet_id, const pipes::buffer_view& buffer, codec::value buffer_codec, bool is_head) { +void VoiceClient::process_packet(uint16_t packet_id, const pipes::buffer_view& buffer, uint8_t buffer_codec, bool is_head) { #if 0 if(rand() % 10 == 0) { log_info(category::audio, tr("Dropping audio packet id {}"), packet_id); return; } #endif - if(buffer_codec < 0 || buffer_codec > this->codec.size()) { - log_warn(category::voice_connection, tr("Received voice packet from client {} with unknown codec ({})"), this->client_id_, buffer_codec); - return; - } - - if(!this->output_source) { - /* audio hasn't been initialized yet */ - return; - } - - auto& codec_data = this->codec[buffer_codec]; - if(codec_data.state == AudioCodec::State::UNINITIALIZED) - this->initialize_code(buffer_codec); - - if(codec_data.state != AudioCodec::State::INITIALIZED_SUCCESSFULLY) { - log_warn(category::voice_connection, tr("Dropping audio packet because audio codec {} hasn't been initialized successfully (state: {})"), buffer_codec, (int) codec_data.state); - return; - } - //TODO: short circuit handling if we've muted him (e.g. volume = 0) - auto encoded_buffer = new EncodedBuffer{}; encoded_buffer->packet_id = packet_id; encoded_buffer->codec = buffer_codec; encoded_buffer->receive_timestamp = chrono::system_clock::now(); encoded_buffer->buffer = buffer.own_buffer(); encoded_buffer->head = is_head; - this->_last_received_packet = encoded_buffer->receive_timestamp; - { - lock_guard lock{codec_data.pending_lock}; - if(codec_data.stream_timeout() < encoded_buffer->receive_timestamp) { + lock_guard lock{this->packet_queue.pending_lock}; + if(this->packet_queue.stream_timeout() < encoded_buffer->receive_timestamp) { //Old stream hasn't been terminated successfully. //TODO: Cleanup packets which are too old? - codec_data.force_replay = encoded_buffer; + this->packet_queue.force_replay = encoded_buffer; } else if(encoded_buffer->buffer.empty()) { //Flush replay and stop - codec_data.force_replay = encoded_buffer; + this->packet_queue.force_replay = encoded_buffer; } - if(packet_id_less(encoded_buffer->packet_id, codec_data.last_packet_id, MAX_LOST_PACKETS) || encoded_buffer->packet_id == codec_data.last_packet_id) { + if(packet_id_less(encoded_buffer->packet_id, this->packet_queue.last_packet_id, MAX_LOST_PACKETS) || encoded_buffer->packet_id == this->packet_queue.last_packet_id) { log_debug(category::voice_connection, - tr("Received audio packet which is older than the current index (packet: {}, current: {})"), encoded_buffer->packet_id, codec_data.last_packet_id); + tr("Received audio packet which is older than the current index (packet: {}, current: {})"), encoded_buffer->packet_id, this->packet_queue.last_packet_id); return; } /* insert the new buffer */ { EncodedBuffer* prv_head{nullptr}; - auto head{codec_data.pending_buffers}; + auto head{this->packet_queue.pending_buffers}; while(head && packet_id_less(head->packet_id, encoded_buffer->packet_id, MAX_LOST_PACKETS)) { prv_head = head; head = head->next; @@ -438,14 +363,14 @@ void VoiceClient::process_packet(uint16_t packet_id, const pipes::buffer_view& b if(prv_head) { prv_head->next = encoded_buffer; } else { - codec_data.pending_buffers = encoded_buffer; + this->packet_queue.pending_buffers = encoded_buffer; } } - codec_data.last_packet_timestamp = encoded_buffer->receive_timestamp; - codec_data.process_pending = true; + this->packet_queue.last_packet_timestamp = encoded_buffer->receive_timestamp; + this->packet_queue.process_pending = true; } - audio::decode_event_loop->schedule(static_pointer_cast(this->ref())); + audio::decode_event_loop->schedule(dynamic_pointer_cast(this->ref())); } void VoiceClient::cancel_replay() { @@ -466,17 +391,13 @@ void VoiceClient::cancel_replay() { } void VoiceClient::drop_enqueued_buffers() { - for(auto& codec_entry : this->codec) { - auto head = codec_entry.pending_buffers; - while(head) { - auto tmp = head->next; - delete head; - head = tmp; - } + auto head = std::exchange(this->packet_queue.pending_buffers, nullptr); + while(head) { + delete std::exchange(head, head->next); + } - codec_entry.pending_buffers = nullptr; - codec_entry.force_replay = nullptr; - } + this->packet_queue.pending_buffers = nullptr; + this->packet_queue.force_replay = nullptr; } void VoiceClient::event_execute(const std::chrono::system_clock::time_point &scheduled) { @@ -487,341 +408,370 @@ void VoiceClient::event_execute(const std::chrono::system_clock::time_point &sch } static auto max_time = chrono::milliseconds(10); - auto reschedule{false}; string error; - auto timeout = chrono::system_clock::now() + max_time; + std::unique_lock lock{this->packet_queue.pending_lock}; + while(this->packet_queue.process_pending) { + assert(lock.owns_lock()); + EncodedBuffer* replay_head{nullptr}; + uint16_t local_last_pid{this->packet_queue.last_packet_id}; - for(auto& audio_codec : this->codec) { - std::unique_lock lock{audio_codec.pending_lock}; - while(audio_codec.process_pending) { - assert(lock.owns_lock()); - EncodedBuffer* replay_head{nullptr}; - uint16_t local_last_pid{audio_codec.last_packet_id}; + /* nothing to play */ + if(!this->packet_queue.pending_buffers) { + this->packet_queue.process_pending = false; + break; + } - /* nothing to play */ - if(!audio_codec.pending_buffers) { - audio_codec.process_pending = false; - break; - } + if(this->packet_queue.force_replay) { + replay_head = this->packet_queue.pending_buffers; + this->packet_queue.pending_buffers = this->packet_queue.force_replay->next; - if(audio_codec.force_replay) { - replay_head = audio_codec.pending_buffers; - audio_codec.pending_buffers = audio_codec.force_replay->next; + this->packet_queue.force_replay->next = nullptr; + this->packet_queue.force_replay = nullptr; + } else { + EncodedBuffer* prv_head{nullptr}; + EncodedBuffer* head{nullptr}; - audio_codec.force_replay->next = nullptr; - audio_codec.force_replay = nullptr; - } else { - EncodedBuffer* prv_head{nullptr}; - EncodedBuffer* head{nullptr}; + //Trying to replay the sequence + head = this->packet_queue.pending_buffers; + while(head && head->packet_id == this->packet_queue.last_packet_id + 1) { + if(!replay_head) { + replay_head = this->packet_queue.pending_buffers; + } - //Trying to replay the sequence - head = audio_codec.pending_buffers; - while(head && head->packet_id == audio_codec.last_packet_id + 1) { - if(!replay_head) { - replay_head = audio_codec.pending_buffers; - } + this->packet_queue.last_packet_id++; + prv_head = head; + head = head->next; + } + this->packet_queue.pending_buffers = head; - audio_codec.last_packet_id++; - prv_head = head; - head = head->next; - } - audio_codec.pending_buffers = head; + if(prv_head) { + prv_head->next = nullptr; /* mark the tail */ + } else { + assert(!replay_head); /* could not be set, else prv_head would be set */ - if(prv_head) { - prv_head->next = nullptr; /* mark the tail */ - } else { - assert(!replay_head); /* could not be set, else prv_head would be set */ - - //No packet found here, test if we've more than n packets in a row somewhere + //No packet found here, test if we've more than n packets in a row somewhere #define SKIP_SEQ_LENGTH (3) - EncodedBuffer* skip_ptr[SKIP_SEQ_LENGTH + 1]; - memset(skip_ptr, 0, sizeof(skip_ptr)); - skip_ptr[0] = audio_codec.pending_buffers; + EncodedBuffer* skip_ptr[SKIP_SEQ_LENGTH + 1]; + memset(skip_ptr, 0, sizeof(skip_ptr)); + skip_ptr[0] = this->packet_queue.pending_buffers; - while(skip_ptr[0]->next) { - for(size_t i = 0; i < SKIP_SEQ_LENGTH; i++) { - if(!skip_ptr[i]->next || skip_ptr[i]->packet_id + 1 != skip_ptr[i]->next->packet_id) { - break; - } + while(skip_ptr[0]->next) { + for(size_t i = 0; i < SKIP_SEQ_LENGTH; i++) { + if(!skip_ptr[i]->next || skip_ptr[i]->packet_id + 1 != skip_ptr[i]->next->packet_id) { + break; + } - skip_ptr[i + 1] = skip_ptr[i]->next; - } + skip_ptr[i + 1] = skip_ptr[i]->next; + } - if(skip_ptr[SKIP_SEQ_LENGTH]) { - break; - } + if(skip_ptr[SKIP_SEQ_LENGTH]) { + break; + } - skip_ptr[0] = skip_ptr[0]->next; - } + skip_ptr[0] = skip_ptr[0]->next; + } - if(skip_ptr[SKIP_SEQ_LENGTH]) { - /* we've three packets in a row */ - replay_head = audio_codec.pending_buffers; - audio_codec.pending_buffers = skip_ptr[SKIP_SEQ_LENGTH]; - skip_ptr[SKIP_SEQ_LENGTH - 1]->next = nullptr; - log_trace(category::voice_connection, tr("Skipping from {} to {} because of {} packets in a row"), audio_codec.last_packet_id, replay_head->packet_id, SKIP_SEQ_LENGTH); + if(skip_ptr[SKIP_SEQ_LENGTH]) { + /* we've three packets in a row */ + replay_head = this->packet_queue.pending_buffers; + this->packet_queue.pending_buffers = skip_ptr[SKIP_SEQ_LENGTH]; + skip_ptr[SKIP_SEQ_LENGTH - 1]->next = nullptr; + log_trace(category::voice_connection, tr("Skipping from {} to {} because of {} packets in a row"), this->packet_queue.last_packet_id, replay_head->packet_id, SKIP_SEQ_LENGTH); - /* - * Do not set process_pending to false, because we're not done - * We're just replaying all loose packets which are not within a sequence until we reach a sequence - * In the next loop the sequence will be played - */ - } else { - head = audio_codec.pending_buffers; - while(head) { - if(packet_id_diff(audio_codec.last_packet_id, head->packet_id) >= 5) { - break; - } + /* + * Do not set process_pending to false, because we're not done + * We're just replaying all loose packets which are not within a sequence until we reach a sequence + * In the next loop the sequence will be played + */ + } else { + head = this->packet_queue.pending_buffers; + while(head) { + if(packet_id_diff(this->packet_queue.last_packet_id, head->packet_id) >= 5) { + break; + } - head = head->next; - } + head = head->next; + } - if(head) { - replay_head = audio_codec.pending_buffers; - audio_codec.pending_buffers = head->next; - head->next = nullptr; - log_trace(category::voice_connection, tr("Skipping from {} to {} because of over 6 packets between"), - audio_codec.last_packet_id, replay_head->packet_id); - /* do not negate process_pending here. Same reason as with the 3 sequence */ - } else { - /* no packets we're willing to replay */ - audio_codec.process_pending = false; - } - } - } - } + if(head) { + replay_head = this->packet_queue.pending_buffers; + this->packet_queue.pending_buffers = head->next; + head->next = nullptr; + log_trace(category::voice_connection, tr("Skipping from {} to {} because of over 6 packets between"), + this->packet_queue.last_packet_id, replay_head->packet_id); + /* do not negate process_pending here. Same reason as with the 3 sequence */ + } else { + /* no packets we're willing to replay */ + this->packet_queue.process_pending = false; + } + } + } + } - if(!replay_head) { - audio_codec.process_pending = false; - break; - } + if(!replay_head) { + this->packet_queue.process_pending = false; + break; + } - { - auto head = replay_head; - while(head->next) { - head = head->next; - } + { + auto head = replay_head; + while(head->next) { + head = head->next; + } - audio_codec.last_packet_id = head->packet_id; - const auto ordered = !audio_codec.pending_buffers || packet_id_less(audio_codec.last_packet_id, audio_codec.pending_buffers->packet_id, 10); - if(!ordered) { - log_critical(category::voice_connection, tr("Unordered packet ids. [!audio_codec.pending_buffers: {}; a: {}; b: {}]"), - !audio_codec.pending_buffers, - audio_codec.last_packet_id, audio_codec.pending_buffers->packet_id - ); - //assert(!audio_codec.pending_buffers || packet_id_less(audio_codec.last_packet_id, audio_codec.pending_buffers->packet_id, 10)); - } - } - lock.unlock(); + this->packet_queue.last_packet_id = head->packet_id; + const auto ordered = !this->packet_queue.pending_buffers || packet_id_less(this->packet_queue.last_packet_id, this->packet_queue.pending_buffers->packet_id, 10); + if(!ordered) { + log_critical(category::voice_connection, tr("Unordered packet ids. [!this->packet_queue.pending_buffers: {}; a: {}; b: {}]"), + !this->packet_queue.pending_buffers, + this->packet_queue.last_packet_id, this->packet_queue.pending_buffers->packet_id + ); + //assert(!this->packet_queue.pending_buffers || packet_id_less(this->packet_queue.last_packet_id, this->packet_queue.pending_buffers->packet_id, 10)); + } + } + lock.unlock(); - while(replay_head) { - if(replay_head->buffer.empty()) { - switch(this->state_) { - case state::playing: - case state::buffering: - this->set_state(state::stopping); - log_debug(category::voice_connection, tr("Client {} send a stop signal. Flushing stream and stopping"), this->client_id_); - break; + while(replay_head) { + if(replay_head->buffer.empty()) { + switch(this->state_) { + case state::playing: + case state::buffering: + this->set_state(state::stopping); + log_debug(category::voice_connection, tr("Client {} send a stop signal. Flushing stream and stopping"), this->client_id_); + break; - case state::stopping: - case state::stopped: - break; + case state::stopping: + case state::stopped: + break; - default: - assert(false); - break; - } - } else { - auto lost_packets = packet_id_diff(local_last_pid, replay_head->packet_id) - 1; - if(lost_packets > 10) { - log_debug(category::voice_connection, tr("Client {} seems to be missing {} packets in stream ({} to {}). Resetting decoder."), this->client_id_, lost_packets, local_last_pid, replay_head->packet_id); - replay_head->reset_decoder = true; - } else if(lost_packets > 0) { - log_debug(category::voice_connection, tr("Client {} seems to be missing {} packets in stream. FEC decoding it."), this->client_id_, lost_packets); - /* - if(audio_codec.converter->decode_lost(error, lost_packets)) - log_warn(category::audio, tr("Failed to decode lost packets for client {}: {}"), this->_client_id, error); - */ - auto decoded = this->decode_buffer(audio_codec.codec, replay_head->buffer, true); - if(decoded) { - this->output_source->enqueue_samples(decoded->sample_data, decoded->sample_size); - } - } + default: + assert(false); + break; + } + } else { + bool reset_decoder{false}; + auto lost_packets = packet_id_diff(local_last_pid, replay_head->packet_id) - 1; + if(lost_packets > 10) { + log_debug(category::voice_connection, tr("Client {} seems to be missing {} packets in stream ({} to {}). Resetting decoder."), this->client_id_, lost_packets, local_last_pid, replay_head->packet_id); + reset_decoder = true; + } else if(lost_packets > 0) { + log_debug(category::voice_connection, tr("Client {} seems to be missing {} packets in stream. FEC decoding it."), this->client_id_, lost_packets); + /* + if(this->packet_queue.converter->decode_lost(error, lost_packets)) + log_warn(category::audio, tr("Failed to decode lost packets for client {}: {}"), this->_client_id, error); + */ - bool is_new_audio_stream; - switch(this->state_) { - case state::stopped: - case state::stopping: - is_new_audio_stream = true; - break; + /* TODO: Notify the decoder about the lost decode packet? */ + /* Reconstructing and replaying the lost packet by fec decoding the next known packet */ + this->playback_audio_packet(replay_head->codec, replay_head->buffer.data_ptr(), replay_head->buffer.length(), true); + } - case state::buffering: - case state::playing: - is_new_audio_stream = false; - break; + bool is_new_audio_stream; + switch(this->state_) { + case state::stopped: + case state::stopping: + is_new_audio_stream = true; + break; - default: - assert(false); - is_new_audio_stream = false; - break; - } + case state::buffering: + case state::playing: + is_new_audio_stream = false; + break; - if(replay_head->reset_decoder || is_new_audio_stream) { - audio_codec.converter->reset_decoder(); - replay_head->reset_decoder = false; + default: + assert(false); + is_new_audio_stream = false; + break; + } -#if 1 /* Better approch */ - /* initialize with last packet */ - static constexpr auto kTempBufferLength{16384}; - char temp_target_buffer[kTempBufferLength]; - if(kTempBufferLength >= audio_codec.converter->expected_decoded_length(replay_head->buffer.data_ptr(), replay_head->buffer.length())) { - audio_codec.converter->decode(error, replay_head->buffer.data_ptr(), replay_head->buffer.length(), temp_target_buffer, true); - } else { - //TODO: May a small warning here? - } -#endif - } + if(reset_decoder || is_new_audio_stream) { + this->reset_decoder(false); + } -#if 0 /* (maybe) TS3 approch */ - if(replay_head->head) { - /* initialize with last packet */ - char target_buffer[target_buffer_length]; - if(target_buffer_length > audio_codec.converter->expected_decoded_length(replay_head->buffer.data_ptr(), replay_head->buffer.length())) { - audio_codec.converter->decode(error, replay_head->buffer.data_ptr(), replay_head->buffer.length(), target_buffer, 1); - } else { - //TODO: May a small warning here? - } - } -#endif + this->playback_audio_packet(replay_head->codec, replay_head->buffer.data_ptr(), replay_head->buffer.length(), false); + } - //TODO: Use statically allocated buffer? - auto decoded = this->decode_buffer(audio_codec.codec, replay_head->buffer, false); - if(decoded) { - if(is_new_audio_stream) { - log_warn(category::audio, tr("New audio chunk for client {}"), this->client_id_); + local_last_pid = replay_head->packet_id; - //this->output_source->enqueue_silence((size_t) ceil(0.0075f * (float) this->output_source->sample_rate)); /* enqueue 7.5ms silence so we give the next packet a chance to be send */ - } + delete std::exchange(replay_head, replay_head->next); + } - this->output_source->enqueue_samples(decoded->sample_data, decoded->sample_size); - this->set_state(state::playing); - } - } + /* + * Needs to be locked when entering the loop. + * We'll check for more packets. + */ + lock.lock(); + }; +} - local_last_pid = replay_head->packet_id; - - auto last_head = replay_head; - replay_head = replay_head->next; - delete last_head; - } - - /* - * Needs to be locked when entering the loop. - * We'll check for more packets. - */ - lock.lock(); - }; - } - - if(reschedule) { - log_warn(category::voice_connection, tr("Audio data decode will take longer than {} us. Enqueueing for later"), - chrono::duration_cast(max_time).count()); - audio::decode_event_loop->schedule(static_pointer_cast(this->ref())); +void VoiceClient::reset_decoder(bool deallocate) { + this->decoder.decoder_initialized = false; + if(deallocate) { + this->decoder.decoder = nullptr; + this->decoder.resampler = nullptr; + this->decoder.current_codec = AudioCodec::Unknown; + } else if(this->decoder.decoder) { + this->decoder.decoder->reset_sequence(); } } -void VoiceClient::initialize_code(const codec::value &audio_codec) { - assert(this->output_source); +constexpr static auto kTempBufferSampleSize{1024 * 8}; +void VoiceClient::playback_audio_packet(uint8_t protocol_codec_id, const void *payload, size_t payload_size, bool fec_decode) { + auto payload_codec = audio::codec::audio_codec_from_protocol_id(protocol_codec_id); + if(!payload_codec.has_value()) { + log_trace(category::audio, tr("Received packet with unknown audio codec id ({})."), (size_t) protocol_codec_id); + return; + } - string error; + if(this->decoder.current_codec != *payload_codec) { + if(fec_decode) { + log_debug(category::audio, tr("Trying to fec decode audio packet but decoder hasn't been initialized with that codec. Dropping attempt.")); + return; + } - auto& codec_data = this->codec[audio_codec]; - if(codec_data.state != AudioCodec::State::UNINITIALIZED) { - log_warn(category::voice_connection, tr("Could not initialize codec of type {} because it isn't in uninitialized state anymore!"), (int) codec_data.state); + this->decoder.decoder_initialized = false; + this->decoder.decoder = nullptr; + this->decoder.current_codec = *payload_codec; + + if(!audio::codec::audio_decode_supported(this->decoder.current_codec)) { + log_warn(category::voice_connection, tr("Client {} using and unsupported audio codec ({}). Dropping all its audio data."), + this->client_id_, audio::codec::audio_codec_name(this->decoder.current_codec)); + return; + } + + this->decoder.decoder = audio::codec::create_audio_decoder(this->decoder.current_codec); + if(!this->decoder.decoder) { + log_error(category::voice_connection, tr("Failed to create decoder for audio codec {}."), audio::codec::audio_codec_name(this->decoder.current_codec)); + return; + } + + std::string error{}; + if(!this->decoder.decoder->initialize(error)) { + log_error(category::voice_connection, tr("Failed to initialize {} decoder: {}"), audio::codec::audio_codec_name(this->decoder.current_codec), error); + this->decoder.decoder = nullptr; + return; + } + } + + if(!this->decoder.decoder) { + /* Decoder failed to initialize. Dropping all packets. */ return; } - codec_data.codec = audio_codec; + float temp_buffer[kTempBufferSampleSize]; + const auto decoder_channel_count = this->decoder.decoder->channel_count(); - auto info = codec::get_info(audio_codec); - if(!info || !info->supported) { - log_warn(category::voice_connection, tr("Failed to initialized codec {} for client {}. Codec is not supported"), audio_codec, this->client_id_); - codec_data.state = AudioCodec::State::UNSUPPORTED; - return; + if(!this->decoder.decoder_initialized) { + if(fec_decode) { + log_debug(category::audio, tr("Trying to fec decode audio packet but decoder hasn't been initialized with that codec. Dropping attempt.")); + return; + } + + /* + * We're fec decoding so we need to pass the amount of samples we want to decode. + * Usually a network packet contains 20ms of audio data. + */ + auto sample_count{(size_t) (this->decoder.decoder->sample_rate() * 0.02)}; + + DecodePayloadInfo decode_info{}; + decode_info.fec_decode = true; + decode_info.byte_length = payload_size; + + std::string error{}; + if(!this->decoder.decoder->decode(error, temp_buffer, sample_count, decode_info, payload)) { + log_warn(category::audio, tr("Failed to initialize decoder with fec data: {}"), error); + } + + this->decoder.decoder_initialized = true; } - codec_data.state = AudioCodec::State::INITIALIZED_FAIL; - codec_data.converter = info->new_converter(error); - if(!codec_data.converter) { - log_warn(category::voice_connection, tr("Failed to initialized codec {} for client {}. Failed to initialize decoder: {}"), audio_codec, this->client_id_, error); - return; - } - codec_data.resampler = make_shared(codec_data.converter->sample_rate(), this->output_source->sample_rate(), this->output_source->channel_count()); - if(!codec_data.resampler->valid()) { - log_warn(category::voice_connection, tr("Failed to initialized codec {} for client {}. Failed to initialize resampler"), audio_codec, this->client_id_); - return; - } + size_t current_sample_rate; + size_t current_sample_count; + size_t current_channel_count; + { + std::string error{}; + auto sample_count{kTempBufferSampleSize / decoder_channel_count}; + if(fec_decode) { + /* + * See notes for the decoder initialisation. + */ + sample_count = (size_t) (this->decoder.decoder->sample_rate() * 0.02); + } - codec_data.state = AudioCodec::State::INITIALIZED_SUCCESSFULLY; - log_trace(category::voice_connection, tr("Successfully initialized codec {} for client {}."), audio_codec, this->client_id_); + DecodePayloadInfo decode_info{}; + decode_info.fec_decode = fec_decode; + decode_info.byte_length = payload_size; + if(!this->decoder.decoder->decode(error, temp_buffer, sample_count, decode_info, payload)) { + log_warn(category::audio, tr("Failed to decode audio packet (fec: {}): {}"), fec_decode, error); + } + + current_sample_count = sample_count; + current_channel_count = this->decoder.decoder->channel_count(); + current_sample_rate = this->decoder.decoder->sample_rate(); + } + + auto audio_output = this->output_source; + if(!audio_output) { + /* + * We have no target to replay the audio. + * We're only doing it here and not earlier to provide the decoder with the required info of the audio sequence + * so when we actually have audio we're not lacking behind and having some artefacts. + */ + return; + } + + if(this->volume_ == 0) { + /* Client has been muted */ + return; + } + + const auto output_channel_count = audio_output->channel_count(); + if(current_channel_count != output_channel_count) { + if(kTempBufferSampleSize < output_channel_count * current_sample_count) { + log_error(category::voice_connection, tr("Temporary buffer can't hold {} samples ({} channels) of audio data. Audio frame too big. Dropping it."), current_sample_count, output_channel_count); + return; + } + + if(!audio::merge::merge_channels_interleaved(temp_buffer, output_channel_count, temp_buffer, current_channel_count, current_sample_count)) { + log_warn(category::voice_connection, tr("Failed to merge channels to output stream channel count!")); + return; + } + + current_channel_count = output_channel_count; + } + + const auto output_sample_rate = audio_output->sample_rate(); + if(current_sample_rate != output_sample_rate) { + if(!this->decoder.resampler || this->decoder.resampler->output_rate() != output_sample_rate || this->decoder.resampler->input_rate() != current_sample_rate) { + this->decoder.resampler = std::make_unique(current_sample_rate, output_sample_rate, output_channel_count); + } + + auto expected_output_samples = this->decoder.resampler->estimated_output_size(current_sample_count); + if(expected_output_samples * output_channel_count > kTempBufferSampleSize) { + log_error(category::voice_connection, tr("Temporary buffer can't hold the full resampled frame. Dropping it."), current_sample_count, output_channel_count); + return; + } + + size_t output_samples{expected_output_samples}; + if(!this->decoder.resampler->process(temp_buffer, temp_buffer, current_sample_count, output_samples)) { + log_error(category::voice_connection, tr("Failed to resample audio codec sample rate to our audio output sample rate. Dropping audio frame.")); + return; + } + + current_sample_rate = output_sample_rate; + current_sample_count = output_samples; + } + + audio::apply_gain(temp_buffer, current_channel_count, current_sample_count, this->volume_); + + assert(audio_output->sample_rate() == current_sample_rate); + assert(audio_output->channel_count() == current_channel_count); + audio_output->enqueue_samples(temp_buffer, current_sample_count); + this->set_state(state::playing); } -std::shared_ptr VoiceClient::decode_buffer(const codec::value &audio_codec, const pipes::buffer_view &buffer, bool fec) { - assert(this->output_source); - - auto& codec_data = this->codec[audio_codec]; - if(codec_data.state != AudioCodec::State::INITIALIZED_SUCCESSFULLY) { - log_trace(category::audio, tr("Cant decode auto buffer of codec {} because codec isn't successfully initialized (state: {})"), audio_codec, (int) codec_data.state); - return nullptr; - } - - string error; - char target_buffer[TEMP_BUFFER_LENGTH]; - if(TEMP_BUFFER_LENGTH < codec_data.converter->expected_decoded_length(buffer.data_ptr(), buffer.length())) { - log_warn(category::voice_connection, tr("Failed to decode audio data. Target buffer is smaller then expected bytes ({} < {})"), TEMP_BUFFER_LENGTH, codec_data.converter->expected_decoded_length(buffer.data_ptr(), buffer.length())); - return nullptr; - } - - auto samples = codec_data.converter->decode(error, buffer.data_ptr(), buffer.length(), target_buffer, fec); - if(samples < 0) { - log_warn(category::voice_connection, tr("Failed to decode audio data: {}"), error); - return nullptr; - } - - if(!audio::merge::merge_channels_interleaved(target_buffer, this->output_source->channel_count(), target_buffer, codec_data.converter->channels(), samples)) { - log_warn(category::voice_connection, tr("Failed to merge channels to output stream channel count!")); - return nullptr; - } - - auto estimated_output_samples = codec_data.resampler->estimated_output_size(samples); - if(TEMP_BUFFER_LENGTH < estimated_output_samples * this->output_source->channel_count() * sizeof(float)) { - log_warn(category::voice_connection, tr("Failed to resample audio data. Target buffer is smaller then expected bytes ({} < {})"), TEMP_BUFFER_LENGTH, (codec_data.resampler->estimated_output_size(samples) * this->output_source->channel_count() * 4)); - return nullptr; - } - - auto resampled_samples{estimated_output_samples}; - if(!codec_data.resampler->process(target_buffer, target_buffer, samples, resampled_samples)) { - log_warn(category::voice_connection, tr("Failed to resample audio data.")); - return nullptr; - } - - if(!resampled_samples) { - /* we don't seem to have any output samples */ - return nullptr; - } - - audio::apply_gain(target_buffer, this->output_source->channel_count(), resampled_samples, this->volume_); - auto audio_buffer = audio::SampleBuffer::allocate((uint8_t) this->output_source->channel_count(), (uint16_t) resampled_samples); - - audio_buffer->sample_index = 0; - memcpy(audio_buffer->sample_data, target_buffer, this->output_source->channel_count() * resampled_samples * 4); - return audio_buffer; -} - -void VoiceClient::event_execute_dropped(const std::chrono::system_clock::time_point &point) { } - /* * This method will be called within the audio event loop. */ @@ -855,15 +805,13 @@ bool VoiceClient::handle_output_underflow(size_t sample_count) { * Seems like we don't have any data for a bit longer already. * Lets check if we timeout this stream. */ - if(this->_last_received_packet + std::chrono::seconds{1} < std::chrono::system_clock::now()) { + if(this->packet_queue.last_packet_timestamp + std::chrono::seconds{1} < std::chrono::system_clock::now()) { this->set_state(state::stopped); log_warn(category::audio, tr("Clients {} audio stream timed out. We haven't received any audio packed within the last second. Stopping replay."), this->client_id_, sample_count); - break; } else { /* * Lets wait until we have the next audio packet. */ - break; } break; diff --git a/native/serverconnection/src/connection/audio/VoiceClient.h b/native/serverconnection/src/connection/audio/VoiceClient.h index dd19aac..93c67ef 100644 --- a/native/serverconnection/src/connection/audio/VoiceClient.h +++ b/native/serverconnection/src/connection/audio/VoiceClient.h @@ -4,6 +4,7 @@ #include #include #include +#include #include #include "../../audio/AudioResampler.h" #include "../../audio/codec/Converter.h" @@ -16,47 +17,8 @@ namespace tc::connection { class VoiceConnection; class VoiceClient; - namespace codec { - enum value { - MIN = 0, - - SPEEX_NARROWBAND = 0, - SPEEX_WIDEBAND = 1, - SPEEX_ULTRA_WIDEBAND = 2, - - CELT_MONO = 3, - - OPUS_VOICE = 4, - OPUS_MUSIC = 5, - - MAX = 5, - }; - - struct CodecInfo { - bool supported; - std::string name; - std::function(std::string&)> new_converter; - }; - - extern const CodecInfo info[6]; - inline const CodecInfo* get_info(value codec) { - if(codec > value::MAX || codec < value::MIN) { - return nullptr; - } - return &info[codec]; - } - } - - class VoiceClient : private event::EventEntry { + class VoiceClient : public event::EventEntry { friend class VoiceConnection; - -#ifdef WIN32 - template - friend _NODISCARD std::shared_ptr<_Tp> std::static_pointer_cast(std::shared_ptr<_Up>&& _Other) noexcept; -#else - template - friend inline std::shared_ptr<_Tp> std::static_pointer_cast(const std::shared_ptr<_Up>& __r) noexcept; -#endif public: struct state { enum value { @@ -90,7 +52,7 @@ namespace tc::connection { inline std::shared_ptr ref() { return this->ref_.lock(); } - void process_packet(uint16_t packet_id, const pipes::buffer_view& /* buffer */, codec::value /* codec */, bool /* head */); + void process_packet(uint16_t packet_id, const pipes::buffer_view& /* buffer */, uint8_t /* payload codec id */, bool /* head */); void execute_tick(); inline float get_volume() const { return this->volume_; } @@ -106,38 +68,20 @@ namespace tc::connection { private: struct EncodedBuffer { bool head{false}; - bool reset_decoder{false}; std::chrono::system_clock::time_point receive_timestamp; pipes::buffer buffer; - codec::value codec{codec::MIN}; + uint8_t codec{0xFF}; uint16_t packet_id{0}; EncodedBuffer* next{nullptr}; }; - struct AudioCodec { - enum struct State { - UNINITIALIZED, - INITIALIZED_SUCCESSFULLY, - INITIALIZED_FAIL, - UNSUPPORTED, - }; - - codec::value codec{}; - + struct { uint16_t last_packet_id{0xFFFF}; /* the first packet id is 0 so one packet before is 0xFFFF */ std::chrono::system_clock::time_point last_packet_timestamp{}; - [[nodiscard]] inline std::chrono::system_clock::time_point stream_timeout() const { - return this->last_packet_timestamp + std::chrono::milliseconds{1000}; - } - - State state{State::UNINITIALIZED}; - std::shared_ptr converter{nullptr}; - std::shared_ptr resampler{nullptr}; - std::mutex pending_lock{}; EncodedBuffer* pending_buffers{nullptr}; @@ -145,10 +89,19 @@ namespace tc::connection { EncodedBuffer* force_replay{nullptr}; bool process_pending{false}; - }; - std::array codec{}; - void initialize_code(const codec::value& /* codec */); + [[nodiscard]] inline std::chrono::system_clock::time_point stream_timeout() const { + return this->last_packet_timestamp + std::chrono::milliseconds{1000}; + } + } packet_queue; + + struct { + /* the decoder has been initialized with fec data */ + bool decoder_initialized{false}; + audio::codec::AudioCodec current_codec{audio::codec::AudioCodec::Unknown}; + std::unique_ptr decoder{}; + std::unique_ptr resampler{}; + } decoder; /* might be null (if audio hasn't been initialized) */ std::shared_ptr output_source{}; @@ -159,7 +112,6 @@ namespace tc::connection { uint16_t client_id_{0}; float volume_{1.f}; - std::chrono::system_clock::time_point _last_received_packet{}; state::value state_{state::stopped}; inline void set_state(state::value value) { if(value == this->state_) { @@ -177,12 +129,19 @@ namespace tc::connection { void drop_enqueued_buffers(); void event_execute(const std::chrono::system_clock::time_point &point) override; - void event_execute_dropped(const std::chrono::system_clock::time_point &point) override; - bool handle_output_underflow(size_t sample_count); - /* its recommend to call this in correct packet oder */ - std::shared_ptr decode_buffer(const codec::value& /* codec */,const pipes::buffer_view& /* buffer */, bool /* fec */); + /** + * Reset the decoder. + */ + void reset_decoder(bool /* deallocate */); + + /** + * Decode and playback an audio packet. + * If fec decode is active we try to decode the fec data within the packet and playback them instead of the packet data. + * Note: If fec is set and the decoder hasn't been initialized we'll drop the buffer. + */ + void playback_audio_packet(uint8_t /* codec protocol id */, const void* /* buffer */, size_t /* buffer length */, bool /* use fec data */); }; diff --git a/native/serverconnection/src/connection/audio/VoiceConnection.cpp b/native/serverconnection/src/connection/audio/VoiceConnection.cpp index 18106e1..44e727e 100644 --- a/native/serverconnection/src/connection/audio/VoiceConnection.cpp +++ b/native/serverconnection/src/connection/audio/VoiceConnection.cpp @@ -211,7 +211,8 @@ NAN_METHOD(VoiceConnectionWrap::get_encoder_codec) { return; } - info.GetReturnValue().Set(handle->get_encoder_codec()); + auto codec = handle->get_encoder_codec(); + info.GetReturnValue().Set(audio::codec::audio_codec_to_protocol_id(codec).value_or(-1)); } NAN_METHOD(VoiceConnectionWrap::set_encoder_codec) { @@ -228,7 +229,12 @@ NAN_METHOD(VoiceConnectionWrap::set_encoder_codec) { return; } - handle->set_encoder_codec((uint8_t) info[0]->NumberValue(Nan::GetCurrentContext()).FromMaybe(0)); + auto codec = audio::codec::audio_codec_from_protocol_id(info[0]->NumberValue(Nan::GetCurrentContext()).FromMaybe(-1)); + if(!codec.has_value()) { + Nan::ThrowError("unknown codec id"); + return; + } + handle->set_encoder_codec(*codec); } NAN_METHOD(VoiceConnectionWrap::enable_voice_send) { @@ -275,7 +281,7 @@ void VoiceConnectionWrap::release_recorder() { VoiceConnection::VoiceConnection(ServerConnection *handle) : _handle(handle) { this->_voice_sender = make_shared(this); this->_voice_sender->_ref = this->_voice_sender; - this->_voice_sender->set_codec(codec::OPUS_MUSIC); + this->_voice_sender->set_codec(audio::codec::AudioCodec::OpusMusic); } VoiceConnection::~VoiceConnection() { if(v8::Isolate::GetCurrent()) @@ -369,24 +375,23 @@ void VoiceConnection::process_packet(const ts::protocol::PacketParser &packet) { } if(payload.length() > 5) { - client->process_packet(packet_id, payload.view(5), (codec::value) codec_id, flag_head); + client->process_packet(packet_id, payload.view(5), codec_id, flag_head); } else { - client->process_packet(packet_id, pipes::buffer_view{nullptr, 0}, (codec::value) codec_id, flag_head); + client->process_packet(packet_id, pipes::buffer_view{nullptr, 0}, codec_id, flag_head); } } else { //TODO implement whisper } } -void VoiceConnection::set_encoder_codec(const uint8_t &codec) { - if(codec > codec::MAX) return; - +void VoiceConnection::set_encoder_codec(const audio::codec::AudioCodec &codec) { auto vs = this->_voice_sender; - if(vs) - vs->set_codec((codec::value) codec); + if(vs) { + vs->set_codec(codec); + } } -uint8_t VoiceConnection::get_encoder_codec() { +audio::codec::AudioCodec VoiceConnection::get_encoder_codec() { auto vs = this->_voice_sender; - return vs ? vs->get_codec() : 0; + return vs ? vs->target_codec() : audio::codec::AudioCodec::Unknown; } \ No newline at end of file diff --git a/native/serverconnection/src/connection/audio/VoiceConnection.h b/native/serverconnection/src/connection/audio/VoiceConnection.h index 5ee3ab4..f25db7a 100644 --- a/native/serverconnection/src/connection/audio/VoiceConnection.h +++ b/native/serverconnection/src/connection/audio/VoiceConnection.h @@ -6,6 +6,7 @@ #include #include #include +#include "../../audio/codec/Converter.h" namespace tc { namespace audio::recorder { @@ -88,8 +89,8 @@ namespace tc { void process_packet(const ts::protocol::PacketParser&); - void set_encoder_codec(const uint8_t& /* codec */); - uint8_t get_encoder_codec(); + void set_encoder_codec(const audio::codec::AudioCodec& /* target */); + audio::codec::AudioCodec get_encoder_codec(); private: ServerConnection* _handle; std::weak_ptr _ref;