mirror of
https://github.com/denoland/deno.git
synced 2024-11-24 15:19:26 -05:00
Revert "perf(http): optimize ReadableStream
s backed by a resource (#14284)"
This reverts commit 57a8fc37fc
.
This commit is contained in:
parent
57a8fc37fc
commit
b115a74eee
7 changed files with 55 additions and 186 deletions
|
@ -854,45 +854,6 @@ Deno.test({ permissions: { net: true } }, async function httpServerPanic() {
|
||||||
listener.close();
|
listener.close();
|
||||||
});
|
});
|
||||||
|
|
||||||
Deno.test(
|
|
||||||
{ permissions: { net: true, write: true, read: true } },
|
|
||||||
async function httpServerClosedStream() {
|
|
||||||
const listener = Deno.listen({ port: 4502 });
|
|
||||||
|
|
||||||
const client = await Deno.connect({ port: 4502 });
|
|
||||||
await client.write(new TextEncoder().encode(
|
|
||||||
`GET / HTTP/1.0\r\n\r\n`,
|
|
||||||
));
|
|
||||||
|
|
||||||
const conn = await listener.accept();
|
|
||||||
const httpConn = Deno.serveHttp(conn);
|
|
||||||
const ev = await httpConn.nextRequest();
|
|
||||||
const { respondWith } = ev!;
|
|
||||||
|
|
||||||
const tmpFile = await Deno.makeTempFile();
|
|
||||||
const file = await Deno.open(tmpFile, { write: true, read: true });
|
|
||||||
await file.write(new TextEncoder().encode("hello"));
|
|
||||||
|
|
||||||
const reader = await file.readable.getReader();
|
|
||||||
while (true) {
|
|
||||||
const { done, value } = await reader.read();
|
|
||||||
if (done) break;
|
|
||||||
assert(value);
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
|
||||||
await respondWith(new Response(file.readable));
|
|
||||||
fail("The stream should've been locked");
|
|
||||||
} catch {
|
|
||||||
// pass
|
|
||||||
}
|
|
||||||
|
|
||||||
httpConn.close();
|
|
||||||
listener.close();
|
|
||||||
client.close();
|
|
||||||
},
|
|
||||||
);
|
|
||||||
|
|
||||||
// https://github.com/denoland/deno/issues/11595
|
// https://github.com/denoland/deno/issues/11595
|
||||||
Deno.test(
|
Deno.test(
|
||||||
{ permissions: { net: true } },
|
{ permissions: { net: true } },
|
||||||
|
|
|
@ -32,8 +32,7 @@
|
||||||
} = window.__bootstrap.webSocket;
|
} = window.__bootstrap.webSocket;
|
||||||
const { TcpConn, UnixConn } = window.__bootstrap.net;
|
const { TcpConn, UnixConn } = window.__bootstrap.net;
|
||||||
const { TlsConn } = window.__bootstrap.tls;
|
const { TlsConn } = window.__bootstrap.tls;
|
||||||
const { Deferred, getReadableStreamRid, readableStreamClose } =
|
const { Deferred } = window.__bootstrap.streams;
|
||||||
window.__bootstrap.streams;
|
|
||||||
const {
|
const {
|
||||||
ArrayPrototypeIncludes,
|
ArrayPrototypeIncludes,
|
||||||
ArrayPrototypePush,
|
ArrayPrototypePush,
|
||||||
|
@ -236,6 +235,7 @@
|
||||||
typeof respBody === "string" ||
|
typeof respBody === "string" ||
|
||||||
ObjectPrototypeIsPrototypeOf(Uint8ArrayPrototype, respBody)
|
ObjectPrototypeIsPrototypeOf(Uint8ArrayPrototype, respBody)
|
||||||
);
|
);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
await core.opAsync(
|
await core.opAsync(
|
||||||
"op_http_write_headers",
|
"op_http_write_headers",
|
||||||
|
@ -269,50 +269,35 @@
|
||||||
) {
|
) {
|
||||||
throw new TypeError("Unreachable");
|
throw new TypeError("Unreachable");
|
||||||
}
|
}
|
||||||
const resourceRid = getReadableStreamRid(respBody);
|
const reader = respBody.getReader();
|
||||||
if (resourceRid) {
|
while (true) {
|
||||||
if (respBody.locked) {
|
const { value, done } = await reader.read();
|
||||||
throw new TypeError("ReadableStream is locked.");
|
if (done) break;
|
||||||
|
if (!ObjectPrototypeIsPrototypeOf(Uint8ArrayPrototype, value)) {
|
||||||
|
await reader.cancel(new TypeError("Value not a Uint8Array"));
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
const _reader = respBody.getReader(); // Aquire JS lock.
|
|
||||||
await core.opAsync(
|
|
||||||
"op_http_write_resource",
|
|
||||||
streamRid,
|
|
||||||
resourceRid,
|
|
||||||
);
|
|
||||||
readableStreamClose(respBody); // Release JS lock.
|
|
||||||
} else {
|
|
||||||
const reader = respBody.getReader();
|
|
||||||
while (true) {
|
|
||||||
const { value, done } = await reader.read();
|
|
||||||
if (done) break;
|
|
||||||
if (!ObjectPrototypeIsPrototypeOf(Uint8ArrayPrototype, value)) {
|
|
||||||
await reader.cancel(new TypeError("Value not a Uint8Array"));
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
try {
|
|
||||||
await core.opAsync("op_http_write", streamRid, value);
|
|
||||||
} catch (error) {
|
|
||||||
const connError = httpConn[connErrorSymbol];
|
|
||||||
if (
|
|
||||||
ObjectPrototypeIsPrototypeOf(BadResourcePrototype, error) &&
|
|
||||||
connError != null
|
|
||||||
) {
|
|
||||||
// deno-lint-ignore no-ex-assign
|
|
||||||
error = new connError.constructor(connError.message);
|
|
||||||
}
|
|
||||||
await reader.cancel(error);
|
|
||||||
throw error;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
await core.opAsync("op_http_shutdown", streamRid);
|
await core.opAsync("op_http_write", streamRid, value);
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
|
const connError = httpConn[connErrorSymbol];
|
||||||
|
if (
|
||||||
|
ObjectPrototypeIsPrototypeOf(BadResourcePrototype, error) &&
|
||||||
|
connError != null
|
||||||
|
) {
|
||||||
|
// deno-lint-ignore no-ex-assign
|
||||||
|
error = new connError.constructor(connError.message);
|
||||||
|
}
|
||||||
await reader.cancel(error);
|
await reader.cancel(error);
|
||||||
throw error;
|
throw error;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
try {
|
||||||
|
await core.opAsync("op_http_shutdown", streamRid);
|
||||||
|
} catch (error) {
|
||||||
|
await reader.cancel(error);
|
||||||
|
throw error;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
const deferred = request[_deferred];
|
const deferred = request[_deferred];
|
||||||
|
|
|
@ -75,7 +75,6 @@ pub fn init() -> Extension {
|
||||||
op_http_read::decl(),
|
op_http_read::decl(),
|
||||||
op_http_write_headers::decl(),
|
op_http_write_headers::decl(),
|
||||||
op_http_write::decl(),
|
op_http_write::decl(),
|
||||||
op_http_write_resource::decl(),
|
|
||||||
op_http_shutdown::decl(),
|
op_http_shutdown::decl(),
|
||||||
op_http_websocket_accept_header::decl(),
|
op_http_websocket_accept_header::decl(),
|
||||||
op_http_upgrade_websocket::decl(),
|
op_http_upgrade_websocket::decl(),
|
||||||
|
@ -665,56 +664,6 @@ async fn op_http_write_headers(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[op]
|
|
||||||
async fn op_http_write_resource(
|
|
||||||
state: Rc<RefCell<OpState>>,
|
|
||||||
rid: ResourceId,
|
|
||||||
stream: ResourceId,
|
|
||||||
) -> Result<(), AnyError> {
|
|
||||||
let http_stream = state
|
|
||||||
.borrow()
|
|
||||||
.resource_table
|
|
||||||
.get::<HttpStreamResource>(rid)?;
|
|
||||||
let mut wr = RcRef::map(&http_stream, |r| &r.wr).borrow_mut().await;
|
|
||||||
let resource = state.borrow().resource_table.get_any(stream)?;
|
|
||||||
loop {
|
|
||||||
let body_tx = match &mut *wr {
|
|
||||||
HttpResponseWriter::Body(body_tx) => body_tx,
|
|
||||||
HttpResponseWriter::Headers(_) => {
|
|
||||||
return Err(http_error("no response headers"))
|
|
||||||
}
|
|
||||||
HttpResponseWriter::Closed => {
|
|
||||||
return Err(http_error("response already completed"))
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let mut vec = vec![0u8; 64 * 1024];
|
|
||||||
let vec_ptr = vec.as_mut_ptr();
|
|
||||||
let buf = ZeroCopyBuf::new_temp(vec);
|
|
||||||
let nread = resource.clone().read(buf).await?;
|
|
||||||
if nread == 0 {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
// SAFETY: ZeroCopyBuf keeps the Vec<u8> alive.
|
|
||||||
let bytes =
|
|
||||||
Bytes::from_static(unsafe { std::slice::from_raw_parts(vec_ptr, nread) });
|
|
||||||
match body_tx.send_data(bytes).await {
|
|
||||||
Ok(_) => {}
|
|
||||||
Err(err) => {
|
|
||||||
// Don't return "channel closed", that's an implementation detail.
|
|
||||||
// Pull up the failure associated with the transport connection instead.
|
|
||||||
assert!(err.is_closed());
|
|
||||||
http_stream.conn.closed().await?;
|
|
||||||
// If there was no connection error, drop body_tx.
|
|
||||||
*wr = HttpResponseWriter::Closed;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
take(&mut *wr);
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
#[op]
|
#[op]
|
||||||
async fn op_http_write(
|
async fn op_http_write(
|
||||||
state: Rc<RefCell<OpState>>,
|
state: Rc<RefCell<OpState>>,
|
||||||
|
|
|
@ -4,7 +4,7 @@
|
||||||
((window) => {
|
((window) => {
|
||||||
const core = window.Deno.core;
|
const core = window.Deno.core;
|
||||||
const { BadResourcePrototype, InterruptedPrototype } = core;
|
const { BadResourcePrototype, InterruptedPrototype } = core;
|
||||||
const { WritableStream, readableStreamForRid } = window.__bootstrap.streams;
|
const { ReadableStream, WritableStream } = window.__bootstrap.streams;
|
||||||
const {
|
const {
|
||||||
Error,
|
Error,
|
||||||
ObjectPrototypeIsPrototypeOf,
|
ObjectPrototypeIsPrototypeOf,
|
||||||
|
@ -65,6 +65,8 @@
|
||||||
return core.opAsync("op_dns_resolve", { query, recordType, options });
|
return core.opAsync("op_dns_resolve", { query, recordType, options });
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const DEFAULT_CHUNK_SIZE = 64 * 1024;
|
||||||
|
|
||||||
function tryClose(rid) {
|
function tryClose(rid) {
|
||||||
try {
|
try {
|
||||||
core.close(rid);
|
core.close(rid);
|
||||||
|
@ -73,6 +75,32 @@
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function readableStreamForRid(rid) {
|
||||||
|
return new ReadableStream({
|
||||||
|
type: "bytes",
|
||||||
|
async pull(controller) {
|
||||||
|
const v = controller.byobRequest.view;
|
||||||
|
try {
|
||||||
|
const bytesRead = await read(rid, v);
|
||||||
|
if (bytesRead === null) {
|
||||||
|
tryClose(rid);
|
||||||
|
controller.close();
|
||||||
|
controller.byobRequest.respond(0);
|
||||||
|
} else {
|
||||||
|
controller.byobRequest.respond(bytesRead);
|
||||||
|
}
|
||||||
|
} catch (e) {
|
||||||
|
controller.error(e);
|
||||||
|
tryClose(rid);
|
||||||
|
}
|
||||||
|
},
|
||||||
|
cancel() {
|
||||||
|
tryClose(rid);
|
||||||
|
},
|
||||||
|
autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
function writableStreamForRid(rid) {
|
function writableStreamForRid(rid) {
|
||||||
return new WritableStream({
|
return new WritableStream({
|
||||||
async write(chunk, controller) {
|
async write(chunk, controller) {
|
||||||
|
|
|
@ -8,7 +8,6 @@
|
||||||
"use strict";
|
"use strict";
|
||||||
|
|
||||||
((window) => {
|
((window) => {
|
||||||
const core = window.Deno.core;
|
|
||||||
const webidl = window.__bootstrap.webidl;
|
const webidl = window.__bootstrap.webidl;
|
||||||
const { add, remove, signalAbort, newSignal, AbortSignalPrototype } =
|
const { add, remove, signalAbort, newSignal, AbortSignalPrototype } =
|
||||||
window.__bootstrap.abortSignal;
|
window.__bootstrap.abortSignal;
|
||||||
|
@ -641,41 +640,6 @@
|
||||||
return stream[_disturbed];
|
return stream[_disturbed];
|
||||||
}
|
}
|
||||||
|
|
||||||
const DEFAULT_CHUNK_SIZE = 64 * 1024; // 64 KiB
|
|
||||||
|
|
||||||
function readableStreamForRid(rid) {
|
|
||||||
const stream = new ReadableStream({
|
|
||||||
type: "bytes",
|
|
||||||
async pull(controller) {
|
|
||||||
const v = controller.byobRequest.view;
|
|
||||||
try {
|
|
||||||
const bytesRead = await core.read(rid, v);
|
|
||||||
if (bytesRead === 0) {
|
|
||||||
core.tryClose(rid);
|
|
||||||
controller.close();
|
|
||||||
controller.byobRequest.respond(0);
|
|
||||||
} else {
|
|
||||||
controller.byobRequest.respond(bytesRead);
|
|
||||||
}
|
|
||||||
} catch (e) {
|
|
||||||
controller.error(e);
|
|
||||||
core.tryClose(rid);
|
|
||||||
}
|
|
||||||
},
|
|
||||||
cancel() {
|
|
||||||
core.tryClose(rid);
|
|
||||||
},
|
|
||||||
autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
|
|
||||||
});
|
|
||||||
|
|
||||||
stream[_maybeRid] = rid;
|
|
||||||
return stream;
|
|
||||||
}
|
|
||||||
|
|
||||||
function getReadableStreamRid(stream) {
|
|
||||||
return stream[_maybeRid];
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param {unknown} value
|
* @param {unknown} value
|
||||||
* @returns {value is WritableStream}
|
* @returns {value is WritableStream}
|
||||||
|
@ -4324,7 +4288,6 @@
|
||||||
WeakMapPrototypeSet(countSizeFunctionWeakMap, globalObject, size);
|
WeakMapPrototypeSet(countSizeFunctionWeakMap, globalObject, size);
|
||||||
}
|
}
|
||||||
|
|
||||||
const _maybeRid = Symbol("[[maybeRid]]");
|
|
||||||
/** @template R */
|
/** @template R */
|
||||||
class ReadableStream {
|
class ReadableStream {
|
||||||
/** @type {ReadableStreamDefaultController | ReadableByteStreamController} */
|
/** @type {ReadableStreamDefaultController | ReadableByteStreamController} */
|
||||||
|
@ -4339,8 +4302,6 @@
|
||||||
[_state];
|
[_state];
|
||||||
/** @type {any} */
|
/** @type {any} */
|
||||||
[_storedError];
|
[_storedError];
|
||||||
/** @type {number | null} */
|
|
||||||
[_maybeRid] = null;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param {UnderlyingSource<R>=} underlyingSource
|
* @param {UnderlyingSource<R>=} underlyingSource
|
||||||
|
@ -5879,9 +5840,6 @@
|
||||||
errorReadableStream,
|
errorReadableStream,
|
||||||
createProxy,
|
createProxy,
|
||||||
writableStreamClose,
|
writableStreamClose,
|
||||||
readableStreamClose,
|
|
||||||
readableStreamForRid,
|
|
||||||
getReadableStreamRid,
|
|
||||||
Deferred,
|
Deferred,
|
||||||
// Exposed in global runtime scope
|
// Exposed in global runtime scope
|
||||||
ByteLengthQueuingStrategy,
|
ByteLengthQueuingStrategy,
|
||||||
|
|
|
@ -6,8 +6,8 @@
|
||||||
const { read, readSync, write, writeSync } = window.__bootstrap.io;
|
const { read, readSync, write, writeSync } = window.__bootstrap.io;
|
||||||
const { ftruncate, ftruncateSync, fstat, fstatSync } = window.__bootstrap.fs;
|
const { ftruncate, ftruncateSync, fstat, fstatSync } = window.__bootstrap.fs;
|
||||||
const { pathFromURL } = window.__bootstrap.util;
|
const { pathFromURL } = window.__bootstrap.util;
|
||||||
const { writableStreamForRid } = window.__bootstrap.streamUtils;
|
const { readableStreamForRid, writableStreamForRid } =
|
||||||
const { readableStreamForRid } = window.__bootstrap.streams;
|
window.__bootstrap.streamUtils;
|
||||||
const {
|
const {
|
||||||
ArrayPrototypeFilter,
|
ArrayPrototypeFilter,
|
||||||
Error,
|
Error,
|
||||||
|
|
|
@ -14,28 +14,19 @@ use crate::magic::transl8::impl_magic;
|
||||||
pub enum MagicBuffer {
|
pub enum MagicBuffer {
|
||||||
FromV8(ZeroCopyBuf),
|
FromV8(ZeroCopyBuf),
|
||||||
ToV8(Mutex<Option<Box<[u8]>>>),
|
ToV8(Mutex<Option<Box<[u8]>>>),
|
||||||
// Variant of the MagicBuffer than is never exposed to the JS.
|
|
||||||
// Generally used to pass Vec<u8> backed buffers to resource methods.
|
|
||||||
Temp(Vec<u8>),
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl_magic!(MagicBuffer);
|
impl_magic!(MagicBuffer);
|
||||||
|
|
||||||
impl MagicBuffer {
|
impl MagicBuffer {
|
||||||
pub fn empty() -> Self {
|
pub fn empty() -> Self {
|
||||||
MagicBuffer::ToV8(Mutex::new(Some(vec![0_u8; 0].into_boxed_slice())))
|
MagicBuffer::ToV8(Mutex::new(Some(vec![0_u8; 0].into_boxed_slice())))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn new_temp(vec: Vec<u8>) -> Self {
|
|
||||||
MagicBuffer::Temp(vec)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Clone for MagicBuffer {
|
impl Clone for MagicBuffer {
|
||||||
fn clone(&self) -> Self {
|
fn clone(&self) -> Self {
|
||||||
match self {
|
match self {
|
||||||
Self::FromV8(zbuf) => Self::FromV8(zbuf.clone()),
|
Self::FromV8(zbuf) => Self::FromV8(zbuf.clone()),
|
||||||
Self::Temp(vec) => Self::Temp(vec.clone()),
|
|
||||||
Self::ToV8(_) => panic!("Don't Clone a MagicBuffer sent to v8"),
|
Self::ToV8(_) => panic!("Don't Clone a MagicBuffer sent to v8"),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -58,7 +49,6 @@ impl Deref for MagicBuffer {
|
||||||
fn deref(&self) -> &[u8] {
|
fn deref(&self) -> &[u8] {
|
||||||
match self {
|
match self {
|
||||||
Self::FromV8(buf) => &*buf,
|
Self::FromV8(buf) => &*buf,
|
||||||
Self::Temp(vec) => &*vec,
|
|
||||||
Self::ToV8(_) => panic!("Don't Deref a MagicBuffer sent to v8"),
|
Self::ToV8(_) => panic!("Don't Deref a MagicBuffer sent to v8"),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -68,7 +58,6 @@ impl DerefMut for MagicBuffer {
|
||||||
fn deref_mut(&mut self) -> &mut [u8] {
|
fn deref_mut(&mut self) -> &mut [u8] {
|
||||||
match self {
|
match self {
|
||||||
Self::FromV8(buf) => &mut *buf,
|
Self::FromV8(buf) => &mut *buf,
|
||||||
Self::Temp(vec) => &mut *vec,
|
|
||||||
Self::ToV8(_) => panic!("Don't Deref a MagicBuffer sent to v8"),
|
Self::ToV8(_) => panic!("Don't Deref a MagicBuffer sent to v8"),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -96,7 +85,6 @@ impl ToV8 for MagicBuffer {
|
||||||
let value: &[u8] = buf;
|
let value: &[u8] = buf;
|
||||||
value.into()
|
value.into()
|
||||||
}
|
}
|
||||||
Self::Temp(_) => unreachable!(),
|
|
||||||
Self::ToV8(x) => x.lock().unwrap().take().expect("MagicBuffer was empty"),
|
Self::ToV8(x) => x.lock().unwrap().take().expect("MagicBuffer was empty"),
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue