Improve FS throttle

PS's FS(...).writeUpdate(...) has a `throttle` option.

This changes it so it's possible to call it with the throttle on
sometimes and off sometimes, and "throttle off" will pre-empt "throttle
on" calls.
This commit is contained in:
Guangcong Luo 2018-09-21 16:34:48 -05:00
parent 60075830b6
commit 8c3e75706a

View File

@ -156,32 +156,44 @@ class FSPath {
* No synchronous version because there's no risk of race conditions
* with synchronous code; just use `safeWriteSync`.
*
* DO NOT do anything with the returned Promise; it's not meaningful.
*
* @param {() => string | Buffer} dataFetcher
* @param {Object} options
*/
async writeUpdate(dataFetcher, options = {}) {
writeUpdate(dataFetcher, options = {}) {
if (Config.nofswriting) return;
const pendingUpdate = FS.pendingUpdates.get(this.path);
if (pendingUpdate) {
pendingUpdate[1] = dataFetcher;
pendingUpdate[2] = options;
return;
}
let pendingFetcher = /** @type {(() => string | Buffer)?} */ (dataFetcher);
while (pendingFetcher) {
let updatePromise = this.safeWrite(pendingFetcher(), options);
FS.pendingUpdates.set(this.path, [updatePromise, null, options]);
await updatePromise;
if (options.throttle) {
await new Promise(resolve => setTimeout(resolve, options.throttle));
const [, oldOptions, throttlePromise] = pendingUpdate;
if (!throttlePromise || options.throttle) {
pendingUpdate[0] = dataFetcher;
if (!options.throttle) oldOptions.throttle = 0;
return;
}
const pendingUpdate = FS.pendingUpdates.get(this.path);
if (!pendingUpdate) return;
[updatePromise, pendingFetcher, options] = pendingUpdate;
FS.pendingUpdates.delete(this.path);
}
FS.pendingUpdates.delete(this.path);
(async (dataFetcher, options) => {
let pendingFetcher = /** @type {(() => string | Buffer)?} */ (dataFetcher);
while (pendingFetcher) {
/** @type {PendingUpdate | undefined} */
let pendingUpdate = [null, options, null];
FS.pendingUpdates.set(this.path, pendingUpdate);
await this.safeWrite(pendingFetcher(), options);
pendingUpdate = FS.pendingUpdates.get(this.path);
if (!pendingUpdate) return;
[, options] = pendingUpdate;
if (options.throttle) {
let throttlePromise = new Promise(resolve => setTimeout(resolve, options.throttle));
pendingUpdate[2] = throttlePromise;
await throttlePromise;
let newUpdate = FS.pendingUpdates.get(this.path);
if (newUpdate && newUpdate[2] === throttlePromise) return;
}
pendingUpdate = FS.pendingUpdates.get(this.path);
if (!pendingUpdate) return;
[pendingFetcher, options] = pendingUpdate;
}
FS.pendingUpdates.delete(this.path);
})(dataFetcher, options);
}
/**
* @param {string | Buffer} data
@ -422,11 +434,16 @@ function getFs(path) {
return new FSPath(path);
}
/**
* [updater, options, throttlePromise]
* @typedef {[(() => string | Buffer)?, Object, Promise<void>?]} PendingUpdate
*/
const FS = Object.assign(getFs, {
FileReadStream,
/**
* @type {Map<string, [Promise, (() => string | Buffer)?, Object]>}
* @type {Map<string, PendingUpdate>}
*/
pendingUpdates: new Map(),
});