1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2024-12-24 08:09:08 -05:00

feat(fetch): implement abort (#10863)

This commit introduces fetch aborting via an AbortSignal.
This commit is contained in:
Luca Casonato 2021-06-06 15:37:17 +02:00 committed by GitHub
parent 3f9187c366
commit 1fb2e23a67
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 325 additions and 63 deletions

View file

@ -3227,6 +3227,10 @@
}
}
function errorReadableStream(stream, e) {
readableStreamDefaultControllerError(stream[_controller], e);
}
/** @template R */
class ReadableStreamGenericReader {
/** @type {Deferred<void>} */
@ -3873,6 +3877,7 @@
window.__bootstrap.streams = {
// Non-Public
isReadableStreamDisturbed,
errorReadableStream,
// Exposed in global runtime scope
ByteLengthQueuingStrategy,
CountQueuingStrategy,

View file

@ -20,7 +20,8 @@
const { parseFormData, formDataFromEntries, encodeFormData } =
globalThis.__bootstrap.formData;
const mimesniff = globalThis.__bootstrap.mimesniff;
const { isReadableStreamDisturbed } = globalThis.__bootstrap.streams;
const { isReadableStreamDisturbed, errorReadableStream } =
globalThis.__bootstrap.streams;
class InnerBody {
/** @type {ReadableStream<Uint8Array> | { body: Uint8Array, consumed: boolean }} */
@ -106,6 +107,22 @@
}
}
cancel(error) {
if (this.streamOrStatic instanceof ReadableStream) {
this.streamOrStatic.cancel(error);
} else {
this.streamOrStatic.consumed = true;
}
}
error(error) {
if (this.streamOrStatic instanceof ReadableStream) {
errorReadableStream(this.streamOrStatic, error);
} else {
this.streamOrStatic.consumed = true;
}
}
/**
* @returns {InnerBody}
*/

View file

@ -26,9 +26,11 @@
getDecodeSplitHeader,
} = window.__bootstrap.headers;
const { HttpClient } = window.__bootstrap.fetch;
const abortSignal = window.__bootstrap.abortSignal;
const _request = Symbol("request");
const _headers = Symbol("headers");
const _signal = Symbol("signal");
const _mimeType = Symbol("mime type");
const _body = Symbol("body");
@ -145,6 +147,8 @@
[_request];
/** @type {Headers} */
[_headers];
/** @type {AbortSignal} */
[_signal];
get [_mimeType]() {
let charset = null;
let essence = null;
@ -206,6 +210,9 @@
let request;
const baseURL = getLocationHref();
// 4.
let signal = null;
// 5.
if (typeof input === "string") {
const parsedURL = new URL(input, baseURL);
@ -213,8 +220,12 @@
} else { // 6.
if (!(input instanceof Request)) throw new TypeError("Unreachable");
request = input[_request];
signal = input[_signal];
}
// 12.
// TODO(lucacasonato): create a copy of `request`
// 22.
if (init.redirect !== undefined) {
request.redirectMode = init.redirect;
@ -227,6 +238,11 @@
request.method = method;
}
// 26.
if (init.signal !== undefined) {
signal = init.signal;
}
// NOTE: non standard extension. This handles Deno.HttpClient parameter
if (init.client !== undefined) {
if (init.client !== null && !(init.client instanceof HttpClient)) {
@ -242,6 +258,12 @@
// 27.
this[_request] = request;
// 28.
this[_signal] = abortSignal.newSignal();
if (signal !== null) {
abortSignal.follow(this[_signal], signal);
}
// 29.
this[_headers] = headersFromHeaderList(request.headerList, "request");
@ -299,6 +321,9 @@
// 40.
request.body = finalBody;
// 41.
// TODO(lucacasonato): Extranious? https://github.com/whatwg/fetch/issues/1249
}
get method() {
@ -321,13 +346,24 @@
return this[_request].redirectMode;
}
get signal() {
webidl.assertBranded(this, Request);
return this[_signal];
}
clone() {
webidl.assertBranded(this, Request);
if (this[_body] && this[_body].unusable()) {
throw new TypeError("Body is unusable.");
}
const newReq = cloneInnerRequest(this[_request]);
return fromInnerRequest(newReq, guardFromHeaders(this[_headers]));
const newSignal = abortSignal.newSignal();
abortSignal.follow(newSignal, this[_signal]);
return fromInnerRequest(
newReq,
newSignal,
guardFromHeaders(this[_headers]),
);
}
get [Symbol.toStringTag]() {
@ -364,6 +400,10 @@
enumerable: true,
configurable: true,
});
Object.defineProperty(Request.prototype, "signal", {
enumerable: true,
configurable: true,
});
Object.defineProperty(Request.prototype, "clone", {
enumerable: true,
writable: true,
@ -403,6 +443,12 @@
),
},
{ key: "redirect", converter: webidl.converters["RequestRedirect"] },
{
key: "signal",
converter: webidl.createNullableConverter(
webidl.converters["AbortSignal"],
),
},
{ key: "client", converter: webidl.converters.any },
],
);
@ -420,9 +466,10 @@
* @param {"request" | "immutable" | "request-no-cors" | "response" | "none"} guard
* @returns {Request}
*/
function fromInnerRequest(inner, guard) {
function fromInnerRequest(inner, signal, guard) {
const request = webidl.createBranded(Request);
request[_request] = inner;
request[_signal] = signal;
request[_headers] = headersFromHeaderList(inner.headerList, guard);
return request;
}

View file

@ -48,6 +48,7 @@
* @property {string} statusMessage
* @property {[string, string][]} headerList
* @property {null | typeof __window.bootstrap.fetchBody.InnerBody} body
* @property {boolean} aborted
* @property {string} [error]
*/
@ -92,12 +93,14 @@
urlList,
status: response.status,
statusMessage: response.statusMessage,
aborted: response.aborted,
};
}
const defaultInnerResponse = {
type: "default",
body: null,
aborted: false,
url() {
if (this.urlList.length == 0) return null;
return this.urlList[this.urlList.length - 1];
@ -128,6 +131,15 @@
return resp;
}
/**
* @returns {InnerResponse}
*/
function abortedNetworkError() {
const resp = networkError("aborted");
resp.aborted = true;
return resp;
}
class Response {
/** @type {InnerResponse} */
[_response];
@ -446,4 +458,5 @@
window.__bootstrap.fetch.redirectStatus = redirectStatus;
window.__bootstrap.fetch.nullBodyStatus = nullBodyStatus;
window.__bootstrap.fetch.networkError = networkError;
window.__bootstrap.fetch.abortedNetworkError = abortedNetworkError;
})(globalThis);

