// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. use std::borrow::Cow; use std::cell::RefCell; use std::collections::HashMap; use std::rc::Rc; use std::task::Poll; use bytes::Bytes; use deno_core::error::AnyError; use deno_core::futures::future::poll_fn; use deno_core::op; use deno_core::serde::Serialize; use deno_core::AsyncRefCell; use deno_core::ByteString; use deno_core::CancelFuture; use deno_core::CancelHandle; use deno_core::JsBuffer; use deno_core::OpState; use deno_core::RcRef; use deno_core::Resource; use deno_core::ResourceId; use deno_net::raw::take_network_stream_resource; use deno_net::raw::NetworkStream; use h2; use h2::RecvStream; use http; use http::request::Parts; use http::HeaderMap; use http::Response; use http::StatusCode; use reqwest::header::HeaderName; use reqwest::header::HeaderValue; use url::Url; pub struct Http2Client { pub client: AsyncRefCell>, pub url: Url, } impl Resource for Http2Client { fn name(&self) -> Cow { "http2Client".into() } } #[derive(Debug)] pub struct Http2ClientConn { pub conn: AsyncRefCell>, cancel_handle: CancelHandle, } impl Resource for Http2ClientConn { fn name(&self) -> Cow { "http2ClientConnection".into() } fn close(self: Rc) { self.cancel_handle.cancel() } } #[derive(Debug)] pub struct Http2ClientStream { pub response: AsyncRefCell, pub stream: AsyncRefCell>, } impl Resource for Http2ClientStream { fn name(&self) -> Cow { "http2ClientStream".into() } } #[derive(Debug)] pub struct Http2ClientResponseBody { pub body: AsyncRefCell, pub trailers_rx: AsyncRefCell>>>, pub trailers_tx: AsyncRefCell>>>, } impl Resource for Http2ClientResponseBody { fn name(&self) -> Cow { "http2ClientResponseBody".into() } } #[derive(Debug)] pub struct Http2ServerConnection { pub conn: AsyncRefCell>, } impl Resource for Http2ServerConnection { fn name(&self) -> Cow { "http2ServerConnection".into() } } pub struct Http2ServerSendResponse { pub send_response: AsyncRefCell>, } impl Resource for Http2ServerSendResponse { fn name(&self) -> Cow { "http2ServerSendResponse".into() } } #[op] pub async fn op_http2_connect( state: Rc>, rid: ResourceId, url: String, ) -> Result<(ResourceId, ResourceId), AnyError> { // No permission check necessary because we're using an existing connection let network_stream = { let mut state = state.borrow_mut(); take_network_stream_resource(&mut state.resource_table, rid)? }; let url = Url::parse(&url)?; let (client, conn) = h2::client::handshake(network_stream).await?; let mut state = state.borrow_mut(); let client_rid = state.resource_table.add(Http2Client { client: AsyncRefCell::new(client), url, }); let conn_rid = state.resource_table.add(Http2ClientConn { conn: AsyncRefCell::new(conn), cancel_handle: CancelHandle::new(), }); Ok((client_rid, conn_rid)) } #[op] pub async fn op_http2_listen( state: Rc>, rid: ResourceId, ) -> Result { let stream = take_network_stream_resource(&mut state.borrow_mut().resource_table, rid)?; let conn = h2::server::handshake(stream).await?; Ok( state .borrow_mut() .resource_table .add(Http2ServerConnection { conn: AsyncRefCell::new(conn), }), ) } #[op] pub async fn op_http2_accept( state: Rc>, rid: ResourceId, ) -> Result< Option<(Vec<(ByteString, ByteString)>, ResourceId, ResourceId)>, AnyError, > { let resource = state .borrow() .resource_table .get::(rid)?; let mut conn = RcRef::map(&resource, |r| &r.conn).borrow_mut().await; if let Some(res) = conn.accept().await { let (req, resp) = res?; let (parts, body) = req.into_parts(); let (trailers_tx, trailers_rx) = tokio::sync::oneshot::channel(); let stm = state .borrow_mut() .resource_table .add(Http2ClientResponseBody { body: AsyncRefCell::new(body), trailers_rx: AsyncRefCell::new(Some(trailers_rx)), trailers_tx: AsyncRefCell::new(Some(trailers_tx)), }); let Parts { uri, method, headers, .. } = parts; let mut req_headers = Vec::with_capacity(headers.len() + 4); req_headers.push(( ByteString::from(":method"), ByteString::from(method.as_str()), )); req_headers.push(( ByteString::from(":scheme"), ByteString::from(uri.scheme().map(|s| s.as_str()).unwrap_or("http")), )); req_headers.push(( ByteString::from(":path"), ByteString::from(uri.path_and_query().map(|p| p.as_str()).unwrap_or("")), )); req_headers.push(( ByteString::from(":authority"), ByteString::from(uri.authority().map(|a| a.as_str()).unwrap_or("")), )); for (key, val) in headers.iter() { req_headers.push((key.as_str().into(), val.as_bytes().into())); } let resp = state .borrow_mut() .resource_table .add(Http2ServerSendResponse { send_response: AsyncRefCell::new(resp), }); Ok(Some((req_headers, stm, resp))) } else { Ok(None) } } #[op] pub async fn op_http2_send_response( state: Rc>, rid: ResourceId, status: u16, headers: Vec<(ByteString, ByteString)>, ) -> Result<(ResourceId, u32), AnyError> { let resource = state .borrow() .resource_table .get::(rid)?; let mut send_response = RcRef::map(resource, |r| &r.send_response) .borrow_mut() .await; let mut response = Response::new(()); if let Ok(status) = StatusCode::from_u16(status) { *response.status_mut() = status; } for (name, value) in headers { response.headers_mut().append( HeaderName::from_lowercase(&name).unwrap(), HeaderValue::from_bytes(&value).unwrap(), ); } let stream = send_response.send_response(response, false)?; let stream_id = stream.stream_id(); Ok((rid, stream_id.into())) } #[op] pub async fn op_http2_poll_client_connection( state: Rc>, rid: ResourceId, ) -> Result<(), AnyError> { let resource = state.borrow().resource_table.get::(rid)?; let cancel_handle = RcRef::map(resource.clone(), |this| &this.cancel_handle); let mut conn = RcRef::map(resource, |this| &this.conn).borrow_mut().await; match (&mut *conn).or_cancel(cancel_handle).await { Ok(result) => result?, Err(_) => { // TODO(bartlomieju): probably need a better mechanism for closing the connection // cancelled } } Ok(()) } #[op] pub async fn op_http2_client_request( state: Rc>, client_rid: ResourceId, // TODO(bartlomieju): maybe use a vector with fixed layout to save sending // 4 strings of keys? mut pseudo_headers: HashMap, headers: Vec<(ByteString, ByteString)>, ) -> Result<(ResourceId, u32), AnyError> { let resource = state .borrow() .resource_table .get::(client_rid)?; let url = resource.url.clone(); let pseudo_path = pseudo_headers.remove(":path").unwrap_or("/".to_string()); let pseudo_method = pseudo_headers .remove(":method") .unwrap_or("GET".to_string()); // TODO(bartlomieju): handle all pseudo-headers (:authority, :scheme) let _pseudo_authority = pseudo_headers .remove(":authority") .unwrap_or("/".to_string()); let _pseudo_scheme = pseudo_headers .remove(":scheme") .unwrap_or("http".to_string()); let url = url.join(&pseudo_path)?; let mut req = http::Request::builder() .uri(url.as_str()) .method(pseudo_method.as_str()); for (name, value) in headers { req.headers_mut().unwrap().append( HeaderName::from_lowercase(&name).unwrap(), HeaderValue::from_bytes(&value).unwrap(), ); } let request = req.body(()).unwrap(); let resource = { let state = state.borrow(); state.resource_table.get::(client_rid)? }; let mut client = RcRef::map(&resource, |r| &r.client).borrow_mut().await; poll_fn(|cx| client.poll_ready(cx)).await?; let (response, stream) = client.send_request(request, false).unwrap(); let stream_id = stream.stream_id(); let stream_rid = state.borrow_mut().resource_table.add(Http2ClientStream { response: AsyncRefCell::new(response), stream: AsyncRefCell::new(stream), }); Ok((stream_rid, stream_id.into())) } #[op] pub async fn op_http2_client_send_data( state: Rc>, stream_rid: ResourceId, data: JsBuffer, ) -> Result<(), AnyError> { let resource = state .borrow() .resource_table .get::(stream_rid)?; let mut stream = RcRef::map(&resource, |r| &r.stream).borrow_mut().await; // TODO(bartlomieju): handle end of stream stream.send_data(bytes::Bytes::from(data), false)?; Ok(()) } #[op] pub async fn op_http2_client_end_stream( state: Rc>, stream_rid: ResourceId, ) -> Result<(), AnyError> { let resource = state .borrow() .resource_table .get::(stream_rid)?; let mut stream = RcRef::map(&resource, |r| &r.stream).borrow_mut().await; // TODO(bartlomieju): handle end of stream stream.send_data(bytes::Bytes::from(vec![]), true)?; Ok(()) } #[op] pub async fn op_http2_client_reset_stream( state: Rc>, stream_rid: ResourceId, code: u32, ) -> Result<(), AnyError> { let resource = state .borrow() .resource_table .get::(stream_rid)?; let mut stream = RcRef::map(&resource, |r| &r.stream).borrow_mut().await; stream.send_reset(h2::Reason::from(code)); Ok(()) } #[op] pub async fn op_http2_client_send_trailers( state: Rc>, stream_rid: ResourceId, trailers: Vec<(ByteString, ByteString)>, ) -> Result<(), AnyError> { let resource = state .borrow() .resource_table .get::(stream_rid)?; let mut stream = RcRef::map(&resource, |r| &r.stream).borrow_mut().await; let mut trailers_map = http::HeaderMap::new(); for (name, value) in trailers { trailers_map.insert( HeaderName::from_bytes(&name).unwrap(), HeaderValue::from_bytes(&value).unwrap(), ); } stream.send_trailers(trailers_map)?; Ok(()) } #[derive(Serialize)] #[serde(rename_all = "camelCase")] pub struct Http2ClientResponse { headers: Vec<(ByteString, ByteString)>, body_rid: ResourceId, status_code: u16, } #[op] pub async fn op_http2_client_get_response( state: Rc>, stream_rid: ResourceId, ) -> Result { let resource = state .borrow() .resource_table .get::(stream_rid)?; let mut response_future = RcRef::map(&resource, |r| &r.response).borrow_mut().await; let response = (&mut *response_future).await?; let (parts, body) = response.into_parts(); let status = parts.status; let mut res_headers = Vec::new(); for (key, val) in parts.headers.iter() { res_headers.push((key.as_str().into(), val.as_bytes().into())); } let (trailers_tx, trailers_rx) = tokio::sync::oneshot::channel(); let body_rid = state .borrow_mut() .resource_table .add(Http2ClientResponseBody { body: AsyncRefCell::new(body), trailers_rx: AsyncRefCell::new(Some(trailers_rx)), trailers_tx: AsyncRefCell::new(Some(trailers_tx)), }); Ok(Http2ClientResponse { headers: res_headers, body_rid, status_code: status.into(), }) } enum DataOrTrailers { Data(Bytes), Trailers(HeaderMap), Eof, } fn poll_data_or_trailers( cx: &mut std::task::Context, body: &mut RecvStream, ) -> Poll> { loop { if let Poll::Ready(trailers) = body.poll_trailers(cx) { if let Some(trailers) = trailers? { return Poll::Ready(Ok(DataOrTrailers::Trailers(trailers))); } else { return Poll::Ready(Ok(DataOrTrailers::Eof)); } } if let Poll::Ready(data) = body.poll_data(cx) { if let Some(data) = data { return Poll::Ready(Ok(DataOrTrailers::Data(data?))); } // If data is None, loop one more time to check for trailers continue; } // Return pending here as poll_data will keep the waker return Poll::Pending; } } #[op] pub async fn op_http2_client_get_response_body_chunk( state: Rc>, body_rid: ResourceId, ) -> Result<(Option>, bool), AnyError> { let resource = state .borrow() .resource_table .get::(body_rid)?; let mut body = RcRef::map(&resource, |r| &r.body).borrow_mut().await; loop { match poll_fn(|cx| poll_data_or_trailers(cx, &mut body)).await? { DataOrTrailers::Data(data) => { return Ok((Some(data.to_vec()), false)); } DataOrTrailers::Trailers(trailers) => { println!("{trailers:?}"); if let Some(trailers_tx) = RcRef::map(&resource, |r| &r.trailers_tx) .borrow_mut() .await .take() { _ = trailers_tx.send(Some(trailers)); }; continue; } DataOrTrailers::Eof => { RcRef::map(&resource, |r| &r.trailers_tx) .borrow_mut() .await .take(); return Ok((None, true)); } }; } } #[op] pub async fn op_http2_client_get_response_trailers( state: Rc>, body_rid: ResourceId, ) -> Result>, AnyError> { let resource = state .borrow() .resource_table .get::(body_rid)?; let trailers = RcRef::map(&resource, |r| &r.trailers_rx) .borrow_mut() .await .take(); if let Some(trailers) = trailers { if let Ok(Some(trailers)) = trailers.await { let mut v = Vec::with_capacity(trailers.len()); for (key, value) in trailers.iter() { v.push(( ByteString::from(key.as_str()), ByteString::from(value.as_bytes()), )); } Ok(Some(v)) } else { Ok(None) } } else { Ok(None) } }