diff --git a/cli/tools/test/channel.rs b/cli/tools/test/channel.rs index ff1d4f8f29..611310538c 100644 --- a/cli/tools/test/channel.rs +++ b/cli/tools/test/channel.rs @@ -5,16 +5,21 @@ use super::TestStdioStream; use deno_core::futures::future::poll_fn; use deno_core::parking_lot; use deno_core::parking_lot::lock_api::RawMutex; +use deno_core::parking_lot::lock_api::RawMutexTimed; use deno_runtime::deno_io::pipe; use deno_runtime::deno_io::AsyncPipeRead; use deno_runtime::deno_io::PipeRead; use deno_runtime::deno_io::PipeWrite; use std::fmt::Display; +use std::future::Future; use std::io::Write; use std::pin::Pin; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering; use std::sync::Arc; +use std::task::ready; +use std::task::Poll; +use std::time::Duration; use tokio::io::AsyncRead; use tokio::io::AsyncReadExt; use tokio::io::ReadBuf; @@ -143,17 +148,23 @@ impl TestStream { self.read_opt.is_some() } + /// Cancellation-safe. + #[inline] + fn pipe(&mut self) -> impl Future + '_ { + poll_fn(|cx| self.poll_pipe(cx)) + } + /// Attempt to read from a given stream, pushing all of the data in it into the given /// [`UnboundedSender`] before returning. - async fn pipe(&mut self) { + fn poll_pipe(&mut self, cx: &mut std::task::Context) -> Poll<()> { let mut buffer = [0_u8; BUFFER_SIZE]; let mut buf = ReadBuf::new(&mut buffer); let res = { - // No more stream, so just return. + // No more stream, we shouldn't hit this case. let Some(stream) = &mut self.read_opt else { - return; + unreachable!(); }; - poll_fn(|cx| Pin::new(&mut *stream).poll_read(cx, &mut buf)).await + ready!(Pin::new(&mut *stream).poll_read(cx, &mut buf)) }; match res { Ok(_) => { @@ -173,6 +184,7 @@ impl TestStream { self.read_opt.take(); } } + Poll::Ready(()) } /// Read and "block" until the sync markers have been read. @@ -249,11 +261,21 @@ impl TestEventSenderFactory { let mut test_stderr = TestStream::new(id, TestStdioStream::Stderr, stderr_reader, sender)?; + // This ensures that the stdout and stderr streams in the select! loop below cannot starve each + // other. + let mut alternate_stream_priority = false; + // This function will be woken whenever a stream or the receiver is ready loop { + alternate_stream_priority = !alternate_stream_priority; + let (a, b) = if alternate_stream_priority { + (&mut test_stdout, &mut test_stderr) + } else { + (&mut test_stderr, &mut test_stdout) + }; + tokio::select! { - _ = test_stdout.pipe(), if test_stdout.is_alive() => {}, - _ = test_stderr.pipe(), if test_stdout.is_alive() => {}, + biased; // We actually want to poll the channel first recv = sync_receiver.recv() => { match recv { // If the channel closed, we assume that all important data from the streams was synced, @@ -273,6 +295,10 @@ impl TestEventSenderFactory { } } } + // Poll stdout first if `alternate_stream_priority` is true, otherwise poll stderr first. + // This is necessary because of the `biased` flag above to avoid starvation. + _ = a.pipe(), if a.is_alive() => {}, + _ = b.pipe(), if b.is_alive() => {}, } } @@ -377,7 +403,12 @@ impl TestEventSender { let mutex = parking_lot::RawMutex::INIT; mutex.lock(); self.sync_sender.send(SendMutex(&mutex as _))?; - mutex.lock(); + if !mutex.try_lock_for(Duration::from_secs(30)) { + panic!( + "Test flush deadlock, sender closed = {}", + self.sync_sender.is_closed() + ); + } Ok(()) } } @@ -444,10 +475,9 @@ mod tests { } /// Test that flushing a large number of times doesn't hang. - #[ignore] #[tokio::test] async fn test_flush_lots() { - test_util::timeout!(60); + test_util::timeout!(240); let (mut worker, mut receiver) = create_single_test_event_channel(); let recv_handle = spawn(async move { let mut queue = vec![];