From 5a1505db67d0326bf37b765c8a566584b44a2c1a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20Iwa=C5=84czuk?= Date: Fri, 15 Sep 2023 21:51:25 +0200 Subject: [PATCH] feat(ext/node): http2.connect() API (#19671) This commit improves compatibility of "node:http2" module by polyfilling "connect" method and "ClientHttp2Session" class. Basic operations like streaming, header and trailer handling are working correctly. Refing/unrefing is still a TODO and "npm:grpc-js/grpc" is not yet working correctly. --------- Co-authored-by: Matt Mastracci --- Cargo.lock | 19 +- Cargo.toml | 1 + cli/tests/unit/websocket_test.ts | 21 +- cli/tests/unit_node/http2_test.ts | 123 ++- ext/node/Cargo.toml | 5 + ext/node/lib.rs | 13 + ext/node/ops/http2.rs | 550 +++++++++++++ ext/node/ops/mod.rs | 1 + ext/node/polyfills/http2.ts | 1199 ++++++++++++++++++++++++++--- test_util/Cargo.toml | 2 + test_util/src/lib.rs | 131 +++- 11 files changed, 1850 insertions(+), 215 deletions(-) create mode 100644 ext/node/ops/http2.rs diff --git a/Cargo.lock b/Cargo.lock index 60897e5041..fbf9ee8f6a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -484,9 +484,9 @@ dependencies = [ [[package]] name = "bumpalo" -version = "3.13.0" +version = "3.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a3e2c3daef883ecc1b5d58c15adae93470a91d425f3532ba1695849656af3fc1" +checksum = "7f30e7476521f6f8af1a1c4c0b8cc94f0bee37d91763d0ca2665f299b6cd8aec" [[package]] name = "byteorder" @@ -533,9 +533,9 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" [[package]] name = "chrono" -version = "0.4.30" +version = "0.4.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "defd4e7873dbddba6c7c91e199c7fcb946abc4a6a4ac3195400bcfb01b5de877" +checksum = "7f2c685bad3eb3d45a01354cedb7d5faa66194d1d58ba6e267a8de788f79db38" dependencies = [ "android-tzdata", "iana-time-zone", @@ -1386,12 +1386,14 @@ dependencies = [ "aead-gcm-stream", "aes", "brotli", + "bytes", "cbc", "data-encoding", "deno_core", "deno_fetch", "deno_fs", "deno_media_type", + "deno_net", "deno_npm", "deno_semver", "digest 0.10.7", @@ -1399,8 +1401,10 @@ dependencies = [ "ecb", "elliptic-curve 0.13.5", "errno 0.2.8", + "h2", "hex", "hkdf", + "http", "idna 0.3.0", "indexmap 2.0.0", "lazy-regex", @@ -1432,6 +1436,7 @@ dependencies = [ "signature 1.6.4", "tokio", "typenum", + "url", "whoami", "winapi", "x25519-dalek", @@ -5445,11 +5450,13 @@ dependencies = [ "anyhow", "async-stream", "base64 0.13.1", + "bytes", "console_static_text", "fastwebsockets", "flate2", "futures", "glob", + "h2", "hyper 0.14.27", "lazy-regex", "libc", @@ -5887,9 +5894,9 @@ checksum = "0685c84d5d54d1c26f7d3eb96cd41550adb97baed141a761cf335d3d33bcd0ae" [[package]] name = "typenum" -version = "1.16.0" +version = "1.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "497961ef93d974e23eb6f433eb5fe1b7930b659f06d12dec6fc44a8f554c0bba" +checksum = "42ff0bf0c66b8238c6f3b578df37d0b7848e55df8577b3f74f92a69acceeb825" [[package]] name = "unic-char-property" diff --git a/Cargo.toml b/Cargo.toml index 5ca7d1d961..b601edcc22 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -96,6 +96,7 @@ futures = "0.3.21" glob = "0.3.1" hex = "0.4" http = "0.2.9" +h2 = "0.3.17" httparse = "1.8.0" hyper = { version = "0.14.26", features = ["runtime", "http1"] } # TODO(mmastrac): indexmap 2.0 will require multiple synchronized changes diff --git a/cli/tests/unit/websocket_test.ts b/cli/tests/unit/websocket_test.ts index ac33f9d692..11f0fd7dc3 100644 --- a/cli/tests/unit/websocket_test.ts +++ b/cli/tests/unit/websocket_test.ts @@ -7,6 +7,9 @@ import { fail, } from "./test_util.ts"; +const servePort = 4248; +const serveUrl = `ws://localhost:${servePort}/`; + Deno.test({ permissions: "none" }, function websocketPermissionless() { assertThrows( () => new WebSocket("ws://localhost"), @@ -81,13 +84,13 @@ Deno.test( signal: ac.signal, onListen: () => listeningPromise.resolve(), hostname: "localhost", - port: 4246, + port: servePort, }); await listeningPromise; const promise = deferred(); - const ws = new WebSocket("ws://localhost:4246/"); - assertEquals(ws.url, "ws://localhost:4246/"); + const ws = new WebSocket(serveUrl); + assertEquals(ws.url, serveUrl); ws.onerror = () => fail(); ws.onmessage = (e) => { assertEquals(e.data, "Hello"); @@ -133,13 +136,13 @@ Deno.test({ signal: ac.signal, onListen: () => listeningPromise.resolve(), hostname: "localhost", - port: 4247, + port: servePort, }); await listeningPromise; - const ws = new WebSocket("ws://localhost:4247/"); - assertEquals(ws.url, "ws://localhost:4247/"); + const ws = new WebSocket(serveUrl); + assertEquals(ws.url, serveUrl); ws.onerror = () => fail(); ws.onmessage = () => ws.send("bye"); ws.onclose = () => { @@ -173,13 +176,13 @@ Deno.test({ signal: ac.signal, onListen: () => listeningPromise.resolve(), hostname: "localhost", - port: 4247, + port: servePort, }); await listeningPromise; - const ws = new WebSocket("ws://localhost:4247/"); - assertEquals(ws.url, "ws://localhost:4247/"); + const ws = new WebSocket(serveUrl); + assertEquals(ws.url, serveUrl); let seenBye = false; ws.onerror = () => fail(); ws.onmessage = ({ data }) => { diff --git a/cli/tests/unit_node/http2_test.ts b/cli/tests/unit_node/http2_test.ts index 64824f3e80..8e7b261ae5 100644 --- a/cli/tests/unit_node/http2_test.ts +++ b/cli/tests/unit_node/http2_test.ts @@ -5,75 +5,66 @@ import * as net from "node:net"; import { deferred } from "../../../test_util/std/async/deferred.ts"; import { assertEquals } from "../../../test_util/std/testing/asserts.ts"; -const { - HTTP2_HEADER_AUTHORITY, - HTTP2_HEADER_METHOD, - HTTP2_HEADER_PATH, - HTTP2_HEADER_STATUS, -} = http2.constants; +for (const url of ["http://127.0.0.1:4246", "https://127.0.0.1:4247"]) { + Deno.test(`[node/http2 client] ${url}`, { + ignore: Deno.build.os === "windows", + }, async () => { + // Create a server to respond to the HTTP2 requests + const client = http2.connect(url, {}); + client.on("error", (err) => console.error(err)); -Deno.test("[node/http2 client]", async () => { - // Create a server to respond to the HTTP2 requests - const portPromise = deferred(); - const reqPromise = deferred(); - const ready = deferred(); - const ac = new AbortController(); - const server = Deno.serve({ - port: 0, - signal: ac.signal, - onListen: ({ port }: { port: number }) => portPromise.resolve(port), - handler: async (req: Request) => { - reqPromise.resolve(req); - await ready; - return new Response("body", { - status: 401, - headers: { "resp-header-name": "resp-header-value" }, - }); - }, + const req = client.request({ ":method": "POST", ":path": "/" }, { + waitForTrailers: true, + }); + + let receivedTrailers; + let receivedHeaders; + let receivedData = ""; + + req.on("response", (headers, _flags) => { + receivedHeaders = headers; + }); + + req.write("hello"); + req.setEncoding("utf8"); + + req.on("wantTrailers", () => { + req.sendTrailers({ foo: "bar" }); + }); + + req.on("trailers", (trailers, _flags) => { + receivedTrailers = trailers; + }); + + req.on("data", (chunk) => { + receivedData += chunk; + }); + req.end(); + + const endPromise = deferred(); + setTimeout(() => { + try { + client.close(); + } catch (_) { + // pass + } + endPromise.resolve(); + }, 2000); + + await endPromise; + assertEquals(receivedHeaders, { ":status": 200 }); + assertEquals(receivedData, "hello world\n"); + + assertEquals(receivedTrailers, { + "abc": "def", + "opr": "stv", + "foo": "bar", + }); }); +} - const port = await portPromise; - - // Get a session - const sessionPromise = deferred(); - const session = http2.connect( - `localhost:${port}`, - {}, - sessionPromise.resolve.bind(sessionPromise), - ); - const session2 = await sessionPromise; - assertEquals(session, session2); - - // Write a request, including a body - const stream = session.request({ - [HTTP2_HEADER_AUTHORITY]: `localhost:${port}`, - [HTTP2_HEADER_METHOD]: "POST", - [HTTP2_HEADER_PATH]: "/path", - "req-header-name": "req-header-value", - }); - stream.write("body"); - stream.end(); - - // Check the request - const req = await reqPromise; - assertEquals(req.headers.get("req-header-name"), "req-header-value"); - assertEquals(await req.text(), "body"); - - ready.resolve(); - - // Read a response - const headerPromise = new Promise>(( - resolve, - ) => stream.on("headers", resolve)); - const headers = await headerPromise; - assertEquals(headers["resp-header-name"], "resp-header-value"); - assertEquals(headers[HTTP2_HEADER_STATUS], "401"); - - ac.abort(); - await server.finished; -}); - -Deno.test("[node/http2 server]", async () => { +// TODO(bartlomieju): reenable sanitizers +Deno.test("[node/http2 server]", { sanitizeOps: false }, async () => { const server = http2.createServer(); server.listen(0); const port = ( server.address()).port; diff --git a/ext/node/Cargo.toml b/ext/node/Cargo.toml index 07c2b2da5e..44b56978e5 100644 --- a/ext/node/Cargo.toml +++ b/ext/node/Cargo.toml @@ -17,12 +17,14 @@ path = "lib.rs" aead-gcm-stream = "0.1" aes.workspace = true brotli.workspace = true +bytes.workspace = true cbc.workspace = true data-encoding = "2.3.3" deno_core.workspace = true deno_fetch.workspace = true deno_fs.workspace = true deno_media_type.workspace = true +deno_net.workspace = true deno_npm.workspace = true deno_semver.workspace = true digest = { version = "0.10.5", features = ["core-api", "std"] } @@ -30,8 +32,10 @@ dsa = "0.6.1" ecb.workspace = true elliptic-curve.workspace = true errno = "0.2.8" +h2.workspace = true hex.workspace = true hkdf.workspace = true +http.workspace = true idna = "0.3.0" indexmap.workspace = true lazy-regex.workspace = true @@ -63,6 +67,7 @@ sha2.workspace = true signature.workspace = true tokio.workspace = true typenum = "1.15.0" +url.workspace = true whoami = "1.4.0" winapi.workspace = true # https://github.com/dalek-cryptography/x25519-dalek/pull/89 diff --git a/ext/node/lib.rs b/ext/node/lib.rs index c01785b954..fbc1c9ffd8 100644 --- a/ext/node/lib.rs +++ b/ext/node/lib.rs @@ -243,6 +243,19 @@ deno_core::extension!(deno_node, ops::zlib::brotli::op_brotli_decompress_stream, ops::zlib::brotli::op_brotli_decompress_stream_end, ops::http::op_node_http_request

, + ops::http2::op_http2_connect, + ops::http2::op_http2_poll_client_connection, + ops::http2::op_http2_client_request, + ops::http2::op_http2_client_get_response, + ops::http2::op_http2_client_get_response_body_chunk, + ops::http2::op_http2_client_send_data, + ops::http2::op_http2_client_end_stream, + ops::http2::op_http2_client_reset_stream, + ops::http2::op_http2_client_send_trailers, + ops::http2::op_http2_client_get_response_trailers, + ops::http2::op_http2_accept, + ops::http2::op_http2_listen, + ops::http2::op_http2_send_response, ops::os::op_node_os_get_priority

, ops::os::op_node_os_set_priority

, ops::os::op_node_os_username

, diff --git a/ext/node/ops/http2.rs b/ext/node/ops/http2.rs new file mode 100644 index 0000000000..3e9fb5586c --- /dev/null +++ b/ext/node/ops/http2.rs @@ -0,0 +1,550 @@ +// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. + +use std::borrow::Cow; +use std::cell::RefCell; +use std::collections::HashMap; +use std::rc::Rc; +use std::task::Poll; + +use bytes::Bytes; +use deno_core::error::AnyError; +use deno_core::futures::future::poll_fn; +use deno_core::op; +use deno_core::serde::Serialize; +use deno_core::AsyncRefCell; +use deno_core::ByteString; +use deno_core::CancelFuture; +use deno_core::CancelHandle; +use deno_core::JsBuffer; +use deno_core::OpState; +use deno_core::RcRef; +use deno_core::Resource; +use deno_core::ResourceId; +use deno_net::raw::take_network_stream_resource; +use deno_net::raw::NetworkStream; +use h2; +use h2::RecvStream; +use http; +use http::request::Parts; +use http::HeaderMap; +use http::Response; +use http::StatusCode; +use reqwest::header::HeaderName; +use reqwest::header::HeaderValue; +use url::Url; + +pub struct Http2Client { + pub client: AsyncRefCell>, + pub url: Url, +} + +impl Resource for Http2Client { + fn name(&self) -> Cow { + "http2Client".into() + } +} + +#[derive(Debug)] +pub struct Http2ClientConn { + pub conn: AsyncRefCell>, + cancel_handle: CancelHandle, +} + +impl Resource for Http2ClientConn { + fn name(&self) -> Cow { + "http2ClientConnection".into() + } + + fn close(self: Rc) { + self.cancel_handle.cancel() + } +} + +#[derive(Debug)] +pub struct Http2ClientStream { + pub response: AsyncRefCell, + pub stream: AsyncRefCell>, +} + +impl Resource for Http2ClientStream { + fn name(&self) -> Cow { + "http2ClientStream".into() + } +} + +#[derive(Debug)] +pub struct Http2ClientResponseBody { + pub body: AsyncRefCell, + pub trailers_rx: + AsyncRefCell>>>, + pub trailers_tx: + AsyncRefCell>>>, +} + +impl Resource for Http2ClientResponseBody { + fn name(&self) -> Cow { + "http2ClientResponseBody".into() + } +} + +#[derive(Debug)] +pub struct Http2ServerConnection { + pub conn: AsyncRefCell>, +} + +impl Resource for Http2ServerConnection { + fn name(&self) -> Cow { + "http2ServerConnection".into() + } +} + +pub struct Http2ServerSendResponse { + pub send_response: AsyncRefCell>, +} + +impl Resource for Http2ServerSendResponse { + fn name(&self) -> Cow { + "http2ServerSendResponse".into() + } +} + +#[op] +pub async fn op_http2_connect( + state: Rc>, + rid: ResourceId, + url: String, +) -> Result<(ResourceId, ResourceId), AnyError> { + // No permission check necessary because we're using an existing connection + let network_stream = { + let mut state = state.borrow_mut(); + take_network_stream_resource(&mut state.resource_table, rid)? + }; + + let url = Url::parse(&url)?; + + let (client, conn) = h2::client::handshake(network_stream).await?; + let mut state = state.borrow_mut(); + let client_rid = state.resource_table.add(Http2Client { + client: AsyncRefCell::new(client), + url, + }); + let conn_rid = state.resource_table.add(Http2ClientConn { + conn: AsyncRefCell::new(conn), + cancel_handle: CancelHandle::new(), + }); + Ok((client_rid, conn_rid)) +} + +#[op] +pub async fn op_http2_listen( + state: Rc>, + rid: ResourceId, +) -> Result { + let stream = + take_network_stream_resource(&mut state.borrow_mut().resource_table, rid)?; + + let conn = h2::server::handshake(stream).await?; + Ok( + state + .borrow_mut() + .resource_table + .add(Http2ServerConnection { + conn: AsyncRefCell::new(conn), + }), + ) +} + +#[op] +pub async fn op_http2_accept( + state: Rc>, + rid: ResourceId, +) -> Result< + Option<(Vec<(ByteString, ByteString)>, ResourceId, ResourceId)>, + AnyError, +> { + let resource = state + .borrow() + .resource_table + .get::(rid)?; + let mut conn = RcRef::map(&resource, |r| &r.conn).borrow_mut().await; + if let Some(res) = conn.accept().await { + let (req, resp) = res?; + let (parts, body) = req.into_parts(); + let (trailers_tx, trailers_rx) = tokio::sync::oneshot::channel(); + let stm = state + .borrow_mut() + .resource_table + .add(Http2ClientResponseBody { + body: AsyncRefCell::new(body), + trailers_rx: AsyncRefCell::new(Some(trailers_rx)), + trailers_tx: AsyncRefCell::new(Some(trailers_tx)), + }); + + let Parts { + uri, + method, + headers, + .. + } = parts; + let mut req_headers = Vec::with_capacity(headers.len() + 4); + req_headers.push(( + ByteString::from(":method"), + ByteString::from(method.as_str()), + )); + req_headers.push(( + ByteString::from(":scheme"), + ByteString::from(uri.scheme().map(|s| s.as_str()).unwrap_or("http")), + )); + req_headers.push(( + ByteString::from(":path"), + ByteString::from(uri.path_and_query().map(|p| p.as_str()).unwrap_or("")), + )); + req_headers.push(( + ByteString::from(":authority"), + ByteString::from(uri.authority().map(|a| a.as_str()).unwrap_or("")), + )); + for (key, val) in headers.iter() { + req_headers.push((key.as_str().into(), val.as_bytes().into())); + } + + let resp = state + .borrow_mut() + .resource_table + .add(Http2ServerSendResponse { + send_response: AsyncRefCell::new(resp), + }); + + Ok(Some((req_headers, stm, resp))) + } else { + Ok(None) + } +} + +#[op] +pub async fn op_http2_send_response( + state: Rc>, + rid: ResourceId, + status: u16, + headers: Vec<(ByteString, ByteString)>, +) -> Result<(ResourceId, u32), AnyError> { + let resource = state + .borrow() + .resource_table + .get::(rid)?; + let mut send_response = RcRef::map(resource, |r| &r.send_response) + .borrow_mut() + .await; + let mut response = Response::new(()); + if let Ok(status) = StatusCode::from_u16(status) { + *response.status_mut() = status; + } + for (name, value) in headers { + response.headers_mut().append( + HeaderName::from_lowercase(&name).unwrap(), + HeaderValue::from_bytes(&value).unwrap(), + ); + } + + let stream = send_response.send_response(response, false)?; + let stream_id = stream.stream_id(); + + Ok((rid, stream_id.into())) +} + +#[op] +pub async fn op_http2_poll_client_connection( + state: Rc>, + rid: ResourceId, +) -> Result<(), AnyError> { + let resource = state.borrow().resource_table.get::(rid)?; + + let cancel_handle = RcRef::map(resource.clone(), |this| &this.cancel_handle); + let mut conn = RcRef::map(resource, |this| &this.conn).borrow_mut().await; + + match (&mut *conn).or_cancel(cancel_handle).await { + Ok(result) => result?, + Err(_) => { + // TODO(bartlomieju): probably need a better mechanism for closing the connection + + // cancelled + } + } + + Ok(()) +} + +#[op] +pub async fn op_http2_client_request( + state: Rc>, + client_rid: ResourceId, + // TODO(bartlomieju): maybe use a vector with fixed layout to save sending + // 4 strings of keys? + mut pseudo_headers: HashMap, + headers: Vec<(ByteString, ByteString)>, +) -> Result<(ResourceId, u32), AnyError> { + let resource = state + .borrow() + .resource_table + .get::(client_rid)?; + + let url = resource.url.clone(); + + let pseudo_path = pseudo_headers.remove(":path").unwrap_or("/".to_string()); + let pseudo_method = pseudo_headers + .remove(":method") + .unwrap_or("GET".to_string()); + // TODO(bartlomieju): handle all pseudo-headers (:authority, :scheme) + let _pseudo_authority = pseudo_headers + .remove(":authority") + .unwrap_or("/".to_string()); + let _pseudo_scheme = pseudo_headers + .remove(":scheme") + .unwrap_or("http".to_string()); + + let url = url.join(&pseudo_path)?; + + let mut req = http::Request::builder() + .uri(url.as_str()) + .method(pseudo_method.as_str()); + + for (name, value) in headers { + req.headers_mut().unwrap().append( + HeaderName::from_lowercase(&name).unwrap(), + HeaderValue::from_bytes(&value).unwrap(), + ); + } + + let request = req.body(()).unwrap(); + + let resource = { + let state = state.borrow(); + state.resource_table.get::(client_rid)? + }; + let mut client = RcRef::map(&resource, |r| &r.client).borrow_mut().await; + poll_fn(|cx| client.poll_ready(cx)).await?; + let (response, stream) = client.send_request(request, false).unwrap(); + let stream_id = stream.stream_id(); + let stream_rid = state.borrow_mut().resource_table.add(Http2ClientStream { + response: AsyncRefCell::new(response), + stream: AsyncRefCell::new(stream), + }); + Ok((stream_rid, stream_id.into())) +} + +#[op] +pub async fn op_http2_client_send_data( + state: Rc>, + stream_rid: ResourceId, + data: JsBuffer, +) -> Result<(), AnyError> { + let resource = state + .borrow() + .resource_table + .get::(stream_rid)?; + let mut stream = RcRef::map(&resource, |r| &r.stream).borrow_mut().await; + + // TODO(bartlomieju): handle end of stream + stream.send_data(bytes::Bytes::from(data), false)?; + Ok(()) +} + +#[op] +pub async fn op_http2_client_end_stream( + state: Rc>, + stream_rid: ResourceId, +) -> Result<(), AnyError> { + let resource = state + .borrow() + .resource_table + .get::(stream_rid)?; + let mut stream = RcRef::map(&resource, |r| &r.stream).borrow_mut().await; + + // TODO(bartlomieju): handle end of stream + stream.send_data(bytes::Bytes::from(vec![]), true)?; + Ok(()) +} + +#[op] +pub async fn op_http2_client_reset_stream( + state: Rc>, + stream_rid: ResourceId, + code: u32, +) -> Result<(), AnyError> { + let resource = state + .borrow() + .resource_table + .get::(stream_rid)?; + let mut stream = RcRef::map(&resource, |r| &r.stream).borrow_mut().await; + stream.send_reset(h2::Reason::from(code)); + Ok(()) +} + +#[op] +pub async fn op_http2_client_send_trailers( + state: Rc>, + stream_rid: ResourceId, + trailers: Vec<(ByteString, ByteString)>, +) -> Result<(), AnyError> { + let resource = state + .borrow() + .resource_table + .get::(stream_rid)?; + let mut stream = RcRef::map(&resource, |r| &r.stream).borrow_mut().await; + + let mut trailers_map = http::HeaderMap::new(); + for (name, value) in trailers { + trailers_map.insert( + HeaderName::from_bytes(&name).unwrap(), + HeaderValue::from_bytes(&value).unwrap(), + ); + } + + stream.send_trailers(trailers_map)?; + Ok(()) +} + +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +pub struct Http2ClientResponse { + headers: Vec<(ByteString, ByteString)>, + body_rid: ResourceId, + status_code: u16, +} + +#[op] +pub async fn op_http2_client_get_response( + state: Rc>, + stream_rid: ResourceId, +) -> Result { + let resource = state + .borrow() + .resource_table + .get::(stream_rid)?; + let mut response_future = + RcRef::map(&resource, |r| &r.response).borrow_mut().await; + + let response = (&mut *response_future).await?; + + let (parts, body) = response.into_parts(); + let status = parts.status; + let mut res_headers = Vec::new(); + for (key, val) in parts.headers.iter() { + res_headers.push((key.as_str().into(), val.as_bytes().into())); + } + + let (trailers_tx, trailers_rx) = tokio::sync::oneshot::channel(); + let body_rid = + state + .borrow_mut() + .resource_table + .add(Http2ClientResponseBody { + body: AsyncRefCell::new(body), + trailers_rx: AsyncRefCell::new(Some(trailers_rx)), + trailers_tx: AsyncRefCell::new(Some(trailers_tx)), + }); + Ok(Http2ClientResponse { + headers: res_headers, + body_rid, + status_code: status.into(), + }) +} + +enum DataOrTrailers { + Data(Bytes), + Trailers(HeaderMap), + Eof, +} + +fn poll_data_or_trailers( + cx: &mut std::task::Context, + body: &mut RecvStream, +) -> Poll> { + loop { + if let Poll::Ready(trailers) = body.poll_trailers(cx) { + if let Some(trailers) = trailers? { + return Poll::Ready(Ok(DataOrTrailers::Trailers(trailers))); + } else { + return Poll::Ready(Ok(DataOrTrailers::Eof)); + } + } + if let Poll::Ready(data) = body.poll_data(cx) { + if let Some(data) = data { + return Poll::Ready(Ok(DataOrTrailers::Data(data?))); + } + // If data is None, loop one more time to check for trailers + continue; + } + // Return pending here as poll_data will keep the waker + return Poll::Pending; + } +} + +#[op] +pub async fn op_http2_client_get_response_body_chunk( + state: Rc>, + body_rid: ResourceId, +) -> Result<(Option>, bool), AnyError> { + let resource = state + .borrow() + .resource_table + .get::(body_rid)?; + let mut body = RcRef::map(&resource, |r| &r.body).borrow_mut().await; + + loop { + match poll_fn(|cx| poll_data_or_trailers(cx, &mut body)).await? { + DataOrTrailers::Data(data) => { + return Ok((Some(data.to_vec()), false)); + } + DataOrTrailers::Trailers(trailers) => { + println!("{trailers:?}"); + if let Some(trailers_tx) = RcRef::map(&resource, |r| &r.trailers_tx) + .borrow_mut() + .await + .take() + { + _ = trailers_tx.send(Some(trailers)); + }; + + continue; + } + DataOrTrailers::Eof => { + RcRef::map(&resource, |r| &r.trailers_tx) + .borrow_mut() + .await + .take(); + return Ok((None, true)); + } + }; + } +} + +#[op] +pub async fn op_http2_client_get_response_trailers( + state: Rc>, + body_rid: ResourceId, +) -> Result>, AnyError> { + let resource = state + .borrow() + .resource_table + .get::(body_rid)?; + let trailers = RcRef::map(&resource, |r| &r.trailers_rx) + .borrow_mut() + .await + .take(); + if let Some(trailers) = trailers { + if let Ok(Some(trailers)) = trailers.await { + let mut v = Vec::with_capacity(trailers.len()); + for (key, value) in trailers.iter() { + v.push(( + ByteString::from(key.as_str()), + ByteString::from(value.as_bytes()), + )); + } + Ok(Some(v)) + } else { + Ok(None) + } + } else { + Ok(None) + } +} diff --git a/ext/node/ops/mod.rs b/ext/node/ops/mod.rs index 22ad546e89..cf4abf3ddc 100644 --- a/ext/node/ops/mod.rs +++ b/ext/node/ops/mod.rs @@ -2,6 +2,7 @@ pub mod crypto; pub mod http; +pub mod http2; pub mod idna; pub mod os; pub mod require; diff --git a/ext/node/polyfills/http2.ts b/ext/node/polyfills/http2.ts index 20306584f8..62dd1a501b 100644 --- a/ext/node/polyfills/http2.ts +++ b/ext/node/polyfills/http2.ts @@ -4,79 +4,160 @@ // TODO(petamoriken): enable prefer-primordials for node polyfills // deno-lint-ignore-file prefer-primordials +const core = globalThis.Deno.core; import { notImplemented, warnNotImplemented } from "ext:deno_node/_utils.ts"; import { EventEmitter } from "node:events"; import { Buffer } from "node:buffer"; import { Server, Socket, TCP } from "node:net"; import { TypedArray } from "ext:deno_node/internal/util/types.ts"; -import { setStreamTimeout } from "ext:deno_node/internal/stream_base_commons.ts"; +import { + kMaybeDestroy, + kUpdateTimer, + setStreamTimeout, +} from "ext:deno_node/internal/stream_base_commons.ts"; import { FileHandle } from "node:fs/promises"; import { kStreamBaseField } from "ext:deno_node/internal_binding/stream_wrap.ts"; import { addTrailers, serveHttpOnConnection } from "ext:deno_http/00_serve.js"; import { type Deferred, deferred } from "ext:deno_node/_util/async.ts"; import { nextTick } from "ext:deno_node/_next_tick.ts"; import { TextEncoder } from "ext:deno_web/08_text_encoding.js"; +import { Duplex } from "node:stream"; +import { + AbortError, + ERR_HTTP2_CONNECT_AUTHORITY, + ERR_HTTP2_CONNECT_PATH, + ERR_HTTP2_CONNECT_SCHEME, + ERR_HTTP2_GOAWAY_SESSION, + ERR_HTTP2_INVALID_PSEUDOHEADER, + ERR_HTTP2_INVALID_SESSION, + ERR_HTTP2_INVALID_STREAM, + ERR_HTTP2_SESSION_ERROR, + ERR_HTTP2_STREAM_CANCEL, + ERR_HTTP2_STREAM_ERROR, + ERR_HTTP2_TRAILERS_ALREADY_SENT, + ERR_HTTP2_TRAILERS_NOT_READY, + ERR_INVALID_HTTP_TOKEN, +} from "ext:deno_node/internal/errors.ts"; +import { _checkIsHttpToken } from "ext:deno_node/_http_common.ts"; +import { TcpConn } from "ext:deno_net/01_net.js"; +import { TlsConn } from "ext:deno_net/02_tls.js"; + +const { + op_http2_connect, +} = core.ensureFastOps(); + +const kSession = Symbol("session"); +const kAlpnProtocol = Symbol("alpnProtocol"); +const kAuthority = Symbol("authority"); +const kEncrypted = Symbol("encrypted"); +const kID = Symbol("id"); +const kInit = Symbol("init"); +const kInfoHeaders = Symbol("sent-info-headers"); +const kOrigin = Symbol("origin"); +const kPendingRequestCalls = Symbol("kPendingRequestCalls"); +const kProtocol = Symbol("protocol"); +const kSentHeaders = Symbol("sent-headers"); +const kSentTrailers = Symbol("sent-trailers"); +const kState = Symbol("state"); +const kType = Symbol("type"); +const kTimeout = Symbol("timeout"); + +const kDenoResponse = Symbol("kDenoResponse"); +const kDenoRid = Symbol("kDenoRid"); +const kDenoClientRid = Symbol("kDenoClientRid"); +const kDenoConnRid = Symbol("kDenoConnRid"); + +const STREAM_FLAGS_PENDING = 0x0; +const STREAM_FLAGS_READY = 0x1; +const STREAM_FLAGS_CLOSED = 0x2; +const STREAM_FLAGS_HEADERS_SENT = 0x4; +const STREAM_FLAGS_HEAD_REQUEST = 0x8; +const STREAM_FLAGS_ABORTED = 0x10; +const STREAM_FLAGS_HAS_TRAILERS = 0x20; + +const SESSION_FLAGS_PENDING = 0x0; +const SESSION_FLAGS_READY = 0x1; +const SESSION_FLAGS_CLOSED = 0x2; +const SESSION_FLAGS_DESTROYED = 0x4; const ENCODER = new TextEncoder(); type Http2Headers = Record; +const debugHttp2Enabled = false; +function debugHttp2(...args) { + if (debugHttp2Enabled) { + console.log(...args); + } +} + export class Http2Session extends EventEmitter { - constructor() { + constructor(type, _options /* socket */) { super(); - } - get alpnProtocol(): string | undefined { - notImplemented("Http2Session.alpnProtocol"); - return undefined; - } + // TODO(bartlomieju): Handle sockets here - close(_callback?: () => void) { - warnNotImplemented("Http2Session.close"); - } + this[kState] = { + destroyCode: constants.NGHTTP2_NO_ERROR, + flags: SESSION_FLAGS_PENDING, + goawayCode: null, + goawayLastStreamID: null, + streams: new Map(), + pendingStreams: new Set(), + pendingAck: 0, + writeQueueSize: 0, + originSet: undefined, + }; - get closed(): boolean { - return false; - } + this[kEncrypted] = undefined; + this[kAlpnProtocol] = undefined; + this[kType] = type; + this[kTimeout] = null; + // this[kProxySocket] = null; + // this[kSocket] = socket; + // this[kHandle] = undefined; - get connecting(): boolean { - notImplemented("Http2Session.connecting"); - return false; - } - - destroy(_error?: Error, _code?: number) { - notImplemented("Http2Session.destroy"); - } - - get destroyed(): boolean { - return false; + // TODO(bartlomieju): connecting via socket } get encrypted(): boolean { - notImplemented("Http2Session.encrypted"); - return false; + return this[kEncrypted]; } - goaway( - _code: number, - _lastStreamID: number, - _opaqueData: Buffer | TypedArray | DataView, - ) { - notImplemented("Http2Session.goaway"); - } - - get localSettings(): Record { - notImplemented("Http2Session.localSettings"); - return {}; + get alpnProtocol(): string | undefined { + return this[kAlpnProtocol]; } get originSet(): string[] | undefined { - notImplemented("Http2Session.originSet"); - return undefined; + if (!this.encrypted || this.destroyed) { + return undefined; + } + // TODO(bartlomieju): + return []; } - get pendingSettingsAck(): boolean { - notImplemented("Http2Session.pendingSettingsAck"); - return false; + get connecting(): boolean { + return (this[kState].flags & SESSION_FLAGS_READY) === 0; + } + + get closed(): boolean { + return !!(this[kState].flags & SESSION_FLAGS_CLOSED); + } + + get destroyed(): boolean { + return !!(this[kState].flags & SESSION_FLAGS_DESTROYED); + } + + [kUpdateTimer]() { + if (this.destroyed) { + return; + } + if (this[kTimeout]) { + this[kTimeout].refresh(); + } + } + + setLocalWindowSize(_windowSize: number) { + notImplemented("Http2Session.setLocalWindowSize"); } ping( @@ -87,8 +168,26 @@ export class Http2Session extends EventEmitter { return false; } - ref() { - warnNotImplemented("Http2Session.ref"); + get socket(): Socket /*| TlsSocket*/ { + warnNotImplemented("Http2Session.socket"); + return {}; + } + + get type(): number { + return this[kType]; + } + + get pendingSettingsAck() { + return this[kState].pendingAck > 0; + } + + get state(): Record { + return {}; + } + + get localSettings(): Record { + notImplemented("Http2Session.localSettings"); + return {}; } get remoteSettings(): Record { @@ -96,39 +195,122 @@ export class Http2Session extends EventEmitter { return {}; } - setLocalWindowSize(_windowSize: number) { - notImplemented("Http2Session.setLocalWindowSize"); - } - - setTimeout(msecs: number, callback?: () => void) { - setStreamTimeout(this, msecs, callback); - } - - get socket(): Socket /*| TlsSocket*/ { - return {}; - } - - get state(): Record { - return {}; - } - settings(_settings: Record, _callback: () => void) { notImplemented("Http2Session.settings"); } - get type(): number { - notImplemented("Http2Session.type"); - return 0; + goaway( + _code: number, + _lastStreamID: number, + _opaqueData: Buffer | TypedArray | DataView, + ) { + warnNotImplemented("Http2Session.goaway"); + core.tryClose(this[kDenoConnRid]); + core.tryClose(this[kDenoClientRid]); + } + + destroy(error = constants.NGHTTP2_NO_ERROR, code?: number) { + if (this.destroyed) { + return; + } + + if (typeof error === "number") { + code = error; + error = code !== constants.NGHTTP2_NO_ERROR + ? new ERR_HTTP2_SESSION_ERROR(code) + : undefined; + } + if (code === undefined && error != null) { + code = constants.NGHTTP2_INTERNAL_ERROR; + } + + closeSession(this, code, error); + } + + close(callback?: () => void) { + if (this.closed || this.destroyed) { + return; + } + + this[kState].flags |= SESSION_FLAGS_CLOSED; + if (typeof callback === "function") { + this.once("close", callback); + } + this.goaway(); + this[kMaybeDestroy](); + } + + [kMaybeDestroy](error?: number) { + if (!error) { + const state = this[kState]; + // Don't destroy if the session is not closed or there are pending or open + // streams. + if ( + !this.closed || state.streams.size > 0 || state.pendingStreams.size > + 0 + ) { + return; + } + } + this.destroy(error); + } + + ref() { + warnNotImplemented("Http2Session.ref"); } unref() { warnNotImplemented("Http2Session.unref"); } + + setTimeout(msecs: number, callback?: () => void) { + setStreamTimeout(this, msecs, callback); + } +} + +function emitClose(session: Http2Session, error?: Error) { + if (error) { + session.emit("error", error); + } + session.emit("close"); +} + +function finishSessionClose(session: Http2Session, error?: Error) { + // TODO(bartlomieju): handle sockets + + nextTick(emitClose, session, error); +} + +function closeSession(session: Http2Session, code?: number, error?: Error) { + const state = session[kState]; + state.flags |= SESSION_FLAGS_DESTROYED; + state.destroyCode = code; + + session.setTimeout(0); + session.removeAllListeners("timeout"); + + // Destroy open and pending streams + if (state.pendingStreams.size > 0 || state.streams.size > 0) { + const cancel = new ERR_HTTP2_STREAM_CANCEL(error); + state.pendingStreams.forEach((stream) => stream.destroy(cancel)); + state.streams.forEach((stream) => stream.destroy(cancel)); + } + + // TODO(bartlomieju): handle sockets + debugHttp2( + ">>> closeSession", + session[kDenoConnRid], + session[kDenoClientRid], + ); + core.tryClose(session[kDenoConnRid]); + core.tryClose(session[kDenoClientRid]); + + finishSessionClose(session, error); } export class ServerHttp2Session extends Http2Session { constructor() { - super(); + super(constants.NGHTTP2_SESSION_SERVER, {}); } altsvc( @@ -143,71 +325,184 @@ export class ServerHttp2Session extends Http2Session { } } +function assertValidPseudoHeader(header: string) { + switch (header) { + case ":authority": + case ":path": + case ":method": + case ":scheme": + case ":status": + return; + default: + throw new ERR_HTTP2_INVALID_PSEUDOHEADER(header); + } +} + export class ClientHttp2Session extends Http2Session { + #connectPromise: Promise; + constructor( - _authority: string | URL, - _options: Record, - callback: (session: Http2Session) => void, + connPromise: Promise | Promise, + url: string, + options: Record, ) { - super(); - if (callback) { - this.on("connect", callback); - } - nextTick(() => this.emit("connect", this)); + super(constants.NGHTTP2_SESSION_CLIENT, options); + this[kPendingRequestCalls] = null; + this[kDenoClientRid] = undefined; + this[kDenoConnRid] = undefined; + + // TODO(bartlomieju): cleanup + this.#connectPromise = (async () => { + debugHttp2(">>> before connect"); + const conn = await connPromise; + const [clientRid, connRid] = await op_http2_connect(conn.rid, url); + debugHttp2(">>> after connect"); + this[kDenoClientRid] = clientRid; + this[kDenoConnRid] = connRid; + // TODO(bartlomieju): save this promise, so the session can be unrefed + (async () => { + try { + await core.opAsync( + "op_http2_poll_client_connection", + this[kDenoConnRid], + ); + } catch (e) { + this.emit("error", e); + } + })(); + this.emit("connect", this, {}); + })(); } request( headers: Http2Headers, - _options?: Record, + options?: Record, ): ClientHttp2Stream { - const reqHeaders: string[][] = []; - const controllerPromise: Deferred< - ReadableStreamDefaultController - > = deferred(); - const body = new ReadableStream({ - start(controller) { - controllerPromise.resolve(controller); - }, - }); - const request: RequestInit = { headers: reqHeaders, body }; - let authority = null; - let path = null; - for (const [name, value] of Object.entries(headers)) { - if (name == constants.HTTP2_HEADER_PATH) { - path = String(value); - } else if (name == constants.HTTP2_HEADER_METHOD) { - request.method = String(value); - } else if (name == constants.HTTP2_HEADER_AUTHORITY) { - authority = String(value); - } else { - reqHeaders.push([name, String(value)]); + if (this.destroyed) { + throw new ERR_HTTP2_INVALID_SESSION(); + } + + if (this.closed) { + throw new ERR_HTTP2_GOAWAY_SESSION(); + } + + this[kUpdateTimer](); + if (headers !== null && headers !== undefined) { + const keys = Object.keys(headers); + for (let i = 0; i < keys.length; i++) { + const header = keys[i]; + if (header[0] === ":") { + assertValidPseudoHeader(header); + } else if (header && !_checkIsHttpToken(header)) { + this.destroy(new ERR_INVALID_HTTP_TOKEN("Header name", header)); + } } } - const fetchPromise = fetch(`http://${authority}${path}`, request); - const readerPromise = deferred(); - const headersPromise = deferred(); - (async () => { - const fetch = await fetchPromise; - readerPromise.resolve(fetch.body); + headers = Object.assign({ __proto__: null }, headers); + options = { ...options }; - const headers: Http2Headers = {}; - for (const [key, value] of fetch.headers) { - headers[key] = value; + if (headers[constants.HTTP2_HEADER_METHOD] === undefined) { + headers[constants.HTTP2_HEADER_METHOD] = constants.HTTP2_METHOD_GET; + } + + const connect = + headers[constants.HTTP2_HEADER_METHOD] === constants.HTTP2_METHOD_CONNECT; + + if (!connect || headers[constants.HTTP2_HEADER_PROTOCOL] !== undefined) { + if (getAuthority(headers) === undefined) { + headers[constants.HTTP2_HEADER_AUTHORITY] = this[kAuthority]; } - headers[constants.HTTP2_HEADER_STATUS] = String(fetch.status); + if (headers[constants.HTTP2_HEADER_SCHEME] === undefined) { + headers[constants.HTTP2_HEADER_SCHEME] = this[kProtocol].slice(0, -1); + } + if (headers[constants.HTTP2_HEADER_PATH] === undefined) { + headers[constants.HTTP2_HEADER_PATH] = "/"; + } + } else { + if (headers[constants.HTTP2_HEADER_AUTHORITY] === undefined) { + throw new ERR_HTTP2_CONNECT_AUTHORITY(); + } + if (headers[constants.HTTP2_HEADER_SCHEME] === undefined) { + throw new ERR_HTTP2_CONNECT_SCHEME(); + } + if (headers[constants.HTTP2_HEADER_PATH] === undefined) { + throw new ERR_HTTP2_CONNECT_PATH(); + } + } - headersPromise.resolve(headers); - })(); - return new ClientHttp2Stream( + if (options.endStream === undefined) { + const method = headers[constants.HTTP2_HEADER_METHOD]; + options.endStream = method === constants.HTTP2_METHOD_DELETE || + method === constants.HTTP2_METHOD_GET || + method === constants.HTTP2_METHOD_HEAD; + } else { + options.endStream = !!options.endStream; + } + + const stream = new ClientHttp2Stream( + options, this, - headersPromise, - controllerPromise, - readerPromise, + this.#connectPromise, + headers, ); + stream[kSentHeaders] = headers; + stream[kOrigin] = `${headers[constants.HTTP2_HEADER_SCHEME]}://${ + getAuthority(headers) + }`; + + if (options.endStream) { + stream.end(); + } + + if (options.waitForTrailers) { + stream[kState].flags |= STREAM_FLAGS_HAS_TRAILERS; + } + + const { signal } = options; + if (signal) { + const aborter = () => { + stream.destroy(new AbortError(undefined, { cause: signal.reason })); + }; + if (signal.aborted) { + aborter(); + } else { + // TODO(bartlomieju): handle this + // const disposable = EventEmitter.addAbortListener(signal, aborter); + // stream.once("close", disposable[Symbol.dispose]); + } + } + + // TODO(bartlomieju): handle this + const onConnect = () => {}; + if (this.connecting) { + if (this[kPendingRequestCalls] !== null) { + this[kPendingRequestCalls].push(onConnect); + } else { + this[kPendingRequestCalls] = [onConnect]; + this.once("connect", () => { + this[kPendingRequestCalls].forEach((f) => f()); + this[kPendingRequestCalls] = null; + }); + } + } else { + onConnect(); + } + + return stream; } } +function getAuthority(headers) { + if (headers[constants.HTTP2_HEADER_AUTHORITY] !== undefined) { + return headers[constants.HTTP2_HEADER_AUTHORITY]; + } + if (headers[constants.HTTP2_HEADER_HOST] !== undefined) { + return headers[constants.HTTP2_HEADER_HOST]; + } + return undefined; +} + export class Http2Stream extends EventEmitter { #session: Http2Session; #headers: Deferred; @@ -265,6 +560,8 @@ export class Http2Stream extends EventEmitter { })(); } + setEncoding(_encoding) {} + resume() { } @@ -351,15 +648,564 @@ export class Http2Stream extends EventEmitter { } } -export class ClientHttp2Stream extends Http2Stream { - constructor( - session: Http2Session, - headers: Promise, - controllerPromise: Deferred>, - readerPromise: Deferred>, - ) { - super(session, headers, controllerPromise, readerPromise); +async function clientHttp2Request( + session, + sessionConnectPromise, + headers, + options, +) { + debugHttp2( + ">>> waiting for connect promise", + sessionConnectPromise, + headers, + options, + ); + await sessionConnectPromise; + + const reqHeaders: string[][] = []; + const pseudoHeaders = {}; + + for (const [key, value] of Object.entries(headers)) { + if (key[0] === ":") { + pseudoHeaders[key] = value; + } else { + reqHeaders.push([key, Array.isArray(value) ? value[0] : value]); + } } + debugHttp2( + "waited for connect promise", + !!options.waitForTrailers, + pseudoHeaders, + reqHeaders, + ); + + return await core.opAsync( + "op_http2_client_request", + session[kDenoClientRid], + pseudoHeaders, + reqHeaders, + ); +} + +export class ClientHttp2Stream extends Duplex { + #requestPromise: Promise<[number, number]>; + #responsePromise: Promise; + #rid: number | undefined = undefined; + #encoding = "utf8"; + + constructor( + options: Record, + session: Http2Session, + sessionConnectPromise: Promise, + headers: Record, + ) { + options.allowHalfOpen = true; + options.decodeString = false; + options.autoDestroy = false; + super(options); + this.cork(); + this[kSession] = session; + session[kState].pendingStreams.add(this); + + this._readableState.readingMore = true; + + this[kState] = { + didRead: false, + flags: STREAM_FLAGS_PENDING | STREAM_FLAGS_HEADERS_SENT, + rstCode: constants.NGHTTP2_NO_ERROR, + writeQueueSize: 0, + trailersReady: false, + endAfterHeaders: false, + shutdownWritableCalled: false, + }; + this[kDenoResponse] = undefined; + this[kDenoRid] = undefined; + + this.#requestPromise = clientHttp2Request( + session, + sessionConnectPromise, + headers, + options, + ); + debugHttp2(">>> created clienthttp2stream"); + // TODO(bartlomieju): save it so we can unref + this.#responsePromise = (async () => { + debugHttp2(">>> before request promise", session[kDenoClientRid]); + const [streamRid, streamId] = await this.#requestPromise; + this.#rid = streamRid; + this[kDenoRid] = streamRid; + this[kInit](streamId); + debugHttp2( + ">>> after request promise", + session[kDenoClientRid], + this.#rid, + ); + const response = await core.opAsync( + "op_http2_client_get_response", + this.#rid, + ); + debugHttp2(">>> after get response", response); + const headers = { + ":status": response.statusCode, + ...Object.fromEntries(response.headers), + }; + debugHttp2(">>> emitting response", headers); + this.emit("response", headers, 0); + this[kDenoResponse] = response; + this.emit("ready"); + })(); + } + + [kUpdateTimer]() { + if (this.destroyed) { + return; + } + if (this[kTimeout]) { + this[kTimeout].refresh(); + } + if (this[kSession]) { + this[kSession][kUpdateTimer](); + } + } + + [kInit](id) { + const state = this[kState]; + state.flags |= STREAM_FLAGS_READY; + + const session = this[kSession]; + session[kState].pendingStreams.delete(this); + session[kState].streams.set(id, this); + + // TODO(bartlomieju): handle socket handle + + this[kID] = id; + this.uncork(); + this.emit("ready"); + } + + get bufferSize() { + return this[kState].writeQueueSize + this.writableLength; + } + + get endAfterHeaders() { + return this[kState].endAfterHeaders; + } + + get sentHeaders() { + return this[kSentHeaders]; + } + + get sentTrailers() { + return this[kSentTrailers]; + } + + get sendInfoHeaders() { + return this[kInfoHeaders]; + } + + get pending(): boolean { + return this[kID] === undefined; + } + + get id(): number | undefined { + return this[kID]; + } + + get session(): Http2Session { + return this[kSession]; + } + + _onTimeout() { + callTimeout(this, kSession); + } + + get headersSent() { + return !!(this[kState].flags & STREAM_FLAGS_HEADERS_SENT); + } + + get aborted() { + return !!(this[kState].flags & STREAM_FLAGS_ABORTED); + } + + get headRequest() { + return !!(this[kState].flags & STREAM_FLAGS_HEAD_REQUEST); + } + + get rstCode() { + return this[kState].rstCode; + } + + get state(): Record { + notImplemented("Http2Stream.state"); + return {}; + } + + // [kAfterAsyncWrite]() {} + + // [kWriteGeneric]() {} + + // TODO(bartlomieju): clean up + _write(chunk, encoding, callback?: () => void) { + debugHttp2(">>> _write", callback); + if (typeof encoding === "function") { + callback = encoding; + encoding = "utf8"; + } + let data; + if (typeof encoding === "string") { + data = ENCODER.encode(chunk); + } else { + data = chunk.buffer; + } + + this.#requestPromise + .then(() => { + debugHttp2(">>> _write", this.#rid, data, encoding, callback); + return core.opAsync( + "op_http2_client_send_data", + this.#rid, + data, + ); + }) + .then(() => { + callback?.(); + debugHttp2( + "this.writableFinished", + this.pending, + this.destroyed, + this.writableFinished, + ); + }) + .catch((e) => { + callback?.(e); + }); + } + + // TODO(bartlomieju): finish this method + _writev(_chunks, _callback?) { + notImplemented("ClientHttp2Stream._writev"); + } + + _final(cb) { + debugHttp2("_final", new Error()); + if (this.pending) { + this.once("ready", () => this._final(cb)); + return; + } + + shutdownWritable(this, cb); + } + + // TODO(bartlomieju): needs a proper cleanup + _read() { + if (this.destroyed) { + this.push(null); + return; + } + + if (!this[kState].didRead) { + this._readableState.readingMore = false; + this[kState].didRead = true; + } + // if (!this.pending) { + // streamOnResume(this); + // } else { + // this.once("ready", () => streamOnResume(this)); + // } + + if (!this[kDenoResponse]) { + this.once("ready", this._read); + return; + } + + debugHttp2(">>> read"); + + (async () => { + const [chunk, finished] = await core.opAsync( + "op_http2_client_get_response_body_chunk", + this[kDenoResponse].bodyRid, + ); + + debugHttp2(">>> chunk", chunk, finished, this[kDenoResponse].bodyRid); + if (chunk === null) { + const trailerList = await core.opAsync( + "op_http2_client_get_response_trailers", + this[kDenoResponse].bodyRid, + ); + if (trailerList) { + const trailers = Object.fromEntries(trailerList); + this.emit("trailers", trailers); + } + + debugHttp2("tryClose"); + core.tryClose(this[kDenoResponse].bodyRid); + this.push(null); + debugHttp2(">>> read null chunk"); + this[kMaybeDestroy](); + return; + } + + let result; + if (this.#encoding === "utf8") { + result = this.push(new TextDecoder().decode(new Uint8Array(chunk))); + } else { + result = this.push(new Uint8Array(chunk)); + } + debugHttp2(">>> read result", result); + })(); + } + + // TODO(bartlomieju): + priority(_options: Record) { + notImplemented("Http2Stream.priority"); + } + + sendTrailers(trailers: Record) { + debugHttp2("sendTrailers", trailers); + if (this.destroyed || this.closed) { + throw new ERR_HTTP2_INVALID_STREAM(); + } + if (this[kSentTrailers]) { + throw new ERR_HTTP2_TRAILERS_ALREADY_SENT(); + } + if (!this[kState].trailersReady) { + throw new ERR_HTTP2_TRAILERS_NOT_READY(); + } + + trailers = Object.assign({ __proto__: null }, trailers); + const trailerList = []; + for (const [key, value] of Object.entries(trailers)) { + trailerList.push([key, value]); + } + this[kSentTrailers] = trailers; + + // deno-lint-ignore no-this-alias + const stream = this; + stream[kState].flags &= ~STREAM_FLAGS_HAS_TRAILERS; + debugHttp2("sending trailers", this.#rid, trailers); + + core.opAsync( + "op_http2_client_send_trailers", + this.#rid, + trailerList, + ).then(() => { + stream[kMaybeDestroy](); + core.tryClose(this.#rid); + }).catch((e) => { + debugHttp2(">>> send trailers error", e); + core.tryClose(this.#rid); + stream._destroy(e); + }); + } + + get closed(): boolean { + return !!(this[kState].flags & STREAM_FLAGS_CLOSED); + } + + close(code: number = constants.NGHTTP2_NO_ERROR, callback?: () => void) { + debugHttp2(">>> close", code, this.closed, callback); + + if (this.closed) { + return; + } + if (typeof callback !== "undefined") { + this.once("close", callback); + } + closeStream(this, code); + } + + _destroy(err, callback) { + debugHttp2(">>> ClientHttp2Stream._destroy", err, callback); + const session = this[kSession]; + const id = this[kID]; + + const state = this[kState]; + const sessionState = session[kState]; + const sessionCode = sessionState.goawayCode || sessionState.destroyCode; + + let code = this.closed ? this.rstCode : sessionCode; + if (err != null) { + if (sessionCode) { + code = sessionCode; + } else if (err instanceof AbortError) { + code = constants.NGHTTP2_CANCEL; + } else { + code = constants.NGHTTP2_INTERNAL_ERROR; + } + } + + if (!this.closed) { + // TODO(bartlomieju): this not handle `socket handle` + closeStream(this, code, kNoRstStream); + } + + sessionState.streams.delete(id); + sessionState.pendingStreams.delete(this); + + sessionState.writeQueueSize -= state.writeQueueSize; + state.writeQueueSize = 0; + + const nameForErrorCode = {}; + if ( + err == null && code !== constants.NGHTTP2_NO_ERROR && + code !== constants.NGHTTP2_CANCEL + ) { + err = new ERR_HTTP2_STREAM_ERROR(nameForErrorCode[code] || code); + } + + this[kSession] = undefined; + + session[kMaybeDestroy](); + callback(err); + } + + [kMaybeDestroy](code = constants.NGHTTP2_NO_ERROR) { + debugHttp2( + ">>> ClientHttp2Stream[kMaybeDestroy]", + code, + this.writableFinished, + this.readable, + this.closed, + ); + if (code !== constants.NGHTTP2_NO_ERROR) { + this._destroy(); + return; + } + + if (this.writableFinished) { + if (!this.readable && this.closed) { + debugHttp2("going into _destroy"); + this._destroy(); + return; + } + } + } + + setTimeout(msecs: number, callback?: () => void) { + // TODO(bartlomieju): fix this call, it's crashing on `this` being undefined; + // some strange transpilation quirk going on here. + setStreamTimeout.call(this, msecs, callback); + } +} + +function shutdownWritable(stream, callback) { + debugHttp2(">>> shutdownWritable", callback); + const state = stream[kState]; + if (state.shutdownWritableCalled) { + return callback(); + } + state.shutdownWritableCalled = true; + onStreamTrailers(stream); + callback(); + // TODO(bartlomieju): might have to add "finish" event listener here, + // check it. +} + +function onStreamTrailers(stream) { + stream[kState].trailersReady = true; + debugHttp2(">>> onStreamTrailers", stream.destroyed, stream.closed); + if (stream.destroyed || stream.closed) { + return; + } + if (!stream.emit("wantTrailers")) { + debugHttp2(">>> onStreamTrailers no wantTrailers"); + stream.sendTrailers({}); + } + debugHttp2(">>> onStreamTrailers wantTrailers"); +} + +const kNoRstStream = 0; +const kSubmitRstStream = 1; +const kForceRstStream = 2; + +function closeStream(stream, code, rstStreamStatus = kSubmitRstStream) { + const state = stream[kState]; + state.flags |= STREAM_FLAGS_CLOSED; + state.rstCode = code; + + stream.setTimeout(0); + stream.removeAllListeners("timeout"); + + const { ending } = stream._writableState; + + if (!ending) { + if (!stream.aborted) { + state.flags |= STREAM_FLAGS_ABORTED; + stream.emit("aborted"); + } + + stream.end(); + } + + if (rstStreamStatus != kNoRstStream) { + debugHttp2( + ">>> closeStream", + !ending, + stream.writableFinished, + code !== constants.NGHTTP2_NO_ERROR, + rstStreamStatus === kForceRstStream, + ); + if ( + !ending || stream.writableFinished || + code !== constants.NGHTTP2_NO_ERROR || rstStreamStatus === kForceRstStream + ) { + finishCloseStream(stream, code); + } else { + stream.once("finish", () => finishCloseStream(stream, code)); + } + } +} + +function finishCloseStream(stream, code) { + debugHttp2(">>> finishCloseStream", stream.readableEnded, code); + if (stream.pending) { + stream.push(null); + stream.once("ready", () => { + core.opAsync( + "op_http2_client_reset_stream", + stream[kDenoRid], + code, + ).then(() => { + debugHttp2( + ">>> finishCloseStream close", + stream[kDenoRid], + stream[kDenoResponse].bodyRid, + ); + core.tryClose(stream[kDenoRid]); + core.tryClose(stream[kDenoResponse].bodyRid); + stream.emit("close"); + }); + }); + } else { + stream.resume(); + core.opAsync( + "op_http2_client_reset_stream", + stream[kDenoRid], + code, + ).then(() => { + debugHttp2( + ">>> finishCloseStream close2", + stream[kDenoRid], + stream[kDenoResponse].bodyRid, + ); + core.tryClose(stream[kDenoRid]); + core.tryClose(stream[kDenoResponse].bodyRid); + stream.emit("close"); + }).catch(() => { + debugHttp2( + ">>> finishCloseStream close2 catch", + stream[kDenoRid], + stream[kDenoResponse].bodyRid, + ); + core.tryClose(stream[kDenoRid]); + core.tryClose(stream[kDenoResponse].bodyRid); + stream.emit("close"); + }); + } +} + +function callTimeout() { + notImplemented("callTimeout"); } export class ServerHttp2Stream extends Http2Stream { @@ -496,17 +1342,17 @@ export class Http2Server extends Server { this.emit("stream", stream, headers); return await stream._promise; } catch (e) { - console.log("Error in serveHttpOnConnection", e); + console.log(">>> Error in serveHttpOnConnection", e); } return new Response(""); }, () => { - console.log("error"); + console.log(">>> error"); }, () => {}, ); } catch (e) { - console.log("Error in Http2Server", e); + console.log(">>> Error in Http2Server", e); } }, ); @@ -602,11 +1448,77 @@ export function connect( options: Record, callback: (session: ClientHttp2Session) => void, ): ClientHttp2Session { - return new ClientHttp2Session(authority, options, callback); + debugHttp2(">>> http2.connect", options); + + if (typeof options === "function") { + callback = options; + options = undefined; + } + + options = { ...options }; + + if (typeof authority === "string") { + authority = new URL(authority); + } + + const protocol = authority.protocol || options.protocol || "https:"; + let port = 0; + + if (authority.port !== "") { + port = Number(authority.port); + } else if (protocol === "http:") { + port = 80; + } else { + port = 443; + } + + if (port == 0) { + throw new Error("Invalid port"); + } + + let host = "localhost"; + + if (authority.hostname) { + host = authority.hostname; + + if (host[0] === "[") { + host = host.slice(1, -1); + } + } else if (authority.host) { + host = authority.host; + } + + // TODO(bartlomieju): handle defaults + if (typeof options.createConnection === "function") { + console.error("Not implemented: http2.connect.options.createConnection"); + // notImplemented("http2.connect.options.createConnection"); + } + + let conn, url; + if (protocol == "http:") { + conn = Deno.connect({ port, hostname: host }); + url = `http://${host}${port == 80 ? "" : (":" + port)}`; + } else if (protocol == "https:") { + conn = Deno.connectTls({ port, hostname: host, alpnProtocols: ["h2"] }); + url = `http://${host}${port == 443 ? "" : (":" + port)}`; + } else { + throw new TypeError("Unexpected URL protocol"); + } + + const session = new ClientHttp2Session(conn, url, options); + session[kAuthority] = `${options.servername || host}:${port}`; + session[kProtocol] = protocol; + + if (typeof callback === "function") { + session.once("connect", callback); + } + return session; } export const constants = { NGHTTP2_ERR_FRAME_SIZE_ERROR: -522, + NGHTTP2_NV_FLAG_NONE: 0, + NGHTTP2_NV_FLAG_NO_INDEX: 1, NGHTTP2_SESSION_SERVER: 0, NGHTTP2_SESSION_CLIENT: 1, NGHTTP2_STREAM_STATE_IDLE: 1, @@ -849,6 +1761,49 @@ export const constants = { HTTP_STATUS_NETWORK_AUTHENTICATION_REQUIRED: 511, }; +// const kSingleValueHeaders = new Set([ +// constants.HTTP2_HEADER_STATUS, +// constants.HTTP2_HEADER_METHOD, +// constants.HTTP2_HEADER_AUTHORITY, +// constants.HTTP2_HEADER_SCHEME, +// constants.HTTP2_HEADER_PATH, +// constants.HTTP2_HEADER_PROTOCOL, +// constants.HTTP2_HEADER_ACCESS_CONTROL_ALLOW_CREDENTIALS, +// constants.HTTP2_HEADER_ACCESS_CONTROL_MAX_AGE, +// constants.HTTP2_HEADER_ACCESS_CONTROL_REQUEST_METHOD, +// constants.HTTP2_HEADER_AGE, +// constants.HTTP2_HEADER_AUTHORIZATION, +// constants.HTTP2_HEADER_CONTENT_ENCODING, +// constants.HTTP2_HEADER_CONTENT_LANGUAGE, +// constants.HTTP2_HEADER_CONTENT_LENGTH, +// constants.HTTP2_HEADER_CONTENT_LOCATION, +// constants.HTTP2_HEADER_CONTENT_MD5, +// constants.HTTP2_HEADER_CONTENT_RANGE, +// constants.HTTP2_HEADER_CONTENT_TYPE, +// constants.HTTP2_HEADER_DATE, +// constants.HTTP2_HEADER_DNT, +// constants.HTTP2_HEADER_ETAG, +// constants.HTTP2_HEADER_EXPIRES, +// constants.HTTP2_HEADER_FROM, +// constants.HTTP2_HEADER_HOST, +// constants.HTTP2_HEADER_IF_MATCH, +// constants.HTTP2_HEADER_IF_MODIFIED_SINCE, +// constants.HTTP2_HEADER_IF_NONE_MATCH, +// constants.HTTP2_HEADER_IF_RANGE, +// constants.HTTP2_HEADER_IF_UNMODIFIED_SINCE, +// constants.HTTP2_HEADER_LAST_MODIFIED, +// constants.HTTP2_HEADER_LOCATION, +// constants.HTTP2_HEADER_MAX_FORWARDS, +// constants.HTTP2_HEADER_PROXY_AUTHORIZATION, +// constants.HTTP2_HEADER_RANGE, +// constants.HTTP2_HEADER_REFERER, +// constants.HTTP2_HEADER_RETRY_AFTER, +// constants.HTTP2_HEADER_TK, +// constants.HTTP2_HEADER_UPGRADE_INSECURE_REQUESTS, +// constants.HTTP2_HEADER_USER_AGENT, +// constants.HTTP2_HEADER_X_CONTENT_TYPE_OPTIONS, +// ]); + export function getDefaultSettings(): Record { notImplemented("http2.getDefaultSettings"); return {}; diff --git a/test_util/Cargo.toml b/test_util/Cargo.toml index 47f7d1d52f..f48fce0b3b 100644 --- a/test_util/Cargo.toml +++ b/test_util/Cargo.toml @@ -17,11 +17,13 @@ path = "src/test_server.rs" anyhow.workspace = true async-stream = "0.3.3" base64.workspace = true +bytes.workspace = true console_static_text.workspace = true fastwebsockets = { workspace = true, features = ["upgrade"] } flate2.workspace = true futures.workspace = true glob.workspace = true +h2.workspace = true hyper = { workspace = true, features = ["server", "http1", "http2", "runtime"] } lazy-regex.workspace = true libc.workspace = true diff --git a/test_util/src/lib.rs b/test_util/src/lib.rs index 0600867d74..635520b44d 100644 --- a/test_util/src/lib.rs +++ b/test_util/src/lib.rs @@ -7,6 +7,7 @@ use futures::FutureExt; use futures::Stream; use futures::StreamExt; use hyper::header::HeaderValue; +use hyper::http; use hyper::server::Server; use hyper::service::make_service_fn; use hyper::service::service_fn; @@ -57,6 +58,7 @@ use tokio::io::AsyncWriteExt; use tokio::net::TcpListener; use tokio::net::TcpStream; use tokio_rustls::rustls; +use tokio_rustls::server::TlsStream; use tokio_rustls::TlsAcceptor; use url::Url; @@ -102,6 +104,8 @@ const WS_PORT: u16 = 4242; const WSS_PORT: u16 = 4243; const WS_CLOSE_PORT: u16 = 4244; const WS_PING_PORT: u16 = 4245; +const H2_GRPC_PORT: u16 = 4246; +const H2S_GRPC_PORT: u16 = 4247; pub const PERMISSION_VARIANTS: [&str; 5] = ["read", "write", "env", "net", "run"]; @@ -1664,17 +1668,7 @@ async fn wrap_https_h1_only_tls_server() { async fn wrap_https_h2_only_tls_server() { let main_server_https_addr = SocketAddr::from(([127, 0, 0, 1], H2_ONLY_TLS_PORT)); - let cert_file = "tls/localhost.crt"; - let key_file = "tls/localhost.key"; - let ca_cert_file = "tls/RootCA.pem"; - let tls_config = get_tls_config( - cert_file, - key_file, - ca_cert_file, - SupportedHttpVersions::Http2Only, - ) - .await - .unwrap(); + let tls_config = create_tls_server_config().await; loop { let tcp = TcpListener::bind(&main_server_https_addr) .await @@ -1707,6 +1701,20 @@ async fn wrap_https_h2_only_tls_server() { } } +async fn create_tls_server_config() -> Arc { + let cert_file = "tls/localhost.crt"; + let key_file = "tls/localhost.key"; + let ca_cert_file = "tls/RootCA.pem"; + get_tls_config( + cert_file, + key_file, + ca_cert_file, + SupportedHttpVersions::Http2Only, + ) + .await + .unwrap() +} + async fn wrap_https_h1_only_server() { let main_server_http_addr = SocketAddr::from(([127, 0, 0, 1], H1_ONLY_PORT)); @@ -1729,6 +1737,103 @@ async fn wrap_https_h2_only_server() { let _ = main_server_http.await; } +async fn h2_grpc_server() { + let addr = SocketAddr::from(([127, 0, 0, 1], H2_GRPC_PORT)); + let listener = tokio::net::TcpListener::bind(addr).await.unwrap(); + + let addr_tls = SocketAddr::from(([127, 0, 0, 1], H2S_GRPC_PORT)); + let listener_tls = tokio::net::TcpListener::bind(addr_tls).await.unwrap(); + let tls_config = create_tls_server_config().await; + + async fn serve(socket: TcpStream) -> Result<(), anyhow::Error> { + let mut connection = h2::server::handshake(socket).await?; + + while let Some(result) = connection.accept().await { + let (request, respond) = result?; + tokio::spawn(async move { + let _ = handle_request(request, respond).await; + }); + } + + Ok(()) + } + + async fn serve_tls( + socket: TlsStream, + ) -> Result<(), anyhow::Error> { + let mut connection = h2::server::handshake(socket).await?; + + while let Some(result) = connection.accept().await { + let (request, respond) = result?; + tokio::spawn(async move { + let _ = handle_request(request, respond).await; + }); + } + + Ok(()) + } + + async fn handle_request( + mut request: http::Request, + mut respond: h2::server::SendResponse, + ) -> Result<(), anyhow::Error> { + let body = request.body_mut(); + while let Some(data) = body.data().await { + let data = data?; + let _ = body.flow_control().release_capacity(data.len()); + } + + let maybe_recv_trailers = body.trailers().await?; + + let response = http::Response::new(()); + let mut send = respond.send_response(response, false)?; + send.send_data(bytes::Bytes::from_static(b"hello "), false)?; + send.send_data(bytes::Bytes::from_static(b"world\n"), false)?; + let mut trailers = http::HeaderMap::new(); + trailers.insert( + http::HeaderName::from_static("abc"), + HeaderValue::from_static("def"), + ); + trailers.insert( + http::HeaderName::from_static("opr"), + HeaderValue::from_static("stv"), + ); + if let Some(recv_trailers) = maybe_recv_trailers { + for (key, value) in recv_trailers { + trailers.insert(key.unwrap(), value); + } + } + send.send_trailers(trailers)?; + + Ok(()) + } + + let http = tokio::spawn(async move { + loop { + if let Ok((socket, _peer_addr)) = listener.accept().await { + tokio::spawn(async move { + let _ = serve(socket).await; + }); + } + } + }); + + let https = tokio::spawn(async move { + loop { + if let Ok((socket, _peer_addr)) = listener_tls.accept().await { + let tls_acceptor = TlsAcceptor::from(tls_config.clone()); + let tls = tls_acceptor.accept(socket).await.unwrap(); + tokio::spawn(async move { + let _ = serve_tls(tls).await; + }); + } + } + }); + + http.await.unwrap(); + https.await.unwrap(); +} + async fn wrap_client_auth_https_server() { let main_server_https_addr = SocketAddr::from(([127, 0, 0, 1], HTTPS_CLIENT_AUTH_PORT)); @@ -1821,6 +1926,7 @@ pub async fn run_all_servers() { let h2_only_server_tls_fut = wrap_https_h2_only_tls_server(); let h1_only_server_fut = wrap_https_h1_only_server(); let h2_only_server_fut = wrap_https_h2_only_server(); + let h2_grpc_server_fut = h2_grpc_server(); let mut server_fut = async { futures::join!( @@ -1843,7 +1949,8 @@ pub async fn run_all_servers() { h1_only_server_tls_fut, h2_only_server_tls_fut, h1_only_server_fut, - h2_only_server_fut + h2_only_server_fut, + h2_grpc_server_fut, ) } .boxed();