1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2025-01-08 07:08:27 -05:00
denoland-deno/ext/web/stream_resource.rs
Matt Mastracci 375bf08ce2 feat(ext/web): resourceForReadableStream (#20180)
Extracted from fast streams work.

This is a resource wrapper for `ReadableStream`, allowing us to treat
all `ReadableStream` instances as resources, and remove special paths in
both `fetch` and `serve`.

Performance with a ReadableStream response yields ~18% improvement:

```
  return new Response(new ReadableStream({
    start(controller) {
      controller.enqueue(new Uint8Array([104, 101, 108, 108, 111, 32, 119, 111, 114, 108, 100]));
      controller.close();
    }
  })
```

This patch:

```
12:36 $ third_party/prebuilt/mac/wrk http://localhost:8080
Running 10s test @ http://localhost:8080
  2 threads and 10 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency    99.96us  100.03us   6.65ms   98.84%
    Req/Sec    47.73k     2.43k   51.02k    89.11%
  959308 requests in 10.10s, 117.10MB read
Requests/sec:  94978.71
Transfer/sec:     11.59MB
```

main:

```
Running 10s test @ http://localhost:8080
  2 threads and 10 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency   163.03us  685.51us  19.73ms   99.27%
    Req/Sec    39.50k     3.98k   66.11k    95.52%
  789582 requests in 10.10s, 82.83MB read
Requests/sec:  78182.65
Transfer/sec:      8.20MB
```
2023-08-21 17:00:45 +05:30

274 lines
6.8 KiB
Rust

// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
use deno_core::anyhow::Error;
use deno_core::error::type_error;
use deno_core::error::AnyError;
use deno_core::op2;
use deno_core::AsyncRefCell;
use deno_core::AsyncResult;
use deno_core::BufView;
use deno_core::CancelFuture;
use deno_core::CancelHandle;
use deno_core::JsBuffer;
use deno_core::OpState;
use deno_core::RcLike;
use deno_core::RcRef;
use deno_core::Resource;
use deno_core::ResourceId;
use futures::stream::Peekable;
use futures::Stream;
use futures::StreamExt;
use std::borrow::Cow;
use std::cell::RefCell;
use std::ffi::c_void;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::task::Context;
use std::task::Poll;
use std::task::Waker;
use tokio::sync::mpsc::Receiver;
use tokio::sync::mpsc::Sender;
type SenderCell = RefCell<Option<Sender<Result<BufView, Error>>>>;
// This indirection allows us to more easily integrate the fast streams work at a later date
#[repr(transparent)]
struct ChannelStreamAdapter<C>(C);
impl<C> Stream for ChannelStreamAdapter<C>
where
C: ChannelBytesRead,
{
type Item = Result<BufView, AnyError>;
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
self.0.poll_recv(cx)
}
}
pub trait ChannelBytesRead: Unpin + 'static {
fn poll_recv(
&mut self,
cx: &mut Context<'_>,
) -> Poll<Option<Result<BufView, AnyError>>>;
}
impl ChannelBytesRead for tokio::sync::mpsc::Receiver<Result<BufView, Error>> {
fn poll_recv(
&mut self,
cx: &mut Context<'_>,
) -> Poll<Option<Result<BufView, AnyError>>> {
self.poll_recv(cx)
}
}
#[allow(clippy::type_complexity)]
struct ReadableStreamResource {
reader: AsyncRefCell<
Peekable<ChannelStreamAdapter<Receiver<Result<BufView, Error>>>>,
>,
cancel_handle: CancelHandle,
data: ReadableStreamResourceData,
}
impl ReadableStreamResource {
pub fn cancel_handle(self: &Rc<Self>) -> impl RcLike<CancelHandle> {
RcRef::map(self, |s| &s.cancel_handle).clone()
}
async fn read(self: Rc<Self>, limit: usize) -> Result<BufView, AnyError> {
let cancel_handle = self.cancel_handle();
let peekable = RcRef::map(self, |this| &this.reader);
let mut peekable = peekable.borrow_mut().await;
match Pin::new(&mut *peekable)
.peek_mut()
.or_cancel(cancel_handle)
.await?
{
None => Ok(BufView::empty()),
// Take the actual error since we only have a reference to it
Some(Err(_)) => Err(peekable.next().await.unwrap().err().unwrap()),
Some(Ok(bytes)) => {
if bytes.len() <= limit {
// We can safely take the next item since we peeked it
return peekable.next().await.unwrap();
}
// The remainder of the bytes after we split it is still left in the peek buffer
let ret = bytes.split_to(limit);
Ok(ret)
}
}
}
}
impl Resource for ReadableStreamResource {
fn name(&self) -> Cow<str> {
Cow::Borrowed("readableStream")
}
fn read(self: Rc<Self>, limit: usize) -> AsyncResult<BufView> {
Box::pin(ReadableStreamResource::read(self, limit))
}
}
// TODO(mmastrac): Move this to deno_core
#[derive(Clone, Debug, Default)]
pub struct CompletionHandle {
inner: Rc<RefCell<CompletionHandleInner>>,
}
#[derive(Debug, Default)]
struct CompletionHandleInner {
complete: bool,
success: bool,
waker: Option<Waker>,
}
impl CompletionHandle {
pub fn complete(&self, success: bool) {
let mut mut_self = self.inner.borrow_mut();
mut_self.complete = true;
mut_self.success = success;
if let Some(waker) = mut_self.waker.take() {
drop(mut_self);
waker.wake();
}
}
}
impl Future for CompletionHandle {
type Output = bool;
fn poll(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
let mut mut_self = self.inner.borrow_mut();
if mut_self.complete {
return std::task::Poll::Ready(mut_self.success);
}
mut_self.waker = Some(cx.waker().clone());
std::task::Poll::Pending
}
}
fn sender_closed() -> Error {
type_error("sender closed")
}
/// Allocate a resource that wraps a ReadableStream.
#[op2(fast)]
#[smi]
pub fn op_readable_stream_resource_allocate(state: &mut OpState) -> ResourceId {
let (tx, rx) = tokio::sync::mpsc::channel(1);
let tx = RefCell::new(Some(tx));
let completion = CompletionHandle::default();
let tx = Box::new(tx);
let resource = ReadableStreamResource {
cancel_handle: Default::default(),
reader: AsyncRefCell::new(ChannelStreamAdapter(rx).peekable()),
data: ReadableStreamResourceData {
tx: Box::into_raw(tx),
completion,
},
};
state.resource_table.add(resource)
}
#[op2(fast)]
pub fn op_readable_stream_resource_get_sink(
state: &mut OpState,
#[smi] rid: ResourceId,
) -> *const c_void {
let Ok(resource) = state.resource_table.get::<ReadableStreamResource>(rid) else {
return std::ptr::null();
};
resource.data.tx as _
}
fn get_sender(sender: *const c_void) -> Option<Sender<Result<BufView, Error>>> {
// SAFETY: We know this is a valid v8::External
unsafe {
(sender as *const SenderCell)
.as_ref()
.and_then(|r| r.borrow_mut().as_ref().cloned())
}
}
fn drop_sender(sender: *const c_void) {
// SAFETY: We know this is a valid v8::External
unsafe {
assert!(!sender.is_null());
_ = Box::from_raw(sender as *mut SenderCell);
}
}
#[op2(async)]
pub fn op_readable_stream_resource_write_buf(
sender: *const c_void,
#[buffer] buffer: JsBuffer,
) -> impl Future<Output = Result<(), Error>> {
let sender = get_sender(sender);
async move {
let sender = sender.ok_or_else(sender_closed)?;
sender
.send(Ok(buffer.into()))
.await
.map_err(|_| sender_closed())?;
Ok(())
}
}
#[op2(async)]
pub fn op_readable_stream_resource_write_error(
sender: *const c_void,
#[string] error: String,
) -> impl Future<Output = Result<(), Error>> {
let sender = get_sender(sender);
async move {
let sender = sender.ok_or_else(sender_closed)?;
sender
.send(Err(type_error(Cow::Owned(error))))
.await
.map_err(|_| sender_closed())?;
Ok(())
}
}
#[op2(fast)]
#[smi]
pub fn op_readable_stream_resource_close(sender: *const c_void) {
drop_sender(sender);
}
#[op2(async)]
pub fn op_readable_stream_resource_await_close(
state: &mut OpState,
#[smi] rid: ResourceId,
) -> impl Future<Output = ()> {
let completion = state
.resource_table
.get::<ReadableStreamResource>(rid)
.ok()
.map(|r| r.data.completion.clone());
async move {
if let Some(completion) = completion {
completion.await;
}
}
}
struct ReadableStreamResourceData {
tx: *const SenderCell,
completion: CompletionHandle,
}
impl Drop for ReadableStreamResourceData {
fn drop(&mut self) {
self.completion.complete(true);
}
}