1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2025-01-12 09:03:42 -05:00

fix(ext/node): add basic node:worker_threads support (#19192)

This PR restores `node:worker_threads` implementation and test cases
from
[`std@0.175.0/node`](https://github.com/denoland/deno_std/blob/0.175.0/node/worker_threads.ts).

---------

Co-authored-by: Bartek Iwańczuk <biwanczuk@gmail.com>
This commit is contained in:
Yoshiya Hinosawa 2023-05-24 03:56:29 +09:00 committed by GitHub
parent 3d865949c2
commit 26f42a248f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 324 additions and 84 deletions

View file

@ -1,7 +1,7 @@
{ {
"nodeVersion": "18.12.1", "nodeVersion": "18.12.1",
"ignore": { "ignore": {
"common": ["index.js", "internet.js", "tmpdir.js"], "common": ["index.js", "internet.js"],
"fixtures": [ "fixtures": [
"child-process-spawn-node.js", "child-process-spawn-node.js",
"echo.js", "echo.js",
@ -121,7 +121,8 @@
"fixtures.js", "fixtures.js",
"hijackstdio.js", "hijackstdio.js",
"index.mjs", "index.mjs",
"internet.js" "internet.js",
"tmpdir.js"
], ],
"fixtures": [ "fixtures": [
"GH-1899-output.js", "GH-1899-output.js",

View file

@ -2,14 +2,14 @@
// deno-lint-ignore-file // deno-lint-ignore-file
// Copyright Joyent and Node contributors. All rights reserved. MIT license. // Copyright Joyent and Node contributors. All rights reserved. MIT license.
// Taken from Node 16.13.0 // Taken from Node 18.12.1
// This file is automatically generated by "node/_tools/setup.ts". Do not modify this file manually // This file is automatically generated by "node/_tools/setup.ts". Do not modify this file manually
'use strict'; 'use strict';
const fs = require('fs'); const fs = require('fs');
const path = require('path'); const path = require('path');
// const { isMainThread } = require('worker_threads'); const { isMainThread } = require('worker_threads');
function rmSync(pathname) { function rmSync(pathname) {
fs.rmSync(pathname, { maxRetries: 3, recursive: true, force: true }); fs.rmSync(pathname, { maxRetries: 3, recursive: true, force: true });
@ -26,8 +26,8 @@ const tmpPath = path.join(testRoot, tmpdirName);
let firstRefresh = true; let firstRefresh = true;
function refresh() { function refresh() {
rmSync(this.path); rmSync(tmpPath);
fs.mkdirSync(this.path); fs.mkdirSync(tmpPath);
if (firstRefresh) { if (firstRefresh) {
firstRefresh = false; firstRefresh = false;
@ -39,9 +39,8 @@ function refresh() {
function onexit() { function onexit() {
// Change directory to avoid possible EBUSY // Change directory to avoid possible EBUSY
// TODO(f3n67u): uncomment when `worker_thread.isMainThread` implemented if (isMainThread)
// if (isMainThread) process.chdir(testRoot);
// process.chdir(testRoot);
try { try {
rmSync(tmpPath); rmSync(tmpPath);

View file

@ -0,0 +1,34 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
import {
getEnvironmentData,
isMainThread,
parentPort,
threadId,
workerData,
} from "node:worker_threads";
import { once } from "node:events";
async function message(expectedMessage) {
const [message] = await once(parentPort, "message");
if (message !== expectedMessage) {
console.log(`Expected the message "${expectedMessage}", but got`, message);
// fail test
parentPort.close();
}
}
await message("Hello, how are you my thread?");
parentPort.postMessage("I'm fine!");
await new Promise((resolve) => setTimeout(resolve, 100));
parentPort.postMessage({
isMainThread,
threadId,
workerData: Array.isArray(workerData) &&
workerData[workerData.length - 1] instanceof MessagePort
? workerData.slice(0, -1)
: workerData,
envData: [getEnvironmentData("test"), getEnvironmentData(1)],
});

View file

@ -1,7 +1,13 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. // Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
import { assertEquals } from "../../../test_util/std/testing/asserts.ts"; import {
import workerThreads from "node:worker_threads"; assert,
assertEquals,
assertObjectMatch,
} from "../../../test_util/std/testing/asserts.ts";
import { fromFileUrl, relative } from "../../../test_util/std/path/mod.ts";
import * as workerThreads from "node:worker_threads";
import { EventEmitter, once } from "node:events";
Deno.test("[node/worker_threads] BroadcastChannel is exported", () => { Deno.test("[node/worker_threads] BroadcastChannel is exported", () => {
assertEquals<unknown>(workerThreads.BroadcastChannel, BroadcastChannel); assertEquals<unknown>(workerThreads.BroadcastChannel, BroadcastChannel);
@ -11,3 +17,178 @@ Deno.test("[node/worker_threads] MessageChannel are MessagePort are exported", (
assertEquals<unknown>(workerThreads.MessageChannel, MessageChannel); assertEquals<unknown>(workerThreads.MessageChannel, MessageChannel);
assertEquals<unknown>(workerThreads.MessagePort, MessagePort); assertEquals<unknown>(workerThreads.MessagePort, MessagePort);
}); });
Deno.test({
name: "[worker_threads] isMainThread",
fn() {
assertEquals(workerThreads.isMainThread, true);
},
});
Deno.test({
name: "[worker_threads] threadId",
fn() {
assertEquals(workerThreads.threadId, 0);
},
});
Deno.test({
name: "[worker_threads] resourceLimits",
fn() {
assertObjectMatch(workerThreads.resourceLimits, {});
},
});
Deno.test({
name: "[worker_threads] parentPort",
fn() {
assertEquals(workerThreads.parentPort, null);
},
});
Deno.test({
name: "[worker_threads] workerData",
fn() {
assertEquals(workerThreads.workerData, null);
},
});
Deno.test({
name: "[worker_threads] setEnvironmentData / getEnvironmentData",
fn() {
workerThreads.setEnvironmentData("test", "test");
assertEquals(workerThreads.getEnvironmentData("test"), "test");
},
});
Deno.test({
name: "[worker_threads] Worker threadId",
async fn() {
const worker = new workerThreads.Worker(
new URL("./testdata/worker_threads.mjs", import.meta.url),
);
worker.postMessage("Hello, how are you my thread?");
await once(worker, "message");
const message = await once(worker, "message");
assertEquals(message[0].threadId, 1);
worker.terminate();
const worker1 = new workerThreads.Worker(
new URL("./testdata/worker_threads.mjs", import.meta.url),
);
worker1.postMessage("Hello, how are you my thread?");
await once(worker1, "message");
assertEquals((await once(worker1, "message"))[0].threadId, 2);
worker1.terminate();
},
});
Deno.test({
name: "[worker_threads] Worker basics",
async fn() {
workerThreads.setEnvironmentData("test", "test");
workerThreads.setEnvironmentData(1, {
test: "random",
random: "test",
});
const { port1 } = new MessageChannel();
const worker = new workerThreads.Worker(
new URL("./testdata/worker_threads.mjs", import.meta.url),
{
workerData: ["hey", true, false, 2, port1],
// deno-lint-ignore no-explicit-any
transferList: [port1 as any],
},
);
worker.postMessage("Hello, how are you my thread?");
assertEquals((await once(worker, "message"))[0], "I'm fine!");
const data = (await once(worker, "message"))[0];
// data.threadId can be 1 when this test is runned individually
if (data.threadId === 1) data.threadId = 3;
assertObjectMatch(data, {
isMainThread: false,
threadId: 3,
workerData: ["hey", true, false, 2],
envData: ["test", { test: "random", random: "test" }],
});
worker.terminate();
},
sanitizeResources: false,
});
Deno.test({
name: "[worker_threads] Worker eval",
async fn() {
const worker = new workerThreads.Worker(
`
import { parentPort } from "node:worker_threads";
parentPort.postMessage("It works!");
`,
{
eval: true,
},
);
assertEquals((await once(worker, "message"))[0], "It works!");
worker.terminate();
},
});
Deno.test({
name: "[worker_threads] inheritences",
async fn() {
const worker = new workerThreads.Worker(
`
import { EventEmitter } from "node:events";
import { parentPort } from "node:worker_threads";
parentPort.postMessage(parentPort instanceof EventTarget);
await new Promise(resolve => setTimeout(resolve, 100));
parentPort.postMessage(parentPort instanceof EventEmitter);
`,
{
eval: true,
},
);
assertEquals((await once(worker, "message"))[0], true);
assertEquals((await once(worker, "message"))[0], false);
assert(worker instanceof EventEmitter);
assert(!(worker instanceof EventTarget));
worker.terminate();
},
});
Deno.test({
name: "[worker_threads] Worker workerData",
async fn() {
const worker = new workerThreads.Worker(
new URL("./testdata/worker_threads.mjs", import.meta.url),
{
workerData: null,
},
);
worker.postMessage("Hello, how are you my thread?");
await once(worker, "message");
assertEquals((await once(worker, "message"))[0].workerData, null);
worker.terminate();
const worker1 = new workerThreads.Worker(
new URL("./testdata/worker_threads.mjs", import.meta.url),
);
worker1.postMessage("Hello, how are you my thread?");
await once(worker1, "message");
assertEquals((await once(worker1, "message"))[0].workerData, undefined);
worker1.terminate();
},
});
Deno.test({
name: "[worker_threads] Worker with relative path",
async fn() {
const worker = new workerThreads.Worker(relative(
Deno.cwd(),
fromFileUrl(new URL("./testdata/worker_threads.mjs", import.meta.url)),
));
worker.postMessage("Hello, how are you my thread?");
assertEquals((await once(worker, "message"))[0], "I'm fine!");
worker.terminate();
},
});

View file

@ -46,6 +46,7 @@ function initialize(
// FIXME(bartlomieju): not nice to depend on `Deno` namespace here // FIXME(bartlomieju): not nice to depend on `Deno` namespace here
// but it's the only way to get `args` and `version` and this point. // but it's the only way to get `args` and `version` and this point.
internals.__bootstrapNodeProcess(argv0, Deno.args, Deno.version); internals.__bootstrapNodeProcess(argv0, Deno.args, Deno.version);
internals.__initWorkerThreads();
// `Deno[Deno.internal].requireImpl` will be unreachable after this line. // `Deno[Deno.internal].requireImpl` will be unreachable after this line.
delete internals.requireImpl; delete internals.requireImpl;
} }

View file

@ -3,11 +3,11 @@
import { resolve, toFileUrl } from "ext:deno_node/path.ts"; import { resolve, toFileUrl } from "ext:deno_node/path.ts";
import { notImplemented } from "ext:deno_node/_utils.ts"; import { notImplemented } from "ext:deno_node/_utils.ts";
import { EventEmitter } from "ext:deno_node/events.ts"; import { EventEmitter, once } from "ext:deno_node/events.ts";
import { BroadcastChannel } from "ext:deno_broadcast_channel/01_broadcast_channel.js"; import { BroadcastChannel } from "ext:deno_broadcast_channel/01_broadcast_channel.js";
import { MessageChannel, MessagePort } from "ext:deno_web/13_message_port.js"; import { MessageChannel, MessagePort } from "ext:deno_web/13_message_port.js";
const environmentData = new Map(); let environmentData = new Map();
let threads = 0; let threads = 0;
export interface WorkerOptions { export interface WorkerOptions {
@ -48,13 +48,18 @@ class _Worker extends EventEmitter {
postMessage: Worker["postMessage"]; postMessage: Worker["postMessage"];
constructor(specifier: URL | string, options?: WorkerOptions) { constructor(specifier: URL | string, options?: WorkerOptions) {
notImplemented("Worker");
super(); super();
if (options?.eval === true) { if (options?.eval === true) {
specifier = `data:text/javascript,${specifier}`; specifier = `data:text/javascript,${specifier}`;
} else if (typeof specifier === "string") { } else if (typeof specifier === "string") {
// @ts-ignore This API is temporarily disabled specifier = resolve(specifier);
specifier = toFileUrl(resolve(specifier)); if (!specifier.toString().endsWith(".mjs")) {
const cwdFileUrl = toFileUrl(Deno.cwd());
specifier =
`data:text/javascript,(async function() {const { createRequire } = await import("node:module");const require = createRequire("${cwdFileUrl}");require("${specifier}");})();`;
} else {
specifier = toFileUrl(specifier);
}
} }
const handle = this[kHandle] = new Worker( const handle = this[kHandle] = new Worker(
specifier, specifier,
@ -95,20 +100,11 @@ class _Worker extends EventEmitter {
readonly performance = globalThis.performance; readonly performance = globalThis.performance;
} }
export const isMainThread = export let isMainThread;
// deno-lint-ignore no-explicit-any export let resourceLimits;
(globalThis as any).name !== PRIVATE_WORKER_THREAD_NAME;
// fake resourceLimits let threadId = 0;
export const resourceLimits = isMainThread ? {} : { let workerData: unknown = null;
maxYoungGenerationSizeMb: 48,
maxOldGenerationSizeMb: 2048,
codeRangeSizeMb: 0,
stackSizeMb: 4,
};
const threadId = 0;
const workerData: unknown = null;
// Like https://github.com/nodejs/node/blob/48655e17e1d84ba5021d7a94b4b88823f7c9c6cf/lib/internal/event_target.js#L611 // Like https://github.com/nodejs/node/blob/48655e17e1d84ba5021d7a94b4b88823f7c9c6cf/lib/internal/event_target.js#L611
interface NodeEventTarget extends interface NodeEventTarget extends
@ -131,9 +127,23 @@ interface NodeEventTarget extends
type ParentPort = typeof self & NodeEventTarget; type ParentPort = typeof self & NodeEventTarget;
// deno-lint-ignore no-explicit-any // deno-lint-ignore no-explicit-any
const parentPort: ParentPort = null as any; let parentPort: ParentPort = null as any;
globalThis.__bootstrap.internals.__initWorkerThreads = () => {
isMainThread =
// deno-lint-ignore no-explicit-any
(globalThis as any).name !== PRIVATE_WORKER_THREAD_NAME;
defaultExport.isMainThread = isMainThread;
// fake resourceLimits
resourceLimits = isMainThread ? {} : {
maxYoungGenerationSizeMb: 48,
maxOldGenerationSizeMb: 2048,
codeRangeSizeMb: 0,
stackSizeMb: 4,
};
defaultExport.resourceLimits = resourceLimits;
/*
if (!isMainThread) { if (!isMainThread) {
// deno-lint-ignore no-explicit-any // deno-lint-ignore no-explicit-any
delete (globalThis as any).name; delete (globalThis as any).name;
@ -141,6 +151,22 @@ if (!isMainThread) {
const listeners = new WeakMap<(...args: any[]) => void, (ev: any) => any>(); const listeners = new WeakMap<(...args: any[]) => void, (ev: any) => any>();
parentPort = self as ParentPort; parentPort = self as ParentPort;
const initPromise = once(
parentPort,
"message",
).then((result) => {
// TODO(kt3k): The below values are set asynchronously
// using the first message from the parent.
// This should be done synchronously.
threadId = result[0].data.threadId;
workerData = result[0].data.workerData;
environmentData = result[0].data.environmentData;
defaultExport.threadId = threadId;
defaultExport.workerData = workerData;
});
parentPort.off = parentPort.removeListener = function ( parentPort.off = parentPort.removeListener = function (
this: ParentPort, this: ParentPort,
name, name,
@ -155,17 +181,22 @@ if (!isMainThread) {
name, name,
listener, listener,
) { ) {
initPromise.then(() => {
// deno-lint-ignore no-explicit-any // deno-lint-ignore no-explicit-any
const _listener = (ev: any) => listener(ev.data); const _listener = (ev: any) => listener(ev.data);
listeners.set(listener, _listener); listeners.set(listener, _listener);
this.addEventListener(name, _listener); this.addEventListener(name, _listener);
});
return this; return this;
}; };
parentPort.once = function (this: ParentPort, name, listener) { parentPort.once = function (this: ParentPort, name, listener) {
initPromise.then(() => {
// deno-lint-ignore no-explicit-any // deno-lint-ignore no-explicit-any
const _listener = (ev: any) => listener(ev.data); const _listener = (ev: any) => listener(ev.data);
listeners.set(listener, _listener); listeners.set(listener, _listener);
this.addEventListener(name, _listener); this.addEventListener(name, _listener);
});
return this; return this;
}; };
@ -179,26 +210,17 @@ if (!isMainThread) {
parentPort.removeAllListeners = () => parentPort.removeAllListeners = () =>
notImplemented("parentPort.removeAllListeners"); notImplemented("parentPort.removeAllListeners");
// Receive startup message
[{ threadId, workerData, environmentData }] = await once(
parentPort,
"message",
);
// alias
parentPort.addEventListener("offline", () => { parentPort.addEventListener("offline", () => {
parentPort.emit("close"); parentPort.emit("close");
}); });
} }
*/ };
export function getEnvironmentData(key: unknown) { export function getEnvironmentData(key: unknown) {
notImplemented("getEnvironmentData");
return environmentData.get(key); return environmentData.get(key);
} }
export function setEnvironmentData(key: unknown, value?: unknown) { export function setEnvironmentData(key: unknown, value?: unknown) {
notImplemented("setEnvironmentData");
if (value === undefined) { if (value === undefined) {
environmentData.delete(key); environmentData.delete(key);
} else { } else {
@ -226,7 +248,7 @@ export {
workerData, workerData,
}; };
export default { const defaultExport = {
markAsUntransferable, markAsUntransferable,
moveMessagePortToContext, moveMessagePortToContext,
receiveMessageOnPort, receiveMessageOnPort,
@ -243,3 +265,5 @@ export default {
parentPort, parentPort,
isMainThread, isMainThread,
}; };
export default defaultExport;