From b1c2d219353edafa2bdc95ac2b4dbab5d4b7f459 Mon Sep 17 00:00:00 2001 From: Liam Perlaki Date: Mon, 21 Dec 2020 16:29:50 +0100 Subject: [PATCH] refactor(runtime): remove warp dependency (#8813) This commit replaces the "warp" web server in the "deno_runtime" crate with a "hyper" server and a "tokio-tungstenite" websocket implementation. --- Cargo.lock | 16 +++- runtime/Cargo.toml | 6 +- runtime/inspector.rs | 224 +++++++++++++++++++++++++++++-------------- 3 files changed, 162 insertions(+), 84 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c760e19a85..367ec806e9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -592,6 +592,7 @@ dependencies = [ "filetime", "fwdansi", "http", + "hyper", "indexmap", "lazy_static", "libc", @@ -612,7 +613,6 @@ dependencies = [ "tokio-rustls", "tokio-tungstenite", "uuid", - "warp", "webpki", "webpki-roots", "winapi 0.3.9", @@ -1146,6 +1146,12 @@ version = "1.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cd179ae861f0c2e53da70d892f5f3029f9594be0c41dc5269cd371691b1dc2f9" +[[package]] +name = "httpdate" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "494b4d60369511e7dea41cf646832512a94e542f68bb9c49e54518e0f468eb47" + [[package]] name = "humantime" version = "1.3.0" @@ -1157,9 +1163,9 @@ dependencies = [ [[package]] name = "hyper" -version = "0.13.7" +version = "0.13.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3e68a8dd9716185d9e64ea473ea6ef63529252e3e27623295a0378a19665d5eb" +checksum = "f6ad767baac13b44d4529fcf58ba2cd0995e36e7b435bc5b039de6f47e880dbf" dependencies = [ "bytes 0.5.6", "futures-channel", @@ -1169,10 +1175,10 @@ dependencies = [ "http", "http-body", "httparse", + "httpdate", "itoa", - "pin-project 0.4.23", + "pin-project 1.0.2", "socket2", - "time", "tokio 0.2.22", "tower-service", "tracing", diff --git a/runtime/Cargo.toml b/runtime/Cargo.toml index 1e5c9511fd..c62bce9947 100644 --- a/runtime/Cargo.toml +++ b/runtime/Cargo.toml @@ -55,13 +55,9 @@ sys-info = "0.7.0" termcolor = "1.1.0" tokio = { version = "0.2.22", features = ["full"] } tokio-rustls = "0.14.1" -# Keep in-sync with warp. tokio-tungstenite = "0.11.0" uuid = { version = "0.8.1", features = ["v4"] } -# TODO(bartlomieju): remove dependency on warp, it's only used -# for a WebSocket server in inspector.rs -# Keep in-sync with tokio-tungestenite. -warp = { version = "0.2.5", features = ["tls"] } +hyper = "0.13.9" webpki = "0.21.3" webpki-roots = "=0.19.0" # Pinned to v0.19.0 to match 'reqwest'. diff --git a/runtime/inspector.rs b/runtime/inspector.rs index fc0e793d9d..58df05c54a 100644 --- a/runtime/inspector.rs +++ b/runtime/inspector.rs @@ -11,7 +11,7 @@ use deno_core::futures::channel::mpsc; use deno_core::futures::channel::mpsc::UnboundedReceiver; use deno_core::futures::channel::mpsc::UnboundedSender; use deno_core::futures::channel::oneshot; -use deno_core::futures::future::Future; +use deno_core::futures::future::{self, Future}; use deno_core::futures::pin_mut; use deno_core::futures::prelude::*; use deno_core::futures::select; @@ -23,8 +23,6 @@ use deno_core::serde_json; use deno_core::serde_json::json; use deno_core::serde_json::Value; use deno_core::v8; -use std::cell::BorrowMutError; -use std::cell::RefCell; use std::collections::HashMap; use std::ffi::c_void; use std::mem::replace; @@ -40,9 +38,10 @@ use std::ptr::NonNull; use std::sync::Arc; use std::sync::Mutex; use std::thread; +use std::{cell::BorrowMutError, convert::Infallible}; +use std::{cell::RefCell, rc::Rc}; +use tokio_tungstenite::tungstenite; use uuid::Uuid; -use warp::filters::ws; -use warp::Filter; pub struct InspectorServer { pub host: SocketAddr, @@ -59,12 +58,12 @@ impl InspectorServer { let (shutdown_server_tx, shutdown_server_rx) = oneshot::channel(); let thread_handle = thread::spawn(move || { - crate::tokio_util::run_basic(server( - host, - register_inspector_rx, - shutdown_server_rx, - name, - )) + let mut rt = crate::tokio_util::create_basic_runtime(); + let local = tokio::task::LocalSet::new(); + local.block_on( + &mut rt, + server(host, register_inspector_rx, shutdown_server_rx, name), + ) }); Self { @@ -142,95 +141,172 @@ impl InspectorInfo { } } +// Needed so hyper can use non Send futures +#[derive(Clone)] +struct LocalExecutor; + +impl hyper::rt::Executor for LocalExecutor +where + Fut: Future + 'static, + Fut::Output: 'static, +{ + fn execute(&self, fut: Fut) { + tokio::task::spawn_local(fut); + } +} + +fn handle_ws_request( + req: http::Request, + inspector_map: Rc>>, +) -> http::Result> { + let (parts, body) = req.into_parts(); + let req = http::Request::from_parts(parts, ()); + + if let Some(new_websocket_tx) = req + .uri() + .path() + .strip_prefix("/ws/") + .and_then(|s| Uuid::parse_str(s).ok()) + .and_then(|uuid| { + inspector_map + .borrow() + .get(&uuid) + .map(|info| info.new_websocket_tx.clone()) + }) + { + let resp = tungstenite::handshake::server::create_response(&req) + .map(|resp| resp.map(|_| hyper::Body::empty())) + .or_else(|e| match e { + tungstenite::error::Error::HttpFormat(http_error) => Err(http_error), + _ => http::Response::builder() + .status(http::StatusCode::BAD_REQUEST) + .body("Not a valid Websocket Request".into()), + }); + tokio::task::spawn_local(async move { + let upgraded = body.on_upgrade().await.unwrap(); + let websocket = tokio_tungstenite::WebSocketStream::from_raw_socket( + upgraded, + tungstenite::protocol::Role::Server, + None, + ) + .await; + let (proxy, pump) = create_websocket_proxy(websocket); + + let _ = new_websocket_tx.unbounded_send(proxy); + pump.await; + }); + + resp + } else { + http::Response::builder() + .status(http::StatusCode::NOT_FOUND) + .body("No Valid inspector".into()) + } +} + +fn handle_json_request( + inspector_map: Rc>>, +) -> http::Result> { + let data = inspector_map + .borrow() + .values() + .map(|info| info.get_json_metadata()) + .collect::>(); + http::Response::builder() + .status(http::StatusCode::OK) + .header(http::header::CONTENT_TYPE, "application/json") + .body(serde_json::to_string(&data).unwrap().into()) +} + +fn handle_json_version_request( + version_response: Value, +) -> http::Result> { + http::Response::builder() + .status(http::StatusCode::OK) + .header(http::header::CONTENT_TYPE, "application/json") + .body(serde_json::to_string(&version_response).unwrap().into()) +} + async fn server( host: SocketAddr, register_inspector_rx: UnboundedReceiver, shutdown_server_rx: oneshot::Receiver<()>, name: String, ) { - // TODO: put the `inspector_map` in an `Rc>` instead. This is - // currently not possible because warp requires all filters to implement - // `Send`, which should not be necessary because we are using the - // single-threaded Tokio runtime. - let inspector_map = HashMap::::new(); - let inspector_map = Arc::new(Mutex::new(inspector_map)); + let inspector_map_ = + Rc::new(RefCell::new(HashMap::::new())); - let inspector_map_ = inspector_map.clone(); + let inspector_map = Rc::clone(&inspector_map_); let register_inspector_handler = register_inspector_rx .map(|info| { eprintln!( "Debugger listening on {}", info.get_websocket_debugger_url() ); - let mut g = inspector_map_.lock().unwrap(); - if g.insert(info.uuid, info).is_some() { + if inspector_map.borrow_mut().insert(info.uuid, info).is_some() { panic!("Inspector UUID already in map"); } }) .collect::<()>(); - let inspector_map_ = inspector_map_.clone(); + let inspector_map = Rc::clone(&inspector_map_); let deregister_inspector_handler = future::poll_fn(|cx| { - let mut g = inspector_map_.lock().unwrap(); - g.retain(|_, info| info.canary_rx.poll_unpin(cx) == Poll::Pending); + inspector_map + .borrow_mut() + .retain(|_, info| info.canary_rx.poll_unpin(cx) == Poll::Pending); Poll::::Pending }) .fuse(); - let inspector_map_ = inspector_map.clone(); - let websocket_route = warp::path("ws") - .and(warp::path::param()) - .and(warp::ws()) - .and_then(move |uuid: String, ws: warp::ws::Ws| { - future::ready( - Uuid::parse_str(&uuid) - .ok() - .and_then(|uuid| { - let g = inspector_map_.lock().unwrap(); - g.get(&uuid).map(|info| info.new_websocket_tx.clone()).map( - |new_websocket_tx| { - ws.on_upgrade(move |websocket| async move { - let (proxy, pump) = create_websocket_proxy(websocket); - let _ = new_websocket_tx.unbounded_send(proxy); - pump.await; - }) - }, - ) - }) - .ok_or_else(warp::reject::not_found), - ) - }); - let json_version_response = json!({ "Browser": name, "Protocol-Version": "1.3", "V8-Version": deno_core::v8_version(), }); - let json_version_route = warp::path!("json" / "version") - .map(move || warp::reply::json(&json_version_response)); - let inspector_map_ = inspector_map.clone(); - let json_list_route = warp::path("json").map(move || { - let g = inspector_map_.lock().unwrap(); - let json_values = g - .values() - .map(|info| info.get_json_metadata()) - .collect::>(); - warp::reply::json(&json!(json_values)) + let make_svc = hyper::service::make_service_fn(|_| { + let inspector_map = Rc::clone(&inspector_map_); + let json_version_response = json_version_response.clone(); + + future::ok::<_, Infallible>(hyper::service::service_fn( + move |req: http::Request| { + future::ready({ + match (req.method(), req.uri().path()) { + (&http::Method::GET, path) if path.starts_with("/ws/") => { + handle_ws_request(req, inspector_map.clone()) + } + (&http::Method::GET, "/json") => { + handle_json_request(inspector_map.clone()) + } + (&http::Method::GET, "/json/version") => { + handle_json_version_request(json_version_response.clone()) + } + _ => http::Response::builder() + .status(http::StatusCode::NOT_FOUND) + .body("Not Found".into()), + } + }) + }, + )) }); - let server_routes = - websocket_route.or(json_version_route).or(json_list_route); - let server_handler = warp::serve(server_routes) - .try_bind_with_graceful_shutdown(host, async { - shutdown_server_rx.await.ok(); - }) - .map(|(_, fut)| fut) - .unwrap_or_else(|err| { - eprintln!("Cannot start inspector server: {}.", err); + // Create the server manually so it can use the Local Executor + let server_handler = hyper::server::Builder::new( + hyper::server::conn::AddrIncoming::bind(&host).unwrap_or_else(|e| { + eprintln!("Cannot start inspector server: {}.", e); process::exit(1); - }) - .fuse(); + }), + hyper::server::conn::Http::new().with_executor(LocalExecutor), + ) + .serve(make_svc) + .with_graceful_shutdown(async { + shutdown_server_rx.await.ok(); + }) + .unwrap_or_else(|err| { + eprintln!("Cannot start inspector server: {}.", err); + process::exit(1); + }) + .fuse(); pin_mut!(register_inspector_handler); pin_mut!(deregister_inspector_handler); @@ -243,9 +319,9 @@ async fn server( } } -type WebSocketProxySender = UnboundedSender; +type WebSocketProxySender = UnboundedSender; type WebSocketProxyReceiver = - UnboundedReceiver>; + UnboundedReceiver>; /// Encapsulates an UnboundedSender/UnboundedReceiver pair that together form /// a duplex channel for sending/receiving websocket messages. @@ -273,7 +349,7 @@ impl WebSocketProxy { /// be used to send/receive messages on the websocket, and the second element /// is a future that does the forwarding. fn create_websocket_proxy( - websocket: ws::WebSocket, + websocket: tokio_tungstenite::WebSocketStream, ) -> (WebSocketProxy, impl Future + Send) { // The 'outbound' channel carries messages sent to the websocket. let (outbound_tx, outbound_rx) = mpsc::unbounded(); @@ -759,8 +835,8 @@ impl DenoInspectorSession { let result = websocket_rx .map_ok(move |msg| { - let msg = msg.as_bytes(); - let msg = v8::inspector::StringView::from(msg); + let msg = msg.into_data(); + let msg = v8::inspector::StringView::from(msg.as_slice()); unsafe { &mut *self_ptr }.dispatch_protocol_message(msg); }) .try_collect::<()>() @@ -776,7 +852,7 @@ impl DenoInspectorSession { fn send_to_websocket(&self, msg: v8::UniquePtr) { let msg = msg.unwrap().string().to_string(); - let msg = ws::Message::text(msg); + let msg = tungstenite::Message::text(msg); let _ = self.websocket_tx.unbounded_send(msg); }