mirror of
https://github.com/denoland/deno.git
synced 2024-12-28 01:59:06 -05:00
e2761df3fe
Fix the internal "upgradeHttpRaw" API, restoring the ability to upgrade an HTTP connection in the polyfilled "node:http" API.
654 lines
17 KiB
JavaScript
654 lines
17 KiB
JavaScript
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
|
|
// Runtime handles read from the global object: the `Deno.core` bindings plus
// the `primordials` and `internals` objects hanging off `__bootstrap`.
const { core } = globalThis.Deno;
const { primordials, internals } = globalThis.__bootstrap;

// Prototype used by the serve loop to recognise "bad resource" failures.
const { BadResourcePrototype } = core;
|
|
import { InnerBody } from "ext:deno_fetch/22_body.js";
|
|
import { Event } from "ext:deno_web/02_event.js";
|
|
import {
|
|
fromInnerResponse,
|
|
newInnerResponse,
|
|
toInnerResponse,
|
|
} from "ext:deno_fetch/23_response.js";
|
|
import { fromInnerRequest, toInnerRequest } from "ext:deno_fetch/23_request.js";
|
|
import { AbortController } from "ext:deno_web/03_abort_signal.js";
|
|
import {
|
|
_eventLoop,
|
|
_idleTimeoutDuration,
|
|
_idleTimeoutTimeout,
|
|
_protocol,
|
|
_readyState,
|
|
_rid,
|
|
_role,
|
|
_server,
|
|
_serverHandleIdleTimeout,
|
|
SERVER,
|
|
WebSocket,
|
|
} from "ext:deno_websocket/01_websocket.js";
|
|
import {
|
|
Deferred,
|
|
getReadableStreamResourceBacking,
|
|
readableStreamClose,
|
|
readableStreamForRid,
|
|
ReadableStreamPrototype,
|
|
} from "ext:deno_web/06_streams.js";
|
|
import { TcpConn } from "ext:deno_net/01_net.js";
|
|
// Built-ins used by this module, taken from the `primordials` object.
const {
  // Constructors / prototypes
  Symbol,
  TypeError,
  Uint8Array,
  Uint8ArrayPrototype,
  // Prototype checks
  ObjectPrototypeIsPrototypeOf,
  // Set helpers
  SafeSet,
  SafeSetIterator,
  SetPrototypeAdd,
  SetPrototypeDelete,
} = primordials;

// Symbol key under which InnerRequest exposes its private `#upgraded` state
// so that mapToCallback can read it.
const _upgraded = Symbol("_upgraded");
|
|
|
|
/**
 * Builds the fallback response used when no better error response is
 * available.
 *
 * @returns {Response} A fresh 500 response whose body is the bytes of the
 *   ASCII text "Internal Server Error".
 */
function internalServerError() {
  // Pre-encoded bytes of "Internal Server Error".
  // deno-fmt-ignore
  const payload = new Uint8Array([
    73, 110, 116, 101, 114, 110, 97, 108, // "Internal"
    32,                                   // " "
    83, 101, 114, 118, 101, 114,          // "Server"
    32,                                   // " "
    69, 114, 114, 111, 114,               // "Error"
  ]);
  const init = { status: 500 };
  return new Response(payload, init);
}
|
|
|
|
// Shared 101 response returned by every successful upgrade. mapToCallback
// compares the handler's return value against it by identity, so a handler
// must hand back exactly this object (not a different response) once it has
// upgraded the connection.
const UPGRADE_RESPONSE_SENTINEL = fromInnerResponse(newInnerResponse(101), "immutable");
|
|
|
|
/**
 * Requests a raw upgrade of the connection behind `req`.
 *
 * @param req Request whose inner request is expected to provide
 *   `_wantsUpgrade` (as InnerRequest does).
 * @param conn Forwarded unchanged to `_wantsUpgrade`.
 * @returns Whatever the inner request's `_wantsUpgrade("upgradeHttpRaw", conn)`
 *   returns.
 * @throws {TypeError} If the inner request has no `_wantsUpgrade` hook.
 */
function upgradeHttpRaw(req, conn) {
  const innerRequest = toInnerRequest(req);
  if (!innerRequest._wantsUpgrade) {
    throw new TypeError("upgradeHttpRaw may only be used with Deno.serve");
  }
  return innerRequest._wantsUpgrade("upgradeHttpRaw", conn);
}
|
|
|
|
/**
 * Lazy view over a request identified by a slab id. Method, URL parts, remote
 * address, headers and body are fetched through `core.ops` on first access.
 * Once `close()` has run the slab id is dropped and accessors that still need
 * it throw `TypeError("request closed")`.
 */
