1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2024-12-26 00:59:24 -05:00

Reland "feat(ext/http): stream auto resp body compression" (#14345)

This commit is contained in:
Divy Srivastava 2022-04-21 12:37:49 +05:30 committed by crowlkats
parent 29c8cd8aae
commit 5aaa6d5206
No known key found for this signature in database
GPG key ID: A82C9D461FC483E8
4 changed files with 237 additions and 94 deletions

1
Cargo.lock generated
View file

@ -964,6 +964,7 @@ dependencies = [
name = "deno_http" name = "deno_http"
version = "0.42.0" version = "0.42.0"
dependencies = [ dependencies = [
"async-compression",
"base64 0.13.0", "base64 0.13.0",
"brotli", "brotli",
"bytes", "bytes",

View file

@ -1185,26 +1185,25 @@ Deno.test(
const decoder = new TextDecoder(); const decoder = new TextDecoder();
Deno.test({ Deno.test({
name: "http server compresses body", name: "http server compresses body - check headers",
permissions: { net: true, run: true }, permissions: { net: true, run: true },
async fn() { async fn() {
const hostname = "localhost"; const hostname = "localhost";
const port = 4501; const port = 4501;
const listener = Deno.listen({ hostname, port });
const data = { hello: "deno", now: "with", compressed: "body" };
async function server() { async function server() {
const listener = Deno.listen({ hostname, port });
const tcpConn = await listener.accept(); const tcpConn = await listener.accept();
const httpConn = Deno.serveHttp(tcpConn); const httpConn = Deno.serveHttp(tcpConn);
const e = await httpConn.nextRequest(); const e = await httpConn.nextRequest();
assert(e); assert(e);
const { request, respondWith } = e; const { request, respondWith } = e;
assertEquals(request.headers.get("Accept-Encoding"), "gzip, deflate, br"); assertEquals(request.headers.get("Accept-Encoding"), "gzip, deflate, br");
const response = new Response( const response = new Response(JSON.stringify(data), {
JSON.stringify({ hello: "deno", now: "with", compressed: "body" }), headers: { "content-type": "application/json" },
{ });
headers: { "content-type": "application/json" },
},
);
await respondWith(response); await respondWith(response);
httpConn.close(); httpConn.close();
listener.close(); listener.close();
@ -1235,6 +1234,60 @@ Deno.test({
}, },
}); });
Deno.test({
name: "http server compresses body - check body",
permissions: { net: true, run: true },
async fn() {
const hostname = "localhost";
const port = 4501;
const listener = Deno.listen({ hostname, port });
const data = { hello: "deno", now: "with", compressed: "body" };
async function server() {
const tcpConn = await listener.accept();
const httpConn = Deno.serveHttp(tcpConn);
const e = await httpConn.nextRequest();
assert(e);
const { request, respondWith } = e;
assertEquals(request.headers.get("Accept-Encoding"), "gzip, deflate, br");
const response = new Response(JSON.stringify(data), {
headers: { "content-type": "application/json" },
});
await respondWith(response);
httpConn.close();
listener.close();
}
async function client() {
const url = `http://${hostname}:${port}/`;
const cmd = [
"curl",
"--request",
"GET",
"--url",
url,
"--header",
"Accept-Encoding: gzip, deflate, br",
];
const proc = Deno.run({ cmd, stdout: "piped", stderr: "null" });
const status = await proc.status();
assert(status.success);
const stdout = proc.stdout!.readable
.pipeThrough(new DecompressionStream("gzip"))
.pipeThrough(new TextDecoderStream());
let body = "";
for await (const chunk of stdout) {
body += chunk;
}
assertEquals(JSON.parse(body), data);
proc.close();
}
await Promise.all([server(), client()]);
},
});
Deno.test({ Deno.test({
name: "http server doesn't compress small body", name: "http server doesn't compress small body",
permissions: { net: true, run: true }, permissions: { net: true, run: true },
@ -1614,15 +1667,18 @@ Deno.test({
}); });
Deno.test({ Deno.test({
name: "http server doesn't compress streamed bodies", name: "http server compresses streamed bodies - check headers",
permissions: { net: true, run: true }, permissions: { net: true, run: true },
async fn() { async fn() {
const hostname = "localhost"; const hostname = "localhost";
const port = 4501; const port = 4501;
const encoder = new TextEncoder();
const listener = Deno.listen({ hostname, port });
const data = { hello: "deno", now: "with", compressed: "body" };
async function server() { async function server() {
const encoder = new TextEncoder();
const listener = Deno.listen({ hostname, port });
const tcpConn = await listener.accept(); const tcpConn = await listener.accept();
const httpConn = Deno.serveHttp(tcpConn); const httpConn = Deno.serveHttp(tcpConn);
const e = await httpConn.nextRequest(); const e = await httpConn.nextRequest();
@ -1631,23 +1687,13 @@ Deno.test({
assertEquals(request.headers.get("Accept-Encoding"), "gzip, deflate, br"); assertEquals(request.headers.get("Accept-Encoding"), "gzip, deflate, br");
const bodyInit = new ReadableStream({ const bodyInit = new ReadableStream({
start(controller) { start(controller) {
controller.enqueue( controller.enqueue(encoder.encode(JSON.stringify(data)));
encoder.encode(
JSON.stringify({
hello: "deno",
now: "with",
compressed: "body",
}),
),
);
controller.close(); controller.close();
}, },
}); });
const response = new Response( const response = new Response(
bodyInit, bodyInit,
{ { headers: { "content-type": "application/json" } },
headers: { "content-type": "application/json", vary: "Accept" },
},
); );
await respondWith(response); await respondWith(response);
httpConn.close(); httpConn.close();
@ -1670,8 +1716,71 @@ Deno.test({
const status = await proc.status(); const status = await proc.status();
assert(status.success); assert(status.success);
const output = decoder.decode(await proc.output()); const output = decoder.decode(await proc.output());
assert(output.includes("vary: Accept\r\n")); assert(output.includes("vary: Accept-Encoding\r\n"));
assert(!output.includes("content-encoding: ")); assert(output.includes("content-encoding: gzip\r\n"));
proc.close();
}
await Promise.all([server(), client()]);
},
});
Deno.test({
name: "http server compresses streamed bodies - check body",
permissions: { net: true, run: true },
async fn() {
const hostname = "localhost";
const port = 4501;
const encoder = new TextEncoder();
const listener = Deno.listen({ hostname, port });
const data = { hello: "deno", now: "with", compressed: "body" };
async function server() {
const tcpConn = await listener.accept();
const httpConn = Deno.serveHttp(tcpConn);
const e = await httpConn.nextRequest();
assert(e);
const { request, respondWith } = e;
assertEquals(request.headers.get("Accept-Encoding"), "gzip, deflate, br");
const bodyInit = new ReadableStream({
start(controller) {
controller.enqueue(encoder.encode(JSON.stringify(data)));
controller.close();
},
});
const response = new Response(
bodyInit,
{ headers: { "content-type": "application/json" } },
);
await respondWith(response);
httpConn.close();
listener.close();
}
async function client() {
const url = `http://${hostname}:${port}/`;
const cmd = [
"curl",
"--request",
"GET",
"--url",
url,
"--header",
"Accept-Encoding: gzip, deflate, br",
];
const proc = Deno.run({ cmd, stdout: "piped", stderr: "null" });
const status = await proc.status();
assert(status.success);
const stdout = proc.stdout.readable
.pipeThrough(new DecompressionStream("gzip"))
.pipeThrough(new TextDecoderStream());
let body = "";
for await (const chunk of stdout) {
body += chunk;
}
assertEquals(JSON.parse(body), data);
proc.close(); proc.close();
} }
@ -1736,8 +1845,6 @@ Deno.test({
// Ensure the content-length header is updated. // Ensure the content-length header is updated.
assert(!output.includes(`content-length: ${contentLength}\r\n`)); assert(!output.includes(`content-length: ${contentLength}\r\n`));
assert(output.includes("content-length: 72\r\n")); assert(output.includes("content-length: 72\r\n"));
console.log(output);
proc.close(); proc.close();
} }

View file

@ -14,6 +14,7 @@ description = "HTTP server implementation for Deno"
path = "lib.rs" path = "lib.rs"
[dependencies] [dependencies]
async-compression = { version = "0.3.1", features = ["tokio", "brotli", "gzip"] }
base64 = "0.13.0" base64 = "0.13.0"
brotli = "3.3.3" brotli = "3.3.3"
bytes = "1" bytes = "1"

View file

@ -1,6 +1,7 @@
// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license. // Copyright 2018-2022 the Deno authors. All rights reserved. MIT license.
use bytes::Bytes; use async_compression::tokio::write::BrotliEncoder;
use async_compression::tokio::write::GzipEncoder;
use cache_control::CacheControl; use cache_control::CacheControl;
use deno_core::error::custom_error; use deno_core::error::custom_error;
use deno_core::error::AnyError; use deno_core::error::AnyError;
@ -21,7 +22,6 @@ use deno_core::futures::StreamExt;
use deno_core::futures::TryFutureExt; use deno_core::futures::TryFutureExt;
use deno_core::include_js_files; use deno_core::include_js_files;
use deno_core::op; use deno_core::op;
use deno_core::AsyncRefCell; use deno_core::AsyncRefCell;
use deno_core::ByteString; use deno_core::ByteString;
use deno_core::CancelFuture; use deno_core::CancelFuture;
@ -60,7 +60,9 @@ use std::task::Context;
use std::task::Poll; use std::task::Poll;
use tokio::io::AsyncRead; use tokio::io::AsyncRead;
use tokio::io::AsyncWrite; use tokio::io::AsyncWrite;
use tokio::io::AsyncWriteExt;
use tokio::task::spawn_local; use tokio::task::spawn_local;
use tokio_util::io::ReaderStream;
mod compressible; mod compressible;
@ -338,7 +340,7 @@ impl Default for HttpRequestReader {
/// The write half of an HTTP stream. /// The write half of an HTTP stream.
enum HttpResponseWriter { enum HttpResponseWriter {
Headers(oneshot::Sender<Response<Body>>), Headers(oneshot::Sender<Response<Body>>),
Body(hyper::body::Sender), Body(Pin<Box<dyn tokio::io::AsyncWrite>>),
Closed, Closed,
} }
@ -545,56 +547,60 @@ async fn op_http_write_headers(
let body: Response<Body>; let body: Response<Body>;
let new_wr: HttpResponseWriter; let new_wr: HttpResponseWriter;
match data { // Set Vary: Accept-Encoding header for direct body response.
Some(data) => { // Note: we set the header irrespective of whether or not we compress the data
// Set Vary: Accept-Encoding header for direct body response. // to make sure cache services do not serve uncompressed data to clients that
// Note: we set the header irrespective of whether or not we compress the // support compression.
// data to make sure cache services do not serve uncompressed data to let vary_value = if let Some(value) = vary_header {
// clients that support compression. if let Ok(value_str) = std::str::from_utf8(value.as_slice()) {
let vary_value = if let Some(value) = vary_header { if !value_str.to_lowercase().contains("accept-encoding") {
if let Ok(value_str) = std::str::from_utf8(value.as_slice()) { format!("Accept-Encoding, {}", value_str)
if !value_str.to_lowercase().contains("accept-encoding") { } else {
format!("Accept-Encoding, {}", value_str) value_str.to_string()
} else { }
value_str.to_string() } else {
} // the header value wasn't valid UTF8, so it would have been a
// problem anyways, so sending a default header.
"Accept-Encoding".to_string()
}
} else {
"Accept-Encoding".to_string()
};
builder = builder.header("vary", &vary_value);
let accepts_compression = matches!(
*stream.accept_encoding.borrow(),
Encoding::Brotli | Encoding::Gzip
);
let should_compress = body_compressible
&& (matches!(data, Some(ref data) if data.len() > 20) || data.is_none())
&& accepts_compression;
if should_compress {
// If user provided a ETag header for uncompressed data, we need to
// ensure it is a Weak Etag header ("W/").
if let Some(value) = etag_header {
if let Ok(value_str) = std::str::from_utf8(value.as_slice()) {
if !value_str.starts_with("W/") {
builder = builder.header("etag", format!("W/{}", value_str));
} else { } else {
// the header value wasn't valid UTF8, so it would have been a builder = builder.header("etag", value.as_slice());
// problem anyways, so sending a default header.
"Accept-Encoding".to_string()
} }
} else { } else {
"Accept-Encoding".to_string() builder = builder.header("etag", value.as_slice());
}; }
builder = builder.header("vary", &vary_value); }
// Drop 'content-length' header. Hyper will update it using compressed body.
let accepts_compression = matches!( if let Some(headers) = builder.headers_mut() {
*stream.accept_encoding.borrow(), headers.remove("content-length");
Encoding::Brotli | Encoding::Gzip }
); } else if let Some(value) = etag_header {
builder = builder.header("etag", value.as_slice());
let should_compress = }
body_compressible && data.len() > 20 && accepts_compression;
match data {
Some(data) => {
if should_compress { if should_compress {
// Drop 'content-length' header. Hyper will update it using compressed body.
if let Some(headers) = builder.headers_mut() {
headers.remove("content-length");
}
// If user provided a ETag header for uncompressed data, we need to
// ensure it is a Weak Etag header ("W/").
if let Some(value) = etag_header {
if let Ok(value_str) = std::str::from_utf8(value.as_slice()) {
if !value_str.starts_with("W/") {
builder = builder.header("etag", format!("W/{}", value_str));
} else {
builder = builder.header("etag", value.as_slice());
}
} else {
builder = builder.header("etag", value.as_slice());
}
}
match *stream.accept_encoding.borrow() { match *stream.accept_encoding.borrow() {
Encoding::Brotli => { Encoding::Brotli => {
builder = builder.header("content-encoding", "br"); builder = builder.header("content-encoding", "br");
@ -621,9 +627,6 @@ async fn op_http_write_headers(
} }
} }
} else { } else {
if let Some(value) = etag_header {
builder = builder.header("etag", value.as_slice());
}
// If a buffer was passed, but isn't compressible, we use it to // If a buffer was passed, but isn't compressible, we use it to
// construct a response body. // construct a response body.
body = builder.body(data.into_bytes().into())?; body = builder.body(data.into_bytes().into())?;
@ -633,19 +636,35 @@ async fn op_http_write_headers(
None => { None => {
// If no buffer was passed, the caller will stream the response body. // If no buffer was passed, the caller will stream the response body.
// TODO(@kitsonk) had compression for streamed bodies. // Create a one way pipe that implements tokio's async io traits. To do
// this we create a [tokio::io::DuplexStream], but then throw away one
// of the directions to create a one way pipe.
let (a, b) = tokio::io::duplex(64 * 1024);
let (reader, _) = tokio::io::split(a);
let (_, writer) = tokio::io::split(b);
// Set the user provided ETag & Vary headers for a streaming response let writer_body: Pin<Box<dyn tokio::io::AsyncWrite>>;
if let Some(value) = etag_header {
builder = builder.header("etag", value.as_slice()); if should_compress {
} match *stream.accept_encoding.borrow() {
if let Some(value) = vary_header { Encoding::Brotli => {
builder = builder.header("vary", value.as_slice()); let writer = BrotliEncoder::new(writer);
writer_body = Box::pin(writer);
builder = builder.header("content-encoding", "br");
}
_ => {
assert_eq!(*stream.accept_encoding.borrow(), Encoding::Gzip);
let writer = GzipEncoder::new(writer);
writer_body = Box::pin(writer);
builder = builder.header("content-encoding", "gzip");
}
}
} else {
writer_body = Box::pin(writer);
} }
let (body_tx, body_rx) = Body::channel(); body = builder.body(Body::wrap_stream(ReaderStream::new(reader)))?;
body = builder.body(body_rx)?; new_wr = HttpResponseWriter::Body(writer_body);
new_wr = HttpResponseWriter::Body(body_tx);
} }
} }
@ -677,7 +696,7 @@ async fn op_http_write(
let mut wr = RcRef::map(&stream, |r| &r.wr).borrow_mut().await; let mut wr = RcRef::map(&stream, |r| &r.wr).borrow_mut().await;
loop { loop {
let body_tx = match &mut *wr { let body_writer = match &mut *wr {
HttpResponseWriter::Body(body_tx) => body_tx, HttpResponseWriter::Body(body_tx) => body_tx,
HttpResponseWriter::Headers(_) => { HttpResponseWriter::Headers(_) => {
break Err(http_error("no response headers")) break Err(http_error("no response headers"))
@ -687,13 +706,17 @@ async fn op_http_write(
} }
}; };
let bytes = Bytes::copy_from_slice(&buf[..]); let mut res = body_writer.write_all(&buf).await;
match body_tx.send_data(bytes).await { if res.is_ok() {
res = body_writer.flush().await;
}
match res {
Ok(_) => break Ok(()), Ok(_) => break Ok(()),
Err(err) => { Err(err) => {
// Don't return "channel closed", that's an implementation detail. assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe);
// Don't return "broken pipe", that's an implementation detail.
// Pull up the failure associated with the transport connection instead. // Pull up the failure associated with the transport connection instead.
assert!(err.is_closed());
stream.conn.closed().await?; stream.conn.closed().await?;
// If there was no connection error, drop body_tx. // If there was no connection error, drop body_tx.
*wr = HttpResponseWriter::Closed; *wr = HttpResponseWriter::Closed;
@ -715,7 +738,18 @@ async fn op_http_shutdown(
.resource_table .resource_table
.get::<HttpStreamResource>(rid)?; .get::<HttpStreamResource>(rid)?;
let mut wr = RcRef::map(&stream, |r| &r.wr).borrow_mut().await; let mut wr = RcRef::map(&stream, |r| &r.wr).borrow_mut().await;
take(&mut *wr); let wr = take(&mut *wr);
if let HttpResponseWriter::Body(mut body_writer) = wr {
match body_writer.shutdown().await {
Ok(_) => {}
Err(err) => {
assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe);
// Don't return "broken pipe", that's an implementation detail.
// Pull up the failure associated with the transport connection instead.
stream.conn.closed().await?;
}
}
}
Ok(()) Ok(())
} }