diff --git a/cli/bench/http/deno_http_flash_post_bin.js b/cli/bench/http/deno_http_flash_post_bin.js new file mode 100644 index 0000000000..cea530e603 --- /dev/null +++ b/cli/bench/http/deno_http_flash_post_bin.js @@ -0,0 +1,16 @@ +// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license. + +const addr = Deno.args[0] || "127.0.0.1:4500"; +const [hostname, port] = addr.split(":"); +const { serve } = Deno; + +async function handler(request) { + try { + const buffer = await request.arrayBuffer(); + return new Response(buffer.byteLength); + } catch (e) { + console.log(e); + } +} + +serve(handler, { hostname, port }); diff --git a/cli/bench/http/deno_http_flash_post_bin.lua b/cli/bench/http/deno_http_flash_post_bin.lua new file mode 100644 index 0000000000..c8f5d3e3f7 --- /dev/null +++ b/cli/bench/http/deno_http_flash_post_bin.lua @@ -0,0 +1,5 @@ +wrk.method = "POST" +wrk.headers["Content-Type"] = "application/octet-stream" + +file = io.open("./cli/bench/testdata/128k.bin", "rb") +wrk.body = file:read("*a") \ No newline at end of file diff --git a/cli/bench/http/deno_post_bin.js b/cli/bench/http/deno_post_bin.js new file mode 100644 index 0000000000..33ffeed1b0 --- /dev/null +++ b/cli/bench/http/deno_post_bin.js @@ -0,0 +1,19 @@ +// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license. + +const addr = Deno.args[0] || "127.0.0.1:4500"; +const [hostname, port] = addr.split(":"); +const listener = Deno.listen({ hostname, port: Number(port) }); +console.log("Server listening on", addr); + +for await (const conn of listener) { + (async () => { + const requests = Deno.serveHttp(conn); + for await (const { respondWith, request } of requests) { + if (request.method == "POST") { + const buffer = await request.arrayBuffer(); + respondWith(new Response(buffer.byteLength)) + .catch((e) => console.log(e)); + } + } + })(); +} diff --git a/cli/bench/http/deno_post_bin.lua b/cli/bench/http/deno_post_bin.lua new file mode 100644 index 0000000000..c8f5d3e3f7 --- /dev/null +++ b/cli/bench/http/deno_post_bin.lua @@ -0,0 +1,5 @@ +wrk.method = "POST" +wrk.headers["Content-Type"] = "application/octet-stream" + +file = io.open("./cli/bench/testdata/128k.bin", "rb") +wrk.body = file:read("*a") \ No newline at end of file diff --git a/cli/bench/http/node_post_bin.js b/cli/bench/http/node_post_bin.js new file mode 100644 index 0000000000..d0f2d6667c --- /dev/null +++ b/cli/bench/http/node_post_bin.js @@ -0,0 +1,18 @@ +// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license. +const http = require("http"); +const port = process.argv[2] || "4544"; +console.log("port", port); +http + .Server((req, res) => { + if (req.method == "POST") { + let chunks = []; + req.on("data", function (data) { + chunks.push(data); + }); + req.on("end", function () { + const buffer = Buffer.concat(chunks); + res.end(buffer.byteLength.toString()); + }); + } + }) + .listen(port); diff --git a/cli/bench/http/node_post_bin.lua b/cli/bench/http/node_post_bin.lua new file mode 100644 index 0000000000..c8f5d3e3f7 --- /dev/null +++ b/cli/bench/http/node_post_bin.lua @@ -0,0 +1,5 @@ +wrk.method = "POST" +wrk.headers["Content-Type"] = "application/octet-stream" + +file = io.open("./cli/bench/testdata/128k.bin", "rb") +wrk.body = file:read("*a") \ No newline at end of file diff --git a/cli/tests/unit/fetch_test.ts b/cli/tests/unit/fetch_test.ts index 36c1926f2f..e2ff0d5e04 100644 --- a/cli/tests/unit/fetch_test.ts +++ b/cli/tests/unit/fetch_test.ts @@ -1789,3 +1789,19 @@ Deno.test( assertEquals(await res.text(), "ok"); }, ); + +Deno.test( + { permissions: { net: true } }, + async function fetchResponseStreamIsLockedWhileReading() { + const response = await fetch("http://localhost:4545/echo_server", { + body: new Uint8Array(5000), + method: "POST", + }); + + assertEquals(response.body!.locked, false); + const promise = response.arrayBuffer(); + assertEquals(response.body!.locked, true); + + await promise; + }, +); diff --git a/cli/tests/unit/http_test.ts b/cli/tests/unit/http_test.ts index 3de93076e4..7bb16aecc5 100644 --- a/cli/tests/unit/http_test.ts +++ b/cli/tests/unit/http_test.ts @@ -2292,6 +2292,87 @@ Deno.test("upgradeHttp unix", { await Promise.all([server, client()]); }); +Deno.test( + { permissions: { net: true } }, + async function httpServerReadLargeBodyWithContentLength() { + const TLS_PACKET_SIZE = 16 * 1024 + 256; + // We want the body to be read in multiple packets + const body = "aa\n" + "deno.land large body\n".repeat(TLS_PACKET_SIZE) + + "zz"; + + let httpConn: Deno.HttpConn; + const promise = (async () => { + const listener = Deno.listen({ port: 4501 }); + const conn = await listener.accept(); + listener.close(); + httpConn = Deno.serveHttp(conn); + const reqEvent = await httpConn.nextRequest(); + assert(reqEvent); + const { request, respondWith } = reqEvent; + assertEquals(await request.text(), body); + await respondWith(new Response(body)); + })(); + + const resp = await fetch("http://127.0.0.1:4501/", { + method: "POST", + headers: { "connection": "close" }, + body, + }); + const text = await resp.text(); + assertEquals(text, body); + await promise; + + httpConn!.close(); + }, +); + +Deno.test( + { permissions: { net: true } }, + async function httpServerReadLargeBodyWithTransferChunked() { + const TLS_PACKET_SIZE = 16 * 1024 + 256; + + // We want the body to be read in multiple packets + const chunks = [ + "aa\n", + "deno.land large body\n".repeat(TLS_PACKET_SIZE), + "zz", + ]; + + const body = chunks.join(""); + + const stream = new TransformStream(); + const writer = stream.writable.getWriter(); + for (const chunk of chunks) { + writer.write(new TextEncoder().encode(chunk)); + } + writer.close(); + + let httpConn: Deno.HttpConn; + const promise = (async () => { + const listener = Deno.listen({ port: 4501 }); + const conn = await listener.accept(); + listener.close(); + httpConn = Deno.serveHttp(conn); + const reqEvent = await httpConn.nextRequest(); + assert(reqEvent); + const { request, respondWith } = reqEvent; + assertEquals(await request.text(), body); + await respondWith(new Response(body)); + })(); + + const resp = await fetch("http://127.0.0.1:4501/", { + method: "POST", + headers: { "connection": "close" }, + body: stream.readable, + }); + const text = await resp.text(); + assertEquals(text, body); + await promise; + + httpConn!.close(); + }, +); + function chunkedBodyReader(h: Headers, r: BufReader): Deno.Reader { // Based on https://tools.ietf.org/html/rfc2616#section-19.4.6 const tp = new TextProtoReader(r); diff --git a/core/ops_builtin.rs b/core/ops_builtin.rs index 6ca2a132cc..7393d4b699 100644 --- a/core/ops_builtin.rs +++ b/core/ops_builtin.rs @@ -34,6 +34,7 @@ pub(crate) fn init_builtins() -> Extension { op_add::decl(), // // TODO(@AaronO): track IO metrics for builtin streams op_read::decl(), + op_read_all::decl(), op_write::decl(), op_shutdown::decl(), op_metrics::decl(), @@ -168,6 +169,26 @@ async fn op_read( resource.read_return(buf).await.map(|(n, _)| n as u32) } +#[op] +async fn op_read_all( + state: Rc>, + rid: ResourceId, +) -> Result { + let resource = state.borrow().resource_table.get_any(rid)?; + let (min, maximum) = resource.size_hint(); + let size = maximum.unwrap_or(min) as usize; + + let mut buffer = Vec::with_capacity(size); + loop { + let tmp = ZeroCopyBuf::new_temp(vec![0u8; 64 * 1024]); + let (nread, tmp) = resource.clone().read_return(tmp).await?; + if nread == 0 { + return Ok(buffer.into()); + } + buffer.extend_from_slice(&tmp[..nread]); + } +} + #[op] async fn op_write( state: Rc>, diff --git a/core/resources.rs b/core/resources.rs index 56c9298af3..1a1ba31934 100644 --- a/core/resources.rs +++ b/core/resources.rs @@ -64,6 +64,10 @@ pub trait Resource: Any + 'static { fn backing_fd(self: Rc) -> Option { None } + + fn size_hint(&self) -> (u64, Option) { + (0, None) + } } impl dyn Resource { diff --git a/ext/fetch/22_body.js b/ext/fetch/22_body.js index 6e9a574478..429b56ae1d 100644 --- a/ext/fetch/22_body.js +++ b/ext/fetch/22_body.js @@ -30,19 +30,18 @@ errorReadableStream, readableStreamClose, readableStreamDisturb, + readableStreamCollectIntoUint8Array, createProxy, ReadableStreamPrototype, } = globalThis.__bootstrap.streams; const { ArrayBufferPrototype, ArrayBufferIsView, - ArrayPrototypePush, ArrayPrototypeMap, JSONParse, ObjectDefineProperties, ObjectPrototypeIsPrototypeOf, PromiseResolve, - TypedArrayPrototypeSet, TypedArrayPrototypeSlice, TypeError, Uint8Array, @@ -66,12 +65,10 @@ } class InnerBody { - #knownExactLength = null; - /** * @param {ReadableStream | { body: Uint8Array | string, consumed: boolean }} stream */ - constructor(stream, knownExactLength) { + constructor(stream) { /** @type {ReadableStream | { body: Uint8Array | string, consumed: boolean }} */ this.streamOrStatic = stream ?? { body: new Uint8Array(), consumed: false }; @@ -79,8 +76,6 @@ this.source = null; /** @type {null | number} */ this.length = null; - - this.#knownExactLength = knownExactLength; } get stream() { @@ -144,7 +139,7 @@ * https://fetch.spec.whatwg.org/#concept-body-consume-body * @returns {Promise} */ - async consume() { + consume() { if (this.unusable()) throw new TypeError("Body already consumed."); if ( ObjectPrototypeIsPrototypeOf( @@ -152,40 +147,7 @@ this.streamOrStatic, ) ) { - const reader = this.stream.getReader(); - /** @type {Uint8Array[]} */ - const chunks = []; - - let finalBuffer = this.#knownExactLength - ? new Uint8Array(this.#knownExactLength) - : null; - - let totalLength = 0; - while (true) { - const { value: chunk, done } = await reader.read(); - if (done) break; - - if (finalBuffer) { - // fast path, content-length is present - TypedArrayPrototypeSet(finalBuffer, chunk, totalLength); - } else { - // slow path, content-length is not present - ArrayPrototypePush(chunks, chunk); - } - totalLength += chunk.byteLength; - } - - if (finalBuffer) { - return finalBuffer; - } - - finalBuffer = new Uint8Array(totalLength); - let i = 0; - for (const chunk of chunks) { - TypedArrayPrototypeSet(finalBuffer, chunk, i); - i += chunk.byteLength; - } - return finalBuffer; + return readableStreamCollectIntoUint8Array(this.stream); } else { this.streamOrStatic.consumed = true; return this.streamOrStatic.body; @@ -224,7 +186,7 @@ clone() { const [out1, out2] = this.stream.tee(); this.streamOrStatic = out1; - const second = new InnerBody(out2, this.#knownExactLength); + const second = new InnerBody(out2); second.source = core.deserialize(core.serialize(this.source)); second.length = this.length; return second; diff --git a/ext/fetch/26_fetch.js b/ext/fetch/26_fetch.js index 3e90429ce4..169db2bbf2 100644 --- a/ext/fetch/26_fetch.js +++ b/ext/fetch/26_fetch.js @@ -17,7 +17,7 @@ const webidl = window.__bootstrap.webidl; const { byteLowerCase } = window.__bootstrap.infra; const { BlobPrototype } = window.__bootstrap.file; - const { errorReadableStream, ReadableStreamPrototype } = + const { errorReadableStream, ReadableStreamPrototype, readableStreamForRid } = window.__bootstrap.streams; const { InnerBody, extractBody } = window.__bootstrap.fetchBody; const { @@ -44,7 +44,6 @@ String, StringPrototypeStartsWith, StringPrototypeToLowerCase, - TypedArrayPrototypeSubarray, TypeError, Uint8Array, Uint8ArrayPrototype, @@ -89,65 +88,22 @@ return core.opAsync("op_fetch_send", rid); } - // A finalization registry to clean up underlying fetch resources that are GC'ed. - const RESOURCE_REGISTRY = new FinalizationRegistry((rid) => { - core.tryClose(rid); - }); - /** * @param {number} responseBodyRid * @param {AbortSignal} [terminator] * @returns {ReadableStream} */ function createResponseBodyStream(responseBodyRid, terminator) { + const readable = readableStreamForRid(responseBodyRid); + function onAbort() { - if (readable) { - errorReadableStream(readable, terminator.reason); - } + errorReadableStream(readable, terminator.reason); core.tryClose(responseBodyRid); } + // TODO(lucacasonato): clean up registration terminator[abortSignal.add](onAbort); - const readable = new ReadableStream({ - type: "bytes", - async pull(controller) { - try { - // This is the largest possible size for a single packet on a TLS - // stream. - const chunk = new Uint8Array(16 * 1024 + 256); - // TODO(@AaronO): switch to handle nulls if that's moved to core - const read = await core.read( - responseBodyRid, - chunk, - ); - if (read > 0) { - // We read some data. Enqueue it onto the stream. - controller.enqueue(TypedArrayPrototypeSubarray(chunk, 0, read)); - } else { - RESOURCE_REGISTRY.unregister(readable); - // We have reached the end of the body, so we close the stream. - controller.close(); - core.tryClose(responseBodyRid); - } - } catch (err) { - RESOURCE_REGISTRY.unregister(readable); - if (terminator.aborted) { - controller.error(terminator.reason); - } else { - // There was an error while reading a chunk of the body, so we - // error. - controller.error(err); - } - core.tryClose(responseBodyRid); - } - }, - cancel() { - if (!terminator.aborted) { - terminator[abortSignal.signalAbort](); - } - }, - }); - RESOURCE_REGISTRY.register(readable, responseBodyRid, readable); + return readable; } @@ -338,7 +294,6 @@ } else { response.body = new InnerBody( createResponseBodyStream(resp.responseRid, terminator), - resp.contentLength, ); } } diff --git a/ext/fetch/lib.rs b/ext/fetch/lib.rs index a7daaa63ae..0adc32343d 100644 --- a/ext/fetch/lib.rs +++ b/ext/fetch/lib.rs @@ -408,6 +408,7 @@ pub async fn op_fetch_send( .add(FetchResponseBodyResource { reader: AsyncRefCell::new(stream_reader), cancel: CancelHandle::default(), + size: content_length, }); Ok(FetchResponse { @@ -479,6 +480,7 @@ type BytesStream = struct FetchResponseBodyResource { reader: AsyncRefCell>, cancel: CancelHandle, + size: Option, } impl Resource for FetchResponseBodyResource { @@ -498,6 +500,10 @@ impl Resource for FetchResponseBodyResource { }) } + fn size_hint(&self) -> (u64, Option) { + (0, self.size) + } + fn close(self: Rc) { self.cancel.cancel() } diff --git a/ext/http/lib.rs b/ext/http/lib.rs index bffe3c3d5d..a8c2810bc7 100644 --- a/ext/http/lib.rs +++ b/ext/http/lib.rs @@ -39,6 +39,8 @@ use flate2::write::GzEncoder; use flate2::Compression; use fly_accept_encoding::Encoding; use hyper::body::Bytes; +use hyper::body::HttpBody; +use hyper::body::SizeHint; use hyper::header::HeaderName; use hyper::header::HeaderValue; use hyper::server::conn::Http; @@ -309,6 +311,7 @@ pub struct HttpStreamResource { wr: AsyncRefCell, accept_encoding: Encoding, cancel_handle: CancelHandle, + size: SizeHint, } impl HttpStreamResource { @@ -318,11 +321,13 @@ impl HttpStreamResource { response_tx: oneshot::Sender>, accept_encoding: Encoding, ) -> Self { + let size = request.body().size_hint(); Self { conn: conn.clone(), rd: HttpRequestReader::Headers(request).into(), wr: HttpResponseWriter::Headers(response_tx).into(), accept_encoding, + size, cancel_handle: CancelHandle::new(), } } @@ -388,6 +393,10 @@ impl Resource for HttpStreamResource { fn close(self: Rc) { self.cancel_handle.cancel(); } + + fn size_hint(&self) -> (u64, Option) { + (self.size.lower(), self.size.upper()) + } } /// The read half of an HTTP stream. diff --git a/ext/web/06_streams.js b/ext/web/06_streams.js index 412c58c3c1..ba422b71dd 100644 --- a/ext/web/06_streams.js +++ b/ext/web/06_streams.js @@ -48,6 +48,7 @@ SymbolAsyncIterator, SymbolFor, TypeError, + TypedArrayPrototypeSet, Uint8Array, Uint8ArrayPrototype, Uint16ArrayPrototype, @@ -647,6 +648,10 @@ const DEFAULT_CHUNK_SIZE = 64 * 1024; // 64 KiB + // A finalization registry to clean up underlying resources that are GC'ed. + const RESOURCE_REGISTRY = new FinalizationRegistry((rid) => { + core.tryClose(rid); + }); /** * Create a new ReadableStream object that is backed by a Resource that * implements `Resource::read_return`. This object contains enough metadata to @@ -660,6 +665,17 @@ function readableStreamForRid(rid, autoClose = true) { const stream = webidl.createBranded(ReadableStream); stream[_resourceBacking] = { rid, autoClose }; + + const tryClose = () => { + if (!autoClose) return; + RESOURCE_REGISTRY.unregister(stream); + core.tryClose(rid); + }; + + if (autoClose) { + RESOURCE_REGISTRY.register(stream, rid, stream); + } + const underlyingSource = { type: "bytes", async pull(controller) { @@ -667,7 +683,7 @@ try { const bytesRead = await core.read(rid, v); if (bytesRead === 0) { - if (autoClose) core.tryClose(rid); + tryClose(); controller.close(); controller.byobRequest.respond(0); } else { @@ -675,11 +691,11 @@ } } catch (e) { controller.error(e); - if (autoClose) core.tryClose(rid); + tryClose(); } }, cancel() { - if (autoClose) core.tryClose(rid); + tryClose(); }, autoAllocateChunkSize: DEFAULT_CHUNK_SIZE, }; @@ -766,6 +782,59 @@ return stream[_resourceBacking]; } + async function readableStreamCollectIntoUint8Array(stream) { + const resourceBacking = getReadableStreamResourceBacking(stream); + const reader = acquireReadableStreamDefaultReader(stream); + + if (resourceBacking) { + // fast path, read whole body in a single op call + try { + readableStreamDisturb(stream); + const buf = await core.opAsync("op_read_all", resourceBacking.rid); + readableStreamThrowIfErrored(stream); + readableStreamClose(stream); + return buf; + } catch (err) { + readableStreamThrowIfErrored(stream); + readableStreamError(stream, err); + throw err; + } finally { + if (resourceBacking.autoClose) { + core.tryClose(resourceBacking.rid); + } + } + } + + // slow path + /** @type {Uint8Array[]} */ + const chunks = []; + let totalLength = 0; + while (true) { + const { value: chunk, done } = await reader.read(); + if (done) break; + + ArrayPrototypePush(chunks, chunk); + totalLength += chunk.byteLength; + } + + const finalBuffer = new Uint8Array(totalLength); + let i = 0; + for (const chunk of chunks) { + TypedArrayPrototypeSet(finalBuffer, chunk, i); + i += chunk.byteLength; + } + return finalBuffer; + } + + /* + * @param {ReadableStream} stream + */ + function readableStreamThrowIfErrored(stream) { + if (stream[_state] === "errored") { + throw stream[_storedError]; + } + } + /** * @param {unknown} value * @returns {value is WritableStream} @@ -5982,6 +6051,7 @@ createProxy, writableStreamClose, readableStreamClose, + readableStreamCollectIntoUint8Array, readableStreamDisturb, readableStreamForRid, readableStreamForRidUnrefable, diff --git a/tools/wpt/expectation.json b/tools/wpt/expectation.json index 79e22ea144..4afa460668 100644 --- a/tools/wpt/expectation.json +++ b/tools/wpt/expectation.json @@ -2963,21 +2963,11 @@ "stream-response.any.worker.html": true, "stream-safe-creation.any.html": [ "throwing Object.prototype.start accessor should not affect stream creation by 'fetch'", - "Object.prototype.start accessor returning invalid value should not affect stream creation by 'fetch'", - "throwing Object.prototype.type accessor should not affect stream creation by 'fetch'", - "throwing Object.prototype.size accessor should not affect stream creation by 'fetch'", - "Object.prototype.size accessor returning invalid value should not affect stream creation by 'fetch'", - "throwing Object.prototype.highWaterMark accessor should not affect stream creation by 'fetch'", - "Object.prototype.highWaterMark accessor returning invalid value should not affect stream creation by 'fetch'" + "Object.prototype.start accessor returning invalid value should not affect stream creation by 'fetch'" ], "stream-safe-creation.any.worker.html": [ "throwing Object.prototype.start accessor should not affect stream creation by 'fetch'", - "Object.prototype.start accessor returning invalid value should not affect stream creation by 'fetch'", - "throwing Object.prototype.type accessor should not affect stream creation by 'fetch'", - "throwing Object.prototype.size accessor should not affect stream creation by 'fetch'", - "Object.prototype.size accessor returning invalid value should not affect stream creation by 'fetch'", - "throwing Object.prototype.highWaterMark accessor should not affect stream creation by 'fetch'", - "Object.prototype.highWaterMark accessor returning invalid value should not affect stream creation by 'fetch'" + "Object.prototype.start accessor returning invalid value should not affect stream creation by 'fetch'" ], "integrity.sub.any.html": [ "Invalid integrity",