diff --git a/http/http_bench.ts b/http/http_bench.ts index 6d72d4be6a..06043f9e4e 100644 --- a/http/http_bench.ts +++ b/http/http_bench.ts @@ -3,13 +3,12 @@ import { serve } from "./server.ts"; const addr = Deno.args[1] || "127.0.0.1:4500"; const server = serve(addr); - const body = new TextEncoder().encode("Hello World"); async function main(): Promise { console.log(`http://${addr}/`); - for await (const request of server) { - request.respond({ status: 200, body }); + for await (const req of server) { + req.respond({ body }); } } diff --git a/http/server.ts b/http/server.ts index 484ecf808d..281f8d3025 100644 --- a/http/server.ts +++ b/http/server.ts @@ -1,55 +1,14 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. const { listen, copy, toAsyncIterator } = Deno; +type Listener = Deno.Listener; type Conn = Deno.Conn; type Reader = Deno.Reader; type Writer = Deno.Writer; import { BufReader, BufState, BufWriter } from "../io/bufio.ts"; import { TextProtoReader } from "../textproto/mod.ts"; import { STATUS_TEXT } from "./http_status.ts"; -import { assert } from "../testing/asserts.ts"; - -interface Deferred { - promise: Promise<{}>; - resolve: () => void; - reject: () => void; -} - -function deferred(isResolved = false): Deferred { - let resolve, reject; - const promise = new Promise( - (res, rej): void => { - resolve = res; - reject = rej; - } - ); - if (isResolved) { - resolve(); - } - return { - promise, - resolve, - reject - }; -} - -interface HttpConn extends Conn { - // When read by a newly created request B, lastId is the id pointing to a previous - // request A, such that we must wait for responses to A to complete before - // writing B's response. - lastPipelineId: number; - pendingDeferredMap: Map; -} - -function createHttpConn(c: Conn): HttpConn { - const httpConn = Object.assign(c, { - lastPipelineId: 0, - pendingDeferredMap: new Map() - }); - - const resolvedDeferred = deferred(true); - httpConn.pendingDeferredMap.set(0, resolvedDeferred); - return httpConn; -} +import { assert, fail } from "../testing/asserts.ts"; +import { deferred, Deferred, MuxAsyncIterator } from "../util/async.ts"; function bufWriter(w: Writer): BufWriter { if (w instanceof BufWriter) { @@ -58,6 +17,7 @@ function bufWriter(w: Writer): BufWriter { return new BufWriter(w); } } + export function setContentLength(r: Response): void { if (!r.headers) { r.headers = new Headers(); @@ -74,6 +34,7 @@ export function setContentLength(r: Response): void { } } } + async function writeChunkedBody(w: Writer, r: Reader): Promise { const writer = bufWriter(w); const encoder = new TextEncoder(); @@ -90,6 +51,7 @@ async function writeChunkedBody(w: Writer, r: Reader): Promise { const endChunk = encoder.encode("0\r\n\r\n"); await writer.write(endChunk); } + export async function writeResponse(w: Writer, r: Response): Promise { const protoMajor = 1; const protoMinor = 1; @@ -131,6 +93,7 @@ export async function writeResponse(w: Writer, r: Response): Promise { } await writer.flush(); } + async function readAllIterator( it: AsyncIterableIterator ): Promise { @@ -154,14 +117,14 @@ async function readAllIterator( } export class ServerRequest { - pipelineId: number; url: string; method: string; proto: string; headers: Headers; - conn: HttpConn; + conn: Conn; r: BufReader; w: BufWriter; + done: Deferred = deferred(); public async *bodyStream(): AsyncIterableIterator { if (this.headers.has("content-length")) { @@ -244,134 +207,102 @@ export class ServerRequest { } async respond(r: Response): Promise { - // Check and wait if the previous request is done responding. - const lastPipelineId = this.pipelineId - 1; - const lastPipelineDeferred = this.conn.pendingDeferredMap.get( - lastPipelineId - ); - assert(!!lastPipelineDeferred); - await lastPipelineDeferred.promise; - // If yes, delete old deferred and proceed with writing. - this.conn.pendingDeferredMap.delete(lastPipelineId); // Write our response! await writeResponse(this.w, r); - // Signal the next pending request that it can start writing. - const currPipelineDeferred = this.conn.pendingDeferredMap.get( - this.pipelineId - ); - assert(!!currPipelineDeferred); - currPipelineDeferred.resolve(); + // Signal that this request has been processed and the next pipelined + // request on the same connection can be accepted. + this.done.resolve(); } } -interface ServeEnv { - reqQueue: ServerRequest[]; - serveDeferred: Deferred; -} - -/** Continuously read more requests from conn until EOF - * Calls maybeHandleReq. - * bufr is empty on a fresh TCP connection. - * Would be passed around and reused for later request on same conn - * TODO: make them async function after this change is done - * https://github.com/tc39/ecma262/pull/1250 - * See https://v8.dev/blog/fast-async - */ async function readRequest( - c: HttpConn, - bufr?: BufReader + conn: Conn, + bufr: BufReader ): Promise<[ServerRequest, BufState]> { - if (!bufr) { - bufr = new BufReader(c); - } - const bufw = new BufWriter(c); const req = new ServerRequest(); - - // Set and incr pipeline id; - req.pipelineId = ++c.lastPipelineId; - // Set a new pipeline deferred associated with this request - // for future requests to wait for. - c.pendingDeferredMap.set(req.pipelineId, deferred()); - - req.conn = c; - req.r = bufr!; - req.w = bufw; - const tp = new TextProtoReader(bufr!); - - let s: string; + req.conn = conn; + req.r = bufr; + req.w = new BufWriter(conn); + const tp = new TextProtoReader(bufr); let err: BufState; - // First line: GET /index.html HTTP/1.0 - [s, err] = await tp.readLine(); + let firstLine: string; + [firstLine, err] = await tp.readLine(); if (err) { return [null, err]; } - [req.method, req.url, req.proto] = s.split(" ", 3); - + [req.method, req.url, req.proto] = firstLine.split(" ", 3); [req.headers, err] = await tp.readMIMEHeader(); - return [req, err]; } -function maybeHandleReq( - env: ServeEnv, - conn: Conn, - maybeReq: [ServerRequest, BufState] -): void { - const [req, _err] = maybeReq; - if (_err) { - conn.close(); // assume EOF for now... - return; +export class Server implements AsyncIterable { + private closing = false; + + constructor(public listener: Listener) {} + + close(): void { + this.closing = true; + this.listener.close(); } - env.reqQueue.push(req); // push req to queue - env.serveDeferred.resolve(); // signal while loop to process it -} -function serveConn(env: ServeEnv, conn: HttpConn, bufr?: BufReader): void { - readRequest(conn, bufr).then(maybeHandleReq.bind(null, env, conn)); -} + // Yields all HTTP requests on a single TCP connection. + private async *iterateHttpRequests( + conn: Conn + ): AsyncIterableIterator { + const bufr = new BufReader(conn); + let bufStateErr: BufState; + let req: ServerRequest; -export async function* serve( - addr: string -): AsyncIterableIterator { - const listener = listen("tcp", addr); - const env: ServeEnv = { - reqQueue: [], // in case multiple promises are ready - serveDeferred: deferred() - }; - - // Routine that keeps calling accept - let handleConn = (_conn: Conn): void => {}; - let scheduleAccept = (): void => {}; - const acceptRoutine = (): void => { - scheduleAccept = (): void => { - listener.accept().then(handleConn); - }; - handleConn = (conn: Conn): void => { - const httpConn = createHttpConn(conn); - serveConn(env, httpConn); // don't block - scheduleAccept(); // schedule next accept - }; - - scheduleAccept(); - }; - - acceptRoutine(); - - // Loop hack to allow yield (yield won't work in callbacks) - while (true) { - await env.serveDeferred.promise; - env.serveDeferred = deferred(); // use a new deferred - let queueToProcess = env.reqQueue; - env.reqQueue = []; - for (const result of queueToProcess) { - yield result; - // Continue read more from conn when user is done with the current req - // Moving this here makes it easier to manage - serveConn(env, result.conn, result.r); + while (!this.closing) { + [req, bufStateErr] = await readRequest(conn, bufr); + if (bufStateErr) break; + yield req; + // Wait for the request to be processed before we accept a new request on + // this connection. + await req.done; } + + if (bufStateErr === "EOF") { + // The connection was gracefully closed. + } else if (bufStateErr instanceof Error) { + // TODO(ry): send something back like a HTTP 500 status. + } else if (this.closing) { + // There are more requests incoming but the server is closing. + // TODO(ry): send a back a HTTP 503 Service Unavailable status. + } else { + fail(`unexpected BufState: ${bufStateErr}`); + } + + conn.close(); } - listener.close(); + + // Accepts a new TCP connection and yields all HTTP requests that arrive on + // it. When a connection is accepted, it also creates a new iterator of the + // same kind and adds it to the request multiplexer so that another TCP + // connection can be accepted. + private async *acceptConnAndIterateHttpRequests( + mux: MuxAsyncIterator + ): AsyncIterableIterator { + if (this.closing) return; + // Wait for a new connection. + const conn = await this.listener.accept(); + // Try to accept another connection and add it to the multiplexer. + mux.add(this.acceptConnAndIterateHttpRequests(mux)); + // Yield the requests that arrive on the just-accepted connection. + yield* this.iterateHttpRequests(conn); + } + + [Symbol.asyncIterator](): AsyncIterableIterator { + const mux: MuxAsyncIterator = new MuxAsyncIterator(); + mux.add(this.acceptConnAndIterateHttpRequests(mux)); + return mux.iterate(); + } +} + +export function serve(addr: string): Server { + const listener = listen("tcp", addr); + return new Server(listener); } export async function listenAndServe( diff --git a/http/server_test.ts b/http/server_test.ts index 82a368395e..396a0321a1 100644 --- a/http/server_test.ts +++ b/http/server_test.ts @@ -22,31 +22,6 @@ const dec = new TextDecoder(); type Handler = () => void; -interface Deferred { - promise: Promise<{}>; - resolve: Handler; - reject: Handler; -} - -function deferred(isResolved = false): Deferred { - let resolve: Handler = (): void => void 0; - let reject: Handler = (): void => void 0; - const promise = new Promise( - (res, rej): void => { - resolve = res; - reject = rej; - } - ); - if (isResolved) { - resolve(); - } - return { - promise, - resolve, - reject - }; -} - const responseTests: ResponseTest[] = [ // Default response { @@ -72,8 +47,8 @@ test(async function responseWrite(): Promise { const buf = new Buffer(); const bufw = new BufWriter(buf); const request = new ServerRequest(); - request.pipelineId = 1; request.w = bufw; + request.conn = { localAddr: "", remoteAddr: "", @@ -86,13 +61,12 @@ test(async function responseWrite(): Promise { write: async (): Promise => { return -1; }, - close: (): void => {}, - lastPipelineId: 0, - pendingDeferredMap: new Map([[0, deferred(true)], [1, deferred()]]) + close: (): void => {} }; await request.respond(testCase.response); assertEquals(buf.toString(), testCase.raw); + await request.done; } }); diff --git a/test.ts b/test.ts index cc921cf9f1..221bbc9858 100755 --- a/test.ts +++ b/test.ts @@ -16,6 +16,7 @@ import "./strings/test.ts"; import "./testing/test.ts"; import "./textproto/test.ts"; import "./toml/test.ts"; +import "./util/test.ts"; import "./ws/test.ts"; import "./testing/main.ts"; diff --git a/util/async.ts b/util/async.ts new file mode 100644 index 0000000000..f9f2477d06 --- /dev/null +++ b/util/async.ts @@ -0,0 +1,85 @@ +// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. + +// TODO(ry) It'd be better to make Deferred a class that inherits from +// Promise, rather than an interface. This is possible in ES2016, however +// typescript produces broken code when targeting ES5 code. +// See https://github.com/Microsoft/TypeScript/issues/15202 +// At the time of writing, the github issue is closed but the problem remains. +export interface Deferred extends Promise { + resolve: (value?: T | PromiseLike) => void; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + reject: (reason?: any) => void; +} + +/** Creates a Promise with the `reject` and `resolve` functions + * placed as methods on the promise object itself. It allows you to do: + * + * const p = deferred(); + * // ... + * p.resolve(42); + */ +export function deferred(): Deferred { + let methods; + const promise = new Promise( + (resolve, reject): void => { + methods = { resolve, reject }; + } + ); + return Object.assign(promise, methods) as Deferred; +} + +interface TaggedYieldedValue { + iterator: AsyncIterableIterator; + value: T; +} + +/** The MuxAsyncIterator class multiplexes multiple async iterators into a + * single stream. It currently makes a few assumptions: + * - The iterators do not throw. + * - The final result (the value returned and not yielded from the iterator) + * does not matter; if there is any, it is discarded. + */ +export class MuxAsyncIterator implements AsyncIterable { + private iteratorCount = 0; + private yields: Array> = []; + private signal: Deferred = deferred(); + + add(iterator: AsyncIterableIterator): void { + ++this.iteratorCount; + this.callIteratorNext(iterator); + } + + private async callIteratorNext( + iterator: AsyncIterableIterator + ): Promise { + const { value, done } = await iterator.next(); + if (done) { + --this.iteratorCount; + } else { + this.yields.push({ iterator, value }); + } + this.signal.resolve(); + } + + async *iterate(): AsyncIterableIterator { + while (this.iteratorCount > 0) { + // Sleep until any of the wrapped iterators yields. + await this.signal; + + // Note that while we're looping over `yields`, new items may be added. + for (let i = 0; i < this.yields.length; i++) { + const { iterator, value } = this.yields[i]; + yield value; + this.callIteratorNext(iterator); + } + + // Clear the `yields` list and reset the `signal` promise. + this.yields.length = 0; + this.signal = deferred(); + } + } + + [Symbol.asyncIterator](): AsyncIterableIterator { + return this.iterate(); + } +} diff --git a/util/async_test.ts b/util/async_test.ts new file mode 100644 index 0000000000..c704002d41 --- /dev/null +++ b/util/async_test.ts @@ -0,0 +1,34 @@ +// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. +import { test, runIfMain } from "../testing/mod.ts"; +import { assertEquals } from "../testing/asserts.ts"; +import { MuxAsyncIterator, deferred } from "./async.ts"; + +test(async function asyncDeferred(): Promise { + const d = deferred(); + d.resolve(12); +}); + +async function* gen123(): AsyncIterableIterator { + yield 1; + yield 2; + yield 3; +} + +async function* gen456(): AsyncIterableIterator { + yield 4; + yield 5; + yield 6; +} + +test(async function asyncMuxAsyncIterator(): Promise { + const mux = new MuxAsyncIterator(); + mux.add(gen123()); + mux.add(gen456()); + const results = new Set(); + for await (const value of mux) { + results.add(value); + } + assertEquals(results.size, 6); +}); + +runIfMain(import.meta); diff --git a/util/test.ts b/util/test.ts index a617c10ab3..ede984904f 100644 --- a/util/test.ts +++ b/util/test.ts @@ -1 +1,2 @@ +import "./async_test.ts"; import "./deep_assign_test.ts";