#include "VoiceClient.h"
#include "../../audio/AudioOutput.h"
#include "../../audio/codec/Converter.h"
#include "../../audio/codec/OpusConverter.h"
#include "../../audio/AudioMerger.h"
#include "../../audio/js/AudioOutputStream.h"
#include "AudioEventLoop.h"
#include "../../logger.h"

using namespace std;
using namespace tc;
using namespace tc::audio::codec;
using namespace tc::connection;

/*
 * NOTE(review): several explicit template argument lists (e.g. on shared_ptr, make_shared,
 * dynamic_pointer_cast, static_pointer_cast, v8::Local, Unwrap, As, duration_cast) appear to be
 * missing in this revision of the file. They are left exactly as found because the intended
 * types cannot all be derived from this file alone - confirm against the header/build.
 */

extern tc::audio::AudioOutput* global_audio_output;

#define DEBUG_PREMATURE_PACKETS

/* MSVC path uses positional aggregate initialisation, every other compiler designated initialisers */
#ifdef WIN32
    #define _field_(name, value) value
#else
    #define _field_(name, value) .name = value
#endif

/*
 * Static description of all known codecs.
 * Only the two opus entries are marked as supported and provide a converter factory.
 * The factories return nullptr (and fill `error`) when the converter fails to initialize.
 * (presumably the converter arguments are channel count, sample rate and frame size - confirm)
 */
const codec::condec_info codec::info[6] = {
        {
                _field_(supported, false),
                _field_(name, "speex_narrowband"),
                _field_(new_converter, nullptr)
        },
        {
                _field_(supported, false),
                _field_(name, "speex_wideband"),
                _field_(new_converter, nullptr)
        },
        {
                _field_(supported, false),
                _field_(name, "speex_ultra_wideband"),
                _field_(new_converter, nullptr)
        },
        {
                _field_(supported, false),
                _field_(name, "celt_mono"),
                _field_(new_converter, nullptr)
        },
        {
                _field_(supported, true),
                _field_(name, "opus_voice"),
                _field_(new_converter, [](string& error) -> shared_ptr {
                    auto result = make_shared(1, 48000, 960);
                    if(!result->initialize(error, OPUS_APPLICATION_VOIP))
                        return nullptr;
                    return dynamic_pointer_cast(result);
                })
        },
        {
                _field_(supported, true),
                _field_(name, "opus_music"),
                _field_(new_converter, [](string& error) -> shared_ptr {
                    auto result = make_shared(2, 48000, 960);
                    if(!result->initialize(error, OPUS_APPLICATION_AUDIO))
                        return nullptr;
                    return dynamic_pointer_cast(result);
                })
        }
};

/**
 * Binds this wrapper to the given JS object, exposes the `client_id` property and
 * hooks the native state change notification up to the JS callbacks.
 * Throws a JS error ("weak handle") if the wrapped client has already been destroyed.
 * @param object The freshly created JS instance which should represent this wrapper
 */
void VoiceClientWrap::do_wrap(const v8::Local &object) {
    this->Wrap(object);

    auto handle = this->_handle.lock();
    if(!handle) {
        Nan::ThrowError("weak handle");
        return;
    }

    Nan::Set(object, Nan::New("client_id").ToLocalChecked(), Nan::New(handle->client_id()));

    /*
     * Create the async callback before installing the hook which invokes it, so a state change
     * which fires right after the hook has been installed never hits an unset callback.
     * Both lambdas are stored beyond this call, hence they capture `this` explicitly and nothing else.
     * NOTE(review): the hook stored within the client still references this wrapper after the wrapper
     * has been destroyed - confirm that the client never outlives its wrapper with the hook set.
     */
    this->call_state_changed = Nan::async_callback([this]{
        Nan::HandleScope scope;
        this->_call_state_changed();
    });
    handle->on_state_changed = [this]{ this->call_state_changed(); };
}

/**
 * Forwards the current client state to the JS side.
 * Invokes (if they're functions) in this order:
 *  - `callback_playback`       when the state switched to playing while we've not been playing
 *  - `callback_stopped`        when the state switched to stopped while we've been playing
 *  - `callback_state_changed`  always, with the new state as single argument
 */
