// mirror of https://github.com/denoland/deno.git
// synced 2024-11-26 16:09:27 -05:00
// 484b6fe2fa - Applies suggestion from #17912
// 782 lines, 21 KiB, JavaScript
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
// `core` and `ops` expose the runtime ops called throughout this module;
// `primordials` supplies the built-ins destructured below the imports.
const core = globalThis.Deno.core;
const ops = core.ops;
const primordials = globalThis.__bootstrap.primordials;
import { BlobPrototype } from "internal:deno_web/09_file.js";
import { TcpConn } from "internal:deno_net/01_net.js";
import { toInnerResponse } from "internal:deno_fetch/23_response.js";
import { _flash, fromFlashRequest } from "internal:deno_fetch/23_request.js";
import { Event } from "internal:deno_web/02_event.js";
import {
  _state,
  getReadableStreamResourceBacking,
  ReadableStream,
  readableStreamClose,
  ReadableStreamPrototype,
} from "internal:deno_web/06_streams.js";
import {
  _eventLoop,
  _idleTimeoutDuration,
  _idleTimeoutTimeout,
  _protocol,
  _readyState,
  _rid,
  _serverHandleIdleTimeout,
  WebSocket,
} from "internal:deno_websocket/01_websocket.js";
import { _ws } from "internal:deno_http/01_http.js";
const {
  ObjectPrototypeIsPrototypeOf,
  PromisePrototype,
  PromisePrototypeCatch,
  PromisePrototypeThen,
  SafePromiseAll,
  TypedArrayPrototypeSet,
  TypedArrayPrototypeSubarray,
  TypeError,
  Uint8Array,
  Uint8ArrayPrototype,
} = primordials;

// Reason phrases keyed by numeric HTTP status code. `http1Response` looks the
// phrase up here when it builds the status line.
const statusCodes = {
  100: "Continue",
  101: "Switching Protocols",
  102: "Processing",
  200: "OK",
  201: "Created",
  202: "Accepted",
  203: "Non Authoritative Information",
  204: "No Content",
  205: "Reset Content",
  206: "Partial Content",
  207: "Multi-Status",
  208: "Already Reported",
  226: "IM Used",
  300: "Multiple Choices",
  301: "Moved Permanently",
  302: "Found",
  303: "See Other",
  304: "Not Modified",
  305: "Use Proxy",
  307: "Temporary Redirect",
  308: "Permanent Redirect",
  400: "Bad Request",
  401: "Unauthorized",
  402: "Payment Required",
  403: "Forbidden",
  404: "Not Found",
  405: "Method Not Allowed",
  406: "Not Acceptable",
  407: "Proxy Authentication Required",
  408: "Request Timeout",
  409: "Conflict",
  410: "Gone",
  411: "Length Required",
  412: "Precondition Failed",
  413: "Payload Too Large",
  414: "URI Too Long",
  415: "Unsupported Media Type",
  416: "Range Not Satisfiable",
  418: "I'm a teapot",
  421: "Misdirected Request",
  422: "Unprocessable Entity",
  423: "Locked",
  424: "Failed Dependency",
  426: "Upgrade Required",
  428: "Precondition Required",
  429: "Too Many Requests",
  431: "Request Header Fields Too Large",
  451: "Unavailable For Legal Reasons",
  500: "Internal Server Error",
  501: "Not Implemented",
  502: "Bad Gateway",
  503: "Service Unavailable",
  504: "Gateway Timeout",
  505: "HTTP Version Not Supported",
  506: "Variant Also Negotiates",
  507: "Insufficient Storage",
  508: "Loop Detected",
  510: "Not Extended",
  511: "Network Authentication Required",
};

// HTTP method names keyed by the numeric method id returned by the flash
// method ops. The rest of this module relies on the numbering: `1` is HEAD,
// and ids greater than `2` are the methods for which a request body is read.
const methods = {
  0: "GET",
  1: "HEAD",
  2: "CONNECT",
  3: "PUT",
  4: "DELETE",
  5: "OPTIONS",
  6: "TRACE",
  7: "POST",
  8: "PATCH",
};

