#include "VoiceClient.h"
#include "../../audio/AudioOutput.h"
#include "../../audio/codec/Converter.h"
#include "../../audio/codec/OpusConverter.h"
#include "../../audio/AudioMerger.h"
#include "../../audio/js/AudioOutputStream.h"
#include "AudioEventLoop.h"
#include "../../logger.h"

using namespace std;
using namespace tc;
using namespace tc::audio::codec;
using namespace tc::connection;

/*
 * NOTE(review): this file was received with all template argument lists stripped and with its
 * line structure collapsed. The template arguments below were restored from context. Names which
 * are only declared in headers not visible here (EncodedBuffer, AudioCodec, event::EventEntry,
 * audio::AudioResampler, VoiceConnection) are best-effort reconstructions - confirm them against
 * VoiceClient.h / AudioEventLoop.h before merging.
 */

extern tc::audio::AudioOutput* global_audio_output;

/* Enables the debug log lines which report how many premature packets have been replayed */
#define DEBUG_PREMATURE_PACKETS

/* Designated initializers are only emitted on non WIN32 builds; on WIN32 the initializers are positional */
#ifdef WIN32
    #define _field_(name, value) value
#else
    #define _field_(name, value) .name = value
#endif

/* Codec table. Only the two opus codecs are flagged as supported and provide a converter factory. */
const codec::condec_info codec::info[6] = {
        {
                _field_(supported, false),
                _field_(name, "speex_narrowband"),
                _field_(new_converter, nullptr)
        },
        {
                _field_(supported, false),
                _field_(name, "speex_wideband"),
                _field_(new_converter, nullptr)
        },
        {
                _field_(supported, false),
                _field_(name, "speex_ultra_wideband"),
                _field_(new_converter, nullptr)
        },
        {
                _field_(supported, false),
                _field_(name, "celt_mono"),
                _field_(new_converter, nullptr)
        },
        {
                _field_(supported, true),
                _field_(name, "opus_voice"),
                _field_(new_converter, [](string& error) -> shared_ptr<Converter> {
                    auto result = make_shared<OpusConverter>(1, 48000, 960);
                    if(!result->initialize(error, OPUS_APPLICATION_VOIP))
                        return nullptr;
                    return dynamic_pointer_cast<Converter>(result);
                })
        },
        {
                _field_(supported, true),
                _field_(name, "opus_music"),
                _field_(new_converter, [](string& error) -> shared_ptr<Converter> {
                    auto result = make_shared<OpusConverter>(2, 48000, 960);
                    if(!result->initialize(error, OPUS_APPLICATION_AUDIO))
                        return nullptr;
                    return dynamic_pointer_cast<Converter>(result);
                })
        }
};

/**
 * Binds this wrapper to the given JS object, publishes the `client_id` property on it and
 * registers the state change callback on the wrapped VoiceClient.
 * Throws a JS error ("weak handle") if the wrapped client has already been destroyed.
 */
void VoiceClientWrap::do_wrap(const v8::Local<v8::Object> &object) {
    this->Wrap(object);

    auto handle = this->_handle.lock();
    if(!handle) {
        Nan::ThrowError("weak handle");
        return;
    }

    Nan::Set(object, Nan::New<v8::String>("client_id").ToLocalChecked(), Nan::New<v8::Number>(handle->client_id()));

    /* Both callbacks are stored beyond this scope; they only need (and only capture) `this` */
    handle->on_state_changed = [this]{ this->call_state_changed(); };
    this->call_state_changed = Nan::async_callback([this]{
        Nan::HandleScope scope;
        this->_call_state_changed();
    });
}

/**
 * Forwards the current state of the wrapped client to the JS callbacks stored on the JS object:
 * `callback_playback` on the transition into playing, `callback_stopped` on the transition
 * into stopped and `callback_state_changed` (with the state as argument) on every invocation.
 */