void VoiceClientWrap::_call_state_changed() {
    auto handle = this->_handle.lock();
    if(!handle) {
        log_warn(category::voice_connection, tr("State changed on invalid handle!"));
        return;
    }

    auto state = handle->state();

    /* decide based on the old playing flag before we're updating it */
    const auto call_playback_callback = state == VoiceClient::state::playing && !this->_currently_playing;
    const auto call_stopped_callback = state == VoiceClient::state::stopped && this->_currently_playing;

    if(state == VoiceClient::state::stopped)
        this->_currently_playing = false;
    else if(state == VoiceClient::state::playing)
        this->_currently_playing = true;

    if(call_playback_callback) {
        auto callback = Nan::Get(this->handle(), Nan::New("callback_playback").ToLocalChecked()).ToLocalChecked();
        if(callback->IsFunction())
            callback.As()->Call(Nan::GetCurrentContext(), Nan::Undefined(), 0, nullptr);
    }

    if(call_stopped_callback) {
        auto callback = Nan::Get(this->handle(), Nan::New("callback_stopped").ToLocalChecked()).ToLocalChecked();
        if(callback->IsFunction())
            callback.As()->Call(Nan::GetCurrentContext(), Nan::Undefined(), 0, nullptr);
    }

    auto callback = Nan::Get(this->handle(), Nan::New("callback_state_changed").ToLocalChecked()).ToLocalChecked();
    if(callback->IsFunction()) {
        v8::Local argv[1] = {
                Nan::New(state)
        };
        callback.As()->Call(Nan::GetCurrentContext(), Nan::Undefined(), 1, argv);
    }
}

/**
 * Registers the JS class "VoiceConnection" with its prototype methods and
 * stores the resulting constructor function for later instance creation.
 */
NAN_MODULE_INIT(VoiceClientWrap::Init) {
    auto klass = Nan::New(VoiceClientWrap::NewInstance);
    klass->SetClassName(Nan::New("VoiceConnection").ToLocalChecked());
    klass->InstanceTemplate()->SetInternalFieldCount(1);

    Nan::SetPrototypeMethod(klass, "get_state", VoiceClientWrap::_get_state);
    Nan::SetPrototypeMethod(klass, "get_volume", VoiceClientWrap::_get_volume);
    Nan::SetPrototypeMethod(klass, "set_volume", VoiceClientWrap::_set_volume);
    Nan::SetPrototypeMethod(klass, "abort_replay", VoiceClientWrap::_abort_replay);
    Nan::SetPrototypeMethod(klass, "get_stream", VoiceClientWrap::_get_stream);

    constructor().Reset(Nan::GetFunction(klass).ToLocalChecked());
}

/**
 * JS constructor. Does nothing besides rejecting calls which are not construct calls;
 * the native side gets attached afterwards via do_wrap().
 */
NAN_METHOD(VoiceClientWrap::NewInstance) {
    if(!info.IsConstructCall())
        Nan::ThrowError("invalid invoke!");
}

/** JS: get_volume() - returns the volume of the wrapped client. Throws "weak handle" if the client is gone. */
NAN_METHOD(VoiceClientWrap::_get_volume) {
    auto client = ObjectWrap::Unwrap(info.Holder());

    auto handle = client->_handle.lock();
    if(!handle) {
        Nan::ThrowError("weak handle");
        return;
    }

    info.GetReturnValue().Set(handle->get_volume());
}

/**
 * JS: set_volume(volume: number) - sets the volume of the wrapped client.
 * Throws "weak handle" if the client is gone and "Invalid arguments" unless exactly one number is given.
 */
NAN_METHOD(VoiceClientWrap::_set_volume) {
    auto client = ObjectWrap::Unwrap(info.Holder());

    auto handle = client->_handle.lock();
    if(!handle) {
        Nan::ThrowError("weak handle");
        return;
    }

    if(info.Length() != 1 || !info[0]->IsNumber()) {
        Nan::ThrowError("Invalid arguments");
        return;
    }

    handle->set_volume(info[0]->NumberValue(Nan::GetCurrentContext()).FromMaybe(0));
}

