diff --git a/Cargo.lock b/Cargo.lock index a74724e92d..bb0617ceee 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -380,7 +380,7 @@ dependencies = [ "rustversion", "serde", "sync_wrapper", - "tower", + "tower 0.4.13", "tower-layer", "tower-service", ] @@ -1651,6 +1651,7 @@ dependencies = [ "dyn-clone", "error_reporter", "fast-socks5", + "h2 0.4.4", "hickory-resolver", "http 1.1.0", "http-body-util", @@ -1667,7 +1668,7 @@ dependencies = [ "tokio-rustls", "tokio-socks", "tokio-util", - "tower", + "tower 0.5.2", "tower-http", "tower-service", ] @@ -2322,7 +2323,7 @@ dependencies = [ "serde_json", "tokio", "tokio-util", - "tower", + "tower 0.4.13", "tracing", ] @@ -7976,7 +7977,7 @@ dependencies = [ "socket2", "tokio", "tokio-stream", - "tower", + "tower 0.4.13", "tower-layer", "tower-service", "tracing", @@ -8002,6 +8003,21 @@ dependencies = [ "tracing", ] +[[package]] +name = "tower" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d039ad9159c98b70ecfd540b2573b97f7f52c3e8d9f8ad57a24b916a536975f9" +dependencies = [ + "futures-core", + "futures-util", + "pin-project-lite", + "sync_wrapper", + "tokio", + "tower-layer", + "tower-service", +] + [[package]] name = "tower-http" version = "0.6.1" @@ -8030,9 +8046,9 @@ checksum = "121c2a6cda46980bb0fcd1647ffaf6cd3fc79a013de288782836f6df9c48780e" [[package]] name = "tower-service" -version = "0.3.2" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52" +checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3" [[package]] name = "tracing" diff --git a/Cargo.toml b/Cargo.toml index 0d045c9059..55a94b75a0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -202,7 +202,7 @@ tokio-metrics = { version = "0.3.0", features = ["rt"] } tokio-rustls = { version = "0.26.0", default-features = false, features = ["ring", "tls12"] } tokio-socks = "0.5.1" tokio-util = "0.7.4" -tower = { version = "0.4.13", default-features = false, features = ["util"] } +tower = { version = "0.5.2", default-features = false, features = ["retry", "util"] } tower-http = { version = "0.6.1", features = ["decompression-br", "decompression-gzip"] } tower-lsp = { package = "deno_tower_lsp", version = "0.1.0", features = ["proposed"] } tower-service = "0.3.2" diff --git a/cli/http_util.rs b/cli/http_util.rs index ce05d66b78..b24dd7bc0c 100644 --- a/cli/http_util.rs +++ b/cli/http_util.rs @@ -145,9 +145,7 @@ impl HttpClient { } pub fn get(&self, url: Url) -> Result { - let body = http_body_util::Empty::new() - .map_err(|never| match never {}) - .boxed(); + let body = deno_fetch::ReqBody::empty(); let mut req = http::Request::new(body); *req.uri_mut() = url.as_str().parse()?; Ok(RequestBuilder { @@ -179,9 +177,7 @@ impl HttpClient { S: serde::Serialize, { let json = deno_core::serde_json::to_vec(ser)?; - let body = http_body_util::Full::new(json.into()) - .map_err(|never| match never {}) - .boxed(); + let body = deno_fetch::ReqBody::full(json.into()); let builder = self.post(url, body)?; Ok(builder.header( http::header::CONTENT_TYPE, @@ -194,9 +190,7 @@ impl HttpClient { url: &Url, headers: HeaderMap, ) -> Result, SendError> { - let body = http_body_util::Empty::new() - .map_err(|never| match never {}) - .boxed(); + let body = deno_fetch::ReqBody::empty(); let mut request = http::Request::new(body); *request.uri_mut() = http::Uri::try_from(url.as_str())?; *request.headers_mut() = headers; diff --git a/cli/tools/registry/mod.rs b/cli/tools/registry/mod.rs index 001e401459..45a040d236 100644 --- a/cli/tools/registry/mod.rs +++ b/cli/tools/registry/mod.rs @@ -26,6 +26,7 @@ use deno_core::serde_json; use deno_core::serde_json::json; use deno_core::serde_json::Value; use deno_core::url::Url; +use deno_runtime::deno_fetch; use deno_terminal::colors; use http_body_util::BodyExt; use serde::Deserialize; @@ -911,9 +912,7 @@ async fn publish_package( package.config ); - let body = http_body_util::Full::new(package.tarball.bytes.clone()) - .map_err(|never| match never {}) - .boxed(); + let body = deno_fetch::ReqBody::full(package.tarball.bytes.clone()); let response = http_client .post(url.parse()?, body)? .header( diff --git a/ext/fetch/Cargo.toml b/ext/fetch/Cargo.toml index 98d2fdf5da..fee21808e7 100644 --- a/ext/fetch/Cargo.toml +++ b/ext/fetch/Cargo.toml @@ -23,6 +23,7 @@ deno_permissions.workspace = true deno_tls.workspace = true dyn-clone = "1" error_reporter = "1" +h2.workspace = true hickory-resolver.workspace = true http.workspace = true http-body-util.workspace = true diff --git a/ext/fetch/lib.rs b/ext/fetch/lib.rs index 919c6d3044..103698b3bf 100644 --- a/ext/fetch/lib.rs +++ b/ext/fetch/lib.rs @@ -10,6 +10,7 @@ use std::borrow::Cow; use std::cell::RefCell; use std::cmp::min; use std::convert::From; +use std::future; use std::path::Path; use std::path::PathBuf; use std::pin::Pin; @@ -66,6 +67,7 @@ use http::header::USER_AGENT; use http::Extensions; use http::Method; use http::Uri; +use http_body_util::combinators::BoxBody; use http_body_util::BodyExt; use hyper::body::Frame; use hyper_util::client::legacy::connect::HttpConnector; @@ -75,6 +77,7 @@ use hyper_util::rt::TokioExecutor; use hyper_util::rt::TokioTimer; use serde::Deserialize; use serde::Serialize; +use tower::retry; use tower::ServiceExt; use tower_http::decompression::Decompression; @@ -476,9 +479,7 @@ where // If a body is passed, we use it, and don't return a body for streaming. con_len = Some(data.len() as u64); - http_body_util::Full::new(data.to_vec().into()) - .map_err(|never| match never {}) - .boxed() + ReqBody::full(data.to_vec().into()) } (_, Some(resource)) => { let resource = state @@ -491,7 +492,7 @@ where } _ => {} } - ReqBody::new(ResourceToBodyAdapter::new(resource)) + ReqBody::streaming(ResourceToBodyAdapter::new(resource)) } (None, None) => unreachable!(), } @@ -501,9 +502,7 @@ where if matches!(method, Method::POST | Method::PUT) { con_len = Some(0); } - http_body_util::Empty::new() - .map_err(|never| match never {}) - .boxed() + ReqBody::empty() }; let mut request = http::Request::new(body); @@ -1066,7 +1065,8 @@ pub fn create_http_client( } let pooled_client = builder.build(connector); - let decompress = Decompression::new(pooled_client).gzip(true).br(true); + let retry_client = retry::Retry::new(FetchRetry, pooled_client); + let decompress = Decompression::new(retry_client).gzip(true).br(true); Ok(Client { inner: decompress, @@ -1083,7 +1083,12 @@ pub fn op_utf8_to_byte_string(#[string] input: String) -> ByteString { #[derive(Clone, Debug)] pub struct Client { - inner: Decompression>, + inner: Decompression< + retry::Retry< + FetchRetry, + hyper_util::client::legacy::Client, + >, + >, // Used to check whether to include a proxy-authorization header proxies: Arc, user_agent: HeaderValue, @@ -1174,10 +1179,70 @@ impl Client { } } -pub type ReqBody = - http_body_util::combinators::BoxBody; -pub type ResBody = - http_body_util::combinators::BoxBody; +// This is a custom enum to allow the retry policy to clone the variants that could be retried. +pub enum ReqBody { + Full(http_body_util::Full), + Empty(http_body_util::Empty), + Streaming(BoxBody), +} + +pub type ResBody = BoxBody; + +impl ReqBody { + pub fn full(bytes: Bytes) -> Self { + ReqBody::Full(http_body_util::Full::new(bytes)) + } + + pub fn empty() -> Self { + ReqBody::Empty(http_body_util::Empty::new()) + } + + pub fn streaming(body: B) -> Self + where + B: hyper::body::Body + + Send + + Sync + + 'static, + { + ReqBody::Streaming(BoxBody::new(body)) + } +} + +impl hyper::body::Body for ReqBody { + type Data = Bytes; + type Error = deno_core::error::AnyError; + + fn poll_frame( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll, Self::Error>>> { + match &mut *self { + ReqBody::Full(ref mut b) => { + Pin::new(b).poll_frame(cx).map_err(|never| match never {}) + } + ReqBody::Empty(ref mut b) => { + Pin::new(b).poll_frame(cx).map_err(|never| match never {}) + } + ReqBody::Streaming(ref mut b) => Pin::new(b).poll_frame(cx), + } + } + + fn is_end_stream(&self) -> bool { + match self { + ReqBody::Full(ref b) => b.is_end_stream(), + ReqBody::Empty(ref b) => b.is_end_stream(), + ReqBody::Streaming(ref b) => b.is_end_stream(), + } + } + + fn size_hint(&self) -> hyper::body::SizeHint { + match self { + ReqBody::Full(ref b) => b.size_hint(), + ReqBody::Empty(ref b) => b.size_hint(), + ReqBody::Streaming(ref b) => b.size_hint(), + } + } +} /// Copied from https://github.com/seanmonstar/reqwest/blob/b9d62a0323d96f11672a61a17bf8849baec00275/src/async_impl/request.rs#L572 /// Check the request URL for a "username:password" type authority, and if @@ -1214,3 +1279,102 @@ pub fn extract_authority(url: &mut Url) -> Option<(String, Option)> { fn op_fetch_promise_is_settled(promise: v8::Local) -> bool { promise.state() != v8::PromiseState::Pending } + +/// Deno.fetch's retry policy. +#[derive(Clone, Debug)] +struct FetchRetry; + +/// Marker extension that a request has been retried once. +#[derive(Clone, Debug)] +struct Retried; + +impl + retry::Policy, http::Response, E> + for FetchRetry +where + E: std::error::Error + 'static, +{ + /// Don't delay retries. + type Future = future::Ready<()>; + + fn retry( + &mut self, + req: &mut http::Request, + result: &mut Result, E>, + ) -> Option { + if req.extensions().get::().is_some() { + // only retry once + return None; + } + + match result { + Ok(..) => { + // never retry a Response + None + } + Err(err) => { + if is_error_retryable(&*err) { + req.extensions_mut().insert(Retried); + Some(future::ready(())) + } else { + None + } + } + } + } + + fn clone_request( + &mut self, + req: &http::Request, + ) -> Option> { + let body = match req.body() { + ReqBody::Full(b) => ReqBody::Full(b.clone()), + ReqBody::Empty(b) => ReqBody::Empty(*b), + ReqBody::Streaming(..) => return None, + }; + + let mut clone = http::Request::new(body); + *clone.method_mut() = req.method().clone(); + *clone.uri_mut() = req.uri().clone(); + *clone.headers_mut() = req.headers().clone(); + *clone.extensions_mut() = req.extensions().clone(); + Some(clone) + } +} + +fn is_error_retryable(err: &(dyn std::error::Error + 'static)) -> bool { + // Note: hyper doesn't promise it will always be this h2 version. Keep up to date. + if let Some(err) = find_source::(err) { + // They sent us a graceful shutdown, try with a new connection! + if err.is_go_away() + && err.is_remote() + && err.reason() == Some(h2::Reason::NO_ERROR) + { + return true; + } + + // REFUSED_STREAM was sent from the server, which is safe to retry. + // https://www.rfc-editor.org/rfc/rfc9113.html#section-8.7-3.2 + if err.is_reset() + && err.is_remote() + && err.reason() == Some(h2::Reason::REFUSED_STREAM) + { + return true; + } + } + + false +} + +fn find_source<'a, E: std::error::Error + 'static>( + err: &'a (dyn std::error::Error + 'static), +) -> Option<&'a E> { + let mut err = Some(err); + while let Some(src) = err { + if let Some(found) = src.downcast_ref::() { + return Some(found); + } + err = src.source(); + } + None +} diff --git a/ext/fetch/tests.rs b/ext/fetch/tests.rs index 3da29f8aa7..243b80bd90 100644 --- a/ext/fetch/tests.rs +++ b/ext/fetch/tests.rs @@ -133,11 +133,7 @@ async fn rust_test_client_with_resolver( let req = http::Request::builder() .uri(format!("https://{}/foo", src_addr)) - .body( - http_body_util::Empty::new() - .map_err(|err| match err {}) - .boxed(), - ) + .body(crate::ReqBody::empty()) .unwrap(); let resp = client.send(req).await.unwrap(); assert_eq!(resp.status(), http::StatusCode::OK); diff --git a/ext/kv/remote.rs b/ext/kv/remote.rs index 1830aa67ee..891786e319 100644 --- a/ext/kv/remote.rs +++ b/ext/kv/remote.rs @@ -122,9 +122,7 @@ impl RemoteTransport for FetchClient { headers: http::HeaderMap, body: Bytes, ) -> Result<(Url, http::StatusCode, Self::Response), anyhow::Error> { - let body = http_body_util::Full::new(body) - .map_err(|never| match never {}) - .boxed(); + let body = deno_fetch::ReqBody::full(body); let mut req = http::Request::new(body); *req.method_mut() = http::Method::POST; *req.uri_mut() = url.as_str().parse()?; diff --git a/runtime/ops/web_worker/sync_fetch.rs b/runtime/ops/web_worker/sync_fetch.rs index d1f133d3d2..508bcb7bb0 100644 --- a/runtime/ops/web_worker/sync_fetch.rs +++ b/runtime/ops/web_worker/sync_fetch.rs @@ -104,11 +104,7 @@ pub fn op_worker_sync_fetch( let (body, mime_type, res_url) = match script_url.scheme() { "http" | "https" => { - let mut req = http::Request::new( - http_body_util::Empty::new() - .map_err(|never| match never {}) - .boxed(), - ); + let mut req = http::Request::new(deno_fetch::ReqBody::empty()); *req.uri_mut() = script_url.as_str().parse()?; let resp =