1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2024-11-01 09:24:20 -04:00
denoland-deno/ext/http/reader_stream.rs
Luca Casonato 8e947bb674
fix(ext/http): close stream on resp body error (#17126)
Previously, errored streaming response bodies did not cause the HTTP
stream to be aborted. It instead caused the stream to be closed gracefully,
which had the result that the client could not detect the difference
between a successful response and an errored response.

This commit fixes the issue by aborting the stream on error.
2022-12-20 08:46:45 +00:00

157 lines
4.4 KiB
Rust

// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license.
use std::pin::Pin;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::task::Context;
use std::task::Poll;
use bytes::Bytes;
use deno_core::futures::Stream;
use pin_project::pin_project;
use tokio::io::AsyncRead;
use tokio_util::io::ReaderStream;
/// [ExternallyAbortableByteStream] adapts a [tokio::AsyncRead] into a [Stream].
/// It is used to bridge between the HTTP response body resource, and
/// `hyper::Body`. The stream has the special property that it errors if the
/// underlying reader is closed before an explicit EOF is sent (in the form of
/// setting the `shutdown` flag to true).
#[pin_project]
pub struct ExternallyAbortableReaderStream<R: AsyncRead> {
#[pin]
inner: ReaderStream<R>,
done: Arc<AtomicBool>,
}
pub struct ShutdownHandle(Arc<AtomicBool>);
impl ShutdownHandle {
pub fn shutdown(&self) {
self.0.store(true, std::sync::atomic::Ordering::SeqCst);
}
}
impl<R: AsyncRead> ExternallyAbortableReaderStream<R> {
pub fn new(reader: R) -> (Self, ShutdownHandle) {
let done = Arc::new(AtomicBool::new(false));
let this = Self {
inner: ReaderStream::new(reader),
done: done.clone(),
};
(this, ShutdownHandle(done))
}
}
impl<R: AsyncRead> Stream for ExternallyAbortableReaderStream<R> {
type Item = std::io::Result<Bytes>;
fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
let this = self.project();
let val = std::task::ready!(this.inner.poll_next(cx));
match val {
None if this.done.load(Ordering::SeqCst) => Poll::Ready(None),
None => Poll::Ready(Some(Err(std::io::Error::new(
std::io::ErrorKind::UnexpectedEof,
"stream reader has shut down",
)))),
Some(val) => Poll::Ready(Some(val)),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use bytes::Bytes;
use deno_core::futures::StreamExt;
use tokio::io::AsyncWriteExt;
#[tokio::test]
async fn success() {
let (a, b) = tokio::io::duplex(64 * 1024);
let (reader, _) = tokio::io::split(a);
let (_, mut writer) = tokio::io::split(b);
let (mut stream, shutdown_handle) =
ExternallyAbortableReaderStream::new(reader);
writer.write_all(b"hello").await.unwrap();
assert_eq!(stream.next().await.unwrap().unwrap(), Bytes::from("hello"));
writer.write_all(b"world").await.unwrap();
assert_eq!(stream.next().await.unwrap().unwrap(), Bytes::from("world"));
shutdown_handle.shutdown();
writer.shutdown().await.unwrap();
drop(writer);
assert!(stream.next().await.is_none());
}
#[tokio::test]
async fn error() {
let (a, b) = tokio::io::duplex(64 * 1024);
let (reader, _) = tokio::io::split(a);
let (_, mut writer) = tokio::io::split(b);
let (mut stream, _shutdown_handle) =
ExternallyAbortableReaderStream::new(reader);
writer.write_all(b"hello").await.unwrap();
assert_eq!(stream.next().await.unwrap().unwrap(), Bytes::from("hello"));
drop(writer);
assert_eq!(
stream.next().await.unwrap().unwrap_err().kind(),
std::io::ErrorKind::UnexpectedEof
);
}
#[tokio::test]
async fn error2() {
let (a, b) = tokio::io::duplex(64 * 1024);
let (reader, _) = tokio::io::split(a);
let (_, mut writer) = tokio::io::split(b);
let (mut stream, _shutdown_handle) =
ExternallyAbortableReaderStream::new(reader);
writer.write_all(b"hello").await.unwrap();
assert_eq!(stream.next().await.unwrap().unwrap(), Bytes::from("hello"));
writer.shutdown().await.unwrap();
drop(writer);
assert_eq!(
stream.next().await.unwrap().unwrap_err().kind(),
std::io::ErrorKind::UnexpectedEof
);
}
#[tokio::test]
async fn write_after_shutdown() {
let (a, b) = tokio::io::duplex(64 * 1024);
let (reader, _) = tokio::io::split(a);
let (_, mut writer) = tokio::io::split(b);
let (mut stream, shutdown_handle) =
ExternallyAbortableReaderStream::new(reader);
writer.write_all(b"hello").await.unwrap();
assert_eq!(stream.next().await.unwrap().unwrap(), Bytes::from("hello"));
writer.write_all(b"world").await.unwrap();
assert_eq!(stream.next().await.unwrap().unwrap(), Bytes::from("world"));
shutdown_handle.shutdown();
writer.shutdown().await.unwrap();
assert!(writer.write_all(b"!").await.is_err());
drop(writer);
assert!(stream.next().await.is_none());
}
}