class InnerRequest {
  #slabId;
  #context;
  #methodAndUri;
  #streamRid;
  #body;
  // `false` until an upgrade starts; afterwards a function that mapToCallback
  // invokes once the handler has returned.
  #upgraded;

  /**
   * @param slabId Identifier passed to the request ops.
   * @param context Object providing `scheme` and `fallbackHost` for `url()`.
   */
  constructor(slabId, context) {
    this.#slabId = slabId;
    this.#context = context;
    this.#upgraded = false;
  }

  /**
   * Closes the body stream resource (if one was opened) and forgets the slab
   * id, marking the request as closed.
   */
  close() {
    if (this.#streamRid !== undefined) {
      core.close(this.#streamRid);
      this.#streamRid = undefined;
    }
    this.#slabId = undefined;
  }

  /** The upgrade state: `false`, or the completion function set by `_wantsUpgrade`. */
  get [_upgraded]() {
    return this.#upgraded;
  }

  /**
   * Returns the cached `[method, authority, path, remoteHost, remotePort]`
   * tuple, fetching it on first use. A cached tuple stays readable after
   * `close()`.
   *
   * @throws {TypeError} If nothing is cached and the request is closed.
   */
  #loadMethodAndUri() {
    if (this.#methodAndUri === undefined) {
      if (this.#slabId === undefined) {
        throw new TypeError("request closed");
      }
      // TODO(mmastrac): This is quite slow as we're serializing a large number of values. We may want to consider
      // splitting this up into multiple ops.
      this.#methodAndUri = core.ops.op_get_request_method_and_url(this.#slabId);
    }
    return this.#methodAndUri;
  }

