1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2024-11-25 15:29:32 -05:00

fix(ext/fetch): handle errors in req body stream (#17081)

Right now an error in a request body stream causes an uncatchable
global promise rejection. This PR fixes this to instead propagate the
error correctly into the promise returned from `fetch`.

It additionally fixes errored readable stream bodies being treated as
successfully completed bodies by Rust.
This commit is contained in:
Luca Casonato 2022-12-19 12:49:00 +01:00 committed by GitHub
parent 84b70dd2fb
commit 43b6390629
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 212 additions and 30 deletions

View file

@ -4,6 +4,7 @@ import {
assertEquals, assertEquals,
assertRejects, assertRejects,
deferred, deferred,
delay,
fail, fail,
unimplemented, unimplemented,
} from "./test_util.ts"; } from "./test_util.ts";
@ -1828,3 +1829,46 @@ Deno.test(
assertEquals(req2.headers.get("x-foo"), "bar"); assertEquals(req2.headers.get("x-foo"), "bar");
}, },
); );
Deno.test(
{ permissions: { net: true } },
async function fetchRequestBodyErrorCatchable() {
const listener = Deno.listen({ hostname: "127.0.0.1", port: 4514 });
const server = (async () => {
const conn = await listener.accept();
listener.close();
const buf = new Uint8Array(160);
const n = await conn.read(buf);
assertEquals(n, 160); // this is the request headers + first body chunk
const n2 = await conn.read(buf);
assertEquals(n2, 6); // this is the second body chunk
const n3 = await conn.read(buf);
assertEquals(n3, null); // the connection now abruptly closes because the client has errored
conn.close();
})();
const stream = new ReadableStream({
async start(controller) {
controller.enqueue(new TextEncoder().encode("a"));
await delay(1000);
controller.enqueue(new TextEncoder().encode("b"));
await delay(1000);
controller.error(new Error("foo"));
},
});
const err = await assertRejects(() =>
fetch("http://localhost:4514", {
body: stream,
method: "POST",
})
);
assert(err instanceof TypeError);
assert(err.cause);
assert(err.cause instanceof Error);
assertEquals(err.cause.message, "foo");
await server;
},
);

View file

