#include #include #include "QueryClient.h" #include #include #include #include #include "src/InstanceHandler.h" #include #include using namespace std; using namespace std::chrono; using namespace ts; using namespace ts::server; #if defined(TCP_CORK) && !defined(TCP_NOPUSH) #define TCP_NOPUSH TCP_CORK #endif //#define DEBUG_TRAFFIC QueryClient::QueryClient(QueryServer* handle, int sockfd) : ConnectedClient(handle->sql, nullptr), handle(handle), clientFd(sockfd) { memtrack::allocated(this); int enabled = 1; int disabled = 0; setsockopt(sockfd, SOL_SOCKET, SO_KEEPALIVE, &enabled, sizeof(enabled)); if(setsockopt(sockfd, IPPROTO_TCP, TCP_NOPUSH, &disabled, sizeof disabled) < 0) { logError(this->getServerId(), "[Query] Could not disable nopush for {} ({}/{})", CLIENT_STR_LOG_PREFIX_(this), errno, strerror(errno)); } if(setsockopt(sockfd, IPPROTO_TCP, TCP_NODELAY, &enabled, sizeof enabled) < 0) { logError(this->getServerId(), "[Query] Could not disable no delay for {} ({}/{})", CLIENT_STR_LOG_PREFIX_(this), errno, strerror(errno)); } this->readEvent = event_new(this->handle->eventLoop, this->clientFd, EV_READ | EV_PERSIST, [](int a, short b, void* c){ ((QueryClient*) c)->handleMessageRead(a, b, c); }, this); this->writeEvent = event_new(this->handle->eventLoop, this->clientFd, EV_WRITE, [](int a, short b, void* c){ ((QueryClient*) c)->handleMessageWrite(a, b, c); }, this); this->state = ConnectionState::CONNECTED; connectedTimestamp = system_clock::now(); this->resetEventMask(); } void QueryClient::applySelfLock(const std::shared_ptr &cl) { this->_this = cl; } QueryClient::~QueryClient() { memtrack::freed(this); // if(this->closeLock.tryLock() != 0) // logCritical("Query manager deleted, but is still in usage! (closeLock)"); // if(this->bufferLock.tryLock() != 0) // logCritical("Query manager deleted, but is still in usage! (bufferLock)"); this->ssl_handler.finalize(); } void QueryClient::preInitialize() { this->properties()[property::CLIENT_TYPE] = ClientType::CLIENT_QUERY; this->properties()[property::CLIENT_TYPE_EXACT] = ClientType::CLIENT_QUERY; this->properties()[property::CLIENT_UNIQUE_IDENTIFIER] = "UnknownQuery"; this->properties()[property::CLIENT_NICKNAME] = string() + "ServerQuery#" + this->getLoggingPeerIp() + "/" + to_string(this->getPeerPort()); DatabaseHelper::assignDatabaseId(this->sql, this->getServerId(), _this.lock()); if(ts::config::query::sslMode == 0) { this->connectionType = ConnectionType::PLAIN; this->postInitialize(); } } void QueryClient::postInitialize() { lock_guard lock(this->lock_packet_handle); /* we dont want to handle anything while we're initializing */ this->connectTimestamp = system_clock::now(); this->properties()[property::CLIENT_LASTCONNECTED] = duration_cast(this->connectTimestamp.time_since_epoch()).count(); if(ts::config::query::sslMode == 1 && this->connectionType != ConnectionType::SSL_ENCRIPTED) { command_result error{error::failed_connection_initialisation, "Please use a SSL encryption!"}; this->notifyError(error); error.release_data(); this->disconnect("Please us a SSL encryption for more security.\nThe server denies also all other connections!"); return; } writeMessage(config::query::motd); assert(this->handle); if(this->handle->ip_blacklist) { assert(this->handle->ip_blacklist); if(this->handle->ip_blacklist->contains(this->remote_address)) { Command cmd("error"); auto err = findError("client_login_not_permitted"); cmd["id"] = err.errorId; cmd["msg"] = err.message; cmd["extra_msg"] = "You're not permitted to use the query interface! (Your blacklisted)"; this->sendCommand(cmd); this->disconnect("blacklisted"); return;; } if(this->handle->ip_whitelist) this->whitelisted = this->handle->ip_whitelist->contains(this->remote_address); else this->whitelisted = false; debugMessage(LOG_QUERY, "Got new query client from {}. Whitelisted: {}", this->getLoggingPeerIp(), this->whitelisted); } if(!this->whitelisted) { threads::MutexLock lock(this->handle->loginLock); if(this->handle->queryBann.count(this->getPeerIp()) > 0) { auto ban = this->handle->queryBann[this->getPeerIp()]; Command cmd("error"); auto err = findError("server_connect_banned"); cmd["id"] = err.errorId; cmd["msg"] = err.message; cmd["extra_msg"] = "you may retry in " + to_string(duration_cast(ban - system_clock::now()).count()) + " seconds"; this->sendCommand(cmd); this->disconnect(""); } } this->update_cached_permissions(); } void QueryClient::writeMessage(const std::string& message) { if(this->state == ConnectionState::DISCONNECTED || !this->handle) return; if(this->connectionType == ConnectionType::PLAIN) this->writeRawMessage(message); else if(this->connectionType == ConnectionType::SSL_ENCRIPTED) this->ssl_handler.send(pipes::buffer_view{(void*) message.data(), message.length()}); else logCritical(LOG_GENERAL, "Invalid query connection type to write to!"); } bool QueryClient::disconnect(const std::string &reason) { if(!reason.empty()) { Command cmd("disconnect"); cmd["reason"] = reason; this->sendCommand(cmd); } return this->close_connection(system_clock::now() + seconds(1)); } bool QueryClient::close_connection(const std::chrono::system_clock::time_point& flushTimeout) { auto ownLock = dynamic_pointer_cast(_this.lock()); if(!ownLock) return false; unique_lock handleLock(this->lock_packet_handle); unique_lock lock(this->closeLock); bool flushing = flushTimeout.time_since_epoch().count() != 0; if(this->state == ConnectionState::DISCONNECTED || (flushing && this->state == ConnectionState::DISCONNECTING)) return false; this->state = flushing ? ConnectionState::DISCONNECTING : ConnectionState::DISCONNECTED; if(this->readEvent) { //Attention dont trigger this within the read thread! event_del_block(this->readEvent); event_free(this->readEvent); this->readEvent = nullptr; } if(this->server){ { unique_lock channel_lock(this->server->channel_tree_lock); this->server->unregisterClient(_this.lock(), "disconnected", channel_lock); } this->server->groups->disableCache(this->getClientDatabaseId()); this->server = nullptr; } if(flushing){ this->flushThread = new threads::Thread(THREAD_SAVE_OPERATIONS | THREAD_EXECUTE_LATER, [ownLock, flushTimeout](){ while(ownLock->state == ConnectionState::DISCONNECTING && flushTimeout > system_clock::now()){ { std::lock_guard buffer_lock(ownLock->buffer_lock); if(ownLock->readQueue.empty() && ownLock->writeQueue.empty()) break; } usleep(10 * 1000); } if(ownLock->state == ConnectionState::DISCONNECTING) ownLock->disconnectFinal(); }); flushThread->name("Flush thread QC").execute(); } else { threads::MutexLock l1(this->flushThreadLock); handleLock.unlock(); lock.unlock(); if(this->flushThread){ threads::NegatedMutexLock l(this->closeLock); this->flushThread->join(); } disconnectFinal(); } return true; } void QueryClient::disconnectFinal() { lock_guard lock_tick(this->lock_query_tick); lock_guard lock_handle(this->lock_packet_handle); threads::MutexLock lock_close(this->closeLock); std::unique_lock buffer_lock(this->buffer_lock, try_to_lock); if(final_disconnected) { logError(LOG_QUERY, "Tried to disconnect a client twice!"); return; } final_disconnected = true; this->state = ConnectionState::DISCONNECTED; { threads::MutexTryLock l(this->flushThreadLock); if(!!l) { if(this->flushThread) { this->flushThread->detach(); delete this->flushThread; //Release the captured this lock this->flushThread = nullptr; } } } if(this->writeEvent) { event_del_block(this->writeEvent); event_free(this->writeEvent); this->writeEvent = nullptr; } if(this->readEvent) { event_del_block(this->readEvent); event_free(this->readEvent); this->readEvent = nullptr; } if(this->clientFd > 0) { if(shutdown(this->clientFd, SHUT_RDWR) < 0) debugMessage(LOG_QUERY, "Could not shutdown query client socket! {} ({})", errno, strerror(errno)); if(close(this->clientFd) < 0) debugMessage(LOG_QUERY, "Failed to close the query client socket! {} ({})", errno, strerror(errno)); this->clientFd = -1; } if(this->server) { { unique_lock channel_lock(this->server->channel_tree_lock); this->server->unregisterClient(_this.lock(), "disconnected", channel_lock); } this->server->groups->disableCache(this->getClientDatabaseId()); this->server = nullptr; } this->readQueue.clear(); this->writeQueue.clear(); if(this->handle) this->handle->unregisterConnection(dynamic_pointer_cast(_this.lock())); } void QueryClient::writeRawMessage(const std::string &message) { { std::lock_guard lock(this->buffer_lock); this->writeQueue.push_back(message); } if(this->writeEvent) event_add(this->writeEvent, nullptr); } void QueryClient::handleMessageWrite(int fd, short, void *) { auto ownLock = _this.lock(); std::unique_lock buffer_lock(this->buffer_lock, try_to_lock); if(this->state == ConnectionState::DISCONNECTED) return; if(!buffer_lock.owns_lock()) { if(this->writeEvent) event_add(this->writeEvent, nullptr); return; } int writes = 0; string buffer; while(writes < 10 && !this->writeQueue.empty()) { if(buffer.empty()) { buffer = std::move(this->writeQueue.front()); this->writeQueue.pop_front(); } auto length = send(fd, buffer.data(), buffer.length(), MSG_NOSIGNAL); #ifdef DEBUG_TRAFFIC debugMessage("Write " + to_string(buffer.length())); hexDump((void *) buffer.data(), buffer.length()); #endif if(length == -1) { if (errno == EINTR || errno == EAGAIN) { if(this->writeEvent) event_add(this->writeEvent, nullptr); return; } else { logError(LOG_QUERY, "{} Failed to write message: {} ({} => {})", CLIENT_STR_LOG_PREFIX, length, errno, strerror(errno)); threads::Thread([=](){ ownLock->close_connection(chrono::system_clock::now() + chrono::seconds{5}); }).detach(); return; } } else { if(buffer.length() == length) buffer = ""; else buffer = buffer.substr(length); } writes++; } if(!buffer.empty()) this->writeQueue.push_front(buffer); if(!this->writeQueue.empty() && this->writeEvent) event_add(this->writeEvent, nullptr); } void QueryClient::handleMessageRead(int fd, short, void *) { auto ownLock = dynamic_pointer_cast(_this.lock()); if(!ownLock) { logCritical(LOG_QUERY, "Could not get own lock!"); return; } string buffer(1024, 0); auto length = read(fd, (void*) buffer.data(), buffer.length()); if(length <= 0){ if(errno == EINTR || errno == EAGAIN) ;//event_add(this->readEvent, nullptr); else if(length == 0 && errno == 0) { logMessage(LOG_QUERY, "{} Connection closed. Client disconnected.", CLIENT_STR_LOG_PREFIX); event_del_noblock(this->readEvent); std::thread([ownLock]{ ownLock->close_connection(); }).detach(); } else { logError(LOG_QUERY, "{} Failed to read! Code: {} errno: {} message: {}", CLIENT_STR_LOG_PREFIX, length, errno, strerror(errno)); event_del_noblock(this->readEvent); threads::Thread(THREAD_SAVE_OPERATIONS, [ownLock](){ ownLock->close_connection(); }).detach(); } return; } buffer.resize(length); { std::lock_guard buffer_lock(this->buffer_lock); if(this->state == ConnectionState::DISCONNECTED) return; this->readQueue.push_back(std::move(buffer)); #ifdef DEBUG_TRAFFIC debugMessage("Read " + to_string(buffer.length())); hexDump((void *) buffer.data(), buffer.length()); #endif } if(this->handle) this->handle->executePool()->execute([ownLock]() { int counter = 0; while(ownLock->tickIOMessageProgress() && counter++ < 15); }); } bool QueryClient::tickIOMessageProgress() { lock_guard lock(this->lock_packet_handle); if(!this->handle || this->state == ConnectionState::DISCONNECTED || this->state == ConnectionState::DISCONNECTING) return false; string message; bool next = false; { std::lock_guard buffer_lock(this->buffer_lock); if(this->readQueue.empty()) return false; message = std::move(this->readQueue.front()); this->readQueue.pop_front(); next |= this->readQueue.empty(); } if(this->connectionType == ConnectionType::PLAIN) { int count = 0; while(this->handleMessage(pipes::buffer_view{(void*) message.data(), message.length()}) && count++ < 15) message = ""; next |= count == 15; } else if(this->connectionType == ConnectionType::SSL_ENCRIPTED) { this->ssl_handler.process_incoming_data(pipes::buffer_view{(void*) message.data(), message.length()}); } else if(this->connectionType == ConnectionType::UNKNOWN) { if(config::query::sslMode != 0 && pipes::SSL::isSSLHeader(message)) { this->initializeSSL(); /* * - Content * \x16 * -Version (1) * \x03 \x00 * - length (2) * \x00 \x04 * * - Header * \x00 -> hello request (3) * \x05 -> length (4) */ //this->writeRawMessage(string("\x16\x03\x01\x00\x05\x00\x00\x00\x00\x00", 10)); } else { this->connectionType = ConnectionType::PLAIN; this->postInitialize(); } next = true; { std::lock_guard buffer_lock(this->buffer_lock); this->readQueue.push_front(std::move(message)); } } return next; } extern InstanceHandler* serverInstance; void QueryClient::initializeSSL() { this->connectionType = ConnectionType::SSL_ENCRIPTED; this->ssl_handler.direct_process(pipes::PROCESS_DIRECTION_OUT, true); this->ssl_handler.direct_process(pipes::PROCESS_DIRECTION_IN, true); this->ssl_handler.callback_data(std::bind(&QueryClient::handleMessage, this, placeholders::_1)); this->ssl_handler.callback_write(std::bind(&QueryClient::writeRawMessage, this, placeholders::_1)); this->ssl_handler.callback_initialized = std::bind(&QueryClient::postInitialize, this); this->ssl_handler.callback_error([&](int code, const std::string& message) { if(code == PERROR_SSL_ACCEPT) { this->disconnect("invalid accept"); } else if(code == PERROR_SSL_TIMEOUT) this->disconnect("invalid accept (timeout)"); else logError(LOG_QUERY, "Got unknown ssl error ({} | {})", code, message); }); { auto context = serverInstance->sslManager()->getQueryContext(); auto options = make_shared(); options->type = pipes::SSL::SERVER; options->context_method = TLS_method(); options->default_keypair({context->privateKey, context->certificate}); if(!this->ssl_handler.initialize(options)) { logError(LOG_QUERY, "[{}] Failed to setup ssl!", CLIENT_STR_LOG_PREFIX); } } } bool QueryClient::handleMessage(const pipes::buffer_view& message) { { threads::MutexLock l(this->closeLock); if(this->state == ConnectionState::DISCONNECTED) return false; } #ifdef DEBUG_TRAFFIC debugMessage("Handling message " + to_string(message.length())); hexDump((void *) message.data(), message.length()); #endif string command; { this->lineBuffer += message.string(); int length = 2; auto pos = this->lineBuffer.find("\r\n"); if(pos == string::npos) pos = this->lineBuffer.find("\n\r"); if(pos == string::npos) { length = 1; pos = this->lineBuffer.find('\n'); } if(pos != string::npos){ command = this->lineBuffer.substr(0, pos); if(this->lineBuffer.size() > pos + length) this->lineBuffer = this->lineBuffer.substr(pos + length); else this->lineBuffer.clear(); } if(pos == string::npos) return false; } if(command.empty() || command.find_first_not_of(' ') == string::npos) { //Empty command logTrace(LOG_QUERY, "[{}:{}] Got query idle command (Empty command or spaces)", this->getLoggingPeerIp(), this->getPeerPort()); CMD_RESET_IDLE; //if idle time over 5 min than connection drop return true; } if(auto non_escape{command.find_first_not_of('\r')}; non_escape == std::string::npos) { logTrace(LOG_QUERY, "[{}:{}] Got query idle command (\\r)", this->getLoggingPeerIp(), this->getPeerPort()); CMD_RESET_IDLE; //if idle time over 5 min than connection drop return true; } else { command = command.substr(non_escape); } if(auto non_escape{command.find_first_not_of('\n')}; non_escape == std::string::npos) { logTrace(LOG_QUERY, "[{}:{}] Got query idle command (\\n)", this->getLoggingPeerIp(), this->getPeerPort()); CMD_RESET_IDLE; //if idle time over 5 min than connection drop return true; } else { command = command.substr(non_escape); } if((uint8_t) command[0] == 255) { string commands{}; /* we got a telnet command here */ while(command.size() >= 2 && (uint8_t) command[0] == 255) { uint8_t code = command[1]; uint8_t option = command[2]; if(!commands.empty()) commands += ", "; commands += to_string(code) + ":" + to_string(option); command = command.substr(3); } logTrace(LOG_QUERY, "[{}:{}] Received telnet command(s): {}. Ignoring it.",this->getLoggingPeerIp(), this->getPeerPort(), commands); CMD_RESET_IDLE; if(command.empty()) return true; } unique_ptr cmd; command_result error{}; try { cmd = make_unique(Command::parse(pipes::buffer_view{(void*) command.data(), command.length()}, true, !ts::config::server::strict_ut8_mode)); } catch(std::invalid_argument& ex) { logTrace(LOG_QUERY, "[{}:{}] Failed to parse command (invalid argument): {}", this->getLoggingPeerIp(), this->getPeerPort(), command); error.reset(command_result{error::parameter_convert}); goto handle_error; } catch(std::exception& ex) { logTrace(LOG_QUERY, "[{}:{}] Failed to parse command (exception: {}): {}", this->getLoggingPeerIp(), this->getPeerPort(), ex.what(), command); error.reset(command_result{error::vs_critical, std::string{ex.what()}}); goto handle_error; } try { this->handleCommandFull(*cmd); } catch(std::exception& ex) { error.reset(command_result{error::vs_critical, std::string{ex.what()}}); goto handle_error; } return true; handle_error: this->notifyError(error); error.release_data(); return false; } void QueryClient::sendCommand(const ts::Command &command, bool) { auto cmd = command.build(); writeMessage(cmd + config::query::newlineCharacter); logTrace(LOG_QUERY, "Send command {}", cmd); } void QueryClient::sendCommand(const ts::command_builder &command, bool) { writeMessage(command.build() + config::query::newlineCharacter); logTrace(LOG_QUERY, "Send command {}", command.build()); } void QueryClient::tick(const std::chrono::system_clock::time_point &time) { ConnectedClient::tick(time); } void QueryClient::queryTick() { lock_guard lock_tick(this->lock_query_tick); if(this->idleTimestamp.time_since_epoch().count() > 0 && system_clock::now() - this->idleTimestamp > minutes(5)){ debugMessage(LOG_QUERY, "Dropping client " + this->getLoggingPeerIp() + "|" + this->getDisplayName() + ". (Timeout)"); this->close_connection(system_clock::now() + seconds(1)); } if(this->connectionType == ConnectionType::UNKNOWN && system_clock::now() - milliseconds(500) > connectedTimestamp) { this->connectionType = ConnectionType::PLAIN; this->postInitialize(); } } bool QueryClient::notifyChannelSubscribed(const deque> &) { return false; } bool QueryClient::notifyChannelUnsubscribed(const deque> &){ return false; } bool QueryClient::ignoresFlood() { return this->whitelisted || ConnectedClient::ignoresFlood(); } void QueryClient::disconnect_from_virtual_server() { threads::MutexLock lock(this->command_lock); auto server_locked = this->server; if(server_locked) { //unregister manager from old server { unique_lock tree_lock(this->server->channel_tree_lock); if(this->currentChannel) this->server->client_move(this->ref(), nullptr, nullptr, "", ViewReasonId::VREASON_USER_ACTION, false, tree_lock); this->server->unregisterClient(_this.lock(), "server switch", tree_lock); } server_locked->groups->disableCache(this->getClientDatabaseId()); this->channels->reset(); this->currentChannel = nullptr; this->server = nullptr; this->loadDataForCurrentServer(); } serverInstance->getGroupManager()->enableCache(this->getClientDatabaseId()); }