1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2024-11-27 16:10:57 -05:00
Original: 2153fd2646
This commit is contained in:
Eugen Cazacu 2019-03-09 17:46:53 +01:00 committed by Ryan Dahl
parent 831d74e364
commit 1870a8ec72
2 changed files with 176 additions and 179 deletions

View file

@ -1,4 +1,3 @@
/flags/ /flags/
/fs/ /fs/
/http/
/ws/ /ws/

View file

@ -13,101 +13,13 @@ interface Deferred {
resolve: () => void; resolve: () => void;
reject: () => void; reject: () => void;
} }
function bufWriter(w: Writer): BufWriter {
function deferred(): Deferred { if (w instanceof BufWriter) {
let resolve, reject; return w;
const promise = new Promise((res, rej) => { } else {
resolve = res; return new BufWriter(w);
reject = rej;
});
return {
promise,
resolve,
reject
};
}
// Shared mutable state between the background accept routine and the
// serve() generator loop.
interface ServeEnv {
reqQueue: ServerRequest[]; // requests read from connections but not yet yielded
serveDeferred: Deferred; // resolved to wake the serve() loop when reqQueue grows
}
/** Kick off reading the next request from `conn` (non-blocking).
 * The [request, error] pair from readRequest is handed to maybeHandleReq,
 * which queues the request or closes the connection on error.
 * bufr is empty on a fresh TCP connection.
 * Would be passed around and reused for later request on same conn
 * TODO: make them async function after this change is done
 * https://github.com/tc39/ecma262/pull/1250
 * See https://v8.dev/blog/fast-async
 */
function serveConn(env: ServeEnv, conn: Conn, bufr?: BufReader) {
readRequest(conn, bufr).then(maybeHandleReq.bind(null, env, conn));
}
// Receives the [request, error] pair produced by readRequest for `conn`.
// On error the connection is dropped; otherwise the request is queued and
// the serve() loop is woken up to yield it.
function maybeHandleReq(env: ServeEnv, conn: Conn, maybeReq: any) {
  const request = maybeReq[0];
  const readErr = maybeReq[1];
  if (readErr) {
    // Treat any read failure as end-of-stream for now and drop the conn.
    conn.close();
    return;
  }
  // Queue the parsed request, then signal the while loop to process it.
  env.reqQueue.push(request);
  env.serveDeferred.resolve();
}
/**
 * Listen on `addr` ("host:port" TCP) and yield each incoming request
 * as an async iterator of ServerRequest.
 * A background accept routine keeps accepting connections and pushing
 * parsed requests into env.reqQueue; the while loop below drains that
 * queue and yields (yield cannot appear inside the callbacks).
 * NOTE(review): `listener.close()` after the `while (true)` loop is
 * unreachable — if the consumer stops iterating, the listener stays open.
 */
export async function* serve(addr: string) {
const listener = listen("tcp", addr);
const env: ServeEnv = {
reqQueue: [], // in case multiple promises are ready
serveDeferred: deferred()
};
// Routine that keeps calling accept
const acceptRoutine = () => {
const handleConn = (conn: Conn) => {
serveConn(env, conn); // don't block
scheduleAccept(); // schedule next accept
};
const scheduleAccept = () => {
listener.accept().then(handleConn);
};
scheduleAccept();
};
acceptRoutine();
// Loop hack to allow yield (yield won't work in callbacks)
while (true) {
await env.serveDeferred.promise;
env.serveDeferred = deferred(); // use a new deferred
let queueToProcess = env.reqQueue;
env.reqQueue = [];
for (const result of queueToProcess) {
yield result;
// Continue read more from conn when user is done with the current req
// Moving this here makes it easier to manage
serveConn(env, result.conn, result.r);
}
}
listener.close();
}
export async function listenAndServe(
addr: string,
handler: (req: ServerRequest) => void
) {
const server = serve(addr);
for await (const request of server) {
await handler(request);
} }
} }
/** Shape of an HTTP response handed to writeResponse(). */
export interface Response {
status?: number; // defaults to 200 when omitted (see writeResponse)
headers?: Headers; // created on demand by setContentLength when missing
body?: Uint8Array | Reader; // fixed bytes, or a stream written with content-length/chunked
}
export function setContentLength(r: Response): void { export function setContentLength(r: Response): void {
if (!r.headers) { if (!r.headers) {
r.headers = new Headers(); r.headers = new Headers();
@ -124,6 +36,83 @@ export function setContentLength(r: Response): void {
} }
} }
} }
async function writeChunkedBody(w: Writer, r: Reader) {
const writer = bufWriter(w);
const encoder = new TextEncoder();
for await (const chunk of toAsyncIterator(r)) {
const start = encoder.encode(`${chunk.byteLength.toString(16)}\r\n`);
const end = encoder.encode("\r\n");
await writer.write(start);
await writer.write(chunk);
await writer.write(end);
}
const endChunk = encoder.encode("0\r\n\r\n");
await writer.write(endChunk);
}
/**
 * Serialize `r` as an HTTP/1.1 response onto `w` and flush.
 * Throws Error("bad status code") for a status not in STATUS_TEXT.
 * Fixes vs. original: `parseInt` now passes an explicit radix 10 and a
 * non-null assertion on the header value (it is guaranteed present by the
 * enclosing `has()` check); loose `==` asserts tightened to `===`.
 */
