diff --git a/Cargo.lock b/Cargo.lock index 38646afa65..6477eb63e1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1118,6 +1118,7 @@ dependencies = [ "libz-sys", "log", "lsp-types", + "memmem", "monch", "napi_sym", "nix 0.26.2", diff --git a/cli/Cargo.toml b/cli/Cargo.toml index 4eb5eebc7a..33ab80b712 100644 --- a/cli/Cargo.toml +++ b/cli/Cargo.toml @@ -115,6 +115,7 @@ libc.workspace = true libz-sys.workspace = true log = { workspace = true, features = ["serde"] } lsp-types.workspace = true +memmem.workspace = true monch.workspace = true notify.workspace = true once_cell.workspace = true diff --git a/cli/tools/test/channel.rs b/cli/tools/test/channel.rs index aad3f926e7..780a17de60 100644 --- a/cli/tools/test/channel.rs +++ b/cli/tools/test/channel.rs @@ -10,6 +10,7 @@ use deno_runtime::deno_io::pipe; use deno_runtime::deno_io::AsyncPipeRead; use deno_runtime::deno_io::PipeRead; use deno_runtime::deno_io::PipeWrite; +use memmem::Searcher; use std::fmt::Display; use std::future::Future; use std::io::Write; @@ -30,6 +31,7 @@ use tokio::sync::mpsc::WeakUnboundedSender; /// 8-byte sync marker that is unlikely to appear in normal output. Equivalent /// to the string `"\u{200B}\0\u{200B}\0"`. const SYNC_MARKER: &[u8; 8] = &[226, 128, 139, 0, 226, 128, 139, 0]; +const HALF_SYNC_MARKER: &[u8; 4] = &[226, 128, 139, 0]; const BUFFER_SIZE: usize = 4096; @@ -202,8 +204,30 @@ impl TestStream { } Ok(read) => { flush.extend(&buffer[0..read]); - if flush.ends_with(SYNC_MARKER) { - flush.truncate(flush.len() - SYNC_MARKER.len()); + + // "ends_with" is cheaper, so check that first + if flush.ends_with(HALF_SYNC_MARKER) { + // We might have read the full sync marker. + if flush.ends_with(SYNC_MARKER) { + flush.truncate(flush.len() - SYNC_MARKER.len()); + } else { + flush.truncate(flush.len() - HALF_SYNC_MARKER.len()); + } + // Try to send our flushed buffer. If the channel is closed, this stream will + // be marked as not alive. + _ = self.send(flush); + return; + } + + // If we don't end with the marker, then we need to search the bytes we read plus four bytes + // from before. There's still a possibility that the marker could be split because of a pipe + // buffer that fills up, forcing the flush to be written across two writes and interleaving + // data between, but that's a risk we take with this sync marker approach. + let searcher = memmem::TwoWaySearcher::new(HALF_SYNC_MARKER); + let start = + (flush.len() - read).saturating_sub(HALF_SYNC_MARKER.len()); + if let Some(offset) = searcher.search_in(&flush[start..]) { + flush.truncate(offset); // Try to send our flushed buffer. If the channel is closed, this stream will // be marked as not alive. _ = self.send(flush); diff --git a/tests/specs/test/worker_large_output/__test__.jsonc b/tests/specs/test/worker_large_output/__test__.jsonc new file mode 100644 index 0000000000..9e6533e8c4 --- /dev/null +++ b/tests/specs/test/worker_large_output/__test__.jsonc @@ -0,0 +1,4 @@ +{ + "args": "test main.js", + "output": "main.out" +} diff --git a/tests/specs/test/worker_large_output/main.js b/tests/specs/test/worker_large_output/main.js new file mode 100644 index 0000000000..0941e60f3b --- /dev/null +++ b/tests/specs/test/worker_large_output/main.js @@ -0,0 +1,15 @@ +// Regression test for workers that post large amounts of output as a test is ending. This +// test should not deadlock, though the output is undefined. +Deno.test(async function workerOutput() { + console.log("Booting worker"); + const code = + "self.postMessage(0); console.log(`hello from worker\n`.repeat(60000));"; + const worker = new Worker(URL.createObjectURL(new Blob([code])), { + type: "module", + }); + await new Promise((r) => + worker.addEventListener("message", () => { + r(); + }) + ); +}); diff --git a/tests/specs/test/worker_large_output/main.out b/tests/specs/test/worker_large_output/main.out new file mode 100644 index 0000000000..ed130e0fe9 --- /dev/null +++ b/tests/specs/test/worker_large_output/main.out @@ -0,0 +1,4 @@ +[WILDCARD] + +ok | 1 passed | 0 failed ([WILDCARD]) +