pokemon-showdown/sockets.js
Guangcong Luo 466359023f Load balance connections to proxy processes
Instead of handling all connections in the master process, we now
use Node's cluster module to push them to worker processes. These
worker processes are pretty lightweight and do nothing but proxy
connections so far.
2013-11-13 18:44:44 -08:00

279 lines
7.2 KiB
JavaScript

/**
* Connections
* Pokemon Showdown - http://pokemonshowdown.com/
*
* Abstraction layer for multi-process SockJS connections.
*
* @license MIT license
*/
var cluster = require('cluster');
var config = require('./config/config');
if (cluster.isMaster) {
cluster.setupMaster({
exec: 'sockets.js'
});
var workers = exports.workers = {};
var workerCount = config.workers || 1;
for (var i=0; i<workerCount; i++) {
var worker = workers[i] = cluster.fork();
(function(worker, i) {
worker.on('message', function(data) {
console.log('master received: '+data);
switch (data.charAt(0)) {
case '*': // *socketid, ip
// connect
var nlPos = data.indexOf('\n');
Users.socketConnect(worker, i, data.substr(1, nlPos-1), data.substr(nlPos+1));
break;
case '!': // !socketid
// disconnect
Users.socketDisconnect(worker, i, data.substr(1));
break;
case '<': // <socketid, message
// message
var nlPos = data.indexOf('\n');
Users.socketReceive(worker, i, data.substr(1, nlPos-1), data.substr(nlPos+1));
break;
}
});
})(worker, i);
}
exports.socketSend = function(worker, socketid, message) {
worker.send('>'+socketid+'\n'+message);
};
exports.socketDisconnect = function(worker, socketid) {
worker.send('!'+socketid);
};
exports.channelSend = function(worker, channelid, message) {
worker.send('#'+channelid+'\n'+message);
};
exports.channelAdd = function(worker, channelid, socketid) {
worker.send('+'+channelid+'\n'+socketid);
};
exports.channelRemove = function(worker, channelid, socketid) {
worker.send('-'+channelid+'\n'+socketid);
};
} else {
// is worker
// Static HTTP server
// This handles the custom CSS and custom avatar features, and also
// redirects yourserver:8001 to yourserver-8001.psim.us
// It's optional if you don't need these features.
var Cidr = require('./cidr');
var app = require('http').createServer();
var appssl;
if (config.ssl) {
appssl = require('https').createServer(config.ssl.options);
}
try {
(function() {
var nodestatic = require('node-static');
var cssserver = new nodestatic.Server('./config');
var avatarserver = new nodestatic.Server('./config/avatars');
var staticserver = new nodestatic.Server('./static');
var staticRequestHandler = function(request, response) {
request.resume();
request.addListener('end', function() {
if (config.customhttpresponse &&
config.customhttpresponse(request, response)) {
return;
}
var server;
if (request.url === '/custom.css') {
server = cssserver;
} else if (request.url.substr(0, 9) === '/avatars/') {
request.url = request.url.substr(8);
server = avatarserver;
} else {
if (/^\/([A-Za-z0-9][A-Za-z0-9-]*)\/?$/.test(request.url)) {
request.url = '/';
}
server = staticserver;
}
server.serve(request, response, function(e, res) {
if (e && (e.status === 404)) {
staticserver.serveFile('404.html', 404, {}, request, response);
}
});
});
};
app.on('request', staticRequestHandler);
if (appssl) {
appssl.on('request', staticRequestHandler);
}
})();
} catch (e) {
console.log('Could not start node-static - try `npm install` if you want to use it');
}
// SockJS server
// This is the main server that handles users connecting to our server
// and doing things on our server.
var sockjs = require('sockjs');
var server = sockjs.createServer({
sockjs_url: "//play.pokemonshowdown.com/js/lib/sockjs-0.3.min.js",
log: function(severity, message) {
if (severity === 'error') console.log('ERROR: '+message);
},
prefix: '/showdown',
websocket: !config.disablewebsocket
});
// Make `app`, `appssl`, and `server` available to the console.
global.App = app;
global.AppSSL = appssl;
global.Server = server;
var sockets = {};
var channels = {};
process.on('message', function(data) {
console.log('worker received: '+data);
var socket = null;
var socketid = null;
var channelid = null;
switch (data.charAt(0)) {
case '!': // !socketid
// destroy
socketid = data.substr(1);
socket = sockets[socketid];
if (!socket) return;
socket.end();
// After sending the FIN packet, we make sure the I/O is totally blocked for this socket
socket.destroy();
delete sockets[socketid];
for (channelid in channels) {
delete channels[channelid][socketid];
}
break;
case '>': // >socketid, message
// message
var nlLoc = data.indexOf('\n');
socket = sockets[data.substr(1, nlLoc-1)];
if (!socket) return;
socket.write(data.substr(nlLoc+1));
break;
case '#': // #channelid, message
// message to channel
var nlLoc = data.indexOf('\n');
channel = channels[data.substr(1, nlLoc-1)];
var message = data.substr(nlLoc+1);
for (socketid in channel) {
channel[socketid].write(message);
}
break;
case '+': // +channelid, socketid
// add to channel
var nlLoc = data.indexOf('\n');
socketid = data.substr(nlLoc+1);
socket = sockets[socketid];
if (!socket) return;
channelid = data.substr(1, nlLoc-1);
var channel = channels[channelid];
if (!channel) channel = channels[channelid] = {};
channel[socketid] = socket;
break;
case '-': // -channelid, socketid
// remove from channel
var nlLoc = data.indexOf('\n');
var channel = channels[data.substr(1, nlLoc-1)];
if (!channel) return;
delete channel[data.substr(nlLoc+1)];
break;
}
});
// this is global so it can be hotpatched if necessary
var isTrustedProxyIp = Cidr.checker(config.proxyip);
var socketCounter = 0;
server.on('connection', function(socket) {
if (!socket) {
// For reasons that are not entirely clear, SockJS sometimes triggers
// this event with a null `socket` argument.
return;
} else if (!socket.remoteAddress) {
// This condition occurs several times per day. It may be a SockJS bug.
try {
socket.end();
} catch (e) {}
return;
}
var socketid = socket.id = (++socketCounter);
sockets[socket.id] = socket;
if (isTrustedProxyIp(socket.remoteAddress)) {
var ips = (socket.headers['x-forwarded-for'] || '').split(',');
var ip;
while (ip = ips.pop()) {
ip = ip.trim();
if (!isTrustedProxyIp(ip)) {
socket.remoteAddress = ip;
break;
}
}
}
process.send('*'+socketid+'\n'+socket.remoteAddress);
// console.log('CONNECT: '+socket.remoteAddress+' ['+socket.id+']');
var interval;
if (config.herokuhack) {
// see https://github.com/sockjs/sockjs-node/issues/57#issuecomment-5242187
interval = setInterval(function() {
try {
socket._session.recv.didClose();
} catch (e) {}
}, 15000);
}
socket.on('data', function(message) {
process.send('<'+socketid+'\n'+message);
});
socket.on('close', function() {
if (interval) {
clearInterval(interval);
}
process.send('!'+socketid);
});
});
server.installHandlers(app, {});
console.log('Server starting on port ' + config.port);
app.listen(config.port);
if (appssl) {
server.installHandlers(appssl, {});
appssl.listen(config.ssl.port);
}
console.log('Server started on port ' + config.port);
if (appssl) {
console.log('SSL server started on port ' + config.ssl.port);
}
console.log('Test your server at http://localhost:' + config.port);
}