diff --git a/http/racing_server.ts b/http/racing_server.ts new file mode 100644 index 0000000000..c44fc12161 --- /dev/null +++ b/http/racing_server.ts @@ -0,0 +1,53 @@ +// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. +import { serve, ServerRequest } from "./server.ts"; + +const addr = Deno.args[1] || "127.0.0.1:4501"; +const server = serve(addr); + +const body = new TextEncoder().encode("Hello 1\n"); +const body4 = new TextEncoder().encode("World 4\n"); + +function sleep(ms: number): Promise { + return new Promise(res => setTimeout(res, ms)); +} + +async function delayedRespond(request: ServerRequest): Promise { + await sleep(3000); + await request.respond({ status: 200, body }); +} + +async function largeRespond(request: ServerRequest, c: string): Promise { + const b = new Uint8Array(1024 * 1024); + b.fill(c.charCodeAt(0)); + await request.respond({ status: 200, body: b }); +} + +async function main(): Promise { + let step = 1; + for await (const request of server) { + switch (step) { + case 1: + // Try to wait long enough. + // For pipelining, this should cause all the following response + // to block. + delayedRespond(request); + break; + case 2: + // HUGE body. + largeRespond(request, "a"); + break; + case 3: + // HUGE body. + largeRespond(request, "b"); + break; + default: + request.respond({ status: 200, body: body4 }); + break; + } + step++; + } +} + +main(); + +console.log("Racing server listening...\n"); diff --git a/http/racing_server_test.ts b/http/racing_server_test.ts new file mode 100644 index 0000000000..0c1a5c65f6 --- /dev/null +++ b/http/racing_server_test.ts @@ -0,0 +1,65 @@ +const { dial, run } = Deno; + +import { test } from "../testing/mod.ts"; +import { assert, assertEquals } from "../testing/asserts.ts"; +import { BufReader } from "../io/bufio.ts"; +import { TextProtoReader } from "../textproto/mod.ts"; + +let server; +async function startServer(): Promise { + server = run({ + args: ["deno", "-A", "http/racing_server.ts"], + stdout: "piped" + }); + // Once fileServer is ready it will write to its stdout. + const r = new TextProtoReader(new BufReader(server.stdout)); + const [s, err] = await r.readLine(); + assert(err == null); + assert(s.includes("Racing server listening...")); +} +function killServer(): void { + server.close(); + server.stdout.close(); +} + +let input = `GET / HTTP/1.1 + +GET / HTTP/1.1 + +GET / HTTP/1.1 + +GET / HTTP/1.1 + +`; +const HUGE_BODY_SIZE = 1024 * 1024; +let output = `HTTP/1.1 200 OK +content-length: 8 + +Hello 1 +HTTP/1.1 200 OK +content-length: ${HUGE_BODY_SIZE} + +${"a".repeat(HUGE_BODY_SIZE)}HTTP/1.1 200 OK +content-length: ${HUGE_BODY_SIZE} + +${"b".repeat(HUGE_BODY_SIZE)}HTTP/1.1 200 OK +content-length: 8 + +World 4 +`; + +test(async function serverPipelineRace(): Promise { + await startServer(); + + const conn = await dial("tcp", "127.0.0.1:4501"); + const r = new TextProtoReader(new BufReader(conn)); + await conn.write(new TextEncoder().encode(input)); + const outLines = output.split("\n"); + // length - 1 to disregard last empty line + for (let i = 0; i < outLines.length - 1; i++) { + const [s, err] = await r.readLine(); + assert(!err); + assertEquals(s, outLines[i]); + } + killServer(); +}); diff --git a/http/server.ts b/http/server.ts index 17295f7399..f1ef015ba4 100644 --- a/http/server.ts +++ b/http/server.ts @@ -13,6 +13,42 @@ interface Deferred { resolve: () => void; reject: () => void; } + +function deferred(isResolved = false): Deferred { + let resolve, reject; + const promise = new Promise((res, rej) => { + resolve = res; + reject = rej; + }); + if (isResolved) { + resolve(); + } + return { + promise, + resolve, + reject + }; +} + +interface HttpConn extends Conn { + // When read by a newly created request B, lastId is the id pointing to a previous + // request A, such that we must wait for responses to A to complete before + // writing B's response. + lastPipelineId: number; + pendingDeferredMap: Map; +} + +function createHttpConn(c: Conn): HttpConn { + const httpConn = Object.assign(c, { + lastPipelineId: 0, + pendingDeferredMap: new Map() + }); + + const resolvedDeferred = deferred(true); + httpConn.pendingDeferredMap.set(0, resolvedDeferred); + return httpConn; +} + function bufWriter(w: Writer): BufWriter { if (w instanceof BufWriter) { return w; @@ -115,11 +151,12 @@ async function readAllIterator( } export class ServerRequest { + pipelineId: number; url: string; method: string; proto: string; headers: Headers; - conn: Conn; + conn: HttpConn; r: BufReader; w: BufWriter; @@ -204,23 +241,26 @@ export class ServerRequest { } async respond(r: Response): Promise { - return writeResponse(this.w, r); + // Check and wait if the previous request is done responding. + const lastPipelineId = this.pipelineId - 1; + const lastPipelineDeferred = this.conn.pendingDeferredMap.get( + lastPipelineId + ); + assert(!!lastPipelineDeferred); + await lastPipelineDeferred.promise; + // If yes, delete old deferred and proceed with writing. + this.conn.pendingDeferredMap.delete(lastPipelineId); + // Write our response! + await writeResponse(this.w, r); + // Signal the next pending request that it can start writing. + const currPipelineDeferred = this.conn.pendingDeferredMap.get( + this.pipelineId + ); + assert(!!currPipelineDeferred); + currPipelineDeferred.resolve(); } } -function deferred(): Deferred { - let resolve, reject; - const promise = new Promise((res, rej) => { - resolve = res; - reject = rej; - }); - return { - promise, - resolve, - reject - }; -} - interface ServeEnv { reqQueue: ServerRequest[]; serveDeferred: Deferred; @@ -235,7 +275,7 @@ interface ServeEnv { * See https://v8.dev/blog/fast-async */ async function readRequest( - c: Conn, + c: HttpConn, bufr?: BufReader ): Promise<[ServerRequest, BufState]> { if (!bufr) { @@ -243,6 +283,13 @@ async function readRequest( } const bufw = new BufWriter(c); const req = new ServerRequest(); + + // Set and incr pipeline id; + req.pipelineId = ++c.lastPipelineId; + // Set a new pipeline deferred associated with this request + // for future requests to wait for. + c.pendingDeferredMap.set(req.pipelineId, deferred()); + req.conn = c; req.r = bufr!; req.w = bufw; @@ -277,7 +324,7 @@ function maybeHandleReq( env.serveDeferred.resolve(); // signal while loop to process it } -function serveConn(env: ServeEnv, conn: Conn, bufr?: BufReader): void { +function serveConn(env: ServeEnv, conn: HttpConn, bufr?: BufReader): void { readRequest(conn, bufr).then(maybeHandleReq.bind(null, env, conn)); } @@ -298,7 +345,8 @@ export async function* serve( listener.accept().then(handleConn); }; handleConn = (conn: Conn) => { - serveConn(env, conn); // don't block + const httpConn = createHttpConn(conn); + serveConn(env, httpConn); // don't block scheduleAccept(); // schedule next accept }; diff --git a/http/server_test.ts b/http/server_test.ts index 4f1bc1c40a..09a8df2bd1 100644 --- a/http/server_test.ts +++ b/http/server_test.ts @@ -19,6 +19,28 @@ interface ResponseTest { const enc = new TextEncoder(); const dec = new TextDecoder(); +interface Deferred { + promise: Promise<{}>; + resolve: () => void; + reject: () => void; +} + +function deferred(isResolved = false): Deferred { + let resolve, reject; + const promise = new Promise((res, rej) => { + resolve = res; + reject = rej; + }); + if (isResolved) { + resolve(); + } + return { + promise, + resolve, + reject + }; +} + const responseTests: ResponseTest[] = [ // Default response { @@ -44,7 +66,24 @@ test(async function responseWrite() { const buf = new Buffer(); const bufw = new BufWriter(buf); const request = new ServerRequest(); + request.pipelineId = 1; request.w = bufw; + request.conn = { + localAddr: "", + remoteAddr: "", + rid: -1, + closeRead: () => {}, + closeWrite: () => {}, + read: async () => { + return { eof: true, nread: 0 }; + }, + write: async () => { + return -1; + }, + close: () => {}, + lastPipelineId: 0, + pendingDeferredMap: new Map([[0, deferred(true)], [1, deferred()]]) + }; await request.respond(testCase.response); assertEquals(buf.toString(), testCase.raw); diff --git a/http/test.ts b/http/test.ts index 25b8af79f0..938ea4458c 100644 --- a/http/test.ts +++ b/http/test.ts @@ -1,3 +1,4 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. import "./server_test.ts"; import "./file_server_test.ts"; +import "./racing_server_test.ts";