mirror of
https://github.com/denoland/deno.git
synced 2025-01-11 08:33:43 -05:00
refactor(ext/http): use scopeguard defer to handle async drop (#20652)
Use the [scopeguard](https://docs.rs/scopeguard/) defer macro to run cleanup code for `new_slab_future`. This means it can be a single async function, avoiding the need to create a struct and implement `PinnedDrop` Async cleanup in Rust is awkward because async functions may be cancelled at any await point when their Future is dropped. The scopeguard approach comes from the following articles: * [How to think about `async`/`await` in Rust](http://cliffle.com/blog/async-inversion/) * [Async Cancellation I](https://blog.yoshuawuyts.com/async-cancellation-1/) (Reddit [discussion](https://www.reddit.com/r/rust/comments/qrhg39/blog_post_async_cancellation/))
This commit is contained in:
parent
c68650d532
commit
8fcea5966c
5 changed files with 26 additions and 52 deletions
1
Cargo.lock
generated
1
Cargo.lock
generated
|
@ -1411,6 +1411,7 @@ dependencies = [
|
||||||
"pin-project",
|
"pin-project",
|
||||||
"rand 0.8.5",
|
"rand 0.8.5",
|
||||||
"ring",
|
"ring",
|
||||||
|
"scopeguard",
|
||||||
"serde",
|
"serde",
|
||||||
"slab",
|
"slab",
|
||||||
"smallvec",
|
"smallvec",
|
||||||
|
|
|
@ -129,6 +129,7 @@ rustls-pemfile = "1.0.0"
|
||||||
rustls-webpki = "0.101.4"
|
rustls-webpki = "0.101.4"
|
||||||
rustls-native-certs = "0.6.2"
|
rustls-native-certs = "0.6.2"
|
||||||
webpki-roots = "0.25.2"
|
webpki-roots = "0.25.2"
|
||||||
|
scopeguard = "1.2.0"
|
||||||
serde = { version = "1.0.149", features = ["derive"] }
|
serde = { version = "1.0.149", features = ["derive"] }
|
||||||
serde_bytes = "0.11"
|
serde_bytes = "0.11"
|
||||||
serde_json = "1.0.85"
|
serde_json = "1.0.85"
|
||||||
|
|
|
@ -44,6 +44,7 @@ percent-encoding.workspace = true
|
||||||
phf = { version = "0.10", features = ["macros"] }
|
phf = { version = "0.10", features = ["macros"] }
|
||||||
pin-project.workspace = true
|
pin-project.workspace = true
|
||||||
ring.workspace = true
|
ring.workspace = true
|
||||||
|
scopeguard.workspace = true
|
||||||
serde.workspace = true
|
serde.workspace = true
|
||||||
slab.workspace = true
|
slab.workspace = true
|
||||||
smallvec.workspace = true
|
smallvec.workspace = true
|
||||||
|
|
|
@ -10,11 +10,9 @@ use crate::request_properties::HttpPropertyExtractor;
|
||||||
use crate::response_body::Compression;
|
use crate::response_body::Compression;
|
||||||
use crate::response_body::ResponseBytes;
|
use crate::response_body::ResponseBytes;
|
||||||
use crate::response_body::ResponseBytesInner;
|
use crate::response_body::ResponseBytesInner;
|
||||||
use crate::slab::http_trace;
|
use crate::slab::new_slab_future;
|
||||||
use crate::slab::slab_drop;
|
|
||||||
use crate::slab::slab_get;
|
use crate::slab::slab_get;
|
||||||
use crate::slab::slab_init;
|
use crate::slab::slab_init;
|
||||||
use crate::slab::slab_insert;
|
|
||||||
use crate::slab::HttpRequestBodyAutocloser;
|
use crate::slab::HttpRequestBodyAutocloser;
|
||||||
use crate::slab::RefCount;
|
use crate::slab::RefCount;
|
||||||
use crate::slab::SlabId;
|
use crate::slab::SlabId;
|
||||||
|
@ -61,8 +59,6 @@ use hyper1::service::service_fn;
|
||||||
use hyper1::service::HttpService;
|
use hyper1::service::HttpService;
|
||||||
use hyper1::StatusCode;
|
use hyper1::StatusCode;
|
||||||
use once_cell::sync::Lazy;
|
use once_cell::sync::Lazy;
|
||||||
use pin_project::pin_project;
|
|
||||||
use pin_project::pinned_drop;
|
|
||||||
use smallvec::SmallVec;
|
use smallvec::SmallVec;
|
||||||
use std::borrow::Cow;
|
use std::borrow::Cow;
|
||||||
use std::cell::RefCell;
|
use std::cell::RefCell;
|
||||||
|
@ -76,7 +72,6 @@ use tokio::io::AsyncReadExt;
|
||||||
use tokio::io::AsyncWriteExt;
|
use tokio::io::AsyncWriteExt;
|
||||||
|
|
||||||
type Request = hyper1::Request<Incoming>;
|
type Request = hyper1::Request<Incoming>;
|
||||||
type Response = hyper1::Response<ResponseBytes>;
|
|
||||||
|
|
||||||
static USE_WRITEV: Lazy<bool> = Lazy::new(|| {
|
static USE_WRITEV: Lazy<bool> = Lazy::new(|| {
|
||||||
let enable = std::env::var("DENO_USE_WRITEV").ok();
|
let enable = std::env::var("DENO_USE_WRITEV").ok();
|
||||||
|
@ -706,52 +701,6 @@ pub async fn op_http_track(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[pin_project(PinnedDrop)]
|
|
||||||
pub struct SlabFuture<F: Future<Output = ()>>(SlabId, #[pin] F);
|
|
||||||
|
|
||||||
pub fn new_slab_future(
|
|
||||||
request: Request,
|
|
||||||
request_info: HttpConnectionProperties,
|
|
||||||
refcount: RefCount,
|
|
||||||
tx: tokio::sync::mpsc::Sender<SlabId>,
|
|
||||||
) -> SlabFuture<impl Future<Output = ()>> {
|
|
||||||
let index = slab_insert(request, request_info, refcount);
|
|
||||||
let rx = slab_get(index).promise();
|
|
||||||
SlabFuture(index, async move {
|
|
||||||
if tx.send(index).await.is_ok() {
|
|
||||||
http_trace!(index, "SlabFuture await");
|
|
||||||
// We only need to wait for completion if we aren't closed
|
|
||||||
rx.await;
|
|
||||||
http_trace!(index, "SlabFuture complete");
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<F: Future<Output = ()>> SlabFuture<F> {}
|
|
||||||
|
|
||||||
#[pinned_drop]
|
|
||||||
impl<F: Future<Output = ()>> PinnedDrop for SlabFuture<F> {
|
|
||||||
fn drop(self: Pin<&mut Self>) {
|
|
||||||
slab_drop(self.0);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<F: Future<Output = ()>> Future for SlabFuture<F> {
|
|
||||||
type Output = Result<Response, hyper::Error>;
|
|
||||||
|
|
||||||
fn poll(
|
|
||||||
self: Pin<&mut Self>,
|
|
||||||
cx: &mut std::task::Context<'_>,
|
|
||||||
) -> std::task::Poll<Self::Output> {
|
|
||||||
let index = self.0;
|
|
||||||
self
|
|
||||||
.project()
|
|
||||||
.1
|
|
||||||
.poll(cx)
|
|
||||||
.map(|_| Ok(slab_get(index).take_response()))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn serve_http11_unconditional(
|
fn serve_http11_unconditional(
|
||||||
io: impl HttpServeStream,
|
io: impl HttpServeStream,
|
||||||
svc: impl HttpService<Incoming, ResBody = ResponseBytes> + 'static,
|
svc: impl HttpService<Incoming, ResBody = ResponseBytes> + 'static,
|
||||||
|
|
|
@ -10,6 +10,7 @@ use http::HeaderMap;
|
||||||
use hyper1::body::Incoming;
|
use hyper1::body::Incoming;
|
||||||
use hyper1::upgrade::OnUpgrade;
|
use hyper1::upgrade::OnUpgrade;
|
||||||
|
|
||||||
|
use scopeguard::defer;
|
||||||
use slab::Slab;
|
use slab::Slab;
|
||||||
use std::cell::RefCell;
|
use std::cell::RefCell;
|
||||||
use std::cell::RefMut;
|
use std::cell::RefMut;
|
||||||
|
@ -52,6 +53,27 @@ impl Drop for HttpRequestBodyAutocloser {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn new_slab_future(
|
||||||
|
request: Request,
|
||||||
|
request_info: HttpConnectionProperties,
|
||||||
|
refcount: RefCount,
|
||||||
|
tx: tokio::sync::mpsc::Sender<SlabId>,
|
||||||
|
) -> Result<Response, hyper::Error> {
|
||||||
|
let index = slab_insert(request, request_info, refcount);
|
||||||
|
defer! {
|
||||||
|
slab_drop(index);
|
||||||
|
}
|
||||||
|
let rx = slab_get(index).promise();
|
||||||
|
if tx.send(index).await.is_ok() {
|
||||||
|
http_trace!(index, "SlabFuture await");
|
||||||
|
// We only need to wait for completion if we aren't closed
|
||||||
|
rx.await;
|
||||||
|
http_trace!(index, "SlabFuture complete");
|
||||||
|
}
|
||||||
|
let response = slab_get(index).take_response();
|
||||||
|
Ok(response)
|
||||||
|
}
|
||||||
|
|
||||||
pub struct HttpSlabRecord {
|
pub struct HttpSlabRecord {
|
||||||
request_info: HttpConnectionProperties,
|
request_info: HttpConnectionProperties,
|
||||||
request_parts: Parts,
|
request_parts: Parts,
|
||||||
|
|
Loading…
Reference in a new issue