//
// Created by WolverinDEV on 04/05/2020.
//

#include <cassert>
#include <event.h>
#include <experimental/filesystem>
#include <log/LogUtils.h>
#include "./LocalFileProvider.h"
#include "./duration_utils.h"
#include "LocalFileProvider.h"

using namespace ts::server::file;
using namespace ts::server::file::transfer;
namespace fs = std::experimental::filesystem;

DiskIOStartResult LocalFileTransfer::start_disk_io() {
    assert(this->disk_io.state == DiskIOLoopState::STOPPED);

    this->disk_io.state = DiskIOLoopState::RUNNING;
    this->disk_io.dispatch_thread = std::thread(&LocalFileTransfer::dispatch_loop_disk_io, this);
    return DiskIOStartResult::SUCCESS;
}

void LocalFileTransfer::shutdown_disk_io() {
    if(this->disk_io.state == DiskIOLoopState::STOPPED) return;

    this->disk_io.state = DiskIOLoopState::STOPPING;
    {
        std::unique_lock qlock{this->disk_io.queue_lock};
        this->disk_io.notify_work_awaiting.notify_all();
        while(this->disk_io.queue_head)
            this->disk_io.notify_client_processed.wait_for(qlock, std::chrono::seconds{10});

        if(this->disk_io.queue_head) {
            logWarning(0, "Failed to flush disk IO. Force aborting.");
            this->disk_io.state = DiskIOLoopState::FORCE_STOPPING;
            this->disk_io.notify_work_awaiting.notify_all();
            this->disk_io.notify_client_processed.wait(qlock);
        }
    }

    if(this->disk_io.dispatch_thread.joinable())
        this->disk_io.dispatch_thread.join();

    this->disk_io.state = DiskIOLoopState::STOPPED;
}

void LocalFileTransfer::dispatch_loop_disk_io(void *provider_ptr) {
    auto provider = reinterpret_cast<LocalFileTransfer*>(provider_ptr);

    std::shared_ptr<FileClient> client{};
    while(true) {
        {
            std::unique_lock qlock{provider->disk_io.queue_lock};
            if(client) {
                client->file.currently_processing = false;
                provider->disk_io.notify_client_processed.notify_all();
                client = nullptr;
            }

            provider->disk_io.notify_work_awaiting.wait(qlock, [&]{ return provider->disk_io.state != DiskIOLoopState::RUNNING || provider->disk_io.queue_head != nullptr; });

            if(provider->disk_io.queue_head) {
                client = provider->disk_io.queue_head->shared_from_this();

                provider->disk_io.queue_head = provider->disk_io.queue_head->file.next_client;
                if(!provider->disk_io.queue_head)
                    provider->disk_io.queue_tail = &provider->disk_io.queue_head;
            }

            if(provider->disk_io.state != DiskIOLoopState::RUNNING) {
                if(provider->disk_io.state == DiskIOLoopState::STOPPING) {
                    if(!client)
                        break;
                    /* break only if all clients have been flushed */
                } else {
                    /* force stopping without any flushing */
                    auto fclient = &*client;
                    while(fclient)
                        fclient = std::exchange(fclient->file.next_client, nullptr);

                    provider->disk_io.queue_head = nullptr;
                    provider->disk_io.queue_tail = &provider->disk_io.queue_head;
                    break;
                }
            }

            if(!client)
                continue;

            client->file.currently_processing = true;
            client->file.next_client = nullptr;
        }

        provider->execute_disk_io(client);
    }
    provider->disk_io.notify_client_processed.notify_all();
}

