From 5db72dcaf364d0a9d9a5c5c9c10e80cdf8a199ad Mon Sep 17 00:00:00 2001 From: Casper Beyer Date: Mon, 28 Sep 2020 02:19:36 +0800 Subject: [PATCH] fix(cli/inspector): shutdown server gracefully on drop (#7716) --- cli/inspector.rs | 64 +++++++++++++++++++++++++++++------------------- 1 file changed, 39 insertions(+), 25 deletions(-) diff --git a/cli/inspector.rs b/cli/inspector.rs index e4c0085ef8..d0601c5227 100644 --- a/cli/inspector.rs +++ b/cli/inspector.rs @@ -10,6 +10,7 @@ use deno_core::futures::channel::mpsc::UnboundedReceiver; use deno_core::futures::channel::mpsc::UnboundedSender; use deno_core::futures::channel::oneshot; use deno_core::futures::future::Future; +use deno_core::futures::pin_mut; use deno_core::futures::prelude::*; use deno_core::futures::select; use deno_core::futures::stream::FuturesUnordered; @@ -43,21 +44,30 @@ use warp::Filter; pub struct InspectorServer { pub host: SocketAddr, register_inspector_tx: UnboundedSender, - _thread_handle: thread::JoinHandle<()>, + shutdown_server_tx: Option>, + thread_handle: Option>, } impl InspectorServer { pub fn new(host: SocketAddr) -> Self { let (register_inspector_tx, register_inspector_rx) = mpsc::unbounded::(); + + let (shutdown_server_tx, shutdown_server_rx) = oneshot::channel(); + let thread_handle = thread::spawn(move || { - crate::tokio_util::run_basic(server(host, register_inspector_rx)) + crate::tokio_util::run_basic(server( + host, + register_inspector_rx, + shutdown_server_rx, + )) }); Self { host, register_inspector_tx, - _thread_handle: thread_handle, + shutdown_server_tx: Some(shutdown_server_tx), + thread_handle: Some(thread_handle), } } @@ -66,6 +76,20 @@ impl InspectorServer { } } +impl Drop for InspectorServer { + fn drop(&mut self) { + if let Some(shutdown_server_tx) = self.shutdown_server_tx.take() { + shutdown_server_tx + .send(()) + .expect("unable to send shutdown signal"); + } + + if let Some(thread_handle) = self.thread_handle.take() { + thread_handle.join().expect("unable to join thread"); + } + } +} + /// Inspector information that is sent from the isolate thread to the server /// thread when a new inspector is created. struct InspectorInfo { @@ -117,24 +141,8 @@ impl InspectorInfo { async fn server( host: SocketAddr, register_inspector_rx: UnboundedReceiver, + shutdown_server_rx: oneshot::Receiver<()>, ) { - // When the main thread shuts down, The Rust stdlib will call `WSACleanup()`, - // which shuts down the network stack. This thread will still be - // running at that time (because it never exits), but all attempts at network - // I/O will fail with a `WSANOTINITIALIZED` error, which causes a panic. - // To prevent this from happening, Winsock is initialized another time here; - // this increases Winsock's internal reference count, so it won't shut - // itself down when the main thread calls `WSACleanup()` upon exit. - // TODO: When the last `Inspector` instance is dropped, make it signal the - // server thread so it exits cleanly, then join it with the main thread. - #[cfg(windows)] - unsafe { - use winapi::um::winsock2::{WSAStartup, WSADATA}; - let mut wsa_data = MaybeUninit::::zeroed(); - let r = WSAStartup(0x202 /* Winsock 2.2 */, wsa_data.as_mut_ptr()); - assert_eq!(r, 0); - } - // TODO: put the `inspector_map` in an `Rc>` instead. This is // currently not possible because warp requires all filters to implement // `Send`, which should not be necessary because we are using the @@ -143,7 +151,7 @@ async fn server( let inspector_map = Arc::new(Mutex::new(inspector_map)); let inspector_map_ = inspector_map.clone(); - let mut register_inspector_handler = register_inspector_rx + let register_inspector_handler = register_inspector_rx .map(|info| { eprintln!( "Debugger listening on {}", @@ -157,7 +165,7 @@ async fn server( .collect::<()>(); let inspector_map_ = inspector_map_.clone(); - let mut deregister_inspector_handler = future::poll_fn(|cx| { + let deregister_inspector_handler = future::poll_fn(|cx| { let mut g = inspector_map_.lock().unwrap(); g.retain(|_, info| info.canary_rx.poll_unpin(cx) == Poll::Pending); Poll::::Pending @@ -208,8 +216,10 @@ async fn server( let server_routes = websocket_route.or(json_version_route).or(json_list_route); - let mut server_handler = warp::serve(server_routes) - .try_bind_ephemeral(host) + let server_handler = warp::serve(server_routes) + .try_bind_with_graceful_shutdown(host, async { + shutdown_server_rx.await.ok(); + }) .map(|(_, fut)| fut) .unwrap_or_else(|err| { eprintln!("Cannot start inspector server: {}.", err); @@ -217,10 +227,14 @@ async fn server( }) .fuse(); + pin_mut!(register_inspector_handler); + pin_mut!(deregister_inspector_handler); + pin_mut!(server_handler); + select! { _ = register_inspector_handler => (), _ = deregister_inspector_handler => unreachable!(), - _ = server_handler => unreachable!(), + _ = server_handler => (), } }