mirror of
https://github.com/smogon/pokemon-showdown.git
synced 2026-03-21 17:25:10 -05:00
Some checks failed
Node.js CI / build (18.x) (push) Has been cancelled
This minimizes the side effects of import/require across the codebase, and makes the caller responsible for initializing child processes, as well as other async logic, such as restoring saved battles.
573 lines
17 KiB
TypeScript
573 lines
17 KiB
TypeScript
/**
|
|
* Connections
|
|
* Pokemon Showdown - http://pokemonshowdown.com/
|
|
*
|
|
* Abstraction layer for multi-process SockJS connections.
|
|
*
|
|
* This file handles all the communications between the users'
|
|
* browsers, the networking processes, and users.ts in the
|
|
* main process.
|
|
*
|
|
* @license MIT
|
|
*/
|
|
|
|
import * as fs from 'fs';
|
|
import * as http from 'http';
|
|
import * as https from 'https';
|
|
import * as path from 'path';
|
|
import * as ConfigLoader from './config-loader';
|
|
import { crashlogger, ProcessManager, Streams } from '../lib';
|
|
import { IPTools } from './ip-tools';
|
|
import { type ChannelID, extractChannelMessages } from '../sim/battle';
|
|
import { StaticServer } from '../lib/static-server';
|
|
|
|
type StreamWorker = ProcessManager.StreamWorker;
|
|
|
|
/**
 * Main-process side of the networking layer.
 *
 * Every frame exchanged with a networking worker is a string whose first
 * character is an opcode; the remaining fields are separated by newlines.
 * This object decodes frames coming from workers and encodes frames going
 * to them.
 */
