mirror of
https://github.com/denoland/deno.git
synced 2024-12-24 08:09:08 -05:00
fix(ext/web): better handling of errors in resourceForReadableStream (#20238)
Improves error handling when the Resource is closed in various phases of the ReadableStream. Ensure that we send a consistent `cancel` reason.
This commit is contained in:
parent
3e9fb8aafd
commit
f1dc054d52
3 changed files with 69 additions and 26 deletions
|
@ -142,7 +142,7 @@ Deno.test(async function readableStreamClose() {
|
|||
const nread = await core.ops.op_read(rid, buffer);
|
||||
assertEquals(nread, 12);
|
||||
core.ops.op_close(rid);
|
||||
assertEquals(await cancel, undefined);
|
||||
assertEquals(await cancel, "resource closed");
|
||||
});
|
||||
|
||||
// Close the stream without reading everything
|
||||
|
@ -153,7 +153,7 @@ Deno.test(async function readableStreamClosePartialRead() {
|
|||
const nread = await core.ops.op_read(rid, buffer);
|
||||
assertEquals(nread, 5);
|
||||
core.ops.op_close(rid);
|
||||
assertEquals(await cancel, undefined);
|
||||
assertEquals(await cancel, "resource closed");
|
||||
});
|
||||
|
||||
// Close the stream without reading anything
|
||||
|
@ -161,7 +161,7 @@ Deno.test(async function readableStreamCloseWithoutRead() {
|
|||
const cancel = deferred();
|
||||
const rid = resourceForReadableStream(helloWorldStream(false, cancel));
|
||||
core.ops.op_close(rid);
|
||||
assertEquals(await cancel, undefined);
|
||||
assertEquals(await cancel, "resource closed");
|
||||
});
|
||||
|
||||
Deno.test(async function readableStreamPartial() {
|
||||
|
@ -205,7 +205,13 @@ for (
|
|||
) {
|
||||
Deno.test(`readableStreamError_${type}`, async function () {
|
||||
const rid = resourceForReadableStream(errorStream(type));
|
||||
assertEquals(12, await core.ops.op_read(rid, new Uint8Array(16)));
|
||||
let nread;
|
||||
try {
|
||||
nread = await core.ops.op_read(rid, new Uint8Array(16));
|
||||
} catch (_) {
|
||||
fail("Should not have thrown");
|
||||
}
|
||||
assertEquals(12, nread);
|
||||
try {
|
||||
await core.ops.op_read(rid, new Uint8Array(1));
|
||||
fail();
|
||||
|
@ -297,3 +303,32 @@ function createStreamTest(
|
|||
}
|
||||
});
|
||||
}
|
||||
|
||||
Deno.test(async function readableStreamWithAggressiveResourceClose() {
|
||||
let first = true;
|
||||
const reasonPromise = deferred();
|
||||
const rid = resourceForReadableStream(
|
||||
new ReadableStream({
|
||||
pull(controller) {
|
||||
if (first) {
|
||||
// We queue this up and then immediately close the resource (not the reader)
|
||||
controller.enqueue(new Uint8Array(1));
|
||||
core.close(rid);
|
||||
// This doesn't throw, even though the resource is closed
|
||||
controller.enqueue(new Uint8Array(1));
|
||||
first = false;
|
||||
}
|
||||
},
|
||||
cancel(reason) {
|
||||
reasonPromise.resolve(reason);
|
||||
},
|
||||
}),
|
||||
);
|
||||
try {
|
||||
await core.ops.op_read(rid, new Uint8Array(1));
|
||||
fail();
|
||||
} catch (e) {
|
||||
assertEquals(e.message, "operation canceled");
|
||||
}
|
||||
assertEquals(await reasonPromise, "resource closed");
|
||||
});
|
||||
|
|
|
@ -724,7 +724,7 @@ function resourceForReadableStream(stream) {
|
|||
PromisePrototypeCatch(
|
||||
PromisePrototypeThen(
|
||||
op_readable_stream_resource_await_close(rid),
|
||||
() => reader.cancel(),
|
||||
() => reader.cancel("resource closed"),
|
||||
),
|
||||
() => {},
|
||||
);
|
||||
|
@ -745,17 +745,25 @@ function resourceForReadableStream(stream) {
|
|||
break;
|
||||
}
|
||||
} catch (err) {
|
||||
const message = err.message;
|
||||
if (message) {
|
||||
await op_readable_stream_resource_write_error(sink, err.message);
|
||||
} else {
|
||||
await op_readable_stream_resource_write_error(sink, String(err));
|
||||
const message = err?.message;
|
||||
const success = (message && (typeof message == "string"))
|
||||
? await op_readable_stream_resource_write_error(sink, message)
|
||||
: await op_readable_stream_resource_write_error(
|
||||
sink,
|
||||
String(err),
|
||||
);
|
||||
// We don't cancel the reader if there was an error reading. We'll let the downstream
|
||||
// consumer close the resource after it receives the error.
|
||||
if (!success) {
|
||||
reader.cancel("resource closed");
|
||||
}
|
||||
break;
|
||||
}
|
||||
// If the chunk has non-zero length, write it
|
||||
if (value.length > 0) {
|
||||
await op_readable_stream_resource_write_buf(sink, value);
|
||||
if (!await op_readable_stream_resource_write_buf(sink, value)) {
|
||||
reader.cancel("resource closed");
|
||||
}
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
|
|
|
@ -111,6 +111,10 @@ impl Resource for ReadableStreamResource {
|
|||
fn read(self: Rc<Self>, limit: usize) -> AsyncResult<BufView> {
|
||||
Box::pin(ReadableStreamResource::read(self, limit))
|
||||
}
|
||||
|
||||
fn close(self: Rc<Self>) {
|
||||
self.cancel_handle.cancel();
|
||||
}
|
||||
}
|
||||
|
||||
// TODO(mmastrac): Move this to deno_core
|
||||
|
@ -155,10 +159,6 @@ impl Future for CompletionHandle {
|
|||
}
|
||||
}
|
||||
|
||||
fn sender_closed() -> Error {
|
||||
type_error("sender closed")
|
||||
}
|
||||
|
||||
/// Allocate a resource that wraps a ReadableStream.
|
||||
#[op2(fast)]
|
||||
#[smi]
|
||||
|
@ -210,15 +210,13 @@ fn drop_sender(sender: *const c_void) {
|
|||
pub fn op_readable_stream_resource_write_buf(
|
||||
sender: *const c_void,
|
||||
#[buffer] buffer: JsBuffer,
|
||||
) -> impl Future<Output = Result<(), Error>> {
|
||||
) -> impl Future<Output = bool> {
|
||||
let sender = get_sender(sender);
|
||||
async move {
|
||||
let sender = sender.ok_or_else(sender_closed)?;
|
||||
sender
|
||||
.send(Ok(buffer.into()))
|
||||
.await
|
||||
.map_err(|_| sender_closed())?;
|
||||
Ok(())
|
||||
let Some(sender) = sender else {
|
||||
return false;
|
||||
};
|
||||
sender.send(Ok(buffer.into())).await.ok().is_some()
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -226,15 +224,17 @@ pub fn op_readable_stream_resource_write_buf(
|
|||
pub fn op_readable_stream_resource_write_error(
|
||||
sender: *const c_void,
|
||||
#[string] error: String,
|
||||
) -> impl Future<Output = Result<(), Error>> {
|
||||
) -> impl Future<Output = bool> {
|
||||
let sender = get_sender(sender);
|
||||
async move {
|
||||
let sender = sender.ok_or_else(sender_closed)?;
|
||||
let Some(sender) = sender else {
|
||||
return false;
|
||||
};
|
||||
sender
|
||||
.send(Err(type_error(Cow::Owned(error))))
|
||||
.await
|
||||
.map_err(|_| sender_closed())?;
|
||||
Ok(())
|
||||
.ok()
|
||||
.is_some()
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue