mirror of
https://github.com/denoland/deno.git
synced 2024-11-21 15:04:11 -05:00
perf(http): optimize ReadableStream
s backed by a resource (#14284)
This commit is contained in:
parent
3833d37b15
commit
57a8fc37fc
7 changed files with 187 additions and 56 deletions
|
@ -854,6 +854,45 @@ Deno.test({ permissions: { net: true } }, async function httpServerPanic() {
|
|||
listener.close();
|
||||
});
|
||||
|
||||
Deno.test(
|
||||
{ permissions: { net: true, write: true, read: true } },
|
||||
async function httpServerClosedStream() {
|
||||
const listener = Deno.listen({ port: 4502 });
|
||||
|
||||
const client = await Deno.connect({ port: 4502 });
|
||||
await client.write(new TextEncoder().encode(
|
||||
`GET / HTTP/1.0\r\n\r\n`,
|
||||
));
|
||||
|
||||
const conn = await listener.accept();
|
||||
const httpConn = Deno.serveHttp(conn);
|
||||
const ev = await httpConn.nextRequest();
|
||||
const { respondWith } = ev!;
|
||||
|
||||
const tmpFile = await Deno.makeTempFile();
|
||||
const file = await Deno.open(tmpFile, { write: true, read: true });
|
||||
await file.write(new TextEncoder().encode("hello"));
|
||||
|
||||
const reader = await file.readable.getReader();
|
||||
while (true) {
|
||||
const { done, value } = await reader.read();
|
||||
if (done) break;
|
||||
assert(value);
|
||||
}
|
||||
|
||||
try {
|
||||
await respondWith(new Response(file.readable));
|
||||
fail("The stream should've been locked");
|
||||
} catch {
|
||||
// pass
|
||||
}
|
||||
|
||||
httpConn.close();
|
||||
listener.close();
|
||||
client.close();
|
||||
},
|
||||
);
|
||||
|
||||
// https://github.com/denoland/deno/issues/11595
|
||||
Deno.test(
|
||||
{ permissions: { net: true } },
|
||||
|
|
|
@ -32,7 +32,8 @@
|
|||
} = window.__bootstrap.webSocket;
|
||||
const { TcpConn, UnixConn } = window.__bootstrap.net;
|
||||
const { TlsConn } = window.__bootstrap.tls;
|
||||
const { Deferred } = window.__bootstrap.streams;
|
||||
const { Deferred, getReadableStreamRid, readableStreamClose } =
|
||||
window.__bootstrap.streams;
|
||||
const {
|
||||
ArrayPrototypeIncludes,
|
||||
ArrayPrototypePush,
|
||||
|
@ -235,7 +236,6 @@
|
|||
typeof respBody === "string" ||
|
||||
ObjectPrototypeIsPrototypeOf(Uint8ArrayPrototype, respBody)
|
||||
);
|
||||
|
||||
try {
|
||||
await core.opAsync(
|
||||
"op_http_write_headers",
|
||||
|
@ -269,35 +269,50 @@
|
|||
) {
|
||||
throw new TypeError("Unreachable");
|
||||
}
|
||||
const reader = respBody.getReader();
|
||||
while (true) {
|
||||
const { value, done } = await reader.read();
|
||||
if (done) break;
|
||||
if (!ObjectPrototypeIsPrototypeOf(Uint8ArrayPrototype, value)) {
|
||||
await reader.cancel(new TypeError("Value not a Uint8Array"));
|
||||
break;
|
||||
const resourceRid = getReadableStreamRid(respBody);
|
||||
if (resourceRid) {
|
||||
if (respBody.locked) {
|
||||
throw new TypeError("ReadableStream is locked.");
|
||||
}
|
||||
try {
|
||||
await core.opAsync("op_http_write", streamRid, value);
|
||||
} catch (error) {
|
||||
const connError = httpConn[connErrorSymbol];
|
||||
if (
|
||||
ObjectPrototypeIsPrototypeOf(BadResourcePrototype, error) &&
|
||||
connError != null
|
||||
) {
|
||||
// deno-lint-ignore no-ex-assign
|
||||
error = new connError.constructor(connError.message);
|
||||
const _reader = respBody.getReader(); // Aquire JS lock.
|
||||
await core.opAsync(
|
||||
"op_http_write_resource",
|
||||
streamRid,
|
||||
resourceRid,
|
||||
);
|
||||
readableStreamClose(respBody); // Release JS lock.
|
||||
} else {
|
||||
const reader = respBody.getReader();
|
||||
while (true) {
|
||||
const { value, done } = await reader.read();
|
||||
if (done) break;
|
||||
if (!ObjectPrototypeIsPrototypeOf(Uint8ArrayPrototype, value)) {
|
||||
await reader.cancel(new TypeError("Value not a Uint8Array"));
|
||||
break;
|
||||
}
|
||||
try {
|
||||
await core.opAsync("op_http_write", streamRid, value);
|
||||
} catch (error) {
|
||||
const connError = httpConn[connErrorSymbol];
|
||||
if (
|
||||
ObjectPrototypeIsPrototypeOf(BadResourcePrototype, error) &&
|
||||
connError != null
|
||||
) {
|
||||
// deno-lint-ignore no-ex-assign
|
||||
error = new connError.constructor(connError.message);
|
||||
}
|
||||
await reader.cancel(error);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
await core.opAsync("op_http_shutdown", streamRid);
|
||||
} catch (error) {
|
||||
await reader.cancel(error);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
try {
|
||||
await core.opAsync("op_http_shutdown", streamRid);
|
||||
} catch (error) {
|
||||
await reader.cancel(error);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
const deferred = request[_deferred];
|
||||
|
|
|
@ -75,6 +75,7 @@ pub fn init() -> Extension {
|
|||
op_http_read::decl(),
|
||||
op_http_write_headers::decl(),
|
||||
op_http_write::decl(),
|
||||
op_http_write_resource::decl(),
|
||||
op_http_shutdown::decl(),
|
||||
op_http_websocket_accept_header::decl(),
|
||||
op_http_upgrade_websocket::decl(),
|
||||
|
@ -664,6 +665,56 @@ async fn op_http_write_headers(
|
|||
}
|
||||
}
|
||||
|
||||
#[op]
|
||||
async fn op_http_write_resource(
|
||||
state: Rc<RefCell<OpState>>,
|
||||
rid: ResourceId,
|
||||
stream: ResourceId,
|
||||
) -> Result<(), AnyError> {
|
||||
let http_stream = state
|
||||
.borrow()
|
||||
.resource_table
|
||||
.get::<HttpStreamResource>(rid)?;
|
||||
let mut wr = RcRef::map(&http_stream, |r| &r.wr).borrow_mut().await;
|
||||
let resource = state.borrow().resource_table.get_any(stream)?;
|
||||
loop {
|
||||
let body_tx = match &mut *wr {
|
||||
HttpResponseWriter::Body(body_tx) => body_tx,
|
||||
HttpResponseWriter::Headers(_) => {
|
||||
return Err(http_error("no response headers"))
|
||||
}
|
||||
HttpResponseWriter::Closed => {
|
||||
return Err(http_error("response already completed"))
|
||||
}
|
||||
};
|
||||
|
||||
let mut vec = vec![0u8; 64 * 1024];
|
||||
let vec_ptr = vec.as_mut_ptr();
|
||||
let buf = ZeroCopyBuf::new_temp(vec);
|
||||
let nread = resource.clone().read(buf).await?;
|
||||
if nread == 0 {
|
||||
break;
|
||||
}
|
||||
// SAFETY: ZeroCopyBuf keeps the Vec<u8> alive.
|
||||
let bytes =
|
||||
Bytes::from_static(unsafe { std::slice::from_raw_parts(vec_ptr, nread) });
|
||||
match body_tx.send_data(bytes).await {
|
||||
Ok(_) => {}
|
||||
Err(err) => {
|
||||
// Don't return "channel closed", that's an implementation detail.
|
||||
// Pull up the failure associated with the transport connection instead.
|
||||
assert!(err.is_closed());
|
||||
http_stream.conn.closed().await?;
|
||||
// If there was no connection error, drop body_tx.
|
||||
*wr = HttpResponseWriter::Closed;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
take(&mut *wr);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[op]
|
||||
async fn op_http_write(
|
||||
state: Rc<RefCell<OpState>>,
|
||||
|
|
|
@ -4,7 +4,7 @@
|
|||
((window) => {
|
||||
const core = window.Deno.core;
|
||||
const { BadResourcePrototype, InterruptedPrototype } = core;
|
||||
const { ReadableStream, WritableStream } = window.__bootstrap.streams;
|
||||
const { WritableStream, readableStreamForRid } = window.__bootstrap.streams;
|
||||
const {
|
||||
Error,
|
||||
ObjectPrototypeIsPrototypeOf,
|
||||
|
@ -65,8 +65,6 @@
|
|||
return core.opAsync("op_dns_resolve", { query, recordType, options });
|
||||
}
|
||||
|
||||
const DEFAULT_CHUNK_SIZE = 64 * 1024;
|
||||
|
||||
function tryClose(rid) {
|
||||
try {
|
||||
core.close(rid);
|
||||
|
@ -75,32 +73,6 @@
|
|||
}
|
||||
}
|
||||
|
||||
function readableStreamForRid(rid) {
|
||||
return new ReadableStream({
|
||||
type: "bytes",
|
||||
async pull(controller) {
|
||||
const v = controller.byobRequest.view;
|
||||
try {
|
||||
const bytesRead = await read(rid, v);
|
||||
if (bytesRead === null) {
|
||||
tryClose(rid);
|
||||
controller.close();
|
||||
controller.byobRequest.respond(0);
|
||||
} else {
|
||||
controller.byobRequest.respond(bytesRead);
|
||||
}
|
||||
} catch (e) {
|
||||
controller.error(e);
|
||||
tryClose(rid);
|
||||
}
|
||||
},
|
||||
cancel() {
|
||||
tryClose(rid);
|
||||
},
|
||||
autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
|
||||
});
|
||||
}
|
||||
|
||||
function writableStreamForRid(rid) {
|
||||
return new WritableStream({
|
||||
async write(chunk, controller) {
|
||||
|
|
|
@ -8,6 +8,7 @@
|
|||
"use strict";
|
||||
|
||||
((window) => {
|
||||
const core = window.Deno.core;
|
||||
const webidl = window.__bootstrap.webidl;
|
||||
const { add, remove, signalAbort, newSignal, AbortSignalPrototype } =
|
||||
window.__bootstrap.abortSignal;
|
||||
|
@ -640,6 +641,41 @@
|
|||
return stream[_disturbed];
|
||||
}
|
||||
|
||||
const DEFAULT_CHUNK_SIZE = 64 * 1024; // 64 KiB
|
||||
|
||||
function readableStreamForRid(rid) {
|
||||
const stream = new ReadableStream({
|
||||
type: "bytes",
|
||||
async pull(controller) {
|
||||
const v = controller.byobRequest.view;
|
||||
try {
|
||||
const bytesRead = await core.read(rid, v);
|
||||
if (bytesRead === 0) {
|
||||
core.tryClose(rid);
|
||||
controller.close();
|
||||
controller.byobRequest.respond(0);
|
||||
} else {
|
||||
controller.byobRequest.respond(bytesRead);
|
||||
}
|
||||
} catch (e) {
|
||||
controller.error(e);
|
||||
core.tryClose(rid);
|
||||
}
|
||||
},
|
||||
cancel() {
|
||||
core.tryClose(rid);
|
||||
},
|
||||
autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
|
||||
});
|
||||
|
||||
stream[_maybeRid] = rid;
|
||||
return stream;
|
||||
}
|
||||
|
||||
function getReadableStreamRid(stream) {
|
||||
return stream[_maybeRid];
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {unknown} value
|
||||
* @returns {value is WritableStream}
|
||||
|
@ -4288,6 +4324,7 @@
|
|||
WeakMapPrototypeSet(countSizeFunctionWeakMap, globalObject, size);
|
||||
}
|
||||
|
||||
const _maybeRid = Symbol("[[maybeRid]]");
|
||||
/** @template R */
|
||||
class ReadableStream {
|
||||
/** @type {ReadableStreamDefaultController | ReadableByteStreamController} */
|
||||
|
@ -4302,6 +4339,8 @@
|
|||
[_state];
|
||||
/** @type {any} */
|
||||
[_storedError];
|
||||
/** @type {number | null} */
|
||||
[_maybeRid] = null;
|
||||
|
||||
/**
|
||||
* @param {UnderlyingSource<R>=} underlyingSource
|
||||
|
@ -5840,6 +5879,9 @@
|
|||
errorReadableStream,
|
||||
createProxy,
|
||||
writableStreamClose,
|
||||
readableStreamClose,
|
||||
readableStreamForRid,
|
||||
getReadableStreamRid,
|
||||
Deferred,
|
||||
// Exposed in global runtime scope
|
||||
ByteLengthQueuingStrategy,
|
||||
|
|
|
@ -6,8 +6,8 @@
|
|||
const { read, readSync, write, writeSync } = window.__bootstrap.io;
|
||||
const { ftruncate, ftruncateSync, fstat, fstatSync } = window.__bootstrap.fs;
|
||||
const { pathFromURL } = window.__bootstrap.util;
|
||||
const { readableStreamForRid, writableStreamForRid } =
|
||||
window.__bootstrap.streamUtils;
|
||||
const { writableStreamForRid } = window.__bootstrap.streamUtils;
|
||||
const { readableStreamForRid } = window.__bootstrap.streams;
|
||||
const {
|
||||
ArrayPrototypeFilter,
|
||||
Error,
|
||||
|
|
|
@ -14,19 +14,28 @@ use crate::magic::transl8::impl_magic;
|
|||
pub enum MagicBuffer {
|
||||
FromV8(ZeroCopyBuf),
|
||||
ToV8(Mutex<Option<Box<[u8]>>>),
|
||||
// Variant of the MagicBuffer than is never exposed to the JS.
|
||||
// Generally used to pass Vec<u8> backed buffers to resource methods.
|
||||
Temp(Vec<u8>),
|
||||
}
|
||||
|
||||
impl_magic!(MagicBuffer);
|
||||
|
||||
impl MagicBuffer {
|
||||
pub fn empty() -> Self {
|
||||
MagicBuffer::ToV8(Mutex::new(Some(vec![0_u8; 0].into_boxed_slice())))
|
||||
}
|
||||
|
||||
pub fn new_temp(vec: Vec<u8>) -> Self {
|
||||
MagicBuffer::Temp(vec)
|
||||
}
|
||||
}
|
||||
|
||||
impl Clone for MagicBuffer {
|
||||
fn clone(&self) -> Self {
|
||||
match self {
|
||||
Self::FromV8(zbuf) => Self::FromV8(zbuf.clone()),
|
||||
Self::Temp(vec) => Self::Temp(vec.clone()),
|
||||
Self::ToV8(_) => panic!("Don't Clone a MagicBuffer sent to v8"),
|
||||
}
|
||||
}
|
||||
|
@ -49,6 +58,7 @@ impl Deref for MagicBuffer {
|
|||
fn deref(&self) -> &[u8] {
|
||||
match self {
|
||||
Self::FromV8(buf) => &*buf,
|
||||
Self::Temp(vec) => &*vec,
|
||||
Self::ToV8(_) => panic!("Don't Deref a MagicBuffer sent to v8"),
|
||||
}
|
||||
}
|
||||
|
@ -58,6 +68,7 @@ impl DerefMut for MagicBuffer {
|
|||
fn deref_mut(&mut self) -> &mut [u8] {
|
||||
match self {
|
||||
Self::FromV8(buf) => &mut *buf,
|
||||
Self::Temp(vec) => &mut *vec,
|
||||
Self::ToV8(_) => panic!("Don't Deref a MagicBuffer sent to v8"),
|
||||
}
|
||||
}
|
||||
|
@ -85,6 +96,7 @@ impl ToV8 for MagicBuffer {
|
|||
let value: &[u8] = buf;
|
||||
value.into()
|
||||
}
|
||||
Self::Temp(_) => unreachable!(),
|
||||
Self::ToV8(x) => x.lock().unwrap().take().expect("MagicBuffer was empty"),
|
||||
};
|
||||
|
||||
|
|
Loading…
Reference in a new issue