1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2024-11-25 15:29:32 -05:00

fix(ext/node): emit online event after worker thread is initialized (#25243)

Fixes #23281. Part of #20613.

We were emitting the `online` event in the constructor, so the caller
could never receive it (since there was no time for them to add a
listener). Instead, emit the event where it's intended – after the
worker is initialized.

---

After this parcel no longer freezes, but still will fail due to other
bugs (which will be fixed in other PRs)
This commit is contained in:
Nathan Whitaker 2024-08-27 20:05:32 -07:00 committed by GitHub
parent 3dba98532a
commit 511d13abaf
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 65 additions and 3 deletions

View file

@ -52,6 +52,16 @@ function debugWT(...args) {
} }
} }
interface WorkerOnlineMsg {
type: "WORKER_ONLINE";
}
function isWorkerOnlineMsg(data: unknown): data is WorkerOnlineMsg {
return typeof data === "object" && data !== null &&
ObjectHasOwn(data, "type") &&
(data as { "type": unknown })["type"] === "WORKER_ONLINE";
}
export interface WorkerOptions { export interface WorkerOptions {
// only for typings // only for typings
argv?: unknown[]; argv?: unknown[];
@ -81,6 +91,7 @@ class NodeWorker extends EventEmitter {
#refCount = 1; #refCount = 1;
#messagePromise = undefined; #messagePromise = undefined;
#controlPromise = undefined; #controlPromise = undefined;
#workerOnline = false;
// "RUNNING" | "CLOSED" | "TERMINATED" // "RUNNING" | "CLOSED" | "TERMINATED"
// "TERMINATED" means that any controls or messages received will be // "TERMINATED" means that any controls or messages received will be
// discarded. "CLOSED" means that we have received a control // discarded. "CLOSED" means that we have received a control
@ -141,6 +152,7 @@ class NodeWorker extends EventEmitter {
workerData: options?.workerData, workerData: options?.workerData,
environmentData: environmentData, environmentData: environmentData,
env: env_, env: env_,
isWorkerThread: true,
}, options?.transferList ?? []); }, options?.transferList ?? []);
const id = op_create_worker( const id = op_create_worker(
{ {
@ -159,8 +171,6 @@ class NodeWorker extends EventEmitter {
this.threadId = id; this.threadId = id;
this.#pollControl(); this.#pollControl();
this.#pollMessages(); this.#pollMessages();
// https://nodejs.org/api/worker_threads.html#event-online
this.emit("online");
} }
[privateWorkerRef](ref) { [privateWorkerRef](ref) {
@ -243,7 +253,17 @@ class NodeWorker extends EventEmitter {
this.emit("messageerror", err); this.emit("messageerror", err);
return; return;
} }
this.emit("message", message); if (
// only emit "online" event once, and since the message
// has to come before user messages, we are safe to assume
// it came from us
!this.#workerOnline && isWorkerOnlineMsg(message)
) {
this.#workerOnline = true;
this.emit("online");
} else {
this.emit("message", message);
}
} }
}; };
@ -358,10 +378,12 @@ internals.__initWorkerThreads = (
parentPort = globalThis as ParentPort; parentPort = globalThis as ParentPort;
threadId = workerId; threadId = workerId;
let isWorkerThread = false;
if (maybeWorkerMetadata) { if (maybeWorkerMetadata) {
const { 0: metadata, 1: _ } = maybeWorkerMetadata; const { 0: metadata, 1: _ } = maybeWorkerMetadata;
workerData = metadata.workerData; workerData = metadata.workerData;
environmentData = metadata.environmentData; environmentData = metadata.environmentData;
isWorkerThread = metadata.isWorkerThread;
const env = metadata.env; const env = metadata.env;
if (env) { if (env) {
process.env = env; process.env = env;
@ -425,6 +447,15 @@ internals.__initWorkerThreads = (
parentPort.ref = () => { parentPort.ref = () => {
parentPort[unrefPollForMessages] = false; parentPort[unrefPollForMessages] = false;
}; };
if (isWorkerThread) {
// Notify the host that the worker is online
parentPort.postMessage(
{
type: "WORKER_ONLINE",
} satisfies WorkerOnlineMsg,
);
}
} }
}; };

View file

@ -590,3 +590,34 @@ Deno.test({
channel.port2.close(); channel.port2.close();
}, },
}); });
Deno.test({
name: "[node/worker_threads] Emits online event",
async fn() {
const worker = new workerThreads.Worker(
`
import { parentPort } from "node:worker_threads";
const p = Promise.withResolvers();
let ok = false;
parentPort.on("message", () => {
ok = true;
p.resolve();
});
await Promise.race([p.promise, new Promise(resolve => setTimeout(resolve, 20000))]);
if (ok) {
parentPort.postMessage("ok");
} else {
parentPort.postMessage("timed out");
}
`,
{
eval: true,
},
);
worker.on("online", () => {
worker.postMessage("ok");
});
assertEquals((await once(worker, "message"))[0], "ok");
worker.terminate();
},
});