1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2024-11-28 16:20:57 -05:00

fix(ext/node): make worker setup synchronous

This commit is contained in:
Bartek Iwańczuk 2024-03-09 00:02:43 +01:00
parent 66d1b155dd
commit 23237ff5b5
No known key found for this signature in database
GPG key ID: 0C6BCDDC3B3AD750
6 changed files with 54 additions and 7 deletions

View file

@ -841,6 +841,7 @@ fn create_web_worker_callback(
stdio: stdio.clone(),
cache_storage_dir,
feature_checker,
maybe_worker_data: args.maybe_worker_data,
};
WebWorker::bootstrap_from_options(

View file

@ -14,6 +14,7 @@ function initialize(
usesLocalNodeModulesDir,
argv0,
runningOnMainThread,
maybeWorkerData,
) {
if (initialized) {
throw Error("Node runtime already initialized");
@ -38,7 +39,7 @@ function initialize(
// FIXME(bartlomieju): not nice to depend on `Deno` namespace here
// but it's the only way to get `args` and `version` and this point.
internals.__bootstrapNodeProcess(argv0, Deno.args, Deno.version);
internals.__initWorkerThreads(runningOnMainThread);
internals.__initWorkerThreads(runningOnMainThread, maybeWorkerData);
internals.__setupChildProcessIpcChannel();
// `Deno[Deno.internal].requireImpl` will be unreachable after this line.
delete internals.requireImpl;

View file

@ -192,6 +192,12 @@ class NodeWorker extends EventEmitter {
name = "[worker eval]";
}
this.#name = name;
const maybeWorkerData = options?.workerData;
const serializedWorkerData = maybeWorkerData
? core.serialize(maybeWorkerData)
: undefined;
const id = op_create_worker(
{
// deno-lint-ignore prefer-primordials
@ -210,7 +216,6 @@ class NodeWorker extends EventEmitter {
this.postMessage({
environmentData,
threadId: (this.threadId = ++threads),
workerData: options?.workerData,
}, options?.transferList || []);
// https://nodejs.org/api/worker_threads.html#event-online
this.emit("online");
@ -383,7 +388,10 @@ type ParentPort = typeof self & NodeEventTarget;
// deno-lint-ignore no-explicit-any
let parentPort: ParentPort = null as any;
internals.__initWorkerThreads = (runningOnMainThread: boolean) => {
internals.__initWorkerThreads = (
runningOnMainThread: boolean,
maybeWorkerData,
) => {
isMainThread = runningOnMainThread;
defaultExport.isMainThread = isMainThread;
@ -405,6 +413,8 @@ internals.__initWorkerThreads = (runningOnMainThread: boolean) => {
>();
parentPort = self as ParentPort;
workerData = maybeWorkerData;
defaultExport.workerData = workerData;
const initPromise = PromisePrototypeThen(
once(
@ -420,11 +430,9 @@ internals.__initWorkerThreads = (runningOnMainThread: boolean) => {
// using the first message from the parent.
// This should be done synchronously.
threadId = data.threadId;
workerData = data.workerData;
environmentData = data.environmentData;
defaultExport.threadId = threadId;
defaultExport.workerData = workerData;
},
);

View file

@ -786,6 +786,7 @@ function bootstrapWorkerRuntime(
runtimeOptions,
name,
internalName,
maybeWorkerData,
) {
if (hasBootstrapped) {
throw new Error("Worker runtime already bootstrapped");
@ -908,8 +909,18 @@ function bootstrapWorkerRuntime(
// existing global `Deno` with `Deno` namespace from "./deno.ts".
ObjectDefineProperty(globalThis, "Deno", core.propReadOnly(finalDenoNs));
let workerData = undefined;
if (maybeWorkerData) {
workerData = core.deserialize(maybeWorkerData);
}
if (nodeBootstrap) {
nodeBootstrap(hasNodeModulesDir, argv0, /* runningOnMainThread */ false);
nodeBootstrap(
hasNodeModulesDir,
argv0,
/* runningOnMainThread */ false,
workerData,
);
}
}

View file

@ -15,8 +15,10 @@ use crate::worker::FormatJsErrorFn;
use deno_core::error::AnyError;
use deno_core::op2;
use deno_core::serde::Deserialize;
use deno_core::v8;
use deno_core::CancelFuture;
use deno_core::CancelHandle;
use deno_core::JsBuffer;
use deno_core::ModuleSpecifier;
use deno_core::OpState;
use deno_web::JsMessageData;
@ -35,6 +37,7 @@ pub struct CreateWebWorkerArgs {
pub permissions: PermissionsContainer,
pub main_module: ModuleSpecifier,
pub worker_type: WebWorkerType,
pub maybe_worker_data: Option<Vec<u8>>,
}
pub type CreateWebWorkerCb = dyn Fn(CreateWebWorkerArgs) -> (WebWorker, SendableWebWorkerHandle)
@ -121,6 +124,7 @@ pub struct CreateWorkerArgs {
fn op_create_worker(
state: &mut OpState,
#[serde] args: CreateWorkerArgs,
#[buffer] maybe_worker_data: Option<JsBuffer>,
) -> Result<WorkerId, AnyError> {
let specifier = args.specifier.clone();
let maybe_source_code = if args.has_source_code {
@ -174,6 +178,7 @@ fn op_create_worker(
// Setup new thread
let thread_builder = std::thread::Builder::new().name(format!("{worker_id}"));
let maybe_worker_data = maybe_worker_data.map(|buf| buf.to_vec());
// Spawn it
thread_builder.spawn(move || {
// Any error inside this block is terminal:
@ -189,6 +194,7 @@ fn op_create_worker(
permissions: worker_permissions,
main_module: module_specifier.clone(),
worker_type,
maybe_worker_data,
});
// Send thread safe handle from newly created worker to host thread

View file

@ -24,6 +24,7 @@ use deno_core::serde::Deserialize;
use deno_core::serde::Serialize;
use deno_core::serde_json::json;
use deno_core::v8;
use deno_core::v8::ValueDeserializerHelper;
use deno_core::CancelHandle;
use deno_core::CompiledWasmModuleStore;
use deno_core::Extension;
@ -331,6 +332,8 @@ pub struct WebWorker {
pub main_module: ModuleSpecifier,
poll_for_messages_fn: Option<v8::Global<v8::Value>>,
bootstrap_fn_global: Option<v8::Global<v8::Function>>,
// Consumed when `bootstrap_fn` is called
maybe_worker_data: Option<Vec<u8>>,
}
pub struct WebWorkerOptions {
@ -356,6 +359,7 @@ pub struct WebWorkerOptions {
pub cache_storage_dir: Option<std::path::PathBuf>,
pub stdio: Stdio,
pub feature_checker: Arc<FeatureChecker>,
pub maybe_worker_data: Option<Vec<u8>>,
}
impl WebWorker {
@ -601,6 +605,7 @@ impl WebWorker {
main_module,
poll_for_messages_fn: None,
bootstrap_fn_global: Some(bootstrap_fn_global),
maybe_worker_data: options.maybe_worker_data,
},
external_handle,
)
@ -616,6 +621,17 @@ impl WebWorker {
let bootstrap_fn = self.bootstrap_fn_global.take().unwrap();
let bootstrap_fn = v8::Local::new(scope, bootstrap_fn);
let undefined = v8::undefined(scope);
let mut worker_data: v8::Local<v8::Value> = v8::undefined(scope).into();
if let Some(buf) = self.maybe_worker_data.take() {
let len = buf.len();
let store = v8::ArrayBuffer::new_backing_store_from_boxed_slice(
buf.into_boxed_slice(),
);
let ab =
v8::ArrayBuffer::with_backing_store(scope, &store.make_shared());
let v8_buf = v8::Uint8Array::new(scope, ab, 0, len).unwrap();
worker_data = v8_buf.into();
}
let name_str: v8::Local<v8::Value> =
v8::String::new(scope, &self.name).unwrap().into();
let id_str: v8::Local<v8::Value> =
@ -623,7 +639,11 @@ impl WebWorker {
.unwrap()
.into();
bootstrap_fn
.call(scope, undefined.into(), &[args, name_str, id_str])
.call(
scope,
undefined.into(),
&[args, name_str, id_str, worker_data],
)
.unwrap();
}
// TODO(bartlomieju): this could be done using V8 API, without calling `execute_script`.