1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2024-11-21 15:04:11 -05:00

fix(node): fix worker_threads issues blocking Angular support (#26024)

Fixes #22995. Fixes #23000.

There were a handful of bugs here causing the hang (each with a
corresponding minimized test):

- We were canceling recv futures when `receiveMessageOnPort` was called,
but this caused the "receive loop" in the message port to exit. This was
due to the fact that `CancelHandle`s are never reset (i.e., once you
`cancel` a `CancelHandle`, it remains cancelled). That meant that after
`receieveMessageOnPort` was called, the subsequent calls to
`op_message_port_recv_message` would throw `Interrupted` exceptions, and
we would exit the loop.

The cancellation, however, isn't actually necessary.
`op_message_port_recv_message` only borrows the underlying port for long
enough to poll the receiver, so the borrow there could never overlap
with `op_message_port_recv_message_sync`.

- Calling `MessagePort.unref()` caused the "receive loop" in the message
port to exit. This was because we were setting
`messageEventListenerCount` to 0 on unref. Not only does that break the
counter when multiple `MessagePort`s are present in the same thread, but
we also exited the "receive loop" whenever the listener count was 0. I
assume this was to prevent the recv promise from keeping the event loop
open.

Instead of this, I chose to just unref the recv promise as needed to
control the event loop.

- The last bug causing the hang (which was a doozy to debug) ended up
being an unfortunate interaction between how we implement our
messageport "receive loop" and a pattern found in `npm:piscina` (which
angular uses). The gist of it is that piscina uses an atomic wait loop
along with `receiveMessageOnPort` in its worker threads, and as the
worker is getting started, the following incredibly convoluted series of
events occurs:
   1. Parent sends a MessagePort `p` to worker
   2. Parent sends a message `m` to the port `p`
3. Parent notifies the worker with `Atomics.notify` that a new message
is available
   4. Worker receives message, adds "message" listener to port `p`
   5. Adding the listener triggers `MessagePort.start()` on `p`
6. Receive loop in MessagePort.start receives the message `m`, but then
hits an await point and yields (before dispatching the "message" event)
7. Worker continues execution, starts the atomic wait loop, and
immediately receives the existing notification from the parent that a
message is available
8. Worker attempts to receive the new message `m` with
`receiveMessageOnPort`, but this returns `undefined` because the receive
loop already took the message in 6
9. Atomic wait loop continues to next iteration, waiting for the next
message with `Atomic.wait`
10. `Atomic.wait` blocks the worker thread, which prevents the receive
loop from continuing and dispatching the "message" event for the
received message
11. The parent waits for the worker to respond to the first message, and
waits
12. The thread can't make any more progress, and the whole process hangs

The fix I've chosen here (which I don't particularly love, but it works)
is to just delay the `MessagePort.start` call until the end of the event
loop turn, so that the atomic wait loop receives the message first. This
prevents the hang.

---

Those were the main issues causing the hang. There ended up being a few
other small bugs as well, namely `exit` being emitted multiple times,
and not patching up the message port when it's received by
`receiveMessageOnPort`.
This commit is contained in:
Nathan Whitaker 2024-10-04 09:26:32 -07:00 committed by GitHub
parent 7b509e492e
commit dd8cbf5e29
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 278 additions and 29 deletions

View file

