diff --git a/file/local_server/LocalFileProvider.h b/file/local_server/LocalFileProvider.h index 617c666..22af126 100644 --- a/file/local_server/LocalFileProvider.h +++ b/file/local_server/LocalFileProvider.h @@ -178,11 +178,10 @@ namespace ts::server::file { size_t bytes{0}; bool buffering_stopped{false}; + bool write_disconnected{false}; Buffer* buffer_head{nullptr}; Buffer** buffer_tail{&buffer_head}; - - bool flushed{false}; } network_buffer{}; struct { @@ -190,11 +189,10 @@ namespace ts::server::file { size_t bytes{0}; bool buffering_stopped{false}; + bool write_disconnected{false}; Buffer* buffer_head{nullptr}; Buffer** buffer_tail{&buffer_head}; - - bool flushed{false}; } disk_buffer{}; struct { @@ -258,6 +256,10 @@ namespace ts::server::file { bool enqueue_network_buffer_bytes(const void* /* buffer */, size_t /* length */); bool enqueue_disk_buffer_bytes(const void* /* buffer */, size_t /* length */); + /* these function clear the buffers and set the write disconnected flags to true so no new buffers will be enqueued */ + void flush_network_buffer(); + void flush_disk_buffer(); + [[nodiscard]] inline std::string log_prefix() const { return "[" + net::to_string(this->networking.address) + "]"; } }; @@ -462,7 +464,7 @@ namespace ts::server::file { void shutdown_disk_io(); void shutdown_client_worker(); - void disconnect_client(const std::shared_ptr& /* client */, std::unique_lock& /* state lock */, bool /* flush */); + void disconnect_client(const std::shared_ptr& /* client */, std::unique_lock& /* state lock */, bool /* flush network */); [[nodiscard]] NetworkInitializeResult initialize_networking(const std::shared_ptr& /* client */, int /* file descriptor */); /* might block 'till all IO operations have been succeeded */ @@ -477,6 +479,8 @@ namespace ts::server::file { void enqueue_disk_io(const std::shared_ptr& /* client */); void execute_disk_io(const std::shared_ptr& /* client */); + void test_disconnecting_state(const std::shared_ptr& /* client */); + [[nodiscard]] TransferUploadRawResult handle_transfer_upload_raw(const std::shared_ptr& /* client */, const char * /* buffer */, size_t /* length */, size_t* /* bytes written */); [[nodiscard]] TransferUploadHTTPResult handle_transfer_upload_http(const std::shared_ptr& /* client */, const char * /* buffer */, size_t /* length */); diff --git a/file/local_server/LocalFileTransfer.cpp b/file/local_server/LocalFileTransfer.cpp index cd6df12..0a19d1e 100644 --- a/file/local_server/LocalFileTransfer.cpp +++ b/file/local_server/LocalFileTransfer.cpp @@ -28,22 +28,11 @@ void transfer::free_buffer(Buffer* buffer) { } FileClient::~FileClient() { - { - auto head = this->network_buffer.buffer_head; - while (head) { - auto next = head->next; - free_buffer(head); - head = next; - } - } - { - auto head = this->disk_buffer.buffer_head; - while (head) { - auto next = head->next; - free_buffer(head); - head = next; - } - } + this->flush_network_buffer(); + this->flush_disk_buffer(); + + assert(!this->disk_buffer.buffer_head); + assert(!this->network_buffer.buffer_head); assert(!this->file.file_descriptor); assert(!this->file.currently_processing); diff --git a/file/local_server/LocalFileTransferClientWorker.cpp b/file/local_server/LocalFileTransferClientWorker.cpp index 09ddcaa..774c324 100644 --- a/file/local_server/LocalFileTransferClientWorker.cpp +++ b/file/local_server/LocalFileTransferClientWorker.cpp @@ -67,6 +67,26 @@ void LocalFileTransfer::disconnect_client(const std::shared_ptr &cli #undef del_ev_noblock } +void LocalFileTransfer::test_disconnecting_state(const std::shared_ptr &client) { + if(client->state != FileClient::STATE_DISCONNECTING) + return; + + { + std::lock_guard db_lock{client->disk_buffer.mutex}; + std::lock_guard nb_lock{client->network_buffer.mutex}; + + if(client->disk_buffer.bytes > 0) + return; + + if(client->network_buffer.bytes > 0) + return; + } + + debugMessage(LOG_FT, "{} Disk and network buffers are flushed.", client->log_prefix()); + std::unique_lock s_lock{client->state_mutex}; + this->disconnect_client(client, s_lock, false); +} + void LocalFileTransfer::dispatch_loop_client_worker(void *ptr_transfer) { auto provider = reinterpret_cast(ptr_transfer); diff --git a/file/local_server/LocalFileTransferDisk.cpp b/file/local_server/LocalFileTransferDisk.cpp index 4c52bca..f8539da 100644 --- a/file/local_server/LocalFileTransferDisk.cpp +++ b/file/local_server/LocalFileTransferDisk.cpp @@ -107,13 +107,37 @@ bool FileClient::enqueue_disk_buffer_bytes(const void *snd_buffer, size_t size) size_t buffer_size; { std::lock_guard block{this->disk_buffer.mutex}; + if(this->disk_buffer.write_disconnected) + goto write_disconnected; + *this->disk_buffer.buffer_tail = tbuffer; this->disk_buffer.buffer_tail = &tbuffer->next; - buffer_size = (this->disk_buffer.bytes += size); } return buffer_size > TRANSFER_MAX_CACHED_BYTES; + + write_disconnected: + free_buffer(tbuffer); + return false; +} + +void FileClient::flush_disk_buffer() { + Buffer* current_head; + { + std::lock_guard block{this->disk_buffer.mutex}; + + this->disk_buffer.write_disconnected = true; + this->disk_buffer.bytes = 0; + current_head = std::exchange(this->disk_buffer.buffer_head, nullptr); + this->disk_buffer.buffer_tail = &this->disk_buffer.buffer_head; + } + + while(current_head) { + auto next = current_head->next; + free_buffer(current_head); + current_head = next; + } } FileInitializeResult LocalFileTransfer::initialize_file_io(const std::shared_ptr &transfer) { @@ -226,8 +250,6 @@ FileInitializeResult LocalFileTransfer::initialize_file_io(const std::shared_ptr } } } else if(transfer->transfer->direction == Transfer::DIRECTION_DOWNLOAD) { - transfer->disk_buffer.flushed = true; /* we're not using this buffer, so it will be flushed all the times */ - auto file_size = lseek(file_data.file_descriptor, 0, SEEK_END); if(file_size != transfer->transfer->expected_file_size) { logWarning(LOG_FT, "{} Expected target file to be of size {}, but file is actually of size {}", transfer->log_prefix(), transfer->transfer->expected_file_size, file_size); @@ -387,6 +409,8 @@ void LocalFileTransfer::execute_disk_io(const std::shared_ptr &clien this->invoke_aborted_callback(client, { TransferError::UNEXPECTED_DISK_EOF, strerror(errno) }); + + client->flush_disk_buffer(); { std::unique_lock slock{client->state_mutex}; client->handle->disconnect_client(client, slock, true); @@ -404,6 +428,8 @@ void LocalFileTransfer::execute_disk_io(const std::shared_ptr &clien client->log_prefix(), offset_written, client->transfer->expected_file_size, aoffset); this->invoke_aborted_callback(client, { TransferError::DISK_IO_ERROR, strerror(errno) }); + + client->flush_disk_buffer(); { std::unique_lock slock{client->state_mutex}; client->handle->disconnect_client(client, slock, true); @@ -442,22 +468,7 @@ void LocalFileTransfer::execute_disk_io(const std::shared_ptr &clien if(buffer_left_size > 0) { this->enqueue_disk_io(client); } else if(client->state == FileClient::STATE_DISCONNECTING) { - { - std::lock_guard nb_lock{client->network_buffer.mutex}; - { - std::lock_guard db_lock{client->disk_buffer.mutex}; - if(std::exchange(client->disk_buffer.flushed, true)) - return; - } - if(!client->network_buffer.flushed) { - logTrace(LOG_FT, "{} Disk IO has been flushed, awaiting network buffer flush.", client->log_prefix()); - return; - } - logTrace(LOG_FT, "{} Disk IO and network buffer have been flushed.", client->log_prefix()); - } - - std::unique_lock slock{client->state_mutex}; - client->handle->disconnect_client(client->shared_from_this(), slock, false); + this->test_disconnecting_state(client); } if(client->state == FileClient::STATE_TRANSFERRING && buffer_left_size < TRANSFER_MAX_CACHED_BYTES / 2) { @@ -466,7 +477,10 @@ void LocalFileTransfer::execute_disk_io(const std::shared_ptr &clien client->add_network_read_event(false); } } else if(client->transfer->direction == Transfer::DIRECTION_DOWNLOAD) { - if(client->state == FileClient::STATE_DISCONNECTING) return; + if(client->state == FileClient::STATE_DISCONNECTING) { + client->flush_disk_buffer(); /* just in case */ + return; + } while(true) { constexpr auto buffer_capacity{4096}; @@ -487,11 +501,6 @@ void LocalFileTransfer::execute_disk_io(const std::shared_ptr &clien this->invoke_aborted_callback(client, { TransferError::UNEXPECTED_DISK_EOF, "" }); } - - { - std::unique_lock slock{client->state_mutex}; - client->handle->disconnect_client(client, slock, true); - } } else { if(errno == EAGAIN) { this->enqueue_disk_io(client); @@ -501,11 +510,10 @@ void LocalFileTransfer::execute_disk_io(const std::shared_ptr &clien logWarning(LOG_FT, "{} Failed to read from file {} ({}/{}). Aborting transfer.", client->log_prefix(), client->transfer->absolute_file_path, errno, strerror(errno)); this->invoke_aborted_callback(client, { TransferError::DISK_IO_ERROR, strerror(errno) }); - { - std::unique_lock slock{client->state_mutex}; - client->handle->disconnect_client(client, slock, true); - } } + + std::unique_lock slock{client->state_mutex}; + client->handle->disconnect_client(client, slock, true); return; } else { auto buffer_full = client->send_file_bytes(buffer, read); diff --git a/file/local_server/LocalFileTransferNetwork.cpp b/file/local_server/LocalFileTransferNetwork.cpp index cd13092..16d514f 100644 --- a/file/local_server/LocalFileTransferNetwork.cpp +++ b/file/local_server/LocalFileTransferNetwork.cpp @@ -12,6 +12,7 @@ #include "./LocalFileProvider.h" #include "./duration_utils.h" #include "HTTPUtils.h" +#include "LocalFileProvider.h" #if defined(TCP_CORK) && !defined(TCP_NOPUSH) #define TCP_NOPUSH TCP_CORK @@ -132,6 +133,8 @@ bool FileClient::enqueue_network_buffer_bytes(const void *snd_buffer, size_t siz size_t buffer_size; { std::lock_guard block{this->network_buffer.mutex}; + if(this->network_buffer.write_disconnected) + goto write_disconnected; *this->network_buffer.buffer_tail = tbuffer; this->network_buffer.buffer_tail = &tbuffer->next; @@ -140,6 +143,28 @@ bool FileClient::enqueue_network_buffer_bytes(const void *snd_buffer, size_t siz this->add_network_write_event(false); return buffer_size > TRANSFER_MAX_CACHED_BYTES; + + write_disconnected: + free_buffer(tbuffer); + return false; +} + +void FileClient::flush_network_buffer() { + Buffer* current_head; + { + std::lock_guard block{this->network_buffer.mutex}; + + this->network_buffer.write_disconnected = true; + this->network_buffer.bytes = 0; + current_head = std::exchange(this->network_buffer.buffer_head, nullptr); + this->network_buffer.buffer_tail = &this->network_buffer.buffer_head; + } + + while(current_head) { + auto next = current_head->next; + free_buffer(current_head); + current_head = next; + } } NetworkingStartResult LocalFileTransfer::start_networking() { @@ -347,16 +372,18 @@ bool LocalFileTransfer::initialize_client_ssl(const std::shared_ptr if(!ssl_option_supplier || !(options = ssl_option_supplier())) { logError(0, "{} Failed to initialize client SSL pipe because we've no SSL options.", client->log_prefix()); + client->flush_network_buffer(); /* invalidate all network write operations */ std::unique_lock slock{client->state_mutex}; - client->handle->disconnect_client(client, slock, false); + client->handle->disconnect_client(client, slock, true); return false; } if(!ssl_pipe.initialize(options, error)) { logWarning(0, "{} Failed to initialize client SSL pipe ({}). Disconnecting client.", client->log_prefix(), error); + client->flush_network_buffer(); /* invalidate all network write operations */ std::unique_lock slock{client->state_mutex}; - client->handle->disconnect_client(client, slock, false); + client->handle->disconnect_client(client, slock, true); return false; } @@ -470,7 +497,7 @@ void LocalFileTransfer::callback_transfer_network_read(int fd, short events, voi if(read == 0) { std::unique_lock slock{transfer->state_mutex}; auto original_state = transfer->state; - transfer->handle->disconnect_client(transfer->shared_from_this(), slock, false); + transfer->handle->disconnect_client(transfer->shared_from_this(), slock, true); slock.unlock(); switch(original_state) { @@ -505,7 +532,7 @@ void LocalFileTransfer::callback_transfer_network_read(int fd, short events, voi std::unique_lock slock{transfer->state_mutex}; auto original_state = transfer->state; - transfer->handle->disconnect_client(transfer->shared_from_this(), slock, false); + transfer->handle->disconnect_client(transfer->shared_from_this(), slock, true); slock.unlock(); switch(original_state) { @@ -580,44 +607,20 @@ void LocalFileTransfer::callback_transfer_network_write(int fd, short events, vo if((unsigned) events & (unsigned) EV_TIMEOUT) { if(transfer->state == FileClient::STATE_DISCONNECTING) { { - std::lock_guard nb_lock{transfer->network_buffer.mutex}; - std::lock_guard db_lock{transfer->disk_buffer.mutex}; - if(!std::exchange(transfer->network_buffer.flushed, true)) { + std::unique_lock nb_lock{transfer->network_buffer.mutex}; + if(transfer->network_buffer.bytes > 0) { + nb_lock.unlock(); + transfer->flush_network_buffer(); debugMessage(LOG_FT, "{} Failed to flush networking buffer in given timeout. Marking it as flushed.", transfer->log_prefix()); } - if(!transfer->disk_buffer.flushed) { - logTrace(LOG_FT, "{} Disk IO hasn't been fully flushed yet, awaiting disk IO flush.", transfer->log_prefix()); - return; - } - logTrace(LOG_FT, "{} Disk IO and network buffer have been flushed.", transfer->log_prefix()); } - { - std::unique_lock slock{transfer->state_mutex}; - transfer->handle->disconnect_client(transfer->shared_from_this(), slock, false); - } + transfer->handle->test_disconnecting_state(transfer->shared_from_this()); return; } } if((unsigned) events & (unsigned) EV_WRITE) { - if(transfer->state != FileClient::STATE_DISCONNECTING && transfer->state != FileClient::STATE_TRANSFERRING) { - if(!(transfer->state == FileClient::STATE_AWAITING_KEY && transfer->networking.protocol == FileClient::PROTOCOL_HTTPS)) { - debugMessage(LOG_FT, "{} Tried to write data to send only stream. Dropping buffers.", transfer->log_prefix()); - - std::unique_lock block{transfer->network_buffer.mutex}; - auto head = std::exchange(transfer->network_buffer.buffer_head, nullptr); - transfer->network_buffer.buffer_tail = &transfer->network_buffer.buffer_head; - - while(head) { - auto next = head->next; - free_buffer(head); - head = next; - } - return; - } - } - Buffer* buffer{nullptr}; size_t buffer_left_size{0}; @@ -637,37 +640,30 @@ void LocalFileTransfer::callback_transfer_network_write(int fd, short events, vo assert(buffer->offset < buffer->length); auto written = ::send(fd, buffer->data + buffer->offset, std::min(buffer->length - buffer->offset, max_write_bytes), MSG_DONTWAIT | MSG_NOSIGNAL); if(written <= 0) { - if(transfer->state != FileClient::STATE_TRANSFERRING) { - std::unique_lock slock{transfer->state_mutex}; - transfer->handle->disconnect_client(transfer->shared_from_this(), slock, false); - return; + if(errno == EAGAIN) { + transfer->add_network_write_event(false); + break; } - if(written == 0) { - /* EOF, how the hell is this event possible?! (Read should already catch it) */ - logError(LOG_FT, "{} Client disconnected unexpectedly on write. Send {} bytes out of {}.", - transfer->log_prefix(), transfer->statistics.file_transferred.total_bytes, transfer->transfer ? transfer->transfer->expected_file_size - transfer->transfer->file_offset : -1); + if(transfer->state == FileClient::STATE_TRANSFERRING) { + assert(transfer->transfer); + if(written == 0) { + /* EOF, how the hell is this event possible?! (Read should already catch it) */ + logError(LOG_FT, "{} Client disconnected unexpectedly on write. Send {} bytes out of {}.", + transfer->log_prefix(), transfer->statistics.file_transferred.total_bytes, transfer->transfer ? transfer->transfer->expected_file_size - transfer->transfer->file_offset : -1); - transfer->handle->invoke_aborted_callback(transfer->shared_from_this(), { TransferError::UNEXPECTED_CLIENT_DISCONNECT, "" }); - { - std::unique_lock slock{transfer->state_mutex}; - transfer->handle->disconnect_client(transfer->shared_from_this(), slock, false); - } - } else { - if(errno == EAGAIN) { - transfer->add_network_write_event(false); - break; - } + transfer->handle->invoke_aborted_callback(transfer->shared_from_this(), { TransferError::UNEXPECTED_CLIENT_DISCONNECT, "" }); + } else { + logError(LOG_FT, "{} Received network write error. Send {} bytes out of {}. Closing transfer.", + transfer->log_prefix(), transfer->statistics.file_transferred.total_bytes, transfer->transfer ? transfer->transfer->expected_file_size - transfer->transfer->file_offset : -1); - logError(LOG_FT, "{} Received network write error. Send {} bytes out of {}. Closing transfer.", - transfer->log_prefix(), transfer->statistics.file_transferred.total_bytes, transfer->transfer ? transfer->transfer->expected_file_size - transfer->transfer->file_offset : -1); - - transfer->handle->invoke_aborted_callback(transfer->shared_from_this(), { TransferError::NETWORK_IO_ERROR, strerror(errno) }); - { - std::unique_lock slock{transfer->state_mutex}; - transfer->handle->disconnect_client(transfer->shared_from_this(), slock, false); + transfer->handle->invoke_aborted_callback(transfer->shared_from_this(), { TransferError::NETWORK_IO_ERROR, strerror(errno) }); } } + + transfer->flush_network_buffer(); /* invalidate all network write operations */ + std::unique_lock slock{transfer->state_mutex}; + transfer->handle->disconnect_client(transfer->shared_from_this(), slock, true); return; } else { buffer->offset += written; @@ -699,22 +695,10 @@ void LocalFileTransfer::callback_transfer_network_write(int fd, short events, vo } } - if(buffer_left_size > 0) + if(buffer_left_size > 0) { transfer->add_network_write_event(false); - else if(transfer->state == FileClient::STATE_DISCONNECTING) { - { - std::lock_guard nb_lock{transfer->network_buffer.mutex}; - std::lock_guard db_lock{transfer->disk_buffer.mutex}; - if(std::exchange(transfer->network_buffer.flushed, true)) - return; - - if(!transfer->disk_buffer.flushed) { - debugMessage(LOG_FT, "{} Disk IO hasn't been fully flushed yet, awaiting disk IO flush.", transfer->log_prefix()); - return; - } - - debugMessage(LOG_FT, "{} Disk IO and network buffer have been flushed.", transfer->log_prefix()); - } + } else if(transfer->state == FileClient::STATE_DISCONNECTING) { + transfer->handle->test_disconnecting_state(transfer->shared_from_this()); if(!std::exchange(transfer->finished_signal_send, true)) { if(transfer->transfer && transfer->statistics.file_transferred.total_bytes + transfer->transfer->file_offset == transfer->transfer->expected_file_size) { @@ -724,7 +708,9 @@ void LocalFileTransfer::callback_transfer_network_write(int fd, short events, vo callback(transfer->transfer); } } + std::unique_lock slock{transfer->state_mutex}; + /* no need to flush here, since we read only from the disk and all bytes which sould be send have been written already */ transfer->handle->disconnect_client(transfer->shared_from_this(), slock, false); return; } @@ -753,7 +739,7 @@ size_t LocalFileTransfer::handle_transfer_read_raw(const std::shared_ptrlog_prefix()); std::unique_lock slock{client->state_mutex}; - client->handle->disconnect_client(client->shared_from_this(), slock, false); + client->handle->disconnect_client(client->shared_from_this(), slock, true); return (size_t) -1; } @@ -761,7 +747,7 @@ size_t LocalFileTransfer::handle_transfer_read_raw(const std::shared_ptrlog_prefix()); std::unique_lock slock{client->state_mutex}; - client->handle->disconnect_client(client->shared_from_this(), slock, false); + client->handle->disconnect_client(client->shared_from_this(), slock, true); return (size_t) -1; } @@ -831,7 +817,7 @@ size_t LocalFileTransfer::handle_transfer_read_raw(const std::shared_ptrstate_mutex}; - client->handle->disconnect_client(client->shared_from_this(), slock, false); + client->handle->disconnect_client(client->shared_from_this(), slock, true); return (size_t) -1; } @@ -871,9 +857,9 @@ size_t LocalFileTransfer::handle_transfer_read(const std::shared_ptr if(!http::parse_request(std::string{header_view.data(), header_end}, request)) { logError(LOG_FT, "{} Failed to parse HTTP request. Disconnecting client.", client->log_prefix()); - std::unique_lock slock{client->state_mutex}; - client->handle->disconnect_client(client->shared_from_this(), slock, true); - return (size_t) -1; + response.code = http::code::code(400, "Bad Request"); + response.setHeader("x-error-message", { "failed to parse http request" }); + goto send_response_exit; } if(auto header = request.findHeader("Sec-Fetch-Mode"); request.method == "OPTIONS") {