diff --git a/file/local_server/LocalFileProvider.h b/file/local_server/LocalFileProvider.h index 88d8a9e..600f94e 100644 --- a/file/local_server/LocalFileProvider.h +++ b/file/local_server/LocalFileProvider.h @@ -258,7 +258,7 @@ namespace ts::server::file { bool enqueue_disk_buffer_bytes(const void* /* buffer */, size_t /* length */); /* these function clear the buffers and set the write disconnected flags to true so no new buffers will be enqueued */ - void flush_network_buffer(); + size_t flush_network_buffer(); void flush_disk_buffer(); [[nodiscard]] inline std::string log_prefix() const { return "[" + net::to_string(this->networking.address) + "]"; } diff --git a/file/local_server/LocalFileTransferClientWorker.cpp b/file/local_server/LocalFileTransferClientWorker.cpp index 774c324..6170ba8 100644 --- a/file/local_server/LocalFileTransferClientWorker.cpp +++ b/file/local_server/LocalFileTransferClientWorker.cpp @@ -6,7 +6,6 @@ #include #include #include "./LocalFileProvider.h" -#include "LocalFileProvider.h" using namespace ts::server::file; using namespace ts::server::file::transfer; diff --git a/file/local_server/LocalFileTransferDisk.cpp b/file/local_server/LocalFileTransferDisk.cpp index 037ec3a..d45039f 100644 --- a/file/local_server/LocalFileTransferDisk.cpp +++ b/file/local_server/LocalFileTransferDisk.cpp @@ -478,7 +478,7 @@ void LocalFileTransfer::execute_disk_io(const std::shared_ptr &clien } } else if(client->transfer->direction == Transfer::DIRECTION_DOWNLOAD) { if(client->state == FileClient::STATE_DISCONNECTING) { - client->flush_disk_buffer(); /* just in case */ + client->flush_disk_buffer(); /* just in case, file download usually don't write to the disk */ return; } diff --git a/file/local_server/LocalFileTransferNetwork.cpp b/file/local_server/LocalFileTransferNetwork.cpp index 5f5b8e8..72c174d 100644 --- a/file/local_server/LocalFileTransferNetwork.cpp +++ b/file/local_server/LocalFileTransferNetwork.cpp @@ -149,13 +149,14 @@ bool FileClient::enqueue_network_buffer_bytes(const void *snd_buffer, size_t siz return false; } -void FileClient::flush_network_buffer() { +size_t FileClient::flush_network_buffer() { Buffer* current_head; + size_t bytes; { std::lock_guard block{this->network_buffer.mutex}; this->network_buffer.write_disconnected = true; - this->network_buffer.bytes = 0; + bytes = std::exchange(this->network_buffer.bytes, 0); current_head = std::exchange(this->network_buffer.buffer_head, nullptr); this->network_buffer.buffer_tail = &this->network_buffer.buffer_head; } @@ -165,6 +166,8 @@ void FileClient::flush_network_buffer() { free_buffer(current_head); current_head = next; } + + return bytes; } NetworkingStartResult LocalFileTransfer::start_networking() { @@ -726,7 +729,13 @@ void LocalFileTransfer::callback_transfer_network_write(int fd, short events, vo } } - transfer->flush_network_buffer(); /* invalidate all network write operations */ + /* invalidate all network write operations, but still flush the disk IO buffer */ + if(size_t bytes_dropped{transfer->flush_network_buffer()}; bytes_dropped > 0) { + if(transfer->state != FileClient::STATE_TRANSFERRING) { + logWarning(LOG_FT, "{} Dropped {} bytes due to a write error ({}/{})", + transfer->log_prefix(), bytes_dropped, errno, strerror(errno)); + } + } std::unique_lock slock{transfer->state_mutex}; transfer->handle->disconnect_client(transfer->shared_from_this(), slock, true); return; @@ -773,10 +782,6 @@ void LocalFileTransfer::callback_transfer_network_write(int fd, short events, vo callback(transfer->transfer); } } - - std::unique_lock slock{transfer->state_mutex}; - /* no need to flush here, since we read only from the disk and all bytes which sould be send have been written already */ - transfer->handle->disconnect_client(transfer->shared_from_this(), slock, false); return; } transfer->handle->enqueue_disk_io(transfer->shared_from_this()); diff --git a/file/local_server/NetTools.h b/file/local_server/NetTools.h index f1e2be9..3d2e044 100644 --- a/file/local_server/NetTools.h +++ b/file/local_server/NetTools.h @@ -4,6 +4,7 @@ #include #include #include +#include #include #include @@ -11,6 +12,7 @@ namespace ts::server::file::networking { struct NetworkThrottle { constexpr static auto kThrottleTimespanMs{250}; + typedef uint8_t span_t; static NetworkThrottle kNoThrottle; diff --git a/server/src/FileServerHandler.cpp b/server/src/FileServerHandler.cpp index dfdab34..5fc443f 100644 --- a/server/src/FileServerHandler.cpp +++ b/server/src/FileServerHandler.cpp @@ -93,44 +93,38 @@ void FileServerHandler::callback_transfer_aborted(const std::shared_ptrclient_transfer_id); - notify.put(0, "size", 0); /* not sure where TeamSpeak counts from */ + notify.put(0, "size", 0); + ts::command_result status{}; using ErrorType = ts::server::file::transfer::TransferError::Type; switch (error.error_type) { case ErrorType::TRANSFER_TIMEOUT: - notify.put_unchecked(0, "status", (int) error::file_transfer_connection_timeout); - notify.put_unchecked(0, "msg", findError(error::file_transfer_connection_timeout).message); + status.reset(ts::command_result{error::file_transfer_connection_timeout}); break; case ErrorType::DISK_IO_ERROR: case ErrorType::DISK_TIMEOUT: case ErrorType::DISK_INITIALIZE_ERROR: - notify.put_unchecked(0, "status", (int) error::file_io_error); - notify.put_unchecked(0, "msg", findError(error::file_io_error).message); + status.reset(ts::command_result{error::file_io_error}); break; case ErrorType::UNKNOWN: case ErrorType::NETWORK_IO_ERROR: - notify.put_unchecked(0, "status", (int) error::file_connection_lost); - notify.put_unchecked(0, "msg", findError(error::file_connection_lost).message); + status.reset(ts::command_result{error::file_connection_lost}); break; case ErrorType::UNEXPECTED_CLIENT_DISCONNECT: case ErrorType::UNEXPECTED_DISK_EOF: - notify.put_unchecked(0, "status", (int) error::file_transfer_interrupted); - notify.put_unchecked(0, "msg", findError(error::file_transfer_interrupted).message); + status.reset(ts::command_result{error::file_transfer_interrupted}); case ErrorType::USER_REQUEST: - notify.put_unchecked(0, "status", (int) error::file_transfer_canceled); - notify.put_unchecked(0, "msg", findError(error::file_transfer_canceled).message); + status.reset(ts::command_result{error::file_transfer_canceled}); break; } - notify.put_unchecked(0, "extra_msg", error.error_message); - + client->writeCommandResult(notify, status, "status"); client->sendCommand(notify); } } @@ -197,7 +191,7 @@ void FileServerHandler::callback_transfer_finished(const std::shared_ptrclient_transfer_id); - notify.put(0, "size", 0); /* not sure where TeamSpeak counts from */ + notify.put(0, "size", transfer->expected_file_size); /* not sure where TeamSpeak counts from */ notify.put_unchecked(0, "status", (int) error::file_transfer_complete); notify.put_unchecked(0, "msg", findError(error::file_transfer_complete).message); diff --git a/server/src/FileServerHandler.h b/server/src/FileServerHandler.h index 25e7e5c..620006f 100644 --- a/server/src/FileServerHandler.h +++ b/server/src/FileServerHandler.h @@ -15,13 +15,6 @@ namespace ts::server::file { private: InstanceHandler* instance_; -#if 0 - std::function&)> callback_transfer_registered{}; /* transfer has been registered */ - std::function&)> callback_transfer_started{}; /* transfer has been started */ - std::function&)> callback_transfer_finished{}; /* transfer has been finished */ - std::function&, const TransferError&)> callback_transfer_aborted{}; /* an error happened while transferring the data */ - std::function&, const TransferStatistics&)> callback_transfer_statistics{}; -#endif void callback_transfer_registered(const std::shared_ptr&); void callback_transfer_started(const std::shared_ptr&); void callback_transfer_finished(const std::shared_ptr&); diff --git a/server/src/VirtualServer.cpp b/server/src/VirtualServer.cpp index 8dd168f..9a5dbb8 100644 --- a/server/src/VirtualServer.cpp +++ b/server/src/VirtualServer.cpp @@ -138,7 +138,7 @@ bool VirtualServer::initialize(bool test_properties) { } channelTree->deleteSemiPermanentChannels(); - if(channelTree->channel_count() == 0){ + if(channelTree->channel_count() == 0) { logMessage(this->serverId, "Creating new channel tree (Copy from server 0)"); LOG_SQL_CMD(sql::command(this->getSql(), "INSERT INTO `channels` (`serverId`, `channelId`, `type`, `parentId`) SELECT :serverId AS `serverId`, `channelId`, `type`, `parentId` FROM `channels` WHERE `serverId` = 0", variable{":serverId", this->serverId}).execute()); LOG_SQL_CMD(sql::command(this->getSql(), "INSERT INTO `properties` (`serverId`, `type`, `id`, `key`, `value`) SELECT :serverId AS `serverId`, `type`, `id`, `key`, `value` FROM `properties` WHERE `serverId` = 0 AND `type` = :type", diff --git a/server/src/client/ConnectedClient.cpp b/server/src/client/ConnectedClient.cpp index f4d3dfb..62bf627 100644 --- a/server/src/client/ConnectedClient.cpp +++ b/server/src/client/ConnectedClient.cpp @@ -575,16 +575,16 @@ bool ConnectedClient::notifyClientNeededPermissions() { return true; } -inline void write_command_result_error(ts::command_builder_bulk bulk, const command_result& result) { - bulk.put_unchecked("id", (uint32_t) result.error_code()); +inline void write_command_result_error(ts::command_builder_bulk bulk, const command_result& result, const std::string& errorCodeKey) { + bulk.put_unchecked(errorCodeKey, (uint32_t) result.error_code()); bulk.put_unchecked("msg", findError(result.error_code()).message); if(result.is_permission_error()) bulk.put_unchecked("failed_permid", (uint32_t) result.permission_id()); } -inline void write_command_result_detailed(ts::command_builder_bulk bulk, const command_result& result) { +inline void write_command_result_detailed(ts::command_builder_bulk bulk, const command_result& result, const std::string& errorCodeKey) { auto details = result.details(); - bulk.put_unchecked("id", (uint32_t) details->error_id); + bulk.put_unchecked(errorCodeKey, (uint32_t) details->error_id); bulk.put_unchecked("msg", findError(details->error_id).message); for(const auto& extra : details->extra_properties) @@ -594,25 +594,34 @@ inline void write_command_result_detailed(ts::command_builder_bulk bulk, const c bool ConnectedClient::notifyError(const command_result& result, const std::string& retCode) { ts::command_builder command{"error"}; + this->writeCommandResult(command, result); + if(!retCode.empty()) + command.put_unchecked(0, "return_code", retCode); + + this->sendCommand(command); + return true; +} + +void ConnectedClient::writeCommandResult(ts::command_builder &cmd_builder, const command_result &result, const std::string& errorCodeKey) { switch(result.type()) { case command_result_type::error: - write_command_result_error(command.bulk(0), result); + write_command_result_error(cmd_builder.bulk(0), result, errorCodeKey); break; case command_result_type::detailed: - write_command_result_detailed(command.bulk(0), result); + write_command_result_detailed(cmd_builder.bulk(0), result, errorCodeKey); break; case command_result_type::bulked: { auto bulks = result.bulks(); - command.reserve_bulks(bulks->size()); + cmd_builder.reserve_bulks(bulks->size()); for(size_t index{0}; index < bulks->size(); index++) { auto& entry = bulks->at(index); switch (entry.type()) { case command_result_type::error: - write_command_result_error(command.bulk(index), entry); + write_command_result_error(cmd_builder.bulk(index), entry, errorCodeKey); break; case command_result_type::detailed: - write_command_result_detailed(command.bulk(index), entry); + write_command_result_detailed(cmd_builder.bulk(index), entry, errorCodeKey); break; case command_result_type::bulked: assert(false); @@ -621,8 +630,8 @@ bool ConnectedClient::notifyError(const command_result& result, const std::strin } if(bulks->empty()) { logWarning(this->getServerId(), "{} Trying to send empty error bulk.", CLIENT_STR_LOG_PREFIX_(this)); - command.put_unchecked(0, "id", (uint32_t) error::ok); - command.put_unchecked(0, "msg", findError(error::ok).message); + cmd_builder.put_unchecked(0, errorCodeKey, (uint32_t) error::ok); + cmd_builder.put_unchecked(0, "msg", findError(error::ok).message); } break; } @@ -630,12 +639,6 @@ bool ConnectedClient::notifyError(const command_result& result, const std::strin assert(false); break; } - - if(!retCode.empty()) - command.put_unchecked(0, "return_code", retCode); - - this->sendCommand(command); - return true; } inline std::shared_ptr pop_view_entry(std::deque>& pool, ChannelId id) { diff --git a/server/src/client/ConnectedClient.h b/server/src/client/ConnectedClient.h index f62511c..11ae14b 100644 --- a/server/src/client/ConnectedClient.h +++ b/server/src/client/ConnectedClient.h @@ -122,6 +122,7 @@ namespace ts { /** Notifies general stuff **/ virtual bool notifyError(const command_result&, const std::string& retCode = ""); + virtual void writeCommandResult(ts::command_builder&, const command_result&, const std::string& errorCodeKey = "id"); /** Notifies (after request) */ bool sendNeededPermissions(bool /* force an update */); /* invoke this because it dosn't spam the client */ @@ -597,7 +598,6 @@ namespace ts { command_result handleCommandConversationMessageDelete(Command&); command_result handleCommandLogView(Command&); - //CMD_TODO handleCommandLogAdd command_result handleCommandLogQuery(Command&); command_result handleCommandLogAdd(Command&); diff --git a/server/src/client/command_handler/misc.cpp b/server/src/client/command_handler/misc.cpp index b2c6bfb..0e441e6 100644 --- a/server/src/client/command_handler/misc.cpp +++ b/server/src/client/command_handler/misc.cpp @@ -4,8 +4,6 @@ #include -#include - #include #include #include @@ -15,7 +13,6 @@ #include "../InternalClient.h" #include "../../server/VoiceServer.h" #include "../voice/VoiceClient.h" -#include "PermissionManager.h" #include "../../InstanceHandler.h" #include "../../server/QueryServer.h" #include "../music/MusicClient.h" @@ -30,16 +27,10 @@ #include "helpers.h" -#include #include #include -#include -#include -#include #include -#include #include -#include #include namespace fs = std::experimental::filesystem; @@ -132,8 +123,34 @@ command_result ConnectedClient::handleCommand(Command &cmd) { else if (command == "ftgetfilelist") return this->handleCommandFTGetFileList(cmd); else if (command == "ftcreatedir") return this->handleCommandFTCreateDir(cmd); else if (command == "ftdeletefile") return this->handleCommandFTDeleteFile(cmd); - else if (command == "ftinitupload") return this->handleCommandFTInitUpload(cmd); - else if (command == "ftinitdownload") return this->handleCommandFTInitDownload(cmd); + else if (command == "ftinitupload") { + auto result = this->handleCommandFTInitUpload(cmd); + if(result.has_error() && this->getType() == ClientType::CLIENT_TEAMSPEAK) { + ts::command_builder notify{"notifystatusfiletransfer"}; + notify.put_unchecked(0, "clientftfid", cmd["clientftfid"].string()); + notify.put(0, "size", 0); + this->writeCommandResult(notify, result, "status"); + this->sendCommand(notify); + result.release_data(); + + return command_result{error::ok}; + } + return result; + } + else if (command == "ftinitdownload") { + auto result = this->handleCommandFTInitDownload(cmd); + if(result.has_error() && this->getType() == ClientType::CLIENT_TEAMSPEAK) { + ts::command_builder notify{"notifystatusfiletransfer"}; + notify.put_unchecked(0, "clientftfid", cmd["clientftfid"].string()); + notify.put(0, "size", 0); + this->writeCommandResult(notify, result, "status"); + this->sendCommand(notify); + result.release_data(); + + return command_result{error::ok}; + } + return result; + } else if (command == "ftgetfileinfo") return this->handleCommandFTGetFileInfo(cmd); else if (command == "ftrenamefile") return this->handleCommandFTRenameFile(cmd); else if (command == "ftlist") return this->handleCommandFTList(cmd); @@ -2523,6 +2540,7 @@ command_result ConnectedClient::handleCommandConversationHistory(ts::Command &co auto channel = this->channel_view()->find_channel(conversation_id); if(!channel) return command_result{error::conversation_invalid_id}; + if(channel->channel()->properties()[property::CHANNEL_FLAG_CONVERSATION_PRIVATE].as()) return command_result{error::conversation_is_private}; } @@ -2684,11 +2702,13 @@ command_result ConnectedClient::handleCommandConversationFetch(ts::Command &cmd) } auto conversation = conversation_manager->get(conversation_id); - if(conversation) + if(conversation) { result_bulk["timestamp"] = duration_cast(conversation->last_message().time_since_epoch()).count(); - else + result_bulk["flag_volatile"] = conversation->volatile_only(); + } else { result_bulk["timestamp"] = 0; - result_bulk["flag_volatile"] = conversation->volatile_only(); + result_bulk["flag_volatile"] = false; + } } if(result_index == 0) return command_result{error::database_empty_result}; @@ -2737,7 +2757,7 @@ command_result ConnectedClient::handleCommandConversationMessageDelete(ts::Comma if (!channel->passwordMatch(bulk["cpw"], true)) ACTION_REQUIRES_PERMISSION(permission::b_channel_join_ignore_password, 1, channel->channelId()); - if (!permission::v2::permission_granted(1, this->calculate_permission(permission::b_channel_conversation_message_delete, 1, channel->channelId()))) + if (!permission::v2::permission_granted(1, this->calculate_permission(permission::b_channel_conversation_message_delete, channel->channelId()))) return command_result{permission::b_channel_conversation_message_delete}; if(auto error_perm = this->calculate_and_get_join_state(channel); error_perm != permission::ok && error_perm != permission::b_client_is_sticky) @@ -2753,6 +2773,7 @@ command_result ConnectedClient::handleCommandConversationMessageDelete(ts::Comma auto limit = bulk.has("limit") ? bulk["limit"].as() : 1; if(limit > 100) limit = 100; + auto delete_count = current_conversation->delete_messages(timestamp_end, limit, timestamp_begin, bulk["cldbid"]); if(delete_count > 0) { for(const auto& client : ref_server->getClients()) { diff --git a/server/src/client/command_handler/music.cpp b/server/src/client/command_handler/music.cpp index 3933172..6c57c89 100644 --- a/server/src/client/command_handler/music.cpp +++ b/server/src/client/command_handler/music.cpp @@ -75,7 +75,7 @@ command_result ConnectedClient::handleCommandMusicBotCreate(Command& cmd) { permission::i_client_music_create_modify_max_volume }, this->getChannelId()); - auto permissions = map(permissions_list.begin(), permissions_list.end()); + auto permissions = std::map(permissions_list.begin(), permissions_list.end()); auto max_bots = permissions[permission::i_client_music_limit]; if(max_bots.has_value) { diff --git a/server/src/manager/ConversationManager.cpp b/server/src/manager/ConversationManager.cpp index c404f46..72113e6 100644 --- a/server/src/manager/ConversationManager.cpp +++ b/server/src/manager/ConversationManager.cpp @@ -806,6 +806,7 @@ void Conversation::process_write_queue(const std::chrono::system_clock::time_poi } } } + /* test if we have a block or create a new one at the begin of the file */ if(!this->last_message_block) { //Note: If we reuse blocks we've to reorder them within message_blocks (newest blocks are at the end) @@ -1003,6 +1004,7 @@ void Conversation::register_message(ts::ClientDbId sender_database_id, const std while(this->_last_messages.size() > this->_last_messages_limit) this->_last_messages.pop_front(); /* TODO: Use a iterator for delete to improve performance */ } + if(!this->volatile_only()) { { lock_guard lock(this->_write_queue_lock);