/** JS: abort_replay() - cancels the replay of the wrapped client. Throws "weak handle" if the client is gone. */
NAN_METHOD(VoiceClientWrap::_abort_replay) {
    auto client = ObjectWrap::Unwrap(info.Holder());

    auto handle = client->_handle.lock();
    if(!handle) {
        Nan::ThrowError("weak handle");
        return;
    }

    handle->cancel_replay();
}

/** JS: get_state() - returns the current state of the wrapped client. Throws "weak handle" if the client is gone. */
NAN_METHOD(VoiceClientWrap::_get_state) {
    auto client = ObjectWrap::Unwrap(info.Holder());

    auto handle = client->_handle.lock();
    if(!handle) {
        Nan::ThrowError("weak handle");
        return;
    }

    info.GetReturnValue().Set(handle->state());
}

/**
 * JS: get_stream() - returns a new JS wrapper around the client's output stream.
 * Throws "weak handle" if the client is gone.
 */
NAN_METHOD(VoiceClientWrap::_get_stream) {
    auto client = ObjectWrap::Unwrap(info.Holder());

    auto handle = client->_handle.lock();
    if(!handle) {
        Nan::ThrowError("weak handle");
        return;
    }

    /* the wrapper gets bound to the JS object within do_wrap() */
    auto wrapper = new audio::AudioOutputStreamWrapper(handle->output_stream(), false);
    auto object = Nan::NewInstance(Nan::New(audio::AudioOutputStreamWrapper::constructor()), 0, nullptr).ToLocalChecked();
    wrapper->do_wrap(object);
    info.GetReturnValue().Set(object);
}

/** Creates a wrapper which only keeps a weak reference to the given client. */
VoiceClientWrap::VoiceClientWrap(const std::shared_ptr& client) : _handle(client) { }

VoiceClientWrap::~VoiceClientWrap() {}

/**
 * Creates the audio output source for this client and installs the under-/overflow handlers.
 * max_latency is set to 0.5 and min_buffer to 0.04 times the source's sample rate.
 * @param client_id The id of the client this instance is decoding audio for
 */
VoiceClient::VoiceClient(const std::shared_ptr&, uint16_t client_id) : _client_id(client_id) {
    this->output_source = global_audio_output->create_source();
    this->output_source->overflow_strategy = audio::overflow_strategy::ignore;
    this->output_source->max_latency = (size_t) ceil(this->output_source->sample_rate * 0.5);
    this->output_source->min_buffer = (size_t) ceil(this->output_source->sample_rate * 0.04);

    /* the handlers are stored within the source, so capture `this` explicitly (they get detached within the destructor) */
    this->output_source->on_underflow = [this]{
        if(this->_state == state::stopping) {
            /* we've been asked to stop and everything has been played */
            this->set_state(state::stopped);
        } else if(this->_state != state::stopped) {
            if(this->_last_received_packet + chrono::seconds(1) < chrono::system_clock::now()) {
                this->set_state(state::stopped);
                log_warn(category::audio, tr("Client {} has a audio buffer underflow and not received any data for one second. Stopping replay."), this->_client_id);
            } else {
                if(this->_state != state::buffering) {
                    log_warn(category::audio, tr("Client {} has a audio buffer underflow. Buffer again."), this->_client_id);
                    this->set_state(state::buffering);
                }

                /* maybe we've still some pending packets to decode */
                audio::decode_event_loop->schedule(static_pointer_cast(this->ref()));
            }
        }

        return false;
    };
    this->output_source->on_overflow = [this](size_t count){
        log_warn(category::audio, tr("Client {} has a audio buffer overflow of {}."), this->_client_id, count);
    };

    this->execute_lock_timeout = std::chrono::microseconds{500};
}

/**
 * Releases the JS handle (only possible with an active isolate), drops all pending audio
 * and removes the output source from the global audio output.
 */
VoiceClient::~VoiceClient() {
    if(v8::Isolate::GetCurrent())
        this->finalize_js_object();
    else {
        assert(this->_js_handle.IsEmpty());
    }

    this->cancel_replay(); /* cleanup all buffers */

    this->output_source->on_underflow = nullptr; /* to ensure */
    global_audio_output->delete_source(this->output_source);
}

