diff --git a/Cargo.lock b/Cargo.lock index adf3706b70..2d16aa88ca 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -850,6 +850,7 @@ dependencies = [ "fancy-regex", "flaky_test", "flate2", + "fs3", "fwdansi", "glibc_version", "http", diff --git a/Cargo.toml b/Cargo.toml index 1fbdd5c657..48fe593e7e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -88,6 +88,7 @@ data-url = "=0.2.0" dlopen = "0.1.8" encoding_rs = "=0.8.31" flate2 = "=1.0.24" +fs3 = "0.5.0" futures = "0.3.21" http = "=0.2.8" hyper = "0.14.18" diff --git a/cli/Cargo.toml b/cli/Cargo.toml index 0ad4896f32..692ac52056 100644 --- a/cli/Cargo.toml +++ b/cli/Cargo.toml @@ -73,6 +73,7 @@ env_logger = "=0.9.0" eszip = "=0.37.0" fancy-regex = "=0.10.0" flate2.workspace = true +fs3.workspace = true http.workspace = true import_map = "=0.15.0" indexmap.workspace = true diff --git a/cli/npm/resolvers/local.rs b/cli/npm/resolvers/local.rs index bf5b8529c4..52a783823f 100644 --- a/cli/npm/resolvers/local.rs +++ b/cli/npm/resolvers/local.rs @@ -10,6 +10,7 @@ use std::path::Path; use std::path::PathBuf; use crate::util::fs::symlink_dir; +use crate::util::fs::LaxSingleProcessFsFlag; use async_trait::async_trait; use deno_ast::ModuleSpecifier; use deno_core::anyhow::bail; @@ -236,6 +237,13 @@ async fn sync_resolution_with_fs( format!("Creating '{}'", deno_local_registry_dir.display()) })?; + let single_process_lock = LaxSingleProcessFsFlag::lock( + deno_local_registry_dir.join(".deno.lock"), + // similar message used by cargo build + "waiting for file lock on node_modules directory", + ) + .await; + // 1. Write all the packages out the .deno directory. // // Copy (hardlink in future) // to @@ -394,6 +402,8 @@ async fn sync_resolution_with_fs( } } + drop(single_process_lock); + Ok(()) } diff --git a/cli/util/fs.rs b/cli/util/fs.rs index 777b22c5fe..4ac57eac03 100644 --- a/cli/util/fs.rs +++ b/cli/util/fs.rs @@ -14,10 +14,14 @@ use std::io::ErrorKind; use std::io::Write; use std::path::Path; use std::path::PathBuf; +use std::sync::Arc; use std::time::Duration; use walkdir::WalkDir; use crate::args::FilesConfig; +use crate::util::progress_bar::ProgressBar; +use crate::util::progress_bar::ProgressBarStyle; +use crate::util::progress_bar::ProgressMessagePrompt; use super::path::specifier_to_file_path; @@ -471,11 +475,167 @@ pub fn dir_size(path: &Path) -> std::io::Result { Ok(total) } +struct LaxSingleProcessFsFlagInner { + file_path: PathBuf, + fs_file: std::fs::File, + finished_token: Arc, +} + +impl Drop for LaxSingleProcessFsFlagInner { + fn drop(&mut self) { + use fs3::FileExt; + // kill the poll thread + self.finished_token.cancel(); + // release the file lock + if let Err(err) = self.fs_file.unlock() { + log::debug!( + "Failed releasing lock for {}. {:#}", + self.file_path.display(), + err + ); + } + } +} + +/// A file system based flag that will attempt to synchronize multiple +/// processes so they go one after the other. In scenarios where +/// synchronization cannot be achieved, it will allow the current process +/// to proceed. +/// +/// This should only be used in places where it's ideal for multiple +/// processes to not update something on the file system at the same time, +/// but it's not that big of a deal. +pub struct LaxSingleProcessFsFlag(Option); + +impl LaxSingleProcessFsFlag { + pub async fn lock(file_path: PathBuf, long_wait_message: &str) -> Self { + log::debug!("Acquiring file lock at {}", file_path.display()); + use fs3::FileExt; + let last_updated_path = file_path.with_extension("lock.poll"); + let start_instant = std::time::Instant::now(); + let open_result = std::fs::OpenOptions::new() + .read(true) + .write(true) + .create(true) + .open(&file_path); + + match open_result { + Ok(fs_file) => { + let mut pb_update_guard = None; + let mut error_count = 0; + while error_count < 10 { + let lock_result = fs_file.try_lock_exclusive(); + let poll_file_update_ms = 100; + match lock_result { + Ok(_) => { + log::debug!("Acquired file lock at {}", file_path.display()); + let _ignore = std::fs::write(&last_updated_path, ""); + let token = Arc::new(tokio_util::sync::CancellationToken::new()); + + // Spawn a blocking task that will continually update a file + // signalling the lock is alive. This is a fail safe for when + // a file lock is never released. For example, on some operating + // systems, if a process does not release the lock (say it's + // killed), then the OS may release it at an indeterminate time + // + // This uses a blocking task because we use a single threaded + // runtime and this is time sensitive so we don't want it to update + // at the whims of of whatever is occurring on the runtime thread. + tokio::task::spawn_blocking({ + let token = token.clone(); + let last_updated_path = last_updated_path.clone(); + move || { + let mut i = 0; + while !token.is_cancelled() { + i += 1; + let _ignore = + std::fs::write(&last_updated_path, i.to_string()); + std::thread::sleep(Duration::from_millis( + poll_file_update_ms, + )); + } + } + }); + + return Self(Some(LaxSingleProcessFsFlagInner { + file_path, + fs_file, + finished_token: token, + })); + } + Err(_) => { + // show a message if it's been a while + if pb_update_guard.is_none() + && start_instant.elapsed().as_millis() > 1_000 + { + let pb = ProgressBar::new(ProgressBarStyle::TextOnly); + let guard = pb.update_with_prompt( + ProgressMessagePrompt::Blocking, + long_wait_message, + ); + pb_update_guard = Some((guard, pb)); + } + + // sleep for a little bit + tokio::time::sleep(Duration::from_millis(20)).await; + + // Poll the last updated path to check if it's stopped updating, + // which is an indication that the file lock is claimed, but + // was never properly released. + match std::fs::metadata(&last_updated_path) + .and_then(|p| p.modified()) + { + Ok(last_updated_time) => { + let current_time = std::time::SystemTime::now(); + match current_time.duration_since(last_updated_time) { + Ok(duration) => { + if duration.as_millis() + > (poll_file_update_ms * 2) as u128 + { + // the other process hasn't updated this file in a long time + // so maybe it was killed and the operating system hasn't + // released the file lock yet + return Self(None); + } else { + error_count = 0; // reset + } + } + Err(_) => { + error_count += 1; + } + } + } + Err(_) => { + error_count += 1; + } + } + } + } + } + + drop(pb_update_guard); // explicit for clarity + Self(None) + } + Err(err) => { + log::debug!( + "Failed to open file lock at {}. {:#}", + file_path.display(), + err + ); + Self(None) // let the process through + } + } + } +} + #[cfg(test)] mod tests { use super::*; + use deno_core::futures; + use deno_core::parking_lot::Mutex; use pretty_assertions::assert_eq; use test_util::TempDir; + use tokio::sync::Notify; #[test] fn resolve_from_cwd_child() { @@ -793,4 +953,90 @@ mod tests { ); } } + + #[tokio::test] + async fn lax_fs_lock() { + let temp_dir = TempDir::new(); + let lock_path = temp_dir.path().join("file.lock"); + let signal1 = Arc::new(Notify::new()); + let signal2 = Arc::new(Notify::new()); + let signal3 = Arc::new(Notify::new()); + let signal4 = Arc::new(Notify::new()); + tokio::spawn({ + let lock_path = lock_path.clone(); + let signal1 = signal1.clone(); + let signal2 = signal2.clone(); + let signal3 = signal3.clone(); + let signal4 = signal4.clone(); + let temp_dir = temp_dir.clone(); + async move { + let flag = + LaxSingleProcessFsFlag::lock(lock_path.clone(), "waiting").await; + signal1.notify_one(); + signal2.notified().await; + tokio::time::sleep(Duration::from_millis(10)).await; // give the other thread time to acquire the lock + temp_dir.write("file.txt", "update1"); + signal3.notify_one(); + signal4.notified().await; + drop(flag); + } + }); + let signal5 = Arc::new(Notify::new()); + tokio::spawn({ + let temp_dir = temp_dir.clone(); + let signal5 = signal5.clone(); + async move { + signal1.notified().await; + signal2.notify_one(); + let flag = LaxSingleProcessFsFlag::lock(lock_path, "waiting").await; + temp_dir.write("file.txt", "update2"); + signal5.notify_one(); + drop(flag); + } + }); + + signal3.notified().await; + assert_eq!(temp_dir.read_to_string("file.txt"), "update1"); + signal4.notify_one(); + signal5.notified().await; + assert_eq!(temp_dir.read_to_string("file.txt"), "update2"); + } + + #[tokio::test] + async fn lax_fs_lock_ordered() { + let temp_dir = TempDir::new(); + let lock_path = temp_dir.path().join("file.lock"); + let output_path = temp_dir.path().join("output"); + let expected_order = Arc::new(Mutex::new(Vec::new())); + let count = 10; + let mut tasks = Vec::with_capacity(count); + + std::fs::write(&output_path, "").unwrap(); + + for i in 0..count { + let lock_path = lock_path.clone(); + let output_path = output_path.clone(); + let expected_order = expected_order.clone(); + tasks.push(tokio::spawn(async move { + let flag = + LaxSingleProcessFsFlag::lock(lock_path.clone(), "waiting").await; + expected_order.lock().push(i.to_string()); + // be extremely racy + let mut output = std::fs::read_to_string(&output_path).unwrap(); + if !output.is_empty() { + output.push('\n'); + } + output.push_str(&i.to_string()); + std::fs::write(&output_path, output).unwrap(); + drop(flag); + })); + } + + futures::future::join_all(tasks).await; + let expected_output = expected_order.lock().join("\n"); + assert_eq!( + std::fs::read_to_string(output_path).unwrap(), + expected_output + ); + } } diff --git a/cli/util/progress_bar/mod.rs b/cli/util/progress_bar/mod.rs index 004b48b2f8..2568710795 100644 --- a/cli/util/progress_bar/mod.rs +++ b/cli/util/progress_bar/mod.rs @@ -23,6 +23,21 @@ mod renderer; // Inspired by Indicatif, but this custom implementation allows // for more control over what's going on under the hood. +#[derive(Debug, Clone, Copy)] +pub enum ProgressMessagePrompt { + Download, + Blocking, +} + +impl ProgressMessagePrompt { + pub fn as_text(&self) -> String { + match self { + ProgressMessagePrompt::Download => colors::green("Download").to_string(), + ProgressMessagePrompt::Blocking => colors::cyan("Blocking").to_string(), + } + } +} + #[derive(Debug)] pub struct UpdateGuard { maybe_entry: Option, @@ -59,6 +74,7 @@ pub enum ProgressBarStyle { #[derive(Clone, Debug)] struct ProgressBarEntry { id: usize, + prompt: ProgressMessagePrompt, pub message: String, pos: Arc, total_size: Arc, @@ -128,11 +144,16 @@ impl ProgressBarInner { } } - pub fn add_entry(&self, message: String) -> ProgressBarEntry { + pub fn add_entry( + &self, + kind: ProgressMessagePrompt, + message: String, + ) -> ProgressBarEntry { let mut internal_state = self.state.lock(); let id = internal_state.total_entries; let entry = ProgressBarEntry { id, + prompt: kind, message, pos: Default::default(), total_size: Default::default(), @@ -208,6 +229,7 @@ impl DrawThreadRenderer for ProgressBarInner { pending_entries: state.entries.len(), total_entries: state.total_entries, display_entry: ProgressDataDisplayEntry { + prompt: preferred_entry.prompt, message: preferred_entry.message.clone(), position: preferred_entry.position(), total_size: preferred_entry.total_size(), @@ -255,9 +277,17 @@ impl ProgressBar { } pub fn update(&self, msg: &str) -> UpdateGuard { + self.update_with_prompt(ProgressMessagePrompt::Download, msg) + } + + pub fn update_with_prompt( + &self, + kind: ProgressMessagePrompt, + msg: &str, + ) -> UpdateGuard { match &self.inner { Some(inner) => { - let entry = inner.add_entry(msg.to_string()); + let entry = inner.add_entry(kind, msg.to_string()); UpdateGuard { maybe_entry: Some(entry), } @@ -265,7 +295,7 @@ impl ProgressBar { None => { // if we're not running in TTY, fallback to using logger crate if !msg.is_empty() { - log::log!(log::Level::Info, "{} {}", colors::green("Download"), msg); + log::log!(log::Level::Info, "{} {}", kind.as_text(), msg); } UpdateGuard { maybe_entry: None } } diff --git a/cli/util/progress_bar/renderer.rs b/cli/util/progress_bar/renderer.rs index 0ea275e773..5635ad3165 100644 --- a/cli/util/progress_bar/renderer.rs +++ b/cli/util/progress_bar/renderer.rs @@ -6,8 +6,11 @@ use deno_runtime::colors; use crate::util::display::human_download_size; +use super::ProgressMessagePrompt; + #[derive(Clone)] pub struct ProgressDataDisplayEntry { + pub prompt: ProgressMessagePrompt, pub message: String, pub position: u64, pub total_size: u64, @@ -142,7 +145,7 @@ impl ProgressBarRenderer for TextOnlyProgressBarRenderer { format!( "{} {}{}{}", - colors::green("Download"), + data.display_entry.prompt.as_text(), data.display_entry.message, colors::gray(bytes_text), colors::gray(total_text), @@ -195,6 +198,7 @@ mod test { let renderer = BarProgressBarRenderer; let mut data = ProgressData { display_entry: ProgressDataDisplayEntry { + prompt: ProgressMessagePrompt::Download, message: "data".to_string(), position: 0, total_size: 10 * BYTES_TO_KIB, @@ -251,6 +255,7 @@ mod test { let renderer = TextOnlyProgressBarRenderer; let mut data = ProgressData { display_entry: ProgressDataDisplayEntry { + prompt: ProgressMessagePrompt::Blocking, message: "data".to_string(), position: 0, total_size: 10 * BYTES_TO_KIB, @@ -263,7 +268,7 @@ mod test { }; let text = renderer.render(data.clone()); let text = test_util::strip_ansi_codes(&text); - assert_eq!(text, "Download data 0.00KiB/10.00KiB (2/3)"); + assert_eq!(text, "Blocking data 0.00KiB/10.00KiB (2/3)"); data.pending_entries = 0; data.total_entries = 1; @@ -271,6 +276,6 @@ mod test { data.display_entry.total_size = 0; let text = renderer.render(data); let text = test_util::strip_ansi_codes(&text); - assert_eq!(text, "Download data"); + assert_eq!(text, "Blocking data"); } } diff --git a/runtime/Cargo.toml b/runtime/Cargo.toml index 1121562030..0ba8f8d3ad 100644 --- a/runtime/Cargo.toml +++ b/runtime/Cargo.toml @@ -89,7 +89,7 @@ atty.workspace = true dlopen.workspace = true encoding_rs.workspace = true filetime = "0.2.16" -fs3 = "0.5.0" +fs3.workspace = true http.workspace = true hyper = { workspace = true, features = ["server", "stream", "http1", "http2", "runtime"] } libc.workspace = true