Add custom EventSource using fetch, automatically upgrade to SSE

This commit is contained in:
Samuel Elliott 2023-07-14 19:00:56 +01:00
parent eda75b4dee
commit fbc1bc1682
No known key found for this signature in database
GPG Key ID: 8420C7CDE43DC4D6
10 changed files with 307 additions and 60 deletions

14
package-lock.json generated
View File

@ -16,7 +16,6 @@
"dotenv": "^16.3.1",
"dotenv-expand": "^10.0.0",
"env-paths": "^3.0.0",
"eventsource": "^2.0.2",
"express": "^4.18.2",
"node-notifier": "^10.0.1",
"node-persist": "^3.1.3",
@ -2792,14 +2791,6 @@
"node": ">= 0.6"
}
},
"node_modules/eventsource": {
"version": "2.0.2",
"resolved": "https://registry.npmjs.org/eventsource/-/eventsource-2.0.2.tgz",
"integrity": "sha512-IzUmBGPR3+oUG9dUeXynyNmf91/3zUSJg1lCktzKw47OXuhco54U3r9B7O4XX+Rb1Itm9OZ2b0RkTs10bICOxA==",
"engines": {
"node": ">=12.0.0"
}
},
"node_modules/exponential-backoff": {
"version": "3.1.1",
"resolved": "https://registry.npmjs.org/exponential-backoff/-/exponential-backoff-3.1.1.tgz",
@ -8064,11 +8055,6 @@
"resolved": "https://registry.npmjs.org/etag/-/etag-1.8.1.tgz",
"integrity": "sha512-aIL5Fx7mawVa300al2BnEE4iNvo1qETxLrPI/o05L7z6go7fCw1J6EQmbK4FmJ2AS7kgVF/KEZWufBfdClMcPg=="
},
"eventsource": {
"version": "2.0.2",
"resolved": "https://registry.npmjs.org/eventsource/-/eventsource-2.0.2.tgz",
"integrity": "sha512-IzUmBGPR3+oUG9dUeXynyNmf91/3zUSJg1lCktzKw47OXuhco54U3r9B7O4XX+Rb1Itm9OZ2b0RkTs10bICOxA=="
},
"exponential-backoff": {
"version": "3.1.1",
"resolved": "https://registry.npmjs.org/exponential-backoff/-/exponential-backoff-3.1.1.tgz",

View File

@ -43,7 +43,6 @@
"dotenv": "^16.3.1",
"dotenv-expand": "^10.0.0",
"env-paths": "^3.0.0",
"eventsource": "^2.0.2",
"express": "^4.18.2",
"node-notifier": "^10.0.1",
"node-persist": "^3.1.3",

View File

@ -695,7 +695,7 @@ class Server extends HttpServer {
}
if (match.splatoon3_fest_team) break;
for (const player of team.preVotes.nodes) {
if (player.userIcon.url !== friend.userIcon.url) continue;
@ -705,7 +705,7 @@ class Server extends HttpServer {
};
break;
}
if (match.splatoon3_fest_team) break;
}

View File

