mirror of
https://github.com/denoland/deno.git
synced 2024-11-25 15:29:32 -05:00
cleanup(ext/http): simpler http write ops (#14552)
Facilitates making `op_http_write_headers` sync and thus faster
This commit is contained in:
parent
385c8eef56
commit
510ab505e7
2 changed files with 187 additions and 197 deletions
377
ext/http/lib.rs
377
ext/http/lib.rs
|
@ -39,6 +39,8 @@ use flate2::write::GzEncoder;
|
|||
use flate2::Compression;
|
||||
use fly_accept_encoding::Encoding;
|
||||
use hyper::body::Bytes;
|
||||
use hyper::header::HeaderName;
|
||||
use hyper::header::HeaderValue;
|
||||
use hyper::server::conn::Http;
|
||||
use hyper::service::Service;
|
||||
use hyper::Body;
|
||||
|
@ -500,177 +502,46 @@ async fn op_http_write_headers(
|
|||
|
||||
let mut builder = Response::builder().status(status);
|
||||
|
||||
let mut body_compressible = false;
|
||||
let mut headers_allow_compression = true;
|
||||
let mut vary_header = None;
|
||||
let mut etag_header = None;
|
||||
let mut content_type_header = None;
|
||||
// Add headers
|
||||
let header_count = headers.len();
|
||||
let headers = headers.into_iter().filter_map(|(k, v)| {
|
||||
let v: Vec<u8> = v.into();
|
||||
Some((
|
||||
HeaderName::try_from(k.as_slice()).ok()?,
|
||||
HeaderValue::try_from(v).ok()?,
|
||||
))
|
||||
});
|
||||
// Track supported encoding
|
||||
let encoding = *stream.accept_encoding.borrow();
|
||||
|
||||
builder.headers_mut().unwrap().reserve(headers.len());
|
||||
for (key, value) in &headers {
|
||||
if key.eq_ignore_ascii_case(b"cache-control") {
|
||||
if let Ok(value) = std::str::from_utf8(value) {
|
||||
if let Some(cache_control) = CacheControl::from_value(value) {
|
||||
// We skip compression if the cache-control header value is set to
|
||||
// "no-transform"
|
||||
if cache_control.no_transform {
|
||||
headers_allow_compression = false;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
headers_allow_compression = false;
|
||||
}
|
||||
} else if key.eq_ignore_ascii_case(b"content-range") {
|
||||
// we skip compression if the `content-range` header value is set, as it
|
||||
// indicates the contents of the body were negotiated based directly
|
||||
// with the user code and we can't compress the response
|
||||
headers_allow_compression = false;
|
||||
} else if key.eq_ignore_ascii_case(b"content-type") && !value.is_empty() {
|
||||
content_type_header = Some(value);
|
||||
} else if key.eq_ignore_ascii_case(b"content-encoding") {
|
||||
// we don't compress if a content-encoding header was provided
|
||||
headers_allow_compression = false;
|
||||
} else if key.eq_ignore_ascii_case(b"etag") && !value.is_empty() {
|
||||
// we store the values of ETag and Vary and skip adding them for now, as
|
||||
// we may need to modify or change.
|
||||
etag_header = Some(value);
|
||||
continue;
|
||||
} else if key.eq_ignore_ascii_case(b"vary") && !value.is_empty() {
|
||||
vary_header = Some(value);
|
||||
continue;
|
||||
}
|
||||
builder = builder.header(key.as_slice(), value.as_slice());
|
||||
}
|
||||
let hmap = builder.headers_mut().unwrap();
|
||||
hmap.reserve(header_count + 2);
|
||||
hmap.extend(headers);
|
||||
ensure_vary_accept_encoding(hmap);
|
||||
|
||||
if headers_allow_compression {
|
||||
body_compressible = content_type_header
|
||||
.map(compressible::is_content_compressible)
|
||||
.unwrap_or_default();
|
||||
}
|
||||
|
||||
let body: Response<Body>;
|
||||
let new_wr: HttpResponseWriter;
|
||||
|
||||
// Set Vary: Accept-Encoding header for direct body response.
|
||||
// Note: we set the header irrespective of whether or not we compress the data
|
||||
// to make sure cache services do not serve uncompressed data to clients that
|
||||
// support compression.
|
||||
let vary_value = if let Some(value) = vary_header {
|
||||
if let Ok(value_str) = std::str::from_utf8(value.as_slice()) {
|
||||
if !value_str.to_lowercase().contains("accept-encoding") {
|
||||
format!("Accept-Encoding, {}", value_str)
|
||||
} else {
|
||||
value_str.to_string()
|
||||
}
|
||||
} else {
|
||||
// the header value wasn't valid UTF8, so it would have been a
|
||||
// problem anyways, so sending a default header.
|
||||
"Accept-Encoding".to_string()
|
||||
}
|
||||
} else {
|
||||
"Accept-Encoding".to_string()
|
||||
};
|
||||
builder = builder.header("vary", &vary_value);
|
||||
|
||||
let accepts_compression = matches!(
|
||||
*stream.accept_encoding.borrow(),
|
||||
Encoding::Brotli | Encoding::Gzip
|
||||
);
|
||||
let should_compress = body_compressible
|
||||
let accepts_compression =
|
||||
matches!(encoding, Encoding::Brotli | Encoding::Gzip);
|
||||
let compressing = accepts_compression
|
||||
&& (matches!(data, Some(ref data) if data.len() > 20) || data.is_none())
|
||||
&& accepts_compression;
|
||||
&& should_compress(hmap);
|
||||
|
||||
if should_compress {
|
||||
// If user provided a ETag header for uncompressed data, we need to
|
||||
// ensure it is a Weak Etag header ("W/").
|
||||
if let Some(value) = etag_header {
|
||||
if let Ok(value_str) = std::str::from_utf8(value.as_slice()) {
|
||||
if !value_str.starts_with("W/") {
|
||||
builder = builder.header("etag", format!("W/{}", value_str));
|
||||
} else {
|
||||
builder = builder.header("etag", value.as_slice());
|
||||
}
|
||||
} else {
|
||||
builder = builder.header("etag", value.as_slice());
|
||||
}
|
||||
}
|
||||
if compressing {
|
||||
weaken_etag(hmap);
|
||||
// Drop 'content-length' header. Hyper will update it using compressed body.
|
||||
if let Some(headers) = builder.headers_mut() {
|
||||
headers.remove("content-length");
|
||||
}
|
||||
} else if let Some(value) = etag_header {
|
||||
builder = builder.header("etag", value.as_slice());
|
||||
hmap.remove(hyper::header::CONTENT_LENGTH);
|
||||
// Content-Encoding header
|
||||
hmap.insert(
|
||||
hyper::header::CONTENT_ENCODING,
|
||||
HeaderValue::from_static(match encoding {
|
||||
Encoding::Brotli => "br",
|
||||
Encoding::Gzip => "gzip",
|
||||
_ => unreachable!(), // Forbidden by accepts_compression
|
||||
}),
|
||||
);
|
||||
}
|
||||
|
||||
match data {
|
||||
Some(data) => {
|
||||
if should_compress {
|
||||
match *stream.accept_encoding.borrow() {
|
||||
Encoding::Brotli => {
|
||||
builder = builder.header("content-encoding", "br");
|
||||
// quality level 6 is based on google's nginx default value for
|
||||
// on-the-fly compression
|
||||
// https://github.com/google/ngx_brotli#brotli_comp_level
|
||||
// lgwin 22 is equivalent to brotli window size of (2**22)-16 bytes
|
||||
// (~4MB)
|
||||
let mut writer =
|
||||
brotli::CompressorWriter::new(Vec::new(), 4096, 6, 22);
|
||||
writer.write_all(&data)?;
|
||||
body = builder.body(writer.into_inner().into())?;
|
||||
}
|
||||
_ => {
|
||||
assert_eq!(*stream.accept_encoding.borrow(), Encoding::Gzip);
|
||||
builder = builder.header("content-encoding", "gzip");
|
||||
// Gzip, after level 1, doesn't produce significant size difference.
|
||||
// Probably the reason why nginx's default gzip compression level is
|
||||
// 1.
|
||||
// https://nginx.org/en/docs/http/ngx_http_gzip_module.html#gzip_comp_level
|
||||
let mut writer = GzEncoder::new(Vec::new(), Compression::new(1));
|
||||
writer.write_all(&data)?;
|
||||
body = builder.body(writer.finish()?.into())?;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// If a buffer was passed, but isn't compressible, we use it to
|
||||
// construct a response body.
|
||||
body = builder.body(Bytes::copy_from_slice(&data).into())?;
|
||||
}
|
||||
new_wr = HttpResponseWriter::Closed;
|
||||
}
|
||||
None => {
|
||||
// If no buffer was passed, the caller will stream the response body.
|
||||
if should_compress {
|
||||
// Create a one way pipe that implements tokio's async io traits. To do
|
||||
// this we create a [tokio::io::DuplexStream], but then throw away one
|
||||
// of the directions to create a one way pipe.
|
||||
let (a, b) = tokio::io::duplex(64 * 1024);
|
||||
let (reader, _) = tokio::io::split(a);
|
||||
let (_, writer) = tokio::io::split(b);
|
||||
|
||||
let writer_body: Pin<Box<dyn tokio::io::AsyncWrite>>;
|
||||
match *stream.accept_encoding.borrow() {
|
||||
Encoding::Brotli => {
|
||||
let writer = BrotliEncoder::new(writer);
|
||||
writer_body = Box::pin(writer);
|
||||
builder = builder.header("content-encoding", "br");
|
||||
}
|
||||
_ => {
|
||||
assert_eq!(*stream.accept_encoding.borrow(), Encoding::Gzip);
|
||||
let writer = GzipEncoder::new(writer);
|
||||
writer_body = Box::pin(writer);
|
||||
builder = builder.header("content-encoding", "gzip");
|
||||
}
|
||||
}
|
||||
|
||||
body = builder.body(Body::wrap_stream(ReaderStream::new(reader)))?;
|
||||
new_wr = HttpResponseWriter::Body(writer_body);
|
||||
} else {
|
||||
let (body_tx, body_rx) = Body::channel();
|
||||
body = builder.body(body_rx)?;
|
||||
new_wr = HttpResponseWriter::BodyUncompressed(body_tx);
|
||||
}
|
||||
}
|
||||
}
|
||||
let (new_wr, body) = http_response(data, compressing, encoding)?;
|
||||
let body = builder.body(body)?;
|
||||
|
||||
let mut old_wr = RcRef::map(&stream, |r| &r.wr).borrow_mut().await;
|
||||
let response_tx = match replace(&mut *old_wr, new_wr) {
|
||||
|
@ -687,6 +558,122 @@ async fn op_http_write_headers(
|
|||
}
|
||||
}
|
||||
|
||||
fn http_response(
|
||||
data: Option<StringOrBuffer>,
|
||||
compressing: bool,
|
||||
encoding: Encoding,
|
||||
) -> Result<(HttpResponseWriter, hyper::Body), AnyError> {
|
||||
match data {
|
||||
Some(data) if compressing => match encoding {
|
||||
Encoding::Brotli => {
|
||||
// quality level 6 is based on google's nginx default value for
|
||||
// on-the-fly compression
|
||||
// https://github.com/google/ngx_brotli#brotli_comp_level
|
||||
// lgwin 22 is equivalent to brotli window size of (2**22)-16 bytes
|
||||
// (~4MB)
|
||||
let mut writer = brotli::CompressorWriter::new(Vec::new(), 4096, 6, 22);
|
||||
writer.write_all(&data)?;
|
||||
Ok((HttpResponseWriter::Closed, writer.into_inner().into()))
|
||||
}
|
||||
Encoding::Gzip => {
|
||||
// Gzip, after level 1, doesn't produce significant size difference.
|
||||
// Probably the reason why nginx's default gzip compression level is
|
||||
// 1.
|
||||
// https://nginx.org/en/docs/http/ngx_http_gzip_module.html#gzip_comp_level
|
||||
let mut writer = GzEncoder::new(Vec::new(), Compression::new(1));
|
||||
writer.write_all(&data)?;
|
||||
Ok((HttpResponseWriter::Closed, writer.finish()?.into()))
|
||||
}
|
||||
_ => unreachable!(), // forbidden by accepts_compression
|
||||
},
|
||||
Some(data) => {
|
||||
// If a buffer was passed, but isn't compressible, we use it to
|
||||
// construct a response body.
|
||||
Ok((
|
||||
HttpResponseWriter::Closed,
|
||||
Bytes::copy_from_slice(&data).into(),
|
||||
))
|
||||
}
|
||||
None if compressing => {
|
||||
// Create a one way pipe that implements tokio's async io traits. To do
|
||||
// this we create a [tokio::io::DuplexStream], but then throw away one
|
||||
// of the directions to create a one way pipe.
|
||||
let (a, b) = tokio::io::duplex(64 * 1024);
|
||||
let (reader, _) = tokio::io::split(a);
|
||||
let (_, writer) = tokio::io::split(b);
|
||||
let writer: Pin<Box<dyn tokio::io::AsyncWrite>> = match encoding {
|
||||
Encoding::Brotli => Box::pin(BrotliEncoder::new(writer)),
|
||||
Encoding::Gzip => Box::pin(GzipEncoder::new(writer)),
|
||||
_ => unreachable!(), // forbidden by accepts_compression
|
||||
};
|
||||
Ok((
|
||||
HttpResponseWriter::Body(writer),
|
||||
Body::wrap_stream(ReaderStream::new(reader)),
|
||||
))
|
||||
}
|
||||
None => {
|
||||
let (body_tx, body_rx) = Body::channel();
|
||||
Ok((HttpResponseWriter::BodyUncompressed(body_tx), body_rx))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// If user provided a ETag header for uncompressed data, we need to
|
||||
// ensure it is a Weak Etag header ("W/").
|
||||
fn weaken_etag(hmap: &mut hyper::HeaderMap) {
|
||||
if let Some(etag) = hmap.get_mut(hyper::header::ETAG) {
|
||||
if !etag.as_bytes().starts_with(b"W/") {
|
||||
let mut v = Vec::with_capacity(etag.as_bytes().len() + 2);
|
||||
v.extend(b"W/");
|
||||
v.extend(etag.as_bytes());
|
||||
*etag = v.try_into().unwrap();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Set Vary: Accept-Encoding header for direct body response.
|
||||
// Note: we set the header irrespective of whether or not we compress the data
|
||||
// to make sure cache services do not serve uncompressed data to clients that
|
||||
// support compression.
|
||||
fn ensure_vary_accept_encoding(hmap: &mut hyper::HeaderMap) {
|
||||
if let Some(v) = hmap.get_mut(hyper::header::VARY) {
|
||||
if let Ok(s) = v.to_str() {
|
||||
if !s.to_lowercase().contains("accept-encoding") {
|
||||
*v = format!("Accept-Encoding, {}", s).try_into().unwrap()
|
||||
}
|
||||
return;
|
||||
}
|
||||
}
|
||||
hmap.insert(
|
||||
hyper::header::VARY,
|
||||
HeaderValue::from_static("Accept-Encoding"),
|
||||
);
|
||||
}
|
||||
|
||||
fn should_compress(headers: &hyper::HeaderMap) -> bool {
|
||||
// skip compression if the cache-control header value is set to "no-transform" or not utf8
|
||||
fn cache_control_no_transform(headers: &hyper::HeaderMap) -> Option<bool> {
|
||||
let v = headers.get(hyper::header::CACHE_CONTROL)?;
|
||||
let s = match std::str::from_utf8(v.as_bytes()) {
|
||||
Ok(s) => s,
|
||||
Err(_) => return Some(true),
|
||||
};
|
||||
let c = CacheControl::from_value(s)?;
|
||||
Some(c.no_transform)
|
||||
}
|
||||
// we skip compression if the `content-range` header value is set, as it
|
||||
// indicates the contents of the body were negotiated based directly
|
||||
// with the user code and we can't compress the response
|
||||
let content_range = headers.contains_key(hyper::header::CONTENT_RANGE);
|
||||
|
||||
!content_range
|
||||
&& !cache_control_no_transform(headers).unwrap_or_default()
|
||||
&& headers
|
||||
.get(hyper::header::CONTENT_TYPE)
|
||||
.map(compressible::is_content_compressible)
|
||||
.unwrap_or_default()
|
||||
}
|
||||
|
||||
#[op]
|
||||
async fn op_http_write_resource(
|
||||
state: Rc<RefCell<OpState>>,
|
||||
|
@ -757,42 +744,38 @@ async fn op_http_write(
|
|||
.get::<HttpStreamResource>(rid)?;
|
||||
let mut wr = RcRef::map(&stream, |r| &r.wr).borrow_mut().await;
|
||||
|
||||
loop {
|
||||
match &mut *wr {
|
||||
HttpResponseWriter::Headers(_) => {
|
||||
break Err(http_error("no response headers"))
|
||||
match &mut *wr {
|
||||
HttpResponseWriter::Headers(_) => Err(http_error("no response headers")),
|
||||
HttpResponseWriter::Closed => Err(http_error("response already completed")),
|
||||
HttpResponseWriter::Body(body) => {
|
||||
let mut result = body.write_all(&buf).await;
|
||||
if result.is_ok() {
|
||||
result = body.flush().await;
|
||||
}
|
||||
HttpResponseWriter::Closed => {
|
||||
break Err(http_error("response already completed"))
|
||||
}
|
||||
HttpResponseWriter::Body(body) => {
|
||||
let mut result = body.write_all(&buf).await;
|
||||
if result.is_ok() {
|
||||
result = body.flush().await;
|
||||
}
|
||||
match result {
|
||||
Ok(_) => break Ok(()),
|
||||
Err(err) => {
|
||||
assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe);
|
||||
// Don't return "broken pipe", that's an implementation detail.
|
||||
// Pull up the failure associated with the transport connection instead.
|
||||
stream.conn.closed().await?;
|
||||
// If there was no connection error, drop body_tx.
|
||||
*wr = HttpResponseWriter::Closed;
|
||||
}
|
||||
match result {
|
||||
Ok(_) => Ok(()),
|
||||
Err(err) => {
|
||||
assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe);
|
||||
// Don't return "broken pipe", that's an implementation detail.
|
||||
// Pull up the failure associated with the transport connection instead.
|
||||
stream.conn.closed().await?;
|
||||
// If there was no connection error, drop body_tx.
|
||||
*wr = HttpResponseWriter::Closed;
|
||||
Err(http_error("response already completed"))
|
||||
}
|
||||
}
|
||||
HttpResponseWriter::BodyUncompressed(body) => {
|
||||
let bytes = Bytes::copy_from_slice(&buf[..]);
|
||||
match body.send_data(bytes).await {
|
||||
Ok(_) => break Ok(()),
|
||||
Err(err) => {
|
||||
assert!(err.is_closed());
|
||||
// Pull up the failure associated with the transport connection instead.
|
||||
stream.conn.closed().await?;
|
||||
// If there was no connection error, drop body_tx.
|
||||
*wr = HttpResponseWriter::Closed;
|
||||
}
|
||||
}
|
||||
HttpResponseWriter::BodyUncompressed(body) => {
|
||||
let bytes = Bytes::copy_from_slice(&buf[..]);
|
||||
match body.send_data(bytes).await {
|
||||
Ok(_) => Ok(()),
|
||||
Err(err) => {
|
||||
assert!(err.is_closed());
|
||||
// Pull up the failure associated with the transport connection instead.
|
||||
stream.conn.closed().await?;
|
||||
// If there was no connection error, drop body_tx.
|
||||
*wr = HttpResponseWriter::Closed;
|
||||
Err(http_error("response already completed"))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -46,3 +46,10 @@ impl FromV8 for ByteString {
|
|||
Ok(buffer.into())
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(clippy::from_over_into)]
|
||||
impl Into<Vec<u8>> for ByteString {
|
||||
fn into(self) -> Vec<u8> {
|
||||
self.0
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue