pokemon-showdown/process-manager.js
2016-03-23 02:44:20 -05:00

151 lines
3.0 KiB
JavaScript

/**
* Process Manager
* Pokemon Showdown - http://pokemonshowdown.com/
*
* This file abstract out multiprocess logic involved in several tasks.
*
* @license MIT license
*/
'use strict';
const EventEmitter = require('events');
const processManagers = new Map();
class ProcessWrapper extends EventEmitter {
constructor(PM) {
super();
this.PM = PM;
this.active = true;
this.pendingTasks = new Map();
this.process = require('child_process').fork(PM.execFile, {cwd: __dirname});
// Allow events to bubble-up to the wrapper
this.process.on('message', message => this.emit('message', message));
this.on('message', PM.onMessageUpstream);
}
send(data) {
return this.process.send(data);
}
release() {
if (this.load || this.active) return;
this.process.disconnect();
}
get load() {
return this.pendingTasks.size;
}
}
class ProcessManager {
constructor(options) {
if (!options) options = {};
this.processes = [];
this.taskId = 0;
Object.assign(this, options);
if (typeof this.maxProcesses !== 'number') {
this.maxProcesses = 1;
}
processManagers.set(this, this.execFile);
}
spawn() {
for (let i = this.processes.length; i < this.maxProcesses; ++i) {
this.processes.push(new ProcessWrapper(this));
}
}
unspawn() {
for (let process of this.processes.splice(0)) {
process.active = false;
process.release();
}
}
respawn() {
this.unspawn();
this.spawn();
}
acquire() {
let process = this.processes[0];
for (let i = 1; i < this.processes.length; ++i) {
if (this.processes[i].load < process.load) {
process = this.processes[i];
}
}
return process;
}
release(process) {
process.release();
}
send() {
if (!this.processes.length) {
return Promise.resolve(this.receive.apply(this, arguments));
}
let serializedArgs = '';
switch (arguments.length) {
case 0:
break;
case 1:
serializedArgs = serialize(arguments[0]);
break;
case 2:
serializedArgs = serialize(arguments[0]) + '|' + serialize(arguments[1]);
break;
default:
let lastIndex = arguments.length - 1;
for (let i = 0; i < lastIndex; ++i) {
serializedArgs += serialize(arguments[i]) + '|';
}
serializedArgs += serialize(arguments[lastIndex]);
}
let process = this.acquire();
return new Promise((resolve, reject) => {
process.pendingTasks.set(this.taskId, resolve);
try {
process.process.send('' + this.taskId + '|' + serializedArgs);
} catch (e) {}
++this.taskId;
});
}
sendSync() {
// synchronously!
return this.receive.apply(this, arguments);
}
onMessageUpstream() {
// Expected to resolve the pending task completed.
}
onMessageDownstream() {
// Expected to call `receive()` at some point,
// and send the result to the parent process.
}
receive() {
// This is where the child process actually does stuff
// To be overriden by specific implementations.
return '';
}
}
ProcessManager.cache = processManagers;
module.exports = ProcessManager;
function serialize(str) {
if (typeof str === 'string') return str;
return JSON.stringify(str);
}