From 8d9fef3b8955eadfd4820455b422b5bec1cdad0a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20Iwa=C5=84czuk?= Date: Thu, 19 Oct 2023 07:05:00 +0200 Subject: [PATCH] refactor: add WatcherCommunicator helper struct (#20927) This commit introduces "WatcherCommunicator" struct that is used facilitate bi-directional communication between CLI file watcher and the watched function. Prerequisite for https://github.com/denoland/deno/pull/20876 --- cli/factory.rs | 39 ++++---- cli/graph_util.rs | 12 ++- cli/tools/bench/mod.rs | 9 +- cli/tools/bundle.rs | 7 +- cli/tools/fmt.rs | 4 +- cli/tools/lint.rs | 4 +- cli/tools/run.rs | 7 +- cli/tools/test/mod.rs | 9 +- cli/util/file_watcher.rs | 190 +++++++++++++++++++++++++++++---------- 9 files changed, 191 insertions(+), 90 deletions(-) diff --git a/cli/factory.rs b/cli/factory.rs index e4f9b60fe4..2841482f80 100644 --- a/cli/factory.rs +++ b/cli/factory.rs @@ -40,6 +40,7 @@ use crate::resolver::CliGraphResolver; use crate::resolver::CliGraphResolverOptions; use crate::standalone::DenoCompileBinaryWriter; use crate::tools::check::TypeChecker; +use crate::util::file_watcher::WatcherCommunicator; use crate::util::progress_bar::ProgressBar; use crate::util::progress_bar::ProgressBarStyle; use crate::worker::CliMainWorkerFactory; @@ -59,26 +60,18 @@ use deno_runtime::inspector_server::InspectorServer; use deno_semver::npm::NpmPackageReqReference; use import_map::ImportMap; use log::warn; -use std::cell::RefCell; use std::future::Future; -use std::path::PathBuf; use std::sync::Arc; pub struct CliFactoryBuilder { - maybe_sender: Option>>, + watcher_communicator: Option, } impl CliFactoryBuilder { pub fn new() -> Self { - Self { maybe_sender: None } - } - - pub fn with_watcher( - mut self, - sender: tokio::sync::mpsc::UnboundedSender>, - ) -> Self { - self.maybe_sender = Some(sender); - self + Self { + watcher_communicator: None, + } } pub async fn build_from_flags( @@ -88,9 +81,18 @@ impl CliFactoryBuilder { Ok(self.build_from_cli_options(Arc::new(CliOptions::from_flags(flags)?))) } + pub async fn build_from_flags_for_watcher( + mut self, + flags: Flags, + watcher_communicator: WatcherCommunicator, + ) -> Result { + self.watcher_communicator = Some(watcher_communicator); + self.build_from_flags(flags).await + } + pub fn build_from_cli_options(self, options: Arc) -> CliFactory { CliFactory { - maybe_sender: RefCell::new(self.maybe_sender), + watcher_communicator: self.watcher_communicator, options, services: Default::default(), } @@ -166,8 +168,7 @@ struct CliFactoryServices { } pub struct CliFactory { - maybe_sender: - RefCell>>>, + watcher_communicator: Option, options: Arc, services: CliFactoryServices, } @@ -384,11 +385,14 @@ impl CliFactory { } pub fn maybe_file_watcher_reporter(&self) -> &Option { - let maybe_sender = self.maybe_sender.borrow_mut().take(); + let maybe_file_watcher_reporter = self + .watcher_communicator + .as_ref() + .map(|i| FileWatcherReporter::new(i.clone())); self .services .maybe_file_watcher_reporter - .get_or_init(|| maybe_sender.map(FileWatcherReporter::new)) + .get_or_init(|| maybe_file_watcher_reporter) } pub fn emit_cache(&self) -> Result<&EmitCache, AnyError> { @@ -595,6 +599,7 @@ impl CliFactory { let npm_resolver = self.npm_resolver().await?; let fs = self.fs(); let cli_node_resolver = self.cli_node_resolver().await?; + Ok(CliMainWorkerFactory::new( StorageKeyResolver::from_options(&self.options), npm_resolver.clone(), diff --git a/cli/graph_util.rs b/cli/graph_util.rs index 17437ca997..b90581a145 100644 --- a/cli/graph_util.rs +++ b/cli/graph_util.rs @@ -13,6 +13,7 @@ use crate::npm::CliNpmResolver; use crate::resolver::CliGraphResolver; use crate::tools::check; use crate::tools::check::TypeChecker; +use crate::util::file_watcher::WatcherCommunicator; use crate::util::sync::TaskQueue; use crate::util::sync::TaskQueuePermit; @@ -635,14 +636,14 @@ impl<'a> ModuleGraphUpdatePermit<'a> { #[derive(Clone, Debug)] pub struct FileWatcherReporter { - sender: tokio::sync::mpsc::UnboundedSender>, + watcher_communicator: WatcherCommunicator, file_paths: Arc>>, } impl FileWatcherReporter { - pub fn new(sender: tokio::sync::mpsc::UnboundedSender>) -> Self { + pub fn new(watcher_communicator: WatcherCommunicator) -> Self { Self { - sender, + watcher_communicator, file_paths: Default::default(), } } @@ -665,7 +666,10 @@ impl deno_graph::source::Reporter for FileWatcherReporter { } if modules_done == modules_total { - self.sender.send(file_paths.drain(..).collect()).unwrap(); + self + .watcher_communicator + .watch_paths(file_paths.drain(..).collect()) + .unwrap(); } } } diff --git a/cli/tools/bench/mod.rs b/cli/tools/bench/mod.rs index 454a971266..eb400442e2 100644 --- a/cli/tools/bench/mod.rs +++ b/cli/tools/bench/mod.rs @@ -417,19 +417,18 @@ pub async fn run_benchmarks_with_watch( .map(|w| !w.no_clear_screen) .unwrap_or(true), }, - move |flags, sender, changed_paths| { + move |flags, watcher_communicator, changed_paths| { let bench_flags = bench_flags.clone(); Ok(async move { let factory = CliFactoryBuilder::new() - .with_watcher(sender.clone()) - .build_from_flags(flags) + .build_from_flags_for_watcher(flags, watcher_communicator.clone()) .await?; let cli_options = factory.cli_options(); let bench_options = cli_options.resolve_bench_options(bench_flags)?; - let _ = sender.send(cli_options.watch_paths()); + let _ = watcher_communicator.watch_paths(cli_options.watch_paths()); if let Some(include) = &bench_options.files.include { - let _ = sender.send(include.clone()); + let _ = watcher_communicator.watch_paths(include.clone()); } let graph_kind = cli_options.type_check_mode().as_graph_kind(); diff --git a/cli/tools/bundle.rs b/cli/tools/bundle.rs index 827641b1b8..cbde8768fd 100644 --- a/cli/tools/bundle.rs +++ b/cli/tools/bundle.rs @@ -35,15 +35,14 @@ pub async fn bundle( job_name: "Bundle".to_string(), clear_screen: !watch_flags.no_clear_screen, }, - move |flags, sender, _changed_paths| { + move |flags, watcher_communicator, _changed_paths| { let bundle_flags = bundle_flags.clone(); Ok(async move { let factory = CliFactoryBuilder::new() - .with_watcher(sender.clone()) - .build_from_flags(flags) + .build_from_flags_for_watcher(flags, watcher_communicator.clone()) .await?; let cli_options = factory.cli_options(); - let _ = sender.send(cli_options.watch_paths()); + let _ = watcher_communicator.watch_paths(cli_options.watch_paths()); bundle_action(factory, &bundle_flags).await?; Ok(()) diff --git a/cli/tools/fmt.rs b/cli/tools/fmt.rs index 284f20ddaa..b9525b7b2e 100644 --- a/cli/tools/fmt.rs +++ b/cli/tools/fmt.rs @@ -68,7 +68,7 @@ pub async fn format(flags: Flags, fmt_flags: FmtFlags) -> Result<(), AnyError> { job_name: "Fmt".to_string(), clear_screen: !watch_flags.no_clear_screen, }, - move |flags, sender, changed_paths| { + move |flags, watcher_communicator, changed_paths| { let fmt_flags = fmt_flags.clone(); Ok(async move { let factory = CliFactory::from_flags(flags).await?; @@ -82,7 +82,7 @@ pub async fn format(flags: Flags, fmt_flags: FmtFlags) -> Result<(), AnyError> { Ok(files) } })?; - _ = sender.send(files.clone()); + _ = watcher_communicator.watch_paths(files.clone()); let refmt_files = if let Some(paths) = changed_paths { if fmt_options.check { // check all files on any changed (https://github.com/denoland/deno/issues/12446) diff --git a/cli/tools/lint.rs b/cli/tools/lint.rs index 6a308b5990..b7f4a3f0d9 100644 --- a/cli/tools/lint.rs +++ b/cli/tools/lint.rs @@ -63,7 +63,7 @@ pub async fn lint(flags: Flags, lint_flags: LintFlags) -> Result<(), AnyError> { job_name: "Lint".to_string(), clear_screen: !watch_flags.no_clear_screen, }, - move |flags, sender, changed_paths| { + move |flags, watcher_communicator, changed_paths| { let lint_flags = lint_flags.clone(); Ok(async move { let factory = CliFactory::from_flags(flags).await?; @@ -77,7 +77,7 @@ pub async fn lint(flags: Flags, lint_flags: LintFlags) -> Result<(), AnyError> { Ok(files) } })?; - _ = sender.send(files.clone()); + _ = watcher_communicator.watch_paths(files.clone()); let lint_paths = if let Some(paths) = changed_paths { // lint all files on any changed (https://github.com/denoland/deno/issues/12446) diff --git a/cli/tools/run.rs b/cli/tools/run.rs index 5fb31a4ad7..80e80577e9 100644 --- a/cli/tools/run.rs +++ b/cli/tools/run.rs @@ -110,18 +110,17 @@ async fn run_with_watch( job_name: "Process".to_string(), clear_screen: !watch_flags.no_clear_screen, }, - move |flags, sender, _changed_paths| { + move |flags, watcher_communicator, _changed_paths| { Ok(async move { let factory = CliFactoryBuilder::new() - .with_watcher(sender.clone()) - .build_from_flags(flags) + .build_from_flags_for_watcher(flags, watcher_communicator.clone()) .await?; let cli_options = factory.cli_options(); let main_module = cli_options.resolve_main_module()?; maybe_npm_install(&factory).await?; - let _ = sender.send(cli_options.watch_paths()); + let _ = watcher_communicator.watch_paths(cli_options.watch_paths()); let permissions = PermissionsContainer::new(Permissions::from_options( &cli_options.permissions_options(), diff --git a/cli/tools/test/mod.rs b/cli/tools/test/mod.rs index b3aadc1e71..8e29ba2cbf 100644 --- a/cli/tools/test/mod.rs +++ b/cli/tools/test/mod.rs @@ -1213,19 +1213,18 @@ pub async fn run_tests_with_watch( .map(|w| !w.no_clear_screen) .unwrap_or(true), }, - move |flags, sender, changed_paths| { + move |flags, watcher_communicator, changed_paths| { let test_flags = test_flags.clone(); Ok(async move { let factory = CliFactoryBuilder::new() - .with_watcher(sender.clone()) - .build_from_flags(flags) + .build_from_flags_for_watcher(flags, watcher_communicator.clone()) .await?; let cli_options = factory.cli_options(); let test_options = cli_options.resolve_test_options(test_flags)?; - let _ = sender.send(cli_options.watch_paths()); + let _ = watcher_communicator.watch_paths(cli_options.watch_paths()); if let Some(include) = &test_options.files.include { - let _ = sender.send(include.clone()); + let _ = watcher_communicator.watch_paths(include.clone()); } let graph_kind = cli_options.type_check_mode().as_graph_kind(); diff --git a/cli/util/file_watcher.rs b/cli/util/file_watcher.rs index c0eda2d863..8d6b4e8fb6 100644 --- a/cli/util/file_watcher.rs +++ b/cli/util/file_watcher.rs @@ -7,6 +7,7 @@ use crate::util::fs::canonicalize_path; use deno_core::error::AnyError; use deno_core::error::JsError; use deno_core::futures::Future; +use deno_core::futures::FutureExt; use deno_runtime::fmt_errors::format_js_error; use log::info; use notify::event::Event as NotifyEvent; @@ -23,7 +24,6 @@ use std::time::Duration; use tokio::select; use tokio::sync::mpsc; use tokio::sync::mpsc::UnboundedReceiver; -use tokio::sync::mpsc::UnboundedSender; use tokio::time::sleep; const CLEAR_SCREEN: &str = "\x1B[2J\x1B[1;1H"; @@ -109,26 +109,99 @@ fn create_print_after_restart_fn(clear_screen: bool) -> impl Fn() { } } +/// An interface to interact with Deno's CLI file watcher. +#[derive(Debug)] +pub struct WatcherCommunicator { + /// Send a list of paths that should be watched for changes. + paths_to_watch_tx: tokio::sync::mpsc::UnboundedSender>, + + /// Listen for a list of paths that were changed. + changed_paths_rx: tokio::sync::broadcast::Receiver>>, + + /// Send a message to force a restart. + restart_tx: tokio::sync::mpsc::UnboundedSender<()>, +} + +impl Clone for WatcherCommunicator { + fn clone(&self) -> Self { + Self { + paths_to_watch_tx: self.paths_to_watch_tx.clone(), + changed_paths_rx: self.changed_paths_rx.resubscribe(), + restart_tx: self.restart_tx.clone(), + } + } +} + +impl WatcherCommunicator { + pub fn watch_paths(&self, paths: Vec) -> Result<(), AnyError> { + self.paths_to_watch_tx.send(paths).map_err(AnyError::from) + } +} + /// Creates a file watcher. /// /// - `operation` is the actual operation we want to run every time the watcher detects file /// changes. For example, in the case where we would like to bundle, then `operation` would /// have the logic for it like bundling the code. pub async fn watch_func( + flags: Flags, + print_config: PrintConfig, + operation: O, +) -> Result<(), AnyError> +where + O: FnMut( + Flags, + WatcherCommunicator, + Option>, + ) -> Result, + F: Future>, +{ + let fut = watch_recv( + flags, + print_config, + WatcherRestartMode::Automatic, + operation, + ) + .boxed_local(); + + fut.await +} + +#[derive(Clone, Copy, Debug)] +pub enum WatcherRestartMode { + /// When a file path changes the process is restarted. + Automatic, + + /// When a file path changes the caller will trigger a restart, using + /// `WatcherCommunicator.restart_tx`. + // TODO(bartlomieju): this mode will be used in a follow up PR + #[allow(dead_code)] + Manual, +} + +/// Creates a file watcher. +/// +/// - `operation` is the actual operation we want to run every time the watcher detects file +/// changes. For example, in the case where we would like to bundle, then `operation` would +/// have the logic for it like bundling the code. +pub async fn watch_recv( mut flags: Flags, print_config: PrintConfig, + restart_mode: WatcherRestartMode, mut operation: O, ) -> Result<(), AnyError> where O: FnMut( Flags, - UnboundedSender>, + WatcherCommunicator, Option>, ) -> Result, F: Future>, { - let (paths_to_watch_sender, mut paths_to_watch_receiver) = + let (paths_to_watch_tx, mut paths_to_watch_rx) = tokio::sync::mpsc::unbounded_channel(); + let (restart_tx, mut restart_rx) = tokio::sync::mpsc::unbounded_channel(); + let (changed_paths_tx, changed_paths_rx) = tokio::sync::broadcast::channel(4); let (watcher_sender, mut watcher_receiver) = DebouncedReceiver::new_with_sender(); @@ -138,29 +211,13 @@ where } = print_config; let print_after_restart = create_print_after_restart_fn(clear_screen); - + let watcher_communicator = WatcherCommunicator { + paths_to_watch_tx: paths_to_watch_tx.clone(), + changed_paths_rx: changed_paths_rx.resubscribe(), + restart_tx: restart_tx.clone(), + }; info!("{} {} started.", colors::intense_blue("Watcher"), job_name,); - fn consume_paths_to_watch( - watcher: &mut RecommendedWatcher, - receiver: &mut UnboundedReceiver>, - ) { - loop { - match receiver.try_recv() { - Ok(paths) => { - add_paths_to_watcher(watcher, &paths); - } - Err(e) => match e { - mpsc::error::TryRecvError::Empty => { - break; - } - // there must be at least one receiver alive - _ => unreachable!(), - }, - } - } - } - let mut changed_paths = None; loop { // We may need to give the runtime a tick to settle, as cancellations may need to propagate @@ -171,17 +228,17 @@ where } let mut watcher = new_watcher(watcher_sender.clone())?; - consume_paths_to_watch(&mut watcher, &mut paths_to_watch_receiver); + consume_paths_to_watch(&mut watcher, &mut paths_to_watch_rx); let receiver_future = async { loop { - let maybe_paths = paths_to_watch_receiver.recv().await; + let maybe_paths = paths_to_watch_rx.recv().await; add_paths_to_watcher(&mut watcher, &maybe_paths.unwrap()); } }; let operation_future = error_handler(operation( flags.clone(), - paths_to_watch_sender.clone(), + watcher_communicator.clone(), changed_paths.take(), )?); @@ -190,13 +247,26 @@ where select! { _ = receiver_future => {}, - received_changed_paths = watcher_receiver.recv() => { + _ = restart_rx.recv() => { print_after_restart(); - changed_paths = received_changed_paths; continue; }, + received_changed_paths = watcher_receiver.recv() => { + changed_paths = received_changed_paths.clone(); + + match restart_mode { + WatcherRestartMode::Automatic => { + print_after_restart(); + continue; + }, + WatcherRestartMode::Manual => { + // TODO(bartlomieju): should we fail on sending changed paths? + let _ = changed_paths_tx.send(received_changed_paths); + } + } + }, success = operation_future => { - consume_paths_to_watch(&mut watcher, &mut paths_to_watch_receiver); + consume_paths_to_watch(&mut watcher, &mut paths_to_watch_rx); // TODO(bartlomieju): print exit code here? info!( "{} {} {}. Restarting on file change...", @@ -213,10 +283,14 @@ where let receiver_future = async { loop { - let maybe_paths = paths_to_watch_receiver.recv().await; + let maybe_paths = paths_to_watch_rx.recv().await; add_paths_to_watcher(&mut watcher, &maybe_paths.unwrap()); } }; + + // If we got this far, it means that the `operation` has finished; let's wait + // and see if there are any new paths to watch received or any of the already + // watched paths has changed. select! { _ = receiver_future => {}, received_changed_paths = watcher_receiver.recv() => { @@ -231,26 +305,28 @@ where fn new_watcher( sender: Arc>>, ) -> Result { - let watcher = Watcher::new( + Ok(Watcher::new( move |res: Result| { - if let Ok(event) = res { - if matches!( - event.kind, - EventKind::Create(_) | EventKind::Modify(_) | EventKind::Remove(_) - ) { - let paths = event - .paths - .iter() - .filter_map(|path| canonicalize_path(path).ok()) - .collect(); - sender.send(paths).unwrap(); - } + let Ok(event) = res else { + return; + }; + + if !matches!( + event.kind, + EventKind::Create(_) | EventKind::Modify(_) | EventKind::Remove(_) + ) { + return; } + + let paths = event + .paths + .iter() + .filter_map(|path| canonicalize_path(path).ok()) + .collect(); + sender.send(paths).unwrap(); }, Default::default(), - )?; - - Ok(watcher) + )?) } fn add_paths_to_watcher(watcher: &mut RecommendedWatcher, paths: &[PathBuf]) { @@ -260,3 +336,23 @@ fn add_paths_to_watcher(watcher: &mut RecommendedWatcher, paths: &[PathBuf]) { } log::debug!("Watching paths: {:?}", paths); } + +fn consume_paths_to_watch( + watcher: &mut RecommendedWatcher, + receiver: &mut UnboundedReceiver>, +) { + loop { + match receiver.try_recv() { + Ok(paths) => { + add_paths_to_watcher(watcher, &paths); + } + Err(e) => match e { + mpsc::error::TryRecvError::Empty => { + break; + } + // there must be at least one receiver alive + _ => unreachable!(), + }, + } + } +}