// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license. use std::borrow::Cow; use std::cell::RefCell; use std::pin::Pin; use std::rc::Rc; use std::sync::atomic::AtomicBool; use std::sync::Arc; use std::task::Context; use std::task::Poll; use bytes::Bytes; use deno_core::anyhow; use deno_core::anyhow::Error; use deno_core::error::bad_resource; use deno_core::error::type_error; use deno_core::error::AnyError; use deno_core::futures::stream::Peekable; use deno_core::futures::Future; use deno_core::futures::FutureExt; use deno_core::futures::Stream; use deno_core::futures::StreamExt; use deno_core::futures::TryFutureExt; use deno_core::op2; use deno_core::serde::Serialize; use deno_core::unsync::spawn; use deno_core::url::Url; use deno_core::AsyncRefCell; use deno_core::AsyncResult; use deno_core::BufView; use deno_core::ByteString; use deno_core::CancelHandle; use deno_core::CancelTryFuture; use deno_core::OpState; use deno_core::RcRef; use deno_core::Resource; use deno_core::ResourceId; use deno_fetch::ResBody; use deno_net::io::TcpStreamResource; use http::header::HeaderMap; use http::header::HeaderName; use http::header::HeaderValue; use http::header::AUTHORIZATION; use http::header::CONTENT_LENGTH; use http::Method; use http::Response; use http_body_util::BodyExt; use hyper::body::Frame; use hyper::body::Incoming; use hyper_util::rt::TokioIo; use std::cmp::min; use tokio::io::AsyncReadExt; use tokio::io::AsyncWriteExt; #[derive(Default, Serialize)] #[serde(rename_all = "camelCase")] pub struct NodeHttpResponse { pub status: u16, pub status_text: String, pub headers: Vec<(ByteString, ByteString)>, pub url: String, pub response_rid: ResourceId, pub content_length: Option, pub remote_addr_ip: Option, pub remote_addr_port: Option, pub error: Option, } pub struct NodeHttpConnReady { recv: tokio::sync::oneshot::Receiver<()>, } impl deno_core::Resource for NodeHttpConnReady { fn name(&self) -> Cow { "nodeHttpConnReady".into() } } pub struct NodeHttpClientResponse { response: Pin, Error>> + Send>>, url: String, connection_started: Arc, } impl deno_core::Resource for NodeHttpClientResponse { fn name(&self) -> Cow { "nodeHttpClientResponse".into() } } #[op2(async)] #[serde] pub async fn op_node_http_request_with_conn

