mirror of
https://github.com/denoland/deno.git
synced 2025-01-11 08:33:43 -05:00
feat(ext/http): Automatic compression for Deno.serve (#19031)
`Content-Encoding: gzip` support for `Deno.serve`. This doesn't support Brotli (`br`) yet, however it should not be difficult to add. Heuristics for compression are modelled after those in `Deno.serveHttp`. Tests are provided to ensure that the gzip compression is correct. We chunk a number of different streams (zeros, hard-to-compress data, already-gzipped data) in a number of different ways (regular, random, large/small, small/large).
This commit is contained in:
parent
3dc745c881
commit
234cef982c
5 changed files with 847 additions and 156 deletions
1
Cargo.lock
generated
1
Cargo.lock
generated
|
@ -1046,6 +1046,7 @@ dependencies = [
|
|||
"percent-encoding",
|
||||
"phf",
|
||||
"pin-project",
|
||||
"rand",
|
||||
"ring",
|
||||
"serde",
|
||||
"slab",
|
||||
|
|
|
@ -1425,41 +1425,6 @@ Deno.test(
|
|||
},
|
||||
);
|
||||
|
||||
Deno.test(
|
||||
{ permissions: { net: true, write: true, read: true } },
|
||||
async function httpServerCorrectSizeResponse() {
|
||||
const promise = deferred();
|
||||
const listeningPromise = deferred();
|
||||
const ac = new AbortController();
|
||||
|
||||
const tmpFile = await Deno.makeTempFile();
|
||||
const file = await Deno.open(tmpFile, { write: true, read: true });
|
||||
await file.write(new Uint8Array(70 * 1024).fill(1)); // 70kb sent in 64kb + 6kb chunks
|
||||
file.close();
|
||||
|
||||
const server = Deno.serve({
|
||||
handler: async (request) => {
|
||||
const f = await Deno.open(tmpFile, { read: true });
|
||||
promise.resolve();
|
||||
return new Response(f.readable);
|
||||
},
|
||||
port: 4503,
|
||||
signal: ac.signal,
|
||||
onListen: onListen(listeningPromise),
|
||||
onError: createOnErrorCb(ac),
|
||||
});
|
||||
|
||||
await listeningPromise;
|
||||
const resp = await fetch("http://127.0.0.1:4503/");
|
||||
await promise;
|
||||
const body = await resp.arrayBuffer();
|
||||
|
||||
assertEquals(body.byteLength, 70 * 1024);
|
||||
ac.abort();
|
||||
await server;
|
||||
},
|
||||
);
|
||||
|
||||
// https://github.com/denoland/deno/issues/12741
|
||||
// https://github.com/denoland/deno/pull/12746
|
||||
// https://github.com/denoland/deno/pull/12798
|
||||
|
@ -2012,38 +1977,146 @@ Deno.test(
|
|||
},
|
||||
);
|
||||
|
||||
Deno.test(
|
||||
{ permissions: { net: true, write: true, read: true } },
|
||||
async function httpServerSendFile() {
|
||||
const promise = deferred();
|
||||
const ac = new AbortController();
|
||||
const listeningPromise = deferred();
|
||||
const tmpFile = await Deno.makeTempFile();
|
||||
const file = await Deno.open(tmpFile, { write: true, read: true });
|
||||
const data = new Uint8Array(70 * 1024).fill(1);
|
||||
await file.write(data);
|
||||
file.close();
|
||||
const server = Deno.serve({
|
||||
handler: async () => {
|
||||
const f = await Deno.open(tmpFile, { read: true });
|
||||
promise.resolve();
|
||||
return new Response(f.readable, { status: 200 });
|
||||
},
|
||||
port: 4503,
|
||||
signal: ac.signal,
|
||||
onListen: onListen(listeningPromise),
|
||||
onError: createOnErrorCb(ac),
|
||||
});
|
||||
function makeTempData(size: number) {
|
||||
return new Uint8Array(size).fill(1);
|
||||
}
|
||||
|
||||
await listeningPromise;
|
||||
const response = await fetch(`http://localhost:4503/`);
|
||||
assertEquals(response.status, 200);
|
||||
await promise;
|
||||
assertEquals(new Uint8Array(await response.arrayBuffer()), data);
|
||||
ac.abort();
|
||||
await server;
|
||||
async function makeTempFile(size: number) {
|
||||
const tmpFile = await Deno.makeTempFile();
|
||||
const file = await Deno.open(tmpFile, { write: true, read: true });
|
||||
const data = makeTempData(size);
|
||||
await file.write(data);
|
||||
file.close();
|
||||
|
||||
return await Deno.open(tmpFile, { write: true, read: true });
|
||||
}
|
||||
|
||||
const compressionTestCases = [
|
||||
{ name: "Empty", length: 0, in: {}, out: {}, expect: null },
|
||||
{
|
||||
name: "EmptyAcceptGzip",
|
||||
length: 0,
|
||||
in: { "Accept-Encoding": "gzip" },
|
||||
out: {},
|
||||
expect: null,
|
||||
},
|
||||
);
|
||||
// This technically would be compressible if not for the size, however the size_hint is not implemented
|
||||
// for FileResource and we don't currently peek ahead on resources.
|
||||
// {
|
||||
// name: "EmptyAcceptGzip2",
|
||||
// length: 0,
|
||||
// in: { "Accept-Encoding": "gzip" },
|
||||
// out: { "Content-Type": "text/plain" },
|
||||
// expect: null,
|
||||
// },
|
||||
{ name: "Uncompressible", length: 1024, in: {}, out: {}, expect: null },
|
||||
{
|
||||
name: "UncompressibleAcceptGzip",
|
||||
length: 1024,
|
||||
in: { "Accept-Encoding": "gzip" },
|
||||
out: {},
|
||||
expect: null,
|
||||
},
|
||||
{
|
||||
name: "UncompressibleType",
|
||||
length: 1024,
|
||||
in: { "Accept-Encoding": "gzip" },
|
||||
out: { "Content-Type": "text/fake" },
|
||||
expect: null,
|
||||
},
|
||||
{
|
||||
name: "CompressibleType",
|
||||
length: 1024,
|
||||
in: { "Accept-Encoding": "gzip" },
|
||||
out: { "Content-Type": "text/plain" },
|
||||
expect: "gzip",
|
||||
},
|
||||
{
|
||||
name: "CompressibleType2",
|
||||
length: 1024,
|
||||
in: { "Accept-Encoding": "gzip, deflate, br" },
|
||||
out: { "Content-Type": "text/plain" },
|
||||
expect: "gzip",
|
||||
},
|
||||
{
|
||||
name: "UncompressibleRange",
|
||||
length: 1024,
|
||||
in: { "Accept-Encoding": "gzip" },
|
||||
out: { "Content-Type": "text/plain", "Content-Range": "1" },
|
||||
expect: null,
|
||||
},
|
||||
{
|
||||
name: "UncompressibleCE",
|
||||
length: 1024,
|
||||
in: { "Accept-Encoding": "gzip" },
|
||||
out: { "Content-Type": "text/plain", "Content-Encoding": "random" },
|
||||
expect: null,
|
||||
},
|
||||
{
|
||||
name: "UncompressibleCC",
|
||||
length: 1024,
|
||||
in: { "Accept-Encoding": "gzip" },
|
||||
out: { "Content-Type": "text/plain", "Cache-Control": "no-transform" },
|
||||
expect: null,
|
||||
},
|
||||
];
|
||||
|
||||
for (const testCase of compressionTestCases) {
|
||||
const name = `httpServerCompression${testCase.name}`;
|
||||
Deno.test(
|
||||
{ permissions: { net: true, write: true, read: true } },
|
||||
{
|
||||
[name]: async function () {
|
||||
const promise = deferred();
|
||||
const ac = new AbortController();
|
||||
const listeningPromise = deferred();
|
||||
const server = Deno.serve({
|
||||
handler: async (request) => {
|
||||
const f = await makeTempFile(testCase.length);
|
||||
promise.resolve();
|
||||
const headers = testCase.out as any;
|
||||
headers["Content-Length"] = testCase.length.toString();
|
||||
return new Response(f.readable, {
|
||||
headers: headers as HeadersInit,
|
||||
});
|
||||
},
|
||||
port: 4503,
|
||||
signal: ac.signal,
|
||||
onListen: onListen(listeningPromise),
|
||||
onError: createOnErrorCb(ac),
|
||||
});
|
||||
try {
|
||||
await listeningPromise;
|
||||
const resp = await fetch("http://127.0.0.1:4503/", {
|
||||
headers: testCase.in as HeadersInit,
|
||||
});
|
||||
await promise;
|
||||
const body = await resp.arrayBuffer();
|
||||
if (testCase.expect == null) {
|
||||
assertEquals(body.byteLength, testCase.length);
|
||||
assertEquals(
|
||||
resp.headers.get("content-length"),
|
||||
testCase.length.toString(),
|
||||
);
|
||||
assertEquals(
|
||||
resp.headers.get("content-encoding"),
|
||||
testCase.out["Content-Encoding"] || null,
|
||||
);
|
||||
} else if (testCase.expect == "gzip") {
|
||||
// Note the fetch will transparently decompress this response, BUT we can detect that a response
|
||||
// was compressed by the lack of a content length.
|
||||
assertEquals(body.byteLength, testCase.length);
|
||||
assertEquals(resp.headers.get("content-encoding"), null);
|
||||
assertEquals(resp.headers.get("content-length"), null);
|
||||
}
|
||||
} finally {
|
||||
ac.abort();
|
||||
await server;
|
||||
}
|
||||
},
|
||||
}[name],
|
||||
);
|
||||
}
|
||||
|
||||
Deno.test(
|
||||
{ permissions: { net: true, write: true, read: true } },
|
||||
|
@ -2052,15 +2125,12 @@ Deno.test(
|
|||
const ac = new AbortController();
|
||||
const listeningPromise = deferred();
|
||||
|
||||
const tmpFile = await Deno.makeTempFile();
|
||||
const file = await Deno.open(tmpFile, { write: true, read: true });
|
||||
const data = new Uint8Array(70 * 1024).fill(1);
|
||||
await file.write(data);
|
||||
file.close();
|
||||
|
||||
const server = Deno.serve({
|
||||
handler: async (request) => {
|
||||
assertEquals(new Uint8Array(await request.arrayBuffer()), data);
|
||||
assertEquals(
|
||||
new Uint8Array(await request.arrayBuffer()),
|
||||
makeTempData(70 * 1024),
|
||||
);
|
||||
promise.resolve();
|
||||
return new Response("ok");
|
||||
},
|
||||
|
@ -2071,7 +2141,7 @@ Deno.test(
|
|||
});
|
||||
|
||||
await listeningPromise;
|
||||
const f = await Deno.open(tmpFile, { write: true, read: true });
|
||||
const f = await makeTempFile(70 * 1024);
|
||||
const response = await fetch(`http://localhost:4503/`, {
|
||||
method: "POST",
|
||||
body: f.readable,
|
||||
|
|
|
@ -50,3 +50,4 @@ tokio-util = { workspace = true, features = ["io"] }
|
|||
|
||||
[dev-dependencies]
|
||||
bencher.workspace = true
|
||||
rand.workspace = true
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
use crate::compressible::is_content_compressible;
|
||||
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
|
||||
use crate::extract_network_stream;
|
||||
use crate::network_buffered_stream::NetworkStreamPrefixCheck;
|
||||
|
@ -7,17 +8,18 @@ use crate::request_properties::HttpConnectionProperties;
|
|||
use crate::request_properties::HttpListenProperties;
|
||||
use crate::request_properties::HttpPropertyExtractor;
|
||||
use crate::response_body::CompletionHandle;
|
||||
use crate::response_body::Compression;
|
||||
use crate::response_body::ResponseBytes;
|
||||
use crate::response_body::ResponseBytesInner;
|
||||
use crate::response_body::V8StreamHttpResponseBody;
|
||||
use crate::websocket_upgrade::WebSocketUpgrade;
|
||||
use crate::LocalExecutor;
|
||||
use cache_control::CacheControl;
|
||||
use deno_core::error::AnyError;
|
||||
use deno_core::futures::TryFutureExt;
|
||||
use deno_core::op;
|
||||
use deno_core::AsyncRefCell;
|
||||
use deno_core::AsyncResult;
|
||||
use deno_core::BufView;
|
||||
use deno_core::ByteString;
|
||||
use deno_core::CancelFuture;
|
||||
use deno_core::CancelHandle;
|
||||
|
@ -31,7 +33,15 @@ use deno_net::ops_tls::TlsStream;
|
|||
use deno_net::raw::put_network_stream_resource;
|
||||
use deno_net::raw::NetworkStream;
|
||||
use deno_net::raw::NetworkStreamAddress;
|
||||
use fly_accept_encoding::Encoding;
|
||||
use http::header::ACCEPT_ENCODING;
|
||||
use http::header::CACHE_CONTROL;
|
||||
use http::header::CONTENT_ENCODING;
|
||||
use http::header::CONTENT_LENGTH;
|
||||
use http::header::CONTENT_RANGE;
|
||||
use http::header::CONTENT_TYPE;
|
||||
use http::request::Parts;
|
||||
use http::HeaderMap;
|
||||
use hyper1::body::Incoming;
|
||||
use hyper1::header::COOKIE;
|
||||
use hyper1::http::HeaderName;
|
||||
|
@ -483,6 +493,131 @@ pub fn op_http_set_response_headers(
|
|||
})
|
||||
}
|
||||
|
||||
fn is_request_compressible(headers: &HeaderMap) -> Compression {
|
||||
let Some(accept_encoding) = headers.get(ACCEPT_ENCODING) else {
|
||||
return Compression::None;
|
||||
};
|
||||
// Firefox and Chrome send this -- no need to parse
|
||||
if accept_encoding == "gzip, deflate, br" {
|
||||
return Compression::GZip;
|
||||
}
|
||||
if accept_encoding == "gzip" {
|
||||
return Compression::GZip;
|
||||
}
|
||||
// Fall back to the expensive parser
|
||||
let accepted = fly_accept_encoding::encodings_iter(headers).filter(|r| {
|
||||
matches!(r, Ok((Some(Encoding::Identity | Encoding::Gzip), _)))
|
||||
});
|
||||
#[allow(clippy::single_match)]
|
||||
match fly_accept_encoding::preferred(accepted) {
|
||||
Ok(Some(fly_accept_encoding::Encoding::Gzip)) => return Compression::GZip,
|
||||
_ => {}
|
||||
}
|
||||
Compression::None
|
||||
}
|
||||
|
||||
fn is_response_compressible(headers: &HeaderMap) -> bool {
|
||||
if let Some(content_type) = headers.get(CONTENT_TYPE) {
|
||||
if !is_content_compressible(content_type) {
|
||||
return false;
|
||||
}
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
if headers.contains_key(CONTENT_ENCODING) {
|
||||
return false;
|
||||
}
|
||||
if headers.contains_key(CONTENT_RANGE) {
|
||||
return false;
|
||||
}
|
||||
if let Some(cache_control) = headers.get(CACHE_CONTROL) {
|
||||
if let Ok(s) = std::str::from_utf8(cache_control.as_bytes()) {
|
||||
if let Some(cache_control) = CacheControl::from_value(s) {
|
||||
if cache_control.no_transform {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
true
|
||||
}
|
||||
|
||||
fn modify_compressibility_from_response(
|
||||
compression: Compression,
|
||||
length: Option<usize>,
|
||||
headers: &mut HeaderMap,
|
||||
) -> Compression {
|
||||
ensure_vary_accept_encoding(headers);
|
||||
if let Some(length) = length {
|
||||
// By the time we add compression headers and Accept-Encoding, it probably doesn't make sense
|
||||
// to compress stuff that's smaller than this.
|
||||
if length < 64 {
|
||||
return Compression::None;
|
||||
}
|
||||
}
|
||||
if compression == Compression::None {
|
||||
return Compression::None;
|
||||
}
|
||||
if !is_response_compressible(headers) {
|
||||
return Compression::None;
|
||||
}
|
||||
weaken_etag(headers);
|
||||
headers.remove(CONTENT_LENGTH);
|
||||
headers.insert(CONTENT_ENCODING, HeaderValue::from_static("gzip"));
|
||||
compression
|
||||
}
|
||||
|
||||
/// If the user provided a ETag header for uncompressed data, we need to ensure it is a
|
||||
/// weak Etag header ("W/").
|
||||
fn weaken_etag(hmap: &mut HeaderMap) {
|
||||
if let Some(etag) = hmap.get_mut(hyper::header::ETAG) {
|
||||
if !etag.as_bytes().starts_with(b"W/") {
|
||||
let mut v = Vec::with_capacity(etag.as_bytes().len() + 2);
|
||||
v.extend(b"W/");
|
||||
v.extend(etag.as_bytes());
|
||||
*etag = v.try_into().unwrap();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Set Vary: Accept-Encoding header for direct body response.
|
||||
// Note: we set the header irrespective of whether or not we compress the data
|
||||
// to make sure cache services do not serve uncompressed data to clients that
|
||||
// support compression.
|
||||
fn ensure_vary_accept_encoding(hmap: &mut HeaderMap) {
|
||||
if let Some(v) = hmap.get_mut(hyper::header::VARY) {
|
||||
if let Ok(s) = v.to_str() {
|
||||
if !s.to_lowercase().contains("accept-encoding") {
|
||||
*v = format!("Accept-Encoding, {s}").try_into().unwrap()
|
||||
}
|
||||
return;
|
||||
}
|
||||
}
|
||||
hmap.insert(
|
||||
hyper::header::VARY,
|
||||
HeaderValue::from_static("Accept-Encoding"),
|
||||
);
|
||||
}
|
||||
|
||||
fn set_response(
|
||||
index: u32,
|
||||
length: Option<usize>,
|
||||
response_fn: impl FnOnce(Compression) -> ResponseBytesInner,
|
||||
) {
|
||||
let compression =
|
||||
with_req(index, |req| is_request_compressible(&req.headers));
|
||||
|
||||
with_resp_mut(index, move |response| {
|
||||
let response = response.as_mut().unwrap();
|
||||
let compression = modify_compressibility_from_response(
|
||||
compression,
|
||||
length,
|
||||
response.headers_mut(),
|
||||
);
|
||||
response.body_mut().initialize(response_fn(compression))
|
||||
});
|
||||
}
|
||||
|
||||
#[op(fast)]
|
||||
pub fn op_http_set_response_body_resource(
|
||||
state: &mut OpState,
|
||||
|
@ -497,14 +632,13 @@ pub fn op_http_set_response_body_resource(
|
|||
state.resource_table.get_any(stream_rid)?
|
||||
};
|
||||
|
||||
with_resp_mut(index, move |response| {
|
||||
let future = resource.clone().read(64 * 1024);
|
||||
response
|
||||
.as_mut()
|
||||
.unwrap()
|
||||
.body_mut()
|
||||
.initialize(ResponseBytesInner::Resource(auto_close, resource, future));
|
||||
});
|
||||
set_response(
|
||||
index,
|
||||
resource.size_hint().1.map(|s| s as usize),
|
||||
move |compression| {
|
||||
ResponseBytesInner::from_resource(compression, resource, auto_close)
|
||||
},
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
@ -516,27 +650,19 @@ pub fn op_http_set_response_body_stream(
|
|||
) -> Result<ResourceId, AnyError> {
|
||||
// TODO(mmastrac): what should this channel size be?
|
||||
let (tx, rx) = tokio::sync::mpsc::channel(1);
|
||||
let (tx, rx) = (
|
||||
V8StreamHttpResponseBody::new(tx),
|
||||
ResponseBytesInner::V8Stream(rx),
|
||||
);
|
||||
|
||||
with_resp_mut(index, move |response| {
|
||||
response.as_mut().unwrap().body_mut().initialize(rx);
|
||||
set_response(index, None, |compression| {
|
||||
ResponseBytesInner::from_v8(compression, rx)
|
||||
});
|
||||
|
||||
Ok(state.resource_table.add(tx))
|
||||
Ok(state.resource_table.add(V8StreamHttpResponseBody::new(tx)))
|
||||
}
|
||||
|
||||
#[op(fast)]
|
||||
pub fn op_http_set_response_body_text(index: u32, text: String) {
|
||||
if !text.is_empty() {
|
||||
with_resp_mut(index, move |response| {
|
||||
response
|
||||
.as_mut()
|
||||
.unwrap()
|
||||
.body_mut()
|
||||
.initialize(ResponseBytesInner::Bytes(BufView::from(text.into_bytes())))
|
||||
set_response(index, Some(text.len()), |compression| {
|
||||
ResponseBytesInner::from_vec(compression, text.into_bytes())
|
||||
});
|
||||
}
|
||||
}
|
||||
|
@ -544,12 +670,8 @@ pub fn op_http_set_response_body_text(index: u32, text: String) {
|
|||
#[op(fast)]
|
||||
pub fn op_http_set_response_body_bytes(index: u32, buffer: &[u8]) {
|
||||
if !buffer.is_empty() {
|
||||
with_resp_mut(index, |response| {
|
||||
response
|
||||
.as_mut()
|
||||
.unwrap()
|
||||
.body_mut()
|
||||
.initialize(ResponseBytesInner::Bytes(BufView::from(buffer.to_vec())))
|
||||
set_response(index, Some(buffer.len()), |compression| {
|
||||
ResponseBytesInner::from_slice(compression, buffer)
|
||||
});
|
||||
};
|
||||
}
|
||||
|
|
|
@ -2,12 +2,16 @@
|
|||
use std::borrow::Cow;
|
||||
use std::cell::RefCell;
|
||||
use std::future::Future;
|
||||
use std::io::Write;
|
||||
use std::pin::Pin;
|
||||
use std::rc::Rc;
|
||||
use std::task::Waker;
|
||||
|
||||
use bytes::Bytes;
|
||||
use bytes::BytesMut;
|
||||
use deno_core::error::bad_resource;
|
||||
use deno_core::error::AnyError;
|
||||
use deno_core::futures::ready;
|
||||
use deno_core::futures::FutureExt;
|
||||
use deno_core::AsyncRefCell;
|
||||
use deno_core::AsyncResult;
|
||||
|
@ -17,9 +21,44 @@ use deno_core::CancelTryFuture;
|
|||
use deno_core::RcRef;
|
||||
use deno_core::Resource;
|
||||
use deno_core::WriteOutcome;
|
||||
use flate2::write::GzEncoder;
|
||||
use http::HeaderMap;
|
||||
use hyper1::body::Body;
|
||||
use hyper1::body::Frame;
|
||||
use hyper1::body::SizeHint;
|
||||
use pin_project::pin_project;
|
||||
|
||||
/// Simplification for nested types we use for our streams. We provide a way to convert from
|
||||
/// this type into Hyper's body [`Frame`].
|
||||
enum ResponseStreamResult {
|
||||
/// Stream is over.
|
||||
EndOfStream,
|
||||
/// Stream provided non-empty data.
|
||||
NonEmptyBuf(BufView),
|
||||
/// Stream is ready, but provided no data. Retry. This is a result that is like Pending, but does
|
||||
/// not register a waker and should be called again at the lowest level of this code. Generally this
|
||||
/// will only be returned from compression streams that require additional buffering.
|
||||
NoData,
|
||||
/// Stream provided trailers.
|
||||
// TODO(mmastrac): We are threading trailers through the response system to eventually support Grpc.
|
||||
#[allow(unused)]
|
||||
Trailers(HeaderMap),
|
||||
/// Stream failed.
|
||||
Error(AnyError),
|
||||
}
|
||||
|
||||
impl From<ResponseStreamResult> for Option<Result<Frame<BufView>, AnyError>> {
|
||||
fn from(value: ResponseStreamResult) -> Self {
|
||||
match value {
|
||||
ResponseStreamResult::EndOfStream => None,
|
||||
ResponseStreamResult::NonEmptyBuf(buf) => Some(Ok(Frame::data(buf))),
|
||||
ResponseStreamResult::Error(err) => Some(Err(err)),
|
||||
ResponseStreamResult::Trailers(map) => Some(Ok(Frame::trailers(map))),
|
||||
// This result should be handled by retrying
|
||||
ResponseStreamResult::NoData => unimplemented!(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Default)]
|
||||
pub struct CompletionHandle {
|
||||
|
@ -62,6 +101,28 @@ impl Future for CompletionHandle {
|
|||
}
|
||||
}
|
||||
|
||||
trait PollFrame: Unpin {
|
||||
fn poll_frame(
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut std::task::Context<'_>,
|
||||
) -> std::task::Poll<ResponseStreamResult>;
|
||||
|
||||
fn size_hint(&self) -> SizeHint;
|
||||
}
|
||||
|
||||
#[derive(PartialEq, Eq)]
|
||||
pub enum Compression {
|
||||
None,
|
||||
GZip,
|
||||
}
|
||||
|
||||
pub enum ResponseStream {
|
||||
/// A resource stream, piped in fast mode.
|
||||
Resource(ResourceBodyAdapter),
|
||||
/// A JS-backed stream, written in JS and transported via pipe.
|
||||
V8Stream(tokio::sync::mpsc::Receiver<BufView>),
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
pub enum ResponseBytesInner {
|
||||
/// An empty stream.
|
||||
|
@ -69,12 +130,12 @@ pub enum ResponseBytesInner {
|
|||
Empty,
|
||||
/// A completed stream.
|
||||
Done,
|
||||
/// A static buffer of bytes, sent it one fell swoop.
|
||||
/// A static buffer of bytes, sent in one fell swoop.
|
||||
Bytes(BufView),
|
||||
/// A resource stream, piped in fast mode.
|
||||
Resource(bool, Rc<dyn Resource>, AsyncResult<BufView>),
|
||||
/// A JS-backed stream, written in JS and transported via pipe.
|
||||
V8Stream(tokio::sync::mpsc::Receiver<BufView>),
|
||||
/// An uncompressed stream.
|
||||
UncompressedStream(ResponseStream),
|
||||
/// A GZip stream.
|
||||
GZipStream(GZipResponseStream),
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for ResponseBytesInner {
|
||||
|
@ -83,8 +144,8 @@ impl std::fmt::Debug for ResponseBytesInner {
|
|||
Self::Done => f.write_str("Done"),
|
||||
Self::Empty => f.write_str("Empty"),
|
||||
Self::Bytes(..) => f.write_str("Bytes"),
|
||||
Self::Resource(..) => f.write_str("Resource"),
|
||||
Self::V8Stream(..) => f.write_str("V8Stream"),
|
||||
Self::UncompressedStream(..) => f.write_str("Uncompressed"),
|
||||
Self::GZipStream(..) => f.write_str("GZip"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -122,16 +183,54 @@ impl ResponseBytesInner {
|
|||
Self::Done => SizeHint::with_exact(0),
|
||||
Self::Empty => SizeHint::with_exact(0),
|
||||
Self::Bytes(bytes) => SizeHint::with_exact(bytes.len() as u64),
|
||||
Self::Resource(_, res, _) => {
|
||||
let hint = res.size_hint();
|
||||
let mut size_hint = SizeHint::new();
|
||||
size_hint.set_lower(hint.0);
|
||||
if let Some(upper) = hint.1 {
|
||||
size_hint.set_upper(upper)
|
||||
}
|
||||
size_hint
|
||||
}
|
||||
Self::V8Stream(..) => SizeHint::default(),
|
||||
Self::UncompressedStream(res) => res.size_hint(),
|
||||
Self::GZipStream(..) => SizeHint::default(),
|
||||
}
|
||||
}
|
||||
|
||||
fn from_stream(compression: Compression, stream: ResponseStream) -> Self {
|
||||
if compression == Compression::GZip {
|
||||
Self::GZipStream(GZipResponseStream::new(stream))
|
||||
} else {
|
||||
Self::UncompressedStream(stream)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn from_v8(
|
||||
compression: Compression,
|
||||
rx: tokio::sync::mpsc::Receiver<BufView>,
|
||||
) -> Self {
|
||||
Self::from_stream(compression, ResponseStream::V8Stream(rx))
|
||||
}
|
||||
|
||||
pub fn from_resource(
|
||||
compression: Compression,
|
||||
stm: Rc<dyn Resource>,
|
||||
auto_close: bool,
|
||||
) -> Self {
|
||||
Self::from_stream(
|
||||
compression,
|
||||
ResponseStream::Resource(ResourceBodyAdapter::new(stm, auto_close)),
|
||||
)
|
||||
}
|
||||
|
||||
pub fn from_slice(compression: Compression, bytes: &[u8]) -> Self {
|
||||
if compression == Compression::GZip {
|
||||
let mut writer = GzEncoder::new(Vec::new(), flate2::Compression::fast());
|
||||
writer.write_all(bytes).unwrap();
|
||||
Self::Bytes(BufView::from(writer.finish().unwrap()))
|
||||
} else {
|
||||
Self::Bytes(BufView::from(bytes.to_vec()))
|
||||
}
|
||||
}
|
||||
|
||||
pub fn from_vec(compression: Compression, vec: Vec<u8>) -> Self {
|
||||
if compression == Compression::GZip {
|
||||
let mut writer = GzEncoder::new(Vec::new(), flate2::Compression::fast());
|
||||
writer.write_all(&vec).unwrap();
|
||||
Self::Bytes(BufView::from(writer.finish().unwrap()))
|
||||
} else {
|
||||
Self::Bytes(BufView::from(vec))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -144,48 +243,33 @@ impl Body for ResponseBytes {
|
|||
mut self: Pin<&mut Self>,
|
||||
cx: &mut std::task::Context<'_>,
|
||||
) -> std::task::Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
|
||||
match &mut self.0 {
|
||||
ResponseBytesInner::Done | ResponseBytesInner::Empty => {
|
||||
unreachable!()
|
||||
}
|
||||
ResponseBytesInner::Bytes(..) => {
|
||||
if let ResponseBytesInner::Bytes(data) = self.complete(true) {
|
||||
std::task::Poll::Ready(Some(Ok(Frame::data(data))))
|
||||
} else {
|
||||
let res = loop {
|
||||
let res = match &mut self.0 {
|
||||
ResponseBytesInner::Done | ResponseBytesInner::Empty => {
|
||||
unreachable!()
|
||||
}
|
||||
ResponseBytesInner::Bytes(..) => {
|
||||
let ResponseBytesInner::Bytes(data) = self.complete(true) else { unreachable!(); };
|
||||
return std::task::Poll::Ready(Some(Ok(Frame::data(data))));
|
||||
}
|
||||
ResponseBytesInner::UncompressedStream(stm) => {
|
||||
ready!(Pin::new(stm).poll_frame(cx))
|
||||
}
|
||||
ResponseBytesInner::GZipStream(stm) => {
|
||||
ready!(Pin::new(stm).poll_frame(cx))
|
||||
}
|
||||
};
|
||||
// This is where we retry the NoData response
|
||||
if matches!(res, ResponseStreamResult::NoData) {
|
||||
continue;
|
||||
}
|
||||
ResponseBytesInner::Resource(auto_close, stm, ref mut future) => {
|
||||
match future.poll_unpin(cx) {
|
||||
std::task::Poll::Pending => std::task::Poll::Pending,
|
||||
std::task::Poll::Ready(Err(err)) => {
|
||||
std::task::Poll::Ready(Some(Err(err)))
|
||||
}
|
||||
std::task::Poll::Ready(Ok(buf)) => {
|
||||
if buf.is_empty() {
|
||||
if *auto_close {
|
||||
stm.clone().close();
|
||||
}
|
||||
self.complete(true);
|
||||
return std::task::Poll::Ready(None);
|
||||
}
|
||||
// Re-arm the future
|
||||
*future = stm.clone().read(64 * 1024);
|
||||
std::task::Poll::Ready(Some(Ok(Frame::data(buf))))
|
||||
}
|
||||
}
|
||||
}
|
||||
ResponseBytesInner::V8Stream(stm) => match stm.poll_recv(cx) {
|
||||
std::task::Poll::Pending => std::task::Poll::Pending,
|
||||
std::task::Poll::Ready(Some(buf)) => {
|
||||
std::task::Poll::Ready(Some(Ok(Frame::data(buf))))
|
||||
}
|
||||
std::task::Poll::Ready(None) => {
|
||||
self.complete(true);
|
||||
std::task::Poll::Ready(None)
|
||||
}
|
||||
},
|
||||
break res;
|
||||
};
|
||||
|
||||
if matches!(res, ResponseStreamResult::EndOfStream) {
|
||||
self.complete(true);
|
||||
}
|
||||
std::task::Poll::Ready(res.into())
|
||||
}
|
||||
|
||||
fn is_end_stream(&self) -> bool {
|
||||
|
@ -206,6 +290,243 @@ impl Drop for ResponseBytes {
|
|||
}
|
||||
}
|
||||
|
||||
pub struct ResourceBodyAdapter {
|
||||
auto_close: bool,
|
||||
stm: Rc<dyn Resource>,
|
||||
future: AsyncResult<BufView>,
|
||||
}
|
||||
|
||||
impl ResourceBodyAdapter {
|
||||
pub fn new(stm: Rc<dyn Resource>, auto_close: bool) -> Self {
|
||||
let future = stm.clone().read(64 * 1024);
|
||||
ResourceBodyAdapter {
|
||||
auto_close,
|
||||
stm,
|
||||
future,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl PollFrame for ResponseStream {
|
||||
fn poll_frame(
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut std::task::Context<'_>,
|
||||
) -> std::task::Poll<ResponseStreamResult> {
|
||||
match &mut *self {
|
||||
ResponseStream::Resource(res) => Pin::new(res).poll_frame(cx),
|
||||
ResponseStream::V8Stream(res) => Pin::new(res).poll_frame(cx),
|
||||
}
|
||||
}
|
||||
|
||||
fn size_hint(&self) -> SizeHint {
|
||||
match self {
|
||||
ResponseStream::Resource(res) => res.size_hint(),
|
||||
ResponseStream::V8Stream(res) => res.size_hint(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl PollFrame for ResourceBodyAdapter {
|
||||
fn poll_frame(
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut std::task::Context<'_>,
|
||||
) -> std::task::Poll<ResponseStreamResult> {
|
||||
let res = match ready!(self.future.poll_unpin(cx)) {
|
||||
Err(err) => ResponseStreamResult::Error(err),
|
||||
Ok(buf) => {
|
||||
if buf.is_empty() {
|
||||
if self.auto_close {
|
||||
self.stm.clone().close();
|
||||
}
|
||||
ResponseStreamResult::EndOfStream
|
||||
} else {
|
||||
// Re-arm the future
|
||||
self.future = self.stm.clone().read(64 * 1024);
|
||||
ResponseStreamResult::NonEmptyBuf(buf)
|
||||
}
|
||||
}
|
||||
};
|
||||
std::task::Poll::Ready(res)
|
||||
}
|
||||
|
||||
fn size_hint(&self) -> SizeHint {
|
||||
let hint = self.stm.size_hint();
|
||||
let mut size_hint = SizeHint::new();
|
||||
size_hint.set_lower(hint.0);
|
||||
if let Some(upper) = hint.1 {
|
||||
size_hint.set_upper(upper)
|
||||
}
|
||||
size_hint
|
||||
}
|
||||
}
|
||||
|
||||
impl PollFrame for tokio::sync::mpsc::Receiver<BufView> {
|
||||
fn poll_frame(
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut std::task::Context<'_>,
|
||||
) -> std::task::Poll<ResponseStreamResult> {
|
||||
let res = match ready!(self.poll_recv(cx)) {
|
||||
Some(buf) => ResponseStreamResult::NonEmptyBuf(buf),
|
||||
None => ResponseStreamResult::EndOfStream,
|
||||
};
|
||||
std::task::Poll::Ready(res)
|
||||
}
|
||||
|
||||
fn size_hint(&self) -> SizeHint {
|
||||
SizeHint::default()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone, Debug)]
|
||||
enum GZipState {
|
||||
Header,
|
||||
Streaming,
|
||||
Flushing,
|
||||
Trailer,
|
||||
EndOfStream,
|
||||
}
|
||||
|
||||
#[pin_project]
|
||||
pub struct GZipResponseStream {
|
||||
stm: flate2::Compress,
|
||||
crc: flate2::Crc,
|
||||
next_buf: Option<BytesMut>,
|
||||
partial: Option<BufView>,
|
||||
#[pin]
|
||||
underlying: ResponseStream,
|
||||
state: GZipState,
|
||||
}
|
||||
|
||||
impl GZipResponseStream {
|
||||
pub fn new(underlying: ResponseStream) -> Self {
|
||||
Self {
|
||||
stm: flate2::Compress::new(flate2::Compression::fast(), false),
|
||||
crc: flate2::Crc::new(),
|
||||
next_buf: None,
|
||||
partial: None,
|
||||
state: GZipState::Header,
|
||||
underlying,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// This is a minimal GZip header suitable for serving data from a webserver. We don't need to provide
|
||||
/// most of the information. We're skipping header name, CRC, etc, and providing a null timestamp.
|
||||
///
|
||||
/// We're using compression level 1, as higher levels don't produce significant size differences. This
|
||||
/// is probably the reason why nginx's default gzip compression level is also 1:
|
||||
///
|
||||
/// https://nginx.org/en/docs/http/ngx_http_gzip_module.html#gzip_comp_level
|
||||
static GZIP_HEADER: Bytes =
|
||||
Bytes::from_static(&[0x1f, 0x8b, 0x08, 0, 0, 0, 0, 0, 0x01, 0xff]);
|
||||
|
||||
impl PollFrame for GZipResponseStream {
|
||||
fn poll_frame(
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut std::task::Context<'_>,
|
||||
) -> std::task::Poll<ResponseStreamResult> {
|
||||
let this = self.get_mut();
|
||||
let state = &mut this.state;
|
||||
let orig_state = *state;
|
||||
let frame = match *state {
|
||||
GZipState::EndOfStream => {
|
||||
return std::task::Poll::Ready(ResponseStreamResult::EndOfStream)
|
||||
}
|
||||
GZipState::Header => {
|
||||
*state = GZipState::Streaming;
|
||||
return std::task::Poll::Ready(ResponseStreamResult::NonEmptyBuf(
|
||||
BufView::from(GZIP_HEADER.clone()),
|
||||
));
|
||||
}
|
||||
GZipState::Trailer => {
|
||||
*state = GZipState::EndOfStream;
|
||||
let mut v = Vec::with_capacity(8);
|
||||
v.extend(&this.crc.sum().to_le_bytes());
|
||||
v.extend(&this.crc.amount().to_le_bytes());
|
||||
return std::task::Poll::Ready(ResponseStreamResult::NonEmptyBuf(
|
||||
BufView::from(v),
|
||||
));
|
||||
}
|
||||
GZipState::Streaming => {
|
||||
if let Some(partial) = this.partial.take() {
|
||||
ResponseStreamResult::NonEmptyBuf(partial)
|
||||
} else {
|
||||
ready!(Pin::new(&mut this.underlying).poll_frame(cx))
|
||||
}
|
||||
}
|
||||
GZipState::Flushing => ResponseStreamResult::EndOfStream,
|
||||
};
|
||||
|
||||
let stm = &mut this.stm;
|
||||
|
||||
// Ideally we could use MaybeUninit here, but flate2 requires &[u8]. We should also try
|
||||
// to dynamically adjust this buffer.
|
||||
let mut buf = this
|
||||
.next_buf
|
||||
.take()
|
||||
.unwrap_or_else(|| BytesMut::zeroed(64 * 1024));
|
||||
|
||||
let start_in = stm.total_in();
|
||||
let start_out = stm.total_out();
|
||||
let res = match frame {
|
||||
// Short-circuit these and just return
|
||||
x @ (ResponseStreamResult::NoData
|
||||
| ResponseStreamResult::Error(..)
|
||||
| ResponseStreamResult::Trailers(..)) => {
|
||||
return std::task::Poll::Ready(x)
|
||||
}
|
||||
ResponseStreamResult::EndOfStream => {
|
||||
*state = GZipState::Flushing;
|
||||
stm.compress(&[], &mut buf, flate2::FlushCompress::Finish)
|
||||
}
|
||||
ResponseStreamResult::NonEmptyBuf(mut input) => {
|
||||
let res = stm.compress(&input, &mut buf, flate2::FlushCompress::None);
|
||||
let len_in = (stm.total_in() - start_in) as usize;
|
||||
debug_assert!(len_in <= input.len());
|
||||
this.crc.update(&input[..len_in]);
|
||||
if len_in < input.len() {
|
||||
input.advance_cursor(len_in);
|
||||
this.partial = Some(input);
|
||||
}
|
||||
res
|
||||
}
|
||||
};
|
||||
let len = stm.total_out() - start_out;
|
||||
let res = match res {
|
||||
Err(err) => ResponseStreamResult::Error(err.into()),
|
||||
Ok(flate2::Status::BufError) => {
|
||||
// This should not happen
|
||||
unreachable!("old={orig_state:?} new={state:?} buf_len={}", buf.len());
|
||||
}
|
||||
Ok(flate2::Status::Ok) => {
|
||||
if len == 0 {
|
||||
this.next_buf = Some(buf);
|
||||
ResponseStreamResult::NoData
|
||||
} else {
|
||||
buf.truncate(len as usize);
|
||||
ResponseStreamResult::NonEmptyBuf(BufView::from(buf.freeze()))
|
||||
}
|
||||
}
|
||||
Ok(flate2::Status::StreamEnd) => {
|
||||
*state = GZipState::Trailer;
|
||||
if len == 0 {
|
||||
this.next_buf = Some(buf);
|
||||
ResponseStreamResult::NoData
|
||||
} else {
|
||||
buf.truncate(len as usize);
|
||||
ResponseStreamResult::NonEmptyBuf(BufView::from(buf.freeze()))
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
std::task::Poll::Ready(res)
|
||||
}
|
||||
|
||||
fn size_hint(&self) -> SizeHint {
|
||||
SizeHint::default()
|
||||
}
|
||||
}
|
||||
|
||||
/// A response body object that can be passed to V8. This body will feed byte buffers to a channel which
|
||||
/// feed's hyper's HTTP response.
|
||||
pub struct V8StreamHttpResponseBody(
|
||||
|
@ -251,3 +572,179 @@ impl Resource for V8StreamHttpResponseBody {
|
|||
self.1.cancel();
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use deno_core::futures::future::poll_fn;
|
||||
use std::hash::Hasher;
|
||||
use std::io::Read;
|
||||
use std::io::Write;
|
||||
|
||||
fn zeros() -> Vec<u8> {
|
||||
vec![0; 1024 * 1024]
|
||||
}
|
||||
|
||||
fn hard_to_gzip_data() -> Vec<u8> {
|
||||
const SIZE: usize = 1024 * 1024;
|
||||
let mut v = Vec::with_capacity(SIZE);
|
||||
let mut hasher = std::collections::hash_map::DefaultHasher::new();
|
||||
for i in 0..SIZE {
|
||||
hasher.write_usize(i);
|
||||
v.push(hasher.finish() as u8);
|
||||
}
|
||||
v
|
||||
}
|
||||
|
||||
fn already_gzipped_data() -> Vec<u8> {
|
||||
let mut v = Vec::with_capacity(1024 * 1024);
|
||||
let mut gz =
|
||||
flate2::GzBuilder::new().write(&mut v, flate2::Compression::best());
|
||||
gz.write_all(&hard_to_gzip_data()).unwrap();
|
||||
_ = gz.finish().unwrap();
|
||||
v
|
||||
}
|
||||
|
||||
fn chunk(v: Vec<u8>) -> impl Iterator<Item = Vec<u8>> {
|
||||
// Chunk the data into 10k
|
||||
let mut out = vec![];
|
||||
for v in v.chunks(10 * 1024) {
|
||||
out.push(v.to_vec());
|
||||
}
|
||||
out.into_iter()
|
||||
}
|
||||
|
||||
fn random(mut v: Vec<u8>) -> impl Iterator<Item = Vec<u8>> {
|
||||
let mut out = vec![];
|
||||
loop {
|
||||
if v.is_empty() {
|
||||
break;
|
||||
}
|
||||
let rand = (rand::random::<usize>() % v.len()) + 1;
|
||||
let new = v.split_off(rand);
|
||||
out.push(v);
|
||||
v = new;
|
||||
}
|
||||
// Print the lengths of the vectors if we actually fail this test at some point
|
||||
let lengths = out.iter().map(|v| v.len()).collect::<Vec<_>>();
|
||||
eprintln!("Lengths = {:?}", lengths);
|
||||
out.into_iter()
|
||||
}
|
||||
|
||||
fn front_load(mut v: Vec<u8>) -> impl Iterator<Item = Vec<u8>> {
|
||||
// Chunk the data at 90%
|
||||
let offset = (v.len() * 90) / 100;
|
||||
let v2 = v.split_off(offset);
|
||||
vec![v, v2].into_iter()
|
||||
}
|
||||
|
||||
fn front_load_but_one(mut v: Vec<u8>) -> impl Iterator<Item = Vec<u8>> {
|
||||
let offset = v.len() - 1;
|
||||
let v2 = v.split_off(offset);
|
||||
vec![v, v2].into_iter()
|
||||
}
|
||||
|
||||
fn back_load(mut v: Vec<u8>) -> impl Iterator<Item = Vec<u8>> {
|
||||
// Chunk the data at 10%
|
||||
let offset = (v.len() * 10) / 100;
|
||||
let v2 = v.split_off(offset);
|
||||
vec![v, v2].into_iter()
|
||||
}
|
||||
|
||||
async fn test(i: impl Iterator<Item = Vec<u8>> + Send + 'static) {
|
||||
let v = i.collect::<Vec<_>>();
|
||||
let mut expected: Vec<u8> = vec![];
|
||||
for v in &v {
|
||||
expected.extend(v);
|
||||
}
|
||||
let (tx, rx) = tokio::sync::mpsc::channel(1);
|
||||
let underlying = ResponseStream::V8Stream(rx);
|
||||
let mut resp = GZipResponseStream::new(underlying);
|
||||
let handle = tokio::task::spawn(async move {
|
||||
for chunk in v {
|
||||
tx.send(chunk.into()).await.ok().unwrap();
|
||||
}
|
||||
});
|
||||
// Limit how many times we'll loop
|
||||
const LIMIT: usize = 1000;
|
||||
let mut v: Vec<u8> = vec![];
|
||||
for i in 0..=LIMIT {
|
||||
assert_ne!(i, LIMIT);
|
||||
let frame = poll_fn(|cx| Pin::new(&mut resp).poll_frame(cx)).await;
|
||||
if matches!(frame, ResponseStreamResult::EndOfStream) {
|
||||
break;
|
||||
}
|
||||
if matches!(frame, ResponseStreamResult::NoData) {
|
||||
continue;
|
||||
}
|
||||
let ResponseStreamResult::NonEmptyBuf(buf) = frame else {
|
||||
panic!("Unexpected stream type");
|
||||
};
|
||||
assert_ne!(buf.len(), 0);
|
||||
v.extend(&*buf);
|
||||
}
|
||||
|
||||
let mut gz = flate2::read::GzDecoder::new(&*v);
|
||||
let mut v = vec![];
|
||||
gz.read_to_end(&mut v).unwrap();
|
||||
|
||||
assert_eq!(v, expected);
|
||||
|
||||
handle.await.unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_simple() {
|
||||
test(vec![b"hello world".to_vec()].into_iter()).await
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_empty() {
|
||||
test(vec![].into_iter()).await
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_simple_zeros() {
|
||||
test(vec![vec![0; 0x10000]].into_iter()).await
|
||||
}
|
||||
|
||||
macro_rules! test {
|
||||
($vec:ident) => {
|
||||
mod $vec {
|
||||
#[tokio::test]
|
||||
async fn chunk() {
|
||||
let iter = super::chunk(super::$vec());
|
||||
super::test(iter).await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn front_load() {
|
||||
let iter = super::front_load(super::$vec());
|
||||
super::test(iter).await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn front_load_but_one() {
|
||||
let iter = super::front_load_but_one(super::$vec());
|
||||
super::test(iter).await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn back_load() {
|
||||
let iter = super::back_load(super::$vec());
|
||||
super::test(iter).await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn random() {
|
||||
let iter = super::random(super::$vec());
|
||||
super::test(iter).await;
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
test!(zeros);
|
||||
test!(hard_to_gzip_data);
|
||||
test!(already_gzipped_data);
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue