1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2024-11-25 15:29:32 -05:00

refactor(core): cleanup Inspector implementation (#12962)

This commit is contained in:
Bartek Iwańczuk 2021-12-28 17:40:42 +01:00 committed by GitHub
parent 07618c861e
commit ee7ab81768
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 115 additions and 94 deletions

View file

@ -37,10 +37,16 @@ use std::rc::Rc;
use std::sync::Arc;
use std::thread;
/// If first argument is `None` then it's a notification, otherwise
/// it's a message.
pub type SessionProxySender = UnboundedSender<(Option<i32>, String)>;
pub type SessionProxyReceiver = UnboundedReceiver<Vec<u8>>;
pub enum InspectorMsgKind {
Notification,
Message(i32),
}
pub struct InspectorMsg {
pub kind: InspectorMsgKind,
pub content: String,
}
pub type SessionProxySender = UnboundedSender<InspectorMsg>;
pub type SessionProxyReceiver = UnboundedReceiver<String>;
/// Encapsulates an UnboundedSender/UnboundedReceiver pair that together form
/// a duplex channel for sending/receiving messages in V8 session.
@ -89,7 +95,7 @@ impl Drop for JsRuntimeInspector {
// deleted, however InspectorSession also has a drop handler that cleans
// up after itself. To avoid a double free, make sure the inspector is
// dropped last.
take(&mut *self.sessions.borrow_mut());
self.sessions.borrow_mut().drop_sessions();
// Notify counterparty that this instance is being destroyed. Ignoring
// result because counterparty waiting for the signal might have already
@ -154,24 +160,25 @@ impl JsRuntimeInspector {
let v8_inspector_client =
v8::inspector::V8InspectorClientBase::new::<Self>();
let flags = InspectorFlags::new();
let waker = InspectorWaker::new(scope.thread_safe_handle());
// Create JsRuntimeInspector instance.
let mut self_ = Box::new(Self {
v8_inspector_client,
v8_inspector: Default::default(),
sessions: Default::default(),
sessions: RefCell::new(SessionContainer::temporary_placeholder()),
new_session_tx,
flags,
flags: Default::default(),
waker,
deregister_tx: None,
});
self_.v8_inspector = Rc::new(RefCell::new(
v8::inspector::V8Inspector::create(scope, &mut *self_).into(),
));
self_.sessions =
SessionContainer::new(self_.v8_inspector.clone(), new_session_rx);
self_.sessions = RefCell::new(SessionContainer::new(
self_.v8_inspector.clone(),
new_session_rx,
));
// Tell the inspector about the global context.
let context = v8::Local::new(scope, context);
@ -184,15 +191,14 @@ impl JsRuntimeInspector {
.context_created(context, Self::CONTEXT_GROUP_ID, context_name);
// Poll the session handler so we will get notified whenever there is
// new_incoming debugger activity.
// new incoming debugger activity.
let _ = self_.poll_sessions(None).unwrap();
self_
}
pub fn has_active_sessions(&self) -> bool {
let sessions = self.sessions.borrow();
!sessions.established.is_empty() || sessions.handshake.is_some()
self.sessions.borrow().has_active_sessions()
}
fn poll_sessions(
@ -236,8 +242,12 @@ impl JsRuntimeInspector {
}
// Accept new connections.
match sessions.new_incoming.poll_next_unpin(cx) {
Poll::Ready(Some(session)) => {
match sessions.session_rx.poll_next_unpin(cx) {
Poll::Ready(Some(session_proxy)) => {
let session = InspectorSession::new(
sessions.v8_inspector.clone(),
session_proxy,
);
let prev = sessions.handshake.replace(session);
assert!(prev.is_none());
continue;
@ -374,17 +384,11 @@ struct InspectorFlags {
on_pause: bool,
}
impl InspectorFlags {
fn new() -> RefCell<Self> {
let self_ = Self::default();
RefCell::new(self_)
}
}
/// A helper structure that helps coordinate sessions during different
/// parts of their lifecycle.
struct SessionContainer {
new_incoming: Pin<Box<dyn Stream<Item = Box<InspectorSession>> + 'static>>,
v8_inspector: Rc<RefCell<v8::UniquePtr<v8::inspector::V8Inspector>>>,
session_rx: UnboundedReceiver<InspectorSessionProxy>,
handshake: Option<Box<InspectorSession>>,
established: FuturesUnordered<Box<InspectorSession>>,
}
@ -393,24 +397,38 @@ impl SessionContainer {
fn new(
v8_inspector: Rc<RefCell<v8::UniquePtr<v8::inspector::V8Inspector>>>,
new_session_rx: UnboundedReceiver<InspectorSessionProxy>,
) -> RefCell<Self> {
let new_incoming = new_session_rx
.map(move |session_proxy| {
InspectorSession::new(v8_inspector.clone(), session_proxy)
})
.boxed_local();
let self_ = Self {
new_incoming,
..Default::default()
};
RefCell::new(self_)
}
}
impl Default for SessionContainer {
fn default() -> Self {
) -> Self {
Self {
new_incoming: stream::empty().boxed_local(),
v8_inspector,
session_rx: new_session_rx,
handshake: None,
established: FuturesUnordered::new(),
}
}
/// V8 automatically deletes all sessions when an `V8Inspector` instance is
/// deleted, however InspectorSession also has a drop handler that cleans
/// up after itself. To avoid a double free, we need to manually drop
/// all sessions before dropping the inspector instance.
fn drop_sessions(&mut self) {
self.v8_inspector = Default::default();
self.handshake.take();
self.established.clear();
}
fn has_active_sessions(&self) -> bool {
!self.established.is_empty() || self.handshake.is_some()
}
/// A temporary placeholder that should be used before actual
/// instance of V8Inspector is created. It's used in favor
/// of `Default` implementation to signal that it's not meant
/// for actual use.
fn temporary_placeholder() -> Self {
let (_tx, rx) = mpsc::unbounded::<InspectorSessionProxy>();
Self {
v8_inspector: Default::default(),
session_rx: rx,
handshake: None,
established: FuturesUnordered::new(),
}
@ -535,11 +553,14 @@ impl InspectorSession {
fn send_message(
&self,
maybe_call_id: Option<i32>,
msg_kind: InspectorMsgKind,
msg: v8::UniquePtr<v8::inspector::StringBuffer>,
) {
let msg = msg.unwrap().string().to_string();
let _ = self.proxy.tx.unbounded_send((maybe_call_id, msg));
let _ = self.proxy.tx.unbounded_send(InspectorMsg {
kind: msg_kind,
content: msg,
});
}
pub fn break_on_next_statement(&mut self) {
@ -567,14 +588,14 @@ impl v8::inspector::ChannelImpl for InspectorSession {
call_id: i32,
message: v8::UniquePtr<v8::inspector::StringBuffer>,
) {
self.send_message(Some(call_id), message);
self.send_message(InspectorMsgKind::Message(call_id), message);
}
fn send_notification(
&mut self,
message: v8::UniquePtr<v8::inspector::StringBuffer>,
) {
self.send_message(None, message);
self.send_message(InspectorMsgKind::Notification, message);
}
fn flush_protocol_notifications(&mut self) {}
@ -587,7 +608,7 @@ impl Future for InspectorSession {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
while let Poll::Ready(maybe_msg) = self.proxy.rx.poll_next_unpin(cx) {
if let Some(msg) = maybe_msg {
let msg = v8::inspector::StringView::from(msg.as_slice());
let msg = v8::inspector::StringView::from(msg.as_bytes());
let mut v8_session = self.v8_session.borrow_mut();
let v8_session_ptr = v8_session.as_mut();
v8_session_ptr.dispatch_protocol_message(msg);
@ -603,8 +624,8 @@ impl Future for InspectorSession {
/// A local inspector session that can be used to send and receive protocol messages directly on
/// the same thread as an isolate.
pub struct LocalInspectorSession {
v8_session_tx: UnboundedSender<Vec<u8>>,
v8_session_rx: UnboundedReceiver<(Option<i32>, String)>,
v8_session_tx: UnboundedSender<String>,
v8_session_rx: UnboundedReceiver<InspectorMsg>,
response_tx_map: HashMap<i32, oneshot::Sender<serde_json::Value>>,
next_message_id: i32,
notification_queue: Vec<Value>,
@ -612,8 +633,8 @@ pub struct LocalInspectorSession {
impl LocalInspectorSession {
pub fn new(
v8_session_tx: UnboundedSender<Vec<u8>>,
v8_session_rx: UnboundedReceiver<(Option<i32>, String)>,
v8_session_tx: UnboundedSender<String>,
v8_session_rx: UnboundedReceiver<InspectorMsg>,
) -> Self {
let response_tx_map = HashMap::new();
let next_message_id = 0;
@ -651,11 +672,8 @@ impl LocalInspectorSession {
"params": params,
});
let raw_message = serde_json::to_string(&message).unwrap();
self
.v8_session_tx
.unbounded_send(raw_message.as_bytes().to_vec())
.unwrap();
let stringified_msg = serde_json::to_string(&message).unwrap();
self.v8_session_tx.unbounded_send(stringified_msg).unwrap();
loop {
let receive_fut = self.receive_from_v8_session().boxed_local();
@ -675,14 +693,14 @@ impl LocalInspectorSession {
}
async fn receive_from_v8_session(&mut self) {
let (maybe_call_id, message) = self.v8_session_rx.next().await.unwrap();
// If there's no call_id then it's a notification
if let Some(call_id) = maybe_call_id {
let message: serde_json::Value = match serde_json::from_str(&message) {
let inspector_msg = self.v8_session_rx.next().await.unwrap();
if let InspectorMsgKind::Message(msg_id) = inspector_msg.kind {
let message: serde_json::Value =
match serde_json::from_str(&inspector_msg.content) {
Ok(v) => v,
Err(error) => match error.classify() {
serde_json::error::Category::Syntax => json!({
"id": call_id,
"id": msg_id,
"result": {
"result": {
"type": "error",
@ -703,12 +721,12 @@ impl LocalInspectorSession {
self
.response_tx_map
.remove(&call_id)
.remove(&msg_id)
.unwrap()
.send(message)
.unwrap();
} else {
let message = serde_json::from_str(&message).unwrap();
let message = serde_json::from_str(&inspector_msg.content).unwrap();
self.notification_queue.push(message);
}
}

View file

@ -45,6 +45,8 @@ pub use crate::async_cell::AsyncRefFuture;
pub use crate::async_cell::RcLike;
pub use crate::async_cell::RcRef;
pub use crate::flags::v8_set_flags;
pub use crate::inspector::InspectorMsg;
pub use crate::inspector::InspectorMsgKind;
pub use crate::inspector::InspectorSessionProxy;
pub use crate::inspector::JsRuntimeInspector;
pub use crate::inspector::LocalInspectorSession;

View file

@ -16,9 +16,11 @@ use deno_core::futures::task::Poll;
use deno_core::serde_json;
use deno_core::serde_json::json;
use deno_core::serde_json::Value;
use deno_core::InspectorMsg;
use deno_core::InspectorSessionProxy;
use deno_core::JsRuntime;
use deno_websocket::tokio_tungstenite::tungstenite;
use deno_websocket::tokio_tungstenite::WebSocketStream;
use std::cell::RefCell;
use std::collections::HashMap;
use std::convert::Infallible;
@ -169,8 +171,7 @@ fn handle_ws_request(
eprintln!("Inspector server failed to upgrade to WS connection");
return;
};
let websocket =
deno_websocket::tokio_tungstenite::WebSocketStream::from_raw_socket(
let websocket = WebSocketStream::from_raw_socket(
upgraded,
tungstenite::protocol::Role::Server,
None,
@ -330,26 +331,26 @@ async fn server(
/// 'futures' crate, therefore they can't participate in Tokio's cooperative
/// task yielding.
async fn pump_websocket_messages(
websocket: deno_websocket::tokio_tungstenite::WebSocketStream<
hyper::upgrade::Upgraded,
>,
inbound_tx: UnboundedSender<Vec<u8>>,
outbound_rx: UnboundedReceiver<(Option<i32>, String)>,
websocket: WebSocketStream<hyper::upgrade::Upgraded>,
inbound_tx: UnboundedSender<String>,
outbound_rx: UnboundedReceiver<InspectorMsg>,
) {
let (websocket_tx, websocket_rx) = websocket.split();
let outbound_pump = outbound_rx
.map(|(_maybe_call_id, msg)| tungstenite::Message::text(msg))
.map(|msg| tungstenite::Message::text(msg.content))
.map(Ok)
.forward(websocket_tx)
.map_err(|_| ());
let inbound_pump = async move {
let _result = websocket_rx
.map_ok(|msg| msg.into_data())
.map_err(AnyError::from)
.map_ok(|msg| {
let _ = inbound_tx.unbounded_send(msg);
// Messages that cannot be converted to strings are ignored.
if let Ok(msg_text) = msg.into_text() {
let _ = inbound_tx.unbounded_send(msg_text);
}
})
.try_collect::<()>()
.await;