diff --git a/ext/node/polyfills/worker_threads.ts b/ext/node/polyfills/worker_threads.ts index ab3834132e..49562d8921 100644 --- a/ext/node/polyfills/worker_threads.ts +++ b/ext/node/polyfills/worker_threads.ts @@ -17,6 +17,7 @@ import { MessagePort, MessagePortIdSymbol, MessagePortPrototype, + nodeWorkerThreadCloseCb, serializeJsMessageData, } from "ext:deno_web/13_message_port.js"; import * as webidl from "ext:deno_webidl/00_webidl.js"; @@ -342,6 +343,15 @@ internals.__initWorkerThreads = ( defaultExport.parentPort = parentPort; defaultExport.threadId = threadId; + for (const obj in workerData as Record) { + if (ObjectPrototypeIsPrototypeOf(MessagePortPrototype, workerData[obj])) { + workerData[obj] = webMessagePortToNodeMessagePort( + workerData[obj] as MessagePort, + ); + break; + } + } + parentPort.off = parentPort.removeListener = function ( this: ParentPort, name, @@ -357,7 +367,22 @@ internals.__initWorkerThreads = ( listener, ) { // deno-lint-ignore no-explicit-any - const _listener = (ev: any) => listener(ev.data); + const _listener = (ev: any) => { + let message = ev.data; + if (ObjectPrototypeIsPrototypeOf(MessagePortPrototype, message)) { + message = webMessagePortToNodeMessagePort(message); + } else { + for (const obj in message) { + if ( + ObjectPrototypeIsPrototypeOf(MessagePortPrototype, message[obj]) + ) { + message[obj] = webMessagePortToNodeMessagePort(message[obj]); + break; + } + } + } + return listener(message); + }; listeners.set(listener, _listener); this.addEventListener(name, _listener); return this; @@ -424,10 +449,42 @@ export function receiveMessageOnPort(port: MessagePort): object | undefined { return { message: deserializeJsMessageData(data)[0] }; } +class NodeMessageChannel { + port1: MessagePort; + port2: MessagePort; + + constructor() { + const { port1, port2 } = new MessageChannel(); + this.port1 = webMessagePortToNodeMessagePort(port1); + this.port2 = webMessagePortToNodeMessagePort(port2); + } +} + +function webMessagePortToNodeMessagePort(port: MessagePort) { + port.on = port.addListener = function (this: MessagePort, name, listener) { + // deno-lint-ignore no-explicit-any + const _listener = (ev: any) => listener(ev.data); + if (name == "message") { + port.onmessage = _listener; + } else if (name == "messageerror") { + port.onmessageerror = _listener; + } else if (name == "close") { + port.addEventListener("close", _listener); + } else { + throw new Error(`Unknown event: "${name}"`); + } + return this; + }; + port[nodeWorkerThreadCloseCb] = () => { + port.dispatchEvent(new Event("close")); + }; + return port; +} + export { BroadcastChannel, - MessageChannel, MessagePort, + NodeMessageChannel as MessageChannel, NodeWorker as Worker, parentPort, threadId, @@ -439,7 +496,7 @@ const defaultExport = { moveMessagePortToContext, receiveMessageOnPort, MessagePort, - MessageChannel, + MessageChannel: NodeMessageChannel, BroadcastChannel, Worker: NodeWorker, getEnvironmentData, diff --git a/ext/web/13_message_port.js b/ext/web/13_message_port.js index d953c52edd..83470c8957 100644 --- a/ext/web/13_message_port.js +++ b/ext/web/13_message_port.js @@ -18,6 +18,7 @@ const { ArrayPrototypeIncludes, ArrayPrototypePush, ObjectPrototypeIsPrototypeOf, + ObjectDefineProperty, Symbol, SymbolFor, SymbolIterator, @@ -85,6 +86,8 @@ const MessageChannelPrototype = MessageChannel.prototype; const _id = Symbol("id"); const MessagePortIdSymbol = _id; const _enabled = Symbol("enabled"); +const nodeWorkerThreadCloseCb = Symbol("nodeWorkerThreadCloseCb"); +const nodeWorkerThreadCloseCbInvoked = Symbol("nodeWorkerThreadCloseCbInvoked"); /** * @param {number} id @@ -98,6 +101,16 @@ function createMessagePort(id) { return port; } +function nodeWorkerThreadMaybeInvokeCloseCb(port) { + if ( + typeof port[nodeWorkerThreadCloseCb] == "function" && + !port[nodeWorkerThreadCloseCbInvoked] + ) { + port[nodeWorkerThreadCloseCb](); + port[nodeWorkerThreadCloseCbInvoked] = true; + } +} + class MessagePort extends EventTarget { /** @type {number | null} */ [_id] = null; @@ -106,6 +119,14 @@ class MessagePort extends EventTarget { constructor() { super(); + ObjectDefineProperty(this, nodeWorkerThreadCloseCb, { + value: null, + enumerable: false, + }); + ObjectDefineProperty(this, nodeWorkerThreadCloseCbInvoked, { + value: false, + enumerable: false, + }); webidl.illegalConstructor(); } @@ -160,9 +181,13 @@ class MessagePort extends EventTarget { ); } catch (err) { if (ObjectPrototypeIsPrototypeOf(InterruptedPrototype, err)) break; + nodeWorkerThreadMaybeInvokeCloseCb(this); throw err; } - if (data === null) break; + if (data === null) { + nodeWorkerThreadMaybeInvokeCloseCb(this); + break; + } let message, transferables; try { const v = deserializeJsMessageData(data); @@ -193,6 +218,7 @@ class MessagePort extends EventTarget { if (this[_id] !== null) { core.close(this[_id]); this[_id] = null; + nodeWorkerThreadMaybeInvokeCloseCb(this); } } @@ -383,6 +409,7 @@ export { MessagePort, MessagePortIdSymbol, MessagePortPrototype, + nodeWorkerThreadCloseCb, serializeJsMessageData, structuredClone, }; diff --git a/tests/integration/worker_tests.rs b/tests/integration/worker_tests.rs index dd0c2d409d..abd07da16d 100644 --- a/tests/integration/worker_tests.rs +++ b/tests/integration/worker_tests.rs @@ -1,6 +1,7 @@ // Copyright 2018-2024 the Deno authors. All rights reserved. MIT license. use test_util::itest; +use test_util::itest_flaky; itest!(worker_error { args: "run -A workers/worker_error.ts", @@ -125,3 +126,15 @@ itest!(node_worker_auto_exits { output: "workers/node_worker_auto_exits.mjs.out", exit_code: 0, }); + +itest_flaky!(node_worker_message_port { + args: "run --quiet --allow-read workers/node_worker_message_port.mjs", + output: "workers/node_worker_message_port.mjs.out", + exit_code: 0, +}); + +itest!(node_worker_transfer_port { + args: "run --quiet --allow-read workers/node_worker_transfer_port.mjs", + output: "workers/node_worker_transfer_port.mjs.out", + exit_code: 0, +}); diff --git a/tests/testdata/workers/node_worker_message_port.mjs b/tests/testdata/workers/node_worker_message_port.mjs new file mode 100644 index 0000000000..71640fb400 --- /dev/null +++ b/tests/testdata/workers/node_worker_message_port.mjs @@ -0,0 +1,41 @@ +import workerThreads from "node:worker_threads"; + +const { port1: mainPort, port2: workerPort } = new workerThreads + .MessageChannel(); + +// Note: not using Promise.withResolver() because it's not available in Node.js +const deferred = createDeferred(); + +const worker = new workerThreads.Worker( + import.meta.resolve("./node_worker_message_port_1.cjs"), + { + workerData: { workerPort }, + transferList: [workerPort], + }, +); + +worker.on("message", (data) => { + console.log("worker:", data); + mainPort.on("message", (msg) => { + console.log("mainPort:", msg); + deferred.resolve(); + }); + mainPort.on("close", (_msg) => { + console.log("mainPort closed"); + }); +}); + +worker.postMessage("Hello from parent"); +await deferred.promise; +await worker.terminate(); +mainPort.close(); + +function createDeferred() { + let resolveCallback; + let rejectCallback; + const promise = new Promise((resolve, reject) => { + resolveCallback = resolve; + rejectCallback = reject; + }); + return { promise, resolve: resolveCallback, reject: rejectCallback }; +} diff --git a/tests/testdata/workers/node_worker_message_port.mjs.out b/tests/testdata/workers/node_worker_message_port.mjs.out new file mode 100644 index 0000000000..5317b65a06 --- /dev/null +++ b/tests/testdata/workers/node_worker_message_port.mjs.out @@ -0,0 +1,4 @@ +worker port closed +worker: Hello from worker on parentPort! +mainPort: Hello from worker on workerPort! +mainPort closed diff --git a/tests/testdata/workers/node_worker_message_port_1.cjs b/tests/testdata/workers/node_worker_message_port_1.cjs new file mode 100644 index 0000000000..01739c51ef --- /dev/null +++ b/tests/testdata/workers/node_worker_message_port_1.cjs @@ -0,0 +1,9 @@ +const { parentPort, workerData } = require("worker_threads"); + +parentPort.on("message", (msg) => { + const workerPort = workerData.workerPort; + parentPort.postMessage("Hello from worker on parentPort!"); + workerPort.postMessage("Hello from worker on workerPort!"); + workerPort.on("close", () => console.log("worker port closed")); + workerPort.close(); +}); diff --git a/tests/testdata/workers/node_worker_transfer_port.mjs b/tests/testdata/workers/node_worker_transfer_port.mjs new file mode 100644 index 0000000000..1b17ed1ab6 --- /dev/null +++ b/tests/testdata/workers/node_worker_transfer_port.mjs @@ -0,0 +1,14 @@ +import { MessageChannel, Worker } from "node:worker_threads"; + +const { port1, port2 } = new MessageChannel(); +const worker = new Worker( + import.meta.resolve("./node_worker_transfer_port_1.mjs"), +); +// Send the port directly after the worker is created +worker.postMessage(port2, [port2]); +// Send a message to the worker using the transferred port +port1.postMessage("Hello from main thread!"); +worker.on("message", (message) => { + console.log("Received message from worker:", message); + worker.terminate(); +}); diff --git a/tests/testdata/workers/node_worker_transfer_port.mjs.out b/tests/testdata/workers/node_worker_transfer_port.mjs.out new file mode 100644 index 0000000000..8e8f119405 --- /dev/null +++ b/tests/testdata/workers/node_worker_transfer_port.mjs.out @@ -0,0 +1,3 @@ +Worker thread started! +Received message from main thread: Hello from main thread! +Received message from worker: Reply from worker diff --git a/tests/testdata/workers/node_worker_transfer_port_1.mjs b/tests/testdata/workers/node_worker_transfer_port_1.mjs new file mode 100644 index 0000000000..4d0a38bd5f --- /dev/null +++ b/tests/testdata/workers/node_worker_transfer_port_1.mjs @@ -0,0 +1,10 @@ +import { parentPort } from "node:worker_threads"; + +parentPort.on("message", (message) => { + const transferredPort = message; + transferredPort.on("message", (message) => { + console.log("Received message from main thread:", message); + parentPort.postMessage("Reply from worker"); + }); + console.log("Worker thread started!"); +}); diff --git a/tests/unit_node/worker_threads_test.ts b/tests/unit_node/worker_threads_test.ts index 52c9cfffdc..2351e10529 100644 --- a/tests/unit_node/worker_threads_test.ts +++ b/tests/unit_node/worker_threads_test.ts @@ -16,7 +16,7 @@ Deno.test("[node/worker_threads] BroadcastChannel is exported", () => { }); Deno.test("[node/worker_threads] MessageChannel are MessagePort are exported", () => { - assertEquals(workerThreads.MessageChannel, MessageChannel); + assert(workerThreads.MessageChannel); assertEquals(workerThreads.MessagePort, MessagePort); });