mirror of
https://github.com/smogon/pokemon-showdown.git
synced 2026-03-21 17:25:10 -05:00
782 lines
22 KiB
TypeScript
782 lines
22 KiB
TypeScript
/**
|
|
* Process Manager
|
|
* Pokemon Showdown - http://pokemonshowdown.com/
|
|
*
|
|
* This file abstract out multiprocess logic involved in several tasks.
|
|
*
|
|
* Child processes can be queried.
|
|
*
|
|
* @license MIT
|
|
*/
|
|
|
|
import * as child_process from 'child_process';
|
|
import * as cluster from 'cluster';
|
|
import * as path from 'path';
|
|
import * as Streams from './streams';
|
|
import { FS } from './fs';
|
|
|
|
type ChildProcess = child_process.ChildProcess;
|
|
type Worker = cluster.Worker;
|
|
|
|
export const processManagers: ProcessManager[] = [];
|
|
|
|
export function exec(
|
|
args: string, execOptions?: child_process.ExecOptions
|
|
): Promise<{ stderr: string, stdout: string }>;
|
|
export function exec(
|
|
args: [string, ...string[]], execOptions?: child_process.ExecFileOptions
|
|
): Promise<{ stderr: string, stdout: string }>;
|
|
export function exec(args: string | string[], execOptions?: AnyObject) {
|
|
if (Array.isArray(args)) {
|
|
const cmd = args.shift();
|
|
if (!cmd) throw new Error(`You must pass a command to ProcessManager.exec.`);
|
|
return new Promise<{ stderr: string, stdout: string }>((resolve, reject) => {
|
|
child_process.execFile(cmd, args, execOptions, (err, stdout, stderr) => {
|
|
if (err) reject(err);
|
|
if (typeof stdout !== 'string') stdout = stdout.toString();
|
|
if (typeof stderr !== 'string') stderr = stderr.toString();
|
|
resolve({ stdout, stderr });
|
|
});
|
|
});
|
|
} else {
|
|
return new Promise<string>((resolve, reject) => {
|
|
child_process.exec(args, execOptions, (error, stdout, stderr) => {
|
|
if (error) reject(error);
|
|
if (typeof stdout !== 'string') stdout = stdout.toString();
|
|
resolve(stdout);
|
|
});
|
|
});
|
|
}
|
|
}
|
|
|
|
class SubprocessStream extends Streams.ObjectReadWriteStream<string> {
|
|
process: StreamProcessWrapper;
|
|
taskId: number;
|
|
constructor(process: StreamProcessWrapper, taskId: number) {
|
|
super();
|
|
this.process = process;
|
|
this.taskId = taskId;
|
|
this.process.process.send(`${taskId}\nNEW`);
|
|
}
|
|
override _write(message: string) {
|
|
if (!this.process.process.connected) {
|
|
this.pushError(new Error(`Process disconnected (possibly crashed?)`));
|
|
return;
|
|
}
|
|
this.process.process.send(`${this.taskId}\nWRITE\n${message}`);
|
|
// responses are handled in ProcessWrapper
|
|
}
|
|
override _writeEnd() {
|
|
this.process.process.send(`${this.taskId}\nWRITEEND`);
|
|
}
|
|
override _destroy() {
|
|
if (!this.process.process.connected) return;
|
|
this.process.process.send(`${this.taskId}\nDESTROY`);
|
|
this.process.deleteStream(this.taskId);
|
|
this.process = null!;
|
|
}
|
|
}
|
|
|
|
class RawSubprocessStream extends Streams.ObjectReadWriteStream<string> {
|
|
process: RawProcessWrapper;
|
|
constructor(process: RawProcessWrapper) {
|
|
super();
|
|
this.process = process;
|
|
}
|
|
override _write(message: string) {
|
|
if (!this.process.getProcess().connected) {
|
|
// no error because the crash handler should already have shown an error, and
|
|
// sometimes harmless messages are sent during cleanup
|
|
return;
|
|
}
|
|
this.process.process.send(message);
|
|
// responses are handled in ProcessWrapper
|
|
}
|
|
}
|
|
|
|
export interface ProcessWrapper {
|
|
getLoad: () => number;
|
|
process: ChildProcess | Worker;
|
|
release: () => Promise<void>;
|
|
getProcess: () => ChildProcess;
|
|
}
|
|
|
|
/** Wraps the process object in the PARENT process. */
|
|
export class QueryProcessWrapper<T, U> implements ProcessWrapper {
|
|
process: ChildProcess;
|
|
taskId: number;
|
|
pendingTasks: Map<number, (resp: U) => void>;
|
|
messageCallback: ((message: string) => any) | null;
|
|
pendingRelease: Promise<void> | null;
|
|
resolveRelease: (() => void) | null;
|
|
debug?: string;
|
|
file: string;
|
|
|
|
constructor(file: string, messageCallback?: (message: string) => any) {
|
|
this.process = child_process.fork(file, [], { cwd: FS.ROOT_PATH });
|
|
this.taskId = 0;
|
|
this.file = file;
|
|
this.pendingTasks = new Map();
|
|
this.pendingRelease = null;
|
|
this.resolveRelease = null;
|
|
this.messageCallback = messageCallback || null;
|
|
|
|
this.process.on('message', (message: string) => {
|
|
if (message.startsWith('THROW\n')) {
|
|
const error = new Error();
|
|
error.stack = message.slice(6);
|
|
throw error;
|
|
}
|
|
|
|
if (message.startsWith('DEBUG\n')) {
|
|
this.debug = message.slice(6);
|
|
return;
|
|
}
|
|
|
|
if (this.messageCallback && message.startsWith(`CALLBACK\n`)) {
|
|
this.messageCallback(message.slice(9));
|
|
return;
|
|
}
|
|
|
|
const nlLoc = message.indexOf('\n');
|
|
if (nlLoc <= 0) throw new Error(`Invalid response ${message}`);
|
|
const taskId = parseInt(message.slice(0, nlLoc));
|
|
const resolve = this.pendingTasks.get(taskId);
|
|
if (!resolve) throw new Error(`Invalid taskId ${message.slice(0, nlLoc)}`);
|
|
this.pendingTasks.delete(taskId);
|
|
const resp = this.safeJSON(message.slice(nlLoc + 1));
|
|
resolve(resp);
|
|
|
|
if (this.resolveRelease && !this.getLoad()) this.destroy();
|
|
});
|
|
}
|
|
safeJSON(obj: string): any {
|
|
// special cases? undefined should strictly be fine
|
|
// so let's just return it since we can't parse it
|
|
if (obj === "undefined") {
|
|
return undefined;
|
|
}
|
|
try {
|
|
return JSON.parse(obj);
|
|
} catch (e: any) {
|
|
// this is in the parent, so it should usually exist, but it's possible
|
|
// it's also futureproofing in case other external modfules require this
|
|
// we also specifically do not throw here because this json might be sensitive,
|
|
// so we only want it to go to emails
|
|
(global as any).Monitor?.crashlog?.(e, `a ${path.basename(this.file)} process`, { result: obj });
|
|
return undefined;
|
|
}
|
|
}
|
|
|
|
getProcess() {
|
|
return this.process;
|
|
}
|
|
|
|
getLoad() {
|
|
return this.pendingTasks.size;
|
|
}
|
|
|
|
query(input: T): Promise<U> {
|
|
this.taskId++;
|
|
const taskId = this.taskId;
|
|
this.process.send(`${taskId}\n${JSON.stringify(input)}`);
|
|
return new Promise(resolve => {
|
|
this.pendingTasks.set(taskId, resolve);
|
|
});
|
|
}
|
|
|
|
release(): Promise<void> {
|
|
if (this.pendingRelease) return this.pendingRelease;
|
|
if (!this.getLoad()) {
|
|
this.destroy();
|
|
} else {
|
|
this.pendingRelease = new Promise(resolve => {
|
|
this.resolveRelease = resolve;
|
|
});
|
|
}
|
|
return this.pendingRelease!;
|
|
}
|
|
|
|
destroy() {
|
|
if (this.pendingRelease && !this.resolveRelease) {
|
|
// already destroyed
|
|
return;
|
|
}
|
|
this.process.disconnect();
|
|
for (const resolver of this.pendingTasks.values()) {
|
|
// maybe we should track reject functions too...
|
|
resolver('' as any);
|
|
}
|
|
this.pendingTasks.clear();
|
|
if (this.resolveRelease) {
|
|
this.resolveRelease();
|
|
this.resolveRelease = null;
|
|
} else if (!this.pendingRelease) {
|
|
this.pendingRelease = Promise.resolve();
|
|
}
|
|
}
|
|
}
|
|
|
|
/** Wraps the process object in the PARENT process. */
|
|
export class StreamProcessWrapper implements ProcessWrapper {
|
|
process: ChildProcess;
|
|
taskId = 0;
|
|
activeStreams = new Map<number, SubprocessStream>();
|
|
pendingRelease: Promise<void> | null = null;
|
|
resolveRelease: (() => void) | null = null;
|
|
debug?: string;
|
|
|
|
setDebug(message: string) {
|
|
this.debug = (this.debug || '').slice(-32768) + '\n=====\n' + message;
|
|
}
|
|
messageCallback?: (message: string) => any;
|
|
|
|
constructor(file: string, messageCallback?: (message: string) => any) {
|
|
this.process = child_process.fork(file, [], { cwd: FS.ROOT_PATH });
|
|
this.messageCallback = messageCallback;
|
|
|
|
this.process.on('message', (message: string) => {
|
|
if (message.startsWith('THROW\n')) {
|
|
const error = new Error();
|
|
error.stack = message.slice(6);
|
|
throw error;
|
|
}
|
|
|
|
if (this.messageCallback && message.startsWith(`CALLBACK\n`)) {
|
|
this.messageCallback(message.slice(9));
|
|
return;
|
|
}
|
|
|
|
if (message.startsWith('DEBUG\n')) {
|
|
this.setDebug(message.slice(6));
|
|
return;
|
|
}
|
|
|
|
let nlLoc = message.indexOf('\n');
|
|
if (nlLoc <= 0) throw new Error(`Invalid response ${message}`);
|
|
const taskId = parseInt(message.slice(0, nlLoc));
|
|
const stream = this.activeStreams.get(taskId);
|
|
if (!stream) return; // stream already destroyed
|
|
|
|
message = message.slice(nlLoc + 1);
|
|
nlLoc = message.indexOf('\n');
|
|
if (nlLoc < 0) nlLoc = message.length;
|
|
const messageType = message.slice(0, nlLoc);
|
|
message = message.slice(nlLoc + 1);
|
|
|
|
if (messageType === 'END') {
|
|
stream.pushEnd();
|
|
this.deleteStream(taskId);
|
|
} else if (messageType === 'PUSH') {
|
|
stream.push(message);
|
|
} else if (messageType === 'THROW') {
|
|
const error = new Error();
|
|
error.stack = message;
|
|
stream.pushError(error, true);
|
|
} else {
|
|
throw new Error(`Unrecognized messageType ${messageType}`);
|
|
}
|
|
});
|
|
}
|
|
|
|
getLoad() {
|
|
return this.activeStreams.size;
|
|
}
|
|
|
|
getProcess() {
|
|
return this.process;
|
|
}
|
|
|
|
deleteStream(taskId: number) {
|
|
this.activeStreams.delete(taskId);
|
|
// try to release
|
|
if (this.resolveRelease && !this.getLoad()) void this.destroy();
|
|
}
|
|
|
|
createStream(): SubprocessStream {
|
|
this.taskId++;
|
|
const taskId = this.taskId;
|
|
const stream = new SubprocessStream(this, taskId);
|
|
this.activeStreams.set(taskId, stream);
|
|
return stream;
|
|
}
|
|
|
|
release(): Promise<void> {
|
|
if (this.pendingRelease) return this.pendingRelease;
|
|
if (!this.getLoad()) {
|
|
void this.destroy();
|
|
} else {
|
|
this.pendingRelease = new Promise(resolve => {
|
|
this.resolveRelease = resolve;
|
|
});
|
|
}
|
|
return this.pendingRelease!;
|
|
}
|
|
|
|
destroy() {
|
|
if (this.pendingRelease && !this.resolveRelease) {
|
|
// already destroyed
|
|
return;
|
|
}
|
|
this.process.disconnect();
|
|
const destroyed = [];
|
|
for (const stream of this.activeStreams.values()) {
|
|
destroyed.push(stream.destroy());
|
|
}
|
|
this.activeStreams.clear();
|
|
if (this.resolveRelease) {
|
|
this.resolveRelease();
|
|
this.resolveRelease = null;
|
|
} else if (!this.pendingRelease) {
|
|
this.pendingRelease = Promise.resolve();
|
|
}
|
|
return Promise.all(destroyed);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* A container for a RawProcessManager stream. This is usually the
|
|
* RawProcessWrapper, but it can also be a fake RawProcessWrapper if the PM is
|
|
* told to spawn 0 worker processes.
|
|
*/
|
|
export class StreamWorker {
|
|
load = 0;
|
|
workerid = 0;
|
|
stream: Streams.ObjectReadWriteStream<string>;
|
|
constructor(stream: Streams.ObjectReadWriteStream<string>) {
|
|
this.stream = stream;
|
|
}
|
|
}
|
|
|
|
/** Wraps the process object in the PARENT process. */
|
|
export class RawProcessWrapper implements ProcessWrapper, StreamWorker {
|
|
process: ChildProcess & { process: undefined } | Worker;
|
|
taskId = 0;
|
|
stream: RawSubprocessStream;
|
|
pendingRelease: Promise<void> | null = null;
|
|
resolveRelease: (() => void) | null = null;
|
|
debug?: string;
|
|
workerid = 0;
|
|
|
|
/** Not managed by RawProcessWrapper itself */
|
|
load = 0;
|
|
|
|
setDebug(message: string) {
|
|
this.debug = (this.debug || '').slice(-32768) + '\n=====\n' + message;
|
|
}
|
|
|
|
constructor(file: string, isCluster?: boolean, env?: AnyObject) {
|
|
if (isCluster) {
|
|
this.process = cluster.fork(env);
|
|
this.workerid = this.process.id;
|
|
} else {
|
|
this.process = child_process.fork(file, [], { cwd: FS.ROOT_PATH, env }) as any;
|
|
}
|
|
|
|
this.process.on('message', (message: string) => {
|
|
this.stream.push(message);
|
|
});
|
|
|
|
this.stream = new RawSubprocessStream(this);
|
|
}
|
|
|
|
getLoad() {
|
|
return this.load;
|
|
}
|
|
getProcess() {
|
|
return this.process.process ? this.process.process : this.process;
|
|
}
|
|
|
|
release(): Promise<void> {
|
|
if (this.pendingRelease) return this.pendingRelease;
|
|
if (!this.getLoad()) {
|
|
void this.destroy();
|
|
} else {
|
|
this.pendingRelease = new Promise(resolve => {
|
|
this.resolveRelease = resolve;
|
|
});
|
|
}
|
|
return this.pendingRelease!;
|
|
}
|
|
|
|
destroy() {
|
|
if (this.pendingRelease && !this.resolveRelease) {
|
|
// already destroyed
|
|
return;
|
|
}
|
|
void this.stream.destroy();
|
|
this.process.disconnect();
|
|
}
|
|
}
|
|
|
|
/**
|
|
* A ProcessManager wraps a query function: A function that takes a
|
|
* string and returns a string or Promise<string>.
|
|
*/
|
|
export abstract class ProcessManager<T extends ProcessWrapper = ProcessWrapper> {
|
|
static disabled = false;
|
|
processes: T[] = [];
|
|
releasingProcesses: T[] = [];
|
|
crashedProcesses: T[] = [];
|
|
readonly filename: string;
|
|
readonly basename: string;
|
|
readonly isParentProcess: boolean;
|
|
crashTime = 0;
|
|
crashRespawnCount = 0;
|
|
|
|
constructor(module: NodeJS.Module) {
|
|
this.filename = module.filename;
|
|
this.basename = path.basename(module.filename);
|
|
this.isParentProcess = (process.mainModule !== module || !process.send);
|
|
|
|
this.listen();
|
|
}
|
|
acquire() {
|
|
if (!this.processes.length) {
|
|
return null;
|
|
}
|
|
let lowestLoad = this.processes[0];
|
|
for (const process of this.processes) {
|
|
if (process.getLoad() < lowestLoad.getLoad()) {
|
|
lowestLoad = process;
|
|
}
|
|
}
|
|
return lowestLoad;
|
|
}
|
|
releaseCrashed(process: T) {
|
|
const index = this.processes.indexOf(process);
|
|
|
|
// The process was shut down sanely, not crashed
|
|
if (index < 0) return;
|
|
|
|
this.processes.splice(index, 1);
|
|
|
|
this.destroyProcess(process);
|
|
void process.release().then(() => {
|
|
const releasingIndex = this.releasingProcesses.indexOf(process);
|
|
if (releasingIndex >= 0) {
|
|
this.releasingProcesses.splice(releasingIndex, 1);
|
|
}
|
|
});
|
|
|
|
const now = Date.now();
|
|
if (this.crashTime && now - this.crashTime > 30 * 60 * 1000) {
|
|
this.crashTime = 0;
|
|
this.crashRespawnCount = 0;
|
|
}
|
|
if (!this.crashTime) this.crashTime = now;
|
|
this.crashRespawnCount += 1;
|
|
// Notify any global crash logger
|
|
void Promise.reject(
|
|
new Error(`Process ${this.basename} ${process.getProcess().pid} crashed and had to be restarted`)
|
|
);
|
|
this.releasingProcesses.push(process);
|
|
this.crashedProcesses.push(process);
|
|
|
|
// only respawn processes if there have been fewer than 5 crashes in 30 minutes
|
|
if (this.crashRespawnCount <= 5) {
|
|
this.spawn(this.processes.length + 1);
|
|
}
|
|
}
|
|
unspawn() {
|
|
return Promise.all([...this.processes].map(
|
|
process => this.unspawnOne(process)
|
|
));
|
|
}
|
|
async unspawnOne(process: T | null) {
|
|
if (!process) return;
|
|
this.destroyProcess(process);
|
|
const processIndex = this.processes.indexOf(process);
|
|
if (processIndex < 0) throw new Error('Process inactive');
|
|
this.processes.splice(this.processes.indexOf(process), 1);
|
|
this.releasingProcesses.push(process);
|
|
|
|
await process.release();
|
|
|
|
const index = this.releasingProcesses.indexOf(process);
|
|
if (index < 0) return; // can happen if process crashed while releasing
|
|
this.releasingProcesses.splice(index, 1);
|
|
}
|
|
spawn(count = 1, force?: boolean) {
|
|
if (!this.isParentProcess) return;
|
|
if (ProcessManager.disabled && !force) return;
|
|
const spawnCount = count - this.processes.length;
|
|
for (let i = 0; i < spawnCount; i++) {
|
|
this.spawnOne(force);
|
|
}
|
|
}
|
|
spawnOne(force?: boolean) {
|
|
if (!this.isParentProcess) throw new Error('Must use in parent process');
|
|
if (ProcessManager.disabled && !force) return null;
|
|
const process = this.createProcess();
|
|
process.process.on('disconnect', () => this.releaseCrashed(process));
|
|
this.processes.push(process);
|
|
return process;
|
|
}
|
|
respawn(count: number | null = null) {
|
|
if (count === null) count = this.processes.length;
|
|
const unspawned = this.unspawn();
|
|
this.spawn(count);
|
|
return unspawned;
|
|
}
|
|
abstract listen(): void;
|
|
abstract createProcess(...args: any): T;
|
|
destroyProcess(process: T) {}
|
|
destroy() {
|
|
const index = processManagers.indexOf(this);
|
|
if (index >= 0) processManagers.splice(index, 1);
|
|
return this.unspawn();
|
|
}
|
|
}
|
|
|
|
export class QueryProcessManager<T = string, U = string> extends ProcessManager<QueryProcessWrapper<T, U>> {
|
|
_query: (input: T) => U | Promise<U>;
|
|
messageCallback?: (message: string) => any;
|
|
timeout: number;
|
|
|
|
/**
|
|
* @param timeout The number of milliseconds to wait before terminating a query. Defaults to 900000 ms (15 minutes).
|
|
*/
|
|
constructor(
|
|
module: NodeJS.Module, query: (input: T) => U | Promise<U>,
|
|
timeout = 15 * 60 * 1000, debugCallback?: (message: string) => any
|
|
) {
|
|
super(module);
|
|
this._query = query;
|
|
this.timeout = timeout;
|
|
this.messageCallback = debugCallback;
|
|
|
|
processManagers.push(this);
|
|
}
|
|
async query(input: T, process = this.acquire()) {
|
|
if (!process) return this._query(input);
|
|
|
|
const timeout = setTimeout(() => {
|
|
const debugInfo = process.debug || "No debug information found.";
|
|
process.destroy();
|
|
this.spawnOne();
|
|
throw new Error(
|
|
`A query originating in ${this.basename} took too long to complete; the process has been respawned.\n${debugInfo}`
|
|
);
|
|
}, this.timeout);
|
|
|
|
const result = await process.query(input);
|
|
|
|
clearTimeout(timeout);
|
|
return result;
|
|
}
|
|
queryTemporaryProcess(input: T, force?: boolean) {
|
|
const process = this.spawnOne(force);
|
|
const result = this.query(input, process);
|
|
void this.unspawnOne(process);
|
|
return result;
|
|
}
|
|
createProcess() {
|
|
return new QueryProcessWrapper<T, U>(this.filename, this.messageCallback);
|
|
}
|
|
listen() {
|
|
if (this.isParentProcess) return;
|
|
// child process
|
|
process.on('message', (message: string) => {
|
|
const nlLoc = message.indexOf('\n');
|
|
if (nlLoc <= 0) throw new Error(`Invalid response ${message}`);
|
|
const taskId = message.slice(0, nlLoc);
|
|
message = message.slice(nlLoc + 1);
|
|
|
|
if (taskId.startsWith('EVAL')) {
|
|
// eslint-disable-next-line no-eval
|
|
process.send!(`${taskId}\n` + eval(message));
|
|
return;
|
|
}
|
|
|
|
void Promise.resolve(this._query(JSON.parse(message))).then(
|
|
response => process.send!(`${taskId}\n${JSON.stringify(response)}`)
|
|
);
|
|
});
|
|
process.on('disconnect', () => {
|
|
process.exit();
|
|
});
|
|
}
|
|
}
|
|
|
|
export class StreamProcessManager extends ProcessManager<StreamProcessWrapper> {
|
|
/* taskid: stream used only in child process */
|
|
activeStreams: Map<string, Streams.ObjectReadWriteStream<string>>;
|
|
_createStream: () => Streams.ObjectReadWriteStream<string>;
|
|
messageCallback?: (message: string) => any;
|
|
|
|
constructor(
|
|
module: NodeJS.Module,
|
|
createStream: () => Streams.ObjectReadWriteStream<string>,
|
|
messageCallback?: (message: string) => any
|
|
) {
|
|
super(module);
|
|
this.activeStreams = new Map();
|
|
this._createStream = createStream;
|
|
this.messageCallback = messageCallback;
|
|
|
|
processManagers.push(this);
|
|
}
|
|
createStream() {
|
|
const process = this.acquire();
|
|
if (!process) return this._createStream();
|
|
return process.createStream();
|
|
}
|
|
createProcess() {
|
|
return new StreamProcessWrapper(this.filename, this.messageCallback);
|
|
}
|
|
async pipeStream(taskId: string, stream: Streams.ObjectReadStream<string>) {
|
|
let done = false;
|
|
while (!done) {
|
|
try {
|
|
let value;
|
|
({ value, done } = await stream.next());
|
|
process.send!(`${taskId}\nPUSH\n${value}`);
|
|
} catch (err: any) {
|
|
process.send!(`${taskId}\nTHROW\n${err.stack}`);
|
|
}
|
|
}
|
|
if (!this.activeStreams.has(taskId)) {
|
|
// stream.destroy() was called, don't send an END message
|
|
return;
|
|
}
|
|
process.send!(`${taskId}\nEND`);
|
|
this.activeStreams.delete(taskId);
|
|
}
|
|
listen() {
|
|
if (this.isParentProcess) return;
|
|
// child process
|
|
process.on('message', (message: string) => {
|
|
let nlLoc = message.indexOf('\n');
|
|
if (nlLoc <= 0) throw new Error(`Invalid request ${message}`);
|
|
const taskId = message.slice(0, nlLoc);
|
|
const stream = this.activeStreams.get(taskId);
|
|
|
|
message = message.slice(nlLoc + 1);
|
|
nlLoc = message.indexOf('\n');
|
|
if (nlLoc < 0) nlLoc = message.length;
|
|
const messageType = message.slice(0, nlLoc);
|
|
message = message.slice(nlLoc + 1);
|
|
|
|
if (taskId.startsWith('EVAL')) {
|
|
// eslint-disable-next-line no-eval
|
|
process.send!(`${taskId}\n` + eval(message));
|
|
return;
|
|
}
|
|
|
|
if (messageType === 'NEW') {
|
|
if (stream) throw new Error(`NEW: taskId ${taskId} already exists`);
|
|
const newStream = this._createStream();
|
|
this.activeStreams.set(taskId, newStream);
|
|
void this.pipeStream(taskId, newStream);
|
|
} else if (messageType === 'DESTROY') {
|
|
if (!stream) throw new Error(`DESTROY: Invalid taskId ${taskId}`);
|
|
void stream.destroy();
|
|
this.activeStreams.delete(taskId);
|
|
} else if (messageType === 'WRITE') {
|
|
if (!stream) throw new Error(`WRITE: Invalid taskId ${taskId}`);
|
|
void stream.write(message);
|
|
} else if (messageType === 'WRITEEND') {
|
|
if (!stream) throw new Error(`WRITEEND: Invalid taskId ${taskId}`);
|
|
void stream.writeEnd();
|
|
} else {
|
|
throw new Error(`Unrecognized messageType ${messageType}`);
|
|
}
|
|
});
|
|
process.on('disconnect', () => {
|
|
process.exit();
|
|
});
|
|
}
|
|
}
|
|
|
|
export class RawProcessManager extends ProcessManager<RawProcessWrapper> {
|
|
/** full list of processes - parent process only */
|
|
workers: StreamWorker[] = [];
|
|
/** if spawning 0 worker processes, the worker is instead stored here in the parent process */
|
|
masterWorker: StreamWorker | null = null;
|
|
/** stream used only in the child process */
|
|
activeStream: Streams.ObjectReadWriteStream<string> | null = null;
|
|
isCluster: boolean;
|
|
spawnSubscription: ((worker: StreamWorker) => void) | null = null;
|
|
unspawnSubscription: ((worker: StreamWorker) => void) | null = null;
|
|
_setupChild: () => Streams.ObjectReadWriteStream<string>;
|
|
/** worker ID of cluster worker - cluster child process only (0 otherwise) */
|
|
readonly workerid = cluster.worker?.id || 0;
|
|
env: AnyObject | undefined;
|
|
|
|
constructor(options: {
|
|
module: NodeJS.Module,
|
|
setupChild: () => Streams.ObjectReadWriteStream<string>,
|
|
isCluster?: boolean,
|
|
env?: AnyObject,
|
|
}) {
|
|
super(options.module);
|
|
this.isCluster = !!options.isCluster;
|
|
this._setupChild = options.setupChild;
|
|
this.env = options.env;
|
|
|
|
if (this.isCluster && this.isParentProcess) {
|
|
cluster.setupMaster({
|
|
exec: this.filename,
|
|
cwd: FS.ROOT_PATH,
|
|
});
|
|
}
|
|
|
|
processManagers.push(this);
|
|
}
|
|
subscribeSpawn(callback: (worker: StreamWorker) => void) {
|
|
this.spawnSubscription = callback;
|
|
}
|
|
subscribeUnspawn(callback: (worker: StreamWorker) => void) {
|
|
this.unspawnSubscription = callback;
|
|
}
|
|
override spawn(count?: number) {
|
|
super.spawn(count);
|
|
if (!this.workers.length) {
|
|
this.masterWorker = new StreamWorker(this._setupChild());
|
|
this.workers.push(this.masterWorker);
|
|
this.spawnSubscription?.(this.masterWorker);
|
|
}
|
|
}
|
|
createProcess() {
|
|
const process = new RawProcessWrapper(this.filename, this.isCluster, this.env);
|
|
this.workers.push(process);
|
|
this.spawnSubscription?.(process);
|
|
return process;
|
|
}
|
|
override destroyProcess(process: RawProcessWrapper) {
|
|
const index = this.workers.indexOf(process);
|
|
if (index >= 0) this.workers.splice(index, 1);
|
|
|
|
this.unspawnSubscription?.(process);
|
|
}
|
|
async pipeStream(stream: Streams.ObjectReadStream<string>) {
|
|
let done = false;
|
|
while (!done) {
|
|
try {
|
|
let value;
|
|
({ value, done } = await stream.next());
|
|
process.send!(value);
|
|
} catch (err: any) {
|
|
process.send!(`THROW\n${err.stack}`);
|
|
}
|
|
}
|
|
}
|
|
listen() {
|
|
if (this.isParentProcess) return;
|
|
|
|
setImmediate(() => {
|
|
this.activeStream = this._setupChild();
|
|
void this.pipeStream(this.activeStream);
|
|
});
|
|
|
|
// child process
|
|
process.on('message', (message: string) => {
|
|
void this.activeStream!.write(message);
|
|
});
|
|
process.on('disconnect', () => {
|
|
process.exit();
|
|
});
|
|
}
|
|
}
|