From cee221109a61406004817b09361bf3674ab06cde Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20Iwa=C5=84czuk?= Date: Thu, 12 Oct 2023 16:03:19 +0200 Subject: [PATCH] fix(node/http2): fixes to support grpc (#20712) This commit improves "node:http2" module implementation, by enabling to use "options.createConnection" callback when starting an HTTP2 session. This change enables to pass basic client-side test with "grpc-js/grpc" package. Smaller fixes like "Http2Session.unref()" and "Http2Session.setTimeout()" were handled as well. Fixes #16647 --- Cargo.lock | 4 +- cli/tests/unit_node/http2_test.ts | 46 ++++++++++ ext/node/polyfills/http2.ts | 144 ++++++++++++++++++++++++------ 3 files changed, 163 insertions(+), 31 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 51192db2f0..8afda07c2b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6301,9 +6301,9 @@ dependencies = [ [[package]] name = "v8" -version = "0.79.1" +version = "0.79.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b87d5248d1a7e321a264d21dc7839675fc0bb456e489102272a55b44047869f0" +checksum = "b15561535230812a1db89a696f1f16a12ae6c2c370c6b2241c68d4cb33963faf" dependencies = [ "bitflags 1.3.2", "fslock", diff --git a/cli/tests/unit_node/http2_test.ts b/cli/tests/unit_node/http2_test.ts index 8e7b261ae5..d0e0de43f1 100644 --- a/cli/tests/unit_node/http2_test.ts +++ b/cli/tests/unit_node/http2_test.ts @@ -63,6 +63,52 @@ for (const url of ["http://127.0.0.1:4246", "https://127.0.0.1:4247"]) { }); } +Deno.test(`[node/http2 client createConnection]`, { + ignore: Deno.build.os === "windows", +}, async () => { + const url = "http://127.0.0.1:4246"; + const createConnPromise = deferred(); + // Create a server to respond to the HTTP2 requests + const client = http2.connect(url, { + createConnection() { + const socket = net.connect({ host: "127.0.0.1", port: 4246 }); + + socket.on("connect", () => { + createConnPromise.resolve(); + }); + + return socket; + }, + }); + client.on("error", (err) => console.error(err)); + + const req = client.request({ ":method": "POST", ":path": "/" }); + + let receivedData = ""; + + req.write("hello"); + req.setEncoding("utf8"); + + req.on("data", (chunk) => { + receivedData += chunk; + }); + req.end(); + + const endPromise = deferred(); + setTimeout(() => { + try { + client.close(); + } catch (_) { + // pass + } + endPromise.resolve(); + }, 2000); + + await createConnPromise; + await endPromise; + assertEquals(receivedData, "hello world\n"); +}); + // TODO(bartlomieju): reenable sanitizers Deno.test("[node/http2 server]", { sanitizeOps: false }, async () => { const server = http2.createServer(); diff --git a/ext/node/polyfills/http2.ts b/ext/node/polyfills/http2.ts index 62dd1a501b..9ebdabb795 100644 --- a/ext/node/polyfills/http2.ts +++ b/ext/node/polyfills/http2.ts @@ -8,9 +8,11 @@ const core = globalThis.Deno.core; import { notImplemented, warnNotImplemented } from "ext:deno_node/_utils.ts"; import { EventEmitter } from "node:events"; import { Buffer } from "node:buffer"; -import { Server, Socket, TCP } from "node:net"; +import { connect as netConnect, Server, Socket, TCP } from "node:net"; +import { connect as tlsConnect } from "node:tls"; import { TypedArray } from "ext:deno_node/internal/util/types.ts"; import { + kHandle, kMaybeDestroy, kUpdateTimer, setStreamTimeout, @@ -36,11 +38,11 @@ import { ERR_HTTP2_STREAM_ERROR, ERR_HTTP2_TRAILERS_ALREADY_SENT, ERR_HTTP2_TRAILERS_NOT_READY, + ERR_HTTP2_UNSUPPORTED_PROTOCOL, ERR_INVALID_HTTP_TOKEN, + ERR_SOCKET_CLOSED, } from "ext:deno_node/internal/errors.ts"; import { _checkIsHttpToken } from "ext:deno_node/_http_common.ts"; -import { TcpConn } from "ext:deno_net/01_net.js"; -import { TlsConn } from "ext:deno_net/02_tls.js"; const { op_http2_connect, @@ -66,6 +68,7 @@ const kDenoResponse = Symbol("kDenoResponse"); const kDenoRid = Symbol("kDenoRid"); const kDenoClientRid = Symbol("kDenoClientRid"); const kDenoConnRid = Symbol("kDenoConnRid"); +const kPollConnPromiseId = Symbol("kPollConnPromiseId"); const STREAM_FLAGS_PENDING = 0x0; const STREAM_FLAGS_READY = 0x1; @@ -205,8 +208,12 @@ export class Http2Session extends EventEmitter { _opaqueData: Buffer | TypedArray | DataView, ) { warnNotImplemented("Http2Session.goaway"); - core.tryClose(this[kDenoConnRid]); - core.tryClose(this[kDenoClientRid]); + if (this[kDenoConnRid]) { + core.tryClose(this[kDenoConnRid]); + } + if (this[kDenoClientRid]) { + core.tryClose(this[kDenoClientRid]); + } } destroy(error = constants.NGHTTP2_NO_ERROR, code?: number) { @@ -264,7 +271,7 @@ export class Http2Session extends EventEmitter { } setTimeout(msecs: number, callback?: () => void) { - setStreamTimeout(this, msecs, callback); + setStreamTimeout.call(this, msecs, callback); } } @@ -302,8 +309,13 @@ function closeSession(session: Http2Session, code?: number, error?: Error) { session[kDenoConnRid], session[kDenoClientRid], ); - core.tryClose(session[kDenoConnRid]); - core.tryClose(session[kDenoClientRid]); + console.table(Deno.resources()); + if (session[kDenoConnRid]) { + core.tryClose(session[kDenoConnRid]); + } + if (session[kDenoClientRid]) { + core.tryClose(session[kDenoClientRid]); + } finishSessionClose(session, error); } @@ -340,9 +352,11 @@ function assertValidPseudoHeader(header: string) { export class ClientHttp2Session extends Http2Session { #connectPromise: Promise; + #refed = true; constructor( - connPromise: Promise | Promise, + // deno-lint-ignore no-explicit-any + socket: any, url: string, options: Record, ) { @@ -350,22 +364,42 @@ export class ClientHttp2Session extends Http2Session { this[kPendingRequestCalls] = null; this[kDenoClientRid] = undefined; this[kDenoConnRid] = undefined; + this[kPollConnPromiseId] = undefined; + + socket.on("error", socketOnError); + socket.on("close", socketOnClose); + const connPromise = new Promise((resolve) => { + const eventName = url.startsWith("https") ? "secureConnect" : "connect"; + socket.once(eventName, () => { + const rid = socket[kHandle][kStreamBaseField].rid; + nextTick(() => { + resolve(rid); + }); + }); + }); + socket[kSession] = this; // TODO(bartlomieju): cleanup this.#connectPromise = (async () => { debugHttp2(">>> before connect"); - const conn = await connPromise; - const [clientRid, connRid] = await op_http2_connect(conn.rid, url); - debugHttp2(">>> after connect"); + const connRid_ = await connPromise; + // console.log(">>>> awaited connRid", connRid_, url); + const [clientRid, connRid] = await op_http2_connect(connRid_, url); + debugHttp2(">>> after connect", clientRid, connRid); this[kDenoClientRid] = clientRid; this[kDenoConnRid] = connRid; - // TODO(bartlomieju): save this promise, so the session can be unrefed (async () => { try { - await core.opAsync( + const promise = core.opAsync( "op_http2_poll_client_connection", this[kDenoConnRid], ); + this[kPollConnPromiseId] = + promise[Symbol.for("Deno.core.internalPromiseId")]; + if (!this.#refed) { + this.unref(); + } + await promise; } catch (e) { this.emit("error", e); } @@ -374,6 +408,20 @@ export class ClientHttp2Session extends Http2Session { })(); } + ref() { + this.#refed = true; + if (this[kPollConnPromiseId]) { + core.refOp(this[kPollConnPromiseId]); + } + } + + unref() { + this.#refed = false; + if (this[kPollConnPromiseId]) { + core.unrefOp(this[kPollConnPromiseId]); + } + } + request( headers: Http2Headers, options?: Record, @@ -1190,7 +1238,9 @@ function finishCloseStream(stream, code) { ); core.tryClose(stream[kDenoRid]); core.tryClose(stream[kDenoResponse].bodyRid); - stream.emit("close"); + nextTick(() => { + stream.emit("close"); + }); }).catch(() => { debugHttp2( ">>> finishCloseStream close2 catch", @@ -1199,7 +1249,9 @@ function finishCloseStream(stream, code) { ); core.tryClose(stream[kDenoRid]); core.tryClose(stream[kDenoResponse].bodyRid); - stream.emit("close"); + nextTick(() => { + stream.emit("close"); + }); }); } } @@ -1488,24 +1540,32 @@ export function connect( host = authority.host; } - // TODO(bartlomieju): handle defaults + let url, socket; + if (typeof options.createConnection === "function") { - console.error("Not implemented: http2.connect.options.createConnection"); - // notImplemented("http2.connect.options.createConnection"); - } - - let conn, url; - if (protocol == "http:") { - conn = Deno.connect({ port, hostname: host }); url = `http://${host}${port == 80 ? "" : (":" + port)}`; - } else if (protocol == "https:") { - conn = Deno.connectTls({ port, hostname: host, alpnProtocols: ["h2"] }); - url = `http://${host}${port == 443 ? "" : (":" + port)}`; + socket = options.createConnection(host, options); } else { - throw new TypeError("Unexpected URL protocol"); + switch (protocol) { + case "http:": + url = `http://${host}${port == 80 ? "" : (":" + port)}`; + socket = netConnect({ port, host, ...options, pauseOnCreate: true }); + break; + case "https:": + // TODO(bartlomieju): handle `initializeTLSOptions` here + url = `https://${host}${port == 443 ? "" : (":" + port)}`; + socket = tlsConnect(port, host, { manualStart: true }); + break; + default: + throw new ERR_HTTP2_UNSUPPORTED_PROTOCOL(protocol); + } } - const session = new ClientHttp2Session(conn, url, options); + // Pause so no "socket.read()" starts in the background that would + // prevent us from taking ownership of the socket in `ClientHttp2Session` + socket.pause(); + const session = new ClientHttp2Session(socket, url, options); + session[kAuthority] = `${options.servername || host}:${port}`; session[kProtocol] = protocol; @@ -1515,6 +1575,32 @@ export function connect( return session; } +function socketOnError(error) { + const session = this[kSession]; + if (session !== undefined) { + if (error.code === "ECONNRESET" && session[kState].goawayCode !== null) { + return session.destroy(); + } + debugHttp2(">>>> socket error", error); + session.destroy(error); + } +} + +function socketOnClose() { + const session = this[kSession]; + if (session !== undefined) { + debugHttp2(">>>> socket closed"); + const err = session.connecting ? new ERR_SOCKET_CLOSED() : null; + const state = session[kState]; + state.streams.forEach((stream) => stream.close(constants.NGHTTP2_CANCEL)); + state.pendingStreams.forEach((stream) => + stream.close(constants.NGHTTP2_CANCEL) + ); + session.close(); + session[kMaybeDestroy](err); + } +} + export const constants = { NGHTTP2_ERR_FRAME_SIZE_ERROR: -522, NGHTTP2_NV_FLAG_NONE: 0,