@ -158,7 +158,7 @@ export async function handler(argv: ArgumentsCamelCase<Arguments>) {
try {
await new Promise(rs => sleep_timeout = setTimeout(sleep_resolve = rs, argv.updateInterval * 1000));
while (!should_exit) {
updating = true;
[vs, coop, album] = await update(argv, splatnet, directory, vs, coop, album);

View File

@ -181,7 +181,7 @@ export default class Coral {
oidc.getToken(),
oidc.getUser(),
]);
const {nso, data} = await CoralApi.createWithNintendoAccountToken(token, user);
const auth_data: SavedToken = {
@ -273,7 +273,7 @@ async function renewToken(
renew_token_data: {auth_data: SavedToken}, ratelimit = true
) {
// if (ratelimit) {
// const [jwt, sig] = Jwt.decode<NintendoAccountSessionTokenJwtPayload>(token);
// const [jwt, sig] = Jwt.decode<NintendoAccountSessionTokenJwtPayload>(token);
// await checkUseLimit(storage, 'coral', jwt.payload.sub, ratelimit);
// }

View File

@ -138,7 +138,7 @@ async function renewToken(
) {
let attempt;
if (ratelimit) {
const [jwt, sig] = Jwt.decode<NintendoAccountSessionTokenJwtPayload>(na_session_token);
const [jwt, sig] = Jwt.decode<NintendoAccountSessionTokenJwtPayload>(na_session_token);
attempt = await checkUseLimit(storage, 'coral', jwt.payload.sub, ratelimit);
}

View File

@ -72,7 +72,7 @@ export async function getPctlToken(storage: persist.LocalStorage, token: string,
debug('Using existing token');
await storage.setItem('NintendoAccountToken-pctl.' + existingToken.user.id, token);
const moon = MoonApi.createWithSavedToken(existingToken);
moon.onTokenExpired = createTokenExpiredHandler(storage, token, moon, {existingToken});

View File

@ -1,4 +1,5 @@
import EventSource from 'eventsource';
import { errors } from 'undici';
import EventSource, { ErrorEvent } from '../util/eventsource.js';
import { DiscordRpcClient, findDiscordRpcClient } from '../discord/rpc.js';
import { getDiscordPresence, getInactiveDiscordPresence } from '../discord/util.js';
import { DiscordPresencePlayTime, DiscordPresenceContext, DiscordPresence, ExternalMonitorConstructor, ExternalMonitor, ErrorResult } from '../discord/types.js';
@ -30,6 +31,7 @@ interface SavedPresence {
class ZncDiscordPresenceClient {
rpc: {client: DiscordRpcClient, id: string} | null = null;
connecting: string | null = null;
title: {id: string; since: number} | null = null;
monitors = new Map<ExternalMonitorConstructor<any>, ExternalMonitor>();
protected i = 0;
@ -245,7 +247,10 @@ class ZncDiscordPresenceClient {
}
if (!this.rpc) {
this.connect(client_id, this.m.discord_client_filter);
if (this.connecting !== client_id) {
this.connect(client_id, this.m.discord_client_filter).finally(() => this.connecting = null);
this.connecting = client_id;
}
} else {
if (typeof activity === 'string') this.rpc.client.clearActivity();
else this.rpc.client.setActivity(activity.activity);
@ -265,7 +270,7 @@ class ZncDiscordPresenceClient {
let i = ++this.i;
while (attempts < MAX_CONNECT_ATTEMPTS) {
if (this.i !== i || client!) return;
if (this.i !== i || client! || this.rpc) return;
if (attempts === 0) debugDiscord('RPC connecting', client_id, i);
else debugDiscord('RPC connecting, attempt %d', attempts + 1, client_id, i);
@ -524,7 +529,7 @@ export class ZncProxyDiscordPresence extends Loop {
readonly user_notifications = false;
readonly friend_notifications = false;
update_interval = 30;
upgrade_to_sse = process.env.NXAPI_PRESENCE_SSE === '1';
upgrade_to_sse = process.env.NXAPI_PRESENCE_SSE !== '0';
presence_user: null = null;
discord_preconnect = false;
@ -626,9 +631,7 @@ export class ZncProxyDiscordPresence extends Loop {
if (this.events) this.events.close();
const events = new EventSource(this.eventstream_url ?? this.presence_url, {
headers: {
'User-Agent': getUserAgent(),
},
useragent: getUserAgent(),
});
this.events = events;
@ -636,11 +639,10 @@ export class ZncProxyDiscordPresence extends Loop {
let timeout: NodeJS.Timeout;
let timeout_interval = 90000;
const ontimeout = () => {
const err = new Error('Timeout') as any;
err.type = 'error';
err[TemporaryErrorSymbol] = true;
Object.defineProperty(err, 'detail', {enumerable: false, value: err});
events.dispatchEvent(err);
const event = new ErrorEvent(new errors.RequestAbortedError('Timeout'));
// @ts-expect-error
event[TemporaryErrorSymbol] = true;
events.dispatchEvent(event);
};
events.onopen = event => {
@ -650,11 +652,10 @@ export class ZncProxyDiscordPresence extends Loop {
let user: CurrentUser | Friend | undefined = undefined;
let presence: Presence | null = null;
let supported_events: readonly string[] = ['friend'];
this.last_data = {};
const onmessage = (event: MessageEvent) => {
events.onAnyMessage = (event: MessageEvent) => {
clearTimeout(timeout);
timeout = setTimeout(ontimeout, timeout_interval);
@ -672,15 +673,6 @@ export class ZncProxyDiscordPresence extends Loop {
e !== 'message' && e !== 'update' &&
e !== 'supported_events');
debugEventStream('Received supported events message', new_supported_events);
for (const type of supported_events) {
events.removeEventListener(type, onmessage);
}
for (const type of new_supported_events) {
events.addEventListener(type, onmessage);
}
supported_events = new_supported_events;
return;
}
@ -703,11 +695,6 @@ export class ZncProxyDiscordPresence extends Loop {
}
};
events.onmessage = onmessage;
events.addEventListener('supported_events', onmessage);
events.addEventListener('update', onmessage);
events.addEventListener('friend', onmessage);
return new Promise<void>((rs, rj) => {
this.timeout_resolve = () => {
debugEventStream('Update interval cancelled, closing event stream');
@ -715,19 +702,14 @@ export class ZncProxyDiscordPresence extends Loop {
rs();
};
events.onerror = event => {
events.onerror = (event: ErrorEvent | MessageEvent) => {
debugEventStream('EventSource error', event);
events.close();
if (event instanceof Error) {
rj(event);
} else if ((event as any).message) {
const err = new Error((event as any).message);
Object.assign(err, event);
rj(err);
if (event instanceof MessageEvent) {
rj(new ErrorResponse('Received error in event stream', events.response!, event.data));
} else {
// No error message
rs();
rj(event.error);
}
};
});

View File

@ -36,9 +36,10 @@ export async function findDiscordRpcClient(
if (!socket) continue;
const client = new DiscordRpcClient({transport: 'ipc', ipc_socket: socket});
await client.connect(clientid);
try {
await client.connect(clientid);
if (filter.call(null, client, i)) return [i, client] as const;
await client.destroy();

279
src/util/eventsource.ts Normal file
View File

@ -0,0 +1,279 @@
import { Buffer } from 'node:buffer';
import { fetch, Headers, Response } from 'undici';
import createDebug from './debug.js';
const debug = createDebug('nxapi:util:eventsource');
export class ErrorEvent extends Event {
constructor(
readonly error: Error,
readonly message = error.message,
) {
super('error');
}
}
export enum EventSourceReadyState {
CONNECTING = 0,
OPEN = 1,
CLOSED = 2,
}
export interface EventSourceInit extends globalThis.EventSourceInit {
authorisation?: string | (() => string);
useragent?: string;
}
type Listener<T extends string> =
T extends 'error' ? [type: T, handler: (error: ErrorEvent) => void] :
T extends 'open' ? [type: T, handler: (event: Event) => void] :
[type: T, handler: (event: MessageEvent<string>) => void];
export default class EventSource {
protected _response: Response | null = null;
protected _controller: AbortController | null = null;
protected _closed = false;
protected _id: string | null = null;
protected _retry_after: number | null = null;
protected readonly _authorisation: string | (() => string) | null = null;
protected readonly _useragent: string | null = null;
readonly withCredentials = false;
onerror?: (error: ErrorEvent) => void;
onmessage?: (message: MessageEvent<string>) => void;
onopen?: (event: Event) => void;
onAnyMessage?: (message: MessageEvent<string>) => void;
protected readonly _listeners: Listener<string>[] = [];
constructor(readonly url: string, init?: EventSourceInit) {
if (init?.withCredentials) debug('init.withCredentials is not supported');
if (init?.authorisation) this._authorisation = init.authorisation;
if (init?.useragent) this._useragent = init.useragent;
this._connect();
}
get readyState(): EventSourceReadyState {
if (this._closed) return EventSourceReadyState.CLOSED;
if (this._response) return EventSourceReadyState.OPEN;
return EventSourceReadyState.CONNECTING;
}
get response(): Response | null {
return this._response;
}
addEventListener<T extends string>(event: T, handler: Listener<T>[1]) {
this._listeners.push([event, handler]);
}
removeEventListener<T extends string>(event: T, handler?: Listener<T>[1]) {
let index;
while ((index = this._listeners.findIndex(listener =>
listener[0] === event && (!handler || listener[1] === handler)
)) >= 0) {
this._listeners.splice(index, 1);
}
}
protected _fetch(signal: AbortSignal) {
const headers = new Headers({
'Accept': 'text/event-stream',
});
const authorisation = typeof this._authorisation === 'function' ?
this._authorisation.call(null) : this._authorisation;
if (authorisation) headers.append('Authorization', authorisation);
if (this._useragent) headers.append('User-Agent', this._useragent);
if (typeof this._id === 'string') {
headers.append('Last-Event-Id', this._id);
}
return fetch(this.url, {
headers,
signal,
keepalive: true,
});
}
protected _connect() {
const controller = new AbortController();
this._fetch(controller.signal).then(async response => {
if (this._closed || this._response) {
controller.abort();
return;
}
this._response = response;
this._controller = controller;
if (!response.ok) {
debug('Non-200 response code', await response.text());
controller.abort();
return this._handleConnectionClosed();
}
if (!response.headers.get('Content-Type')?.match(/^text\/event-stream($|;)/)) {
debug('Response type is not text/event-stream', await response.text());
controller.abort();
return this._handleConnectionClosed();
}
if (!response.body) {
debug('Response does not include a body');
controller.abort();
return this._handleConnectionClosed();
}
const url = new URL(this.url);
url.search = '';
url.hash = '';
debug('Connected to %s', url);
const event = new Event('open');
this.dispatchEvent(event);
const reader = response.body.getReader();
return this._handleResponseStream(reader);
}).then(() => {
this._handleConnectionClosed();
}, err => {
this._handleConnectionClosed(err);
});
}
protected async _handleResponseStream(reader: ReadableStreamDefaultReader<Uint8Array>) {
let buffer = Buffer.alloc(0);
const n = '\n'.charCodeAt(0);
let value: Uint8Array | undefined;
let done = false;
while (!done) {
({value, done} = await reader.read());
if (!value) continue;
let index;
while ((index = value.findIndex(v => v === n)) >= 0) {
const line = Buffer.concat([buffer, value.slice(0, index)]);
if (buffer.length) buffer = Buffer.alloc(0);
value = value.slice(index + 1);
this._handleLine(new Uint8Array(line));
}
// Move any remaining data
buffer = Buffer.concat([buffer, value]);
}
}
_message_event: string | null = null;
_message_data: Uint8Array | null = null;
_message_id: string | null = null;
protected _handleLine(line: Uint8Array) {
if (line.length === 0) {
const event = new MessageEvent(this._message_event ?? 'message', {
data: this._message_data ? new TextDecoder().decode(this._message_data) : '',
lastEventId: this._message_id ?? undefined,
// source: this as unknown as MessageEventSource,
});
this._message_data = null;
this.onAnyMessage?.call(this, event);
if (typeof this._message_event === 'string') {
for (const [type, handler] of this._listeners) {
if (type !== this._message_event) continue;
handler.call(this, event);
}
this._message_event = null;
} else {
this.onmessage?.call(null, event);
for (const [type, handler] of this._listeners) {
if (type !== 'message') continue;
handler.call(this, event);
}
}
if (typeof this._message_id === 'string') {
this._id = this._message_id;
this._message_id = null;
}
return;
}
const index = line.indexOf(':'.charCodeAt(0));
if (index < 0) return;
const tag = new TextDecoder().decode(line.slice(0, index));
const data = line.slice(index + (line[index + 1] === ' '.charCodeAt(0) ? 2 : 1));
if (tag === 'event') {
this._message_event = new TextDecoder().decode(data);
} else if (tag === 'data') {
this._message_data = this._message_data ?
new Uint8Array(Buffer.concat([this._message_data, Buffer.from('\n'), data])) :
data;
} else if (tag === 'id') {
this._message_id = new TextDecoder().decode(data);
} else if (tag === 'retry') {
const retry = parseInt(new TextDecoder().decode(data));
if (!isNaN(retry)) this._retry_after = retry;
} else if (tag) {
debug('Unknown message type "%s"', tag);
}
}
protected async _handleConnectionClosed(error?: Error) {
this._response = null;
if (this._closed) {
return;
}
if (error) {
const event = new ErrorEvent(error);
this.dispatchEvent(event);
}
const wait = Math.max(1000, this._retry_after ?? 0);
await new Promise(rs => setTimeout(rs, wait));
this._connect();
}
close() {
this._closed = true;
this._controller?.abort();
}
dispatchEvent(event: Event) {
// @ts-expect-error
this['on' + event.type]?.call(this, event);
for (const [type, handler] of this._listeners as Listener<string>[]) {
if (type !== event.type) continue;
// @ts-expect-error
handler.call(this, event);
}
}
}