From b5d7b90c0e2a8f49a66e8e7936dd30cece7680fb Mon Sep 17 00:00:00 2001 From: J-D-K Date: Sun, 21 Sep 2025 22:01:47 -0400 Subject: [PATCH] Upgrade to Queued Buffering. --- source/appstates/SettingsState.cpp | 2 +- source/fs/io.cpp | 135 +++++++++------------ source/fs/zip.cpp | 186 ++++++++++++----------------- 3 files changed, 134 insertions(+), 189 deletions(-) diff --git a/source/appstates/SettingsState.cpp b/source/appstates/SettingsState.cpp index 78eda86..0b2075b 100644 --- a/source/appstates/SettingsState.cpp +++ b/source/appstates/SettingsState.cpp @@ -176,7 +176,7 @@ void SettingsState::change_working_directory() if (exists) { moved = fs::move_directory_recursively(oldPath, newPath); - error::fslib(fslib::delete_directory_recursively(oldPath)) + error::fslib(fslib::delete_directory_recursively(oldPath)); } else { moved = fslib::rename_directory(oldPath, newPath); } diff --git a/source/fs/io.cpp b/source/fs/io.cpp index 67e7570..252c237 100644 --- a/source/fs/io.cpp +++ b/source/fs/io.cpp @@ -9,58 +9,53 @@ #include "sys/sys.hpp" #include "ui/PopMessageManager.hpp" -#include #include #include #include +#include namespace { // Size of buffer shared between threads. // constexpr size_t SIZE_FILE_BUFFER = 0x600000; // This one is just for testing something. - constexpr size_t SIZE_FILE_BUFFER = 0x200000; -} // namespace + constexpr size_t SIZE_FILE_BUFFER = 0x80000; -// clang-format off -struct FileThreadStruct : sys::threadpool::DataStruct -{ - std::mutex lock{}; - std::condition_variable condition{}; - bool bufferReady{}; - ssize_t readSize{}; - std::unique_ptr sharedBuffer{}; - fslib::File *source{}; -}; -// clang-format on + /// @brief Easier to type later. + using QueuePair = std::pair, ssize_t>; + + // clang-format off + struct FileThreadStruct : sys::threadpool::DataStruct + { + std::mutex lock{}; + std::queue bufferQueue{}; + fslib::File *source{}; + }; + // clang-format on +} // namespace static void read_thread_function(sys::threadpool::JobData jobData) { auto castData = std::static_pointer_cast(jobData); - std::mutex &lock = castData->lock; - std::condition_variable &condition = castData->condition; - bool &bufferReady = castData->bufferReady; - ssize_t &readSize = castData->readSize; - std::unique_ptr &sharedBuffer = castData->sharedBuffer; - fslib::File &source = *castData->source; - const int64_t fileSize = source.get_size(); + std::mutex &lock = castData->lock; + auto &bufferQueue = castData->bufferQueue; + fslib::File &source = *castData->source; + const int64_t fileSize = source.get_size(); for (int64_t i = 0; i < fileSize;) { - ssize_t localRead{}; + auto readBuffer = std::make_unique(SIZE_FILE_BUFFER); + + const ssize_t readSize = source.read(readBuffer.get(), SIZE_FILE_BUFFER); + auto queuePair = std::make_pair(std::move(readBuffer), readSize); { - std::unique_lock bufferLock(lock); - condition.wait(bufferLock, [&]() { return bufferReady == false; }); - readSize = source.read(sharedBuffer.get(), SIZE_FILE_BUFFER); - localRead = readSize; - - bufferReady = true; - condition.notify_one(); + std::lock_guard queueGuard{lock}; + bufferQueue.push(std::move(queuePair)); } - if (localRead == -1) { break; } - i += localRead; + + i += readSize; } } @@ -81,38 +76,30 @@ void fs::copy_file(const fslib::Path &source, const fslib::Path &destination, sy task->reset(static_cast(sourceSize)); } - auto sharedData = std::make_shared(); - sharedData->sharedBuffer = std::make_unique(SIZE_FILE_BUFFER); - sharedData->source = &sourceFile; + auto sharedData = std::make_shared(); + sharedData->source = &sourceFile; - auto localBuffer = std::make_unique(SIZE_FILE_BUFFER); - - std::mutex &lock = sharedData->lock; - std::condition_variable &condition = sharedData->condition; - bool &bufferReady = sharedData->bufferReady; - ssize_t &readSize = sharedData->readSize; - auto &sharedBuffer = sharedData->sharedBuffer; + std::mutex &lock = sharedData->lock; + auto &bufferQueue = sharedData->bufferQueue; sys::threadpool::push_job(read_thread_function, sharedData); - for (int64_t i = 0; i < sourceSize; i++) + for (int64_t i = 0; i < sourceSize;) { - ssize_t localRead{}; + QueuePair queuePair{}; { - std::unique_lock bufferLock(lock); - condition.wait(bufferLock, [&]() { return bufferReady == true; }); + std::lock_guard queueGuard{lock}; + if (bufferQueue.empty()) { continue; } - localRead = readSize; - if (localRead == -1) { break; } - - std::memcpy(localBuffer.get(), sharedBuffer.get(), localRead); - - bufferReady = false; - condition.notify_one(); + queuePair = std::move(bufferQueue.front()); + bufferQueue.pop(); } - // This should be checked. Not sure how yet... - destFile.write(localBuffer.get(), localRead); - i += localRead; + auto &[buffer, bufferSize] = queuePair; + + destFile.write(buffer.get(), bufferSize); + + i += bufferSize; + if (task) { task->update_current(static_cast(i)); } } } @@ -139,37 +126,27 @@ void fs::copy_file_commit(const fslib::Path &source, task->reset(static_cast(sourceSize)); } - auto sharedData = std::make_shared(); - sharedData->sharedBuffer = std::make_unique(SIZE_FILE_BUFFER); - sharedData->source = &sourceFile; + auto sharedData = std::make_shared(); + sharedData->source = &sourceFile; - auto localBuffer = std::make_unique(SIZE_FILE_BUFFER); - - std::mutex &lock = sharedData->lock; - std::condition_variable &condition = sharedData->condition; - bool &bufferReady = sharedData->bufferReady; - ssize_t &readSize = sharedData->readSize; - auto &sharedBuffer = sharedData->sharedBuffer; + std::mutex &lock = sharedData->lock; + auto &bufferQueue = sharedData->bufferQueue; int64_t journalCount{}; sys::threadpool::push_job(read_thread_function, sharedData); for (int64_t i = 0; i < sourceSize;) { - ssize_t localRead{}; + QueuePair queuePair{}; { - std::unique_lock bufferlock{lock}; - condition.wait(bufferlock, [&]() { return bufferReady == true; }); + std::lock_guard queueGuard{lock}; + if (bufferQueue.empty()) { continue; } - localRead = readSize; - std::memcpy(localBuffer.get(), sharedBuffer.get(), localRead); - - bufferReady = false; - condition.notify_one(); + queuePair = std::move(bufferQueue.front()); + bufferQueue.pop(); } - if (localRead == -1) { break; } - - const bool needsCommit = journalCount + localRead >= journalSize; + const auto &[buffer, bufferSize] = queuePair; + const bool needsCommit = journalCount + bufferSize >= journalSize; if (needsCommit) { destFile.close(); @@ -181,10 +158,10 @@ void fs::copy_file_commit(const fslib::Path &source, destFile.seek(i, destFile.BEGINNING); } - destFile.write(localBuffer.get(), localRead); + destFile.write(buffer.get(), bufferSize); - i += localRead; - journalCount += localRead; + i += bufferSize; + journalCount += bufferSize; if (task) { task->update_current(static_cast(i)); } } destFile.close(); diff --git a/source/fs/zip.cpp b/source/fs/zip.cpp index 8796264..18b6397 100644 --- a/source/fs/zip.cpp +++ b/source/fs/zip.cpp @@ -9,73 +9,64 @@ #include "sys/sys.hpp" #include "ui/PopMessageManager.hpp" -#include #include #include #include #include -#include +#include namespace { /// @brief Buffer size used for writing files to ZIP. - constexpr size_t SIZE_ZIP_BUFFER = 0x40000; + constexpr size_t SIZE_ZIP_BUFFER = 0x10000; /// @brief Buffer size used for decompressing files from ZIP. - constexpr size_t SIZE_UNZIP_BUFFER = 0x600000; + constexpr size_t SIZE_UNZIP_BUFFER = 0x80000; + + using QueuePair = std::pair, ssize_t>; + + // Shared struct for Zip/File IO + // clang-format off + struct ZipIOBase : sys::threadpool::DataStruct + { + std::mutex lock{}; + std::queue bufferQueue{}; + + }; + + struct ZipReadStruct : ZipIOBase + { + fslib::File *source{}; + }; + + struct UnzipReadStruct : ZipIOBase + { + fs::MiniUnzip *unzip{}; + }; + // clang-format on } // namespace -// Shared struct for Zip/File IO -// clang-format off -struct ZipIOBase : sys::threadpool::DataStruct -{ - std::mutex lock{}; - std::condition_variable condition{}; - ssize_t readSize{}; - bool bufferReady{}; - std::unique_ptr sharedBuffer{}; - std::binary_semaphore writeComplete{0}; -}; - -struct ZipReadStruct : ZipIOBase -{ - fslib::File *source{}; -}; - -struct UnzipReadStruct : ZipIOBase -{ - fs::MiniUnzip *unzip{}; -}; -// clang-format on - // Function for reading files for Zipping. static void zip_read_thread_function(sys::threadpool::JobData jobData) { auto castData = std::static_pointer_cast(jobData); - std::mutex &lock = castData->lock; - std::condition_variable &condition = castData->condition; - ssize_t &readSize = castData->readSize; - bool &bufferReady = castData->bufferReady; - std::unique_ptr &sharedBuffer = castData->sharedBuffer; - fslib::File &source = *castData->source; - const int64_t fileSize = source.get_size(); + std::mutex &lock = castData->lock; + auto &bufferQueue = castData->bufferQueue; + fslib::File &source = *castData->source; + const int64_t fileSize = source.get_size(); for (int64_t i = 0; i < fileSize;) { - ssize_t localRead{}; // This is a local variable to store the read size so we don't need to hold the other thread up. + auto readBuffer = std::make_unique(SIZE_ZIP_BUFFER); + const ssize_t readSize = source.read(readBuffer.get(), SIZE_ZIP_BUFFER); { - std::unique_lock bufferLock(lock); - condition.wait(bufferLock, [&]() { return bufferReady == false; }); - - readSize = source.read(sharedBuffer.get(), SIZE_ZIP_BUFFER); - localRead = readSize; - - bufferReady = true; - condition.notify_one(); + std::lock_guard queueGuard{lock}; + auto queuePair = std::make_pair(std::move(readBuffer), readSize); + bufferQueue.push(std::move(queuePair)); } - if (readSize == -1) { break; } - i += localRead; + + i += readSize; } } @@ -84,29 +75,21 @@ static void unzip_read_thread_function(sys::threadpool::JobData jobData) { auto castData = std::static_pointer_cast(jobData); - std::mutex &lock = castData->lock; - std::condition_variable &condition = castData->condition; - ssize_t &readSize = castData->readSize; - bool &bufferReady = castData->bufferReady; - std::unique_ptr &sharedBuffer = castData->sharedBuffer; - fs::MiniUnzip &unzip = *castData->unzip; - const int64_t fileSize = unzip.get_uncompressed_size(); + std::mutex &lock = castData->lock; + auto &bufferQueue = castData->bufferQueue; + fs::MiniUnzip &unzip = *castData->unzip; + const int64_t fileSize = unzip.get_uncompressed_size(); for (int64_t i = 0; i < fileSize;) { - ssize_t localRead{}; + auto readBuffer = std::make_unique(SIZE_UNZIP_BUFFER); + const ssize_t readSize = unzip.read(readBuffer.get(), SIZE_UNZIP_BUFFER); { - std::unique_lock bufferLock(lock); - condition.wait(bufferLock, [&]() { return bufferReady == false; }); - - readSize = unzip.read(sharedBuffer.get(), SIZE_UNZIP_BUFFER); - localRead = readSize; - - bufferReady = true; - condition.notify_one(); + std::lock_guard queueGuard{lock}; + auto queuePair = std::make_pair(std::move(readBuffer), readSize); + bufferQueue.push(std::move(queuePair)); } - if (localRead == -1) { break; } - i += localRead; + i += readSize; } } @@ -132,12 +115,9 @@ void fs::copy_directory_to_zip(const fslib::Path &source, fs::MiniZip &dest, sys const bool newZipFile = dest.open_new_file(sourceString); if (error::fslib(sourceFile.is_open()) || !newZipFile) { continue; } - const int64_t fileSize = sourceFile.get_size(); - auto sharedData = std::make_shared(); - sharedData->source = &sourceFile; - sharedData->sharedBuffer = std::make_unique(SIZE_ZIP_BUFFER); - - auto localBuffer = std::make_unique(SIZE_ZIP_BUFFER); + const int64_t fileSize = sourceFile.get_size(); + auto sharedData = std::make_shared(); + sharedData->source = &sourceFile; if (task) { @@ -147,31 +127,25 @@ void fs::copy_directory_to_zip(const fslib::Path &source, fs::MiniZip &dest, sys } // I just like doing this. It makes things easier to type. - std::mutex &lock = sharedData->lock; - std::condition_variable &condition = sharedData->condition; - ssize_t &readSize = sharedData->readSize; - bool &bufferReady = sharedData->bufferReady; - auto &sharedBuffer = sharedData->sharedBuffer; + std::mutex &lock = sharedData->lock; + auto &bufferQueue = sharedData->bufferQueue; sys::threadpool::push_job(zip_read_thread_function, sharedData); for (int64_t i = 0; i < fileSize;) { - ssize_t localRead{}; + QueuePair queuePair{}; { - std::unique_lock bufferLock(lock); - condition.wait(bufferLock, [&]() { return bufferReady == true; }); + std::lock_guard queueGuard{lock}; + if (bufferQueue.empty()) { continue; } - localRead = readSize; - - std::memcpy(localBuffer.get(), sharedBuffer.get(), localRead); - - bufferReady = false; - condition.notify_one(); + queuePair = std::move(bufferQueue.front()); + bufferQueue.pop(); } - const bool writeGood = localRead != -1 && dest.write(localBuffer.get(), localRead); - if (!writeGood) { break; } - i += localRead; + auto &[buffer, bufferSize] = queuePair; + dest.write(buffer.get(), bufferSize); + + i += bufferSize; if (task) { task->update_current(static_cast(i)); } } @@ -223,35 +197,28 @@ void fs::copy_zip_to_directory(fs::MiniUnzip &unzip, const fslib::Path &dest, in task->reset(static_cast(fileSize)); } - auto sharedData = std::make_shared(); - sharedData->sharedBuffer = std::make_unique(SIZE_UNZIP_BUFFER); - sharedData->unzip = &unzip; + auto sharedData = std::make_shared(); + sharedData->unzip = &unzip; - auto localBuffer = std::make_unique(SIZE_UNZIP_BUFFER); - - std::mutex &lock = sharedData->lock; - std::condition_variable &condition = sharedData->condition; - ssize_t &readSize = sharedData->readSize; - bool &bufferReady = sharedData->bufferReady; - std::unique_ptr &sharedBuffer = sharedData->sharedBuffer; + std::mutex &lock = sharedData->lock; + auto &bufferQueue = sharedData->bufferQueue; int64_t journalCount{}; sys::threadpool::push_job(unzip_read_thread_function, sharedData); for (int64_t i = 0; i < fileSize;) { - ssize_t localRead{}; + QueuePair queuePair{}; { - std::unique_lock bufferLock(lock); - condition.wait(bufferLock, [&]() { return bufferReady == true; }); + std::lock_guard queueGuard{lock}; + if (bufferQueue.empty()) { continue; } - localRead = readSize; - std::memcpy(localBuffer.get(), sharedBuffer.get(), localRead); - - bufferReady = false; - condition.notify_one(); + queuePair = std::move(bufferQueue.front()); + bufferQueue.pop(); } - const bool commitNeeded = needCommits && journalCount + localRead >= journalSize; + auto &[buffer, bufferSize] = queuePair; + + const bool commitNeeded = needCommits && journalCount + bufferSize >= journalSize; if (commitNeeded) { destFile.close(); @@ -262,11 +229,12 @@ void fs::copy_zip_to_directory(fs::MiniUnzip &unzip, const fslib::Path &dest, in destFile.seek(i, destFile.BEGINNING); journalCount = 0; } - const bool goodWrite = localRead != -1 && destFile.write(localBuffer.get(), localRead); - error::fslib(goodWrite); // This will catch the error. To do: Recovery. - i += localRead; - journalCount += localRead; + destFile.write(buffer.get(), bufferSize); + + i += bufferSize; + journalCount += bufferSize; + if (task) { task->update_current(static_cast(i)); } } destFile.close();