void VoiceClientWrap::_call_state_changed() {
    auto handle = this->_handle.lock();
    if(!handle) {
        log_warn(category::voice_connection, tr("State changed on invalid handle!"));
        return;
    }

    auto state = handle->state();

    /* decide which edge callbacks have to be fired before updating the cached playing flag */
    auto call_playback_callback = state == VoiceClient::state::playing && !this->_currently_playing;
    auto call_stopped_callback = state == VoiceClient::state::stopped && this->_currently_playing;
    if(state == VoiceClient::state::stopped)
        this->_currently_playing = false;
    if(state == VoiceClient::state::playing)
        this->_currently_playing = true;

    if(call_playback_callback) {
        auto callback = Nan::Get(this->handle(), Nan::New<v8::String>("callback_playback").ToLocalChecked()).ToLocalChecked();
        if(callback->IsFunction())
            callback.As<v8::Function>()->Call(Nan::GetCurrentContext(), Nan::Undefined(), 0, nullptr);
    }

    if(call_stopped_callback) {
        auto callback = Nan::Get(this->handle(), Nan::New<v8::String>("callback_stopped").ToLocalChecked()).ToLocalChecked();
        if(callback->IsFunction())
            callback.As<v8::Function>()->Call(Nan::GetCurrentContext(), Nan::Undefined(), 0, nullptr);
    }

    auto callback = Nan::Get(this->handle(), Nan::New<v8::String>("callback_state_changed").ToLocalChecked()).ToLocalChecked();
    if(callback->IsFunction()) {
        v8::Local<v8::Value> argv[1] = {
                Nan::New<v8::Number>(state)
        };
        callback.As<v8::Function>()->Call(Nan::GetCurrentContext(), Nan::Undefined(), 1, argv);
    }
}

/** Registers the JS class (named "VoiceConnection") with its prototype methods and stores its constructor. */
NAN_MODULE_INIT(VoiceClientWrap::Init) {
    auto klass = Nan::New<v8::FunctionTemplate>(VoiceClientWrap::NewInstance);
    klass->SetClassName(Nan::New<v8::String>("VoiceConnection").ToLocalChecked());
    klass->InstanceTemplate()->SetInternalFieldCount(1);

    Nan::SetPrototypeMethod(klass, "get_state", VoiceClientWrap::_get_state);
    Nan::SetPrototypeMethod(klass, "get_volume", VoiceClientWrap::_get_volume);
    Nan::SetPrototypeMethod(klass, "set_volume", VoiceClientWrap::_set_volume);
    Nan::SetPrototypeMethod(klass, "abort_replay", VoiceClientWrap::_abort_replay);
    Nan::SetPrototypeMethod(klass, "get_stream", VoiceClientWrap::_get_stream);

    constructor().Reset(Nan::GetFunction(klass).ToLocalChecked());
}

/** JS constructor. Only rejects plain function calls; the native object gets attached via do_wrap(). */
NAN_METHOD(VoiceClientWrap::NewInstance) {
    if(!info.IsConstructCall())
        Nan::ThrowError("invalid invoke!");
}

/** JS `get_volume()`: returns the volume of the wrapped client. */
NAN_METHOD(VoiceClientWrap::_get_volume) {
    auto client = ObjectWrap::Unwrap<VoiceClientWrap>(info.Holder());

    auto handle = client->_handle.lock();
    if(!handle) {
        Nan::ThrowError("weak handle");
        return;
    }

    info.GetReturnValue().Set(handle->get_volume());
}

/** JS `set_volume(value)`: expects exactly one numeric argument. */
NAN_METHOD(VoiceClientWrap::_set_volume) {
    auto client = ObjectWrap::Unwrap<VoiceClientWrap>(info.Holder());

    auto handle = client->_handle.lock();
    if(!handle) {
        Nan::ThrowError("weak handle");
        return;
    }

    if(info.Length() != 1 || !info[0]->IsNumber()) {
        Nan::ThrowError("Invalid arguments");
        return;
    }

    handle->set_volume(info[0]->NumberValue(Nan::GetCurrentContext()).FromMaybe(0));
}

/** JS `abort_replay()`: cancels the current replay of the wrapped client. */
NAN_METHOD(VoiceClientWrap::_abort_replay) {
    auto client = ObjectWrap::Unwrap<VoiceClientWrap>(info.Holder());

    auto handle = client->_handle.lock();
    if(!handle) {
        Nan::ThrowError("weak handle");
        return;
    }

    handle->cancel_replay();
}

/** JS `get_state()`: returns the current state of the wrapped client. */
NAN_METHOD(VoiceClientWrap::_get_state) {
    auto client = ObjectWrap::Unwrap<VoiceClientWrap>(info.Holder());

    auto handle = client->_handle.lock();
    if(!handle) {
        Nan::ThrowError("weak handle");
        return;
    }

    info.GetReturnValue().Set(handle->state());
}

