mirror of
https://github.com/denoland/deno.git
synced 2024-11-23 15:16:54 -05:00
fix(ext/fetch): handle errors in req body stream (#17081)
Right now an error in a request body stream causes an uncatchable global promise rejection. This PR fixes this to instead propagate the error correctly into the promise returned from `fetch`. It additionally fixes errored readable stream bodies being treated as successfully completed bodies by Rust.
This commit is contained in:
parent
0bed6e009c
commit
c059c12a1e
4 changed files with 212 additions and 30 deletions
|
@ -4,6 +4,7 @@ import {
|
||||||
assertEquals,
|
assertEquals,
|
||||||
assertRejects,
|
assertRejects,
|
||||||
deferred,
|
deferred,
|
||||||
|
delay,
|
||||||
fail,
|
fail,
|
||||||
unimplemented,
|
unimplemented,
|
||||||
} from "./test_util.ts";
|
} from "./test_util.ts";
|
||||||
|
@ -1828,3 +1829,46 @@ Deno.test(
|
||||||
assertEquals(req2.headers.get("x-foo"), "bar");
|
assertEquals(req2.headers.get("x-foo"), "bar");
|
||||||
},
|
},
|
||||||
);
|
);
|
||||||
|
|
||||||
|
Deno.test(
|
||||||
|
{ permissions: { net: true } },
|
||||||
|
async function fetchRequestBodyErrorCatchable() {
|
||||||
|
const listener = Deno.listen({ hostname: "127.0.0.1", port: 4514 });
|
||||||
|
const server = (async () => {
|
||||||
|
const conn = await listener.accept();
|
||||||
|
listener.close();
|
||||||
|
const buf = new Uint8Array(160);
|
||||||
|
const n = await conn.read(buf);
|
||||||
|
assertEquals(n, 160); // this is the request headers + first body chunk
|
||||||
|
const n2 = await conn.read(buf);
|
||||||
|
assertEquals(n2, 6); // this is the second body chunk
|
||||||
|
const n3 = await conn.read(buf);
|
||||||
|
assertEquals(n3, null); // the connection now abruptly closes because the client has errored
|
||||||
|
conn.close();
|
||||||
|
})();
|
||||||
|
|
||||||
|
const stream = new ReadableStream({
|
||||||
|
async start(controller) {
|
||||||
|
controller.enqueue(new TextEncoder().encode("a"));
|
||||||
|
await delay(1000);
|
||||||
|
controller.enqueue(new TextEncoder().encode("b"));
|
||||||
|
await delay(1000);
|
||||||
|
controller.error(new Error("foo"));
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
const err = await assertRejects(() =>
|
||||||
|
fetch("http://localhost:4514", {
|
||||||
|
body: stream,
|
||||||
|
method: "POST",
|
||||||
|
})
|
||||||
|
);
|
||||||
|
|
||||||
|
assert(err instanceof TypeError);
|
||||||
|
assert(err.cause);
|
||||||
|
assert(err.cause instanceof Error);
|
||||||
|
assertEquals(err.cause.message, "foo");
|
||||||
|
|
||||||
|
await server;
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
|
@ -200,6 +200,8 @@
|
||||||
}
|
}
|
||||||
terminator[abortSignal.add](onAbort);
|
terminator[abortSignal.add](onAbort);
|
||||||
|
|
||||||
|
let requestSendError;
|
||||||
|
let requestSendErrorSet = false;
|
||||||
if (requestBodyRid !== null) {
|
if (requestBodyRid !== null) {
|
||||||
if (
|
if (
|
||||||
reqBody === null ||
|
reqBody === null ||
|
||||||
|
@ -210,44 +212,69 @@
|
||||||
const reader = reqBody.getReader();
|
const reader = reqBody.getReader();
|
||||||
WeakMapPrototypeSet(requestBodyReaders, req, reader);
|
WeakMapPrototypeSet(requestBodyReaders, req, reader);
|
||||||
(async () => {
|
(async () => {
|
||||||
while (true) {
|
let done = false;
|
||||||
const { value, done } = await PromisePrototypeCatch(
|
while (!done) {
|
||||||
reader.read(),
|
let val;
|
||||||
(err) => {
|
try {
|
||||||
if (terminator.aborted) return { done: true, value: undefined };
|
const res = await reader.read();
|
||||||
throw err;
|
done = res.done;
|
||||||
},
|
val = res.value;
|
||||||
);
|
} catch (err) {
|
||||||
|
if (terminator.aborted) break;
|
||||||
|
// TODO(lucacasonato): propagate error into response body stream
|
||||||
|
requestSendError = err;
|
||||||
|
requestSendErrorSet = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
if (done) break;
|
if (done) break;
|
||||||
if (!ObjectPrototypeIsPrototypeOf(Uint8ArrayPrototype, value)) {
|
if (!ObjectPrototypeIsPrototypeOf(Uint8ArrayPrototype, val)) {
|
||||||
await reader.cancel("value not a Uint8Array");
|
const error = new TypeError(
|
||||||
|
"Item in request body ReadableStream is not a Uint8Array",
|
||||||
|
);
|
||||||
|
await reader.cancel(error);
|
||||||
|
// TODO(lucacasonato): propagate error into response body stream
|
||||||
|
requestSendError = error;
|
||||||
|
requestSendErrorSet = true;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
await PromisePrototypeCatch(
|
await core.writeAll(requestBodyRid, val);
|
||||||
core.writeAll(requestBodyRid, value),
|
|
||||||
(err) => {
|
|
||||||
if (terminator.aborted) return;
|
|
||||||
throw err;
|
|
||||||
},
|
|
||||||
);
|
|
||||||
if (terminator.aborted) break;
|
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
|
if (terminator.aborted) break;
|
||||||
await reader.cancel(err);
|
await reader.cancel(err);
|
||||||
|
// TODO(lucacasonato): propagate error into response body stream
|
||||||
|
requestSendError = err;
|
||||||
|
requestSendErrorSet = true;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if (done && !terminator.aborted) {
|
||||||
|
try {
|
||||||
|
await core.shutdown(requestBodyRid);
|
||||||
|
} catch (err) {
|
||||||
|
if (!terminator.aborted) {
|
||||||
|
requestSendError = err;
|
||||||
|
requestSendErrorSet = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
WeakMapPrototypeDelete(requestBodyReaders, req);
|
WeakMapPrototypeDelete(requestBodyReaders, req);
|
||||||
core.tryClose(requestBodyRid);
|
core.tryClose(requestBodyRid);
|
||||||
})();
|
})();
|
||||||
}
|
}
|
||||||
|
|
||||||
let resp;
|
let resp;
|
||||||
try {
|
try {
|
||||||
resp = await PromisePrototypeCatch(opFetchSend(requestRid), (err) => {
|
resp = await opFetchSend(requestRid);
|
||||||
|
} catch (err) {
|
||||||
if (terminator.aborted) return;
|
if (terminator.aborted) return;
|
||||||
throw err;
|
if (requestSendErrorSet) {
|
||||||
|
// if the request body stream errored, we want to propagate that error
|
||||||
|
// instead of the original error from opFetchSend
|
||||||
|
throw new TypeError("Failed to fetch: request body stream errored", {
|
||||||
|
cause: requestSendError,
|
||||||
});
|
});
|
||||||
|
}
|
||||||
|
throw err;
|
||||||
} finally {
|
} finally {
|
||||||
if (cancelHandleRid !== null) {
|
if (cancelHandleRid !== null) {
|
||||||
core.tryClose(cancelHandleRid);
|
core.tryClose(cancelHandleRid);
|
||||||
|
|
87
ext/fetch/byte_stream.rs
Normal file
87
ext/fetch/byte_stream.rs
Normal file
|
@ -0,0 +1,87 @@
|
||||||
|
// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license.
|
||||||
|
|
||||||
|
use std::pin::Pin;
|
||||||
|
use std::task::Context;
|
||||||
|
use std::task::Poll;
|
||||||
|
|
||||||
|
use deno_core::futures::Stream;
|
||||||
|
use tokio::sync::mpsc;
|
||||||
|
|
||||||
|
/// [MpscByteStream] is a stream of bytes that is backed by a mpsc channel. It is
|
||||||
|
/// used to bridge between the fetch task and the HTTP body stream. The stream
|
||||||
|
/// has the special property that it errors if the channel is closed before an
|
||||||
|
/// explicit EOF is sent (in the form of a [None] value on the sender).
|
||||||
|
pub struct MpscByteStream {
|
||||||
|
receiver: mpsc::Receiver<Option<bytes::Bytes>>,
|
||||||
|
shutdown: bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl MpscByteStream {
|
||||||
|
pub fn new() -> (Self, mpsc::Sender<Option<bytes::Bytes>>) {
|
||||||
|
let (sender, receiver) = mpsc::channel::<Option<bytes::Bytes>>(1);
|
||||||
|
let this = Self {
|
||||||
|
receiver,
|
||||||
|
shutdown: false,
|
||||||
|
};
|
||||||
|
(this, sender)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Stream for MpscByteStream {
|
||||||
|
type Item = Result<bytes::Bytes, std::io::Error>;
|
||||||
|
|
||||||
|
fn poll_next(
|
||||||
|
mut self: Pin<&mut Self>,
|
||||||
|
cx: &mut Context<'_>,
|
||||||
|
) -> Poll<Option<Self::Item>> {
|
||||||
|
let val = std::task::ready!(self.receiver.poll_recv(cx));
|
||||||
|
match val {
|
||||||
|
None if self.shutdown => Poll::Ready(None),
|
||||||
|
None => Poll::Ready(Some(Err(std::io::Error::new(
|
||||||
|
std::io::ErrorKind::UnexpectedEof,
|
||||||
|
"channel closed",
|
||||||
|
)))),
|
||||||
|
Some(None) => {
|
||||||
|
self.shutdown = true;
|
||||||
|
Poll::Ready(None)
|
||||||
|
}
|
||||||
|
Some(Some(val)) => Poll::Ready(Some(Ok(val))),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
use bytes::Bytes;
|
||||||
|
use deno_core::futures::StreamExt;
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn success() {
|
||||||
|
let (mut stream, sender) = MpscByteStream::new();
|
||||||
|
|
||||||
|
sender.send(Some(Bytes::from("hello"))).await.unwrap();
|
||||||
|
assert_eq!(stream.next().await.unwrap().unwrap(), Bytes::from("hello"));
|
||||||
|
|
||||||
|
sender.send(Some(Bytes::from("world"))).await.unwrap();
|
||||||
|
assert_eq!(stream.next().await.unwrap().unwrap(), Bytes::from("world"));
|
||||||
|
|
||||||
|
sender.send(None).await.unwrap();
|
||||||
|
drop(sender);
|
||||||
|
assert!(stream.next().await.is_none());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn error() {
|
||||||
|
let (mut stream, sender) = MpscByteStream::new();
|
||||||
|
|
||||||
|
sender.send(Some(Bytes::from("hello"))).await.unwrap();
|
||||||
|
assert_eq!(stream.next().await.unwrap().unwrap(), Bytes::from("hello"));
|
||||||
|
|
||||||
|
drop(sender);
|
||||||
|
assert_eq!(
|
||||||
|
stream.next().await.unwrap().unwrap_err().kind(),
|
||||||
|
std::io::ErrorKind::UnexpectedEof
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,5 +1,6 @@
|
||||||
// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license.
|
// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license.
|
||||||
|
|
||||||
|
mod byte_stream;
|
||||||
mod fs_fetch_handler;
|
mod fs_fetch_handler;
|
||||||
|
|
||||||
use data_url::DataUrl;
|
use data_url::DataUrl;
|
||||||
|
@ -55,7 +56,6 @@ use std::path::PathBuf;
|
||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
use std::rc::Rc;
|
use std::rc::Rc;
|
||||||
use tokio::sync::mpsc;
|
use tokio::sync::mpsc;
|
||||||
use tokio_stream::wrappers::ReceiverStream;
|
|
||||||
|
|
||||||
// Re-export reqwest and data_url
|
// Re-export reqwest and data_url
|
||||||
pub use data_url;
|
pub use data_url;
|
||||||
|
@ -63,6 +63,8 @@ pub use reqwest;
|
||||||
|
|
||||||
pub use fs_fetch_handler::FsFetchHandler;
|
pub use fs_fetch_handler::FsFetchHandler;
|
||||||
|
|
||||||
|
use crate::byte_stream::MpscByteStream;
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct Options {
|
pub struct Options {
|
||||||
pub user_agent: String,
|
pub user_agent: String,
|
||||||
|
@ -256,7 +258,7 @@ where
|
||||||
match data {
|
match data {
|
||||||
None => {
|
None => {
|
||||||
// If no body is passed, we return a writer for streaming the body.
|
// If no body is passed, we return a writer for streaming the body.
|
||||||
let (tx, rx) = mpsc::channel::<std::io::Result<bytes::Bytes>>(1);
|
let (stream, tx) = MpscByteStream::new();
|
||||||
|
|
||||||
// If the size of the body is known, we include a content-length
|
// If the size of the body is known, we include a content-length
|
||||||
// header explicitly.
|
// header explicitly.
|
||||||
|
@ -265,7 +267,7 @@ where
|
||||||
request.header(CONTENT_LENGTH, HeaderValue::from(body_size))
|
request.header(CONTENT_LENGTH, HeaderValue::from(body_size))
|
||||||
}
|
}
|
||||||
|
|
||||||
request = request.body(Body::wrap_stream(ReceiverStream::new(rx)));
|
request = request.body(Body::wrap_stream(stream));
|
||||||
|
|
||||||
let request_body_rid =
|
let request_body_rid =
|
||||||
state.resource_table.add(FetchRequestBodyResource {
|
state.resource_table.add(FetchRequestBodyResource {
|
||||||
|
@ -459,7 +461,7 @@ impl Resource for FetchCancelHandle {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct FetchRequestBodyResource {
|
pub struct FetchRequestBodyResource {
|
||||||
body: AsyncRefCell<mpsc::Sender<std::io::Result<bytes::Bytes>>>,
|
body: AsyncRefCell<mpsc::Sender<Option<bytes::Bytes>>>,
|
||||||
cancel: CancelHandle,
|
cancel: CancelHandle,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -474,13 +476,35 @@ impl Resource for FetchRequestBodyResource {
|
||||||
let nwritten = bytes.len();
|
let nwritten = bytes.len();
|
||||||
let body = RcRef::map(&self, |r| &r.body).borrow_mut().await;
|
let body = RcRef::map(&self, |r| &r.body).borrow_mut().await;
|
||||||
let cancel = RcRef::map(self, |r| &r.cancel);
|
let cancel = RcRef::map(self, |r| &r.cancel);
|
||||||
body.send(Ok(bytes)).or_cancel(cancel).await?.map_err(|_| {
|
body
|
||||||
|
.send(Some(bytes))
|
||||||
|
.or_cancel(cancel)
|
||||||
|
.await?
|
||||||
|
.map_err(|_| {
|
||||||
type_error("request body receiver not connected (request closed)")
|
type_error("request body receiver not connected (request closed)")
|
||||||
})?;
|
})?;
|
||||||
Ok(WriteOutcome::Full { nwritten })
|
Ok(WriteOutcome::Full { nwritten })
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn shutdown(self: Rc<Self>) -> AsyncResult<()> {
|
||||||
|
Box::pin(async move {
|
||||||
|
let body = RcRef::map(&self, |r| &r.body).borrow_mut().await;
|
||||||
|
let cancel = RcRef::map(self, |r| &r.cancel);
|
||||||
|
// There is a case where hyper knows the size of the response body up
|
||||||
|
// front (through content-length header on the resp), where it will drop
|
||||||
|
// the body once that content length has been reached, regardless of if
|
||||||
|
// the stream is complete or not. This is expected behaviour, but it means
|
||||||
|
// that if you stream a body with an up front known size (eg a Blob),
|
||||||
|
// explicit shutdown can never succeed because the body (and by extension
|
||||||
|
// the receiver) will have dropped by the time we try to shutdown. As such
|
||||||
|
// we ignore if the receiver is closed, because we know that the request
|
||||||
|
// is complete in good health in that case.
|
||||||
|
body.send(None).or_cancel(cancel).await?.ok();
|
||||||
|
Ok(())
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
fn close(self: Rc<Self>) {
|
fn close(self: Rc<Self>) {
|
||||||
self.cancel.cancel()
|
self.cancel.cancel()
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue