Some minor transfer updates
This commit is contained in:
		
							parent
							
								
									669b3ae349
								
							
						
					
					
						commit
						e9388e5e5e
					
				@ -137,7 +137,7 @@ namespace ts::server::file {
 | 
			
		||||
            enum {
 | 
			
		||||
                STATE_AWAITING_KEY, /* includes SSL/HTTP init */
 | 
			
		||||
                STATE_TRANSFERRING,
 | 
			
		||||
                STATE_DISCONNECTING,
 | 
			
		||||
                STATE_FLUSHING,
 | 
			
		||||
                STATE_DISCONNECTED
 | 
			
		||||
            } state{STATE_AWAITING_KEY};
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -123,7 +123,7 @@ std::shared_ptr<ExecuteResponse<TransferInitError, std::shared_ptr<Transfer>>> L
 | 
			
		||||
        std::unique_lock tlock{this->transfers_mutex};
 | 
			
		||||
        {
 | 
			
		||||
            auto transfers = std::count_if(this->transfers_.begin(), this->transfers_.end(), [&](const std::shared_ptr<FileClient>& client) {
 | 
			
		||||
                return client->transfer && client->transfer->client_unique_id == info.client_unique_id && client->state < FileClient::STATE_DISCONNECTING;
 | 
			
		||||
                return client->transfer && client->transfer->client_unique_id == info.client_unique_id && client->state < FileClient::STATE_FLUSHING;
 | 
			
		||||
            });
 | 
			
		||||
            transfers += std::count_if(this->pending_transfers.begin(), this->pending_transfers.end(), [&](const std::shared_ptr<Transfer>& transfer) {
 | 
			
		||||
                return transfer->client_unique_id == info.client_unique_id;
 | 
			
		||||
 | 
			
		||||
@ -36,16 +36,16 @@ void LocalFileTransfer::shutdown_client_worker() {
 | 
			
		||||
void LocalFileTransfer::disconnect_client(const std::shared_ptr<FileClient> &client, std::unique_lock<std::shared_mutex>& state_lock, bool flush) {
 | 
			
		||||
    assert(state_lock.owns_lock());
 | 
			
		||||
 | 
			
		||||
    if(client->state == FileClient::STATE_DISCONNECTED || (client->state == FileClient::STATE_DISCONNECTING && flush)) {
 | 
			
		||||
    if(client->state == FileClient::STATE_DISCONNECTED || (client->state == FileClient::STATE_FLUSHING && flush)) {
 | 
			
		||||
        return; /* shall NOT happen */
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
#define del_ev_noblock(event) if(event) event_del_noblock(event)
 | 
			
		||||
 | 
			
		||||
    client->state = flush ? FileClient::STATE_DISCONNECTING : FileClient::STATE_DISCONNECTED;
 | 
			
		||||
    client->state = flush ? FileClient::STATE_FLUSHING : FileClient::STATE_DISCONNECTED;
 | 
			
		||||
    client->timings.disconnecting = std::chrono::system_clock::now();
 | 
			
		||||
    if(flush) {
 | 
			
		||||
        const auto network_flush_time = client->networking.client_throttle.expected_writing_time(client->network_buffer.bytes) + std::chrono::seconds{10};
 | 
			
		||||
        const auto network_flush_time = client->networking.throttle.expected_writing_time(client->network_buffer.bytes) + std::chrono::seconds{10};
 | 
			
		||||
 | 
			
		||||
        del_ev_noblock(client->networking.event_read);
 | 
			
		||||
 | 
			
		||||
@ -66,19 +66,15 @@ void LocalFileTransfer::disconnect_client(const std::shared_ptr<FileClient> &cli
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
void LocalFileTransfer::test_disconnecting_state(const std::shared_ptr<FileClient> &client) {
 | 
			
		||||
    if(client->state != FileClient::STATE_DISCONNECTING)
 | 
			
		||||
    if(client->state != FileClient::STATE_FLUSHING)
 | 
			
		||||
        return;
 | 
			
		||||
 | 
			
		||||
    if(!client->buffers_flushed())
 | 
			
		||||
        return;
 | 
			
		||||
 | 
			
		||||
    if(client->networking.protocol != FileClient::PROTOCOL_TS_V1) {
 | 
			
		||||
        debugMessage(LOG_FT, "{} Disk and network buffers are flushed. Closing connection.", client->log_prefix());
 | 
			
		||||
        std::unique_lock s_lock{client->state_mutex};
 | 
			
		||||
        this->disconnect_client(client, s_lock, false);
 | 
			
		||||
    } else {
 | 
			
		||||
        debugMessage(LOG_FT, "{} Disk and network buffers are flushed. Awaiting client disconnect.", client->log_prefix());
 | 
			
		||||
    }
 | 
			
		||||
    debugMessage(LOG_FT, "{} Disk and network buffers are flushed. Closing connection.", client->log_prefix());
 | 
			
		||||
    std::unique_lock s_lock{client->state_mutex};
 | 
			
		||||
    this->disconnect_client(client, s_lock, false);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
void LocalFileTransfer::dispatch_loop_client_worker(void *ptr_transfer) {
 | 
			
		||||
@ -100,7 +96,7 @@ void LocalFileTransfer::dispatch_loop_client_worker(void *ptr_transfer) {
 | 
			
		||||
                switch(transfer->state) {
 | 
			
		||||
                    case FileClient::STATE_TRANSFERRING:
 | 
			
		||||
                        break;
 | 
			
		||||
                    case FileClient::STATE_DISCONNECTING:
 | 
			
		||||
                    case FileClient::STATE_FLUSHING:
 | 
			
		||||
                        if(!transfer->transfer)
 | 
			
		||||
                            continue;
 | 
			
		||||
 | 
			
		||||
@ -163,7 +159,7 @@ void LocalFileTransfer::dispatch_loop_client_worker(void *ptr_transfer) {
 | 
			
		||||
                        } else if(t->transfer->direction == Transfer::DIRECTION_DOWNLOAD) {
 | 
			
		||||
                            return t->timings.last_write + std::chrono::seconds{5} < now;
 | 
			
		||||
                        }
 | 
			
		||||
                    } else if(t->state == FileClient::STATE_DISCONNECTING) {
 | 
			
		||||
                    } else if(t->state == FileClient::STATE_FLUSHING) {
 | 
			
		||||
                        if(t->networking.disconnect_timeout.time_since_epoch().count() > 0)
 | 
			
		||||
                            return t->networking.disconnect_timeout + std::chrono::seconds{5} < now;
 | 
			
		||||
                        return t->timings.disconnecting + std::chrono::seconds{30} < now;
 | 
			
		||||
@ -184,7 +180,7 @@ void LocalFileTransfer::dispatch_loop_client_worker(void *ptr_transfer) {
 | 
			
		||||
                        logMessage(LOG_FT, "{} Networking timeout. Dropping client", client->log_prefix());
 | 
			
		||||
                        provider->invoke_aborted_callback(client, { TransferError::TRANSFER_TIMEOUT, "" });
 | 
			
		||||
                        break;
 | 
			
		||||
                    case FileClient::STATE_DISCONNECTING:
 | 
			
		||||
                    case FileClient::STATE_FLUSHING:
 | 
			
		||||
                        if(!client->buffers_flushed())
 | 
			
		||||
                            logMessage(LOG_FT, "{} Failed to flush connection. Dropping client", client->log_prefix());
 | 
			
		||||
                        else
 | 
			
		||||
 | 
			
		||||
@ -483,7 +483,7 @@ void LocalFileTransfer::execute_disk_io(const std::shared_ptr<FileClient> &clien
 | 
			
		||||
 | 
			
		||||
        if(buffer_left_size > 0) {
 | 
			
		||||
            this->enqueue_disk_io(client);
 | 
			
		||||
        } else if(client->state == FileClient::STATE_DISCONNECTING) {
 | 
			
		||||
        } else if(client->state == FileClient::STATE_FLUSHING) {
 | 
			
		||||
            this->test_disconnecting_state(client);
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
@ -493,7 +493,7 @@ void LocalFileTransfer::execute_disk_io(const std::shared_ptr<FileClient> &clien
 | 
			
		||||
            client->add_network_read_event(false);
 | 
			
		||||
        }
 | 
			
		||||
    } else if(client->transfer->direction == Transfer::DIRECTION_DOWNLOAD) {
 | 
			
		||||
        if(client->state == FileClient::STATE_DISCONNECTING) {
 | 
			
		||||
        if(client->state == FileClient::STATE_FLUSHING) {
 | 
			
		||||
            client->flush_disk_buffer(); /* just in case, file download usually don't write to the disk */
 | 
			
		||||
            return;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
@ -63,7 +63,7 @@ void FileClient::add_network_read_event(bool ignore_bandwidth) {
 | 
			
		||||
    std::shared_lock slock{this->state_mutex};
 | 
			
		||||
 | 
			
		||||
    switch (this->state) {
 | 
			
		||||
        case STATE_DISCONNECTING:
 | 
			
		||||
        case STATE_FLUSHING:
 | 
			
		||||
        case STATE_DISCONNECTED:
 | 
			
		||||
            return;
 | 
			
		||||
 | 
			
		||||
@ -91,7 +91,7 @@ void FileClient::add_network_write_event_nolock(bool ignore_bandwidth) {
 | 
			
		||||
        case STATE_DISCONNECTED:
 | 
			
		||||
            return;
 | 
			
		||||
 | 
			
		||||
        case STATE_DISCONNECTING:
 | 
			
		||||
        case STATE_FLUSHING:
 | 
			
		||||
            /* flush our write buffer */
 | 
			
		||||
            break;
 | 
			
		||||
 | 
			
		||||
@ -587,7 +587,7 @@ void LocalFileTransfer::callback_transfer_network_read(int fd, short events, voi
 | 
			
		||||
                            transfer->handle->invoke_aborted_callback(transfer->shared_from_this(), { TransferError::UNEXPECTED_CLIENT_DISCONNECT, "" });
 | 
			
		||||
                            break;
 | 
			
		||||
                        }
 | 
			
		||||
                        case FileClient::STATE_DISCONNECTING:
 | 
			
		||||
                        case FileClient::STATE_FLUSHING:
 | 
			
		||||
                            logMessage(LOG_FT, "{} Remote client closed connection. Finalizing disconnect.", transfer->log_prefix());
 | 
			
		||||
                            break;
 | 
			
		||||
 | 
			
		||||
@ -625,7 +625,7 @@ void LocalFileTransfer::callback_transfer_network_read(int fd, short events, voi
 | 
			
		||||
 | 
			
		||||
                            transfer->handle->invoke_aborted_callback(transfer->shared_from_this(), { TransferError::NETWORK_IO_ERROR, strerror(errno) });
 | 
			
		||||
                            break;
 | 
			
		||||
                        case FileClient::STATE_DISCONNECTING:
 | 
			
		||||
                        case FileClient::STATE_FLUSHING:
 | 
			
		||||
                        case FileClient::STATE_DISCONNECTED:
 | 
			
		||||
                        default:
 | 
			
		||||
                            break;
 | 
			
		||||
@ -654,7 +654,7 @@ void LocalFileTransfer::callback_transfer_network_read(int fd, short events, voi
 | 
			
		||||
                    }
 | 
			
		||||
                }
 | 
			
		||||
 | 
			
		||||
                if(transfer->state == FileClient::STATE_DISCONNECTING || transfer->state == FileClient::STATE_DISCONNECTED)
 | 
			
		||||
                if(transfer->state == FileClient::STATE_FLUSHING || transfer->state == FileClient::STATE_DISCONNECTED)
 | 
			
		||||
                    break;
 | 
			
		||||
 | 
			
		||||
                if(bytes_buffered > TRANSFER_MAX_CACHED_BYTES) {
 | 
			
		||||
@ -676,7 +676,7 @@ void LocalFileTransfer::callback_transfer_network_write(int fd, short events, vo
 | 
			
		||||
    auto transfer = reinterpret_cast<FileClient*>(ptr_transfer);
 | 
			
		||||
 | 
			
		||||
    if((unsigned) events & (unsigned) EV_TIMEOUT) {
 | 
			
		||||
        if(transfer->state == FileClient::STATE_DISCONNECTING) {
 | 
			
		||||
        if(transfer->state == FileClient::STATE_FLUSHING) {
 | 
			
		||||
            {
 | 
			
		||||
                std::unique_lock nb_lock{transfer->network_buffer.mutex};
 | 
			
		||||
                if(transfer->network_buffer.bytes > 0) {
 | 
			
		||||
@ -686,6 +686,13 @@ void LocalFileTransfer::callback_transfer_network_write(int fd, short events, vo
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
            if(!std::exchange(transfer->finished_signal_send, true)) {
 | 
			
		||||
                if(transfer->transfer) {
 | 
			
		||||
                    transfer->handle->invoke_aborted_callback(transfer->shared_from_this(), { TransferError::NETWORK_IO_ERROR, "failed to flush outgoing buffer" });
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            transfer->handle->test_disconnecting_state(transfer->shared_from_this());
 | 
			
		||||
            return;
 | 
			
		||||
        }
 | 
			
		||||
@ -701,6 +708,7 @@ void LocalFileTransfer::callback_transfer_network_write(int fd, short events, vo
 | 
			
		||||
                buffer = transfer->network_buffer.buffer_head;
 | 
			
		||||
                buffer_left_size = transfer->network_buffer.bytes;
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            if(!buffer) {
 | 
			
		||||
                break;
 | 
			
		||||
            }
 | 
			
		||||
@ -718,6 +726,7 @@ void LocalFileTransfer::callback_transfer_network_write(int fd, short events, vo
 | 
			
		||||
 | 
			
		||||
                if(transfer->state == FileClient::STATE_TRANSFERRING) {
 | 
			
		||||
                    assert(transfer->transfer);
 | 
			
		||||
 | 
			
		||||
                    if(written == 0) {
 | 
			
		||||
                        /* EOF, how the hell is this event possible?! (Read should already catch it) */
 | 
			
		||||
                        logError(LOG_FT, "{} Client disconnected unexpectedly on write. Send {} bytes out of {}.",
 | 
			
		||||
@ -728,10 +737,28 @@ void LocalFileTransfer::callback_transfer_network_write(int fd, short events, vo
 | 
			
		||||
                        logError(LOG_FT, "{} Received network write error. Send {} bytes out of {}. Closing transfer.",
 | 
			
		||||
                                 transfer->log_prefix(), transfer->statistics.file_transferred.total_bytes, transfer->transfer ? transfer->transfer->expected_file_size - transfer->transfer->file_offset : -1);
 | 
			
		||||
 | 
			
		||||
                        transfer->handle->invoke_aborted_callback(transfer->shared_from_this(), { TransferError::NETWORK_IO_ERROR, strerror(errno) });
 | 
			
		||||
                    }
 | 
			
		||||
                } else if(transfer->state == FileClient::STATE_FLUSHING && transfer->transfer) {
 | 
			
		||||
                    {
 | 
			
		||||
                        std::lock_guard block{transfer->network_buffer.mutex};
 | 
			
		||||
                        if(transfer->network_buffer.bytes == 0)
 | 
			
		||||
                            goto disconnect_client;
 | 
			
		||||
                    }
 | 
			
		||||
 | 
			
		||||
                    transfer->flush_network_buffer();
 | 
			
		||||
                    if(written == 0) {
 | 
			
		||||
                        logError(LOG_FT, "{} Received unexpected client disconnect while flushing the network buffer. Transfer failed.", transfer->log_prefix());
 | 
			
		||||
                        transfer->handle->invoke_aborted_callback(transfer->shared_from_this(), { TransferError::UNEXPECTED_CLIENT_DISCONNECT, "" });
 | 
			
		||||
                    } else {
 | 
			
		||||
                        logError(LOG_FT, "{} Received network write error while flushing the network buffer. Closing transfer.",
 | 
			
		||||
                                 transfer->log_prefix());
 | 
			
		||||
 | 
			
		||||
                        transfer->handle->invoke_aborted_callback(transfer->shared_from_this(), { TransferError::NETWORK_IO_ERROR, strerror(errno) });
 | 
			
		||||
                    }
 | 
			
		||||
                }
 | 
			
		||||
 | 
			
		||||
                disconnect_client:
 | 
			
		||||
                /* invalidate all network write operations, but still flush the disk IO buffer */
 | 
			
		||||
                if(size_t bytes_dropped{transfer->flush_network_buffer()}; bytes_dropped > 0) {
 | 
			
		||||
                    if(transfer->state != FileClient::STATE_TRANSFERRING) {
 | 
			
		||||
@ -739,8 +766,10 @@ void LocalFileTransfer::callback_transfer_network_write(int fd, short events, vo
 | 
			
		||||
                                transfer->log_prefix(), bytes_dropped, errno, strerror(errno));
 | 
			
		||||
                    }
 | 
			
		||||
                }
 | 
			
		||||
 | 
			
		||||
                std::unique_lock slock{transfer->state_mutex};
 | 
			
		||||
                transfer->handle->disconnect_client(transfer->shared_from_this(), slock, true);
 | 
			
		||||
                /* no need to flush anything here, write will only be invoked on a client download */
 | 
			
		||||
                transfer->handle->disconnect_client(transfer->shared_from_this(), slock, false);
 | 
			
		||||
                return;
 | 
			
		||||
            } else {
 | 
			
		||||
                buffer->offset += written;
 | 
			
		||||
@ -774,7 +803,7 @@ void LocalFileTransfer::callback_transfer_network_write(int fd, short events, vo
 | 
			
		||||
 | 
			
		||||
        if(buffer_left_size > 0) {
 | 
			
		||||
            transfer->add_network_write_event(false);
 | 
			
		||||
        } else if(transfer->state == FileClient::STATE_DISCONNECTING) {
 | 
			
		||||
        } else if(transfer->state == FileClient::STATE_FLUSHING) {
 | 
			
		||||
            transfer->handle->test_disconnecting_state(transfer->shared_from_this());
 | 
			
		||||
 | 
			
		||||
            if(!std::exchange(transfer->finished_signal_send, true)) {
 | 
			
		||||
 | 
			
		||||
@ -1022,7 +1022,7 @@ std::shared_ptr<Properties> DatabaseHelper::loadClientProperties(const std::shar
 | 
			
		||||
                return;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        debugMessage(server ? server->getServerId() : 0, "[Property] Changing client property '{}' for {} (New value: {], Column: {})",
 | 
			
		||||
        debugMessage(server ? server->getServerId() : 0, "[Property] Changing client property '{}' for {} (New value: {}, Column: {})",
 | 
			
		||||
            prop.type().name,
 | 
			
		||||
            cldbid,
 | 
			
		||||
            prop.value(),
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user