/**
 * Creates the JS representation of this client once; subsequent calls are a no-op.
 * If wrapping raises a JS exception the exception is rethrown and no handle is stored.
 */
void VoiceClient::initialize_js_object() {
    if(!this->_js_handle.IsEmpty())
        return;

    auto object_wrap = new VoiceClientWrap(this->ref());
    auto object = Nan::NewInstance(Nan::New(VoiceClientWrap::constructor()), 0, nullptr).ToLocalChecked();
    Nan::TryCatch tc;
    object_wrap->do_wrap(object);
    if(tc.HasCaught()) {
        tc.ReThrow();
        return;
    }

    this->_js_handle.Reset(Nan::GetCurrentContext()->GetIsolate(), object);
}

/** Drops the persistent reference to the JS representation of this client. */
void VoiceClient::finalize_js_object() {
    this->_js_handle.Reset();
}

#ifdef max
    #undef max
#endif

/**
 * Tests whether `upper` follows `lower` within `window` packets, respecting the 16 bit wrap around.
 * @param lower The packet ID which should be lower than the other
 * @param upper The packet id which should be higher than the lower one
 * @param window The maximal distance (in packets) `upper` may be ahead of `lower`
 * @return true if `upper` is between 1 and `window` (inclusive) packets ahead of `lower`
 */
inline constexpr bool packet_id_less(uint16_t lower, uint16_t upper, uint16_t window) {
    /*
     * Forward distance modulo 2^16.
     * The previous range based implementation returned true for every `upper`
     * when `lower` has been exactly 65535 - window (because lower + window did not wrap there).
     */
    const auto distance = static_cast<uint16_t>(upper - lower);
    return distance != 0 && distance <= window;
}

/**
 * @param lower The older packet id
 * @param upper The newer packet id
 * @return The forward distance from `lower` to `upper`, respecting the 16 bit wrap around
 */
inline constexpr uint16_t packet_id_diff(uint16_t lower, uint16_t upper) {
    if(upper < lower)
        return (uint16_t) (((uint32_t) upper | 0x10000U) - (uint32_t) lower);
    return upper - lower;
}

#define MAX_LOST_PACKETS (6)
#define target_buffer_length 16384

/**
 * Queues a received voice packet for decoding and schedules the decode event.
 * The packet is dropped if the codec is unknown, could not be initialized or
 * if the packet is not newer than the last replayed packet.
 * @param packet_id The 16 bit (wrapping) sequence id of the packet
 * @param buffer The encoded audio data; an empty buffer flushes and stops the replay
 * @param codec The codec the data has been encoded with
 * @param is_head Stored along with the packet
 */
void VoiceClient::process_packet(uint16_t packet_id, const pipes::buffer_view& buffer, codec::value codec, bool is_head) {
    /* the codec is used as array index, so size() itself is already out of bounds */
    if(codec < 0 || static_cast<size_t>(codec) >= this->codec.size()) {
        log_warn(category::voice_connection, tr("Received voice packet from client {} with unknown codec ({})"), this->_client_id, codec);
        return;
    }

    auto& codec_data = this->codec[codec];
    if(codec_data.state == AudioCodec::State::UNINITIALIZED)
        this->initialize_code(codec);

    if(codec_data.state != AudioCodec::State::INITIALIZED_SUCCESSFULLY) {
        log_warn(category::voice_connection, tr("Dropping audio packet because audio codec {} hasn't been initialized successfully (state: {})"), codec, (int) codec_data.state);
        return;
    }

    //TODO: short circuit handling if we've muted him (e.g. volume = 0)

    /* owned by the unique_ptr until the buffer has been linked into the pending list */
    std::unique_ptr<EncodedBuffer> encoded_buffer{new EncodedBuffer{}};
    encoded_buffer->packet_id = packet_id;
    encoded_buffer->codec = codec;
    encoded_buffer->receive_timestamp = chrono::system_clock::now();
    encoded_buffer->buffer = buffer.own_buffer();
    encoded_buffer->head = is_head;

    this->_last_received_packet = encoded_buffer->receive_timestamp;

    {
        lock_guard lock{codec_data.pending_lock};

        /*
         * Reject outdated packets before touching any state.
         * Previously the buffer might have been registered as force_replay (without ever being
         * part of the pending list) and has been leaked when returning here.
         */
        if(packet_id_less(encoded_buffer->packet_id, codec_data.last_packet_id, MAX_LOST_PACKETS) || encoded_buffer->packet_id == codec_data.last_packet_id) {
            log_debug(category::voice_connection,
                    tr("Received audio packet which is older than the current index (packet: {}, current: {})"), encoded_buffer->packet_id, codec_data.last_packet_id);
            return;
        }

        if(codec_data.stream_timeout() < encoded_buffer->receive_timestamp) {
            //Old stream hasn't been terminated successfully.
            //TODO: Cleanup packets which are too old?
            codec_data.force_replay = encoded_buffer.get();
        } else if(encoded_buffer->buffer.empty()) {
            //Flush replay and stop
            codec_data.force_replay = encoded_buffer.get();
        }

        /* insert the new buffer (the list is kept sorted by packet id) */
        auto new_entry = encoded_buffer.release();
        {
            EncodedBuffer* prv_head{nullptr};
            auto head{codec_data.pending_buffers};
            while(head && packet_id_less(head->packet_id, new_entry->packet_id, MAX_LOST_PACKETS)) {
                prv_head = head;
                head = head->next;
            }

            new_entry->next = head;
            if(prv_head)
                prv_head->next = new_entry;
            else
                codec_data.pending_buffers = new_entry;
        }

        codec_data.last_packet_timestamp = new_entry->receive_timestamp;
        codec_data.process_pending = true;
    }

    audio::decode_event_loop->schedule(static_pointer_cast(this->ref()));
}

