mirror of
https://github.com/denoland/deno.git
synced 2024-12-21 23:04:45 -05:00
fix: share inotify fd across watchers (#26200)
Fixes https://github.com/denoland/deno/issues/26104 Fixes https://github.com/denoland/deno/issues/26071 Fixes https://github.com/denoland/deno/issues/17757
This commit is contained in:
parent
285635daa6
commit
be969cb532
4 changed files with 72 additions and 24 deletions
1
Cargo.lock
generated
1
Cargo.lock
generated
|
@ -2042,6 +2042,7 @@ dependencies = [
|
|||
"percent-encoding",
|
||||
"regex",
|
||||
"rustyline",
|
||||
"same-file",
|
||||
"serde",
|
||||
"signal-hook",
|
||||
"signal-hook-registry",
|
||||
|
|
|
@ -117,6 +117,7 @@ once_cell.workspace = true
|
|||
percent-encoding.workspace = true
|
||||
regex.workspace = true
|
||||
rustyline = { workspace = true, features = ["custom-bindings"] }
|
||||
same-file = "1.0.6"
|
||||
serde.workspace = true
|
||||
signal-hook = "0.3.17"
|
||||
signal-hook-registry = "1.4.0"
|
||||
|
|
|
@ -21,7 +21,7 @@ class FsWatcher {
|
|||
|
||||
constructor(paths, options) {
|
||||
const { recursive } = options;
|
||||
this.#rid = op_fs_events_open({ recursive, paths });
|
||||
this.#rid = op_fs_events_open(recursive, paths);
|
||||
}
|
||||
|
||||
unref() {
|
||||
|
|
|
@ -19,13 +19,14 @@ use notify::EventKind;
|
|||
use notify::RecommendedWatcher;
|
||||
use notify::RecursiveMode;
|
||||
use notify::Watcher;
|
||||
use serde::Deserialize;
|
||||
use serde::Serialize;
|
||||
use std::borrow::Cow;
|
||||
use std::cell::RefCell;
|
||||
use std::convert::From;
|
||||
use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
use std::rc::Rc;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
deno_core::extension!(
|
||||
|
@ -34,8 +35,6 @@ deno_core::extension!(
|
|||
);
|
||||
|
||||
struct FsEventsResource {
|
||||
#[allow(unused)]
|
||||
watcher: RecommendedWatcher,
|
||||
receiver: AsyncRefCell<mpsc::Receiver<Result<FsEvent, NotifyError>>>,
|
||||
cancel: CancelHandle,
|
||||
}
|
||||
|
@ -58,7 +57,7 @@ impl Resource for FsEventsResource {
|
|||
///
|
||||
/// Feel free to expand this struct as long as you can add tests to demonstrate
|
||||
/// the complexity.
|
||||
#[derive(Serialize, Debug)]
|
||||
#[derive(Serialize, Debug, Clone)]
|
||||
struct FsEvent {
|
||||
kind: &'static str,
|
||||
paths: Vec<PathBuf>,
|
||||
|
@ -92,6 +91,24 @@ impl From<NotifyEvent> for FsEvent {
|
|||
}
|
||||
}
|
||||
|
||||
type WatchSender = (Vec<String>, mpsc::Sender<Result<FsEvent, NotifyError>>);
|
||||
|
||||
struct WatcherState {
|
||||
senders: Arc<Mutex<Vec<WatchSender>>>,
|
||||
watcher: RecommendedWatcher,
|
||||
}
|
||||
|
||||
fn starts_with_canonicalized(path: &Path, prefix: &str) -> bool {
|
||||
#[allow(clippy::disallowed_methods)]
|
||||
let path = path.canonicalize().ok();
|
||||
#[allow(clippy::disallowed_methods)]
|
||||
let prefix = std::fs::canonicalize(prefix).ok();
|
||||
match (path, prefix) {
|
||||
(Some(path), Some(prefix)) => path.starts_with(prefix),
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum FsEventsError {
|
||||
#[error(transparent)]
|
||||
|
@ -104,44 +121,73 @@ pub enum FsEventsError {
|
|||
Canceled(#[from] deno_core::Canceled),
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
pub struct OpenArgs {
|
||||
recursive: bool,
|
||||
fn start_watcher(
|
||||
state: &mut OpState,
|
||||
paths: Vec<String>,
|
||||
sender: mpsc::Sender<Result<FsEvent, NotifyError>>,
|
||||
) -> Result<(), FsEventsError> {
|
||||
if let Some(watcher) = state.try_borrow_mut::<WatcherState>() {
|
||||
watcher.senders.lock().push((paths, sender));
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let senders = Arc::new(Mutex::new(vec![(paths, sender)]));
|
||||
|
||||
let sender_clone = senders.clone();
|
||||
let watcher: RecommendedWatcher = Watcher::new(
|
||||
move |res: Result<NotifyEvent, NotifyError>| {
|
||||
let res2 = res.map(FsEvent::from).map_err(FsEventsError::Notify);
|
||||
for (paths, sender) in sender_clone.lock().iter() {
|
||||
// Ignore result, if send failed it means that watcher was already closed,
|
||||
// but not all messages have been flushed.
|
||||
|
||||
// Only send the event if the path matches one of the paths that the user is watching
|
||||
if let Ok(event) = &res2 {
|
||||
if paths.iter().any(|path| {
|
||||
event.paths.iter().any(|event_path| {
|
||||
same_file::is_same_file(event_path, path).unwrap_or(false)
|
||||
|| starts_with_canonicalized(event_path, path)
|
||||
})
|
||||
}) {
|
||||
let _ = sender.try_send(Ok(event.clone()));
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
Default::default(),
|
||||
)?;
|
||||
|
||||
state.put::<WatcherState>(WatcherState { watcher, senders });
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[op2]
|
||||
#[smi]
|
||||
fn op_fs_events_open(
|
||||
state: &mut OpState,
|
||||
#[serde] args: OpenArgs,
|
||||
recursive: bool,
|
||||
#[serde] paths: Vec<String>,
|
||||
) -> Result<ResourceId, FsEventsError> {
|
||||
let (sender, receiver) = mpsc::channel::<Result<FsEvent, NotifyError>>(16);
|
||||
let sender = Mutex::new(sender);
|
||||
let mut watcher: RecommendedWatcher = Watcher::new(
|
||||
move |res: Result<NotifyEvent, NotifyError>| {
|
||||
let res2 = res.map(FsEvent::from);
|
||||
let sender = sender.lock();
|
||||
// Ignore result, if send failed it means that watcher was already closed,
|
||||
// but not all messages have been flushed.
|
||||
let _ = sender.try_send(res2);
|
||||
},
|
||||
Default::default(),
|
||||
)?;
|
||||
let recursive_mode = if args.recursive {
|
||||
|
||||
start_watcher(state, paths.clone(), sender)?;
|
||||
|
||||
let recursive_mode = if recursive {
|
||||
RecursiveMode::Recursive
|
||||
} else {
|
||||
RecursiveMode::NonRecursive
|
||||
};
|
||||
for path in &args.paths {
|
||||
for path in &paths {
|
||||
let path = state
|
||||
.borrow_mut::<PermissionsContainer>()
|
||||
.check_read(path, "Deno.watchFs()")
|
||||
.map_err(FsEventsError::Permission)?;
|
||||
watcher.watch(&path, recursive_mode)?;
|
||||
|
||||
let watcher = state.borrow_mut::<WatcherState>();
|
||||
watcher.watcher.watch(&path, recursive_mode)?;
|
||||
}
|
||||
let resource = FsEventsResource {
|
||||
watcher,
|
||||
receiver: AsyncRefCell::new(receiver),
|
||||
cancel: Default::default(),
|
||||
};
|
||||
|
|
Loading…
Reference in a new issue