1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2025-01-07 06:46:59 -05:00

fix(workers): Make worker.terminate() not immediately kill the isolate (#12831)

Due to a bug in V8, terminating an isolate while a module with top-level
await is being evaluated would crash the process. This change makes it
so calling `worker.terminate()` will signal the worker to terminate at
the next iteration of the event loop, and it schedules a proper
termination of the worker's isolate after 2 seconds.

Co-authored-by: Andreu Botella <abb@randomunok.com>
This commit is contained in:
Bartek Iwańczuk 2021-11-29 20:39:37 +01:00 committed by GitHub
parent 44ccae108f
commit 49879bf38d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 96 additions and 21 deletions

View file

@ -99,3 +99,8 @@ itest!(worker_permissions_blob_local {
http_server: true,
exit_code: 1,
});
itest!(worker_terminate_tla_crash {
args: "run --quiet --reload workers/terminate_tla_crash.js",
output: "workers/terminate_tla_crash.js.out",
});

View file

@ -0,0 +1,21 @@
// Test for https://github.com/denoland/deno/issues/12658
//
// If a worker is terminated immediately after construction, and the worker's
// main module uses top-level await, V8 has a chance to crash.
//
// These crashes are so rare in debug mode that I've only seen them once. They
// happen a lot more often in release mode.
const workerModule = `
await new Promise(resolve => setTimeout(resolve, 1000));
`;
// Iterating 10 times to increase the likelihood of triggering the crash, at
// least in release mode.
for (let i = 0; i < 10; i++) {
const worker = new Worker(
`data:application/javascript;base64,${btoa(workerModule)}`,
{ type: "module" },
);
worker.terminate();
}

View file

View file

@ -12,6 +12,7 @@ use deno_core::error::JsError;
use deno_core::futures::channel::mpsc;
use deno_core::futures::future::poll_fn;
use deno_core::futures::stream::StreamExt;
use deno_core::futures::task::AtomicWaker;
use deno_core::located_script_name;
use deno_core::serde::Deserialize;
use deno_core::serde::Serialize;
@ -113,7 +114,9 @@ pub struct WebWorkerInternalHandle {
sender: mpsc::Sender<WorkerControlEvent>,
pub port: Rc<MessagePort>,
pub cancel: Rc<CancelHandle>,
terminated: Arc<AtomicBool>,
termination_signal: Arc<AtomicBool>,
has_terminated: Arc<AtomicBool>,
terminate_waker: Arc<AtomicWaker>,
isolate_handle: v8::IsolateHandle,
pub worker_type: WebWorkerType,
}
@ -127,7 +130,7 @@ impl WebWorkerInternalHandle {
//
// Therefore just treat it as if the worker has terminated and return.
if sender.is_closed() {
self.terminated.store(true, Ordering::SeqCst);
self.has_terminated.store(true, Ordering::SeqCst);
return Ok(());
}
sender.try_send(event)?;
@ -136,7 +139,21 @@ impl WebWorkerInternalHandle {
/// Check if this worker is terminated or being terminated
pub fn is_terminated(&self) -> bool {
self.terminated.load(Ordering::SeqCst)
self.has_terminated.load(Ordering::SeqCst)
}
/// Check if this worker must terminate (because the termination signal is
/// set), and terminates it if so. Returns whether the worker is terminated or
/// being terminated, as with [`Self::is_terminated()`].
pub fn terminate_if_needed(&mut self) -> bool {
let has_terminated = self.is_terminated();
if !has_terminated && self.termination_signal.load(Ordering::SeqCst) {
self.terminate();
return true;
}
has_terminated
}
/// Terminate the worker
@ -147,7 +164,7 @@ impl WebWorkerInternalHandle {
// This function can be called multiple times by whomever holds
// the handle. However only a single "termination" should occur so
// we need a guard here.
let already_terminated = self.terminated.swap(true, Ordering::SeqCst);
let already_terminated = self.has_terminated.swap(true, Ordering::SeqCst);
if !already_terminated {
// Stop javascript execution
@ -162,7 +179,9 @@ impl WebWorkerInternalHandle {
pub struct SendableWebWorkerHandle {
port: MessagePort,
receiver: mpsc::Receiver<WorkerControlEvent>,
terminated: Arc<AtomicBool>,
termination_signal: Arc<AtomicBool>,
has_terminated: Arc<AtomicBool>,
terminate_waker: Arc<AtomicWaker>,
isolate_handle: v8::IsolateHandle,
}
@ -171,7 +190,9 @@ impl From<SendableWebWorkerHandle> for WebWorkerHandle {
WebWorkerHandle {
receiver: Rc::new(RefCell::new(handle.receiver)),
port: Rc::new(handle.port),
terminated: handle.terminated,
termination_signal: handle.termination_signal,
has_terminated: handle.has_terminated,
terminate_waker: handle.terminate_waker,
isolate_handle: handle.isolate_handle,
}
}
@ -188,7 +209,9 @@ impl From<SendableWebWorkerHandle> for WebWorkerHandle {
pub struct WebWorkerHandle {
pub port: Rc<MessagePort>,
receiver: Rc<RefCell<mpsc::Receiver<WorkerControlEvent>>>,
terminated: Arc<AtomicBool>,
termination_signal: Arc<AtomicBool>,
has_terminated: Arc<AtomicBool>,
terminate_waker: Arc<AtomicWaker>,
isolate_handle: v8::IsolateHandle,
}
@ -204,19 +227,37 @@ impl WebWorkerHandle {
}
/// Terminate the worker
/// This function will set terminated to true, terminate the isolate and close the message channel
/// This function will set the termination signal, close the message channel,
/// and schedule to terminate the isolate after two seconds.
pub fn terminate(self) {
// A WebWorkerHandle can be terminated / dropped after `self.close()` has
// been called inside the worker, but only a single "termination" can occur,
// so we need a guard here.
let already_terminated = self.terminated.swap(true, Ordering::SeqCst);
use std::thread::{sleep, spawn};
use std::time::Duration;
if !already_terminated {
// Stop javascript execution
self.isolate_handle.terminate_execution();
}
let schedule_termination =
!self.termination_signal.swap(true, Ordering::SeqCst);
self.port.disentangle();
if schedule_termination && !self.has_terminated.load(Ordering::SeqCst) {
// Wake up the worker's event loop so it can terminate.
self.terminate_waker.wake();
let has_terminated = self.has_terminated.clone();
// Schedule to terminate the isolate's execution.
spawn(move || {
sleep(Duration::from_secs(2));
// A worker's isolate can only be terminated once, so we need a guard
// here.
let already_terminated = has_terminated.swap(true, Ordering::SeqCst);
if !already_terminated {
// Stop javascript execution
self.isolate_handle.terminate_execution();
}
});
}
}
}
@ -226,11 +267,15 @@ fn create_handles(
) -> (WebWorkerInternalHandle, SendableWebWorkerHandle) {
let (parent_port, worker_port) = create_entangled_message_port();
let (ctrl_tx, ctrl_rx) = mpsc::channel::<WorkerControlEvent>(1);
let terminated = Arc::new(AtomicBool::new(false));
let termination_signal = Arc::new(AtomicBool::new(false));
let has_terminated = Arc::new(AtomicBool::new(false));
let terminate_waker = Arc::new(AtomicWaker::new());
let internal_handle = WebWorkerInternalHandle {
sender: ctrl_tx,
port: Rc::new(parent_port),
terminated: terminated.clone(),
termination_signal: termination_signal.clone(),
has_terminated: has_terminated.clone(),
terminate_waker: terminate_waker.clone(),
isolate_handle: isolate_handle.clone(),
cancel: CancelHandle::new_rc(),
worker_type,
@ -238,7 +283,9 @@ fn create_handles(
let external_handle = SendableWebWorkerHandle {
receiver: ctrl_rx,
port: worker_port,
terminated,
termination_signal,
has_terminated,
terminate_waker,
isolate_handle,
};
(internal_handle, external_handle)
@ -496,14 +543,16 @@ impl WebWorker {
wait_for_inspector: bool,
) -> Poll<Result<(), AnyError>> {
// If awakened because we are terminating, just return Ok
if self.internal_handle.is_terminated() {
if self.internal_handle.terminate_if_needed() {
return Poll::Ready(Ok(()));
}
self.internal_handle.terminate_waker.register(cx.waker());
match self.js_runtime.poll_event_loop(cx, wait_for_inspector) {
Poll::Ready(r) => {
// If js ended because we are terminating, just return Ok
if self.internal_handle.is_terminated() {
if self.internal_handle.terminate_if_needed() {
return Poll::Ready(Ok(()));
}