Fix handling errors connecting to event streams

This commit is contained in:
Samuel Elliott 2023-07-26 10:00:08 +01:00
parent f043d8ebb0
commit 69afbc6d69
No known key found for this signature in database
GPG Key ID: 8420C7CDE43DC4D6

View File

@ -1,6 +1,7 @@
import { Buffer } from 'node:buffer';
import { fetch, Headers, Response } from 'undici';
import createDebug from './debug.js';
import { ErrorResponse } from '../api/util.js';
const debug = createDebug('nxapi:util:eventsource');
@ -43,6 +44,7 @@ export default class EventSource {
protected readonly _authorisation: string | (() => string) | null = null;
protected readonly _useragent: string | null = null;
protected readonly _min_retry_after = 2000;
readonly withCredentials = false;
@ -68,6 +70,7 @@ export default class EventSource {
Object.defineProperty(this, '_retry_after', {enumerable: false});
Object.defineProperty(this, '_authorisation', {enumerable: false});
Object.defineProperty(this, '_useragent', {enumerable: false});
Object.defineProperty(this, '_min_retry_after', {enumerable: false});
Object.defineProperty(this, 'onerror', {enumerable: false, writable: true});
Object.defineProperty(this, 'onmessage', {enumerable: false, writable: true});
Object.defineProperty(this, 'onopen', {enumerable: false, writable: true});
@ -148,6 +151,8 @@ export default class EventSource {
this._controller = controller;
this._connecting = connecting;
let _response: Response | null = null;
connecting.then(async response => {
const url = new URL(this.url);
url.search = '';
@ -164,21 +169,20 @@ export default class EventSource {
this._connecting = null;
if (!response.ok) {
debug('Non-200 response code', await response.text());
controller.abort();
return this._handleConnectionClosed();
const error = await EventSourceErrorResponse.fromResponse(response, 'Non-200 status code');
return this._handleResponseError(response, controller, error);
}
if (!response.headers.get('Content-Type')?.match(/^text\/event-stream($|;)/)) {
debug('Response type is not text/event-stream', await response.text());
controller.abort();
return this._handleConnectionClosed();
const error = await EventSourceInvalidResponseTypeError.fromResponse(response,
'Response type is not text/event-stream');
return this._handleResponseError(response, controller, error);
}
if (!response.body) {
debug('Response does not include a body');
controller.abort();
return this._handleConnectionClosed();
const error = await EventSourceInvalidResponseTypeError.fromResponse(response,
'Response does not include a body');
return this._handleResponseError(response, controller, error);
}
debug('Connected to %s', url);
@ -196,6 +200,17 @@ export default class EventSource {
});
}
protected async _handleResponseError(
response: Response, controller: AbortController,
error: EventSourceErrorResponse,
) {
debug('Error connecting to event stream', error);
controller.abort();
const event = new ErrorEvent(error);
this.dispatchEvent(event);
}
protected async _handleResponseStream(reader: ReadableStreamDefaultReader<Uint8Array>) {
let buffer = Buffer.alloc(0);
const n = '\n'.charCodeAt(0);
@ -299,7 +314,7 @@ export default class EventSource {
this.dispatchEvent(event);
}
const wait = Math.max(1000, this._retry_after ?? 0);
const wait = Math.max(this._min_retry_after, this._retry_after ?? 0);
clearTimeout(this._reconnect_timeout!);
this._reconnect_timeout = setTimeout(() => this._connect(), wait);
@ -325,3 +340,6 @@ export default class EventSource {
}
}
}
export class EventSourceErrorResponse extends ErrorResponse {}
export class EventSourceInvalidResponseTypeError extends ErrorResponse {}