From 5a524a9a5a7fac0d16b2cbe2df1142dc419df7fc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20Iwa=C5=84czuk?= Date: Sun, 23 Apr 2023 22:55:45 +0200 Subject: [PATCH] refactor: rewrite client side tests to 'fastwebsockets' crate (#18800) Follow up to https://github.com/denoland/deno/pull/18781. --- cli/tests/integration/inspector_tests.rs | 71 +++++------ cli/tests/integration/run_tests.rs | 147 +++++++++++++++++------ 2 files changed, 144 insertions(+), 74 deletions(-) diff --git a/cli/tests/integration/inspector_tests.rs b/cli/tests/integration/inspector_tests.rs index 35ff014030..cf66c4adc1 100644 --- a/cli/tests/integration/inspector_tests.rs +++ b/cli/tests/integration/inspector_tests.rs @@ -6,15 +6,18 @@ use deno_core::serde_json; use deno_core::serde_json::json; use deno_core::url; use deno_runtime::deno_fetch::reqwest; -use deno_runtime::deno_websocket::tokio_tungstenite; use fastwebsockets::FragmentCollector; use fastwebsockets::Frame; +use fastwebsockets::WebSocket; use hyper::upgrade::Upgraded; +use hyper::Body; use hyper::Request; +use hyper::Response; use std::io::BufRead; use test_util as util; use test_util::TempDir; use tokio::net::TcpStream; +use url::Url; use util::http_server; use util::DenoChild; @@ -30,6 +33,37 @@ where } } +async fn connect_to_ws(uri: Url) -> (WebSocket, Response) { + let domain = &uri.host().unwrap().to_string(); + let port = &uri.port().unwrap_or(match uri.scheme() { + "wss" | "https" => 443, + _ => 80, + }); + let addr = format!("{domain}:{port}"); + + let stream = TcpStream::connect(addr).await.unwrap(); + + let host = uri.host_str().unwrap(); + + let req = Request::builder() + .method("GET") + .uri(uri.path()) + .header("Host", host) + .header(hyper::header::UPGRADE, "websocket") + .header(hyper::header::CONNECTION, "Upgrade") + .header( + "Sec-WebSocket-Key", + fastwebsockets::handshake::generate_key(), + ) + .header("Sec-WebSocket-Version", "13") + .body(hyper::Body::empty()) + .unwrap(); + + fastwebsockets::handshake::client(&SpawnExecutor, req, stream) + .await + .unwrap() +} + struct InspectorTester { socket: FragmentCollector, notification_filter: Box bool + 'static>, @@ -57,35 +91,7 @@ impl InspectorTester { let uri = extract_ws_url_from_stderr(&mut stderr_lines); - let domain = &uri.host().unwrap().to_string(); - let port = &uri.port().unwrap_or(match uri.scheme() { - "wss" | "https" => 443, - _ => 80, - }); - let addr = format!("{domain}:{port}"); - - let stream = TcpStream::connect(addr).await.unwrap(); - - let host = uri.host_str().unwrap(); - - let req = Request::builder() - .method("GET") - .uri(uri.path()) - .header("Host", host) - .header(hyper::header::UPGRADE, "websocket") - .header(hyper::header::CONNECTION, "Upgrade") - .header( - "Sec-WebSocket-Key", - fastwebsockets::handshake::generate_key(), - ) - .header("Sec-WebSocket-Version", "13") - .body(hyper::Body::empty()) - .unwrap(); - - let (socket, response) = - fastwebsockets::handshake::client(&SpawnExecutor, req, stream) - .await - .unwrap(); + let (socket, response) = connect_to_ws(uri).await; assert_eq!(response.status(), 101); // Switching protocols. @@ -289,10 +295,7 @@ async fn inspector_connect() { std::io::BufReader::new(stderr).lines().map(|r| r.unwrap()); let ws_url = extract_ws_url_from_stderr(&mut stderr_lines); - // We use tokio_tungstenite as a websocket client because warp (which is - // a dependency of Deno) uses it. - let (_socket, response) = - tokio_tungstenite::connect_async(ws_url).await.unwrap(); + let (_socket, response) = connect_to_ws(ws_url).await; assert_eq!("101 Switching Protocols", response.status().to_string()); child.kill().unwrap(); child.wait().unwrap(); diff --git a/cli/tests/integration/run_tests.rs b/cli/tests/integration/run_tests.rs index b5bb04fdd9..aba6283d12 100644 --- a/cli/tests/integration/run_tests.rs +++ b/cli/tests/integration/run_tests.rs @@ -4088,14 +4088,46 @@ fn websocketstream() { assert!(status.success()); } -#[test] -fn websocketstream_ping() { - use deno_runtime::deno_websocket::tokio_tungstenite::tungstenite; +#[tokio::test(flavor = "multi_thread")] +async fn websocketstream_ping() { let _g = util::http_server(); let script = util::testdata_path().join("run/websocketstream_ping_test.ts"); let root_ca = util::testdata_path().join("tls/RootCA.pem"); - let mut child = util::deno_cmd() + + let srv_fn = hyper::service::service_fn(|mut req| async move { + let (response, upgrade_fut) = + fastwebsockets::upgrade::upgrade(&mut req).unwrap(); + tokio::spawn(async move { + let mut ws = upgrade_fut.await.unwrap(); + + ws.write_frame(fastwebsockets::Frame::text("A".as_bytes().to_vec())) + .await + .unwrap(); + ws.write_frame(fastwebsockets::Frame::new( + true, + fastwebsockets::OpCode::Ping, + None, + vec![], + )) + .await + .unwrap(); + ws.write_frame(fastwebsockets::Frame::text("B".as_bytes().to_vec())) + .await + .unwrap(); + let message = ws.read_frame().await.unwrap(); + assert_eq!(message.opcode, fastwebsockets::OpCode::Pong); + ws.write_frame(fastwebsockets::Frame::text("C".as_bytes().to_vec())) + .await + .unwrap(); + ws.write_frame(fastwebsockets::Frame::close_raw(vec![])) + .await + .unwrap(); + }); + Ok::<_, std::convert::Infallible>(response) + }); + + let child = util::deno_cmd() .arg("test") .arg("--unstable") .arg("--allow-net") @@ -4105,31 +4137,38 @@ fn websocketstream_ping() { .stdout(std::process::Stdio::piped()) .spawn() .unwrap(); + let server = tokio::net::TcpListener::bind("127.0.0.1:4513") + .await + .unwrap(); + tokio::spawn(async move { + let (stream, _) = server.accept().await.unwrap(); + let conn_fut = hyper::server::conn::Http::new() + .serve_connection(stream, srv_fn) + .with_upgrades(); - let server = std::net::TcpListener::bind("127.0.0.1:4513").unwrap(); - let (stream, _) = server.accept().unwrap(); - let mut socket = tungstenite::accept(stream).unwrap(); - socket - .write_message(tungstenite::Message::Text(String::from("A"))) - .unwrap(); - socket - .write_message(tungstenite::Message::Ping(vec![])) - .unwrap(); - socket - .write_message(tungstenite::Message::Text(String::from("B"))) - .unwrap(); - let message = socket.read_message().unwrap(); - assert_eq!(message, tungstenite::Message::Pong(vec![])); - socket - .write_message(tungstenite::Message::Text(String::from("C"))) - .unwrap(); - socket.close(None).unwrap(); + if let Err(e) = conn_fut.await { + eprintln!("websocket server error: {e:?}"); + } + }); - assert!(child.wait().unwrap().success()); + let r = child.wait_with_output().unwrap(); + assert!(r.status.success()); } -#[test] -fn websocket_server_multi_field_connection_header() { +struct SpawnExecutor; + +impl hyper::rt::Executor for SpawnExecutor +where + Fut: std::future::Future + Send + 'static, + Fut::Output: Send + 'static, +{ + fn execute(&self, fut: Fut) { + tokio::task::spawn(fut); + } +} + +#[tokio::test] +async fn websocket_server_multi_field_connection_header() { let script = util::testdata_path() .join("run/websocket_server_multi_field_connection_header_test.ts"); let root_ca = util::testdata_path().join("tls/RootCA.pem"); @@ -4151,25 +4190,41 @@ fn websocket_server_multi_field_connection_header() { let msg = std::str::from_utf8(&buffer).unwrap(); assert_eq!(msg, "READY"); - let req = http::request::Builder::new() - .header(http::header::CONNECTION, "keep-alive, Upgrade") - .uri("ws://localhost:4319") - .body(()) + let stream = tokio::net::TcpStream::connect("localhost:4319") + .await .unwrap(); + let req = hyper::Request::builder() + .header(hyper::header::UPGRADE, "websocket") + .header(http::header::CONNECTION, "keep-alive, Upgrade") + .header( + "Sec-WebSocket-Key", + fastwebsockets::handshake::generate_key(), + ) + .header("Sec-WebSocket-Version", "13") + .uri("ws://localhost:4319") + .body(hyper::Body::empty()) + .unwrap(); + let (mut socket, _) = - deno_runtime::deno_websocket::tokio_tungstenite::tungstenite::connect(req) + fastwebsockets::handshake::client(&SpawnExecutor, req, stream) + .await .unwrap(); - let message = socket.read_message().unwrap(); - assert_eq!(message, deno_runtime::deno_websocket::tokio_tungstenite::tungstenite::Message::Close(None)); - socket.close(None).unwrap(); + + let message = socket.read_frame().await.unwrap(); + assert_eq!(message.opcode, fastwebsockets::OpCode::Close); + assert!(message.payload.is_empty()); + socket + .write_frame(fastwebsockets::Frame::close_raw(vec![])) + .await + .unwrap(); assert!(child.wait().unwrap().success()); } // TODO(bartlomieju): this should use `deno run`, not `deno test`; but the // test hangs then. https://github.com/denoland/deno/issues/14283 -#[test] +#[tokio::test] #[ignore] -fn websocket_server_idletimeout() { +async fn websocket_server_idletimeout() { let script = util::testdata_path().join("run/websocket_server_idletimeout.ts"); let root_ca = util::testdata_path().join("tls/RootCA.pem"); @@ -4191,12 +4246,24 @@ fn websocket_server_idletimeout() { let msg = std::str::from_utf8(&buffer).unwrap(); assert_eq!(msg, "READY"); - let req = http::request::Builder::new() - .uri("ws://localhost:4509") - .body(()) + let stream = tokio::net::TcpStream::connect("localhost:4509") + .await .unwrap(); - let (_ws, _request) = - deno_runtime::deno_websocket::tokio_tungstenite::tungstenite::connect(req) + let req = hyper::Request::builder() + .header(hyper::header::UPGRADE, "websocket") + .header(http::header::CONNECTION, "keep-alive, Upgrade") + .header( + "Sec-WebSocket-Key", + fastwebsockets::handshake::generate_key(), + ) + .header("Sec-WebSocket-Version", "13") + .uri("ws://localhost:4509") + .body(hyper::Body::empty()) + .unwrap(); + + let (_socket, _) = + fastwebsockets::handshake::client(&SpawnExecutor, req, stream) + .await .unwrap(); assert!(child.wait().unwrap().success());