Use new Streams for FS streams

This commit is contained in:
Guangcong Luo 2018-01-13 03:59:09 -06:00
parent 4969fb0790
commit df4aeebcd5
5 changed files with 77 additions and 64 deletions

View File

@ -11,6 +11,7 @@ import * as LadderStoreType from './../ladders-remote'
import * as LaddersType from './../ladders'
import * as UsersType from './../users'
import * as PunishmentsType from './../punishments'
import * as StreamsType from './../lib/streams'
declare global {
// modules
@ -20,6 +21,9 @@ declare global {
const Ladders: typeof LaddersType
const LadderStoreT: typeof LadderStoreType
const WriteStream: typeof StreamsType.WriteStream
const ReadStream: typeof StreamsType.ReadStream
// sim
const Battle: typeof BattleType
const ModdedDex: typeof DexType

View File

@ -25,6 +25,8 @@
const pathModule = require('path');
const fs = require('fs');
const Streams = require('./streams');
const WriteStream = Streams.WriteStream;
const ROOT_PATH = pathModule.resolve(__dirname, '..');
@ -216,26 +218,27 @@ class FSPath {
readdirSync() {
return fs.readdirSync(this.path);
}
createReadStream() {
return new FileReadStream(this.path);
}
/**
* @return {NodeJS.WritableStream}
* @return {WriteStream}
*/
createWriteStream(options = {}) {
if (Config.nofswriting) {
const Writable = require('stream').Writable;
return new Writable({write: (chunk, encoding, callback) => callback()});
return new WriteStream({write() {}});
}
return fs.createWriteStream(this.path, options);
return new WriteStream(fs.createWriteStream(this.path, options));
}
/**
* @return {NodeJS.WritableStream}
* @return {WriteStream}
*/
createAppendStream(options = {}) {
if (Config.nofswriting) {
const Writable = require('stream').Writable;
return new Writable({write: (chunk, encoding, callback) => callback()});
return new WriteStream({write() {}});
}
options.flags = options.flags || 'a';
return fs.createWriteStream(this.path, options);
return new WriteStream(fs.createWriteStream(this.path, options));
}
unlinkIfExists() {
if (Config.nofswriting) return Promise.resolve();
@ -344,6 +347,44 @@ class FSPath {
}
}
class FileReadStream extends Streams.ReadStream {
/**
* @param {string} file
*/
constructor(file) {
super();
this.fd = new Promise(resolve => {
fs.open(file, 'r', resolve);
});
this.atEOF = false;
}
_read(size = 0) {
return new Promise((resolve, reject) => {
if (this.atEOF) return resolve(false);
this.ensureCapacity(size);
this.fd.then(fd => {
fs.read(fd, this.buf, this.bufEnd, size, null, (err, bytesRead, buf) => {
if (err) return reject(err);
if (!bytesRead) {
this.atEOF = true;
return resolve(false);
}
this.bufEnd += bytesRead;
// throw new Error([...this.buf].map(x => x.toString(16)).join(' '));
resolve(true);
});
});
});
}
_destroy() {
return /** @type {Promise<void>} */ (new Promise(resolve => {
this.fd.then(fd => {
fs.close(fd, () => resolve());
});
}));
}
}
/**
* @param {string} path
*/
@ -352,6 +393,8 @@ function getFs(path) {
}
const FS = Object.assign(getFs, {
FileReadStream,
/**
* @type {Map<string, [Promise, (() => string | Buffer)?, Object]>}
*/

View File

@ -284,45 +284,6 @@ class ReadStream {
}
}
const fs = require('fs');
class FileReadStream extends ReadStream {
/**
* @param {string} file
*/
constructor(file) {
super();
this.fd = new Promise(resolve => {
fs.open(file, 'r', resolve);
});
this.atEOF = false;
}
_read(size = 0) {
return new Promise((resolve, reject) => {
if (this.atEOF) return resolve(false);
this.ensureCapacity(size);
this.fd.then(fd => {
fs.read(fd, this.buf, this.bufEnd, size, null, (err, bytesRead, buf) => {
if (err) return reject(err);
if (!bytesRead) {
this.atEOF = true;
return resolve(false);
}
this.bufEnd += bytesRead;
// throw new Error([...this.buf].map(x => x.toString(16)).join(' '));
resolve(true);
});
});
});
}
_destroy() {
return /** @type {Promise<void>} */ (new Promise(resolve => {
this.fd.then(fd => {
fs.close(fd, () => resolve());
});
}));
}
}
class WriteStream {
constructor(options = {}) {
this.isReadable = false;
@ -353,9 +314,15 @@ class WriteStream {
});
}
};
options.end = function () {
return new Promise(resolve => {
this.nodeWritableStream.end(() => resolve());
});
};
}
if (options.write) this._write = options.write;
if (options.end) this._end = options.end;
}
/**
* @param {Buffer | string | null} chunk
@ -388,12 +355,18 @@ class WriteStream {
throw new Error(`WriteStream needs to be subclassed and the _write function needs to be implemented.`);
}
async _end() {}
async end() {
/**
* @param {string | null} chunk
* @return {Promise<void>}
*/
async end(chunk = null) {
if (chunk) {
await this.write(chunk);
}
return this._end();
}
}
class ReadWriteStream extends ReadStream {
constructor(options = {}) {
super(options);
@ -438,7 +411,6 @@ module.exports = {
WriteStream,
ReadWriteStream,
FileReadStream,
readAll(/** @type {NodeJS.ReadableStream} */ nodeStream, encoding = undefined) {
return new ReadStream(nodeStream).readAll(encoding);
},

View File

@ -60,13 +60,13 @@ class Roomlog {
/**
* undefined = uninitialized,
* null = disabled
* @type {NodeJS.WritableStream? | undefined}
* @type {WriteStream? | undefined}
*/
this.modlogStream = undefined;
/**
* undefined = uninitialized,
* null = disabled
* @type {NodeJS.WritableStream? | undefined}
* @type {WriteStream? | undefined}
*/
this.roomlogStream = undefined;
@ -225,25 +225,19 @@ class Roomlog {
this.modlogStream = null;
}
if (this.modlogStream) {
promises.push(new Promise(resolve => {
// @ts-ignore https://github.com/DefinitelyTyped/DefinitelyTyped/pull/22077
this.modlogStream.end(resolve);
this.modlogStream = null;
}));
promises.push(this.modlogStream.end());
this.modlogStream = null;
}
if (this.roomlogStream) {
promises.push(new Promise(resolve => {
// @ts-ignore https://github.com/DefinitelyTyped/DefinitelyTyped/pull/22077
this.roomlogStream.end(resolve);
this.roomlogStream = null;
}));
promises.push(this.roomlogStream.end());
this.roomlogStream = null;
}
Roomlogs.roomlogs.delete(this.id);
return Promise.all(promises);
}
}
/** @type {Map<string, NodeJS.WritableStream>} */
/** @type {Map<string, WriteStream>} */
const sharedModlogs = new Map();
/** @type {Map<string, Roomlog>} */

View File

@ -453,7 +453,7 @@ class GlobalRoom extends BasicRoom {
} else {
// Prevent there from being two possible hidden classes an instance
// of GlobalRoom can have.
this.ladderIpLog = new (require('stream')).Writable();
this.ladderIpLog = new (require('./lib/streams')).WriteStream({write() {}});
}
let lastBattle;