From 45d4fd44c9444241a898d3075b99e8871fccdd65 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20Iwa=C5=84czuk?= Date: Mon, 28 Sep 2020 12:14:11 +0200 Subject: [PATCH] refactor: move op state registration to workers (#7696) --- cli/main.rs | 1 - cli/ops/fetch.rs | 10 +- cli/ops/random.rs | 9 +- cli/ops/runtime.rs | 7 +- cli/ops/timers.rs | 6 + cli/ops/web_worker.rs | 2 +- cli/ops/worker_host.rs | 10 +- cli/web_worker.rs | 375 ------------------------------------ cli/worker.rs | 426 ++++++++++++++++++++++++++++++++++++----- 9 files changed, 415 insertions(+), 431 deletions(-) delete mode 100644 cli/web_worker.rs diff --git a/cli/main.rs b/cli/main.rs index 0acf74aded..1518ca98e9 100644 --- a/cli/main.rs +++ b/cli/main.rs @@ -54,7 +54,6 @@ mod tsc; mod tsc_config; mod upgrade; pub mod version; -mod web_worker; pub mod worker; use crate::coverage::CoverageCollector; diff --git a/cli/ops/fetch.rs b/cli/ops/fetch.rs index 54585dc3d4..8c1a2b39c5 100644 --- a/cli/ops/fetch.rs +++ b/cli/ops/fetch.rs @@ -1,7 +1,15 @@ // Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. use crate::permissions::Permissions; +use deno_fetch::reqwest; -pub fn init(rt: &mut deno_core::JsRuntime) { +pub fn init(rt: &mut deno_core::JsRuntime, maybe_ca_file: Option<&str>) { + { + let op_state = rt.op_state(); + let mut state = op_state.borrow_mut(); + state.put::({ + crate::http_util::create_http_client(maybe_ca_file).unwrap() + }); + } super::reg_json_async(rt, "op_fetch", deno_fetch::op_fetch::); super::reg_json_async(rt, "op_fetch_read", deno_fetch::op_fetch_read); super::reg_json_sync( diff --git a/cli/ops/random.rs b/cli/ops/random.rs index 53aedb73fc..20296c667d 100644 --- a/cli/ops/random.rs +++ b/cli/ops/random.rs @@ -8,8 +8,15 @@ use deno_core::ZeroCopyBuf; use rand::rngs::StdRng; use rand::thread_rng; use rand::Rng; +use rand::SeedableRng; -pub fn init(rt: &mut deno_core::JsRuntime) { +pub fn init(rt: &mut deno_core::JsRuntime, maybe_seed: Option) { + if let Some(seed) = maybe_seed { + let rng = StdRng::seed_from_u64(seed); + let op_state = rt.op_state(); + let mut state = op_state.borrow_mut(); + state.put::(rng); + } super::reg_json_sync(rt, "op_get_random_values", op_get_random_values); } diff --git a/cli/ops/runtime.rs b/cli/ops/runtime.rs index d059301f16..b1eddc2654 100644 --- a/cli/ops/runtime.rs +++ b/cli/ops/runtime.rs @@ -14,7 +14,12 @@ use deno_core::OpState; use deno_core::ZeroCopyBuf; use std::env; -pub fn init(rt: &mut deno_core::JsRuntime) { +pub fn init(rt: &mut deno_core::JsRuntime, main_module: ModuleSpecifier) { + { + let op_state = rt.op_state(); + let mut state = op_state.borrow_mut(); + state.put::(main_module); + } super::reg_json_sync(rt, "op_start", op_start); super::reg_json_sync(rt, "op_main_module", op_main_module); super::reg_json_sync(rt, "op_metrics", op_metrics); diff --git a/cli/ops/timers.rs b/cli/ops/timers.rs index eb65611930..74edc7267e 100644 --- a/cli/ops/timers.rs +++ b/cli/ops/timers.rs @@ -67,6 +67,12 @@ impl GlobalTimer { } pub fn init(rt: &mut deno_core::JsRuntime) { + { + let op_state = rt.op_state(); + let mut state = op_state.borrow_mut(); + state.put::(GlobalTimer::default()); + state.put::(StartTime::now()); + } super::reg_json_sync(rt, "op_global_timer_stop", op_global_timer_stop); super::reg_json_sync(rt, "op_global_timer_start", op_global_timer_start); super::reg_json_async(rt, "op_global_timer", op_global_timer); diff --git a/cli/ops/web_worker.rs b/cli/ops/web_worker.rs index e57edaf6c2..42b6a56ceb 100644 --- a/cli/ops/web_worker.rs +++ b/cli/ops/web_worker.rs @@ -1,6 +1,6 @@ // Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. -use crate::web_worker::WebWorkerHandle; +use crate::worker::WebWorkerHandle; use crate::worker::WorkerEvent; use deno_core::futures::channel::mpsc; use deno_core::serde_json::json; diff --git a/cli/ops/worker_host.rs b/cli/ops/worker_host.rs index 60262b4f62..9175ca0f16 100644 --- a/cli/ops/worker_host.rs +++ b/cli/ops/worker_host.rs @@ -5,8 +5,8 @@ use crate::global_state::GlobalState; use crate::ops::io::get_stdio; use crate::permissions::Permissions; use crate::tokio_util::create_basic_runtime; -use crate::web_worker::WebWorker; -use crate::web_worker::WebWorkerHandle; +use crate::worker::WebWorker; +use crate::worker::WebWorkerHandle; use crate::worker::WorkerEvent; use deno_core::error::AnyError; use deno_core::futures::future::FutureExt; @@ -26,6 +26,12 @@ use std::sync::Arc; use std::thread::JoinHandle; pub fn init(rt: &mut deno_core::JsRuntime) { + { + let op_state = rt.op_state(); + let mut state = op_state.borrow_mut(); + state.put::(WorkersTable::default()); + state.put::(WorkerId::default()); + } super::reg_json_sync(rt, "op_create_worker", op_create_worker); super::reg_json_sync( rt, diff --git a/cli/web_worker.rs b/cli/web_worker.rs deleted file mode 100644 index cb2a8b87eb..0000000000 --- a/cli/web_worker.rs +++ /dev/null @@ -1,375 +0,0 @@ -// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. - -use crate::global_state::GlobalState; -use crate::js; -use crate::ops; -use crate::permissions::Permissions; -use crate::state::CliModuleLoader; -use crate::worker::Worker; -use crate::worker::WorkerEvent; -use crate::worker::WorkerHandle; -use deno_core::error::AnyError; -use deno_core::futures::channel::mpsc; -use deno_core::futures::future::FutureExt; -use deno_core::futures::stream::StreamExt; -use deno_core::v8; -use deno_core::ModuleSpecifier; -use std::future::Future; -use std::ops::Deref; -use std::ops::DerefMut; -use std::pin::Pin; -use std::sync::atomic::AtomicBool; -use std::sync::atomic::Ordering; -use std::sync::Arc; -use std::task::Context; -use std::task::Poll; - -/// Wrapper for `WorkerHandle` that adds functionality -/// for terminating workers. -/// -/// This struct is used by host as well as worker itself. -/// -/// Host uses it to communicate with worker and terminate it, -/// while worker uses it only to finish execution on `self.close()`. -#[derive(Clone)] -pub struct WebWorkerHandle { - worker_handle: WorkerHandle, - terminate_tx: mpsc::Sender<()>, - terminated: Arc, - isolate_handle: v8::IsolateHandle, -} - -impl Deref for WebWorkerHandle { - type Target = WorkerHandle; - fn deref(&self) -> &Self::Target { - &self.worker_handle - } -} - -impl DerefMut for WebWorkerHandle { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.worker_handle - } -} - -impl WebWorkerHandle { - pub fn terminate(&self) { - // This function can be called multiple times by whomever holds - // the handle. However only a single "termination" should occur so - // we need a guard here. - let already_terminated = self.terminated.swap(true, Ordering::Relaxed); - - if !already_terminated { - self.isolate_handle.terminate_execution(); - let mut sender = self.terminate_tx.clone(); - // This call should be infallible hence the `expect`. - // This might change in the future. - sender.try_send(()).expect("Failed to terminate"); - } - } -} - -/// This worker is implementation of `Worker` Web API -/// -/// At the moment this type of worker supports only -/// communication with parent and creating new workers. -/// -/// Each `WebWorker` is either a child of `MainWorker` or other -/// `WebWorker`. -pub struct WebWorker { - worker: Worker, - event_loop_idle: bool, - terminate_rx: mpsc::Receiver<()>, - handle: WebWorkerHandle, - pub has_deno_namespace: bool, -} - -impl WebWorker { - pub fn new( - name: String, - permissions: Permissions, - main_module: ModuleSpecifier, - global_state: Arc, - has_deno_namespace: bool, - ) -> Self { - let loader = CliModuleLoader::new_for_worker(); - let mut worker = Worker::new( - name, - Some(js::deno_isolate_init()), - permissions, - main_module, - global_state, - loader, - false, - ); - - let terminated = Arc::new(AtomicBool::new(false)); - let isolate_handle = worker.isolate.thread_safe_handle(); - let (terminate_tx, terminate_rx) = mpsc::channel::<()>(1); - - let handle = WebWorkerHandle { - worker_handle: worker.thread_safe_handle(), - terminated, - isolate_handle, - terminate_tx, - }; - - let mut web_worker = Self { - worker, - event_loop_idle: false, - terminate_rx, - handle, - has_deno_namespace, - }; - - { - ops::runtime::init(&mut web_worker.worker); - let sender = web_worker.worker.internal_channels.sender.clone(); - let handle = web_worker.thread_safe_handle(); - ops::web_worker::init(&mut web_worker.worker, sender, handle); - ops::worker_host::init(&mut web_worker.worker); - ops::reg_json_sync( - &mut web_worker.worker, - "op_domain_to_ascii", - deno_web::op_domain_to_ascii, - ); - ops::io::init(&mut web_worker.worker); - ops::reg_json_sync( - &mut web_worker.worker, - "op_close", - deno_core::op_close, - ); - ops::reg_json_sync( - &mut web_worker.worker, - "op_resources", - deno_core::op_resources, - ); - ops::errors::init(&mut web_worker.worker); - ops::timers::init(&mut web_worker.worker); - ops::fetch::init(&mut web_worker.worker); - ops::websocket::init(&mut web_worker.worker); - - if has_deno_namespace { - ops::runtime_compiler::init(&mut web_worker.worker); - ops::fs::init(&mut web_worker.worker); - ops::fs_events::init(&mut web_worker.worker); - ops::plugin::init(&mut web_worker.worker); - ops::net::init(&mut web_worker.worker); - ops::tls::init(&mut web_worker.worker); - ops::os::init(&mut web_worker.worker); - ops::permissions::init(&mut web_worker.worker); - ops::process::init(&mut web_worker.worker); - ops::random::init(&mut web_worker.worker); - ops::signal::init(&mut web_worker.worker); - ops::tty::init(&mut web_worker.worker); - } - } - - web_worker - } -} - -impl WebWorker { - /// Returns a way to communicate with the Worker from other threads. - pub fn thread_safe_handle(&self) -> WebWorkerHandle { - self.handle.clone() - } -} - -impl Deref for WebWorker { - type Target = Worker; - fn deref(&self) -> &Self::Target { - &self.worker - } -} - -impl DerefMut for WebWorker { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.worker - } -} - -impl Future for WebWorker { - type Output = Result<(), AnyError>; - - fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { - let inner = self.get_mut(); - let worker = &mut inner.worker; - - let terminated = inner.handle.terminated.load(Ordering::Relaxed); - - if terminated { - return Poll::Ready(Ok(())); - } - - if !inner.event_loop_idle { - match worker.poll_unpin(cx) { - Poll::Ready(r) => { - let terminated = inner.handle.terminated.load(Ordering::Relaxed); - if terminated { - return Poll::Ready(Ok(())); - } - - if let Err(e) = r { - let mut sender = worker.internal_channels.sender.clone(); - sender - .try_send(WorkerEvent::Error(e)) - .expect("Failed to post message to host"); - } - inner.event_loop_idle = true; - } - Poll::Pending => {} - } - } - - if let Poll::Ready(r) = inner.terminate_rx.poll_next_unpin(cx) { - // terminate_rx should never be closed - assert!(r.is_some()); - return Poll::Ready(Ok(())); - } - - if let Poll::Ready(r) = - worker.internal_channels.receiver.poll_next_unpin(cx) - { - match r { - Some(msg) => { - let msg = String::from_utf8(msg.to_vec()).unwrap(); - let script = format!("workerMessageRecvCallback({})", msg); - - if let Err(e) = worker.execute(&script) { - // If execution was terminated during message callback then - // just ignore it - if inner.handle.terminated.load(Ordering::Relaxed) { - return Poll::Ready(Ok(())); - } - - // Otherwise forward error to host - let mut sender = worker.internal_channels.sender.clone(); - sender - .try_send(WorkerEvent::Error(e)) - .expect("Failed to post message to host"); - } - - // Let event loop be polled again - inner.event_loop_idle = false; - worker.waker.wake(); - } - None => unreachable!(), - } - } - - Poll::Pending - } -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::tokio_util; - use crate::worker::WorkerEvent; - use deno_core::serde_json::json; - - fn create_test_worker() -> WebWorker { - let main_module = - ModuleSpecifier::resolve_url_or_path("./hello.js").unwrap(); - let global_state = GlobalState::mock(vec!["deno".to_string()], None); - let mut worker = WebWorker::new( - "TEST".to_string(), - Permissions::allow_all(), - main_module, - global_state, - false, - ); - worker - .execute("bootstrap.workerRuntime(\"TEST\", false)") - .unwrap(); - worker - } - #[test] - fn test_worker_messages() { - let (handle_sender, handle_receiver) = - std::sync::mpsc::sync_channel::(1); - - let join_handle = std::thread::spawn(move || { - let mut worker = create_test_worker(); - let source = r#" - onmessage = function(e) { - console.log("msg from main script", e.data); - if (e.data == "exit") { - return close(); - } else { - console.assert(e.data === "hi"); - } - postMessage([1, 2, 3]); - console.log("after postMessage"); - } - "#; - worker.execute(source).unwrap(); - let handle = worker.thread_safe_handle(); - handle_sender.send(handle).unwrap(); - let r = tokio_util::run_basic(worker); - assert!(r.is_ok()) - }); - - let mut handle = handle_receiver.recv().unwrap(); - - tokio_util::run_basic(async move { - let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes(); - let r = handle.post_message(msg.clone()); - assert!(r.is_ok()); - - let maybe_msg = handle.get_event().await.unwrap(); - assert!(maybe_msg.is_some()); - - let r = handle.post_message(msg.clone()); - assert!(r.is_ok()); - - let maybe_msg = handle.get_event().await.unwrap(); - assert!(maybe_msg.is_some()); - match maybe_msg { - Some(WorkerEvent::Message(buf)) => { - assert_eq!(*buf, *b"[1,2,3]"); - } - _ => unreachable!(), - } - - let msg = json!("exit") - .to_string() - .into_boxed_str() - .into_boxed_bytes(); - let r = handle.post_message(msg); - assert!(r.is_ok()); - let event = handle.get_event().await.unwrap(); - assert!(event.is_none()); - handle.sender.close_channel(); - }); - join_handle.join().expect("Failed to join worker thread"); - } - - #[test] - fn removed_from_resource_table_on_close() { - let (handle_sender, handle_receiver) = - std::sync::mpsc::sync_channel::(1); - - let join_handle = std::thread::spawn(move || { - let mut worker = create_test_worker(); - worker.execute("onmessage = () => { close(); }").unwrap(); - let handle = worker.thread_safe_handle(); - handle_sender.send(handle).unwrap(); - let r = tokio_util::run_basic(worker); - assert!(r.is_ok()) - }); - - let mut handle = handle_receiver.recv().unwrap(); - - tokio_util::run_basic(async move { - let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes(); - let r = handle.post_message(msg.clone()); - assert!(r.is_ok()); - let event = handle.get_event().await.unwrap(); - assert!(event.is_none()); - handle.sender.close_channel(); - }); - join_handle.join().expect("Failed to join worker thread"); - } -} diff --git a/cli/worker.rs b/cli/worker.rs index 2fc02c6eec..f6c518d0ca 100644 --- a/cli/worker.rs +++ b/cli/worker.rs @@ -7,10 +7,6 @@ use crate::js; use crate::metrics::Metrics; use crate::ops; use crate::ops::io::get_stdio; -use crate::ops::timers::GlobalTimer; -use crate::ops::timers::StartTime; -use crate::ops::worker_host::WorkerId; -use crate::ops::worker_host::WorkersTable; use crate::permissions::Permissions; use crate::state::CliModuleLoader; use deno_core::error::AnyError; @@ -19,20 +15,20 @@ use deno_core::futures::future::FutureExt; use deno_core::futures::stream::StreamExt; use deno_core::futures::task::AtomicWaker; use deno_core::url::Url; +use deno_core::v8; use deno_core::JsRuntime; use deno_core::ModuleId; use deno_core::ModuleSpecifier; use deno_core::RuntimeOptions; use deno_core::Snapshot; -use deno_fetch::reqwest; -use rand::rngs::StdRng; -use rand::SeedableRng; use std::env; use std::future::Future; use std::ops::Deref; use std::ops::DerefMut; use std::pin::Pin; use std::rc::Rc; +use std::sync::atomic::AtomicBool; +use std::sync::atomic::Ordering; use std::sync::Arc; use std::task::Context; use std::task::Poll; @@ -111,18 +107,16 @@ pub struct Worker { impl Worker { pub fn new( name: String, - startup_snapshot: Option, - permissions: Permissions, - main_module: ModuleSpecifier, + startup_snapshot: Snapshot, global_state: Arc, - state: Rc, + module_loader: Rc, is_main: bool, ) -> Self { let global_state_ = global_state.clone(); let mut isolate = JsRuntime::new(RuntimeOptions { - module_loader: Some(state), - startup_snapshot, + module_loader: Some(module_loader), + startup_snapshot: Some(startup_snapshot), js_error_create_fn: Some(Box::new(move |core_js_error| { JsError::create(core_js_error, &global_state_.ts_compiler) })), @@ -132,25 +126,6 @@ impl Worker { let op_state = isolate.op_state(); let mut op_state = op_state.borrow_mut(); op_state.get_error_class_fn = &crate::errors::get_error_class_name; - - op_state.put::(GlobalTimer::default()); - op_state.put::(StartTime::now()); - op_state.put::(Default::default()); - op_state.put::(WorkersTable::default()); - op_state.put::(WorkerId::default()); - op_state.put::(permissions); - op_state.put::(main_module); - op_state.put::>(global_state.clone()); - - op_state.put::({ - let ca_file = global_state.flags.ca_file.as_deref(); - crate::http_util::create_http_client(ca_file).unwrap() - }); - - if let Some(seed) = global_state.flags.seed { - let rng = StdRng::seed_from_u64(seed); - op_state.put::(rng); - } } let inspector = @@ -296,41 +271,48 @@ impl MainWorker { let loader = CliModuleLoader::new(global_state.maybe_import_map.clone()); let mut worker = Worker::new( "main".to_string(), - Some(js::deno_isolate_init()), - global_state.permissions.clone(), - main_module, + js::deno_isolate_init(), global_state.clone(), loader, true, ); { - ops::runtime::init(&mut worker); - ops::runtime_compiler::init(&mut worker); - ops::errors::init(&mut worker); - ops::fetch::init(&mut worker); - ops::websocket::init(&mut worker); - ops::fs::init(&mut worker); - ops::fs_events::init(&mut worker); + // All ops registered in this function depend on these + { + let op_state = worker.op_state(); + let mut op_state = op_state.borrow_mut(); + op_state.put::(Default::default()); + op_state.put::>(global_state.clone()); + op_state.put::(global_state.permissions.clone()); + } + + ops::runtime::init(&mut worker, main_module); + ops::fetch::init(&mut worker, global_state.flags.ca_file.as_deref()); + ops::timers::init(&mut worker); + ops::worker_host::init(&mut worker); + ops::random::init(&mut worker, global_state.flags.seed); + ops::reg_json_sync(&mut worker, "op_close", deno_core::op_close); + ops::reg_json_sync(&mut worker, "op_resources", deno_core::op_resources); ops::reg_json_sync( &mut worker, "op_domain_to_ascii", deno_web::op_domain_to_ascii, ); + ops::errors::init(&mut worker); + ops::fs_events::init(&mut worker); + ops::fs::init(&mut worker); ops::io::init(&mut worker); - ops::plugin::init(&mut worker); ops::net::init(&mut worker); - ops::tls::init(&mut worker); ops::os::init(&mut worker); ops::permissions::init(&mut worker); + ops::plugin::init(&mut worker); ops::process::init(&mut worker); - ops::random::init(&mut worker); ops::repl::init(&mut worker); - ops::reg_json_sync(&mut worker, "op_close", deno_core::op_close); - ops::reg_json_sync(&mut worker, "op_resources", deno_core::op_resources); + ops::runtime_compiler::init(&mut worker); ops::signal::init(&mut worker); - ops::timers::init(&mut worker); + ops::tls::init(&mut worker); ops::tty::init(&mut worker); - ops::worker_host::init(&mut worker); + ops::websocket::init(&mut worker); } { let op_state = worker.op_state(); @@ -367,12 +349,257 @@ impl DerefMut for MainWorker { } } +/// Wrapper for `WorkerHandle` that adds functionality +/// for terminating workers. +/// +/// This struct is used by host as well as worker itself. +/// +/// Host uses it to communicate with worker and terminate it, +/// while worker uses it only to finish execution on `self.close()`. +#[derive(Clone)] +pub struct WebWorkerHandle { + worker_handle: WorkerHandle, + terminate_tx: mpsc::Sender<()>, + terminated: Arc, + isolate_handle: v8::IsolateHandle, +} + +impl Deref for WebWorkerHandle { + type Target = WorkerHandle; + fn deref(&self) -> &Self::Target { + &self.worker_handle + } +} + +impl DerefMut for WebWorkerHandle { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.worker_handle + } +} + +impl WebWorkerHandle { + pub fn terminate(&self) { + // This function can be called multiple times by whomever holds + // the handle. However only a single "termination" should occur so + // we need a guard here. + let already_terminated = self.terminated.swap(true, Ordering::Relaxed); + + if !already_terminated { + self.isolate_handle.terminate_execution(); + let mut sender = self.terminate_tx.clone(); + // This call should be infallible hence the `expect`. + // This might change in the future. + sender.try_send(()).expect("Failed to terminate"); + } + } +} + +/// This worker is implementation of `Worker` Web API +/// +/// At the moment this type of worker supports only +/// communication with parent and creating new workers. +/// +/// Each `WebWorker` is either a child of `MainWorker` or other +/// `WebWorker`. +pub struct WebWorker { + worker: Worker, + event_loop_idle: bool, + terminate_rx: mpsc::Receiver<()>, + handle: WebWorkerHandle, + pub has_deno_namespace: bool, +} + +impl WebWorker { + pub fn new( + name: String, + permissions: Permissions, + main_module: ModuleSpecifier, + global_state: Arc, + has_deno_namespace: bool, + ) -> Self { + let loader = CliModuleLoader::new_for_worker(); + let mut worker = Worker::new( + name, + js::deno_isolate_init(), + global_state.clone(), + loader, + false, + ); + + let terminated = Arc::new(AtomicBool::new(false)); + let isolate_handle = worker.isolate.thread_safe_handle(); + let (terminate_tx, terminate_rx) = mpsc::channel::<()>(1); + + let handle = WebWorkerHandle { + worker_handle: worker.thread_safe_handle(), + terminated, + isolate_handle, + terminate_tx, + }; + + let mut web_worker = Self { + worker, + event_loop_idle: false, + terminate_rx, + handle, + has_deno_namespace, + }; + + { + let handle = web_worker.thread_safe_handle(); + let sender = web_worker.worker.internal_channels.sender.clone(); + + // All ops registered in this function depend on these + { + let op_state = web_worker.op_state(); + let mut op_state = op_state.borrow_mut(); + op_state.put::(Default::default()); + op_state.put::>(global_state.clone()); + op_state.put::(permissions); + } + + ops::web_worker::init(&mut web_worker, sender, handle); + ops::runtime::init(&mut web_worker, main_module); + ops::fetch::init(&mut web_worker, global_state.flags.ca_file.as_deref()); + ops::timers::init(&mut web_worker); + ops::worker_host::init(&mut web_worker); + ops::reg_json_sync(&mut web_worker, "op_close", deno_core::op_close); + ops::reg_json_sync( + &mut web_worker, + "op_resources", + deno_core::op_resources, + ); + ops::reg_json_sync( + &mut web_worker, + "op_domain_to_ascii", + deno_web::op_domain_to_ascii, + ); + ops::errors::init(&mut web_worker); + ops::io::init(&mut web_worker); + ops::websocket::init(&mut web_worker); + + if has_deno_namespace { + ops::fs_events::init(&mut web_worker); + ops::fs::init(&mut web_worker); + ops::net::init(&mut web_worker); + ops::os::init(&mut web_worker); + ops::permissions::init(&mut web_worker); + ops::plugin::init(&mut web_worker); + ops::process::init(&mut web_worker); + ops::random::init(&mut web_worker, global_state.flags.seed); + ops::runtime_compiler::init(&mut web_worker); + ops::signal::init(&mut web_worker); + ops::tls::init(&mut web_worker); + ops::tty::init(&mut web_worker); + } + } + + web_worker + } +} + +impl WebWorker { + /// Returns a way to communicate with the Worker from other threads. + pub fn thread_safe_handle(&self) -> WebWorkerHandle { + self.handle.clone() + } +} + +impl Deref for WebWorker { + type Target = Worker; + fn deref(&self) -> &Self::Target { + &self.worker + } +} + +impl DerefMut for WebWorker { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.worker + } +} + +impl Future for WebWorker { + type Output = Result<(), AnyError>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { + let inner = self.get_mut(); + let worker = &mut inner.worker; + + let terminated = inner.handle.terminated.load(Ordering::Relaxed); + + if terminated { + return Poll::Ready(Ok(())); + } + + if !inner.event_loop_idle { + match worker.poll_unpin(cx) { + Poll::Ready(r) => { + let terminated = inner.handle.terminated.load(Ordering::Relaxed); + if terminated { + return Poll::Ready(Ok(())); + } + + if let Err(e) = r { + let mut sender = worker.internal_channels.sender.clone(); + sender + .try_send(WorkerEvent::Error(e)) + .expect("Failed to post message to host"); + } + inner.event_loop_idle = true; + } + Poll::Pending => {} + } + } + + if let Poll::Ready(r) = inner.terminate_rx.poll_next_unpin(cx) { + // terminate_rx should never be closed + assert!(r.is_some()); + return Poll::Ready(Ok(())); + } + + if let Poll::Ready(r) = + worker.internal_channels.receiver.poll_next_unpin(cx) + { + match r { + Some(msg) => { + let msg = String::from_utf8(msg.to_vec()).unwrap(); + let script = format!("workerMessageRecvCallback({})", msg); + + if let Err(e) = worker.execute(&script) { + // If execution was terminated during message callback then + // just ignore it + if inner.handle.terminated.load(Ordering::Relaxed) { + return Poll::Ready(Ok(())); + } + + // Otherwise forward error to host + let mut sender = worker.internal_channels.sender.clone(); + sender + .try_send(WorkerEvent::Error(e)) + .expect("Failed to post message to host"); + } + + // Let event loop be polled again + inner.event_loop_idle = false; + worker.waker.wake(); + } + None => unreachable!(), + } + } + + Poll::Pending + } +} + #[cfg(test)] mod tests { use super::*; use crate::flags::DenoSubcommand; use crate::flags::Flags; use crate::global_state::GlobalState; + use crate::tokio_util; + use crate::worker::WorkerEvent; + use deno_core::serde_json::json; fn create_test_worker() -> MainWorker { let main_module = @@ -466,4 +693,105 @@ mod tests { let result = worker.execute_module(&module_specifier).await; assert!(result.is_ok()); } + + fn create_test_web_worker() -> WebWorker { + let main_module = + ModuleSpecifier::resolve_url_or_path("./hello.js").unwrap(); + let global_state = GlobalState::mock(vec!["deno".to_string()], None); + let mut worker = WebWorker::new( + "TEST".to_string(), + Permissions::allow_all(), + main_module, + global_state, + false, + ); + worker + .execute("bootstrap.workerRuntime(\"TEST\", false)") + .unwrap(); + worker + } + #[tokio::test] + async fn test_worker_messages() { + let (handle_sender, handle_receiver) = + std::sync::mpsc::sync_channel::(1); + + let join_handle = std::thread::spawn(move || { + let mut worker = create_test_web_worker(); + let source = r#" + onmessage = function(e) { + console.log("msg from main script", e.data); + if (e.data == "exit") { + return close(); + } else { + console.assert(e.data === "hi"); + } + postMessage([1, 2, 3]); + console.log("after postMessage"); + } + "#; + worker.execute(source).unwrap(); + let handle = worker.thread_safe_handle(); + handle_sender.send(handle).unwrap(); + let r = tokio_util::run_basic(worker); + assert!(r.is_ok()) + }); + + let mut handle = handle_receiver.recv().unwrap(); + + let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes(); + let r = handle.post_message(msg.clone()); + assert!(r.is_ok()); + + let maybe_msg = handle.get_event().await.unwrap(); + assert!(maybe_msg.is_some()); + + let r = handle.post_message(msg.clone()); + assert!(r.is_ok()); + + let maybe_msg = handle.get_event().await.unwrap(); + assert!(maybe_msg.is_some()); + match maybe_msg { + Some(WorkerEvent::Message(buf)) => { + assert_eq!(*buf, *b"[1,2,3]"); + } + _ => unreachable!(), + } + + let msg = json!("exit") + .to_string() + .into_boxed_str() + .into_boxed_bytes(); + let r = handle.post_message(msg); + assert!(r.is_ok()); + let event = handle.get_event().await.unwrap(); + assert!(event.is_none()); + handle.sender.close_channel(); + join_handle.join().expect("Failed to join worker thread"); + } + + #[tokio::test] + async fn removed_from_resource_table_on_close() { + let (handle_sender, handle_receiver) = + std::sync::mpsc::sync_channel::(1); + + let join_handle = std::thread::spawn(move || { + let mut worker = create_test_web_worker(); + worker.execute("onmessage = () => { close(); }").unwrap(); + let handle = worker.thread_safe_handle(); + handle_sender.send(handle).unwrap(); + let r = tokio_util::run_basic(worker); + assert!(r.is_ok()) + }); + + let mut handle = handle_receiver.recv().unwrap(); + + let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes(); + let r = handle.post_message(msg.clone()); + assert!(r.is_ok()); + let event = handle.get_event().await.unwrap(); + assert!(event.is_none()); + handle.sender.close_channel(); + + join_handle.join().expect("Failed to join worker thread"); + } }