0
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2024-11-01 09:24:20 -04:00
denoland-deno/runtime/ops/fs_events.rs
Bartek Iwańczuk 6984b63f2f
refactor: rewrite ops to use ResourceTable2 (#8512)
This commit migrates all ops to use new resource table
and "AsyncRefCell".

Old implementation of resource table was completely 
removed and all code referencing it was updated to use
new system.
2020-12-16 17:14:12 +01:00

151 lines
4.1 KiB
Rust

// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
use crate::permissions::Permissions;
use deno_core::error::bad_resource_id;
use deno_core::error::AnyError;
use deno_core::serde_json;
use deno_core::serde_json::json;
use deno_core::serde_json::Value;
use deno_core::AsyncRefCell;
use deno_core::BufVec;
use deno_core::CancelFuture;
use deno_core::CancelHandle;
use deno_core::OpState;
use deno_core::RcRef;
use deno_core::Resource;
use deno_core::ZeroCopyBuf;
use notify::event::Event as NotifyEvent;
use notify::Error as NotifyError;
use notify::EventKind;
use notify::RecommendedWatcher;
use notify::RecursiveMode;
use notify::Watcher;
use serde::Deserialize;
use serde::Serialize;
use std::borrow::Cow;
use std::cell::RefCell;
use std::convert::From;
use std::path::PathBuf;
use std::rc::Rc;
use tokio::sync::mpsc;
/// Registers the file system event ops on the given runtime.
///
/// `op_fs_events_open` is registered as a synchronous JSON op and
/// `op_fs_events_poll` as an asynchronous one, in that order.
pub fn init(rt: &mut deno_core::JsRuntime) {
  use super::reg_json_async;
  use super::reg_json_sync;

  reg_json_sync(rt, "op_fs_events_open", op_fs_events_open);
  reg_json_async(rt, "op_fs_events_poll", op_fs_events_poll);
}
/// Resource stored in the op state's resource table for one open file system
/// watcher. It is created by `op_fs_events_open` and looked up by rid in
/// `op_fs_events_poll`.
struct FsEventsResource {
// The watcher is never read after construction; it is stored here so that it
// lives exactly as long as the resource does.
// NOTE(review): presumably dropping the watcher is what stops event
// delivery -- confirm against the `notify` documentation.
#[allow(unused)]
watcher: RecommendedWatcher,
// Receiving half of the channel that the watcher callback sends converted
// events (or errors) into. Wrapped in `AsyncRefCell` so a poll can borrow it
// mutably across an `.await`.
receiver: AsyncRefCell<mpsc::Receiver<Result<FsEvent, AnyError>>>,
// Cancelled in `Resource::close`; `op_fs_events_poll` attaches it to the
// pending `recv()` via `or_cancel`.
cancel: CancelHandle,
}
impl Resource for FsEventsResource {
  /// Returns the fixed name of this resource kind, `"fsEvents"`.
  fn name(&self) -> Cow<str> {
    Cow::Borrowed("fsEvents")
  }

  /// Triggers the resource's cancel handle, which is the handle that
  /// `op_fs_events_poll` attaches to its pending `recv()`.
  fn close(self: Rc<Self>) {
    let handle = &self.cancel;
    handle.cancel();
  }
}
/// Represents a file system event.
///
/// We do not use the event from the notify crate directly. Instead we flatten
/// it into this simpler structure, and only want to make it more complex as
/// the need arises.
///
/// Feel free to expand this struct as long as you can add tests to demonstrate
/// the complexity.
///
/// The struct derives `Serialize`; `op_fs_events_poll` embeds it as the
/// `"value"` entry of its JSON response.
#[derive(Serialize, Debug)]
struct FsEvent {
// Lowercase name derived from the top-level notify `EventKind` variant
// (for example "create", "modify" or "remove").
kind: String,
// Paths reported by notify for this event, moved over unchanged.
paths: Vec<PathBuf>,
}
impl From<NotifyEvent> for FsEvent {
  /// Flattens a `notify` event into the simplified [`FsEvent`] shape.
  ///
  /// Only the top-level event kind is kept, as a lowercase string; the
  /// detail carried inside `Access`, `Create`, `Modify` and `Remove` is
  /// discarded. The event's paths are moved into the result unchanged.
  ///
  /// This conversion never panics: every `EventKind` variant, including
  /// `Other`, maps to a string.
  fn from(e: NotifyEvent) -> Self {
    let kind = match e.kind {
      EventKind::Any => "any",
      EventKind::Access(_) => "access",
      EventKind::Create(_) => "create",
      EventKind::Modify(_) => "modify",
      EventKind::Remove(_) => "remove",
      // This arm used to be `todo!()`, which panicked inside the watcher
      // callback whenever the backend reported an event it could not
      // classify. Surface such events under their own kind instead.
      EventKind::Other => "other",
    };

    FsEvent {
      kind: kind.to_string(),
      paths: e.paths,
    }
  }
}
fn op_fs_events_open(
state: &mut OpState,
args: Value,
_zero_copy: &mut [ZeroCopyBuf],
) -> Result<Value, AnyError> {
#[derive(Deserialize)]
struct OpenArgs {
recursive: bool,
paths: Vec<String>,
}
let args: OpenArgs = serde_json::from_value(args)?;
let (sender, receiver) = mpsc::channel::<Result<FsEvent, AnyError>>(16);
let sender = std::sync::Mutex::new(sender);
let mut watcher: RecommendedWatcher =
Watcher::new_immediate(move |res: Result<NotifyEvent, NotifyError>| {
let res2 = res.map(FsEvent::from).map_err(AnyError::from);
let mut sender = sender.lock().unwrap();
// Ignore result, if send failed it means that watcher was already closed,
// but not all messages have been flushed.
let _ = sender.try_send(res2);
})?;
let recursive_mode = if args.recursive {
RecursiveMode::Recursive
} else {
RecursiveMode::NonRecursive
};
for path in &args.paths {
state
.borrow::<Permissions>()
.check_read(&PathBuf::from(path))?;
watcher.watch(path, recursive_mode)?;
}
let resource = FsEventsResource {
watcher,
receiver: AsyncRefCell::new(receiver),
cancel: Default::default(),
};
let rid = state.resource_table.add(resource);
Ok(json!(rid))
}
/// Asynchronous op that waits for the next event of an open watcher.
///
/// `args` must deserialize into `{ rid: number }`, where `rid` identifies an
/// `FsEventsResource` in the resource table. The op borrows the resource's
/// receiver mutably and awaits the next message, with the wait tied to the
/// resource's cancel handle.
///
/// Returns `{ "value": <event>, "done": false }` for a received event and
/// `{ "done": true }` once the channel yields `None`.
///
/// # Errors
///
/// Fails if `args` cannot be deserialized, if `rid` does not refer to an
/// `FsEventsResource`, if the cancellable wait returns an error, or if the
/// received message is itself an error forwarded by the watcher callback.
async fn op_fs_events_poll(
  state: Rc<RefCell<OpState>>,
  args: Value,
  _zero_copy: BufVec,
) -> Result<Value, AnyError> {
  #[derive(Deserialize)]
  struct PollArgs {
    rid: u32,
  }

  let poll_args: PollArgs = serde_json::from_value(args)?;

  // The `RefCell` borrow of the op state is a temporary of this statement,
  // so it is released before any `.await` below.
  let watch_resource = state
    .borrow()
    .resource_table
    .get::<FsEventsResource>(poll_args.rid)
    .ok_or_else(bad_resource_id)?;

  let mut events = RcRef::map(&watch_resource, |res| &res.receiver)
    .borrow_mut()
    .await;
  let cancel_handle = RcRef::map(watch_resource, |res| &res.cancel);

  let next = events.recv().or_cancel(cancel_handle).await?;

  // `transpose` turns `Option<Result<..>>` into `Result<Option<..>>`, so a
  // forwarded watcher error propagates through `?`.
  let reply = match next.transpose()? {
    Some(event) => json!({ "value": event, "done": false }),
    None => json!({ "done": true }),
  };
  Ok(reply)
}