diff --git a/cli/tests/integration/worker_tests.rs b/cli/tests/integration/worker_tests.rs index c17b63af97..8bb051ff37 100644 --- a/cli/tests/integration/worker_tests.rs +++ b/cli/tests/integration/worker_tests.rs @@ -99,3 +99,8 @@ itest!(worker_permissions_blob_local { http_server: true, exit_code: 1, }); + +itest!(worker_terminate_tla_crash { + args: "run --quiet --reload workers/terminate_tla_crash.js", + output: "workers/terminate_tla_crash.js.out", +}); diff --git a/cli/tests/testdata/workers/terminate_tla_crash.js b/cli/tests/testdata/workers/terminate_tla_crash.js new file mode 100644 index 0000000000..f793b8c8ed --- /dev/null +++ b/cli/tests/testdata/workers/terminate_tla_crash.js @@ -0,0 +1,21 @@ +// Test for https://github.com/denoland/deno/issues/12658 +// +// If a worker is terminated immediately after construction, and the worker's +// main module uses top-level await, V8 has a chance to crash. +// +// These crashes are so rare in debug mode that I've only seen them once. They +// happen a lot more often in release mode. + +const workerModule = ` + await new Promise(resolve => setTimeout(resolve, 1000)); +`; + +// Iterating 10 times to increase the likelihood of triggering the crash, at +// least in release mode. +for (let i = 0; i < 10; i++) { + const worker = new Worker( + `data:application/javascript;base64,${btoa(workerModule)}`, + { type: "module" }, + ); + worker.terminate(); +} diff --git a/cli/tests/testdata/workers/terminate_tla_crash.js.out b/cli/tests/testdata/workers/terminate_tla_crash.js.out new file mode 100644 index 0000000000..e69de29bb2 diff --git a/runtime/web_worker.rs b/runtime/web_worker.rs index c7358bc74e..dd758cd440 100644 --- a/runtime/web_worker.rs +++ b/runtime/web_worker.rs @@ -12,6 +12,7 @@ use deno_core::error::JsError; use deno_core::futures::channel::mpsc; use deno_core::futures::future::poll_fn; use deno_core::futures::stream::StreamExt; +use deno_core::futures::task::AtomicWaker; use deno_core::located_script_name; use deno_core::serde::Deserialize; use deno_core::serde::Serialize; @@ -113,7 +114,9 @@ pub struct WebWorkerInternalHandle { sender: mpsc::Sender, pub port: Rc, pub cancel: Rc, - terminated: Arc, + termination_signal: Arc, + has_terminated: Arc, + terminate_waker: Arc, isolate_handle: v8::IsolateHandle, pub worker_type: WebWorkerType, } @@ -127,7 +130,7 @@ impl WebWorkerInternalHandle { // // Therefore just treat it as if the worker has terminated and return. if sender.is_closed() { - self.terminated.store(true, Ordering::SeqCst); + self.has_terminated.store(true, Ordering::SeqCst); return Ok(()); } sender.try_send(event)?; @@ -136,7 +139,21 @@ impl WebWorkerInternalHandle { /// Check if this worker is terminated or being terminated pub fn is_terminated(&self) -> bool { - self.terminated.load(Ordering::SeqCst) + self.has_terminated.load(Ordering::SeqCst) + } + + /// Check if this worker must terminate (because the termination signal is + /// set), and terminates it if so. Returns whether the worker is terminated or + /// being terminated, as with [`Self::is_terminated()`]. + pub fn terminate_if_needed(&mut self) -> bool { + let has_terminated = self.is_terminated(); + + if !has_terminated && self.termination_signal.load(Ordering::SeqCst) { + self.terminate(); + return true; + } + + has_terminated } /// Terminate the worker @@ -147,7 +164,7 @@ impl WebWorkerInternalHandle { // This function can be called multiple times by whomever holds // the handle. However only a single "termination" should occur so // we need a guard here. - let already_terminated = self.terminated.swap(true, Ordering::SeqCst); + let already_terminated = self.has_terminated.swap(true, Ordering::SeqCst); if !already_terminated { // Stop javascript execution @@ -162,7 +179,9 @@ impl WebWorkerInternalHandle { pub struct SendableWebWorkerHandle { port: MessagePort, receiver: mpsc::Receiver, - terminated: Arc, + termination_signal: Arc, + has_terminated: Arc, + terminate_waker: Arc, isolate_handle: v8::IsolateHandle, } @@ -171,7 +190,9 @@ impl From for WebWorkerHandle { WebWorkerHandle { receiver: Rc::new(RefCell::new(handle.receiver)), port: Rc::new(handle.port), - terminated: handle.terminated, + termination_signal: handle.termination_signal, + has_terminated: handle.has_terminated, + terminate_waker: handle.terminate_waker, isolate_handle: handle.isolate_handle, } } @@ -188,7 +209,9 @@ impl From for WebWorkerHandle { pub struct WebWorkerHandle { pub port: Rc, receiver: Rc>>, - terminated: Arc, + termination_signal: Arc, + has_terminated: Arc, + terminate_waker: Arc, isolate_handle: v8::IsolateHandle, } @@ -204,19 +227,37 @@ impl WebWorkerHandle { } /// Terminate the worker - /// This function will set terminated to true, terminate the isolate and close the message channel + /// This function will set the termination signal, close the message channel, + /// and schedule to terminate the isolate after two seconds. pub fn terminate(self) { - // A WebWorkerHandle can be terminated / dropped after `self.close()` has - // been called inside the worker, but only a single "termination" can occur, - // so we need a guard here. - let already_terminated = self.terminated.swap(true, Ordering::SeqCst); + use std::thread::{sleep, spawn}; + use std::time::Duration; - if !already_terminated { - // Stop javascript execution - self.isolate_handle.terminate_execution(); - } + let schedule_termination = + !self.termination_signal.swap(true, Ordering::SeqCst); self.port.disentangle(); + + if schedule_termination && !self.has_terminated.load(Ordering::SeqCst) { + // Wake up the worker's event loop so it can terminate. + self.terminate_waker.wake(); + + let has_terminated = self.has_terminated.clone(); + + // Schedule to terminate the isolate's execution. + spawn(move || { + sleep(Duration::from_secs(2)); + + // A worker's isolate can only be terminated once, so we need a guard + // here. + let already_terminated = has_terminated.swap(true, Ordering::SeqCst); + + if !already_terminated { + // Stop javascript execution + self.isolate_handle.terminate_execution(); + } + }); + } } } @@ -226,11 +267,15 @@ fn create_handles( ) -> (WebWorkerInternalHandle, SendableWebWorkerHandle) { let (parent_port, worker_port) = create_entangled_message_port(); let (ctrl_tx, ctrl_rx) = mpsc::channel::(1); - let terminated = Arc::new(AtomicBool::new(false)); + let termination_signal = Arc::new(AtomicBool::new(false)); + let has_terminated = Arc::new(AtomicBool::new(false)); + let terminate_waker = Arc::new(AtomicWaker::new()); let internal_handle = WebWorkerInternalHandle { sender: ctrl_tx, port: Rc::new(parent_port), - terminated: terminated.clone(), + termination_signal: termination_signal.clone(), + has_terminated: has_terminated.clone(), + terminate_waker: terminate_waker.clone(), isolate_handle: isolate_handle.clone(), cancel: CancelHandle::new_rc(), worker_type, @@ -238,7 +283,9 @@ fn create_handles( let external_handle = SendableWebWorkerHandle { receiver: ctrl_rx, port: worker_port, - terminated, + termination_signal, + has_terminated, + terminate_waker, isolate_handle, }; (internal_handle, external_handle) @@ -496,14 +543,16 @@ impl WebWorker { wait_for_inspector: bool, ) -> Poll> { // If awakened because we are terminating, just return Ok - if self.internal_handle.is_terminated() { + if self.internal_handle.terminate_if_needed() { return Poll::Ready(Ok(())); } + self.internal_handle.terminate_waker.register(cx.waker()); + match self.js_runtime.poll_event_loop(cx, wait_for_inspector) { Poll::Ready(r) => { // If js ended because we are terminating, just return Ok - if self.internal_handle.is_terminated() { + if self.internal_handle.terminate_if_needed() { return Poll::Ready(Ok(())); }