// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. // deno-lint-ignore-file camelcase const core = globalThis.Deno.core; const primordials = globalThis.__bootstrap.primordials; const internals = globalThis.__bootstrap.internals; const { BadResourcePrototype } = core; import { InnerBody } from "ext:deno_fetch/22_body.js"; import { Event } from "ext:deno_web/02_event.js"; import { fromInnerResponse, newInnerResponse, toInnerResponse, } from "ext:deno_fetch/23_response.js"; import { fromInnerRequest, toInnerRequest } from "ext:deno_fetch/23_request.js"; import { AbortController } from "ext:deno_web/03_abort_signal.js"; import { _eventLoop, _idleTimeoutDuration, _idleTimeoutTimeout, _protocol, _readyState, _rid, _role, _server, _serverHandleIdleTimeout, SERVER, WebSocket, } from "ext:deno_websocket/01_websocket.js"; import { Deferred, getReadableStreamResourceBacking, readableStreamClose, readableStreamForRid, ReadableStreamPrototype, } from "ext:deno_web/06_streams.js"; import { TcpConn } from "ext:deno_net/01_net.js"; const { ObjectPrototypeIsPrototypeOf, PromisePrototypeCatch, SafeSet, SafeSetIterator, SetPrototypeAdd, SetPrototypeDelete, Symbol, TypeError, Uint8Array, Uint8ArrayPrototype, } = primordials; const { op_http_wait, op_upgrade, op_get_request_headers, op_get_request_method_and_url, op_read_request_body, op_serve_http, op_set_promise_complete, op_set_response_body_bytes, op_set_response_body_resource, op_set_response_body_stream, op_set_response_body_text, op_set_response_header, op_set_response_headers, op_upgrade_raw, op_ws_server_create, } = Deno.core.generateAsyncOpHandler( "op_http_wait", "op_upgrade", "op_get_request_headers", "op_get_request_method_and_url", "op_read_request_body", "op_serve_http", "op_set_promise_complete", "op_set_response_body_bytes", "op_set_response_body_resource", "op_set_response_body_stream", "op_set_response_body_text", "op_set_response_header", "op_set_response_headers", "op_upgrade_raw", "op_ws_server_create", ); const _upgraded = Symbol("_upgraded"); function internalServerError() { // "Internal Server Error" return new Response( new Uint8Array([ 73, 110, 116, 101, 114, 110, 97, 108, 32, 83, 101, 114, 118, 101, 114, 32, 69, 114, 114, 111, 114, ]), { status: 500 }, ); } // Used to ensure that user returns a valid response (but not a different response) from handlers that are upgraded. const UPGRADE_RESPONSE_SENTINEL = fromInnerResponse( newInnerResponse(101), "immutable", ); function upgradeHttpRaw(req, conn) { const inner = toInnerRequest(req); if (inner._wantsUpgrade) { return inner._wantsUpgrade("upgradeHttpRaw", conn); } throw new TypeError("upgradeHttpRaw may only be used with Deno.serve"); } class InnerRequest { #slabId; #context; #methodAndUri; #streamRid; #body; #upgraded; constructor(slabId, context) { this.#slabId = slabId; this.#context = context; this.#upgraded = false; } close() { if (this.#streamRid !== undefined) { core.close(this.#streamRid); this.#streamRid = undefined; } this.#slabId = undefined; } get [_upgraded]() { return this.#upgraded; } _wantsUpgrade(upgradeType, ...originalArgs) { if (this.#upgraded) { throw new Deno.errors.Http("already upgraded"); } if (this.#slabId === undefined) { throw new Deno.errors.Http("already closed"); } // upgradeHttp is async // TODO(mmastrac) if (upgradeType == "upgradeHttp") { throw "upgradeHttp is unavailable in Deno.serve at this time"; } // upgradeHttpRaw is sync if (upgradeType == "upgradeHttpRaw") { const slabId = this.#slabId; const underlyingConn = originalArgs[0]; this.url(); this.headerList; this.close(); this.#upgraded = () => {}; const upgradeRid = op_upgrade_raw(slabId); const conn = new TcpConn( upgradeRid, underlyingConn?.remoteAddr, underlyingConn?.localAddr, ); return { response: UPGRADE_RESPONSE_SENTINEL, conn }; } // upgradeWebSocket is sync if (upgradeType == "upgradeWebSocket") { const response = originalArgs[0]; const ws = originalArgs[1]; const slabId = this.#slabId; this.url(); this.headerList; this.close(); const goAhead = new Deferred(); this.#upgraded = () => { goAhead.resolve(); }; // Start the upgrade in the background. (async () => { try { // Returns the connection and extra bytes, which we can pass directly to op_ws_server_create const upgrade = await op_upgrade( slabId, response.headerList, ); const wsRid = op_ws_server_create(upgrade[0], upgrade[1]); // We have to wait for the go-ahead signal await goAhead; ws[_rid] = wsRid; ws[_readyState] = WebSocket.OPEN; ws[_role] = SERVER; const event = new Event("open"); ws.dispatchEvent(event); ws[_eventLoop](); if (ws[_idleTimeoutDuration]) { ws.addEventListener( "close", () => clearTimeout(ws[_idleTimeoutTimeout]), ); } ws[_serverHandleIdleTimeout](); } catch (error) { const event = new ErrorEvent("error", { error }); ws.dispatchEvent(event); } })(); return { response: UPGRADE_RESPONSE_SENTINEL, socket: ws }; } } url() { if (this.#methodAndUri === undefined) { if (this.#slabId === undefined) { throw new TypeError("request closed"); } // TODO(mmastrac): This is quite slow as we're serializing a large number of values. We may want to consider // splitting this up into multiple ops. this.#methodAndUri = op_get_request_method_and_url(this.#slabId); } const path = this.#methodAndUri[2]; // * is valid for OPTIONS if (path === "*") { return "*"; } // If the path is empty, return the authority (valid for CONNECT) if (path == "") { return this.#methodAndUri[1]; } // CONNECT requires an authority if (this.#methodAndUri[0] == "CONNECT") { return this.#methodAndUri[1]; } const hostname = this.#methodAndUri[1]; if (hostname) { // Construct a URL from the scheme, the hostname, and the path return this.#context.scheme + hostname + path; } // Construct a URL from the scheme, the fallback hostname, and the path return this.#context.scheme + this.#context.fallbackHost + path; } get remoteAddr() { if (this.#methodAndUri === undefined) { if (this.#slabId === undefined) { throw new TypeError("request closed"); } this.#methodAndUri = op_get_request_method_and_url(this.#slabId); } return { transport: "tcp", hostname: this.#methodAndUri[3], port: this.#methodAndUri[4], }; } get method() { if (this.#methodAndUri === undefined) { if (this.#slabId === undefined) { throw new TypeError("request closed"); } this.#methodAndUri = op_get_request_method_and_url(this.#slabId); } return this.#methodAndUri[0]; } get body() { if (this.#slabId === undefined) { throw new TypeError("request closed"); } if (this.#body !== undefined) { return this.#body; } // If the method is GET or HEAD, we do not want to include a body here, even if the Rust // side of the code is willing to provide it to us. if (this.method == "GET" || this.method == "HEAD") { this.#body = null; return null; } this.#streamRid = op_read_request_body(this.#slabId); this.#body = new InnerBody(readableStreamForRid(this.#streamRid, false)); return this.#body; } get headerList() { if (this.#slabId === undefined) { throw new TypeError("request closed"); } return op_get_request_headers(this.#slabId); } get slabId() { return this.#slabId; } } class CallbackContext { scheme; fallbackHost; serverRid; closed; initialize(args) { this.serverRid = args[0]; this.scheme = args[1]; this.fallbackHost = args[2]; this.closed = false; } close() { try { this.closed = true; core.tryClose(this.serverRid); } catch { // Pass } } } function fastSyncResponseOrStream(req, respBody) { if (respBody === null || respBody === undefined) { // Don't set the body return null; } const stream = respBody.streamOrStatic; const body = stream.body; if (ObjectPrototypeIsPrototypeOf(Uint8ArrayPrototype, body)) { op_set_response_body_bytes(req, body); return null; } if (typeof body === "string") { op_set_response_body_text(req, body); return null; } // At this point in the response it needs to be a stream if (!ObjectPrototypeIsPrototypeOf(ReadableStreamPrototype, stream)) { throw TypeError("invalid response"); } const resourceBacking = getReadableStreamResourceBacking(stream); if (resourceBacking) { op_set_response_body_resource( req, resourceBacking.rid, resourceBacking.autoClose, ); return null; } return stream; } async function asyncResponse(responseBodies, req, status, stream) { const reader = stream.getReader(); let responseRid; let closed = false; let timeout; try { // IMPORTANT: We get a performance boost from this optimization, but V8 is very // sensitive to the order and structure. Benchmark any changes to this code. // Optimize for streams that are done in zero or one packets. We will not // have to allocate a resource in this case. const { value: value1, done: done1 } = await reader.read(); if (done1) { closed = true; // Exit 1: no response body at all, extreme fast path // Reader will be closed by finally block return; } // The second value cannot block indefinitely, as someone may be waiting on a response // of the first packet that may influence this packet. We set this timeout arbitrarily to 250ms // and we race it. let timeoutPromise; timeout = setTimeout(() => { responseRid = op_set_response_body_stream(req); SetPrototypeAdd(responseBodies, responseRid); op_set_promise_complete(req, status); timeoutPromise = core.writeAll(responseRid, value1); }, 250); const { value: value2, done: done2 } = await reader.read(); if (timeoutPromise) { await timeoutPromise; if (done2) { closed = true; // Exit 2(a): read 2 is EOS, and timeout resolved. // Reader will be closed by finally block // Response stream will be closed by finally block. return; } // Timeout resolved, value1 written but read2 is not EOS. Carry value2 forward. } else { clearTimeout(timeout); timeout = undefined; if (done2) { // Exit 2(b): read 2 is EOS, and timeout did not resolve as we read fast enough. // Reader will be closed by finally block // No response stream closed = true; op_set_response_body_bytes(req, value1); return; } responseRid = op_set_response_body_stream(req); SetPrototypeAdd(responseBodies, responseRid); op_set_promise_complete(req, status); // Write our first packet await core.writeAll(responseRid, value1); } await core.writeAll(responseRid, value2); while (true) { const { value, done } = await reader.read(); if (done) { closed = true; break; } await core.writeAll(responseRid, value); } } catch (error) { closed = true; try { await reader.cancel(error); } catch { // Pass } } finally { if (!closed) { readableStreamClose(reader); } if (timeout !== undefined) { clearTimeout(timeout); } if (responseRid) { core.tryClose(responseRid); SetPrototypeDelete(responseBodies, responseRid); } else { op_set_promise_complete(req, status); } } } /** * Maps the incoming request slab ID to a fully-fledged Request object, passes it to the user-provided * callback, then extracts the response that was returned from that callback. The response is then pulled * apart and handled on the Rust side. * * This function returns a promise that will only reject in the case of abnormal exit. */ function mapToCallback(responseBodies, context, signal, callback, onError) { return async function (req) { // Get the response from the user-provided callback. If that fails, use onError. If that fails, return a fallback // 500 error. let innerRequest; let response; try { if (callback.length > 0) { innerRequest = new InnerRequest(req, context); const request = fromInnerRequest(innerRequest, signal, "immutable"); if (callback.length === 1) { response = await callback(request); } else { response = await callback(request, { get remoteAddr() { return innerRequest.remoteAddr; }, }); } } else { response = await callback(); } } catch (error) { try { response = await onError(error); } catch (error) { console.error("Exception in onError while handling exception", error); response = internalServerError(); } } const inner = toInnerResponse(response); if (innerRequest?.[_upgraded]) { // We're done here as the connection has been upgraded during the callback and no longer requires servicing. if (response !== UPGRADE_RESPONSE_SENTINEL) { console.error("Upgrade response was not returned from callback"); context.close(); } innerRequest?.[_upgraded](); return; } // Did everything shut down while we were waiting? if (context.closed) { innerRequest?.close(); return; } const status = inner.status; const headers = inner.headerList; if (headers && headers.length > 0) { if (headers.length == 1) { op_set_response_header(req, headers[0][0], headers[0][1]); } else { op_set_response_headers(req, headers); } } // Attempt to response quickly to this request, otherwise extract the stream const stream = fastSyncResponseOrStream(req, inner.body); if (stream !== null) { // Handle the stream asynchronously await asyncResponse(responseBodies, req, status, stream); } else { op_set_promise_complete(req, status); } innerRequest?.close(); }; } async function serve(arg1, arg2) { let options = undefined; let handler = undefined; if (typeof arg1 === "function") { handler = arg1; } else if (typeof arg2 === "function") { handler = arg2; options = arg1; } else { options = arg1; } if (handler === undefined) { if (options === undefined) { throw new TypeError( "No handler was provided, so an options bag is mandatory.", ); } handler = options.handler; } if (typeof handler !== "function") { throw new TypeError("A handler function must be provided."); } if (options === undefined) { options = {}; } const wantsHttps = options.cert || options.key; const signal = options.signal; const onError = options.onError ?? function (error) { console.error(error); return internalServerError(); }; const listenOpts = { hostname: options.hostname ?? "0.0.0.0", port: options.port ?? (wantsHttps ? 9000 : 8000), reusePort: options.reusePort ?? false, }; const abortController = new AbortController(); const responseBodies = new SafeSet(); const context = new CallbackContext(); const callback = mapToCallback( responseBodies, context, abortController.signal, handler, onError, ); if (wantsHttps) { if (!options.cert || !options.key) { throw new TypeError( "Both cert and key must be provided to enable HTTPS.", ); } listenOpts.cert = options.cert; listenOpts.key = options.key; listenOpts.alpnProtocols = ["h2", "http/1.1"]; const listener = Deno.listenTls(listenOpts); listenOpts.port = listener.addr.port; context.initialize(op_serve_http( listener.rid, )); } else { const listener = Deno.listen(listenOpts); listenOpts.port = listener.addr.port; context.initialize(op_serve_http( listener.rid, )); } signal?.addEventListener( "abort", () => context.close(), { once: true }, ); const onListen = options.onListen ?? function ({ port }) { // If the hostname is "0.0.0.0", we display "localhost" in console // because browsers in Windows don't resolve "0.0.0.0". // See the discussion in https://github.com/denoland/deno_std/issues/1165 const hostname = listenOpts.hostname == "0.0.0.0" ? "localhost" : listenOpts.hostname; console.log(`Listening on ${context.scheme}${hostname}:${port}/`); }; onListen({ port: listenOpts.port }); while (true) { const rid = context.serverRid; let req; try { req = await op_http_wait(rid); } catch (error) { if (ObjectPrototypeIsPrototypeOf(BadResourcePrototype, error)) { break; } throw new Deno.errors.Http(error); } if (req === 0xffffffff) { break; } PromisePrototypeCatch(callback(req), (error) => { // Abnormal exit console.error( "Terminating Deno.serve loop due to unexpected error", error, ); context.close(); }); } for (const streamRid of new SafeSetIterator(responseBodies)) { core.tryClose(streamRid); } } internals.upgradeHttpRaw = upgradeHttpRaw; export { serve, upgradeHttpRaw };