1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2025-01-11 16:42:21 -05:00

Reland "perf(http): optimize ReadableStreams backed by a resource" (#14346)

This commit is contained in:
Divy Srivastava 2022-04-22 16:19:08 +05:30 committed by GitHub
parent 2724235ec7
commit 57f7e07c13
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 277 additions and 76 deletions

View file

@ -236,7 +236,7 @@ jobs:
~/.cargo/registry/index
~/.cargo/registry/cache
~/.cargo/git/db
key: 7-cargo-home-${{ matrix.os }}-${{ hashFiles('Cargo.lock') }}
key: 8-cargo-home-${{ matrix.os }}-${{ hashFiles('Cargo.lock') }}
# In main branch, always creates fresh cache
- name: Cache build output (main)
@ -252,7 +252,7 @@ jobs:
!./target/*/*.zip
!./target/*/*.tar.gz
key: |
7-cargo-target-${{ matrix.os }}-${{ matrix.profile }}-${{ github.sha }}
8-cargo-target-${{ matrix.os }}-${{ matrix.profile }}-${{ github.sha }}
# Restore cache from the latest 'main' branch build.
- name: Cache build output (PR)
@ -268,7 +268,7 @@ jobs:
!./target/*/*.tar.gz
key: never_saved
restore-keys: |
7-cargo-target-${{ matrix.os }}-${{ matrix.profile }}-
8-cargo-target-${{ matrix.os }}-${{ matrix.profile }}-
# Don't save cache after building PRs or branches other than 'main'.
- name: Skip save cache (PR)

View file

@ -854,6 +854,47 @@ Deno.test({ permissions: { net: true } }, async function httpServerPanic() {
listener.close();
});
Deno.test(
{ permissions: { net: true, write: true, read: true } },
async function httpServerClosedStream() {
const listener = Deno.listen({ port: 4502 });
const client = await Deno.connect({ port: 4502 });
await client.write(new TextEncoder().encode(
`GET / HTTP/1.0\r\n\r\n`,
));
const conn = await listener.accept();
const httpConn = Deno.serveHttp(conn);
const ev = await httpConn.nextRequest();
const { respondWith } = ev!;
const tmpFile = await Deno.makeTempFile();
const file = await Deno.open(tmpFile, { write: true, read: true });
await file.write(new TextEncoder().encode("hello"));
const reader = await file.readable.getReader();
while (true) {
const { done, value } = await reader.read();
if (done) break;
assert(value);
}
let didThrow = false;
try {
await respondWith(new Response(file.readable));
} catch {
// pass
didThrow = true;
}
assert(didThrow);
httpConn.close();
listener.close();
client.close();
},
);
// https://github.com/denoland/deno/issues/11595
Deno.test(
{ permissions: { net: true } },

View file

@ -83,13 +83,18 @@ struct TcpStream {
}
impl TcpStream {
async fn read(self: Rc<Self>, mut buf: ZeroCopyBuf) -> Result<usize, Error> {
async fn read(
self: Rc<Self>,
mut buf: ZeroCopyBuf,
) -> Result<(usize, ZeroCopyBuf), Error> {
let mut rd = RcRef::map(&self, |r| &r.rd).borrow_mut().await;
let cancel = RcRef::map(self, |r| &r.cancel);
rd.read(&mut buf)
let nread = rd
.read(&mut buf)
.try_or_cancel(cancel)
.await
.map_err(Error::from)
.map_err(Error::from)?;
Ok((nread, buf))
}
async fn write(self: Rc<Self>, buf: ZeroCopyBuf) -> Result<usize, Error> {
@ -99,7 +104,10 @@ impl TcpStream {
}
impl Resource for TcpStream {
fn read(self: Rc<Self>, buf: ZeroCopyBuf) -> AsyncResult<usize> {
fn read_return(
self: Rc<Self>,
buf: ZeroCopyBuf,
) -> AsyncResult<(usize, ZeroCopyBuf)> {
Box::pin(self.read(buf))
}

View file

@ -36,7 +36,17 @@ pub trait Resource: Any + 'static {
}
/// Resources may implement `read()` to be a readable stream
fn read(self: Rc<Self>, _buf: ZeroCopyBuf) -> AsyncResult<usize> {
fn read(self: Rc<Self>, buf: ZeroCopyBuf) -> AsyncResult<usize> {
Box::pin(async move {
let (nread, _) = self.read_return(buf).await?;
Ok(nread)
})
}
fn read_return(
self: Rc<Self>,
_buf: ZeroCopyBuf,
) -> AsyncResult<(usize, ZeroCopyBuf)> {
Box::pin(futures::future::err(not_supported()))
}

View file

@ -485,12 +485,15 @@ impl Resource for FetchResponseBodyResource {
"fetchResponseBody".into()
}
fn read(self: Rc<Self>, mut buf: ZeroCopyBuf) -> AsyncResult<usize> {
fn read_return(
self: Rc<Self>,
mut buf: ZeroCopyBuf,
) -> AsyncResult<(usize, ZeroCopyBuf)> {
Box::pin(async move {
let mut reader = RcRef::map(&self, |r| &r.reader).borrow_mut().await;
let cancel = RcRef::map(self, |r| &r.cancel);
let read = reader.read(&mut buf).try_or_cancel(cancel).await?;
Ok(read)
Ok((read, buf))
})
}

View file

@ -32,7 +32,8 @@
} = window.__bootstrap.webSocket;
const { TcpConn, UnixConn } = window.__bootstrap.net;
const { TlsConn } = window.__bootstrap.tls;
const { Deferred } = window.__bootstrap.streams;
const { Deferred, getReadableStreamRid, readableStreamClose } =
window.__bootstrap.streams;
const {
ArrayPrototypeIncludes,
ArrayPrototypePush,
@ -235,7 +236,6 @@
typeof respBody === "string" ||
ObjectPrototypeIsPrototypeOf(Uint8ArrayPrototype, respBody)
);
try {
await core.opAsync(
"op_http_write_headers",
@ -269,6 +269,32 @@
) {
throw new TypeError("Unreachable");
}
const resourceRid = getReadableStreamRid(respBody);
if (resourceRid) {
if (respBody.locked) {
throw new TypeError("ReadableStream is locked.");
}
const reader = respBody.getReader(); // Aquire JS lock.
try {
await core.opAsync(
"op_http_write_resource",
streamRid,
resourceRid,
);
readableStreamClose(respBody); // Release JS lock.
} catch (error) {
const connError = httpConn[connErrorSymbol];
if (
ObjectPrototypeIsPrototypeOf(BadResourcePrototype, error) &&
connError != null
) {
// deno-lint-ignore no-ex-assign
error = new connError.constructor(connError.message);
}
await reader.cancel(error);
throw error;
}
} else {
const reader = respBody.getReader();
while (true) {
const { value, done } = await reader.read();
@ -292,6 +318,7 @@
throw error;
}
}
try {
await core.opAsync("op_http_shutdown", streamRid);
} catch (error) {
@ -299,6 +326,7 @@
throw error;
}
}
}
const deferred = request[_deferred];
if (deferred) {

View file

@ -77,6 +77,7 @@ pub fn init() -> Extension {
op_http_read::decl(),
op_http_write_headers::decl(),
op_http_write::decl(),
op_http_write_resource::decl(),
op_http_shutdown::decl(),
op_http_websocket_accept_header::decl(),
op_http_upgrade_websocket::decl(),
@ -683,6 +684,63 @@ async fn op_http_write_headers(
}
}
#[op]
async fn op_http_write_resource(
state: Rc<RefCell<OpState>>,
rid: ResourceId,
stream: ResourceId,
) -> Result<(), AnyError> {
let http_stream = state
.borrow()
.resource_table
.get::<HttpStreamResource>(rid)?;
let mut wr = RcRef::map(&http_stream, |r| &r.wr).borrow_mut().await;
let resource = state.borrow().resource_table.get_any(stream)?;
loop {
let body_writer = match &mut *wr {
HttpResponseWriter::Body(body_writer) => body_writer,
HttpResponseWriter::Headers(_) => {
return Err(http_error("no response headers"))
}
HttpResponseWriter::Closed => {
return Err(http_error("response already completed"))
}
};
let vec = vec![0u8; 64 * 1024]; // 64KB
let buf = ZeroCopyBuf::new_temp(vec);
let (nread, buf) = resource.clone().read_return(buf).await?;
if nread == 0 {
break;
}
match body_writer.write_all(&buf[..nread]).await {
Ok(_) => {}
Err(err) => {
assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe);
// Don't return "broken pipe", that's an implementation detail.
// Pull up the failure associated with the transport connection instead.
http_stream.conn.closed().await?;
// If there was no connection error, drop body_tx.
*wr = HttpResponseWriter::Closed;
}
}
}
let wr = take(&mut *wr);
if let HttpResponseWriter::Body(mut body_writer) = wr {
match body_writer.shutdown().await {
Ok(_) => {}
Err(err) => {
assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe);
// Don't return "broken pipe", that's an implementation detail.
// Pull up the failure associated with the transport connection instead.
http_stream.conn.closed().await?;
}
}
}
Ok(())
}
#[op]
async fn op_http_write(
state: Rc<RefCell<OpState>>,

View file

@ -4,7 +4,7 @@
((window) => {
const core = window.Deno.core;
const { BadResourcePrototype, InterruptedPrototype } = core;
const { ReadableStream, WritableStream } = window.__bootstrap.streams;
const { WritableStream, readableStreamForRid } = window.__bootstrap.streams;
const {
Error,
ObjectPrototypeIsPrototypeOf,
@ -65,8 +65,6 @@
return core.opAsync("op_dns_resolve", { query, recordType, options });
}
const DEFAULT_CHUNK_SIZE = 64 * 1024;
function tryClose(rid) {
try {
core.close(rid);
@ -75,32 +73,6 @@
}
}
function readableStreamForRid(rid) {
return new ReadableStream({
type: "bytes",
async pull(controller) {
const v = controller.byobRequest.view;
try {
const bytesRead = await read(rid, v);
if (bytesRead === null) {
tryClose(rid);
controller.close();
controller.byobRequest.respond(0);
} else {
controller.byobRequest.respond(bytesRead);
}
} catch (e) {
controller.error(e);
tryClose(rid);
}
},
cancel() {
tryClose(rid);
},
autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
});
}
function writableStreamForRid(rid) {
return new WritableStream({
async write(chunk, controller) {

View file

@ -70,13 +70,13 @@ where
pub async fn read(
self: Rc<Self>,
mut buf: ZeroCopyBuf,
) -> Result<usize, AnyError> {
) -> Result<(usize, ZeroCopyBuf), AnyError> {
let mut rd = self.rd_borrow_mut().await;
let nread = rd
.read(&mut buf)
.try_or_cancel(self.cancel_handle())
.await?;
Ok(nread)
Ok((nread, buf))
}
pub async fn write(
@ -103,7 +103,10 @@ impl Resource for TcpStreamResource {
"tcpStream".into()
}
fn read(self: Rc<Self>, buf: ZeroCopyBuf) -> AsyncResult<usize> {
fn read_return(
self: Rc<Self>,
buf: ZeroCopyBuf,
) -> AsyncResult<(usize, ZeroCopyBuf)> {
Box::pin(self.read(buf))
}
@ -160,7 +163,7 @@ impl UnixStreamResource {
pub async fn read(
self: Rc<Self>,
_buf: ZeroCopyBuf,
) -> Result<usize, AnyError> {
) -> Result<(usize, ZeroCopyBuf), AnyError> {
unreachable!()
}
pub async fn write(
@ -182,7 +185,10 @@ impl Resource for UnixStreamResource {
"unixStream".into()
}
fn read(self: Rc<Self>, buf: ZeroCopyBuf) -> AsyncResult<usize> {
fn read_return(
self: Rc<Self>,
buf: ZeroCopyBuf,
) -> AsyncResult<(usize, ZeroCopyBuf)> {
Box::pin(self.read(buf))
}

View file

@ -674,11 +674,11 @@ impl TlsStreamResource {
pub async fn read(
self: Rc<Self>,
mut buf: ZeroCopyBuf,
) -> Result<usize, AnyError> {
) -> Result<(usize, ZeroCopyBuf), AnyError> {
let mut rd = RcRef::map(&self, |r| &r.rd).borrow_mut().await;
let cancel_handle = RcRef::map(&self, |r| &r.cancel_handle);
let nread = rd.read(&mut buf).try_or_cancel(cancel_handle).await?;
Ok(nread)
Ok((nread, buf))
}
pub async fn write(
@ -722,7 +722,10 @@ impl Resource for TlsStreamResource {
"tlsStream".into()
}
fn read(self: Rc<Self>, buf: ZeroCopyBuf) -> AsyncResult<usize> {
fn read_return(
self: Rc<Self>,
buf: ZeroCopyBuf,
) -> AsyncResult<(usize, ZeroCopyBuf)> {
Box::pin(self.read(buf))
}

View file

@ -8,6 +8,7 @@
"use strict";
((window) => {
const core = window.Deno.core;
const webidl = window.__bootstrap.webidl;
const { add, remove, signalAbort, newSignal, AbortSignalPrototype } =
window.__bootstrap.abortSignal;
@ -640,6 +641,41 @@
return stream[_disturbed];
}
const DEFAULT_CHUNK_SIZE = 64 * 1024; // 64 KiB
function readableStreamForRid(rid) {
const stream = new ReadableStream({
type: "bytes",
async pull(controller) {
const v = controller.byobRequest.view;
try {
const bytesRead = await core.read(rid, v);
if (bytesRead === 0) {
core.tryClose(rid);
controller.close();
controller.byobRequest.respond(0);
} else {
controller.byobRequest.respond(bytesRead);
}
} catch (e) {
controller.error(e);
core.tryClose(rid);
}
},
cancel() {
core.tryClose(rid);
},
autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
});
stream[_maybeRid] = rid;
return stream;
}
function getReadableStreamRid(stream) {
return stream[_maybeRid];
}
/**
* @param {unknown} value
* @returns {value is WritableStream}
@ -4288,6 +4324,7 @@
WeakMapPrototypeSet(countSizeFunctionWeakMap, globalObject, size);
}
const _maybeRid = Symbol("[[maybeRid]]");
/** @template R */
class ReadableStream {
/** @type {ReadableStreamDefaultController | ReadableByteStreamController} */
@ -4302,6 +4339,8 @@
[_state];
/** @type {any} */
[_storedError];
/** @type {number | null} */
[_maybeRid] = null;
/**
* @param {UnderlyingSource<R>=} underlyingSource
@ -5840,6 +5879,9 @@
errorReadableStream,
createProxy,
writableStreamClose,
readableStreamClose,
readableStreamForRid,
getReadableStreamRid,
Deferred,
// Exposed in global runtime scope
ByteLengthQueuingStrategy,

View file

@ -6,8 +6,8 @@
const { read, readSync, write, writeSync } = window.__bootstrap.io;
const { ftruncate, ftruncateSync, fstat, fstatSync } = window.__bootstrap.fs;
const { pathFromURL } = window.__bootstrap.util;
const { readableStreamForRid, writableStreamForRid } =
window.__bootstrap.streamUtils;
const { writableStreamForRid } = window.__bootstrap.streamUtils;
const { readableStreamForRid } = window.__bootstrap.streams;
const {
ArrayPrototypeFilter,
Error,

View file

@ -174,13 +174,13 @@ where
async fn read(
self: Rc<Self>,
mut buf: ZeroCopyBuf,
) -> Result<usize, AnyError> {
) -> Result<(usize, ZeroCopyBuf), AnyError> {
let mut rd = self.borrow_mut().await;
let nread = rd
.read(&mut buf)
.try_or_cancel(self.cancel_handle())
.await?;
Ok(nread)
Ok((nread, buf))
}
pub fn into_inner(self) -> S {
@ -211,7 +211,10 @@ impl Resource for ChildStdoutResource {
"childStdout".into()
}
fn read(self: Rc<Self>, buf: ZeroCopyBuf) -> AsyncResult<usize> {
fn read_return(
self: Rc<Self>,
buf: ZeroCopyBuf,
) -> AsyncResult<(usize, ZeroCopyBuf)> {
Box::pin(self.read(buf))
}
@ -227,7 +230,10 @@ impl Resource for ChildStderrResource {
"childStderr".into()
}
fn read(self: Rc<Self>, buf: ZeroCopyBuf) -> AsyncResult<usize> {
fn read_return(
self: Rc<Self>,
buf: ZeroCopyBuf,
) -> AsyncResult<(usize, ZeroCopyBuf)> {
Box::pin(self.read(buf))
}
@ -271,16 +277,17 @@ impl StdFileResource {
async fn read(
self: Rc<Self>,
mut buf: ZeroCopyBuf,
) -> Result<usize, AnyError> {
) -> Result<(usize, ZeroCopyBuf), AnyError> {
if self.fs_file.is_some() {
let fs_file = self.fs_file.as_ref().unwrap();
let std_file = fs_file.0.as_ref().unwrap().clone();
tokio::task::spawn_blocking(move || {
tokio::task::spawn_blocking(
move || -> Result<(usize, ZeroCopyBuf), AnyError> {
let mut std_file = std_file.lock().unwrap();
std_file.read(&mut buf)
})
Ok((std_file.read(&mut buf)?, buf))
},
)
.await?
.map_err(AnyError::from)
} else {
Err(resource_unavailable())
}
@ -330,7 +337,10 @@ impl Resource for StdFileResource {
self.name.as_str().into()
}
fn read(self: Rc<Self>, buf: ZeroCopyBuf) -> AsyncResult<usize> {
fn read_return(
self: Rc<Self>,
buf: ZeroCopyBuf,
) -> AsyncResult<(usize, ZeroCopyBuf)> {
Box::pin(self.read(buf))
}

View file

@ -14,19 +14,36 @@ use crate::magic::transl8::impl_magic;
pub enum MagicBuffer {
FromV8(ZeroCopyBuf),
ToV8(Mutex<Option<Box<[u8]>>>),
// Variant of the MagicBuffer than is never exposed to the JS.
// Generally used to pass Vec<u8> backed buffers to resource methods.
Temp(Vec<u8>),
}
impl_magic!(MagicBuffer);
impl MagicBuffer {
pub fn empty() -> Self {
MagicBuffer::ToV8(Mutex::new(Some(vec![0_u8; 0].into_boxed_slice())))
}
pub fn new_temp(vec: Vec<u8>) -> Self {
MagicBuffer::Temp(vec)
}
// TODO(@littledivy): Temporary, this needs a refactor.
pub fn to_temp(self) -> Vec<u8> {
match self {
MagicBuffer::Temp(vec) => vec,
_ => unreachable!(),
}
}
}
impl Clone for MagicBuffer {
fn clone(&self) -> Self {
match self {
Self::FromV8(zbuf) => Self::FromV8(zbuf.clone()),
Self::Temp(vec) => Self::Temp(vec.clone()),
Self::ToV8(_) => panic!("Don't Clone a MagicBuffer sent to v8"),
}
}
@ -49,6 +66,7 @@ impl Deref for MagicBuffer {
fn deref(&self) -> &[u8] {
match self {
Self::FromV8(buf) => &*buf,
Self::Temp(vec) => &*vec,
Self::ToV8(_) => panic!("Don't Deref a MagicBuffer sent to v8"),
}
}
@ -58,6 +76,7 @@ impl DerefMut for MagicBuffer {
fn deref_mut(&mut self) -> &mut [u8] {
match self {
Self::FromV8(buf) => &mut *buf,
Self::Temp(vec) => &mut *vec,
Self::ToV8(_) => panic!("Don't Deref a MagicBuffer sent to v8"),
}
}
@ -85,6 +104,7 @@ impl ToV8 for MagicBuffer {
let value: &[u8] = buf;
value.into()
}
Self::Temp(_) => unreachable!(),
Self::ToV8(x) => x.lock().unwrap().take().expect("MagicBuffer was empty"),
};