diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 554a3c7297..c1ab2bec30 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -236,7 +236,7 @@ jobs: ~/.cargo/registry/index ~/.cargo/registry/cache ~/.cargo/git/db - key: 7-cargo-home-${{ matrix.os }}-${{ hashFiles('Cargo.lock') }} + key: 8-cargo-home-${{ matrix.os }}-${{ hashFiles('Cargo.lock') }} # In main branch, always creates fresh cache - name: Cache build output (main) @@ -252,7 +252,7 @@ jobs: !./target/*/*.zip !./target/*/*.tar.gz key: | - 7-cargo-target-${{ matrix.os }}-${{ matrix.profile }}-${{ github.sha }} + 8-cargo-target-${{ matrix.os }}-${{ matrix.profile }}-${{ github.sha }} # Restore cache from the latest 'main' branch build. - name: Cache build output (PR) @@ -268,7 +268,7 @@ jobs: !./target/*/*.tar.gz key: never_saved restore-keys: | - 7-cargo-target-${{ matrix.os }}-${{ matrix.profile }}- + 8-cargo-target-${{ matrix.os }}-${{ matrix.profile }}- # Don't save cache after building PRs or branches other than 'main'. - name: Skip save cache (PR) diff --git a/cli/tests/unit/http_test.ts b/cli/tests/unit/http_test.ts index beb9557814..c55c0372da 100644 --- a/cli/tests/unit/http_test.ts +++ b/cli/tests/unit/http_test.ts @@ -854,6 +854,47 @@ Deno.test({ permissions: { net: true } }, async function httpServerPanic() { listener.close(); }); +Deno.test( + { permissions: { net: true, write: true, read: true } }, + async function httpServerClosedStream() { + const listener = Deno.listen({ port: 4502 }); + + const client = await Deno.connect({ port: 4502 }); + await client.write(new TextEncoder().encode( + `GET / HTTP/1.0\r\n\r\n`, + )); + + const conn = await listener.accept(); + const httpConn = Deno.serveHttp(conn); + const ev = await httpConn.nextRequest(); + const { respondWith } = ev!; + + const tmpFile = await Deno.makeTempFile(); + const file = await Deno.open(tmpFile, { write: true, read: true }); + await file.write(new TextEncoder().encode("hello")); + + const reader = await file.readable.getReader(); + while (true) { + const { done, value } = await reader.read(); + if (done) break; + assert(value); + } + + let didThrow = false; + try { + await respondWith(new Response(file.readable)); + } catch { + // pass + didThrow = true; + } + + assert(didThrow); + httpConn.close(); + listener.close(); + client.close(); + }, +); + // https://github.com/denoland/deno/issues/11595 Deno.test( { permissions: { net: true } }, diff --git a/core/examples/http_bench_json_ops.rs b/core/examples/http_bench_json_ops.rs index 2068c3b853..7c895f326f 100644 --- a/core/examples/http_bench_json_ops.rs +++ b/core/examples/http_bench_json_ops.rs @@ -83,13 +83,18 @@ struct TcpStream { } impl TcpStream { - async fn read(self: Rc, mut buf: ZeroCopyBuf) -> Result { + async fn read( + self: Rc, + mut buf: ZeroCopyBuf, + ) -> Result<(usize, ZeroCopyBuf), Error> { let mut rd = RcRef::map(&self, |r| &r.rd).borrow_mut().await; let cancel = RcRef::map(self, |r| &r.cancel); - rd.read(&mut buf) + let nread = rd + .read(&mut buf) .try_or_cancel(cancel) .await - .map_err(Error::from) + .map_err(Error::from)?; + Ok((nread, buf)) } async fn write(self: Rc, buf: ZeroCopyBuf) -> Result { @@ -99,7 +104,10 @@ impl TcpStream { } impl Resource for TcpStream { - fn read(self: Rc, buf: ZeroCopyBuf) -> AsyncResult { + fn read_return( + self: Rc, + buf: ZeroCopyBuf, + ) -> AsyncResult<(usize, ZeroCopyBuf)> { Box::pin(self.read(buf)) } diff --git a/core/resources.rs b/core/resources.rs index 9a14473928..ae4ef73944 100644 --- a/core/resources.rs +++ b/core/resources.rs @@ -36,7 +36,17 @@ pub trait Resource: Any + 'static { } /// Resources may implement `read()` to be a readable stream - fn read(self: Rc, _buf: ZeroCopyBuf) -> AsyncResult { + fn read(self: Rc, buf: ZeroCopyBuf) -> AsyncResult { + Box::pin(async move { + let (nread, _) = self.read_return(buf).await?; + Ok(nread) + }) + } + + fn read_return( + self: Rc, + _buf: ZeroCopyBuf, + ) -> AsyncResult<(usize, ZeroCopyBuf)> { Box::pin(futures::future::err(not_supported())) } diff --git a/ext/fetch/lib.rs b/ext/fetch/lib.rs index c216d53fa8..def823d8fa 100644 --- a/ext/fetch/lib.rs +++ b/ext/fetch/lib.rs @@ -485,12 +485,15 @@ impl Resource for FetchResponseBodyResource { "fetchResponseBody".into() } - fn read(self: Rc, mut buf: ZeroCopyBuf) -> AsyncResult { + fn read_return( + self: Rc, + mut buf: ZeroCopyBuf, + ) -> AsyncResult<(usize, ZeroCopyBuf)> { Box::pin(async move { let mut reader = RcRef::map(&self, |r| &r.reader).borrow_mut().await; let cancel = RcRef::map(self, |r| &r.cancel); let read = reader.read(&mut buf).try_or_cancel(cancel).await?; - Ok(read) + Ok((read, buf)) }) } diff --git a/ext/http/01_http.js b/ext/http/01_http.js index 217bfc0614..152241522c 100644 --- a/ext/http/01_http.js +++ b/ext/http/01_http.js @@ -32,7 +32,8 @@ } = window.__bootstrap.webSocket; const { TcpConn, UnixConn } = window.__bootstrap.net; const { TlsConn } = window.__bootstrap.tls; - const { Deferred } = window.__bootstrap.streams; + const { Deferred, getReadableStreamRid, readableStreamClose } = + window.__bootstrap.streams; const { ArrayPrototypeIncludes, ArrayPrototypePush, @@ -235,7 +236,6 @@ typeof respBody === "string" || ObjectPrototypeIsPrototypeOf(Uint8ArrayPrototype, respBody) ); - try { await core.opAsync( "op_http_write_headers", @@ -269,16 +269,19 @@ ) { throw new TypeError("Unreachable"); } - const reader = respBody.getReader(); - while (true) { - const { value, done } = await reader.read(); - if (done) break; - if (!ObjectPrototypeIsPrototypeOf(Uint8ArrayPrototype, value)) { - await reader.cancel(new TypeError("Value not a Uint8Array")); - break; + const resourceRid = getReadableStreamRid(respBody); + if (resourceRid) { + if (respBody.locked) { + throw new TypeError("ReadableStream is locked."); } + const reader = respBody.getReader(); // Aquire JS lock. try { - await core.opAsync("op_http_write", streamRid, value); + await core.opAsync( + "op_http_write_resource", + streamRid, + resourceRid, + ); + readableStreamClose(respBody); // Release JS lock. } catch (error) { const connError = httpConn[connErrorSymbol]; if ( @@ -291,12 +294,37 @@ await reader.cancel(error); throw error; } - } - try { - await core.opAsync("op_http_shutdown", streamRid); - } catch (error) { - await reader.cancel(error); - throw error; + } else { + const reader = respBody.getReader(); + while (true) { + const { value, done } = await reader.read(); + if (done) break; + if (!ObjectPrototypeIsPrototypeOf(Uint8ArrayPrototype, value)) { + await reader.cancel(new TypeError("Value not a Uint8Array")); + break; + } + try { + await core.opAsync("op_http_write", streamRid, value); + } catch (error) { + const connError = httpConn[connErrorSymbol]; + if ( + ObjectPrototypeIsPrototypeOf(BadResourcePrototype, error) && + connError != null + ) { + // deno-lint-ignore no-ex-assign + error = new connError.constructor(connError.message); + } + await reader.cancel(error); + throw error; + } + } + + try { + await core.opAsync("op_http_shutdown", streamRid); + } catch (error) { + await reader.cancel(error); + throw error; + } } } diff --git a/ext/http/lib.rs b/ext/http/lib.rs index 9c1b48fff2..7fc90843f9 100644 --- a/ext/http/lib.rs +++ b/ext/http/lib.rs @@ -77,6 +77,7 @@ pub fn init() -> Extension { op_http_read::decl(), op_http_write_headers::decl(), op_http_write::decl(), + op_http_write_resource::decl(), op_http_shutdown::decl(), op_http_websocket_accept_header::decl(), op_http_upgrade_websocket::decl(), @@ -683,6 +684,63 @@ async fn op_http_write_headers( } } +#[op] +async fn op_http_write_resource( + state: Rc>, + rid: ResourceId, + stream: ResourceId, +) -> Result<(), AnyError> { + let http_stream = state + .borrow() + .resource_table + .get::(rid)?; + let mut wr = RcRef::map(&http_stream, |r| &r.wr).borrow_mut().await; + let resource = state.borrow().resource_table.get_any(stream)?; + loop { + let body_writer = match &mut *wr { + HttpResponseWriter::Body(body_writer) => body_writer, + HttpResponseWriter::Headers(_) => { + return Err(http_error("no response headers")) + } + HttpResponseWriter::Closed => { + return Err(http_error("response already completed")) + } + }; + + let vec = vec![0u8; 64 * 1024]; // 64KB + let buf = ZeroCopyBuf::new_temp(vec); + let (nread, buf) = resource.clone().read_return(buf).await?; + if nread == 0 { + break; + } + match body_writer.write_all(&buf[..nread]).await { + Ok(_) => {} + Err(err) => { + assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe); + // Don't return "broken pipe", that's an implementation detail. + // Pull up the failure associated with the transport connection instead. + http_stream.conn.closed().await?; + // If there was no connection error, drop body_tx. + *wr = HttpResponseWriter::Closed; + } + } + } + + let wr = take(&mut *wr); + if let HttpResponseWriter::Body(mut body_writer) = wr { + match body_writer.shutdown().await { + Ok(_) => {} + Err(err) => { + assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe); + // Don't return "broken pipe", that's an implementation detail. + // Pull up the failure associated with the transport connection instead. + http_stream.conn.closed().await?; + } + } + } + Ok(()) +} + #[op] async fn op_http_write( state: Rc>, diff --git a/ext/net/01_net.js b/ext/net/01_net.js index 48cbfaaaba..fde75fe568 100644 --- a/ext/net/01_net.js +++ b/ext/net/01_net.js @@ -4,7 +4,7 @@ ((window) => { const core = window.Deno.core; const { BadResourcePrototype, InterruptedPrototype } = core; - const { ReadableStream, WritableStream } = window.__bootstrap.streams; + const { WritableStream, readableStreamForRid } = window.__bootstrap.streams; const { Error, ObjectPrototypeIsPrototypeOf, @@ -65,8 +65,6 @@ return core.opAsync("op_dns_resolve", { query, recordType, options }); } - const DEFAULT_CHUNK_SIZE = 64 * 1024; - function tryClose(rid) { try { core.close(rid); @@ -75,32 +73,6 @@ } } - function readableStreamForRid(rid) { - return new ReadableStream({ - type: "bytes", - async pull(controller) { - const v = controller.byobRequest.view; - try { - const bytesRead = await read(rid, v); - if (bytesRead === null) { - tryClose(rid); - controller.close(); - controller.byobRequest.respond(0); - } else { - controller.byobRequest.respond(bytesRead); - } - } catch (e) { - controller.error(e); - tryClose(rid); - } - }, - cancel() { - tryClose(rid); - }, - autoAllocateChunkSize: DEFAULT_CHUNK_SIZE, - }); - } - function writableStreamForRid(rid) { return new WritableStream({ async write(chunk, controller) { diff --git a/ext/net/io.rs b/ext/net/io.rs index 17b86af17e..02caf7473b 100644 --- a/ext/net/io.rs +++ b/ext/net/io.rs @@ -70,13 +70,13 @@ where pub async fn read( self: Rc, mut buf: ZeroCopyBuf, - ) -> Result { + ) -> Result<(usize, ZeroCopyBuf), AnyError> { let mut rd = self.rd_borrow_mut().await; let nread = rd .read(&mut buf) .try_or_cancel(self.cancel_handle()) .await?; - Ok(nread) + Ok((nread, buf)) } pub async fn write( @@ -103,7 +103,10 @@ impl Resource for TcpStreamResource { "tcpStream".into() } - fn read(self: Rc, buf: ZeroCopyBuf) -> AsyncResult { + fn read_return( + self: Rc, + buf: ZeroCopyBuf, + ) -> AsyncResult<(usize, ZeroCopyBuf)> { Box::pin(self.read(buf)) } @@ -160,7 +163,7 @@ impl UnixStreamResource { pub async fn read( self: Rc, _buf: ZeroCopyBuf, - ) -> Result { + ) -> Result<(usize, ZeroCopyBuf), AnyError> { unreachable!() } pub async fn write( @@ -182,7 +185,10 @@ impl Resource for UnixStreamResource { "unixStream".into() } - fn read(self: Rc, buf: ZeroCopyBuf) -> AsyncResult { + fn read_return( + self: Rc, + buf: ZeroCopyBuf, + ) -> AsyncResult<(usize, ZeroCopyBuf)> { Box::pin(self.read(buf)) } diff --git a/ext/net/ops_tls.rs b/ext/net/ops_tls.rs index d6b83e6e8b..ca922203cf 100644 --- a/ext/net/ops_tls.rs +++ b/ext/net/ops_tls.rs @@ -674,11 +674,11 @@ impl TlsStreamResource { pub async fn read( self: Rc, mut buf: ZeroCopyBuf, - ) -> Result { + ) -> Result<(usize, ZeroCopyBuf), AnyError> { let mut rd = RcRef::map(&self, |r| &r.rd).borrow_mut().await; let cancel_handle = RcRef::map(&self, |r| &r.cancel_handle); let nread = rd.read(&mut buf).try_or_cancel(cancel_handle).await?; - Ok(nread) + Ok((nread, buf)) } pub async fn write( @@ -722,7 +722,10 @@ impl Resource for TlsStreamResource { "tlsStream".into() } - fn read(self: Rc, buf: ZeroCopyBuf) -> AsyncResult { + fn read_return( + self: Rc, + buf: ZeroCopyBuf, + ) -> AsyncResult<(usize, ZeroCopyBuf)> { Box::pin(self.read(buf)) } diff --git a/ext/web/06_streams.js b/ext/web/06_streams.js index 6daea08983..4926945633 100644 --- a/ext/web/06_streams.js +++ b/ext/web/06_streams.js @@ -8,6 +8,7 @@ "use strict"; ((window) => { + const core = window.Deno.core; const webidl = window.__bootstrap.webidl; const { add, remove, signalAbort, newSignal, AbortSignalPrototype } = window.__bootstrap.abortSignal; @@ -640,6 +641,41 @@ return stream[_disturbed]; } + const DEFAULT_CHUNK_SIZE = 64 * 1024; // 64 KiB + + function readableStreamForRid(rid) { + const stream = new ReadableStream({ + type: "bytes", + async pull(controller) { + const v = controller.byobRequest.view; + try { + const bytesRead = await core.read(rid, v); + if (bytesRead === 0) { + core.tryClose(rid); + controller.close(); + controller.byobRequest.respond(0); + } else { + controller.byobRequest.respond(bytesRead); + } + } catch (e) { + controller.error(e); + core.tryClose(rid); + } + }, + cancel() { + core.tryClose(rid); + }, + autoAllocateChunkSize: DEFAULT_CHUNK_SIZE, + }); + + stream[_maybeRid] = rid; + return stream; + } + + function getReadableStreamRid(stream) { + return stream[_maybeRid]; + } + /** * @param {unknown} value * @returns {value is WritableStream} @@ -4288,6 +4324,7 @@ WeakMapPrototypeSet(countSizeFunctionWeakMap, globalObject, size); } + const _maybeRid = Symbol("[[maybeRid]]"); /** @template R */ class ReadableStream { /** @type {ReadableStreamDefaultController | ReadableByteStreamController} */ @@ -4302,6 +4339,8 @@ [_state]; /** @type {any} */ [_storedError]; + /** @type {number | null} */ + [_maybeRid] = null; /** * @param {UnderlyingSource=} underlyingSource @@ -5840,6 +5879,9 @@ errorReadableStream, createProxy, writableStreamClose, + readableStreamClose, + readableStreamForRid, + getReadableStreamRid, Deferred, // Exposed in global runtime scope ByteLengthQueuingStrategy, diff --git a/runtime/js/40_files.js b/runtime/js/40_files.js index 8aa0a49723..d2148be2fc 100644 --- a/runtime/js/40_files.js +++ b/runtime/js/40_files.js @@ -6,8 +6,8 @@ const { read, readSync, write, writeSync } = window.__bootstrap.io; const { ftruncate, ftruncateSync, fstat, fstatSync } = window.__bootstrap.fs; const { pathFromURL } = window.__bootstrap.util; - const { readableStreamForRid, writableStreamForRid } = - window.__bootstrap.streamUtils; + const { writableStreamForRid } = window.__bootstrap.streamUtils; + const { readableStreamForRid } = window.__bootstrap.streams; const { ArrayPrototypeFilter, Error, diff --git a/runtime/ops/io.rs b/runtime/ops/io.rs index 27a48a9612..34cd541d5c 100644 --- a/runtime/ops/io.rs +++ b/runtime/ops/io.rs @@ -174,13 +174,13 @@ where async fn read( self: Rc, mut buf: ZeroCopyBuf, - ) -> Result { + ) -> Result<(usize, ZeroCopyBuf), AnyError> { let mut rd = self.borrow_mut().await; let nread = rd .read(&mut buf) .try_or_cancel(self.cancel_handle()) .await?; - Ok(nread) + Ok((nread, buf)) } pub fn into_inner(self) -> S { @@ -211,7 +211,10 @@ impl Resource for ChildStdoutResource { "childStdout".into() } - fn read(self: Rc, buf: ZeroCopyBuf) -> AsyncResult { + fn read_return( + self: Rc, + buf: ZeroCopyBuf, + ) -> AsyncResult<(usize, ZeroCopyBuf)> { Box::pin(self.read(buf)) } @@ -227,7 +230,10 @@ impl Resource for ChildStderrResource { "childStderr".into() } - fn read(self: Rc, buf: ZeroCopyBuf) -> AsyncResult { + fn read_return( + self: Rc, + buf: ZeroCopyBuf, + ) -> AsyncResult<(usize, ZeroCopyBuf)> { Box::pin(self.read(buf)) } @@ -271,16 +277,17 @@ impl StdFileResource { async fn read( self: Rc, mut buf: ZeroCopyBuf, - ) -> Result { + ) -> Result<(usize, ZeroCopyBuf), AnyError> { if self.fs_file.is_some() { let fs_file = self.fs_file.as_ref().unwrap(); let std_file = fs_file.0.as_ref().unwrap().clone(); - tokio::task::spawn_blocking(move || { - let mut std_file = std_file.lock().unwrap(); - std_file.read(&mut buf) - }) + tokio::task::spawn_blocking( + move || -> Result<(usize, ZeroCopyBuf), AnyError> { + let mut std_file = std_file.lock().unwrap(); + Ok((std_file.read(&mut buf)?, buf)) + }, + ) .await? - .map_err(AnyError::from) } else { Err(resource_unavailable()) } @@ -330,7 +337,10 @@ impl Resource for StdFileResource { self.name.as_str().into() } - fn read(self: Rc, buf: ZeroCopyBuf) -> AsyncResult { + fn read_return( + self: Rc, + buf: ZeroCopyBuf, + ) -> AsyncResult<(usize, ZeroCopyBuf)> { Box::pin(self.read(buf)) } diff --git a/serde_v8/magic/buffer.rs b/serde_v8/magic/buffer.rs index 484984ac5f..3a8c9499b8 100644 --- a/serde_v8/magic/buffer.rs +++ b/serde_v8/magic/buffer.rs @@ -14,19 +14,36 @@ use crate::magic::transl8::impl_magic; pub enum MagicBuffer { FromV8(ZeroCopyBuf), ToV8(Mutex>>), + // Variant of the MagicBuffer than is never exposed to the JS. + // Generally used to pass Vec backed buffers to resource methods. + Temp(Vec), } + impl_magic!(MagicBuffer); impl MagicBuffer { pub fn empty() -> Self { MagicBuffer::ToV8(Mutex::new(Some(vec![0_u8; 0].into_boxed_slice()))) } + + pub fn new_temp(vec: Vec) -> Self { + MagicBuffer::Temp(vec) + } + + // TODO(@littledivy): Temporary, this needs a refactor. + pub fn to_temp(self) -> Vec { + match self { + MagicBuffer::Temp(vec) => vec, + _ => unreachable!(), + } + } } impl Clone for MagicBuffer { fn clone(&self) -> Self { match self { Self::FromV8(zbuf) => Self::FromV8(zbuf.clone()), + Self::Temp(vec) => Self::Temp(vec.clone()), Self::ToV8(_) => panic!("Don't Clone a MagicBuffer sent to v8"), } } @@ -49,6 +66,7 @@ impl Deref for MagicBuffer { fn deref(&self) -> &[u8] { match self { Self::FromV8(buf) => &*buf, + Self::Temp(vec) => &*vec, Self::ToV8(_) => panic!("Don't Deref a MagicBuffer sent to v8"), } } @@ -58,6 +76,7 @@ impl DerefMut for MagicBuffer { fn deref_mut(&mut self) -> &mut [u8] { match self { Self::FromV8(buf) => &mut *buf, + Self::Temp(vec) => &mut *vec, Self::ToV8(_) => panic!("Don't Deref a MagicBuffer sent to v8"), } } @@ -85,6 +104,7 @@ impl ToV8 for MagicBuffer { let value: &[u8] = buf; value.into() } + Self::Temp(_) => unreachable!(), Self::ToV8(x) => x.lock().unwrap().take().expect("MagicBuffer was empty"), };