Use server sent events for presence monitoring

This commit is contained in:
Samuel Elliott 2022-11-16 12:10:42 +00:00
parent 4ac5f7f0ea
commit a52c45af36
No known key found for this signature in database
GPG Key ID: 8420C7CDE43DC4D6
4 changed files with 197 additions and 3 deletions

27
package-lock.json generated
View File

@ -16,6 +16,7 @@
"dotenv": "^16.0.3",
"dotenv-expand": "^9.0.0",
"env-paths": "^3.0.0",
"eventsource": "^2.0.2",
"express": "^4.18.2",
"mkdirp": "^1.0.4",
"node-fetch": "^3.2.10",
@ -43,6 +44,7 @@
"@types/cli-table": "^0.3.1",
"@types/debug": "^4.1.7",
"@types/discord-rpc": "^4.0.3",
"@types/eventsource": "^1.1.10",
"@types/express": "^4.17.14",
"@types/mkdirp": "^1.0.2",
"@types/node": "^18.11.4",
@ -546,6 +548,12 @@
"integrity": "sha512-EYNwp3bU+98cpU4lAWYYL7Zz+2gryWH1qbdDTidVd6hkiR6weksdbMadyXKXNPEkQFhXM+hVO9ZygomHXp+AIw==",
"dev": true
},
"node_modules/@types/eventsource": {
"version": "1.1.10",
"resolved": "https://registry.npmjs.org/@types/eventsource/-/eventsource-1.1.10.tgz",
"integrity": "sha512-rYzRmJSnm44Xb7FICRXEjwe/26ZiiS+VMGmuD17PevMP56cGgLEsaM955sYQW0S+K7h+mPOL70vGf1hi4WDjVA==",
"dev": true
},
"node_modules/@types/express": {
"version": "4.17.14",
"resolved": "https://registry.npmjs.org/@types/express/-/express-4.17.14.tgz",
@ -2204,6 +2212,14 @@
"node": ">= 0.6"
}
},
"node_modules/eventsource": {
"version": "2.0.2",
"resolved": "https://registry.npmjs.org/eventsource/-/eventsource-2.0.2.tgz",
"integrity": "sha512-IzUmBGPR3+oUG9dUeXynyNmf91/3zUSJg1lCktzKw47OXuhco54U3r9B7O4XX+Rb1Itm9OZ2b0RkTs10bICOxA==",
"engines": {
"node": ">=12.0.0"
}
},
"node_modules/express": {
"version": "4.18.2",
"resolved": "https://registry.npmjs.org/express/-/express-4.18.2.tgz",
@ -5099,6 +5115,12 @@
"integrity": "sha512-EYNwp3bU+98cpU4lAWYYL7Zz+2gryWH1qbdDTidVd6hkiR6weksdbMadyXKXNPEkQFhXM+hVO9ZygomHXp+AIw==",
"dev": true
},
"@types/eventsource": {
"version": "1.1.10",
"resolved": "https://registry.npmjs.org/@types/eventsource/-/eventsource-1.1.10.tgz",
"integrity": "sha512-rYzRmJSnm44Xb7FICRXEjwe/26ZiiS+VMGmuD17PevMP56cGgLEsaM955sYQW0S+K7h+mPOL70vGf1hi4WDjVA==",
"dev": true
},
"@types/express": {
"version": "4.17.14",
"resolved": "https://registry.npmjs.org/@types/express/-/express-4.17.14.tgz",
@ -6438,6 +6460,11 @@
"resolved": "https://registry.npmjs.org/etag/-/etag-1.8.1.tgz",
"integrity": "sha512-aIL5Fx7mawVa300al2BnEE4iNvo1qETxLrPI/o05L7z6go7fCw1J6EQmbK4FmJ2AS7kgVF/KEZWufBfdClMcPg=="
},
"eventsource": {
"version": "2.0.2",
"resolved": "https://registry.npmjs.org/eventsource/-/eventsource-2.0.2.tgz",
"integrity": "sha512-IzUmBGPR3+oUG9dUeXynyNmf91/3zUSJg1lCktzKw47OXuhco54U3r9B7O4XX+Rb1Itm9OZ2b0RkTs10bICOxA=="
},
"express": {
"version": "4.18.2",
"resolved": "https://registry.npmjs.org/express/-/express-4.18.2.tgz",

View File

@ -42,6 +42,7 @@
"dotenv": "^16.0.3",
"dotenv-expand": "^9.0.0",
"env-paths": "^3.0.0",
"eventsource": "^2.0.2",
"express": "^4.18.2",
"mkdirp": "^1.0.4",
"node-fetch": "^3.2.10",
@ -66,6 +67,7 @@
"@types/cli-table": "^0.3.1",
"@types/debug": "^4.1.7",
"@types/discord-rpc": "^4.0.3",
"@types/eventsource": "^1.1.10",
"@types/express": "^4.17.14",
"@types/mkdirp": "^1.0.2",
"@types/node": "^18.11.4",

View File

@ -1,15 +1,19 @@
import createDebug from 'debug';
import EventSource from 'eventsource';
import { DiscordRpcClient, findDiscordRpcClient } from '../discord/rpc.js';
import { getDiscordPresence, getInactiveDiscordPresence } from '../discord/util.js';
import { DiscordPresencePlayTime, DiscordPresenceContext, DiscordPresence, ExternalMonitorConstructor, ExternalMonitor, ErrorResult } from '../discord/types.js';
import { EmbeddedSplatNet2Monitor, handleError, ZncNotifications } from './notify.js';
import { getPresenceFromUrl } from '../api/znc-proxy.js';
import { ActiveEvent, CurrentUser, Friend, Game, Presence, PresenceState, CoralErrorResponse } from '../api/coral-types.js';
import { ErrorResponse } from '../api/util.js';
import { ErrorResponse, ResponseSymbol } from '../api/util.js';
import Loop, { LoopResult } from '../util/loop.js';
import { getTitleIdFromEcUrl } from '../index.js';
import { parseLinkHeader } from '../util/http.js';
import { getUserAgent } from '../util/useragent.js';
const debug = createDebug('nxapi:nso:presence');
const debugEventStream = createDebug('nxapi:nso:presence:sse');
const debugDiscord = createDebug('nxapi:nso:presence:discordrpc');
const debugSplatnet2 = createDebug('nxapi:nso:presence:splatnet2');
@ -246,7 +250,7 @@ class ZncDiscordPresenceClient {
let i = ++this.i;
while (attempts < MAX_CONNECT_ATTEMPTS) {
if (this.i !== i) return;
if (this.i !== i || client!) return;
if (attempts === 0) debugDiscord('RPC connecting', client_id, i);
else debugDiscord('RPC connecting, attempt %d', attempts + 1, client_id, i);
@ -518,6 +522,10 @@ export class ZncProxyDiscordPresence extends Loop {
readonly discord = new ZncDiscordPresenceClient(this);
is_first_request = true;
is_sse = false;
eventstream_url: string | null = null;
last_data: unknown | null = null;
constructor(
@ -537,13 +545,36 @@ export class ZncProxyDiscordPresence extends Loop {
protected proxy_temporary_errors = 0;
async update() {
if (this.is_sse) {
return await this.useEventStream();
}
try {
const [presence, user, data] = await getPresenceFromUrl(this.presence_url);
const result = await getPresenceFromUrl(this.presence_url);
const [presence, user, data] = result;
this.last_data = data;
this.proxy_temporary_errors = 0;
await this.discord.updatePresenceForDiscord(presence, user);
await this.updatePresenceForSplatNet2Monitor(presence, this.presence_url);
if (this.is_first_request) {
const link_header = result[ResponseSymbol].headers.get('Link');
const links = link_header ? parseLinkHeader(link_header) : [];
debug('presence links', links);
const eventstream_link = links.find(l => l.rel.includes('alternate') && l.type === 'text/event-stream');
if (eventstream_link) {
this.eventstream_url = new URL(eventstream_link.uri, this.presence_url).href;
this.is_sse = true;
debug('Presence URL included server-sent events link, switching now', this.eventstream_url);
}
this.is_first_request = false;
// Connect to the event stream immediately
if (eventstream_link) return LoopResult.OK_SKIP_INTERVAL;
}
} catch (err) {
if (err instanceof ErrorResponse) {
const retry_after = err.response.headers.get('Retry-After');
@ -567,6 +598,93 @@ export class ZncProxyDiscordPresence extends Loop {
}
}
async useEventStream() {
const events = new EventSource(this.eventstream_url ?? this.presence_url, {
headers: {
'User-Agent': getUserAgent(),
},
});
// Fix emitting "message" event for all messages
// @ts-ignore
const _emit = events.emit;
// @ts-ignore
events.emit = (type: string, event: MessageEvent, ...args: any[]) => {
if (event.data && type !== 'message') {
_emit.call(events, 'message', event, ...args);
}
return _emit.call(events, type, event, ...args);
};
// @ts-ignore
const _listeners = events.listeners;
// @ts-ignore
events.listeners = (type: string, ...args: any[]) => {
let a = null;
if (type !== 'message' && type !== 'open' && type !== 'error') {
a = _listeners.call(events, 'message', ...args);
}
return a ? [...a, ..._listeners.call(events, type, ...args)] : _listeners.call(events, type, ...args);
};
events.onopen = event => {
debugEventStream('EventSource connected', event);
};
let user: CurrentUser | Friend | undefined = undefined;
let presence: Presence | null = null;
events.onmessage = event => {
if (event.type === 'message') {
debugEventStream('Received debug message', event.data);
} else if (event.type === 'update') {
debugEventStream('Received presence updated message', event.data);
} else {
const data = JSON.parse(event.data);
debugEventStream('Received updated %s data', event.type, data);
Object.assign(this.last_data!, {[event.type]: data});
if (event.type === 'user' || event.type === 'friend') {
user = data;
presence = data.presence;
}
if (event.type === 'presence') {
presence = data;
}
if (presence) {
this.discord.updatePresenceForDiscord(presence, user);
this.updatePresenceForSplatNet2Monitor(presence, this.presence_url);
}
}
};
return new Promise<void>((rs, rj) => {
this.timeout_resolve = () => {
debugEventStream('Update interval cancelled, closing event stream');
events.close();
rs();
};
events.onerror = event => {
debugEventStream('EventSource error', event);
events.close();
if ((event as any).message) {
const err = new Error((event as any).message);
Object.assign(err, event);
rj(err);
} else {
// No error message
rs();
}
};
});
}
async onStop() {
await this.discord.setActivity(null);
}

47
src/util/http.ts Normal file
View File

@ -0,0 +1,47 @@
//
// Parse HTTP Link headers
//
// Based on https://github.com/thlorenz/parse-link-header
//
function parseLink(link: string) {
const match = link.match(/<?([^>]*)>((;.*)*)/);
if (!match) return null;
const uri = match[1];
const parameters_str = match[2].split(';');
// Reuse URLSearchParams for link parameters
const parameters = new URLSearchParams();
for (const parameter of parameters_str) {
// rel="next" => 1: rel 2: next
const match = parameter.match(/\s*(.+)\s*=\s*("([^"]*)"|[^\b]+)?/);
if (!match) continue;
const key = match[1];
const value = match[3] ?? match[2];
parameters.append(key, value);
}
const rel = (parameters.get('rel') ?? '').split(' ').filter(r => r);
return {
uri,
parameters,
rel,
type: parameters.get('type'),
};
}
export function parseLinkHeader(link_header: string) {
const links = [];
for (const link_str of link_header.split(/,\s*</)) {
const link = parseLink(link_str);
if (link) links.push(link);
}
return links;
}