View file

@ -15,14 +15,18 @@
const core = window.Deno.core;
const webidl = window.__bootstrap.webidl;
const { byteLowerCase } = window.__bootstrap.infra;
const { errorReadableStream } = window.__bootstrap.streams;
const { InnerBody, extractBody } = window.__bootstrap.fetchBody;
const {
toInnerRequest,
toInnerResponse,
fromInnerResponse,
redirectStatus,
nullBodyStatus,
networkError,
abortedNetworkError,
} = window.__bootstrap.fetch;
const abortSignal = window.__bootstrap.abortSignal;
const REQUEST_BODY_HEADER_NAMES = [
"content-encoding",
@ -68,10 +72,26 @@
/**
* @param {number} responseBodyRid
* @param {AbortSignal} [terminator]
* @returns {ReadableStream<Uint8Array>}
*/
function createResponseBodyStream(responseBodyRid) {
return new ReadableStream({
function createResponseBodyStream(responseBodyRid, terminator) {
function onAbort() {
if (readable) {
errorReadableStream(
readable,
new DOMException("Ongoing fetch was aborted.", "AbortError"),
);
}
try {
core.close(responseBodyRid);
} catch (_) {
// might have already been closed
}
}
// TODO(lucacasonato): clean up registration
terminator[abortSignal.add](onAbort);
const readable = new ReadableStream({
type: "bytes",
async pull(controller) {
try {
@ -88,28 +108,45 @@
} else {
// We have reached the end of the body, so we close the stream.
controller.close();
core.close(responseBodyRid);
try {
core.close(responseBodyRid);
} catch (_) {
// might have already been closed
}
}
} catch (err) {
// There was an error while reading a chunk of the body, so we
// error.
controller.error(err);
controller.close();
core.close(responseBodyRid);
if (terminator.aborted) {
controller.error(
new DOMException("Ongoing fetch was aborted.", "AbortError"),
);
} else {
// There was an error while reading a chunk of the body, so we
// error.
controller.error(err);
}
try {
core.close(responseBodyRid);
} catch (_) {
// might have already been closed
}
}
},
cancel() {
core.close(responseBodyRid);
if (!terminator.aborted) {
terminator[abortSignal.signalAbort]();
}
},
});
return readable;
}
/**
* @param {InnerRequest} req
* @param {boolean} recursive
* @param {AbortSignal} terminator
* @returns {Promise<InnerResponse>}
*/
async function mainFetch(req, recursive) {
async function mainFetch(req, recursive, terminator) {
/** @type {ReadableStream<Uint8Array> | Uint8Array | null} */
let reqBody = null;
if (req.body !== null) {
@ -130,7 +167,7 @@
}
}
const { requestRid, requestBodyRid } = opFetch({
const { requestRid, requestBodyRid, cancelHandleRid } = opFetch({
method: req.method,
url: req.currentUrl(),
headers: req.headerList,
@ -138,6 +175,20 @@
hasBody: reqBody !== null,
}, reqBody instanceof Uint8Array ? reqBody : null);
function onAbort() {
try {
core.close(cancelHandleRid);
} catch (_) {
// might have already been closed
}
try {
core.close(requestBodyRid);
} catch (_) {
// might have already been closed
}
}
terminator[abortSignal.add](onAbort);
if (requestBodyRid !== null) {
if (reqBody === null || !(reqBody instanceof ReadableStream)) {
throw new TypeError("Unreachable");
@ -145,24 +196,49 @@
const reader = reqBody.getReader();
(async () => {
while (true) {
const { value, done } = await reader.read();
const { value, done } = await reader.read().catch((err) => {
if (terminator.aborted) return { done: true, value: undefined };
throw err;
});
if (done) break;
if (!(value instanceof Uint8Array)) {
await reader.cancel("value not a Uint8Array");
break;
}
try {
await opFetchRequestWrite(requestBodyRid, value);
await opFetchRequestWrite(requestBodyRid, value).catch((err) => {
if (terminator.aborted) return;
throw err;
});
if (terminator.aborted) break;
} catch (err) {
await reader.cancel(err);
break;
}
}
core.close(requestBodyRid);
try {
core.close(requestBodyRid);
} catch (_) {
// might have already been closed
}
})();
}
const resp = await opFetchSend(requestRid);
let resp;
try {
resp = await opFetchSend(requestRid).catch((err) => {
if (terminator.aborted) return;
throw err;
});
} finally {
try {
core.close(cancelHandleRid);
} catch (_) {
// might have already been closed
}
}
if (terminator.aborted) return abortedNetworkError();
/** @type {InnerResponse} */
const response = {
headerList: resp.headers,
@ -185,7 +261,7 @@
);
case "follow":
core.close(resp.responseRid);
return httpRedirectFetch(req, response);
return httpRedirectFetch(req, response, terminator);
case "manual":
break;
}
@ -194,7 +270,9 @@
if (nullBodyStatus(response.status)) {
core.close(resp.responseRid);
} else {
response.body = new InnerBody(createResponseBodyStream(resp.responseRid));
response.body = new InnerBody(
createResponseBodyStream(resp.responseRid, terminator),
);
}
if (recursive) return response;
@ -211,7 +289,7 @@
* @param {InnerResponse} response
* @returns {Promise<InnerResponse>}
*/
function httpRedirectFetch(request, response) {
function httpRedirectFetch(request, response, terminator) {
const locationHeaders = response.headerList.filter((entry) =>
byteLowerCase(entry[0]) === "location"
);
@ -264,43 +342,90 @@
request.body = res.body;
}
request.urlList.push(locationURL.href);
return mainFetch(request, true);
return mainFetch(request, true, terminator);
}
/**
* @param {RequestInfo} input
* @param {RequestInit} init
*/
async function fetch(input, init = {}) {
const prefix = "Failed to call 'fetch'";
webidl.requiredArguments(arguments.length, 1, { prefix });
input = webidl.converters["RequestInfo"](input, {
prefix,
context: "Argument 1",
});
init = webidl.converters["RequestInit"](init, {
prefix,
context: "Argument 2",
});
function fetch(input, init = {}) {
// 1.
const requestObject = new Request(input, init);
// 2.
const request = toInnerRequest(requestObject);
// 10.
if (!requestObject.headers.has("Accept")) {
request.headerList.push(["Accept", "*/*"]);
}
const p = new Promise((resolve, reject) => {
const prefix = "Failed to call 'fetch'";
webidl.requiredArguments(arguments.length, 1, { prefix });
input = webidl.converters["RequestInfo"](input, {
prefix,
context: "Argument 1",
});
init = webidl.converters["RequestInit"](init, {
prefix,
context: "Argument 2",
});
// 12.
const response = await mainFetch(request, false);
if (response.type === "error") {
throw new TypeError(
"Fetch failed: " + (response.error ?? "unknown error"),
);
}
// 2.
const requestObject = new Request(input, init);
// 3.
const request = toInnerRequest(requestObject);
// 4.
if (requestObject.signal.aborted) {
reject(abortFetch(request, null));
return;
}
return fromInnerResponse(response, "immutable");
// 7.
let responseObject = null;
// 9.
let locallyAborted = false;
// 10.
function onabort() {
locallyAborted = true;
reject(abortFetch(request, responseObject));
}
requestObject.signal[abortSignal.add](onabort);
if (!requestObject.headers.has("Accept")) {
request.headerList.push(["Accept", "*/*"]);
}
// 12.
mainFetch(request, false, requestObject.signal).then((response) => {
// 12.1.
if (locallyAborted) return;
// 12.2.
if (response.aborted) {
reject(request, responseObject);
requestObject.signal[abortSignal.remove](onabort);
return;
}
// 12.3.
if (response.type === "error") {
const err = new TypeError(
"Fetch failed: " + (response.error ?? "unknown error"),
);
reject(err);
requestObject.signal[abortSignal.remove](onabort);
return;
}
responseObject = fromInnerResponse(response, "immutable");
resolve(responseObject);
requestObject.signal[abortSignal.remove](onabort);
}).catch((err) => {
reject(err);
requestObject.signal[abortSignal.remove](onabort);
});
});
return p;
}
function abortFetch(request, responseObject) {
const error = new DOMException("Ongoing fetch was aborted.", "AbortError");
if (request.body !== null) request.body.cancel(error);
if (responseObject !== null) {
const response = toInnerResponse(responseObject);
if (response.body !== null) response.body.error(error);
}
return error;
}
window.__bootstrap.fetch ??= {};

View file

@ -82,6 +82,7 @@ declare namespace globalThis {
function toInnerRequest(request: Request): InnerRequest;
function fromInnerRequest(
inner: InnerRequest,
signal: AbortSignal | null,
guard:
| "request"
| "immutable"

View file

@ -16,6 +16,7 @@ use deno_core::AsyncRefCell;
use deno_core::CancelFuture;
use deno_core::CancelHandle;
use deno_core::CancelTryFuture;
use deno_core::Canceled;
use deno_core::Extension;
use deno_core::OpState;
use deno_core::RcRef;
@ -131,6 +132,7 @@ pub struct FetchArgs {
pub struct FetchReturn {
request_rid: ResourceId,
request_body_rid: Option<ResourceId>,
cancel_handle_rid: Option<ResourceId>,
}
pub fn op_fetch<FP>(
@ -157,7 +159,7 @@ where
// Check scheme before asking for net permission
let scheme = url.scheme();
let (request_rid, request_body_rid) = match scheme {
let (request_rid, request_body_rid, cancel_handle_rid) = match scheme {
"http" | "https" => {
let permissions = state.borrow_mut::<FP>();
permissions.check_net_url(&url)?;
@ -195,13 +197,19 @@ where
request = request.header(name, v);
}
let fut = request.send();
let cancel_handle = CancelHandle::new_rc();
let cancel_handle_ = cancel_handle.clone();
let fut = async move { request.send().or_cancel(cancel_handle_).await };
let request_rid = state
.resource_table
.add(FetchRequestResource(Box::pin(fut)));
(request_rid, request_body_rid)
let cancel_handle_rid =
state.resource_table.add(FetchCancelHandle(cancel_handle));
(request_rid, request_body_rid, Some(cancel_handle_rid))
}
"data" => {
let data_url = DataUrl::process(url.as_str())
@ -216,13 +224,13 @@ where
.header(http::header::CONTENT_TYPE, data_url.mime_type().to_string())
.body(reqwest::Body::from(body))?;
let fut = async move { Ok(Response::from(response)) };
let fut = async move { Ok(Ok(Response::from(response))) };
let request_rid = state
.resource_table
.add(FetchRequestResource(Box::pin(fut)));
(request_rid, None)
(request_rid, None, None)
}
"blob" => {
let blob_url_storage =
@ -244,13 +252,13 @@ where
.header(http::header::CONTENT_TYPE, blob.media_type)
.body(reqwest::Body::from(blob.data))?;
let fut = async move { Ok(Response::from(response)) };
let fut = async move { Ok(Ok(Response::from(response))) };
let request_rid = state
.resource_table
.add(FetchRequestResource(Box::pin(fut)));
(request_rid, None)
(request_rid, None, None)
}
_ => return Err(type_error(format!("scheme '{}' not supported", scheme))),
};
@ -258,6 +266,7 @@ where
Ok(FetchReturn {
request_rid,
request_body_rid,
cancel_handle_rid,
})
}
@ -287,8 +296,9 @@ pub async fn op_fetch_send(
.expect("multiple op_fetch_send ongoing");
let res = match request.0.await {
Ok(res) => res,
Err(e) => return Err(type_error(e.to_string())),
Ok(Ok(res)) => res,
Ok(Err(err)) => return Err(type_error(err.to_string())),
Err(_) => return Err(type_error("request was cancelled")),
};
//debug!("Fetch response {}", url);
@ -372,8 +382,11 @@ pub async fn op_fetch_response_read(
Ok(read)
}
type CancelableResponseResult =
Result<Result<Response, reqwest::Error>, Canceled>;
struct FetchRequestResource(
Pin<Box<dyn Future<Output = Result<Response, reqwest::Error>>>>,
Pin<Box<dyn Future<Output = CancelableResponseResult>>>,
);
impl Resource for FetchRequestResource {
@ -382,6 +395,18 @@ impl Resource for FetchRequestResource {
}
}
struct FetchCancelHandle(Rc<CancelHandle>);
impl Resource for FetchCancelHandle {
fn name(&self) -> Cow<str> {
"fetchCancelHandle".into()
}
fn close(self: Rc<Self>) {
self.0.cancel()
}
}
struct FetchRequestBodyResource {
body: AsyncRefCell<mpsc::Sender<std::io::Result<Vec<u8>>>>,
cancel: CancelHandle,
@ -391,6 +416,10 @@ impl Resource for FetchRequestBodyResource {
fn name(&self) -> Cow<str> {
"fetchRequestBody".into()
}
fn close(self: Rc<Self>) {
self.cancel.cancel()
}
}
type BytesStream =
@ -405,6 +434,10 @@ impl Resource for FetchResponseBodyResource {
fn name(&self) -> Cow<str> {
"fetchResponseBody".into()
}
fn close(self: Rc<Self>) {
self.cancel.cancel()
}
}
struct HttpClientResource {

View file

@ -118,11 +118,25 @@
AbortSignal,
);
function newSignal() {
return new AbortSignal(illegalConstructorKey);
}
function follow(followingSignal, parentSignal) {
if (parentSignal.aborted) {
followingSignal[signalAbort]();
} else {
parentSignal[add](() => followingSignal[signalAbort]());
}
}
window.AbortSignal = AbortSignal;
window.AbortController = AbortController;
window.__bootstrap.abortSignal = {
add,
signalAbort,
remove,
follow,
newSignal,
};
})(this);

View file

@ -72,7 +72,7 @@
headersList,
body !== null ? new InnerBody(body) : null,
);
const request = fromInnerRequest(innerRequest, "immutable");
const request = fromInnerRequest(innerRequest, null, "immutable");
const respondWith = createRespondWith(this, responseSenderRid);

View file

@ -1052,7 +1052,6 @@
"Request interface: attribute keepalive",
"Request interface: attribute isReloadNavigation",
"Request interface: attribute isHistoryNavigation",
"Request interface: attribute signal",
"Request interface: attribute body",
"Request interface: attribute bodyUsed",
"Request interface: new Request('about:blank') must inherit property \"destination\" with the proper type",
@ -1065,14 +1064,22 @@
"Request interface: new Request('about:blank') must inherit property \"keepalive\" with the proper type",
"Request interface: new Request('about:blank') must inherit property \"isReloadNavigation\" with the proper type",
"Request interface: new Request('about:blank') must inherit property \"isHistoryNavigation\" with the proper type",
"Request interface: new Request('about:blank') must inherit property \"signal\" with the proper type",
"Response interface: operation error()",
"Response interface: operation redirect(USVString, optional unsigned short)",
"Response interface: attribute body",
"Response interface: attribute bodyUsed",
"Response interface: calling redirect(USVString, optional unsigned short) on new Response() with too few arguments must throw TypeError",
"Window interface: operation fetch(RequestInfo, optional RequestInit)"
]
],
"abort": {
"general.any.html": [
"response.arrayBuffer() rejects if already aborted",
"response.blob() rejects if already aborted",
"response.formData() rejects if already aborted",
"response.json() rejects if already aborted",
"response.text() rejects if already aborted"
]
}
},
"data-urls": {
"base64.any.html": true,