1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2024-11-24 15:19:26 -05:00

fix(cli): remove possible deadlock in test channel (#22662)

The stderr stream could possibly starve the other bits of the
output-redirecting event loop.

Co-authored-by: Bartek Iwańczuk <biwanczuk@gmail.com>
This commit is contained in:
Matt Mastracci 2024-03-04 20:14:21 -07:00 committed by GitHub
parent 10557ff15c
commit 72d34a79ac
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -5,16 +5,21 @@ use super::TestStdioStream;
use deno_core::futures::future::poll_fn;
use deno_core::parking_lot;
use deno_core::parking_lot::lock_api::RawMutex;
use deno_core::parking_lot::lock_api::RawMutexTimed;
use deno_runtime::deno_io::pipe;
use deno_runtime::deno_io::AsyncPipeRead;
use deno_runtime::deno_io::PipeRead;
use deno_runtime::deno_io::PipeWrite;
use std::fmt::Display;
use std::future::Future;
use std::io::Write;
use std::pin::Pin;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::task::ready;
use std::task::Poll;
use std::time::Duration;
use tokio::io::AsyncRead;
use tokio::io::AsyncReadExt;
use tokio::io::ReadBuf;
@ -143,17 +148,23 @@ impl TestStream {
self.read_opt.is_some()
}
/// Cancellation-safe.
#[inline]
fn pipe(&mut self) -> impl Future<Output = ()> + '_ {
poll_fn(|cx| self.poll_pipe(cx))
}
/// Attempt to read from a given stream, pushing all of the data in it into the given
/// [`UnboundedSender`] before returning.
async fn pipe(&mut self) {
fn poll_pipe(&mut self, cx: &mut std::task::Context) -> Poll<()> {
let mut buffer = [0_u8; BUFFER_SIZE];
let mut buf = ReadBuf::new(&mut buffer);
let res = {
// No more stream, so just return.
// No more stream, we shouldn't hit this case.
let Some(stream) = &mut self.read_opt else {
return;
unreachable!();
};
poll_fn(|cx| Pin::new(&mut *stream).poll_read(cx, &mut buf)).await
ready!(Pin::new(&mut *stream).poll_read(cx, &mut buf))
};
match res {
Ok(_) => {
@ -173,6 +184,7 @@ impl TestStream {
self.read_opt.take();
}
}
Poll::Ready(())
}
/// Read and "block" until the sync markers have been read.
@ -249,11 +261,21 @@ impl TestEventSenderFactory {
let mut test_stderr =
TestStream::new(id, TestStdioStream::Stderr, stderr_reader, sender)?;
// This ensures that the stdout and stderr streams in the select! loop below cannot starve each
// other.
let mut alternate_stream_priority = false;
// This function will be woken whenever a stream or the receiver is ready
loop {
alternate_stream_priority = !alternate_stream_priority;
let (a, b) = if alternate_stream_priority {
(&mut test_stdout, &mut test_stderr)
} else {
(&mut test_stderr, &mut test_stdout)
};
tokio::select! {
_ = test_stdout.pipe(), if test_stdout.is_alive() => {},
_ = test_stderr.pipe(), if test_stdout.is_alive() => {},
biased; // We actually want to poll the channel first
recv = sync_receiver.recv() => {
match recv {
// If the channel closed, we assume that all important data from the streams was synced,
@ -273,6 +295,10 @@ impl TestEventSenderFactory {
}
}
}
// Poll stdout first if `alternate_stream_priority` is true, otherwise poll stderr first.
// This is necessary because of the `biased` flag above to avoid starvation.
_ = a.pipe(), if a.is_alive() => {},
_ = b.pipe(), if b.is_alive() => {},
}
}
@ -377,7 +403,12 @@ impl TestEventSender {
let mutex = parking_lot::RawMutex::INIT;
mutex.lock();
self.sync_sender.send(SendMutex(&mutex as _))?;
mutex.lock();
if !mutex.try_lock_for(Duration::from_secs(30)) {
panic!(
"Test flush deadlock, sender closed = {}",
self.sync_sender.is_closed()
);
}
Ok(())
}
}
@ -444,10 +475,9 @@ mod tests {
}
/// Test that flushing a large number of times doesn't hang.
#[ignore]
#[tokio::test]
async fn test_flush_lots() {
test_util::timeout!(60);
test_util::timeout!(240);
let (mut worker, mut receiver) = create_single_test_event_channel();
let recv_handle = spawn(async move {
let mut queue = vec![];