@ -302,8 +302,8 @@ class NodeWorker extends EventEmitter {
if (this.#status !== "TERMINATED") {
this.#status = "TERMINATED";
op_host_terminate_worker(this.#id);
this.emit("exit", 0);
}
this.emit("exit", 0);
return PromiseResolve(0);
}
@ -422,7 +422,11 @@ internals.__initWorkerThreads = (
parentPort.once = function (this: ParentPort, name, listener) {
// deno-lint-ignore no-explicit-any
const _listener = (ev: any) => listener(ev.data);
const _listener = (ev: any) => {
const message = ev.data;
patchMessagePortIfFound(message);
return listener(message);
};
listeners.set(listener, _listener);
this.addEventListener(name, _listener);
return this;
@ -494,7 +498,9 @@ export function receiveMessageOnPort(port: MessagePort): object | undefined {
port[MessagePortReceiveMessageOnPortSymbol] = true;
const data = op_message_port_recv_message_sync(port[MessagePortIdSymbol]);
if (data === null) return undefined;
return { message: deserializeJsMessageData(data)[0] };
const message = deserializeJsMessageData(data)[0];
patchMessagePortIfFound(message);
return { message };
}
class NodeMessageChannel {

View file

@ -22,6 +22,7 @@ const {
Symbol,
SymbolFor,
SymbolIterator,
PromiseResolve,
SafeArrayIterator,
TypeError,
} = primordials;
@ -41,7 +42,10 @@ import {
import { isDetachedBuffer } from "./06_streams.js";
import { DOMException } from "./01_dom_exception.js";
let messageEventListenerCount = 0;
// counter of how many message ports are actively refed
// either due to the existence of "message" event listeners or
// explicit calls to ref/unref (in the case of node message ports)
let refedMessagePortsCount = 0;
class MessageChannel {
/** @type {MessagePort} */
@ -93,6 +97,7 @@ const MessagePortReceiveMessageOnPortSymbol = Symbol(
);
const _enabled = Symbol("enabled");
const _refed = Symbol("refed");
const _messageEventListenerCount = Symbol("messageEventListenerCount");
const nodeWorkerThreadCloseCb = Symbol("nodeWorkerThreadCloseCb");
const nodeWorkerThreadCloseCbInvoked = Symbol("nodeWorkerThreadCloseCbInvoked");
export const refMessagePort = Symbol("refMessagePort");
@ -109,6 +114,9 @@ function createMessagePort(id) {
port[core.hostObjectBrand] = core.hostObjectBrand;
setEventTargetData(port);
port[_id] = id;
port[_enabled] = false;
port[_messageEventListenerCount] = 0;
port[_refed] = false;
return port;
}
@ -122,12 +130,18 @@ function nodeWorkerThreadMaybeInvokeCloseCb(port) {
}
}
const _isRefed = Symbol("isRefed");
const _dataPromise = Symbol("dataPromise");
class MessagePort extends EventTarget {
/** @type {number | null} */
[_id] = null;
/** @type {boolean} */
[_enabled] = false;
[_refed] = false;
/** @type {Promise<any> | undefined} */
[_dataPromise] = undefined;
[_messageEventListenerCount] = 0;
constructor() {
super();
@ -193,24 +207,21 @@ class MessagePort extends EventTarget {
this[_enabled] = true;
while (true) {
if (this[_id] === null) break;
// Exit if no message event listeners are present in Node compat mode.
if (
typeof this[nodeWorkerThreadCloseCb] == "function" &&
messageEventListenerCount === 0
) break;
let data;
try {
data = await op_message_port_recv_message(
this[_dataPromise] = op_message_port_recv_message(
this[_id],
);
if (
typeof this[nodeWorkerThreadCloseCb] === "function" &&
!this[_refed]
) {
core.unrefOpPromise(this[_dataPromise]);
}
data = await this[_dataPromise];
this[_dataPromise] = undefined;
} catch (err) {
if (ObjectPrototypeIsPrototypeOf(InterruptedPrototype, err)) {
// If we were interrupted, check if the interruption is coming
// from `receiveMessageOnPort` API from Node compat, if so, continue.
if (this[MessagePortReceiveMessageOnPortSymbol]) {
this[MessagePortReceiveMessageOnPortSymbol] = false;
continue;
}
break;
}
nodeWorkerThreadMaybeInvokeCloseCb(this);
@ -246,12 +257,26 @@ class MessagePort extends EventTarget {
}
[refMessagePort](ref) {
if (ref && !this[_refed]) {
this[_refed] = true;
messageEventListenerCount++;
} else if (!ref && this[_refed]) {
this[_refed] = false;
messageEventListenerCount = 0;
if (ref) {
if (!this[_refed]) {
refedMessagePortsCount++;
if (
this[_dataPromise]
) {
core.refOpPromise(this[_dataPromise]);
}
this[_refed] = true;
}
} else if (!ref) {
if (this[_refed]) {
refedMessagePortsCount--;
if (
this[_dataPromise]
) {
core.unrefOpPromise(this[_dataPromise]);
}
this[_refed] = false;
}
}
}
@ -266,15 +291,20 @@ class MessagePort extends EventTarget {
removeEventListener(...args) {
if (args[0] == "message") {
messageEventListenerCount--;
if (--this[_messageEventListenerCount] === 0 && this[_refed]) {
refedMessagePortsCount--;
this[_refed] = false;
}
}
super.removeEventListener(...new SafeArrayIterator(args));
}
addEventListener(...args) {
if (args[0] == "message") {
messageEventListenerCount++;
if (!this[_refed]) this[_refed] = true;
if (++this[_messageEventListenerCount] === 1 && !this[_refed]) {
refedMessagePortsCount++;
this[_refed] = true;
}
}
super.addEventListener(...new SafeArrayIterator(args));
}
@ -295,7 +325,17 @@ class MessagePort extends EventTarget {
}
defineEventHandler(MessagePort.prototype, "message", function (self) {
self.start();
if (self[nodeWorkerThreadCloseCb]) {
(async () => {
// delay `start()` until he end of this event loop turn, to give `receiveMessageOnPort`
// a chance to receive a message first. this is primarily to resolve an issue with
// a pattern used in `npm:piscina` that results in an indefinite hang
await PromiseResolve();
self.start();
})();
} else {
self.start();
}
});
defineEventHandler(MessagePort.prototype, "messageerror");
@ -463,12 +503,12 @@ function structuredClone(value, options) {
export {
deserializeJsMessageData,
MessageChannel,
messageEventListenerCount,
MessagePort,
MessagePortIdSymbol,
MessagePortPrototype,
MessagePortReceiveMessageOnPortSymbol,
nodeWorkerThreadCloseCb,
refedMessagePortsCount,
serializeJsMessageData,
structuredClone,
};

View file

@ -239,7 +239,6 @@ pub fn op_message_port_recv_message_sync(
#[smi] rid: ResourceId,
) -> Result<Option<JsMessageData>, AnyError> {
let resource = state.resource_table.get::<MessagePortResource>(rid)?;
resource.cancel.cancel();
let mut rx = resource.port.rx.borrow_mut();
match rx.try_recv() {

View file

@ -169,8 +169,11 @@ let isClosing = false;
let globalDispatchEvent;
function hasMessageEventListener() {
// the function name is kind of a misnomer, but we want to behave
// as if we have message event listeners if a node message port is explicitly
// refed (and the inverse as well)
return event.listenerCount(globalThis, "message") > 0 ||
messagePort.messageEventListenerCount > 0;
messagePort.refedMessagePortsCount > 0;
}
async function pollForMessages() {

View file

@ -621,3 +621,204 @@ Deno.test({
worker.terminate();
},
});
Deno.test({
name: "[node/worker_threads] receiveMessageOnPort doesn't exit receive loop",
async fn() {
const worker = new workerThreads.Worker(
`
import { parentPort, receiveMessageOnPort } from "node:worker_threads";
parentPort.on("message", (msg) => {
const port = msg.port;
port.on("message", (msg2) => {
if (msg2 === "c") {
port.postMessage("done");
port.unref();
parentPort.unref();
}
});
parentPort.postMessage("ready");
const msg2 = receiveMessageOnPort(port);
});
`,
{ eval: true },
);
const { port1, port2 } = new workerThreads.MessageChannel();
worker.postMessage({ port: port2 }, [port2]);
const done = Promise.withResolvers<boolean>();
port1.on("message", (msg) => {
assertEquals(msg, "done");
worker.unref();
port1.close();
done.resolve(true);
});
worker.on("message", (msg) => {
assertEquals(msg, "ready");
port1.postMessage("a");
port1.postMessage("b");
port1.postMessage("c");
});
const timeout = setTimeout(() => {
fail("Test timed out");
}, 20_000);
try {
const result = await done.promise;
assertEquals(result, true);
} finally {
clearTimeout(timeout);
}
},
});
Deno.test({
name: "[node/worker_threads] MessagePort.unref doesn't exit receive loop",
async fn() {
const worker = new workerThreads.Worker(
`
import { parentPort } from "node:worker_threads";
const assertEquals = (a, b) => {
if (a !== b) {
throw new Error();
}
};
let state = 0;
parentPort.on("message", (msg) => {
const port = msg.port;
const expect = ["a", "b", "c"];
port.on("message", (msg2) => {
assertEquals(msg2, expect[state++]);
if (msg2 === "c") {
port.postMessage({ type: "done", got: msg2 });
parentPort.unref();
}
});
port.unref();
parentPort.postMessage("ready");
});
`,
{ eval: true },
);
const { port1, port2 } = new workerThreads.MessageChannel();
const done = Promise.withResolvers<boolean>();
port1.on("message", (msg) => {
assertEquals(msg.type, "done");
assertEquals(msg.got, "c");
worker.unref();
port1.close();
done.resolve(true);
});
worker.on("message", (msg) => {
assertEquals(msg, "ready");
port1.postMessage("a");
port1.postMessage("b");
port1.postMessage("c");
});
worker.postMessage({ port: port2 }, [port2]);
const timeout = setTimeout(() => {
fail("Test timed out");
}, 20_000);
try {
const result = await done.promise;
assertEquals(result, true);
} finally {
clearTimeout(timeout);
}
},
});
Deno.test({
name: "[node/worker_threads] npm:piscina wait loop hang regression",
async fn() {
const worker = new workerThreads.Worker(
`
import { assert, assertEquals } from "@std/assert";
import { parentPort, receiveMessageOnPort } from "node:worker_threads";
assert(parentPort !== null);
let currentTasks = 0;
let lastSeen = 0;
parentPort.on("message", (msg) => {
(async () => {
assert(typeof msg === "object" && msg !== null);
assert(msg.buf !== undefined);
assert(msg.port !== undefined);
const { buf, port } = msg;
port.postMessage("ready");
port.on("message", (msg) => onMessage(msg, buf, port));
atomicsWaitLoop(buf, port);
})();
});
function onMessage(msg, buf, port) {
currentTasks++;
(async () => {
assert(msg.taskName !== undefined);
port.postMessage({ type: "response", taskName: msg.taskName });
currentTasks--;
atomicsWaitLoop(buf, port);
})();
}
function atomicsWaitLoop(buf, port) {
while (currentTasks === 0) {
Atomics.wait(buf, 0, lastSeen);
lastSeen = Atomics.load(buf, 0);
let task;
while ((task = receiveMessageOnPort(port)) !== undefined) {
onMessage(task.message, buf, port);
}
}
}
`,
{ eval: true },
);
const sab = new SharedArrayBuffer(4);
const buf = new Int32Array(sab);
const { port1, port2 } = new workerThreads.MessageChannel();
const done = Promise.withResolvers<boolean>();
port1.unref();
worker.postMessage({
type: "init",
buf,
port: port2,
}, [port2]);
let count = 0;
port1.on("message", (msg) => {
if (count++ === 0) {
assertEquals(msg, "ready");
} else {
assertEquals(msg.type, "response");
port1.close();
done.resolve(true);
}
});
port1.postMessage({
taskName: "doThing",
});
Atomics.add(buf, 0, 1);
Atomics.notify(buf, 0, 1);
worker.unref();
const result = await done.promise;
assertEquals(result, true);
},
});