mirror of
https://github.com/denoland/deno.git
synced 2024-12-22 07:14:47 -05:00
refactor: upgrade inspector server to Hyper 1.1 (#21599)
This commit rewrites "runtime/inspector_server.rs" to use Hyper 1.1. Now "deno_runtime" crate depends on both Hyper 1.x and 0.y versions.
This commit is contained in:
parent
140e8becd0
commit
36536c784c
3 changed files with 185 additions and 111 deletions
32
Cargo.lock
generated
32
Cargo.lock
generated
|
@ -958,7 +958,7 @@ dependencies = [
|
|||
"env_logger",
|
||||
"eszip",
|
||||
"fancy-regex",
|
||||
"fastwebsockets",
|
||||
"fastwebsockets 0.5.0",
|
||||
"flaky_test",
|
||||
"flate2",
|
||||
"fs3",
|
||||
|
@ -1612,13 +1612,18 @@ dependencies = [
|
|||
"deno_webstorage",
|
||||
"dlopen2",
|
||||
"encoding_rs",
|
||||
"fastwebsockets",
|
||||
"fastwebsockets 0.5.0",
|
||||
"fastwebsockets 0.6.0",
|
||||
"filetime",
|
||||
"flate2",
|
||||
"fs3",
|
||||
"fwdansi",
|
||||
"http 0.2.11",
|
||||
"http 1.0.0",
|
||||
"http-body-util",
|
||||
"hyper 0.14.27",
|
||||
"hyper 1.1.0",
|
||||
"hyper-util",
|
||||
"libc",
|
||||
"log",
|
||||
"netif",
|
||||
|
@ -1765,7 +1770,7 @@ dependencies = [
|
|||
"deno_core",
|
||||
"deno_net",
|
||||
"deno_tls",
|
||||
"fastwebsockets",
|
||||
"fastwebsockets 0.5.0",
|
||||
"h2 0.3.22",
|
||||
"http 0.2.11",
|
||||
"hyper 0.14.27",
|
||||
|
@ -2353,6 +2358,25 @@ dependencies = [
|
|||
"utf-8",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "fastwebsockets"
|
||||
version = "0.6.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f63dd7b57f9b33b1741fa631c9522eb35d43e96dcca4a6a91d5e4ca7c93acdc1"
|
||||
dependencies = [
|
||||
"base64 0.21.5",
|
||||
"http-body-util",
|
||||
"hyper 1.1.0",
|
||||
"hyper-util",
|
||||
"pin-project",
|
||||
"rand",
|
||||
"sha1",
|
||||
"simdutf8",
|
||||
"thiserror",
|
||||
"tokio",
|
||||
"utf-8",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "fd-lock"
|
||||
version = "4.0.1"
|
||||
|
@ -5958,7 +5982,7 @@ dependencies = [
|
|||
"bytes",
|
||||
"console_static_text",
|
||||
"denokv_proto",
|
||||
"fastwebsockets",
|
||||
"fastwebsockets 0.5.0",
|
||||
"flate2",
|
||||
"futures",
|
||||
"glob",
|
||||
|
|
|
@ -98,10 +98,15 @@ fastwebsockets.workspace = true
|
|||
console_static_text.workspace = true
|
||||
dlopen2.workspace = true
|
||||
encoding_rs.workspace = true
|
||||
fastwebsockets_06 = { package = "fastwebsockets", version = "0.6", features = ["upgrade"] }
|
||||
filetime = "0.2.16"
|
||||
fs3.workspace = true
|
||||
http.workspace = true
|
||||
http-body-util = "0.1"
|
||||
http_1 = { package = "http", version = "1.0" }
|
||||
hyper = { workspace = true, features = ["server", "stream", "http1", "http2", "runtime"] }
|
||||
hyper-util = { version = "0.1", features = ["server", "server-auto"] }
|
||||
hyper1 = { package = "hyper", version = "1.0.1", features = ["server"] }
|
||||
libc.workspace = true
|
||||
log.workspace = true
|
||||
netif = "0.1.6"
|
||||
|
|
|
@ -7,7 +7,6 @@ use deno_core::futures::channel::mpsc::UnboundedReceiver;
|
|||
use deno_core::futures::channel::mpsc::UnboundedSender;
|
||||
use deno_core::futures::channel::oneshot;
|
||||
use deno_core::futures::future;
|
||||
use deno_core::futures::future::Future;
|
||||
use deno_core::futures::prelude::*;
|
||||
use deno_core::futures::select;
|
||||
use deno_core::futures::stream::StreamExt;
|
||||
|
@ -20,17 +19,20 @@ use deno_core::url::Url;
|
|||
use deno_core::InspectorMsg;
|
||||
use deno_core::InspectorSessionProxy;
|
||||
use deno_core::JsRuntime;
|
||||
use fastwebsockets::Frame;
|
||||
use fastwebsockets::OpCode;
|
||||
use fastwebsockets::WebSocket;
|
||||
use fastwebsockets_06::Frame;
|
||||
use fastwebsockets_06::OpCode;
|
||||
use fastwebsockets_06::WebSocket;
|
||||
use hyper::body::Bytes;
|
||||
use hyper_util::rt::TokioIo;
|
||||
use std::cell::RefCell;
|
||||
use std::collections::HashMap;
|
||||
use std::convert::Infallible;
|
||||
use std::net::SocketAddr;
|
||||
use std::pin::pin;
|
||||
use std::process;
|
||||
use std::rc::Rc;
|
||||
use std::thread;
|
||||
use tokio::net::TcpListener;
|
||||
use tokio::sync::broadcast;
|
||||
use uuid::Uuid;
|
||||
|
||||
/// Websocket server that is used to proxy connections from
|
||||
|
@ -38,7 +40,7 @@ use uuid::Uuid;
|
|||
pub struct InspectorServer {
|
||||
pub host: SocketAddr,
|
||||
register_inspector_tx: UnboundedSender<InspectorInfo>,
|
||||
shutdown_server_tx: Option<oneshot::Sender<()>>,
|
||||
shutdown_server_tx: Option<broadcast::Sender<()>>,
|
||||
thread_handle: Option<thread::JoinHandle<()>>,
|
||||
}
|
||||
|
||||
|
@ -47,7 +49,7 @@ impl InspectorServer {
|
|||
let (register_inspector_tx, register_inspector_rx) =
|
||||
mpsc::unbounded::<InspectorInfo>();
|
||||
|
||||
let (shutdown_server_tx, shutdown_server_rx) = oneshot::channel();
|
||||
let (shutdown_server_tx, shutdown_server_rx) = broadcast::channel(1);
|
||||
|
||||
let thread_handle = thread::spawn(move || {
|
||||
let rt = crate::tokio_util::create_basic_runtime();
|
||||
|
@ -101,26 +103,12 @@ impl Drop for InspectorServer {
|
|||
}
|
||||
}
|
||||
|
||||
// Needed so hyper can use non Send futures
|
||||
#[derive(Clone)]
|
||||
struct LocalExecutor;
|
||||
|
||||
impl<Fut> hyper::rt::Executor<Fut> for LocalExecutor
|
||||
where
|
||||
Fut: Future + 'static,
|
||||
Fut::Output: 'static,
|
||||
{
|
||||
fn execute(&self, fut: Fut) {
|
||||
deno_core::unsync::spawn(fut);
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_ws_request(
|
||||
req: http::Request<hyper::Body>,
|
||||
req: http_1::Request<hyper1::body::Incoming>,
|
||||
inspector_map_rc: Rc<RefCell<HashMap<Uuid, InspectorInfo>>>,
|
||||
) -> http::Result<http::Response<hyper::Body>> {
|
||||
) -> http_1::Result<http_1::Response<Box<http_body_util::Full<Bytes>>>> {
|
||||
let (parts, body) = req.into_parts();
|
||||
let req = http::Request::from_parts(parts, ());
|
||||
let req = http_1::Request::from_parts(parts, ());
|
||||
|
||||
let maybe_uuid = req
|
||||
.uri()
|
||||
|
@ -129,9 +117,9 @@ fn handle_ws_request(
|
|||
.and_then(|s| Uuid::parse_str(s).ok());
|
||||
|
||||
if maybe_uuid.is_none() {
|
||||
return http::Response::builder()
|
||||
.status(http::StatusCode::BAD_REQUEST)
|
||||
.body("Malformed inspector UUID".into());
|
||||
return http_1::Response::builder()
|
||||
.status(http_1::StatusCode::BAD_REQUEST)
|
||||
.body(Box::new(Bytes::from("Malformed inspector UUID").into()));
|
||||
}
|
||||
|
||||
// run in a block to not hold borrow to `inspector_map` for too long
|
||||
|
@ -140,34 +128,47 @@ fn handle_ws_request(
|
|||
let maybe_inspector_info = inspector_map.get(&maybe_uuid.unwrap());
|
||||
|
||||
if maybe_inspector_info.is_none() {
|
||||
return http::Response::builder()
|
||||
.status(http::StatusCode::NOT_FOUND)
|
||||
.body("Invalid inspector UUID".into());
|
||||
return http_1::Response::builder()
|
||||
.status(http_1::StatusCode::NOT_FOUND)
|
||||
.body(Box::new(Bytes::from("Invalid inspector UUID").into()));
|
||||
}
|
||||
|
||||
let info = maybe_inspector_info.unwrap();
|
||||
info.new_session_tx.clone()
|
||||
};
|
||||
let (parts, _) = req.into_parts();
|
||||
let mut req = http::Request::from_parts(parts, body);
|
||||
let mut req = http_1::Request::from_parts(parts, body);
|
||||
|
||||
let (resp, fut) = match fastwebsockets::upgrade::upgrade(&mut req) {
|
||||
Ok(e) => e,
|
||||
let (resp, fut) = match fastwebsockets_06::upgrade::upgrade(&mut req) {
|
||||
Ok((resp, fut)) => {
|
||||
let (parts, _body) = resp.into_parts();
|
||||
let resp = http_1::Response::from_parts(
|
||||
parts,
|
||||
Box::new(http_body_util::Full::new(Bytes::new())),
|
||||
);
|
||||
(resp, fut)
|
||||
}
|
||||
_ => {
|
||||
return http::Response::builder()
|
||||
.status(http::StatusCode::BAD_REQUEST)
|
||||
.body("Not a valid Websocket Request".into());
|
||||
return http_1::Response::builder()
|
||||
.status(http_1::StatusCode::BAD_REQUEST)
|
||||
.body(Box::new(
|
||||
Bytes::from("Not a valid Websocket Request").into(),
|
||||
));
|
||||
}
|
||||
};
|
||||
|
||||
// spawn a task that will wait for websocket connection and then pump messages between
|
||||
// the socket and inspector proxy
|
||||
spawn(async move {
|
||||
let websocket = if let Ok(w) = fut.await {
|
||||
w
|
||||
} else {
|
||||
eprintln!("Inspector server failed to upgrade to WS connection");
|
||||
return;
|
||||
let websocket = match fut.await {
|
||||
Ok(w) => w,
|
||||
Err(err) => {
|
||||
eprintln!(
|
||||
"Inspector server failed to upgrade to WS connection: {:?}",
|
||||
err
|
||||
);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
// The 'outbound' channel carries messages sent to the websocket.
|
||||
|
@ -191,31 +192,37 @@ fn handle_ws_request(
|
|||
fn handle_json_request(
|
||||
inspector_map: Rc<RefCell<HashMap<Uuid, InspectorInfo>>>,
|
||||
host: Option<String>,
|
||||
) -> http::Result<http::Response<hyper::Body>> {
|
||||
) -> http_1::Result<http_1::Response<Box<http_body_util::Full<Bytes>>>> {
|
||||
let data = inspector_map
|
||||
.borrow()
|
||||
.values()
|
||||
.map(move |info| info.get_json_metadata(&host))
|
||||
.collect::<Vec<_>>();
|
||||
http::Response::builder()
|
||||
.status(http::StatusCode::OK)
|
||||
.header(http::header::CONTENT_TYPE, "application/json")
|
||||
.body(serde_json::to_string(&data).unwrap().into())
|
||||
let body: http_body_util::Full<Bytes> =
|
||||
Bytes::from(serde_json::to_string(&data).unwrap()).into();
|
||||
http_1::Response::builder()
|
||||
.status(http_1::StatusCode::OK)
|
||||
.header(http_1::header::CONTENT_TYPE, "application/json")
|
||||
.body(Box::new(body))
|
||||
}
|
||||
|
||||
fn handle_json_version_request(
|
||||
version_response: Value,
|
||||
) -> http::Result<http::Response<hyper::Body>> {
|
||||
http::Response::builder()
|
||||
.status(http::StatusCode::OK)
|
||||
.header(http::header::CONTENT_TYPE, "application/json")
|
||||
.body(serde_json::to_string(&version_response).unwrap().into())
|
||||
) -> http_1::Result<http_1::Response<Box<http_body_util::Full<Bytes>>>> {
|
||||
let body = Box::new(http_body_util::Full::from(
|
||||
serde_json::to_string(&version_response).unwrap(),
|
||||
));
|
||||
|
||||
http_1::Response::builder()
|
||||
.status(http_1::StatusCode::OK)
|
||||
.header(http_1::header::CONTENT_TYPE, "application/json")
|
||||
.body(body)
|
||||
}
|
||||
|
||||
async fn server(
|
||||
host: SocketAddr,
|
||||
register_inspector_rx: UnboundedReceiver<InspectorInfo>,
|
||||
shutdown_server_rx: oneshot::Receiver<()>,
|
||||
shutdown_server_rx: broadcast::Receiver<()>,
|
||||
name: &str,
|
||||
) {
|
||||
let inspector_map_ =
|
||||
|
@ -253,61 +260,99 @@ async fn server(
|
|||
"V8-Version": deno_core::v8_version(),
|
||||
});
|
||||
|
||||
let make_svc = hyper::service::make_service_fn(|_| {
|
||||
let inspector_map = Rc::clone(&inspector_map_);
|
||||
let json_version_response = json_version_response.clone();
|
||||
|
||||
future::ok::<_, Infallible>(hyper::service::service_fn(
|
||||
move |req: http::Request<hyper::Body>| {
|
||||
future::ready({
|
||||
// If the host header can make a valid URL, use it
|
||||
let host = req
|
||||
.headers()
|
||||
.get("host")
|
||||
.and_then(|host| host.to_str().ok())
|
||||
.and_then(|host| Url::parse(&format!("http://{host}")).ok())
|
||||
.and_then(|url| match (url.host(), url.port()) {
|
||||
(Some(host), Some(port)) => Some(format!("{host}:{port}")),
|
||||
(Some(host), None) => Some(format!("{host}")),
|
||||
_ => None,
|
||||
});
|
||||
match (req.method(), req.uri().path()) {
|
||||
(&http::Method::GET, path) if path.starts_with("/ws/") => {
|
||||
handle_ws_request(req, Rc::clone(&inspector_map))
|
||||
}
|
||||
(&http::Method::GET, "/json/version") => {
|
||||
handle_json_version_request(json_version_response.clone())
|
||||
}
|
||||
(&http::Method::GET, "/json") => {
|
||||
handle_json_request(Rc::clone(&inspector_map), host)
|
||||
}
|
||||
(&http::Method::GET, "/json/list") => {
|
||||
handle_json_request(Rc::clone(&inspector_map), host)
|
||||
}
|
||||
_ => http::Response::builder()
|
||||
.status(http::StatusCode::NOT_FOUND)
|
||||
.body("Not Found".into()),
|
||||
}
|
||||
})
|
||||
},
|
||||
))
|
||||
});
|
||||
|
||||
// Create the server manually so it can use the Local Executor
|
||||
let mut server_handler = pin!(hyper::server::Builder::new(
|
||||
hyper::server::conn::AddrIncoming::bind(&host).unwrap_or_else(|e| {
|
||||
eprintln!("Cannot start inspector server: {e}.");
|
||||
process::exit(1);
|
||||
}),
|
||||
hyper::server::conn::Http::new().with_executor(LocalExecutor),
|
||||
)
|
||||
.serve(make_svc)
|
||||
.with_graceful_shutdown(async {
|
||||
shutdown_server_rx.await.ok();
|
||||
})
|
||||
.unwrap_or_else(|err| {
|
||||
eprintln!("Cannot start inspector server: {err}.");
|
||||
process::exit(1);
|
||||
let listener = match TcpListener::bind(&host).await {
|
||||
Ok(l) => l,
|
||||
Err(err) => {
|
||||
eprintln!("Cannot start inspector server: {:?}", err);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let mut server_handler = pin!(deno_core::unsync::spawn(async move {
|
||||
loop {
|
||||
let mut rx = shutdown_server_rx.resubscribe();
|
||||
let mut shutdown_rx = pin!(rx.recv());
|
||||
let mut accept = pin!(listener.accept());
|
||||
|
||||
let stream = tokio::select! {
|
||||
accept_result = &mut accept => {
|
||||
match accept_result {
|
||||
Ok((s, _)) => s,
|
||||
Err(err) => {
|
||||
eprintln!("Failed to accept inspector connection: {:?}", err);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
},
|
||||
|
||||
_ = &mut shutdown_rx => {
|
||||
break;
|
||||
}
|
||||
};
|
||||
let io = TokioIo::new(stream);
|
||||
|
||||
let inspector_map = Rc::clone(&inspector_map_);
|
||||
let json_version_response = json_version_response.clone();
|
||||
let mut shutdown_server_rx = shutdown_server_rx.resubscribe();
|
||||
|
||||
let service = hyper1::service::service_fn(
|
||||
move |req: http_1::Request<hyper1::body::Incoming>| {
|
||||
future::ready({
|
||||
// If the host header can make a valid URL, use it
|
||||
let host = req
|
||||
.headers()
|
||||
.get("host")
|
||||
.and_then(|host| host.to_str().ok())
|
||||
.and_then(|host| Url::parse(&format!("http://{host}")).ok())
|
||||
.and_then(|url| match (url.host(), url.port()) {
|
||||
(Some(host), Some(port)) => Some(format!("{host}:{port}")),
|
||||
(Some(host), None) => Some(format!("{host}")),
|
||||
_ => None,
|
||||
});
|
||||
match (req.method(), req.uri().path()) {
|
||||
(&http_1::Method::GET, path) if path.starts_with("/ws/") => {
|
||||
handle_ws_request(req, Rc::clone(&inspector_map))
|
||||
}
|
||||
(&http_1::Method::GET, "/json/version") => {
|
||||
handle_json_version_request(json_version_response.clone())
|
||||
}
|
||||
(&http_1::Method::GET, "/json") => {
|
||||
handle_json_request(Rc::clone(&inspector_map), host)
|
||||
}
|
||||
(&http_1::Method::GET, "/json/list") => {
|
||||
handle_json_request(Rc::clone(&inspector_map), host)
|
||||
}
|
||||
_ => http_1::Response::builder()
|
||||
.status(http_1::StatusCode::NOT_FOUND)
|
||||
.body(Box::new(http_body_util::Full::new(Bytes::from(
|
||||
"Not Found",
|
||||
)))),
|
||||
}
|
||||
})
|
||||
},
|
||||
);
|
||||
|
||||
deno_core::unsync::spawn(async move {
|
||||
let server = hyper1::server::conn::http1::Builder::new();
|
||||
|
||||
let mut conn =
|
||||
pin!(server.serve_connection(io, service).with_upgrades());
|
||||
let mut shutdown_rx = pin!(shutdown_server_rx.recv());
|
||||
|
||||
tokio::select! {
|
||||
result = conn.as_mut() => {
|
||||
if let Err(err) = result {
|
||||
eprintln!("Failed to serve connection: {:?}", err);
|
||||
}
|
||||
},
|
||||
_ = &mut shutdown_rx => {
|
||||
conn.as_mut().graceful_shutdown();
|
||||
let _ = conn.await;
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
})
|
||||
.fuse());
|
||||
|
||||
|
@ -331,7 +376,7 @@ async fn server(
|
|||
/// 'futures' crate, therefore they can't participate in Tokio's cooperative
|
||||
/// task yielding.
|
||||
async fn pump_websocket_messages(
|
||||
mut websocket: WebSocket<hyper::upgrade::Upgraded>,
|
||||
mut websocket: WebSocket<TokioIo<hyper1::upgrade::Upgraded>>,
|
||||
inbound_tx: UnboundedSender<String>,
|
||||
mut outbound_rx: UnboundedReceiver<InspectorMsg>,
|
||||
) {
|
||||
|
|
Loading…
Reference in a new issue