pokemon-showdown/sockets.js
Ben Davies b66e395bf6 Add Config.ofe, refactor optional dependency usage (#3644)
Config.ofe toggles whether or not to write heapdumps if sockets workers
run out of memory, since ofe is an optional dependency but is not
installed by default. nodemailer is now a nonDefaultDependency, and
will complain if it's not installed when Config.crashguardemail is
enabled.
2017-06-17 14:37:11 -07:00

525 lines
15 KiB
JavaScript

/**
* Connections
* Pokemon Showdown - http://pokemonshowdown.com/
*
* Abstraction layer for multi-process SockJS connections.
*
* This file handles all the communications between the users'
* browsers, the networking processes, and users.js in the
* main process.
*
* @license MIT license
*/
'use strict';
const cluster = require('cluster');
global.Config = require('./config/config');
if (cluster.isMaster) {
// Worker processes are started from the `sockets` module that sits next to
// this file; in a worker, `cluster.isMaster` is false, so the worker runs the
// `else` branch further down.
cluster.setupMaster({
exec: require('path').resolve(__dirname, 'sockets'),
});
// Registry of forked workers, keyed by cluster worker id (see spawnWorker).
// Also exported as `exports.workers`.
const workers = exports.workers = new Map();
/**
 * Forks a new sockets worker, records it in `workers`, and forwards every
 * message the worker sends to the matching `Users` handler.
 *
 * The first character of a worker message selects its type:
 *   `*socketid\nip\nprotocol`  a socket connected
 *   `!socketid`                a socket disconnected
 *   `<socketid\nmessage`       a socket sent a message
 * Anything else is ignored.
 *
 * @return {Worker} the newly forked cluster worker
 */
const spawnWorker = exports.spawnWorker = function () {
	// The worker picks these up from its environment on startup.
	const workerEnv = {
		PSPORT: Config.port,
		PSBINDADDR: Config.bindaddress || '0.0.0.0',
		PSNOSSL: Config.ssl ? 0 : 1,
	};
	const worker = cluster.fork(workerEnv);
	const workerid = worker.id;
	workers.set(workerid, worker);

	worker.on('message', data => {
		// console.log('master received: ' + data);
		const type = data.charAt(0);
		if (type === '*') {
			// connect: *socketid, ip, protocol
			const firstBreak = data.indexOf('\n');
			const secondBreak = data.indexOf('\n', firstBreak + 1);
			const socketid = data.slice(1, firstBreak);
			const ip = data.slice(firstBreak + 1, secondBreak);
			const protocol = data.slice(secondBreak + 1);
			Users.socketConnect(worker, workerid, socketid, ip, protocol);
		} else if (type === '!') {
			// disconnect: !socketid
			Users.socketDisconnect(worker, workerid, data.substr(1));
		} else if (type === '<') {
			// message: <socketid, message
			const firstBreak = data.indexOf('\n');
			const socketid = data.substr(1, firstBreak - 1);
			const message = data.substr(firstBreak + 1);
			Users.socketReceive(worker, workerid, socketid, message);
		}
		// any other prefix is unhandled
	});

	return worker;
};
/**
 * Runs whenever a worker process exits. If the worker was not terminated with
 * SIGTERM, the crash is logged and every connection it owned is reported to
 * `Users` as disconnected. A replacement worker is spawned in both cases.
 */
cluster.on('exit', (worker, code, signal) => {
	// A null exit code together with SIGTERM means the worker was killed by
	// Sockets.killWorker or Sockets.killPid, so there is nothing to clean up.
	const killedOnPurpose = code === null && signal === 'SIGTERM';

	if (!killedOnPurpose) {
		// worker crashed, try our best to clean up
		const crashlogger = require('./crashlogger');
		crashlogger(new Error(`Worker ${worker.id} abruptly died`), "The main process");

		// this could get called during cleanup; prevent it from crashing
		// note: overwriting Worker#send is unnecessary in Node.js v7.0.0 and above
		// see https://github.com/nodejs/node/commit/8c53d2fe9f102944cc1889c4efcac7a86224cf0a
		worker.send = () => {};

		let lost = 0;
		Users.connections.forEach(connection => {
			if (connection.worker !== worker) return;
			Users.socketDisconnect(worker, worker.id, connection.socketid);
			lost++;
		});
		console.error(`${lost} connections were lost.`);
	}

	// don't delete the worker, so we can investigate it if necessary.

	// attempt to recover
	spawnWorker();
});
exports.listen = function (port, bindAddress, workerCount) {
if (port !== undefined && !isNaN(port)) {
Config.port = port;
Config.ssl = null;
} else {
port = Config.port;
// Autoconfigure when running in cloud environments.
try {
const cloudenv = require('cloud-env');
bindAddress = cloudenv.get('IP', bindAddress);
port = cloudenv.get('PORT', port);
} catch (e) {}
}
if (bindAddress !== undefined) {
Config.bindaddress = bindAddress;
}
if (workerCount === undefined) {
workerCount = (Config.workers !== undefined ? Config.workers : 1);
}
for (let i = 0; i < workerCount; i++) {
spawnWorker();
}
};
exports.killWorker = function (worker) {
let count = 0;
Users.connections.forEach(connection => {
if (connection.worker === worker) {
Users.socketDisconnect(worker, worker.id, connection.socketid);
count++;
}
});
console.log(`${count} connections were lost.`);
try {
worker.disconnect();
worker.kill('SIGTERM');
} catch (e) {}
workers.delete(worker.id);
return count;
};
exports.killPid = function (pid) {
pid = '' + pid;
for (let [workerid, worker] of workers) { // eslint-disable-line no-unused-vars
if (pid === '' + worker.process.pid) {
return this.killWorker(worker);
}
}
return false;
};
exports.socketSend = function (worker, socketid, message) {
worker.send(`>${socketid}\n${message}`);
};
exports.socketDisconnect = function (worker, socketid) {
worker.send(`!${socketid}`);
};
exports.channelBroadcast = function (channelid, message) {
workers.forEach(worker => {
worker.send(`#${channelid}\n${message}`);
});
};
exports.channelSend = function (worker, channelid, message) {
worker.send(`#${channelid}\n${message}`);
};
exports.channelAdd = function (worker, channelid, socketid) {
worker.send(`+${channelid}\n${socketid}`);
};
exports.channelRemove = function (worker, channelid, socketid) {
worker.send(`-${channelid}\n${socketid}`);
};
exports.subchannelBroadcast = function (channelid, message) {
workers.forEach(worker => {
worker.send(`:${channelid}\n${message}`);
});
};
exports.subchannelMove = function (worker, channelid, subchannelid, socketid) {
worker.send(`.${channelid}\n${subchannelid}\n${socketid}`);
};
} else {
// is worker
// Settings handed down by the master through the environment (see
// spawnWorker) override whatever the config file says.
if (process.env.PSPORT) {
	Config.port = Number(process.env.PSPORT);
}
if (process.env.PSBINDADDR) {
	Config.bindaddress = process.env.PSBINDADDR;
}
if (Number(process.env.PSNOSSL)) {
	Config.ssl = null;
}

if (Config.ofe) {
	// ofe is an optional dependency: fail with a readable message if it was
	// enabled in the config without being installed.
	let ofeInstalled = true;
	try {
		require.resolve('ofe');
	} catch (e) {
		if (e.code !== 'MODULE_NOT_FOUND') throw e; // should never happen
		ofeInstalled = false;
	}
	if (!ofeInstalled) {
		throw new Error(
			'ofe is not installed, but it is a required dependency if Config.ofe is set to true! ' +
			'Run npm install ofe and restart the server.'
		);
	}

	// Create a heapdump if the process runs out of memory.
	require('ofe').call();
}
// Static HTTP server (the server objects themselves are created below)
// This handles the custom CSS and custom avatar features, and also
// redirects yourserver:8001 to yourserver-8001.psim.us
// It's optional if you don't need these features.

// Exposed as a global; used further down to build the trusted-proxy check.
global.Dnsbl = require('./dnsbl');

if (Config.crashguard) {
	// graceful crash: log uncaught exceptions instead of letting them kill the worker
	const logWorkerCrash = err => {
		const description = `Socket process ${cluster.worker.id} (${process.pid})`;
		require('./crashlogger')(err, description, true);
	};
	process.on('uncaughtException', logWorkerCrash);
}
// Plain HTTP server; both the static request handler and SockJS attach to it below.
let app = require('http').createServer();
// HTTPS counterpart, only created when Config.ssl is set (null otherwise).
let appssl = Config.ssl ? require('https').createServer(Config.ssl.options) : null;
// Static server
const StaticServer = require('node-static').Server;
// Matches a URL path made of a single alphanumeric/hyphen segment with an
// optional trailing slash (e.g. `/lobby`); such requests are served the index page.
const roomidRegex = /^\/(?:[A-Za-z0-9][A-Za-z0-9-]*)\/?$/;
// Serves `/custom.css` out of ./config
const cssServer = new StaticServer('./config');
// Serves `/avatars/*` out of ./config/avatars
const avatarServer = new StaticServer('./config/avatars');
// Serves everything else (and the 404 page) out of ./static
const staticServer = new StaticServer('./static');
/**
 * HTTP request handler for everything that is not a SockJS request.
 *
 * Once the request body has been drained it:
 *  - lets `Config.customhttpresponse`, if defined, answer the request itself
 *    (a truthy return value means it did);
 *  - serves `/custom.css` from the CSS server and `/avatars/*` from the
 *    avatar server (with the `/avatars` prefix stripped);
 *  - serves the index page for URLs that look like a room id;
 *  - serves anything else from the static directory.
 *
 * @param {http.IncomingMessage} req
 * @param {http.ServerResponse} res
 */
const staticRequestHandler = (req, res) => {
	// console.log(`static rq: ${req.socket.remoteAddress}:${req.socket.remotePort} -> ${req.socket.localAddress}:${req.socket.localPort} - ${req.method} ${req.url} ${req.httpVersion} - ${req.rawHeaders.join('|')}`);
	req.resume();
	req.addListener('end', () => {
		if (Config.customhttpresponse &&
				Config.customhttpresponse(req, res)) {
			return;
		}

		let server = staticServer;
		if (req.url === '/custom.css') {
			server = cssServer;
		} else if (req.url.startsWith('/avatars/')) {
			// '/avatars/foo.png' -> '/foo.png' (drops the 8-character '/avatars' prefix)
			req.url = req.url.substr(8);
			server = avatarServer;
		} else if (roomidRegex.test(req.url)) {
			req.url = '/';
		}

		server.serve(req, res, e => {
			if (!e) return;
			if (e.status === 404) {
				staticServer.serveFile('404.html', 404, {}, req, res);
				return;
			}
			// When a callback is supplied, node-static leaves error responses
			// to the caller. Finish them here so that non-404 failures (e.g.
			// a malformed URI or a forbidden path) do not leave the request
			// hanging until the client times out.
			if (!res.headersSent && e.status > 0) {
				res.writeHead(e.status, e.headers);
			}
			res.end();
		});
	});
};
// Route plain HTTP requests (and HTTPS ones, when SSL is configured) through
// the static request handler.
app.on('request', staticRequestHandler);
if (appssl) appssl.on('request', staticRequestHandler);
// SockJS server
// This is the main server that handles users connecting to our server
// and doing things on our server.
const sockjs = require('sockjs');
const server = sockjs.createServer({
sockjs_url: "//play.pokemonshowdown.com/js/lib/sockjs-1.1.1-nwjsfix.min.js",
// Only messages with severity 'error' are printed; everything else is dropped.
log: (severity, message) => {
if (severity === 'error') console.log('ERROR: ' + message);
},
prefix: '/showdown',
});
// socketid -> socket, for every connection owned by this worker
const sockets = new Map();
// channelid -> (socketid -> socket)
const channels = new Map();
// channelid -> (socketid -> subchannelid)
const subchannels = new Map();
/**
 * Deals with phantom connections: walks every known socket and nudges SockJS
 * into cleaning up sessions that appear to be dead.
 */
const sweepClosedSockets = () => {
	for (const socket of sockets.values()) {
		const isStreaming = socket.protocol === 'xhr-streaming';
		if (isStreaming && socket._session && socket._session.recv) {
			socket._session.recv.didClose();
		}

		// A ghost connection's `_session.to_tref._idlePrev` (and `_idleNext`) property is `null` while
		// it is an object for normal users. Under normal circumstances, those properties should only be
		// `null` when the timeout has already been called, but somehow it's not happening for some connections.
		// Simply calling `_session.timeout_cb` (the function bound to the aforementioned timeout) manually
		// on those connections kills those connections. For a bit of background, this timeout is the timeout
		// that sockjs sets to wait for users to reconnect within that time to continue their session.
		const session = socket._session;
		if (!session || !session.to_tref) continue;
		if (session.to_tref._idlePrev) continue;
		session.timeout_cb();
	}
};
// Run the phantom-connection sweep every 10 minutes. The timer handle is not
// read anywhere in the visible code, hence the lint suppression.
const interval = setInterval(sweepClosedSockets, 1000 * 60 * 10); // eslint-disable-line no-unused-vars
/**
 * Handles the commands the master process sends to this worker over IPC.
 * The first character of `data` selects the command:
 *
 *   `$code`                               evaluate `code` in this worker
 *   `!socketid`                           destroy a socket
 *   `>socketid\nmessage`                  write a message to one socket
 *   `#channelid\nmessage`                 write a message to a channel
 *   `+channelid\nsocketid`                add a socket to a channel
 *   `-channelid\nsocketid`                remove a socket from a channel
 *   `.channelid\nsubchannelid\nsocketid`  move a socket to a subchannel
 *   `:channelid\nmessage`                 write a `|split` message to a channel,
 *                                         picking the line for each subchannel
 *
 * Commands that refer to an unknown socket or channel are ignored.
 */
process.on('message', data => {
	// console.log('worker received: ' + data);
	switch (data.charAt(0)) {
	case '$': { // $code
		// NOTE(review): this runs arbitrary code received over the IPC channel.
		// It is kept as-is because only the master process can reach it, but it
		// must never be fed anything derived from client input.
		eval(data.substr(1));
		break;
	}

	case '!': { // !socketid
		// destroy
		const socketid = data.substr(1);
		const socket = sockets.get(socketid);
		if (!socket) return;
		socket.end();
		// After sending the FIN packet, we make sure the I/O is totally blocked for this socket
		socket.destroy();
		sockets.delete(socketid);
		// Mirror the cleanup done by the '-' command: forget the socket's
		// subchannel as well, and drop channels that have become empty, so
		// that neither map keeps entries for sockets that no longer exist.
		channels.forEach((channel, channelid) => {
			channel.delete(socketid);
			const subchannel = subchannels.get(channelid);
			if (subchannel) subchannel.delete(socketid);
			if (!channel.size) {
				channels.delete(channelid);
				if (subchannel) subchannels.delete(channelid);
			}
		});
		break;
	}

	case '>': {
		// >socketid, message
		// message
		const nlLoc = data.indexOf('\n');
		const socket = sockets.get(data.substr(1, nlLoc - 1));
		if (!socket) return;
		socket.write(data.substr(nlLoc + 1));
		break;
	}

	case '#': {
		// #channelid, message
		// message to channel
		const nlLoc = data.indexOf('\n');
		const channel = channels.get(data.substr(1, nlLoc - 1));
		if (!channel) return;
		const message = data.substr(nlLoc + 1);
		channel.forEach(socket => socket.write(message));
		break;
	}

	case '+': {
		// +channelid, socketid
		// add to channel
		const nlLoc = data.indexOf('\n');
		const socketid = data.substr(nlLoc + 1);
		const socket = sockets.get(socketid);
		if (!socket) return;
		const channelid = data.substr(1, nlLoc - 1);
		let channel = channels.get(channelid);
		if (!channel) {
			channel = new Map();
			channels.set(channelid, channel);
		}
		channel.set(socketid, socket);
		break;
	}

	case '-': {
		// -channelid, socketid
		// remove from channel
		const nlLoc = data.indexOf('\n');
		const channelid = data.slice(1, nlLoc);
		const channel = channels.get(channelid);
		if (!channel) return;
		const socketid = data.slice(nlLoc + 1);
		channel.delete(socketid);
		const subchannel = subchannels.get(channelid);
		if (subchannel) subchannel.delete(socketid);
		if (!channel.size) {
			channels.delete(channelid);
			if (subchannel) subchannels.delete(channelid);
		}
		break;
	}

	case '.': {
		// .channelid, subchannelid, socketid
		// move subchannel
		const nlLoc = data.indexOf('\n');
		const nlLoc2 = data.indexOf('\n', nlLoc + 1);
		const channelid = data.slice(1, nlLoc);
		const subchannelid = data.slice(nlLoc + 1, nlLoc2);
		const socketid = data.slice(nlLoc2 + 1);
		let subchannel = subchannels.get(channelid);
		if (!subchannel) {
			subchannel = new Map();
			subchannels.set(channelid, subchannel);
		}
		// Subchannel '0' is the default, so it is represented by having no entry.
		if (subchannelid === '0') {
			subchannel.delete(socketid);
		} else {
			subchannel.set(socketid, subchannelid);
		}
		break;
	}

	case ':': {
		// :channelid, message
		// message to subchannel
		const nlLoc = data.indexOf('\n');
		const channelid = data.slice(1, nlLoc);
		const channel = channels.get(channelid);
		if (!channel) return;
		const message = data.substr(nlLoc + 1);
		const subchannel = subchannels.get(channelid);
		// Each `|split` marker is followed by four lines. Subchannel '1' gets
		// the second, subchannel '2' the third, everyone else the first.
		// Every variant is built at most once per broadcast.
		const messages = [null, null, null];
		channel.forEach((socket, socketid) => {
			switch (subchannel ? subchannel.get(socketid) : '0') {
			case '1':
				if (messages[1] === null) {
					messages[1] = message.replace(/\n\|split\n[^\n]*\n([^\n]*)\n[^\n]*\n[^\n]*/g, '\n$1');
				}
				socket.write(messages[1]);
				break;
			case '2':
				if (messages[2] === null) {
					messages[2] = message.replace(/\n\|split\n[^\n]*\n[^\n]*\n([^\n]*)\n[^\n]*/g, '\n$1');
				}
				socket.write(messages[2]);
				break;
			default:
				if (messages[0] === null) {
					messages[0] = message.replace(/\n\|split\n([^\n]*)\n[^\n]*\n[^\n]*\n[^\n]*/g, '\n$1');
				}
				socket.write(messages[0]);
				break;
			}
		});
		break;
	}
	}
});
// Clean up any remaining connections on disconnect. If this isn't done,
// the process will not exit until any remaining connections have been destroyed.
// Afterwards, the worker process will die on its own.
process.once('disconnect', () => {
	for (const socket of sockets.values()) {
		// Best effort: a socket that fails to close must not stop the others.
		try {
			socket.end();
			socket.destroy();
		} catch (e) {}
	}

	[sockets, channels, subchannels].forEach(registry => registry.clear());

	app.close();
	if (appssl) appssl.close();
});
// this is global so it can be hotpatched if necessary
// NOTE(review): as written this is a block-scoped `let`, not a property of
// `global`. Presumably "hotpatched" means reassigning it through one of the
// `eval` entry points in this file (the `$` IPC command or the REPL) - confirm.
// Built from Config.proxyip; called below with an IP address to decide
// whether that address is a trusted proxy.
let isTrustedProxyIp = Dnsbl.checker(Config.proxyip);
// Incremented for every accepted connection; its string form becomes the socket id.
let socketCounter = 0;
/**
 * Handles a new SockJS connection: assigns it an id, resolves the client's
 * real IP when it arrived through a trusted proxy, announces it to the master
 * process, and relays its messages and its disconnection to the master.
 */
server.on('connection', socket => {
	// For reasons that are not entirely clear, SockJS sometimes triggers
	// this event with a null `socket` argument.
	if (!socket) return;

	// This condition occurs several times per day. It may be a SockJS bug.
	if (!socket.remoteAddress) {
		try {
			socket.end();
		} catch (e) {}
		return;
	}

	socketCounter += 1;
	const socketid = '' + socketCounter;
	socket.id = socketid;
	sockets.set(socketid, socket);

	if (isTrustedProxyIp(socket.remoteAddress)) {
		// Walk X-Forwarded-For from the last hop backwards and take the first
		// address that is not itself a trusted proxy. An empty entry ends the walk.
		const forwardedFor = (socket.headers['x-forwarded-for'] || '').split(',');
		for (let hop = forwardedFor.pop(); hop; hop = forwardedFor.pop()) {
			const candidate = hop.trim();
			if (isTrustedProxyIp(candidate)) continue;
			socket.remoteAddress = candidate;
			break;
		}
	}

	process.send(`*${socketid}\n${socket.remoteAddress}\n${socket.protocol}`);

	socket.on('data', message => {
		// drop empty messages (DDoS?)
		if (!message) return;

		// drop messages over 100KB
		const tooLong = message.length > 100000;
		if (tooLong) {
			console.log(`Dropping client message ${message.length / 1024} KB...`);
			console.log(message.slice(0, 160));
			return;
		}

		// drop legacy JSON messages
		const isText = typeof message === 'string';
		if (!isText || message.startsWith('{')) return;

		// drop blank messages (DDoS?)
		const pipeIndex = message.indexOf('|');
		const hasBody = pipeIndex >= 0 && pipeIndex !== message.length - 1;
		if (!hasBody) return;

		process.send(`<${socketid}\n${message}`);
	});

	socket.on('close', () => {
		process.send(`!${socketid}`);
		sockets.delete(socketid);
		for (const channel of channels.values()) {
			channel.delete(socketid);
		}
	});
});
// Attach SockJS to the HTTP server and start listening.
server.installHandlers(app, {});
app.listen(Config.port, Config.bindaddress);
console.log(`Worker ${cluster.worker.id} now listening on ${Config.bindaddress}:${Config.port}`);
// Same for the HTTPS server, when SSL is configured.
if (appssl) {
server.installHandlers(appssl, {});
appssl.listen(Config.ssl.port, Config.bindaddress);
console.log(`Worker ${cluster.worker.id} now listening for SSL on port ${Config.ssl.port}`);
}
// A bind address of 0.0.0.0 is shown as `localhost` in the hint below.
console.log(`Test your server at http://${Config.bindaddress === '0.0.0.0' ? 'localhost' : Config.bindaddress}:${Config.port}`);
// Start a REPL named after this worker's id and pid. Commands go through a
// direct `eval`, so they run with access to this scope's variables.
require('./repl').start(`sockets-${cluster.worker.id}-${process.pid}`, cmd => eval(cmd));
}