export const Sockets = new class {
	/**
	 * Reads frames from a freshly spawned worker until its stream ends,
	 * forwarding connects (`*`), disconnects (`!`) and client messages (`<`)
	 * to `Users`. Frames with any other opcode are ignored.
	 */
	async onSpawn(worker: StreamWorker) {
		const workerid = worker.workerid;
		for await (const frame of worker.stream) {
			const opcode = frame.charAt(0);
			const payload = frame.slice(1);

			if (opcode === '*') {
				// *socketid, ip, protocol
				// a new connection was accepted by the worker
				worker.load++;
				const [socketid, ip, protocol] = payload.split('\n');
				Users.socketConnect(worker, workerid, socketid, ip, protocol);
			} else if (opcode === '!') {
				// !socketid
				// the connection went away
				worker.load--;
				Users.socketDisconnect(worker, workerid, payload);
			} else if (opcode === '<') {
				// <socketid, message
				// a client sent a message
				const newlineAt = frame.indexOf('\n');
				// a frame without a newline yields an empty socket id
				const socketid = newlineAt < 0 ? '' : frame.slice(1, newlineAt);
				const message = frame.slice(newlineAt + 1);
				Users.socketReceive(worker, workerid, socketid, message);
			}
			// anything else: unhandled
		}
	}

	/** Drops every connection that belonged to a worker that went away. */
	onUnspawn(this: void, worker: StreamWorker) {
		Users.socketDisconnectAll(worker, worker.workerid);
	}

	/**
	 * Stores the effective port / bind address in `Config`, publishes them
	 * to the workers through `PM.env`, and spawns the networking workers.
	 *
	 * @param port Port to listen on. Passing a valid number also disables SSL;
	 *   otherwise `Config.port` (optionally overridden by `cloud-env`) is used.
	 * @param bindAddress Address to bind to, if it should be overridden.
	 * @param processesCount Sub-process configuration; its `network` entry is
	 *   the number of workers to spawn (defaults to 1).
	 */
	listen(port?: number, bindAddress?: string, processesCount?: ConfigLoader.SubProcessesConfig) {
		if (port === undefined || isNaN(port)) {
			port = Config.port;

			// Autoconfigure when running in cloud environments.
			try {
				const cloudenv = (require as any)('cloud-env');
				bindAddress = cloudenv.get('IP', bindAddress);
				port = cloudenv.get('PORT', port);
			} catch {}
		} else {
			// an explicitly requested port turns SSL off
			Config.port = port;
			Config.ssl = null;
		}

		if (bindAddress !== undefined) Config.bindaddress = bindAddress;
		if (port !== undefined) Config.port = port;

		const workerCount = processesCount?.['network'] ?? 1;

		// handed to the workers as environment variables
		PM.env = {
			PSPORT: Config.port,
			PSBINDADDR: Config.bindaddress || '0.0.0.0',
			PSNOSSL: Config.ssl ? 0 : 1,
		};
		PM.subscribeSpawn(worker => void this.onSpawn(worker));
		PM.subscribeUnspawn(this.onUnspawn);

		PM.spawn(workerCount);
	}

	/** `>socketid, message`: deliver a message to a single connection. */
	socketSend(worker: StreamWorker, socketid: string, message: string) {
		const frame = `>${socketid}\n${message}`;
		void worker.stream.write(frame);
	}

	/** `!socketid`: ask the worker to destroy a connection. */
	socketDisconnect(worker: StreamWorker, socketid: string) {
		const frame = `!${socketid}`;
		void worker.stream.write(frame);
	}

	/** `#roomid, message`: deliver a message to a room, on every worker. */
	roomBroadcast(roomid: RoomID, message: string) {
		const frame = `#${roomid}\n${message}`;
		for (const worker of PM.workers) {
			void worker.stream.write(frame);
		}
	}

	/** `+roomid, socketid`: add a connection to a room. */
	roomAdd(worker: StreamWorker, roomid: RoomID, socketid: string) {
		const frame = `+${roomid}\n${socketid}`;
		void worker.stream.write(frame);
	}

	/** `-roomid, socketid`: remove a connection from a room. */
	roomRemove(worker: StreamWorker, roomid: RoomID, socketid: string) {
		const frame = `-${roomid}\n${socketid}`;
		void worker.stream.write(frame);
	}

	/** `:roomid, message`: per-channel broadcast to a room, on every worker. */
	channelBroadcast(roomid: RoomID, message: string) {
		const frame = `:${roomid}\n${message}`;
		for (const worker of PM.workers) {
			void worker.stream.write(frame);
		}
	}

	/** `.roomid, channelid, socketid`: move a connection to another channel. */
	channelMove(worker: StreamWorker, roomid: RoomID, channelid: ChannelID, socketid: string) {
		const frame = `.${roomid}\n${channelid}\n${socketid}`;
		void worker.stream.write(frame);
	}

	/** `$code`: have the worker evaluate a snippet of code. */
	eval(worker: StreamWorker, query: string) {
		const frame = `$${query}`;
		void worker.stream.write(frame);
	}

	/** Delegates to the module-level `start` function. */
	start(processCount: ConfigLoader.SubProcessesConfig) {
		start(processCount);
	}
};
|
|
|
|
/**
 * Stream that runs inside a networking worker process.
 *
 * It owns the HTTP(S) servers and the SockJS server, keeps track of which
 * connection is in which room/channel, and translates between SockJS events
 * and the one-character-opcode frames exchanged with the main process:
 * frames written to this stream are dispatched through `receivers`, and
 * frames for the main process are emitted with `push`.
 */
export class ServerStream extends Streams.ObjectReadWriteStream<string> {
	/** socketid:Connection */
	sockets = new Map<string, import('sockjs').Connection>();
	/** roomid:socketid:Connection */
	rooms = new Map<RoomID, Map<string, import('sockjs').Connection>>();
	/** roomid:socketid:channelid */
	roomChannels = new Map<RoomID, Map<string, ChannelID>>();

	server: http.Server;
	serverSsl: https.Server | null;
	socketCounter = 0;

	isTrustedProxyIp: (ip: string) => boolean;

