From 5e8581ff4b7bd4a58f1e7d16544ca6498952b5b1 Mon Sep 17 00:00:00 2001 From: Ryan Dahl Date: Thu, 6 Feb 2020 21:24:51 -0500 Subject: [PATCH] fix 015_duplicate_parallel_import (#3904) --- cli/compilers/js.rs | 14 +-- cli/compilers/json.rs | 21 ++-- cli/compilers/ts.rs | 195 +++++++++++++-------------------- cli/compilers/wasm.rs | 22 ++-- cli/global_state.rs | 36 +++--- cli/tests/integration_tests.rs | 2 - 6 files changed, 124 insertions(+), 166 deletions(-) diff --git a/cli/compilers/js.rs b/cli/compilers/js.rs index 4e99017b19..e6142a57e2 100644 --- a/cli/compilers/js.rs +++ b/cli/compilers/js.rs @@ -1,25 +1,21 @@ // Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. use crate::compilers::CompiledModule; -use crate::compilers::CompiledModuleFuture; use crate::file_fetcher::SourceFile; -use futures::future::FutureExt; -use std::pin::Pin; +use deno_core::ErrBox; use std::str; pub struct JsCompiler {} impl JsCompiler { - pub fn compile_async( + pub async fn compile_async( &self, source_file: SourceFile, - ) -> Pin> { - let module = CompiledModule { + ) -> Result { + Ok(CompiledModule { code: str::from_utf8(&source_file.source_code) .unwrap() .to_string(), name: source_file.url.to_string(), - }; - - futures::future::ok(module).boxed() + }) } } diff --git a/cli/compilers/json.rs b/cli/compilers/json.rs index 765b79cd67..8d9ed1c4fc 100644 --- a/cli/compilers/json.rs +++ b/cli/compilers/json.rs @@ -1,12 +1,8 @@ // Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. use crate::compilers::CompiledModule; -use crate::compilers::CompiledModuleFuture; use crate::file_fetcher::SourceFile; -use crate::futures::future::FutureExt; use deno_core::ErrBox; use regex::Regex; -use std::pin::Pin; -use std::str; // From https://github.com/mathiasbynens/mothereff.in/blob/master/js-variables/eff.js static JS_RESERVED_WORDS: &str = r"^(?:do|if|in|for|let|new|try|var|case|else|enum|eval|false|null|this|true|void|with|await|break|catch|class|const|super|throw|while|yield|delete|export|import|public|return|static|switch|typeof|default|extends|finally|package|private|continue|debugger|function|arguments|interface|protected|implements|instanceof)$"; @@ -14,19 +10,18 @@ static JS_RESERVED_WORDS: &str = r"^(?:do|if|in|for|let|new|try|var|case|else|en pub struct JsonCompiler {} impl JsonCompiler { - pub fn compile_async( + pub async fn compile_async( &self, source_file: &SourceFile, - ) -> Pin> { - let maybe_json_value: serde_json::Result = - serde_json::from_str(&str::from_utf8(&source_file.source_code).unwrap()); + ) -> Result { + let maybe_json_value = serde_json::from_slice(&source_file.source_code); if let Err(err) = maybe_json_value { - return futures::future::err(ErrBox::from(err)).boxed(); + return Err(ErrBox::from(err)); } let mut code = format!( "export default {};\n", - str::from_utf8(&source_file.source_code).unwrap() + std::str::from_utf8(&source_file.source_code).unwrap() ); if let serde_json::Value::Object(m) = maybe_json_value.unwrap() { @@ -47,11 +42,9 @@ impl JsonCompiler { } } - let module = CompiledModule { + Ok(CompiledModule { code, name: source_file.url.to_string(), - }; - - futures::future::ok(module).boxed() + }) } } diff --git a/cli/compilers/ts.rs b/cli/compilers/ts.rs index c7896ec61f..c882f7d661 100644 --- a/cli/compilers/ts.rs +++ b/cli/compilers/ts.rs @@ -2,7 +2,6 @@ use super::compiler_worker::CompilerWorker; use crate::compilers::CompilationResultFuture; use crate::compilers::CompiledModule; -use crate::compilers::CompiledModuleFuture; use crate::diagnostics::Diagnostic; use crate::disk_cache::DiskCache; use crate::file_fetcher::SourceFile; @@ -18,7 +17,6 @@ use deno_core::Buf; use deno_core::ErrBox; use deno_core::ModuleSpecifier; use futures::future::FutureExt; -use futures::Future; use regex::Regex; use serde_json::json; use std::collections::HashMap; @@ -26,10 +24,12 @@ use std::collections::HashSet; use std::fs; use std::hash::BuildHasher; use std::io; +use std::ops::Deref; use std::path::PathBuf; use std::pin::Pin; use std::str; use std::sync::atomic::Ordering; +use std::sync::Arc; use std::sync::Mutex; use url::Url; @@ -202,7 +202,7 @@ pub fn source_code_version_hash( crate::checksum::gen(vec![source_code, version.as_bytes(), config_hash]) } -pub struct TsCompiler { +pub struct TsCompilerInner { pub file_fetcher: SourceFileFetcher, pub config: CompilerConfig, pub disk_cache: DiskCache, @@ -216,6 +216,16 @@ pub struct TsCompiler { pub compile_js: bool, } +#[derive(Clone)] +pub struct TsCompiler(Arc); + +impl Deref for TsCompiler { + type Target = TsCompilerInner; + fn deref(&self) -> &Self::Target { + &self.0 + } +} + impl TsCompiler { pub fn new( file_fetcher: SourceFileFetcher, @@ -224,17 +234,14 @@ impl TsCompiler { config_path: Option, ) -> Result { let config = CompilerConfig::load(config_path)?; - - let compiler = Self { + Ok(TsCompiler(Arc::new(TsCompilerInner { file_fetcher, disk_cache, compile_js: config.compile_js, config, compiled: Mutex::new(HashSet::new()), use_disk_cache, - }; - - Ok(compiler) + }))) } /// Create a new V8 worker with snapshot of TS compiler and setup compiler's @@ -261,12 +268,12 @@ impl TsCompiler { worker } - pub fn bundle_async( + pub async fn bundle_async( &self, global_state: ThreadSafeGlobalState, module_name: String, out_file: Option, - ) -> impl Future> { + ) -> Result<(), ErrBox> { debug!( "Invoking the compiler to bundle. module_name: {}", module_name @@ -282,41 +289,15 @@ impl TsCompiler { true, ); - // TODO(ry) The code below looks very similar to spawn_ts_compiler_worker. - // Can we combine them? - let (load_sender, load_receiver) = - tokio::sync::oneshot::channel::>(); - std::thread::spawn(move || { - let mut worker = TsCompiler::setup_worker(global_state); - let handle = worker.thread_safe_handle(); - - let fut = async move { - if let Err(err) = handle.post_message(req_msg).await { - load_sender.send(Err(err)).unwrap(); - return; - } - debug!("Sent message to worker"); - if let Err(err) = (&mut *worker).await { - load_sender.send(Err(err)).unwrap(); - return; - } - let maybe_msg = handle.get_message().await; - debug!("Received message from worker"); - if let Some(ref msg) = maybe_msg { - let json_str = std::str::from_utf8(msg).unwrap(); - debug!("Message: {}", json_str); - if let Some(diagnostics) = Diagnostic::from_emit_result(json_str) { - let err = ErrBox::from(diagnostics); - load_sender.send(Err(err)).unwrap(); - return; - } - } - load_sender.send(Ok(())).unwrap(); + let maybe_msg = execute_in_thread(global_state.clone(), req_msg).await?; + if let Some(ref msg) = maybe_msg { + let json_str = std::str::from_utf8(msg).unwrap(); + debug!("Message: {}", json_str); + if let Some(diagnostics) = Diagnostic::from_emit_result(json_str) { + return Err(ErrBox::from(diagnostics)); } - .boxed_local(); - crate::tokio_util::run_basic(fut); - }); - async { load_receiver.await.unwrap() }.boxed_local() + } + Ok(()) } /// Mark given module URL as compiled to avoid multiple compilations of same @@ -342,17 +323,14 @@ impl TsCompiler { /// /// If compilation is required then new V8 worker is spawned with fresh TS /// compiler. - pub fn compile_async( + pub async fn compile_async( &self, global_state: ThreadSafeGlobalState, source_file: &SourceFile, target: TargetLib, - ) -> Pin> { + ) -> Result { if self.has_compiled(&source_file.url) { - return match self.get_compiled_module(&source_file.url) { - Ok(compiled) => futures::future::ok(compiled).boxed(), - Err(err) => futures::future::err(err).boxed(), - }; + return self.get_compiled_module(&source_file.url); } if self.use_disk_cache { @@ -373,7 +351,7 @@ impl TsCompiler { self.get_compiled_module(&source_file.url) { self.mark_compiled(&source_file.url); - return futures::future::ok(compiled_module).boxed(); + return Ok(compiled_module); } } } @@ -394,51 +372,22 @@ impl TsCompiler { false, ); - // TODO(ry) The code below looks very similar to spawn_ts_compiler_worker. - // Can we combine them? - let (load_sender, load_receiver) = - tokio::sync::oneshot::channel::>(); - std::thread::spawn(move || { - debug!(">>>>> compile_async START"); + let ts_compiler = self.clone(); - let mut worker = TsCompiler::setup_worker(global_state.clone()); - let handle = worker.thread_safe_handle(); + let compiling_job = global_state + .progress + .add("Compile", &module_url.to_string()); + let maybe_msg = execute_in_thread(global_state.clone(), req_msg).await?; - let compiling_job = global_state - .progress - .add("Compile", &module_url.to_string()); - - let fut = async move { - if let Err(err) = handle.post_message(req_msg).await { - load_sender.send(Err(err)).unwrap(); - return; - } - if let Err(err) = (&mut *worker).await { - load_sender.send(Err(err)).unwrap(); - return; - } - let maybe_msg = handle.get_message().await; - if let Some(ref msg) = maybe_msg { - let json_str = std::str::from_utf8(msg).unwrap(); - if let Some(diagnostics) = Diagnostic::from_emit_result(json_str) { - let err = ErrBox::from(diagnostics); - load_sender.send(Err(err)).unwrap(); - return; - } - } - let compiled_module = global_state - .ts_compiler - .get_compiled_module(&source_file_.url) - .expect("Expected to find compiled file"); - drop(compiling_job); - debug!(">>>>> compile_sync END"); - load_sender.send(Ok(compiled_module)).unwrap(); + if let Some(ref msg) = maybe_msg { + let json_str = std::str::from_utf8(msg).unwrap(); + if let Some(diagnostics) = Diagnostic::from_emit_result(json_str) { + return Err(ErrBox::from(diagnostics)); } - .boxed_local(); - crate::tokio_util::run_basic(fut); - }); - - async { load_receiver.await.unwrap() }.boxed_local() + } + let compiled_module = ts_compiler.get_compiled_module(&source_file_.url)?; + drop(compiling_job); + Ok(compiled_module) } /// Get associated `CompiledFileMetadata` for given module if it exists. @@ -654,37 +603,47 @@ impl TsCompiler { } } -// TODO(ry) this is pretty general purpose and should be lifted and generalized. -fn spawn_ts_compiler_worker( - req_msg: Buf, +async fn execute_in_thread( global_state: ThreadSafeGlobalState, -) -> Pin> { + req: Buf, +) -> Result, ErrBox> { let (load_sender, load_receiver) = - tokio::sync::oneshot::channel::(); - + tokio::sync::oneshot::channel::, ErrBox>>(); std::thread::spawn(move || { - let mut worker = TsCompiler::setup_worker(global_state); + debug!(">>>>> compile_async START"); + + let mut worker = TsCompiler::setup_worker(global_state.clone()); let handle = worker.thread_safe_handle(); - let fut = async move { - debug!("Sent message to worker"); - if let Err(err) = handle.post_message(req_msg).await { - load_sender.send(Err(err)).unwrap(); - return; + crate::tokio_util::run_basic( + async move { + if let Err(err) = handle.post_message(req).await { + load_sender.send(Err(err)).unwrap(); + return; + } + if let Err(err) = (&mut *worker).await { + load_sender.send(Err(err)).unwrap(); + return; + } + let maybe_msg = handle.get_message().await; + load_sender.send(Ok(maybe_msg)).unwrap(); + debug!(">>>>> compile_sync END"); } - if let Err(err) = (&mut *worker).await { - load_sender.send(Err(err)).unwrap(); - return; - } - let msg = handle.get_message().await.unwrap(); - let json_str = std::str::from_utf8(&msg).unwrap(); - load_sender.send(Ok(json!(json_str))).unwrap(); - }; - crate::tokio_util::run_basic(fut); + .boxed_local(), + ); }); - let fut = async { load_receiver.await.unwrap() }; - fut.boxed_local() + load_receiver.await.unwrap() +} + +async fn execute_in_thread_json( + req_msg: Buf, + global_state: ThreadSafeGlobalState, +) -> JsonResult { + let maybe_msg = execute_in_thread(global_state, req_msg).await?; + let msg = maybe_msg.unwrap(); + let json_str = std::str::from_utf8(&msg).unwrap(); + Ok(json!(json_str)) } pub fn runtime_compile_async( @@ -706,7 +665,7 @@ pub fn runtime_compile_async( .into_boxed_str() .into_boxed_bytes(); - spawn_ts_compiler_worker(req_msg, global_state) + execute_in_thread_json(req_msg, global_state).boxed_local() } pub fn runtime_transpile_async( @@ -723,7 +682,7 @@ pub fn runtime_transpile_async( .into_boxed_str() .into_boxed_bytes(); - spawn_ts_compiler_worker(req_msg, global_state) + execute_in_thread_json(req_msg, global_state).boxed_local() } #[cfg(test)] diff --git a/cli/compilers/wasm.rs b/cli/compilers/wasm.rs index a3e578f01c..63eebadb31 100644 --- a/cli/compilers/wasm.rs +++ b/cli/compilers/wasm.rs @@ -1,22 +1,26 @@ // Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. use super::compiler_worker::CompilerWorker; use crate::compilers::CompiledModule; -use crate::compilers::CompiledModuleFuture; use crate::file_fetcher::SourceFile; use crate::global_state::ThreadSafeGlobalState; use crate::startup_data; use crate::state::*; use deno_core::ErrBox; use deno_core::ModuleSpecifier; -use futures::FutureExt; use serde_derive::Deserialize; use serde_json; use std::collections::HashMap; -use std::pin::Pin; use std::sync::atomic::Ordering; use std::sync::{Arc, Mutex}; use url::Url; +// TODO(ry) The entire concept of spawning a thread, sending data to JS, +// compiling WASM there, and moving the data back into the calling thread is +// completelly wrong. V8 has native facilities for getting this information. +// We might be lacking bindings for this currently in rusty_v8 but ultimately +// this "compiler" should be calling into rusty_v8 directly, not spawning +// threads. + // TODO(kevinkassimo): This is a hack to encode/decode data as base64 string. // (Since Deno namespace might not be available, Deno.read can fail). // Binary data is already available through source_file.source_code. @@ -67,18 +71,19 @@ impl WasmCompiler { worker } - pub fn compile_async( + pub async fn compile_async( &self, global_state: ThreadSafeGlobalState, source_file: &SourceFile, - ) -> Pin> { + ) -> Result { let cache = self.cache.clone(); + let cache_ = self.cache.clone(); let source_file = source_file.clone(); + let maybe_cached = { cache.lock().unwrap().get(&source_file.url).cloned() }; if let Some(m) = maybe_cached { - return futures::future::ok(m).boxed(); + return Ok(m); } - let cache_ = self.cache.clone(); let (load_sender, load_receiver) = tokio::sync::oneshot::channel::>(); @@ -133,8 +138,7 @@ impl WasmCompiler { crate::tokio_util::run_basic(fut); }); - let fut = async { load_receiver.await.unwrap() }; - fut.boxed_local() + load_receiver.await.unwrap() } } diff --git a/cli/global_state.rs b/cli/global_state.rs index 80f7d6e7a7..ca040c40c0 100644 --- a/cli/global_state.rs +++ b/cli/global_state.rs @@ -23,8 +23,10 @@ use std::path::Path; use std::str; use std::sync::Arc; use std::sync::Mutex; +use tokio::sync::Mutex as AsyncMutex; /// Holds state of the program and can be accessed by V8 isolate. +#[derive(Clone)] pub struct ThreadSafeGlobalState(Arc); /// This structure represents state of single "deno" program. @@ -45,12 +47,7 @@ pub struct GlobalState { pub ts_compiler: TsCompiler, pub wasm_compiler: WasmCompiler, pub lockfile: Option>, -} - -impl Clone for ThreadSafeGlobalState { - fn clone(&self) -> Self { - ThreadSafeGlobalState(self.0.clone()) - } + compile_lock: AsyncMutex<()>, } impl Deref for ThreadSafeGlobalState { @@ -103,6 +100,7 @@ impl ThreadSafeGlobalState { json_compiler: JsonCompiler {}, wasm_compiler: WasmCompiler::default(), lockfile, + compile_lock: AsyncMutex::new(()), }; Ok(ThreadSafeGlobalState(Arc::new(state))) @@ -122,11 +120,19 @@ impl ThreadSafeGlobalState { .file_fetcher .fetch_source_file_async(&module_specifier, maybe_referrer) .await?; - let compiled_module_fut = match out.media_type { - msg::MediaType::Unknown => state1.js_compiler.compile_async(out), - msg::MediaType::Json => state1.json_compiler.compile_async(&out), + + // TODO(ry) Try to lift compile_lock as high up in the call stack for + // sanity. + let compile_lock = self.compile_lock.lock().await; + + let compiled_module = match out.media_type { + msg::MediaType::Unknown => state1.js_compiler.compile_async(out).await, + msg::MediaType::Json => state1.json_compiler.compile_async(&out).await, msg::MediaType::Wasm => { - state1.wasm_compiler.compile_async(state1.clone(), &out) + state1 + .wasm_compiler + .compile_async(state1.clone(), &out) + .await } msg::MediaType::TypeScript | msg::MediaType::TSX @@ -134,18 +140,20 @@ impl ThreadSafeGlobalState { state1 .ts_compiler .compile_async(state1.clone(), &out, target_lib) + .await } msg::MediaType::JavaScript => { if state1.ts_compiler.compile_js { - state1 + state2 .ts_compiler .compile_async(state1.clone(), &out, target_lib) + .await } else { - state1.js_compiler.compile_async(out) + state1.js_compiler.compile_async(out).await } } - }; - let compiled_module = compiled_module_fut.await?; + }?; + drop(compile_lock); if let Some(ref lockfile) = state2.lockfile { let mut g = lockfile.lock().unwrap(); diff --git a/cli/tests/integration_tests.rs b/cli/tests/integration_tests.rs index 9555f93c07..a0486fbf66 100644 --- a/cli/tests/integration_tests.rs +++ b/cli/tests/integration_tests.rs @@ -323,12 +323,10 @@ itest!(_014_duplicate_import { output: "014_duplicate_import.ts.out", }); -/* TODO(ry) Disabled to get #3844 landed faster. Re-enable. itest!(_015_duplicate_parallel_import { args: "run --reload --allow-read 015_duplicate_parallel_import.js", output: "015_duplicate_parallel_import.js.out", }); -*/ itest!(_016_double_await { args: "run --allow-read --reload 016_double_await.ts",