1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2024-11-25 15:29:32 -05:00

refactor(test_util): move servers to a separate module (#21577)

This commit has no functional changes, just moves all the testing
servers to "test_util::servers" module to make "test_util/src/lib.rs"
shorter.
This commit is contained in:
Bartek Iwańczuk 2023-12-14 17:52:12 +01:00 committed by GitHub
parent ac04787c30
commit 8d269efbc2
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 1875 additions and 1766 deletions

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,102 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
use futures::StreamExt;
use hyper::header::HeaderValue;
use hyper::http;
use rustls_tokio_stream::TlsStream;
use tokio::net::TcpStream;
use tokio::task::LocalSet;
use super::get_tcp_listener_stream;
use super::get_tls_listener_stream;
use super::SupportedHttpVersions;
pub async fn h2_grpc_server(h2_grpc_port: u16, h2s_grpc_port: u16) {
let mut tcp = get_tcp_listener_stream("grpc", h2_grpc_port).await;
let mut tls = get_tls_listener_stream(
"grpc (tls)",
h2s_grpc_port,
SupportedHttpVersions::Http2Only,
)
.await;
async fn serve(socket: TcpStream) -> Result<(), anyhow::Error> {
let mut connection = h2::server::handshake(socket).await?;
while let Some(result) = connection.accept().await {
let (request, respond) = result?;
tokio::spawn(async move {
let _ = handle_request(request, respond).await;
});
}
Ok(())
}
async fn serve_tls(socket: TlsStream) -> Result<(), anyhow::Error> {
let mut connection = h2::server::handshake(socket).await?;
while let Some(result) = connection.accept().await {
let (request, respond) = result?;
tokio::spawn(async move {
let _ = handle_request(request, respond).await;
});
}
Ok(())
}
async fn handle_request(
mut request: http::Request<h2::RecvStream>,
mut respond: h2::server::SendResponse<bytes::Bytes>,
) -> Result<(), anyhow::Error> {
let body = request.body_mut();
while let Some(data) = body.data().await {
let data = data?;
let _ = body.flow_control().release_capacity(data.len());
}
let maybe_recv_trailers = body.trailers().await?;
let response = http::Response::new(());
let mut send = respond.send_response(response, false)?;
send.send_data(bytes::Bytes::from_static(b"hello "), false)?;
send.send_data(bytes::Bytes::from_static(b"world\n"), false)?;
let mut trailers = http::HeaderMap::new();
trailers.insert(
http::HeaderName::from_static("abc"),
HeaderValue::from_static("def"),
);
trailers.insert(
http::HeaderName::from_static("opr"),
HeaderValue::from_static("stv"),
);
if let Some(recv_trailers) = maybe_recv_trailers {
for (key, value) in recv_trailers {
trailers.insert(key.unwrap(), value);
}
}
send.send_trailers(trailers)?;
Ok(())
}
let local_set = LocalSet::new();
local_set.spawn_local(async move {
while let Some(Ok(tcp)) = tcp.next().await {
tokio::spawn(async move {
let _ = serve(tcp).await;
});
}
});
local_set.spawn_local(async move {
while let Some(Ok(tls)) = tls.next().await {
tokio::spawn(async move {
let _ = serve_tls(tls).await;
});
}
});
local_set.await;
}

1449
test_util/src/servers/mod.rs Normal file

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,54 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
use hyper::server::Server;
use hyper::service::make_service_fn;
use hyper::service::service_fn;
use hyper::Body;
use hyper::Request;
use hyper::Response;
use hyper::StatusCode;
use serde_json::json;
use std::convert::Infallible;
use std::net::SocketAddr;
pub async fn registry_server(port: u16) {
let registry_server_addr = SocketAddr::from(([127, 0, 0, 1], port));
let registry_server_svc = make_service_fn(|_| async {
Ok::<_, Infallible>(service_fn(registry_server_handler))
});
let registry_server =
Server::bind(&registry_server_addr).serve(registry_server_svc);
if let Err(e) = registry_server.await {
eprintln!("Registry server error: {:?}", e);
}
}
async fn registry_server_handler(
req: Request<Body>,
) -> Result<Response<Body>, hyper::http::Error> {
let path = req.uri().path();
if path.starts_with("/scopes/") {
let body = serde_json::to_string_pretty(&json!({
"id": "sdfwqer-sffg-qwerasdf",
"status": "success",
"error": null
}))
.unwrap();
let res = Response::new(Body::from(body));
return Ok(res);
} else if path.starts_with("/publish_status/") {
let body = serde_json::to_string_pretty(&json!({
"id": "sdfwqer-qwer-qwerasdf",
"status": "success",
"error": null
}))
.unwrap();
let res = Response::new(Body::from(body));
return Ok(res);
}
Response::builder()
.status(StatusCode::NOT_FOUND)
.body(Body::empty())
}

268
test_util/src/servers/ws.rs Normal file
View file

@ -0,0 +1,268 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
use anyhow::anyhow;
use bytes::Bytes;
use fastwebsockets::FragmentCollector;
use fastwebsockets::Frame;
use fastwebsockets::OpCode;
use fastwebsockets::Role;
use fastwebsockets::WebSocket;
use futures::future::join3;
use futures::future::poll_fn;
use futures::Future;
use futures::StreamExt;
use h2::server::Handshake;
use h2::server::SendResponse;
use h2::Reason;
use h2::RecvStream;
use hyper::service::service_fn;
use hyper::upgrade::Upgraded;
use hyper::Body;
use hyper::Method;
use hyper::Request;
use hyper::Response;
use hyper::StatusCode;
use pretty_assertions::assert_eq;
use std::pin::Pin;
use std::result::Result;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;
use super::get_tcp_listener_stream;
use super::get_tls_listener_stream;
use super::SupportedHttpVersions;
pub async fn run_ws_server(port: u16) {
let mut tcp = get_tcp_listener_stream("ws", port).await;
while let Some(Ok(stream)) = tcp.next().await {
spawn_ws_server(stream, |ws| Box::pin(echo_websocket_handler(ws)));
}
}
pub async fn run_ws_ping_server(port: u16) {
let mut tcp = get_tcp_listener_stream("ws (ping)", port).await;
while let Some(Ok(stream)) = tcp.next().await {
spawn_ws_server(stream, |ws| Box::pin(ping_websocket_handler(ws)));
}
}
pub async fn run_wss_server(port: u16) {
let mut tls = get_tls_listener_stream("wss", port, Default::default()).await;
while let Some(Ok(tls_stream)) = tls.next().await {
tokio::spawn(async move {
spawn_ws_server(tls_stream, |ws| Box::pin(echo_websocket_handler(ws)));
});
}
}
pub async fn run_ws_close_server(port: u16) {
let mut tcp = get_tcp_listener_stream("ws (close)", port).await;
while let Some(Ok(stream)) = tcp.next().await {
spawn_ws_server(stream, |ws| Box::pin(close_websocket_handler(ws)));
}
}
pub async fn run_wss2_server(port: u16) {
let mut tls = get_tls_listener_stream(
"wss2 (tls)",
port,
SupportedHttpVersions::Http2Only,
)
.await;
while let Some(Ok(tls)) = tls.next().await {
tokio::spawn(async move {
let mut h2 = h2::server::Builder::new();
h2.enable_connect_protocol();
// Using Bytes is pretty alloc-heavy but this is a test server
let server: Handshake<_, Bytes> = h2.handshake(tls);
let mut server = match server.await {
Ok(server) => server,
Err(e) => {
println!("Failed to handshake h2: {e:?}");
return;
}
};
loop {
let Some(conn) = server.accept().await else {
break;
};
let (recv, send) = match conn {
Ok(conn) => conn,
Err(e) => {
println!("Failed to accept a connection: {e:?}");
break;
}
};
tokio::spawn(handle_wss_stream(recv, send));
}
});
}
}
async fn echo_websocket_handler(
ws: fastwebsockets::WebSocket<Upgraded>,
) -> Result<(), anyhow::Error> {
let mut ws = fastwebsockets::FragmentCollector::new(ws);
loop {
let frame = ws.read_frame().await.unwrap();
match frame.opcode {
fastwebsockets::OpCode::Close => break,
fastwebsockets::OpCode::Text | fastwebsockets::OpCode::Binary => {
ws.write_frame(frame).await.unwrap();
}
_ => {}
}
}
Ok(())
}
type WsHandler =
fn(
fastwebsockets::WebSocket<Upgraded>,
) -> Pin<Box<dyn Future<Output = Result<(), anyhow::Error>> + Send>>;
fn spawn_ws_server<S>(stream: S, handler: WsHandler)
where
S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + Send + 'static,
{
let srv_fn = service_fn(move |mut req: Request<Body>| async move {
let (response, upgrade_fut) = fastwebsockets::upgrade::upgrade(&mut req)
.map_err(|e| anyhow!("Error upgrading websocket connection: {}", e))?;
tokio::spawn(async move {
let ws = upgrade_fut
.await
.map_err(|e| anyhow!("Error upgrading websocket connection: {}", e))
.unwrap();
if let Err(e) = handler(ws).await {
eprintln!("Error in websocket connection: {}", e);
}
});
Ok::<_, anyhow::Error>(response)
});
tokio::spawn(async move {
let conn_fut = hyper::server::conn::Http::new()
.serve_connection(stream, srv_fn)
.with_upgrades();
if let Err(e) = conn_fut.await {
eprintln!("websocket server error: {e:?}");
}
});
}
async fn handle_wss_stream(
recv: Request<RecvStream>,
mut send: SendResponse<Bytes>,
) -> Result<(), h2::Error> {
if recv.method() != Method::CONNECT {
eprintln!("wss2: refusing non-CONNECT stream");
send.send_reset(Reason::REFUSED_STREAM);
return Ok(());
}
let Some(protocol) = recv.extensions().get::<h2::ext::Protocol>() else {
eprintln!("wss2: refusing no-:protocol stream");
send.send_reset(Reason::REFUSED_STREAM);
return Ok(());
};
if protocol.as_str() != "websocket" && protocol.as_str() != "WebSocket" {
eprintln!("wss2: refusing non-websocket stream");
send.send_reset(Reason::REFUSED_STREAM);
return Ok(());
}
let mut body = recv.into_body();
let mut response = Response::new(());
*response.status_mut() = StatusCode::OK;
let mut resp = send.send_response(response, false)?;
// Use a duplex stream to talk to fastwebsockets because it's just faster to implement
let (a, b) = tokio::io::duplex(65536);
let f1 = tokio::spawn(tokio::task::unconstrained(async move {
let ws = WebSocket::after_handshake(a, Role::Server);
let mut ws = FragmentCollector::new(ws);
loop {
let frame = ws.read_frame().await.unwrap();
if frame.opcode == OpCode::Close {
break;
}
ws.write_frame(frame).await.unwrap();
}
}));
let (mut br, mut bw) = tokio::io::split(b);
let f2 = tokio::spawn(tokio::task::unconstrained(async move {
loop {
let Some(Ok(data)) = poll_fn(|cx| body.poll_data(cx)).await else {
return;
};
body.flow_control().release_capacity(data.len()).unwrap();
let Ok(_) = bw.write_all(&data).await else {
break;
};
}
}));
let f3 = tokio::spawn(tokio::task::unconstrained(async move {
loop {
let mut buf = [0; 65536];
let n = br.read(&mut buf).await.unwrap();
if n == 0 {
break;
}
resp.reserve_capacity(n);
poll_fn(|cx| resp.poll_capacity(cx)).await;
resp
.send_data(Bytes::copy_from_slice(&buf[0..n]), false)
.unwrap();
}
resp.send_data(Bytes::new(), true).unwrap();
}));
_ = join3(f1, f2, f3).await;
Ok(())
}
async fn close_websocket_handler(
ws: fastwebsockets::WebSocket<Upgraded>,
) -> Result<(), anyhow::Error> {
let mut ws = fastwebsockets::FragmentCollector::new(ws);
ws.write_frame(fastwebsockets::Frame::close_raw(vec![].into()))
.await
.unwrap();
Ok(())
}
async fn ping_websocket_handler(
ws: fastwebsockets::WebSocket<Upgraded>,
) -> Result<(), anyhow::Error> {
let mut ws = fastwebsockets::FragmentCollector::new(ws);
for i in 0..9 {
ws.write_frame(Frame::new(true, OpCode::Ping, None, vec![].into()))
.await
.unwrap();
let frame = ws.read_frame().await.unwrap();
assert_eq!(frame.opcode, OpCode::Pong);
assert!(frame.payload.is_empty());
ws.write_frame(Frame::text(
format!("hello {}", i).as_bytes().to_vec().into(),
))
.await
.unwrap();
let frame = ws.read_frame().await.unwrap();
assert_eq!(frame.opcode, OpCode::Text);
assert_eq!(frame.payload, format!("hello {}", i).as_bytes());
}
ws.write_frame(fastwebsockets::Frame::close(1000, b""))
.await
.unwrap();
Ok(())
}

View file

@ -1,5 +1,5 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. // Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
fn main() { fn main() {
test_util::run_all_servers(); test_util::servers::run_all_servers();
} }