1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2025-01-11 16:42:21 -05:00

fix(ext/node): make worker ids sequential (#22884)

This commit is contained in:
Satya Rohith 2024-03-14 01:22:53 +05:30 committed by GitHub
parent 0fd8f549e2
commit bbc211906d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 81 additions and 24 deletions

View file

@ -14,6 +14,7 @@ function initialize(
usesLocalNodeModulesDir, usesLocalNodeModulesDir,
argv0, argv0,
runningOnMainThread, runningOnMainThread,
workerId,
maybeWorkerMetadata, maybeWorkerMetadata,
) { ) {
if (initialized) { if (initialized) {
@ -39,7 +40,11 @@ function initialize(
// FIXME(bartlomieju): not nice to depend on `Deno` namespace here // FIXME(bartlomieju): not nice to depend on `Deno` namespace here
// but it's the only way to get `args` and `version` and this point. // but it's the only way to get `args` and `version` and this point.
internals.__bootstrapNodeProcess(argv0, Deno.args, Deno.version); internals.__bootstrapNodeProcess(argv0, Deno.args, Deno.version);
internals.__initWorkerThreads(runningOnMainThread, maybeWorkerMetadata); internals.__initWorkerThreads(
runningOnMainThread,
workerId,
maybeWorkerMetadata,
);
internals.__setupChildProcessIpcChannel(); internals.__setupChildProcessIpcChannel();
// `Deno[Deno.internal].requireImpl` will be unreachable after this line. // `Deno[Deno.internal].requireImpl` will be unreachable after this line.
delete internals.requireImpl; delete internals.requireImpl;

View file

@ -134,7 +134,6 @@ function toFileUrl(path: string): URL {
: toFileUrlPosix(path); : toFileUrlPosix(path);
} }
let threads = 0;
const privateWorkerRef = Symbol("privateWorkerRef"); const privateWorkerRef = Symbol("privateWorkerRef");
class NodeWorker extends EventEmitter { class NodeWorker extends EventEmitter {
#id = 0; #id = 0;
@ -195,12 +194,10 @@ class NodeWorker extends EventEmitter {
name = "[worker eval]"; name = "[worker eval]";
} }
this.#name = name; this.#name = name;
this.threadId = ++threads;
const serializedWorkerMetadata = serializeJsMessageData({ const serializedWorkerMetadata = serializeJsMessageData({
workerData: options?.workerData, workerData: options?.workerData,
environmentData: environmentData, environmentData: environmentData,
threadId: this.threadId,
}, options?.transferList ?? []); }, options?.transferList ?? []);
const id = op_create_worker( const id = op_create_worker(
{ {
@ -216,6 +213,7 @@ class NodeWorker extends EventEmitter {
serializedWorkerMetadata, serializedWorkerMetadata,
); );
this.#id = id; this.#id = id;
this.threadId = id;
this.#pollControl(); this.#pollControl();
this.#pollMessages(); this.#pollMessages();
// https://nodejs.org/api/worker_threads.html#event-online // https://nodejs.org/api/worker_threads.html#event-online
@ -391,6 +389,7 @@ let parentPort: ParentPort = null as any;
internals.__initWorkerThreads = ( internals.__initWorkerThreads = (
runningOnMainThread: boolean, runningOnMainThread: boolean,
workerId,
maybeWorkerMetadata, maybeWorkerMetadata,
) => { ) => {
isMainThread = runningOnMainThread; isMainThread = runningOnMainThread;
@ -414,11 +413,11 @@ internals.__initWorkerThreads = (
>(); >();
parentPort = self as ParentPort; parentPort = self as ParentPort;
threadId = workerId;
if (maybeWorkerMetadata) { if (maybeWorkerMetadata) {
const { 0: metadata, 1: _ } = maybeWorkerMetadata; const { 0: metadata, 1: _ } = maybeWorkerMetadata;
workerData = metadata.workerData; workerData = metadata.workerData;
environmentData = metadata.environmentData; environmentData = metadata.environmentData;
threadId = metadata.threadId;
} }
defaultExport.workerData = workerData; defaultExport.workerData = workerData;
defaultExport.parentPort = parentPort; defaultExport.parentPort = parentPort;

View file

@ -794,6 +794,7 @@ function bootstrapWorkerRuntime(
runtimeOptions, runtimeOptions,
name, name,
internalName, internalName,
workerId,
maybeWorkerMetadata, maybeWorkerMetadata,
) { ) {
if (hasBootstrapped) { if (hasBootstrapped) {
@ -929,6 +930,7 @@ function bootstrapWorkerRuntime(
hasNodeModulesDir, hasNodeModulesDir,
argv0, argv0,
/* runningOnMainThread */ false, /* runningOnMainThread */ false,
workerId,
workerMetadata, workerMetadata,
); );
} }

View file

@ -95,7 +95,6 @@ deno_core::extension!(
}, },
state = |state, options| { state = |state, options| {
state.put::<WorkersTable>(WorkersTable::default()); state.put::<WorkersTable>(WorkersTable::default());
state.put::<WorkerId>(WorkerId::default());
let create_web_worker_cb_holder = let create_web_worker_cb_holder =
CreateWebWorkerCbHolder(options.create_web_worker_cb); CreateWebWorkerCbHolder(options.create_web_worker_cb);
@ -163,10 +162,9 @@ fn op_create_worker(
parent_permissions.clone() parent_permissions.clone()
}; };
let parent_permissions = parent_permissions.clone(); let parent_permissions = parent_permissions.clone();
let worker_id = state.take::<WorkerId>();
let create_web_worker_cb = state.borrow::<CreateWebWorkerCbHolder>().clone(); let create_web_worker_cb = state.borrow::<CreateWebWorkerCbHolder>().clone();
let format_js_error_fn = state.borrow::<FormatJsErrorFnHolder>().clone(); let format_js_error_fn = state.borrow::<FormatJsErrorFnHolder>().clone();
state.put::<WorkerId>(worker_id.next().unwrap()); let worker_id = WorkerId::new();
let module_specifier = deno_core::resolve_url(&specifier)?; let module_specifier = deno_core::resolve_url(&specifier)?;
let worker_name = args_name.unwrap_or_default(); let worker_name = args_name.unwrap_or_default();

View file

@ -55,11 +55,33 @@ use std::cell::RefCell;
use std::fmt; use std::fmt;
use std::rc::Rc; use std::rc::Rc;
use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicBool;
use std::sync::atomic::AtomicU32;
use std::sync::atomic::Ordering; use std::sync::atomic::Ordering;
use std::sync::Arc; use std::sync::Arc;
use std::task::Context; use std::task::Context;
use std::task::Poll; use std::task::Poll;
static WORKER_ID_COUNTER: AtomicU32 = AtomicU32::new(1);
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct WorkerId(u32);
impl WorkerId {
pub fn new() -> WorkerId {
let id = WORKER_ID_COUNTER.fetch_add(1, Ordering::SeqCst);
WorkerId(id)
}
}
impl fmt::Display for WorkerId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "worker-{}", self.0)
}
}
impl Default for WorkerId {
fn default() -> Self {
Self::new()
}
}
#[derive(Debug, Copy, Clone, PartialEq, Eq, Serialize, Deserialize)] #[derive(Debug, Copy, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")] #[serde(rename_all = "lowercase")]
pub enum WebWorkerType { pub enum WebWorkerType {
@ -67,21 +89,6 @@ pub enum WebWorkerType {
Module, Module,
} }
#[derive(
Debug, Default, Copy, Clone, PartialEq, Eq, Hash, Serialize, Deserialize,
)]
pub struct WorkerId(u32);
impl fmt::Display for WorkerId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "worker-{}", self.0)
}
}
impl WorkerId {
pub fn next(&self) -> Option<WorkerId> {
self.0.checked_add(1).map(WorkerId)
}
}
/// Events that are sent to host from child /// Events that are sent to host from child
/// worker. /// worker.
pub enum WorkerControlEvent { pub enum WorkerControlEvent {
@ -630,11 +637,13 @@ impl WebWorker {
v8::String::new(scope, &format!("{}", self.id)) v8::String::new(scope, &format!("{}", self.id))
.unwrap() .unwrap()
.into(); .into();
let id: v8::Local<v8::Value> =
v8::Integer::new(scope, self.id.0 as i32).into();
bootstrap_fn bootstrap_fn
.call( .call(
scope, scope,
undefined.into(), undefined.into(),
&[args, name_str, id_str, worker_data], &[args, name_str, id_str, id, worker_data],
) )
.unwrap(); .unwrap();
} }

View file

@ -112,6 +112,12 @@ itest!(worker_doest_stall_event_loop {
exit_code: 0, exit_code: 0,
}); });
itest!(worker_ids_are_sequential {
args: "run --quiet -A workers/worker_ids_are_sequential.ts",
output: "workers/worker_ids_are_sequential.ts.out",
exit_code: 0,
});
// Test for https://github.com/denoland/deno/issues/22629 // Test for https://github.com/denoland/deno/issues/22629
itest!(node_worker_auto_exits { itest!(node_worker_auto_exits {
args: "run --quiet --allow-read workers/node_worker_auto_exits.mjs", args: "run --quiet --allow-read workers/node_worker_auto_exits.mjs",

View file

@ -0,0 +1,34 @@
import {
isMainThread,
parentPort,
threadId,
Worker,
} from "node:worker_threads";
console.log("threadId", threadId);
if (isMainThread) {
const worker = new Worker(new URL(import.meta.url));
worker.on("message", (msg) => console.log("from worker:", msg));
worker.on("error", () => {
throw new Error("error");
});
worker.on("exit", (code) => {
if (code !== 0) {
reject(new Error(`Worker stopped with exit code ${code}`));
}
});
} else if (threadId == 1) {
const worker = new Worker(new URL(import.meta.url));
worker.on("message", (msg) => console.log("from worker:", msg));
worker.on("error", () => {
throw new Error("error");
});
worker.on("exit", (code) => {
if (code !== 0) {
reject(new Error(`Worker stopped with exit code ${code}`));
}
});
} else {
parentPort.postMessage("hello!");
}

View file

@ -0,0 +1,4 @@
threadId 0
threadId 1
threadId 2
from worker: hello!