1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2024-11-30 16:40:57 -05:00
denoland-deno/ext/http/service.rs
Laurence Rowe e5819777c3
refactor(ext/http): Use HttpRecord as response body to track until body completion (#20822)
Use HttpRecord as response body so requests can be tracked all the way
to response body completion.

This allows Request properties to be accessed while the response body is
streaming.

Graceful shutdown now awaits a future instead of async spinning waiting
for requests to finish.

On the minimal benchmark this refactor improves performance an
additional 2% over pooling alone for a net 3% increase over the previous
deno main branch.

Builds upon https://github.com/denoland/deno/pull/20809 and
https://github.com/denoland/deno/pull/20770.

---------

Co-authored-by: Matt Mastracci <matthew@mastracci.com>
2023-11-13 19:17:31 +00:00

640 lines
19 KiB
Rust

// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
use crate::request_properties::HttpConnectionProperties;
use crate::response_body::ResponseBytesInner;
use crate::response_body::ResponseStreamResult;
use deno_core::error::AnyError;
use deno_core::futures::ready;
use deno_core::BufView;
use deno_core::OpState;
use deno_core::ResourceId;
use http::request::Parts;
use http::HeaderMap;
use hyper1::body::Body;
use hyper1::body::Frame;
use hyper1::body::Incoming;
use hyper1::body::SizeHint;
use hyper1::upgrade::OnUpgrade;
use scopeguard::guard;
use scopeguard::ScopeGuard;
use std::cell::Ref;
use std::cell::RefCell;
use std::cell::RefMut;
use std::future::Future;
use std::mem::ManuallyDrop;
use std::pin::Pin;
use std::rc::Rc;
use std::task::Context;
use std::task::Poll;
use std::task::Waker;
/// Incoming HTTP request type handled by this module: a hyper request with a
/// streaming [`Incoming`] body.
pub type Request = hyper1::Request<Incoming>;
/// Outgoing HTTP response type produced by this module: a hyper response whose
/// body is an [`HttpRecordResponse`].
pub type Response = hyper1::Response<HttpRecordResponse>;
/// Prints a trace line for an HTTP record or server state.
///
/// The first argument must be an `Rc` (or a reference to one): it is printed
/// with `{:p}` and passed to `Rc::strong_count`. The remaining arguments are
/// forwarded to `format!` to build the message.
///
/// The printing block is gated on the `__http_tracing` feature, so it is
/// compiled out when that feature is disabled.
macro_rules! http_trace {
($record:expr $(, $args:expr)*) => {
#[cfg(feature = "__http_tracing")]
{
println!(
"HTTP id={:p} strong={}: {}",
$record,
std::rc::Rc::strong_count(&$record),
format!($($args),*),
);
}
};
}
pub(crate) use http_trace;
/// Mutable part of [`HttpServerState`], kept behind a `RefCell`.
struct HttpServerStateInner {
/// Recycled record allocations, each paired with a cleared header map, kept
/// for reuse by `HttpRecord::new`.
pool: Vec<(Rc<HttpRecord>, HeaderMap)>,
/// Waker stored by a pending `drain` future; taken and woken when a record
/// is recycled.
drain_waker: Option<Waker>,
}
/// State shared between a server and its in-flight records.
///
/// It is handed out as an `Rc`; `drain` uses the strong count of that `Rc` to
/// decide whether other holders are still alive.
pub struct HttpServerState(RefCell<HttpServerStateInner>);
impl HttpServerState {
  /// Creates a fresh server state with an empty record pool and no pending
  /// drain waker.
  pub fn new() -> Rc<Self> {
    let inner = HttpServerStateInner {
      pool: Vec::new(),
      drain_waker: None,
    };
    Rc::new(Self(RefCell::new(inner)))
  }

  /// Returns a future that resolves once this `Rc` is the only remaining
  /// strong reference to the server state.
  ///
  /// While other references exist, every poll stores the current task's waker
  /// in `drain_waker` so that `HttpRecord::recycle` can wake it.
  pub fn drain<'a>(self: &'a Rc<Self>) -> impl Future<Output = ()> + 'a {
    struct HttpServerStateDrain<'a>(&'a Rc<HttpServerState>);

    impl<'a> Future for HttpServerStateDrain<'a> {
      type Output = ();

      fn poll(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
      ) -> Poll<Self::Output> {
        let state = self.0;
        http_trace!(state, "HttpServerState::drain poll");
        if Rc::strong_count(state) > 1 {
          // Other holders remain: (re)register for a wake-up.
          let waker = cx.waker().clone();
          state.0.borrow_mut().drain_waker = Some(waker);
          Poll::Pending
        } else {
          Poll::Ready(())
        }
      }
    }

    HttpServerStateDrain(self)
  }
}
/// Where the request body currently lives.
enum RequestBodyState {
/// The hyper body, not yet taken by `HttpRecord::take_request_body`.
Incoming(Incoming),
/// The body has been replaced by a resource (see `HttpRecord::put_resource`);
/// the autocloser closes that resource when dropped.
Resource(HttpRequestBodyAutocloser),
}
impl From<Incoming> for RequestBodyState {
fn from(value: Incoming) -> Self {
RequestBodyState::Incoming(value)
}
}
/// Ensures that the request body closes itself when no longer needed.
///
/// Holds the resource id of the body and the `OpState` whose resource table
/// the id is looked up in when this value is dropped.
pub struct HttpRequestBodyAutocloser(ResourceId, Rc<RefCell<OpState>>);
impl HttpRequestBodyAutocloser {
pub fn new(res: ResourceId, op_state: Rc<RefCell<OpState>>) -> Self {
Self(res, op_state)
}
}
impl Drop for HttpRequestBodyAutocloser {
  /// Removes the tracked resource from the resource table and closes it.
  ///
  /// Nothing happens if the resource id is no longer present in the table.
  fn drop(&mut self) {
    // Finish the lookup in its own statement so the `RefMut<OpState>`
    // temporary is released before `close()` runs. With the lookup in the
    // `if let` scrutinee, the borrow would stay alive for the whole body and
    // any `close()` path that touches `OpState` would panic on a double
    // borrow.
    let taken = self.1.borrow_mut().resource_table.take_any(self.0);
    if let Ok(res) = taken {
      res.close();
    }
  }
}
/// Handles one HTTP request.
///
/// The request is wrapped in an [`HttpRecord`], a clone of the record is sent
/// over `tx`, and the function then waits until the record is marked complete
/// before turning it into the [`Response`].
///
/// If this future is dropped before the response is ready, the scope guard
/// runs `HttpRecord::cancel` on the record.
///
/// # Panics
///
/// Panics if the receiving side of `tx` has been closed, because the result
/// of `send` is unwrapped.
pub async fn handle_request(
request: Request,
request_info: HttpConnectionProperties,
server_state: Rc<HttpServerState>, // Keep server alive for duration of this future.
tx: tokio::sync::mpsc::Sender<Rc<HttpRecord>>,
) -> Result<Response, hyper::Error> {
// If the underlying TCP connection is closed, this future will be dropped
// and execution could stop at any await point.
// The HttpRecord must live until JavaScript is done processing so is wrapped
// in an Rc. The guard ensures unneeded resources are freed at cancellation.
let guarded_record = guard(
HttpRecord::new(request, request_info, server_state),
HttpRecord::cancel,
);
// Clone HttpRecord and send to JavaScript for processing.
// Safe to unwrap as channel receiver is never closed.
tx.send(guarded_record.clone()).await.unwrap();
// Wait until the JavaScript handler has marked the record complete, i.e. the
// response head is ready.
http_trace!(*guarded_record, "handle_request response_ready.await");
guarded_record.response_ready().await;
// Defuse the guard. Must not await after this point.
let record = ScopeGuard::into_inner(guarded_record);
http_trace!(record, "handle_request complete");
// The record itself becomes the response body (see `into_response`).
let response = record.into_response();
Ok(response)
}
/// State of one request/response pair, stored inside an [`HttpRecord`] while
/// the record is in use.
struct HttpRecordInner {
/// Shared server state; holding it keeps the `Rc` strong count above one.
server_state: Rc<HttpServerState>,
/// Properties of the connection the request arrived on.
request_info: HttpConnectionProperties,
/// Head of the request (method, URI, headers, extensions).
request_parts: http::request::Parts,
/// Request body; cleared by `cancel`.
request_body: Option<RequestBodyState>,
/// Response head; taken by `into_response`.
response_parts: Option<http::response::Parts>,
/// Set by `complete` once the response head is ready.
response_ready: bool,
/// Waker registered by the `response_ready` future.
response_waker: Option<Waker>,
/// Response body; starts as `Empty` and is replaced by `Done` once taken.
response_body: ResponseBytesInner,
/// Set by `finish`.
response_body_finished: bool,
/// Waker registered by the `response_body_finished` future.
response_body_waker: Option<Waker>,
/// Trailers to emit after the body, if any.
trailers: Option<HeaderMap>,
/// Set by `cancel` when the request future was dropped before the response
/// was ready.
been_dropped: bool,
// NOTE(review): initialised to `false` and only read by `close_after_finish`
// in the visible code; no writer is visible here. Confirm it is still needed.
finished: bool,
/// While set, `finish` does not recycle the record; `close_after_finish`
/// clears it.
needs_close_after_finish: bool,
}
/// A reusable record tracking one HTTP request and its response.
///
/// The inner state is `Some` while the record is in use. `recycle` takes it
/// out, so a record sitting in the server's pool holds `None` until
/// `HttpRecord::new` fills it again.
pub struct HttpRecord(RefCell<Option<HttpRecordInner>>);
/// Number of live `HttpRecord` allocations: incremented when
/// `HttpRecord::new` allocates a fresh record and decremented when a record is
/// dropped. Only present with the `__http_tracing` feature.
#[cfg(feature = "__http_tracing")]
pub static RECORD_COUNT: std::sync::atomic::AtomicUsize =
std::sync::atomic::AtomicUsize::new(0);
#[cfg(feature = "__http_tracing")]
impl Drop for HttpRecord {
  /// Decrements the live-record counter and prints the new count.
  ///
  /// Panics if the counter was already zero.
  fn drop(&mut self) {
    use std::sync::atomic::Ordering;

    let previous = RECORD_COUNT.fetch_sub(1, Ordering::SeqCst);
    let count = previous.checked_sub(1).expect("Count went below zero");
    println!("HTTP count={count}: HttpRecord::drop");
  }
}
impl HttpRecord {
fn new(
request: Request,
request_info: HttpConnectionProperties,
server_state: Rc<HttpServerState>,
) -> Rc<Self> {
let (request_parts, request_body) = request.into_parts();
let request_body = Some(request_body.into());
let (mut response_parts, _) = http::Response::new(()).into_parts();
let record =
if let Some((record, headers)) = server_state.0.borrow_mut().pool.pop() {
response_parts.headers = headers;
http_trace!(record, "HttpRecord::reuse");
record
} else {
#[cfg(feature = "__http_tracing")]
{
RECORD_COUNT.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
}
#[allow(clippy::let_and_return)]
let record = Rc::new(Self(RefCell::new(None)));
http_trace!(record, "HttpRecord::new");
record
};
*record.0.borrow_mut() = Some(HttpRecordInner {
server_state,
request_info,
request_parts,
request_body,
response_parts: Some(response_parts),
response_ready: false,
response_waker: None,
response_body: ResponseBytesInner::Empty,
response_body_finished: false,
response_body_waker: None,
trailers: None,
been_dropped: false,
finished: false,
needs_close_after_finish: false,
});
record
}
/// Marks the response body as finished and wakes the task awaiting
/// `response_body_finished`, if any.
///
/// The record is recycled right away unless `needs_close_after_finish` is
/// set, in which case recycling is left to `close_after_finish`.
fn finish(self: Rc<Self>) {
  http_trace!(self, "HttpRecord::finish");
  // Scope the borrow so it is released before waking or recycling.
  let (body_waker, defer_recycle) = {
    let mut inner = self.self_mut();
    inner.response_body_finished = true;
    (
      inner.response_body_waker.take(),
      inner.needs_close_after_finish,
    )
  };
  if let Some(waker) = body_waker {
    waker.wake();
  }
  if defer_recycle {
    return;
  }
  self.recycle();
}
/// Clears the `needs_close_after_finish` flag and recycles the record unless
/// its `finished` flag is set.
///
/// Debug builds assert that `needs_close_after_finish` was set beforehand.
// NOTE(review): no visible code sets `finished` to `true`, so the guard below
// always passes from what can be seen here. Confirm the intended condition.
pub fn close_after_finish(self: Rc<Self>) {
  debug_assert!(self.self_ref().needs_close_after_finish);
  let finished = {
    let mut inner = self.self_mut();
    inner.needs_close_after_finish = false;
    inner.finished
  };
  if !finished {
    self.recycle();
  }
}
/// Get a mutable reference to the `needs_close_after_finish` flag.
pub fn needs_close_after_finish(&self) -> RefMut<'_, bool> {
  let inner = self.self_mut();
  RefMut::map(inner, |record| &mut record.needs_close_after_finish)
}
/// Tears down the record's state and either wakes a pending drain future or
/// returns the allocation to the server's pool.
///
/// Panics if `self` is not the last strong reference to the record.
fn recycle(self: Rc<Self>) {
  assert!(
    Rc::strong_count(&self) == 1,
    "HTTP state error: Expected to be last strong reference"
  );
  // Take the state out of the record, keeping only the server handle and the
  // request header map; everything else is dropped here.
  let HttpRecordInner {
    server_state,
    request_parts: Parts { mut headers, .. },
    ..
  } = self.0.borrow_mut().take().unwrap();

  let mut server_state_mut = server_state.0.borrow_mut();
  let inflight = Rc::strong_count(&server_state);
  http_trace!(self, "HttpRecord::recycle inflight={}", inflight);

  match server_state_mut.drain_waker.take() {
    // Server is shutting down so wake the drain future. The server state
    // references are released first.
    Some(waker) => {
      drop(server_state_mut);
      drop(server_state);
      http_trace!(self, "HttpRecord::recycle wake");
      waker.wake();
    }
    None => {
      // Keep a buffer of allocations on hand to be reused by incoming
      // requests. Estimated target size is 16 + 1/8 the number of inflight
      // requests.
      let target = 16 + (inflight >> 3);
      let pool = &mut server_state_mut.pool;
      if pool.len() < target {
        headers.clear();
        pool.push((self, headers));
      } else if target < pool.len() - 8 {
        pool.truncate(target);
      }
    }
  }
}
/// Borrows the inner state immutably.
///
/// Panics if the record currently holds no state or is mutably borrowed.
fn self_ref(&self) -> Ref<'_, HttpRecordInner> {
  let slot = self.0.borrow();
  Ref::map(slot, |slot| slot.as_ref().unwrap())
}
/// Borrows the inner state mutably.
///
/// Panics if the record currently holds no state or is already borrowed.
fn self_mut(&self) -> RefMut<'_, HttpRecordInner> {
  let slot = self.0.borrow_mut();
  RefMut::map(slot, |slot| slot.as_mut().unwrap())
}
/// Perform the Hyper upgrade on this record.
///
/// Removes the `OnUpgrade` extension from the request parts and returns it.
/// Fails with "upgrade unavailable" when the extension is not present.
pub fn upgrade(&self) -> Result<OnUpgrade, AnyError> {
  // Manually perform the upgrade. We're peeking into hyper's underlying
  // machinery here a bit.
  let mut inner = self.self_mut();
  match inner.request_parts.extensions.remove::<OnUpgrade>() {
    Some(on_upgrade) => Ok(on_upgrade),
    None => Err(AnyError::msg("upgrade unavailable")),
  }
}
/// Take the Hyper body from this record.
///
/// Returns `None`, leaving the stored state untouched, when the body is
/// absent or has already been replaced by a resource.
pub fn take_request_body(&self) -> Option<Incoming> {
  let mut inner = self.self_mut();
  match inner.request_body.take() {
    Some(RequestBodyState::Incoming(body)) => Some(body),
    other => {
      // Not a hyper body: put back whatever was there.
      inner.request_body = other;
      None
    }
  }
}
/// Replace the request body with a resource ID and the OpState we'll need to
/// shut it down.
///
/// We cannot keep just the resource itself, as JS code might be reading from
/// the resource ID to generate the response data (requiring us to keep it in
/// the resource table).
pub fn put_resource(&self, res: HttpRequestBodyAutocloser) {
  let mut inner = self.self_mut();
  inner.request_body = Some(RequestBodyState::Resource(res));
}
/// Cleanup resources not needed after the future is dropped.
///
/// If the response was already marked ready, the record is finished
/// instead; otherwise it is flagged as dropped and its request body is
/// released.
fn cancel(self: Rc<Self>) {
  http_trace!(self, "HttpRecord::cancel");
  let mut inner = self.self_mut();
  if !inner.response_ready {
    inner.been_dropped = true;
    // The request body might include actual resources.
    drop(inner.request_body.take());
    return;
  }
  // Future dropped between wake() and async fn resuming.
  drop(inner);
  self.finish();
}
/// Complete this record, potentially expunging it if it is fully complete
/// (ie: cancelled as well).
///
/// Otherwise the response is marked ready and the task waiting in
/// `response_ready` is woken. Panics if the record was already completed.
pub fn complete(self: Rc<Self>) {
  http_trace!(self, "HttpRecord::complete");
  let mut inner = self.self_mut();
  assert!(
    !inner.response_ready,
    "HTTP state error: Entry has already been completed"
  );
  if !inner.been_dropped {
    inner.response_ready = true;
    let ready_waker = inner.response_waker.take();
    // Release the borrow before handing control to the waker.
    drop(inner);
    if let Some(waker) = ready_waker {
      waker.wake();
    }
    return;
  }
  // The request future is already gone: nothing will consume the response.
  drop(inner);
  self.finish();
}
/// Takes the response body out of the record, leaving `Done` in its place.
///
/// Debug builds assert that the body has not already been taken.
fn take_response_body(&self) -> ResponseBytesInner {
  let mut inner = self.self_mut();
  let slot = &mut inner.response_body;
  debug_assert!(
    !matches!(*slot, ResponseBytesInner::Done),
    "HTTP state error: response body already complete"
  );
  std::mem::replace(slot, ResponseBytesInner::Done)
}
/// Has the future for this record been dropped? ie, has the underlying TCP
/// connection been closed?
pub fn cancelled(&self) -> bool {
  let inner = self.self_ref();
  inner.been_dropped
}
/// Get a mutable reference to the response status and headers.
///
/// Panics if the response parts have already been taken by `into_response`.
pub fn response_parts(&self) -> RefMut<'_, http::response::Parts> {
  let inner = self.self_mut();
  RefMut::map(inner, |record| record.response_parts.as_mut().unwrap())
}
/// Get a mutable reference to the trailers.
pub fn trailers(&self) -> RefMut<'_, Option<HeaderMap>> {
  let inner = self.self_mut();
  RefMut::map(inner, |record| &mut record.trailers)
}
/// Stores the response body for this record.
///
/// Debug builds assert that no body has been set before.
pub fn set_response_body(&self, response_body: ResponseBytesInner) {
  let mut inner = self.self_mut();
  let slot = &mut inner.response_body;
  debug_assert!(matches!(*slot, ResponseBytesInner::Empty));
  *slot = response_body;
}
/// Take the response.
///
/// Moves the response head out of the record and wraps the record itself as
/// the response body. Panics if the head has already been taken.
fn into_response(self: Rc<Self>) -> Response {
  let head = self.self_mut().response_parts.take().unwrap();
  http::Response::from_parts(
    head,
    HttpRecordResponse(ManuallyDrop::new(self)),
  )
}
/// Get a reference to the connection properties.
pub fn request_info(&self) -> Ref<'_, HttpConnectionProperties> {
  let inner = self.self_ref();
  Ref::map(inner, |record| &record.request_info)
}
/// Get a reference to the request parts.
pub fn request_parts(&self) -> Ref<'_, Parts> {
  let inner = self.self_ref();
  Ref::map(inner, |record| &record.request_parts)
}
/// Resolves when response head is ready.
///
/// Until then, each poll stores the task's waker so that `complete` can
/// wake it.
fn response_ready(&self) -> impl Future<Output = ()> + '_ {
  struct HttpRecordReady<'a>(&'a HttpRecord);

  impl<'a> Future for HttpRecordReady<'a> {
    type Output = ();

    fn poll(
      self: Pin<&mut Self>,
      cx: &mut Context<'_>,
    ) -> Poll<Self::Output> {
      let mut inner = self.0.self_mut();
      if !inner.response_ready {
        inner.response_waker = Some(cx.waker().clone());
        Poll::Pending
      } else {
        Poll::Ready(())
      }
    }
  }

  HttpRecordReady(self)
}
/// Resolves when response body has finished streaming.
///
/// Until then, each poll stores the task's waker so that `finish` can wake
/// it.
pub fn response_body_finished(&self) -> impl Future<Output = ()> + '_ {
  struct HttpRecordFinished<'a>(&'a HttpRecord);

  impl<'a> Future for HttpRecordFinished<'a> {
    type Output = ();

    fn poll(
      self: Pin<&mut Self>,
      cx: &mut Context<'_>,
    ) -> Poll<Self::Output> {
      let mut inner = self.0.self_mut();
      if !inner.response_body_finished {
        inner.response_body_waker = Some(cx.waker().clone());
        Poll::Pending
      } else {
        Poll::Ready(())
      }
    }
  }

  HttpRecordFinished(self)
}
}
/// Response body backed by an [`HttpRecord`].
///
/// The record is held in a `ManuallyDrop` so that the `Drop` impl can move
/// the `Rc` out and pass it by value to `HttpRecord::finish`.
#[repr(transparent)]
pub struct HttpRecordResponse(ManuallyDrop<Rc<HttpRecord>>);
impl Body for HttpRecordResponse {
  type Data = BufView;
  type Error = AnyError;

