From d2ba1b4eee16022f201e5129bd1986a57049605f Mon Sep 17 00:00:00 2001 From: root Date: Tue, 17 Mar 2020 12:08:33 +0100 Subject: [PATCH] Totally fucked up... --- server/namespaces | 16 +- .../client/query/QueryClientConnection.cpp | 766 +++++++++--------- .../src/client/query/QueryClientConnection.h | 194 ++--- 3 files changed, 488 insertions(+), 488 deletions(-) diff --git a/server/namespaces b/server/namespaces index 6bf1fa4..3a17e37 100644 --- a/server/namespaces +++ b/server/namespaces @@ -1,9 +1,9 @@ -The general namespace prefix is ts:: - -TeaSpeak - Server: ts::server - Basic: ts::server - Sub-Server: - Query: ts::server::server::query - Voice: ts::server::server::udp - File: ts::server::server::file +The general namespace prefix is ts:: + +TeaSpeak - Server: ts::server + Basic: ts::server + Sub-Server: + Query: ts::server::server::query + Voice: ts::server::server::udp + File: ts::server::server::file Web: ts::server::server::web \ No newline at end of file diff --git a/server/src/client/query/QueryClientConnection.cpp b/server/src/client/query/QueryClientConnection.cpp index b95c565..267c228 100644 --- a/server/src/client/query/QueryClientConnection.cpp +++ b/server/src/client/query/QueryClientConnection.cpp @@ -1,384 +1,384 @@ -// -// Created by WolverinDEV on 11/03/2020. -// - -#include "./QueryClientConnection.h" - -#include -#include -#include -#include - -#include "./QueryClient.h" -#include "../ConnectedClient.h" - -#include "../../server/QueryServer.h" -#include "QueryClientConnection.h" - -using namespace ts::server::server::query; - -#if defined(TCP_CORK) && !defined(TCP_NOPUSH) - #define TCP_NOPUSH TCP_CORK -#endif - -namespace ts::server::server::query { - /* will be set by the event loop */ - thread_local bool thread_is_event_loop{false}; -} - -QueryClientConnection::QueryClientConnection(ts::server::QueryClient *client, int fd) : client_handle{client}, file_descriptor_{fd} { - TAILQ_INIT(&this->write_queue); -} - -QueryClientConnection::~QueryClientConnection() { - this->finalize(true); -} - -bool QueryClientConnection::initialize(std::string &error) { - assert(this->client_handle); - - int enabled{1}; - int disabled{0}; - setsockopt(this->file_descriptor_, SOL_SOCKET, SO_KEEPALIVE, &enabled, sizeof(enabled)); - if(setsockopt(this->file_descriptor_, IPPROTO_TCP, TCP_NOPUSH, &disabled, sizeof disabled) < 0) - logError(LOG_QUERY, "Could not disable nopush for {} ({}/{})", CLIENT_STR_LOG_PREFIX_(this->client_handle), errno, strerror(errno)); - - if(setsockopt(this->file_descriptor_, IPPROTO_TCP, TCP_NODELAY, &enabled, sizeof enabled) < 0) - logError(LOG_QUERY, "[Query] Could not disable no delay for {} ({}/{})", CLIENT_STR_LOG_PREFIX_(this->client_handle), errno, strerror(errno)); - - auto query_server = this->client_handle->getQueryServer(); - this->readEvent = event_new(query_server->io_event_loop(), this->file_descriptor_, EV_READ | EV_PERSIST, [](int a1, short a2, void* _this) { - reinterpret_cast(_this)->handle_event_read(a1, a2); - }, this); - this->writeEvent = event_new(query_server->io_event_loop(), this->file_descriptor_, EV_WRITE, [](int a1, short a2, void* _this){ - reinterpret_cast(_this)->handle_event_write(a1, a2); - }, this); - - this->connection_state = ConnectionState::INITIALIZING; - - if(ts::config::query::sslMode == 0) { - this->connection_state = ConnectionState::CONNECTED; - this->connection_type_ = ConnectionType::PLAIN_TEXT; - - this->client_handle->handle_connection_initialized(); - } - return true; -} - -void QueryClientConnection::add_read_event() { - std::lock_guard elock{this->event_mutex}; - if(this->readEvent) event_add(this->readEvent, nullptr); -} - -void QueryClientConnection::finalize(bool is_destructor_call) { - auto old_state = this->connection_state; - this->connection_state = ConnectionState::DISCONNECTED; - - /* unregister event handling */ - { - std::unique_lock elock{this->event_mutex}; - auto wevent = std::exchange(this->writeEvent, nullptr); - auto revent = std::exchange(this->readEvent, nullptr); - elock.unlock(); - if(revent) { - if(thread_is_event_loop) - event_del_noblock(revent); - else - event_del_block(revent); /* may calls finalize() while we're waiting. But thats okey. */ - event_free(revent); - } - if(wevent) { - if(thread_is_event_loop) - event_del_noblock(wevent); - else - event_del_block(wevent); /* may calls finalize() while we're waiting. But thats okey. */ - event_free(wevent); - } - } - - { - std::lock_guard block{this->buffer_lock}; - - /* Free the entire tail queue. */ - while (auto buffer = TAILQ_FIRST(&this->write_queue)) { - TAILQ_REMOVE(&this->write_queue, buffer, tq); - free(buffer->original_ptr); - delete buffer; - } - TAILQ_INIT(&this->write_queue); /* just ensures a valid tailq */ - - ::free(this->read_buffer.buffer); - this->read_buffer.buffer = nullptr; - this->read_buffer.length = 0; - this->read_buffer.fill_count = 0; - } - - if(!is_destructor_call && old_state != ConnectionState::DISCONNECTED) - this->client_handle->handle_connection_finalized(); -} - -void QueryClientConnection::handle_event_read(int fd, short events) { - constexpr auto buffer_length{1024 * 4}; - uint8_t buffer[buffer_length]; - - auto length = read(fd, (void *) buffer, buffer_length); - if (length <= 0) { - if (errno == EINTR || errno == EAGAIN) - return; - else if (length == 0) { - logMessage(LOG_QUERY, "{} Connection closed (r). Client disconnected.", - CLIENT_STR_LOG_PREFIX_(this->client_handle)); - } else { - logError(LOG_QUERY, "{} Failed to read! Code: {} errno: {} message: {}", - CLIENT_STR_LOG_PREFIX_(this->client_handle), length, errno, strerror(errno)); - } - event_del_noblock(this->readEvent); - this->close_connection(std::chrono::system_clock::time_point{}); - return; - } - - if (this->connection_type_ == ConnectionType::PLAIN_TEXT) { - plain_text_buffer_insert: - this->handle_decoded_message(buffer, length); - } else if (this->connection_type_ == ConnectionType::SSL_ENCRYPTED) { - ssl_buffer_insert:; - this->ssl_handler.process_incoming_data(pipes::buffer_view{(const char*) buffer, (size_t) length});; - } else { - if (config::query::sslMode != 0 && pipes::SSL::isSSLHeader(std::string{(const char *) buffer, (size_t) length})) { - if(!this->initialize_ssl()) return; - - /* - * - Content - * \x16 - * -Version (1) - * \x03 \x00 - * - length (2) - * \x00 \x04 - * - * - Header - * \x00 -> hello request (3) - * \x05 -> length (4) - */ - - //this->writeRawMessage(string("\x16\x03\x01\x00\x05\x00\x00\x00\x00\x00", 10)); - goto ssl_buffer_insert; - } else { - this->connection_type_ = ConnectionType::PLAIN_TEXT; - this->client_handle->handle_connection_initialized(); - goto plain_text_buffer_insert; - } - } -} - -void QueryClientConnection::handle_event_write(int fd, short events) { - bool readd_write{false}; - if(events & EV_WRITE) { - /* Safe to access, because we're only reading the queue and the head could never change. Only within the IO loop itself. */ - WriteBuffer* wbuffer; - while((wbuffer = TAILQ_FIRST(&this->write_queue))) { - auto written = send(fd, wbuffer->ptr, wbuffer->length, 0); - if(written <= 0) { - if(errno == EAGAIN) { - readd_write = true; - break; - } - if(written == 0) { - logMessage(LOG_QUERY, "{} Connection closed (w). Client disconnected.", CLIENT_STR_LOG_PREFIX_(this->client_handle)); - } else { - logError(LOG_QUERY, "{} Failed to write! Code: {} errno: {} message: {}", CLIENT_STR_LOG_PREFIX_(this->client_handle), written, errno, strerror(errno)); - } - event_del_noblock(this->readEvent); - this->close_connection(std::chrono::system_clock::time_point{}); - return; - } - - wbuffer->length -= written; - if(wbuffer->length == 0) { - std::lock_guard block{this->buffer_lock}; - TAILQ_REMOVE(&this->write_queue, wbuffer, tq); - - ::free(wbuffer->original_ptr); - delete wbuffer; - } else { - wbuffer->ptr += written; - } - } - } - - if(this->connection_state == ConnectionState::DISCONNECTING) { - if(!readd_write || (events & EV_TIMEOUT)) { - /* disconnect timeouted or nothing more to write */ - this->finalize(false); - return; - } else /* if(readd_write) */ { /* check not needed because tested before already */ - auto time_left = this->disconnect_timeout - std::chrono::system_clock::now(); - timeval timeout{0, 1}; - if(time_left.count() > 0) { - timeout.tv_sec = std::chrono::floor(time_left).count(); - timeout.tv_usec = std::chrono::floor(time_left).count() % 1000000ULL; - } - event_add(this->writeEvent, &timeout); - } - } else if(readd_write) { - event_add(this->writeEvent, nullptr); - } -} - - -bool QueryClientConnection::initialize_ssl() { - this->connection_type_ = ConnectionType::SSL_ENCRYPTED; - - this->ssl_handler.direct_process(pipes::PROCESS_DIRECTION_OUT, true); - this->ssl_handler.direct_process(pipes::PROCESS_DIRECTION_IN, true); - - this->ssl_handler.callback_data([&](const pipes::buffer_view &buffer) { - this->handle_decoded_message(buffer.data_ptr(), buffer.length()); - }); - - this->ssl_handler.callback_write([&](const pipes::buffer_view &buffer) { - this->send_data_raw({buffer.data_ptr(), buffer.length()}); - }); - - this->ssl_handler.callback_initialized = [&] { - this->client_handle->handle_connection_initialized(); - }; - - this->ssl_handler.callback_error([&](int code, const std::string& message) { - if(code == PERROR_SSL_ACCEPT) { - logError(LOG_QUERY, "{} Failed to initialize query ssl session ({})", CLIENT_STR_LOG_PREFIX_(this->client_handle), message); - this->close_connection(std::chrono::system_clock::time_point{}); - } else if(code == PERROR_SSL_TIMEOUT) { - logError(LOG_QUERY, "{} Failed to initialize query ssl session (timeout: {})", CLIENT_STR_LOG_PREFIX_(this->client_handle), message); - this->close_connection(std::chrono::system_clock::time_point{}); - } else - logError(LOG_QUERY, "{} Received SSL error ({} | {})", CLIENT_STR_LOG_PREFIX_(this->client_handle), code, message); - }); - - { - auto context = serverInstance->sslManager()->getQueryContext(); - - auto options = std::make_shared(); - options->type = pipes::SSL::SERVER; - options->context_method = TLS_method(); - options->default_keypair({context->privateKey, context->certificate}); - if(!this->ssl_handler.initialize(options)) { - logError(LOG_QUERY, "[{}] Failed to setup ssl!", CLIENT_STR_LOG_PREFIX_(this->client_handle)); - this->close_connection(std::chrono::system_clock::time_point{}); - return false; - } - } - return true; -} - -void QueryClientConnection::handle_decoded_message(const void *buffer, size_t size) { - { - std::lock_guard block{this->buffer_lock}; - if((this->read_buffer.length - this->read_buffer.fill_count) < size) { /* !this->read_buffer.buffer is already implicitly implemented because by default read_buffer.length will be zero */ - const auto new_size{this->read_buffer.length + size + 128}; - auto new_buffer = ::malloc(new_size); - assert(new_buffer); - - if(this->read_buffer.fill_count) memcpy(new_buffer, this->read_buffer.buffer, this->read_buffer.fill_count); - ::free(this->read_buffer.buffer); - - this->read_buffer.buffer = new_buffer; - this->read_buffer.length = new_size; - } - assert(this->read_buffer.buffer); - assert(this->read_buffer.length - this->read_buffer.fill_count >= size); - - memcpy((char*) this->read_buffer.buffer + this->read_buffer.fill_count, buffer, size); - this->read_buffer.fill_count += size; - } - { - //TODO: Improve this command progress - auto qserver{this->client_handle->handle}; - if(qserver) { - auto wlock{this->client_handle->_this}; - qserver->executePool()->execute([wlock]() { - auto client{std::dynamic_pointer_cast(wlock.lock())}; - if(!client) return; - - int counter = 0; - while(client->process_next_command() && counter++ < 15); - }); - } - } -} - -void QueryClientConnection::send_data(const std::string_view &buffer) { - if(this->connection_type_ == ConnectionType::PLAIN_TEXT) - this->send_data_raw(buffer); - else if(this->connection_type_ == ConnectionType::SSL_ENCRYPTED) - this->ssl_handler.send(pipes::buffer_view{buffer.data(), buffer.length()}); -} - -void QueryClientConnection::send_data_raw(const std::string_view &buffer) { - auto wbuf = new WriteBuffer{}; - wbuf->original_ptr = (char*) malloc(buffer.length()); - wbuf->ptr = wbuf->original_ptr; - - memcpy(wbuf->ptr, buffer.data(), buffer.length()); - wbuf->length = buffer.length(); - - { - std::lock_guard wlock{this->buffer_lock}; - TAILQ_INSERT_TAIL(&this->write_queue, wbuf, tq); - } - - { - std::lock_guard elock{this->event_mutex}; - if(this->writeEvent) - event_add(this->writeEvent, nullptr); - } -} - -void QueryClientConnection::close_connection(const std::chrono::system_clock::time_point &timeout) { - if(timeout.time_since_epoch().count() > 0) { - this->connection_state = ConnectionState::DISCONNECTING; - this->disconnect_timeout = timeout; - - std::lock_guard elock{this->event_mutex}; - if(this->writeEvent) { - event_add(this->writeEvent, nullptr); - return; - } - /* failed to add the write event, so call disconnect */ - } - - if(this->connection_state == ConnectionState::DISCONNECTED) return; - this->finalize(false); -} - -void QueryClientConnection::enforce_text_connection() { - if(this->connection_state != ConnectionState::INITIALIZING) return; - - this->connection_state = ConnectionState::CONNECTED; - this->connection_type_ = ConnectionType::PLAIN_TEXT; - this->client_handle->handle_connection_initialized(); -} - -CommandAssembleState QueryClientConnection::next_command(std::string &result) { - std::lock_guard block{this->buffer_lock}; - - auto new_line_idx = (char*) memchr(this->read_buffer.buffer, '\n', this->read_buffer.fill_count); - if(!new_line_idx) return CommandAssembleState::NO_COMMAND_PENDING; - - const auto length = ((char*) this->read_buffer.buffer - new_line_idx) * sizeof(*new_line_idx); - auto line_length{length}; - if(length > 0 && *(new_line_idx - 1) == '\r') - line_length--; - - result.assign((char*) this->read_buffer.buffer, line_length); - - //Do not copy the \r character - auto copy_bytes{this->read_buffer.fill_count - length}; - if(copy_bytes > 0 && *(new_line_idx + 1) == '\r') { - copy_bytes--; - new_line_idx++; - } - memcpy(this->read_buffer.buffer, new_line_idx + 1, copy_bytes); - this->read_buffer.fill_count = copy_bytes; - - return copy_bytes == 0 ? CommandAssembleState::SUCCESS : CommandAssembleState::MORE_COMMANDS_PENDING; +// +// Created by WolverinDEV on 11/03/2020. +// + +#include "./QueryClientConnection.h" + +#include +#include +#include +#include + +#include "./QueryClient.h" +#include "../ConnectedClient.h" + +#include "../../server/QueryServer.h" +#include "QueryClientConnection.h" + +using namespace ts::server::server::query; + +#if defined(TCP_CORK) && !defined(TCP_NOPUSH) + #define TCP_NOPUSH TCP_CORK +#endif + +namespace ts::server::server::query { + /* will be set by the event loop */ + thread_local bool thread_is_event_loop{false}; +} + +QueryClientConnection::QueryClientConnection(ts::server::QueryClient *client, int fd) : client_handle{client}, file_descriptor_{fd} { + TAILQ_INIT(&this->write_queue); +} + +QueryClientConnection::~QueryClientConnection() { + this->finalize(true); +} + +bool QueryClientConnection::initialize(std::string &error) { + assert(this->client_handle); + + int enabled{1}; + int disabled{0}; + setsockopt(this->file_descriptor_, SOL_SOCKET, SO_KEEPALIVE, &enabled, sizeof(enabled)); + if(setsockopt(this->file_descriptor_, IPPROTO_TCP, TCP_NOPUSH, &disabled, sizeof disabled) < 0) + logError(LOG_QUERY, "Could not disable nopush for {} ({}/{})", CLIENT_STR_LOG_PREFIX_(this->client_handle), errno, strerror(errno)); + + if(setsockopt(this->file_descriptor_, IPPROTO_TCP, TCP_NODELAY, &enabled, sizeof enabled) < 0) + logError(LOG_QUERY, "[Query] Could not disable no delay for {} ({}/{})", CLIENT_STR_LOG_PREFIX_(this->client_handle), errno, strerror(errno)); + + auto query_server = this->client_handle->getQueryServer(); + this->readEvent = event_new(query_server->io_event_loop(), this->file_descriptor_, EV_READ | EV_PERSIST, [](int a1, short a2, void* _this) { + reinterpret_cast(_this)->handle_event_read(a1, a2); + }, this); + this->writeEvent = event_new(query_server->io_event_loop(), this->file_descriptor_, EV_WRITE, [](int a1, short a2, void* _this){ + reinterpret_cast(_this)->handle_event_write(a1, a2); + }, this); + + this->connection_state = ConnectionState::INITIALIZING; + + if(ts::config::query::sslMode == 0) { + this->connection_state = ConnectionState::CONNECTED; + this->connection_type_ = ConnectionType::PLAIN_TEXT; + + this->client_handle->handle_connection_initialized(); + } + return true; +} + +void QueryClientConnection::add_read_event() { + std::lock_guard elock{this->event_mutex}; + if(this->readEvent) event_add(this->readEvent, nullptr); +} + +void QueryClientConnection::finalize(bool is_destructor_call) { + auto old_state = this->connection_state; + this->connection_state = ConnectionState::DISCONNECTED; + + /* unregister event handling */ + { + std::unique_lock elock{this->event_mutex}; + auto wevent = std::exchange(this->writeEvent, nullptr); + auto revent = std::exchange(this->readEvent, nullptr); + elock.unlock(); + if(revent) { + if(thread_is_event_loop) + event_del_noblock(revent); + else + event_del_block(revent); /* may calls finalize() while we're waiting. But thats okey. */ + event_free(revent); + } + if(wevent) { + if(thread_is_event_loop) + event_del_noblock(wevent); + else + event_del_block(wevent); /* may calls finalize() while we're waiting. But thats okey. */ + event_free(wevent); + } + } + + { + std::lock_guard block{this->buffer_lock}; + + /* Free the entire tail queue. */ + while (auto buffer = TAILQ_FIRST(&this->write_queue)) { + TAILQ_REMOVE(&this->write_queue, buffer, tq); + free(buffer->original_ptr); + delete buffer; + } + TAILQ_INIT(&this->write_queue); /* just ensures a valid tailq */ + + ::free(this->read_buffer.buffer); + this->read_buffer.buffer = nullptr; + this->read_buffer.length = 0; + this->read_buffer.fill_count = 0; + } + + if(!is_destructor_call && old_state != ConnectionState::DISCONNECTED) + this->client_handle->handle_connection_finalized(); +} + +void QueryClientConnection::handle_event_read(int fd, short events) { + constexpr auto buffer_length{1024 * 4}; + uint8_t buffer[buffer_length]; + + auto length = read(fd, (void *) buffer, buffer_length); + if (length <= 0) { + if (errno == EINTR || errno == EAGAIN) + return; + else if (length == 0) { + logMessage(LOG_QUERY, "{} Connection closed (r). Client disconnected.", + CLIENT_STR_LOG_PREFIX_(this->client_handle)); + } else { + logError(LOG_QUERY, "{} Failed to read! Code: {} errno: {} message: {}", + CLIENT_STR_LOG_PREFIX_(this->client_handle), length, errno, strerror(errno)); + } + event_del_noblock(this->readEvent); + this->close_connection(std::chrono::system_clock::time_point{}); + return; + } + + if (this->connection_type_ == ConnectionType::PLAIN_TEXT) { + plain_text_buffer_insert: + this->handle_decoded_message(buffer, length); + } else if (this->connection_type_ == ConnectionType::SSL_ENCRYPTED) { + ssl_buffer_insert:; + this->ssl_handler.process_incoming_data(pipes::buffer_view{(const char*) buffer, (size_t) length});; + } else { + if (config::query::sslMode != 0 && pipes::SSL::isSSLHeader(std::string{(const char *) buffer, (size_t) length})) { + if(!this->initialize_ssl()) return; + + /* + * - Content + * \x16 + * -Version (1) + * \x03 \x00 + * - length (2) + * \x00 \x04 + * + * - Header + * \x00 -> hello request (3) + * \x05 -> length (4) + */ + + //this->writeRawMessage(string("\x16\x03\x01\x00\x05\x00\x00\x00\x00\x00", 10)); + goto ssl_buffer_insert; + } else { + this->connection_type_ = ConnectionType::PLAIN_TEXT; + this->client_handle->handle_connection_initialized(); + goto plain_text_buffer_insert; + } + } +} + +void QueryClientConnection::handle_event_write(int fd, short events) { + bool readd_write{false}; + if(events & EV_WRITE) { + /* Safe to access, because we're only reading the queue and the head could never change. Only within the IO loop itself. */ + WriteBuffer* wbuffer; + while((wbuffer = TAILQ_FIRST(&this->write_queue))) { + auto written = send(fd, wbuffer->ptr, wbuffer->length, 0); + if(written <= 0) { + if(errno == EAGAIN) { + readd_write = true; + break; + } + if(written == 0) { + logMessage(LOG_QUERY, "{} Connection closed (w). Client disconnected.", CLIENT_STR_LOG_PREFIX_(this->client_handle)); + } else { + logError(LOG_QUERY, "{} Failed to write! Code: {} errno: {} message: {}", CLIENT_STR_LOG_PREFIX_(this->client_handle), written, errno, strerror(errno)); + } + event_del_noblock(this->readEvent); + this->close_connection(std::chrono::system_clock::time_point{}); + return; + } + + wbuffer->length -= written; + if(wbuffer->length == 0) { + std::lock_guard block{this->buffer_lock}; + TAILQ_REMOVE(&this->write_queue, wbuffer, tq); + + ::free(wbuffer->original_ptr); + delete wbuffer; + } else { + wbuffer->ptr += written; + } + } + } + + if(this->connection_state == ConnectionState::DISCONNECTING) { + if(!readd_write || (events & EV_TIMEOUT)) { + /* disconnect timeouted or nothing more to write */ + this->finalize(false); + return; + } else /* if(readd_write) */ { /* check not needed because tested before already */ + auto time_left = this->disconnect_timeout - std::chrono::system_clock::now(); + timeval timeout{0, 1}; + if(time_left.count() > 0) { + timeout.tv_sec = std::chrono::floor(time_left).count(); + timeout.tv_usec = std::chrono::floor(time_left).count() % 1000000ULL; + } + event_add(this->writeEvent, &timeout); + } + } else if(readd_write) { + event_add(this->writeEvent, nullptr); + } +} + + +bool QueryClientConnection::initialize_ssl() { + this->connection_type_ = ConnectionType::SSL_ENCRYPTED; + + this->ssl_handler.direct_process(pipes::PROCESS_DIRECTION_OUT, true); + this->ssl_handler.direct_process(pipes::PROCESS_DIRECTION_IN, true); + + this->ssl_handler.callback_data([&](const pipes::buffer_view &buffer) { + this->handle_decoded_message(buffer.data_ptr(), buffer.length()); + }); + + this->ssl_handler.callback_write([&](const pipes::buffer_view &buffer) { + this->send_data_raw({buffer.data_ptr(), buffer.length()}); + }); + + this->ssl_handler.callback_initialized = [&] { + this->client_handle->handle_connection_initialized(); + }; + + this->ssl_handler.callback_error([&](int code, const std::string& message) { + if(code == PERROR_SSL_ACCEPT) { + logError(LOG_QUERY, "{} Failed to initialize query ssl session ({})", CLIENT_STR_LOG_PREFIX_(this->client_handle), message); + this->close_connection(std::chrono::system_clock::time_point{}); + } else if(code == PERROR_SSL_TIMEOUT) { + logError(LOG_QUERY, "{} Failed to initialize query ssl session (timeout: {})", CLIENT_STR_LOG_PREFIX_(this->client_handle), message); + this->close_connection(std::chrono::system_clock::time_point{}); + } else + logError(LOG_QUERY, "{} Received SSL error ({} | {})", CLIENT_STR_LOG_PREFIX_(this->client_handle), code, message); + }); + + { + auto context = serverInstance->sslManager()->getQueryContext(); + + auto options = std::make_shared(); + options->type = pipes::SSL::SERVER; + options->context_method = TLS_method(); + options->default_keypair({context->privateKey, context->certificate}); + if(!this->ssl_handler.initialize(options)) { + logError(LOG_QUERY, "[{}] Failed to setup ssl!", CLIENT_STR_LOG_PREFIX_(this->client_handle)); + this->close_connection(std::chrono::system_clock::time_point{}); + return false; + } + } + return true; +} + +void QueryClientConnection::handle_decoded_message(const void *buffer, size_t size) { + { + std::lock_guard block{this->buffer_lock}; + if((this->read_buffer.length - this->read_buffer.fill_count) < size) { /* !this->read_buffer.buffer is already implicitly implemented because by default read_buffer.length will be zero */ + const auto new_size{this->read_buffer.length + size + 128}; + auto new_buffer = ::malloc(new_size); + assert(new_buffer); + + if(this->read_buffer.fill_count) memcpy(new_buffer, this->read_buffer.buffer, this->read_buffer.fill_count); + ::free(this->read_buffer.buffer); + + this->read_buffer.buffer = new_buffer; + this->read_buffer.length = new_size; + } + assert(this->read_buffer.buffer); + assert(this->read_buffer.length - this->read_buffer.fill_count >= size); + + memcpy((char*) this->read_buffer.buffer + this->read_buffer.fill_count, buffer, size); + this->read_buffer.fill_count += size; + } + { + //TODO: Improve this command progress + auto qserver{this->client_handle->handle}; + if(qserver) { + auto wlock{this->client_handle->_this}; + qserver->executePool()->execute([wlock]() { + auto client{std::dynamic_pointer_cast(wlock.lock())}; + if(!client) return; + + int counter = 0; + while(client->process_next_command() && counter++ < 15); + }); + } + } +} + +void QueryClientConnection::send_data(const std::string_view &buffer) { + if(this->connection_type_ == ConnectionType::PLAIN_TEXT) + this->send_data_raw(buffer); + else if(this->connection_type_ == ConnectionType::SSL_ENCRYPTED) + this->ssl_handler.send(pipes::buffer_view{buffer.data(), buffer.length()}); +} + +void QueryClientConnection::send_data_raw(const std::string_view &buffer) { + auto wbuf = new WriteBuffer{}; + wbuf->original_ptr = (char*) malloc(buffer.length()); + wbuf->ptr = wbuf->original_ptr; + + memcpy(wbuf->ptr, buffer.data(), buffer.length()); + wbuf->length = buffer.length(); + + { + std::lock_guard wlock{this->buffer_lock}; + TAILQ_INSERT_TAIL(&this->write_queue, wbuf, tq); + } + + { + std::lock_guard elock{this->event_mutex}; + if(this->writeEvent) + event_add(this->writeEvent, nullptr); + } +} + +void QueryClientConnection::close_connection(const std::chrono::system_clock::time_point &timeout) { + if(timeout.time_since_epoch().count() > 0) { + this->connection_state = ConnectionState::DISCONNECTING; + this->disconnect_timeout = timeout; + + std::lock_guard elock{this->event_mutex}; + if(this->writeEvent) { + event_add(this->writeEvent, nullptr); + return; + } + /* failed to add the write event, so call disconnect */ + } + + if(this->connection_state == ConnectionState::DISCONNECTED) return; + this->finalize(false); +} + +void QueryClientConnection::enforce_text_connection() { + if(this->connection_state != ConnectionState::INITIALIZING) return; + + this->connection_state = ConnectionState::CONNECTED; + this->connection_type_ = ConnectionType::PLAIN_TEXT; + this->client_handle->handle_connection_initialized(); +} + +CommandAssembleState QueryClientConnection::next_command(std::string &result) { + std::lock_guard block{this->buffer_lock}; + + auto new_line_idx = (char*) memchr(this->read_buffer.buffer, '\n', this->read_buffer.fill_count); + if(!new_line_idx) return CommandAssembleState::NO_COMMAND_PENDING; + + const auto length = ((char*) this->read_buffer.buffer - new_line_idx) * sizeof(*new_line_idx); + auto line_length{length}; + if(length > 0 && *(new_line_idx - 1) == '\r') + line_length--; + + result.assign((char*) this->read_buffer.buffer, line_length); + + //Do not copy the \r character + auto copy_bytes{this->read_buffer.fill_count - length}; + if(copy_bytes > 0 && *(new_line_idx + 1) == '\r') { + copy_bytes--; + new_line_idx++; + } + memcpy(this->read_buffer.buffer, new_line_idx + 1, copy_bytes); + this->read_buffer.fill_count = copy_bytes; + + return copy_bytes == 0 ? CommandAssembleState::SUCCESS : CommandAssembleState::MORE_COMMANDS_PENDING; } \ No newline at end of file diff --git a/server/src/client/query/QueryClientConnection.h b/server/src/client/query/QueryClientConnection.h index 90a4382..ca2e857 100644 --- a/server/src/client/query/QueryClientConnection.h +++ b/server/src/client/query/QueryClientConnection.h @@ -1,98 +1,98 @@ -#pragma once - -#include -#include -#include -#include - -#include -#include - -namespace ts::server { - class QueryClient; -} - -namespace ts::server::server::query { - enum struct ConnectionType { - UNKNOWN, - - PLAIN_TEXT, - SSL_ENCRYPTED, - - /* SSH */ - }; - - enum struct ConnectionState { - INITIALIZING, - CONNECTED, - DISCONNECTING, - DISCONNECTED - }; - - enum struct CommandAssembleState { - SUCCESS, - MORE_COMMANDS_PENDING, - - NO_COMMAND_PENDING - }; - - class QueryClientConnection { - public: - explicit QueryClientConnection(QueryClient* /* client */, int /* file descriptor */); - ~QueryClientConnection(); - - [[nodiscard]] inline ConnectionType connection_type() const { return this->connection_type_; } - - bool initialize(std::string& /* error */); - void add_read_event(); - - void finalize(bool /* is destructor call */); - - void send_data(const std::string_view& /* payload */); - void send_data_raw(const std::string_view& /* payload */); - - void enforce_text_connection(); - [[nodiscard]] CommandAssembleState next_command(std::string& /* command */); - - /* could be called from every thread (event IO thread) */ - void close_connection(const std::chrono::system_clock::time_point& /* disconnect timeout */); - private: - struct WriteBuffer { - char* original_ptr; - char* ptr; - size_t length; - - TAILQ_ENTRY(WriteBuffer) tq; - }; - - QueryClient* client_handle{nullptr}; - ConnectionState connection_state{ConnectionState::INITIALIZING}; - std::chrono::system_clock::time_point disconnect_timeout{}; - - ConnectionType connection_type_{ConnectionType::UNKNOWN}; - int file_descriptor_{-1}; - - /* only delete the events within the event loop! */ - std::mutex event_mutex{}; - ::event* readEvent{nullptr}; - ::event* writeEvent{nullptr}; - - pipes::SSL ssl_handler{}; - - std::mutex buffer_lock{}; - struct { - void* buffer{nullptr}; - size_t length{0}; - size_t fill_count{0}; - std::chrono::system_clock::time_point last_shrink{}; - } read_buffer; - TAILQ_HEAD(, WriteBuffer) write_queue{}; - - void handle_event_write(int, short); - void handle_event_read(int, short); - - - bool initialize_ssl(); - void handle_decoded_message(const void* /* message */, size_t /* length */); - }; +#pragma once + +#include +#include +#include +#include + +#include +#include + +namespace ts::server { + class QueryClient; +} + +namespace ts::server::server::query { + enum struct ConnectionType { + UNKNOWN, + + PLAIN_TEXT, + SSL_ENCRYPTED, + + /* SSH */ + }; + + enum struct ConnectionState { + INITIALIZING, + CONNECTED, + DISCONNECTING, + DISCONNECTED + }; + + enum struct CommandAssembleState { + SUCCESS, + MORE_COMMANDS_PENDING, + + NO_COMMAND_PENDING + }; + + class QueryClientConnection { + public: + explicit QueryClientConnection(QueryClient* /* client */, int /* file descriptor */); + ~QueryClientConnection(); + + [[nodiscard]] inline ConnectionType connection_type() const { return this->connection_type_; } + + bool initialize(std::string& /* error */); + void add_read_event(); + + void finalize(bool /* is destructor call */); + + void send_data(const std::string_view& /* payload */); + void send_data_raw(const std::string_view& /* payload */); + + void enforce_text_connection(); + [[nodiscard]] CommandAssembleState next_command(std::string& /* command */); + + /* could be called from every thread (event IO thread) */ + void close_connection(const std::chrono::system_clock::time_point& /* disconnect timeout */); + private: + struct WriteBuffer { + char* original_ptr; + char* ptr; + size_t length; + + TAILQ_ENTRY(WriteBuffer) tq; + }; + + QueryClient* client_handle{nullptr}; + ConnectionState connection_state{ConnectionState::INITIALIZING}; + std::chrono::system_clock::time_point disconnect_timeout{}; + + ConnectionType connection_type_{ConnectionType::UNKNOWN}; + int file_descriptor_{-1}; + + /* only delete the events within the event loop! */ + std::mutex event_mutex{}; + ::event* readEvent{nullptr}; + ::event* writeEvent{nullptr}; + + pipes::SSL ssl_handler{}; + + std::mutex buffer_lock{}; + struct { + void* buffer{nullptr}; + size_t length{0}; + size_t fill_count{0}; + std::chrono::system_clock::time_point last_shrink{}; + } read_buffer; + TAILQ_HEAD(, WriteBuffer) write_queue{}; + + void handle_event_write(int, short); + void handle_event_read(int, short); + + + bool initialize_ssl(); + void handle_decoded_message(const void* /* message */, size_t /* length */); + }; } \ No newline at end of file