1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2025-01-11 08:33:43 -05:00

fix(ext/http): Ensure cancelled requests don't crash Deno.serve (#19154)

Fixes for various `Attemped to access invalid request` bugs (#19058,
#15427, #17213).

We did not wait for both a drop event and a completion event before
removing items from the slab table. This ensures that we do so.

In addition, the slab methods are refactored out into `slab.rs` for
maintainability.
This commit is contained in:
Matt Mastracci 2023-05-16 17:00:59 -06:00 committed by GitHub
parent 9ba2c4c42f
commit a22388bbd1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 416 additions and 313 deletions

View file

@ -15,7 +15,6 @@ import {
deferred, deferred,
fail, fail,
} from "./test_util.ts"; } from "./test_util.ts";
import { consoleSize } from "../../../runtime/js/40_tty.js";
const { const {
upgradeHttpRaw, upgradeHttpRaw,
@ -665,6 +664,38 @@ Deno.test({ permissions: { net: true } }, async function httpServerClose() {
await server; await server;
}); });
// https://github.com/denoland/deno/issues/15427
Deno.test({ permissions: { net: true } }, async function httpServerCloseGet() {
const ac = new AbortController();
const listeningPromise = deferred();
const requestPromise = deferred();
const responsePromise = deferred();
const server = Deno.serve({
handler: async () => {
requestPromise.resolve();
await new Promise((r) => setTimeout(r, 500));
responsePromise.resolve();
return new Response("ok");
},
port: 4501,
signal: ac.signal,
onListen: onListen(listeningPromise),
onError: createOnErrorCb(ac),
});
await listeningPromise;
const conn = await Deno.connect({ port: 4501 });
const encoder = new TextEncoder();
const body =
`GET / HTTP/1.1\r\nHost: example.domain\r\nConnection: close\r\n\r\n`;
const writeResult = await conn.write(encoder.encode(body));
assertEquals(body.length, writeResult);
await requestPromise;
conn.close();
await responsePromise;
ac.abort();
await server;
});
// FIXME: // FIXME:
Deno.test( Deno.test(
{ permissions: { net: true } }, { permissions: { net: true } },

View file

@ -534,6 +534,8 @@ function mapToCallback(responseBodies, context, signal, callback, onError) {
// Did everything shut down while we were waiting? // Did everything shut down while we were waiting?
if (context.closed) { if (context.closed) {
// We're shutting down, so this status shouldn't make it back to the client but "Service Unavailable" seems appropriate
op_http_set_promise_complete(req, 503);
innerRequest?.close(); innerRequest?.close();
return; return;
} }

View file

@ -12,6 +12,7 @@ description = "HTTP server implementation for Deno"
[features] [features]
"__zombie_http_tracking" = [] "__zombie_http_tracking" = []
"__http_tracing" = []
[lib] [lib]
path = "lib.rs" path = "lib.rs"

View file

@ -6,11 +6,14 @@ use crate::request_body::HttpRequestBody;
use crate::request_properties::HttpConnectionProperties; use crate::request_properties::HttpConnectionProperties;
use crate::request_properties::HttpListenProperties; use crate::request_properties::HttpListenProperties;
use crate::request_properties::HttpPropertyExtractor; use crate::request_properties::HttpPropertyExtractor;
use crate::response_body::CompletionHandle;
use crate::response_body::Compression; use crate::response_body::Compression;
use crate::response_body::ResponseBytes; use crate::response_body::ResponseBytes;
use crate::response_body::ResponseBytesInner; use crate::response_body::ResponseBytesInner;
use crate::response_body::V8StreamHttpResponseBody; use crate::response_body::V8StreamHttpResponseBody;
use crate::slab::slab_drop;
use crate::slab::slab_get;
use crate::slab::slab_insert;
use crate::slab::SlabId;
use crate::websocket_upgrade::WebSocketUpgrade; use crate::websocket_upgrade::WebSocketUpgrade;
use crate::LocalExecutor; use crate::LocalExecutor;
use cache_control::CacheControl; use cache_control::CacheControl;
@ -39,7 +42,6 @@ use http::header::CONTENT_ENCODING;
use http::header::CONTENT_LENGTH; use http::header::CONTENT_LENGTH;
use http::header::CONTENT_RANGE; use http::header::CONTENT_RANGE;
use http::header::CONTENT_TYPE; use http::header::CONTENT_TYPE;
use http::request::Parts;
use http::HeaderMap; use http::HeaderMap;
use hyper1::body::Incoming; use hyper1::body::Incoming;
use hyper1::header::COOKIE; use hyper1::header::COOKIE;
@ -49,12 +51,10 @@ use hyper1::server::conn::http1;
use hyper1::server::conn::http2; use hyper1::server::conn::http2;
use hyper1::service::service_fn; use hyper1::service::service_fn;
use hyper1::service::HttpService; use hyper1::service::HttpService;
use hyper1::upgrade::OnUpgrade;
use hyper1::StatusCode; use hyper1::StatusCode;
use pin_project::pin_project; use pin_project::pin_project;
use pin_project::pinned_drop; use pin_project::pinned_drop;
use slab::Slab;
use std::borrow::Cow; use std::borrow::Cow;
use std::cell::RefCell; use std::cell::RefCell;
use std::future::Future; use std::future::Future;
@ -99,163 +99,16 @@ impl<
{ {
} }
pub struct HttpSlabRecord {
request_info: HttpConnectionProperties,
request_parts: Parts,
request_body: Option<Incoming>,
// The response may get taken before we tear this down
response: Option<Response>,
body: Option<Rc<HttpRequestBody>>,
promise: CompletionHandle,
#[cfg(__zombie_http_tracking)]
alive: bool,
}
thread_local! {
pub static SLAB: RefCell<Slab<HttpSlabRecord>> = RefCell::new(Slab::with_capacity(1024));
}
/// Generates getters and setters for the [`SLAB`]. For example,
/// `with!(with_req, with_req_mut, Parts, http, http.request_parts);` expands to:
///
/// ```ignore
/// #[inline(always)]
/// #[allow(dead_code)]
/// pub(crate) fn with_req_mut<T>(key: usize, f: impl FnOnce(&mut Parts) -> T) -> T {
/// SLAB.with(|slab| {
/// let mut borrow = slab.borrow_mut();
/// let mut http = borrow.get_mut(key).unwrap();
/// #[cfg(__zombie_http_tracking)]
/// if !http.alive {
/// panic!("Attempted to access a dead HTTP object")
/// }
/// f(&mut http.expr)
/// })
/// }
/// #[inline(always)]
/// #[allow(dead_code)]
/// pub(crate) fn with_req<T>(key: usize, f: impl FnOnce(&Parts) -> T) -> T {
/// SLAB.with(|slab| {
/// let mut borrow = slab.borrow();
/// let mut http = borrow.get(key).unwrap();
/// #[cfg(__zombie_http_tracking)]
/// if !http.alive {
/// panic!("Attempted to access a dead HTTP object")
/// }
/// f(&http.expr)
/// })
/// }
/// ```
macro_rules! with {
($ref:ident, $mut:ident, $type:ty, $http:ident, $expr:expr) => {
#[inline(always)]
#[allow(dead_code)]
pub(crate) fn $mut<T>(key: u32, f: impl FnOnce(&mut $type) -> T) -> T {
SLAB.with(|slab| {
let mut borrow = slab.borrow_mut();
#[allow(unused_mut)] // TODO(mmastrac): compiler issue?
let mut $http = match borrow.get_mut(key as usize) {
Some(http) => http,
None => panic!(
"Attemped to access invalid request {} ({} in total available)",
key,
borrow.len()
),
};
#[cfg(__zombie_http_tracking)]
if !$http.alive {
panic!("Attempted to access a dead HTTP object")
}
f(&mut $expr)
})
}
#[inline(always)]
#[allow(dead_code)]
pub(crate) fn $ref<T>(key: u32, f: impl FnOnce(&$type) -> T) -> T {
SLAB.with(|slab| {
let borrow = slab.borrow();
let $http = borrow.get(key as usize).unwrap();
#[cfg(__zombie_http_tracking)]
if !$http.alive {
panic!("Attempted to access a dead HTTP object")
}
f(&$expr)
})
}
};
}
with!(with_req, with_req_mut, Parts, http, http.request_parts);
with!(
with_req_body,
with_req_body_mut,
Option<Incoming>,
http,
http.request_body
);
with!(
with_resp,
with_resp_mut,
Option<Response>,
http,
http.response
);
with!(
with_body,
with_body_mut,
Option<Rc<HttpRequestBody>>,
http,
http.body
);
with!(
with_promise,
with_promise_mut,
CompletionHandle,
http,
http.promise
);
with!(with_http, with_http_mut, HttpSlabRecord, http, http);
fn slab_insert(
request: Request,
request_info: HttpConnectionProperties,
) -> u32 {
SLAB.with(|slab| {
let (request_parts, request_body) = request.into_parts();
slab.borrow_mut().insert(HttpSlabRecord {
request_info,
request_parts,
request_body: Some(request_body),
response: Some(Response::new(ResponseBytes::default())),
body: None,
promise: CompletionHandle::default(),
#[cfg(__zombie_http_tracking)]
alive: true,
})
}) as u32
}
#[op] #[op]
pub fn op_http_upgrade_raw( pub fn op_http_upgrade_raw(
state: &mut OpState, state: &mut OpState,
index: u32, slab_id: SlabId,
) -> Result<ResourceId, AnyError> { ) -> Result<ResourceId, AnyError> {
// Stage 1: extract the upgrade future // Stage 1: extract the upgrade future
let upgrade = with_http_mut(index, |http| { let upgrade = slab_get(slab_id).upgrade()?;
// Manually perform the upgrade. We're peeking into hyper's underlying machinery here a bit
http
.request_parts
.extensions
.remove::<OnUpgrade>()
.ok_or_else(|| AnyError::msg("upgrade unavailable"))
})?;
let (read, write) = tokio::io::duplex(1024); let (read, write) = tokio::io::duplex(1024);
let (read_rx, write_tx) = tokio::io::split(read); let (read_rx, write_tx) = tokio::io::split(read);
let (mut write_rx, mut read_tx) = tokio::io::split(write); let (mut write_rx, mut read_tx) = tokio::io::split(write);
spawn(async move { spawn(async move {
let mut upgrade_stream = WebSocketUpgrade::<ResponseBytes>::default(); let mut upgrade_stream = WebSocketUpgrade::<ResponseBytes>::default();
@ -266,8 +119,9 @@ pub fn op_http_upgrade_raw(
match upgrade_stream.write(&buf[..read]) { match upgrade_stream.write(&buf[..read]) {
Ok(None) => continue, Ok(None) => continue,
Ok(Some((response, bytes))) => { Ok(Some((response, bytes))) => {
with_resp_mut(index, |resp| *resp = Some(response)); let mut http = slab_get(slab_id);
with_promise_mut(index, |promise| promise.complete(true)); *http.response() = response;
http.complete();
let mut upgraded = upgrade.await?; let mut upgraded = upgrade.await?;
upgraded.write_all(&bytes).await?; upgraded.write_all(&bytes).await?;
break upgraded; break upgraded;
@ -315,19 +169,14 @@ pub fn op_http_upgrade_raw(
#[op] #[op]
pub async fn op_http_upgrade_websocket_next( pub async fn op_http_upgrade_websocket_next(
state: Rc<RefCell<OpState>>, state: Rc<RefCell<OpState>>,
index: u32, slab_id: SlabId,
headers: Vec<(ByteString, ByteString)>, headers: Vec<(ByteString, ByteString)>,
) -> Result<ResourceId, AnyError> { ) -> Result<ResourceId, AnyError> {
// Stage 1: set the respnse to 101 Switching Protocols and send it let mut http = slab_get(slab_id);
let upgrade = with_http_mut(index, |http| { // Stage 1: set the response to 101 Switching Protocols and send it
// Manually perform the upgrade. We're peeking into hyper's underlying machinery here a bit let upgrade = http.upgrade()?;
let upgrade = http
.request_parts
.extensions
.remove::<OnUpgrade>()
.ok_or_else(|| AnyError::msg("upgrade unavailable"))?;
let response = http.response.as_mut().unwrap(); let response = http.response();
*response.status_mut() = StatusCode::SWITCHING_PROTOCOLS; *response.status_mut() = StatusCode::SWITCHING_PROTOCOLS;
for (name, value) in headers { for (name, value) in headers {
response.headers_mut().append( response.headers_mut().append(
@ -335,9 +184,7 @@ pub async fn op_http_upgrade_websocket_next(
HeaderValue::from_bytes(&value).unwrap(), HeaderValue::from_bytes(&value).unwrap(),
); );
} }
http.promise.complete(true); http.complete();
Ok::<_, AnyError>(upgrade)
})?;
// Stage 2: wait for the request to finish upgrading // Stage 2: wait for the request to finish upgrading
let upgraded = upgrade.await?; let upgraded = upgrade.await?;
@ -348,65 +195,61 @@ pub async fn op_http_upgrade_websocket_next(
} }
#[op(fast)] #[op(fast)]
pub fn op_http_set_promise_complete(index: u32, status: u16) { pub fn op_http_set_promise_complete(slab_id: SlabId, status: u16) {
with_resp_mut(index, |resp| { let mut http = slab_get(slab_id);
// The Javascript code will never provide a status that is invalid here (see 23_response.js) // The Javascript code will never provide a status that is invalid here (see 23_response.js)
*resp.as_mut().unwrap().status_mut() = *http.response().status_mut() = StatusCode::from_u16(status).unwrap();
StatusCode::from_u16(status).unwrap(); http.complete();
});
with_promise_mut(index, |promise| {
promise.complete(true);
});
} }
#[op] #[op]
pub fn op_http_get_request_method_and_url<HTTP>( pub fn op_http_get_request_method_and_url<HTTP>(
index: u32, slab_id: SlabId,
) -> (String, Option<String>, String, String, Option<u16>) ) -> (String, Option<String>, String, String, Option<u16>)
where where
HTTP: HttpPropertyExtractor, HTTP: HttpPropertyExtractor,
{ {
// TODO(mmastrac): Passing method can be optimized let http = slab_get(slab_id);
with_http(index, |http| { let request_info = http.request_info();
let request_parts = http.request_parts();
let request_properties = HTTP::request_properties( let request_properties = HTTP::request_properties(
&http.request_info, request_info,
&http.request_parts.uri, &request_parts.uri,
&http.request_parts.headers, &request_parts.headers,
); );
// Only extract the path part - we handle authority elsewhere // Only extract the path part - we handle authority elsewhere
let path = match &http.request_parts.uri.path_and_query() { let path = match &request_parts.uri.path_and_query() {
Some(path_and_query) => path_and_query.to_string(), Some(path_and_query) => path_and_query.to_string(),
None => "".to_owned(), None => "".to_owned(),
}; };
// TODO(mmastrac): Passing method can be optimized
( (
http.request_parts.method.as_str().to_owned(), request_parts.method.as_str().to_owned(),
request_properties.authority, request_properties.authority,
path, path,
String::from(http.request_info.peer_address.as_ref()), String::from(request_info.peer_address.as_ref()),
http.request_info.peer_port, request_info.peer_port,
) )
})
} }
#[op] #[op]
pub fn op_http_get_request_header( pub fn op_http_get_request_header(
index: u32, slab_id: SlabId,
name: String, name: String,
) -> Option<ByteString> { ) -> Option<ByteString> {
with_req(index, |req| { let http = slab_get(slab_id);
let value = req.headers.get(name); let value = http.request_parts().headers.get(name);
value.map(|value| value.as_bytes().into()) value.map(|value| value.as_bytes().into())
})
} }
#[op] #[op]
pub fn op_http_get_request_headers( pub fn op_http_get_request_headers(
index: u32, slab_id: SlabId,
) -> Vec<(ByteString, ByteString)> { ) -> Vec<(ByteString, ByteString)> {
with_req(index, |req| { let http = slab_get(slab_id);
let headers = &req.headers; let headers = &http.request_parts().headers;
let mut vec = Vec::with_capacity(headers.len()); let mut vec = Vec::with_capacity(headers.len());
let mut cookies: Option<Vec<&[u8]>> = None; let mut cookies: Option<Vec<&[u8]>> = None;
for (name, value) in headers { for (name, value) in headers {
@ -435,42 +278,37 @@ pub fn op_http_get_request_headers(
)); ));
} }
vec vec
})
} }
#[op(fast)] #[op(fast)]
pub fn op_http_read_request_body( pub fn op_http_read_request_body(
state: &mut OpState, state: &mut OpState,
index: u32, slab_id: SlabId,
) -> ResourceId { ) -> ResourceId {
let incoming = with_req_body_mut(index, |body| body.take().unwrap()); let mut http = slab_get(slab_id);
let incoming = http.take_body();
let body_resource = Rc::new(HttpRequestBody::new(incoming)); let body_resource = Rc::new(HttpRequestBody::new(incoming));
let res = state.resource_table.add_rc(body_resource.clone()); state.resource_table.add_rc(body_resource)
with_body_mut(index, |body| {
*body = Some(body_resource);
});
res
} }
#[op(fast)] #[op(fast)]
pub fn op_http_set_response_header(index: u32, name: &str, value: &str) { pub fn op_http_set_response_header(slab_id: SlabId, name: &str, value: &str) {
with_resp_mut(index, |resp| { let mut http = slab_get(slab_id);
let resp_headers = resp.as_mut().unwrap().headers_mut(); let resp_headers = http.response().headers_mut();
// These are valid latin-1 strings // These are valid latin-1 strings
let name = HeaderName::from_bytes(name.as_bytes()).unwrap(); let name = HeaderName::from_bytes(name.as_bytes()).unwrap();
let value = HeaderValue::from_bytes(value.as_bytes()).unwrap(); let value = HeaderValue::from_bytes(value.as_bytes()).unwrap();
resp_headers.append(name, value); resp_headers.append(name, value);
});
} }
#[op] #[op]
pub fn op_http_set_response_headers( pub fn op_http_set_response_headers(
index: u32, slab_id: SlabId,
headers: Vec<(ByteString, ByteString)>, headers: Vec<(ByteString, ByteString)>,
) { ) {
let mut http = slab_get(slab_id);
// TODO(mmastrac): Invalid headers should be handled? // TODO(mmastrac): Invalid headers should be handled?
with_resp_mut(index, |resp| { let resp_headers = http.response().headers_mut();
let resp_headers = resp.as_mut().unwrap().headers_mut();
resp_headers.reserve(headers.len()); resp_headers.reserve(headers.len());
for (name, value) in headers { for (name, value) in headers {
// These are valid latin-1 strings // These are valid latin-1 strings
@ -478,7 +316,6 @@ pub fn op_http_set_response_headers(
let value = HeaderValue::from_bytes(&value).unwrap(); let value = HeaderValue::from_bytes(&value).unwrap();
resp_headers.append(name, value); resp_headers.append(name, value);
} }
})
} }
fn is_request_compressible(headers: &HeaderMap) -> Compression { fn is_request_compressible(headers: &HeaderMap) -> Compression {
@ -588,28 +425,25 @@ fn ensure_vary_accept_encoding(hmap: &mut HeaderMap) {
} }
fn set_response( fn set_response(
index: u32, slab_id: SlabId,
length: Option<usize>, length: Option<usize>,
response_fn: impl FnOnce(Compression) -> ResponseBytesInner, response_fn: impl FnOnce(Compression) -> ResponseBytesInner,
) { ) {
let compression = let mut http = slab_get(slab_id);
with_req(index, |req| is_request_compressible(&req.headers)); let compression = is_request_compressible(&http.request_parts().headers);
let response = http.response();
with_resp_mut(index, move |response| {
let response = response.as_mut().unwrap();
let compression = modify_compressibility_from_response( let compression = modify_compressibility_from_response(
compression, compression,
length, length,
response.headers_mut(), response.headers_mut(),
); );
response.body_mut().initialize(response_fn(compression)) response.body_mut().initialize(response_fn(compression))
});
} }
#[op(fast)] #[op(fast)]
pub fn op_http_set_response_body_resource( pub fn op_http_set_response_body_resource(
state: &mut OpState, state: &mut OpState,
index: u32, slab_id: SlabId,
stream_rid: ResourceId, stream_rid: ResourceId,
auto_close: bool, auto_close: bool,
) -> Result<(), AnyError> { ) -> Result<(), AnyError> {
@ -621,7 +455,7 @@ pub fn op_http_set_response_body_resource(
}; };
set_response( set_response(
index, slab_id,
resource.size_hint().1.map(|s| s as usize), resource.size_hint().1.map(|s| s as usize),
move |compression| { move |compression| {
ResponseBytesInner::from_resource(compression, resource, auto_close) ResponseBytesInner::from_resource(compression, resource, auto_close)
@ -634,12 +468,11 @@ pub fn op_http_set_response_body_resource(
#[op(fast)] #[op(fast)]
pub fn op_http_set_response_body_stream( pub fn op_http_set_response_body_stream(
state: &mut OpState, state: &mut OpState,
index: u32, slab_id: SlabId,
) -> Result<ResourceId, AnyError> { ) -> Result<ResourceId, AnyError> {
// TODO(mmastrac): what should this channel size be? // TODO(mmastrac): what should this channel size be?
let (tx, rx) = tokio::sync::mpsc::channel(1); let (tx, rx) = tokio::sync::mpsc::channel(1);
set_response(slab_id, None, |compression| {
set_response(index, None, |compression| {
ResponseBytesInner::from_v8(compression, rx) ResponseBytesInner::from_v8(compression, rx)
}); });
@ -647,18 +480,18 @@ pub fn op_http_set_response_body_stream(
} }
#[op(fast)] #[op(fast)]
pub fn op_http_set_response_body_text(index: u32, text: String) { pub fn op_http_set_response_body_text(slab_id: SlabId, text: String) {
if !text.is_empty() { if !text.is_empty() {
set_response(index, Some(text.len()), |compression| { set_response(slab_id, Some(text.len()), |compression| {
ResponseBytesInner::from_vec(compression, text.into_bytes()) ResponseBytesInner::from_vec(compression, text.into_bytes())
}); });
} }
} }
#[op(fast)] #[op(fast)]
pub fn op_http_set_response_body_bytes(index: u32, buffer: &[u8]) { pub fn op_http_set_response_body_bytes(slab_id: SlabId, buffer: &[u8]) {
if !buffer.is_empty() { if !buffer.is_empty() {
set_response(index, Some(buffer.len()), |compression| { set_response(slab_id, Some(buffer.len()), |compression| {
ResponseBytesInner::from_slice(compression, buffer) ResponseBytesInner::from_slice(compression, buffer)
}); });
}; };
@ -667,12 +500,11 @@ pub fn op_http_set_response_body_bytes(index: u32, buffer: &[u8]) {
#[op] #[op]
pub async fn op_http_track( pub async fn op_http_track(
state: Rc<RefCell<OpState>>, state: Rc<RefCell<OpState>>,
index: u32, slab_id: SlabId,
server_rid: ResourceId, server_rid: ResourceId,
) -> Result<(), AnyError> { ) -> Result<(), AnyError> {
let handle = with_resp(index, |resp| { let http = slab_get(slab_id);
resp.as_ref().unwrap().body().completion_handle() let handle = http.body_promise();
});
let join_handle = state let join_handle = state
.borrow_mut() .borrow_mut()
@ -689,15 +521,15 @@ pub async fn op_http_track(
} }
#[pin_project(PinnedDrop)] #[pin_project(PinnedDrop)]
pub struct SlabFuture<F: Future<Output = ()>>(u32, #[pin] F); pub struct SlabFuture<F: Future<Output = ()>>(SlabId, #[pin] F);
pub fn new_slab_future( pub fn new_slab_future(
request: Request, request: Request,
request_info: HttpConnectionProperties, request_info: HttpConnectionProperties,
tx: tokio::sync::mpsc::Sender<u32>, tx: tokio::sync::mpsc::Sender<SlabId>,
) -> SlabFuture<impl Future<Output = ()>> { ) -> SlabFuture<impl Future<Output = ()>> {
let index = slab_insert(request, request_info); let index = slab_insert(request, request_info);
let rx = with_promise(index, |promise| promise.clone()); let rx = slab_get(index).promise();
SlabFuture(index, async move { SlabFuture(index, async move {
if tx.send(index).await.is_ok() { if tx.send(index).await.is_ok() {
// We only need to wait for completion if we aren't closed // We only need to wait for completion if we aren't closed
@ -711,16 +543,7 @@ impl<F: Future<Output = ()>> SlabFuture<F> {}
#[pinned_drop] #[pinned_drop]
impl<F: Future<Output = ()>> PinnedDrop for SlabFuture<F> { impl<F: Future<Output = ()>> PinnedDrop for SlabFuture<F> {
fn drop(self: Pin<&mut Self>) { fn drop(self: Pin<&mut Self>) {
SLAB.with(|slab| { slab_drop(self.0);
#[cfg(__zombie_http_tracking)]
{
slab.borrow_mut().get_mut(self.0 as usize).unwrap().alive = false;
}
#[cfg(not(__zombie_http_tracking))]
{
slab.borrow_mut().remove(self.0 as usize);
}
});
} }
} }
@ -736,7 +559,7 @@ impl<F: Future<Output = ()>> Future for SlabFuture<F> {
.project() .project()
.1 .1
.poll(cx) .poll(cx)
.map(|_| Ok(with_resp_mut(index, |resp| resp.take().unwrap()))) .map(|_| Ok(slab_get(index).take_response()))
} }
} }
@ -776,7 +599,7 @@ fn serve_https(
mut io: TlsStream, mut io: TlsStream,
request_info: HttpConnectionProperties, request_info: HttpConnectionProperties,
cancel: Rc<CancelHandle>, cancel: Rc<CancelHandle>,
tx: tokio::sync::mpsc::Sender<u32>, tx: tokio::sync::mpsc::Sender<SlabId>,
) -> JoinHandle<Result<(), AnyError>> { ) -> JoinHandle<Result<(), AnyError>> {
let svc = service_fn(move |req: Request| { let svc = service_fn(move |req: Request| {
new_slab_future(req, request_info.clone(), tx.clone()) new_slab_future(req, request_info.clone(), tx.clone())
@ -803,7 +626,7 @@ fn serve_http(
io: impl HttpServeStream, io: impl HttpServeStream,
request_info: HttpConnectionProperties, request_info: HttpConnectionProperties,
cancel: Rc<CancelHandle>, cancel: Rc<CancelHandle>,
tx: tokio::sync::mpsc::Sender<u32>, tx: tokio::sync::mpsc::Sender<SlabId>,
) -> JoinHandle<Result<(), AnyError>> { ) -> JoinHandle<Result<(), AnyError>> {
let svc = service_fn(move |req: Request| { let svc = service_fn(move |req: Request| {
new_slab_future(req, request_info.clone(), tx.clone()) new_slab_future(req, request_info.clone(), tx.clone())
@ -815,7 +638,7 @@ fn serve_http_on<HTTP>(
connection: HTTP::Connection, connection: HTTP::Connection,
listen_properties: &HttpListenProperties, listen_properties: &HttpListenProperties,
cancel: Rc<CancelHandle>, cancel: Rc<CancelHandle>,
tx: tokio::sync::mpsc::Sender<u32>, tx: tokio::sync::mpsc::Sender<SlabId>,
) -> JoinHandle<Result<(), AnyError>> ) -> JoinHandle<Result<(), AnyError>>
where where
HTTP: HttpPropertyExtractor, HTTP: HttpPropertyExtractor,
@ -843,7 +666,7 @@ struct HttpJoinHandle(
AsyncRefCell<Option<JoinHandle<Result<(), AnyError>>>>, AsyncRefCell<Option<JoinHandle<Result<(), AnyError>>>>,
// Cancel handle must live in a separate Rc to avoid keeping the outer join handle ref'd // Cancel handle must live in a separate Rc to avoid keeping the outer join handle ref'd
Rc<CancelHandle>, Rc<CancelHandle>,
AsyncRefCell<tokio::sync::mpsc::Receiver<u32>>, AsyncRefCell<tokio::sync::mpsc::Receiver<SlabId>>,
); );
impl HttpJoinHandle { impl HttpJoinHandle {
@ -963,7 +786,7 @@ where
pub async fn op_http_wait( pub async fn op_http_wait(
state: Rc<RefCell<OpState>>, state: Rc<RefCell<OpState>>,
rid: ResourceId, rid: ResourceId,
) -> Result<u32, AnyError> { ) -> Result<SlabId, AnyError> {
// We will get the join handle initially, as we might be consuming requests still // We will get the join handle initially, as we might be consuming requests still
let join_handle = state let join_handle = state
.borrow_mut() .borrow_mut()
@ -1003,14 +826,14 @@ pub async fn op_http_wait(
if let Some(err) = err.source() { if let Some(err) = err.source() {
if let Some(err) = err.downcast_ref::<io::Error>() { if let Some(err) = err.downcast_ref::<io::Error>() {
if err.kind() == io::ErrorKind::NotConnected { if err.kind() == io::ErrorKind::NotConnected {
return Ok(u32::MAX); return Ok(SlabId::MAX);
} }
} }
} }
return Err(err); return Err(err);
} }
Ok(u32::MAX) Ok(SlabId::MAX)
} }
struct UpgradeStream { struct UpgradeStream {

View file

@ -81,6 +81,7 @@ mod reader_stream;
mod request_body; mod request_body;
mod request_properties; mod request_properties;
mod response_body; mod response_body;
mod slab;
mod websocket_upgrade; mod websocket_upgrade;
pub use request_properties::DefaultHttpPropertyExtractor; pub use request_properties::DefaultHttpPropertyExtractor;

View file

@ -82,6 +82,10 @@ impl CompletionHandle {
waker.wake(); waker.wake();
} }
} }
pub fn is_completed(&self) -> bool {
self.inner.borrow().complete
}
} }
impl Future for CompletionHandle { impl Future for CompletionHandle {

241
ext/http/slab.rs Normal file
View file

@ -0,0 +1,241 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
use crate::request_properties::HttpConnectionProperties;
use crate::response_body::CompletionHandle;
use crate::response_body::ResponseBytes;
use deno_core::error::AnyError;
use http::request::Parts;
use hyper1::body::Incoming;
use hyper1::upgrade::OnUpgrade;
use slab::Slab;
use std::cell::RefCell;
use std::cell::RefMut;
use std::ptr::NonNull;
pub type Request = hyper1::Request<Incoming>;
pub type Response = hyper1::Response<ResponseBytes>;
pub type SlabId = u32;
pub struct HttpSlabRecord {
request_info: HttpConnectionProperties,
request_parts: Parts,
request_body: Option<Incoming>,
// The response may get taken before we tear this down
response: Option<Response>,
promise: CompletionHandle,
been_dropped: bool,
#[cfg(feature = "__zombie_http_tracking")]
alive: bool,
}
thread_local! {
static SLAB: RefCell<Slab<HttpSlabRecord>> = RefCell::new(Slab::with_capacity(1024));
}
macro_rules! http_trace {
($index:expr, $args:tt) => {
#[cfg(feature = "__http_tracing")]
{
let total = SLAB.with(|x| x.try_borrow().map(|x| x.len()));
if let Ok(total) = total {
println!("HTTP id={} total={}: {}", $index, total, format!($args));
} else {
println!("HTTP id={} total=?: {}", $index, format!($args));
}
}
};
}
/// Hold a lock on the slab table and a reference to one entry in the table.
pub struct SlabEntry(
NonNull<HttpSlabRecord>,
SlabId,
RefMut<'static, Slab<HttpSlabRecord>>,
);
pub fn slab_get(index: SlabId) -> SlabEntry {
http_trace!(index, "slab_get");
let mut lock: RefMut<'static, Slab<HttpSlabRecord>> = SLAB.with(|x| {
// SAFETY: We're extracting a lock here and placing it into an object that is thread-local, !Send as a &'static
unsafe { std::mem::transmute(x.borrow_mut()) }
});
let Some(entry) = lock.get_mut(index as usize) else {
panic!("HTTP state error: Attemped to access invalid request {} ({} in total available)",
index,
lock.len())
};
#[cfg(feature = "__zombie_http_tracking")]
{
assert!(entry.alive, "HTTP state error: Entry is not alive");
}
let entry = NonNull::new(entry as _).unwrap();
SlabEntry(entry, index, lock)
}
#[allow(clippy::let_and_return)]
fn slab_insert_raw(
request_parts: Parts,
request_body: Option<Incoming>,
request_info: HttpConnectionProperties,
) -> SlabId {
let index = SLAB.with(|slab| {
let mut slab = slab.borrow_mut();
slab.insert(HttpSlabRecord {
request_info,
request_parts,
request_body,
response: Some(Response::new(ResponseBytes::default())),
been_dropped: false,
promise: CompletionHandle::default(),
#[cfg(feature = "__zombie_http_tracking")]
alive: true,
})
}) as u32;
http_trace!(index, "slab_insert");
index
}
pub fn slab_insert(
request: Request,
request_info: HttpConnectionProperties,
) -> SlabId {
let (request_parts, request_body) = request.into_parts();
slab_insert_raw(request_parts, Some(request_body), request_info)
}
pub fn slab_drop(index: SlabId) {
http_trace!(index, "slab_drop");
let mut entry = slab_get(index);
let record = entry.self_mut();
assert!(
!record.been_dropped,
"HTTP state error: Entry has already been dropped"
);
record.been_dropped = true;
if record.promise.is_completed() {
drop(entry);
slab_expunge(index);
}
}
fn slab_expunge(index: SlabId) {
SLAB.with(|slab| {
#[cfg(__zombie_http_tracking)]
{
slab.borrow_mut().get_mut(index as usize).unwrap().alive = false;
}
#[cfg(not(__zombie_http_tracking))]
{
slab.borrow_mut().remove(index as usize);
}
});
http_trace!(index, "slab_expunge");
}
impl SlabEntry {
fn self_ref(&self) -> &HttpSlabRecord {
// SAFETY: We have the lock and we're borrowing lifetime from self
unsafe { self.0.as_ref() }
}
fn self_mut(&mut self) -> &mut HttpSlabRecord {
// SAFETY: We have the lock and we're borrowing lifetime from self
unsafe { self.0.as_mut() }
}
/// Perform the Hyper upgrade on this entry.
pub fn upgrade(&mut self) -> Result<OnUpgrade, AnyError> {
// Manually perform the upgrade. We're peeking into hyper's underlying machinery here a bit
self
.self_mut()
.request_parts
.extensions
.remove::<OnUpgrade>()
.ok_or_else(|| AnyError::msg("upgrade unavailable"))
}
/// Take the Hyper body from this entry.
pub fn take_body(&mut self) -> Incoming {
self.self_mut().request_body.take().unwrap()
}
/// Complete this entry, potentially expunging it if it is complete.
pub fn complete(self) {
let promise = &self.self_ref().promise;
assert!(
!promise.is_completed(),
"HTTP state error: Entry has already been completed"
);
http_trace!(self.1, "SlabEntry::complete");
promise.complete(true);
// If we're all done, we need to drop ourself to release the lock before we expunge this record
if self.self_ref().been_dropped {
let index = self.1;
drop(self);
slab_expunge(index);
}
}
/// Get a mutable reference to the response.
pub fn response(&mut self) -> &mut Response {
self.self_mut().response.as_mut().unwrap()
}
/// Take the response.
pub fn take_response(&mut self) -> Response {
self.self_mut().response.take().unwrap()
}
/// Get a reference to the connection properties.
pub fn request_info(&self) -> &HttpConnectionProperties {
&self.self_ref().request_info
}
/// Get a reference to the request parts.
pub fn request_parts(&self) -> &Parts {
&self.self_ref().request_parts
}
/// Get a reference to the completion handle.
pub fn promise(&self) -> CompletionHandle {
self.self_ref().promise.clone()
}
/// Get a reference to the response body completion handle.
pub fn body_promise(&self) -> CompletionHandle {
self
.self_ref()
.response
.as_ref()
.unwrap()
.body()
.completion_handle()
}
}
#[cfg(test)]
mod tests {
use super::*;
use deno_net::raw::NetworkStreamType;
use http::Request;
#[test]
fn test_slab() {
let req = Request::builder().body(()).unwrap();
let (parts, _) = req.into_parts();
let id = slab_insert_raw(
parts,
None,
HttpConnectionProperties {
peer_address: "".into(),
peer_port: None,
local_port: None,
stream_type: NetworkStreamType::Tcp,
},
);
let entry = slab_get(id);
entry.complete();
slab_drop(id);
}
}