1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2024-12-23 15:49:44 -05:00

compat: support --compat in web workers (#13629)

Adds another callback to WebWorkerOptions that allows to execute
some modules before actual worker code executes. This allows to set up Node
global using std/node.
This commit is contained in:
Bartek Iwańczuk 2022-02-11 13:41:56 +01:00 committed by GitHub
parent 2f2c778a07
commit 2fa0096821
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 194 additions and 19 deletions

View file

@ -71,6 +71,7 @@ use deno_ast::MediaType;
use deno_core::error::generic_error;
use deno_core::error::AnyError;
use deno_core::futures::future::FutureExt;
use deno_core::futures::future::LocalFutureObj;
use deno_core::futures::Future;
use deno_core::located_script_name;
use deno_core::parking_lot::RwLock;
@ -82,6 +83,7 @@ use deno_core::Extension;
use deno_core::ModuleSpecifier;
use deno_runtime::colors;
use deno_runtime::ops::worker_host::CreateWebWorkerCb;
use deno_runtime::ops::worker_host::PreloadModuleCb;
use deno_runtime::permissions::Permissions;
use deno_runtime::tokio_util::run_basic;
use deno_runtime::web_worker::WebWorker;
@ -100,6 +102,24 @@ use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
fn create_web_worker_preload_module_callback(
ps: ProcState,
) -> Arc<PreloadModuleCb> {
let compat = ps.flags.compat;
Arc::new(move |mut worker| {
let fut = async move {
if compat {
worker.execute_side_module(&compat::GLOBAL_URL).await?;
worker.execute_side_module(&compat::MODULE_URL).await?;
}
Ok(worker)
};
LocalFutureObj::new(Box::new(fut))
})
}
fn create_web_worker_callback(ps: ProcState) -> Arc<CreateWebWorkerCb> {
Arc::new(move |args| {
let global_state_ = ps.clone();
@ -116,6 +136,8 @@ fn create_web_worker_callback(ps: ProcState) -> Arc<CreateWebWorkerCb> {
args.parent_permissions.clone(),
);
let create_web_worker_cb = create_web_worker_callback(ps.clone());
let preload_module_cb =
create_web_worker_preload_module_callback(ps.clone());
let extensions = ops::cli_exts(ps.clone(), args.use_deno_namespace);
@ -145,6 +167,7 @@ fn create_web_worker_callback(ps: ProcState) -> Arc<CreateWebWorkerCb> {
seed: ps.flags.seed,
module_loader,
create_web_worker_cb,
preload_module_cb,
js_error_create_fn: Some(js_error_create_fn),
use_deno_namespace: args.use_deno_namespace,
worker_type: args.worker_type,
@ -187,6 +210,8 @@ pub fn create_main_worker(
let should_break_on_first_statement = ps.flags.inspect_brk.is_some();
let create_web_worker_cb = create_web_worker_callback(ps.clone());
let web_worker_preload_module_cb =
create_web_worker_preload_module_callback(ps.clone());
let maybe_storage_key = if let Some(location) = &ps.flags.location {
// if a location is set, then the ascii serialization of the location is
@ -240,6 +265,7 @@ pub fn create_main_worker(
seed: ps.flags.seed,
js_error_create_fn: Some(js_error_create_fn),
create_web_worker_cb,
web_worker_preload_module_cb,
maybe_inspector_server,
should_break_on_first_statement,
module_loader,

View file

@ -208,6 +208,9 @@ pub async fn run(
let create_web_worker_cb = Arc::new(|_| {
todo!("Worker are currently not supported in standalone binaries");
});
let web_worker_preload_module_cb = Arc::new(|_| {
todo!("Worker are currently not supported in standalone binaries");
});
// Keep in sync with `main.rs`.
v8_set_flags(
@ -257,6 +260,7 @@ pub async fn run(
seed: metadata.seed,
js_error_create_fn: None,
create_web_worker_cb,
web_worker_preload_module_cb,
maybe_inspector_server: None,
should_break_on_first_statement: false,
module_loader,

View file

@ -90,6 +90,11 @@ itest!(top_level_fail_esm {
output: "compat/test_runner/top_level_fail_esm.out",
});
itest!(compat_worker {
args: "run --compat --unstable -A --quiet --no-check compat/worker/worker_test.mjs",
output: "compat/worker/worker_test.out",
});
#[test]
fn globals_in_repl() {
let (out, _err) = util::run_and_collect_output_with_args(

View file

@ -0,0 +1,9 @@
console.log("hello from worker");
self.onmessage = (e) => {
if (e.data != "hello") {
throw new Error("wrong message");
}
self.postMessage({ pid: process.pid });
}

View file

@ -0,0 +1,18 @@
import { deferred } from "../../../../../test_util/std/async/deferred.ts";
const promise = deferred();
const url = new URL("./worker.mjs", import.meta.url);
const worker = new Worker(url.href, { type: "module", deno: true });
worker.onmessage = (e) => {
const pid = e.data.pid;
if (typeof pid != "number") {
throw new Error("pid is not a number");
}
console.log("process.pid from worker:", pid);
promise.resolve();
};
worker.postMessage("hello");
await promise;
worker.terminate();

View file

@ -0,0 +1,2 @@
hello from worker
process.pid from worker: [WILDCARD]

View file

@ -22,6 +22,9 @@ async fn main() -> Result<(), AnyError> {
let create_web_worker_cb = Arc::new(|_| {
todo!("Web workers are not supported in the example");
});
let web_worker_preload_module_cb = Arc::new(|_| {
todo!("Web workers are not supported in the example");
});
let options = WorkerOptions {
bootstrap: BootstrapOptions {
@ -42,6 +45,7 @@ async fn main() -> Result<(), AnyError> {
user_agent: "hello_runtime".to_string(),
seed: None,
js_error_create_fn: None,
web_worker_preload_module_cb,
create_web_worker_cb,
maybe_inspector_server: None,
should_break_on_first_statement: false,

View file

@ -676,7 +676,7 @@ delete Object.prototype.__proto__;
numCpus = cpuCount;
registerErrors();
pollForMessages();
globalThis.pollForMessages = pollForMessages;
const internalSymbol = Symbol("Deno.internal");

View file

@ -12,6 +12,7 @@ use crate::web_worker::WebWorkerType;
use crate::web_worker::WorkerControlEvent;
use crate::web_worker::WorkerId;
use deno_core::error::AnyError;
use deno_core::futures::future::LocalFutureObj;
use deno_core::op_async;
use deno_core::op_sync;
use deno_core::serde::Deserialize;
@ -42,13 +43,24 @@ pub type CreateWebWorkerCb = dyn Fn(CreateWebWorkerArgs) -> (WebWorker, Sendable
+ Sync
+ Send;
pub type PreloadModuleCb = dyn Fn(WebWorker) -> LocalFutureObj<'static, Result<WebWorker, AnyError>>
+ Sync
+ Send;
/// A holder for callback that is used to create a new
/// WebWorker. It's a struct instead of a type alias
/// because `GothamState` used in `OpState` overrides
/// value if type alises have the same underlying type
/// value if type aliases have the same underlying type
#[derive(Clone)]
pub struct CreateWebWorkerCbHolder(Arc<CreateWebWorkerCb>);
/// A holder for callback that can used to preload some modules into a WebWorker
/// before actual worker code is executed. It's a struct instead of a type
/// because `GothamState` used in `OpState` overrides
/// value if type aliases have the same underlying type
#[derive(Clone)]
pub struct PreloadModuleCbHolder(Arc<PreloadModuleCb>);
pub struct WorkerThread {
// It's an Option so we can take the value before dropping the WorkerThread.
join_handle: Option<JoinHandle<Result<(), AnyError>>>,
@ -91,15 +103,21 @@ impl Drop for WorkerThread {
pub type WorkersTable = HashMap<WorkerId, WorkerThread>;
pub fn init(create_web_worker_cb: Arc<CreateWebWorkerCb>) -> Extension {
pub fn init(
create_web_worker_cb: Arc<CreateWebWorkerCb>,
preload_module_cb: Arc<PreloadModuleCb>,
) -> Extension {
Extension::builder()
.state(move |state| {
state.put::<WorkersTable>(WorkersTable::default());
state.put::<WorkerId>(WorkerId::default());
let create_module_loader =
let create_web_worker_cb_holder =
CreateWebWorkerCbHolder(create_web_worker_cb.clone());
state.put::<CreateWebWorkerCbHolder>(create_module_loader);
state.put::<CreateWebWorkerCbHolder>(create_web_worker_cb_holder);
let preload_module_cb_holder =
PreloadModuleCbHolder(preload_module_cb.clone());
state.put::<PreloadModuleCbHolder>(preload_module_cb_holder);
Ok(())
})
@ -174,8 +192,10 @@ fn op_create_worker(
// have access to `exit_code` but the child does?
let maybe_exit_code = state.try_borrow::<Arc<AtomicI32>>().cloned();
let worker_id = state.take::<WorkerId>();
let create_module_loader = state.take::<CreateWebWorkerCbHolder>();
state.put::<CreateWebWorkerCbHolder>(create_module_loader.clone());
let create_web_worker_cb = state.take::<CreateWebWorkerCbHolder>();
state.put::<CreateWebWorkerCbHolder>(create_web_worker_cb.clone());
let preload_module_cb = state.take::<PreloadModuleCbHolder>();
state.put::<PreloadModuleCbHolder>(preload_module_cb.clone());
state.put::<WorkerId>(worker_id.next().unwrap());
let module_specifier = deno_core::resolve_url(&specifier)?;
@ -197,7 +217,7 @@ fn op_create_worker(
// - newly spawned thread exits
let (worker, external_handle) =
(create_module_loader.0)(CreateWebWorkerArgs {
(create_web_worker_cb.0)(CreateWebWorkerArgs {
name: worker_name,
worker_id,
parent_permissions,
@ -216,7 +236,12 @@ fn op_create_worker(
// is using `worker.internal_channels`.
//
// Host can already push messages and interact with worker.
run_web_worker(worker, module_specifier, maybe_source_code)
run_web_worker(
worker,
module_specifier,
maybe_source_code,
preload_module_cb.0,
)
})?;
// Receive WebWorkerHandle from newly created worker

View file

@ -304,6 +304,7 @@ pub struct WebWorker {
pub use_deno_namespace: bool,
pub worker_type: WebWorkerType,
pub main_module: ModuleSpecifier,
poll_for_messages_fn: Option<v8::Global<v8::Value>>,
}
pub struct WebWorkerOptions {
@ -315,6 +316,7 @@ pub struct WebWorkerOptions {
pub seed: Option<u64>,
pub module_loader: Rc<dyn ModuleLoader>,
pub create_web_worker_cb: Arc<ops::worker_host::CreateWebWorkerCb>,
pub preload_module_cb: Arc<ops::worker_host::PreloadModuleCb>,
pub js_error_create_fn: Option<Rc<JsErrorCreateFn>>,
pub use_deno_namespace: bool,
pub worker_type: WebWorkerType,
@ -395,7 +397,10 @@ impl WebWorker {
let runtime_exts = vec![
ops::web_worker::init(),
ops::runtime::init(main_module.clone()),
ops::worker_host::init(options.create_web_worker_cb.clone()),
ops::worker_host::init(
options.create_web_worker_cb.clone(),
options.preload_module_cb.clone(),
),
ops::io::init(),
];
@ -468,6 +473,7 @@ impl WebWorker {
use_deno_namespace: options.use_deno_namespace,
worker_type: options.worker_type,
main_module,
poll_for_messages_fn: None,
},
external_handle,
)
@ -486,6 +492,18 @@ impl WebWorker {
self
.execute_script(&located_script_name!(), &script)
.expect("Failed to execute worker bootstrap script");
// Save a reference to function that will start polling for messages
// from a worker host; it will be called after the user code is loaded.
let script = r#"
const pollForMessages = globalThis.pollForMessages;
delete globalThis.pollForMessages;
pollForMessages
"#;
let poll_for_messages_fn = self
.js_runtime
.execute_script(&located_script_name!(), script)
.expect("Failed to execute worker bootstrap script");
self.poll_for_messages_fn = Some(poll_for_messages_fn);
}
/// See [JsRuntime::execute_script](deno_core::JsRuntime::execute_script)
@ -519,11 +537,36 @@ impl WebWorker {
}
/// Loads, instantiates and executes specified JavaScript module.
pub async fn execute_main_module(
///
/// This method assumes that worker can't be terminated when executing
/// side module code.
pub async fn execute_side_module(
&mut self,
module_specifier: &ModuleSpecifier,
) -> Result<(), AnyError> {
let id = self.preload_module(module_specifier, true).await?;
let id = self.preload_module(module_specifier, false).await?;
let mut receiver = self.js_runtime.mod_evaluate(id);
tokio::select! {
maybe_result = &mut receiver => {
debug!("received module evaluate {:#?}", maybe_result);
maybe_result.expect("Module evaluation result not provided.")
}
event_loop_result = self.js_runtime.run_event_loop(false) => {
event_loop_result?;
let maybe_result = receiver.await;
maybe_result.expect("Module evaluation result not provided.")
}
}
}
/// Loads, instantiates and executes specified JavaScript module.
///
/// This module will have "import.meta.main" equal to true.
pub async fn execute_main_module(
&mut self,
id: ModuleId,
) -> Result<(), AnyError> {
let mut receiver = self.js_runtime.mod_evaluate(id);
tokio::select! {
maybe_result = &mut receiver => {
@ -582,6 +625,17 @@ impl WebWorker {
) -> Result<(), AnyError> {
poll_fn(|cx| self.poll_event_loop(cx, wait_for_inspector)).await
}
// Starts polling for messages from worker host from JavaScript.
fn start_polling_for_messages(&mut self) {
let poll_for_messages_fn = self.poll_for_messages_fn.take().unwrap();
let scope = &mut self.js_runtime.handle_scope();
let poll_for_messages =
v8::Local::<v8::Value>::new(scope, poll_for_messages_fn);
let fn_ = v8::Local::<v8::Function>::try_from(poll_for_messages).unwrap();
let undefined = v8::undefined(scope);
fn_.call(scope, undefined.into(), &[]).unwrap();
}
}
fn print_worker_error(error_str: String, name: &str) {
@ -596,9 +650,10 @@ fn print_worker_error(error_str: String, name: &str) {
/// This function should be called from a thread dedicated to this worker.
// TODO(bartlomieju): check if order of actions is aligned to Worker spec
pub fn run_web_worker(
mut worker: WebWorker,
worker: WebWorker,
specifier: ModuleSpecifier,
maybe_source_code: Option<String>,
preload_module_cb: Arc<ops::worker_host::PreloadModuleCb>,
) -> Result<(), AnyError> {
let name = worker.name.to_string();
@ -606,17 +661,39 @@ pub fn run_web_worker(
// with terminate
let fut = async move {
let internal_handle = worker.internal_handle.clone();
let result = (preload_module_cb)(worker).await;
let mut worker = match result {
Ok(worker) => worker,
Err(e) => {
print_worker_error(e.to_string(), &name);
internal_handle
.post_event(WorkerControlEvent::TerminalError(e))
.expect("Failed to post message to host");
// Failure to execute script is a terminal error, bye, bye.
return Ok(());
}
};
// Execute provided source code immediately
let result = if let Some(source_code) = maybe_source_code {
worker.execute_script(&located_script_name!(), &source_code)
let r = worker.execute_script(&located_script_name!(), &source_code);
worker.start_polling_for_messages();
r
} else {
// TODO(bartlomieju): add "type": "classic", ie. ability to load
// script instead of module
worker.execute_main_module(&specifier).await
match worker.preload_module(&specifier, true).await {
Ok(id) => {
worker.start_polling_for_messages();
worker.execute_main_module(id).await
}
Err(e) => Err(e),
}
};
let internal_handle = worker.internal_handle.clone();
// If sender is closed it means that worker has already been closed from
// within using "globalThis.close()"
if internal_handle.is_terminated() {

View file

@ -51,8 +51,9 @@ pub struct WorkerOptions {
pub user_agent: String,
pub seed: Option<u64>,
pub module_loader: Rc<dyn ModuleLoader>,
// Callback invoked when creating new instance of WebWorker
// Callbacks invoked when creating new instance of WebWorker
pub create_web_worker_cb: Arc<ops::worker_host::CreateWebWorkerCb>,
pub web_worker_preload_module_cb: Arc<ops::worker_host::PreloadModuleCb>,
pub js_error_create_fn: Option<Rc<JsErrorCreateFn>>,
pub maybe_inspector_server: Option<Arc<InspectorServer>>,
pub should_break_on_first_statement: bool,
@ -126,7 +127,10 @@ impl MainWorker {
deno_ffi::init::<Permissions>(unstable),
// Runtime ops
ops::runtime::init(main_module.clone()),
ops::worker_host::init(options.create_web_worker_cb.clone()),
ops::worker_host::init(
options.create_web_worker_cb.clone(),
options.web_worker_preload_module_cb.clone(),
),
ops::fs_events::init(),
ops::fs::init(),
ops::io::init(),
@ -367,6 +371,7 @@ mod tests {
root_cert_store: None,
seed: None,
js_error_create_fn: None,
web_worker_preload_module_cb: Arc::new(|_| unreachable!()),
create_web_worker_cb: Arc::new(|_| unreachable!()),
maybe_inspector_server: None,
should_break_on_first_statement: false,