diff --git a/package-lock.json b/package-lock.json index 99258c9..8b33a78 100644 --- a/package-lock.json +++ b/package-lock.json @@ -16,7 +16,6 @@ "dotenv": "^16.3.1", "dotenv-expand": "^10.0.0", "env-paths": "^3.0.0", - "eventsource": "^2.0.2", "express": "^4.18.2", "node-notifier": "^10.0.1", "node-persist": "^3.1.3", @@ -2792,14 +2791,6 @@ "node": ">= 0.6" } }, - "node_modules/eventsource": { - "version": "2.0.2", - "resolved": "https://registry.npmjs.org/eventsource/-/eventsource-2.0.2.tgz", - "integrity": "sha512-IzUmBGPR3+oUG9dUeXynyNmf91/3zUSJg1lCktzKw47OXuhco54U3r9B7O4XX+Rb1Itm9OZ2b0RkTs10bICOxA==", - "engines": { - "node": ">=12.0.0" - } - }, "node_modules/exponential-backoff": { "version": "3.1.1", "resolved": "https://registry.npmjs.org/exponential-backoff/-/exponential-backoff-3.1.1.tgz", @@ -8064,11 +8055,6 @@ "resolved": "https://registry.npmjs.org/etag/-/etag-1.8.1.tgz", "integrity": "sha512-aIL5Fx7mawVa300al2BnEE4iNvo1qETxLrPI/o05L7z6go7fCw1J6EQmbK4FmJ2AS7kgVF/KEZWufBfdClMcPg==" }, - "eventsource": { - "version": "2.0.2", - "resolved": "https://registry.npmjs.org/eventsource/-/eventsource-2.0.2.tgz", - "integrity": "sha512-IzUmBGPR3+oUG9dUeXynyNmf91/3zUSJg1lCktzKw47OXuhco54U3r9B7O4XX+Rb1Itm9OZ2b0RkTs10bICOxA==" - }, "exponential-backoff": { "version": "3.1.1", "resolved": "https://registry.npmjs.org/exponential-backoff/-/exponential-backoff-3.1.1.tgz", diff --git a/package.json b/package.json index 55429ba..80973f2 100644 --- a/package.json +++ b/package.json @@ -43,7 +43,6 @@ "dotenv": "^16.3.1", "dotenv-expand": "^10.0.0", "env-paths": "^3.0.0", - "eventsource": "^2.0.2", "express": "^4.18.2", "node-notifier": "^10.0.1", "node-persist": "^3.1.3", diff --git a/src/cli/presence-server.ts b/src/cli/presence-server.ts index dcb46a9..c4dffb1 100644 --- a/src/cli/presence-server.ts +++ b/src/cli/presence-server.ts @@ -695,7 +695,7 @@ class Server extends HttpServer { } if (match.splatoon3_fest_team) break; - + for (const player of team.preVotes.nodes) { if (player.userIcon.url !== friend.userIcon.url) continue; @@ -705,7 +705,7 @@ class Server extends HttpServer { }; break; } - + if (match.splatoon3_fest_team) break; } diff --git a/src/cli/splatnet3/monitor.ts b/src/cli/splatnet3/monitor.ts index 3ba14d6..0c66e20 100644 --- a/src/cli/splatnet3/monitor.ts +++ b/src/cli/splatnet3/monitor.ts @@ -158,7 +158,7 @@ export async function handler(argv: ArgumentsCamelCase) { try { await new Promise(rs => sleep_timeout = setTimeout(sleep_resolve = rs, argv.updateInterval * 1000)); - + while (!should_exit) { updating = true; [vs, coop, album] = await update(argv, splatnet, directory, vs, coop, album); diff --git a/src/client/coral.ts b/src/client/coral.ts index fd3159b..da2fd9d 100644 --- a/src/client/coral.ts +++ b/src/client/coral.ts @@ -181,7 +181,7 @@ export default class Coral { oidc.getToken(), oidc.getUser(), ]); - + const {nso, data} = await CoralApi.createWithNintendoAccountToken(token, user); const auth_data: SavedToken = { @@ -273,7 +273,7 @@ async function renewToken( renew_token_data: {auth_data: SavedToken}, ratelimit = true ) { // if (ratelimit) { - // const [jwt, sig] = Jwt.decode(token); + // const [jwt, sig] = Jwt.decode(token); // await checkUseLimit(storage, 'coral', jwt.payload.sub, ratelimit); // } diff --git a/src/common/auth/coral.ts b/src/common/auth/coral.ts index 0ca5620..48f0cd6 100644 --- a/src/common/auth/coral.ts +++ b/src/common/auth/coral.ts @@ -138,7 +138,7 @@ async function renewToken( ) { let attempt; if (ratelimit) { - const [jwt, sig] = Jwt.decode(na_session_token); + const [jwt, sig] = Jwt.decode(na_session_token); attempt = await checkUseLimit(storage, 'coral', jwt.payload.sub, ratelimit); } diff --git a/src/common/auth/moon.ts b/src/common/auth/moon.ts index f52babb..8cb7347 100644 --- a/src/common/auth/moon.ts +++ b/src/common/auth/moon.ts @@ -72,7 +72,7 @@ export async function getPctlToken(storage: persist.LocalStorage, token: string, debug('Using existing token'); await storage.setItem('NintendoAccountToken-pctl.' + existingToken.user.id, token); - + const moon = MoonApi.createWithSavedToken(existingToken); moon.onTokenExpired = createTokenExpiredHandler(storage, token, moon, {existingToken}); diff --git a/src/common/presence.ts b/src/common/presence.ts index d57fe62..50a60a5 100644 --- a/src/common/presence.ts +++ b/src/common/presence.ts @@ -1,4 +1,5 @@ -import EventSource from 'eventsource'; +import { errors } from 'undici'; +import EventSource, { ErrorEvent } from '../util/eventsource.js'; import { DiscordRpcClient, findDiscordRpcClient } from '../discord/rpc.js'; import { getDiscordPresence, getInactiveDiscordPresence } from '../discord/util.js'; import { DiscordPresencePlayTime, DiscordPresenceContext, DiscordPresence, ExternalMonitorConstructor, ExternalMonitor, ErrorResult } from '../discord/types.js'; @@ -30,6 +31,7 @@ interface SavedPresence { class ZncDiscordPresenceClient { rpc: {client: DiscordRpcClient, id: string} | null = null; + connecting: string | null = null; title: {id: string; since: number} | null = null; monitors = new Map, ExternalMonitor>(); protected i = 0; @@ -245,7 +247,10 @@ class ZncDiscordPresenceClient { } if (!this.rpc) { - this.connect(client_id, this.m.discord_client_filter); + if (this.connecting !== client_id) { + this.connect(client_id, this.m.discord_client_filter).finally(() => this.connecting = null); + this.connecting = client_id; + } } else { if (typeof activity === 'string') this.rpc.client.clearActivity(); else this.rpc.client.setActivity(activity.activity); @@ -265,7 +270,7 @@ class ZncDiscordPresenceClient { let i = ++this.i; while (attempts < MAX_CONNECT_ATTEMPTS) { - if (this.i !== i || client!) return; + if (this.i !== i || client! || this.rpc) return; if (attempts === 0) debugDiscord('RPC connecting', client_id, i); else debugDiscord('RPC connecting, attempt %d', attempts + 1, client_id, i); @@ -524,7 +529,7 @@ export class ZncProxyDiscordPresence extends Loop { readonly user_notifications = false; readonly friend_notifications = false; update_interval = 30; - upgrade_to_sse = process.env.NXAPI_PRESENCE_SSE === '1'; + upgrade_to_sse = process.env.NXAPI_PRESENCE_SSE !== '0'; presence_user: null = null; discord_preconnect = false; @@ -626,9 +631,7 @@ export class ZncProxyDiscordPresence extends Loop { if (this.events) this.events.close(); const events = new EventSource(this.eventstream_url ?? this.presence_url, { - headers: { - 'User-Agent': getUserAgent(), - }, + useragent: getUserAgent(), }); this.events = events; @@ -636,11 +639,10 @@ export class ZncProxyDiscordPresence extends Loop { let timeout: NodeJS.Timeout; let timeout_interval = 90000; const ontimeout = () => { - const err = new Error('Timeout') as any; - err.type = 'error'; - err[TemporaryErrorSymbol] = true; - Object.defineProperty(err, 'detail', {enumerable: false, value: err}); - events.dispatchEvent(err); + const event = new ErrorEvent(new errors.RequestAbortedError('Timeout')); + // @ts-expect-error + event[TemporaryErrorSymbol] = true; + events.dispatchEvent(event); }; events.onopen = event => { @@ -650,11 +652,10 @@ export class ZncProxyDiscordPresence extends Loop { let user: CurrentUser | Friend | undefined = undefined; let presence: Presence | null = null; - let supported_events: readonly string[] = ['friend']; this.last_data = {}; - const onmessage = (event: MessageEvent) => { + events.onAnyMessage = (event: MessageEvent) => { clearTimeout(timeout); timeout = setTimeout(ontimeout, timeout_interval); @@ -672,15 +673,6 @@ export class ZncProxyDiscordPresence extends Loop { e !== 'message' && e !== 'update' && e !== 'supported_events'); debugEventStream('Received supported events message', new_supported_events); - - for (const type of supported_events) { - events.removeEventListener(type, onmessage); - } - for (const type of new_supported_events) { - events.addEventListener(type, onmessage); - } - supported_events = new_supported_events; - return; } @@ -703,11 +695,6 @@ export class ZncProxyDiscordPresence extends Loop { } }; - events.onmessage = onmessage; - events.addEventListener('supported_events', onmessage); - events.addEventListener('update', onmessage); - events.addEventListener('friend', onmessage); - return new Promise((rs, rj) => { this.timeout_resolve = () => { debugEventStream('Update interval cancelled, closing event stream'); @@ -715,19 +702,14 @@ export class ZncProxyDiscordPresence extends Loop { rs(); }; - events.onerror = event => { + events.onerror = (event: ErrorEvent | MessageEvent) => { debugEventStream('EventSource error', event); events.close(); - if (event instanceof Error) { - rj(event); - } else if ((event as any).message) { - const err = new Error((event as any).message); - Object.assign(err, event); - rj(err); + if (event instanceof MessageEvent) { + rj(new ErrorResponse('Received error in event stream', events.response!, event.data)); } else { - // No error message - rs(); + rj(event.error); } }; }); diff --git a/src/discord/rpc.ts b/src/discord/rpc.ts index 0b1b16b..fef52cc 100644 --- a/src/discord/rpc.ts +++ b/src/discord/rpc.ts @@ -36,9 +36,10 @@ export async function findDiscordRpcClient( if (!socket) continue; const client = new DiscordRpcClient({transport: 'ipc', ipc_socket: socket}); - await client.connect(clientid); try { + await client.connect(clientid); + if (filter.call(null, client, i)) return [i, client] as const; await client.destroy(); diff --git a/src/util/eventsource.ts b/src/util/eventsource.ts new file mode 100644 index 0000000..da7cf74 --- /dev/null +++ b/src/util/eventsource.ts @@ -0,0 +1,279 @@ +import { Buffer } from 'node:buffer'; +import { fetch, Headers, Response } from 'undici'; +import createDebug from './debug.js'; + +const debug = createDebug('nxapi:util:eventsource'); + +export class ErrorEvent extends Event { + constructor( + readonly error: Error, + readonly message = error.message, + ) { + super('error'); + } +} + +export enum EventSourceReadyState { + CONNECTING = 0, + OPEN = 1, + CLOSED = 2, +} + +export interface EventSourceInit extends globalThis.EventSourceInit { + authorisation?: string | (() => string); + useragent?: string; +} + +type Listener = + T extends 'error' ? [type: T, handler: (error: ErrorEvent) => void] : + T extends 'open' ? [type: T, handler: (event: Event) => void] : + [type: T, handler: (event: MessageEvent) => void]; + +export default class EventSource { + protected _response: Response | null = null; + protected _controller: AbortController | null = null; + protected _closed = false; + + protected _id: string | null = null; + protected _retry_after: number | null = null; + + protected readonly _authorisation: string | (() => string) | null = null; + protected readonly _useragent: string | null = null; + + readonly withCredentials = false; + + onerror?: (error: ErrorEvent) => void; + onmessage?: (message: MessageEvent) => void; + onopen?: (event: Event) => void; + + onAnyMessage?: (message: MessageEvent) => void; + + protected readonly _listeners: Listener[] = []; + + constructor(readonly url: string, init?: EventSourceInit) { + if (init?.withCredentials) debug('init.withCredentials is not supported'); + if (init?.authorisation) this._authorisation = init.authorisation; + if (init?.useragent) this._useragent = init.useragent; + + this._connect(); + } + + get readyState(): EventSourceReadyState { + if (this._closed) return EventSourceReadyState.CLOSED; + if (this._response) return EventSourceReadyState.OPEN; + return EventSourceReadyState.CONNECTING; + } + + get response(): Response | null { + return this._response; + } + + addEventListener(event: T, handler: Listener[1]) { + this._listeners.push([event, handler]); + } + + removeEventListener(event: T, handler?: Listener[1]) { + let index; + while ((index = this._listeners.findIndex(listener => + listener[0] === event && (!handler || listener[1] === handler) + )) >= 0) { + this._listeners.splice(index, 1); + } + } + + protected _fetch(signal: AbortSignal) { + const headers = new Headers({ + 'Accept': 'text/event-stream', + }); + + const authorisation = typeof this._authorisation === 'function' ? + this._authorisation.call(null) : this._authorisation; + if (authorisation) headers.append('Authorization', authorisation); + + if (this._useragent) headers.append('User-Agent', this._useragent); + + if (typeof this._id === 'string') { + headers.append('Last-Event-Id', this._id); + } + + return fetch(this.url, { + headers, + signal, + keepalive: true, + }); + } + + protected _connect() { + const controller = new AbortController(); + + this._fetch(controller.signal).then(async response => { + if (this._closed || this._response) { + controller.abort(); + return; + } + + this._response = response; + this._controller = controller; + + if (!response.ok) { + debug('Non-200 response code', await response.text()); + controller.abort(); + return this._handleConnectionClosed(); + } + + if (!response.headers.get('Content-Type')?.match(/^text\/event-stream($|;)/)) { + debug('Response type is not text/event-stream', await response.text()); + controller.abort(); + return this._handleConnectionClosed(); + } + + if (!response.body) { + debug('Response does not include a body'); + controller.abort(); + return this._handleConnectionClosed(); + } + + const url = new URL(this.url); + url.search = ''; + url.hash = ''; + debug('Connected to %s', url); + + const event = new Event('open'); + this.dispatchEvent(event); + + const reader = response.body.getReader(); + + return this._handleResponseStream(reader); + }).then(() => { + this._handleConnectionClosed(); + }, err => { + this._handleConnectionClosed(err); + }); + } + + protected async _handleResponseStream(reader: ReadableStreamDefaultReader) { + let buffer = Buffer.alloc(0); + const n = '\n'.charCodeAt(0); + + let value: Uint8Array | undefined; + let done = false; + + while (!done) { + ({value, done} = await reader.read()); + + if (!value) continue; + + let index; + while ((index = value.findIndex(v => v === n)) >= 0) { + const line = Buffer.concat([buffer, value.slice(0, index)]); + if (buffer.length) buffer = Buffer.alloc(0); + value = value.slice(index + 1); + + this._handleLine(new Uint8Array(line)); + } + + // Move any remaining data + buffer = Buffer.concat([buffer, value]); + } + } + + _message_event: string | null = null; + _message_data: Uint8Array | null = null; + _message_id: string | null = null; + + protected _handleLine(line: Uint8Array) { + if (line.length === 0) { + const event = new MessageEvent(this._message_event ?? 'message', { + data: this._message_data ? new TextDecoder().decode(this._message_data) : '', + lastEventId: this._message_id ?? undefined, + // source: this as unknown as MessageEventSource, + }); + + this._message_data = null; + + this.onAnyMessage?.call(this, event); + + if (typeof this._message_event === 'string') { + for (const [type, handler] of this._listeners) { + if (type !== this._message_event) continue; + + handler.call(this, event); + } + + this._message_event = null; + } else { + this.onmessage?.call(null, event); + + for (const [type, handler] of this._listeners) { + if (type !== 'message') continue; + + handler.call(this, event); + } + } + + if (typeof this._message_id === 'string') { + this._id = this._message_id; + this._message_id = null; + } + + return; + } + + const index = line.indexOf(':'.charCodeAt(0)); + if (index < 0) return; + + const tag = new TextDecoder().decode(line.slice(0, index)); + const data = line.slice(index + (line[index + 1] === ' '.charCodeAt(0) ? 2 : 1)); + + if (tag === 'event') { + this._message_event = new TextDecoder().decode(data); + } else if (tag === 'data') { + this._message_data = this._message_data ? + new Uint8Array(Buffer.concat([this._message_data, Buffer.from('\n'), data])) : + data; + } else if (tag === 'id') { + this._message_id = new TextDecoder().decode(data); + } else if (tag === 'retry') { + const retry = parseInt(new TextDecoder().decode(data)); + if (!isNaN(retry)) this._retry_after = retry; + } else if (tag) { + debug('Unknown message type "%s"', tag); + } + } + + protected async _handleConnectionClosed(error?: Error) { + this._response = null; + + if (this._closed) { + return; + } + + if (error) { + const event = new ErrorEvent(error); + + this.dispatchEvent(event); + } + + const wait = Math.max(1000, this._retry_after ?? 0); + await new Promise(rs => setTimeout(rs, wait)); + + this._connect(); + } + + close() { + this._closed = true; + this._controller?.abort(); + } + + dispatchEvent(event: Event) { + // @ts-expect-error + this['on' + event.type]?.call(this, event); + + for (const [type, handler] of this._listeners as Listener[]) { + if (type !== event.type) continue; + + // @ts-expect-error + handler.call(this, event); + } + } +}