mirror of
https://github.com/denoland/deno.git
synced 2025-01-08 07:08:27 -05:00
perf(ext/http): Object pooling for HttpRecord and HeaderMap (#20809)
Reuse existing existing allocations for HttpRecord and response HeaderMap where possible. At request end used allocations are returned to the pool and the pool and the pool sized to 1/8th the current number of inflight requests. For http1 hyper will reuse the response HeaderMap for the following request on the connection. Builds upon https://github.com/denoland/deno/pull/20770 --------- Co-authored-by: Matt Mastracci <matthew@mastracci.com>
This commit is contained in:
parent
60c6052060
commit
2980cb5e5b
2 changed files with 119 additions and 49 deletions
|
@ -14,7 +14,7 @@ use crate::service::handle_request;
|
||||||
use crate::service::http_trace;
|
use crate::service::http_trace;
|
||||||
use crate::service::HttpRecord;
|
use crate::service::HttpRecord;
|
||||||
use crate::service::HttpRequestBodyAutocloser;
|
use crate::service::HttpRequestBodyAutocloser;
|
||||||
use crate::service::RefCount;
|
use crate::service::HttpServerState;
|
||||||
use crate::websocket_upgrade::WebSocketUpgrade;
|
use crate::websocket_upgrade::WebSocketUpgrade;
|
||||||
use crate::LocalExecutor;
|
use crate::LocalExecutor;
|
||||||
use cache_control::CacheControl;
|
use cache_control::CacheControl;
|
||||||
|
@ -844,13 +844,13 @@ fn serve_https(
|
||||||
tx: tokio::sync::mpsc::Sender<Rc<HttpRecord>>,
|
tx: tokio::sync::mpsc::Sender<Rc<HttpRecord>>,
|
||||||
) -> JoinHandle<Result<(), AnyError>> {
|
) -> JoinHandle<Result<(), AnyError>> {
|
||||||
let HttpLifetime {
|
let HttpLifetime {
|
||||||
refcount,
|
server_state,
|
||||||
connection_cancel_handle,
|
connection_cancel_handle,
|
||||||
listen_cancel_handle,
|
listen_cancel_handle,
|
||||||
} = lifetime;
|
} = lifetime;
|
||||||
|
|
||||||
let svc = service_fn(move |req: Request| {
|
let svc = service_fn(move |req: Request| {
|
||||||
handle_request(req, request_info.clone(), refcount.clone(), tx.clone())
|
handle_request(req, request_info.clone(), server_state.clone(), tx.clone())
|
||||||
});
|
});
|
||||||
spawn(
|
spawn(
|
||||||
async {
|
async {
|
||||||
|
@ -881,13 +881,13 @@ fn serve_http(
|
||||||
tx: tokio::sync::mpsc::Sender<Rc<HttpRecord>>,
|
tx: tokio::sync::mpsc::Sender<Rc<HttpRecord>>,
|
||||||
) -> JoinHandle<Result<(), AnyError>> {
|
) -> JoinHandle<Result<(), AnyError>> {
|
||||||
let HttpLifetime {
|
let HttpLifetime {
|
||||||
refcount,
|
server_state,
|
||||||
connection_cancel_handle,
|
connection_cancel_handle,
|
||||||
listen_cancel_handle,
|
listen_cancel_handle,
|
||||||
} = lifetime;
|
} = lifetime;
|
||||||
|
|
||||||
let svc = service_fn(move |req: Request| {
|
let svc = service_fn(move |req: Request| {
|
||||||
handle_request(req, request_info.clone(), refcount.clone(), tx.clone())
|
handle_request(req, request_info.clone(), server_state.clone(), tx.clone())
|
||||||
});
|
});
|
||||||
spawn(
|
spawn(
|
||||||
serve_http2_autodetect(io, svc, listen_cancel_handle)
|
serve_http2_autodetect(io, svc, listen_cancel_handle)
|
||||||
|
@ -927,7 +927,7 @@ where
|
||||||
struct HttpLifetime {
|
struct HttpLifetime {
|
||||||
connection_cancel_handle: Rc<CancelHandle>,
|
connection_cancel_handle: Rc<CancelHandle>,
|
||||||
listen_cancel_handle: Rc<CancelHandle>,
|
listen_cancel_handle: Rc<CancelHandle>,
|
||||||
refcount: RefCount,
|
server_state: Rc<HttpServerState>,
|
||||||
}
|
}
|
||||||
|
|
||||||
struct HttpJoinHandle {
|
struct HttpJoinHandle {
|
||||||
|
@ -935,7 +935,7 @@ struct HttpJoinHandle {
|
||||||
connection_cancel_handle: Rc<CancelHandle>,
|
connection_cancel_handle: Rc<CancelHandle>,
|
||||||
listen_cancel_handle: Rc<CancelHandle>,
|
listen_cancel_handle: Rc<CancelHandle>,
|
||||||
rx: AsyncRefCell<tokio::sync::mpsc::Receiver<Rc<HttpRecord>>>,
|
rx: AsyncRefCell<tokio::sync::mpsc::Receiver<Rc<HttpRecord>>>,
|
||||||
refcount: RefCount,
|
server_state: Rc<HttpServerState>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl HttpJoinHandle {
|
impl HttpJoinHandle {
|
||||||
|
@ -945,7 +945,7 @@ impl HttpJoinHandle {
|
||||||
connection_cancel_handle: CancelHandle::new_rc(),
|
connection_cancel_handle: CancelHandle::new_rc(),
|
||||||
listen_cancel_handle: CancelHandle::new_rc(),
|
listen_cancel_handle: CancelHandle::new_rc(),
|
||||||
rx: AsyncRefCell::new(rx),
|
rx: AsyncRefCell::new(rx),
|
||||||
refcount: RefCount::default(),
|
server_state: HttpServerState::new(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -953,7 +953,7 @@ impl HttpJoinHandle {
|
||||||
HttpLifetime {
|
HttpLifetime {
|
||||||
connection_cancel_handle: self.connection_cancel_handle.clone(),
|
connection_cancel_handle: self.connection_cancel_handle.clone(),
|
||||||
listen_cancel_handle: self.listen_cancel_handle.clone(),
|
listen_cancel_handle: self.listen_cancel_handle.clone(),
|
||||||
refcount: self.refcount.clone(),
|
server_state: self.server_state.clone(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1194,17 +1194,16 @@ pub async fn op_http_close(
|
||||||
|
|
||||||
// In a graceful shutdown, we close the listener and allow all the remaining connections to drain
|
// In a graceful shutdown, we close the listener and allow all the remaining connections to drain
|
||||||
join_handle.listen_cancel_handle().cancel();
|
join_handle.listen_cancel_handle().cancel();
|
||||||
|
// Async spin on the server_state while we wait for everything to drain
|
||||||
|
while Rc::strong_count(&join_handle.server_state) > 1 {
|
||||||
|
tokio::time::sleep(Duration::from_millis(10)).await;
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
// In a forceful shutdown, we close everything
|
// In a forceful shutdown, we close everything
|
||||||
join_handle.listen_cancel_handle().cancel();
|
join_handle.listen_cancel_handle().cancel();
|
||||||
join_handle.connection_cancel_handle().cancel();
|
join_handle.connection_cancel_handle().cancel();
|
||||||
}
|
}
|
||||||
|
|
||||||
// Async spin on the refcount while we wait for everything to drain
|
|
||||||
while Rc::strong_count(&join_handle.refcount.0) > 1 {
|
|
||||||
tokio::time::sleep(Duration::from_millis(10)).await;
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut join_handle = RcRef::map(&join_handle, |this| &this.join_handle)
|
let mut join_handle = RcRef::map(&join_handle, |this| &this.join_handle)
|
||||||
.borrow_mut()
|
.borrow_mut()
|
||||||
.await;
|
.await;
|
||||||
|
|
|
@ -22,14 +22,14 @@ pub type Request = hyper1::Request<Incoming>;
|
||||||
pub type Response = hyper1::Response<ResponseBytes>;
|
pub type Response = hyper1::Response<ResponseBytes>;
|
||||||
|
|
||||||
macro_rules! http_trace {
|
macro_rules! http_trace {
|
||||||
($record:expr, $args:tt) => {
|
($record:expr $(, $args:expr)*) => {
|
||||||
#[cfg(feature = "__http_tracing")]
|
#[cfg(feature = "__http_tracing")]
|
||||||
{
|
{
|
||||||
println!(
|
println!(
|
||||||
"HTTP id={:p} strong={}: {}",
|
"HTTP id={:p} strong={}: {}",
|
||||||
$record,
|
$record,
|
||||||
std::rc::Rc::strong_count(&$record),
|
std::rc::Rc::strong_count(&$record),
|
||||||
format!($args),
|
format!($($args),*),
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
@ -37,9 +37,19 @@ macro_rules! http_trace {
|
||||||
|
|
||||||
pub(crate) use http_trace;
|
pub(crate) use http_trace;
|
||||||
|
|
||||||
#[repr(transparent)]
|
struct HttpServerStateInner {
|
||||||
#[derive(Clone, Default)]
|
pool: Vec<(Rc<HttpRecord>, HeaderMap)>,
|
||||||
pub struct RefCount(pub Rc<()>);
|
}
|
||||||
|
|
||||||
|
pub struct HttpServerState(RefCell<HttpServerStateInner>);
|
||||||
|
|
||||||
|
impl HttpServerState {
|
||||||
|
pub fn new() -> Rc<Self> {
|
||||||
|
Rc::new(Self(RefCell::new(HttpServerStateInner {
|
||||||
|
pool: Vec::new(),
|
||||||
|
})))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
enum RequestBodyState {
|
enum RequestBodyState {
|
||||||
Incoming(Incoming),
|
Incoming(Incoming),
|
||||||
|
@ -72,15 +82,17 @@ impl Drop for HttpRequestBodyAutocloser {
|
||||||
pub async fn handle_request(
|
pub async fn handle_request(
|
||||||
request: Request,
|
request: Request,
|
||||||
request_info: HttpConnectionProperties,
|
request_info: HttpConnectionProperties,
|
||||||
_refcount: RefCount, // Keep server alive for duration of this future.
|
server_state: Rc<HttpServerState>, // Keep server alive for duration of this future.
|
||||||
tx: tokio::sync::mpsc::Sender<Rc<HttpRecord>>,
|
tx: tokio::sync::mpsc::Sender<Rc<HttpRecord>>,
|
||||||
) -> Result<Response, hyper::Error> {
|
) -> Result<Response, hyper::Error> {
|
||||||
// If the underlying TCP connection is closed, this future will be dropped
|
// If the underlying TCP connection is closed, this future will be dropped
|
||||||
// and execution could stop at any await point.
|
// and execution could stop at any await point.
|
||||||
// The HttpRecord must live until JavaScript is done processing so is wrapped
|
// The HttpRecord must live until JavaScript is done processing so is wrapped
|
||||||
// in an Rc. The guard ensures unneeded resources are freed at cancellation.
|
// in an Rc. The guard ensures unneeded resources are freed at cancellation.
|
||||||
let guarded_record =
|
let guarded_record = guard(
|
||||||
guard(HttpRecord::new(request, request_info), HttpRecord::cancel);
|
HttpRecord::new(request, request_info, server_state),
|
||||||
|
HttpRecord::cancel,
|
||||||
|
);
|
||||||
|
|
||||||
// Clone HttpRecord and send to JavaScript for processing.
|
// Clone HttpRecord and send to JavaScript for processing.
|
||||||
// Safe to unwrap as channel receiver is never closed.
|
// Safe to unwrap as channel receiver is never closed.
|
||||||
|
@ -93,15 +105,13 @@ pub async fn handle_request(
|
||||||
// Defuse the guard. Must not await after the point.
|
// Defuse the guard. Must not await after the point.
|
||||||
let record = ScopeGuard::into_inner(guarded_record);
|
let record = ScopeGuard::into_inner(guarded_record);
|
||||||
http_trace!(record, "handle_request complete");
|
http_trace!(record, "handle_request complete");
|
||||||
assert!(
|
|
||||||
Rc::strong_count(&record) == 1,
|
|
||||||
"HTTP state error: Expected to be last strong reference (handle_request)"
|
|
||||||
);
|
|
||||||
let response = record.take_response();
|
let response = record.take_response();
|
||||||
|
record.recycle();
|
||||||
Ok(response)
|
Ok(response)
|
||||||
}
|
}
|
||||||
|
|
||||||
struct HttpRecordInner {
|
struct HttpRecordInner {
|
||||||
|
server_state: Rc<HttpServerState>,
|
||||||
request_info: HttpConnectionProperties,
|
request_info: HttpConnectionProperties,
|
||||||
request_parts: Parts,
|
request_parts: Parts,
|
||||||
request_body: Option<RequestBodyState>,
|
request_body: Option<RequestBodyState>,
|
||||||
|
@ -113,7 +123,7 @@ struct HttpRecordInner {
|
||||||
been_dropped: bool,
|
been_dropped: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct HttpRecord(RefCell<HttpRecordInner>);
|
pub struct HttpRecord(RefCell<Option<HttpRecordInner>>);
|
||||||
|
|
||||||
#[cfg(feature = "__http_tracing")]
|
#[cfg(feature = "__http_tracing")]
|
||||||
pub static RECORD_COUNT: std::sync::atomic::AtomicUsize =
|
pub static RECORD_COUNT: std::sync::atomic::AtomicUsize =
|
||||||
|
@ -131,37 +141,86 @@ impl Drop for HttpRecord {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl HttpRecord {
|
impl HttpRecord {
|
||||||
fn new(request: Request, request_info: HttpConnectionProperties) -> Rc<Self> {
|
fn new(
|
||||||
#[cfg(feature = "__http_tracing")]
|
request: Request,
|
||||||
{
|
request_info: HttpConnectionProperties,
|
||||||
RECORD_COUNT.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
|
server_state: Rc<HttpServerState>,
|
||||||
}
|
) -> Rc<Self> {
|
||||||
let (request_parts, request_body) = request.into_parts();
|
let (request_parts, request_body) = request.into_parts();
|
||||||
let body = ResponseBytes::default();
|
let body = ResponseBytes::default();
|
||||||
let trailers = body.trailers();
|
let trailers = body.trailers();
|
||||||
let request_body = Some(request_body.into());
|
let request_body = Some(request_body.into());
|
||||||
|
let mut response = Response::new(body);
|
||||||
|
let reuse_record =
|
||||||
|
if let Some((record, headers)) = server_state.0.borrow_mut().pool.pop() {
|
||||||
|
*response.headers_mut() = headers;
|
||||||
|
Some(record)
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
};
|
||||||
let inner = HttpRecordInner {
|
let inner = HttpRecordInner {
|
||||||
|
server_state,
|
||||||
request_info,
|
request_info,
|
||||||
request_parts,
|
request_parts,
|
||||||
request_body,
|
request_body,
|
||||||
response: Some(Response::new(body)),
|
response: Some(response),
|
||||||
response_ready: false,
|
response_ready: false,
|
||||||
response_waker: None,
|
response_waker: None,
|
||||||
trailers,
|
trailers,
|
||||||
been_dropped: false,
|
been_dropped: false,
|
||||||
};
|
};
|
||||||
|
if let Some(record) = reuse_record {
|
||||||
|
*record.0.borrow_mut() = Some(inner);
|
||||||
|
http_trace!(record, "HttpRecord::reuse");
|
||||||
|
record
|
||||||
|
} else {
|
||||||
|
#[cfg(feature = "__http_tracing")]
|
||||||
|
{
|
||||||
|
RECORD_COUNT.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
|
||||||
|
}
|
||||||
|
|
||||||
#[allow(clippy::let_and_return)]
|
#[allow(clippy::let_and_return)]
|
||||||
let record = Rc::new(Self(RefCell::new(inner)));
|
let record = Rc::new(Self(RefCell::new(Some(inner))));
|
||||||
http_trace!(record, "HttpRecord::new");
|
http_trace!(record, "HttpRecord::new");
|
||||||
record
|
record
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn recycle(self: Rc<Self>) {
|
||||||
|
assert!(
|
||||||
|
Rc::strong_count(&self) == 1,
|
||||||
|
"HTTP state error: Expected to be last strong reference"
|
||||||
|
);
|
||||||
|
let HttpRecordInner {
|
||||||
|
server_state,
|
||||||
|
request_parts: Parts { mut headers, .. },
|
||||||
|
..
|
||||||
|
} = self.0.borrow_mut().take().unwrap();
|
||||||
|
let mut server_state_mut = server_state.0.borrow_mut();
|
||||||
|
let inflight = Rc::strong_count(&server_state);
|
||||||
|
http_trace!(self, "HttpRecord::recycle inflight={}", inflight);
|
||||||
|
|
||||||
|
// TODO(mmastrac): we never recover the pooled memory here, and we could likely be shuttling
|
||||||
|
// the to-drop objects off to another thread.
|
||||||
|
|
||||||
|
// Keep a buffer of allocations on hand to be reused by incoming requests.
|
||||||
|
// Estimated target size is 16 + 1/8 the number of inflight requests.
|
||||||
|
let target = 16 + (inflight >> 3);
|
||||||
|
let pool = &mut server_state_mut.pool;
|
||||||
|
if target > pool.len() {
|
||||||
|
headers.clear();
|
||||||
|
pool.push((self, headers));
|
||||||
|
} else if target < pool.len() - 8 {
|
||||||
|
pool.truncate(target);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fn self_ref(&self) -> Ref<'_, HttpRecordInner> {
|
fn self_ref(&self) -> Ref<'_, HttpRecordInner> {
|
||||||
self.0.borrow()
|
Ref::map(self.0.borrow(), |option| option.as_ref().unwrap())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn self_mut(&self) -> RefMut<'_, HttpRecordInner> {
|
fn self_mut(&self) -> RefMut<'_, HttpRecordInner> {
|
||||||
self.0.borrow_mut()
|
RefMut::map(self.0.borrow_mut(), |option| option.as_mut().unwrap())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Perform the Hyper upgrade on this record.
|
/// Perform the Hyper upgrade on this record.
|
||||||
|
@ -210,7 +269,13 @@ impl HttpRecord {
|
||||||
/// Cleanup resources not needed after the future is dropped.
|
/// Cleanup resources not needed after the future is dropped.
|
||||||
fn cancel(self: Rc<Self>) {
|
fn cancel(self: Rc<Self>) {
|
||||||
http_trace!(self, "HttpRecord::cancel");
|
http_trace!(self, "HttpRecord::cancel");
|
||||||
let mut inner = self.0.borrow_mut();
|
let mut inner = self.self_mut();
|
||||||
|
if inner.response_ready {
|
||||||
|
// Future dropped between wake() and async fn resuming.
|
||||||
|
drop(inner);
|
||||||
|
self.recycle();
|
||||||
|
return;
|
||||||
|
}
|
||||||
inner.been_dropped = true;
|
inner.been_dropped = true;
|
||||||
// The request body might include actual resources.
|
// The request body might include actual resources.
|
||||||
inner.request_body.take();
|
inner.request_body.take();
|
||||||
|
@ -220,14 +285,15 @@ impl HttpRecord {
|
||||||
pub fn complete(self: Rc<Self>) {
|
pub fn complete(self: Rc<Self>) {
|
||||||
http_trace!(self, "HttpRecord::complete");
|
http_trace!(self, "HttpRecord::complete");
|
||||||
let mut inner = self.self_mut();
|
let mut inner = self.self_mut();
|
||||||
assert!(
|
|
||||||
!inner.been_dropped || Rc::strong_count(&self) == 1,
|
|
||||||
"HTTP state error: Expected to be last strong reference (been_dropped)"
|
|
||||||
);
|
|
||||||
assert!(
|
assert!(
|
||||||
!inner.response_ready,
|
!inner.response_ready,
|
||||||
"HTTP state error: Entry has already been completed"
|
"HTTP state error: Entry has already been completed"
|
||||||
);
|
);
|
||||||
|
if inner.been_dropped {
|
||||||
|
drop(inner);
|
||||||
|
self.recycle();
|
||||||
|
return;
|
||||||
|
}
|
||||||
inner.response_ready = true;
|
inner.response_ready = true;
|
||||||
if let Some(waker) = inner.response_waker.take() {
|
if let Some(waker) = inner.response_waker.take() {
|
||||||
drop(inner);
|
drop(inner);
|
||||||
|
@ -277,7 +343,7 @@ impl HttpRecord {
|
||||||
self: std::pin::Pin<&mut Self>,
|
self: std::pin::Pin<&mut Self>,
|
||||||
cx: &mut std::task::Context<'_>,
|
cx: &mut std::task::Context<'_>,
|
||||||
) -> std::task::Poll<Self::Output> {
|
) -> std::task::Poll<Self::Output> {
|
||||||
let mut mut_self = self.0 .0.borrow_mut();
|
let mut mut_self = self.0.self_mut();
|
||||||
if mut_self.response_ready {
|
if mut_self.response_ready {
|
||||||
return std::task::Poll::Ready(());
|
return std::task::Poll::Ready(());
|
||||||
}
|
}
|
||||||
|
@ -352,8 +418,8 @@ mod tests {
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn test_handle_request() -> Result<(), AnyError> {
|
async fn test_handle_request() -> Result<(), AnyError> {
|
||||||
let (tx, mut rx) = tokio::sync::mpsc::channel(10);
|
let (tx, mut rx) = tokio::sync::mpsc::channel(10);
|
||||||
let refcount = RefCount::default();
|
let server_state = HttpServerState::new();
|
||||||
let refcount_check = refcount.clone();
|
let server_state_check = server_state.clone();
|
||||||
let request_info = HttpConnectionProperties {
|
let request_info = HttpConnectionProperties {
|
||||||
peer_address: "".into(),
|
peer_address: "".into(),
|
||||||
peer_port: None,
|
peer_port: None,
|
||||||
|
@ -361,7 +427,12 @@ mod tests {
|
||||||
stream_type: NetworkStreamType::Tcp,
|
stream_type: NetworkStreamType::Tcp,
|
||||||
};
|
};
|
||||||
let svc = service_fn(move |req: hyper1::Request<Incoming>| {
|
let svc = service_fn(move |req: hyper1::Request<Incoming>| {
|
||||||
handle_request(req, request_info.clone(), refcount.clone(), tx.clone())
|
handle_request(
|
||||||
|
req,
|
||||||
|
request_info.clone(),
|
||||||
|
server_state.clone(),
|
||||||
|
tx.clone(),
|
||||||
|
)
|
||||||
});
|
});
|
||||||
|
|
||||||
let client_req = http::Request::builder().uri("/").body("".to_string())?;
|
let client_req = http::Request::builder().uri("/").body("".to_string())?;
|
||||||
|
@ -395,7 +466,7 @@ mod tests {
|
||||||
.await
|
.await
|
||||||
},
|
},
|
||||||
)?;
|
)?;
|
||||||
assert_eq!(Rc::strong_count(&refcount_check.0), 1);
|
assert_eq!(Rc::strong_count(&server_state_check), 1);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue