1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2025-01-13 09:32:24 -05:00

fix(cli/inspector): shutdown server gracefully on drop (#7716)

This commit is contained in:
Casper Beyer 2020-09-28 02:19:36 +08:00 committed by GitHub
parent ebcb032c6b
commit 5db72dcaf3
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -10,6 +10,7 @@ use deno_core::futures::channel::mpsc::UnboundedReceiver;
use deno_core::futures::channel::mpsc::UnboundedSender;
use deno_core::futures::channel::oneshot;
use deno_core::futures::future::Future;
use deno_core::futures::pin_mut;
use deno_core::futures::prelude::*;
use deno_core::futures::select;
use deno_core::futures::stream::FuturesUnordered;
@ -43,21 +44,30 @@ use warp::Filter;
pub struct InspectorServer {
pub host: SocketAddr,
register_inspector_tx: UnboundedSender<InspectorInfo>,
_thread_handle: thread::JoinHandle<()>,
shutdown_server_tx: Option<oneshot::Sender<()>>,
thread_handle: Option<thread::JoinHandle<()>>,
}
impl InspectorServer {
pub fn new(host: SocketAddr) -> Self {
let (register_inspector_tx, register_inspector_rx) =
mpsc::unbounded::<InspectorInfo>();
let (shutdown_server_tx, shutdown_server_rx) = oneshot::channel();
let thread_handle = thread::spawn(move || {
crate::tokio_util::run_basic(server(host, register_inspector_rx))
crate::tokio_util::run_basic(server(
host,
register_inspector_rx,
shutdown_server_rx,
))
});
Self {
host,
register_inspector_tx,
_thread_handle: thread_handle,
shutdown_server_tx: Some(shutdown_server_tx),
thread_handle: Some(thread_handle),
}
}
@ -66,6 +76,20 @@ impl InspectorServer {
}
}
impl Drop for InspectorServer {
fn drop(&mut self) {
if let Some(shutdown_server_tx) = self.shutdown_server_tx.take() {
shutdown_server_tx
.send(())
.expect("unable to send shutdown signal");
}
if let Some(thread_handle) = self.thread_handle.take() {
thread_handle.join().expect("unable to join thread");
}
}
}
/// Inspector information that is sent from the isolate thread to the server
/// thread when a new inspector is created.
struct InspectorInfo {
@ -117,24 +141,8 @@ impl InspectorInfo {
async fn server(
host: SocketAddr,
register_inspector_rx: UnboundedReceiver<InspectorInfo>,
shutdown_server_rx: oneshot::Receiver<()>,
) {
// When the main thread shuts down, The Rust stdlib will call `WSACleanup()`,
// which shuts down the network stack. This thread will still be
// running at that time (because it never exits), but all attempts at network
// I/O will fail with a `WSANOTINITIALIZED` error, which causes a panic.
// To prevent this from happening, Winsock is initialized another time here;
// this increases Winsock's internal reference count, so it won't shut
// itself down when the main thread calls `WSACleanup()` upon exit.
// TODO: When the last `Inspector` instance is dropped, make it signal the
// server thread so it exits cleanly, then join it with the main thread.
#[cfg(windows)]
unsafe {
use winapi::um::winsock2::{WSAStartup, WSADATA};
let mut wsa_data = MaybeUninit::<WSADATA>::zeroed();
let r = WSAStartup(0x202 /* Winsock 2.2 */, wsa_data.as_mut_ptr());
assert_eq!(r, 0);
}
// TODO: put the `inspector_map` in an `Rc<RefCell<_>>` instead. This is
// currently not possible because warp requires all filters to implement
// `Send`, which should not be necessary because we are using the
@ -143,7 +151,7 @@ async fn server(
let inspector_map = Arc::new(Mutex::new(inspector_map));
let inspector_map_ = inspector_map.clone();
let mut register_inspector_handler = register_inspector_rx
let register_inspector_handler = register_inspector_rx
.map(|info| {
eprintln!(
"Debugger listening on {}",
@ -157,7 +165,7 @@ async fn server(
.collect::<()>();
let inspector_map_ = inspector_map_.clone();
let mut deregister_inspector_handler = future::poll_fn(|cx| {
let deregister_inspector_handler = future::poll_fn(|cx| {
let mut g = inspector_map_.lock().unwrap();
g.retain(|_, info| info.canary_rx.poll_unpin(cx) == Poll::Pending);
Poll::<Never>::Pending
@ -208,8 +216,10 @@ async fn server(
let server_routes =
websocket_route.or(json_version_route).or(json_list_route);
let mut server_handler = warp::serve(server_routes)
.try_bind_ephemeral(host)
let server_handler = warp::serve(server_routes)
.try_bind_with_graceful_shutdown(host, async {
shutdown_server_rx.await.ok();
})
.map(|(_, fut)| fut)
.unwrap_or_else(|err| {
eprintln!("Cannot start inspector server: {}.", err);
@ -217,10 +227,14 @@ async fn server(
})
.fuse();
pin_mut!(register_inspector_handler);
pin_mut!(deregister_inspector_handler);
pin_mut!(server_handler);
select! {
_ = register_inspector_handler => (),
_ = deregister_inspector_handler => unreachable!(),
_ = server_handler => unreachable!(),
_ = server_handler => (),
}
}