diff --git a/net/http.ts b/net/http.ts index bd45aea0d1..2b0e5477a3 100644 --- a/net/http.ts +++ b/net/http.ts @@ -28,13 +28,16 @@ interface ServeEnv { serveDeferred: Deferred; } -// Continuously read more requests from conn until EOF -// Mutually calling with maybeHandleReq -// TODO: make them async function after this change is done -// https://github.com/tc39/ecma262/pull/1250 -// See https://v8.dev/blog/fast-async -export function serveConn(env: ServeEnv, conn: Conn) { - readRequest(conn).then(maybeHandleReq.bind(null, env, conn)); +/** Continuously read more requests from conn until EOF + * Calls maybeHandleReq. + * bufr is empty on a fresh TCP connection. + * Would be passed around and reused for later request on same conn + * TODO: make them async function after this change is done + * https://github.com/tc39/ecma262/pull/1250 + * See https://v8.dev/blog/fast-async + */ +function serveConn(env: ServeEnv, conn: Conn, bufr?: BufReader) { + readRequest(conn, bufr).then(maybeHandleReq.bind(null, env, conn)); } function maybeHandleReq(env: ServeEnv, conn: Conn, maybeReq: any) { const [req, _err] = maybeReq; @@ -44,8 +47,6 @@ function maybeHandleReq(env: ServeEnv, conn: Conn, maybeReq: any) { } env.reqQueue.push(req); // push req to queue env.serveDeferred.resolve(); // signal while loop to process it - // TODO: protection against client req flooding - serveConn(env, conn); // try read more (reusing connection) } export async function* serve(addr: string) { @@ -77,6 +78,9 @@ export async function* serve(addr: string) { env.reqQueue = []; for (const result of queueToProcess) { yield result; + // Continue read more from conn when user is done with the current req + // Moving this here makes it easier to manage + serveConn(env, result.conn, result.r); } } listener.close(); @@ -121,8 +125,90 @@ export class ServerRequest { method: string; proto: string; headers: Headers; + conn: Conn; + r: BufReader; w: BufWriter; + public async *bodyStream() { + if (this.headers.has("content-length")) { + const len = +this.headers.get("content-length"); + if (Number.isNaN(len)) { + return new Uint8Array(0); + } + let buf = new Uint8Array(1024); + let rr = await this.r.read(buf); + let nread = rr.nread; + while (!rr.eof && nread < len) { + yield buf.subarray(0, rr.nread); + buf = new Uint8Array(1024); + rr = await this.r.read(buf); + nread += rr.nread; + } + yield buf.subarray(0, rr.nread); + } else { + if (this.headers.has("transfer-encoding")) { + const transferEncodings = this.headers + .get("transfer-encoding") + .split(",") + .map(e => e.trim().toLowerCase()); + if (transferEncodings.includes("chunked")) { + // Based on https://tools.ietf.org/html/rfc2616#section-19.4.6 + const tp = new TextProtoReader(this.r); + let [line, _] = await tp.readLine(); + // TODO: handle chunk extension + let [chunkSizeString, optExt] = line.split(";"); + let chunkSize = parseInt(chunkSizeString, 16); + if (Number.isNaN(chunkSize) || chunkSize < 0) { + throw new Error("Invalid chunk size"); + } + while (chunkSize > 0) { + let data = new Uint8Array(chunkSize); + let [nread, err] = await this.r.readFull(data); + if (nread !== chunkSize) { + throw new Error("Chunk data does not match size"); + } + yield data; + await this.r.readLine(); // Consume \r\n + [line, _] = await tp.readLine(); + chunkSize = parseInt(line, 16); + } + const [entityHeaders, err] = await tp.readMIMEHeader(); + if (!err) { + for (let [k, v] of entityHeaders) { + this.headers.set(k, v); + } + } + /* Pseudo code from https://tools.ietf.org/html/rfc2616#section-19.4.6 + length := 0 + read chunk-size, chunk-extension (if any) and CRLF + while (chunk-size > 0) { + read chunk-data and CRLF + append chunk-data to entity-body + length := length + chunk-size + read chunk-size and CRLF + } + read entity-header + while (entity-header not empty) { + append entity-header to existing header fields + read entity-header + } + Content-Length := length + Remove "chunked" from Transfer-Encoding + */ + return; // Must return here to avoid fall through + } + // TODO: handle other transfer-encoding types + } + // Otherwise... + yield new Uint8Array(0); + } + } + + // Read the body of the request into a single Uint8Array + public async body(): Promise { + return readAllIterator(this.bodyStream()); + } + private async _streamBody(body: Reader, bodyLength: number) { const n = await copy(this.w, body); assert(n == bodyLength); @@ -187,12 +273,19 @@ export class ServerRequest { } } -async function readRequest(c: Conn): Promise<[ServerRequest, BufState]> { - const bufr = new BufReader(c); +async function readRequest( + c: Conn, + bufr?: BufReader +): Promise<[ServerRequest, BufState]> { + if (!bufr) { + bufr = new BufReader(c); + } const bufw = new BufWriter(c); const req = new ServerRequest(); + req.conn = c; + req.r = bufr!; req.w = bufw; - const tp = new TextProtoReader(bufr); + const tp = new TextProtoReader(bufr!); let s: string; let err: BufState; @@ -206,7 +299,27 @@ async function readRequest(c: Conn): Promise<[ServerRequest, BufState]> { [req.headers, err] = await tp.readMIMEHeader(); - // TODO: handle body - return [req, err]; } + +async function readAllIterator( + it: AsyncIterableIterator +): Promise { + const chunks = []; + let len = 0; + for await (const chunk of it) { + chunks.push(chunk); + len += chunk.length; + } + if (chunks.length === 0) { + // No need for copy + return chunks[0]; + } + const collected = new Uint8Array(len); + let offset = 0; + for (let chunk of chunks) { + collected.set(chunk, offset); + offset += chunk.length; + } + return collected; +} diff --git a/net/http_test.ts b/net/http_test.ts index cdb7f8303e..97d07a5b4e 100644 --- a/net/http_test.ts +++ b/net/http_test.ts @@ -18,13 +18,16 @@ import { Response } from "./http"; import { Buffer } from "deno"; -import { BufWriter } from "./bufio"; +import { BufWriter, BufReader } from "./bufio"; interface ResponseTest { response: Response; raw: string; } +const enc = new TextEncoder(); +const dec = new TextDecoder(); + const responseTests: ResponseTest[] = [ // Default response { @@ -56,3 +59,170 @@ test(async function responseWrite() { assertEqual(buf.toString(), testCase.raw); } }); + +test(async function requestBodyWithContentLength() { + { + const req = new ServerRequest(); + req.headers = new Headers(); + req.headers.set("content-length", "5"); + const buf = new Buffer(enc.encode("Hello")); + req.r = new BufReader(buf); + const body = dec.decode(await req.body()); + assertEqual(body, "Hello"); + } + + // Larger than internal buf + { + const longText = "1234\n".repeat(1000); + const req = new ServerRequest(); + req.headers = new Headers(); + req.headers.set("Content-Length", "5000"); + const buf = new Buffer(enc.encode(longText)); + req.r = new BufReader(buf); + const body = dec.decode(await req.body()); + assertEqual(body, longText); + } +}); + +test(async function requestBodyWithTransferEncoding() { + { + const shortText = "Hello"; + const req = new ServerRequest(); + req.headers = new Headers(); + req.headers.set("transfer-encoding", "chunked"); + let chunksData = ""; + let chunkOffset = 0; + const maxChunkSize = 70; + while (chunkOffset < shortText.length) { + const chunkSize = Math.min(maxChunkSize, shortText.length - chunkOffset); + chunksData += `${chunkSize.toString(16)}\r\n${shortText.substr( + chunkOffset, + chunkSize + )}\r\n`; + chunkOffset += chunkSize; + } + chunksData += "0\r\n\r\n"; + const buf = new Buffer(enc.encode(chunksData)); + req.r = new BufReader(buf); + const body = dec.decode(await req.body()); + assertEqual(body, shortText); + } + + // Larger than internal buf + { + const longText = "1234\n".repeat(1000); + const req = new ServerRequest(); + req.headers = new Headers(); + req.headers.set("transfer-encoding", "chunked"); + let chunksData = ""; + let chunkOffset = 0; + const maxChunkSize = 70; + while (chunkOffset < longText.length) { + const chunkSize = Math.min(maxChunkSize, longText.length - chunkOffset); + chunksData += `${chunkSize.toString(16)}\r\n${longText.substr( + chunkOffset, + chunkSize + )}\r\n`; + chunkOffset += chunkSize; + } + chunksData += "0\r\n\r\n"; + const buf = new Buffer(enc.encode(chunksData)); + req.r = new BufReader(buf); + const body = dec.decode(await req.body()); + assertEqual(body, longText); + } +}); + +test(async function requestBodyStreamWithContentLength() { + { + const shortText = "Hello"; + const req = new ServerRequest(); + req.headers = new Headers(); + req.headers.set("content-length", "" + shortText.length); + const buf = new Buffer(enc.encode(shortText)); + req.r = new BufReader(buf); + const it = await req.bodyStream(); + let offset = 0; + for await (const chunk of it) { + const s = dec.decode(chunk); + assertEqual(shortText.substr(offset, s.length), s); + offset += s.length; + } + } + + // Larger than internal buf + { + const longText = "1234\n".repeat(1000); + const req = new ServerRequest(); + req.headers = new Headers(); + req.headers.set("Content-Length", "5000"); + const buf = new Buffer(enc.encode(longText)); + req.r = new BufReader(buf); + const it = await req.bodyStream(); + let offset = 0; + for await (const chunk of it) { + const s = dec.decode(chunk); + assertEqual(longText.substr(offset, s.length), s); + offset += s.length; + } + } +}); + +test(async function requestBodyStreamWithTransferEncoding() { + { + const shortText = "Hello"; + const req = new ServerRequest(); + req.headers = new Headers(); + req.headers.set("transfer-encoding", "chunked"); + let chunksData = ""; + let chunkOffset = 0; + const maxChunkSize = 70; + while (chunkOffset < shortText.length) { + const chunkSize = Math.min(maxChunkSize, shortText.length - chunkOffset); + chunksData += `${chunkSize.toString(16)}\r\n${shortText.substr( + chunkOffset, + chunkSize + )}\r\n`; + chunkOffset += chunkSize; + } + chunksData += "0\r\n\r\n"; + const buf = new Buffer(enc.encode(chunksData)); + req.r = new BufReader(buf); + const it = await req.bodyStream(); + let offset = 0; + for await (const chunk of it) { + const s = dec.decode(chunk); + assertEqual(shortText.substr(offset, s.length), s); + offset += s.length; + } + } + + // Larger than internal buf + { + const longText = "1234\n".repeat(1000); + const req = new ServerRequest(); + req.headers = new Headers(); + req.headers.set("transfer-encoding", "chunked"); + let chunksData = ""; + let chunkOffset = 0; + const maxChunkSize = 70; + while (chunkOffset < longText.length) { + const chunkSize = Math.min(maxChunkSize, longText.length - chunkOffset); + chunksData += `${chunkSize.toString(16)}\r\n${longText.substr( + chunkOffset, + chunkSize + )}\r\n`; + chunkOffset += chunkSize; + } + chunksData += "0\r\n\r\n"; + const buf = new Buffer(enc.encode(chunksData)); + req.r = new BufReader(buf); + const it = await req.bodyStream(); + let offset = 0; + for await (const chunk of it) { + const s = dec.decode(chunk); + assertEqual(longText.substr(offset, s.length), s); + offset += s.length; + } + } +});