mirror of
https://github.com/denoland/deno.git
synced 2025-01-14 10:01:51 -05:00
Enforce HTTP/1.1 pipeline response order (#331)
This commit is contained in:
parent
2c119627dc
commit
144ef0e08d
5 changed files with 224 additions and 18 deletions
53
http/racing_server.ts
Normal file
53
http/racing_server.ts
Normal file
|
@ -0,0 +1,53 @@
|
|||
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
|
||||
import { serve, ServerRequest } from "./server.ts";
|
||||
|
||||
const addr = Deno.args[1] || "127.0.0.1:4501";
|
||||
const server = serve(addr);
|
||||
|
||||
const body = new TextEncoder().encode("Hello 1\n");
|
||||
const body4 = new TextEncoder().encode("World 4\n");
|
||||
|
||||
function sleep(ms: number): Promise<void> {
|
||||
return new Promise(res => setTimeout(res, ms));
|
||||
}
|
||||
|
||||
async function delayedRespond(request: ServerRequest): Promise<void> {
|
||||
await sleep(3000);
|
||||
await request.respond({ status: 200, body });
|
||||
}
|
||||
|
||||
async function largeRespond(request: ServerRequest, c: string): Promise<void> {
|
||||
const b = new Uint8Array(1024 * 1024);
|
||||
b.fill(c.charCodeAt(0));
|
||||
await request.respond({ status: 200, body: b });
|
||||
}
|
||||
|
||||
async function main(): Promise<void> {
|
||||
let step = 1;
|
||||
for await (const request of server) {
|
||||
switch (step) {
|
||||
case 1:
|
||||
// Try to wait long enough.
|
||||
// For pipelining, this should cause all the following response
|
||||
// to block.
|
||||
delayedRespond(request);
|
||||
break;
|
||||
case 2:
|
||||
// HUGE body.
|
||||
largeRespond(request, "a");
|
||||
break;
|
||||
case 3:
|
||||
// HUGE body.
|
||||
largeRespond(request, "b");
|
||||
break;
|
||||
default:
|
||||
request.respond({ status: 200, body: body4 });
|
||||
break;
|
||||
}
|
||||
step++;
|
||||
}
|
||||
}
|
||||
|
||||
main();
|
||||
|
||||
console.log("Racing server listening...\n");
|
65
http/racing_server_test.ts
Normal file
65
http/racing_server_test.ts
Normal file
|
@ -0,0 +1,65 @@
|
|||
const { dial, run } = Deno;
|
||||
|
||||
import { test } from "../testing/mod.ts";
|
||||
import { assert, assertEquals } from "../testing/asserts.ts";
|
||||
import { BufReader } from "../io/bufio.ts";
|
||||
import { TextProtoReader } from "../textproto/mod.ts";
|
||||
|
||||
let server;
|
||||
async function startServer(): Promise<void> {
|
||||
server = run({
|
||||
args: ["deno", "-A", "http/racing_server.ts"],
|
||||
stdout: "piped"
|
||||
});
|
||||
// Once fileServer is ready it will write to its stdout.
|
||||
const r = new TextProtoReader(new BufReader(server.stdout));
|
||||
const [s, err] = await r.readLine();
|
||||
assert(err == null);
|
||||
assert(s.includes("Racing server listening..."));
|
||||
}
|
||||
function killServer(): void {
|
||||
server.close();
|
||||
server.stdout.close();
|
||||
}
|
||||
|
||||
let input = `GET / HTTP/1.1
|
||||
|
||||
GET / HTTP/1.1
|
||||
|
||||
GET / HTTP/1.1
|
||||
|
||||
GET / HTTP/1.1
|
||||
|
||||
`;
|
||||
const HUGE_BODY_SIZE = 1024 * 1024;
|
||||
let output = `HTTP/1.1 200 OK
|
||||
content-length: 8
|
||||
|
||||
Hello 1
|
||||
HTTP/1.1 200 OK
|
||||
content-length: ${HUGE_BODY_SIZE}
|
||||
|
||||
${"a".repeat(HUGE_BODY_SIZE)}HTTP/1.1 200 OK
|
||||
content-length: ${HUGE_BODY_SIZE}
|
||||
|
||||
${"b".repeat(HUGE_BODY_SIZE)}HTTP/1.1 200 OK
|
||||
content-length: 8
|
||||
|
||||
World 4
|
||||
`;
|
||||
|
||||
test(async function serverPipelineRace(): Promise<void> {
|
||||
await startServer();
|
||||
|
||||
const conn = await dial("tcp", "127.0.0.1:4501");
|
||||
const r = new TextProtoReader(new BufReader(conn));
|
||||
await conn.write(new TextEncoder().encode(input));
|
||||
const outLines = output.split("\n");
|
||||
// length - 1 to disregard last empty line
|
||||
for (let i = 0; i < outLines.length - 1; i++) {
|
||||
const [s, err] = await r.readLine();
|
||||
assert(!err);
|
||||
assertEquals(s, outLines[i]);
|
||||
}
|
||||
killServer();
|
||||
});
|
|
@ -13,6 +13,42 @@ interface Deferred {
|
|||
resolve: () => void;
|
||||
reject: () => void;
|
||||
}
|
||||
|
||||
function deferred(isResolved = false): Deferred {
|
||||
let resolve, reject;
|
||||
const promise = new Promise((res, rej) => {
|
||||
resolve = res;
|
||||
reject = rej;
|
||||
});
|
||||
if (isResolved) {
|
||||
resolve();
|
||||
}
|
||||
return {
|
||||
promise,
|
||||
resolve,
|
||||
reject
|
||||
};
|
||||
}
|
||||
|
||||
interface HttpConn extends Conn {
|
||||
// When read by a newly created request B, lastId is the id pointing to a previous
|
||||
// request A, such that we must wait for responses to A to complete before
|
||||
// writing B's response.
|
||||
lastPipelineId: number;
|
||||
pendingDeferredMap: Map<number, Deferred>;
|
||||
}
|
||||
|
||||
function createHttpConn(c: Conn): HttpConn {
|
||||
const httpConn = Object.assign(c, {
|
||||
lastPipelineId: 0,
|
||||
pendingDeferredMap: new Map()
|
||||
});
|
||||
|
||||
const resolvedDeferred = deferred(true);
|
||||
httpConn.pendingDeferredMap.set(0, resolvedDeferred);
|
||||
return httpConn;
|
||||
}
|
||||
|
||||
function bufWriter(w: Writer): BufWriter {
|
||||
if (w instanceof BufWriter) {
|
||||
return w;
|
||||
|
@ -115,11 +151,12 @@ async function readAllIterator(
|
|||
}
|
||||
|
||||
export class ServerRequest {
|
||||
pipelineId: number;
|
||||
url: string;
|
||||
method: string;
|
||||
proto: string;
|
||||
headers: Headers;
|
||||
conn: Conn;
|
||||
conn: HttpConn;
|
||||
r: BufReader;
|
||||
w: BufWriter;
|
||||
|
||||
|
@ -204,23 +241,26 @@ export class ServerRequest {
|
|||
}
|
||||
|
||||
async respond(r: Response): Promise<void> {
|
||||
return writeResponse(this.w, r);
|
||||
// Check and wait if the previous request is done responding.
|
||||
const lastPipelineId = this.pipelineId - 1;
|
||||
const lastPipelineDeferred = this.conn.pendingDeferredMap.get(
|
||||
lastPipelineId
|
||||
);
|
||||
assert(!!lastPipelineDeferred);
|
||||
await lastPipelineDeferred.promise;
|
||||
// If yes, delete old deferred and proceed with writing.
|
||||
this.conn.pendingDeferredMap.delete(lastPipelineId);
|
||||
// Write our response!
|
||||
await writeResponse(this.w, r);
|
||||
// Signal the next pending request that it can start writing.
|
||||
const currPipelineDeferred = this.conn.pendingDeferredMap.get(
|
||||
this.pipelineId
|
||||
);
|
||||
assert(!!currPipelineDeferred);
|
||||
currPipelineDeferred.resolve();
|
||||
}
|
||||
}
|
||||
|
||||
function deferred(): Deferred {
|
||||
let resolve, reject;
|
||||
const promise = new Promise((res, rej) => {
|
||||
resolve = res;
|
||||
reject = rej;
|
||||
});
|
||||
return {
|
||||
promise,
|
||||
resolve,
|
||||
reject
|
||||
};
|
||||
}
|
||||
|
||||
interface ServeEnv {
|
||||
reqQueue: ServerRequest[];
|
||||
serveDeferred: Deferred;
|
||||
|
@ -235,7 +275,7 @@ interface ServeEnv {
|
|||
* See https://v8.dev/blog/fast-async
|
||||
*/
|
||||
async function readRequest(
|
||||
c: Conn,
|
||||
c: HttpConn,
|
||||
bufr?: BufReader
|
||||
): Promise<[ServerRequest, BufState]> {
|
||||
if (!bufr) {
|
||||
|
@ -243,6 +283,13 @@ async function readRequest(
|
|||
}
|
||||
const bufw = new BufWriter(c);
|
||||
const req = new ServerRequest();
|
||||
|
||||
// Set and incr pipeline id;
|
||||
req.pipelineId = ++c.lastPipelineId;
|
||||
// Set a new pipeline deferred associated with this request
|
||||
// for future requests to wait for.
|
||||
c.pendingDeferredMap.set(req.pipelineId, deferred());
|
||||
|
||||
req.conn = c;
|
||||
req.r = bufr!;
|
||||
req.w = bufw;
|
||||
|
@ -277,7 +324,7 @@ function maybeHandleReq(
|
|||
env.serveDeferred.resolve(); // signal while loop to process it
|
||||
}
|
||||
|
||||
function serveConn(env: ServeEnv, conn: Conn, bufr?: BufReader): void {
|
||||
function serveConn(env: ServeEnv, conn: HttpConn, bufr?: BufReader): void {
|
||||
readRequest(conn, bufr).then(maybeHandleReq.bind(null, env, conn));
|
||||
}
|
||||
|
||||
|
@ -298,7 +345,8 @@ export async function* serve(
|
|||
listener.accept().then(handleConn);
|
||||
};
|
||||
handleConn = (conn: Conn) => {
|
||||
serveConn(env, conn); // don't block
|
||||
const httpConn = createHttpConn(conn);
|
||||
serveConn(env, httpConn); // don't block
|
||||
scheduleAccept(); // schedule next accept
|
||||
};
|
||||
|
||||
|
|
|
@ -19,6 +19,28 @@ interface ResponseTest {
|
|||
const enc = new TextEncoder();
|
||||
const dec = new TextDecoder();
|
||||
|
||||
interface Deferred {
|
||||
promise: Promise<{}>;
|
||||
resolve: () => void;
|
||||
reject: () => void;
|
||||
}
|
||||
|
||||
function deferred(isResolved = false): Deferred {
|
||||
let resolve, reject;
|
||||
const promise = new Promise((res, rej) => {
|
||||
resolve = res;
|
||||
reject = rej;
|
||||
});
|
||||
if (isResolved) {
|
||||
resolve();
|
||||
}
|
||||
return {
|
||||
promise,
|
||||
resolve,
|
||||
reject
|
||||
};
|
||||
}
|
||||
|
||||
const responseTests: ResponseTest[] = [
|
||||
// Default response
|
||||
{
|
||||
|
@ -44,7 +66,24 @@ test(async function responseWrite() {
|
|||
const buf = new Buffer();
|
||||
const bufw = new BufWriter(buf);
|
||||
const request = new ServerRequest();
|
||||
request.pipelineId = 1;
|
||||
request.w = bufw;
|
||||
request.conn = {
|
||||
localAddr: "",
|
||||
remoteAddr: "",
|
||||
rid: -1,
|
||||
closeRead: () => {},
|
||||
closeWrite: () => {},
|
||||
read: async () => {
|
||||
return { eof: true, nread: 0 };
|
||||
},
|
||||
write: async () => {
|
||||
return -1;
|
||||
},
|
||||
close: () => {},
|
||||
lastPipelineId: 0,
|
||||
pendingDeferredMap: new Map([[0, deferred(true)], [1, deferred()]])
|
||||
};
|
||||
|
||||
await request.respond(testCase.response);
|
||||
assertEquals(buf.toString(), testCase.raw);
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
|
||||
import "./server_test.ts";
|
||||
import "./file_server_test.ts";
|
||||
import "./racing_server_test.ts";
|
||||
|
|
Loading…
Reference in a new issue