/**
 * Stops the replay: clears the output source, switches to the stopped state,
 * cancels the scheduled decode event and frees all not yet decoded packets.
 */
void VoiceClient::cancel_replay() {
    log_trace(category::voice_connection, tr("Cancel replay for client {}"), this->_client_id);

    this->output_source->clear();
    this->set_state(state::stopped);
    audio::decode_event_loop->cancel(static_pointer_cast(this->ref()));

    auto execute_lock = this->execute_lock(true);
    for(auto& codec : this->codec) {
        /* process_packet() modifies the pending list under this lock as well */
        lock_guard lock{codec.pending_lock};

        auto head = codec.pending_buffers;
        while(head) {
            auto tmp = head->next;
            delete head;
            head = tmp;
        }

        codec.pending_buffers = nullptr;
        codec.force_replay = nullptr;
    }
}

/**
 * Decode event: for every codec with pending packets, repeatedly cuts a replayable chunk
 * off the pending list (under the pending lock) and decodes + enqueues it (without the lock).
 * A chunk is selected by, in this order:
 *  1. a forced replay (everything up to and including the force_replay packet)
 *  2. the packets directly continuing the last replayed packet id
 *  3. everything up to the first pair of consecutive packets
 *  4. everything up to the first packet which is at least 5 ids ahead of the last replayed one
 */
