Upgrade to Queued Buffering.

This commit is contained in:
J-D-K 2025-09-21 22:01:47 -04:00
parent 73d6bc2188
commit b5d7b90c0e
3 changed files with 134 additions and 189 deletions

View File

@ -176,7 +176,7 @@ void SettingsState::change_working_directory()
if (exists)
{
moved = fs::move_directory_recursively(oldPath, newPath);
error::fslib(fslib::delete_directory_recursively(oldPath))
error::fslib(fslib::delete_directory_recursively(oldPath));
}
else { moved = fslib::rename_directory(oldPath, newPath); }

View File

@ -9,58 +9,53 @@
#include "sys/sys.hpp"
#include "ui/PopMessageManager.hpp"
#include <condition_variable>
#include <cstring>
#include <memory>
#include <mutex>
#include <queue>
namespace
{
// Size of buffer shared between threads.
// constexpr size_t SIZE_FILE_BUFFER = 0x600000;
// This one is just for testing something.
constexpr size_t SIZE_FILE_BUFFER = 0x200000;
} // namespace
constexpr size_t SIZE_FILE_BUFFER = 0x80000;
// clang-format off
struct FileThreadStruct : sys::threadpool::DataStruct
{
std::mutex lock{};
std::condition_variable condition{};
bool bufferReady{};
ssize_t readSize{};
std::unique_ptr<sys::Byte[]> sharedBuffer{};
fslib::File *source{};
};
// clang-format on
/// @brief Easier to type later.
using QueuePair = std::pair<std::unique_ptr<sys::Byte[]>, ssize_t>;
// clang-format off
struct FileThreadStruct : sys::threadpool::DataStruct
{
std::mutex lock{};
std::queue<QueuePair> bufferQueue{};
fslib::File *source{};
};
// clang-format on
} // namespace
static void read_thread_function(sys::threadpool::JobData jobData)
{
auto castData = std::static_pointer_cast<FileThreadStruct>(jobData);
std::mutex &lock = castData->lock;
std::condition_variable &condition = castData->condition;
bool &bufferReady = castData->bufferReady;
ssize_t &readSize = castData->readSize;
std::unique_ptr<sys::Byte[]> &sharedBuffer = castData->sharedBuffer;
fslib::File &source = *castData->source;
const int64_t fileSize = source.get_size();
std::mutex &lock = castData->lock;
auto &bufferQueue = castData->bufferQueue;
fslib::File &source = *castData->source;
const int64_t fileSize = source.get_size();
for (int64_t i = 0; i < fileSize;)
{
ssize_t localRead{};
auto readBuffer = std::make_unique<sys::Byte[]>(SIZE_FILE_BUFFER);
const ssize_t readSize = source.read(readBuffer.get(), SIZE_FILE_BUFFER);
auto queuePair = std::make_pair(std::move(readBuffer), readSize);
{
std::unique_lock<std::mutex> bufferLock(lock);
condition.wait(bufferLock, [&]() { return bufferReady == false; });
readSize = source.read(sharedBuffer.get(), SIZE_FILE_BUFFER);
localRead = readSize;
bufferReady = true;
condition.notify_one();
std::lock_guard queueGuard{lock};
bufferQueue.push(std::move(queuePair));
}
if (localRead == -1) { break; }
i += localRead;
i += readSize;
}
}
@ -81,38 +76,30 @@ void fs::copy_file(const fslib::Path &source, const fslib::Path &destination, sy
task->reset(static_cast<double>(sourceSize));
}
auto sharedData = std::make_shared<FileThreadStruct>();
sharedData->sharedBuffer = std::make_unique<sys::Byte[]>(SIZE_FILE_BUFFER);
sharedData->source = &sourceFile;
auto sharedData = std::make_shared<FileThreadStruct>();
sharedData->source = &sourceFile;
auto localBuffer = std::make_unique<sys::Byte[]>(SIZE_FILE_BUFFER);
std::mutex &lock = sharedData->lock;
std::condition_variable &condition = sharedData->condition;
bool &bufferReady = sharedData->bufferReady;
ssize_t &readSize = sharedData->readSize;
auto &sharedBuffer = sharedData->sharedBuffer;
std::mutex &lock = sharedData->lock;
auto &bufferQueue = sharedData->bufferQueue;
sys::threadpool::push_job(read_thread_function, sharedData);
for (int64_t i = 0; i < sourceSize; i++)
for (int64_t i = 0; i < sourceSize;)
{
ssize_t localRead{};
QueuePair queuePair{};
{
std::unique_lock<std::mutex> bufferLock(lock);
condition.wait(bufferLock, [&]() { return bufferReady == true; });
std::lock_guard queueGuard{lock};
if (bufferQueue.empty()) { continue; }
localRead = readSize;
if (localRead == -1) { break; }
std::memcpy(localBuffer.get(), sharedBuffer.get(), localRead);
bufferReady = false;
condition.notify_one();
queuePair = std::move(bufferQueue.front());
bufferQueue.pop();
}
// This should be checked. Not sure how yet...
destFile.write(localBuffer.get(), localRead);
i += localRead;
auto &[buffer, bufferSize] = queuePair;
destFile.write(buffer.get(), bufferSize);
i += bufferSize;
if (task) { task->update_current(static_cast<double>(i)); }
}
}
@ -139,37 +126,27 @@ void fs::copy_file_commit(const fslib::Path &source,
task->reset(static_cast<double>(sourceSize));
}
auto sharedData = std::make_shared<FileThreadStruct>();
sharedData->sharedBuffer = std::make_unique<sys::Byte[]>(SIZE_FILE_BUFFER);
sharedData->source = &sourceFile;
auto sharedData = std::make_shared<FileThreadStruct>();
sharedData->source = &sourceFile;
auto localBuffer = std::make_unique<sys::Byte[]>(SIZE_FILE_BUFFER);
std::mutex &lock = sharedData->lock;
std::condition_variable &condition = sharedData->condition;
bool &bufferReady = sharedData->bufferReady;
ssize_t &readSize = sharedData->readSize;
auto &sharedBuffer = sharedData->sharedBuffer;
std::mutex &lock = sharedData->lock;
auto &bufferQueue = sharedData->bufferQueue;
int64_t journalCount{};
sys::threadpool::push_job(read_thread_function, sharedData);
for (int64_t i = 0; i < sourceSize;)
{
ssize_t localRead{};
QueuePair queuePair{};
{
std::unique_lock<std::mutex> bufferlock{lock};
condition.wait(bufferlock, [&]() { return bufferReady == true; });
std::lock_guard queueGuard{lock};
if (bufferQueue.empty()) { continue; }
localRead = readSize;
std::memcpy(localBuffer.get(), sharedBuffer.get(), localRead);
bufferReady = false;
condition.notify_one();
queuePair = std::move(bufferQueue.front());
bufferQueue.pop();
}
if (localRead == -1) { break; }
const bool needsCommit = journalCount + localRead >= journalSize;
const auto &[buffer, bufferSize] = queuePair;
const bool needsCommit = journalCount + bufferSize >= journalSize;
if (needsCommit)
{
destFile.close();
@ -181,10 +158,10 @@ void fs::copy_file_commit(const fslib::Path &source,
destFile.seek(i, destFile.BEGINNING);
}
destFile.write(localBuffer.get(), localRead);
destFile.write(buffer.get(), bufferSize);
i += localRead;
journalCount += localRead;
i += bufferSize;
journalCount += bufferSize;
if (task) { task->update_current(static_cast<double>(i)); }
}
destFile.close();

View File

@ -9,73 +9,64 @@
#include "sys/sys.hpp"
#include "ui/PopMessageManager.hpp"
#include <condition_variable>
#include <cstring>
#include <ctime>
#include <memory>
#include <mutex>
#include <semaphore>
#include <queue>
namespace
{
/// @brief Buffer size used for writing files to ZIP.
constexpr size_t SIZE_ZIP_BUFFER = 0x40000;
constexpr size_t SIZE_ZIP_BUFFER = 0x10000;
/// @brief Buffer size used for decompressing files from ZIP.
constexpr size_t SIZE_UNZIP_BUFFER = 0x600000;
constexpr size_t SIZE_UNZIP_BUFFER = 0x80000;
using QueuePair = std::pair<std::unique_ptr<sys::Byte[]>, ssize_t>;
// Shared struct for Zip/File IO
// clang-format off
struct ZipIOBase : sys::threadpool::DataStruct
{
std::mutex lock{};
std::queue<QueuePair> bufferQueue{};
};
struct ZipReadStruct : ZipIOBase
{
fslib::File *source{};
};
struct UnzipReadStruct : ZipIOBase
{
fs::MiniUnzip *unzip{};
};
// clang-format on
} // namespace
// Shared struct for Zip/File IO
// clang-format off
struct ZipIOBase : sys::threadpool::DataStruct
{
std::mutex lock{};
std::condition_variable condition{};
ssize_t readSize{};
bool bufferReady{};
std::unique_ptr<sys::Byte[]> sharedBuffer{};
std::binary_semaphore writeComplete{0};
};
struct ZipReadStruct : ZipIOBase
{
fslib::File *source{};
};
struct UnzipReadStruct : ZipIOBase
{
fs::MiniUnzip *unzip{};
};
// clang-format on
// Function for reading files for Zipping.
static void zip_read_thread_function(sys::threadpool::JobData jobData)
{
auto castData = std::static_pointer_cast<ZipReadStruct>(jobData);
std::mutex &lock = castData->lock;
std::condition_variable &condition = castData->condition;
ssize_t &readSize = castData->readSize;
bool &bufferReady = castData->bufferReady;
std::unique_ptr<sys::Byte[]> &sharedBuffer = castData->sharedBuffer;
fslib::File &source = *castData->source;
const int64_t fileSize = source.get_size();
std::mutex &lock = castData->lock;
auto &bufferQueue = castData->bufferQueue;
fslib::File &source = *castData->source;
const int64_t fileSize = source.get_size();
for (int64_t i = 0; i < fileSize;)
{
ssize_t localRead{}; // This is a local variable to store the read size so we don't need to hold the other thread up.
auto readBuffer = std::make_unique<sys::Byte[]>(SIZE_ZIP_BUFFER);
const ssize_t readSize = source.read(readBuffer.get(), SIZE_ZIP_BUFFER);
{
std::unique_lock<std::mutex> bufferLock(lock);
condition.wait(bufferLock, [&]() { return bufferReady == false; });
readSize = source.read(sharedBuffer.get(), SIZE_ZIP_BUFFER);
localRead = readSize;
bufferReady = true;
condition.notify_one();
std::lock_guard queueGuard{lock};
auto queuePair = std::make_pair(std::move(readBuffer), readSize);
bufferQueue.push(std::move(queuePair));
}
if (readSize == -1) { break; }
i += localRead;
i += readSize;
}
}
@ -84,29 +75,21 @@ static void unzip_read_thread_function(sys::threadpool::JobData jobData)
{
auto castData = std::static_pointer_cast<UnzipReadStruct>(jobData);
std::mutex &lock = castData->lock;
std::condition_variable &condition = castData->condition;
ssize_t &readSize = castData->readSize;
bool &bufferReady = castData->bufferReady;
std::unique_ptr<sys::Byte[]> &sharedBuffer = castData->sharedBuffer;
fs::MiniUnzip &unzip = *castData->unzip;
const int64_t fileSize = unzip.get_uncompressed_size();
std::mutex &lock = castData->lock;
auto &bufferQueue = castData->bufferQueue;
fs::MiniUnzip &unzip = *castData->unzip;
const int64_t fileSize = unzip.get_uncompressed_size();
for (int64_t i = 0; i < fileSize;)
{
ssize_t localRead{};
auto readBuffer = std::make_unique<sys::Byte[]>(SIZE_UNZIP_BUFFER);
const ssize_t readSize = unzip.read(readBuffer.get(), SIZE_UNZIP_BUFFER);
{
std::unique_lock<std::mutex> bufferLock(lock);
condition.wait(bufferLock, [&]() { return bufferReady == false; });
readSize = unzip.read(sharedBuffer.get(), SIZE_UNZIP_BUFFER);
localRead = readSize;
bufferReady = true;
condition.notify_one();
std::lock_guard queueGuard{lock};
auto queuePair = std::make_pair(std::move(readBuffer), readSize);
bufferQueue.push(std::move(queuePair));
}
if (localRead == -1) { break; }
i += localRead;
i += readSize;
}
}
@ -132,12 +115,9 @@ void fs::copy_directory_to_zip(const fslib::Path &source, fs::MiniZip &dest, sys
const bool newZipFile = dest.open_new_file(sourceString);
if (error::fslib(sourceFile.is_open()) || !newZipFile) { continue; }
const int64_t fileSize = sourceFile.get_size();
auto sharedData = std::make_shared<ZipReadStruct>();
sharedData->source = &sourceFile;
sharedData->sharedBuffer = std::make_unique<sys::Byte[]>(SIZE_ZIP_BUFFER);
auto localBuffer = std::make_unique<sys::Byte[]>(SIZE_ZIP_BUFFER);
const int64_t fileSize = sourceFile.get_size();
auto sharedData = std::make_shared<ZipReadStruct>();
sharedData->source = &sourceFile;
if (task)
{
@ -147,31 +127,25 @@ void fs::copy_directory_to_zip(const fslib::Path &source, fs::MiniZip &dest, sys
}
// I just like doing this. It makes things easier to type.
std::mutex &lock = sharedData->lock;
std::condition_variable &condition = sharedData->condition;
ssize_t &readSize = sharedData->readSize;
bool &bufferReady = sharedData->bufferReady;
auto &sharedBuffer = sharedData->sharedBuffer;
std::mutex &lock = sharedData->lock;
auto &bufferQueue = sharedData->bufferQueue;
sys::threadpool::push_job(zip_read_thread_function, sharedData);
for (int64_t i = 0; i < fileSize;)
{
ssize_t localRead{};
QueuePair queuePair{};
{
std::unique_lock<std::mutex> bufferLock(lock);
condition.wait(bufferLock, [&]() { return bufferReady == true; });
std::lock_guard queueGuard{lock};
if (bufferQueue.empty()) { continue; }
localRead = readSize;
std::memcpy(localBuffer.get(), sharedBuffer.get(), localRead);
bufferReady = false;
condition.notify_one();
queuePair = std::move(bufferQueue.front());
bufferQueue.pop();
}
const bool writeGood = localRead != -1 && dest.write(localBuffer.get(), localRead);
if (!writeGood) { break; }
i += localRead;
auto &[buffer, bufferSize] = queuePair;
dest.write(buffer.get(), bufferSize);
i += bufferSize;
if (task) { task->update_current(static_cast<double>(i)); }
}
@ -223,35 +197,28 @@ void fs::copy_zip_to_directory(fs::MiniUnzip &unzip, const fslib::Path &dest, in
task->reset(static_cast<double>(fileSize));
}
auto sharedData = std::make_shared<UnzipReadStruct>();
sharedData->sharedBuffer = std::make_unique<sys::Byte[]>(SIZE_UNZIP_BUFFER);
sharedData->unzip = &unzip;
auto sharedData = std::make_shared<UnzipReadStruct>();
sharedData->unzip = &unzip;
auto localBuffer = std::make_unique<sys::Byte[]>(SIZE_UNZIP_BUFFER);
std::mutex &lock = sharedData->lock;
std::condition_variable &condition = sharedData->condition;
ssize_t &readSize = sharedData->readSize;
bool &bufferReady = sharedData->bufferReady;
std::unique_ptr<sys::Byte[]> &sharedBuffer = sharedData->sharedBuffer;
std::mutex &lock = sharedData->lock;
auto &bufferQueue = sharedData->bufferQueue;
int64_t journalCount{};
sys::threadpool::push_job(unzip_read_thread_function, sharedData);
for (int64_t i = 0; i < fileSize;)
{
ssize_t localRead{};
QueuePair queuePair{};
{
std::unique_lock<std::mutex> bufferLock(lock);
condition.wait(bufferLock, [&]() { return bufferReady == true; });
std::lock_guard queueGuard{lock};
if (bufferQueue.empty()) { continue; }
localRead = readSize;
std::memcpy(localBuffer.get(), sharedBuffer.get(), localRead);
bufferReady = false;
condition.notify_one();
queuePair = std::move(bufferQueue.front());
bufferQueue.pop();
}
const bool commitNeeded = needCommits && journalCount + localRead >= journalSize;
auto &[buffer, bufferSize] = queuePair;
const bool commitNeeded = needCommits && journalCount + bufferSize >= journalSize;
if (commitNeeded)
{
destFile.close();
@ -262,11 +229,12 @@ void fs::copy_zip_to_directory(fs::MiniUnzip &unzip, const fslib::Path &dest, in
destFile.seek(i, destFile.BEGINNING);
journalCount = 0;
}
const bool goodWrite = localRead != -1 && destFile.write(localBuffer.get(), localRead);
error::fslib(goodWrite); // This will catch the error. To do: Recovery.
i += localRead;
journalCount += localRead;
destFile.write(buffer.get(), bufferSize);
i += bufferSize;
journalCount += bufferSize;
if (task) { task->update_current(static_cast<double>(i)); }
}
destFile.close();