mirror of
https://github.com/denoland/deno.git
synced 2025-01-12 00:54:02 -05:00
refactor: cleanup Inspector and InspectorServer implementations (#11837)
This commit is contained in:
parent
873cce27b8
commit
f84cd9403d
4 changed files with 140 additions and 158 deletions
|
@ -41,9 +41,7 @@ use std::thread;
|
|||
/// If first argument is `None` then it's a notification, otherwise
|
||||
/// it's a message.
|
||||
pub type SessionProxySender = UnboundedSender<(Option<i32>, String)>;
|
||||
// TODO(bartlomieju): does it even need to send a Result?
|
||||
// It seems `Vec<u8>` would be enough
|
||||
pub type SessionProxyReceiver = UnboundedReceiver<Result<Vec<u8>, AnyError>>;
|
||||
pub type SessionProxyReceiver = UnboundedReceiver<Vec<u8>>;
|
||||
|
||||
/// Encapsulates an UnboundedSender/UnboundedReceiver pair that together form
|
||||
/// a duplex channel for sending/receiving messages in V8 session.
|
||||
|
@ -89,11 +87,11 @@ pub struct JsRuntimeInspector {
|
|||
|
||||
impl Drop for JsRuntimeInspector {
|
||||
fn drop(&mut self) {
|
||||
// Since the waker is cloneable, it might outlive the inspector itself.
|
||||
// Since the waker is cloneable, it might outlive the inspector itself.
|
||||
// Set the poll state to 'dropped' so it doesn't attempt to request an
|
||||
// interrupt from the isolate.
|
||||
self.waker.update(|w| w.poll_state = PollState::Dropped);
|
||||
// TODO(bartlomieju): this comment is out of date
|
||||
|
||||
// V8 automatically deletes all sessions when an `V8Inspector` instance is
|
||||
// deleted, however InspectorSession also has a drop handler that cleans
|
||||
// up after itself. To avoid a double free, make sure the inspector is
|
||||
|
@ -134,9 +132,11 @@ impl v8::inspector::V8InspectorClientImpl for JsRuntimeInspector {
|
|||
}
|
||||
}
|
||||
|
||||
/// `JsRuntimeInspector` implements a Future so that it can poll for new incoming
|
||||
/// connections and messages from the WebSocket server. The Worker that owns
|
||||
/// this `JsRuntimeInspector` will call this function from `Worker::poll()`.
|
||||
/// Polling `JsRuntimeInspector` allows inspector to accept new incoming
|
||||
/// connections and "pump" messages in different sessions.
|
||||
///
|
||||
/// It should be polled on tick of event loop, ie. in `JsRuntime::poll_event_loop`
|
||||
/// function.
|
||||
impl Future for JsRuntimeInspector {
|
||||
type Output = ();
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<()> {
|
||||
|
@ -501,8 +501,7 @@ impl task::ArcWake for InspectorWaker {
|
|||
struct InspectorSession {
|
||||
v8_channel: v8::inspector::ChannelBase,
|
||||
v8_session: Rc<RefCell<v8::UniqueRef<v8::inspector::V8InspectorSession>>>,
|
||||
proxy_tx: SessionProxySender,
|
||||
proxy_rx_handler: Pin<Box<dyn Future<Output = ()> + 'static>>,
|
||||
proxy: InspectorSessionProxy,
|
||||
}
|
||||
|
||||
impl InspectorSession {
|
||||
|
@ -524,15 +523,10 @@ impl InspectorSession {
|
|||
v8::inspector::StringView::empty(),
|
||||
)));
|
||||
|
||||
let (proxy_tx, proxy_rx) = session_proxy.split();
|
||||
let proxy_rx_handler =
|
||||
Self::receive_from_proxy(v8_session.clone(), proxy_rx);
|
||||
|
||||
Self {
|
||||
v8_channel,
|
||||
v8_session,
|
||||
proxy_tx,
|
||||
proxy_rx_handler,
|
||||
proxy: session_proxy,
|
||||
}
|
||||
})
|
||||
}
|
||||
|
@ -546,46 +540,13 @@ impl InspectorSession {
|
|||
v8_session_ptr.dispatch_protocol_message(msg);
|
||||
}
|
||||
|
||||
// TODO(bartlomieju): this function should be reworked into `impl Future`
|
||||
// or `impl Stream`
|
||||
/// Returns a future that receives messages from the proxy and dispatches
|
||||
/// them to the V8 session.
|
||||
fn receive_from_proxy(
|
||||
v8_session_rc: Rc<
|
||||
RefCell<v8::UniqueRef<v8::inspector::V8InspectorSession>>,
|
||||
>,
|
||||
proxy_rx: SessionProxyReceiver,
|
||||
) -> Pin<Box<dyn Future<Output = ()> + 'static>> {
|
||||
async move {
|
||||
let result = proxy_rx
|
||||
.map_ok(move |msg| {
|
||||
let msg = v8::inspector::StringView::from(msg.as_slice());
|
||||
let mut v8_session = v8_session_rc.borrow_mut();
|
||||
let v8_session_ptr = v8_session.as_mut();
|
||||
v8_session_ptr.dispatch_protocol_message(msg);
|
||||
})
|
||||
.try_collect::<()>()
|
||||
.await;
|
||||
|
||||
// TODO(bartlomieju): ideally these prints should be moved
|
||||
// to `server.rs` as they are unwanted in context of REPL/coverage collection
|
||||
// but right now they do not pose a huge problem. Investigate how to
|
||||
// move them to `server.rs`.
|
||||
match result {
|
||||
Ok(_) => eprintln!("Debugger session ended."),
|
||||
Err(err) => eprintln!("Debugger session ended: {}.", err),
|
||||
};
|
||||
}
|
||||
.boxed_local()
|
||||
}
|
||||
|
||||
fn send_message(
|
||||
&self,
|
||||
maybe_call_id: Option<i32>,
|
||||
msg: v8::UniquePtr<v8::inspector::StringBuffer>,
|
||||
) {
|
||||
let msg = msg.unwrap().string().to_string();
|
||||
let _ = self.proxy_tx.unbounded_send((maybe_call_id, msg));
|
||||
let _ = self.proxy.tx.unbounded_send((maybe_call_id, msg));
|
||||
}
|
||||
|
||||
pub fn break_on_next_statement(&mut self) {
|
||||
|
@ -626,17 +587,30 @@ impl v8::inspector::ChannelImpl for InspectorSession {
|
|||
fn flush_protocol_notifications(&mut self) {}
|
||||
}
|
||||
|
||||
/// This is a "pump" future takes care of receiving messages and dispatching
|
||||
/// them to the inspector. It resolves when receiver closes.
|
||||
impl Future for InspectorSession {
|
||||
type Output = ();
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
|
||||
self.proxy_rx_handler.poll_unpin(cx)
|
||||
while let Poll::Ready(maybe_msg) = self.proxy.rx.poll_next_unpin(cx) {
|
||||
if let Some(msg) = maybe_msg {
|
||||
let msg = v8::inspector::StringView::from(msg.as_slice());
|
||||
let mut v8_session = self.v8_session.borrow_mut();
|
||||
let v8_session_ptr = v8_session.as_mut();
|
||||
v8_session_ptr.dispatch_protocol_message(msg);
|
||||
} else {
|
||||
return Poll::Ready(());
|
||||
}
|
||||
}
|
||||
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
|
||||
/// A local inspector session that can be used to send and receive protocol messages directly on
|
||||
/// the same thread as an isolate.
|
||||
pub struct LocalInspectorSession {
|
||||
v8_session_tx: UnboundedSender<Result<Vec<u8>, AnyError>>,
|
||||
v8_session_tx: UnboundedSender<Vec<u8>>,
|
||||
v8_session_rx: UnboundedReceiver<(Option<i32>, String)>,
|
||||
response_tx_map: HashMap<i32, oneshot::Sender<serde_json::Value>>,
|
||||
next_message_id: i32,
|
||||
|
@ -645,7 +619,7 @@ pub struct LocalInspectorSession {
|
|||
|
||||
impl LocalInspectorSession {
|
||||
pub fn new(
|
||||
v8_session_tx: UnboundedSender<Result<Vec<u8>, AnyError>>,
|
||||
v8_session_tx: UnboundedSender<Vec<u8>>,
|
||||
v8_session_rx: UnboundedReceiver<(Option<i32>, String)>,
|
||||
) -> Self {
|
||||
let response_tx_map = HashMap::new();
|
||||
|
@ -687,7 +661,7 @@ impl LocalInspectorSession {
|
|||
let raw_message = serde_json::to_string(&message).unwrap();
|
||||
self
|
||||
.v8_session_tx
|
||||
.unbounded_send(Ok(raw_message.as_bytes().to_vec()))
|
||||
.unbounded_send(raw_message.as_bytes().to_vec())
|
||||
.unwrap();
|
||||
|
||||
loop {
|
||||
|
|
|
@ -17,6 +17,7 @@ use deno_core::serde_json;
|
|||
use deno_core::serde_json::json;
|
||||
use deno_core::serde_json::Value;
|
||||
use deno_core::InspectorSessionProxy;
|
||||
use deno_core::JsRuntime;
|
||||
use deno_websocket::tokio_tungstenite::tungstenite;
|
||||
use std::cell::RefCell;
|
||||
use std::collections::HashMap;
|
||||
|
@ -62,10 +63,13 @@ impl InspectorServer {
|
|||
|
||||
pub fn register_inspector(
|
||||
&self,
|
||||
session_sender: UnboundedSender<InspectorSessionProxy>,
|
||||
deregister_rx: oneshot::Receiver<()>,
|
||||
module_url: String,
|
||||
js_runtime: &mut JsRuntime,
|
||||
) {
|
||||
let inspector = js_runtime.inspector();
|
||||
let session_sender = inspector.get_session_sender();
|
||||
let deregister_rx = inspector.add_deregister_handler();
|
||||
// TODO(bartlomieju): simplify
|
||||
let info =
|
||||
InspectorInfo::new(self.host, session_sender, deregister_rx, module_url);
|
||||
self.register_inspector_tx.unbounded_send(info).unwrap();
|
||||
|
@ -102,63 +106,88 @@ where
|
|||
|
||||
fn handle_ws_request(
|
||||
req: http::Request<hyper::Body>,
|
||||
inspector_map: Rc<RefCell<HashMap<Uuid, InspectorInfo>>>,
|
||||
inspector_map_rc: Rc<RefCell<HashMap<Uuid, InspectorInfo>>>,
|
||||
) -> http::Result<http::Response<hyper::Body>> {
|
||||
let (parts, body) = req.into_parts();
|
||||
let req = http::Request::from_parts(parts, ());
|
||||
|
||||
if let Some(new_session_tx) = req
|
||||
let maybe_uuid = req
|
||||
.uri()
|
||||
.path()
|
||||
.strip_prefix("/ws/")
|
||||
.and_then(|s| Uuid::parse_str(s).ok())
|
||||
.and_then(|uuid| {
|
||||
inspector_map
|
||||
.borrow()
|
||||
.get(&uuid)
|
||||
.map(|info| info.new_session_tx.clone())
|
||||
})
|
||||
{
|
||||
let resp = tungstenite::handshake::server::create_response(&req)
|
||||
.map(|resp| resp.map(|_| hyper::Body::empty()))
|
||||
.or_else(|e| match e {
|
||||
tungstenite::error::Error::HttpFormat(http_error) => Err(http_error),
|
||||
_ => http::Response::builder()
|
||||
.status(http::StatusCode::BAD_REQUEST)
|
||||
.body("Not a valid Websocket Request".into()),
|
||||
});
|
||||
.and_then(|s| Uuid::parse_str(s).ok());
|
||||
|
||||
let (parts, _) = req.into_parts();
|
||||
let req = http::Request::from_parts(parts, body);
|
||||
|
||||
if resp.is_ok() {
|
||||
tokio::task::spawn_local(async move {
|
||||
let upgrade_result = hyper::upgrade::on(req).await;
|
||||
let upgraded = if let Ok(u) = upgrade_result {
|
||||
u
|
||||
} else {
|
||||
eprintln!("Inspector server failed to upgrade to WS connection");
|
||||
return;
|
||||
};
|
||||
let websocket =
|
||||
deno_websocket::tokio_tungstenite::WebSocketStream::from_raw_socket(
|
||||
upgraded,
|
||||
tungstenite::protocol::Role::Server,
|
||||
None,
|
||||
)
|
||||
.await;
|
||||
let (proxy, pump) = create_websocket_proxy(websocket);
|
||||
eprintln!("Debugger session started.");
|
||||
let _ = new_session_tx.unbounded_send(proxy);
|
||||
pump.await;
|
||||
});
|
||||
}
|
||||
resp
|
||||
} else {
|
||||
http::Response::builder()
|
||||
.status(http::StatusCode::NOT_FOUND)
|
||||
.body("No Valid inspector".into())
|
||||
if maybe_uuid.is_none() {
|
||||
return http::Response::builder()
|
||||
.status(http::StatusCode::BAD_REQUEST)
|
||||
.body("Malformed inspector UUID".into());
|
||||
}
|
||||
|
||||
// run in a block to not hold borrow to `inspector_map` for too long
|
||||
let new_session_tx = {
|
||||
let inspector_map = inspector_map_rc.borrow();
|
||||
let maybe_inspector_info = inspector_map.get(&maybe_uuid.unwrap());
|
||||
|
||||
if maybe_inspector_info.is_none() {
|
||||
return http::Response::builder()
|
||||
.status(http::StatusCode::NOT_FOUND)
|
||||
.body("Invalid inspector UUID".into());
|
||||
}
|
||||
|
||||
let info = maybe_inspector_info.unwrap();
|
||||
info.new_session_tx.clone()
|
||||
};
|
||||
|
||||
let resp = tungstenite::handshake::server::create_response(&req)
|
||||
.map(|resp| resp.map(|_| hyper::Body::empty()))
|
||||
.or_else(|e| match e {
|
||||
tungstenite::error::Error::HttpFormat(http_error) => Err(http_error),
|
||||
_ => http::Response::builder()
|
||||
.status(http::StatusCode::BAD_REQUEST)
|
||||
.body("Not a valid Websocket Request".into()),
|
||||
});
|
||||
|
||||
if resp.is_err() {
|
||||
return resp;
|
||||
}
|
||||
|
||||
let (parts, _) = req.into_parts();
|
||||
let req = http::Request::from_parts(parts, body);
|
||||
|
||||
// spawn a task that will wait for websocket connection and then pump messages between
|
||||
// the socket and inspector proxy
|
||||
tokio::task::spawn_local(async move {
|
||||
let upgrade_result = hyper::upgrade::on(req).await;
|
||||
let upgraded = if let Ok(u) = upgrade_result {
|
||||
u
|
||||
} else {
|
||||
eprintln!("Inspector server failed to upgrade to WS connection");
|
||||
return;
|
||||
};
|
||||
let websocket =
|
||||
deno_websocket::tokio_tungstenite::WebSocketStream::from_raw_socket(
|
||||
upgraded,
|
||||
tungstenite::protocol::Role::Server,
|
||||
None,
|
||||
)
|
||||
.await;
|
||||
|
||||
// The 'outbound' channel carries messages sent to the websocket.
|
||||
let (outbound_tx, outbound_rx) = mpsc::unbounded();
|
||||
// The 'inbound' channel carries messages received from the websocket.
|
||||
let (inbound_tx, inbound_rx) = mpsc::unbounded();
|
||||
|
||||
let inspector_session_proxy = InspectorSessionProxy {
|
||||
tx: outbound_tx,
|
||||
rx: inbound_rx,
|
||||
};
|
||||
|
||||
eprintln!("Debugger session started.");
|
||||
let _ = new_session_tx.unbounded_send(inspector_session_proxy);
|
||||
pump_websocket_messages(websocket, inbound_tx, outbound_rx).await;
|
||||
});
|
||||
|
||||
resp
|
||||
}
|
||||
|
||||
fn handle_json_request(
|
||||
|
@ -279,58 +308,51 @@ async fn server(
|
|||
}
|
||||
}
|
||||
|
||||
/// Creates a future that proxies messages sent and received on a warp WebSocket
|
||||
/// to a UnboundedSender/UnboundedReceiver pair. We need this to sidestep
|
||||
/// The pump future takes care of forwarding messages between the websocket
|
||||
/// and channels. It resolves when either side disconnects, ignoring any
|
||||
/// errors.
|
||||
///
|
||||
/// The future proxies messages sent and received on a warp WebSocket
|
||||
/// to a UnboundedSender/UnboundedReceiver pair. We need these "unbounded" channel ends to sidestep
|
||||
/// Tokio's task budget, which causes issues when JsRuntimeInspector::poll_sessions()
|
||||
/// needs to block the thread because JavaScript execution is paused.
|
||||
///
|
||||
/// This works because UnboundedSender/UnboundedReceiver are implemented in the
|
||||
/// 'futures' crate, therefore they can't participate in Tokio's cooperative
|
||||
/// task yielding.
|
||||
///
|
||||
/// A tuple is returned, where the first element is a duplex channel that can
|
||||
/// be used to send/receive messages on the websocket, and the second element
|
||||
/// is a future that does the forwarding.
|
||||
fn create_websocket_proxy(
|
||||
async fn pump_websocket_messages(
|
||||
websocket: deno_websocket::tokio_tungstenite::WebSocketStream<
|
||||
hyper::upgrade::Upgraded,
|
||||
>,
|
||||
) -> (InspectorSessionProxy, impl Future<Output = ()> + Send) {
|
||||
// The 'outbound' channel carries messages sent to the websocket.
|
||||
let (outbound_tx, outbound_rx) = mpsc::unbounded();
|
||||
inbound_tx: UnboundedSender<Vec<u8>>,
|
||||
outbound_rx: UnboundedReceiver<(Option<i32>, String)>,
|
||||
) {
|
||||
let (websocket_tx, websocket_rx) = websocket.split();
|
||||
|
||||
// The 'inbound' channel carries messages received from the websocket.
|
||||
let (inbound_tx, inbound_rx) = mpsc::unbounded();
|
||||
let outbound_pump = outbound_rx
|
||||
.map(|(_maybe_call_id, msg)| tungstenite::Message::text(msg))
|
||||
.map(Ok)
|
||||
.forward(websocket_tx)
|
||||
.map_err(|_| ());
|
||||
|
||||
let proxy = InspectorSessionProxy {
|
||||
tx: outbound_tx,
|
||||
rx: inbound_rx,
|
||||
};
|
||||
|
||||
// The pump future takes care of forwarding messages between the websocket
|
||||
// and channels. It resolves to () when either side disconnects, ignoring any
|
||||
// errors.
|
||||
let pump = async move {
|
||||
let (websocket_tx, websocket_rx) = websocket.split();
|
||||
|
||||
let outbound_pump = outbound_rx
|
||||
.map(|(_maybe_call_id, msg)| tungstenite::Message::text(msg))
|
||||
.map(Ok)
|
||||
.forward(websocket_tx)
|
||||
.map_err(|_| ());
|
||||
|
||||
let inbound_pump = websocket_rx
|
||||
.map(|result| {
|
||||
let result = result.map(|msg| msg.into_data()).map_err(AnyError::from);
|
||||
inbound_tx.unbounded_send(result)
|
||||
let inbound_pump = async move {
|
||||
let result = websocket_rx
|
||||
.map_ok(|msg| msg.into_data())
|
||||
.map_err(AnyError::from)
|
||||
.map_ok(|msg| {
|
||||
let _ = inbound_tx.unbounded_send(msg);
|
||||
})
|
||||
.map_err(|_| ())
|
||||
.try_collect::<()>();
|
||||
.try_collect::<()>()
|
||||
.await;
|
||||
|
||||
let _ = future::try_join(outbound_pump, inbound_pump).await;
|
||||
match result {
|
||||
Ok(_) => eprintln!("Debugger session ended"),
|
||||
Err(err) => eprintln!("Debugger session ended: {}.", err),
|
||||
};
|
||||
|
||||
Ok(())
|
||||
};
|
||||
|
||||
(proxy, pump)
|
||||
let _ = future::try_join(outbound_pump, inbound_pump).await;
|
||||
}
|
||||
|
||||
/// Inspector information that is sent from the isolate thread to the server
|
||||
|
|
|
@ -388,14 +388,7 @@ impl WebWorker {
|
|||
});
|
||||
|
||||
if let Some(server) = options.maybe_inspector_server.clone() {
|
||||
let inspector = js_runtime.inspector();
|
||||
let session_sender = inspector.get_session_sender();
|
||||
let deregister_rx = inspector.add_deregister_handler();
|
||||
server.register_inspector(
|
||||
session_sender,
|
||||
deregister_rx,
|
||||
main_module.to_string(),
|
||||
);
|
||||
server.register_inspector(main_module.to_string(), &mut js_runtime);
|
||||
}
|
||||
|
||||
let (internal_handle, external_handle) = {
|
||||
|
|
|
@ -161,14 +161,7 @@ impl MainWorker {
|
|||
});
|
||||
|
||||
if let Some(server) = options.maybe_inspector_server.clone() {
|
||||
let inspector = js_runtime.inspector();
|
||||
let session_sender = inspector.get_session_sender();
|
||||
let deregister_rx = inspector.add_deregister_handler();
|
||||
server.register_inspector(
|
||||
session_sender,
|
||||
deregister_rx,
|
||||
main_module.to_string(),
|
||||
);
|
||||
server.register_inspector(main_module.to_string(), &mut js_runtime);
|
||||
}
|
||||
|
||||
Self {
|
||||
|
|
Loading…
Reference in a new issue