mirror of
https://github.com/smogon/pokemon-showdown.git
synced 2026-03-21 17:25:10 -05:00
873 lines
25 KiB
TypeScript
873 lines
25 KiB
TypeScript
/**
|
|
* Streams
|
|
* Pokemon Showdown - http://pokemonshowdown.com/
|
|
*
|
|
* The Node.js standard library's Streams are really hard to use. This
|
|
* offers a better stream API.
|
|
*
|
|
* Documented in STREAMS.md.
|
|
*
|
|
* @license MIT
|
|
*/
|
|
|
|
const BUF_SIZE = 65536 * 4;
|
|
|
|
type BufferEncoding =
|
|
'ascii' | 'utf8' | 'utf-8' | 'utf16le' | 'ucs2' | 'ucs-2' | 'base64' | 'latin1' | 'binary' | 'hex';
|
|
|
|
export class ReadStream {
|
|
buf: Buffer;
|
|
bufStart: number;
|
|
bufEnd: number;
|
|
bufCapacity: number;
|
|
readSize: number;
|
|
atEOF: boolean;
|
|
errorBuf: Error[] | null;
|
|
encoding: BufferEncoding;
|
|
isReadable: boolean;
|
|
isWritable: boolean;
|
|
nodeReadableStream: NodeJS.ReadableStream | null;
|
|
nextPushResolver: (() => void) | null;
|
|
nextPush: Promise<void>;
|
|
awaitingPush: boolean;
|
|
|
|
constructor(optionsOrStreamLike: { [k: string]: any } | NodeJS.ReadableStream | string | Buffer = {}) {
|
|
this.buf = Buffer.allocUnsafe(BUF_SIZE);
|
|
this.bufStart = 0;
|
|
this.bufEnd = 0;
|
|
this.bufCapacity = BUF_SIZE;
|
|
this.readSize = 0;
|
|
this.atEOF = false;
|
|
this.errorBuf = null;
|
|
this.encoding = 'utf8';
|
|
this.isReadable = true;
|
|
this.isWritable = false;
|
|
this.nodeReadableStream = null;
|
|
this.nextPushResolver = null;
|
|
this.nextPush = new Promise(resolve => {
|
|
this.nextPushResolver = resolve;
|
|
});
|
|
this.awaitingPush = false;
|
|
|
|
let options: { [k: string]: any };
|
|
if (typeof optionsOrStreamLike === 'string') {
|
|
options = { buffer: optionsOrStreamLike };
|
|
} else if (optionsOrStreamLike instanceof Buffer) {
|
|
options = { buffer: optionsOrStreamLike };
|
|
} else if (typeof (optionsOrStreamLike as any)._readableState === 'object') {
|
|
options = { nodeStream: optionsOrStreamLike as NodeJS.ReadableStream };
|
|
} else {
|
|
options = optionsOrStreamLike;
|
|
}
|
|
if (options.nodeStream) {
|
|
const nodeStream: NodeJS.ReadableStream = options.nodeStream;
|
|
this.nodeReadableStream = nodeStream;
|
|
nodeStream.on('data', data => {
|
|
this.push(data);
|
|
});
|
|
nodeStream.on('end', () => {
|
|
this.pushEnd();
|
|
});
|
|
|
|
options.read = function (this: ReadStream, unusedBytes: number) {
|
|
this.nodeReadableStream!.resume();
|
|
};
|
|
|
|
options.pause = function (this: ReadStream, unusedBytes: number) {
|
|
this.nodeReadableStream!.pause();
|
|
};
|
|
}
|
|
|
|
if (options.read) this._read = options.read;
|
|
if (options.pause) this._pause = options.pause;
|
|
if (options.destroy) this._destroy = options.destroy;
|
|
if (options.encoding) this.encoding = options.encoding;
|
|
if (options.buffer !== undefined) {
|
|
this.push(options.buffer);
|
|
this.pushEnd();
|
|
}
|
|
}
|
|
|
|
get bufSize() {
|
|
return this.bufEnd - this.bufStart;
|
|
}
|
|
|
|
moveBuf() {
|
|
if (this.bufStart !== this.bufEnd) {
|
|
this.buf.copy(this.buf, 0, this.bufStart, this.bufEnd);
|
|
}
|
|
this.bufEnd -= this.bufStart;
|
|
this.bufStart = 0;
|
|
}
|
|
|
|
expandBuf(newCapacity = this.bufCapacity * 2) {
|
|
const newBuf = Buffer.allocUnsafe(newCapacity);
|
|
this.buf.copy(newBuf, 0, this.bufStart, this.bufEnd);
|
|
this.bufEnd -= this.bufStart;
|
|
this.bufStart = 0;
|
|
this.bufCapacity = newCapacity;
|
|
this.buf = newBuf;
|
|
}
|
|
|
|
ensureCapacity(additionalCapacity: number) {
|
|
if (this.bufEnd + additionalCapacity <= this.bufCapacity) return;
|
|
const capacity = this.bufEnd - this.bufStart + additionalCapacity;
|
|
if (capacity <= this.bufCapacity) {
|
|
return this.moveBuf();
|
|
}
|
|
let newCapacity = this.bufCapacity * 2;
|
|
while (newCapacity < capacity) newCapacity *= 2;
|
|
this.expandBuf(newCapacity);
|
|
}
|
|
|
|
push(buf: Buffer | string, encoding: BufferEncoding = this.encoding) {
|
|
let size;
|
|
if (this.atEOF) return;
|
|
if (typeof buf === 'string') {
|
|
size = Buffer.byteLength(buf, encoding);
|
|
this.ensureCapacity(size);
|
|
this.buf.write(buf, this.bufEnd);
|
|
} else {
|
|
size = buf.length;
|
|
this.ensureCapacity(size);
|
|
buf.copy(this.buf, this.bufEnd);
|
|
}
|
|
this.bufEnd += size;
|
|
if (this.bufSize > this.readSize && size * 2 < this.bufSize) this._pause();
|
|
this.resolvePush();
|
|
}
|
|
|
|
pushEnd() {
|
|
this.atEOF = true;
|
|
this.resolvePush();
|
|
}
|
|
|
|
pushError(err: Error, recoverable?: boolean) {
|
|
if (!this.errorBuf) this.errorBuf = [];
|
|
this.errorBuf.push(err);
|
|
if (!recoverable) this.atEOF = true;
|
|
this.resolvePush();
|
|
}
|
|
|
|
readError() {
|
|
if (this.errorBuf) {
|
|
const err = this.errorBuf.shift()!;
|
|
if (!this.errorBuf.length) this.errorBuf = null;
|
|
throw err;
|
|
}
|
|
}
|
|
|
|
peekError() {
|
|
if (this.errorBuf) {
|
|
throw this.errorBuf[0];
|
|
}
|
|
}
|
|
|
|
resolvePush() {
|
|
if (!this.nextPushResolver) throw new Error(`Push after end of read stream`);
|
|
this.nextPushResolver();
|
|
if (this.atEOF) {
|
|
this.nextPushResolver = null;
|
|
return;
|
|
}
|
|
this.nextPush = new Promise(resolve => {
|
|
this.nextPushResolver = resolve;
|
|
});
|
|
}
|
|
|
|
_read(size = 0): void | Promise<void> {
|
|
throw new Error(`ReadStream needs to be subclassed and the _read function needs to be implemented.`);
|
|
}
|
|
|
|
_destroy(): void | Promise<void> {}
|
|
_pause() {}
|
|
|
|
/**
|
|
* Reads until the internal buffer is non-empty. Does nothing if the
|
|
* internal buffer is already non-empty.
|
|
*
|
|
* If `byteCount` is a number, instead read until the internal buffer
|
|
* contains at least `byteCount` bytes.
|
|
*
|
|
* If `byteCount` is `true`, reads even if the internal buffer is
|
|
* non-empty.
|
|
*/
|
|
loadIntoBuffer(byteCount: number | null | true = null, readError?: boolean) {
|
|
this[readError ? 'readError' : 'peekError']();
|
|
if (byteCount === 0) return;
|
|
this.readSize = Math.max(
|
|
byteCount === true ? this.bufSize + 1 : byteCount === null ? 1 : byteCount,
|
|
this.readSize
|
|
);
|
|
if (!this.errorBuf && !this.atEOF && this.bufSize < this.readSize) {
|
|
let bytes: number | null = this.readSize - this.bufSize;
|
|
if (bytes === Infinity || byteCount === null || byteCount === true) bytes = null;
|
|
return this.doLoad(bytes, readError);
|
|
}
|
|
}
|
|
|
|
async doLoad(chunkSize?: number | null, readError?: boolean) {
|
|
while (!this.errorBuf && !this.atEOF && this.bufSize < this.readSize) {
|
|
if (chunkSize) void this._read(chunkSize);
|
|
else void this._read();
|
|
await this.nextPush;
|
|
this[readError ? 'readError' : 'peekError']();
|
|
}
|
|
}
|
|
|
|
peek(byteCount?: number | null, encoding?: BufferEncoding): string | null | Promise<string | null>;
|
|
peek(encoding: BufferEncoding): string | null | Promise<string | null>;
|
|
peek(byteCount: number | string | null = null, encoding: BufferEncoding = this.encoding) {
|
|
if (typeof byteCount === 'string') {
|
|
encoding = byteCount as BufferEncoding;
|
|
byteCount = null;
|
|
}
|
|
const maybeLoad = this.loadIntoBuffer(byteCount);
|
|
if (maybeLoad) return maybeLoad.then(() => this.peek(byteCount as number, encoding));
|
|
|
|
if (!this.bufSize && byteCount !== 0) return null;
|
|
if (byteCount === null) return this.buf.toString(encoding, this.bufStart, this.bufEnd);
|
|
if (byteCount > this.bufSize) byteCount = this.bufSize;
|
|
return this.buf.toString(encoding, this.bufStart, this.bufStart + byteCount);
|
|
}
|
|
|
|
peekBuffer(byteCount: number | null = null): Buffer | null | Promise<Buffer | null> {
|
|
const maybeLoad = this.loadIntoBuffer(byteCount);
|
|
if (maybeLoad) return maybeLoad.then(() => this.peekBuffer(byteCount));
|
|
|
|
if (!this.bufSize && byteCount !== 0) return null;
|
|
if (byteCount === null) return this.buf.slice(this.bufStart, this.bufEnd);
|
|
if (byteCount > this.bufSize) byteCount = this.bufSize;
|
|
return this.buf.slice(this.bufStart, this.bufStart + byteCount);
|
|
}
|
|
|
|
async read(byteCount?: number | null, encoding?: BufferEncoding): Promise<string | null>;
|
|
async read(encoding: BufferEncoding): Promise<string | null>;
|
|
async read(byteCount: number | string | null = null, encoding: BufferEncoding = this.encoding) {
|
|
if (typeof byteCount === 'string') {
|
|
encoding = byteCount as BufferEncoding;
|
|
byteCount = null;
|
|
}
|
|
await this.loadIntoBuffer(byteCount, true);
|
|
|
|
// This MUST NOT be awaited: we MUST synchronously clear byteCount after peeking
|
|
// if the buffer is written to after peek but before clearing the buffer, the write
|
|
// will be lost forever
|
|
const out = this.peek(byteCount, encoding);
|
|
if (out && typeof out !== 'string') {
|
|
throw new Error("Race condition; you must not read before a previous read has completed");
|
|
}
|
|
|
|
if (byteCount === null || byteCount >= this.bufSize) {
|
|
this.bufStart = 0;
|
|
this.bufEnd = 0;
|
|
this.readSize = 0;
|
|
} else {
|
|
this.bufStart += byteCount;
|
|
this.readSize -= byteCount;
|
|
}
|
|
return out;
|
|
}
|
|
|
|
byChunk(byteCount?: number | null) {
|
|
// eslint-disable-next-line @typescript-eslint/no-this-alias
|
|
const byteStream = this;
|
|
return new ObjectReadStream<string>({
|
|
async read(this: ObjectReadStream<string>) {
|
|
const next = await byteStream.read(byteCount);
|
|
if (typeof next === 'string') this.push(next);
|
|
else this.pushEnd();
|
|
},
|
|
});
|
|
}
|
|
|
|
byLine() {
|
|
// eslint-disable-next-line @typescript-eslint/no-this-alias
|
|
const byteStream = this;
|
|
return new ObjectReadStream<string>({
|
|
async read(this: ObjectReadStream<string>) {
|
|
const next = await byteStream.readLine();
|
|
if (typeof next === 'string') this.push(next);
|
|
else this.pushEnd();
|
|
},
|
|
});
|
|
}
|
|
|
|
delimitedBy(delimiter: string) {
|
|
// eslint-disable-next-line @typescript-eslint/no-this-alias
|
|
const byteStream = this;
|
|
return new ObjectReadStream<string>({
|
|
async read(this: ObjectReadStream<string>) {
|
|
const next = await byteStream.readDelimitedBy(delimiter);
|
|
if (typeof next === 'string') this.push(next);
|
|
else this.pushEnd();
|
|
},
|
|
});
|
|
}
|
|
|
|
async readBuffer(byteCount: number | null = null) {
|
|
await this.loadIntoBuffer(byteCount, true);
|
|
|
|
// This MUST NOT be awaited: we must synchronously clear the buffer after peeking
|
|
// (see `read`)
|
|
const out = this.peekBuffer(byteCount);
|
|
if (out && (out as Promise<unknown>).then) {
|
|
throw new Error("Race condition; you must not read before a previous read has completed");
|
|
}
|
|
|
|
if (byteCount === null || byteCount >= this.bufSize) {
|
|
this.bufStart = 0;
|
|
this.bufEnd = 0;
|
|
} else {
|
|
this.bufStart += byteCount;
|
|
}
|
|
return out;
|
|
}
|
|
|
|
async indexOf(symbol: string, encoding: BufferEncoding = this.encoding) {
|
|
let idx = this.buf.indexOf(symbol, this.bufStart, encoding);
|
|
while (!this.atEOF && (idx >= this.bufEnd || idx < 0)) {
|
|
await this.loadIntoBuffer(true);
|
|
idx = this.buf.indexOf(symbol, this.bufStart, encoding);
|
|
}
|
|
if (idx >= this.bufEnd) return -1;
|
|
return idx - this.bufStart;
|
|
}
|
|
|
|
async readAll(encoding: BufferEncoding = this.encoding) {
|
|
return (await this.read(Infinity, encoding)) || '';
|
|
}
|
|
|
|
peekAll(encoding: BufferEncoding = this.encoding) {
|
|
return this.peek(Infinity, encoding);
|
|
}
|
|
|
|
async readDelimitedBy(symbol: string, encoding: BufferEncoding = this.encoding) {
|
|
if (this.atEOF && !this.bufSize) return null;
|
|
const idx = await this.indexOf(symbol, encoding);
|
|
if (idx < 0) {
|
|
return this.readAll(encoding);
|
|
} else {
|
|
const out = await this.read(idx, encoding);
|
|
this.bufStart += Buffer.byteLength(symbol, 'utf8');
|
|
return out;
|
|
}
|
|
}
|
|
|
|
async readLine(encoding: BufferEncoding = this.encoding) {
|
|
if (!encoding) throw new Error(`readLine must have an encoding`);
|
|
let line = await this.readDelimitedBy('\n', encoding);
|
|
if (line?.endsWith('\r')) line = line.slice(0, -1);
|
|
return line;
|
|
}
|
|
|
|
destroy() {
|
|
this.atEOF = true;
|
|
this.bufStart = 0;
|
|
this.bufEnd = 0;
|
|
if (this.nextPushResolver) this.resolvePush();
|
|
return this._destroy();
|
|
}
|
|
|
|
async next(byteCount: number | null = null) {
|
|
const value = await this.read(byteCount);
|
|
return { value, done: value === null } as { value: string, done: false } | { value: null, done: true };
|
|
}
|
|
|
|
async pipeTo(outStream: WriteStream, options: { noEnd?: boolean } = {}) {
|
|
let next;
|
|
while ((next = await this.next(), !next.done)) {
|
|
await outStream.write(next.value);
|
|
}
|
|
if (!options.noEnd) return outStream.writeEnd();
|
|
}
|
|
}
|
|
|
|
interface WriteStreamOptions {
|
|
nodeStream?: NodeJS.WritableStream;
|
|
write?: (this: WriteStream, data: string | Buffer) => (Promise<undefined> | undefined | void);
|
|
writeEnd?: (this: WriteStream) => Promise<any>;
|
|
}
|
|
|
|
export class WriteStream {
|
|
isReadable: boolean;
|
|
isWritable: true;
|
|
encoding: BufferEncoding;
|
|
nodeWritableStream: NodeJS.WritableStream | null;
|
|
drainListeners: (() => void)[];
|
|
|
|
constructor(optionsOrStream: WriteStreamOptions | NodeJS.WritableStream = {}) {
|
|
this.isReadable = false;
|
|
this.isWritable = true;
|
|
this.encoding = 'utf8';
|
|
this.nodeWritableStream = null;
|
|
this.drainListeners = [];
|
|
|
|
let options: WriteStreamOptions = optionsOrStream as any;
|
|
if ((options as any)._writableState) {
|
|
options = { nodeStream: optionsOrStream as NodeJS.WritableStream };
|
|
}
|
|
if (options.nodeStream) {
|
|
const nodeStream: NodeJS.WritableStream = options.nodeStream;
|
|
this.nodeWritableStream = nodeStream;
|
|
options.write = function (data: string | Buffer) {
|
|
const result = this.nodeWritableStream!.write(data);
|
|
if (result !== false) return undefined;
|
|
if (!this.drainListeners.length) {
|
|
this.nodeWritableStream!.once('drain', () => {
|
|
for (const listener of this.drainListeners) listener();
|
|
this.drainListeners = [];
|
|
});
|
|
}
|
|
return new Promise(resolve => {
|
|
// `as () => void` is necessary because TypeScript thinks that it should be a function
|
|
// that takes an undefined value as its only parameter: `(value: PromiseLike<undefined> | undefined) => void`
|
|
this.drainListeners.push(resolve as () => void);
|
|
});
|
|
};
|
|
// Prior to Node v10.12.0, attempting to close STDOUT or STDERR will throw
|
|
if (nodeStream !== process.stdout && nodeStream !== process.stderr) {
|
|
options.writeEnd = function () {
|
|
return new Promise<void>(resolve => {
|
|
this.nodeWritableStream!.end(() => resolve());
|
|
});
|
|
};
|
|
}
|
|
}
|
|
|
|
if (options.write) this._write = options.write;
|
|
if (options.writeEnd) this._writeEnd = options.writeEnd;
|
|
}
|
|
|
|
write(chunk: Buffer | string): void | Promise<void> {
|
|
return this._write(chunk);
|
|
}
|
|
|
|
writeLine(chunk: string): void | Promise<void> {
|
|
if (chunk === null) {
|
|
return this.writeEnd();
|
|
}
|
|
return this.write(chunk + '\n');
|
|
}
|
|
|
|
_write(chunk: Buffer | string): void | Promise<void> {
|
|
throw new Error(`WriteStream needs to be subclassed and the _write function needs to be implemented.`);
|
|
}
|
|
|
|
_writeEnd(): void | Promise<void> {}
|
|
|
|
async writeEnd(chunk?: string): Promise<void> {
|
|
if (chunk) {
|
|
await this.write(chunk);
|
|
}
|
|
return this._writeEnd();
|
|
}
|
|
}
|
|
|
|
export class ReadWriteStream extends ReadStream implements WriteStream {
|
|
override isReadable: true;
|
|
override isWritable: true;
|
|
nodeWritableStream: NodeJS.WritableStream | null;
|
|
drainListeners: (() => void)[];
|
|
|
|
constructor(options: AnyObject = {}) {
|
|
super(options);
|
|
this.isReadable = true;
|
|
this.isWritable = true;
|
|
this.nodeWritableStream = null;
|
|
this.drainListeners = [];
|
|
|
|
if (options.nodeStream) {
|
|
const nodeStream: NodeJS.WritableStream = options.nodeStream;
|
|
this.nodeWritableStream = nodeStream;
|
|
options.write = function (data: string | Buffer) {
|
|
const result = this.nodeWritableStream.write(data);
|
|
if (result !== false) return undefined;
|
|
if (!this.drainListeners.length) {
|
|
this.nodeWritableStream.once('drain', () => {
|
|
for (const listener of this.drainListeners) listener();
|
|
this.drainListeners = [];
|
|
});
|
|
}
|
|
return new Promise(resolve => {
|
|
this.drainListeners.push(resolve);
|
|
});
|
|
};
|
|
// Prior to Node v10.12.0, attempting to close STDOUT or STDERR will throw
|
|
if (nodeStream !== process.stdout && nodeStream !== process.stderr) {
|
|
options.writeEnd = function () {
|
|
return new Promise<void>(resolve => {
|
|
this.nodeWritableStream.end(() => resolve());
|
|
});
|
|
};
|
|
}
|
|
}
|
|
if (options.write) this._write = options.write;
|
|
if (options.writeEnd) this._writeEnd = options.writeEnd;
|
|
}
|
|
|
|
write(chunk: Buffer | string): Promise<void> | void {
|
|
return this._write(chunk);
|
|
}
|
|
|
|
writeLine(chunk: string): Promise<void> | void {
|
|
return this.write(chunk + '\n');
|
|
}
|
|
|
|
_write(chunk: Buffer | string): Promise<void> | void {
|
|
throw new Error(`WriteStream needs to be subclassed and the _write function needs to be implemented.`);
|
|
}
|
|
|
|
/**
|
|
* In a ReadWriteStream, `_read` does not need to be implemented,
|
|
* because it's valid for the read stream buffer to be filled only by
|
|
* `_write`.
|
|
*/
|
|
override _read(size?: number) {}
|
|
|
|
_writeEnd(): void | Promise<void> {}
|
|
|
|
async writeEnd() {
|
|
return this._writeEnd();
|
|
}
|
|
}
|
|
|
|
type ObjectReadStreamOptions<T> = {
|
|
buffer?: T[],
|
|
read?: (this: ObjectReadStream<T>) => void | Promise<void>,
|
|
pause?: (this: ObjectReadStream<T>) => void | Promise<void>,
|
|
destroy?: (this: ObjectReadStream<T>) => void | Promise<void>,
|
|
nodeStream?: undefined,
|
|
} | {
|
|
buffer?: undefined,
|
|
read?: undefined,
|
|
pause?: undefined,
|
|
destroy?: undefined,
|
|
nodeStream: NodeJS.ReadableStream,
|
|
};
|
|
|
|
export class ObjectReadStream<T> {
|
|
buf: T[];
|
|
readSize: number;
|
|
atEOF: boolean;
|
|
errorBuf: Error[] | null;
|
|
isReadable: boolean;
|
|
isWritable: boolean;
|
|
nodeReadableStream: NodeJS.ReadableStream | null;
|
|
nextPushResolver: (() => void) | null;
|
|
nextPush: Promise<void>;
|
|
awaitingPush: boolean;
|
|
|
|
constructor(optionsOrStreamLike: ObjectReadStreamOptions<T> | NodeJS.ReadableStream | T[] = {}) {
|
|
this.buf = [];
|
|
this.readSize = 0;
|
|
this.atEOF = false;
|
|
this.errorBuf = null;
|
|
this.isReadable = true;
|
|
this.isWritable = false;
|
|
this.nodeReadableStream = null;
|
|
this.nextPushResolver = null;
|
|
this.nextPush = new Promise(resolve => {
|
|
this.nextPushResolver = resolve;
|
|
});
|
|
this.awaitingPush = false;
|
|
|
|
let options: ObjectReadStreamOptions<T>;
|
|
if (Array.isArray(optionsOrStreamLike)) {
|
|
options = { buffer: optionsOrStreamLike };
|
|
} else if (typeof (optionsOrStreamLike as any)._readableState === 'object') {
|
|
options = { nodeStream: optionsOrStreamLike as NodeJS.ReadableStream };
|
|
} else {
|
|
options = optionsOrStreamLike as ObjectReadStreamOptions<T>;
|
|
}
|
|
if ((options as any).nodeStream) {
|
|
const nodeStream: NodeJS.ReadableStream = (options as any).nodeStream;
|
|
this.nodeReadableStream = nodeStream;
|
|
nodeStream.on('data', data => {
|
|
this.push(data);
|
|
});
|
|
nodeStream.on('end', () => {
|
|
this.pushEnd();
|
|
});
|
|
|
|
options = {
|
|
read() {
|
|
this.nodeReadableStream!.resume();
|
|
},
|
|
pause() {
|
|
this.nodeReadableStream!.pause();
|
|
},
|
|
};
|
|
}
|
|
|
|
if (options.read) this._read = options.read;
|
|
if (options.pause) this._pause = options.pause;
|
|
if (options.destroy) this._destroy = options.destroy;
|
|
if (options.buffer !== undefined) {
|
|
this.buf = options.buffer.slice();
|
|
this.pushEnd();
|
|
}
|
|
}
|
|
|
|
push(elem: T) {
|
|
if (this.atEOF) return;
|
|
this.buf.push(elem);
|
|
if (this.buf.length > this.readSize && this.buf.length >= 16) void this._pause();
|
|
this.resolvePush();
|
|
}
|
|
|
|
pushEnd() {
|
|
this.atEOF = true;
|
|
this.resolvePush();
|
|
}
|
|
|
|
pushError(err: Error, recoverable?: boolean) {
|
|
if (!this.errorBuf) this.errorBuf = [];
|
|
this.errorBuf.push(err);
|
|
if (!recoverable) this.atEOF = true;
|
|
this.resolvePush();
|
|
}
|
|
|
|
readError() {
|
|
if (this.errorBuf) {
|
|
const err = this.errorBuf.shift()!;
|
|
if (!this.errorBuf.length) this.errorBuf = null;
|
|
throw err;
|
|
}
|
|
}
|
|
|
|
peekError() {
|
|
if (this.errorBuf) {
|
|
throw this.errorBuf[0];
|
|
}
|
|
}
|
|
|
|
resolvePush() {
|
|
if (!this.nextPushResolver) throw new Error(`Push after end of read stream`);
|
|
this.nextPushResolver();
|
|
if (this.atEOF) {
|
|
this.nextPushResolver = null;
|
|
return;
|
|
}
|
|
this.nextPush = new Promise(resolve => {
|
|
this.nextPushResolver = resolve;
|
|
});
|
|
}
|
|
|
|
_read(size = 0): void | Promise<void> {
|
|
throw new Error(`ReadStream needs to be subclassed and the _read function needs to be implemented.`);
|
|
}
|
|
|
|
_destroy(): void | Promise<void> {}
|
|
_pause(): void | Promise<void> {}
|
|
|
|
async loadIntoBuffer(count: number | true = 1, readError?: boolean) {
|
|
this[readError ? 'readError' : 'peekError']();
|
|
if (count === true) count = this.buf.length + 1;
|
|
if (this.buf.length >= count) return;
|
|
this.readSize = Math.max(count, this.readSize);
|
|
while (!this.errorBuf && !this.atEOF && this.buf.length < this.readSize) {
|
|
const readResult = this._read();
|
|
if (readResult) {
|
|
await readResult;
|
|
} else {
|
|
await this.nextPush;
|
|
}
|
|
this[readError ? 'readError' : 'peekError']();
|
|
}
|
|
}
|
|
|
|
async peek() {
|
|
if (this.buf.length) return this.buf[0];
|
|
await this.loadIntoBuffer();
|
|
return this.buf[0];
|
|
}
|
|
|
|
async read() {
|
|
if (this.buf.length) return this.buf.shift();
|
|
await this.loadIntoBuffer(1, true);
|
|
if (!this.buf.length) return null;
|
|
return this.buf.shift()!;
|
|
}
|
|
|
|
async peekArray(count: number | null = null) {
|
|
await this.loadIntoBuffer(count === null ? 1 : count);
|
|
return this.buf.slice(0, count === null ? Infinity : count);
|
|
}
|
|
|
|
async readArray(count: number | null = null) {
|
|
await this.loadIntoBuffer(count === null ? 1 : count, true);
|
|
const out = this.buf.slice(0, count === null ? Infinity : count);
|
|
this.buf = this.buf.slice(out.length);
|
|
return out;
|
|
}
|
|
|
|
async readAll() {
|
|
await this.loadIntoBuffer(Infinity, true);
|
|
const out = this.buf;
|
|
this.buf = [];
|
|
return out;
|
|
}
|
|
|
|
async peekAll() {
|
|
await this.loadIntoBuffer(Infinity);
|
|
return this.buf.slice();
|
|
}
|
|
|
|
destroy() {
|
|
this.atEOF = true;
|
|
this.buf = [];
|
|
this.resolvePush();
|
|
return this._destroy();
|
|
}
|
|
|
|
[Symbol.asyncIterator]() { return this; }
|
|
async next() {
|
|
if (this.buf.length) return { value: this.buf.shift() as T, done: false as const };
|
|
await this.loadIntoBuffer(1, true);
|
|
if (!this.buf.length) return { value: undefined, done: true as const };
|
|
return { value: this.buf.shift() as T, done: false as const };
|
|
}
|
|
|
|
async pipeTo(outStream: ObjectWriteStream<T>, options: { noEnd?: boolean } = {}) {
|
|
let next;
|
|
while ((next = await this.next(), !next.done)) {
|
|
await outStream.write(next.value);
|
|
}
|
|
if (!options.noEnd) return outStream.writeEnd();
|
|
}
|
|
}
|
|
|
|
interface ObjectWriteStreamOptions<T> {
|
|
_writableState?: any;
|
|
nodeStream?: NodeJS.WritableStream;
|
|
write?: (this: ObjectWriteStream<T>, data: T) => Promise<any> | undefined;
|
|
writeEnd?: (this: ObjectWriteStream<T>) => Promise<any>;
|
|
}
|
|
|
|
export class ObjectWriteStream<T> {
|
|
isReadable: boolean;
|
|
isWritable: true;
|
|
nodeWritableStream: NodeJS.WritableStream | null;
|
|
|
|
constructor(optionsOrStream: ObjectWriteStreamOptions<T> | NodeJS.WritableStream = {}) {
|
|
this.isReadable = false;
|
|
this.isWritable = true;
|
|
this.nodeWritableStream = null;
|
|
|
|
let options: ObjectWriteStreamOptions<T> = optionsOrStream as any;
|
|
if (options._writableState) {
|
|
options = { nodeStream: optionsOrStream as NodeJS.WritableStream };
|
|
}
|
|
if (options.nodeStream) {
|
|
const nodeStream: NodeJS.WritableStream = options.nodeStream;
|
|
this.nodeWritableStream = nodeStream;
|
|
|
|
options.write = function (data: T) {
|
|
const result = this.nodeWritableStream!.write(data as unknown as string);
|
|
if (result === false) {
|
|
return new Promise<void>(resolve => {
|
|
this.nodeWritableStream!.once('drain', () => {
|
|
resolve();
|
|
});
|
|
});
|
|
}
|
|
};
|
|
|
|
// Prior to Node v10.12.0, attempting to close STDOUT or STDERR will throw
|
|
if (nodeStream !== process.stdout && nodeStream !== process.stderr) {
|
|
options.writeEnd = function () {
|
|
return new Promise<void>(resolve => {
|
|
this.nodeWritableStream!.end(() => resolve());
|
|
});
|
|
};
|
|
}
|
|
}
|
|
|
|
if (options.write) this._write = options.write;
|
|
if (options.writeEnd) this._writeEnd = options.writeEnd;
|
|
}
|
|
|
|
write(elem: T | null): void | Promise<void> {
|
|
if (elem === null) {
|
|
return this.writeEnd();
|
|
}
|
|
return this._write(elem);
|
|
}
|
|
|
|
_write(elem: T): void | Promise<void> {
|
|
throw new Error(`WriteStream needs to be subclassed and the _write function needs to be implemented.`);
|
|
}
|
|
|
|
_writeEnd(): void | Promise<void> {}
|
|
|
|
async writeEnd(elem?: T): Promise<void> {
|
|
if (elem !== undefined) {
|
|
await this.write(elem);
|
|
}
|
|
return this._writeEnd();
|
|
}
|
|
}
|
|
|
|
interface ObjectReadWriteStreamOptions<T> {
|
|
read?: (this: ObjectReadStream<T>) => void | Promise<void>;
|
|
pause?: (this: ObjectReadStream<T>) => void | Promise<void>;
|
|
destroy?: (this: ObjectReadStream<T>) => void | Promise<void>;
|
|
write?: (this: ObjectWriteStream<T>, elem: T) => Promise<any> | undefined | void;
|
|
writeEnd?: () => Promise<any> | undefined | void;
|
|
}
|
|
|
|
export class ObjectReadWriteStream<T> extends ObjectReadStream<T> implements ObjectWriteStream<T> {
|
|
override isReadable: true;
|
|
override isWritable: true;
|
|
nodeWritableStream: NodeJS.WritableStream | null;
|
|
|
|
constructor(options: ObjectReadWriteStreamOptions<T> = {}) {
|
|
super(options);
|
|
this.isReadable = true;
|
|
this.isWritable = true;
|
|
this.nodeWritableStream = null;
|
|
if (options.write) this._write = options.write;
|
|
if (options.writeEnd) this._writeEnd = options.writeEnd;
|
|
}
|
|
|
|
write(elem: T): void | Promise<void> {
|
|
return this._write(elem);
|
|
}
|
|
|
|
_write(elem: T): void | Promise<void> {
|
|
throw new Error(`WriteStream needs to be subclassed and the _write function needs to be implemented.`);
|
|
}
|
|
/** In a ReadWriteStream, _read does not need to be implemented. */
|
|
override _read() {}
|
|
|
|
_writeEnd(): void | Promise<void> {}
|
|
|
|
async writeEnd() {
|
|
return this._writeEnd();
|
|
}
|
|
}
|
|
|
|
export function readAll(nodeStream: NodeJS.ReadableStream, encoding?: any) {
|
|
return new ReadStream(nodeStream).readAll(encoding);
|
|
}
|
|
|
|
export function stdin() {
|
|
return new ReadStream(process.stdin);
|
|
}
|
|
|
|
export function stdout() {
|
|
return new WriteStream(process.stdout);
|
|
}
|
|
|
|
export function stdpipe(stream: WriteStream | ReadStream | ReadWriteStream) {
|
|
const promises = [];
|
|
if ((stream as ReadStream | WriteStream & { pipeTo: undefined }).pipeTo) {
|
|
promises.push((stream as ReadStream).pipeTo(stdout()));
|
|
}
|
|
if ((stream as WriteStream | ReadStream & { write: undefined }).write) {
|
|
promises.push(stdin().pipeTo(stream as WriteStream));
|
|
}
|
|
return Promise.all(promises);
|
|
}
|