Using tasks instead of inlining certain client updates

This commit is contained in:
WolverinDEV 2021-02-21 21:56:51 +01:00
parent 84a08c469b
commit 6e99fc1ab4
4 changed files with 649 additions and 5 deletions

View File

@ -104,6 +104,7 @@ set(SOURCE_FILES
src/misc/digest.cpp
src/misc/base64.cpp
src/misc/net.cpp
src/misc/task_executor.cpp
src/lock/rw_mutex.cpp
@ -155,6 +156,7 @@ set(HEADER_FILES
src/misc/hex.h
src/misc/advanced_mutex.h
src/misc/strobf.h
src/misc/task_executor.h
src/protocol/buffers.h
src/protocol/Packet.h

View File

@ -823,6 +823,13 @@ namespace ts {
template <typename T>
PropertyWrapper& operator=(const T& value) {
this->update_value(value);
return *this;
}
template <typename T>
bool update_value(const T& value) {
static_assert(ts::converter<T>::supported, "type isn't supported for type");
{
@ -830,17 +837,18 @@ namespace ts {
auto value_string = ts::converter<T>::to_string(any_value);
std::lock_guard lock(this->data_ptr->value_lock);
if(value_string == this->data_ptr->value)
return *this;
if(value_string == this->data_ptr->value) {
return false;
}
this->data_ptr->casted_value = any_value;
this->data_ptr->value = value_string;
}
this->trigger_update();
return *this;
return true;
}
PropertyWrapper& operator=(const std::string& value) {
this->value(value);
return *this;

353
src/misc/task_executor.cpp Normal file
View File

@ -0,0 +1,353 @@
//
// Created by WolverinDEV on 21/02/2021.
//
#include "./task_executor.h"
#include <ThreadPool/ThreadHelper.h>
#include <cassert>
#include <iostream>
using std::chrono::system_clock;
using namespace ts;
task_executor::task_executor(size_t num_threads, const std::string &thread_prefix) {
this->task_context = std::make_shared<struct task_context>();
this->executors.reserve(num_threads);
for(size_t index{0}; index < num_threads; index++) {
auto handle = std::make_shared<executor_context>();
handle->handle = this;
handle->task_context = this->task_context;
handle->thread_handle = std::thread(task_executor::executor, handle);
if(!thread_prefix.empty()) {
threads::name(handle->thread_handle, thread_prefix + std::to_string(index + 1));
}
this->executors.push_back(std::move(handle));
}
}
task_executor::~task_executor() {
for(auto& thread : this->executors) {
if(thread->thread_handle.joinable()) {
/* TODO: Print an error */
thread->thread_handle.detach();
}
}
/* TODO: delete tasks which are still pending */
}
void task_executor::set_exception_handler(task_exception_handler handler) {
std::lock_guard task_lock{this->task_context->mutex};
this->task_context->exception_handler = std::move(handler);
}
void task_executor::abort_exception_handler(const std::string &task, const std::exception_ptr &exception) {
std::string message{};
try {
std::rethrow_exception(exception);
} catch (const std::exception& ex) {
message = "std::exception::what() -> " + std::string{ex.what()};
} catch(...) {
message = "unknown exception";
}
std::cerr << "task_executor encountered an exception while executing task " << task << ": " << message << std::endl;
abort();
}
bool task_executor::shutdown(const std::chrono::system_clock::time_point &timeout) {
{
std::lock_guard task_lock{this->task_context->mutex};
this->task_context->shutdown = true;
this->task_context->notify.notify_all();
}
for(auto& thread : this->executors) {
if(timeout.time_since_epoch().count() > 0) {
auto now = system_clock::now();
if(now > timeout) {
/* failed to join all executors */
return false;
}
if(!threads::timed_join(thread->thread_handle, timeout - now)) {
/* thread failed to join */
return false;
}
} else {
threads::save_join(thread->thread_handle, false);
}
}
return true;
}
bool task_executor::schedule(task_id &task_id, std::string task_name, std::function<void()> callback) {
auto& task_context_ = this->task_context;
auto task = std::make_unique<task_executor::task>();
task->name = std::move(task_name);
task->callback = std::move(callback);
std::lock_guard task_lock{task_context_->mutex};
if(task_context_->shutdown) {
return false;
}
task->id = task_context_->id_index++;
if(!task->id) {
/* the task ids wrapped around I guess */
task->id = task_context_->id_index++;
}
task_id = task->id;
auto task_ptr = task.release();
task_context_->task_count++;
*task_context_->task_tail = task_ptr;
task_context_->task_tail = &task_ptr->next;
task_context_->notify.notify_one();
return true;
}
bool task_executor::schedule_repeating(task_id &task_id, std::string task_name, std::chrono::nanoseconds interval,
std::function<void(const std::chrono::system_clock::time_point &)> callback) {
auto& task_context_ = this->task_context;
auto task = std::make_unique<task_executor::task_recurring>();
task->name = std::move(task_name);
task->callback = std::move(callback);
task->interval = std::move(interval);
task->scheduled_invoke = std::chrono::system_clock::now();
std::lock_guard task_lock{task_context_->mutex};
if(task_context_->shutdown) {
return false;
}
task->id = task_context_->id_index++;
if(!task->id) {
/* the task ids wrapped around I guess */
task->id = task_context_->id_index++;
}
task_id = task->id;
task_context_->task_recurring_count++;
this->enqueue_recurring_task(task.release());
task_context_->notify.notify_one();
return true;
}
void task_executor::enqueue_recurring_task(task_recurring *task) {
auto& task_context_ = this->task_context;
if(!task_context_->task_recurring_head) {
/* No tasks pending. We could easily enqueue the task. */
task->next = nullptr;
task_context_->task_recurring_head = task;
} else {
/* Find the correct insert spot */
task_recurring* previous_task{nullptr};
task_recurring* next_task{task_context_->task_recurring_head};
while(true) {
if(next_task->scheduled_invoke > task->scheduled_invoke) {
break;
}
previous_task = next_task;
next_task = next_task->next;
if(!next_task) {
/* We reached the queue end. */
break;
}
}
task->next = next_task;
if(!previous_task) {
/* we're inserting the task as head */
assert(next_task == task_context_->task_recurring_head);
task_context_->task_recurring_head = task;
} else {
previous_task->next = task;
task->next = next_task;
}
}
}
bool task_executor::cancel_task(task_id task_id) {
auto& task_context_ = this->task_context;
std::unique_lock task_lock{task_context->mutex};
/* 1. Search for a pending normal task */
{
task* previous_task{nullptr};
task* current_task{task_context_->task_head};
while(current_task) {
if(current_task->id == task_id) {
/* We found our task. Just remove and delete it. */
if(previous_task) {
previous_task->next = current_task->next;
} else {
assert(task_context_->task_head == current_task);
if(current_task->next) {
assert(task_context_->task_tail != &current_task->next);
task_context_->task_head = current_task->next;
} else {
assert(task_context_->task_tail == &current_task->next);
task_context_->task_head = nullptr;
task_context_->task_tail = &task_context_->task_head;
}
}
assert(task_context_->task_count > 0);
task_context_->task_count--;
task_lock.unlock();
delete current_task;
return true;
}
previous_task = current_task;
current_task = current_task->next;
}
}
/* 2. Search for a pending recurring task */
{
task_recurring* previous_task{nullptr};
task_recurring* current_task{task_context_->task_recurring_head};
while(current_task) {
if(current_task->id == task_id) {
/* We found our task. Just remove and delete it. */
if(previous_task) {
previous_task->next = current_task->next;
} else {
assert(task_context_->task_recurring_head == current_task);
task_context_->task_recurring_head = nullptr;
}
assert(task_context_->task_recurring_count > 0);
task_context_->task_recurring_count--;
task_lock.unlock();
delete current_task;
return true;
}
previous_task = current_task;
current_task = current_task->next;
}
}
/* 3. Our task does not seem to pend anywhere. May it already gets executed. */
for(auto& executor : this->executors) {
if(executor->executing_task && executor->executing_task->id == task_id) {
/*
* It gets executed right now.
* The task itself will be deleted by the executor.
* Note: No need to decrease the task count here since it has already been
* decreased when receiving the task by the executor.
*/
return true;
}
if(executor->executing_recurring_task && executor->executing_recurring_task->id == task_id) {
/*
* It gets executed right now.
* Setting shutdown flag to prevent rescheduling.
* The task will be deleted by the executor itself.
*/
auto task_handle = executor->executing_recurring_task;
task_handle->shutdown = true;
assert(task_context_->task_recurring_count > 0);
task_context_->task_recurring_count--;
return true;
}
}
return false;
}
void task_executor::executor(std::shared_ptr<executor_context> executor_context) {
auto& task_context = executor_context->task_context;
std::unique_lock task_lock{task_context->mutex};
while(true) {
assert(task_lock.owns_lock());
if(task_context->shutdown) {
break;
}
if(task_context->task_head) {
auto task = task_context->task_head;
if(task->next) {
assert(task_context->task_tail != &task->next);
task_context->task_head = task->next;
} else {
assert(task_context->task_tail == &task->next);
task_context->task_head = nullptr;
task_context->task_tail = &task_context->task_head;
}
assert(task_context->task_count > 0);
task_context->task_count--;
executor_context->executing_task = task;
task_lock.unlock();
try {
task->callback();
} catch (...) {
auto exception = std::current_exception();
task_lock.lock();
auto handler = task_context->exception_handler;
task_lock.unlock();
handler(task->name, exception);
}
task_lock.lock();
executor_context->executing_task = nullptr;
delete task;
continue;
}
auto execute_timestamp = system_clock::now();
if(task_context->task_recurring_head && task_context->task_recurring_head->scheduled_invoke <= execute_timestamp) {
auto task = task_context->task_recurring_head;
task_context->task_recurring_head = task->next;
executor_context->executing_recurring_task = task;
task_lock.unlock();
try {
task->callback(task->last_invoked);
} catch (...) {
auto exception = std::current_exception();
task_lock.lock();
auto handler = task_context->exception_handler;
task_lock.unlock();
handler(task->name, exception);
}
task->last_invoked = execute_timestamp;
task->scheduled_invoke = std::min(system_clock::now(), execute_timestamp + task->interval);
task_lock.lock();
executor_context->executing_recurring_task = nullptr;
if(task->shutdown) {
delete task;
} else {
executor_context->handle->enqueue_recurring_task(task);
}
continue;
}
task_context->notify.wait(task_lock);
}
}

281
src/misc/task_executor.h Normal file
View File

@ -0,0 +1,281 @@
#pragma once
#include <thread>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <atomic>
#include <cassert>
namespace ts {
typedef uint32_t task_id;
typedef std::function<void(const std::string& /* task name */, const std::exception_ptr& /* exception */)> task_exception_handler;
/**
* A basic task executor & scheduler for one time and repeating tasks
* Note: All methods are thread save or it's specified otherwise
*/
class task_executor {
public:
task_executor(size_t /* num threads */, const std::string& /* thread prefix */);
~task_executor();
void set_exception_handler(task_exception_handler);
/**
* Note: This method is not thread save.
* @returns `true` if all actions have been successfully shut down
*/
bool shutdown(const std::chrono::system_clock::time_point & /* timeout */);
/**
* Cancel a task. If the task is currently executing it will not block.
* @returns `true` if the task has been found and `false` if the task isn't known.
*/
bool cancel_task(task_id /* task id */);
#if 0
/**
* Cancel a task with the possibility to wait until it has finished.
*/
void cancel_task_joinable(task_id /* task id */);
#endif
/**
* @returns `true` if the task has successfully be enqueued for scheduling.
*/
bool schedule(task_id& /* task handle */, std::string /* name */, std::function<void()> /* callback */);
/**
* @returns `true` if the task has successfully be enqueued for repeating scheduling.
*/
bool schedule_repeating(task_id& /* task handle */,
std::string /* name */,
std::chrono::nanoseconds /* interval */,
std::function<void(const std::chrono::system_clock::time_point& /* last scheduled */)> /* callback */);
private:
struct task {
task_id id{0};
std::string name{};
std::function<void()> callback{};
task* next{nullptr};
};
struct task_recurring {
task_id id{0};
std::string name{};
bool shutdown{false};
std::chrono::nanoseconds interval{};
std::chrono::system_clock::time_point last_invoked{};
std::chrono::system_clock::time_point scheduled_invoke{};
std::function<void(const std::chrono::system_clock::time_point& /* last executed */)> callback{};
task_recurring* next{nullptr};
};
struct task_context {
std::mutex mutex{};
std::condition_variable notify{};
bool shutdown{false};
task_id id_index{1};
size_t task_count{};
task* task_head{nullptr};
task** task_tail{&this->task_head};
size_t task_recurring_count{};
task_recurring* task_recurring_head{nullptr};
task_exception_handler exception_handler{task_executor::abort_exception_handler};
};
struct executor_context {
task_executor* handle;
std::thread thread_handle{};
std::shared_ptr<struct task_context> task_context{};
/**
* Must be accessed while holding the task_context.mutex and shall never be changed except for the executor.
* Lifetime will be granted while holding the lock.
*/
task* executing_task{nullptr};
task_recurring* executing_recurring_task{nullptr};
};
std::vector<std::shared_ptr<executor_context>> executors{};
std::shared_ptr<task_context> task_context;
/**
* Enqueue the task into the task queue.
* Attention:
* 1. The task context mutex must be hold by the caller
* 2. The task should not be enqueued already
*/
void enqueue_recurring_task(task_recurring* /* task */);
static void executor(std::shared_ptr<executor_context> /* context shared pointer */);
static void abort_exception_handler(const std::string& /* task name */, const std::exception_ptr& /* exception */);
};
/**
* Helper class for tasks which could be executed multiple times.
* It will avoid execution stacking while the task is executing.
* The task will never be executed twice only sequential.
*/
struct multi_shot_task {
public:
explicit multi_shot_task() {}
multi_shot_task(std::shared_ptr<task_executor> executor, std::string task_name, std::function<void()> callback)
: inner{std::make_shared<execute_inner>(std::move(executor), std::move(task_name), std::move(callback))} {
this->inner->callback_wrapper = [inner = this->inner]{
auto result = inner->schedule_kind.exchange(2);
assert(result == 1);
(void) result;
try {
(inner->callback)();
execute_finished(&*inner);
} catch (...) {
execute_finished(&*inner);
std::rethrow_exception(std::current_exception());
}
};
}
multi_shot_task(const multi_shot_task&) = default;
multi_shot_task(multi_shot_task&& other) = default;
inline multi_shot_task& operator=(const multi_shot_task& other) {
this->inner = other.inner;
return *this;
}
inline multi_shot_task& operator=(multi_shot_task&& other) {
this->inner = std::move(other.inner);
return *this;
}
/**
* @returns `true` if the task has successfully be enqueued or is already enqueued
* and `false` if the `schedule` call failed or we have no task.
*/
inline bool enqueue() {
auto& inner_ = this->inner;
if(!inner_) {
return false;
}
{
//CAS loop: https://preshing.com/20150402/you-can-do-any-kind-of-atomic-read-modify-write-operation/
uint8_t current_state = inner_->schedule_kind.load();
uint8_t new_state;
do {
switch(current_state) {
case 0:
/* no execute has been scheduled */
new_state = 1;
break;
case 1:
case 3:
/* an execute is already scheduled */
return true;
case 2:
/* we're already executing now but we need a new execute */
new_state = 3;
break;
default:
assert(false);
return false;
}
} while(!inner_->schedule_kind.compare_exchange_weak(current_state, new_state));
}
task_id task_id_;
auto result = inner_->executor->schedule(task_id_, inner_->task_name, inner->callback_wrapper);
if(!result) {
/*
* Task isn't scheduled any more. We failed to schedule it.
* Note: The task might got rescheduled again so may more than only one schedule attempt fail
* in total.
*/
inner_->schedule_kind = 0;
return false;
}
return true;
}
private:
struct execute_inner {
explicit execute_inner(std::shared_ptr<task_executor> executor, std::string name, std::function<void()> callback) noexcept
: task_name{std::move(name)}, executor{std::move(executor)}, callback{std::move(callback)} {}
std::string task_name;
std::shared_ptr<task_executor> executor;
std::function<void()> callback;
std::function<void()> callback_wrapper;
/**
* `0` not scheduled
* `1` scheduled
* `2` executing
* `3` executing with reschedule
*/
std::atomic<uint8_t> schedule_kind{0};
};
std::shared_ptr<execute_inner> inner{};
inline static void execute_finished(execute_inner* inner) {
auto current_state = inner->schedule_kind.load();
uint8_t new_state;
do {
switch(current_state) {
case 0:
case 1:
assert(false);
return;
case 2:
new_state = 0;
break;
case 3:
new_state = 1;
break;
default:
assert(false);
return;
};
} while(!inner->schedule_kind.compare_exchange_weak(current_state, new_state));
if(new_state == 1) {
/* a reschedule was requested */
task_id task_id_;
if(!inner->executor->schedule(task_id_, inner->task_name, inner->callback_wrapper)) {
/*
* Task isn't scheduled any more. We failed to schedule it.
* Note: The task might got rescheduled again so may more than only one schedule attempt fail
* in total.
*/
inner->schedule_kind = 0;
}
}
}
};
}