FileInitializeResult LocalFileTransfer::initialize_file_io(const std::shared_ptr<FileClient> &transfer) {
    FileInitializeResult result{FileInitializeResult::SUCCESS};
    assert(transfer->transfer);

    std::shared_lock slock{transfer->state_mutex};
    auto& file_data = transfer->file;
    assert(!file_data.file_descriptor);
    assert(!file_data.next_client);

    {
        unsigned int open_flags{0};
        if(transfer->transfer->direction == Transfer::DIRECTION_DOWNLOAD) {
            open_flags = O_RDONLY;

            std::error_code fs_error{};
            if(file_data.absolute_path.empty() || !fs::exists(file_data.absolute_path, fs_error)) {
                result = FileInitializeResult::FILE_DOES_NOT_EXISTS;
                goto error_exit;
            } else if(fs_error) {
                logWarning(LOG_FT, "{} Failed to check for file existence of {}: {}/{}", transfer->log_prefix(), file_data.absolute_path, fs_error.value(), fs_error.message());
                result = FileInitializeResult::FILE_SYSTEM_ERROR;
                goto error_exit;
            }
        } else if(transfer->transfer->direction == Transfer::DIRECTION_UPLOAD) {
            open_flags = (unsigned) O_WRONLY | (unsigned) O_CREAT;
            if(transfer->transfer->override_exiting)
                open_flags |= (unsigned) O_TRUNC;
        } else {
            return FileInitializeResult::INVALID_TRANSFER_DIRECTION;
        }

        file_data.file_descriptor = open(file_data.absolute_path.c_str(), (int) open_flags, 0644);
        if(file_data.file_descriptor <= 0) {
            const auto errno_ = errno;
            switch (errno_) {
                case EACCES:
                    result = FileInitializeResult::FILE_IS_NOT_ACCESSIBLE;
                    break;
                case EDQUOT:
                    logWarning(LOG_FT, "{} Disk inode limit has been reached. Failed to start file transfer for file {}", transfer->log_prefix(), file_data.absolute_path);
                    result = FileInitializeResult::FILE_SYSTEM_ERROR;
                    break;
                case EISDIR:
                    result = FileInitializeResult::FILE_IS_A_DIRECTORY;
                    break;
                case EMFILE:
                    result = FileInitializeResult::PROCESS_FILE_LIMIT_REACHED;
                    break;
                case ENFILE:
                    result = FileInitializeResult::SYSTEM_FILE_LIMIT_REACHED;
                    break;
                case ETXTBSY:
                    result = FileInitializeResult::FILE_IS_BUSY;
                    break;
                case EROFS:
                    result = FileInitializeResult::DISK_IS_READ_ONLY;
                    break;
                default:
                    logWarning(LOG_FT, "{} Failed to start file transfer for file {}: {}/{}", transfer->log_prefix(), file_data.absolute_path, errno_, strerror(errno_));
                    result = FileInitializeResult::FILE_SYSTEM_ERROR;
                    break;
            }
            goto error_exit;
        }
    }

    this->file_system_.lock_file(transfer->file.absolute_path);
    transfer->file.file_locked = true;

    if(transfer->transfer->direction == Transfer::DIRECTION_UPLOAD) {
        if(ftruncate(file_data.file_descriptor, transfer->transfer->expected_file_size) != 0) {
            const auto errno_ = errno;
            switch (errno_) {
                case EACCES:
                    logWarning(LOG_FT, "{} File {} got inaccessible on truncating, but not on opening.", transfer->log_prefix(), file_data.absolute_path);
                    result = FileInitializeResult::FILE_IS_NOT_ACCESSIBLE;
                    goto error_exit;

                case EFBIG:
                    result = FileInitializeResult::FILE_TOO_LARGE;
                    goto error_exit;

                case EIO:
                    logWarning(LOG_FT, "{} A disk IO error occurred while resizing file {}.", transfer->log_prefix(), file_data.absolute_path);
                    result = FileInitializeResult::FILE_IS_NOT_ACCESSIBLE;
                    goto error_exit;

                case EROFS:
                    logWarning(LOG_FT, "{} Failed to resize file {} because disk is in read only mode.", transfer->log_prefix(), file_data.absolute_path);
                    result = FileInitializeResult::FILE_IS_NOT_ACCESSIBLE;
                    goto error_exit;
                default:
                    debugMessage(LOG_FT, "{} Failed to truncate file {}: {}/{}. Trying to upload file anyways.", transfer->log_prefix(), file_data.absolute_path, errno_, strerror(errno_));
            }
        }
    } else if(transfer->transfer->direction == Transfer::DIRECTION_DOWNLOAD) {
        auto file_size = lseek(file_data.file_descriptor, 0, SEEK_END);
        if(file_size != transfer->transfer->expected_file_size) {
            logWarning(LOG_FT, "{} Expected target file to be of size {}, but file is actually of size {}", transfer->log_prefix(), transfer->transfer->expected_file_size, file_size);
            result = FileInitializeResult::FILE_SIZE_MISMATCH;
            goto error_exit;
        }
    }
    {
        auto new_pos = lseek(file_data.file_descriptor, transfer->transfer->file_offset, SEEK_SET);
        if(new_pos < 0) {
            logWarning(LOG_FT, "{} Failed to seek to target file offset ({}): {}/{}", transfer->log_prefix(), transfer->transfer->file_offset, errno, strerror(errno));
            result = FileInitializeResult::FILE_SEEK_FAILED;
            goto error_exit;
        } else if(new_pos != transfer->transfer->file_offset) {
            logWarning(LOG_FT, "{} File rw offset mismatch after seek. Expected {} but received {}", transfer->log_prefix(), transfer->transfer->file_offset, new_pos);
            result = FileInitializeResult::FILE_SEEK_FAILED;
            goto error_exit;
        }
        debugMessage(LOG_FT, "{} Seek to file offset {}. New actual offset is {}", transfer->log_prefix(), transfer->transfer->file_offset, new_pos);
    }

    return FileInitializeResult::SUCCESS;
    error_exit:
    if(std::exchange(transfer->file.file_locked, false))
        this->file_system_.unlock_file(transfer->file.absolute_path);

    if(file_data.file_descriptor > 0)
        ::close(file_data.file_descriptor);
    file_data.file_descriptor = 0;

    return result;
}

