From 2a86b8fb02b594c42e057ca84b7171c22eba72d1 Mon Sep 17 00:00:00 2001
From: Divy Srivastava
Date: Thu, 21 Apr 2022 12:37:49 +0530
Subject: [PATCH] Reland "feat(ext/http): stream auto resp body compression"
 (#14345)

---
 Cargo.lock                  |   1 +
 cli/tests/unit/http_test.ts | 161 ++++++++++++++++++++++++++++------
 ext/http/Cargo.toml         |   1 +
 ext/http/lib.rs             | 168 ++++++++++++++++++++++--------------
 4 files changed, 237 insertions(+), 94 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 256d2c832c..cb115dcc2f 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -964,6 +964,7 @@ dependencies = [
 name = "deno_http"
 version = "0.42.0"
 dependencies = [
+ "async-compression",
  "base64 0.13.0",
  "brotli",
  "bytes",
diff --git a/cli/tests/unit/http_test.ts b/cli/tests/unit/http_test.ts
index 37c827b9b2..beb9557814 100644
--- a/cli/tests/unit/http_test.ts
+++ b/cli/tests/unit/http_test.ts
@@ -1185,26 +1185,25 @@ Deno.test(
 const decoder = new TextDecoder();

 Deno.test({
-  name: "http server compresses body",
+  name: "http server compresses body - check headers",
   permissions: { net: true, run: true },
   async fn() {
     const hostname = "localhost";
     const port = 4501;
+    const listener = Deno.listen({ hostname, port });
+
+    const data = { hello: "deno", now: "with", compressed: "body" };

     async function server() {
-      const listener = Deno.listen({ hostname, port });
       const tcpConn = await listener.accept();
       const httpConn = Deno.serveHttp(tcpConn);
       const e = await httpConn.nextRequest();
       assert(e);
       const { request, respondWith } = e;
       assertEquals(request.headers.get("Accept-Encoding"), "gzip, deflate, br");
-      const response = new Response(
-        JSON.stringify({ hello: "deno", now: "with", compressed: "body" }),
-        {
-          headers: { "content-type": "application/json" },
-        },
-      );
+      const response = new Response(JSON.stringify(data), {
+        headers: { "content-type": "application/json" },
+      });
       await respondWith(response);
       httpConn.close();
       listener.close();
@@ -1235,6 +1234,60 @@ Deno.test({
   },
 });

+Deno.test({
+  name: "http server compresses body - check body",
+  permissions: { net: true, run: true },
+  async fn() {
+    const hostname = "localhost";
+    const port = 4501;
+    const listener = Deno.listen({ hostname, port });
+
+    const data = { hello: "deno", now: "with", compressed: "body" };
+
+    async function server() {
+      const tcpConn = await listener.accept();
+      const httpConn = Deno.serveHttp(tcpConn);
+      const e = await httpConn.nextRequest();
+      assert(e);
+      const { request, respondWith } = e;
+      assertEquals(request.headers.get("Accept-Encoding"), "gzip, deflate, br");
+      const response = new Response(JSON.stringify(data), {
+        headers: { "content-type": "application/json" },
+      });
+      await respondWith(response);
+      httpConn.close();
+      listener.close();
+    }
+
+    async function client() {
+      const url = `http://${hostname}:${port}/`;
+      const cmd = [
+        "curl",
+        "--request",
+        "GET",
+        "--url",
+        url,
+        "--header",
+        "Accept-Encoding: gzip, deflate, br",
+      ];
+      const proc = Deno.run({ cmd, stdout: "piped", stderr: "null" });
+      const status = await proc.status();
+      assert(status.success);
+      const stdout = proc.stdout!.readable
+        .pipeThrough(new DecompressionStream("gzip"))
+        .pipeThrough(new TextDecoderStream());
+      let body = "";
+      for await (const chunk of stdout) {
+        body += chunk;
+      }
+      assertEquals(JSON.parse(body), data);
+      proc.close();
+    }
+
+    await Promise.all([server(), client()]);
+  },
+});
+
 Deno.test({
   name: "http server doesn't compress small body",
   permissions: { net: true, run: true },
@@ -1614,15 +1667,18 @@ Deno.test({
 });

 Deno.test({
-  name: "http server doesn't compress streamed bodies",
+  name: "http server compresses streamed bodies - check headers",
   permissions: { net: true, run: true },
   async fn() {
     const hostname = "localhost";
     const port = 4501;

+    const encoder = new TextEncoder();
+    const listener = Deno.listen({ hostname, port });
+
+    const data = { hello: "deno", now: "with", compressed: "body" };
+
     async function server() {
-      const encoder = new TextEncoder();
-      const listener = Deno.listen({ hostname, port });
       const tcpConn = await listener.accept();
       const httpConn = Deno.serveHttp(tcpConn);
       const e = await httpConn.nextRequest();
@@ -1631,23 +1687,13 @@ Deno.test({
       assertEquals(request.headers.get("Accept-Encoding"), "gzip, deflate, br");
       const bodyInit = new ReadableStream({
         start(controller) {
-          controller.enqueue(
-            encoder.encode(
-              JSON.stringify({
-                hello: "deno",
-                now: "with",
-                compressed: "body",
-              }),
-            ),
-          );
+          controller.enqueue(encoder.encode(JSON.stringify(data)));
           controller.close();
         },
       });
       const response = new Response(
         bodyInit,
-        {
-          headers: { "content-type": "application/json", vary: "Accept" },
-        },
+        { headers: { "content-type": "application/json" } },
       );
       await respondWith(response);
       httpConn.close();
@@ -1670,8 +1716,71 @@ Deno.test({
       const status = await proc.status();
       assert(status.success);
       const output = decoder.decode(await proc.output());
-      assert(output.includes("vary: Accept\r\n"));
-      assert(!output.includes("content-encoding: "));
+      assert(output.includes("vary: Accept-Encoding\r\n"));
+      assert(output.includes("content-encoding: gzip\r\n"));
+      proc.close();
+    }
+
+    await Promise.all([server(), client()]);
+  },
+});
+
+Deno.test({
+  name: "http server compresses streamed bodies - check body",
+  permissions: { net: true, run: true },
+  async fn() {
+    const hostname = "localhost";
+    const port = 4501;
+
+    const encoder = new TextEncoder();
+    const listener = Deno.listen({ hostname, port });
+
+    const data = { hello: "deno", now: "with", compressed: "body" };
+
+    async function server() {
+      const tcpConn = await listener.accept();
+      const httpConn = Deno.serveHttp(tcpConn);
+      const e = await httpConn.nextRequest();
+      assert(e);
+      const { request, respondWith } = e;
+      assertEquals(request.headers.get("Accept-Encoding"), "gzip, deflate, br");
+      const bodyInit = new ReadableStream({
+        start(controller) {
+          controller.enqueue(encoder.encode(JSON.stringify(data)));
+          controller.close();
+        },
+      });
+      const response = new Response(
+        bodyInit,
+        { headers: { "content-type": "application/json" } },
+      );
+      await respondWith(response);
+      httpConn.close();
+      listener.close();
+    }
+
+    async function client() {
+      const url = `http://${hostname}:${port}/`;
+      const cmd = [
+        "curl",
+        "--request",
+        "GET",
+        "--url",
+        url,
+        "--header",
+        "Accept-Encoding: gzip, deflate, br",
+      ];
+      const proc = Deno.run({ cmd, stdout: "piped", stderr: "null" });
+      const status = await proc.status();
+      assert(status.success);
+      const stdout = proc.stdout.readable
+        .pipeThrough(new DecompressionStream("gzip"))
+        .pipeThrough(new TextDecoderStream());
+      let body = "";
+      for await (const chunk of stdout) {
+        body += chunk;
+      }
+      assertEquals(JSON.parse(body), data);
       proc.close();
     }

     await Promise.all([server(), client()]);
   },
 });

@@ -1736,8 +1845,6 @@ Deno.test({
       // Ensure the content-length header is updated.
       assert(!output.includes(`content-length: ${contentLength}\r\n`));
       assert(output.includes("content-length: 72\r\n"));
-      console.log(output);
-
       proc.close();
     }

diff --git a/ext/http/Cargo.toml b/ext/http/Cargo.toml
index af0cfbe1c7..a5f524ecfa 100644
--- a/ext/http/Cargo.toml
+++ b/ext/http/Cargo.toml
@@ -14,6 +14,7 @@ description = "HTTP server implementation for Deno"
 path = "lib.rs"

 [dependencies]
+async-compression = { version = "0.3.1", features = ["tokio", "brotli", "gzip"] }
 base64 = "0.13.0"
 brotli = "3.3.3"
 bytes = "1"
diff --git a/ext/http/lib.rs b/ext/http/lib.rs
index 9c0109937a..9c1b48fff2 100644
--- a/ext/http/lib.rs
+++ b/ext/http/lib.rs
@@ -1,6 +1,7 @@
 // Copyright 2018-2022 the Deno authors. All rights reserved. MIT license.

-use bytes::Bytes;
+use async_compression::tokio::write::BrotliEncoder;
+use async_compression::tokio::write::GzipEncoder;
 use cache_control::CacheControl;
 use deno_core::error::custom_error;
 use deno_core::error::AnyError;
@@ -21,7 +22,6 @@ use deno_core::futures::StreamExt;
 use deno_core::futures::TryFutureExt;
 use deno_core::include_js_files;
 use deno_core::op;
-
 use deno_core::AsyncRefCell;
 use deno_core::ByteString;
 use deno_core::CancelFuture;
@@ -60,7 +60,9 @@ use std::task::Context;
 use std::task::Poll;
 use tokio::io::AsyncRead;
 use tokio::io::AsyncWrite;
+use tokio::io::AsyncWriteExt;
 use tokio::task::spawn_local;
+use tokio_util::io::ReaderStream;

 mod compressible;

@@ -338,7 +340,7 @@ impl Default for HttpRequestReader {
 /// The write half of an HTTP stream.
 enum HttpResponseWriter {
   Headers(oneshot::Sender<Response<Body>>),
-  Body(hyper::body::Sender),
+  Body(Pin<Box<dyn tokio::io::AsyncWrite>>),
   Closed,
 }
@@ -545,56 +547,60 @@ async fn op_http_write_headers(
   let body: Response<Body>;
   let new_wr: HttpResponseWriter;

-  match data {
-    Some(data) => {
-      // Set Vary: Accept-Encoding header for direct body response.
-      // Note: we set the header irrespective of whether or not we compress the
-      // data to make sure cache services do not serve uncompressed data to
-      // clients that support compression.
-      let vary_value = if let Some(value) = vary_header {
-        if let Ok(value_str) = std::str::from_utf8(value.as_slice()) {
-          if !value_str.to_lowercase().contains("accept-encoding") {
-            format!("Accept-Encoding, {}", value_str)
-          } else {
-            value_str.to_string()
-          }
+  // Set Vary: Accept-Encoding header for direct body response.
+  // Note: we set the header irrespective of whether or not we compress the data
+  // to make sure cache services do not serve uncompressed data to clients that
+  // support compression.
+  let vary_value = if let Some(value) = vary_header {
+    if let Ok(value_str) = std::str::from_utf8(value.as_slice()) {
+      if !value_str.to_lowercase().contains("accept-encoding") {
+        format!("Accept-Encoding, {}", value_str)
+      } else {
+        value_str.to_string()
+      }
+    } else {
+      // the header value wasn't valid UTF8, so it would have been a
+      // problem anyways, so sending a default header.
+      "Accept-Encoding".to_string()
+    }
+  } else {
+    "Accept-Encoding".to_string()
+  };
+  builder = builder.header("vary", &vary_value);
+
+  let accepts_compression = matches!(
+    *stream.accept_encoding.borrow(),
+    Encoding::Brotli | Encoding::Gzip
+  );
+  let should_compress = body_compressible
+    && (matches!(data, Some(ref data) if data.len() > 20) || data.is_none())
+    && accepts_compression;
+
+  if should_compress {
+    // If user provided a ETag header for uncompressed data, we need to
+    // ensure it is a Weak Etag header ("W/").
+    if let Some(value) = etag_header {
+      if let Ok(value_str) = std::str::from_utf8(value.as_slice()) {
+        if !value_str.starts_with("W/") {
+          builder = builder.header("etag", format!("W/{}", value_str));
         } else {
-          // the header value wasn't valid UTF8, so it would have been a
-          // problem anyways, so sending a default header.
-          "Accept-Encoding".to_string()
+          builder = builder.header("etag", value.as_slice());
         }
       } else {
-        "Accept-Encoding".to_string()
-      };
-      builder = builder.header("vary", &vary_value);
-
-      let accepts_compression = matches!(
-        *stream.accept_encoding.borrow(),
-        Encoding::Brotli | Encoding::Gzip
-      );
-
-      let should_compress =
-        body_compressible && data.len() > 20 && accepts_compression;
+        builder = builder.header("etag", value.as_slice());
+      }
+    }
+    // Drop 'content-length' header. Hyper will update it using compressed body.
+    if let Some(headers) = builder.headers_mut() {
+      headers.remove("content-length");
+    }
+  } else if let Some(value) = etag_header {
+    builder = builder.header("etag", value.as_slice());
+  }

+  match data {
+    Some(data) => {
       if should_compress {
-        // Drop 'content-length' header. Hyper will update it using compressed body.
-        if let Some(headers) = builder.headers_mut() {
-          headers.remove("content-length");
-        }
-        // If user provided a ETag header for uncompressed data, we need to
-        // ensure it is a Weak Etag header ("W/").
-        if let Some(value) = etag_header {
-          if let Ok(value_str) = std::str::from_utf8(value.as_slice()) {
-            if !value_str.starts_with("W/") {
-              builder = builder.header("etag", format!("W/{}", value_str));
-            } else {
-              builder = builder.header("etag", value.as_slice());
-            }
-          } else {
-            builder = builder.header("etag", value.as_slice());
-          }
-        }
-
         match *stream.accept_encoding.borrow() {
           Encoding::Brotli => {
             builder = builder.header("content-encoding", "br");
@@ -621,9 +627,6 @@ async fn op_http_write_headers(
           }
         }
       } else {
-        if let Some(value) = etag_header {
-          builder = builder.header("etag", value.as_slice());
-        }
         // If a buffer was passed, but isn't compressible, we use it to
         // construct a response body.
         body = builder.body(data.into_bytes().into())?;
@@ -633,19 +636,35 @@ async fn op_http_write_headers(
     None => {
       // If no buffer was passed, the caller will stream the response body.

-      // TODO(@kitsonk) had compression for streamed bodies.
+      // Create a one way pipe that implements tokio's async io traits. To do
+      // this we create a [tokio::io::DuplexStream], but then throw away one
+      // of the directions to create a one way pipe.
+      let (a, b) = tokio::io::duplex(64 * 1024);
+      let (reader, _) = tokio::io::split(a);
+      let (_, writer) = tokio::io::split(b);

-      // Set the user provided ETag & Vary headers for a streaming response
-      if let Some(value) = etag_header {
-        builder = builder.header("etag", value.as_slice());
-      }
-      if let Some(value) = vary_header {
-        builder = builder.header("vary", value.as_slice());
+      let writer_body: Pin<Box<dyn tokio::io::AsyncWrite>>;
+
+      if should_compress {
+        match *stream.accept_encoding.borrow() {
+          Encoding::Brotli => {
+            let writer = BrotliEncoder::new(writer);
+            writer_body = Box::pin(writer);
+            builder = builder.header("content-encoding", "br");
+          }
+          _ => {
+            assert_eq!(*stream.accept_encoding.borrow(), Encoding::Gzip);
+            let writer = GzipEncoder::new(writer);
+            writer_body = Box::pin(writer);
+            builder = builder.header("content-encoding", "gzip");
+          }
+        }
+      } else {
+        writer_body = Box::pin(writer);
       }

-      let (body_tx, body_rx) = Body::channel();
-      body = builder.body(body_rx)?;
-      new_wr = HttpResponseWriter::Body(body_tx);
+      body = builder.body(Body::wrap_stream(ReaderStream::new(reader)))?;
+      new_wr = HttpResponseWriter::Body(writer_body);
     }
   }
@@ -677,7 +696,7 @@ async fn op_http_write(
   let mut wr = RcRef::map(&stream, |r| &r.wr).borrow_mut().await;

   loop {
-    let body_tx = match &mut *wr {
+    let body_writer = match &mut *wr {
       HttpResponseWriter::Body(body_tx) => body_tx,
       HttpResponseWriter::Headers(_) => {
         break Err(http_error("no response headers"))
@@ -687,13 +706,17 @@ async fn op_http_write(
       }
     };

-    let bytes = Bytes::copy_from_slice(&buf[..]);
-    match body_tx.send_data(bytes).await {
+    let mut res = body_writer.write_all(&buf).await;
+    if res.is_ok() {
+      res = body_writer.flush().await;
+    }
+
+    match res {
       Ok(_) => break Ok(()),
       Err(err) => {
-        // Don't return "channel closed", that's an implementation detail.
+        assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe);
+        // Don't return "broken pipe", that's an implementation detail.
         // Pull up the failure associated with the transport connection instead.
-        assert!(err.is_closed());
         stream.conn.closed().await?;
         // If there was no connection error, drop body_tx.
         *wr = HttpResponseWriter::Closed;
@@ -715,7 +738,18 @@ async fn op_http_shutdown(
     .resource_table
     .get::<HttpStreamResource>(rid)?;
   let mut wr = RcRef::map(&stream, |r| &r.wr).borrow_mut().await;
-  take(&mut *wr);
+  let wr = take(&mut *wr);
+  if let HttpResponseWriter::Body(mut body_writer) = wr {
+    match body_writer.shutdown().await {
+      Ok(_) => {}
+      Err(err) => {
+        assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe);
+        // Don't return "broken pipe", that's an implementation detail.
+        // Pull up the failure associated with the transport connection instead.
+        stream.conn.closed().await?;
+      }
+    }
+  }
   Ok(())
 }
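
Reviewer note (not part of the applied patch): the lib.rs hunks above replace hyper's `Body::channel()` with a one-way in-memory pipe: `tokio::io::duplex` yields two connected bidirectional streams, one direction of each is discarded, the write half is optionally wrapped in an `async_compression` encoder, and the read half is handed to hyper via `Body::wrap_stream(ReaderStream::new(reader))`. Below is a minimal standalone sketch of that pipe-plus-encoder pattern in isolation, assuming `tokio`, `tokio-util` (with the "io" feature), `async-compression` (with the "tokio" and "gzip" features), and `futures` as dependencies; identifiers such as `producer` and `MSG` are illustrative only.

```rust
use async_compression::tokio::write::GzipEncoder;
use futures::StreamExt;
use tokio::io::AsyncWriteExt;
use tokio_util::io::ReaderStream;

const MSG: &[u8] = b"hello hello hello hello\n";

#[tokio::main]
async fn main() -> std::io::Result<()> {
  // A DuplexStream is bidirectional. Splitting both ends and discarding one
  // half of each leaves a one-way, in-memory pipe with a bounded buffer.
  let (a, b) = tokio::io::duplex(64 * 1024);
  let (reader, _) = tokio::io::split(a);
  let (_, writer) = tokio::io::split(b);

  // Everything written through the encoder comes out of `reader` as gzip.
  let mut encoder = GzipEncoder::new(writer);

  let producer = tokio::spawn(async move {
    for _ in 0..10 {
      encoder.write_all(MSG).await?;
      encoder.flush().await?; // push a compressed chunk through the pipe
    }
    // shutdown() writes the gzip trailer and closes the pipe, so the read
    // side observes a well-formed gzip stream followed by EOF.
    encoder.shutdown().await
  });

  // ReaderStream adapts the AsyncRead half into a Stream of Bytes chunks,
  // the same shape that Body::wrap_stream consumes in the patch.
  let mut chunks = ReaderStream::new(reader);
  let mut compressed = Vec::new();
  while let Some(chunk) = chunks.next().await {
    compressed.extend_from_slice(&chunk?);
  }
  producer.await.expect("producer task panicked")?;

  println!(
    "{} input bytes -> {} gzip bytes",
    10 * MSG.len(),
    compressed.len()
  );
  Ok(())
}
```

The bounded duplex buffer (64 KiB in the patch) is what gives `op_http_write` backpressure: `write_all` only resolves once the reader side drains. It also motivates the new `op_http_shutdown` logic, since for an encoder `shutdown()` is what flushes the gzip/brotli trailer, so it must complete before the writer is dropped.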