From 068228cb454d14a6f5943061a5a6569b9e395e23 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20Iwa=C5=84czuk?= Date: Sat, 22 Apr 2023 11:17:31 +0200 Subject: [PATCH] refactor: rewrite tests to "fastwebsockets" crate (#18781) Migrating off of `tokio-tungstenite` crate. --------- Co-authored-by: Divy Srivastava --- Cargo.lock | 8 +- Cargo.toml | 1 + cli/Cargo.toml | 2 + cli/tests/integration/inspector_tests.rs | 90 ++++++++---- cli/tests/testdata/run/websocket_test.ts | 2 +- ext/websocket/Cargo.toml | 2 +- runtime/Cargo.toml | 1 + runtime/inspector_server.rs | 96 ++++++------- test_util/Cargo.toml | 3 +- test_util/src/lib.rs | 169 +++++++++++++++-------- 10 files changed, 229 insertions(+), 145 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ceeb2cf2a0..114a6e0e80 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -721,12 +721,14 @@ dependencies = [ "env_logger", "eszip", "fancy-regex", + "fastwebsockets", "flaky_test", "flate2", "fs3", "fwdansi", "glibc_version", "http", + "hyper", "import_map 0.15.0", "indexmap", "jsonc-parser", @@ -1235,6 +1237,7 @@ dependencies = [ "deno_webstorage", "dlopen", "encoding_rs", + "fastwebsockets", "filetime", "fs3", "fwdansi", @@ -1791,9 +1794,9 @@ dependencies = [ [[package]] name = "fastwebsockets" -version = "0.2.4" +version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fcf2f933f24f45831bd66580a8f9394e440f1f5a23806cf0d4d8b6649e1a01e9" +checksum = "a9e973e2bd2dbd77cc9e929ede2ce65984a35ac5481976afbfbd509cb40dc965" dependencies = [ "base64 0.21.0", "cc", @@ -4864,6 +4867,7 @@ dependencies = [ "atty", "base64 0.13.1", "console_static_text", + "fastwebsockets", "flate2", "futures", "hyper", diff --git a/Cargo.toml b/Cargo.toml index 6b49de2311..aa12e16295 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -91,6 +91,7 @@ data-url = "=0.2.0" dlopen = "0.1.8" encoding_rs = "=0.8.31" ecb = "=0.1.1" +fastwebsockets = "=0.2.5" flate2 = "=1.0.24" fs3 = "0.5.0" futures = "0.3.21" diff --git a/cli/Cargo.toml b/cli/Cargo.toml index 96fe458ae2..ebd8583304 100644 --- a/cli/Cargo.toml +++ b/cli/Cargo.toml @@ -72,9 +72,11 @@ encoding_rs.workspace = true env_logger = "=0.9.0" eszip = "=0.41.0" fancy-regex = "=0.10.0" +fastwebsockets.workspace = true flate2.workspace = true fs3.workspace = true http.workspace = true +hyper.workspace = true import_map = "=0.15.0" indexmap.workspace = true jsonc-parser = { version = "=0.21.0", features = ["serde"] } diff --git a/cli/tests/integration/inspector_tests.rs b/cli/tests/integration/inspector_tests.rs index 067963786d..35ff014030 100644 --- a/cli/tests/integration/inspector_tests.rs +++ b/cli/tests/integration/inspector_tests.rs @@ -1,15 +1,16 @@ // Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. +use deno_core::anyhow::anyhow; use deno_core::error::AnyError; -use deno_core::futures::prelude::*; -use deno_core::futures::stream::SplitSink; -use deno_core::futures::stream::SplitStream; use deno_core::serde_json; use deno_core::serde_json::json; use deno_core::url; use deno_runtime::deno_fetch::reqwest; use deno_runtime::deno_websocket::tokio_tungstenite; -use deno_runtime::deno_websocket::tokio_tungstenite::tungstenite; +use fastwebsockets::FragmentCollector; +use fastwebsockets::Frame; +use hyper::upgrade::Upgraded; +use hyper::Request; use std::io::BufRead; use test_util as util; use test_util::TempDir; @@ -17,18 +18,20 @@ use tokio::net::TcpStream; use util::http_server; use util::DenoChild; +struct SpawnExecutor; + +impl hyper::rt::Executor for SpawnExecutor +where + Fut: std::future::Future + Send + 'static, + Fut::Output: Send + 'static, +{ + fn execute(&self, fut: Fut) { + tokio::task::spawn(fut); + } +} + struct InspectorTester { - socket_tx: SplitSink< - tokio_tungstenite::WebSocketStream< - tokio_tungstenite::MaybeTlsStream, - >, - tungstenite::Message, - >, - socket_rx: SplitStream< - tokio_tungstenite::WebSocketStream< - tokio_tungstenite::MaybeTlsStream, - >, - >, + socket: FragmentCollector, notification_filter: Box bool + 'static>, child: DenoChild, stderr_lines: Box>, @@ -52,17 +55,42 @@ impl InspectorTester { let mut stderr_lines = std::io::BufReader::new(stderr).lines().map(|r| r.unwrap()); - let ws_url = extract_ws_url_from_stderr(&mut stderr_lines); + let uri = extract_ws_url_from_stderr(&mut stderr_lines); + + let domain = &uri.host().unwrap().to_string(); + let port = &uri.port().unwrap_or(match uri.scheme() { + "wss" | "https" => 443, + _ => 80, + }); + let addr = format!("{domain}:{port}"); + + let stream = TcpStream::connect(addr).await.unwrap(); + + let host = uri.host_str().unwrap(); + + let req = Request::builder() + .method("GET") + .uri(uri.path()) + .header("Host", host) + .header(hyper::header::UPGRADE, "websocket") + .header(hyper::header::CONNECTION, "Upgrade") + .header( + "Sec-WebSocket-Key", + fastwebsockets::handshake::generate_key(), + ) + .header("Sec-WebSocket-Version", "13") + .body(hyper::Body::empty()) + .unwrap(); let (socket, response) = - tokio_tungstenite::connect_async(ws_url).await.unwrap(); + fastwebsockets::handshake::client(&SpawnExecutor, req, stream) + .await + .unwrap(); + assert_eq!(response.status(), 101); // Switching protocols. - let (socket_tx, socket_rx) = socket.split(); - Self { - socket_tx, - socket_rx, + socket: FragmentCollector::new(socket), notification_filter: Box::new(notification_filter), child, stderr_lines: Box::new(stderr_lines), @@ -74,10 +102,10 @@ impl InspectorTester { // TODO(bartlomieju): add graceful error handling for msg in messages { let result = self - .socket_tx - .send(msg.to_string().into()) + .socket + .write_frame(Frame::text(msg.to_string().into_bytes())) .await - .map_err(|e| e.into()); + .map_err(|e| anyhow!(e)); self.handle_error(result); } } @@ -111,8 +139,9 @@ impl InspectorTester { async fn recv(&mut self) -> String { loop { - let result = self.socket_rx.next().await.unwrap().map_err(|e| e.into()); - let message = self.handle_error(result).to_string(); + let result = self.socket.read_frame().await.map_err(|e| anyhow!(e)); + let message = + String::from_utf8(self.handle_error(result).payload).unwrap(); if (self.notification_filter)(&message) { return message; } @@ -236,7 +265,7 @@ fn skip_check_line( let mut line = stderr_lines.next().unwrap(); line = util::strip_ansi_codes(&line).to_string(); - if line.starts_with("Check") { + if line.starts_with("Check") || line.starts_with("Download") { continue; } @@ -514,8 +543,11 @@ async fn inspector_does_not_hang() { } // Check that we can gracefully close the websocket connection. - tester.socket_tx.close().await.unwrap(); - tester.socket_rx.for_each(|_| async {}).await; + tester + .socket + .write_frame(Frame::close_raw(vec![])) + .await + .unwrap(); assert_eq!(&tester.stdout_lines.next().unwrap(), "done"); assert!(tester.child.wait().unwrap().success()); diff --git a/cli/tests/testdata/run/websocket_test.ts b/cli/tests/testdata/run/websocket_test.ts index a9dc34ad1d..27bc5adf92 100644 --- a/cli/tests/testdata/run/websocket_test.ts +++ b/cli/tests/testdata/run/websocket_test.ts @@ -161,7 +161,7 @@ Deno.test("websocket error", async () => { assert(err instanceof ErrorEvent); // Error message got changed because we don't use warp in test_util - assertEquals(err.message, "UnexpectedEof: tls handshake eof"); + assertEquals(err.message, "InvalidData: received corrupt message"); promise1.resolve(); }; await promise1; diff --git a/ext/websocket/Cargo.toml b/ext/websocket/Cargo.toml index a96b6cceb9..53e184e1e2 100644 --- a/ext/websocket/Cargo.toml +++ b/ext/websocket/Cargo.toml @@ -16,7 +16,7 @@ path = "lib.rs" [dependencies] deno_core.workspace = true deno_tls.workspace = true -fastwebsockets = { version = "0.2.4", features = ["upgrade"] } +fastwebsockets = { workspace = true, features = ["upgrade"] } http.workspace = true hyper.workspace = true serde.workspace = true diff --git a/runtime/Cargo.toml b/runtime/Cargo.toml index 58f292e8f9..20cbda0bfb 100644 --- a/runtime/Cargo.toml +++ b/runtime/Cargo.toml @@ -80,6 +80,7 @@ deno_web.workspace = true deno_webidl.workspace = true deno_websocket.workspace = true deno_webstorage.workspace = true +fastwebsockets.workspace = true atty.workspace = true console_static_text.workspace = true diff --git a/runtime/inspector_server.rs b/runtime/inspector_server.rs index d65e813cb6..25d0d796c1 100644 --- a/runtime/inspector_server.rs +++ b/runtime/inspector_server.rs @@ -1,7 +1,7 @@ // Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. -use core::convert::Infallible as Never; // Alias for the future `!` type. -use deno_core::error::AnyError; +// Alias for the future `!` type. +use core::convert::Infallible as Never; use deno_core::futures::channel::mpsc; use deno_core::futures::channel::mpsc::UnboundedReceiver; use deno_core::futures::channel::mpsc::UnboundedSender; @@ -18,8 +18,9 @@ use deno_core::serde_json::Value; use deno_core::InspectorMsg; use deno_core::InspectorSessionProxy; use deno_core::JsRuntime; -use deno_websocket::tokio_tungstenite::tungstenite; -use deno_websocket::tokio_tungstenite::WebSocketStream; +use fastwebsockets::Frame; +use fastwebsockets::OpCode; +use fastwebsockets::WebSocket; use std::cell::RefCell; use std::collections::HashMap; use std::convert::Infallible; @@ -145,35 +146,27 @@ fn handle_ws_request( let info = maybe_inspector_info.unwrap(); info.new_session_tx.clone() }; - - let resp = tungstenite::handshake::server::create_response(&req) - .map(|resp| resp.map(|_| hyper::Body::empty())) - .or_else(|e| match e { - tungstenite::error::Error::HttpFormat(http_error) => Err(http_error), - _ => http::Response::builder() - .status(http::StatusCode::BAD_REQUEST) - .body("Not a valid Websocket Request".into()), - })?; - let (parts, _) = req.into_parts(); - let req = http::Request::from_parts(parts, body); + let mut req = http::Request::from_parts(parts, body); + + let (resp, fut) = match fastwebsockets::upgrade::upgrade(&mut req) { + Ok(e) => e, + _ => { + return http::Response::builder() + .status(http::StatusCode::BAD_REQUEST) + .body("Not a valid Websocket Request".into()); + } + }; // spawn a task that will wait for websocket connection and then pump messages between // the socket and inspector proxy tokio::task::spawn_local(async move { - let upgrade_result = hyper::upgrade::on(req).await; - let upgraded = if let Ok(u) = upgrade_result { - u + let websocket = if let Ok(w) = fut.await { + w } else { eprintln!("Inspector server failed to upgrade to WS connection"); return; }; - let websocket = WebSocketStream::from_raw_socket( - upgraded, - tungstenite::protocol::Role::Server, - None, - ) - .await; // The 'outbound' channel carries messages sent to the websocket. let (outbound_tx, outbound_rx) = mpsc::unbounded(); @@ -324,37 +317,36 @@ async fn server( /// 'futures' crate, therefore they can't participate in Tokio's cooperative /// task yielding. async fn pump_websocket_messages( - websocket: WebSocketStream, + mut websocket: WebSocket, inbound_tx: UnboundedSender, - outbound_rx: UnboundedReceiver, + mut outbound_rx: UnboundedReceiver, ) { - let (websocket_tx, websocket_rx) = websocket.split(); - - let outbound_pump = outbound_rx - .map(|msg| tungstenite::Message::text(msg.content)) - .map(Ok) - .forward(websocket_tx) - .map_err(|_| ()); - - let inbound_pump = async move { - let _result = websocket_rx - .map_err(AnyError::from) - .map_ok(|msg| { - // Messages that cannot be converted to strings are ignored. - if let Ok(msg_text) = msg.into_text() { - let _ = inbound_tx.unbounded_send(msg_text); + 'pump: loop { + tokio::select! { + Some(msg) = outbound_rx.next() => { + let msg = Frame::text(msg.content.into_bytes()); + let _ = websocket.write_frame(msg).await; } - }) - .try_collect::<()>() - .await; - - // Users don't care if there was an error coming from debugger, - // just about the fact that debugger did disconnect. - eprintln!("Debugger session ended"); - - Ok(()) - }; - let _ = future::try_join(outbound_pump, inbound_pump).await; + Ok(msg) = websocket.read_frame() => { + match msg.opcode { + OpCode::Text => { + if let Ok(s) = String::from_utf8(msg.payload) { + let _ = inbound_tx.unbounded_send(s); + } + } + OpCode::Close => { + // Users don't care if there was an error coming from debugger, + // just about the fact that debugger did disconnect. + eprintln!("Debugger session ended"); + break 'pump; + } + _ => { + // Ignore other messages. + } + } + } + } + } } /// Inspector information that is sent from the isolate thread to the server diff --git a/test_util/Cargo.toml b/test_util/Cargo.toml index cb1ea46cc2..5934913112 100644 --- a/test_util/Cargo.toml +++ b/test_util/Cargo.toml @@ -19,6 +19,7 @@ async-stream = "0.3.3" atty.workspace = true base64.workspace = true console_static_text.workspace = true +fastwebsockets = { workspace = true, features = ["upgrade"] } flate2.workspace = true futures.workspace = true hyper = { workspace = true, features = ["server", "http1", "http2", "runtime"] } @@ -40,7 +41,7 @@ tar.workspace = true tempfile.workspace = true tokio.workspace = true tokio-rustls.workspace = true -tokio-tungstenite.workspace = true +tokio-tungstenite = { workspace = true, features = ["rustls-tls-webpki-roots"] } url.workspace = true [target.'cfg(unix)'.dependencies] diff --git a/test_util/src/lib.rs b/test_util/src/lib.rs index 6a6614ad0a..e647c0a4cb 100644 --- a/test_util/src/lib.rs +++ b/test_util/src/lib.rs @@ -2,6 +2,7 @@ // Usage: provide a port as argument to run hyper_hello benchmark server // otherwise this starts multiple servers on many ports for test endpoints. use anyhow::anyhow; +use futures::Future; use futures::FutureExt; use futures::Stream; use futures::StreamExt; @@ -9,6 +10,7 @@ use hyper::header::HeaderValue; use hyper::server::Server; use hyper::service::make_service_fn; use hyper::service::service_fn; +use hyper::upgrade::Upgraded; use hyper::Body; use hyper::Request; use hyper::Response; @@ -49,7 +51,6 @@ use tokio::net::TcpListener; use tokio::net::TcpStream; use tokio_rustls::rustls; use tokio_rustls::TlsAcceptor; -use tokio_tungstenite::accept_async; use url::Url; pub mod assertions; @@ -302,69 +303,128 @@ async fn basic_auth_redirect( Ok(resp) } +async fn echo_websocket_handler( + ws: fastwebsockets::WebSocket, +) -> Result<(), anyhow::Error> { + let mut ws = fastwebsockets::FragmentCollector::new(ws); + + loop { + let frame = ws.read_frame().await.unwrap(); + match frame.opcode { + fastwebsockets::OpCode::Close => break, + fastwebsockets::OpCode::Text | fastwebsockets::OpCode::Binary => { + ws.write_frame(frame).await.unwrap(); + } + _ => {} + } + } + + Ok(()) +} + +type WsHandler = + fn( + fastwebsockets::WebSocket, + ) -> Pin> + Send>>; + +fn spawn_ws_server(stream: S, handler: WsHandler) +where + S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + Send + 'static, +{ + let srv_fn = service_fn(move |mut req: Request| async move { + let (response, upgrade_fut) = fastwebsockets::upgrade::upgrade(&mut req) + .map_err(|e| anyhow!("Error upgrading websocket connection: {}", e))?; + + tokio::spawn(async move { + let ws = upgrade_fut + .await + .map_err(|e| anyhow!("Error upgrading websocket connection: {}", e)) + .unwrap(); + + if let Err(e) = handler(ws).await { + eprintln!("Error in websocket connection: {}", e); + } + }); + + Ok::<_, anyhow::Error>(response) + }); + + tokio::spawn(async move { + let conn_fut = hyper::server::conn::Http::new() + .serve_connection(stream, srv_fn) + .with_upgrades(); + + if let Err(e) = conn_fut.await { + eprintln!("websocket server error: {e:?}"); + } + }); +} + async fn run_ws_server(addr: &SocketAddr) { let listener = TcpListener::bind(addr).await.unwrap(); println!("ready: ws"); // Eye catcher for HttpServerCount while let Ok((stream, _addr)) = listener.accept().await { - tokio::spawn(async move { - let ws_stream_fut = accept_async(stream); - - let ws_stream = ws_stream_fut.await; - if let Ok(ws_stream) = ws_stream { - let (tx, rx) = ws_stream.split(); - rx.forward(tx) - .map(|result| { - if let Err(e) = result { - println!("websocket server error: {e:?}"); - } - }) - .await; - } - }); + spawn_ws_server(stream, |ws| Box::pin(echo_websocket_handler(ws))); } } +async fn ping_websocket_handler( + ws: fastwebsockets::WebSocket, +) -> Result<(), anyhow::Error> { + use fastwebsockets::Frame; + use fastwebsockets::OpCode; + + let mut ws = fastwebsockets::FragmentCollector::new(ws); + + for i in 0..9 { + ws.write_frame(Frame::new(true, OpCode::Ping, None, vec![])) + .await + .unwrap(); + + let frame = ws.read_frame().await.unwrap(); + assert_eq!(frame.opcode, OpCode::Pong); + assert!(frame.payload.is_empty()); + + ws.write_frame(Frame::text(format!("hello {}", i).as_bytes().to_vec())) + .await + .unwrap(); + + let frame = ws.read_frame().await.unwrap(); + assert_eq!(frame.opcode, OpCode::Text); + assert_eq!(frame.payload, format!("hello {}", i).as_bytes()); + } + + ws.write_frame(fastwebsockets::Frame::close(1000, b"")) + .await + .unwrap(); + + Ok(()) +} + async fn run_ws_ping_server(addr: &SocketAddr) { let listener = TcpListener::bind(addr).await.unwrap(); println!("ready: ws"); // Eye catcher for HttpServerCount while let Ok((stream, _addr)) = listener.accept().await { - tokio::spawn(async move { - let ws_stream = accept_async(stream).await; - use futures::SinkExt; - use tokio_tungstenite::tungstenite::Message; - if let Ok(mut ws_stream) = ws_stream { - for i in 0..9 { - ws_stream.send(Message::Ping(vec![])).await.unwrap(); - - let msg = ws_stream.next().await.unwrap().unwrap(); - assert_eq!(msg, Message::Pong(vec![])); - - ws_stream - .send(Message::Text(format!("hello {}", i))) - .await - .unwrap(); - - let msg = ws_stream.next().await.unwrap().unwrap(); - assert_eq!(msg, Message::Text(format!("hello {}", i))); - } - - ws_stream.close(None).await.unwrap(); - } - }); + spawn_ws_server(stream, |ws| Box::pin(ping_websocket_handler(ws))); } } +async fn close_websocket_handler( + ws: fastwebsockets::WebSocket, +) -> Result<(), anyhow::Error> { + let mut ws = fastwebsockets::FragmentCollector::new(ws); + + ws.write_frame(fastwebsockets::Frame::close_raw(vec![])) + .await + .unwrap(); + + Ok(()) +} + async fn run_ws_close_server(addr: &SocketAddr) { let listener = TcpListener::bind(addr).await.unwrap(); while let Ok((stream, _addr)) = listener.accept().await { - tokio::spawn(async move { - let ws_stream_fut = accept_async(stream); - - let ws_stream = ws_stream_fut.await; - if let Ok(mut ws_stream) = ws_stream { - ws_stream.close(None).await.unwrap(); - } - }); + spawn_ws_server(stream, |ws| Box::pin(close_websocket_handler(ws))); } } @@ -471,18 +531,9 @@ async fn run_wss_server(addr: &SocketAddr) { tokio::spawn(async move { match acceptor.accept(stream).await { Ok(tls_stream) => { - let ws_stream_fut = accept_async(tls_stream); - let ws_stream = ws_stream_fut.await; - if let Ok(ws_stream) = ws_stream { - let (tx, rx) = ws_stream.split(); - rx.forward(tx) - .map(|result| { - if let Err(e) = result { - println!("Websocket server error: {e:?}"); - } - }) - .await; - } + spawn_ws_server(tls_stream, |ws| { + Box::pin(echo_websocket_handler(ws)) + }); } Err(e) => { eprintln!("TLS accept error: {e:?}");