/** JS `get_stream()`: returns a new JS wrapper object around the output stream of the client. */
NAN_METHOD(VoiceClientWrap::_get_stream) {
    auto client = ObjectWrap::Unwrap<VoiceClientWrap>(info.Holder());

    auto handle = client->_handle.lock();
    if(!handle) {
        Nan::ThrowError("weak handle");
        return;
    }

    /* the wrapper gets bound to the JS object within do_wrap() */
    auto wrapper = new audio::AudioOutputStreamWrapper(handle->output_stream(), false);
    auto object = Nan::NewInstance(Nan::New(audio::AudioOutputStreamWrapper::constructor()), 0, nullptr).ToLocalChecked();
    wrapper->do_wrap(object);
    info.GetReturnValue().Set(object);
}

VoiceClientWrap::VoiceClientWrap(const std::shared_ptr<VoiceClient>& client) : _handle(client) { }

VoiceClientWrap::~VoiceClientWrap() {}

/**
 * Creates the audio output source for the client and installs the underflow/overflow handlers.
 * The first parameter is unused.
 * NOTE(review): its pointee type (VoiceConnection) has been reconstructed - confirm against the header.
 */
VoiceClient::VoiceClient(const std::shared_ptr<VoiceConnection>&, uint16_t client_id) : _client_id(client_id) {
    this->output_source = global_audio_output->create_source();
    this->output_source->overflow_strategy = audio::overflow_strategy::ignore;
    this->output_source->max_latency = (size_t) ceil(this->output_source->sample_rate * 1);
    this->output_source->min_buffer = (size_t) ceil(this->output_source->sample_rate * 0.04);

    this->output_source->on_underflow = [this]{
        if(this->_state == state::stopping)
            this->set_state(state::stopped);
        else if(this->_state != state::stopped) {
            if(this->_last_received_packet + chrono::seconds(1) < chrono::system_clock::now()) {
                this->set_state(state::stopped);
                log_warn(category::audio, tr("Client {} has a audio buffer underflow and not received any data for one second. Stopping replay."), this->_client_id);
            } else {
                if(this->_state != state::buffering) {
                    log_warn(category::audio, tr("Client {} has a audio buffer underflow. Buffer again and try to replay prematured packets."), this->_client_id);
                    this->set_state(state::buffering);
                }

                play_premature_packets = true; /* try to replay any premature packets because we assume that the other packets got lost */
                audio::decode_event_loop->schedule(static_pointer_cast<event::EventEntry>(this->ref()));
            }
        }

        return false;
    };

    this->output_source->on_overflow = [this](size_t count){
        log_warn(category::audio, tr("Client {} has a audio buffer overflow of {}."), this->_client_id, count);
    };
}

/** Releases the JS handle (if an isolate is available) and the audio output source. */
VoiceClient::~VoiceClient() {
    if(v8::Isolate::GetCurrent())
        this->finalize_js_object();
    else {
        assert(this->_js_handle.IsEmpty());
    }

    /* both handlers capture `this`; drop them before the source gets released */
    this->output_source->on_underflow = nullptr; /* to ensure */
    this->output_source->on_overflow = nullptr;
    global_audio_output->delete_source(this->output_source);
}

/** Lazily creates the JS object for this client. Does nothing if it has already been created. */
void VoiceClient::initialize_js_object() {
    if(!this->_js_handle.IsEmpty())
        return;

    auto object_wrap = new VoiceClientWrap(this->ref());
    auto object = Nan::NewInstance(Nan::New(VoiceClientWrap::constructor()), 0, nullptr).ToLocalChecked();

    /* do_wrap() reports failures via a JS exception; forward it and keep the handle empty */
    Nan::TryCatch tc;
    object_wrap->do_wrap(object);
    if(tc.HasCaught()) {
        tc.ReThrow();
        return;
    }

    this->_js_handle.Reset(Nan::GetCurrentContext()->GetIsolate(), object);
}

/** Drops the reference to the JS object. */
void VoiceClient::finalize_js_object() {
    this->_js_handle.Reset();
}

/* Size in bytes of the stack buffer used for decoding, resampling and channel merging */
#define target_buffer_length 16384

/**
 * Queues an encoded voice packet for decoding and schedules this client on the decode event loop.
 * Packets are dropped while the volume is zero or if the codec index isn't a valid index
 * into the per codec state table.
 */
