diff --git a/cli/tests/integration/run_tests.rs b/cli/tests/integration/run_tests.rs index e6004ba49a..4c4b42142f 100644 --- a/cli/tests/integration/run_tests.rs +++ b/cli/tests/integration/run_tests.rs @@ -1610,6 +1610,11 @@ itest!(worker_drop_handle_race { exit_code: 1, }); +itest!(worker_drop_handle_race_terminate { + args: "run --unstable worker_drop_handle_race_terminate.js", + output: "worker_drop_handle_race_terminate.js.out", +}); + itest!(worker_close_nested { args: "run --quiet --reload --allow-read worker_close_nested.js", output: "worker_close_nested.js.out", diff --git a/cli/tests/testdata/worker_drop_handle_race_terminate.js b/cli/tests/testdata/worker_drop_handle_race_terminate.js new file mode 100644 index 0000000000..dfdd9c5615 --- /dev/null +++ b/cli/tests/testdata/worker_drop_handle_race_terminate.js @@ -0,0 +1,40 @@ +// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license. + +// Test that the panic in https://github.com/denoland/deno/issues/11342 does not +// happen when calling worker.terminate() after fixing +// https://github.com/denoland/deno/issues/13705 + +function getCodeBlobUrl(code) { + const blob = new Blob([code], { type: "text/javascript" }); + return URL.createObjectURL(blob); +} + +const WORKER2 = getCodeBlobUrl(` + console.log("Worker 2"); + self.postMessage(undefined); + + // We sleep for slightly under 2 seconds in order to make sure that worker 1 + // has closed, and that this worker's thread finishes normally rather than + // being killed (which happens 2 seconds after calling terminate). + Deno.sleepSync(1800); + console.log("Finished sleeping in worker 2"); +`); + +const WORKER1 = getCodeBlobUrl(` + console.log("Worker 1"); + const worker = new Worker( + ${JSON.stringify(WORKER2)}, + { type: "module", deno: { namespace: true } } + ); + + worker.addEventListener("message", () => { + console.log("Terminating"); + worker.terminate(); + self.close(); + }); +`); + +new Worker(WORKER1, { type: "module", deno: { namespace: true } }); + +// Don't kill the process before worker 2 is finished. +setTimeout(() => {}, 3000); diff --git a/cli/tests/testdata/worker_drop_handle_race_terminate.js.out b/cli/tests/testdata/worker_drop_handle_race_terminate.js.out new file mode 100644 index 0000000000..5ec1e7ff8e --- /dev/null +++ b/cli/tests/testdata/worker_drop_handle_race_terminate.js.out @@ -0,0 +1,4 @@ +Worker 1 +Worker 2 +Terminating +Finished sleeping in worker 2 diff --git a/runtime/ops/worker_host.rs b/runtime/ops/worker_host.rs index 23ffcd8b1b..bd1f1e3f57 100644 --- a/runtime/ops/worker_host.rs +++ b/runtime/ops/worker_host.rs @@ -17,6 +17,8 @@ use deno_core::futures::future::LocalFutureObj; use deno_core::op; use deno_core::serde::Deserialize; +use deno_core::CancelFuture; +use deno_core::CancelHandle; use deno_core::Extension; use deno_core::ModuleSpecifier; use deno_core::OpState; @@ -27,7 +29,6 @@ use std::collections::HashMap; use std::rc::Rc; use std::sync::atomic::AtomicI32; use std::sync::Arc; -use std::thread::JoinHandle; pub struct CreateWebWorkerArgs { pub name: String, @@ -66,9 +67,8 @@ pub struct FormatJsErrorFnHolder(Option>); pub struct PreloadModuleCbHolder(Arc); pub struct WorkerThread { - // It's an Option so we can take the value before dropping the WorkerThread. - join_handle: Option>>, worker_handle: WebWorkerHandle, + cancel_handle: Rc, // A WorkerThread that hasn't been explicitly terminated can only be removed // from the WorkersTable once close messages have been received for both the @@ -78,30 +78,16 @@ pub struct WorkerThread { } impl WorkerThread { - fn terminate(mut self) { - self.worker_handle.clone().terminate(); - self - .join_handle - .take() - .unwrap() - .join() - .expect("Worker thread panicked") - .expect("Panic in worker event loop"); - - // Optimization so the Drop impl doesn't try to terminate the worker handle - // again. - self.ctrl_closed = true; - self.message_closed = true; + fn terminate(self) { + // Cancel recv ops when terminating the worker, so they don't show up as + // pending ops. + self.cancel_handle.cancel(); } } impl Drop for WorkerThread { fn drop(&mut self) { - // If either of the channels is closed, the worker thread has at least - // started closing, and its event loop won't start another run. - if !(self.ctrl_closed || self.message_closed) { - self.worker_handle.clone().terminate(); - } + self.worker_handle.clone().terminate(); } } @@ -217,7 +203,7 @@ fn op_create_worker( std::thread::Builder::new().name(format!("{}", worker_id)); // Spawn it - let join_handle = thread_builder.spawn(move || { + thread_builder.spawn(move || { // Any error inside this block is terminal: // - JS worker is useless - meaning it throws an exception and can't do anything else, // all action done upon it should be noops @@ -256,8 +242,8 @@ fn op_create_worker( let worker_handle = handle_receiver.recv().unwrap()?; let worker_thread = WorkerThread { - join_handle: Some(join_handle), worker_handle: worker_handle.into(), + cancel_handle: CancelHandle::new_rc(), ctrl_closed: false, message_closed: false, }; @@ -330,30 +316,41 @@ async fn op_host_recv_ctrl( state: Rc>, id: WorkerId, ) -> Result { - let worker_handle = { + let (worker_handle, cancel_handle) = { let state = state.borrow(); let workers_table = state.borrow::(); let maybe_handle = workers_table.get(&id); if let Some(handle) = maybe_handle { - handle.worker_handle.clone() + (handle.worker_handle.clone(), handle.cancel_handle.clone()) } else { // If handle was not found it means worker has already shutdown return Ok(WorkerControlEvent::Close); } }; - let maybe_event = worker_handle.get_control_event().await?; - if let Some(event) = maybe_event { - // Terminal error means that worker should be removed from worker table. - if let WorkerControlEvent::TerminalError(_) = &event { - close_channel(state, id, WorkerChannel::Ctrl); + let maybe_event = worker_handle + .get_control_event() + .or_cancel(cancel_handle) + .await; + match maybe_event { + Ok(Ok(Some(event))) => { + // Terminal error means that worker should be removed from worker table. + if let WorkerControlEvent::TerminalError(_) = &event { + close_channel(state, id, WorkerChannel::Ctrl); + } + Ok(event) + } + Ok(Ok(None)) => { + // If there was no event from worker it means it has already been closed. + close_channel(state, id, WorkerChannel::Ctrl); + Ok(WorkerControlEvent::Close) + } + Ok(Err(err)) => Err(err), + Err(_) => { + // The worker was terminated. + Ok(WorkerControlEvent::Close) } - return Ok(event); } - - // If there was no event from worker it means it has already been closed. - close_channel(state, id, WorkerChannel::Ctrl); - Ok(WorkerControlEvent::Close) } #[op] @@ -361,23 +358,36 @@ async fn op_host_recv_message( state: Rc>, id: WorkerId, ) -> Result, AnyError> { - let worker_handle = { + let (worker_handle, cancel_handle) = { let s = state.borrow(); let workers_table = s.borrow::(); let maybe_handle = workers_table.get(&id); if let Some(handle) = maybe_handle { - handle.worker_handle.clone() + (handle.worker_handle.clone(), handle.cancel_handle.clone()) } else { // If handle was not found it means worker has already shutdown return Ok(None); } }; - let ret = worker_handle.port.recv(state.clone()).await?; - if ret.is_none() { - close_channel(state, id, WorkerChannel::Messages); + let ret = worker_handle + .port + .recv(state.clone()) + .or_cancel(cancel_handle) + .await; + match ret { + Ok(Ok(ret)) => { + if ret.is_none() { + close_channel(state, id, WorkerChannel::Messages); + } + Ok(ret) + } + Ok(Err(err)) => Err(err), + Err(_) => { + // The worker was terminated. + Ok(None) + } } - Ok(ret) } /// Post message to guest worker as host