From 511d13abaf5cc98cc01250f7adc5edf15dc29276 Mon Sep 17 00:00:00 2001 From: Nathan Whitaker <17734409+nathanwhit@users.noreply.github.com> Date: Tue, 27 Aug 2024 20:05:32 -0700 Subject: [PATCH] fix(ext/node): emit `online` event after worker thread is initialized (#25243) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fixes #23281. Part of #20613. We were emitting the `online` event in the constructor, so the caller could never receive it (since there was no time for them to add a listener). Instead, emit the event where it's intended – after the worker is initialized. --- After this parcel no longer freezes, but still will fail due to other bugs (which will be fixed in other PRs) --- ext/node/polyfills/worker_threads.ts | 37 +++++++++++++++++++++++--- tests/unit_node/worker_threads_test.ts | 31 +++++++++++++++++++++ 2 files changed, 65 insertions(+), 3 deletions(-) diff --git a/ext/node/polyfills/worker_threads.ts b/ext/node/polyfills/worker_threads.ts index eaabe9cd7e..24bcbe057b 100644 --- a/ext/node/polyfills/worker_threads.ts +++ b/ext/node/polyfills/worker_threads.ts @@ -52,6 +52,16 @@ function debugWT(...args) { } } +interface WorkerOnlineMsg { + type: "WORKER_ONLINE"; +} + +function isWorkerOnlineMsg(data: unknown): data is WorkerOnlineMsg { + return typeof data === "object" && data !== null && + ObjectHasOwn(data, "type") && + (data as { "type": unknown })["type"] === "WORKER_ONLINE"; +} + export interface WorkerOptions { // only for typings argv?: unknown[]; @@ -81,6 +91,7 @@ class NodeWorker extends EventEmitter { #refCount = 1; #messagePromise = undefined; #controlPromise = undefined; + #workerOnline = false; // "RUNNING" | "CLOSED" | "TERMINATED" // "TERMINATED" means that any controls or messages received will be // discarded. "CLOSED" means that we have received a control @@ -141,6 +152,7 @@ class NodeWorker extends EventEmitter { workerData: options?.workerData, environmentData: environmentData, env: env_, + isWorkerThread: true, }, options?.transferList ?? []); const id = op_create_worker( { @@ -159,8 +171,6 @@ class NodeWorker extends EventEmitter { this.threadId = id; this.#pollControl(); this.#pollMessages(); - // https://nodejs.org/api/worker_threads.html#event-online - this.emit("online"); } [privateWorkerRef](ref) { @@ -243,7 +253,17 @@ class NodeWorker extends EventEmitter { this.emit("messageerror", err); return; } - this.emit("message", message); + if ( + // only emit "online" event once, and since the message + // has to come before user messages, we are safe to assume + // it came from us + !this.#workerOnline && isWorkerOnlineMsg(message) + ) { + this.#workerOnline = true; + this.emit("online"); + } else { + this.emit("message", message); + } } }; @@ -358,10 +378,12 @@ internals.__initWorkerThreads = ( parentPort = globalThis as ParentPort; threadId = workerId; + let isWorkerThread = false; if (maybeWorkerMetadata) { const { 0: metadata, 1: _ } = maybeWorkerMetadata; workerData = metadata.workerData; environmentData = metadata.environmentData; + isWorkerThread = metadata.isWorkerThread; const env = metadata.env; if (env) { process.env = env; @@ -425,6 +447,15 @@ internals.__initWorkerThreads = ( parentPort.ref = () => { parentPort[unrefPollForMessages] = false; }; + + if (isWorkerThread) { + // Notify the host that the worker is online + parentPort.postMessage( + { + type: "WORKER_ONLINE", + } satisfies WorkerOnlineMsg, + ); + } } }; diff --git a/tests/unit_node/worker_threads_test.ts b/tests/unit_node/worker_threads_test.ts index d9fe9f77d5..ac797601f5 100644 --- a/tests/unit_node/worker_threads_test.ts +++ b/tests/unit_node/worker_threads_test.ts @@ -590,3 +590,34 @@ Deno.test({ channel.port2.close(); }, }); + +Deno.test({ + name: "[node/worker_threads] Emits online event", + async fn() { + const worker = new workerThreads.Worker( + ` + import { parentPort } from "node:worker_threads"; + const p = Promise.withResolvers(); + let ok = false; + parentPort.on("message", () => { + ok = true; + p.resolve(); + }); + await Promise.race([p.promise, new Promise(resolve => setTimeout(resolve, 20000))]); + if (ok) { + parentPort.postMessage("ok"); + } else { + parentPort.postMessage("timed out"); + } + `, + { + eval: true, + }, + ); + worker.on("online", () => { + worker.postMessage("ok"); + }); + assertEquals((await once(worker, "message"))[0], "ok"); + worker.terminate(); + }, +});