diff --git a/github b/github index 6f5e938..fc5a17d 160000 --- a/github +++ b/github @@ -1 +1 @@ -Subproject commit 6f5e93828bc37c5911fba819b4cd8ed3527c838c +Subproject commit fc5a17d7f08b0eadc2066deb0e491bc3970e9a76 diff --git a/native/serverconnection/exports/exports.d.ts b/native/serverconnection/exports/exports.d.ts index 86a49f7..c60d42c 100644 --- a/native/serverconnection/exports/exports.d.ts +++ b/native/serverconnection/exports/exports.d.ts @@ -158,6 +158,8 @@ declare module "teaclient_connection" { get_buffer_max_latency() : number; set_buffer_max_latency(value: number); + + flush_buffer(); } export interface OwnedAudioOutputStream extends AudioOutputStream { diff --git a/native/serverconnection/src/EventLoop.h b/native/serverconnection/src/EventLoop.h index ae1f98f..d249891 100644 --- a/native/serverconnection/src/EventLoop.h +++ b/native/serverconnection/src/EventLoop.h @@ -16,19 +16,28 @@ namespace tc { virtual void event_execute(const std::chrono::system_clock::time_point& /* scheduled timestamp */) = 0; virtual void event_execute_dropped(const std::chrono::system_clock::time_point& /* scheduled timestamp */) {} - std::unique_lock execute_lock(bool force) { + std::unique_lock execute_lock(bool force) { if(force) - return std::unique_lock(this->_execute_mutex); - else - return std::unique_lock(this->_execute_mutex, std::try_to_lock); + return std::unique_lock(this->_execute_mutex); + else { + auto lock = std::unique_lock(this->_execute_mutex, std::defer_lock); + if(this->execute_lock_timeout.count() > 0) + lock.try_lock_for(this->execute_lock_timeout); + else + lock.try_lock(); + return lock; + } } inline bool single_thread_executed() const { return this->_single_thread; } inline void single_thread_executed(bool value) { this->_single_thread = value; } + + protected: + std::chrono::nanoseconds execute_lock_timeout{0}; private: void* _event_ptr = nullptr; bool _single_thread = true; /* if its set to true there might are some dropped executes! */ - std::mutex _execute_mutex; + std::timed_mutex _execute_mutex; }; class EventExecutor { diff --git a/native/serverconnection/src/audio/AudioOutput.cpp b/native/serverconnection/src/audio/AudioOutput.cpp index 7d1d947..9123ac2 100644 --- a/native/serverconnection/src/audio/AudioOutput.cpp +++ b/native/serverconnection/src/audio/AudioOutput.cpp @@ -21,6 +21,13 @@ ssize_t AudioOutputSource::pop_samples(void *buffer, size_t samples) { _retest: { lock_guard lock(this->buffer_lock); + if(this->buffering) { + if(this->buffered_samples > this->min_buffer) { + this->buffering = false; + } else { + return 0; + } + } while(sample_count > 0 && !this->sample_buffers.empty()) { auto buf = this->sample_buffers[0]; auto sc = min((size_t) (buf->sample_size - buf->sample_index), (size_t) sample_count); @@ -186,19 +193,6 @@ int AudioOutput::audio_callback(const void *input, void *output, unsigned long f for(size_t index = 0; index < sources; index++) { auto& source = this->_sources[index]; - { - lock_guard lock(source->buffer_lock); - if(source->buffering) { - if(source->buffered_samples > source->min_buffer) { - source->buffering = false; - } else { - this->source_merge_buffer[index] = nullptr; - actual_sources--; - continue; - } - } - } - if(volume > 0) { this->source_merge_buffer[index] = (char*) this->source_buffer + (buffer_length * index); auto written_frames = this->_sources[index]->pop_samples(this->source_merge_buffer[index], frameCount); diff --git a/native/serverconnection/src/audio/js/AudioOutputStream.cpp b/native/serverconnection/src/audio/js/AudioOutputStream.cpp index 90c58ac..35e8d36 100644 --- a/native/serverconnection/src/audio/js/AudioOutputStream.cpp +++ b/native/serverconnection/src/audio/js/AudioOutputStream.cpp @@ -20,6 +20,8 @@ NAN_MODULE_INIT(AudioOutputStreamWrapper::Init) { Nan::SetPrototypeMethod(klass, "get_buffer_max_latency", AudioOutputStreamWrapper::_get_buffer_max_latency); Nan::SetPrototypeMethod(klass, "set_buffer_max_latency", AudioOutputStreamWrapper::_set_buffer_max_latency); + Nan::SetPrototypeMethod(klass, "flush_buffer", AudioOutputStreamWrapper::_flush_buffer); + Nan::SetPrototypeMethod(klass, "write_data", AudioOutputStreamWrapper::_write_data); Nan::SetPrototypeMethod(klass, "write_data_rated", AudioOutputStreamWrapper::_write_data_rated); Nan::SetPrototypeMethod(klass, "deleted", AudioOutputStreamWrapper::_deleted); @@ -282,4 +284,21 @@ NAN_METHOD(AudioOutputStreamWrapper::_set_buffer_max_latency) { } handle->max_latency = (size_t) ceil(handle->sample_rate * info[0]->NumberValue(Nan::GetCurrentContext()).FromMaybe(0)); +} + +NAN_METHOD(AudioOutputStreamWrapper::_flush_buffer) { + auto client = ObjectWrap::Unwrap(info.Holder()); + + auto handle = client->_handle.lock(); + if(!handle) { + Nan::ThrowError("weak handle"); + return; + } + + if(info.Length() != 1 || !info[0]->IsNumber()) { + Nan::ThrowError("Invalid arguments"); + return; + } + + handle->clear(); } \ No newline at end of file diff --git a/native/serverconnection/src/audio/js/AudioOutputStream.h b/native/serverconnection/src/audio/js/AudioOutputStream.h index a0686ad..a2a89c7 100644 --- a/native/serverconnection/src/audio/js/AudioOutputStream.h +++ b/native/serverconnection/src/audio/js/AudioOutputStream.h @@ -31,6 +31,8 @@ namespace tc { static NAN_METHOD(_get_buffer_max_latency); static NAN_METHOD(_set_buffer_max_latency); + static NAN_METHOD(_flush_buffer); + /* methods for owned streams only */ static NAN_METHOD(_write_data); static NAN_METHOD(_write_data_rated); diff --git a/native/serverconnection/src/connection/audio/VoiceClient.cpp b/native/serverconnection/src/connection/audio/VoiceClient.cpp index 7a47aec..4b403a2 100644 --- a/native/serverconnection/src/connection/audio/VoiceClient.cpp +++ b/native/serverconnection/src/connection/audio/VoiceClient.cpp @@ -211,7 +211,7 @@ VoiceClientWrap::~VoiceClientWrap() {} VoiceClient::VoiceClient(const std::shared_ptr&, uint16_t client_id) : _client_id(client_id) { this->output_source = global_audio_output->create_source(); this->output_source->overflow_strategy = audio::overflow_strategy::ignore; - this->output_source->max_latency = (size_t) ceil(this->output_source->sample_rate * 1); + this->output_source->max_latency = (size_t) ceil(this->output_source->sample_rate * 0.5); this->output_source->min_buffer = (size_t) ceil(this->output_source->sample_rate * 0.04); this->output_source->on_underflow = [&]{ @@ -237,6 +237,8 @@ VoiceClient::VoiceClient(const std::shared_ptr&, uint16_t clien this->output_source->on_overflow = [&](size_t count){ log_warn(category::audio, tr("Client {} has a audio buffer overflow of {}."), this->_client_id, count); }; + + this->execute_lock_timeout = std::chrono::microseconds{500}; } VoiceClient::~VoiceClient() { @@ -309,36 +311,6 @@ void VoiceClient::event_execute(const std::chrono::system_clock::time_point &sch auto now = chrono::system_clock::now(); while(true) { unique_lock buffer_lock(this->audio_decode_queue_lock); - if(this->play_premature_packets) { - this->play_premature_packets = false; - - for(auto& codec_data : this->codec) { - if(!codec_data) continue; - - if(!codec_data->premature_packets.empty()) { - size_t play_count = 0; - while(!codec_data->premature_packets.empty()) { - auto& packet = codec_data->premature_packets.front(); - - //Test if we're able to replay stuff again - if((uint16_t) (codec_data->last_packet_id + 1) < packet.packet_id && play_count > 0) //Only check for the order if we replayed one already - break; //Nothing new - - this->output_source->enqueue_samples(packet.buffer); - codec_data->last_packet_id = packet.packet_id; - codec_data->premature_packets.pop_front(); - play_count++; - } - -#ifdef DEBUG_PREMATURE_PACKETS - if(play_count > 0) - log_debug(category::audio, tr("Replayed (buffer underflow) {} premature packets for client {}"), play_count, this->_client_id); -#endif - break; - } - } - } - if(this->audio_decode_queue.empty()) break; @@ -354,6 +326,51 @@ void VoiceClient::event_execute(const std::chrono::system_clock::time_point &sch //TODO: Drop too old buffers! this->process_encoded_buffer(entry); } + { + + unique_lock buffer_lock(this->audio_decode_queue_lock); + if(this->play_premature_packets) { + this->play_premature_packets = false; + + for(auto& codec_data : this->codec) { + if(!codec_data) continue; + + if(!codec_data->premature_packets.empty()) { + size_t play_index = 0; + bool should_replay = false; + for(; play_index < codec_data->premature_packets.size() - 1; play_index++) { + auto& packet = codec_data->premature_packets[play_index]; + auto& next_packet = codec_data->premature_packets[play_index + 1]; + if(codec_data->last_packet_id + 5 < packet.packet_id) { + //No packets which are in a row, but we have stuff so replay it + should_replay = true; + break; + } else if(packet.packet_id + 1 == next_packet.packet_id) { + /* we've good sound! */ + should_replay = true; + break; + } + } + + if(should_replay) { + for(size_t index = 0; index <= play_index; index++) { + auto& packet = codec_data->premature_packets[index]; + + this->output_source->enqueue_samples(packet.buffer); + codec_data->last_packet_id = packet.packet_id; + codec_data->premature_packets.pop_front(); + } + codec_data->premature_packets.erase(codec_data->premature_packets.begin(), codec_data->premature_packets.begin() + play_index + 1); +#ifdef DEBUG_PREMATURE_PACKETS + if(play_index > 0) + log_debug(category::audio, tr("Replayed (buffer underflow) {} premature packets for client {}"), play_index + 1, this->_client_id); +#endif + } + break; + } + } + } + } if(audio_decode_event_dropped.exchange(false) && !reschedule) { //Is not really a warning, it happens all the time and isn't really an issue @@ -401,7 +418,10 @@ void VoiceClient::process_encoded_buffer(const std::unique_ptr &b } instance->successfully_initialized = true; codec_data = move(instance); + + log_trace(category::voice_connection, tr("Initalized autio codec {} for client {}"), buffer->codec, this->_client_id); } else if(!codec_data->successfully_initialized) { + log_trace(category::voice_connection, tr("Dropping auto packet for failed initialized codec {} for client {}"), buffer->codec, this->_client_id); return; /* already failed ignore that stuff */ } @@ -416,10 +436,12 @@ void VoiceClient::process_encoded_buffer(const std::unique_ptr &b if(local_index < buffer->packet_id) diff = 0xFF; /* we got in a new generation */ else { + /* log_warn(category::audio, tr("Received voice packet for client {} with is older than the last we received (Current index: {}, Packet index: {}). Dropping packet."), this->_client_id, buffer->packet_id, codec_data->last_packet_id ); + */ return; } } else { @@ -536,6 +558,7 @@ void VoiceClient::process_encoded_buffer(const std::unique_ptr &b if(enqueued != resampled_samples) log_warn(category::voice_connection, tr("Failed to enqueue all samples for client {}. Enqueued {} of {}"), this->_client_id, enqueued, resampled_samples); this->set_state(state::playing); + this->play_premature_packets = false; /* test if any premature got its original place */ {