void LocalFileTransfer::finalize_file_io(const std::shared_ptr<FileClient> &transfer,
                                         std::unique_lock<std::shared_mutex> &state_lock) {
    assert(state_lock.owns_lock());

    auto& file_data = transfer->file;

    state_lock.unlock();
    {
        std::unique_lock dlock{this->disk_io.queue_lock};
        while(true) {
            if(file_data.currently_processing) {
                this->disk_io.notify_client_processed.wait(dlock);
                continue;
            }

            if(file_data.next_client) {
                if(this->disk_io.queue_head == &*transfer) {
                    this->disk_io.queue_head = file_data.next_client;
                    if(!this->disk_io.queue_head)
                        this->disk_io.queue_tail = &this->disk_io.queue_head;
                } else {
                    FileClient* head{this->disk_io.queue_head};
                    while(head->file.next_client != &*transfer) {
                        assert(head->file.next_client);
                        head = head->file.next_client;
                    }

                    head->file.next_client = file_data.next_client;
                    if(!file_data.next_client)
                        this->disk_io.queue_tail = &head->file.next_client;

                }
                file_data.next_client = nullptr;
            }

            break;
        }
    }
    state_lock.lock();

    if(std::exchange(file_data.file_locked, false))
        this->file_system_.unlock_file(file_data.absolute_path);

    if(file_data.file_descriptor > 0)
        ::close(file_data.file_descriptor);
    file_data.file_descriptor = 0;
}

