diff --git a/cli/tests/workers/test.ts b/cli/tests/workers/test.ts index 0888e01db2..f411e434f1 100644 --- a/cli/tests/workers/test.ts +++ b/cli/tests/workers/test.ts @@ -675,3 +675,24 @@ Deno.test({ w.terminate(); }, }); + +Deno.test({ + name: "Worker with top-level-await", + fn: async function (): Promise { + const result = deferred(); + const worker = new Worker( + new URL("worker_with_top_level_await.ts", import.meta.url).href, + { type: "module" }, + ); + worker.onmessage = (e): void => { + if (e.data == "ready") { + worker.postMessage("trigger worker handler"); + } else if (e.data == "triggered worker handler") { + result.resolve(); + } else { + result.reject(new Error("Handler didn't run during top-level delay.")); + } + }; + await result; + }, +}); diff --git a/cli/tests/workers/worker_with_top_level_await.ts b/cli/tests/workers/worker_with_top_level_await.ts new file mode 100644 index 0000000000..6c55289008 --- /dev/null +++ b/cli/tests/workers/worker_with_top_level_await.ts @@ -0,0 +1,15 @@ +function delay(ms: number): Promise { + return new Promise((resolve) => { + setTimeout(() => { + resolve(); + }, ms); + }); +} + +onmessage = (e: MessageEvent) => { + postMessage("triggered worker handler"); + close(); +}; +postMessage("ready"); +await delay(1000); +postMessage("never"); diff --git a/core/modules.rs b/core/modules.rs index ea772a8b2a..b9b99d3b50 100644 --- a/core/modules.rs +++ b/core/modules.rs @@ -687,7 +687,8 @@ mod tests { let a_id_fut = runtime.load_module(&spec, None); let a_id = futures::executor::block_on(a_id_fut).expect("Failed to load"); - futures::executor::block_on(runtime.mod_evaluate(a_id)).unwrap(); + runtime.mod_evaluate(a_id); + futures::executor::block_on(runtime.run_event_loop()).unwrap(); let l = loads.lock().unwrap(); assert_eq!( l.to_vec(), @@ -754,7 +755,8 @@ mod tests { let result = runtime.load_module(&spec, None).await; assert!(result.is_ok()); let circular1_id = result.unwrap(); - runtime.mod_evaluate(circular1_id).await.unwrap(); + runtime.mod_evaluate(circular1_id); + runtime.run_event_loop().await.unwrap(); let l = loads.lock().unwrap(); assert_eq!( @@ -827,7 +829,8 @@ mod tests { println!(">> result {:?}", result); assert!(result.is_ok()); let redirect1_id = result.unwrap(); - runtime.mod_evaluate(redirect1_id).await.unwrap(); + runtime.mod_evaluate(redirect1_id); + runtime.run_event_loop().await.unwrap(); let l = loads.lock().unwrap(); assert_eq!( l.to_vec(), @@ -976,7 +979,8 @@ mod tests { let main_id = futures::executor::block_on(main_id_fut).expect("Failed to load"); - futures::executor::block_on(runtime.mod_evaluate(main_id)).unwrap(); + runtime.mod_evaluate(main_id); + futures::executor::block_on(runtime.run_event_loop()).unwrap(); let l = loads.lock().unwrap(); assert_eq!( diff --git a/core/runtime.rs b/core/runtime.rs index 595a7733bc..66d51eb730 100644 --- a/core/runtime.rs +++ b/core/runtime.rs @@ -853,14 +853,19 @@ impl JsRuntime { Ok(()) } + // TODO(bartlomieju): make it return `ModuleEvaluationFuture`? /// Evaluates an already instantiated ES module. /// + /// Returns a receiver handle that resolves when module promise resolves. + /// Implementors must manually call `run_event_loop()` to drive module + /// evaluation future. + /// /// `AnyError` can be downcast to a type that exposes additional information /// about the V8 exception. By default this type is `JsError`, however it may /// be a different type if `RuntimeOptions::js_error_create_fn` has been set. /// /// This function panics if module has not been instantiated. - fn mod_evaluate_inner( + pub fn mod_evaluate( &mut self, id: ModuleId, ) -> mpsc::Receiver> { @@ -929,24 +934,6 @@ impl JsRuntime { receiver } - pub async fn mod_evaluate(&mut self, id: ModuleId) -> Result<(), AnyError> { - let mut receiver = self.mod_evaluate_inner(id); - - poll_fn(|cx| { - if let Poll::Ready(maybe_result) = receiver.poll_next_unpin(cx) { - debug!("received module evaluate {:#?}", maybe_result); - // If `None` is returned it means that runtime was destroyed before - // evaluation was complete. This can happen in Web Worker when `self.close()` - // is called at top level. - let result = maybe_result.unwrap_or(Ok(())); - return Poll::Ready(result); - } - let _r = self.poll_event_loop(cx)?; - Poll::Pending - }) - .await - } - fn dyn_import_error(&mut self, id: ModuleLoadId, err: AnyError) { let state_rc = Self::state(self.v8_isolate()); let context = self.global_context(); @@ -1140,7 +1127,8 @@ impl JsRuntime { v8::PromiseState::Fulfilled => { state.pending_mod_evaluate.take(); scope.perform_microtask_checkpoint(); - sender.try_send(Ok(())).unwrap(); + // Receiver end might have been already dropped, ignore the result + let _ = sender.try_send(Ok(())); } v8::PromiseState::Rejected => { let exception = promise.result(scope); @@ -1150,7 +1138,8 @@ impl JsRuntime { let err1 = exception_to_err_result::<()>(scope, exception, false) .map_err(|err| attach_handle_to_error(scope, err, exception)) .unwrap_err(); - sender.try_send(Err(err1)).unwrap(); + // Receiver end might have been already dropped, ignore the result + let _ = sender.try_send(Err(err1)); } } } @@ -1937,7 +1926,7 @@ pub mod tests { throw Error("assert"); } } - + let asyncRecv = 0; Deno.core.setAsyncHandler(1, (buf) => { assert(buf.byteLength === 100 * 1024 * 1024); @@ -2351,7 +2340,7 @@ pub mod tests { runtime.mod_instantiate(mod_a).unwrap(); assert_eq!(dispatch_count.load(Ordering::Relaxed), 0); - runtime.mod_evaluate_inner(mod_a); + runtime.mod_evaluate(mod_a); assert_eq!(dispatch_count.load(Ordering::Relaxed), 1); } @@ -2594,7 +2583,8 @@ pub mod tests { ) .unwrap(); - futures::executor::block_on(runtime.mod_evaluate(module_id)).unwrap(); + runtime.mod_evaluate(module_id); + futures::executor::block_on(runtime.run_event_loop()).unwrap(); let _snapshot = runtime.snapshot(); } diff --git a/runtime/web_worker.rs b/runtime/web_worker.rs index 30869ff41e..73d351c9c6 100644 --- a/runtime/web_worker.rs +++ b/runtime/web_worker.rs @@ -325,7 +325,28 @@ impl WebWorker { module_specifier: &ModuleSpecifier, ) -> Result<(), AnyError> { let id = self.js_runtime.load_module(module_specifier, None).await?; - self.js_runtime.mod_evaluate(id).await + + let mut receiver = self.js_runtime.mod_evaluate(id); + tokio::select! { + maybe_result = receiver.next() => { + debug!("received worker module evaluate {:#?}", maybe_result); + // If `None` is returned it means that runtime was destroyed before + // evaluation was complete. This can happen in Web Worker when `self.close()` + // is called at top level. + let result = maybe_result.unwrap_or(Ok(())); + return result; + } + + event_loop_result = self.run_event_loop() => { + if self.has_been_terminated() { + return Ok(()); + } + event_loop_result?; + let maybe_result = receiver.next().await; + let result = maybe_result.unwrap_or(Ok(())); + return result; + } + } } /// Returns a way to communicate with the Worker from other threads. @@ -384,6 +405,8 @@ impl WebWorker { let msg = String::from_utf8(msg.to_vec()).unwrap(); let script = format!("workerMessageRecvCallback({})", msg); + // TODO(bartlomieju): set proper script name like "deno:runtime/web_worker.js" + // so it's dimmed in stack trace instead of using "__anonymous__" if let Err(e) = self.execute(&script) { // If execution was terminated during message callback then // just ignore it diff --git a/runtime/worker.rs b/runtime/worker.rs index 97466fadb7..e63fdbe18c 100644 --- a/runtime/worker.rs +++ b/runtime/worker.rs @@ -10,6 +10,7 @@ use crate::permissions::Permissions; use deno_core::error::AnyError; use deno_core::futures::future::poll_fn; use deno_core::futures::future::FutureExt; +use deno_core::futures::stream::StreamExt; use deno_core::serde_json; use deno_core::serde_json::json; use deno_core::url::Url; @@ -221,7 +222,21 @@ impl MainWorker { ) -> Result<(), AnyError> { let id = self.preload_module(module_specifier).await?; self.wait_for_inspector_session(); - self.js_runtime.mod_evaluate(id).await + let mut receiver = self.js_runtime.mod_evaluate(id); + tokio::select! { + maybe_result = receiver.next() => { + debug!("received module evaluate {:#?}", maybe_result); + let result = maybe_result.expect("Module evaluation result not provided."); + return result; + } + + event_loop_result = self.run_event_loop() => { + event_loop_result?; + let maybe_result = receiver.next().await; + let result = maybe_result.expect("Module evaluation result not provided."); + return result; + } + } } fn wait_for_inspector_session(&mut self) {