From 9cd9a68b002601c43455f150312beae676bbb824 Mon Sep 17 00:00:00 2001 From: Luca Casonato Date: Tue, 20 Dec 2022 09:46:45 +0100 Subject: [PATCH] fix(ext/http): close stream on resp body error (#17126) Previously, errored streaming response bodies did not cause the HTTP stream to be aborted. It instead caused the stream to be closed gracefully, which had the result that the client could not detect the difference between a successful response and an errored response. This commit fixes the issue by aborting the stream on error. --- Cargo.lock | 1 + cli/tests/unit/http_test.ts | 123 ++++++++++++++++++++++++++++ ext/http/01_http.js | 15 ++-- ext/http/Cargo.toml | 1 + ext/http/lib.rs | 100 +++++++++++++++++------ ext/http/reader_stream.rs | 157 ++++++++++++++++++++++++++++++++++++ 6 files changed, 369 insertions(+), 28 deletions(-) create mode 100644 ext/http/reader_stream.rs diff --git a/Cargo.lock b/Cargo.lock index 6ae7e6b81f..665a0901ef 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1095,6 +1095,7 @@ dependencies = [ "mime", "percent-encoding", "phf", + "pin-project", "ring", "serde", "tokio", diff --git a/cli/tests/unit/http_test.ts b/cli/tests/unit/http_test.ts index 3475513620..73bf07b68c 100644 --- a/cli/tests/unit/http_test.ts +++ b/cli/tests/unit/http_test.ts @@ -2614,6 +2614,129 @@ Deno.test({ }, }); +async function httpServerWithErrorBody( + listener: Deno.Listener, + compression: boolean, +): Promise { + const conn = await listener.accept(); + listener.close(); + const httpConn = Deno.serveHttp(conn); + const e = await httpConn.nextRequest(); + assert(e); + const { respondWith } = e; + const originalErr = new Error("boom"); + const rs = new ReadableStream({ + async start(controller) { + controller.enqueue(new Uint8Array([65])); + await delay(1000); + controller.error(originalErr); + }, + }); + const init = compression ? { headers: { "content-type": "text/plain" } } : {}; + const response = new Response(rs, init); + const err = await assertRejects(() => respondWith(response)); + assert(err === originalErr); + return httpConn; +} + +for (const compression of [true, false]) { + Deno.test({ + name: `http server errors stream if response body errors (http/1.1${ + compression ? " + compression" : "" + })`, + permissions: { net: true }, + async fn() { + const hostname = "localhost"; + const port = 4501; + + const listener = Deno.listen({ hostname, port }); + const server = httpServerWithErrorBody(listener, compression); + + const conn = await Deno.connect({ hostname, port }); + const msg = new TextEncoder().encode( + `GET / HTTP/1.1\r\nHost: ${hostname}:${port}\r\n\r\n`, + ); + const nwritten = await conn.write(msg); + assertEquals(nwritten, msg.byteLength); + + const buf = new Uint8Array(1024); + const nread = await conn.read(buf); + assert(nread); + const data = new TextDecoder().decode(buf.subarray(0, nread)); + assert(data.endsWith("1\r\nA\r\n")); + const nread2 = await conn.read(buf); // connection should be closed now because the stream errored + assertEquals(nread2, null); + conn.close(); + + const httpConn = await server; + httpConn.close(); + }, + }); + + Deno.test({ + name: `http server errors stream if response body errors (http/1.1 + fetch${ + compression ? " + compression" : "" + })`, + permissions: { net: true }, + async fn() { + const hostname = "localhost"; + const port = 4501; + + const listener = Deno.listen({ hostname, port }); + const server = httpServerWithErrorBody(listener, compression); + + const resp = await fetch(`http://${hostname}:${port}/`); + assert(resp.body); + const reader = resp.body.getReader(); + const result = await reader.read(); + assert(!result.done); + assertEquals(result.value, new Uint8Array([65])); + const err = await assertRejects(() => reader.read()); + assert(err instanceof TypeError); + assert(err.message.includes("unexpected EOF")); + + const httpConn = await server; + httpConn.close(); + }, + }); + + Deno.test({ + name: `http server errors stream if response body errors (http/2 + fetch${ + compression ? " + compression" : "" + }))`, + permissions: { net: true, read: true }, + async fn() { + const hostname = "localhost"; + const port = 4501; + + const listener = Deno.listenTls({ + hostname, + port, + certFile: "cli/tests/testdata/tls/localhost.crt", + keyFile: "cli/tests/testdata/tls/localhost.key", + alpnProtocols: ["h2"], + }); + const server = httpServerWithErrorBody(listener, compression); + + const caCert = Deno.readTextFileSync("cli/tests/testdata/tls/RootCA.pem"); + const client = Deno.createHttpClient({ caCerts: [caCert] }); + const resp = await fetch(`https://${hostname}:${port}/`, { client }); + client.close(); + assert(resp.body); + const reader = resp.body.getReader(); + const result = await reader.read(); + assert(!result.done); + assertEquals(result.value, new Uint8Array([65])); + const err = await assertRejects(() => reader.read()); + assert(err instanceof TypeError); + assert(err.message.includes("unexpected internal error encountered")); + + const httpConn = await server; + httpConn.close(); + }, + }); +} + function chunkedBodyReader(h: Headers, r: BufReader): Deno.Reader { // Based on https://tools.ietf.org/html/rfc2616#section-19.4.6 const tp = new TextProtoReader(r); diff --git a/ext/http/01_http.js b/ext/http/01_http.js index bd740b600c..dfb0f206cf 100644 --- a/ext/http/01_http.js +++ b/ext/http/01_http.js @@ -263,6 +263,7 @@ } if (isStreamingResponseBody) { + let success = false; if ( respBody === null || !ObjectPrototypeIsPrototypeOf(ReadableStreamPrototype, respBody) @@ -284,6 +285,7 @@ ); if (resourceBacking.autoClose) core.tryClose(resourceBacking.rid); readableStreamClose(respBody); // Release JS lock. + success = true; } catch (error) { const connError = httpConn[connErrorSymbol]; if ( @@ -320,13 +322,16 @@ throw error; } } + success = true; } - try { - await core.opAsync("op_http_shutdown", streamRid); - } catch (error) { - await reader.cancel(error); - throw error; + if (success) { + try { + await core.opAsync("op_http_shutdown", streamRid); + } catch (error) { + await reader.cancel(error); + throw error; + } } } diff --git a/ext/http/Cargo.toml b/ext/http/Cargo.toml index 2f4ae31e64..65cd4ccfef 100644 --- a/ext/http/Cargo.toml +++ b/ext/http/Cargo.toml @@ -31,6 +31,7 @@ hyper = { workspace = true, features = ["server", "stream", "http1", "http2", "r mime = "0.3.16" percent-encoding.workspace = true phf = { version = "0.10", features = ["macros"] } +pin-project.workspace = true ring.workspace = true serde.workspace = true tokio.workspace = true diff --git a/ext/http/lib.rs b/ext/http/lib.rs index af117d3f92..812394d94b 100644 --- a/ext/http/lib.rs +++ b/ext/http/lib.rs @@ -70,9 +70,12 @@ use tokio::io::AsyncRead; use tokio::io::AsyncWrite; use tokio::io::AsyncWriteExt; use tokio::task::spawn_local; -use tokio_util::io::ReaderStream; + +use crate::reader_stream::ExternallyAbortableReaderStream; +use crate::reader_stream::ShutdownHandle; pub mod compressible; +mod reader_stream; pub fn init() -> Extension { Extension::builder() @@ -414,8 +417,11 @@ impl Default for HttpRequestReader { /// The write half of an HTTP stream. enum HttpResponseWriter { Headers(oneshot::Sender>), - Body(Pin>), - BodyUncompressed(hyper::body::Sender), + Body { + writer: Pin>, + shutdown_handle: ShutdownHandle, + }, + BodyUncompressed(BodyUncompressedSender), Closed, } @@ -425,6 +431,36 @@ impl Default for HttpResponseWriter { } } +struct BodyUncompressedSender(Option); + +impl BodyUncompressedSender { + fn sender(&mut self) -> &mut hyper::body::Sender { + // This is safe because we only ever take the sender out of the option + // inside of the shutdown method. + self.0.as_mut().unwrap() + } + + fn shutdown(mut self) { + // take the sender out of self so that when self is dropped at the end of + // this block, it doesn't get aborted + self.0.take(); + } +} + +impl From for BodyUncompressedSender { + fn from(sender: hyper::body::Sender) -> Self { + BodyUncompressedSender(Some(sender)) + } +} + +impl Drop for BodyUncompressedSender { + fn drop(&mut self) { + if let Some(sender) = self.0.take() { + sender.abort(); + } + } +} + // We use a tuple instead of struct to avoid serialization overhead of the keys. #[derive(Serialize)] #[serde(rename_all = "camelCase")] @@ -668,14 +704,22 @@ fn http_response( Encoding::Gzip => Box::pin(GzipEncoder::new(writer)), _ => unreachable!(), // forbidden by accepts_compression }; + let (stream, shutdown_handle) = + ExternallyAbortableReaderStream::new(reader); Ok(( - HttpResponseWriter::Body(writer), - Body::wrap_stream(ReaderStream::new(reader)), + HttpResponseWriter::Body { + writer, + shutdown_handle, + }, + Body::wrap_stream(stream), )) } None => { let (body_tx, body_rx) = Body::channel(); - Ok((HttpResponseWriter::BodyUncompressed(body_tx), body_rx)) + Ok(( + HttpResponseWriter::BodyUncompressed(body_tx.into()), + body_rx, + )) } } } @@ -768,10 +812,10 @@ async fn op_http_write_resource( } match &mut *wr { - HttpResponseWriter::Body(body) => { - let mut result = body.write_all(&view).await; + HttpResponseWriter::Body { writer, .. } => { + let mut result = writer.write_all(&view).await; if result.is_ok() { - result = body.flush().await; + result = writer.flush().await; } if let Err(err) = result { assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe); @@ -784,7 +828,7 @@ async fn op_http_write_resource( } HttpResponseWriter::BodyUncompressed(body) => { let bytes = Bytes::from(view); - if let Err(err) = body.send_data(bytes).await { + if let Err(err) = body.sender().send_data(bytes).await { assert!(err.is_closed()); // Pull up the failure associated with the transport connection instead. http_stream.conn.closed().await?; @@ -813,10 +857,10 @@ async fn op_http_write( match &mut *wr { HttpResponseWriter::Headers(_) => Err(http_error("no response headers")), HttpResponseWriter::Closed => Err(http_error("response already completed")), - HttpResponseWriter::Body(body) => { - let mut result = body.write_all(&buf).await; + HttpResponseWriter::Body { writer, .. } => { + let mut result = writer.write_all(&buf).await; if result.is_ok() { - result = body.flush().await; + result = writer.flush().await; } match result { Ok(_) => Ok(()), @@ -833,7 +877,7 @@ async fn op_http_write( } HttpResponseWriter::BodyUncompressed(body) => { let bytes = Bytes::from(buf); - match body.send_data(bytes).await { + match body.sender().send_data(bytes).await { Ok(_) => Ok(()), Err(err) => { assert!(err.is_closed()); @@ -862,17 +906,27 @@ async fn op_http_shutdown( .get::(rid)?; let mut wr = RcRef::map(&stream, |r| &r.wr).borrow_mut().await; let wr = take(&mut *wr); - if let HttpResponseWriter::Body(mut body_writer) = wr { - match body_writer.shutdown().await { - Ok(_) => {} - Err(err) => { - assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe); - // Don't return "broken pipe", that's an implementation detail. - // Pull up the failure associated with the transport connection instead. - stream.conn.closed().await?; + match wr { + HttpResponseWriter::Body { + mut writer, + shutdown_handle, + } => { + shutdown_handle.shutdown(); + match writer.shutdown().await { + Ok(_) => {} + Err(err) => { + assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe); + // Don't return "broken pipe", that's an implementation detail. + // Pull up the failure associated with the transport connection instead. + stream.conn.closed().await?; + } } } - } + HttpResponseWriter::BodyUncompressed(body) => { + body.shutdown(); + } + _ => {} + }; Ok(()) } diff --git a/ext/http/reader_stream.rs b/ext/http/reader_stream.rs new file mode 100644 index 0000000000..388b8db814 --- /dev/null +++ b/ext/http/reader_stream.rs @@ -0,0 +1,157 @@ +// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license. + +use std::pin::Pin; +use std::sync::atomic::AtomicBool; +use std::sync::atomic::Ordering; +use std::sync::Arc; +use std::task::Context; +use std::task::Poll; + +use bytes::Bytes; +use deno_core::futures::Stream; +use pin_project::pin_project; +use tokio::io::AsyncRead; +use tokio_util::io::ReaderStream; + +/// [ExternallyAbortableByteStream] adapts a [tokio::AsyncRead] into a [Stream]. +/// It is used to bridge between the HTTP response body resource, and +/// `hyper::Body`. The stream has the special property that it errors if the +/// underlying reader is closed before an explicit EOF is sent (in the form of +/// setting the `shutdown` flag to true). +#[pin_project] +pub struct ExternallyAbortableReaderStream { + #[pin] + inner: ReaderStream, + done: Arc, +} + +pub struct ShutdownHandle(Arc); + +impl ShutdownHandle { + pub fn shutdown(&self) { + self.0.store(true, std::sync::atomic::Ordering::SeqCst); + } +} + +impl ExternallyAbortableReaderStream { + pub fn new(reader: R) -> (Self, ShutdownHandle) { + let done = Arc::new(AtomicBool::new(false)); + let this = Self { + inner: ReaderStream::new(reader), + done: done.clone(), + }; + (this, ShutdownHandle(done)) + } +} + +impl Stream for ExternallyAbortableReaderStream { + type Item = std::io::Result; + + fn poll_next( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + let this = self.project(); + let val = std::task::ready!(this.inner.poll_next(cx)); + match val { + None if this.done.load(Ordering::SeqCst) => Poll::Ready(None), + None => Poll::Ready(Some(Err(std::io::Error::new( + std::io::ErrorKind::UnexpectedEof, + "stream reader has shut down", + )))), + Some(val) => Poll::Ready(Some(val)), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use bytes::Bytes; + use deno_core::futures::StreamExt; + use tokio::io::AsyncWriteExt; + + #[tokio::test] + async fn success() { + let (a, b) = tokio::io::duplex(64 * 1024); + let (reader, _) = tokio::io::split(a); + let (_, mut writer) = tokio::io::split(b); + + let (mut stream, shutdown_handle) = + ExternallyAbortableReaderStream::new(reader); + + writer.write_all(b"hello").await.unwrap(); + assert_eq!(stream.next().await.unwrap().unwrap(), Bytes::from("hello")); + + writer.write_all(b"world").await.unwrap(); + assert_eq!(stream.next().await.unwrap().unwrap(), Bytes::from("world")); + + shutdown_handle.shutdown(); + writer.shutdown().await.unwrap(); + drop(writer); + assert!(stream.next().await.is_none()); + } + + #[tokio::test] + async fn error() { + let (a, b) = tokio::io::duplex(64 * 1024); + let (reader, _) = tokio::io::split(a); + let (_, mut writer) = tokio::io::split(b); + + let (mut stream, _shutdown_handle) = + ExternallyAbortableReaderStream::new(reader); + + writer.write_all(b"hello").await.unwrap(); + assert_eq!(stream.next().await.unwrap().unwrap(), Bytes::from("hello")); + + drop(writer); + assert_eq!( + stream.next().await.unwrap().unwrap_err().kind(), + std::io::ErrorKind::UnexpectedEof + ); + } + + #[tokio::test] + async fn error2() { + let (a, b) = tokio::io::duplex(64 * 1024); + let (reader, _) = tokio::io::split(a); + let (_, mut writer) = tokio::io::split(b); + + let (mut stream, _shutdown_handle) = + ExternallyAbortableReaderStream::new(reader); + + writer.write_all(b"hello").await.unwrap(); + assert_eq!(stream.next().await.unwrap().unwrap(), Bytes::from("hello")); + + writer.shutdown().await.unwrap(); + drop(writer); + assert_eq!( + stream.next().await.unwrap().unwrap_err().kind(), + std::io::ErrorKind::UnexpectedEof + ); + } + + #[tokio::test] + async fn write_after_shutdown() { + let (a, b) = tokio::io::duplex(64 * 1024); + let (reader, _) = tokio::io::split(a); + let (_, mut writer) = tokio::io::split(b); + + let (mut stream, shutdown_handle) = + ExternallyAbortableReaderStream::new(reader); + + writer.write_all(b"hello").await.unwrap(); + assert_eq!(stream.next().await.unwrap().unwrap(), Bytes::from("hello")); + + writer.write_all(b"world").await.unwrap(); + assert_eq!(stream.next().await.unwrap().unwrap(), Bytes::from("world")); + + shutdown_handle.shutdown(); + writer.shutdown().await.unwrap(); + + assert!(writer.write_all(b"!").await.is_err()); + + drop(writer); + assert!(stream.next().await.is_none()); + } +}