export async function writeResponse(w: Writer, r: Response): Promise<void> {
  const protoMajor = 1;
  const protoMinor = 1;
  const statusCode = r.status || 200;
  const statusText = STATUS_TEXT.get(statusCode);
  const writer = bufWriter(w);
  if (!statusText) {
    throw Error("bad status code");
  }
  let out = `HTTP/${protoMajor}.${protoMinor} ${statusCode} ${statusText}\r\n`;
  // Ensures r.headers exists and carries content-length for Uint8Array bodies.
  setContentLength(r);
  if (r.headers) {
    for (const [key, value] of r.headers) {
      out += `${key}: ${value}\r\n`;
    }
  }
  out += "\r\n";
  const header = new TextEncoder().encode(out);
  let n = await writer.write(header);
  assert(header.byteLength === n);
  if (r.body) {
    if (r.body instanceof Uint8Array) {
      n = await writer.write(r.body);
      assert(r.body.byteLength === n);
    } else if (r.headers && r.headers.has("content-length")) {
      // Known length: copy the reader and verify exactly that many bytes.
      const bodyLength = parseInt(r.headers.get("content-length")!, 10);
      const copied = await copy(writer, r.body);
      assert(copied === bodyLength);
    } else {
      // Unknown length: fall back to chunked transfer encoding.
      await writeChunkedBody(writer, r.body);
    }
  }
  await writer.flush();
}
/**
 * Drain `it` and concatenate every yielded chunk into one Uint8Array.
 * Bug fix: the original tested `chunks.length === 0` and returned
 * `chunks[0]` — i.e. `undefined` — for an empty iterator, despite the
 * declared `Promise<Uint8Array>`. The no-copy shortcut belongs to the
 * single-chunk case; an empty iterator now yields an empty Uint8Array.
 */
