1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2024-11-21 15:04:11 -05:00

feat(ext/node): ref/unref on workers (#22778)

Implements ref/unref on worker to fix part of #22629
This commit is contained in:
Matt Mastracci 2024-03-07 18:51:19 -07:00 committed by GitHub
parent 2dfc0aca7c
commit 3745556ccd
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 80 additions and 5 deletions

View file

@ -12,6 +12,7 @@ import { notImplemented } from "ext:deno_node/_utils.ts";
import { EventEmitter, once } from "node:events";
import { BroadcastChannel } from "ext:deno_broadcast_channel/01_broadcast_channel.js";
import { MessageChannel, MessagePort } from "ext:deno_web/13_message_port.js";
import { refWorker, unrefWorker } from "ext:runtime/11_workers.js";
let environmentData = new Map();
let threads = 0;
@ -170,6 +171,14 @@ class _Worker extends EventEmitter {
this.emit("exit", 0);
}
ref() {
refWorker(this[kHandle]);
}
unref() {
unrefWorker(this[kHandle]);
}
readonly getHeapSnapshot = () =>
notImplemented("Worker.prototype.getHeapSnapshot");
// fake performance

View file

@ -1,6 +1,6 @@
// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.
import { primordials } from "ext:core/mod.js";
import { core, primordials } from "ext:core/mod.js";
import {
op_create_worker,
op_host_post_message,
@ -14,6 +14,7 @@ const {
ObjectPrototypeIsPrototypeOf,
String,
StringPrototypeStartsWith,
Symbol,
SymbolFor,
SymbolIterator,
SymbolToStringTag,
@ -72,9 +73,22 @@ function hostRecvMessage(id) {
return op_host_recv_message(id);
}
const privateWorkerRef = Symbol();
function refWorker(worker) {
worker[privateWorkerRef](true);
}
function unrefWorker(worker) {
worker[privateWorkerRef](false);
}
class Worker extends EventTarget {
#id = 0;
#name = "";
#refCount = 1;
#messagePromise = undefined;
#controlPromise = undefined;
// "RUNNING" | "CLOSED" | "TERMINATED"
// "TERMINATED" means that any controls or messages received will be
@ -128,6 +142,30 @@ class Worker extends EventTarget {
this.#pollMessages();
}
[privateWorkerRef](ref) {
if (ref) {
this.#refCount++;
} else {
this.#refCount--;
}
if (!ref && this.#refCount == 0) {
if (this.#controlPromise) {
core.unrefOpPromise(this.#controlPromise);
}
if (this.#messagePromise) {
core.unrefOpPromise(this.#messagePromise);
}
} else if (ref && this.#refCount == 1) {
if (this.#controlPromise) {
core.refOpPromise(this.#controlPromise);
}
if (this.#messagePromise) {
core.refOpPromise(this.#messagePromise);
}
}
}
#handleError(e) {
const event = new ErrorEvent("error", {
cancelable: true,
@ -151,7 +189,11 @@ class Worker extends EventTarget {
#pollControl = async () => {
while (this.#status === "RUNNING") {
const { 0: type, 1: data } = await hostRecvCtrl(this.#id);
this.#controlPromise = hostRecvCtrl(this.#id);
if (this.#refCount < 1) {
core.unrefOpPromise(this.#controlPromise);
}
const { 0: type, 1: data } = await this.#controlPromise;
// If terminate was called then we ignore all messages
if (this.#status === "TERMINATED") {
@ -182,7 +224,11 @@ class Worker extends EventTarget {
#pollMessages = async () => {
while (this.#status !== "TERMINATED") {
const data = await hostRecvMessage(this.#id);
this.#messagePromise = hostRecvMessage(this.#id);
if (this.#refCount < 1) {
core.unrefOpPromise(this.#messagePromise);
}
const data = await this.#messagePromise;
if (this.#status === "TERMINATED" || data === null) {
return;
}
@ -279,4 +325,4 @@ webidl.converters["WorkerType"] = webidl.createEnumConverter("WorkerType", [
"module",
]);
export { Worker };
export { refWorker, unrefWorker, Worker };

View file

@ -1,6 +1,11 @@
// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.
import { assert, assertEquals, assertObjectMatch } from "@std/assert/mod.ts";
import {
assert,
assertEquals,
assertObjectMatch,
fail,
} from "@std/assert/mod.ts";
import { fromFileUrl, relative } from "@std/path/mod.ts";
import * as workerThreads from "node:worker_threads";
import { EventEmitter, once } from "node:events";
@ -198,3 +203,18 @@ Deno.test({
worker.terminate();
},
});
Deno.test({
name: "[worker_threads] unref",
async fn() {
const timeout = setTimeout(() => fail("Test timed out"), 60_000);
const child = new Deno.Command(Deno.execPath(), {
args: [
"eval",
"import { Worker } from 'node:worker_threads'; new Worker('setTimeout(() => {}, 1_000_000)', {eval:true}).unref();",
],
}).spawn();
await child.status;
clearTimeout(timeout);
},
});