1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2024-11-25 15:29:32 -05:00

chore: fix flaky steps_output_within test (#14479)

This commit is contained in:
David Sherret 2022-05-04 17:01:51 -04:00 committed by cjihrig
parent fbfb3d6061
commit 0084f7e85a
No known key found for this signature in database
GPG key ID: 7434390BDBE9B9C5

View file

@ -36,6 +36,7 @@ use deno_core::futures::future;
use deno_core::futures::stream; use deno_core::futures::stream;
use deno_core::futures::FutureExt; use deno_core::futures::FutureExt;
use deno_core::futures::StreamExt; use deno_core::futures::StreamExt;
use deno_core::parking_lot::Mutex;
use deno_core::serde_json::json; use deno_core::serde_json::json;
use deno_core::url::Url; use deno_core::url::Url;
use deno_core::ModuleSpecifier; use deno_core::ModuleSpecifier;
@ -1449,43 +1450,28 @@ pub async fn run_tests_with_watch(
Ok(()) Ok(())
} }
#[derive(Clone)]
pub struct TestEventSender { pub struct TestEventSender {
sender: UnboundedSender<TestEvent>, sender: UnboundedSender<TestEvent>,
stdout_writer: os_pipe::PipeWriter, stdout_writer: TestOutputPipe,
stderr_writer: os_pipe::PipeWriter, stderr_writer: TestOutputPipe,
}
impl Clone for TestEventSender {
fn clone(&self) -> Self {
Self {
sender: self.sender.clone(),
stdout_writer: self.stdout_writer.try_clone().unwrap(),
stderr_writer: self.stderr_writer.try_clone().unwrap(),
}
}
} }
impl TestEventSender { impl TestEventSender {
pub fn new(sender: UnboundedSender<TestEvent>) -> Self { pub fn new(sender: UnboundedSender<TestEvent>) -> Self {
let (stdout_reader, stdout_writer) = os_pipe::pipe().unwrap();
let (stderr_reader, stderr_writer) = os_pipe::pipe().unwrap();
start_output_redirect_thread(stdout_reader, sender.clone());
start_output_redirect_thread(stderr_reader, sender.clone());
Self { Self {
stdout_writer: TestOutputPipe::new(sender.clone()),
stderr_writer: TestOutputPipe::new(sender.clone()),
sender, sender,
stdout_writer,
stderr_writer,
} }
} }
pub fn stdout(&self) -> std::fs::File { pub fn stdout(&self) -> std::fs::File {
pipe_writer_to_file(self.stdout_writer.try_clone().unwrap()) self.stdout_writer.as_file()
} }
pub fn stderr(&self) -> std::fs::File { pub fn stderr(&self) -> std::fs::File {
pipe_writer_to_file(self.stderr_writer.try_clone().unwrap()) self.stderr_writer.as_file()
} }
pub fn send(&mut self, message: TestEvent) -> Result<(), AnyError> { pub fn send(&mut self, message: TestEvent) -> Result<(), AnyError> {
@ -1497,13 +1483,62 @@ impl TestEventSender {
| TestEvent::StepWait(_) | TestEvent::StepWait(_)
| TestEvent::StepResult(_, _, _) | TestEvent::StepResult(_, _, _)
) { ) {
self.stdout_writer.flush().unwrap(); self.flush_stdout_and_stderr();
self.stderr_writer.flush().unwrap();
} }
self.sender.send(message)?; self.sender.send(message)?;
Ok(()) Ok(())
} }
fn flush_stdout_and_stderr(&mut self) {
self.stdout_writer.flush();
self.stderr_writer.flush();
}
}
// use a string that if it ends up in the output won't affect how things are displayed
const ZERO_WIDTH_SPACE: &str = "\u{200B}";
struct TestOutputPipe {
writer: os_pipe::PipeWriter,
state: Arc<Mutex<Option<std::sync::mpsc::Sender<()>>>>,
}
impl Clone for TestOutputPipe {
fn clone(&self) -> Self {
Self {
writer: self.writer.try_clone().unwrap(),
state: self.state.clone(),
}
}
}
impl TestOutputPipe {
pub fn new(sender: UnboundedSender<TestEvent>) -> Self {
let (reader, writer) = os_pipe::pipe().unwrap();
let state = Arc::new(Mutex::new(None));
start_output_redirect_thread(reader, sender, state.clone());
Self { writer, state }
}
pub fn flush(&mut self) {
// We want to wake up the other thread and have it respond back
// that it's done clearing out its pipe before returning.
let (sender, receiver) = std::sync::mpsc::channel();
self.state.lock().replace(sender);
// Bit of a hack in order to send a zero width space in order
// to wake the thread up. It seems that sending zero bytes
// does not work here on windows.
self.writer.write_all(ZERO_WIDTH_SPACE.as_bytes()).unwrap();
self.writer.flush().unwrap();
receiver.recv().unwrap();
}
pub fn as_file(&self) -> std::fs::File {
pipe_writer_to_file(self.writer.try_clone().unwrap())
}
} }
#[cfg(windows)] #[cfg(windows)]
@ -1525,6 +1560,7 @@ fn pipe_writer_to_file(writer: os_pipe::PipeWriter) -> std::fs::File {
fn start_output_redirect_thread( fn start_output_redirect_thread(
mut pipe_reader: os_pipe::PipeReader, mut pipe_reader: os_pipe::PipeReader,
sender: UnboundedSender<TestEvent>, sender: UnboundedSender<TestEvent>,
flush_state: Arc<Mutex<Option<std::sync::mpsc::Sender<()>>>>,
) { ) {
tokio::task::spawn_blocking(move || loop { tokio::task::spawn_blocking(move || loop {
let mut buffer = [0; 512]; let mut buffer = [0; 512];
@ -1532,12 +1568,24 @@ fn start_output_redirect_thread(
Ok(0) | Err(_) => break, Ok(0) | Err(_) => break,
Ok(size) => size, Ok(size) => size,
}; };
let oneshot_sender = flush_state.lock().take();
let mut data = &buffer[0..size];
if data.ends_with(ZERO_WIDTH_SPACE.as_bytes()) {
data = &data[0..data.len() - ZERO_WIDTH_SPACE.len()];
}
if sender if !data.is_empty()
.send(TestEvent::Output(buffer[0..size].to_vec())) && sender
.is_err() .send(TestEvent::Output(buffer[0..size].to_vec()))
.is_err()
{ {
break; break;
} }
// Always respond back if this was set. Ideally we would also check to
// ensure the pipe reader is empty before sending back this response.
if let Some(sender) = oneshot_sender {
let _ignore = sender.send(());
}
}); });
} }