1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2024-11-21 15:04:11 -05:00

fix(node): patch MessagePort in worker_thread message (#23871)

Our `MessagePort` to Node's `MessagePort` conversion logic was missing
the case where a `MessagePort` is sent _inside_ the message. This broke
`tinypool` which is used by `vitest` as it relies on some node specific
methods on `MessagePort`.

Fixes https://github.com/denoland/deno/issues/23854 , Fixes
https://github.com/denoland/deno/pull/23871
This commit is contained in:
Marvin Hagemeister 2024-05-20 15:01:40 +02:00 committed by GitHub
parent 927cbb5ecd
commit d7709daaa0
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 50 additions and 10 deletions

View file

@ -32,7 +32,9 @@ import process from "node:process";
const { JSONParse, JSONStringify, ObjectPrototypeIsPrototypeOf } = primordials;
const {
Error,
ObjectHasOwn,
PromiseResolve,
SafeSet,
Symbol,
SymbolFor,
SymbolIterator,
@ -369,7 +371,7 @@ internals.__initWorkerThreads = (
defaultExport.parentPort = parentPort;
defaultExport.threadId = threadId;
workerData = patchMessagePortIfFound(workerData);
patchMessagePortIfFound(workerData);
parentPort.off = parentPort.removeListener = function (
this: ParentPort,
@ -387,8 +389,8 @@ internals.__initWorkerThreads = (
) {
// deno-lint-ignore no-explicit-any
const _listener = (ev: any) => {
let message = ev.data;
message = patchMessagePortIfFound(message);
const message = ev.data;
patchMessagePortIfFound(message);
return listener(message);
};
listeners.set(listener, _listener);
@ -484,7 +486,10 @@ const listeners = new SafeWeakMap<
function webMessagePortToNodeMessagePort(port: MessagePort) {
port.on = port.addListener = function (this: MessagePort, name, listener) {
// deno-lint-ignore no-explicit-any
const _listener = (ev: any) => listener(ev.data);
const _listener = (ev: any) => {
patchMessagePortIfFound(ev.data);
listener(ev.data);
};
if (name == "message") {
if (port.onmessage === null) {
port.onmessage = _listener;
@ -534,19 +539,26 @@ function webMessagePortToNodeMessagePort(port: MessagePort) {
return port;
}
// TODO(@marvinhagemeister): Recursively iterating over all message
// properties seems slow.
// Maybe there is a way we can patch the prototype of MessagePort _only_
// inside worker_threads? For now correctness is more important than perf.
// deno-lint-ignore no-explicit-any
function patchMessagePortIfFound(data: any) {
function patchMessagePortIfFound(data: any, seen = new SafeSet<any>()) {
if (data === null || typeof data !== "object" || seen.has(data)) {
return;
}
seen.add(data);
if (ObjectPrototypeIsPrototypeOf(MessagePortPrototype, data)) {
data = webMessagePortToNodeMessagePort(data);
webMessagePortToNodeMessagePort(data);
} else {
for (const obj in data as Record<string, unknown>) {
if (ObjectPrototypeIsPrototypeOf(MessagePortPrototype, data[obj])) {
data[obj] = webMessagePortToNodeMessagePort(data[obj] as MessagePort);
break;
if (ObjectHasOwn(data, obj)) {
patchMessagePortIfFound(data[obj], seen);
}
}
}
return data;
}
export {

View file

@ -562,3 +562,31 @@ Deno.test({
port1.close();
},
});
// Test for https://github.com/denoland/deno/issues/23854
Deno.test({
name: "[node/worker_threads] MessagePort.addListener is present",
async fn() {
const channel = new workerThreads.MessageChannel();
const worker = new workerThreads.Worker(
`
import { parentPort } from "node:worker_threads";
parentPort.addListener("message", message => {
if (message.foo) {
const success = typeof message.foo.bar.addListener === "function";
parentPort.postMessage(success ? "it works" : "it doesn't work")
}
})
`,
{
eval: true,
},
);
worker.postMessage({ foo: { bar: channel.port1 } }, [channel.port1]);
assertEquals((await once(worker, "message"))[0], "it works");
worker.terminate();
channel.port1.close();
channel.port2.close();
},
});