From 2612b6f20fc21fb92402aa9086d13a7192ae3814 Mon Sep 17 00:00:00 2001 From: Divy Srivastava Date: Wed, 20 Apr 2022 22:09:13 +0530 Subject: [PATCH] core: introduce `resource.read_return` (#14331) --- .github/workflows/ci.yml | 6 +++--- core/examples/http_bench_json_ops.rs | 16 ++++++++++---- core/resources.rs | 12 ++++++++++- ext/fetch/lib.rs | 7 ++++-- ext/http/lib.rs | 9 +++----- ext/net/io.rs | 16 +++++++++----- ext/net/ops_tls.rs | 9 +++++--- runtime/ops/io.rs | 32 ++++++++++++++++++---------- serde_v8/magic/buffer.rs | 8 +++++++ 9 files changed, 80 insertions(+), 35 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 26bf1ace8e..1394be10ad 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -236,7 +236,7 @@ jobs: ~/.cargo/registry/index ~/.cargo/registry/cache ~/.cargo/git/db - key: 7-cargo-home-${{ matrix.os }}-${{ hashFiles('Cargo.lock') }} + key: 8-cargo-home-${{ matrix.os }}-${{ hashFiles('Cargo.lock') }} # In main branch, always creates fresh cache - name: Cache build output (main) @@ -252,7 +252,7 @@ jobs: !./target/*/*.zip !./target/*/*.tar.gz key: | - 7-cargo-target-${{ matrix.os }}-${{ matrix.profile }}-${{ github.sha }} + 8-cargo-target-${{ matrix.os }}-${{ matrix.profile }}-${{ github.sha }} # Restore cache from the latest 'main' branch build. - name: Cache build output (PR) @@ -268,7 +268,7 @@ jobs: !./target/*/*.tar.gz key: never_saved restore-keys: | - 7-cargo-target-${{ matrix.os }}-${{ matrix.profile }}- + 8-cargo-target-${{ matrix.os }}-${{ matrix.profile }}- # Don't save cache after building PRs or branches other than 'main'. - name: Skip save cache (PR) diff --git a/core/examples/http_bench_json_ops.rs b/core/examples/http_bench_json_ops.rs index 2068c3b853..7c895f326f 100644 --- a/core/examples/http_bench_json_ops.rs +++ b/core/examples/http_bench_json_ops.rs @@ -83,13 +83,18 @@ struct TcpStream { } impl TcpStream { - async fn read(self: Rc, mut buf: ZeroCopyBuf) -> Result { + async fn read( + self: Rc, + mut buf: ZeroCopyBuf, + ) -> Result<(usize, ZeroCopyBuf), Error> { let mut rd = RcRef::map(&self, |r| &r.rd).borrow_mut().await; let cancel = RcRef::map(self, |r| &r.cancel); - rd.read(&mut buf) + let nread = rd + .read(&mut buf) .try_or_cancel(cancel) .await - .map_err(Error::from) + .map_err(Error::from)?; + Ok((nread, buf)) } async fn write(self: Rc, buf: ZeroCopyBuf) -> Result { @@ -99,7 +104,10 @@ impl TcpStream { } impl Resource for TcpStream { - fn read(self: Rc, buf: ZeroCopyBuf) -> AsyncResult { + fn read_return( + self: Rc, + buf: ZeroCopyBuf, + ) -> AsyncResult<(usize, ZeroCopyBuf)> { Box::pin(self.read(buf)) } diff --git a/core/resources.rs b/core/resources.rs index 9a14473928..ae4ef73944 100644 --- a/core/resources.rs +++ b/core/resources.rs @@ -36,7 +36,17 @@ pub trait Resource: Any + 'static { } /// Resources may implement `read()` to be a readable stream - fn read(self: Rc, _buf: ZeroCopyBuf) -> AsyncResult { + fn read(self: Rc, buf: ZeroCopyBuf) -> AsyncResult { + Box::pin(async move { + let (nread, _) = self.read_return(buf).await?; + Ok(nread) + }) + } + + fn read_return( + self: Rc, + _buf: ZeroCopyBuf, + ) -> AsyncResult<(usize, ZeroCopyBuf)> { Box::pin(futures::future::err(not_supported())) } diff --git a/ext/fetch/lib.rs b/ext/fetch/lib.rs index c216d53fa8..def823d8fa 100644 --- a/ext/fetch/lib.rs +++ b/ext/fetch/lib.rs @@ -485,12 +485,15 @@ impl Resource for FetchResponseBodyResource { "fetchResponseBody".into() } - fn read(self: Rc, mut buf: ZeroCopyBuf) -> AsyncResult { + fn read_return( + self: Rc, + mut buf: ZeroCopyBuf, + ) -> AsyncResult<(usize, ZeroCopyBuf)> { Box::pin(async move { let mut reader = RcRef::map(&self, |r| &r.reader).borrow_mut().await; let cancel = RcRef::map(self, |r| &r.cancel); let read = reader.read(&mut buf).try_or_cancel(cancel).await?; - Ok(read) + Ok((read, buf)) }) } diff --git a/ext/http/lib.rs b/ext/http/lib.rs index dff5c14cbe..b85dcc4736 100644 --- a/ext/http/lib.rs +++ b/ext/http/lib.rs @@ -688,16 +688,13 @@ async fn op_http_write_resource( } }; - let mut vec = vec![0u8; 64 * 1024]; - let vec_ptr = vec.as_mut_ptr(); + let vec = vec![0u8; 64 * 1024]; // 64KB let buf = ZeroCopyBuf::new_temp(vec); - let nread = resource.clone().read(buf).await?; + let (nread, buf) = resource.clone().read_return(buf).await?; if nread == 0 { break; } - // SAFETY: ZeroCopyBuf keeps the Vec alive. - let bytes = - Bytes::from_static(unsafe { std::slice::from_raw_parts(vec_ptr, nread) }); + let bytes = Bytes::from(buf.to_temp()); match body_tx.send_data(bytes).await { Ok(_) => {} Err(err) => { diff --git a/ext/net/io.rs b/ext/net/io.rs index 17b86af17e..02caf7473b 100644 --- a/ext/net/io.rs +++ b/ext/net/io.rs @@ -70,13 +70,13 @@ where pub async fn read( self: Rc, mut buf: ZeroCopyBuf, - ) -> Result { + ) -> Result<(usize, ZeroCopyBuf), AnyError> { let mut rd = self.rd_borrow_mut().await; let nread = rd .read(&mut buf) .try_or_cancel(self.cancel_handle()) .await?; - Ok(nread) + Ok((nread, buf)) } pub async fn write( @@ -103,7 +103,10 @@ impl Resource for TcpStreamResource { "tcpStream".into() } - fn read(self: Rc, buf: ZeroCopyBuf) -> AsyncResult { + fn read_return( + self: Rc, + buf: ZeroCopyBuf, + ) -> AsyncResult<(usize, ZeroCopyBuf)> { Box::pin(self.read(buf)) } @@ -160,7 +163,7 @@ impl UnixStreamResource { pub async fn read( self: Rc, _buf: ZeroCopyBuf, - ) -> Result { + ) -> Result<(usize, ZeroCopyBuf), AnyError> { unreachable!() } pub async fn write( @@ -182,7 +185,10 @@ impl Resource for UnixStreamResource { "unixStream".into() } - fn read(self: Rc, buf: ZeroCopyBuf) -> AsyncResult { + fn read_return( + self: Rc, + buf: ZeroCopyBuf, + ) -> AsyncResult<(usize, ZeroCopyBuf)> { Box::pin(self.read(buf)) } diff --git a/ext/net/ops_tls.rs b/ext/net/ops_tls.rs index d6b83e6e8b..ca922203cf 100644 --- a/ext/net/ops_tls.rs +++ b/ext/net/ops_tls.rs @@ -674,11 +674,11 @@ impl TlsStreamResource { pub async fn read( self: Rc, mut buf: ZeroCopyBuf, - ) -> Result { + ) -> Result<(usize, ZeroCopyBuf), AnyError> { let mut rd = RcRef::map(&self, |r| &r.rd).borrow_mut().await; let cancel_handle = RcRef::map(&self, |r| &r.cancel_handle); let nread = rd.read(&mut buf).try_or_cancel(cancel_handle).await?; - Ok(nread) + Ok((nread, buf)) } pub async fn write( @@ -722,7 +722,10 @@ impl Resource for TlsStreamResource { "tlsStream".into() } - fn read(self: Rc, buf: ZeroCopyBuf) -> AsyncResult { + fn read_return( + self: Rc, + buf: ZeroCopyBuf, + ) -> AsyncResult<(usize, ZeroCopyBuf)> { Box::pin(self.read(buf)) } diff --git a/runtime/ops/io.rs b/runtime/ops/io.rs index 6db5d69a9f..b8449af86b 100644 --- a/runtime/ops/io.rs +++ b/runtime/ops/io.rs @@ -170,13 +170,13 @@ where async fn read( self: Rc, mut buf: ZeroCopyBuf, - ) -> Result { + ) -> Result<(usize, ZeroCopyBuf), AnyError> { let mut rd = self.borrow_mut().await; let nread = rd .read(&mut buf) .try_or_cancel(self.cancel_handle()) .await?; - Ok(nread) + Ok((nread, buf)) } } @@ -203,7 +203,10 @@ impl Resource for ChildStdoutResource { "childStdout".into() } - fn read(self: Rc, buf: ZeroCopyBuf) -> AsyncResult { + fn read_return( + self: Rc, + buf: ZeroCopyBuf, + ) -> AsyncResult<(usize, ZeroCopyBuf)> { Box::pin(self.read(buf)) } @@ -219,7 +222,10 @@ impl Resource for ChildStderrResource { "childStderr".into() } - fn read(self: Rc, buf: ZeroCopyBuf) -> AsyncResult { + fn read_return( + self: Rc, + buf: ZeroCopyBuf, + ) -> AsyncResult<(usize, ZeroCopyBuf)> { Box::pin(self.read(buf)) } @@ -263,16 +269,17 @@ impl StdFileResource { async fn read( self: Rc, mut buf: ZeroCopyBuf, - ) -> Result { + ) -> Result<(usize, ZeroCopyBuf), AnyError> { if self.fs_file.is_some() { let fs_file = self.fs_file.as_ref().unwrap(); let std_file = fs_file.0.as_ref().unwrap().clone(); - tokio::task::spawn_blocking(move || { - let mut std_file = std_file.lock().unwrap(); - std_file.read(&mut buf) - }) + tokio::task::spawn_blocking( + move || -> Result<(usize, ZeroCopyBuf), AnyError> { + let mut std_file = std_file.lock().unwrap(); + Ok((std_file.read(&mut buf)?, buf)) + }, + ) .await? - .map_err(AnyError::from) } else { Err(resource_unavailable()) } @@ -322,7 +329,10 @@ impl Resource for StdFileResource { self.name.as_str().into() } - fn read(self: Rc, buf: ZeroCopyBuf) -> AsyncResult { + fn read_return( + self: Rc, + buf: ZeroCopyBuf, + ) -> AsyncResult<(usize, ZeroCopyBuf)> { Box::pin(self.read(buf)) } diff --git a/serde_v8/magic/buffer.rs b/serde_v8/magic/buffer.rs index a0a1c974bc..3a8c9499b8 100644 --- a/serde_v8/magic/buffer.rs +++ b/serde_v8/magic/buffer.rs @@ -29,6 +29,14 @@ impl MagicBuffer { pub fn new_temp(vec: Vec) -> Self { MagicBuffer::Temp(vec) } + + // TODO(@littledivy): Temporary, this needs a refactor. + pub fn to_temp(self) -> Vec { + match self { + MagicBuffer::Temp(vec) => vec, + _ => unreachable!(), + } + } } impl Clone for MagicBuffer {