  /// Produces the next frame of the response.
  ///
  /// In-memory bodies are emitted as a single data frame. Streaming bodies
  /// are polled until they yield something other than `NoData`. Once the body
  /// is exhausted, pending trailers are emitted as a final frame; after that
  /// the method reports end of stream (`None`).
  fn poll_frame(
    self: Pin<&mut Self>,
    cx: &mut Context<'_>,
  ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
    use crate::response_body::PollFrame;
    let record = &self.0;

    let res = loop {
      let mut inner = record.self_mut();
      let res = match &mut inner.response_body {
        ResponseBytesInner::Done | ResponseBytesInner::Empty => {
          // Nothing left to stream: emit pending trailers once, otherwise
          // report end of stream. Callers that poll without consulting
          // `is_end_stream` first get `None` instead of a panic.
          let trailers = inner.trailers.take();
          return Poll::Ready(
            trailers.map(|trailers| Ok(Frame::trailers(trailers))),
          );
        }
        ResponseBytesInner::Bytes(..) => {
          // Release the borrow: `take_response_body` borrows the record
          // again.
          drop(inner);
          let ResponseBytesInner::Bytes(data) = record.take_response_body()
          else {
            unreachable!();
          };
          return Poll::Ready(Some(Ok(Frame::data(data))));
        }
        ResponseBytesInner::UncompressedStream(stm) => {
          ready!(Pin::new(stm).poll_frame(cx))
        }
        ResponseBytesInner::GZipStream(stm) => {
          ready!(Pin::new(stm).poll_frame(cx))
        }
        ResponseBytesInner::BrotliStream(stm) => {
          ready!(Pin::new(stm).poll_frame(cx))
        }
      };
      // This is where we retry the NoData response
      if matches!(res, ResponseStreamResult::NoData) {
        continue;
      }
      break res;
    };

    if matches!(res, ResponseStreamResult::EndOfStream) {
      // Mark the body as done *before* surfacing trailers. Otherwise the
      // exhausted stream would stay in place, `is_end_stream` would keep
      // returning `false` after the trailers frame, and the stream would be
      // polled again past its end.
      record.take_response_body();
      if let Some(trailers) = record.self_mut().trailers.take() {
        return Poll::Ready(Some(Ok(Frame::trailers(trailers))));
      }
    }
    Poll::Ready(res.into())
  }

