diff --git a/cli/tests/unit/serve_test.ts b/cli/tests/unit/serve_test.ts index b137ef4ff4..24ae7f6664 100644 --- a/cli/tests/unit/serve_test.ts +++ b/cli/tests/unit/serve_test.ts @@ -20,6 +20,8 @@ const servePort = 4502; const { upgradeHttpRaw, addTrailers, + serveHttpOnListener, + serveHttpOnConnection, // @ts-expect-error TypeScript (as of 3.7) does not support indexing namespaces by symbol } = Deno[Deno.internal]; @@ -165,6 +167,98 @@ Deno.test({ permissions: { net: true } }, async function httpServerBasic() { await server; }); +// Test serving of HTTP on an arbitrary listener. +Deno.test( + { permissions: { net: true } }, + async function httpServerOnListener() { + const ac = new AbortController(); + const promise = deferred(); + const listeningPromise = deferred(); + const listener = Deno.listen({ port: servePort }); + const server = serveHttpOnListener( + listener, + ac.signal, + async ( + request: Request, + { remoteAddr }: { remoteAddr: { hostname: string } }, + ) => { + assertEquals( + new URL(request.url).href, + `http://127.0.0.1:${servePort}/`, + ); + assertEquals(await request.text(), ""); + assertEquals(remoteAddr.hostname, "127.0.0.1"); + promise.resolve(); + return new Response("Hello World", { headers: { "foo": "bar" } }); + }, + createOnErrorCb(ac), + onListen(listeningPromise), + ); + + await listeningPromise; + const resp = await fetch(`http://127.0.0.1:${servePort}/`, { + headers: { "connection": "close" }, + }); + await promise; + const clone = resp.clone(); + const text = await resp.text(); + assertEquals(text, "Hello World"); + assertEquals(resp.headers.get("foo"), "bar"); + const cloneText = await clone.text(); + assertEquals(cloneText, "Hello World"); + ac.abort(); + await server; + }, +); + +// Test serving of HTTP on an arbitrary connection. +Deno.test( + { permissions: { net: true } }, + async function httpServerOnConnection() { + const ac = new AbortController(); + const promise = deferred(); + const listeningPromise = deferred(); + const listener = Deno.listen({ port: servePort }); + const acceptPromise = listener.accept(); + const fetchPromise = fetch(`http://127.0.0.1:${servePort}/`, { + headers: { "connection": "close" }, + }); + + const server = serveHttpOnConnection( + await acceptPromise, + ac.signal, + async ( + request: Request, + { remoteAddr }: { remoteAddr: { hostname: string } }, + ) => { + assertEquals( + new URL(request.url).href, + `http://127.0.0.1:${servePort}/`, + ); + assertEquals(await request.text(), ""); + assertEquals(remoteAddr.hostname, "127.0.0.1"); + promise.resolve(); + return new Response("Hello World", { headers: { "foo": "bar" } }); + }, + createOnErrorCb(ac), + onListen(listeningPromise), + ); + + const resp = await fetchPromise; + await promise; + const clone = resp.clone(); + const text = await resp.text(); + assertEquals(text, "Hello World"); + assertEquals(resp.headers.get("foo"), "bar"); + const cloneText = await clone.text(); + assertEquals(cloneText, "Hello World"); + // Note that we don't need to abort this server -- it closes when the connection does + // ac.abort(); + await server; + listener.close(); + }, +); + Deno.test({ permissions: { net: true } }, async function httpServerOnError() { const ac = new AbortController(); const listeningPromise = deferred(); diff --git a/ext/http/00_serve.js b/ext/http/00_serve.js index e3926280b2..d84244ee4b 100644 --- a/ext/http/00_serve.js +++ b/ext/http/00_serve.js @@ -34,7 +34,8 @@ import { readableStreamForRid, ReadableStreamPrototype, } from "ext:deno_web/06_streams.js"; -import { TcpConn } from "ext:deno_net/01_net.js"; +import { listen, TcpConn } from "ext:deno_net/01_net.js"; +import { listenTls } from "ext:deno_net/02_tls.js"; const { ObjectPrototypeIsPrototypeOf, PromisePrototypeCatch, @@ -54,6 +55,7 @@ const { op_http_get_request_method_and_url, op_http_read_request_body, op_http_serve, + op_http_serve_on, op_http_set_promise_complete, op_http_set_response_body_bytes, op_http_set_response_body_resource, @@ -71,6 +73,7 @@ const { "op_http_get_request_method_and_url", "op_http_read_request_body", "op_http_serve", + "op_http_serve_on", "op_http_set_promise_complete", "op_http_set_response_body_bytes", "op_http_set_response_body_resource", @@ -340,12 +343,21 @@ class InnerRequest { } class CallbackContext { + abortController; + responseBodies; scheme; fallbackHost; serverRid; closed; - initialize(args) { + constructor(signal, args) { + signal?.addEventListener( + "abort", + () => this.close(), + { once: true }, + ); + this.abortController = new AbortController(); + this.responseBodies = new SafeSet(); this.serverRid = args[0]; this.scheme = args[1]; this.fallbackHost = args[2]; @@ -500,7 +512,9 @@ async function asyncResponse(responseBodies, req, status, stream) { * * This function returns a promise that will only reject in the case of abnormal exit. */ -function mapToCallback(responseBodies, context, signal, callback, onError) { +function mapToCallback(context, callback, onError) { + const responseBodies = context.responseBodies; + const signal = context.abortController.signal; return async function (req) { // Get the response from the user-provided callback. If that fails, use onError. If that fails, return a fallback // 500 error. @@ -611,18 +625,7 @@ function serve(arg1, arg2) { reusePort: options.reusePort ?? false, }; - const abortController = new AbortController(); - - const responseBodies = new SafeSet(); - const context = new CallbackContext(); - const callback = mapToCallback( - responseBodies, - context, - abortController.signal, - handler, - onError, - ); - + let listener; if (wantsHttps) { if (!options.cert || !options.key) { throw new TypeError( @@ -632,37 +635,56 @@ function serve(arg1, arg2) { listenOpts.cert = options.cert; listenOpts.key = options.key; listenOpts.alpnProtocols = ["h2", "http/1.1"]; - const listener = Deno.listenTls(listenOpts); + listener = listenTls(listenOpts); listenOpts.port = listener.addr.port; - context.initialize(op_http_serve( - listener.rid, - )); } else { - const listener = Deno.listen(listenOpts); + listener = listen(listenOpts); listenOpts.port = listener.addr.port; - context.initialize(op_http_serve( - listener.rid, - )); } - signal?.addEventListener( - "abort", - () => context.close(), - { once: true }, - ); - - const onListen = options.onListen ?? function ({ port }) { - // If the hostname is "0.0.0.0", we display "localhost" in console - // because browsers in Windows don't resolve "0.0.0.0". - // See the discussion in https://github.com/denoland/deno_std/issues/1165 - const hostname = listenOpts.hostname == "0.0.0.0" - ? "localhost" - : listenOpts.hostname; - console.log(`Listening on ${context.scheme}${hostname}:${port}/`); + const onListen = (scheme) => { + const port = listenOpts.port; + if (options.onListen) { + options.onListen({ port }); + } else { + // If the hostname is "0.0.0.0", we display "localhost" in console + // because browsers in Windows don't resolve "0.0.0.0". + // See the discussion in https://github.com/denoland/deno_std/issues/1165 + const hostname = listenOpts.hostname == "0.0.0.0" + ? "localhost" + : listenOpts.hostname; + console.log(`Listening on ${scheme}${hostname}:${port}/`); + } }; - onListen({ port: listenOpts.port }); + return serveHttpOnListener(listener, signal, handler, onError, onListen); +} +/** + * Serve HTTP/1.1 and/or HTTP/2 on an arbitrary listener. + */ +function serveHttpOnListener(listener, signal, handler, onError, onListen) { + const context = new CallbackContext(signal, op_http_serve(listener.rid)); + const callback = mapToCallback(context, handler, onError); + + onListen(context.scheme); + + return serveHttpOn(context, callback); +} + +/** + * Serve HTTP/1.1 and/or HTTP/2 on an arbitrary connection. + */ +function serveHttpOnConnection(connection, signal, handler, onError, onListen) { + const context = new CallbackContext(signal, op_http_serve_on(connection.rid)); + const callback = mapToCallback(context, handler, onError); + + onListen(context.scheme); + + return serveHttpOn(context, callback); +} + +function serveHttpOn(context, callback) { let ref = true; let currentPromise = null; const promiseIdSymbol = SymbolFor("Deno.core.internalPromiseId"); @@ -710,7 +732,7 @@ function serve(arg1, arg2) { }); } - for (const streamRid of new SafeSetIterator(responseBodies)) { + for (const streamRid of new SafeSetIterator(context.responseBodies)) { core.tryClose(streamRid); } })(); @@ -734,5 +756,7 @@ function serve(arg1, arg2) { internals.addTrailers = addTrailers; internals.upgradeHttpRaw = upgradeHttpRaw; +internals.serveHttpOnListener = serveHttpOnListener; +internals.serveHttpOnConnection = serveHttpOnConnection; export { serve, upgradeHttpRaw };