diff --git a/cli/tests/integration/run_tests.rs b/cli/tests/integration/run_tests.rs index 981b851992..2c0a165822 100644 --- a/cli/tests/integration/run_tests.rs +++ b/cli/tests/integration/run_tests.rs @@ -1177,6 +1177,11 @@ itest!(worker_close_race { output: "worker_close_race.js.out", }); +itest!(worker_message_before_close { + args: "run --quiet --reload --allow-read worker_message_before_close.js", + output: "worker_message_before_close.js.out", +}); + #[test] fn no_validate_asm() { let output = util::deno_cmd() diff --git a/cli/tests/testdata/worker_message_before_close.js b/cli/tests/testdata/worker_message_before_close.js new file mode 100644 index 0000000000..6a9149af13 --- /dev/null +++ b/cli/tests/testdata/worker_message_before_close.js @@ -0,0 +1,16 @@ +for (let i = 0; i < 4; i++) { + const worker = new Worker( + new URL("./workers/message_before_close.js", import.meta.url).href, + { type: "module", name: String(i) }, + ); + + worker.addEventListener("message", (message) => { + // Only print responses after all reception logs. + setTimeout(() => { + console.log("response from worker %d received", message.data); + }, 500); + }); + worker.postMessage(i); +} + +export {}; diff --git a/cli/tests/testdata/worker_message_before_close.js.out b/cli/tests/testdata/worker_message_before_close.js.out new file mode 100644 index 0000000000..f1cc558a24 --- /dev/null +++ b/cli/tests/testdata/worker_message_before_close.js.out @@ -0,0 +1,8 @@ +message received in worker 0 +message received in worker 1 +message received in worker 2 +message received in worker 3 +response from worker 0 received +response from worker 1 received +response from worker 2 received +response from worker 3 received diff --git a/cli/tests/testdata/workers/message_before_close.js b/cli/tests/testdata/workers/message_before_close.js new file mode 100644 index 0000000000..0213abcb30 --- /dev/null +++ b/cli/tests/testdata/workers/message_before_close.js @@ -0,0 +1,6 @@ +self.onmessage = (params) => { + const workerId = params.data; + console.log("message received in worker %d", workerId); + self.postMessage(workerId); + self.close(); +}; diff --git a/runtime/js/11_workers.js b/runtime/js/11_workers.js index 86560b20d9..c9bfc172ab 100644 --- a/runtime/js/11_workers.js +++ b/runtime/js/11_workers.js @@ -139,7 +139,13 @@ class Worker extends EventTarget { #id = 0; #name = ""; - #terminated = false; + + // "RUNNING" | "CLOSED" | "TERMINATED" + // "TERMINATED" means that any controls or messages received will be + // discarded. "CLOSED" means that we have received a control + // indicating that the worker is no longer running, but there might + // still be messages left to receive. + #status = "RUNNING"; constructor(specifier, options = {}) { super(); @@ -243,17 +249,17 @@ } #pollControl = async () => { - while (!this.#terminated) { + while (this.#status === "RUNNING") { const [type, data] = await hostRecvCtrl(this.#id); // If terminate was called then we ignore all messages - if (this.#terminated) { + if (this.#status === "TERMINATED") { return; } switch (type) { case 1: { // TerminalError - this.#terminated = true; + this.#status = "CLOSED"; } /* falls through */ case 2: { // Error if (!this.#handleError(data)) { @@ -270,7 +276,7 @@ } case 3: { // Close log(`Host got "close" message from worker: ${this.#name}`); - this.#terminated = true; + this.#status = "CLOSED"; return; } default: { @@ -281,9 +287,11 @@ }; #pollMessages = async () => { - while (!this.terminated) { + while (this.#status !== "TERMINATED") { const data = await hostRecvMessage(this.#id); - if (data === null) break; + if (this.#status === "TERMINATED" || data === null) { + return; + } let message, transferables; try { const v = deserializeJsMessageData(data); @@ -332,13 +340,14 @@ } const { transfer } = options; const data = serializeJsMessageData(message, transfer); - if (this.#terminated) return; - hostPostMessage(this.#id, data); + if (this.#status === "RUNNING") { + hostPostMessage(this.#id, data); + } } terminate() { - if (!this.#terminated) { - this.#terminated = true; + if (this.#status !== "TERMINATED") { + this.#status = "TERMINATED"; hostTerminateWorker(this.#id); } } diff --git a/runtime/ops/worker_host.rs b/runtime/ops/worker_host.rs index d80a39502e..829681ab69 100644 --- a/runtime/ops/worker_host.rs +++ b/runtime/ops/worker_host.rs @@ -67,6 +67,12 @@ pub struct CreateWebWorkerCbHolder(Arc); pub struct WorkerThread { join_handle: JoinHandle>, worker_handle: WebWorkerHandle, + + // A WorkerThread that hasn't been explicitly terminated can only be removed + // from the WorkersTable once close messages have been received for both the + // control and message channels. See `close_channel`. + ctrl_closed: bool, + message_closed: bool, } pub type WorkersTable = HashMap; @@ -553,6 +559,8 @@ fn op_create_worker( let worker_thread = WorkerThread { join_handle, worker_handle: worker_handle.into(), + ctrl_closed: false, + message_closed: false, }; // At this point all interactions with worker happen using thread @@ -582,19 +590,49 @@ fn op_host_terminate_worker( Ok(()) } -/// Try to remove worker from workers table - NOTE: `Worker.terminate()` -/// might have been called already meaning that we won't find worker in -/// table - in that case ignore. -fn try_remove_and_close(state: Rc>, id: WorkerId) { +enum WorkerChannel { + Ctrl, + Messages, +} + +/// Close a worker's channel. If this results in both of a worker's channels +/// being closed, the worker will be removed from the workers table. +fn close_channel( + state: Rc>, + id: WorkerId, + channel: WorkerChannel, +) { + use std::collections::hash_map::Entry; + let mut s = state.borrow_mut(); let workers = s.borrow_mut::(); - if let Some(worker_thread) = workers.remove(&id) { - worker_thread.worker_handle.terminate(); - worker_thread - .join_handle - .join() - .expect("Worker thread panicked") - .expect("Panic in worker event loop"); + + // `Worker.terminate()` might have been called already, meaning that we won't + // find the worker in the table - in that case ignore. + if let Entry::Occupied(mut entry) = workers.entry(id) { + let terminate = { + let worker_thread = entry.get_mut(); + match channel { + WorkerChannel::Ctrl => { + worker_thread.ctrl_closed = true; + worker_thread.message_closed + } + WorkerChannel::Messages => { + worker_thread.message_closed = true; + worker_thread.ctrl_closed + } + } + }; + + if terminate { + let worker_thread = entry.remove(); + worker_thread.worker_handle.terminate(); + worker_thread + .join_handle + .join() + .expect("Worker thread panicked") + .expect("Panic in worker event loop"); + } } } @@ -620,13 +658,13 @@ async fn op_host_recv_ctrl( if let Some(event) = maybe_event { // Terminal error means that worker should be removed from worker table. if let WorkerControlEvent::TerminalError(_) = &event { - try_remove_and_close(state, id); + close_channel(state, id, WorkerChannel::Ctrl); } return Ok(event); } // If there was no event from worker it means it has already been closed. - try_remove_and_close(state, id); + close_channel(state, id, WorkerChannel::Ctrl); Ok(WorkerControlEvent::Close) } @@ -646,7 +684,12 @@ async fn op_host_recv_message( return Ok(None); } }; - worker_handle.port.recv(state).await + + let ret = worker_handle.port.recv(state.clone()).await?; + if ret.is_none() { + close_channel(state, id, WorkerChannel::Messages); + } + Ok(ret) } /// Post message to guest worker as host