From 0533ae4380a46652fa6885ba610b4ec5900931ee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20Iwa=C5=84czuk?= Date: Tue, 26 Dec 2023 11:20:49 +0100 Subject: [PATCH] refactor: migrate ext/websocket to hyper 1.1 (#21699) --- Cargo.lock | 6 +++ ext/websocket/Cargo.toml | 6 +++ ext/websocket/lib.rs | 82 ++++++++++++++++++++-------------------- ext/websocket/stream.rs | 9 +++-- 4 files changed, 58 insertions(+), 45 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index dbd17df162..01668b1a34 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1771,9 +1771,15 @@ dependencies = [ "deno_net", "deno_tls", "fastwebsockets 0.5.0", + "fastwebsockets 0.6.0", "h2 0.3.22", + "h2 0.4.0", "http 0.2.11", + "http 1.0.0", + "http-body-util", "hyper 0.14.27", + "hyper 1.1.0", + "hyper-util", "once_cell", "rustls-tokio-stream", "serde", diff --git a/ext/websocket/Cargo.toml b/ext/websocket/Cargo.toml index 74008a027d..2a0a20114d 100644 --- a/ext/websocket/Cargo.toml +++ b/ext/websocket/Cargo.toml @@ -19,9 +19,15 @@ deno_core.workspace = true deno_net.workspace = true deno_tls.workspace = true fastwebsockets = { workspace = true, features = ["upgrade", "unstable-split"] } +fastwebsockets_06 = { package = "fastwebsockets", version = "0.6", features = ["upgrade", "unstable-split"] } h2.workspace = true +h2_04 = { package = "h2", version = "0.4" } http.workspace = true +http-body-util = "0.1" +http_1 = { package = "http", version = "1.0" } hyper = { workspace = true, features = ["backports"] } +hyper-util.workspace = true +hyper1.workspace = true once_cell.workspace = true rustls-tokio-stream.workspace = true serde.workspace = true diff --git a/ext/websocket/lib.rs b/ext/websocket/lib.rs index d25f7ecbdd..ad1483f88b 100644 --- a/ext/websocket/lib.rs +++ b/ext/websocket/lib.rs @@ -25,15 +25,14 @@ use deno_tls::create_client_config; use deno_tls::rustls::ClientConfig; use deno_tls::RootCertStoreProvider; use deno_tls::SocketUse; -use http::header::CONNECTION; -use http::header::UPGRADE; -use http::HeaderName; -use http::HeaderValue; -use http::Method; -use http::Request; -use http::StatusCode; -use http::Uri; -use hyper::Body; +use http_1::header::CONNECTION; +use http_1::header::UPGRADE; +use http_1::HeaderName; +use http_1::HeaderValue; +use http_1::Method; +use http_1::Request; +use http_1::StatusCode; +use http_1::Uri; use once_cell::sync::Lazy; use rustls_tokio_stream::rustls::RootCertStore; use rustls_tokio_stream::rustls::ServerName; @@ -55,13 +54,13 @@ use tokio::io::ReadHalf; use tokio::io::WriteHalf; use tokio::net::TcpStream; -use fastwebsockets::CloseCode; -use fastwebsockets::FragmentCollectorRead; -use fastwebsockets::Frame; -use fastwebsockets::OpCode; -use fastwebsockets::Role; -use fastwebsockets::WebSocket; -use fastwebsockets::WebSocketWrite; +use fastwebsockets_06::CloseCode; +use fastwebsockets_06::FragmentCollectorRead; +use fastwebsockets_06::Frame; +use fastwebsockets_06::OpCode; +use fastwebsockets_06::Role; +use fastwebsockets_06::WebSocket; +use fastwebsockets_06::WebSocketWrite; mod stream; @@ -157,7 +156,7 @@ async fn handshake_websocket( uri: &Uri, protocols: &str, headers: Option>, -) -> Result<(WebSocket, http::HeaderMap), AnyError> { +) -> Result<(WebSocket, http_1::HeaderMap), AnyError> { let mut request = Request::builder().method(Method::GET).uri( uri .path_and_query() @@ -176,14 +175,14 @@ async fn handshake_websocket( .header(CONNECTION, "Upgrade") .header( "Sec-WebSocket-Key", - fastwebsockets::handshake::generate_key(), + fastwebsockets_06::handshake::generate_key(), ); let user_agent = state.borrow().borrow::().0.clone(); request = populate_common_request_headers(request, &user_agent, protocols, &headers)?; - let request = request.body(Body::empty())?; + let request = request.body(http_body_util::Empty::new())?; let domain = &uri.host().unwrap().to_string(); let port = &uri.port_u16().unwrap_or(match uri.scheme_str() { Some("wss") => 443, @@ -218,19 +217,19 @@ async fn handshake_websocket( } async fn handshake_http1_ws( - request: Request, + request: Request>, addr: &String, -) -> Result<(WebSocket, http::HeaderMap), AnyError> { +) -> Result<(WebSocket, http_1::HeaderMap), AnyError> { let tcp_socket = TcpStream::connect(addr).await?; handshake_connection(request, tcp_socket).await } async fn handshake_http1_wss( state: &Rc>, - request: Request, + request: Request>, domain: &str, addr: &str, -) -> Result<(WebSocket, http::HeaderMap), AnyError> { +) -> Result<(WebSocket, http_1::HeaderMap), AnyError> { let tcp_socket = TcpStream::connect(addr).await?; let tls_config = create_ws_client_config(state, SocketUse::Http1Only)?; let dnsname = @@ -256,7 +255,7 @@ async fn handshake_http2_wss( domain: &str, headers: &Option>, addr: &str, -) -> Result<(WebSocket, http::HeaderMap), AnyError> { +) -> Result<(WebSocket, http_1::HeaderMap), AnyError> { let tcp_socket = TcpStream::connect(addr).await?; let tls_config = create_ws_client_config(state, SocketUse::Http2Only)?; let dnsname = @@ -268,7 +267,7 @@ async fn handshake_http2_wss( if handshake.alpn.is_none() { bail!("Didn't receive h2 alpn, aborting connection"); } - let h2 = h2::client::Builder::new(); + let h2 = h2_04::client::Builder::new(); let (mut send, conn) = h2.handshake::<_, Bytes>(tls_connector).await?; spawn(conn); let mut request = Request::builder(); @@ -281,13 +280,13 @@ async fn handshake_http2_wss( request = request.uri(uri); request = populate_common_request_headers(request, user_agent, protocols, headers)?; - request = request.extension(h2::ext::Protocol::from("websocket")); + request = request.extension(h2_04::ext::Protocol::from("websocket")); let (resp, send) = send.send_request(request.body(())?, false)?; let resp = resp.await?; if resp.status() != StatusCode::OK { bail!("Invalid status code: {}", resp.status()); } - let (http::response::Parts { headers, .. }, recv) = resp.into_parts(); + let (http_1::response::Parts { headers, .. }, recv) = resp.into_parts(); let mut stream = WebSocket::after_handshake( WebSocketStream::new(stream::WsStreamKind::H2(send, recv), None), Role::Client, @@ -302,11 +301,12 @@ async fn handshake_http2_wss( async fn handshake_connection< S: AsyncRead + AsyncWrite + Send + Unpin + 'static, >( - request: Request, + request: Request>, socket: S, -) -> Result<(WebSocket, http::HeaderMap), AnyError> { +) -> Result<(WebSocket, http_1::HeaderMap), AnyError> { let (upgraded, response) = - fastwebsockets::handshake::client(&LocalExecutor, request, socket).await?; + fastwebsockets_06::handshake::client(&LocalExecutor, request, socket) + .await?; let upgraded = upgraded.into_inner(); let stream = @@ -340,11 +340,11 @@ pub fn create_ws_client_config( /// Headers common to both http/1.1 and h2 requests. fn populate_common_request_headers( - mut request: http::request::Builder, + mut request: http_1::request::Builder, user_agent: &str, protocols: &str, headers: &Option>, -) -> Result { +) -> Result { request = request .header("User-Agent", user_agent) .header("Sec-WebSocket-Version", "13"); @@ -362,14 +362,14 @@ fn populate_common_request_headers( let is_disallowed_header = matches!( name, - http::header::HOST - | http::header::SEC_WEBSOCKET_ACCEPT - | http::header::SEC_WEBSOCKET_EXTENSIONS - | http::header::SEC_WEBSOCKET_KEY - | http::header::SEC_WEBSOCKET_PROTOCOL - | http::header::SEC_WEBSOCKET_VERSION - | http::header::UPGRADE - | http::header::CONNECTION + http_1::header::HOST + | http_1::header::SEC_WEBSOCKET_ACCEPT + | http_1::header::SEC_WEBSOCKET_EXTENSIONS + | http_1::header::SEC_WEBSOCKET_KEY + | http_1::header::SEC_WEBSOCKET_PROTOCOL + | http_1::header::SEC_WEBSOCKET_VERSION + | http_1::header::UPGRADE + | http_1::header::CONNECTION ); if !is_disallowed_header { request = request.header(name, v); @@ -892,7 +892,7 @@ pub fn get_network_error_class_name(e: &AnyError) -> Option<&'static str> { #[derive(Clone)] struct LocalExecutor; -impl hyper::rt::Executor for LocalExecutor +impl hyper1::rt::Executor for LocalExecutor where Fut: Future + 'static, Fut::Output: 'static, diff --git a/ext/websocket/stream.rs b/ext/websocket/stream.rs index 7e36c81475..7e0201ae46 100644 --- a/ext/websocket/stream.rs +++ b/ext/websocket/stream.rs @@ -2,9 +2,10 @@ use bytes::Buf; use bytes::Bytes; use deno_net::raw::NetworkStream; -use h2::RecvStream; -use h2::SendStream; -use hyper::upgrade::Upgraded; +use h2_04::RecvStream; +use h2_04::SendStream; +use hyper1::upgrade::Upgraded; +use hyper_util::rt::TokioIo; use std::io::ErrorKind; use std::pin::Pin; use std::task::ready; @@ -15,7 +16,7 @@ use tokio::io::ReadBuf; // TODO(bartlomieju): remove this pub(crate) enum WsStreamKind { - Upgraded(Upgraded), + Upgraded(TokioIo), Network(NetworkStream), H2(SendStream, RecvStream), }