  /**
   * Shared first step of both synchronous upgrades: caches the URL data,
   * reads the header list once, closes the request, and returns the slab id
   * it held before closing.
   */
  #detachForUpgrade() {
    const slabId = this.#slabId;
    // Populate the method/URL cache while the slab id is still valid.
    this.url();
    // Result is discarded; presumably this forces the headers to be read
    // before the slab is handed off - TODO confirm.
    this.headerList;
    this.close();
    return slabId;
  }

  /**
   * Performs the upgrade named by `upgradeType`.
   *
   * @param {string} upgradeType One of "upgradeHttp", "upgradeHttpRaw" or
   *   "upgradeWebSocket". Any other value returns `undefined`.
   * @param originalArgs For "upgradeHttpRaw": `[underlyingConn]`; for
   *   "upgradeWebSocket": `[response, ws]`.
   * @returns `{ response, conn }` for a raw upgrade, `{ response, socket }`
   *   for a WebSocket upgrade; `response` is always UPGRADE_RESPONSE_SENTINEL.
   * @throws {Deno.errors.Http} If the request is already upgraded or closed,
   *   or if "upgradeHttp" is requested (not available here).
   */
  _wantsUpgrade(upgradeType, ...originalArgs) {
    if (this.#upgraded) {
      throw new Deno.errors.Http("already upgraded");
    }
    if (this.#slabId === undefined) {
      throw new Deno.errors.Http("already closed");
    }

    // upgradeHttp is async
    // TODO(mmastrac)
    if (upgradeType === "upgradeHttp") {
      // Throw a real error object (not a bare string) so callers get a stack
      // trace and the same error class as the other failures above.
      throw new Deno.errors.Http(
        "upgradeHttp is unavailable in Deno.serve at this time",
      );
    }

    // upgradeHttpRaw is sync
    if (upgradeType === "upgradeHttpRaw") {
      const underlyingConn = originalArgs[0];
      const slabId = this.#detachForUpgrade();

      // Nothing to do when mapToCallback signals completion.
      this.#upgraded = () => {};

      const upgradeRid = core.ops.op_upgrade_raw(slabId);

      const conn = new TcpConn(
        upgradeRid,
        underlyingConn?.remoteAddr,
        underlyingConn?.localAddr,
      );

      return { response: UPGRADE_RESPONSE_SENTINEL, conn };
    }

    // upgradeWebSocket is sync
    if (upgradeType === "upgradeWebSocket") {
      const response = originalArgs[0];
      const ws = originalArgs[1];

      const slabId = this.#detachForUpgrade();

      // Resolved by mapToCallback (through `#upgraded`) after the handler returns.
      const goAhead = new Deferred();
      this.#upgraded = () => {
        goAhead.resolve();
      };

      // Start the upgrade in the background.
      (async () => {
        try {
          // Returns the connection and extra bytes, which we can pass directly to op_ws_server_create
          const upgrade = await core.opAsync2(
            "op_upgrade",
            slabId,
            response.headerList,
          );
          const wsRid = core.ops.op_ws_server_create(upgrade[0], upgrade[1]);

          // We have to wait for the go-ahead signal
          // NOTE(review): this awaits the Deferred object itself, which only
          // blocks if Deferred is thenable - confirm; otherwise its promise
          // should be awaited instead.
          await goAhead;

          ws[_rid] = wsRid;
          ws[_readyState] = WebSocket.OPEN;
          ws[_role] = SERVER;
          const event = new Event("open");
          ws.dispatchEvent(event);

          ws[_eventLoop]();
          if (ws[_idleTimeoutDuration]) {
            ws.addEventListener(
              "close",
              () => clearTimeout(ws[_idleTimeoutTimeout]),
            );
          }
          ws[_serverHandleIdleTimeout]();
        } catch (error) {
          // Surface upgrade failures on the socket instead of rejecting an
          // un-awaited promise.
          const event = new ErrorEvent("error", { error });
          ws.dispatchEvent(event);
        }
      })();
      return { response: UPGRADE_RESPONSE_SENTINEL, socket: ws };
    }
  }

  /**
   * Computes the request URL string from the cached method/authority/path.
   *
   * @returns {string} "*" for a "*" path, the authority for an empty path or
   *   a CONNECT request, otherwise scheme + host + path, where host is the
   *   request authority or the context's fallback host.
   * @throws {TypeError} If the data is not cached and the request is closed.
   */
  url() {
    const methodAndUri = this.#loadMethodAndUri();
    const method = methodAndUri[0];
    const hostname = methodAndUri[1];
    const path = methodAndUri[2];

    // * is valid for OPTIONS
    if (path === "*") {
      return "*";
    }

    // If the path is empty, return the authority (valid for CONNECT)
    if (path === "") {
      return hostname;
    }

    // CONNECT requires an authority
    if (method === "CONNECT") {
      return hostname;
    }

    if (hostname) {
      // Construct a URL from the scheme, the hostname, and the path
      return this.#context.scheme + hostname + path;
    }

    // Construct a URL from the scheme, the fallback hostname, and the path
    return this.#context.scheme + this.#context.fallbackHost + path;
  }

  /**
   * Remote peer address as `{ transport: "tcp", hostname, port }`.
   *
   * @throws {TypeError} If the data is not cached and the request is closed.
   */
  get remoteAddr() {
    const methodAndUri = this.#loadMethodAndUri();
    return {
      transport: "tcp",
      hostname: methodAndUri[3],
      port: methodAndUri[4],
    };
  }

  /**
   * Request method string.
   *
   * @throws {TypeError} If the data is not cached and the request is closed.
   */
  get method() {
    return this.#loadMethodAndUri()[0];
  }

  /**
   * Request body, created on first access and cached afterwards.
   *
   * @returns `null` for GET and HEAD, otherwise an InnerBody wrapping a
   *   stream over the body resource.
   * @throws {TypeError} If the request is closed.
   */
  get body() {
    if (this.#slabId === undefined) {
      throw new TypeError("request closed");
    }
    if (this.#body !== undefined) {
      return this.#body;
    }
    // If the method is GET or HEAD, we do not want to include a body here, even if the Rust
    // side of the code is willing to provide it to us.
    const method = this.method;
    if (method === "GET" || method === "HEAD") {
      this.#body = null;
      return null;
    }
    this.#streamRid = core.ops.op_read_request_body(this.#slabId);
    this.#body = new InnerBody(readableStreamForRid(this.#streamRid, false));
    return this.#body;
  }

  /**
   * Header list fetched from the runtime on every access (not cached).
   *
   * @throws {TypeError} If the request is closed.
   */
  get headerList() {
    if (this.#slabId === undefined) {
      throw new TypeError("request closed");
    }
    return core.ops.op_get_request_headers(this.#slabId);
  }

  /** The slab id, or `undefined` once the request has been closed. */
  get slabId() {
    return this.#slabId;
  }
}
|
|
|
|
/**
 * Per-server state shared between the serve loop and the request handlers:
 * the URL scheme prefix, the fallback host, the server resource id and a
 * `closed` flag.
 */
