From aa3f85ddd2636116d3c7d02c212070b7ae2ffb23 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20Iwa=C5=84czuk?= Date: Mon, 3 Apr 2023 19:01:02 +0200 Subject: [PATCH] refactor: remove "ext/flash" (#18578) With https://github.com/denoland/deno/pull/18568 landed we no longer need "ext/flash". This commit removes "deno_flash" extension completely. This should have some impact on the binary and snapshot size. Closes https://github.com/denoland/deno/issues/17356 --- Cargo.lock | 20 - Cargo.toml | 4 - cli/bench/http/deno_http_flash_ops.js | 37 - cli/bench/http/deno_http_flash_ops_spawn.js | 18 - cli/bench/http/deno_http_flash_post_bin.js | 16 - cli/bench/http/deno_http_flash_post_bin.lua | 5 - cli/bench/http/deno_http_flash_spawn.js | 18 - ...{deno_http_flash.js => deno_http_serve.js} | 0 cli/build.rs | 1 - cli/tests/integration/run_tests.rs | 7 - cli/tests/testdata/run/flash_shutdown/main.ts | 23 - ext/flash/01_http.js | 793 --------- ext/flash/Cargo.toml | 30 - ext/flash/README.md | 7 - ext/flash/chunked.rs | 273 --- ext/flash/lib.rs | 1543 ----------------- ext/flash/request.rs | 49 - ext/flash/sendfile.rs | 82 - ext/flash/socket.rs | 151 -- runtime/Cargo.toml | 2 - runtime/build.rs | 12 - runtime/js/90_deno_ns.js | 2 - runtime/lib.rs | 1 - runtime/ops/http.rs | 19 +- runtime/permissions/mod.rs | 11 - runtime/web_worker.rs | 1 - runtime/worker.rs | 1 - 27 files changed, 1 insertion(+), 3125 deletions(-) delete mode 100644 cli/bench/http/deno_http_flash_ops.js delete mode 100644 cli/bench/http/deno_http_flash_ops_spawn.js delete mode 100644 cli/bench/http/deno_http_flash_post_bin.js delete mode 100644 cli/bench/http/deno_http_flash_post_bin.lua delete mode 100644 cli/bench/http/deno_http_flash_spawn.js rename cli/bench/http/{deno_http_flash.js => deno_http_serve.js} (100%) delete mode 100644 cli/tests/testdata/run/flash_shutdown/main.ts delete mode 100644 ext/flash/01_http.js delete mode 100644 ext/flash/Cargo.toml delete mode 100644 ext/flash/README.md delete mode 100644 ext/flash/chunked.rs delete mode 100644 ext/flash/lib.rs delete mode 100644 ext/flash/request.rs delete mode 100644 ext/flash/sendfile.rs delete mode 100644 ext/flash/socket.rs diff --git a/Cargo.lock b/Cargo.lock index 10d7f335c3..15275f911e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -957,25 +957,6 @@ dependencies = [ "winapi", ] -[[package]] -name = "deno_flash" -version = "0.33.0" -dependencies = [ - "deno_core", - "deno_tls", - "deno_websocket", - "http", - "httparse", - "libc", - "log", - "mio", - "rustls", - "rustls-pemfile", - "serde", - "socket2", - "tokio", -] - [[package]] name = "deno_fs" version = "0.7.0" @@ -1181,7 +1162,6 @@ dependencies = [ "deno_crypto", "deno_fetch", "deno_ffi", - "deno_flash", "deno_fs", "deno_http", "deno_io", diff --git a/Cargo.toml b/Cargo.toml index 88fc8d2ce9..c12b14af1b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,7 +18,6 @@ members = [ "ext/console", "ext/crypto", "ext/fetch", - "ext/flash", "ext/ffi", "ext/fs", "ext/http", @@ -62,7 +61,6 @@ deno_console = { version = "0.97.0", path = "./ext/console" } deno_crypto = { version = "0.111.0", path = "./ext/crypto" } deno_fetch = { version = "0.121.0", path = "./ext/fetch" } deno_ffi = { version = "0.84.0", path = "./ext/ffi" } -deno_flash = { version = "0.33.0", path = "./ext/flash" } deno_fs = { version = "0.7.0", path = "./ext/fs" } deno_http = { version = "0.92.0", path = "./ext/http" } deno_io = { version = "0.7.0", path = "./ext/io" } @@ -265,8 +263,6 @@ opt-level = 3 opt-level = 3 [profile.release.package.deno_http] opt-level = 3 -[profile.release.package.deno_flash] -opt-level = 3 [profile.release.package.deno_net] opt-level = 3 [profile.release.package.deno_web] diff --git a/cli/bench/http/deno_http_flash_ops.js b/cli/bench/http/deno_http_flash_ops.js deleted file mode 100644 index 7b024f9afb..0000000000 --- a/cli/bench/http/deno_http_flash_ops.js +++ /dev/null @@ -1,37 +0,0 @@ -// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. - -// deno-lint-ignore-file - -const { - opAsync, - ops: { op_flash_make_request, op_flash_serve }, - encode, -} = Deno[Deno.internal].core; -const addr = Deno.args[0] || "127.0.0.1:4500"; -const [hostname, port] = addr.split(":"); -const serverId = op_flash_serve({ hostname, port, reuseport: true }); -const serverPromise = opAsync("op_flash_drive_server", serverId); - -const fastOps = op_flash_make_request(); -function nextRequest() { - return fastOps.nextRequest(); -} -function respond(token, response) { - return fastOps.respond(token, response, true); -} - -const response = encode( - "HTTP/1.1 200 OK\r\nContent-Length: 11\r\n\r\nHello World", -); -let offset = 0; -while (true) { - let token = nextRequest(); - if (token === 0) token = await opAsync("op_flash_next_async", serverId); - for (let i = offset; i < offset + token; i++) { - respond( - i, - response, - ); - } - offset += token; -} diff --git a/cli/bench/http/deno_http_flash_ops_spawn.js b/cli/bench/http/deno_http_flash_ops_spawn.js deleted file mode 100644 index b9d11462ff..0000000000 --- a/cli/bench/http/deno_http_flash_ops_spawn.js +++ /dev/null @@ -1,18 +0,0 @@ -// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. - -if (Deno.build.os !== "linux") { - throw new Error("SO_REUSEPORT is only supported on Linux"); -} - -const executable = Deno.execPath(); -const path = new URL("./deno_http_flash_ops.js", import.meta.url).pathname; -// single flash instance runs on ~1.8 cores -const cpus = navigator.hardwareConcurrency / 2; -const processes = new Array(cpus); -for (let i = 0; i < cpus; i++) { - const proc = Deno.run({ - cmd: [executable, "run", "-A", "--unstable", path, Deno.args[0]], - }); - processes.push(proc.status()); -} -await Promise.all(processes); diff --git a/cli/bench/http/deno_http_flash_post_bin.js b/cli/bench/http/deno_http_flash_post_bin.js deleted file mode 100644 index b81553dcd4..0000000000 --- a/cli/bench/http/deno_http_flash_post_bin.js +++ /dev/null @@ -1,16 +0,0 @@ -// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. - -const addr = Deno.args[0] || "127.0.0.1:4500"; -const [hostname, port] = addr.split(":"); -const { serve } = Deno; - -async function handler(request) { - try { - const buffer = await request.arrayBuffer(); - return new Response(buffer.byteLength); - } catch (e) { - console.log(e); - } -} - -serve(handler, { hostname, port }); diff --git a/cli/bench/http/deno_http_flash_post_bin.lua b/cli/bench/http/deno_http_flash_post_bin.lua deleted file mode 100644 index c8f5d3e3f7..0000000000 --- a/cli/bench/http/deno_http_flash_post_bin.lua +++ /dev/null @@ -1,5 +0,0 @@ -wrk.method = "POST" -wrk.headers["Content-Type"] = "application/octet-stream" - -file = io.open("./cli/bench/testdata/128k.bin", "rb") -wrk.body = file:read("*a") \ No newline at end of file diff --git a/cli/bench/http/deno_http_flash_spawn.js b/cli/bench/http/deno_http_flash_spawn.js deleted file mode 100644 index e47acffc59..0000000000 --- a/cli/bench/http/deno_http_flash_spawn.js +++ /dev/null @@ -1,18 +0,0 @@ -// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. - -if (Deno.build.os !== "linux") { - throw new Error("SO_REUSEPORT is only supported on Linux"); -} - -const executable = Deno.execPath(); -const path = new URL("./deno_http_flash.js", import.meta.url).pathname; -// single flash instance runs on ~1.8 cores -const cpus = navigator.hardwareConcurrency / 2; -const processes = new Array(cpus); -for (let i = 0; i < cpus; i++) { - const proc = Deno.run({ - cmd: [executable, "run", "-A", "--unstable", path, Deno.args[0]], - }); - processes.push(proc.status()); -} -await Promise.all(processes); diff --git a/cli/bench/http/deno_http_flash.js b/cli/bench/http/deno_http_serve.js similarity index 100% rename from cli/bench/http/deno_http_flash.js rename to cli/bench/http/deno_http_serve.js diff --git a/cli/build.rs b/cli/build.rs index 2a7327a90f..ddd942593f 100644 --- a/cli/build.rs +++ b/cli/build.rs @@ -362,7 +362,6 @@ fn create_cli_snapshot(snapshot_path: PathBuf) { deno_http::deno_http::init_ops(), deno_io::deno_io::init_ops(Default::default()), deno_fs::deno_fs::init_ops::(false), - deno_flash::deno_flash::init_ops::(false), // No --unstable deno_node::deno_node::init_ops::(None), cli::init_ops_and_esm(), // NOTE: This needs to be init_ops_and_esm! ]; diff --git a/cli/tests/integration/run_tests.rs b/cli/tests/integration/run_tests.rs index b661e135ae..1d70a9cb74 100644 --- a/cli/tests/integration/run_tests.rs +++ b/cli/tests/integration/run_tests.rs @@ -4230,13 +4230,6 @@ itest!(config_file_lock_true { exit_code: 10, }); -// TODO(bartlomieju): this test is flaky on CI, reenable it after debugging -// // Check https://github.com/denoland/deno_std/issues/2882 -// itest!(flash_shutdown { -// args: "run --unstable --allow-net run/flash_shutdown/main.ts", -// exit_code: 0, -// }); - itest!(permission_args { args: "run run/001_hello.js --allow-net", output: "run/permission_args.out", diff --git a/cli/tests/testdata/run/flash_shutdown/main.ts b/cli/tests/testdata/run/flash_shutdown/main.ts deleted file mode 100644 index 5e0908efb1..0000000000 --- a/cli/tests/testdata/run/flash_shutdown/main.ts +++ /dev/null @@ -1,23 +0,0 @@ -// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. - -// Deno.serve caused segfault with this example after #16383 -// refs: -// - https://github.com/denoland/deno/pull/16383 -// - https://github.com/denoland/deno_std/issues/2882 -// - revert https://github.com/denoland/deno/pull/16610 - -const ctl = new AbortController(); -Deno.serve(() => - new Promise((resolve) => { - resolve(new Response(new TextEncoder().encode("ok"))); - ctl.abort(); - }), { - signal: ctl.signal, - async onListen({ port }) { - const a = await fetch(`http://localhost:${port}`, { - method: "POST", - body: "", - }); - await a.text(); - }, -}); diff --git a/ext/flash/01_http.js b/ext/flash/01_http.js deleted file mode 100644 index fe503ed05d..0000000000 --- a/ext/flash/01_http.js +++ /dev/null @@ -1,793 +0,0 @@ -// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. -const core = globalThis.Deno.core; -const ops = core.ops; -const primordials = globalThis.__bootstrap.primordials; -import { BlobPrototype } from "ext:deno_web/09_file.js"; -import { TcpConn } from "ext:deno_net/01_net.js"; -import { toInnerResponse } from "ext:deno_fetch/23_response.js"; -import { _flash, fromFlashRequest } from "ext:deno_fetch/23_request.js"; -import { Event } from "ext:deno_web/02_event.js"; -import { - _state, - getReadableStreamResourceBacking, - ReadableStream, - readableStreamClose, - ReadableStreamPrototype, -} from "ext:deno_web/06_streams.js"; -import { - _eventLoop, - _idleTimeoutDuration, - _idleTimeoutTimeout, - _protocol, - _readyState, - _rid, - _serverHandleIdleTimeout, - WebSocket, -} from "ext:deno_websocket/01_websocket.js"; -import { _ws } from "ext:deno_http/01_http.js"; -const { - ObjectPrototypeIsPrototypeOf, - PromisePrototype, - PromisePrototypeCatch, - PromisePrototypeThen, - SafePromiseAll, - TypedArrayPrototypeGetByteLength, - TypedArrayPrototypeGetSymbolToStringTag, - TypedArrayPrototypeSet, - TypedArrayPrototypeSubarray, - TypeError, - Uint8Array, -} = primordials; - -const statusCodes = { - 100: "Continue", - 101: "Switching Protocols", - 102: "Processing", - 200: "OK", - 201: "Created", - 202: "Accepted", - 203: "Non Authoritative Information", - 204: "No Content", - 205: "Reset Content", - 206: "Partial Content", - 207: "Multi-Status", - 208: "Already Reported", - 226: "IM Used", - 300: "Multiple Choices", - 301: "Moved Permanently", - 302: "Found", - 303: "See Other", - 304: "Not Modified", - 305: "Use Proxy", - 307: "Temporary Redirect", - 308: "Permanent Redirect", - 400: "Bad Request", - 401: "Unauthorized", - 402: "Payment Required", - 403: "Forbidden", - 404: "Not Found", - 405: "Method Not Allowed", - 406: "Not Acceptable", - 407: "Proxy Authentication Required", - 408: "Request Timeout", - 409: "Conflict", - 410: "Gone", - 411: "Length Required", - 412: "Precondition Failed", - 413: "Payload Too Large", - 414: "URI Too Long", - 415: "Unsupported Media Type", - 416: "Range Not Satisfiable", - 418: "I'm a teapot", - 421: "Misdirected Request", - 422: "Unprocessable Entity", - 423: "Locked", - 424: "Failed Dependency", - 426: "Upgrade Required", - 428: "Precondition Required", - 429: "Too Many Requests", - 431: "Request Header Fields Too Large", - 451: "Unavailable For Legal Reasons", - 500: "Internal Server Error", - 501: "Not Implemented", - 502: "Bad Gateway", - 503: "Service Unavailable", - 504: "Gateway Timeout", - 505: "HTTP Version Not Supported", - 506: "Variant Also Negotiates", - 507: "Insufficient Storage", - 508: "Loop Detected", - 510: "Not Extended", - 511: "Network Authentication Required", -}; - -const methods = { - 0: "GET", - 1: "HEAD", - 2: "CONNECT", - 3: "PUT", - 4: "DELETE", - 5: "OPTIONS", - 6: "TRACE", - 7: "POST", - 8: "PATCH", -}; - -let dateInterval; -let date; - -/** - * Construct an HTTP response message. - * All HTTP/1.1 messages consist of a start-line followed by a sequence - * of octets. - * - * HTTP-message = start-line - * *( header-field CRLF ) - * CRLF - * [ message-body ] - * - * @param {keyof typeof methods} method - * @param {keyof typeof statusCodes} status - * @param {[name: string, value: string][]} headerList - * @param {Uint8Array | string | null} body - * @param {number} bodyLen - * @param {boolean} earlyEnd - * @returns {Uint8Array | string} - */ -function http1Response( - method, - status, - headerList, - body, - bodyLen, - earlyEnd = false, -) { - // HTTP uses a "." numbering scheme - // HTTP-version = HTTP-name "/" DIGIT "." DIGIT - // HTTP-name = %x48.54.54.50 ; "HTTP", case-sensitive - // - // status-line = HTTP-version SP status-code SP reason-phrase CRLF - // Date header: https://datatracker.ietf.org/doc/html/rfc7231#section-7.1.1.2 - let str = `HTTP/1.1 ${status} ${statusCodes[status]}\r\nDate: ${date}\r\n`; - for (let i = 0; i < headerList.length; ++i) { - const { 0: name, 1: value } = headerList[i]; - // header-field = field-name ":" OWS field-value OWS - str += `${name}: ${value}\r\n`; - } - - // https://datatracker.ietf.org/doc/html/rfc7231#section-6.3.6 - if (status === 205 || status === 304) { - // MUST NOT generate a payload in a 205 response. - // indicate a zero-length body for the response by - // including a Content-Length header field with a value of 0. - str += "Content-Length: 0\r\n\r\n"; - return str; - } - - // MUST NOT send Content-Length or Transfer-Encoding if status code is 1xx or 204. - if (status === 204 || status < 200) { - str += "\r\n"; - return str; - } - - if (earlyEnd === true) { - return str; - } - - // null body status is validated by inititalizeAResponse in ext/fetch - if (body !== null && body !== undefined) { - str += `Content-Length: ${bodyLen}\r\n\r\n`; - } else { - str += "Transfer-Encoding: chunked\r\n\r\n"; - return str; - } - - // A HEAD request. - if (method === 1) return str; - - if (typeof body === "string") { - str += body ?? ""; - } else { - const head = core.encode(str); - const response = new Uint8Array( - TypedArrayPrototypeGetByteLength(head) + bodyLen, - ); - TypedArrayPrototypeSet(response, head, 0); - TypedArrayPrototypeSet( - response, - body, - TypedArrayPrototypeGetByteLength(head), - ); - return response; - } - - return str; -} - -function prepareFastCalls() { - return ops.op_flash_make_request(); -} - -function hostnameForDisplay(hostname) { - // If the hostname is "0.0.0.0", we display "localhost" in console - // because browsers in Windows don't resolve "0.0.0.0". - // See the discussion in https://github.com/denoland/deno_std/issues/1165 - return hostname === "0.0.0.0" ? "localhost" : hostname; -} - -function writeFixedResponse( - server, - requestId, - response, - responseLen, - end, - respondFast, -) { - let nwritten = 0; - // TypedArray - if (typeof response !== "string") { - nwritten = respondFast(requestId, response, end); - } else { - // string - nwritten = ops.op_flash_respond( - server, - requestId, - response, - end, - ); - } - - if (nwritten < responseLen) { - core.opAsync( - "op_flash_respond_async", - server, - requestId, - response.slice(nwritten), - end, - ); - } -} - -// TODO(@littledivy): Woah woah, cut down the number of arguments. -async function handleResponse( - req, - resp, - body, - hasBody, - method, - serverId, - i, - respondFast, - respondChunked, - tryRespondChunked, -) { - // there might've been an HTTP upgrade. - if (resp === undefined) { - return; - } - const innerResp = toInnerResponse(resp); - // If response body length is known, it will be sent synchronously in a - // single op, in other case a "response body" resource will be created and - // we'll be streaming it. - /** @type {ReadableStream | Uint8Array | string | null} */ - let respBody = null; - let isStreamingResponseBody = false; - if (innerResp.body !== null) { - if (typeof innerResp.body.streamOrStatic?.body === "string") { - if (innerResp.body.streamOrStatic.consumed === true) { - throw new TypeError("Body is unusable."); - } - innerResp.body.streamOrStatic.consumed = true; - respBody = innerResp.body.streamOrStatic.body; - isStreamingResponseBody = false; - } else if ( - ObjectPrototypeIsPrototypeOf( - ReadableStreamPrototype, - innerResp.body.streamOrStatic, - ) - ) { - if (innerResp.body.unusable()) { - throw new TypeError("Body is unusable."); - } - if ( - innerResp.body.length === null || - ObjectPrototypeIsPrototypeOf( - BlobPrototype, - innerResp.body.source, - ) - ) { - respBody = innerResp.body.stream; - } else { - const reader = innerResp.body.stream.getReader(); - const r1 = await reader.read(); - if (r1.done) { - respBody = new Uint8Array(0); - } else { - respBody = r1.value; - const r2 = await reader.read(); - if (!r2.done) throw new TypeError("Unreachable"); - } - } - isStreamingResponseBody = !( - typeof respBody === "string" || - TypedArrayPrototypeGetSymbolToStringTag(respBody) === "Uint8Array" - ); - } else { - if (innerResp.body.streamOrStatic.consumed === true) { - throw new TypeError("Body is unusable."); - } - innerResp.body.streamOrStatic.consumed = true; - respBody = innerResp.body.streamOrStatic.body; - } - } else { - respBody = new Uint8Array(0); - } - - const ws = resp[_ws]; - if (isStreamingResponseBody === false) { - const length = typeof respBody === "string" - ? core.byteLength(respBody) - : TypedArrayPrototypeGetByteLength(respBody); - const responseStr = http1Response( - method, - innerResp.status ?? 200, - innerResp.headerList, - respBody, - length, - ); - // A HEAD request always ignores body, but includes the correct content-length size. - const responseLen = method === 1 ? core.byteLength(responseStr) : length; - writeFixedResponse( - serverId, - i, - responseStr, - responseLen, - !ws, // Don't close socket if there is a deferred websocket upgrade. - respondFast, - ); - } - - return (async () => { - if (!ws) { - if (hasBody && body[_state] !== "closed") { - // TODO(@littledivy): Optimize by draining in a single op. - try { - await req.arrayBuffer(); - } catch { /* pass */ } - } - } - - if (isStreamingResponseBody === true) { - const resourceBacking = getReadableStreamResourceBacking(respBody); - if (resourceBacking) { - if (respBody.locked) { - throw new TypeError("ReadableStream is locked."); - } - const reader = respBody.getReader(); // Aquire JS lock. - try { - PromisePrototypeThen( - core.opAsync( - "op_flash_write_resource", - http1Response( - method, - innerResp.status ?? 200, - innerResp.headerList, - null, - 0, // Content-Length will be set by the op. - true, - ), - serverId, - i, - resourceBacking.rid, - resourceBacking.autoClose, - ), - () => { - // Release JS lock. - readableStreamClose(respBody); - }, - ); - } catch (error) { - await reader.cancel(error); - throw error; - } - } else { - const reader = respBody.getReader(); - - // Best case: sends headers + first chunk in a single go. - const { value, done } = await reader.read(); - writeFixedResponse( - serverId, - i, - http1Response( - method, - innerResp.status ?? 200, - innerResp.headerList, - null, - // deno-lint-ignore prefer-primordials - respBody.byteLength, - ), - // deno-lint-ignore prefer-primordials - respBody.byteLength, - false, - respondFast, - ); - - await tryRespondChunked( - i, - value, - done, - ); - - if (!done) { - while (true) { - const chunk = await reader.read(); - await respondChunked( - i, - chunk.value, - chunk.done, - ); - if (chunk.done) break; - } - } - } - } - - if (ws) { - const wsRid = await core.opAsync( - "op_flash_upgrade_websocket", - serverId, - i, - ); - ws[_rid] = wsRid; - ws[_protocol] = resp.headers.get("sec-websocket-protocol"); - - ws[_readyState] = WebSocket.OPEN; - const event = new Event("open"); - ws.dispatchEvent(event); - - ws[_eventLoop](); - if (ws[_idleTimeoutDuration]) { - ws.addEventListener( - "close", - () => clearTimeout(ws[_idleTimeoutTimeout]), - ); - } - ws[_serverHandleIdleTimeout](); - } - })(); -} - -function createServe(opFn) { - return async function serve(arg1, arg2) { - let options = undefined; - let handler = undefined; - if (typeof arg1 === "function") { - handler = arg1; - options = arg2; - } else if (typeof arg2 === "function") { - handler = arg2; - options = arg1; - } else { - options = arg1; - } - if (handler === undefined) { - if (options === undefined) { - throw new TypeError( - "No handler was provided, so an options bag is mandatory.", - ); - } - handler = options.handler; - } - if (typeof handler !== "function") { - throw new TypeError("A handler function must be provided."); - } - if (options === undefined) { - options = {}; - } - - const signal = options.signal; - - const onError = options.onError ?? function (error) { - console.error(error); - return new Response("Internal Server Error", { status: 500 }); - }; - - const onListen = options.onListen ?? function ({ port }) { - console.log( - `Listening on http://${ - hostnameForDisplay(listenOpts.hostname) - }:${port}/`, - ); - }; - - const listenOpts = { - hostname: options.hostname ?? "127.0.0.1", - port: options.port ?? 9000, - reuseport: options.reusePort ?? false, - }; - if (options.cert || options.key) { - if (!options.cert || !options.key) { - throw new TypeError( - "Both cert and key must be provided to enable HTTPS.", - ); - } - listenOpts.cert = options.cert; - listenOpts.key = options.key; - } - - const serverId = opFn(listenOpts); - const serverPromise = core.opAsync("op_flash_drive_server", serverId); - - PromisePrototypeCatch( - PromisePrototypeThen( - core.opAsync("op_flash_wait_for_listening", serverId), - (port) => { - onListen({ hostname: listenOpts.hostname, port }); - }, - ), - () => {}, - ); - const finishedPromise = PromisePrototypeCatch(serverPromise, () => {}); - - const server = { - id: serverId, - transport: listenOpts.cert && listenOpts.key ? "https" : "http", - hostname: listenOpts.hostname, - port: listenOpts.port, - closed: false, - finished: finishedPromise, - async close() { - if (server.closed) { - return; - } - server.closed = true; - await core.opAsync("op_flash_close_server", serverId); - await server.finished; - }, - async serve() { - let offset = 0; - while (true) { - if (server.closed) { - break; - } - - let tokens = nextRequestSync(); - if (tokens === 0) { - tokens = await core.opAsync("op_flash_next_async", serverId); - if (server.closed) { - break; - } - } - - for (let i = offset; i < offset + tokens; i++) { - let body = null; - // There might be a body, but we don't expose it for GET/HEAD requests. - // It will be closed automatically once the request has been handled and - // the response has been sent. - const method = getMethodSync(i); - let hasBody = method > 2; // Not GET/HEAD/CONNECT - if (hasBody) { - body = createRequestBodyStream(serverId, i); - if (body === null) { - hasBody = false; - } - } - - const req = fromFlashRequest( - serverId, - /* streamRid */ - i, - body, - /* methodCb */ - () => methods[method], - /* urlCb */ - () => { - const path = ops.op_flash_path(serverId, i); - return `${server.transport}://${server.hostname}:${server.port}${path}`; - }, - /* headersCb */ - () => ops.op_flash_headers(serverId, i), - ); - - let resp; - let remoteAddr; - try { - resp = handler(req, { - get remoteAddr() { - if (!remoteAddr) { - const { 0: hostname, 1: port } = core.ops.op_flash_addr( - serverId, - i, - ); - remoteAddr = { hostname, port }; - } - return remoteAddr; - }, - }); - if (ObjectPrototypeIsPrototypeOf(PromisePrototype, resp)) { - PromisePrototypeCatch( - PromisePrototypeThen( - resp, - (resp) => - handleResponse( - req, - resp, - body, - hasBody, - method, - serverId, - i, - respondFast, - respondChunked, - tryRespondChunked, - ), - ), - onError, - ); - } else if (typeof resp?.then === "function") { - resp.then((resp) => - handleResponse( - req, - resp, - body, - hasBody, - method, - serverId, - i, - respondFast, - respondChunked, - tryRespondChunked, - ) - ).catch(onError); - } else { - handleResponse( - req, - resp, - body, - hasBody, - method, - serverId, - i, - respondFast, - respondChunked, - tryRespondChunked, - ).catch(onError); - } - } catch (e) { - resp = await onError(e); - } - } - - offset += tokens; - } - await server.finished; - }, - }; - - signal?.addEventListener("abort", () => { - clearInterval(dateInterval); - PromisePrototypeThen(server.close(), () => {}, () => {}); - }, { - once: true, - }); - - function tryRespondChunked(token, chunk, shutdown) { - const nwritten = ops.op_try_flash_respond_chunked( - serverId, - token, - chunk ?? new Uint8Array(), - shutdown, - ); - if (nwritten > 0) { - return core.opAsync( - "op_flash_respond_chunked", - serverId, - token, - chunk, - shutdown, - nwritten, - ); - } - } - - function respondChunked(token, chunk, shutdown) { - return core.opAsync( - "op_flash_respond_chunked", - serverId, - token, - chunk, - shutdown, - ); - } - - const fastOp = prepareFastCalls(); - let nextRequestSync = () => fastOp.nextRequest(); - let getMethodSync = (token) => fastOp.getMethod(token); - let respondFast = (token, response, shutdown) => - fastOp.respond(token, response, shutdown); - if (serverId > 0) { - nextRequestSync = () => ops.op_flash_next_server(serverId); - getMethodSync = (token) => ops.op_flash_method(serverId, token); - respondFast = (token, response, shutdown) => - ops.op_flash_respond(serverId, token, response, null, shutdown); - } - - if (!dateInterval) { - date = new Date().toUTCString(); - dateInterval = setInterval(() => { - date = new Date().toUTCString(); - }, 1000); - } - - await SafePromiseAll([ - PromisePrototypeCatch(server.serve(), console.error), - serverPromise, - ]); - }; -} - -function createRequestBodyStream(serverId, token) { - // The first packet is left over bytes after parsing the request - const firstRead = ops.op_flash_first_packet( - serverId, - token, - ); - if (!firstRead) return null; - let firstEnqueued = TypedArrayPrototypeGetByteLength(firstRead) === 0; - - return new ReadableStream({ - type: "bytes", - async pull(controller) { - try { - if (firstEnqueued === false) { - controller.enqueue(firstRead); - firstEnqueued = true; - return; - } - // This is the largest possible size for a single packet on a TLS - // stream. - const chunk = new Uint8Array(16 * 1024 + 256); - const read = await core.opAsync( - "op_flash_read_body", - serverId, - token, - chunk, - ); - if (read > 0) { - // We read some data. Enqueue it onto the stream. - controller.enqueue(TypedArrayPrototypeSubarray(chunk, 0, read)); - } else { - // We have reached the end of the body, so we close the stream. - controller.close(); - } - } catch (err) { - // There was an error while reading a chunk of the body, so we - // error. - controller.error(err); - controller.close(); - } - }, - }); -} - -function upgradeHttpRaw(req) { - if (!req[_flash]) { - throw new TypeError( - "Non-flash requests can not be upgraded with `upgradeHttpRaw`. Use `upgradeHttp` instead.", - ); - } - - // NOTE(bartlomieju): - // Access these fields so they are cached on `req` object, otherwise - // they wouldn't be available after the connection gets upgraded. - req.url; - req.method; - req.headers; - - const { serverId, streamRid } = req[_flash]; - const connRid = ops.op_flash_upgrade_http(streamRid, serverId); - // TODO(@littledivy): return already read first packet too. - return [new TcpConn(connRid), new Uint8Array()]; -} - -export { createServe, upgradeHttpRaw }; diff --git a/ext/flash/Cargo.toml b/ext/flash/Cargo.toml deleted file mode 100644 index 2bde23826e..0000000000 --- a/ext/flash/Cargo.toml +++ /dev/null @@ -1,30 +0,0 @@ -# Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. - -[package] -name = "deno_flash" -version = "0.33.0" -authors.workspace = true -edition.workspace = true -license.workspace = true -readme = "README.md" -repository.workspace = true -description = "Fast HTTP/1 server implementation for Deno" - -[lib] -path = "lib.rs" - -[dependencies] -deno_core.workspace = true -deno_tls.workspace = true -# For HTTP/2 and websocket upgrades -deno_websocket.workspace = true -http.workspace = true -httparse = "1.8" -libc.workspace = true -log.workspace = true -mio = { version = "0.8.1", features = ["os-poll", "net"] } -rustls.workspace = true -rustls-pemfile.workspace = true -serde.workspace = true -socket2.workspace = true -tokio.workspace = true diff --git a/ext/flash/README.md b/ext/flash/README.md deleted file mode 100644 index bc3c12065d..0000000000 --- a/ext/flash/README.md +++ /dev/null @@ -1,7 +0,0 @@ -# flash - -Flash is a fast HTTP/1.1 server implementation for Deno. - -```js -serve({ fetch: (req) => new Response("Hello World") }); -``` diff --git a/ext/flash/chunked.rs b/ext/flash/chunked.rs deleted file mode 100644 index 711dd717d4..0000000000 --- a/ext/flash/chunked.rs +++ /dev/null @@ -1,273 +0,0 @@ -// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. -// -// Based on https://github.com/frewsxcv/rust-chunked-transfer/blob/5c08614458580f9e7a85124021006d83ce1ed6e9/src/decoder.rs -// Copyright 2015 The tiny-http Contributors -// Copyright 2015 The rust-chunked-transfer Contributors - -use std::error::Error; -use std::fmt; -use std::io::Error as IoError; -use std::io::ErrorKind; -use std::io::Read; -use std::io::Result as IoResult; - -pub struct Decoder { - pub source: R, - - // remaining size of the chunk being read - // none if we are not in a chunk - pub remaining_chunks_size: Option, - pub end: bool, -} - -impl Decoder -where - R: Read, -{ - pub fn new(source: R, remaining_chunks_size: Option) -> Decoder { - Decoder { - source, - remaining_chunks_size, - end: false, - } - } - - fn read_chunk_size(&mut self) -> IoResult { - let mut chunk_size_bytes = Vec::new(); - let mut has_ext = false; - - loop { - let byte = match self.source.by_ref().bytes().next() { - Some(b) => b?, - None => { - return Err(IoError::new(ErrorKind::InvalidInput, DecoderError)) - } - }; - - if byte == b'\r' { - break; - } - - if byte == b';' { - has_ext = true; - break; - } - - chunk_size_bytes.push(byte); - } - - // Ignore extensions for now - if has_ext { - loop { - let byte = match self.source.by_ref().bytes().next() { - Some(b) => b?, - None => { - return Err(IoError::new(ErrorKind::InvalidInput, DecoderError)) - } - }; - if byte == b'\r' { - break; - } - } - } - - self.read_line_feed()?; - - let chunk_size = String::from_utf8(chunk_size_bytes) - .ok() - .and_then(|c| usize::from_str_radix(c.trim(), 16).ok()) - .ok_or_else(|| IoError::new(ErrorKind::InvalidInput, DecoderError))?; - - Ok(chunk_size) - } - - fn read_carriage_return(&mut self) -> IoResult<()> { - match self.source.by_ref().bytes().next() { - Some(Ok(b'\r')) => Ok(()), - _ => Err(IoError::new(ErrorKind::InvalidInput, DecoderError)), - } - } - - fn read_line_feed(&mut self) -> IoResult<()> { - match self.source.by_ref().bytes().next() { - Some(Ok(b'\n')) => Ok(()), - _ => Err(IoError::new(ErrorKind::InvalidInput, DecoderError)), - } - } -} - -impl Read for Decoder -where - R: Read, -{ - fn read(&mut self, buf: &mut [u8]) -> IoResult { - let remaining_chunks_size = match self.remaining_chunks_size { - Some(c) => c, - None => { - // first possibility: we are not in a chunk, so we'll attempt to determine - // the chunks size - let chunk_size = self.read_chunk_size()?; - - // if the chunk size is 0, we are at EOF - if chunk_size == 0 { - self.read_carriage_return()?; - self.read_line_feed()?; - self.end = true; - return Ok(0); - } - - chunk_size - } - }; - - // second possibility: we continue reading from a chunk - if buf.len() < remaining_chunks_size { - let read = self.source.read(buf)?; - self.remaining_chunks_size = Some(remaining_chunks_size - read); - return Ok(read); - } - - // third possibility: the read request goes further than the current chunk - // we simply read until the end of the chunk and return - let buf = &mut buf[..remaining_chunks_size]; - let read = self.source.read(buf)?; - self.remaining_chunks_size = if read == remaining_chunks_size { - self.read_carriage_return()?; - self.read_line_feed()?; - None - } else { - Some(remaining_chunks_size - read) - }; - - Ok(read) - } -} - -#[derive(Debug, Copy, Clone)] -struct DecoderError; - -impl fmt::Display for DecoderError { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { - write!(fmt, "Error while decoding chunks") - } -} - -impl Error for DecoderError { - fn description(&self) -> &str { - "Error while decoding chunks" - } -} - -#[cfg(test)] -mod test { - use super::Decoder; - use std::io; - use std::io::Read; - - /// This unit test is taken from from Hyper - /// https://github.com/hyperium/hyper - /// Copyright (c) 2014 Sean McArthur - #[test] - fn test_read_chunk_size() { - fn read(s: &str, expected: usize) { - let mut decoded = Decoder::new(s.as_bytes(), None); - let actual = decoded.read_chunk_size().unwrap(); - assert_eq!(expected, actual); - } - - fn read_err(s: &str) { - let mut decoded = Decoder::new(s.as_bytes(), None); - let err_kind = decoded.read_chunk_size().unwrap_err().kind(); - assert_eq!(err_kind, io::ErrorKind::InvalidInput); - } - - read("1\r\n", 1); - read("01\r\n", 1); - read("0\r\n", 0); - read("00\r\n", 0); - read("A\r\n", 10); - read("a\r\n", 10); - read("Ff\r\n", 255); - read("Ff \r\n", 255); - // Missing LF or CRLF - read_err("F\rF"); - read_err("F"); - // Invalid hex digit - read_err("X\r\n"); - read_err("1X\r\n"); - read_err("-\r\n"); - read_err("-1\r\n"); - // Acceptable (if not fully valid) extensions do not influence the size - read("1;extension\r\n", 1); - read("a;ext name=value\r\n", 10); - read("1;extension;extension2\r\n", 1); - read("1;;; ;\r\n", 1); - read("2; extension...\r\n", 2); - read("3 ; extension=123\r\n", 3); - read("3 ;\r\n", 3); - read("3 ; \r\n", 3); - // Invalid extensions cause an error - read_err("1 invalid extension\r\n"); - read_err("1 A\r\n"); - read_err("1;no CRLF"); - } - - #[test] - fn test_valid_chunk_decode() { - let source = io::Cursor::new( - "3\r\nhel\r\nb\r\nlo world!!!\r\n0\r\n\r\n" - .to_string() - .into_bytes(), - ); - let mut decoded = Decoder::new(source, None); - - let mut string = String::new(); - decoded.read_to_string(&mut string).unwrap(); - - assert_eq!(string, "hello world!!!"); - } - - #[test] - fn test_decode_zero_length() { - let mut decoder = Decoder::new(b"0\r\n\r\n" as &[u8], None); - - let mut decoded = String::new(); - decoder.read_to_string(&mut decoded).unwrap(); - - assert_eq!(decoded, ""); - } - - #[test] - fn test_decode_invalid_chunk_length() { - let mut decoder = Decoder::new(b"m\r\n\r\n" as &[u8], None); - - let mut decoded = String::new(); - assert!(decoder.read_to_string(&mut decoded).is_err()); - } - - #[test] - fn invalid_input1() { - let source = io::Cursor::new( - "2\r\nhel\r\nb\r\nlo world!!!\r\n0\r\n" - .to_string() - .into_bytes(), - ); - let mut decoded = Decoder::new(source, None); - - let mut string = String::new(); - assert!(decoded.read_to_string(&mut string).is_err()); - } - - #[test] - fn invalid_input2() { - let source = io::Cursor::new( - "3\rhel\r\nb\r\nlo world!!!\r\n0\r\n" - .to_string() - .into_bytes(), - ); - let mut decoded = Decoder::new(source, None); - - let mut string = String::new(); - assert!(decoded.read_to_string(&mut string).is_err()); - } -} diff --git a/ext/flash/lib.rs b/ext/flash/lib.rs deleted file mode 100644 index 647358a4f8..0000000000 --- a/ext/flash/lib.rs +++ /dev/null @@ -1,1543 +0,0 @@ -// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. - -// False positive lint for explicit drops. -// https://github.com/rust-lang/rust-clippy/issues/6446 -#![allow(clippy::await_holding_lock)] -// https://github.com/rust-lang/rust-clippy/issues/6353 -#![allow(clippy::await_holding_refcell_ref)] - -use deno_core::error::generic_error; -use deno_core::error::type_error; -use deno_core::error::AnyError; -use deno_core::op; -use deno_core::serde_v8; -use deno_core::v8; -use deno_core::v8::fast_api; -use deno_core::ByteString; -use deno_core::CancelFuture; -use deno_core::CancelHandle; -use deno_core::OpState; -use deno_core::StringOrBuffer; -use deno_core::ZeroCopyBuf; -use deno_core::V8_WRAPPER_OBJECT_INDEX; -use deno_tls::load_certs; -use deno_tls::load_private_keys; -use http::header::CONNECTION; -use http::header::CONTENT_LENGTH; -use http::header::EXPECT; -use http::header::TRANSFER_ENCODING; -use http::HeaderName; -use http::HeaderValue; -use log::trace; -use mio::net::TcpListener; -use mio::Events; -use mio::Interest; -use mio::Poll; -use mio::Token; -use serde::Deserialize; -use serde::Serialize; -use socket2::Socket; -use std::cell::RefCell; -use std::cell::UnsafeCell; -use std::collections::HashMap; -use std::ffi::c_void; -use std::future::Future; -use std::intrinsics::transmute; -use std::io::BufReader; -use std::io::Read; -use std::io::Write; -use std::mem::replace; -use std::net::SocketAddr; -use std::net::ToSocketAddrs; -use std::pin::Pin; -use std::rc::Rc; -use std::sync::Arc; -use std::sync::Mutex; -use std::task::Context; -use std::time::Duration; -use tokio::sync::mpsc; -use tokio::task::JoinHandle; - -mod chunked; -mod request; -#[cfg(unix)] -mod sendfile; -mod socket; - -use request::InnerRequest; -use request::Request; -use socket::InnerStream; -use socket::Stream; - -pub struct FlashContext { - next_server_id: u32, - join_handles: HashMap>>, - pub servers: HashMap, -} - -pub struct ServerContext { - _addr: SocketAddr, - tx: mpsc::Sender, - rx: mpsc::Receiver, - requests: HashMap, - next_token: u32, - listening_rx: Option>, - close_tx: mpsc::Sender<()>, - cancel_handle: Rc, -} - -#[derive(Debug, Eq, PartialEq)] -pub enum ParseStatus { - None, - Ongoing(usize), -} - -#[op] -fn op_flash_respond( - op_state: &mut OpState, - server_id: u32, - token: u32, - response: StringOrBuffer, - shutdown: bool, -) -> u32 { - let flash_ctx = op_state.borrow_mut::(); - let ctx = flash_ctx.servers.get_mut(&server_id).unwrap(); - flash_respond(ctx, token, shutdown, &response) -} - -#[op(fast)] -fn op_try_flash_respond_chunked( - op_state: &mut OpState, - server_id: u32, - token: u32, - response: &[u8], - shutdown: bool, -) -> u32 { - let flash_ctx = op_state.borrow_mut::(); - let ctx = flash_ctx.servers.get_mut(&server_id).unwrap(); - let tx = ctx.requests.get(&token).unwrap(); - let sock = tx.socket(); - - // TODO(@littledivy): Use writev when `UnixIoSlice` lands. - // https://github.com/denoland/deno/pull/15629 - let h = format!("{:x}\r\n", response.len()); - - let concat = [h.as_bytes(), response, b"\r\n"].concat(); - let expected = sock.try_write(&concat); - if expected != concat.len() { - if expected > 2 { - return expected as u32; - } - return expected as u32; - } - - if shutdown { - // Best case: We've written everything and the stream is done too. - let _ = ctx.requests.remove(&token).unwrap(); - } - 0 -} - -#[op] -async fn op_flash_respond_async( - state: Rc>, - server_id: u32, - token: u32, - response: StringOrBuffer, - shutdown: bool, -) -> Result<(), AnyError> { - trace!("op_flash_respond_async"); - - let mut close = false; - let sock = { - let mut op_state = state.borrow_mut(); - let flash_ctx = op_state.borrow_mut::(); - let ctx = flash_ctx.servers.get_mut(&server_id).unwrap(); - - match shutdown { - true => { - let tx = ctx.requests.remove(&token).unwrap(); - close = !tx.keep_alive; - tx.socket() - } - // In case of a websocket upgrade or streaming response. - false => { - let tx = ctx.requests.get(&token).unwrap(); - tx.socket() - } - } - }; - - sock - .with_async_stream(|stream| { - Box::pin(async move { - Ok(tokio::io::AsyncWriteExt::write(stream, &response).await?) - }) - }) - .await?; - // server is done writing and request doesn't want to kept alive. - if shutdown && close { - sock.shutdown(); - } - Ok(()) -} - -#[op] -async fn op_flash_respond_chunked( - op_state: Rc>, - server_id: u32, - token: u32, - response: Option, - shutdown: bool, - nwritten: u32, -) -> Result<(), AnyError> { - let mut op_state = op_state.borrow_mut(); - let flash_ctx = op_state.borrow_mut::(); - let ctx = flash_ctx.servers.get_mut(&server_id).unwrap(); - let sock = match shutdown { - true => { - let tx = ctx.requests.remove(&token).unwrap(); - tx.socket() - } - // In case of a websocket upgrade or streaming response. - false => { - let tx = ctx.requests.get(&token).unwrap(); - tx.socket() - } - }; - - drop(op_state); - sock - .with_async_stream(|stream| { - Box::pin(async move { - use tokio::io::AsyncWriteExt; - // TODO(@littledivy): Use writev when `UnixIoSlice` lands. - // https://github.com/denoland/deno/pull/15629 - macro_rules! write_whats_not_written { - ($e:expr) => { - let e = $e; - let n = nwritten as usize; - if n < e.len() { - stream.write_all(&e[n..]).await?; - } - }; - } - if let Some(response) = response { - let h = format!("{:x}\r\n", response.len()); - write_whats_not_written!(h.as_bytes()); - write_whats_not_written!(&response); - write_whats_not_written!(b"\r\n"); - } - - // The last chunk - if shutdown { - write_whats_not_written!(b"0\r\n\r\n"); - } - - Ok(()) - }) - }) - .await?; - Ok(()) -} - -#[op] -async fn op_flash_write_resource( - op_state: Rc>, - response: StringOrBuffer, - server_id: u32, - token: u32, - resource_id: deno_core::ResourceId, - auto_close: bool, -) -> Result<(), AnyError> { - let (resource, sock) = { - let op_state = &mut op_state.borrow_mut(); - let resource = if auto_close { - op_state.resource_table.take_any(resource_id)? - } else { - op_state.resource_table.get_any(resource_id)? - }; - let flash_ctx = op_state.borrow_mut::(); - let ctx = flash_ctx.servers.get_mut(&server_id).unwrap(); - (resource, ctx.requests.remove(&token).unwrap().socket()) - }; - - let _ = sock.write(&response); - - #[cfg(unix)] - { - use std::os::unix::io::AsRawFd; - if let InnerStream::Tcp(stream_handle) = &sock.inner { - let stream_handle = stream_handle.as_raw_fd(); - if let Some(fd) = resource.clone().backing_fd() { - // SAFETY: all-zero byte-pattern is a valid value for libc::stat. - let mut stat: libc::stat = unsafe { std::mem::zeroed() }; - // SAFETY: call to libc::fstat. - if unsafe { libc::fstat(fd, &mut stat) } >= 0 { - let _ = sock.write( - format!("Content-Length: {}\r\n\r\n", stat.st_size).as_bytes(), - ); - let tx = sendfile::SendFile { - io: (fd, stream_handle), - written: 0, - }; - tx.await?; - return Ok(()); - } - } - } - } - - sock - .with_async_stream(|stream| { - Box::pin(async move { - use tokio::io::AsyncWriteExt; - stream - .write_all(b"Transfer-Encoding: chunked\r\n\r\n") - .await?; - loop { - let view = resource.clone().read(64 * 1024).await?; // 64KB - if view.is_empty() { - stream.write_all(b"0\r\n\r\n").await?; - break; - } - // TODO(@littledivy): use vectored writes. - stream - .write_all(format!("{:x}\r\n", view.len()).as_bytes()) - .await?; - stream.write_all(&view).await?; - stream.write_all(b"\r\n").await?; - } - resource.close(); - Ok(()) - }) - }) - .await?; - Ok(()) -} - -pub const RESPOND_FAST: fast_api::FastFunction = fast_api::FastFunction::new( - &[ - fast_api::Type::V8Value, - fast_api::Type::Uint32, - fast_api::Type::TypedArray(fast_api::CType::Uint8), - fast_api::Type::Bool, - ], - fast_api::CType::Uint32, - op_flash_respond_fast as *const c_void, -); - -fn flash_respond( - ctx: &mut ServerContext, - token: u32, - shutdown: bool, - response: &[u8], -) -> u32 { - let tx = ctx.requests.get(&token).unwrap(); - let sock = tx.socket(); - - sock.read_tx.take(); - sock.read_rx.take(); - - let nwritten = sock.try_write(response); - - if shutdown && nwritten == response.len() { - if !tx.keep_alive { - sock.shutdown(); - } - ctx.requests.remove(&token).unwrap(); - } - - nwritten as u32 -} - -unsafe fn op_flash_respond_fast( - recv: v8::Local, - token: u32, - response: *const fast_api::FastApiTypedArray, - shutdown: bool, -) -> u32 { - let ptr = - recv.get_aligned_pointer_from_internal_field(V8_WRAPPER_OBJECT_INDEX); - let ctx = &mut *(ptr as *mut ServerContext); - - let response = &*response; - // Uint8Array is always byte-aligned. - let response = response.get_storage_if_aligned().unwrap_unchecked(); - flash_respond(ctx, token, shutdown, response) -} - -macro_rules! get_request { - ($op_state: ident, $token: ident) => { - get_request!($op_state, 0, $token) - }; - ($op_state: ident, $server_id: expr, $token: ident) => {{ - let flash_ctx = $op_state.borrow_mut::(); - let ctx = flash_ctx.servers.get_mut(&$server_id).unwrap(); - ctx.requests.get_mut(&$token).unwrap() - }}; -} - -#[repr(u32)] -pub enum Method { - GET = 0, - HEAD, - CONNECT, - PUT, - DELETE, - OPTIONS, - TRACE, - POST, - PATCH, -} - -#[inline] -fn get_method(req: &mut Request) -> u32 { - let method = match req.method() { - "GET" => Method::GET, - "POST" => Method::POST, - "PUT" => Method::PUT, - "DELETE" => Method::DELETE, - "OPTIONS" => Method::OPTIONS, - "HEAD" => Method::HEAD, - "PATCH" => Method::PATCH, - "TRACE" => Method::TRACE, - "CONNECT" => Method::CONNECT, - _ => Method::GET, - }; - method as u32 -} - -#[op] -fn op_flash_method(state: &mut OpState, server_id: u32, token: u32) -> u32 { - let req = get_request!(state, server_id, token); - get_method(req) -} - -#[op] -async fn op_flash_close_server(state: Rc>, server_id: u32) { - let close_tx = { - let mut op_state = state.borrow_mut(); - let flash_ctx = op_state.borrow_mut::(); - let ctx = flash_ctx.servers.get_mut(&server_id).unwrap(); - ctx.cancel_handle.cancel(); - ctx.close_tx.clone() - }; - let _ = close_tx.send(()).await; -} - -#[op] -fn op_flash_path( - state: Rc>, - server_id: u32, - token: u32, -) -> String { - let mut op_state = state.borrow_mut(); - let flash_ctx = op_state.borrow_mut::(); - let ctx = flash_ctx.servers.get_mut(&server_id).unwrap(); - ctx - .requests - .get(&token) - .unwrap() - .inner - .req - .path - .unwrap() - .to_string() -} - -#[inline] -fn next_request_sync(ctx: &mut ServerContext) -> u32 { - let offset = ctx.next_token; - - while let Ok(token) = ctx.rx.try_recv() { - ctx.requests.insert(ctx.next_token, token); - ctx.next_token += 1; - } - - ctx.next_token - offset -} - -const NEXT_REQUEST_FAST: fast_api::FastFunction = fast_api::FastFunction::new( - &[fast_api::Type::V8Value], - fast_api::CType::Uint32, - op_flash_next_fast as *const c_void, -); - -unsafe fn op_flash_next_fast(recv: v8::Local) -> u32 { - let ptr = - recv.get_aligned_pointer_from_internal_field(V8_WRAPPER_OBJECT_INDEX); - let ctx = &mut *(ptr as *mut ServerContext); - next_request_sync(ctx) -} - -const GET_METHOD_FAST: fast_api::FastFunction = fast_api::FastFunction::new( - &[fast_api::Type::V8Value, fast_api::Type::Uint32], - fast_api::CType::Uint32, - op_flash_get_method_fast as *const c_void, -); - -unsafe fn op_flash_get_method_fast( - recv: v8::Local, - token: u32, -) -> u32 { - let ptr = - recv.get_aligned_pointer_from_internal_field(V8_WRAPPER_OBJECT_INDEX); - let ctx = &mut *(ptr as *mut ServerContext); - let req = ctx.requests.get_mut(&token).unwrap(); - get_method(req) -} - -// Fast calls -#[op(v8)] -fn op_flash_make_request<'scope>( - scope: &mut v8::HandleScope<'scope>, - state: &mut OpState, -) -> serde_v8::Value<'scope> { - let object_template = v8::ObjectTemplate::new(scope); - assert!(object_template - .set_internal_field_count((V8_WRAPPER_OBJECT_INDEX + 1) as usize)); - let obj = object_template.new_instance(scope).unwrap(); - let ctx = { - let flash_ctx = state.borrow_mut::(); - let ctx = flash_ctx.servers.get_mut(&0).unwrap(); - ctx as *mut ServerContext - }; - obj.set_aligned_pointer_in_internal_field(V8_WRAPPER_OBJECT_INDEX, ctx as _); - - // nextRequest - { - let builder = v8::FunctionTemplate::builder( - |_: &mut v8::HandleScope, - args: v8::FunctionCallbackArguments, - mut rv: v8::ReturnValue| { - let external: v8::Local = args.data().try_into().unwrap(); - // SAFETY: This external is guaranteed to be a pointer to a ServerContext - let ctx = unsafe { &mut *(external.value() as *mut ServerContext) }; - rv.set_uint32(next_request_sync(ctx)); - }, - ) - .data(v8::External::new(scope, ctx as *mut _).into()); - - let func = builder.build_fast(scope, &NEXT_REQUEST_FAST, None, None, None); - let func: v8::Local = func.get_function(scope).unwrap().into(); - - let key = v8::String::new(scope, "nextRequest").unwrap(); - obj.set(scope, key.into(), func).unwrap(); - } - - // getMethod - { - let builder = v8::FunctionTemplate::builder( - |scope: &mut v8::HandleScope, - args: v8::FunctionCallbackArguments, - mut rv: v8::ReturnValue| { - let external: v8::Local = args.data().try_into().unwrap(); - // SAFETY: This external is guaranteed to be a pointer to a ServerContext - let ctx = unsafe { &mut *(external.value() as *mut ServerContext) }; - let token = args.get(0).uint32_value(scope).unwrap(); - let req = ctx.requests.get_mut(&token).unwrap(); - rv.set_uint32(get_method(req)); - }, - ) - .data(v8::External::new(scope, ctx as *mut _).into()); - - let func = builder.build_fast(scope, &GET_METHOD_FAST, None, None, None); - let func: v8::Local = func.get_function(scope).unwrap().into(); - - let key = v8::String::new(scope, "getMethod").unwrap(); - obj.set(scope, key.into(), func).unwrap(); - } - - // respond - { - let builder = v8::FunctionTemplate::builder( - |scope: &mut v8::HandleScope, - args: v8::FunctionCallbackArguments, - mut rv: v8::ReturnValue| { - let external: v8::Local = args.data().try_into().unwrap(); - // SAFETY: This external is guaranteed to be a pointer to a ServerContext - let ctx = unsafe { &mut *(external.value() as *mut ServerContext) }; - - let token = args.get(0).uint32_value(scope).unwrap(); - - let response: v8::Local = - args.get(1).try_into().unwrap(); - let ab = response.buffer(scope).unwrap(); - let store = ab.get_backing_store(); - let (offset, len) = (response.byte_offset(), response.byte_length()); - // SAFETY: v8::SharedRef is similar to Arc<[u8]>, - // it points to a fixed continuous slice of bytes on the heap. - // We assume it's initialized and thus safe to read (though may not contain meaningful data) - let response = unsafe { - &*(&store[offset..offset + len] as *const _ as *const [u8]) - }; - - let shutdown = args.get(2).boolean_value(scope); - - rv.set_uint32(flash_respond(ctx, token, shutdown, response)); - }, - ) - .data(v8::External::new(scope, ctx as *mut _).into()); - - let func = builder.build_fast(scope, &RESPOND_FAST, None, None, None); - let func: v8::Local = func.get_function(scope).unwrap().into(); - - let key = v8::String::new(scope, "respond").unwrap(); - obj.set(scope, key.into(), func).unwrap(); - } - - let value: v8::Local = obj.into(); - value.into() -} - -#[inline] -fn has_body_stream(req: &Request) -> bool { - let sock = req.socket(); - sock.read_rx.is_some() -} - -#[op] -fn op_flash_has_body_stream( - op_state: &mut OpState, - server_id: u32, - token: u32, -) -> bool { - let req = get_request!(op_state, server_id, token); - has_body_stream(req) -} - -#[op] -fn op_flash_headers( - state: Rc>, - server_id: u32, - token: u32, -) -> Result, AnyError> { - let mut op_state = state.borrow_mut(); - let flash_ctx = op_state.borrow_mut::(); - let ctx = flash_ctx - .servers - .get_mut(&server_id) - .ok_or_else(|| type_error("server closed"))?; - let inner_req = &ctx - .requests - .get(&token) - .ok_or_else(|| type_error("request closed"))? - .inner - .req; - Ok( - inner_req - .headers - .iter() - .map(|h| (h.name.as_bytes().into(), h.value.into())) - .collect(), - ) -} - -#[op] -fn op_flash_addr( - state: Rc>, - server_id: u32, - token: u32, -) -> Result<(String, u16), AnyError> { - let mut op_state = state.borrow_mut(); - let flash_ctx = op_state.borrow_mut::(); - let ctx = flash_ctx - .servers - .get_mut(&server_id) - .ok_or_else(|| type_error("server closed"))?; - let req = &ctx - .requests - .get(&token) - .ok_or_else(|| type_error("request closed"))?; - let socket = req.socket(); - Ok((socket.addr.ip().to_string(), socket.addr.port())) -} - -// Remember the first packet we read? It probably also has some body data. This op quickly copies it into -// a buffer and sets up channels for streaming the rest. -#[op] -fn op_flash_first_packet( - op_state: &mut OpState, - server_id: u32, - token: u32, -) -> Result, AnyError> { - let tx = get_request!(op_state, server_id, token); - let sock = tx.socket(); - - if !tx.te_chunked && tx.content_length.is_none() { - return Ok(None); - } - - if tx.expect_continue { - let _ = sock.write(b"HTTP/1.1 100 Continue\r\n\r\n"); - tx.expect_continue = false; - } - - let buffer = &tx.inner.buffer[tx.inner.body_offset..tx.inner.body_len]; - // Oh there is nothing here. - if buffer.is_empty() { - return Ok(Some(ZeroCopyBuf::empty())); - } - - if tx.te_chunked { - let mut buf = vec![0; 1024]; - let mut offset = 0; - let mut decoder = chunked::Decoder::new( - std::io::Cursor::new(buffer), - tx.remaining_chunk_size, - ); - - loop { - match decoder.read(&mut buf[offset..]) { - Ok(n) => { - tx.remaining_chunk_size = decoder.remaining_chunks_size; - offset += n; - - if n == 0 { - tx.te_chunked = false; - buf.truncate(offset); - return Ok(Some(buf.into())); - } - - if offset < buf.len() - && decoder.source.position() < buffer.len() as u64 - { - continue; - } - - buf.truncate(offset); - return Ok(Some(buf.into())); - } - Err(e) => { - return Err(type_error(format!("{e}"))); - } - } - } - } - - tx.content_length - .ok_or_else(|| type_error("no content-length"))?; - tx.content_read += buffer.len(); - - Ok(Some(buffer.to_vec().into())) -} - -#[op] -async fn op_flash_read_body( - state: Rc>, - server_id: u32, - token: u32, - mut buf: ZeroCopyBuf, -) -> usize { - // SAFETY: we cannot hold op_state borrow across the await point. The JS caller - // is responsible for ensuring this is not called concurrently. - let ctx = unsafe { - { - let op_state = &mut state.borrow_mut(); - let flash_ctx = op_state.borrow_mut::(); - flash_ctx.servers.get_mut(&server_id).unwrap() as *mut ServerContext - } - .as_mut() - .unwrap() - }; - let tx = match ctx.requests.get_mut(&token) { - Some(tx) => tx, - // request was already consumed by caller - None => return 0, - }; - - if tx.te_chunked { - let mut decoder = - chunked::Decoder::new(tx.socket(), tx.remaining_chunk_size); - loop { - let sock = tx.socket(); - - let _lock = sock.read_lock.lock().unwrap(); - match decoder.read(&mut buf) { - Ok(n) => { - tx.remaining_chunk_size = decoder.remaining_chunks_size; - return n; - } - Err(e) if e.kind() == std::io::ErrorKind::InvalidInput => { - panic!("chunked read error: {e}"); - } - Err(_) => { - drop(_lock); - sock.read_rx.as_mut().unwrap().recv().await.unwrap(); - } - } - } - } - - if let Some(content_length) = tx.content_length { - let sock = tx.socket(); - let l = sock.read_lock.clone(); - - loop { - let _lock = l.lock().unwrap(); - if tx.content_read >= content_length as usize { - return 0; - } - match sock.read(&mut buf) { - Ok(n) => { - tx.content_read += n; - return n; - } - _ => { - drop(_lock); - sock.read_rx.as_mut().unwrap().recv().await.unwrap(); - } - } - } - } - - 0 -} - -// https://github.com/hyperium/hyper/blob/0c8ee93d7f557afc63ca2a5686d19071813ab2b7/src/headers.rs#L67 -#[inline] -fn from_digits(bytes: &[u8]) -> Option { - // cannot use FromStr for u64, since it allows a signed prefix - let mut result = 0u64; - const RADIX: u64 = 10; - if bytes.is_empty() { - return None; - } - for &b in bytes { - // can't use char::to_digit, since we haven't verified these bytes - // are utf-8. - match b { - b'0'..=b'9' => { - result = result.checked_mul(RADIX)?; - result = result.checked_add((b - b'0') as u64)?; - } - _ => { - return None; - } - } - } - Some(result) -} - -#[inline] -fn connection_has(value: &HeaderValue, needle: &str) -> bool { - if let Ok(s) = value.to_str() { - for val in s.split(',') { - if val.trim().eq_ignore_ascii_case(needle) { - return true; - } - } - } - false -} - -#[derive(Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct ListenOpts { - cert: Option, - key: Option, - hostname: String, - port: u16, - reuseport: bool, -} - -fn run_server( - tx: mpsc::Sender, - listening_tx: mpsc::Sender, - mut close_rx: mpsc::Receiver<()>, - addr: SocketAddr, - maybe_cert: Option, - maybe_key: Option, - reuseport: bool, -) -> Result<(), AnyError> { - let domain = if addr.is_ipv4() { - socket2::Domain::IPV4 - } else { - socket2::Domain::IPV6 - }; - let socket = Socket::new(domain, socket2::Type::STREAM, None)?; - - #[cfg(not(windows))] - socket.set_reuse_address(true)?; - if reuseport { - #[cfg(target_os = "linux")] - socket.set_reuse_port(true)?; - } - - let socket_addr = socket2::SockAddr::from(addr); - socket.bind(&socket_addr)?; - socket.listen(128)?; - socket.set_nonblocking(true)?; - let std_listener: std::net::TcpListener = socket.into(); - let mut listener = TcpListener::from_std(std_listener); - - let mut poll = Poll::new()?; - let token = Token(0); - poll - .registry() - .register(&mut listener, token, Interest::READABLE) - .unwrap(); - - let tls_context: Option> = { - if let Some(cert) = maybe_cert { - let key = maybe_key.unwrap(); - let certificate_chain: Vec = - load_certs(&mut BufReader::new(cert.as_bytes()))?; - let private_key = load_private_keys(key.as_bytes())?.remove(0); - - let config = rustls::ServerConfig::builder() - .with_safe_defaults() - .with_no_client_auth() - .with_single_cert(certificate_chain, private_key) - .expect("invalid key or certificate"); - Some(Arc::new(config)) - } else { - None - } - }; - - listening_tx - .blocking_send(listener.local_addr().unwrap().port()) - .unwrap(); - let mut sockets = HashMap::with_capacity(1000); - let mut counter: usize = 1; - let mut events = Events::with_capacity(1024); - 'outer: loop { - let result = close_rx.try_recv(); - if result.is_ok() { - break 'outer; - } - // FIXME(bartlomieju): how does Tokio handle it? I just put random 100ms - // timeout here to handle close signal. - match poll.poll(&mut events, Some(Duration::from_millis(100))) { - Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => continue, - Err(e) => panic!("{}", e), - Ok(()) => (), - } - 'events: for event in &events { - if close_rx.try_recv().is_ok() { - break 'outer; - } - let token = event.token(); - match token { - Token(0) => loop { - match listener.accept() { - Ok((mut socket, addr)) => { - counter += 1; - let token = Token(counter); - poll - .registry() - .register(&mut socket, token, Interest::READABLE) - .unwrap(); - - let socket = match tls_context { - Some(ref tls_conf) => { - let connection = - rustls::ServerConnection::new(tls_conf.clone()).unwrap(); - InnerStream::Tls(Box::new(rustls::StreamOwned::new( - connection, socket, - ))) - } - None => InnerStream::Tcp(socket), - }; - let stream = Box::pin(Stream { - inner: socket, - detached: false, - read_rx: None, - read_tx: None, - read_lock: Arc::new(Mutex::new(())), - parse_done: ParseStatus::None, - buffer: UnsafeCell::new(vec![0_u8; 1024]), - addr, - }); - - trace!("New connection: {}", token.0); - sockets.insert(token, stream); - } - Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => break, - Err(_) => break, - } - }, - token => { - let socket = sockets.get_mut(&token).unwrap(); - // SAFETY: guarantee that we will never move the data out of the mutable reference. - let socket = unsafe { - let mut_ref: Pin<&mut Stream> = Pin::as_mut(socket); - Pin::get_unchecked_mut(mut_ref) - }; - let sock_ptr = socket as *mut _; - - if socket.detached { - match &mut socket.inner { - InnerStream::Tcp(ref mut socket) => { - poll.registry().deregister(socket).unwrap(); - } - InnerStream::Tls(_) => { - todo!("upgrade tls not implemented"); - } - } - - let boxed = sockets.remove(&token).unwrap(); - std::mem::forget(boxed); - trace!("Socket detached: {}", token.0); - continue; - } - - debug_assert!(event.is_readable()); - - trace!("Socket readable: {}", token.0); - if let Some(tx) = &socket.read_tx { - { - let _l = socket.read_lock.lock().unwrap(); - } - trace!("Sending readiness notification: {}", token.0); - let _ = tx.blocking_send(()); - - continue; - } - - let mut headers = vec![httparse::EMPTY_HEADER; 40]; - let mut req = httparse::Request::new(&mut headers); - let body_offset; - let body_len; - loop { - // SAFETY: It is safe for the read buf to be mutable here. - let buffer = unsafe { &mut *socket.buffer.get() }; - let offset = match socket.parse_done { - ParseStatus::None => 0, - ParseStatus::Ongoing(offset) => offset, - }; - if offset >= buffer.len() { - buffer.resize(offset * 2, 0); - } - let nread = socket.read(&mut buffer[offset..]); - - match nread { - Ok(0) => { - trace!("Socket closed: {}", token.0); - // FIXME: don't remove while JS is writing! - // sockets.remove(&token); - continue 'events; - } - Ok(read) => { - match req.parse(&buffer[..offset + read]) { - Ok(httparse::Status::Complete(n)) => { - body_offset = n; - body_len = offset + read; - socket.parse_done = ParseStatus::None; - // On Windows, We must keep calling socket.read() until it fails with WouldBlock. - // - // Mio tries to emulate edge triggered events on Windows. - // AFAICT it only rearms the event on WouldBlock, but it doesn't when a partial read happens. - // https://github.com/denoland/deno/issues/15549 - #[cfg(target_os = "windows")] - match &mut socket.inner { - InnerStream::Tcp(ref mut socket) => { - poll - .registry() - .reregister(socket, token, Interest::READABLE) - .unwrap(); - } - InnerStream::Tls(ref mut socket) => { - poll - .registry() - .reregister( - &mut socket.sock, - token, - Interest::READABLE, - ) - .unwrap(); - } - }; - break; - } - Ok(httparse::Status::Partial) => { - socket.parse_done = ParseStatus::Ongoing(offset + read); - continue; - } - Err(_) => { - let _ = socket.write(b"HTTP/1.1 400 Bad Request\r\n\r\n"); - continue 'events; - } - } - } - Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { - break 'events - } - Err(_) => break 'events, - } - } - - debug_assert_eq!(socket.parse_done, ParseStatus::None); - if let Some(method) = &req.method { - if method == &"POST" || method == &"PUT" { - let (tx, rx) = mpsc::channel(100); - socket.read_tx = Some(tx); - socket.read_rx = Some(rx); - } - } - - // SAFETY: It is safe for the read buf to be mutable here. - let buffer = unsafe { &mut *socket.buffer.get() }; - let inner_req = InnerRequest { - // SAFETY: backing buffer is pinned and lives as long as the request. - req: unsafe { transmute::, _>(req) }, - // SAFETY: backing buffer is pinned and lives as long as the request. - _headers: unsafe { - transmute::>, _>(headers) - }, - buffer: Pin::new( - replace(buffer, vec![0_u8; 1024]).into_boxed_slice(), - ), - body_offset, - body_len, - }; - // h1 - // https://github.com/tiny-http/tiny-http/blob/master/src/client.rs#L177 - // https://github.com/hyperium/hyper/blob/4545c3ef191ce9b5f5d250ee27c4c96f9b71d2c6/src/proto/h1/role.rs#L127 - let mut keep_alive = inner_req.req.version.unwrap() == 1; - let mut expect_continue = false; - let mut te = false; - let mut te_chunked = false; - let mut content_length = None; - for header in inner_req.req.headers.iter() { - match HeaderName::from_bytes(header.name.as_bytes()) { - Ok(CONNECTION) => { - // SAFETY: illegal bytes are validated by httparse. - let value = unsafe { - HeaderValue::from_maybe_shared_unchecked(header.value) - }; - if keep_alive { - // 1.1 - keep_alive = !connection_has(&value, "close"); - } else { - // 1.0 - keep_alive = connection_has(&value, "keep-alive"); - } - } - Ok(TRANSFER_ENCODING) => { - // https://tools.ietf.org/html/rfc7230#section-3.3.3 - debug_assert!(inner_req.req.version.unwrap() == 1); - // Two states for Transfer-Encoding because we want to make sure Content-Length handling knows it. - te = true; - content_length = None; - // SAFETY: illegal bytes are validated by httparse. - let value = unsafe { - HeaderValue::from_maybe_shared_unchecked(header.value) - }; - if let Ok(Some(encoding)) = - value.to_str().map(|s| s.rsplit(',').next()) - { - // Chunked must always be the last encoding - if encoding.trim().eq_ignore_ascii_case("chunked") { - te_chunked = true; - } - } - } - // Transfer-Encoding overrides the Content-Length. - Ok(CONTENT_LENGTH) if !te => { - if let Some(len) = from_digits(header.value) { - if let Some(prev) = content_length { - if prev != len { - let _ = socket.write(b"HTTP/1.1 400 Bad Request\r\n\r\n"); - continue 'events; - } - continue; - } - content_length = Some(len); - } else { - let _ = socket.write(b"HTTP/1.1 400 Bad Request\r\n\r\n"); - continue 'events; - } - } - Ok(EXPECT) if inner_req.req.version.unwrap() != 0 => { - expect_continue = - header.value.eq_ignore_ascii_case(b"100-continue"); - } - _ => {} - } - } - - // There is Transfer-Encoding but its not chunked. - if te && !te_chunked { - let _ = socket.write(b"HTTP/1.1 400 Bad Request\r\n\r\n"); - continue 'events; - } - - tx.blocking_send(Request { - socket: sock_ptr, - // SAFETY: headers backing buffer outlives the mio event loop ('static) - inner: inner_req, - keep_alive, - te_chunked, - remaining_chunk_size: None, - content_read: 0, - content_length, - expect_continue, - }) - .ok(); - } - } - } - } - - Ok(()) -} - -fn make_addr_port_pair(hostname: &str, port: u16) -> (&str, u16) { - // Default to localhost if given just the port. Example: ":80" - if hostname.is_empty() { - return ("0.0.0.0", port); - } - - // If this looks like an ipv6 IP address. Example: "[2001:db8::1]" - // Then we remove the brackets. - let addr = hostname.trim_start_matches('[').trim_end_matches(']'); - (addr, port) -} - -/// Resolve network address *synchronously*. -pub fn resolve_addr_sync( - hostname: &str, - port: u16, -) -> Result, AnyError> { - let addr_port_pair = make_addr_port_pair(hostname, port); - let result = addr_port_pair.to_socket_addrs()?; - Ok(result) -} - -fn flash_serve

( - state: &mut OpState, - opts: ListenOpts, -) -> Result -where - P: FlashPermissions + 'static, -{ - state - .borrow_mut::

() - .check_net(&(&opts.hostname, Some(opts.port)), "Deno.serve()")?; - - let addr = resolve_addr_sync(&opts.hostname, opts.port)? - .next() - .ok_or_else(|| generic_error("No resolved address found"))?; - let (tx, rx) = mpsc::channel(100); - let (close_tx, close_rx) = mpsc::channel(1); - let (listening_tx, listening_rx) = mpsc::channel(1); - let ctx = ServerContext { - _addr: addr, - tx, - rx, - requests: HashMap::with_capacity(1000), - next_token: 0, - close_tx, - listening_rx: Some(listening_rx), - cancel_handle: CancelHandle::new_rc(), - }; - let tx = ctx.tx.clone(); - let maybe_cert = opts.cert; - let maybe_key = opts.key; - let reuseport = opts.reuseport; - let join_handle = tokio::task::spawn_blocking(move || { - run_server( - tx, - listening_tx, - close_rx, - addr, - maybe_cert, - maybe_key, - reuseport, - ) - }); - let flash_ctx = state.borrow_mut::(); - let server_id = flash_ctx.next_server_id; - flash_ctx.next_server_id += 1; - flash_ctx.join_handles.insert(server_id, join_handle); - flash_ctx.servers.insert(server_id, ctx); - Ok(server_id) -} - -#[op] -fn op_flash_serve

