diff --git a/.eslintignore b/.eslintignore index 9f113730f7..93c2c09248 100644 --- a/.eslintignore +++ b/.eslintignore @@ -1,4 +1,3 @@ /flags/ /fs/ -/http/ /ws/ diff --git a/http/server.ts b/http/server.ts index f3b92ee907..5cf658cf34 100644 --- a/http/server.ts +++ b/http/server.ts @@ -13,101 +13,13 @@ interface Deferred { resolve: () => void; reject: () => void; } - -function deferred(): Deferred { - let resolve, reject; - const promise = new Promise((res, rej) => { - resolve = res; - reject = rej; - }); - return { - promise, - resolve, - reject - }; -} - -interface ServeEnv { - reqQueue: ServerRequest[]; - serveDeferred: Deferred; -} - -/** Continuously read more requests from conn until EOF - * Calls maybeHandleReq. - * bufr is empty on a fresh TCP connection. - * Would be passed around and reused for later request on same conn - * TODO: make them async function after this change is done - * https://github.com/tc39/ecma262/pull/1250 - * See https://v8.dev/blog/fast-async - */ -function serveConn(env: ServeEnv, conn: Conn, bufr?: BufReader) { - readRequest(conn, bufr).then(maybeHandleReq.bind(null, env, conn)); -} - -function maybeHandleReq(env: ServeEnv, conn: Conn, maybeReq: any) { - const [req, _err] = maybeReq; - if (_err) { - conn.close(); // assume EOF for now... - return; - } - env.reqQueue.push(req); // push req to queue - env.serveDeferred.resolve(); // signal while loop to process it -} - -export async function* serve(addr: string) { - const listener = listen("tcp", addr); - const env: ServeEnv = { - reqQueue: [], // in case multiple promises are ready - serveDeferred: deferred() - }; - - // Routine that keeps calling accept - const acceptRoutine = () => { - const handleConn = (conn: Conn) => { - serveConn(env, conn); // don't block - scheduleAccept(); // schedule next accept - }; - const scheduleAccept = () => { - listener.accept().then(handleConn); - }; - scheduleAccept(); - }; - - acceptRoutine(); - - // Loop hack to allow yield (yield won't work in callbacks) - while (true) { - await env.serveDeferred.promise; - env.serveDeferred = deferred(); // use a new deferred - let queueToProcess = env.reqQueue; - env.reqQueue = []; - for (const result of queueToProcess) { - yield result; - // Continue read more from conn when user is done with the current req - // Moving this here makes it easier to manage - serveConn(env, result.conn, result.r); - } - } - listener.close(); -} - -export async function listenAndServe( - addr: string, - handler: (req: ServerRequest) => void -) { - const server = serve(addr); - - for await (const request of server) { - await handler(request); +function bufWriter(w: Writer): BufWriter { + if (w instanceof BufWriter) { + return w; + } else { + return new BufWriter(w); } } - -export interface Response { - status?: number; - headers?: Headers; - body?: Uint8Array | Reader; -} - export function setContentLength(r: Response): void { if (!r.headers) { r.headers = new Headers(); @@ -124,6 +36,83 @@ export function setContentLength(r: Response): void { } } } +async function writeChunkedBody(w: Writer, r: Reader) { + const writer = bufWriter(w); + const encoder = new TextEncoder(); + + for await (const chunk of toAsyncIterator(r)) { + const start = encoder.encode(`${chunk.byteLength.toString(16)}\r\n`); + const end = encoder.encode("\r\n"); + await writer.write(start); + await writer.write(chunk); + await writer.write(end); + } + + const endChunk = encoder.encode("0\r\n\r\n"); + await writer.write(endChunk); +} +export async function writeResponse(w: Writer, r: Response): Promise { + const protoMajor = 1; + const protoMinor = 1; + const statusCode = r.status || 200; + const statusText = STATUS_TEXT.get(statusCode); + const writer = bufWriter(w); + if (!statusText) { + throw Error("bad status code"); + } + + let out = `HTTP/${protoMajor}.${protoMinor} ${statusCode} ${statusText}\r\n`; + + setContentLength(r); + + if (r.headers) { + for (const [key, value] of r.headers) { + out += `${key}: ${value}\r\n`; + } + } + out += "\r\n"; + + const header = new TextEncoder().encode(out); + let n = await writer.write(header); + assert(header.byteLength == n); + + if (r.body) { + if (r.body instanceof Uint8Array) { + n = await writer.write(r.body); + assert(r.body.byteLength == n); + } else { + if (r.headers.has("content-length")) { + const bodyLength = parseInt(r.headers.get("content-length")); + const n = await copy(writer, r.body); + assert(n == bodyLength); + } else { + await writeChunkedBody(writer, r.body); + } + } + } + await writer.flush(); +} +async function readAllIterator( + it: AsyncIterableIterator +): Promise { + const chunks = []; + let len = 0; + for await (const chunk of it) { + chunks.push(chunk); + len += chunk.length; + } + if (chunks.length === 0) { + // No need for copy + return chunks[0]; + } + const collected = new Uint8Array(len); + let offset = 0; + for (let chunk of chunks) { + collected.set(chunk, offset); + offset += chunk.length; + } + return collected; +} export class ServerRequest { url: string; @@ -159,22 +148,22 @@ export class ServerRequest { if (transferEncodings.includes("chunked")) { // Based on https://tools.ietf.org/html/rfc2616#section-19.4.6 const tp = new TextProtoReader(this.r); - let [line, _] = await tp.readLine(); + let [line] = await tp.readLine(); // TODO: handle chunk extension - let [chunkSizeString, optExt] = line.split(";"); + let [chunkSizeString] = line.split(";"); let chunkSize = parseInt(chunkSizeString, 16); if (Number.isNaN(chunkSize) || chunkSize < 0) { throw new Error("Invalid chunk size"); } while (chunkSize > 0) { let data = new Uint8Array(chunkSize); - let [nread, err] = await this.r.readFull(data); + let [nread] = await this.r.readFull(data); if (nread !== chunkSize) { throw new Error("Chunk data does not match size"); } yield data; await this.r.readLine(); // Consume \r\n - [line, _] = await tp.readLine(); + [line] = await tp.readLine(); chunkSize = parseInt(line, 16); } const [entityHeaders, err] = await tp.readMIMEHeader(); @@ -219,72 +208,32 @@ export class ServerRequest { } } -function bufWriter(w: Writer): BufWriter { - if (w instanceof BufWriter) { - return w; - } else { - return new BufWriter(w); - } +function deferred(): Deferred { + let resolve, reject; + const promise = new Promise((res, rej) => { + resolve = res; + reject = rej; + }); + return { + promise, + resolve, + reject + }; } -export async function writeResponse(w: Writer, r: Response): Promise { - const protoMajor = 1; - const protoMinor = 1; - const statusCode = r.status || 200; - const statusText = STATUS_TEXT.get(statusCode); - const writer = bufWriter(w); - if (!statusText) { - throw Error("bad status code"); - } - - let out = `HTTP/${protoMajor}.${protoMinor} ${statusCode} ${statusText}\r\n`; - - setContentLength(r); - - if (r.headers) { - for (const [key, value] of r.headers) { - out += `${key}: ${value}\r\n`; - } - } - out += "\r\n"; - - const header = new TextEncoder().encode(out); - let n = await writer.write(header); - assert(header.byteLength == n); - - if (r.body) { - if (r.body instanceof Uint8Array) { - n = await writer.write(r.body); - assert(r.body.byteLength == n); - } else { - if (r.headers.has("content-length")) { - const bodyLength = parseInt(r.headers.get("content-length")); - const n = await copy(writer, r.body); - assert(n == bodyLength); - } else { - await writeChunkedBody(writer, r.body); - } - } - } - await writer.flush(); -} - -async function writeChunkedBody(w: Writer, r: Reader) { - const writer = bufWriter(w); - const encoder = new TextEncoder(); - - for await (const chunk of toAsyncIterator(r)) { - const start = encoder.encode(`${chunk.byteLength.toString(16)}\r\n`); - const end = encoder.encode("\r\n"); - await writer.write(start); - await writer.write(chunk); - await writer.write(end); - } - - const endChunk = encoder.encode("0\r\n\r\n"); - await writer.write(endChunk); +interface ServeEnv { + reqQueue: ServerRequest[]; + serveDeferred: Deferred; } +/** Continuously read more requests from conn until EOF + * Calls maybeHandleReq. + * bufr is empty on a fresh TCP connection. + * Would be passed around and reused for later request on same conn + * TODO: make them async function after this change is done + * https://github.com/tc39/ecma262/pull/1250 + * See https://v8.dev/blog/fast-async + */ async function readRequest( c: Conn, bufr?: BufReader @@ -314,24 +263,73 @@ async function readRequest( return [req, err]; } -async function readAllIterator( - it: AsyncIterableIterator -): Promise { - const chunks = []; - let len = 0; - for await (const chunk of it) { - chunks.push(chunk); - len += chunk.length; +function maybeHandleReq(env: ServeEnv, conn: Conn, maybeReq: any) { + const [req, _err] = maybeReq; + if (_err) { + conn.close(); // assume EOF for now... + return; } - if (chunks.length === 0) { - // No need for copy - return chunks[0]; - } - const collected = new Uint8Array(len); - let offset = 0; - for (let chunk of chunks) { - collected.set(chunk, offset); - offset += chunk.length; - } - return collected; + env.reqQueue.push(req); // push req to queue + env.serveDeferred.resolve(); // signal while loop to process it +} + +function serveConn(env: ServeEnv, conn: Conn, bufr?: BufReader) { + readRequest(conn, bufr).then(maybeHandleReq.bind(null, env, conn)); +} + +export async function* serve(addr: string) { + const listener = listen("tcp", addr); + const env: ServeEnv = { + reqQueue: [], // in case multiple promises are ready + serveDeferred: deferred() + }; + + // Routine that keeps calling accept + let handleConn = (_conn: Conn) => {}; + let scheduleAccept = () => {}; + const acceptRoutine = () => { + scheduleAccept = () => { + listener.accept().then(handleConn); + }; + handleConn = (conn: Conn) => { + serveConn(env, conn); // don't block + scheduleAccept(); // schedule next accept + }; + + scheduleAccept(); + }; + + acceptRoutine(); + + // Loop hack to allow yield (yield won't work in callbacks) + while (true) { + await env.serveDeferred.promise; + env.serveDeferred = deferred(); // use a new deferred + let queueToProcess = env.reqQueue; + env.reqQueue = []; + for (const result of queueToProcess) { + yield result; + // Continue read more from conn when user is done with the current req + // Moving this here makes it easier to manage + serveConn(env, result.conn, result.r); + } + } + listener.close(); +} + +export async function listenAndServe( + addr: string, + handler: (req: ServerRequest) => void +) { + const server = serve(addr); + + for await (const request of server) { + await handler(request); + } +} + +export interface Response { + status?: number; + headers?: Headers; + body?: Uint8Array | Reader; }