mirror of
https://github.com/denoland/deno.git
synced 2025-01-11 16:42:21 -05:00
refactor: add WatcherCommunicator helper struct (#20927)
This commit introduces "WatcherCommunicator" struct that is used facilitate bi-directional communication between CLI file watcher and the watched function. Prerequisite for https://github.com/denoland/deno/pull/20876
This commit is contained in:
parent
5095af7801
commit
8d9fef3b89
9 changed files with 191 additions and 90 deletions
|
@ -40,6 +40,7 @@ use crate::resolver::CliGraphResolver;
|
||||||
use crate::resolver::CliGraphResolverOptions;
|
use crate::resolver::CliGraphResolverOptions;
|
||||||
use crate::standalone::DenoCompileBinaryWriter;
|
use crate::standalone::DenoCompileBinaryWriter;
|
||||||
use crate::tools::check::TypeChecker;
|
use crate::tools::check::TypeChecker;
|
||||||
|
use crate::util::file_watcher::WatcherCommunicator;
|
||||||
use crate::util::progress_bar::ProgressBar;
|
use crate::util::progress_bar::ProgressBar;
|
||||||
use crate::util::progress_bar::ProgressBarStyle;
|
use crate::util::progress_bar::ProgressBarStyle;
|
||||||
use crate::worker::CliMainWorkerFactory;
|
use crate::worker::CliMainWorkerFactory;
|
||||||
|
@ -59,26 +60,18 @@ use deno_runtime::inspector_server::InspectorServer;
|
||||||
use deno_semver::npm::NpmPackageReqReference;
|
use deno_semver::npm::NpmPackageReqReference;
|
||||||
use import_map::ImportMap;
|
use import_map::ImportMap;
|
||||||
use log::warn;
|
use log::warn;
|
||||||
use std::cell::RefCell;
|
|
||||||
use std::future::Future;
|
use std::future::Future;
|
||||||
use std::path::PathBuf;
|
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
pub struct CliFactoryBuilder {
|
pub struct CliFactoryBuilder {
|
||||||
maybe_sender: Option<tokio::sync::mpsc::UnboundedSender<Vec<PathBuf>>>,
|
watcher_communicator: Option<WatcherCommunicator>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl CliFactoryBuilder {
|
impl CliFactoryBuilder {
|
||||||
pub fn new() -> Self {
|
pub fn new() -> Self {
|
||||||
Self { maybe_sender: None }
|
Self {
|
||||||
}
|
watcher_communicator: None,
|
||||||
|
}
|
||||||
pub fn with_watcher(
|
|
||||||
mut self,
|
|
||||||
sender: tokio::sync::mpsc::UnboundedSender<Vec<PathBuf>>,
|
|
||||||
) -> Self {
|
|
||||||
self.maybe_sender = Some(sender);
|
|
||||||
self
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn build_from_flags(
|
pub async fn build_from_flags(
|
||||||
|
@ -88,9 +81,18 @@ impl CliFactoryBuilder {
|
||||||
Ok(self.build_from_cli_options(Arc::new(CliOptions::from_flags(flags)?)))
|
Ok(self.build_from_cli_options(Arc::new(CliOptions::from_flags(flags)?)))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn build_from_flags_for_watcher(
|
||||||
|
mut self,
|
||||||
|
flags: Flags,
|
||||||
|
watcher_communicator: WatcherCommunicator,
|
||||||
|
) -> Result<CliFactory, AnyError> {
|
||||||
|
self.watcher_communicator = Some(watcher_communicator);
|
||||||
|
self.build_from_flags(flags).await
|
||||||
|
}
|
||||||
|
|
||||||
pub fn build_from_cli_options(self, options: Arc<CliOptions>) -> CliFactory {
|
pub fn build_from_cli_options(self, options: Arc<CliOptions>) -> CliFactory {
|
||||||
CliFactory {
|
CliFactory {
|
||||||
maybe_sender: RefCell::new(self.maybe_sender),
|
watcher_communicator: self.watcher_communicator,
|
||||||
options,
|
options,
|
||||||
services: Default::default(),
|
services: Default::default(),
|
||||||
}
|
}
|
||||||
|
@ -166,8 +168,7 @@ struct CliFactoryServices {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct CliFactory {
|
pub struct CliFactory {
|
||||||
maybe_sender:
|
watcher_communicator: Option<WatcherCommunicator>,
|
||||||
RefCell<Option<tokio::sync::mpsc::UnboundedSender<Vec<PathBuf>>>>,
|
|
||||||
options: Arc<CliOptions>,
|
options: Arc<CliOptions>,
|
||||||
services: CliFactoryServices,
|
services: CliFactoryServices,
|
||||||
}
|
}
|
||||||
|
@ -384,11 +385,14 @@ impl CliFactory {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn maybe_file_watcher_reporter(&self) -> &Option<FileWatcherReporter> {
|
pub fn maybe_file_watcher_reporter(&self) -> &Option<FileWatcherReporter> {
|
||||||
let maybe_sender = self.maybe_sender.borrow_mut().take();
|
let maybe_file_watcher_reporter = self
|
||||||
|
.watcher_communicator
|
||||||
|
.as_ref()
|
||||||
|
.map(|i| FileWatcherReporter::new(i.clone()));
|
||||||
self
|
self
|
||||||
.services
|
.services
|
||||||
.maybe_file_watcher_reporter
|
.maybe_file_watcher_reporter
|
||||||
.get_or_init(|| maybe_sender.map(FileWatcherReporter::new))
|
.get_or_init(|| maybe_file_watcher_reporter)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn emit_cache(&self) -> Result<&EmitCache, AnyError> {
|
pub fn emit_cache(&self) -> Result<&EmitCache, AnyError> {
|
||||||
|
@ -595,6 +599,7 @@ impl CliFactory {
|
||||||
let npm_resolver = self.npm_resolver().await?;
|
let npm_resolver = self.npm_resolver().await?;
|
||||||
let fs = self.fs();
|
let fs = self.fs();
|
||||||
let cli_node_resolver = self.cli_node_resolver().await?;
|
let cli_node_resolver = self.cli_node_resolver().await?;
|
||||||
|
|
||||||
Ok(CliMainWorkerFactory::new(
|
Ok(CliMainWorkerFactory::new(
|
||||||
StorageKeyResolver::from_options(&self.options),
|
StorageKeyResolver::from_options(&self.options),
|
||||||
npm_resolver.clone(),
|
npm_resolver.clone(),
|
||||||
|
|
|
@ -13,6 +13,7 @@ use crate::npm::CliNpmResolver;
|
||||||
use crate::resolver::CliGraphResolver;
|
use crate::resolver::CliGraphResolver;
|
||||||
use crate::tools::check;
|
use crate::tools::check;
|
||||||
use crate::tools::check::TypeChecker;
|
use crate::tools::check::TypeChecker;
|
||||||
|
use crate::util::file_watcher::WatcherCommunicator;
|
||||||
use crate::util::sync::TaskQueue;
|
use crate::util::sync::TaskQueue;
|
||||||
use crate::util::sync::TaskQueuePermit;
|
use crate::util::sync::TaskQueuePermit;
|
||||||
|
|
||||||
|
@ -635,14 +636,14 @@ impl<'a> ModuleGraphUpdatePermit<'a> {
|
||||||
|
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug)]
|
||||||
pub struct FileWatcherReporter {
|
pub struct FileWatcherReporter {
|
||||||
sender: tokio::sync::mpsc::UnboundedSender<Vec<PathBuf>>,
|
watcher_communicator: WatcherCommunicator,
|
||||||
file_paths: Arc<Mutex<Vec<PathBuf>>>,
|
file_paths: Arc<Mutex<Vec<PathBuf>>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl FileWatcherReporter {
|
impl FileWatcherReporter {
|
||||||
pub fn new(sender: tokio::sync::mpsc::UnboundedSender<Vec<PathBuf>>) -> Self {
|
pub fn new(watcher_communicator: WatcherCommunicator) -> Self {
|
||||||
Self {
|
Self {
|
||||||
sender,
|
watcher_communicator,
|
||||||
file_paths: Default::default(),
|
file_paths: Default::default(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -665,7 +666,10 @@ impl deno_graph::source::Reporter for FileWatcherReporter {
|
||||||
}
|
}
|
||||||
|
|
||||||
if modules_done == modules_total {
|
if modules_done == modules_total {
|
||||||
self.sender.send(file_paths.drain(..).collect()).unwrap();
|
self
|
||||||
|
.watcher_communicator
|
||||||
|
.watch_paths(file_paths.drain(..).collect())
|
||||||
|
.unwrap();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -417,19 +417,18 @@ pub async fn run_benchmarks_with_watch(
|
||||||
.map(|w| !w.no_clear_screen)
|
.map(|w| !w.no_clear_screen)
|
||||||
.unwrap_or(true),
|
.unwrap_or(true),
|
||||||
},
|
},
|
||||||
move |flags, sender, changed_paths| {
|
move |flags, watcher_communicator, changed_paths| {
|
||||||
let bench_flags = bench_flags.clone();
|
let bench_flags = bench_flags.clone();
|
||||||
Ok(async move {
|
Ok(async move {
|
||||||
let factory = CliFactoryBuilder::new()
|
let factory = CliFactoryBuilder::new()
|
||||||
.with_watcher(sender.clone())
|
.build_from_flags_for_watcher(flags, watcher_communicator.clone())
|
||||||
.build_from_flags(flags)
|
|
||||||
.await?;
|
.await?;
|
||||||
let cli_options = factory.cli_options();
|
let cli_options = factory.cli_options();
|
||||||
let bench_options = cli_options.resolve_bench_options(bench_flags)?;
|
let bench_options = cli_options.resolve_bench_options(bench_flags)?;
|
||||||
|
|
||||||
let _ = sender.send(cli_options.watch_paths());
|
let _ = watcher_communicator.watch_paths(cli_options.watch_paths());
|
||||||
if let Some(include) = &bench_options.files.include {
|
if let Some(include) = &bench_options.files.include {
|
||||||
let _ = sender.send(include.clone());
|
let _ = watcher_communicator.watch_paths(include.clone());
|
||||||
}
|
}
|
||||||
|
|
||||||
let graph_kind = cli_options.type_check_mode().as_graph_kind();
|
let graph_kind = cli_options.type_check_mode().as_graph_kind();
|
||||||
|
|
|
@ -35,15 +35,14 @@ pub async fn bundle(
|
||||||
job_name: "Bundle".to_string(),
|
job_name: "Bundle".to_string(),
|
||||||
clear_screen: !watch_flags.no_clear_screen,
|
clear_screen: !watch_flags.no_clear_screen,
|
||||||
},
|
},
|
||||||
move |flags, sender, _changed_paths| {
|
move |flags, watcher_communicator, _changed_paths| {
|
||||||
let bundle_flags = bundle_flags.clone();
|
let bundle_flags = bundle_flags.clone();
|
||||||
Ok(async move {
|
Ok(async move {
|
||||||
let factory = CliFactoryBuilder::new()
|
let factory = CliFactoryBuilder::new()
|
||||||
.with_watcher(sender.clone())
|
.build_from_flags_for_watcher(flags, watcher_communicator.clone())
|
||||||
.build_from_flags(flags)
|
|
||||||
.await?;
|
.await?;
|
||||||
let cli_options = factory.cli_options();
|
let cli_options = factory.cli_options();
|
||||||
let _ = sender.send(cli_options.watch_paths());
|
let _ = watcher_communicator.watch_paths(cli_options.watch_paths());
|
||||||
bundle_action(factory, &bundle_flags).await?;
|
bundle_action(factory, &bundle_flags).await?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
|
|
@ -68,7 +68,7 @@ pub async fn format(flags: Flags, fmt_flags: FmtFlags) -> Result<(), AnyError> {
|
||||||
job_name: "Fmt".to_string(),
|
job_name: "Fmt".to_string(),
|
||||||
clear_screen: !watch_flags.no_clear_screen,
|
clear_screen: !watch_flags.no_clear_screen,
|
||||||
},
|
},
|
||||||
move |flags, sender, changed_paths| {
|
move |flags, watcher_communicator, changed_paths| {
|
||||||
let fmt_flags = fmt_flags.clone();
|
let fmt_flags = fmt_flags.clone();
|
||||||
Ok(async move {
|
Ok(async move {
|
||||||
let factory = CliFactory::from_flags(flags).await?;
|
let factory = CliFactory::from_flags(flags).await?;
|
||||||
|
@ -82,7 +82,7 @@ pub async fn format(flags: Flags, fmt_flags: FmtFlags) -> Result<(), AnyError> {
|
||||||
Ok(files)
|
Ok(files)
|
||||||
}
|
}
|
||||||
})?;
|
})?;
|
||||||
_ = sender.send(files.clone());
|
_ = watcher_communicator.watch_paths(files.clone());
|
||||||
let refmt_files = if let Some(paths) = changed_paths {
|
let refmt_files = if let Some(paths) = changed_paths {
|
||||||
if fmt_options.check {
|
if fmt_options.check {
|
||||||
// check all files on any changed (https://github.com/denoland/deno/issues/12446)
|
// check all files on any changed (https://github.com/denoland/deno/issues/12446)
|
||||||
|
|
|
@ -63,7 +63,7 @@ pub async fn lint(flags: Flags, lint_flags: LintFlags) -> Result<(), AnyError> {
|
||||||
job_name: "Lint".to_string(),
|
job_name: "Lint".to_string(),
|
||||||
clear_screen: !watch_flags.no_clear_screen,
|
clear_screen: !watch_flags.no_clear_screen,
|
||||||
},
|
},
|
||||||
move |flags, sender, changed_paths| {
|
move |flags, watcher_communicator, changed_paths| {
|
||||||
let lint_flags = lint_flags.clone();
|
let lint_flags = lint_flags.clone();
|
||||||
Ok(async move {
|
Ok(async move {
|
||||||
let factory = CliFactory::from_flags(flags).await?;
|
let factory = CliFactory::from_flags(flags).await?;
|
||||||
|
@ -77,7 +77,7 @@ pub async fn lint(flags: Flags, lint_flags: LintFlags) -> Result<(), AnyError> {
|
||||||
Ok(files)
|
Ok(files)
|
||||||
}
|
}
|
||||||
})?;
|
})?;
|
||||||
_ = sender.send(files.clone());
|
_ = watcher_communicator.watch_paths(files.clone());
|
||||||
|
|
||||||
let lint_paths = if let Some(paths) = changed_paths {
|
let lint_paths = if let Some(paths) = changed_paths {
|
||||||
// lint all files on any changed (https://github.com/denoland/deno/issues/12446)
|
// lint all files on any changed (https://github.com/denoland/deno/issues/12446)
|
||||||
|
|
|
@ -110,18 +110,17 @@ async fn run_with_watch(
|
||||||
job_name: "Process".to_string(),
|
job_name: "Process".to_string(),
|
||||||
clear_screen: !watch_flags.no_clear_screen,
|
clear_screen: !watch_flags.no_clear_screen,
|
||||||
},
|
},
|
||||||
move |flags, sender, _changed_paths| {
|
move |flags, watcher_communicator, _changed_paths| {
|
||||||
Ok(async move {
|
Ok(async move {
|
||||||
let factory = CliFactoryBuilder::new()
|
let factory = CliFactoryBuilder::new()
|
||||||
.with_watcher(sender.clone())
|
.build_from_flags_for_watcher(flags, watcher_communicator.clone())
|
||||||
.build_from_flags(flags)
|
|
||||||
.await?;
|
.await?;
|
||||||
let cli_options = factory.cli_options();
|
let cli_options = factory.cli_options();
|
||||||
let main_module = cli_options.resolve_main_module()?;
|
let main_module = cli_options.resolve_main_module()?;
|
||||||
|
|
||||||
maybe_npm_install(&factory).await?;
|
maybe_npm_install(&factory).await?;
|
||||||
|
|
||||||
let _ = sender.send(cli_options.watch_paths());
|
let _ = watcher_communicator.watch_paths(cli_options.watch_paths());
|
||||||
|
|
||||||
let permissions = PermissionsContainer::new(Permissions::from_options(
|
let permissions = PermissionsContainer::new(Permissions::from_options(
|
||||||
&cli_options.permissions_options(),
|
&cli_options.permissions_options(),
|
||||||
|
|
|
@ -1213,19 +1213,18 @@ pub async fn run_tests_with_watch(
|
||||||
.map(|w| !w.no_clear_screen)
|
.map(|w| !w.no_clear_screen)
|
||||||
.unwrap_or(true),
|
.unwrap_or(true),
|
||||||
},
|
},
|
||||||
move |flags, sender, changed_paths| {
|
move |flags, watcher_communicator, changed_paths| {
|
||||||
let test_flags = test_flags.clone();
|
let test_flags = test_flags.clone();
|
||||||
Ok(async move {
|
Ok(async move {
|
||||||
let factory = CliFactoryBuilder::new()
|
let factory = CliFactoryBuilder::new()
|
||||||
.with_watcher(sender.clone())
|
.build_from_flags_for_watcher(flags, watcher_communicator.clone())
|
||||||
.build_from_flags(flags)
|
|
||||||
.await?;
|
.await?;
|
||||||
let cli_options = factory.cli_options();
|
let cli_options = factory.cli_options();
|
||||||
let test_options = cli_options.resolve_test_options(test_flags)?;
|
let test_options = cli_options.resolve_test_options(test_flags)?;
|
||||||
|
|
||||||
let _ = sender.send(cli_options.watch_paths());
|
let _ = watcher_communicator.watch_paths(cli_options.watch_paths());
|
||||||
if let Some(include) = &test_options.files.include {
|
if let Some(include) = &test_options.files.include {
|
||||||
let _ = sender.send(include.clone());
|
let _ = watcher_communicator.watch_paths(include.clone());
|
||||||
}
|
}
|
||||||
|
|
||||||
let graph_kind = cli_options.type_check_mode().as_graph_kind();
|
let graph_kind = cli_options.type_check_mode().as_graph_kind();
|
||||||
|
|
|
@ -7,6 +7,7 @@ use crate::util::fs::canonicalize_path;
|
||||||
use deno_core::error::AnyError;
|
use deno_core::error::AnyError;
|
||||||
use deno_core::error::JsError;
|
use deno_core::error::JsError;
|
||||||
use deno_core::futures::Future;
|
use deno_core::futures::Future;
|
||||||
|
use deno_core::futures::FutureExt;
|
||||||
use deno_runtime::fmt_errors::format_js_error;
|
use deno_runtime::fmt_errors::format_js_error;
|
||||||
use log::info;
|
use log::info;
|
||||||
use notify::event::Event as NotifyEvent;
|
use notify::event::Event as NotifyEvent;
|
||||||
|
@ -23,7 +24,6 @@ use std::time::Duration;
|
||||||
use tokio::select;
|
use tokio::select;
|
||||||
use tokio::sync::mpsc;
|
use tokio::sync::mpsc;
|
||||||
use tokio::sync::mpsc::UnboundedReceiver;
|
use tokio::sync::mpsc::UnboundedReceiver;
|
||||||
use tokio::sync::mpsc::UnboundedSender;
|
|
||||||
use tokio::time::sleep;
|
use tokio::time::sleep;
|
||||||
|
|
||||||
const CLEAR_SCREEN: &str = "\x1B[2J\x1B[1;1H";
|
const CLEAR_SCREEN: &str = "\x1B[2J\x1B[1;1H";
|
||||||
|
@ -109,26 +109,99 @@ fn create_print_after_restart_fn(clear_screen: bool) -> impl Fn() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// An interface to interact with Deno's CLI file watcher.
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct WatcherCommunicator {
|
||||||
|
/// Send a list of paths that should be watched for changes.
|
||||||
|
paths_to_watch_tx: tokio::sync::mpsc::UnboundedSender<Vec<PathBuf>>,
|
||||||
|
|
||||||
|
/// Listen for a list of paths that were changed.
|
||||||
|
changed_paths_rx: tokio::sync::broadcast::Receiver<Option<Vec<PathBuf>>>,
|
||||||
|
|
||||||
|
/// Send a message to force a restart.
|
||||||
|
restart_tx: tokio::sync::mpsc::UnboundedSender<()>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Clone for WatcherCommunicator {
|
||||||
|
fn clone(&self) -> Self {
|
||||||
|
Self {
|
||||||
|
paths_to_watch_tx: self.paths_to_watch_tx.clone(),
|
||||||
|
changed_paths_rx: self.changed_paths_rx.resubscribe(),
|
||||||
|
restart_tx: self.restart_tx.clone(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl WatcherCommunicator {
|
||||||
|
pub fn watch_paths(&self, paths: Vec<PathBuf>) -> Result<(), AnyError> {
|
||||||
|
self.paths_to_watch_tx.send(paths).map_err(AnyError::from)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Creates a file watcher.
|
/// Creates a file watcher.
|
||||||
///
|
///
|
||||||
/// - `operation` is the actual operation we want to run every time the watcher detects file
|
/// - `operation` is the actual operation we want to run every time the watcher detects file
|
||||||
/// changes. For example, in the case where we would like to bundle, then `operation` would
|
/// changes. For example, in the case where we would like to bundle, then `operation` would
|
||||||
/// have the logic for it like bundling the code.
|
/// have the logic for it like bundling the code.
|
||||||
pub async fn watch_func<O, F>(
|
pub async fn watch_func<O, F>(
|
||||||
|
flags: Flags,
|
||||||
|
print_config: PrintConfig,
|
||||||
|
operation: O,
|
||||||
|
) -> Result<(), AnyError>
|
||||||
|
where
|
||||||
|
O: FnMut(
|
||||||
|
Flags,
|
||||||
|
WatcherCommunicator,
|
||||||
|
Option<Vec<PathBuf>>,
|
||||||
|
) -> Result<F, AnyError>,
|
||||||
|
F: Future<Output = Result<(), AnyError>>,
|
||||||
|
{
|
||||||
|
let fut = watch_recv(
|
||||||
|
flags,
|
||||||
|
print_config,
|
||||||
|
WatcherRestartMode::Automatic,
|
||||||
|
operation,
|
||||||
|
)
|
||||||
|
.boxed_local();
|
||||||
|
|
||||||
|
fut.await
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Copy, Debug)]
|
||||||
|
pub enum WatcherRestartMode {
|
||||||
|
/// When a file path changes the process is restarted.
|
||||||
|
Automatic,
|
||||||
|
|
||||||
|
/// When a file path changes the caller will trigger a restart, using
|
||||||
|
/// `WatcherCommunicator.restart_tx`.
|
||||||
|
// TODO(bartlomieju): this mode will be used in a follow up PR
|
||||||
|
#[allow(dead_code)]
|
||||||
|
Manual,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Creates a file watcher.
|
||||||
|
///
|
||||||
|
/// - `operation` is the actual operation we want to run every time the watcher detects file
|
||||||
|
/// changes. For example, in the case where we would like to bundle, then `operation` would
|
||||||
|
/// have the logic for it like bundling the code.
|
||||||
|
pub async fn watch_recv<O, F>(
|
||||||
mut flags: Flags,
|
mut flags: Flags,
|
||||||
print_config: PrintConfig,
|
print_config: PrintConfig,
|
||||||
|
restart_mode: WatcherRestartMode,
|
||||||
mut operation: O,
|
mut operation: O,
|
||||||
) -> Result<(), AnyError>
|
) -> Result<(), AnyError>
|
||||||
where
|
where
|
||||||
O: FnMut(
|
O: FnMut(
|
||||||
Flags,
|
Flags,
|
||||||
UnboundedSender<Vec<PathBuf>>,
|
WatcherCommunicator,
|
||||||
Option<Vec<PathBuf>>,
|
Option<Vec<PathBuf>>,
|
||||||
) -> Result<F, AnyError>,
|
) -> Result<F, AnyError>,
|
||||||
F: Future<Output = Result<(), AnyError>>,
|
F: Future<Output = Result<(), AnyError>>,
|
||||||
{
|
{
|
||||||
let (paths_to_watch_sender, mut paths_to_watch_receiver) =
|
let (paths_to_watch_tx, mut paths_to_watch_rx) =
|
||||||
tokio::sync::mpsc::unbounded_channel();
|
tokio::sync::mpsc::unbounded_channel();
|
||||||
|
let (restart_tx, mut restart_rx) = tokio::sync::mpsc::unbounded_channel();
|
||||||
|
let (changed_paths_tx, changed_paths_rx) = tokio::sync::broadcast::channel(4);
|
||||||
let (watcher_sender, mut watcher_receiver) =
|
let (watcher_sender, mut watcher_receiver) =
|
||||||
DebouncedReceiver::new_with_sender();
|
DebouncedReceiver::new_with_sender();
|
||||||
|
|
||||||
|
@ -138,29 +211,13 @@ where
|
||||||
} = print_config;
|
} = print_config;
|
||||||
|
|
||||||
let print_after_restart = create_print_after_restart_fn(clear_screen);
|
let print_after_restart = create_print_after_restart_fn(clear_screen);
|
||||||
|
let watcher_communicator = WatcherCommunicator {
|
||||||
|
paths_to_watch_tx: paths_to_watch_tx.clone(),
|
||||||
|
changed_paths_rx: changed_paths_rx.resubscribe(),
|
||||||
|
restart_tx: restart_tx.clone(),
|
||||||
|
};
|
||||||
info!("{} {} started.", colors::intense_blue("Watcher"), job_name,);
|
info!("{} {} started.", colors::intense_blue("Watcher"), job_name,);
|
||||||
|
|
||||||
fn consume_paths_to_watch(
|
|
||||||
watcher: &mut RecommendedWatcher,
|
|
||||||
receiver: &mut UnboundedReceiver<Vec<PathBuf>>,
|
|
||||||
) {
|
|
||||||
loop {
|
|
||||||
match receiver.try_recv() {
|
|
||||||
Ok(paths) => {
|
|
||||||
add_paths_to_watcher(watcher, &paths);
|
|
||||||
}
|
|
||||||
Err(e) => match e {
|
|
||||||
mpsc::error::TryRecvError::Empty => {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
// there must be at least one receiver alive
|
|
||||||
_ => unreachable!(),
|
|
||||||
},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut changed_paths = None;
|
let mut changed_paths = None;
|
||||||
loop {
|
loop {
|
||||||
// We may need to give the runtime a tick to settle, as cancellations may need to propagate
|
// We may need to give the runtime a tick to settle, as cancellations may need to propagate
|
||||||
|
@ -171,17 +228,17 @@ where
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut watcher = new_watcher(watcher_sender.clone())?;
|
let mut watcher = new_watcher(watcher_sender.clone())?;
|
||||||
consume_paths_to_watch(&mut watcher, &mut paths_to_watch_receiver);
|
consume_paths_to_watch(&mut watcher, &mut paths_to_watch_rx);
|
||||||
|
|
||||||
let receiver_future = async {
|
let receiver_future = async {
|
||||||
loop {
|
loop {
|
||||||
let maybe_paths = paths_to_watch_receiver.recv().await;
|
let maybe_paths = paths_to_watch_rx.recv().await;
|
||||||
add_paths_to_watcher(&mut watcher, &maybe_paths.unwrap());
|
add_paths_to_watcher(&mut watcher, &maybe_paths.unwrap());
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
let operation_future = error_handler(operation(
|
let operation_future = error_handler(operation(
|
||||||
flags.clone(),
|
flags.clone(),
|
||||||
paths_to_watch_sender.clone(),
|
watcher_communicator.clone(),
|
||||||
changed_paths.take(),
|
changed_paths.take(),
|
||||||
)?);
|
)?);
|
||||||
|
|
||||||
|
@ -190,13 +247,26 @@ where
|
||||||
|
|
||||||
select! {
|
select! {
|
||||||
_ = receiver_future => {},
|
_ = receiver_future => {},
|
||||||
received_changed_paths = watcher_receiver.recv() => {
|
_ = restart_rx.recv() => {
|
||||||
print_after_restart();
|
print_after_restart();
|
||||||
changed_paths = received_changed_paths;
|
|
||||||
continue;
|
continue;
|
||||||
},
|
},
|
||||||
|
received_changed_paths = watcher_receiver.recv() => {
|
||||||
|
changed_paths = received_changed_paths.clone();
|
||||||
|
|
||||||
|
match restart_mode {
|
||||||
|
WatcherRestartMode::Automatic => {
|
||||||
|
print_after_restart();
|
||||||
|
continue;
|
||||||
|
},
|
||||||
|
WatcherRestartMode::Manual => {
|
||||||
|
// TODO(bartlomieju): should we fail on sending changed paths?
|
||||||
|
let _ = changed_paths_tx.send(received_changed_paths);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
success = operation_future => {
|
success = operation_future => {
|
||||||
consume_paths_to_watch(&mut watcher, &mut paths_to_watch_receiver);
|
consume_paths_to_watch(&mut watcher, &mut paths_to_watch_rx);
|
||||||
// TODO(bartlomieju): print exit code here?
|
// TODO(bartlomieju): print exit code here?
|
||||||
info!(
|
info!(
|
||||||
"{} {} {}. Restarting on file change...",
|
"{} {} {}. Restarting on file change...",
|
||||||
|
@ -213,10 +283,14 @@ where
|
||||||
|
|
||||||
let receiver_future = async {
|
let receiver_future = async {
|
||||||
loop {
|
loop {
|
||||||
let maybe_paths = paths_to_watch_receiver.recv().await;
|
let maybe_paths = paths_to_watch_rx.recv().await;
|
||||||
add_paths_to_watcher(&mut watcher, &maybe_paths.unwrap());
|
add_paths_to_watcher(&mut watcher, &maybe_paths.unwrap());
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// If we got this far, it means that the `operation` has finished; let's wait
|
||||||
|
// and see if there are any new paths to watch received or any of the already
|
||||||
|
// watched paths has changed.
|
||||||
select! {
|
select! {
|
||||||
_ = receiver_future => {},
|
_ = receiver_future => {},
|
||||||
received_changed_paths = watcher_receiver.recv() => {
|
received_changed_paths = watcher_receiver.recv() => {
|
||||||
|
@ -231,26 +305,28 @@ where
|
||||||
fn new_watcher(
|
fn new_watcher(
|
||||||
sender: Arc<mpsc::UnboundedSender<Vec<PathBuf>>>,
|
sender: Arc<mpsc::UnboundedSender<Vec<PathBuf>>>,
|
||||||
) -> Result<RecommendedWatcher, AnyError> {
|
) -> Result<RecommendedWatcher, AnyError> {
|
||||||
let watcher = Watcher::new(
|
Ok(Watcher::new(
|
||||||
move |res: Result<NotifyEvent, NotifyError>| {
|
move |res: Result<NotifyEvent, NotifyError>| {
|
||||||
if let Ok(event) = res {
|
let Ok(event) = res else {
|
||||||
if matches!(
|
return;
|
||||||
event.kind,
|
};
|
||||||
EventKind::Create(_) | EventKind::Modify(_) | EventKind::Remove(_)
|
|
||||||
) {
|
if !matches!(
|
||||||
let paths = event
|
event.kind,
|
||||||
.paths
|
EventKind::Create(_) | EventKind::Modify(_) | EventKind::Remove(_)
|
||||||
.iter()
|
) {
|
||||||
.filter_map(|path| canonicalize_path(path).ok())
|
return;
|
||||||
.collect();
|
|
||||||
sender.send(paths).unwrap();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let paths = event
|
||||||
|
.paths
|
||||||
|
.iter()
|
||||||
|
.filter_map(|path| canonicalize_path(path).ok())
|
||||||
|
.collect();
|
||||||
|
sender.send(paths).unwrap();
|
||||||
},
|
},
|
||||||
Default::default(),
|
Default::default(),
|
||||||
)?;
|
)?)
|
||||||
|
|
||||||
Ok(watcher)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn add_paths_to_watcher(watcher: &mut RecommendedWatcher, paths: &[PathBuf]) {
|
fn add_paths_to_watcher(watcher: &mut RecommendedWatcher, paths: &[PathBuf]) {
|
||||||
|
@ -260,3 +336,23 @@ fn add_paths_to_watcher(watcher: &mut RecommendedWatcher, paths: &[PathBuf]) {
|
||||||
}
|
}
|
||||||
log::debug!("Watching paths: {:?}", paths);
|
log::debug!("Watching paths: {:?}", paths);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn consume_paths_to_watch(
|
||||||
|
watcher: &mut RecommendedWatcher,
|
||||||
|
receiver: &mut UnboundedReceiver<Vec<PathBuf>>,
|
||||||
|
) {
|
||||||
|
loop {
|
||||||
|
match receiver.try_recv() {
|
||||||
|
Ok(paths) => {
|
||||||
|
add_paths_to_watcher(watcher, &paths);
|
||||||
|
}
|
||||||
|
Err(e) => match e {
|
||||||
|
mpsc::error::TryRecvError::Empty => {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
// there must be at least one receiver alive
|
||||||
|
_ => unreachable!(),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue