mirror of
https://github.com/denoland/deno.git
synced 2024-11-30 16:40:57 -05:00
http: Request body & Streaming (#23)
This commit is contained in:
parent
2ae63d35d0
commit
e0e677bb02
2 changed files with 298 additions and 15 deletions
141
net/http.ts
141
net/http.ts
|
@ -28,13 +28,16 @@ interface ServeEnv {
|
|||
serveDeferred: Deferred;
|
||||
}
|
||||
|
||||
// Continuously read more requests from conn until EOF
|
||||
// Mutually calling with maybeHandleReq
|
||||
// TODO: make them async function after this change is done
|
||||
// https://github.com/tc39/ecma262/pull/1250
|
||||
// See https://v8.dev/blog/fast-async
|
||||
export function serveConn(env: ServeEnv, conn: Conn) {
|
||||
readRequest(conn).then(maybeHandleReq.bind(null, env, conn));
|
||||
/** Continuously read more requests from conn until EOF
|
||||
* Calls maybeHandleReq.
|
||||
* bufr is empty on a fresh TCP connection.
|
||||
* Would be passed around and reused for later request on same conn
|
||||
* TODO: make them async function after this change is done
|
||||
* https://github.com/tc39/ecma262/pull/1250
|
||||
* See https://v8.dev/blog/fast-async
|
||||
*/
|
||||
function serveConn(env: ServeEnv, conn: Conn, bufr?: BufReader) {
|
||||
readRequest(conn, bufr).then(maybeHandleReq.bind(null, env, conn));
|
||||
}
|
||||
function maybeHandleReq(env: ServeEnv, conn: Conn, maybeReq: any) {
|
||||
const [req, _err] = maybeReq;
|
||||
|
@ -44,8 +47,6 @@ function maybeHandleReq(env: ServeEnv, conn: Conn, maybeReq: any) {
|
|||
}
|
||||
env.reqQueue.push(req); // push req to queue
|
||||
env.serveDeferred.resolve(); // signal while loop to process it
|
||||
// TODO: protection against client req flooding
|
||||
serveConn(env, conn); // try read more (reusing connection)
|
||||
}
|
||||
|
||||
export async function* serve(addr: string) {
|
||||
|
@ -77,6 +78,9 @@ export async function* serve(addr: string) {
|
|||
env.reqQueue = [];
|
||||
for (const result of queueToProcess) {
|
||||
yield result;
|
||||
// Continue read more from conn when user is done with the current req
|
||||
// Moving this here makes it easier to manage
|
||||
serveConn(env, result.conn, result.r);
|
||||
}
|
||||
}
|
||||
listener.close();
|
||||
|
@ -121,8 +125,90 @@ export class ServerRequest {
|
|||
method: string;
|
||||
proto: string;
|
||||
headers: Headers;
|
||||
conn: Conn;
|
||||
r: BufReader;
|
||||
w: BufWriter;
|
||||
|
||||
public async *bodyStream() {
|
||||
if (this.headers.has("content-length")) {
|
||||
const len = +this.headers.get("content-length");
|
||||
if (Number.isNaN(len)) {
|
||||
return new Uint8Array(0);
|
||||
}
|
||||
let buf = new Uint8Array(1024);
|
||||
let rr = await this.r.read(buf);
|
||||
let nread = rr.nread;
|
||||
while (!rr.eof && nread < len) {
|
||||
yield buf.subarray(0, rr.nread);
|
||||
buf = new Uint8Array(1024);
|
||||
rr = await this.r.read(buf);
|
||||
nread += rr.nread;
|
||||
}
|
||||
yield buf.subarray(0, rr.nread);
|
||||
} else {
|
||||
if (this.headers.has("transfer-encoding")) {
|
||||
const transferEncodings = this.headers
|
||||
.get("transfer-encoding")
|
||||
.split(",")
|
||||
.map(e => e.trim().toLowerCase());
|
||||
if (transferEncodings.includes("chunked")) {
|
||||
// Based on https://tools.ietf.org/html/rfc2616#section-19.4.6
|
||||
const tp = new TextProtoReader(this.r);
|
||||
let [line, _] = await tp.readLine();
|
||||
// TODO: handle chunk extension
|
||||
let [chunkSizeString, optExt] = line.split(";");
|
||||
let chunkSize = parseInt(chunkSizeString, 16);
|
||||
if (Number.isNaN(chunkSize) || chunkSize < 0) {
|
||||
throw new Error("Invalid chunk size");
|
||||
}
|
||||
while (chunkSize > 0) {
|
||||
let data = new Uint8Array(chunkSize);
|
||||
let [nread, err] = await this.r.readFull(data);
|
||||
if (nread !== chunkSize) {
|
||||
throw new Error("Chunk data does not match size");
|
||||
}
|
||||
yield data;
|
||||
await this.r.readLine(); // Consume \r\n
|
||||
[line, _] = await tp.readLine();
|
||||
chunkSize = parseInt(line, 16);
|
||||
}
|
||||
const [entityHeaders, err] = await tp.readMIMEHeader();
|
||||
if (!err) {
|
||||
for (let [k, v] of entityHeaders) {
|
||||
this.headers.set(k, v);
|
||||
}
|
||||
}
|
||||
/* Pseudo code from https://tools.ietf.org/html/rfc2616#section-19.4.6
|
||||
length := 0
|
||||
read chunk-size, chunk-extension (if any) and CRLF
|
||||
while (chunk-size > 0) {
|
||||
read chunk-data and CRLF
|
||||
append chunk-data to entity-body
|
||||
length := length + chunk-size
|
||||
read chunk-size and CRLF
|
||||
}
|
||||
read entity-header
|
||||
while (entity-header not empty) {
|
||||
append entity-header to existing header fields
|
||||
read entity-header
|
||||
}
|
||||
Content-Length := length
|
||||
Remove "chunked" from Transfer-Encoding
|
||||
*/
|
||||
return; // Must return here to avoid fall through
|
||||
}
|
||||
// TODO: handle other transfer-encoding types
|
||||
}
|
||||
// Otherwise...
|
||||
yield new Uint8Array(0);
|
||||
}
|
||||
}
|
||||
|
||||
// Read the body of the request into a single Uint8Array
|
||||
public async body(): Promise<Uint8Array> {
|
||||
return readAllIterator(this.bodyStream());
|
||||
}
|
||||
|
||||
private async _streamBody(body: Reader, bodyLength: number) {
|
||||
const n = await copy(this.w, body);
|
||||
assert(n == bodyLength);
|
||||
|
@ -187,12 +273,19 @@ export class ServerRequest {
|
|||
}
|
||||
}
|
||||
|
||||
async function readRequest(c: Conn): Promise<[ServerRequest, BufState]> {
|
||||
const bufr = new BufReader(c);
|
||||
async function readRequest(
|
||||
c: Conn,
|
||||
bufr?: BufReader
|
||||
): Promise<[ServerRequest, BufState]> {
|
||||
if (!bufr) {
|
||||
bufr = new BufReader(c);
|
||||
}
|
||||
const bufw = new BufWriter(c);
|
||||
const req = new ServerRequest();
|
||||
req.conn = c;
|
||||
req.r = bufr!;
|
||||
req.w = bufw;
|
||||
const tp = new TextProtoReader(bufr);
|
||||
const tp = new TextProtoReader(bufr!);
|
||||
|
||||
let s: string;
|
||||
let err: BufState;
|
||||
|
@ -206,7 +299,27 @@ async function readRequest(c: Conn): Promise<[ServerRequest, BufState]> {
|
|||
|
||||
[req.headers, err] = await tp.readMIMEHeader();
|
||||
|
||||
// TODO: handle body
|
||||
|
||||
return [req, err];
|
||||
}
|
||||
|
||||
async function readAllIterator(
|
||||
it: AsyncIterableIterator<Uint8Array>
|
||||
): Promise<Uint8Array> {
|
||||
const chunks = [];
|
||||
let len = 0;
|
||||
for await (const chunk of it) {
|
||||
chunks.push(chunk);
|
||||
len += chunk.length;
|
||||
}
|
||||
if (chunks.length === 0) {
|
||||
// No need for copy
|
||||
return chunks[0];
|
||||
}
|
||||
const collected = new Uint8Array(len);
|
||||
let offset = 0;
|
||||
for (let chunk of chunks) {
|
||||
collected.set(chunk, offset);
|
||||
offset += chunk.length;
|
||||
}
|
||||
return collected;
|
||||
}
|
||||
|
|
172
net/http_test.ts
172
net/http_test.ts
|
@ -18,13 +18,16 @@ import {
|
|||
Response
|
||||
} from "./http";
|
||||
import { Buffer } from "deno";
|
||||
import { BufWriter } from "./bufio";
|
||||
import { BufWriter, BufReader } from "./bufio";
|
||||
|
||||
interface ResponseTest {
|
||||
response: Response;
|
||||
raw: string;
|
||||
}
|
||||
|
||||
const enc = new TextEncoder();
|
||||
const dec = new TextDecoder();
|
||||
|
||||
const responseTests: ResponseTest[] = [
|
||||
// Default response
|
||||
{
|
||||
|
@ -56,3 +59,170 @@ test(async function responseWrite() {
|
|||
assertEqual(buf.toString(), testCase.raw);
|
||||
}
|
||||
});
|
||||
|
||||
test(async function requestBodyWithContentLength() {
|
||||
{
|
||||
const req = new ServerRequest();
|
||||
req.headers = new Headers();
|
||||
req.headers.set("content-length", "5");
|
||||
const buf = new Buffer(enc.encode("Hello"));
|
||||
req.r = new BufReader(buf);
|
||||
const body = dec.decode(await req.body());
|
||||
assertEqual(body, "Hello");
|
||||
}
|
||||
|
||||
// Larger than internal buf
|
||||
{
|
||||
const longText = "1234\n".repeat(1000);
|
||||
const req = new ServerRequest();
|
||||
req.headers = new Headers();
|
||||
req.headers.set("Content-Length", "5000");
|
||||
const buf = new Buffer(enc.encode(longText));
|
||||
req.r = new BufReader(buf);
|
||||
const body = dec.decode(await req.body());
|
||||
assertEqual(body, longText);
|
||||
}
|
||||
});
|
||||
|
||||
test(async function requestBodyWithTransferEncoding() {
|
||||
{
|
||||
const shortText = "Hello";
|
||||
const req = new ServerRequest();
|
||||
req.headers = new Headers();
|
||||
req.headers.set("transfer-encoding", "chunked");
|
||||
let chunksData = "";
|
||||
let chunkOffset = 0;
|
||||
const maxChunkSize = 70;
|
||||
while (chunkOffset < shortText.length) {
|
||||
const chunkSize = Math.min(maxChunkSize, shortText.length - chunkOffset);
|
||||
chunksData += `${chunkSize.toString(16)}\r\n${shortText.substr(
|
||||
chunkOffset,
|
||||
chunkSize
|
||||
)}\r\n`;
|
||||
chunkOffset += chunkSize;
|
||||
}
|
||||
chunksData += "0\r\n\r\n";
|
||||
const buf = new Buffer(enc.encode(chunksData));
|
||||
req.r = new BufReader(buf);
|
||||
const body = dec.decode(await req.body());
|
||||
assertEqual(body, shortText);
|
||||
}
|
||||
|
||||
// Larger than internal buf
|
||||
{
|
||||
const longText = "1234\n".repeat(1000);
|
||||
const req = new ServerRequest();
|
||||
req.headers = new Headers();
|
||||
req.headers.set("transfer-encoding", "chunked");
|
||||
let chunksData = "";
|
||||
let chunkOffset = 0;
|
||||
const maxChunkSize = 70;
|
||||
while (chunkOffset < longText.length) {
|
||||
const chunkSize = Math.min(maxChunkSize, longText.length - chunkOffset);
|
||||
chunksData += `${chunkSize.toString(16)}\r\n${longText.substr(
|
||||
chunkOffset,
|
||||
chunkSize
|
||||
)}\r\n`;
|
||||
chunkOffset += chunkSize;
|
||||
}
|
||||
chunksData += "0\r\n\r\n";
|
||||
const buf = new Buffer(enc.encode(chunksData));
|
||||
req.r = new BufReader(buf);
|
||||
const body = dec.decode(await req.body());
|
||||
assertEqual(body, longText);
|
||||
}
|
||||
});
|
||||
|
||||
test(async function requestBodyStreamWithContentLength() {
|
||||
{
|
||||
const shortText = "Hello";
|
||||
const req = new ServerRequest();
|
||||
req.headers = new Headers();
|
||||
req.headers.set("content-length", "" + shortText.length);
|
||||
const buf = new Buffer(enc.encode(shortText));
|
||||
req.r = new BufReader(buf);
|
||||
const it = await req.bodyStream();
|
||||
let offset = 0;
|
||||
for await (const chunk of it) {
|
||||
const s = dec.decode(chunk);
|
||||
assertEqual(shortText.substr(offset, s.length), s);
|
||||
offset += s.length;
|
||||
}
|
||||
}
|
||||
|
||||
// Larger than internal buf
|
||||
{
|
||||
const longText = "1234\n".repeat(1000);
|
||||
const req = new ServerRequest();
|
||||
req.headers = new Headers();
|
||||
req.headers.set("Content-Length", "5000");
|
||||
const buf = new Buffer(enc.encode(longText));
|
||||
req.r = new BufReader(buf);
|
||||
const it = await req.bodyStream();
|
||||
let offset = 0;
|
||||
for await (const chunk of it) {
|
||||
const s = dec.decode(chunk);
|
||||
assertEqual(longText.substr(offset, s.length), s);
|
||||
offset += s.length;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
test(async function requestBodyStreamWithTransferEncoding() {
|
||||
{
|
||||
const shortText = "Hello";
|
||||
const req = new ServerRequest();
|
||||
req.headers = new Headers();
|
||||
req.headers.set("transfer-encoding", "chunked");
|
||||
let chunksData = "";
|
||||
let chunkOffset = 0;
|
||||
const maxChunkSize = 70;
|
||||
while (chunkOffset < shortText.length) {
|
||||
const chunkSize = Math.min(maxChunkSize, shortText.length - chunkOffset);
|
||||
chunksData += `${chunkSize.toString(16)}\r\n${shortText.substr(
|
||||
chunkOffset,
|
||||
chunkSize
|
||||
)}\r\n`;
|
||||
chunkOffset += chunkSize;
|
||||
}
|
||||
chunksData += "0\r\n\r\n";
|
||||
const buf = new Buffer(enc.encode(chunksData));
|
||||
req.r = new BufReader(buf);
|
||||
const it = await req.bodyStream();
|
||||
let offset = 0;
|
||||
for await (const chunk of it) {
|
||||
const s = dec.decode(chunk);
|
||||
assertEqual(shortText.substr(offset, s.length), s);
|
||||
offset += s.length;
|
||||
}
|
||||
}
|
||||
|
||||
// Larger than internal buf
|
||||
{
|
||||
const longText = "1234\n".repeat(1000);
|
||||
const req = new ServerRequest();
|
||||
req.headers = new Headers();
|
||||
req.headers.set("transfer-encoding", "chunked");
|
||||
let chunksData = "";
|
||||
let chunkOffset = 0;
|
||||
const maxChunkSize = 70;
|
||||
while (chunkOffset < longText.length) {
|
||||
const chunkSize = Math.min(maxChunkSize, longText.length - chunkOffset);
|
||||
chunksData += `${chunkSize.toString(16)}\r\n${longText.substr(
|
||||
chunkOffset,
|
||||
chunkSize
|
||||
)}\r\n`;
|
||||
chunkOffset += chunkSize;
|
||||
}
|
||||
chunksData += "0\r\n\r\n";
|
||||
const buf = new Buffer(enc.encode(chunksData));
|
||||
req.r = new BufReader(buf);
|
||||
const it = await req.bodyStream();
|
||||
let offset = 0;
|
||||
for await (const chunk of it) {
|
||||
const s = dec.decode(chunk);
|
||||
assertEqual(longText.substr(offset, s.length), s);
|
||||
offset += s.length;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
|
Loading…
Reference in a new issue