( - state: &mut OpState, - opts: ListenOpts, -) -> Result -where - P: FlashPermissions + 'static, -{ - check_unstable(state, "Deno.serve"); - flash_serve::

(state, opts) -} - -#[op] -fn op_node_unstable_flash_serve

( - state: &mut OpState, - opts: ListenOpts, -) -> Result -where - P: FlashPermissions + 'static, -{ - flash_serve::

(state, opts) -} - -#[op] -fn op_flash_wait_for_listening( - state: Rc>, - server_id: u32, -) -> Result> + 'static, AnyError> { - let mut listening_rx = { - let mut state = state.borrow_mut(); - let flash_ctx = state.borrow_mut::(); - let server_ctx = flash_ctx - .servers - .get_mut(&server_id) - .ok_or_else(|| type_error("server not found"))?; - server_ctx.listening_rx.take().unwrap() - }; - Ok(async move { - if let Some(port) = listening_rx.recv().await { - Ok(port) - } else { - Err(generic_error("This error will be discarded")) - } - }) -} - -#[op] -fn op_flash_drive_server( - state: Rc>, - server_id: u32, -) -> Result> + 'static, AnyError> { - let join_handle = { - let mut state = state.borrow_mut(); - let flash_ctx = state.borrow_mut::(); - flash_ctx - .join_handles - .remove(&server_id) - .ok_or_else(|| type_error("server not found"))? - }; - Ok(async move { - join_handle - .await - .map_err(|_| type_error("server join error"))??; - Ok(()) - }) -} - -// Asychronous version of op_flash_next. This can be a bottleneck under -// heavy load, it should be used as a fallback if there are no buffered -// requests i.e `op_flash_next() == 0`. -#[op] -async fn op_flash_next_async( - op_state: Rc>, - server_id: u32, -) -> u32 { - let ctx = { - let mut op_state = op_state.borrow_mut(); - let flash_ctx = op_state.borrow_mut::(); - let ctx = flash_ctx.servers.get_mut(&server_id).unwrap(); - ctx as *mut ServerContext - }; - // SAFETY: we cannot hold op_state borrow across the await point. The JS caller - // is responsible for ensuring this is not called concurrently. - let ctx = unsafe { &mut *ctx }; - let cancel_handle = &ctx.cancel_handle; - - if let Ok(Some(req)) = ctx.rx.recv().or_cancel(cancel_handle).await { - ctx.requests.insert(ctx.next_token, req); - ctx.next_token += 1; - return 1; - } - - 0 -} - -// Synchronous version of op_flash_next_async. Under heavy load, -// this can collect buffered requests from rx channel and return tokens in a single batch. -// -// perf: please do not add any arguments to this op. With optimizations enabled, -// the ContextScope creation is optimized away and the op is as simple as: -// f(info: *const v8::FunctionCallbackInfo) { let rv = ...; rv.set_uint32(op_flash_next()); } -#[op] -fn op_flash_next(state: &mut OpState) -> u32 { - let flash_ctx = state.borrow_mut::(); - let ctx = flash_ctx.servers.get_mut(&0).unwrap(); - next_request_sync(ctx) -} - -// Syncrhonous version of op_flash_next_async. Under heavy load, -// this can collect buffered requests from rx channel and return tokens in a single batch. -#[op] -fn op_flash_next_server(state: &mut OpState, server_id: u32) -> u32 { - let flash_ctx = state.borrow_mut::(); - let ctx = flash_ctx.servers.get_mut(&server_id).unwrap(); - next_request_sync(ctx) -} - -// Wrapper type for tokio::net::TcpStream that implements -// deno_websocket::UpgradedStream -struct UpgradedStream(tokio::net::TcpStream); -impl tokio::io::AsyncRead for UpgradedStream { - fn poll_read( - self: Pin<&mut Self>, - cx: &mut Context, - buf: &mut tokio::io::ReadBuf, - ) -> std::task::Poll> { - Pin::new(&mut self.get_mut().0).poll_read(cx, buf) - } -} - -impl tokio::io::AsyncWrite for UpgradedStream { - fn poll_write( - self: Pin<&mut Self>, - cx: &mut Context, - buf: &[u8], - ) -> std::task::Poll> { - Pin::new(&mut self.get_mut().0).poll_write(cx, buf) - } - fn poll_flush( - self: Pin<&mut Self>, - cx: &mut Context, - ) -> std::task::Poll> { - Pin::new(&mut self.get_mut().0).poll_flush(cx) - } - fn poll_shutdown( - self: Pin<&mut Self>, - cx: &mut Context, - ) -> std::task::Poll> { - Pin::new(&mut self.get_mut().0).poll_shutdown(cx) - } -} - -impl deno_websocket::Upgraded for UpgradedStream {} - -#[inline] -pub fn detach_socket( - ctx: &mut ServerContext, - token: u32, -) -> Result { - // Two main 'hacks' to get this working: - // * make server thread forget about the socket. `detach_ownership` prevents the socket from being - // dropped on the server thread. - // * conversion from mio::net::TcpStream -> tokio::net::TcpStream. There is no public API so we - // use raw fds. - let tx = ctx - .requests - .remove(&token) - .ok_or_else(|| type_error("request closed"))?; - let stream = tx.socket(); - // prevent socket from being dropped on server thread. - // TODO(@littledivy): Box-ify, since there is no overhead. - stream.detach_ownership(); - - #[cfg(unix)] - let std_stream = { - use std::os::unix::prelude::AsRawFd; - use std::os::unix::prelude::FromRawFd; - let fd = match stream.inner { - InnerStream::Tcp(ref tcp) => tcp.as_raw_fd(), - _ => todo!(), - }; - // SAFETY: `fd` is a valid file descriptor. - unsafe { std::net::TcpStream::from_raw_fd(fd) } - }; - #[cfg(windows)] - let std_stream = { - use std::os::windows::prelude::AsRawSocket; - use std::os::windows::prelude::FromRawSocket; - let fd = match stream.inner { - InnerStream::Tcp(ref tcp) => tcp.as_raw_socket(), - _ => todo!(), - }; - // SAFETY: `fd` is a valid file descriptor. - unsafe { std::net::TcpStream::from_raw_socket(fd) } - }; - let stream = tokio::net::TcpStream::from_std(std_stream)?; - Ok(stream) -} - -#[op] -async fn op_flash_upgrade_websocket( - state: Rc>, - server_id: u32, - token: u32, -) -> Result { - let stream = { - let op_state = &mut state.borrow_mut(); - let flash_ctx = op_state.borrow_mut::(); - detach_socket(flash_ctx.servers.get_mut(&server_id).unwrap(), token)? - }; - deno_websocket::ws_create_server_stream( - &state, - Box::pin(UpgradedStream(stream)), - ) - .await -} - -pub struct Unstable(pub bool); - -fn check_unstable(state: &OpState, api_name: &str) { - let unstable = state.borrow::(); - - if !unstable.0 { - eprintln!( - "Unstable API '{api_name}'. The --unstable flag must be provided." - ); - std::process::exit(70); - } -} - -pub trait FlashPermissions { - fn check_net>( - &mut self, - _host: &(T, Option), - _api_name: &str, - ) -> Result<(), AnyError>; -} - -deno_core::extension!(deno_flash, - deps = [ - deno_web, - deno_net, - deno_fetch, - deno_websocket, - deno_http - ], - parameters = [P: FlashPermissions], - ops = [ - op_flash_serve