// Handle of the timer that refreshes `date` once per second; created by the
// first `serve()` call that finds it unset.
let dateInterval;
// Cached UTC date string written into the `Date` header of every response.
let date;

/**
 * Construct an HTTP response message.
 * All HTTP/1.1 messages consist of a start-line followed by a sequence
 * of octets.
 *
 *  HTTP-message = start-line
 *    *( header-field CRLF )
 *    CRLF
 *    [ message-body ]
 *
 * The status line is followed by a `Date` header taken from the module-level
 * `date` cache and then by every entry of `headerList`, in order.
 *
 * @param {keyof typeof methods} method numeric method id; `1` (HEAD) omits the body
 * @param {keyof typeof statusCodes} status
 * @param {[name: string, value: string][]} headerList
 * @param {Uint8Array | string | null} body `null`/`undefined` selects chunked transfer encoding
 * @param {number} bodyLen byte length of `body`, emitted as `Content-Length`
 * @param {boolean} earlyEnd when `true`, return the header lines without a
 *   framing header and without the terminating blank line
 * @returns {Uint8Array | string} a string, or a `Uint8Array` when a binary
 *   body was appended
 */
function http1Response(
  method,
  status,
  headerList,
  body,
  bodyLen,
  earlyEnd = false,
) {
  // HTTP uses a "<major>.<minor>" numbering scheme
  //   HTTP-version  = HTTP-name "/" DIGIT "." DIGIT
  //   HTTP-name     = %x48.54.54.50 ; "HTTP", case-sensitive
  //
  // status-line = HTTP-version SP status-code SP reason-phrase CRLF
  // Date header: https://datatracker.ietf.org/doc/html/rfc7231#section-7.1.1.2
  //
  // The reason phrase may be empty; fall back to that for status codes that
  // are not in `statusCodes` instead of emitting the text "undefined".
  const reason = statusCodes[status] ?? "";
  let str = `HTTP/1.1 ${status} ${reason}\r\nDate: ${date}\r\n`;
  for (let i = 0; i < headerList.length; ++i) {
    const { 0: name, 1: value } = headerList[i];
    // header-field   = field-name ":" OWS field-value OWS
    str += `${name}: ${value}\r\n`;
  }

  // https://datatracker.ietf.org/doc/html/rfc7231#section-6.3.6
  if (status === 205 || status === 304) {
    // MUST NOT generate a payload in a 205 response.
    // indicate a zero-length body for the response by
    // including a Content-Length header field with a value of 0.
    str += "Content-Length: 0\r\n\r\n";
    return str;
  }

  // MUST NOT send Content-Length or Transfer-Encoding if status code is 1xx or 204.
  if (status === 204 || status < 200) {
    str += "\r\n";
    return str;
  }

  // The caller finishes the header block itself.
  if (earlyEnd === true) {
    return str;
  }

  // null body status is validated by inititalizeAResponse in ext/fetch
  if (body === null || body === undefined) {
    str += "Transfer-Encoding: chunked\r\n\r\n";
    return str;
  }
  str += `Content-Length: ${bodyLen}\r\n\r\n`;

  // A HEAD request.
  if (method === 1) return str;

  if (typeof body === "string") {
    return str + body;
  }

  // Binary body: encode the head and append the body bytes after it.
  const head = core.encode(str);
  const response = new Uint8Array(head.byteLength + bodyLen);
  TypedArrayPrototypeSet(response, head, 0);
  TypedArrayPrototypeSet(response, body, head.byteLength);
  return response;
}

/**
 * Ask the runtime for the request object returned by `op_flash_make_request`.
 * `createServe` calls `nextRequest`, `getMethod` and `respond` on the result.
 *
 * @returns {object} whatever `ops.op_flash_make_request()` returns
 */
