diff --git a/extensions/fetch/11_streams.js b/extensions/fetch/11_streams.js index 80929f65e4..9d784bc7b4 100644 --- a/extensions/fetch/11_streams.js +++ b/extensions/fetch/11_streams.js @@ -3227,6 +3227,10 @@ } } + function errorReadableStream(stream, e) { + readableStreamDefaultControllerError(stream[_controller], e); + } + /** @template R */ class ReadableStreamGenericReader { /** @type {Deferred} */ @@ -3873,6 +3877,7 @@ window.__bootstrap.streams = { // Non-Public isReadableStreamDisturbed, + errorReadableStream, // Exposed in global runtime scope ByteLengthQueuingStrategy, CountQueuingStrategy, diff --git a/extensions/fetch/22_body.js b/extensions/fetch/22_body.js index f0c7ac8bd2..dcf1128a70 100644 --- a/extensions/fetch/22_body.js +++ b/extensions/fetch/22_body.js @@ -20,7 +20,8 @@ const { parseFormData, formDataFromEntries, encodeFormData } = globalThis.__bootstrap.formData; const mimesniff = globalThis.__bootstrap.mimesniff; - const { isReadableStreamDisturbed } = globalThis.__bootstrap.streams; + const { isReadableStreamDisturbed, errorReadableStream } = + globalThis.__bootstrap.streams; class InnerBody { /** @type {ReadableStream | { body: Uint8Array, consumed: boolean }} */ @@ -106,6 +107,22 @@ } } + cancel(error) { + if (this.streamOrStatic instanceof ReadableStream) { + this.streamOrStatic.cancel(error); + } else { + this.streamOrStatic.consumed = true; + } + } + + error(error) { + if (this.streamOrStatic instanceof ReadableStream) { + errorReadableStream(this.streamOrStatic, error); + } else { + this.streamOrStatic.consumed = true; + } + } + /** * @returns {InnerBody} */ diff --git a/extensions/fetch/23_request.js b/extensions/fetch/23_request.js index ac38ce5522..29eddcf22b 100644 --- a/extensions/fetch/23_request.js +++ b/extensions/fetch/23_request.js @@ -26,9 +26,11 @@ getDecodeSplitHeader, } = window.__bootstrap.headers; const { HttpClient } = window.__bootstrap.fetch; + const abortSignal = window.__bootstrap.abortSignal; const _request = Symbol("request"); const _headers = Symbol("headers"); + const _signal = Symbol("signal"); const _mimeType = Symbol("mime type"); const _body = Symbol("body"); @@ -145,6 +147,8 @@ [_request]; /** @type {Headers} */ [_headers]; + /** @type {AbortSignal} */ + [_signal]; get [_mimeType]() { let charset = null; let essence = null; @@ -206,6 +210,9 @@ let request; const baseURL = getLocationHref(); + // 4. + let signal = null; + // 5. if (typeof input === "string") { const parsedURL = new URL(input, baseURL); @@ -213,8 +220,12 @@ } else { // 6. if (!(input instanceof Request)) throw new TypeError("Unreachable"); request = input[_request]; + signal = input[_signal]; } + // 12. + // TODO(lucacasonato): create a copy of `request` + // 22. if (init.redirect !== undefined) { request.redirectMode = init.redirect; @@ -227,6 +238,11 @@ request.method = method; } + // 26. + if (init.signal !== undefined) { + signal = init.signal; + } + // NOTE: non standard extension. This handles Deno.HttpClient parameter if (init.client !== undefined) { if (init.client !== null && !(init.client instanceof HttpClient)) { @@ -242,6 +258,12 @@ // 27. this[_request] = request; + // 28. + this[_signal] = abortSignal.newSignal(); + if (signal !== null) { + abortSignal.follow(this[_signal], signal); + } + // 29. this[_headers] = headersFromHeaderList(request.headerList, "request"); @@ -299,6 +321,9 @@ // 40. request.body = finalBody; + + // 41. + // TODO(lucacasonato): Extranious? https://github.com/whatwg/fetch/issues/1249 } get method() { @@ -321,13 +346,24 @@ return this[_request].redirectMode; } + get signal() { + webidl.assertBranded(this, Request); + return this[_signal]; + } + clone() { webidl.assertBranded(this, Request); if (this[_body] && this[_body].unusable()) { throw new TypeError("Body is unusable."); } const newReq = cloneInnerRequest(this[_request]); - return fromInnerRequest(newReq, guardFromHeaders(this[_headers])); + const newSignal = abortSignal.newSignal(); + abortSignal.follow(newSignal, this[_signal]); + return fromInnerRequest( + newReq, + newSignal, + guardFromHeaders(this[_headers]), + ); } get [Symbol.toStringTag]() { @@ -364,6 +400,10 @@ enumerable: true, configurable: true, }); + Object.defineProperty(Request.prototype, "signal", { + enumerable: true, + configurable: true, + }); Object.defineProperty(Request.prototype, "clone", { enumerable: true, writable: true, @@ -403,6 +443,12 @@ ), }, { key: "redirect", converter: webidl.converters["RequestRedirect"] }, + { + key: "signal", + converter: webidl.createNullableConverter( + webidl.converters["AbortSignal"], + ), + }, { key: "client", converter: webidl.converters.any }, ], ); @@ -420,9 +466,10 @@ * @param {"request" | "immutable" | "request-no-cors" | "response" | "none"} guard * @returns {Request} */ - function fromInnerRequest(inner, guard) { + function fromInnerRequest(inner, signal, guard) { const request = webidl.createBranded(Request); request[_request] = inner; + request[_signal] = signal; request[_headers] = headersFromHeaderList(inner.headerList, guard); return request; } diff --git a/extensions/fetch/23_response.js b/extensions/fetch/23_response.js index 7bbf3c66e6..4d843829b4 100644 --- a/extensions/fetch/23_response.js +++ b/extensions/fetch/23_response.js @@ -48,6 +48,7 @@ * @property {string} statusMessage * @property {[string, string][]} headerList * @property {null | typeof __window.bootstrap.fetchBody.InnerBody} body + * @property {boolean} aborted * @property {string} [error] */ @@ -92,12 +93,14 @@ urlList, status: response.status, statusMessage: response.statusMessage, + aborted: response.aborted, }; } const defaultInnerResponse = { type: "default", body: null, + aborted: false, url() { if (this.urlList.length == 0) return null; return this.urlList[this.urlList.length - 1]; @@ -128,6 +131,15 @@ return resp; } + /** + * @returns {InnerResponse} + */ + function abortedNetworkError() { + const resp = networkError("aborted"); + resp.aborted = true; + return resp; + } + class Response { /** @type {InnerResponse} */ [_response]; @@ -446,4 +458,5 @@ window.__bootstrap.fetch.redirectStatus = redirectStatus; window.__bootstrap.fetch.nullBodyStatus = nullBodyStatus; window.__bootstrap.fetch.networkError = networkError; + window.__bootstrap.fetch.abortedNetworkError = abortedNetworkError; })(globalThis); diff --git a/extensions/fetch/26_fetch.js b/extensions/fetch/26_fetch.js index 1dd797339c..6e1d4d4c83 100644 --- a/extensions/fetch/26_fetch.js +++ b/extensions/fetch/26_fetch.js @@ -15,14 +15,18 @@ const core = window.Deno.core; const webidl = window.__bootstrap.webidl; const { byteLowerCase } = window.__bootstrap.infra; + const { errorReadableStream } = window.__bootstrap.streams; const { InnerBody, extractBody } = window.__bootstrap.fetchBody; const { toInnerRequest, + toInnerResponse, fromInnerResponse, redirectStatus, nullBodyStatus, networkError, + abortedNetworkError, } = window.__bootstrap.fetch; + const abortSignal = window.__bootstrap.abortSignal; const REQUEST_BODY_HEADER_NAMES = [ "content-encoding", @@ -68,10 +72,26 @@ /** * @param {number} responseBodyRid + * @param {AbortSignal} [terminator] * @returns {ReadableStream} */ - function createResponseBodyStream(responseBodyRid) { - return new ReadableStream({ + function createResponseBodyStream(responseBodyRid, terminator) { + function onAbort() { + if (readable) { + errorReadableStream( + readable, + new DOMException("Ongoing fetch was aborted.", "AbortError"), + ); + } + try { + core.close(responseBodyRid); + } catch (_) { + // might have already been closed + } + } + // TODO(lucacasonato): clean up registration + terminator[abortSignal.add](onAbort); + const readable = new ReadableStream({ type: "bytes", async pull(controller) { try { @@ -88,28 +108,45 @@ } else { // We have reached the end of the body, so we close the stream. controller.close(); - core.close(responseBodyRid); + try { + core.close(responseBodyRid); + } catch (_) { + // might have already been closed + } } } catch (err) { - // There was an error while reading a chunk of the body, so we - // error. - controller.error(err); - controller.close(); - core.close(responseBodyRid); + if (terminator.aborted) { + controller.error( + new DOMException("Ongoing fetch was aborted.", "AbortError"), + ); + } else { + // There was an error while reading a chunk of the body, so we + // error. + controller.error(err); + } + try { + core.close(responseBodyRid); + } catch (_) { + // might have already been closed + } } }, cancel() { - core.close(responseBodyRid); + if (!terminator.aborted) { + terminator[abortSignal.signalAbort](); + } }, }); + return readable; } /** * @param {InnerRequest} req * @param {boolean} recursive + * @param {AbortSignal} terminator * @returns {Promise} */ - async function mainFetch(req, recursive) { + async function mainFetch(req, recursive, terminator) { /** @type {ReadableStream | Uint8Array | null} */ let reqBody = null; if (req.body !== null) { @@ -130,7 +167,7 @@ } } - const { requestRid, requestBodyRid } = opFetch({ + const { requestRid, requestBodyRid, cancelHandleRid } = opFetch({ method: req.method, url: req.currentUrl(), headers: req.headerList, @@ -138,6 +175,20 @@ hasBody: reqBody !== null, }, reqBody instanceof Uint8Array ? reqBody : null); + function onAbort() { + try { + core.close(cancelHandleRid); + } catch (_) { + // might have already been closed + } + try { + core.close(requestBodyRid); + } catch (_) { + // might have already been closed + } + } + terminator[abortSignal.add](onAbort); + if (requestBodyRid !== null) { if (reqBody === null || !(reqBody instanceof ReadableStream)) { throw new TypeError("Unreachable"); @@ -145,24 +196,49 @@ const reader = reqBody.getReader(); (async () => { while (true) { - const { value, done } = await reader.read(); + const { value, done } = await reader.read().catch((err) => { + if (terminator.aborted) return { done: true, value: undefined }; + throw err; + }); if (done) break; if (!(value instanceof Uint8Array)) { await reader.cancel("value not a Uint8Array"); break; } try { - await opFetchRequestWrite(requestBodyRid, value); + await opFetchRequestWrite(requestBodyRid, value).catch((err) => { + if (terminator.aborted) return; + throw err; + }); + if (terminator.aborted) break; } catch (err) { await reader.cancel(err); break; } } - core.close(requestBodyRid); + try { + core.close(requestBodyRid); + } catch (_) { + // might have already been closed + } })(); } - const resp = await opFetchSend(requestRid); + let resp; + try { + resp = await opFetchSend(requestRid).catch((err) => { + if (terminator.aborted) return; + throw err; + }); + } finally { + try { + core.close(cancelHandleRid); + } catch (_) { + // might have already been closed + } + } + if (terminator.aborted) return abortedNetworkError(); + /** @type {InnerResponse} */ const response = { headerList: resp.headers, @@ -185,7 +261,7 @@ ); case "follow": core.close(resp.responseRid); - return httpRedirectFetch(req, response); + return httpRedirectFetch(req, response, terminator); case "manual": break; } @@ -194,7 +270,9 @@ if (nullBodyStatus(response.status)) { core.close(resp.responseRid); } else { - response.body = new InnerBody(createResponseBodyStream(resp.responseRid)); + response.body = new InnerBody( + createResponseBodyStream(resp.responseRid, terminator), + ); } if (recursive) return response; @@ -211,7 +289,7 @@ * @param {InnerResponse} response * @returns {Promise} */ - function httpRedirectFetch(request, response) { + function httpRedirectFetch(request, response, terminator) { const locationHeaders = response.headerList.filter((entry) => byteLowerCase(entry[0]) === "location" ); @@ -264,43 +342,90 @@ request.body = res.body; } request.urlList.push(locationURL.href); - return mainFetch(request, true); + return mainFetch(request, true, terminator); } /** * @param {RequestInfo} input * @param {RequestInit} init */ - async function fetch(input, init = {}) { - const prefix = "Failed to call 'fetch'"; - webidl.requiredArguments(arguments.length, 1, { prefix }); - input = webidl.converters["RequestInfo"](input, { - prefix, - context: "Argument 1", - }); - init = webidl.converters["RequestInit"](init, { - prefix, - context: "Argument 2", - }); - + function fetch(input, init = {}) { // 1. - const requestObject = new Request(input, init); - // 2. - const request = toInnerRequest(requestObject); - // 10. - if (!requestObject.headers.has("Accept")) { - request.headerList.push(["Accept", "*/*"]); - } + const p = new Promise((resolve, reject) => { + const prefix = "Failed to call 'fetch'"; + webidl.requiredArguments(arguments.length, 1, { prefix }); + input = webidl.converters["RequestInfo"](input, { + prefix, + context: "Argument 1", + }); + init = webidl.converters["RequestInit"](init, { + prefix, + context: "Argument 2", + }); - // 12. - const response = await mainFetch(request, false); - if (response.type === "error") { - throw new TypeError( - "Fetch failed: " + (response.error ?? "unknown error"), - ); - } + // 2. + const requestObject = new Request(input, init); + // 3. + const request = toInnerRequest(requestObject); + // 4. + if (requestObject.signal.aborted) { + reject(abortFetch(request, null)); + return; + } - return fromInnerResponse(response, "immutable"); + // 7. + let responseObject = null; + // 9. + let locallyAborted = false; + // 10. + function onabort() { + locallyAborted = true; + reject(abortFetch(request, responseObject)); + } + requestObject.signal[abortSignal.add](onabort); + + if (!requestObject.headers.has("Accept")) { + request.headerList.push(["Accept", "*/*"]); + } + + // 12. + mainFetch(request, false, requestObject.signal).then((response) => { + // 12.1. + if (locallyAborted) return; + // 12.2. + if (response.aborted) { + reject(request, responseObject); + requestObject.signal[abortSignal.remove](onabort); + return; + } + // 12.3. + if (response.type === "error") { + const err = new TypeError( + "Fetch failed: " + (response.error ?? "unknown error"), + ); + reject(err); + requestObject.signal[abortSignal.remove](onabort); + return; + } + responseObject = fromInnerResponse(response, "immutable"); + resolve(responseObject); + requestObject.signal[abortSignal.remove](onabort); + }).catch((err) => { + reject(err); + requestObject.signal[abortSignal.remove](onabort); + }); + }); + return p; + } + + function abortFetch(request, responseObject) { + const error = new DOMException("Ongoing fetch was aborted.", "AbortError"); + if (request.body !== null) request.body.cancel(error); + if (responseObject !== null) { + const response = toInnerResponse(responseObject); + if (response.body !== null) response.body.error(error); + } + return error; } window.__bootstrap.fetch ??= {}; diff --git a/extensions/fetch/internal.d.ts b/extensions/fetch/internal.d.ts index 86de527613..f34e5e12ca 100644 --- a/extensions/fetch/internal.d.ts +++ b/extensions/fetch/internal.d.ts @@ -82,6 +82,7 @@ declare namespace globalThis { function toInnerRequest(request: Request): InnerRequest; function fromInnerRequest( inner: InnerRequest, + signal: AbortSignal | null, guard: | "request" | "immutable" diff --git a/extensions/fetch/lib.rs b/extensions/fetch/lib.rs index 0ac6422e40..2fbd38b3a9 100644 --- a/extensions/fetch/lib.rs +++ b/extensions/fetch/lib.rs @@ -16,6 +16,7 @@ use deno_core::AsyncRefCell; use deno_core::CancelFuture; use deno_core::CancelHandle; use deno_core::CancelTryFuture; +use deno_core::Canceled; use deno_core::Extension; use deno_core::OpState; use deno_core::RcRef; @@ -131,6 +132,7 @@ pub struct FetchArgs { pub struct FetchReturn { request_rid: ResourceId, request_body_rid: Option, + cancel_handle_rid: Option, } pub fn op_fetch( @@ -157,7 +159,7 @@ where // Check scheme before asking for net permission let scheme = url.scheme(); - let (request_rid, request_body_rid) = match scheme { + let (request_rid, request_body_rid, cancel_handle_rid) = match scheme { "http" | "https" => { let permissions = state.borrow_mut::(); permissions.check_net_url(&url)?; @@ -195,13 +197,19 @@ where request = request.header(name, v); } - let fut = request.send(); + let cancel_handle = CancelHandle::new_rc(); + let cancel_handle_ = cancel_handle.clone(); + + let fut = async move { request.send().or_cancel(cancel_handle_).await }; let request_rid = state .resource_table .add(FetchRequestResource(Box::pin(fut))); - (request_rid, request_body_rid) + let cancel_handle_rid = + state.resource_table.add(FetchCancelHandle(cancel_handle)); + + (request_rid, request_body_rid, Some(cancel_handle_rid)) } "data" => { let data_url = DataUrl::process(url.as_str()) @@ -216,13 +224,13 @@ where .header(http::header::CONTENT_TYPE, data_url.mime_type().to_string()) .body(reqwest::Body::from(body))?; - let fut = async move { Ok(Response::from(response)) }; + let fut = async move { Ok(Ok(Response::from(response))) }; let request_rid = state .resource_table .add(FetchRequestResource(Box::pin(fut))); - (request_rid, None) + (request_rid, None, None) } "blob" => { let blob_url_storage = @@ -244,13 +252,13 @@ where .header(http::header::CONTENT_TYPE, blob.media_type) .body(reqwest::Body::from(blob.data))?; - let fut = async move { Ok(Response::from(response)) }; + let fut = async move { Ok(Ok(Response::from(response))) }; let request_rid = state .resource_table .add(FetchRequestResource(Box::pin(fut))); - (request_rid, None) + (request_rid, None, None) } _ => return Err(type_error(format!("scheme '{}' not supported", scheme))), }; @@ -258,6 +266,7 @@ where Ok(FetchReturn { request_rid, request_body_rid, + cancel_handle_rid, }) } @@ -287,8 +296,9 @@ pub async fn op_fetch_send( .expect("multiple op_fetch_send ongoing"); let res = match request.0.await { - Ok(res) => res, - Err(e) => return Err(type_error(e.to_string())), + Ok(Ok(res)) => res, + Ok(Err(err)) => return Err(type_error(err.to_string())), + Err(_) => return Err(type_error("request was cancelled")), }; //debug!("Fetch response {}", url); @@ -372,8 +382,11 @@ pub async fn op_fetch_response_read( Ok(read) } +type CancelableResponseResult = + Result, Canceled>; + struct FetchRequestResource( - Pin>>>, + Pin>>, ); impl Resource for FetchRequestResource { @@ -382,6 +395,18 @@ impl Resource for FetchRequestResource { } } +struct FetchCancelHandle(Rc); + +impl Resource for FetchCancelHandle { + fn name(&self) -> Cow { + "fetchCancelHandle".into() + } + + fn close(self: Rc) { + self.0.cancel() + } +} + struct FetchRequestBodyResource { body: AsyncRefCell>>>, cancel: CancelHandle, @@ -391,6 +416,10 @@ impl Resource for FetchRequestBodyResource { fn name(&self) -> Cow { "fetchRequestBody".into() } + + fn close(self: Rc) { + self.cancel.cancel() + } } type BytesStream = @@ -405,6 +434,10 @@ impl Resource for FetchResponseBodyResource { fn name(&self) -> Cow { "fetchResponseBody".into() } + + fn close(self: Rc) { + self.cancel.cancel() + } } struct HttpClientResource { diff --git a/extensions/web/03_abort_signal.js b/extensions/web/03_abort_signal.js index e2648e778c..5d7e10bb2d 100644 --- a/extensions/web/03_abort_signal.js +++ b/extensions/web/03_abort_signal.js @@ -118,11 +118,25 @@ AbortSignal, ); + function newSignal() { + return new AbortSignal(illegalConstructorKey); + } + + function follow(followingSignal, parentSignal) { + if (parentSignal.aborted) { + followingSignal[signalAbort](); + } else { + parentSignal[add](() => followingSignal[signalAbort]()); + } + } + window.AbortSignal = AbortSignal; window.AbortController = AbortController; window.__bootstrap.abortSignal = { add, signalAbort, remove, + follow, + newSignal, }; })(this); diff --git a/runtime/js/40_http.js b/runtime/js/40_http.js index d4b658314a..63f64545f3 100644 --- a/runtime/js/40_http.js +++ b/runtime/js/40_http.js @@ -72,7 +72,7 @@ headersList, body !== null ? new InnerBody(body) : null, ); - const request = fromInnerRequest(innerRequest, "immutable"); + const request = fromInnerRequest(innerRequest, null, "immutable"); const respondWith = createRespondWith(this, responseSenderRid); diff --git a/tools/wpt/expectation.json b/tools/wpt/expectation.json index d724b6b5c9..b98138b0b2 100644 --- a/tools/wpt/expectation.json +++ b/tools/wpt/expectation.json @@ -1052,7 +1052,6 @@ "Request interface: attribute keepalive", "Request interface: attribute isReloadNavigation", "Request interface: attribute isHistoryNavigation", - "Request interface: attribute signal", "Request interface: attribute body", "Request interface: attribute bodyUsed", "Request interface: new Request('about:blank') must inherit property \"destination\" with the proper type", @@ -1065,14 +1064,22 @@ "Request interface: new Request('about:blank') must inherit property \"keepalive\" with the proper type", "Request interface: new Request('about:blank') must inherit property \"isReloadNavigation\" with the proper type", "Request interface: new Request('about:blank') must inherit property \"isHistoryNavigation\" with the proper type", - "Request interface: new Request('about:blank') must inherit property \"signal\" with the proper type", "Response interface: operation error()", "Response interface: operation redirect(USVString, optional unsigned short)", "Response interface: attribute body", "Response interface: attribute bodyUsed", "Response interface: calling redirect(USVString, optional unsigned short) on new Response() with too few arguments must throw TypeError", "Window interface: operation fetch(RequestInfo, optional RequestInit)" - ] + ], + "abort": { + "general.any.html": [ + "response.arrayBuffer() rejects if already aborted", + "response.blob() rejects if already aborted", + "response.formData() rejects if already aborted", + "response.json() rejects if already aborted", + "response.text() rejects if already aborted" + ] + } }, "data-urls": { "base64.any.html": true,