void VoiceClient::event_execute(const std::chrono::system_clock::time_point &scheduled) {
    static auto max_time = chrono::milliseconds(10);
    /* never set to true within this function, so the rescheduling at the end is currently inactive */
    auto reschedule{false};

    string error;

    /* currently unused, see the "Check for timeout?" TODO below */
    auto timeout = chrono::system_clock::now() + max_time;

    for(auto& audio_codec : this->codec) {
        if(!audio_codec.process_pending)
            continue;

        unique_lock lock{audio_codec.pending_lock};
        do {
            EncodedBuffer* replay_head{nullptr};
            uint16_t local_last_pid{audio_codec.last_packet_id};

            /* nothing to play */
            if(!audio_codec.pending_buffers) {
                audio_codec.process_pending = false;
                break;
            }

            if(audio_codec.force_replay) {
                /* replay everything up to (and including) the forced packet */
                replay_head = audio_codec.pending_buffers;
                audio_codec.pending_buffers = audio_codec.force_replay->next;

                audio_codec.force_replay->next = nullptr;
                audio_codec.force_replay = nullptr;
            } else {
                EncodedBuffer* prv_head{nullptr};
                EncodedBuffer* head{nullptr};

                //Trying to replay the sequence
                head = audio_codec.pending_buffers;
                /* compare as uint16_t, else the sequence would never continue from 65535 to 0 */
                while(head && head->packet_id == static_cast<uint16_t>(audio_codec.last_packet_id + 1)) {
                    if(!replay_head)
                        replay_head = audio_codec.pending_buffers;

                    audio_codec.last_packet_id++;
                    prv_head = head;
                    head = head->next;
                }
                audio_codec.pending_buffers = head;

                if(prv_head) {
                    prv_head->next = nullptr; /* mark the tail */
                } else {
                    assert(!replay_head); /* could not be set, else prv_head would be set */

                    //No packet found here, test if we've more than n packets in a row somewhere
#define SKIP_SEQ_LENGTH (1)
                    EncodedBuffer* skip_ptr[SKIP_SEQ_LENGTH + 1];
                    memset(skip_ptr, 0, sizeof(skip_ptr));
                    skip_ptr[0] = audio_codec.pending_buffers;

                    while(skip_ptr[0]->next) {
                        for(size_t i = 0; i < SKIP_SEQ_LENGTH; i++) {
                            /* uint16_t compare so a sequence is detected across the id wrap around as well */
                            if(!skip_ptr[i]->next || static_cast<uint16_t>(skip_ptr[i]->packet_id + 1) != skip_ptr[i]->next->packet_id)
                                break;

                            skip_ptr[i + 1] = skip_ptr[i]->next;
                        }

                        if(skip_ptr[SKIP_SEQ_LENGTH])
                            break;

                        skip_ptr[0] = skip_ptr[0]->next;
                    }

                    if(skip_ptr[SKIP_SEQ_LENGTH]) {
                        /* we've found SKIP_SEQ_LENGTH + 1 packets in a row */
                        replay_head = audio_codec.pending_buffers;
                        audio_codec.pending_buffers = skip_ptr[SKIP_SEQ_LENGTH];
                        skip_ptr[SKIP_SEQ_LENGTH - 1]->next = nullptr;
                        log_trace(category::voice_connection, tr("Skipping from {} to {} because of {} packets in a row"), audio_codec.last_packet_id, head->packet_id, SKIP_SEQ_LENGTH);

                        /*
                         * Do not set process_pending to false, because we're not done
                         * We're just replaying all loose packets which are not within a sequence until we reach a sequence
                         * In the next loop the sequence will be played
                         */
                    } else {
                        /* search the first packet which is far enough ahead of the last replayed one */
                        head = audio_codec.pending_buffers;
                        while(head) {
                            if(packet_id_diff(audio_codec.last_packet_id, head->packet_id) >= 5)
                                break;

                            head = head->next;
                        }

                        if(head) {
                            replay_head = audio_codec.pending_buffers;
                            audio_codec.pending_buffers = head->next;
                            head->next = nullptr;
                            log_trace(category::voice_connection, tr("Skipping from {} to {} because of over 6 packets between"), audio_codec.last_packet_id, head->packet_id);
                            /* do not negate process_pending here. Same reason as with the sequence above */
                        } else {
                            /* no packets we're willing to replay */
                            audio_codec.process_pending = false;
                        }
                    }
                }
            }

            if(!replay_head) {
                audio_codec.process_pending = false;
                break;
            }

            {
                /* the last packet of the chunk becomes the new "last replayed" packet */
                auto head = replay_head;
                while(head->next)
                    head = head->next;

                audio_codec.last_packet_id = head->packet_id;

                const auto ordered = !audio_codec.pending_buffers || packet_id_less(audio_codec.last_packet_id, audio_codec.pending_buffers->packet_id, 10);
                if(!ordered) {
                    /* !ordered implies that pending_buffers is set, so it's safe to access it here */
                    log_critical(category::voice_connection, tr("Unordered packet ids. [!audio_codec.pending_buffers: {}; a: {}; b: {}]"),
                            !audio_codec.pending_buffers,
                            audio_codec.last_packet_id, audio_codec.pending_buffers->packet_id
                    );
                    //assert(!audio_codec.pending_buffers || packet_id_less(audio_codec.last_packet_id, audio_codec.pending_buffers->packet_id, 10));
                }
            }

            /* the chunk has been detached from the pending list, so decoding does not need the lock */
            lock.unlock();
            while(replay_head) {
                if(replay_head->buffer.empty()) {
                    this->set_state(state::stopping);
                    log_debug(category::voice_connection, tr("Client {} send a stop signal. Flushing stream and stopping"), this->_client_id);
                } else {
                    auto lost_packets = packet_id_diff(local_last_pid, replay_head->packet_id) - 1;
                    if(lost_packets > 6) {
                        log_debug(category::voice_connection, tr("Client {} seems to be missing {} packets in stream ({} to {}). Resetting decoder."), this->_client_id, lost_packets, local_last_pid, replay_head->packet_id);
                        replay_head->reset_decoder = true;
                    } else if(lost_packets > 0) {
                        log_debug(category::voice_connection, tr("Client {} seems to be missing {} packets in stream. Skipping ahead."), this->_client_id, lost_packets);
                        if(audio_codec.converter->decode_lost(error, lost_packets))
                            log_warn(category::audio, tr("Failed to decode lost packets for client {}: {}"), this->_client_id, error);
                    }

                    if(replay_head->reset_decoder)
                        audio_codec.converter->reset_decoder();

                    /* decode_buffer() returns nullptr on failure (it already logged the reason) */
                    auto decoded = this->decode_buffer(audio_codec.codec, replay_head->buffer);
                    if(decoded)
                        this->output_source->enqueue_samples(decoded);
                    this->set_state(state::playing);
                }

                local_last_pid = replay_head->packet_id;

                /* the packet has been consumed and nothing references it anymore */
                auto consumed = replay_head;
                replay_head = replay_head->next;
                delete consumed;
            }
            lock.lock(); //Check for more packets

            //TODO: Check for timeout?
        } while(audio_codec.process_pending);
    }

    if(reschedule) {
        log_warn(category::voice_connection, tr("Audio data decode will take longer than {} us. Enqueueing for later"), chrono::duration_cast(max_time).count());
        audio::decode_event_loop->schedule(static_pointer_cast(this->ref()));
    }
}