function prepareFastCalls() {
  return ops.op_flash_make_request();
}

/**
 * Pick the host name to show in the "Listening on ..." console message.
 *
 * @param {string} hostname the host name the server was asked to bind to
 * @returns {string} `"localhost"` for `"0.0.0.0"`, otherwise `hostname` unchanged
 */
function hostnameForDisplay(hostname) {
  // If the hostname is "0.0.0.0", we display "localhost" in console
  // because browsers in Windows don't resolve "0.0.0.0".
  // See the discussion in https://github.com/denoland/deno_std/issues/1165
  return hostname === "0.0.0.0" ? "localhost" : hostname;
}

/**
 * Write a fully serialized response, first synchronously and, if that write
 * reports fewer than `responseLen` bytes, by handing the rest to
 * `op_flash_respond_async`.
 *
 * @param {number} server server id
 * @param {number} requestId request token
 * @param {Uint8Array | string} response serialized response
 * @param {number} responseLen threshold the synchronous write count is compared against
 * @param {boolean} end forwarded to the ops as their final argument
 * @param {(token: number, response: Uint8Array, shutdown: boolean) => number} respondFast
 *   synchronous writer used for non-string responses
 * @returns {Promise<unknown> | undefined} the pending async write when one was
 *   needed, so the caller can await or observe it; otherwise `undefined`
 */
function writeFixedResponse(
  server,
  requestId,
  response,
  responseLen,
  end,
  respondFast,
) {
  // Strings go through the op directly; typed arrays use the fast path.
  const nwritten = typeof response === "string"
    ? ops.op_flash_respond(server, requestId, response, end)
    : respondFast(requestId, response, end);

  if (nwritten < responseLen) {
    // NOTE(review): for strings `slice` counts UTF-16 code units, while
    // `nwritten` presumably counts bytes; confirm for non-ASCII responses.
    return core.opAsync(
      "op_flash_respond_async",
      server,
      requestId,
      response.slice(nwritten),
      end,
    );
  }
  return undefined;
}

// TODO(@littledivy): Woah woah, cut down the number of arguments.
/**
 * Send the handler's response for request `i` of server `serverId`.
 *
 * Bodies that are a string or a `Uint8Array` are serialized with
 * `http1Response` and written through `writeFixedResponse`. Stream bodies are
 * either handed to `op_flash_write_resource` (resource-backed streams) or
 * pumped chunk by chunk. When the response carries a deferred websocket
 * (`resp[_ws]`), the connection is upgraded after the response was written.
 *
 * @param {Request} req request being answered; drained when its body was not consumed
 * @param {Response | undefined} resp `undefined` means nothing is written
 * @param {ReadableStream | null} body request body stream
 * @param {boolean} hasBody whether `body` is present
 * @param {number} method numeric method id (`1` is HEAD)
 * @param {number} serverId
 * @param {number} i request token
 * @param {Function} respondFast synchronous writer for binary responses
 * @param {Function} respondChunked async chunk writer
 * @param {Function} tryRespondChunked chunk writer used for the first chunk
 * @returns {Promise<void>}
 * @throws {TypeError} when the response body was already used or is locked
 */