class CallbackContext {
  scheme;
  fallbackHost;
  serverRid;
  closed;

  /**
   * Populates the context from a `[serverRid, scheme, fallbackHost]` tuple
   * and marks it open.
   */
  initialize(args) {
    const serverRid = args[0];
    const scheme = args[1];
    const fallbackHost = args[2];

    this.serverRid = serverRid;
    this.scheme = scheme;
    this.fallbackHost = fallbackHost;
    this.closed = false;
  }

  /**
   * Marks the context closed and closes the server resource. Any error
   * raised while closing is deliberately ignored.
   */
  close() {
    this.closed = true;
    try {
      core.tryClose(this.serverRid);
    } catch {
      // Pass
    }
  }
}
|
|
|
|
/**
 * Hands the response body to the runtime synchronously when it can.
 *
 * @param req Request identifier passed through to the response ops.
 * @param respBody Inner response body, or null/undefined for "no body".
 * @returns `null` when the body was absent or has been set in full (bytes,
 *   text or a resource-backed stream); otherwise the ReadableStream the
 *   caller must pump itself.
 * @throws {TypeError} If the body is neither static data nor a ReadableStream.
 */
function fastSyncResponseOrStream(req, respBody) {
  // Matches exactly null and undefined: don't set the body.
  if (respBody == null) {
    return null;
  }

  const source = respBody.streamOrStatic;
  const staticBody = source.body;

  // Static bytes.
  if (ObjectPrototypeIsPrototypeOf(Uint8ArrayPrototype, staticBody)) {
    core.ops.op_set_response_body_bytes(req, staticBody);
    return null;
  }

  // Static text.
  if (typeof staticBody === "string") {
    core.ops.op_set_response_body_text(req, staticBody);
    return null;
  }

  // Anything that is left has to be a stream.
  const isStream = ObjectPrototypeIsPrototypeOf(ReadableStreamPrototype, source);
  if (!isStream) {
    throw new TypeError("invalid response");
  }

  // A stream backed by a resource can be passed over by resource id.
  const backing = getReadableStreamResourceBacking(source);
  if (!backing) {
    return source;
  }
  core.ops.op_set_response_body_resource(req, backing.rid, backing.autoClose);
  return null;
}
|
|
|
|
/**
 * Pumps a ReadableStream response body to the runtime.
 *
 * Streams that finish within one chunk are sent without allocating a response
 * stream resource. Longer streams, or streams whose second chunk takes more
 * than 250ms, are written chunk by chunk to a response stream resource, which
 * is tracked in `responseBodies` while it is open.
 *
 * @param responseBodies Set of open response stream resource ids.
 * @param req Request identifier passed through to the response ops.
 * @param status Status passed to `op_set_promise_complete`.
 * @param stream The ReadableStream to drain.
 */
