diff --git a/ext/http/http_next.rs b/ext/http/http_next.rs index 9504a6fa48..db63601a05 100644 --- a/ext/http/http_next.rs +++ b/ext/http/http_next.rs @@ -10,15 +10,18 @@ use crate::request_properties::HttpPropertyExtractor; use crate::response_body::Compression; use crate::response_body::ResponseBytesInner; use crate::service::handle_request; +use crate::service::http_general_trace; use crate::service::http_trace; use crate::service::HttpRecord; use crate::service::HttpRecordResponse; use crate::service::HttpRequestBodyAutocloser; use crate::service::HttpServerState; +use crate::service::SignallingRc; use crate::websocket_upgrade::WebSocketUpgrade; use crate::LocalExecutor; use cache_control::CacheControl; use deno_core::error::AnyError; +use deno_core::futures::future::poll_fn; use deno_core::futures::TryFutureExt; use deno_core::op2; use deno_core::serde_v8::from_v8; @@ -924,7 +927,7 @@ where struct HttpLifetime { connection_cancel_handle: Rc, listen_cancel_handle: Rc, - server_state: Rc, + server_state: SignallingRc, } struct HttpJoinHandle { @@ -932,7 +935,7 @@ struct HttpJoinHandle { connection_cancel_handle: Rc, listen_cancel_handle: Rc, rx: AsyncRefCell>>, - server_state: Rc, + server_state: SignallingRc, } impl HttpJoinHandle { @@ -1179,6 +1182,7 @@ pub async fn op_http_close( .take::(rid)?; if graceful { + http_general_trace!("graceful shutdown"); // TODO(bartlomieju): replace with `state.feature_checker.check_or_exit` // once we phase out `check_or_exit_with_legacy_fallback` state @@ -1191,8 +1195,9 @@ pub async fn op_http_close( // In a graceful shutdown, we close the listener and allow all the remaining connections to drain join_handle.listen_cancel_handle().cancel(); - join_handle.server_state.drain().await; + poll_fn(|cx| join_handle.server_state.poll_complete(cx)).await; } else { + http_general_trace!("forceful shutdown"); // In a forceful shutdown, we close everything join_handle.listen_cancel_handle().cancel(); join_handle.connection_cancel_handle().cancel(); @@ -1200,6 +1205,8 @@ pub async fn op_http_close( tokio::task::yield_now().await; } + http_general_trace!("awaiting shutdown"); + let mut join_handle = RcRef::map(&join_handle, |this| &this.join_handle) .borrow_mut() .await; diff --git a/ext/http/service.rs b/ext/http/service.rs index fbd533cacd..c232962be4 100644 --- a/ext/http/service.rs +++ b/ext/http/service.rs @@ -17,6 +17,7 @@ use hyper1::upgrade::OnUpgrade; use scopeguard::guard; use scopeguard::ScopeGuard; +use std::cell::Cell; use std::cell::Ref; use std::cell::RefCell; use std::cell::RefMut; @@ -31,12 +32,34 @@ use std::task::Waker; pub type Request = hyper1::Request; pub type Response = hyper1::Response; +#[cfg(feature = "__http_tracing")] +pub static RECORD_COUNT: std::sync::atomic::AtomicUsize = + std::sync::atomic::AtomicUsize::new(0); + +macro_rules! http_general_trace { + ($($args:expr),*) => { + #[cfg(feature = "__http_tracing")] + { + let count = $crate::service::RECORD_COUNT + .load(std::sync::atomic::Ordering::SeqCst); + + println!( + "HTTP [+{count}]: {}", + format!($($args),*), + ); + } + }; +} + macro_rules! http_trace { ($record:expr $(, $args:expr)*) => { #[cfg(feature = "__http_tracing")] { + let count = $crate::service::RECORD_COUNT + .load(std::sync::atomic::Ordering::SeqCst); + println!( - "HTTP id={:p} strong={}: {}", + "HTTP [+{count}] id={:p} strong={}: {}", $record, std::rc::Rc::strong_count(&$record), format!($($args),*), @@ -45,44 +68,83 @@ macro_rules! http_trace { }; } +pub(crate) use http_general_trace; pub(crate) use http_trace; -struct HttpServerStateInner { +pub(crate) struct HttpServerStateInner { pool: Vec<(Rc, HeaderMap)>, - drain_waker: Option, } -pub struct HttpServerState(RefCell); +/// A signalling version of `Rc` that allows one to poll for when all other references +/// to the `Rc` have been dropped. +#[repr(transparent)] +pub(crate) struct SignallingRc(Rc<(T, Cell>)>); -impl HttpServerState { - pub fn new() -> Rc { - Rc::new(Self(RefCell::new(HttpServerStateInner { - pool: Vec::new(), - drain_waker: None, - }))) +impl SignallingRc { + #[inline] + pub fn new(t: T) -> Self { + Self(Rc::new((t, Default::default()))) } - pub fn drain<'a>(self: &'a Rc) -> impl Future + 'a { - struct HttpServerStateDrain<'a>(&'a Rc); + #[inline] + pub fn strong_count(&self) -> usize { + Rc::strong_count(&self.0) + } - impl<'a> Future for HttpServerStateDrain<'a> { - type Output = (); + /// Resolves when this is the only remaining reference. + #[inline] + pub fn poll_complete(&self, cx: &mut Context<'_>) -> Poll<()> { + if Rc::strong_count(&self.0) == 1 { + Poll::Ready(()) + } else { + self.0 .1.set(Some(cx.waker().clone())); + Poll::Pending + } + } +} - fn poll( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll { - let server_state = self.0; - http_trace!(server_state, "HttpServerState::drain poll"); - if Rc::strong_count(server_state) <= 1 { - return Poll::Ready(()); - } - server_state.0.borrow_mut().drain_waker = Some(cx.waker().clone()); - Poll::Pending +impl Clone for SignallingRc { + #[inline] + fn clone(&self) -> Self { + Self(self.0.clone()) + } +} + +impl Drop for SignallingRc { + #[inline] + fn drop(&mut self) { + // Trigger the waker iff the refcount is about to become 1. + if Rc::strong_count(&self.0) == 2 { + if let Some(waker) = self.0 .1.take() { + waker.wake(); } } + } +} - HttpServerStateDrain(self) +impl std::ops::Deref for SignallingRc { + type Target = T; + #[inline] + fn deref(&self) -> &Self::Target { + &self.0 .0 + } +} + +pub(crate) struct HttpServerState(RefCell); + +impl HttpServerState { + pub fn new() -> SignallingRc { + SignallingRc::new(Self(RefCell::new(HttpServerStateInner { + pool: Vec::new(), + }))) + } +} + +impl std::ops::Deref for HttpServerState { + type Target = RefCell; + + fn deref(&self) -> &Self::Target { + &self.0 } } @@ -117,7 +179,7 @@ impl Drop for HttpRequestBodyAutocloser { pub async fn handle_request( request: Request, request_info: HttpConnectionProperties, - server_state: Rc, // Keep server alive for duration of this future. + server_state: SignallingRc, // Keep server alive for duration of this future. tx: tokio::sync::mpsc::Sender>, ) -> Result { // If the underlying TCP connection is closed, this future will be dropped @@ -145,7 +207,7 @@ pub async fn handle_request( } struct HttpRecordInner { - server_state: Rc, + server_state: SignallingRc, request_info: HttpConnectionProperties, request_parts: http::request::Parts, request_body: Option, @@ -163,18 +225,14 @@ struct HttpRecordInner { pub struct HttpRecord(RefCell>); -#[cfg(feature = "__http_tracing")] -pub static RECORD_COUNT: std::sync::atomic::AtomicUsize = - std::sync::atomic::AtomicUsize::new(0); - #[cfg(feature = "__http_tracing")] impl Drop for HttpRecord { fn drop(&mut self) { - let count = RECORD_COUNT + RECORD_COUNT .fetch_sub(1, std::sync::atomic::Ordering::SeqCst) .checked_sub(1) .expect("Count went below zero"); - println!("HTTP count={count}: HttpRecord::drop"); + http_general_trace!("HttpRecord::drop"); } } @@ -182,13 +240,13 @@ impl HttpRecord { fn new( request: Request, request_info: HttpConnectionProperties, - server_state: Rc, + server_state: SignallingRc, ) -> Rc { let (request_parts, request_body) = request.into_parts(); let request_body = Some(request_body.into()); let (mut response_parts, _) = http::Response::new(()).into_parts(); let record = - if let Some((record, headers)) = server_state.0.borrow_mut().pool.pop() { + if let Some((record, headers)) = server_state.borrow_mut().pool.pop() { response_parts.headers = headers; http_trace!(record, "HttpRecord::reuse"); record @@ -262,23 +320,13 @@ impl HttpRecord { .. } = self.0.borrow_mut().take().unwrap(); - let mut server_state_mut = server_state.0.borrow_mut(); - let inflight = Rc::strong_count(&server_state); + let inflight = server_state.strong_count(); http_trace!(self, "HttpRecord::recycle inflight={}", inflight); - // Server is shutting down so wake the drain future. - if let Some(waker) = server_state_mut.drain_waker.take() { - drop(server_state_mut); - drop(server_state); - http_trace!(self, "HttpRecord::recycle wake"); - waker.wake(); - return; - } - // Keep a buffer of allocations on hand to be reused by incoming requests. // Estimated target size is 16 + 1/8 the number of inflight requests. let target = 16 + (inflight >> 3); - let pool = &mut server_state_mut.pool; + let pool = &mut server_state.borrow_mut().pool; if target > pool.len() { headers.clear(); pool.push((self, headers)); @@ -634,7 +682,7 @@ mod tests { .await }, )?; - assert_eq!(Rc::strong_count(&server_state_check), 1); + assert_eq!(server_state_check.strong_count(), 1); Ok(()) } }