async function readAllIterator(
  it: AsyncIterableIterator<Uint8Array>
): Promise<Uint8Array> {
  const chunks: Uint8Array[] = [];
  let len = 0;
  for await (const chunk of it) {
    chunks.push(chunk);
    len += chunk.length;
  }
  if (chunks.length === 1) {
    // Exactly one chunk: return it directly, no copy needed.
    return chunks[0];
  }
  // Zero or many chunks: build one contiguous array (len is 0 when empty).
  const collected = new Uint8Array(len);
  let offset = 0;
  for (const chunk of chunks) {
    collected.set(chunk, offset);
    offset += chunk.length;
  }
  return collected;
}
export class ServerRequest { export class ServerRequest {
url: string; url: string;
@ -159,22 +148,22 @@ export class ServerRequest {
if (transferEncodings.includes("chunked")) { if (transferEncodings.includes("chunked")) {
// Based on https://tools.ietf.org/html/rfc2616#section-19.4.6 // Based on https://tools.ietf.org/html/rfc2616#section-19.4.6
const tp = new TextProtoReader(this.r); const tp = new TextProtoReader(this.r);
let [line, _] = await tp.readLine(); let [line] = await tp.readLine();
// TODO: handle chunk extension // TODO: handle chunk extension
let [chunkSizeString, optExt] = line.split(";"); let [chunkSizeString] = line.split(";");
let chunkSize = parseInt(chunkSizeString, 16); let chunkSize = parseInt(chunkSizeString, 16);
if (Number.isNaN(chunkSize) || chunkSize < 0) { if (Number.isNaN(chunkSize) || chunkSize < 0) {
throw new Error("Invalid chunk size"); throw new Error("Invalid chunk size");
} }
while (chunkSize > 0) { while (chunkSize > 0) {
let data = new Uint8Array(chunkSize); let data = new Uint8Array(chunkSize);
let [nread, err] = await this.r.readFull(data); let [nread] = await this.r.readFull(data);
if (nread !== chunkSize) { if (nread !== chunkSize) {
throw new Error("Chunk data does not match size"); throw new Error("Chunk data does not match size");
} }
yield data; yield data;
await this.r.readLine(); // Consume \r\n await this.r.readLine(); // Consume \r\n
[line, _] = await tp.readLine(); [line] = await tp.readLine();
chunkSize = parseInt(line, 16); chunkSize = parseInt(line, 16);
} }
const [entityHeaders, err] = await tp.readMIMEHeader(); const [entityHeaders, err] = await tp.readMIMEHeader();
@ -219,72 +208,32 @@ export class ServerRequest {
} }
} }
function bufWriter(w: Writer): BufWriter { function deferred(): Deferred {
if (w instanceof BufWriter) { let resolve, reject;
return w; const promise = new Promise((res, rej) => {
} else { resolve = res;
return new BufWriter(w); reject = rej;
} });
return {
promise,
resolve,
reject
};
} }
export async function writeResponse(w: Writer, r: Response): Promise<void> { interface ServeEnv {
const protoMajor = 1; reqQueue: ServerRequest[];
const protoMinor = 1; serveDeferred: Deferred;
const statusCode = r.status || 200;
const statusText = STATUS_TEXT.get(statusCode);
const writer = bufWriter(w);
if (!statusText) {
throw Error("bad status code");
}
let out = `HTTP/${protoMajor}.${protoMinor} ${statusCode} ${statusText}\r\n`;
setContentLength(r);
if (r.headers) {
for (const [key, value] of r.headers) {
out += `${key}: ${value}\r\n`;
}
}
out += "\r\n";
const header = new TextEncoder().encode(out);
let n = await writer.write(header);
assert(header.byteLength == n);
if (r.body) {
if (r.body instanceof Uint8Array) {
n = await writer.write(r.body);
assert(r.body.byteLength == n);
} else {
if (r.headers.has("content-length")) {
const bodyLength = parseInt(r.headers.get("content-length"));
const n = await copy(writer, r.body);
assert(n == bodyLength);
} else {
await writeChunkedBody(writer, r.body);
}
}
}
await writer.flush();
}
async function writeChunkedBody(w: Writer, r: Reader) {
const writer = bufWriter(w);
const encoder = new TextEncoder();
for await (const chunk of toAsyncIterator(r)) {
const start = encoder.encode(`${chunk.byteLength.toString(16)}\r\n`);
const end = encoder.encode("\r\n");
await writer.write(start);
await writer.write(chunk);
await writer.write(end);
}
const endChunk = encoder.encode("0\r\n\r\n");
await writer.write(endChunk);
} }
/** Continuously read more requests from conn until EOF
* Calls maybeHandleReq.
* bufr is empty on a fresh TCP connection.
* Would be passed around and reused for later request on same conn
* TODO: make them async function after this change is done
* https://github.com/tc39/ecma262/pull/1250
* See https://v8.dev/blog/fast-async
*/
async function readRequest( async function readRequest(
c: Conn, c: Conn,
bufr?: BufReader bufr?: BufReader
@ -314,24 +263,73 @@ async function readRequest(
return [req, err]; return [req, err];
} }
async function readAllIterator( function maybeHandleReq(env: ServeEnv, conn: Conn, maybeReq: any) {
it: AsyncIterableIterator<Uint8Array> const [req, _err] = maybeReq;
): Promise<Uint8Array> { if (_err) {
const chunks = []; conn.close(); // assume EOF for now...
let len = 0; return;
for await (const chunk of it) {
chunks.push(chunk);
len += chunk.length;
} }
if (chunks.length === 0) { env.reqQueue.push(req); // push req to queue
// No need for copy env.serveDeferred.resolve(); // signal while loop to process it
return chunks[0]; }
}
const collected = new Uint8Array(len); function serveConn(env: ServeEnv, conn: Conn, bufr?: BufReader) {
let offset = 0; readRequest(conn, bufr).then(maybeHandleReq.bind(null, env, conn));
for (let chunk of chunks) { }
collected.set(chunk, offset);
offset += chunk.length; export async function* serve(addr: string) {
} const listener = listen("tcp", addr);
return collected; const env: ServeEnv = {
reqQueue: [], // in case multiple promises are ready
serveDeferred: deferred()
};
// Routine that keeps calling accept
let handleConn = (_conn: Conn) => {};
let scheduleAccept = () => {};
const acceptRoutine = () => {
scheduleAccept = () => {
listener.accept().then(handleConn);
};
handleConn = (conn: Conn) => {
serveConn(env, conn); // don't block
scheduleAccept(); // schedule next accept
};
scheduleAccept();
};
acceptRoutine();
// Loop hack to allow yield (yield won't work in callbacks)
while (true) {
await env.serveDeferred.promise;
env.serveDeferred = deferred(); // use a new deferred
let queueToProcess = env.reqQueue;
env.reqQueue = [];
for (const result of queueToProcess) {
yield result;
// Continue read more from conn when user is done with the current req
// Moving this here makes it easier to manage
serveConn(env, result.conn, result.r);
}
}
listener.close();
}
export async function listenAndServe(
addr: string,
handler: (req: ServerRequest) => void
) {
const server = serve(addr);
for await (const request of server) {
await handler(request);
}
}
export interface Response {
status?: number;
headers?: Headers;
body?: Uint8Array | Reader;
} }