From 83b6085604c28124597f6e668ca4a93941999d6a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20Iwa=C5=84czuk?= Date: Sun, 4 Dec 2022 21:08:53 +0100 Subject: [PATCH] revert: Inspector changes (#16939) Reverts 66dc54a7f and e2a0c3f0 Closes https://github.com/denoland/deno/issues/16926 --- cli/tests/inspector_tests.rs | 104 ------------ core/inspector.rs | 300 +++++++++++++++-------------------- 2 files changed, 132 insertions(+), 272 deletions(-) diff --git a/cli/tests/inspector_tests.rs b/cli/tests/inspector_tests.rs index 853bb80443..fd3e886e4c 100644 --- a/cli/tests/inspector_tests.rs +++ b/cli/tests/inspector_tests.rs @@ -1303,108 +1303,4 @@ mod inspector { child.kill().unwrap(); child.wait().unwrap(); } - - // https://github.com/denoland/deno/issues/11570 - #[tokio::test] - async fn inspector_repl_debugger_statement() { - let mut child = util::deno_cmd() - .arg("repl") - .arg(inspect_flag_with_unique_port("--inspect")) - .stdin(std::process::Stdio::piped()) - .stdout(std::process::Stdio::piped()) - .stderr(std::process::Stdio::piped()) - .spawn() - .unwrap(); - - let stderr = child.stderr.as_mut().unwrap(); - let mut stderr_lines = std::io::BufReader::new(stderr) - .lines() - .map(|r| r.unwrap()) - .filter(|s| s.as_str() != "Debugger session started."); - let ws_url = extract_ws_url_from_stderr(&mut stderr_lines); - - let (socket, response) = - tokio_tungstenite::connect_async(ws_url).await.unwrap(); - assert_eq!(response.status(), 101); // Switching protocols. - - let (mut socket_tx, socket_rx) = socket.split(); - let mut socket_rx = socket_rx - .map(|msg| msg.unwrap().to_string()) - .filter(|msg| { - let pass = !msg.starts_with(r#"{"method":"Debugger.scriptParsed","#); - futures::future::ready(pass) - }) - .boxed_local(); - - let stdin = child.stdin.take().unwrap(); - - let stdout = child.stdout.as_mut().unwrap(); - let mut stdout_lines = std::io::BufReader::new(stdout) - .lines() - .map(|r| r.unwrap()) - .filter(|s| !s.starts_with("Deno ")); - - assert_stderr_for_inspect(&mut stderr_lines); - assert_eq!( - &stdout_lines.next().unwrap(), - "exit using ctrl+d, ctrl+c, or close()" - ); - - assert_inspector_messages( - &mut socket_tx, - &[ - r#"{"id":1,"method":"Runtime.enable"}"#, - r#"{"id":2,"method":"Debugger.enable"}"#, - ], - &mut socket_rx, - &[ - r#"{"id":1,"result":{}}"#, - r#"{"id":2,"result":{"debuggerId":"#, - ], - &[ - r#"{"method":"Runtime.executionContextCreated","params":{"context":{"id":1,"#, - ], - ) - .await; - - assert_inspector_messages( - &mut socket_tx, - &[ - r#"{"id":3,"method":"Runtime.evaluate","params":{"expression":"debugger","objectGroup":"console","includeCommandLineAPI":true,"silent":false,"contextId":1,"returnByValue":true,"generatePreview":true,"userGesture":true,"awaitPromise":false,"replMode":true}}"#, - ], - &mut socket_rx, - &[], - &[ - r#"{"method":"Debugger.paused""#, - ], - ).await; - assert_inspector_messages( - &mut socket_tx, - &[ - r#"{"id":4,"method":"Debugger.resume","params":{"terminateOnResume":false}}"#, - ], - &mut socket_rx, - &[ - r#"{"id":4,"result":{}}"#, - r#"{"id":3,"result":{"result":{"type":"undefined"}}}"#, - ], - &[ - r#"{"method":"Debugger.resumed""#, - ], - ).await; - assert_inspector_messages( - &mut socket_tx, - &[ - r#"{"id":5,"method":"Runtime.evaluate","params":{"expression":"1","objectGroup":"console","includeCommandLineAPI":true,"silent":false,"contextId":1,"returnByValue":true,"generatePreview":true,"userGesture":true,"awaitPromise":false,"replMode":true}}"#, - ], - &mut socket_rx, - &[ - r#"{"id":5,"result":{"result":{"type":"number","value":1,"description":"1"}}}"#, - ], - &[ - ], - ).await; - drop(stdin); - child.wait().unwrap(); - } } diff --git a/core/inspector.rs b/core/inspector.rs index 30129f94be..8a91360919 100644 --- a/core/inspector.rs +++ b/core/inspector.rs @@ -23,10 +23,10 @@ use crate::serde_json::json; use crate::serde_json::Value; use anyhow::Error; use parking_lot::Mutex; +use std::cell::BorrowMutError; use std::cell::RefCell; use std::collections::HashMap; use std::ffi::c_void; -use std::mem::replace; use std::mem::take; use std::mem::MaybeUninit; use std::pin::Pin; @@ -34,6 +34,7 @@ use std::ptr; use std::ptr::NonNull; use std::rc::Rc; use std::sync::Arc; +use std::thread; use v8::HandleScope; pub enum InspectorMsgKind { @@ -54,21 +55,12 @@ pub struct InspectorSessionProxy { pub rx: SessionProxyReceiver, } -#[derive(Clone, Copy, Debug, PartialEq)] +#[derive(Clone, Copy)] enum PollState { - // Inspector is not being polled at this moment, it's waiting for more events - // from the inspector. Idle, - // `InspectorWaker` has been called - either explicitly by outside code - // (like WS server), or from one of the futures we were polling. Woken, - // Inspector is being polled asynchronously from the owning runtime. Polling, - // Inspector is being polled synchronously, possibly in a reentrant way - // (e.g. from a callback invoked by V8). - SyncPolling, - // Inspector has been dropped already, but wakers might outlive the inspector - // so make sure nothing gets woken at this point. + Parked, Dropped, } @@ -123,24 +115,14 @@ impl v8::inspector::V8InspectorClientImpl for JsRuntimeInspector { &mut self.v8_inspector_client } - /// This method id called when a breakpoint is triggered, eg. using `debugger` statement. In that case - /// inspector sends `Debugger.paused` notification. Nested message loop should be run and process all - /// sent protocol commands until `quit_message_loop_on_pause` is called. After that execution will - /// return to inspector and then JavaScript execution will resume. fn run_message_loop_on_pause(&mut self, context_group_id: i32) { assert_eq!(context_group_id, JsRuntimeInspector::CONTEXT_GROUP_ID); self.flags.borrow_mut().on_pause = true; - self.poll_sessions_sync(); - assert!( - !self.flags.borrow().on_pause, - "V8InspectorClientImpl::run_message_loop_on_pause returned before quit_message_loop_on_pause was called" - ); + let _ = self.poll_sessions(None); } fn quit_message_loop_on_pause(&mut self) { - let mut flags = self.flags.borrow_mut(); - assert!(flags.on_pause); - flags.on_pause = false; + self.flags.borrow_mut().on_pause = false; } fn run_if_waiting_for_debugger(&mut self, context_group_id: i32) { @@ -157,10 +139,7 @@ impl v8::inspector::V8InspectorClientImpl for JsRuntimeInspector { impl Future for JsRuntimeInspector { type Output = (); fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<()> { - // Here we actually want to set up waker so we are notified when new - // messages arrive. Note that other call sites might want to reenter - // and pump sessions synchronously. - self.poll_sessions(cx) + self.poll_sessions(Some(cx)).unwrap() } } @@ -227,7 +206,9 @@ impl JsRuntimeInspector { aux_data_view, ); - self_.poll_sessions_sync(); + // Poll the session handler so we will get notified whenever there is + // new incoming debugger activity. + let _ = self_.poll_sessions(None).unwrap(); drop(self_); self__ @@ -255,55 +236,20 @@ impl JsRuntimeInspector { self.sessions.borrow().has_blocking_sessions() } - fn poll_sessions_sync(&self) { - let (prev_poll_state, mut prev_task_waker) = self.waker.update(|w| { - let prev_poll_state = replace(&mut w.poll_state, PollState::SyncPolling); - assert!(prev_poll_state != PollState::SyncPolling); + fn poll_sessions( + &self, + mut invoker_cx: Option<&mut Context>, + ) -> Result, BorrowMutError> { + // The futures this function uses do not have re-entrant poll() functions. + // However it is can happpen that poll_sessions() gets re-entered, e.g. + // when an interrupt request is honored while the inspector future is polled + // by the task executor. We let the caller know by returning some error. + let mut sessions = self.sessions.try_borrow_mut()?; - let prev_task_waker = w.task_waker.take(); - - (prev_poll_state, prev_task_waker) - }); - - futures::executor::block_on(futures::future::poll_fn(|cx| { - self.poll_sessions_inner(cx); - - // Block the thread if either the `on_pause` or the `waiting_for_session`. - // is set. Otherwise, return `Ready(_)` to make `block_on()` return. - let flags = self.flags.borrow(); - if flags.on_pause || flags.waiting_for_session { - Poll::Pending - } else { - Poll::Ready(()) - } - })); - - // Restore the previous poll state. - self.waker.update(|w| { - let replaced = replace(&mut w.poll_state, prev_poll_state); - assert_eq!(replaced, PollState::SyncPolling); - }); - - // The `block_on(...)` call above must have created a new `Waker` that will - // now be registered with `sessions.session_rx` and `sessions.established`. - // This has the consequence that when either of those streams transitions - // from `Pending` to `Ready`, they'll wake that (stale) waker, and the - // inspector task won't get notified. To avoid a hang, explicitly wake the - // inspector task here; when it gets polled, it will re-register the right - // waker (the `InspectorWaker`) with those streams. - if let Some(waker) = prev_task_waker.take() { - waker.wake(); - } - } - - fn poll_sessions(&self, invoker_cx: &mut Context) -> Poll<()> { self.waker.update(|w| { match w.poll_state { - PollState::Idle | PollState::Woken => { - w.poll_state = PollState::Polling; - w.inspector_ptr = Some(NonNull::from(self)); - } - s => unreachable!("state in poll_sessions {:#?}", s), + PollState::Idle | PollState::Woken => w.poll_state = PollState::Polling, + _ => unreachable!(), }; }); @@ -313,83 +259,94 @@ impl JsRuntimeInspector { let cx = &mut Context::from_waker(&waker_ref); loop { - self.poll_sessions_inner(cx); + loop { + // Do one "handshake" with a newly connected session at a time. + if let Some(mut session) = sessions.handshake.take() { + let poll_result = session.poll_next_unpin(cx); + match poll_result { + Poll::Pending => { + sessions.established.push(session); + continue; + } + Poll::Ready(Some(session_stream_item)) => { + let (v8_session_ptr, msg) = session_stream_item; + InspectorSession::dispatch_message(v8_session_ptr, msg); + sessions.established.push(session); + continue; + } + Poll::Ready(None) => {} + } + } - { - let flags = self.flags.borrow(); - assert!(!flags.on_pause); - assert!(!flags.waiting_for_session); + // Accept new connections. + let poll_result = sessions.session_rx.poll_next_unpin(cx); + if let Poll::Ready(Some(session_proxy)) = poll_result { + let session = InspectorSession::new( + sessions.v8_inspector.clone(), + session_proxy, + false, + ); + let prev = sessions.handshake.replace(session); + assert!(prev.is_none()); + } + + // Poll established sessions. + match sessions.established.poll_next_unpin(cx) { + Poll::Ready(Some(session_stream_item)) => { + let (v8_session_ptr, msg) = session_stream_item; + InspectorSession::dispatch_message(v8_session_ptr, msg); + continue; + } + Poll::Ready(None) => break, + Poll::Pending => break, + }; } - let new_poll_state = self.waker.update(|w| { + let should_block = + self.flags.borrow().on_pause || self.flags.borrow().waiting_for_session; + + let new_state = self.waker.update(|w| { match w.poll_state { PollState::Woken => { - // The inspector got woken up before the last round of polling was - // even over, so we need to do another round. + // The inspector was woken while the session handler was being + // polled, so we poll it another time. w.poll_state = PollState::Polling; } - PollState::Polling => { - // Since all streams were polled until they all yielded `Pending`, - // there's nothing else we can do right now. + PollState::Polling if !should_block => { + // The session handler doesn't need to be polled any longer, and + // there's no reason to block (execution is not paused), so this + // function is about to return. w.poll_state = PollState::Idle; - // Capture the waker that, when used, will get the inspector polled. - w.task_waker.replace(invoker_cx.waker().clone()); + // Register the task waker that can be used to wake the parent + // task that will poll the inspector future. + if let Some(cx) = invoker_cx.take() { + w.task_waker.replace(cx.waker().clone()); + } + // Register the address of the inspector, which allows the waker + // to request an interrupt from the isolate. + w.inspector_ptr = NonNull::new(self as *const _ as *mut Self); + } + PollState::Polling if should_block => { + // Isolate execution has been paused but there are no more + // events to process, so this thread will be parked. Therefore, + // store the current thread handle in the waker so it knows + // which thread to unpark when new events arrive. + w.poll_state = PollState::Parked; + w.parked_thread.replace(thread::current()); } _ => unreachable!(), }; w.poll_state }); - - match new_poll_state { - PollState::Idle => break Poll::Pending, - PollState::Polling => continue, // Poll the session handler again. + match new_state { + PollState::Idle => break Ok(Poll::Pending), // Yield to task. + PollState::Polling => {} // Poll the session handler again. + PollState::Parked => thread::park(), // Park the thread. _ => unreachable!(), }; } } - /// Accepts incoming connections from inspector clients, and polls established - /// inspector sessions for messages that need to be dispatched to V8. This - /// function will repeatedly poll its innner streams and will not return until - /// they all yield `Pending` or have ended. - fn poll_sessions_inner(&self, cx: &mut Context) { - loop { - let mut sessions = self.sessions.borrow_mut(); - - // Accept new connections. - let poll_result = sessions.session_rx.poll_next_unpin(cx); - match poll_result { - Poll::Ready(Some(session_proxy)) => { - let session = InspectorSession::new( - self.v8_inspector.clone(), - session_proxy, - false, - ); - sessions.established.push(session); - // `session_rx` needs to be polled repeatedly until it is `Pending`. - continue; - } - Poll::Ready(None) => unreachable!(), // `session_rx` should never end. - Poll::Pending => {} - } - - // Poll established inspector sessions. - let poll_result = sessions.established.poll_next_unpin(cx); - if let Poll::Ready(Some(session_stream_item)) = poll_result { - let (v8_session_ptr, msg) = session_stream_item; - // Don't hold the borrow on sessions while dispatching a message, as it - // might result in a call to `poll_sessions_sync`. - drop(sessions); - InspectorSession::dispatch_message(v8_session_ptr, msg); - // Loop around. We need to keep polling established sessions and - // accepting new ones until eventually everything is `Pending`. - continue; - } - - break; - } - } - /// This function blocks the thread until at least one inspector client has /// established a websocket connection. /// @@ -399,12 +356,10 @@ impl JsRuntimeInspector { pub fn wait_for_session_and_break_on_next_statement(&mut self) { loop { match self.sessions.get_mut().established.iter_mut().next() { - Some(session) => { - break session.break_on_next_statement(); - } + Some(session) => break session.break_on_next_statement(), None => { self.flags.get_mut().waiting_for_session = true; - self.poll_sessions_sync(); + let _ = self.poll_sessions(None).unwrap(); } }; } @@ -468,6 +423,7 @@ struct InspectorFlags { struct SessionContainer { v8_inspector: Rc>>, session_rx: UnboundedReceiver, + handshake: Option>, established: SelectAll>, } @@ -479,6 +435,7 @@ impl SessionContainer { Self { v8_inspector, session_rx: new_session_rx, + handshake: None, established: SelectAll::new(), } } @@ -489,11 +446,12 @@ impl SessionContainer { /// all sessions before dropping the inspector instance. fn drop_sessions(&mut self) { self.v8_inspector = Default::default(); + self.handshake.take(); self.established.clear(); } fn has_active_sessions(&self) -> bool { - !self.established.is_empty() + !self.established.is_empty() || self.handshake.is_some() } fn has_blocking_sessions(&self) -> bool { @@ -509,6 +467,7 @@ impl SessionContainer { Self { v8_inspector: Default::default(), session_rx: rx, + handshake: None, established: SelectAll::new(), } } @@ -517,6 +476,7 @@ impl SessionContainer { struct InspectorWakerInner { poll_state: PollState, task_waker: Option, + parked_thread: Option, inspector_ptr: Option>, isolate_handle: v8::IsolateHandle, } @@ -531,6 +491,7 @@ impl InspectorWaker { let inner = InspectorWakerInner { poll_state: PollState::Idle, task_waker: None, + parked_thread: None, inspector_ptr: None, isolate_handle, }; @@ -546,39 +507,43 @@ impl InspectorWaker { } } -extern "C" fn handle_interrupt(_isolate: &mut v8::Isolate, arg: *mut c_void) { - // SAFETY: `InspectorWaker` is owned by `JsRuntimeInspector`, so the - // pointer to the latter is valid as long as waker is alive. - let inspector = unsafe { &*(arg as *mut JsRuntimeInspector) }; - inspector.poll_sessions_sync(); -} - impl task::ArcWake for InspectorWaker { fn wake_by_ref(arc_self: &Arc) { arc_self.update(|w| { - // Determine whether, given the current poll state, waking up is possible - // and necessary. If it is, change the poll state to `Woken`. match w.poll_state { - PollState::Idle | PollState::Polling => w.poll_state = PollState::Woken, - PollState::Woken => {} // Even if already woken, schedule an interrupt. - PollState::Dropped => return, // Don't do anything. - PollState::SyncPolling => panic!("wake() called while sync polling"), + PollState::Idle => { + // Wake the task, if any, that has polled the Inspector future last. + if let Some(waker) = w.task_waker.take() { + waker.wake() + } + // Request an interrupt from the isolate if it's running and there's + // not unhandled interrupt request in flight. + if let Some(arg) = w + .inspector_ptr + .take() + .map(|ptr| ptr.as_ptr() as *mut c_void) + { + w.isolate_handle.request_interrupt(handle_interrupt, arg); + } + extern "C" fn handle_interrupt( + _isolate: &mut v8::Isolate, + arg: *mut c_void, + ) { + // SAFETY: `InspectorWaker` is owned by `JsRuntimeInspector`, so the + // pointer to the latter is valid as long as waker is alive. + let inspector = unsafe { &*(arg as *mut JsRuntimeInspector) }; + let _ = inspector.poll_sessions(None); + } + } + PollState::Parked => { + // Unpark the isolate thread. + let parked_thread = w.parked_thread.take().unwrap(); + assert_ne!(parked_thread.id(), thread::current().id()); + parked_thread.unpark(); + } + _ => {} }; - - // Wake the task, if any, that has polled the Inspector future last. - if let Some(waker) = w.task_waker.take() { - waker.wake() - } - - // Request an interrupt from the isolate, if the isolate is currently - // running and there isn't already an interrupt request in flight. - if let Some(arg) = w - .inspector_ptr - .take() - .map(|ptr| ptr.cast::().as_ptr()) - { - w.isolate_handle.request_interrupt(handle_interrupt, arg); - } + w.poll_state = PollState::Woken; }); } } @@ -603,7 +568,6 @@ impl InspectorSession { blocking: bool, ) -> Box { new_box_with(move |self_ptr| { - // TODO(bartlomieju): channel should probably be a separate struct let v8_channel = v8::inspector::ChannelBase::new::(); let mut v8_inspector = v8_inspector_rc.borrow_mut(); let v8_inspector_ptr = v8_inspector.as_mut().unwrap();