diff --git a/cli/tests/unit/http_test.ts b/cli/tests/unit/http_test.ts index f48f314db8..37c827b9b2 100644 --- a/cli/tests/unit/http_test.ts +++ b/cli/tests/unit/http_test.ts @@ -854,45 +854,6 @@ Deno.test({ permissions: { net: true } }, async function httpServerPanic() { listener.close(); }); -Deno.test( - { permissions: { net: true, write: true, read: true } }, - async function httpServerClosedStream() { - const listener = Deno.listen({ port: 4502 }); - - const client = await Deno.connect({ port: 4502 }); - await client.write(new TextEncoder().encode( - `GET / HTTP/1.0\r\n\r\n`, - )); - - const conn = await listener.accept(); - const httpConn = Deno.serveHttp(conn); - const ev = await httpConn.nextRequest(); - const { respondWith } = ev!; - - const tmpFile = await Deno.makeTempFile(); - const file = await Deno.open(tmpFile, { write: true, read: true }); - await file.write(new TextEncoder().encode("hello")); - - const reader = await file.readable.getReader(); - while (true) { - const { done, value } = await reader.read(); - if (done) break; - assert(value); - } - - try { - await respondWith(new Response(file.readable)); - fail("The stream should've been locked"); - } catch { - // pass - } - - httpConn.close(); - listener.close(); - client.close(); - }, -); - // https://github.com/denoland/deno/issues/11595 Deno.test( { permissions: { net: true } }, diff --git a/ext/http/01_http.js b/ext/http/01_http.js index ff4b6f41f7..217bfc0614 100644 --- a/ext/http/01_http.js +++ b/ext/http/01_http.js @@ -32,8 +32,7 @@ } = window.__bootstrap.webSocket; const { TcpConn, UnixConn } = window.__bootstrap.net; const { TlsConn } = window.__bootstrap.tls; - const { Deferred, getReadableStreamRid, readableStreamClose } = - window.__bootstrap.streams; + const { Deferred } = window.__bootstrap.streams; const { ArrayPrototypeIncludes, ArrayPrototypePush, @@ -236,6 +235,7 @@ typeof respBody === "string" || ObjectPrototypeIsPrototypeOf(Uint8ArrayPrototype, respBody) ); + try { await core.opAsync( "op_http_write_headers", @@ -269,50 +269,35 @@ ) { throw new TypeError("Unreachable"); } - const resourceRid = getReadableStreamRid(respBody); - if (resourceRid) { - if (respBody.locked) { - throw new TypeError("ReadableStream is locked."); + const reader = respBody.getReader(); + while (true) { + const { value, done } = await reader.read(); + if (done) break; + if (!ObjectPrototypeIsPrototypeOf(Uint8ArrayPrototype, value)) { + await reader.cancel(new TypeError("Value not a Uint8Array")); + break; } - const _reader = respBody.getReader(); // Aquire JS lock. - await core.opAsync( - "op_http_write_resource", - streamRid, - resourceRid, - ); - readableStreamClose(respBody); // Release JS lock. - } else { - const reader = respBody.getReader(); - while (true) { - const { value, done } = await reader.read(); - if (done) break; - if (!ObjectPrototypeIsPrototypeOf(Uint8ArrayPrototype, value)) { - await reader.cancel(new TypeError("Value not a Uint8Array")); - break; - } - try { - await core.opAsync("op_http_write", streamRid, value); - } catch (error) { - const connError = httpConn[connErrorSymbol]; - if ( - ObjectPrototypeIsPrototypeOf(BadResourcePrototype, error) && - connError != null - ) { - // deno-lint-ignore no-ex-assign - error = new connError.constructor(connError.message); - } - await reader.cancel(error); - throw error; - } - } - try { - await core.opAsync("op_http_shutdown", streamRid); + await core.opAsync("op_http_write", streamRid, value); } catch (error) { + const connError = httpConn[connErrorSymbol]; + if ( + ObjectPrototypeIsPrototypeOf(BadResourcePrototype, error) && + connError != null + ) { + // deno-lint-ignore no-ex-assign + error = new connError.constructor(connError.message); + } await reader.cancel(error); throw error; } } + try { + await core.opAsync("op_http_shutdown", streamRid); + } catch (error) { + await reader.cancel(error); + throw error; + } } const deferred = request[_deferred]; diff --git a/ext/http/lib.rs b/ext/http/lib.rs index dff5c14cbe..9c0109937a 100644 --- a/ext/http/lib.rs +++ b/ext/http/lib.rs @@ -75,7 +75,6 @@ pub fn init() -> Extension { op_http_read::decl(), op_http_write_headers::decl(), op_http_write::decl(), - op_http_write_resource::decl(), op_http_shutdown::decl(), op_http_websocket_accept_header::decl(), op_http_upgrade_websocket::decl(), @@ -665,56 +664,6 @@ async fn op_http_write_headers( } } -#[op] -async fn op_http_write_resource( - state: Rc>, - rid: ResourceId, - stream: ResourceId, -) -> Result<(), AnyError> { - let http_stream = state - .borrow() - .resource_table - .get::(rid)?; - let mut wr = RcRef::map(&http_stream, |r| &r.wr).borrow_mut().await; - let resource = state.borrow().resource_table.get_any(stream)?; - loop { - let body_tx = match &mut *wr { - HttpResponseWriter::Body(body_tx) => body_tx, - HttpResponseWriter::Headers(_) => { - return Err(http_error("no response headers")) - } - HttpResponseWriter::Closed => { - return Err(http_error("response already completed")) - } - }; - - let mut vec = vec![0u8; 64 * 1024]; - let vec_ptr = vec.as_mut_ptr(); - let buf = ZeroCopyBuf::new_temp(vec); - let nread = resource.clone().read(buf).await?; - if nread == 0 { - break; - } - // SAFETY: ZeroCopyBuf keeps the Vec alive. - let bytes = - Bytes::from_static(unsafe { std::slice::from_raw_parts(vec_ptr, nread) }); - match body_tx.send_data(bytes).await { - Ok(_) => {} - Err(err) => { - // Don't return "channel closed", that's an implementation detail. - // Pull up the failure associated with the transport connection instead. - assert!(err.is_closed()); - http_stream.conn.closed().await?; - // If there was no connection error, drop body_tx. - *wr = HttpResponseWriter::Closed; - } - } - } - - take(&mut *wr); - Ok(()) -} - #[op] async fn op_http_write( state: Rc>, diff --git a/ext/net/01_net.js b/ext/net/01_net.js index fde75fe568..48cbfaaaba 100644 --- a/ext/net/01_net.js +++ b/ext/net/01_net.js @@ -4,7 +4,7 @@ ((window) => { const core = window.Deno.core; const { BadResourcePrototype, InterruptedPrototype } = core; - const { WritableStream, readableStreamForRid } = window.__bootstrap.streams; + const { ReadableStream, WritableStream } = window.__bootstrap.streams; const { Error, ObjectPrototypeIsPrototypeOf, @@ -65,6 +65,8 @@ return core.opAsync("op_dns_resolve", { query, recordType, options }); } + const DEFAULT_CHUNK_SIZE = 64 * 1024; + function tryClose(rid) { try { core.close(rid); @@ -73,6 +75,32 @@ } } + function readableStreamForRid(rid) { + return new ReadableStream({ + type: "bytes", + async pull(controller) { + const v = controller.byobRequest.view; + try { + const bytesRead = await read(rid, v); + if (bytesRead === null) { + tryClose(rid); + controller.close(); + controller.byobRequest.respond(0); + } else { + controller.byobRequest.respond(bytesRead); + } + } catch (e) { + controller.error(e); + tryClose(rid); + } + }, + cancel() { + tryClose(rid); + }, + autoAllocateChunkSize: DEFAULT_CHUNK_SIZE, + }); + } + function writableStreamForRid(rid) { return new WritableStream({ async write(chunk, controller) { diff --git a/ext/web/06_streams.js b/ext/web/06_streams.js index 4926945633..6daea08983 100644 --- a/ext/web/06_streams.js +++ b/ext/web/06_streams.js @@ -8,7 +8,6 @@ "use strict"; ((window) => { - const core = window.Deno.core; const webidl = window.__bootstrap.webidl; const { add, remove, signalAbort, newSignal, AbortSignalPrototype } = window.__bootstrap.abortSignal; @@ -641,41 +640,6 @@ return stream[_disturbed]; } - const DEFAULT_CHUNK_SIZE = 64 * 1024; // 64 KiB - - function readableStreamForRid(rid) { - const stream = new ReadableStream({ - type: "bytes", - async pull(controller) { - const v = controller.byobRequest.view; - try { - const bytesRead = await core.read(rid, v); - if (bytesRead === 0) { - core.tryClose(rid); - controller.close(); - controller.byobRequest.respond(0); - } else { - controller.byobRequest.respond(bytesRead); - } - } catch (e) { - controller.error(e); - core.tryClose(rid); - } - }, - cancel() { - core.tryClose(rid); - }, - autoAllocateChunkSize: DEFAULT_CHUNK_SIZE, - }); - - stream[_maybeRid] = rid; - return stream; - } - - function getReadableStreamRid(stream) { - return stream[_maybeRid]; - } - /** * @param {unknown} value * @returns {value is WritableStream} @@ -4324,7 +4288,6 @@ WeakMapPrototypeSet(countSizeFunctionWeakMap, globalObject, size); } - const _maybeRid = Symbol("[[maybeRid]]"); /** @template R */ class ReadableStream { /** @type {ReadableStreamDefaultController | ReadableByteStreamController} */ @@ -4339,8 +4302,6 @@ [_state]; /** @type {any} */ [_storedError]; - /** @type {number | null} */ - [_maybeRid] = null; /** * @param {UnderlyingSource=} underlyingSource @@ -5879,9 +5840,6 @@ errorReadableStream, createProxy, writableStreamClose, - readableStreamClose, - readableStreamForRid, - getReadableStreamRid, Deferred, // Exposed in global runtime scope ByteLengthQueuingStrategy, diff --git a/runtime/js/40_files.js b/runtime/js/40_files.js index d2148be2fc..8aa0a49723 100644 --- a/runtime/js/40_files.js +++ b/runtime/js/40_files.js @@ -6,8 +6,8 @@ const { read, readSync, write, writeSync } = window.__bootstrap.io; const { ftruncate, ftruncateSync, fstat, fstatSync } = window.__bootstrap.fs; const { pathFromURL } = window.__bootstrap.util; - const { writableStreamForRid } = window.__bootstrap.streamUtils; - const { readableStreamForRid } = window.__bootstrap.streams; + const { readableStreamForRid, writableStreamForRid } = + window.__bootstrap.streamUtils; const { ArrayPrototypeFilter, Error, diff --git a/serde_v8/magic/buffer.rs b/serde_v8/magic/buffer.rs index a0a1c974bc..484984ac5f 100644 --- a/serde_v8/magic/buffer.rs +++ b/serde_v8/magic/buffer.rs @@ -14,28 +14,19 @@ use crate::magic::transl8::impl_magic; pub enum MagicBuffer { FromV8(ZeroCopyBuf), ToV8(Mutex>>), - // Variant of the MagicBuffer than is never exposed to the JS. - // Generally used to pass Vec backed buffers to resource methods. - Temp(Vec), } - impl_magic!(MagicBuffer); impl MagicBuffer { pub fn empty() -> Self { MagicBuffer::ToV8(Mutex::new(Some(vec![0_u8; 0].into_boxed_slice()))) } - - pub fn new_temp(vec: Vec) -> Self { - MagicBuffer::Temp(vec) - } } impl Clone for MagicBuffer { fn clone(&self) -> Self { match self { Self::FromV8(zbuf) => Self::FromV8(zbuf.clone()), - Self::Temp(vec) => Self::Temp(vec.clone()), Self::ToV8(_) => panic!("Don't Clone a MagicBuffer sent to v8"), } } @@ -58,7 +49,6 @@ impl Deref for MagicBuffer { fn deref(&self) -> &[u8] { match self { Self::FromV8(buf) => &*buf, - Self::Temp(vec) => &*vec, Self::ToV8(_) => panic!("Don't Deref a MagicBuffer sent to v8"), } } @@ -68,7 +58,6 @@ impl DerefMut for MagicBuffer { fn deref_mut(&mut self) -> &mut [u8] { match self { Self::FromV8(buf) => &mut *buf, - Self::Temp(vec) => &mut *vec, Self::ToV8(_) => panic!("Don't Deref a MagicBuffer sent to v8"), } } @@ -96,7 +85,6 @@ impl ToV8 for MagicBuffer { let value: &[u8] = buf; value.into() } - Self::Temp(_) => unreachable!(), Self::ToV8(x) => x.lock().unwrap().take().expect("MagicBuffer was empty"), };