async function handleResponse(
  req,
  resp,
  body,
  hasBody,
  method,
  serverId,
  i,
  respondFast,
  respondChunked,
  tryRespondChunked,
) {
  // there might've been an HTTP upgrade.
  if (resp === undefined) {
    return;
  }
  const innerResp = toInnerResponse(resp);
  // If response body length is known, it will be sent synchronously in a
  // single op, in other case a "response body" resource will be created and
  // we'll be streaming it.
  /** @type {ReadableStream<Uint8Array> | Uint8Array | string | null} */
  let respBody = null;
  let isStreamingResponseBody = false;
  if (innerResp.body !== null) {
    if (typeof innerResp.body.streamOrStatic?.body === "string") {
      if (innerResp.body.streamOrStatic.consumed === true) {
        throw new TypeError("Body is unusable.");
      }
      innerResp.body.streamOrStatic.consumed = true;
      respBody = innerResp.body.streamOrStatic.body;
    } else if (
      ObjectPrototypeIsPrototypeOf(
        ReadableStreamPrototype,
        innerResp.body.streamOrStatic,
      )
    ) {
      if (innerResp.body.unusable()) {
        throw new TypeError("Body is unusable.");
      }
      if (
        innerResp.body.length === null ||
        ObjectPrototypeIsPrototypeOf(
          BlobPrototype,
          innerResp.body.source,
        )
      ) {
        respBody = innerResp.body.stream;
      } else {
        // Known length and not a Blob: the code expects the whole body to
        // arrive as at most one chunk.
        const reader = innerResp.body.stream.getReader();
        const r1 = await reader.read();
        if (r1.done) {
          respBody = new Uint8Array(0);
        } else {
          respBody = r1.value;
          const r2 = await reader.read();
          if (!r2.done) throw new TypeError("Unreachable");
        }
      }
      isStreamingResponseBody = !(
        typeof respBody === "string" ||
        ObjectPrototypeIsPrototypeOf(Uint8ArrayPrototype, respBody)
      );
    } else {
      if (innerResp.body.streamOrStatic.consumed === true) {
        throw new TypeError("Body is unusable.");
      }
      innerResp.body.streamOrStatic.consumed = true;
      respBody = innerResp.body.streamOrStatic.body;
    }
  } else {
    respBody = new Uint8Array(0);
  }

  const ws = resp[_ws];
  if (isStreamingResponseBody === false) {
    // Only strings need `core.byteLength`; binary bodies carry their own
    // length, including the empty body, whose `byteLength` of 0 is valid.
    const length = typeof respBody === "string"
      ? core.byteLength(respBody)
      : respBody.byteLength;
    const responseStr = http1Response(
      method,
      innerResp.status ?? 200,
      innerResp.headerList,
      respBody,
      length,
    );
    // A HEAD request always ignores body, but includes the correct content-length size.
    // NOTE(review): for other methods the threshold is the body length rather
    // than the length of the whole serialized response; confirm against what
    // the respond ops return.
    const responseLen = method === 1 ? core.byteLength(responseStr) : length;
    writeFixedResponse(
      serverId,
      i,
      responseStr,
      responseLen,
      !ws, // Don't close socket if there is a deferred websocket upgrade.
      respondFast,
    );
  }

  return (async () => {
    if (!ws) {
      if (hasBody && body[_state] !== "closed") {
        // TODO(@littledivy): Optimize by draining in a single op.
        try {
          await req.arrayBuffer();
        } catch { /* pass */ }
      }
    }

    if (isStreamingResponseBody === true) {
      const resourceBacking = getReadableStreamResourceBacking(respBody);
      if (resourceBacking) {
        if (respBody.locked) {
          throw new TypeError("ReadableStream is locked.");
        }
        const reader = respBody.getReader(); // Acquire JS lock.
        try {
          // Await the write so a failure reaches the catch below; without the
          // await the reader was never cancelled and the rejection went
          // unhandled.
          await core.opAsync(
            "op_flash_write_resource",
            http1Response(
              method,
              innerResp.status ?? 200,
              innerResp.headerList,
              null,
              0, // Content-Length will be set by the op.
              true,
            ),
            serverId,
            i,
            resourceBacking.rid,
            resourceBacking.autoClose,
          );
          // Release JS lock.
          readableStreamClose(respBody);
        } catch (error) {
          await reader.cancel(error);
          throw error;
        }
      } else {
        const reader = respBody.getReader();

        // Best case: sends headers + first chunk in a single go.
        const { value, done } = await reader.read();
        // NOTE(review): `respBody` is a stream in this branch, so
        // `respBody.byteLength` is presumably undefined and the partial-write
        // fallback in `writeFixedResponse` can never trigger here; confirm.
        writeFixedResponse(
          serverId,
          i,
          http1Response(
            method,
            innerResp.status ?? 200,
            innerResp.headerList,
            null,
            respBody.byteLength,
          ),
          respBody.byteLength,
          false,
          respondFast,
        );

        await tryRespondChunked(
          i,
          value,
          done,
        );

        if (!done) {
          while (true) {
            const chunk = await reader.read();
            await respondChunked(
              i,
              chunk.value,
              chunk.done,
            );
            if (chunk.done) break;
          }
        }
      }
    }

    if (ws) {
      const wsRid = await core.opAsync(
        "op_flash_upgrade_websocket",
        serverId,
        i,
      );
      ws[_rid] = wsRid;
      ws[_protocol] = resp.headers.get("sec-websocket-protocol");

      ws[_readyState] = WebSocket.OPEN;
      const event = new Event("open");
      ws.dispatchEvent(event);

      ws[_eventLoop]();
      if (ws[_idleTimeoutDuration]) {
        ws.addEventListener(
          "close",
          () => clearTimeout(ws[_idleTimeoutTimeout]),
        );
      }
      ws[_serverHandleIdleTimeout]();
    }
  })();
}

