From c059c12a1ec7da82e793e1ab0763e16d307e2895 Mon Sep 17 00:00:00 2001 From: Luca Casonato Date: Mon, 19 Dec 2022 12:49:00 +0100 Subject: [PATCH] fix(ext/fetch): handle errors in req body stream (#17081) Right now an error in a request body stream causes an uncatchable global promise rejection. This PR fixes this to instead propagate the error correctly into the promise returned from `fetch`. It additionally fixes errored readable stream bodies being treated as successfully completed bodies by Rust. --- cli/tests/unit/fetch_test.ts | 44 ++++++++++++++++++ ext/fetch/26_fetch.js | 73 ++++++++++++++++++++---------- ext/fetch/byte_stream.rs | 87 ++++++++++++++++++++++++++++++++++++ ext/fetch/lib.rs | 38 +++++++++++++--- 4 files changed, 212 insertions(+), 30 deletions(-) create mode 100644 ext/fetch/byte_stream.rs diff --git a/cli/tests/unit/fetch_test.ts b/cli/tests/unit/fetch_test.ts index fde119bf15..7035fe4445 100644 --- a/cli/tests/unit/fetch_test.ts +++ b/cli/tests/unit/fetch_test.ts @@ -4,6 +4,7 @@ import { assertEquals, assertRejects, deferred, + delay, fail, unimplemented, } from "./test_util.ts"; @@ -1828,3 +1829,46 @@ Deno.test( assertEquals(req2.headers.get("x-foo"), "bar"); }, ); + +Deno.test( + { permissions: { net: true } }, + async function fetchRequestBodyErrorCatchable() { + const listener = Deno.listen({ hostname: "127.0.0.1", port: 4514 }); + const server = (async () => { + const conn = await listener.accept(); + listener.close(); + const buf = new Uint8Array(160); + const n = await conn.read(buf); + assertEquals(n, 160); // this is the request headers + first body chunk + const n2 = await conn.read(buf); + assertEquals(n2, 6); // this is the second body chunk + const n3 = await conn.read(buf); + assertEquals(n3, null); // the connection now abruptly closes because the client has errored + conn.close(); + })(); + + const stream = new ReadableStream({ + async start(controller) { + controller.enqueue(new TextEncoder().encode("a")); + await delay(1000); + controller.enqueue(new TextEncoder().encode("b")); + await delay(1000); + controller.error(new Error("foo")); + }, + }); + + const err = await assertRejects(() => + fetch("http://localhost:4514", { + body: stream, + method: "POST", + }) + ); + + assert(err instanceof TypeError); + assert(err.cause); + assert(err.cause instanceof Error); + assertEquals(err.cause.message, "foo"); + + await server; + }, +); diff --git a/ext/fetch/26_fetch.js b/ext/fetch/26_fetch.js index e522079bf7..4a18e73f28 100644 --- a/ext/fetch/26_fetch.js +++ b/ext/fetch/26_fetch.js @@ -200,6 +200,8 @@ } terminator[abortSignal.add](onAbort); + let requestSendError; + let requestSendErrorSet = false; if (requestBodyRid !== null) { if ( reqBody === null || @@ -210,44 +212,69 @@ const reader = reqBody.getReader(); WeakMapPrototypeSet(requestBodyReaders, req, reader); (async () => { - while (true) { - const { value, done } = await PromisePrototypeCatch( - reader.read(), - (err) => { - if (terminator.aborted) return { done: true, value: undefined }; - throw err; - }, - ); + let done = false; + while (!done) { + let val; + try { + const res = await reader.read(); + done = res.done; + val = res.value; + } catch (err) { + if (terminator.aborted) break; + // TODO(lucacasonato): propagate error into response body stream + requestSendError = err; + requestSendErrorSet = true; + break; + } if (done) break; - if (!ObjectPrototypeIsPrototypeOf(Uint8ArrayPrototype, value)) { - await reader.cancel("value not a Uint8Array"); + if (!ObjectPrototypeIsPrototypeOf(Uint8ArrayPrototype, val)) { + const error = new TypeError( + "Item in request body ReadableStream is not a Uint8Array", + ); + await reader.cancel(error); + // TODO(lucacasonato): propagate error into response body stream + requestSendError = error; + requestSendErrorSet = true; break; } try { - await PromisePrototypeCatch( - core.writeAll(requestBodyRid, value), - (err) => { - if (terminator.aborted) return; - throw err; - }, - ); - if (terminator.aborted) break; + await core.writeAll(requestBodyRid, val); } catch (err) { + if (terminator.aborted) break; await reader.cancel(err); + // TODO(lucacasonato): propagate error into response body stream + requestSendError = err; + requestSendErrorSet = true; break; } } + if (done && !terminator.aborted) { + try { + await core.shutdown(requestBodyRid); + } catch (err) { + if (!terminator.aborted) { + requestSendError = err; + requestSendErrorSet = true; + } + } + } WeakMapPrototypeDelete(requestBodyReaders, req); core.tryClose(requestBodyRid); })(); } - let resp; try { - resp = await PromisePrototypeCatch(opFetchSend(requestRid), (err) => { - if (terminator.aborted) return; - throw err; - }); + resp = await opFetchSend(requestRid); + } catch (err) { + if (terminator.aborted) return; + if (requestSendErrorSet) { + // if the request body stream errored, we want to propagate that error + // instead of the original error from opFetchSend + throw new TypeError("Failed to fetch: request body stream errored", { + cause: requestSendError, + }); + } + throw err; } finally { if (cancelHandleRid !== null) { core.tryClose(cancelHandleRid); diff --git a/ext/fetch/byte_stream.rs b/ext/fetch/byte_stream.rs new file mode 100644 index 0000000000..66e29e5a07 --- /dev/null +++ b/ext/fetch/byte_stream.rs @@ -0,0 +1,87 @@ +// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license. + +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; + +use deno_core::futures::Stream; +use tokio::sync::mpsc; + +/// [MpscByteStream] is a stream of bytes that is backed by a mpsc channel. It is +/// used to bridge between the fetch task and the HTTP body stream. The stream +/// has the special property that it errors if the channel is closed before an +/// explicit EOF is sent (in the form of a [None] value on the sender). +pub struct MpscByteStream { + receiver: mpsc::Receiver>, + shutdown: bool, +} + +impl MpscByteStream { + pub fn new() -> (Self, mpsc::Sender>) { + let (sender, receiver) = mpsc::channel::>(1); + let this = Self { + receiver, + shutdown: false, + }; + (this, sender) + } +} + +impl Stream for MpscByteStream { + type Item = Result; + + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + let val = std::task::ready!(self.receiver.poll_recv(cx)); + match val { + None if self.shutdown => Poll::Ready(None), + None => Poll::Ready(Some(Err(std::io::Error::new( + std::io::ErrorKind::UnexpectedEof, + "channel closed", + )))), + Some(None) => { + self.shutdown = true; + Poll::Ready(None) + } + Some(Some(val)) => Poll::Ready(Some(Ok(val))), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use bytes::Bytes; + use deno_core::futures::StreamExt; + + #[tokio::test] + async fn success() { + let (mut stream, sender) = MpscByteStream::new(); + + sender.send(Some(Bytes::from("hello"))).await.unwrap(); + assert_eq!(stream.next().await.unwrap().unwrap(), Bytes::from("hello")); + + sender.send(Some(Bytes::from("world"))).await.unwrap(); + assert_eq!(stream.next().await.unwrap().unwrap(), Bytes::from("world")); + + sender.send(None).await.unwrap(); + drop(sender); + assert!(stream.next().await.is_none()); + } + + #[tokio::test] + async fn error() { + let (mut stream, sender) = MpscByteStream::new(); + + sender.send(Some(Bytes::from("hello"))).await.unwrap(); + assert_eq!(stream.next().await.unwrap().unwrap(), Bytes::from("hello")); + + drop(sender); + assert_eq!( + stream.next().await.unwrap().unwrap_err().kind(), + std::io::ErrorKind::UnexpectedEof + ); + } +} diff --git a/ext/fetch/lib.rs b/ext/fetch/lib.rs index f79fc33f4e..c19336e7de 100644 --- a/ext/fetch/lib.rs +++ b/ext/fetch/lib.rs @@ -1,5 +1,6 @@ // Copyright 2018-2022 the Deno authors. All rights reserved. MIT license. +mod byte_stream; mod fs_fetch_handler; use data_url::DataUrl; @@ -55,7 +56,6 @@ use std::path::PathBuf; use std::pin::Pin; use std::rc::Rc; use tokio::sync::mpsc; -use tokio_stream::wrappers::ReceiverStream; // Re-export reqwest and data_url pub use data_url; @@ -63,6 +63,8 @@ pub use reqwest; pub use fs_fetch_handler::FsFetchHandler; +use crate::byte_stream::MpscByteStream; + #[derive(Clone)] pub struct Options { pub user_agent: String, @@ -256,7 +258,7 @@ where match data { None => { // If no body is passed, we return a writer for streaming the body. - let (tx, rx) = mpsc::channel::>(1); + let (stream, tx) = MpscByteStream::new(); // If the size of the body is known, we include a content-length // header explicitly. @@ -265,7 +267,7 @@ where request.header(CONTENT_LENGTH, HeaderValue::from(body_size)) } - request = request.body(Body::wrap_stream(ReceiverStream::new(rx))); + request = request.body(Body::wrap_stream(stream)); let request_body_rid = state.resource_table.add(FetchRequestBodyResource { @@ -459,7 +461,7 @@ impl Resource for FetchCancelHandle { } pub struct FetchRequestBodyResource { - body: AsyncRefCell>>, + body: AsyncRefCell>>, cancel: CancelHandle, } @@ -474,13 +476,35 @@ impl Resource for FetchRequestBodyResource { let nwritten = bytes.len(); let body = RcRef::map(&self, |r| &r.body).borrow_mut().await; let cancel = RcRef::map(self, |r| &r.cancel); - body.send(Ok(bytes)).or_cancel(cancel).await?.map_err(|_| { - type_error("request body receiver not connected (request closed)") - })?; + body + .send(Some(bytes)) + .or_cancel(cancel) + .await? + .map_err(|_| { + type_error("request body receiver not connected (request closed)") + })?; Ok(WriteOutcome::Full { nwritten }) }) } + fn shutdown(self: Rc) -> AsyncResult<()> { + Box::pin(async move { + let body = RcRef::map(&self, |r| &r.body).borrow_mut().await; + let cancel = RcRef::map(self, |r| &r.cancel); + // There is a case where hyper knows the size of the response body up + // front (through content-length header on the resp), where it will drop + // the body once that content length has been reached, regardless of if + // the stream is complete or not. This is expected behaviour, but it means + // that if you stream a body with an up front known size (eg a Blob), + // explicit shutdown can never succeed because the body (and by extension + // the receiver) will have dropped by the time we try to shutdown. As such + // we ignore if the receiver is closed, because we know that the request + // is complete in good health in that case. + body.send(None).or_cancel(cancel).await?.ok(); + Ok(()) + }) + } + fn close(self: Rc) { self.cancel.cancel() }