1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2024-11-21 15:04:11 -05:00

refactor(ext/node): migrate "http" module to use "Deno.serveHttp" API (#18552)

This commit changes "node:http" module to use "Deno.serveHttp" API
instead of "Deno.serve" API.

---------

Co-authored-by: Matt Mastracci <matthew@mastracci.com>
This commit is contained in:
Bartek Iwańczuk 2023-04-03 00:50:39 +02:00 committed by GitHub
parent 513dadadcf
commit 3cd7abf73f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -1,9 +1,6 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. // Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
const core = globalThis.Deno.core;
const ops = core.ops;
import { TextEncoder } from "ext:deno_web/08_text_encoding.js"; import { TextEncoder } from "ext:deno_web/08_text_encoding.js";
import { type Deferred, deferred } from "ext:deno_node/_util/async.ts";
import { _normalizeArgs, ListenOptions, Socket } from "ext:deno_node/net.ts"; import { _normalizeArgs, ListenOptions, Socket } from "ext:deno_node/net.ts";
import { Buffer } from "ext:deno_node/buffer.ts"; import { Buffer } from "ext:deno_node/buffer.ts";
import { ERR_SERVER_NOT_RUNNING } from "ext:deno_node/internal/errors.ts"; import { ERR_SERVER_NOT_RUNNING } from "ext:deno_node/internal/errors.ts";
@ -19,7 +16,8 @@ import { Agent } from "ext:deno_node/_http_agent.mjs";
import { chunkExpression as RE_TE_CHUNKED } from "ext:deno_node/_http_common.ts"; import { chunkExpression as RE_TE_CHUNKED } from "ext:deno_node/_http_common.ts";
import { urlToHttpOptions } from "ext:deno_node/internal/url.ts"; import { urlToHttpOptions } from "ext:deno_node/internal/url.ts";
import { constants, TCP } from "ext:deno_node/internal_binding/tcp_wrap.ts"; import { constants, TCP } from "ext:deno_node/internal_binding/tcp_wrap.ts";
import * as flash from "ext:deno_flash/01_http.js"; import * as denoHttp from "ext:deno_http/01_http.js";
import * as httpRuntime from "ext:runtime/40_http.js";
enum STATUS_CODES { enum STATUS_CODES {
/** RFC 7231, 6.2.1 */ /** RFC 7231, 6.2.1 */
@ -191,9 +189,6 @@ const METHODS = [
type Chunk = string | Buffer | Uint8Array; type Chunk = string | Buffer | Uint8Array;
const DenoServe = flash.createServe(ops.op_node_unstable_flash_serve);
const DenoUpgradeHttpRaw = flash.upgradeHttpRaw;
const ENCODER = new TextEncoder(); const ENCODER = new TextEncoder();
export interface RequestOptions { export interface RequestOptions {
@ -411,11 +406,7 @@ export class ServerResponse extends NodeWritable {
finished = false; finished = false;
headersSent = false; headersSent = false;
#firstChunk: Chunk | null = null; #firstChunk: Chunk | null = null;
// Used if --unstable flag IS NOT present
#reqEvent?: Deno.RequestEvent; #reqEvent?: Deno.RequestEvent;
// Used if --unstable flag IS present
#resolve?: (value: Response | PromiseLike<Response>) => void;
#isFlashRequest: boolean;
static #enqueue(controller: ReadableStreamDefaultController, chunk: Chunk) { static #enqueue(controller: ReadableStreamDefaultController, chunk: Chunk) {
// TODO(kt3k): This is a workaround for denoland/deno#17194 // TODO(kt3k): This is a workaround for denoland/deno#17194
@ -436,10 +427,7 @@ export class ServerResponse extends NodeWritable {
return status === 101 || status === 204 || status === 205 || status === 304; return status === 101 || status === 204 || status === 205 || status === 304;
} }
constructor( constructor(reqEvent: undefined | Deno.RequestEvent) {
reqEvent: undefined | Deno.RequestEvent,
resolve: undefined | ((value: Response | PromiseLike<Response>) => void),
) {
let controller: ReadableByteStreamController; let controller: ReadableByteStreamController;
const readable = new ReadableStream({ const readable = new ReadableStream({
start(c) { start(c) {
@ -481,9 +469,7 @@ export class ServerResponse extends NodeWritable {
}, },
}); });
this.#readable = readable; this.#readable = readable;
this.#resolve = resolve;
this.#reqEvent = reqEvent; this.#reqEvent = reqEvent;
this.#isFlashRequest = typeof resolve !== "undefined";
} }
setHeader(name: string, value: string) { setHeader(name: string, value: string) {
@ -519,9 +505,8 @@ export class ServerResponse extends NodeWritable {
this.statusCode = 200; this.statusCode = 200;
this.statusMessage = "OK"; this.statusMessage = "OK";
} }
// Only taken if --unstable IS NOT present
if ( if (
!this.#isFlashRequest && typeof singleChunk === "string" && typeof singleChunk === "string" &&
!this.hasHeader("content-type") !this.hasHeader("content-type")
) { ) {
this.setHeader("content-type", "text/plain;charset=UTF-8"); this.setHeader("content-type", "text/plain;charset=UTF-8");
@ -535,35 +520,22 @@ export class ServerResponse extends NodeWritable {
if (ServerResponse.#bodyShouldBeNull(this.statusCode!)) { if (ServerResponse.#bodyShouldBeNull(this.statusCode!)) {
body = null; body = null;
} }
if (this.#isFlashRequest) { this.#reqEvent!.respondWith(
this.#resolve!( new Response(body, {
new Response(body, { headers: this.#headers,
headers: this.#headers, status: this.statusCode,
status: this.statusCode, statusText: this.statusMessage,
statusText: this.statusMessage, }),
}), ).catch(() => {
); // TODO(bartlomieju): this error should be handled somehow
} else { // ignore this error
this.#reqEvent!.respondWith( });
new Response(body, {
headers: this.#headers,
status: this.statusCode,
statusText: this.statusMessage,
}),
).catch(() => {
// ignore this error
});
}
} }
// deno-lint-ignore no-explicit-any // deno-lint-ignore no-explicit-any
override end(chunk?: any, encoding?: any, cb?: any): this { override end(chunk?: any, encoding?: any, cb?: any): this {
this.finished = true; this.finished = true;
if (this.#isFlashRequest) { if (!chunk && this.#headers.has("transfer-encoding")) {
// Flash sets both of these headers.
this.#headers.delete("transfer-encoding");
this.#headers.delete("content-length");
} else if (!chunk && this.#headers.has("transfer-encoding")) {
// FIXME(bnoordhuis) Node sends a zero length chunked body instead, i.e., // FIXME(bnoordhuis) Node sends a zero length chunked body instead, i.e.,
// the trailing "0\r\n", but respondWith() just hangs when I try that. // the trailing "0\r\n", but respondWith() just hangs when I try that.
this.#headers.set("content-length", "0"); this.#headers.set("content-length", "0");
@ -641,25 +613,12 @@ export function Server(handler?: ServerHandler): ServerImpl {
} }
class ServerImpl extends EventEmitter { class ServerImpl extends EventEmitter {
#isFlashServer: boolean;
#httpConnections: Set<Deno.HttpConn> = new Set(); #httpConnections: Set<Deno.HttpConn> = new Set();
#listener?: Deno.Listener; #listener?: Deno.Listener;
#addr?: Deno.NetAddr;
#hasClosed = false;
#ac?: AbortController;
#servePromise?: Deferred<void>;
listening = false; listening = false;
constructor(handler?: ServerHandler) { constructor(handler?: ServerHandler) {
super(); super();
// @ts-ignore Might be undefined without `--unstable` flag
this.#isFlashServer = typeof DenoServe == "function";
if (this.#isFlashServer) {
this.#servePromise = deferred();
this.#servePromise.then(() => this.emit("close"));
}
if (handler !== undefined) { if (handler !== undefined) {
this.on("request", handler); this.on("request", handler);
} }
@ -684,26 +643,16 @@ class ServerImpl extends EventEmitter {
// TODO(bnoordhuis) Node prefers [::] when host is omitted, // TODO(bnoordhuis) Node prefers [::] when host is omitted,
// we on the other hand default to 0.0.0.0. // we on the other hand default to 0.0.0.0.
if (this.#isFlashServer) { this.listening = true;
const hostname = options.host ?? "0.0.0.0"; const hostname = options.host ?? "";
this.#addr = { this.#listener = Deno.listen({ port, hostname });
hostname, nextTick(() => this.#listenLoop());
port,
} as Deno.NetAddr;
this.listening = true;
nextTick(() => this.#serve());
} else {
this.listening = true;
const hostname = options.host ?? "";
this.#listener = Deno.listen({ port, hostname });
nextTick(() => this.#listenLoop());
}
return this; return this;
} }
async #listenLoop() { async #listenLoop() {
const go = async (httpConn: Deno.HttpConn) => { const go = async (tcpConn: Deno.Conn, httpConn: Deno.HttpConn) => {
try { try {
for (;;) { for (;;) {
let reqEvent = null; let reqEvent = null;
@ -721,8 +670,20 @@ class ServerImpl extends EventEmitter {
break; break;
} }
const req = new IncomingMessageForServer(reqEvent.request); const req = new IncomingMessageForServer(reqEvent.request);
const res = new ServerResponse(reqEvent, undefined); if (req.upgrade && this.listenerCount("upgrade") > 0) {
this.emit("request", req, res); const conn = await denoHttp.upgradeHttpRaw(
reqEvent.request,
tcpConn,
) as Deno.Conn;
const socket = new Socket({
handle: new TCP(constants.SERVER, conn),
});
this.emit("upgrade", req, socket, Buffer.from([]));
return;
} else {
const res = new ServerResponse(reqEvent);
this.emit("request", req, res);
}
} }
} finally { } finally {
this.#httpConnections.delete(httpConn); this.#httpConnections.delete(httpConn);
@ -737,56 +698,17 @@ class ServerImpl extends EventEmitter {
for await (const conn of listener) { for await (const conn of listener) {
let httpConn: Deno.HttpConn; let httpConn: Deno.HttpConn;
try { try {
httpConn = Deno.serveHttp(conn); httpConn = httpRuntime.serveHttp(conn);
} catch { } catch {
continue; /// Connection closed. continue; /// Connection closed.
} }
this.#httpConnections.add(httpConn); this.#httpConnections.add(httpConn);
go(httpConn); go(conn, httpConn);
} }
} }
} }
#serve() {
const ac = new AbortController();
const handler = (request: Request) => {
const req = new IncomingMessageForServer(request);
if (req.upgrade && this.listenerCount("upgrade") > 0) {
const [conn, head] = DenoUpgradeHttpRaw(request) as [
Deno.Conn,
Uint8Array,
];
const socket = new Socket({
handle: new TCP(constants.SERVER, conn),
});
this.emit("upgrade", req, socket, Buffer.from(head));
} else {
return new Promise<Response>((resolve): void => {
const res = new ServerResponse(undefined, resolve);
this.emit("request", req, res);
});
}
};
if (this.#hasClosed) {
return;
}
this.#ac = ac;
DenoServe(
{
handler: handler as Deno.ServeHandler,
...this.#addr,
signal: ac.signal,
// @ts-ignore Might be any without `--unstable` flag
onListen: ({ port }) => {
this.#addr!.port = port;
this.emit("listening");
},
},
).then(() => this.#servePromise!.resolve());
}
setTimeout() { setTimeout() {
console.error("Not implemented: Server.setTimeout()"); console.error("Not implemented: Server.setTimeout()");
} }
@ -795,7 +717,6 @@ class ServerImpl extends EventEmitter {
const listening = this.listening; const listening = this.listening;
this.listening = false; this.listening = false;
this.#hasClosed = true;
if (typeof cb === "function") { if (typeof cb === "function") {
if (listening) { if (listening) {
this.once("close", cb); this.once("close", cb);
@ -806,42 +727,28 @@ class ServerImpl extends EventEmitter {
} }
} }
if (this.#isFlashServer) { nextTick(() => this.emit("close"));
if (listening && this.#ac) {
this.#ac.abort();
this.#ac = undefined;
} else {
this.#servePromise!.resolve();
}
} else {
nextTick(() => this.emit("close"));
if (listening) { if (listening) {
this.#listener!.close(); this.#listener!.close();
this.#listener = undefined; this.#listener = undefined;
for (const httpConn of this.#httpConnections) { for (const httpConn of this.#httpConnections) {
try { try {
httpConn.close(); httpConn.close();
} catch { } catch {
// Already closed. // Already closed.
}
} }
this.#httpConnections.clear();
} }
this.#httpConnections.clear();
} }
return this; return this;
} }
address() { address() {
let addr; const addr = this.#listener!.addr as Deno.NetAddr;
if (this.#isFlashServer) {
addr = this.#addr!;
} else {
addr = this.#listener!.addr as Deno.NetAddr;
}
return { return {
port: addr.port, port: addr.port,
address: addr.hostname, address: addr.hostname,