1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2024-11-25 15:29:32 -05:00

fix(ext/fetch): make EventSource more robust (#22493)

This PR fixes all unhandled rejections and resource leaks found while
adding a test for #22368.
This commit is contained in:
ud2 2024-03-25 01:49:01 +08:00 committed by GitHub
parent ae52b49dd6
commit d263c632e3
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 165 additions and 182 deletions

View file

@ -169,7 +169,7 @@ async function mainFetch(req, recursive, terminator) {
try { try {
resp = await opFetchSend(requestRid); resp = await opFetchSend(requestRid);
} catch (err) { } catch (err) {
if (terminator.aborted) return; if (terminator.aborted) return abortedNetworkError();
throw err; throw err;
} finally { } finally {
if (cancelHandleRid !== null) { if (cancelHandleRid !== null) {

View file

@ -11,7 +11,6 @@ const {
NumberIsNaN, NumberIsNaN,
ObjectDefineProperties, ObjectDefineProperties,
ObjectPrototypeIsPrototypeOf, ObjectPrototypeIsPrototypeOf,
Promise,
StringPrototypeEndsWith, StringPrototypeEndsWith,
StringPrototypeIncludes, StringPrototypeIncludes,
StringPrototypeIndexOf, StringPrototypeIndexOf,
@ -19,7 +18,6 @@ const {
StringPrototypeSplit, StringPrototypeSplit,
StringPrototypeStartsWith, StringPrototypeStartsWith,
StringPrototypeToLowerCase, StringPrototypeToLowerCase,
Symbol,
SymbolFor, SymbolFor,
} = primordials; } = primordials;
@ -32,6 +30,7 @@ import {
EventTarget, EventTarget,
setIsTrusted, setIsTrusted,
} from "ext:deno_web/02_event.js"; } from "ext:deno_web/02_event.js";
import { clearTimeout, setTimeout } from "ext:deno_web/02_timers.js";
import { TransformStream } from "ext:deno_web/06_streams.js"; import { TransformStream } from "ext:deno_web/06_streams.js";
import { TextDecoderStream } from "ext:deno_web/08_text_encoding.js"; import { TextDecoderStream } from "ext:deno_web/08_text_encoding.js";
import { getLocationHref } from "ext:deno_web/12_location.js"; import { getLocationHref } from "ext:deno_web/12_location.js";
@ -99,29 +98,24 @@ const CONNECTING = 0;
const OPEN = 1; const OPEN = 1;
const CLOSED = 2; const CLOSED = 2;
const _url = Symbol("[[url]]");
const _withCredentials = Symbol("[[withCredentials]]");
const _readyState = Symbol("[[readyState]]");
const _reconnectionTime = Symbol("[[reconnectionTime]]");
const _lastEventID = Symbol("[[lastEventID]]");
const _abortController = Symbol("[[abortController]]");
const _loop = Symbol("[[loop]]");
class EventSource extends EventTarget { class EventSource extends EventTarget {
/** @type {AbortController} */ /** @type {AbortController} */
[_abortController] = new AbortController(); #abortController = new AbortController();
/** @type {number | undefined} */
#reconnectionTimerId;
/** @type {number} */ /** @type {number} */
[_reconnectionTime] = 5000; #reconnectionTime = 5000;
/** @type {string} */ /** @type {string} */
[_lastEventID] = ""; #lastEventId = "";
/** @type {number} */ /** @type {number} */
[_readyState] = CONNECTING; #readyState = CONNECTING;
get readyState() { get readyState() {
webidl.assertBranded(this, EventSourcePrototype); webidl.assertBranded(this, EventSourcePrototype);
return this[_readyState]; return this.#readyState;
} }
get CONNECTING() { get CONNECTING() {
@ -138,36 +132,29 @@ class EventSource extends EventTarget {
} }
/** @type {string} */ /** @type {string} */
[_url]; #url;
get url() { get url() {
webidl.assertBranded(this, EventSourcePrototype); webidl.assertBranded(this, EventSourcePrototype);
return this[_url]; return this.#url;
} }
/** @type {boolean} */ /** @type {boolean} */
[_withCredentials]; #withCredentials;
get withCredentials() { get withCredentials() {
webidl.assertBranded(this, EventSourcePrototype); webidl.assertBranded(this, EventSourcePrototype);
return this[_withCredentials]; return this.#withCredentials;
} }
constructor(url, eventSourceInitDict = {}) { constructor(url, eventSourceInitDict = {}) {
super(); super();
this[webidl.brand] = webidl.brand; this[webidl.brand] = webidl.brand;
const prefix = "Failed to construct 'EventSource'"; const prefix = "Failed to construct 'EventSource'";
webidl.requiredArguments(arguments.length, 1, { webidl.requiredArguments(arguments.length, 1, prefix);
prefix, url = webidl.converters.USVString(url, prefix, "Argument 1");
});
url = webidl.converters.USVString(url, {
prefix,
context: "Argument 1",
});
eventSourceInitDict = webidl.converters.EventSourceInit( eventSourceInitDict = webidl.converters.EventSourceInit(
eventSourceInitDict, eventSourceInitDict,
{ prefix,
prefix, "Argument 2",
context: "Argument 2",
},
); );
try { try {
@ -176,175 +163,171 @@ class EventSource extends EventTarget {
throw new DOMException(e.message, "SyntaxError"); throw new DOMException(e.message, "SyntaxError");
} }
this[_url] = url; this.#url = url;
this[_withCredentials] = eventSourceInitDict.withCredentials; this.#withCredentials = eventSourceInitDict.withCredentials;
this[_loop](); this.#loop();
} }
close() { close() {
webidl.assertBranded(this, EventSourcePrototype); webidl.assertBranded(this, EventSourcePrototype);
this[_abortController].abort(); this.#abortController.abort();
this[_readyState] = CLOSED; this.#readyState = CLOSED;
clearTimeout(this.#reconnectionTimerId);
} }
async [_loop]() { async #loop() {
let lastEventIDValue = ""; const lastEventIdValue = this.#lastEventId;
while (this[_readyState] !== CLOSED) { const req = newInnerRequest(
const lastEventIDValueCopy = lastEventIDValue; "GET",
lastEventIDValue = ""; this.#url,
const req = newInnerRequest( () =>
"GET", lastEventIdValue === ""
this[_url], ? [
() => ["accept", "text/event-stream"],
lastEventIDValueCopy === "" ]
? [ : [
["accept", "text/event-stream"], ["accept", "text/event-stream"],
] ["Last-Event-Id", op_utf8_to_byte_string(lastEventIdValue)],
: [ ],
["accept", "text/event-stream"], null,
[ false,
"Last-Event-Id", );
op_utf8_to_byte_string(lastEventIDValueCopy), /** @type {InnerResponse} */
], let res;
], try {
null, res = await mainFetch(req, true, this.#abortController.signal);
false, } catch {
); this.#reestablishConnection();
/** @type {InnerResponse} */ return;
const res = await mainFetch(req, true, this[_abortController].signal); }
const contentType = ArrayPrototypeFind( if (res.aborted) {
res.headerList, this.#failConnection();
(header) => StringPrototypeToLowerCase(header[0]) === "content-type", return;
); }
if (res.type === "error") { if (res.type === "error") {
if (res.aborted) { this.#reestablishConnection();
this[_readyState] = CLOSED; return;
this.dispatchEvent(new Event("error")); }
break; const contentType = ArrayPrototypeFind(
} else { res.headerList,
if (this[_readyState] === CLOSED) { (header) => StringPrototypeToLowerCase(header[0]) === "content-type",
this[_abortController].abort(); );
break; if (
} res.status !== 200 ||
this[_readyState] = CONNECTING; !contentType ||
this.dispatchEvent(new Event("error")); !StringPrototypeIncludes(
await new Promise((res) => setTimeout(res, this[_reconnectionTime])); StringPrototypeToLowerCase(contentType[1]),
if (this[_readyState] !== CONNECTING) { "text/event-stream",
continue; )
} ) {
this.#failConnection();
return;
}
if (this[_lastEventID] !== "") { if (this.#readyState === CLOSED) {
lastEventIDValue = this[_lastEventID]; return;
} }
continue; this.#readyState = OPEN;
} this.dispatchEvent(new Event("open"));
} else if (
res.status !== 200 || let data = "";
!StringPrototypeIncludes( let eventType = "";
contentType?.[1].toLowerCase(), let lastEventId = this.#lastEventId;
"text/event-stream",
) try {
for await (
// deno-lint-ignore prefer-primordials
const chunk of res.body.stream
.pipeThrough(new TextDecoderStream())
.pipeThrough(new TextLineStream({ allowCR: true }))
) { ) {
this[_readyState] = CLOSED; if (chunk === "") {
this.dispatchEvent(new Event("error")); this.#lastEventId = lastEventId;
break; if (data === "") {
}
if (this[_readyState] !== CLOSED) {
this[_readyState] = OPEN;
this.dispatchEvent(new Event("open"));
let data = "";
let eventType = "";
let lastEventID = this[_lastEventID];
for await (
// deno-lint-ignore prefer-primordials
const chunk of res.body.stream
.pipeThrough(new TextDecoderStream())
.pipeThrough(new TextLineStream({ allowCR: true }))
) {
if (chunk === "") {
this[_lastEventID] = lastEventID;
if (data === "") {
eventType = "";
continue;
}
if (StringPrototypeEndsWith(data, "\n")) {
data = StringPrototypeSlice(data, 0, -1);
}
const event = new MessageEvent(eventType || "message", {
data,
origin: res.url(),
lastEventId: this[_lastEventID],
});
setIsTrusted(event, true);
data = "";
eventType = ""; eventType = "";
if (this[_readyState] !== CLOSED) {
this.dispatchEvent(event);
}
} else if (StringPrototypeStartsWith(chunk, ":")) {
continue; continue;
} else {
let field = chunk;
let value = "";
if (StringPrototypeIncludes(chunk, ":")) {
({ 0: field, 1: value } = StringPrototypeSplit(chunk, ":"));
if (StringPrototypeStartsWith(value, " ")) {
value = StringPrototypeSlice(value, 1);
}
}
switch (field) {
case "event": {
eventType = value;
break;
}
case "data": {
data += value + "\n";
break;
}
case "id": {
if (!StringPrototypeIncludes(value, "\0")) {
lastEventID = value;
}
break;
}
case "retry": {
const reconnectionTime = Number(value);
if (
!NumberIsNaN(reconnectionTime) &&
NumberIsFinite(reconnectionTime)
) {
this[_reconnectionTime] = reconnectionTime;
}
break;
}
}
} }
if (StringPrototypeEndsWith(data, "\n")) {
if (this[_abortController].signal.aborted) { data = StringPrototypeSlice(data, 0, -1);
break;
} }
} const event = new MessageEvent(eventType || "message", {
if (this[_readyState] === CLOSED) { data,
this[_abortController].abort(); origin: res.url(),
break; lastEventId: this.#lastEventId,
} });
this[_readyState] = CONNECTING; setIsTrusted(event, true);
this.dispatchEvent(new Event("error")); data = "";
await new Promise((res) => setTimeout(res, this[_reconnectionTime])); eventType = "";
if (this[_readyState] !== CONNECTING) { if (this.#readyState !== CLOSED) {
this.dispatchEvent(event);
}
} else if (StringPrototypeStartsWith(chunk, ":")) {
continue; continue;
} } else {
let field = chunk;
let value = "";
if (StringPrototypeIncludes(chunk, ":")) {
({ 0: field, 1: value } = StringPrototypeSplit(chunk, ":"));
if (StringPrototypeStartsWith(value, " ")) {
value = StringPrototypeSlice(value, 1);
}
}
if (this[_lastEventID] !== "") { switch (field) {
lastEventIDValue = this[_lastEventID]; case "event": {
eventType = value;
break;
}
case "data": {
data += value + "\n";
break;
}
case "id": {
if (!StringPrototypeIncludes(value, "\0")) {
lastEventId = value;
}
break;
}
case "retry": {
const reconnectionTime = Number(value);
if (
!NumberIsNaN(reconnectionTime) &&
NumberIsFinite(reconnectionTime)
) {
this.#reconnectionTime = reconnectionTime;
}
break;
}
}
} }
} }
} catch {
// The connection is reestablished below
}
this.#reestablishConnection();
}
#reestablishConnection() {
if (this.#readyState === CLOSED) {
return;
}
this.#readyState = CONNECTING;
this.dispatchEvent(new Event("error"));
this.#reconnectionTimerId = setTimeout(() => {
if (this.#readyState !== CONNECTING) {
return;
}
this.#loop();
}, this.#reconnectionTime);
}
#failConnection() {
if (this.#readyState !== CLOSED) {
this.#readyState = CLOSED;
this.dispatchEvent(new Event("error"));
} }
} }