1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2025-01-18 11:53:59 -05:00

refactor: rewrite client side tests to 'fastwebsockets' crate (#18800)

Follow up to https://github.com/denoland/deno/pull/18781.
This commit is contained in:
Bartek Iwańczuk 2023-04-23 22:55:45 +02:00 committed by GitHub
parent fafb2584ef
commit 5a524a9a5a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 144 additions and 74 deletions

View file

@ -6,15 +6,18 @@ use deno_core::serde_json;
use deno_core::serde_json::json; use deno_core::serde_json::json;
use deno_core::url; use deno_core::url;
use deno_runtime::deno_fetch::reqwest; use deno_runtime::deno_fetch::reqwest;
use deno_runtime::deno_websocket::tokio_tungstenite;
use fastwebsockets::FragmentCollector; use fastwebsockets::FragmentCollector;
use fastwebsockets::Frame; use fastwebsockets::Frame;
use fastwebsockets::WebSocket;
use hyper::upgrade::Upgraded; use hyper::upgrade::Upgraded;
use hyper::Body;
use hyper::Request; use hyper::Request;
use hyper::Response;
use std::io::BufRead; use std::io::BufRead;
use test_util as util; use test_util as util;
use test_util::TempDir; use test_util::TempDir;
use tokio::net::TcpStream; use tokio::net::TcpStream;
use url::Url;
use util::http_server; use util::http_server;
use util::DenoChild; use util::DenoChild;
@ -30,6 +33,37 @@ where
} }
} }
async fn connect_to_ws(uri: Url) -> (WebSocket<Upgraded>, Response<Body>) {
let domain = &uri.host().unwrap().to_string();
let port = &uri.port().unwrap_or(match uri.scheme() {
"wss" | "https" => 443,
_ => 80,
});
let addr = format!("{domain}:{port}");
let stream = TcpStream::connect(addr).await.unwrap();
let host = uri.host_str().unwrap();
let req = Request::builder()
.method("GET")
.uri(uri.path())
.header("Host", host)
.header(hyper::header::UPGRADE, "websocket")
.header(hyper::header::CONNECTION, "Upgrade")
.header(
"Sec-WebSocket-Key",
fastwebsockets::handshake::generate_key(),
)
.header("Sec-WebSocket-Version", "13")
.body(hyper::Body::empty())
.unwrap();
fastwebsockets::handshake::client(&SpawnExecutor, req, stream)
.await
.unwrap()
}
struct InspectorTester { struct InspectorTester {
socket: FragmentCollector<Upgraded>, socket: FragmentCollector<Upgraded>,
notification_filter: Box<dyn FnMut(&str) -> bool + 'static>, notification_filter: Box<dyn FnMut(&str) -> bool + 'static>,
@ -57,35 +91,7 @@ impl InspectorTester {
let uri = extract_ws_url_from_stderr(&mut stderr_lines); let uri = extract_ws_url_from_stderr(&mut stderr_lines);
let domain = &uri.host().unwrap().to_string(); let (socket, response) = connect_to_ws(uri).await;
let port = &uri.port().unwrap_or(match uri.scheme() {
"wss" | "https" => 443,
_ => 80,
});
let addr = format!("{domain}:{port}");
let stream = TcpStream::connect(addr).await.unwrap();
let host = uri.host_str().unwrap();
let req = Request::builder()
.method("GET")
.uri(uri.path())
.header("Host", host)
.header(hyper::header::UPGRADE, "websocket")
.header(hyper::header::CONNECTION, "Upgrade")
.header(
"Sec-WebSocket-Key",
fastwebsockets::handshake::generate_key(),
)
.header("Sec-WebSocket-Version", "13")
.body(hyper::Body::empty())
.unwrap();
let (socket, response) =
fastwebsockets::handshake::client(&SpawnExecutor, req, stream)
.await
.unwrap();
assert_eq!(response.status(), 101); // Switching protocols. assert_eq!(response.status(), 101); // Switching protocols.
@ -289,10 +295,7 @@ async fn inspector_connect() {
std::io::BufReader::new(stderr).lines().map(|r| r.unwrap()); std::io::BufReader::new(stderr).lines().map(|r| r.unwrap());
let ws_url = extract_ws_url_from_stderr(&mut stderr_lines); let ws_url = extract_ws_url_from_stderr(&mut stderr_lines);
// We use tokio_tungstenite as a websocket client because warp (which is let (_socket, response) = connect_to_ws(ws_url).await;
// a dependency of Deno) uses it.
let (_socket, response) =
tokio_tungstenite::connect_async(ws_url).await.unwrap();
assert_eq!("101 Switching Protocols", response.status().to_string()); assert_eq!("101 Switching Protocols", response.status().to_string());
child.kill().unwrap(); child.kill().unwrap();
child.wait().unwrap(); child.wait().unwrap();

View file

@ -4088,14 +4088,46 @@ fn websocketstream() {
assert!(status.success()); assert!(status.success());
} }
#[test] #[tokio::test(flavor = "multi_thread")]
fn websocketstream_ping() { async fn websocketstream_ping() {
use deno_runtime::deno_websocket::tokio_tungstenite::tungstenite;
let _g = util::http_server(); let _g = util::http_server();
let script = util::testdata_path().join("run/websocketstream_ping_test.ts"); let script = util::testdata_path().join("run/websocketstream_ping_test.ts");
let root_ca = util::testdata_path().join("tls/RootCA.pem"); let root_ca = util::testdata_path().join("tls/RootCA.pem");
let mut child = util::deno_cmd()
let srv_fn = hyper::service::service_fn(|mut req| async move {
let (response, upgrade_fut) =
fastwebsockets::upgrade::upgrade(&mut req).unwrap();
tokio::spawn(async move {
let mut ws = upgrade_fut.await.unwrap();
ws.write_frame(fastwebsockets::Frame::text("A".as_bytes().to_vec()))
.await
.unwrap();
ws.write_frame(fastwebsockets::Frame::new(
true,
fastwebsockets::OpCode::Ping,
None,
vec![],
))
.await
.unwrap();
ws.write_frame(fastwebsockets::Frame::text("B".as_bytes().to_vec()))
.await
.unwrap();
let message = ws.read_frame().await.unwrap();
assert_eq!(message.opcode, fastwebsockets::OpCode::Pong);
ws.write_frame(fastwebsockets::Frame::text("C".as_bytes().to_vec()))
.await
.unwrap();
ws.write_frame(fastwebsockets::Frame::close_raw(vec![]))
.await
.unwrap();
});
Ok::<_, std::convert::Infallible>(response)
});
let child = util::deno_cmd()
.arg("test") .arg("test")
.arg("--unstable") .arg("--unstable")
.arg("--allow-net") .arg("--allow-net")
@ -4105,31 +4137,38 @@ fn websocketstream_ping() {
.stdout(std::process::Stdio::piped()) .stdout(std::process::Stdio::piped())
.spawn() .spawn()
.unwrap(); .unwrap();
let server = tokio::net::TcpListener::bind("127.0.0.1:4513")
.await
.unwrap();
tokio::spawn(async move {
let (stream, _) = server.accept().await.unwrap();
let conn_fut = hyper::server::conn::Http::new()
.serve_connection(stream, srv_fn)
.with_upgrades();
let server = std::net::TcpListener::bind("127.0.0.1:4513").unwrap(); if let Err(e) = conn_fut.await {
let (stream, _) = server.accept().unwrap(); eprintln!("websocket server error: {e:?}");
let mut socket = tungstenite::accept(stream).unwrap(); }
socket });
.write_message(tungstenite::Message::Text(String::from("A")))
.unwrap();
socket
.write_message(tungstenite::Message::Ping(vec![]))
.unwrap();
socket
.write_message(tungstenite::Message::Text(String::from("B")))
.unwrap();
let message = socket.read_message().unwrap();
assert_eq!(message, tungstenite::Message::Pong(vec![]));
socket
.write_message(tungstenite::Message::Text(String::from("C")))
.unwrap();
socket.close(None).unwrap();
assert!(child.wait().unwrap().success()); let r = child.wait_with_output().unwrap();
assert!(r.status.success());
} }
#[test] struct SpawnExecutor;
fn websocket_server_multi_field_connection_header() {
impl<Fut> hyper::rt::Executor<Fut> for SpawnExecutor
where
Fut: std::future::Future + Send + 'static,
Fut::Output: Send + 'static,
{
fn execute(&self, fut: Fut) {
tokio::task::spawn(fut);
}
}
#[tokio::test]
async fn websocket_server_multi_field_connection_header() {
let script = util::testdata_path() let script = util::testdata_path()
.join("run/websocket_server_multi_field_connection_header_test.ts"); .join("run/websocket_server_multi_field_connection_header_test.ts");
let root_ca = util::testdata_path().join("tls/RootCA.pem"); let root_ca = util::testdata_path().join("tls/RootCA.pem");
@ -4151,25 +4190,41 @@ fn websocket_server_multi_field_connection_header() {
let msg = std::str::from_utf8(&buffer).unwrap(); let msg = std::str::from_utf8(&buffer).unwrap();
assert_eq!(msg, "READY"); assert_eq!(msg, "READY");
let req = http::request::Builder::new() let stream = tokio::net::TcpStream::connect("localhost:4319")
.await
.unwrap();
let req = hyper::Request::builder()
.header(hyper::header::UPGRADE, "websocket")
.header(http::header::CONNECTION, "keep-alive, Upgrade") .header(http::header::CONNECTION, "keep-alive, Upgrade")
.header(
"Sec-WebSocket-Key",
fastwebsockets::handshake::generate_key(),
)
.header("Sec-WebSocket-Version", "13")
.uri("ws://localhost:4319") .uri("ws://localhost:4319")
.body(()) .body(hyper::Body::empty())
.unwrap(); .unwrap();
let (mut socket, _) = let (mut socket, _) =
deno_runtime::deno_websocket::tokio_tungstenite::tungstenite::connect(req) fastwebsockets::handshake::client(&SpawnExecutor, req, stream)
.await
.unwrap();
let message = socket.read_frame().await.unwrap();
assert_eq!(message.opcode, fastwebsockets::OpCode::Close);
assert!(message.payload.is_empty());
socket
.write_frame(fastwebsockets::Frame::close_raw(vec![]))
.await
.unwrap(); .unwrap();
let message = socket.read_message().unwrap();
assert_eq!(message, deno_runtime::deno_websocket::tokio_tungstenite::tungstenite::Message::Close(None));
socket.close(None).unwrap();
assert!(child.wait().unwrap().success()); assert!(child.wait().unwrap().success());
} }
// TODO(bartlomieju): this should use `deno run`, not `deno test`; but the // TODO(bartlomieju): this should use `deno run`, not `deno test`; but the
// test hangs then. https://github.com/denoland/deno/issues/14283 // test hangs then. https://github.com/denoland/deno/issues/14283
#[test] #[tokio::test]
#[ignore] #[ignore]
fn websocket_server_idletimeout() { async fn websocket_server_idletimeout() {
let script = let script =
util::testdata_path().join("run/websocket_server_idletimeout.ts"); util::testdata_path().join("run/websocket_server_idletimeout.ts");
let root_ca = util::testdata_path().join("tls/RootCA.pem"); let root_ca = util::testdata_path().join("tls/RootCA.pem");
@ -4191,12 +4246,24 @@ fn websocket_server_idletimeout() {
let msg = std::str::from_utf8(&buffer).unwrap(); let msg = std::str::from_utf8(&buffer).unwrap();
assert_eq!(msg, "READY"); assert_eq!(msg, "READY");
let req = http::request::Builder::new() let stream = tokio::net::TcpStream::connect("localhost:4509")
.uri("ws://localhost:4509") .await
.body(())
.unwrap(); .unwrap();
let (_ws, _request) = let req = hyper::Request::builder()
deno_runtime::deno_websocket::tokio_tungstenite::tungstenite::connect(req) .header(hyper::header::UPGRADE, "websocket")
.header(http::header::CONNECTION, "keep-alive, Upgrade")
.header(
"Sec-WebSocket-Key",
fastwebsockets::handshake::generate_key(),
)
.header("Sec-WebSocket-Version", "13")
.uri("ws://localhost:4509")
.body(hyper::Body::empty())
.unwrap();
let (_socket, _) =
fastwebsockets::handshake::client(&SpawnExecutor, req, stream)
.await
.unwrap(); .unwrap();
assert!(child.wait().unwrap().success()); assert!(child.wait().unwrap().success());