1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2024-12-25 08:39:09 -05:00

fix(runtime/web_worker): Don't block self.onmessage with TLA (#9619)

This commit rewrites implementation of "JsRuntime::mod_evaluate". 

Event loop is no longer polled automatically and users must manually
drive event loop forward after calling "mod_evaluate".

Co-authored-by: Nayeem Rahman <nayeemrmn99@gmail.com>
Co-authored-by: Bartek Iwańczuk <biwanczuk@gmail.com>
This commit is contained in:
Nayeem Rahman 2021-03-04 12:19:47 +00:00 committed by GitHub
parent af7e02124f
commit 0f2121355f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 98 additions and 30 deletions

View file

@ -675,3 +675,24 @@ Deno.test({
w.terminate(); w.terminate();
}, },
}); });
Deno.test({
name: "Worker with top-level-await",
fn: async function (): Promise<void> {
const result = deferred();
const worker = new Worker(
new URL("worker_with_top_level_await.ts", import.meta.url).href,
{ type: "module" },
);
worker.onmessage = (e): void => {
if (e.data == "ready") {
worker.postMessage("trigger worker handler");
} else if (e.data == "triggered worker handler") {
result.resolve();
} else {
result.reject(new Error("Handler didn't run during top-level delay."));
}
};
await result;
},
});

View file

@ -0,0 +1,15 @@
function delay(ms: number): Promise<void> {
return new Promise<void>((resolve) => {
setTimeout(() => {
resolve();
}, ms);
});
}
onmessage = (e: MessageEvent) => {
postMessage("triggered worker handler");
close();
};
postMessage("ready");
await delay(1000);
postMessage("never");

View file

