1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2024-11-28 16:20:57 -05:00
denoland-deno/ext/fetch/lib.rs

1211 lines
33 KiB
Rust
Raw Normal View History

// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.
2020-09-18 09:20:55 -04:00
mod fs_fetch_handler;
mod proxy;
#[cfg(test)]
mod tests;
use std::borrow::Cow;
use std::cell::RefCell;
use std::cmp::min;
use std::convert::From;
use std::path::Path;
use std::path::PathBuf;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::Context;
use std::task::Poll;
use deno_core::anyhow::anyhow;
use deno_core::anyhow::Error;
2020-09-18 09:20:55 -04:00
use deno_core::error::type_error;
use deno_core::error::AnyError;
feat(core): improve resource read & write traits (#16115) This commit introduces two new buffer wrapper types to `deno_core`. The main benefit of these new wrappers is that they can wrap a number of different underlying buffer types. This allows for a more flexible read and write API on resources that will require less copying of data between different buffer representations. - `BufView` is a read-only view onto a buffer. It can be backed by `ZeroCopyBuf`, `Vec<u8>`, and `bytes::Bytes`. - `BufViewMut` is a read-write view onto a buffer. It can be cheaply converted into a `BufView`. It can be backed by `ZeroCopyBuf` or `Vec<u8>`. Both new buffer views have a cursor. This means that the start point of the view can be constrained to write / read from just a slice of the view. Only the start point of the slice can be adjusted. The end point is fixed. To adjust the end point, the underlying buffer needs to be truncated. Readable resources have been changed to better cater to resources that do not support BYOB reads. The basic `read` method now returns a `BufView` instead of taking a `ZeroCopyBuf` to fill. This allows the operation to return buffers that the resource has already allocated, instead of forcing the caller to allocate the buffer. BYOB reads are still very useful for resources that support them, so a new `read_byob` method has been added that takes a `BufViewMut` to fill. `op_read` attempts to use `read_byob` if the resource supports it, which falls back to `read` and performs an additional copy if it does not. For Rust->JS reads this change should have no impact, but for Rust->Rust reads, this allows the caller to avoid an additional copy in many scenarios. This combined with the support for `BufView` to be backed by `bytes::Bytes` allows us to avoid one data copy when piping from a `fetch` response into an `ext/http` response. Writable resources have been changed to take a `BufView` instead of a `ZeroCopyBuf` as an argument. This allows for less copying of data in certain scenarios, as described above. Additionally a new `Resource::write_all` method has been added that takes a `BufView` and continually attempts to write the resource until the entire buffer has been written. Certain resources like files can override this method to provide a more efficient `write_all` implementation.
2022-10-09 10:49:25 -04:00
use deno_core::futures::stream::Peekable;
use deno_core::futures::Future;
use deno_core::futures::FutureExt;
use deno_core::futures::Stream;
use deno_core::futures::StreamExt;
use deno_core::futures::TryFutureExt;
use deno_core::op2;
use deno_core::unsync::spawn;
2020-09-18 09:20:55 -04:00
use deno_core::url::Url;
use deno_core::AsyncRefCell;
use deno_core::AsyncResult;
use deno_core::BufView;
use deno_core::ByteString;
use deno_core::CancelFuture;
use deno_core::CancelHandle;
use deno_core::CancelTryFuture;
use deno_core::Canceled;
use deno_core::JsBuffer;
2020-09-18 09:20:55 -04:00
use deno_core::OpState;
use deno_core::RcRef;
use deno_core::Resource;
use deno_core::ResourceId;
use deno_tls::rustls::RootCertStore;
use deno_tls::Proxy;
use deno_tls::RootCertStoreProvider;
use deno_tls::TlsKey;
use deno_tls::TlsKeys;
use deno_tls::TlsKeysHolder;
use bytes::Bytes;
use data_url::DataUrl;
use http::header::HeaderName;
use http::header::HeaderValue;
use http::header::ACCEPT;
use http::header::ACCEPT_ENCODING;
use http::header::AUTHORIZATION;
use http::header::CONTENT_LENGTH;
use http::header::HOST;
use http::header::PROXY_AUTHORIZATION;
use http::header::RANGE;
use http::header::USER_AGENT;
use http::Extensions;
use http::Method;
use http::Uri;
use http_body_util::BodyExt;
use hyper::body::Frame;
use hyper_util::client::legacy::connect::HttpConnector;
use hyper_util::client::legacy::connect::HttpInfo;
use hyper_util::rt::TokioExecutor;
use hyper_util::rt::TokioIo;
use hyper_util::rt::TokioTimer;
2020-09-18 09:20:55 -04:00
use serde::Deserialize;
use serde::Serialize;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;
use tower::ServiceExt;
use tower_http::decompression::Decompression;
2020-09-18 09:20:55 -04:00
// Re-export data_url
pub use data_url;
pub use proxy::basic_auth;
2020-09-18 12:31:30 -04:00
pub use fs_fetch_handler::FsFetchHandler;
#[derive(Clone)]
pub struct Options {
pub user_agent: String,
pub root_cert_store_provider: Option<Arc<dyn RootCertStoreProvider>>,
pub proxy: Option<Proxy>,
#[allow(clippy::type_complexity)]
pub request_builder_hook:
Option<fn(&mut http::Request<ReqBody>) -> Result<(), AnyError>>,
pub unsafely_ignore_certificate_errors: Option<Vec<String>>,
pub client_cert_chain_and_key: TlsKeys,
pub file_fetch_handler: Rc<dyn FetchHandler>,
}
impl Options {
pub fn root_cert_store(&self) -> Result<Option<RootCertStore>, AnyError> {
Ok(match &self.root_cert_store_provider {
Some(provider) => Some(provider.get_or_try_init()?.clone()),
None => None,
})
}
}
impl Default for Options {
fn default() -> Self {
Self {
user_agent: "".to_string(),
root_cert_store_provider: None,
proxy: None,
request_builder_hook: None,
unsafely_ignore_certificate_errors: None,
client_cert_chain_and_key: TlsKeys::Null,
file_fetch_handler: Rc::new(DefaultFileFetchHandler),
}
}
}
deno_core::extension!(deno_fetch,
deps = [ deno_webidl, deno_web, deno_url, deno_console ],
parameters = [FP: FetchPermissions],
ops = [
op_fetch<FP>,
op_fetch_send,
op_fetch_response_upgrade,
op_utf8_to_byte_string,
op_fetch_custom_client<FP>,
],
esm = [
"20_headers.js",
"21_formdata.js",
"22_body.js",
"22_http_client.js",
"23_request.js",
"23_response.js",
"26_fetch.js",
"27_eventsource.js"
],
options = {
options: Options,
},
state = |state, options| {
state.put::<Options>(options.options);
},
);
2020-09-18 09:20:55 -04:00
pub type CancelableResponseFuture =
Pin<Box<dyn Future<Output = CancelableResponseResult>>>;
pub trait FetchHandler: dyn_clone::DynClone {
// Return the result of the fetch request consisting of a tuple of the
// cancelable response result, the optional fetch body resource and the
// optional cancel handle.
fn fetch_file(
&self,
state: &mut OpState,
url: &Url,
) -> (CancelableResponseFuture, Option<Rc<CancelHandle>>);
}
dyn_clone::clone_trait_object!(FetchHandler);
/// A default implementation which will error for every request.
#[derive(Clone)]
pub struct DefaultFileFetchHandler;
impl FetchHandler for DefaultFileFetchHandler {
fn fetch_file(
&self,
_state: &mut OpState,
_url: &Url,
) -> (CancelableResponseFuture, Option<Rc<CancelHandle>>) {
let fut = async move {
Ok(Err(type_error(
"NetworkError when attempting to fetch resource.",
)))
};
(Box::pin(fut), None)
}
}
pub fn get_declaration() -> PathBuf {
PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("lib.deno_fetch.d.ts")
}
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct FetchReturn {
pub request_rid: ResourceId,
pub cancel_handle_rid: Option<ResourceId>,
}
pub fn get_or_create_client_from_state(
state: &mut OpState,
) -> Result<Client, AnyError> {
if let Some(client) = state.try_borrow::<Client>() {
Ok(client.clone())
} else {
let options = state.borrow::<Options>();
let client = create_client_from_options(options)?;
state.put::<Client>(client.clone());
Ok(client)
}
}
pub fn create_client_from_options(
options: &Options,
) -> Result<Client, AnyError> {
create_http_client(
&options.user_agent,
CreateHttpClientOptions {
root_cert_store: options.root_cert_store()?,
ca_certs: vec![],
proxy: options.proxy.clone(),
unsafely_ignore_certificate_errors: options
.unsafely_ignore_certificate_errors
.clone(),
client_cert_chain_and_key: options
.client_cert_chain_and_key
.clone()
.try_into()
.unwrap_or_default(),
pool_max_idle_per_host: None,
pool_idle_timeout: None,
http1: true,
http2: true,
},
)
}
#[allow(clippy::type_complexity)]
pub struct ResourceToBodyAdapter(
Rc<dyn Resource>,
Option<Pin<Box<dyn Future<Output = Result<BufView, Error>>>>>,
);
impl ResourceToBodyAdapter {
pub fn new(resource: Rc<dyn Resource>) -> Self {
let future = resource.clone().read(64 * 1024);
Self(resource, Some(future))
}
}
// SAFETY: we only use this on a single-threaded executor
unsafe impl Send for ResourceToBodyAdapter {}
// SAFETY: we only use this on a single-threaded executor
unsafe impl Sync for ResourceToBodyAdapter {}
impl Stream for ResourceToBodyAdapter {
type Item = Result<Bytes, Error>;
fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
if let Some(mut fut) = this.1.take() {
match fut.poll_unpin(cx) {
Poll::Pending => {
this.1 = Some(fut);
Poll::Pending
}
Poll::Ready(res) => match res {
Ok(buf) if buf.is_empty() => Poll::Ready(None),
Ok(buf) => {
this.1 = Some(this.0.clone().read(64 * 1024));
Poll::Ready(Some(Ok(buf.to_vec().into())))
}
Err(err) => Poll::Ready(Some(Err(err))),
},
}
} else {
Poll::Ready(None)
}
}
}
impl hyper::body::Body for ResourceToBodyAdapter {
type Data = Bytes;
type Error = Error;
fn poll_frame(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
match self.poll_next(cx) {
Poll::Ready(Some(res)) => Poll::Ready(Some(res.map(Frame::data))),
Poll::Ready(None) => Poll::Ready(None),
Poll::Pending => Poll::Pending,
}
}
}
impl Drop for ResourceToBodyAdapter {
fn drop(&mut self) {
self.0.clone().close()
}
}
pub trait FetchPermissions {
fn check_net_url(
&mut self,
_url: &Url,
api_name: &str,
) -> Result<(), AnyError>;
fn check_read(&mut self, _p: &Path, api_name: &str) -> Result<(), AnyError>;
}
impl FetchPermissions for deno_permissions::PermissionsContainer {
#[inline(always)]
fn check_net_url(
&mut self,
url: &Url,
api_name: &str,
) -> Result<(), AnyError> {
deno_permissions::PermissionsContainer::check_net_url(self, url, api_name)
}
#[inline(always)]
fn check_read(
&mut self,
path: &Path,
api_name: &str,
) -> Result<(), AnyError> {
deno_permissions::PermissionsContainer::check_read(self, path, api_name)
}
}
#[op2]
#[serde]
#[allow(clippy::too_many_arguments)]
pub fn op_fetch<FP>(
state: &mut OpState,
#[serde] method: ByteString,
#[string] url: String,
#[serde] headers: Vec<(ByteString, ByteString)>,
#[smi] client_rid: Option<u32>,
has_body: bool,
#[buffer] data: Option<JsBuffer>,
#[smi] resource: Option<ResourceId>,
) -> Result<FetchReturn, AnyError>
2020-09-18 09:20:55 -04:00
where
FP: FetchPermissions + 'static,
{
let (client, allow_host) = if let Some(rid) = client_rid {
let r = state.resource_table.get::<HttpClientResource>(rid)?;
(r.client.clone(), r.allow_host)
2020-09-18 09:20:55 -04:00
} else {
(get_or_create_client_from_state(state)?, false)
2020-09-18 09:20:55 -04:00
};
let method = Method::from_bytes(&method)?;
let mut url = Url::parse(&url)?;
2020-09-18 09:20:55 -04:00
// Check scheme before asking for net permission
let scheme = url.scheme();
let (request_rid, cancel_handle_rid) = match scheme {
"file" => {
let path = url.to_file_path().map_err(|_| {
type_error("NetworkError when attempting to fetch resource.")
})?;
let permissions = state.borrow_mut::<FP>();
permissions.check_read(&path, "fetch()")?;
if method != Method::GET {
return Err(type_error(format!(
"Fetching files only supports the GET method. Received {method}."
)));
}
let Options {
file_fetch_handler, ..
} = state.borrow_mut::<Options>();
let file_fetch_handler = file_fetch_handler.clone();
let (future, maybe_cancel_handle) =
file_fetch_handler.fetch_file(state, &url);
let request_rid = state
.resource_table
.add(FetchRequestResource { future, url });
let maybe_cancel_handle_rid = maybe_cancel_handle
.map(|ch| state.resource_table.add(FetchCancelHandle(ch)));
(request_rid, maybe_cancel_handle_rid)
}
"http" | "https" => {
let permissions = state.borrow_mut::<FP>();
permissions.check_net_url(&url, "fetch()")?;
2020-09-18 09:20:55 -04:00
let maybe_authority = extract_authority(&mut url);
let uri = url
.as_str()
.parse::<Uri>()
.map_err(|_| type_error("Invalid URL"))?;
Revert "refactor(fetch): reimplement fetch with hyper instead of reqwest (#24237)" (#24574) This reverts commit f6fd6619e708a515831f707438368d81b0c9aa56. I'm seeing a difference between canary and 1.45.2. In `deno-docs/reference_gen` I can't download dax when running `deno task types` ``` ~/src/deno-docs/reference_gen# deno upgrade --canary Looking up latest canary version Found latest version f6fd6619e708a515831f707438368d81b0c9aa56 Downloading https://dl.deno.land/canary/f6fd6619e708a515831f707438368d81b0c9aa56/deno-aarch64-apple-darwin.zip Deno is upgrading to version f6fd6619e708a515831f707438368d81b0c9aa56 Archive: /var/folders/9v/kys6gqns6kl8nksyn4l1f9v40000gn/T/.tmpb5lDnq/deno.zip inflating: deno Upgraded successfully ~/src/deno-docs/reference_gen# deno -v deno 1.45.2+f6fd661 ~/src/deno-docs/reference_gen# rm -rf /Users/ry/Library/Caches/deno ~/src/deno-docs/reference_gen# deno task types Task types deno task types:deno && deno task types:node Task types:deno deno run --allow-read --allow-write --allow-run --allow-env --allow-sys deno-docs.ts error: JSR package manifest for '@david/dax' failed to load. expected value at line 1 column 1 at file:///Users/ry/src/deno-docs/reference_gen/deno-docs.ts:2:15 ~/src/deno-docs/reference_gen# deno upgrade --version 1.45.2 Downloading https://github.com/denoland/deno/releases/download/v1.45.2/deno-aarch64-apple-darwin.zip Deno is upgrading to version 1.45.2 Archive: /var/folders/9v/kys6gqns6kl8nksyn4l1f9v40000gn/T/.tmp3R7uhF/deno.zip inflating: deno Upgraded successfully ~/src/deno-docs/reference_gen# rm -rf /Users/ry/Library/Caches/deno ~/src/deno-docs/reference_gen# deno task types Task types deno task types:deno && deno task types:node Task types:deno deno run --allow-read --allow-write --allow-run --allow-env --allow-sys deno-docs.ts Task types:node deno run --allow-read --allow-write=. --allow-env --allow-sys node-docs.ts ```
2024-07-13 17:08:23 -04:00
let mut con_len = None;
let body = if has_body {
match (data, resource) {
(Some(data), _) => {
// If a body is passed, we use it, and don't return a body for streaming.
con_len = Some(data.len() as u64);
http_body_util::Full::new(data.to_vec().into())
.map_err(|never| match never {})
.boxed()
}
(_, Some(resource)) => {
let resource = state.resource_table.take_any(resource)?;
match resource.size_hint() {
(body_size, Some(n)) if body_size == n && body_size > 0 => {
con_len = Some(body_size);
}
_ => {}
}
ReqBody::new(ResourceToBodyAdapter::new(resource))
}
(None, None) => unreachable!(),
}
} else {
// POST and PUT requests should always have a 0 length content-length,
// if there is no body. https://fetch.spec.whatwg.org/#http-network-or-cache-fetch
if matches!(method, Method::POST | Method::PUT) {
con_len = Some(0);
}
http_body_util::Empty::new()
.map_err(|never| match never {})
.boxed()
};
let mut request = http::Request::new(body);
*request.method_mut() = method.clone();
*request.uri_mut() = uri.clone();
if let Some((username, password)) = maybe_authority {
request.headers_mut().insert(
AUTHORIZATION,
proxy::basic_auth(&username, password.as_deref()),
);
}
if let Some(len) = con_len {
request.headers_mut().insert(CONTENT_LENGTH, len.into());
}
for (key, value) in headers {
let name = HeaderName::from_bytes(&key)
.map_err(|err| type_error(err.to_string()))?;
let v = HeaderValue::from_bytes(&value)
.map_err(|err| type_error(err.to_string()))?;
if (name != HOST || allow_host) && name != CONTENT_LENGTH {
request.headers_mut().append(name, v);
}
}
if request.headers().contains_key(RANGE) {
// https://fetch.spec.whatwg.org/#http-network-or-cache-fetch step 18
// If httpRequests header list contains `Range`, then append (`Accept-Encoding`, `identity`)
request
.headers_mut()
.insert(ACCEPT_ENCODING, HeaderValue::from_static("identity"));
}
let options = state.borrow::<Options>();
if let Some(request_builder_hook) = options.request_builder_hook {
request_builder_hook(&mut request)
.map_err(|err| type_error(err.to_string()))?;
}
let cancel_handle = CancelHandle::new_rc();
let cancel_handle_ = cancel_handle.clone();
let fut = {
async move {
client
.send(request)
.map_err(Into::into)
.or_cancel(cancel_handle_)
.await
}
};
let request_rid = state.resource_table.add(FetchRequestResource {
future: Box::pin(fut),
url,
});
let cancel_handle_rid =
state.resource_table.add(FetchCancelHandle(cancel_handle));
(request_rid, Some(cancel_handle_rid))
}
"data" => {
let data_url = DataUrl::process(url.as_str())
.map_err(|e| type_error(format!("{e:?}")))?;
2020-09-18 09:20:55 -04:00
let (body, _) = data_url
.decode_to_vec()
.map_err(|e| type_error(format!("{e:?}")))?;
let body = http_body_util::Full::new(body.into())
.map_err(|never| match never {})
.boxed();
2020-09-18 09:20:55 -04:00
let response = http::Response::builder()
.status(http::StatusCode::OK)
.header(http::header::CONTENT_TYPE, data_url.mime_type().to_string())
.body(body)?;
let fut = async move { Ok(Ok(response)) };
let request_rid = state.resource_table.add(FetchRequestResource {
future: Box::pin(fut),
url,
});
(request_rid, None)
}
"blob" => {
// Blob URL resolution happens in the JS side of fetch. If we got here is
// because the URL isn't an object URL.
return Err(type_error("Blob for the given URL not found."));
}
_ => return Err(type_error(format!("scheme '{scheme}' not supported"))),
};
Ok(FetchReturn {
request_rid,
cancel_handle_rid,
})
}
#[derive(Default, Serialize)]
2021-03-17 17:33:29 -04:00
#[serde(rename_all = "camelCase")]
pub struct FetchResponse {
pub status: u16,
pub status_text: String,
pub headers: Vec<(ByteString, ByteString)>,
pub url: String,
pub response_rid: ResourceId,
pub content_length: Option<u64>,
pub remote_addr_ip: Option<String>,
pub remote_addr_port: Option<u16>,
/// This field is populated if some error occurred which needs to be
/// reconstructed in the JS side to set the error _cause_.
/// In the tuple, the first element is an error message and the second one is
/// an error cause.
pub error: Option<(String, String)>,
2021-03-17 17:33:29 -04:00
}
#[op2(async)]
#[serde]
pub async fn op_fetch_send(
state: Rc<RefCell<OpState>>,
#[smi] rid: ResourceId,
) -> Result<FetchResponse, AnyError> {
let request = state
.borrow_mut()
.resource_table
.take::<FetchRequestResource>(rid)?;
let request = Rc::try_unwrap(request)
.ok()
.expect("multiple op_fetch_send ongoing");
let res = match request.future.await {
Ok(Ok(res)) => res,
Ok(Err(err)) => {
// We're going to try and rescue the error cause from a stream and return it from this fetch.
// If any error in the chain is a hyper body error, return that as a special result we can use to
// reconstruct an error chain (eg: `new TypeError(..., { cause: new Error(...) })`).
// TODO(mmastrac): it would be a lot easier if we just passed a v8::Global through here instead
let mut err_ref: &dyn std::error::Error = err.as_ref();
while let Some(err_src) = std::error::Error::source(err_ref) {
if let Some(err_src) = err_src.downcast_ref::<hyper::Error>() {
if let Some(err_src) = std::error::Error::source(err_src) {
return Ok(FetchResponse {
error: Some((err.to_string(), err_src.to_string())),
..Default::default()
});
}
}
err_ref = err_src;
}
return Err(type_error(err.to_string()));
}
Err(_) => return Err(type_error("request was cancelled")),
};
2020-09-18 09:20:55 -04:00
let status = res.status();
let url = request.url.into();
2020-09-18 09:20:55 -04:00
let mut res_headers = Vec::new();
for (key, val) in res.headers().iter() {
res_headers.push((key.as_str().into(), val.as_bytes().into()));
2020-09-18 09:20:55 -04:00
}
let content_length = hyper::body::Body::size_hint(res.body()).exact();
let remote_addr = res
.extensions()
.get::<hyper_util::client::legacy::connect::HttpInfo>()
.map(|info| info.remote_addr());
let (remote_addr_ip, remote_addr_port) = if let Some(addr) = remote_addr {
(Some(addr.ip().to_string()), Some(addr.port()))
} else {
(None, None)
};
let response_rid = state
.borrow_mut()
.resource_table
.add(FetchResponseResource::new(res, content_length));
2020-09-18 09:20:55 -04:00
Ok(FetchResponse {
status: status.as_u16(),
status_text: status.canonical_reason().unwrap_or("").to_string(),
headers: res_headers,
url,
response_rid,
content_length,
remote_addr_ip,
remote_addr_port,
error: None,
})
2021-03-17 17:33:29 -04:00
}
#[op2(async)]
#[smi]
pub async fn op_fetch_response_upgrade(
state: Rc<RefCell<OpState>>,
#[smi] rid: ResourceId,
) -> Result<ResourceId, AnyError> {
let raw_response = state
.borrow_mut()
.resource_table
.take::<FetchResponseResource>(rid)?;
let raw_response = Rc::try_unwrap(raw_response)
.expect("Someone is holding onto FetchResponseResource");
let (read, write) = tokio::io::duplex(1024);
let (read_rx, write_tx) = tokio::io::split(read);
let (mut write_rx, mut read_tx) = tokio::io::split(write);
let upgraded = raw_response.upgrade().await?;
{
// Stage 3: Pump the data
let (mut upgraded_rx, mut upgraded_tx) =
tokio::io::split(TokioIo::new(upgraded));
spawn(async move {
let mut buf = [0; 1024];
loop {
let read = upgraded_rx.read(&mut buf).await?;
if read == 0 {
break;
}
read_tx.write_all(&buf[..read]).await?;
}
Ok::<_, AnyError>(())
});
spawn(async move {
let mut buf = [0; 1024];
loop {
let read = write_rx.read(&mut buf).await?;
if read == 0 {
break;
}
upgraded_tx.write_all(&buf[..read]).await?;
}
Ok::<_, AnyError>(())
});
}
Ok(
state
.borrow_mut()
.resource_table
.add(UpgradeStream::new(read_rx, write_tx)),
)
}
struct UpgradeStream {
read: AsyncRefCell<tokio::io::ReadHalf<tokio::io::DuplexStream>>,
write: AsyncRefCell<tokio::io::WriteHalf<tokio::io::DuplexStream>>,
cancel_handle: CancelHandle,
}
impl UpgradeStream {
pub fn new(
read: tokio::io::ReadHalf<tokio::io::DuplexStream>,
write: tokio::io::WriteHalf<tokio::io::DuplexStream>,
) -> Self {
Self {
read: AsyncRefCell::new(read),
write: AsyncRefCell::new(write),
cancel_handle: CancelHandle::new(),
}
}
async fn read(self: Rc<Self>, buf: &mut [u8]) -> Result<usize, AnyError> {
let cancel_handle = RcRef::map(self.clone(), |this| &this.cancel_handle);
async {
let read = RcRef::map(self, |this| &this.read);
let mut read = read.borrow_mut().await;
Ok(Pin::new(&mut *read).read(buf).await?)
}
.try_or_cancel(cancel_handle)
.await
}
async fn write(self: Rc<Self>, buf: &[u8]) -> Result<usize, AnyError> {
let cancel_handle = RcRef::map(self.clone(), |this| &this.cancel_handle);
async {
let write = RcRef::map(self, |this| &this.write);
let mut write = write.borrow_mut().await;
Ok(Pin::new(&mut *write).write(buf).await?)
}
.try_or_cancel(cancel_handle)
.await
}
}
impl Resource for UpgradeStream {
fn name(&self) -> Cow<str> {
"fetchUpgradedStream".into()
}
deno_core::impl_readable_byob!();
deno_core::impl_writable!();
fn close(self: Rc<Self>) {
self.cancel_handle.cancel();
}
}
type CancelableResponseResult =
Result<Result<http::Response<ResBody>, AnyError>, Canceled>;
pub struct FetchRequestResource {
pub future: Pin<Box<dyn Future<Output = CancelableResponseResult>>>,
pub url: Url,
}
impl Resource for FetchRequestResource {
fn name(&self) -> Cow<str> {
"fetchRequest".into()
}
}
2020-09-18 09:20:55 -04:00
pub struct FetchCancelHandle(pub Rc<CancelHandle>);
impl Resource for FetchCancelHandle {
fn name(&self) -> Cow<str> {
"fetchCancelHandle".into()
}
fn close(self: Rc<Self>) {
self.0.cancel()
}
}
type BytesStream =
Pin<Box<dyn Stream<Item = Result<bytes::Bytes, std::io::Error>> + Unpin>>;
pub enum FetchResponseReader {
Start(http::Response<ResBody>),
BodyReader(Peekable<BytesStream>),
}
impl Default for FetchResponseReader {
fn default() -> Self {
let stream: BytesStream = Box::pin(deno_core::futures::stream::empty());
Self::BodyReader(stream.peekable())
}
}
#[derive(Debug)]
pub struct FetchResponseResource {
pub response_reader: AsyncRefCell<FetchResponseReader>,
pub cancel: CancelHandle,
pub size: Option<u64>,
}
impl FetchResponseResource {
pub fn new(response: http::Response<ResBody>, size: Option<u64>) -> Self {
Self {
response_reader: AsyncRefCell::new(FetchResponseReader::Start(response)),
cancel: CancelHandle::default(),
size,
}
}
pub async fn upgrade(self) -> Result<hyper::upgrade::Upgraded, AnyError> {
let reader = self.response_reader.into_inner();
match reader {
FetchResponseReader::Start(resp) => Ok(hyper::upgrade::on(resp).await?),
_ => unreachable!(),
}
}
}
impl Resource for FetchResponseResource {
fn name(&self) -> Cow<str> {
"fetchResponse".into()
}
feat(core): improve resource read & write traits (#16115) This commit introduces two new buffer wrapper types to `deno_core`. The main benefit of these new wrappers is that they can wrap a number of different underlying buffer types. This allows for a more flexible read and write API on resources that will require less copying of data between different buffer representations. - `BufView` is a read-only view onto a buffer. It can be backed by `ZeroCopyBuf`, `Vec<u8>`, and `bytes::Bytes`. - `BufViewMut` is a read-write view onto a buffer. It can be cheaply converted into a `BufView`. It can be backed by `ZeroCopyBuf` or `Vec<u8>`. Both new buffer views have a cursor. This means that the start point of the view can be constrained to write / read from just a slice of the view. Only the start point of the slice can be adjusted. The end point is fixed. To adjust the end point, the underlying buffer needs to be truncated. Readable resources have been changed to better cater to resources that do not support BYOB reads. The basic `read` method now returns a `BufView` instead of taking a `ZeroCopyBuf` to fill. This allows the operation to return buffers that the resource has already allocated, instead of forcing the caller to allocate the buffer. BYOB reads are still very useful for resources that support them, so a new `read_byob` method has been added that takes a `BufViewMut` to fill. `op_read` attempts to use `read_byob` if the resource supports it, which falls back to `read` and performs an additional copy if it does not. For Rust->JS reads this change should have no impact, but for Rust->Rust reads, this allows the caller to avoid an additional copy in many scenarios. This combined with the support for `BufView` to be backed by `bytes::Bytes` allows us to avoid one data copy when piping from a `fetch` response into an `ext/http` response. Writable resources have been changed to take a `BufView` instead of a `ZeroCopyBuf` as an argument. This allows for less copying of data in certain scenarios, as described above. Additionally a new `Resource::write_all` method has been added that takes a `BufView` and continually attempts to write the resource until the entire buffer has been written. Certain resources like files can override this method to provide a more efficient `write_all` implementation.
2022-10-09 10:49:25 -04:00
fn read(self: Rc<Self>, limit: usize) -> AsyncResult<BufView> {
Box::pin(async move {
let mut reader =
RcRef::map(&self, |r| &r.response_reader).borrow_mut().await;
feat(core): improve resource read & write traits (#16115) This commit introduces two new buffer wrapper types to `deno_core`. The main benefit of these new wrappers is that they can wrap a number of different underlying buffer types. This allows for a more flexible read and write API on resources that will require less copying of data between different buffer representations. - `BufView` is a read-only view onto a buffer. It can be backed by `ZeroCopyBuf`, `Vec<u8>`, and `bytes::Bytes`. - `BufViewMut` is a read-write view onto a buffer. It can be cheaply converted into a `BufView`. It can be backed by `ZeroCopyBuf` or `Vec<u8>`. Both new buffer views have a cursor. This means that the start point of the view can be constrained to write / read from just a slice of the view. Only the start point of the slice can be adjusted. The end point is fixed. To adjust the end point, the underlying buffer needs to be truncated. Readable resources have been changed to better cater to resources that do not support BYOB reads. The basic `read` method now returns a `BufView` instead of taking a `ZeroCopyBuf` to fill. This allows the operation to return buffers that the resource has already allocated, instead of forcing the caller to allocate the buffer. BYOB reads are still very useful for resources that support them, so a new `read_byob` method has been added that takes a `BufViewMut` to fill. `op_read` attempts to use `read_byob` if the resource supports it, which falls back to `read` and performs an additional copy if it does not. For Rust->JS reads this change should have no impact, but for Rust->Rust reads, this allows the caller to avoid an additional copy in many scenarios. This combined with the support for `BufView` to be backed by `bytes::Bytes` allows us to avoid one data copy when piping from a `fetch` response into an `ext/http` response. Writable resources have been changed to take a `BufView` instead of a `ZeroCopyBuf` as an argument. This allows for less copying of data in certain scenarios, as described above. Additionally a new `Resource::write_all` method has been added that takes a `BufView` and continually attempts to write the resource until the entire buffer has been written. Certain resources like files can override this method to provide a more efficient `write_all` implementation.
2022-10-09 10:49:25 -04:00
let body = loop {
match &mut *reader {
FetchResponseReader::BodyReader(reader) => break reader,
FetchResponseReader::Start(_) => {}
}
match std::mem::take(&mut *reader) {
FetchResponseReader::Start(resp) => {
let stream: BytesStream =
Box::pin(resp.into_body().into_data_stream().map(|r| {
r.map_err(|err| {
std::io::Error::new(std::io::ErrorKind::Other, err)
})
}));
*reader = FetchResponseReader::BodyReader(stream.peekable());
}
FetchResponseReader::BodyReader(_) => unreachable!(),
}
};
feat(core): improve resource read & write traits (#16115) This commit introduces two new buffer wrapper types to `deno_core`. The main benefit of these new wrappers is that they can wrap a number of different underlying buffer types. This allows for a more flexible read and write API on resources that will require less copying of data between different buffer representations. - `BufView` is a read-only view onto a buffer. It can be backed by `ZeroCopyBuf`, `Vec<u8>`, and `bytes::Bytes`. - `BufViewMut` is a read-write view onto a buffer. It can be cheaply converted into a `BufView`. It can be backed by `ZeroCopyBuf` or `Vec<u8>`. Both new buffer views have a cursor. This means that the start point of the view can be constrained to write / read from just a slice of the view. Only the start point of the slice can be adjusted. The end point is fixed. To adjust the end point, the underlying buffer needs to be truncated. Readable resources have been changed to better cater to resources that do not support BYOB reads. The basic `read` method now returns a `BufView` instead of taking a `ZeroCopyBuf` to fill. This allows the operation to return buffers that the resource has already allocated, instead of forcing the caller to allocate the buffer. BYOB reads are still very useful for resources that support them, so a new `read_byob` method has been added that takes a `BufViewMut` to fill. `op_read` attempts to use `read_byob` if the resource supports it, which falls back to `read` and performs an additional copy if it does not. For Rust->JS reads this change should have no impact, but for Rust->Rust reads, this allows the caller to avoid an additional copy in many scenarios. This combined with the support for `BufView` to be backed by `bytes::Bytes` allows us to avoid one data copy when piping from a `fetch` response into an `ext/http` response. Writable resources have been changed to take a `BufView` instead of a `ZeroCopyBuf` as an argument. This allows for less copying of data in certain scenarios, as described above. Additionally a new `Resource::write_all` method has been added that takes a `BufView` and continually attempts to write the resource until the entire buffer has been written. Certain resources like files can override this method to provide a more efficient `write_all` implementation.
2022-10-09 10:49:25 -04:00
let fut = async move {
let mut reader = Pin::new(body);
feat(core): improve resource read & write traits (#16115) This commit introduces two new buffer wrapper types to `deno_core`. The main benefit of these new wrappers is that they can wrap a number of different underlying buffer types. This allows for a more flexible read and write API on resources that will require less copying of data between different buffer representations. - `BufView` is a read-only view onto a buffer. It can be backed by `ZeroCopyBuf`, `Vec<u8>`, and `bytes::Bytes`. - `BufViewMut` is a read-write view onto a buffer. It can be cheaply converted into a `BufView`. It can be backed by `ZeroCopyBuf` or `Vec<u8>`. Both new buffer views have a cursor. This means that the start point of the view can be constrained to write / read from just a slice of the view. Only the start point of the slice can be adjusted. The end point is fixed. To adjust the end point, the underlying buffer needs to be truncated. Readable resources have been changed to better cater to resources that do not support BYOB reads. The basic `read` method now returns a `BufView` instead of taking a `ZeroCopyBuf` to fill. This allows the operation to return buffers that the resource has already allocated, instead of forcing the caller to allocate the buffer. BYOB reads are still very useful for resources that support them, so a new `read_byob` method has been added that takes a `BufViewMut` to fill. `op_read` attempts to use `read_byob` if the resource supports it, which falls back to `read` and performs an additional copy if it does not. For Rust->JS reads this change should have no impact, but for Rust->Rust reads, this allows the caller to avoid an additional copy in many scenarios. This combined with the support for `BufView` to be backed by `bytes::Bytes` allows us to avoid one data copy when piping from a `fetch` response into an `ext/http` response. Writable resources have been changed to take a `BufView` instead of a `ZeroCopyBuf` as an argument. This allows for less copying of data in certain scenarios, as described above. Additionally a new `Resource::write_all` method has been added that takes a `BufView` and continually attempts to write the resource until the entire buffer has been written. Certain resources like files can override this method to provide a more efficient `write_all` implementation.
2022-10-09 10:49:25 -04:00
loop {
match reader.as_mut().peek_mut().await {
Some(Ok(chunk)) if !chunk.is_empty() => {
let len = min(limit, chunk.len());
let chunk = chunk.split_to(len);
break Ok(chunk.into());
}
// This unwrap is safe because `peek_mut()` returned `Some`, and thus
// currently has a peeked value that can be synchronously returned
// from `next()`.
//
// The future returned from `next()` is always ready, so we can
// safely call `await` on it without creating a race condition.
Some(_) => match reader.as_mut().next().await.unwrap() {
Ok(chunk) => assert!(chunk.is_empty()),
Err(err) => break Err(type_error(err.to_string())),
feat(core): improve resource read & write traits (#16115) This commit introduces two new buffer wrapper types to `deno_core`. The main benefit of these new wrappers is that they can wrap a number of different underlying buffer types. This allows for a more flexible read and write API on resources that will require less copying of data between different buffer representations. - `BufView` is a read-only view onto a buffer. It can be backed by `ZeroCopyBuf`, `Vec<u8>`, and `bytes::Bytes`. - `BufViewMut` is a read-write view onto a buffer. It can be cheaply converted into a `BufView`. It can be backed by `ZeroCopyBuf` or `Vec<u8>`. Both new buffer views have a cursor. This means that the start point of the view can be constrained to write / read from just a slice of the view. Only the start point of the slice can be adjusted. The end point is fixed. To adjust the end point, the underlying buffer needs to be truncated. Readable resources have been changed to better cater to resources that do not support BYOB reads. The basic `read` method now returns a `BufView` instead of taking a `ZeroCopyBuf` to fill. This allows the operation to return buffers that the resource has already allocated, instead of forcing the caller to allocate the buffer. BYOB reads are still very useful for resources that support them, so a new `read_byob` method has been added that takes a `BufViewMut` to fill. `op_read` attempts to use `read_byob` if the resource supports it, which falls back to `read` and performs an additional copy if it does not. For Rust->JS reads this change should have no impact, but for Rust->Rust reads, this allows the caller to avoid an additional copy in many scenarios. This combined with the support for `BufView` to be backed by `bytes::Bytes` allows us to avoid one data copy when piping from a `fetch` response into an `ext/http` response. Writable resources have been changed to take a `BufView` instead of a `ZeroCopyBuf` as an argument. This allows for less copying of data in certain scenarios, as described above. Additionally a new `Resource::write_all` method has been added that takes a `BufView` and continually attempts to write the resource until the entire buffer has been written. Certain resources like files can override this method to provide a more efficient `write_all` implementation.
2022-10-09 10:49:25 -04:00
},
None => break Ok(BufView::empty()),
}
}
};
let cancel_handle = RcRef::map(self, |r| &r.cancel);
fut.try_or_cancel(cancel_handle).await
})
}
fn size_hint(&self) -> (u64, Option<u64>) {
(self.size.unwrap_or(0), self.size)
}
fn close(self: Rc<Self>) {
self.cancel.cancel()
}
2020-09-18 09:20:55 -04:00
}
pub struct HttpClientResource {
pub client: Client,
pub allow_host: bool,
2020-09-18 09:20:55 -04:00
}
impl Resource for HttpClientResource {
fn name(&self) -> Cow<str> {
"httpClient".into()
}
}
2020-09-18 09:20:55 -04:00
impl HttpClientResource {
fn new(client: Client, allow_host: bool) -> Self {
Self { client, allow_host }
2020-09-18 09:20:55 -04:00
}
}
#[derive(Deserialize, Debug)]
2021-03-17 17:33:29 -04:00
#[serde(rename_all = "camelCase")]
pub struct CreateHttpClientArgs {
ca_certs: Vec<String>,
proxy: Option<Proxy>,
pool_max_idle_per_host: Option<usize>,
pool_idle_timeout: Option<serde_json::Value>,
#[serde(default = "default_true")]
http1: bool,
#[serde(default = "default_true")]
http2: bool,
#[serde(default)]
allow_host: bool,
}
fn default_true() -> bool {
true
}
#[op2]
#[smi]
pub fn op_fetch_custom_client<FP>(
2020-09-18 09:20:55 -04:00
state: &mut OpState,
#[serde] args: CreateHttpClientArgs,
#[cppgc] tls_keys: &TlsKeysHolder,
) -> Result<ResourceId, AnyError>
2020-09-18 09:20:55 -04:00
where
FP: FetchPermissions + 'static,
{
if let Some(proxy) = args.proxy.clone() {
let permissions = state.borrow_mut::<FP>();
let url = Url::parse(&proxy.url)?;
permissions.check_net_url(&url, "Deno.createHttpClient()")?;
}
let options = state.borrow::<Options>();
let ca_certs = args
.ca_certs
.into_iter()
.map(|cert| cert.into_bytes())
.collect::<Vec<_>>();
let client = create_http_client(
&options.user_agent,
CreateHttpClientOptions {
root_cert_store: options.root_cert_store()?,
ca_certs,
proxy: args.proxy,
unsafely_ignore_certificate_errors: options
.unsafely_ignore_certificate_errors
.clone(),
client_cert_chain_and_key: tls_keys.take().try_into().unwrap(),
pool_max_idle_per_host: args.pool_max_idle_per_host,
pool_idle_timeout: args.pool_idle_timeout.and_then(
|timeout| match timeout {
serde_json::Value::Bool(true) => None,
serde_json::Value::Bool(false) => Some(None),
serde_json::Value::Number(specify) => {
Some(Some(specify.as_u64().unwrap_or_default()))
}
_ => Some(None),
},
),
http1: args.http1,
http2: args.http2,
},
)?;
2020-09-18 09:20:55 -04:00
let rid = state
.resource_table
.add(HttpClientResource::new(client, args.allow_host));
Ok(rid)
2020-09-18 09:20:55 -04:00
}
#[derive(Debug, Clone)]
pub struct CreateHttpClientOptions {
pub root_cert_store: Option<RootCertStore>,
pub ca_certs: Vec<Vec<u8>>,
pub proxy: Option<Proxy>,
pub unsafely_ignore_certificate_errors: Option<Vec<String>>,
pub client_cert_chain_and_key: Option<TlsKey>,
pub pool_max_idle_per_host: Option<usize>,
pub pool_idle_timeout: Option<Option<u64>>,
pub http1: bool,
pub http2: bool,
}
impl Default for CreateHttpClientOptions {
fn default() -> Self {
CreateHttpClientOptions {
root_cert_store: None,
ca_certs: vec![],
proxy: None,
unsafely_ignore_certificate_errors: None,
client_cert_chain_and_key: None,
pool_max_idle_per_host: None,
pool_idle_timeout: None,
http1: true,
http2: true,
}
}
}
/// Create new instance of async Client. This client supports
/// proxies and doesn't follow redirects.
pub fn create_http_client(
user_agent: &str,
options: CreateHttpClientOptions,
) -> Result<Client, AnyError> {
let mut tls_config = deno_tls::create_client_config(
options.root_cert_store,
options.ca_certs,
options.unsafely_ignore_certificate_errors,
options.client_cert_chain_and_key.into(),
deno_tls::SocketUse::Http,
)?;
// Proxy TLS should not send ALPN
tls_config.alpn_protocols.clear();
let proxy_tls_config = Arc::from(tls_config.clone());
let mut alpn_protocols = vec![];
if options.http2 {
alpn_protocols.push("h2".into());
}
if options.http1 {
alpn_protocols.push("http/1.1".into());
}
tls_config.alpn_protocols = alpn_protocols;
let tls_config = Arc::from(tls_config);
let mut http_connector = HttpConnector::new();
http_connector.enforce_http(false);
let user_agent = user_agent
.parse::<HeaderValue>()
.map_err(|_| type_error("illegal characters in User-Agent"))?;
let mut builder =
hyper_util::client::legacy::Builder::new(TokioExecutor::new());
builder.timer(TokioTimer::new());
builder.pool_timer(TokioTimer::new());
let mut proxies = proxy::from_env();
if let Some(proxy) = options.proxy {
let mut intercept = proxy::Intercept::all(&proxy.url)
.ok_or_else(|| type_error("invalid proxy url"))?;
if let Some(basic_auth) = &proxy.basic_auth {
intercept.set_auth(&basic_auth.username, &basic_auth.password);
}
proxies.prepend(intercept);
}
let proxies = Arc::new(proxies);
let connector = proxy::ProxyConnector {
http: http_connector,
proxies: proxies.clone(),
tls: tls_config,
tls_proxy: proxy_tls_config,
user_agent: Some(user_agent.clone()),
};
if let Some(pool_max_idle_per_host) = options.pool_max_idle_per_host {
builder.pool_max_idle_per_host(pool_max_idle_per_host);
}
if let Some(pool_idle_timeout) = options.pool_idle_timeout {
builder.pool_idle_timeout(
pool_idle_timeout.map(std::time::Duration::from_millis),
);
}
match (options.http1, options.http2) {
(true, false) => {} // noop, handled by ALPN above
(false, true) => {
builder.http2_only(true);
}
(true, true) => {}
(false, false) => {
return Err(type_error("Either `http1` or `http2` needs to be true"))
}
}
let pooled_client = builder.build(connector);
let decompress = Decompression::new(pooled_client).gzip(true).br(true);
Ok(Client {
inner: decompress,
proxies,
user_agent,
})
}
#[op2]
#[serde]
pub fn op_utf8_to_byte_string(
#[string] input: String,
) -> Result<ByteString, AnyError> {
Ok(input.into())
}
#[derive(Clone, Debug)]
pub struct Client {
inner: Decompression<hyper_util::client::legacy::Client<Connector, ReqBody>>,
// Used to check whether to include a proxy-authorization header
proxies: Arc<proxy::Proxies>,
user_agent: HeaderValue,
}
type Connector = proxy::ProxyConnector<HttpConnector>;
// clippy is wrong here
#[allow(clippy::declare_interior_mutable_const)]
const STAR_STAR: HeaderValue = HeaderValue::from_static("*/*");
#[derive(Debug)]
pub struct ClientSendError {
uri: Uri,
source: hyper_util::client::legacy::Error,
}
impl ClientSendError {
pub fn is_connect_error(&self) -> bool {
self.source.is_connect()
}
fn http_info(&self) -> Option<HttpInfo> {
let mut exts = Extensions::new();
self.source.connect_info()?.get_extras(&mut exts);
exts.remove::<HttpInfo>()
}
}
impl std::fmt::Display for ClientSendError {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
// NOTE: we can use `std::error::Report` instead once it's stabilized.
let detail = error_reporter::Report::new(&self.source);
match self.http_info() {
Some(http_info) => {
write!(
f,
"error sending request from {src} for {uri} ({dst}): {detail}",
src = http_info.local_addr(),
uri = self.uri,
dst = http_info.remote_addr(),
detail = detail,
)
}
None => {
write!(
f,
"error sending request for url ({uri}): {detail}",
uri = self.uri,
detail = detail,
)
}
}
}
}
impl std::error::Error for ClientSendError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
Some(&self.source)
}
}
impl Client {
pub async fn send(
self,
mut req: http::Request<ReqBody>,
) -> Result<http::Response<ResBody>, ClientSendError> {
req
.headers_mut()
.entry(USER_AGENT)
.or_insert_with(|| self.user_agent.clone());
req.headers_mut().entry(ACCEPT).or_insert(STAR_STAR);
if let Some(auth) = self.proxies.http_forward_auth(req.uri()) {
req.headers_mut().insert(PROXY_AUTHORIZATION, auth.clone());
}
let uri = req.uri().clone();
let resp = self
.inner
.oneshot(req)
.await
.map_err(|e| ClientSendError { uri, source: e })?;
Ok(resp.map(|b| b.map_err(|e| anyhow!(e)).boxed()))
}
}
pub type ReqBody = http_body_util::combinators::BoxBody<Bytes, Error>;
pub type ResBody = http_body_util::combinators::BoxBody<Bytes, Error>;
/// Copied from https://github.com/seanmonstar/reqwest/blob/b9d62a0323d96f11672a61a17bf8849baec00275/src/async_impl/request.rs#L572
/// Check the request URL for a "username:password" type authority, and if
/// found, remove it from the URL and return it.
pub fn extract_authority(url: &mut Url) -> Option<(String, Option<String>)> {
use percent_encoding::percent_decode;
if url.has_authority() {
let username: String = percent_decode(url.username().as_bytes())
.decode_utf8()
.ok()?
.into();
let password = url.password().and_then(|pass| {
percent_decode(pass.as_bytes())
.decode_utf8()
.ok()
.map(String::from)
});
if !username.is_empty() || password.is_some() {
url
.set_username("")
.expect("has_authority means set_username shouldn't fail");
url
.set_password(None)
.expect("has_authority means set_password shouldn't fail");
return Some((username, password));
}
}
None
}