From b6fda735ee0106b72500d927b9695a27ecd519f3 Mon Sep 17 00:00:00 2001 From: F001 Date: Fri, 30 Nov 2018 11:03:00 +0800 Subject: [PATCH] Replace mutex by atomics (#1238) --- src/deno_dir.rs | 11 +-- src/isolate.rs | 198 +++++++++++++++++++++------------------------ src/libdeno.rs | 12 +++ src/main.rs | 10 +-- src/msg.rs | 14 ++++ src/ops.rs | 11 +-- src/permissions.rs | 43 +++++----- 7 files changed, 151 insertions(+), 148 deletions(-) diff --git a/src/deno_dir.rs b/src/deno_dir.rs index a7b9f97d6d..f75d35bc47 100644 --- a/src/deno_dir.rs +++ b/src/deno_dir.rs @@ -41,16 +41,13 @@ impl DenoDir { // https://github.com/denoland/deno/blob/golang/deno_dir.go#L99-L111 pub fn new( reload: bool, - custom_root: Option<&Path>, + custom_root: Option, ) -> std::io::Result { // Only setup once. let home_dir = dirs::home_dir().expect("Could not get home directory."); let default = home_dir.join(".deno"); - let root: PathBuf = match custom_root { - None => default, - Some(path) => path.to_path_buf(), - }; + let root: PathBuf = custom_root.unwrap_or(default); let gen = root.as_path().join("gen"); let deps = root.as_path().join("deps"); let deps_http = deps.join("http"); @@ -390,8 +387,8 @@ pub struct CodeFetchOutput { #[cfg(test)] pub fn test_setup() -> (TempDir, DenoDir) { let temp_dir = TempDir::new().expect("tempdir fail"); - let deno_dir = - DenoDir::new(false, Some(temp_dir.path())).expect("setup fail"); + let deno_dir = DenoDir::new(false, Some(temp_dir.path().to_path_buf())) + .expect("setup fail"); (temp_dir, deno_dir) } diff --git a/src/isolate.rs b/src/isolate.rs index acd40e5d84..6e53e4461f 100644 --- a/src/isolate.rs +++ b/src/isolate.rs @@ -17,10 +17,9 @@ use std; use std::env; use std::ffi::CStr; use std::ffi::CString; -use std::path::Path; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::mpsc; use std::sync::Arc; -use std::sync::Mutex; use std::time::Duration; use std::time::Instant; use tokio; @@ -47,130 +46,113 @@ pub struct Isolate { libdeno_isolate: *const libdeno::isolate, dispatch: Dispatch, rx: mpsc::Receiver<(i32, Buf)>, + tx: mpsc::Sender<(i32, Buf)>, ntasks: i32, pub timeout_due: Option, pub state: Arc, } -// Isolate cannot be passed between threads but IsolateState can. So any state that -// needs to be accessed outside the main V8 thread should be inside IsolateState. +// Isolate cannot be passed between threads but IsolateState can. +// IsolateState satisfies Send and Sync. +// So any state that needs to be accessed outside the main V8 thread should be +// inside IsolateState. #[cfg_attr(feature = "cargo-clippy", allow(stutter))] pub struct IsolateState { pub dir: deno_dir::DenoDir, pub argv: Vec, - pub permissions: Mutex, + pub permissions: DenoPermissions, pub flags: flags::DenoFlags, - tx: Mutex>>, - pub metrics: Mutex, + pub metrics: Metrics, } impl IsolateState { - // Thread safe. - fn send_to_js(&self, req_id: i32, buf: Buf) { - let mut g = self.tx.lock().unwrap(); - let maybe_tx = g.as_mut(); - assert!(maybe_tx.is_some(), "Expected tx to not be deleted."); - let tx = maybe_tx.unwrap(); - tx.send((req_id, buf)).expect("tx.send error"); + pub fn new(flags: flags::DenoFlags, argv_rest: Vec) -> Self { + let custom_root = env::var("DENO_DIR").map(|s| s.into()).ok(); + IsolateState { + dir: deno_dir::DenoDir::new(flags.reload, custom_root).unwrap(), + argv: argv_rest, + permissions: DenoPermissions::new(&flags), + flags, + metrics: Metrics::default(), + } } pub fn check_write(&self, filename: &str) -> DenoResult<()> { - let mut perm = self.permissions.lock().unwrap(); - perm.check_write(filename) + self.permissions.check_write(filename) } pub fn check_env(&self) -> DenoResult<()> { - let mut perm = self.permissions.lock().unwrap(); - perm.check_env() + self.permissions.check_env() } pub fn check_net(&self, filename: &str) -> DenoResult<()> { - let mut perm = self.permissions.lock().unwrap(); - perm.check_net(filename) + self.permissions.check_net(filename) } pub fn check_run(&self) -> DenoResult<()> { - let mut perm = self.permissions.lock().unwrap(); - perm.check_run() + self.permissions.check_run() } fn metrics_op_dispatched( &self, - bytes_sent_control: u64, - bytes_sent_data: u64, + bytes_sent_control: usize, + bytes_sent_data: usize, ) { - let mut metrics = self.metrics.lock().unwrap(); - metrics.ops_dispatched += 1; - metrics.bytes_sent_control += bytes_sent_control; - metrics.bytes_sent_data += bytes_sent_data; + self.metrics.ops_dispatched.fetch_add(1, Ordering::SeqCst); + self + .metrics + .bytes_sent_control + .fetch_add(bytes_sent_control, Ordering::SeqCst); + self + .metrics + .bytes_sent_data + .fetch_add(bytes_sent_data, Ordering::SeqCst); } - fn metrics_op_completed(&self, bytes_received: u64) { - let mut metrics = self.metrics.lock().unwrap(); - metrics.ops_completed += 1; - metrics.bytes_received += bytes_received; + fn metrics_op_completed(&self, bytes_received: usize) { + self.metrics.ops_completed.fetch_add(1, Ordering::SeqCst); + self + .metrics + .bytes_received + .fetch_add(bytes_received, Ordering::SeqCst); } } +// AtomicU64 is currently unstable #[derive(Default)] pub struct Metrics { - pub ops_dispatched: u64, - pub ops_completed: u64, - pub bytes_sent_control: u64, - pub bytes_sent_data: u64, - pub bytes_received: u64, + pub ops_dispatched: AtomicUsize, + pub ops_completed: AtomicUsize, + pub bytes_sent_control: AtomicUsize, + pub bytes_sent_data: AtomicUsize, + pub bytes_received: AtomicUsize, } static DENO_INIT: std::sync::Once = std::sync::ONCE_INIT; -fn empty() -> libdeno::deno_buf { - libdeno::deno_buf { - alloc_ptr: std::ptr::null_mut(), - alloc_len: 0, - data_ptr: std::ptr::null_mut(), - data_len: 0, - } -} - impl Isolate { pub fn new( snapshot: libdeno::deno_buf, - flags: flags::DenoFlags, - argv_rest: Vec, + state: Arc, dispatch: Dispatch, ) -> Self { DENO_INIT.call_once(|| { unsafe { libdeno::deno_init() }; }); - let shared = empty(); // TODO Use shared for message passing. + let shared = libdeno::deno_buf::empty(); // TODO Use shared for message passing. let libdeno_isolate = unsafe { libdeno::deno_new(snapshot, shared, pre_dispatch) }; // This channel handles sending async messages back to the runtime. let (tx, rx) = mpsc::channel::<(i32, Buf)>(); - let custom_root_path; - let custom_root = match env::var("DENO_DIR") { - Ok(path) => { - custom_root_path = path; - Some(Path::new(custom_root_path.as_str())) - } - Err(_e) => None, - }; - Self { libdeno_isolate, dispatch, rx, + tx, ntasks: 0, timeout_due: None, - state: Arc::new(IsolateState { - dir: deno_dir::DenoDir::new(flags.reload, custom_root).unwrap(), - argv: argv_rest, - permissions: Mutex::new(DenoPermissions::new(&flags)), - flags, - tx: Mutex::new(Some(tx)), - metrics: Mutex::new(Metrics::default()), - }), + state: state, } } @@ -207,7 +189,7 @@ impl Isolate { } pub fn respond(&mut self, req_id: i32, buf: Buf) { - self.state.metrics_op_completed(buf.len() as u64); + self.state.metrics_op_completed(buf.len()); // TODO(zero-copy) Use Buf::leak(buf) to leak the heap allocated buf. And // don't do the memcpy in ImportBuf() (in libdeno/binding.cc) @@ -311,8 +293,8 @@ extern "C" fn pre_dispatch( data_buf: libdeno::deno_buf, ) { // for metrics - let bytes_sent_control = control_buf.data_len as u64; - let bytes_sent_data = data_buf.data_len as u64; + let bytes_sent_control = control_buf.data_len; + let bytes_sent_data = data_buf.data_len; // control_buf is only valid for the lifetime of this call, thus is // interpretted as a slice. @@ -344,14 +326,14 @@ extern "C" fn pre_dispatch( if buf_size == 0 { // FIXME - isolate.state.metrics_op_completed(buf.len() as u64); + isolate.state.metrics_op_completed(buf.len()); } else { // Set the synchronous response, the value returned from isolate.send(). isolate.respond(req_id, buf); } } else { // Execute op asynchronously. - let state = Arc::clone(&isolate.state); + let tx = isolate.tx.clone(); // TODO Ideally Tokio would could tell us how many tasks are executing, but // it cannot currently. Therefore we track top-level promises/tasks @@ -360,7 +342,8 @@ extern "C" fn pre_dispatch( let task = op .and_then(move |buf| { - state.send_to_js(req_id, buf); + let sender = tx; // tx is moved to new thread + sender.send((req_id, buf)).expect("tx.send error"); Ok(()) }).map_err(|_| ()); tokio::spawn(task); @@ -398,7 +381,10 @@ mod tests { fn test_dispatch_sync() { let argv = vec![String::from("./deno"), String::from("hello.js")]; let (flags, rest_argv, _) = flags::set_flags(argv).unwrap(); - let mut isolate = Isolate::new(empty(), flags, rest_argv, dispatch_sync); + + let state = Arc::new(IsolateState::new(flags, rest_argv)); + let snapshot = libdeno::deno_buf::empty(); + let mut isolate = Isolate::new(snapshot, state, dispatch_sync); tokio_util::init(|| { isolate .execute( @@ -438,17 +424,18 @@ mod tests { fn test_metrics_sync() { let argv = vec![String::from("./deno"), String::from("hello.js")]; let (flags, rest_argv, _) = flags::set_flags(argv).unwrap(); - let mut isolate = - Isolate::new(empty(), flags, rest_argv, metrics_dispatch_sync); + let state = Arc::new(IsolateState::new(flags, rest_argv)); + let snapshot = libdeno::deno_buf::empty(); + let mut isolate = Isolate::new(snapshot, state, metrics_dispatch_sync); tokio_util::init(|| { // Verify that metrics have been properly initialized. { - let metrics = isolate.state.metrics.lock().unwrap(); - assert_eq!(metrics.ops_dispatched, 0); - assert_eq!(metrics.ops_completed, 0); - assert_eq!(metrics.bytes_sent_control, 0); - assert_eq!(metrics.bytes_sent_data, 0); - assert_eq!(metrics.bytes_received, 0); + let metrics = &isolate.state.metrics; + assert_eq!(metrics.ops_dispatched.load(Ordering::SeqCst), 0); + assert_eq!(metrics.ops_completed.load(Ordering::SeqCst), 0); + assert_eq!(metrics.bytes_sent_control.load(Ordering::SeqCst), 0); + assert_eq!(metrics.bytes_sent_data.load(Ordering::SeqCst), 0); + assert_eq!(metrics.bytes_received.load(Ordering::SeqCst), 0); } isolate @@ -461,12 +448,12 @@ mod tests { "#, ).expect("execute error"); isolate.event_loop(); - let metrics = isolate.state.metrics.lock().unwrap(); - assert_eq!(metrics.ops_dispatched, 1); - assert_eq!(metrics.ops_completed, 1); - assert_eq!(metrics.bytes_sent_control, 3); - assert_eq!(metrics.bytes_sent_data, 5); - assert_eq!(metrics.bytes_received, 4); + let metrics = &isolate.state.metrics; + assert_eq!(metrics.ops_dispatched.load(Ordering::SeqCst), 1); + assert_eq!(metrics.ops_completed.load(Ordering::SeqCst), 1); + assert_eq!(metrics.bytes_sent_control.load(Ordering::SeqCst), 3); + assert_eq!(metrics.bytes_sent_data.load(Ordering::SeqCst), 5); + assert_eq!(metrics.bytes_received.load(Ordering::SeqCst), 4); }); } @@ -474,17 +461,18 @@ mod tests { fn test_metrics_async() { let argv = vec![String::from("./deno"), String::from("hello.js")]; let (flags, rest_argv, _) = flags::set_flags(argv).unwrap(); - let mut isolate = - Isolate::new(empty(), flags, rest_argv, metrics_dispatch_async); + let state = Arc::new(IsolateState::new(flags, rest_argv)); + let snapshot = libdeno::deno_buf::empty(); + let mut isolate = Isolate::new(snapshot, state, metrics_dispatch_async); tokio_util::init(|| { // Verify that metrics have been properly initialized. { - let metrics = isolate.state.metrics.lock().unwrap(); - assert_eq!(metrics.ops_dispatched, 0); - assert_eq!(metrics.ops_completed, 0); - assert_eq!(metrics.bytes_sent_control, 0); - assert_eq!(metrics.bytes_sent_data, 0); - assert_eq!(metrics.bytes_received, 0); + let metrics = &isolate.state.metrics; + assert_eq!(metrics.ops_dispatched.load(Ordering::SeqCst), 0); + assert_eq!(metrics.ops_completed.load(Ordering::SeqCst), 0); + assert_eq!(metrics.bytes_sent_control.load(Ordering::SeqCst), 0); + assert_eq!(metrics.bytes_sent_data.load(Ordering::SeqCst), 0); + assert_eq!(metrics.bytes_received.load(Ordering::SeqCst), 0); } isolate @@ -500,10 +488,10 @@ mod tests { // Make sure relevant metrics are updated before task is executed. { - let metrics = isolate.state.metrics.lock().unwrap(); - assert_eq!(metrics.ops_dispatched, 1); - assert_eq!(metrics.bytes_sent_control, 3); - assert_eq!(metrics.bytes_sent_data, 5); + let metrics = &isolate.state.metrics; + assert_eq!(metrics.ops_dispatched.load(Ordering::SeqCst), 1); + assert_eq!(metrics.bytes_sent_control.load(Ordering::SeqCst), 3); + assert_eq!(metrics.bytes_sent_data.load(Ordering::SeqCst), 5); // Note we cannot check ops_completed nor bytes_received because that // would be a race condition. It might be nice to have use a oneshot // with metrics_dispatch_async() to properly validate them. @@ -513,12 +501,12 @@ mod tests { // Make sure relevant metrics are updated after task is executed. { - let metrics = isolate.state.metrics.lock().unwrap(); - assert_eq!(metrics.ops_dispatched, 1); - assert_eq!(metrics.ops_completed, 1); - assert_eq!(metrics.bytes_sent_control, 3); - assert_eq!(metrics.bytes_sent_data, 5); - assert_eq!(metrics.bytes_received, 4); + let metrics = &isolate.state.metrics; + assert_eq!(metrics.ops_dispatched.load(Ordering::SeqCst), 1); + assert_eq!(metrics.ops_completed.load(Ordering::SeqCst), 1); + assert_eq!(metrics.bytes_sent_control.load(Ordering::SeqCst), 3); + assert_eq!(metrics.bytes_sent_data.load(Ordering::SeqCst), 5); + assert_eq!(metrics.bytes_received.load(Ordering::SeqCst), 4); } }); } diff --git a/src/libdeno.rs b/src/libdeno.rs index bb0509e011..e85b37ed42 100644 --- a/src/libdeno.rs +++ b/src/libdeno.rs @@ -2,6 +2,7 @@ use libc::c_char; use libc::c_int; use libc::c_void; +use std::ptr::null_mut; #[repr(C)] pub struct isolate { @@ -17,6 +18,17 @@ pub struct deno_buf { pub data_len: usize, } +impl deno_buf { + pub fn empty() -> Self { + deno_buf { + alloc_ptr: null_mut(), + alloc_len: 0, + data_ptr: null_mut(), + data_len: 0, + } + } +} + type DenoRecvCb = unsafe extern "C" fn( user_data: *mut c_void, req_id: i32, diff --git a/src/main.rs b/src/main.rs index 4595f71600..7c6806d5d1 100644 --- a/src/main.rs +++ b/src/main.rs @@ -49,6 +49,7 @@ pub mod version; mod eager_unix; use std::env; +use std::sync::Arc; static LOGGER: Logger = Logger; @@ -95,12 +96,9 @@ fn main() { log::LevelFilter::Info }); - let mut isolate = isolate::Isolate::new( - unsafe { snapshot::deno_snapshot.clone() }, - flags, - rest_argv, - ops::dispatch, - ); + let state = Arc::new(isolate::IsolateState::new(flags, rest_argv)); + let snapshot = unsafe { snapshot::deno_snapshot.clone() }; + let mut isolate = isolate::Isolate::new(snapshot, state, ops::dispatch); tokio_util::init(|| { isolate .execute("deno_main.js", "denoMain();") diff --git a/src/msg.rs b/src/msg.rs index 9c8095e8f7..b47d588bd4 100644 --- a/src/msg.rs +++ b/src/msg.rs @@ -2,6 +2,20 @@ #![allow(dead_code)] #![cfg_attr(feature = "cargo-clippy", allow(clippy, pedantic))] use flatbuffers; +use std::sync::atomic::Ordering; + // GN_OUT_DIR is set either by build.rs (for the Cargo build), or by // build_extra/rust/run.py (for the GN+Ninja build). include!(concat!(env!("GN_OUT_DIR"), "/gen/msg_generated.rs")); + +impl<'a> From<&'a super::isolate::Metrics> for MetricsResArgs { + fn from(m: &'a super::isolate::Metrics) -> Self { + MetricsResArgs { + ops_dispatched: m.ops_dispatched.load(Ordering::SeqCst) as u64, + ops_completed: m.ops_completed.load(Ordering::SeqCst) as u64, + bytes_sent_control: m.bytes_sent_control.load(Ordering::SeqCst) as u64, + bytes_sent_data: m.bytes_sent_data.load(Ordering::SeqCst) as u64, + bytes_received: m.bytes_received.load(Ordering::SeqCst) as u64, + } + } +} diff --git a/src/ops.rs b/src/ops.rs index f8e3c9dc1c..a0761fc84a 100644 --- a/src/ops.rs +++ b/src/ops.rs @@ -23,6 +23,7 @@ use remove_dir_all::remove_dir_all; use repl; use resources::table_entries; use std; +use std::convert::From; use std::fs; use std::net::{Shutdown, SocketAddr}; #[cfg(unix)] @@ -1291,18 +1292,10 @@ fn op_metrics( assert_eq!(data.len(), 0); let cmd_id = base.cmd_id(); - let metrics = state.metrics.lock().unwrap(); - let builder = &mut FlatBufferBuilder::new(); let inner = msg::MetricsRes::create( builder, - &msg::MetricsResArgs { - ops_dispatched: metrics.ops_dispatched, - ops_completed: metrics.ops_completed, - bytes_sent_control: metrics.bytes_sent_control, - bytes_sent_data: metrics.bytes_sent_data, - bytes_received: metrics.bytes_received, - }, + &msg::MetricsResArgs::from(&state.metrics), ); ok_future(serialize_response( cmd_id, diff --git a/src/permissions.rs b/src/permissions.rs index 7ca13d44a2..d86992da3f 100644 --- a/src/permissions.rs +++ b/src/permissions.rs @@ -5,40 +5,41 @@ use flags::DenoFlags; use errors::permission_denied; use errors::DenoResult; use std::io; +use std::sync::atomic::{AtomicBool, Ordering}; #[cfg_attr(feature = "cargo-clippy", allow(stutter))] -#[derive(Debug, Default, PartialEq)] +#[derive(Debug, Default)] pub struct DenoPermissions { - pub allow_write: bool, - pub allow_net: bool, - pub allow_env: bool, - pub allow_run: bool, + pub allow_write: AtomicBool, + pub allow_net: AtomicBool, + pub allow_env: AtomicBool, + pub allow_run: AtomicBool, } impl DenoPermissions { pub fn new(flags: &DenoFlags) -> Self { Self { - allow_write: flags.allow_write, - allow_env: flags.allow_env, - allow_net: flags.allow_net, - allow_run: flags.allow_run, + allow_write: AtomicBool::new(flags.allow_write), + allow_env: AtomicBool::new(flags.allow_env), + allow_net: AtomicBool::new(flags.allow_net), + allow_run: AtomicBool::new(flags.allow_run), } } - pub fn check_run(&mut self) -> DenoResult<()> { - if self.allow_run { + pub fn check_run(&self) -> DenoResult<()> { + if self.allow_run.load(Ordering::SeqCst) { return Ok(()); }; // TODO get location (where access occurred) let r = permission_prompt("Deno requests access to run a subprocess."); if r.is_ok() { - self.allow_run = true; + self.allow_run.store(true, Ordering::SeqCst); } r } - pub fn check_write(&mut self, filename: &str) -> DenoResult<()> { - if self.allow_write { + pub fn check_write(&self, filename: &str) -> DenoResult<()> { + if self.allow_write.load(Ordering::SeqCst) { return Ok(()); }; // TODO get location (where access occurred) @@ -47,13 +48,13 @@ impl DenoPermissions { filename ));; if r.is_ok() { - self.allow_write = true; + self.allow_write.store(true, Ordering::SeqCst); } r } - pub fn check_net(&mut self, domain_name: &str) -> DenoResult<()> { - if self.allow_net { + pub fn check_net(&self, domain_name: &str) -> DenoResult<()> { + if self.allow_net.load(Ordering::SeqCst) { return Ok(()); }; // TODO get location (where access occurred) @@ -62,20 +63,20 @@ impl DenoPermissions { domain_name )); if r.is_ok() { - self.allow_net = true; + self.allow_net.store(true, Ordering::SeqCst); } r } - pub fn check_env(&mut self) -> DenoResult<()> { - if self.allow_env { + pub fn check_env(&self) -> DenoResult<()> { + if self.allow_env.load(Ordering::SeqCst) { return Ok(()); }; // TODO get location (where access occurred) let r = permission_prompt(&"Deno requests access to environment variables."); if r.is_ok() { - self.allow_env = true; + self.allow_env.store(true, Ordering::SeqCst); } r }