1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2024-12-22 07:14:47 -05:00

fix(watch): mitigate race condition between file write by other process and watch read (#13038)

This commit is contained in:
David Sherret 2021-12-09 20:24:37 -05:00 committed by GitHub
parent 616ff1d482
commit f530189c50
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 89 additions and 120 deletions

View file

@ -4,9 +4,7 @@ use crate::colors;
use crate::fs_util::canonicalize_path;
use deno_core::error::AnyError;
use deno_core::futures::stream::{Stream, StreamExt};
use deno_core::futures::Future;
use deno_core::parking_lot::Mutex;
use log::info;
use notify::event::Event as NotifyEvent;
use notify::event::EventKind;
@ -15,57 +13,42 @@ use notify::Error as NotifyError;
use notify::RecommendedWatcher;
use notify::RecursiveMode;
use notify::Watcher;
use pin_project::pin_project;
use std::collections::HashSet;
use std::path::PathBuf;
use std::pin::Pin;
use std::sync::Arc;
use std::task::Context;
use std::task::Poll;
use std::time::Duration;
use tokio::pin;
use tokio::select;
use tokio::sync::mpsc;
use tokio::time::sleep;
use tokio::time::Instant;
use tokio::time::Sleep;
const DEBOUNCE_INTERVAL: Duration = Duration::from_millis(200);
#[pin_project(project = DebounceProjection)]
struct Debounce {
#[pin]
timer: Sleep,
changed_paths: Arc<Mutex<HashSet<PathBuf>>>,
struct DebouncedReceiver {
receiver: mpsc::UnboundedReceiver<Vec<PathBuf>>,
}
impl Debounce {
fn new() -> Self {
Self {
timer: sleep(DEBOUNCE_INTERVAL),
changed_paths: Arc::new(Mutex::new(HashSet::new())),
}
}
impl DebouncedReceiver {
fn new_with_sender() -> (Arc<mpsc::UnboundedSender<Vec<PathBuf>>>, Self) {
let (sender, receiver) = mpsc::unbounded_channel();
(Arc::new(sender), Self { receiver })
}
impl Stream for Debounce {
type Item = Vec<PathBuf>;
/// Note that this never returns `Poll::Ready(None)`, which means that the
/// file watcher will be alive until the Deno process is terminated.
fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context,
) -> Poll<Option<Self::Item>> {
let mut changed_paths = self.changed_paths.lock();
if changed_paths.len() > 0 {
Poll::Ready(Some(changed_paths.drain().collect()))
} else {
drop(changed_paths);
let mut timer = self.project().timer;
if timer.as_mut().poll(cx).is_ready() {
timer.reset(Instant::now() + DEBOUNCE_INTERVAL);
async fn recv(&mut self) -> Option<Vec<PathBuf>> {
let mut received_items = self
.receiver
.recv()
.await?
.into_iter()
.collect::<HashSet<_>>(); // prevent duplicates
loop {
tokio::select! {
items = self.receiver.recv() => {
received_items.extend(items?);
}
_ = sleep(DEBOUNCE_INTERVAL) => {
return Some(received_items.into_iter().collect());
}
}
Poll::Pending
}
}
}
@ -91,14 +74,14 @@ pub enum ResolutionResult<T> {
async fn next_restart<R, T, F>(
resolver: &mut R,
debounce: &mut Pin<&mut Debounce>,
debounced_receiver: &mut DebouncedReceiver,
) -> (Vec<PathBuf>, Result<T, AnyError>)
where
R: FnMut(Option<Vec<PathBuf>>) -> F,
F: Future<Output = ResolutionResult<T>>,
{
loop {
let changed = debounce.next().await;
let changed = debounced_receiver.recv().await;
match resolver(changed).await {
ResolutionResult::Ignore => {
log::debug!("File change ignored")
@ -140,8 +123,7 @@ where
F1: Future<Output = ResolutionResult<T>>,
F2: Future<Output = Result<(), AnyError>>,
{
let debounce = Debounce::new();
pin!(debounce);
let (sender, mut receiver) = DebouncedReceiver::new_with_sender();
// Store previous data. If module resolution fails at some point, the watcher will try to
// continue watching files using these data.
@ -161,7 +143,7 @@ where
colors::intense_blue("Watcher"),
);
let (paths, result) = next_restart(&mut resolver, &mut debounce).await;
let (paths, result) = next_restart(&mut resolver, &mut receiver).await;
paths_to_watch = paths;
resolution_result = result;
}
@ -175,13 +157,13 @@ where
};
loop {
let watcher = new_watcher(&paths_to_watch, &debounce)?;
let watcher = new_watcher(&paths_to_watch, sender.clone())?;
match resolution_result {
Ok(operation_arg) => {
let fut = error_handler(operation(operation_arg));
select! {
(paths, result) = next_restart(&mut resolver, &mut debounce) => {
(paths, result) = next_restart(&mut resolver, &mut receiver) => {
if result.is_ok() {
paths_to_watch = paths;
}
@ -207,7 +189,7 @@ where
}
}
let (paths, result) = next_restart(&mut resolver, &mut debounce).await;
let (paths, result) = next_restart(&mut resolver, &mut receiver).await;
if result.is_ok() {
paths_to_watch = paths;
}
@ -219,10 +201,8 @@ where
fn new_watcher(
paths: &[PathBuf],
debounce: &Debounce,
sender: Arc<mpsc::UnboundedSender<Vec<PathBuf>>>,
) -> Result<RecommendedWatcher, AnyError> {
let changed_paths = Arc::clone(&debounce.changed_paths);
let mut watcher: RecommendedWatcher =
Watcher::new(move |res: Result<NotifyEvent, NotifyError>| {
if let Ok(event) = res {
@ -233,9 +213,9 @@ fn new_watcher(
let paths = event
.paths
.iter()
.filter_map(|path| canonicalize_path(path).ok());
let mut changed_paths = changed_paths.lock();
changed_paths.extend(paths);
.filter_map(|path| canonicalize_path(path).ok())
.collect();
sender.send(paths).unwrap();
}
}
})?;

View file

@ -166,7 +166,9 @@ async fn op_emit(
args.import_map_path
{
let import_map_specifier = resolve_url_or_path(&import_map_str)
.context(format!("Bad URL (\"{}\") for import map.", import_map_str))?;
.with_context(|| {
format!("Bad URL (\"{}\") for import map.", import_map_str)
})?;
let import_map = if let Some(value) = args.import_map {
ImportMap::from_json(import_map_specifier.as_str(), &value.to_string())?
} else {

View file

@ -18,7 +18,7 @@ macro_rules! assert_contains {
// Helper function to skip watcher output that contains "Restarting"
// phrase.
fn skip_restarting_line(
mut stderr_lines: impl Iterator<Item = String>,
stderr_lines: &mut impl Iterator<Item = String>,
) -> String {
loop {
let msg = stderr_lines.next().unwrap();
@ -69,10 +69,18 @@ fn child_lines(
) -> (impl Iterator<Item = String>, impl Iterator<Item = String>) {
let stdout_lines = std::io::BufReader::new(child.stdout.take().unwrap())
.lines()
.map(|r| r.unwrap());
.map(|r| {
let line = r.unwrap();
eprintln!("STDOUT: {}", line);
line
});
let stderr_lines = std::io::BufReader::new(child.stderr.take().unwrap())
.lines()
.map(|r| r.unwrap());
.map(|r| {
let line = r.unwrap();
eprintln!("STERR: {}", line);
line
});
(stdout_lines, stderr_lines)
}
@ -106,13 +114,7 @@ fn lint_watch_test() {
.stderr(std::process::Stdio::piped())
.spawn()
.expect("Failed to spawn script");
let mut stderr = child.stderr.as_mut().unwrap();
let mut stderr_lines = std::io::BufReader::new(&mut stderr)
.lines()
.map(|r| r.unwrap());
// TODO(lucacasonato): remove this timeout. It seems to be needed on Linux.
std::thread::sleep(std::time::Duration::from_secs(1));
let (_stdout_lines, mut stderr_lines) = child_lines(&mut child);
let mut output = read_all_lints(&mut stderr_lines);
let expected = std::fs::read_to_string(badly_linted_output).unwrap();
@ -130,7 +132,6 @@ fn lint_watch_test() {
// Change content of the file again to be badly-linted1
std::fs::copy(&badly_linted_fixed2, &badly_linted)
.expect("Failed to copy file");
std::thread::sleep(std::time::Duration::from_secs(1));
output = read_all_lints(&mut stderr_lines);
let expected = std::fs::read_to_string(badly_linted_fixed2_output).unwrap();
@ -172,13 +173,7 @@ fn lint_watch_without_args_test() {
.stderr(std::process::Stdio::piped())
.spawn()
.expect("Failed to spawn script");
let mut stderr = child.stderr.as_mut().unwrap();
let mut stderr_lines = std::io::BufReader::new(&mut stderr)
.lines()
.map(|r| r.unwrap());
// TODO(lucacasonato): remove this timeout. It seems to be needed on Linux.
std::thread::sleep(std::time::Duration::from_secs(1));
let (_stdout_lines, mut stderr_lines) = child_lines(&mut child);
let mut output = read_all_lints(&mut stderr_lines);
let expected = std::fs::read_to_string(badly_linted_output).unwrap();
@ -187,7 +182,6 @@ fn lint_watch_without_args_test() {
// Change content of the file again to be badly-linted1
std::fs::copy(&badly_linted_fixed1, &badly_linted)
.expect("Failed to copy file");
std::thread::sleep(std::time::Duration::from_secs(1));
output = read_all_lints(&mut stderr_lines);
let expected = std::fs::read_to_string(badly_linted_fixed1_output).unwrap();
@ -236,18 +230,12 @@ fn lint_all_files_on_each_change_test() {
.stderr(std::process::Stdio::piped())
.spawn()
.expect("Failed to spawn script");
let mut stderr = child.stderr.as_mut().unwrap();
let mut stderr_lines = std::io::BufReader::new(&mut stderr)
.lines()
.map(|r| r.unwrap());
std::thread::sleep(std::time::Duration::from_secs(1));
let (_stdout_lines, mut stderr_lines) = child_lines(&mut child);
assert_contains!(read_line("Checked", &mut stderr_lines), "Checked 2 files");
std::fs::copy(&badly_linted_fixed2, &badly_linted_2)
.expect("Failed to copy file");
std::thread::sleep(std::time::Duration::from_secs(1));
assert_contains!(read_line("Checked", &mut stderr_lines), "Checked 2 files");
@ -276,12 +264,13 @@ fn fmt_watch_test() {
.stderr(std::process::Stdio::piped())
.spawn()
.unwrap();
let (_stdout_lines, stderr_lines) = child_lines(&mut child);
let (_stdout_lines, mut stderr_lines) = child_lines(&mut child);
// TODO(lucacasonato): remove this timeout. It seems to be needed on Linux.
std::thread::sleep(std::time::Duration::from_secs(1));
assert!(skip_restarting_line(stderr_lines).contains("badly_formatted.js"));
assert_contains!(
skip_restarting_line(&mut stderr_lines),
"badly_formatted.js"
);
assert_contains!(read_line("Checked", &mut stderr_lines), "Checked 1 file");
let expected = std::fs::read_to_string(fixed.clone()).unwrap();
let actual = std::fs::read_to_string(badly_formatted.clone()).unwrap();
@ -289,7 +278,12 @@ fn fmt_watch_test() {
// Change content of the file again to be badly formatted
std::fs::copy(&badly_formatted_original, &badly_formatted).unwrap();
std::thread::sleep(std::time::Duration::from_secs(1));
assert_contains!(
skip_restarting_line(&mut stderr_lines),
"badly_formatted.js"
);
assert_contains!(read_line("Checked", &mut stderr_lines), "Checked 1 file");
// Check if file has been automatically formatted by watcher
let expected = std::fs::read_to_string(fixed).unwrap();
@ -316,12 +310,13 @@ fn fmt_watch_without_args_test() {
.stderr(std::process::Stdio::piped())
.spawn()
.unwrap();
let (_stdout_lines, stderr_lines) = child_lines(&mut child);
let (_stdout_lines, mut stderr_lines) = child_lines(&mut child);
// TODO(lucacasonato): remove this timeout. It seems to be needed on Linux.
std::thread::sleep(std::time::Duration::from_secs(1));
assert!(skip_restarting_line(stderr_lines).contains("badly_formatted.js"));
assert_contains!(
skip_restarting_line(&mut stderr_lines),
"badly_formatted.js"
);
assert_contains!(read_line("Checked", &mut stderr_lines), "Checked 1 file");
let expected = std::fs::read_to_string(fixed.clone()).unwrap();
let actual = std::fs::read_to_string(badly_formatted.clone()).unwrap();
@ -329,7 +324,11 @@ fn fmt_watch_without_args_test() {
// Change content of the file again to be badly formatted
std::fs::copy(&badly_formatted_original, &badly_formatted).unwrap();
std::thread::sleep(std::time::Duration::from_secs(1));
assert_contains!(
skip_restarting_line(&mut stderr_lines),
"badly_formatted.js"
);
assert_contains!(read_line("Checked", &mut stderr_lines), "Checked 1 file");
// Check if file has been automatically formatted by watcher
let expected = std::fs::read_to_string(fixed).unwrap();
@ -361,9 +360,6 @@ fn fmt_check_all_files_on_each_change_test() {
.unwrap();
let (_stdout_lines, mut stderr_lines) = child_lines(&mut child);
// TODO(lucacasonato): remove this timeout. It seems to be needed on Linux.
std::thread::sleep(std::time::Duration::from_secs(1));
assert_contains!(
read_line("error", &mut stderr_lines),
"Found 2 not formatted files in 2 files"
@ -372,8 +368,6 @@ fn fmt_check_all_files_on_each_change_test() {
// Change content of the file again to be badly formatted
std::fs::copy(&badly_formatted_original, &badly_formatted_1).unwrap();
std::thread::sleep(std::time::Duration::from_secs(1));
assert_contains!(
read_line("error", &mut stderr_lines),
"Found 2 not formatted files in 2 files"
@ -407,7 +401,6 @@ fn bundle_js_watch() {
let (_stdout_lines, mut stderr_lines) = child_lines(&mut deno);
std::thread::sleep(std::time::Duration::from_secs(1));
assert_contains!(stderr_lines.next().unwrap(), "Check");
assert_contains!(stderr_lines.next().unwrap(), "file_to_watch.js");
assert_contains!(stderr_lines.next().unwrap(), "mod6.bundle.js");
@ -416,7 +409,7 @@ fn bundle_js_watch() {
wait_for("Bundle finished", &mut stderr_lines);
write(&file_to_watch, "console.log('Hello world2');").unwrap();
std::thread::sleep(std::time::Duration::from_secs(1));
assert_contains!(stderr_lines.next().unwrap(), "Check");
assert_contains!(stderr_lines.next().unwrap(), "File change detected!");
assert_contains!(stderr_lines.next().unwrap(), "file_to_watch.js");
@ -427,7 +420,7 @@ fn bundle_js_watch() {
// Confirm that the watcher keeps on working even if the file is updated and has invalid syntax
write(&file_to_watch, "syntax error ^^").unwrap();
std::thread::sleep(std::time::Duration::from_secs(1));
assert_contains!(stderr_lines.next().unwrap(), "File change detected!");
assert_contains!(stderr_lines.next().unwrap(), "error: ");
wait_for("Bundle failed", &mut stderr_lines);
@ -456,7 +449,6 @@ fn bundle_watch_not_exit() {
.unwrap();
let (_stdout_lines, mut stderr_lines) = child_lines(&mut deno);
std::thread::sleep(std::time::Duration::from_secs(1));
assert_contains!(stderr_lines.next().unwrap(), "error:");
assert_contains!(stderr_lines.next().unwrap(), "Bundle failed");
// the target file hasn't been created yet
@ -464,12 +456,14 @@ fn bundle_watch_not_exit() {
// Make sure the watcher actually restarts and works fine with the proper syntax
write(&file_to_watch, "console.log(42);").unwrap();
std::thread::sleep(std::time::Duration::from_secs(1));
assert_contains!(stderr_lines.next().unwrap(), "Check");
assert_contains!(stderr_lines.next().unwrap(), "File change detected!");
assert_contains!(stderr_lines.next().unwrap(), "file_to_watch.js");
assert_contains!(stderr_lines.next().unwrap(), "target.js");
wait_for("Bundle finished", &mut stderr_lines);
// bundled file is created
assert!(target_file.is_file());
check_alive_then_kill(deno);
@ -497,13 +491,8 @@ fn run_watch() {
assert_contains!(stdout_lines.next().unwrap(), "Hello world");
wait_for("Process finished", &mut stderr_lines);
// TODO(lucacasonato): remove this timeout. It seems to be needed on Linux.
std::thread::sleep(std::time::Duration::from_secs(1));
// Change content of the file
write(&file_to_watch, "console.log('Hello world2');").unwrap();
// Events from the file watcher is "debounced", so we need to wait for the next execution to start
std::thread::sleep(std::time::Duration::from_secs(1));
assert_contains!(stderr_lines.next().unwrap(), "Restarting");
assert_contains!(stdout_lines.next().unwrap(), "Hello world2");
@ -517,21 +506,21 @@ fn run_watch() {
"import { foo } from './another_file.js'; console.log(foo);",
)
.unwrap();
std::thread::sleep(std::time::Duration::from_secs(1));
assert_contains!(stderr_lines.next().unwrap(), "Restarting");
assert_contains!(stdout_lines.next().unwrap(), '0');
wait_for("Process finished", &mut stderr_lines);
// Confirm that restarting occurs when a new file is updated
write(&another_file, "export const foo = 42;").unwrap();
std::thread::sleep(std::time::Duration::from_secs(1));
assert_contains!(stderr_lines.next().unwrap(), "Restarting");
assert_contains!(stdout_lines.next().unwrap(), "42");
wait_for("Process finished", &mut stderr_lines);
// Confirm that the watcher keeps on working even if the file is updated and has invalid syntax
write(&file_to_watch, "syntax error ^^").unwrap();
std::thread::sleep(std::time::Duration::from_secs(1));
assert_contains!(stderr_lines.next().unwrap(), "Restarting");
assert_contains!(stderr_lines.next().unwrap(), "error:");
wait_for("Process failed", &mut stderr_lines);
@ -542,21 +531,21 @@ fn run_watch() {
"import { foo } from './another_file.js'; console.log(foo);",
)
.unwrap();
std::thread::sleep(std::time::Duration::from_secs(1));
assert_contains!(stderr_lines.next().unwrap(), "Restarting");
assert_contains!(stdout_lines.next().unwrap(), "42");
wait_for("Process finished", &mut stderr_lines);
// Update the content of the imported file with invalid syntax
write(&another_file, "syntax error ^^").unwrap();
std::thread::sleep(std::time::Duration::from_secs(1));
assert_contains!(stderr_lines.next().unwrap(), "Restarting");
assert_contains!(stderr_lines.next().unwrap(), "error:");
wait_for("Process failed", &mut stderr_lines);
// Modify the imported file and make sure that restarting occurs
write(&another_file, "export const foo = 'modified!';").unwrap();
std::thread::sleep(std::time::Duration::from_secs(1));
assert_contains!(stderr_lines.next().unwrap(), "Restarting");
assert_contains!(stdout_lines.next().unwrap(), "modified!");
wait_for("Process finished", &mut stderr_lines);
@ -613,9 +602,6 @@ fn run_watch_load_unload_events() {
)
.unwrap();
// Events from the file watcher is "debounced", so we need to wait for the next execution to start
std::thread::sleep(std::time::Duration::from_secs(1));
// Wait for the restart
assert_contains!(stderr_lines.next().unwrap(), "Restarting");
@ -650,13 +636,12 @@ fn run_watch_not_exit() {
.unwrap();
let (mut stdout_lines, mut stderr_lines) = child_lines(&mut child);
std::thread::sleep(std::time::Duration::from_secs(1));
assert_contains!(stderr_lines.next().unwrap(), "error:");
assert_contains!(stderr_lines.next().unwrap(), "Process failed");
// Make sure the watcher actually restarts and works fine with the proper syntax
write(&file_to_watch, "console.log(42);").unwrap();
std::thread::sleep(std::time::Duration::from_secs(1));
assert_contains!(stderr_lines.next().unwrap(), "Restarting");
assert_contains!(stdout_lines.next().unwrap(), "42");
wait_for("Process finished", &mut stderr_lines);
@ -905,6 +890,6 @@ fn test_watch_doc() {
// We only need to scan for a Check file://.../foo.ts$3-6 line that
// corresponds to the documentation block being type-checked.
assert_contains!(skip_restarting_line(stderr_lines), "foo.ts$3-6");
assert_contains!(skip_restarting_line(&mut stderr_lines), "foo.ts$3-6");
check_alive_then_kill(child);
}

View file

@ -19,6 +19,7 @@ use crate::fs_util::specifier_to_file_path;
use crate::fs_util::{collect_files, get_extension, is_supported_ext_fmt};
use crate::text_encoding;
use deno_ast::ParsedSource;
use deno_core::anyhow::Context;
use deno_core::error::generic_error;
use deno_core::error::AnyError;
use deno_core::futures;
@ -525,7 +526,8 @@ struct FileContents {
}
fn read_file_contents(file_path: &Path) -> Result<FileContents, AnyError> {
let file_bytes = fs::read(&file_path)?;
let file_bytes = fs::read(&file_path)
.with_context(|| format!("Error reading {}", file_path.display()))?;
let charset = text_encoding::detect_charset(&file_bytes);
let file_text = text_encoding::convert_to_utf8(&file_bytes, charset)?;
let had_bom = file_text.starts_with(text_encoding::BOM_CHAR);