From 0084f7e85a4a6db0823333a1952c211371025f94 Mon Sep 17 00:00:00 2001 From: David Sherret Date: Wed, 4 May 2022 17:01:51 -0400 Subject: [PATCH] chore: fix flaky `steps_output_within` test (#14479) --- cli/tools/test.rs | 102 ++++++++++++++++++++++++++++++++++------------ 1 file changed, 75 insertions(+), 27 deletions(-) diff --git a/cli/tools/test.rs b/cli/tools/test.rs index d3ab9dbb06..096be383d7 100644 --- a/cli/tools/test.rs +++ b/cli/tools/test.rs @@ -36,6 +36,7 @@ use deno_core::futures::future; use deno_core::futures::stream; use deno_core::futures::FutureExt; use deno_core::futures::StreamExt; +use deno_core::parking_lot::Mutex; use deno_core::serde_json::json; use deno_core::url::Url; use deno_core::ModuleSpecifier; @@ -1449,43 +1450,28 @@ pub async fn run_tests_with_watch( Ok(()) } +#[derive(Clone)] pub struct TestEventSender { sender: UnboundedSender, - stdout_writer: os_pipe::PipeWriter, - stderr_writer: os_pipe::PipeWriter, -} - -impl Clone for TestEventSender { - fn clone(&self) -> Self { - Self { - sender: self.sender.clone(), - stdout_writer: self.stdout_writer.try_clone().unwrap(), - stderr_writer: self.stderr_writer.try_clone().unwrap(), - } - } + stdout_writer: TestOutputPipe, + stderr_writer: TestOutputPipe, } impl TestEventSender { pub fn new(sender: UnboundedSender) -> Self { - let (stdout_reader, stdout_writer) = os_pipe::pipe().unwrap(); - let (stderr_reader, stderr_writer) = os_pipe::pipe().unwrap(); - - start_output_redirect_thread(stdout_reader, sender.clone()); - start_output_redirect_thread(stderr_reader, sender.clone()); - Self { + stdout_writer: TestOutputPipe::new(sender.clone()), + stderr_writer: TestOutputPipe::new(sender.clone()), sender, - stdout_writer, - stderr_writer, } } pub fn stdout(&self) -> std::fs::File { - pipe_writer_to_file(self.stdout_writer.try_clone().unwrap()) + self.stdout_writer.as_file() } pub fn stderr(&self) -> std::fs::File { - pipe_writer_to_file(self.stderr_writer.try_clone().unwrap()) + self.stderr_writer.as_file() } pub fn send(&mut self, message: TestEvent) -> Result<(), AnyError> { @@ -1497,13 +1483,62 @@ impl TestEventSender { | TestEvent::StepWait(_) | TestEvent::StepResult(_, _, _) ) { - self.stdout_writer.flush().unwrap(); - self.stderr_writer.flush().unwrap(); + self.flush_stdout_and_stderr(); } self.sender.send(message)?; Ok(()) } + + fn flush_stdout_and_stderr(&mut self) { + self.stdout_writer.flush(); + self.stderr_writer.flush(); + } +} + +// use a string that if it ends up in the output won't affect how things are displayed +const ZERO_WIDTH_SPACE: &str = "\u{200B}"; + +struct TestOutputPipe { + writer: os_pipe::PipeWriter, + state: Arc>>>, +} + +impl Clone for TestOutputPipe { + fn clone(&self) -> Self { + Self { + writer: self.writer.try_clone().unwrap(), + state: self.state.clone(), + } + } +} + +impl TestOutputPipe { + pub fn new(sender: UnboundedSender) -> Self { + let (reader, writer) = os_pipe::pipe().unwrap(); + let state = Arc::new(Mutex::new(None)); + + start_output_redirect_thread(reader, sender, state.clone()); + + Self { writer, state } + } + + pub fn flush(&mut self) { + // We want to wake up the other thread and have it respond back + // that it's done clearing out its pipe before returning. + let (sender, receiver) = std::sync::mpsc::channel(); + self.state.lock().replace(sender); + // Bit of a hack in order to send a zero width space in order + // to wake the thread up. It seems that sending zero bytes + // does not work here on windows. + self.writer.write_all(ZERO_WIDTH_SPACE.as_bytes()).unwrap(); + self.writer.flush().unwrap(); + receiver.recv().unwrap(); + } + + pub fn as_file(&self) -> std::fs::File { + pipe_writer_to_file(self.writer.try_clone().unwrap()) + } } #[cfg(windows)] @@ -1525,6 +1560,7 @@ fn pipe_writer_to_file(writer: os_pipe::PipeWriter) -> std::fs::File { fn start_output_redirect_thread( mut pipe_reader: os_pipe::PipeReader, sender: UnboundedSender, + flush_state: Arc>>>, ) { tokio::task::spawn_blocking(move || loop { let mut buffer = [0; 512]; @@ -1532,12 +1568,24 @@ fn start_output_redirect_thread( Ok(0) | Err(_) => break, Ok(size) => size, }; + let oneshot_sender = flush_state.lock().take(); + let mut data = &buffer[0..size]; + if data.ends_with(ZERO_WIDTH_SPACE.as_bytes()) { + data = &data[0..data.len() - ZERO_WIDTH_SPACE.len()]; + } - if sender - .send(TestEvent::Output(buffer[0..size].to_vec())) - .is_err() + if !data.is_empty() + && sender + .send(TestEvent::Output(buffer[0..size].to_vec())) + .is_err() { break; } + + // Always respond back if this was set. Ideally we would also check to + // ensure the pipe reader is empty before sending back this response. + if let Some(sender) = oneshot_sender { + let _ignore = sender.send(()); + } }); }