/**
 * Creates the converter and the resampler for the given codec.
 * On return the codec state is one of UNSUPPORTED, INITIALIZED_FAIL or INITIALIZED_SUCCESSFULLY.
 * Does nothing (besides logging) if the codec isn't in UNINITIALIZED state anymore.
 * @param audio_codec The codec which should be initialized
 */
void VoiceClient::initialize_code(const codec::value &audio_codec) {
    string error;

    auto& codec_data = this->codec[audio_codec];
    if(codec_data.state != AudioCodec::State::UNINITIALIZED) {
        log_warn(category::voice_connection, tr("Could not initialize codec of type {} because it isn't in uninitialized state anymore!"), (int) codec_data.state);
        return;
    }

    codec_data.codec = audio_codec;

    auto info = codec::get_info(audio_codec);
    if(!info || !info->supported) {
        log_warn(category::voice_connection, tr("Failed to initialized codec {} for client {}. Codec is not supported"), audio_codec, this->_client_id);
        codec_data.state = AudioCodec::State::UNSUPPORTED;
        return;
    }

    /* assume failure until every step succeeded, so the early returns below leave the right state */
    codec_data.state = AudioCodec::State::INITIALIZED_FAIL;

    codec_data.converter = info->new_converter(error);
    if(!codec_data.converter) {
        log_warn(category::voice_connection, tr("Failed to initialized codec {} for client {}. Failed to initialize decoder: {}"), audio_codec, this->_client_id, error);
        return;
    }

    codec_data.resampler = make_shared(codec_data.converter->sample_rate(), this->output_source->sample_rate, codec_data.converter->channels());
    if(!codec_data.resampler->valid()) {
        log_warn(category::voice_connection, tr("Failed to initialized codec {} for client {}. Failed to initialize resampler"), audio_codec, this->_client_id);
        return;
    }

    codec_data.state = AudioCodec::State::INITIALIZED_SUCCESSFULLY;
    log_trace(category::voice_connection, tr("Successfully initialized codec {} for client {}."), audio_codec, this->_client_id);
}