void VoiceClient::process_packet(uint16_t packet_id, const pipes::buffer_view& buffer, codec::value codec, bool head) {
    if(this->_volume == 0)
        return;

    /* `codec` is used as index into this->codec later on, so size() itself is already out of range */
    if(codec < 0 || (size_t) codec >= this->codec.size()) {
        log_warn(category::voice_connection, tr("Received voice packet from client {} with unknown codec ({})"), this->_client_id, codec);
        return;
    }

    auto encoded_buffer = make_unique<EncodedBuffer>();
    encoded_buffer->packet_id = packet_id;
    encoded_buffer->codec = codec;
    encoded_buffer->receive_timestamp = chrono::system_clock::now();
    encoded_buffer->buffer = buffer.own_buffer();
    encoded_buffer->head = head;

    this->_last_received_packet = encoded_buffer->receive_timestamp;

    {
        lock_guard<mutex> lock(this->audio_decode_queue_lock);
        this->audio_decode_queue.push_back(move(encoded_buffer));
    }

    audio::decode_event_loop->schedule(static_pointer_cast<event::EventEntry>(this->ref()));
}

/** Clears the output source and switches the client into the stopped state. */
void VoiceClient::cancel_replay() {
    log_trace(category::voice_connection, tr("Cancel replay for client {}"), this->_client_id);

    this->output_source->clear();
    this->set_state(state::stopped);
}

/**
 * Decode event loop callback. Replays premature packets when requested by the underflow handler
 * and decodes queued packets until the queue is empty or the time budget (max_time) is exceeded.
 * In the latter case the client reschedules itself.
 */
void VoiceClient::event_execute(const std::chrono::system_clock::time_point &scheduled) {
    static auto max_time = chrono::milliseconds(10);

    bool reschedule = false;
    auto now = chrono::system_clock::now();
    while(true) {
        unique_lock<mutex> buffer_lock(this->audio_decode_queue_lock);
        if(this->play_premature_packets) {
            this->play_premature_packets = false;

            for(auto& codec_data : this->codec) {
                if(!codec_data) continue;

                if(!codec_data->premature_packets.empty()) {
                    size_t play_count = 0;
                    while(!codec_data->premature_packets.empty()) {
                        auto& packet = codec_data->premature_packets.front();

                        //Test if we're able to replay stuff again
                        if((uint16_t) (codec_data->last_packet_id + 1) < packet.packet_id && play_count > 0) //Only check for the order if we replayed one already
                            break; //Nothing new

                        this->output_source->enqueue_samples(packet.buffer);
                        codec_data->last_packet_id = packet.packet_id;
                        codec_data->premature_packets.pop_front();
                        play_count++;
                    }
#ifdef DEBUG_PREMATURE_PACKETS
                    if(play_count > 0)
                        log_debug(category::audio, tr("Replayed (buffer underflow) {} premature packets for client {}"), play_count, this->_client_id);
#endif
                    /* only the first codec which holds premature packets gets replayed */
                    break;
                }
            }
        }

        if(this->audio_decode_queue.empty())
            break;

        if(chrono::system_clock::now() - now > max_time) {
            reschedule = true;
            break;
        }

        auto entry = move(this->audio_decode_queue.front());
        this->audio_decode_queue.pop_front();
        buffer_lock.unlock(); /* decoding happens without holding the queue lock */

        //TODO: Drop too old buffers!
        this->process_encoded_buffer(entry);
    }

    if(audio_decode_event_dropped.exchange(false) && !reschedule) {
        //Is not really a warning, it happens all the time and isn't really an issue
        //log_warn(category::voice_connection, tr("Dropped auto enqueue event execution for client {}. No reschedulling planned, hopefully we processed all buffers."), this->_client_id);
    }

    if(reschedule) {
        log_warn(category::voice_connection, tr("Audio data decode will take longer than {} us. Enqueueing for later"), chrono::duration_cast<chrono::microseconds>(max_time).count());
        audio::decode_event_loop->schedule(static_pointer_cast<event::EventEntry>(this->ref()));
    }
}

/* Maximal gap between two packet ids which is still handled as packet loss instead of a decoder reset */
#define MAX_LOST_PACKETS (6)

/**
 * Decodes, resamples, channel merges and volume scales a single encoded packet.
 * The result gets either enqueued for playback or, if packets before it are missing, stored
 * as premature packet until the gap has been closed.
 * An empty buffer marks the end of the stream and moves the client into the stopping state.
 */
