diff --git a/ext/http/lib.rs b/ext/http/lib.rs index 1888409ba6..b4a2f0c452 100644 --- a/ext/http/lib.rs +++ b/ext/http/lib.rs @@ -343,6 +343,7 @@ impl Default for HttpRequestReader { enum HttpResponseWriter { Headers(oneshot::Sender>), Body(Pin>), + BodyUncompressed(hyper::body::Sender), Closed, } @@ -638,17 +639,15 @@ async fn op_http_write_headers( } None => { // If no buffer was passed, the caller will stream the response body. - - // Create a one way pipe that implements tokio's async io traits. To do - // this we create a [tokio::io::DuplexStream], but then throw away one - // of the directions to create a one way pipe. - let (a, b) = tokio::io::duplex(64 * 1024); - let (reader, _) = tokio::io::split(a); - let (_, writer) = tokio::io::split(b); - - let writer_body: Pin>; - if should_compress { + // Create a one way pipe that implements tokio's async io traits. To do + // this we create a [tokio::io::DuplexStream], but then throw away one + // of the directions to create a one way pipe. + let (a, b) = tokio::io::duplex(64 * 1024); + let (reader, _) = tokio::io::split(a); + let (_, writer) = tokio::io::split(b); + + let writer_body: Pin>; match *stream.accept_encoding.borrow() { Encoding::Brotli => { let writer = BrotliEncoder::new(writer); @@ -662,12 +661,14 @@ async fn op_http_write_headers( builder = builder.header("content-encoding", "gzip"); } } - } else { - writer_body = Box::pin(writer); - } - body = builder.body(Body::wrap_stream(ReaderStream::new(reader)))?; - new_wr = HttpResponseWriter::Body(writer_body); + body = builder.body(Body::wrap_stream(ReaderStream::new(reader)))?; + new_wr = HttpResponseWriter::Body(writer_body); + } else { + let (body_tx, body_rx) = Body::channel(); + body = builder.body(body_rx)?; + new_wr = HttpResponseWriter::BodyUncompressed(body_tx); + } } } @@ -699,14 +700,14 @@ async fn op_http_write_resource( let mut wr = RcRef::map(&http_stream, |r| &r.wr).borrow_mut().await; let resource = state.borrow().resource_table.get_any(stream)?; loop { - let body_writer = match &mut *wr { - HttpResponseWriter::Body(body_writer) => body_writer, + match *wr { HttpResponseWriter::Headers(_) => { return Err(http_error("no response headers")) } HttpResponseWriter::Closed => { return Err(http_error("response already completed")) } + _ => {} }; let vec = vec![0u8; 64 * 1024]; // 64KB @@ -715,17 +716,29 @@ async fn op_http_write_resource( if nread == 0 { break; } - match body_writer.write_all(&buf[..nread]).await { - Ok(_) => {} - Err(err) => { - assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe); - // Don't return "broken pipe", that's an implementation detail. - // Pull up the failure associated with the transport connection instead. - http_stream.conn.closed().await?; - // If there was no connection error, drop body_tx. - *wr = HttpResponseWriter::Closed; + + match &mut *wr { + HttpResponseWriter::Body(body) => { + if let Err(err) = body.write_all(&buf[..nread]).await { + assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe); + // Don't return "broken pipe", that's an implementation detail. + // Pull up the failure associated with the transport connection instead. + http_stream.conn.closed().await?; + // If there was no connection error, drop body_tx. + *wr = HttpResponseWriter::Closed; + } } - } + HttpResponseWriter::BodyUncompressed(body) => { + if let Err(err) = body.send_data(Bytes::from(buf.to_temp())).await { + assert!(err.is_closed()); + // Pull up the failure associated with the transport connection instead. + http_stream.conn.closed().await?; + // If there was no connection error, drop body_tx. + *wr = HttpResponseWriter::Closed; + } + } + _ => unreachable!(), + }; } let wr = take(&mut *wr); @@ -756,30 +769,42 @@ async fn op_http_write( let mut wr = RcRef::map(&stream, |r| &r.wr).borrow_mut().await; loop { - let body_writer = match &mut *wr { - HttpResponseWriter::Body(body_tx) => body_tx, + match &mut *wr { HttpResponseWriter::Headers(_) => { break Err(http_error("no response headers")) } HttpResponseWriter::Closed => { break Err(http_error("response already completed")) } - }; - - let mut res = body_writer.write_all(&buf).await; - if res.is_ok() { - res = body_writer.flush().await; - } - - match res { - Ok(_) => break Ok(()), - Err(err) => { - assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe); - // Don't return "broken pipe", that's an implementation detail. - // Pull up the failure associated with the transport connection instead. - stream.conn.closed().await?; - // If there was no connection error, drop body_tx. - *wr = HttpResponseWriter::Closed; + HttpResponseWriter::Body(body) => { + let mut result = body.write_all(&buf).await; + if result.is_ok() { + result = body.flush().await; + } + match result { + Ok(_) => break Ok(()), + Err(err) => { + assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe); + // Don't return "broken pipe", that's an implementation detail. + // Pull up the failure associated with the transport connection instead. + stream.conn.closed().await?; + // If there was no connection error, drop body_tx. + *wr = HttpResponseWriter::Closed; + } + } + } + HttpResponseWriter::BodyUncompressed(body) => { + let bytes = Bytes::copy_from_slice(&buf[..]); + match body.send_data(bytes).await { + Ok(_) => break Ok(()), + Err(err) => { + assert!(err.is_closed()); + // Pull up the failure associated with the transport connection instead. + stream.conn.closed().await?; + // If there was no connection error, drop body_tx. + *wr = HttpResponseWriter::Closed; + } + } } } }