diff --git a/cli/tests/integration/watcher_tests.rs b/cli/tests/integration/watcher_tests.rs index 04320060b2..2d41a74ed5 100644 --- a/cli/tests/integration/watcher_tests.rs +++ b/cli/tests/integration/watcher_tests.rs @@ -1371,6 +1371,46 @@ async fn run_watch_reload_once() { check_alive_then_kill(child); } +/// Regression test for https://github.com/denoland/deno/issues/18960. Ensures that Deno.serve +/// operates properly after a watch restart. +#[tokio::test] +async fn test_watch_serve() { + let t = TempDir::new(); + let file_to_watch = t.path().join("file_to_watch.js"); + let file_content = r#" + console.error("serving"); + await Deno.serve({port: 4600, handler: () => new Response("hello")}); + "#; + write(&file_to_watch, file_content).unwrap(); + + let mut child = util::deno_cmd() + .current_dir(util::testdata_path()) + .arg("run") + .arg("--watch") + .arg("--unstable") + .arg("--allow-net") + .arg("-L") + .arg("debug") + .arg(&file_to_watch) + .env("NO_COLOR", "1") + .stdout(std::process::Stdio::piped()) + .stderr(std::process::Stdio::piped()) + .spawn() + .unwrap(); + let (mut stdout_lines, mut stderr_lines) = child_lines(&mut child); + + wait_contains("Listening on", &mut stdout_lines).await; + // Note that we start serving very quickly, so we specifically want to wait for this message + wait_contains(r#"Watching paths: [""#, &mut stderr_lines).await; + + write(&file_to_watch, file_content).unwrap(); + + wait_contains("serving", &mut stderr_lines).await; + wait_contains("Listening on", &mut stdout_lines).await; + + check_alive_then_kill(child); +} + #[tokio::test] async fn run_watch_dynamic_imports() { let t = TempDir::new(); diff --git a/cli/tests/unit/serve_test.ts b/cli/tests/unit/serve_test.ts index 5d5d0428f9..ce7267f580 100644 --- a/cli/tests/unit/serve_test.ts +++ b/cli/tests/unit/serve_test.ts @@ -94,8 +94,9 @@ Deno.test(async function httpServerRejectsOnAddrInUse() { onListen: onListen(listeningPromise), onError: createOnErrorCb(ac), }); + await listeningPromise; - assertRejects( + await assertRejects( () => Deno.serve({ handler: (_req) => new Response("ok"), diff --git a/cli/util/file_watcher.rs b/cli/util/file_watcher.rs index 05415f2a63..1ad5e9ba07 100644 --- a/cli/util/file_watcher.rs +++ b/cli/util/file_watcher.rs @@ -304,6 +304,13 @@ where } loop { + // We may need to give the runtime a tick to settle, as cancellations may need to propagate + // to tasks. We choose yielding 10 times to the runtime as a decent heuristic. If watch tests + // start to fail, this may need to be increased. + for _ in 0..10 { + tokio::task::yield_now().await; + } + let mut watcher = new_watcher(watcher_sender.clone())?; consume_paths_to_watch(&mut watcher, &mut paths_to_watch_receiver); diff --git a/ext/http/http_next.rs b/ext/http/http_next.rs index 593a9c8166..5ed443142e 100644 --- a/ext/http/http_next.rs +++ b/ext/http/http_next.rs @@ -625,83 +625,80 @@ impl> Future for SlabFuture { fn serve_http11_unconditional( io: impl HttpServeStream, svc: impl HttpService + 'static, - cancel: RcRef, ) -> impl Future> + 'static { let conn = http1::Builder::new() .keep_alive(true) .serve_connection(io, svc); - conn - .with_upgrades() - .map_err(AnyError::from) - .try_or_cancel(cancel) + conn.with_upgrades().map_err(AnyError::from) } fn serve_http2_unconditional( io: impl HttpServeStream, svc: impl HttpService + 'static, - cancel: RcRef, ) -> impl Future> + 'static { let conn = http2::Builder::new(LocalExecutor).serve_connection(io, svc); - conn.map_err(AnyError::from).try_or_cancel(cancel) + conn.map_err(AnyError::from) } async fn serve_http2_autodetect( io: impl HttpServeStream, svc: impl HttpService + 'static, - cancel: RcRef, ) -> Result<(), AnyError> { let prefix = NetworkStreamPrefixCheck::new(io, HTTP2_PREFIX); let (matches, io) = prefix.match_prefix().await?; if matches { - serve_http2_unconditional(io, svc, cancel).await + serve_http2_unconditional(io, svc).await } else { - serve_http11_unconditional(io, svc, cancel).await + serve_http11_unconditional(io, svc).await } } fn serve_https( mut io: TlsStream, request_info: HttpConnectionProperties, - cancel: RcRef, + cancel: Rc, tx: tokio::sync::mpsc::Sender, ) -> JoinHandle> { // TODO(mmastrac): This is faster if we can use tokio::spawn but then the send bounds get us let svc = service_fn(move |req: Request| { new_slab_future(req, request_info.clone(), tx.clone()) }); - spawn_local(async { - io.handshake().await?; - // If the client specifically negotiates a protocol, we will use it. If not, we'll auto-detect - // based on the prefix bytes - let handshake = io.get_ref().1.alpn_protocol(); - if handshake == Some(TLS_ALPN_HTTP_2) { - serve_http2_unconditional(io, svc, cancel).await - } else if handshake == Some(TLS_ALPN_HTTP_11) { - serve_http11_unconditional(io, svc, cancel).await - } else { - serve_http2_autodetect(io, svc, cancel).await + spawn_local( + async { + io.handshake().await?; + // If the client specifically negotiates a protocol, we will use it. If not, we'll auto-detect + // based on the prefix bytes + let handshake = io.get_ref().1.alpn_protocol(); + if handshake == Some(TLS_ALPN_HTTP_2) { + serve_http2_unconditional(io, svc).await + } else if handshake == Some(TLS_ALPN_HTTP_11) { + serve_http11_unconditional(io, svc).await + } else { + serve_http2_autodetect(io, svc).await + } } - }) + .try_or_cancel(cancel), + ) } fn serve_http( io: impl HttpServeStream, request_info: HttpConnectionProperties, - cancel: RcRef, + cancel: Rc, tx: tokio::sync::mpsc::Sender, ) -> JoinHandle> { // TODO(mmastrac): This is faster if we can use tokio::spawn but then the send bounds get us let svc = service_fn(move |req: Request| { new_slab_future(req, request_info.clone(), tx.clone()) }); - spawn_local(serve_http2_autodetect(io, svc, cancel)) + spawn_local(serve_http2_autodetect(io, svc).try_or_cancel(cancel)) } fn serve_http_on( network_stream: NetworkStream, listen_properties: &HttpListenProperties, - cancel: RcRef, + cancel: Rc, tx: tokio::sync::mpsc::Sender, ) -> JoinHandle> { // We always want some sort of peer address. If we can't get one, just make up one. @@ -733,13 +730,14 @@ fn serve_http_on( struct HttpJoinHandle( AsyncRefCell>>>, - CancelHandle, + // Cancel handle must live in a separate Rc to avoid keeping the outer join handle ref'd + Rc, AsyncRefCell>, ); impl HttpJoinHandle { - fn cancel_handle(self: &Rc) -> RcRef { - RcRef::map(self, |this| &this.1) + fn cancel_handle(self: &Rc) -> Rc { + self.1.clone() } } @@ -753,6 +751,13 @@ impl Resource for HttpJoinHandle { } } +impl Drop for HttpJoinHandle { + fn drop(&mut self) { + // In some cases we may be dropped without closing, so let's cancel everything on the way out + self.1.cancel(); + } +} + #[op(v8)] pub fn op_serve_http( state: Rc>, @@ -773,12 +778,12 @@ pub fn op_serve_http( let (tx, rx) = tokio::sync::mpsc::channel(10); let resource: Rc = Rc::new(HttpJoinHandle( AsyncRefCell::new(None), - CancelHandle::new(), + CancelHandle::new_rc(), AsyncRefCell::new(rx), )); let cancel_clone = resource.cancel_handle(); - let listen_properties_clone = listen_properties.clone(); + let listen_properties_clone: HttpListenProperties = listen_properties.clone(); let handle = spawn_local(async move { loop { let conn = listener @@ -813,7 +818,7 @@ pub fn op_serve_http_on( state: Rc>, conn: ResourceId, ) -> Result<(ResourceId, &'static str, String), AnyError> { - let network_stream = + let network_stream: NetworkStream = DefaultHttpRequestProperties::get_network_stream_for_rid( &mut state.borrow_mut(), conn, @@ -828,7 +833,7 @@ pub fn op_serve_http_on( let (tx, rx) = tokio::sync::mpsc::channel(10); let resource: Rc = Rc::new(HttpJoinHandle( AsyncRefCell::new(None), - CancelHandle::new(), + CancelHandle::new_rc(), AsyncRefCell::new(rx), )); @@ -862,7 +867,7 @@ pub async fn op_http_wait( .resource_table .get::(rid)?; - let cancel = join_handle.clone().cancel_handle(); + let cancel = join_handle.cancel_handle(); let next = async { let mut recv = RcRef::map(&join_handle, |this| &this.2).borrow_mut().await; recv.recv().await