1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2024-11-30 16:40:57 -05:00

refactor: migrate ext/websocket to hyper 1.1 (#21699)

This commit is contained in:
Bartek Iwańczuk 2023-12-26 11:20:49 +01:00
parent e7dedb8036
commit 0533ae4380
No known key found for this signature in database
GPG key ID: 0C6BCDDC3B3AD750
4 changed files with 58 additions and 45 deletions

6
Cargo.lock generated
View file

@ -1771,9 +1771,15 @@ dependencies = [
"deno_net", "deno_net",
"deno_tls", "deno_tls",
"fastwebsockets 0.5.0", "fastwebsockets 0.5.0",
"fastwebsockets 0.6.0",
"h2 0.3.22", "h2 0.3.22",
"h2 0.4.0",
"http 0.2.11", "http 0.2.11",
"http 1.0.0",
"http-body-util",
"hyper 0.14.27", "hyper 0.14.27",
"hyper 1.1.0",
"hyper-util",
"once_cell", "once_cell",
"rustls-tokio-stream", "rustls-tokio-stream",
"serde", "serde",

View file

@ -19,9 +19,15 @@ deno_core.workspace = true
deno_net.workspace = true deno_net.workspace = true
deno_tls.workspace = true deno_tls.workspace = true
fastwebsockets = { workspace = true, features = ["upgrade", "unstable-split"] } fastwebsockets = { workspace = true, features = ["upgrade", "unstable-split"] }
fastwebsockets_06 = { package = "fastwebsockets", version = "0.6", features = ["upgrade", "unstable-split"] }
h2.workspace = true h2.workspace = true
h2_04 = { package = "h2", version = "0.4" }
http.workspace = true http.workspace = true
http-body-util = "0.1"
http_1 = { package = "http", version = "1.0" }
hyper = { workspace = true, features = ["backports"] } hyper = { workspace = true, features = ["backports"] }
hyper-util.workspace = true
hyper1.workspace = true
once_cell.workspace = true once_cell.workspace = true
rustls-tokio-stream.workspace = true rustls-tokio-stream.workspace = true
serde.workspace = true serde.workspace = true

View file

@ -25,15 +25,14 @@ use deno_tls::create_client_config;
use deno_tls::rustls::ClientConfig; use deno_tls::rustls::ClientConfig;
use deno_tls::RootCertStoreProvider; use deno_tls::RootCertStoreProvider;
use deno_tls::SocketUse; use deno_tls::SocketUse;
use http::header::CONNECTION; use http_1::header::CONNECTION;
use http::header::UPGRADE; use http_1::header::UPGRADE;
use http::HeaderName; use http_1::HeaderName;
use http::HeaderValue; use http_1::HeaderValue;
use http::Method; use http_1::Method;
use http::Request; use http_1::Request;
use http::StatusCode; use http_1::StatusCode;
use http::Uri; use http_1::Uri;
use hyper::Body;
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
use rustls_tokio_stream::rustls::RootCertStore; use rustls_tokio_stream::rustls::RootCertStore;
use rustls_tokio_stream::rustls::ServerName; use rustls_tokio_stream::rustls::ServerName;
@ -55,13 +54,13 @@ use tokio::io::ReadHalf;
use tokio::io::WriteHalf; use tokio::io::WriteHalf;
use tokio::net::TcpStream; use tokio::net::TcpStream;
use fastwebsockets::CloseCode; use fastwebsockets_06::CloseCode;
use fastwebsockets::FragmentCollectorRead; use fastwebsockets_06::FragmentCollectorRead;
use fastwebsockets::Frame; use fastwebsockets_06::Frame;
use fastwebsockets::OpCode; use fastwebsockets_06::OpCode;
use fastwebsockets::Role; use fastwebsockets_06::Role;
use fastwebsockets::WebSocket; use fastwebsockets_06::WebSocket;
use fastwebsockets::WebSocketWrite; use fastwebsockets_06::WebSocketWrite;
mod stream; mod stream;
@ -157,7 +156,7 @@ async fn handshake_websocket(
uri: &Uri, uri: &Uri,
protocols: &str, protocols: &str,
headers: Option<Vec<(ByteString, ByteString)>>, headers: Option<Vec<(ByteString, ByteString)>>,
) -> Result<(WebSocket<WebSocketStream>, http::HeaderMap), AnyError> { ) -> Result<(WebSocket<WebSocketStream>, http_1::HeaderMap), AnyError> {
let mut request = Request::builder().method(Method::GET).uri( let mut request = Request::builder().method(Method::GET).uri(
uri uri
.path_and_query() .path_and_query()
@ -176,14 +175,14 @@ async fn handshake_websocket(
.header(CONNECTION, "Upgrade") .header(CONNECTION, "Upgrade")
.header( .header(
"Sec-WebSocket-Key", "Sec-WebSocket-Key",
fastwebsockets::handshake::generate_key(), fastwebsockets_06::handshake::generate_key(),
); );
let user_agent = state.borrow().borrow::<WsUserAgent>().0.clone(); let user_agent = state.borrow().borrow::<WsUserAgent>().0.clone();
request = request =
populate_common_request_headers(request, &user_agent, protocols, &headers)?; populate_common_request_headers(request, &user_agent, protocols, &headers)?;
let request = request.body(Body::empty())?; let request = request.body(http_body_util::Empty::new())?;
let domain = &uri.host().unwrap().to_string(); let domain = &uri.host().unwrap().to_string();
let port = &uri.port_u16().unwrap_or(match uri.scheme_str() { let port = &uri.port_u16().unwrap_or(match uri.scheme_str() {
Some("wss") => 443, Some("wss") => 443,
@ -218,19 +217,19 @@ async fn handshake_websocket(
} }
async fn handshake_http1_ws( async fn handshake_http1_ws(
request: Request<Body>, request: Request<http_body_util::Empty<Bytes>>,
addr: &String, addr: &String,
) -> Result<(WebSocket<WebSocketStream>, http::HeaderMap), AnyError> { ) -> Result<(WebSocket<WebSocketStream>, http_1::HeaderMap), AnyError> {
let tcp_socket = TcpStream::connect(addr).await?; let tcp_socket = TcpStream::connect(addr).await?;
handshake_connection(request, tcp_socket).await handshake_connection(request, tcp_socket).await
} }
async fn handshake_http1_wss( async fn handshake_http1_wss(
state: &Rc<RefCell<OpState>>, state: &Rc<RefCell<OpState>>,
request: Request<Body>, request: Request<http_body_util::Empty<Bytes>>,
domain: &str, domain: &str,
addr: &str, addr: &str,
) -> Result<(WebSocket<WebSocketStream>, http::HeaderMap), AnyError> { ) -> Result<(WebSocket<WebSocketStream>, http_1::HeaderMap), AnyError> {
let tcp_socket = TcpStream::connect(addr).await?; let tcp_socket = TcpStream::connect(addr).await?;
let tls_config = create_ws_client_config(state, SocketUse::Http1Only)?; let tls_config = create_ws_client_config(state, SocketUse::Http1Only)?;
let dnsname = let dnsname =
@ -256,7 +255,7 @@ async fn handshake_http2_wss(
domain: &str, domain: &str,
headers: &Option<Vec<(ByteString, ByteString)>>, headers: &Option<Vec<(ByteString, ByteString)>>,
addr: &str, addr: &str,
) -> Result<(WebSocket<WebSocketStream>, http::HeaderMap), AnyError> { ) -> Result<(WebSocket<WebSocketStream>, http_1::HeaderMap), AnyError> {
let tcp_socket = TcpStream::connect(addr).await?; let tcp_socket = TcpStream::connect(addr).await?;
let tls_config = create_ws_client_config(state, SocketUse::Http2Only)?; let tls_config = create_ws_client_config(state, SocketUse::Http2Only)?;
let dnsname = let dnsname =
@ -268,7 +267,7 @@ async fn handshake_http2_wss(
if handshake.alpn.is_none() { if handshake.alpn.is_none() {
bail!("Didn't receive h2 alpn, aborting connection"); bail!("Didn't receive h2 alpn, aborting connection");
} }
let h2 = h2::client::Builder::new(); let h2 = h2_04::client::Builder::new();
let (mut send, conn) = h2.handshake::<_, Bytes>(tls_connector).await?; let (mut send, conn) = h2.handshake::<_, Bytes>(tls_connector).await?;
spawn(conn); spawn(conn);
let mut request = Request::builder(); let mut request = Request::builder();
@ -281,13 +280,13 @@ async fn handshake_http2_wss(
request = request.uri(uri); request = request.uri(uri);
request = request =
populate_common_request_headers(request, user_agent, protocols, headers)?; populate_common_request_headers(request, user_agent, protocols, headers)?;
request = request.extension(h2::ext::Protocol::from("websocket")); request = request.extension(h2_04::ext::Protocol::from("websocket"));
let (resp, send) = send.send_request(request.body(())?, false)?; let (resp, send) = send.send_request(request.body(())?, false)?;
let resp = resp.await?; let resp = resp.await?;
if resp.status() != StatusCode::OK { if resp.status() != StatusCode::OK {
bail!("Invalid status code: {}", resp.status()); bail!("Invalid status code: {}", resp.status());
} }
let (http::response::Parts { headers, .. }, recv) = resp.into_parts(); let (http_1::response::Parts { headers, .. }, recv) = resp.into_parts();
let mut stream = WebSocket::after_handshake( let mut stream = WebSocket::after_handshake(
WebSocketStream::new(stream::WsStreamKind::H2(send, recv), None), WebSocketStream::new(stream::WsStreamKind::H2(send, recv), None),
Role::Client, Role::Client,
@ -302,11 +301,12 @@ async fn handshake_http2_wss(
async fn handshake_connection< async fn handshake_connection<
S: AsyncRead + AsyncWrite + Send + Unpin + 'static, S: AsyncRead + AsyncWrite + Send + Unpin + 'static,
>( >(
request: Request<Body>, request: Request<http_body_util::Empty<Bytes>>,
socket: S, socket: S,
) -> Result<(WebSocket<WebSocketStream>, http::HeaderMap), AnyError> { ) -> Result<(WebSocket<WebSocketStream>, http_1::HeaderMap), AnyError> {
let (upgraded, response) = let (upgraded, response) =
fastwebsockets::handshake::client(&LocalExecutor, request, socket).await?; fastwebsockets_06::handshake::client(&LocalExecutor, request, socket)
.await?;
let upgraded = upgraded.into_inner(); let upgraded = upgraded.into_inner();
let stream = let stream =
@ -340,11 +340,11 @@ pub fn create_ws_client_config(
/// Headers common to both http/1.1 and h2 requests. /// Headers common to both http/1.1 and h2 requests.
fn populate_common_request_headers( fn populate_common_request_headers(
mut request: http::request::Builder, mut request: http_1::request::Builder,
user_agent: &str, user_agent: &str,
protocols: &str, protocols: &str,
headers: &Option<Vec<(ByteString, ByteString)>>, headers: &Option<Vec<(ByteString, ByteString)>>,
) -> Result<http::request::Builder, AnyError> { ) -> Result<http_1::request::Builder, AnyError> {
request = request request = request
.header("User-Agent", user_agent) .header("User-Agent", user_agent)
.header("Sec-WebSocket-Version", "13"); .header("Sec-WebSocket-Version", "13");
@ -362,14 +362,14 @@ fn populate_common_request_headers(
let is_disallowed_header = matches!( let is_disallowed_header = matches!(
name, name,
http::header::HOST http_1::header::HOST
| http::header::SEC_WEBSOCKET_ACCEPT | http_1::header::SEC_WEBSOCKET_ACCEPT
| http::header::SEC_WEBSOCKET_EXTENSIONS | http_1::header::SEC_WEBSOCKET_EXTENSIONS
| http::header::SEC_WEBSOCKET_KEY | http_1::header::SEC_WEBSOCKET_KEY
| http::header::SEC_WEBSOCKET_PROTOCOL | http_1::header::SEC_WEBSOCKET_PROTOCOL
| http::header::SEC_WEBSOCKET_VERSION | http_1::header::SEC_WEBSOCKET_VERSION
| http::header::UPGRADE | http_1::header::UPGRADE
| http::header::CONNECTION | http_1::header::CONNECTION
); );
if !is_disallowed_header { if !is_disallowed_header {
request = request.header(name, v); request = request.header(name, v);
@ -892,7 +892,7 @@ pub fn get_network_error_class_name(e: &AnyError) -> Option<&'static str> {
#[derive(Clone)] #[derive(Clone)]
struct LocalExecutor; struct LocalExecutor;
impl<Fut> hyper::rt::Executor<Fut> for LocalExecutor impl<Fut> hyper1::rt::Executor<Fut> for LocalExecutor
where where
Fut: Future + 'static, Fut: Future + 'static,
Fut::Output: 'static, Fut::Output: 'static,

View file

@ -2,9 +2,10 @@
use bytes::Buf; use bytes::Buf;
use bytes::Bytes; use bytes::Bytes;
use deno_net::raw::NetworkStream; use deno_net::raw::NetworkStream;
use h2::RecvStream; use h2_04::RecvStream;
use h2::SendStream; use h2_04::SendStream;
use hyper::upgrade::Upgraded; use hyper1::upgrade::Upgraded;
use hyper_util::rt::TokioIo;
use std::io::ErrorKind; use std::io::ErrorKind;
use std::pin::Pin; use std::pin::Pin;
use std::task::ready; use std::task::ready;
@ -15,7 +16,7 @@ use tokio::io::ReadBuf;
// TODO(bartlomieju): remove this // TODO(bartlomieju): remove this
pub(crate) enum WsStreamKind { pub(crate) enum WsStreamKind {
Upgraded(Upgraded), Upgraded(TokioIo<Upgraded>),
Network(NetworkStream), Network(NetworkStream),
H2(SendStream<Bytes>, RecvStream), H2(SendStream<Bytes>, RecvStream),
} }