From e9edd7e14d9d78f03c5f2e67fcc44e4dbaab8f2c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20Iwa=C5=84czuk?= Date: Wed, 26 May 2021 17:47:33 +0200 Subject: [PATCH] refactor: Rewrite Inspector implementation (#10725) This commit refactors implementation of inspector. The intention is to be able to move inspector implementation to "deno_core". Following things were done to make that possible: * "runtime/inspector.rs" was split into "runtime/inspector/mod.rs" and "runtime/inspector/server.rs", separating inspector implementation from Websocket server implementation. * "DenoInspector" was renamed to "JsRuntimeInspector" and reference to "server" was removed from the structure, making it independent of Websocket server used to connect to Chrome Devtools. * "WebsocketSession" was renamed to "InspectorSession" and rewritten in such a way that it's not tied to Websockets anymore; instead it accepts a pair of "proxy" channel ends that allow to integrate the session with different "transports". * "InspectorSession" was renamed to "LocalInspectorSession" to better indicate that it's an "in-memory" session and doesn't require Websocket server. It was also rewritten in such a way that it uses "InspectorSession" from previous point instead of reimplementing "v8::inspector::ChannelImpl" trait; this is done by using the "proxy" channels to communicate with the V8 session. Consequently "LocalInspectorSession" is now a frontend to "InspectorSession". This introduces a small inconvenience that awaiting responses for "LocalInspectorSession" requires to concurrently poll worker's event loop. This arises from the fact that "InspectorSession" is now owned by "JsRuntimeInspector", which in turn is owned by "Worker" or "WebWorker". To ease this situation "Worker::with_event_loop" helper method was added, that takes a future and concurrently polls it along with the event loop (using "tokio::select!" macro inside a loop). --- cli/main.rs | 2 +- cli/tools/coverage.rs | 8 +- cli/tools/repl.rs | 218 ++++--- cli/tools/test_runner.rs | 11 +- runtime/inspector.rs | 1061 ----------------------------------- runtime/inspector/mod.rs | 751 +++++++++++++++++++++++++ runtime/inspector/server.rs | 389 +++++++++++++ runtime/web_worker.rs | 17 +- runtime/worker.rs | 50 +- 9 files changed, 1304 insertions(+), 1203 deletions(-) delete mode 100644 runtime/inspector.rs create mode 100644 runtime/inspector/mod.rs create mode 100644 runtime/inspector/server.rs diff --git a/cli/main.rs b/cli/main.rs index 60c202a7ad..185de5bfe0 100644 --- a/cli/main.rs +++ b/cli/main.rs @@ -866,7 +866,7 @@ async fn run_command(flags: Flags, script: String) -> Result<(), AnyError> { let mut maybe_coverage_collector = if let Some(ref coverage_dir) = program_state.coverage_dir { - let session = worker.create_inspector_session(); + let session = worker.create_inspector_session().await; let coverage_dir = PathBuf::from(coverage_dir); let mut coverage_collector = diff --git a/cli/tools/coverage.rs b/cli/tools/coverage.rs index 23b4a4fb67..204141b258 100644 --- a/cli/tools/coverage.rs +++ b/cli/tools/coverage.rs @@ -13,7 +13,7 @@ use deno_core::error::AnyError; use deno_core::serde_json; use deno_core::serde_json::json; use deno_core::url::Url; -use deno_runtime::inspector::InspectorSession; +use deno_runtime::inspector::LocalInspectorSession; use deno_runtime::permissions::Permissions; use regex::Regex; use serde::Deserialize; @@ -26,18 +26,17 @@ use uuid::Uuid; pub struct CoverageCollector { pub dir: PathBuf, - session: Box, + session: LocalInspectorSession, } impl CoverageCollector { - pub fn new(dir: PathBuf, session: Box) -> Self { + pub fn new(dir: PathBuf, session: LocalInspectorSession) -> Self { Self { dir, session } } pub async fn start_collecting(&mut self) -> Result<(), AnyError> { self.session.post_message("Debugger.enable", None).await?; self.session.post_message("Profiler.enable", None).await?; - self .session .post_message( @@ -45,7 +44,6 @@ impl CoverageCollector { Some(json!({"callCount": true, "detailed": true})), ) .await?; - Ok(()) } diff --git a/cli/tools/repl.rs b/cli/tools/repl.rs index ce0879a852..ba69c92343 100644 --- a/cli/tools/repl.rs +++ b/cli/tools/repl.rs @@ -6,9 +6,10 @@ use crate::colors; use crate::media_type::MediaType; use crate::program_state::ProgramState; use deno_core::error::AnyError; +use deno_core::futures::FutureExt; use deno_core::serde_json::json; use deno_core::serde_json::Value; -use deno_runtime::inspector::InspectorSession; +use deno_runtime::inspector::LocalInspectorSession; use deno_runtime::worker::MainWorker; use rustyline::completion::Completer; use rustyline::error::ReadlineError; @@ -255,34 +256,9 @@ impl Highlighter for Helper { } } -async fn post_message_and_poll( - worker: &mut MainWorker, - session: &mut InspectorSession, - method: &str, - params: Option, -) -> Result { - let response = session.post_message(method, params); - tokio::pin!(response); - - loop { - tokio::select! { - result = &mut response => { - return result - } - - _ = worker.run_event_loop() => { - // A zero delay is long enough to yield the thread in order to prevent the loop from - // running hot for messages that are taking longer to resolve like for example an - // evaluation of top level await. - tokio::time::sleep(tokio::time::Duration::from_millis(0)).await; - } - } - } -} - async fn read_line_and_poll( worker: &mut MainWorker, - session: &mut InspectorSession, + session: &mut LocalInspectorSession, message_rx: &Receiver<(String, Option)>, response_tx: &Sender>, editor: Arc>>, @@ -294,9 +270,10 @@ async fn read_line_and_poll( loop { for (method, params) in message_rx.try_iter() { - response_tx - .send(session.post_message(&method, params).await) - .unwrap(); + let result = worker + .with_event_loop(session.post_message(&method, params).boxed_local()) + .await; + response_tx.send(result).unwrap(); } // Because an inspector websocket client may choose to connect at anytime when we have an @@ -353,44 +330,50 @@ Object.defineProperty(globalThis, "_error", { async fn inject_prelude( worker: &mut MainWorker, - session: &mut InspectorSession, + session: &mut LocalInspectorSession, context_id: u64, ) -> Result<(), AnyError> { - post_message_and_poll( - worker, - session, - "Runtime.evaluate", - Some(json!({ - "expression": PRELUDE, - "contextId": context_id, - })), - ) - .await?; + worker + .with_event_loop( + session + .post_message( + "Runtime.evaluate", + Some(json!({ + "expression": PRELUDE, + "contextId": context_id, + })), + ) + .boxed_local(), + ) + .await?; Ok(()) } pub async fn is_closing( worker: &mut MainWorker, - session: &mut InspectorSession, + session: &mut LocalInspectorSession, context_id: u64, ) -> Result { - let closed = post_message_and_poll( - worker, - session, - "Runtime.evaluate", - Some(json!({ - "expression": "(globalThis.closed)", - "contextId": context_id, - })), - ) - .await? - .get("result") - .unwrap() - .get("value") - .unwrap() - .as_bool() - .unwrap(); + let closed = worker + .with_event_loop( + session + .post_message( + "Runtime.evaluate", + Some(json!({ + "expression": "(globalThis.closed)", + "contextId": context_id, + })), + ) + .boxed_local(), + ) + .await? + .get("result") + .unwrap() + .get("value") + .unwrap() + .as_bool() + .unwrap(); Ok(closed) } @@ -399,13 +382,13 @@ pub async fn run( program_state: &ProgramState, mut worker: MainWorker, ) -> Result<(), AnyError> { - let mut session = worker.create_inspector_session(); + let mut session = worker.create_inspector_session().await; let history_file = program_state.dir.root.join("deno_history.txt"); - post_message_and_poll(&mut worker, &mut session, "Runtime.enable", None) + worker + .with_event_loop(session.post_message("Runtime.enable", None).boxed_local()) .await?; - // Enabling the runtime domain will always send trigger one executionContextCreated for each // context the inspector knows about so we grab the execution context from that since // our inspector does not support a default context (0 is an invalid context id). @@ -471,15 +454,15 @@ pub async fn run( line.clone() }; - let evaluate_response = post_message_and_poll( - &mut worker, - &mut session, - "Runtime.evaluate", - Some(json!({ - "expression": format!("'use strict'; void 0;\n{}", &wrapped_line), - "contextId": context_id, - "replMode": true, - })), + let evaluate_response = worker.with_event_loop( + session.post_message( + "Runtime.evaluate", + Some(json!({ + "expression": format!("'use strict'; void 0;\n{}", &wrapped_line), + "contextId": context_id, + "replMode": true, + })), + ).boxed_local() ) .await?; @@ -489,17 +472,20 @@ pub async fn run( if evaluate_response.get("exceptionDetails").is_some() && wrapped_line != line { - post_message_and_poll( - &mut worker, - &mut session, - "Runtime.evaluate", - Some(json!({ - "expression": format!("'use strict'; void 0;\n{}", &line), - "contextId": context_id, - "replMode": true, - })), - ) - .await? + worker + .with_event_loop( + session + .post_message( + "Runtime.evaluate", + Some(json!({ + "expression": format!("'use strict'; void 0;\n{}", &line), + "contextId": context_id, + "replMode": true, + })), + ) + .boxed_local(), + ) + .await? } else { evaluate_response }; @@ -515,48 +501,48 @@ pub async fn run( evaluate_response.get("exceptionDetails"); if evaluate_exception_details.is_some() { - post_message_and_poll( - &mut worker, - &mut session, - "Runtime.callFunctionOn", - Some(json!({ - "executionContextId": context_id, - "functionDeclaration": "function (object) { Deno[Deno.internal].lastThrownError = object; }", - "arguments": [ - evaluate_result, - ], - })), - ).await?; + worker.with_event_loop( + session.post_message( + "Runtime.callFunctionOn", + Some(json!({ + "executionContextId": context_id, + "functionDeclaration": "function (object) { Deno[Deno.internal].lastThrownError = object; }", + "arguments": [ + evaluate_result, + ], + })), + ).boxed_local() + ).await?; } else { - post_message_and_poll( - &mut worker, - &mut session, - "Runtime.callFunctionOn", - Some(json!({ - "executionContextId": context_id, - "functionDeclaration": "function (object) { Deno[Deno.internal].lastEvalResult = object; }", - "arguments": [ - evaluate_result, - ], - })), - ).await?; + worker.with_event_loop( + session.post_message( + "Runtime.callFunctionOn", + Some(json!({ + "executionContextId": context_id, + "functionDeclaration": "function (object) { Deno[Deno.internal].lastEvalResult = object; }", + "arguments": [ + evaluate_result, + ], + })), + ).boxed_local() + ).await?; } // TODO(caspervonb) we should investigate using previews here but to keep things // consistent with the previous implementation we just get the preview result from // Deno.inspectArgs. let inspect_response = - post_message_and_poll( - &mut worker, - &mut session, - "Runtime.callFunctionOn", - Some(json!({ - "executionContextId": context_id, - "functionDeclaration": "function (object) { return Deno[Deno.internal].inspectArgs(['%o', object], { colors: !Deno.noColor }); }", - "arguments": [ - evaluate_result, - ], - })), + worker.with_event_loop( + session.post_message( + "Runtime.callFunctionOn", + Some(json!({ + "executionContextId": context_id, + "functionDeclaration": "function (object) { return Deno[Deno.internal].inspectArgs(['%o', object], { colors: !Deno.noColor }); }", + "arguments": [ + evaluate_result, + ], + })), + ).boxed_local() ).await?; let inspect_result = inspect_response.get("result").unwrap(); diff --git a/cli/tools/test_runner.rs b/cli/tools/test_runner.rs index f112eef16c..7d1eae9ee3 100644 --- a/cli/tools/test_runner.rs +++ b/cli/tools/test_runner.rs @@ -14,6 +14,7 @@ use crate::tools::coverage::CoverageCollector; use deno_core::error::AnyError; use deno_core::futures::future; use deno_core::futures::stream; +use deno_core::futures::FutureExt; use deno_core::futures::StreamExt; use deno_core::serde_json::json; use deno_core::url::Url; @@ -286,10 +287,12 @@ pub async fn run_test_file( let mut maybe_coverage_collector = if let Some(ref coverage_dir) = program_state.coverage_dir { - let session = worker.create_inspector_session(); + let session = worker.create_inspector_session().await; let coverage_dir = PathBuf::from(coverage_dir); let mut coverage_collector = CoverageCollector::new(coverage_dir, session); - coverage_collector.start_collecting().await?; + worker + .with_event_loop(coverage_collector.start_collecting().boxed_local()) + .await?; Some(coverage_collector) } else { @@ -308,7 +311,9 @@ pub async fn run_test_file( worker.execute("window.dispatchEvent(new Event('unload'))")?; if let Some(coverage_collector) = maybe_coverage_collector.as_mut() { - coverage_collector.stop_collecting().await?; + worker + .with_event_loop(coverage_collector.stop_collecting().boxed_local()) + .await?; } Ok(()) diff --git a/runtime/inspector.rs b/runtime/inspector.rs deleted file mode 100644 index 7254091d4d..0000000000 --- a/runtime/inspector.rs +++ /dev/null @@ -1,1061 +0,0 @@ -// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. - -//! The documentation for the inspector API is sparse, but these are helpful: -//! https://chromedevtools.github.io/devtools-protocol/ -//! https://hyperandroid.com/2020/02/12/v8-inspector-from-an-embedder-standpoint/ - -use core::convert::Infallible as Never; // Alias for the future `!` type. -use deno_core::error::generic_error; -use deno_core::error::AnyError; -use deno_core::futures::channel::mpsc; -use deno_core::futures::channel::mpsc::UnboundedReceiver; -use deno_core::futures::channel::mpsc::UnboundedSender; -use deno_core::futures::channel::oneshot; -use deno_core::futures::future::{self, Future}; -use deno_core::futures::pin_mut; -use deno_core::futures::prelude::*; -use deno_core::futures::select; -use deno_core::futures::stream::FuturesUnordered; -use deno_core::futures::stream::StreamExt; -use deno_core::futures::task; -use deno_core::futures::task::Context; -use deno_core::futures::task::Poll; -use deno_core::serde_json; -use deno_core::serde_json::json; -use deno_core::serde_json::Value; -use deno_core::v8; -use deno_websocket::tokio_tungstenite::tungstenite; -use std::collections::HashMap; -use std::ffi::c_void; -use std::mem::replace; -use std::mem::take; -use std::mem::MaybeUninit; -use std::net::SocketAddr; -use std::ops::Deref; -use std::ops::DerefMut; -use std::pin::Pin; -use std::process; -use std::ptr; -use std::ptr::NonNull; -use std::sync::Arc; -use std::sync::Mutex; -use std::thread; -use std::{cell::BorrowMutError, convert::Infallible}; -use std::{cell::RefCell, rc::Rc}; -use uuid::Uuid; - -pub struct InspectorServer { - pub host: SocketAddr, - register_inspector_tx: UnboundedSender, - shutdown_server_tx: Option>, - thread_handle: Option>, -} - -impl InspectorServer { - pub fn new(host: SocketAddr, name: String) -> Self { - let (register_inspector_tx, register_inspector_rx) = - mpsc::unbounded::(); - - let (shutdown_server_tx, shutdown_server_rx) = oneshot::channel(); - - let thread_handle = thread::spawn(move || { - let rt = crate::tokio_util::create_basic_runtime(); - let local = tokio::task::LocalSet::new(); - local.block_on( - &rt, - server(host, register_inspector_rx, shutdown_server_rx, name), - ) - }); - - Self { - host, - register_inspector_tx, - shutdown_server_tx: Some(shutdown_server_tx), - thread_handle: Some(thread_handle), - } - } - - fn register_inspector(&self, info: InspectorInfo) { - self.register_inspector_tx.unbounded_send(info).unwrap(); - } -} - -impl Drop for InspectorServer { - fn drop(&mut self) { - if let Some(shutdown_server_tx) = self.shutdown_server_tx.take() { - shutdown_server_tx - .send(()) - .expect("unable to send shutdown signal"); - } - - if let Some(thread_handle) = self.thread_handle.take() { - thread_handle.join().expect("unable to join thread"); - } - } -} - -/// Inspector information that is sent from the isolate thread to the server -/// thread when a new inspector is created. -struct InspectorInfo { - host: SocketAddr, - uuid: Uuid, - thread_name: Option, - new_websocket_tx: UnboundedSender, - canary_rx: oneshot::Receiver, -} - -impl InspectorInfo { - fn get_json_metadata(&self) -> Value { - json!({ - "description": "deno", - "devtoolsFrontendUrl": self.get_frontend_url(), - "faviconUrl": "https://deno.land/favicon.ico", - "id": self.uuid.to_string(), - "title": self.get_title(), - "type": "node", - // TODO(ry): "url": "file://", - "webSocketDebuggerUrl": self.get_websocket_debugger_url(), - }) - } - - fn get_websocket_debugger_url(&self) -> String { - format!("ws://{}/ws/{}", &self.host, &self.uuid) - } - - fn get_frontend_url(&self) -> String { - format!( - "devtools://devtools/bundled/js_app.html?ws={}/ws/{}&experiments=true&v8only=true", - &self.host, &self.uuid - ) - } - - fn get_title(&self) -> String { - format!( - "[{}] deno{}", - process::id(), - self - .thread_name - .as_ref() - .map(|n| format!(" - {}", n)) - .unwrap_or_default() - ) - } -} - -// Needed so hyper can use non Send futures -#[derive(Clone)] -struct LocalExecutor; - -impl hyper::rt::Executor for LocalExecutor -where - Fut: Future + 'static, - Fut::Output: 'static, -{ - fn execute(&self, fut: Fut) { - tokio::task::spawn_local(fut); - } -} - -fn handle_ws_request( - req: http::Request, - inspector_map: Rc>>, -) -> http::Result> { - let (parts, body) = req.into_parts(); - let req = http::Request::from_parts(parts, ()); - - if let Some(new_websocket_tx) = req - .uri() - .path() - .strip_prefix("/ws/") - .and_then(|s| Uuid::parse_str(s).ok()) - .and_then(|uuid| { - inspector_map - .borrow() - .get(&uuid) - .map(|info| info.new_websocket_tx.clone()) - }) - { - let resp = tungstenite::handshake::server::create_response(&req) - .map(|resp| resp.map(|_| hyper::Body::empty())) - .or_else(|e| match e { - tungstenite::error::Error::HttpFormat(http_error) => Err(http_error), - _ => http::Response::builder() - .status(http::StatusCode::BAD_REQUEST) - .body("Not a valid Websocket Request".into()), - }); - - let (parts, _) = req.into_parts(); - let req = http::Request::from_parts(parts, body); - - if resp.is_ok() { - tokio::task::spawn_local(async move { - let upgraded = hyper::upgrade::on(req).await.unwrap(); - let websocket = - deno_websocket::tokio_tungstenite::WebSocketStream::from_raw_socket( - upgraded, - tungstenite::protocol::Role::Server, - None, - ) - .await; - let (proxy, pump) = create_websocket_proxy(websocket); - - let _ = new_websocket_tx.unbounded_send(proxy); - pump.await; - }); - } - resp - } else { - http::Response::builder() - .status(http::StatusCode::NOT_FOUND) - .body("No Valid inspector".into()) - } -} - -fn handle_json_request( - inspector_map: Rc>>, -) -> http::Result> { - let data = inspector_map - .borrow() - .values() - .map(|info| info.get_json_metadata()) - .collect::>(); - http::Response::builder() - .status(http::StatusCode::OK) - .header(http::header::CONTENT_TYPE, "application/json") - .body(serde_json::to_string(&data).unwrap().into()) -} - -fn handle_json_version_request( - version_response: Value, -) -> http::Result> { - http::Response::builder() - .status(http::StatusCode::OK) - .header(http::header::CONTENT_TYPE, "application/json") - .body(serde_json::to_string(&version_response).unwrap().into()) -} - -async fn server( - host: SocketAddr, - register_inspector_rx: UnboundedReceiver, - shutdown_server_rx: oneshot::Receiver<()>, - name: String, -) { - let inspector_map_ = - Rc::new(RefCell::new(HashMap::::new())); - - let inspector_map = Rc::clone(&inspector_map_); - let register_inspector_handler = register_inspector_rx - .map(|info| { - eprintln!( - "Debugger listening on {}", - info.get_websocket_debugger_url() - ); - if inspector_map.borrow_mut().insert(info.uuid, info).is_some() { - panic!("Inspector UUID already in map"); - } - }) - .collect::<()>(); - - let inspector_map = Rc::clone(&inspector_map_); - let deregister_inspector_handler = future::poll_fn(|cx| { - inspector_map - .borrow_mut() - .retain(|_, info| info.canary_rx.poll_unpin(cx) == Poll::Pending); - Poll::::Pending - }) - .fuse(); - - let json_version_response = json!({ - "Browser": name, - "Protocol-Version": "1.3", - "V8-Version": deno_core::v8_version(), - }); - - let make_svc = hyper::service::make_service_fn(|_| { - let inspector_map = Rc::clone(&inspector_map_); - let json_version_response = json_version_response.clone(); - - future::ok::<_, Infallible>(hyper::service::service_fn( - move |req: http::Request| { - future::ready({ - match (req.method(), req.uri().path()) { - (&http::Method::GET, path) if path.starts_with("/ws/") => { - handle_ws_request(req, inspector_map.clone()) - } - (&http::Method::GET, "/json/version") => { - handle_json_version_request(json_version_response.clone()) - } - (&http::Method::GET, "/json") => { - handle_json_request(inspector_map.clone()) - } - (&http::Method::GET, "/json/list") => { - handle_json_request(inspector_map.clone()) - } - _ => http::Response::builder() - .status(http::StatusCode::NOT_FOUND) - .body("Not Found".into()), - } - }) - }, - )) - }); - - // Create the server manually so it can use the Local Executor - let server_handler = hyper::server::Builder::new( - hyper::server::conn::AddrIncoming::bind(&host).unwrap_or_else(|e| { - eprintln!("Cannot start inspector server: {}.", e); - process::exit(1); - }), - hyper::server::conn::Http::new().with_executor(LocalExecutor), - ) - .serve(make_svc) - .with_graceful_shutdown(async { - shutdown_server_rx.await.ok(); - }) - .unwrap_or_else(|err| { - eprintln!("Cannot start inspector server: {}.", err); - process::exit(1); - }) - .fuse(); - - pin_mut!(register_inspector_handler); - pin_mut!(deregister_inspector_handler); - pin_mut!(server_handler); - - select! { - _ = register_inspector_handler => {}, - _ = deregister_inspector_handler => unreachable!(), - _ = server_handler => {}, - } -} - -type WebSocketProxySender = UnboundedSender; -type WebSocketProxyReceiver = - UnboundedReceiver>; - -/// Encapsulates an UnboundedSender/UnboundedReceiver pair that together form -/// a duplex channel for sending/receiving websocket messages. -struct WebSocketProxy { - tx: WebSocketProxySender, - rx: WebSocketProxyReceiver, -} - -impl WebSocketProxy { - pub fn split(self) -> (WebSocketProxySender, WebSocketProxyReceiver) { - (self.tx, self.rx) - } -} - -/// Creates a future that proxies messages sent and received on a warp WebSocket -/// to a UnboundedSender/UnboundedReceiver pair. We need this to sidestep -/// Tokio's task budget, which causes issues when DenoInspector::poll_sessions() -/// needs to block the thread because JavaScript execution is paused. -/// -/// This works because UnboundedSender/UnboundedReceiver are implemented in the -/// 'futures' crate, therefore they can't participate in Tokio's cooperative -/// task yielding. -/// -/// A tuple is returned, where the first element is a duplex channel that can -/// be used to send/receive messages on the websocket, and the second element -/// is a future that does the forwarding. -fn create_websocket_proxy( - websocket: deno_websocket::tokio_tungstenite::WebSocketStream< - hyper::upgrade::Upgraded, - >, -) -> (WebSocketProxy, impl Future + Send) { - // The 'outbound' channel carries messages sent to the websocket. - let (outbound_tx, outbound_rx) = mpsc::unbounded(); - - // The 'inbound' channel carries messages received from the websocket. - let (inbound_tx, inbound_rx) = mpsc::unbounded(); - - let proxy = WebSocketProxy { - tx: outbound_tx, - rx: inbound_rx, - }; - - // The pump future takes care of forwarding messages between the websocket - // and channels. It resolves to () when either side disconnects, ignoring any - // errors. - let pump = async move { - let (websocket_tx, websocket_rx) = websocket.split(); - - let outbound_pump = - outbound_rx.map(Ok).forward(websocket_tx).map_err(|_| ()); - - let inbound_pump = websocket_rx - .map(|msg| inbound_tx.unbounded_send(msg)) - .map_err(|_| ()) - .try_collect::<()>(); - - let _ = future::try_join(outbound_pump, inbound_pump).await; - }; - - (proxy, pump) -} - -#[derive(Clone, Copy)] -enum PollState { - Idle, - Woken, - Polling, - Parked, - Dropped, -} - -pub struct DenoInspector { - v8_inspector_client: v8::inspector::V8InspectorClientBase, - v8_inspector: v8::UniquePtr, - sessions: RefCell, - flags: RefCell, - waker: Arc, - _canary_tx: oneshot::Sender, - pub server: Option>, - pub debugger_url: Option, -} - -impl Deref for DenoInspector { - type Target = v8::inspector::V8Inspector; - fn deref(&self) -> &Self::Target { - self.v8_inspector.as_ref().unwrap() - } -} - -impl DerefMut for DenoInspector { - fn deref_mut(&mut self) -> &mut Self::Target { - self.v8_inspector.as_mut().unwrap() - } -} - -impl Drop for DenoInspector { - fn drop(&mut self) { - // Since the waker is cloneable, it might outlive the inspector itself. - // Set the poll state to 'dropped' so it doesn't attempt to request an - // interrupt from the isolate. - self.waker.update(|w| w.poll_state = PollState::Dropped); - // V8 automatically deletes all sessions when an Inspector instance is - // deleted, however InspectorSession also has a drop handler that cleans - // up after itself. To avoid a double free, make sure the inspector is - // dropped last. - take(&mut *self.sessions.borrow_mut()); - } -} - -impl v8::inspector::V8InspectorClientImpl for DenoInspector { - fn base(&self) -> &v8::inspector::V8InspectorClientBase { - &self.v8_inspector_client - } - - fn base_mut(&mut self) -> &mut v8::inspector::V8InspectorClientBase { - &mut self.v8_inspector_client - } - - fn run_message_loop_on_pause(&mut self, context_group_id: i32) { - assert_eq!(context_group_id, DenoInspectorSession::CONTEXT_GROUP_ID); - self.flags.borrow_mut().on_pause = true; - let _ = self.poll_sessions(None); - } - - fn quit_message_loop_on_pause(&mut self) { - self.flags.borrow_mut().on_pause = false; - } - - fn run_if_waiting_for_debugger(&mut self, context_group_id: i32) { - assert_eq!(context_group_id, DenoInspectorSession::CONTEXT_GROUP_ID); - self.flags.borrow_mut().session_handshake_done = true; - } -} - -/// DenoInspector implements a Future so that it can poll for new incoming -/// connections and messages from the WebSocket server. The Worker that owns -/// this DenoInspector will call our poll function from Worker::poll(). -impl Future for DenoInspector { - type Output = (); - fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<()> { - self.poll_sessions(Some(cx)).unwrap() - } -} - -impl DenoInspector { - const CONTEXT_GROUP_ID: i32 = 1; - - pub fn new( - js_runtime: &mut deno_core::JsRuntime, - server: Option>, - ) -> Box { - let context = js_runtime.global_context(); - let scope = &mut v8::HandleScope::new(js_runtime.v8_isolate()); - - let (new_websocket_tx, new_websocket_rx) = - mpsc::unbounded::(); - let (canary_tx, canary_rx) = oneshot::channel::(); - - // Create DenoInspector instance. - let mut self_ = new_box_with(|self_ptr| { - let v8_inspector_client = - v8::inspector::V8InspectorClientBase::new::(); - - let sessions = InspectorSessions::new(self_ptr, new_websocket_rx); - let flags = InspectorFlags::new(); - let waker = InspectorWaker::new(scope.thread_safe_handle()); - - let debugger_url = if let Some(server) = server.clone() { - let info = InspectorInfo { - host: server.host, - uuid: Uuid::new_v4(), - thread_name: thread::current().name().map(|n| n.to_owned()), - new_websocket_tx, - canary_rx, - }; - - let debugger_url = info.get_websocket_debugger_url(); - server.register_inspector(info); - Some(debugger_url) - } else { - None - }; - - Self { - v8_inspector_client, - v8_inspector: Default::default(), - sessions, - flags, - waker, - _canary_tx: canary_tx, - server, - debugger_url, - } - }); - self_.v8_inspector = - v8::inspector::V8Inspector::create(scope, &mut *self_).into(); - - // Tell the inspector about the global context. - let context = v8::Local::new(scope, context); - let context_name = v8::inspector::StringView::from(&b"global context"[..]); - self_.context_created(context, Self::CONTEXT_GROUP_ID, context_name); - - // Poll the session handler so we will get notified whenever there is - // new_incoming debugger activity. - let _ = self_.poll_sessions(None).unwrap(); - - self_ - } - - fn poll_sessions( - &self, - mut invoker_cx: Option<&mut Context>, - ) -> Result, BorrowMutError> { - // Short-circuit if there is no server - if self.server.is_none() { - return Ok(Poll::Ready(())); - } - - // The futures this function uses do not have re-entrant poll() functions. - // However it is can happpen that poll_sessions() gets re-entered, e.g. - // when an interrupt request is honored while the inspector future is polled - // by the task executor. We let the caller know by returning some error. - let mut sessions = self.sessions.try_borrow_mut()?; - - self.waker.update(|w| { - match w.poll_state { - PollState::Idle | PollState::Woken => w.poll_state = PollState::Polling, - _ => unreachable!(), - }; - }); - - // Create a new Context object that will make downstream futures - // use the InspectorWaker when they are ready to be polled again. - let waker_ref = task::waker_ref(&self.waker); - let cx = &mut Context::from_waker(&waker_ref); - - loop { - loop { - // Do one "handshake" with a newly connected session at a time. - if let Some(session) = &mut sessions.handshake { - let poll_result = session.poll_unpin(cx); - let handshake_done = - replace(&mut self.flags.borrow_mut().session_handshake_done, false); - match poll_result { - Poll::Pending if handshake_done => { - let session = sessions.handshake.take().unwrap(); - sessions.established.push(session); - take(&mut self.flags.borrow_mut().waiting_for_session); - } - Poll::Ready(_) => sessions.handshake = None, - Poll::Pending => break, - }; - } - - // Accept new connections. - match sessions.new_incoming.poll_next_unpin(cx) { - Poll::Ready(Some(session)) => { - let prev = sessions.handshake.replace(session); - assert!(prev.is_none()); - continue; - } - Poll::Ready(None) => {} - Poll::Pending => {} - } - - // Poll established sessions. - match sessions.established.poll_next_unpin(cx) { - Poll::Ready(Some(_)) => continue, - Poll::Ready(None) => break, - Poll::Pending => break, - }; - } - - let should_block = sessions.handshake.is_some() - || self.flags.borrow().on_pause - || self.flags.borrow().waiting_for_session; - - let new_state = self.waker.update(|w| { - match w.poll_state { - PollState::Woken => { - // The inspector was woken while the session handler was being - // polled, so we poll it another time. - w.poll_state = PollState::Polling; - } - PollState::Polling if !should_block => { - // The session handler doesn't need to be polled any longer, and - // there's no reason to block (execution is not paused), so this - // function is about to return. - w.poll_state = PollState::Idle; - // Register the task waker that can be used to wake the parent - // task that will poll the inspector future. - if let Some(cx) = invoker_cx.take() { - w.task_waker.replace(cx.waker().clone()); - } - // Register the address of the inspector, which allows the waker - // to request an interrupt from the isolate. - w.inspector_ptr = NonNull::new(self as *const _ as *mut Self); - } - PollState::Polling if should_block => { - // Isolate execution has been paused but there are no more - // events to process, so this thread will be parked. Therefore, - // store the current thread handle in the waker so it knows - // which thread to unpark when new events arrive. - w.poll_state = PollState::Parked; - w.parked_thread.replace(thread::current()); - } - _ => unreachable!(), - }; - w.poll_state - }); - match new_state { - PollState::Idle => break Ok(Poll::Pending), // Yield to task. - PollState::Polling => {} // Poll the session handler again. - PollState::Parked => thread::park(), // Park the thread. - _ => unreachable!(), - }; - } - } - - /// This function blocks the thread until at least one inspector client has - /// established a websocket connection and successfully completed the - /// handshake. After that, it instructs V8 to pause at the next statement. - pub fn wait_for_session_and_break_on_next_statement(&mut self) { - loop { - match self.sessions.get_mut().established.iter_mut().next() { - Some(session) => break session.break_on_next_statement(), - None => { - self.flags.get_mut().waiting_for_session = true; - let _ = self.poll_sessions(None).unwrap(); - } - }; - } - } -} - -#[derive(Default)] -struct InspectorFlags { - waiting_for_session: bool, - session_handshake_done: bool, - on_pause: bool, -} - -impl InspectorFlags { - fn new() -> RefCell { - let self_ = Self::default(); - RefCell::new(self_) - } -} - -struct InspectorSessions { - new_incoming: - Pin> + 'static>>, - handshake: Option>, - established: FuturesUnordered>, -} - -impl InspectorSessions { - fn new( - inspector_ptr: *mut DenoInspector, - new_websocket_rx: UnboundedReceiver, - ) -> RefCell { - let new_incoming = new_websocket_rx - .map(move |websocket| DenoInspectorSession::new(inspector_ptr, websocket)) - .boxed_local(); - let self_ = Self { - new_incoming, - ..Default::default() - }; - RefCell::new(self_) - } -} - -impl Default for InspectorSessions { - fn default() -> Self { - Self { - new_incoming: stream::empty().boxed_local(), - handshake: None, - established: FuturesUnordered::new(), - } - } -} - -struct InspectorWakerInner { - poll_state: PollState, - task_waker: Option, - parked_thread: Option, - inspector_ptr: Option>, - isolate_handle: v8::IsolateHandle, -} - -unsafe impl Send for InspectorWakerInner {} - -struct InspectorWaker(Mutex); - -impl InspectorWaker { - fn new(isolate_handle: v8::IsolateHandle) -> Arc { - let inner = InspectorWakerInner { - poll_state: PollState::Idle, - task_waker: None, - parked_thread: None, - inspector_ptr: None, - isolate_handle, - }; - Arc::new(Self(Mutex::new(inner))) - } - - fn update(&self, update_fn: F) -> R - where - F: FnOnce(&mut InspectorWakerInner) -> R, - { - let mut g = self.0.lock().unwrap(); - update_fn(&mut g) - } -} - -impl task::ArcWake for InspectorWaker { - fn wake_by_ref(arc_self: &Arc) { - arc_self.update(|w| { - match w.poll_state { - PollState::Idle => { - // Wake the task, if any, that has polled the Inspector future last. - if let Some(waker) = w.task_waker.take() { - waker.wake() - } - // Request an interrupt from the isolate if it's running and there's - // not unhandled interrupt request in flight. - if let Some(arg) = w - .inspector_ptr - .take() - .map(|ptr| ptr.as_ptr() as *mut c_void) - { - w.isolate_handle.request_interrupt(handle_interrupt, arg); - } - extern "C" fn handle_interrupt( - _isolate: &mut v8::Isolate, - arg: *mut c_void, - ) { - let inspector = unsafe { &*(arg as *mut DenoInspector) }; - let _ = inspector.poll_sessions(None); - } - } - PollState::Parked => { - // Unpark the isolate thread. - let parked_thread = w.parked_thread.take().unwrap(); - assert_ne!(parked_thread.id(), thread::current().id()); - parked_thread.unpark(); - } - _ => {} - }; - w.poll_state = PollState::Woken; - }); - } -} - -struct DenoInspectorSession { - v8_channel: v8::inspector::ChannelBase, - v8_session: v8::UniqueRef, - websocket_tx: WebSocketProxySender, - websocket_rx_handler: Pin + 'static>>, -} - -impl Deref for DenoInspectorSession { - type Target = v8::inspector::V8InspectorSession; - fn deref(&self) -> &Self::Target { - &self.v8_session - } -} - -impl DerefMut for DenoInspectorSession { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.v8_session - } -} - -impl DenoInspectorSession { - const CONTEXT_GROUP_ID: i32 = 1; - - pub fn new( - inspector_ptr: *mut DenoInspector, - websocket: WebSocketProxy, - ) -> Box { - new_box_with(move |self_ptr| { - let v8_channel = v8::inspector::ChannelBase::new::(); - let v8_session = unsafe { &mut *inspector_ptr }.connect( - Self::CONTEXT_GROUP_ID, - // Todo(piscisaureus): V8Inspector::connect() should require that - // the 'v8_channel' argument cannot move. - unsafe { &mut *self_ptr }, - v8::inspector::StringView::empty(), - ); - - let (websocket_tx, websocket_rx) = websocket.split(); - let websocket_rx_handler = - Self::receive_from_websocket(self_ptr, websocket_rx); - - Self { - v8_channel, - v8_session, - websocket_tx, - websocket_rx_handler, - } - }) - } - - /// Returns a future that receives messages from the websocket and dispatches - /// them to the V8 session. - fn receive_from_websocket( - self_ptr: *mut Self, - websocket_rx: WebSocketProxyReceiver, - ) -> Pin + 'static>> { - async move { - eprintln!("Debugger session started."); - - let result = websocket_rx - .map_ok(move |msg| { - let msg = msg.into_data(); - let msg = v8::inspector::StringView::from(msg.as_slice()); - unsafe { &mut *self_ptr }.dispatch_protocol_message(msg); - }) - .try_collect::<()>() - .await; - - match result { - Ok(_) => eprintln!("Debugger session ended."), - Err(err) => eprintln!("Debugger session ended: {}.", err), - }; - } - .boxed_local() - } - - fn send_to_websocket(&self, msg: v8::UniquePtr) { - let msg = msg.unwrap().string().to_string(); - let msg = tungstenite::Message::text(msg); - let _ = self.websocket_tx.unbounded_send(msg); - } - - pub fn break_on_next_statement(&mut self) { - let reason = v8::inspector::StringView::from(&b"debugCommand"[..]); - let detail = v8::inspector::StringView::empty(); - self.schedule_pause_on_next_statement(reason, detail); - } -} - -impl v8::inspector::ChannelImpl for DenoInspectorSession { - fn base(&self) -> &v8::inspector::ChannelBase { - &self.v8_channel - } - - fn base_mut(&mut self) -> &mut v8::inspector::ChannelBase { - &mut self.v8_channel - } - - fn send_response( - &mut self, - _call_id: i32, - message: v8::UniquePtr, - ) { - self.send_to_websocket(message); - } - - fn send_notification( - &mut self, - message: v8::UniquePtr, - ) { - self.send_to_websocket(message); - } - - fn flush_protocol_notifications(&mut self) {} -} - -impl Future for DenoInspectorSession { - type Output = (); - fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { - self.websocket_rx_handler.poll_unpin(cx) - } -} - -/// A local inspector session that can be used to send and receive protocol messages directly on -/// the same thread as an isolate. -pub struct InspectorSession { - v8_channel: v8::inspector::ChannelBase, - v8_session: v8::UniqueRef, - response_tx_map: HashMap>, - next_message_id: i32, - notification_queue: Vec, -} - -impl Deref for InspectorSession { - type Target = v8::inspector::V8InspectorSession; - fn deref(&self) -> &Self::Target { - &self.v8_session - } -} - -impl DerefMut for InspectorSession { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.v8_session - } -} - -impl v8::inspector::ChannelImpl for InspectorSession { - fn base(&self) -> &v8::inspector::ChannelBase { - &self.v8_channel - } - - fn base_mut(&mut self) -> &mut v8::inspector::ChannelBase { - &mut self.v8_channel - } - - fn send_response( - &mut self, - call_id: i32, - message: v8::UniquePtr, - ) { - let raw_message = message.unwrap().string().to_string(); - let message: serde_json::Value = match serde_json::from_str(&raw_message) { - Ok(v) => v, - Err(error) => match error.classify() { - serde_json::error::Category::Syntax => json!({ - "id": call_id, - "result": { - "result": { - "type": "error", - "description": "Unterminated string literal", - "value": "Unterminated string literal", - }, - "exceptionDetails": { - "exceptionId": 0, - "text": "Unterminated string literal", - "lineNumber": 0, - "columnNumber": 0 - }, - }, - }), - _ => panic!("Could not parse inspector message"), - }, - }; - - self - .response_tx_map - .remove(&call_id) - .unwrap() - .send(message) - .unwrap(); - } - - fn send_notification( - &mut self, - message: v8::UniquePtr, - ) { - let raw_message = message.unwrap().string().to_string(); - let message = serde_json::from_str(&raw_message).unwrap(); - - self.notification_queue.push(message); - } - - fn flush_protocol_notifications(&mut self) {} -} - -impl InspectorSession { - const CONTEXT_GROUP_ID: i32 = 1; - - pub fn new(inspector_ptr: *mut DenoInspector) -> Box { - new_box_with(move |self_ptr| { - let v8_channel = v8::inspector::ChannelBase::new::(); - let v8_session = unsafe { &mut *inspector_ptr }.connect( - Self::CONTEXT_GROUP_ID, - unsafe { &mut *self_ptr }, - v8::inspector::StringView::empty(), - ); - - let response_tx_map = HashMap::new(); - let next_message_id = 0; - - let notification_queue = Vec::new(); - - Self { - v8_channel, - v8_session, - response_tx_map, - next_message_id, - notification_queue, - } - }) - } - - pub fn notifications(&mut self) -> Vec { - self.notification_queue.split_off(0) - } - - pub async fn post_message( - &mut self, - method: &str, - params: Option, - ) -> Result { - let id = self.next_message_id; - self.next_message_id += 1; - - let (response_tx, response_rx) = oneshot::channel::(); - self.response_tx_map.insert(id, response_tx); - - let message = json!({ - "id": id, - "method": method, - "params": params, - }); - - let raw_message = serde_json::to_string(&message).unwrap(); - let raw_message = v8::inspector::StringView::from(raw_message.as_bytes()); - self.v8_session.dispatch_protocol_message(raw_message); - - let response = response_rx.await.unwrap(); - if let Some(error) = response.get("error") { - return Err(generic_error(error.to_string())); - } - - let result = response.get("result").unwrap().clone(); - Ok(result) - } -} - -fn new_box_with(new_fn: impl FnOnce(*mut T) -> T) -> Box { - let b = Box::new(MaybeUninit::::uninit()); - let p = Box::into_raw(b) as *mut T; - unsafe { ptr::write(p, new_fn(p)) }; - unsafe { Box::from_raw(p) } -} diff --git a/runtime/inspector/mod.rs b/runtime/inspector/mod.rs new file mode 100644 index 0000000000..08d3883026 --- /dev/null +++ b/runtime/inspector/mod.rs @@ -0,0 +1,751 @@ +// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. + +//! The documentation for the inspector API is sparse, but these are helpful: +//! https://chromedevtools.github.io/devtools-protocol/ +//! https://hyperandroid.com/2020/02/12/v8-inspector-from-an-embedder-standpoint/ + +use deno_core::error::generic_error; +use deno_core::error::AnyError; +use deno_core::futures::channel::mpsc; +use deno_core::futures::channel::mpsc::UnboundedReceiver; +use deno_core::futures::channel::mpsc::UnboundedSender; +use deno_core::futures::channel::oneshot; +use deno_core::futures::future::select; +use deno_core::futures::future::Either; +use deno_core::futures::future::Future; +use deno_core::futures::prelude::*; +use deno_core::futures::stream::FuturesUnordered; +use deno_core::futures::stream::StreamExt; +use deno_core::futures::task; +use deno_core::futures::task::Context; +use deno_core::futures::task::Poll; +use deno_core::serde_json; +use deno_core::serde_json::json; +use deno_core::serde_json::Value; +use deno_core::v8; +use std::cell::BorrowMutError; +use std::cell::RefCell; +use std::collections::HashMap; +use std::ffi::c_void; +use std::mem::replace; +use std::mem::take; +use std::mem::MaybeUninit; +use std::pin::Pin; +use std::ptr; +use std::ptr::NonNull; +use std::rc::Rc; +use std::sync::Arc; +use std::sync::Mutex; +use std::thread; + +mod server; + +pub use server::InspectorServer; + +/// If first argument is `None` then it's a notification, otherwise +/// it's a message. +pub type SessionProxySender = UnboundedSender<(Option, String)>; +// TODO(bartlomieju): does it even need to send a Result? +// It seems `Vec` would be enough +pub type SessionProxyReceiver = UnboundedReceiver, AnyError>>; + +/// Encapsulates an UnboundedSender/UnboundedReceiver pair that together form +/// a duplex channel for sending/receiving messages in V8 session. +pub struct SessionProxy { + pub tx: SessionProxySender, + pub rx: SessionProxyReceiver, +} + +impl SessionProxy { + pub fn split(self) -> (SessionProxySender, SessionProxyReceiver) { + (self.tx, self.rx) + } +} + +#[derive(Clone, Copy)] +enum PollState { + Idle, + Woken, + Polling, + Parked, + Dropped, +} + +/// This structure is used responsible for providing inspector interface +/// to the `JsRuntime`. +/// +/// It stores an instance of `v8::inspector::V8Inspector` and additionally +/// implements `v8::inspector::V8InspectorClientImpl`. +/// +/// After creating this structure it's possible to connect multiple sessions +/// to the inspector, in case of Deno it's either: a "websocket session" that +/// provides integration with Chrome Devtools, or an "in-memory session" that +/// is used for REPL or converage collection. +pub struct JsRuntimeInspector { + v8_inspector_client: v8::inspector::V8InspectorClientBase, + v8_inspector: Rc>>, + new_session_tx: UnboundedSender, + sessions: RefCell, + flags: RefCell, + waker: Arc, + deregister_tx: Option>, +} + +impl Drop for JsRuntimeInspector { + fn drop(&mut self) { + // Since the waker is cloneable, it might outlive the inspector itself. + // Set the poll state to 'dropped' so it doesn't attempt to request an + // interrupt from the isolate. + self.waker.update(|w| w.poll_state = PollState::Dropped); + // TODO(bartlomieju): this comment is out of date + // V8 automatically deletes all sessions when an `V8Inspector` instance is + // deleted, however InspectorSession also has a drop handler that cleans + // up after itself. To avoid a double free, make sure the inspector is + // dropped last. + take(&mut *self.sessions.borrow_mut()); + + // Notify counterparty that this instance is being destroyed. Ignoring + // result because counterparty waiting for the signal might have already + // dropped the other end of channel. + if let Some(deregister_tx) = self.deregister_tx.take() { + let _ = deregister_tx.send(()); + } + } +} + +impl v8::inspector::V8InspectorClientImpl for JsRuntimeInspector { + fn base(&self) -> &v8::inspector::V8InspectorClientBase { + &self.v8_inspector_client + } + + fn base_mut(&mut self) -> &mut v8::inspector::V8InspectorClientBase { + &mut self.v8_inspector_client + } + + fn run_message_loop_on_pause(&mut self, context_group_id: i32) { + assert_eq!(context_group_id, JsRuntimeInspector::CONTEXT_GROUP_ID); + self.flags.borrow_mut().on_pause = true; + let _ = self.poll_sessions(None); + } + + fn quit_message_loop_on_pause(&mut self) { + self.flags.borrow_mut().on_pause = false; + } + + fn run_if_waiting_for_debugger(&mut self, context_group_id: i32) { + assert_eq!(context_group_id, JsRuntimeInspector::CONTEXT_GROUP_ID); + self.flags.borrow_mut().session_handshake_done = true; + } +} + +/// `JsRuntimeInspector` implements a Future so that it can poll for new incoming +/// connections and messages from the WebSocket server. The Worker that owns +/// this `JsRuntimeInspector` will call this function from `Worker::poll()`. +impl Future for JsRuntimeInspector { + type Output = (); + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<()> { + self.poll_sessions(Some(cx)).unwrap() + } +} + +impl JsRuntimeInspector { + /// Currently Deno supports only a single context in `JsRuntime` + /// and thus it's id is provided as an associated contant. + const CONTEXT_GROUP_ID: i32 = 1; + + pub fn new(js_runtime: &mut deno_core::JsRuntime) -> Box { + let context = js_runtime.global_context(); + let scope = &mut v8::HandleScope::new(js_runtime.v8_isolate()); + + let (new_session_tx, new_session_rx) = mpsc::unbounded::(); + + let v8_inspector_client = + v8::inspector::V8InspectorClientBase::new::(); + + let flags = InspectorFlags::new(); + let waker = InspectorWaker::new(scope.thread_safe_handle()); + + // Create JsRuntimeInspector instance. + let mut self_ = Box::new(Self { + v8_inspector_client, + v8_inspector: Default::default(), + sessions: Default::default(), + new_session_tx, + flags, + waker, + deregister_tx: None, + }); + self_.v8_inspector = Rc::new(RefCell::new( + v8::inspector::V8Inspector::create(scope, &mut *self_).into(), + )); + self_.sessions = + SessionContainer::new(self_.v8_inspector.clone(), new_session_rx); + + // Tell the inspector about the global context. + let context = v8::Local::new(scope, context); + let context_name = v8::inspector::StringView::from(&b"global context"[..]); + self_ + .v8_inspector + .borrow_mut() + .as_mut() + .unwrap() + .context_created(context, Self::CONTEXT_GROUP_ID, context_name); + + // Poll the session handler so we will get notified whenever there is + // new_incoming debugger activity. + let _ = self_.poll_sessions(None).unwrap(); + + self_ + } + + fn poll_sessions( + &self, + mut invoker_cx: Option<&mut Context>, + ) -> Result, BorrowMutError> { + // The futures this function uses do not have re-entrant poll() functions. + // However it is can happpen that poll_sessions() gets re-entered, e.g. + // when an interrupt request is honored while the inspector future is polled + // by the task executor. We let the caller know by returning some error. + let mut sessions = self.sessions.try_borrow_mut()?; + + self.waker.update(|w| { + match w.poll_state { + PollState::Idle | PollState::Woken => w.poll_state = PollState::Polling, + _ => unreachable!(), + }; + }); + + // Create a new Context object that will make downstream futures + // use the InspectorWaker when they are ready to be polled again. + let waker_ref = task::waker_ref(&self.waker); + let cx = &mut Context::from_waker(&waker_ref); + + loop { + loop { + // Do one "handshake" with a newly connected session at a time. + if let Some(session) = &mut sessions.handshake { + let poll_result = session.poll_unpin(cx); + let handshake_done = + replace(&mut self.flags.borrow_mut().session_handshake_done, false); + match poll_result { + Poll::Pending if handshake_done => { + let session = sessions.handshake.take().unwrap(); + sessions.established.push(session); + take(&mut self.flags.borrow_mut().waiting_for_session); + } + Poll::Ready(_) => sessions.handshake = None, + Poll::Pending => break, + }; + } + + // Accept new connections. + match sessions.new_incoming.poll_next_unpin(cx) { + Poll::Ready(Some(session)) => { + let prev = sessions.handshake.replace(session); + assert!(prev.is_none()); + continue; + } + Poll::Ready(None) => {} + Poll::Pending => {} + } + + // Poll established sessions. + match sessions.established.poll_next_unpin(cx) { + Poll::Ready(Some(_)) => continue, + Poll::Ready(None) => break, + Poll::Pending => break, + }; + } + + let should_block = sessions.handshake.is_some() + || self.flags.borrow().on_pause + || self.flags.borrow().waiting_for_session; + + let new_state = self.waker.update(|w| { + match w.poll_state { + PollState::Woken => { + // The inspector was woken while the session handler was being + // polled, so we poll it another time. + w.poll_state = PollState::Polling; + } + PollState::Polling if !should_block => { + // The session handler doesn't need to be polled any longer, and + // there's no reason to block (execution is not paused), so this + // function is about to return. + w.poll_state = PollState::Idle; + // Register the task waker that can be used to wake the parent + // task that will poll the inspector future. + if let Some(cx) = invoker_cx.take() { + w.task_waker.replace(cx.waker().clone()); + } + // Register the address of the inspector, which allows the waker + // to request an interrupt from the isolate. + w.inspector_ptr = NonNull::new(self as *const _ as *mut Self); + } + PollState::Polling if should_block => { + // Isolate execution has been paused but there are no more + // events to process, so this thread will be parked. Therefore, + // store the current thread handle in the waker so it knows + // which thread to unpark when new events arrive. + w.poll_state = PollState::Parked; + w.parked_thread.replace(thread::current()); + } + _ => unreachable!(), + }; + w.poll_state + }); + match new_state { + PollState::Idle => break Ok(Poll::Pending), // Yield to task. + PollState::Polling => {} // Poll the session handler again. + PollState::Parked => thread::park(), // Park the thread. + _ => unreachable!(), + }; + } + } + + /// This function blocks the thread until at least one inspector client has + /// established a websocket connection and successfully completed the + /// handshake. After that, it instructs V8 to pause at the next statement. + pub fn wait_for_session_and_break_on_next_statement(&mut self) { + loop { + match self.sessions.get_mut().established.iter_mut().next() { + Some(session) => break session.break_on_next_statement(), + None => { + self.flags.get_mut().waiting_for_session = true; + let _ = self.poll_sessions(None).unwrap(); + } + }; + } + } + + /// Obtain a sender for proxy channels. + /// + /// After a proxy is sent inspector will wait for a "handshake". + /// Frontend must send "Runtime.runIfWaitingForDebugger" message to + /// complete the handshake. + pub fn get_session_sender(&self) -> UnboundedSender { + self.new_session_tx.clone() + } + + /// Create a channel that notifies the frontend when inspector is dropped. + /// + /// NOTE: Only a single handler is currently available. + pub fn add_deregister_handler(&mut self) -> oneshot::Receiver<()> { + let (tx, rx) = oneshot::channel::<()>(); + let prev = self.deregister_tx.replace(tx); + assert!( + prev.is_none(), + "Only a single deregister handler is allowed" + ); + rx + } + + /// Create a local inspector session that can be used on + /// the same thread as the isolate. + pub fn create_local_session(&self) -> LocalInspectorSession { + // The 'outbound' channel carries messages sent to the session. + let (outbound_tx, outbound_rx) = mpsc::unbounded(); + + // The 'inbound' channel carries messages received from the session. + let (inbound_tx, inbound_rx) = mpsc::unbounded(); + + let proxy = SessionProxy { + tx: outbound_tx, + rx: inbound_rx, + }; + + // InspectorSessions for a local session is added directly to the "established" + // sessions, so it doesn't need to go through the session sender and handshake + // phase. + let inspector_session = + InspectorSession::new(self.v8_inspector.clone(), proxy); + self + .sessions + .borrow_mut() + .established + .push(inspector_session); + take(&mut self.flags.borrow_mut().waiting_for_session); + + LocalInspectorSession::new(inbound_tx, outbound_rx) + } +} + +#[derive(Default)] +struct InspectorFlags { + waiting_for_session: bool, + session_handshake_done: bool, + on_pause: bool, +} + +impl InspectorFlags { + fn new() -> RefCell { + let self_ = Self::default(); + RefCell::new(self_) + } +} + +/// A helper structure that helps coordinate sessions during different +/// parts of their lifecycle. +struct SessionContainer { + new_incoming: Pin> + 'static>>, + handshake: Option>, + established: FuturesUnordered>, +} + +impl SessionContainer { + fn new( + v8_inspector: Rc>>, + new_session_rx: UnboundedReceiver, + ) -> RefCell { + let new_incoming = new_session_rx + .map(move |session_proxy| { + InspectorSession::new(v8_inspector.clone(), session_proxy) + }) + .boxed_local(); + let self_ = Self { + new_incoming, + ..Default::default() + }; + RefCell::new(self_) + } +} + +impl Default for SessionContainer { + fn default() -> Self { + Self { + new_incoming: stream::empty().boxed_local(), + handshake: None, + established: FuturesUnordered::new(), + } + } +} + +struct InspectorWakerInner { + poll_state: PollState, + task_waker: Option, + parked_thread: Option, + inspector_ptr: Option>, + isolate_handle: v8::IsolateHandle, +} + +unsafe impl Send for InspectorWakerInner {} + +struct InspectorWaker(Mutex); + +impl InspectorWaker { + fn new(isolate_handle: v8::IsolateHandle) -> Arc { + let inner = InspectorWakerInner { + poll_state: PollState::Idle, + task_waker: None, + parked_thread: None, + inspector_ptr: None, + isolate_handle, + }; + Arc::new(Self(Mutex::new(inner))) + } + + fn update(&self, update_fn: F) -> R + where + F: FnOnce(&mut InspectorWakerInner) -> R, + { + let mut g = self.0.lock().unwrap(); + update_fn(&mut g) + } +} + +impl task::ArcWake for InspectorWaker { + fn wake_by_ref(arc_self: &Arc) { + arc_self.update(|w| { + match w.poll_state { + PollState::Idle => { + // Wake the task, if any, that has polled the Inspector future last. + if let Some(waker) = w.task_waker.take() { + waker.wake() + } + // Request an interrupt from the isolate if it's running and there's + // not unhandled interrupt request in flight. + if let Some(arg) = w + .inspector_ptr + .take() + .map(|ptr| ptr.as_ptr() as *mut c_void) + { + w.isolate_handle.request_interrupt(handle_interrupt, arg); + } + extern "C" fn handle_interrupt( + _isolate: &mut v8::Isolate, + arg: *mut c_void, + ) { + let inspector = unsafe { &*(arg as *mut JsRuntimeInspector) }; + let _ = inspector.poll_sessions(None); + } + } + PollState::Parked => { + // Unpark the isolate thread. + let parked_thread = w.parked_thread.take().unwrap(); + assert_ne!(parked_thread.id(), thread::current().id()); + parked_thread.unpark(); + } + _ => {} + }; + w.poll_state = PollState::Woken; + }); + } +} + +/// An inspector session that proxies messages to concrete "transport layer", +/// eg. Websocket or another set of channels. +struct InspectorSession { + v8_channel: v8::inspector::ChannelBase, + v8_session: Rc>>, + proxy_tx: SessionProxySender, + proxy_rx_handler: Pin + 'static>>, +} + +impl InspectorSession { + const CONTEXT_GROUP_ID: i32 = 1; + + pub fn new( + v8_inspector_rc: Rc>>, + session_proxy: SessionProxy, + ) -> Box { + new_box_with(move |self_ptr| { + let v8_channel = v8::inspector::ChannelBase::new::(); + let mut v8_inspector = v8_inspector_rc.borrow_mut(); + let v8_inspector_ptr = v8_inspector.as_mut().unwrap(); + let v8_session = Rc::new(RefCell::new(v8_inspector_ptr.connect( + Self::CONTEXT_GROUP_ID, + // Todo(piscisaureus): V8Inspector::connect() should require that + // the 'v8_channel' argument cannot move. + unsafe { &mut *self_ptr }, + v8::inspector::StringView::empty(), + ))); + + let (proxy_tx, proxy_rx) = session_proxy.split(); + let proxy_rx_handler = + Self::receive_from_proxy(v8_session.clone(), proxy_rx); + + Self { + v8_channel, + v8_session, + proxy_tx, + proxy_rx_handler, + } + }) + } + + // Dispatch message to V8 session + #[allow(unused)] + fn dispatch_message(&mut self, msg: Vec) { + let msg = v8::inspector::StringView::from(msg.as_slice()); + let mut v8_session = self.v8_session.borrow_mut(); + let v8_session_ptr = v8_session.as_mut(); + v8_session_ptr.dispatch_protocol_message(msg); + } + + // TODO(bartlomieju): this function should be reworked into `impl Future` + // or `impl Stream` + /// Returns a future that receives messages from the proxy and dispatches + /// them to the V8 session. + fn receive_from_proxy( + v8_session_rc: Rc< + RefCell>, + >, + proxy_rx: SessionProxyReceiver, + ) -> Pin + 'static>> { + async move { + let result = proxy_rx + .map_ok(move |msg| { + let msg = v8::inspector::StringView::from(msg.as_slice()); + let mut v8_session = v8_session_rc.borrow_mut(); + let v8_session_ptr = v8_session.as_mut(); + v8_session_ptr.dispatch_protocol_message(msg); + }) + .try_collect::<()>() + .await; + + // TODO(bartlomieju): ideally these prints should be moved + // to `server.rs` as they are unwanted in context of REPL/coverage collection + // but right now they do not pose a huge problem. Investigate how to + // move them to `server.rs`. + match result { + Ok(_) => eprintln!("Debugger session ended."), + Err(err) => eprintln!("Debugger session ended: {}.", err), + }; + } + .boxed_local() + } + + fn send_message( + &self, + maybe_call_id: Option, + msg: v8::UniquePtr, + ) { + let msg = msg.unwrap().string().to_string(); + let _ = self.proxy_tx.unbounded_send((maybe_call_id, msg)); + } + + pub fn break_on_next_statement(&mut self) { + let reason = v8::inspector::StringView::from(&b"debugCommand"[..]); + let detail = v8::inspector::StringView::empty(); + self + .v8_session + .borrow_mut() + .as_mut() + .schedule_pause_on_next_statement(reason, detail); + } +} + +impl v8::inspector::ChannelImpl for InspectorSession { + fn base(&self) -> &v8::inspector::ChannelBase { + &self.v8_channel + } + + fn base_mut(&mut self) -> &mut v8::inspector::ChannelBase { + &mut self.v8_channel + } + + fn send_response( + &mut self, + call_id: i32, + message: v8::UniquePtr, + ) { + self.send_message(Some(call_id), message); + } + + fn send_notification( + &mut self, + message: v8::UniquePtr, + ) { + self.send_message(None, message); + } + + fn flush_protocol_notifications(&mut self) {} +} + +impl Future for InspectorSession { + type Output = (); + fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { + self.proxy_rx_handler.poll_unpin(cx) + } +} + +/// A local inspector session that can be used to send and receive protocol messages directly on +/// the same thread as an isolate. +pub struct LocalInspectorSession { + v8_session_tx: UnboundedSender, AnyError>>, + v8_session_rx: UnboundedReceiver<(Option, String)>, + response_tx_map: HashMap>, + next_message_id: i32, + notification_queue: Vec, +} + +impl LocalInspectorSession { + pub fn new( + v8_session_tx: UnboundedSender, AnyError>>, + v8_session_rx: UnboundedReceiver<(Option, String)>, + ) -> Self { + let response_tx_map = HashMap::new(); + let next_message_id = 0; + + let notification_queue = Vec::new(); + + Self { + v8_session_tx, + v8_session_rx, + response_tx_map, + next_message_id, + notification_queue, + } + } + + pub fn notifications(&mut self) -> Vec { + self.notification_queue.split_off(0) + } + + pub async fn post_message( + &mut self, + method: &str, + params: Option, + ) -> Result { + let id = self.next_message_id; + self.next_message_id += 1; + + let (response_tx, mut response_rx) = + oneshot::channel::(); + self.response_tx_map.insert(id, response_tx); + + let message = json!({ + "id": id, + "method": method, + "params": params, + }); + + let raw_message = serde_json::to_string(&message).unwrap(); + self + .v8_session_tx + .unbounded_send(Ok(raw_message.as_bytes().to_vec())) + .unwrap(); + + loop { + let receive_fut = self.receive_from_v8_session().boxed_local(); + match select(receive_fut, &mut response_rx).await { + Either::Left(_) => continue, + Either::Right((result, _)) => { + let response = result?; + if let Some(error) = response.get("error") { + return Err(generic_error(error.to_string())); + } + + let result = response.get("result").unwrap().clone(); + return Ok(result); + } + } + } + } + + async fn receive_from_v8_session(&mut self) { + let (maybe_call_id, message) = self.v8_session_rx.next().await.unwrap(); + // If there's no call_id then it's a notification + if let Some(call_id) = maybe_call_id { + let message: serde_json::Value = match serde_json::from_str(&message) { + Ok(v) => v, + Err(error) => match error.classify() { + serde_json::error::Category::Syntax => json!({ + "id": call_id, + "result": { + "result": { + "type": "error", + "description": "Unterminated string literal", + "value": "Unterminated string literal", + }, + "exceptionDetails": { + "exceptionId": 0, + "text": "Unterminated string literal", + "lineNumber": 0, + "columnNumber": 0 + }, + }, + }), + _ => panic!("Could not parse inspector message"), + }, + }; + + self + .response_tx_map + .remove(&call_id) + .unwrap() + .send(message) + .unwrap(); + } else { + let message = serde_json::from_str(&message).unwrap(); + self.notification_queue.push(message); + } + } +} + +fn new_box_with(new_fn: impl FnOnce(*mut T) -> T) -> Box { + let b = Box::new(MaybeUninit::::uninit()); + let p = Box::into_raw(b) as *mut T; + unsafe { ptr::write(p, new_fn(p)) }; + unsafe { Box::from_raw(p) } +} diff --git a/runtime/inspector/server.rs b/runtime/inspector/server.rs new file mode 100644 index 0000000000..8cbf11d577 --- /dev/null +++ b/runtime/inspector/server.rs @@ -0,0 +1,389 @@ +// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. + +use core::convert::Infallible as Never; // Alias for the future `!` type. +use deno_core::error::AnyError; +use deno_core::futures::channel::mpsc; +use deno_core::futures::channel::mpsc::UnboundedReceiver; +use deno_core::futures::channel::mpsc::UnboundedSender; +use deno_core::futures::channel::oneshot; +use deno_core::futures::future; +use deno_core::futures::future::Future; +use deno_core::futures::pin_mut; +use deno_core::futures::prelude::*; +use deno_core::futures::select; +use deno_core::futures::stream::StreamExt; +use deno_core::futures::task::Poll; +use deno_core::serde_json; +use deno_core::serde_json::json; +use deno_core::serde_json::Value; +use deno_websocket::tokio_tungstenite::tungstenite; +use std::cell::RefCell; +use std::collections::HashMap; +use std::convert::Infallible; +use std::net::SocketAddr; +use std::process; +use std::rc::Rc; +use std::thread; +use uuid::Uuid; + +use super::SessionProxy; + +/// Websocket server that is used to proxy connections from +/// devtools to the inspector. +pub struct InspectorServer { + pub host: SocketAddr, + register_inspector_tx: UnboundedSender, + shutdown_server_tx: Option>, + thread_handle: Option>, +} + +impl InspectorServer { + pub fn new(host: SocketAddr, name: String) -> Self { + let (register_inspector_tx, register_inspector_rx) = + mpsc::unbounded::(); + + let (shutdown_server_tx, shutdown_server_rx) = oneshot::channel(); + + let thread_handle = thread::spawn(move || { + let rt = crate::tokio_util::create_basic_runtime(); + let local = tokio::task::LocalSet::new(); + local.block_on( + &rt, + server(host, register_inspector_rx, shutdown_server_rx, name), + ) + }); + + Self { + host, + register_inspector_tx, + shutdown_server_tx: Some(shutdown_server_tx), + thread_handle: Some(thread_handle), + } + } + + pub fn register_inspector( + &self, + session_sender: UnboundedSender, + deregister_rx: oneshot::Receiver<()>, + ) { + let info = InspectorInfo::new(self.host, session_sender, deregister_rx); + self.register_inspector_tx.unbounded_send(info).unwrap(); + } +} + +impl Drop for InspectorServer { + fn drop(&mut self) { + if let Some(shutdown_server_tx) = self.shutdown_server_tx.take() { + shutdown_server_tx + .send(()) + .expect("unable to send shutdown signal"); + } + + if let Some(thread_handle) = self.thread_handle.take() { + thread_handle.join().expect("unable to join thread"); + } + } +} + +// Needed so hyper can use non Send futures +#[derive(Clone)] +struct LocalExecutor; + +impl hyper::rt::Executor for LocalExecutor +where + Fut: Future + 'static, + Fut::Output: 'static, +{ + fn execute(&self, fut: Fut) { + tokio::task::spawn_local(fut); + } +} + +fn handle_ws_request( + req: http::Request, + inspector_map: Rc>>, +) -> http::Result> { + let (parts, body) = req.into_parts(); + let req = http::Request::from_parts(parts, ()); + + if let Some(new_session_tx) = req + .uri() + .path() + .strip_prefix("/ws/") + .and_then(|s| Uuid::parse_str(s).ok()) + .and_then(|uuid| { + inspector_map + .borrow() + .get(&uuid) + .map(|info| info.new_session_tx.clone()) + }) + { + let resp = tungstenite::handshake::server::create_response(&req) + .map(|resp| resp.map(|_| hyper::Body::empty())) + .or_else(|e| match e { + tungstenite::error::Error::HttpFormat(http_error) => Err(http_error), + _ => http::Response::builder() + .status(http::StatusCode::BAD_REQUEST) + .body("Not a valid Websocket Request".into()), + }); + + let (parts, _) = req.into_parts(); + let req = http::Request::from_parts(parts, body); + + if resp.is_ok() { + tokio::task::spawn_local(async move { + let upgraded = hyper::upgrade::on(req).await.unwrap(); + let websocket = + deno_websocket::tokio_tungstenite::WebSocketStream::from_raw_socket( + upgraded, + tungstenite::protocol::Role::Server, + None, + ) + .await; + let (proxy, pump) = create_websocket_proxy(websocket); + eprintln!("Debugger session started."); + let _ = new_session_tx.unbounded_send(proxy); + pump.await; + }); + } + resp + } else { + http::Response::builder() + .status(http::StatusCode::NOT_FOUND) + .body("No Valid inspector".into()) + } +} + +fn handle_json_request( + inspector_map: Rc>>, +) -> http::Result> { + let data = inspector_map + .borrow() + .values() + .map(|info| info.get_json_metadata()) + .collect::>(); + http::Response::builder() + .status(http::StatusCode::OK) + .header(http::header::CONTENT_TYPE, "application/json") + .body(serde_json::to_string(&data).unwrap().into()) +} + +fn handle_json_version_request( + version_response: Value, +) -> http::Result> { + http::Response::builder() + .status(http::StatusCode::OK) + .header(http::header::CONTENT_TYPE, "application/json") + .body(serde_json::to_string(&version_response).unwrap().into()) +} + +async fn server( + host: SocketAddr, + register_inspector_rx: UnboundedReceiver, + shutdown_server_rx: oneshot::Receiver<()>, + name: String, +) { + let inspector_map_ = + Rc::new(RefCell::new(HashMap::::new())); + + let inspector_map = Rc::clone(&inspector_map_); + let register_inspector_handler = register_inspector_rx + .map(|info| { + eprintln!( + "Debugger listening on {}", + info.get_websocket_debugger_url() + ); + if inspector_map.borrow_mut().insert(info.uuid, info).is_some() { + panic!("Inspector UUID already in map"); + } + }) + .collect::<()>(); + + let inspector_map = Rc::clone(&inspector_map_); + let deregister_inspector_handler = future::poll_fn(|cx| { + inspector_map + .borrow_mut() + .retain(|_, info| info.deregister_rx.poll_unpin(cx) == Poll::Pending); + Poll::::Pending + }) + .fuse(); + + let json_version_response = json!({ + "Browser": name, + "Protocol-Version": "1.3", + "V8-Version": deno_core::v8_version(), + }); + + let make_svc = hyper::service::make_service_fn(|_| { + let inspector_map = Rc::clone(&inspector_map_); + let json_version_response = json_version_response.clone(); + + future::ok::<_, Infallible>(hyper::service::service_fn( + move |req: http::Request| { + future::ready({ + match (req.method(), req.uri().path()) { + (&http::Method::GET, path) if path.starts_with("/ws/") => { + handle_ws_request(req, inspector_map.clone()) + } + (&http::Method::GET, "/json/version") => { + handle_json_version_request(json_version_response.clone()) + } + (&http::Method::GET, "/json") => { + handle_json_request(inspector_map.clone()) + } + (&http::Method::GET, "/json/list") => { + handle_json_request(inspector_map.clone()) + } + _ => http::Response::builder() + .status(http::StatusCode::NOT_FOUND) + .body("Not Found".into()), + } + }) + }, + )) + }); + + // Create the server manually so it can use the Local Executor + let server_handler = hyper::server::Builder::new( + hyper::server::conn::AddrIncoming::bind(&host).unwrap_or_else(|e| { + eprintln!("Cannot start inspector server: {}.", e); + process::exit(1); + }), + hyper::server::conn::Http::new().with_executor(LocalExecutor), + ) + .serve(make_svc) + .with_graceful_shutdown(async { + shutdown_server_rx.await.ok(); + }) + .unwrap_or_else(|err| { + eprintln!("Cannot start inspector server: {}.", err); + process::exit(1); + }) + .fuse(); + + pin_mut!(register_inspector_handler); + pin_mut!(deregister_inspector_handler); + pin_mut!(server_handler); + + select! { + _ = register_inspector_handler => {}, + _ = deregister_inspector_handler => unreachable!(), + _ = server_handler => {}, + } +} + +/// Creates a future that proxies messages sent and received on a warp WebSocket +/// to a UnboundedSender/UnboundedReceiver pair. We need this to sidestep +/// Tokio's task budget, which causes issues when JsRuntimeInspector::poll_sessions() +/// needs to block the thread because JavaScript execution is paused. +/// +/// This works because UnboundedSender/UnboundedReceiver are implemented in the +/// 'futures' crate, therefore they can't participate in Tokio's cooperative +/// task yielding. +/// +/// A tuple is returned, where the first element is a duplex channel that can +/// be used to send/receive messages on the websocket, and the second element +/// is a future that does the forwarding. +fn create_websocket_proxy( + websocket: deno_websocket::tokio_tungstenite::WebSocketStream< + hyper::upgrade::Upgraded, + >, +) -> (SessionProxy, impl Future + Send) { + // The 'outbound' channel carries messages sent to the websocket. + let (outbound_tx, outbound_rx) = mpsc::unbounded(); + + // The 'inbound' channel carries messages received from the websocket. + let (inbound_tx, inbound_rx) = mpsc::unbounded(); + + let proxy = SessionProxy { + tx: outbound_tx, + rx: inbound_rx, + }; + + // The pump future takes care of forwarding messages between the websocket + // and channels. It resolves to () when either side disconnects, ignoring any + // errors. + let pump = async move { + let (websocket_tx, websocket_rx) = websocket.split(); + + let outbound_pump = outbound_rx + .map(|(_maybe_call_id, msg)| tungstenite::Message::text(msg)) + .map(Ok) + .forward(websocket_tx) + .map_err(|_| ()); + + let inbound_pump = websocket_rx + .map(|result| { + let result = result.map(|msg| msg.into_data()).map_err(AnyError::from); + inbound_tx.unbounded_send(result) + }) + .map_err(|_| ()) + .try_collect::<()>(); + + let _ = future::try_join(outbound_pump, inbound_pump).await; + }; + + (proxy, pump) +} + +/// Inspector information that is sent from the isolate thread to the server +/// thread when a new inspector is created. +pub struct InspectorInfo { + pub host: SocketAddr, + pub uuid: Uuid, + pub thread_name: Option, + pub new_session_tx: UnboundedSender, + pub deregister_rx: oneshot::Receiver<()>, +} + +impl InspectorInfo { + pub fn new( + host: SocketAddr, + new_session_tx: mpsc::UnboundedSender, + deregister_rx: oneshot::Receiver<()>, + ) -> Self { + Self { + host, + uuid: Uuid::new_v4(), + thread_name: thread::current().name().map(|n| n.to_owned()), + new_session_tx, + deregister_rx, + } + } + + fn get_json_metadata(&self) -> Value { + json!({ + "description": "deno", + "devtoolsFrontendUrl": self.get_frontend_url(), + "faviconUrl": "https://deno.land/favicon.ico", + "id": self.uuid.to_string(), + "title": self.get_title(), + "type": "node", + // TODO(ry): "url": "file://", + "webSocketDebuggerUrl": self.get_websocket_debugger_url(), + }) + } + + pub fn get_websocket_debugger_url(&self) -> String { + format!("ws://{}/ws/{}", &self.host, &self.uuid) + } + + fn get_frontend_url(&self) -> String { + format!( + "devtools://devtools/bundled/js_app.html?ws={}/ws/{}&experiments=true&v8only=true", + &self.host, &self.uuid + ) + } + + fn get_title(&self) -> String { + format!( + "[{}] deno{}", + process::id(), + self + .thread_name + .as_ref() + .map(|n| format!(" - {}", n)) + .unwrap_or_default() + ) + } +} diff --git a/runtime/web_worker.rs b/runtime/web_worker.rs index c2356651e8..6bb26e555c 100644 --- a/runtime/web_worker.rs +++ b/runtime/web_worker.rs @@ -1,7 +1,7 @@ // Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. use crate::colors; -use crate::inspector::DenoInspector; use crate::inspector::InspectorServer; +use crate::inspector::JsRuntimeInspector; use crate::js; use crate::metrics; use crate::ops; @@ -199,7 +199,7 @@ fn create_handles( /// `WebWorker`. pub struct WebWorker { id: WorkerId, - inspector: Option>, + inspector: Option>, pub js_runtime: JsRuntime, pub name: String, internal_handle: WebWorkerInternalHandle, @@ -325,10 +325,15 @@ impl WebWorker { }); let inspector = if options.attach_inspector { - Some(DenoInspector::new( - &mut js_runtime, - options.maybe_inspector_server.clone(), - )) + let mut inspector = JsRuntimeInspector::new(&mut js_runtime); + + if let Some(server) = options.maybe_inspector_server.clone() { + let session_sender = inspector.get_session_sender(); + let deregister_rx = inspector.add_deregister_handler(); + server.register_inspector(session_sender, deregister_rx); + } + + Some(inspector) } else { None }; diff --git a/runtime/worker.rs b/runtime/worker.rs index 9ffd0b5ab1..1ffd32b482 100644 --- a/runtime/worker.rs +++ b/runtime/worker.rs @@ -1,8 +1,8 @@ // Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. -use crate::inspector::DenoInspector; use crate::inspector::InspectorServer; -use crate::inspector::InspectorSession; +use crate::inspector::JsRuntimeInspector; +use crate::inspector::LocalInspectorSession; use crate::js; use crate::metrics; use crate::ops; @@ -13,6 +13,7 @@ use deno_core::error::Context as ErrorContext; use deno_core::futures::future::poll_fn; use deno_core::futures::future::FutureExt; use deno_core::futures::stream::StreamExt; +use deno_core::futures::Future; use deno_core::serde_json; use deno_core::serde_json::json; use deno_core::url::Url; @@ -27,6 +28,7 @@ use deno_core::RuntimeOptions; use deno_file::BlobUrlStore; use log::debug; use std::env; +use std::pin::Pin; use std::rc::Rc; use std::sync::Arc; use std::task::Context; @@ -40,7 +42,7 @@ use std::task::Poll; /// All `WebWorker`s created during program execution /// are descendants of this worker. pub struct MainWorker { - inspector: Option>, + inspector: Option>, pub js_runtime: JsRuntime, should_break_on_first_statement: bool, } @@ -147,10 +149,15 @@ impl MainWorker { }); let inspector = if options.attach_inspector { - Some(DenoInspector::new( - &mut js_runtime, - options.maybe_inspector_server.clone(), - )) + let mut inspector = JsRuntimeInspector::new(&mut js_runtime); + + if let Some(server) = options.maybe_inspector_server.clone() { + let session_sender = inspector.get_session_sender(); + let deregister_rx = inspector.add_deregister_handler(); + server.register_inspector(session_sender, deregister_rx); + } + + Some(inspector) } else { None }; @@ -243,10 +250,9 @@ impl MainWorker { /// Create new inspector session. This function panics if Worker /// was not configured to create inspector. - pub fn create_inspector_session(&mut self) -> Box { - let inspector = self.inspector.as_mut().unwrap(); - - InspectorSession::new(&mut **inspector) + pub async fn create_inspector_session(&mut self) -> LocalInspectorSession { + let inspector = self.inspector.as_ref().unwrap(); + inspector.create_local_session() } pub fn poll_event_loop( @@ -261,6 +267,28 @@ impl MainWorker { pub async fn run_event_loop(&mut self) -> Result<(), AnyError> { poll_fn(|cx| self.poll_event_loop(cx)).await } + + /// A utility function that runs provided future concurrently with the event loop. + /// + /// Useful when using a local inspector session. + pub async fn with_event_loop<'a, T>( + &mut self, + mut fut: Pin + 'a>>, + ) -> T { + loop { + tokio::select! { + result = &mut fut => { + return result; + } + _ = self.run_event_loop() => { + // A zero delay is long enough to yield the thread in order to prevent the loop from + // running hot for messages that are taking longer to resolve like for example an + // evaluation of top level await. + tokio::time::sleep(tokio::time::Duration::from_millis(0)).await; + } + }; + } + } } impl Drop for MainWorker {