/**
 * Decodes one encoded packet, resamples it to the output sample rate, merges it to the
 * output channel count and applies the client volume.
 * All steps work in place on a stack buffer of target_buffer_length bytes.
 * @param audio_codec The codec the buffer has been encoded with
 * @param buffer The encoded audio data
 * @return The decoded samples or nullptr if any step failed (the reason has been logged)
 */
std::shared_ptr VoiceClient::decode_buffer(const codec::value &audio_codec, const pipes::buffer_view &buffer) {
    auto& codec_data = this->codec[audio_codec];
    if(codec_data.state != AudioCodec::State::INITIALIZED_SUCCESSFULLY) {
        log_trace(category::audio, tr("Cant decode auto buffer of codec {} because codec isn't successfully initialized (state: {})"), audio_codec, (int) codec_data.state);
        return nullptr;
    }

    string error;

    /* accessed as float samples below, so ensure a matching alignment */
    alignas(float) char target_buffer[target_buffer_length];
    if(target_buffer_length < codec_data.converter->expected_decoded_length(buffer.data_ptr(), buffer.length())) {
        log_warn(category::voice_connection, tr("Failed to decode audio data. Target buffer is smaller then expected bytes ({} < {})"), target_buffer_length, codec_data.converter->expected_decoded_length(buffer.data_ptr(), buffer.length()));
        return nullptr;
    }

    auto samples = codec_data.converter->decode(error, buffer.data_ptr(), buffer.length(), target_buffer);
    if(samples < 0) {
        log_warn(category::voice_connection, tr("Failed to decode audio data: {}"), error);
        return nullptr;
    }

    /* the factor 4 is the sample size in bytes (samples are handled as float below) */
    if(target_buffer_length < codec_data.resampler->estimated_output_size(samples) * codec_data.resampler->channels() * 4) {
        log_warn(category::voice_connection, tr("Failed to resample audio data. Target buffer is smaller then expected bytes ({} < {})"), target_buffer_length, (codec_data.resampler->estimated_output_size(samples) * codec_data.resampler->channels() * 4));
        return nullptr;
    }

    auto resampled_samples = codec_data.resampler->process(target_buffer, target_buffer, samples);
    if(resampled_samples <= 0) {
        log_warn(category::voice_connection, tr("Failed to resample audio data. Resampler resulted in {}"), resampled_samples);
        return nullptr;
    }

    /*
     * The merge below may increase the channel count (e.g. mono -> stereo) and writes into the same
     * stack buffer. The checks above only cover the codec's own channel count, so verify
     * that the merged result (which gets copied out at the end) still fits.
     */
    if(target_buffer_length < this->output_source->channel_count * resampled_samples * 4) {
        log_warn(category::voice_connection, tr("Failed to merge audio channels. Target buffer is smaller then expected bytes ({} < {})"), target_buffer_length, (this->output_source->channel_count * resampled_samples * 4));
        return nullptr;
    }

    if(!audio::merge::merge_channels_interleaved(target_buffer, this->output_source->channel_count, target_buffer, codec_data.resampler->channels(), resampled_samples)) {
        log_warn(category::voice_connection, tr("Failed to merge channels to output stream channel count!"));
        return nullptr;
    }

    if(this->_volume != 1) {
        auto buf = (float*) target_buffer;
        auto count = this->output_source->channel_count * resampled_samples;
        while(count-- > 0)
            *(buf++) *= this->_volume;
    }

    auto audio_buffer = audio::SampleBuffer::allocate((uint8_t) this->output_source->channel_count, (uint16_t) resampled_samples);

    audio_buffer->sample_index = 0;
    memcpy(audio_buffer->sample_data, target_buffer, this->output_source->channel_count * resampled_samples * 4);

    return audio_buffer;
}

/**
 * Called when a scheduled decode event has been dropped.
 * Only remembers that a drop happened; repeated drops are intentionally not logged.
 */
void VoiceClient::event_execute_dropped(const std::chrono::system_clock::time_point &point) {
    if(audio_decode_event_dropped.exchange(true))
        //Is not really a warning, it happens all the time and isn't really an issue
        ;//log_warn(category::voice_connection, tr("Dropped auto enqueue event execution two or more times in a row for client {}"), this->_client_id);
}