From 4258ed262f6eed9b0ee123e1ba9c91f999f0b429 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20Iwa=C5=84czuk?= Date: Tue, 31 Dec 2019 15:09:58 +0100 Subject: [PATCH] refactor: move HttpBody to cli/http_util.rs (#3569) --- cli/http_body.rs | 96 ------------------------------------------------ cli/http_util.rs | 84 ++++++++++++++++++++++++++++++++++++++++++ cli/lib.rs | 1 - cli/ops/fetch.rs | 6 +-- cli/ops/io.rs | 2 +- 5 files changed, 87 insertions(+), 102 deletions(-) delete mode 100644 cli/http_body.rs diff --git a/cli/http_body.rs b/cli/http_body.rs deleted file mode 100644 index 4873069898..0000000000 --- a/cli/http_body.rs +++ /dev/null @@ -1,96 +0,0 @@ -// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. -use bytes::Bytes; -use futures::Stream; -use futures::StreamExt; -use reqwest; -use std::cmp::min; -use std::io; -use std::io::Read; -use std::pin::Pin; -use std::task::Context; -use std::task::Poll; -use tokio::io::AsyncRead; - -// TODO(bartlomieju): most of this stuff can be moved to `cli/ops/fetch.rs` -type ReqwestStream = Pin> + Send>>; - -/// Wraps `ReqwestStream` so that it can be exposed as an `AsyncRead` and integrated -/// into resources more easily. -pub struct HttpBody { - stream: ReqwestStream, - chunk: Option, - pos: usize, -} - -impl HttpBody { - pub fn from(body: ReqwestStream) -> Self { - Self { - stream: body, - chunk: None, - pos: 0, - } - } -} - -impl Read for HttpBody { - fn read(&mut self, _buf: &mut [u8]) -> io::Result { - unimplemented!(); - } -} - -impl AsyncRead for HttpBody { - fn poll_read( - self: Pin<&mut Self>, - cx: &mut Context, - buf: &mut [u8], - ) -> Poll> { - let mut inner = self.get_mut(); - if let Some(chunk) = inner.chunk.take() { - debug!( - "HttpBody Fake Read buf {} chunk {} pos {}", - buf.len(), - chunk.len(), - inner.pos - ); - let n = min(buf.len(), chunk.len() - inner.pos); - { - let rest = &chunk[inner.pos..]; - buf[..n].clone_from_slice(&rest[..n]); - } - inner.pos += n; - if inner.pos == chunk.len() { - inner.pos = 0; - } else { - inner.chunk = Some(chunk); - } - return Poll::Ready(Ok(n)); - } else { - assert_eq!(inner.pos, 0); - } - - let p = inner.stream.poll_next_unpin(cx); - match p { - Poll::Ready(Some(Err(e))) => Poll::Ready(Err( - // TODO(bartlomieju): rewrite it to use ErrBox - io::Error::new(io::ErrorKind::Other, e), - )), - Poll::Ready(Some(Ok(chunk))) => { - debug!( - "HttpBody Real Read buf {} chunk {} pos {}", - buf.len(), - chunk.len(), - inner.pos - ); - let n = min(buf.len(), chunk.len()); - buf[..n].clone_from_slice(&chunk[..n]); - if buf.len() < chunk.len() { - inner.pos = n; - inner.chunk = Some(chunk); - } - Poll::Ready(Ok(n)) - } - Poll::Ready(None) => Poll::Ready(Ok(0)), - Poll::Pending => Poll::Pending, - } - } -} diff --git a/cli/http_util.rs b/cli/http_util.rs index 83aaadd1ea..4a925e3d9a 100644 --- a/cli/http_util.rs +++ b/cli/http_util.rs @@ -2,6 +2,7 @@ use crate::deno_error; use crate::deno_error::DenoError; use crate::version; +use bytes::Bytes; use deno::ErrBox; use futures::future::FutureExt; use reqwest; @@ -11,7 +12,14 @@ use reqwest::header::LOCATION; use reqwest::header::USER_AGENT; use reqwest::redirect::Policy; use reqwest::Client; +use reqwest::Response; +use std::cmp::min; use std::future::Future; +use std::io; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; +use tokio::io::AsyncRead; use url::Url; lazy_static! { @@ -119,6 +127,82 @@ pub fn fetch_string_once( fut.boxed() } +/// Wraps reqwest `Response` so that it can be exposed as an `AsyncRead` and integrated +/// into resources more easily. +pub struct HttpBody { + response: Response, + chunk: Option, + pos: usize, +} + +impl HttpBody { + pub fn from(body: Response) -> Self { + Self { + response: body, + chunk: None, + pos: 0, + } + } +} + +impl AsyncRead for HttpBody { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context, + buf: &mut [u8], + ) -> Poll> { + let mut inner = self.get_mut(); + if let Some(chunk) = inner.chunk.take() { + debug!( + "HttpBody Fake Read buf {} chunk {} pos {}", + buf.len(), + chunk.len(), + inner.pos + ); + let n = min(buf.len(), chunk.len() - inner.pos); + { + let rest = &chunk[inner.pos..]; + buf[..n].clone_from_slice(&rest[..n]); + } + inner.pos += n; + if inner.pos == chunk.len() { + inner.pos = 0; + } else { + inner.chunk = Some(chunk); + } + return Poll::Ready(Ok(n)); + } else { + assert_eq!(inner.pos, 0); + } + + let chunk_future = &mut inner.response.chunk(); + // Safety: `chunk_future` lives only for duration of this poll. So, it doesn't move. + let chunk_future = unsafe { Pin::new_unchecked(chunk_future) }; + match chunk_future.poll(cx) { + Poll::Ready(Err(e)) => { + Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, e))) + } + Poll::Ready(Ok(Some(chunk))) => { + debug!( + "HttpBody Real Read buf {} chunk {} pos {}", + buf.len(), + chunk.len(), + inner.pos + ); + let n = min(buf.len(), chunk.len()); + buf[..n].clone_from_slice(&chunk[..n]); + if buf.len() < chunk.len() { + inner.pos = n; + inner.chunk = Some(chunk); + } + Poll::Ready(Ok(n)) + } + Poll::Ready(Ok(None)) => Poll::Ready(Ok(0)), + Poll::Pending => Poll::Pending, + } + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/cli/lib.rs b/cli/lib.rs index b191d5e876..096bd7abfa 100644 --- a/cli/lib.rs +++ b/cli/lib.rs @@ -30,7 +30,6 @@ pub mod fmt_errors; mod fs; mod global_state; mod global_timer; -mod http_body; mod http_util; mod import_map; mod js; diff --git a/cli/ops/fetch.rs b/cli/ops/fetch.rs index 9db8d68bea..2787b5b7f2 100644 --- a/cli/ops/fetch.rs +++ b/cli/ops/fetch.rs @@ -1,13 +1,11 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. use super::dispatch_json::{Deserialize, JsonOp, Value}; use super::io::StreamResource; -use crate::http_body::HttpBody; -use crate::http_util::get_client; +use crate::http_util::{get_client, HttpBody}; use crate::ops::json_op; use crate::state::ThreadSafeState; use deno::*; use futures::future::FutureExt; -use futures::StreamExt; use http::header::HeaderName; use http::header::HeaderValue; use http::Method; @@ -66,7 +64,7 @@ pub fn op_fetch( res_headers.push((key.to_string(), val.to_str().unwrap().to_owned())); } - let body = HttpBody::from(res.bytes_stream().boxed()); + let body = HttpBody::from(res); let mut table = state_.lock_resource_table(); let rid = table.add( "httpBody", diff --git a/cli/ops/io.rs b/cli/ops/io.rs index f268adc031..1d832a70e4 100644 --- a/cli/ops/io.rs +++ b/cli/ops/io.rs @@ -1,7 +1,7 @@ use super::dispatch_minimal::MinimalOp; use crate::deno_error; use crate::deno_error::bad_resource; -use crate::http_body::HttpBody; +use crate::http_util::HttpBody; use crate::ops::minimal_op; use crate::state::ThreadSafeState; use deno::ErrBox;