From 960186d55ea368723281aedd5ff6dc2b3f0d0017 Mon Sep 17 00:00:00 2001 From: WolverinDEV Date: Sun, 3 Jan 2021 17:16:23 +0100 Subject: [PATCH] Fixed a crash related to file transfers --- file/local_server/LocalFileTransfer.cpp | 13 ++- file/local_server/LocalFileTransfer.h | 14 ++- file/local_server/LocalFileTransferDisk.cpp | 92 ++++++++++++------- .../local_server/LocalFileTransferNetwork.cpp | 53 +++++++---- git-teaspeak | 2 +- rtclib | 2 +- server/src/client/SpeakingClient.cpp | 8 -- 7 files changed, 112 insertions(+), 72 deletions(-) diff --git a/file/local_server/LocalFileTransfer.cpp b/file/local_server/LocalFileTransfer.cpp index ff147b7..7d79953 100644 --- a/file/local_server/LocalFileTransfer.cpp +++ b/file/local_server/LocalFileTransfer.cpp @@ -22,9 +22,16 @@ Buffer* transfer::allocate_buffer(size_t size) { return buffer; } -void transfer::free_buffer(Buffer* buffer) { - buffer->~Buffer(); - free(buffer); +Buffer* transfer::ref_buffer(Buffer *buffer) { + buffer->ref_count++; + return buffer; +} + +void transfer::deref_buffer(Buffer *buffer) { + if(--buffer->ref_count == 0) { + buffer->~Buffer(); + free(buffer); + } } FileClient::~FileClient() { diff --git a/file/local_server/LocalFileTransfer.h b/file/local_server/LocalFileTransfer.h index 2188f79..b18088d 100644 --- a/file/local_server/LocalFileTransfer.h +++ b/file/local_server/LocalFileTransfer.h @@ -29,14 +29,17 @@ namespace ts::server::file::transfer { struct Buffer { Buffer* next{nullptr}; - size_t capacity{0}; - size_t length{0}; - size_t offset{0}; + std::atomic_uint32_t ref_count{0}; + uint32_t capacity{0}; + uint32_t length{0}; + uint32_t offset{0}; char data[1]{}; }; + [[nodiscard]] extern Buffer* allocate_buffer(size_t); - extern void free_buffer(Buffer*); + [[nodiscard]] extern Buffer* ref_buffer(Buffer*); + extern void deref_buffer(Buffer*); /* all variables are locked via the state_mutex */ struct FileClient : std::enable_shared_from_this { @@ -85,6 +88,7 @@ namespace ts::server::file::transfer { } transfer_key{}; struct { + /* TODO: Could be a spin lock (never gets locked while writing so no long blocking activity) */ std::mutex mutex{}; size_t bytes{0}; @@ -126,7 +130,7 @@ namespace ts::server::file::transfer { pipes::SSL pipe_ssl{}; bool pipe_ssl_init{false}; - std::unique_ptr http_header_buffer{nullptr, free_buffer}; + std::unique_ptr http_header_buffer{nullptr, deref_buffer}; HTTPUploadState http_state{HTTPUploadState::HTTP_AWAITING_HEADER}; std::string http_boundary{}; diff --git a/file/local_server/LocalFileTransferDisk.cpp b/file/local_server/LocalFileTransferDisk.cpp index f08a894..5d21850 100644 --- a/file/local_server/LocalFileTransferDisk.cpp +++ b/file/local_server/LocalFileTransferDisk.cpp @@ -71,8 +71,9 @@ void LocalFileTransfer::dispatch_loop_disk_io(void *provider_ptr) { } provider->disk_io.queue_head = provider->disk_io.queue_head->file.next_client; - if(!provider->disk_io.queue_head) + if(!provider->disk_io.queue_head) { provider->disk_io.queue_tail = &provider->disk_io.queue_head; + } } if(provider->disk_io.state != DiskIOLoopState::RUNNING) { @@ -83,8 +84,9 @@ void LocalFileTransfer::dispatch_loop_disk_io(void *provider_ptr) { } else { /* force stopping without any flushing */ auto fclient = &*client; - while(fclient) + while(fclient) { fclient = std::exchange(fclient->file.next_client, nullptr); + } provider->disk_io.queue_head = nullptr; provider->disk_io.queue_tail = &provider->disk_io.queue_head; @@ -92,8 +94,9 @@ void LocalFileTransfer::dispatch_loop_disk_io(void *provider_ptr) { } } - if(!client) + if(!client) { continue; + } client->file.currently_processing = true; client->file.next_client = nullptr; @@ -124,7 +127,7 @@ bool FileClient::enqueue_disk_buffer_bytes(const void *snd_buffer, size_t size) return buffer_size > TRANSFER_MAX_CACHED_BYTES; write_disconnected: - free_buffer(tbuffer); + deref_buffer(tbuffer); return false; } @@ -141,7 +144,7 @@ void FileClient::flush_disk_buffer() { while(current_head) { auto next = current_head->next; - free_buffer(current_head); + deref_buffer(current_head); current_head = next; } } @@ -410,7 +413,9 @@ void LocalFileTransfer::enqueue_disk_io(const std::shared_ptr &clien } void LocalFileTransfer::execute_disk_io(const std::shared_ptr &client) { - if(!client->transfer) return; + if(!client->transfer) { + return; + } if(client->transfer->direction == Transfer::DIRECTION_UPLOAD) { Buffer* buffer; @@ -419,17 +424,28 @@ void LocalFileTransfer::execute_disk_io(const std::shared_ptr &clien while(true) { { std::lock_guard block{client->disk_buffer.mutex}; - buffer = client->disk_buffer.buffer_head; + + if(!client->disk_buffer.buffer_head) { + assert(client->disk_buffer.bytes == 0); + buffer_left_size = 0; + break; + } + buffer_left_size = client->disk_buffer.bytes; - } - if(!buffer) { - assert(buffer_left_size == 0); - break; + buffer = ref_buffer(client->disk_buffer.buffer_head); } assert(buffer->offset < buffer->length); auto written = ::write(client->file.file_descriptor, buffer->data + buffer->offset, buffer->length - buffer->offset); if(written <= 0) { + deref_buffer(buffer); + + if(errno == EAGAIN) { + //TODO: Timeout? + this->enqueue_disk_io(client); + break; + } + if(written == 0) { /* EOF, how the hell is this event possible?! */ auto offset_written = client->statistics.disk_bytes_write.total_bytes + client->transfer->file_offset; @@ -446,12 +462,6 @@ void LocalFileTransfer::execute_disk_io(const std::shared_ptr &clien client->handle->disconnect_client(client, slock, true); } } else { - if(errno == EAGAIN) { - //TODO: Timeout? - this->enqueue_disk_io(client); - break; - } - auto offset_written = client->statistics.disk_bytes_write.total_bytes + client->transfer->file_offset; auto aoffset = lseek(client->file.file_descriptor, 0, SEEK_CUR); logError(LOG_FT, "{} Received write to disk IO error. Write pointer is at {} of {}. Actual file offset: {}. Closing transfer.", @@ -469,28 +479,38 @@ void LocalFileTransfer::execute_disk_io(const std::shared_ptr &clien } else { buffer->offset += written; assert(buffer->offset <= buffer->length); + if(buffer->length == buffer->offset) { { std::lock_guard block{client->disk_buffer.mutex}; - client->disk_buffer.buffer_head = buffer->next; - if(!buffer->next) - client->disk_buffer.buffer_tail = &client->disk_buffer.buffer_head; + if(client->disk_buffer.buffer_head == buffer) { + client->disk_buffer.buffer_head = buffer->next; + if(!buffer->next) { + client->disk_buffer.buffer_tail = &client->disk_buffer.buffer_head; + } + assert(client->disk_buffer.bytes >= written); + client->disk_buffer.bytes -= written; + buffer_left_size = client->disk_buffer.bytes; + } else { + /* The buffer got removed */ + } + } + + /* We have to deref the buffer twice since we've removed it from the list which owns us one reference */ + deref_buffer(buffer); + } else { + std::lock_guard block{client->disk_buffer.mutex}; + if(client->disk_buffer.buffer_head == buffer) { assert(client->disk_buffer.bytes >= written); client->disk_buffer.bytes -= written; buffer_left_size = client->disk_buffer.bytes; - (void) buffer_left_size; /* trick my IDE here a bit */ + } else { + /* The buffer got removed */ } - - free_buffer(buffer); - } else { - std::lock_guard block{client->disk_buffer.mutex}; - assert(client->disk_buffer.bytes >= written); - client->disk_buffer.bytes -= written; - buffer_left_size = client->disk_buffer.bytes; - (void) buffer_left_size; /* trick my IDE here a bit */ } + deref_buffer(buffer); client->statistics.disk_bytes_write.increase_bytes(written); } } @@ -502,8 +522,10 @@ void LocalFileTransfer::execute_disk_io(const std::shared_ptr &clien } if(client->state == FileClient::STATE_TRANSFERRING && buffer_left_size < TRANSFER_MAX_CACHED_BYTES / 2) { - if(client->disk_buffer.buffering_stopped) + if(client->disk_buffer.buffering_stopped) { logMessage(LOG_FT, "{} Starting network read, buffer is capable for reading again.", client->log_prefix()); + } + client->add_network_read_event(false); } } else if(client->transfer->direction == Transfer::DIRECTION_DOWNLOAD) { @@ -518,6 +540,11 @@ void LocalFileTransfer::execute_disk_io(const std::shared_ptr &clien auto read = ::read(client->file.file_descriptor, buffer, buffer_capacity); if(read <= 0) { + if(errno == EAGAIN) { + this->enqueue_disk_io(client); + return; + } + if(read == 0) { /* EOF */ auto offset_send = client->statistics.disk_bytes_read.total_bytes + client->transfer->file_offset; @@ -532,11 +559,6 @@ void LocalFileTransfer::execute_disk_io(const std::shared_ptr &clien this->invoke_aborted_callback(client, { TransferError::UNEXPECTED_DISK_EOF, "" }); } } else { - if(errno == EAGAIN) { - this->enqueue_disk_io(client); - return; - } - logWarning(LOG_FT, "{} Failed to read from file {} ({}/{}). Aborting transfer.", client->log_prefix(), client->transfer->absolute_file_path, errno, strerror(errno)); this->invoke_aborted_callback(client, { TransferError::DISK_IO_ERROR, strerror(errno) }); diff --git a/file/local_server/LocalFileTransferNetwork.cpp b/file/local_server/LocalFileTransferNetwork.cpp index 50c6d99..4f90b26 100644 --- a/file/local_server/LocalFileTransferNetwork.cpp +++ b/file/local_server/LocalFileTransferNetwork.cpp @@ -145,7 +145,7 @@ bool FileClient::enqueue_network_buffer_bytes(const void *snd_buffer, size_t siz return buffer_size > TRANSFER_MAX_CACHED_BYTES; write_disconnected: - free_buffer(tbuffer); + deref_buffer(tbuffer); return false; } @@ -163,7 +163,7 @@ size_t FileClient::flush_network_buffer() { while(current_head) { auto next = current_head->next; - free_buffer(current_head); + deref_buffer(current_head); current_head = next; } @@ -723,20 +723,24 @@ void LocalFileTransfer::callback_transfer_network_write(int fd, short events, vo while(true) { { std::lock_guard block{transfer->network_buffer.mutex}; - buffer = transfer->network_buffer.buffer_head; - buffer_left_size = transfer->network_buffer.bytes; - } - if(!buffer) { - break; + if(!transfer->network_buffer.buffer_head) { + buffer_left_size = 0; + assert(transfer->network_buffer.bytes == 0); + break; + } + + buffer = ref_buffer(transfer->network_buffer.buffer_head); + buffer_left_size = transfer->network_buffer.bytes; } const auto max_write_bytes = transfer->networking.throttle.bytes_left(); if(!max_write_bytes) break; /* network throttle */ assert(buffer->offset < buffer->length); - auto written = ::send(fd, buffer->data + buffer->offset, std::min(buffer->length - buffer->offset, max_write_bytes), MSG_DONTWAIT | MSG_NOSIGNAL); + auto written = ::send(fd, buffer->data + buffer->offset, std::min((size_t) (buffer->length - buffer->offset), max_write_bytes), MSG_DONTWAIT | MSG_NOSIGNAL); if(written <= 0) { + deref_buffer(buffer); if(errno == EAGAIN) { transfer->add_network_write_event(false); break; @@ -795,27 +799,38 @@ void LocalFileTransfer::callback_transfer_network_write(int fd, short events, vo if(buffer->length == buffer->offset) { { std::lock_guard block{transfer->network_buffer.mutex}; - transfer->network_buffer.buffer_head = buffer->next; - if(!buffer->next) - transfer->network_buffer.buffer_tail = &transfer->network_buffer.buffer_head; + if(transfer->network_buffer.buffer_head == buffer) { + transfer->network_buffer.buffer_head = buffer->next; + if(!buffer->next) + transfer->network_buffer.buffer_tail = &transfer->network_buffer.buffer_head; + assert(transfer->network_buffer.bytes >= written); + transfer->network_buffer.bytes -= written; + buffer_left_size = transfer->network_buffer.bytes; + } else { + /* the buffer got remove */ + } + } + + deref_buffer(buffer); + } else { + std::lock_guard block{transfer->network_buffer.mutex}; + if(transfer->network_buffer.buffer_head == buffer) { assert(transfer->network_buffer.bytes >= written); transfer->network_buffer.bytes -= written; buffer_left_size = transfer->network_buffer.bytes; + } else { + /* the buffer got remove */ } - - free_buffer(buffer); - } else { - std::lock_guard block{transfer->network_buffer.mutex}; - assert(transfer->network_buffer.bytes >= written); - transfer->network_buffer.bytes -= written; - buffer_left_size = transfer->network_buffer.bytes; } transfer->timings.last_write = std::chrono::system_clock::now(); transfer->statistics.network_send.increase_bytes(written); - if(transfer->networking.throttle.increase_bytes(written)) + deref_buffer(buffer); + + if(transfer->networking.throttle.increase_bytes(written)) { break; /* we've to slow down */ + } } } diff --git a/git-teaspeak b/git-teaspeak index 171dded..5c2ff84 160000 --- a/git-teaspeak +++ b/git-teaspeak @@ -1 +1 @@ -Subproject commit 171ddedeedc5bce33b75c9f14c3fafd41ed8820e +Subproject commit 5c2ff84e47d4fa905f9b01b1d6bdf783c6fdd808 diff --git a/rtclib b/rtclib index 6beb431..ea13ab4 160000 --- a/rtclib +++ b/rtclib @@ -1 +1 @@ -Subproject commit 6beb431776ea4f21c76ccefcbe30c2daadfa2687 +Subproject commit ea13ab489529ef0ebc72c15505c6be52ed484269 diff --git a/server/src/client/SpeakingClient.cpp b/server/src/client/SpeakingClient.cpp index 46765bd..35720d1 100644 --- a/server/src/client/SpeakingClient.cpp +++ b/server/src/client/SpeakingClient.cpp @@ -23,9 +23,6 @@ using namespace ts::protocol; //#define PKT_LOG_VOICE //#define PKT_LOG_WHISPER -constexpr static auto kMaxWhisperClientNameLength{30}; -constexpr static auto kWhisperClientUniqueIdLength{28}; /* base64 encoded SHA1 hash */ -constexpr static auto kWhisperMaxHeaderLength{2 + 2 + 1 + 2 + kWhisperClientUniqueIdLength + 1 + kMaxWhisperClientNameLength}; SpeakingClient::SpeakingClient(sql::SqlManager *a, const std::shared_ptr &b) : ConnectedClient(a, b), whisper_handler_{this} { speak_begin = std::chrono::system_clock::now(); @@ -78,11 +75,6 @@ inline bool update_whisper_error(std::chrono::system_clock::time_point& last) { return false; } -#define TEST_PARM(type) \ -do {\ - if(!cmd[0][key].castable())\ - return {findError("parameter_invalid"), "Invalid type for " + key};\ -} while(false) auto regex_wildcard = std::regex(".*");