	/** Handlers for incoming frames, keyed by the frame's first character. */
	receivers: { [k: string]: (this: ServerStream, data: string) => void } = {
		'$'(data) {
			// $code
			// NOTE: evaluates whatever arrives on the stream; only the main
			// process is expected to write to it.
			// eslint-disable-next-line no-eval
			eval(data.substr(1));
		},
		'!'(data) {
			// !socketid
			// destroy
			const socketid = data.substr(1);
			const socket = this.sockets.get(socketid);
			if (!socket) return;
			socket.destroy();
			this.forgetSocket(socketid);
		},
		'>'(data) {
			// >socketid, message
			// message to single connection
			const nlLoc = data.indexOf('\n');
			const socketid = data.substr(1, nlLoc - 1);
			const socket = this.sockets.get(socketid);
			if (!socket) return;
			const message = data.substr(nlLoc + 1);
			socket.write(message);
		},
		'#'(data) {
			// #roomid, message
			// message to all connections in room
			// #, message
			// message to all connections
			const nlLoc = data.indexOf('\n');
			const roomid = data.substr(1, nlLoc - 1) as RoomID;
			const room = roomid ? this.rooms.get(roomid) : this.sockets;
			if (!room) return;
			const message = data.substr(nlLoc + 1);
			for (const curSocket of room.values()) curSocket.write(message);
		},
		'+'(data) {
			// +roomid, socketid
			// join room with connection
			const nlLoc = data.indexOf('\n');
			const socketid = data.substr(nlLoc + 1);
			const socket = this.sockets.get(socketid);
			if (!socket) return;
			const roomid = data.substr(1, nlLoc - 1) as RoomID;
			let room = this.rooms.get(roomid);
			if (!room) {
				room = new Map();
				this.rooms.set(roomid, room);
			}
			room.set(socketid, socket);
		},
		'-'(data) {
			// -roomid, socketid
			// leave room with connection
			const nlLoc = data.indexOf('\n');
			const roomid = data.slice(1, nlLoc) as RoomID;
			const room = this.rooms.get(roomid);
			if (!room) return;
			const socketid = data.slice(nlLoc + 1);
			room.delete(socketid);
			const roomChannel = this.roomChannels.get(roomid);
			if (roomChannel) roomChannel.delete(socketid);
			if (!room.size) {
				// last connection left: forget the room entirely
				this.rooms.delete(roomid);
				if (roomChannel) this.roomChannels.delete(roomid);
			}
		},
		'.'(data) {
			// .roomid, channelid, socketid
			// move connection to different channel in room
			const nlLoc = data.indexOf('\n');
			const roomid = data.slice(1, nlLoc) as RoomID;
			const nlLoc2 = data.indexOf('\n', nlLoc + 1);
			const channelid = Number(data.slice(nlLoc + 1, nlLoc2)) as ChannelID;
			const socketid = data.slice(nlLoc2 + 1);

			let roomChannel = this.roomChannels.get(roomid);
			if (!roomChannel) {
				roomChannel = new Map();
				this.roomChannels.set(roomid, roomChannel);
			}
			if (channelid === 0) {
				// channel 0 is the default, so it is represented by absence
				roomChannel.delete(socketid);
			} else {
				roomChannel.set(socketid, channelid);
			}
		},
		':'(data) {
			// :roomid, message
			// message to a room, splitting `|split` by channel
			const nlLoc = data.indexOf('\n');
			const roomid = data.slice(1, nlLoc) as RoomID;
			const room = this.rooms.get(roomid);
			if (!room) return;

			// per-channel cache of the joined payload; null = not built yet
			const messages: [string | null, string | null, string | null, string | null, string | null] = [
				null, null, null, null, null,
			];
			const message = data.substr(nlLoc + 1);
			const channelMessages = extractChannelMessages(message, [0, 1, 2, 3, 4]);
			const roomChannel = this.roomChannels.get(roomid);
			for (const [curSocketid, curSocket] of room) {
				const channelid = roomChannel?.get(curSocketid) || 0;
				let payload = messages[channelid];
				// compare against null so an empty payload is also cached
				if (payload === null) {
					payload = channelMessages[channelid].join('\n');
					messages[channelid] = payload;
				}
				curSocket.write(payload);
			}
		},
	};

