mirror of
https://github.com/denoland/deno.git
synced 2025-01-11 08:33:43 -05:00
fix(ext/http): flush chunk when streaming resource (#16536)
When streaming a resource in ext/http, with compression enabled, we didn't flush individual chunks. This became very problematic when we enabled `req.body` from `fetch` for FastStream recently. This commit now correctly flushes each resource chunk after compression.
This commit is contained in:
parent
61fbfabe44
commit
1410e4adea
2 changed files with 82 additions and 1 deletions
|
@ -2515,6 +2515,83 @@ Deno.test(
|
|||
},
|
||||
);
|
||||
|
||||
Deno.test({
|
||||
name: "http server compresses and flushes each chunk of a streamed resource",
|
||||
permissions: { net: true, run: true },
|
||||
async fn() {
|
||||
const hostname = "localhost";
|
||||
const port = 4501;
|
||||
const port2 = 4502;
|
||||
|
||||
const encoder = new TextEncoder();
|
||||
const listener = Deno.listen({ hostname, port });
|
||||
const listener2 = Deno.listen({ hostname, port: port2 });
|
||||
|
||||
let httpConn: Deno.HttpConn;
|
||||
async function server() {
|
||||
const tcpConn = await listener.accept();
|
||||
httpConn = Deno.serveHttp(tcpConn);
|
||||
const e = await httpConn.nextRequest();
|
||||
assert(e);
|
||||
const { request, respondWith } = e;
|
||||
assertEquals(request.headers.get("Accept-Encoding"), "gzip, deflate, br");
|
||||
const resp = await fetch(`http://${hostname}:${port2}/`);
|
||||
await respondWith(resp);
|
||||
listener.close();
|
||||
}
|
||||
|
||||
const ts = new TransformStream();
|
||||
const writer = ts.writable.getWriter();
|
||||
writer.write(encoder.encode("hello"));
|
||||
|
||||
let httpConn2: Deno.HttpConn;
|
||||
async function server2() {
|
||||
const tcpConn = await listener2.accept();
|
||||
httpConn2 = Deno.serveHttp(tcpConn);
|
||||
const e = await httpConn2.nextRequest();
|
||||
assert(e);
|
||||
await e.respondWith(
|
||||
new Response(ts.readable, {
|
||||
headers: { "Content-Type": "text/plain" },
|
||||
}),
|
||||
);
|
||||
listener2.close();
|
||||
}
|
||||
|
||||
async function client() {
|
||||
const url = `http://${hostname}:${port}/`;
|
||||
const args = [
|
||||
"--request",
|
||||
"GET",
|
||||
"--url",
|
||||
url,
|
||||
"--header",
|
||||
"Accept-Encoding: gzip, deflate, br",
|
||||
"--no-buffer",
|
||||
];
|
||||
const proc = Deno.spawnChild("curl", { args, stderr: "null" });
|
||||
const stdout = proc.stdout
|
||||
.pipeThrough(new DecompressionStream("gzip"))
|
||||
.pipeThrough(new TextDecoderStream());
|
||||
let body = "";
|
||||
for await (const chunk of stdout) {
|
||||
body += chunk;
|
||||
if (body === "hello") {
|
||||
writer.write(encoder.encode(" world"));
|
||||
writer.close();
|
||||
}
|
||||
}
|
||||
assertEquals(body, "hello world");
|
||||
const status = await proc.status;
|
||||
assert(status.success);
|
||||
}
|
||||
|
||||
await Promise.all([server(), server2(), client()]);
|
||||
httpConn!.close();
|
||||
httpConn2!.close();
|
||||
},
|
||||
});
|
||||
|
||||
function chunkedBodyReader(h: Headers, r: BufReader): Deno.Reader {
|
||||
// Based on https://tools.ietf.org/html/rfc2616#section-19.4.6
|
||||
const tp = new TextProtoReader(r);
|
||||
|
|
|
@ -769,7 +769,11 @@ async fn op_http_write_resource(
|
|||
|
||||
match &mut *wr {
|
||||
HttpResponseWriter::Body(body) => {
|
||||
if let Err(err) = body.write_all(&view).await {
|
||||
let mut result = body.write_all(&view).await;
|
||||
if result.is_ok() {
|
||||
result = body.flush().await;
|
||||
}
|
||||
if let Err(err) = result {
|
||||
assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe);
|
||||
// Don't return "broken pipe", that's an implementation detail.
|
||||
// Pull up the failure associated with the transport connection instead.
|
||||
|
|
Loading…
Reference in a new issue