From 26f42a248f4764f85c1c3c3c511b82a990e4b651 Mon Sep 17 00:00:00 2001 From: Yoshiya Hinosawa Date: Wed, 24 May 2023 03:56:29 +0900 Subject: [PATCH] fix(ext/node): add basic node:worker_threads support (#19192) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This PR restores `node:worker_threads` implementation and test cases from [`std@0.175.0/node`](https://github.com/denoland/deno_std/blob/0.175.0/node/worker_threads.ts). --------- Co-authored-by: Bartek IwaƄczuk --- cli/tests/node_compat/config.jsonc | 5 +- cli/tests/node_compat/test/common/tmpdir.js | 13 +- .../unit_node/testdata/worker_threads.mjs | 34 ++++ cli/tests/unit_node/worker_threads_test.ts | 185 +++++++++++++++++- ext/node/polyfills/02_init.js | 1 + ext/node/polyfills/worker_threads.ts | 170 +++++++++------- 6 files changed, 324 insertions(+), 84 deletions(-) create mode 100644 cli/tests/unit_node/testdata/worker_threads.mjs diff --git a/cli/tests/node_compat/config.jsonc b/cli/tests/node_compat/config.jsonc index a3b97b5100..ccc83cd3de 100644 --- a/cli/tests/node_compat/config.jsonc +++ b/cli/tests/node_compat/config.jsonc @@ -1,7 +1,7 @@ { "nodeVersion": "18.12.1", "ignore": { - "common": ["index.js", "internet.js", "tmpdir.js"], + "common": ["index.js", "internet.js"], "fixtures": [ "child-process-spawn-node.js", "echo.js", @@ -121,7 +121,8 @@ "fixtures.js", "hijackstdio.js", "index.mjs", - "internet.js" + "internet.js", + "tmpdir.js" ], "fixtures": [ "GH-1899-output.js", diff --git a/cli/tests/node_compat/test/common/tmpdir.js b/cli/tests/node_compat/test/common/tmpdir.js index d3ce98e45b..dbc945c137 100644 --- a/cli/tests/node_compat/test/common/tmpdir.js +++ b/cli/tests/node_compat/test/common/tmpdir.js @@ -2,14 +2,14 @@ // deno-lint-ignore-file // Copyright Joyent and Node contributors. All rights reserved. MIT license. -// Taken from Node 16.13.0 +// Taken from Node 18.12.1 // This file is automatically generated by "node/_tools/setup.ts". Do not modify this file manually 'use strict'; const fs = require('fs'); const path = require('path'); -// const { isMainThread } = require('worker_threads'); +const { isMainThread } = require('worker_threads'); function rmSync(pathname) { fs.rmSync(pathname, { maxRetries: 3, recursive: true, force: true }); @@ -26,8 +26,8 @@ const tmpPath = path.join(testRoot, tmpdirName); let firstRefresh = true; function refresh() { - rmSync(this.path); - fs.mkdirSync(this.path); + rmSync(tmpPath); + fs.mkdirSync(tmpPath); if (firstRefresh) { firstRefresh = false; @@ -39,9 +39,8 @@ function refresh() { function onexit() { // Change directory to avoid possible EBUSY - // TODO(f3n67u): uncomment when `worker_thread.isMainThread` implemented - // if (isMainThread) - // process.chdir(testRoot); + if (isMainThread) + process.chdir(testRoot); try { rmSync(tmpPath); diff --git a/cli/tests/unit_node/testdata/worker_threads.mjs b/cli/tests/unit_node/testdata/worker_threads.mjs new file mode 100644 index 0000000000..03dc462f02 --- /dev/null +++ b/cli/tests/unit_node/testdata/worker_threads.mjs @@ -0,0 +1,34 @@ +// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. +import { + getEnvironmentData, + isMainThread, + parentPort, + threadId, + workerData, +} from "node:worker_threads"; +import { once } from "node:events"; + +async function message(expectedMessage) { + const [message] = await once(parentPort, "message"); + if (message !== expectedMessage) { + console.log(`Expected the message "${expectedMessage}", but got`, message); + // fail test + parentPort.close(); + } +} + +await message("Hello, how are you my thread?"); + +parentPort.postMessage("I'm fine!"); + +await new Promise((resolve) => setTimeout(resolve, 100)); + +parentPort.postMessage({ + isMainThread, + threadId, + workerData: Array.isArray(workerData) && + workerData[workerData.length - 1] instanceof MessagePort + ? workerData.slice(0, -1) + : workerData, + envData: [getEnvironmentData("test"), getEnvironmentData(1)], +}); diff --git a/cli/tests/unit_node/worker_threads_test.ts b/cli/tests/unit_node/worker_threads_test.ts index 17de7cca1e..f53b1e6927 100644 --- a/cli/tests/unit_node/worker_threads_test.ts +++ b/cli/tests/unit_node/worker_threads_test.ts @@ -1,7 +1,13 @@ // Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. -import { assertEquals } from "../../../test_util/std/testing/asserts.ts"; -import workerThreads from "node:worker_threads"; +import { + assert, + assertEquals, + assertObjectMatch, +} from "../../../test_util/std/testing/asserts.ts"; +import { fromFileUrl, relative } from "../../../test_util/std/path/mod.ts"; +import * as workerThreads from "node:worker_threads"; +import { EventEmitter, once } from "node:events"; Deno.test("[node/worker_threads] BroadcastChannel is exported", () => { assertEquals(workerThreads.BroadcastChannel, BroadcastChannel); @@ -11,3 +17,178 @@ Deno.test("[node/worker_threads] MessageChannel are MessagePort are exported", ( assertEquals(workerThreads.MessageChannel, MessageChannel); assertEquals(workerThreads.MessagePort, MessagePort); }); + +Deno.test({ + name: "[worker_threads] isMainThread", + fn() { + assertEquals(workerThreads.isMainThread, true); + }, +}); + +Deno.test({ + name: "[worker_threads] threadId", + fn() { + assertEquals(workerThreads.threadId, 0); + }, +}); + +Deno.test({ + name: "[worker_threads] resourceLimits", + fn() { + assertObjectMatch(workerThreads.resourceLimits, {}); + }, +}); + +Deno.test({ + name: "[worker_threads] parentPort", + fn() { + assertEquals(workerThreads.parentPort, null); + }, +}); + +Deno.test({ + name: "[worker_threads] workerData", + fn() { + assertEquals(workerThreads.workerData, null); + }, +}); + +Deno.test({ + name: "[worker_threads] setEnvironmentData / getEnvironmentData", + fn() { + workerThreads.setEnvironmentData("test", "test"); + assertEquals(workerThreads.getEnvironmentData("test"), "test"); + }, +}); + +Deno.test({ + name: "[worker_threads] Worker threadId", + async fn() { + const worker = new workerThreads.Worker( + new URL("./testdata/worker_threads.mjs", import.meta.url), + ); + worker.postMessage("Hello, how are you my thread?"); + await once(worker, "message"); + const message = await once(worker, "message"); + assertEquals(message[0].threadId, 1); + worker.terminate(); + + const worker1 = new workerThreads.Worker( + new URL("./testdata/worker_threads.mjs", import.meta.url), + ); + worker1.postMessage("Hello, how are you my thread?"); + await once(worker1, "message"); + assertEquals((await once(worker1, "message"))[0].threadId, 2); + worker1.terminate(); + }, +}); + +Deno.test({ + name: "[worker_threads] Worker basics", + async fn() { + workerThreads.setEnvironmentData("test", "test"); + workerThreads.setEnvironmentData(1, { + test: "random", + random: "test", + }); + const { port1 } = new MessageChannel(); + const worker = new workerThreads.Worker( + new URL("./testdata/worker_threads.mjs", import.meta.url), + { + workerData: ["hey", true, false, 2, port1], + // deno-lint-ignore no-explicit-any + transferList: [port1 as any], + }, + ); + worker.postMessage("Hello, how are you my thread?"); + assertEquals((await once(worker, "message"))[0], "I'm fine!"); + const data = (await once(worker, "message"))[0]; + // data.threadId can be 1 when this test is runned individually + if (data.threadId === 1) data.threadId = 3; + assertObjectMatch(data, { + isMainThread: false, + threadId: 3, + workerData: ["hey", true, false, 2], + envData: ["test", { test: "random", random: "test" }], + }); + worker.terminate(); + }, + sanitizeResources: false, +}); + +Deno.test({ + name: "[worker_threads] Worker eval", + async fn() { + const worker = new workerThreads.Worker( + ` + import { parentPort } from "node:worker_threads"; + parentPort.postMessage("It works!"); + `, + { + eval: true, + }, + ); + assertEquals((await once(worker, "message"))[0], "It works!"); + worker.terminate(); + }, +}); + +Deno.test({ + name: "[worker_threads] inheritences", + async fn() { + const worker = new workerThreads.Worker( + ` + import { EventEmitter } from "node:events"; + import { parentPort } from "node:worker_threads"; + parentPort.postMessage(parentPort instanceof EventTarget); + await new Promise(resolve => setTimeout(resolve, 100)); + parentPort.postMessage(parentPort instanceof EventEmitter); + `, + { + eval: true, + }, + ); + assertEquals((await once(worker, "message"))[0], true); + assertEquals((await once(worker, "message"))[0], false); + assert(worker instanceof EventEmitter); + assert(!(worker instanceof EventTarget)); + worker.terminate(); + }, +}); + +Deno.test({ + name: "[worker_threads] Worker workerData", + async fn() { + const worker = new workerThreads.Worker( + new URL("./testdata/worker_threads.mjs", import.meta.url), + { + workerData: null, + }, + ); + worker.postMessage("Hello, how are you my thread?"); + await once(worker, "message"); + assertEquals((await once(worker, "message"))[0].workerData, null); + worker.terminate(); + + const worker1 = new workerThreads.Worker( + new URL("./testdata/worker_threads.mjs", import.meta.url), + ); + worker1.postMessage("Hello, how are you my thread?"); + await once(worker1, "message"); + assertEquals((await once(worker1, "message"))[0].workerData, undefined); + worker1.terminate(); + }, +}); + +Deno.test({ + name: "[worker_threads] Worker with relative path", + async fn() { + const worker = new workerThreads.Worker(relative( + Deno.cwd(), + fromFileUrl(new URL("./testdata/worker_threads.mjs", import.meta.url)), + )); + worker.postMessage("Hello, how are you my thread?"); + assertEquals((await once(worker, "message"))[0], "I'm fine!"); + worker.terminate(); + }, +}); diff --git a/ext/node/polyfills/02_init.js b/ext/node/polyfills/02_init.js index dc8955d834..a2fba8c0c5 100644 --- a/ext/node/polyfills/02_init.js +++ b/ext/node/polyfills/02_init.js @@ -46,6 +46,7 @@ function initialize( // FIXME(bartlomieju): not nice to depend on `Deno` namespace here // but it's the only way to get `args` and `version` and this point. internals.__bootstrapNodeProcess(argv0, Deno.args, Deno.version); + internals.__initWorkerThreads(); // `Deno[Deno.internal].requireImpl` will be unreachable after this line. delete internals.requireImpl; } diff --git a/ext/node/polyfills/worker_threads.ts b/ext/node/polyfills/worker_threads.ts index 2c13e4bc8d..8005506bb6 100644 --- a/ext/node/polyfills/worker_threads.ts +++ b/ext/node/polyfills/worker_threads.ts @@ -3,11 +3,11 @@ import { resolve, toFileUrl } from "ext:deno_node/path.ts"; import { notImplemented } from "ext:deno_node/_utils.ts"; -import { EventEmitter } from "ext:deno_node/events.ts"; +import { EventEmitter, once } from "ext:deno_node/events.ts"; import { BroadcastChannel } from "ext:deno_broadcast_channel/01_broadcast_channel.js"; import { MessageChannel, MessagePort } from "ext:deno_web/13_message_port.js"; -const environmentData = new Map(); +let environmentData = new Map(); let threads = 0; export interface WorkerOptions { @@ -48,13 +48,18 @@ class _Worker extends EventEmitter { postMessage: Worker["postMessage"]; constructor(specifier: URL | string, options?: WorkerOptions) { - notImplemented("Worker"); super(); if (options?.eval === true) { specifier = `data:text/javascript,${specifier}`; } else if (typeof specifier === "string") { - // @ts-ignore This API is temporarily disabled - specifier = toFileUrl(resolve(specifier)); + specifier = resolve(specifier); + if (!specifier.toString().endsWith(".mjs")) { + const cwdFileUrl = toFileUrl(Deno.cwd()); + specifier = + `data:text/javascript,(async function() {const { createRequire } = await import("node:module");const require = createRequire("${cwdFileUrl}");require("${specifier}");})();`; + } else { + specifier = toFileUrl(specifier); + } } const handle = this[kHandle] = new Worker( specifier, @@ -95,20 +100,11 @@ class _Worker extends EventEmitter { readonly performance = globalThis.performance; } -export const isMainThread = - // deno-lint-ignore no-explicit-any - (globalThis as any).name !== PRIVATE_WORKER_THREAD_NAME; +export let isMainThread; +export let resourceLimits; -// fake resourceLimits -export const resourceLimits = isMainThread ? {} : { - maxYoungGenerationSizeMb: 48, - maxOldGenerationSizeMb: 2048, - codeRangeSizeMb: 0, - stackSizeMb: 4, -}; - -const threadId = 0; -const workerData: unknown = null; +let threadId = 0; +let workerData: unknown = null; // Like https://github.com/nodejs/node/blob/48655e17e1d84ba5021d7a94b4b88823f7c9c6cf/lib/internal/event_target.js#L611 interface NodeEventTarget extends @@ -131,74 +127,100 @@ interface NodeEventTarget extends type ParentPort = typeof self & NodeEventTarget; // deno-lint-ignore no-explicit-any -const parentPort: ParentPort = null as any; +let parentPort: ParentPort = null as any; -/* -if (!isMainThread) { - // deno-lint-ignore no-explicit-any - delete (globalThis as any).name; - // deno-lint-ignore no-explicit-any - const listeners = new WeakMap<(...args: any[]) => void, (ev: any) => any>(); - - parentPort = self as ParentPort; - parentPort.off = parentPort.removeListener = function ( - this: ParentPort, - name, - listener, - ) { - this.removeEventListener(name, listeners.get(listener)!); - listeners.delete(listener); - return this; - }; - parentPort.on = parentPort.addListener = function ( - this: ParentPort, - name, - listener, - ) { +globalThis.__bootstrap.internals.__initWorkerThreads = () => { + isMainThread = // deno-lint-ignore no-explicit-any - const _listener = (ev: any) => listener(ev.data); - listeners.set(listener, _listener); - this.addEventListener(name, _listener); - return this; + (globalThis as any).name !== PRIVATE_WORKER_THREAD_NAME; + + defaultExport.isMainThread = isMainThread; + // fake resourceLimits + resourceLimits = isMainThread ? {} : { + maxYoungGenerationSizeMb: 48, + maxOldGenerationSizeMb: 2048, + codeRangeSizeMb: 0, + stackSizeMb: 4, }; - parentPort.once = function (this: ParentPort, name, listener) { + defaultExport.resourceLimits = resourceLimits; + + if (!isMainThread) { // deno-lint-ignore no-explicit-any - const _listener = (ev: any) => listener(ev.data); - listeners.set(listener, _listener); - this.addEventListener(name, _listener); - return this; - }; + delete (globalThis as any).name; + // deno-lint-ignore no-explicit-any + const listeners = new WeakMap<(...args: any[]) => void, (ev: any) => any>(); - // mocks - parentPort.setMaxListeners = () => {}; - parentPort.getMaxListeners = () => Infinity; - parentPort.eventNames = () => [""]; - parentPort.listenerCount = () => 0; + parentPort = self as ParentPort; - parentPort.emit = () => notImplemented("parentPort.emit"); - parentPort.removeAllListeners = () => - notImplemented("parentPort.removeAllListeners"); + const initPromise = once( + parentPort, + "message", + ).then((result) => { + // TODO(kt3k): The below values are set asynchronously + // using the first message from the parent. + // This should be done synchronously. + threadId = result[0].data.threadId; + workerData = result[0].data.workerData; + environmentData = result[0].data.environmentData; - // Receive startup message - [{ threadId, workerData, environmentData }] = await once( - parentPort, - "message", - ); + defaultExport.threadId = threadId; + defaultExport.workerData = workerData; + }); - // alias - parentPort.addEventListener("offline", () => { - parentPort.emit("close"); - }); -} -*/ + parentPort.off = parentPort.removeListener = function ( + this: ParentPort, + name, + listener, + ) { + this.removeEventListener(name, listeners.get(listener)!); + listeners.delete(listener); + return this; + }; + parentPort.on = parentPort.addListener = function ( + this: ParentPort, + name, + listener, + ) { + initPromise.then(() => { + // deno-lint-ignore no-explicit-any + const _listener = (ev: any) => listener(ev.data); + listeners.set(listener, _listener); + this.addEventListener(name, _listener); + }); + return this; + }; + + parentPort.once = function (this: ParentPort, name, listener) { + initPromise.then(() => { + // deno-lint-ignore no-explicit-any + const _listener = (ev: any) => listener(ev.data); + listeners.set(listener, _listener); + this.addEventListener(name, _listener); + }); + return this; + }; + + // mocks + parentPort.setMaxListeners = () => {}; + parentPort.getMaxListeners = () => Infinity; + parentPort.eventNames = () => [""]; + parentPort.listenerCount = () => 0; + + parentPort.emit = () => notImplemented("parentPort.emit"); + parentPort.removeAllListeners = () => + notImplemented("parentPort.removeAllListeners"); + + parentPort.addEventListener("offline", () => { + parentPort.emit("close"); + }); + } +}; export function getEnvironmentData(key: unknown) { - notImplemented("getEnvironmentData"); return environmentData.get(key); } export function setEnvironmentData(key: unknown, value?: unknown) { - notImplemented("setEnvironmentData"); if (value === undefined) { environmentData.delete(key); } else { @@ -226,7 +248,7 @@ export { workerData, }; -export default { +const defaultExport = { markAsUntransferable, moveMessagePortToContext, receiveMessageOnPort, @@ -243,3 +265,5 @@ export default { parentPort, isMainThread, }; + +export default defaultExport;