Some updates

This commit is contained in:
WolverinDEV 2019-11-08 02:44:22 +01:00
parent 1cd1221217
commit 5cfd2c4586
14 changed files with 145 additions and 110 deletions

View File

@ -857,11 +857,4 @@ namespace license::v2 {
return BodyInterpreter::_create<Ephemeral>(pub_key, begin, end, 0, buffer);
}
}
static int test() {
uint8_t errc;
auto license = License::read(nullptr, 0, errc);
license->push_entry<hierarchy::Intermediate>(system_clock::now(), system_clock::now(), "test!");
return 0;
}
}

View File

@ -123,6 +123,7 @@ int main(int argc, char** argv) {
{
auto evthread_use_pthreads_result = evthread_use_pthreads();
assert(evthread_use_pthreads_result == 0);
(void) evthread_use_pthreads_result;
}
terminal::install();
if(!terminal::active()){ cerr << "could not setup terminal!" << endl; return -1; }

View File

@ -108,6 +108,7 @@ void ConnectionStatistics::_log_incoming_packet(ts::stats::StatisticEntry *info_
if(this->_measure_bandwidths) {
auto lock_count = info_entry->use_count++;
assert(lock_count >= 0);
(void) lock_count;
lock_guard lock(this->history_lock_incoming);
this->history_incoming.push_back(info_entry);
@ -136,6 +137,7 @@ void ConnectionStatistics::_log_outgoing_packet(ts::stats::StatisticEntry *info_
if(this->_measure_bandwidths) {
auto lock_count = info_entry->use_count++;
assert(lock_count >= 0);
(void) lock_count;
lock_guard lock(this->history_lock_outgoing);
this->history_outgoing.push_back(info_entry);
@ -159,6 +161,7 @@ void ConnectionStatistics::_log_incoming_file_packet(ts::stats::StatisticEntry *
if(this->_measure_bandwidths) {
auto lock_count = info_entry->use_count++;
assert(lock_count >= 0);
(void) lock_count;
lock_guard lock(this->history_lock_incoming);
this->history_file_incoming.push_back(info_entry);
@ -182,6 +185,7 @@ void ConnectionStatistics::_log_outgoing_file_packet(ts::stats::StatisticEntry *
if(this->_measure_bandwidths) {
auto lock_count = info_entry->use_count++;
assert(lock_count >= 0);
(void) lock_count;
lock_guard lock(this->history_lock_outgoing);
this->history_file_outgoing.push_back(info_entry);

View File

@ -309,7 +309,7 @@ std::shared_ptr<v2::PermissionManager> DatabaseHelper::loadClientPermissionManag
{
lock_guard<threads::Mutex> lock(permManagerLock);
for(auto permMgr : this->cachedPermissionManagers)
if(permMgr->cldbid == cldbid && permMgr->sid == (server ? server->getServerId() : 0)){
if(permMgr->cldbid == cldbid && permMgr->sid == (server ? server->getServerId() : 0)) {
auto ptr = permMgr->manager.lock();
if(!ptr){
this->cachedPermissionManagers.erase(std::find(this->cachedPermissionManagers.begin(), this->cachedPermissionManagers.end(), permMgr));
@ -323,7 +323,7 @@ std::shared_ptr<v2::PermissionManager> DatabaseHelper::loadClientPermissionManag
#endif
logTrace(server_id, "[Permission] Loading client permission manager for client {}", cldbid);
auto pMgr = std::make_shared<v2::PermissionManager>();
auto permission_manager = std::make_shared<v2::PermissionManager>();
bool loaded = false;
if(this->use_startup_cache && server) {
shared_ptr<StartupCacheEntry> entry;
@ -342,20 +342,21 @@ std::shared_ptr<v2::PermissionManager> DatabaseHelper::loadClientPermissionManag
auto channel = perm->channelId > 0 ? server->getChannelTree()->findChannel(perm->channelId) : nullptr;
if(channel)
pMgr->load_permission(perm->permission->type, {perm->value, perm->grant}, channel->channelId(), perm->flag_skip, perm->flag_negate, perm->value != permNotGranted, perm->grant != permNotGranted);
permission_manager->load_permission(perm->permission->type, {perm->value, perm->grant}, channel->channelId(), perm->flag_skip, perm->flag_negate, perm->value != permNotGranted, perm->grant != permNotGranted);
else
pMgr->load_permission(perm->permission->type, {perm->value, perm->grant}, perm->flag_skip, perm->flag_negate, perm->value != permNotGranted, perm->grant != permNotGranted);
permission_manager->load_permission(perm->permission->type, {perm->value, perm->grant}, perm->flag_skip, perm->flag_negate, perm->value != permNotGranted, perm->grant != permNotGranted);
}
}
loaded = true;
}
}
if(!loaded) {
auto command = sql::command(this->sql, "SELECT `permId`, `value`, `channelId`, `grant`, `flag_skip`, `flag_negate` FROM `permissions` WHERE `serverId` = :serverId AND `type` = :type AND `id` = :id",
variable{":serverId", server ? server->getServerId() : 0},
variable{":type", permission::SQL_PERM_USER},
variable{":id", cldbid});
LOG_SQL_CMD(load_permissions_v2(server, pMgr.get(), command, true));
LOG_SQL_CMD(load_permissions_v2(server, permission_manager.get(), command, true));
}
@ -363,14 +364,14 @@ std::shared_ptr<v2::PermissionManager> DatabaseHelper::loadClientPermissionManag
this->permManagerLock.lock();
auto entry = new CachedPermissionManager();
entry->sid = server_id;
entry->manager = pMgr;
entry->ownLock = pMgr;
entry->manager = permission_manager;
entry->ownLock = permission_manager;
entry->cldbid = cldbid;
entry->lastAccess = system_clock::now();
this->cachedPermissionManagers.push_back(entry);
this->permManagerLock.unlock();
#endif
return pMgr;
return permission_manager;
}

View File

@ -566,8 +566,9 @@ void GroupManager::enableCache(const ClientDbId& client_database_id) {
//FIXME: This method till get far more often then it should be. We should add a flag if the group cache is loaded for each std::shared_ptr<ConnectedClient> instance
void GroupManager::disableCache(const ClientDbId& client_database_id) {
if(this->root)
this->root->disableCache(client_database_id);
if(this->root) {
this->root->disableCache(client_database_id);
}
lock_guard cache_lock(this->cacheLock);
this->cachedClients.erase(std::remove_if(this->cachedClients.begin(), this->cachedClients.end(), [&](const std::shared_ptr<CachedClient>& client) {
@ -720,8 +721,7 @@ std::vector<std::shared_ptr<GroupAssignment>> GroupManager::getAssignedServerGro
return result;
}
debugMessage("DB query groups! for -> " + to_string(cldbid) + " - server " + to_string(this->getServerId()));
debugMessage(this->getServerId(), "Query client groups for client {} on server {}.", cldbid, this->getServerId());
res = sql::command(this->sql, "SELECT `groupId`, `until` FROM `assignedGroups` WHERE `serverId` = :sid AND `cldbid` = :cldbid AND `channelId` = 0", variable{":sid", this->getServerId()}, variable{":cldbid", cldbid}).query([&](int length, char** value, char** column){
shared_ptr<Group> group = nullptr;
time_point<system_clock> until;

View File

@ -1,9 +1,6 @@
//
// Created by wolverindev on 01.04.18.
//
#include <log/LogUtils.h>
#include <StringVariable.h>
#include <ThreadPool/ThreadHelper.h>
#include "ShutdownHelper.h"
#include "InstanceHandler.h"
@ -13,7 +10,6 @@ using namespace ts;
using namespace ts::server;
extern bool mainThreadActive;
extern ts::server::InstanceHandler* serverInstance;
bool shuttingDown = false;
void ts::server::shutdownInstance(const std::string& message) {
@ -45,7 +41,9 @@ std::shared_ptr<server::ShutdownData> currentShutdown = nullptr;
std::shared_ptr<server::ShutdownData> server::scheduledShutdown() { return currentShutdown; }
inline void broadcastMessage(const std::string& message) {
if(!serverInstance || !serverInstance->getVoiceServerManager());
if(!serverInstance || !serverInstance->getVoiceServerManager())
return;
for(const auto &server : serverInstance->getVoiceServerManager()->serverInstances()) {
if(server->running()) {
server->broadcastMessage(server->getServerRoot(), message);
@ -62,8 +60,10 @@ bool server::scheduleShutdown(const std::chrono::system_clock::time_point& time,
data->time_point = time;
data->reason = reason;
data->shutdownThread = new threads::Thread(THREAD_EXECUTE_LATER | THREAD_SAVE_OPERATIONS, [data](){ executeScheduledShutdown(data); });
data->shutdownThread->name("Shutdown Executor").execute();
data->shutdown_thread = std::thread{[data]{
executeScheduledShutdown(data);
}};
threads::name(data->shutdown_thread, "Shutdown executor");
currentShutdown = data;
return true;
}
@ -77,12 +77,10 @@ void server::cancelShutdown(bool notify) {
auto current = server::scheduledShutdown();
current->active = false;
current->shutdownNotify.notify_all();
if(current->shutdownThread->join(seconds(3)) != 0) {
if(!threads::save_join(current->shutdown_thread)) {
logCritical("Could not terminal shutdown thread!");
current->shutdown_thread.detach();
}
delete current->shutdownThread;
current->shutdownThread = nullptr;
currentShutdown = nullptr;
}

View File

@ -12,7 +12,7 @@ namespace ts {
std::chrono::system_clock::time_point time_point;
bool active;
threads::Thread* shutdownThread = nullptr;
std::thread shutdown_thread{};
std::mutex shutdownMutex;
std::condition_variable shutdownNotify;
};

View File

@ -5234,10 +5234,12 @@ CommandResult ConnectedClient::handleCommandClientAddPerm(Command &cmd) {
CMD_RESET_IDLE;
CMD_CHK_AND_INC_FLOOD_POINTS(5);
if(!serverInstance->databaseHelper()->validClientDatabaseId(this->server, cmd["cldbid"])) return {findError("client_invalid_id"), "invalid client id"};
auto mgr = serverInstance->databaseHelper()->loadClientPermissionManager(this->server, cmd["cldbid"]);
auto cldbid = cmd["cldbid"].as<ClientDbId>();
if(!serverInstance->databaseHelper()->validClientDatabaseId(this->server, cldbid))
return {findError("client_invalid_id"), "invalid client id"};
auto mgr = serverInstance->databaseHelper()->loadClientPermissionManager(this->server, cldbid);
PERM_CHECKR(permission::i_client_permission_modify_power, this->server->calculatePermission(permission::PERMTEST_ORDERED, cmd["cldbid"], permission::i_client_needed_permission_modify_power, ClientType::CLIENT_TEAMSPEAK, nullptr), true);
PERM_CHECKR(permission::i_client_permission_modify_power, this->server->calculatePermission(permission::PERMTEST_ORDERED, cldbid, permission::i_client_needed_permission_modify_power, ClientType::CLIENT_TEAMSPEAK, nullptr), true);
auto maxValue = this->getPermissionGrantValue(permission::PERMTEST_ORDERED, permission::i_permission_modify_power, this->currentChannel);
bool ignoreGrant = this->permissionGranted(permission::PERMTEST_ORDERED, permission::b_permission_modify_power_ignore, 1, this->currentChannel);
@ -5260,7 +5262,8 @@ CommandResult ConnectedClient::handleCommandClientAddPerm(Command &cmd) {
update_channels |= permission_is_client_property(permType);
}
}
auto onlineClients = this->server->findClientsByCldbId(cmd["cldbid"]);
serverInstance->databaseHelper()->saveClientPermissions(this->server, cldbid, mgr);
auto onlineClients = this->server->findClientsByCldbId(cldbid);
if (!onlineClients.empty())
for (const auto &elm : onlineClients) {
if(elm->update_cached_permissions()) /* update cached calculated permissions */
@ -5278,9 +5281,11 @@ CommandResult ConnectedClient::handleCommandClientDelPerm(Command &cmd) {
CMD_RESET_IDLE;
CMD_CHK_AND_INC_FLOOD_POINTS(5);
if(!serverInstance->databaseHelper()->validClientDatabaseId(this->server, cmd["cldbid"])) return {findError("client_invalid_id"), "invalid client id"};
auto mgr = serverInstance->databaseHelper()->loadClientPermissionManager(this->server, cmd["cldbid"]);
PERM_CHECKR(permission::i_client_permission_modify_power, this->server->calculatePermission(permission::PERMTEST_ORDERED, cmd["cldbid"], permission::i_client_needed_permission_modify_power, ClientType::CLIENT_TEAMSPEAK, nullptr), true);
auto cldbid = cmd["cldbid"].as<ClientDbId>();
if(!serverInstance->databaseHelper()->validClientDatabaseId(this->server, cldbid))
return {findError("client_invalid_id"), "invalid client id"};
auto mgr = serverInstance->databaseHelper()->loadClientPermissionManager(this->server, cldbid);
PERM_CHECKR(permission::i_client_permission_modify_power, this->server->calculatePermission(permission::PERMTEST_ORDERED, cldbid, permission::i_client_needed_permission_modify_power, ClientType::CLIENT_TEAMSPEAK, nullptr), true);
bool ignoreGrant = this->permissionGranted(permission::PERMTEST_ORDERED, permission::b_permission_modify_power_ignore, 1, this->currentChannel);
bool conOnError = cmd[0].has("continueonerror");
@ -5300,6 +5305,8 @@ CommandResult ConnectedClient::handleCommandClientDelPerm(Command &cmd) {
update_channel |= permission_is_client_property(permType);
}
}
serverInstance->databaseHelper()->saveClientPermissions(this->server, cldbid, mgr);
if (!onlineClients.empty())
for (const auto &elm : onlineClients) {
if(elm->update_cached_permissions()) /* update cached calculated permissions */
@ -5384,9 +5391,12 @@ CommandResult ConnectedClient::handleCommandChannelClientDelPerm(Command &cmd) {
CMD_REF_SERVER(server_ref);
CMD_RESET_IDLE;
CMD_CHK_AND_INC_FLOOD_POINTS(5);
if (!serverInstance->databaseHelper()->validClientDatabaseId(this->server, cmd["cldbid"])) return {findError("parameter_invalid"), "Invalid manager db id"};
auto mgr = serverInstance->databaseHelper()->loadClientPermissionManager(this->server, cmd["cldbid"]);
auto cldbid = cmd["cldbid"].as<ClientDbId>();
if (!serverInstance->databaseHelper()->validClientDatabaseId(this->server, cldbid))
return {findError("parameter_invalid"), "Invalid manager db id"};
auto mgr = serverInstance->databaseHelper()->loadClientPermissionManager(this->server, cldbid);
PERM_CHECKR(permission::i_client_permission_modify_power, this->server->calculatePermission(permission::PERMTEST_ORDERED, cmd["cldbid"], permission::i_client_needed_permission_modify_power, ClientType::CLIENT_TEAMSPEAK, nullptr), true);
RESOLVE_CHANNEL_R(cmd["cid"], true);
@ -5395,7 +5405,7 @@ CommandResult ConnectedClient::handleCommandChannelClientDelPerm(Command &cmd) {
bool ignoreGrant = this->permissionGranted(permission::PERMTEST_ORDERED, permission::b_permission_modify_power_ignore, 1, this->currentChannel);
bool conOnError = cmd[0].has("continueonerror"), update_view = false;
auto cll = this->server->findClientsByCldbId(cmd["cldbid"]);
auto cll = this->server->findClientsByCldbId(cldbid);
for (int index = 0; index < cmd.bulkCount(); index++) {
PARSE_PERMISSION(cmd);
@ -5410,6 +5420,7 @@ CommandResult ConnectedClient::handleCommandChannelClientDelPerm(Command &cmd) {
}
}
serverInstance->databaseHelper()->saveClientPermissions(this->server, cldbid, mgr);
if (!cll.empty()) {
for (const auto &elm : cll) {
if(elm->update_cached_permissions()) /* update cached calculated permissions */
@ -5444,20 +5455,23 @@ CommandResult ConnectedClient::handleCommandChannelClientAddPerm(Command &cmd) {
CMD_REF_SERVER(server_ref);
CMD_RESET_IDLE;
CMD_CHK_AND_INC_FLOOD_POINTS(5);
if (!serverInstance->databaseHelper()->validClientDatabaseId(this->server, cmd["cldbid"])) return {findError("parameter_invalid"), "Invalid manager db id"};
auto cldbid = cmd["cldbid"].as<ClientDbId>();
if (!serverInstance->databaseHelper()->validClientDatabaseId(this->server, cldbid))
return {findError("parameter_invalid"), "Invalid client db id"};
RESOLVE_CHANNEL_R(cmd["cid"], true);
auto channel = dynamic_pointer_cast<ServerChannel>(l_channel->entry);
if(!channel) return {ErrorType::VSError};
auto mgr = serverInstance->databaseHelper()->loadClientPermissionManager(this->server, cmd["cldbid"]);
auto mgr = serverInstance->databaseHelper()->loadClientPermissionManager(this->server, cldbid);
PERM_CHECK_CHANNELR(permission::i_client_permission_modify_power, this->server->calculatePermission(permission::PERMTEST_ORDERED, cmd["cldbid"], permission::i_client_needed_permission_modify_power, ClientType::CLIENT_TEAMSPEAK, channel), channel, true);
auto maxValue = this->getPermissionGrantValue(permission::PERMTEST_ORDERED, permission::i_permission_modify_power, this->currentChannel);
bool ignoreGrant = this->permissionGranted(permission::PERMTEST_ORDERED, permission::b_permission_modify_power_ignore, 1, this->currentChannel);
bool conOnError = cmd[0].has("continueonerror");
auto onlineClientInstances = this->server->findClientsByCldbId(cmd["cldbid"]);
auto onlineClientInstances = this->server->findClientsByCldbId(cldbid);
bool update_view = false;
for (int index = 0; index < cmd.bulkCount(); index++) {
PARSE_PERMISSION(cmd);
@ -5477,6 +5491,8 @@ CommandResult ConnectedClient::handleCommandChannelClientAddPerm(Command &cmd) {
update_view = permType == permission::b_channel_ignore_view_power || permType == permission::i_channel_view_power;
}
}
serverInstance->databaseHelper()->saveClientPermissions(this->server, cldbid, mgr);
if (!onlineClientInstances.empty())
for (const auto &elm : onlineClientInstances) {
if (elm->update_cached_permissions()) /* update cached calculated permissions */

View File

@ -1,4 +1,5 @@
#include <chrono>
#include <thread>
#include "Song.h"
@ -22,7 +23,7 @@ shared_ptr<PlayableSong::song_future_t> PlayableSong::get_loader(const std::shar
if(function && ref) {
weak_ptr weak_server = server;
/* async loading */
thread([weak_server, ref, future, function]{
std::thread([weak_server, ref, future, function]{
auto server = weak_server.lock();
if(!server) {
future->executionFailed("broken server reference");

View File

@ -460,7 +460,12 @@ void WebClient::handleMessage(const std::string &message) {
};
this->voice_bridge->callback_failed = [&] {
this->voice_bridge.reset();
auto vb_ptr = &*this->voice_bridge; /* read only no lock needed */
std::thread([&, vb_ptr, lock = this->ref()]{
unique_lock vbl{this->voice_bridge_lock};
if(&*this->voice_bridge == vb_ptr)
this->voice_bridge.release();
}).detach();
Json::Value response;
response["type"] = "WebRTC";

View File

@ -17,7 +17,7 @@ std::shared_ptr<void> RangedIPProviderBase::_resolveInfo(IpAddress_t address, bo
int16_t index = this->index(beAddr);
while(index >= 0) {
const auto &list = mapping[index];
auto &list = mapping[index];
RangeEntryMapping<void>* closest = nullptr;
for(auto& entry : list) {
if(entry.startAddress > beAddr)

View File

@ -18,59 +18,68 @@ VoiceIOManager::VoiceIOManager(){ }
VoiceIOManager::~VoiceIOManager() { }
std::shared_ptr<IOServerHandler> VoiceIOManager::enableIo(server::TSServer *server) {
shared_ptr<IOServerHandler> io = std::make_shared<IOServerHandler>(server);
auto server_io = std::make_shared<IOServerHandler>(server);
this->adjustExecutors(this->servers.size() + 1);
std::deque<shared_ptr<IOEventLoop>> blacklist;
for(int i = 0; i < config::threads::voice::events_per_server; i++){
lock_guard<mutex> l(this->executorLock);
auto loop = this->lowestIoLoop(blacklist);
std::vector<shared_ptr<IOEventLoop>> use_list;
use_list.reserve(config::threads::voice::events_per_server);
lock_guard<mutex> executor_lock(this->executorLock);
for(size_t i = 0; i < config::threads::voice::events_per_server; i++){
auto loop = this->less_used_io_loop(use_list);
if(!loop) break; //No more loops open
io->create_event_loop_events(loop)->activate();
blacklist.push_back(loop);
server_io->create_event_loop_events(loop)->activate();
use_list.push_back(loop);
}
{
threads::MutexLock l(this->serverLock);
this->servers.push_back(io);
this->servers.push_back(server_io);
}
this->ioExecutorNotify.notify_all();
return io;
return server_io;
}
void VoiceIOManager::disableIo(server::TSServer* server) {
std::shared_ptr<IOServerHandler> io;
std::shared_ptr<IOServerHandler> server_io;
{
threads::MutexLock l(this->serverLock);
for(const auto& elm : this->servers)
if(elm->server == server) {
io = elm;
break;
}
if(!io) return;
this->servers.erase(std::find(this->servers.begin(), this->servers.end(), io));
for(const auto& sio : this->servers) {
if(sio->server == server) {
server_io = sio;
break;
}
}
if(!server_io) return;
this->servers.erase(std::find(this->servers.begin(), this->servers.end(), server_io),this->servers.end());
}
for(const auto& entry : io->event_loop_events) {
for(const auto& entry : server_io->event_loop_events) {
entry->disable();
entry->despawn();
}
io->event_loop_events.clear();
server_io->event_loop_events.clear();
this->adjustExecutors(this->servers.size());
}
void VoiceIOManager::shutdownGlobally() {
{
threads::MutexLock l(this->serverLock);
for(const auto& elm : this->servers)
for(const auto& e : elm->event_loop_events){
e->disable();
e->despawn();
/* Unregister all servers */
{
lock_guard server_lock(this->serverLock);
for(const auto& server : this->servers)
for(const auto& loop : server->event_loop_events){
loop->disable();
loop->despawn();
}
}
/* shutting down event loops */
{
lock_guard<mutex> l(this->executorLock);
lock_guard<mutex> executor_lock(this->executorLock);
for(const auto& loop : this->event_loops) {
loop->shutdown = true;
event_base_loopexit(loop->base, nullptr);
@ -78,18 +87,27 @@ void VoiceIOManager::shutdownGlobally() {
this->ioExecutorNotify.notify_all();
}
auto beg = system_clock::now();
/* keep a ref to all event loops so they dont despawn in their event thread */
unique_lock executor_lock{this->executorLock};
auto event_loops = this->event_loops;
auto wait_end = system_clock::now() + chrono::seconds{5};
while(true) {
{
lock_guard<mutex> l(this->executorLock);
if(this->event_loops.empty()) break;
}
if(system_clock::now() - beg > seconds(2)) {
logCritical("Could not shutdown io loop!");
if(this->event_loops.empty())
break;
auto status = this->ioExecutorNotify.wait_until(executor_lock, wait_end);
if(status == std::cv_status::timeout) {
logCritical(LOG_GENERAL,
"Failed to shutdown all event loops successfully. After timeout {} loops are left.",
this->event_loops.size()
);
break;
}
threads::self::sleep_for(milliseconds(10));
}
/* now delete all loops */
event_loops.clear();
}
//TODO also reduce thread pool!
@ -107,15 +125,7 @@ void VoiceIOManager::adjustExecutors(size_t size) {
}
IOEventLoop::~IOEventLoop() {
if(this->executor.joinable()) {
auto handle = this->executor.native_handle();
timespec timeout{};
clock_gettime(CLOCK_REALTIME, &timeout);
timeout.tv_sec += 5;
auto join_result = pthread_timedjoin_np(handle, nullptr, &timeout);
if(join_result == EBUSY)
logError(LOG_INSTANCE, "Failed to shutdown IO event loop. Error: {}", join_result);
}
assert(this_thread::get_id() != this->executor.get_id());
assert(!this->executor.joinable());
}
@ -129,8 +139,10 @@ std::shared_ptr<IOEventLoop> VoiceIOManager::spawnEventLoop() {
loop->bound_thread = -1;
{
#ifndef WIN32
const auto name = "IO exec #" + to_string(this->event_loops.size());
pthread_setname_np(loop->executor.native_handle(), name.c_str());
#endif
const auto num_threads = std::thread::hardware_concurrency();
if(num_threads != 0 && config::threads::voice::bind_io_thread_to_kernel_thread) {
@ -169,7 +181,7 @@ std::shared_ptr<IOEventLoop> VoiceIOManager::spawnEventLoop() {
return loop;
}
std::shared_ptr<IOEventLoop> VoiceIOManager::lowestIoLoop(std::deque<std::shared_ptr<IOEventLoop>> blacklist) {
std::shared_ptr<IOEventLoop> VoiceIOManager::less_used_io_loop(vector<shared_ptr<IOEventLoop>> &blacklist) {
std::shared_ptr<IOEventLoop> current;
for(const auto& loop : this->event_loops)
if(!current || loop->assigned_events.size() < current->assigned_events.size()) {
@ -278,7 +290,6 @@ void IOEventLoopEvents::despawn() {
assert(std::find(event_loop_events.begin(), event_loop_events.end(), event) != event_loop_events.end());
event_loop_events.erase(std::find(event_loop_events.begin(), event_loop_events.end(), event));
}
}
@ -286,33 +297,37 @@ void IOEventLoopEvents::despawn() {
}
void VoiceIOManager::dispatchBase(shared_ptr<IOEventLoop> self) {
debugMessage(lstream << "Dispatching io base " << self->base);
while(!event_base_got_exit(self->base)) {
debugMessage(LOG_INSTANCE, "Dispatching io base {}", (void*) self->base);
while(true) {
if(event_base_got_exit(self->base) || self->shutdown)
break;
event_base_loop(self->base, 0);
{
unique_lock<mutex> l(this->executorLock);
this->ioExecutorNotify.wait(l, [&](){
if(event_base_got_exit(self->base) || self->shutdown) return true;
return !this->servers.empty();
});
/* wait until reschedule */
unique_lock<mutex> execute_lock(this->executorLock);
this->ioExecutorNotify.wait(execute_lock);
}
if(event_base_get_num_events(self->base, EVENT_BASE_COUNT_ACTIVE) == 0) threads::self::sleep_for(seconds(1));
}
debugMessage(lstream << "Dispatching io base " << self->base << " finished!");
debugMessage(LOG_INSTANCE, "Dispatching io base {} finished", (void*) self->base);
{
unique_lock<mutex> l(this->executorLock);
lock_guard executor_lock(this->executorLock);
auto found = std::find(this->event_loops.begin(), this->event_loops.end(), self);
if(found != this->event_loops.end())
this->event_loops.erase(found);
else
logCritical("Could not find executor in executor registry!");
if(found != this->event_loops.end()) {
this->event_loops.erase(found);
} else {
logCritical(LOG_INSTANCE, "Could not find executor in executor registry ({})!", (void*) self->base);
}
if(!self->assigned_events.empty()) {
logError("Event loop exited, but sill containing some events!");
logError(LOG_INSTANCE, "Event loop exited, but sill containing some events ({})!", self->assigned_events.size());
}
event_base_free(self->base);
self->base = nullptr;
this->ioExecutorNotify.notify_all(); /* let everybody know we're done */
}
}

View File

@ -211,12 +211,13 @@ namespace ts {
void shutdownGlobally();
private:
std::shared_ptr<IOEventLoop> lowestIoLoop(std::deque<std::shared_ptr<IOEventLoop>>);
std::shared_ptr<IOEventLoop> less_used_io_loop(std::vector<std::shared_ptr<IOEventLoop>>&);
threads::Mutex serverLock;
std::deque<std::shared_ptr<IOServerHandler>> servers;
std::mutex executorLock;
/* will be called as soon servers have been added or an event loop has been finished */
std::condition_variable ioExecutorNotify;
std::deque<std::shared_ptr<IOEventLoop>> event_loops;

View File

@ -474,7 +474,7 @@ namespace terminal {
for(auto& fd : fd_leaks)
close(fd);
fd_leaks.clear();
return;
return true;
} else {
value = cmd.arguments[0].as<size_t>();
}