	/**
	 * Creates the HTTP server (and the HTTPS server when `config.ssl` is
	 * usable), attaches the static file handler and the SockJS server, and
	 * starts listening.
	 *
	 * Note that `config.bindaddress` is defaulted to `'0.0.0.0'` in place.
	 */
	constructor(config: {
		port: number,
		bindaddress?: string,
		ssl?: typeof Config.ssl,
		wsdeflate?: typeof Config.wsdeflate,
		proxyip?: typeof Config.proxyip,
		customhttpresponse?: typeof Config.customhttpresponse,
	}) {
		super();
		if (!config.bindaddress) config.bindaddress = '0.0.0.0';

		this.isTrustedProxyIp = config.proxyip ? IPTools.checker(config.proxyip) : () => false;

		// Static HTTP server

		// This handles the custom CSS and custom avatar features, and also
		// redirects yourserver:8001 to yourserver-8001.psim.us

		// It's optional if you don't need these features.

		this.server = http.createServer();
		this.serverSsl = null;
		if (config.ssl) {
			const key = ServerStream.loadPem(config.ssl.options.key, 'private key');
			const cert = ServerStream.loadPem(config.ssl.options.cert, 'certificate');

			if (key && cert) {
				try {
					// In case there are additional SSL config settings besides the key and cert...
					this.serverSsl = https.createServer({ ...config.ssl.options, key, cert });
				} catch (e: any) {
					crashlogger(new Error(`The SSL settings are misconfigured:\n${e.stack}`), `Socket process ${process.pid}`);
				}
			}
		}

		// Static server
		try {
			const roomidRegex = /^\/(?:[A-Za-z0-9][A-Za-z0-9-]*)\/?$/;
			const cssServer = new StaticServer('./config');
			const avatarServer = new StaticServer('./config/avatars');
			const staticServer = new StaticServer('./server/static');
			const staticRequestHandler = (req: http.IncomingMessage, res: http.ServerResponse) => {
				// console.log(`static rq: ${req.socket.remoteAddress}:${req.socket.remotePort} -> ${req.socket.localAddress}:${req.socket.localPort} - ${req.method} ${req.url} ${req.httpVersion} - ${req.rawHeaders.join('|')}`);
				req.resume();
				req.addListener('end', () => {
					if (config.customhttpresponse?.(req, res)) {
						return;
					}

					let server = staticServer;
					if (req.url) {
						if (req.url === '/custom.css' || req.url.startsWith('/custom.css?')) {
							server = cssServer;
						} else if (req.url.startsWith('/avatars/')) {
							// strip the `/avatars` prefix, keeping the leading slash
							req.url = req.url.slice(8);
							server = avatarServer;
						} else if (roomidRegex.test(req.url)) {
							// room URLs are served the index page
							req.url = '/';
						}
					}

					void server.serve(req, res, e => {
						if (e.status === 404) {
							void staticServer.serveFile('404.html', 404, {}, req, res);
							return true;
						}
					});
				});
			};

			this.server.on('request', staticRequestHandler);
			if (this.serverSsl) this.serverSsl.on('request', staticRequestHandler);
		} catch {
			console.log('Could not start static server');
		}

		// SockJS server

		// This is the main server that handles users connecting to our server
		// and doing things on our server.

		const sockjs: typeof import('sockjs') = (require as any)('sockjs');
		const options: import('sockjs').ServerOptions & { faye_server_options?: { [key: string]: any } } = {
			sockjs_url: `//play.pokemonshowdown.com/js/lib/sockjs-1.4.0-nwjsfix.min.js`,
			prefix: '/showdown',
			log(severity: string, message: string) {
				if (severity === 'error') console.log(`ERROR: ${message}`);
			},
		};

		// compression is only skipped when wsdeflate is explicitly null
		if (config.wsdeflate !== null) {
			try {
				const deflate = (require as any)('permessage-deflate').configure(config.wsdeflate);
				options.faye_server_options = { extensions: [deflate] };
			} catch {
				crashlogger(
					new Error("Dependency permessage-deflate is not installed or is otherwise unaccessable. No message compression will take place until server restart."),
					"Sockets"
				);
			}
		}

		const server = sockjs.createServer(options);

		process.once('disconnect', () => this.cleanup());
		process.once('exit', () => this.cleanup());

		// this is global so it can be hotpatched if necessary
		server.on('connection', connection => this.onConnection(connection));
		server.installHandlers(this.server, {});
		this.server.listen(config.port, config.bindaddress);
		console.log(`Worker ${PM.workerid} now listening on ${config.bindaddress}:${config.port}`);

		if (this.serverSsl) {
			server.installHandlers(this.serverSsl, {});
			// @ts-expect-error if appssl exists, then `config.ssl` must also exist
			this.serverSsl.listen(config.ssl.port, config.bindaddress);
			// @ts-expect-error if appssl exists, then `config.ssl` must also exist
			console.log(`Worker ${PM.workerid} now listening for SSL on port ${config.ssl.port}`);
		}

		console.log(`Test your server at http://${config.bindaddress === '0.0.0.0' ? 'localhost' : config.bindaddress}:${config.port}`);
	}