( state: Rc>, #[serde] method: ByteString, #[string] url: String, #[serde] headers: Vec<(ByteString, ByteString)>, #[smi] body: Option, #[smi] conn_rid: ResourceId, ) -> Result<(ResourceId, ResourceId), AnyError> where P: crate::NodePermissions + 'static, { // Establish the connection/client. let resource_rc = state .borrow_mut() .resource_table .take::(conn_rid)?; let resource = Rc::try_unwrap(resource_rc) .map_err(|_| bad_resource("TCP stream is currently in use"))?; let (read_half, write_half) = resource.into_inner(); let tcp_stream = read_half.reunite(write_half)?; let io = TokioIo::new(tcp_stream); let (mut sender, conn) = hyper::client::conn::http1::handshake(io).await?; let connection_started = Arc::new(AtomicBool::new(false)); let conn_start = connection_started.clone(); let (notify, receiver) = tokio::sync::oneshot::channel::<()>(); // Spawn a task to poll the connection, driving the HTTP state let _handle = tokio::task::spawn(async move { eprintln!("rs: connection started"); conn_start.store(true, std::sync::atomic::Ordering::Relaxed); let _ = notify.send(()); conn.await?; eprintln!("rs: connection completed"); Ok::<_, AnyError>(()) }); // Create the request. let method = Method::from_bytes(&method)?; let mut url_parsed = Url::parse(&url)?; let maybe_authority = deno_fetch::extract_authority(&mut url_parsed); { let mut state_ = state.borrow_mut(); let permissions = state_.borrow_mut::

(); permissions.check_net_url(&url_parsed, "ClientRequest")?; } let mut header_map = HeaderMap::new(); for (key, value) in headers { let name = HeaderName::from_bytes(&key) .map_err(|err| type_error(err.to_string()))?; let v = HeaderValue::from_bytes(&value) .map_err(|err| type_error(err.to_string()))?; header_map.append(name, v); } let (body, con_len) = if let Some(body) = body { ( BodyExt::boxed(NodeHttpResourceToBodyAdapter::new( state.borrow_mut().resource_table.take_any(body)?, )), None, ) } else { // POST and PUT requests should always have a 0 length content-length, // if there is no body. https://fetch.spec.whatwg.org/#http-network-or-cache-fetch let len = if matches!(method, Method::POST | Method::PUT) { Some(0) } else { None }; ( http_body_util::Empty::new() .map_err(|never| match never {}) .boxed(), len, ) }; let mut request = http::Request::new(body); *request.method_mut() = method.clone(); *request.uri_mut() = url_parsed .path() .to_string() .parse() .map_err(|_| type_error("Invalid URL"))?; *request.headers_mut() = header_map; if let Some((username, password)) = maybe_authority { request.headers_mut().insert( AUTHORIZATION, deno_fetch::basic_auth(&username, password.as_deref()), ); } if let Some(len) = con_len { request.headers_mut().insert(CONTENT_LENGTH, len.into()); } eprintln!("rs: sending request: {request:?}"); // let req_fut = sender.send_request(request); // let res = tokio::time::timeout(Duration::from_secs(10), req_fut).await??; let res = sender.send_request(request).map_err(Error::from).boxed(); let rid = state .borrow_mut() .resource_table .add(NodeHttpClientResponse { response: res, url: url.clone(), connection_started, }); let conn_rid = state .borrow_mut() .resource_table .add(NodeHttpConnReady { recv: receiver }); Ok((rid, conn_rid)) } #[op2(async)] #[serde] pub async fn op_node_http_wait_for_connection( state: Rc>, #[smi] rid: ResourceId, ) -> Result { let resource = state .borrow_mut() .resource_table .take::(rid)?; let resource = Rc::try_unwrap(resource).map_err(|_| bad_resource("NodeHttpConnReady"))?; resource.recv.await?; Ok(rid) } #[op2(async)] #[serde] pub async fn op_node_http_await_response( state: Rc>, #[smi] rid: ResourceId, ) -> Result { let resource = state .borrow_mut() .resource_table .take::(rid)?; let resource = Rc::try_unwrap(resource) .map_err(|_| bad_resource("NodeHttpClientResponse"))?; eprintln!( "rs: awaiting response: {}", resource .connection_started .load(std::sync::atomic::Ordering::Relaxed) ); let res = resource.response.await?; // let res = tokio::time::timeout(Duration::from_secs(10), req_fut).await??; eprintln!("rs: received response"); let status = res.status(); let mut res_headers = Vec::new(); for (key, val) in res.headers().iter() { res_headers.push((key.as_str().into(), val.as_bytes().into())); } let content_length = hyper::body::Body::size_hint(res.body()).exact(); let remote_addr = res .extensions() .get::() .map(|info| info.remote_addr()); let (remote_addr_ip, remote_addr_port) = if let Some(addr) = remote_addr { (Some(addr.ip().to_string()), Some(addr.port())) } else { (None, None) }; let (parts, body) = res.into_parts(); let body = body.map_err(deno_core::anyhow::Error::from); let body = body.boxed(); let res = http::Response::from_parts(parts, body); println!("res: {res:?}"); let response_rid = state .borrow_mut() .resource_table .add(NodeHttpResponseResource::new(res, content_length)); Ok(NodeHttpResponse { status: status.as_u16(), status_text: status.canonical_reason().unwrap_or("").to_string(), headers: res_headers, url: resource.url, response_rid, content_length, remote_addr_ip, remote_addr_port, error: None, }) } #[op2(async)] #[smi] pub async fn op_node_http_fetch_response_upgrade( state: Rc>, #[smi] rid: ResourceId, ) -> Result { let raw_response = state .borrow_mut() .resource_table .take::(rid)?; let raw_response = Rc::try_unwrap(raw_response) .expect("Someone is holding onto NodeHttpFetchResponseResource"); let (read, write) = tokio::io::duplex(1024); let (read_rx, write_tx) = tokio::io::split(read); let (mut write_rx, mut read_tx) = tokio::io::split(write); let upgraded = raw_response.upgrade().await?; { // Stage 3: Pump the data let (mut upgraded_rx, mut upgraded_tx) = tokio::io::split(TokioIo::new(upgraded)); spawn(async move { let mut buf = [0; 1024]; loop { let read = upgraded_rx.read(&mut buf).await?; if read == 0 { read_tx.shutdown().await?; break; } read_tx.write_all(&buf[..read]).await?; } Ok::<_, AnyError>(()) }); spawn(async move { let mut buf = [0; 1024]; loop { let read = write_rx.read(&mut buf).await?; if read == 0 { break; } upgraded_tx.write_all(&buf[..read]).await?; } Ok::<_, AnyError>(()) }); } Ok( state .borrow_mut() .resource_table .add(UpgradeStream::new(read_rx, write_tx)), ) } struct UpgradeStream { read: AsyncRefCell>, write: AsyncRefCell>, cancel_handle: CancelHandle, } impl UpgradeStream { pub fn new( read: tokio::io::ReadHalf, write: tokio::io::WriteHalf, ) -> Self { Self { read: AsyncRefCell::new(read), write: AsyncRefCell::new(write), cancel_handle: CancelHandle::new(), } } async fn read(self: Rc, buf: &mut [u8]) -> Result { let cancel_handle = RcRef::map(self.clone(), |this| &this.cancel_handle); async { let read = RcRef::map(self, |this| &this.read); let mut read = read.borrow_mut().await; Ok(Pin::new(&mut *read).read(buf).await?) } .try_or_cancel(cancel_handle) .await } async fn write(self: Rc, buf: &[u8]) -> Result { let cancel_handle = RcRef::map(self.clone(), |this| &this.cancel_handle); async { let write = RcRef::map(self, |this| &this.write); let mut write = write.borrow_mut().await; Ok(Pin::new(&mut *write).write(buf).await?) } .try_or_cancel(cancel_handle) .await } } impl Resource for UpgradeStream { fn name(&self) -> Cow { "fetchUpgradedStream".into() } deno_core::impl_readable_byob!(); deno_core::impl_writable!(); fn close(self: Rc) { self.cancel_handle.cancel(); } } type BytesStream = Pin> + Unpin>>; pub enum NodeHttpFetchResponseReader { Start(http::Response), BodyReader(Peekable), } impl Default for NodeHttpFetchResponseReader { fn default() -> Self { let stream: BytesStream = Box::pin(deno_core::futures::stream::empty()); Self::BodyReader(stream.peekable()) } } #[derive(Debug)] pub struct NodeHttpResponseResource { pub response_reader: AsyncRefCell, pub cancel: CancelHandle, pub size: Option, } impl NodeHttpResponseResource { pub fn new(response: http::Response, size: Option) -> Self { Self { response_reader: AsyncRefCell::new(NodeHttpFetchResponseReader::Start( response, )), cancel: CancelHandle::default(), size, } } pub async fn upgrade(self) -> Result { let reader = self.response_reader.into_inner(); match reader { NodeHttpFetchResponseReader::Start(resp) => { Ok(hyper::upgrade::on(resp).await?) } _ => unreachable!(), } } } impl Resource for NodeHttpResponseResource { fn name(&self) -> Cow { "fetchResponse".into() } fn read(self: Rc, limit: usize) -> AsyncResult { Box::pin(async move { let mut reader = RcRef::map(&self, |r| &r.response_reader).borrow_mut().await; let body = loop { match &mut *reader { NodeHttpFetchResponseReader::BodyReader(reader) => break reader, NodeHttpFetchResponseReader::Start(_) => {} } match std::mem::take(&mut *reader) { NodeHttpFetchResponseReader::Start(resp) => { let stream: BytesStream = Box::pin(resp.into_body().into_data_stream().map(|r| { r.map_err(|err| { std::io::Error::new(std::io::ErrorKind::Other, err) }) })); *reader = NodeHttpFetchResponseReader::BodyReader(stream.peekable()); } NodeHttpFetchResponseReader::BodyReader(_) => unreachable!(), } }; let fut = async move { let mut reader = Pin::new(body); loop { match reader.as_mut().peek_mut().await { Some(Ok(chunk)) if !chunk.is_empty() => { let len = min(limit, chunk.len()); let chunk = chunk.split_to(len); break Ok(chunk.into()); } // This unwrap is safe because `peek_mut()` returned `Some`, and thus // currently has a peeked value that can be synchronously returned // from `next()`. // // The future returned from `next()` is always ready, so we can // safely call `await` on it without creating a race condition. Some(_) => match reader.as_mut().next().await.unwrap() { Ok(chunk) => assert!(chunk.is_empty()), Err(err) => break Err(type_error(err.to_string())), }, None => break Ok(BufView::empty()), } } }; let cancel_handle = RcRef::map(self, |r| &r.cancel); fut.try_or_cancel(cancel_handle).await }) } fn size_hint(&self) -> (u64, Option) { (self.size.unwrap_or(0), self.size) } fn close(self: Rc) { self.cancel.cancel() } } #[allow(clippy::type_complexity)] pub struct NodeHttpResourceToBodyAdapter( Rc, Option>>>>, ); impl NodeHttpResourceToBodyAdapter { pub fn new(resource: Rc) -> Self { let future = resource.clone().read(64 * 1024); Self(resource, Some(future)) } } // SAFETY: we only use this on a single-threaded executor unsafe impl Send for NodeHttpResourceToBodyAdapter {} // SAFETY: we only use this on a single-threaded executor unsafe impl Sync for NodeHttpResourceToBodyAdapter {} impl Stream for NodeHttpResourceToBodyAdapter { type Item = Result; fn poll_next( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { let this = self.get_mut(); if let Some(mut fut) = this.1.take() { match fut.poll_unpin(cx) { Poll::Pending => { this.1 = Some(fut); Poll::Pending } Poll::Ready(res) => match res { Ok(buf) if buf.is_empty() => Poll::Ready(None), Ok(buf) => { println!("rs: reading: {len}", len = buf.len()); this.1 = Some(this.0.clone().read(64 * 1024)); Poll::Ready(Some(Ok(buf.to_vec().into()))) } Err(err) => Poll::Ready(Some(Err(err))), }, } } else { Poll::Ready(None) } } } impl hyper::body::Body for NodeHttpResourceToBodyAdapter { type Data = Bytes; type Error = anyhow::Error; fn poll_frame( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll, Self::Error>>> { match self.poll_next(cx) { Poll::Ready(Some(res)) => Poll::Ready(Some(res.map(Frame::data))), Poll::Ready(None) => Poll::Ready(None), Poll::Pending => Poll::Pending, } } } impl Drop for NodeHttpResourceToBodyAdapter { fn drop(&mut self) { self.0.clone().close() } }