1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2025-01-13 09:32:24 -05:00
denoland-deno/ext/http/slab.rs

265 lines
7.1 KiB
Rust
Raw Normal View History

// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
use crate::request_properties::HttpConnectionProperties;
use crate::response_body::CompletionHandle;
use crate::response_body::ResponseBytes;
use deno_core::error::AnyError;
use http::request::Parts;
use http::HeaderMap;
use hyper1::body::Incoming;
use hyper1::upgrade::OnUpgrade;
use slab::Slab;
use std::cell::RefCell;
use std::cell::RefMut;
use std::ptr::NonNull;
use std::rc::Rc;
pub type Request = hyper1::Request<Incoming>;
pub type Response = hyper1::Response<ResponseBytes>;
pub type SlabId = u32;
pub struct HttpSlabRecord {
request_info: HttpConnectionProperties,
request_parts: Parts,
request_body: Option<Incoming>,
// The response may get taken before we tear this down
response: Option<Response>,
promise: CompletionHandle,
trailers: Rc<RefCell<Option<HeaderMap>>>,
been_dropped: bool,
#[cfg(feature = "__zombie_http_tracking")]
alive: bool,
}
thread_local! {
static SLAB: RefCell<Slab<HttpSlabRecord>> = const { RefCell::new(Slab::new()) };
}
macro_rules! http_trace {
($index:expr, $args:tt) => {
#[cfg(feature = "__http_tracing")]
{
let total = SLAB.with(|x| x.try_borrow().map(|x| x.len()));
if let Ok(total) = total {
println!("HTTP id={} total={}: {}", $index, total, format!($args));
} else {
println!("HTTP id={} total=?: {}", $index, format!($args));
}
}
};
}
/// Hold a lock on the slab table and a reference to one entry in the table.
pub struct SlabEntry(
NonNull<HttpSlabRecord>,
SlabId,
RefMut<'static, Slab<HttpSlabRecord>>,
);
const SLAB_CAPACITY: usize = 1024;
pub fn slab_init() {
SLAB.with(|slab: &RefCell<Slab<HttpSlabRecord>>| {
// Note that there might already be an active HTTP server, so this may just
// end up adding room for an additional SLAB_CAPACITY items. All HTTP servers
// on a single thread share the same slab.
let mut slab = slab.borrow_mut();
slab.reserve(SLAB_CAPACITY);
})
}
pub fn slab_get(index: SlabId) -> SlabEntry {
http_trace!(index, "slab_get");
let mut lock: RefMut<'static, Slab<HttpSlabRecord>> = SLAB.with(|x| {
// SAFETY: We're extracting a lock here and placing it into an object that is thread-local, !Send as a &'static
unsafe { std::mem::transmute(x.borrow_mut()) }
});
let Some(entry) = lock.get_mut(index as usize) else {
2023-06-26 09:10:27 -04:00
panic!("HTTP state error: Attempted to access invalid request {} ({} in total available)",
index,
lock.len())
};
#[cfg(feature = "__zombie_http_tracking")]
{
assert!(entry.alive, "HTTP state error: Entry is not alive");
}
let entry = NonNull::new(entry as _).unwrap();
SlabEntry(entry, index, lock)
}
#[allow(clippy::let_and_return)]
fn slab_insert_raw(
request_parts: Parts,
request_body: Option<Incoming>,
request_info: HttpConnectionProperties,
) -> SlabId {
let index = SLAB.with(|slab| {
let mut slab = slab.borrow_mut();
let body = ResponseBytes::default();
let trailers = body.trailers();
slab.insert(HttpSlabRecord {
request_info,
request_parts,
request_body,
response: Some(Response::new(body)),
trailers,
been_dropped: false,
promise: CompletionHandle::default(),
#[cfg(feature = "__zombie_http_tracking")]
alive: true,
})
}) as u32;
http_trace!(index, "slab_insert");
index
}
pub fn slab_insert(
request: Request,
request_info: HttpConnectionProperties,
) -> SlabId {
let (request_parts, request_body) = request.into_parts();
slab_insert_raw(request_parts, Some(request_body), request_info)
}
pub fn slab_drop(index: SlabId) {
http_trace!(index, "slab_drop");
let mut entry = slab_get(index);
let record = entry.self_mut();
assert!(
!record.been_dropped,
"HTTP state error: Entry has already been dropped"
);
record.been_dropped = true;
if record.promise.is_completed() {
drop(entry);
slab_expunge(index);
}
}
fn slab_expunge(index: SlabId) {
SLAB.with(|slab| {
#[cfg(__zombie_http_tracking)]
{
slab.borrow_mut().get_mut(index as usize).unwrap().alive = false;
}
#[cfg(not(__zombie_http_tracking))]
{
slab.borrow_mut().remove(index as usize);
}
});
http_trace!(index, "slab_expunge");
}
impl SlabEntry {
fn self_ref(&self) -> &HttpSlabRecord {
// SAFETY: We have the lock and we're borrowing lifetime from self
unsafe { self.0.as_ref() }
}
fn self_mut(&mut self) -> &mut HttpSlabRecord {
// SAFETY: We have the lock and we're borrowing lifetime from self
unsafe { self.0.as_mut() }
}
/// Perform the Hyper upgrade on this entry.
pub fn upgrade(&mut self) -> Result<OnUpgrade, AnyError> {
// Manually perform the upgrade. We're peeking into hyper's underlying machinery here a bit
self
.self_mut()
.request_parts
.extensions
.remove::<OnUpgrade>()
.ok_or_else(|| AnyError::msg("upgrade unavailable"))
}
/// Take the Hyper body from this entry.
pub fn take_body(&mut self) -> Incoming {
self.self_mut().request_body.take().unwrap()
}
/// Complete this entry, potentially expunging it if it is complete.
pub fn complete(self) {
let promise = &self.self_ref().promise;
assert!(
!promise.is_completed(),
"HTTP state error: Entry has already been completed"
);
http_trace!(self.1, "SlabEntry::complete");
promise.complete(true);
// If we're all done, we need to drop ourself to release the lock before we expunge this record
if self.self_ref().been_dropped {
let index = self.1;
drop(self);
slab_expunge(index);
}
}
/// Get a mutable reference to the response.
pub fn response(&mut self) -> &mut Response {
self.self_mut().response.as_mut().unwrap()
}
/// Get a mutable reference to the trailers.
pub fn trailers(&mut self) -> &RefCell<Option<HeaderMap>> {
&self.self_mut().trailers
}
/// Take the response.
pub fn take_response(&mut self) -> Response {
self.self_mut().response.take().unwrap()
}
/// Get a reference to the connection properties.
pub fn request_info(&self) -> &HttpConnectionProperties {
&self.self_ref().request_info
}
/// Get a reference to the request parts.
pub fn request_parts(&self) -> &Parts {
&self.self_ref().request_parts
}
/// Get a reference to the completion handle.
pub fn promise(&self) -> CompletionHandle {
self.self_ref().promise.clone()
}
/// Get a reference to the response body completion handle.
pub fn body_promise(&self) -> CompletionHandle {
self
.self_ref()
.response
.as_ref()
.unwrap()
.body()
.completion_handle()
}
}
#[cfg(test)]
mod tests {
use super::*;
use deno_net::raw::NetworkStreamType;
use http::Request;
#[test]
fn test_slab() {
let req = Request::builder().body(()).unwrap();
let (parts, _) = req.into_parts();
let id = slab_insert_raw(
parts,
None,
HttpConnectionProperties {
peer_address: "".into(),
peer_port: None,
local_port: None,
stream_type: NetworkStreamType::Tcp,
},
);
let entry = slab_get(id);
entry.complete();
slab_drop(id);
}
}