	/**
	 * Resolves one configured SSL value (`key` or `cert`) to PEM material.
	 *
	 * If the value resolves (relative to this directory) to an existing file,
	 * the file's contents are returned. If it does not, the value itself is
	 * returned so it can be used as inline PEM data, and a deprecation
	 * warning is printed. If the file exists but cannot be read, the failure
	 * is crash-logged and `undefined` is returned, so the file path is never
	 * handed to the HTTPS server as if it were PEM content.
	 *
	 * @param configured The value from `config.ssl.options`.
	 * @param label Human-readable name used in log messages.
	 */
	private static loadPem(configured: string, label: 'private key' | 'certificate'): string | Buffer | undefined {
		let pemPath: string;
		try {
			pemPath = path.resolve(__dirname, configured);
			if (!fs.statSync(pemPath).isFile()) throw new Error();
		} catch {
			console.warn(`SSL ${label} config values will not support HTTPS server option values in the future. Please set it to use the absolute path of its PEM file.`);
			return configured;
		}

		try {
			return fs.readFileSync(pemPath);
		} catch (e: any) {
			crashlogger(
				new Error(`Failed to read the configured SSL ${label} PEM file:\n${e.stack}`),
				`Socket process ${process.pid}`
			);
			return undefined;
		}
	}

	/**
	 * Removes every trace of a connection from `sockets`, `rooms` and
	 * `roomChannels`, dropping rooms that become empty. Safe to call more
	 * than once for the same id.
	 */
	private forgetSocket(socketid: string) {
		this.sockets.delete(socketid);
		for (const [curRoomid, curRoom] of this.rooms) {
			curRoom.delete(socketid);
			const roomChannel = this.roomChannels.get(curRoomid);
			if (roomChannel) roomChannel.delete(socketid);
			if (!curRoom.size) {
				this.rooms.delete(curRoomid);
				if (roomChannel) this.roomChannels.delete(curRoomid);
			}
		}
	}

	/**
	 * Clean up any remaining connections on disconnect. If this isn't done,
	 * the process will not exit until any remaining connections have been destroyed.
	 * Afterwards, the worker process will die on its own
	 */
	cleanup() {
		for (const socket of this.sockets.values()) {
			try {
				socket.destroy();
			} catch {}
		}
		this.sockets.clear();
		this.rooms.clear();
		this.roomChannels.clear();

		this.server.close();
		if (this.serverSsl) this.serverSsl.close();

		// Let the server(s) finish closing.
		setImmediate(() => process.exit(0));
	}

