From 5cfd2c45862eb91b8c05ce26a7048cf674f6f4db Mon Sep 17 00:00:00 2001 From: WolverinDEV Date: Fri, 8 Nov 2019 02:44:22 +0100 Subject: [PATCH] Some updates --- license/shared/License.cpp | 7 - server/main.cpp | 1 + server/src/ConnectionStatistics.cpp | 4 + server/src/DatabaseHelper.cpp | 17 +-- server/src/Group.cpp | 8 +- server/src/ShutdownHelper.cpp | 22 ++- server/src/ShutdownHelper.h | 2 +- .../client/ConnectedClientCommandHandler.cpp | 42 ++++-- server/src/client/music/Song.cpp | 3 +- server/src/client/web/WebClient.cpp | 7 +- server/src/geo/GeoLocation.cpp | 2 +- server/src/server/VoiceIOManager.cpp | 135 ++++++++++-------- server/src/server/VoiceIOManager.h | 3 +- server/src/terminal/CommandHandler.cpp | 2 +- 14 files changed, 145 insertions(+), 110 deletions(-) diff --git a/license/shared/License.cpp b/license/shared/License.cpp index 8f5433b..9656b6a 100644 --- a/license/shared/License.cpp +++ b/license/shared/License.cpp @@ -857,11 +857,4 @@ namespace license::v2 { return BodyInterpreter::_create(pub_key, begin, end, 0, buffer); } } - - static int test() { - uint8_t errc; - auto license = License::read(nullptr, 0, errc); - license->push_entry(system_clock::now(), system_clock::now(), "test!"); - return 0; - } } \ No newline at end of file diff --git a/server/main.cpp b/server/main.cpp index 5d80311..5ecf9dc 100644 --- a/server/main.cpp +++ b/server/main.cpp @@ -123,6 +123,7 @@ int main(int argc, char** argv) { { auto evthread_use_pthreads_result = evthread_use_pthreads(); assert(evthread_use_pthreads_result == 0); + (void) evthread_use_pthreads_result; } terminal::install(); if(!terminal::active()){ cerr << "could not setup terminal!" << endl; return -1; } diff --git a/server/src/ConnectionStatistics.cpp b/server/src/ConnectionStatistics.cpp index 928eb84..3eb64db 100644 --- a/server/src/ConnectionStatistics.cpp +++ b/server/src/ConnectionStatistics.cpp @@ -108,6 +108,7 @@ void ConnectionStatistics::_log_incoming_packet(ts::stats::StatisticEntry *info_ if(this->_measure_bandwidths) { auto lock_count = info_entry->use_count++; assert(lock_count >= 0); + (void) lock_count; lock_guard lock(this->history_lock_incoming); this->history_incoming.push_back(info_entry); @@ -136,6 +137,7 @@ void ConnectionStatistics::_log_outgoing_packet(ts::stats::StatisticEntry *info_ if(this->_measure_bandwidths) { auto lock_count = info_entry->use_count++; assert(lock_count >= 0); + (void) lock_count; lock_guard lock(this->history_lock_outgoing); this->history_outgoing.push_back(info_entry); @@ -159,6 +161,7 @@ void ConnectionStatistics::_log_incoming_file_packet(ts::stats::StatisticEntry * if(this->_measure_bandwidths) { auto lock_count = info_entry->use_count++; assert(lock_count >= 0); + (void) lock_count; lock_guard lock(this->history_lock_incoming); this->history_file_incoming.push_back(info_entry); @@ -182,6 +185,7 @@ void ConnectionStatistics::_log_outgoing_file_packet(ts::stats::StatisticEntry * if(this->_measure_bandwidths) { auto lock_count = info_entry->use_count++; assert(lock_count >= 0); + (void) lock_count; lock_guard lock(this->history_lock_outgoing); this->history_file_outgoing.push_back(info_entry); diff --git a/server/src/DatabaseHelper.cpp b/server/src/DatabaseHelper.cpp index 857ef96..0f001b0 100644 --- a/server/src/DatabaseHelper.cpp +++ b/server/src/DatabaseHelper.cpp @@ -309,7 +309,7 @@ std::shared_ptr DatabaseHelper::loadClientPermissionManag { lock_guard lock(permManagerLock); for(auto permMgr : this->cachedPermissionManagers) - if(permMgr->cldbid == cldbid && permMgr->sid == (server ? server->getServerId() : 0)){ + if(permMgr->cldbid == cldbid && permMgr->sid == (server ? server->getServerId() : 0)) { auto ptr = permMgr->manager.lock(); if(!ptr){ this->cachedPermissionManagers.erase(std::find(this->cachedPermissionManagers.begin(), this->cachedPermissionManagers.end(), permMgr)); @@ -323,7 +323,7 @@ std::shared_ptr DatabaseHelper::loadClientPermissionManag #endif logTrace(server_id, "[Permission] Loading client permission manager for client {}", cldbid); - auto pMgr = std::make_shared(); + auto permission_manager = std::make_shared(); bool loaded = false; if(this->use_startup_cache && server) { shared_ptr entry; @@ -342,20 +342,21 @@ std::shared_ptr DatabaseHelper::loadClientPermissionManag auto channel = perm->channelId > 0 ? server->getChannelTree()->findChannel(perm->channelId) : nullptr; if(channel) - pMgr->load_permission(perm->permission->type, {perm->value, perm->grant}, channel->channelId(), perm->flag_skip, perm->flag_negate, perm->value != permNotGranted, perm->grant != permNotGranted); + permission_manager->load_permission(perm->permission->type, {perm->value, perm->grant}, channel->channelId(), perm->flag_skip, perm->flag_negate, perm->value != permNotGranted, perm->grant != permNotGranted); else - pMgr->load_permission(perm->permission->type, {perm->value, perm->grant}, perm->flag_skip, perm->flag_negate, perm->value != permNotGranted, perm->grant != permNotGranted); + permission_manager->load_permission(perm->permission->type, {perm->value, perm->grant}, perm->flag_skip, perm->flag_negate, perm->value != permNotGranted, perm->grant != permNotGranted); } } loaded = true; } } + if(!loaded) { auto command = sql::command(this->sql, "SELECT `permId`, `value`, `channelId`, `grant`, `flag_skip`, `flag_negate` FROM `permissions` WHERE `serverId` = :serverId AND `type` = :type AND `id` = :id", variable{":serverId", server ? server->getServerId() : 0}, variable{":type", permission::SQL_PERM_USER}, variable{":id", cldbid}); - LOG_SQL_CMD(load_permissions_v2(server, pMgr.get(), command, true)); + LOG_SQL_CMD(load_permissions_v2(server, permission_manager.get(), command, true)); } @@ -363,14 +364,14 @@ std::shared_ptr DatabaseHelper::loadClientPermissionManag this->permManagerLock.lock(); auto entry = new CachedPermissionManager(); entry->sid = server_id; - entry->manager = pMgr; - entry->ownLock = pMgr; + entry->manager = permission_manager; + entry->ownLock = permission_manager; entry->cldbid = cldbid; entry->lastAccess = system_clock::now(); this->cachedPermissionManagers.push_back(entry); this->permManagerLock.unlock(); #endif - return pMgr; + return permission_manager; } diff --git a/server/src/Group.cpp b/server/src/Group.cpp index c2524aa..478a6b5 100644 --- a/server/src/Group.cpp +++ b/server/src/Group.cpp @@ -566,8 +566,9 @@ void GroupManager::enableCache(const ClientDbId& client_database_id) { //FIXME: This method till get far more often then it should be. We should add a flag if the group cache is loaded for each std::shared_ptr instance void GroupManager::disableCache(const ClientDbId& client_database_id) { - if(this->root) - this->root->disableCache(client_database_id); + if(this->root) { + this->root->disableCache(client_database_id); + } lock_guard cache_lock(this->cacheLock); this->cachedClients.erase(std::remove_if(this->cachedClients.begin(), this->cachedClients.end(), [&](const std::shared_ptr& client) { @@ -720,8 +721,7 @@ std::vector> GroupManager::getAssignedServerGro return result; } - debugMessage("DB query groups! for -> " + to_string(cldbid) + " - server " + to_string(this->getServerId())); - + debugMessage(this->getServerId(), "Query client groups for client {} on server {}.", cldbid, this->getServerId()); res = sql::command(this->sql, "SELECT `groupId`, `until` FROM `assignedGroups` WHERE `serverId` = :sid AND `cldbid` = :cldbid AND `channelId` = 0", variable{":sid", this->getServerId()}, variable{":cldbid", cldbid}).query([&](int length, char** value, char** column){ shared_ptr group = nullptr; time_point until; diff --git a/server/src/ShutdownHelper.cpp b/server/src/ShutdownHelper.cpp index 920cb24..02bc3e1 100644 --- a/server/src/ShutdownHelper.cpp +++ b/server/src/ShutdownHelper.cpp @@ -1,9 +1,6 @@ -// -// Created by wolverindev on 01.04.18. -// - #include #include +#include #include "ShutdownHelper.h" #include "InstanceHandler.h" @@ -13,7 +10,6 @@ using namespace ts; using namespace ts::server; extern bool mainThreadActive; -extern ts::server::InstanceHandler* serverInstance; bool shuttingDown = false; void ts::server::shutdownInstance(const std::string& message) { @@ -45,7 +41,9 @@ std::shared_ptr currentShutdown = nullptr; std::shared_ptr server::scheduledShutdown() { return currentShutdown; } inline void broadcastMessage(const std::string& message) { - if(!serverInstance || !serverInstance->getVoiceServerManager()); + if(!serverInstance || !serverInstance->getVoiceServerManager()) + return; + for(const auto &server : serverInstance->getVoiceServerManager()->serverInstances()) { if(server->running()) { server->broadcastMessage(server->getServerRoot(), message); @@ -62,8 +60,10 @@ bool server::scheduleShutdown(const std::chrono::system_clock::time_point& time, data->time_point = time; data->reason = reason; - data->shutdownThread = new threads::Thread(THREAD_EXECUTE_LATER | THREAD_SAVE_OPERATIONS, [data](){ executeScheduledShutdown(data); }); - data->shutdownThread->name("Shutdown Executor").execute(); + data->shutdown_thread = std::thread{[data]{ + executeScheduledShutdown(data); + }}; + threads::name(data->shutdown_thread, "Shutdown executor"); currentShutdown = data; return true; } @@ -77,12 +77,10 @@ void server::cancelShutdown(bool notify) { auto current = server::scheduledShutdown(); current->active = false; current->shutdownNotify.notify_all(); - if(current->shutdownThread->join(seconds(3)) != 0) { + if(!threads::save_join(current->shutdown_thread)) { logCritical("Could not terminal shutdown thread!"); + current->shutdown_thread.detach(); } - delete current->shutdownThread; - current->shutdownThread = nullptr; - currentShutdown = nullptr; } diff --git a/server/src/ShutdownHelper.h b/server/src/ShutdownHelper.h index b707983..85654de 100644 --- a/server/src/ShutdownHelper.h +++ b/server/src/ShutdownHelper.h @@ -12,7 +12,7 @@ namespace ts { std::chrono::system_clock::time_point time_point; bool active; - threads::Thread* shutdownThread = nullptr; + std::thread shutdown_thread{}; std::mutex shutdownMutex; std::condition_variable shutdownNotify; }; diff --git a/server/src/client/ConnectedClientCommandHandler.cpp b/server/src/client/ConnectedClientCommandHandler.cpp index 7e0087a..1d2ba81 100644 --- a/server/src/client/ConnectedClientCommandHandler.cpp +++ b/server/src/client/ConnectedClientCommandHandler.cpp @@ -5234,10 +5234,12 @@ CommandResult ConnectedClient::handleCommandClientAddPerm(Command &cmd) { CMD_RESET_IDLE; CMD_CHK_AND_INC_FLOOD_POINTS(5); - if(!serverInstance->databaseHelper()->validClientDatabaseId(this->server, cmd["cldbid"])) return {findError("client_invalid_id"), "invalid client id"}; - auto mgr = serverInstance->databaseHelper()->loadClientPermissionManager(this->server, cmd["cldbid"]); + auto cldbid = cmd["cldbid"].as(); + if(!serverInstance->databaseHelper()->validClientDatabaseId(this->server, cldbid)) + return {findError("client_invalid_id"), "invalid client id"}; + auto mgr = serverInstance->databaseHelper()->loadClientPermissionManager(this->server, cldbid); - PERM_CHECKR(permission::i_client_permission_modify_power, this->server->calculatePermission(permission::PERMTEST_ORDERED, cmd["cldbid"], permission::i_client_needed_permission_modify_power, ClientType::CLIENT_TEAMSPEAK, nullptr), true); + PERM_CHECKR(permission::i_client_permission_modify_power, this->server->calculatePermission(permission::PERMTEST_ORDERED, cldbid, permission::i_client_needed_permission_modify_power, ClientType::CLIENT_TEAMSPEAK, nullptr), true); auto maxValue = this->getPermissionGrantValue(permission::PERMTEST_ORDERED, permission::i_permission_modify_power, this->currentChannel); bool ignoreGrant = this->permissionGranted(permission::PERMTEST_ORDERED, permission::b_permission_modify_power_ignore, 1, this->currentChannel); @@ -5260,7 +5262,8 @@ CommandResult ConnectedClient::handleCommandClientAddPerm(Command &cmd) { update_channels |= permission_is_client_property(permType); } } - auto onlineClients = this->server->findClientsByCldbId(cmd["cldbid"]); + serverInstance->databaseHelper()->saveClientPermissions(this->server, cldbid, mgr); + auto onlineClients = this->server->findClientsByCldbId(cldbid); if (!onlineClients.empty()) for (const auto &elm : onlineClients) { if(elm->update_cached_permissions()) /* update cached calculated permissions */ @@ -5278,9 +5281,11 @@ CommandResult ConnectedClient::handleCommandClientDelPerm(Command &cmd) { CMD_RESET_IDLE; CMD_CHK_AND_INC_FLOOD_POINTS(5); - if(!serverInstance->databaseHelper()->validClientDatabaseId(this->server, cmd["cldbid"])) return {findError("client_invalid_id"), "invalid client id"}; - auto mgr = serverInstance->databaseHelper()->loadClientPermissionManager(this->server, cmd["cldbid"]); - PERM_CHECKR(permission::i_client_permission_modify_power, this->server->calculatePermission(permission::PERMTEST_ORDERED, cmd["cldbid"], permission::i_client_needed_permission_modify_power, ClientType::CLIENT_TEAMSPEAK, nullptr), true); + auto cldbid = cmd["cldbid"].as(); + if(!serverInstance->databaseHelper()->validClientDatabaseId(this->server, cldbid)) + return {findError("client_invalid_id"), "invalid client id"}; + auto mgr = serverInstance->databaseHelper()->loadClientPermissionManager(this->server, cldbid); + PERM_CHECKR(permission::i_client_permission_modify_power, this->server->calculatePermission(permission::PERMTEST_ORDERED, cldbid, permission::i_client_needed_permission_modify_power, ClientType::CLIENT_TEAMSPEAK, nullptr), true); bool ignoreGrant = this->permissionGranted(permission::PERMTEST_ORDERED, permission::b_permission_modify_power_ignore, 1, this->currentChannel); bool conOnError = cmd[0].has("continueonerror"); @@ -5300,6 +5305,8 @@ CommandResult ConnectedClient::handleCommandClientDelPerm(Command &cmd) { update_channel |= permission_is_client_property(permType); } } + + serverInstance->databaseHelper()->saveClientPermissions(this->server, cldbid, mgr); if (!onlineClients.empty()) for (const auto &elm : onlineClients) { if(elm->update_cached_permissions()) /* update cached calculated permissions */ @@ -5384,9 +5391,12 @@ CommandResult ConnectedClient::handleCommandChannelClientDelPerm(Command &cmd) { CMD_REF_SERVER(server_ref); CMD_RESET_IDLE; CMD_CHK_AND_INC_FLOOD_POINTS(5); - if (!serverInstance->databaseHelper()->validClientDatabaseId(this->server, cmd["cldbid"])) return {findError("parameter_invalid"), "Invalid manager db id"}; - auto mgr = serverInstance->databaseHelper()->loadClientPermissionManager(this->server, cmd["cldbid"]); + auto cldbid = cmd["cldbid"].as(); + if (!serverInstance->databaseHelper()->validClientDatabaseId(this->server, cldbid)) + return {findError("parameter_invalid"), "Invalid manager db id"}; + + auto mgr = serverInstance->databaseHelper()->loadClientPermissionManager(this->server, cldbid); PERM_CHECKR(permission::i_client_permission_modify_power, this->server->calculatePermission(permission::PERMTEST_ORDERED, cmd["cldbid"], permission::i_client_needed_permission_modify_power, ClientType::CLIENT_TEAMSPEAK, nullptr), true); RESOLVE_CHANNEL_R(cmd["cid"], true); @@ -5395,7 +5405,7 @@ CommandResult ConnectedClient::handleCommandChannelClientDelPerm(Command &cmd) { bool ignoreGrant = this->permissionGranted(permission::PERMTEST_ORDERED, permission::b_permission_modify_power_ignore, 1, this->currentChannel); bool conOnError = cmd[0].has("continueonerror"), update_view = false; - auto cll = this->server->findClientsByCldbId(cmd["cldbid"]); + auto cll = this->server->findClientsByCldbId(cldbid); for (int index = 0; index < cmd.bulkCount(); index++) { PARSE_PERMISSION(cmd); @@ -5410,6 +5420,7 @@ CommandResult ConnectedClient::handleCommandChannelClientDelPerm(Command &cmd) { } } + serverInstance->databaseHelper()->saveClientPermissions(this->server, cldbid, mgr); if (!cll.empty()) { for (const auto &elm : cll) { if(elm->update_cached_permissions()) /* update cached calculated permissions */ @@ -5444,20 +5455,23 @@ CommandResult ConnectedClient::handleCommandChannelClientAddPerm(Command &cmd) { CMD_REF_SERVER(server_ref); CMD_RESET_IDLE; CMD_CHK_AND_INC_FLOOD_POINTS(5); - if (!serverInstance->databaseHelper()->validClientDatabaseId(this->server, cmd["cldbid"])) return {findError("parameter_invalid"), "Invalid manager db id"}; + + auto cldbid = cmd["cldbid"].as(); + if (!serverInstance->databaseHelper()->validClientDatabaseId(this->server, cldbid)) + return {findError("parameter_invalid"), "Invalid client db id"}; RESOLVE_CHANNEL_R(cmd["cid"], true); auto channel = dynamic_pointer_cast(l_channel->entry); if(!channel) return {ErrorType::VSError}; - auto mgr = serverInstance->databaseHelper()->loadClientPermissionManager(this->server, cmd["cldbid"]); + auto mgr = serverInstance->databaseHelper()->loadClientPermissionManager(this->server, cldbid); PERM_CHECK_CHANNELR(permission::i_client_permission_modify_power, this->server->calculatePermission(permission::PERMTEST_ORDERED, cmd["cldbid"], permission::i_client_needed_permission_modify_power, ClientType::CLIENT_TEAMSPEAK, channel), channel, true); auto maxValue = this->getPermissionGrantValue(permission::PERMTEST_ORDERED, permission::i_permission_modify_power, this->currentChannel); bool ignoreGrant = this->permissionGranted(permission::PERMTEST_ORDERED, permission::b_permission_modify_power_ignore, 1, this->currentChannel); bool conOnError = cmd[0].has("continueonerror"); - auto onlineClientInstances = this->server->findClientsByCldbId(cmd["cldbid"]); + auto onlineClientInstances = this->server->findClientsByCldbId(cldbid); bool update_view = false; for (int index = 0; index < cmd.bulkCount(); index++) { PARSE_PERMISSION(cmd); @@ -5477,6 +5491,8 @@ CommandResult ConnectedClient::handleCommandChannelClientAddPerm(Command &cmd) { update_view = permType == permission::b_channel_ignore_view_power || permType == permission::i_channel_view_power; } } + + serverInstance->databaseHelper()->saveClientPermissions(this->server, cldbid, mgr); if (!onlineClientInstances.empty()) for (const auto &elm : onlineClientInstances) { if (elm->update_cached_permissions()) /* update cached calculated permissions */ diff --git a/server/src/client/music/Song.cpp b/server/src/client/music/Song.cpp index d3df5f9..aa5c9b5 100644 --- a/server/src/client/music/Song.cpp +++ b/server/src/client/music/Song.cpp @@ -1,4 +1,5 @@ #include +#include #include "Song.h" @@ -22,7 +23,7 @@ shared_ptr PlayableSong::get_loader(const std::shar if(function && ref) { weak_ptr weak_server = server; /* async loading */ - thread([weak_server, ref, future, function]{ + std::thread([weak_server, ref, future, function]{ auto server = weak_server.lock(); if(!server) { future->executionFailed("broken server reference"); diff --git a/server/src/client/web/WebClient.cpp b/server/src/client/web/WebClient.cpp index 353b940..c858767 100644 --- a/server/src/client/web/WebClient.cpp +++ b/server/src/client/web/WebClient.cpp @@ -460,7 +460,12 @@ void WebClient::handleMessage(const std::string &message) { }; this->voice_bridge->callback_failed = [&] { - this->voice_bridge.reset(); + auto vb_ptr = &*this->voice_bridge; /* read only no lock needed */ + std::thread([&, vb_ptr, lock = this->ref()]{ + unique_lock vbl{this->voice_bridge_lock}; + if(&*this->voice_bridge == vb_ptr) + this->voice_bridge.release(); + }).detach(); Json::Value response; response["type"] = "WebRTC"; diff --git a/server/src/geo/GeoLocation.cpp b/server/src/geo/GeoLocation.cpp index a23eaef..88319ce 100644 --- a/server/src/geo/GeoLocation.cpp +++ b/server/src/geo/GeoLocation.cpp @@ -17,7 +17,7 @@ std::shared_ptr RangedIPProviderBase::_resolveInfo(IpAddress_t address, bo int16_t index = this->index(beAddr); while(index >= 0) { - const auto &list = mapping[index]; + auto &list = mapping[index]; RangeEntryMapping* closest = nullptr; for(auto& entry : list) { if(entry.startAddress > beAddr) diff --git a/server/src/server/VoiceIOManager.cpp b/server/src/server/VoiceIOManager.cpp index 3f23d08..985bcb6 100644 --- a/server/src/server/VoiceIOManager.cpp +++ b/server/src/server/VoiceIOManager.cpp @@ -18,59 +18,68 @@ VoiceIOManager::VoiceIOManager(){ } VoiceIOManager::~VoiceIOManager() { } std::shared_ptr VoiceIOManager::enableIo(server::TSServer *server) { - shared_ptr io = std::make_shared(server); + auto server_io = std::make_shared(server); this->adjustExecutors(this->servers.size() + 1); - std::deque> blacklist; - for(int i = 0; i < config::threads::voice::events_per_server; i++){ - lock_guard l(this->executorLock); - auto loop = this->lowestIoLoop(blacklist); + + std::vector> use_list; + use_list.reserve(config::threads::voice::events_per_server); + + lock_guard executor_lock(this->executorLock); + for(size_t i = 0; i < config::threads::voice::events_per_server; i++){ + auto loop = this->less_used_io_loop(use_list); if(!loop) break; //No more loops open - io->create_event_loop_events(loop)->activate(); - blacklist.push_back(loop); + + server_io->create_event_loop_events(loop)->activate(); + use_list.push_back(loop); } { threads::MutexLock l(this->serverLock); - this->servers.push_back(io); + this->servers.push_back(server_io); } this->ioExecutorNotify.notify_all(); - return io; + return server_io; } void VoiceIOManager::disableIo(server::TSServer* server) { - std::shared_ptr io; + std::shared_ptr server_io; { threads::MutexLock l(this->serverLock); - for(const auto& elm : this->servers) - if(elm->server == server) { - io = elm; - break; - } - if(!io) return; - this->servers.erase(std::find(this->servers.begin(), this->servers.end(), io)); + for(const auto& sio : this->servers) { + if(sio->server == server) { + server_io = sio; + break; + } + } + if(!server_io) return; + this->servers.erase(std::find(this->servers.begin(), this->servers.end(), server_io),this->servers.end()); } - for(const auto& entry : io->event_loop_events) { + for(const auto& entry : server_io->event_loop_events) { entry->disable(); entry->despawn(); } - io->event_loop_events.clear(); + + server_io->event_loop_events.clear(); this->adjustExecutors(this->servers.size()); } void VoiceIOManager::shutdownGlobally() { - { - threads::MutexLock l(this->serverLock); - for(const auto& elm : this->servers) - for(const auto& e : elm->event_loop_events){ - e->disable(); - e->despawn(); + /* Unregister all servers */ + { + lock_guard server_lock(this->serverLock); + for(const auto& server : this->servers) + for(const auto& loop : server->event_loop_events){ + loop->disable(); + loop->despawn(); } } + + /* shutting down event loops */ { - lock_guard l(this->executorLock); + lock_guard executor_lock(this->executorLock); for(const auto& loop : this->event_loops) { loop->shutdown = true; event_base_loopexit(loop->base, nullptr); @@ -78,18 +87,27 @@ void VoiceIOManager::shutdownGlobally() { this->ioExecutorNotify.notify_all(); } - auto beg = system_clock::now(); + /* keep a ref to all event loops so they dont despawn in their event thread */ + unique_lock executor_lock{this->executorLock}; + auto event_loops = this->event_loops; + auto wait_end = system_clock::now() + chrono::seconds{5}; + while(true) { - { - lock_guard l(this->executorLock); - if(this->event_loops.empty()) break; - } - if(system_clock::now() - beg > seconds(2)) { - logCritical("Could not shutdown io loop!"); + if(this->event_loops.empty()) + break; + + auto status = this->ioExecutorNotify.wait_until(executor_lock, wait_end); + if(status == std::cv_status::timeout) { + logCritical(LOG_GENERAL, + "Failed to shutdown all event loops successfully. After timeout {} loops are left.", + this->event_loops.size() + ); break; } - threads::self::sleep_for(milliseconds(10)); } + + /* now delete all loops */ + event_loops.clear(); } //TODO also reduce thread pool! @@ -107,15 +125,7 @@ void VoiceIOManager::adjustExecutors(size_t size) { } IOEventLoop::~IOEventLoop() { - if(this->executor.joinable()) { - auto handle = this->executor.native_handle(); - timespec timeout{}; - clock_gettime(CLOCK_REALTIME, &timeout); - timeout.tv_sec += 5; - auto join_result = pthread_timedjoin_np(handle, nullptr, &timeout); - if(join_result == EBUSY) - logError(LOG_INSTANCE, "Failed to shutdown IO event loop. Error: {}", join_result); - } + assert(this_thread::get_id() != this->executor.get_id()); assert(!this->executor.joinable()); } @@ -129,8 +139,10 @@ std::shared_ptr VoiceIOManager::spawnEventLoop() { loop->bound_thread = -1; { +#ifndef WIN32 const auto name = "IO exec #" + to_string(this->event_loops.size()); pthread_setname_np(loop->executor.native_handle(), name.c_str()); +#endif const auto num_threads = std::thread::hardware_concurrency(); if(num_threads != 0 && config::threads::voice::bind_io_thread_to_kernel_thread) { @@ -169,7 +181,7 @@ std::shared_ptr VoiceIOManager::spawnEventLoop() { return loop; } -std::shared_ptr VoiceIOManager::lowestIoLoop(std::deque> blacklist) { +std::shared_ptr VoiceIOManager::less_used_io_loop(vector> &blacklist) { std::shared_ptr current; for(const auto& loop : this->event_loops) if(!current || loop->assigned_events.size() < current->assigned_events.size()) { @@ -278,7 +290,6 @@ void IOEventLoopEvents::despawn() { assert(std::find(event_loop_events.begin(), event_loop_events.end(), event) != event_loop_events.end()); event_loop_events.erase(std::find(event_loop_events.begin(), event_loop_events.end(), event)); - } } @@ -286,33 +297,37 @@ void IOEventLoopEvents::despawn() { } void VoiceIOManager::dispatchBase(shared_ptr self) { - debugMessage(lstream << "Dispatching io base " << self->base); - while(!event_base_got_exit(self->base)) { + debugMessage(LOG_INSTANCE, "Dispatching io base {}", (void*) self->base); + while(true) { + if(event_base_got_exit(self->base) || self->shutdown) + break; + event_base_loop(self->base, 0); { - unique_lock l(this->executorLock); - this->ioExecutorNotify.wait(l, [&](){ - if(event_base_got_exit(self->base) || self->shutdown) return true; - return !this->servers.empty(); - }); + /* wait until reschedule */ + unique_lock execute_lock(this->executorLock); + this->ioExecutorNotify.wait(execute_lock); } - if(event_base_get_num_events(self->base, EVENT_BASE_COUNT_ACTIVE) == 0) threads::self::sleep_for(seconds(1)); } - debugMessage(lstream << "Dispatching io base " << self->base << " finished!"); + debugMessage(LOG_INSTANCE, "Dispatching io base {} finished", (void*) self->base); { - unique_lock l(this->executorLock); + lock_guard executor_lock(this->executorLock); auto found = std::find(this->event_loops.begin(), this->event_loops.end(), self); - if(found != this->event_loops.end()) - this->event_loops.erase(found); - else - logCritical("Could not find executor in executor registry!"); + if(found != this->event_loops.end()) { + this->event_loops.erase(found); + } else { + logCritical(LOG_INSTANCE, "Could not find executor in executor registry ({})!", (void*) self->base); + } if(!self->assigned_events.empty()) { - logError("Event loop exited, but sill containing some events!"); + logError(LOG_INSTANCE, "Event loop exited, but sill containing some events ({})!", self->assigned_events.size()); } + event_base_free(self->base); self->base = nullptr; + + this->ioExecutorNotify.notify_all(); /* let everybody know we're done */ } } diff --git a/server/src/server/VoiceIOManager.h b/server/src/server/VoiceIOManager.h index de36ddf..eaadb12 100644 --- a/server/src/server/VoiceIOManager.h +++ b/server/src/server/VoiceIOManager.h @@ -211,12 +211,13 @@ namespace ts { void shutdownGlobally(); private: - std::shared_ptr lowestIoLoop(std::deque>); + std::shared_ptr less_used_io_loop(std::vector>&); threads::Mutex serverLock; std::deque> servers; std::mutex executorLock; + /* will be called as soon servers have been added or an event loop has been finished */ std::condition_variable ioExecutorNotify; std::deque> event_loops; diff --git a/server/src/terminal/CommandHandler.cpp b/server/src/terminal/CommandHandler.cpp index 762d183..708ae09 100644 --- a/server/src/terminal/CommandHandler.cpp +++ b/server/src/terminal/CommandHandler.cpp @@ -474,7 +474,7 @@ namespace terminal { for(auto& fd : fd_leaks) close(fd); fd_leaks.clear(); - return; + return true; } else { value = cmd.arguments[0].as(); }