//Note: This function must be executed single threaded
void VoiceClient::process_encoded_buffer(const std::unique_ptr<EncodedBuffer> &buffer) {
    string error;

    auto& codec_data = this->codec[buffer->codec];
    if(!codec_data) {
        auto info = codec::get_info(buffer->codec);
        if(!info || !info->supported) {
            log_warn(category::voice_connection, tr("Received voice packet from client {}, but we dont support it ({})"), this->_client_id, buffer->codec);
            return;
        }

        auto instance = make_unique<AudioCodec>();
        instance->successfully_initialized = false;
        instance->last_packet_id = (uint16_t) (buffer->packet_id - 1); /* could be 0xFFFF */
        instance->converter = info->new_converter(error);
        if(!instance->converter) {
            /* store the failed instance so we don't retry the initialisation for every packet */
            codec_data = move(instance);
            log_warn(category::voice_connection, tr("Failed to initialize new codec {} for client {}: {}"), buffer->codec, this->_client_id, error);
            return;
        }

        instance->resampler = make_shared<audio::AudioResampler>(instance->converter->sample_rate(), this->output_source->sample_rate, instance->converter->channels());
        if(!instance->resampler->valid()) {
            codec_data = move(instance);
            log_warn(category::voice_connection, tr("Failed to initialize new codec resampler {} for client {}"), buffer->codec, this->_client_id);
            return;
        }

        instance->successfully_initialized = true;
        codec_data = move(instance);
    } else if(!codec_data->successfully_initialized) {
        return; /* already failed ignore that stuff */
    }

    uint16_t diff;
    bool premature = false;

    if(codec_data->last_packet_timestamp + chrono::seconds(1) < buffer->receive_timestamp || this->_state >= state::stopping) {
        /* new stream (long pause or replay is stopping/stopped): force a decoder reset below */
        diff = 0xFFFF;
    } else {
        if(codec_data->last_packet_id > buffer->packet_id) {
            auto local_index = (uint16_t) (codec_data->last_packet_id + MAX_LOST_PACKETS);
            if(local_index < buffer->packet_id)
                diff = 0xFF; /* we got in a new generation */
            else {
                log_warn(category::audio,
                         tr("Received voice packet for client {} with is older than the last we received (Current index: {}, Packet index: {}). Dropping packet."),
                         this->_client_id, codec_data->last_packet_id, buffer->packet_id
                );
                return;
            }
        } else {
            diff = buffer->packet_id - codec_data->last_packet_id;
        }
    }

    const auto old_packet_id = codec_data->last_packet_id;
    codec_data->last_packet_timestamp = buffer->receive_timestamp;

    if(buffer->buffer.empty()) {
        /* lets playback the last samples and we're done */
        this->set_state(state::stopping);

        /* enqueue all premature packets (list should be already ordered!) */
        {
            unique_lock<mutex> buffer_lock(this->audio_decode_queue_lock);
            for(const auto& packet : codec_data->premature_packets)
                this->output_source->enqueue_samples(packet.buffer);
            codec_data->premature_packets.clear();
        }

        log_trace(category::voice_connection, tr("Stopping replay for client {}. Empty buffer!"), this->_client_id);
        return;
    }

    if(diff == 0) {
        //Duplicated packets
        log_warn(category::audio, tr("Received voice packet with the same ID then the last one. Dropping packet."));
        return;
    } else
        diff--; /* because the diff is normally 1 (ofc) */

    if(diff <= MAX_LOST_PACKETS) {
        if(diff > 0) {
            /* lets first handle packet as "lost", even thou we're enqueueing it as premature */
            //auto status = codec_data->converter->decode_lost(error, diff);
            //if(status < 0)
            //    log_warn(category::voice_connection, tr("Failed to decode (skip) dropped packets. Return code {} => {}"), status, error);
            premature = !buffer->head && this->state() != state::stopped;

            log_debug(category::voice_connection,
                      tr("Client {} dropped one or more audio packets. Old packet id: {}, New packet id: {}, Diff: {}. Head: {}. Flagging chunk as premature: {}"),
                      this->_client_id, old_packet_id, buffer->packet_id, diff, buffer->head, premature);
        }
    } else {
        log_debug(category::voice_connection, tr("Client {} resetted decoder. Old packet id: {}, New packet id: {}, diff: {}"), this->_client_id, old_packet_id, buffer->packet_id, diff);
        codec_data->converter->reset_decoder();
        /* NOTE(review): this check can't trigger any more, the converter has just been dereferenced above.
         * Presumably the result of reset_decoder() should be tested instead - confirm against Converter.h */
        if(!codec_data->converter) {
            log_warn(category::voice_connection, tr("Failed to reset codec decoder {} for client {}: {}"), buffer->codec, this->_client_id, error);
            return;
        }
    }

    /* premature packets don't advance the id; that happens as soon as they get replayed */
    if(!premature)
        codec_data->last_packet_id = buffer->packet_id;

    char target_buffer[target_buffer_length];
    if(target_buffer_length < codec_data->converter->expected_decoded_length(buffer->buffer.data_ptr(), buffer->buffer.length())) {
        log_warn(category::voice_connection, tr("Failed to decode audio data. Target buffer is smaller then expected bytes ({} < {})"), target_buffer_length, codec_data->converter->expected_decoded_length(buffer->buffer.data_ptr(), buffer->buffer.length()));
        return;
    }

    auto samples = codec_data->converter->decode(error, buffer->buffer.data_ptr(), buffer->buffer.length(), target_buffer);
    if(samples < 0) {
        log_warn(category::voice_connection, tr("Failed to decode audio data: {}"), error);
        return;
    }

    if(target_buffer_length < codec_data->resampler->estimated_output_size(samples) * codec_data->resampler->channels() * 4) {
        log_warn(category::voice_connection, tr("Failed to resample audio data. Target buffer is smaller then expected bytes ({} < {})"), target_buffer_length, (codec_data->resampler->estimated_output_size(samples) * codec_data->resampler->channels() * 4));
        return;
    }

    /* resampling and channel merging are both done in place within target_buffer */
    auto resampled_samples = codec_data->resampler->process(target_buffer, target_buffer, samples);
    if(resampled_samples <= 0) {
        log_warn(category::voice_connection, tr("Failed to resample audio data. Resampler resulted in {}"), resampled_samples);
        return;
    }

    if(!audio::merge::merge_channels_interleaved(target_buffer, this->output_source->channel_count, target_buffer, codec_data->resampler->channels(), resampled_samples)) {
        log_warn(category::voice_connection, tr("Failed to merge channels to output stream channel count!"));
        return;
    }

    if(this->_volume != 1) {
        auto buf = (float*) target_buffer;
        auto count = this->output_source->channel_count * resampled_samples;
        while(count-- > 0)
            *(buf++) *= this->_volume;
    }

    if(premature) {
        auto audio_buffer = audio::SampleBuffer::allocate((uint8_t) this->output_source->channel_count, (uint16_t) resampled_samples);

        audio_buffer->sample_index = 0;
        memcpy(audio_buffer->sample_data, target_buffer, this->output_source->channel_count * resampled_samples * 4);

        {
            unique_lock<mutex> buffer_lock(this->audio_decode_queue_lock);

            /* keep the premature packets ordered by their packet id */
            auto it = codec_data->premature_packets.begin();
            for(; it != codec_data->premature_packets.end(); it++) {
                if(it->packet_id > buffer->packet_id) {
                    break; /* it is set to the right position */
                }
            }
            codec_data->premature_packets.insert(it, {
                    buffer->packet_id,
                    move(audio_buffer)
            });
            std::stable_sort(codec_data->premature_packets.begin(), codec_data->premature_packets.end(), [](const PrematureAudioPacket& a, const PrematureAudioPacket& b) {
                return a.packet_id < b.packet_id;
            });
        }
    } else {
        auto enqueued = this->output_source->enqueue_samples(target_buffer, resampled_samples);
        if(enqueued != resampled_samples)
            log_warn(category::voice_connection, tr("Failed to enqueue all samples for client {}. Enqueued {} of {}"), this->_client_id, enqueued, resampled_samples);
        this->set_state(state::playing);

        /* test if any premature got its original place */
        {
            unique_lock<mutex> buffer_lock(this->audio_decode_queue_lock);

            size_t play_count = 0;
            while(!codec_data->premature_packets.empty()) {
                auto& packet = codec_data->premature_packets.front();

                //Test if we're able to replay stuff again
                if((uint16_t) (codec_data->last_packet_id + 1) < packet.packet_id)
                    break; //Nothing new

                this->output_source->enqueue_samples(packet.buffer);
                codec_data->last_packet_id = packet.packet_id;
                codec_data->premature_packets.pop_front();
                play_count++;
            }

#ifdef DEBUG_PREMATURE_PACKETS
            if(play_count > 0)
                log_debug(category::audio, tr("Replayed (id match) {} premature packets for client {}"), play_count, this->_client_id);
#endif
        }
    }
}

/** Decode event loop callback for a dropped execution; only remembers that a drop happened. */
void VoiceClient::event_execute_dropped(const std::chrono::system_clock::time_point &point) {
    if(audio_decode_event_dropped.exchange(true))
        //Is not really a warning, it happens all the time and isn't really an issue
        ;//log_warn(category::voice_connection, tr("Dropped auto enqueue event execution two or more times in a row for client {}"), this->_client_id);
}