/**
 * Build a `serve()` function on top of the flash ops.
 *
 * The returned function accepts `(handler, options?)`, `(options, handler)` or
 * `(options)` with `options.handler`. It starts a server through `opFn`, feeds
 * every incoming request to the handler and writes the result with
 * `handleResponse`. Its promise settles once the request loop and the server
 * driver op have both finished.
 *
 * Options read here: `handler`, `signal`, `onError`, `onListen`, `hostname`
 * (default "127.0.0.1"), `port` (default 9000), `reusePort`, `cert`, `key`.
 *
 * @param {(listenOpts: object) => number} opFn starts listening and returns the server id
 * @returns {(arg1?: Function | object, arg2?: Function | object) => Promise<void>}
 */
function createServe(opFn) {
  return async function serve(arg1, arg2) {
    let options = undefined;
    let handler = undefined;
    if (typeof arg1 === "function") {
      handler = arg1;
      options = arg2;
    } else if (typeof arg2 === "function") {
      handler = arg2;
      options = arg1;
    } else {
      options = arg1;
    }
    if (handler === undefined) {
      if (options === undefined) {
        throw new TypeError(
          "No handler was provided, so an options bag is mandatory.",
        );
      }
      handler = options.handler;
    }
    if (typeof handler !== "function") {
      throw new TypeError("A handler function must be provided.");
    }
    if (options === undefined) {
      options = {};
    }

    const signal = options.signal;

    const onError = options.onError ?? function (error) {
      console.error(error);
      return new Response("Internal Server Error", { status: 500 });
    };

    // `listenOpts` is declared below; this only runs once the server listens.
    const onListen = options.onListen ?? function ({ port }) {
      console.log(
        `Listening on http://${
          hostnameForDisplay(listenOpts.hostname)
        }:${port}/`,
      );
    };

    const listenOpts = {
      hostname: options.hostname ?? "127.0.0.1",
      port: options.port ?? 9000,
      reuseport: options.reusePort ?? false,
    };
    if (options.cert || options.key) {
      if (!options.cert || !options.key) {
        throw new TypeError(
          "Both cert and key must be provided to enable HTTPS.",
        );
      }
      listenOpts.cert = options.cert;
      listenOpts.key = options.key;
    }

    const serverId = opFn(listenOpts);
    const serverPromise = core.opAsync("op_flash_drive_server", serverId);

    PromisePrototypeCatch(
      PromisePrototypeThen(
        core.opAsync("op_flash_wait_for_listening", serverId),
        (port) => {
          onListen({ hostname: listenOpts.hostname, port });
        },
      ),
      () => {},
    );
    const finishedPromise = PromisePrototypeCatch(serverPromise, () => {});

    const server = {
      id: serverId,
      transport: listenOpts.cert && listenOpts.key ? "https" : "http",
      hostname: listenOpts.hostname,
      port: listenOpts.port,
      closed: false,
      finished: finishedPromise,
      async close() {
        if (server.closed) {
          return;
        }
        server.closed = true;
        await core.opAsync("op_flash_close_server", serverId);
        await server.finished;
      },
      async serve() {
        let offset = 0;
        while (true) {
          if (server.closed) {
            break;
          }

          let tokens = nextRequestSync();
          if (tokens === 0) {
            tokens = await core.opAsync("op_flash_next_async", serverId);
            if (server.closed) {
              break;
            }
          }

          for (let i = offset; i < offset + tokens; i++) {
            let body = null;
            // There might be a body, but we don't expose it for GET/HEAD requests.
            // It will be closed automatically once the request has been handled and
            // the response has been sent.
            const method = getMethodSync(i);
            let hasBody = method > 2; // Not GET/HEAD/CONNECT
            if (hasBody) {
              body = createRequestBodyStream(serverId, i);
              if (body === null) {
                hasBody = false;
              }
            }

            const req = fromFlashRequest(
              serverId,
              /* streamRid */
              i,
              body,
              /* methodCb */
              () => methods[method],
              /* urlCb */
              () => {
                const path = ops.op_flash_path(serverId, i);
                return `${server.transport}://${server.hostname}:${server.port}${path}`;
              },
              /* headersCb */
              () => ops.op_flash_headers(serverId, i),
            );

            const send = (response) =>
              handleResponse(
                req,
                response,
                body,
                hasBody,
                method,
                serverId,
                i,
                respondFast,
                respondChunked,
                tryRespondChunked,
              );
            // Failures while writing are only reported to `onError`; its
            // return value is dropped because part of the response may
            // already be on the wire.
            const respond = (response) =>
              PromisePrototypeCatch(send(response), onError);
            // The handler itself failed: nothing was written yet, so the
            // response produced by `onError` is what the client receives.
            const respondWithError = (error) =>
              PromisePrototypeCatch(
                (async () => send(await onError(error)))(),
                console.error,
              );

            let remoteAddr;
            try {
              const resp = handler(req, {
                // Resolved lazily and cached per request.
                get remoteAddr() {
                  if (!remoteAddr) {
                    const { 0: hostname, 1: port } = ops.op_flash_addr(
                      serverId,
                      i,
                    );
                    remoteAddr = { hostname, port };
                  }
                  return remoteAddr;
                },
              });
              if (ObjectPrototypeIsPrototypeOf(PromisePrototype, resp)) {
                PromisePrototypeThen(resp, respond, respondWithError);
              } else if (typeof resp?.then === "function") {
                resp.then(respond, respondWithError);
              } else {
                // Synchronous handlers are answered without an extra tick.
                respond(resp);
              }
            } catch (e) {
              // Not awaited, so one failing request cannot stall the loop.
              respondWithError(e);
            }
          }

          offset += tokens;
        }
        await server.finished;
      },
    };

    signal?.addEventListener("abort", () => {
      clearInterval(dateInterval);
      // Forget the cleared handle so the next `serve()` call restarts the
      // Date refresher instead of reusing a stale `date`.
      // NOTE(review): the timer is shared, so servers still running keep a
      // frozen `date` until another `serve()` call restarts it.
      dateInterval = undefined;
      PromisePrototypeThen(server.close(), () => {}, () => {});
    }, {
      once: true,
    });

    function tryRespondChunked(token, chunk, shutdown) {
      const nwritten = ops.op_try_flash_respond_chunked(
        serverId,
        token,
        chunk ?? new Uint8Array(),
        shutdown,
      );
      if (nwritten > 0) {
        return core.opAsync(
          "op_flash_respond_chunked",
          serverId,
          token,
          chunk,
          shutdown,
          nwritten,
        );
      }
    }

    function respondChunked(token, chunk, shutdown) {
      return core.opAsync(
        "op_flash_respond_chunked",
        serverId,
        token,
        chunk,
        shutdown,
      );
    }

    // Server 0 uses the calls from `prepareFastCalls`; every other server
    // goes through ops that take the server id explicitly.
    const fastOp = prepareFastCalls();
    let nextRequestSync = () => fastOp.nextRequest();
    let getMethodSync = (token) => fastOp.getMethod(token);
    let respondFast = (token, response, shutdown) =>
      fastOp.respond(token, response, shutdown);
    if (serverId > 0) {
      nextRequestSync = () => ops.op_flash_next_server(serverId);
      getMethodSync = (token) => ops.op_flash_method(serverId, token);
      // NOTE(review): `writeFixedResponse` calls `op_flash_respond` with four
      // arguments; the extra `null` here should be checked against the op.
      respondFast = (token, response, shutdown) =>
        ops.op_flash_respond(serverId, token, response, null, shutdown);
    }

    if (!dateInterval) {
      date = new Date().toUTCString();
      dateInterval = setInterval(() => {
        date = new Date().toUTCString();
      }, 1000);
    }

    await SafePromiseAll([
      PromisePrototypeCatch(server.serve(), console.error),
      serverPromise,
    ]);
  };
}

