1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2025-01-07 06:46:59 -05:00

fix(ext/http): Ensure Deno.serve works across --watch restarts (#18998)

Fixes #16699 and #18960 by ensuring that we release our HTTP
`spawn_local` tasks when the HTTP resource is dropped.

Because our cancel handle was being projected from the resource via
`RcMap`, the resource was never `Drop`ped. By splitting the handle out
into its own `Rc`, we can avoid keeping the resource alive and let it
drop to cancel everything.
This commit is contained in:
Matt Mastracci 2023-05-08 09:52:56 +02:00 committed by David Sherret
parent 0dbbb34fcc
commit 5e848f0d1b
4 changed files with 88 additions and 35 deletions

View file

@ -1371,6 +1371,46 @@ async fn run_watch_reload_once() {
check_alive_then_kill(child); check_alive_then_kill(child);
} }
/// Regression test for https://github.com/denoland/deno/issues/18960. Ensures that Deno.serve
/// operates properly after a watch restart.
#[tokio::test]
async fn test_watch_serve() {
let t = TempDir::new();
let file_to_watch = t.path().join("file_to_watch.js");
let file_content = r#"
console.error("serving");
await Deno.serve({port: 4600, handler: () => new Response("hello")});
"#;
write(&file_to_watch, file_content).unwrap();
let mut child = util::deno_cmd()
.current_dir(util::testdata_path())
.arg("run")
.arg("--watch")
.arg("--unstable")
.arg("--allow-net")
.arg("-L")
.arg("debug")
.arg(&file_to_watch)
.env("NO_COLOR", "1")
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
.spawn()
.unwrap();
let (mut stdout_lines, mut stderr_lines) = child_lines(&mut child);
wait_contains("Listening on", &mut stdout_lines).await;
// Note that we start serving very quickly, so we specifically want to wait for this message
wait_contains(r#"Watching paths: [""#, &mut stderr_lines).await;
write(&file_to_watch, file_content).unwrap();
wait_contains("serving", &mut stderr_lines).await;
wait_contains("Listening on", &mut stdout_lines).await;
check_alive_then_kill(child);
}
#[tokio::test] #[tokio::test]
async fn run_watch_dynamic_imports() { async fn run_watch_dynamic_imports() {
let t = TempDir::new(); let t = TempDir::new();

View file

@ -94,8 +94,9 @@ Deno.test(async function httpServerRejectsOnAddrInUse() {
onListen: onListen(listeningPromise), onListen: onListen(listeningPromise),
onError: createOnErrorCb(ac), onError: createOnErrorCb(ac),
}); });
await listeningPromise;
assertRejects( await assertRejects(
() => () =>
Deno.serve({ Deno.serve({
handler: (_req) => new Response("ok"), handler: (_req) => new Response("ok"),

View file

@ -304,6 +304,13 @@ where
} }
loop { loop {
// We may need to give the runtime a tick to settle, as cancellations may need to propagate
// to tasks. We choose yielding 10 times to the runtime as a decent heuristic. If watch tests
// start to fail, this may need to be increased.
for _ in 0..10 {
tokio::task::yield_now().await;
}
let mut watcher = new_watcher(watcher_sender.clone())?; let mut watcher = new_watcher(watcher_sender.clone())?;
consume_paths_to_watch(&mut watcher, &mut paths_to_watch_receiver); consume_paths_to_watch(&mut watcher, &mut paths_to_watch_receiver);

View file

@ -625,83 +625,80 @@ impl<F: Future<Output = ()>> Future for SlabFuture<F> {
fn serve_http11_unconditional( fn serve_http11_unconditional(
io: impl HttpServeStream, io: impl HttpServeStream,
svc: impl HttpService<Incoming, ResBody = ResponseBytes> + 'static, svc: impl HttpService<Incoming, ResBody = ResponseBytes> + 'static,
cancel: RcRef<CancelHandle>,
) -> impl Future<Output = Result<(), AnyError>> + 'static { ) -> impl Future<Output = Result<(), AnyError>> + 'static {
let conn = http1::Builder::new() let conn = http1::Builder::new()
.keep_alive(true) .keep_alive(true)
.serve_connection(io, svc); .serve_connection(io, svc);
conn conn.with_upgrades().map_err(AnyError::from)
.with_upgrades()
.map_err(AnyError::from)
.try_or_cancel(cancel)
} }
fn serve_http2_unconditional( fn serve_http2_unconditional(
io: impl HttpServeStream, io: impl HttpServeStream,
svc: impl HttpService<Incoming, ResBody = ResponseBytes> + 'static, svc: impl HttpService<Incoming, ResBody = ResponseBytes> + 'static,
cancel: RcRef<CancelHandle>,
) -> impl Future<Output = Result<(), AnyError>> + 'static { ) -> impl Future<Output = Result<(), AnyError>> + 'static {
let conn = http2::Builder::new(LocalExecutor).serve_connection(io, svc); let conn = http2::Builder::new(LocalExecutor).serve_connection(io, svc);
conn.map_err(AnyError::from).try_or_cancel(cancel) conn.map_err(AnyError::from)
} }
async fn serve_http2_autodetect( async fn serve_http2_autodetect(
io: impl HttpServeStream, io: impl HttpServeStream,
svc: impl HttpService<Incoming, ResBody = ResponseBytes> + 'static, svc: impl HttpService<Incoming, ResBody = ResponseBytes> + 'static,
cancel: RcRef<CancelHandle>,
) -> Result<(), AnyError> { ) -> Result<(), AnyError> {
let prefix = NetworkStreamPrefixCheck::new(io, HTTP2_PREFIX); let prefix = NetworkStreamPrefixCheck::new(io, HTTP2_PREFIX);
let (matches, io) = prefix.match_prefix().await?; let (matches, io) = prefix.match_prefix().await?;
if matches { if matches {
serve_http2_unconditional(io, svc, cancel).await serve_http2_unconditional(io, svc).await
} else { } else {
serve_http11_unconditional(io, svc, cancel).await serve_http11_unconditional(io, svc).await
} }
} }
fn serve_https( fn serve_https(
mut io: TlsStream, mut io: TlsStream,
request_info: HttpConnectionProperties, request_info: HttpConnectionProperties,
cancel: RcRef<CancelHandle>, cancel: Rc<CancelHandle>,
tx: tokio::sync::mpsc::Sender<u32>, tx: tokio::sync::mpsc::Sender<u32>,
) -> JoinHandle<Result<(), AnyError>> { ) -> JoinHandle<Result<(), AnyError>> {
// TODO(mmastrac): This is faster if we can use tokio::spawn but then the send bounds get us // TODO(mmastrac): This is faster if we can use tokio::spawn but then the send bounds get us
let svc = service_fn(move |req: Request| { let svc = service_fn(move |req: Request| {
new_slab_future(req, request_info.clone(), tx.clone()) new_slab_future(req, request_info.clone(), tx.clone())
}); });
spawn_local(async { spawn_local(
io.handshake().await?; async {
// If the client specifically negotiates a protocol, we will use it. If not, we'll auto-detect io.handshake().await?;
// based on the prefix bytes // If the client specifically negotiates a protocol, we will use it. If not, we'll auto-detect
let handshake = io.get_ref().1.alpn_protocol(); // based on the prefix bytes
if handshake == Some(TLS_ALPN_HTTP_2) { let handshake = io.get_ref().1.alpn_protocol();
serve_http2_unconditional(io, svc, cancel).await if handshake == Some(TLS_ALPN_HTTP_2) {
} else if handshake == Some(TLS_ALPN_HTTP_11) { serve_http2_unconditional(io, svc).await
serve_http11_unconditional(io, svc, cancel).await } else if handshake == Some(TLS_ALPN_HTTP_11) {
} else { serve_http11_unconditional(io, svc).await
serve_http2_autodetect(io, svc, cancel).await } else {
serve_http2_autodetect(io, svc).await
}
} }
}) .try_or_cancel(cancel),
)
} }
fn serve_http( fn serve_http(
io: impl HttpServeStream, io: impl HttpServeStream,
request_info: HttpConnectionProperties, request_info: HttpConnectionProperties,
cancel: RcRef<CancelHandle>, cancel: Rc<CancelHandle>,
tx: tokio::sync::mpsc::Sender<u32>, tx: tokio::sync::mpsc::Sender<u32>,
) -> JoinHandle<Result<(), AnyError>> { ) -> JoinHandle<Result<(), AnyError>> {
// TODO(mmastrac): This is faster if we can use tokio::spawn but then the send bounds get us // TODO(mmastrac): This is faster if we can use tokio::spawn but then the send bounds get us
let svc = service_fn(move |req: Request| { let svc = service_fn(move |req: Request| {
new_slab_future(req, request_info.clone(), tx.clone()) new_slab_future(req, request_info.clone(), tx.clone())
}); });
spawn_local(serve_http2_autodetect(io, svc, cancel)) spawn_local(serve_http2_autodetect(io, svc).try_or_cancel(cancel))
} }
fn serve_http_on( fn serve_http_on(
network_stream: NetworkStream, network_stream: NetworkStream,
listen_properties: &HttpListenProperties, listen_properties: &HttpListenProperties,
cancel: RcRef<CancelHandle>, cancel: Rc<CancelHandle>,
tx: tokio::sync::mpsc::Sender<u32>, tx: tokio::sync::mpsc::Sender<u32>,
) -> JoinHandle<Result<(), AnyError>> { ) -> JoinHandle<Result<(), AnyError>> {
// We always want some sort of peer address. If we can't get one, just make up one. // We always want some sort of peer address. If we can't get one, just make up one.
@ -733,13 +730,14 @@ fn serve_http_on(
struct HttpJoinHandle( struct HttpJoinHandle(
AsyncRefCell<Option<JoinHandle<Result<(), AnyError>>>>, AsyncRefCell<Option<JoinHandle<Result<(), AnyError>>>>,
CancelHandle, // Cancel handle must live in a separate Rc to avoid keeping the outer join handle ref'd
Rc<CancelHandle>,
AsyncRefCell<tokio::sync::mpsc::Receiver<u32>>, AsyncRefCell<tokio::sync::mpsc::Receiver<u32>>,
); );
impl HttpJoinHandle { impl HttpJoinHandle {
fn cancel_handle(self: &Rc<Self>) -> RcRef<CancelHandle> { fn cancel_handle(self: &Rc<Self>) -> Rc<CancelHandle> {
RcRef::map(self, |this| &this.1) self.1.clone()
} }
} }
@ -753,6 +751,13 @@ impl Resource for HttpJoinHandle {
} }
} }
impl Drop for HttpJoinHandle {
fn drop(&mut self) {
// In some cases we may be dropped without closing, so let's cancel everything on the way out
self.1.cancel();
}
}
#[op(v8)] #[op(v8)]
pub fn op_serve_http( pub fn op_serve_http(
state: Rc<RefCell<OpState>>, state: Rc<RefCell<OpState>>,
@ -773,12 +778,12 @@ pub fn op_serve_http(
let (tx, rx) = tokio::sync::mpsc::channel(10); let (tx, rx) = tokio::sync::mpsc::channel(10);
let resource: Rc<HttpJoinHandle> = Rc::new(HttpJoinHandle( let resource: Rc<HttpJoinHandle> = Rc::new(HttpJoinHandle(
AsyncRefCell::new(None), AsyncRefCell::new(None),
CancelHandle::new(), CancelHandle::new_rc(),
AsyncRefCell::new(rx), AsyncRefCell::new(rx),
)); ));
let cancel_clone = resource.cancel_handle(); let cancel_clone = resource.cancel_handle();
let listen_properties_clone = listen_properties.clone(); let listen_properties_clone: HttpListenProperties = listen_properties.clone();
let handle = spawn_local(async move { let handle = spawn_local(async move {
loop { loop {
let conn = listener let conn = listener
@ -813,7 +818,7 @@ pub fn op_serve_http_on(
state: Rc<RefCell<OpState>>, state: Rc<RefCell<OpState>>,
conn: ResourceId, conn: ResourceId,
) -> Result<(ResourceId, &'static str, String), AnyError> { ) -> Result<(ResourceId, &'static str, String), AnyError> {
let network_stream = let network_stream: NetworkStream =
DefaultHttpRequestProperties::get_network_stream_for_rid( DefaultHttpRequestProperties::get_network_stream_for_rid(
&mut state.borrow_mut(), &mut state.borrow_mut(),
conn, conn,
@ -828,7 +833,7 @@ pub fn op_serve_http_on(
let (tx, rx) = tokio::sync::mpsc::channel(10); let (tx, rx) = tokio::sync::mpsc::channel(10);
let resource: Rc<HttpJoinHandle> = Rc::new(HttpJoinHandle( let resource: Rc<HttpJoinHandle> = Rc::new(HttpJoinHandle(
AsyncRefCell::new(None), AsyncRefCell::new(None),
CancelHandle::new(), CancelHandle::new_rc(),
AsyncRefCell::new(rx), AsyncRefCell::new(rx),
)); ));
@ -862,7 +867,7 @@ pub async fn op_http_wait(
.resource_table .resource_table
.get::<HttpJoinHandle>(rid)?; .get::<HttpJoinHandle>(rid)?;
let cancel = join_handle.clone().cancel_handle(); let cancel = join_handle.cancel_handle();
let next = async { let next = async {
let mut recv = RcRef::map(&join_handle, |this| &this.2).borrow_mut().await; let mut recv = RcRef::map(&join_handle, |this| &this.2).borrow_mut().await;
recv.recv().await recv.recv().await