@ -687,7 +687,8 @@ mod tests {
let a_id_fut = runtime.load_module(&spec, None); let a_id_fut = runtime.load_module(&spec, None);
let a_id = futures::executor::block_on(a_id_fut).expect("Failed to load"); let a_id = futures::executor::block_on(a_id_fut).expect("Failed to load");
futures::executor::block_on(runtime.mod_evaluate(a_id)).unwrap(); runtime.mod_evaluate(a_id);
futures::executor::block_on(runtime.run_event_loop()).unwrap();
let l = loads.lock().unwrap(); let l = loads.lock().unwrap();
assert_eq!( assert_eq!(
l.to_vec(), l.to_vec(),
@ -754,7 +755,8 @@ mod tests {
let result = runtime.load_module(&spec, None).await; let result = runtime.load_module(&spec, None).await;
assert!(result.is_ok()); assert!(result.is_ok());
let circular1_id = result.unwrap(); let circular1_id = result.unwrap();
runtime.mod_evaluate(circular1_id).await.unwrap(); runtime.mod_evaluate(circular1_id);
runtime.run_event_loop().await.unwrap();
let l = loads.lock().unwrap(); let l = loads.lock().unwrap();
assert_eq!( assert_eq!(
@ -827,7 +829,8 @@ mod tests {
println!(">> result {:?}", result); println!(">> result {:?}", result);
assert!(result.is_ok()); assert!(result.is_ok());
let redirect1_id = result.unwrap(); let redirect1_id = result.unwrap();
runtime.mod_evaluate(redirect1_id).await.unwrap(); runtime.mod_evaluate(redirect1_id);
runtime.run_event_loop().await.unwrap();
let l = loads.lock().unwrap(); let l = loads.lock().unwrap();
assert_eq!( assert_eq!(
l.to_vec(), l.to_vec(),
@ -976,7 +979,8 @@ mod tests {
let main_id = let main_id =
futures::executor::block_on(main_id_fut).expect("Failed to load"); futures::executor::block_on(main_id_fut).expect("Failed to load");
futures::executor::block_on(runtime.mod_evaluate(main_id)).unwrap(); runtime.mod_evaluate(main_id);
futures::executor::block_on(runtime.run_event_loop()).unwrap();
let l = loads.lock().unwrap(); let l = loads.lock().unwrap();
assert_eq!( assert_eq!(

View file

@ -853,14 +853,19 @@ impl JsRuntime {
Ok(()) Ok(())
} }
// TODO(bartlomieju): make it return `ModuleEvaluationFuture`?
/// Evaluates an already instantiated ES module. /// Evaluates an already instantiated ES module.
/// ///
/// Returns a receiver handle that resolves when module promise resolves.
/// Implementors must manually call `run_event_loop()` to drive module
/// evaluation future.
///
/// `AnyError` can be downcast to a type that exposes additional information /// `AnyError` can be downcast to a type that exposes additional information
/// about the V8 exception. By default this type is `JsError`, however it may /// about the V8 exception. By default this type is `JsError`, however it may
/// be a different type if `RuntimeOptions::js_error_create_fn` has been set. /// be a different type if `RuntimeOptions::js_error_create_fn` has been set.
/// ///
/// This function panics if module has not been instantiated. /// This function panics if module has not been instantiated.
fn mod_evaluate_inner( pub fn mod_evaluate(
&mut self, &mut self,
id: ModuleId, id: ModuleId,
) -> mpsc::Receiver<Result<(), AnyError>> { ) -> mpsc::Receiver<Result<(), AnyError>> {
@ -929,24 +934,6 @@ impl JsRuntime {
receiver receiver
} }
pub async fn mod_evaluate(&mut self, id: ModuleId) -> Result<(), AnyError> {
let mut receiver = self.mod_evaluate_inner(id);
poll_fn(|cx| {
if let Poll::Ready(maybe_result) = receiver.poll_next_unpin(cx) {
debug!("received module evaluate {:#?}", maybe_result);
// If `None` is returned it means that runtime was destroyed before
// evaluation was complete. This can happen in Web Worker when `self.close()`
// is called at top level.
let result = maybe_result.unwrap_or(Ok(()));
return Poll::Ready(result);
}
let _r = self.poll_event_loop(cx)?;
Poll::Pending
})
.await
}
fn dyn_import_error(&mut self, id: ModuleLoadId, err: AnyError) { fn dyn_import_error(&mut self, id: ModuleLoadId, err: AnyError) {
let state_rc = Self::state(self.v8_isolate()); let state_rc = Self::state(self.v8_isolate());
let context = self.global_context(); let context = self.global_context();
@ -1140,7 +1127,8 @@ impl JsRuntime {
v8::PromiseState::Fulfilled => { v8::PromiseState::Fulfilled => {
state.pending_mod_evaluate.take(); state.pending_mod_evaluate.take();
scope.perform_microtask_checkpoint(); scope.perform_microtask_checkpoint();
sender.try_send(Ok(())).unwrap(); // Receiver end might have been already dropped, ignore the result
let _ = sender.try_send(Ok(()));
} }
v8::PromiseState::Rejected => { v8::PromiseState::Rejected => {
let exception = promise.result(scope); let exception = promise.result(scope);
@ -1150,7 +1138,8 @@ impl JsRuntime {
let err1 = exception_to_err_result::<()>(scope, exception, false) let err1 = exception_to_err_result::<()>(scope, exception, false)
.map_err(|err| attach_handle_to_error(scope, err, exception)) .map_err(|err| attach_handle_to_error(scope, err, exception))
.unwrap_err(); .unwrap_err();
sender.try_send(Err(err1)).unwrap(); // Receiver end might have been already dropped, ignore the result
let _ = sender.try_send(Err(err1));
} }
} }
} }
@ -1937,7 +1926,7 @@ pub mod tests {
throw Error("assert"); throw Error("assert");
} }
} }
let asyncRecv = 0; let asyncRecv = 0;
Deno.core.setAsyncHandler(1, (buf) => { Deno.core.setAsyncHandler(1, (buf) => {
assert(buf.byteLength === 100 * 1024 * 1024); assert(buf.byteLength === 100 * 1024 * 1024);
@ -2351,7 +2340,7 @@ pub mod tests {
runtime.mod_instantiate(mod_a).unwrap(); runtime.mod_instantiate(mod_a).unwrap();
assert_eq!(dispatch_count.load(Ordering::Relaxed), 0); assert_eq!(dispatch_count.load(Ordering::Relaxed), 0);
runtime.mod_evaluate_inner(mod_a); runtime.mod_evaluate(mod_a);
assert_eq!(dispatch_count.load(Ordering::Relaxed), 1); assert_eq!(dispatch_count.load(Ordering::Relaxed), 1);
} }
@ -2594,7 +2583,8 @@ pub mod tests {
) )
.unwrap(); .unwrap();
futures::executor::block_on(runtime.mod_evaluate(module_id)).unwrap(); runtime.mod_evaluate(module_id);
futures::executor::block_on(runtime.run_event_loop()).unwrap();
let _snapshot = runtime.snapshot(); let _snapshot = runtime.snapshot();
} }

View file

@ -325,7 +325,28 @@ impl WebWorker {
module_specifier: &ModuleSpecifier, module_specifier: &ModuleSpecifier,
) -> Result<(), AnyError> { ) -> Result<(), AnyError> {
let id = self.js_runtime.load_module(module_specifier, None).await?; let id = self.js_runtime.load_module(module_specifier, None).await?;
self.js_runtime.mod_evaluate(id).await
let mut receiver = self.js_runtime.mod_evaluate(id);
tokio::select! {
maybe_result = receiver.next() => {
debug!("received worker module evaluate {:#?}", maybe_result);
// If `None` is returned it means that runtime was destroyed before
// evaluation was complete. This can happen in Web Worker when `self.close()`
// is called at top level.
let result = maybe_result.unwrap_or(Ok(()));
return result;
}
event_loop_result = self.run_event_loop() => {
if self.has_been_terminated() {
return Ok(());
}
event_loop_result?;
let maybe_result = receiver.next().await;
let result = maybe_result.unwrap_or(Ok(()));
return result;
}
}
} }
/// Returns a way to communicate with the Worker from other threads. /// Returns a way to communicate with the Worker from other threads.
@ -384,6 +405,8 @@ impl WebWorker {
let msg = String::from_utf8(msg.to_vec()).unwrap(); let msg = String::from_utf8(msg.to_vec()).unwrap();
let script = format!("workerMessageRecvCallback({})", msg); let script = format!("workerMessageRecvCallback({})", msg);
// TODO(bartlomieju): set proper script name like "deno:runtime/web_worker.js"
// so it's dimmed in stack trace instead of using "__anonymous__"
if let Err(e) = self.execute(&script) { if let Err(e) = self.execute(&script) {
// If execution was terminated during message callback then // If execution was terminated during message callback then
// just ignore it // just ignore it

View file

@ -10,6 +10,7 @@ use crate::permissions::Permissions;
use deno_core::error::AnyError; use deno_core::error::AnyError;
use deno_core::futures::future::poll_fn; use deno_core::futures::future::poll_fn;
use deno_core::futures::future::FutureExt; use deno_core::futures::future::FutureExt;
use deno_core::futures::stream::StreamExt;
use deno_core::serde_json; use deno_core::serde_json;
use deno_core::serde_json::json; use deno_core::serde_json::json;
use deno_core::url::Url; use deno_core::url::Url;
@ -221,7 +222,21 @@ impl MainWorker {
) -> Result<(), AnyError> { ) -> Result<(), AnyError> {
let id = self.preload_module(module_specifier).await?; let id = self.preload_module(module_specifier).await?;
self.wait_for_inspector_session(); self.wait_for_inspector_session();
self.js_runtime.mod_evaluate(id).await let mut receiver = self.js_runtime.mod_evaluate(id);
tokio::select! {
maybe_result = receiver.next() => {
debug!("received module evaluate {:#?}", maybe_result);
let result = maybe_result.expect("Module evaluation result not provided.");
return result;
}
event_loop_result = self.run_event_loop() => {
event_loop_result?;
let maybe_result = receiver.next().await;
let result = maybe_result.expect("Module evaluation result not provided.");
return result;
}
}
} }
fn wait_for_inspector_session(&mut self) { fn wait_for_inspector_session(&mut self) {