From d4d6978d5fce58dd26546cd84a874ef3d2723cc6 Mon Sep 17 00:00:00 2001 From: WolverinDEV Date: Sun, 21 Jul 2019 10:43:26 +0200 Subject: [PATCH] Fle server & Query server improvements --- server/src/Configuration.cpp | 8 +- server/src/InstanceHandler.cpp | 97 +++- server/src/TSServer.cpp | 21 +- .../client/ConnectedClientCommandHandler.cpp | 41 +- server/src/client/file/FileClient.cpp | 10 +- server/src/client/file/FileClient.h | 2 +- server/src/client/file/FileClientIO.cpp | 4 +- server/src/client/web/WebClient.cpp | 42 ++ server/src/client/web/WebClient.h | 13 +- server/src/manager/ConversationManager.cpp | 440 ++++++++++++++---- server/src/manager/ConversationManager.h | 11 +- server/src/server/QueryServer.cpp | 370 +++++++++++---- server/src/server/QueryServer.h | 26 +- server/src/server/file/FileServer.cpp | 299 +++++++++--- server/src/server/file/FileServer.h | 28 +- server/src/terminal/CommandHandler.cpp | 36 ++ server/src/terminal/CommandHandler.h | 1 + shared | 2 +- 18 files changed, 1138 insertions(+), 313 deletions(-) diff --git a/server/src/Configuration.cpp b/server/src/Configuration.cpp index 086fdf5..d63c9dd 100644 --- a/server/src/Configuration.cpp +++ b/server/src/Configuration.cpp @@ -963,8 +963,8 @@ std::deque> config::create_bindings() { } { CREATE_BINDING("host", 0); - BIND_STRING(config::binding::DefaultQueryHost, "0.0.0.0"); - ADD_NOTE("Multibinding like the voice server isnt supported yet!"); + BIND_STRING(config::binding::DefaultQueryHost, "0.0.0.0,[::]"); + ADD_NOTE("Multibinding supported here! Host delimiter is \",\""); } } { @@ -975,8 +975,8 @@ std::deque> config::create_bindings() { } { CREATE_BINDING("host", 0); - BIND_STRING(config::binding::DefaultFileHost, "0.0.0.0"); - ADD_NOTE("Multibinding like the voice server isnt supported yet!"); + BIND_STRING(config::binding::DefaultFileHost, "0.0.0.0,[::]"); + ADD_NOTE("Multibinding supported here! Host delimiter is \",\""); } } } diff --git a/server/src/InstanceHandler.cpp b/server/src/InstanceHandler.cpp index 8f36bd2..034ac06 100644 --- a/server/src/InstanceHandler.cpp +++ b/server/src/InstanceHandler.cpp @@ -212,6 +212,17 @@ InstanceHandler::~InstanceHandler() { tick_manager = nullptr; } +inline string strip(std::string message) { + while(!message.empty()) { + if(message[0] == ' ') + message = message.substr(1); + else if(message[message.length() - 1] == ' ') + message = message.substr(0, message.length() - 1); + else break; + } + return message; +} + inline sockaddr_in* resolveAddress(const string& host, uint16_t port) { hostent* record = gethostbyname(host.c_str()); if (!record) { @@ -225,12 +236,24 @@ inline sockaddr_in* resolveAddress(const string& host, uint16_t port) { return addr; } +inline vector split_hosts(const std::string& message, char delimiter) { + vector result; + size_t found, index = 0; + do { + found = message.find(delimiter, index); + result.push_back(strip(message.substr(index, found - index))); + index = found + 1; + } while(index != 0); + return result; +} + bool InstanceHandler::startInstance() { if (this->active) return false; active = true; this->web_list->enabled = ts::config::server::enable_teamspeak_weblist; + string errorMessage; this->sslMgr = new ssl::SSLManager(); if(!this->sslMgr->initialize()) { logCritical("Failed to initialize ssl manager."); @@ -243,21 +266,32 @@ bool InstanceHandler::startInstance() { return false; } - //Startup file server - sockaddr_in *fAddr = resolveAddress(this->properties()[property::SERVERINSTANCE_FILETRANSFER_HOST].as(), this->properties()[property::SERVERINSTANCE_FILETRANSFER_PORT].as()); - if (!fAddr) { - logCritical(LOG_FT, "Could not resolve file server host"); - return false; - } - logMessage(LOG_FT, "Starting server on {}:{}", inet_ntoa(fAddr->sin_addr), ntohs(fAddr->sin_port)); - fileServer = new ts::server::FileServer(); - if (!fileServer->start(*fAddr)) { - logCritical(LOG_FT, "Failed to start file server."); - delete fAddr; - return false; + { + auto bindings_string = this->properties()[property::SERVERINSTANCE_FILETRANSFER_HOST].as(); + auto port = this->properties()[property::SERVERINSTANCE_FILETRANSFER_PORT].as(); + auto ft_bindings = net::resolve_bindings(bindings_string, port); + deque> bindings; + + for(auto& binding : ft_bindings) { + if(!get<2>(binding).empty()) { + logError(LOG_FT, "Failed to resolve binding for {}: {}", get<0>(binding), get<2>(binding)); + continue; + } + auto entry = make_shared(); + memcpy(&entry->address, &get<1>(binding), sizeof(sockaddr_storage)); + + entry->file_descriptor = -1; + entry->event_accept = nullptr; + bindings.push_back(entry); + } + + logMessage(LOG_FT, "Starting server on {}:{}", bindings_string, port); + if(!fileServer->start(bindings, errorMessage)) { + logCritical(LOG_FT, "Failed to start server: {}", errorMessage); + return false; + } } - delete fAddr; if(config::query::sslMode > 0) { string error; @@ -271,7 +305,6 @@ bool InstanceHandler::startInstance() { } } - string errorMessage; queryServer = new ts::server::QueryServer(this->getSql()); { auto server_query = queryServer->find_query_account_by_name("serveradmin"); @@ -288,18 +321,32 @@ bool InstanceHandler::startInstance() { } } } - sockaddr_in *qAddr = resolveAddress(this->properties()[property::SERVERINSTANCE_QUERY_HOST].as(), this->properties()[property::SERVERINSTANCE_QUERY_PORT].as()); - if (!qAddr) { - logCritical(LOG_QUERY, "Could not resolve query server host"); - return false; + + { + auto query_bindings_string = this->properties()[property::SERVERINSTANCE_QUERY_HOST].as(); + auto query_port = this->properties()[property::SERVERINSTANCE_QUERY_PORT].as(); + auto query_bindings = net::resolve_bindings(query_bindings_string, query_port); + deque> bindings; + + for(auto& binding : query_bindings) { + if(!get<2>(binding).empty()) { + logError(LOG_QUERY, "Failed to resolve binding for {}: {}", get<0>(binding), get<2>(binding)); + continue; + } + auto entry = make_shared(); + memcpy(&entry->address, &get<1>(binding), sizeof(sockaddr_storage)); + + entry->file_descriptor = -1; + entry->event_accept = nullptr; + bindings.push_back(entry); + } + + logMessage(LOG_QUERY, "Starting server on {}:{}", query_bindings_string, query_port); + if(!queryServer->start(bindings, errorMessage)) { + logCritical(LOG_QUERY, "Failed to start query server: {}", errorMessage); + return false; + } } - logMessage(LOG_QUERY, "Starting server on {}:{}", inet_ntoa(qAddr->sin_addr), ntohs(qAddr->sin_port)); - if (!queryServer->start(*qAddr, errorMessage)) { - logCritical(LOG_QUERY, "Could not start Query server.\nMessage: " + errorMessage); - delete qAddr; - return false; - } - delete qAddr; #ifdef COMPILE_WEB_CLIENT if(config::web::activated) { diff --git a/server/src/TSServer.cpp b/server/src/TSServer.cpp index 6fd76ff..21ffcd2 100644 --- a/server/src/TSServer.cpp +++ b/server/src/TSServer.cpp @@ -11,9 +11,10 @@ #include #include "weblist/WebListManager.h" -#include "client/voice/VoiceClient.h" -#include "client/InternalClient.h" -#include "client/music/MusicClient.h" +#include "./client/web/WebClient.h" +#include "./client/voice/VoiceClient.h" +#include "./client/InternalClient.h" +#include "./client/music/MusicClient.h" #include "music/MusicBotManager.h" #include "server/VoiceServer.h" #include "server/file/FileServer.h" @@ -869,7 +870,8 @@ vector(data); }), server_group_data.end()); logTrace(this->serverId, "[Permission] Found negate flag within server groups. Groups left: {}", server_group_data.size()); - sassert(!server_group_data.empty()); /* this should never happen! */ + if(server_group_data.empty()) + logTrace(this->serverId, "[Permission] After non negated groups have been kicked out the negated groups are empty! This should not happen! Permission: {}, Client ID: {}", permission_type, client_dbid); permission::PermissionValue current_lowest = 0; for(auto& group : server_group_data) { if(!active_server_group || (std::get<3>(group) < current_lowest && std::get<3>(group) != -1)) { @@ -1035,9 +1037,14 @@ float TSServer::averagePing() { float sum = 0; this->forEachClient([&count, &sum](shared_ptr client) { - if(client->getType() != ClientType::CLIENT_TEAMSPEAK) return; - count++; - sum += duration_cast(dynamic_pointer_cast(client)->calculatePing()).count(); + auto type = client->getType(); + if(type == ClientType::CLIENT_TEAMSPEAK || type == ClientType::CLIENT_TEASPEAK) { + count++; + sum += duration_cast(dynamic_pointer_cast(client)->calculatePing()).count(); + } else if(type == ClientType::CLIENT_WEB) { + count++; + sum += duration_cast(dynamic_pointer_cast(client)->client_ping()).count(); + } }); if(count == 0) return 0; diff --git a/server/src/client/ConnectedClientCommandHandler.cpp b/server/src/client/ConnectedClientCommandHandler.cpp index 3329ac3..17ca6f0 100644 --- a/server/src/client/ConnectedClientCommandHandler.cpp +++ b/server/src/client/ConnectedClientCommandHandler.cpp @@ -3471,9 +3471,23 @@ CommandResult ConnectedClient::handleCommandFTInitUpload(Command &cmd) { Command result(this->getExternalType() == CLIENT_TEAMSPEAK ? "notifystartupload" : ""); result["clientftfid"] = cmd["clientftfid"].as(); result["ftkey"] = key->key; - result["port"] = ntohs(serverInstance->getFileServer()->boundedAddress()->sin_port); - if(serverInstance->getFileServer()->boundedAddress()->sin_addr.s_addr != 0) - result["ip"] = inet_ntoa(serverInstance->getFileServer()->boundedAddress()->sin_addr) + string(","); + + auto bindings = serverInstance->getFileServer()->list_bindings(); + if(!bindings.empty()) { + result["port"] = net::port(bindings[0]->address); + string ip = ""; + for(auto& entry : bindings) { + if(net::is_anybind(entry->address)) { + ip = ""; + break; + } + ip += net::to_string(entry->address, false) + ","; + } + if(!ip.empty()) + result["ip"] = ip; + } else { + return {findError("server_is_not_running"), "file server is not bound to any address"}; + } result["seekpos"] = 0; result["proto"] = 1; result["serverftfid"] = key->key_id; //TODO generate! @@ -3549,9 +3563,24 @@ CommandResult ConnectedClient::handleCommandFTInitDownload(Command &cmd) { result["proto"] = 1; result["serverftfid"] = key->key_id; result["ftkey"] = key->key; - result["port"] = ntohs(serverInstance->getFileServer()->boundedAddress()->sin_port); - if(serverInstance->getFileServer()->boundedAddress()->sin_addr.s_addr != 0) - result["ip"] = inet_ntoa(serverInstance->getFileServer()->boundedAddress()->sin_addr) + string(","); + + auto bindings = serverInstance->getFileServer()->list_bindings(); + if(!bindings.empty()) { + result["port"] = net::port(bindings[0]->address); + string ip = ""; + for(auto& entry : bindings) { + if(net::is_anybind(entry->address)) { + ip = ""; + break; + } + ip += net::to_string(entry->address, false) + ","; + } + if(!ip.empty()) + result["ip"] = ip; + } else { + return {findError("server_is_not_running"), "file server is not bound to any address"}; + } + result["size"] = key->size; this->sendCommand(result); diff --git a/server/src/client/file/FileClient.cpp b/server/src/client/file/FileClient.cpp index af03c56..c6659b6 100644 --- a/server/src/client/file/FileClient.cpp +++ b/server/src/client/file/FileClient.cpp @@ -101,7 +101,6 @@ size_t FileClient::used_bandwidth() { } std::string FileClient::client_prefix() { - auto ip_address = net::to_string(this->remoteAddress.sin_addr); bool hide_ip = config::server::disable_ip_saving; if(!hide_ip) { auto client = this->client; @@ -112,10 +111,13 @@ std::string FileClient::client_prefix() { } } } + std::string address = ""; if(hide_ip) - ip_address = "X.X.X.X"; - if(this->client) return "[" + to_string(this->client->getServerId()) + "|" + ip_address + ":" + to_string(htons(this->remoteAddress.sin_port)) + "| " + this->client->getDisplayName() + "]"; - return "[0|" + ip_address + ":" + to_string(htons(this->remoteAddress.sin_port)) + "|unconnected]"; + address = "X.X.X.X:" + to_string(net::port(this->remote_address)); + else + address = net::to_string(this->remote_address); + if(this->client) return "[" + to_string(this->client->getServerId()) + "|" + address + "| " + this->client->getDisplayName() + "]"; + return "[0|" + address + "|unconnected]"; } size_t FileClient::transferred_bytes() { diff --git a/server/src/client/file/FileClient.h b/server/src/client/file/FileClient.h index d2025b4..7b44893 100644 --- a/server/src/client/file/FileClient.h +++ b/server/src/client/file/FileClient.h @@ -89,7 +89,7 @@ namespace ts { std::recursive_mutex bandwidth_lock; std::deque> bandwidth; - sockaddr_in remoteAddress; + sockaddr_storage remote_address; int clientFd; bool event_read_hold = false; diff --git a/server/src/client/file/FileClientIO.cpp b/server/src/client/file/FileClientIO.cpp index 8700b36..9505dd8 100644 --- a/server/src/client/file/FileClientIO.cpp +++ b/server/src/client/file/FileClientIO.cpp @@ -166,9 +166,9 @@ void FileClient::handleMessageRead(int fd, short, void *) { } if(this->state_connection == C_CONNECTED) { if(this->state_transfer == T_TRANSFER) - logError(LOG_FT, "{} Transfer hang up. Remote peer closed the connection.", this->client_prefix()); + logWarning(LOG_FT, "{} Transfer hang up. Remote peer closed the connection.", this->client_prefix()); else - logTrace(LOG_FT, "{} Received notification that the remote peer has closed the connection", this->client_prefix()); + logMessage(LOG_FT, "{} Remote peer has closed the connection before initializing a transfer.", this->client_prefix()); self->disconnect(seconds(3)); } return; diff --git a/server/src/client/web/WebClient.cpp b/server/src/client/web/WebClient.cpp index a26292e..c7018ad 100644 --- a/server/src/client/web/WebClient.cpp +++ b/server/src/client/web/WebClient.cpp @@ -262,6 +262,18 @@ void WebClient::tick(const std::chrono::system_clock::time_point& point) { this->ws_handler.send({pipes::PING, {buffer, 2}}); } } + if(this->js_ping.last_request + seconds(1) < point) { + if(this->js_ping.last_response > this->js_ping.last_request || this->js_ping.last_request + this->js_ping.timeout < point) { + this->js_ping.current_id++; + this->js_ping.last_request = point; + + Json::Value jsonCandidate; + jsonCandidate["type"] = "ping"; + jsonCandidate["payload"] = to_string(this->js_ping.current_id); + + this->sendJson(jsonCandidate); + } + } } void WebClient::onWSConnected() { @@ -554,6 +566,36 @@ void WebClient::handleMessage(const std::string &message) { } this->voice_bridge->remote_ice_finished(); } + } else if(val["type"].asString() == "ping") { + Json::Value response; + response["type"] = "pong"; + response["payload"] = val["payload"]; + response["ping_native"] = to_string(duration_cast(this->ping.value).count()); + this->sendJson(response); + return; + } else if(val["type"].asString() == "pong") { + auto payload = val["payload"].isString() ? val["payload"].asString() : ""; + uint8_t response_id = 0; + try { + response_id = (uint8_t) stoul(payload); + } catch(std::exception& ex) { + debugMessage(this->getServerId(), "[{}] Failed to parse pong payload."); + return; + } + + if(response_id != this->js_ping.current_id) { + debugMessage( + this->getServerId(), + "{} Received pong on web socket from javascript which is older than the last request. Delay may over {}ms? (Index: {}, Current index: {})", + CLIENT_STR_LOG_PREFIX, + duration_cast(this->js_ping.timeout).count(), + response_id, + this->js_ping.current_id + ); + return; + } + this->js_ping.last_response = system_clock::now(); + this->js_ping.value = duration_cast(this->js_ping.last_response - this->js_ping.last_request); } } catch (const std::exception& ex) { logError(this->server->getServerId(), "Could not handle json packet! Message {}", ex.what()); diff --git a/server/src/client/web/WebClient.h b/server/src/client/web/WebClient.h index 3058654..1c20ccf 100644 --- a/server/src/client/web/WebClient.h +++ b/server/src/client/web/WebClient.h @@ -32,6 +32,9 @@ namespace ts { bool shouldReceiveVoice(const std::shared_ptr &sender) override; + inline std::chrono::nanoseconds client_ping() { return this->client_ping_layer_7(); } + inline std::chrono::nanoseconds client_ping_layer_5() { return this->ping.value; } + inline std::chrono::nanoseconds client_ping_layer_7() { return this->js_ping.value; } protected: void handlePacketVoiceWhisper(const pipes::buffer_view &string, bool) override; @@ -60,6 +63,15 @@ namespace ts { std::chrono::nanoseconds timeout{2000}; } ping; + struct { + uint8_t current_id = 0; + std::chrono::system_clock::time_point last_request; + std::chrono::system_clock::time_point last_response; + + std::chrono::nanoseconds value; + std::chrono::nanoseconds timeout{2000}; + } js_ping; + std::mutex queue_lock; std::deque queue_read; std::deque queue_write; @@ -93,7 +105,6 @@ namespace ts { public: void send_voice_packet(const pipes::buffer_view &view, const VoicePacketFlags &flags) override; - void send_voice_whisper_packet(const pipes::buffer_view &view, const VoicePacketFlags &flags) override; protected: diff --git a/server/src/manager/ConversationManager.cpp b/server/src/manager/ConversationManager.cpp index e4e6e6f..fe4f8b0 100644 --- a/server/src/manager/ConversationManager.cpp +++ b/server/src/manager/ConversationManager.cpp @@ -18,6 +18,203 @@ using namespace ts::server::conversation; namespace fs = std::experimental::filesystem; +/* Using const O3 to improve unreadability */ +#if 0 +/* +/* Debug */ +0x555555c542a2 push rbp +0x555555c542a3 mov rbp,rsp +0x555555c542a6 mov QWORD PTR [rbp-0x28],rdi +0x555555c542aa mov QWORD PTR [rbp-0x30],rsi +0x555555c542ae mov QWORD PTR [rbp-0x38],rdx +0x555555c542b2 mov QWORD PTR [rbp-0x40],rcx +0x555555c542b6 mov rax,QWORD PTR [rbp-0x40] +0x555555c542ba mov QWORD PTR [rbp-0x20],rax +0x555555c542be mov rax,QWORD PTR [rbp-0x38] +0x555555c542c2 mov QWORD PTR [rbp-0x18],rax +0x555555c542c6 mov rax,QWORD PTR [rbp-0x28] +0x555555c542ca mov QWORD PTR [rbp-0x10],rax +0x555555c542ce mov rax,QWORD PTR [rbp-0x30] +0x555555c542d2 mov QWORD PTR [rbp-0x8],rax + +/* first loop */ +0x555555c542d6 cmp QWORD PTR [rbp-0x18],0x7 +0x555555c542db jbe 0x555555c5431e +0x555555c542dd mov rax,QWORD PTR [rbp-0x18] +0x555555c542e1 and eax,0x7 +0x555555c542e4 mov rdx,QWORD PTR [rbp-0x20] +0x555555c542e8 mov ecx,eax +0x555555c542ea shl rdx,cl +0x555555c542ed mov rax,rdx +0x555555c542f0 xor rax,QWORD PTR [rbp-0x18] +0x555555c542f4 xor QWORD PTR [rbp-0x20],rax +0x555555c542f8 mov rax,QWORD PTR [rbp-0x10] +0x555555c542fc mov rax,QWORD PTR [rax] +0x555555c542ff xor rax,QWORD PTR [rbp-0x20] +0x555555c54303 mov rdx,rax +0x555555c54306 mov rax,QWORD PTR [rbp-0x8] +0x555555c5430a mov QWORD PTR [rax],rdx +0x555555c5430d add QWORD PTR [rbp-0x8],0x8 +0x555555c54312 add QWORD PTR [rbp-0x10],0x8 +0x555555c54317 sub QWORD PTR [rbp-0x18],0x8 +0x555555c5431c jmp 0x555555c542d6 /* first loop */ + +/* Second loop */ +0x555555c5431e cmp QWORD PTR [rbp-0x18],0x0 +0x555555c54323 je 0x555555c54364 +0x555555c54325 mov rax,QWORD PTR [rbp-0x18] +0x555555c54329 and eax,0x7 +0x555555c5432c mov rdx,QWORD PTR [rbp-0x20] +0x555555c54330 mov ecx,eax +0x555555c54332 shl rdx,cl +0x555555c54335 mov rax,rdx +0x555555c54338 xor rax,QWORD PTR [rbp-0x18] +0x555555c5433c xor QWORD PTR [rbp-0x20],rax +0x555555c54340 mov rax,QWORD PTR [rbp-0x10] +0x555555c54344 movzx edx,BYTE PTR [rax] +0x555555c54347 mov rax,QWORD PTR [rbp-0x20] +0x555555c5434b xor edx,eax +0x555555c5434d mov rax,QWORD PTR [rbp-0x8] +0x555555c54351 mov BYTE PTR [rax],dl +0x555555c54353 sub QWORD PTR [rbp-0x18],0x1 +0x555555c54358 add QWORD PTR [rbp-0x10],0x1 +0x555555c5435d add QWORD PTR [rbp-0x8],0x1 +0x555555c54362 jmp 0x555555c5431e /* second loop */ +0x555555c54364 nop +0x555555c54365 pop rbp +0x555555c54366 ret + + +/* O3 */ +0x555555c41d8f push rbp +0x555555c41d90 cmp rdx,0x7 +0x555555c41d94 mov r8,rcx +0x555555c41d97 mov rbp,rsp +0x555555c41d9a push r14 +0x555555c41d9c push rbx +0x555555c41d9d jbe 0x555555c41df8 +0x555555c41d9f lea rbx,[rdx-0x8] +0x555555c41da3 mov r10,rsi +0x555555c41da6 mov r9,rdi +0x555555c41da9 mov rax,rdx +0x555555c41dac mov r11,rbx +0x555555c41daf and r11d,0x7 +0x555555c41db3 mov ecx,eax +0x555555c41db5 mov r14,r8 +0x555555c41db8 add r10,0x8 +0x555555c41dbc and ecx,0x7 +0x555555c41dbf add r9,0x8 +0x555555c41dc3 shl r14,cl +0x555555c41dc6 mov rcx,r14 +0x555555c41dc9 xor rcx,rax +0x555555c41dcc sub rax,0x8 +0x555555c41dd0 xor r8,rcx +0x555555c41dd3 mov rcx,QWORD PTR [r9-0x8] +0x555555c41dd7 xor rcx,r8 +0x555555c41dda mov QWORD PTR [r10-0x8],rcx +0x555555c41dde cmp rax,r11 +0x555555c41de1 jne 0x555555c41db3 +0x555555c41de3 shr rbx,0x3 +0x555555c41de7 and edx,0x7 +0x555555c41dea lea rax,[rbx*8+0x8] +0x555555c41df2 add rdi,rax +0x555555c41df5 add rsi,rax +0x555555c41df8 test rdx,rdx +0x555555c41dfb je 0x555555c41ed3 +0x555555c41e01 mov rax,r8 +0x555555c41e04 mov ecx,edx +0x555555c41e06 xor r8,rdx +0x555555c41e09 shl rax,cl +0x555555c41e0c mov rcx,rdx +0x555555c41e0f xor r8,rax +0x555555c41e12 movzx eax,BYTE PTR [rdi] +0x555555c41e15 xor eax,r8d +0x555555c41e18 sub rcx,0x1 +0x555555c41e1c mov BYTE PTR [rsi],al +0x555555c41e1e je 0x555555c41ed3 +0x555555c41e24 mov rax,r8 +0x555555c41e27 xor r8,rcx +0x555555c41e2a shl rax,cl +0x555555c41e2d mov rcx,rdx +0x555555c41e30 xor r8,rax +0x555555c41e33 movzx eax,BYTE PTR [rdi+0x1] +0x555555c41e37 xor eax,r8d +0x555555c41e3a sub rcx,0x2 +0x555555c41e3e mov BYTE PTR [rsi+0x1],al +0x555555c41e41 je 0x555555c41ed3 +0x555555c41e47 mov rax,r8 +0x555555c41e4a xor r8,rcx +0x555555c41e4d shl rax,cl +0x555555c41e50 mov rcx,rdx +0x555555c41e53 xor r8,rax +0x555555c41e56 movzx eax,BYTE PTR [rdi+0x2] +0x555555c41e5a xor eax,r8d +0x555555c41e5d sub rcx,0x3 +0x555555c41e61 mov BYTE PTR [rsi+0x2],al +0x555555c41e64 je 0x555555c41ed3 +0x555555c41e66 mov rax,r8 +0x555555c41e69 xor r8,rcx +0x555555c41e6c shl rax,cl +0x555555c41e6f mov rcx,rdx +0x555555c41e72 xor r8,rax +0x555555c41e75 movzx eax,BYTE PTR [rdi+0x3] +0x555555c41e79 xor eax,r8d +0x555555c41e7c sub rcx,0x4 +0x555555c41e80 mov BYTE PTR [rsi+0x3],al +0x555555c41e83 je 0x555555c41ed3 +0x555555c41e85 mov rax,r8 +0x555555c41e88 xor r8,rcx +0x555555c41e8b shl rax,cl +0x555555c41e8e mov rcx,rdx +0x555555c41e91 xor r8,rax +0x555555c41e94 movzx eax,BYTE PTR [rdi+0x4] +0x555555c41e98 xor eax,r8d +0x555555c41e9b sub rcx,0x5 +0x555555c41e9f mov BYTE PTR [rsi+0x4],al +0x555555c41ea2 je 0x555555c41ed3 +0x555555c41ea4 mov rax,r8 +0x555555c41ea7 xor r8,rcx +0x555555c41eaa shl rax,cl +0x555555c41ead xor r8,rax +0x555555c41eb0 movzx eax,BYTE PTR [rdi+0x5] +0x555555c41eb4 xor eax,r8d +0x555555c41eb7 cmp rdx,0x6 +0x555555c41ebb mov BYTE PTR [rsi+0x5],al +0x555555c41ebe je 0x555555c41ed3 +0x555555c41ec0 lea rax,[r8+r8*1] +0x555555c41ec4 xor r8,0x1 +0x555555c41ec8 xor r8,rax +0x555555c41ecb xor r8b,BYTE PTR [rdi+0x6] +0x555555c41ecf mov BYTE PTR [rsi+0x6],r8b +0x555555c41ed3 pop rbx +0x555555c41ed4 pop r14 +0x555555c41ed6 pop rbp +0x555555c41ed7 ret + */ +#endif +__attribute__((optimize("-O3"), always_inline)) void apply_crypt(void* source, void* target, size_t length, uint64_t base_key) { + uint64_t crypt_key = base_key; + size_t length_left = length; + auto source_ptr = (uint8_t*) source; + auto dest_ptr = (uint8_t*) target; + + while(length_left >= 8) { + crypt_key ^= (crypt_key << (length_left & 0x7U)) ^ length_left; + *(uint64_t*) dest_ptr = *(uint64_t*) source_ptr ^ crypt_key; + + dest_ptr += 8; + source_ptr += 8; + length_left -= 8; + } + while(length_left > 0) { + crypt_key ^= (crypt_key << (length_left & 0x7U)) ^ length_left; + *dest_ptr = *source_ptr ^ (uint8_t) crypt_key; + + length_left--; + source_ptr++; + dest_ptr++; + } +} Conversation::Conversation(const std::shared_ptr &handle, ts::ChannelId channel_id, const std::string& file) : _ref_handle(handle), _channel_id(channel_id), file_name(file) { } Conversation::~Conversation() { @@ -50,6 +247,7 @@ bool Conversation::initialize(std::string& error) { this->file_handle = fopen(this->file_name.c_str(), fs::exists(file) ? "r+" : "w+"); if(!this->file_handle) { + this->_volatile = true; error = "failed to open file"; return false; } @@ -197,6 +395,11 @@ bool Conversation::initialize(std::string& error) { else this->_last_message_timestamp = system_clock::time_point{}; } + /* close the file handle because we've passed our checks */ + { + fclose(this->file_handle); + this->file_handle = nullptr; + } return true; } @@ -214,14 +417,59 @@ void Conversation::finalize() { } void Conversation::cleanup_cache() { - //FIXME: Implement this shit here! + auto ref_handle = this->ref_handle(); + if(!ref_handle) + return; + auto ref_server = ref_handle->ref_server(); + if(!ref_server) + return; + + { + lock_guard block(this->message_block_lock); + for(auto& block : this->message_blocks) { + block->block_header = nullptr; + block->indexed_block = nullptr; + } + } + { + lock_guard file_lock(this->file_handle_lock); + if(this->last_access + minutes(5) < system_clock::now()) { + if(this->file_handle) { + fclose(this->file_handle); + this->file_handle = nullptr; + debugMessage(ref_server->getServerId(), "[Conversations][{}] Closing file handle due to inactivity.", this->_channel_id); + } + } + } } -ssize_t Conversation::fread(void *target, size_t length, ssize_t index) { +bool Conversation::setup_file() { + this->file_handle = fopen(this->file_name.c_str(), fs::exists(this->file_name) ? "r+" : "w+"); + if(!this->file_handle) { + auto ref_handle = this->ref_handle(); + if(!ref_handle) + return false; + auto ref_server = ref_handle->ref_server(); + if(!ref_server) + return false; + + logError(ref_server->getServerId(), "[Conversations][{}] Failed to open closed file handle. ({} | {})", errno, strerror(errno)); + return false; + } + setbuf(this->file_handle, nullptr); /* we're doing random access (a buffer is useless here) */ + return true; +} + +ssize_t Conversation::fread(void *target, size_t length, ssize_t index, bool acquire_lock) { if(length == 0) return 0; - lock_guard file_lock(this->file_handle_lock); + unique_lock file_lock(this->file_handle_lock, defer_lock); + if(acquire_lock) + file_lock.lock(); + this->last_access = system_clock::now(); + if(!this->file_handle && !this->setup_file()) + return -3; if(index >= 0) { auto result = fseek(this->file_handle, index, SEEK_SET); if(result < 0) @@ -238,12 +486,17 @@ ssize_t Conversation::fread(void *target, size_t length, ssize_t index) { return total_read; } -ssize_t Conversation::fwrite(void *target, size_t length, ssize_t index, bool extend_file) { +ssize_t Conversation::fwrite(void *target, size_t length, ssize_t index, bool extend_file, bool acquire_lock) { if(length == 0) return 0; + unique_lock file_lock(this->file_handle_lock, defer_lock); + if(acquire_lock) + file_lock.lock(); extend_file = false; /* fseek does the job good ad well */ - lock_guard file_lock(this->file_handle_lock); + if(!this->file_handle && !this->setup_file()) + return -3; + this->last_access = system_clock::now(); if(index >= 0) { auto result = extend_file ? lseek(fileno(this->file_handle), index, SEEK_SET) : fseek(this->file_handle, index, SEEK_SET); if(result < 0) @@ -265,7 +518,7 @@ bool Conversation::load_message_block_header(const std::shared_ptr(); - if(this->fread(&*block_header, sizeof(*block_header), block->block_offset) != sizeof(*block_header)) { + if(this->fread(&*block_header, sizeof(*block_header), block->block_offset, true) != sizeof(*block_header)) { error = "failed to read block header"; return false; } @@ -311,7 +564,7 @@ bool Conversation::load_message_block_index(const std::shared_ptrfread(&header, sizeof(header), offset) != sizeof(header)) { + if(this->fread(&header, sizeof(header), offset, true) != sizeof(header)) { error = "failed to read message header at index" + to_string(offset); return false; } @@ -344,6 +597,12 @@ bool Conversation::load_messages(const std::shared_ptr &block, if(index >= indexed_block->message_index.size()) return true; + + unique_lock file_lock(this->file_handle_lock); + if(!this->file_handle && !this->setup_file()) { + error = "failed to open file handle"; + return false; + } auto result = fseek(this->file_handle, block->block_offset + indexed_block->message_index[index].offset, SEEK_SET); if(result == EINVAL) { error = "failed to seek to begin of an indexed block read"; @@ -357,7 +616,7 @@ bool Conversation::load_messages(const std::shared_ptr &block, } auto data = make_shared(); - if(this->fread(&data->header, sizeof(data->header), -1) != sizeof(data->header)) { + if(this->fread(&data->header, sizeof(data->header), -1, false) != sizeof(data->header)) { error = "failed to read message header at index " + to_string(index); return false; } @@ -367,42 +626,44 @@ bool Conversation::load_messages(const std::shared_ptr &block, return false; } - data->sender_unique_id.resize(data->header.sender_unique_id_length); - data->sender_name.resize(data->header.sender_name_length); + if(header->meta_encrypted) { + auto meta_size = data->header.sender_unique_id_length + data->header.sender_name_length; + auto meta_buffer = malloc(meta_size); + + if(this->fread(meta_buffer, meta_size, -1, false) != meta_size) { + error = "failed to read message metadata at " + to_string(index); + free(meta_buffer); + return false; + } + + apply_crypt(meta_buffer, meta_buffer, meta_size, (block->block_offset ^ data->header.message_timestamp) ^ 0x6675636b20796f75ULL); /* 0x6675636b20796f75 := 'fuck you' */ + + data->sender_unique_id.assign((char*) meta_buffer, data->header.sender_unique_id_length); + data->sender_name.assign((char*) meta_buffer + data->header.sender_unique_id_length, data->header.sender_name_length); + free(meta_buffer); + } else { + data->sender_unique_id.resize(data->header.sender_unique_id_length); + data->sender_name.resize(data->header.sender_name_length); + + if(this->fread(data->sender_unique_id.data(), data->sender_unique_id.length(), -1, false) != data->sender_unique_id.length()) { + error = "failed to read message sender unique id at " + to_string(index); + return false; + } + + if(this->fread(data->sender_name.data(), data->sender_name.length(), -1, false) != data->sender_name.length()) { + error = "failed to read message sender name id at " + to_string(index); + return false; + } + } + data->message.resize(data->header.message_length); - - if(this->fread(data->sender_unique_id.data(), data->sender_unique_id.length(), -1) != data->sender_unique_id.length()) { - error = "failed to read message sender unique id at " + to_string(index); - return false; - } - - if(this->fread(data->sender_name.data(), data->sender_name.length(), -1) != data->sender_name.length()) { - error = "failed to read message sender name id at " + to_string(index); - return false; - } - - if(this->fread(data->message.data(), data->message.length(), -1) != data->message.length()) { + if(this->fread(data->message.data(), data->message.length(), -1, false) != data->message.length()) { error = "failed to read message id at " + to_string(index); return false; } - if(header->message_encrypted) { - uint64_t crypt_key = block->block_offset ^ data->header.message_timestamp; - size_t length_left = data->message.size(); - auto ptr = (char*) data->message.data(); - while(length_left >= 8) { - crypt_key ^= (crypt_key << (length_left & 0x7)) ^ length_left; - *(uint64_t*) ptr ^= crypt_key; - ptr += 8; - length_left -= 8; - } - while(length_left > 0) { - crypt_key ^= (crypt_key << (length_left & 0x7)) ^ length_left; - *ptr ^= (uint8_t) crypt_key; - length_left--; - ptr++; - } - } + if(header->message_encrypted) + apply_crypt(data->message.data(), data->message.data(), data->message.size(), block->block_offset ^ data->header.message_timestamp); message_data.message_data = data; index++; @@ -452,7 +713,7 @@ void Conversation::finish_block(const std::shared_ptr &header, size_t index, std::string &error) { - auto code = this->fwrite(&*header, sizeof(fio::BlockHeader), index, false); + auto code = this->fwrite(&*header, sizeof(fio::BlockHeader), index, false, true); if(code == sizeof(fio::BlockHeader)) return true; error = "write returned " + to_string(code); @@ -527,6 +788,10 @@ void Conversation::process_write_queue(const std::chrono::system_clock::time_poi //TODO: Find "free" blocks and use them! (But do not use indirectly finished blocks, their max size could be invalid) unique_lock file_lock(this->file_handle_lock); + if(!this->file_handle && !this->setup_file()) { + logError(ref_server->getServerId(), "[Conversations][{}] Failed to reopen log file. Dropping message!", this->_channel_id); + return; + } auto result = fseek(this->file_handle, 0, SEEK_END); if(result != 0) { logError(ref_server->getServerId(), "[Conversations][{}] failed to seek to the end (" + to_string(result) + " " + to_string(errno) + "). Could not create new block. Dropping message!", this->_channel_id); @@ -553,7 +818,8 @@ void Conversation::process_write_queue(const std::chrono::system_clock::time_poi block_header->first_message_timestamp = (uint64_t) duration_cast(write_entry->message_timestamp.time_since_epoch()).count(); block_header->block_size = sizeof(fio::BlockHeader); - //block_header->message_encrypted = true; /* May add some kind of hidden debug option? */ + block_header->message_encrypted = true; /* May add some kind of hidden debug option? */ + block_header->meta_encrypted = true; /* May add some kind of hidden debug option? */ this->last_message_block->block_header = block_header; } @@ -563,61 +829,52 @@ void Conversation::process_write_queue(const std::chrono::system_clock::time_poi block_header->last_message_timestamp = write_header.message_timestamp; /* first write the header */ - if(this->fwrite(&write_header, sizeof(write_header), entry_offset, true) != sizeof(write_header)) { + if(this->fwrite(&write_header, sizeof(write_header), entry_offset, true, true) != sizeof(write_header)) { logError(ref_server->getServerId(), "[Conversations][{}] Failed to write message header. Dropping message!", this->_channel_id); return; } entry_offset += sizeof(write_header); - /* then write the sender unique id */ - if(this->fwrite(write_entry->sender_unique_id.data(), write_header.sender_unique_id_length, entry_offset, true) != write_header.sender_unique_id_length) { - logError(ref_server->getServerId(), "[Conversations][{}] Failed to write message sender unique id. Dropping message!", this->_channel_id); - return; - } - entry_offset += write_header.sender_unique_id_length; + /* write the metadata */ + { + auto write_buffer_size = write_header.sender_unique_id_length + write_header.sender_name_length; + auto write_buffer = malloc(write_buffer_size); - /* then write the sender name */ - if(this->fwrite(write_entry->sender_name.data(), write_header.sender_name_length, entry_offset, true) != write_header.sender_name_length) { - logError(ref_server->getServerId(), "[Conversations][{}] Failed to write message sender name. Dropping message!", this->_channel_id); - return; + memcpy(write_buffer, write_entry->sender_unique_id.data(), write_header.sender_unique_id_length); + memcpy((char*) write_buffer + write_header.sender_unique_id_length, write_entry->sender_name.data(), write_header.sender_name_length); + + if(block_header->meta_encrypted) + apply_crypt(write_buffer, write_buffer, write_buffer_size, (this->last_message_block->block_offset ^ write_header.message_timestamp) ^ 0x6675636b20796f75ULL); /* 0x6675636b20796f75 := 'fuck you' */ + + /* then write the sender unique id */ + if(this->fwrite(write_buffer, write_buffer_size, entry_offset, true, true) != write_buffer_size) { + logError(ref_server->getServerId(), "[Conversations][{}] Failed to write message header. Dropping message!", this->_channel_id); + free(write_buffer); + return; + } + free(write_buffer); + entry_offset += write_buffer_size; } - entry_offset += write_header.sender_name_length; /* then write the message */ - bool message_result; - if(block_header->message_encrypted) { - uint64_t crypt_key = this->last_message_block->block_offset ^ write_header.message_timestamp; - size_t length_left = write_entry->message.size(); - auto ptr = (char*) write_entry->message.data(); - char* target_buffer = (char*) malloc(length_left); - char* target_buffer_ptr = target_buffer; - assert(target_buffer); + { + bool message_result; + if(block_header->message_encrypted) { + size_t length = write_entry->message.size(); + char* target_buffer = (char*) malloc(length); + apply_crypt(write_entry->message.data(), target_buffer, length, this->last_message_block->block_offset ^ write_header.message_timestamp); - while(length_left >= 8) { - crypt_key ^= (crypt_key << (length_left & 0x7)) ^ length_left; - *(uint64_t*) target_buffer_ptr = crypt_key; - ptr += 8; - target_buffer_ptr += 8; - length_left -= 8; + message_result = this->fwrite(target_buffer, write_header.message_length, entry_offset, true, true) == write_header.message_length; + free(target_buffer); + } else { + message_result = this->fwrite(write_entry->message.data(), write_header.message_length, entry_offset, true, true) == write_header.message_length; } - while(length_left > 0) { - crypt_key ^= (crypt_key << (length_left & 0x7)) ^ length_left; - *target_buffer_ptr = *ptr ^ (uint8_t) crypt_key; - length_left--; - ptr++; - target_buffer_ptr++; + if(!message_result) { + logError(ref_server->getServerId(), "[Conversations][{}] Failed to write message. Dropping message!", this->_channel_id); + return; } - - message_result = this->fwrite(target_buffer, write_header.message_length, entry_offset, true) == write_header.message_length; - free(target_buffer); - } else { - message_result = this->fwrite(write_entry->message.data(), write_header.message_length, entry_offset, true) == write_header.message_length; + entry_offset += write_header.message_length; } - if(!message_result) { - logError(ref_server->getServerId(), "[Conversations][{}] Failed to write message. Dropping message!", this->_channel_id); - return; - } - entry_offset += write_header.message_length; block_header->last_message_offset = (uint32_t) (entry_offset - this->last_message_block->block_offset - sizeof(fio::BlockHeader)); block_header->block_size += write_header.total_length; @@ -751,6 +1008,14 @@ std::deque> Conversation::message_history(con } if(!this->volatile_only()) { + auto handle = this->_ref_handle.lock(); + if(!handle) + return result; + + auto ref_server = handle->ref_server(); + if(!ref_server) + return result; + auto timestamp = result.empty() ? end_timestamp : result.back()->message_timestamp; unique_lock lock(this->message_block_lock); @@ -776,12 +1041,12 @@ std::deque> Conversation::message_history(con auto block = *_rit; /* lets search for messages */ if(!this->load_message_block_index(block, error)) { - //TODO: Log error + logWarning(ref_server->getServerId(), "[Conversations][{}] Failed to load message block {} for message lookup: {}", this->_channel_id, block->block_offset, error); continue; } auto index = (*_rit)->indexed_block; if(!index) { - //TODO Log error + logWarning(ref_server->getServerId(), "[Conversations][{}] Failed to reference indexed block within message block.", this->_channel_id); continue; } @@ -803,7 +1068,7 @@ std::deque> Conversation::message_history(con if(!this->load_messages(block, 0, std::distance(index->message_index.begin(), rmid) + 1, error)) { - //TODO: Log error + logWarning(ref_server->getServerId(), "[Conversations][{}] Failed to load messages within block {} for message lookup: {}", this->_channel_id, block->block_offset, error); continue; } do { @@ -813,6 +1078,8 @@ std::deque> Conversation::message_history(con if(begin_timestamp.time_since_epoch().count() != 0 && rmid->timestamp < begin_timestamp) return result; + if(rmid->timestamp >= timestamp) + continue; /* for some reason we got a message from the index of before where we are. This could happen for "orphaned" blocks which point to a valid block within the future block */ /* std::chrono::system_clock::time_point message_timestamp; @@ -831,6 +1098,7 @@ std::deque> Conversation::message_history(con data->message })); + timestamp = rmid->timestamp; if(--message_count == 0) return result; } while(rmid-- != index->message_index.begin()); diff --git a/server/src/manager/ConversationManager.h b/server/src/manager/ConversationManager.h index d056ad1..7f0e385 100644 --- a/server/src/manager/ConversationManager.h +++ b/server/src/manager/ConversationManager.h @@ -143,7 +143,7 @@ namespace ts { inline ChannelId channel_id() { return this->_channel_id; } /* if for some reason we're not able to open the file then we're in volatile mode */ - inline bool volatile_only() { return !this->file_handle; } + inline bool volatile_only() { return this->_volatile; } void cleanup_cache(); //void set_history_length(ssize_t /* save length */); @@ -167,8 +167,10 @@ namespace ts { ts_always_inline std::shared_ptr ref_handle() { return this->_ref_handle.lock(); } - inline ssize_t fread(void* target, size_t length, ssize_t index); - inline ssize_t fwrite(void* target, size_t length, ssize_t index, bool extend_file); + + inline bool setup_file(); + inline ssize_t fread(void* target, size_t length, ssize_t index, bool acquire_handle); + inline ssize_t fwrite(void* target, size_t length, ssize_t index, bool extend_file, bool acquire_handle); /* block db functions */ void db_save_block(const std::shared_ptr& /* block */); @@ -204,9 +206,12 @@ namespace ts { /* basic file stuff */ std::string file_name; std::mutex file_handle_lock; + std::chrono::system_clock::time_point last_access; FILE* file_handle = nullptr; ChannelId _channel_id; + bool _volatile = false; + std::chrono::system_clock::time_point _last_message_timestamp; }; diff --git a/server/src/server/QueryServer.cpp b/server/src/server/QueryServer.cpp index c29831a..02e41c5 100644 --- a/server/src/server/QueryServer.cpp +++ b/server/src/server/QueryServer.cpp @@ -36,12 +36,12 @@ QueryServer::~QueryServer() { void QueryServer::unregisterConnection(const shared_ptr &client) { { - threads::MutexLock lock(this->clientLock); + lock_guard lock(this->connected_clients_lock); auto found = std::find(this->connectedClients.begin(), this->connectedClients.end(), client); if(found != this->connectedClients.end()) this->connectedClients.erase(found); else - logError(LOG_QUERY, "Attempted to unregister an invalid query connection!"); + logError(LOG_QUERY, "Attempted to unregister an invalid connection!"); } if(client->server) { @@ -52,80 +52,111 @@ void QueryServer::unregisterConnection(const shared_ptr &client) { /* client->handle = nullptr; */ } -bool QueryServer::start(const sockaddr_in& localAdress, std::string& errorMessage) { - if(this->running()) return false; - this->active = true; - boundAddress = new sockaddr_in; - memcpy(boundAddress, &localAdress, sizeof(localAdress)); +bool QueryServer::start(const deque> &bindings, std::string &error) { + if(this->active) { + error = "already started"; + return false; + } + this->active = true; - ip_blacklist.reset(new IpListManager("query_ip_blacklist.txt", {"#A new line separated address blacklist", "#", "#For example if we dont want google:", "8.8.8.8"})); - ip_whitelist.reset(new IpListManager("query_ip_whitelist.txt", {"#A new line separated address whitelist", "#Every ip have no flood and login attempt limit!", "127.0.0.1/8", "::1"})); - string error; - if(!this->ip_blacklist->reload(error)) logError(LOG_QUERY, "Failed to load query blacklist: {}", error); - if(!this->ip_whitelist->reload(error)) logError(LOG_QUERY, "Failed to load query whitelist: {}", error); - this->serverSocket = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0); - if (serverSocket < 0) { - logCritical("Cant create server socket for file server"); - return false; - } + /* load ip black/whitelist */ + { + ip_blacklist.reset(new IpListManager("query_ip_blacklist.txt", {"#A new line separated address blacklist", "#", "#For example if we dont want google:", "8.8.8.8"})); + ip_whitelist.reset(new IpListManager("query_ip_whitelist.txt", {"#A new line separated address whitelist", "#Every ip have no flood and login attempt limit!", "127.0.0.1/8", "::1"})); + string error; + if(!this->ip_blacklist->reload(error)) logError(LOG_QUERY, "Failed to load query blacklist: {}", error); + if(!this->ip_whitelist->reload(error)) logError(LOG_QUERY, "Failed to load query whitelist: {}", error); + } - int enable = 1; - int disabled = 0; - if (setsockopt(serverSocket, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(int)) < 0) - logError("setsockopt(SO_REUSEADDR) failed"); - if(setsockopt(serverSocket, IPPROTO_TCP, TCP_NOPUSH, &disabled, sizeof disabled) < 0) - logError("Cant disable nopush! Error: "+to_string(errno)+" / "+strerror(errno)); - setsockopt(serverSocket, SOL_SOCKET, SO_KEEPALIVE, &enable, sizeof(enable)); + /* reserve backup file descriptor in case that the max file descriptors have been reached */ + { + this->server_reserve_fd = dup(1); + if(this->server_reserve_fd < 0) + logWarning(LOG_QUERY, "Failed to reserve a backup accept file descriptor. ({} | {})", errno, strerror(errno)); + } - if(fcntl(serverSocket, F_SETFD, FD_CLOEXEC) < 0) - logError(LOG_QUERY, "Failed to enable FD_CLOEXEC for {} (QueryServer)", serverSocket); + /* setup event bases */ + { + this->eventLoop = event_base_new(); + this->ioThread = new threads::Thread(THREAD_SAVE_OPERATIONS | THREAD_EXECUTE_LATER, [&]{ + while(this->active) { + debugMessage(LOG_QUERY, "Entering event loop ({})", (void*) this->eventLoop); + event_base_loop(this->eventLoop, EVLOOP_NO_EXIT_ON_EMPTY); + if(this->active) { + debugMessage(LOG_QUERY, "Event loop exited ({}). No active events. Sleeping 1 seconds", (void*) this->eventLoop); + this_thread::sleep_for(seconds(1)); + } else { + debugMessage(LOG_QUERY, "Event loop exited ({})", (void*) this->eventLoop); + } + } + }); + this->ioThread->name("EVENT Query").execute(); + } - if (bind(serverSocket, (struct sockaddr *) &localAdress, sizeof(localAdress)) < 0) { - errorMessage = string() + "Cant bind server socket (" + strerror(errno) + ")"; - return false; - } - if(listen(serverSocket, 255) < 0){ - errorMessage = string() + "Cant listen on server socket (" + strerror(errno) + ")"; - return false; - } + for(auto& binding : bindings) { + binding->file_descriptor = socket(binding->address.ss_family, SOCK_STREAM | SOCK_NONBLOCK, 0); + if(binding->file_descriptor < 0) { + logError(LOG_QUERY, "Failed to bind server to {}. (Failed to create socket: {} | {})", binding->as_string(), errno, strerror(errno)); + continue; + } - this->eventLoop = event_base_new(); - this->acceptEvent = event_new(this->eventLoop, this->serverSocket, EV_READ | EV_PERSIST, [](int a, short b, void* c){ ((QueryServer*) c)->onClientAccept(a, b, c); }, this); - event_add(this->acceptEvent, nullptr); + int enable = 1, disabled = 0; - this->ioThread = new threads::Thread(THREAD_SAVE_OPERATIONS | THREAD_EXECUTE_LATER, [&](){ - debugMessage(LOG_QUERY, "Event base executed ({})", (void*) this->eventLoop); - event_base_dispatch(this->eventLoop); - debugMessage(LOG_QUERY, "Event base terminated ({})", (void*) this->eventLoop); - }); - this->ioThread->name("EVENT Query").execute(); + if (setsockopt(binding->file_descriptor, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(int)) < 0) + logWarning(LOG_QUERY, "Failed to activate SO_REUSEADDR for binding {} ({} | {})", binding->as_string(), errno, strerror(errno)); + if(setsockopt(binding->file_descriptor, IPPROTO_TCP, TCP_NOPUSH, &disabled, sizeof disabled) < 0) + logWarning(LOG_QUERY, "Failed to deactivate TCP_NOPUSH for binding {} ({} | {})", binding->as_string(), errno, strerror(errno)); + if(binding->address.ss_family == AF_INET6) { + if(setsockopt(binding->file_descriptor, IPPROTO_IPV6, IPV6_V6ONLY, &enable, sizeof(int)) < 0) + logWarning(LOG_QUERY, "Failed to activate IPV6_V6ONLY for IPv6 binding {} ({} | {})", binding->as_string(), errno, strerror(errno)); + } + if(fcntl(binding->file_descriptor, F_SETFD, FD_CLOEXEC) < 0) + logWarning(LOG_QUERY, "Failed to set flag FD_CLOEXEC for binding {} ({} | {})", binding->as_string(), errno, strerror(errno)); - this->tickingId = serverInstance->scheduler()->schedule("query", bind(&QueryServer::tick, this), seconds(1)); - return true; + + if (bind(binding->file_descriptor, (struct sockaddr *) &binding->address, sizeof(binding->address)) < 0) { + logError(LOG_QUERY, "Failed to bind server to {}. (Failed to bind socket: {} | {})", binding->as_string(), errno, strerror(errno)); + close(binding->file_descriptor); + continue; + } + + if (listen(binding->file_descriptor, SOMAXCONN) < 0) { + logError(LOG_QUERY, "Failed to bind server to {}. (Failed to listen: {} | {})", binding->as_string(), errno, strerror(errno)); + close(binding->file_descriptor); + continue; + } + + binding->event_accept = event_new(this->eventLoop, binding->file_descriptor, EV_READ | EV_PERSIST, [](int a, short b, void* c){ ((QueryServer *) c)->on_client_receive(a, b, c); }, this); + event_add(binding->event_accept, nullptr); + this->bindings.push_back(binding); + } + + if(this->bindings.empty()) { + this->stop(); + error = "failed to bind to any address"; + return false; + } + + this->tickingId = serverInstance->scheduler()->schedule("query", bind(&QueryServer::tick, this), seconds(1)); + return true; } - void QueryServer::stop() { - if(!this->running()) return; + if(!this->running()) + return; active = false; serverInstance->scheduler()->cancelTask("query"); - this->clientLock.lock(); - auto clList = this->connectedClients; - this->clientLock.unlock(); + this->connected_clients_lock.lock(); + auto connected_clients = this->connectedClients; + this->connected_clients_lock.unlock(); Command cmd("serverstop"); cmd["stopped"] = true; - for(const auto &client : clList){ + for(const auto &client : connected_clients){ client->sendCommand(cmd); client->disconnect("server stopped"); } - if(this->acceptEvent){ - event_del(this->acceptEvent); - event_free(this->acceptEvent); - this->acceptEvent = nullptr; - } - { auto now = system_clock::now(); while(!this->connectedClients.empty()) { @@ -144,6 +175,22 @@ void QueryServer::stop() { } this->threads.clear(); + for(auto& binding : this->bindings) { + if(binding->event_accept) { + event_del_block(binding->event_accept); + event_free(binding->event_accept); + binding->event_accept = nullptr; + } + if(binding->file_descriptor > 0) { + if(shutdown(binding->file_descriptor, SHUT_RDWR) < 0) + logWarning(LOG_QUERY, "Failed to shutdown socket for binding {} ({} | {}).", binding->as_string(), errno, strerror(errno)); + if(close(binding->file_descriptor) < 0) + logError(LOG_QUERY, "Failed to close socket for binding {} ({} | {}).", binding->as_string(), errno, strerror(errno)); + binding->file_descriptor = -1; + } + } + this->bindings.clear(); + if(this->eventLoop) event_base_loopexit(this->eventLoop, nullptr); if(this->ioThread) { @@ -160,41 +207,198 @@ void QueryServer::stop() { this->eventLoop = nullptr; } - delete this->boundAddress; - this->boundAddress = nullptr; - - if(this->serverSocket > 0) { - if(shutdown(this->serverSocket, SHUT_RDWR) < 0) logError(LOG_QUERY, "Could not shutdown server socket!"); - if(close(this->serverSocket) < 0) logError(LOG_QUERY, "Could not close server socket!"); + if(this->server_reserve_fd > 0) { + if(close(this->server_reserve_fd) < 0) + logError(LOG_QUERY, "Failed to close backup file descriptor ({} | {})", errno, strerror(errno)); } - this->serverSocket = -1; + this->server_reserve_fd = -1; } -void QueryServer::onClientAccept(int fd, short ev, void *arg) { - sockaddr_in remoteAddr{}; - memset(&remoteAddr, 0, sizeof(sockaddr_in)); - socklen_t addrLength = sizeof(remoteAddr); +inline std::string logging_address(const sockaddr_storage& address) { + if(config::server::disable_ip_saving) + return "X.X.X.X" + to_string(net::port(address)); + return net::to_string(address, true); +} - int acceptedSocketFd = accept(serverSocket, (struct sockaddr *) &remoteAddr, &addrLength); - if (acceptedSocketFd < 0) { - if(errno == EAGAIN) { //No manager +inline void send_direct_disconnect(const sockaddr_storage& address, int file_descriptor, const char* message, size_t message_length) { + auto _non_block = [&]{ + int flags = fcntl(file_descriptor, F_GETFL, 0); + if (flags == -1) { + debugMessage(LOG_QUERY, "[{}] Failed to set socket to nonblock. Flag query failed ({} | {})", logging_address(address), errno, strerror(errno)); + return; + } + + flags &= ~O_NONBLOCK; + if(fcntl(file_descriptor, F_SETFL, flags) == -1) { + debugMessage(LOG_QUERY, "[{}] Failed to set socket to nonblock. Flag apply failed ({} | {})", logging_address(address), errno, strerror(errno)); + return; + } + }; + _non_block(); + + { + struct timeval timeout{}; + timeout.tv_sec = 5; + timeout.tv_usec = 0; + if (setsockopt (file_descriptor, SOL_SOCKET, SO_SNDTIMEO, (char *)&timeout, sizeof(timeout)) < 0) + debugMessage(LOG_QUERY, "[{}] Failed to set the send timeout on socket", logging_address(address)); + } + + bool broken_pipe = false; + auto _send = [&](const char* data, size_t length) { + if(broken_pipe) + return; + + size_t written_bytes = 0; + while(written_bytes < length) { + auto result = send(file_descriptor, data + written_bytes, length - written_bytes, MSG_NOSIGNAL); + if(result <= 0) { + broken_pipe |= errno == EPIPE; + debugMessage(LOG_QUERY, "[{}] Failed to send a message of length {}. Bytes written: {}, error: {} | {}", logging_address(address), length, written_bytes, errno, strerror(errno)); + return; + } else { + written_bytes += result; + } + } + }; + + /* we could ignore errors here */ + _send(config::query::motd.data(), config::query::motd.size()); + _send(message, message_length); + + /* "flush" with the last new line and then close */ + int flag = 1; + if(setsockopt(file_descriptor, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(int)) < 0) { + debugMessage(LOG_QUERY, "[{}] Failed to enabled TCP no delay to flush the direct query disconnect socket ({} | {}).", logging_address(address), errno, strerror(errno)); + } + _send(config::query::newlineCharacter.data(), config::query::newlineCharacter.size()); + + if(shutdown(file_descriptor, SHUT_RDWR) < 0) { + debugMessage(LOG_QUERY, "[{}] Failed to shutdown socket ({} | {}).", logging_address(address), errno, strerror(errno)); + } + if(close(file_descriptor) < 0) { + debugMessage(LOG_QUERY, "[{}] Failed to close socket ({} | {}).", logging_address(address), errno, strerror(errno)); + } +} + +//dummyfdflood +//dummyfdflood clear + +void QueryServer::on_client_receive(int _server_file_descriptor, short ev, void *arg) { + sockaddr_storage remote_address{}; + memset(&remote_address, 0, sizeof(sockaddr_in)); + socklen_t address_length = sizeof(remote_address); + + int file_descriptor = accept(_server_file_descriptor, (struct sockaddr *) &remote_address, &address_length); + if (file_descriptor < 0) { + if(errno == EAGAIN) return; + + if(errno == EMFILE || errno == ENFILE) { + if(errno == EMFILE) + logError(LOG_QUERY, "Server ran out file descriptors. Please increase the process file descriptor limit or decrease the instance variable 'serverinstance_query_max_connections'"); + else + logError(LOG_QUERY, "Server ran out file descriptors. Please increase the process and system-wide file descriptor limit or decrease the instance variable 'serverinstance_query_max_connections'"); + + bool tmp_close_success = false; + { + lock_guard reserve_fd_lock(server_reserve_fd_lock); + if(this->server_reserve_fd > 0) { + debugMessage(LOG_QUERY, "Trying to accept client with the reserved file descriptor to send him a protocol limit reached exception."); + auto _ = [&]{ + if(close(this->server_reserve_fd) < 0) { + debugMessage(LOG_QUERY, "Failed to close reserved file descriptor"); + tmp_close_success = false; + return; + } + this->server_reserve_fd = 0; + + errno = 0; + file_descriptor = accept(_server_file_descriptor, (struct sockaddr *) &remote_address, &address_length); + if(file_descriptor < 0) { + if(errno == EMFILE || errno == ENFILE) + debugMessage(LOG_QUERY, "[{}] Even with freeing the reserved descriptor accept failed. Attempting to reclaim reserved file descriptor", logging_address(remote_address)); + else if(errno == EAGAIN); + else { + debugMessage(LOG_QUERY, "[{}] Failed to accept client with reserved file descriptor. ({} | {})", logging_address(remote_address), errno, strerror(errno)); + } + this->server_reserve_fd = dup(1); + if(this->server_reserve_fd < 0) + debugMessage(LOG_QUERY, "[{}] Failed to reclaim reserved file descriptor. Future clients cant be accepted!", logging_address(remote_address)); + else + tmp_close_success = true; + return; + } + debugMessage(LOG_QUERY, "[{}] Successfully accepted client via reserved descriptor (fd: {}). Initializing socket and sending MOTD and disconnect.", logging_address(remote_address), file_descriptor); + + static auto resource_limit_error = R"(error id=57344 msg=query\sserver\sresource\slimit\sreached extra_msg=file\sdescriptor\slimit\sexceeded)"; + send_direct_disconnect(remote_address, file_descriptor, resource_limit_error, strlen(resource_limit_error)); + + this->server_reserve_fd = dup(1); + if(this->server_reserve_fd < 0) + debugMessage(LOG_QUERY, "Failed to reclaim reserved file descriptor. Future clients cant be accepted!"); + else + tmp_close_success = true; + logMessage(LOG_QUERY, "[{}] Dropping new query connection attempt because of too many open file descriptors.", logging_address(remote_address)); + }; + _(); + } + } + + if(!tmp_close_success) { + debugMessage(LOG_QUERY, "Sleeping two seconds because we're currently having no resources for this user. (Removing the accept event)"); + for(auto& binding : this->bindings) + event_del_noblock(binding->event_accept); + accept_event_deleted = system_clock::now(); + return; + } + return; } logMessage(LOG_QUERY, "Got an error while accepting a new client. (errno: {}, message: {})", errno, strerror(errno)); return; } + { + unique_lock lock(this->connected_clients_lock); + auto max_connections = serverInstance->properties()[property::SERVERINSTANCE_QUERY_MAX_CONNECTIONS].as(); + if(max_connections > 0 && max_connections <= this->connectedClients.size()) { + lock.unlock(); + logMessage(LOG_QUERY, "[{}] Dropping new query connection attempt because of too many connected query clients.", logging_address(remote_address)); + static auto query_server_full = R"(error id=4611 msg=max\sclients\sreached)"; + send_direct_disconnect(remote_address, file_descriptor, query_server_full, strlen(query_server_full)); + return; + } - shared_ptr client = std::make_shared(this, acceptedSocketFd); + auto max_ip_connections = serverInstance->properties()[property::SERVERINSTANCE_QUERY_MAX_CONNECTIONS_PER_IP].as(); + if(max_ip_connections > 0) { + size_t connection_count = 0; + for(auto& client : this->connectedClients) { + if(net::address_equal(client->remote_address, remote_address)) + connection_count++; + } + + if(connection_count >= max_ip_connections) { + lock.unlock(); + logMessage(LOG_QUERY, "[{}] Dropping new query connection attempt because of too many simultaneously connected session from this ip.", logging_address(remote_address)); + static auto query_server_full = R"(error id=4610 msg=too\smany\ssimultaneously\sconnected\ssessions)";// + send_direct_disconnect(remote_address, file_descriptor, query_server_full, strlen(query_server_full)); + return; + } + } + } + + shared_ptr client = std::make_shared(this, file_descriptor); client->applySelfLock(client); - memcpy(&client->remote_address, &remoteAddr, sizeof(sockaddr_in)); + memcpy(&client->remote_address, &remote_address, sizeof(remote_address)); - this->clientLock.lock(); - this->connectedClients.push_back(client); - this->clientLock.unlock(); + { + lock_guard lock(this->connected_clients_lock); + this->connectedClients.push_back(client); + } client->preInitialize(); - if(client->readEvent) - event_add(client->readEvent, nullptr); - logMessage(LOG_QUERY, "Got new client from {}", client->getLoggingPeerIp() + ":" + to_string(client->getPeerPort())); + if(client->readEvent) { + event_add(client->readEvent, nullptr); + } + logMessage(LOG_QUERY, "Got new client from {}", client->getLoggingPeerIp() + ":" + to_string(client->getPeerPort())); } /* @@ -351,7 +555,7 @@ void QueryServer::tick() { decltype(this->connectedClients) clCopy; { - threads::MutexLock lock(this->clientLock); + lock_guard lock(this->connected_clients_lock); clCopy = this->connectedClients; } for(const auto& cl : clCopy) cl->queryTick(); @@ -380,4 +584,10 @@ void QueryServer::tick() { this->loginAttempts.erase(ip); } } + if(this->accept_event_deleted.time_since_epoch().count() != 0 && accept_event_deleted + seconds(5) < system_clock::now()) { + debugMessage(LOG_QUERY, "Readding accept event and try again if we have enough resources again."); + for(auto& binding : this->bindings) + event_add(binding->event_accept, nullptr); + accept_event_deleted = system_clock::time_point{}; + } } \ No newline at end of file diff --git a/server/src/server/QueryServer.h b/server/src/server/QueryServer.h index 2d3f0c1..92cf9c4 100644 --- a/server/src/server/QueryServer.h +++ b/server/src/server/QueryServer.h @@ -12,6 +12,7 @@ #include #include "../Group.h" #include +#include #include "../manager/IpListManager.h" namespace ts { @@ -50,13 +51,20 @@ namespace ts { class QueryServer { friend class QueryClient; public: + struct Binding { + sockaddr_storage address{}; + int file_descriptor = 0; + ::event* event_accept = nullptr; + + inline std::string as_string() { return net::to_string(address, true); } + }; + explicit QueryServer(sql::SqlManager*); ~QueryServer(); - bool start(const sockaddr_in&, std::string&); + bool start(const std::deque>& /* bindings */, std::string& /* error */); void stop(); bool running(){ return active; } - sockaddr_in* boundedAddress(){ return boundAddress; } void unregisterConnection(const std::shared_ptr &); @@ -83,22 +91,24 @@ namespace ts { threads::ThreadPool* executePool() { return this->_executePool; } private: sql::SqlManager* sql; - sockaddr_in* boundAddress = nullptr; bool active = false; - int serverSocket; + std::deque> bindings; std::vector threads; + std::mutex server_reserve_fd_lock; + int server_reserve_fd = -1; /* -1 = unset | 0 = in use | > 0 ready to use */ + std::unique_ptr ip_whitelist; std::unique_ptr ip_blacklist; //IO stuff event_base* eventLoop = nullptr; - ::event* acceptEvent = nullptr; + std::chrono::system_clock::time_point accept_event_deleted; threads::ThreadPool* _executePool = nullptr; - threads::Mutex clientLock; + std::mutex connected_clients_lock; std::deque> connectedClients; threads::Mutex loginLock; @@ -107,8 +117,8 @@ namespace ts { std::map queryBann; threads::Thread* ioThread = nullptr; - threads::SchedulingTask tickingId = 0; - void onClientAccept(int fd, short ev, void *arg); + threads::SchedulingTask tickingId = nullptr; + void on_client_receive(int fd, short ev, void *arg); void tick(); }; } diff --git a/server/src/server/file/FileServer.cpp b/server/src/server/file/FileServer.cpp index 8efaee2..e6289db 100644 --- a/server/src/server/file/FileServer.cpp +++ b/server/src/server/file/FileServer.cpp @@ -63,7 +63,7 @@ std::shared_ptr FileServer::findFile(std::string path, std::sha fs::path absPath = fs::u8path(path); if(!fs::is_regular_file(absPath) && !fs::is_directory(absPath)){ - debugMessage(lstream << "Could not find requested file. Abs path: " << absPath << "|" << path << ". (path=" << path << ", parent=" << (parent ? parent->path + "/" + parent->name : "./") << ")"); + debugMessage(LOG_FT, "Could not find requested file. Abs path: {} | {}. (path={}, parent={})", absPath.string(), path, path, (parent ? parent->path + "/" + parent->name : "./")); return nullptr; } @@ -113,7 +113,7 @@ std::vector> FileServer::listFiles(std::shared_ entry->lastChanged = fs::last_write_time(elm.path()); result.push_back(entry); } else { - logError("Invalid file in file tree. File path: " + elm.path().string()); + logError(LOG_FT, "Invalid file in file tree. File path: " + elm.path().string()); } } @@ -207,7 +207,7 @@ std::shared_ptr FileServer::generateUploadTransferKey(st threads::MutexLock lock(this->keylock); pendingKeys.push_back(result); } - debugMessage("Created file upload key=" + result->key + " for " + targetFile + " (" + to_string(size) + " bytes)"); + debugMessage(LOG_FT, "Created file upload key=" + result->key + " for " + targetFile + " (" + to_string(size) + " bytes)"); return result; } @@ -216,8 +216,10 @@ std::shared_ptr FileServer::resolveDirectory(const shared_ptrcreateDirectory(path.string(), nullptr); path += subPath; - logMessage(lstream << "resolve " << path.string() << " -> " << findFile(path.string()) << " -> " << typeid(findFile(path.string())).name() << endl); - return static_pointer_cast(findFile(path.string())); + + auto ffile = findFile(path.string()); + debugMessage(LOG_FT, "Resolve {} => {} -> {}", path.string(), (void*) ffile.get(), typeid(ffile).name()); + return static_pointer_cast(ffile); } std::shared_ptr FileServer::iconDirectory(const shared_ptr &server) { @@ -268,56 +270,90 @@ void FileServer::deleteServer(const shared_ptr &server) { if(fs::exists(path)) { error_code error; if(fs::remove_all(path, error) == 0) - logError(0, "Could not delete server directory {} ({} | {})", path.string(), error.value(), error.message()); + logError(LOG_FT, "Could not delete server directory {} ({} | {})", path.string(), error.value(), error.message()); } else { - logError(0, "Could not delete missing server directory (" + path.string() + ")"); + logError(LOG_FT, "Could not delete missing server directory (" + path.string() + ")"); } } //The actual server! -bool FileServer::start(const sockaddr_in& localAdress) { - if(this->running()) return false; - +bool FileServer::start(const std::deque>& bindings, std::string& error) { + if(this->running()) { + error = "server already running"; + return false; + } this->active = true; - boundAddress = new sockaddr_in; - memcpy(boundAddress, &localAdress, sizeof(localAdress)); - this->serverSocket = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0); - if (serverSocket < 0) { - logCritical(LOG_FT, "Cant create server socket for file server"); - return false; - } - - int enable = 1; - int disabled = 0; - if (setsockopt(serverSocket, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(int)) < 0) { - logError(LOG_FT, "setsockopt(SO_REUSEADDR) failed"); - } - if(setsockopt(serverSocket, IPPROTO_TCP, TCP_NOPUSH, &disabled, sizeof disabled) < 0) { - logError(LOG_FT, lstream << "Cant disable nopush! Error: "+to_string(errno)+" / "+strerror(errno) << endl); - } - if(fcntl(serverSocket, F_SETFD, FD_CLOEXEC) < 0) { - logError(LOG_QUERY, "Failed to enable FD_CLOEXEC for {} (QueryServer)", serverSocket); + /* reserve backup file descriptor in case that the max file descriptors have been reached */ + { + this->server_reserve_fd = dup(1); + if(this->server_reserve_fd < 0) + logWarning(LOG_FT, "Failed to reserve a backup accept file descriptor. ({} | {})", errno, strerror(errno)); } - if (bind(serverSocket, (struct sockaddr *) &localAdress, sizeof(localAdress)) < 0) { - logError(LOG_FT, lstream << "Cant bind server socket (" << strerror(errno) << ")" << endl); - return false; - } - if(listen(serverSocket, 255) < 0){ - logError(LOG_FT, lstream << "Cant listen on server socket (" << strerror(errno) << ")" << endl); - return false; - } + /* setup event bases */ + { + this->ioLoop = event_base_new(); + this->ioThread = new threads::Thread(THREAD_SAVE_OPERATIONS | THREAD_EXECUTE_LATER, [&]{ + while(this->active) { + debugMessage(LOG_FT, "Entering event loop ({})", (void*) this->ioLoop); + event_base_loop(this->ioLoop, EVLOOP_NO_EXIT_ON_EMPTY); + if(this->active) { + debugMessage(LOG_FT, "Event loop exited ({}). No active events. Sleeping 1 seconds", (void*) this->ioLoop); + this_thread::sleep_for(seconds(1)); + } else { + debugMessage(LOG_FT, "Event loop exited ({})", (void*) this->ioLoop); + } + } + }); + this->ioThread->name("File IO #1").execute(); + } - ioLoop = event_base_new(); - this->acceptEvent = event_new(this->ioLoop, this->serverSocket, EV_READ | EV_PERSIST, [](int a, short b, void* c){ ((FileServer*) c)->onClientAccept(a, b, c); }, this); - event_add(this->acceptEvent, nullptr); + { + for(auto& binding : bindings) { + binding->file_descriptor = socket(binding->address.ss_family, SOCK_STREAM | SOCK_NONBLOCK, 0); + if(binding->file_descriptor < 0) { + logError(LOG_FT, "Failed to bind server to {}. (Failed to create socket: {} | {})", binding->as_string(), errno, strerror(errno)); + continue; + } - this->ioThread = new threads::Thread(THREAD_SAVE_OPERATIONS | THREAD_EXECUTE_LATER, [&](){ - event_base_dispatch(this->ioLoop); - debugMessage(LOG_FT, "File-Server accept thread terminated"); - }); - this->ioThread->name("File IO #1").execute(); + int enable = 1, disabled = 0; + + if (setsockopt(binding->file_descriptor, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(int)) < 0) + logWarning(LOG_FT, "Failed to activate SO_REUSEADDR for binding {} ({} | {})", binding->as_string(), errno, strerror(errno)); + if(setsockopt(binding->file_descriptor, IPPROTO_TCP, TCP_NOPUSH, &disabled, sizeof disabled) < 0) + logWarning(LOG_FT, "Failed to deactivate TCP_NOPUSH for binding {} ({} | {})", binding->as_string(), errno, strerror(errno)); + if(binding->address.ss_family == AF_INET6) { + if(setsockopt(binding->file_descriptor, IPPROTO_IPV6, IPV6_V6ONLY, &enable, sizeof(int)) < 0) + logWarning(LOG_FT, "Failed to activate IPV6_V6ONLY for IPv6 binding {} ({} | {})", binding->as_string(), errno, strerror(errno)); + } + if(fcntl(binding->file_descriptor, F_SETFD, FD_CLOEXEC) < 0) + logWarning(LOG_FT, "Failed to set flag FD_CLOEXEC for binding {} ({} | {})", binding->as_string(), errno, strerror(errno)); + + + if (bind(binding->file_descriptor, (struct sockaddr *) &binding->address, sizeof(binding->address)) < 0) { + logError(LOG_FT, "Failed to bind server to {}. (Failed to bind socket: {} | {})", binding->as_string(), errno, strerror(errno)); + close(binding->file_descriptor); + continue; + } + + if (listen(binding->file_descriptor, SOMAXCONN) < 0) { + logError(LOG_FT, "Failed to bind server to {}. (Failed to listen: {} | {})", binding->as_string(), errno, strerror(errno)); + close(binding->file_descriptor); + continue; + } + + binding->event_accept = event_new(this->ioLoop, binding->file_descriptor, EV_READ | EV_PERSIST, [](int a, short b, void* c){ ((FileServer *) c)->on_client_accept(a, b, c); }, this); + event_add(binding->event_accept, nullptr); + this->bindings.push_back(binding); + } + + if(this->bindings.empty()) { + this->stop(); + error = "failed to bind to any address"; + return false; + } + } for(int index = 0; index < 2; index++){ auto th = new threads::Thread(THREAD_SAVE_OPERATIONS | THREAD_EXECUTE_LATER, &FileServer::clientTickingExecutor, this); @@ -335,16 +371,26 @@ void FileServer::stop() { this->tickingCon.notify_all(); } - if(this->acceptEvent) { - event_del(this->acceptEvent); - event_free(this->acceptEvent); - this->acceptEvent = nullptr; - } + for(auto& binding : this->bindings) { + if(binding->event_accept) { + event_del_block(binding->event_accept); + event_free(binding->event_accept); + binding->event_accept = nullptr; + } + if(binding->file_descriptor > 0) { + if(shutdown(binding->file_descriptor, SHUT_RDWR) < 0) + logWarning(LOG_FT, "Failed to shutdown socket for binding {} ({} | {}).", binding->as_string(), errno, strerror(errno)); + if(close(binding->file_descriptor) < 0) + logError(LOG_FT, "Failed to close socket for binding {} ({} | {}).", binding->as_string(), errno, strerror(errno)); + binding->file_descriptor = -1; + } + } + this->bindings.clear(); - auto clClone = this->connectedClients; - for(const auto& cl : clClone) { - cl->disconnect(chrono::milliseconds(1)); - } + auto clClone = this->connectedClients; + for(const auto& cl : clClone) { + cl->disconnect(chrono::milliseconds(1)); + } if(this->ioLoop) { event_base_loopbreak(this->ioLoop); @@ -370,40 +416,135 @@ void FileServer::stop() { this->ioLoop = nullptr; } - delete this->boundAddress; - this->boundAddress = nullptr; - - if(this->serverSocket > 0){ - if(shutdown(this->serverSocket, SHUT_RDWR) < 0) logError(LOG_FT, "Could not shutdown server socket!"); - if(close(this->serverSocket) < 0) logError(LOG_FT, "Could not close server socket!"); - } - this->serverSocket = 0; + if(this->server_reserve_fd > 0) { + if(close(this->server_reserve_fd) < 0) + logError(LOG_FT, "Failed to close backup file descriptor ({} | {})", errno, strerror(errno)); + } + this->server_reserve_fd = -1; } +inline std::string logging_address(const sockaddr_storage& address) { + if(config::server::disable_ip_saving) + return "[0|X.X.X.X" + to_string(net::port(address)) + "|unconnected]"; + return "[0|X.X.X.X" + net::to_string(address, true) + "|unconnected]"; +} -void FileServer::onClientAccept(int fd, short ev, void *arg) { - sockaddr_in remoteAddr{}; - memset(&remoteAddr, 0, sizeof(sockaddr_in)); - socklen_t addrLength = sizeof(remoteAddr); +#define CLOSE_CONNECTION \ +if(shutdown(file_descriptor, SHUT_RDWR) < 0) { \ + debugMessage(LOG_FT, "[{}] Failed to shutdown socket ({} | {}).", logging_address(remote_address), errno, strerror(errno)); \ +} \ +if(close(file_descriptor) < 0) { \ + debugMessage(LOG_FT, "[{}] Failed to close socket ({} | {}).", logging_address(remote_address), errno, strerror(errno)); \ +} - int acceptedSocketFd = accept(serverSocket, (struct sockaddr *) &remoteAddr, &addrLength); - if (acceptedSocketFd < 0) { - if(errno == EAGAIN){ //No manager - return; - } - logError(LOG_FT, "Having an error while accepting a new client. ({}/{})", errno, strerror(errno)); - return; - } +void FileServer::on_client_accept(int _server_file_descriptor, short ev, void *arg) { + sockaddr_storage remote_address{}; + memset(&remote_address, 0, sizeof(remote_address)); + socklen_t address_length = sizeof(remote_address); - shared_ptr client = std::make_shared(this, acceptedSocketFd); + int file_descriptor = accept(_server_file_descriptor, (struct sockaddr *) &remote_address, &address_length); + if (file_descriptor < 0) { + if(errno == EAGAIN) + return; + + if(errno == EMFILE || errno == ENFILE) { + if(errno == EMFILE) + logError(LOG_FT, "Server ran out file descriptors. Please increase the process file descriptor limit or decrease the instance variable 'serverinstance_filetransfer_max_connections'"); + else + logError(LOG_FT, "Server ran out file descriptors. Please increase the process and system-wide file descriptor limit or decrease the instance variable 'serverinstance_filetransfer_max_connections'"); + + bool tmp_close_success = false; + { + lock_guard reserve_fd_lock(server_reserve_fd_lock); + if(this->server_reserve_fd > 0) { + debugMessage(LOG_FT, "Trying to accept client with the reserved file descriptor to close the incomming connection."); + auto _ = [&]{ + if(close(this->server_reserve_fd) < 0) { + debugMessage(LOG_FT, "Failed to close reserved file descriptor"); + tmp_close_success = false; + return; + } + this->server_reserve_fd = 0; + + errno = 0; + file_descriptor = accept(_server_file_descriptor, (struct sockaddr *) &remote_address, &address_length); + if(file_descriptor < 0) { + if(errno == EMFILE || errno == ENFILE) + debugMessage(LOG_FT, "[{}] Even with freeing the reserved descriptor accept failed. Attempting to reclaim reserved file descriptor", logging_address(remote_address)); + else if(errno == EAGAIN); + else { + debugMessage(LOG_FT, "[{}] Failed to accept client with reserved file descriptor. ({} | {})", logging_address(remote_address), errno, strerror(errno)); + } + this->server_reserve_fd = dup(1); + if(this->server_reserve_fd < 0) + debugMessage(LOG_FT, "[{}] Failed to reclaim reserved file descriptor. Future clients cant be accepted!", logging_address(remote_address)); + else + tmp_close_success = true; + return; + } + debugMessage(LOG_FT, "[{}] Successfully accepted client via reserved descriptor (fd: {}). Disconnecting client.", logging_address(remote_address), file_descriptor); + + CLOSE_CONNECTION + this->server_reserve_fd = dup(1); + if(this->server_reserve_fd < 0) + debugMessage(LOG_FT, "Failed to reclaim reserved file descriptor. Future clients cant be accepted!"); + else + tmp_close_success = true; + logMessage(LOG_FT, "[{}] Dropping file transfer connection attempt because of too many open file descriptors.", logging_address(remote_address)); + }; + _(); + } + } + + if(!tmp_close_success) { + debugMessage(LOG_FT, "Sleeping two seconds because we're currently having no resources for this user. (Removing the accept event)"); + for(auto& binding : this->bindings) + event_del_noblock(binding->event_accept); + accept_event_deleted = system_clock::now(); + return; + } + return; + } + logMessage(LOG_FT, "Got an error while accepting a new client. (errno: {}, message: {})", errno, strerror(errno)); + return; + } + { + unique_lock lock(this->clientLock); + auto max_connections = serverInstance->properties()[property::SERVERINSTANCE_FILETRANSFER_MAX_CONNECTIONS].as(); + if(max_connections > 0 && max_connections <= this->connectedClients.size()) { + lock.unlock(); + logMessage(LOG_FT, "[{}] Dropping new connection attempt because of too many connected clients.", logging_address(remote_address)); + CLOSE_CONNECTION + return; + } + + auto max_ip_connections = serverInstance->properties()[property::SERVERINSTANCE_FILETRANSFER_MAX_CONNECTIONS_PER_IP].as(); + if(max_ip_connections > 0) { + size_t connection_count = 0; + for(auto& client : this->connectedClients) { + if(net::address_equal(client->remote_address, remote_address)) + connection_count++; + } + + if(connection_count >= max_ip_connections) { + lock.unlock(); + logMessage(LOG_FT, "[{}] Dropping new connection attempt because of too many simultaneously connected session from this ip.", logging_address(remote_address)); + CLOSE_CONNECTION + return; + } + } + } + + + shared_ptr client = std::make_shared(this, file_descriptor); client->_this = client; - memcpy(&client->remoteAddress, &remoteAddr, sizeof(sockaddr_in)); + memcpy(&client->remote_address, &remote_address, sizeof(remote_address)); this->clientLock.lock(); this->connectedClients.push_back(client); this->clientLock.unlock(); event_add(client->readEvent, nullptr); - logMessage(LOG_FT, "Got new client from {}", config::server::disable_ip_saving ? "X.X.X.X" : string(inet_ntoa(client->remoteAddress.sin_addr)) + ":" + to_string(ntohs(client->remoteAddress.sin_port))); + logMessage(LOG_FT, "[{}] Remote peer connected. Initializing session.", logging_address(remote_address)); } void FileServer::clientTickingExecutor() { @@ -468,6 +609,12 @@ void FileServer::instanceTick() { this->tickQueue.insert(this->tickQueue.end(), client.begin(), client.end()); //Tick all clients :) this->tickingCon.notify_all(); } + if(this->accept_event_deleted.time_since_epoch().count() != 0 && accept_event_deleted + seconds(5) < system_clock::now()) { + debugMessage(LOG_FT, "Readding accept event and try again if we have enough resources again."); + for(auto& binding : this->bindings) + event_add(binding->event_accept, nullptr); + accept_event_deleted = system_clock::time_point{}; + } auto now = system_clock::now(); if(timestamp_bandwidth_update + seconds(1) < now) { diff --git a/server/src/server/file/FileServer.h b/server/src/server/file/FileServer.h index dc06132..e864c6a 100644 --- a/server/src/server/file/FileServer.h +++ b/server/src/server/file/FileServer.h @@ -12,6 +12,7 @@ #include #include "Variable.h" #include +#include namespace ts { namespace file { @@ -78,15 +79,23 @@ namespace ts { class FileServer { friend class FileClient; public: + struct Binding { + sockaddr_storage address{}; + int file_descriptor = 0; + ::event* event_accept = nullptr; + + inline std::string as_string() { return net::to_string(address, true); } + }; + FileServer(); ~FileServer(); - bool start(const sockaddr_in&); + bool start(const std::deque>& /* bindings */, std::string& /* error */); void stop(); - bool running(){ return active; } - sockaddr_in* boundedAddress(){ return boundAddress; } + ts_always_inline bool running(){ return active; } + ts_always_inline std::deque> list_bindings() { return this->bindings; } - std::shared_ptr createDirectory(std::string name, std::shared_ptr parent); + std::shared_ptr createDirectory(std::string name, std::shared_ptr parent); bool fileExists(std::shared_ptr); bool fileExists(std::shared_ptr); std::shared_ptr findFile(std::string, std::shared_ptr = nullptr); @@ -112,15 +121,16 @@ namespace ts { std::deque> running_file_transfers(const std::shared_ptr & /* client */ = nullptr); std::deque> pending_file_transfers(const std::shared_ptr & /* client */ = nullptr); private: - sockaddr_in* boundAddress = nullptr;; - bool active = false; - int serverSocket; + std::deque> bindings; std::string rootPath = "./files/"; //IO stuff event_base* ioLoop = nullptr; - ::event* acceptEvent = nullptr; + std::chrono::system_clock::time_point accept_event_deleted; + + std::mutex server_reserve_fd_lock; + int server_reserve_fd = -1; /* -1 = unset | 0 = in use | > 0 ready to use */ threads::Mutex clientLock; std::deque> connectedClients; @@ -130,7 +140,7 @@ namespace ts { } threads::Thread* ioThread = nullptr; - void onClientAccept(int fd, short ev, void *arg); + void on_client_accept(int fd, short ev, void *arg); std::deque> tickQueue; std::deque tickingThreads; diff --git a/server/src/terminal/CommandHandler.cpp b/server/src/terminal/CommandHandler.cpp index a7ca03a..9ac9154 100644 --- a/server/src/terminal/CommandHandler.cpp +++ b/server/src/terminal/CommandHandler.cpp @@ -9,6 +9,7 @@ #include #include #include +#include #include "CommandHandler.h" #include "src/server/QueryServer.h" @@ -65,6 +66,8 @@ namespace terminal { handleCommandPermGrant(cmd); else if(cmd.lcommand == "dummycrash" || cmd.lcommand == "dummy_crash") handleCommandDummyCrash(cmd); + else if(cmd.lcommand == "dummyfdflood" || cmd.lcommand == "dummy_fdflood") + handleCommandDummyFdFlood(cmd); else if(cmd.lcommand == "meminfo") handleCommandMemInfo(cmd); else if(cmd.lcommand == "spoken") @@ -457,5 +460,38 @@ namespace terminal { logMessage("Monthly statistics will be reset"); return true; } + + deque fd_leaks; + bool handleCommandDummyFdFlood(TerminalCommand& cmd) { + size_t value; + if(cmd.arguments.size() < 1) { + value = 1024; + + rlimit limit{1024, 10000}; + setrlimit(7, &limit); + } else if(cmd.larguments[0] == "clear") { + logMessage("Clearup leaks"); + for(auto& fd : fd_leaks) + close(fd); + fd_leaks.clear(); + return; + } else { + value = cmd.arguments[0].as(); + } + + + logMessage("Leaking {} file descriptors", value); + size_t index = 0; + while(index < value) { + auto fd = dup(1); + if(fd < 0) + logMessage("Failed to create a file descriptor {} | {}", errno, strerror(errno)); + else + fd_leaks.push_back(fd); + + index++; + } + return true; + } } } \ No newline at end of file diff --git a/server/src/terminal/CommandHandler.h b/server/src/terminal/CommandHandler.h index 6ba0605..9aa8b5a 100644 --- a/server/src/terminal/CommandHandler.h +++ b/server/src/terminal/CommandHandler.h @@ -18,6 +18,7 @@ namespace terminal { extern void handleCommand(std::string); extern bool handleCommandDummyCrash(TerminalCommand&); + extern bool handleCommandDummyFdFlood(TerminalCommand&); extern bool handleCommandHelp(TerminalCommand&); extern bool handleCommandEnd(TerminalCommand&); diff --git a/shared b/shared index d9ddc2c..a0cca36 160000 --- a/shared +++ b/shared @@ -1 +1 @@ -Subproject commit d9ddc2c06d7731b14cae47f67a3414ce47e34bae +Subproject commit a0cca36eca11da410626a340dbe4377067d59c1b