From 03019e778189b38938f1238f22652162de5a7434 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20Iwa=C5=84czuk?= Date: Thu, 21 Apr 2022 02:22:55 +0200 Subject: [PATCH] Revert various PRs related to "ext/http" (#14339) * Revert "feat(ext/http): stream auto resp body compression (#14325)" * Revert "core: introduce `resource.read_return` (#14331)" * Revert "perf(http): optimize `ReadableStream`s backed by a resource (#14284)" --- .github/workflows/ci.yml | 6 +- Cargo.lock | 1 - cli/tests/unit/http_test.ts | 200 ++++------------------- core/examples/http_bench_json_ops.rs | 16 +- core/resources.rs | 12 +- ext/fetch/lib.rs | 7 +- ext/http/01_http.js | 63 +++----- ext/http/Cargo.toml | 1 - ext/http/lib.rs | 227 ++++++++------------------- ext/net/01_net.js | 30 +++- ext/net/io.rs | 16 +- ext/net/ops_tls.rs | 9 +- ext/web/06_streams.js | 42 ----- runtime/js/40_files.js | 4 +- runtime/ops/io.rs | 32 ++-- serde_v8/magic/buffer.rs | 12 -- 16 files changed, 175 insertions(+), 503 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 3612b3a3e9..26bf1ace8e 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -236,7 +236,7 @@ jobs: ~/.cargo/registry/index ~/.cargo/registry/cache ~/.cargo/git/db - key: 9-cargo-home-${{ matrix.os }}-${{ hashFiles('Cargo.lock') }} + key: 7-cargo-home-${{ matrix.os }}-${{ hashFiles('Cargo.lock') }} # In main branch, always creates fresh cache - name: Cache build output (main) @@ -252,7 +252,7 @@ jobs: !./target/*/*.zip !./target/*/*.tar.gz key: | - 9-cargo-target-${{ matrix.os }}-${{ matrix.profile }}-${{ github.sha }} + 7-cargo-target-${{ matrix.os }}-${{ matrix.profile }}-${{ github.sha }} # Restore cache from the latest 'main' branch build. - name: Cache build output (PR) @@ -268,7 +268,7 @@ jobs: !./target/*/*.tar.gz key: never_saved restore-keys: | - 9-cargo-target-${{ matrix.os }}-${{ matrix.profile }}- + 7-cargo-target-${{ matrix.os }}-${{ matrix.profile }}- # Don't save cache after building PRs or branches other than 'main'. - name: Skip save cache (PR) diff --git a/Cargo.lock b/Cargo.lock index be295c4c23..e7008286b0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -964,7 +964,6 @@ dependencies = [ name = "deno_http" version = "0.41.0" dependencies = [ - "async-compression", "base64 0.13.0", "brotli", "bytes", diff --git a/cli/tests/unit/http_test.ts b/cli/tests/unit/http_test.ts index 5fabd40fea..37c827b9b2 100644 --- a/cli/tests/unit/http_test.ts +++ b/cli/tests/unit/http_test.ts @@ -854,45 +854,6 @@ Deno.test({ permissions: { net: true } }, async function httpServerPanic() { listener.close(); }); -Deno.test( - { permissions: { net: true, write: true, read: true } }, - async function httpServerClosedStream() { - const listener = Deno.listen({ port: 4502 }); - - const client = await Deno.connect({ port: 4502 }); - await client.write(new TextEncoder().encode( - `GET / HTTP/1.0\r\n\r\n`, - )); - - const conn = await listener.accept(); - const httpConn = Deno.serveHttp(conn); - const ev = await httpConn.nextRequest(); - const { respondWith } = ev!; - - const tmpFile = await Deno.makeTempFile(); - const file = await Deno.open(tmpFile, { write: true, read: true }); - await file.write(new TextEncoder().encode("hello")); - - const reader = await file.readable.getReader(); - while (true) { - const { done, value } = await reader.read(); - if (done) break; - assert(value); - } - - try { - await respondWith(new Response(file.readable)); - fail("The stream should've been locked"); - } catch { - // pass - } - - httpConn.close(); - listener.close(); - client.close(); - }, -); - // https://github.com/denoland/deno/issues/11595 Deno.test( { permissions: { net: true } }, @@ -1224,25 +1185,26 @@ Deno.test( const decoder = new TextDecoder(); Deno.test({ - name: "http server compresses body - check headers", + name: "http server compresses body", permissions: { net: true, run: true }, async fn() { const hostname = "localhost"; const port = 4501; - const listener = Deno.listen({ hostname, port }); - - const data = { hello: "deno", now: "with", compressed: "body" }; async function server() { + const listener = Deno.listen({ hostname, port }); const tcpConn = await listener.accept(); const httpConn = Deno.serveHttp(tcpConn); const e = await httpConn.nextRequest(); assert(e); const { request, respondWith } = e; assertEquals(request.headers.get("Accept-Encoding"), "gzip, deflate, br"); - const response = new Response(JSON.stringify(data), { - headers: { "content-type": "application/json" }, - }); + const response = new Response( + JSON.stringify({ hello: "deno", now: "with", compressed: "body" }), + { + headers: { "content-type": "application/json" }, + }, + ); await respondWith(response); httpConn.close(); listener.close(); @@ -1273,60 +1235,6 @@ Deno.test({ }, }); -Deno.test({ - name: "http server compresses body - check body", - permissions: { net: true, run: true }, - async fn() { - const hostname = "localhost"; - const port = 4501; - const listener = Deno.listen({ hostname, port }); - - const data = { hello: "deno", now: "with", compressed: "body" }; - - async function server() { - const tcpConn = await listener.accept(); - const httpConn = Deno.serveHttp(tcpConn); - const e = await httpConn.nextRequest(); - assert(e); - const { request, respondWith } = e; - assertEquals(request.headers.get("Accept-Encoding"), "gzip, deflate, br"); - const response = new Response(JSON.stringify(data), { - headers: { "content-type": "application/json" }, - }); - await respondWith(response); - httpConn.close(); - listener.close(); - } - - async function client() { - const url = `http://${hostname}:${port}/`; - const cmd = [ - "curl", - "--request", - "GET", - "--url", - url, - "--header", - "Accept-Encoding: gzip, deflate, br", - ]; - const proc = Deno.run({ cmd, stdout: "piped", stderr: "null" }); - const status = await proc.status(); - assert(status.success); - const stdout = proc.stdout!.readable - .pipeThrough(new DecompressionStream("gzip")) - .pipeThrough(new TextDecoderStream()); - let body = ""; - for await (const chunk of stdout) { - body += chunk; - } - assertEquals(JSON.parse(body), data); - proc.close(); - } - - await Promise.all([server(), client()]); - }, -}); - Deno.test({ name: "http server doesn't compress small body", permissions: { net: true, run: true }, @@ -1706,18 +1614,15 @@ Deno.test({ }); Deno.test({ - name: "http server compresses streamed bodies - check headers", + name: "http server doesn't compress streamed bodies", permissions: { net: true, run: true }, async fn() { const hostname = "localhost"; const port = 4501; - const encoder = new TextEncoder(); - const listener = Deno.listen({ hostname, port }); - - const data = { hello: "deno", now: "with", compressed: "body" }; - async function server() { + const encoder = new TextEncoder(); + const listener = Deno.listen({ hostname, port }); const tcpConn = await listener.accept(); const httpConn = Deno.serveHttp(tcpConn); const e = await httpConn.nextRequest(); @@ -1726,13 +1631,23 @@ Deno.test({ assertEquals(request.headers.get("Accept-Encoding"), "gzip, deflate, br"); const bodyInit = new ReadableStream({ start(controller) { - controller.enqueue(encoder.encode(JSON.stringify(data))); + controller.enqueue( + encoder.encode( + JSON.stringify({ + hello: "deno", + now: "with", + compressed: "body", + }), + ), + ); controller.close(); }, }); const response = new Response( bodyInit, - { headers: { "content-type": "application/json" } }, + { + headers: { "content-type": "application/json", vary: "Accept" }, + }, ); await respondWith(response); httpConn.close(); @@ -1755,71 +1670,8 @@ Deno.test({ const status = await proc.status(); assert(status.success); const output = decoder.decode(await proc.output()); - assert(output.includes("vary: Accept-Encoding\r\n")); - assert(output.includes("content-encoding: gzip\r\n")); - proc.close(); - } - - await Promise.all([server(), client()]); - }, -}); - -Deno.test({ - name: "http server compresses streamed bodies - check body", - permissions: { net: true, run: true }, - async fn() { - const hostname = "localhost"; - const port = 4501; - - const encoder = new TextEncoder(); - const listener = Deno.listen({ hostname, port }); - - const data = { hello: "deno", now: "with", compressed: "body" }; - - async function server() { - const tcpConn = await listener.accept(); - const httpConn = Deno.serveHttp(tcpConn); - const e = await httpConn.nextRequest(); - assert(e); - const { request, respondWith } = e; - assertEquals(request.headers.get("Accept-Encoding"), "gzip, deflate, br"); - const bodyInit = new ReadableStream({ - start(controller) { - controller.enqueue(encoder.encode(JSON.stringify(data))); - controller.close(); - }, - }); - const response = new Response( - bodyInit, - { headers: { "content-type": "application/json" } }, - ); - await respondWith(response); - httpConn.close(); - listener.close(); - } - - async function client() { - const url = `http://${hostname}:${port}/`; - const cmd = [ - "curl", - "--request", - "GET", - "--url", - url, - "--header", - "Accept-Encoding: gzip, deflate, br", - ]; - const proc = Deno.run({ cmd, stdout: "piped", stderr: "null" }); - const status = await proc.status(); - assert(status.success); - const stdout = proc.stdout.readable - .pipeThrough(new DecompressionStream("gzip")) - .pipeThrough(new TextDecoderStream()); - let body = ""; - for await (const chunk of stdout) { - body += chunk; - } - assertEquals(JSON.parse(body), data); + assert(output.includes("vary: Accept\r\n")); + assert(!output.includes("content-encoding: ")); proc.close(); } @@ -1884,6 +1736,8 @@ Deno.test({ // Ensure the content-length header is updated. assert(!output.includes(`content-length: ${contentLength}\r\n`)); assert(output.includes("content-length: 72\r\n")); + console.log(output); + proc.close(); } diff --git a/core/examples/http_bench_json_ops.rs b/core/examples/http_bench_json_ops.rs index 7c895f326f..2068c3b853 100644 --- a/core/examples/http_bench_json_ops.rs +++ b/core/examples/http_bench_json_ops.rs @@ -83,18 +83,13 @@ struct TcpStream { } impl TcpStream { - async fn read( - self: Rc, - mut buf: ZeroCopyBuf, - ) -> Result<(usize, ZeroCopyBuf), Error> { + async fn read(self: Rc, mut buf: ZeroCopyBuf) -> Result { let mut rd = RcRef::map(&self, |r| &r.rd).borrow_mut().await; let cancel = RcRef::map(self, |r| &r.cancel); - let nread = rd - .read(&mut buf) + rd.read(&mut buf) .try_or_cancel(cancel) .await - .map_err(Error::from)?; - Ok((nread, buf)) + .map_err(Error::from) } async fn write(self: Rc, buf: ZeroCopyBuf) -> Result { @@ -104,10 +99,7 @@ impl TcpStream { } impl Resource for TcpStream { - fn read_return( - self: Rc, - buf: ZeroCopyBuf, - ) -> AsyncResult<(usize, ZeroCopyBuf)> { + fn read(self: Rc, buf: ZeroCopyBuf) -> AsyncResult { Box::pin(self.read(buf)) } diff --git a/core/resources.rs b/core/resources.rs index ae4ef73944..9a14473928 100644 --- a/core/resources.rs +++ b/core/resources.rs @@ -36,17 +36,7 @@ pub trait Resource: Any + 'static { } /// Resources may implement `read()` to be a readable stream - fn read(self: Rc, buf: ZeroCopyBuf) -> AsyncResult { - Box::pin(async move { - let (nread, _) = self.read_return(buf).await?; - Ok(nread) - }) - } - - fn read_return( - self: Rc, - _buf: ZeroCopyBuf, - ) -> AsyncResult<(usize, ZeroCopyBuf)> { + fn read(self: Rc, _buf: ZeroCopyBuf) -> AsyncResult { Box::pin(futures::future::err(not_supported())) } diff --git a/ext/fetch/lib.rs b/ext/fetch/lib.rs index def823d8fa..c216d53fa8 100644 --- a/ext/fetch/lib.rs +++ b/ext/fetch/lib.rs @@ -485,15 +485,12 @@ impl Resource for FetchResponseBodyResource { "fetchResponseBody".into() } - fn read_return( - self: Rc, - mut buf: ZeroCopyBuf, - ) -> AsyncResult<(usize, ZeroCopyBuf)> { + fn read(self: Rc, mut buf: ZeroCopyBuf) -> AsyncResult { Box::pin(async move { let mut reader = RcRef::map(&self, |r| &r.reader).borrow_mut().await; let cancel = RcRef::map(self, |r| &r.cancel); let read = reader.read(&mut buf).try_or_cancel(cancel).await?; - Ok((read, buf)) + Ok(read) }) } diff --git a/ext/http/01_http.js b/ext/http/01_http.js index ff4b6f41f7..217bfc0614 100644 --- a/ext/http/01_http.js +++ b/ext/http/01_http.js @@ -32,8 +32,7 @@ } = window.__bootstrap.webSocket; const { TcpConn, UnixConn } = window.__bootstrap.net; const { TlsConn } = window.__bootstrap.tls; - const { Deferred, getReadableStreamRid, readableStreamClose } = - window.__bootstrap.streams; + const { Deferred } = window.__bootstrap.streams; const { ArrayPrototypeIncludes, ArrayPrototypePush, @@ -236,6 +235,7 @@ typeof respBody === "string" || ObjectPrototypeIsPrototypeOf(Uint8ArrayPrototype, respBody) ); + try { await core.opAsync( "op_http_write_headers", @@ -269,50 +269,35 @@ ) { throw new TypeError("Unreachable"); } - const resourceRid = getReadableStreamRid(respBody); - if (resourceRid) { - if (respBody.locked) { - throw new TypeError("ReadableStream is locked."); + const reader = respBody.getReader(); + while (true) { + const { value, done } = await reader.read(); + if (done) break; + if (!ObjectPrototypeIsPrototypeOf(Uint8ArrayPrototype, value)) { + await reader.cancel(new TypeError("Value not a Uint8Array")); + break; } - const _reader = respBody.getReader(); // Aquire JS lock. - await core.opAsync( - "op_http_write_resource", - streamRid, - resourceRid, - ); - readableStreamClose(respBody); // Release JS lock. - } else { - const reader = respBody.getReader(); - while (true) { - const { value, done } = await reader.read(); - if (done) break; - if (!ObjectPrototypeIsPrototypeOf(Uint8ArrayPrototype, value)) { - await reader.cancel(new TypeError("Value not a Uint8Array")); - break; - } - try { - await core.opAsync("op_http_write", streamRid, value); - } catch (error) { - const connError = httpConn[connErrorSymbol]; - if ( - ObjectPrototypeIsPrototypeOf(BadResourcePrototype, error) && - connError != null - ) { - // deno-lint-ignore no-ex-assign - error = new connError.constructor(connError.message); - } - await reader.cancel(error); - throw error; - } - } - try { - await core.opAsync("op_http_shutdown", streamRid); + await core.opAsync("op_http_write", streamRid, value); } catch (error) { + const connError = httpConn[connErrorSymbol]; + if ( + ObjectPrototypeIsPrototypeOf(BadResourcePrototype, error) && + connError != null + ) { + // deno-lint-ignore no-ex-assign + error = new connError.constructor(connError.message); + } await reader.cancel(error); throw error; } } + try { + await core.opAsync("op_http_shutdown", streamRid); + } catch (error) { + await reader.cancel(error); + throw error; + } } const deferred = request[_deferred]; diff --git a/ext/http/Cargo.toml b/ext/http/Cargo.toml index b4a2082282..2bdbfdade7 100644 --- a/ext/http/Cargo.toml +++ b/ext/http/Cargo.toml @@ -14,7 +14,6 @@ description = "HTTP server implementation for Deno" path = "lib.rs" [dependencies] -async-compression = { version = "0.3.1", features = ["tokio", "brotli", "gzip"] } base64 = "0.13.0" brotli = "3.3.3" bytes = "1" diff --git a/ext/http/lib.rs b/ext/http/lib.rs index a6f47c1c95..9c0109937a 100644 --- a/ext/http/lib.rs +++ b/ext/http/lib.rs @@ -1,7 +1,6 @@ // Copyright 2018-2022 the Deno authors. All rights reserved. MIT license. -use async_compression::tokio::write::BrotliEncoder; -use async_compression::tokio::write::GzipEncoder; +use bytes::Bytes; use cache_control::CacheControl; use deno_core::error::custom_error; use deno_core::error::AnyError; @@ -22,6 +21,7 @@ use deno_core::futures::StreamExt; use deno_core::futures::TryFutureExt; use deno_core::include_js_files; use deno_core::op; + use deno_core::AsyncRefCell; use deno_core::ByteString; use deno_core::CancelFuture; @@ -60,9 +60,7 @@ use std::task::Context; use std::task::Poll; use tokio::io::AsyncRead; use tokio::io::AsyncWrite; -use tokio::io::AsyncWriteExt; use tokio::task::spawn_local; -use tokio_util::io::ReaderStream; mod compressible; @@ -77,7 +75,6 @@ pub fn init() -> Extension { op_http_read::decl(), op_http_write_headers::decl(), op_http_write::decl(), - op_http_write_resource::decl(), op_http_shutdown::decl(), op_http_websocket_accept_header::decl(), op_http_upgrade_websocket::decl(), @@ -341,7 +338,7 @@ impl Default for HttpRequestReader { /// The write half of an HTTP stream. enum HttpResponseWriter { Headers(oneshot::Sender>), - Body(Pin>), + Body(hyper::body::Sender), Closed, } @@ -548,60 +545,55 @@ async fn op_http_write_headers( let body: Response; let new_wr: HttpResponseWriter; - // Set Vary: Accept-Encoding header for direct body response. - // Note: we set the header irrespective of whether or not we compress the data - // to make sure cache services do not serve uncompressed data to clients that - // support compression. - let vary_value = if let Some(value) = vary_header { - if let Ok(value_str) = std::str::from_utf8(value.as_slice()) { - if !value_str.to_lowercase().contains("accept-encoding") { - format!("Accept-Encoding, {}", value_str) - } else { - value_str.to_string() - } - } else { - // the header value wasn't valid UTF8, so it would have been a - // problem anyways, so sending a default header. - "Accept-Encoding".to_string() - } - } else { - "Accept-Encoding".to_string() - }; - builder = builder.header("vary", &vary_value); - - let accepts_compression = matches!( - *stream.accept_encoding.borrow(), - Encoding::Brotli | Encoding::Gzip - ); - let should_compress = body_compressible - && (matches!(data, Some(ref data) if data.len() > 20) || data.is_none()) - && accepts_compression; - - if should_compress { - // If user provided a ETag header for uncompressed data, we need to - // ensure it is a Weak Etag header ("W/"). - if let Some(value) = etag_header { - if let Ok(value_str) = std::str::from_utf8(value.as_slice()) { - if !value_str.starts_with("W/") { - builder = builder.header("etag", format!("W/{}", value_str)); - } else { - builder = builder.header("etag", value.as_slice()); - } - } else { - builder = builder.header("etag", value.as_slice()); - } - } - } else if let Some(value) = etag_header { - builder = builder.header("etag", value.as_slice()); - } - match data { Some(data) => { + // Set Vary: Accept-Encoding header for direct body response. + // Note: we set the header irrespective of whether or not we compress the + // data to make sure cache services do not serve uncompressed data to + // clients that support compression. + let vary_value = if let Some(value) = vary_header { + if let Ok(value_str) = std::str::from_utf8(value.as_slice()) { + if !value_str.to_lowercase().contains("accept-encoding") { + format!("Accept-Encoding, {}", value_str) + } else { + value_str.to_string() + } + } else { + // the header value wasn't valid UTF8, so it would have been a + // problem anyways, so sending a default header. + "Accept-Encoding".to_string() + } + } else { + "Accept-Encoding".to_string() + }; + builder = builder.header("vary", &vary_value); + + let accepts_compression = matches!( + *stream.accept_encoding.borrow(), + Encoding::Brotli | Encoding::Gzip + ); + + let should_compress = + body_compressible && data.len() > 20 && accepts_compression; + if should_compress { // Drop 'content-length' header. Hyper will update it using compressed body. if let Some(headers) = builder.headers_mut() { headers.remove("content-length"); } + // If user provided a ETag header for uncompressed data, we need to + // ensure it is a Weak Etag header ("W/"). + if let Some(value) = etag_header { + if let Ok(value_str) = std::str::from_utf8(value.as_slice()) { + if !value_str.starts_with("W/") { + builder = builder.header("etag", format!("W/{}", value_str)); + } else { + builder = builder.header("etag", value.as_slice()); + } + } else { + builder = builder.header("etag", value.as_slice()); + } + } match *stream.accept_encoding.borrow() { Encoding::Brotli => { @@ -629,6 +621,9 @@ async fn op_http_write_headers( } } } else { + if let Some(value) = etag_header { + builder = builder.header("etag", value.as_slice()); + } // If a buffer was passed, but isn't compressible, we use it to // construct a response body. body = builder.body(data.into_bytes().into())?; @@ -638,35 +633,19 @@ async fn op_http_write_headers( None => { // If no buffer was passed, the caller will stream the response body. - // Create a one way pipe that implements tokio's async io traits. To do - // this we create a [tokio::io::DuplexStream], but then throw away one - // of the directions to create a one way pipe. - let (a, b) = tokio::io::duplex(64 * 1024); - let (reader, _) = tokio::io::split(a); - let (_, writer) = tokio::io::split(b); + // TODO(@kitsonk) had compression for streamed bodies. - let writer_body: Pin>; - - if should_compress { - match *stream.accept_encoding.borrow() { - Encoding::Brotli => { - let writer = BrotliEncoder::new(writer); - writer_body = Box::pin(writer); - builder = builder.header("content-encoding", "br"); - } - _ => { - assert_eq!(*stream.accept_encoding.borrow(), Encoding::Gzip); - let writer = GzipEncoder::new(writer); - writer_body = Box::pin(writer); - builder = builder.header("content-encoding", "gzip"); - } - } - } else { - writer_body = Box::pin(writer); + // Set the user provided ETag & Vary headers for a streaming response + if let Some(value) = etag_header { + builder = builder.header("etag", value.as_slice()); + } + if let Some(value) = vary_header { + builder = builder.header("vary", value.as_slice()); } - body = builder.body(Body::wrap_stream(ReaderStream::new(reader)))?; - new_wr = HttpResponseWriter::Body(writer_body); + let (body_tx, body_rx) = Body::channel(); + body = builder.body(body_rx)?; + new_wr = HttpResponseWriter::Body(body_tx); } } @@ -685,69 +664,6 @@ async fn op_http_write_headers( } } -#[op] -async fn op_http_write_resource( - state: Rc>, - rid: ResourceId, - stream: ResourceId, -) -> Result<(), AnyError> { - let http_stream = state - .borrow() - .resource_table - .get::(rid)?; - let mut wr = RcRef::map(&http_stream, |r| &r.wr).borrow_mut().await; - let resource = state.borrow().resource_table.get_any(stream)?; - loop { - let body_writer = match &mut *wr { - HttpResponseWriter::Body(body_writer) => body_writer, - HttpResponseWriter::Headers(_) => { - return Err(http_error("no response headers")) - } - HttpResponseWriter::Closed => { - return Err(http_error("response already completed")) - } - }; - - let vec = vec![0u8; 64 * 1024]; // 64KB - let buf = ZeroCopyBuf::new_temp(vec); - let (nread, buf) = resource.clone().read_return(buf).await?; - if nread == 0 { - break; - } - - let mut res = body_writer.write_all(&buf).await; - if res.is_ok() { - res = body_writer.flush().await; - } - match res { - Ok(_) => {} - Err(err) => { - assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe); - // Don't return "broken pipe", that's an implementation detail. - // Pull up the failure associated with the transport connection instead. - http_stream.conn.closed().await?; - // If there was no connection error, drop body_tx. - *wr = HttpResponseWriter::Closed; - } - } - } - - let wr = take(&mut *wr); - if let HttpResponseWriter::Body(mut body_writer) = wr { - match body_writer.shutdown().await { - Ok(_) => {} - Err(err) => { - assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe); - // Don't return "broken pipe", that's an implementation detail. - // Pull up the failure associated with the transport connection instead. - http_stream.conn.closed().await?; - } - } - } - - Ok(()) -} - #[op] async fn op_http_write( state: Rc>, @@ -761,7 +677,7 @@ async fn op_http_write( let mut wr = RcRef::map(&stream, |r| &r.wr).borrow_mut().await; loop { - let body_writer = match &mut *wr { + let body_tx = match &mut *wr { HttpResponseWriter::Body(body_tx) => body_tx, HttpResponseWriter::Headers(_) => { break Err(http_error("no response headers")) @@ -771,17 +687,13 @@ async fn op_http_write( } }; - let mut res = body_writer.write_all(&buf).await; - if res.is_ok() { - res = body_writer.flush().await; - } - - match res { + let bytes = Bytes::copy_from_slice(&buf[..]); + match body_tx.send_data(bytes).await { Ok(_) => break Ok(()), Err(err) => { - assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe); - // Don't return "broken pipe", that's an implementation detail. + // Don't return "channel closed", that's an implementation detail. // Pull up the failure associated with the transport connection instead. + assert!(err.is_closed()); stream.conn.closed().await?; // If there was no connection error, drop body_tx. *wr = HttpResponseWriter::Closed; @@ -803,18 +715,7 @@ async fn op_http_shutdown( .resource_table .get::(rid)?; let mut wr = RcRef::map(&stream, |r| &r.wr).borrow_mut().await; - let wr = take(&mut *wr); - if let HttpResponseWriter::Body(mut body_writer) = wr { - match body_writer.shutdown().await { - Ok(_) => {} - Err(err) => { - assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe); - // Don't return "broken pipe", that's an implementation detail. - // Pull up the failure associated with the transport connection instead. - stream.conn.closed().await?; - } - } - } + take(&mut *wr); Ok(()) } diff --git a/ext/net/01_net.js b/ext/net/01_net.js index fde75fe568..48cbfaaaba 100644 --- a/ext/net/01_net.js +++ b/ext/net/01_net.js @@ -4,7 +4,7 @@ ((window) => { const core = window.Deno.core; const { BadResourcePrototype, InterruptedPrototype } = core; - const { WritableStream, readableStreamForRid } = window.__bootstrap.streams; + const { ReadableStream, WritableStream } = window.__bootstrap.streams; const { Error, ObjectPrototypeIsPrototypeOf, @@ -65,6 +65,8 @@ return core.opAsync("op_dns_resolve", { query, recordType, options }); } + const DEFAULT_CHUNK_SIZE = 64 * 1024; + function tryClose(rid) { try { core.close(rid); @@ -73,6 +75,32 @@ } } + function readableStreamForRid(rid) { + return new ReadableStream({ + type: "bytes", + async pull(controller) { + const v = controller.byobRequest.view; + try { + const bytesRead = await read(rid, v); + if (bytesRead === null) { + tryClose(rid); + controller.close(); + controller.byobRequest.respond(0); + } else { + controller.byobRequest.respond(bytesRead); + } + } catch (e) { + controller.error(e); + tryClose(rid); + } + }, + cancel() { + tryClose(rid); + }, + autoAllocateChunkSize: DEFAULT_CHUNK_SIZE, + }); + } + function writableStreamForRid(rid) { return new WritableStream({ async write(chunk, controller) { diff --git a/ext/net/io.rs b/ext/net/io.rs index 02caf7473b..17b86af17e 100644 --- a/ext/net/io.rs +++ b/ext/net/io.rs @@ -70,13 +70,13 @@ where pub async fn read( self: Rc, mut buf: ZeroCopyBuf, - ) -> Result<(usize, ZeroCopyBuf), AnyError> { + ) -> Result { let mut rd = self.rd_borrow_mut().await; let nread = rd .read(&mut buf) .try_or_cancel(self.cancel_handle()) .await?; - Ok((nread, buf)) + Ok(nread) } pub async fn write( @@ -103,10 +103,7 @@ impl Resource for TcpStreamResource { "tcpStream".into() } - fn read_return( - self: Rc, - buf: ZeroCopyBuf, - ) -> AsyncResult<(usize, ZeroCopyBuf)> { + fn read(self: Rc, buf: ZeroCopyBuf) -> AsyncResult { Box::pin(self.read(buf)) } @@ -163,7 +160,7 @@ impl UnixStreamResource { pub async fn read( self: Rc, _buf: ZeroCopyBuf, - ) -> Result<(usize, ZeroCopyBuf), AnyError> { + ) -> Result { unreachable!() } pub async fn write( @@ -185,10 +182,7 @@ impl Resource for UnixStreamResource { "unixStream".into() } - fn read_return( - self: Rc, - buf: ZeroCopyBuf, - ) -> AsyncResult<(usize, ZeroCopyBuf)> { + fn read(self: Rc, buf: ZeroCopyBuf) -> AsyncResult { Box::pin(self.read(buf)) } diff --git a/ext/net/ops_tls.rs b/ext/net/ops_tls.rs index ca922203cf..d6b83e6e8b 100644 --- a/ext/net/ops_tls.rs +++ b/ext/net/ops_tls.rs @@ -674,11 +674,11 @@ impl TlsStreamResource { pub async fn read( self: Rc, mut buf: ZeroCopyBuf, - ) -> Result<(usize, ZeroCopyBuf), AnyError> { + ) -> Result { let mut rd = RcRef::map(&self, |r| &r.rd).borrow_mut().await; let cancel_handle = RcRef::map(&self, |r| &r.cancel_handle); let nread = rd.read(&mut buf).try_or_cancel(cancel_handle).await?; - Ok((nread, buf)) + Ok(nread) } pub async fn write( @@ -722,10 +722,7 @@ impl Resource for TlsStreamResource { "tlsStream".into() } - fn read_return( - self: Rc, - buf: ZeroCopyBuf, - ) -> AsyncResult<(usize, ZeroCopyBuf)> { + fn read(self: Rc, buf: ZeroCopyBuf) -> AsyncResult { Box::pin(self.read(buf)) } diff --git a/ext/web/06_streams.js b/ext/web/06_streams.js index 4926945633..6daea08983 100644 --- a/ext/web/06_streams.js +++ b/ext/web/06_streams.js @@ -8,7 +8,6 @@ "use strict"; ((window) => { - const core = window.Deno.core; const webidl = window.__bootstrap.webidl; const { add, remove, signalAbort, newSignal, AbortSignalPrototype } = window.__bootstrap.abortSignal; @@ -641,41 +640,6 @@ return stream[_disturbed]; } - const DEFAULT_CHUNK_SIZE = 64 * 1024; // 64 KiB - - function readableStreamForRid(rid) { - const stream = new ReadableStream({ - type: "bytes", - async pull(controller) { - const v = controller.byobRequest.view; - try { - const bytesRead = await core.read(rid, v); - if (bytesRead === 0) { - core.tryClose(rid); - controller.close(); - controller.byobRequest.respond(0); - } else { - controller.byobRequest.respond(bytesRead); - } - } catch (e) { - controller.error(e); - core.tryClose(rid); - } - }, - cancel() { - core.tryClose(rid); - }, - autoAllocateChunkSize: DEFAULT_CHUNK_SIZE, - }); - - stream[_maybeRid] = rid; - return stream; - } - - function getReadableStreamRid(stream) { - return stream[_maybeRid]; - } - /** * @param {unknown} value * @returns {value is WritableStream} @@ -4324,7 +4288,6 @@ WeakMapPrototypeSet(countSizeFunctionWeakMap, globalObject, size); } - const _maybeRid = Symbol("[[maybeRid]]"); /** @template R */ class ReadableStream { /** @type {ReadableStreamDefaultController | ReadableByteStreamController} */ @@ -4339,8 +4302,6 @@ [_state]; /** @type {any} */ [_storedError]; - /** @type {number | null} */ - [_maybeRid] = null; /** * @param {UnderlyingSource=} underlyingSource @@ -5879,9 +5840,6 @@ errorReadableStream, createProxy, writableStreamClose, - readableStreamClose, - readableStreamForRid, - getReadableStreamRid, Deferred, // Exposed in global runtime scope ByteLengthQueuingStrategy, diff --git a/runtime/js/40_files.js b/runtime/js/40_files.js index d2148be2fc..8aa0a49723 100644 --- a/runtime/js/40_files.js +++ b/runtime/js/40_files.js @@ -6,8 +6,8 @@ const { read, readSync, write, writeSync } = window.__bootstrap.io; const { ftruncate, ftruncateSync, fstat, fstatSync } = window.__bootstrap.fs; const { pathFromURL } = window.__bootstrap.util; - const { writableStreamForRid } = window.__bootstrap.streamUtils; - const { readableStreamForRid } = window.__bootstrap.streams; + const { readableStreamForRid, writableStreamForRid } = + window.__bootstrap.streamUtils; const { ArrayPrototypeFilter, Error, diff --git a/runtime/ops/io.rs b/runtime/ops/io.rs index 34cd541d5c..27a48a9612 100644 --- a/runtime/ops/io.rs +++ b/runtime/ops/io.rs @@ -174,13 +174,13 @@ where async fn read( self: Rc, mut buf: ZeroCopyBuf, - ) -> Result<(usize, ZeroCopyBuf), AnyError> { + ) -> Result { let mut rd = self.borrow_mut().await; let nread = rd .read(&mut buf) .try_or_cancel(self.cancel_handle()) .await?; - Ok((nread, buf)) + Ok(nread) } pub fn into_inner(self) -> S { @@ -211,10 +211,7 @@ impl Resource for ChildStdoutResource { "childStdout".into() } - fn read_return( - self: Rc, - buf: ZeroCopyBuf, - ) -> AsyncResult<(usize, ZeroCopyBuf)> { + fn read(self: Rc, buf: ZeroCopyBuf) -> AsyncResult { Box::pin(self.read(buf)) } @@ -230,10 +227,7 @@ impl Resource for ChildStderrResource { "childStderr".into() } - fn read_return( - self: Rc, - buf: ZeroCopyBuf, - ) -> AsyncResult<(usize, ZeroCopyBuf)> { + fn read(self: Rc, buf: ZeroCopyBuf) -> AsyncResult { Box::pin(self.read(buf)) } @@ -277,17 +271,16 @@ impl StdFileResource { async fn read( self: Rc, mut buf: ZeroCopyBuf, - ) -> Result<(usize, ZeroCopyBuf), AnyError> { + ) -> Result { if self.fs_file.is_some() { let fs_file = self.fs_file.as_ref().unwrap(); let std_file = fs_file.0.as_ref().unwrap().clone(); - tokio::task::spawn_blocking( - move || -> Result<(usize, ZeroCopyBuf), AnyError> { - let mut std_file = std_file.lock().unwrap(); - Ok((std_file.read(&mut buf)?, buf)) - }, - ) + tokio::task::spawn_blocking(move || { + let mut std_file = std_file.lock().unwrap(); + std_file.read(&mut buf) + }) .await? + .map_err(AnyError::from) } else { Err(resource_unavailable()) } @@ -337,10 +330,7 @@ impl Resource for StdFileResource { self.name.as_str().into() } - fn read_return( - self: Rc, - buf: ZeroCopyBuf, - ) -> AsyncResult<(usize, ZeroCopyBuf)> { + fn read(self: Rc, buf: ZeroCopyBuf) -> AsyncResult { Box::pin(self.read(buf)) } diff --git a/serde_v8/magic/buffer.rs b/serde_v8/magic/buffer.rs index a0a1c974bc..484984ac5f 100644 --- a/serde_v8/magic/buffer.rs +++ b/serde_v8/magic/buffer.rs @@ -14,28 +14,19 @@ use crate::magic::transl8::impl_magic; pub enum MagicBuffer { FromV8(ZeroCopyBuf), ToV8(Mutex>>), - // Variant of the MagicBuffer than is never exposed to the JS. - // Generally used to pass Vec backed buffers to resource methods. - Temp(Vec), } - impl_magic!(MagicBuffer); impl MagicBuffer { pub fn empty() -> Self { MagicBuffer::ToV8(Mutex::new(Some(vec![0_u8; 0].into_boxed_slice()))) } - - pub fn new_temp(vec: Vec) -> Self { - MagicBuffer::Temp(vec) - } } impl Clone for MagicBuffer { fn clone(&self) -> Self { match self { Self::FromV8(zbuf) => Self::FromV8(zbuf.clone()), - Self::Temp(vec) => Self::Temp(vec.clone()), Self::ToV8(_) => panic!("Don't Clone a MagicBuffer sent to v8"), } } @@ -58,7 +49,6 @@ impl Deref for MagicBuffer { fn deref(&self) -> &[u8] { match self { Self::FromV8(buf) => &*buf, - Self::Temp(vec) => &*vec, Self::ToV8(_) => panic!("Don't Deref a MagicBuffer sent to v8"), } } @@ -68,7 +58,6 @@ impl DerefMut for MagicBuffer { fn deref_mut(&mut self) -> &mut [u8] { match self { Self::FromV8(buf) => &mut *buf, - Self::Temp(vec) => &mut *vec, Self::ToV8(_) => panic!("Don't Deref a MagicBuffer sent to v8"), } } @@ -96,7 +85,6 @@ impl ToV8 for MagicBuffer { let value: &[u8] = buf; value.into() } - Self::Temp(_) => unreachable!(), Self::ToV8(x) => x.lock().unwrap().take().expect("MagicBuffer was empty"), };