mirror of
https://github.com/denoland/deno.git
synced 2024-12-23 15:49:44 -05:00
fix(workers): Make worker.terminate()
not block the current thread (#13941)
Calling `worker.terminate()` used to kill the worker's isolate and then block until the worker's thread finished. This blocks the calling thread if the worker's event loop was blocked in a sync op (as with `Deno.sleepSync`), which wasn't realized at the time, but since the worker's isolate was killed at that moment, it would not block the calling thread if the worker was in a JS endless loop. However, in #12831, in order to work around a V8 bug, worker termination was changed to first set a signal to let the worker event loop know that termination has been requested, and only kill the isolate if the event loop has not finished after 2 seconds. However, this change kept the blocking, which meant that JS endless loops in the worker now blocked the parent for 2 seconds. As it turns out, after #12831 it is fine to signal termination and even kill the worker's isolate without waiting for the thread to finish, so this change does that. However, that might leave the async ops that receive messages and control data from the worker pending after `worker.terminate()`, which leads to odd results from the op sanitizer. Therefore, we set up a `CancelHandler` to cancel those ops when the worker is terminated.
This commit is contained in:
parent
8b8b21b553
commit
ba799b6729
4 changed files with 101 additions and 42 deletions
|
@ -1610,6 +1610,11 @@ itest!(worker_drop_handle_race {
|
|||
exit_code: 1,
|
||||
});
|
||||
|
||||
itest!(worker_drop_handle_race_terminate {
|
||||
args: "run --unstable worker_drop_handle_race_terminate.js",
|
||||
output: "worker_drop_handle_race_terminate.js.out",
|
||||
});
|
||||
|
||||
itest!(worker_close_nested {
|
||||
args: "run --quiet --reload --allow-read worker_close_nested.js",
|
||||
output: "worker_close_nested.js.out",
|
||||
|
|
40
cli/tests/testdata/worker_drop_handle_race_terminate.js
vendored
Normal file
40
cli/tests/testdata/worker_drop_handle_race_terminate.js
vendored
Normal file
|
@ -0,0 +1,40 @@
|
|||
// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license.
|
||||
|
||||
// Test that the panic in https://github.com/denoland/deno/issues/11342 does not
|
||||
// happen when calling worker.terminate() after fixing
|
||||
// https://github.com/denoland/deno/issues/13705
|
||||
|
||||
function getCodeBlobUrl(code) {
|
||||
const blob = new Blob([code], { type: "text/javascript" });
|
||||
return URL.createObjectURL(blob);
|
||||
}
|
||||
|
||||
const WORKER2 = getCodeBlobUrl(`
|
||||
console.log("Worker 2");
|
||||
self.postMessage(undefined);
|
||||
|
||||
// We sleep for slightly under 2 seconds in order to make sure that worker 1
|
||||
// has closed, and that this worker's thread finishes normally rather than
|
||||
// being killed (which happens 2 seconds after calling terminate).
|
||||
Deno.sleepSync(1800);
|
||||
console.log("Finished sleeping in worker 2");
|
||||
`);
|
||||
|
||||
const WORKER1 = getCodeBlobUrl(`
|
||||
console.log("Worker 1");
|
||||
const worker = new Worker(
|
||||
${JSON.stringify(WORKER2)},
|
||||
{ type: "module", deno: { namespace: true } }
|
||||
);
|
||||
|
||||
worker.addEventListener("message", () => {
|
||||
console.log("Terminating");
|
||||
worker.terminate();
|
||||
self.close();
|
||||
});
|
||||
`);
|
||||
|
||||
new Worker(WORKER1, { type: "module", deno: { namespace: true } });
|
||||
|
||||
// Don't kill the process before worker 2 is finished.
|
||||
setTimeout(() => {}, 3000);
|
4
cli/tests/testdata/worker_drop_handle_race_terminate.js.out
vendored
Normal file
4
cli/tests/testdata/worker_drop_handle_race_terminate.js.out
vendored
Normal file
|
@ -0,0 +1,4 @@
|
|||
Worker 1
|
||||
Worker 2
|
||||
Terminating
|
||||
Finished sleeping in worker 2
|
|
@ -17,6 +17,8 @@ use deno_core::futures::future::LocalFutureObj;
|
|||
use deno_core::op;
|
||||
|
||||
use deno_core::serde::Deserialize;
|
||||
use deno_core::CancelFuture;
|
||||
use deno_core::CancelHandle;
|
||||
use deno_core::Extension;
|
||||
use deno_core::ModuleSpecifier;
|
||||
use deno_core::OpState;
|
||||
|
@ -27,7 +29,6 @@ use std::collections::HashMap;
|
|||
use std::rc::Rc;
|
||||
use std::sync::atomic::AtomicI32;
|
||||
use std::sync::Arc;
|
||||
use std::thread::JoinHandle;
|
||||
|
||||
pub struct CreateWebWorkerArgs {
|
||||
pub name: String,
|
||||
|
@ -66,9 +67,8 @@ pub struct FormatJsErrorFnHolder(Option<Arc<FormatJsErrorFn>>);
|
|||
pub struct PreloadModuleCbHolder(Arc<PreloadModuleCb>);
|
||||
|
||||
pub struct WorkerThread {
|
||||
// It's an Option so we can take the value before dropping the WorkerThread.
|
||||
join_handle: Option<JoinHandle<Result<(), AnyError>>>,
|
||||
worker_handle: WebWorkerHandle,
|
||||
cancel_handle: Rc<CancelHandle>,
|
||||
|
||||
// A WorkerThread that hasn't been explicitly terminated can only be removed
|
||||
// from the WorkersTable once close messages have been received for both the
|
||||
|
@ -78,30 +78,16 @@ pub struct WorkerThread {
|
|||
}
|
||||
|
||||
impl WorkerThread {
|
||||
fn terminate(mut self) {
|
||||
self.worker_handle.clone().terminate();
|
||||
self
|
||||
.join_handle
|
||||
.take()
|
||||
.unwrap()
|
||||
.join()
|
||||
.expect("Worker thread panicked")
|
||||
.expect("Panic in worker event loop");
|
||||
|
||||
// Optimization so the Drop impl doesn't try to terminate the worker handle
|
||||
// again.
|
||||
self.ctrl_closed = true;
|
||||
self.message_closed = true;
|
||||
fn terminate(self) {
|
||||
// Cancel recv ops when terminating the worker, so they don't show up as
|
||||
// pending ops.
|
||||
self.cancel_handle.cancel();
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for WorkerThread {
|
||||
fn drop(&mut self) {
|
||||
// If either of the channels is closed, the worker thread has at least
|
||||
// started closing, and its event loop won't start another run.
|
||||
if !(self.ctrl_closed || self.message_closed) {
|
||||
self.worker_handle.clone().terminate();
|
||||
}
|
||||
self.worker_handle.clone().terminate();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -217,7 +203,7 @@ fn op_create_worker(
|
|||
std::thread::Builder::new().name(format!("{}", worker_id));
|
||||
|
||||
// Spawn it
|
||||
let join_handle = thread_builder.spawn(move || {
|
||||
thread_builder.spawn(move || {
|
||||
// Any error inside this block is terminal:
|
||||
// - JS worker is useless - meaning it throws an exception and can't do anything else,
|
||||
// all action done upon it should be noops
|
||||
|
@ -256,8 +242,8 @@ fn op_create_worker(
|
|||
let worker_handle = handle_receiver.recv().unwrap()?;
|
||||
|
||||
let worker_thread = WorkerThread {
|
||||
join_handle: Some(join_handle),
|
||||
worker_handle: worker_handle.into(),
|
||||
cancel_handle: CancelHandle::new_rc(),
|
||||
ctrl_closed: false,
|
||||
message_closed: false,
|
||||
};
|
||||
|
@ -330,30 +316,41 @@ async fn op_host_recv_ctrl(
|
|||
state: Rc<RefCell<OpState>>,
|
||||
id: WorkerId,
|
||||
) -> Result<WorkerControlEvent, AnyError> {
|
||||
let worker_handle = {
|
||||
let (worker_handle, cancel_handle) = {
|
||||
let state = state.borrow();
|
||||
let workers_table = state.borrow::<WorkersTable>();
|
||||
let maybe_handle = workers_table.get(&id);
|
||||
if let Some(handle) = maybe_handle {
|
||||
handle.worker_handle.clone()
|
||||
(handle.worker_handle.clone(), handle.cancel_handle.clone())
|
||||
} else {
|
||||
// If handle was not found it means worker has already shutdown
|
||||
return Ok(WorkerControlEvent::Close);
|
||||
}
|
||||
};
|
||||
|
||||
let maybe_event = worker_handle.get_control_event().await?;
|
||||
if let Some(event) = maybe_event {
|
||||
// Terminal error means that worker should be removed from worker table.
|
||||
if let WorkerControlEvent::TerminalError(_) = &event {
|
||||
close_channel(state, id, WorkerChannel::Ctrl);
|
||||
let maybe_event = worker_handle
|
||||
.get_control_event()
|
||||
.or_cancel(cancel_handle)
|
||||
.await;
|
||||
match maybe_event {
|
||||
Ok(Ok(Some(event))) => {
|
||||
// Terminal error means that worker should be removed from worker table.
|
||||
if let WorkerControlEvent::TerminalError(_) = &event {
|
||||
close_channel(state, id, WorkerChannel::Ctrl);
|
||||
}
|
||||
Ok(event)
|
||||
}
|
||||
Ok(Ok(None)) => {
|
||||
// If there was no event from worker it means it has already been closed.
|
||||
close_channel(state, id, WorkerChannel::Ctrl);
|
||||
Ok(WorkerControlEvent::Close)
|
||||
}
|
||||
Ok(Err(err)) => Err(err),
|
||||
Err(_) => {
|
||||
// The worker was terminated.
|
||||
Ok(WorkerControlEvent::Close)
|
||||
}
|
||||
return Ok(event);
|
||||
}
|
||||
|
||||
// If there was no event from worker it means it has already been closed.
|
||||
close_channel(state, id, WorkerChannel::Ctrl);
|
||||
Ok(WorkerControlEvent::Close)
|
||||
}
|
||||
|
||||
#[op]
|
||||
|
@ -361,23 +358,36 @@ async fn op_host_recv_message(
|
|||
state: Rc<RefCell<OpState>>,
|
||||
id: WorkerId,
|
||||
) -> Result<Option<JsMessageData>, AnyError> {
|
||||
let worker_handle = {
|
||||
let (worker_handle, cancel_handle) = {
|
||||
let s = state.borrow();
|
||||
let workers_table = s.borrow::<WorkersTable>();
|
||||
let maybe_handle = workers_table.get(&id);
|
||||
if let Some(handle) = maybe_handle {
|
||||
handle.worker_handle.clone()
|
||||
(handle.worker_handle.clone(), handle.cancel_handle.clone())
|
||||
} else {
|
||||
// If handle was not found it means worker has already shutdown
|
||||
return Ok(None);
|
||||
}
|
||||
};
|
||||
|
||||
let ret = worker_handle.port.recv(state.clone()).await?;
|
||||
if ret.is_none() {
|
||||
close_channel(state, id, WorkerChannel::Messages);
|
||||
let ret = worker_handle
|
||||
.port
|
||||
.recv(state.clone())
|
||||
.or_cancel(cancel_handle)
|
||||
.await;
|
||||
match ret {
|
||||
Ok(Ok(ret)) => {
|
||||
if ret.is_none() {
|
||||
close_channel(state, id, WorkerChannel::Messages);
|
||||
}
|
||||
Ok(ret)
|
||||
}
|
||||
Ok(Err(err)) => Err(err),
|
||||
Err(_) => {
|
||||
// The worker was terminated.
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
Ok(ret)
|
||||
}
|
||||
|
||||
/// Post message to guest worker as host
|
||||
|
|
Loading…
Reference in a new issue