async function asyncResponse(responseBodies, req, status, stream) {
  const reader = stream.getReader();
  let responseRid;
  let readerDone = false;
  let timer;

  try {
    // IMPORTANT: We get a performance boost from this optimization, but V8 is very
    // sensitive to the order and structure. Benchmark any changes to this code.

    // Fast paths for streams that end after zero or one chunks: no resource
    // has to be allocated for them.
    const { value: firstChunk, done: firstDone } = await reader.read();
    if (firstDone) {
      readerDone = true;
      // Exit 1: empty body. The finally block completes the response.
      return;
    }

    // The second read must not hold back the first chunk indefinitely, since
    // a peer may be waiting on it before the stream can make progress. Race
    // the read against an arbitrary 250ms timer that flushes the first chunk.
    let firstWrite;
    timer = setTimeout(() => {
      responseRid = core.ops.op_set_response_body_stream(req);
      SetPrototypeAdd(responseBodies, responseRid);
      core.ops.op_set_promise_complete(req, status);
      firstWrite = core.writeAll(responseRid, firstChunk);
    }, 250);
    const { value: secondChunk, done: secondDone } = await reader.read();

    if (firstWrite) {
      // The timer fired: the first chunk is already on its way.
      await firstWrite;
      if (secondDone) {
        readerDone = true;
        // Exit 2(a): second read is end-of-stream after the timer fired.
        // The finally block closes the response stream.
        return;
      }

      // First chunk written, second chunk still to be sent below.
    } else {
      // The second read beat the timer.
      clearTimeout(timer);
      timer = undefined;

      if (secondDone) {
        // Exit 2(b): single-chunk body read quickly enough; send it as
        // plain bytes without a response stream.
        readerDone = true;
        core.ops.op_set_response_body_bytes(req, firstChunk);
        return;
      }

      responseRid = core.ops.op_set_response_body_stream(req);
      SetPrototypeAdd(responseBodies, responseRid);
      core.ops.op_set_promise_complete(req, status);
      // Flush the first chunk.
      await core.writeAll(responseRid, firstChunk);
    }

    await core.writeAll(responseRid, secondChunk);
    while (true) {
      const { value: chunk, done: finished } = await reader.read();
      if (finished) {
        readerDone = true;
        break;
      }
      await core.writeAll(responseRid, chunk);
    }
  } catch (error) {
    readerDone = true;
    try {
      // Tell the stream's source why we stopped reading.
      await reader.cancel(error);
    } catch {
      // Pass
    }
  } finally {
    if (!readerDone) {
      readableStreamClose(reader);
    }
    if (timer !== undefined) {
      clearTimeout(timer);
    }
    if (responseRid) {
      // A response stream was opened, so the response was already completed
      // when it was created; only the resource needs releasing.
      core.tryClose(responseRid);
      SetPrototypeDelete(responseBodies, responseRid);
    } else {
      // No response stream was opened: complete the response now.
      core.ops.op_set_promise_complete(req, status);
    }
  }
}
|
|
|
|
/**
 * Creates the per-request function used by the serve loop.
 *
 * The returned function wraps the request id in a Request (only when the user
 * callback declares parameters), invokes the callback, and passes the
 * resulting response's headers, body and status to the Rust side. Errors
 * thrown by the callback go to `onError`; if that throws as well, a fallback
 * 500 response is used.
 *
 * The promise returned by the per-request function only rejects on abnormal
 * exit.
 */
function mapToCallback(responseBodies, context, signal, callback, onError) {
  return async function (req) {
    let innerRequest;
    let response;

    // Step 1: obtain a response from the callback, then onError, then the
    // built-in 500 as the last resort.
    try {
      if (callback.length > 0) {
        innerRequest = new InnerRequest(req, context);
        const request = fromInnerRequest(innerRequest, signal, "immutable");
        if (callback.length === 1) {
          response = await callback(request);
        } else {
          // Second argument: connection info with a lazily computed address.
          const info = {
            get remoteAddr() {
              return innerRequest.remoteAddr;
            },
          };
          response = await callback(request, info);
        }
      } else {
        response = await callback();
      }
    } catch (error) {
      try {
        response = await onError(error);
      } catch (error) {
        console.error("Exception in onError while handling exception", error);
        response = internalServerError();
      }
    }

    const innerResponse = toInnerResponse(response);

    // Step 2: a connection upgraded during the callback needs no further
    // servicing here.
    const completeUpgrade = innerRequest?.[_upgraded];
    if (completeUpgrade) {
      if (response !== UPGRADE_RESPONSE_SENTINEL) {
        console.error("Upgrade response was not returned from callback");
        context.close();
      }
      completeUpgrade();
      return;
    }

    // Step 3: the server may have been shut down while the callback ran.
    if (context.closed) {
      innerRequest?.close();
      return;
    }

    // Step 4: headers.
    const status = innerResponse.status;
    const headerList = innerResponse.headerList;
    if (headerList && headerList.length > 0) {
      if (headerList.length === 1) {
        const onlyHeader = headerList[0];
        core.ops.op_set_response_header(req, onlyHeader[0], onlyHeader[1]);
      } else {
        core.ops.op_set_response_headers(req, headerList);
      }
    }

    // Step 5: body. Try to respond synchronously; otherwise pump the stream.
    const stream = fastSyncResponseOrStream(req, innerResponse.body);
    if (stream === null) {
      core.ops.op_set_promise_complete(req, status);
    } else {
      await asyncResponse(responseBodies, req, status, stream);
    }

    innerRequest?.close();
  };
}
|
|
|
|
/**
 * Starts an HTTP(S) server and runs its accept loop until the server resource
 * is closed.
 *
 * Accepted call shapes: `serve(handler)`, `serve(options, handler)` and
 * `serve(options)` where `options.handler` is the handler.
 *
 * Options read here: `hostname` (default "0.0.0.0"), `port` (default 8000, or
 * 9000 when TLS is requested), `reusePort` (default false), `cert` and `key`
 * (both required to enable HTTPS), `signal`, `onError` and `onListen`.
 *
 * If `options.signal` is already aborted when `serve` is called, the server
 * is closed immediately and the returned promise resolves without accepting
 * any request (`onListen` is still invoked).
 *
 * @returns {Promise<void>} Resolves once the accept loop has ended and all
 *   tracked response streams have been closed.
 * @throws {TypeError} If no handler function is supplied, or only one of
 *   `cert`/`key` is given.
 * @throws {Deno.errors.Http} If waiting for a request fails with anything
 *   other than a bad-resource error.
 */
