mirror of
https://github.com/denoland/deno.git
synced 2024-11-25 15:29:32 -05:00
405 lines
12 KiB
JavaScript
405 lines
12 KiB
JavaScript
// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
|
|
"use strict";
|
|
|
|
((window) => {
|
|
const webidl = window.__bootstrap.webidl;
|
|
const { InnerBody } = window.__bootstrap.fetchBody;
|
|
const { setEventTargetData } = window.__bootstrap.eventTarget;
|
|
const {
|
|
Response,
|
|
fromInnerRequest,
|
|
toInnerResponse,
|
|
newInnerRequest,
|
|
newInnerResponse,
|
|
fromInnerResponse,
|
|
} = window.__bootstrap.fetch;
|
|
const core = window.Deno.core;
|
|
const { BadResource, Interrupted } = core;
|
|
const { ReadableStream } = window.__bootstrap.streams;
|
|
const abortSignal = window.__bootstrap.abortSignal;
|
|
const { WebSocket, _rid, _readyState, _eventLoop, _protocol, _server } =
|
|
window.__bootstrap.webSocket;
|
|
const {
|
|
ArrayPrototypeIncludes,
|
|
ArrayPrototypePush,
|
|
ArrayPrototypeSome,
|
|
Promise,
|
|
Set,
|
|
SetPrototypeAdd,
|
|
SetPrototypeDelete,
|
|
SetPrototypeValues,
|
|
StringPrototypeIncludes,
|
|
StringPrototypeToLowerCase,
|
|
StringPrototypeSplit,
|
|
Symbol,
|
|
SymbolAsyncIterator,
|
|
TypedArrayPrototypeSubarray,
|
|
TypeError,
|
|
Uint8Array,
|
|
} = window.__bootstrap.primordials;
|
|
|
|
const connErrorSymbol = Symbol("connError");
|
|
|
|
class HttpConn {
|
|
#rid = 0;
|
|
// This set holds resource ids of resources
|
|
// that were created during lifecycle of this request.
|
|
// When the connection is closed these resources should be closed
|
|
// as well.
|
|
managedResources = new Set();
|
|
|
|
constructor(rid) {
|
|
this.#rid = rid;
|
|
}
|
|
|
|
/** @returns {number} */
|
|
get rid() {
|
|
return this.#rid;
|
|
}
|
|
|
|
/** @returns {Promise<ResponseEvent | null>} */
|
|
async nextRequest() {
|
|
let nextRequest;
|
|
try {
|
|
nextRequest = await core.opAsync(
|
|
"op_http_request_next",
|
|
this.#rid,
|
|
);
|
|
} catch (error) {
|
|
// A connection error seen here would cause disrupted responses to throw
|
|
// a generic `BadResource` error. Instead store this error and replace
|
|
// those with it.
|
|
this[connErrorSymbol] = error;
|
|
if (error instanceof BadResource) {
|
|
return null;
|
|
} else if (error instanceof Interrupted) {
|
|
return null;
|
|
} else if (
|
|
StringPrototypeIncludes(error.message, "connection closed")
|
|
) {
|
|
return null;
|
|
}
|
|
throw error;
|
|
}
|
|
if (nextRequest === null) return null;
|
|
|
|
const [
|
|
requestRid,
|
|
responseSenderRid,
|
|
method,
|
|
headersList,
|
|
url,
|
|
] = nextRequest;
|
|
|
|
/** @type {ReadableStream<Uint8Array> | undefined} */
|
|
let body = null;
|
|
if (typeof requestRid === "number") {
|
|
SetPrototypeAdd(this.managedResources, requestRid);
|
|
body = createRequestBodyStream(this, requestRid);
|
|
}
|
|
|
|
const innerRequest = newInnerRequest(
|
|
method,
|
|
url,
|
|
headersList,
|
|
body !== null ? new InnerBody(body) : null,
|
|
);
|
|
const signal = abortSignal.newSignal();
|
|
const request = fromInnerRequest(innerRequest, signal, "immutable");
|
|
|
|
SetPrototypeAdd(this.managedResources, responseSenderRid);
|
|
const respondWith = createRespondWith(
|
|
this,
|
|
responseSenderRid,
|
|
requestRid,
|
|
);
|
|
|
|
return { request, respondWith };
|
|
}
|
|
|
|
/** @returns {void} */
|
|
close() {
|
|
for (const rid of SetPrototypeValues(this.managedResources)) {
|
|
core.tryClose(rid);
|
|
}
|
|
core.close(this.#rid);
|
|
}
|
|
|
|
[SymbolAsyncIterator]() {
|
|
// deno-lint-ignore no-this-alias
|
|
const httpConn = this;
|
|
return {
|
|
async next() {
|
|
const reqEvt = await httpConn.nextRequest();
|
|
// Change with caution, current form avoids a v8 deopt
|
|
return { value: reqEvt, done: reqEvt === null };
|
|
},
|
|
};
|
|
}
|
|
}
|
|
|
|
function readRequest(requestRid, zeroCopyBuf) {
|
|
return core.opAsync(
|
|
"op_http_request_read",
|
|
requestRid,
|
|
zeroCopyBuf,
|
|
);
|
|
}
|
|
|
|
function createRespondWith(httpConn, responseSenderRid, requestRid) {
|
|
return async function respondWith(resp) {
|
|
if (resp instanceof Promise) {
|
|
resp = await resp;
|
|
}
|
|
|
|
if (!(resp instanceof Response)) {
|
|
throw new TypeError(
|
|
"First argument to respondWith must be a Response or a promise resolving to a Response.",
|
|
);
|
|
}
|
|
|
|
const innerResp = toInnerResponse(resp);
|
|
|
|
// If response body length is known, it will be sent synchronously in a
|
|
// single op, in other case a "response body" resource will be created and
|
|
// we'll be streaming it.
|
|
/** @type {ReadableStream<Uint8Array> | Uint8Array | null} */
|
|
let respBody = null;
|
|
if (innerResp.body !== null) {
|
|
if (innerResp.body.unusable()) throw new TypeError("Body is unusable.");
|
|
if (innerResp.body.streamOrStatic instanceof ReadableStream) {
|
|
if (
|
|
innerResp.body.length === null ||
|
|
innerResp.body.source instanceof Blob
|
|
) {
|
|
respBody = innerResp.body.stream;
|
|
} else {
|
|
const reader = innerResp.body.stream.getReader();
|
|
const r1 = await reader.read();
|
|
if (r1.done) {
|
|
respBody = new Uint8Array(0);
|
|
} else {
|
|
respBody = r1.value;
|
|
const r2 = await reader.read();
|
|
if (!r2.done) throw new TypeError("Unreachable");
|
|
}
|
|
}
|
|
} else {
|
|
innerResp.body.streamOrStatic.consumed = true;
|
|
respBody = innerResp.body.streamOrStatic.body;
|
|
}
|
|
} else {
|
|
respBody = new Uint8Array(0);
|
|
}
|
|
|
|
SetPrototypeDelete(httpConn.managedResources, responseSenderRid);
|
|
let responseBodyRid;
|
|
try {
|
|
responseBodyRid = await core.opAsync("op_http_response", [
|
|
responseSenderRid,
|
|
innerResp.status ?? 200,
|
|
innerResp.headerList,
|
|
], respBody instanceof Uint8Array ? respBody : null);
|
|
} catch (error) {
|
|
const connError = httpConn[connErrorSymbol];
|
|
if (error instanceof BadResource && connError != null) {
|
|
// deno-lint-ignore no-ex-assign
|
|
error = new connError.constructor(connError.message);
|
|
}
|
|
if (respBody !== null && respBody instanceof ReadableStream) {
|
|
await respBody.cancel(error);
|
|
}
|
|
throw error;
|
|
}
|
|
|
|
// If `respond` returns a responseBodyRid, we should stream the body
|
|
// to that resource.
|
|
if (responseBodyRid !== null) {
|
|
SetPrototypeAdd(httpConn.managedResources, responseBodyRid);
|
|
try {
|
|
if (respBody === null || !(respBody instanceof ReadableStream)) {
|
|
throw new TypeError("Unreachable");
|
|
}
|
|
const reader = respBody.getReader();
|
|
while (true) {
|
|
const { value, done } = await reader.read();
|
|
if (done) break;
|
|
if (!(value instanceof Uint8Array)) {
|
|
await reader.cancel(new TypeError("Value not a Uint8Array"));
|
|
break;
|
|
}
|
|
try {
|
|
await core.opAsync(
|
|
"op_http_response_write",
|
|
responseBodyRid,
|
|
value,
|
|
);
|
|
} catch (error) {
|
|
const connError = httpConn[connErrorSymbol];
|
|
if (error instanceof BadResource && connError != null) {
|
|
// deno-lint-ignore no-ex-assign
|
|
error = new connError.constructor(connError.message);
|
|
}
|
|
await reader.cancel(error);
|
|
throw error;
|
|
}
|
|
}
|
|
} finally {
|
|
// Once all chunks are sent, and the request body is closed, we can
|
|
// close the response body.
|
|
SetPrototypeDelete(httpConn.managedResources, responseBodyRid);
|
|
try {
|
|
await core.opAsync("op_http_response_close", responseBodyRid);
|
|
} catch { /* pass */ }
|
|
}
|
|
}
|
|
|
|
const ws = resp[_ws];
|
|
if (ws) {
|
|
if (typeof requestRid !== "number") {
|
|
throw new TypeError(
|
|
"This request can not be upgraded to a websocket connection.",
|
|
);
|
|
}
|
|
|
|
const wsRid = await core.opAsync(
|
|
"op_http_upgrade_websocket",
|
|
requestRid,
|
|
);
|
|
ws[_rid] = wsRid;
|
|
ws[_protocol] = resp.headers.get("sec-websocket-protocol");
|
|
|
|
if (ws[_readyState] === WebSocket.CLOSING) {
|
|
await core.opAsync("op_ws_close", { rid: wsRid });
|
|
|
|
ws[_readyState] = WebSocket.CLOSED;
|
|
|
|
const errEvent = new ErrorEvent("error");
|
|
ws.dispatchEvent(errEvent);
|
|
|
|
const event = new CloseEvent("close");
|
|
ws.dispatchEvent(event);
|
|
|
|
core.tryClose(wsRid);
|
|
} else {
|
|
ws[_readyState] = WebSocket.OPEN;
|
|
const event = new Event("open");
|
|
ws.dispatchEvent(event);
|
|
|
|
ws[_eventLoop]();
|
|
}
|
|
} else if (typeof requestRid === "number") {
|
|
// Try to close "request" resource. It might have been already consumed,
|
|
// but if it hasn't been we need to close it here to avoid resource
|
|
// leak.
|
|
SetPrototypeDelete(httpConn.managedResources, requestRid);
|
|
core.tryClose(requestRid);
|
|
}
|
|
};
|
|
}
|
|
|
|
function createRequestBodyStream(httpConn, requestRid) {
|
|
return new ReadableStream({
|
|
type: "bytes",
|
|
async pull(controller) {
|
|
try {
|
|
// This is the largest possible size for a single packet on a TLS
|
|
// stream.
|
|
const chunk = new Uint8Array(16 * 1024 + 256);
|
|
const read = await readRequest(
|
|
requestRid,
|
|
chunk,
|
|
);
|
|
if (read > 0) {
|
|
// We read some data. Enqueue it onto the stream.
|
|
controller.enqueue(TypedArrayPrototypeSubarray(chunk, 0, read));
|
|
} else {
|
|
// We have reached the end of the body, so we close the stream.
|
|
controller.close();
|
|
SetPrototypeDelete(httpConn.managedResources, requestRid);
|
|
core.close(requestRid);
|
|
}
|
|
} catch (err) {
|
|
// There was an error while reading a chunk of the body, so we
|
|
// error.
|
|
controller.error(err);
|
|
controller.close();
|
|
SetPrototypeDelete(httpConn.managedResources, requestRid);
|
|
core.close(requestRid);
|
|
}
|
|
},
|
|
cancel() {
|
|
SetPrototypeDelete(httpConn.managedResources, requestRid);
|
|
core.close(requestRid);
|
|
},
|
|
});
|
|
}
|
|
|
|
const _ws = Symbol("[[associated_ws]]");
|
|
|
|
function upgradeWebSocket(request, options = {}) {
|
|
const upgrade = request.headers.get("upgrade");
|
|
if (!upgrade || StringPrototypeToLowerCase(upgrade) !== "websocket") {
|
|
throw new TypeError(
|
|
"Invalid Header: 'upgrade' header must be 'websocket'",
|
|
);
|
|
}
|
|
|
|
const connection = request.headers.get("connection");
|
|
const connectionHasUpgradeOption = connection !== null &&
|
|
ArrayPrototypeSome(
|
|
StringPrototypeSplit(connection, /\s*,\s*/),
|
|
(option) => StringPrototypeToLowerCase(option) === "upgrade",
|
|
);
|
|
if (!connectionHasUpgradeOption) {
|
|
throw new TypeError(
|
|
"Invalid Header: 'connection' header must be 'Upgrade'",
|
|
);
|
|
}
|
|
|
|
const websocketKey = request.headers.get("sec-websocket-key");
|
|
if (websocketKey === null) {
|
|
throw new TypeError(
|
|
"Invalid Header: 'sec-websocket-key' header must be set",
|
|
);
|
|
}
|
|
|
|
const accept = core.opSync("op_http_websocket_accept_header", websocketKey);
|
|
|
|
const r = newInnerResponse(101);
|
|
r.headerList = [
|
|
["upgrade", "websocket"],
|
|
["connection", "Upgrade"],
|
|
["sec-websocket-accept", accept],
|
|
];
|
|
|
|
const protocolsStr = request.headers.get("sec-websocket-protocol") || "";
|
|
const protocols = StringPrototypeSplit(protocolsStr, ", ");
|
|
if (protocols && options.protocol) {
|
|
if (ArrayPrototypeIncludes(protocols, options.protocol)) {
|
|
ArrayPrototypePush(r.headerList, [
|
|
"sec-websocket-protocol",
|
|
options.protocol,
|
|
]);
|
|
} else {
|
|
throw new TypeError(
|
|
`Protocol '${options.protocol}' not in the request's protocol list (non negotiable)`,
|
|
);
|
|
}
|
|
}
|
|
|
|
const response = fromInnerResponse(r, "immutable");
|
|
|
|
const socket = webidl.createBranded(WebSocket);
|
|
setEventTargetData(socket);
|
|
socket[_server] = true;
|
|
response[_ws] = socket;
|
|
|
|
return { response, socket };
|
|
}
|
|
|
|
window.__bootstrap.http = {
|
|
HttpConn,
|
|
upgradeWebSocket,
|
|
};
|
|
})(this);
|