1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2024-12-31 19:44:10 -05:00
denoland-deno/ext/http/01_http.js

404 lines
12 KiB
JavaScript

// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
"use strict";
((window) => {
const webidl = window.__bootstrap.webidl;
const { InnerBody } = window.__bootstrap.fetchBody;
const { setEventTargetData } = window.__bootstrap.eventTarget;
const {
Response,
fromInnerRequest,
toInnerResponse,
newInnerRequest,
newInnerResponse,
fromInnerResponse,
} = window.__bootstrap.fetch;
const core = window.Deno.core;
const { BadResource, Interrupted } = core;
const { ReadableStream } = window.__bootstrap.streams;
const abortSignal = window.__bootstrap.abortSignal;
const { WebSocket, _rid, _readyState, _eventLoop, _protocol, _server } =
window.__bootstrap.webSocket;
const {
ArrayPrototypeIncludes,
ArrayPrototypePush,
ArrayPrototypeSome,
Promise,
Set,
SetPrototypeAdd,
SetPrototypeDelete,
SetPrototypeValues,
StringPrototypeIncludes,
StringPrototypeToLowerCase,
StringPrototypeSplit,
Symbol,
SymbolAsyncIterator,
TypedArrayPrototypeSubarray,
TypeError,
Uint8Array,
} = window.__bootstrap.primordials;
const connErrorSymbol = Symbol("connError");
class HttpConn {
#rid = 0;
// This set holds resource ids of resources
// that were created during lifecycle of this request.
// When the connection is closed these resources should be closed
// as well.
managedResources = new Set();
constructor(rid) {
this.#rid = rid;
}
/** @returns {number} */
get rid() {
return this.#rid;
}
/** @returns {Promise<ResponseEvent | null>} */
async nextRequest() {
let nextRequest;
try {
nextRequest = await core.opAsync(
"op_http_request_next",
this.#rid,
);
} catch (error) {
// A connection error seen here would cause disrupted responses to throw
// a generic `BadResource` error. Instead store this error and replace
// those with it.
this[connErrorSymbol] = error;
if (
error instanceof BadResource ||
error instanceof Interrupted ||
StringPrototypeIncludes(error.message, "connection closed")
) {
return null;
}
throw error;
}
if (nextRequest === null) return null;
const [
requestRid,
responseSenderRid,
method,
headersList,
url,
] = nextRequest;
/** @type {ReadableStream<Uint8Array> | undefined} */
let body = null;
if (typeof requestRid === "number") {
SetPrototypeAdd(this.managedResources, requestRid);
body = createRequestBodyStream(this, requestRid);
}
const innerRequest = newInnerRequest(
method,
url,
headersList,
body !== null ? new InnerBody(body) : null,
false,
);
const signal = abortSignal.newSignal();
const request = fromInnerRequest(innerRequest, signal, "immutable");
SetPrototypeAdd(this.managedResources, responseSenderRid);
const respondWith = createRespondWith(
this,
responseSenderRid,
requestRid,
);
return { request, respondWith };
}
/** @returns {void} */
close() {
for (const rid of SetPrototypeValues(this.managedResources)) {
core.tryClose(rid);
}
core.close(this.#rid);
}
[SymbolAsyncIterator]() {
// deno-lint-ignore no-this-alias
const httpConn = this;
return {
async next() {
const reqEvt = await httpConn.nextRequest();
// Change with caution, current form avoids a v8 deopt
return { value: reqEvt, done: reqEvt === null };
},
};
}
}
function readRequest(requestRid, zeroCopyBuf) {
return core.opAsync(
"op_http_request_read",
requestRid,
zeroCopyBuf,
);
}
function createRespondWith(httpConn, responseSenderRid, requestRid) {
return async function respondWith(resp) {
if (resp instanceof Promise) {
resp = await resp;
}
if (!(resp instanceof Response)) {
throw new TypeError(
"First argument to respondWith must be a Response or a promise resolving to a Response.",
);
}
const innerResp = toInnerResponse(resp);
// If response body length is known, it will be sent synchronously in a
// single op, in other case a "response body" resource will be created and
// we'll be streaming it.
/** @type {ReadableStream<Uint8Array> | Uint8Array | null} */
let respBody = null;
if (innerResp.body !== null) {
if (innerResp.body.unusable()) throw new TypeError("Body is unusable.");
if (innerResp.body.streamOrStatic instanceof ReadableStream) {
if (
innerResp.body.length === null ||
innerResp.body.source instanceof Blob
) {
respBody = innerResp.body.stream;
} else {
const reader = innerResp.body.stream.getReader();
const r1 = await reader.read();
if (r1.done) {
respBody = new Uint8Array(0);
} else {
respBody = r1.value;
const r2 = await reader.read();
if (!r2.done) throw new TypeError("Unreachable");
}
}
} else {
innerResp.body.streamOrStatic.consumed = true;
respBody = innerResp.body.streamOrStatic.body;
}
} else {
respBody = new Uint8Array(0);
}
SetPrototypeDelete(httpConn.managedResources, responseSenderRid);
let responseBodyRid;
try {
responseBodyRid = await core.opAsync("op_http_response", [
responseSenderRid,
innerResp.status ?? 200,
innerResp.headerList,
], respBody instanceof Uint8Array ? respBody : null);
} catch (error) {
const connError = httpConn[connErrorSymbol];
if (error instanceof BadResource && connError != null) {
// deno-lint-ignore no-ex-assign
error = new connError.constructor(connError.message);
}
if (respBody !== null && respBody instanceof ReadableStream) {
await respBody.cancel(error);
}
throw error;
}
// If `respond` returns a responseBodyRid, we should stream the body
// to that resource.
if (responseBodyRid !== null) {
SetPrototypeAdd(httpConn.managedResources, responseBodyRid);
try {
if (respBody === null || !(respBody instanceof ReadableStream)) {
throw new TypeError("Unreachable");
}
const reader = respBody.getReader();
while (true) {
const { value, done } = await reader.read();
if (done) break;
if (!(value instanceof Uint8Array)) {
await reader.cancel(new TypeError("Value not a Uint8Array"));
break;
}
try {
await core.opAsync(
"op_http_response_write",
responseBodyRid,
value,
);
} catch (error) {
const connError = httpConn[connErrorSymbol];
if (error instanceof BadResource && connError != null) {
// deno-lint-ignore no-ex-assign
error = new connError.constructor(connError.message);
}
await reader.cancel(error);
throw error;
}
}
} finally {
// Once all chunks are sent, and the request body is closed, we can
// close the response body.
SetPrototypeDelete(httpConn.managedResources, responseBodyRid);
try {
await core.opAsync("op_http_response_close", responseBodyRid);
} catch { /* pass */ }
}
}
const ws = resp[_ws];
if (ws) {
if (typeof requestRid !== "number") {
throw new TypeError(
"This request can not be upgraded to a websocket connection.",
);
}
const wsRid = await core.opAsync(
"op_http_upgrade_websocket",
requestRid,
);
ws[_rid] = wsRid;
ws[_protocol] = resp.headers.get("sec-websocket-protocol");
if (ws[_readyState] === WebSocket.CLOSING) {
await core.opAsync("op_ws_close", { rid: wsRid });
ws[_readyState] = WebSocket.CLOSED;
const errEvent = new ErrorEvent("error");
ws.dispatchEvent(errEvent);
const event = new CloseEvent("close");
ws.dispatchEvent(event);
core.tryClose(wsRid);
} else {
ws[_readyState] = WebSocket.OPEN;
const event = new Event("open");
ws.dispatchEvent(event);
ws[_eventLoop]();
}
} else if (typeof requestRid === "number") {
// Try to close "request" resource. It might have been already consumed,
// but if it hasn't been we need to close it here to avoid resource
// leak.
SetPrototypeDelete(httpConn.managedResources, requestRid);
core.tryClose(requestRid);
}
};
}
function createRequestBodyStream(httpConn, requestRid) {
return new ReadableStream({
type: "bytes",
async pull(controller) {
try {
// This is the largest possible size for a single packet on a TLS
// stream.
const chunk = new Uint8Array(16 * 1024 + 256);
const read = await readRequest(
requestRid,
chunk,
);
if (read > 0) {
// We read some data. Enqueue it onto the stream.
controller.enqueue(TypedArrayPrototypeSubarray(chunk, 0, read));
} else {
// We have reached the end of the body, so we close the stream.
controller.close();
SetPrototypeDelete(httpConn.managedResources, requestRid);
core.close(requestRid);
}
} catch (err) {
// There was an error while reading a chunk of the body, so we
// error.
controller.error(err);
controller.close();
SetPrototypeDelete(httpConn.managedResources, requestRid);
core.close(requestRid);
}
},
cancel() {
SetPrototypeDelete(httpConn.managedResources, requestRid);
core.close(requestRid);
},
});
}
const _ws = Symbol("[[associated_ws]]");
function upgradeWebSocket(request, options = {}) {
const upgrade = request.headers.get("upgrade");
if (!upgrade || StringPrototypeToLowerCase(upgrade) !== "websocket") {
throw new TypeError(
"Invalid Header: 'upgrade' header must be 'websocket'",
);
}
const connection = request.headers.get("connection");
const connectionHasUpgradeOption = connection !== null &&
ArrayPrototypeSome(
StringPrototypeSplit(connection, /\s*,\s*/),
(option) => StringPrototypeToLowerCase(option) === "upgrade",
);
if (!connectionHasUpgradeOption) {
throw new TypeError(
"Invalid Header: 'connection' header must be 'Upgrade'",
);
}
const websocketKey = request.headers.get("sec-websocket-key");
if (websocketKey === null) {
throw new TypeError(
"Invalid Header: 'sec-websocket-key' header must be set",
);
}
const accept = core.opSync("op_http_websocket_accept_header", websocketKey);
const r = newInnerResponse(101);
r.headerList = [
["upgrade", "websocket"],
["connection", "Upgrade"],
["sec-websocket-accept", accept],
];
const protocolsStr = request.headers.get("sec-websocket-protocol") || "";
const protocols = StringPrototypeSplit(protocolsStr, ", ");
if (protocols && options.protocol) {
if (ArrayPrototypeIncludes(protocols, options.protocol)) {
ArrayPrototypePush(r.headerList, [
"sec-websocket-protocol",
options.protocol,
]);
} else {
throw new TypeError(
`Protocol '${options.protocol}' not in the request's protocol list (non negotiable)`,
);
}
}
const response = fromInnerResponse(r, "immutable");
const socket = webidl.createBranded(WebSocket);
setEventTargetData(socket);
socket[_server] = true;
response[_ws] = socket;
return { response, socket };
}
window.__bootstrap.http = {
HttpConn,
upgradeWebSocket,
};
})(this);