From df4aeebcd553729302e95b9b366cb41fadc499cc Mon Sep 17 00:00:00 2001 From: Guangcong Luo Date: Sat, 13 Jan 2018 03:59:09 -0600 Subject: [PATCH] Use new Streams for FS streams --- dev-tools/global.d.ts | 4 +++ lib/fs.js | 59 +++++++++++++++++++++++++++++++++++++------ lib/streams.js | 56 ++++++++++------------------------------ roomlogs.js | 20 +++++---------- rooms.js | 2 +- 5 files changed, 77 insertions(+), 64 deletions(-) diff --git a/dev-tools/global.d.ts b/dev-tools/global.d.ts index 12494f03dc..9461de3299 100644 --- a/dev-tools/global.d.ts +++ b/dev-tools/global.d.ts @@ -11,6 +11,7 @@ import * as LadderStoreType from './../ladders-remote' import * as LaddersType from './../ladders' import * as UsersType from './../users' import * as PunishmentsType from './../punishments' +import * as StreamsType from './../lib/streams' declare global { // modules @@ -20,6 +21,9 @@ declare global { const Ladders: typeof LaddersType const LadderStoreT: typeof LadderStoreType + const WriteStream: typeof StreamsType.WriteStream + const ReadStream: typeof StreamsType.ReadStream + // sim const Battle: typeof BattleType const ModdedDex: typeof DexType diff --git a/lib/fs.js b/lib/fs.js index aa0403eedc..2d93f36108 100644 --- a/lib/fs.js +++ b/lib/fs.js @@ -25,6 +25,8 @@ const pathModule = require('path'); const fs = require('fs'); +const Streams = require('./streams'); +const WriteStream = Streams.WriteStream; const ROOT_PATH = pathModule.resolve(__dirname, '..'); @@ -216,26 +218,27 @@ class FSPath { readdirSync() { return fs.readdirSync(this.path); } + createReadStream() { + return new FileReadStream(this.path); + } /** - * @return {NodeJS.WritableStream} + * @return {WriteStream} */ createWriteStream(options = {}) { if (Config.nofswriting) { - const Writable = require('stream').Writable; - return new Writable({write: (chunk, encoding, callback) => callback()}); + return new WriteStream({write() {}}); } - return fs.createWriteStream(this.path, options); + return new WriteStream(fs.createWriteStream(this.path, options)); } /** - * @return {NodeJS.WritableStream} + * @return {WriteStream} */ createAppendStream(options = {}) { if (Config.nofswriting) { - const Writable = require('stream').Writable; - return new Writable({write: (chunk, encoding, callback) => callback()}); + return new WriteStream({write() {}}); } options.flags = options.flags || 'a'; - return fs.createWriteStream(this.path, options); + return new WriteStream(fs.createWriteStream(this.path, options)); } unlinkIfExists() { if (Config.nofswriting) return Promise.resolve(); @@ -344,6 +347,44 @@ class FSPath { } } +class FileReadStream extends Streams.ReadStream { + /** + * @param {string} file + */ + constructor(file) { + super(); + this.fd = new Promise(resolve => { + fs.open(file, 'r', resolve); + }); + this.atEOF = false; + } + _read(size = 0) { + return new Promise((resolve, reject) => { + if (this.atEOF) return resolve(false); + this.ensureCapacity(size); + this.fd.then(fd => { + fs.read(fd, this.buf, this.bufEnd, size, null, (err, bytesRead, buf) => { + if (err) return reject(err); + if (!bytesRead) { + this.atEOF = true; + return resolve(false); + } + this.bufEnd += bytesRead; + // throw new Error([...this.buf].map(x => x.toString(16)).join(' ')); + resolve(true); + }); + }); + }); + } + _destroy() { + return /** @type {Promise} */ (new Promise(resolve => { + this.fd.then(fd => { + fs.close(fd, () => resolve()); + }); + })); + } +} + /** * @param {string} path */ @@ -352,6 +393,8 @@ function getFs(path) { } const FS = Object.assign(getFs, { + FileReadStream, + /** * @type {Map string | Buffer)?, Object]>} */ diff --git a/lib/streams.js b/lib/streams.js index 986d6681ed..daf3919721 100644 --- a/lib/streams.js +++ b/lib/streams.js @@ -284,45 +284,6 @@ class ReadStream { } } -const fs = require('fs'); -class FileReadStream extends ReadStream { - /** - * @param {string} file - */ - constructor(file) { - super(); - this.fd = new Promise(resolve => { - fs.open(file, 'r', resolve); - }); - this.atEOF = false; - } - _read(size = 0) { - return new Promise((resolve, reject) => { - if (this.atEOF) return resolve(false); - this.ensureCapacity(size); - this.fd.then(fd => { - fs.read(fd, this.buf, this.bufEnd, size, null, (err, bytesRead, buf) => { - if (err) return reject(err); - if (!bytesRead) { - this.atEOF = true; - return resolve(false); - } - this.bufEnd += bytesRead; - // throw new Error([...this.buf].map(x => x.toString(16)).join(' ')); - resolve(true); - }); - }); - }); - } - _destroy() { - return /** @type {Promise} */ (new Promise(resolve => { - this.fd.then(fd => { - fs.close(fd, () => resolve()); - }); - })); - } -} - class WriteStream { constructor(options = {}) { this.isReadable = false; @@ -353,9 +314,15 @@ class WriteStream { }); } }; + options.end = function () { + return new Promise(resolve => { + this.nodeWritableStream.end(() => resolve()); + }); + }; } if (options.write) this._write = options.write; + if (options.end) this._end = options.end; } /** * @param {Buffer | string | null} chunk @@ -388,12 +355,18 @@ class WriteStream { throw new Error(`WriteStream needs to be subclassed and the _write function needs to be implemented.`); } async _end() {} - async end() { + /** + * @param {string | null} chunk + * @return {Promise} + */ + async end(chunk = null) { + if (chunk) { + await this.write(chunk); + } return this._end(); } } - class ReadWriteStream extends ReadStream { constructor(options = {}) { super(options); @@ -438,7 +411,6 @@ module.exports = { WriteStream, ReadWriteStream, - FileReadStream, readAll(/** @type {NodeJS.ReadableStream} */ nodeStream, encoding = undefined) { return new ReadStream(nodeStream).readAll(encoding); }, diff --git a/roomlogs.js b/roomlogs.js index 27fceaa499..a8317992fe 100644 --- a/roomlogs.js +++ b/roomlogs.js @@ -60,13 +60,13 @@ class Roomlog { /** * undefined = uninitialized, * null = disabled - * @type {NodeJS.WritableStream? | undefined} + * @type {WriteStream? | undefined} */ this.modlogStream = undefined; /** * undefined = uninitialized, * null = disabled - * @type {NodeJS.WritableStream? | undefined} + * @type {WriteStream? | undefined} */ this.roomlogStream = undefined; @@ -225,25 +225,19 @@ class Roomlog { this.modlogStream = null; } if (this.modlogStream) { - promises.push(new Promise(resolve => { - // @ts-ignore https://github.com/DefinitelyTyped/DefinitelyTyped/pull/22077 - this.modlogStream.end(resolve); - this.modlogStream = null; - })); + promises.push(this.modlogStream.end()); + this.modlogStream = null; } if (this.roomlogStream) { - promises.push(new Promise(resolve => { - // @ts-ignore https://github.com/DefinitelyTyped/DefinitelyTyped/pull/22077 - this.roomlogStream.end(resolve); - this.roomlogStream = null; - })); + promises.push(this.roomlogStream.end()); + this.roomlogStream = null; } Roomlogs.roomlogs.delete(this.id); return Promise.all(promises); } } -/** @type {Map} */ +/** @type {Map} */ const sharedModlogs = new Map(); /** @type {Map} */ diff --git a/rooms.js b/rooms.js index f748ec532c..0db19ee4c2 100644 --- a/rooms.js +++ b/rooms.js @@ -453,7 +453,7 @@ class GlobalRoom extends BasicRoom { } else { // Prevent there from being two possible hidden classes an instance // of GlobalRoom can have. - this.ladderIpLog = new (require('stream')).Writable(); + this.ladderIpLog = new (require('./lib/streams')).WriteStream({write() {}}); } let lastBattle;