|
|
|
|
@ -12,10 +12,13 @@
|
|
|
|
|
import * as child_process from 'child_process';
|
|
|
|
|
import * as cluster from 'cluster';
|
|
|
|
|
import * as path from 'path';
|
|
|
|
|
import * as worker_threads from 'worker_threads';
|
|
|
|
|
|
|
|
|
|
import * as Streams from './streams';
|
|
|
|
|
|
|
|
|
|
type ChildProcess = child_process.ChildProcess;
|
|
|
|
|
type Worker = cluster.Worker;
|
|
|
|
|
/** means Web Worker in a browser context, so automatically wins over cluster workers for the top-level namespace */
|
|
|
|
|
const Worker = worker_threads.Worker;
|
|
|
|
|
|
|
|
|
|
const ROOT_DIR = path.resolve(__dirname, '..');
|
|
|
|
|
|
|
|
|
|
@ -29,22 +32,22 @@ class SubprocessStream extends Streams.ObjectReadWriteStream<string> {
|
|
|
|
|
super();
|
|
|
|
|
this.process = process;
|
|
|
|
|
this.taskId = taskId;
|
|
|
|
|
this.process.process.send(`${taskId}\nNEW`);
|
|
|
|
|
this.process._send(`${taskId}\nNEW`);
|
|
|
|
|
}
|
|
|
|
|
_write(message: string) {
|
|
|
|
|
if (!this.process.process.connected) {
|
|
|
|
|
if (!this.process.isConnected) {
|
|
|
|
|
this.pushError(new Error(`Process disconnected (possibly crashed?)`));
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
this.process.process.send(`${this.taskId}\nWRITE\n${message}`);
|
|
|
|
|
this.process._send(`${this.taskId}\nWRITE\n${message}`);
|
|
|
|
|
// responses are handled in ProcessWrapper
|
|
|
|
|
}
|
|
|
|
|
_writeEnd() {
|
|
|
|
|
this.process.process.send(`${this.taskId}\nWRITEEND`);
|
|
|
|
|
this.process._send(`${this.taskId}\nWRITEEND`);
|
|
|
|
|
}
|
|
|
|
|
_destroy() {
|
|
|
|
|
if (!this.process.process.connected) return;
|
|
|
|
|
this.process.process.send(`${this.taskId}\nDESTROY`);
|
|
|
|
|
if (!this.process.isConnected) return;
|
|
|
|
|
this.process._send(`${this.taskId}\nDESTROY`);
|
|
|
|
|
this.process.deleteStream(this.taskId);
|
|
|
|
|
this.process = null!;
|
|
|
|
|
}
|
|
|
|
|
@ -57,40 +60,88 @@ class RawSubprocessStream extends Streams.ObjectReadWriteStream<string> {
|
|
|
|
|
this.process = process;
|
|
|
|
|
}
|
|
|
|
|
_write(message: string) {
|
|
|
|
|
if (!this.process.getProcess().connected) {
|
|
|
|
|
if (!this.process.isConnected) {
|
|
|
|
|
// no error because the crash handler should already have shown an error, and
|
|
|
|
|
// sometimes harmless messages are sent during cleanup
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
this.process.process.send(message);
|
|
|
|
|
this.process._send(message);
|
|
|
|
|
// responses are handled in ProcessWrapper
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
interface ProcessWrapper {
|
|
|
|
|
load: number;
|
|
|
|
|
process: ChildProcess | Worker;
|
|
|
|
|
release: () => Promise<void>;
|
|
|
|
|
getProcess: () => ChildProcess;
|
|
|
|
|
abstract class ProcessWrapper {
|
|
|
|
|
process: ChildProcess | null = null;
|
|
|
|
|
clusterWorker: cluster.Worker | null = null;
|
|
|
|
|
worker: worker_threads.Worker | null = null;
|
|
|
|
|
isConnected = true;
|
|
|
|
|
pid = 0;
|
|
|
|
|
constructor(file: string, type: ProcessManager['type'] = 'process', env?: AnyObject) {
|
|
|
|
|
switch (type) {
|
|
|
|
|
case 'process':
|
|
|
|
|
this.process = child_process.fork(file, [], {cwd: ROOT_DIR, env});
|
|
|
|
|
this.process.on('disconnect', () => {
|
|
|
|
|
this.isConnected = false;
|
|
|
|
|
});
|
|
|
|
|
this.pid = this.process.pid;
|
|
|
|
|
break;
|
|
|
|
|
case 'cluster':
|
|
|
|
|
this.clusterWorker = cluster.fork(env);
|
|
|
|
|
this.process = this.clusterWorker.process;
|
|
|
|
|
this.clusterWorker.on('disconnect', () => {
|
|
|
|
|
this.isConnected = false;
|
|
|
|
|
});
|
|
|
|
|
this.pid = this.process.pid;
|
|
|
|
|
break;
|
|
|
|
|
case 'worker':
|
|
|
|
|
this.worker = new Worker(file, {env});
|
|
|
|
|
this.worker.on('exit', () => {
|
|
|
|
|
this.isConnected = false;
|
|
|
|
|
});
|
|
|
|
|
this.pid = -this.worker.threadId;
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
abstract getLoad(): number;
|
|
|
|
|
abstract release(): Promise<void>;
|
|
|
|
|
_send(message: string) {
|
|
|
|
|
if (this.worker) {
|
|
|
|
|
this.worker.postMessage(message);
|
|
|
|
|
} else {
|
|
|
|
|
(this.clusterWorker || this.process)!.send(message);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
_onMessage(listener: (message: string) => void) {
|
|
|
|
|
(this.worker || this.clusterWorker || this.process)!.on('message', listener);
|
|
|
|
|
}
|
|
|
|
|
_onDisconnect(listener: () => void) {
|
|
|
|
|
if (this.worker) {
|
|
|
|
|
this.worker.on('exit', listener);
|
|
|
|
|
} else {
|
|
|
|
|
(this.clusterWorker || this.process)!.on('disconnect', listener);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
_terminate() {
|
|
|
|
|
if (this.worker) {
|
|
|
|
|
void this.worker.terminate();
|
|
|
|
|
} else {
|
|
|
|
|
(this.clusterWorker || this.process)!.disconnect();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/** Wraps the process object in the PARENT process. */
|
|
|
|
|
export class QueryProcessWrapper implements ProcessWrapper {
|
|
|
|
|
process: ChildProcess;
|
|
|
|
|
taskId: number;
|
|
|
|
|
pendingTasks: Map<number, (resp: string) => void>;
|
|
|
|
|
pendingRelease: Promise<void> | null;
|
|
|
|
|
resolveRelease: (() => void) | null;
|
|
|
|
|
export class QueryProcessWrapper extends ProcessWrapper {
|
|
|
|
|
taskId = 0;
|
|
|
|
|
pendingTasks: Map<number, (resp: string) => void> = new Map();
|
|
|
|
|
pendingRelease: Promise<void> | null = null;
|
|
|
|
|
resolveRelease: (() => void) | null = null;
|
|
|
|
|
debug?: string;
|
|
|
|
|
|
|
|
|
|
constructor(file: string) {
|
|
|
|
|
this.process = child_process.fork(file, [], {cwd: ROOT_DIR});
|
|
|
|
|
this.taskId = 0;
|
|
|
|
|
this.pendingTasks = new Map();
|
|
|
|
|
this.pendingRelease = null;
|
|
|
|
|
this.resolveRelease = null;
|
|
|
|
|
constructor(file: string, type?: ProcessManager['type']) {
|
|
|
|
|
super(file, type);
|
|
|
|
|
|
|
|
|
|
this.process.on('message', (message: string) => {
|
|
|
|
|
this._onMessage(message => {
|
|
|
|
|
const nlLoc = message.indexOf('\n');
|
|
|
|
|
if (nlLoc <= 0) throw new Error(`Invalid response ${message}`);
|
|
|
|
|
if (message.slice(0, nlLoc) === 'THROW') {
|
|
|
|
|
@ -110,22 +161,18 @@ export class QueryProcessWrapper implements ProcessWrapper {
|
|
|
|
|
this.pendingTasks.delete(taskId);
|
|
|
|
|
resolve(JSON.parse(message.slice(nlLoc + 1)));
|
|
|
|
|
|
|
|
|
|
if (this.resolveRelease && !this.load) this.destroy();
|
|
|
|
|
if (this.resolveRelease && !this.getLoad()) this.destroy();
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
getProcess() {
|
|
|
|
|
return this.process;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
get load() {
|
|
|
|
|
getLoad() {
|
|
|
|
|
return this.pendingTasks.size;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
query(input: any): Promise<any> {
|
|
|
|
|
this.taskId++;
|
|
|
|
|
const taskId = this.taskId;
|
|
|
|
|
this.process.send(`${taskId}\n${JSON.stringify(input)}`);
|
|
|
|
|
this._send(`${taskId}\n${JSON.stringify(input)}`);
|
|
|
|
|
return new Promise(resolve => {
|
|
|
|
|
this.pendingTasks.set(taskId, resolve);
|
|
|
|
|
});
|
|
|
|
|
@ -133,7 +180,7 @@ export class QueryProcessWrapper implements ProcessWrapper {
|
|
|
|
|
|
|
|
|
|
release(): Promise<void> {
|
|
|
|
|
if (this.pendingRelease) return this.pendingRelease;
|
|
|
|
|
if (!this.load) {
|
|
|
|
|
if (!this.getLoad()) {
|
|
|
|
|
this.destroy();
|
|
|
|
|
} else {
|
|
|
|
|
this.pendingRelease = new Promise(resolve => {
|
|
|
|
|
@ -148,7 +195,7 @@ export class QueryProcessWrapper implements ProcessWrapper {
|
|
|
|
|
// already destroyed
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
this.process.disconnect();
|
|
|
|
|
if (this.isConnected) this._terminate();
|
|
|
|
|
for (const resolver of this.pendingTasks.values()) {
|
|
|
|
|
// maybe we should track reject functions too...
|
|
|
|
|
resolver('');
|
|
|
|
|
@ -164,8 +211,7 @@ export class QueryProcessWrapper implements ProcessWrapper {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/** Wraps the process object in the PARENT process. */
|
|
|
|
|
export class StreamProcessWrapper implements ProcessWrapper {
|
|
|
|
|
process: ChildProcess;
|
|
|
|
|
export class StreamProcessWrapper extends ProcessWrapper {
|
|
|
|
|
taskId = 0;
|
|
|
|
|
activeStreams = new Map<number, SubprocessStream>();
|
|
|
|
|
pendingRelease: Promise<void> | null = null;
|
|
|
|
|
@ -176,10 +222,10 @@ export class StreamProcessWrapper implements ProcessWrapper {
|
|
|
|
|
this.debug = (this.debug || '').slice(-32768) + '\n=====\n' + message;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
constructor(file: string) {
|
|
|
|
|
this.process = child_process.fork(file, [], {cwd: ROOT_DIR});
|
|
|
|
|
constructor(file: string, type?: ProcessManager['type']) {
|
|
|
|
|
super(file, type);
|
|
|
|
|
|
|
|
|
|
this.process.on('message', (message: string) => {
|
|
|
|
|
this._onMessage(message => {
|
|
|
|
|
let nlLoc = message.indexOf('\n');
|
|
|
|
|
if (nlLoc <= 0) throw new Error(`Invalid response ${message}`);
|
|
|
|
|
if (message.slice(0, nlLoc) === 'THROW') {
|
|
|
|
|
@ -219,17 +265,13 @@ export class StreamProcessWrapper implements ProcessWrapper {
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
getProcess() {
|
|
|
|
|
return this.process;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
deleteStream(taskId: number) {
|
|
|
|
|
this.activeStreams.delete(taskId);
|
|
|
|
|
// try to release
|
|
|
|
|
if (this.resolveRelease && !this.load) void this.destroy();
|
|
|
|
|
if (this.resolveRelease && !this.getLoad()) void this.destroy();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
get load() {
|
|
|
|
|
getLoad() {
|
|
|
|
|
return this.activeStreams.size;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@ -243,7 +285,7 @@ export class StreamProcessWrapper implements ProcessWrapper {
|
|
|
|
|
|
|
|
|
|
release(): Promise<void> {
|
|
|
|
|
if (this.pendingRelease) return this.pendingRelease;
|
|
|
|
|
if (!this.load) {
|
|
|
|
|
if (!this.getLoad()) {
|
|
|
|
|
void this.destroy();
|
|
|
|
|
} else {
|
|
|
|
|
this.pendingRelease = new Promise(resolve => {
|
|
|
|
|
@ -258,7 +300,7 @@ export class StreamProcessWrapper implements ProcessWrapper {
|
|
|
|
|
// already destroyed
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
this.process.disconnect();
|
|
|
|
|
if (this.isConnected) this._terminate();
|
|
|
|
|
const destroyed = [];
|
|
|
|
|
for (const stream of this.activeStreams.values()) {
|
|
|
|
|
destroyed.push(stream.destroy());
|
|
|
|
|
@ -289,8 +331,7 @@ export class StreamWorker {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/** Wraps the process object in the PARENT process. */
|
|
|
|
|
export class RawProcessWrapper implements ProcessWrapper, StreamWorker {
|
|
|
|
|
process: ChildProcess & {process: undefined} | Worker;
|
|
|
|
|
export class RawProcessWrapper extends ProcessWrapper implements StreamWorker {
|
|
|
|
|
taskId = 0;
|
|
|
|
|
stream: RawSubprocessStream;
|
|
|
|
|
pendingRelease: Promise<void> | null = null;
|
|
|
|
|
@ -305,23 +346,18 @@ export class RawProcessWrapper implements ProcessWrapper, StreamWorker {
|
|
|
|
|
this.debug = (this.debug || '').slice(-32768) + '\n=====\n' + message;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
constructor(file: string, isCluster?: boolean, env?: AnyObject) {
|
|
|
|
|
if (isCluster) {
|
|
|
|
|
this.process = cluster.fork(env);
|
|
|
|
|
this.workerid = this.process.id;
|
|
|
|
|
} else {
|
|
|
|
|
this.process = child_process.fork(file, [], {cwd: ROOT_DIR, env}) as any;
|
|
|
|
|
}
|
|
|
|
|
constructor(file: string, type: ProcessManager['type'] = 'process', env?: AnyObject) {
|
|
|
|
|
super(file, type, env);
|
|
|
|
|
|
|
|
|
|
this.process.on('message', (message: string) => {
|
|
|
|
|
this._onMessage(message => {
|
|
|
|
|
this.stream.push(message);
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
this.stream = new RawSubprocessStream(this);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
getProcess() {
|
|
|
|
|
return this.process.process ? this.process.process : this.process;
|
|
|
|
|
getLoad() {
|
|
|
|
|
return this.load;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
release(): Promise<void> {
|
|
|
|
|
@ -342,7 +378,7 @@ export class RawProcessWrapper implements ProcessWrapper, StreamWorker {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
this.stream.destroy();
|
|
|
|
|
this.process.disconnect();
|
|
|
|
|
if (this.isConnected) this._terminate();
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
@ -351,38 +387,63 @@ export class RawProcessWrapper implements ProcessWrapper, StreamWorker {
|
|
|
|
|
* A ProcessManager wraps a query function: A function that takes a
|
|
|
|
|
* string and returns a string or Promise<string>.
|
|
|
|
|
*/
|
|
|
|
|
export abstract class ProcessManager {
|
|
|
|
|
processes: ProcessWrapper[] = [];
|
|
|
|
|
releasingProcesses: ProcessWrapper[] = [];
|
|
|
|
|
crashedProcesses: ProcessWrapper[] = [];
|
|
|
|
|
export abstract class ProcessManager<T extends ProcessWrapper = ProcessWrapper> {
|
|
|
|
|
processes: T[] = [];
|
|
|
|
|
releasingProcesses: T[] = [];
|
|
|
|
|
crashedProcesses: T[] = [];
|
|
|
|
|
readonly module: NodeJS.Module;
|
|
|
|
|
readonly filename: string;
|
|
|
|
|
readonly basename: string;
|
|
|
|
|
readonly isParentProcess: boolean;
|
|
|
|
|
crashTime = 0;
|
|
|
|
|
crashRespawnCount = 0;
|
|
|
|
|
readonly type: 'process' | 'cluster' | 'worker';
|
|
|
|
|
|
|
|
|
|
constructor(module: NodeJS.Module) {
|
|
|
|
|
constructor(module: NodeJS.Module, type?: ProcessManager['type']) {
|
|
|
|
|
this.module = module;
|
|
|
|
|
this.type = type || 'process';
|
|
|
|
|
this.filename = module.filename;
|
|
|
|
|
this.basename = path.basename(module.filename);
|
|
|
|
|
this.isParentProcess = (process.mainModule !== module || !process.send);
|
|
|
|
|
this.isParentProcess = (process.mainModule !== module || !process.send) && worker_threads.isMainThread;
|
|
|
|
|
|
|
|
|
|
if (this.type === 'cluster' && this.isParentProcess) {
|
|
|
|
|
cluster.setupMaster({
|
|
|
|
|
exec: this.filename,
|
|
|
|
|
// @ts-ignore TODO: update type definition
|
|
|
|
|
cwd: ROOT_DIR,
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
this.listen();
|
|
|
|
|
}
|
|
|
|
|
_sendParent(message: string) {
|
|
|
|
|
if (worker_threads.parentPort) {
|
|
|
|
|
worker_threads.parentPort.postMessage(message);
|
|
|
|
|
} else {
|
|
|
|
|
process.send!(message);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
_onParentMessage(listener: (message: string) => void) {
|
|
|
|
|
if (worker_threads.parentPort) {
|
|
|
|
|
worker_threads.parentPort.on('message', listener);
|
|
|
|
|
} else {
|
|
|
|
|
process.on('message', listener);
|
|
|
|
|
process.on('disconnect', () => process.exit());
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
acquire() {
|
|
|
|
|
if (!this.processes.length) {
|
|
|
|
|
return null;
|
|
|
|
|
}
|
|
|
|
|
let lowestLoad = this.processes[0];
|
|
|
|
|
for (const process of this.processes) {
|
|
|
|
|
if (process.load < lowestLoad.load) {
|
|
|
|
|
if (process.getLoad() < lowestLoad.getLoad()) {
|
|
|
|
|
lowestLoad = process;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return lowestLoad;
|
|
|
|
|
}
|
|
|
|
|
releaseCrashed(process: ProcessWrapper) {
|
|
|
|
|
releaseCrashed(process: T) {
|
|
|
|
|
const index = this.processes.indexOf(process);
|
|
|
|
|
|
|
|
|
|
// The process was shut down sanely, not crashed
|
|
|
|
|
@ -407,7 +468,7 @@ export abstract class ProcessManager {
|
|
|
|
|
this.crashRespawnCount += 1;
|
|
|
|
|
// Notify any global crash logger
|
|
|
|
|
void Promise.reject(
|
|
|
|
|
new Error(`Process ${this.basename} ${process.getProcess().pid} crashed and had to be restarted`)
|
|
|
|
|
new Error(`Process ${this.basename} ${process.pid} crashed and had to be restarted`)
|
|
|
|
|
);
|
|
|
|
|
this.releasingProcesses.push(process);
|
|
|
|
|
this.crashedProcesses.push(process);
|
|
|
|
|
@ -438,7 +499,7 @@ export abstract class ProcessManager {
|
|
|
|
|
if (disabled && !force) return;
|
|
|
|
|
while (this.processes.length < count) {
|
|
|
|
|
const process = this.createProcess();
|
|
|
|
|
process.process.on('disconnect', () => this.releaseCrashed(process));
|
|
|
|
|
process._onDisconnect(() => this.releaseCrashed(process));
|
|
|
|
|
this.processes.push(process);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
@ -449,8 +510,8 @@ export abstract class ProcessManager {
|
|
|
|
|
return unspawned;
|
|
|
|
|
}
|
|
|
|
|
abstract listen(): void;
|
|
|
|
|
abstract createProcess(): ProcessWrapper;
|
|
|
|
|
destroyProcess(process: ProcessWrapper) {}
|
|
|
|
|
abstract createProcess(): T;
|
|
|
|
|
destroyProcess(process: T) {}
|
|
|
|
|
destroy() {
|
|
|
|
|
const index = processManagers.indexOf(this);
|
|
|
|
|
if (index >= 0) processManagers.splice(index, 1);
|
|
|
|
|
@ -458,27 +519,27 @@ export abstract class ProcessManager {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
export class QueryProcessManager<T = string, U = string> extends ProcessManager {
|
|
|
|
|
export class QueryProcessManager<T = string, U = string> extends ProcessManager<QueryProcessWrapper> {
|
|
|
|
|
_query: (input: T) => U | Promise<U>;
|
|
|
|
|
|
|
|
|
|
constructor(module: NodeJS.Module, query: (input: T) => U | Promise<U>) {
|
|
|
|
|
super(module);
|
|
|
|
|
constructor(module: NodeJS.Module, query: (input: T) => U | Promise<U>, type?: ProcessManager['type']) {
|
|
|
|
|
super(module, type);
|
|
|
|
|
this._query = query;
|
|
|
|
|
|
|
|
|
|
processManagers.push(this);
|
|
|
|
|
}
|
|
|
|
|
query(input: T) {
|
|
|
|
|
const process = this.acquire() as QueryProcessWrapper;
|
|
|
|
|
const process = this.acquire();
|
|
|
|
|
if (!process) return Promise.resolve(this._query(input));
|
|
|
|
|
return process.query(input);
|
|
|
|
|
}
|
|
|
|
|
createProcess() {
|
|
|
|
|
return new QueryProcessWrapper(this.filename);
|
|
|
|
|
return new QueryProcessWrapper(this.filename, this.type);
|
|
|
|
|
}
|
|
|
|
|
listen() {
|
|
|
|
|
if (this.isParentProcess) return;
|
|
|
|
|
// child process
|
|
|
|
|
process.on('message', (message: string) => {
|
|
|
|
|
this._onParentMessage(message => {
|
|
|
|
|
const nlLoc = message.indexOf('\n');
|
|
|
|
|
if (nlLoc <= 0) throw new Error(`Invalid response ${message}`);
|
|
|
|
|
const taskId = message.slice(0, nlLoc);
|
|
|
|
|
@ -486,21 +547,18 @@ export class QueryProcessManager<T = string, U = string> extends ProcessManager
|
|
|
|
|
|
|
|
|
|
if (taskId.startsWith('EVAL')) {
|
|
|
|
|
// eslint-disable-next-line no-eval
|
|
|
|
|
process.send!(`${taskId}\n` + eval(message));
|
|
|
|
|
this._sendParent(`${taskId}\n` + eval(message));
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void Promise.resolve(this._query(JSON.parse(message))).then(
|
|
|
|
|
response => process.send!(`${taskId}\n${JSON.stringify(response)}`)
|
|
|
|
|
response => this._sendParent(`${taskId}\n${JSON.stringify(response)}`)
|
|
|
|
|
);
|
|
|
|
|
});
|
|
|
|
|
process.on('disconnect', () => {
|
|
|
|
|
process.exit();
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
export class StreamProcessManager extends ProcessManager {
|
|
|
|
|
export class StreamProcessManager extends ProcessManager<StreamProcessWrapper> {
|
|
|
|
|
/* taskid: stream used only in child process */
|
|
|
|
|
activeStreams: Map<string, Streams.ObjectReadWriteStream<string>>;
|
|
|
|
|
// tslint:disable-next-line:variable-name
|
|
|
|
|
@ -514,12 +572,12 @@ export class StreamProcessManager extends ProcessManager {
|
|
|
|
|
processManagers.push(this);
|
|
|
|
|
}
|
|
|
|
|
createStream() {
|
|
|
|
|
const process = this.acquire() as StreamProcessWrapper;
|
|
|
|
|
const process = this.acquire();
|
|
|
|
|
if (!process) return this._createStream();
|
|
|
|
|
return process.createStream();
|
|
|
|
|
}
|
|
|
|
|
createProcess() {
|
|
|
|
|
return new StreamProcessWrapper(this.filename);
|
|
|
|
|
return new StreamProcessWrapper(this.filename, this.type);
|
|
|
|
|
}
|
|
|
|
|
async pipeStream(taskId: string, stream: Streams.ObjectReadStream<string>) {
|
|
|
|
|
let done = false;
|
|
|
|
|
@ -527,22 +585,22 @@ export class StreamProcessManager extends ProcessManager {
|
|
|
|
|
try {
|
|
|
|
|
let value;
|
|
|
|
|
({value, done} = await stream.next());
|
|
|
|
|
process.send!(`${taskId}\nPUSH\n${value}`);
|
|
|
|
|
this._sendParent(`${taskId}\nPUSH\n${value}`);
|
|
|
|
|
} catch (err) {
|
|
|
|
|
process.send!(`${taskId}\nTHROW\n${err.stack}`);
|
|
|
|
|
this._sendParent(`${taskId}\nTHROW\n${err.stack}`);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if (!this.activeStreams.has(taskId)) {
|
|
|
|
|
// stream.destroy() was called, don't send an END message
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
process.send!(`${taskId}\nEND`);
|
|
|
|
|
this._sendParent(`${taskId}\nEND`);
|
|
|
|
|
this.activeStreams.delete(taskId);
|
|
|
|
|
}
|
|
|
|
|
listen() {
|
|
|
|
|
if (this.isParentProcess) return;
|
|
|
|
|
// child process
|
|
|
|
|
process.on('message', (message: string) => {
|
|
|
|
|
this._onParentMessage(message => {
|
|
|
|
|
let nlLoc = message.indexOf('\n');
|
|
|
|
|
if (nlLoc <= 0) throw new Error(`Invalid request ${message}`);
|
|
|
|
|
const taskId = message.slice(0, nlLoc);
|
|
|
|
|
@ -556,7 +614,7 @@ export class StreamProcessManager extends ProcessManager {
|
|
|
|
|
|
|
|
|
|
if (taskId.startsWith('EVAL')) {
|
|
|
|
|
// eslint-disable-next-line no-eval
|
|
|
|
|
process.send!(`${taskId}\n` + eval(message));
|
|
|
|
|
this._sendParent(`${taskId}\n` + eval(message));
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@ -579,20 +637,16 @@ export class StreamProcessManager extends ProcessManager {
|
|
|
|
|
throw new Error(`Unrecognized messageType ${messageType}`);
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
process.on('disconnect', () => {
|
|
|
|
|
process.exit();
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
export class RawProcessManager extends ProcessManager {
|
|
|
|
|
export class RawProcessManager extends ProcessManager<RawProcessWrapper> {
|
|
|
|
|
/** full list of processes - parent process only */
|
|
|
|
|
workers: StreamWorker[] = [];
|
|
|
|
|
/** if spawning 0 worker processes, the worker is instead stored here in the parent process */
|
|
|
|
|
masterWorker: StreamWorker | null = null;
|
|
|
|
|
/** stream used only in the child process */
|
|
|
|
|
activeStream: Streams.ObjectReadWriteStream<string> | null = null;
|
|
|
|
|
isCluster: boolean;
|
|
|
|
|
spawnSubscription: ((worker: StreamWorker) => void) | null = null;
|
|
|
|
|
unspawnSubscription: ((worker: StreamWorker) => void) | null = null;
|
|
|
|
|
_setupChild: () => Streams.ObjectReadWriteStream<string>;
|
|
|
|
|
@ -603,21 +657,12 @@ export class RawProcessManager extends ProcessManager {
|
|
|
|
|
constructor(options: {
|
|
|
|
|
module: NodeJS.Module,
|
|
|
|
|
setupChild: () => Streams.ObjectReadWriteStream<string>,
|
|
|
|
|
isCluster?: boolean,
|
|
|
|
|
type?: ProcessManager['type'],
|
|
|
|
|
env?: AnyObject,
|
|
|
|
|
}) {
|
|
|
|
|
super(options.module);
|
|
|
|
|
this.isCluster = !!options.isCluster;
|
|
|
|
|
this._setupChild = options.setupChild;
|
|
|
|
|
super(options.module, options.type);
|
|
|
|
|
this.env = options.env;
|
|
|
|
|
|
|
|
|
|
if (this.isCluster && this.isParentProcess) {
|
|
|
|
|
cluster.setupMaster({
|
|
|
|
|
exec: this.filename,
|
|
|
|
|
// @ts-ignore TODO: update type definition
|
|
|
|
|
cwd: ROOT_DIR,
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
this._setupChild = options.setupChild;
|
|
|
|
|
|
|
|
|
|
processManagers.push(this);
|
|
|
|
|
}
|
|
|
|
|
@ -636,7 +681,7 @@ export class RawProcessManager extends ProcessManager {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
createProcess() {
|
|
|
|
|
const process = new RawProcessWrapper(this.filename, this.isCluster, this.env);
|
|
|
|
|
const process = new RawProcessWrapper(this.filename, this.type, this.env);
|
|
|
|
|
this.workers.push(process);
|
|
|
|
|
this.spawnSubscription?.(process);
|
|
|
|
|
return process;
|
|
|
|
|
@ -653,9 +698,9 @@ export class RawProcessManager extends ProcessManager {
|
|
|
|
|
try {
|
|
|
|
|
let value;
|
|
|
|
|
({value, done} = await stream.next());
|
|
|
|
|
process.send!(value);
|
|
|
|
|
if (!done) this._sendParent(value!);
|
|
|
|
|
} catch (err) {
|
|
|
|
|
process.send!(`THROW\n${err.stack}`);
|
|
|
|
|
this._sendParent(`THROW\n${err.stack}`);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
@ -668,11 +713,8 @@ export class RawProcessManager extends ProcessManager {
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
// child process
|
|
|
|
|
process.on('message', (message: string) => {
|
|
|
|
|
this._onParentMessage(message => {
|
|
|
|
|
void this.activeStream!.write(message);
|
|
|
|
|
});
|
|
|
|
|
process.on('disconnect', () => {
|
|
|
|
|
process.exit();
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|