mirror of
https://github.com/denoland/deno.git
synced 2025-01-12 09:03:42 -05:00
Unblock server on multiple HTTP requests (denoland/deno_std#3)
Original: 1bf555ab2f
This commit is contained in:
parent
a34bc9040d
commit
17c255581d
2 changed files with 82 additions and 33 deletions
112
http.ts
112
http.ts
|
@ -4,37 +4,82 @@ import { TextProtoReader } from "./textproto.ts";
|
|||
import { STATUS_TEXT } from "./http_status";
|
||||
import { assert } from "./util";
|
||||
|
||||
export async function* serve(addr: string) {
|
||||
const listener = listen("tcp", addr);
|
||||
while (true) {
|
||||
const c = await listener.accept();
|
||||
yield* serveConn(c);
|
||||
}
|
||||
listener.close();
|
||||
interface Deferred {
|
||||
promise: Promise<{}>;
|
||||
resolve: () => void;
|
||||
reject: () => void;
|
||||
}
|
||||
|
||||
export async function* serveConn(c: Conn) {
|
||||
let bufr = new BufReader(c);
|
||||
let bufw = new BufWriter(c);
|
||||
try {
|
||||
while (true) {
|
||||
const [req, err] = await readRequest(bufr);
|
||||
if (err == "EOF") {
|
||||
break;
|
||||
}
|
||||
if (err == "ShortWrite") {
|
||||
console.log("ShortWrite error");
|
||||
break;
|
||||
}
|
||||
if (err) {
|
||||
throw err;
|
||||
}
|
||||
req.w = bufw;
|
||||
yield req;
|
||||
}
|
||||
} finally {
|
||||
c.close();
|
||||
function deferred(): Deferred {
|
||||
let resolve, reject;
|
||||
const promise = new Promise((res, rej) => {
|
||||
resolve = res;
|
||||
reject = rej;
|
||||
});
|
||||
return {
|
||||
promise,
|
||||
resolve,
|
||||
reject
|
||||
};
|
||||
}
|
||||
|
||||
interface ServeEnv {
|
||||
reqQueue: ServerRequest[];
|
||||
serveDeferred: Deferred;
|
||||
}
|
||||
|
||||
// Continuously read more requests from conn until EOF
|
||||
// Mutually calling with maybeHandleReq
|
||||
// TODO: make them async function after this change is done
|
||||
// https://github.com/tc39/ecma262/pull/1250
|
||||
// See https://v8.dev/blog/fast-async
|
||||
export function serveConn(env: ServeEnv, conn: Conn) {
|
||||
readRequest(conn).then(maybeHandleReq.bind(null, env, conn));
|
||||
}
|
||||
function maybeHandleReq(env: ServeEnv, conn: Conn, maybeReq: any) {
|
||||
const [req, _err] = maybeReq;
|
||||
if (_err) {
|
||||
conn.close(); // assume EOF for now...
|
||||
return;
|
||||
}
|
||||
env.reqQueue.push(req); // push req to queue
|
||||
env.serveDeferred.resolve(); // signal while loop to process it
|
||||
// TODO: protection against client req flooding
|
||||
serveConn(env, conn); // try read more (reusing connection)
|
||||
}
|
||||
|
||||
export async function* serve(addr: string) {
|
||||
const listener = listen("tcp", addr);
|
||||
const env: ServeEnv = {
|
||||
reqQueue: [], // in case multiple promises are ready
|
||||
serveDeferred: deferred()
|
||||
};
|
||||
|
||||
// Routine that keeps calling accept
|
||||
const acceptRoutine = () => {
|
||||
const handleConn = (conn: Conn) => {
|
||||
serveConn(env, conn); // don't block
|
||||
scheduleAccept(); // schedule next accept
|
||||
};
|
||||
const scheduleAccept = () => {
|
||||
listener.accept().then(handleConn);
|
||||
};
|
||||
scheduleAccept();
|
||||
};
|
||||
|
||||
acceptRoutine();
|
||||
|
||||
// Loop hack to allow yield (yield won't work in callbacks)
|
||||
while (true) {
|
||||
await env.serveDeferred.promise;
|
||||
env.serveDeferred = deferred(); // use a new deferred
|
||||
let queueToProcess = env.reqQueue;
|
||||
env.reqQueue = [];
|
||||
for (const result of queueToProcess) {
|
||||
yield result;
|
||||
}
|
||||
}
|
||||
listener.close();
|
||||
}
|
||||
|
||||
interface Response {
|
||||
|
@ -75,7 +120,7 @@ class ServerRequest {
|
|||
setContentLength(r);
|
||||
|
||||
if (r.headers) {
|
||||
for (let [key, value] of r.headers) {
|
||||
for (const [key, value] of r.headers) {
|
||||
out += `${key}: ${value}\r\n`;
|
||||
}
|
||||
}
|
||||
|
@ -93,9 +138,12 @@ class ServerRequest {
|
|||
}
|
||||
}
|
||||
|
||||
async function readRequest(b: BufReader): Promise<[ServerRequest, BufState]> {
|
||||
const tp = new TextProtoReader(b);
|
||||
async function readRequest(c: Conn): Promise<[ServerRequest, BufState]> {
|
||||
const bufr = new BufReader(c);
|
||||
const bufw = new BufWriter(c);
|
||||
const req = new ServerRequest();
|
||||
req.w = bufw;
|
||||
const tp = new TextProtoReader(bufr);
|
||||
|
||||
let s: string;
|
||||
let err: BufState;
|
||||
|
@ -109,5 +157,7 @@ async function readRequest(b: BufReader): Promise<[ServerRequest, BufState]> {
|
|||
|
||||
[req.headers, err] = await tp.readMIMEHeader();
|
||||
|
||||
// TODO: handle body
|
||||
|
||||
return [req, err];
|
||||
}
|
||||
|
|
|
@ -5,8 +5,7 @@ const addr = "0.0.0.0:8000";
|
|||
const s = serve(addr);
|
||||
console.log(`listening on http://${addr}/`);
|
||||
|
||||
const body = (new TextEncoder()).encode("Hello World\n");
|
||||
|
||||
const body = new TextEncoder().encode("Hello World\n");
|
||||
|
||||
async function main() {
|
||||
for await (const req of s) {
|
||||
|
|
Loading…
Reference in a new issue