async function serve(arg1, arg2) {
  let options = undefined;
  let handler = undefined;
  if (typeof arg1 === "function") {
    handler = arg1;
  } else if (typeof arg2 === "function") {
    handler = arg2;
    options = arg1;
  } else {
    options = arg1;
  }
  if (handler === undefined) {
    if (options === undefined) {
      throw new TypeError(
        "No handler was provided, so an options bag is mandatory.",
      );
    }
    handler = options.handler;
  }
  if (typeof handler !== "function") {
    throw new TypeError("A handler function must be provided.");
  }
  if (options === undefined) {
    options = {};
  }

  const wantsHttps = options.cert || options.key;
  const signal = options.signal;
  const onError = options.onError ?? function (error) {
    console.error(error);
    return internalServerError();
  };
  const listenOpts = {
    hostname: options.hostname ?? "0.0.0.0",
    port: options.port ?? (wantsHttps ? 9000 : 8000),
    reusePort: options.reusePort ?? false,
  };

  // This controller's signal is the one handed to each Request.
  const abortController = new AbortController();

  // Resource ids of response streams that are still being written.
  const responseBodies = new SafeSet();
  const context = new CallbackContext();
  const callback = mapToCallback(
    responseBodies,
    context,
    abortController.signal,
    handler,
    onError,
  );

  if (wantsHttps) {
    if (!options.cert || !options.key) {
      throw new TypeError(
        "Both cert and key must be provided to enable HTTPS.",
      );
    }
    listenOpts.cert = options.cert;
    listenOpts.key = options.key;
    listenOpts.alpnProtocols = ["h2", "http/1.1"];
  }

  const listener = wantsHttps
    ? Deno.listenTls(listenOpts)
    : Deno.listen(listenOpts);
  // Record the port actually bound (relevant when port 0 was requested).
  listenOpts.port = listener.addr.port;
  context.initialize(core.ops.op_serve_http(listener.rid));

  if (signal?.aborted) {
    // An "abort" listener never fires for a signal that has already been
    // aborted, so shut down right away instead of serving forever.
    context.close();
  } else {
    signal?.addEventListener(
      "abort",
      () => context.close(),
      { once: true },
    );
  }

  const onListen = options.onListen ?? function ({ port }) {
    // If the hostname is "0.0.0.0", we display "localhost" in console
    // because browsers in Windows don't resolve "0.0.0.0".
    // See the discussion in https://github.com/denoland/deno_std/issues/1165
    const hostname = listenOpts.hostname == "0.0.0.0"
      ? "localhost"
      : listenOpts.hostname;
    console.log(`Listening on ${context.scheme}${hostname}:${port}/`);
  };

  onListen({ port: listenOpts.port });

  // Stop as soon as the context is closed rather than waiting on a server
  // resource that no longer exists.
  while (!context.closed) {
    const rid = context.serverRid;
    let req;
    try {
      req = await core.opAsync2("op_http_wait", rid);
    } catch (error) {
      // The server resource was closed while we were waiting.
      if (ObjectPrototypeIsPrototypeOf(BadResourcePrototype, error)) {
        break;
      }
      throw new Deno.errors.Http(error);
    }
    // This value from the wait op ends the loop.
    if (req === 0xffffffff) {
      break;
    }
    // Requests are serviced concurrently; the loop does not await them.
    callback(req).catch((error) => {
      // Abnormal exit
      console.error(
        "Terminating Deno.serve loop due to unexpected error",
        error,
      );
      context.close();
    });
  }

  // Release any response streams that are still open.
  for (const streamRid of new SafeSetIterator(responseBodies)) {
    core.tryClose(streamRid);
  }
}
|
|
|
|
internals.upgradeHttpRaw = upgradeHttpRaw;
|
|
|
|
export { serve, upgradeHttpRaw };
|