mirror of
https://github.com/denoland/deno.git
synced 2025-01-03 04:48:52 -05:00
refactor(ext/fetch): refactor fetch to use new write_error method (#20029)
This is a prerequisite for fast streams work -- this particular resource used a custom `mpsc`-style stream, and this work will allow us to unify it with the streams in `ext/http` in time. Instead of using Option as an internal semaphore for "correctly completed EOF", we allow code to propagate errors into the channel which can be picked up by downstream sinks like Hyper. EOF is signalled using a more standard sender drop.
This commit is contained in:
parent
0f07dc95f1
commit
7f8bf2537d
4 changed files with 70 additions and 130 deletions
|
@ -201,6 +201,23 @@ async function mainFetch(req, recursive, terminator) {
|
|||
|
||||
let requestSendError;
|
||||
let requestSendErrorSet = false;
|
||||
|
||||
async function propagateError(err, message) {
|
||||
// TODO(lucacasonato): propagate error into response body stream
|
||||
try {
|
||||
await core.writeTypeError(requestBodyRid, message);
|
||||
} catch (err) {
|
||||
if (!requestSendErrorSet) {
|
||||
requestSendErrorSet = true;
|
||||
requestSendError = err;
|
||||
}
|
||||
}
|
||||
if (!requestSendErrorSet) {
|
||||
requestSendErrorSet = true;
|
||||
requestSendError = err;
|
||||
}
|
||||
}
|
||||
|
||||
if (requestBodyRid !== null) {
|
||||
if (
|
||||
reqBody === null ||
|
||||
|
@ -220,9 +237,7 @@ async function mainFetch(req, recursive, terminator) {
|
|||
val = res.value;
|
||||
} catch (err) {
|
||||
if (terminator.aborted) break;
|
||||
// TODO(lucacasonato): propagate error into response body stream
|
||||
requestSendError = err;
|
||||
requestSendErrorSet = true;
|
||||
await propagateError(err, "failed to read");
|
||||
break;
|
||||
}
|
||||
if (done) break;
|
||||
|
@ -231,9 +246,7 @@ async function mainFetch(req, recursive, terminator) {
|
|||
"Item in request body ReadableStream is not a Uint8Array",
|
||||
);
|
||||
await reader.cancel(error);
|
||||
// TODO(lucacasonato): propagate error into response body stream
|
||||
requestSendError = error;
|
||||
requestSendErrorSet = true;
|
||||
await propagateError(error, error.message);
|
||||
break;
|
||||
}
|
||||
try {
|
||||
|
@ -241,9 +254,7 @@ async function mainFetch(req, recursive, terminator) {
|
|||
} catch (err) {
|
||||
if (terminator.aborted) break;
|
||||
await reader.cancel(err);
|
||||
// TODO(lucacasonato): propagate error into response body stream
|
||||
requestSendError = err;
|
||||
requestSendErrorSet = true;
|
||||
await propagateError(err, "failed to write");
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
@ -252,8 +263,7 @@ async function mainFetch(req, recursive, terminator) {
|
|||
await core.shutdown(requestBodyRid);
|
||||
} catch (err) {
|
||||
if (!terminator.aborted) {
|
||||
requestSendError = err;
|
||||
requestSendErrorSet = true;
|
||||
await propagateError(err, "failed to flush");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,87 +0,0 @@
|
|||
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
|
||||
|
||||
use std::pin::Pin;
|
||||
use std::task::Context;
|
||||
use std::task::Poll;
|
||||
|
||||
use deno_core::futures::Stream;
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
/// [MpscByteStream] is a stream of bytes that is backed by a mpsc channel. It is
|
||||
/// used to bridge between the fetch task and the HTTP body stream. The stream
|
||||
/// has the special property that it errors if the channel is closed before an
|
||||
/// explicit EOF is sent (in the form of a [None] value on the sender).
|
||||
pub struct MpscByteStream {
|
||||
receiver: mpsc::Receiver<Option<bytes::Bytes>>,
|
||||
shutdown: bool,
|
||||
}
|
||||
|
||||
impl MpscByteStream {
|
||||
pub fn new() -> (Self, mpsc::Sender<Option<bytes::Bytes>>) {
|
||||
let (sender, receiver) = mpsc::channel::<Option<bytes::Bytes>>(1);
|
||||
let this = Self {
|
||||
receiver,
|
||||
shutdown: false,
|
||||
};
|
||||
(this, sender)
|
||||
}
|
||||
}
|
||||
|
||||
impl Stream for MpscByteStream {
|
||||
type Item = Result<bytes::Bytes, std::io::Error>;
|
||||
|
||||
fn poll_next(
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
) -> Poll<Option<Self::Item>> {
|
||||
let val = std::task::ready!(self.receiver.poll_recv(cx));
|
||||
match val {
|
||||
None if self.shutdown => Poll::Ready(None),
|
||||
None => Poll::Ready(Some(Err(std::io::Error::new(
|
||||
std::io::ErrorKind::UnexpectedEof,
|
||||
"channel closed",
|
||||
)))),
|
||||
Some(None) => {
|
||||
self.shutdown = true;
|
||||
Poll::Ready(None)
|
||||
}
|
||||
Some(Some(val)) => Poll::Ready(Some(Ok(val))),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use bytes::Bytes;
|
||||
use deno_core::futures::StreamExt;
|
||||
|
||||
#[tokio::test]
|
||||
async fn success() {
|
||||
let (mut stream, sender) = MpscByteStream::new();
|
||||
|
||||
sender.send(Some(Bytes::from("hello"))).await.unwrap();
|
||||
assert_eq!(stream.next().await.unwrap().unwrap(), Bytes::from("hello"));
|
||||
|
||||
sender.send(Some(Bytes::from("world"))).await.unwrap();
|
||||
assert_eq!(stream.next().await.unwrap().unwrap(), Bytes::from("world"));
|
||||
|
||||
sender.send(None).await.unwrap();
|
||||
drop(sender);
|
||||
assert!(stream.next().await.is_none());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn error() {
|
||||
let (mut stream, sender) = MpscByteStream::new();
|
||||
|
||||
sender.send(Some(Bytes::from("hello"))).await.unwrap();
|
||||
assert_eq!(stream.next().await.unwrap().unwrap(), Bytes::from("hello"));
|
||||
|
||||
drop(sender);
|
||||
assert_eq!(
|
||||
stream.next().await.unwrap().unwrap_err().kind(),
|
||||
std::io::ErrorKind::UnexpectedEof
|
||||
);
|
||||
}
|
||||
}
|
|
@ -1,6 +1,5 @@
|
|||
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
|
||||
|
||||
mod byte_stream;
|
||||
mod fs_fetch_handler;
|
||||
|
||||
use std::borrow::Cow;
|
||||
|
@ -13,10 +12,12 @@ use std::pin::Pin;
|
|||
use std::rc::Rc;
|
||||
use std::sync::Arc;
|
||||
|
||||
use deno_core::anyhow::Error;
|
||||
use deno_core::error::type_error;
|
||||
use deno_core::error::AnyError;
|
||||
use deno_core::futures::stream::Peekable;
|
||||
use deno_core::futures::Future;
|
||||
use deno_core::futures::FutureExt;
|
||||
use deno_core::futures::Stream;
|
||||
use deno_core::futures::StreamExt;
|
||||
use deno_core::op;
|
||||
|
@ -69,8 +70,6 @@ pub use reqwest;
|
|||
|
||||
pub use fs_fetch_handler::FsFetchHandler;
|
||||
|
||||
pub use crate::byte_stream::MpscByteStream;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct Options {
|
||||
pub user_agent: String,
|
||||
|
@ -293,7 +292,7 @@ where
|
|||
match data {
|
||||
None => {
|
||||
// If no body is passed, we return a writer for streaming the body.
|
||||
let (stream, tx) = MpscByteStream::new();
|
||||
let (tx, stream) = tokio::sync::mpsc::channel(1);
|
||||
|
||||
// If the size of the body is known, we include a content-length
|
||||
// header explicitly.
|
||||
|
@ -302,11 +301,11 @@ where
|
|||
request.header(CONTENT_LENGTH, HeaderValue::from(body_size))
|
||||
}
|
||||
|
||||
request = request.body(Body::wrap_stream(stream));
|
||||
request = request.body(Body::wrap_stream(FetchBodyStream(stream)));
|
||||
|
||||
let request_body_rid =
|
||||
state.resource_table.add(FetchRequestBodyResource {
|
||||
body: AsyncRefCell::new(tx),
|
||||
body: AsyncRefCell::new(Some(tx)),
|
||||
cancel: CancelHandle::default(),
|
||||
});
|
||||
|
||||
|
@ -604,8 +603,21 @@ impl Resource for FetchCancelHandle {
|
|||
}
|
||||
}
|
||||
|
||||
/// Wraps a [`mpsc::Receiver`] in a [`Stream`] that can be used as a Hyper [`Body`].
|
||||
pub struct FetchBodyStream(pub mpsc::Receiver<Result<bytes::Bytes, Error>>);
|
||||
|
||||
impl Stream for FetchBodyStream {
|
||||
type Item = Result<bytes::Bytes, Error>;
|
||||
fn poll_next(
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut std::task::Context<'_>,
|
||||
) -> std::task::Poll<Option<Self::Item>> {
|
||||
self.0.poll_recv(cx)
|
||||
}
|
||||
}
|
||||
|
||||
pub struct FetchRequestBodyResource {
|
||||
pub body: AsyncRefCell<mpsc::Sender<Option<bytes::Bytes>>>,
|
||||
pub body: AsyncRefCell<Option<mpsc::Sender<Result<bytes::Bytes, Error>>>>,
|
||||
pub cancel: CancelHandle,
|
||||
}
|
||||
|
||||
|
@ -619,38 +631,43 @@ impl Resource for FetchRequestBodyResource {
|
|||
let bytes: bytes::Bytes = buf.into();
|
||||
let nwritten = bytes.len();
|
||||
let body = RcRef::map(&self, |r| &r.body).borrow_mut().await;
|
||||
let body = (*body).as_ref();
|
||||
let cancel = RcRef::map(self, |r| &r.cancel);
|
||||
body
|
||||
.send(Some(bytes))
|
||||
.or_cancel(cancel)
|
||||
.await?
|
||||
.map_err(|_| {
|
||||
type_error("request body receiver not connected (request closed)")
|
||||
})?;
|
||||
let body = body.ok_or(type_error(
|
||||
"request body receiver not connected (request closed)",
|
||||
))?;
|
||||
body.send(Ok(bytes)).or_cancel(cancel).await?.map_err(|_| {
|
||||
type_error("request body receiver not connected (request closed)")
|
||||
})?;
|
||||
Ok(WriteOutcome::Full { nwritten })
|
||||
})
|
||||
}
|
||||
|
||||
fn shutdown(self: Rc<Self>) -> AsyncResult<()> {
|
||||
Box::pin(async move {
|
||||
fn write_error(self: Rc<Self>, error: Error) -> AsyncResult<()> {
|
||||
async move {
|
||||
let body = RcRef::map(&self, |r| &r.body).borrow_mut().await;
|
||||
let body = (*body).as_ref();
|
||||
let cancel = RcRef::map(self, |r| &r.cancel);
|
||||
// There is a case where hyper knows the size of the response body up
|
||||
// front (through content-length header on the resp), where it will drop
|
||||
// the body once that content length has been reached, regardless of if
|
||||
// the stream is complete or not. This is expected behaviour, but it means
|
||||
// that if you stream a body with an up front known size (eg a Blob),
|
||||
// explicit shutdown can never succeed because the body (and by extension
|
||||
// the receiver) will have dropped by the time we try to shutdown. As such
|
||||
// we ignore if the receiver is closed, because we know that the request
|
||||
// is complete in good health in that case.
|
||||
body.send(None).or_cancel(cancel).await?.ok();
|
||||
let body = body.ok_or(type_error(
|
||||
"request body receiver not connected (request closed)",
|
||||
))?;
|
||||
body.send(Err(error)).or_cancel(cancel).await??;
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
.boxed_local()
|
||||
}
|
||||
|
||||
fn shutdown(self: Rc<Self>) -> AsyncResult<()> {
|
||||
async move {
|
||||
let mut body = RcRef::map(&self, |r| &r.body).borrow_mut().await;
|
||||
body.take();
|
||||
Ok(())
|
||||
}
|
||||
.boxed_local()
|
||||
}
|
||||
|
||||
fn close(self: Rc<Self>) {
|
||||
self.cancel.cancel()
|
||||
self.cancel.cancel();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -10,12 +10,12 @@ use deno_core::CancelFuture;
|
|||
use deno_core::CancelHandle;
|
||||
use deno_core::OpState;
|
||||
use deno_fetch::get_or_create_client_from_state;
|
||||
use deno_fetch::FetchBodyStream;
|
||||
use deno_fetch::FetchCancelHandle;
|
||||
use deno_fetch::FetchRequestBodyResource;
|
||||
use deno_fetch::FetchRequestResource;
|
||||
use deno_fetch::FetchReturn;
|
||||
use deno_fetch::HttpClientResource;
|
||||
use deno_fetch::MpscByteStream;
|
||||
use reqwest::header::HeaderMap;
|
||||
use reqwest::header::HeaderName;
|
||||
use reqwest::header::HeaderValue;
|
||||
|
@ -64,12 +64,12 @@ where
|
|||
|
||||
let request_body_rid = if has_body {
|
||||
// If no body is passed, we return a writer for streaming the body.
|
||||
let (stream, tx) = MpscByteStream::new();
|
||||
let (tx, stream) = tokio::sync::mpsc::channel(1);
|
||||
|
||||
request = request.body(Body::wrap_stream(stream));
|
||||
request = request.body(Body::wrap_stream(FetchBodyStream(stream)));
|
||||
|
||||
let request_body_rid = state.resource_table.add(FetchRequestBodyResource {
|
||||
body: AsyncRefCell::new(tx),
|
||||
body: AsyncRefCell::new(Some(tx)),
|
||||
cancel: CancelHandle::default(),
|
||||
});
|
||||
|
||||
|
|
Loading…
Reference in a new issue