@ -200,6 +200,8 @@
} }
terminator[abortSignal.add](onAbort); terminator[abortSignal.add](onAbort);
let requestSendError;
let requestSendErrorSet = false;
if (requestBodyRid !== null) { if (requestBodyRid !== null) {
if ( if (
reqBody === null || reqBody === null ||
@ -210,44 +212,69 @@
const reader = reqBody.getReader(); const reader = reqBody.getReader();
WeakMapPrototypeSet(requestBodyReaders, req, reader); WeakMapPrototypeSet(requestBodyReaders, req, reader);
(async () => { (async () => {
while (true) { let done = false;
const { value, done } = await PromisePrototypeCatch( while (!done) {
reader.read(), let val;
(err) => { try {
if (terminator.aborted) return { done: true, value: undefined }; const res = await reader.read();
throw err; done = res.done;
}, val = res.value;
); } catch (err) {
if (terminator.aborted) break;
// TODO(lucacasonato): propagate error into response body stream
requestSendError = err;
requestSendErrorSet = true;
break;
}
if (done) break; if (done) break;
if (!ObjectPrototypeIsPrototypeOf(Uint8ArrayPrototype, value)) { if (!ObjectPrototypeIsPrototypeOf(Uint8ArrayPrototype, val)) {
await reader.cancel("value not a Uint8Array"); const error = new TypeError(
"Item in request body ReadableStream is not a Uint8Array",
);
await reader.cancel(error);
// TODO(lucacasonato): propagate error into response body stream
requestSendError = error;
requestSendErrorSet = true;
break; break;
} }
try { try {
await PromisePrototypeCatch( await core.writeAll(requestBodyRid, val);
core.writeAll(requestBodyRid, value),
(err) => {
if (terminator.aborted) return;
throw err;
},
);
if (terminator.aborted) break;
} catch (err) { } catch (err) {
if (terminator.aborted) break;
await reader.cancel(err); await reader.cancel(err);
// TODO(lucacasonato): propagate error into response body stream
requestSendError = err;
requestSendErrorSet = true;
break; break;
} }
} }
if (done && !terminator.aborted) {
try {
await core.shutdown(requestBodyRid);
} catch (err) {
if (!terminator.aborted) {
requestSendError = err;
requestSendErrorSet = true;
}
}
}
WeakMapPrototypeDelete(requestBodyReaders, req); WeakMapPrototypeDelete(requestBodyReaders, req);
core.tryClose(requestBodyRid); core.tryClose(requestBodyRid);
})(); })();
} }
let resp; let resp;
try { try {
resp = await PromisePrototypeCatch(opFetchSend(requestRid), (err) => { resp = await opFetchSend(requestRid);
} catch (err) {
if (terminator.aborted) return; if (terminator.aborted) return;
throw err; if (requestSendErrorSet) {
// if the request body stream errored, we want to propagate that error
// instead of the original error from opFetchSend
throw new TypeError("Failed to fetch: request body stream errored", {
cause: requestSendError,
}); });
}
throw err;
} finally { } finally {
if (cancelHandleRid !== null) { if (cancelHandleRid !== null) {
core.tryClose(cancelHandleRid); core.tryClose(cancelHandleRid);

87
ext/fetch/byte_stream.rs Normal file
View file

@ -0,0 +1,87 @@
// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license.
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
use deno_core::futures::Stream;
use tokio::sync::mpsc;
/// [MpscByteStream] is a stream of bytes that is backed by a mpsc channel. It is
/// used to bridge between the fetch task and the HTTP body stream. The stream
/// has the special property that it errors if the channel is closed before an
/// explicit EOF is sent (in the form of a [None] value on the sender).
pub struct MpscByteStream {
receiver: mpsc::Receiver<Option<bytes::Bytes>>,
shutdown: bool,
}
impl MpscByteStream {
pub fn new() -> (Self, mpsc::Sender<Option<bytes::Bytes>>) {
let (sender, receiver) = mpsc::channel::<Option<bytes::Bytes>>(1);
let this = Self {
receiver,
shutdown: false,
};
(this, sender)
}
}
impl Stream for MpscByteStream {
type Item = Result<bytes::Bytes, std::io::Error>;
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
let val = std::task::ready!(self.receiver.poll_recv(cx));
match val {
None if self.shutdown => Poll::Ready(None),
None => Poll::Ready(Some(Err(std::io::Error::new(
std::io::ErrorKind::UnexpectedEof,
"channel closed",
)))),
Some(None) => {
self.shutdown = true;
Poll::Ready(None)
}
Some(Some(val)) => Poll::Ready(Some(Ok(val))),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use bytes::Bytes;
use deno_core::futures::StreamExt;
#[tokio::test]
async fn success() {
let (mut stream, sender) = MpscByteStream::new();
sender.send(Some(Bytes::from("hello"))).await.unwrap();
assert_eq!(stream.next().await.unwrap().unwrap(), Bytes::from("hello"));
sender.send(Some(Bytes::from("world"))).await.unwrap();
assert_eq!(stream.next().await.unwrap().unwrap(), Bytes::from("world"));
sender.send(None).await.unwrap();
drop(sender);
assert!(stream.next().await.is_none());
}
#[tokio::test]
async fn error() {
let (mut stream, sender) = MpscByteStream::new();
sender.send(Some(Bytes::from("hello"))).await.unwrap();
assert_eq!(stream.next().await.unwrap().unwrap(), Bytes::from("hello"));
drop(sender);
assert_eq!(
stream.next().await.unwrap().unwrap_err().kind(),
std::io::ErrorKind::UnexpectedEof
);
}
}

View file

@ -1,5 +1,6 @@
// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license. // Copyright 2018-2022 the Deno authors. All rights reserved. MIT license.
mod byte_stream;
mod fs_fetch_handler; mod fs_fetch_handler;
use data_url::DataUrl; use data_url::DataUrl;
@ -55,7 +56,6 @@ use std::path::PathBuf;
use std::pin::Pin; use std::pin::Pin;
use std::rc::Rc; use std::rc::Rc;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
// Re-export reqwest and data_url // Re-export reqwest and data_url
pub use data_url; pub use data_url;
@ -63,6 +63,8 @@ pub use reqwest;
pub use fs_fetch_handler::FsFetchHandler; pub use fs_fetch_handler::FsFetchHandler;
use crate::byte_stream::MpscByteStream;
#[derive(Clone)] #[derive(Clone)]
pub struct Options { pub struct Options {
pub user_agent: String, pub user_agent: String,
@ -256,7 +258,7 @@ where
match data { match data {
None => { None => {
// If no body is passed, we return a writer for streaming the body. // If no body is passed, we return a writer for streaming the body.
let (tx, rx) = mpsc::channel::<std::io::Result<bytes::Bytes>>(1); let (stream, tx) = MpscByteStream::new();
// If the size of the body is known, we include a content-length // If the size of the body is known, we include a content-length
// header explicitly. // header explicitly.
@ -265,7 +267,7 @@ where
request.header(CONTENT_LENGTH, HeaderValue::from(body_size)) request.header(CONTENT_LENGTH, HeaderValue::from(body_size))
} }
request = request.body(Body::wrap_stream(ReceiverStream::new(rx))); request = request.body(Body::wrap_stream(stream));
let request_body_rid = let request_body_rid =
state.resource_table.add(FetchRequestBodyResource { state.resource_table.add(FetchRequestBodyResource {
@ -459,7 +461,7 @@ impl Resource for FetchCancelHandle {
} }
pub struct FetchRequestBodyResource { pub struct FetchRequestBodyResource {
body: AsyncRefCell<mpsc::Sender<std::io::Result<bytes::Bytes>>>, body: AsyncRefCell<mpsc::Sender<Option<bytes::Bytes>>>,
cancel: CancelHandle, cancel: CancelHandle,
} }
@ -474,13 +476,35 @@ impl Resource for FetchRequestBodyResource {
let nwritten = bytes.len(); let nwritten = bytes.len();
let body = RcRef::map(&self, |r| &r.body).borrow_mut().await; let body = RcRef::map(&self, |r| &r.body).borrow_mut().await;
let cancel = RcRef::map(self, |r| &r.cancel); let cancel = RcRef::map(self, |r| &r.cancel);
body.send(Ok(bytes)).or_cancel(cancel).await?.map_err(|_| { body
.send(Some(bytes))
.or_cancel(cancel)
.await?
.map_err(|_| {
type_error("request body receiver not connected (request closed)") type_error("request body receiver not connected (request closed)")
})?; })?;
Ok(WriteOutcome::Full { nwritten }) Ok(WriteOutcome::Full { nwritten })
}) })
} }
fn shutdown(self: Rc<Self>) -> AsyncResult<()> {
Box::pin(async move {
let body = RcRef::map(&self, |r| &r.body).borrow_mut().await;
let cancel = RcRef::map(self, |r| &r.cancel);
// There is a case where hyper knows the size of the response body up
// front (through content-length header on the resp), where it will drop
// the body once that content length has been reached, regardless of if
// the stream is complete or not. This is expected behaviour, but it means
// that if you stream a body with an up front known size (eg a Blob),
// explicit shutdown can never succeed because the body (and by extension
// the receiver) will have dropped by the time we try to shutdown. As such
// we ignore if the receiver is closed, because we know that the request
// is complete in good health in that case.
body.send(None).or_cancel(cancel).await?.ok();
Ok(())
})
}
fn close(self: Rc<Self>) { fn close(self: Rc<Self>) {
self.cancel.cancel() self.cancel.cancel()
} }