From e9388e5e5e03002ca29a940ee6b4caf203e81a1e Mon Sep 17 00:00:00 2001 From: WolverinDEV Date: Sat, 1 Aug 2020 11:34:24 +0200 Subject: [PATCH] Some minor transfer updates --- file/local_server/LocalFileProvider.h | 2 +- file/local_server/LocalFileTransfer.cpp | 2 +- .../LocalFileTransferClientWorker.cpp | 24 +++++----- file/local_server/LocalFileTransferDisk.cpp | 4 +- .../local_server/LocalFileTransferNetwork.cpp | 45 +++++++++++++++---- server/src/DatabaseHelper.cpp | 2 +- 6 files changed, 52 insertions(+), 27 deletions(-) diff --git a/file/local_server/LocalFileProvider.h b/file/local_server/LocalFileProvider.h index 0b08de5..bafa002 100644 --- a/file/local_server/LocalFileProvider.h +++ b/file/local_server/LocalFileProvider.h @@ -137,7 +137,7 @@ namespace ts::server::file { enum { STATE_AWAITING_KEY, /* includes SSL/HTTP init */ STATE_TRANSFERRING, - STATE_DISCONNECTING, + STATE_FLUSHING, STATE_DISCONNECTED } state{STATE_AWAITING_KEY}; diff --git a/file/local_server/LocalFileTransfer.cpp b/file/local_server/LocalFileTransfer.cpp index 505a1da..2fc5b47 100644 --- a/file/local_server/LocalFileTransfer.cpp +++ b/file/local_server/LocalFileTransfer.cpp @@ -123,7 +123,7 @@ std::shared_ptr>> L std::unique_lock tlock{this->transfers_mutex}; { auto transfers = std::count_if(this->transfers_.begin(), this->transfers_.end(), [&](const std::shared_ptr& client) { - return client->transfer && client->transfer->client_unique_id == info.client_unique_id && client->state < FileClient::STATE_DISCONNECTING; + return client->transfer && client->transfer->client_unique_id == info.client_unique_id && client->state < FileClient::STATE_FLUSHING; }); transfers += std::count_if(this->pending_transfers.begin(), this->pending_transfers.end(), [&](const std::shared_ptr& transfer) { return transfer->client_unique_id == info.client_unique_id; diff --git a/file/local_server/LocalFileTransferClientWorker.cpp b/file/local_server/LocalFileTransferClientWorker.cpp index ac1cd08..ea89ebb 100644 --- a/file/local_server/LocalFileTransferClientWorker.cpp +++ b/file/local_server/LocalFileTransferClientWorker.cpp @@ -36,16 +36,16 @@ void LocalFileTransfer::shutdown_client_worker() { void LocalFileTransfer::disconnect_client(const std::shared_ptr &client, std::unique_lock& state_lock, bool flush) { assert(state_lock.owns_lock()); - if(client->state == FileClient::STATE_DISCONNECTED || (client->state == FileClient::STATE_DISCONNECTING && flush)) { + if(client->state == FileClient::STATE_DISCONNECTED || (client->state == FileClient::STATE_FLUSHING && flush)) { return; /* shall NOT happen */ } #define del_ev_noblock(event) if(event) event_del_noblock(event) - client->state = flush ? FileClient::STATE_DISCONNECTING : FileClient::STATE_DISCONNECTED; + client->state = flush ? FileClient::STATE_FLUSHING : FileClient::STATE_DISCONNECTED; client->timings.disconnecting = std::chrono::system_clock::now(); if(flush) { - const auto network_flush_time = client->networking.client_throttle.expected_writing_time(client->network_buffer.bytes) + std::chrono::seconds{10}; + const auto network_flush_time = client->networking.throttle.expected_writing_time(client->network_buffer.bytes) + std::chrono::seconds{10}; del_ev_noblock(client->networking.event_read); @@ -66,19 +66,15 @@ void LocalFileTransfer::disconnect_client(const std::shared_ptr &cli } void LocalFileTransfer::test_disconnecting_state(const std::shared_ptr &client) { - if(client->state != FileClient::STATE_DISCONNECTING) + if(client->state != FileClient::STATE_FLUSHING) return; if(!client->buffers_flushed()) return; - if(client->networking.protocol != FileClient::PROTOCOL_TS_V1) { - debugMessage(LOG_FT, "{} Disk and network buffers are flushed. Closing connection.", client->log_prefix()); - std::unique_lock s_lock{client->state_mutex}; - this->disconnect_client(client, s_lock, false); - } else { - debugMessage(LOG_FT, "{} Disk and network buffers are flushed. Awaiting client disconnect.", client->log_prefix()); - } + debugMessage(LOG_FT, "{} Disk and network buffers are flushed. Closing connection.", client->log_prefix()); + std::unique_lock s_lock{client->state_mutex}; + this->disconnect_client(client, s_lock, false); } void LocalFileTransfer::dispatch_loop_client_worker(void *ptr_transfer) { @@ -100,7 +96,7 @@ void LocalFileTransfer::dispatch_loop_client_worker(void *ptr_transfer) { switch(transfer->state) { case FileClient::STATE_TRANSFERRING: break; - case FileClient::STATE_DISCONNECTING: + case FileClient::STATE_FLUSHING: if(!transfer->transfer) continue; @@ -163,7 +159,7 @@ void LocalFileTransfer::dispatch_loop_client_worker(void *ptr_transfer) { } else if(t->transfer->direction == Transfer::DIRECTION_DOWNLOAD) { return t->timings.last_write + std::chrono::seconds{5} < now; } - } else if(t->state == FileClient::STATE_DISCONNECTING) { + } else if(t->state == FileClient::STATE_FLUSHING) { if(t->networking.disconnect_timeout.time_since_epoch().count() > 0) return t->networking.disconnect_timeout + std::chrono::seconds{5} < now; return t->timings.disconnecting + std::chrono::seconds{30} < now; @@ -184,7 +180,7 @@ void LocalFileTransfer::dispatch_loop_client_worker(void *ptr_transfer) { logMessage(LOG_FT, "{} Networking timeout. Dropping client", client->log_prefix()); provider->invoke_aborted_callback(client, { TransferError::TRANSFER_TIMEOUT, "" }); break; - case FileClient::STATE_DISCONNECTING: + case FileClient::STATE_FLUSHING: if(!client->buffers_flushed()) logMessage(LOG_FT, "{} Failed to flush connection. Dropping client", client->log_prefix()); else diff --git a/file/local_server/LocalFileTransferDisk.cpp b/file/local_server/LocalFileTransferDisk.cpp index 5c6e255..6e81652 100644 --- a/file/local_server/LocalFileTransferDisk.cpp +++ b/file/local_server/LocalFileTransferDisk.cpp @@ -483,7 +483,7 @@ void LocalFileTransfer::execute_disk_io(const std::shared_ptr &clien if(buffer_left_size > 0) { this->enqueue_disk_io(client); - } else if(client->state == FileClient::STATE_DISCONNECTING) { + } else if(client->state == FileClient::STATE_FLUSHING) { this->test_disconnecting_state(client); } @@ -493,7 +493,7 @@ void LocalFileTransfer::execute_disk_io(const std::shared_ptr &clien client->add_network_read_event(false); } } else if(client->transfer->direction == Transfer::DIRECTION_DOWNLOAD) { - if(client->state == FileClient::STATE_DISCONNECTING) { + if(client->state == FileClient::STATE_FLUSHING) { client->flush_disk_buffer(); /* just in case, file download usually don't write to the disk */ return; } diff --git a/file/local_server/LocalFileTransferNetwork.cpp b/file/local_server/LocalFileTransferNetwork.cpp index cd8e8a2..7319d11 100644 --- a/file/local_server/LocalFileTransferNetwork.cpp +++ b/file/local_server/LocalFileTransferNetwork.cpp @@ -63,7 +63,7 @@ void FileClient::add_network_read_event(bool ignore_bandwidth) { std::shared_lock slock{this->state_mutex}; switch (this->state) { - case STATE_DISCONNECTING: + case STATE_FLUSHING: case STATE_DISCONNECTED: return; @@ -91,7 +91,7 @@ void FileClient::add_network_write_event_nolock(bool ignore_bandwidth) { case STATE_DISCONNECTED: return; - case STATE_DISCONNECTING: + case STATE_FLUSHING: /* flush our write buffer */ break; @@ -587,7 +587,7 @@ void LocalFileTransfer::callback_transfer_network_read(int fd, short events, voi transfer->handle->invoke_aborted_callback(transfer->shared_from_this(), { TransferError::UNEXPECTED_CLIENT_DISCONNECT, "" }); break; } - case FileClient::STATE_DISCONNECTING: + case FileClient::STATE_FLUSHING: logMessage(LOG_FT, "{} Remote client closed connection. Finalizing disconnect.", transfer->log_prefix()); break; @@ -625,7 +625,7 @@ void LocalFileTransfer::callback_transfer_network_read(int fd, short events, voi transfer->handle->invoke_aborted_callback(transfer->shared_from_this(), { TransferError::NETWORK_IO_ERROR, strerror(errno) }); break; - case FileClient::STATE_DISCONNECTING: + case FileClient::STATE_FLUSHING: case FileClient::STATE_DISCONNECTED: default: break; @@ -654,7 +654,7 @@ void LocalFileTransfer::callback_transfer_network_read(int fd, short events, voi } } - if(transfer->state == FileClient::STATE_DISCONNECTING || transfer->state == FileClient::STATE_DISCONNECTED) + if(transfer->state == FileClient::STATE_FLUSHING || transfer->state == FileClient::STATE_DISCONNECTED) break; if(bytes_buffered > TRANSFER_MAX_CACHED_BYTES) { @@ -676,7 +676,7 @@ void LocalFileTransfer::callback_transfer_network_write(int fd, short events, vo auto transfer = reinterpret_cast(ptr_transfer); if((unsigned) events & (unsigned) EV_TIMEOUT) { - if(transfer->state == FileClient::STATE_DISCONNECTING) { + if(transfer->state == FileClient::STATE_FLUSHING) { { std::unique_lock nb_lock{transfer->network_buffer.mutex}; if(transfer->network_buffer.bytes > 0) { @@ -686,6 +686,13 @@ void LocalFileTransfer::callback_transfer_network_write(int fd, short events, vo } } + + if(!std::exchange(transfer->finished_signal_send, true)) { + if(transfer->transfer) { + transfer->handle->invoke_aborted_callback(transfer->shared_from_this(), { TransferError::NETWORK_IO_ERROR, "failed to flush outgoing buffer" }); + } + } + transfer->handle->test_disconnecting_state(transfer->shared_from_this()); return; } @@ -701,6 +708,7 @@ void LocalFileTransfer::callback_transfer_network_write(int fd, short events, vo buffer = transfer->network_buffer.buffer_head; buffer_left_size = transfer->network_buffer.bytes; } + if(!buffer) { break; } @@ -718,6 +726,7 @@ void LocalFileTransfer::callback_transfer_network_write(int fd, short events, vo if(transfer->state == FileClient::STATE_TRANSFERRING) { assert(transfer->transfer); + if(written == 0) { /* EOF, how the hell is this event possible?! (Read should already catch it) */ logError(LOG_FT, "{} Client disconnected unexpectedly on write. Send {} bytes out of {}.", @@ -728,10 +737,28 @@ void LocalFileTransfer::callback_transfer_network_write(int fd, short events, vo logError(LOG_FT, "{} Received network write error. Send {} bytes out of {}. Closing transfer.", transfer->log_prefix(), transfer->statistics.file_transferred.total_bytes, transfer->transfer ? transfer->transfer->expected_file_size - transfer->transfer->file_offset : -1); + transfer->handle->invoke_aborted_callback(transfer->shared_from_this(), { TransferError::NETWORK_IO_ERROR, strerror(errno) }); + } + } else if(transfer->state == FileClient::STATE_FLUSHING && transfer->transfer) { + { + std::lock_guard block{transfer->network_buffer.mutex}; + if(transfer->network_buffer.bytes == 0) + goto disconnect_client; + } + + transfer->flush_network_buffer(); + if(written == 0) { + logError(LOG_FT, "{} Received unexpected client disconnect while flushing the network buffer. Transfer failed.", transfer->log_prefix()); + transfer->handle->invoke_aborted_callback(transfer->shared_from_this(), { TransferError::UNEXPECTED_CLIENT_DISCONNECT, "" }); + } else { + logError(LOG_FT, "{} Received network write error while flushing the network buffer. Closing transfer.", + transfer->log_prefix()); + transfer->handle->invoke_aborted_callback(transfer->shared_from_this(), { TransferError::NETWORK_IO_ERROR, strerror(errno) }); } } + disconnect_client: /* invalidate all network write operations, but still flush the disk IO buffer */ if(size_t bytes_dropped{transfer->flush_network_buffer()}; bytes_dropped > 0) { if(transfer->state != FileClient::STATE_TRANSFERRING) { @@ -739,8 +766,10 @@ void LocalFileTransfer::callback_transfer_network_write(int fd, short events, vo transfer->log_prefix(), bytes_dropped, errno, strerror(errno)); } } + std::unique_lock slock{transfer->state_mutex}; - transfer->handle->disconnect_client(transfer->shared_from_this(), slock, true); + /* no need to flush anything here, write will only be invoked on a client download */ + transfer->handle->disconnect_client(transfer->shared_from_this(), slock, false); return; } else { buffer->offset += written; @@ -774,7 +803,7 @@ void LocalFileTransfer::callback_transfer_network_write(int fd, short events, vo if(buffer_left_size > 0) { transfer->add_network_write_event(false); - } else if(transfer->state == FileClient::STATE_DISCONNECTING) { + } else if(transfer->state == FileClient::STATE_FLUSHING) { transfer->handle->test_disconnecting_state(transfer->shared_from_this()); if(!std::exchange(transfer->finished_signal_send, true)) { diff --git a/server/src/DatabaseHelper.cpp b/server/src/DatabaseHelper.cpp index 395cee6..acdd340 100644 --- a/server/src/DatabaseHelper.cpp +++ b/server/src/DatabaseHelper.cpp @@ -1022,7 +1022,7 @@ std::shared_ptr DatabaseHelper::loadClientProperties(const std::shar return; } - debugMessage(server ? server->getServerId() : 0, "[Property] Changing client property '{}' for {} (New value: {], Column: {})", + debugMessage(server ? server->getServerId() : 0, "[Property] Changing client property '{}' for {} (New value: {}, Column: {})", prop.type().name, cldbid, prop.value(),