diff --git a/ext/fetch/README.md b/ext/fetch/README.md index d088a6147c..3af8110a6f 100644 --- a/ext/fetch/README.md +++ b/ext/fetch/README.md @@ -78,6 +78,5 @@ Following ops are provided, which can be accessed through `Deno.ops`: - op_fetch - op_fetch_send -- op_fetch_response_upgrade - op_utf8_to_byte_string - op_fetch_custom_client diff --git a/ext/fetch/lib.rs b/ext/fetch/lib.rs index a3f9eeb4fb..79659771e6 100644 --- a/ext/fetch/lib.rs +++ b/ext/fetch/lib.rs @@ -28,7 +28,6 @@ use deno_core::futures::Stream; use deno_core::futures::StreamExt; use deno_core::futures::TryFutureExt; use deno_core::op2; -use deno_core::unsync::spawn; use deno_core::url::Url; use deno_core::AsyncRefCell; use deno_core::AsyncResult; @@ -70,12 +69,9 @@ use hyper::body::Frame; use hyper_util::client::legacy::connect::HttpConnector; use hyper_util::client::legacy::connect::HttpInfo; use hyper_util::rt::TokioExecutor; -use hyper_util::rt::TokioIo; use hyper_util::rt::TokioTimer; use serde::Deserialize; use serde::Serialize; -use tokio::io::AsyncReadExt; -use tokio::io::AsyncWriteExt; use tower::ServiceExt; use tower_http::decompression::Decompression; @@ -127,7 +123,6 @@ deno_core::extension!(deno_fetch, ops = [ op_fetch, op_fetch_send, - op_fetch_response_upgrade, op_utf8_to_byte_string, op_fetch_custom_client, ], @@ -627,114 +622,6 @@ pub async fn op_fetch_send( }) } -#[op2(async)] -#[smi] -pub async fn op_fetch_response_upgrade( - state: Rc>, - #[smi] rid: ResourceId, -) -> Result { - let raw_response = state - .borrow_mut() - .resource_table - .take::(rid)?; - let raw_response = Rc::try_unwrap(raw_response) - .expect("Someone is holding onto FetchResponseResource"); - - let (read, write) = tokio::io::duplex(1024); - let (read_rx, write_tx) = tokio::io::split(read); - let (mut write_rx, mut read_tx) = tokio::io::split(write); - let upgraded = raw_response.upgrade().await?; - { - // Stage 3: Pump the data - let (mut upgraded_rx, mut upgraded_tx) = - tokio::io::split(TokioIo::new(upgraded)); - - spawn(async move { - let mut buf = [0; 1024]; - loop { - let read = upgraded_rx.read(&mut buf).await?; - if read == 0 { - break; - } - read_tx.write_all(&buf[..read]).await?; - } - Ok::<_, AnyError>(()) - }); - spawn(async move { - let mut buf = [0; 1024]; - loop { - let read = write_rx.read(&mut buf).await?; - if read == 0 { - break; - } - upgraded_tx.write_all(&buf[..read]).await?; - } - Ok::<_, AnyError>(()) - }); - } - - Ok( - state - .borrow_mut() - .resource_table - .add(UpgradeStream::new(read_rx, write_tx)), - ) -} - -struct UpgradeStream { - read: AsyncRefCell>, - write: AsyncRefCell>, - cancel_handle: CancelHandle, -} - -impl UpgradeStream { - pub fn new( - read: tokio::io::ReadHalf, - write: tokio::io::WriteHalf, - ) -> Self { - Self { - read: AsyncRefCell::new(read), - write: AsyncRefCell::new(write), - cancel_handle: CancelHandle::new(), - } - } - - async fn read(self: Rc, buf: &mut [u8]) -> Result { - let cancel_handle = RcRef::map(self.clone(), |this| &this.cancel_handle); - async { - let read = RcRef::map(self, |this| &this.read); - let mut read = read.borrow_mut().await; - Ok(Pin::new(&mut *read).read(buf).await?) - } - .try_or_cancel(cancel_handle) - .await - } - - async fn write(self: Rc, buf: &[u8]) -> Result { - let cancel_handle = RcRef::map(self.clone(), |this| &this.cancel_handle); - async { - let write = RcRef::map(self, |this| &this.write); - let mut write = write.borrow_mut().await; - Ok(Pin::new(&mut *write).write(buf).await?) - } - .try_or_cancel(cancel_handle) - .await - } -} - -impl Resource for UpgradeStream { - fn name(&self) -> Cow { - "fetchUpgradedStream".into() - } - - deno_core::impl_readable_byob!(); - deno_core::impl_writable!(); - - fn close(self: Rc) { - self.cancel_handle.cancel(); - } -} - type CancelableResponseResult = Result, AnyError>, Canceled>;