1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2024-11-21 15:04:11 -05:00

chore(ext/fetch): remove op_fetch_response_upgrade (#25421)

This commit is contained in:
Satya Rohith 2024-09-04 18:10:28 +05:30 committed by GitHub
parent 84dc375b2d
commit 334c842392
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 0 additions and 114 deletions

View file

@ -78,6 +78,5 @@ Following ops are provided, which can be accessed through `Deno.ops`:
- op_fetch - op_fetch
- op_fetch_send - op_fetch_send
- op_fetch_response_upgrade
- op_utf8_to_byte_string - op_utf8_to_byte_string
- op_fetch_custom_client - op_fetch_custom_client

View file

@ -28,7 +28,6 @@ use deno_core::futures::Stream;
use deno_core::futures::StreamExt; use deno_core::futures::StreamExt;
use deno_core::futures::TryFutureExt; use deno_core::futures::TryFutureExt;
use deno_core::op2; use deno_core::op2;
use deno_core::unsync::spawn;
use deno_core::url::Url; use deno_core::url::Url;
use deno_core::AsyncRefCell; use deno_core::AsyncRefCell;
use deno_core::AsyncResult; use deno_core::AsyncResult;
@ -70,12 +69,9 @@ use hyper::body::Frame;
use hyper_util::client::legacy::connect::HttpConnector; use hyper_util::client::legacy::connect::HttpConnector;
use hyper_util::client::legacy::connect::HttpInfo; use hyper_util::client::legacy::connect::HttpInfo;
use hyper_util::rt::TokioExecutor; use hyper_util::rt::TokioExecutor;
use hyper_util::rt::TokioIo;
use hyper_util::rt::TokioTimer; use hyper_util::rt::TokioTimer;
use serde::Deserialize; use serde::Deserialize;
use serde::Serialize; use serde::Serialize;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;
use tower::ServiceExt; use tower::ServiceExt;
use tower_http::decompression::Decompression; use tower_http::decompression::Decompression;
@ -127,7 +123,6 @@ deno_core::extension!(deno_fetch,
ops = [ ops = [
op_fetch<FP>, op_fetch<FP>,
op_fetch_send, op_fetch_send,
op_fetch_response_upgrade,
op_utf8_to_byte_string, op_utf8_to_byte_string,
op_fetch_custom_client<FP>, op_fetch_custom_client<FP>,
], ],
@ -627,114 +622,6 @@ pub async fn op_fetch_send(
}) })
} }
#[op2(async)]
#[smi]
pub async fn op_fetch_response_upgrade(
state: Rc<RefCell<OpState>>,
#[smi] rid: ResourceId,
) -> Result<ResourceId, AnyError> {
let raw_response = state
.borrow_mut()
.resource_table
.take::<FetchResponseResource>(rid)?;
let raw_response = Rc::try_unwrap(raw_response)
.expect("Someone is holding onto FetchResponseResource");
let (read, write) = tokio::io::duplex(1024);
let (read_rx, write_tx) = tokio::io::split(read);
let (mut write_rx, mut read_tx) = tokio::io::split(write);
let upgraded = raw_response.upgrade().await?;
{
// Stage 3: Pump the data
let (mut upgraded_rx, mut upgraded_tx) =
tokio::io::split(TokioIo::new(upgraded));
spawn(async move {
let mut buf = [0; 1024];
loop {
let read = upgraded_rx.read(&mut buf).await?;
if read == 0 {
break;
}
read_tx.write_all(&buf[..read]).await?;
}
Ok::<_, AnyError>(())
});
spawn(async move {
let mut buf = [0; 1024];
loop {
let read = write_rx.read(&mut buf).await?;
if read == 0 {
break;
}
upgraded_tx.write_all(&buf[..read]).await?;
}
Ok::<_, AnyError>(())
});
}
Ok(
state
.borrow_mut()
.resource_table
.add(UpgradeStream::new(read_rx, write_tx)),
)
}
struct UpgradeStream {
read: AsyncRefCell<tokio::io::ReadHalf<tokio::io::DuplexStream>>,
write: AsyncRefCell<tokio::io::WriteHalf<tokio::io::DuplexStream>>,
cancel_handle: CancelHandle,
}
impl UpgradeStream {
pub fn new(
read: tokio::io::ReadHalf<tokio::io::DuplexStream>,
write: tokio::io::WriteHalf<tokio::io::DuplexStream>,
) -> Self {
Self {
read: AsyncRefCell::new(read),
write: AsyncRefCell::new(write),
cancel_handle: CancelHandle::new(),
}
}
async fn read(self: Rc<Self>, buf: &mut [u8]) -> Result<usize, AnyError> {
let cancel_handle = RcRef::map(self.clone(), |this| &this.cancel_handle);
async {
let read = RcRef::map(self, |this| &this.read);
let mut read = read.borrow_mut().await;
Ok(Pin::new(&mut *read).read(buf).await?)
}
.try_or_cancel(cancel_handle)
.await
}
async fn write(self: Rc<Self>, buf: &[u8]) -> Result<usize, AnyError> {
let cancel_handle = RcRef::map(self.clone(), |this| &this.cancel_handle);
async {
let write = RcRef::map(self, |this| &this.write);
let mut write = write.borrow_mut().await;
Ok(Pin::new(&mut *write).write(buf).await?)
}
.try_or_cancel(cancel_handle)
.await
}
}
impl Resource for UpgradeStream {
fn name(&self) -> Cow<str> {
"fetchUpgradedStream".into()
}
deno_core::impl_readable_byob!();
deno_core::impl_writable!();
fn close(self: Rc<Self>) {
self.cancel_handle.cancel();
}
}
type CancelableResponseResult = type CancelableResponseResult =
Result<Result<http::Response<ResBody>, AnyError>, Canceled>; Result<Result<http::Response<ResBody>, AnyError>, Canceled>;