From f530189c500b9f12c4c94e88eca23eab9ae0d970 Mon Sep 17 00:00:00 2001 From: David Sherret Date: Thu, 9 Dec 2021 20:24:37 -0500 Subject: [PATCH] fix(watch): mitigate race condition between file write by other process and watch read (#13038) --- cli/file_watcher.rs | 86 +++++++----------- cli/ops/runtime_compiler.rs | 4 +- cli/tests/integration/watcher_tests.rs | 115 +++++++++++-------------- cli/tools/fmt.rs | 4 +- 4 files changed, 89 insertions(+), 120 deletions(-) diff --git a/cli/file_watcher.rs b/cli/file_watcher.rs index c7fff4632d..84b30dfd26 100644 --- a/cli/file_watcher.rs +++ b/cli/file_watcher.rs @@ -4,9 +4,7 @@ use crate::colors; use crate::fs_util::canonicalize_path; use deno_core::error::AnyError; -use deno_core::futures::stream::{Stream, StreamExt}; use deno_core::futures::Future; -use deno_core::parking_lot::Mutex; use log::info; use notify::event::Event as NotifyEvent; use notify::event::EventKind; @@ -15,57 +13,42 @@ use notify::Error as NotifyError; use notify::RecommendedWatcher; use notify::RecursiveMode; use notify::Watcher; -use pin_project::pin_project; use std::collections::HashSet; use std::path::PathBuf; -use std::pin::Pin; use std::sync::Arc; -use std::task::Context; -use std::task::Poll; use std::time::Duration; -use tokio::pin; use tokio::select; +use tokio::sync::mpsc; use tokio::time::sleep; -use tokio::time::Instant; -use tokio::time::Sleep; const DEBOUNCE_INTERVAL: Duration = Duration::from_millis(200); -#[pin_project(project = DebounceProjection)] -struct Debounce { - #[pin] - timer: Sleep, - changed_paths: Arc>>, +struct DebouncedReceiver { + receiver: mpsc::UnboundedReceiver>, } -impl Debounce { - fn new() -> Self { - Self { - timer: sleep(DEBOUNCE_INTERVAL), - changed_paths: Arc::new(Mutex::new(HashSet::new())), - } +impl DebouncedReceiver { + fn new_with_sender() -> (Arc>>, Self) { + let (sender, receiver) = mpsc::unbounded_channel(); + (Arc::new(sender), Self { receiver }) } -} -impl Stream for Debounce { - type Item = Vec; - - /// Note that this never returns `Poll::Ready(None)`, which means that the - /// file watcher will be alive until the Deno process is terminated. - fn poll_next( - self: Pin<&mut Self>, - cx: &mut Context, - ) -> Poll> { - let mut changed_paths = self.changed_paths.lock(); - if changed_paths.len() > 0 { - Poll::Ready(Some(changed_paths.drain().collect())) - } else { - drop(changed_paths); - let mut timer = self.project().timer; - if timer.as_mut().poll(cx).is_ready() { - timer.reset(Instant::now() + DEBOUNCE_INTERVAL); + async fn recv(&mut self) -> Option> { + let mut received_items = self + .receiver + .recv() + .await? + .into_iter() + .collect::>(); // prevent duplicates + loop { + tokio::select! { + items = self.receiver.recv() => { + received_items.extend(items?); + } + _ = sleep(DEBOUNCE_INTERVAL) => { + return Some(received_items.into_iter().collect()); + } } - Poll::Pending } } } @@ -91,14 +74,14 @@ pub enum ResolutionResult { async fn next_restart( resolver: &mut R, - debounce: &mut Pin<&mut Debounce>, + debounced_receiver: &mut DebouncedReceiver, ) -> (Vec, Result) where R: FnMut(Option>) -> F, F: Future>, { loop { - let changed = debounce.next().await; + let changed = debounced_receiver.recv().await; match resolver(changed).await { ResolutionResult::Ignore => { log::debug!("File change ignored") @@ -140,8 +123,7 @@ where F1: Future>, F2: Future>, { - let debounce = Debounce::new(); - pin!(debounce); + let (sender, mut receiver) = DebouncedReceiver::new_with_sender(); // Store previous data. If module resolution fails at some point, the watcher will try to // continue watching files using these data. @@ -161,7 +143,7 @@ where colors::intense_blue("Watcher"), ); - let (paths, result) = next_restart(&mut resolver, &mut debounce).await; + let (paths, result) = next_restart(&mut resolver, &mut receiver).await; paths_to_watch = paths; resolution_result = result; } @@ -175,13 +157,13 @@ where }; loop { - let watcher = new_watcher(&paths_to_watch, &debounce)?; + let watcher = new_watcher(&paths_to_watch, sender.clone())?; match resolution_result { Ok(operation_arg) => { let fut = error_handler(operation(operation_arg)); select! { - (paths, result) = next_restart(&mut resolver, &mut debounce) => { + (paths, result) = next_restart(&mut resolver, &mut receiver) => { if result.is_ok() { paths_to_watch = paths; } @@ -207,7 +189,7 @@ where } } - let (paths, result) = next_restart(&mut resolver, &mut debounce).await; + let (paths, result) = next_restart(&mut resolver, &mut receiver).await; if result.is_ok() { paths_to_watch = paths; } @@ -219,10 +201,8 @@ where fn new_watcher( paths: &[PathBuf], - debounce: &Debounce, + sender: Arc>>, ) -> Result { - let changed_paths = Arc::clone(&debounce.changed_paths); - let mut watcher: RecommendedWatcher = Watcher::new(move |res: Result| { if let Ok(event) = res { @@ -233,9 +213,9 @@ fn new_watcher( let paths = event .paths .iter() - .filter_map(|path| canonicalize_path(path).ok()); - let mut changed_paths = changed_paths.lock(); - changed_paths.extend(paths); + .filter_map(|path| canonicalize_path(path).ok()) + .collect(); + sender.send(paths).unwrap(); } } })?; diff --git a/cli/ops/runtime_compiler.rs b/cli/ops/runtime_compiler.rs index d79a5fe31f..af99f12caa 100644 --- a/cli/ops/runtime_compiler.rs +++ b/cli/ops/runtime_compiler.rs @@ -166,7 +166,9 @@ async fn op_emit( args.import_map_path { let import_map_specifier = resolve_url_or_path(&import_map_str) - .context(format!("Bad URL (\"{}\") for import map.", import_map_str))?; + .with_context(|| { + format!("Bad URL (\"{}\") for import map.", import_map_str) + })?; let import_map = if let Some(value) = args.import_map { ImportMap::from_json(import_map_specifier.as_str(), &value.to_string())? } else { diff --git a/cli/tests/integration/watcher_tests.rs b/cli/tests/integration/watcher_tests.rs index 42ca4b520e..c9a5632c04 100644 --- a/cli/tests/integration/watcher_tests.rs +++ b/cli/tests/integration/watcher_tests.rs @@ -18,7 +18,7 @@ macro_rules! assert_contains { // Helper function to skip watcher output that contains "Restarting" // phrase. fn skip_restarting_line( - mut stderr_lines: impl Iterator, + stderr_lines: &mut impl Iterator, ) -> String { loop { let msg = stderr_lines.next().unwrap(); @@ -69,10 +69,18 @@ fn child_lines( ) -> (impl Iterator, impl Iterator) { let stdout_lines = std::io::BufReader::new(child.stdout.take().unwrap()) .lines() - .map(|r| r.unwrap()); + .map(|r| { + let line = r.unwrap(); + eprintln!("STDOUT: {}", line); + line + }); let stderr_lines = std::io::BufReader::new(child.stderr.take().unwrap()) .lines() - .map(|r| r.unwrap()); + .map(|r| { + let line = r.unwrap(); + eprintln!("STERR: {}", line); + line + }); (stdout_lines, stderr_lines) } @@ -106,13 +114,7 @@ fn lint_watch_test() { .stderr(std::process::Stdio::piped()) .spawn() .expect("Failed to spawn script"); - let mut stderr = child.stderr.as_mut().unwrap(); - let mut stderr_lines = std::io::BufReader::new(&mut stderr) - .lines() - .map(|r| r.unwrap()); - - // TODO(lucacasonato): remove this timeout. It seems to be needed on Linux. - std::thread::sleep(std::time::Duration::from_secs(1)); + let (_stdout_lines, mut stderr_lines) = child_lines(&mut child); let mut output = read_all_lints(&mut stderr_lines); let expected = std::fs::read_to_string(badly_linted_output).unwrap(); @@ -130,7 +132,6 @@ fn lint_watch_test() { // Change content of the file again to be badly-linted1 std::fs::copy(&badly_linted_fixed2, &badly_linted) .expect("Failed to copy file"); - std::thread::sleep(std::time::Duration::from_secs(1)); output = read_all_lints(&mut stderr_lines); let expected = std::fs::read_to_string(badly_linted_fixed2_output).unwrap(); @@ -172,13 +173,7 @@ fn lint_watch_without_args_test() { .stderr(std::process::Stdio::piped()) .spawn() .expect("Failed to spawn script"); - let mut stderr = child.stderr.as_mut().unwrap(); - let mut stderr_lines = std::io::BufReader::new(&mut stderr) - .lines() - .map(|r| r.unwrap()); - - // TODO(lucacasonato): remove this timeout. It seems to be needed on Linux. - std::thread::sleep(std::time::Duration::from_secs(1)); + let (_stdout_lines, mut stderr_lines) = child_lines(&mut child); let mut output = read_all_lints(&mut stderr_lines); let expected = std::fs::read_to_string(badly_linted_output).unwrap(); @@ -187,7 +182,6 @@ fn lint_watch_without_args_test() { // Change content of the file again to be badly-linted1 std::fs::copy(&badly_linted_fixed1, &badly_linted) .expect("Failed to copy file"); - std::thread::sleep(std::time::Duration::from_secs(1)); output = read_all_lints(&mut stderr_lines); let expected = std::fs::read_to_string(badly_linted_fixed1_output).unwrap(); @@ -236,18 +230,12 @@ fn lint_all_files_on_each_change_test() { .stderr(std::process::Stdio::piped()) .spawn() .expect("Failed to spawn script"); - let mut stderr = child.stderr.as_mut().unwrap(); - let mut stderr_lines = std::io::BufReader::new(&mut stderr) - .lines() - .map(|r| r.unwrap()); - - std::thread::sleep(std::time::Duration::from_secs(1)); + let (_stdout_lines, mut stderr_lines) = child_lines(&mut child); assert_contains!(read_line("Checked", &mut stderr_lines), "Checked 2 files"); std::fs::copy(&badly_linted_fixed2, &badly_linted_2) .expect("Failed to copy file"); - std::thread::sleep(std::time::Duration::from_secs(1)); assert_contains!(read_line("Checked", &mut stderr_lines), "Checked 2 files"); @@ -276,12 +264,13 @@ fn fmt_watch_test() { .stderr(std::process::Stdio::piped()) .spawn() .unwrap(); - let (_stdout_lines, stderr_lines) = child_lines(&mut child); + let (_stdout_lines, mut stderr_lines) = child_lines(&mut child); - // TODO(lucacasonato): remove this timeout. It seems to be needed on Linux. - std::thread::sleep(std::time::Duration::from_secs(1)); - - assert!(skip_restarting_line(stderr_lines).contains("badly_formatted.js")); + assert_contains!( + skip_restarting_line(&mut stderr_lines), + "badly_formatted.js" + ); + assert_contains!(read_line("Checked", &mut stderr_lines), "Checked 1 file"); let expected = std::fs::read_to_string(fixed.clone()).unwrap(); let actual = std::fs::read_to_string(badly_formatted.clone()).unwrap(); @@ -289,7 +278,12 @@ fn fmt_watch_test() { // Change content of the file again to be badly formatted std::fs::copy(&badly_formatted_original, &badly_formatted).unwrap(); - std::thread::sleep(std::time::Duration::from_secs(1)); + + assert_contains!( + skip_restarting_line(&mut stderr_lines), + "badly_formatted.js" + ); + assert_contains!(read_line("Checked", &mut stderr_lines), "Checked 1 file"); // Check if file has been automatically formatted by watcher let expected = std::fs::read_to_string(fixed).unwrap(); @@ -316,12 +310,13 @@ fn fmt_watch_without_args_test() { .stderr(std::process::Stdio::piped()) .spawn() .unwrap(); - let (_stdout_lines, stderr_lines) = child_lines(&mut child); + let (_stdout_lines, mut stderr_lines) = child_lines(&mut child); - // TODO(lucacasonato): remove this timeout. It seems to be needed on Linux. - std::thread::sleep(std::time::Duration::from_secs(1)); - - assert!(skip_restarting_line(stderr_lines).contains("badly_formatted.js")); + assert_contains!( + skip_restarting_line(&mut stderr_lines), + "badly_formatted.js" + ); + assert_contains!(read_line("Checked", &mut stderr_lines), "Checked 1 file"); let expected = std::fs::read_to_string(fixed.clone()).unwrap(); let actual = std::fs::read_to_string(badly_formatted.clone()).unwrap(); @@ -329,7 +324,11 @@ fn fmt_watch_without_args_test() { // Change content of the file again to be badly formatted std::fs::copy(&badly_formatted_original, &badly_formatted).unwrap(); - std::thread::sleep(std::time::Duration::from_secs(1)); + assert_contains!( + skip_restarting_line(&mut stderr_lines), + "badly_formatted.js" + ); + assert_contains!(read_line("Checked", &mut stderr_lines), "Checked 1 file"); // Check if file has been automatically formatted by watcher let expected = std::fs::read_to_string(fixed).unwrap(); @@ -361,9 +360,6 @@ fn fmt_check_all_files_on_each_change_test() { .unwrap(); let (_stdout_lines, mut stderr_lines) = child_lines(&mut child); - // TODO(lucacasonato): remove this timeout. It seems to be needed on Linux. - std::thread::sleep(std::time::Duration::from_secs(1)); - assert_contains!( read_line("error", &mut stderr_lines), "Found 2 not formatted files in 2 files" @@ -372,8 +368,6 @@ fn fmt_check_all_files_on_each_change_test() { // Change content of the file again to be badly formatted std::fs::copy(&badly_formatted_original, &badly_formatted_1).unwrap(); - std::thread::sleep(std::time::Duration::from_secs(1)); - assert_contains!( read_line("error", &mut stderr_lines), "Found 2 not formatted files in 2 files" @@ -407,7 +401,6 @@ fn bundle_js_watch() { let (_stdout_lines, mut stderr_lines) = child_lines(&mut deno); - std::thread::sleep(std::time::Duration::from_secs(1)); assert_contains!(stderr_lines.next().unwrap(), "Check"); assert_contains!(stderr_lines.next().unwrap(), "file_to_watch.js"); assert_contains!(stderr_lines.next().unwrap(), "mod6.bundle.js"); @@ -416,7 +409,7 @@ fn bundle_js_watch() { wait_for("Bundle finished", &mut stderr_lines); write(&file_to_watch, "console.log('Hello world2');").unwrap(); - std::thread::sleep(std::time::Duration::from_secs(1)); + assert_contains!(stderr_lines.next().unwrap(), "Check"); assert_contains!(stderr_lines.next().unwrap(), "File change detected!"); assert_contains!(stderr_lines.next().unwrap(), "file_to_watch.js"); @@ -427,7 +420,7 @@ fn bundle_js_watch() { // Confirm that the watcher keeps on working even if the file is updated and has invalid syntax write(&file_to_watch, "syntax error ^^").unwrap(); - std::thread::sleep(std::time::Duration::from_secs(1)); + assert_contains!(stderr_lines.next().unwrap(), "File change detected!"); assert_contains!(stderr_lines.next().unwrap(), "error: "); wait_for("Bundle failed", &mut stderr_lines); @@ -456,7 +449,6 @@ fn bundle_watch_not_exit() { .unwrap(); let (_stdout_lines, mut stderr_lines) = child_lines(&mut deno); - std::thread::sleep(std::time::Duration::from_secs(1)); assert_contains!(stderr_lines.next().unwrap(), "error:"); assert_contains!(stderr_lines.next().unwrap(), "Bundle failed"); // the target file hasn't been created yet @@ -464,12 +456,14 @@ fn bundle_watch_not_exit() { // Make sure the watcher actually restarts and works fine with the proper syntax write(&file_to_watch, "console.log(42);").unwrap(); - std::thread::sleep(std::time::Duration::from_secs(1)); + assert_contains!(stderr_lines.next().unwrap(), "Check"); assert_contains!(stderr_lines.next().unwrap(), "File change detected!"); assert_contains!(stderr_lines.next().unwrap(), "file_to_watch.js"); assert_contains!(stderr_lines.next().unwrap(), "target.js"); + wait_for("Bundle finished", &mut stderr_lines); + // bundled file is created assert!(target_file.is_file()); check_alive_then_kill(deno); @@ -497,13 +491,8 @@ fn run_watch() { assert_contains!(stdout_lines.next().unwrap(), "Hello world"); wait_for("Process finished", &mut stderr_lines); - // TODO(lucacasonato): remove this timeout. It seems to be needed on Linux. - std::thread::sleep(std::time::Duration::from_secs(1)); - // Change content of the file write(&file_to_watch, "console.log('Hello world2');").unwrap(); - // Events from the file watcher is "debounced", so we need to wait for the next execution to start - std::thread::sleep(std::time::Duration::from_secs(1)); assert_contains!(stderr_lines.next().unwrap(), "Restarting"); assert_contains!(stdout_lines.next().unwrap(), "Hello world2"); @@ -517,21 +506,21 @@ fn run_watch() { "import { foo } from './another_file.js'; console.log(foo);", ) .unwrap(); - std::thread::sleep(std::time::Duration::from_secs(1)); + assert_contains!(stderr_lines.next().unwrap(), "Restarting"); assert_contains!(stdout_lines.next().unwrap(), '0'); wait_for("Process finished", &mut stderr_lines); // Confirm that restarting occurs when a new file is updated write(&another_file, "export const foo = 42;").unwrap(); - std::thread::sleep(std::time::Duration::from_secs(1)); + assert_contains!(stderr_lines.next().unwrap(), "Restarting"); assert_contains!(stdout_lines.next().unwrap(), "42"); wait_for("Process finished", &mut stderr_lines); // Confirm that the watcher keeps on working even if the file is updated and has invalid syntax write(&file_to_watch, "syntax error ^^").unwrap(); - std::thread::sleep(std::time::Duration::from_secs(1)); + assert_contains!(stderr_lines.next().unwrap(), "Restarting"); assert_contains!(stderr_lines.next().unwrap(), "error:"); wait_for("Process failed", &mut stderr_lines); @@ -542,21 +531,21 @@ fn run_watch() { "import { foo } from './another_file.js'; console.log(foo);", ) .unwrap(); - std::thread::sleep(std::time::Duration::from_secs(1)); + assert_contains!(stderr_lines.next().unwrap(), "Restarting"); assert_contains!(stdout_lines.next().unwrap(), "42"); wait_for("Process finished", &mut stderr_lines); // Update the content of the imported file with invalid syntax write(&another_file, "syntax error ^^").unwrap(); - std::thread::sleep(std::time::Duration::from_secs(1)); + assert_contains!(stderr_lines.next().unwrap(), "Restarting"); assert_contains!(stderr_lines.next().unwrap(), "error:"); wait_for("Process failed", &mut stderr_lines); // Modify the imported file and make sure that restarting occurs write(&another_file, "export const foo = 'modified!';").unwrap(); - std::thread::sleep(std::time::Duration::from_secs(1)); + assert_contains!(stderr_lines.next().unwrap(), "Restarting"); assert_contains!(stdout_lines.next().unwrap(), "modified!"); wait_for("Process finished", &mut stderr_lines); @@ -613,9 +602,6 @@ fn run_watch_load_unload_events() { ) .unwrap(); - // Events from the file watcher is "debounced", so we need to wait for the next execution to start - std::thread::sleep(std::time::Duration::from_secs(1)); - // Wait for the restart assert_contains!(stderr_lines.next().unwrap(), "Restarting"); @@ -650,13 +636,12 @@ fn run_watch_not_exit() { .unwrap(); let (mut stdout_lines, mut stderr_lines) = child_lines(&mut child); - std::thread::sleep(std::time::Duration::from_secs(1)); assert_contains!(stderr_lines.next().unwrap(), "error:"); assert_contains!(stderr_lines.next().unwrap(), "Process failed"); // Make sure the watcher actually restarts and works fine with the proper syntax write(&file_to_watch, "console.log(42);").unwrap(); - std::thread::sleep(std::time::Duration::from_secs(1)); + assert_contains!(stderr_lines.next().unwrap(), "Restarting"); assert_contains!(stdout_lines.next().unwrap(), "42"); wait_for("Process finished", &mut stderr_lines); @@ -905,6 +890,6 @@ fn test_watch_doc() { // We only need to scan for a Check file://.../foo.ts$3-6 line that // corresponds to the documentation block being type-checked. - assert_contains!(skip_restarting_line(stderr_lines), "foo.ts$3-6"); + assert_contains!(skip_restarting_line(&mut stderr_lines), "foo.ts$3-6"); check_alive_then_kill(child); } diff --git a/cli/tools/fmt.rs b/cli/tools/fmt.rs index 2149552cd6..9172421b51 100644 --- a/cli/tools/fmt.rs +++ b/cli/tools/fmt.rs @@ -19,6 +19,7 @@ use crate::fs_util::specifier_to_file_path; use crate::fs_util::{collect_files, get_extension, is_supported_ext_fmt}; use crate::text_encoding; use deno_ast::ParsedSource; +use deno_core::anyhow::Context; use deno_core::error::generic_error; use deno_core::error::AnyError; use deno_core::futures; @@ -525,7 +526,8 @@ struct FileContents { } fn read_file_contents(file_path: &Path) -> Result { - let file_bytes = fs::read(&file_path)?; + let file_bytes = fs::read(&file_path) + .with_context(|| format!("Error reading {}", file_path.display()))?; let charset = text_encoding::detect_charset(&file_bytes); let file_text = text_encoding::convert_to_utf8(&file_bytes, charset)?; let had_bom = file_text.starts_with(text_encoding::BOM_CHAR);