/**
 * Create a byte stream over the body of request `token` on server `serverId`.
 *
 * @param {number} serverId
 * @param {number} token
 * @returns {ReadableStream<Uint8Array> | null} `null` when
 *   `op_flash_first_packet` returns a falsy value
 */
function createRequestBodyStream(serverId, token) {
  // The first packet is left over bytes after parsing the request
  const firstRead = ops.op_flash_first_packet(
    serverId,
    token,
  );
  if (!firstRead) return null;
  // An empty first packet has nothing to enqueue, so treat it as done.
  let firstEnqueued = firstRead.byteLength === 0;

  return new ReadableStream({
    type: "bytes",
    async pull(controller) {
      try {
        if (firstEnqueued === false) {
          controller.enqueue(firstRead);
          firstEnqueued = true;
          return;
        }
        // This is the largest possible size for a single packet on a TLS
        // stream.
        const chunk = new Uint8Array(16 * 1024 + 256);
        const read = await core.opAsync(
          "op_flash_read_body",
          serverId,
          token,
          chunk,
        );
        if (read > 0) {
          // We read some data. Enqueue it onto the stream.
          controller.enqueue(TypedArrayPrototypeSubarray(chunk, 0, read));
        } else {
          // We have reached the end of the body, so we close the stream.
          controller.close();
        }
      } catch (err) {
        // There was an error while reading a chunk of the body, so we
        // error. An errored stream must not be closed afterwards: `close()`
        // throws once the stream is no longer readable.
        controller.error(err);
      }
    },
  });
}

/**
 * Take over the raw connection behind a flash request.
 *
 * @param {Request} req a request created by the flash server (carries `_flash`)
 * @returns {[TcpConn, Uint8Array]} the connection wrapper and an empty buffer
 * @throws {TypeError} when `req` is not a flash request
 */
function upgradeHttpRaw(req) {
  if (!req[_flash]) {
    throw new TypeError(
      "Non-flash requests can not be upgraded with `upgradeHttpRaw`. Use `upgradeHttp` instead.",
    );
  }

  // NOTE(bartlomieju):
  // Access these fields so they are cached on `req` object, otherwise
  // they wouldn't be available after the connection gets upgraded.
  req.url;
  req.method;
  req.headers;

  const { serverId, streamRid } = req[_flash];
  const connRid = ops.op_flash_upgrade_http(streamRid, serverId);
  // TODO(@littledivy): return already read first packet too.
  return [new TcpConn(connRid), new Uint8Array()];
}

export { createServe, upgradeHttpRaw };
|