From c0ccbcdaeee04407b2198557cdc55ee4adf1ee7e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20Iwa=C5=84czuk?= Date: Mon, 7 Dec 2020 04:30:40 +0100 Subject: [PATCH] refactor(cli): Reorganize worker code, use stronger memory ordering (#8638) --- cli/ops/websocket.rs | 4 +- cli/ops/worker_host.rs | 286 +++++++++++++---------------------------- cli/web_worker.rs | 180 ++++++++++++++++++-------- cli/worker.rs | 3 +- 4 files changed, 219 insertions(+), 254 deletions(-) diff --git a/cli/ops/websocket.rs b/cli/ops/websocket.rs index 40f5fd53bf..c04c3b476b 100644 --- a/cli/ops/websocket.rs +++ b/cli/ops/websocket.rs @@ -93,8 +93,8 @@ pub async fn op_ws_create( } let ca_file = { - let cli_state = super::global_state2(&state); - cli_state.flags.ca_file.clone() + let program_state = super::global_state2(&state); + program_state.flags.ca_file.clone() }; let uri: Uri = args.url.parse()?; let mut request = Request::builder().method(Method::GET).uri(&uri); diff --git a/cli/ops/worker_host.rs b/cli/ops/worker_host.rs index c464e6df2f..6a2d799682 100644 --- a/cli/ops/worker_host.rs +++ b/cli/ops/worker_host.rs @@ -1,10 +1,7 @@ // Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. -use crate::colors; -use crate::ops::io::get_stdio; use crate::permissions::Permissions; -use crate::program_state::ProgramState; -use crate::tokio_util::create_basic_runtime; +use crate::web_worker::run_web_worker; use crate::web_worker::WebWorker; use crate::web_worker::WebWorkerHandle; use crate::web_worker::WorkerEvent; @@ -12,7 +9,6 @@ use deno_core::error::generic_error; use deno_core::error::AnyError; use deno_core::error::JsError; use deno_core::futures::channel::mpsc; -use deno_core::futures::future::FutureExt; use deno_core::serde_json; use deno_core::serde_json::json; use deno_core::serde_json::Value; @@ -25,7 +21,6 @@ use std::cell::RefCell; use std::collections::HashMap; use std::convert::From; use std::rc::Rc; -use std::sync::Arc; use std::thread::JoinHandle; #[derive(Deserialize)] @@ -68,152 +63,14 @@ pub fn init( ); } -pub type WorkersTable = HashMap, WebWorkerHandle)>; +pub struct WorkerThread { + join_handle: JoinHandle>, + worker_handle: WebWorkerHandle, +} + +pub type WorkersTable = HashMap; pub type WorkerId = u32; -fn create_web_worker( - worker_id: u32, - name: String, - program_state: &Arc, - permissions: Permissions, - specifier: ModuleSpecifier, - has_deno_namespace: bool, -) -> Result { - let mut worker = WebWorker::new( - name.clone(), - permissions, - specifier, - program_state.clone(), - has_deno_namespace, - ); - - if has_deno_namespace { - let state = worker.js_runtime.op_state(); - let mut state = state.borrow_mut(); - let (stdin, stdout, stderr) = get_stdio(); - if let Some(stream) = stdin { - state.resource_table.add("stdin", Box::new(stream)); - } - if let Some(stream) = stdout { - state.resource_table.add("stdout", Box::new(stream)); - } - if let Some(stream) = stderr { - state.resource_table.add("stderr", Box::new(stream)); - } - } - - // Instead of using name for log we use `worker-${id}` because - // WebWorkers can have empty string as name. - let script = format!( - "bootstrap.workerRuntime(\"{}\", {}, \"worker-{}\")", - name, worker.has_deno_namespace, worker_id - ); - worker.execute(&script)?; - - Ok(worker) -} - -// TODO(bartlomieju): check if order of actions is aligned to Worker spec -fn run_worker_thread( - worker_id: u32, - name: String, - program_state: &Arc, - permissions: Permissions, - specifier: ModuleSpecifier, - has_deno_namespace: bool, - maybe_source_code: Option, -) -> Result<(JoinHandle<()>, WebWorkerHandle), AnyError> { - let program_state = program_state.clone(); - let (handle_sender, handle_receiver) = - std::sync::mpsc::sync_channel::>(1); - - let builder = - std::thread::Builder::new().name(format!("deno-worker-{}", worker_id)); - let join_handle = builder.spawn(move || { - // Any error inside this block is terminal: - // - JS worker is useless - meaning it throws an exception and can't do anything else, - // all action done upon it should be noops - // - newly spawned thread exits - let result = create_web_worker( - worker_id, - name, - &program_state, - permissions, - specifier.clone(), - has_deno_namespace, - ); - - if let Err(err) = result { - handle_sender.send(Err(err)).unwrap(); - return; - } - - let mut worker = result.unwrap(); - let name = worker.name.to_string(); - // Send thread safe handle to newly created worker to host thread - handle_sender.send(Ok(worker.thread_safe_handle())).unwrap(); - drop(handle_sender); - - // At this point the only method of communication with host - // is using `worker.internal_channels`. - // - // Host can already push messages and interact with worker. - // - // Next steps: - // - create tokio runtime - // - load provided module or code - // - start driving worker's event loop - - let mut rt = create_basic_runtime(); - - // TODO: run with using select with terminate - - // Execute provided source code immediately - let result = if let Some(source_code) = maybe_source_code { - worker.execute(&source_code) - } else { - // TODO(bartlomieju): add "type": "classic", ie. ability to load - // script instead of module - let load_future = worker.execute_module(&specifier).boxed_local(); - - rt.block_on(load_future) - }; - - let mut sender = worker.internal_channels.sender.clone(); - - // If sender is closed it means that worker has already been closed from - // within using "globalThis.close()" - if sender.is_closed() { - return; - } - - if let Err(e) = result { - eprintln!( - "{}: Uncaught (in worker \"{}\") {}", - colors::red_bold("error"), - name, - e.to_string().trim_start_matches("Uncaught "), - ); - sender - .try_send(WorkerEvent::TerminalError(e)) - .expect("Failed to post message to host"); - - // Failure to execute script is a terminal error, bye, bye. - return; - } - - // TODO(bartlomieju): this thread should return result of event loop - // that means that we should store JoinHandle to thread to ensure - // that it actually terminates. - rt.block_on(worker.run_event_loop()) - .expect("Panic in event loop"); - debug!("Worker thread shuts down {}", &name); - })?; - - let worker_handle = handle_receiver.recv().unwrap()?; - Ok((join_handle, worker_handle)) -} - #[derive(Deserialize)] #[serde(rename_all = "camelCase")] struct CreateWorkerArgs { @@ -249,22 +106,53 @@ fn op_create_worker( let module_specifier = ModuleSpecifier::resolve_url(&specifier)?; let worker_name = args_name.unwrap_or_else(|| "".to_string()); - let cli_state = super::program_state(state); + let program_state = super::program_state(state); + + let (handle_sender, handle_receiver) = + std::sync::mpsc::sync_channel::>(1); + + // Setup new thread + let thread_builder = + std::thread::Builder::new().name(format!("deno-worker-{}", worker_id)); + + // Spawn it + let join_handle = thread_builder.spawn(move || { + // Any error inside this block is terminal: + // - JS worker is useless - meaning it throws an exception and can't do anything else, + // all action done upon it should be noops + // - newly spawned thread exits + let worker = WebWorker::new( + worker_name, + permissions, + module_specifier.clone(), + program_state, + use_deno_namespace, + worker_id, + ); + + // Send thread safe handle to newly created worker to host thread + handle_sender.send(Ok(worker.thread_safe_handle())).unwrap(); + drop(handle_sender); + + // At this point the only method of communication with host + // is using `worker.internal_channels`. + // + // Host can already push messages and interact with worker. + run_web_worker(worker, module_specifier, maybe_source_code) + })?; + + let worker_handle = handle_receiver.recv().unwrap()?; + + let worker_thread = WorkerThread { + join_handle, + worker_handle, + }; - let (join_handle, worker_handle) = run_worker_thread( - worker_id, - worker_name, - &cli_state, - permissions, - module_specifier, - use_deno_namespace, - maybe_source_code, - )?; // At this point all interactions with worker happen using thread - // safe handler returned from previous function call + // safe handler returned from previous function calls state .borrow_mut::() - .insert(worker_id, (join_handle, worker_handle)); + .insert(worker_id, worker_thread); Ok(json!({ "id": worker_id })) } @@ -281,12 +169,16 @@ fn op_host_terminate_worker( ) -> Result { let args: WorkerArgs = serde_json::from_value(args)?; let id = args.id as u32; - let (join_handle, worker_handle) = state + let worker_thread = state .borrow_mut::() .remove(&id) .expect("No worker handle found"); - worker_handle.terminate(); - join_handle.join().expect("Panic in worker thread"); + worker_thread.worker_handle.terminate(); + worker_thread + .join_handle + .join() + .expect("Panic in worker thread") + .expect("Panic in worker event loop"); Ok(json!({})) } @@ -330,6 +222,22 @@ fn serialize_worker_event(event: WorkerEvent) -> Value { } } +/// Try to remove worker from workers table - NOTE: `Worker.terminate()` +/// might have been called already meaning that we won't find worker in +/// table - in that case ignore. +fn try_remove_and_close(state: Rc>, id: u32) { + let mut s = state.borrow_mut(); + let workers = s.borrow_mut::(); + if let Some(mut worker_thread) = workers.remove(&id) { + worker_thread.worker_handle.sender.close_channel(); + worker_thread + .join_handle + .join() + .expect("Worker thread panicked") + .expect("Panic in worker event loop"); + } +} + /// Get message from guest worker as host async fn op_host_get_message( state: Rc>, @@ -344,41 +252,25 @@ async fn op_host_get_message( let workers_table = s.borrow::(); let maybe_handle = workers_table.get(&id); if let Some(handle) = maybe_handle { - handle.1.clone() + handle.worker_handle.clone() } else { // If handle was not found it means worker has already shutdown return Ok(json!({ "type": "close" })); } }; - let response = match worker_handle.get_event().await? { - Some(event) => { - // Terminal error means that worker should be removed from worker table. - if let WorkerEvent::TerminalError(_) = &event { - let mut s = state.borrow_mut(); - if let Some((join_handle, mut worker_handle)) = - s.borrow_mut::().remove(&id) - { - worker_handle.sender.close_channel(); - join_handle.join().expect("Worker thread panicked"); - }; - } - serialize_worker_event(event) + let maybe_event = worker_handle.get_event().await?; + if let Some(event) = maybe_event { + // Terminal error means that worker should be removed from worker table. + if let WorkerEvent::TerminalError(_) = &event { + try_remove_and_close(state, id); } - None => { - // Worker shuts down - let mut s = state.borrow_mut(); - let workers = s.borrow_mut::(); - // Try to remove worker from workers table - NOTE: `Worker.terminate()` might have been called - // already meaning that we won't find worker in table - in that case ignore. - if let Some((join_handle, mut worker_handle)) = workers.remove(&id) { - worker_handle.sender.close_channel(); - join_handle.join().expect("Worker thread panicked"); - } - json!({ "type": "close" }) - } - }; - Ok(response) + return Ok(serialize_worker_event(event)); + } + + // If there was no event from worker it means it has already been closed. + try_remove_and_close(state, id); + Ok(json!({ "type": "close" })) } /// Post message to guest worker as host @@ -393,8 +285,10 @@ fn op_host_post_message( let msg = Vec::from(&*data[0]).into_boxed_slice(); debug!("post message to worker {}", id); - let workers = state.borrow::(); - let worker_handle = workers[&id].1.clone(); - worker_handle.post_message(msg)?; + let worker_thread = state + .borrow::() + .get(&id) + .expect("No worker handle found"); + worker_thread.worker_handle.post_message(msg)?; Ok(json!({})) } diff --git a/cli/web_worker.rs b/cli/web_worker.rs index 12b79cb2d6..ddce8666e7 100644 --- a/cli/web_worker.rs +++ b/cli/web_worker.rs @@ -10,6 +10,7 @@ use crate::ops; use crate::permissions::Permissions; use crate::program_state::ProgramState; use crate::source_maps::apply_source_map; +use crate::tokio_util::create_basic_runtime; use deno_core::error::AnyError; use deno_core::futures::channel::mpsc; use deno_core::futures::future::poll_fn; @@ -77,7 +78,7 @@ impl WebWorkerHandle { // This function can be called multiple times by whomever holds // the handle. However only a single "termination" should occur so // we need a guard here. - let already_terminated = self.terminated.swap(true, Ordering::Relaxed); + let already_terminated = self.terminated.swap(true, Ordering::SeqCst); if !already_terminated { self.isolate_handle.terminate_execution(); @@ -134,6 +135,7 @@ impl WebWorker { main_module: ModuleSpecifier, program_state: Arc, has_deno_namespace: bool, + worker_id: u32, ) -> Self { let module_loader = CliModuleLoader::new_for_worker(); let global_state_ = program_state.clone(); @@ -173,7 +175,7 @@ impl WebWorker { inspector, internal_channels, js_runtime, - name, + name: name.clone(), waker: AtomicWaker::new(), event_loop_idle: false, terminate_rx, @@ -223,9 +225,32 @@ impl WebWorker { ops::signal::init(js_runtime); ops::tls::init(js_runtime); ops::tty::init(js_runtime); + + let op_state = js_runtime.op_state(); + let mut op_state = op_state.borrow_mut(); + let (stdin, stdout, stderr) = ops::io::get_stdio(); + if let Some(stream) = stdin { + op_state.resource_table.add("stdin", Box::new(stream)); + } + if let Some(stream) = stdout { + op_state.resource_table.add("stdout", Box::new(stream)); + } + if let Some(stream) = stderr { + op_state.resource_table.add("stderr", Box::new(stream)); + } } } + // Instead of using name for log we use `worker-${id}` because + // WebWorkers can have empty string as name. + let script = format!( + "bootstrap.workerRuntime(\"{}\", {}, \"worker-{}\")", + name, worker.has_deno_namespace, worker_id + ); + worker + .execute(&script) + .expect("Failed to execute worker bootstrap script"); + worker } @@ -250,13 +275,15 @@ impl WebWorker { self.handle.clone() } + pub fn has_been_terminated(&self) -> bool { + self.handle.terminated.load(Ordering::SeqCst) + } + pub fn poll_event_loop( &mut self, cx: &mut Context, ) -> Poll> { - let terminated = self.handle.terminated.load(Ordering::Relaxed); - - if terminated { + if self.has_been_terminated() { return Poll::Ready(Ok(())); } @@ -267,28 +294,20 @@ impl WebWorker { self.waker.register(cx.waker()); self.js_runtime.poll_event_loop(cx) }; - match poll_result { - Poll::Ready(r) => { - let terminated = self.handle.terminated.load(Ordering::Relaxed); - if terminated { - return Poll::Ready(Ok(())); - } - if let Err(e) = r { - eprintln!( - "{}: Uncaught (in worker \"{}\") {}", - colors::red_bold("error"), - self.name.to_string(), - e.to_string().trim_start_matches("Uncaught "), - ); - let mut sender = self.internal_channels.sender.clone(); - sender - .try_send(WorkerEvent::Error(e)) - .expect("Failed to post message to host"); - } - self.event_loop_idle = true; + if let Poll::Ready(r) = poll_result { + if self.has_been_terminated() { + return Poll::Ready(Ok(())); } - Poll::Pending => {} + + if let Err(e) = r { + print_worker_error(e.to_string(), &self.name); + let mut sender = self.internal_channels.sender.clone(); + sender + .try_send(WorkerEvent::Error(e)) + .expect("Failed to post message to host"); + } + self.event_loop_idle = true; } } @@ -298,33 +317,32 @@ impl WebWorker { return Poll::Ready(Ok(())); } - if let Poll::Ready(r) = self.internal_channels.receiver.poll_next_unpin(cx) - { - match r { - Some(msg) => { - let msg = String::from_utf8(msg.to_vec()).unwrap(); - let script = format!("workerMessageRecvCallback({})", msg); + let maybe_msg_poll_result = + self.internal_channels.receiver.poll_next_unpin(cx); - if let Err(e) = self.execute(&script) { - // If execution was terminated during message callback then - // just ignore it - if self.handle.terminated.load(Ordering::Relaxed) { - return Poll::Ready(Ok(())); - } + if let Poll::Ready(maybe_msg) = maybe_msg_poll_result { + let msg = + maybe_msg.expect("Received `None` instead of message in worker"); + let msg = String::from_utf8(msg.to_vec()).unwrap(); + let script = format!("workerMessageRecvCallback({})", msg); - // Otherwise forward error to host - let mut sender = self.internal_channels.sender.clone(); - sender - .try_send(WorkerEvent::Error(e)) - .expect("Failed to post message to host"); - } - - // Let event loop be polled again - self.event_loop_idle = false; - self.waker.wake(); + if let Err(e) = self.execute(&script) { + // If execution was terminated during message callback then + // just ignore it + if self.has_been_terminated() { + return Poll::Ready(Ok(())); } - None => unreachable!(), + + // Otherwise forward error to host + let mut sender = self.internal_channels.sender.clone(); + sender + .try_send(WorkerEvent::Error(e)) + .expect("Failed to post message to host"); } + + // Let event loop be polled again + self.event_loop_idle = false; + self.waker.wake(); } Poll::Pending @@ -343,6 +361,63 @@ impl Drop for WebWorker { } } +fn print_worker_error(error_str: String, name: &str) { + eprintln!( + "{}: Uncaught (in worker \"{}\") {}", + colors::red_bold("error"), + name, + error_str.trim_start_matches("Uncaught "), + ); +} + +/// This function should be called from a thread dedicated to this worker. +// TODO(bartlomieju): check if order of actions is aligned to Worker spec +pub fn run_web_worker( + mut worker: WebWorker, + specifier: ModuleSpecifier, + maybe_source_code: Option, +) -> Result<(), AnyError> { + let name = worker.name.to_string(); + + let mut rt = create_basic_runtime(); + + // TODO(bartlomieju): run following block using "select!" + // with terminate + + // Execute provided source code immediately + let result = if let Some(source_code) = maybe_source_code { + worker.execute(&source_code) + } else { + // TODO(bartlomieju): add "type": "classic", ie. ability to load + // script instead of module + let load_future = worker.execute_module(&specifier).boxed_local(); + + rt.block_on(load_future) + }; + + let mut sender = worker.internal_channels.sender.clone(); + + // If sender is closed it means that worker has already been closed from + // within using "globalThis.close()" + if sender.is_closed() { + return Ok(()); + } + + if let Err(e) = result { + print_worker_error(e.to_string(), &name); + sender + .try_send(WorkerEvent::TerminalError(e)) + .expect("Failed to post message to host"); + + // Failure to execute script is a terminal error, bye, bye. + return Ok(()); + } + + let result = rt.block_on(worker.run_event_loop()); + debug!("Worker thread shuts down {}", &name); + result +} + #[cfg(test)] mod tests { use super::*; @@ -354,17 +429,14 @@ mod tests { let main_module = ModuleSpecifier::resolve_url_or_path("./hello.js").unwrap(); let program_state = ProgramState::mock(vec!["deno".to_string()], None); - let mut worker = WebWorker::new( + WebWorker::new( "TEST".to_string(), Permissions::allow_all(), main_module, program_state, false, - ); - worker - .execute("bootstrap.workerRuntime(\"TEST\", false)") - .unwrap(); - worker + 1, + ) } #[tokio::test] diff --git a/cli/worker.rs b/cli/worker.rs index 3068ab1f79..c2ed8871b4 100644 --- a/cli/worker.rs +++ b/cli/worker.rs @@ -7,7 +7,6 @@ use crate::js; use crate::metrics::Metrics; use crate::module_loader::CliModuleLoader; use crate::ops; -use crate::ops::io::get_stdio; use crate::permissions::Permissions; use crate::program_state::ProgramState; use crate::source_maps::apply_source_map; @@ -148,7 +147,7 @@ impl MainWorker { let op_state = js_runtime.op_state(); let mut op_state = op_state.borrow_mut(); let t = &mut op_state.resource_table; - let (stdin, stdout, stderr) = get_stdio(); + let (stdin, stdout, stderr) = ops::io::get_stdio(); if let Some(stream) = stdin { t.add("stdin", Box::new(stream)); }