diff --git a/Cargo.lock b/Cargo.lock index e199ec6f73..ca0f04048b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -922,7 +922,6 @@ dependencies = [ "tokio", "tokio-util", "tower-lsp", - "tracing", "trust-dns-client", "trust-dns-server", "twox-hash", @@ -1075,9 +1074,9 @@ dependencies = [ [[package]] name = "deno_core" -version = "0.208.0" +version = "0.209.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aab2b013707b6a1bb1e56404b72a4f68220d0fbe1184133b2b21386a8ffbc5d8" +checksum = "c48ff1f83aeeda4b8ed9c101b85380fd2f25a52268130546c610c8e412911d7b" dependencies = [ "anyhow", "bytes", @@ -1459,9 +1458,9 @@ dependencies = [ [[package]] name = "deno_ops" -version = "0.86.0" +version = "0.87.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b116802ace73e3dd910081652789c85aa21f057b9f5936255d786965816fb3b1" +checksum = "573a5ae66f76ce159525ab9007433e19d1a074e32c27b17a4753780d659d79fa" dependencies = [ "deno-proc-macro-rules", "lazy-regex 2.5.0", @@ -4619,9 +4618,9 @@ dependencies = [ [[package]] name = "serde_v8" -version = "0.119.0" +version = "0.120.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "85efce3bb967c7cd2be8058f7b06047489e0b0888fc25db9e3aa7907370ae45c" +checksum = "a5424b4b41a92222abf9ddbdd78f59164f7594422ee4a61fc3704fc8ba608dc6" dependencies = [ "bytes", "derive_more", diff --git a/Cargo.toml b/Cargo.toml index 1bfab9fa1b..c5707de53a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -40,7 +40,7 @@ repository = "https://github.com/denoland/deno" [workspace.dependencies] deno_ast = { version = "0.29.1", features = ["transpiling"] } -deno_core = { version = "0.208.0" } +deno_core = { version = "0.209.0" } deno_runtime = { version = "0.126.0", path = "./runtime" } napi_sym = { version = "0.48.0", path = "./cli/napi/sym" } @@ -139,7 +139,6 @@ tar = "=0.4.40" tempfile = "3.4.0" termcolor = "1.1.3" thiserror = "1.0.40" -tracing = "0" tokio = { version = "1.28.1", features = ["full"] } tokio-metrics = { version = "0.3.0", features = ["rt"] } tokio-rustls = "0.24.0" diff --git a/cli/Cargo.toml b/cli/Cargo.toml index c08e661805..42f868ef96 100644 --- a/cli/Cargo.toml +++ b/cli/Cargo.toml @@ -116,7 +116,6 @@ thiserror.workspace = true tokio.workspace = true tokio-util.workspace = true tower-lsp.workspace = true -tracing.workspace = true twox-hash = "=1.6.3" typed-arena = "=2.0.1" uuid = { workspace = true, features = ["serde"] } diff --git a/cli/tests/unit/serve_test.ts b/cli/tests/unit/serve_test.ts index d1ac826967..3f58903a85 100644 --- a/cli/tests/unit/serve_test.ts +++ b/cli/tests/unit/serve_test.ts @@ -41,27 +41,257 @@ function onListen( }; } -Deno.test(async function httpServerShutsDownPortBeforeResolving() { +async function makeServer( + handler: (req: Request) => Response | Promise, +): Promise< + { finished: Promise; abort: () => void; shutdown: () => Promise } +> { const ac = new AbortController(); const listeningPromise = deferred(); const server = Deno.serve({ - handler: (_req) => new Response("ok"), + handler, port: servePort, signal: ac.signal, onListen: onListen(listeningPromise), }); await listeningPromise; - assertThrows(() => Deno.listen({ port: servePort })); + return { + finished: server.finished, + abort() { + ac.abort(); + }, + async shutdown() { + await server.shutdown(); + }, + }; +} - ac.abort(); - await server.finished; +Deno.test(async function httpServerShutsDownPortBeforeResolving() { + const { finished, abort } = await makeServer((_req) => new Response("ok")); + assertThrows(() => Deno.listen({ port: servePort })); + abort(); + await finished; const listener = Deno.listen({ port: servePort }); listener!.close(); }); +// When shutting down abruptly, we require that all in-progress connections are aborted, +// no new connections are allowed, and no new transactions are allowed on existing connections. +Deno.test( + { permissions: { net: true } }, + async function httpServerShutdownAbruptGuaranteeHttp11() { + const promiseQueue: { input: Deferred; out: Deferred }[] = []; + const { finished, abort } = await makeServer((_req) => { + const { input, out } = promiseQueue.shift()!; + return new Response( + new ReadableStream({ + async start(controller) { + controller.enqueue(new Uint8Array([46])); + out.resolve(undefined); + controller.enqueue(encoder.encode(await input)); + controller.close(); + }, + }), + ); + }); + const encoder = new TextEncoder(); + const decoder = new TextDecoder(); + const conn = await Deno.connect({ port: servePort }); + const w = conn.writable.getWriter(); + const r = conn.readable.getReader(); + + const deferred1 = { input: deferred(), out: deferred() }; + promiseQueue.push(deferred1); + const deferred2 = { input: deferred(), out: deferred() }; + promiseQueue.push(deferred2); + const deferred3 = { input: deferred(), out: deferred() }; + promiseQueue.push(deferred3); + deferred1.input.resolve("#"); + deferred2.input.resolve("$"); + await w.write(encoder.encode(`GET / HTTP/1.1\nConnection: keep-alive\n\n`)); + await w.write(encoder.encode(`GET / HTTP/1.1\nConnection: keep-alive\n\n`)); + + // Fully read two responses + let text = ""; + while (!text.includes("$\r\n")) { + text += decoder.decode((await r.read()).value); + } + + await w.write(encoder.encode(`GET / HTTP/1.1\nConnection: keep-alive\n\n`)); + await deferred3.out; + + // This is half served, so wait for the chunk that has the first '.' + text = ""; + while (!text.includes("1\r\n.\r\n")) { + text += decoder.decode((await r.read()).value); + } + + abort(); + + // This doesn't actually write anything, but we release it after aborting + deferred3.input.resolve("!"); + + // Guarantee: can't connect to an aborted server (though this may not happen immediately) + let failed = false; + for (let i = 0; i < 10; i++) { + try { + const conn = await Deno.connect({ port: servePort }); + conn.close(); + // Give the runtime a few ticks to settle (required for Windows) + await new Promise((r) => setTimeout(r, 2 ** i)); + continue; + } catch (_) { + failed = true; + break; + } + } + assert(failed, "The Deno.serve listener was not disabled promptly"); + + // Guarantee: the pipeline is closed abruptly + assert((await r.read()).done); + + try { + conn.close(); + } catch (_) { + // Ignore + } + await finished; + }, +); + +// When shutting down abruptly, we require that all in-progress connections are aborted, +// no new connections are allowed, and no new transactions are allowed on existing connections. +Deno.test( + { permissions: { net: true } }, + async function httpServerShutdownGracefulGuaranteeHttp11() { + const promiseQueue: { input: Deferred; out: Deferred }[] = []; + const { finished, shutdown } = await makeServer((_req) => { + const { input, out } = promiseQueue.shift()!; + return new Response( + new ReadableStream({ + async start(controller) { + controller.enqueue(new Uint8Array([46])); + out.resolve(undefined); + controller.enqueue(encoder.encode(await input)); + controller.close(); + }, + }), + ); + }); + const encoder = new TextEncoder(); + const decoder = new TextDecoder(); + const conn = await Deno.connect({ port: servePort }); + const w = conn.writable.getWriter(); + const r = conn.readable.getReader(); + + const deferred1 = { input: deferred(), out: deferred() }; + promiseQueue.push(deferred1); + const deferred2 = { input: deferred(), out: deferred() }; + promiseQueue.push(deferred2); + const deferred3 = { input: deferred(), out: deferred() }; + promiseQueue.push(deferred3); + deferred1.input.resolve("#"); + deferred2.input.resolve("$"); + await w.write(encoder.encode(`GET / HTTP/1.1\nConnection: keep-alive\n\n`)); + await w.write(encoder.encode(`GET / HTTP/1.1\nConnection: keep-alive\n\n`)); + + // Fully read two responses + let text = ""; + while (!text.includes("$\r\n")) { + text += decoder.decode((await r.read()).value); + } + + await w.write(encoder.encode(`GET / HTTP/1.1\nConnection: keep-alive\n\n`)); + await deferred3.out; + + // This is half served, so wait for the chunk that has the first '.' + text = ""; + while (!text.includes("1\r\n.\r\n")) { + text += decoder.decode((await r.read()).value); + } + + const shutdownPromise = shutdown(); + + // Release the final response _after_ we shut down + deferred3.input.resolve("!"); + + // Guarantee: can't connect to an aborted server (though this may not happen immediately) + let failed = false; + for (let i = 0; i < 10; i++) { + try { + const conn = await Deno.connect({ port: servePort }); + conn.close(); + // Give the runtime a few ticks to settle (required for Windows) + await new Promise((r) => setTimeout(r, 2 ** i)); + continue; + } catch (_) { + failed = true; + break; + } + } + assert(failed, "The Deno.serve listener was not disabled promptly"); + + // Guarantee: existing connections fully drain + while (!text.includes("!\r\n")) { + text += decoder.decode((await r.read()).value); + } + + await shutdownPromise; + + try { + conn.close(); + } catch (_) { + // Ignore + } + await finished; + }, +); + +// Ensure that resources don't leak during a graceful shutdown +Deno.test( + { permissions: { net: true, write: true, read: true } }, + async function httpServerShutdownGracefulResources() { + const waitForRequest = deferred(); + const { finished, shutdown } = await makeServer(async (_req) => { + waitForRequest.resolve(null); + await new Promise((r) => setTimeout(r, 10)); + return new Response((await makeTempFile(1024 * 1024)).readable); + }); + + const f = fetch(`http://localhost:${servePort}`); + await waitForRequest; + assertEquals((await (await f).text()).length, 1048576); + await shutdown(); + await finished; + }, +); + +// Ensure that resources don't leak during a graceful shutdown +Deno.test( + { permissions: { net: true, write: true, read: true } }, + async function httpServerShutdownGracefulResources2() { + const waitForAbort = deferred(); + const waitForRequest = deferred(); + const { finished, shutdown } = await makeServer(async (_req) => { + waitForRequest.resolve(null); + await waitForAbort; + await new Promise((r) => setTimeout(r, 10)); + return new Response((await makeTempFile(1024 * 1024)).readable); + }); + + const f = fetch(`http://localhost:${servePort}`); + await waitForRequest; + const s = shutdown(); + waitForAbort.resolve(null); + assertEquals((await (await f).text()).length, 1048576); + await s; + await finished; + }, +); + Deno.test( { permissions: { read: true, run: true } }, async function httpServerUnref() { @@ -2459,7 +2689,9 @@ for (const url of ["text", "file", "stream"]) { // Give it a few milliseconds for the serve machinery to work await new Promise((r) => setTimeout(r, 10)); - ac.abort(); + // Since the handler has a chance of creating resources or running async ops, we need to use a + // graceful shutdown here to ensure they have fully drained. + await server.shutdown(); await server.finished; }, }); diff --git a/cli/tsc/dts/lib.deno.unstable.d.ts b/cli/tsc/dts/lib.deno.unstable.d.ts index 6a6d6c8367..c8b857dc67 100644 --- a/cli/tsc/dts/lib.deno.unstable.d.ts +++ b/cli/tsc/dts/lib.deno.unstable.d.ts @@ -1936,6 +1936,17 @@ declare namespace Deno { /** The value of this unsigned 64-bit integer, represented as a bigint. */ readonly value: bigint; } + + /** An instance of the server created using `Deno.serve()` API. + * + * @category HTTP Server + */ + export interface Server { + /** Gracefully close the server. No more new connections will be accepted, + * while pending requests will be allowed to finish. + */ + shutdown(): Promise; + } } /** **UNSTABLE**: New API, yet to be vetted. diff --git a/ext/http/00_serve.js b/ext/http/00_serve.js index 9142058895..aeebca93df 100644 --- a/ext/http/00_serve.js +++ b/ext/http/00_serve.js @@ -4,7 +4,7 @@ const core = globalThis.Deno.core; const primordials = globalThis.__bootstrap.primordials; const internals = globalThis.__bootstrap.internals; -const { BadResourcePrototype } = core; +const { BadResourcePrototype, InterruptedPrototype } = core; import { InnerBody } from "ext:deno_fetch/22_body.js"; import { Event } from "ext:deno_web/02_event.js"; import { @@ -65,6 +65,8 @@ const { op_http_upgrade_websocket_next, op_http_try_wait, op_http_wait, + op_http_cancel, + op_http_close, } = core.ensureFastOps(); const _upgraded = Symbol("_upgraded"); @@ -334,11 +336,15 @@ class CallbackContext { fallbackHost; serverRid; closed; + closing; constructor(signal, args) { + // The abort signal triggers a non-graceful shutdown signal?.addEventListener( "abort", - () => this.close(), + () => { + op_http_cancel(this.serverRid, false); + }, { once: true }, ); this.abortController = new AbortController(); @@ -630,6 +636,9 @@ function serveHttpOn(context, callback) { if (ObjectPrototypeIsPrototypeOf(BadResourcePrototype, error)) { break; } + if (ObjectPrototypeIsPrototypeOf(InterruptedPrototype, error)) { + break; + } throw new Deno.errors.Http(error); } if (req === -1) { @@ -637,10 +646,24 @@ function serveHttpOn(context, callback) { } PromisePrototypeCatch(callback(req), promiseErrorHandler); } + + if (!context.closed && !context.closing) { + context.closed = true; + await op_http_close(rid, false); + context.close(); + } })(); return { finished, + async shutdown() { + if (!context.closed && !context.closing) { + // Shut this HTTP server down gracefully + context.closing = true; + await op_http_close(context.serverRid, true); + context.closed = true; + } + }, then() { throw new Error( "Deno.serve no longer returns a promise. await server.finished instead of server.", diff --git a/ext/http/http_next.rs b/ext/http/http_next.rs index 476a55a804..94f6f1241a 100644 --- a/ext/http/http_next.rs +++ b/ext/http/http_next.rs @@ -10,11 +10,13 @@ use crate::request_properties::HttpPropertyExtractor; use crate::response_body::Compression; use crate::response_body::ResponseBytes; use crate::response_body::ResponseBytesInner; +use crate::slab::http_trace; use crate::slab::slab_drop; use crate::slab::slab_get; use crate::slab::slab_init; use crate::slab::slab_insert; use crate::slab::HttpRequestBodyAutocloser; +use crate::slab::RefCount; use crate::slab::SlabId; use crate::websocket_upgrade::WebSocketUpgrade; use crate::LocalExecutor; @@ -70,6 +72,7 @@ use std::future::Future; use std::io; use std::pin::Pin; use std::rc::Rc; +use std::time::Duration; use tokio::io::AsyncReadExt; use tokio::io::AsyncWriteExt; @@ -690,7 +693,10 @@ pub async fn op_http_track( .resource_table .get::(server_rid)?; - match handle.or_cancel(join_handle.cancel_handle()).await { + match handle + .or_cancel(join_handle.connection_cancel_handle()) + .await + { Ok(true) => Ok(()), Ok(false) => { Err(AnyError::msg("connection closed before message completed")) @@ -705,14 +711,17 @@ pub struct SlabFuture>(SlabId, #[pin] F); pub fn new_slab_future( request: Request, request_info: HttpConnectionProperties, + refcount: RefCount, tx: tokio::sync::mpsc::Sender, ) -> SlabFuture> { - let index = slab_insert(request, request_info); + let index = slab_insert(request, request_info, refcount); let rx = slab_get(index).promise(); SlabFuture(index, async move { if tx.send(index).await.is_ok() { + http_trace!(index, "SlabFuture await"); // We only need to wait for completion if we aren't closed rx.await; + http_trace!(index, "SlabFuture complete"); } }) } @@ -745,45 +754,75 @@ impl> Future for SlabFuture { fn serve_http11_unconditional( io: impl HttpServeStream, svc: impl HttpService + 'static, -) -> impl Future> + 'static { + cancel: Rc, +) -> impl Future> + 'static { let conn = http1::Builder::new() .keep_alive(true) .writev(*USE_WRITEV) - .serve_connection(TokioIo::new(io), svc); + .serve_connection(TokioIo::new(io), svc) + .with_upgrades(); - conn.with_upgrades().map_err(AnyError::from) + async { + match conn.or_abort(cancel).await { + Err(mut conn) => { + Pin::new(&mut conn).graceful_shutdown(); + conn.await + } + Ok(res) => res, + } + } } fn serve_http2_unconditional( io: impl HttpServeStream, svc: impl HttpService + 'static, -) -> impl Future> + 'static { + cancel: Rc, +) -> impl Future> + 'static { let conn = http2::Builder::new(LocalExecutor).serve_connection(TokioIo::new(io), svc); - conn.map_err(AnyError::from) + async { + match conn.or_abort(cancel).await { + Err(mut conn) => { + Pin::new(&mut conn).graceful_shutdown(); + conn.await + } + Ok(res) => res, + } + } } async fn serve_http2_autodetect( io: impl HttpServeStream, svc: impl HttpService + 'static, + cancel: Rc, ) -> Result<(), AnyError> { let prefix = NetworkStreamPrefixCheck::new(io, HTTP2_PREFIX); let (matches, io) = prefix.match_prefix().await?; if matches { - serve_http2_unconditional(io, svc).await + serve_http2_unconditional(io, svc, cancel) + .await + .map_err(|e| e.into()) } else { - serve_http11_unconditional(io, svc).await + serve_http11_unconditional(io, svc, cancel) + .await + .map_err(|e| e.into()) } } fn serve_https( mut io: TlsStream, request_info: HttpConnectionProperties, - cancel: Rc, + lifetime: HttpLifetime, tx: tokio::sync::mpsc::Sender, ) -> JoinHandle> { + let HttpLifetime { + refcount, + connection_cancel_handle, + listen_cancel_handle, + } = lifetime; + let svc = service_fn(move |req: Request| { - new_slab_future(req, request_info.clone(), tx.clone()) + new_slab_future(req, request_info.clone(), refcount.clone(), tx.clone()) }); spawn( async { @@ -792,33 +831,46 @@ fn serve_https( // based on the prefix bytes let handshake = io.get_ref().1.alpn_protocol(); if handshake == Some(TLS_ALPN_HTTP_2) { - serve_http2_unconditional(io, svc).await + serve_http2_unconditional(io, svc, listen_cancel_handle) + .await + .map_err(|e| e.into()) } else if handshake == Some(TLS_ALPN_HTTP_11) { - serve_http11_unconditional(io, svc).await + serve_http11_unconditional(io, svc, listen_cancel_handle) + .await + .map_err(|e| e.into()) } else { - serve_http2_autodetect(io, svc).await + serve_http2_autodetect(io, svc, listen_cancel_handle).await } } - .try_or_cancel(cancel), + .try_or_cancel(connection_cancel_handle), ) } fn serve_http( io: impl HttpServeStream, request_info: HttpConnectionProperties, - cancel: Rc, + lifetime: HttpLifetime, tx: tokio::sync::mpsc::Sender, ) -> JoinHandle> { + let HttpLifetime { + refcount, + connection_cancel_handle, + listen_cancel_handle, + } = lifetime; + let svc = service_fn(move |req: Request| { - new_slab_future(req, request_info.clone(), tx.clone()) + new_slab_future(req, request_info.clone(), refcount.clone(), tx.clone()) }); - spawn(serve_http2_autodetect(io, svc).try_or_cancel(cancel)) + spawn( + serve_http2_autodetect(io, svc, listen_cancel_handle) + .try_or_cancel(connection_cancel_handle), + ) } fn serve_http_on( connection: HTTP::Connection, listen_properties: &HttpListenProperties, - cancel: Rc, + lifetime: HttpLifetime, tx: tokio::sync::mpsc::Sender, ) -> JoinHandle> where @@ -831,28 +883,58 @@ where match network_stream { NetworkStream::Tcp(conn) => { - serve_http(conn, connection_properties, cancel, tx) + serve_http(conn, connection_properties, lifetime, tx) } NetworkStream::Tls(conn) => { - serve_https(conn, connection_properties, cancel, tx) + serve_https(conn, connection_properties, lifetime, tx) } #[cfg(unix)] NetworkStream::Unix(conn) => { - serve_http(conn, connection_properties, cancel, tx) + serve_http(conn, connection_properties, lifetime, tx) } } } -struct HttpJoinHandle( - AsyncRefCell>>>, - // Cancel handle must live in a separate Rc to avoid keeping the outer join handle ref'd - Rc, - AsyncRefCell>, -); +#[derive(Clone)] +struct HttpLifetime { + connection_cancel_handle: Rc, + listen_cancel_handle: Rc, + refcount: RefCount, +} + +struct HttpJoinHandle { + join_handle: AsyncRefCell>>>, + connection_cancel_handle: Rc, + listen_cancel_handle: Rc, + rx: AsyncRefCell>, + refcount: RefCount, +} impl HttpJoinHandle { - fn cancel_handle(self: &Rc) -> Rc { - self.1.clone() + fn new(rx: tokio::sync::mpsc::Receiver) -> Self { + Self { + join_handle: AsyncRefCell::new(None), + connection_cancel_handle: CancelHandle::new_rc(), + listen_cancel_handle: CancelHandle::new_rc(), + rx: AsyncRefCell::new(rx), + refcount: RefCount::default(), + } + } + + fn lifetime(self: &Rc) -> HttpLifetime { + HttpLifetime { + connection_cancel_handle: self.connection_cancel_handle.clone(), + listen_cancel_handle: self.listen_cancel_handle.clone(), + refcount: self.refcount.clone(), + } + } + + fn connection_cancel_handle(self: &Rc) -> Rc { + self.connection_cancel_handle.clone() + } + + fn listen_cancel_handle(self: &Rc) -> Rc { + self.listen_cancel_handle.clone() } } @@ -862,14 +944,17 @@ impl Resource for HttpJoinHandle { } fn close(self: Rc) { - self.1.cancel() + // During a close operation, we cancel everything + self.connection_cancel_handle.cancel(); + self.listen_cancel_handle.cancel(); } } impl Drop for HttpJoinHandle { fn drop(&mut self) { // In some cases we may be dropped without closing, so let's cancel everything on the way out - self.1.cancel(); + self.connection_cancel_handle.cancel(); + self.listen_cancel_handle.cancel(); } } @@ -890,23 +975,21 @@ where let listen_properties = HTTP::listen_properties_from_listener(&listener)?; let (tx, rx) = tokio::sync::mpsc::channel(10); - let resource: Rc = Rc::new(HttpJoinHandle( - AsyncRefCell::new(None), - CancelHandle::new_rc(), - AsyncRefCell::new(rx), - )); - let cancel_clone = resource.cancel_handle(); + let resource: Rc = Rc::new(HttpJoinHandle::new(rx)); + let listen_cancel_clone = resource.listen_cancel_handle(); + + let lifetime = resource.lifetime(); let listen_properties_clone: HttpListenProperties = listen_properties.clone(); let handle = spawn(async move { loop { let conn = HTTP::accept_connection_from_listener(&listener) - .try_or_cancel(cancel_clone.clone()) + .try_or_cancel(listen_cancel_clone.clone()) .await?; serve_http_on::( conn, &listen_properties_clone, - cancel_clone.clone(), + lifetime.clone(), tx.clone(), ); } @@ -915,7 +998,7 @@ where }); // Set the handle after we start the future - *RcRef::map(&resource, |this| &this.0) + *RcRef::map(&resource, |this| &this.join_handle) .try_borrow_mut() .unwrap() = Some(handle); @@ -943,22 +1026,18 @@ where let listen_properties = HTTP::listen_properties_from_connection(&connection)?; let (tx, rx) = tokio::sync::mpsc::channel(10); - let resource: Rc = Rc::new(HttpJoinHandle( - AsyncRefCell::new(None), - CancelHandle::new_rc(), - AsyncRefCell::new(rx), - )); + let resource: Rc = Rc::new(HttpJoinHandle::new(rx)); let handle: JoinHandle> = serve_http_on::( connection, &listen_properties, - resource.cancel_handle(), + resource.lifetime(), tx, ); // Set the handle after we start the future - *RcRef::map(&resource, |this| &this.0) + *RcRef::map(&resource, |this| &this.join_handle) .try_borrow_mut() .unwrap() = Some(handle); @@ -981,7 +1060,7 @@ pub fn op_http_try_wait(state: &mut OpState, #[smi] rid: ResourceId) -> SlabId { // If join handle is somehow locked, just abort. let Some(mut handle) = - RcRef::map(&join_handle, |this| &this.2).try_borrow_mut() + RcRef::map(&join_handle, |this| &this.rx).try_borrow_mut() else { return SlabId::MAX; }; @@ -1006,9 +1085,9 @@ pub async fn op_http_wait( .resource_table .get::(rid)?; - let cancel = join_handle.cancel_handle(); + let cancel = join_handle.listen_cancel_handle(); let next = async { - let mut recv = RcRef::map(&join_handle, |this| &this.2).borrow_mut().await; + let mut recv = RcRef::map(&join_handle, |this| &this.rx).borrow_mut().await; recv.recv().await } .or_cancel(cancel) @@ -1021,19 +1100,13 @@ pub async fn op_http_wait( } // No - we're shutting down - let res = RcRef::map(join_handle, |this| &this.0) + let res = RcRef::map(join_handle, |this| &this.join_handle) .borrow_mut() .await .take() .unwrap() .await?; - // Drop the cancel and join handles - state - .borrow_mut() - .resource_table - .take::(rid)?; - // Filter out shutdown (ENOTCONN) errors if let Err(err) = res { if let Some(err) = err.source() { @@ -1049,6 +1122,63 @@ pub async fn op_http_wait( Ok(SlabId::MAX) } +/// Cancels the HTTP handle. +#[op2(fast)] +pub fn op_http_cancel( + state: &mut OpState, + #[smi] rid: ResourceId, + graceful: bool, +) -> Result<(), AnyError> { + let join_handle = state.resource_table.get::(rid)?; + + if graceful { + // In a graceful shutdown, we close the listener and allow all the remaining connections to drain + join_handle.listen_cancel_handle().cancel(); + } else { + // In a forceful shutdown, we close everything + join_handle.listen_cancel_handle().cancel(); + join_handle.connection_cancel_handle().cancel(); + } + + Ok(()) +} + +#[op2(async)] +pub async fn op_http_close( + state: Rc>, + #[smi] rid: ResourceId, + graceful: bool, +) -> Result<(), AnyError> { + let join_handle = state + .borrow_mut() + .resource_table + .take::(rid)?; + + if graceful { + deno_net::check_unstable2(&state, "Deno.Server.shutdown"); + // In a graceful shutdown, we close the listener and allow all the remaining connections to drain + join_handle.listen_cancel_handle().cancel(); + } else { + // In a forceful shutdown, we close everything + join_handle.listen_cancel_handle().cancel(); + join_handle.connection_cancel_handle().cancel(); + } + + // Async spin on the refcount while we wait for everything to drain + while Rc::strong_count(&join_handle.refcount.0) > 1 { + tokio::time::sleep(Duration::from_millis(10)).await; + } + + let mut join_handle = RcRef::map(&join_handle, |this| &this.join_handle) + .borrow_mut() + .await; + if let Some(join_handle) = join_handle.take() { + join_handle.await??; + } + + Ok(()) +} + struct UpgradeStream { read: AsyncRefCell>, write: AsyncRefCell>, diff --git a/ext/http/lib.rs b/ext/http/lib.rs index 93ea0895e6..719dcd6dee 100644 --- a/ext/http/lib.rs +++ b/ext/http/lib.rs @@ -126,6 +126,8 @@ deno_core::extension!( http_next::op_can_write_vectored, http_next::op_http_try_wait, http_next::op_http_wait, + http_next::op_http_close, + http_next::op_http_cancel, ], esm = ["00_serve.js", "01_http.js"], ); diff --git a/ext/http/slab.rs b/ext/http/slab.rs index 8dd562cc2d..4718aded1e 100644 --- a/ext/http/slab.rs +++ b/ext/http/slab.rs @@ -20,6 +20,10 @@ pub type Request = hyper1::Request; pub type Response = hyper1::Response; pub type SlabId = u32; +#[repr(transparent)] +#[derive(Clone, Default)] +pub struct RefCount(pub Rc<()>); + enum RequestBodyState { Incoming(Incoming), Resource(HttpRequestBodyAutocloser), @@ -50,24 +54,27 @@ pub struct HttpSlabRecord { request_info: HttpConnectionProperties, request_parts: Parts, request_body: Option, - // The response may get taken before we tear this down + /// The response may get taken before we tear this down response: Option, promise: CompletionHandle, trailers: Rc>>, been_dropped: bool, + /// Use a `Rc` to keep track of outstanding requests. We don't use this, but + /// when it drops, it decrements the refcount of the server itself. + refcount: Option, #[cfg(feature = "__zombie_http_tracking")] alive: bool, } thread_local! { - static SLAB: RefCell> = const { RefCell::new(Slab::new()) }; + pub(crate) static SLAB: RefCell> = const { RefCell::new(Slab::new()) }; } macro_rules! http_trace { ($index:expr, $args:tt) => { #[cfg(feature = "__http_tracing")] { - let total = SLAB.with(|x| x.try_borrow().map(|x| x.len())); + let total = $crate::slab::SLAB.with(|x| x.try_borrow().map(|x| x.len())); if let Ok(total) = total { println!("HTTP id={} total={}: {}", $index, total, format!($args)); } else { @@ -77,6 +84,8 @@ macro_rules! http_trace { }; } +pub(crate) use http_trace; + /// Hold a lock on the slab table and a reference to one entry in the table. pub struct SlabEntry( NonNull, @@ -121,6 +130,7 @@ fn slab_insert_raw( request_parts: Parts, request_body: Option, request_info: HttpConnectionProperties, + refcount: RefCount, ) -> SlabId { let index = SLAB.with(|slab| { let mut slab = slab.borrow_mut(); @@ -135,6 +145,7 @@ fn slab_insert_raw( trailers, been_dropped: false, promise: CompletionHandle::default(), + refcount: Some(refcount), #[cfg(feature = "__zombie_http_tracking")] alive: true, }) @@ -146,9 +157,10 @@ fn slab_insert_raw( pub fn slab_insert( request: Request, request_info: HttpConnectionProperties, + refcount: RefCount, ) -> SlabId { let (request_parts, request_body) = request.into_parts(); - slab_insert_raw(request_parts, Some(request_body), request_info) + slab_insert_raw(request_parts, Some(request_body), request_info, refcount) } pub fn slab_drop(index: SlabId) { @@ -159,10 +171,21 @@ pub fn slab_drop(index: SlabId) { !record.been_dropped, "HTTP state error: Entry has already been dropped" ); + + // The logic here is somewhat complicated. A slab record cannot be expunged until it has been dropped by Rust AND + // the promise has been completed (indicating that JavaScript is done processing). However, if Rust has finished + // dealing with this entry, we DO want to clean up some of the associated items -- namely the request body, which + // might include actual resources, and the refcount, which is keeping the server alive. record.been_dropped = true; if record.promise.is_completed() { drop(entry); slab_expunge(index); + } else { + // Take the request body, as the future has been dropped and this will allow some resources to close + record.request_body.take(); + // Take the refcount keeping the server alive. The future is no longer alive, which means this request + // is toast. + record.refcount.take(); } } @@ -318,6 +341,7 @@ mod tests { local_port: None, stream_type: NetworkStreamType::Tcp, }, + RefCount::default(), ); let entry = slab_get(id); entry.complete();