From c7c767782538243ded64742dca9b34d6af74d62d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20Iwa=C5=84czuk?= Date: Tue, 6 Oct 2020 10:18:22 +0200 Subject: [PATCH] fix(core): module execution with top level await (#7672) This commit fixes implementation of top level await in "deno_core". Previously promise returned from module execution was ignored causing to execute modules out-of-order. With this commit promise returned from module execution is stored on "JsRuntime" and event loop is polled until the promise resolves. --- cli/ops/worker_host.rs | 7 + cli/tests/integration_tests.rs | 10 + cli/tests/top_level_await_bug.js | 2 + cli/tests/top_level_await_bug.out | 1 + cli/tests/top_level_await_bug2.js | 15 ++ cli/tests/top_level_await_bug2.out | 4 + cli/tests/top_level_await_bug_nested.js | 5 + cli/worker.rs | 4 +- core/modules.rs | 15 +- core/runtime.rs | 251 ++++++++++++++++++++++-- 10 files changed, 293 insertions(+), 21 deletions(-) create mode 100644 cli/tests/top_level_await_bug.js create mode 100644 cli/tests/top_level_await_bug.out create mode 100644 cli/tests/top_level_await_bug2.js create mode 100644 cli/tests/top_level_await_bug2.out create mode 100644 cli/tests/top_level_await_bug_nested.js diff --git a/cli/ops/worker_host.rs b/cli/ops/worker_host.rs index 9175ca0f16..17e0e397f6 100644 --- a/cli/ops/worker_host.rs +++ b/cli/ops/worker_host.rs @@ -155,6 +155,13 @@ fn run_worker_thread( if let Err(e) = result { let mut sender = worker.internal_channels.sender.clone(); + + // If sender is closed it means that worker has already been closed from + // within using "globalThis.close()" + if sender.is_closed() { + return; + } + sender .try_send(WorkerEvent::TerminalError(e)) .expect("Failed to post message to host"); diff --git a/cli/tests/integration_tests.rs b/cli/tests/integration_tests.rs index 8e2007b427..9ad7bac8cd 100644 --- a/cli/tests/integration_tests.rs +++ b/cli/tests/integration_tests.rs @@ -2662,6 +2662,16 @@ itest!(ignore_require { exit_code: 0, }); +itest!(top_level_await_bug { + args: "run --allow-read top_level_await_bug.js", + output: "top_level_await_bug.out", +}); + +itest!(top_level_await_bug2 { + args: "run --allow-read top_level_await_bug2.js", + output: "top_level_await_bug2.out", +}); + #[test] fn cafile_env_fetch() { use deno_core::url::Url; diff --git a/cli/tests/top_level_await_bug.js b/cli/tests/top_level_await_bug.js new file mode 100644 index 0000000000..3c6860a5b4 --- /dev/null +++ b/cli/tests/top_level_await_bug.js @@ -0,0 +1,2 @@ +const mod = await import("./top_level_await_bug_nested.js"); +console.log(mod); diff --git a/cli/tests/top_level_await_bug.out b/cli/tests/top_level_await_bug.out new file mode 100644 index 0000000000..f0369645c9 --- /dev/null +++ b/cli/tests/top_level_await_bug.out @@ -0,0 +1 @@ +Module { default: 1, [Symbol(Symbol.toStringTag)]: "Module" } diff --git a/cli/tests/top_level_await_bug2.js b/cli/tests/top_level_await_bug2.js new file mode 100644 index 0000000000..c847bbd34b --- /dev/null +++ b/cli/tests/top_level_await_bug2.js @@ -0,0 +1,15 @@ +const mod = await import("./top_level_await_bug_nested.js"); +console.log(mod); + +const sleep = (n) => new Promise((r) => setTimeout(r, n)); + +await sleep(100); +console.log("slept"); + +window.addEventListener("load", () => { + console.log("load event"); +}); + +setTimeout(() => { + console.log("timeout"); +}, 1000); diff --git a/cli/tests/top_level_await_bug2.out b/cli/tests/top_level_await_bug2.out new file mode 100644 index 0000000000..509ee27c26 --- /dev/null +++ b/cli/tests/top_level_await_bug2.out @@ -0,0 +1,4 @@ +Module { default: 1, [Symbol(Symbol.toStringTag)]: "Module" } +slept +load event +timeout diff --git a/cli/tests/top_level_await_bug_nested.js b/cli/tests/top_level_await_bug_nested.js new file mode 100644 index 0000000000..894f0de2d5 --- /dev/null +++ b/cli/tests/top_level_await_bug_nested.js @@ -0,0 +1,5 @@ +const sleep = (n) => new Promise((r) => setTimeout(r, n)); + +await sleep(100); + +export default 1; diff --git a/cli/worker.rs b/cli/worker.rs index 08ccd418e5..47e5c47618 100644 --- a/cli/worker.rs +++ b/cli/worker.rs @@ -189,7 +189,7 @@ impl Worker { ) -> Result<(), AnyError> { let id = self.preload_module(module_specifier).await?; self.wait_for_inspector_session(); - self.isolate.mod_evaluate(id) + self.isolate.mod_evaluate(id).await } /// Loads, instantiates and executes provided source code @@ -204,7 +204,7 @@ impl Worker { .load_module(module_specifier, Some(code)) .await?; self.wait_for_inspector_session(); - self.isolate.mod_evaluate(id) + self.isolate.mod_evaluate(id).await } /// Returns a way to communicate with the Worker from other threads. diff --git a/core/modules.rs b/core/modules.rs index 235bfeb4e4..a0e4fad958 100644 --- a/core/modules.rs +++ b/core/modules.rs @@ -341,6 +341,13 @@ pub struct ModuleInfo { pub name: String, pub handle: v8::Global, pub import_specifiers: Vec, + // TODO(bartlomieju): there should be "state" + // field that describes if module is already being loaded, + // so concurent dynamic imports don't introduce dead lock + // pub state: LoadState { + // Loading(shared_future), + // Loaded, + // }, } /// A symbolic module entity. @@ -667,7 +674,7 @@ mod tests { let a_id_fut = runtime.load_module(&spec, None); let a_id = futures::executor::block_on(a_id_fut).expect("Failed to load"); - runtime.mod_evaluate(a_id).unwrap(); + futures::executor::block_on(runtime.mod_evaluate(a_id)).unwrap(); let l = loads.lock().unwrap(); assert_eq!( l.to_vec(), @@ -734,7 +741,7 @@ mod tests { let result = runtime.load_module(&spec, None).await; assert!(result.is_ok()); let circular1_id = result.unwrap(); - runtime.mod_evaluate(circular1_id).unwrap(); + runtime.mod_evaluate(circular1_id).await.unwrap(); let l = loads.lock().unwrap(); assert_eq!( @@ -811,7 +818,7 @@ mod tests { println!(">> result {:?}", result); assert!(result.is_ok()); let redirect1_id = result.unwrap(); - runtime.mod_evaluate(redirect1_id).unwrap(); + runtime.mod_evaluate(redirect1_id).await.unwrap(); let l = loads.lock().unwrap(); assert_eq!( l.to_vec(), @@ -961,7 +968,7 @@ mod tests { let main_id = futures::executor::block_on(main_id_fut).expect("Failed to load"); - runtime.mod_evaluate(main_id).unwrap(); + futures::executor::block_on(runtime.mod_evaluate(main_id)).unwrap(); let l = loads.lock().unwrap(); assert_eq!( diff --git a/core/runtime.rs b/core/runtime.rs index 514703f343..2474d18872 100644 --- a/core/runtime.rs +++ b/core/runtime.rs @@ -23,6 +23,8 @@ use crate::shared_queue::SharedQueue; use crate::shared_queue::RECOMMENDED_SIZE; use crate::BufVec; use crate::OpState; +use futures::channel::mpsc; +use futures::future::poll_fn; use futures::stream::FuturesUnordered; use futures::stream::StreamExt; use futures::stream::StreamFuture; @@ -84,6 +86,11 @@ pub struct JsRuntime { allocations: IsolateAllocations, } +type DynImportModEvaluate = + (ModuleId, v8::Global, v8::Global); +type ModEvaluate = + (v8::Global, mpsc::Sender>); + /// Internal state for JsRuntime which is stored in one of v8::Isolate's /// embedder slots. pub(crate) struct JsRuntimeState { @@ -92,6 +99,8 @@ pub(crate) struct JsRuntimeState { pub(crate) js_recv_cb: Option>, pub(crate) js_macrotask_cb: Option>, pub(crate) pending_promise_exceptions: HashMap>, + pub(crate) pending_dyn_mod_evaluate: HashMap, + pub(crate) pending_mod_evaluate: HashMap, pub(crate) js_error_create_fn: Box, pub(crate) shared: SharedQueue, pub(crate) pending_ops: FuturesUnordered, @@ -278,6 +287,8 @@ impl JsRuntime { isolate.set_slot(Rc::new(RefCell::new(JsRuntimeState { global_context: Some(global_context), pending_promise_exceptions: HashMap::new(), + pending_dyn_mod_evaluate: HashMap::new(), + pending_mod_evaluate: HashMap::new(), shared_ab: None, js_recv_cb: None, js_macrotask_cb: None, @@ -484,6 +495,9 @@ impl Future for JsRuntime { state.waker.register(cx.waker()); } + // Top level modules + runtime.poll_mod_evaluate(cx)?; + // Dynamic module loading - ie. modules loaded using "import()" { let poll_imports = runtime.prepare_dyn_imports(cx)?; @@ -492,6 +506,8 @@ impl Future for JsRuntime { let poll_imports = runtime.poll_dyn_imports(cx)?; assert!(poll_imports.is_ready()); + runtime.poll_dyn_imports_evaluate(cx)?; + runtime.check_promise_exceptions()?; } @@ -508,6 +524,8 @@ impl Future for JsRuntime { state.pending_ops.is_empty() && state.pending_dyn_imports.is_empty() && state.preparing_dyn_imports.is_empty() + && state.pending_dyn_mod_evaluate.is_empty() + && state.pending_mod_evaluate.is_empty() }; if is_idle { @@ -700,7 +718,91 @@ impl JsRuntime { /// `AnyError` can be downcast to a type that exposes additional information /// about the V8 exception. By default this type is `JsError`, however it may /// be a different type if `RuntimeOptions::js_error_create_fn` has been set. - pub fn mod_evaluate(&mut self, id: ModuleId) -> Result<(), AnyError> { + pub fn dyn_mod_evaluate( + &mut self, + load_id: ModuleLoadId, + id: ModuleId, + ) -> Result<(), AnyError> { + self.shared_init(); + + let state_rc = Self::state(self); + let context = self.global_context(); + let context1 = self.global_context(); + + let module_handle = state_rc + .borrow() + .modules + .get_info(id) + .expect("ModuleInfo not found") + .handle + .clone(); + + let status = { + let scope = &mut v8::HandleScope::with_context(&mut **self, context); + let module = module_handle.get(scope); + module.get_status() + }; + + if status == v8::ModuleStatus::Instantiated { + // IMPORTANT: Top-level-await is enabled, which means that return value + // of module evaluation is a promise. + // + // Because that promise is created internally by V8, when error occurs during + // module evaluation the promise is rejected, and since the promise has no rejection + // handler it will result in call to `bindings::promise_reject_callback` adding + // the promise to pending promise rejection table - meaning JsRuntime will return + // error on next poll(). + // + // This situation is not desirable as we want to manually return error at the + // end of this function to handle it further. It means we need to manually + // remove this promise from pending promise rejection table. + // + // For more details see: + // https://github.com/denoland/deno/issues/4908 + // https://v8.dev/features/top-level-await#module-execution-order + let scope = &mut v8::HandleScope::with_context(&mut **self, context1); + let module = v8::Local::new(scope, &module_handle); + let maybe_value = module.evaluate(scope); + + // Update status after evaluating. + let status = module.get_status(); + + if let Some(value) = maybe_value { + assert!( + status == v8::ModuleStatus::Evaluated + || status == v8::ModuleStatus::Errored + ); + let promise = v8::Local::::try_from(value) + .expect("Expected to get promise as module evaluation result"); + let promise_id = promise.get_identity_hash(); + let mut state = state_rc.borrow_mut(); + state.pending_promise_exceptions.remove(&promise_id); + let promise_global = v8::Global::new(scope, promise); + let module_global = v8::Global::new(scope, module); + state + .pending_dyn_mod_evaluate + .insert(load_id, (id, promise_global, module_global)); + } else { + assert!(status == v8::ModuleStatus::Errored); + } + } + + if status == v8::ModuleStatus::Evaluated { + self.dyn_import_done(load_id, id)?; + } + + Ok(()) + } + + /// Evaluates an already instantiated ES module. + /// + /// `AnyError` can be downcast to a type that exposes additional information + /// about the V8 exception. By default this type is `JsError`, however it may + /// be a different type if `RuntimeOptions::js_error_create_fn` has been set. + fn mod_evaluate_inner( + &mut self, + id: ModuleId, + ) -> Result>, AnyError> { self.shared_init(); let state_rc = Self::state(self); @@ -716,6 +818,8 @@ impl JsRuntime { .expect("ModuleInfo not found"); let mut status = module.get_status(); + let (sender, receiver) = mpsc::channel(1); + if status == v8::ModuleStatus::Instantiated { // IMPORTANT: Top-level-await is enabled, which means that return value // of module evaluation is a promise. @@ -748,20 +852,30 @@ impl JsRuntime { let promise_id = promise.get_identity_hash(); let mut state = state_rc.borrow_mut(); state.pending_promise_exceptions.remove(&promise_id); + let promise_global = v8::Global::new(scope, promise); + state + .pending_mod_evaluate + .insert(id, (promise_global, sender)); } else { assert!(status == v8::ModuleStatus::Errored); } } - match status { - v8::ModuleStatus::Evaluated => Ok(()), - v8::ModuleStatus::Errored => { - let exception = module.get_exception(); - exception_to_err_result(scope, exception) - .map_err(|err| attach_handle_to_error(scope, err, exception)) + Ok(receiver) + } + + pub async fn mod_evaluate(&mut self, id: ModuleId) -> Result<(), AnyError> { + let mut receiver = self.mod_evaluate_inner(id)?; + + poll_fn(|cx| { + if let Poll::Ready(result) = receiver.poll_next_unpin(cx) { + debug!("received module evaluate"); + return Poll::Ready(result.unwrap()); } - other => panic!("Unexpected module status {:?}", other), - } + let _r = self.poll_unpin(cx)?; + Poll::Pending + }) + .await } fn dyn_import_error( @@ -922,16 +1036,123 @@ impl JsRuntime { // Load is done. let module_id = load.root_module_id.unwrap(); self.mod_instantiate(module_id)?; - match self.mod_evaluate(module_id) { - Ok(()) => self.dyn_import_done(dyn_import_id, module_id)?, - Err(err) => self.dyn_import_error(dyn_import_id, err)?, - }; + self.dyn_mod_evaluate(dyn_import_id, module_id)?; } } } } } + fn poll_mod_evaluate(&mut self, _cx: &mut Context) -> Result<(), AnyError> { + let state_rc = Self::state(self); + + let context = self.global_context(); + { + let scope = &mut v8::HandleScope::with_context(&mut **self, context); + + let mut state = state_rc.borrow_mut(); + + if let Some(&module_id) = state.pending_mod_evaluate.keys().next() { + let handle = state.pending_mod_evaluate.remove(&module_id).unwrap(); + drop(state); + + let promise = handle.0.get(scope); + let mut sender = handle.1.clone(); + + let promise_state = promise.state(); + + match promise_state { + v8::PromiseState::Pending => { + state_rc + .borrow_mut() + .pending_mod_evaluate + .insert(module_id, handle); + state_rc.borrow().waker.wake(); + } + v8::PromiseState::Fulfilled => { + sender.try_send(Ok(())).unwrap(); + } + v8::PromiseState::Rejected => { + let exception = promise.result(scope); + let err1 = exception_to_err_result::<()>(scope, exception) + .map_err(|err| attach_handle_to_error(scope, err, exception)) + .unwrap_err(); + sender.try_send(Err(err1)).unwrap(); + } + } + } + }; + + Ok(()) + } + + fn poll_dyn_imports_evaluate( + &mut self, + _cx: &mut Context, + ) -> Result<(), AnyError> { + let state_rc = Self::state(self); + + loop { + let context = self.global_context(); + let maybe_result = { + let scope = &mut v8::HandleScope::with_context(&mut **self, context); + + let mut state = state_rc.borrow_mut(); + if let Some(&dyn_import_id) = + state.pending_dyn_mod_evaluate.keys().next() + { + let handle = state + .pending_dyn_mod_evaluate + .remove(&dyn_import_id) + .unwrap(); + drop(state); + + let module_id = handle.0; + let promise = handle.1.get(scope); + let _module = handle.2.get(scope); + + let promise_state = promise.state(); + + match promise_state { + v8::PromiseState::Pending => { + state_rc + .borrow_mut() + .pending_dyn_mod_evaluate + .insert(dyn_import_id, handle); + state_rc.borrow().waker.wake(); + None + } + v8::PromiseState::Fulfilled => Some(Ok((dyn_import_id, module_id))), + v8::PromiseState::Rejected => { + let exception = promise.result(scope); + let err1 = exception_to_err_result::<()>(scope, exception) + .map_err(|err| attach_handle_to_error(scope, err, exception)) + .unwrap_err(); + Some(Err((dyn_import_id, err1))) + } + } + } else { + None + } + }; + + if let Some(result) = maybe_result { + match result { + Ok((dyn_import_id, module_id)) => { + self.dyn_import_done(dyn_import_id, module_id)?; + } + Err((dyn_import_id, err1)) => { + self.dyn_import_error(dyn_import_id, err1)?; + } + } + } else { + break; + } + } + + Ok(()) + } + fn register_during_load( &mut self, info: ModuleSource, @@ -2003,7 +2224,7 @@ pub mod tests { runtime.mod_instantiate(mod_a).unwrap(); assert_eq!(dispatch_count.load(Ordering::Relaxed), 0); - runtime.mod_evaluate(mod_a).unwrap(); + runtime.mod_evaluate_inner(mod_a).unwrap(); assert_eq!(dispatch_count.load(Ordering::Relaxed), 1); } @@ -2246,7 +2467,7 @@ pub mod tests { ) .unwrap(); - runtime.mod_evaluate(module_id).unwrap(); + futures::executor::block_on(runtime.mod_evaluate(module_id)).unwrap(); let _snapshot = runtime.snapshot(); }