From ee7ab81768c593e99774b629c82c1784204a1cb0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20Iwa=C5=84czuk?= Date: Tue, 28 Dec 2021 17:40:42 +0100 Subject: [PATCH] refactor(core): cleanup Inspector implementation (#12962) --- core/inspector.rs | 176 ++++++++++++++++++++---------------- core/lib.rs | 2 + runtime/inspector_server.rs | 31 ++++--- 3 files changed, 115 insertions(+), 94 deletions(-) diff --git a/core/inspector.rs b/core/inspector.rs index 8a686dc635..5dd3b00558 100644 --- a/core/inspector.rs +++ b/core/inspector.rs @@ -37,10 +37,16 @@ use std::rc::Rc; use std::sync::Arc; use std::thread; -/// If first argument is `None` then it's a notification, otherwise -/// it's a message. -pub type SessionProxySender = UnboundedSender<(Option, String)>; -pub type SessionProxyReceiver = UnboundedReceiver>; +pub enum InspectorMsgKind { + Notification, + Message(i32), +} +pub struct InspectorMsg { + pub kind: InspectorMsgKind, + pub content: String, +} +pub type SessionProxySender = UnboundedSender; +pub type SessionProxyReceiver = UnboundedReceiver; /// Encapsulates an UnboundedSender/UnboundedReceiver pair that together form /// a duplex channel for sending/receiving messages in V8 session. @@ -89,7 +95,7 @@ impl Drop for JsRuntimeInspector { // deleted, however InspectorSession also has a drop handler that cleans // up after itself. To avoid a double free, make sure the inspector is // dropped last. - take(&mut *self.sessions.borrow_mut()); + self.sessions.borrow_mut().drop_sessions(); // Notify counterparty that this instance is being destroyed. Ignoring // result because counterparty waiting for the signal might have already @@ -154,24 +160,25 @@ impl JsRuntimeInspector { let v8_inspector_client = v8::inspector::V8InspectorClientBase::new::(); - let flags = InspectorFlags::new(); let waker = InspectorWaker::new(scope.thread_safe_handle()); // Create JsRuntimeInspector instance. let mut self_ = Box::new(Self { v8_inspector_client, v8_inspector: Default::default(), - sessions: Default::default(), + sessions: RefCell::new(SessionContainer::temporary_placeholder()), new_session_tx, - flags, + flags: Default::default(), waker, deregister_tx: None, }); self_.v8_inspector = Rc::new(RefCell::new( v8::inspector::V8Inspector::create(scope, &mut *self_).into(), )); - self_.sessions = - SessionContainer::new(self_.v8_inspector.clone(), new_session_rx); + self_.sessions = RefCell::new(SessionContainer::new( + self_.v8_inspector.clone(), + new_session_rx, + )); // Tell the inspector about the global context. let context = v8::Local::new(scope, context); @@ -184,15 +191,14 @@ impl JsRuntimeInspector { .context_created(context, Self::CONTEXT_GROUP_ID, context_name); // Poll the session handler so we will get notified whenever there is - // new_incoming debugger activity. + // new incoming debugger activity. let _ = self_.poll_sessions(None).unwrap(); self_ } pub fn has_active_sessions(&self) -> bool { - let sessions = self.sessions.borrow(); - !sessions.established.is_empty() || sessions.handshake.is_some() + self.sessions.borrow().has_active_sessions() } fn poll_sessions( @@ -236,8 +242,12 @@ impl JsRuntimeInspector { } // Accept new connections. - match sessions.new_incoming.poll_next_unpin(cx) { - Poll::Ready(Some(session)) => { + match sessions.session_rx.poll_next_unpin(cx) { + Poll::Ready(Some(session_proxy)) => { + let session = InspectorSession::new( + sessions.v8_inspector.clone(), + session_proxy, + ); let prev = sessions.handshake.replace(session); assert!(prev.is_none()); continue; @@ -374,17 +384,11 @@ struct InspectorFlags { on_pause: bool, } -impl InspectorFlags { - fn new() -> RefCell { - let self_ = Self::default(); - RefCell::new(self_) - } -} - /// A helper structure that helps coordinate sessions during different /// parts of their lifecycle. struct SessionContainer { - new_incoming: Pin> + 'static>>, + v8_inspector: Rc>>, + session_rx: UnboundedReceiver, handshake: Option>, established: FuturesUnordered>, } @@ -393,24 +397,38 @@ impl SessionContainer { fn new( v8_inspector: Rc>>, new_session_rx: UnboundedReceiver, - ) -> RefCell { - let new_incoming = new_session_rx - .map(move |session_proxy| { - InspectorSession::new(v8_inspector.clone(), session_proxy) - }) - .boxed_local(); - let self_ = Self { - new_incoming, - ..Default::default() - }; - RefCell::new(self_) - } -} - -impl Default for SessionContainer { - fn default() -> Self { + ) -> Self { Self { - new_incoming: stream::empty().boxed_local(), + v8_inspector, + session_rx: new_session_rx, + handshake: None, + established: FuturesUnordered::new(), + } + } + + /// V8 automatically deletes all sessions when an `V8Inspector` instance is + /// deleted, however InspectorSession also has a drop handler that cleans + /// up after itself. To avoid a double free, we need to manually drop + /// all sessions before dropping the inspector instance. + fn drop_sessions(&mut self) { + self.v8_inspector = Default::default(); + self.handshake.take(); + self.established.clear(); + } + + fn has_active_sessions(&self) -> bool { + !self.established.is_empty() || self.handshake.is_some() + } + + /// A temporary placeholder that should be used before actual + /// instance of V8Inspector is created. It's used in favor + /// of `Default` implementation to signal that it's not meant + /// for actual use. + fn temporary_placeholder() -> Self { + let (_tx, rx) = mpsc::unbounded::(); + Self { + v8_inspector: Default::default(), + session_rx: rx, handshake: None, established: FuturesUnordered::new(), } @@ -535,11 +553,14 @@ impl InspectorSession { fn send_message( &self, - maybe_call_id: Option, + msg_kind: InspectorMsgKind, msg: v8::UniquePtr, ) { let msg = msg.unwrap().string().to_string(); - let _ = self.proxy.tx.unbounded_send((maybe_call_id, msg)); + let _ = self.proxy.tx.unbounded_send(InspectorMsg { + kind: msg_kind, + content: msg, + }); } pub fn break_on_next_statement(&mut self) { @@ -567,14 +588,14 @@ impl v8::inspector::ChannelImpl for InspectorSession { call_id: i32, message: v8::UniquePtr, ) { - self.send_message(Some(call_id), message); + self.send_message(InspectorMsgKind::Message(call_id), message); } fn send_notification( &mut self, message: v8::UniquePtr, ) { - self.send_message(None, message); + self.send_message(InspectorMsgKind::Notification, message); } fn flush_protocol_notifications(&mut self) {} @@ -587,7 +608,7 @@ impl Future for InspectorSession { fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { while let Poll::Ready(maybe_msg) = self.proxy.rx.poll_next_unpin(cx) { if let Some(msg) = maybe_msg { - let msg = v8::inspector::StringView::from(msg.as_slice()); + let msg = v8::inspector::StringView::from(msg.as_bytes()); let mut v8_session = self.v8_session.borrow_mut(); let v8_session_ptr = v8_session.as_mut(); v8_session_ptr.dispatch_protocol_message(msg); @@ -603,8 +624,8 @@ impl Future for InspectorSession { /// A local inspector session that can be used to send and receive protocol messages directly on /// the same thread as an isolate. pub struct LocalInspectorSession { - v8_session_tx: UnboundedSender>, - v8_session_rx: UnboundedReceiver<(Option, String)>, + v8_session_tx: UnboundedSender, + v8_session_rx: UnboundedReceiver, response_tx_map: HashMap>, next_message_id: i32, notification_queue: Vec, @@ -612,8 +633,8 @@ pub struct LocalInspectorSession { impl LocalInspectorSession { pub fn new( - v8_session_tx: UnboundedSender>, - v8_session_rx: UnboundedReceiver<(Option, String)>, + v8_session_tx: UnboundedSender, + v8_session_rx: UnboundedReceiver, ) -> Self { let response_tx_map = HashMap::new(); let next_message_id = 0; @@ -651,11 +672,8 @@ impl LocalInspectorSession { "params": params, }); - let raw_message = serde_json::to_string(&message).unwrap(); - self - .v8_session_tx - .unbounded_send(raw_message.as_bytes().to_vec()) - .unwrap(); + let stringified_msg = serde_json::to_string(&message).unwrap(); + self.v8_session_tx.unbounded_send(stringified_msg).unwrap(); loop { let receive_fut = self.receive_from_v8_session().boxed_local(); @@ -675,40 +693,40 @@ impl LocalInspectorSession { } async fn receive_from_v8_session(&mut self) { - let (maybe_call_id, message) = self.v8_session_rx.next().await.unwrap(); - // If there's no call_id then it's a notification - if let Some(call_id) = maybe_call_id { - let message: serde_json::Value = match serde_json::from_str(&message) { - Ok(v) => v, - Err(error) => match error.classify() { - serde_json::error::Category::Syntax => json!({ - "id": call_id, - "result": { + let inspector_msg = self.v8_session_rx.next().await.unwrap(); + if let InspectorMsgKind::Message(msg_id) = inspector_msg.kind { + let message: serde_json::Value = + match serde_json::from_str(&inspector_msg.content) { + Ok(v) => v, + Err(error) => match error.classify() { + serde_json::error::Category::Syntax => json!({ + "id": msg_id, "result": { - "type": "error", - "description": "Unterminated string literal", - "value": "Unterminated string literal", + "result": { + "type": "error", + "description": "Unterminated string literal", + "value": "Unterminated string literal", + }, + "exceptionDetails": { + "exceptionId": 0, + "text": "Unterminated string literal", + "lineNumber": 0, + "columnNumber": 0 + }, }, - "exceptionDetails": { - "exceptionId": 0, - "text": "Unterminated string literal", - "lineNumber": 0, - "columnNumber": 0 - }, - }, - }), - _ => panic!("Could not parse inspector message"), - }, - }; + }), + _ => panic!("Could not parse inspector message"), + }, + }; self .response_tx_map - .remove(&call_id) + .remove(&msg_id) .unwrap() .send(message) .unwrap(); } else { - let message = serde_json::from_str(&message).unwrap(); + let message = serde_json::from_str(&inspector_msg.content).unwrap(); self.notification_queue.push(message); } } diff --git a/core/lib.rs b/core/lib.rs index 95c5d36cb1..606304ad57 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -45,6 +45,8 @@ pub use crate::async_cell::AsyncRefFuture; pub use crate::async_cell::RcLike; pub use crate::async_cell::RcRef; pub use crate::flags::v8_set_flags; +pub use crate::inspector::InspectorMsg; +pub use crate::inspector::InspectorMsgKind; pub use crate::inspector::InspectorSessionProxy; pub use crate::inspector::JsRuntimeInspector; pub use crate::inspector::LocalInspectorSession; diff --git a/runtime/inspector_server.rs b/runtime/inspector_server.rs index 99aff06446..04a8db959d 100644 --- a/runtime/inspector_server.rs +++ b/runtime/inspector_server.rs @@ -16,9 +16,11 @@ use deno_core::futures::task::Poll; use deno_core::serde_json; use deno_core::serde_json::json; use deno_core::serde_json::Value; +use deno_core::InspectorMsg; use deno_core::InspectorSessionProxy; use deno_core::JsRuntime; use deno_websocket::tokio_tungstenite::tungstenite; +use deno_websocket::tokio_tungstenite::WebSocketStream; use std::cell::RefCell; use std::collections::HashMap; use std::convert::Infallible; @@ -169,13 +171,12 @@ fn handle_ws_request( eprintln!("Inspector server failed to upgrade to WS connection"); return; }; - let websocket = - deno_websocket::tokio_tungstenite::WebSocketStream::from_raw_socket( - upgraded, - tungstenite::protocol::Role::Server, - None, - ) - .await; + let websocket = WebSocketStream::from_raw_socket( + upgraded, + tungstenite::protocol::Role::Server, + None, + ) + .await; // The 'outbound' channel carries messages sent to the websocket. let (outbound_tx, outbound_rx) = mpsc::unbounded(); @@ -330,26 +331,26 @@ async fn server( /// 'futures' crate, therefore they can't participate in Tokio's cooperative /// task yielding. async fn pump_websocket_messages( - websocket: deno_websocket::tokio_tungstenite::WebSocketStream< - hyper::upgrade::Upgraded, - >, - inbound_tx: UnboundedSender>, - outbound_rx: UnboundedReceiver<(Option, String)>, + websocket: WebSocketStream, + inbound_tx: UnboundedSender, + outbound_rx: UnboundedReceiver, ) { let (websocket_tx, websocket_rx) = websocket.split(); let outbound_pump = outbound_rx - .map(|(_maybe_call_id, msg)| tungstenite::Message::text(msg)) + .map(|msg| tungstenite::Message::text(msg.content)) .map(Ok) .forward(websocket_tx) .map_err(|_| ()); let inbound_pump = async move { let _result = websocket_rx - .map_ok(|msg| msg.into_data()) .map_err(AnyError::from) .map_ok(|msg| { - let _ = inbound_tx.unbounded_send(msg); + // Messages that cannot be converted to strings are ignored. + if let Ok(msg_text) = msg.into_text() { + let _ = inbound_tx.unbounded_send(msg_text); + } }) .try_collect::<()>() .await;