From 3cd7abf73fa104526508984daef54bbb8e120310 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20Iwa=C5=84czuk?= Date: Mon, 3 Apr 2023 00:50:39 +0200 Subject: [PATCH] refactor(ext/node): migrate "http" module to use "Deno.serveHttp" API (#18552) This commit changes "node:http" module to use "Deno.serveHttp" API instead of "Deno.serve" API. --------- Co-authored-by: Matt Mastracci --- ext/node/polyfills/http.ts | 189 ++++++++++--------------------------- 1 file changed, 48 insertions(+), 141 deletions(-) diff --git a/ext/node/polyfills/http.ts b/ext/node/polyfills/http.ts index b99e7afabf..dd92a97447 100644 --- a/ext/node/polyfills/http.ts +++ b/ext/node/polyfills/http.ts @@ -1,9 +1,6 @@ // Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. -const core = globalThis.Deno.core; -const ops = core.ops; import { TextEncoder } from "ext:deno_web/08_text_encoding.js"; -import { type Deferred, deferred } from "ext:deno_node/_util/async.ts"; import { _normalizeArgs, ListenOptions, Socket } from "ext:deno_node/net.ts"; import { Buffer } from "ext:deno_node/buffer.ts"; import { ERR_SERVER_NOT_RUNNING } from "ext:deno_node/internal/errors.ts"; @@ -19,7 +16,8 @@ import { Agent } from "ext:deno_node/_http_agent.mjs"; import { chunkExpression as RE_TE_CHUNKED } from "ext:deno_node/_http_common.ts"; import { urlToHttpOptions } from "ext:deno_node/internal/url.ts"; import { constants, TCP } from "ext:deno_node/internal_binding/tcp_wrap.ts"; -import * as flash from "ext:deno_flash/01_http.js"; +import * as denoHttp from "ext:deno_http/01_http.js"; +import * as httpRuntime from "ext:runtime/40_http.js"; enum STATUS_CODES { /** RFC 7231, 6.2.1 */ @@ -191,9 +189,6 @@ const METHODS = [ type Chunk = string | Buffer | Uint8Array; -const DenoServe = flash.createServe(ops.op_node_unstable_flash_serve); -const DenoUpgradeHttpRaw = flash.upgradeHttpRaw; - const ENCODER = new TextEncoder(); export interface RequestOptions { @@ -411,11 +406,7 @@ export class ServerResponse extends NodeWritable { finished = false; headersSent = false; #firstChunk: Chunk | null = null; - // Used if --unstable flag IS NOT present #reqEvent?: Deno.RequestEvent; - // Used if --unstable flag IS present - #resolve?: (value: Response | PromiseLike) => void; - #isFlashRequest: boolean; static #enqueue(controller: ReadableStreamDefaultController, chunk: Chunk) { // TODO(kt3k): This is a workaround for denoland/deno#17194 @@ -436,10 +427,7 @@ export class ServerResponse extends NodeWritable { return status === 101 || status === 204 || status === 205 || status === 304; } - constructor( - reqEvent: undefined | Deno.RequestEvent, - resolve: undefined | ((value: Response | PromiseLike) => void), - ) { + constructor(reqEvent: undefined | Deno.RequestEvent) { let controller: ReadableByteStreamController; const readable = new ReadableStream({ start(c) { @@ -481,9 +469,7 @@ export class ServerResponse extends NodeWritable { }, }); this.#readable = readable; - this.#resolve = resolve; this.#reqEvent = reqEvent; - this.#isFlashRequest = typeof resolve !== "undefined"; } setHeader(name: string, value: string) { @@ -519,9 +505,8 @@ export class ServerResponse extends NodeWritable { this.statusCode = 200; this.statusMessage = "OK"; } - // Only taken if --unstable IS NOT present if ( - !this.#isFlashRequest && typeof singleChunk === "string" && + typeof singleChunk === "string" && !this.hasHeader("content-type") ) { this.setHeader("content-type", "text/plain;charset=UTF-8"); @@ -535,35 +520,22 @@ export class ServerResponse extends NodeWritable { if (ServerResponse.#bodyShouldBeNull(this.statusCode!)) { body = null; } - if (this.#isFlashRequest) { - this.#resolve!( - new Response(body, { - headers: this.#headers, - status: this.statusCode, - statusText: this.statusMessage, - }), - ); - } else { - this.#reqEvent!.respondWith( - new Response(body, { - headers: this.#headers, - status: this.statusCode, - statusText: this.statusMessage, - }), - ).catch(() => { - // ignore this error - }); - } + this.#reqEvent!.respondWith( + new Response(body, { + headers: this.#headers, + status: this.statusCode, + statusText: this.statusMessage, + }), + ).catch(() => { + // TODO(bartlomieju): this error should be handled somehow + // ignore this error + }); } // deno-lint-ignore no-explicit-any override end(chunk?: any, encoding?: any, cb?: any): this { this.finished = true; - if (this.#isFlashRequest) { - // Flash sets both of these headers. - this.#headers.delete("transfer-encoding"); - this.#headers.delete("content-length"); - } else if (!chunk && this.#headers.has("transfer-encoding")) { + if (!chunk && this.#headers.has("transfer-encoding")) { // FIXME(bnoordhuis) Node sends a zero length chunked body instead, i.e., // the trailing "0\r\n", but respondWith() just hangs when I try that. this.#headers.set("content-length", "0"); @@ -641,25 +613,12 @@ export function Server(handler?: ServerHandler): ServerImpl { } class ServerImpl extends EventEmitter { - #isFlashServer: boolean; - #httpConnections: Set = new Set(); #listener?: Deno.Listener; - - #addr?: Deno.NetAddr; - #hasClosed = false; - #ac?: AbortController; - #servePromise?: Deferred; listening = false; constructor(handler?: ServerHandler) { super(); - // @ts-ignore Might be undefined without `--unstable` flag - this.#isFlashServer = typeof DenoServe == "function"; - if (this.#isFlashServer) { - this.#servePromise = deferred(); - this.#servePromise.then(() => this.emit("close")); - } if (handler !== undefined) { this.on("request", handler); } @@ -684,26 +643,16 @@ class ServerImpl extends EventEmitter { // TODO(bnoordhuis) Node prefers [::] when host is omitted, // we on the other hand default to 0.0.0.0. - if (this.#isFlashServer) { - const hostname = options.host ?? "0.0.0.0"; - this.#addr = { - hostname, - port, - } as Deno.NetAddr; - this.listening = true; - nextTick(() => this.#serve()); - } else { - this.listening = true; - const hostname = options.host ?? ""; - this.#listener = Deno.listen({ port, hostname }); - nextTick(() => this.#listenLoop()); - } + this.listening = true; + const hostname = options.host ?? ""; + this.#listener = Deno.listen({ port, hostname }); + nextTick(() => this.#listenLoop()); return this; } async #listenLoop() { - const go = async (httpConn: Deno.HttpConn) => { + const go = async (tcpConn: Deno.Conn, httpConn: Deno.HttpConn) => { try { for (;;) { let reqEvent = null; @@ -721,8 +670,20 @@ class ServerImpl extends EventEmitter { break; } const req = new IncomingMessageForServer(reqEvent.request); - const res = new ServerResponse(reqEvent, undefined); - this.emit("request", req, res); + if (req.upgrade && this.listenerCount("upgrade") > 0) { + const conn = await denoHttp.upgradeHttpRaw( + reqEvent.request, + tcpConn, + ) as Deno.Conn; + const socket = new Socket({ + handle: new TCP(constants.SERVER, conn), + }); + this.emit("upgrade", req, socket, Buffer.from([])); + return; + } else { + const res = new ServerResponse(reqEvent); + this.emit("request", req, res); + } } } finally { this.#httpConnections.delete(httpConn); @@ -737,56 +698,17 @@ class ServerImpl extends EventEmitter { for await (const conn of listener) { let httpConn: Deno.HttpConn; try { - httpConn = Deno.serveHttp(conn); + httpConn = httpRuntime.serveHttp(conn); } catch { continue; /// Connection closed. } this.#httpConnections.add(httpConn); - go(httpConn); + go(conn, httpConn); } } } - #serve() { - const ac = new AbortController(); - const handler = (request: Request) => { - const req = new IncomingMessageForServer(request); - if (req.upgrade && this.listenerCount("upgrade") > 0) { - const [conn, head] = DenoUpgradeHttpRaw(request) as [ - Deno.Conn, - Uint8Array, - ]; - const socket = new Socket({ - handle: new TCP(constants.SERVER, conn), - }); - this.emit("upgrade", req, socket, Buffer.from(head)); - } else { - return new Promise((resolve): void => { - const res = new ServerResponse(undefined, resolve); - this.emit("request", req, res); - }); - } - }; - - if (this.#hasClosed) { - return; - } - this.#ac = ac; - DenoServe( - { - handler: handler as Deno.ServeHandler, - ...this.#addr, - signal: ac.signal, - // @ts-ignore Might be any without `--unstable` flag - onListen: ({ port }) => { - this.#addr!.port = port; - this.emit("listening"); - }, - }, - ).then(() => this.#servePromise!.resolve()); - } - setTimeout() { console.error("Not implemented: Server.setTimeout()"); } @@ -795,7 +717,6 @@ class ServerImpl extends EventEmitter { const listening = this.listening; this.listening = false; - this.#hasClosed = true; if (typeof cb === "function") { if (listening) { this.once("close", cb); @@ -806,42 +727,28 @@ class ServerImpl extends EventEmitter { } } - if (this.#isFlashServer) { - if (listening && this.#ac) { - this.#ac.abort(); - this.#ac = undefined; - } else { - this.#servePromise!.resolve(); - } - } else { - nextTick(() => this.emit("close")); + nextTick(() => this.emit("close")); - if (listening) { - this.#listener!.close(); - this.#listener = undefined; + if (listening) { + this.#listener!.close(); + this.#listener = undefined; - for (const httpConn of this.#httpConnections) { - try { - httpConn.close(); - } catch { - // Already closed. - } + for (const httpConn of this.#httpConnections) { + try { + httpConn.close(); + } catch { + // Already closed. } - - this.#httpConnections.clear(); } + + this.#httpConnections.clear(); } return this; } address() { - let addr; - if (this.#isFlashServer) { - addr = this.#addr!; - } else { - addr = this.#listener!.addr as Deno.NetAddr; - } + const addr = this.#listener!.addr as Deno.NetAddr; return { port: addr.port, address: addr.hostname,