From 702f7ee89c882f82b2fb649212bd8dbbb4476062 Mon Sep 17 00:00:00 2001 From: Rano | Ranadeep Date: Mon, 15 Jul 2024 13:40:59 +0200 Subject: [PATCH] fix(std/http2): release window capacity back to remote stream (#24576) This PR adds logic to release window capacity after reading the chunks from the stream. Without it, large response (more than `u16::MAX`) may fill up the capacity and the whole response can't be read. Closes https://github.com/denoland/deno/issues/24552 Closes https://github.com/denoland/deno/issues/24305 --- ext/node/ops/http2.rs | 29 ++++++++++++------------- tests/unit_node/http2_test.ts | 40 +++++++++++++++++++++++++++++++++++ 2 files changed, 53 insertions(+), 16 deletions(-) diff --git a/ext/node/ops/http2.rs b/ext/node/ops/http2.rs index d12e108e66..d877a5b14e 100644 --- a/ext/node/ops/http2.rs +++ b/ext/node/ops/http2.rs @@ -456,24 +456,21 @@ fn poll_data_or_trailers( cx: &mut std::task::Context, body: &mut RecvStream, ) -> Poll> { - loop { - if let Poll::Ready(trailers) = body.poll_trailers(cx) { - if let Some(trailers) = trailers? { - return Poll::Ready(Ok(DataOrTrailers::Trailers(trailers))); - } else { - return Poll::Ready(Ok(DataOrTrailers::Eof)); - } + if let Poll::Ready(trailers) = body.poll_trailers(cx) { + if let Some(trailers) = trailers? { + return Poll::Ready(Ok(DataOrTrailers::Trailers(trailers))); + } else { + return Poll::Ready(Ok(DataOrTrailers::Eof)); } - if let Poll::Ready(data) = body.poll_data(cx) { - if let Some(data) = data { - return Poll::Ready(Ok(DataOrTrailers::Data(data?))); - } - // If data is None, loop one more time to check for trailers - continue; - } - // Return pending here as poll_data will keep the waker - return Poll::Pending; } + if let Poll::Ready(Some(data)) = body.poll_data(cx) { + let data = data?; + body.flow_control().release_capacity(data.len())?; + return Poll::Ready(Ok(DataOrTrailers::Data(data))); + // If `poll_data` returns `Ready(None)`, poll one more time to check for trailers + } + // Return pending here as poll_data will keep the waker + Poll::Pending } #[op2(async)] diff --git a/tests/unit_node/http2_test.ts b/tests/unit_node/http2_test.ts index f2604b344c..968ef15a06 100644 --- a/tests/unit_node/http2_test.ts +++ b/tests/unit_node/http2_test.ts @@ -203,3 +203,43 @@ Deno.test("[node/http2 client] write image buffer on request stream works", asyn await endPromise.promise; assertEquals(receivedData!, buffer); }); + +Deno.test("[node/http2 client] write 512kb buffer on request stream works", async () => { + const url = "https://localhost:5545"; + const client = http2.connect(url); + client.on("error", (err) => console.error(err)); + + const filePath = join( + import.meta.dirname!, + "testdata", + "lorem_ipsum_512kb.txt", + ); + const buffer = await readFile(filePath); + const req = client.request({ ":method": "POST", ":path": "/echo_server" }); + req.write(buffer, (err) => { + if (err) throw err; + }); + + let receivedData: Buffer; + req.on("data", (chunk) => { + if (!receivedData) { + receivedData = chunk; + } else { + receivedData = Buffer.concat([receivedData, chunk]); + } + }); + req.end(); + + const endPromise = Promise.withResolvers(); + setTimeout(() => { + try { + client.close(); + } catch (_) { + // pass + } + endPromise.resolve(); + }, 2000); + + await endPromise.promise; + assertEquals(receivedData!, buffer); +});