diff --git a/Cargo.lock b/Cargo.lock index 7371baf6a5..308cb13b41 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -743,6 +743,7 @@ dependencies = [ "os_pipe", "pretty_assertions", "regex", + "reqwest", "serde", "test_server", "tokio", @@ -1148,6 +1149,10 @@ dependencies = [ "fs3", "glibc_version", "glob", + "http 1.1.0", + "http-body 1.0.0", + "http-body-util", + "hyper-util", "import_map", "indexmap", "jsonc-parser", @@ -1172,7 +1177,6 @@ dependencies = [ "quick-junit", "rand", "regex", - "reqwest", "ring", "runtimelib", "rustyline", @@ -1462,6 +1466,7 @@ dependencies = [ name = "deno_fetch" version = "0.185.0" dependencies = [ + "base64 0.21.7", "bytes", "data-url", "deno_core", @@ -1469,11 +1474,20 @@ dependencies = [ "deno_tls", "dyn-clone", "http 1.1.0", - "reqwest", + "http-body-util", + "hyper 1.4.0", + "hyper-rustls", + "hyper-util", + "ipnet", "serde", "serde_json", "tokio", + "tokio-rustls", + "tokio-socks", "tokio-util", + "tower", + "tower-http", + "tower-service", ] [[package]] @@ -1617,6 +1631,7 @@ dependencies = [ "denokv_sqlite", "faster-hex", "http 1.1.0", + "http-body-util", "log", "num-bigint", "prost", @@ -1736,6 +1751,7 @@ dependencies = [ "hkdf", "home", "http 1.1.0", + "http-body-util", "idna 0.3.0", "indexmap", "ipnetwork", @@ -1758,7 +1774,6 @@ dependencies = [ "pin-project-lite", "rand", "regex", - "reqwest", "ring", "ripemd", "rsa", @@ -3358,12 +3373,12 @@ dependencies = [ [[package]] name = "http-body-util" -version = "0.1.1" +version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0475f8b2ac86659c21b64320d5d653f9efe42acd2a4e560073ec61a155a34f1d" +checksum = "793429d76616a256bcb62c2a2ec2bed781c8307e797e2598c50010f2bee2544f" dependencies = [ "bytes", - "futures-core", + "futures-util", "http 1.1.0", "http-body 1.0.0", "pin-project-lite", @@ -7054,6 +7069,26 @@ dependencies = [ "tower-service", ] +[[package]] +name = "tower-http" +version = "0.5.2" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e9cd434a998747dd2c4276bc96ee2e0c7a2eadf3cae88e52be55a05fa9053f5" +dependencies = [ + "async-compression", + "bitflags 2.5.0", + "bytes", + "futures-core", + "http 1.1.0", + "http-body 1.0.0", + "http-body-util", + "pin-project-lite", + "tokio", + "tokio-util", + "tower-layer", + "tower-service", +] + [[package]] name = "tower-layer" version = "0.3.2" diff --git a/Cargo.toml b/Cargo.toml index e1ae7dc545..6476ef1752 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -115,13 +115,16 @@ futures = "0.3.21" glob = "0.3.1" h2 = "0.4.4" http = "1.0" -http-body-util = "0.1" +http-body = "1.0" +http-body-util = "0.1.2" http_v02 = { package = "http", version = "0.2.9" } httparse = "1.8.0" hyper = { version = "=1.4.0", features = ["full"] } -hyper-util = { version = "=0.1.6", features = ["tokio", "server", "server-auto"] } +hyper-rustls = { version = "0.26.0", default-features = false, features = ["http1", "http2", "tls12", "ring"] } +hyper-util = { version = "=0.1.6", features = ["tokio", "client", "client-legacy", "server", "server-auto"] } hyper_v014 = { package = "hyper", version = "0.14.26", features = ["runtime", "http1"] } indexmap = { version = "2", features = ["serde"] } +ipnet = "2.3" jsonc-parser = { version = "=0.23.0", features = ["serde"] } lazy-regex = "3" libc = "0.2.126" @@ -173,8 +176,13 @@ termcolor = "1.1.3" thiserror = "1.0.61" tokio = { version = "1.36.0", features = ["full"] } tokio-metrics = { version = "0.3.0", features = ["rt"] } +tokio-rustls = "0.25.0" +tokio-socks = "0.5.1" tokio-util = "0.7.4" +tower = { version = "0.4.13", default-features = false, features = ["util"] } +tower-http = { version = "0.5.2", features = ["decompression-br", "decompression-gzip"] } tower-lsp = { version = "=0.20.0", features = ["proposed"] } +tower-service = "0.3.2" twox-hash = "=1.6.3" # Upgrading past 2.4.1 may cause WPT failures url = { version = "< 2.5.0", features = ["serde", "expose_internals"] } 
diff --git a/cli/Cargo.toml b/cli/Cargo.toml index d02411dc2e..d9144afff9 100644 --- a/cli/Cargo.toml +++ b/cli/Cargo.toml @@ -107,6 +107,10 @@ faster-hex.workspace = true flate2.workspace = true fs3.workspace = true glob = "0.3.1" +http.workspace = true +http-body.workspace = true +http-body-util.workspace = true +hyper-util.workspace = true import_map = { version = "=0.20.0", features = ["ext"] } indexmap.workspace = true jsonc-parser.workspace = true @@ -128,7 +132,6 @@ phf.workspace = true quick-junit = "^0.3.5" rand = { workspace = true, features = ["small_rng"] } regex.workspace = true -reqwest.workspace = true ring.workspace = true rustyline.workspace = true rustyline-derive = "=0.7.0" diff --git a/cli/http_util.rs b/cli/http_util.rs index 18c0687bdb..a8646c1880 100644 --- a/cli/http_util.rs +++ b/cli/http_util.rs @@ -12,18 +12,22 @@ use deno_core::error::generic_error; use deno_core::error::AnyError; use deno_core::futures::StreamExt; use deno_core::parking_lot::Mutex; +use deno_core::serde; +use deno_core::serde_json; use deno_core::url::Url; +use deno_runtime::deno_fetch; use deno_runtime::deno_fetch::create_http_client; -use deno_runtime::deno_fetch::reqwest; -use deno_runtime::deno_fetch::reqwest::header::HeaderName; -use deno_runtime::deno_fetch::reqwest::header::HeaderValue; -use deno_runtime::deno_fetch::reqwest::header::ACCEPT; -use deno_runtime::deno_fetch::reqwest::header::AUTHORIZATION; -use deno_runtime::deno_fetch::reqwest::header::IF_NONE_MATCH; -use deno_runtime::deno_fetch::reqwest::header::LOCATION; -use deno_runtime::deno_fetch::reqwest::StatusCode; use deno_runtime::deno_fetch::CreateHttpClientOptions; use deno_runtime::deno_tls::RootCertStoreProvider; +use http::header::HeaderName; +use http::header::HeaderValue; +use http::header::ACCEPT; +use http::header::AUTHORIZATION; +use http::header::IF_NONE_MATCH; +use http::header::LOCATION; +use http::StatusCode; +use http_body_util::BodyExt; + use std::collections::HashMap; use 
std::sync::Arc; use std::thread::ThreadId; @@ -208,8 +212,7 @@ pub struct HttpClientProvider { // it's not safe to share a reqwest::Client across tokio runtimes, // so we store these Clients keyed by thread id // https://github.com/seanmonstar/reqwest/issues/1148#issuecomment-910868788 - #[allow(clippy::disallowed_types)] // reqwest::Client allowed here - clients_by_thread_id: Mutex>, + clients_by_thread_id: Mutex>, } impl std::fmt::Debug for HttpClientProvider { @@ -270,9 +273,15 @@ pub struct BadResponseError { #[derive(Debug, Error)] pub enum DownloadError { #[error(transparent)] - Reqwest(#[from] reqwest::Error), + Fetch(AnyError), #[error(transparent)] - ToStr(#[from] reqwest::header::ToStrError), + UrlParse(#[from] deno_core::url::ParseError), + #[error(transparent)] + HttpParse(#[from] http::Error), + #[error(transparent)] + Json(#[from] serde_json::Error), + #[error(transparent)] + ToStr(#[from] http::header::ToStrError), #[error("Redirection from '{}' did not provide location header", .request_url)] NoRedirectHeader { request_url: Url }, #[error("Too many redirects.")] @@ -283,8 +292,7 @@ pub enum DownloadError { #[derive(Debug)] pub struct HttpClient { - #[allow(clippy::disallowed_types)] // reqwest::Client allowed here - client: reqwest::Client, + client: deno_fetch::Client, // don't allow sending this across threads because then // it might be shared accidentally across tokio runtimes // which will cause issues @@ -295,22 +303,56 @@ pub struct HttpClient { impl HttpClient { // DO NOT make this public. 
You should always be creating one of these from // the HttpClientProvider - #[allow(clippy::disallowed_types)] // reqwest::Client allowed here - fn new(client: reqwest::Client) -> Self { + fn new(client: deno_fetch::Client) -> Self { Self { client, _unsend_marker: deno_core::unsync::UnsendMarker::default(), } } - // todo(dsherret): don't expose `reqwest::RequestBuilder` because it - // is `Sync` and could accidentally be shared with multiple tokio runtimes - pub fn get(&self, url: impl reqwest::IntoUrl) -> reqwest::RequestBuilder { - self.client.get(url) + pub fn get(&self, url: Url) -> Result { + let body = http_body_util::Empty::new() + .map_err(|never| match never {}) + .boxed(); + let mut req = http::Request::new(body); + *req.uri_mut() = url.as_str().parse()?; + Ok(RequestBuilder { + client: self.client.clone(), + req, + }) } - pub fn post(&self, url: impl reqwest::IntoUrl) -> reqwest::RequestBuilder { - self.client.post(url) + pub fn post( + &self, + url: Url, + body: deno_fetch::ReqBody, + ) -> Result { + let mut req = http::Request::new(body); + *req.method_mut() = http::Method::POST; + *req.uri_mut() = url.as_str().parse()?; + Ok(RequestBuilder { + client: self.client.clone(), + req, + }) + } + + pub fn post_json( + &self, + url: Url, + ser: &S, + ) -> Result + where + S: serde::Serialize, + { + let json = deno_core::serde_json::to_vec(ser)?; + let body = http_body_util::Full::new(json.into()) + .map_err(|never| match never {}) + .boxed(); + let builder = self.post(url, body)?; + Ok(builder.header( + http::header::CONTENT_TYPE, + "application/json".parse().map_err(http::Error::from)?, + )) } /// Asynchronously fetches the given HTTP URL one pass only. 
@@ -322,27 +364,35 @@ impl HttpClient { &self, args: FetchOnceArgs<'a>, ) -> Result { - let mut request = self.client.get(args.url.clone()); + let body = http_body_util::Empty::new() + .map_err(|never| match never {}) + .boxed(); + let mut request = http::Request::new(body); + *request.uri_mut() = args.url.as_str().parse()?; if let Some(etag) = args.maybe_etag { let if_none_match_val = HeaderValue::from_str(&etag)?; - request = request.header(IF_NONE_MATCH, if_none_match_val); + request + .headers_mut() + .insert(IF_NONE_MATCH, if_none_match_val); } if let Some(auth_token) = args.maybe_auth_token { let authorization_val = HeaderValue::from_str(&auth_token.to_string())?; - request = request.header(AUTHORIZATION, authorization_val); + request + .headers_mut() + .insert(AUTHORIZATION, authorization_val); } if let Some(accept) = args.maybe_accept { let accepts_val = HeaderValue::from_str(&accept)?; - request = request.header(ACCEPT, accepts_val); + request.headers_mut().insert(ACCEPT, accepts_val); } - let response = match request.send().await { + let response = match self.client.clone().send(request).await { Ok(resp) => resp, Err(err) => { - if err.is_connect() || err.is_timeout() { + if is_error_connect(&err) { return Ok(FetchOnceResult::RequestError(err.to_string())); } - return Err(err.into()); + return Err(err); } }; @@ -406,18 +456,12 @@ impl HttpClient { Ok(FetchOnceResult::Code(body, result_headers)) } - pub async fn download_text( - &self, - url: impl reqwest::IntoUrl, - ) -> Result { + pub async fn download_text(&self, url: Url) -> Result { let bytes = self.download(url).await?; Ok(String::from_utf8(bytes)?) 
} - pub async fn download( - &self, - url: impl reqwest::IntoUrl, - ) -> Result, AnyError> { + pub async fn download(&self, url: Url) -> Result, AnyError> { let maybe_bytes = self.download_inner(url, None, None).await?; match maybe_bytes { Some(bytes) => Ok(bytes), @@ -427,7 +471,7 @@ impl HttpClient { pub async fn download_with_progress( &self, - url: impl reqwest::IntoUrl, + url: Url, maybe_header: Option<(HeaderName, HeaderValue)>, progress_guard: &UpdateGuard, ) -> Result>, DownloadError> { @@ -438,26 +482,26 @@ impl HttpClient { pub async fn get_redirected_url( &self, - url: impl reqwest::IntoUrl, + url: Url, maybe_header: Option<(HeaderName, HeaderValue)>, ) -> Result { - let response = self.get_redirected_response(url, maybe_header).await?; - Ok(response.url().clone()) + let (_, url) = self.get_redirected_response(url, maybe_header).await?; + Ok(url) } async fn download_inner( &self, - url: impl reqwest::IntoUrl, + url: Url, maybe_header: Option<(HeaderName, HeaderValue)>, progress_guard: Option<&UpdateGuard>, ) -> Result>, DownloadError> { - let response = self.get_redirected_response(url, maybe_header).await?; + let (response, _) = self.get_redirected_response(url, maybe_header).await?; if response.status() == 404 { return Ok(None); } else if !response.status().is_success() { let status = response.status(); - let maybe_response_text = response.text().await.ok(); + let maybe_response_text = body_to_string(response).await.ok(); return Err(DownloadError::BadResponse(BadResponseError { status_code: status, response_text: maybe_response_text @@ -469,60 +513,77 @@ impl HttpClient { get_response_body_with_progress(response, progress_guard) .await .map(Some) - .map_err(Into::into) + .map_err(DownloadError::Fetch) } async fn get_redirected_response( &self, - url: impl reqwest::IntoUrl, + mut url: Url, mut maybe_header: Option<(HeaderName, HeaderValue)>, - ) -> Result { - let mut url = url.into_url()?; - let mut builder = self.get(url.clone()); + ) -> 
Result<(http::Response, Url), DownloadError> { + let mut req = self.get(url.clone())?.build(); if let Some((header_name, header_value)) = maybe_header.as_ref() { - builder = builder.header(header_name, header_value); + req.headers_mut().append(header_name, header_value.clone()); } - let mut response = builder.send().await?; + let mut response = self + .client + .clone() + .send(req) + .await + .map_err(DownloadError::Fetch)?; let status = response.status(); if status.is_redirection() { for _ in 0..5 { let new_url = resolve_redirect_from_response(&url, &response)?; - let mut builder = self.get(new_url.clone()); + let mut req = self.get(new_url.clone())?.build(); if new_url.origin() == url.origin() { if let Some((header_name, header_value)) = maybe_header.as_ref() { - builder = builder.header(header_name, header_value); + req.headers_mut().append(header_name, header_value.clone()); } } else { maybe_header = None; } - let new_response = builder.send().await?; + let new_response = self + .client + .clone() + .send(req) + .await + .map_err(DownloadError::Fetch)?; let status = new_response.status(); if status.is_redirection() { response = new_response; url = new_url; } else { - return Ok(new_response); + return Ok((new_response, new_url)); } } Err(DownloadError::TooManyRedirects) } else { - Ok(response) + Ok((response, url)) } } } +fn is_error_connect(err: &AnyError) -> bool { + err + .downcast_ref::() + .map(|err| err.is_connect()) + .unwrap_or(false) +} + async fn get_response_body_with_progress( - response: reqwest::Response, + response: http::Response, progress_guard: Option<&UpdateGuard>, -) -> Result, reqwest::Error> { +) -> Result, AnyError> { + use http_body::Body as _; if let Some(progress_guard) = progress_guard { - if let Some(total_size) = response.content_length() { + if let Some(total_size) = response.body().size_hint().exact() { progress_guard.set_total_size(total_size); let mut current_size = 0; let mut data = Vec::with_capacity(total_size as usize); - 
let mut stream = response.bytes_stream(); + let mut stream = response.into_body().into_data_stream(); while let Some(item) = stream.next().await { let bytes = item?; current_size += bytes.len() as u64; @@ -532,7 +593,7 @@ async fn get_response_body_with_progress( return Ok(data); } } - let bytes = response.bytes().await?; + let bytes = response.collect().await?.to_bytes(); Ok(bytes.into()) } @@ -563,9 +624,9 @@ fn resolve_url_from_location(base_url: &Url, location: &str) -> Url { } } -fn resolve_redirect_from_response( +fn resolve_redirect_from_response( request_url: &Url, - response: &reqwest::Response, + response: &http::Response, ) -> Result { debug_assert!(response.status().is_redirection()); if let Some(location) = response.headers().get(LOCATION) { @@ -580,6 +641,49 @@ fn resolve_redirect_from_response( } } +pub async fn body_to_string(body: B) -> Result +where + B: http_body::Body, + AnyError: From, +{ + let bytes = body.collect().await?.to_bytes(); + let s = std::str::from_utf8(&bytes)?; + Ok(s.into()) +} + +pub async fn body_to_json(body: B) -> Result +where + B: http_body::Body, + AnyError: From, + D: serde::de::DeserializeOwned, +{ + let bytes = body.collect().await?.to_bytes(); + let val = deno_core::serde_json::from_slice(&bytes)?; + Ok(val) +} + +pub struct RequestBuilder { + client: deno_fetch::Client, + req: http::Request, +} + +impl RequestBuilder { + pub fn header(mut self, name: HeaderName, value: HeaderValue) -> Self { + self.req.headers_mut().append(name, value); + self + } + + pub async fn send( + self, + ) -> Result, AnyError> { + self.client.send(self.req).await + } + + pub fn build(self) -> http::Request { + self.req + } +} + #[allow(clippy::print_stdout)] #[allow(clippy::print_stderr)] #[cfg(test)] @@ -600,14 +704,20 @@ mod test { // make a request to the redirect server let text = client - .download_text("http://localhost:4546/subdir/redirects/redirect1.js") + .download_text( + 
Url::parse("http://localhost:4546/subdir/redirects/redirect1.js") + .unwrap(), + ) .await .unwrap(); assert_eq!(text, "export const redirect = 1;\n"); // now make one to the infinite redirects server let err = client - .download_text("http://localhost:4549/subdir/redirects/redirect1.js") + .download_text( + Url::parse("http://localhost:4549/subdir/redirects/redirect1.js") + .unwrap(), + ) .await .err() .unwrap(); diff --git a/cli/npm/common.rs b/cli/npm/common.rs index c55f73cd50..34835216c9 100644 --- a/cli/npm/common.rs +++ b/cli/npm/common.rs @@ -1,7 +1,7 @@ // Copyright 2018-2024 the Deno authors. All rights reserved. MIT license. use deno_npm::npm_rc::RegistryConfig; -use reqwest::header; +use http::header; // TODO(bartlomieju): support more auth methods besides token and basic auth pub fn maybe_auth_header_for_npm_registry( diff --git a/cli/npm/managed/cache/tarball.rs b/cli/npm/managed/cache/tarball.rs index 46186b87c7..eec890bed2 100644 --- a/cli/npm/managed/cache/tarball.rs +++ b/cli/npm/managed/cache/tarball.rs @@ -11,12 +11,12 @@ use deno_core::error::AnyError; use deno_core::futures::future::LocalBoxFuture; use deno_core::futures::FutureExt; use deno_core::parking_lot::Mutex; +use deno_core::url::Url; use deno_npm::npm_rc::ResolvedNpmRc; use deno_npm::registry::NpmPackageVersionDistInfo; use deno_runtime::deno_fs::FileSystem; use deno_semver::package::PackageNv; -use reqwest::StatusCode; -use reqwest::Url; +use http::StatusCode; use crate::args::CacheSetting; use crate::http_util::DownloadError; diff --git a/cli/standalone/binary.rs b/cli/standalone/binary.rs index 10a7620933..342c637d54 100644 --- a/cli/standalone/binary.rs +++ b/cli/standalone/binary.rs @@ -495,7 +495,7 @@ impl<'a> DenoCompileBinaryWriter<'a> { self .http_client_provider .get_or_create()? - .download_with_progress(download_url, None, &progress) + .download_with_progress(download_url.parse()?, None, &progress) .await? 
}; let bytes = match maybe_bytes { diff --git a/cli/tools/registry/api.rs b/cli/tools/registry/api.rs index ee9579a194..519800660c 100644 --- a/cli/tools/registry/api.rs +++ b/cli/tools/registry/api.rs @@ -1,8 +1,9 @@ // Copyright 2018-2024 the Deno authors. All rights reserved. MIT license. +use crate::http_util; use deno_core::error::AnyError; use deno_core::serde_json; -use deno_runtime::deno_fetch::reqwest; +use deno_runtime::deno_fetch; use lsp_types::Url; use serde::de::DeserializeOwned; @@ -82,7 +83,7 @@ impl std::fmt::Debug for ApiError { impl std::error::Error for ApiError {} pub async fn parse_response( - response: reqwest::Response, + response: http::Response, ) -> Result { let status = response.status(); let x_deno_ray = response @@ -90,7 +91,7 @@ pub async fn parse_response( .get("x-deno-ray") .and_then(|value| value.to_str().ok()) .map(|s| s.to_string()); - let text = response.text().await.unwrap(); + let text = http_util::body_to_string(response).await.unwrap(); if !status.is_success() { match serde_json::from_str::(&text) { @@ -122,9 +123,9 @@ pub async fn get_scope( client: &HttpClient, registry_api_url: &Url, scope: &str, -) -> Result { +) -> Result, AnyError> { let scope_url = format!("{}scopes/{}", registry_api_url, scope); - let response = client.get(&scope_url).send().await?; + let response = client.get(scope_url.parse()?)?.send().await?; Ok(response) } @@ -141,9 +142,9 @@ pub async fn get_package( registry_api_url: &Url, scope: &str, package: &str, -) -> Result { +) -> Result, AnyError> { let package_url = get_package_api_url(registry_api_url, scope, package); - let response = client.get(&package_url).send().await?; + let response = client.get(package_url.parse()?)?.send().await?; Ok(response) } diff --git a/cli/tools/registry/mod.rs b/cli/tools/registry/mod.rs index 8e4d97897e..a22384a528 100644 --- a/cli/tools/registry/mod.rs +++ b/cli/tools/registry/mod.rs @@ -23,8 +23,8 @@ use deno_core::futures::StreamExt; use deno_core::serde_json; use 
deno_core::serde_json::json; use deno_core::serde_json::Value; -use deno_runtime::deno_fetch::reqwest; use deno_terminal::colors; +use http_body_util::BodyExt; use lsp_types::Url; use serde::Deserialize; use serde::Serialize; @@ -539,11 +539,13 @@ async fn get_auth_headers( let challenge = BASE64_STANDARD.encode(sha2::Sha256::digest(&verifier)); let response = client - .post(format!("{}authorizations", registry_url)) - .json(&serde_json::json!({ - "challenge": challenge, - "permissions": permissions, - })) + .post_json( + format!("{}authorizations", registry_url).parse()?, + &serde_json::json!({ + "challenge": challenge, + "permissions": permissions, + }), + )? .send() .await .context("Failed to create interactive authorization")?; @@ -573,11 +575,13 @@ async fn get_auth_headers( loop { tokio::time::sleep(interval).await; let response = client - .post(format!("{}authorizations/exchange", registry_url)) - .json(&serde_json::json!({ - "exchangeToken": auth.exchange_token, - "verifier": verifier, - })) + .post_json( + format!("{}authorizations/exchange", registry_url).parse()?, + &serde_json::json!({ + "exchangeToken": auth.exchange_token, + "verifier": verifier, + }), + )? .send() .await .context("Failed to exchange authorization")?; @@ -634,15 +638,20 @@ async fn get_auth_headers( ); let response = client - .get(url) - .bearer_auth(&oidc_config.token) + .get(url.parse()?)? 
+ .header( + http::header::AUTHORIZATION, + format!("Bearer {}", oidc_config.token).parse()?, + ) .send() .await .context("Failed to get OIDC token")?; let status = response.status(); - let text = response.text().await.with_context(|| { - format!("Failed to get OIDC token: status {}", status) - })?; + let text = crate::http_util::body_to_string(response) + .await + .with_context(|| { + format!("Failed to get OIDC token: status {}", status) + })?; if !status.is_success() { bail!( "Failed to get OIDC token: status {}, response: '{}'", @@ -770,7 +779,7 @@ async fn ensure_scopes_and_packages_exist( loop { tokio::time::sleep(std::time::Duration::from_secs(3)).await; - let response = client.get(&package_api_url).send().await?; + let response = client.get(package_api_url.parse()?)?.send().await?; if response.status() == 200 { let name = format!("@{}/{}", package.scope, package.package); log::info!("Package {} created", colors::green(name)); @@ -894,11 +903,19 @@ async fn publish_package( package.config ); + let body = http_body_util::Full::new(package.tarball.bytes.clone()) + .map_err(|never| match never {}) + .boxed(); let response = http_client - .post(url) - .header(reqwest::header::AUTHORIZATION, authorization) - .header(reqwest::header::CONTENT_ENCODING, "gzip") - .body(package.tarball.bytes.clone()) + .post(url.parse()?, body)? + .header( + http::header::AUTHORIZATION, + authorization.parse().map_err(http::Error::from)?, + ) + .header( + http::header::CONTENT_ENCODING, + "gzip".parse().map_err(http::Error::from)?, + ) .send() .await?; @@ -943,7 +960,7 @@ async fn publish_package( while task.status != "success" && task.status != "failure" { tokio::time::sleep(interval).await; let resp = http_client - .get(format!("{}publish_status/{}", registry_api_url, task.id)) + .get(format!("{}publish_status/{}", registry_api_url, task.id).parse()?)? 
.send() .await .with_context(|| { @@ -992,7 +1009,8 @@ async fn publish_package( package.scope, package.package, package.version ))?; - let meta_bytes = http_client.get(meta_url).send().await?.bytes().await?; + let resp = http_client.get(meta_url)?.send().await?; + let meta_bytes = resp.collect().await?.to_bytes(); if std::env::var("DISABLE_JSR_MANIFEST_VERIFICATION_FOR_TESTING").is_err() { verify_version_manifest(&meta_bytes, &package)?; @@ -1023,9 +1041,8 @@ async fn publish_package( registry_api_url, package.scope, package.package, package.version ); http_client - .post(provenance_url) - .header(reqwest::header::AUTHORIZATION, authorization) - .json(&json!({ "bundle": bundle })) + .post_json(provenance_url.parse()?, &json!({ "bundle": bundle }))? + .header(http::header::AUTHORIZATION, authorization.parse()?) .send() .await?; } diff --git a/cli/tools/registry/provenance.rs b/cli/tools/registry/provenance.rs index 622e483d6e..ce3d6ff8a8 100644 --- a/cli/tools/registry/provenance.rs +++ b/cli/tools/registry/provenance.rs @@ -1,5 +1,6 @@ // Copyright 2018-2024 the Deno authors. All rights reserved. MIT license. +use crate::http_util; use crate::http_util::HttpClient; use super::api::OidcTokenResponse; @@ -12,6 +13,8 @@ use deno_core::anyhow; use deno_core::anyhow::bail; use deno_core::error::AnyError; use deno_core::serde_json; +use deno_core::url::Url; +use http_body_util::BodyExt; use once_cell::sync::Lazy; use p256::elliptic_curve; use p256::pkcs8::AssociatedOid; @@ -504,12 +507,12 @@ impl<'a> FulcioSigner<'a> { let response = self .http_client - .post(url) - .json(&request_body) + .post_json(url.parse()?, &request_body)? 
.send() .await?; - let body: SigningCertificateResponse = response.json().await?; + let body: SigningCertificateResponse = + http_util::body_to_json(response).await?; let key = body .signed_certificate_embedded_sct @@ -527,15 +530,23 @@ impl<'a> FulcioSigner<'a> { bail!("No OIDC token available"); }; - let res = self + let mut url = req_url.parse::()?; + url.query_pairs_mut().append_pair("audience", aud); + let res_bytes = self .http_client - .get(&req_url) - .bearer_auth(token) - .query(&[("audience", aud)]) + .get(url)? + .header( + http::header::AUTHORIZATION, + format!("Bearer {}", token) + .parse() + .map_err(http::Error::from)?, + ) .send() .await? - .json::() - .await?; + .collect() + .await? + .to_bytes(); + let res: OidcTokenResponse = serde_json::from_slice(&res_bytes)?; Ok(res.value) } } @@ -685,11 +696,10 @@ async fn testify( let url = format!("{}/api/v1/log/entries", *DEFAULT_REKOR_URL); let res = http_client - .post(&url) - .json(&proposed_intoto_entry) + .post_json(url.parse()?, &proposed_intoto_entry)? .send() .await?; - let body: RekorEntry = res.json().await?; + let body: RekorEntry = http_util::body_to_json(res).await?; Ok(body) } diff --git a/cli/tools/test/mod.rs b/cli/tools/test/mod.rs index 81dc36a892..587b737d63 100644 --- a/cli/tools/test/mod.rs +++ b/cli/tools/test/mod.rs @@ -881,12 +881,11 @@ async fn run_tests_for_worker_inner( // failing. If we don't do this, a connection to a test server we just tore down might be re-used in // the next test. 
// TODO(mmastrac): this should be some sort of callback that we can implement for any subsystem - #[allow(clippy::disallowed_types)] // allow using reqwest::Client here worker .js_runtime .op_state() .borrow_mut() - .try_take::(); + .try_take::(); if desc.ignore { send_test_event( diff --git a/cli/tools/upgrade.rs b/cli/tools/upgrade.rs index 2afeffc92c..fd8394efa9 100644 --- a/cli/tools/upgrade.rs +++ b/cli/tools/upgrade.rs @@ -571,7 +571,7 @@ async fn get_latest_version( check_kind: UpgradeCheckKind, ) -> Result { let url = get_url(release_kind, env!("TARGET"), check_kind); - let text = client.download_text(url).await?; + let text = client.download_text(url.parse()?).await?; Ok(normalize_version_from_server(release_kind, &text)) } @@ -624,7 +624,7 @@ async fn download_package( // text above which will stay alive after the progress bars are complete let progress = progress_bar.update(""); client - .download_with_progress(download_url, None, &progress) + .download_with_progress(download_url.parse()?, None, &progress) .await? 
}; match maybe_bytes { diff --git a/ext/fetch/Cargo.toml b/ext/fetch/Cargo.toml index 8785da7df3..dc7cacd37e 100644 --- a/ext/fetch/Cargo.toml +++ b/ext/fetch/Cargo.toml @@ -14,6 +14,7 @@ description = "Fetch API implementation for Deno" path = "lib.rs" [dependencies] +base64.workspace = true bytes.workspace = true data-url.workspace = true deno_core.workspace = true @@ -21,8 +22,17 @@ deno_permissions.workspace = true deno_tls.workspace = true dyn-clone = "1" http.workspace = true -reqwest.workspace = true +http-body-util.workspace = true +hyper.workspace = true +hyper-rustls.workspace = true +hyper-util.workspace = true +ipnet.workspace = true serde.workspace = true serde_json.workspace = true tokio.workspace = true +tokio-rustls.workspace = true +tokio-socks.workspace = true tokio-util = { workspace = true, features = ["io"] } +tower.workspace = true +tower-http.workspace = true +tower-service.workspace = true diff --git a/ext/fetch/fs_fetch_handler.rs b/ext/fetch/fs_fetch_handler.rs index 29bad5992b..6f45ee664e 100644 --- a/ext/fetch/fs_fetch_handler.rs +++ b/ext/fetch/fs_fetch_handler.rs @@ -7,10 +7,12 @@ use crate::FetchHandler; use deno_core::error::type_error; use deno_core::futures::FutureExt; use deno_core::futures::TryFutureExt; +use deno_core::futures::TryStreamExt; use deno_core::url::Url; use deno_core::CancelFuture; use deno_core::OpState; -use reqwest::StatusCode; +use http::StatusCode; +use http_body_util::BodyExt; use std::rc::Rc; use tokio_util::io::ReaderStream; @@ -23,19 +25,21 @@ impl FetchHandler for FsFetchHandler { fn fetch_file( &self, _state: &mut OpState, - url: Url, + url: &Url, ) -> (CancelableResponseFuture, Option>) { let cancel_handle = CancelHandle::new_rc(); + let path_result = url.to_file_path(); let response_fut = async move { - let path = url.to_file_path()?; + let path = path_result?; let file = tokio::fs::File::open(path).map_err(|_| ()).await?; - let stream = ReaderStream::new(file); - let body = 
reqwest::Body::wrap_stream(stream); + let stream = ReaderStream::new(file) + .map_ok(hyper::body::Frame::data) + .map_err(Into::into); + let body = http_body_util::StreamBody::new(stream).boxed(); let response = http::Response::builder() .status(StatusCode::OK) .body(body) - .map_err(|_| ())? - .into(); + .map_err(|_| ())?; Ok::<_, ()>(response) } .map_err(move |_| { diff --git a/ext/fetch/lib.rs b/ext/fetch/lib.rs index 75ceb86d99..1343a9f56e 100644 --- a/ext/fetch/lib.rs +++ b/ext/fetch/lib.rs @@ -1,6 +1,7 @@ // Copyright 2018-2024 the Deno authors. All rights reserved. MIT license. mod fs_fetch_handler; +mod proxy; use std::borrow::Cow; use std::cell::RefCell; @@ -14,7 +15,7 @@ use std::sync::Arc; use std::task::Context; use std::task::Poll; -use bytes::Bytes; +use deno_core::anyhow::anyhow; use deno_core::anyhow::Error; use deno_core::error::type_error; use deno_core::error::AnyError; @@ -42,34 +43,38 @@ use deno_core::ResourceId; use deno_tls::rustls::RootCertStore; use deno_tls::Proxy; use deno_tls::RootCertStoreProvider; - -use data_url::DataUrl; use deno_tls::TlsKey; use deno_tls::TlsKeys; use deno_tls::TlsKeysHolder; + +use bytes::Bytes; +use data_url::DataUrl; +use http::header::HeaderName; +use http::header::HeaderValue; +use http::header::ACCEPT_ENCODING; use http::header::CONTENT_LENGTH; +use http::header::HOST; +use http::header::PROXY_AUTHORIZATION; +use http::header::RANGE; +use http::header::USER_AGENT; +use http::Method; use http::Uri; -use reqwest::header::HeaderMap; -use reqwest::header::HeaderName; -use reqwest::header::HeaderValue; -use reqwest::header::ACCEPT_ENCODING; -use reqwest::header::HOST; -use reqwest::header::RANGE; -use reqwest::header::USER_AGENT; -use reqwest::redirect::Policy; -use reqwest::Body; -use reqwest::Client; -use reqwest::Method; -use reqwest::RequestBuilder; -use reqwest::Response; +use http_body_util::BodyExt; +use hyper::body::Frame; +use hyper_rustls::HttpsConnector; +use 
hyper_util::client::legacy::connect::HttpConnector; +use hyper_util::rt::TokioExecutor; +use hyper_util::rt::TokioIo; +use hyper_util::rt::TokioTimer; use serde::Deserialize; use serde::Serialize; use tokio::io::AsyncReadExt; use tokio::io::AsyncWriteExt; +use tower::ServiceExt; +use tower_http::decompression::Decompression; -// Re-export reqwest and data_url +// Re-export data_url pub use data_url; -pub use reqwest; pub use fs_fetch_handler::FsFetchHandler; @@ -78,8 +83,9 @@ pub struct Options { pub user_agent: String, pub root_cert_store_provider: Option>, pub proxy: Option, + #[allow(clippy::type_complexity)] pub request_builder_hook: - Option Result>, + Option) -> Result<(), AnyError>>, pub unsafely_ignore_certificate_errors: Option>, pub client_cert_chain_and_key: TlsKeys, pub file_fetch_handler: Rc, @@ -146,7 +152,7 @@ pub trait FetchHandler: dyn_clone::DynClone { fn fetch_file( &self, state: &mut OpState, - url: Url, + url: &Url, ) -> (CancelableResponseFuture, Option>); } @@ -160,7 +166,7 @@ impl FetchHandler for DefaultFileFetchHandler { fn fetch_file( &self, _state: &mut OpState, - _url: Url, + _url: &Url, ) -> (CancelableResponseFuture, Option>) { let fut = async move { Ok(Err(type_error( @@ -183,20 +189,20 @@ pub struct FetchReturn { pub fn get_or_create_client_from_state( state: &mut OpState, -) -> Result { - if let Some(client) = state.try_borrow::() { +) -> Result { + if let Some(client) = state.try_borrow::() { Ok(client.clone()) } else { let options = state.borrow::(); let client = create_client_from_options(options)?; - state.put::(client.clone()); + state.put::(client.clone()); Ok(client) } } pub fn create_client_from_options( options: &Options, -) -> Result { +) -> Result { create_http_client( &options.user_agent, CreateHttpClientOptions { @@ -253,11 +259,11 @@ impl Stream for ResourceToBodyAdapter { } Poll::Ready(res) => match res { Ok(buf) if buf.is_empty() => Poll::Ready(None), - Ok(_) => { + Ok(buf) => { this.1 = Some(this.0.clone().read(64 
* 1024)); - Poll::Ready(Some(res.map(|b| b.to_vec().into()))) + Poll::Ready(Some(Ok(buf.to_vec().into()))) } - _ => Poll::Ready(Some(res.map(|b| b.to_vec().into()))), + Err(err) => Poll::Ready(Some(Err(err))), }, } } else { @@ -266,6 +272,22 @@ impl Stream for ResourceToBodyAdapter { } } +impl hyper::body::Body for ResourceToBodyAdapter { + type Data = Bytes; + type Error = Error; + + fn poll_frame( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll, Self::Error>>> { + match self.poll_next(cx) { + Poll::Ready(Some(res)) => Poll::Ready(Some(res.map(Frame::data))), + Poll::Ready(None) => Poll::Ready(None), + Poll::Pending => Poll::Pending, + } + } +} + impl Drop for ResourceToBodyAdapter { fn drop(&mut self) { self.0.clone().close() @@ -347,9 +369,11 @@ where file_fetch_handler, .. } = state.borrow_mut::(); let file_fetch_handler = file_fetch_handler.clone(); - let (request, maybe_cancel_handle) = - file_fetch_handler.fetch_file(state, url); - let request_rid = state.resource_table.add(FetchRequestResource(request)); + let (future, maybe_cancel_handle) = + file_fetch_handler.fetch_file(state, &url); + let request_rid = state + .resource_table + .add(FetchRequestResource { future, url }); let maybe_cancel_handle_rid = maybe_cancel_handle .map(|ch| state.resource_table.add(FetchCancelHandle(ch))); @@ -359,31 +383,31 @@ where let permissions = state.borrow_mut::(); permissions.check_net_url(&url, "fetch()")?; - // Make sure that we have a valid URI early, as reqwest's `RequestBuilder::send` - // internally uses `expect_uri`, which panics instead of returning a usable `Result`. 
- if url.as_str().parse::().is_err() { - return Err(type_error("Invalid URL")); - } + let uri = url + .as_str() + .parse::() + .map_err(|_| type_error("Invalid URL"))?; - let mut request = client.request(method.clone(), url); - - if has_body { + let mut con_len = None; + let body = if has_body { match (data, resource) { (Some(data), _) => { // If a body is passed, we use it, and don't return a body for streaming. - request = request.body(data.to_vec()); + con_len = Some(data.len() as u64); + + http_body_util::Full::new(data.to_vec().into()) + .map_err(|never| match never {}) + .boxed() } (_, Some(resource)) => { let resource = state.resource_table.take_any(resource)?; match resource.size_hint() { (body_size, Some(n)) if body_size == n && body_size > 0 => { - request = - request.header(CONTENT_LENGTH, HeaderValue::from(body_size)); + con_len = Some(body_size); } _ => {} } - request = request - .body(Body::wrap_stream(ResourceToBodyAdapter::new(resource))) + ReqBody::new(ResourceToBodyAdapter::new(resource)) } (None, None) => unreachable!(), } @@ -391,11 +415,21 @@ where // POST and PUT requests should always have a 0 length content-length, // if there is no body. 
https://fetch.spec.whatwg.org/#http-network-or-cache-fetch if matches!(method, Method::POST | Method::PUT) { - request = request.header(CONTENT_LENGTH, HeaderValue::from(0)); + con_len = Some(0); } + http_body_util::Empty::new() + .map_err(|never| match never {}) + .boxed() }; - let mut header_map = HeaderMap::new(); + let mut request = http::Request::new(body); + *request.method_mut() = method.clone(); + *request.uri_mut() = uri; + + if let Some(len) = con_len { + request.headers_mut().insert(CONTENT_LENGTH, len.into()); + } + for (key, value) in headers { let name = HeaderName::from_bytes(&key) .map_err(|err| type_error(err.to_string()))?; @@ -403,38 +437,34 @@ where .map_err(|err| type_error(err.to_string()))?; if (name != HOST || allow_host) && name != CONTENT_LENGTH { - header_map.append(name, v); + request.headers_mut().append(name, v); } } - if header_map.contains_key(RANGE) { + if request.headers().contains_key(RANGE) { // https://fetch.spec.whatwg.org/#http-network-or-cache-fetch step 18 // If httpRequest’s header list contains `Range`, then append (`Accept-Encoding`, `identity`) - header_map + request + .headers_mut() .insert(ACCEPT_ENCODING, HeaderValue::from_static("identity")); } - request = request.headers(header_map); let options = state.borrow::(); if let Some(request_builder_hook) = options.request_builder_hook { - request = request_builder_hook(request) + request_builder_hook(&mut request) .map_err(|err| type_error(err.to_string()))?; } let cancel_handle = CancelHandle::new_rc(); let cancel_handle_ = cancel_handle.clone(); - let fut = async move { - request - .send() - .or_cancel(cancel_handle_) - .await - .map(|res| res.map_err(|err| err.into())) - }; + let fut = + async move { client.send(request).or_cancel(cancel_handle_).await }; - let request_rid = state - .resource_table - .add(FetchRequestResource(Box::pin(fut))); + let request_rid = state.resource_table.add(FetchRequestResource { + future: Box::pin(fut), + url, + }); let cancel_handle_rid 
= state.resource_table.add(FetchCancelHandle(cancel_handle)); @@ -448,17 +478,21 @@ where let (body, _) = data_url .decode_to_vec() .map_err(|e| type_error(format!("{e:?}")))?; + let body = http_body_util::Full::new(body.into()) + .map_err(|never| match never {}) + .boxed(); let response = http::Response::builder() .status(http::StatusCode::OK) .header(http::header::CONTENT_TYPE, data_url.mime_type().to_string()) - .body(reqwest::Body::from(body))?; + .body(body)?; - let fut = async move { Ok(Ok(Response::from(response))) }; + let fut = async move { Ok(Ok(response)) }; - let request_rid = state - .resource_table - .add(FetchRequestResource(Box::pin(fut))); + let request_rid = state.resource_table.add(FetchRequestResource { + future: Box::pin(fut), + url, + }); (request_rid, None) } @@ -505,24 +539,21 @@ pub async fn op_fetch_send( .ok() .expect("multiple op_fetch_send ongoing"); - let res = match request.0.await { + let res = match request.future.await { Ok(Ok(res)) => res, Ok(Err(err)) => { // We're going to try and rescue the error cause from a stream and return it from this fetch. - // If any error in the chain is a reqwest body error, return that as a special result we can use to + // If any error in the chain is a hyper body error, return that as a special result we can use to // reconstruct an error chain (eg: `new TypeError(..., { cause: new Error(...) })`). 
// TODO(mmastrac): it would be a lot easier if we just passed a v8::Global through here instead let mut err_ref: &dyn std::error::Error = err.as_ref(); while let Some(err) = std::error::Error::source(err_ref) { - if let Some(err) = err.downcast_ref::() { - if err.is_body() { - // Extracts the next error cause and uses that for the message - if let Some(err) = std::error::Error::source(err) { - return Ok(FetchResponse { - error: Some(err.to_string()), - ..Default::default() - }); - } + if let Some(err) = err.downcast_ref::() { + if let Some(err) = std::error::Error::source(err) { + return Ok(FetchResponse { + error: Some(err.to_string()), + ..Default::default() + }); } } err_ref = err; @@ -534,14 +565,17 @@ pub async fn op_fetch_send( }; let status = res.status(); - let url = res.url().to_string(); + let url = request.url.into(); let mut res_headers = Vec::new(); for (key, val) in res.headers().iter() { res_headers.push((key.as_str().into(), val.as_bytes().into())); } - let content_length = res.content_length(); - let remote_addr = res.remote_addr(); + let content_length = hyper::body::Body::size_hint(res.body()).exact(); + let remote_addr = res + .extensions() + .get::() + .map(|info| info.remote_addr()); let (remote_addr_ip, remote_addr_port) = if let Some(addr) = remote_addr { (Some(addr.ip().to_string()), Some(addr.port())) } else { @@ -585,7 +619,8 @@ pub async fn op_fetch_response_upgrade( let upgraded = raw_response.upgrade().await?; { // Stage 3: Pump the data - let (mut upgraded_rx, mut upgraded_tx) = tokio::io::split(upgraded); + let (mut upgraded_rx, mut upgraded_tx) = + tokio::io::split(TokioIo::new(upgraded)); spawn(async move { let mut buf = [0; 1024]; @@ -673,11 +708,13 @@ impl Resource for UpgradeStream { } } -type CancelableResponseResult = Result, Canceled>; +type CancelableResponseResult = + Result, AnyError>, Canceled>; -pub struct FetchRequestResource( - pub Pin>>, -); +pub struct FetchRequestResource { + pub future: Pin>>, + pub url: Url, +} 
impl Resource for FetchRequestResource { fn name(&self) -> Cow { @@ -701,7 +738,7 @@ type BytesStream = Pin> + Unpin>>; pub enum FetchResponseReader { - Start(Response), + Start(http::Response), BodyReader(Peekable), } @@ -719,7 +756,7 @@ pub struct FetchResponseResource { } impl FetchResponseResource { - pub fn new(response: Response, size: Option) -> Self { + pub fn new(response: http::Response, size: Option) -> Self { Self { response_reader: AsyncRefCell::new(FetchResponseReader::Start(response)), cancel: CancelHandle::default(), @@ -727,10 +764,10 @@ impl FetchResponseResource { } } - pub async fn upgrade(self) -> Result { + pub async fn upgrade(self) -> Result { let reader = self.response_reader.into_inner(); match reader { - FetchResponseReader::Start(resp) => Ok(resp.upgrade().await?), + FetchResponseReader::Start(resp) => Ok(hyper::upgrade::on(resp).await?), _ => unreachable!(), } } @@ -754,11 +791,12 @@ impl Resource for FetchResponseResource { match std::mem::take(&mut *reader) { FetchResponseReader::Start(resp) => { - let stream: BytesStream = Box::pin(resp.bytes_stream().map(|r| { - r.map_err(|err| { - std::io::Error::new(std::io::ErrorKind::Other, err) - }) - })); + let stream: BytesStream = + Box::pin(resp.into_body().into_data_stream().map(|r| { + r.map_err(|err| { + std::io::Error::new(std::io::ErrorKind::Other, err) + }) + })); *reader = FetchResponseReader::BodyReader(stream.peekable()); } FetchResponseReader::BodyReader(_) => unreachable!(), @@ -922,7 +960,7 @@ impl Default for CreateHttpClientOptions { } } -/// Create new instance of async reqwest::Client. This client supports +/// Create new instance of async Client. This client supports /// proxies and doesn't follow redirects. 
pub fn create_http_client( user_agent: &str, @@ -944,43 +982,64 @@ pub fn create_http_client( alpn_protocols.push("http/1.1".into()); } tls_config.alpn_protocols = alpn_protocols; + let tls_config = Arc::from(tls_config); - let mut headers = HeaderMap::new(); - headers.insert(USER_AGENT, user_agent.parse().unwrap()); - let mut builder = Client::builder() - .redirect(Policy::none()) - .default_headers(headers) - .use_preconfigured_tls(tls_config); + let mut http_connector = HttpConnector::new(); + http_connector.enforce_http(false); + let connector = HttpsConnector::from((http_connector, tls_config.clone())); + let user_agent = user_agent + .parse::() + .map_err(|_| type_error("illegal characters in User-Agent"))?; + + let mut builder = + hyper_util::client::legacy::Builder::new(TokioExecutor::new()); + builder.timer(TokioTimer::new()); + builder.pool_timer(TokioTimer::new()); + + let mut proxies = proxy::from_env(); if let Some(proxy) = options.proxy { - let mut reqwest_proxy = reqwest::Proxy::all(&proxy.url)?; + let mut intercept = proxy::Intercept::all(&proxy.url) + .ok_or_else(|| type_error("invalid proxy url"))?; if let Some(basic_auth) = &proxy.basic_auth { - reqwest_proxy = - reqwest_proxy.basic_auth(&basic_auth.username, &basic_auth.password); + intercept.set_auth(&basic_auth.username, &basic_auth.password); } - builder = builder.proxy(reqwest_proxy); + proxies.prepend(intercept); } + let proxies = Arc::new(proxies); + let mut connector = + proxy::ProxyConnector::new(proxies.clone(), connector, tls_config); + connector.user_agent(user_agent.clone()); if let Some(pool_max_idle_per_host) = options.pool_max_idle_per_host { - builder = builder.pool_max_idle_per_host(pool_max_idle_per_host); + builder.pool_max_idle_per_host(pool_max_idle_per_host); } if let Some(pool_idle_timeout) = options.pool_idle_timeout { - builder = builder.pool_idle_timeout( + builder.pool_idle_timeout( pool_idle_timeout.map(std::time::Duration::from_millis), ); } match (options.http1, 
options.http2) { - (true, false) => builder = builder.http1_only(), - (false, true) => builder = builder.http2_prior_knowledge(), + (true, false) => {} // noop, handled by ALPN above + (false, true) => { + builder.http2_only(true); + } (true, true) => {} (false, false) => { return Err(type_error("Either `http1` or `http2` needs to be true")) } } - builder.build().map_err(|e| e.into()) + let pooled_client = builder.build(connector); + let decompress = Decompression::new(pooled_client).gzip(true).br(true); + + Ok(Client { + inner: decompress, + proxies, + user_agent, + }) } #[op2] @@ -990,3 +1049,35 @@ pub fn op_utf8_to_byte_string( ) -> Result { Ok(input.into()) } + +#[derive(Clone, Debug)] +pub struct Client { + inner: Decompression>, + // Used to check whether to include a proxy-authorization header + proxies: Arc, + user_agent: HeaderValue, +} + +type Connector = proxy::ProxyConnector>; + +impl Client { + pub async fn send( + self, + mut req: http::Request, + ) -> Result, AnyError> { + req + .headers_mut() + .entry(USER_AGENT) + .or_insert_with(|| self.user_agent.clone()); + + if let Some(auth) = self.proxies.http_forward_auth(req.uri()) { + req.headers_mut().insert(PROXY_AUTHORIZATION, auth.clone()); + } + + let resp = self.inner.oneshot(req).await?; + Ok(resp.map(|b| b.map_err(|e| anyhow!(e)).boxed())) + } +} + +pub type ReqBody = http_body_util::combinators::BoxBody; +pub type ResBody = http_body_util::combinators::BoxBody; diff --git a/ext/fetch/proxy.rs b/ext/fetch/proxy.rs new file mode 100644 index 0000000000..db187c3f68 --- /dev/null +++ b/ext/fetch/proxy.rs @@ -0,0 +1,860 @@ +// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license. + +//! Parts of this module should be able to be replaced with other crates +//! eventually, once generic versions appear in hyper-util, et al. 
+ +use std::env; +use std::future::Future; +use std::net::IpAddr; +use std::pin::Pin; +use std::sync::Arc; +use std::task::Context; +use std::task::Poll; + +use deno_core::futures::TryFutureExt; +use deno_tls::rustls::ClientConfig as TlsConfig; + +use http::header::HeaderValue; +use http::uri::Scheme; +use http::Uri; +use hyper_util::client::legacy::connect::Connected; +use hyper_util::client::legacy::connect::Connection; +use hyper_util::rt::TokioIo; +use ipnet::IpNet; +use tokio::net::TcpStream; +use tokio_rustls::client::TlsStream; +use tokio_rustls::TlsConnector; +use tokio_socks::tcp::Socks5Stream; +use tower_service::Service; + +#[derive(Debug, Clone)] +pub(crate) struct ProxyConnector { + connector: C, + proxies: Arc, + tls: Arc, + user_agent: Option, +} + +#[derive(Debug)] +pub(crate) struct Proxies { + no: Option, + intercepts: Vec, +} + +#[derive(Clone)] +pub(crate) struct Intercept { + filter: Filter, + target: Target, +} + +#[derive(Clone)] +enum Target { + Http { + dst: Uri, + auth: Option, + }, + Https { + dst: Uri, + auth: Option, + }, + Socks { + dst: Uri, + auth: Option<(String, String)>, + }, +} + +#[derive(Debug, Clone, Copy)] +enum Filter { + Http, + Https, + All, +} + +pub(crate) fn from_env() -> Proxies { + let mut intercepts = Vec::new(); + + if let Some(proxy) = parse_env_var("ALL_PROXY", Filter::All) { + intercepts.push(proxy); + } else if let Some(proxy) = parse_env_var("all_proxy", Filter::All) { + intercepts.push(proxy); + } + + if let Some(proxy) = parse_env_var("HTTPS_PROXY", Filter::Https) { + intercepts.push(proxy); + } else if let Some(proxy) = parse_env_var("https_proxy", Filter::Https) { + intercepts.push(proxy); + } + + // In a CGI context, headers become environment variables. So, "Proxy:" becomes HTTP_PROXY. + // To prevent an attacker from injecting a proxy, check if we are in CGI. 
+ if env::var_os("REQUEST_METHOD").is_none() { + if let Some(proxy) = parse_env_var("HTTP_PROXY", Filter::Http) { + intercepts.push(proxy); + } else if let Some(proxy) = parse_env_var("http_proxy", Filter::Https) { + intercepts.push(proxy); + } + } + + let no = NoProxy::from_env(); + + Proxies { intercepts, no } +} + +pub(crate) fn basic_auth(user: &str, pass: &str) -> HeaderValue { + use base64::prelude::BASE64_STANDARD; + use base64::write::EncoderWriter; + use std::io::Write; + + let mut buf = b"Basic ".to_vec(); + { + let mut encoder = EncoderWriter::new(&mut buf, &BASE64_STANDARD); + let _ = write!(encoder, "{user}:{pass}"); + } + let mut header = + HeaderValue::from_bytes(&buf).expect("base64 is always valid HeaderValue"); + header.set_sensitive(true); + header +} + +fn parse_env_var(name: &str, filter: Filter) -> Option { + let val = env::var(name).ok()?; + let target = Target::parse(&val)?; + Some(Intercept { filter, target }) +} + +impl Intercept { + pub(crate) fn all(s: &str) -> Option { + let target = Target::parse(s)?; + Some(Intercept { + filter: Filter::All, + target, + }) + } + + pub(crate) fn set_auth(&mut self, user: &str, pass: &str) { + match self.target { + Target::Http { ref mut auth, .. } => { + *auth = Some(basic_auth(user, pass)); + } + Target::Https { ref mut auth, .. } => { + *auth = Some(basic_auth(user, pass)); + } + Target::Socks { ref mut auth, .. 
} => { + *auth = Some((user.into(), pass.into())); + } + } + } +} + +impl std::fmt::Debug for Intercept { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Intercept") + .field("filter", &self.filter) + .finish() + } +} + +impl Target { + fn parse(val: &str) -> Option { + let uri = val.parse::().ok()?; + + let mut builder = Uri::builder(); + let mut is_socks = false; + let mut http_auth = None; + let mut socks_auth = None; + + builder = builder.scheme(match uri.scheme() { + Some(s) => { + if s == &Scheme::HTTP || s == &Scheme::HTTPS { + s.clone() + } else if s.as_str() == "socks5" || s.as_str() == "socks5h" { + is_socks = true; + s.clone() + } else { + // can't use this proxy scheme + return None; + } + } + // if no scheme provided, assume they meant 'http' + None => Scheme::HTTP, + }); + + let authority = uri.authority()?; + + if let Some((userinfo, host_port)) = authority.as_str().split_once('@') { + let (user, pass) = userinfo.split_once(':')?; + if is_socks { + socks_auth = Some((user.into(), pass.into())); + } else { + http_auth = Some(basic_auth(user, pass)); + } + builder = builder.authority(host_port); + } else { + builder = builder.authority(authority.clone()); + } + + // removing any path, but we MUST specify one or the builder errors + builder = builder.path_and_query("/"); + + let dst = builder.build().ok()?; + + let target = match dst.scheme().unwrap().as_str() { + "https" => Target::Https { + dst, + auth: http_auth, + }, + "http" => Target::Http { + dst, + auth: http_auth, + }, + "socks5" | "socks5h" => Target::Socks { + dst, + auth: socks_auth, + }, + // shouldn't happen + _ => return None, + }; + + Some(target) + } +} + +#[derive(Debug)] +struct NoProxy { + domains: DomainMatcher, + ips: IpMatcher, +} + +/// Represents a possible matching entry for an IP address +#[derive(Clone, Debug)] +enum Ip { + Address(IpAddr), + Network(IpNet), +} + +/// A wrapper around a list of IP cidr blocks or addresses with a 
[IpMatcher::contains] method for +/// checking if an IP address is contained within the matcher +#[derive(Clone, Debug, Default)] +struct IpMatcher(Vec); + +/// A wrapper around a list of domains with a [DomainMatcher::contains] method for checking if a +/// domain is contained within the matcher +#[derive(Clone, Debug, Default)] +struct DomainMatcher(Vec); + +impl NoProxy { + /// Returns a new no-proxy configuration based on environment variables (or `None` if no variables are set) + /// see [self::NoProxy::from_string()] for the string format + fn from_env() -> Option { + let raw = env::var("NO_PROXY") + .or_else(|_| env::var("no_proxy")) + .unwrap_or_default(); + + Self::from_string(&raw) + } + + /// Returns a new no-proxy configuration based on a `no_proxy` string (or `None` if no variables + /// are set) + /// The rules are as follows: + /// * The environment variable `NO_PROXY` is checked, if it is not set, `no_proxy` is checked + /// * If neither environment variable is set, `None` is returned + /// * Entries are expected to be comma-separated (whitespace between entries is ignored) + /// * IP addresses (both IPv4 and IPv6) are allowed, as are optional subnet masks (by adding /size, + /// for example "`192.168.1.0/24`"). + /// * An entry "`*`" matches all hostnames (this is the only wildcard allowed) + /// * Any other entry is considered a domain name (and may contain a leading dot, for example `google.com` + /// and `.google.com` are equivalent) and would match both that domain AND all subdomains. + /// + /// For example, if `"NO_PROXY=google.com, 192.168.1.0/24"` was set, all of the following would match + /// (and therefore would bypass the proxy): + /// * `http://google.com/` + /// * `http://www.google.com/` + /// * `http://192.168.1.42/` + /// + /// The URL `http://notgoogle.com/` would not match. 
+ fn from_string(no_proxy_list: &str) -> Option { + if no_proxy_list.is_empty() { + return None; + } + let mut ips = Vec::new(); + let mut domains = Vec::new(); + let parts = no_proxy_list.split(',').map(str::trim); + for part in parts { + match part.parse::() { + // If we can parse an IP net or address, then use it, otherwise, assume it is a domain + Ok(ip) => ips.push(Ip::Network(ip)), + Err(_) => match part.parse::() { + Ok(addr) => ips.push(Ip::Address(addr)), + Err(_) => domains.push(part.to_owned()), + }, + } + } + Some(NoProxy { + ips: IpMatcher(ips), + domains: DomainMatcher(domains), + }) + } + + fn contains(&self, host: &str) -> bool { + // According to RFC3986, raw IPv6 hosts will be wrapped in []. So we need to strip those off + // the end in order to parse correctly + let host = if host.starts_with('[') { + let x: &[_] = &['[', ']']; + host.trim_matches(x) + } else { + host + }; + match host.parse::() { + // If we can parse an IP addr, then use it, otherwise, assume it is a domain + Ok(ip) => self.ips.contains(ip), + Err(_) => self.domains.contains(host), + } + } +} + +impl IpMatcher { + fn contains(&self, addr: IpAddr) -> bool { + for ip in &self.0 { + match ip { + Ip::Address(address) => { + if &addr == address { + return true; + } + } + Ip::Network(net) => { + if net.contains(&addr) { + return true; + } + } + } + } + false + } +} + +impl DomainMatcher { + // The following links may be useful to understand the origin of these rules: + // * https://curl.se/libcurl/c/CURLOPT_NOPROXY.html + // * https://github.com/curl/curl/issues/1208 + fn contains(&self, domain: &str) -> bool { + let domain_len = domain.len(); + for d in &self.0 { + if d == domain || d.strip_prefix('.') == Some(domain) { + return true; + } else if domain.ends_with(d) { + if d.starts_with('.') { + // If the first character of d is a dot, that means the first character of domain + // must also be a dot, so we are looking at a subdomain of d and that matches + return true; + } else if 
domain.as_bytes().get(domain_len - d.len() - 1) == Some(&b'.') + { + // Given that d is a prefix of domain, if the prior character in domain is a dot + // then that means we must be matching a subdomain of d, and that matches + return true; + } + } else if d == "*" { + return true; + } + } + false + } +} + +impl ProxyConnector { + pub(crate) fn new( + proxies: Arc, + connector: C, + tls: Arc, + ) -> Self { + ProxyConnector { + connector, + proxies, + tls, + user_agent: None, + } + } + + pub(crate) fn user_agent(&mut self, val: HeaderValue) { + self.user_agent = Some(val); + } + + fn intercept(&self, dst: &Uri) -> Option<&Intercept> { + self.proxies.intercept(dst) + } +} + +impl Proxies { + pub(crate) fn prepend(&mut self, intercept: Intercept) { + self.intercepts.insert(0, intercept); + } + + pub(crate) fn http_forward_auth(&self, dst: &Uri) -> Option<&HeaderValue> { + let intercept = self.intercept(dst)?; + match intercept.target { + // Only if the proxy target is http + Target::Http { ref auth, .. } => auth.as_ref(), + _ => None, + } + } + + fn intercept(&self, dst: &Uri) -> Option<&Intercept> { + if let Some(no_proxy) = self.no.as_ref() { + if no_proxy.contains(dst.host()?) { + return None; + } + } + + for intercept in &self.intercepts { + return match ( + intercept.filter, + dst.scheme().map(Scheme::as_str).unwrap_or(""), + ) { + (Filter::All, _) => Some(intercept), + (Filter::Https, "https") => Some(intercept), + (Filter::Http, "http") => Some(intercept), + _ => continue, + }; + } + None + } +} + +type BoxFuture = Pin + Send>>; +type BoxError = Box; + +// These variatns are not to be inspected. 
+pub enum Proxied { + /// Not proxied + PassThrough(T), + /// An HTTP forwarding proxy needed absolute-form + HttpForward(T), + /// Tunneled through HTTP CONNECT + HttpTunneled(Box>>>), + /// Tunneled through SOCKS + Socks(TokioIo), + /// Tunneled through SOCKS and TLS + SocksTls(TokioIo>>>), +} + +impl Service for ProxyConnector +where + C: Service, + C::Response: hyper::rt::Read + hyper::rt::Write + Unpin + Send + 'static, + C::Future: Send + 'static, + C::Error: Into + 'static, +{ + type Response = Proxied; + type Error = BoxError; + type Future = BoxFuture>; + + fn poll_ready( + &mut self, + cx: &mut Context<'_>, + ) -> Poll> { + self.connector.poll_ready(cx).map_err(Into::into) + } + + fn call(&mut self, orig_dst: Uri) -> Self::Future { + if let Some(intercept) = self.intercept(&orig_dst).cloned() { + let is_https = orig_dst.scheme() == Some(&Scheme::HTTPS); + let user_agent = self.user_agent.clone(); + return match intercept.target { + Target::Http { + dst: proxy_dst, + auth, + } + | Target::Https { + dst: proxy_dst, + auth, + } => { + let connecting = self.connector.call(proxy_dst); + let tls = TlsConnector::from(self.tls.clone()); + Box::pin(async move { + let mut io = connecting.await.map_err(Into::into)?; + + if is_https { + tunnel(&mut io, &orig_dst, user_agent, auth).await?; + let tokio_io = TokioIo::new(io); + let io = tls + .connect( + TryFrom::try_from(orig_dst.host().unwrap().to_owned())?, + tokio_io, + ) + .await?; + Ok(Proxied::HttpTunneled(Box::new(TokioIo::new(io)))) + } else { + Ok(Proxied::HttpForward(io)) + } + }) + } + Target::Socks { + dst: proxy_dst, + auth, + } => { + let tls = TlsConnector::from(self.tls.clone()); + Box::pin(async move { + let socks_addr = ( + proxy_dst.host().unwrap(), + proxy_dst.port().map(|p| p.as_u16()).unwrap_or(1080), + ); + let host = orig_dst.host().ok_or("no host in url")?; + let port = match orig_dst.port() { + Some(p) => p.as_u16(), + None if is_https => 443, + _ => 80, + }; + let io = if let Some((user, 
pass)) = auth { + Socks5Stream::connect_with_password( + socks_addr, + (host, port), + &user, + &pass, + ) + .await? + } else { + Socks5Stream::connect(socks_addr, (host, port)).await? + }; + let io = TokioIo::new(io.into_inner()); + + if is_https { + let tokio_io = TokioIo::new(io); + let io = tls + .connect(TryFrom::try_from(host.to_owned())?, tokio_io) + .await?; + Ok(Proxied::SocksTls(TokioIo::new(io))) + } else { + Ok(Proxied::Socks(io)) + } + }) + } + }; + } + Box::pin( + self + .connector + .call(orig_dst) + .map_ok(Proxied::PassThrough) + .map_err(Into::into), + ) + } +} + +async fn tunnel( + io: &mut T, + dst: &Uri, + user_agent: Option, + auth: Option, +) -> Result<(), BoxError> +where + T: hyper::rt::Read + hyper::rt::Write + Unpin, +{ + use tokio::io::AsyncReadExt; + use tokio::io::AsyncWriteExt; + + let host = dst.host().expect("proxy dst has host"); + let port = match dst.port() { + Some(p) => p.as_u16(), + None => match dst.scheme().map(Scheme::as_str).unwrap_or("") { + "https" => 443, + "http" => 80, + _ => return Err("proxy dst unexpected scheme".into()), + }, + }; + + let mut buf = format!( + "\ + CONNECT {host}:{port} HTTP/1.1\r\n\ + Host: {host}:{port}\r\n\ + " + ) + .into_bytes(); + + // user-agent + if let Some(user_agent) = user_agent { + buf.extend_from_slice(b"User-Agent: "); + buf.extend_from_slice(user_agent.as_bytes()); + buf.extend_from_slice(b"\r\n"); + } + + // proxy-authorization + if let Some(value) = auth { + buf.extend_from_slice(b"Proxy-Authorization: "); + buf.extend_from_slice(value.as_bytes()); + buf.extend_from_slice(b"\r\n"); + } + + // headers end + buf.extend_from_slice(b"\r\n"); + + let mut tokio_conn = TokioIo::new(io); + + tokio_conn.write_all(&buf).await?; + + let mut buf = [0; 8192]; + let mut pos = 0; + + loop { + let n = tokio_conn.read(&mut buf[pos..]).await?; + + if n == 0 { + return Err("unexpected eof while tunneling".into()); + } + pos += n; + + let recvd = &buf[..pos]; + if recvd.starts_with(b"HTTP/1.1 200") 
|| recvd.starts_with(b"HTTP/1.0 200") + { + if recvd.ends_with(b"\r\n\r\n") { + return Ok(()); + } + if pos == buf.len() { + return Err("proxy headers too long for tunnel".into()); + } + // else read more + } else if recvd.starts_with(b"HTTP/1.1 407") { + return Err("proxy authentication required".into()); + } else { + return Err("unsuccessful tunnel".into()); + } + } +} + +impl hyper::rt::Read for Proxied +where + T: hyper::rt::Read + hyper::rt::Write + Unpin, +{ + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: hyper::rt::ReadBufCursor<'_>, + ) -> Poll> { + match *self { + Proxied::PassThrough(ref mut p) => Pin::new(p).poll_read(cx, buf), + Proxied::HttpForward(ref mut p) => Pin::new(p).poll_read(cx, buf), + Proxied::HttpTunneled(ref mut p) => Pin::new(p).poll_read(cx, buf), + Proxied::Socks(ref mut p) => Pin::new(p).poll_read(cx, buf), + Proxied::SocksTls(ref mut p) => Pin::new(p).poll_read(cx, buf), + } + } +} + +impl hyper::rt::Write for Proxied +where + T: hyper::rt::Read + hyper::rt::Write + Unpin, +{ + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + match *self { + Proxied::PassThrough(ref mut p) => Pin::new(p).poll_write(cx, buf), + Proxied::HttpForward(ref mut p) => Pin::new(p).poll_write(cx, buf), + Proxied::HttpTunneled(ref mut p) => Pin::new(p).poll_write(cx, buf), + Proxied::Socks(ref mut p) => Pin::new(p).poll_write(cx, buf), + Proxied::SocksTls(ref mut p) => Pin::new(p).poll_write(cx, buf), + } + } + + fn poll_flush( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + match *self { + Proxied::PassThrough(ref mut p) => Pin::new(p).poll_flush(cx), + Proxied::HttpForward(ref mut p) => Pin::new(p).poll_flush(cx), + Proxied::HttpTunneled(ref mut p) => Pin::new(p).poll_flush(cx), + Proxied::Socks(ref mut p) => Pin::new(p).poll_flush(cx), + Proxied::SocksTls(ref mut p) => Pin::new(p).poll_flush(cx), + } + } + + fn poll_shutdown( + mut self: Pin<&mut Self>, + cx: 
&mut Context<'_>, + ) -> Poll> { + match *self { + Proxied::PassThrough(ref mut p) => Pin::new(p).poll_shutdown(cx), + Proxied::HttpForward(ref mut p) => Pin::new(p).poll_shutdown(cx), + Proxied::HttpTunneled(ref mut p) => Pin::new(p).poll_shutdown(cx), + Proxied::Socks(ref mut p) => Pin::new(p).poll_shutdown(cx), + Proxied::SocksTls(ref mut p) => Pin::new(p).poll_shutdown(cx), + } + } + + fn is_write_vectored(&self) -> bool { + match *self { + Proxied::PassThrough(ref p) => p.is_write_vectored(), + Proxied::HttpForward(ref p) => p.is_write_vectored(), + Proxied::HttpTunneled(ref p) => p.is_write_vectored(), + Proxied::Socks(ref p) => p.is_write_vectored(), + Proxied::SocksTls(ref p) => p.is_write_vectored(), + } + } + + fn poll_write_vectored( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &[std::io::IoSlice<'_>], + ) -> Poll> { + match *self { + Proxied::PassThrough(ref mut p) => { + Pin::new(p).poll_write_vectored(cx, bufs) + } + Proxied::HttpForward(ref mut p) => { + Pin::new(p).poll_write_vectored(cx, bufs) + } + Proxied::HttpTunneled(ref mut p) => { + Pin::new(p).poll_write_vectored(cx, bufs) + } + Proxied::Socks(ref mut p) => Pin::new(p).poll_write_vectored(cx, bufs), + Proxied::SocksTls(ref mut p) => Pin::new(p).poll_write_vectored(cx, bufs), + } + } +} + +impl Connection for Proxied +where + T: Connection, +{ + fn connected(&self) -> Connected { + match self { + Proxied::PassThrough(ref p) => p.connected(), + Proxied::HttpForward(ref p) => p.connected().proxy(true), + Proxied::HttpTunneled(ref p) => p.inner().get_ref().0.connected(), + Proxied::Socks(ref p) => p.connected(), + Proxied::SocksTls(ref p) => p.inner().get_ref().0.connected(), + } + } +} + +#[test] +fn test_proxy_parse_from_env() { + fn parse(s: &str) -> Target { + Target::parse(s).unwrap() + } + + // normal + match parse("http://127.0.0.1:6666") { + Target::Http { dst, auth } => { + assert_eq!(dst, "http://127.0.0.1:6666"); + assert!(auth.is_none()); + } + _ => panic!("bad 
target"), + } + + // without scheme + match parse("127.0.0.1:6666") { + Target::Http { dst, auth } => { + assert_eq!(dst, "http://127.0.0.1:6666"); + assert!(auth.is_none()); + } + _ => panic!("bad target"), + } + + // with userinfo + match parse("user:pass@127.0.0.1:6666") { + Target::Http { dst, auth } => { + assert_eq!(dst, "http://127.0.0.1:6666"); + assert!(auth.is_some()); + assert!(auth.unwrap().is_sensitive()); + } + _ => panic!("bad target"), + } + + // socks + match parse("socks5://user:pass@127.0.0.1:6666") { + Target::Socks { dst, auth } => { + assert_eq!(dst, "socks5://127.0.0.1:6666"); + assert!(auth.is_some()); + } + _ => panic!("bad target"), + } + + // socks5h + match parse("socks5h://localhost:6666") { + Target::Socks { dst, auth } => { + assert_eq!(dst, "socks5h://localhost:6666"); + assert!(auth.is_none()); + } + _ => panic!("bad target"), + } +} + +#[test] +fn test_domain_matcher() { + let domains = vec![".foo.bar".into(), "bar.foo".into()]; + let matcher = DomainMatcher(domains); + + // domains match with leading `.` + assert!(matcher.contains("foo.bar")); + // subdomains match with leading `.` + assert!(matcher.contains("www.foo.bar")); + + // domains match with no leading `.` + assert!(matcher.contains("bar.foo")); + // subdomains match with no leading `.` + assert!(matcher.contains("www.bar.foo")); + + // non-subdomain string prefixes don't match + assert!(!matcher.contains("notfoo.bar")); + assert!(!matcher.contains("notbar.foo")); +} + +#[test] +fn test_no_proxy_wildcard() { + let no_proxy = NoProxy::from_string("*").unwrap(); + assert!(no_proxy.contains("any.where")); +} + +#[test] +fn test_no_proxy_ip_ranges() { + let no_proxy = NoProxy::from_string( + ".foo.bar, bar.baz,10.42.1.1/24,::1,10.124.7.8,2001::/17", + ) + .unwrap(); + + let should_not_match = [ + // random url, not in no_proxy + "deno.com", + // make sure that random non-subdomain string prefixes don't match + "notfoo.bar", + // make sure that random non-subdomain string 
prefixes don't match + "notbar.baz", + // ipv4 address out of range + "10.43.1.1", + // ipv4 address out of range + "10.124.7.7", + // ipv6 address out of range + "[ffff:db8:a0b:12f0::1]", + // ipv6 address out of range + "[2005:db8:a0b:12f0::1]", + ]; + + for host in &should_not_match { + assert!(!no_proxy.contains(host), "should not contain {:?}", host); + } + + let should_match = [ + // make sure subdomains (with leading .) match + "hello.foo.bar", + // make sure exact matches (without leading .) match (also makes sure spaces between entries work) + "bar.baz", + // make sure subdomains (without leading . in no_proxy) match + "foo.bar.baz", + // make sure subdomains (without leading . in no_proxy) match - this differs from cURL + "foo.bar", + // ipv4 address match within range + "10.42.1.100", + // ipv6 address exact match + "[::1]", + // ipv6 address match within range + "[2001:db8:a0b:12f0::1]", + // ipv4 address exact match + "10.124.7.8", + ]; + + for host in &should_match { + assert!(no_proxy.contains(host), "should contain {:?}", host); + } +} diff --git a/ext/kv/Cargo.toml b/ext/kv/Cargo.toml index e4249afead..4556eb23c1 100644 --- a/ext/kv/Cargo.toml +++ b/ext/kv/Cargo.toml @@ -29,6 +29,7 @@ denokv_remote.workspace = true denokv_sqlite.workspace = true faster-hex.workspace = true http.workspace = true +http-body-util.workspace = true log.workspace = true num-bigint.workspace = true prost.workspace = true diff --git a/ext/kv/remote.rs b/ext/kv/remote.rs index 7541b5a06a..922853588a 100644 --- a/ext/kv/remote.rs +++ b/ext/kv/remote.rs @@ -12,10 +12,8 @@ use bytes::Bytes; use deno_core::error::type_error; use deno_core::error::AnyError; use deno_core::futures::Stream; -use deno_core::futures::TryStreamExt as _; use deno_core::OpState; use deno_fetch::create_http_client; -use deno_fetch::reqwest; use deno_fetch::CreateHttpClientOptions; use deno_tls::rustls::RootCertStore; use deno_tls::Proxy; @@ -25,6 +23,7 @@ use denokv_remote::MetadataEndpoint; use 
denokv_remote::Remote; use denokv_remote::RemoteResponse; use denokv_remote::RemoteTransport; +use http_body_util::BodyExt; use url::Url; #[derive(Clone)] @@ -109,35 +108,43 @@ impl denokv_remote::RemotePermissions } #[derive(Clone)] -pub struct ReqwestClient(reqwest::Client); -pub struct ReqwestResponse(reqwest::Response); +pub struct FetchClient(deno_fetch::Client); +pub struct FetchResponse(http::Response); -impl RemoteTransport for ReqwestClient { - type Response = ReqwestResponse; +impl RemoteTransport for FetchClient { + type Response = FetchResponse; async fn post( &self, url: Url, headers: http::HeaderMap, body: Bytes, ) -> Result<(Url, http::StatusCode, Self::Response), anyhow::Error> { - let res = self.0.post(url).headers(headers).body(body).send().await?; - let url = res.url().clone(); + let body = http_body_util::Full::new(body) + .map_err(|never| match never {}) + .boxed(); + let mut req = http::Request::new(body); + *req.method_mut() = http::Method::POST; + *req.uri_mut() = url.as_str().parse()?; + *req.headers_mut() = headers; + + let res = self.0.clone().send(req).await?; let status = res.status(); - Ok((url, status, ReqwestResponse(res))) + Ok((url, status, FetchResponse(res))) } } -impl RemoteResponse for ReqwestResponse { +impl RemoteResponse for FetchResponse { async fn bytes(self) -> Result { - Ok(self.0.bytes().await?) + Ok(self.0.collect().await?.to_bytes()) } fn stream( self, ) -> impl Stream> + Send + Sync { - self.0.bytes_stream().map_err(|e| e.into()) + self.0.into_body().into_data_stream() } async fn text(self) -> Result { - Ok(self.0.text().await?) + let bytes = self.bytes().await?; + Ok(std::str::from_utf8(&bytes)?.into()) } } @@ -145,7 +152,7 @@ impl RemoteResponse for ReqwestResponse { impl DatabaseHandler for RemoteDbHandler

{ - type DB = Remote, ReqwestClient>; + type DB = Remote, FetchClient>; async fn open( &self, @@ -201,14 +208,14 @@ impl DatabaseHandler http2: true, }, )?; - let reqwest_client = ReqwestClient(client); + let fetch_client = FetchClient(client); let permissions = PermissionChecker { state: state.clone(), _permissions: PhantomData, }; - let remote = Remote::new(reqwest_client, permissions, metadata_endpoint); + let remote = Remote::new(fetch_client, permissions, metadata_endpoint); Ok(remote) } diff --git a/ext/node/Cargo.toml b/ext/node/Cargo.toml index c618ea58d2..3f63011c09 100644 --- a/ext/node/Cargo.toml +++ b/ext/node/Cargo.toml @@ -42,6 +42,7 @@ h2.workspace = true hkdf.workspace = true home = "0.5.9" http.workspace = true +http-body-util.workspace = true idna = "0.3.0" indexmap.workspace = true ipnetwork = "0.20.0" @@ -64,7 +65,6 @@ pbkdf2 = "0.12.1" pin-project-lite = "0.2.13" rand.workspace = true regex.workspace = true -reqwest.workspace = true ring.workspace = true ripemd = { version = "0.1.3", features = ["oid"] } rsa.workspace = true diff --git a/ext/node/ops/http.rs b/ext/node/ops/http.rs index a6d999330d..89024e3f31 100644 --- a/ext/node/ops/http.rs +++ b/ext/node/ops/http.rs @@ -15,12 +15,12 @@ use deno_fetch::FetchRequestResource; use deno_fetch::FetchReturn; use deno_fetch::HttpClientResource; use deno_fetch::ResourceToBodyAdapter; -use reqwest::header::HeaderMap; -use reqwest::header::HeaderName; -use reqwest::header::HeaderValue; -use reqwest::header::CONTENT_LENGTH; -use reqwest::Body; -use reqwest::Method; +use http::header::HeaderMap; +use http::header::HeaderName; +use http::header::HeaderValue; +use http::header::CONTENT_LENGTH; +use http::Method; +use http_body_util::BodyExt; #[op2] #[serde] @@ -60,34 +60,54 @@ where header_map.append(name, v); } - let mut request = client.request(method.clone(), url).headers(header_map); - - if let Some(body) = body { - request = request.body(Body::wrap_stream(ResourceToBodyAdapter::new( - 
state.resource_table.take_any(body)?, - ))); + let (body, con_len) = if let Some(body) = body { + ( + ResourceToBodyAdapter::new(state.resource_table.take_any(body)?).boxed(), + None, + ) } else { // POST and PUT requests should always have a 0 length content-length, // if there is no body. https://fetch.spec.whatwg.org/#http-network-or-cache-fetch - if matches!(method, Method::POST | Method::PUT) { - request = request.header(CONTENT_LENGTH, HeaderValue::from(0)); - } + let len = if matches!(method, Method::POST | Method::PUT) { + Some(0) + } else { + None + }; + ( + http_body_util::Empty::new() + .map_err(|never| match never {}) + .boxed(), + len, + ) }; + let mut request = http::Request::new(body); + *request.method_mut() = method.clone(); + *request.uri_mut() = url + .as_str() + .parse() + .map_err(|_| type_error("Invalid URL"))?; + *request.headers_mut() = header_map; + + if let Some(len) = con_len { + request.headers_mut().insert(CONTENT_LENGTH, len.into()); + } + let cancel_handle = CancelHandle::new_rc(); let cancel_handle_ = cancel_handle.clone(); let fut = async move { - request - .send() + client + .send(request) .or_cancel(cancel_handle_) .await .map(|res| res.map_err(|err| type_error(err.to_string()))) }; - let request_rid = state - .resource_table - .add(FetchRequestResource(Box::pin(fut))); + let request_rid = state.resource_table.add(FetchRequestResource { + future: Box::pin(fut), + url, + }); let cancel_handle_rid = state.resource_table.add(FetchCancelHandle(cancel_handle)); diff --git a/ext/node/ops/http2.rs b/ext/node/ops/http2.rs index d12e108e66..9f3c12a999 100644 --- a/ext/node/ops/http2.rs +++ b/ext/node/ops/http2.rs @@ -27,12 +27,12 @@ use h2; use h2::Reason; use h2::RecvStream; use http; +use http::header::HeaderName; +use http::header::HeaderValue; use http::request::Parts; use http::HeaderMap; use http::Response; use http::StatusCode; -use reqwest::header::HeaderName; -use reqwest::header::HeaderValue; use url::Url; pub struct Http2Client 
{ diff --git a/runtime/errors.rs b/runtime/errors.rs index 7f2e492503..694402773e 100644 --- a/runtime/errors.rs +++ b/runtime/errors.rs @@ -13,7 +13,6 @@ use deno_core::error::AnyError; use deno_core::serde_json; use deno_core::url; use deno_core::ModuleResolutionError; -use deno_fetch::reqwest; use std::env; use std::error::Error; use std::io; @@ -101,27 +100,6 @@ fn get_regex_error_class(error: &regex::Error) -> &'static str { } } -fn get_request_error_class(error: &reqwest::Error) -> &'static str { - error - .source() - .and_then(|inner_err| { - (inner_err - .downcast_ref::() - .map(get_io_error_class)) - .or_else(|| { - inner_err - .downcast_ref::() - .map(get_serde_json_error_class) - }) - .or_else(|| { - inner_err - .downcast_ref::() - .map(get_url_parse_error_class) - }) - }) - .unwrap_or("Http") -} - fn get_serde_json_error_class( error: &serde_json::error::Error, ) -> &'static str { @@ -142,7 +120,17 @@ fn get_url_parse_error_class(_error: &url::ParseError) -> &'static str { "URIError" } -fn get_hyper_error_class(_error: &hyper_v014::Error) -> &'static str { +fn get_hyper_error_class(_error: &hyper::Error) -> &'static str { + "Http" +} + +fn get_hyper_util_error_class( + _error: &hyper_util::client::legacy::Error, +) -> &'static str { + "Http" +} + +fn get_hyper_v014_error_class(_error: &hyper_v014::Error) -> &'static str { "Http" } @@ -175,13 +163,18 @@ pub fn get_error_class_name(e: &AnyError) -> Option<&'static str> { e.downcast_ref::() .map(get_dlopen_error_class) }) + .or_else(|| e.downcast_ref::().map(get_hyper_error_class)) + .or_else(|| { + e.downcast_ref::() + .map(get_hyper_util_error_class) + }) .or_else(|| { e.downcast_ref::() - .map(get_hyper_error_class) + .map(get_hyper_v014_error_class) }) .or_else(|| { e.downcast_ref::>() - .map(|e| get_hyper_error_class(e)) + .map(|e| get_hyper_v014_error_class(e)) }) .or_else(|| { e.downcast_ref::().map(|e| { @@ -202,10 +195,6 @@ pub fn get_error_class_name(e: &AnyError) -> Option<&'static str> { 
e.downcast_ref::() .map(get_notify_error_class) }) - .or_else(|| { - e.downcast_ref::() - .map(get_request_error_class) - }) .or_else(|| e.downcast_ref::().map(get_regex_error_class)) .or_else(|| { e.downcast_ref::() diff --git a/runtime/ops/web_worker/sync_fetch.rs b/runtime/ops/web_worker/sync_fetch.rs index 37286ca625..cdb151a86f 100644 --- a/runtime/ops/web_worker/sync_fetch.rs +++ b/runtime/ops/web_worker/sync_fetch.rs @@ -13,6 +13,7 @@ use deno_core::OpState; use deno_fetch::data_url::DataUrl; use deno_web::BlobStore; use deno_websocket::DomExceptionNetworkError; +use http_body_util::BodyExt; use hyper::body::Bytes; use serde::Deserialize; use serde::Serialize; @@ -78,10 +79,23 @@ pub fn op_worker_sync_fetch( let (body, mime_type, res_url) = match script_url.scheme() { "http" | "https" => { - let resp = - client.get(script_url).send().await?.error_for_status()?; + let mut req = http::Request::new( + http_body_util::Empty::new() + .map_err(|never| match never {}) + .boxed(), + ); + *req.uri_mut() = script_url.as_str().parse()?; - let res_url = resp.url().to_string(); + let resp = client.send(req).await?; + + if resp.status().is_client_error() + || resp.status().is_server_error() + { + return Err(type_error(format!( + "http status error: {}", + resp.status() + ))); + } // TODO(andreubotella) Properly run fetch's "extract a MIME type". let mime_type = resp @@ -93,9 +107,9 @@ pub fn op_worker_sync_fetch( // Always check the MIME type with HTTP(S). 
loose_mime_checks = false; - let body = resp.bytes().await?; + let body = resp.collect().await?.to_bytes(); - (body, mime_type, res_url) + (body, mime_type, script) } "data" => { let data_url = DataUrl::process(&script) diff --git a/tests/Cargo.toml b/tests/Cargo.toml index 9d513571b1..34cffbb406 100644 --- a/tests/Cargo.toml +++ b/tests/Cargo.toml @@ -53,6 +53,7 @@ once_cell.workspace = true os_pipe.workspace = true pretty_assertions.workspace = true regex.workspace = true +reqwest.workspace = true serde.workspace = true test_util.workspace = true tokio.workspace = true diff --git a/tests/integration/inspector_tests.rs b/tests/integration/inspector_tests.rs index 57831ab46c..fa1b3a9d83 100644 --- a/tests/integration/inspector_tests.rs +++ b/tests/integration/inspector_tests.rs @@ -6,7 +6,7 @@ use deno_core::error::AnyError; use deno_core::serde_json; use deno_core::serde_json::json; use deno_core::url; -use deno_fetch::reqwest; + use fastwebsockets::FragmentCollector; use fastwebsockets::Frame; use fastwebsockets::WebSocket; diff --git a/tests/integration/npm_tests.rs b/tests/integration/npm_tests.rs index 82f0697d56..dba5190870 100644 --- a/tests/integration/npm_tests.rs +++ b/tests/integration/npm_tests.rs @@ -3,7 +3,7 @@ use deno_core::serde_json; use deno_core::serde_json::json; use deno_core::serde_json::Value; -use deno_fetch::reqwest; + use pretty_assertions::assert_eq; use test_util as util; use test_util::itest; diff --git a/tests/integration/run_tests.rs b/tests/integration/run_tests.rs index d4d1fea2eb..6a9b47b1aa 100644 --- a/tests/integration/run_tests.rs +++ b/tests/integration/run_tests.rs @@ -11,7 +11,7 @@ use std::sync::Arc; use bytes::Bytes; use deno_core::serde_json::json; use deno_core::url; -use deno_fetch::reqwest; + use deno_tls::rustls; use deno_tls::rustls::ClientConnection; use deno_tls::rustls_pemfile; diff --git a/tests/integration/serve_tests.rs b/tests/integration/serve_tests.rs index 85de068c9c..3d64ce3a30 100644 --- 
a/tests/integration/serve_tests.rs +++ b/tests/integration/serve_tests.rs @@ -2,7 +2,6 @@ use std::io::Read; -use deno_fetch::reqwest; use pretty_assertions::assert_eq; use regex::Regex; use test_util as util; diff --git a/tests/specs/cert/localhost_unsafe_ssl/localhost_unsafe_ssl.ts.out b/tests/specs/cert/localhost_unsafe_ssl/localhost_unsafe_ssl.ts.out index 3067fffae2..f98c7e4e47 100644 --- a/tests/specs/cert/localhost_unsafe_ssl/localhost_unsafe_ssl.ts.out +++ b/tests/specs/cert/localhost_unsafe_ssl/localhost_unsafe_ssl.ts.out @@ -1,3 +1,3 @@ DANGER: TLS certificate validation is disabled for: deno.land -error: Import 'https://localhost:5545/subdir/mod2.ts' failed: error sending request for url (https://localhost:5545/subdir/mod2.ts) +error: Import 'https://localhost:5545/subdir/mod2.ts' failed: client error[WILDCARD] at file:///[WILDCARD]/cafile_url_imports.ts:[WILDCARD] diff --git a/tests/testdata/run/fetch_async_error_stack.ts.out b/tests/testdata/run/fetch_async_error_stack.ts.out index e8169228fd..06d92d15a4 100644 --- a/tests/testdata/run/fetch_async_error_stack.ts.out +++ b/tests/testdata/run/fetch_async_error_stack.ts.out @@ -1,4 +1,4 @@ -error: Uncaught (in promise) TypeError: error sending request for url[WILDCARD] +error: Uncaught (in promise) TypeError: client error[WILDCARD] await fetch("https://nonexistent.deno.land/"); ^[WILDCARD] at async fetch (ext:[WILDCARD]) diff --git a/tests/unit/fetch_test.ts b/tests/unit/fetch_test.ts index b549be9a43..bc3822d994 100644 --- a/tests/unit/fetch_test.ts +++ b/tests/unit/fetch_test.ts @@ -67,7 +67,7 @@ Deno.test( await fetch(`http://localhost:${port}`); }, TypeError, - "error sending request for url", + "client error (Connect)", ); }, ); @@ -80,7 +80,7 @@ Deno.test( await fetch("http://nil/"); }, TypeError, - "error sending request for url", + "client error (Connect)", ); }, ); @@ -688,7 +688,7 @@ Deno.test( "accept: */*\r\n", "accept-language: *\r\n", `user-agent: Deno/${Deno.version.deno}\r\n`, - 
"accept-encoding: gzip, br\r\n", + "accept-encoding: gzip,br\r\n", `host: ${addr}\r\n\r\n`, ].join(""); assertEquals(actual, expected); @@ -720,7 +720,7 @@ Deno.test( "accept: text/html\r\n", "accept-language: en-US\r\n", `user-agent: Deno/${Deno.version.deno}\r\n`, - "accept-encoding: gzip, br\r\n", + "accept-encoding: gzip,br\r\n", `host: ${addr}\r\n\r\n`, ].join(""); assertEquals(actual, expected); @@ -750,15 +750,16 @@ Deno.test( const actual = new TextDecoder().decode((await bufPromise).bytes()); const expected = [ "POST /blah HTTP/1.1\r\n", + `content-length: ${body.length}\r\n`, "hello: World\r\n", "foo: Bar\r\n", "content-type: text/plain;charset=UTF-8\r\n", "accept: */*\r\n", "accept-language: *\r\n", `user-agent: Deno/${Deno.version.deno}\r\n`, - "accept-encoding: gzip, br\r\n", + "accept-encoding: gzip,br\r\n", `host: ${addr}\r\n`, - `content-length: ${body.length}\r\n\r\n`, + `\r\n`, body, ].join(""); assertEquals(actual, expected); @@ -789,14 +790,15 @@ Deno.test( const actual = new TextDecoder().decode((await bufPromise).bytes()); const expected = [ "POST /blah HTTP/1.1\r\n", + `content-length: ${body.byteLength}\r\n`, "hello: World\r\n", "foo: Bar\r\n", "accept: */*\r\n", "accept-language: *\r\n", `user-agent: Deno/${Deno.version.deno}\r\n`, - "accept-encoding: gzip, br\r\n", + "accept-encoding: gzip,br\r\n", `host: ${addr}\r\n`, - `content-length: ${body.byteLength}\r\n\r\n`, + `\r\n`, bodyStr, ].join(""); assertEquals(actual, expected); @@ -827,7 +829,7 @@ Deno.test( "accept: */*\r\n", "accept-language: *\r\n", `user-agent: Deno/${Deno.version.deno}\r\n`, - "accept-encoding: gzip, br\r\n", + "accept-encoding: gzip,br\r\n", `host: ${addr}\r\n\r\n`, ].join(""); assertEquals(actual, expected); @@ -859,7 +861,7 @@ Deno.test( "accept: */*\r\n", "accept-language: *\r\n", `user-agent: Deno/${Deno.version.deno}\r\n`, - "accept-encoding: gzip, br\r\n\r\n", + "accept-encoding: gzip,br\r\n\r\n", ].join(""); assertEquals(actual, expected); }, @@ -1226,7 
+1228,7 @@ Deno.test( "accept: */*\r\n", "accept-language: *\r\n", `user-agent: Deno/${Deno.version.deno}\r\n`, - "accept-encoding: gzip, br\r\n", + "accept-encoding: gzip,br\r\n", `host: ${addr}\r\n`, `transfer-encoding: chunked\r\n\r\n`, "B\r\n", @@ -1824,7 +1826,7 @@ Deno.test( await fetch(`http://${addr}/`); }, TypeError, - "error sending request", + "client error", ); listener.close(); @@ -1880,7 +1882,7 @@ Deno.test( await response.arrayBuffer(); }, Error, - "error decoding response body", + "body", ); listener.close(); diff --git a/tests/unit/http_test.ts b/tests/unit/http_test.ts index eddb1520b4..03c30965ff 100644 --- a/tests/unit/http_test.ts +++ b/tests/unit/http_test.ts @@ -2572,9 +2572,11 @@ for (const compression of [true, false]) { const result = await reader.read(); assert(!result.done); assertEquals(result.value, new Uint8Array([65])); - const err = await assertRejects(() => reader.read()); - assert(err instanceof TypeError); - assert(err.message.includes("error decoding response body")); + await assertRejects( + () => reader.read(), + TypeError, + "body", + ); const httpConn = await server; httpConn.close(); @@ -2608,9 +2610,11 @@ for (const compression of [true, false]) { const result = await reader.read(); assert(!result.done); assertEquals(result.value, new Uint8Array([65])); - const err = await assertRejects(() => reader.read()); - assert(err instanceof TypeError); - assert(err.message.includes("error decoding response body")); + await assertRejects( + () => reader.read(), + TypeError, + "body", + ); const httpConn = await server; httpConn.close(); diff --git a/tests/unit/serve_test.ts b/tests/unit/serve_test.ts index 4239221be3..ee87d8189d 100644 --- a/tests/unit/serve_test.ts +++ b/tests/unit/serve_test.ts @@ -3522,7 +3522,7 @@ Deno.test( fail(); } catch (clientError) { assert(clientError instanceof TypeError); - assert(clientError.message.includes("error sending request for url")); + assert(clientError.message.includes("client error")); } 
finally { ac.abort(); await server.finished; @@ -3570,7 +3570,7 @@ Deno.test({ fail(); } catch (clientError) { assert(clientError instanceof TypeError); - assert(clientError.message.includes("error sending request for url")); + assert(clientError.message.includes("client error")); } finally { ac.abort(); await server.finished;