diff --git a/ext/node/polyfills/worker_threads.ts b/ext/node/polyfills/worker_threads.ts index 785bf021d4..1efada0564 100644 --- a/ext/node/polyfills/worker_threads.ts +++ b/ext/node/polyfills/worker_threads.ts @@ -12,6 +12,7 @@ import { notImplemented } from "ext:deno_node/_utils.ts"; import { EventEmitter, once } from "node:events"; import { BroadcastChannel } from "ext:deno_broadcast_channel/01_broadcast_channel.js"; import { MessageChannel, MessagePort } from "ext:deno_web/13_message_port.js"; +import { refWorker, unrefWorker } from "ext:runtime/11_workers.js"; let environmentData = new Map(); let threads = 0; @@ -170,6 +171,14 @@ class _Worker extends EventEmitter { this.emit("exit", 0); } + ref() { + refWorker(this[kHandle]); + } + + unref() { + unrefWorker(this[kHandle]); + } + readonly getHeapSnapshot = () => notImplemented("Worker.prototype.getHeapSnapshot"); // fake performance diff --git a/runtime/js/11_workers.js b/runtime/js/11_workers.js index 0cfd0a0c8f..15bbad1017 100644 --- a/runtime/js/11_workers.js +++ b/runtime/js/11_workers.js @@ -1,6 +1,6 @@ // Copyright 2018-2024 the Deno authors. All rights reserved. MIT license. -import { primordials } from "ext:core/mod.js"; +import { core, primordials } from "ext:core/mod.js"; import { op_create_worker, op_host_post_message, @@ -14,6 +14,7 @@ const { ObjectPrototypeIsPrototypeOf, String, StringPrototypeStartsWith, + Symbol, SymbolFor, SymbolIterator, SymbolToStringTag, @@ -72,9 +73,22 @@ function hostRecvMessage(id) { return op_host_recv_message(id); } +const privateWorkerRef = Symbol(); + +function refWorker(worker) { + worker[privateWorkerRef](true); +} + +function unrefWorker(worker) { + worker[privateWorkerRef](false); +} + class Worker extends EventTarget { #id = 0; #name = ""; + #refCount = 1; + #messagePromise = undefined; + #controlPromise = undefined; // "RUNNING" | "CLOSED" | "TERMINATED" // "TERMINATED" means that any controls or messages received will be @@ -128,6 +142,30 @@ class Worker extends EventTarget { this.#pollMessages(); } + [privateWorkerRef](ref) { + if (ref) { + this.#refCount++; + } else { + this.#refCount--; + } + + if (!ref && this.#refCount == 0) { + if (this.#controlPromise) { + core.unrefOpPromise(this.#controlPromise); + } + if (this.#messagePromise) { + core.unrefOpPromise(this.#messagePromise); + } + } else if (ref && this.#refCount == 1) { + if (this.#controlPromise) { + core.refOpPromise(this.#controlPromise); + } + if (this.#messagePromise) { + core.refOpPromise(this.#messagePromise); + } + } + } + #handleError(e) { const event = new ErrorEvent("error", { cancelable: true, @@ -151,7 +189,11 @@ class Worker extends EventTarget { #pollControl = async () => { while (this.#status === "RUNNING") { - const { 0: type, 1: data } = await hostRecvCtrl(this.#id); + this.#controlPromise = hostRecvCtrl(this.#id); + if (this.#refCount < 1) { + core.unrefOpPromise(this.#controlPromise); + } + const { 0: type, 1: data } = await this.#controlPromise; // If terminate was called then we ignore all messages if (this.#status === "TERMINATED") { @@ -182,7 +224,11 @@ class Worker extends EventTarget { #pollMessages = async () => { while (this.#status !== "TERMINATED") { - const data = await hostRecvMessage(this.#id); + this.#messagePromise = hostRecvMessage(this.#id); + if (this.#refCount < 1) { + core.unrefOpPromise(this.#messagePromise); + } + const data = await this.#messagePromise; if (this.#status === "TERMINATED" || data === null) { return; } @@ -279,4 +325,4 @@ webidl.converters["WorkerType"] = webidl.createEnumConverter("WorkerType", [ "module", ]); -export { Worker }; +export { refWorker, unrefWorker, Worker }; diff --git a/tests/unit_node/worker_threads_test.ts b/tests/unit_node/worker_threads_test.ts index 5d30230699..bc451e33f0 100644 --- a/tests/unit_node/worker_threads_test.ts +++ b/tests/unit_node/worker_threads_test.ts @@ -1,6 +1,11 @@ // Copyright 2018-2024 the Deno authors. All rights reserved. MIT license. -import { assert, assertEquals, assertObjectMatch } from "@std/assert/mod.ts"; +import { + assert, + assertEquals, + assertObjectMatch, + fail, +} from "@std/assert/mod.ts"; import { fromFileUrl, relative } from "@std/path/mod.ts"; import * as workerThreads from "node:worker_threads"; import { EventEmitter, once } from "node:events"; @@ -198,3 +203,18 @@ Deno.test({ worker.terminate(); }, }); + +Deno.test({ + name: "[worker_threads] unref", + async fn() { + const timeout = setTimeout(() => fail("Test timed out"), 60_000); + const child = new Deno.Command(Deno.execPath(), { + args: [ + "eval", + "import { Worker } from 'node:worker_threads'; new Worker('setTimeout(() => {}, 1_000_000)', {eval:true}).unref();", + ], + }).spawn(); + await child.status; + clearTimeout(timeout); + }, +});