  /// Returns `true` when there is neither body data nor trailers left to
  /// send.
  fn is_end_stream(&self) -> bool {
    let inner = self.0.self_ref();
    matches!(
      inner.response_body,
      ResponseBytesInner::Done | ResponseBytesInner::Empty
    ) && inner.trailers.is_none()
  }

  /// Forwards the size hint of the underlying response body.
  fn size_hint(&self) -> SizeHint {
    // The size hint currently only used in the case where it is exact bounds
    // in hyper, but we'll pass it through anyways just in case hyper needs it.
    self.0.self_ref().response_body.size_hint()
  }
}
impl Drop for HttpRecordResponse {
/// Moves the record out of the body and finishes it, which marks the
/// response body as finished.
fn drop(&mut self) {
// SAFETY: this ManuallyDrop is not used again: we are inside `drop`, and a
// `ManuallyDrop` field is not dropped automatically afterwards.
let record = unsafe { ManuallyDrop::take(&mut self.0) };
http_trace!(record, "HttpRecordResponse::drop");
// Wakes any `response_body_finished` waiter and recycles the record unless
// `needs_close_after_finish` is set.
record.finish();
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::hyper_util_tokioio::TokioIo;
use crate::response_body::Compression;
use crate::response_body::ResponseBytesInner;
use bytes::Buf;
use deno_net::raw::NetworkStreamType;
use hyper1::body::Body;
use hyper1::service::service_fn;
use hyper1::service::HttpService;
use std::error::Error as StdError;
/// Execute client request on service and concurrently map the response.
///
/// An in-memory duplex stream connects an HTTP/1 client to an HTTP/1 server
/// connection running `service`. The client sends `req`, and the response is
/// passed to `map_response`, whose result is returned.
async fn serve_request<B, S, T, F>(
req: http::Request<B>,
service: S,
map_response: impl FnOnce(hyper1::Response<Incoming>) -> F,
) -> hyper1::Result<T>
where
B: Body + Send + 'static, // Send bound due to DuplexStream
B::Data: Send,
B::Error: Into<Box<dyn StdError + Send + Sync>>,
S: HttpService<Incoming>,
S::Error: Into<Box<dyn StdError + Send + Sync>>,
S::ResBody: 'static,
<S::ResBody as Body>::Error: Into<Box<dyn StdError + Send + Sync>>,
F: std::future::Future<Output = hyper1::Result<T>>,
{
use hyper1::client::conn::http1::handshake;
use hyper1::server::conn::http1::Builder;
// Both ends of the connection live in memory, with a 16 KiB buffer.
let (stream_client, stream_server) = tokio::io::duplex(16 * 1024);
let conn_server =
Builder::new().serve_connection(TokioIo::new(stream_server), service);
let (mut sender, conn_client) =
handshake(TokioIo::new(stream_client)).await?;
// Drive the request/response exchange together with both connection
// futures; the first error aborts the join.
let (res, _, _) = tokio::try_join!(
async move {
let res = sender.send_request(req).await?;
map_response(res).await
},
conn_server,
conn_client,
)?;
Ok(res)
}
/// End-to-end check of `handle_request`: a stand-in for the JavaScript
/// handler answers with "hello world", the client must receive a 200 with
/// that body, and afterwards no reference to the server state may remain
/// besides the test's own.
#[tokio::test]
async fn test_handle_request() -> Result<(), AnyError> {
let (tx, mut rx) = tokio::sync::mpsc::channel(10);
let server_state = HttpServerState::new();
// Second handle, kept to inspect the strong count at the end.
let server_state_check = server_state.clone();
let request_info = HttpConnectionProperties {
peer_address: "".into(),
peer_port: None,
local_port: None,
stream_type: NetworkStreamType::Tcp,
};
let svc = service_fn(move |req: hyper1::Request<Incoming>| {
handle_request(
req,
request_info.clone(),
server_state.clone(),
tx.clone(),
)
});
let client_req = http::Request::builder().uri("/").body("".to_string())?;
// Response produced by concurrent tasks
tokio::try_join!(
async move {
// JavaScript handler produces response
let record = rx.recv().await.unwrap();
record.set_response_body(ResponseBytesInner::from_vec(
Compression::None,
b"hello world".to_vec(),
));
record.complete();
Ok(())
},
// Server connection executes service
async move {
serve_request(client_req, svc, |res| async {
// Client reads the response
use http_body_util::BodyExt;
assert_eq!(res.status(), 200);
let body = res.collect().await?.to_bytes();
assert_eq!(body.chunk(), b"hello world");
Ok(())
})
.await
},
)?;
// Only the test's own handle to the server state is left.
assert_eq!(Rc::strong_count(&server_state_check), 1);
Ok(())
}
}