// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license. use std::cell::Cell; use std::cell::Ref; use std::cell::RefCell; use std::cell::RefMut; use std::future::Future; use std::mem::ManuallyDrop; use std::pin::Pin; use std::rc::Rc; use std::task::Context; use std::task::Poll; use std::task::Waker; use deno_core::futures::ready; use deno_core::BufView; use deno_core::OpState; use deno_core::ResourceId; use http::request::Parts; use hyper::body::Body; use hyper::body::Frame; use hyper::body::Incoming; use hyper::body::SizeHint; use hyper::header::HeaderMap; use hyper::upgrade::OnUpgrade; use scopeguard::guard; use scopeguard::ScopeGuard; use tokio::sync::oneshot; use crate::request_properties::HttpConnectionProperties; use crate::response_body::ResponseBytesInner; use crate::response_body::ResponseStreamResult; pub type Request = hyper::Request; pub type Response = hyper::Response; #[cfg(feature = "__http_tracing")] pub static RECORD_COUNT: std::sync::atomic::AtomicUsize = std::sync::atomic::AtomicUsize::new(0); macro_rules! http_general_trace { ($($args:expr),*) => { #[cfg(feature = "__http_tracing")] { let count = $crate::service::RECORD_COUNT .load(std::sync::atomic::Ordering::SeqCst); println!( "HTTP [+{count}]: {}", format!($($args),*), ); } }; } macro_rules! http_trace { ($record:expr $(, $args:expr)*) => { #[cfg(feature = "__http_tracing")] { let count = $crate::service::RECORD_COUNT .load(std::sync::atomic::Ordering::SeqCst); println!( "HTTP [+{count}] id={:p} strong={}: {}", $record, std::rc::Rc::strong_count(&$record), format!($($args),*), ); } }; } pub(crate) use http_general_trace; pub(crate) use http_trace; pub(crate) struct HttpServerStateInner { pool: Vec<(Rc, HeaderMap)>, } /// A signalling version of `Rc` that allows one to poll for when all other references /// to the `Rc` have been dropped. #[repr(transparent)] pub(crate) struct SignallingRc(Rc<(T, Cell>)>); impl SignallingRc { #[inline] pub fn new(t: T) -> Self { Self(Rc::new((t, Default::default()))) } #[inline] pub fn strong_count(&self) -> usize { Rc::strong_count(&self.0) } /// Resolves when this is the only remaining reference. #[inline] pub fn poll_complete(&self, cx: &mut Context<'_>) -> Poll<()> { if Rc::strong_count(&self.0) == 1 { Poll::Ready(()) } else { self.0 .1.set(Some(cx.waker().clone())); Poll::Pending } } } impl Clone for SignallingRc { #[inline] fn clone(&self) -> Self { Self(self.0.clone()) } } impl Drop for SignallingRc { #[inline] fn drop(&mut self) { // Trigger the waker iff the refcount is about to become 1. if Rc::strong_count(&self.0) == 2 { if let Some(waker) = self.0 .1.take() { waker.wake(); } } } } impl std::ops::Deref for SignallingRc { type Target = T; #[inline] fn deref(&self) -> &Self::Target { &self.0 .0 } } pub(crate) struct HttpServerState(RefCell); impl HttpServerState { pub fn new() -> SignallingRc { SignallingRc::new(Self(RefCell::new(HttpServerStateInner { pool: Vec::new(), }))) } } impl std::ops::Deref for HttpServerState { type Target = RefCell; fn deref(&self) -> &Self::Target { &self.0 } } enum RequestBodyState { Incoming(Incoming), Resource(#[allow(dead_code)] HttpRequestBodyAutocloser), } impl From for RequestBodyState { fn from(value: Incoming) -> Self { RequestBodyState::Incoming(value) } } /// Ensures that the request body closes itself when no longer needed. pub struct HttpRequestBodyAutocloser(ResourceId, Rc>); impl HttpRequestBodyAutocloser { pub fn new(res: ResourceId, op_state: Rc>) -> Self { Self(res, op_state) } } impl Drop for HttpRequestBodyAutocloser { fn drop(&mut self) { if let Ok(res) = self.1.borrow_mut().resource_table.take_any(self.0) { res.close(); } } } pub(crate) async fn handle_request( request: Request, request_info: HttpConnectionProperties, server_state: SignallingRc, // Keep server alive for duration of this future. tx: tokio::sync::mpsc::Sender>, ) -> Result { // If the underlying TCP connection is closed, this future will be dropped // and execution could stop at any await point. // The HttpRecord must live until JavaScript is done processing so is wrapped // in an Rc. The guard ensures unneeded resources are freed at cancellation. let guarded_record = guard( HttpRecord::new(request, request_info, server_state), HttpRecord::cancel, ); // Clone HttpRecord and send to JavaScript for processing. // Safe to unwrap as channel receiver is never closed. tx.send(guarded_record.clone()).await.unwrap(); // Wait for JavaScript handler to return request. http_trace!(*guarded_record, "handle_request response_ready.await"); guarded_record.response_ready().await; // Defuse the guard. Must not await after this point. let record = ScopeGuard::into_inner(guarded_record); http_trace!(record, "handle_request complete"); let response = record.into_response(); Ok(response) } #[derive(Debug, thiserror::Error)] #[error("upgrade unavailable")] pub struct UpgradeUnavailableError; struct HttpRecordInner { server_state: SignallingRc, closed_channel: Option>, request_info: HttpConnectionProperties, request_parts: http::request::Parts, request_body: Option, response_parts: Option, response_ready: bool, response_waker: Option, response_body: ResponseBytesInner, response_body_finished: bool, response_body_waker: Option, trailers: Option, been_dropped: bool, finished: bool, needs_close_after_finish: bool, } pub struct HttpRecord(RefCell>); #[cfg(feature = "__http_tracing")] impl Drop for HttpRecord { fn drop(&mut self) { RECORD_COUNT .fetch_sub(1, std::sync::atomic::Ordering::SeqCst) .checked_sub(1) .expect("Count went below zero"); http_general_trace!("HttpRecord::drop"); } } impl HttpRecord { fn new( request: Request, request_info: HttpConnectionProperties, server_state: SignallingRc, ) -> Rc { let (request_parts, request_body) = request.into_parts(); let request_body = Some(request_body.into()); let (mut response_parts, _) = http::Response::new(()).into_parts(); let record = if let Some((record, headers)) = server_state.borrow_mut().pool.pop() { response_parts.headers = headers; http_trace!(record, "HttpRecord::reuse"); record } else { #[cfg(feature = "__http_tracing")] { RECORD_COUNT.fetch_add(1, std::sync::atomic::Ordering::SeqCst); } #[allow(clippy::let_and_return)] let record = Rc::new(Self(RefCell::new(None))); http_trace!(record, "HttpRecord::new"); record }; *record.0.borrow_mut() = Some(HttpRecordInner { server_state, request_info, request_parts, request_body, response_parts: Some(response_parts), response_ready: false, response_waker: None, response_body: ResponseBytesInner::Empty, response_body_finished: false, response_body_waker: None, trailers: None, closed_channel: None, been_dropped: false, finished: false, needs_close_after_finish: false, }); record } fn finish(self: Rc) { http_trace!(self, "HttpRecord::finish"); let mut inner = self.self_mut(); inner.response_body_finished = true; let response_body_waker = inner.response_body_waker.take(); let needs_close_after_finish = inner.needs_close_after_finish; drop(inner); if let Some(waker) = response_body_waker { waker.wake(); } if !needs_close_after_finish { self.recycle(); } } pub fn close_after_finish(self: Rc) { debug_assert!(self.self_ref().needs_close_after_finish); let mut inner = self.self_mut(); inner.needs_close_after_finish = false; if !inner.finished { drop(inner); self.recycle(); } } pub fn needs_close_after_finish(&self) -> RefMut<'_, bool> { RefMut::map(self.self_mut(), |inner| &mut inner.needs_close_after_finish) } pub fn on_cancel(&self, sender: oneshot::Sender<()>) { self.self_mut().closed_channel = Some(sender); } fn recycle(self: Rc) { assert!( Rc::strong_count(&self) == 1, "HTTP state error: Expected to be last strong reference" ); let HttpRecordInner { server_state, request_parts: Parts { mut headers, .. }, .. } = self.0.borrow_mut().take().unwrap(); let inflight = server_state.strong_count(); http_trace!(self, "HttpRecord::recycle inflight={}", inflight); // Keep a buffer of allocations on hand to be reused by incoming requests. // Estimated target size is 16 + 1/8 the number of inflight requests. let target = 16 + (inflight >> 3); let pool = &mut server_state.borrow_mut().pool; if target > pool.len() { headers.clear(); pool.push((self, headers)); } else if target < pool.len() - 8 { pool.truncate(target); } } fn self_ref(&self) -> Ref<'_, HttpRecordInner> { Ref::map(self.0.borrow(), |option| option.as_ref().unwrap()) } fn self_mut(&self) -> RefMut<'_, HttpRecordInner> { RefMut::map(self.0.borrow_mut(), |option| option.as_mut().unwrap()) } /// Perform the Hyper upgrade on this record. pub fn upgrade(&self) -> Result { // Manually perform the upgrade. We're peeking into hyper's underlying machinery here a bit self .self_mut() .request_parts .extensions .remove::() .ok_or(UpgradeUnavailableError) } /// Take the Hyper body from this record. pub fn take_request_body(&self) -> Option { let body_holder = &mut self.self_mut().request_body; let body = body_holder.take(); match body { Some(RequestBodyState::Incoming(body)) => Some(body), x => { *body_holder = x; None } } } /// Replace the request body with a resource ID and the OpState we'll need to shut it down. /// We cannot keep just the resource itself, as JS code might be reading from the resource ID /// to generate the response data (requiring us to keep it in the resource table). pub fn put_resource(&self, res: HttpRequestBodyAutocloser) { self.self_mut().request_body = Some(RequestBodyState::Resource(res)); } /// Cleanup resources not needed after the future is dropped. fn cancel(self: Rc) { http_trace!(self, "HttpRecord::cancel"); let mut inner = self.self_mut(); if inner.response_ready { // Future dropped between wake() and async fn resuming. drop(inner); self.finish(); return; } inner.been_dropped = true; // The request body might include actual resources. inner.request_body.take(); if let Some(closed_channel) = inner.closed_channel.take() { let _ = closed_channel.send(()); } } /// Complete this record, potentially expunging it if it is fully complete (ie: cancelled as well). pub fn complete(self: Rc) { http_trace!(self, "HttpRecord::complete"); let mut inner = self.self_mut(); assert!( !inner.response_ready, "HTTP state error: Entry has already been completed" ); if inner.been_dropped { drop(inner); self.finish(); return; } inner.response_ready = true; if let Some(waker) = inner.response_waker.take() { drop(inner); waker.wake(); } } fn take_response_body(&self) -> ResponseBytesInner { let mut inner = self.self_mut(); debug_assert!( !matches!(inner.response_body, ResponseBytesInner::Done), "HTTP state error: response body already complete" ); std::mem::replace(&mut inner.response_body, ResponseBytesInner::Done) } /// Has the future for this record been dropped? ie, has the underlying TCP connection /// been closed? pub fn cancelled(&self) -> bool { self.self_ref().been_dropped } /// Get a mutable reference to the response status and headers. pub fn response_parts(&self) -> RefMut<'_, http::response::Parts> { RefMut::map(self.self_mut(), |inner| { inner.response_parts.as_mut().unwrap() }) } /// Get a mutable reference to the trailers. pub fn trailers(&self) -> RefMut<'_, Option> { RefMut::map(self.self_mut(), |inner| &mut inner.trailers) } pub fn set_response_body(&self, response_body: ResponseBytesInner) { let mut inner = self.self_mut(); debug_assert!(matches!(inner.response_body, ResponseBytesInner::Empty)); inner.response_body = response_body; } /// Take the response. fn into_response(self: Rc) -> Response { let parts = self.self_mut().response_parts.take().unwrap(); let body = HttpRecordResponse(ManuallyDrop::new(self)); Response::from_parts(parts, body) } /// Get a reference to the connection properties. pub fn request_info(&self) -> Ref<'_, HttpConnectionProperties> { Ref::map(self.self_ref(), |inner| &inner.request_info) } /// Get a reference to the request parts. pub fn request_parts(&self) -> Ref<'_, Parts> { Ref::map(self.self_ref(), |inner| &inner.request_parts) } /// Resolves when response head is ready. fn response_ready(&self) -> impl Future + '_ { struct HttpRecordReady<'a>(&'a HttpRecord); impl<'a> Future for HttpRecordReady<'a> { type Output = (); fn poll( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll { let mut mut_self = self.0.self_mut(); if mut_self.response_ready { return Poll::Ready(()); } mut_self.response_waker = Some(cx.waker().clone()); Poll::Pending } } HttpRecordReady(self) } /// Resolves when response body has finished streaming. Returns true if the /// response completed. pub fn response_body_finished(&self) -> impl Future + '_ { struct HttpRecordFinished<'a>(&'a HttpRecord); impl<'a> Future for HttpRecordFinished<'a> { type Output = bool; fn poll( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll { let mut mut_self = self.0.self_mut(); if mut_self.response_body_finished { // If we sent the response body and the trailers, this body completed successfully return Poll::Ready( mut_self.response_body.is_complete() && mut_self.trailers.is_none(), ); } mut_self.response_body_waker = Some(cx.waker().clone()); Poll::Pending } } HttpRecordFinished(self) } } #[repr(transparent)] pub struct HttpRecordResponse(ManuallyDrop>); impl Body for HttpRecordResponse { type Data = BufView; type Error = deno_core::error::AnyError; fn poll_frame( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll, Self::Error>>> { use crate::response_body::PollFrame; let record = &self.0; let res = loop { let mut inner = record.self_mut(); let res = match &mut inner.response_body { ResponseBytesInner::Done | ResponseBytesInner::Empty => { if let Some(trailers) = inner.trailers.take() { return Poll::Ready(Some(Ok(Frame::trailers(trailers)))); } unreachable!() } ResponseBytesInner::Bytes(..) => { drop(inner); let ResponseBytesInner::Bytes(data) = record.take_response_body() else { unreachable!(); }; return Poll::Ready(Some(Ok(Frame::data(data)))); } ResponseBytesInner::UncompressedStream(stm) => { ready!(Pin::new(stm).poll_frame(cx)) } ResponseBytesInner::GZipStream(stm) => { ready!(Pin::new(stm.as_mut()).poll_frame(cx)) } ResponseBytesInner::BrotliStream(stm) => { ready!(Pin::new(stm.as_mut()).poll_frame(cx)) } }; // This is where we retry the NoData response if matches!(res, ResponseStreamResult::NoData) { continue; } break res; }; if matches!(res, ResponseStreamResult::EndOfStream) { if let Some(trailers) = record.self_mut().trailers.take() { return Poll::Ready(Some(Ok(Frame::trailers(trailers)))); } record.take_response_body(); } Poll::Ready(res.into()) } fn is_end_stream(&self) -> bool { let inner = self.0.self_ref(); matches!( inner.response_body, ResponseBytesInner::Done | ResponseBytesInner::Empty ) && inner.trailers.is_none() } fn size_hint(&self) -> SizeHint { // The size hint currently only used in the case where it is exact bounds in hyper, but we'll pass it through // anyways just in case hyper needs it. self.0.self_ref().response_body.size_hint() } } impl Drop for HttpRecordResponse { fn drop(&mut self) { // SAFETY: this ManuallyDrop is not used again. let record = unsafe { ManuallyDrop::take(&mut self.0) }; http_trace!(record, "HttpRecordResponse::drop"); record.finish(); } } #[cfg(test)] mod tests { use std::error::Error as StdError; use bytes::Buf; use deno_net::raw::NetworkStreamType; use hyper::body::Body; use hyper::service::service_fn; use hyper::service::HttpService; use hyper_util::rt::TokioIo; use super::*; use crate::response_body::Compression; use crate::response_body::ResponseBytesInner; /// Execute client request on service and concurrently map the response. async fn serve_request( req: http::Request, service: S, map_response: impl FnOnce(hyper::Response) -> F, ) -> hyper::Result where B: Body + Send + 'static, // Send bound due to DuplexStream B::Data: Send, B::Error: Into>, S: HttpService, S::Error: Into>, S::ResBody: 'static, ::Error: Into>, F: std::future::Future>, { use hyper::client::conn::http1::handshake; use hyper::server::conn::http1::Builder; let (stream_client, stream_server) = tokio::io::duplex(16 * 1024); let conn_server = Builder::new().serve_connection(TokioIo::new(stream_server), service); let (mut sender, conn_client) = handshake(TokioIo::new(stream_client)).await?; let (res, _, _) = tokio::try_join!( async move { let res = sender.send_request(req).await?; map_response(res).await }, conn_server, conn_client, )?; Ok(res) } #[tokio::test] async fn test_handle_request() -> Result<(), deno_core::error::AnyError> { let (tx, mut rx) = tokio::sync::mpsc::channel(10); let server_state = HttpServerState::new(); let server_state_check = server_state.clone(); let request_info = HttpConnectionProperties { peer_address: "".into(), peer_port: None, local_port: None, stream_type: NetworkStreamType::Tcp, }; let svc = service_fn(move |req: hyper::Request| { handle_request( req, request_info.clone(), server_state.clone(), tx.clone(), ) }); let client_req = http::Request::builder().uri("/").body("".to_string())?; // Response produced by concurrent tasks tokio::try_join!( async move { // JavaScript handler produces response let record = rx.recv().await.unwrap(); record.set_response_body(ResponseBytesInner::from_vec( Compression::None, b"hello world".to_vec(), )); record.complete(); Ok(()) }, // Server connection executes service async move { serve_request(client_req, svc, |res| async { // Client reads the response use http_body_util::BodyExt; assert_eq!(res.status(), 200); let body = res.collect().await?.to_bytes(); assert_eq!(body.chunk(), b"hello world"); Ok(()) }) .await }, )?; assert_eq!(server_state_check.strong_count(), 1); Ok(()) } }