1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2025-01-11 08:33:43 -05:00

fix(ext/web): better handling of errors in resourceForReadableStream (#20238)

Improves error handling when the Resource is closed in various phases of
the ReadableStream. Ensure that we send a consistent `cancel` reason.
This commit is contained in:
Matt Mastracci 2023-08-22 16:16:34 -06:00 committed by GitHub
parent c37b9655b6
commit 9b01307704
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 69 additions and 26 deletions

View file

@ -142,7 +142,7 @@ Deno.test(async function readableStreamClose() {
const nread = await core.ops.op_read(rid, buffer);
assertEquals(nread, 12);
core.ops.op_close(rid);
assertEquals(await cancel, undefined);
assertEquals(await cancel, "resource closed");
});
// Close the stream without reading everything
@ -153,7 +153,7 @@ Deno.test(async function readableStreamClosePartialRead() {
const nread = await core.ops.op_read(rid, buffer);
assertEquals(nread, 5);
core.ops.op_close(rid);
assertEquals(await cancel, undefined);
assertEquals(await cancel, "resource closed");
});
// Close the stream without reading anything
@ -161,7 +161,7 @@ Deno.test(async function readableStreamCloseWithoutRead() {
const cancel = deferred();
const rid = resourceForReadableStream(helloWorldStream(false, cancel));
core.ops.op_close(rid);
assertEquals(await cancel, undefined);
assertEquals(await cancel, "resource closed");
});
Deno.test(async function readableStreamPartial() {
@ -205,7 +205,13 @@ for (
) {
Deno.test(`readableStreamError_${type}`, async function () {
const rid = resourceForReadableStream(errorStream(type));
assertEquals(12, await core.ops.op_read(rid, new Uint8Array(16)));
let nread;
try {
nread = await core.ops.op_read(rid, new Uint8Array(16));
} catch (_) {
fail("Should not have thrown");
}
assertEquals(12, nread);
try {
await core.ops.op_read(rid, new Uint8Array(1));
fail();
@ -297,3 +303,32 @@ function createStreamTest(
}
});
}
Deno.test(async function readableStreamWithAggressiveResourceClose() {
let first = true;
const reasonPromise = deferred();
const rid = resourceForReadableStream(
new ReadableStream({
pull(controller) {
if (first) {
// We queue this up and then immediately close the resource (not the reader)
controller.enqueue(new Uint8Array(1));
core.close(rid);
// This doesn't throw, even though the resource is closed
controller.enqueue(new Uint8Array(1));
first = false;
}
},
cancel(reason) {
reasonPromise.resolve(reason);
},
}),
);
try {
await core.ops.op_read(rid, new Uint8Array(1));
fail();
} catch (e) {
assertEquals(e.message, "operation canceled");
}
assertEquals(await reasonPromise, "resource closed");
});

View file

@ -724,7 +724,7 @@ function resourceForReadableStream(stream) {
PromisePrototypeCatch(
PromisePrototypeThen(
op_readable_stream_resource_await_close(rid),
() => reader.cancel(),
() => reader.cancel("resource closed"),
),
() => {},
);
@ -745,17 +745,25 @@ function resourceForReadableStream(stream) {
break;
}
} catch (err) {
const message = err.message;
if (message) {
await op_readable_stream_resource_write_error(sink, err.message);
} else {
await op_readable_stream_resource_write_error(sink, String(err));
const message = err?.message;
const success = (message && (typeof message == "string"))
? await op_readable_stream_resource_write_error(sink, message)
: await op_readable_stream_resource_write_error(
sink,
String(err),
);
// We don't cancel the reader if there was an error reading. We'll let the downstream
// consumer close the resource after it receives the error.
if (!success) {
reader.cancel("resource closed");
}
break;
}
// If the chunk has non-zero length, write it
if (value.length > 0) {
await op_readable_stream_resource_write_buf(sink, value);
if (!await op_readable_stream_resource_write_buf(sink, value)) {
reader.cancel("resource closed");
}
}
}
} finally {

View file

@ -111,6 +111,10 @@ impl Resource for ReadableStreamResource {
fn read(self: Rc<Self>, limit: usize) -> AsyncResult<BufView> {
Box::pin(ReadableStreamResource::read(self, limit))
}
fn close(self: Rc<Self>) {
self.cancel_handle.cancel();
}
}
// TODO(mmastrac): Move this to deno_core
@ -155,10 +159,6 @@ impl Future for CompletionHandle {
}
}
fn sender_closed() -> Error {
type_error("sender closed")
}
/// Allocate a resource that wraps a ReadableStream.
#[op2(fast)]
#[smi]
@ -210,15 +210,13 @@ fn drop_sender(sender: *const c_void) {
pub fn op_readable_stream_resource_write_buf(
sender: *const c_void,
#[buffer] buffer: JsBuffer,
) -> impl Future<Output = Result<(), Error>> {
) -> impl Future<Output = bool> {
let sender = get_sender(sender);
async move {
let sender = sender.ok_or_else(sender_closed)?;
sender
.send(Ok(buffer.into()))
.await
.map_err(|_| sender_closed())?;
Ok(())
let Some(sender) = sender else {
return false;
};
sender.send(Ok(buffer.into())).await.ok().is_some()
}
}
@ -226,15 +224,17 @@ pub fn op_readable_stream_resource_write_buf(
pub fn op_readable_stream_resource_write_error(
sender: *const c_void,
#[string] error: String,
) -> impl Future<Output = Result<(), Error>> {
) -> impl Future<Output = bool> {
let sender = get_sender(sender);
async move {
let sender = sender.ok_or_else(sender_closed)?;
let Some(sender) = sender else {
return false;
};
sender
.send(Err(type_error(Cow::Owned(error))))
.await
.map_err(|_| sender_closed())?;
Ok(())
.ok()
.is_some()
}
}