From a22388bbd1377f75d3b873c59f6836cd12c2abe5 Mon Sep 17 00:00:00 2001 From: Matt Mastracci Date: Tue, 16 May 2023 17:00:59 -0600 Subject: [PATCH] fix(ext/http): Ensure cancelled requests don't crash Deno.serve (#19154) Fixes for various `Attemped to access invalid request` bugs (#19058, #15427, #17213). We did not wait for both a drop event and a completion event before removing items from the slab table. This ensures that we do so. In addition, the slab methods are refactored out into `slab.rs` for maintainability. --- cli/tests/unit/serve_test.ts | 33 ++- ext/http/00_serve.js | 2 + ext/http/Cargo.toml | 1 + ext/http/http_next.rs | 447 +++++++++++------------------------ ext/http/lib.rs | 1 + ext/http/response_body.rs | 4 + ext/http/slab.rs | 241 +++++++++++++++++++ 7 files changed, 416 insertions(+), 313 deletions(-) create mode 100644 ext/http/slab.rs diff --git a/cli/tests/unit/serve_test.ts b/cli/tests/unit/serve_test.ts index 2bd2314b73..15dc84a28d 100644 --- a/cli/tests/unit/serve_test.ts +++ b/cli/tests/unit/serve_test.ts @@ -15,7 +15,6 @@ import { deferred, fail, } from "./test_util.ts"; -import { consoleSize } from "../../../runtime/js/40_tty.js"; const { upgradeHttpRaw, @@ -665,6 +664,38 @@ Deno.test({ permissions: { net: true } }, async function httpServerClose() { await server; }); +// https://github.com/denoland/deno/issues/15427 +Deno.test({ permissions: { net: true } }, async function httpServerCloseGet() { + const ac = new AbortController(); + const listeningPromise = deferred(); + const requestPromise = deferred(); + const responsePromise = deferred(); + const server = Deno.serve({ + handler: async () => { + requestPromise.resolve(); + await new Promise((r) => setTimeout(r, 500)); + responsePromise.resolve(); + return new Response("ok"); + }, + port: 4501, + signal: ac.signal, + onListen: onListen(listeningPromise), + onError: createOnErrorCb(ac), + }); + await listeningPromise; + const conn = await Deno.connect({ port: 4501 }); + const encoder = new TextEncoder(); + const body = + `GET / HTTP/1.1\r\nHost: example.domain\r\nConnection: close\r\n\r\n`; + const writeResult = await conn.write(encoder.encode(body)); + assertEquals(body.length, writeResult); + await requestPromise; + conn.close(); + await responsePromise; + ac.abort(); + await server; +}); + // FIXME: Deno.test( { permissions: { net: true } }, diff --git a/ext/http/00_serve.js b/ext/http/00_serve.js index 69ad885660..35af49b046 100644 --- a/ext/http/00_serve.js +++ b/ext/http/00_serve.js @@ -534,6 +534,8 @@ function mapToCallback(responseBodies, context, signal, callback, onError) { // Did everything shut down while we were waiting? if (context.closed) { + // We're shutting down, so this status shouldn't make it back to the client but "Service Unavailable" seems appropriate + op_http_set_promise_complete(req, 503); innerRequest?.close(); return; } diff --git a/ext/http/Cargo.toml b/ext/http/Cargo.toml index e555d742e5..1b3d075d15 100644 --- a/ext/http/Cargo.toml +++ b/ext/http/Cargo.toml @@ -12,6 +12,7 @@ description = "HTTP server implementation for Deno" [features] "__zombie_http_tracking" = [] +"__http_tracing" = [] [lib] path = "lib.rs" diff --git a/ext/http/http_next.rs b/ext/http/http_next.rs index a986de7f3a..34281ee921 100644 --- a/ext/http/http_next.rs +++ b/ext/http/http_next.rs @@ -6,11 +6,14 @@ use crate::request_body::HttpRequestBody; use crate::request_properties::HttpConnectionProperties; use crate::request_properties::HttpListenProperties; use crate::request_properties::HttpPropertyExtractor; -use crate::response_body::CompletionHandle; use crate::response_body::Compression; use crate::response_body::ResponseBytes; use crate::response_body::ResponseBytesInner; use crate::response_body::V8StreamHttpResponseBody; +use crate::slab::slab_drop; +use crate::slab::slab_get; +use crate::slab::slab_insert; +use crate::slab::SlabId; use crate::websocket_upgrade::WebSocketUpgrade; use crate::LocalExecutor; use cache_control::CacheControl; @@ -39,7 +42,6 @@ use http::header::CONTENT_ENCODING; use http::header::CONTENT_LENGTH; use http::header::CONTENT_RANGE; use http::header::CONTENT_TYPE; -use http::request::Parts; use http::HeaderMap; use hyper1::body::Incoming; use hyper1::header::COOKIE; @@ -49,12 +51,10 @@ use hyper1::server::conn::http1; use hyper1::server::conn::http2; use hyper1::service::service_fn; use hyper1::service::HttpService; -use hyper1::upgrade::OnUpgrade; use hyper1::StatusCode; use pin_project::pin_project; use pin_project::pinned_drop; -use slab::Slab; use std::borrow::Cow; use std::cell::RefCell; use std::future::Future; @@ -99,163 +99,16 @@ impl< { } -pub struct HttpSlabRecord { - request_info: HttpConnectionProperties, - request_parts: Parts, - request_body: Option, - // The response may get taken before we tear this down - response: Option, - body: Option>, - promise: CompletionHandle, - #[cfg(__zombie_http_tracking)] - alive: bool, -} - -thread_local! { - pub static SLAB: RefCell> = RefCell::new(Slab::with_capacity(1024)); -} - -/// Generates getters and setters for the [`SLAB`]. For example, -/// `with!(with_req, with_req_mut, Parts, http, http.request_parts);` expands to: -/// -/// ```ignore -/// #[inline(always)] -/// #[allow(dead_code)] -/// pub(crate) fn with_req_mut(key: usize, f: impl FnOnce(&mut Parts) -> T) -> T { -/// SLAB.with(|slab| { -/// let mut borrow = slab.borrow_mut(); -/// let mut http = borrow.get_mut(key).unwrap(); -/// #[cfg(__zombie_http_tracking)] -/// if !http.alive { -/// panic!("Attempted to access a dead HTTP object") -/// } -/// f(&mut http.expr) -/// }) -/// } - -/// #[inline(always)] -/// #[allow(dead_code)] -/// pub(crate) fn with_req(key: usize, f: impl FnOnce(&Parts) -> T) -> T { -/// SLAB.with(|slab| { -/// let mut borrow = slab.borrow(); -/// let mut http = borrow.get(key).unwrap(); -/// #[cfg(__zombie_http_tracking)] -/// if !http.alive { -/// panic!("Attempted to access a dead HTTP object") -/// } -/// f(&http.expr) -/// }) -/// } -/// ``` -macro_rules! with { - ($ref:ident, $mut:ident, $type:ty, $http:ident, $expr:expr) => { - #[inline(always)] - #[allow(dead_code)] - pub(crate) fn $mut(key: u32, f: impl FnOnce(&mut $type) -> T) -> T { - SLAB.with(|slab| { - let mut borrow = slab.borrow_mut(); - #[allow(unused_mut)] // TODO(mmastrac): compiler issue? - let mut $http = match borrow.get_mut(key as usize) { - Some(http) => http, - None => panic!( - "Attemped to access invalid request {} ({} in total available)", - key, - borrow.len() - ), - }; - #[cfg(__zombie_http_tracking)] - if !$http.alive { - panic!("Attempted to access a dead HTTP object") - } - f(&mut $expr) - }) - } - - #[inline(always)] - #[allow(dead_code)] - pub(crate) fn $ref(key: u32, f: impl FnOnce(&$type) -> T) -> T { - SLAB.with(|slab| { - let borrow = slab.borrow(); - let $http = borrow.get(key as usize).unwrap(); - #[cfg(__zombie_http_tracking)] - if !$http.alive { - panic!("Attempted to access a dead HTTP object") - } - f(&$expr) - }) - } - }; -} - -with!(with_req, with_req_mut, Parts, http, http.request_parts); -with!( - with_req_body, - with_req_body_mut, - Option, - http, - http.request_body -); -with!( - with_resp, - with_resp_mut, - Option, - http, - http.response -); -with!( - with_body, - with_body_mut, - Option>, - http, - http.body -); -with!( - with_promise, - with_promise_mut, - CompletionHandle, - http, - http.promise -); -with!(with_http, with_http_mut, HttpSlabRecord, http, http); - -fn slab_insert( - request: Request, - request_info: HttpConnectionProperties, -) -> u32 { - SLAB.with(|slab| { - let (request_parts, request_body) = request.into_parts(); - slab.borrow_mut().insert(HttpSlabRecord { - request_info, - request_parts, - request_body: Some(request_body), - response: Some(Response::new(ResponseBytes::default())), - body: None, - promise: CompletionHandle::default(), - #[cfg(__zombie_http_tracking)] - alive: true, - }) - }) as u32 -} - #[op] pub fn op_http_upgrade_raw( state: &mut OpState, - index: u32, + slab_id: SlabId, ) -> Result { // Stage 1: extract the upgrade future - let upgrade = with_http_mut(index, |http| { - // Manually perform the upgrade. We're peeking into hyper's underlying machinery here a bit - http - .request_parts - .extensions - .remove::() - .ok_or_else(|| AnyError::msg("upgrade unavailable")) - })?; - + let upgrade = slab_get(slab_id).upgrade()?; let (read, write) = tokio::io::duplex(1024); let (read_rx, write_tx) = tokio::io::split(read); let (mut write_rx, mut read_tx) = tokio::io::split(write); - spawn(async move { let mut upgrade_stream = WebSocketUpgrade::::default(); @@ -266,8 +119,9 @@ pub fn op_http_upgrade_raw( match upgrade_stream.write(&buf[..read]) { Ok(None) => continue, Ok(Some((response, bytes))) => { - with_resp_mut(index, |resp| *resp = Some(response)); - with_promise_mut(index, |promise| promise.complete(true)); + let mut http = slab_get(slab_id); + *http.response() = response; + http.complete(); let mut upgraded = upgrade.await?; upgraded.write_all(&bytes).await?; break upgraded; @@ -315,29 +169,22 @@ pub fn op_http_upgrade_raw( #[op] pub async fn op_http_upgrade_websocket_next( state: Rc>, - index: u32, + slab_id: SlabId, headers: Vec<(ByteString, ByteString)>, ) -> Result { - // Stage 1: set the respnse to 101 Switching Protocols and send it - let upgrade = with_http_mut(index, |http| { - // Manually perform the upgrade. We're peeking into hyper's underlying machinery here a bit - let upgrade = http - .request_parts - .extensions - .remove::() - .ok_or_else(|| AnyError::msg("upgrade unavailable"))?; + let mut http = slab_get(slab_id); + // Stage 1: set the response to 101 Switching Protocols and send it + let upgrade = http.upgrade()?; - let response = http.response.as_mut().unwrap(); - *response.status_mut() = StatusCode::SWITCHING_PROTOCOLS; - for (name, value) in headers { - response.headers_mut().append( - HeaderName::from_bytes(&name).unwrap(), - HeaderValue::from_bytes(&value).unwrap(), - ); - } - http.promise.complete(true); - Ok::<_, AnyError>(upgrade) - })?; + let response = http.response(); + *response.status_mut() = StatusCode::SWITCHING_PROTOCOLS; + for (name, value) in headers { + response.headers_mut().append( + HeaderName::from_bytes(&name).unwrap(), + HeaderValue::from_bytes(&value).unwrap(), + ); + } + http.complete(); // Stage 2: wait for the request to finish upgrading let upgraded = upgrade.await?; @@ -348,137 +195,127 @@ pub async fn op_http_upgrade_websocket_next( } #[op(fast)] -pub fn op_http_set_promise_complete(index: u32, status: u16) { - with_resp_mut(index, |resp| { - // The Javascript code will never provide a status that is invalid here (see 23_response.js) - *resp.as_mut().unwrap().status_mut() = - StatusCode::from_u16(status).unwrap(); - }); - with_promise_mut(index, |promise| { - promise.complete(true); - }); +pub fn op_http_set_promise_complete(slab_id: SlabId, status: u16) { + let mut http = slab_get(slab_id); + // The Javascript code will never provide a status that is invalid here (see 23_response.js) + *http.response().status_mut() = StatusCode::from_u16(status).unwrap(); + http.complete(); } #[op] pub fn op_http_get_request_method_and_url( - index: u32, + slab_id: SlabId, ) -> (String, Option, String, String, Option) where HTTP: HttpPropertyExtractor, { + let http = slab_get(slab_id); + let request_info = http.request_info(); + let request_parts = http.request_parts(); + let request_properties = HTTP::request_properties( + request_info, + &request_parts.uri, + &request_parts.headers, + ); + + // Only extract the path part - we handle authority elsewhere + let path = match &request_parts.uri.path_and_query() { + Some(path_and_query) => path_and_query.to_string(), + None => "".to_owned(), + }; + // TODO(mmastrac): Passing method can be optimized - with_http(index, |http| { - let request_properties = HTTP::request_properties( - &http.request_info, - &http.request_parts.uri, - &http.request_parts.headers, - ); - - // Only extract the path part - we handle authority elsewhere - let path = match &http.request_parts.uri.path_and_query() { - Some(path_and_query) => path_and_query.to_string(), - None => "".to_owned(), - }; - - ( - http.request_parts.method.as_str().to_owned(), - request_properties.authority, - path, - String::from(http.request_info.peer_address.as_ref()), - http.request_info.peer_port, - ) - }) + ( + request_parts.method.as_str().to_owned(), + request_properties.authority, + path, + String::from(request_info.peer_address.as_ref()), + request_info.peer_port, + ) } #[op] pub fn op_http_get_request_header( - index: u32, + slab_id: SlabId, name: String, ) -> Option { - with_req(index, |req| { - let value = req.headers.get(name); - value.map(|value| value.as_bytes().into()) - }) + let http = slab_get(slab_id); + let value = http.request_parts().headers.get(name); + value.map(|value| value.as_bytes().into()) } #[op] pub fn op_http_get_request_headers( - index: u32, + slab_id: SlabId, ) -> Vec<(ByteString, ByteString)> { - with_req(index, |req| { - let headers = &req.headers; - let mut vec = Vec::with_capacity(headers.len()); - let mut cookies: Option> = None; - for (name, value) in headers { - if name == COOKIE { - if let Some(ref mut cookies) = cookies { - cookies.push(value.as_bytes()); - } else { - cookies = Some(vec![value.as_bytes()]); - } + let http = slab_get(slab_id); + let headers = &http.request_parts().headers; + let mut vec = Vec::with_capacity(headers.len()); + let mut cookies: Option> = None; + for (name, value) in headers { + if name == COOKIE { + if let Some(ref mut cookies) = cookies { + cookies.push(value.as_bytes()); } else { - let name: &[u8] = name.as_ref(); - vec.push((name.into(), value.as_bytes().into())) + cookies = Some(vec![value.as_bytes()]); } + } else { + let name: &[u8] = name.as_ref(); + vec.push((name.into(), value.as_bytes().into())) } + } - // We treat cookies specially, because we don't want them to get them - // mangled by the `Headers` object in JS. What we do is take all cookie - // headers and concat them into a single cookie header, separated by - // semicolons. - // TODO(mmastrac): This should probably happen on the JS side on-demand - if let Some(cookies) = cookies { - let cookie_sep = "; ".as_bytes(); - vec.push(( - ByteString::from(COOKIE.as_str()), - ByteString::from(cookies.join(cookie_sep)), - )); - } - vec - }) + // We treat cookies specially, because we don't want them to get them + // mangled by the `Headers` object in JS. What we do is take all cookie + // headers and concat them into a single cookie header, separated by + // semicolons. + // TODO(mmastrac): This should probably happen on the JS side on-demand + if let Some(cookies) = cookies { + let cookie_sep = "; ".as_bytes(); + vec.push(( + ByteString::from(COOKIE.as_str()), + ByteString::from(cookies.join(cookie_sep)), + )); + } + vec } #[op(fast)] pub fn op_http_read_request_body( state: &mut OpState, - index: u32, + slab_id: SlabId, ) -> ResourceId { - let incoming = with_req_body_mut(index, |body| body.take().unwrap()); + let mut http = slab_get(slab_id); + let incoming = http.take_body(); let body_resource = Rc::new(HttpRequestBody::new(incoming)); - let res = state.resource_table.add_rc(body_resource.clone()); - with_body_mut(index, |body| { - *body = Some(body_resource); - }); - res + state.resource_table.add_rc(body_resource) } #[op(fast)] -pub fn op_http_set_response_header(index: u32, name: &str, value: &str) { - with_resp_mut(index, |resp| { - let resp_headers = resp.as_mut().unwrap().headers_mut(); - // These are valid latin-1 strings - let name = HeaderName::from_bytes(name.as_bytes()).unwrap(); - let value = HeaderValue::from_bytes(value.as_bytes()).unwrap(); - resp_headers.append(name, value); - }); +pub fn op_http_set_response_header(slab_id: SlabId, name: &str, value: &str) { + let mut http = slab_get(slab_id); + let resp_headers = http.response().headers_mut(); + // These are valid latin-1 strings + let name = HeaderName::from_bytes(name.as_bytes()).unwrap(); + let value = HeaderValue::from_bytes(value.as_bytes()).unwrap(); + resp_headers.append(name, value); } #[op] pub fn op_http_set_response_headers( - index: u32, + slab_id: SlabId, headers: Vec<(ByteString, ByteString)>, ) { + let mut http = slab_get(slab_id); // TODO(mmastrac): Invalid headers should be handled? - with_resp_mut(index, |resp| { - let resp_headers = resp.as_mut().unwrap().headers_mut(); - resp_headers.reserve(headers.len()); - for (name, value) in headers { - // These are valid latin-1 strings - let name = HeaderName::from_bytes(&name).unwrap(); - let value = HeaderValue::from_bytes(&value).unwrap(); - resp_headers.append(name, value); - } - }) + let resp_headers = http.response().headers_mut(); + resp_headers.reserve(headers.len()); + for (name, value) in headers { + // These are valid latin-1 strings + let name = HeaderName::from_bytes(&name).unwrap(); + let value = HeaderValue::from_bytes(&value).unwrap(); + resp_headers.append(name, value); + } } fn is_request_compressible(headers: &HeaderMap) -> Compression { @@ -588,28 +425,25 @@ fn ensure_vary_accept_encoding(hmap: &mut HeaderMap) { } fn set_response( - index: u32, + slab_id: SlabId, length: Option, response_fn: impl FnOnce(Compression) -> ResponseBytesInner, ) { - let compression = - with_req(index, |req| is_request_compressible(&req.headers)); - - with_resp_mut(index, move |response| { - let response = response.as_mut().unwrap(); - let compression = modify_compressibility_from_response( - compression, - length, - response.headers_mut(), - ); - response.body_mut().initialize(response_fn(compression)) - }); + let mut http = slab_get(slab_id); + let compression = is_request_compressible(&http.request_parts().headers); + let response = http.response(); + let compression = modify_compressibility_from_response( + compression, + length, + response.headers_mut(), + ); + response.body_mut().initialize(response_fn(compression)) } #[op(fast)] pub fn op_http_set_response_body_resource( state: &mut OpState, - index: u32, + slab_id: SlabId, stream_rid: ResourceId, auto_close: bool, ) -> Result<(), AnyError> { @@ -621,7 +455,7 @@ pub fn op_http_set_response_body_resource( }; set_response( - index, + slab_id, resource.size_hint().1.map(|s| s as usize), move |compression| { ResponseBytesInner::from_resource(compression, resource, auto_close) @@ -634,12 +468,11 @@ pub fn op_http_set_response_body_resource( #[op(fast)] pub fn op_http_set_response_body_stream( state: &mut OpState, - index: u32, + slab_id: SlabId, ) -> Result { // TODO(mmastrac): what should this channel size be? let (tx, rx) = tokio::sync::mpsc::channel(1); - - set_response(index, None, |compression| { + set_response(slab_id, None, |compression| { ResponseBytesInner::from_v8(compression, rx) }); @@ -647,18 +480,18 @@ pub fn op_http_set_response_body_stream( } #[op(fast)] -pub fn op_http_set_response_body_text(index: u32, text: String) { +pub fn op_http_set_response_body_text(slab_id: SlabId, text: String) { if !text.is_empty() { - set_response(index, Some(text.len()), |compression| { + set_response(slab_id, Some(text.len()), |compression| { ResponseBytesInner::from_vec(compression, text.into_bytes()) }); } } #[op(fast)] -pub fn op_http_set_response_body_bytes(index: u32, buffer: &[u8]) { +pub fn op_http_set_response_body_bytes(slab_id: SlabId, buffer: &[u8]) { if !buffer.is_empty() { - set_response(index, Some(buffer.len()), |compression| { + set_response(slab_id, Some(buffer.len()), |compression| { ResponseBytesInner::from_slice(compression, buffer) }); }; @@ -667,12 +500,11 @@ pub fn op_http_set_response_body_bytes(index: u32, buffer: &[u8]) { #[op] pub async fn op_http_track( state: Rc>, - index: u32, + slab_id: SlabId, server_rid: ResourceId, ) -> Result<(), AnyError> { - let handle = with_resp(index, |resp| { - resp.as_ref().unwrap().body().completion_handle() - }); + let http = slab_get(slab_id); + let handle = http.body_promise(); let join_handle = state .borrow_mut() @@ -689,15 +521,15 @@ pub async fn op_http_track( } #[pin_project(PinnedDrop)] -pub struct SlabFuture>(u32, #[pin] F); +pub struct SlabFuture>(SlabId, #[pin] F); pub fn new_slab_future( request: Request, request_info: HttpConnectionProperties, - tx: tokio::sync::mpsc::Sender, + tx: tokio::sync::mpsc::Sender, ) -> SlabFuture> { let index = slab_insert(request, request_info); - let rx = with_promise(index, |promise| promise.clone()); + let rx = slab_get(index).promise(); SlabFuture(index, async move { if tx.send(index).await.is_ok() { // We only need to wait for completion if we aren't closed @@ -711,16 +543,7 @@ impl> SlabFuture {} #[pinned_drop] impl> PinnedDrop for SlabFuture { fn drop(self: Pin<&mut Self>) { - SLAB.with(|slab| { - #[cfg(__zombie_http_tracking)] - { - slab.borrow_mut().get_mut(self.0 as usize).unwrap().alive = false; - } - #[cfg(not(__zombie_http_tracking))] - { - slab.borrow_mut().remove(self.0 as usize); - } - }); + slab_drop(self.0); } } @@ -736,7 +559,7 @@ impl> Future for SlabFuture { .project() .1 .poll(cx) - .map(|_| Ok(with_resp_mut(index, |resp| resp.take().unwrap()))) + .map(|_| Ok(slab_get(index).take_response())) } } @@ -776,7 +599,7 @@ fn serve_https( mut io: TlsStream, request_info: HttpConnectionProperties, cancel: Rc, - tx: tokio::sync::mpsc::Sender, + tx: tokio::sync::mpsc::Sender, ) -> JoinHandle> { let svc = service_fn(move |req: Request| { new_slab_future(req, request_info.clone(), tx.clone()) @@ -803,7 +626,7 @@ fn serve_http( io: impl HttpServeStream, request_info: HttpConnectionProperties, cancel: Rc, - tx: tokio::sync::mpsc::Sender, + tx: tokio::sync::mpsc::Sender, ) -> JoinHandle> { let svc = service_fn(move |req: Request| { new_slab_future(req, request_info.clone(), tx.clone()) @@ -815,7 +638,7 @@ fn serve_http_on( connection: HTTP::Connection, listen_properties: &HttpListenProperties, cancel: Rc, - tx: tokio::sync::mpsc::Sender, + tx: tokio::sync::mpsc::Sender, ) -> JoinHandle> where HTTP: HttpPropertyExtractor, @@ -843,7 +666,7 @@ struct HttpJoinHandle( AsyncRefCell>>>, // Cancel handle must live in a separate Rc to avoid keeping the outer join handle ref'd Rc, - AsyncRefCell>, + AsyncRefCell>, ); impl HttpJoinHandle { @@ -963,7 +786,7 @@ where pub async fn op_http_wait( state: Rc>, rid: ResourceId, -) -> Result { +) -> Result { // We will get the join handle initially, as we might be consuming requests still let join_handle = state .borrow_mut() @@ -1003,14 +826,14 @@ pub async fn op_http_wait( if let Some(err) = err.source() { if let Some(err) = err.downcast_ref::() { if err.kind() == io::ErrorKind::NotConnected { - return Ok(u32::MAX); + return Ok(SlabId::MAX); } } } return Err(err); } - Ok(u32::MAX) + Ok(SlabId::MAX) } struct UpgradeStream { diff --git a/ext/http/lib.rs b/ext/http/lib.rs index 1ed1e60b78..2660f46532 100644 --- a/ext/http/lib.rs +++ b/ext/http/lib.rs @@ -81,6 +81,7 @@ mod reader_stream; mod request_body; mod request_properties; mod response_body; +mod slab; mod websocket_upgrade; pub use request_properties::DefaultHttpPropertyExtractor; diff --git a/ext/http/response_body.rs b/ext/http/response_body.rs index 288d747584..e30c917c30 100644 --- a/ext/http/response_body.rs +++ b/ext/http/response_body.rs @@ -82,6 +82,10 @@ impl CompletionHandle { waker.wake(); } } + + pub fn is_completed(&self) -> bool { + self.inner.borrow().complete + } } impl Future for CompletionHandle { diff --git a/ext/http/slab.rs b/ext/http/slab.rs new file mode 100644 index 0000000000..24554d6899 --- /dev/null +++ b/ext/http/slab.rs @@ -0,0 +1,241 @@ +// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. +use crate::request_properties::HttpConnectionProperties; +use crate::response_body::CompletionHandle; +use crate::response_body::ResponseBytes; +use deno_core::error::AnyError; +use http::request::Parts; +use hyper1::body::Incoming; +use hyper1::upgrade::OnUpgrade; + +use slab::Slab; +use std::cell::RefCell; +use std::cell::RefMut; +use std::ptr::NonNull; + +pub type Request = hyper1::Request; +pub type Response = hyper1::Response; +pub type SlabId = u32; + +pub struct HttpSlabRecord { + request_info: HttpConnectionProperties, + request_parts: Parts, + request_body: Option, + // The response may get taken before we tear this down + response: Option, + promise: CompletionHandle, + been_dropped: bool, + #[cfg(feature = "__zombie_http_tracking")] + alive: bool, +} + +thread_local! { + static SLAB: RefCell> = RefCell::new(Slab::with_capacity(1024)); +} + +macro_rules! http_trace { + ($index:expr, $args:tt) => { + #[cfg(feature = "__http_tracing")] + { + let total = SLAB.with(|x| x.try_borrow().map(|x| x.len())); + if let Ok(total) = total { + println!("HTTP id={} total={}: {}", $index, total, format!($args)); + } else { + println!("HTTP id={} total=?: {}", $index, format!($args)); + } + } + }; +} + +/// Hold a lock on the slab table and a reference to one entry in the table. +pub struct SlabEntry( + NonNull, + SlabId, + RefMut<'static, Slab>, +); + +pub fn slab_get(index: SlabId) -> SlabEntry { + http_trace!(index, "slab_get"); + let mut lock: RefMut<'static, Slab> = SLAB.with(|x| { + // SAFETY: We're extracting a lock here and placing it into an object that is thread-local, !Send as a &'static + unsafe { std::mem::transmute(x.borrow_mut()) } + }); + let Some(entry) = lock.get_mut(index as usize) else { + panic!("HTTP state error: Attemped to access invalid request {} ({} in total available)", + index, + lock.len()) + }; + #[cfg(feature = "__zombie_http_tracking")] + { + assert!(entry.alive, "HTTP state error: Entry is not alive"); + } + let entry = NonNull::new(entry as _).unwrap(); + + SlabEntry(entry, index, lock) +} + +#[allow(clippy::let_and_return)] +fn slab_insert_raw( + request_parts: Parts, + request_body: Option, + request_info: HttpConnectionProperties, +) -> SlabId { + let index = SLAB.with(|slab| { + let mut slab = slab.borrow_mut(); + slab.insert(HttpSlabRecord { + request_info, + request_parts, + request_body, + response: Some(Response::new(ResponseBytes::default())), + been_dropped: false, + promise: CompletionHandle::default(), + #[cfg(feature = "__zombie_http_tracking")] + alive: true, + }) + }) as u32; + http_trace!(index, "slab_insert"); + index +} + +pub fn slab_insert( + request: Request, + request_info: HttpConnectionProperties, +) -> SlabId { + let (request_parts, request_body) = request.into_parts(); + slab_insert_raw(request_parts, Some(request_body), request_info) +} + +pub fn slab_drop(index: SlabId) { + http_trace!(index, "slab_drop"); + let mut entry = slab_get(index); + let record = entry.self_mut(); + assert!( + !record.been_dropped, + "HTTP state error: Entry has already been dropped" + ); + record.been_dropped = true; + if record.promise.is_completed() { + drop(entry); + slab_expunge(index); + } +} + +fn slab_expunge(index: SlabId) { + SLAB.with(|slab| { + #[cfg(__zombie_http_tracking)] + { + slab.borrow_mut().get_mut(index as usize).unwrap().alive = false; + } + #[cfg(not(__zombie_http_tracking))] + { + slab.borrow_mut().remove(index as usize); + } + }); + http_trace!(index, "slab_expunge"); +} + +impl SlabEntry { + fn self_ref(&self) -> &HttpSlabRecord { + // SAFETY: We have the lock and we're borrowing lifetime from self + unsafe { self.0.as_ref() } + } + + fn self_mut(&mut self) -> &mut HttpSlabRecord { + // SAFETY: We have the lock and we're borrowing lifetime from self + unsafe { self.0.as_mut() } + } + + /// Perform the Hyper upgrade on this entry. + pub fn upgrade(&mut self) -> Result { + // Manually perform the upgrade. We're peeking into hyper's underlying machinery here a bit + self + .self_mut() + .request_parts + .extensions + .remove::() + .ok_or_else(|| AnyError::msg("upgrade unavailable")) + } + + /// Take the Hyper body from this entry. + pub fn take_body(&mut self) -> Incoming { + self.self_mut().request_body.take().unwrap() + } + + /// Complete this entry, potentially expunging it if it is complete. + pub fn complete(self) { + let promise = &self.self_ref().promise; + assert!( + !promise.is_completed(), + "HTTP state error: Entry has already been completed" + ); + http_trace!(self.1, "SlabEntry::complete"); + promise.complete(true); + // If we're all done, we need to drop ourself to release the lock before we expunge this record + if self.self_ref().been_dropped { + let index = self.1; + drop(self); + slab_expunge(index); + } + } + + /// Get a mutable reference to the response. + pub fn response(&mut self) -> &mut Response { + self.self_mut().response.as_mut().unwrap() + } + + /// Take the response. + pub fn take_response(&mut self) -> Response { + self.self_mut().response.take().unwrap() + } + + /// Get a reference to the connection properties. + pub fn request_info(&self) -> &HttpConnectionProperties { + &self.self_ref().request_info + } + + /// Get a reference to the request parts. + pub fn request_parts(&self) -> &Parts { + &self.self_ref().request_parts + } + + /// Get a reference to the completion handle. + pub fn promise(&self) -> CompletionHandle { + self.self_ref().promise.clone() + } + + /// Get a reference to the response body completion handle. + pub fn body_promise(&self) -> CompletionHandle { + self + .self_ref() + .response + .as_ref() + .unwrap() + .body() + .completion_handle() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use deno_net::raw::NetworkStreamType; + use http::Request; + + #[test] + fn test_slab() { + let req = Request::builder().body(()).unwrap(); + let (parts, _) = req.into_parts(); + let id = slab_insert_raw( + parts, + None, + HttpConnectionProperties { + peer_address: "".into(), + peer_port: None, + local_port: None, + stream_type: NetworkStreamType::Tcp, + }, + ); + let entry = slab_get(id); + entry.complete(); + slab_drop(id); + } +}