Fixed a crash related to file transfers

This commit is contained in:
WolverinDEV 2021-01-03 17:16:23 +01:00
parent 6a502e23f2
commit 960186d55e
7 changed files with 112 additions and 72 deletions

View File

@ -22,9 +22,16 @@ Buffer* transfer::allocate_buffer(size_t size) {
return buffer; return buffer;
} }
void transfer::free_buffer(Buffer* buffer) { Buffer* transfer::ref_buffer(Buffer *buffer) {
buffer->~Buffer(); buffer->ref_count++;
free(buffer); return buffer;
}
void transfer::deref_buffer(Buffer *buffer) {
if(--buffer->ref_count == 0) {
buffer->~Buffer();
free(buffer);
}
} }
FileClient::~FileClient() { FileClient::~FileClient() {

View File

@ -29,14 +29,17 @@ namespace ts::server::file::transfer {
struct Buffer { struct Buffer {
Buffer* next{nullptr}; Buffer* next{nullptr};
size_t capacity{0}; std::atomic_uint32_t ref_count{0};
size_t length{0}; uint32_t capacity{0};
size_t offset{0}; uint32_t length{0};
uint32_t offset{0};
char data[1]{}; char data[1]{};
}; };
[[nodiscard]] extern Buffer* allocate_buffer(size_t); [[nodiscard]] extern Buffer* allocate_buffer(size_t);
extern void free_buffer(Buffer*); [[nodiscard]] extern Buffer* ref_buffer(Buffer*);
extern void deref_buffer(Buffer*);
/* all variables are locked via the state_mutex */ /* all variables are locked via the state_mutex */
struct FileClient : std::enable_shared_from_this<FileClient> { struct FileClient : std::enable_shared_from_this<FileClient> {
@ -85,6 +88,7 @@ namespace ts::server::file::transfer {
} transfer_key{}; } transfer_key{};
struct { struct {
/* TODO: Could be a spin lock (never gets locked while writing so no long blocking activity) */
std::mutex mutex{}; std::mutex mutex{};
size_t bytes{0}; size_t bytes{0};
@ -126,7 +130,7 @@ namespace ts::server::file::transfer {
pipes::SSL pipe_ssl{}; pipes::SSL pipe_ssl{};
bool pipe_ssl_init{false}; bool pipe_ssl_init{false};
std::unique_ptr<Buffer, decltype(free_buffer)*> http_header_buffer{nullptr, free_buffer}; std::unique_ptr<Buffer, decltype(deref_buffer)*> http_header_buffer{nullptr, deref_buffer};
HTTPUploadState http_state{HTTPUploadState::HTTP_AWAITING_HEADER}; HTTPUploadState http_state{HTTPUploadState::HTTP_AWAITING_HEADER};
std::string http_boundary{}; std::string http_boundary{};

View File

@ -71,8 +71,9 @@ void LocalFileTransfer::dispatch_loop_disk_io(void *provider_ptr) {
} }
provider->disk_io.queue_head = provider->disk_io.queue_head->file.next_client; provider->disk_io.queue_head = provider->disk_io.queue_head->file.next_client;
if(!provider->disk_io.queue_head) if(!provider->disk_io.queue_head) {
provider->disk_io.queue_tail = &provider->disk_io.queue_head; provider->disk_io.queue_tail = &provider->disk_io.queue_head;
}
} }
if(provider->disk_io.state != DiskIOLoopState::RUNNING) { if(provider->disk_io.state != DiskIOLoopState::RUNNING) {
@ -83,8 +84,9 @@ void LocalFileTransfer::dispatch_loop_disk_io(void *provider_ptr) {
} else { } else {
/* force stopping without any flushing */ /* force stopping without any flushing */
auto fclient = &*client; auto fclient = &*client;
while(fclient) while(fclient) {
fclient = std::exchange(fclient->file.next_client, nullptr); fclient = std::exchange(fclient->file.next_client, nullptr);
}
provider->disk_io.queue_head = nullptr; provider->disk_io.queue_head = nullptr;
provider->disk_io.queue_tail = &provider->disk_io.queue_head; provider->disk_io.queue_tail = &provider->disk_io.queue_head;
@ -92,8 +94,9 @@ void LocalFileTransfer::dispatch_loop_disk_io(void *provider_ptr) {
} }
} }
if(!client) if(!client) {
continue; continue;
}
client->file.currently_processing = true; client->file.currently_processing = true;
client->file.next_client = nullptr; client->file.next_client = nullptr;
@ -124,7 +127,7 @@ bool FileClient::enqueue_disk_buffer_bytes(const void *snd_buffer, size_t size)
return buffer_size > TRANSFER_MAX_CACHED_BYTES; return buffer_size > TRANSFER_MAX_CACHED_BYTES;
write_disconnected: write_disconnected:
free_buffer(tbuffer); deref_buffer(tbuffer);
return false; return false;
} }
@ -141,7 +144,7 @@ void FileClient::flush_disk_buffer() {
while(current_head) { while(current_head) {
auto next = current_head->next; auto next = current_head->next;
free_buffer(current_head); deref_buffer(current_head);
current_head = next; current_head = next;
} }
} }
@ -410,7 +413,9 @@ void LocalFileTransfer::enqueue_disk_io(const std::shared_ptr<FileClient> &clien
} }
void LocalFileTransfer::execute_disk_io(const std::shared_ptr<FileClient> &client) { void LocalFileTransfer::execute_disk_io(const std::shared_ptr<FileClient> &client) {
if(!client->transfer) return; if(!client->transfer) {
return;
}
if(client->transfer->direction == Transfer::DIRECTION_UPLOAD) { if(client->transfer->direction == Transfer::DIRECTION_UPLOAD) {
Buffer* buffer; Buffer* buffer;
@ -419,17 +424,28 @@ void LocalFileTransfer::execute_disk_io(const std::shared_ptr<FileClient> &clien
while(true) { while(true) {
{ {
std::lock_guard block{client->disk_buffer.mutex}; std::lock_guard block{client->disk_buffer.mutex};
buffer = client->disk_buffer.buffer_head;
if(!client->disk_buffer.buffer_head) {
assert(client->disk_buffer.bytes == 0);
buffer_left_size = 0;
break;
}
buffer_left_size = client->disk_buffer.bytes; buffer_left_size = client->disk_buffer.bytes;
} buffer = ref_buffer(client->disk_buffer.buffer_head);
if(!buffer) {
assert(buffer_left_size == 0);
break;
} }
assert(buffer->offset < buffer->length); assert(buffer->offset < buffer->length);
auto written = ::write(client->file.file_descriptor, buffer->data + buffer->offset, buffer->length - buffer->offset); auto written = ::write(client->file.file_descriptor, buffer->data + buffer->offset, buffer->length - buffer->offset);
if(written <= 0) { if(written <= 0) {
deref_buffer(buffer);
if(errno == EAGAIN) {
//TODO: Timeout?
this->enqueue_disk_io(client);
break;
}
if(written == 0) { if(written == 0) {
/* EOF, how the hell is this event possible?! */ /* EOF, how the hell is this event possible?! */
auto offset_written = client->statistics.disk_bytes_write.total_bytes + client->transfer->file_offset; auto offset_written = client->statistics.disk_bytes_write.total_bytes + client->transfer->file_offset;
@ -446,12 +462,6 @@ void LocalFileTransfer::execute_disk_io(const std::shared_ptr<FileClient> &clien
client->handle->disconnect_client(client, slock, true); client->handle->disconnect_client(client, slock, true);
} }
} else { } else {
if(errno == EAGAIN) {
//TODO: Timeout?
this->enqueue_disk_io(client);
break;
}
auto offset_written = client->statistics.disk_bytes_write.total_bytes + client->transfer->file_offset; auto offset_written = client->statistics.disk_bytes_write.total_bytes + client->transfer->file_offset;
auto aoffset = lseek(client->file.file_descriptor, 0, SEEK_CUR); auto aoffset = lseek(client->file.file_descriptor, 0, SEEK_CUR);
logError(LOG_FT, "{} Received write to disk IO error. Write pointer is at {} of {}. Actual file offset: {}. Closing transfer.", logError(LOG_FT, "{} Received write to disk IO error. Write pointer is at {} of {}. Actual file offset: {}. Closing transfer.",
@ -469,28 +479,38 @@ void LocalFileTransfer::execute_disk_io(const std::shared_ptr<FileClient> &clien
} else { } else {
buffer->offset += written; buffer->offset += written;
assert(buffer->offset <= buffer->length); assert(buffer->offset <= buffer->length);
if(buffer->length == buffer->offset) { if(buffer->length == buffer->offset) {
{ {
std::lock_guard block{client->disk_buffer.mutex}; std::lock_guard block{client->disk_buffer.mutex};
client->disk_buffer.buffer_head = buffer->next; if(client->disk_buffer.buffer_head == buffer) {
if(!buffer->next) client->disk_buffer.buffer_head = buffer->next;
client->disk_buffer.buffer_tail = &client->disk_buffer.buffer_head; if(!buffer->next) {
client->disk_buffer.buffer_tail = &client->disk_buffer.buffer_head;
}
assert(client->disk_buffer.bytes >= written);
client->disk_buffer.bytes -= written;
buffer_left_size = client->disk_buffer.bytes;
} else {
/* The buffer got removed */
}
}
/* We have to deref the buffer twice since we've removed it from the list which owns us one reference */
deref_buffer(buffer);
} else {
std::lock_guard block{client->disk_buffer.mutex};
if(client->disk_buffer.buffer_head == buffer) {
assert(client->disk_buffer.bytes >= written); assert(client->disk_buffer.bytes >= written);
client->disk_buffer.bytes -= written; client->disk_buffer.bytes -= written;
buffer_left_size = client->disk_buffer.bytes; buffer_left_size = client->disk_buffer.bytes;
(void) buffer_left_size; /* trick my IDE here a bit */ } else {
/* The buffer got removed */
} }
free_buffer(buffer);
} else {
std::lock_guard block{client->disk_buffer.mutex};
assert(client->disk_buffer.bytes >= written);
client->disk_buffer.bytes -= written;
buffer_left_size = client->disk_buffer.bytes;
(void) buffer_left_size; /* trick my IDE here a bit */
} }
deref_buffer(buffer);
client->statistics.disk_bytes_write.increase_bytes(written); client->statistics.disk_bytes_write.increase_bytes(written);
} }
} }
@ -502,8 +522,10 @@ void LocalFileTransfer::execute_disk_io(const std::shared_ptr<FileClient> &clien
} }
if(client->state == FileClient::STATE_TRANSFERRING && buffer_left_size < TRANSFER_MAX_CACHED_BYTES / 2) { if(client->state == FileClient::STATE_TRANSFERRING && buffer_left_size < TRANSFER_MAX_CACHED_BYTES / 2) {
if(client->disk_buffer.buffering_stopped) if(client->disk_buffer.buffering_stopped) {
logMessage(LOG_FT, "{} Starting network read, buffer is capable for reading again.", client->log_prefix()); logMessage(LOG_FT, "{} Starting network read, buffer is capable for reading again.", client->log_prefix());
}
client->add_network_read_event(false); client->add_network_read_event(false);
} }
} else if(client->transfer->direction == Transfer::DIRECTION_DOWNLOAD) { } else if(client->transfer->direction == Transfer::DIRECTION_DOWNLOAD) {
@ -518,6 +540,11 @@ void LocalFileTransfer::execute_disk_io(const std::shared_ptr<FileClient> &clien
auto read = ::read(client->file.file_descriptor, buffer, buffer_capacity); auto read = ::read(client->file.file_descriptor, buffer, buffer_capacity);
if(read <= 0) { if(read <= 0) {
if(errno == EAGAIN) {
this->enqueue_disk_io(client);
return;
}
if(read == 0) { if(read == 0) {
/* EOF */ /* EOF */
auto offset_send = client->statistics.disk_bytes_read.total_bytes + client->transfer->file_offset; auto offset_send = client->statistics.disk_bytes_read.total_bytes + client->transfer->file_offset;
@ -532,11 +559,6 @@ void LocalFileTransfer::execute_disk_io(const std::shared_ptr<FileClient> &clien
this->invoke_aborted_callback(client, { TransferError::UNEXPECTED_DISK_EOF, "" }); this->invoke_aborted_callback(client, { TransferError::UNEXPECTED_DISK_EOF, "" });
} }
} else { } else {
if(errno == EAGAIN) {
this->enqueue_disk_io(client);
return;
}
logWarning(LOG_FT, "{} Failed to read from file {} ({}/{}). Aborting transfer.", client->log_prefix(), client->transfer->absolute_file_path, errno, strerror(errno)); logWarning(LOG_FT, "{} Failed to read from file {} ({}/{}). Aborting transfer.", client->log_prefix(), client->transfer->absolute_file_path, errno, strerror(errno));
this->invoke_aborted_callback(client, { TransferError::DISK_IO_ERROR, strerror(errno) }); this->invoke_aborted_callback(client, { TransferError::DISK_IO_ERROR, strerror(errno) });

View File

@ -145,7 +145,7 @@ bool FileClient::enqueue_network_buffer_bytes(const void *snd_buffer, size_t siz
return buffer_size > TRANSFER_MAX_CACHED_BYTES; return buffer_size > TRANSFER_MAX_CACHED_BYTES;
write_disconnected: write_disconnected:
free_buffer(tbuffer); deref_buffer(tbuffer);
return false; return false;
} }
@ -163,7 +163,7 @@ size_t FileClient::flush_network_buffer() {
while(current_head) { while(current_head) {
auto next = current_head->next; auto next = current_head->next;
free_buffer(current_head); deref_buffer(current_head);
current_head = next; current_head = next;
} }
@ -723,20 +723,24 @@ void LocalFileTransfer::callback_transfer_network_write(int fd, short events, vo
while(true) { while(true) {
{ {
std::lock_guard block{transfer->network_buffer.mutex}; std::lock_guard block{transfer->network_buffer.mutex};
buffer = transfer->network_buffer.buffer_head;
buffer_left_size = transfer->network_buffer.bytes;
}
if(!buffer) { if(!transfer->network_buffer.buffer_head) {
break; buffer_left_size = 0;
assert(transfer->network_buffer.bytes == 0);
break;
}
buffer = ref_buffer(transfer->network_buffer.buffer_head);
buffer_left_size = transfer->network_buffer.bytes;
} }
const auto max_write_bytes = transfer->networking.throttle.bytes_left(); const auto max_write_bytes = transfer->networking.throttle.bytes_left();
if(!max_write_bytes) break; /* network throttle */ if(!max_write_bytes) break; /* network throttle */
assert(buffer->offset < buffer->length); assert(buffer->offset < buffer->length);
auto written = ::send(fd, buffer->data + buffer->offset, std::min(buffer->length - buffer->offset, max_write_bytes), MSG_DONTWAIT | MSG_NOSIGNAL); auto written = ::send(fd, buffer->data + buffer->offset, std::min((size_t) (buffer->length - buffer->offset), max_write_bytes), MSG_DONTWAIT | MSG_NOSIGNAL);
if(written <= 0) { if(written <= 0) {
deref_buffer(buffer);
if(errno == EAGAIN) { if(errno == EAGAIN) {
transfer->add_network_write_event(false); transfer->add_network_write_event(false);
break; break;
@ -795,27 +799,38 @@ void LocalFileTransfer::callback_transfer_network_write(int fd, short events, vo
if(buffer->length == buffer->offset) { if(buffer->length == buffer->offset) {
{ {
std::lock_guard block{transfer->network_buffer.mutex}; std::lock_guard block{transfer->network_buffer.mutex};
transfer->network_buffer.buffer_head = buffer->next; if(transfer->network_buffer.buffer_head == buffer) {
if(!buffer->next) transfer->network_buffer.buffer_head = buffer->next;
transfer->network_buffer.buffer_tail = &transfer->network_buffer.buffer_head; if(!buffer->next)
transfer->network_buffer.buffer_tail = &transfer->network_buffer.buffer_head;
assert(transfer->network_buffer.bytes >= written);
transfer->network_buffer.bytes -= written;
buffer_left_size = transfer->network_buffer.bytes;
} else {
/* the buffer got remove */
}
}
deref_buffer(buffer);
} else {
std::lock_guard block{transfer->network_buffer.mutex};
if(transfer->network_buffer.buffer_head == buffer) {
assert(transfer->network_buffer.bytes >= written); assert(transfer->network_buffer.bytes >= written);
transfer->network_buffer.bytes -= written; transfer->network_buffer.bytes -= written;
buffer_left_size = transfer->network_buffer.bytes; buffer_left_size = transfer->network_buffer.bytes;
} else {
/* the buffer got remove */
} }
free_buffer(buffer);
} else {
std::lock_guard block{transfer->network_buffer.mutex};
assert(transfer->network_buffer.bytes >= written);
transfer->network_buffer.bytes -= written;
buffer_left_size = transfer->network_buffer.bytes;
} }
transfer->timings.last_write = std::chrono::system_clock::now(); transfer->timings.last_write = std::chrono::system_clock::now();
transfer->statistics.network_send.increase_bytes(written); transfer->statistics.network_send.increase_bytes(written);
if(transfer->networking.throttle.increase_bytes(written)) deref_buffer(buffer);
if(transfer->networking.throttle.increase_bytes(written)) {
break; /* we've to slow down */ break; /* we've to slow down */
}
} }
} }

@ -1 +1 @@
Subproject commit 171ddedeedc5bce33b75c9f14c3fafd41ed8820e Subproject commit 5c2ff84e47d4fa905f9b01b1d6bdf783c6fdd808

2
rtclib

@ -1 +1 @@
Subproject commit 6beb431776ea4f21c76ccefcbe30c2daadfa2687 Subproject commit ea13ab489529ef0ebc72c15505c6be52ed484269

View File

@ -23,9 +23,6 @@ using namespace ts::protocol;
//#define PKT_LOG_VOICE //#define PKT_LOG_VOICE
//#define PKT_LOG_WHISPER //#define PKT_LOG_WHISPER
constexpr static auto kMaxWhisperClientNameLength{30};
constexpr static auto kWhisperClientUniqueIdLength{28}; /* base64 encoded SHA1 hash */
constexpr static auto kWhisperMaxHeaderLength{2 + 2 + 1 + 2 + kWhisperClientUniqueIdLength + 1 + kMaxWhisperClientNameLength};
SpeakingClient::SpeakingClient(sql::SqlManager *a, const std::shared_ptr<VirtualServer> &b) : ConnectedClient(a, b), whisper_handler_{this} { SpeakingClient::SpeakingClient(sql::SqlManager *a, const std::shared_ptr<VirtualServer> &b) : ConnectedClient(a, b), whisper_handler_{this} {
speak_begin = std::chrono::system_clock::now(); speak_begin = std::chrono::system_clock::now();
@ -78,11 +75,6 @@ inline bool update_whisper_error(std::chrono::system_clock::time_point& last) {
return false; return false;
} }
#define TEST_PARM(type) \
do {\
if(!cmd[0][key].castable<type>())\
return {findError("parameter_invalid"), "Invalid type for " + key};\
} while(false)
auto regex_wildcard = std::regex(".*"); auto regex_wildcard = std::regex(".*");