	/**
	 * Registers a new SockJS connection: assigns it a socket id, works out
	 * the client IP, announces it to the main process (`*` frame), and wires
	 * up forwarding of client messages (`<`) and of the disconnect (`!`).
	 */
	onConnection(socket: import('sockjs').Connection) {
		// For reasons that are not entirely clear, SockJS sometimes triggers
		// this event with a null `socket` argument.
		if (!socket) return;

		if (!socket.remoteAddress) {
			// SockJS sometimes fails to be able to cache the IP, port, and
			// address from connection request headers.
			try {
				socket.destroy();
			} catch {}
			return;
		}

		const socketid = `${++this.socketCounter}`;
		this.sockets.set(socketid, socket);

		let socketip = socket.remoteAddress;
		if (this.isTrustedProxyIp(socketip)) {
			// Walk the forwarding chain from the nearest hop outwards and take
			// the first address that is not one of our own proxies.
			const ips = (socket.headers['x-forwarded-for'] || '').split(',').reverse();
			for (const ip of ips) {
				const proxy = ip.trim();
				// A missing or empty header yields '' entries; skip them so the
				// client IP is never replaced by an empty string.
				if (proxy && !this.isTrustedProxyIp(proxy)) {
					socketip = proxy;
					break;
				}
			}
		}

		this.push(`*${socketid}\n${socketip}\n${socket.protocol}`);

		socket.on('data', message => {
			// drop empty messages (DDoS?)
			if (!message) return;
			// drop messages over 100KB
			if (message.length > (100 * 1024)) {
				socket.write(`|popup|Your message must be below 100KB`);
				console.log(`Dropping client message ${message.length / 1024} KB...`);
				console.log(message.slice(0, 160));
				return;
			}
			// drop legacy JSON messages
			if (typeof message !== 'string' || message.startsWith('{')) return;
			// drop blank messages (DDoS?)
			const pipeIndex = message.indexOf('|');
			if (pipeIndex < 0 || pipeIndex === message.length - 1) return;

			this.push(`<${socketid}\n${message}`);
		});

		socket.once('close', () => {
			this.push(`!${socketid}`);
			// same bookkeeping as an explicit `!` frame, so no channel entries
			// or empty rooms are left behind
			this.forgetSocket(socketid);
		});
	}

	/** Dispatches a frame from the main process to the matching receiver. */
	override _write(data: string) {
		// console.log('worker received: ' + data);

		const receiver = this.receivers[data.charAt(0)];
		if (receiver) receiver.call(this, data);
	}
}
|
|
|
|
/*********************************************************
|
|
* Process manager
|
|
*********************************************************/
|
|
|
|
/**
 * Process manager for the networking workers. Each child process it sets up
 * runs one `ServerStream` built from the global `Config`.
 */
export const PM = new ProcessManager.RawProcessManager({
	id: 'sockets',
	module,
	isCluster: true,
	setupChild() {
		return new ServerStream(Config);
	},
});
|
|
|
|
// Everything below only runs inside a networking worker process.
if (!PM.isParentProcess) {
	ConfigLoader.ensureLoaded();

	if (Config.crashguard) {
		// graceful crash - allow current battles to finish before restarting
		const describeProcess = () => `Socket process ${PM.workerid} (${process.pid})`;
		process.on('uncaughtException', error => {
			crashlogger(error, describeProcess());
		});
		process.on('unhandledRejection', reason => {
			crashlogger(reason as any || {}, `${describeProcess()} Promise`);
		});
	}

	if (Config.ofesockets) {
		// Fail early, with an actionable message, if the optional dependency is missing.
		try {
			require.resolve('node-oom-heapdump');
		} catch (e: any) {
			if (e.code === 'MODULE_NOT_FOUND') {
				throw new Error(
					'node-oom-heapdump is not installed, but it is a required dependency if Config.ofesockets is set to true! Run npm install node-oom-heapdump and restart the server.'
				);
			}
			// any other failure should never happen; surface it unchanged
			throw e;
		}

		// Create a heapdump if the process runs out of memory.
		const createHeapdumper = (require as any)('node-oom-heapdump');
		(global as any).nodeOomHeapdump = createHeapdumper({
			addTimestamp: true,
		});
	}

	// setup worker: apply the settings passed through the environment
	const { PSPORT, PSBINDADDR, PSNOSSL } = process.env;
	if (PSPORT) Config.port = Number(PSPORT);
	if (PSBINDADDR) Config.bindaddress = PSBINDADDR;
	if (PSNOSSL && parseInt(PSNOSSL)) Config.ssl = null;

	PM.startRepl({
		filename: `sockets-${PM.workerid}-${process.pid}`,
		// eslint-disable-next-line no-eval
		eval: cmd => eval(cmd),
	});
}
|
|
|
|
/**
 * Starts listening, using the first purely numeric command-line argument
 * (if there is one) as the port.
 */
function start(processCount: ConfigLoader.SubProcessesConfig) {
	const numericArg = process.argv.find(arg => /^[0-9]+$/.test(arg));
	const port = numericArg === undefined ? undefined : parseInt(numericArg);
	Sockets.listen(port, undefined, processCount);
}
|