1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2024-12-03 17:08:35 -05:00

fix(ext/flash): don't block requests (#15852)

This commit is contained in:
Divy Srivastava 2022-09-13 22:24:27 +05:30 committed by cjihrig
parent 503f8105c5
commit e65d8af1f7
No known key found for this signature in database
GPG key ID: 7434390BDBE9B9C5

View file

@ -32,6 +32,7 @@
TypedArrayPrototypeSubarray, TypedArrayPrototypeSubarray,
TypeError, TypeError,
Uint8Array, Uint8Array,
Promise,
Uint8ArrayPrototype, Uint8ArrayPrototype,
} = window.__bootstrap.primordials; } = window.__bootstrap.primordials;
@ -227,6 +228,192 @@
} }
} }
// TODO(@littledivy): Woah woah, cut down the number of arguments.
async function handleResponse(
req,
resp,
body,
hasBody,
method,
serverId,
i,
respondFast,
respondChunked,
) {
// there might've been an HTTP upgrade.
if (resp === undefined) {
return;
}
const innerResp = toInnerResponse(resp);
// If response body length is known, it will be sent synchronously in a
// single op, in other case a "response body" resource will be created and
// we'll be streaming it.
/** @type {ReadableStream<Uint8Array> | Uint8Array | null} */
let respBody = null;
let isStreamingResponseBody = false;
if (innerResp.body !== null) {
if (typeof innerResp.body.streamOrStatic?.body === "string") {
if (innerResp.body.streamOrStatic.consumed === true) {
throw new TypeError("Body is unusable.");
}
innerResp.body.streamOrStatic.consumed = true;
respBody = innerResp.body.streamOrStatic.body;
isStreamingResponseBody = false;
} else if (
ObjectPrototypeIsPrototypeOf(
ReadableStreamPrototype,
innerResp.body.streamOrStatic,
)
) {
if (innerResp.body.unusable()) {
throw new TypeError("Body is unusable.");
}
if (
innerResp.body.length === null ||
ObjectPrototypeIsPrototypeOf(
BlobPrototype,
innerResp.body.source,
)
) {
respBody = innerResp.body.stream;
} else {
const reader = innerResp.body.stream.getReader();
const r1 = await reader.read();
if (r1.done) {
respBody = new Uint8Array(0);
} else {
respBody = r1.value;
const r2 = await reader.read();
if (!r2.done) throw new TypeError("Unreachable");
}
}
isStreamingResponseBody = !(
typeof respBody === "string" ||
ObjectPrototypeIsPrototypeOf(Uint8ArrayPrototype, respBody)
);
} else {
if (innerResp.body.streamOrStatic.consumed === true) {
throw new TypeError("Body is unusable.");
}
innerResp.body.streamOrStatic.consumed = true;
respBody = innerResp.body.streamOrStatic.body;
}
} else {
respBody = new Uint8Array(0);
}
const ws = resp[_ws];
if (isStreamingResponseBody === false) {
const length = respBody.byteLength || core.byteLength(respBody);
const responseStr = http1Response(
method,
innerResp.status ?? 200,
innerResp.headerList,
respBody,
length,
);
writeFixedResponse(
serverId,
i,
responseStr,
length,
!ws, // Don't close socket if there is a deferred websocket upgrade.
respondFast,
);
}
(async () => {
if (!ws) {
if (hasBody && body[_state] !== "closed") {
// TODO(@littledivy): Optimize by draining in a single op.
try {
await req.arrayBuffer();
} catch { /* pass */ }
}
}
if (isStreamingResponseBody === true) {
const resourceRid = getReadableStreamRid(respBody);
if (resourceRid) {
if (respBody.locked) {
throw new TypeError("ReadableStream is locked.");
}
const reader = respBody.getReader(); // Aquire JS lock.
try {
core.opAsync(
"op_flash_write_resource",
http1Response(
method,
innerResp.status ?? 200,
innerResp.headerList,
0, // Content-Length will be set by the op.
null,
true,
),
serverId,
i,
resourceRid,
).then(() => {
// Release JS lock.
readableStreamClose(respBody);
});
} catch (error) {
await reader.cancel(error);
throw error;
}
} else {
const reader = respBody.getReader();
writeFixedResponse(
serverId,
i,
http1Response(
method,
innerResp.status ?? 200,
innerResp.headerList,
respBody.byteLength,
null,
),
respBody.byteLength,
false,
respondFast,
);
while (true) {
const { value, done } = await reader.read();
await respondChunked(
i,
value,
done,
);
if (done) break;
}
}
}
if (ws) {
const wsRid = await core.opAsync(
"op_flash_upgrade_websocket",
serverId,
i,
);
ws[_rid] = wsRid;
ws[_protocol] = resp.headers.get("sec-websocket-protocol");
ws[_readyState] = WebSocket.OPEN;
const event = new Event("open");
ws.dispatchEvent(event);
ws[_eventLoop]();
if (ws[_idleTimeoutDuration]) {
ws.addEventListener(
"close",
() => clearTimeout(ws[_idleTimeoutTimeout]),
);
}
ws[_serverHandleIdleTimeout]();
}
})();
}
async function serve(arg1, arg2) { async function serve(arg1, arg2) {
let options = undefined; let options = undefined;
let handler = undefined; let handler = undefined;
@ -353,183 +540,38 @@
let resp; let resp;
try { try {
resp = await handler(req); resp = handler(req);
if (resp instanceof Promise || typeof resp.then === "function") {
resp.then((resp) =>
handleResponse(
req,
resp,
body,
hasBody,
method,
serverId,
i,
respondFast,
respondChunked,
)
).catch(onError);
continue;
}
} catch (e) { } catch (e) {
resp = await onError(e); resp = await onError(e);
} }
// there might've been an HTTP upgrade.
if (resp === undefined) {
return;
}
const innerResp = toInnerResponse(resp);
// If response body length is known, it will be sent synchronously in a handleResponse(
// single op, in other case a "response body" resource will be created and req,
// we'll be streaming it. resp,
/** @type {ReadableStream<Uint8Array> | Uint8Array | null} */ body,
let respBody = null; hasBody,
let isStreamingResponseBody = false; method,
if (innerResp.body !== null) { serverId,
if (typeof innerResp.body.streamOrStatic?.body === "string") { i,
if (innerResp.body.streamOrStatic.consumed === true) { respondFast,
throw new TypeError("Body is unusable."); respondChunked,
} );
innerResp.body.streamOrStatic.consumed = true;
respBody = innerResp.body.streamOrStatic.body;
isStreamingResponseBody = false;
} else if (
ObjectPrototypeIsPrototypeOf(
ReadableStreamPrototype,
innerResp.body.streamOrStatic,
)
) {
if (innerResp.body.unusable()) {
throw new TypeError("Body is unusable.");
}
if (
innerResp.body.length === null ||
ObjectPrototypeIsPrototypeOf(
BlobPrototype,
innerResp.body.source,
)
) {
respBody = innerResp.body.stream;
} else {
const reader = innerResp.body.stream.getReader();
const r1 = await reader.read();
if (r1.done) {
respBody = new Uint8Array(0);
} else {
respBody = r1.value;
const r2 = await reader.read();
if (!r2.done) throw new TypeError("Unreachable");
}
}
isStreamingResponseBody = !(
typeof respBody === "string" ||
ObjectPrototypeIsPrototypeOf(Uint8ArrayPrototype, respBody)
);
} else {
if (innerResp.body.streamOrStatic.consumed === true) {
throw new TypeError("Body is unusable.");
}
innerResp.body.streamOrStatic.consumed = true;
respBody = innerResp.body.streamOrStatic.body;
}
} else {
respBody = new Uint8Array(0);
}
const ws = resp[_ws];
if (isStreamingResponseBody === false) {
const length = respBody.byteLength || core.byteLength(respBody);
const responseStr = http1Response(
method,
innerResp.status ?? 200,
innerResp.headerList,
respBody,
length,
);
writeFixedResponse(
serverId,
i,
responseStr,
length,
!ws, // Don't close socket if there is a deferred websocket upgrade.
respondFast,
);
}
(async () => {
if (!ws) {
if (hasBody && body[_state] !== "closed") {
// TODO(@littledivy): Optimize by draining in a single op.
try {
await req.arrayBuffer();
} catch { /* pass */ }
}
}
if (isStreamingResponseBody === true) {
const resourceRid = getReadableStreamRid(respBody);
if (resourceRid) {
if (respBody.locked) {
throw new TypeError("ReadableStream is locked.");
}
const reader = respBody.getReader(); // Aquire JS lock.
try {
core.opAsync(
"op_flash_write_resource",
http1Response(
method,
innerResp.status ?? 200,
innerResp.headerList,
0, // Content-Length will be set by the op.
null,
true,
),
serverId,
i,
resourceRid,
).then(() => {
// Release JS lock.
readableStreamClose(respBody);
});
} catch (error) {
await reader.cancel(error);
throw error;
}
} else {
const reader = respBody.getReader();
writeFixedResponse(
serverId,
i,
http1Response(
method,
innerResp.status ?? 200,
innerResp.headerList,
respBody.byteLength,
null,
),
respBody.byteLength,
false,
respondFast,
);
while (true) {
const { value, done } = await reader.read();
await respondChunked(
i,
value,
done,
);
if (done) break;
}
}
}
if (ws) {
const wsRid = await core.opAsync(
"op_flash_upgrade_websocket",
serverId,
i,
);
ws[_rid] = wsRid;
ws[_protocol] = resp.headers.get("sec-websocket-protocol");
ws[_readyState] = WebSocket.OPEN;
const event = new Event("open");
ws.dispatchEvent(event);
ws[_eventLoop]();
if (ws[_idleTimeoutDuration]) {
ws.addEventListener(
"close",
() => clearTimeout(ws[_idleTimeoutTimeout]),
);
}
ws[_serverHandleIdleTimeout]();
}
})().catch(onError);
} }
offset += tokens; offset += tokens;