1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2024-11-22 15:06:54 -05:00

refactor: upgrade inspector server to Hyper 1.1 (#21599)

This commit rewrites "runtime/inspector_server.rs" to use Hyper 1.1.

Now "deno_runtime" crate depends on both Hyper 1.x and 0.y versions.
This commit is contained in:
Bartek Iwańczuk 2023-12-23 16:46:09 +01:00
parent 16550f0825
commit ff8abc50f9
No known key found for this signature in database
GPG key ID: 0C6BCDDC3B3AD750
3 changed files with 185 additions and 111 deletions

32
Cargo.lock generated
View file

@ -958,7 +958,7 @@ dependencies = [
"env_logger",
"eszip",
"fancy-regex",
"fastwebsockets",
"fastwebsockets 0.5.0",
"flaky_test",
"flate2",
"fs3",
@ -1612,13 +1612,18 @@ dependencies = [
"deno_webstorage",
"dlopen2",
"encoding_rs",
"fastwebsockets",
"fastwebsockets 0.5.0",
"fastwebsockets 0.6.0",
"filetime",
"flate2",
"fs3",
"fwdansi",
"http 0.2.11",
"http 1.0.0",
"http-body-util",
"hyper 0.14.27",
"hyper 1.1.0",
"hyper-util",
"libc",
"log",
"netif",
@ -1765,7 +1770,7 @@ dependencies = [
"deno_core",
"deno_net",
"deno_tls",
"fastwebsockets",
"fastwebsockets 0.5.0",
"h2 0.3.22",
"http 0.2.11",
"hyper 0.14.27",
@ -2353,6 +2358,25 @@ dependencies = [
"utf-8",
]
[[package]]
name = "fastwebsockets"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f63dd7b57f9b33b1741fa631c9522eb35d43e96dcca4a6a91d5e4ca7c93acdc1"
dependencies = [
"base64 0.21.5",
"http-body-util",
"hyper 1.1.0",
"hyper-util",
"pin-project",
"rand",
"sha1",
"simdutf8",
"thiserror",
"tokio",
"utf-8",
]
[[package]]
name = "fd-lock"
version = "4.0.1"
@ -5958,7 +5982,7 @@ dependencies = [
"bytes",
"console_static_text",
"denokv_proto",
"fastwebsockets",
"fastwebsockets 0.5.0",
"flate2",
"futures",
"glob",

View file

@ -98,10 +98,15 @@ fastwebsockets.workspace = true
console_static_text.workspace = true
dlopen2.workspace = true
encoding_rs.workspace = true
fastwebsockets_06 = { package = "fastwebsockets", version = "0.6", features = ["upgrade"] }
filetime = "0.2.16"
fs3.workspace = true
http.workspace = true
http-body-util = "0.1"
http_1 = { package = "http", version = "1.0" }
hyper = { workspace = true, features = ["server", "stream", "http1", "http2", "runtime"] }
hyper-util = { version = "0.1", features = ["server", "server-auto"] }
hyper1 = { package = "hyper", version = "1.0.1", features = ["server"] }
libc.workspace = true
log.workspace = true
netif = "0.1.6"

View file

@ -7,7 +7,6 @@ use deno_core::futures::channel::mpsc::UnboundedReceiver;
use deno_core::futures::channel::mpsc::UnboundedSender;
use deno_core::futures::channel::oneshot;
use deno_core::futures::future;
use deno_core::futures::future::Future;
use deno_core::futures::prelude::*;
use deno_core::futures::select;
use deno_core::futures::stream::StreamExt;
@ -20,17 +19,20 @@ use deno_core::url::Url;
use deno_core::InspectorMsg;
use deno_core::InspectorSessionProxy;
use deno_core::JsRuntime;
use fastwebsockets::Frame;
use fastwebsockets::OpCode;
use fastwebsockets::WebSocket;
use fastwebsockets_06::Frame;
use fastwebsockets_06::OpCode;
use fastwebsockets_06::WebSocket;
use hyper::body::Bytes;
use hyper_util::rt::TokioIo;
use std::cell::RefCell;
use std::collections::HashMap;
use std::convert::Infallible;
use std::net::SocketAddr;
use std::pin::pin;
use std::process;
use std::rc::Rc;
use std::thread;
use tokio::net::TcpListener;
use tokio::sync::broadcast;
use uuid::Uuid;
/// Websocket server that is used to proxy connections from
@ -38,7 +40,7 @@ use uuid::Uuid;
pub struct InspectorServer {
pub host: SocketAddr,
register_inspector_tx: UnboundedSender<InspectorInfo>,
shutdown_server_tx: Option<oneshot::Sender<()>>,
shutdown_server_tx: Option<broadcast::Sender<()>>,
thread_handle: Option<thread::JoinHandle<()>>,
}
@ -47,7 +49,7 @@ impl InspectorServer {
let (register_inspector_tx, register_inspector_rx) =
mpsc::unbounded::<InspectorInfo>();
let (shutdown_server_tx, shutdown_server_rx) = oneshot::channel();
let (shutdown_server_tx, shutdown_server_rx) = broadcast::channel(1);
let thread_handle = thread::spawn(move || {
let rt = crate::tokio_util::create_basic_runtime();
@ -101,26 +103,12 @@ impl Drop for InspectorServer {
}
}
// Needed so hyper can use non Send futures
#[derive(Clone)]
struct LocalExecutor;
impl<Fut> hyper::rt::Executor<Fut> for LocalExecutor
where
Fut: Future + 'static,
Fut::Output: 'static,
{
fn execute(&self, fut: Fut) {
deno_core::unsync::spawn(fut);
}
}
fn handle_ws_request(
req: http::Request<hyper::Body>,
req: http_1::Request<hyper1::body::Incoming>,
inspector_map_rc: Rc<RefCell<HashMap<Uuid, InspectorInfo>>>,
) -> http::Result<http::Response<hyper::Body>> {
) -> http_1::Result<http_1::Response<Box<http_body_util::Full<Bytes>>>> {
let (parts, body) = req.into_parts();
let req = http::Request::from_parts(parts, ());
let req = http_1::Request::from_parts(parts, ());
let maybe_uuid = req
.uri()
@ -129,9 +117,9 @@ fn handle_ws_request(
.and_then(|s| Uuid::parse_str(s).ok());
if maybe_uuid.is_none() {
return http::Response::builder()
.status(http::StatusCode::BAD_REQUEST)
.body("Malformed inspector UUID".into());
return http_1::Response::builder()
.status(http_1::StatusCode::BAD_REQUEST)
.body(Box::new(Bytes::from("Malformed inspector UUID").into()));
}
// run in a block to not hold borrow to `inspector_map` for too long
@ -140,34 +128,47 @@ fn handle_ws_request(
let maybe_inspector_info = inspector_map.get(&maybe_uuid.unwrap());
if maybe_inspector_info.is_none() {
return http::Response::builder()
.status(http::StatusCode::NOT_FOUND)
.body("Invalid inspector UUID".into());
return http_1::Response::builder()
.status(http_1::StatusCode::NOT_FOUND)
.body(Box::new(Bytes::from("Invalid inspector UUID").into()));
}
let info = maybe_inspector_info.unwrap();
info.new_session_tx.clone()
};
let (parts, _) = req.into_parts();
let mut req = http::Request::from_parts(parts, body);
let mut req = http_1::Request::from_parts(parts, body);
let (resp, fut) = match fastwebsockets::upgrade::upgrade(&mut req) {
Ok(e) => e,
let (resp, fut) = match fastwebsockets_06::upgrade::upgrade(&mut req) {
Ok((resp, fut)) => {
let (parts, _body) = resp.into_parts();
let resp = http_1::Response::from_parts(
parts,
Box::new(http_body_util::Full::new(Bytes::new())),
);
(resp, fut)
}
_ => {
return http::Response::builder()
.status(http::StatusCode::BAD_REQUEST)
.body("Not a valid Websocket Request".into());
return http_1::Response::builder()
.status(http_1::StatusCode::BAD_REQUEST)
.body(Box::new(
Bytes::from("Not a valid Websocket Request").into(),
));
}
};
// spawn a task that will wait for websocket connection and then pump messages between
// the socket and inspector proxy
spawn(async move {
let websocket = if let Ok(w) = fut.await {
w
} else {
eprintln!("Inspector server failed to upgrade to WS connection");
return;
let websocket = match fut.await {
Ok(w) => w,
Err(err) => {
eprintln!(
"Inspector server failed to upgrade to WS connection: {:?}",
err
);
return;
}
};
// The 'outbound' channel carries messages sent to the websocket.
@ -191,31 +192,37 @@ fn handle_ws_request(
fn handle_json_request(
inspector_map: Rc<RefCell<HashMap<Uuid, InspectorInfo>>>,
host: Option<String>,
) -> http::Result<http::Response<hyper::Body>> {
) -> http_1::Result<http_1::Response<Box<http_body_util::Full<Bytes>>>> {
let data = inspector_map
.borrow()
.values()
.map(move |info| info.get_json_metadata(&host))
.collect::<Vec<_>>();
http::Response::builder()
.status(http::StatusCode::OK)
.header(http::header::CONTENT_TYPE, "application/json")
.body(serde_json::to_string(&data).unwrap().into())
let body: http_body_util::Full<Bytes> =
Bytes::from(serde_json::to_string(&data).unwrap()).into();
http_1::Response::builder()
.status(http_1::StatusCode::OK)
.header(http_1::header::CONTENT_TYPE, "application/json")
.body(Box::new(body))
}
fn handle_json_version_request(
version_response: Value,
) -> http::Result<http::Response<hyper::Body>> {
http::Response::builder()
.status(http::StatusCode::OK)
.header(http::header::CONTENT_TYPE, "application/json")
.body(serde_json::to_string(&version_response).unwrap().into())
) -> http_1::Result<http_1::Response<Box<http_body_util::Full<Bytes>>>> {
let body = Box::new(http_body_util::Full::from(
serde_json::to_string(&version_response).unwrap(),
));
http_1::Response::builder()
.status(http_1::StatusCode::OK)
.header(http_1::header::CONTENT_TYPE, "application/json")
.body(body)
}
async fn server(
host: SocketAddr,
register_inspector_rx: UnboundedReceiver<InspectorInfo>,
shutdown_server_rx: oneshot::Receiver<()>,
shutdown_server_rx: broadcast::Receiver<()>,
name: &str,
) {
let inspector_map_ =
@ -253,61 +260,99 @@ async fn server(
"V8-Version": deno_core::v8_version(),
});
let make_svc = hyper::service::make_service_fn(|_| {
let inspector_map = Rc::clone(&inspector_map_);
let json_version_response = json_version_response.clone();
future::ok::<_, Infallible>(hyper::service::service_fn(
move |req: http::Request<hyper::Body>| {
future::ready({
// If the host header can make a valid URL, use it
let host = req
.headers()
.get("host")
.and_then(|host| host.to_str().ok())
.and_then(|host| Url::parse(&format!("http://{host}")).ok())
.and_then(|url| match (url.host(), url.port()) {
(Some(host), Some(port)) => Some(format!("{host}:{port}")),
(Some(host), None) => Some(format!("{host}")),
_ => None,
});
match (req.method(), req.uri().path()) {
(&http::Method::GET, path) if path.starts_with("/ws/") => {
handle_ws_request(req, Rc::clone(&inspector_map))
}
(&http::Method::GET, "/json/version") => {
handle_json_version_request(json_version_response.clone())
}
(&http::Method::GET, "/json") => {
handle_json_request(Rc::clone(&inspector_map), host)
}
(&http::Method::GET, "/json/list") => {
handle_json_request(Rc::clone(&inspector_map), host)
}
_ => http::Response::builder()
.status(http::StatusCode::NOT_FOUND)
.body("Not Found".into()),
}
})
},
))
});
// Create the server manually so it can use the Local Executor
let mut server_handler = pin!(hyper::server::Builder::new(
hyper::server::conn::AddrIncoming::bind(&host).unwrap_or_else(|e| {
eprintln!("Cannot start inspector server: {e}.");
process::exit(1);
}),
hyper::server::conn::Http::new().with_executor(LocalExecutor),
)
.serve(make_svc)
.with_graceful_shutdown(async {
shutdown_server_rx.await.ok();
})
.unwrap_or_else(|err| {
eprintln!("Cannot start inspector server: {err}.");
process::exit(1);
let listener = match TcpListener::bind(&host).await {
Ok(l) => l,
Err(err) => {
eprintln!("Cannot start inspector server: {:?}", err);
return;
}
};
let mut server_handler = pin!(deno_core::unsync::spawn(async move {
loop {
let mut rx = shutdown_server_rx.resubscribe();
let mut shutdown_rx = pin!(rx.recv());
let mut accept = pin!(listener.accept());
let stream = tokio::select! {
accept_result = &mut accept => {
match accept_result {
Ok((s, _)) => s,
Err(err) => {
eprintln!("Failed to accept inspector connection: {:?}", err);
continue;
}
}
},
_ = &mut shutdown_rx => {
break;
}
};
let io = TokioIo::new(stream);
let inspector_map = Rc::clone(&inspector_map_);
let json_version_response = json_version_response.clone();
let mut shutdown_server_rx = shutdown_server_rx.resubscribe();
let service = hyper1::service::service_fn(
move |req: http_1::Request<hyper1::body::Incoming>| {
future::ready({
// If the host header can make a valid URL, use it
let host = req
.headers()
.get("host")
.and_then(|host| host.to_str().ok())
.and_then(|host| Url::parse(&format!("http://{host}")).ok())
.and_then(|url| match (url.host(), url.port()) {
(Some(host), Some(port)) => Some(format!("{host}:{port}")),
(Some(host), None) => Some(format!("{host}")),
_ => None,
});
match (req.method(), req.uri().path()) {
(&http_1::Method::GET, path) if path.starts_with("/ws/") => {
handle_ws_request(req, Rc::clone(&inspector_map))
}
(&http_1::Method::GET, "/json/version") => {
handle_json_version_request(json_version_response.clone())
}
(&http_1::Method::GET, "/json") => {
handle_json_request(Rc::clone(&inspector_map), host)
}
(&http_1::Method::GET, "/json/list") => {
handle_json_request(Rc::clone(&inspector_map), host)
}
_ => http_1::Response::builder()
.status(http_1::StatusCode::NOT_FOUND)
.body(Box::new(http_body_util::Full::new(Bytes::from(
"Not Found",
)))),
}
})
},
);
deno_core::unsync::spawn(async move {
let server = hyper1::server::conn::http1::Builder::new();
let mut conn =
pin!(server.serve_connection(io, service).with_upgrades());
let mut shutdown_rx = pin!(shutdown_server_rx.recv());
tokio::select! {
result = conn.as_mut() => {
if let Err(err) = result {
eprintln!("Failed to serve connection: {:?}", err);
}
},
_ = &mut shutdown_rx => {
conn.as_mut().graceful_shutdown();
let _ = conn.await;
}
}
});
}
})
.fuse());
@ -331,7 +376,7 @@ async fn server(
/// 'futures' crate, therefore they can't participate in Tokio's cooperative
/// task yielding.
async fn pump_websocket_messages(
mut websocket: WebSocket<hyper::upgrade::Upgraded>,
mut websocket: WebSocket<TokioIo<hyper1::upgrade::Upgraded>>,
inbound_tx: UnboundedSender<String>,
mut outbound_rx: UnboundedReceiver<InspectorMsg>,
) {