From b0a23beb8fae964be3cdd8c23c38af66257d34c7 Mon Sep 17 00:00:00 2001 From: andy finch Date: Mon, 1 Apr 2019 15:09:59 -0400 Subject: [PATCH] Add web worker JS API (#1993) * Refactored the way worker polling is scheduled and errors are handled. * Share the worker future as a Shared --- cli/compiler.rs | 304 ++++++++++++++---- cli/errors.rs | 10 +- cli/isolate.rs | 4 +- cli/isolate_state.rs | 15 +- cli/main.rs | 2 +- cli/msg.fbs | 38 +++ cli/ops.rs | 160 ++++++++- cli/resources.rs | 5 +- cli/startup_data.rs | 2 +- cli/workers.rs | 295 ++++++++++++----- core/js_errors.rs | 4 +- js/assets.ts | 16 +- js/compiler.ts | 26 +- js/globals.ts | 9 + js/main.ts | 4 +- js/workers.ts | 153 ++++++++- tests/026_workers.test | 2 + tests/026_workers.ts | 14 + tests/026_workers.ts.out | 4 + ...dule.disabled => error_004_missing_module} | 0 ...abled => error_005_missing_dynamic_import} | 0 ....disabled => error_006_import_ext_failure} | 0 tests/subdir/test_worker.js | 7 + tests/subdir/test_worker.ts | 7 + tools/ts_library_builder/ast_util.ts | 5 +- tools/ts_library_builder/build_library.ts | 166 +++++++--- tools/ts_library_builder/main.ts | 1 + tools/ts_library_builder/test.ts | 17 +- 28 files changed, 1013 insertions(+), 257 deletions(-) create mode 100644 tests/026_workers.test create mode 100644 tests/026_workers.ts create mode 100644 tests/026_workers.ts.out rename tests/{error_004_missing_module.disabled => error_004_missing_module} (100%) rename tests/{error_005_missing_dynamic_import.disabled => error_005_missing_dynamic_import} (100%) rename tests/{error_006_import_ext_failure.disabled => error_006_import_ext_failure} (100%) create mode 100644 tests/subdir/test_worker.js create mode 100644 tests/subdir/test_worker.ts diff --git a/cli/compiler.rs b/cli/compiler.rs index 4613aff995..bd0a763c1b 100644 --- a/cli/compiler.rs +++ b/cli/compiler.rs @@ -1,26 +1,53 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. +use core::ops::Deref; +use crate::flags::DenoFlags; use crate::isolate_state::*; +use crate::js_errors::JSErrorColor; use crate::msg; use crate::ops; use crate::resources; -use crate::resources::Resource; use crate::resources::ResourceId; use crate::startup_data; use crate::workers; use crate::workers::WorkerBehavior; +use crate::workers::WorkerInit; use deno::deno_buf; use deno::Behavior; use deno::Buf; +use deno::JSError; use deno::Op; use deno::StartupData; +use futures::future::*; +use futures::sync::oneshot; use futures::Future; use serde_json; use std::str; use std::sync::Arc; use std::sync::Mutex; +use tokio::runtime::Runtime; + +/// Used for normalization of types on internal future completions +type CompilerInnerResult = Result>; +type WorkerErrReceiver = oneshot::Receiver; + +/// Shared resources for used to complete compiler operations. +/// rid is the resource id for compiler worker resource used for sending it +/// compile requests +/// worker_err_receiver is a shared future that will compelete when the +/// compiler worker future completes, and send back an error if present +/// or a None if not +#[derive(Clone)] +struct CompilerShared { + pub rid: ResourceId, + pub worker_err_receiver: Shared, +} lazy_static! { - static ref C_RID: Mutex> = Mutex::new(None); + // Shared worker resources so we can spawn + static ref C_SHARED: Mutex> = Mutex::new(None); + // tokio runtime specifically for spawning logic that is dependent on + // completetion of the compiler worker future + static ref C_RUNTIME: Mutex = Mutex::new(Runtime::new().unwrap()); } pub struct CompilerBehavior { @@ -28,8 +55,10 @@ pub struct CompilerBehavior { } impl CompilerBehavior { - pub fn new(state: Arc) -> Self { - Self { state } + pub fn new(flags: DenoFlags, argv_rest: Vec) -> Self { + Self { + state: Arc::new(IsolateState::new(flags, argv_rest, None, true)), + } } } @@ -65,13 +94,14 @@ impl WorkerBehavior for CompilerBehavior { self.state.flags.clone(), self.state.argv.clone(), Some(worker_channels), + true, )); } } // This corresponds to JS ModuleMetaData. // TODO Rename one or the other so they correspond. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct ModuleMetaData { pub module_name: String, pub filename: String, @@ -102,26 +132,60 @@ impl ModuleMetaData { } } -fn lazy_start(parent_state: Arc) -> Resource { - let mut cell = C_RID.lock().unwrap(); - let rid = cell.get_or_insert_with(|| { - let resource = workers::spawn( - CompilerBehavior::new(Arc::new(IsolateState::new( - parent_state.flags.clone(), - parent_state.argv.clone(), - None, - ))), - "compilerMain()".to_string(), - ); - resource.rid - }); - Resource { rid: *rid } +fn lazy_start(parent_state: Arc) -> CompilerShared { + let mut cell = C_SHARED.lock().unwrap(); + cell + .get_or_insert_with(|| { + let worker_result = workers::spawn( + CompilerBehavior::new( + parent_state.flags.clone(), + parent_state.argv.clone(), + ), + "TS", + WorkerInit::Script("compilerMain()".to_string()), + ); + match worker_result { + Ok(worker) => { + let rid = worker.resource.rid.clone(); + // create oneshot channels and use the sender to pass back + // results from worker future + let (err_sender, err_receiver) = + oneshot::channel::(); + let mut runtime = C_RUNTIME.lock().unwrap(); + runtime.spawn(lazy(move || { + let resource = worker.resource.clone(); + worker.then(move |result| -> Result<(), ()> { + resource.close(); + match result { + Err(err) => err_sender.send(Err(Some(err))).unwrap(), + _ => err_sender.send(Err(None)).unwrap(), + }; + Ok(()) + }) + })); + CompilerShared { + rid, + worker_err_receiver: err_receiver.shared(), + } + } + Err(err) => { + println!("{}", err.to_string()); + std::process::exit(1); + } + } + }).clone() } -fn req(specifier: &str, referrer: &str) -> Buf { +fn show_compiler_error(err: JSError) -> ModuleMetaData { + eprintln!("{}", JSErrorColor(&err).to_string()); + std::process::exit(1); +} + +fn req(specifier: &str, referrer: &str, is_worker_main: bool) -> Buf { json!({ "specifier": specifier, "referrer": referrer, + "isWorker": is_worker_main }).to_string() .into_boxed_str() .into_boxed_bytes() @@ -133,70 +197,172 @@ pub fn compile_sync( referrer: &str, module_meta_data: &ModuleMetaData, ) -> ModuleMetaData { - let req_msg = req(specifier, referrer); + let is_worker = parent_state.is_worker.clone(); + let shared = lazy_start(parent_state); - let compiler = lazy_start(parent_state); + let (local_sender, local_receiver) = + oneshot::channel::>>(); - let send_future = resources::worker_post_message(compiler.rid, req_msg); - send_future.wait().unwrap(); + // Just some extra scoping to keep things clean + { + let compiler_rid = shared.rid.clone(); + let module_meta_data_ = module_meta_data.clone(); + let req_msg = req(specifier, referrer, is_worker); + let sender_arc = Arc::new(Some(local_sender)); + let specifier_ = specifier.clone().to_string(); + let referrer_ = referrer.clone().to_string(); - let recv_future = resources::worker_recv_message(compiler.rid); - let result = recv_future.wait().unwrap(); - assert!(result.is_some()); - let res_msg = result.unwrap(); + let mut runtime = C_RUNTIME.lock().unwrap(); + runtime.spawn(lazy(move || { + debug!( + "Running rust part of compile_sync specifier: {} referrer: {}", + specifier_, referrer_ + ); + let mut send_sender_arc = sender_arc.clone(); + resources::post_message_to_worker(compiler_rid, req_msg) + .map_err(move |_| { + let sender = Arc::get_mut(&mut send_sender_arc).unwrap().take(); + sender.unwrap().send(Err(None)).unwrap() + }).and_then(move |_| { + debug!( + "Sent message to worker specifier: {} referrer: {}", + specifier_, referrer_ + ); + let mut get_sender_arc = sender_arc.clone(); + let mut result_sender_arc = sender_arc.clone(); + resources::get_message_from_worker(compiler_rid) + .map_err(move |_| { + let sender = Arc::get_mut(&mut get_sender_arc).unwrap().take(); + sender.unwrap().send(Err(None)).unwrap() + }).and_then(move |res_msg_option| -> Result<(), ()> { + debug!( + "Recieved message from worker specifier: {} referrer: {}", + specifier_, referrer_ + ); + let res_msg = res_msg_option.unwrap(); + let res_json = std::str::from_utf8(&res_msg).unwrap(); + let sender = Arc::get_mut(&mut result_sender_arc).unwrap().take(); + let sender = sender.unwrap(); + Ok( + sender + .send(Ok(match serde_json::from_str::( + res_json, + ) { + Ok(serde_json::Value::Object(map)) => ModuleMetaData { + module_name: module_meta_data_.module_name.clone(), + filename: module_meta_data_.filename.clone(), + media_type: module_meta_data_.media_type, + source_code: module_meta_data_.source_code.clone(), + maybe_output_code: match map["outputCode"].as_str() { + Some(str) => Some(str.as_bytes().to_owned()), + _ => None, + }, + maybe_output_code_filename: None, + maybe_source_map: match map["sourceMap"].as_str() { + Some(str) => Some(str.as_bytes().to_owned()), + _ => None, + }, + maybe_source_map_filename: None, + }, + _ => panic!("error decoding compiler response"), + })).unwrap(), + ) + }) + }) + })); + } - let res_json = std::str::from_utf8(&res_msg).unwrap(); - match serde_json::from_str::(res_json) { - Ok(serde_json::Value::Object(map)) => ModuleMetaData { - module_name: module_meta_data.module_name.clone(), - filename: module_meta_data.filename.clone(), - media_type: module_meta_data.media_type, - source_code: module_meta_data.source_code.clone(), - maybe_output_code: match map["outputCode"].as_str() { - Some(str) => Some(str.as_bytes().to_owned()), - _ => None, - }, - maybe_output_code_filename: None, - maybe_source_map: match map["sourceMap"].as_str() { - Some(str) => Some(str.as_bytes().to_owned()), - _ => None, - }, - maybe_source_map_filename: None, - }, - _ => panic!("error decoding compiler response"), + let worker_receiver = shared.worker_err_receiver.clone(); + + let union = + futures::future::select_all(vec![worker_receiver, local_receiver.shared()]); + + match union.wait() { + Ok((result, i, rest)) => { + // We got a sucessful finish before any recivers where canceled + let mut rest_mut = rest; + match ((*result.deref()).clone(), i) { + // Either receiver was completed with success. + (Ok(v), _) => v, + // Either receiver was completed with a valid error + // this should be fatal for now since it is not intended + // to be possible to recover from a uncaught error in a isolate + (Err(Some(err)), _) => show_compiler_error(err), + // local_receiver finished first with a none error. This is intended + // to catch when the local logic can't complete because it is unable + // to send and/or receive messages from the compiler worker. + // Due to the way that scheduling works it is very likely that the + // compiler worker future has already or will in the near future + // complete with a valid JSError or a None. + (Err(None), 1) => { + debug!("Compiler local exited with None error!"); + // While technically possible to get stuck here indefinately + // in theory it is highly unlikely. + debug!( + "Waiting on compiler worker result specifier: {} referrer: {}!", + specifier, referrer + ); + let worker_result = + (*rest_mut.remove(0).wait().unwrap().deref()).clone(); + debug!( + "Finished waiting on worker result specifier: {} referrer: {}!", + specifier, referrer + ); + match worker_result { + Err(Some(err)) => show_compiler_error(err), + Err(None) => panic!("Compiler exit for an unknown reason!"), + Ok(v) => v, + } + } + // While possible beccause the compiler worker can exit without error + // this shouldn't occurr normally and I don't intend to attempt to + // handle it right now + (_, i) => panic!("Odd compiler result for future {}!", i), + } + } + // This should always a result of a reciver being cancled + // in theory but why not give a print out just in case + Err((err, i, _)) => panic!("compile_sync {} failed: {}", i, err), } } #[cfg(test)] mod tests { use super::*; + use crate::tokio_util; #[test] fn test_compile_sync() { - let cwd = std::env::current_dir().unwrap(); - let cwd_string = cwd.to_str().unwrap().to_owned(); + tokio_util::init(|| { + let cwd = std::env::current_dir().unwrap(); + let cwd_string = cwd.to_str().unwrap().to_owned(); - let specifier = "./tests/002_hello.ts"; - let referrer = cwd_string + "/"; + let specifier = "./tests/002_hello.ts"; + let referrer = cwd_string + "/"; - let mut out = ModuleMetaData { - module_name: "xxx".to_owned(), - filename: "/tests/002_hello.ts".to_owned(), - media_type: msg::MediaType::TypeScript, - source_code: "console.log(\"Hello World\");".as_bytes().to_owned(), - maybe_output_code_filename: None, - maybe_output_code: None, - maybe_source_map_filename: None, - maybe_source_map: None, - }; + let mut out = ModuleMetaData { + module_name: "xxx".to_owned(), + filename: "/tests/002_hello.ts".to_owned(), + media_type: msg::MediaType::TypeScript, + source_code: include_bytes!("../tests/002_hello.ts").to_vec(), + maybe_output_code_filename: None, + maybe_output_code: None, + maybe_source_map_filename: None, + maybe_source_map: None, + }; - out = - compile_sync(Arc::new(IsolateState::mock()), specifier, &referrer, &out); - assert!( - out - .maybe_output_code - .unwrap() - .starts_with("console.log(\"Hello World\");".as_bytes()) - ); + out = compile_sync( + Arc::new(IsolateState::mock()), + specifier, + &referrer, + &out, + ); + assert!( + out + .maybe_output_code + .unwrap() + .starts_with("console.log(\"Hello World\");".as_bytes()) + ); + }); } } diff --git a/cli/errors.rs b/cli/errors.rs index a2c3c34416..3873f70ff6 100644 --- a/cli/errors.rs +++ b/cli/errors.rs @@ -180,7 +180,15 @@ pub fn permission_denied() -> DenoError { } pub fn op_not_implemented() -> DenoError { - new(ErrorKind::BadResource, String::from("op not implemented")) + new(ErrorKind::OpNotAvaiable, String::from("op not implemented")) +} + +pub fn worker_init_failed() -> DenoError { + // TODO(afinch7) pass worker error data through here + new( + ErrorKind::WorkerInitFailed, + String::from("worker init failed"), + ) } #[derive(Debug)] diff --git a/cli/isolate.rs b/cli/isolate.rs index 15ad4e125a..ced1cb7926 100644 --- a/cli/isolate.rs +++ b/cli/isolate.rs @@ -226,7 +226,7 @@ mod tests { let argv = vec![String::from("./deno"), filename.clone()]; let (flags, rest_argv, _) = flags::set_flags(argv).unwrap(); - let state = Arc::new(IsolateState::new(flags, rest_argv, None)); + let state = Arc::new(IsolateState::new(flags, rest_argv, None, false)); let state_ = state.clone(); tokio_util::run(lazy(move || { let cli = CliBehavior::new(None, state.clone()); @@ -249,7 +249,7 @@ mod tests { let argv = vec![String::from("./deno"), filename.clone()]; let (flags, rest_argv, _) = flags::set_flags(argv).unwrap(); - let state = Arc::new(IsolateState::new(flags, rest_argv, None)); + let state = Arc::new(IsolateState::new(flags, rest_argv, None, false)); let state_ = state.clone(); tokio_util::run(lazy(move || { let cli = CliBehavior::new(None, state.clone()); diff --git a/cli/isolate_state.rs b/cli/isolate_state.rs index 9f6749925c..b0fb97f108 100644 --- a/cli/isolate_state.rs +++ b/cli/isolate_state.rs @@ -5,9 +5,14 @@ use crate::flags; use crate::global_timer::GlobalTimer; use crate::modules::Modules; use crate::permissions::DenoPermissions; +use crate::resources::ResourceId; +use crate::workers::UserWorkerBehavior; +use crate::workers::Worker; use deno::Buf; +use futures::future::Shared; use futures::sync::mpsc as async_mpsc; use std; +use std::collections::HashMap; use std::env; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; @@ -16,6 +21,8 @@ use std::sync::Mutex; pub type WorkerSender = async_mpsc::Sender; pub type WorkerReceiver = async_mpsc::Receiver; pub type WorkerChannels = (WorkerSender, WorkerReceiver); +pub type UserWorkerTable = + HashMap>>; // AtomicU64 is currently unstable #[derive(Default)] @@ -42,6 +49,8 @@ pub struct IsolateState { pub modules: Mutex, pub worker_channels: Option>, pub global_timer: Mutex, + pub workers: Mutex, + pub is_worker: bool, } impl IsolateState { @@ -49,6 +58,7 @@ impl IsolateState { flags: flags::DenoFlags, argv_rest: Vec, worker_channels: Option, + is_worker: bool, ) -> Self { let custom_root = env::var("DENO_DIR").map(|s| s.into()).ok(); @@ -61,9 +71,12 @@ impl IsolateState { modules: Mutex::new(Modules::new()), worker_channels: worker_channels.map(Mutex::new), global_timer: Mutex::new(GlobalTimer::new()), + workers: Mutex::new(UserWorkerTable::new()), + is_worker, } } + /// Read main module from argv pub fn main_module(&self) -> Option { if self.argv.len() <= 1 { None @@ -110,7 +123,7 @@ impl IsolateState { let argv = vec![String::from("./deno"), String::from("hello.js")]; // For debugging: argv.push_back(String::from("-D")); let (flags, rest_argv, _) = flags::set_flags(argv).unwrap(); - IsolateState::new(flags, rest_argv, None) + IsolateState::new(flags, rest_argv, None, false) } pub fn metrics_op_dispatched( diff --git a/cli/main.rs b/cli/main.rs index 12f94650d9..f9a88803e4 100644 --- a/cli/main.rs +++ b/cli/main.rs @@ -108,7 +108,7 @@ fn main() { let should_prefetch = flags.prefetch || flags.info; let should_display_info = flags.info; - let state = Arc::new(IsolateState::new(flags, rest_argv, None)); + let state = Arc::new(IsolateState::new(flags, rest_argv, None, false)); let state_ = state.clone(); let startup_data = startup_data::deno_isolate_init(); let cli = CliBehavior::new(Some(startup_data), state_); diff --git a/cli/msg.fbs b/cli/msg.fbs index 45f940f7d0..695515f55f 100644 --- a/cli/msg.fbs +++ b/cli/msg.fbs @@ -63,6 +63,12 @@ union Any { StatRes, Symlink, Truncate, + CreateWorker, + CreateWorkerRes, + HostGetWorkerClosed, + HostGetMessage, + HostGetMessageRes, + HostPostMessage, WorkerGetMessage, WorkerGetMessageRes, WorkerPostMessage, @@ -121,6 +127,8 @@ enum ErrorKind: byte { // custom errors InvalidUri, InvalidSeekMode, + OpNotAvaiable, + WorkerInitFailed } table Cwd {} @@ -171,6 +179,35 @@ table FormatErrorRes { error: string; } +// Create worker as host +table CreateWorker { + specifier: string; +} + +table CreateWorkerRes { + rid: uint32; +} + +table HostGetWorkerClosed { + rid: uint32; +} + +// Get message from guest worker as host +table HostGetMessage { + rid: uint32; +} + +table HostGetMessageRes { + data: [ubyte]; +} + +// Post message to guest worker as host +table HostPostMessage { + rid: uint32; + // data passed thru the zero-copy data parameter. +} + +// Get message from host as guest worker table WorkerGetMessage { unused: int8; } @@ -179,6 +216,7 @@ table WorkerGetMessageRes { data: [ubyte]; } +// Post message to host as guest worker table WorkerPostMessage { // data passed thru the zero-copy data parameter. } diff --git a/cli/ops.rs b/cli/ops.rs index a7c2e868fc..c8119771d5 100644 --- a/cli/ops.rs +++ b/cli/ops.rs @@ -18,6 +18,7 @@ use crate::resources::Resource; use crate::tokio_util; use crate::tokio_write; use crate::version; +use crate::workers; use deno::deno_buf; use deno::Buf; use deno::JSError; @@ -141,13 +142,24 @@ pub fn dispatch_all( (base.sync(), boxed_op) } +/// Superset of op_selector_worker for compiler isolates pub fn op_selector_compiler(inner_type: msg::Any) -> Option { match inner_type { msg::Any::FetchModuleMetaData => Some(op_fetch_module_meta_data), + _ => op_selector_worker(inner_type), + } +} + +/// Superset of op_selector_std for worker isolates +pub fn op_selector_worker(inner_type: msg::Any) -> Option { + match inner_type { + msg::Any::WorkerGetMessage => Some(op_worker_get_message), + msg::Any::WorkerPostMessage => Some(op_worker_post_message), _ => op_selector_std(inner_type), } } +/// Standard ops set for most isolates pub fn op_selector_std(inner_type: msg::Any) -> Option { match inner_type { msg::Any::Accept => Some(op_accept), @@ -189,8 +201,10 @@ pub fn op_selector_std(inner_type: msg::Any) -> Option { msg::Any::Stat => Some(op_stat), msg::Any::Symlink => Some(op_symlink), msg::Any::Truncate => Some(op_truncate), - msg::Any::WorkerGetMessage => Some(op_worker_get_message), - msg::Any::WorkerPostMessage => Some(op_worker_post_message), + msg::Any::CreateWorker => Some(op_create_worker), + msg::Any::HostGetWorkerClosed => Some(op_host_get_worker_closed), + msg::Any::HostGetMessage => Some(op_host_get_message), + msg::Any::HostPostMessage => Some(op_host_post_message), msg::Any::Write => Some(op_write), _ => None, } @@ -1741,6 +1755,7 @@ impl Future for GetMessageFuture { } } +/// Get message from host as guest worker fn op_worker_get_message( sc: &IsolateStateContainer, base: &msg::Base<'_>, @@ -1775,6 +1790,7 @@ fn op_worker_get_message( Box::new(op) } +/// Post message to host as guest worker fn op_worker_post_message( sc: &IsolateStateContainer, base: &msg::Base<'_>, @@ -1807,3 +1823,143 @@ fn op_worker_post_message( }); Box::new(op) } + +/// Create worker as the host +fn op_create_worker( + sc: &IsolateStateContainer, + base: &msg::Base<'_>, + data: deno_buf, +) -> Box { + assert_eq!(data.len(), 0); + let cmd_id = base.cmd_id(); + let inner = base.inner_as_create_worker().unwrap(); + let specifier = inner.specifier().unwrap(); + + Box::new(futures::future::result(move || -> OpResult { + let parent_state = sc.state().clone(); + let behavior = workers::UserWorkerBehavior::new( + parent_state.flags.clone(), + parent_state.argv.clone(), + ); + match workers::spawn( + behavior, + &format!("USER-WORKER-{}", specifier), + workers::WorkerInit::Module(specifier.to_string()), + ) { + Ok(worker) => { + let mut workers_tl = parent_state.workers.lock().unwrap(); + let rid = worker.resource.rid.clone(); + workers_tl.insert(rid, worker.shared()); + let builder = &mut FlatBufferBuilder::new(); + let msg_inner = msg::CreateWorkerRes::create( + builder, + &msg::CreateWorkerResArgs { rid }, + ); + Ok(serialize_response( + cmd_id, + builder, + msg::BaseArgs { + inner: Some(msg_inner.as_union_value()), + inner_type: msg::Any::CreateWorkerRes, + ..Default::default() + }, + )) + } + Err(errors::RustOrJsError::Js(_)) => Err(errors::worker_init_failed()), + Err(errors::RustOrJsError::Rust(err)) => Err(err), + } + }())) +} + +/// Return when the worker closes +fn op_host_get_worker_closed( + sc: &IsolateStateContainer, + base: &msg::Base<'_>, + data: deno_buf, +) -> Box { + assert_eq!(data.len(), 0); + let cmd_id = base.cmd_id(); + let inner = base.inner_as_host_get_worker_closed().unwrap(); + let rid = inner.rid(); + let state = sc.state().clone(); + + let shared_worker_future = { + let workers_tl = state.workers.lock().unwrap(); + let worker = workers_tl.get(&rid).unwrap(); + worker.clone() + }; + + Box::new(shared_worker_future.then(move |_result| { + let builder = &mut FlatBufferBuilder::new(); + + Ok(serialize_response( + cmd_id, + builder, + msg::BaseArgs { + ..Default::default() + }, + )) + })) +} + +/// Get message from guest worker as host +fn op_host_get_message( + _sc: &IsolateStateContainer, + base: &msg::Base<'_>, + data: deno_buf, +) -> Box { + assert_eq!(data.len(), 0); + let cmd_id = base.cmd_id(); + let inner = base.inner_as_host_get_message().unwrap(); + let rid = inner.rid(); + + let op = resources::get_message_from_worker(rid); + let op = op.map_err(move |_| -> DenoError { unimplemented!() }); + let op = op.and_then(move |maybe_buf| -> DenoResult { + let builder = &mut FlatBufferBuilder::new(); + + let data = maybe_buf.as_ref().map(|buf| builder.create_vector(buf)); + let msg_inner = msg::HostGetMessageRes::create( + builder, + &msg::HostGetMessageResArgs { data }, + ); + Ok(serialize_response( + cmd_id, + builder, + msg::BaseArgs { + inner: Some(msg_inner.as_union_value()), + inner_type: msg::Any::HostGetMessageRes, + ..Default::default() + }, + )) + }); + Box::new(op) +} + +/// Post message to guest worker as host +fn op_host_post_message( + _sc: &IsolateStateContainer, + base: &msg::Base<'_>, + data: deno_buf, +) -> Box { + let cmd_id = base.cmd_id(); + let inner = base.inner_as_host_post_message().unwrap(); + let rid = inner.rid(); + + let d = Vec::from(data.as_ref()).into_boxed_slice(); + + let op = resources::post_message_to_worker(rid, d); + let op = op.map_err(|e| errors::new(ErrorKind::Other, e.to_string())); + let op = op.and_then(move |_| -> DenoResult { + let builder = &mut FlatBufferBuilder::new(); + + Ok(serialize_response( + cmd_id, + builder, + msg::BaseArgs { + ..Default::default() + }, + )) + }); + Box::new(op) +} diff --git a/cli/resources.rs b/cli/resources.rs index 5b08e4b327..817f6062dd 100644 --- a/cli/resources.rs +++ b/cli/resources.rs @@ -305,7 +305,8 @@ pub fn add_worker(wc: WorkerChannels) -> Resource { Resource { rid } } -pub fn worker_post_message( +/// Post message to worker as a host or privilged overlord +pub fn post_message_to_worker( rid: ResourceId, buf: Buf, ) -> futures::sink::Send> { @@ -341,7 +342,7 @@ impl Future for WorkerReceiver { } } -pub fn worker_recv_message(rid: ResourceId) -> WorkerReceiver { +pub fn get_message_from_worker(rid: ResourceId) -> WorkerReceiver { WorkerReceiver { rid } } diff --git a/cli/startup_data.rs b/cli/startup_data.rs index 7f59c06781..61891ced48 100644 --- a/cli/startup_data.rs +++ b/cli/startup_data.rs @@ -31,7 +31,7 @@ pub fn deno_isolate_init() -> StartupData { pub fn compiler_isolate_init() -> StartupData { if cfg!(feature = "no-snapshot-init") { - debug!("Deno isolate init without snapshots."); + debug!("Compiler isolate init without snapshots."); #[cfg(not(feature = "check-only"))] let source_bytes = include_bytes!(concat!( env!("GN_OUT_DIR"), diff --git a/cli/workers.rs b/cli/workers.rs index 0c8d49fa7b..cb919e8ed8 100644 --- a/cli/workers.rs +++ b/cli/workers.rs @@ -1,17 +1,72 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. +use crate::errors::*; +use crate::flags::DenoFlags; use crate::isolate::{DenoBehavior, Isolate}; +use crate::isolate_state::IsolateState; +use crate::isolate_state::IsolateStateContainer; use crate::isolate_state::WorkerChannels; -use crate::js_errors::JSErrorColor; +use crate::ops; use crate::resources; -use crate::tokio_util; +use crate::startup_data; +use deno::deno_buf; +use deno::Behavior; use deno::Buf; use deno::JSError; -use futures::future::lazy; +use deno::Op; +use deno::StartupData; use futures::sync::mpsc; -use futures::sync::oneshot; use futures::Future; use futures::Poll; -use std::thread; +use std::sync::Arc; + +pub struct UserWorkerBehavior { + pub state: Arc, +} + +impl UserWorkerBehavior { + pub fn new(flags: DenoFlags, argv_rest: Vec) -> Self { + Self { + state: Arc::new(IsolateState::new(flags, argv_rest, None, true)), + } + } +} + +impl IsolateStateContainer for UserWorkerBehavior { + fn state(&self) -> Arc { + self.state.clone() + } +} + +impl IsolateStateContainer for &UserWorkerBehavior { + fn state(&self) -> Arc { + self.state.clone() + } +} + +impl Behavior for UserWorkerBehavior { + fn startup_data(&mut self) -> Option { + Some(startup_data::deno_isolate_init()) + } + + fn dispatch( + &mut self, + control: &[u8], + zero_copy: deno_buf, + ) -> (bool, Box) { + ops::dispatch_all(self, control, zero_copy, ops::op_selector_worker) + } +} + +impl WorkerBehavior for UserWorkerBehavior { + fn set_internal_channels(&mut self, worker_channels: WorkerChannels) { + self.state = Arc::new(IsolateState::new( + self.state.flags.clone(), + self.state.argv.clone(), + Some(worker_channels), + true, + )); + } +} /// Behavior trait specific to workers pub trait WorkerBehavior: DenoBehavior { @@ -24,10 +79,11 @@ pub trait WorkerBehavior: DenoBehavior { /// Rust interface for WebWorkers. pub struct Worker { isolate: Isolate, + pub resource: resources::Resource, } impl Worker { - pub fn new(mut behavior: B) -> (Self, WorkerChannels) { + pub fn new(mut behavior: B) -> Self { let (worker_in_tx, worker_in_rx) = mpsc::channel::(1); let (worker_out_tx, worker_out_rx) = mpsc::channel::(1); @@ -38,13 +94,23 @@ impl Worker { let isolate = Isolate::new(behavior); - let worker = Worker { isolate }; - (worker, external_channels) + Worker { + isolate, + resource: resources::add_worker(external_channels), + } } pub fn execute(&mut self, js_source: &str) -> Result<(), JSError> { self.isolate.execute(js_source) } + + pub fn execute_mod( + &mut self, + js_filename: &str, + is_prefetch: bool, + ) -> Result<(), RustOrJsError> { + self.isolate.execute_mod(js_filename, is_prefetch) + } } impl Future for Worker { @@ -56,47 +122,48 @@ impl Future for Worker { } } +/// Method and data used to initalize a worker +pub enum WorkerInit { + Script(String), + Module(String), +} + pub fn spawn( behavior: B, - js_source: String, -) -> resources::Resource { - // TODO This function should return a Future, so that the caller can retrieve - // the JSError if one is thrown. Currently it just prints to stderr and calls - // exit(1). - // let (js_error_tx, js_error_rx) = oneshot::channel::(); - let (p, c) = oneshot::channel::(); - let builder = thread::Builder::new().name("worker".to_string()); + worker_debug_name: &str, + init: WorkerInit, +) -> Result, RustOrJsError> { + let state = behavior.state().clone(); + let mut worker = Worker::new(behavior); - let _tid = builder - .spawn(move || { - tokio_util::run(lazy(move || { - let (mut worker, external_channels) = Worker::new(behavior); - let resource = resources::add_worker(external_channels); - p.send(resource.clone()).unwrap(); + worker + .execute(&format!("denoMain('{}')", worker_debug_name)) + .expect("worker workerInit failed"); - worker - .execute("denoMain()") - .expect("worker denoMain failed"); - worker - .execute("workerMain()") - .expect("worker workerMain failed"); - worker.execute(&js_source).expect("worker js_source failed"); + worker + .execute("workerMain()") + .expect("worker workerMain failed"); - worker.then(move |r| -> Result<(), ()> { - resource.close(); - debug!("workers.rs after resource close"); - if let Err(err) = r { - eprintln!("{}", JSErrorColor(&err).to_string()); - std::process::exit(1); - } - Ok(()) - }) - })); + let init_result = match init { + WorkerInit::Script(script) => match worker.execute(&script) { + Ok(v) => Ok(v), + Err(e) => Err(RustOrJsError::Js(e)), + }, + WorkerInit::Module(specifier) => { + let should_prefetch = state.flags.prefetch || state.flags.info; + match state.dir.resolve_module_url(&specifier, ".") { + Err(err) => Err(RustOrJsError::Rust(DenoError::from(err))), + Ok(module_url) => { + worker.execute_mod(&module_url.to_string(), should_prefetch) + } + } + } + }; - debug!("workers.rs after spawn"); - }).unwrap(); - - c.wait().unwrap() + match init_result { + Ok(_) => Ok(worker), + Err(err) => Err(err), + } } #[cfg(test)] @@ -104,63 +171,117 @@ mod tests { use super::*; use crate::compiler::CompilerBehavior; use crate::isolate_state::IsolateState; - use std::sync::Arc; + use crate::js_errors::JSErrorColor; + use crate::tokio_util; + use futures::future::lazy; + use std::thread; #[test] fn test_spawn() { - let resource = spawn( - CompilerBehavior::new(Arc::new(IsolateState::mock())), - r#" - onmessage = function(e) { - let s = new TextDecoder().decode(e.data);; - console.log("msg from main script", s); - if (s == "exit") { - close(); - return; - } else { - console.assert(s === "hi"); + tokio_util::init(|| { + let worker_result = spawn( + CompilerBehavior::new( + IsolateState::mock().flags.clone(), + IsolateState::mock().argv.clone(), + ), + "TEST", + WorkerInit::Script( + r#" + onmessage = function(e) { + console.log("msg from main script", e.data); + if (e.data == "exit") { + close(); + return; + } else { + console.assert(e.data === "hi"); + } + postMessage([1, 2, 3]); + console.log("after postMessage"); } - postMessage(new Uint8Array([1, 2, 3])); - console.log("after postMessage"); - } - "#.into(), - ); - let msg = String::from("hi").into_boxed_str().into_boxed_bytes(); + "#.into(), + ), + ); + assert!(worker_result.is_ok()); + let worker = worker_result.unwrap(); + let resource = worker.resource.clone(); + let resource_ = resource.clone(); - let r = resources::worker_post_message(resource.rid, msg).wait(); - assert!(r.is_ok()); + tokio::spawn(lazy(move || { + worker.then(move |r| -> Result<(), ()> { + resource_.close(); + debug!("workers.rs after resource close"); + if let Err(err) = r { + eprintln!("{}", JSErrorColor(&err).to_string()); + assert!(false) + } + Ok(()) + }) + })); - let maybe_msg = - resources::worker_recv_message(resource.rid).wait().unwrap(); - assert!(maybe_msg.is_some()); - assert_eq!(*maybe_msg.unwrap(), [1, 2, 3]); + let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes(); - let msg = String::from("exit").into_boxed_str().into_boxed_bytes(); - let r = resources::worker_post_message(resource.rid, msg).wait(); - assert!(r.is_ok()); + let r = resources::post_message_to_worker(resource.rid, msg).wait(); + assert!(r.is_ok()); + + let maybe_msg = resources::get_message_from_worker(resource.rid) + .wait() + .unwrap(); + assert!(maybe_msg.is_some()); + // Check if message received is [1, 2, 3] in json + assert_eq!(*maybe_msg.unwrap(), *b"[1,2,3]"); + + let msg = json!("exit") + .to_string() + .into_boxed_str() + .into_boxed_bytes(); + let r = resources::post_message_to_worker(resource.rid, msg).wait(); + assert!(r.is_ok()); + }) } #[test] fn removed_from_resource_table_on_close() { - let resource = spawn( - CompilerBehavior::new(Arc::new(IsolateState::mock())), - "onmessage = () => close();".into(), - ); + tokio_util::init(|| { + let worker_result = spawn( + CompilerBehavior::new( + IsolateState::mock().flags.clone(), + IsolateState::mock().argv.clone(), + ), + "TEST", + WorkerInit::Script("onmessage = () => close();".into()), + ); + assert!(worker_result.is_ok()); + let worker = worker_result.unwrap(); + let resource = worker.resource.clone(); + let resource_ = resource.clone(); - assert_eq!( - resources::get_type(resource.rid), - Some("worker".to_string()) - ); + tokio::spawn(lazy(move || { + worker.then(move |r| -> Result<(), ()> { + resource_.close(); + debug!("workers.rs after resource close"); + if let Err(err) = r { + eprintln!("{}", JSErrorColor(&err).to_string()); + assert!(false) + } + Ok(()) + }) + })); - let msg = String::from("hi").into_boxed_str().into_boxed_bytes(); - let r = resources::worker_post_message(resource.rid, msg).wait(); - assert!(r.is_ok()); - println!("rid {:?}", resource.rid); + assert_eq!( + resources::get_type(resource.rid), + Some("worker".to_string()) + ); - // TODO Need a way to get a future for when a resource closes. - // For now, just sleep for a bit. - // resource.close(); - thread::sleep(std::time::Duration::from_millis(1000)); - assert_eq!(resources::get_type(resource.rid), None); + let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes(); + let r = resources::post_message_to_worker(resource.rid, msg).wait(); + assert!(r.is_ok()); + println!("rid {:?}", resource.rid); + + // TODO Need a way to get a future for when a resource closes. + // For now, just sleep for a bit. + // resource.close(); + thread::sleep(std::time::Duration::from_millis(1000)); + assert_eq!(resources::get_type(resource.rid), None); + }) } } diff --git a/core/js_errors.rs b/core/js_errors.rs index e8fb0701c9..ee3272bafd 100644 --- a/core/js_errors.rs +++ b/core/js_errors.rs @@ -13,7 +13,7 @@ use serde_json; use std::fmt; use std::str; -#[derive(Debug, PartialEq)] +#[derive(Debug, PartialEq, Clone)] pub struct StackFrame { pub line: i64, // zero indexed pub column: i64, // zero indexed @@ -24,7 +24,7 @@ pub struct StackFrame { pub is_wasm: bool, } -#[derive(Debug, PartialEq)] +#[derive(Debug, PartialEq, Clone)] pub struct JSError { pub message: String, diff --git a/js/assets.ts b/js/assets.ts index 4369067835..b3a6e00dbe 100644 --- a/js/assets.ts +++ b/js/assets.ts @@ -44,12 +44,8 @@ import libEsnextDts from "/third_party/node_modules/typescript/lib/lib.esnext.d. import libEsnextIntlDts from "/third_party/node_modules/typescript/lib/lib.esnext.intl.d.ts!string"; import libEsnextSymbolDts from "/third_party/node_modules/typescript/lib/lib.esnext.symbol.d.ts!string"; -// @internal -export const assetSourceCode: { [key: string]: string } = { - // Generated library - "lib.deno_runtime.d.ts": libDts, - - // Static libraries +// Default static libraries for all compile jobs +const defaultAssets: { [key: string]: string } = { "lib.es2015.collection.d.ts": libEs2015CollectionDts, "lib.es2015.core.d.ts": libEs2015CoreDts, "lib.es2015.d.ts": libEs2015Dts, @@ -85,3 +81,11 @@ export const assetSourceCode: { [key: string]: string } = { "lib.esnext.intl.d.ts": libEsnextIntlDts, "lib.esnext.symbol.d.ts": libEsnextSymbolDts }; + +// assests for normal compile jobs +// @internal +export const assetSourceCode: { [key: string]: string } = { + // Generated library + "lib.deno_runtime.d.ts": libDts, + ...defaultAssets +}; diff --git a/js/compiler.ts b/js/compiler.ts index 0f7070fd24..72ac391ea1 100644 --- a/js/compiler.ts +++ b/js/compiler.ts @@ -46,6 +46,7 @@ type SourceMap = string; interface CompilerLookup { specifier: ModuleSpecifier; referrer: ContainingFile; + isWorker: boolean; } /** Abstraction of the APIs required from the `os` module so they can be @@ -179,6 +180,8 @@ class Compiler implements ts.LanguageServiceHost, ts.FormatDiagnosticsHost { // testing private _ts: Ts = ts; + private readonly _assetsSourceCode: { [key: string]: string }; + /** The TypeScript language service often refers to the resolved fileName of * a module, this is a shortcut to avoid unnecessary module resolution logic * for modules that may have been initially resolved by a `moduleSpecifier` @@ -239,9 +242,12 @@ class Compiler implements ts.LanguageServiceHost, ts.FormatDiagnosticsHost { // not null assertion moduleId = moduleSpecifier.split("/").pop()!; const assetName = moduleId.includes(".") ? moduleId : `${moduleId}.d.ts`; - assert(assetName in assetSourceCode, `No such asset "${assetName}"`); + assert( + assetName in this._assetsSourceCode, + `No such asset "${assetName}"` + ); mediaType = msg.MediaType.TypeScript; - sourceCode = assetSourceCode[assetName]; + sourceCode = this._assetsSourceCode[assetName]; fileName = `${ASSETS}/${assetName}`; } else { // We query Rust with a CodeFetch message. It will load the sourceCode, @@ -299,7 +305,8 @@ class Compiler implements ts.LanguageServiceHost, ts.FormatDiagnosticsHost { innerMap.set(moduleSpecifier, fileName); } - constructor() { + constructor(assetsSourceCode: { [key: string]: string }) { + this._assetsSourceCode = assetsSourceCode; this._service = this._ts.createLanguageService(this); } @@ -498,7 +505,7 @@ class Compiler implements ts.LanguageServiceHost, ts.FormatDiagnosticsHost { } } -const compiler = new Compiler(); +const compiler = new Compiler(assetSourceCode); // set global objects for compiler web worker window.clearTimeout = clearTimer; @@ -514,17 +521,12 @@ window.TextEncoder = TextEncoder; // lazy instantiating the compiler web worker window.compilerMain = function compilerMain() { // workerMain should have already been called since a compiler is a worker. - const encoder = new TextEncoder(); - const decoder = new TextDecoder(); - window.onmessage = ({ data }: { data: Uint8Array }) => { - const json = decoder.decode(data); - const { specifier, referrer } = JSON.parse(json) as CompilerLookup; + window.onmessage = ({ data }: { data: CompilerLookup }) => { + const { specifier, referrer } = data; const result = compiler.compile(specifier, referrer); - const responseJson = JSON.stringify(result); - const response = encoder.encode(responseJson); - postMessage(response); + postMessage(result); }; }; diff --git a/js/globals.ts b/js/globals.ts index 5a0fb18cec..56956b4add 100644 --- a/js/globals.ts +++ b/js/globals.ts @@ -102,7 +102,16 @@ export type TextDecoder = textEncoding.TextDecoder; window.performance = new performanceUtil.Performance(); +// This variable functioning correctly depends on `declareAsLet` +// in //tools/ts_library_builder/main.ts +window.onmessage = workers.onmessage; + window.workerMain = workers.workerMain; +window.workerClose = workers.workerClose; +window.postMessage = workers.postMessage; + +window.Worker = workers.WorkerImpl; +export type Worker = workers.Worker; // below are interfaces that are available in TypeScript but // have different signatures diff --git a/js/main.ts b/js/main.ts index e7f7e284e6..c32f3ac9e3 100644 --- a/js/main.ts +++ b/js/main.ts @@ -18,8 +18,8 @@ import * as deno from "./deno"; // TODO(kitsonk) remove with `--types` below import libDts from "gen/cli/lib/lib.deno_runtime.d.ts!string"; -export default function denoMain(): void { - const startResMsg = os.start(); +export default function denoMain(name?: string): void { + const startResMsg = os.start(name); setVersions(startResMsg.denoVersion()!, startResMsg.v8Version()!); diff --git a/js/workers.ts b/js/workers.ts index bdfbed6404..601ffa0b15 100644 --- a/js/workers.ts +++ b/js/workers.ts @@ -1,33 +1,110 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. -import * as dispatch from "./dispatch"; +/* eslint-disable @typescript-eslint/no-explicit-any */ +import { sendAsync, sendSync } from "./dispatch"; import * as msg from "gen/cli/msg_generated"; import * as flatbuffers from "./flatbuffers"; import { assert, log } from "./util"; +import { TextDecoder, TextEncoder } from "./text_encoding"; import { window } from "./window"; -export async function postMessage(data: Uint8Array): Promise { +const encoder = new TextEncoder(); +const decoder = new TextDecoder(); + +export function encodeMessage(data: any): Uint8Array { + const dataJson = JSON.stringify(data); + return encoder.encode(dataJson); +} + +export function decodeMessage(dataIntArray: Uint8Array): any { + const dataJson = decoder.decode(dataIntArray); + return JSON.parse(dataJson); +} + +function createWorker(specifier: string): number { const builder = flatbuffers.createBuilder(); - msg.WorkerPostMessage.startWorkerPostMessage(builder); - const inner = msg.WorkerPostMessage.endWorkerPostMessage(builder); - const baseRes = await dispatch.sendAsync( + const specifier_ = builder.createString(specifier); + msg.CreateWorker.startCreateWorker(builder); + msg.CreateWorker.addSpecifier(builder, specifier_); + const inner = msg.CreateWorker.endCreateWorker(builder); + const baseRes = sendSync(builder, msg.Any.CreateWorker, inner); + assert(baseRes != null); + assert( + msg.Any.CreateWorkerRes === baseRes!.innerType(), + `base.innerType() unexpectedly is ${baseRes!.innerType()}` + ); + const res = new msg.CreateWorkerRes(); + assert(baseRes!.inner(res) != null); + return res.rid(); +} + +async function hostGetWorkerClosed(rid: number): Promise { + const builder = flatbuffers.createBuilder(); + msg.HostGetWorkerClosed.startHostGetWorkerClosed(builder); + msg.HostGetWorkerClosed.addRid(builder, rid); + const inner = msg.HostGetWorkerClosed.endHostGetWorkerClosed(builder); + await sendAsync(builder, msg.Any.HostGetWorkerClosed, inner); +} + +function hostPostMessage(rid: number, data: any): void { + const dataIntArray = encodeMessage(data); + const builder = flatbuffers.createBuilder(); + msg.HostPostMessage.startHostPostMessage(builder); + msg.HostPostMessage.addRid(builder, rid); + const inner = msg.HostPostMessage.endHostPostMessage(builder); + const baseRes = sendSync( builder, - msg.Any.WorkerPostMessage, + msg.Any.HostPostMessage, inner, - data + dataIntArray ); assert(baseRes != null); } -export async function getMessage(): Promise { +async function hostGetMessage(rid: number): Promise { + const builder = flatbuffers.createBuilder(); + msg.HostGetMessage.startHostGetMessage(builder); + msg.HostGetMessage.addRid(builder, rid); + const inner = msg.HostGetMessage.endHostGetMessage(builder); + const baseRes = await sendAsync(builder, msg.Any.HostGetMessage, inner); + assert(baseRes != null); + assert( + msg.Any.HostGetMessageRes === baseRes!.innerType(), + `base.innerType() unexpectedly is ${baseRes!.innerType()}` + ); + const res = new msg.HostGetMessageRes(); + assert(baseRes!.inner(res) != null); + + const dataArray = res.dataArray(); + if (dataArray != null) { + return decodeMessage(dataArray); + } else { + return null; + } +} + +// Stuff for workers +export let onmessage: (e: { data: any }) => void = (): void => {}; + +export function postMessage(data: any): void { + const dataIntArray = encodeMessage(data); + const builder = flatbuffers.createBuilder(); + msg.WorkerPostMessage.startWorkerPostMessage(builder); + const inner = msg.WorkerPostMessage.endWorkerPostMessage(builder); + const baseRes = sendSync( + builder, + msg.Any.WorkerPostMessage, + inner, + dataIntArray + ); + assert(baseRes != null); +} + +export async function getMessage(): Promise { log("getMessage"); const builder = flatbuffers.createBuilder(); msg.WorkerGetMessage.startWorkerGetMessage(builder); const inner = msg.WorkerGetMessage.endWorkerGetMessage(builder); - const baseRes = await dispatch.sendAsync( - builder, - msg.Any.WorkerGetMessage, - inner - ); + const baseRes = await sendAsync(builder, msg.Any.WorkerGetMessage, inner); assert(baseRes != null); assert( msg.Any.WorkerGetMessageRes === baseRes!.innerType(), @@ -37,14 +114,14 @@ export async function getMessage(): Promise { assert(baseRes!.inner(res) != null); const dataArray = res.dataArray(); - if (dataArray == null) { - return null; + if (dataArray != null) { + return decodeMessage(dataArray); } else { - return new Uint8Array(dataArray!); + return null; } } -let isClosing = false; +export let isClosing = false; export function workerClose(): void { isClosing = true; @@ -67,3 +144,45 @@ export async function workerMain(): Promise { } } } + +export interface Worker { + onerror?: () => void; + onmessage?: (e: { data: any }) => void; + onmessageerror?: () => void; + postMessage(data: any): void; +} + +export class WorkerImpl implements Worker { + private readonly rid: number; + private isClosing: boolean = false; + public onerror?: () => void; + public onmessage?: (data: any) => void; + public onmessageerror?: () => void; + + constructor(specifier: string) { + this.rid = createWorker(specifier); + this.run(); + hostGetWorkerClosed(this.rid).then(() => { + this.isClosing = true; + }); + } + + postMessage(data: any): void { + hostPostMessage(this.rid, data); + } + + private async run(): Promise { + while (!this.isClosing) { + const data = await hostGetMessage(this.rid); + if (data == null) { + log("worker got null message. quitting."); + break; + } + // TODO(afinch7) stop this from eating messages before onmessage has been assigned + if (this.onmessage) { + const event = { data }; + this.onmessage(event); + } + } + } +} diff --git a/tests/026_workers.test b/tests/026_workers.test new file mode 100644 index 0000000000..1c5b6f4e66 --- /dev/null +++ b/tests/026_workers.test @@ -0,0 +1,2 @@ +args: tests/026_workers.ts --reload +output: tests/026_workers.ts.out \ No newline at end of file diff --git a/tests/026_workers.ts b/tests/026_workers.ts new file mode 100644 index 0000000000..0cf8f53b1e --- /dev/null +++ b/tests/026_workers.ts @@ -0,0 +1,14 @@ +const jsWorker = new Worker("tests/subdir/test_worker.js"); +const tsWorker = new Worker("tests/subdir/test_worker.ts"); + +tsWorker.onmessage = e => { + console.log("Received ts: " + e.data); +}; + +jsWorker.onmessage = e => { + console.log("Received js: " + e.data); + + tsWorker.postMessage("Hello World"); +}; + +jsWorker.postMessage("Hello World"); diff --git a/tests/026_workers.ts.out b/tests/026_workers.ts.out new file mode 100644 index 0000000000..7538cc867a --- /dev/null +++ b/tests/026_workers.ts.out @@ -0,0 +1,4 @@ +Hello World +Received js: Hello World +Hello World +Received ts: Hello World diff --git a/tests/error_004_missing_module.disabled b/tests/error_004_missing_module similarity index 100% rename from tests/error_004_missing_module.disabled rename to tests/error_004_missing_module diff --git a/tests/error_005_missing_dynamic_import.disabled b/tests/error_005_missing_dynamic_import similarity index 100% rename from tests/error_005_missing_dynamic_import.disabled rename to tests/error_005_missing_dynamic_import diff --git a/tests/error_006_import_ext_failure.disabled b/tests/error_006_import_ext_failure similarity index 100% rename from tests/error_006_import_ext_failure.disabled rename to tests/error_006_import_ext_failure diff --git a/tests/subdir/test_worker.js b/tests/subdir/test_worker.js new file mode 100644 index 0000000000..53d38ba96c --- /dev/null +++ b/tests/subdir/test_worker.js @@ -0,0 +1,7 @@ +onmessage = function(e) { + console.log(e.data); + + postMessage(e.data); + + workerClose(); +}; diff --git a/tests/subdir/test_worker.ts b/tests/subdir/test_worker.ts new file mode 100644 index 0000000000..53d38ba96c --- /dev/null +++ b/tests/subdir/test_worker.ts @@ -0,0 +1,7 @@ +onmessage = function(e) { + console.log(e.data); + + postMessage(e.data); + + workerClose(); +}; diff --git a/tools/ts_library_builder/ast_util.ts b/tools/ts_library_builder/ast_util.ts index d195da721b..14546f9c51 100644 --- a/tools/ts_library_builder/ast_util.ts +++ b/tools/ts_library_builder/ast_util.ts @@ -94,11 +94,14 @@ export function addVariableDeclaration( node: StatementedNode, name: string, type: string, + isConst: boolean, hasDeclareKeyword?: boolean, jsdocs?: JSDoc[] ): VariableStatement { return node.addVariableStatement({ - declarationKind: VariableDeclarationKind.Const, + declarationKind: isConst + ? VariableDeclarationKind.Const + : VariableDeclarationKind.Let, declarations: [{ name, type }], docs: jsdocs && jsdocs.map(jsdoc => jsdoc.getText()), hasDeclareKeyword diff --git a/tools/ts_library_builder/build_library.ts b/tools/ts_library_builder/build_library.ts index 2045c288f0..22268f1dc5 100644 --- a/tools/ts_library_builder/build_library.ts +++ b/tools/ts_library_builder/build_library.ts @@ -56,6 +56,16 @@ export interface BuildLibraryOptions { * the basePath. */ inputs?: string[]; + /** + * Path to globals file to be used I.E. `js/globals.ts` + */ + additionalGlobals?: string[]; + + /** + * List of global variables to define as let instead of the default const. + */ + declareAsLet?: string[]; + /** * The path to the output library */ @@ -170,30 +180,21 @@ export function flatten({ } } -interface MergeGlobalOptions { - basePath: string; - debug?: boolean; - declarationProject: Project; - filePath: string; +interface PrepareFileForMergeOptions { globalVarName: string; - ignore?: string[]; - inputProject: Project; interfaceName: string; targetSourceFile: SourceFile; } -/** Take a module and merge it into the global scope */ -export function mergeGlobal({ - basePath, - debug, - declarationProject, - filePath, +interface PrepareFileForMergeReturn { + interfaceDeclaration: InterfaceDeclaration; +} + +export function prepareFileForMerge({ globalVarName, - ignore, - inputProject, interfaceName, targetSourceFile -}: MergeGlobalOptions): void { +}: PrepareFileForMergeOptions): PrepareFileForMergeReturn { // Add the global object interface const interfaceDeclaration = targetSourceFile.addInterface({ name: interfaceName, @@ -201,15 +202,56 @@ export function mergeGlobal({ }); // Declare the global variable - addVariableDeclaration(targetSourceFile, globalVarName, interfaceName, true); + addVariableDeclaration( + targetSourceFile, + globalVarName, + interfaceName, + true, + true + ); // `globalThis` accesses the global scope and is defined here: // https://github.com/tc39/proposal-global - addVariableDeclaration(targetSourceFile, "globalThis", interfaceName, true); + addVariableDeclaration( + targetSourceFile, + "globalThis", + interfaceName, + true, + true + ); // Add self reference to the global variable addInterfaceProperty(interfaceDeclaration, globalVarName, interfaceName); + return { + interfaceDeclaration + }; +} + +interface MergeGlobalOptions extends PrepareFileForMergeOptions { + basePath: string; + debug?: boolean; + declarationProject: Project; + filePath: string; + ignore?: string[]; + inputProject: Project; + prepareReturn: PrepareFileForMergeReturn; + declareAsLet?: string[]; +} + +/** Take a module and merge it into the global scope */ +export function mergeGlobals({ + basePath, + debug, + declarationProject, + filePath, + globalVarName, + ignore, + inputProject, + targetSourceFile, + declareAsLet, + prepareReturn: { interfaceDeclaration } +}: MergeGlobalOptions): void { // Retrieve source file from the input project const sourceFile = inputProject.getSourceFileOrThrow(filePath); @@ -267,7 +309,8 @@ export function mergeGlobal({ dependentSourceFiles.add(valueDeclaration.getSourceFile()); } } - addVariableDeclaration(targetSourceFile, property, type, true); + const isConst = !(declareAsLet && declareAsLet.includes(property)); + addVariableDeclaration(targetSourceFile, property, type, isConst, true); addInterfaceProperty(interfaceDeclaration, property, type); } } @@ -297,29 +340,32 @@ export function mergeGlobal({ const importDeclarations = sourceFile.getImportDeclarations(); const namespaces = new Set(); for (const declaration of importDeclarations) { - const declarationSourceFile = declaration.getModuleSpecifierSourceFile(); - if ( - declarationSourceFile && - dependentSourceFiles.has(declarationSourceFile) - ) { - // the source file will resolve to the original `.ts` file, but the - // information we really want is in the emitted `.d.ts` file, so we will - // resolve to that file - const dtsFilePath = declarationSourceFile - .getFilePath() - .replace(/\.ts$/, ".d.ts"); - const dtsSourceFile = declarationProject.getSourceFileOrThrow( - dtsFilePath - ); - targetSourceFile.addStatements( - namespaceSourceFile(dtsSourceFile, { - debug, - namespace: declaration.getNamespaceImportOrThrow().getText(), - namespaces, - rootPath: basePath, - sourceFileMap - }) - ); + const namespaceImport = declaration.getNamespaceImport(); + if (namespaceImport) { + const declarationSourceFile = declaration.getModuleSpecifierSourceFile(); + if ( + declarationSourceFile && + dependentSourceFiles.has(declarationSourceFile) + ) { + // the source file will resolve to the original `.ts` file, but the + // information we really want is in the emitted `.d.ts` file, so we will + // resolve to that file + const dtsFilePath = declarationSourceFile + .getFilePath() + .replace(/\.ts$/, ".d.ts"); + const dtsSourceFile = declarationProject.getSourceFileOrThrow( + dtsFilePath + ); + targetSourceFile.addStatements( + namespaceSourceFile(dtsSourceFile, { + debug, + namespace: namespaceImport.getText(), + namespaces, + rootPath: basePath, + sourceFileMap + }) + ); + } } } @@ -337,6 +383,8 @@ export function main({ buildPath, inline, inputs, + additionalGlobals, + declareAsLet, debug, outFile, silent @@ -476,20 +524,46 @@ export function main({ }${msgGeneratedDtsText}\n` }; - mergeGlobal({ + const prepareForMergeOpts: PrepareFileForMergeOptions = { + globalVarName: "window", + interfaceName: "Window", + targetSourceFile: libDTs + }; + + const prepareReturn = prepareFileForMerge(prepareForMergeOpts); + + mergeGlobals({ basePath, debug, declarationProject, filePath: `${basePath}/js/globals.ts`, - globalVarName: "window", inputProject, ignore: ["Deno"], - interfaceName: "Window", - targetSourceFile: libDTs + declareAsLet, + ...prepareForMergeOpts, + prepareReturn }); log(`Merged "globals" into global scope.`); + if (additionalGlobals) { + for (const additionalGlobal of additionalGlobals) { + mergeGlobals({ + basePath, + debug, + declarationProject, + filePath: `${basePath}/${additionalGlobal}`, + inputProject, + ignore: ["Deno"], + declareAsLet, + ...prepareForMergeOpts, + prepareReturn + }); + } + + log(`Added additional "globals" into global scope.`); + } + flatten({ basePath, customSources, diff --git a/tools/ts_library_builder/main.ts b/tools/ts_library_builder/main.ts index 54a659d016..e4e2e73ede 100644 --- a/tools/ts_library_builder/main.ts +++ b/tools/ts_library_builder/main.ts @@ -46,6 +46,7 @@ buildRuntimeLib({ "js/deno.ts", "js/globals.ts" ], + declareAsLet: ["onmessage"], outFile, silent }); diff --git a/tools/ts_library_builder/test.ts b/tools/ts_library_builder/test.ts index 5aeb8d611d..2b6abe7144 100644 --- a/tools/ts_library_builder/test.ts +++ b/tools/ts_library_builder/test.ts @@ -5,7 +5,7 @@ import * as assert from "assert"; import { Project, ts } from "ts-morph"; -import { flatten, mergeGlobal } from "./build_library"; +import { flatten, mergeGlobals, prepareFileForMerge } from "./build_library"; import { inlineFiles, loadDtsFiles } from "./ast_util"; const { ModuleKind, ModuleResolutionKind, ScriptTarget } = ts; @@ -146,15 +146,22 @@ function buildLibraryMerge(): void { outputSourceFile: targetSourceFile } = setupFixtures(); - mergeGlobal({ + const prepareForMergeOpts = { + globalVarName: "foobarbaz", + interfaceName: "FooBar", + targetSourceFile + }; + + const prepareReturn = prepareFileForMerge(prepareForMergeOpts); + + mergeGlobals({ basePath, declarationProject, debug, - globalVarName: "foobarbaz", filePath: `${buildPath}/globals.ts`, inputProject, - interfaceName: "FooBar", - targetSourceFile + ...prepareForMergeOpts, + prepareReturn }); assert(targetSourceFile.getNamespace("moduleC") != null);