diff --git a/cli/inspector.rs b/cli/inspector.rs index a637e980ff..b6f653f4cc 100644 --- a/cli/inspector.rs +++ b/cli/inspector.rs @@ -37,7 +37,6 @@ use std::sync::Once; use std::thread; use uuid::Uuid; use warp::filters::ws; -use warp::filters::ws::WebSocket; use warp::Filter; struct InspectorServer { @@ -91,7 +90,7 @@ struct InspectorInfo { host: SocketAddr, uuid: Uuid, thread_name: Option, - new_websocket_tx: UnboundedSender, + new_websocket_tx: UnboundedSender, canary_rx: oneshot::Receiver, } @@ -178,7 +177,9 @@ async fn server( g.get(&uuid).map(|info| info.new_websocket_tx.clone()).map( |new_websocket_tx| { ws.on_upgrade(move |websocket| async move { - let _ = new_websocket_tx.unbounded_send(websocket); + let (proxy, pump) = create_websocket_proxy(websocket); + let _ = new_websocket_tx.unbounded_send(proxy); + pump.await; }) }, ) @@ -223,6 +224,69 @@ async fn server( } } +type WebSocketProxySender = UnboundedSender; +type WebSocketProxyReceiver = + UnboundedReceiver>; + +/// Encapsulates an UnboundedSender/UnboundedReceiver pair that together form +/// a duplex channel for sending/receiving websocket messages. +struct WebSocketProxy { + tx: WebSocketProxySender, + rx: WebSocketProxyReceiver, +} + +impl WebSocketProxy { + pub fn split(self) -> (WebSocketProxySender, WebSocketProxyReceiver) { + (self.tx, self.rx) + } +} + +/// Creates a future that proxies messages sent and received on a warp WebSocket +/// to a UnboundedSender/UnboundedReceiver pair. We need this to sidestep +/// Tokio's task budget, which causes issues when DenoInspector::poll_sessions() +/// needs to block the thread because JavaScript execution is paused. +/// +/// This works because UnboundedSender/UnboundedReceiver are implemented in the +/// 'futures' crate, therefore they can't participate in Tokio's cooperative +/// task yielding. +/// +/// A tuple is returned, where the first element is a duplex channel that can +/// be used to send/receive messages on the websocket, and the second element +/// is a future that does the forwarding. +fn create_websocket_proxy( + websocket: ws::WebSocket, +) -> (WebSocketProxy, impl Future + Send) { + // The 'outbound' channel carries messages sent to the websocket. + let (outbound_tx, outbound_rx) = mpsc::unbounded(); + + // The 'inbound' channel carries messages received from the websocket. + let (inbound_tx, inbound_rx) = mpsc::unbounded(); + + let proxy = WebSocketProxy { + tx: outbound_tx, + rx: inbound_rx, + }; + + // The pump future takes care of forwarding messages between the websocket + // and channels. It resolves to () when either side disconnects, ignoring any + // errors. + let pump = async move { + let (websocket_tx, websocket_rx) = websocket.split(); + + let outbound_pump = + outbound_rx.map(Ok).forward(websocket_tx).map_err(|_| ()); + + let inbound_pump = websocket_rx + .map(|msg| inbound_tx.unbounded_send(msg)) + .map_err(|_| ()) + .try_collect::<()>(); + + let _ = future::try_join(outbound_pump, inbound_pump).await; + }; + + (proxy, pump) +} + #[derive(Clone, Copy)] enum PollState { Idle, @@ -322,7 +386,8 @@ impl DenoInspector { let mut hs = v8::HandleScope::new(v8_isolate); let scope = hs.enter(); - let (new_websocket_tx, new_websocket_rx) = mpsc::unbounded::(); + let (new_websocket_tx, new_websocket_rx) = + mpsc::unbounded::(); let (canary_tx, canary_rx) = oneshot::channel::(); let info = InspectorInfo { @@ -511,7 +576,7 @@ struct InspectorSessions { impl InspectorSessions { fn new( inspector_ptr: *mut DenoInspector, - new_websocket_rx: UnboundedReceiver, + new_websocket_rx: UnboundedReceiver, ) -> RefCell { let new_incoming = new_websocket_rx .map(move |websocket| DenoInspectorSession::new(inspector_ptr, websocket)) @@ -609,11 +674,8 @@ impl task::ArcWake for InspectorWaker { struct DenoInspectorSession { v8_channel: v8::inspector::ChannelBase, v8_session: v8::UniqueRef, - message_handler: Pin + 'static>>, - // Internal channel/queue that temporarily stores messages sent by V8 to - // the front-end, before they are sent over the websocket. - outbound_queue_tx: - UnboundedSender>, + websocket_tx: WebSocketProxySender, + websocket_rx_handler: Pin + 'static>>, } impl Deref for DenoInspectorSession { @@ -634,7 +696,7 @@ impl DenoInspectorSession { pub fn new( inspector_ptr: *mut DenoInspector, - websocket: WebSocket, + websocket: WebSocketProxy, ) -> Box { new_box_with(move |self_ptr| { let v8_channel = v8::inspector::ChannelBase::new::(); @@ -648,54 +710,38 @@ impl DenoInspectorSession { &empty_view, ); - let (outbound_queue_tx, outbound_queue_rx) = - mpsc::unbounded::>(); - - let message_handler = - Self::create_message_handler(self_ptr, websocket, outbound_queue_rx); + let (websocket_tx, websocket_rx) = websocket.split(); + let websocket_rx_handler = + Self::receive_from_websocket(self_ptr, websocket_rx); Self { v8_channel, v8_session, - message_handler, - outbound_queue_tx, + websocket_tx, + websocket_rx_handler, } }) } - fn create_message_handler( + /// Returns a future that receives messages from the websocket and dispatches + /// them to the V8 session. + fn receive_from_websocket( self_ptr: *mut Self, - websocket: WebSocket, - outbound_queue_rx: UnboundedReceiver< - v8::UniquePtr, - >, + websocket_rx: WebSocketProxyReceiver, ) -> Pin + 'static>> { - let (websocket_tx, websocket_rx) = websocket.split(); - - // Receive messages from the websocket and dispatch them to the V8 session. - let inbound_pump = websocket_rx - .map_ok(move |msg| { - let msg = msg.as_bytes(); - let msg = v8::inspector::StringView::from(msg); - unsafe { &mut *self_ptr }.dispatch_protocol_message(&msg); - }) - .try_collect::<()>(); - - // Convert and forward messages from the outbound message queue to the - // websocket. - let outbound_pump = outbound_queue_rx - .map(move |msg| { - let msg = msg.unwrap().string().to_string(); - let msg = ws::Message::text(msg); - Ok(msg) - }) - .forward(websocket_tx); - - let disconnect_future = future::try_join(inbound_pump, outbound_pump); - async move { eprintln!("Debugger session started."); - match disconnect_future.await { + + let result = websocket_rx + .map_ok(move |msg| { + let msg = msg.as_bytes(); + let msg = v8::inspector::StringView::from(msg); + unsafe { &mut *self_ptr }.dispatch_protocol_message(&msg); + }) + .try_collect::<()>() + .await; + + match result { Ok(_) => eprintln!("Debugger session ended."), Err(err) => eprintln!("Debugger session ended: {}.", err), }; @@ -703,6 +749,12 @@ impl DenoInspectorSession { .boxed_local() } + fn send_to_websocket(&self, msg: v8::UniquePtr) { + let msg = msg.unwrap().string().to_string(); + let msg = ws::Message::text(msg); + let _ = self.websocket_tx.unbounded_send(msg); + } + pub fn break_on_first_statement(&mut self) { let reason = v8::inspector::StringView::from(&b"debugCommand"[..]); let detail = v8::inspector::StringView::empty(); @@ -724,14 +776,14 @@ impl v8::inspector::ChannelImpl for DenoInspectorSession { _call_id: i32, message: v8::UniquePtr, ) { - let _ = self.outbound_queue_tx.unbounded_send(message); + self.send_to_websocket(message); } fn send_notification( &mut self, message: v8::UniquePtr, ) { - let _ = self.outbound_queue_tx.unbounded_send(message); + self.send_to_websocket(message); } fn flush_protocol_notifications(&mut self) {} @@ -740,7 +792,7 @@ impl v8::inspector::ChannelImpl for DenoInspectorSession { impl Future for DenoInspectorSession { type Output = (); fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { - self.message_handler.poll_unpin(cx) + self.websocket_rx_handler.poll_unpin(cx) } } diff --git a/cli/tests/inspector3.js b/cli/tests/inspector3.js new file mode 100644 index 0000000000..de079a1bc8 --- /dev/null +++ b/cli/tests/inspector3.js @@ -0,0 +1,6 @@ +for (let i = 0; i < 128; i++) { + console.log(i); + debugger; +} +await new Promise((res, _) => setTimeout(res, 100)); +console.log("done"); diff --git a/cli/tests/integration_tests.rs b/cli/tests/integration_tests.rs index 0847c18068..d4c58a685b 100644 --- a/cli/tests/integration_tests.rs +++ b/cli/tests/integration_tests.rs @@ -2152,10 +2152,7 @@ fn extract_ws_url_from_stderr( #[tokio::test] async fn inspector_connect() { - let script = deno::test_util::root_path() - .join("cli") - .join("tests") - .join("inspector1.js"); + let script = util::tests_path().join("inspector1.js"); let mut child = util::deno_cmd() .arg("run") // Warning: each inspector test should be on its own port to avoid @@ -2174,6 +2171,7 @@ async fn inspector_connect() { .expect("Can't connect"); assert_eq!("101 Switching Protocols", response.status().to_string()); child.kill().unwrap(); + child.wait().unwrap(); } enum TestStep { @@ -2184,10 +2182,7 @@ enum TestStep { #[tokio::test] async fn inspector_break_on_first_line() { - let script = deno::test_util::root_path() - .join("cli") - .join("tests") - .join("inspector2.js"); + let script = util::tests_path().join("inspector2.js"); let mut child = util::deno_cmd() .arg("run") // Warning: each inspector test should be on its own port to avoid @@ -2224,12 +2219,12 @@ async fn inspector_break_on_first_line() { WsRecv(r#"{"id":3,"result":{}}"#), WsRecv(r#"{"method":"Debugger.paused","#), WsSend( - r#"{"id":5,"method":"Runtime.evaluate","params":{"expression":"Deno.core.print(\"hello from the inspector\\n\")","contextId":1,"includeCommandLineAPI":true,"silent":false,"returnByValue":true}}"#, + r#"{"id":4,"method":"Runtime.evaluate","params":{"expression":"Deno.core.print(\"hello from the inspector\\n\")","contextId":1,"includeCommandLineAPI":true,"silent":false,"returnByValue":true}}"#, ), - WsRecv(r#"{"id":5,"result":{"result":{"type":"undefined"}}}"#), + WsRecv(r#"{"id":4,"result":{"result":{"type":"undefined"}}}"#), StdOut("hello from the inspector"), - WsSend(r#"{"id":6,"method":"Debugger.resume"}"#), - WsRecv(r#"{"id":6,"result":{}}"#), + WsSend(r#"{"id":5,"method":"Debugger.resume"}"#), + WsRecv(r#"{"id":5,"result":{}}"#), StdOut("hello from the script"), ]; @@ -2254,14 +2249,12 @@ async fn inspector_break_on_first_line() { } child.kill().unwrap(); + child.wait().unwrap(); } #[tokio::test] async fn inspector_pause() { - let script = deno::test_util::root_path() - .join("cli") - .join("tests") - .join("inspector1.js"); + let script = util::tests_path().join("inspector1.js"); let mut child = util::deno_cmd() .arg("run") // Warning: each inspector test should be on its own port to avoid @@ -2317,10 +2310,7 @@ async fn inspector_pause() { #[tokio::test] async fn inspector_port_collision() { - let script = deno::test_util::root_path() - .join("cli") - .join("tests") - .join("inspector1.js"); + let script = util::tests_path().join("inspector1.js"); let mut child1 = util::deno_cmd() .arg("run") .arg("--inspect=127.0.0.1:9231") @@ -2348,8 +2338,93 @@ async fn inspector_port_collision() { .read_to_string(&mut stderr_str_2) .unwrap(); assert!(stderr_str_2.contains("Cannot start inspector server")); + child1.kill().unwrap(); - let _ = child2.kill(); + child1.wait().unwrap(); + child2.wait().unwrap(); +} + +#[tokio::test] +async fn inspector_does_not_hang() { + let script = util::tests_path().join("inspector3.js"); + let mut child = util::deno_cmd() + .arg("run") + // Warning: each inspector test should be on its own port to avoid + // conflicting with another inspector test. + .arg("--inspect-brk=127.0.0.1:9232") + .arg(script) + .stdout(std::process::Stdio::piped()) + .stderr(std::process::Stdio::piped()) + .spawn() + .unwrap(); + + let stderr = child.stderr.as_mut().unwrap(); + let ws_url = extract_ws_url_from_stderr(stderr); + let (socket, response) = tokio_tungstenite::connect_async(ws_url) + .await + .expect("Can't connect"); + assert_eq!(response.status(), 101); // Switching protocols. + + let (mut socket_tx, socket_rx) = socket.split(); + let mut socket_rx = + socket_rx.map(|msg| msg.unwrap().to_string()).filter(|msg| { + let pass = !msg.starts_with(r#"{"method":"Debugger.scriptParsed","#); + futures::future::ready(pass) + }); + + let stdout = child.stdout.as_mut().unwrap(); + let mut stdout_lines = + std::io::BufReader::new(stdout).lines().map(|r| r.unwrap()); + + use TestStep::*; + let test_steps = vec![ + WsSend(r#"{"id":1,"method":"Runtime.enable"}"#), + WsSend(r#"{"id":2,"method":"Debugger.enable"}"#), + WsRecv( + r#"{"method":"Runtime.executionContextCreated","params":{"context":{"id":1,"#, + ), + WsRecv(r#"{"id":1,"result":{}}"#), + WsRecv(r#"{"id":2,"result":{"debuggerId":"#), + WsSend(r#"{"id":3,"method":"Runtime.runIfWaitingForDebugger"}"#), + WsRecv(r#"{"id":3,"result":{}}"#), + WsRecv(r#"{"method":"Debugger.paused","#), + WsSend(r#"{"id":4,"method":"Debugger.resume"}"#), + WsRecv(r#"{"id":4,"result":{}}"#), + WsRecv(r#"{"method":"Debugger.resumed","params":{}}"#), + ]; + + for step in test_steps { + match step { + WsRecv(s) => assert!(socket_rx.next().await.unwrap().starts_with(s)), + WsSend(s) => socket_tx.send(s.into()).await.unwrap(), + _ => unreachable!(), + } + } + + for i in 0..128u32 { + let request_id = i + 10; + // Expect the number {i} on stdout. + let s = format!("{}", i); + assert_eq!(stdout_lines.next().unwrap(), s); + // Expect hitting the `debugger` statement. + let s = r#"{"method":"Debugger.paused","#; + assert!(socket_rx.next().await.unwrap().starts_with(s)); + // Send the 'Debugger.resume' request. + let s = format!(r#"{{"id":{},"method":"Debugger.resume"}}"#, request_id); + socket_tx.send(s.into()).await.unwrap(); + // Expect confirmation of the 'Debugger.resume' request. + let s = format!(r#"{{"id":{},"result":{{}}}}"#, request_id); + assert_eq!(socket_rx.next().await.unwrap(), s); + let s = r#"{"method":"Debugger.resumed","params":{}}"#; + assert_eq!(socket_rx.next().await.unwrap(), s); + } + + // Check that we can gracefully close the websocket connection. + socket_tx.close().await.unwrap(); + socket_rx.for_each(|_| async {}).await; + + assert_eq!(&stdout_lines.next().unwrap(), "done"); + assert!(child.wait().unwrap().success()); } mod util {