1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2025-01-11 08:33:43 -05:00

fix(ext/node): implement MessagePort.unref() (#23278)

Closes https://github.com/denoland/deno/issues/23252
Closes https://github.com/denoland/deno/issues/23264
This commit is contained in:
Satya Rohith 2024-04-09 23:45:55 +05:30 committed by GitHub
parent fad12b7c2e
commit 5a3ee6d9af
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 92 additions and 1 deletions

View file

@ -18,7 +18,9 @@ import {
MessagePortIdSymbol,
MessagePortPrototype,
nodeWorkerThreadCloseCb,
refMessagePort,
serializeJsMessageData,
unrefPollForMessages,
} from "ext:deno_web/13_message_port.js";
import * as webidl from "ext:deno_webidl/00_webidl.js";
import { notImplemented } from "ext:deno_node/_utils.ts";
@ -398,6 +400,12 @@ internals.__initWorkerThreads = (
parentPort.addEventListener("offline", () => {
parentPort.emit("close");
});
parentPort.unref = () => {
parentPort[unrefPollForMessages] = true;
};
parentPort.ref = () => {
parentPort[unrefPollForMessages] = false;
};
}
};
@ -467,6 +475,12 @@ function webMessagePortToNodeMessagePort(port: MessagePort) {
port[nodeWorkerThreadCloseCb] = () => {
port.dispatchEvent(new Event("close"));
};
port.unref = () => {
port[refMessagePort](false);
};
port.ref = () => {
port[refMessagePort](true);
};
return port;
}

View file

@ -89,8 +89,13 @@ const MessageChannelPrototype = MessageChannel.prototype;
const _id = Symbol("id");
const MessagePortIdSymbol = _id;
const _enabled = Symbol("enabled");
const _refed = Symbol("refed");
const nodeWorkerThreadCloseCb = Symbol("nodeWorkerThreadCloseCb");
const nodeWorkerThreadCloseCbInvoked = Symbol("nodeWorkerThreadCloseCbInvoked");
export const refMessagePort = Symbol("refMessagePort");
/** It is used by 99_main.js and worker_threads to
* unref/ref on the global pollForMessages promise. */
export const unrefPollForMessages = Symbol("unrefPollForMessages");
/**
* @param {number} id
@ -119,6 +124,7 @@ class MessagePort extends EventTarget {
[_id] = null;
/** @type {boolean} */
[_enabled] = false;
[_refed] = false;
constructor() {
super();
@ -216,6 +222,16 @@ class MessagePort extends EventTarget {
})();
}
[refMessagePort](ref) {
if (ref && !this[_refed]) {
this[_refed] = true;
messageEventListenerCount++;
} else if (!ref && this[_refed]) {
this[_refed] = false;
messageEventListenerCount = 0;
}
}
close() {
webidl.assertBranded(this, MessagePortPrototype);
if (this[_id] !== null) {
@ -235,6 +251,7 @@ class MessagePort extends EventTarget {
addEventListener(...args) {
if (args[0] == "message") {
messageEventListenerCount++;
if (!this[_refed]) this[_refed] = true;
}
super.addEventListener(...new SafeArrayIterator(args));
}

View file

@ -294,7 +294,12 @@ async function pollForMessages() {
);
}
while (!isClosing) {
const data = await op_worker_recv_message();
const recvMessage = op_worker_recv_message();
if (globalThis[messagePort.unrefPollForMessages] === true) {
core.unrefOpPromise(recvMessage);
}
const data = await recvMessage;
// const data = await op_worker_recv_message();
if (data === null) break;
const v = messagePort.deserializeJsMessageData(data);
const message = v[0];

View file

@ -137,3 +137,16 @@ itest!(node_worker_transfer_port {
output: "workers/node_worker_transfer_port.mjs.out",
exit_code: 0,
});
itest!(node_worker_message_port_unref {
args: "run --quiet --allow-env --allow-read workers/node_worker_message_port_unref.mjs",
output: "workers/node_worker_message_port_unref.mjs.out",
exit_code: 0,
});
itest!(node_worker_parent_port_unref {
envs: vec![("PARENT_PORT".into(), "1".into())],
args: "run --quiet --allow-env --allow-read workers/node_worker_message_port_unref.mjs",
output: "workers/node_worker_message_port_unref.mjs.out",
exit_code: 0,
});

View file

@ -0,0 +1,40 @@
import {
isMainThread,
MessageChannel,
parentPort,
Worker,
workerData,
} from "node:worker_threads";
const useParentPort = Deno.env.get("PARENT_PORT") === "1";
if (useParentPort) {
if (isMainThread) {
const worker = new Worker(import.meta.filename);
worker.postMessage("main says hi!");
worker.on("message", (msg) => console.log(msg));
} else {
parentPort.on("message", (msg) => {
console.log(msg);
parentPort.postMessage("worker says hi!");
parentPort.unref();
});
}
} else {
if (isMainThread) {
const { port1, port2 } = new MessageChannel();
const worker = new Worker(import.meta.filename, {
workerData: port2,
transferList: [port2],
});
port1.postMessage("main says hi!");
port1.on("message", (msg) => console.log(msg));
} else {
const port = workerData;
port.on("message", (msg) => {
console.log(msg);
port.postMessage("worker says hi!");
port.unref();
});
}
}

View file

@ -0,0 +1,2 @@
main says hi!
worker says hi!