1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2025-01-04 05:18:59 -05:00
denoland-deno/ext/fetch/byte_stream.rs
Luca Casonato c059c12a1e
fix(ext/fetch): handle errors in req body stream (#17081)
Right now an error in a request body stream causes an uncatchable
global promise rejection. This PR fixes this to instead propagate the
error correctly into the promise returned from `fetch`.

It additionally fixes errored readable stream bodies being treated as
successfully completed bodies by Rust.
2023-01-05 13:03:42 +01:00

87 lines
2.4 KiB
Rust

// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license.
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
use deno_core::futures::Stream;
use tokio::sync::mpsc;
/// [MpscByteStream] is a stream of bytes that is backed by a mpsc channel. It is
/// used to bridge between the fetch task and the HTTP body stream. The stream
/// has the special property that it errors if the channel is closed before an
/// explicit EOF is sent (in the form of a [None] value on the sender).
///
/// Each channel message is either `Some(chunk)`, which the stream yields as
/// its next item, or `None`, which marks a successful end of the body.
pub struct MpscByteStream {
/// Receiving half of the channel that chunks and the EOF marker arrive on.
receiver: mpsc::Receiver<Option<bytes::Bytes>>,
/// Set to `true` once the explicit EOF marker has been received, so that a
/// later channel closure is reported as a normal end of stream rather than
/// as an error.
shutdown: bool,
}
impl MpscByteStream {
  /// Creates a new byte stream together with the sender used to feed it.
  ///
  /// Send `Some(bytes)` for every chunk and a final `None` to signal a
  /// successful end of stream. The stream starts out not shut down, so
  /// dropping the sender before sending `None` makes it yield an error.
  pub fn new() -> (Self, mpsc::Sender<Option<bytes::Bytes>>) {
    // Bounded channel with a buffer of a single message; the element type is
    // inferred from the return type.
    let (tx, rx) = mpsc::channel(1);
    (
      Self {
        receiver: rx,
        shutdown: false,
      },
      tx,
    )
  }
}
impl Stream for MpscByteStream {
  type Item = Result<bytes::Bytes, std::io::Error>;

  /// Polls the underlying channel and maps its messages onto stream items:
  ///
  /// - `Some(Some(chunk))` yields `Ok(chunk)`.
  /// - `Some(None)` is the explicit EOF marker: it is remembered and the
  ///   stream ends.
  /// - A closed channel ends the stream if EOF was already seen, and yields
  ///   an `UnexpectedEof` error otherwise.
  fn poll_next(
    mut self: Pin<&mut Self>,
    cx: &mut Context<'_>,
  ) -> Poll<Option<Self::Item>> {
    // Nothing available yet: propagate `Pending` to the caller.
    let message = match self.receiver.poll_recv(cx) {
      Poll::Pending => return Poll::Pending,
      Poll::Ready(message) => message,
    };

    let item = match message {
      Some(Some(chunk)) => Some(Ok(chunk)),
      Some(None) => {
        // Remember the EOF marker so a later channel close is not an error.
        self.shutdown = true;
        None
      }
      None => {
        if self.shutdown {
          None
        } else {
          // The sender went away without announcing EOF first.
          Some(Err(std::io::Error::new(
            std::io::ErrorKind::UnexpectedEof,
            "channel closed",
          )))
        }
      }
    };

    Poll::Ready(item)
  }
}
#[cfg(test)]
mod tests {
  use super::*;
  use bytes::Bytes;
  use deno_core::futures::StreamExt;

  /// Chunks come out in the order they were sent, and an explicit EOF marker
  /// followed by dropping the sender ends the stream without an error.
  #[tokio::test]
  async fn success() {
    let (mut stream, sender) = MpscByteStream::new();

    for &word in &["hello", "world"] {
      sender.send(Some(Bytes::from(word))).await.unwrap();
      let chunk = stream.next().await.unwrap().unwrap();
      assert_eq!(chunk, Bytes::from(word));
    }

    sender.send(None).await.unwrap();
    drop(sender);

    let end = stream.next().await;
    assert!(end.is_none());
  }

  /// Dropping the sender without sending the EOF marker surfaces as an
  /// `UnexpectedEof` error on the stream.
  #[tokio::test]
  async fn error() {
    let (mut stream, sender) = MpscByteStream::new();

    sender.send(Some(Bytes::from("hello"))).await.unwrap();
    let chunk = stream.next().await.unwrap().unwrap();
    assert_eq!(chunk, Bytes::from("hello"));

    drop(sender);

    let failure = stream.next().await.unwrap().unwrap_err();
    assert_eq!(failure.kind(), std::io::ErrorKind::UnexpectedEof);
  }
}