mirror of
https://github.com/denoland/deno.git
synced 2024-12-24 08:09:08 -05:00
feat: support abort reasons in Deno APIs and WebSocketStream
(#13066)
This commit is contained in:
parent
9ffc7edc23
commit
01a6b66034
9 changed files with 241 additions and 63 deletions
49
cli/tests/testdata/websocketstream_test.ts
vendored
49
cli/tests/testdata/websocketstream_test.ts
vendored
|
@ -1,9 +1,11 @@
|
|||
// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
|
||||
|
||||
import {
|
||||
assert,
|
||||
assertEquals,
|
||||
assertRejects,
|
||||
assertThrows,
|
||||
unreachable,
|
||||
} from "../../../test_util/std/testing/asserts.ts";
|
||||
|
||||
Deno.test("fragment", () => {
|
||||
|
@ -89,8 +91,49 @@ Deno.test("aborting immediately throws an AbortError", async () => {
|
|||
controller.abort();
|
||||
await assertRejects(
|
||||
() => wss.connection,
|
||||
DOMException,
|
||||
"connection was aborted",
|
||||
(error: Error) => {
|
||||
assert(error instanceof DOMException);
|
||||
assertEquals(error.name, "AbortError");
|
||||
},
|
||||
);
|
||||
await assertRejects(
|
||||
() => wss.closed,
|
||||
(error: Error) => {
|
||||
assert(error instanceof DOMException);
|
||||
assertEquals(error.name, "AbortError");
|
||||
},
|
||||
);
|
||||
});
|
||||
|
||||
Deno.test("aborting immediately with a reason throws that reason", async () => {
|
||||
const controller = new AbortController();
|
||||
const wss = new WebSocketStream("ws://localhost:4242", {
|
||||
signal: controller.signal,
|
||||
});
|
||||
const abortReason = new Error();
|
||||
controller.abort(abortReason);
|
||||
await assertRejects(
|
||||
() => wss.connection,
|
||||
(error: Error) => assertEquals(error, abortReason),
|
||||
);
|
||||
await assertRejects(
|
||||
() => wss.closed,
|
||||
(error: Error) => assertEquals(error, abortReason),
|
||||
);
|
||||
});
|
||||
|
||||
Deno.test("aborting immediately with a primitive as reason throws that primitive", async () => {
|
||||
const controller = new AbortController();
|
||||
const wss = new WebSocketStream("ws://localhost:4242", {
|
||||
signal: controller.signal,
|
||||
});
|
||||
controller.abort("Some string");
|
||||
await wss.connection.then(
|
||||
() => unreachable(),
|
||||
(e) => assertEquals(e, "Some string"),
|
||||
);
|
||||
await wss.closed.then(
|
||||
() => unreachable(),
|
||||
(e) => assertEquals(e, "Some string"),
|
||||
);
|
||||
await assertRejects(() => wss.closed, DOMException, "connection was aborted");
|
||||
});
|
||||
|
|
|
@ -6,6 +6,7 @@ import {
|
|||
assertRejects,
|
||||
assertThrows,
|
||||
pathToAbsoluteFileUrl,
|
||||
unreachable,
|
||||
} from "./test_util.ts";
|
||||
|
||||
Deno.test({ permissions: { read: true } }, function readFileSyncSuccess() {
|
||||
|
@ -95,11 +96,52 @@ Deno.test(
|
|||
async function readFileWithAbortSignal() {
|
||||
const ac = new AbortController();
|
||||
queueMicrotask(() => ac.abort());
|
||||
await assertRejects(async () => {
|
||||
await assertRejects(
|
||||
async () => {
|
||||
await Deno.readFile("cli/tests/testdata/fixture.json", {
|
||||
signal: ac.signal,
|
||||
});
|
||||
},
|
||||
(error: Error) => {
|
||||
assert(error instanceof DOMException);
|
||||
assertEquals(error.name, "AbortError");
|
||||
},
|
||||
);
|
||||
},
|
||||
);
|
||||
|
||||
Deno.test(
|
||||
{ permissions: { read: true } },
|
||||
async function readFileWithAbortSignalReason() {
|
||||
const ac = new AbortController();
|
||||
const abortReason = new Error();
|
||||
queueMicrotask(() => ac.abort(abortReason));
|
||||
await assertRejects(
|
||||
async () => {
|
||||
await Deno.readFile("cli/tests/testdata/fixture.json", {
|
||||
signal: ac.signal,
|
||||
});
|
||||
},
|
||||
(error: Error) => {
|
||||
assertEquals(error, abortReason);
|
||||
},
|
||||
);
|
||||
},
|
||||
);
|
||||
|
||||
Deno.test(
|
||||
{ permissions: { read: true } },
|
||||
async function readFileWithAbortSignalPrimitiveReason() {
|
||||
const ac = new AbortController();
|
||||
queueMicrotask(() => ac.abort("Some string"));
|
||||
try {
|
||||
await Deno.readFile("cli/tests/testdata/fixture.json", {
|
||||
signal: ac.signal,
|
||||
});
|
||||
});
|
||||
unreachable();
|
||||
} catch (e) {
|
||||
assertEquals(e, "Some string");
|
||||
}
|
||||
},
|
||||
);
|
||||
|
||||
|
|
|
@ -4,6 +4,7 @@ import {
|
|||
assertRejects,
|
||||
assertThrows,
|
||||
pathToAbsoluteFileUrl,
|
||||
unreachable,
|
||||
} from "./test_util.ts";
|
||||
|
||||
Deno.test({ permissions: { read: true } }, function readTextFileSyncSuccess() {
|
||||
|
@ -88,11 +89,52 @@ Deno.test(
|
|||
async function readTextFileWithAbortSignal() {
|
||||
const ac = new AbortController();
|
||||
queueMicrotask(() => ac.abort());
|
||||
await assertRejects(async () => {
|
||||
await assertRejects(
|
||||
async () => {
|
||||
await Deno.readFile("cli/tests/testdata/fixture.json", {
|
||||
signal: ac.signal,
|
||||
});
|
||||
},
|
||||
(error: Error) => {
|
||||
assert(error instanceof DOMException);
|
||||
assertEquals(error.name, "AbortError");
|
||||
},
|
||||
);
|
||||
},
|
||||
);
|
||||
|
||||
Deno.test(
|
||||
{ permissions: { read: true } },
|
||||
async function readTextFileWithAbortSignalReason() {
|
||||
const ac = new AbortController();
|
||||
const abortReason = new Error();
|
||||
queueMicrotask(() => ac.abort(abortReason));
|
||||
await assertRejects(
|
||||
async () => {
|
||||
await Deno.readFile("cli/tests/testdata/fixture.json", {
|
||||
signal: ac.signal,
|
||||
});
|
||||
},
|
||||
(error: Error) => {
|
||||
assertEquals(error, abortReason);
|
||||
},
|
||||
);
|
||||
},
|
||||
);
|
||||
|
||||
Deno.test(
|
||||
{ permissions: { read: true } },
|
||||
async function readTextFileWithAbortSignalPrimitiveReason() {
|
||||
const ac = new AbortController();
|
||||
queueMicrotask(() => ac.abort("Some string"));
|
||||
try {
|
||||
await Deno.readFile("cli/tests/testdata/fixture.json", {
|
||||
signal: ac.signal,
|
||||
});
|
||||
});
|
||||
unreachable();
|
||||
} catch (e) {
|
||||
assertEquals(e, "Some string");
|
||||
}
|
||||
},
|
||||
);
|
||||
|
||||
|
|
|
@ -4,6 +4,7 @@ import {
|
|||
assertEquals,
|
||||
assertRejects,
|
||||
assertThrows,
|
||||
unreachable,
|
||||
} from "./test_util.ts";
|
||||
|
||||
Deno.test(
|
||||
|
@ -250,6 +251,7 @@ Deno.test(
|
|||
queueMicrotask(() => ac.abort());
|
||||
try {
|
||||
await Deno.writeFile(filename, data, { signal: ac.signal });
|
||||
unreachable();
|
||||
} catch (e) {
|
||||
assert(e instanceof Error);
|
||||
assertEquals(e.name, "AbortError");
|
||||
|
@ -259,6 +261,45 @@ Deno.test(
|
|||
},
|
||||
);
|
||||
|
||||
Deno.test(
|
||||
{ permissions: { read: true, write: true } },
|
||||
async function writeFileAbortSignalReason(): Promise<void> {
|
||||
const ac = new AbortController();
|
||||
const enc = new TextEncoder();
|
||||
const data = enc.encode("Hello");
|
||||
const filename = Deno.makeTempDirSync() + "/test.txt";
|
||||
const abortReason = new Error();
|
||||
queueMicrotask(() => ac.abort(abortReason));
|
||||
try {
|
||||
await Deno.writeFile(filename, data, { signal: ac.signal });
|
||||
unreachable();
|
||||
} catch (e) {
|
||||
assertEquals(e, abortReason);
|
||||
}
|
||||
const stat = Deno.statSync(filename);
|
||||
assertEquals(stat.size, 0);
|
||||
},
|
||||
);
|
||||
|
||||
Deno.test(
|
||||
{ permissions: { read: true, write: true } },
|
||||
async function writeFileAbortSignalPrimitiveReason(): Promise<void> {
|
||||
const ac = new AbortController();
|
||||
const enc = new TextEncoder();
|
||||
const data = enc.encode("Hello");
|
||||
const filename = Deno.makeTempDirSync() + "/test.txt";
|
||||
queueMicrotask(() => ac.abort("Some string"));
|
||||
try {
|
||||
await Deno.writeFile(filename, data, { signal: ac.signal });
|
||||
unreachable();
|
||||
} catch (e) {
|
||||
assertEquals(e, "Some string");
|
||||
}
|
||||
const stat = Deno.statSync(filename);
|
||||
assertEquals(stat.size, 0);
|
||||
},
|
||||
);
|
||||
|
||||
Deno.test(
|
||||
{ permissions: { read: true, write: true } },
|
||||
async function writeFileAbortSignalPreAborted(): Promise<void> {
|
||||
|
@ -269,6 +310,7 @@ Deno.test(
|
|||
const filename = Deno.makeTempDirSync() + "/test.txt";
|
||||
try {
|
||||
await Deno.writeFile(filename, data, { signal: ac.signal });
|
||||
unreachable();
|
||||
} catch (e) {
|
||||
assert(e instanceof Error);
|
||||
assertEquals(e.name, "AbortError");
|
||||
|
@ -277,3 +319,44 @@ Deno.test(
|
|||
assertEquals(stat.size, 0);
|
||||
},
|
||||
);
|
||||
|
||||
Deno.test(
|
||||
{ permissions: { read: true, write: true } },
|
||||
async function writeFileAbortSignalReasonPreAborted(): Promise<void> {
|
||||
const ac = new AbortController();
|
||||
const abortReason = new Error();
|
||||
ac.abort(abortReason);
|
||||
const enc = new TextEncoder();
|
||||
const data = enc.encode("Hello");
|
||||
const filename = Deno.makeTempDirSync() + "/test.txt";
|
||||
try {
|
||||
await Deno.writeFile(filename, data, { signal: ac.signal });
|
||||
unreachable();
|
||||
} catch (e) {
|
||||
assertEquals(e, abortReason);
|
||||
}
|
||||
const stat = Deno.statSync(filename);
|
||||
assertEquals(stat.size, 0);
|
||||
},
|
||||
);
|
||||
|
||||
Deno.test(
|
||||
{ permissions: { read: true, write: true } },
|
||||
async function writeFileAbortSignalPrimitiveReasonPreAborted(): Promise<
|
||||
void
|
||||
> {
|
||||
const ac = new AbortController();
|
||||
ac.abort("Some string");
|
||||
const enc = new TextEncoder();
|
||||
const data = enc.encode("Hello");
|
||||
const filename = Deno.makeTempDirSync() + "/test.txt";
|
||||
try {
|
||||
await Deno.writeFile(filename, data, { signal: ac.signal });
|
||||
unreachable();
|
||||
} catch (e) {
|
||||
assertEquals(e, "Some string");
|
||||
}
|
||||
const stat = Deno.statSync(filename);
|
||||
assertEquals(stat.size, 0);
|
||||
},
|
||||
);
|
||||
|
|
|
@ -125,10 +125,7 @@
|
|||
|
||||
if (options.signal?.aborted) {
|
||||
core.close(cancelRid);
|
||||
const err = new DOMException(
|
||||
"This operation was aborted",
|
||||
"AbortError",
|
||||
);
|
||||
const err = options.signal.reason;
|
||||
this[_connection].reject(err);
|
||||
this[_closed].reject(err);
|
||||
} else {
|
||||
|
@ -313,7 +310,12 @@
|
|||
}
|
||||
},
|
||||
(err) => {
|
||||
core.tryClose(cancelRid);
|
||||
if (err instanceof core.Interrupted) {
|
||||
// The signal was aborted.
|
||||
err = options.signal.reason;
|
||||
} else {
|
||||
core.tryClose(cancelRid);
|
||||
}
|
||||
this[_connection].reject(err);
|
||||
this[_closed].reject(err);
|
||||
},
|
||||
|
|
|
@ -298,10 +298,7 @@ where
|
|||
let client = client_async(request, socket);
|
||||
let (stream, response): (WsStream, Response) =
|
||||
if let Some(cancel_resource) = cancel_resource {
|
||||
client
|
||||
.or_cancel(cancel_resource.0.to_owned())
|
||||
.await
|
||||
.map_err(|_| DomExceptionAbortError::new("connection was aborted"))?
|
||||
client.or_cancel(cancel_resource.0.to_owned()).await?
|
||||
} else {
|
||||
client.await
|
||||
}
|
||||
|
@ -508,29 +505,3 @@ pub fn get_network_error_class_name(e: &AnyError) -> Option<&'static str> {
|
|||
e.downcast_ref::<DomExceptionNetworkError>()
|
||||
.map(|_| "DOMExceptionNetworkError")
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct DomExceptionAbortError {
|
||||
pub msg: String,
|
||||
}
|
||||
|
||||
impl DomExceptionAbortError {
|
||||
pub fn new(msg: &str) -> Self {
|
||||
DomExceptionAbortError {
|
||||
msg: msg.to_string(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for DomExceptionAbortError {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
f.pad(&self.msg)
|
||||
}
|
||||
}
|
||||
|
||||
impl std::error::Error for DomExceptionAbortError {}
|
||||
|
||||
pub fn get_abort_error_class_name(e: &AnyError) -> Option<&'static str> {
|
||||
e.downcast_ref::<DomExceptionAbortError>()
|
||||
.map(|_| "DOMExceptionAbortError")
|
||||
}
|
||||
|
|
|
@ -158,7 +158,6 @@ pub fn get_error_class_name(e: &AnyError) -> Option<&'static str> {
|
|||
.or_else(|| deno_web::get_error_class_name(e))
|
||||
.or_else(|| deno_webstorage::get_not_supported_error_class_name(e))
|
||||
.or_else(|| deno_websocket::get_network_error_class_name(e))
|
||||
.or_else(|| deno_websocket::get_abort_error_class_name(e))
|
||||
.or_else(|| {
|
||||
e.downcast_ref::<dlopen::Error>()
|
||||
.map(get_dlopen_error_class)
|
||||
|
|
|
@ -7,7 +7,6 @@
|
|||
|
||||
((window) => {
|
||||
const core = window.Deno.core;
|
||||
const { DOMException } = window.__bootstrap.domException;
|
||||
const {
|
||||
Uint8Array,
|
||||
ArrayPrototypePush,
|
||||
|
@ -123,7 +122,8 @@
|
|||
async function readAllInner(r, options) {
|
||||
const buffers = [];
|
||||
const signal = options?.signal ?? null;
|
||||
while (!signal?.aborted) {
|
||||
while (true) {
|
||||
signal?.throwIfAborted();
|
||||
const buf = new Uint8Array(READ_PER_ITER);
|
||||
const read = await r.read(buf);
|
||||
if (typeof read == "number") {
|
||||
|
@ -132,9 +132,7 @@
|
|||
break;
|
||||
}
|
||||
}
|
||||
if (signal?.aborted) {
|
||||
throw new DOMException("The read operation was aborted.", "AbortError");
|
||||
}
|
||||
signal?.throwIfAborted();
|
||||
|
||||
return concatBuffers(buffers);
|
||||
}
|
||||
|
@ -200,7 +198,8 @@
|
|||
const buf = new Uint8Array(size + 1); // 1B to detect extended files
|
||||
let cursor = 0;
|
||||
const signal = options?.signal ?? null;
|
||||
while (!signal?.aborted && cursor < size) {
|
||||
while (cursor < size) {
|
||||
signal?.throwIfAborted();
|
||||
const sliceEnd = MathMin(size + 1, cursor + READ_PER_ITER);
|
||||
const slice = buf.subarray(cursor, sliceEnd);
|
||||
const read = await r.read(slice);
|
||||
|
@ -210,9 +209,7 @@
|
|||
break;
|
||||
}
|
||||
}
|
||||
if (signal?.aborted) {
|
||||
throw new DOMException("The read operation was aborted.", "AbortError");
|
||||
}
|
||||
signal?.throwIfAborted();
|
||||
|
||||
// Handle truncated or extended files during read
|
||||
if (cursor > size) {
|
||||
|
|
|
@ -13,9 +13,7 @@
|
|||
data,
|
||||
options = {},
|
||||
) {
|
||||
if (options?.signal?.aborted) {
|
||||
throw new DOMException("The write operation was aborted.", "AbortError");
|
||||
}
|
||||
options.signal?.throwIfAborted();
|
||||
if (options.create !== undefined) {
|
||||
const create = !!options.create;
|
||||
if (!create) {
|
||||
|
@ -73,14 +71,15 @@
|
|||
|
||||
const signal = options?.signal ?? null;
|
||||
let nwritten = 0;
|
||||
while (!signal?.aborted && nwritten < data.length) {
|
||||
nwritten += await file.write(TypedArrayPrototypeSubarray(data, nwritten));
|
||||
}
|
||||
|
||||
file.close();
|
||||
|
||||
if (signal?.aborted) {
|
||||
throw new DOMException("The write operation was aborted.", "AbortError");
|
||||
try {
|
||||
while (nwritten < data.length) {
|
||||
signal?.throwIfAborted();
|
||||
nwritten += await file.write(
|
||||
TypedArrayPrototypeSubarray(data, nwritten),
|
||||
);
|
||||
}
|
||||
} finally {
|
||||
file.close();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue