1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2025-01-11 16:42:21 -05:00

fix(ext/node): MessagePort works (#22999)

Closes https://github.com/denoland/deno/issues/22951
Closes https://github.com/denoland/deno/issues/23001

Co-authored-by: Divy Srivastava <dj.srivastava23@gmail.com>
This commit is contained in:
Satya Rohith 2024-04-02 17:06:09 +05:30 committed by GitHub
parent 7ad76fd453
commit 4d66ec91c1
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
10 changed files with 183 additions and 5 deletions

View file

@ -17,6 +17,7 @@ import {
MessagePort, MessagePort,
MessagePortIdSymbol, MessagePortIdSymbol,
MessagePortPrototype, MessagePortPrototype,
nodeWorkerThreadCloseCb,
serializeJsMessageData, serializeJsMessageData,
} from "ext:deno_web/13_message_port.js"; } from "ext:deno_web/13_message_port.js";
import * as webidl from "ext:deno_webidl/00_webidl.js"; import * as webidl from "ext:deno_webidl/00_webidl.js";
@ -342,6 +343,15 @@ internals.__initWorkerThreads = (
defaultExport.parentPort = parentPort; defaultExport.parentPort = parentPort;
defaultExport.threadId = threadId; defaultExport.threadId = threadId;
for (const obj in workerData as Record<string, unknown>) {
if (ObjectPrototypeIsPrototypeOf(MessagePortPrototype, workerData[obj])) {
workerData[obj] = webMessagePortToNodeMessagePort(
workerData[obj] as MessagePort,
);
break;
}
}
parentPort.off = parentPort.removeListener = function ( parentPort.off = parentPort.removeListener = function (
this: ParentPort, this: ParentPort,
name, name,
@ -357,7 +367,22 @@ internals.__initWorkerThreads = (
listener, listener,
) { ) {
// deno-lint-ignore no-explicit-any // deno-lint-ignore no-explicit-any
const _listener = (ev: any) => listener(ev.data); const _listener = (ev: any) => {
let message = ev.data;
if (ObjectPrototypeIsPrototypeOf(MessagePortPrototype, message)) {
message = webMessagePortToNodeMessagePort(message);
} else {
for (const obj in message) {
if (
ObjectPrototypeIsPrototypeOf(MessagePortPrototype, message[obj])
) {
message[obj] = webMessagePortToNodeMessagePort(message[obj]);
break;
}
}
}
return listener(message);
};
listeners.set(listener, _listener); listeners.set(listener, _listener);
this.addEventListener(name, _listener); this.addEventListener(name, _listener);
return this; return this;
@ -424,10 +449,42 @@ export function receiveMessageOnPort(port: MessagePort): object | undefined {
return { message: deserializeJsMessageData(data)[0] }; return { message: deserializeJsMessageData(data)[0] };
} }
class NodeMessageChannel {
port1: MessagePort;
port2: MessagePort;
constructor() {
const { port1, port2 } = new MessageChannel();
this.port1 = webMessagePortToNodeMessagePort(port1);
this.port2 = webMessagePortToNodeMessagePort(port2);
}
}
function webMessagePortToNodeMessagePort(port: MessagePort) {
port.on = port.addListener = function (this: MessagePort, name, listener) {
// deno-lint-ignore no-explicit-any
const _listener = (ev: any) => listener(ev.data);
if (name == "message") {
port.onmessage = _listener;
} else if (name == "messageerror") {
port.onmessageerror = _listener;
} else if (name == "close") {
port.addEventListener("close", _listener);
} else {
throw new Error(`Unknown event: "${name}"`);
}
return this;
};
port[nodeWorkerThreadCloseCb] = () => {
port.dispatchEvent(new Event("close"));
};
return port;
}
export { export {
BroadcastChannel, BroadcastChannel,
MessageChannel,
MessagePort, MessagePort,
NodeMessageChannel as MessageChannel,
NodeWorker as Worker, NodeWorker as Worker,
parentPort, parentPort,
threadId, threadId,
@ -439,7 +496,7 @@ const defaultExport = {
moveMessagePortToContext, moveMessagePortToContext,
receiveMessageOnPort, receiveMessageOnPort,
MessagePort, MessagePort,
MessageChannel, MessageChannel: NodeMessageChannel,
BroadcastChannel, BroadcastChannel,
Worker: NodeWorker, Worker: NodeWorker,
getEnvironmentData, getEnvironmentData,

View file

@ -18,6 +18,7 @@ const {
ArrayPrototypeIncludes, ArrayPrototypeIncludes,
ArrayPrototypePush, ArrayPrototypePush,
ObjectPrototypeIsPrototypeOf, ObjectPrototypeIsPrototypeOf,
ObjectDefineProperty,
Symbol, Symbol,
SymbolFor, SymbolFor,
SymbolIterator, SymbolIterator,
@ -85,6 +86,8 @@ const MessageChannelPrototype = MessageChannel.prototype;
const _id = Symbol("id"); const _id = Symbol("id");
const MessagePortIdSymbol = _id; const MessagePortIdSymbol = _id;
const _enabled = Symbol("enabled"); const _enabled = Symbol("enabled");
const nodeWorkerThreadCloseCb = Symbol("nodeWorkerThreadCloseCb");
const nodeWorkerThreadCloseCbInvoked = Symbol("nodeWorkerThreadCloseCbInvoked");
/** /**
* @param {number} id * @param {number} id
@ -98,6 +101,16 @@ function createMessagePort(id) {
return port; return port;
} }
function nodeWorkerThreadMaybeInvokeCloseCb(port) {
if (
typeof port[nodeWorkerThreadCloseCb] == "function" &&
!port[nodeWorkerThreadCloseCbInvoked]
) {
port[nodeWorkerThreadCloseCb]();
port[nodeWorkerThreadCloseCbInvoked] = true;
}
}
class MessagePort extends EventTarget { class MessagePort extends EventTarget {
/** @type {number | null} */ /** @type {number | null} */
[_id] = null; [_id] = null;
@ -106,6 +119,14 @@ class MessagePort extends EventTarget {
constructor() { constructor() {
super(); super();
ObjectDefineProperty(this, nodeWorkerThreadCloseCb, {
value: null,
enumerable: false,
});
ObjectDefineProperty(this, nodeWorkerThreadCloseCbInvoked, {
value: false,
enumerable: false,
});
webidl.illegalConstructor(); webidl.illegalConstructor();
} }
@ -160,9 +181,13 @@ class MessagePort extends EventTarget {
); );
} catch (err) { } catch (err) {
if (ObjectPrototypeIsPrototypeOf(InterruptedPrototype, err)) break; if (ObjectPrototypeIsPrototypeOf(InterruptedPrototype, err)) break;
nodeWorkerThreadMaybeInvokeCloseCb(this);
throw err; throw err;
} }
if (data === null) break; if (data === null) {
nodeWorkerThreadMaybeInvokeCloseCb(this);
break;
}
let message, transferables; let message, transferables;
try { try {
const v = deserializeJsMessageData(data); const v = deserializeJsMessageData(data);
@ -193,6 +218,7 @@ class MessagePort extends EventTarget {
if (this[_id] !== null) { if (this[_id] !== null) {
core.close(this[_id]); core.close(this[_id]);
this[_id] = null; this[_id] = null;
nodeWorkerThreadMaybeInvokeCloseCb(this);
} }
} }
@ -383,6 +409,7 @@ export {
MessagePort, MessagePort,
MessagePortIdSymbol, MessagePortIdSymbol,
MessagePortPrototype, MessagePortPrototype,
nodeWorkerThreadCloseCb,
serializeJsMessageData, serializeJsMessageData,
structuredClone, structuredClone,
}; };

View file

@ -1,6 +1,7 @@
// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license. // Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.
use test_util::itest; use test_util::itest;
use test_util::itest_flaky;
itest!(worker_error { itest!(worker_error {
args: "run -A workers/worker_error.ts", args: "run -A workers/worker_error.ts",
@ -125,3 +126,15 @@ itest!(node_worker_auto_exits {
output: "workers/node_worker_auto_exits.mjs.out", output: "workers/node_worker_auto_exits.mjs.out",
exit_code: 0, exit_code: 0,
}); });
itest_flaky!(node_worker_message_port {
args: "run --quiet --allow-read workers/node_worker_message_port.mjs",
output: "workers/node_worker_message_port.mjs.out",
exit_code: 0,
});
itest!(node_worker_transfer_port {
args: "run --quiet --allow-read workers/node_worker_transfer_port.mjs",
output: "workers/node_worker_transfer_port.mjs.out",
exit_code: 0,
});

View file

@ -0,0 +1,41 @@
import workerThreads from "node:worker_threads";
const { port1: mainPort, port2: workerPort } = new workerThreads
.MessageChannel();
// Note: not using Promise.withResolver() because it's not available in Node.js
const deferred = createDeferred();
const worker = new workerThreads.Worker(
import.meta.resolve("./node_worker_message_port_1.cjs"),
{
workerData: { workerPort },
transferList: [workerPort],
},
);
worker.on("message", (data) => {
console.log("worker:", data);
mainPort.on("message", (msg) => {
console.log("mainPort:", msg);
deferred.resolve();
});
mainPort.on("close", (_msg) => {
console.log("mainPort closed");
});
});
worker.postMessage("Hello from parent");
await deferred.promise;
await worker.terminate();
mainPort.close();
function createDeferred() {
let resolveCallback;
let rejectCallback;
const promise = new Promise((resolve, reject) => {
resolveCallback = resolve;
rejectCallback = reject;
});
return { promise, resolve: resolveCallback, reject: rejectCallback };
}

View file

@ -0,0 +1,4 @@
worker port closed
worker: Hello from worker on parentPort!
mainPort: Hello from worker on workerPort!
mainPort closed

View file

@ -0,0 +1,9 @@
const { parentPort, workerData } = require("worker_threads");
parentPort.on("message", (msg) => {
const workerPort = workerData.workerPort;
parentPort.postMessage("Hello from worker on parentPort!");
workerPort.postMessage("Hello from worker on workerPort!");
workerPort.on("close", () => console.log("worker port closed"));
workerPort.close();
});

View file

@ -0,0 +1,14 @@
import { MessageChannel, Worker } from "node:worker_threads";
const { port1, port2 } = new MessageChannel();
const worker = new Worker(
import.meta.resolve("./node_worker_transfer_port_1.mjs"),
);
// Send the port directly after the worker is created
worker.postMessage(port2, [port2]);
// Send a message to the worker using the transferred port
port1.postMessage("Hello from main thread!");
worker.on("message", (message) => {
console.log("Received message from worker:", message);
worker.terminate();
});

View file

@ -0,0 +1,3 @@
Worker thread started!
Received message from main thread: Hello from main thread!
Received message from worker: Reply from worker

View file

@ -0,0 +1,10 @@
import { parentPort } from "node:worker_threads";
parentPort.on("message", (message) => {
const transferredPort = message;
transferredPort.on("message", (message) => {
console.log("Received message from main thread:", message);
parentPort.postMessage("Reply from worker");
});
console.log("Worker thread started!");
});

View file

@ -16,7 +16,7 @@ Deno.test("[node/worker_threads] BroadcastChannel is exported", () => {
}); });
Deno.test("[node/worker_threads] MessageChannel are MessagePort are exported", () => { Deno.test("[node/worker_threads] MessageChannel are MessagePort are exported", () => {
assertEquals<unknown>(workerThreads.MessageChannel, MessageChannel); assert(workerThreads.MessageChannel);
assertEquals<unknown>(workerThreads.MessagePort, MessagePort); assertEquals<unknown>(workerThreads.MessagePort, MessagePort);
}); });