From bae6a56ed3b3e90e531ef717478f4868341f624c Mon Sep 17 00:00:00 2001 From: WolverinDEV Date: Fri, 16 Aug 2019 16:13:14 +0200 Subject: [PATCH] Changes --- server/src/client/ConnectedClient.h | 1 + .../client/ConnectedClientCommandHandler.cpp | 68 ++++++ server/src/manager/ConversationManager.cpp | 204 ++++++++++++++---- server/src/manager/ConversationManager.h | 28 +-- 4 files changed, 245 insertions(+), 56 deletions(-) diff --git a/server/src/client/ConnectedClient.h b/server/src/client/ConnectedClient.h index 25e80f9..1b0ebf6 100644 --- a/server/src/client/ConnectedClient.h +++ b/server/src/client/ConnectedClient.h @@ -588,6 +588,7 @@ namespace ts { CommandResult handleCommandConversationHistory(Command&); CommandResult handleCommandConversationFetch(Command&); + CommandResult handleCommandConversationMessageDelete(Command&); CommandResult handleCommandLogView(Command&); //CMD_TODO handleCommandLogAdd diff --git a/server/src/client/ConnectedClientCommandHandler.cpp b/server/src/client/ConnectedClientCommandHandler.cpp index 8858bf5..9ba7bc2 100644 --- a/server/src/client/ConnectedClientCommandHandler.cpp +++ b/server/src/client/ConnectedClientCommandHandler.cpp @@ -7550,6 +7550,74 @@ CommandResult ConnectedClient::handleCommandConversationFetch(ts::Command &cmd) } +CommandResult ConnectedClient::handleCommandConversationMessageDelete(ts::Command &cmd) { + CMD_REF_SERVER(ref_server); + CMD_CHK_AND_INC_FLOOD_POINTS(25); + + auto conversation_manager = ref_server->conversation_manager(); + std::shared_ptr current_conversation; + ChannelId current_conversation_id = 0; + + for(size_t index = 0; index < cmd.bulkCount(); index++) { + auto &bulk = cmd[index]; + + if(!bulk.has("cid") || !bulk["cid"].castable()) + continue; + + /* test if we have access to the conversation */ + if(current_conversation_id != bulk["cid"].as()) { + current_conversation_id = bulk["cid"].as(); + + /* test if we're able to see the channel */ + { + shared_lock channel_view_lock(this->channel_lock); + auto channel = this->channel_view()->find_channel(current_conversation_id); + if(!channel) + return findError("conversation_invalid_id"); + } + + /* test if there is a channel password or join power which denies that we see the conversation */ + { + shared_lock channel_view_lock(ref_server->channel_tree_lock); + auto channel = ref_server->getChannelTree()->findChannel(current_conversation_id); + if(!channel) + return findError("conversation_invalid_id"); + + if(!bulk.has("cpw")) + bulk["cpw"] = ""; + + if (!channel->passwordMatch(bulk["cpw"], true)) + if (!this->permissionGranted(permission::PERMTEST_ORDERED, permission::b_channel_join_ignore_password, 1, channel, true)) + return findError("channel_invalid_password"); + + if (!this->permissionGranted(permission::PERMTEST_ORDERED, permission::b_channel_conversation_message_delete, 1, channel)) + return CommandResultPermissionError{permission::b_channel_conversation_message_delete}; + + if(!this->permissionGranted(permission::PERMTEST_ORDERED, permission::b_channel_ignore_join_power, 1, channel, true)) { + auto permission_granted = this->calculate_permission_value(permission::i_channel_join_power, channel->channelId()); + if(!channel->permission_granted(permission::i_channel_needed_join_power, permission_granted, false)) + return CommandResultPermissionError{permission::i_channel_needed_join_power}; + } + } + } + + current_conversation = conversation_manager->get(current_conversation_id); + if(!current_conversation) continue; + + auto timestamp_begin = system_clock::time_point{} + milliseconds{bulk["timestamp_begin"]}; + auto timestamp_end = system_clock::time_point{} + milliseconds{bulk.has("timestamp_begin") ? bulk["timestamp_begin"].as() : 0}; + auto limit = bulk.has("limit") ? bulk["limit"].as() : 1; + if(limit > 100) + limit = 100; + auto delete_count = current_conversation->delete_messages(timestamp_begin, limit, timestamp_end, bulk["cldbid"]); + if(delete_count > 0) { + //TODO: Notify + } + } + + return CommandResult::Success; +} + diff --git a/server/src/manager/ConversationManager.cpp b/server/src/manager/ConversationManager.cpp index ce87453..c26e998 100644 --- a/server/src/manager/ConversationManager.cpp +++ b/server/src/manager/ConversationManager.cpp @@ -1,3 +1,5 @@ +#include + #include "./ConversationManager.h" #include "../InstanceHandler.h" #include "../TSServer.h" @@ -215,7 +217,7 @@ __attribute__((optimize("-O3"), always_inline)) void apply_crypt(void* source, v dest_ptr++; } } -Conversation::Conversation(const std::shared_ptr &handle, ts::ChannelId channel_id, const std::string& file) : _ref_handle(handle), _channel_id(channel_id), file_name(file) { } +Conversation::Conversation(const std::shared_ptr &handle, ts::ChannelId channel_id, std::string file) : _ref_handle(handle), _channel_id(channel_id), file_name(std::move(file)) { } Conversation::~Conversation() { this->finalize(); @@ -565,7 +567,7 @@ bool Conversation::load_message_block_index(const std::shared_ptr(); - index->successfully = false; + index->index_successful = false; { if(!this->load_message_block_header(block, error)) { error = "failed to load block header: " + error; @@ -596,7 +598,7 @@ bool Conversation::load_message_block_index(const std::shared_ptrmessage_index.emplace_back(fio::IndexedMessage{(uint32_t) (offset - block->block_offset), system_clock::time_point{} + milliseconds{header.message_timestamp}, std::shared_ptr{nullptr}}); + index->message_index.emplace_back(fio::IndexedBlockMessage{(uint32_t) (offset - block->block_offset), header, nullptr}); offset += header.total_length; } } @@ -627,11 +629,15 @@ bool Conversation::load_messages(const std::shared_ptr &block, error = "failed to open file handle"; return false; } - auto result = fseek(this->file_handle, block->block_offset + indexed_block->message_index[index].offset, SEEK_SET); + auto result = fseek(this->file_handle, block->block_offset + indexed_block->message_index[index].message_offset, SEEK_SET); if(result == EINVAL) { error = "failed to seek to begin of an indexed block read"; return false; } + + /* + * We dont need to lock the message_index_lock here because we never delete the messages and we just iterate with index + */ while(index < end_index && index < indexed_block->message_index.size()) { auto& message_data = indexed_block->message_index[index]; if(message_data.message_data) { @@ -639,19 +645,10 @@ bool Conversation::load_messages(const std::shared_ptr &block, continue; } + auto data = make_shared(); - if(this->fread(&data->header, sizeof(data->header), -1, false) != sizeof(data->header)) { - error = "failed to read message header at index " + to_string(index); - return false; - } - - if(data->header.cookie != fio::MessageHeader::HEADER_COOKIE) { - error = "failed to verify message header at " + to_string(index); - return false; - } - if(header->meta_encrypted) { - auto meta_size = data->header.sender_unique_id_length + data->header.sender_name_length; + auto meta_size = message_data.header.sender_unique_id_length + message_data.header.sender_name_length; auto meta_buffer = malloc(meta_size); if(this->fread(meta_buffer, meta_size, -1, false) != meta_size) { @@ -660,14 +657,14 @@ bool Conversation::load_messages(const std::shared_ptr &block, return false; } - apply_crypt(meta_buffer, meta_buffer, meta_size, (block->block_offset ^ data->header.message_timestamp) ^ 0x6675636b20796f75ULL); /* 0x6675636b20796f75 := 'fuck you' */ + apply_crypt(meta_buffer, meta_buffer, meta_size, (block->block_offset ^ message_data.header.message_timestamp) ^ 0x6675636b20796f75ULL); /* 0x6675636b20796f75 := 'fuck you' */ - data->sender_unique_id.assign((char*) meta_buffer, data->header.sender_unique_id_length); - data->sender_name.assign((char*) meta_buffer + data->header.sender_unique_id_length, data->header.sender_name_length); + data->sender_unique_id.assign((char*) meta_buffer, message_data.header.sender_unique_id_length); + data->sender_name.assign((char*) meta_buffer + message_data.header.sender_unique_id_length, message_data.header.sender_name_length); free(meta_buffer); } else { - data->sender_unique_id.resize(data->header.sender_unique_id_length); - data->sender_name.resize(data->header.sender_name_length); + data->sender_unique_id.resize(message_data.header.sender_unique_id_length); + data->sender_name.resize(message_data.header.sender_name_length); if(this->fread(data->sender_unique_id.data(), data->sender_unique_id.length(), -1, false) != data->sender_unique_id.length()) { error = "failed to read message sender unique id at " + to_string(index); @@ -680,14 +677,14 @@ bool Conversation::load_messages(const std::shared_ptr &block, } } - data->message.resize(data->header.message_length); + data->message.resize(message_data.header.message_length); if(this->fread(data->message.data(), data->message.length(), -1, false) != data->message.length()) { error = "failed to read message id at " + to_string(index); return false; } if(header->message_encrypted) - apply_crypt(data->message.data(), data->message.data(), data->message.size(), block->block_offset ^ data->header.message_timestamp); + apply_crypt(data->message.data(), data->message.data(), data->message.size(), block->block_offset ^ message_data.header.message_timestamp); message_data.message_data = data; index++; @@ -907,7 +904,7 @@ void Conversation::process_write_queue(const std::chrono::system_clock::time_poi auto indexed_block = this->last_message_block->indexed_block; if(indexed_block) { lock_guard lock(indexed_block->message_index_lock); - indexed_block->message_index.push_back(fio::IndexedMessage{(uint32_t) (entry_offset - this->last_message_block->block_offset), write_entry->message_timestamp, nullptr}); + indexed_block->message_index.push_back(fio::IndexedBlockMessage{(uint32_t) (entry_offset - this->last_message_block->block_offset), {write_header}, nullptr}); } } @@ -1018,6 +1015,7 @@ std::deque> Conversation::message_history(con if(message_count == 0) return result; + bool count_deleted = false; /* first try to fillout the result with the cached messages */ { lock_guard lock(this->_last_messages_lock); @@ -1027,6 +1025,8 @@ std::deque> Conversation::message_history(con continue; if(begin_timestamp.time_since_epoch().count() != 0 && (*it)->message_timestamp < begin_timestamp) return result; + if((*it)->flag_message_deleted && !count_deleted) + continue; result.push_back(*it); if(--message_count == 0) @@ -1043,7 +1043,8 @@ std::deque> Conversation::message_history(con if(!ref_server) return result; - auto timestamp = result.empty() ? end_timestamp : result.back()->message_timestamp; + auto begin_timestamp_ms = chrono::floor(begin_timestamp.time_since_epoch()).count(); + auto timestamp_ms = result.empty() ? chrono::floor(end_timestamp.time_since_epoch()).count() : chrono::floor(result.back()->message_timestamp.time_since_epoch()).count(); unique_lock lock(this->message_block_lock); auto rit = this->message_blocks.end(); @@ -1051,7 +1052,7 @@ std::deque> Conversation::message_history(con bool found = false; do { rit--; - if((*rit)->begin_timestamp < timestamp) { + if(chrono::floor((*rit)->begin_timestamp.time_since_epoch()).count() < timestamp_ms) { found = true; break; /* we found the first block which is created before the point we're searching from */ } @@ -1085,7 +1086,7 @@ std::deque> Conversation::message_history(con auto block_found = false; do { rmid--; - if((*rmid).timestamp < timestamp) { + if(rmid->header.message_timestamp < timestamp_ms) { block_found = true; break; /* we found the first block which is created before the point we're searching from */ } @@ -1099,34 +1100,31 @@ std::deque> Conversation::message_history(con continue; } do { + if(rmid->header.flag_deleted && !count_deleted) + continue; + + if(rmid->header.message_timestamp >= timestamp_ms) + continue; /* for some reason we got a message from the index of before where we are. This could happen for "orphaned" blocks which point to a valid block within the future block */ + + if(begin_timestamp.time_since_epoch().count() != 0 && rmid->header.message_timestamp < begin_timestamp_ms) + return result; + auto data = rmid->message_data; if(!data) continue; - if(begin_timestamp.time_since_epoch().count() != 0 && rmid->timestamp < begin_timestamp) - return result; - - if(rmid->timestamp >= timestamp) - continue; /* for some reason we got a message from the index of before where we are. This could happen for "orphaned" blocks which point to a valid block within the future block */ - /* - std::chrono::system_clock::time_point message_timestamp; - - ClientDbId sender_database_id; - std::string sender_unique_id; - std::string sender_name; - - std::string message; - */ result.push_back(make_shared(ConversationEntry{ - rmid->timestamp, + system_clock::time_point{} + milliseconds{rmid->header.message_timestamp}, - (ClientDbId) data->header.sender_database_id, + (ClientDbId) rmid->header.sender_database_id, data->sender_unique_id, data->sender_name, - data->message + data->message, + + rmid->header.flag_deleted })); - timestamp = rmid->timestamp; + timestamp_ms = rmid->header.message_timestamp; if(--message_count == 0) return result; } while(rmid-- != index->message_index.begin()); @@ -1138,6 +1136,124 @@ std::deque> Conversation::message_history(con return result; } +//TODO: May move the IO write part to the write queue? +size_t Conversation::delete_messages(const std::chrono::system_clock::time_point &end_timestamp, size_t message_count, const std::chrono::system_clock::time_point &begin_timestamp, ts::ClientDbId cldbid) { + size_t delete_count_volatile = 0, delete_count = 0; + + if(message_count == 0) + return 0; + + /* first try to fillout the result with the cached messages */ + { + lock_guard lock(this->_last_messages_lock); + for(auto it = this->_last_messages.rbegin(); it != this->_last_messages.rend(); it++) { + if((*it)->message_timestamp > end_timestamp) /* message has been send after the search timestamp */ + continue; + + if(begin_timestamp.time_since_epoch().count() != 0 && (*it)->message_timestamp < begin_timestamp) + break; + + if(cldbid != 0 && (*it)->sender_database_id != cldbid) + continue; + + (*it)->flag_message_deleted = false; + if(++delete_count_volatile >= message_count) + break; + } + } + + /* TODO: Remove from write queue */ + + if(!this->volatile_only()) { + auto handle = this->_ref_handle.lock(); + if(!handle) + return delete_count_volatile; + + auto ref_server = handle->ref_server(); + if(!ref_server) + return delete_count_volatile; + + auto begin_timestamp_ms = chrono::floor(begin_timestamp.time_since_epoch()).count(); + auto timestamp_ms = chrono::floor(end_timestamp.time_since_epoch()).count(); + + unique_lock lock(this->message_block_lock); + auto rit = this->message_blocks.end(); + if(rit != this->message_blocks.begin()) { + bool found = false; + do { + rit--; + if(chrono::floor((*rit)->begin_timestamp.time_since_epoch()).count() < timestamp_ms) { + found = true; + break; /* we found the first block which is created before the point we're searching from */ + } + } while(rit != this->message_blocks.begin()); + + + string error; + if(found) { + vector> relevant_entries{this->message_blocks.begin(), ++rit}; + lock.unlock(); + + auto _rit = --relevant_entries.end(); + do { + auto block = *_rit; + /* lets search for messages */ + if(!this->load_message_block_index(block, error)) { + logWarning(ref_server->getServerId(), "[Conversations][{}] Failed to load message block {} for message delete: {}", this->_channel_id, block->block_offset, error); + continue; + } + auto index = (*_rit)->indexed_block; + if(!index) { + logWarning(ref_server->getServerId(), "[Conversations][{}] Failed to reference indexed block within message block.", this->_channel_id); + continue; + } + + lock_guard index_lock{index->message_index_lock}; + auto rmid = index->message_index.end(); + if(rmid == index->message_index.begin()) + continue; /* Empty block? Funny */ + + auto block_found = false; + do { + rmid--; + if(rmid->header.message_timestamp < timestamp_ms) { + block_found = true; + break; /* we found the first block which is created before the point we're searching from */ + } + } while(rmid != index->message_index.begin()); + if(!block_found) + continue; + + do { + if(rmid->header.message_timestamp >= timestamp_ms) + continue; /* for some reason we got a message from the index of before where we are. This could happen for "orphaned" blocks which point to a valid block within the future block */ + + if(begin_timestamp.time_since_epoch().count() != 0 && rmid->header.message_timestamp < begin_timestamp_ms) + return max(delete_count, delete_count_volatile); + + if(cldbid != 0 && rmid->header.sender_database_id != cldbid) + continue; + + if(!rmid->header.flag_deleted) { + rmid->header.flag_deleted = true; + + auto offset = block->block_offset + rmid->message_offset; + if(this->fwrite(&rmid->header, sizeof(rmid->header), offset, false, true) != sizeof(rmid->header)) + logWarning(ref_server->getServerId(), "[Conversations][{}] Failed to save message flags.", this->_channel_id); + } + + timestamp_ms = rmid->header.message_timestamp; + if(++delete_count >= message_count) + return max(delete_count, delete_count_volatile); + } while(rmid-- != index->message_index.begin()); + } while(_rit-- != relevant_entries.begin()); + } + } + } + + return max(delete_count, delete_count_volatile); +} + ConversationManager::ConversationManager(const std::shared_ptr &server) : _ref_server(server) { } ConversationManager::~ConversationManager() { } diff --git a/server/src/manager/ConversationManager.h b/server/src/manager/ConversationManager.h index 4df9d31..0952163 100644 --- a/server/src/manager/ConversationManager.h +++ b/server/src/manager/ConversationManager.h @@ -21,6 +21,7 @@ namespace ts { std::string sender_name; std::string message; + bool flag_message_deleted; }; namespace fio { @@ -64,32 +65,33 @@ namespace ts { uint8_t sender_unique_id_length; /* directly followed by this header */ uint8_t sender_name_length; /* directly followed after the unique id */ uint16_t message_length; /* directly followed after the name */ - uint16_t message_flags; /* could be later something like deleted etc.... */ + union { + uint16_t message_flags; /* could be later something like deleted etc.... */ + struct { + uint16_t _flags_padding: 15; + bool flag_deleted: 1; + }; + }; }; static_assert(sizeof(MessageHeader) == 26); #pragma pack(pop) struct IndexedMessageData { - MessageHeader header; std::string sender_unique_id; std::string sender_name; std::string message; }; - struct IndexedMessage { - uint32_t offset; - std::chrono::system_clock::time_point timestamp; + struct IndexedBlockMessage { + uint32_t message_offset; + MessageHeader header; std::shared_ptr message_data; }; struct IndexedBlock { - bool successfully; - /* - * message_index[0] := index of the message (including the header!) - * message_index[1] := timestamp of the message - */ - std::deque message_index; + bool index_successful; + std::deque message_index; std::mutex message_index_lock; }; } @@ -135,7 +137,7 @@ namespace ts { class ConversationManager; class Conversation { public: - Conversation(const std::shared_ptr& /* handle */, ChannelId /* channel id */, const std::string& /* file name */); + Conversation(const std::shared_ptr& /* handle */, ChannelId /* channel id */, std::string /* file name */); ~Conversation(); bool initialize(std::string& error); @@ -158,6 +160,8 @@ namespace ts { return this->message_history(std::chrono::system_clock::now(), limit, std::chrono::system_clock::time_point{}); } + size_t delete_messages(const std::chrono::system_clock::time_point& /* end timestamp */, size_t /* limit */, const std::chrono::system_clock::time_point& /* begin timestamp */, ClientDbId /* database id */); + ts_always_inline void set_ref_self(const std::shared_ptr& pointer) { this->_ref_self = pointer; }