diff --git a/ext/flash/01_http.js b/ext/flash/01_http.js index b28989ccb3..72c0cb125f 100644 --- a/ext/flash/01_http.js +++ b/ext/flash/01_http.js @@ -32,6 +32,7 @@ TypedArrayPrototypeSubarray, TypeError, Uint8Array, + Promise, Uint8ArrayPrototype, } = window.__bootstrap.primordials; @@ -227,6 +228,192 @@ } } + // TODO(@littledivy): Woah woah, cut down the number of arguments. + async function handleResponse( + req, + resp, + body, + hasBody, + method, + serverId, + i, + respondFast, + respondChunked, + ) { + // there might've been an HTTP upgrade. + if (resp === undefined) { + return; + } + const innerResp = toInnerResponse(resp); + // If response body length is known, it will be sent synchronously in a + // single op, in other case a "response body" resource will be created and + // we'll be streaming it. + /** @type {ReadableStream | Uint8Array | null} */ + let respBody = null; + let isStreamingResponseBody = false; + if (innerResp.body !== null) { + if (typeof innerResp.body.streamOrStatic?.body === "string") { + if (innerResp.body.streamOrStatic.consumed === true) { + throw new TypeError("Body is unusable."); + } + innerResp.body.streamOrStatic.consumed = true; + respBody = innerResp.body.streamOrStatic.body; + isStreamingResponseBody = false; + } else if ( + ObjectPrototypeIsPrototypeOf( + ReadableStreamPrototype, + innerResp.body.streamOrStatic, + ) + ) { + if (innerResp.body.unusable()) { + throw new TypeError("Body is unusable."); + } + if ( + innerResp.body.length === null || + ObjectPrototypeIsPrototypeOf( + BlobPrototype, + innerResp.body.source, + ) + ) { + respBody = innerResp.body.stream; + } else { + const reader = innerResp.body.stream.getReader(); + const r1 = await reader.read(); + if (r1.done) { + respBody = new Uint8Array(0); + } else { + respBody = r1.value; + const r2 = await reader.read(); + if (!r2.done) throw new TypeError("Unreachable"); + } + } + isStreamingResponseBody = !( + typeof respBody === "string" || + ObjectPrototypeIsPrototypeOf(Uint8ArrayPrototype, respBody) + ); + } else { + if (innerResp.body.streamOrStatic.consumed === true) { + throw new TypeError("Body is unusable."); + } + innerResp.body.streamOrStatic.consumed = true; + respBody = innerResp.body.streamOrStatic.body; + } + } else { + respBody = new Uint8Array(0); + } + + const ws = resp[_ws]; + if (isStreamingResponseBody === false) { + const length = respBody.byteLength || core.byteLength(respBody); + const responseStr = http1Response( + method, + innerResp.status ?? 200, + innerResp.headerList, + respBody, + length, + ); + writeFixedResponse( + serverId, + i, + responseStr, + length, + !ws, // Don't close socket if there is a deferred websocket upgrade. + respondFast, + ); + } + + (async () => { + if (!ws) { + if (hasBody && body[_state] !== "closed") { + // TODO(@littledivy): Optimize by draining in a single op. + try { + await req.arrayBuffer(); + } catch { /* pass */ } + } + } + + if (isStreamingResponseBody === true) { + const resourceRid = getReadableStreamRid(respBody); + if (resourceRid) { + if (respBody.locked) { + throw new TypeError("ReadableStream is locked."); + } + const reader = respBody.getReader(); // Aquire JS lock. + try { + core.opAsync( + "op_flash_write_resource", + http1Response( + method, + innerResp.status ?? 200, + innerResp.headerList, + 0, // Content-Length will be set by the op. + null, + true, + ), + serverId, + i, + resourceRid, + ).then(() => { + // Release JS lock. + readableStreamClose(respBody); + }); + } catch (error) { + await reader.cancel(error); + throw error; + } + } else { + const reader = respBody.getReader(); + writeFixedResponse( + serverId, + i, + http1Response( + method, + innerResp.status ?? 200, + innerResp.headerList, + respBody.byteLength, + null, + ), + respBody.byteLength, + false, + respondFast, + ); + while (true) { + const { value, done } = await reader.read(); + await respondChunked( + i, + value, + done, + ); + if (done) break; + } + } + } + + if (ws) { + const wsRid = await core.opAsync( + "op_flash_upgrade_websocket", + serverId, + i, + ); + ws[_rid] = wsRid; + ws[_protocol] = resp.headers.get("sec-websocket-protocol"); + + ws[_readyState] = WebSocket.OPEN; + const event = new Event("open"); + ws.dispatchEvent(event); + + ws[_eventLoop](); + if (ws[_idleTimeoutDuration]) { + ws.addEventListener( + "close", + () => clearTimeout(ws[_idleTimeoutTimeout]), + ); + } + ws[_serverHandleIdleTimeout](); + } + })(); + } + async function serve(arg1, arg2) { let options = undefined; let handler = undefined; @@ -353,183 +540,38 @@ let resp; try { - resp = await handler(req); + resp = handler(req); + if (resp instanceof Promise || typeof resp.then === "function") { + resp.then((resp) => + handleResponse( + req, + resp, + body, + hasBody, + method, + serverId, + i, + respondFast, + respondChunked, + ) + ).catch(onError); + continue; + } } catch (e) { resp = await onError(e); } - // there might've been an HTTP upgrade. - if (resp === undefined) { - return; - } - const innerResp = toInnerResponse(resp); - // If response body length is known, it will be sent synchronously in a - // single op, in other case a "response body" resource will be created and - // we'll be streaming it. - /** @type {ReadableStream | Uint8Array | null} */ - let respBody = null; - let isStreamingResponseBody = false; - if (innerResp.body !== null) { - if (typeof innerResp.body.streamOrStatic?.body === "string") { - if (innerResp.body.streamOrStatic.consumed === true) { - throw new TypeError("Body is unusable."); - } - innerResp.body.streamOrStatic.consumed = true; - respBody = innerResp.body.streamOrStatic.body; - isStreamingResponseBody = false; - } else if ( - ObjectPrototypeIsPrototypeOf( - ReadableStreamPrototype, - innerResp.body.streamOrStatic, - ) - ) { - if (innerResp.body.unusable()) { - throw new TypeError("Body is unusable."); - } - if ( - innerResp.body.length === null || - ObjectPrototypeIsPrototypeOf( - BlobPrototype, - innerResp.body.source, - ) - ) { - respBody = innerResp.body.stream; - } else { - const reader = innerResp.body.stream.getReader(); - const r1 = await reader.read(); - if (r1.done) { - respBody = new Uint8Array(0); - } else { - respBody = r1.value; - const r2 = await reader.read(); - if (!r2.done) throw new TypeError("Unreachable"); - } - } - isStreamingResponseBody = !( - typeof respBody === "string" || - ObjectPrototypeIsPrototypeOf(Uint8ArrayPrototype, respBody) - ); - } else { - if (innerResp.body.streamOrStatic.consumed === true) { - throw new TypeError("Body is unusable."); - } - innerResp.body.streamOrStatic.consumed = true; - respBody = innerResp.body.streamOrStatic.body; - } - } else { - respBody = new Uint8Array(0); - } - - const ws = resp[_ws]; - if (isStreamingResponseBody === false) { - const length = respBody.byteLength || core.byteLength(respBody); - const responseStr = http1Response( - method, - innerResp.status ?? 200, - innerResp.headerList, - respBody, - length, - ); - writeFixedResponse( - serverId, - i, - responseStr, - length, - !ws, // Don't close socket if there is a deferred websocket upgrade. - respondFast, - ); - } - - (async () => { - if (!ws) { - if (hasBody && body[_state] !== "closed") { - // TODO(@littledivy): Optimize by draining in a single op. - try { - await req.arrayBuffer(); - } catch { /* pass */ } - } - } - - if (isStreamingResponseBody === true) { - const resourceRid = getReadableStreamRid(respBody); - if (resourceRid) { - if (respBody.locked) { - throw new TypeError("ReadableStream is locked."); - } - const reader = respBody.getReader(); // Aquire JS lock. - try { - core.opAsync( - "op_flash_write_resource", - http1Response( - method, - innerResp.status ?? 200, - innerResp.headerList, - 0, // Content-Length will be set by the op. - null, - true, - ), - serverId, - i, - resourceRid, - ).then(() => { - // Release JS lock. - readableStreamClose(respBody); - }); - } catch (error) { - await reader.cancel(error); - throw error; - } - } else { - const reader = respBody.getReader(); - writeFixedResponse( - serverId, - i, - http1Response( - method, - innerResp.status ?? 200, - innerResp.headerList, - respBody.byteLength, - null, - ), - respBody.byteLength, - false, - respondFast, - ); - while (true) { - const { value, done } = await reader.read(); - await respondChunked( - i, - value, - done, - ); - if (done) break; - } - } - } - - if (ws) { - const wsRid = await core.opAsync( - "op_flash_upgrade_websocket", - serverId, - i, - ); - ws[_rid] = wsRid; - ws[_protocol] = resp.headers.get("sec-websocket-protocol"); - - ws[_readyState] = WebSocket.OPEN; - const event = new Event("open"); - ws.dispatchEvent(event); - - ws[_eventLoop](); - if (ws[_idleTimeoutDuration]) { - ws.addEventListener( - "close", - () => clearTimeout(ws[_idleTimeoutTimeout]), - ); - } - ws[_serverHandleIdleTimeout](); - } - })().catch(onError); + handleResponse( + req, + resp, + body, + hasBody, + method, + serverId, + i, + respondFast, + respondChunked, + ); } offset += tokens;