void LocalFileTransfer::enqueue_disk_io(const std::shared_ptr<FileClient> &client) {
    if(!client->file.file_descriptor)
        return;

    if(!client->transfer)
        return;

    if(client->transfer->direction == Transfer::DIRECTION_DOWNLOAD) {
        if(client->state != FileClient::STATE_TRANSFERRING)
            return;

        if(client->buffer.bytes > TRANSFER_MAX_CACHED_BYTES)
            return;
    } else if(client->transfer->direction == Transfer::DIRECTION_UPLOAD) {
        /* we don't do this check because this might be a flush instruction, where the buffer is actually zero bytes filled */
        /*
        if(client->buffer.bytes == 0)
            return;
        */
    }

    std::lock_guard dlock{this->disk_io.queue_lock};
    if(client->file.next_client || this->disk_io.queue_tail == &client->file.next_client)
        return;

    *this->disk_io.queue_tail = &*client;
    this->disk_io.queue_tail = &client->file.next_client;

    this->disk_io.notify_work_awaiting.notify_all();
}

void LocalFileTransfer::execute_disk_io(const std::shared_ptr<FileClient> &client) {
    if(!client->transfer) return;

    if(client->transfer->direction == Transfer::DIRECTION_UPLOAD) {
        Buffer* buffer{nullptr};
        size_t buffer_left_size{0};

        while(true) {
            {
                std::lock_guard block{client->buffer.mutex};
                buffer = client->buffer.buffer_head;
                buffer_left_size = client->buffer.bytes;
            }
            if(!buffer) {
                assert(buffer_left_size == 0);
                break;
            }

            assert(buffer->offset < buffer->length);
            auto written = ::write(client->file.file_descriptor, buffer->data + buffer->offset, buffer->length - buffer->offset);
            if(written <= 0) {
                if(written == 0) {
                    /* EOF, how the hell is this event possible?! */
                    auto offset_written = client->statistics.disk_bytes_write + client->transfer->file_offset;
                    auto aoffset = lseek(client->file.file_descriptor, 0, SEEK_CUR);
                    logError(LOG_FT, "{} Received unexpected file write EOF. EOF received at {} but expected {}. Actual file offset: {}. Closing transfer.",
                            client->log_prefix(), offset_written, client->transfer->expected_file_size, aoffset);

                    this->report_transfer_statistics(client);
                    if(auto callback{client->handle->callback_transfer_aborted}; callback)
                        callback(client->transfer, { TransferError::UNEXPECTED_DISK_EOF, strerror(errno) });

                    {
                        std::unique_lock slock{client->state_mutex};
                        client->handle->disconnect_client(client, slock, true);
                    }
                } else {
                    if(errno == EAGAIN) {
                        //TODO: Timeout?
                        this->enqueue_disk_io(client);
                        break;
                    }

                    auto offset_written = client->statistics.disk_bytes_write + client->transfer->file_offset;
                    auto aoffset = lseek(client->file.file_descriptor, 0, SEEK_CUR);
                    logError(LOG_FT, "{} Received write to disk IO error. Write pointer is at {} of {}. Actual file offset: {}. Closing transfer.",
                            client->log_prefix(), offset_written, client->transfer->expected_file_size, aoffset);

                    this->report_transfer_statistics(client);
                    if(auto callback{client->handle->callback_transfer_aborted}; callback)
                        callback(client->transfer, { TransferError::DISK_IO_ERROR, strerror(errno) });

                    {
                        std::unique_lock slock{client->state_mutex};
                        client->handle->disconnect_client(client, slock, true);
                    }
                }
                return;
            } else {
                buffer->offset += written;
                assert(buffer->offset <= buffer->length);
                if(buffer->length == buffer->offset) {
                    {
                        std::lock_guard block{client->buffer.mutex};
                        client->buffer.buffer_head = buffer->next;
                        if(!buffer->next)
                            client->buffer.buffer_tail = &client->buffer.buffer_head;

                        assert(client->buffer.bytes >= written);
                        client->buffer.bytes -= written;
                        buffer_left_size = client->buffer.bytes;
                        (void) buffer_left_size; /* trick my IDE here a bit */
                    }

                    free_buffer(buffer);
                } else {
                    std::lock_guard block{client->buffer.mutex};
                    assert(client->buffer.bytes >= written);
                    client->buffer.bytes -= written;
                    buffer_left_size = client->buffer.bytes;
                    (void) buffer_left_size; /* trick my IDE here a bit */
                }

                client->statistics.disk_bytes_write += written;
            }
        }

        if(buffer_left_size > 0)
            this->enqueue_disk_io(client);
        else if(client->state == FileClient::STATE_DISCONNECTING) {
            debugMessage(LOG_FT, "{} Disk IO has been flushed.", client->log_prefix());

            std::unique_lock slock{client->state_mutex};
            client->handle->disconnect_client(client->shared_from_this(), slock, false);
        }

        if(client->state == FileClient::STATE_TRANSFERRING && buffer_left_size < TRANSFER_MAX_CACHED_BYTES / 2) {
            if(client->buffer.buffering_stopped)
                logMessage(LOG_FT, "{} Starting network read, buffer is capable for reading again.", client->log_prefix());
            client->add_network_read_event(false);
        }
    } else if(client->transfer->direction == Transfer::DIRECTION_DOWNLOAD) {
        while(true) {
            constexpr auto buffer_capacity{4096};
            char buffer[buffer_capacity];

            auto read = ::read(client->file.file_descriptor, buffer, buffer_capacity);
            if(read <= 0) {
                if(read == 0) {
                    /* EOF */
                    auto offset_send = client->statistics.disk_bytes_read + client->transfer->file_offset;
                    if(client->transfer->expected_file_size == offset_send) {
                        debugMessage(LOG_FT, "{} Finished file reading. Flushing and disconnecting transfer. Reading took {} seconds.",
                                client->log_prefix(), duration_to_string(std::chrono::system_clock::now() - client->timings.key_received));
                    } else {
                        auto aoffset = lseek(client->file.file_descriptor, 0, SEEK_CUR);
                        logError(LOG_FT, "{} Received unexpected read EOF. EOF received at {} but expected {}. Actual file offset: {}. Disconnecting client.",
                                client->log_prefix(), offset_send, client->transfer->expected_file_size, aoffset);

                        this->report_transfer_statistics(client);
                        if(auto callback{client->handle->callback_transfer_aborted}; callback)
                            callback(client->transfer, { TransferError::UNEXPECTED_DISK_EOF, strerror(errno) });
                    }

                    {
                        std::unique_lock slock{client->state_mutex};
                        client->handle->disconnect_client(client, slock, true);
                    }
                } else {
                    if(errno == EAGAIN) {
                        this->enqueue_disk_io(client);
                        return;
                    }

                    logWarning(LOG_FT, "{} Failed to read from file {} ({}/{}). Aborting transfer.", client->log_prefix(), client->file.absolute_path, errno, strerror(errno));

                    this->report_transfer_statistics(client);
                    if(auto callback{client->handle->callback_transfer_aborted}; callback)
                        callback(client->transfer, { TransferError::DISK_IO_ERROR, strerror(errno) });

                    {
                        std::unique_lock slock{client->state_mutex};
                        client->handle->disconnect_client(client, slock, true);
                    }
                }
                return;
            } else {
                auto buffer_full = client->send_file_bytes(buffer, read);
                client->statistics.disk_bytes_read += read;
                client->statistics.file_bytes_transferred += read;

                std::shared_lock slock{client->state_mutex};
                if(buffer_full) {
                    logMessage(LOG_FT, "{} Stopping buffering from disk. Buffer full ({}bytes)", client->log_prefix(), client->buffer.bytes);
                    break;
                }

                /* we've stuff to write again, yeahr */
                client->add_network_write_event(false);
            }
        }
    } else {
        logError(LOG_FT, "{} Disk IO scheduled, but transfer direction is unknown.", client->log_prefix());
    }
}