, - op_node_unstable_flash_serve

, - op_flash_respond, - op_flash_respond_async, - op_flash_respond_chunked, - op_flash_method, - op_flash_path, - op_flash_headers, - op_flash_addr, - op_flash_next, - op_flash_next_server, - op_flash_next_async, - op_flash_read_body, - op_flash_upgrade_websocket, - op_flash_drive_server, - op_flash_wait_for_listening, - op_flash_first_packet, - op_flash_has_body_stream, - op_flash_close_server, - op_flash_make_request, - op_flash_write_resource, - op_try_flash_respond_chunked, - ], - esm = [ "01_http.js" ], - options = { - unstable: bool, - }, - state = |state, options| { - state.put(Unstable(options.unstable)); - state.put(FlashContext { - next_server_id: 0, - join_handles: HashMap::default(), - servers: HashMap::default(), - }); - }, -); diff --git a/ext/flash/request.rs b/ext/flash/request.rs deleted file mode 100644 index 32ab46ca22..0000000000 --- a/ext/flash/request.rs +++ /dev/null @@ -1,49 +0,0 @@ -// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. - -use crate::Stream; -use std::pin::Pin; - -#[derive(Debug)] -pub struct InnerRequest { - /// Backing buffer for the request. - pub buffer: Pin>, - /// Owned headers, we have to keep it around since its referenced in `req`. - pub _headers: Vec>, - /// Fully parsed request. - pub req: httparse::Request<'static, 'static>, - pub body_offset: usize, - pub body_len: usize, -} - -#[derive(Debug)] -pub struct Request { - pub inner: InnerRequest, - // Pointer to stream owned by the server loop thread. - // - // Dereferencing is safe until server thread finishes and - // op_flash_serve resolves or websocket upgrade is performed. - pub socket: *mut Stream, - pub keep_alive: bool, - pub content_read: usize, - pub content_length: Option, - pub remaining_chunk_size: Option, - pub te_chunked: bool, - pub expect_continue: bool, -} - -// SAFETY: Sent from server thread to JS thread. -// See comment above for `socket`. -unsafe impl Send for Request {} - -impl Request { - #[inline(always)] - pub fn socket<'a>(&self) -> &'a mut Stream { - // SAFETY: Dereferencing is safe until server thread detaches socket or finishes. - unsafe { &mut *self.socket } - } - - #[inline(always)] - pub fn method(&self) -> &str { - self.inner.req.method.unwrap() - } -} diff --git a/ext/flash/sendfile.rs b/ext/flash/sendfile.rs deleted file mode 100644 index 18dc3a39d1..0000000000 --- a/ext/flash/sendfile.rs +++ /dev/null @@ -1,82 +0,0 @@ -// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. -// Forked from https://github.com/Thomasdezeeuw/sendfile/blob/024f82cd4dede9048392a5bd6d8afcd4d5aa83d5/src/lib.rs - -use std::future::Future; -use std::io; -use std::os::unix::io::RawFd; -use std::pin::Pin; -use std::task::Poll; -use std::task::{self}; - -pub struct SendFile { - pub io: (RawFd, RawFd), - pub written: usize, -} - -impl SendFile { - #[inline] - pub fn try_send(&mut self) -> Result { - #[cfg(target_os = "linux")] - { - // This is the maximum the Linux kernel will write in a single call. - let count = 0x7ffff000; - let mut offset = self.written as libc::off_t; - - let res = - // SAFETY: call to libc::sendfile() - unsafe { libc::sendfile(self.io.1, self.io.0, &mut offset, count) }; - if res == -1 { - Err(io::Error::last_os_error()) - } else { - self.written = offset as usize; - Ok(res as usize) - } - } - #[cfg(target_os = "macos")] - { - // Send all bytes. - let mut length = 0; - // On macOS `length` is value-result parameter. It determines the number - // of bytes to write and returns the number of bytes written also in - // case of `EAGAIN` errors. - // SAFETY: call to libc::sendfile() - let res = unsafe { - libc::sendfile( - self.io.0, - self.io.1, - self.written as libc::off_t, - &mut length, - std::ptr::null_mut(), - 0, - ) - }; - self.written += length as usize; - if res == -1 { - Err(io::Error::last_os_error()) - } else { - Ok(length as usize) - } - } - } -} - -impl Future for SendFile { - type Output = Result<(), std::io::Error>; - - fn poll( - mut self: Pin<&mut Self>, - _: &mut task::Context<'_>, - ) -> Poll { - loop { - match self.try_send() { - Ok(0) => break Poll::Ready(Ok(())), - Ok(_) => continue, - Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => { - break Poll::Pending - } - Err(ref err) if err.kind() == io::ErrorKind::Interrupted => continue, // Try again. - Err(err) => break Poll::Ready(Err(err)), - } - } - } -} diff --git a/ext/flash/socket.rs b/ext/flash/socket.rs deleted file mode 100644 index cf9501634b..0000000000 --- a/ext/flash/socket.rs +++ /dev/null @@ -1,151 +0,0 @@ -// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. - -use std::cell::UnsafeCell; -use std::future::Future; -use std::io::Read; -use std::io::Write; -use std::pin::Pin; -use std::sync::Arc; -use std::sync::Mutex; - -use deno_core::error::AnyError; -use mio::net::TcpStream; -use tokio::sync::mpsc; - -use crate::ParseStatus; - -type TlsTcpStream = rustls::StreamOwned; - -pub enum InnerStream { - Tcp(TcpStream), - Tls(Box), -} - -pub struct Stream { - pub inner: InnerStream, - pub detached: bool, - pub read_rx: Option>, - pub read_tx: Option>, - pub parse_done: ParseStatus, - pub buffer: UnsafeCell>, - pub read_lock: Arc>, - pub addr: std::net::SocketAddr, -} - -impl Stream { - pub fn detach_ownership(&mut self) { - self.detached = true; - } - - /// Try to write to the socket. - #[inline] - pub fn try_write(&mut self, buf: &[u8]) -> usize { - let mut nwritten = 0; - while nwritten < buf.len() { - match self.write(&buf[nwritten..]) { - Ok(n) => nwritten += n, - Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { - break; - } - Err(e) => { - log::trace!("Error writing to socket: {}", e); - break; - } - } - } - nwritten - } - - #[inline] - pub fn shutdown(&mut self) { - match &mut self.inner { - InnerStream::Tcp(stream) => { - // Typically shutdown shouldn't fail. - let _ = stream.shutdown(std::net::Shutdown::Both); - } - InnerStream::Tls(stream) => { - let _ = stream.sock.shutdown(std::net::Shutdown::Both); - } - } - } - - pub fn as_std(&mut self) -> std::net::TcpStream { - #[cfg(unix)] - let std_stream = { - use std::os::unix::prelude::AsRawFd; - use std::os::unix::prelude::FromRawFd; - let fd = match self.inner { - InnerStream::Tcp(ref tcp) => tcp.as_raw_fd(), - _ => todo!(), - }; - // SAFETY: `fd` is a valid file descriptor. - unsafe { std::net::TcpStream::from_raw_fd(fd) } - }; - #[cfg(windows)] - let std_stream = { - use std::os::windows::prelude::AsRawSocket; - use std::os::windows::prelude::FromRawSocket; - let fd = match self.inner { - InnerStream::Tcp(ref tcp) => tcp.as_raw_socket(), - _ => todo!(), - }; - // SAFETY: `fd` is a valid file descriptor. - unsafe { std::net::TcpStream::from_raw_socket(fd) } - }; - std_stream - } - - #[inline] - pub async fn with_async_stream(&mut self, f: F) -> Result - where - F: FnOnce( - &mut tokio::net::TcpStream, - ) -> Pin>>>, - { - let mut async_stream = tokio::net::TcpStream::from_std(self.as_std())?; - let result = f(&mut async_stream).await?; - forget_stream(async_stream.into_std()?); - Ok(result) - } -} - -#[inline] -pub fn forget_stream(stream: std::net::TcpStream) { - #[cfg(unix)] - { - use std::os::unix::prelude::IntoRawFd; - let _ = stream.into_raw_fd(); - } - #[cfg(windows)] - { - use std::os::windows::prelude::IntoRawSocket; - let _ = stream.into_raw_socket(); - } -} - -impl Write for Stream { - #[inline] - fn write(&mut self, buf: &[u8]) -> std::io::Result { - match self.inner { - InnerStream::Tcp(ref mut stream) => stream.write(buf), - InnerStream::Tls(ref mut stream) => stream.write(buf), - } - } - #[inline] - fn flush(&mut self) -> std::io::Result<()> { - match self.inner { - InnerStream::Tcp(ref mut stream) => stream.flush(), - InnerStream::Tls(ref mut stream) => stream.flush(), - } - } -} - -impl Read for Stream { - #[inline] - fn read(&mut self, buf: &mut [u8]) -> std::io::Result { - match self.inner { - InnerStream::Tcp(ref mut stream) => stream.read(buf), - InnerStream::Tls(ref mut stream) => stream.read(buf), - } - } -} diff --git a/runtime/Cargo.toml b/runtime/Cargo.toml index 13bbfa34b7..01675c1204 100644 --- a/runtime/Cargo.toml +++ b/runtime/Cargo.toml @@ -41,7 +41,6 @@ deno_core.workspace = true deno_crypto.workspace = true deno_fetch.workspace = true deno_ffi.workspace = true -deno_flash.workspace = true deno_fs.workspace = true deno_http.workspace = true deno_io.workspace = true @@ -68,7 +67,6 @@ deno_core.workspace = true deno_crypto.workspace = true deno_fetch.workspace = true deno_ffi.workspace = true -deno_flash.workspace = true deno_fs.workspace = true deno_http.workspace = true deno_io.workspace = true diff --git a/runtime/build.rs b/runtime/build.rs index abdd0e5840..809e32a76d 100644 --- a/runtime/build.rs +++ b/runtime/build.rs @@ -120,16 +120,6 @@ mod startup_snapshot { } } - impl deno_flash::FlashPermissions for Permissions { - fn check_net>( - &mut self, - _host: &(T, Option), - _api_name: &str, - ) -> Result<(), deno_core::error::AnyError> { - unreachable!("snapshotting!") - } - } - impl deno_node::NodePermissions for Permissions { fn check_read( &mut self, @@ -244,7 +234,6 @@ mod startup_snapshot { deno_net, deno_napi, deno_http, - deno_flash, deno_io, deno_fs ], @@ -322,7 +311,6 @@ mod startup_snapshot { deno_http::deno_http::init_ops_and_esm(), deno_io::deno_io::init_ops_and_esm(Default::default()), deno_fs::deno_fs::init_ops_and_esm::(false), - deno_flash::deno_flash::init_ops_and_esm::(false), // No --unstable runtime::init_ops_and_esm(), // FIXME(bartlomieju): these extensions are specified last, because they // depend on `runtime`, even though it should be other way around diff --git a/runtime/js/90_deno_ns.js b/runtime/js/90_deno_ns.js index 1eb479114f..bb6ba3b08d 100644 --- a/runtime/js/90_deno_ns.js +++ b/runtime/js/90_deno_ns.js @@ -9,7 +9,6 @@ import * as ffi from "ext:deno_ffi/00_ffi.js"; import * as net from "ext:deno_net/01_net.js"; import * as tls from "ext:deno_net/02_tls.js"; import * as http from "ext:deno_http/01_http.js"; -import * as flash from "ext:deno_flash/01_http.js"; import * as errors from "ext:runtime/01_errors.js"; import * as version from "ext:runtime/01_version.ts"; import * as permissions from "ext:runtime/10_permissions.js"; @@ -172,7 +171,6 @@ const denoNsUnstable = { funlock: fs.funlock, funlockSync: fs.funlockSync, upgradeHttp: http.upgradeHttp, - upgradeHttpRaw: flash.upgradeHttpRaw, serve: http.serve, openKv: kv.openKv, Kv: kv.Kv, diff --git a/runtime/lib.rs b/runtime/lib.rs index 994e043fd3..6745c4a565 100644 --- a/runtime/lib.rs +++ b/runtime/lib.rs @@ -7,7 +7,6 @@ pub use deno_core; pub use deno_crypto; pub use deno_fetch; pub use deno_ffi; -pub use deno_flash; pub use deno_fs; pub use deno_http; pub use deno_io; diff --git a/runtime/ops/http.rs b/runtime/ops/http.rs index 3a316d8003..767fc3ae01 100644 --- a/runtime/ops/http.rs +++ b/runtime/ops/http.rs @@ -29,7 +29,7 @@ use tokio::net::UnixStream; deno_core::extension!( deno_http_runtime, - ops = [op_http_start, op_http_upgrade, op_flash_upgrade_http], + ops = [op_http_start, op_http_upgrade], customizer = |ext: &mut deno_core::ExtensionBuilder| { ext.force_op_registration(); }, @@ -91,23 +91,6 @@ fn op_http_start( Err(bad_resource_id()) } -#[op] -fn op_flash_upgrade_http( - state: &mut OpState, - token: u32, - server_id: u32, -) -> Result { - let flash_ctx = state.borrow_mut::(); - let ctx = flash_ctx.servers.get_mut(&server_id).unwrap(); - - let tcp_stream = deno_flash::detach_socket(ctx, token)?; - Ok( - state - .resource_table - .add(TcpStreamResource::new(tcp_stream.into_split())), - ) -} - #[derive(Serialize)] #[serde(rename_all = "camelCase")] pub struct HttpUpgradeResult { diff --git a/runtime/permissions/mod.rs b/runtime/permissions/mod.rs index 2093b08f9a..7e1772ee3a 100644 --- a/runtime/permissions/mod.rs +++ b/runtime/permissions/mod.rs @@ -1826,17 +1826,6 @@ impl PermissionsContainer { } } -impl deno_flash::FlashPermissions for PermissionsContainer { - #[inline(always)] - fn check_net>( - &mut self, - host: &(T, Option), - api_name: &str, - ) -> Result<(), AnyError> { - self.0.lock().net.check(host, Some(api_name)) - } -} - impl deno_node::NodePermissions for PermissionsContainer { #[inline(always)] fn check_read(&mut self, path: &Path) -> Result<(), AnyError> { diff --git a/runtime/web_worker.rs b/runtime/web_worker.rs index 399b22912f..8bd5cf21e4 100644 --- a/runtime/web_worker.rs +++ b/runtime/web_worker.rs @@ -440,7 +440,6 @@ impl WebWorker { deno_http::deno_http::init_ops(), deno_io::deno_io::init_ops(Some(options.stdio)), deno_fs::deno_fs::init_ops::(unstable), - deno_flash::deno_flash::init_ops::(unstable), deno_node::deno_node::init_ops::( options.npm_resolver, ), diff --git a/runtime/worker.rs b/runtime/worker.rs index 296e9c4b1a..48bf7b09f3 100644 --- a/runtime/worker.rs +++ b/runtime/worker.rs @@ -264,7 +264,6 @@ impl MainWorker { deno_http::deno_http::init_ops(), deno_io::deno_io::init_ops(Some(options.stdio)), deno_fs::deno_fs::init_ops::(unstable), - deno_flash::deno_flash::init_ops::(unstable), deno_node::deno_node::init_ops::( options.npm_resolver, ),