From d7fa67b0aa29dc37cd327b4ee1578ac44efa96df Mon Sep 17 00:00:00 2001 From: WolverinDEV Date: Thu, 16 Apr 2020 13:12:00 +0200 Subject: [PATCH] Some minor improvements --- native/CMakeLists.txt | 4 +- native/serverconnection/exports/exports.d.ts | 5 +- .../src/audio/filter/FilterThreshold.cpp | 8 +- .../src/audio/filter/FilterThreshold.h | 9 +- .../src/audio/filter/FilterVad.cpp | 8 +- .../src/audio/filter/FilterVad.h | 8 +- .../src/audio/js/AudioFilter.cpp | 16 +-- .../src/audio/js/AudioFilter.h | 4 +- .../src/connection/ProtocolHandler.cpp | 2 + .../src/connection/ProtocolHandler.h | 1 + .../src/connection/ProtocolHandlerPOW.cpp | 10 +- .../src/connection/Socket.cpp | 114 +++++++++--------- .../serverconnection/src/connection/Socket.h | 4 +- native/serverconnection/test/js/main.ts | 14 ++- 14 files changed, 115 insertions(+), 92 deletions(-) diff --git a/native/CMakeLists.txt b/native/CMakeLists.txt index 4d77087..a239ec3 100644 --- a/native/CMakeLists.txt +++ b/native/CMakeLists.txt @@ -24,8 +24,8 @@ function(setup_nodejs) set(NODEJS_URL "https://atom.io/download/atom-shell") set(NODEJS_VERSION "v8.0.0") -# set(NODEJS_URL "https://nodejs.org/download/release/") -# set(NODEJS_VERSION "v12.13.0") + set(NODEJS_URL "https://nodejs.org/download/release/") + set(NODEJS_VERSION "v12.13.0") find_package(NodeJS REQUIRED) diff --git a/native/serverconnection/exports/exports.d.ts b/native/serverconnection/exports/exports.d.ts index c01dcbb..822ea99 100644 --- a/native/serverconnection/exports/exports.d.ts +++ b/native/serverconnection/exports/exports.d.ts @@ -187,8 +187,9 @@ declare module "tc-native/connection" { } export interface MarginedFilter { - get_margin_frames() : number; - set_margin_frames(value: number); + /* in seconds */ + get_margin_time() : number; + set_margin_time(value: number); } export interface VADConsumeFilter extends ConsumeFilter, MarginedFilter { diff --git a/native/serverconnection/src/audio/filter/FilterThreshold.cpp b/native/serverconnection/src/audio/filter/FilterThreshold.cpp index 71e55ba..d2da025 100644 --- a/native/serverconnection/src/audio/filter/FilterThreshold.cpp +++ b/native/serverconnection/src/audio/filter/FilterThreshold.cpp @@ -13,7 +13,7 @@ ThresholdFilter::~ThresholdFilter() {} bool ThresholdFilter::initialize(std::string &, float val, size_t margin) { this->_threshold = val; - this->_margin_frames = margin; + this->_margin_samples = margin; return true; } @@ -53,7 +53,7 @@ bool ThresholdFilter::process(const void *_buffer) { auto last_level = this->_current_level; float smooth; - if(this->_margin_processed_frames == 0) /* we're in release */ + if(this->_margin_processed_samples == 0) /* we're in release */ smooth = this->_release_smooth; else smooth = this->_attack_smooth; @@ -65,11 +65,11 @@ bool ThresholdFilter::process(const void *_buffer) { analyze_callback(this->_current_level); if(this->_current_level >= this->_threshold) { - this->_margin_processed_frames = 0; + this->_margin_processed_samples = 0; return true; } - return this->_margin_processed_frames++ < this->_margin_frames; + return (this->_margin_processed_samples += this->_frame_size) < this->_margin_samples; } diff --git a/native/serverconnection/src/audio/filter/FilterThreshold.h b/native/serverconnection/src/audio/filter/FilterThreshold.h index 1285021..9b0bb29 100644 --- a/native/serverconnection/src/audio/filter/FilterThreshold.h +++ b/native/serverconnection/src/audio/filter/FilterThreshold.h @@ -21,8 +21,9 @@ namespace tc { inline float threshold() { return this->_threshold; } inline void set_threshold(float value) { this->_threshold = value; } - inline size_t margin_frames() { return this->_margin_frames; } - inline void set_margin_frames(size_t value) { this->_margin_frames = value; } + /* in seconds */ + inline float margin_release_time() { return (float) this->_margin_samples / (float) this->_sample_rate; } + inline void set_margin_release_time(float value) { this->_margin_samples = (size_t) ceil((float) this->_sample_rate * value); } inline void attack_smooth(float value) { this->_attack_smooth = value; } inline float attack_smooth() { return this->_attack_smooth; } @@ -38,8 +39,8 @@ namespace tc { float _threshold; - size_t _margin_frames = 0; - size_t _margin_processed_frames = 0; + size_t _margin_samples = 0; + size_t _margin_processed_samples = 0; }; } } diff --git a/native/serverconnection/src/audio/filter/FilterVad.cpp b/native/serverconnection/src/audio/filter/FilterVad.cpp index d38c3a8..6f0f493 100644 --- a/native/serverconnection/src/audio/filter/FilterVad.cpp +++ b/native/serverconnection/src/audio/filter/FilterVad.cpp @@ -44,7 +44,7 @@ bool VadFilter::initialize(std::string &error, size_t mode, size_t margin) { } this->_mode = mode; - this->_margin_frames = margin; + this->_margin_samples = margin; if(this->_channels > 1) { this->ensure_buffer(this->_frame_size * this->_channels * 4); /* buffer to merge the channels into one channel */ } else { @@ -99,10 +99,10 @@ bool VadFilter::process(const void *buffer) { auto flag_vad = result == 1; if(!flag_vad) { - this->_margin_processed_frames++; - return this->_margin_processed_frames <= this->_margin_frames; + this->_margin_processed_samples += this->_frame_size; + return this->_margin_processed_samples <= this->_margin_samples; } else { - this->_margin_processed_frames = 0; + this->_margin_processed_samples = 0; } return flag_vad; } \ No newline at end of file diff --git a/native/serverconnection/src/audio/filter/FilterVad.h b/native/serverconnection/src/audio/filter/FilterVad.h index c09a300..401a66c 100644 --- a/native/serverconnection/src/audio/filter/FilterVad.h +++ b/native/serverconnection/src/audio/filter/FilterVad.h @@ -15,16 +15,16 @@ namespace tc { bool initialize(std::string& /* error */, size_t /* mode */, size_t /* margin frames */); bool process(const void* /* buffer */) override; - inline size_t margin_frames() { return this->_margin_frames; } - inline void set_margin_frames(size_t value) { this->_margin_frames = value; } + inline float margin_release_time() { return (float) this->_margin_samples / (float) this->_sample_rate; } + inline void set_margin_release_time(float value) { this->_margin_samples = (size_t) ceil((float) this->_sample_rate * value); } inline size_t mode() { return this->_mode; } private: Fvad* _vad_handle = nullptr; size_t _mode = 0; - size_t _margin_frames = 0; - size_t _margin_processed_frames = 0; + size_t _margin_samples = 0; + size_t _margin_processed_samples = 0; std::mutex _buffer_lock; void* _buffer = nullptr; diff --git a/native/serverconnection/src/audio/js/AudioFilter.cpp b/native/serverconnection/src/audio/js/AudioFilter.cpp index 6994ed6..74ad139 100644 --- a/native/serverconnection/src/audio/js/AudioFilter.cpp +++ b/native/serverconnection/src/audio/js/AudioFilter.cpp @@ -16,8 +16,8 @@ NAN_MODULE_INIT(AudioFilterWrapper::Init) { Nan::SetPrototypeMethod(klass, "get_name", AudioFilterWrapper::_get_name); - Nan::SetPrototypeMethod(klass, "get_margin_frames", AudioFilterWrapper::_get_margin_frames); - Nan::SetPrototypeMethod(klass, "set_margin_frames", AudioFilterWrapper::_set_margin_frames); + Nan::SetPrototypeMethod(klass, "get_margin_time", AudioFilterWrapper::_get_margin_time); + Nan::SetPrototypeMethod(klass, "set_margin_time", AudioFilterWrapper::_set_margin_time); Nan::SetPrototypeMethod(klass, "get_level", AudioFilterWrapper::_get_level); @@ -102,7 +102,7 @@ NAN_METHOD(AudioFilterWrapper::_get_level) { } -NAN_METHOD(AudioFilterWrapper::_get_margin_frames) { +NAN_METHOD(AudioFilterWrapper::_get_margin_time) { auto handle = ObjectWrap::Unwrap(info.Holder()); if(!handle->_filter) { Nan::ThrowError("invalid handle"); @@ -112,16 +112,16 @@ NAN_METHOD(AudioFilterWrapper::_get_margin_frames) { auto vad_filter = dynamic_pointer_cast(handle->_filter); auto threshold_filter = dynamic_pointer_cast(handle->_filter); if(vad_filter) { - info.GetReturnValue().Set((int) vad_filter->margin_frames()); + info.GetReturnValue().Set((float) vad_filter->margin_release_time()); } else if(threshold_filter) { - info.GetReturnValue().Set((int) threshold_filter->margin_frames()); + info.GetReturnValue().Set((float) threshold_filter->margin_release_time()); } else { Nan::ThrowError("invalid handle"); return; } } -NAN_METHOD(AudioFilterWrapper::_set_margin_frames) { +NAN_METHOD(AudioFilterWrapper::_set_margin_time) { auto handle = ObjectWrap::Unwrap(info.Holder()); if(!handle->_filter) { Nan::ThrowError("invalid handle"); @@ -136,9 +136,9 @@ NAN_METHOD(AudioFilterWrapper::_set_margin_frames) { auto vad_filter = dynamic_pointer_cast(handle->_filter); auto threshold_filter = dynamic_pointer_cast(handle->_filter); if(vad_filter) { - vad_filter->set_margin_frames(info[0]->Int32Value(Nan::GetCurrentContext()).FromMaybe(0)); + vad_filter->set_margin_release_time(info[0]->NumberValue(Nan::GetCurrentContext()).FromMaybe(0)); } else if(threshold_filter) { - threshold_filter->set_margin_frames(info[0]->Int32Value(Nan::GetCurrentContext()).FromMaybe(0)); + threshold_filter->set_margin_release_time(info[0]->NumberValue(Nan::GetCurrentContext()).FromMaybe(0)); } else { Nan::ThrowError("invalid handle"); return; diff --git a/native/serverconnection/src/audio/js/AudioFilter.h b/native/serverconnection/src/audio/js/AudioFilter.h index 6c4202d..bda225f 100644 --- a/native/serverconnection/src/audio/js/AudioFilter.h +++ b/native/serverconnection/src/audio/js/AudioFilter.h @@ -32,8 +32,8 @@ namespace tc { static NAN_METHOD(_get_name); /* VAD and Threshold */ - static NAN_METHOD(_get_margin_frames); - static NAN_METHOD(_set_margin_frames); + static NAN_METHOD(_get_margin_time); + static NAN_METHOD(_set_margin_time); /* VAD relevant */ static NAN_METHOD(_get_level); diff --git a/native/serverconnection/src/connection/ProtocolHandler.cpp b/native/serverconnection/src/connection/ProtocolHandler.cpp index 3f73748..4598aae 100644 --- a/native/serverconnection/src/connection/ProtocolHandler.cpp +++ b/native/serverconnection/src/connection/ProtocolHandler.cpp @@ -36,6 +36,8 @@ void ProtocolHandler::reset() { { /* initialize pow handler */ this->pow.state = pow_state::COOKIE_SET; + this->pow.retry_count = 0; + this->pow.last_buffer = pipes::buffer{}; this->pow.last_resend = system_clock::time_point{}; this->pow.last_response = system_clock::time_point{}; diff --git a/native/serverconnection/src/connection/ProtocolHandler.h b/native/serverconnection/src/connection/ProtocolHandler.h index fbc6557..2a7860d 100644 --- a/native/serverconnection/src/connection/ProtocolHandler.h +++ b/native/serverconnection/src/connection/ProtocolHandler.h @@ -98,6 +98,7 @@ namespace tc { uint8_t disconnect_id = 0; struct { + size_t retry_count{0}; pow_state::value state; uint64_t client_ts3_build_timestamp = 173265950 /* TS3 */; /* needs to be lower than 173265950 for old stuff, else new protocol */ diff --git a/native/serverconnection/src/connection/ProtocolHandlerPOW.cpp b/native/serverconnection/src/connection/ProtocolHandlerPOW.cpp index b384f33..11f3e93 100644 --- a/native/serverconnection/src/connection/ProtocolHandlerPOW.cpp +++ b/native/serverconnection/src/connection/ProtocolHandlerPOW.cpp @@ -49,6 +49,14 @@ void ProtocolHandler::handlePacketInit(const std::shared_ptrpow.retry_count++; + if(this->pow.retry_count > 8) { + log_trace(category::connection, tr("[POW] Retied puzzle too many times. Aborting connect.")); + + this->handle->call_connect_result.call(this->handle->errors.register_error(tr("failed to solve connect puzzle")), true); + this->handle->close_connection(); + return; + } this->pow.state = pow_state::COOKIE_SET; /* next expected packet state */ this->pow_send_cookie_get(); return; @@ -130,7 +138,7 @@ void ProtocolHandler::handlePacketInit(const std::shared_ptrevent_write = event_new(this->io_base, this->file_descriptor, EV_WRITE, &UDPSocket::_callback_write, this); event_add(this->event_read, nullptr); - this->_io_thread = thread(&UDPSocket::_io_execute, this); + this->_io_thread = thread(&UDPSocket::_io_execute, this, this->io_base); #ifdef WIN32 //TODO set thread name #else @@ -71,38 +71,42 @@ bool UDPSocket::initialize() { } void UDPSocket::finalize() { + const auto is_event_thread = this_thread::get_id() == this->_io_thread.get_id(); if(this->file_descriptor == 0) return; unique_lock lock(this->io_lock); - auto event_read = this->event_read, event_write = this->event_write; - auto io_base = this->io_base; - this->io_base = nullptr; - this->event_read = nullptr; - this->event_write = nullptr; + auto event_read = std::exchange(this->event_read, nullptr); + auto event_write = std::exchange(this->event_write, nullptr); + auto io_base = std::exchange(this->io_base, nullptr); lock.unlock(); - assert(this_thread::get_id() != this->_io_thread.get_id()); - if(event_read) event_del_block(event_read); - if(event_write) event_del_block(event_write); - - if(io_base) { - timeval seconds{1, 0}; - event_base_loopexit(io_base, &seconds); - event_base_loopexit(io_base, nullptr); - } - - if(this->_io_thread.joinable()) - this->_io_thread.join(); + if(is_event_thread) { + if(event_read) event_del_block(event_read); + if(event_write) event_del_block(event_write); + } else { + if(event_read) event_del_noblock(event_read); + if(event_write) event_del_noblock(event_write); + } if(io_base) - event_base_free(io_base); + event_base_loopexit(io_base, nullptr); + + if(is_event_thread) { + event_base_loopexit(io_base, nullptr); + this->_io_thread.detach(); + } else { + event_base_loopexit(io_base, nullptr); + if(this->_io_thread.joinable()) + this->_io_thread.join(); + } #ifdef WIN32 - if(::closesocket(this->file_descriptor) != 0) { + const auto close_result = ::closesocket(this->file_descriptor); #else - if(::close(this->file_descriptor) != 0) { + const auto close_result = ::close(this->file_descriptor); #endif + if(close_result != 0) { if(errno != EBADF) logger::warn(category::socket, tr("Failed to close file descriptor ({}/{})"), to_string(errno), strerror(errno)); } @@ -117,15 +121,16 @@ void UDPSocket::_callback_read(evutil_socket_t fd, short, void *_ptr_socket) { ((UDPSocket*) _ptr_socket)->callback_read(fd); } -void UDPSocket::_io_execute(void *_ptr_socket) { - ((UDPSocket*) _ptr_socket)->io_execute(); +void UDPSocket::_io_execute(void *_ptr_socket, void* _ptr_event_base) { + ((UDPSocket*) _ptr_socket)->io_execute(_ptr_event_base); } -void UDPSocket::io_execute() { - while(this->io_base) { - event_base_loop(this->io_base, EVLOOP_NO_EXIT_ON_EMPTY); - } +void UDPSocket::io_execute(void* ptr_event_base) { + auto base = (event_base*) ptr_event_base; + event_base_loop(base, EVLOOP_NO_EXIT_ON_EMPTY); + /* this pointer might be dangling here! */ logger::trace(category::socket, tr("Socket IO loop exited")); + event_base_free(base); } void UDPSocket::callback_read(evutil_socket_t fd) { sockaddr source_address{}; @@ -135,41 +140,36 @@ void UDPSocket::callback_read(evutil_socket_t fd) { size_t buffer_length = 1600; /* IPv6 MTU is ~1.5k */ char buffer[1600]; - size_t read_count = 0; - while(true) { //TODO: Some kind of timeout - source_address_length = sizeof(sockaddr); - read_length = recvfrom(fd, (char*) buffer, (int) buffer_length, MSG_DONTWAIT, &source_address, &source_address_length); - if(read_length <= 0) { - if(read_length == 0 && read_count > 0) - break; - - int error; + source_address_length = sizeof(sockaddr); + read_length = recvfrom(fd, (char*) buffer, (int) buffer_length, MSG_DONTWAIT, &source_address, &source_address_length); + if(read_length <= 0) { + int error; #ifdef WIN32 - error = WSAGetLastError(); - if(error == WSAEWOULDBLOCK) - break; + error = WSAGetLastError(); + if(error == WSAEWOULDBLOCK) + return; #else - error = errno; - if(errno == EAGAIN) - break; + error = errno; + if(errno == EAGAIN) + return; #endif - logger::warn(category::socket, tr("Failed to receive data: {}"), error); - if(auto callback{this->on_fatal_error}; callback) - callback(1, error); + logger::warn(category::socket, tr("Failed to receive data: {}"), error); + { + std::lock_guard lock{this->io_lock}; + if(this->event_read) + event_del_noblock(this->event_read); + } - { - std::lock_guard lock{this->io_lock}; - if(this->event_read) - event_del_noblock(this->event_read); - } - break; /* this should never happen! */ - } - - //logger::trace(category::socket, tr("Read {} bytes"), read_length); - read_count++; - if(this->on_data) - this->on_data(pipes::buffer_view{buffer, (size_t) read_length}); + if(auto callback{this->on_fatal_error}; callback) + callback(1, error); + /* this pointer might be dangling now because we got deleted while handling this data */ + return; } + + //logger::trace(category::socket, tr("Read {} bytes"), read_length); + if(this->on_data) + this->on_data(pipes::buffer_view{buffer, (size_t) read_length}); + /* this pointer might be dangling now because we got deleted while handling this data */ } void UDPSocket::callback_write(evutil_socket_t fd) { diff --git a/native/serverconnection/src/connection/Socket.h b/native/serverconnection/src/connection/Socket.h index fb12bd7..141befb 100644 --- a/native/serverconnection/src/connection/Socket.h +++ b/native/serverconnection/src/connection/Socket.h @@ -34,11 +34,11 @@ namespace tc::connection { const std::thread& io_thread() { return this->_io_thread; } private: - static void _io_execute(void *_ptr_socket); + static void _io_execute(void *_ptr_socket, void *_ptr_event_base); static void _callback_read(evutil_socket_t, short, void*); static void _callback_write(evutil_socket_t, short, void*); - void io_execute(); + void io_execute(void*); void callback_read(evutil_socket_t); void callback_write(evutil_socket_t); diff --git a/native/serverconnection/test/js/main.ts b/native/serverconnection/test/js/main.ts index c16691c..21c85de 100644 --- a/native/serverconnection/test/js/main.ts +++ b/native/serverconnection/test/js/main.ts @@ -56,7 +56,7 @@ connection.callback_disconnect = reason => { console.log("Got disconnect: %s", reason); }; -const do_connect = () => { +const do_connect = (connection) => { connection.connect({ timeout: 5000, remote_port: 9987, @@ -150,7 +150,16 @@ const do_connect = () => { connection._voice_connection.register_client(7); }; -do_connect(); +do_connect(connection); +let _connections = []; +let i = 0; +let ii = setInterval(() => { + if(i++ > 35) + clearInterval(ii); + const c = handle.spawn_server_connection(); + _connections.push(c); + do_connect(c); +}, 500); connection.callback_voice_data = (buffer, client_id, codec_id, flag_head, packet_id) => { console.log("Received voice of length %d from client %d in codec %d (Head: %o | ID: %d)", buffer.byteLength, client_id, codec_id, flag_head, packet_id); @@ -167,6 +176,7 @@ setInterval(() => { /* keep the object alive */ setTimeout(() => { connection.connected(); + _connections.forEach(e => e.current_ping()); }, 1000); connection_list.push(connection); \ No newline at end of file