mirror of
https://github.com/denoland/deno.git
synced 2025-01-11 16:42:21 -05:00
fix(ext/http): create a graceful shutdown API (#20387)
This PR implements a graceful shutdown API for Deno.serve, allowing all current connections to drain from the server before shutting down, while preventing new connections from being started or new transactions on existing connections from being created. We split the cancellation handle into two parts: a listener handle, and a connection handle. A graceful shutdown cancels the listener only, while allowing the connections to drain. The connection handle aborts all futures. If the listener handle is cancelled, we put the connections into graceful shutdown mode, which disables keep-alive on http/1.1 and uses http/2 mechanisms for http/2 connections. In addition, we now guarantee that all connections are complete or cancelled, and all resources are cleaned up when the server `finished` promise resolves -- we use a Rust-side server refcount for this. Performance impact: does not appear to affect basic serving performance by more than 1% (~126k -> ~125k) --------- Co-authored-by: Bartek Iwańczuk <biwanczuk@gmail.com>
This commit is contained in:
parent
bfd230fd78
commit
950e0e9cd6
9 changed files with 499 additions and 80 deletions
13
Cargo.lock
generated
13
Cargo.lock
generated
|
@ -922,7 +922,6 @@ dependencies = [
|
|||
"tokio",
|
||||
"tokio-util",
|
||||
"tower-lsp",
|
||||
"tracing",
|
||||
"trust-dns-client",
|
||||
"trust-dns-server",
|
||||
"twox-hash",
|
||||
|
@ -1075,9 +1074,9 @@ dependencies = [
|
|||
|
||||
[[package]]
|
||||
name = "deno_core"
|
||||
version = "0.208.0"
|
||||
version = "0.209.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "aab2b013707b6a1bb1e56404b72a4f68220d0fbe1184133b2b21386a8ffbc5d8"
|
||||
checksum = "c48ff1f83aeeda4b8ed9c101b85380fd2f25a52268130546c610c8e412911d7b"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"bytes",
|
||||
|
@ -1459,9 +1458,9 @@ dependencies = [
|
|||
|
||||
[[package]]
|
||||
name = "deno_ops"
|
||||
version = "0.86.0"
|
||||
version = "0.87.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b116802ace73e3dd910081652789c85aa21f057b9f5936255d786965816fb3b1"
|
||||
checksum = "573a5ae66f76ce159525ab9007433e19d1a074e32c27b17a4753780d659d79fa"
|
||||
dependencies = [
|
||||
"deno-proc-macro-rules",
|
||||
"lazy-regex 2.5.0",
|
||||
|
@ -4619,9 +4618,9 @@ dependencies = [
|
|||
|
||||
[[package]]
|
||||
name = "serde_v8"
|
||||
version = "0.119.0"
|
||||
version = "0.120.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "85efce3bb967c7cd2be8058f7b06047489e0b0888fc25db9e3aa7907370ae45c"
|
||||
checksum = "a5424b4b41a92222abf9ddbdd78f59164f7594422ee4a61fc3704fc8ba608dc6"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"derive_more",
|
||||
|
|
|
@ -40,7 +40,7 @@ repository = "https://github.com/denoland/deno"
|
|||
[workspace.dependencies]
|
||||
deno_ast = { version = "0.29.1", features = ["transpiling"] }
|
||||
|
||||
deno_core = { version = "0.208.0" }
|
||||
deno_core = { version = "0.209.0" }
|
||||
|
||||
deno_runtime = { version = "0.126.0", path = "./runtime" }
|
||||
napi_sym = { version = "0.48.0", path = "./cli/napi/sym" }
|
||||
|
@ -139,7 +139,6 @@ tar = "=0.4.40"
|
|||
tempfile = "3.4.0"
|
||||
termcolor = "1.1.3"
|
||||
thiserror = "1.0.40"
|
||||
tracing = "0"
|
||||
tokio = { version = "1.28.1", features = ["full"] }
|
||||
tokio-metrics = { version = "0.3.0", features = ["rt"] }
|
||||
tokio-rustls = "0.24.0"
|
||||
|
|
|
@ -116,7 +116,6 @@ thiserror.workspace = true
|
|||
tokio.workspace = true
|
||||
tokio-util.workspace = true
|
||||
tower-lsp.workspace = true
|
||||
tracing.workspace = true
|
||||
twox-hash = "=1.6.3"
|
||||
typed-arena = "=2.0.1"
|
||||
uuid = { workspace = true, features = ["serde"] }
|
||||
|
|
|
@ -41,27 +41,257 @@ function onListen<T>(
|
|||
};
|
||||
}
|
||||
|
||||
Deno.test(async function httpServerShutsDownPortBeforeResolving() {
|
||||
async function makeServer(
|
||||
handler: (req: Request) => Response | Promise<Response>,
|
||||
): Promise<
|
||||
{ finished: Promise<void>; abort: () => void; shutdown: () => Promise<void> }
|
||||
> {
|
||||
const ac = new AbortController();
|
||||
const listeningPromise = deferred();
|
||||
|
||||
const server = Deno.serve({
|
||||
handler: (_req) => new Response("ok"),
|
||||
handler,
|
||||
port: servePort,
|
||||
signal: ac.signal,
|
||||
onListen: onListen(listeningPromise),
|
||||
});
|
||||
|
||||
await listeningPromise;
|
||||
assertThrows(() => Deno.listen({ port: servePort }));
|
||||
|
||||
return {
|
||||
finished: server.finished,
|
||||
abort() {
|
||||
ac.abort();
|
||||
await server.finished;
|
||||
},
|
||||
async shutdown() {
|
||||
await server.shutdown();
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
Deno.test(async function httpServerShutsDownPortBeforeResolving() {
|
||||
const { finished, abort } = await makeServer((_req) => new Response("ok"));
|
||||
assertThrows(() => Deno.listen({ port: servePort }));
|
||||
abort();
|
||||
await finished;
|
||||
|
||||
const listener = Deno.listen({ port: servePort });
|
||||
listener!.close();
|
||||
});
|
||||
|
||||
// When shutting down abruptly, we require that all in-progress connections are aborted,
|
||||
// no new connections are allowed, and no new transactions are allowed on existing connections.
|
||||
Deno.test(
|
||||
{ permissions: { net: true } },
|
||||
async function httpServerShutdownAbruptGuaranteeHttp11() {
|
||||
const promiseQueue: { input: Deferred<string>; out: Deferred<void> }[] = [];
|
||||
const { finished, abort } = await makeServer((_req) => {
|
||||
const { input, out } = promiseQueue.shift()!;
|
||||
return new Response(
|
||||
new ReadableStream({
|
||||
async start(controller) {
|
||||
controller.enqueue(new Uint8Array([46]));
|
||||
out.resolve(undefined);
|
||||
controller.enqueue(encoder.encode(await input));
|
||||
controller.close();
|
||||
},
|
||||
}),
|
||||
);
|
||||
});
|
||||
const encoder = new TextEncoder();
|
||||
const decoder = new TextDecoder();
|
||||
const conn = await Deno.connect({ port: servePort });
|
||||
const w = conn.writable.getWriter();
|
||||
const r = conn.readable.getReader();
|
||||
|
||||
const deferred1 = { input: deferred<string>(), out: deferred<void>() };
|
||||
promiseQueue.push(deferred1);
|
||||
const deferred2 = { input: deferred<string>(), out: deferred<void>() };
|
||||
promiseQueue.push(deferred2);
|
||||
const deferred3 = { input: deferred<string>(), out: deferred<void>() };
|
||||
promiseQueue.push(deferred3);
|
||||
deferred1.input.resolve("#");
|
||||
deferred2.input.resolve("$");
|
||||
await w.write(encoder.encode(`GET / HTTP/1.1\nConnection: keep-alive\n\n`));
|
||||
await w.write(encoder.encode(`GET / HTTP/1.1\nConnection: keep-alive\n\n`));
|
||||
|
||||
// Fully read two responses
|
||||
let text = "";
|
||||
while (!text.includes("$\r\n")) {
|
||||
text += decoder.decode((await r.read()).value);
|
||||
}
|
||||
|
||||
await w.write(encoder.encode(`GET / HTTP/1.1\nConnection: keep-alive\n\n`));
|
||||
await deferred3.out;
|
||||
|
||||
// This is half served, so wait for the chunk that has the first '.'
|
||||
text = "";
|
||||
while (!text.includes("1\r\n.\r\n")) {
|
||||
text += decoder.decode((await r.read()).value);
|
||||
}
|
||||
|
||||
abort();
|
||||
|
||||
// This doesn't actually write anything, but we release it after aborting
|
||||
deferred3.input.resolve("!");
|
||||
|
||||
// Guarantee: can't connect to an aborted server (though this may not happen immediately)
|
||||
let failed = false;
|
||||
for (let i = 0; i < 10; i++) {
|
||||
try {
|
||||
const conn = await Deno.connect({ port: servePort });
|
||||
conn.close();
|
||||
// Give the runtime a few ticks to settle (required for Windows)
|
||||
await new Promise((r) => setTimeout(r, 2 ** i));
|
||||
continue;
|
||||
} catch (_) {
|
||||
failed = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
assert(failed, "The Deno.serve listener was not disabled promptly");
|
||||
|
||||
// Guarantee: the pipeline is closed abruptly
|
||||
assert((await r.read()).done);
|
||||
|
||||
try {
|
||||
conn.close();
|
||||
} catch (_) {
|
||||
// Ignore
|
||||
}
|
||||
await finished;
|
||||
},
|
||||
);
|
||||
|
||||
// When shutting down abruptly, we require that all in-progress connections are aborted,
|
||||
// no new connections are allowed, and no new transactions are allowed on existing connections.
|
||||
Deno.test(
|
||||
{ permissions: { net: true } },
|
||||
async function httpServerShutdownGracefulGuaranteeHttp11() {
|
||||
const promiseQueue: { input: Deferred<string>; out: Deferred<void> }[] = [];
|
||||
const { finished, shutdown } = await makeServer((_req) => {
|
||||
const { input, out } = promiseQueue.shift()!;
|
||||
return new Response(
|
||||
new ReadableStream({
|
||||
async start(controller) {
|
||||
controller.enqueue(new Uint8Array([46]));
|
||||
out.resolve(undefined);
|
||||
controller.enqueue(encoder.encode(await input));
|
||||
controller.close();
|
||||
},
|
||||
}),
|
||||
);
|
||||
});
|
||||
const encoder = new TextEncoder();
|
||||
const decoder = new TextDecoder();
|
||||
const conn = await Deno.connect({ port: servePort });
|
||||
const w = conn.writable.getWriter();
|
||||
const r = conn.readable.getReader();
|
||||
|
||||
const deferred1 = { input: deferred<string>(), out: deferred<void>() };
|
||||
promiseQueue.push(deferred1);
|
||||
const deferred2 = { input: deferred<string>(), out: deferred<void>() };
|
||||
promiseQueue.push(deferred2);
|
||||
const deferred3 = { input: deferred<string>(), out: deferred<void>() };
|
||||
promiseQueue.push(deferred3);
|
||||
deferred1.input.resolve("#");
|
||||
deferred2.input.resolve("$");
|
||||
await w.write(encoder.encode(`GET / HTTP/1.1\nConnection: keep-alive\n\n`));
|
||||
await w.write(encoder.encode(`GET / HTTP/1.1\nConnection: keep-alive\n\n`));
|
||||
|
||||
// Fully read two responses
|
||||
let text = "";
|
||||
while (!text.includes("$\r\n")) {
|
||||
text += decoder.decode((await r.read()).value);
|
||||
}
|
||||
|
||||
await w.write(encoder.encode(`GET / HTTP/1.1\nConnection: keep-alive\n\n`));
|
||||
await deferred3.out;
|
||||
|
||||
// This is half served, so wait for the chunk that has the first '.'
|
||||
text = "";
|
||||
while (!text.includes("1\r\n.\r\n")) {
|
||||
text += decoder.decode((await r.read()).value);
|
||||
}
|
||||
|
||||
const shutdownPromise = shutdown();
|
||||
|
||||
// Release the final response _after_ we shut down
|
||||
deferred3.input.resolve("!");
|
||||
|
||||
// Guarantee: can't connect to an aborted server (though this may not happen immediately)
|
||||
let failed = false;
|
||||
for (let i = 0; i < 10; i++) {
|
||||
try {
|
||||
const conn = await Deno.connect({ port: servePort });
|
||||
conn.close();
|
||||
// Give the runtime a few ticks to settle (required for Windows)
|
||||
await new Promise((r) => setTimeout(r, 2 ** i));
|
||||
continue;
|
||||
} catch (_) {
|
||||
failed = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
assert(failed, "The Deno.serve listener was not disabled promptly");
|
||||
|
||||
// Guarantee: existing connections fully drain
|
||||
while (!text.includes("!\r\n")) {
|
||||
text += decoder.decode((await r.read()).value);
|
||||
}
|
||||
|
||||
await shutdownPromise;
|
||||
|
||||
try {
|
||||
conn.close();
|
||||
} catch (_) {
|
||||
// Ignore
|
||||
}
|
||||
await finished;
|
||||
},
|
||||
);
|
||||
|
||||
// Ensure that resources don't leak during a graceful shutdown
|
||||
Deno.test(
|
||||
{ permissions: { net: true, write: true, read: true } },
|
||||
async function httpServerShutdownGracefulResources() {
|
||||
const waitForRequest = deferred();
|
||||
const { finished, shutdown } = await makeServer(async (_req) => {
|
||||
waitForRequest.resolve(null);
|
||||
await new Promise((r) => setTimeout(r, 10));
|
||||
return new Response((await makeTempFile(1024 * 1024)).readable);
|
||||
});
|
||||
|
||||
const f = fetch(`http://localhost:${servePort}`);
|
||||
await waitForRequest;
|
||||
assertEquals((await (await f).text()).length, 1048576);
|
||||
await shutdown();
|
||||
await finished;
|
||||
},
|
||||
);
|
||||
|
||||
// Ensure that resources don't leak during a graceful shutdown
|
||||
Deno.test(
|
||||
{ permissions: { net: true, write: true, read: true } },
|
||||
async function httpServerShutdownGracefulResources2() {
|
||||
const waitForAbort = deferred();
|
||||
const waitForRequest = deferred();
|
||||
const { finished, shutdown } = await makeServer(async (_req) => {
|
||||
waitForRequest.resolve(null);
|
||||
await waitForAbort;
|
||||
await new Promise((r) => setTimeout(r, 10));
|
||||
return new Response((await makeTempFile(1024 * 1024)).readable);
|
||||
});
|
||||
|
||||
const f = fetch(`http://localhost:${servePort}`);
|
||||
await waitForRequest;
|
||||
const s = shutdown();
|
||||
waitForAbort.resolve(null);
|
||||
assertEquals((await (await f).text()).length, 1048576);
|
||||
await s;
|
||||
await finished;
|
||||
},
|
||||
);
|
||||
|
||||
Deno.test(
|
||||
{ permissions: { read: true, run: true } },
|
||||
async function httpServerUnref() {
|
||||
|
@ -2459,7 +2689,9 @@ for (const url of ["text", "file", "stream"]) {
|
|||
// Give it a few milliseconds for the serve machinery to work
|
||||
await new Promise((r) => setTimeout(r, 10));
|
||||
|
||||
ac.abort();
|
||||
// Since the handler has a chance of creating resources or running async ops, we need to use a
|
||||
// graceful shutdown here to ensure they have fully drained.
|
||||
await server.shutdown();
|
||||
await server.finished;
|
||||
},
|
||||
});
|
||||
|
|
11
cli/tsc/dts/lib.deno.unstable.d.ts
vendored
11
cli/tsc/dts/lib.deno.unstable.d.ts
vendored
|
@ -1936,6 +1936,17 @@ declare namespace Deno {
|
|||
/** The value of this unsigned 64-bit integer, represented as a bigint. */
|
||||
readonly value: bigint;
|
||||
}
|
||||
|
||||
/** An instance of the server created using `Deno.serve()` API.
|
||||
*
|
||||
* @category HTTP Server
|
||||
*/
|
||||
export interface Server {
|
||||
/** Gracefully close the server. No more new connections will be accepted,
|
||||
* while pending requests will be allowed to finish.
|
||||
*/
|
||||
shutdown(): Promise<void>;
|
||||
}
|
||||
}
|
||||
|
||||
/** **UNSTABLE**: New API, yet to be vetted.
|
||||
|
|
|
@ -4,7 +4,7 @@ const core = globalThis.Deno.core;
|
|||
const primordials = globalThis.__bootstrap.primordials;
|
||||
const internals = globalThis.__bootstrap.internals;
|
||||
|
||||
const { BadResourcePrototype } = core;
|
||||
const { BadResourcePrototype, InterruptedPrototype } = core;
|
||||
import { InnerBody } from "ext:deno_fetch/22_body.js";
|
||||
import { Event } from "ext:deno_web/02_event.js";
|
||||
import {
|
||||
|
@ -65,6 +65,8 @@ const {
|
|||
op_http_upgrade_websocket_next,
|
||||
op_http_try_wait,
|
||||
op_http_wait,
|
||||
op_http_cancel,
|
||||
op_http_close,
|
||||
} = core.ensureFastOps();
|
||||
const _upgraded = Symbol("_upgraded");
|
||||
|
||||
|
@ -334,11 +336,15 @@ class CallbackContext {
|
|||
fallbackHost;
|
||||
serverRid;
|
||||
closed;
|
||||
closing;
|
||||
|
||||
constructor(signal, args) {
|
||||
// The abort signal triggers a non-graceful shutdown
|
||||
signal?.addEventListener(
|
||||
"abort",
|
||||
() => this.close(),
|
||||
() => {
|
||||
op_http_cancel(this.serverRid, false);
|
||||
},
|
||||
{ once: true },
|
||||
);
|
||||
this.abortController = new AbortController();
|
||||
|
@ -630,6 +636,9 @@ function serveHttpOn(context, callback) {
|
|||
if (ObjectPrototypeIsPrototypeOf(BadResourcePrototype, error)) {
|
||||
break;
|
||||
}
|
||||
if (ObjectPrototypeIsPrototypeOf(InterruptedPrototype, error)) {
|
||||
break;
|
||||
}
|
||||
throw new Deno.errors.Http(error);
|
||||
}
|
||||
if (req === -1) {
|
||||
|
@ -637,10 +646,24 @@ function serveHttpOn(context, callback) {
|
|||
}
|
||||
PromisePrototypeCatch(callback(req), promiseErrorHandler);
|
||||
}
|
||||
|
||||
if (!context.closed && !context.closing) {
|
||||
context.closed = true;
|
||||
await op_http_close(rid, false);
|
||||
context.close();
|
||||
}
|
||||
})();
|
||||
|
||||
return {
|
||||
finished,
|
||||
async shutdown() {
|
||||
if (!context.closed && !context.closing) {
|
||||
// Shut this HTTP server down gracefully
|
||||
context.closing = true;
|
||||
await op_http_close(context.serverRid, true);
|
||||
context.closed = true;
|
||||
}
|
||||
},
|
||||
then() {
|
||||
throw new Error(
|
||||
"Deno.serve no longer returns a promise. await server.finished instead of server.",
|
||||
|
|
|
@ -10,11 +10,13 @@ use crate::request_properties::HttpPropertyExtractor;
|
|||
use crate::response_body::Compression;
|
||||
use crate::response_body::ResponseBytes;
|
||||
use crate::response_body::ResponseBytesInner;
|
||||
use crate::slab::http_trace;
|
||||
use crate::slab::slab_drop;
|
||||
use crate::slab::slab_get;
|
||||
use crate::slab::slab_init;
|
||||
use crate::slab::slab_insert;
|
||||
use crate::slab::HttpRequestBodyAutocloser;
|
||||
use crate::slab::RefCount;
|
||||
use crate::slab::SlabId;
|
||||
use crate::websocket_upgrade::WebSocketUpgrade;
|
||||
use crate::LocalExecutor;
|
||||
|
@ -70,6 +72,7 @@ use std::future::Future;
|
|||
use std::io;
|
||||
use std::pin::Pin;
|
||||
use std::rc::Rc;
|
||||
use std::time::Duration;
|
||||
|
||||
use tokio::io::AsyncReadExt;
|
||||
use tokio::io::AsyncWriteExt;
|
||||
|
@ -690,7 +693,10 @@ pub async fn op_http_track(
|
|||
.resource_table
|
||||
.get::<HttpJoinHandle>(server_rid)?;
|
||||
|
||||
match handle.or_cancel(join_handle.cancel_handle()).await {
|
||||
match handle
|
||||
.or_cancel(join_handle.connection_cancel_handle())
|
||||
.await
|
||||
{
|
||||
Ok(true) => Ok(()),
|
||||
Ok(false) => {
|
||||
Err(AnyError::msg("connection closed before message completed"))
|
||||
|
@ -705,14 +711,17 @@ pub struct SlabFuture<F: Future<Output = ()>>(SlabId, #[pin] F);
|
|||
pub fn new_slab_future(
|
||||
request: Request,
|
||||
request_info: HttpConnectionProperties,
|
||||
refcount: RefCount,
|
||||
tx: tokio::sync::mpsc::Sender<SlabId>,
|
||||
) -> SlabFuture<impl Future<Output = ()>> {
|
||||
let index = slab_insert(request, request_info);
|
||||
let index = slab_insert(request, request_info, refcount);
|
||||
let rx = slab_get(index).promise();
|
||||
SlabFuture(index, async move {
|
||||
if tx.send(index).await.is_ok() {
|
||||
http_trace!(index, "SlabFuture await");
|
||||
// We only need to wait for completion if we aren't closed
|
||||
rx.await;
|
||||
http_trace!(index, "SlabFuture complete");
|
||||
}
|
||||
})
|
||||
}
|
||||
|
@ -745,45 +754,75 @@ impl<F: Future<Output = ()>> Future for SlabFuture<F> {
|
|||
fn serve_http11_unconditional(
|
||||
io: impl HttpServeStream,
|
||||
svc: impl HttpService<Incoming, ResBody = ResponseBytes> + 'static,
|
||||
) -> impl Future<Output = Result<(), AnyError>> + 'static {
|
||||
cancel: Rc<CancelHandle>,
|
||||
) -> impl Future<Output = Result<(), hyper1::Error>> + 'static {
|
||||
let conn = http1::Builder::new()
|
||||
.keep_alive(true)
|
||||
.writev(*USE_WRITEV)
|
||||
.serve_connection(TokioIo::new(io), svc);
|
||||
.serve_connection(TokioIo::new(io), svc)
|
||||
.with_upgrades();
|
||||
|
||||
conn.with_upgrades().map_err(AnyError::from)
|
||||
async {
|
||||
match conn.or_abort(cancel).await {
|
||||
Err(mut conn) => {
|
||||
Pin::new(&mut conn).graceful_shutdown();
|
||||
conn.await
|
||||
}
|
||||
Ok(res) => res,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn serve_http2_unconditional(
|
||||
io: impl HttpServeStream,
|
||||
svc: impl HttpService<Incoming, ResBody = ResponseBytes> + 'static,
|
||||
) -> impl Future<Output = Result<(), AnyError>> + 'static {
|
||||
cancel: Rc<CancelHandle>,
|
||||
) -> impl Future<Output = Result<(), hyper1::Error>> + 'static {
|
||||
let conn =
|
||||
http2::Builder::new(LocalExecutor).serve_connection(TokioIo::new(io), svc);
|
||||
conn.map_err(AnyError::from)
|
||||
async {
|
||||
match conn.or_abort(cancel).await {
|
||||
Err(mut conn) => {
|
||||
Pin::new(&mut conn).graceful_shutdown();
|
||||
conn.await
|
||||
}
|
||||
Ok(res) => res,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn serve_http2_autodetect(
|
||||
io: impl HttpServeStream,
|
||||
svc: impl HttpService<Incoming, ResBody = ResponseBytes> + 'static,
|
||||
cancel: Rc<CancelHandle>,
|
||||
) -> Result<(), AnyError> {
|
||||
let prefix = NetworkStreamPrefixCheck::new(io, HTTP2_PREFIX);
|
||||
let (matches, io) = prefix.match_prefix().await?;
|
||||
if matches {
|
||||
serve_http2_unconditional(io, svc).await
|
||||
serve_http2_unconditional(io, svc, cancel)
|
||||
.await
|
||||
.map_err(|e| e.into())
|
||||
} else {
|
||||
serve_http11_unconditional(io, svc).await
|
||||
serve_http11_unconditional(io, svc, cancel)
|
||||
.await
|
||||
.map_err(|e| e.into())
|
||||
}
|
||||
}
|
||||
|
||||
fn serve_https(
|
||||
mut io: TlsStream,
|
||||
request_info: HttpConnectionProperties,
|
||||
cancel: Rc<CancelHandle>,
|
||||
lifetime: HttpLifetime,
|
||||
tx: tokio::sync::mpsc::Sender<SlabId>,
|
||||
) -> JoinHandle<Result<(), AnyError>> {
|
||||
let HttpLifetime {
|
||||
refcount,
|
||||
connection_cancel_handle,
|
||||
listen_cancel_handle,
|
||||
} = lifetime;
|
||||
|
||||
let svc = service_fn(move |req: Request| {
|
||||
new_slab_future(req, request_info.clone(), tx.clone())
|
||||
new_slab_future(req, request_info.clone(), refcount.clone(), tx.clone())
|
||||
});
|
||||
spawn(
|
||||
async {
|
||||
|
@ -792,33 +831,46 @@ fn serve_https(
|
|||
// based on the prefix bytes
|
||||
let handshake = io.get_ref().1.alpn_protocol();
|
||||
if handshake == Some(TLS_ALPN_HTTP_2) {
|
||||
serve_http2_unconditional(io, svc).await
|
||||
serve_http2_unconditional(io, svc, listen_cancel_handle)
|
||||
.await
|
||||
.map_err(|e| e.into())
|
||||
} else if handshake == Some(TLS_ALPN_HTTP_11) {
|
||||
serve_http11_unconditional(io, svc).await
|
||||
serve_http11_unconditional(io, svc, listen_cancel_handle)
|
||||
.await
|
||||
.map_err(|e| e.into())
|
||||
} else {
|
||||
serve_http2_autodetect(io, svc).await
|
||||
serve_http2_autodetect(io, svc, listen_cancel_handle).await
|
||||
}
|
||||
}
|
||||
.try_or_cancel(cancel),
|
||||
.try_or_cancel(connection_cancel_handle),
|
||||
)
|
||||
}
|
||||
|
||||
fn serve_http(
|
||||
io: impl HttpServeStream,
|
||||
request_info: HttpConnectionProperties,
|
||||
cancel: Rc<CancelHandle>,
|
||||
lifetime: HttpLifetime,
|
||||
tx: tokio::sync::mpsc::Sender<SlabId>,
|
||||
) -> JoinHandle<Result<(), AnyError>> {
|
||||
let HttpLifetime {
|
||||
refcount,
|
||||
connection_cancel_handle,
|
||||
listen_cancel_handle,
|
||||
} = lifetime;
|
||||
|
||||
let svc = service_fn(move |req: Request| {
|
||||
new_slab_future(req, request_info.clone(), tx.clone())
|
||||
new_slab_future(req, request_info.clone(), refcount.clone(), tx.clone())
|
||||
});
|
||||
spawn(serve_http2_autodetect(io, svc).try_or_cancel(cancel))
|
||||
spawn(
|
||||
serve_http2_autodetect(io, svc, listen_cancel_handle)
|
||||
.try_or_cancel(connection_cancel_handle),
|
||||
)
|
||||
}
|
||||
|
||||
fn serve_http_on<HTTP>(
|
||||
connection: HTTP::Connection,
|
||||
listen_properties: &HttpListenProperties,
|
||||
cancel: Rc<CancelHandle>,
|
||||
lifetime: HttpLifetime,
|
||||
tx: tokio::sync::mpsc::Sender<SlabId>,
|
||||
) -> JoinHandle<Result<(), AnyError>>
|
||||
where
|
||||
|
@ -831,28 +883,58 @@ where
|
|||
|
||||
match network_stream {
|
||||
NetworkStream::Tcp(conn) => {
|
||||
serve_http(conn, connection_properties, cancel, tx)
|
||||
serve_http(conn, connection_properties, lifetime, tx)
|
||||
}
|
||||
NetworkStream::Tls(conn) => {
|
||||
serve_https(conn, connection_properties, cancel, tx)
|
||||
serve_https(conn, connection_properties, lifetime, tx)
|
||||
}
|
||||
#[cfg(unix)]
|
||||
NetworkStream::Unix(conn) => {
|
||||
serve_http(conn, connection_properties, cancel, tx)
|
||||
serve_http(conn, connection_properties, lifetime, tx)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct HttpJoinHandle(
|
||||
AsyncRefCell<Option<JoinHandle<Result<(), AnyError>>>>,
|
||||
// Cancel handle must live in a separate Rc to avoid keeping the outer join handle ref'd
|
||||
Rc<CancelHandle>,
|
||||
AsyncRefCell<tokio::sync::mpsc::Receiver<SlabId>>,
|
||||
);
|
||||
#[derive(Clone)]
|
||||
struct HttpLifetime {
|
||||
connection_cancel_handle: Rc<CancelHandle>,
|
||||
listen_cancel_handle: Rc<CancelHandle>,
|
||||
refcount: RefCount,
|
||||
}
|
||||
|
||||
struct HttpJoinHandle {
|
||||
join_handle: AsyncRefCell<Option<JoinHandle<Result<(), AnyError>>>>,
|
||||
connection_cancel_handle: Rc<CancelHandle>,
|
||||
listen_cancel_handle: Rc<CancelHandle>,
|
||||
rx: AsyncRefCell<tokio::sync::mpsc::Receiver<SlabId>>,
|
||||
refcount: RefCount,
|
||||
}
|
||||
|
||||
impl HttpJoinHandle {
|
||||
fn cancel_handle(self: &Rc<Self>) -> Rc<CancelHandle> {
|
||||
self.1.clone()
|
||||
fn new(rx: tokio::sync::mpsc::Receiver<SlabId>) -> Self {
|
||||
Self {
|
||||
join_handle: AsyncRefCell::new(None),
|
||||
connection_cancel_handle: CancelHandle::new_rc(),
|
||||
listen_cancel_handle: CancelHandle::new_rc(),
|
||||
rx: AsyncRefCell::new(rx),
|
||||
refcount: RefCount::default(),
|
||||
}
|
||||
}
|
||||
|
||||
fn lifetime(self: &Rc<Self>) -> HttpLifetime {
|
||||
HttpLifetime {
|
||||
connection_cancel_handle: self.connection_cancel_handle.clone(),
|
||||
listen_cancel_handle: self.listen_cancel_handle.clone(),
|
||||
refcount: self.refcount.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
fn connection_cancel_handle(self: &Rc<Self>) -> Rc<CancelHandle> {
|
||||
self.connection_cancel_handle.clone()
|
||||
}
|
||||
|
||||
fn listen_cancel_handle(self: &Rc<Self>) -> Rc<CancelHandle> {
|
||||
self.listen_cancel_handle.clone()
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -862,14 +944,17 @@ impl Resource for HttpJoinHandle {
|
|||
}
|
||||
|
||||
fn close(self: Rc<Self>) {
|
||||
self.1.cancel()
|
||||
// During a close operation, we cancel everything
|
||||
self.connection_cancel_handle.cancel();
|
||||
self.listen_cancel_handle.cancel();
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for HttpJoinHandle {
|
||||
fn drop(&mut self) {
|
||||
// In some cases we may be dropped without closing, so let's cancel everything on the way out
|
||||
self.1.cancel();
|
||||
self.connection_cancel_handle.cancel();
|
||||
self.listen_cancel_handle.cancel();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -890,23 +975,21 @@ where
|
|||
let listen_properties = HTTP::listen_properties_from_listener(&listener)?;
|
||||
|
||||
let (tx, rx) = tokio::sync::mpsc::channel(10);
|
||||
let resource: Rc<HttpJoinHandle> = Rc::new(HttpJoinHandle(
|
||||
AsyncRefCell::new(None),
|
||||
CancelHandle::new_rc(),
|
||||
AsyncRefCell::new(rx),
|
||||
));
|
||||
let cancel_clone = resource.cancel_handle();
|
||||
let resource: Rc<HttpJoinHandle> = Rc::new(HttpJoinHandle::new(rx));
|
||||
let listen_cancel_clone = resource.listen_cancel_handle();
|
||||
|
||||
let lifetime = resource.lifetime();
|
||||
|
||||
let listen_properties_clone: HttpListenProperties = listen_properties.clone();
|
||||
let handle = spawn(async move {
|
||||
loop {
|
||||
let conn = HTTP::accept_connection_from_listener(&listener)
|
||||
.try_or_cancel(cancel_clone.clone())
|
||||
.try_or_cancel(listen_cancel_clone.clone())
|
||||
.await?;
|
||||
serve_http_on::<HTTP>(
|
||||
conn,
|
||||
&listen_properties_clone,
|
||||
cancel_clone.clone(),
|
||||
lifetime.clone(),
|
||||
tx.clone(),
|
||||
);
|
||||
}
|
||||
|
@ -915,7 +998,7 @@ where
|
|||
});
|
||||
|
||||
// Set the handle after we start the future
|
||||
*RcRef::map(&resource, |this| &this.0)
|
||||
*RcRef::map(&resource, |this| &this.join_handle)
|
||||
.try_borrow_mut()
|
||||
.unwrap() = Some(handle);
|
||||
|
||||
|
@ -943,22 +1026,18 @@ where
|
|||
let listen_properties = HTTP::listen_properties_from_connection(&connection)?;
|
||||
|
||||
let (tx, rx) = tokio::sync::mpsc::channel(10);
|
||||
let resource: Rc<HttpJoinHandle> = Rc::new(HttpJoinHandle(
|
||||
AsyncRefCell::new(None),
|
||||
CancelHandle::new_rc(),
|
||||
AsyncRefCell::new(rx),
|
||||
));
|
||||
let resource: Rc<HttpJoinHandle> = Rc::new(HttpJoinHandle::new(rx));
|
||||
|
||||
let handle: JoinHandle<Result<(), deno_core::anyhow::Error>> =
|
||||
serve_http_on::<HTTP>(
|
||||
connection,
|
||||
&listen_properties,
|
||||
resource.cancel_handle(),
|
||||
resource.lifetime(),
|
||||
tx,
|
||||
);
|
||||
|
||||
// Set the handle after we start the future
|
||||
*RcRef::map(&resource, |this| &this.0)
|
||||
*RcRef::map(&resource, |this| &this.join_handle)
|
||||
.try_borrow_mut()
|
||||
.unwrap() = Some(handle);
|
||||
|
||||
|
@ -981,7 +1060,7 @@ pub fn op_http_try_wait(state: &mut OpState, #[smi] rid: ResourceId) -> SlabId {
|
|||
|
||||
// If join handle is somehow locked, just abort.
|
||||
let Some(mut handle) =
|
||||
RcRef::map(&join_handle, |this| &this.2).try_borrow_mut()
|
||||
RcRef::map(&join_handle, |this| &this.rx).try_borrow_mut()
|
||||
else {
|
||||
return SlabId::MAX;
|
||||
};
|
||||
|
@ -1006,9 +1085,9 @@ pub async fn op_http_wait(
|
|||
.resource_table
|
||||
.get::<HttpJoinHandle>(rid)?;
|
||||
|
||||
let cancel = join_handle.cancel_handle();
|
||||
let cancel = join_handle.listen_cancel_handle();
|
||||
let next = async {
|
||||
let mut recv = RcRef::map(&join_handle, |this| &this.2).borrow_mut().await;
|
||||
let mut recv = RcRef::map(&join_handle, |this| &this.rx).borrow_mut().await;
|
||||
recv.recv().await
|
||||
}
|
||||
.or_cancel(cancel)
|
||||
|
@ -1021,19 +1100,13 @@ pub async fn op_http_wait(
|
|||
}
|
||||
|
||||
// No - we're shutting down
|
||||
let res = RcRef::map(join_handle, |this| &this.0)
|
||||
let res = RcRef::map(join_handle, |this| &this.join_handle)
|
||||
.borrow_mut()
|
||||
.await
|
||||
.take()
|
||||
.unwrap()
|
||||
.await?;
|
||||
|
||||
// Drop the cancel and join handles
|
||||
state
|
||||
.borrow_mut()
|
||||
.resource_table
|
||||
.take::<HttpJoinHandle>(rid)?;
|
||||
|
||||
// Filter out shutdown (ENOTCONN) errors
|
||||
if let Err(err) = res {
|
||||
if let Some(err) = err.source() {
|
||||
|
@ -1049,6 +1122,63 @@ pub async fn op_http_wait(
|
|||
Ok(SlabId::MAX)
|
||||
}
|
||||
|
||||
/// Cancels the HTTP handle.
|
||||
#[op2(fast)]
|
||||
pub fn op_http_cancel(
|
||||
state: &mut OpState,
|
||||
#[smi] rid: ResourceId,
|
||||
graceful: bool,
|
||||
) -> Result<(), AnyError> {
|
||||
let join_handle = state.resource_table.get::<HttpJoinHandle>(rid)?;
|
||||
|
||||
if graceful {
|
||||
// In a graceful shutdown, we close the listener and allow all the remaining connections to drain
|
||||
join_handle.listen_cancel_handle().cancel();
|
||||
} else {
|
||||
// In a forceful shutdown, we close everything
|
||||
join_handle.listen_cancel_handle().cancel();
|
||||
join_handle.connection_cancel_handle().cancel();
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[op2(async)]
|
||||
pub async fn op_http_close(
|
||||
state: Rc<RefCell<OpState>>,
|
||||
#[smi] rid: ResourceId,
|
||||
graceful: bool,
|
||||
) -> Result<(), AnyError> {
|
||||
let join_handle = state
|
||||
.borrow_mut()
|
||||
.resource_table
|
||||
.take::<HttpJoinHandle>(rid)?;
|
||||
|
||||
if graceful {
|
||||
deno_net::check_unstable2(&state, "Deno.Server.shutdown");
|
||||
// In a graceful shutdown, we close the listener and allow all the remaining connections to drain
|
||||
join_handle.listen_cancel_handle().cancel();
|
||||
} else {
|
||||
// In a forceful shutdown, we close everything
|
||||
join_handle.listen_cancel_handle().cancel();
|
||||
join_handle.connection_cancel_handle().cancel();
|
||||
}
|
||||
|
||||
// Async spin on the refcount while we wait for everything to drain
|
||||
while Rc::strong_count(&join_handle.refcount.0) > 1 {
|
||||
tokio::time::sleep(Duration::from_millis(10)).await;
|
||||
}
|
||||
|
||||
let mut join_handle = RcRef::map(&join_handle, |this| &this.join_handle)
|
||||
.borrow_mut()
|
||||
.await;
|
||||
if let Some(join_handle) = join_handle.take() {
|
||||
join_handle.await??;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
struct UpgradeStream {
|
||||
read: AsyncRefCell<tokio::io::ReadHalf<tokio::io::DuplexStream>>,
|
||||
write: AsyncRefCell<tokio::io::WriteHalf<tokio::io::DuplexStream>>,
|
||||
|
|
|
@ -126,6 +126,8 @@ deno_core::extension!(
|
|||
http_next::op_can_write_vectored,
|
||||
http_next::op_http_try_wait,
|
||||
http_next::op_http_wait,
|
||||
http_next::op_http_close,
|
||||
http_next::op_http_cancel,
|
||||
],
|
||||
esm = ["00_serve.js", "01_http.js"],
|
||||
);
|
||||
|
|
|
@ -20,6 +20,10 @@ pub type Request = hyper1::Request<Incoming>;
|
|||
pub type Response = hyper1::Response<ResponseBytes>;
|
||||
pub type SlabId = u32;
|
||||
|
||||
#[repr(transparent)]
|
||||
#[derive(Clone, Default)]
|
||||
pub struct RefCount(pub Rc<()>);
|
||||
|
||||
enum RequestBodyState {
|
||||
Incoming(Incoming),
|
||||
Resource(HttpRequestBodyAutocloser),
|
||||
|
@ -50,24 +54,27 @@ pub struct HttpSlabRecord {
|
|||
request_info: HttpConnectionProperties,
|
||||
request_parts: Parts,
|
||||
request_body: Option<RequestBodyState>,
|
||||
// The response may get taken before we tear this down
|
||||
/// The response may get taken before we tear this down
|
||||
response: Option<Response>,
|
||||
promise: CompletionHandle,
|
||||
trailers: Rc<RefCell<Option<HeaderMap>>>,
|
||||
been_dropped: bool,
|
||||
/// Use a `Rc` to keep track of outstanding requests. We don't use this, but
|
||||
/// when it drops, it decrements the refcount of the server itself.
|
||||
refcount: Option<RefCount>,
|
||||
#[cfg(feature = "__zombie_http_tracking")]
|
||||
alive: bool,
|
||||
}
|
||||
|
||||
thread_local! {
|
||||
static SLAB: RefCell<Slab<HttpSlabRecord>> = const { RefCell::new(Slab::new()) };
|
||||
pub(crate) static SLAB: RefCell<Slab<HttpSlabRecord>> = const { RefCell::new(Slab::new()) };
|
||||
}
|
||||
|
||||
macro_rules! http_trace {
|
||||
($index:expr, $args:tt) => {
|
||||
#[cfg(feature = "__http_tracing")]
|
||||
{
|
||||
let total = SLAB.with(|x| x.try_borrow().map(|x| x.len()));
|
||||
let total = $crate::slab::SLAB.with(|x| x.try_borrow().map(|x| x.len()));
|
||||
if let Ok(total) = total {
|
||||
println!("HTTP id={} total={}: {}", $index, total, format!($args));
|
||||
} else {
|
||||
|
@ -77,6 +84,8 @@ macro_rules! http_trace {
|
|||
};
|
||||
}
|
||||
|
||||
pub(crate) use http_trace;
|
||||
|
||||
/// Hold a lock on the slab table and a reference to one entry in the table.
|
||||
pub struct SlabEntry(
|
||||
NonNull<HttpSlabRecord>,
|
||||
|
@ -121,6 +130,7 @@ fn slab_insert_raw(
|
|||
request_parts: Parts,
|
||||
request_body: Option<Incoming>,
|
||||
request_info: HttpConnectionProperties,
|
||||
refcount: RefCount,
|
||||
) -> SlabId {
|
||||
let index = SLAB.with(|slab| {
|
||||
let mut slab = slab.borrow_mut();
|
||||
|
@ -135,6 +145,7 @@ fn slab_insert_raw(
|
|||
trailers,
|
||||
been_dropped: false,
|
||||
promise: CompletionHandle::default(),
|
||||
refcount: Some(refcount),
|
||||
#[cfg(feature = "__zombie_http_tracking")]
|
||||
alive: true,
|
||||
})
|
||||
|
@ -146,9 +157,10 @@ fn slab_insert_raw(
|
|||
pub fn slab_insert(
|
||||
request: Request,
|
||||
request_info: HttpConnectionProperties,
|
||||
refcount: RefCount,
|
||||
) -> SlabId {
|
||||
let (request_parts, request_body) = request.into_parts();
|
||||
slab_insert_raw(request_parts, Some(request_body), request_info)
|
||||
slab_insert_raw(request_parts, Some(request_body), request_info, refcount)
|
||||
}
|
||||
|
||||
pub fn slab_drop(index: SlabId) {
|
||||
|
@ -159,10 +171,21 @@ pub fn slab_drop(index: SlabId) {
|
|||
!record.been_dropped,
|
||||
"HTTP state error: Entry has already been dropped"
|
||||
);
|
||||
|
||||
// The logic here is somewhat complicated. A slab record cannot be expunged until it has been dropped by Rust AND
|
||||
// the promise has been completed (indicating that JavaScript is done processing). However, if Rust has finished
|
||||
// dealing with this entry, we DO want to clean up some of the associated items -- namely the request body, which
|
||||
// might include actual resources, and the refcount, which is keeping the server alive.
|
||||
record.been_dropped = true;
|
||||
if record.promise.is_completed() {
|
||||
drop(entry);
|
||||
slab_expunge(index);
|
||||
} else {
|
||||
// Take the request body, as the future has been dropped and this will allow some resources to close
|
||||
record.request_body.take();
|
||||
// Take the refcount keeping the server alive. The future is no longer alive, which means this request
|
||||
// is toast.
|
||||
record.refcount.take();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -318,6 +341,7 @@ mod tests {
|
|||
local_port: None,
|
||||
stream_type: NetworkStreamType::Tcp,
|
||||
},
|
||||
RefCount::default(),
|
||||
);
|
||||
let entry = slab_get(id);
|
||||
entry.complete();
|
||||
|
|
Loading…
Reference in a new issue