mirror of
https://github.com/denoland/deno.git
synced 2025-01-11 16:42:21 -05:00
refactor: split cli::Worker (#3735)
* cli::Worker is base struct to create specialized workers * add MainWorker * add CompilerWorker * refactor WebWorker to use Worker
This commit is contained in:
parent
229eb292f8
commit
ecd1d3abb0
13 changed files with 299 additions and 280 deletions
78
cli/compilers/compiler_worker.rs
Normal file
78
cli/compilers/compiler_worker.rs
Normal file
|
@ -0,0 +1,78 @@
|
|||
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
|
||||
use crate::ops;
|
||||
use crate::state::ThreadSafeState;
|
||||
use crate::worker::Worker;
|
||||
use crate::worker::WorkerChannels;
|
||||
use deno_core;
|
||||
use deno_core::ErrBox;
|
||||
use deno_core::StartupData;
|
||||
use futures::future::FutureExt;
|
||||
use std::future::Future;
|
||||
use std::ops::Deref;
|
||||
use std::ops::DerefMut;
|
||||
use std::pin::Pin;
|
||||
use std::task::Context;
|
||||
use std::task::Poll;
|
||||
|
||||
/// This worker is used to host TypeScript and WASM compilers.
|
||||
///
|
||||
/// It provides minimal set of ops that are necessary to facilitate
|
||||
/// compilation.
|
||||
///
|
||||
/// NOTE: This worker is considered priveleged, because it may
|
||||
/// access file system without permission check.
|
||||
///
|
||||
/// At the moment this worker is meant to be single-use - after
|
||||
/// performing single compilation/bundling it should be destroyed.
|
||||
///
|
||||
/// TODO(bartlomieju): add support to reuse the worker - or in other
|
||||
/// words support stateful TS compiler
|
||||
#[derive(Clone)]
|
||||
pub struct CompilerWorker(Worker);
|
||||
|
||||
impl CompilerWorker {
|
||||
pub fn new(
|
||||
name: String,
|
||||
startup_data: StartupData,
|
||||
state: ThreadSafeState,
|
||||
external_channels: WorkerChannels,
|
||||
) -> Self {
|
||||
let state_ = state.clone();
|
||||
let worker = Worker::new(name, startup_data, state_, external_channels);
|
||||
{
|
||||
let mut isolate = worker.isolate.try_lock().unwrap();
|
||||
ops::compiler::init(&mut isolate, &state);
|
||||
ops::web_worker::init(&mut isolate, &state);
|
||||
// TODO(bartlomieju): CompilerWorker should not
|
||||
// depend on those ops
|
||||
ops::os::init(&mut isolate, &state);
|
||||
ops::files::init(&mut isolate, &state);
|
||||
ops::fs::init(&mut isolate, &state);
|
||||
ops::io::init(&mut isolate, &state);
|
||||
}
|
||||
|
||||
Self(worker)
|
||||
}
|
||||
}
|
||||
|
||||
impl Deref for CompilerWorker {
|
||||
type Target = Worker;
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl DerefMut for CompilerWorker {
|
||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||
&mut self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl Future for CompilerWorker {
|
||||
type Output = Result<(), ErrBox>;
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
|
||||
let inner = self.get_mut();
|
||||
inner.0.poll_unpin(cx)
|
||||
}
|
||||
}
|
|
@ -3,6 +3,7 @@ use deno_core::ErrBox;
|
|||
use futures::Future;
|
||||
use serde_json::Value;
|
||||
|
||||
mod compiler_worker;
|
||||
mod js;
|
||||
mod json;
|
||||
mod ts;
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
|
||||
use super::compiler_worker::CompilerWorker;
|
||||
use crate::compilers::CompilationResultFuture;
|
||||
use crate::compilers::CompiledModule;
|
||||
use crate::compilers::CompiledModuleFuture;
|
||||
|
@ -13,7 +14,6 @@ use crate::source_maps::SourceMapGetter;
|
|||
use crate::startup_data;
|
||||
use crate::state::*;
|
||||
use crate::version;
|
||||
use crate::worker::Worker;
|
||||
use deno_core::Buf;
|
||||
use deno_core::ErrBox;
|
||||
use deno_core::ModuleSpecifier;
|
||||
|
@ -228,7 +228,7 @@ impl TsCompiler {
|
|||
}
|
||||
|
||||
/// Create a new V8 worker with snapshot of TS compiler and setup compiler's runtime.
|
||||
fn setup_worker(global_state: ThreadSafeGlobalState) -> Worker {
|
||||
fn setup_worker(global_state: ThreadSafeGlobalState) -> CompilerWorker {
|
||||
let (int, ext) = ThreadSafeState::create_channels();
|
||||
let worker_state =
|
||||
ThreadSafeState::new(global_state.clone(), None, None, int)
|
||||
|
@ -240,7 +240,7 @@ impl TsCompiler {
|
|||
.compiler_starts
|
||||
.fetch_add(1, Ordering::SeqCst);
|
||||
|
||||
let mut worker = Worker::new(
|
||||
let mut worker = CompilerWorker::new(
|
||||
"TS".to_string(),
|
||||
startup_data::compiler_isolate_init(),
|
||||
worker_state,
|
||||
|
@ -279,7 +279,7 @@ impl TsCompiler {
|
|||
worker.post_message(req_msg).await?;
|
||||
worker.await?;
|
||||
debug!("Sent message to worker");
|
||||
let maybe_msg = worker_.get_message().await?;
|
||||
let maybe_msg = worker_.get_message().await;
|
||||
debug!("Received message from worker");
|
||||
if let Some(msg) = maybe_msg {
|
||||
let json_str = std::str::from_utf8(&msg).unwrap();
|
||||
|
@ -378,7 +378,7 @@ impl TsCompiler {
|
|||
worker.post_message(req_msg).await?;
|
||||
worker.await?;
|
||||
debug!("Sent message to worker");
|
||||
let maybe_msg = worker_.get_message().await?;
|
||||
let maybe_msg = worker_.get_message().await;
|
||||
if let Some(msg) = maybe_msg {
|
||||
let json_str = std::str::from_utf8(&msg).unwrap();
|
||||
debug!("Message: {}", json_str);
|
||||
|
@ -633,7 +633,7 @@ pub fn runtime_compile_async<S: BuildHasher>(
|
|||
worker.post_message(req_msg).await?;
|
||||
worker.await?;
|
||||
debug!("Sent message to worker");
|
||||
let msg = (worker_.get_message().await?).unwrap();
|
||||
let msg = (worker_.get_message().await).unwrap();
|
||||
let json_str = std::str::from_utf8(&msg).unwrap();
|
||||
Ok(json!(json_str))
|
||||
}
|
||||
|
@ -661,7 +661,7 @@ pub fn runtime_transpile_async<S: BuildHasher>(
|
|||
worker.post_message(req_msg).await?;
|
||||
worker.await?;
|
||||
debug!("Sent message to worker");
|
||||
let msg = (worker_.get_message().await?).unwrap();
|
||||
let msg = (worker_.get_message().await).unwrap();
|
||||
let json_str = std::str::from_utf8(&msg).unwrap();
|
||||
Ok(json!(json_str))
|
||||
}
|
||||
|
|
|
@ -1,11 +1,11 @@
|
|||
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
|
||||
use super::compiler_worker::CompilerWorker;
|
||||
use crate::compilers::CompiledModule;
|
||||
use crate::compilers::CompiledModuleFuture;
|
||||
use crate::file_fetcher::SourceFile;
|
||||
use crate::global_state::ThreadSafeGlobalState;
|
||||
use crate::startup_data;
|
||||
use crate::state::*;
|
||||
use crate::worker::Worker;
|
||||
use futures::FutureExt;
|
||||
use serde_derive::Deserialize;
|
||||
use serde_json;
|
||||
|
@ -42,7 +42,7 @@ pub struct WasmCompiler {
|
|||
|
||||
impl WasmCompiler {
|
||||
/// Create a new V8 worker with snapshot of WASM compiler and setup compiler's runtime.
|
||||
fn setup_worker(global_state: ThreadSafeGlobalState) -> Worker {
|
||||
fn setup_worker(global_state: ThreadSafeGlobalState) -> CompilerWorker {
|
||||
let (int, ext) = ThreadSafeState::create_channels();
|
||||
let worker_state =
|
||||
ThreadSafeState::new(global_state.clone(), None, None, int)
|
||||
|
@ -54,7 +54,7 @@ impl WasmCompiler {
|
|||
.compiler_starts
|
||||
.fetch_add(1, Ordering::SeqCst);
|
||||
|
||||
let mut worker = Worker::new(
|
||||
let mut worker = CompilerWorker::new(
|
||||
"WASM".to_string(),
|
||||
startup_data::compiler_isolate_init(),
|
||||
worker_state,
|
||||
|
@ -100,10 +100,9 @@ impl WasmCompiler {
|
|||
std::process::exit(1);
|
||||
}
|
||||
debug!("Sent message to worker");
|
||||
let maybe_msg = worker_.get_message().await.expect("not handled");
|
||||
let json_msg = worker_.get_message().await.expect("not handled");
|
||||
|
||||
debug!("Received message from worker");
|
||||
let json_msg = maybe_msg.unwrap();
|
||||
let module_info: WasmModuleInfo =
|
||||
serde_json::from_slice(&json_msg).unwrap();
|
||||
debug!("WASM module info: {:#?}", &module_info);
|
||||
|
|
13
cli/lib.rs
13
cli/lib.rs
|
@ -60,7 +60,7 @@ use crate::global_state::ThreadSafeGlobalState;
|
|||
use crate::ops::io::get_stdio;
|
||||
use crate::progress::Progress;
|
||||
use crate::state::ThreadSafeState;
|
||||
use crate::worker::Worker;
|
||||
use crate::worker::MainWorker;
|
||||
use deno_core::v8_set_flags;
|
||||
use deno_core::ErrBox;
|
||||
use deno_core::ModuleSpecifier;
|
||||
|
@ -97,7 +97,7 @@ impl log::Log for Logger {
|
|||
|
||||
fn create_worker_and_state(
|
||||
flags: DenoFlags,
|
||||
) -> (Worker, ThreadSafeGlobalState) {
|
||||
) -> (MainWorker, ThreadSafeGlobalState) {
|
||||
use crate::shell::Shell;
|
||||
use std::sync::Arc;
|
||||
use std::sync::Mutex;
|
||||
|
@ -135,7 +135,7 @@ fn create_worker_and_state(
|
|||
resource_table.add("stderr", Box::new(stderr));
|
||||
}
|
||||
|
||||
let worker = Worker::new(
|
||||
let worker = MainWorker::new(
|
||||
"main".to_string(),
|
||||
startup_data::deno_isolate_init(),
|
||||
state,
|
||||
|
@ -150,7 +150,7 @@ fn types_command() {
|
|||
println!("{}", content);
|
||||
}
|
||||
|
||||
fn print_cache_info(worker: Worker) {
|
||||
fn print_cache_info(worker: MainWorker) {
|
||||
let state = &worker.state.global_state;
|
||||
|
||||
println!(
|
||||
|
@ -170,7 +170,10 @@ fn print_cache_info(worker: Worker) {
|
|||
);
|
||||
}
|
||||
|
||||
async fn print_file_info(worker: Worker, module_specifier: ModuleSpecifier) {
|
||||
async fn print_file_info(
|
||||
worker: MainWorker,
|
||||
module_specifier: ModuleSpecifier,
|
||||
) {
|
||||
let global_state_ = &worker.state.global_state;
|
||||
|
||||
let maybe_source_file = global_state_
|
||||
|
|
|
@ -1,14 +1,11 @@
|
|||
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
|
||||
use super::dispatch_json::{Deserialize, JsonOp, Value};
|
||||
use crate::compilers::runtime_compile_async;
|
||||
use crate::compilers::runtime_transpile_async;
|
||||
use crate::futures::future::try_join_all;
|
||||
use crate::msg;
|
||||
use crate::ops::json_op;
|
||||
use crate::state::ThreadSafeState;
|
||||
use deno_core::Loader;
|
||||
use deno_core::*;
|
||||
use std::collections::HashMap;
|
||||
|
||||
pub fn init(i: &mut Isolate, s: &ThreadSafeState) {
|
||||
i.register_op("cache", s.core_op(json_op(s.stateful_op(op_cache))));
|
||||
|
@ -20,8 +17,6 @@ pub fn init(i: &mut Isolate, s: &ThreadSafeState) {
|
|||
"fetch_source_files",
|
||||
s.core_op(json_op(s.stateful_op(op_fetch_source_files))),
|
||||
);
|
||||
i.register_op("compile", s.core_op(json_op(s.stateful_op(op_compile))));
|
||||
i.register_op("transpile", s.core_op(json_op(s.stateful_op(op_transpile))));
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
|
@ -150,46 +145,3 @@ fn op_fetch_source_files(
|
|||
|
||||
Ok(JsonOp::Async(future))
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Debug)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
struct CompileArgs {
|
||||
root_name: String,
|
||||
sources: Option<HashMap<String, String>>,
|
||||
bundle: bool,
|
||||
options: Option<String>,
|
||||
}
|
||||
|
||||
fn op_compile(
|
||||
state: &ThreadSafeState,
|
||||
args: Value,
|
||||
_zero_copy: Option<PinnedBuf>,
|
||||
) -> Result<JsonOp, ErrBox> {
|
||||
let args: CompileArgs = serde_json::from_value(args)?;
|
||||
Ok(JsonOp::Async(runtime_compile_async(
|
||||
state.global_state.clone(),
|
||||
&args.root_name,
|
||||
&args.sources,
|
||||
args.bundle,
|
||||
&args.options,
|
||||
)))
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Debug)]
|
||||
struct TranspileArgs {
|
||||
sources: HashMap<String, String>,
|
||||
options: Option<String>,
|
||||
}
|
||||
|
||||
fn op_transpile(
|
||||
state: &ThreadSafeState,
|
||||
args: Value,
|
||||
_zero_copy: Option<PinnedBuf>,
|
||||
) -> Result<JsonOp, ErrBox> {
|
||||
let args: TranspileArgs = serde_json::from_value(args)?;
|
||||
Ok(JsonOp::Async(runtime_transpile_async(
|
||||
state.global_state.clone(),
|
||||
&args.sources,
|
||||
&args.options,
|
||||
)))
|
||||
}
|
||||
|
|
|
@ -21,6 +21,7 @@ pub mod process;
|
|||
pub mod random;
|
||||
pub mod repl;
|
||||
pub mod resources;
|
||||
pub mod runtime_compiler;
|
||||
pub mod timers;
|
||||
pub mod tls;
|
||||
pub mod web_worker;
|
||||
|
|
56
cli/ops/runtime_compiler.rs
Normal file
56
cli/ops/runtime_compiler.rs
Normal file
|
@ -0,0 +1,56 @@
|
|||
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
|
||||
use super::dispatch_json::{Deserialize, JsonOp, Value};
|
||||
use crate::compilers::runtime_compile_async;
|
||||
use crate::compilers::runtime_transpile_async;
|
||||
use crate::ops::json_op;
|
||||
use crate::state::ThreadSafeState;
|
||||
use deno_core::*;
|
||||
use std::collections::HashMap;
|
||||
|
||||
pub fn init(i: &mut Isolate, s: &ThreadSafeState) {
|
||||
i.register_op("compile", s.core_op(json_op(s.stateful_op(op_compile))));
|
||||
i.register_op("transpile", s.core_op(json_op(s.stateful_op(op_transpile))));
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Debug)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
struct CompileArgs {
|
||||
root_name: String,
|
||||
sources: Option<HashMap<String, String>>,
|
||||
bundle: bool,
|
||||
options: Option<String>,
|
||||
}
|
||||
|
||||
fn op_compile(
|
||||
state: &ThreadSafeState,
|
||||
args: Value,
|
||||
_zero_copy: Option<PinnedBuf>,
|
||||
) -> Result<JsonOp, ErrBox> {
|
||||
let args: CompileArgs = serde_json::from_value(args)?;
|
||||
Ok(JsonOp::Async(runtime_compile_async(
|
||||
state.global_state.clone(),
|
||||
&args.root_name,
|
||||
&args.sources,
|
||||
args.bundle,
|
||||
&args.options,
|
||||
)))
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Debug)]
|
||||
struct TranspileArgs {
|
||||
sources: HashMap<String, String>,
|
||||
options: Option<String>,
|
||||
}
|
||||
|
||||
fn op_transpile(
|
||||
state: &ThreadSafeState,
|
||||
args: Value,
|
||||
_zero_copy: Option<PinnedBuf>,
|
||||
) -> Result<JsonOp, ErrBox> {
|
||||
let args: TranspileArgs = serde_json::from_value(args)?;
|
||||
Ok(JsonOp::Async(runtime_transpile_async(
|
||||
state.global_state.clone(),
|
||||
&args.sources,
|
||||
&args.options,
|
||||
)))
|
||||
}
|
|
@ -11,10 +11,6 @@ use futures::sink::SinkExt;
|
|||
use futures::stream::StreamExt;
|
||||
use std;
|
||||
use std::convert::From;
|
||||
use std::future::Future;
|
||||
use std::pin::Pin;
|
||||
use std::task::Context;
|
||||
use std::task::Poll;
|
||||
|
||||
pub fn init(i: &mut Isolate, s: &ThreadSafeState) {
|
||||
i.register_op(
|
||||
|
@ -27,33 +23,16 @@ pub fn init(i: &mut Isolate, s: &ThreadSafeState) {
|
|||
);
|
||||
}
|
||||
|
||||
struct GetMessageFuture {
|
||||
state: ThreadSafeState,
|
||||
}
|
||||
|
||||
impl Future for GetMessageFuture {
|
||||
type Output = Option<Buf>;
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
|
||||
let inner = self.get_mut();
|
||||
let mut channels = inner.state.worker_channels.lock().unwrap();
|
||||
let receiver = &mut channels.receiver;
|
||||
receiver.poll_next_unpin(cx)
|
||||
}
|
||||
}
|
||||
|
||||
/// Get message from host as guest worker
|
||||
fn op_worker_get_message(
|
||||
state: &ThreadSafeState,
|
||||
_args: Value,
|
||||
_data: Option<PinnedBuf>,
|
||||
) -> Result<JsonOp, ErrBox> {
|
||||
let op = GetMessageFuture {
|
||||
state: state.clone(),
|
||||
};
|
||||
|
||||
let state_ = state.clone();
|
||||
let op = async move {
|
||||
let maybe_buf = op.await;
|
||||
let mut receiver = state_.worker_channels.receiver.lock().await;
|
||||
let maybe_buf = receiver.next().await;
|
||||
debug!("op_worker_get_message");
|
||||
Ok(json!({ "data": maybe_buf }))
|
||||
};
|
||||
|
@ -68,8 +47,7 @@ fn op_worker_post_message(
|
|||
data: Option<PinnedBuf>,
|
||||
) -> Result<JsonOp, ErrBox> {
|
||||
let d = Vec::from(data.unwrap().as_ref()).into_boxed_slice();
|
||||
let mut channels = state.worker_channels.lock().unwrap();
|
||||
let sender = &mut channels.sender;
|
||||
let mut sender = state.worker_channels.sender.clone();
|
||||
futures::executor::block_on(sender.send(d))
|
||||
.map_err(|e| DenoError::new(ErrorKind::Other, e.to_string()))?;
|
||||
|
||||
|
|
|
@ -57,21 +57,6 @@ pub fn init(i: &mut Isolate, s: &ThreadSafeState) {
|
|||
i.register_op("metrics", s.core_op(json_op(s.stateful_op(op_metrics))));
|
||||
}
|
||||
|
||||
struct GetMessageFuture {
|
||||
state: ThreadSafeState,
|
||||
}
|
||||
|
||||
impl Future for GetMessageFuture {
|
||||
type Output = Option<Buf>;
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
|
||||
let inner = self.get_mut();
|
||||
let mut channels = inner.state.worker_channels.lock().unwrap();
|
||||
let receiver = &mut channels.receiver;
|
||||
receiver.poll_next_unpin(cx)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
struct CreateWorkerArgs {
|
||||
|
@ -250,9 +235,12 @@ fn op_host_close_worker(
|
|||
let mut workers_table = state_.workers.lock().unwrap();
|
||||
let maybe_worker = workers_table.remove(&id);
|
||||
if let Some(worker) = maybe_worker {
|
||||
let mut channels = worker.state.worker_channels.lock().unwrap();
|
||||
channels.sender.close_channel();
|
||||
channels.receiver.close();
|
||||
let channels = worker.state.worker_channels.clone();
|
||||
let mut sender = channels.sender.clone();
|
||||
sender.close_channel();
|
||||
|
||||
let mut receiver = futures::executor::block_on(channels.receiver.lock());
|
||||
receiver.close();
|
||||
};
|
||||
|
||||
Ok(JsonOp::Sync(json!({})))
|
||||
|
@ -285,9 +273,9 @@ fn op_host_get_message(
|
|||
_data: Option<PinnedBuf>,
|
||||
) -> Result<JsonOp, ErrBox> {
|
||||
let args: HostGetMessageArgs = serde_json::from_value(args)?;
|
||||
|
||||
let state_ = state.clone();
|
||||
let id = args.id as u32;
|
||||
let mut table = state.workers.lock().unwrap();
|
||||
let mut table = state_.workers.lock().unwrap();
|
||||
// TODO: don't return bad resource anymore
|
||||
let worker = table.get_mut(&id).ok_or_else(bad_resource)?;
|
||||
let fut = worker.get_message();
|
||||
|
|
|
@ -35,6 +35,7 @@ use std::sync::Arc;
|
|||
use std::sync::Mutex;
|
||||
use std::sync::MutexGuard;
|
||||
use std::time::Instant;
|
||||
use tokio::sync::Mutex as AsyncMutex;
|
||||
|
||||
/// Isolate cannot be passed between threads but ThreadSafeState can.
|
||||
/// ThreadSafeState satisfies Send and Sync. So any state that needs to be
|
||||
|
@ -46,7 +47,7 @@ pub struct State {
|
|||
pub global_state: ThreadSafeGlobalState,
|
||||
pub permissions: Arc<Mutex<DenoPermissions>>,
|
||||
pub main_module: Option<ModuleSpecifier>,
|
||||
pub worker_channels: Mutex<WorkerChannels>,
|
||||
pub worker_channels: WorkerChannels,
|
||||
/// When flags contains a `.import_map_path` option, the content of the
|
||||
/// import map file will be resolved and set.
|
||||
pub import_map: Option<ImportMap>,
|
||||
|
@ -203,11 +204,11 @@ impl ThreadSafeState {
|
|||
let (out_tx, out_rx) = mpsc::channel::<Buf>(1);
|
||||
let internal_channels = WorkerChannels {
|
||||
sender: out_tx,
|
||||
receiver: in_rx,
|
||||
receiver: Arc::new(AsyncMutex::new(in_rx)),
|
||||
};
|
||||
let external_channels = WorkerChannels {
|
||||
sender: in_tx,
|
||||
receiver: out_rx,
|
||||
receiver: Arc::new(AsyncMutex::new(out_rx)),
|
||||
};
|
||||
(internal_channels, external_channels)
|
||||
}
|
||||
|
@ -241,7 +242,7 @@ impl ThreadSafeState {
|
|||
main_module,
|
||||
permissions,
|
||||
import_map,
|
||||
worker_channels: Mutex::new(internal_channels),
|
||||
worker_channels: internal_channels,
|
||||
metrics: Metrics::default(),
|
||||
global_timer: Mutex::new(GlobalTimer::new()),
|
||||
workers: Mutex::new(HashMap::new()),
|
||||
|
|
|
@ -1,35 +1,28 @@
|
|||
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
|
||||
use crate::fmt_errors::JSError;
|
||||
use crate::ops;
|
||||
use crate::state::ThreadSafeState;
|
||||
use crate::worker::Worker;
|
||||
use crate::worker::WorkerChannels;
|
||||
use crate::worker::WorkerReceiver;
|
||||
use deno_core;
|
||||
use deno_core::Buf;
|
||||
use deno_core::ErrBox;
|
||||
use deno_core::ModuleSpecifier;
|
||||
use deno_core::StartupData;
|
||||
use futures::future::FutureExt;
|
||||
use futures::future::TryFutureExt;
|
||||
use futures::sink::SinkExt;
|
||||
use futures::task::AtomicWaker;
|
||||
use std::env;
|
||||
use std::future::Future;
|
||||
use std::ops::Deref;
|
||||
use std::ops::DerefMut;
|
||||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
use std::sync::Mutex;
|
||||
use std::task::Context;
|
||||
use std::task::Poll;
|
||||
use tokio::sync::Mutex as AsyncMutex;
|
||||
use url::Url;
|
||||
|
||||
/// This worker is implementation of `Worker` Web API
|
||||
///
|
||||
/// At the moment this type of worker supports only
|
||||
/// communication with parent and creating new workers.
|
||||
///
|
||||
/// Each `WebWorker` is either a child of `MainWorker` or other
|
||||
/// `WebWorker`.
|
||||
#[derive(Clone)]
|
||||
pub struct WebWorker {
|
||||
pub name: String,
|
||||
pub isolate: Arc<AsyncMutex<Box<deno_core::EsIsolate>>>,
|
||||
pub state: ThreadSafeState,
|
||||
external_channels: Arc<Mutex<WorkerChannels>>,
|
||||
}
|
||||
pub struct WebWorker(Worker);
|
||||
|
||||
impl WebWorker {
|
||||
pub fn new(
|
||||
|
@ -38,92 +31,28 @@ impl WebWorker {
|
|||
state: ThreadSafeState,
|
||||
external_channels: WorkerChannels,
|
||||
) -> Self {
|
||||
let mut isolate =
|
||||
deno_core::EsIsolate::new(Box::new(state.clone()), startup_data, false);
|
||||
|
||||
let state_ = state.clone();
|
||||
let worker = Worker::new(name, startup_data, state_, external_channels);
|
||||
{
|
||||
let mut isolate = worker.isolate.try_lock().unwrap();
|
||||
ops::web_worker::init(&mut isolate, &state);
|
||||
ops::worker_host::init(&mut isolate, &state);
|
||||
}
|
||||
|
||||
let global_state_ = state.global_state.clone();
|
||||
isolate.set_js_error_create(move |v8_exception| {
|
||||
JSError::from_v8_exception(v8_exception, &global_state_.ts_compiler)
|
||||
});
|
||||
|
||||
Self {
|
||||
name,
|
||||
isolate: Arc::new(AsyncMutex::new(isolate)),
|
||||
state,
|
||||
external_channels: Arc::new(Mutex::new(external_channels)),
|
||||
Self(worker)
|
||||
}
|
||||
}
|
||||
|
||||
/// Same as execute2() but the filename defaults to "$CWD/__anonymous__".
|
||||
pub fn execute(&mut self, js_source: &str) -> Result<(), ErrBox> {
|
||||
let path = env::current_dir().unwrap().join("__anonymous__");
|
||||
let url = Url::from_file_path(path).unwrap();
|
||||
self.execute2(url.as_str(), js_source)
|
||||
}
|
||||
|
||||
/// Executes the provided JavaScript source code. The js_filename argument is
|
||||
/// provided only for debugging purposes.
|
||||
fn execute2(
|
||||
&mut self,
|
||||
js_filename: &str,
|
||||
js_source: &str,
|
||||
) -> Result<(), ErrBox> {
|
||||
let mut isolate = self.isolate.try_lock().unwrap();
|
||||
isolate.execute(js_filename, js_source)
|
||||
}
|
||||
|
||||
/// Executes the provided JavaScript module.
|
||||
///
|
||||
/// Takes ownership of the isolate behind mutex.
|
||||
pub async fn execute_mod_async(
|
||||
&mut self,
|
||||
module_specifier: &ModuleSpecifier,
|
||||
maybe_code: Option<String>,
|
||||
is_prefetch: bool,
|
||||
) -> Result<(), ErrBox> {
|
||||
let specifier = module_specifier.to_string();
|
||||
let worker = self.clone();
|
||||
|
||||
let mut isolate = self.isolate.lock().await;
|
||||
let id = isolate.load_module(&specifier, maybe_code).await?;
|
||||
worker.state.global_state.progress.done();
|
||||
|
||||
if !is_prefetch {
|
||||
return isolate.mod_evaluate(id);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Post message to worker as a host.
|
||||
///
|
||||
/// This method blocks current thread.
|
||||
pub fn post_message(
|
||||
&self,
|
||||
buf: Buf,
|
||||
) -> impl Future<Output = Result<(), ErrBox>> {
|
||||
let channels = self.external_channels.lock().unwrap();
|
||||
let mut sender = channels.sender.clone();
|
||||
async move {
|
||||
let result = sender.send(buf).map_err(ErrBox::from).await;
|
||||
drop(sender);
|
||||
result
|
||||
impl Deref for WebWorker {
|
||||
type Target = Worker;
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.0
|
||||
}
|
||||
}
|
||||
|
||||
/// Get message from worker as a host.
|
||||
pub fn get_message(&self) -> WorkerReceiver {
|
||||
WorkerReceiver {
|
||||
channels: self.external_channels.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn clear_exception(&mut self) {
|
||||
let mut isolate = self.isolate.try_lock().unwrap();
|
||||
isolate.clear_exception();
|
||||
impl DerefMut for WebWorker {
|
||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||
&mut self.0
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -132,14 +61,6 @@ impl Future for WebWorker {
|
|||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
|
||||
let inner = self.get_mut();
|
||||
let waker = AtomicWaker::new();
|
||||
waker.register(cx.waker());
|
||||
match inner.isolate.try_lock() {
|
||||
Ok(mut isolate) => isolate.poll_unpin(cx),
|
||||
Err(_) => {
|
||||
waker.wake();
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
inner.0.poll_unpin(cx)
|
||||
}
|
||||
}
|
||||
|
|
151
cli/worker.rs
151
cli/worker.rs
|
@ -15,9 +15,10 @@ use futures::stream::StreamExt;
|
|||
use futures::task::AtomicWaker;
|
||||
use std::env;
|
||||
use std::future::Future;
|
||||
use std::ops::Deref;
|
||||
use std::ops::DerefMut;
|
||||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
use std::sync::Mutex;
|
||||
use std::task::Context;
|
||||
use std::task::Poll;
|
||||
use tokio::sync::Mutex as AsyncMutex;
|
||||
|
@ -26,19 +27,30 @@ use url::Url;
|
|||
/// Wraps mpsc channels so they can be referenced
|
||||
/// from ops and used to facilitate parent-child communication
|
||||
/// for workers.
|
||||
#[derive(Clone)]
|
||||
pub struct WorkerChannels {
|
||||
pub sender: mpsc::Sender<Buf>,
|
||||
pub receiver: mpsc::Receiver<Buf>,
|
||||
pub receiver: Arc<AsyncMutex<mpsc::Receiver<Buf>>>,
|
||||
}
|
||||
|
||||
/// Wraps deno_core::Isolate to provide source maps, ops for the CLI, and
|
||||
/// high-level module loading.
|
||||
/// Worker is a CLI wrapper for `deno_core::Isolate`.
|
||||
///
|
||||
/// It provides infrastructure to communicate with a worker and
|
||||
/// consequently between workers.
|
||||
///
|
||||
/// This struct is meant to be used as a base struct for concrete
|
||||
/// type of worker that registers set of ops.
|
||||
///
|
||||
/// Currently there are three types of workers:
|
||||
/// - `MainWorker`
|
||||
/// - `CompilerWorker`
|
||||
/// - `WebWorker`
|
||||
#[derive(Clone)]
|
||||
pub struct Worker {
|
||||
pub name: String,
|
||||
pub isolate: Arc<AsyncMutex<Box<deno_core::EsIsolate>>>,
|
||||
pub state: ThreadSafeState,
|
||||
external_channels: Arc<Mutex<WorkerChannels>>,
|
||||
external_channels: WorkerChannels,
|
||||
}
|
||||
|
||||
impl Worker {
|
||||
|
@ -50,26 +62,6 @@ impl Worker {
|
|||
) -> Self {
|
||||
let mut isolate =
|
||||
deno_core::EsIsolate::new(Box::new(state.clone()), startup_data, false);
|
||||
let op_registry = isolate.op_registry.clone();
|
||||
|
||||
ops::compiler::init(&mut isolate, &state);
|
||||
ops::errors::init(&mut isolate, &state);
|
||||
ops::fetch::init(&mut isolate, &state);
|
||||
ops::files::init(&mut isolate, &state);
|
||||
ops::fs::init(&mut isolate, &state);
|
||||
ops::io::init(&mut isolate, &state);
|
||||
ops::plugins::init(&mut isolate, &state, op_registry);
|
||||
ops::net::init(&mut isolate, &state);
|
||||
ops::tls::init(&mut isolate, &state);
|
||||
ops::os::init(&mut isolate, &state);
|
||||
ops::permissions::init(&mut isolate, &state);
|
||||
ops::process::init(&mut isolate, &state);
|
||||
ops::random::init(&mut isolate, &state);
|
||||
ops::repl::init(&mut isolate, &state);
|
||||
ops::resources::init(&mut isolate, &state);
|
||||
ops::timers::init(&mut isolate, &state);
|
||||
ops::worker_host::init(&mut isolate, &state);
|
||||
ops::web_worker::init(&mut isolate, &state);
|
||||
|
||||
let global_state_ = state.global_state.clone();
|
||||
isolate.set_js_error_create(move |v8_exception| {
|
||||
|
@ -80,7 +72,7 @@ impl Worker {
|
|||
name,
|
||||
isolate: Arc::new(AsyncMutex::new(isolate)),
|
||||
state,
|
||||
external_channels: Arc::new(Mutex::new(external_channels)),
|
||||
external_channels,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -128,24 +120,24 @@ impl Worker {
|
|||
/// Post message to worker as a host.
|
||||
///
|
||||
/// This method blocks current thread.
|
||||
pub fn post_message(
|
||||
&self,
|
||||
buf: Buf,
|
||||
) -> impl Future<Output = Result<(), ErrBox>> {
|
||||
let channels = self.external_channels.lock().unwrap();
|
||||
let mut sender = channels.sender.clone();
|
||||
async move {
|
||||
pub async fn post_message(&self, buf: Buf) -> Result<(), ErrBox> {
|
||||
let mut sender = self.external_channels.sender.clone();
|
||||
let result = sender.send(buf).map_err(ErrBox::from).await;
|
||||
drop(sender);
|
||||
result
|
||||
}
|
||||
}
|
||||
|
||||
/// Get message from worker as a host.
|
||||
pub fn get_message(&self) -> WorkerReceiver {
|
||||
WorkerReceiver {
|
||||
channels: self.external_channels.clone(),
|
||||
pub fn get_message(
|
||||
&self,
|
||||
) -> Pin<Box<dyn Future<Output = Option<Buf>> + Send>> {
|
||||
let receiver_mutex = self.external_channels.receiver.clone();
|
||||
|
||||
async move {
|
||||
let mut receiver = receiver_mutex.lock().await;
|
||||
receiver.next().await
|
||||
}
|
||||
.boxed()
|
||||
}
|
||||
|
||||
pub fn clear_exception(&mut self) {
|
||||
|
@ -171,22 +163,71 @@ impl Future for Worker {
|
|||
}
|
||||
}
|
||||
|
||||
/// This structure wraps worker's resource id to implement future
|
||||
/// that will return message received from worker or None
|
||||
/// if worker's channel has been closed.
|
||||
pub struct WorkerReceiver {
|
||||
pub channels: Arc<Mutex<WorkerChannels>>,
|
||||
/// This worker is created and used by Deno executable.
|
||||
///
|
||||
/// It provides ops available in the `Deno` namespace.
|
||||
///
|
||||
/// All WebWorkers created during program execution are decendants of
|
||||
/// this worker.
|
||||
#[derive(Clone)]
|
||||
pub struct MainWorker(Worker);
|
||||
|
||||
impl MainWorker {
|
||||
pub fn new(
|
||||
name: String,
|
||||
startup_data: StartupData,
|
||||
state: ThreadSafeState,
|
||||
external_channels: WorkerChannels,
|
||||
) -> Self {
|
||||
let state_ = state.clone();
|
||||
let worker = Worker::new(name, startup_data, state_, external_channels);
|
||||
{
|
||||
let mut isolate = worker.isolate.try_lock().unwrap();
|
||||
let op_registry = isolate.op_registry.clone();
|
||||
|
||||
ops::runtime_compiler::init(&mut isolate, &state);
|
||||
ops::errors::init(&mut isolate, &state);
|
||||
ops::fetch::init(&mut isolate, &state);
|
||||
ops::files::init(&mut isolate, &state);
|
||||
ops::fs::init(&mut isolate, &state);
|
||||
ops::io::init(&mut isolate, &state);
|
||||
ops::plugins::init(&mut isolate, &state, op_registry);
|
||||
ops::net::init(&mut isolate, &state);
|
||||
ops::tls::init(&mut isolate, &state);
|
||||
ops::os::init(&mut isolate, &state);
|
||||
ops::permissions::init(&mut isolate, &state);
|
||||
ops::process::init(&mut isolate, &state);
|
||||
ops::random::init(&mut isolate, &state);
|
||||
ops::repl::init(&mut isolate, &state);
|
||||
ops::resources::init(&mut isolate, &state);
|
||||
ops::timers::init(&mut isolate, &state);
|
||||
ops::worker_host::init(&mut isolate, &state);
|
||||
ops::web_worker::init(&mut isolate, &state);
|
||||
}
|
||||
|
||||
impl Future for WorkerReceiver {
|
||||
type Output = Result<Option<Buf>, ErrBox>;
|
||||
Self(worker)
|
||||
}
|
||||
}
|
||||
|
||||
impl Deref for MainWorker {
|
||||
type Target = Worker;
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl DerefMut for MainWorker {
|
||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||
&mut self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl Future for MainWorker {
|
||||
type Output = Result<(), ErrBox>;
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
|
||||
let mut channels = self.channels.lock().unwrap();
|
||||
match channels.receiver.poll_next_unpin(cx) {
|
||||
Poll::Ready(v) => Poll::Ready(Ok(v)),
|
||||
Poll::Pending => Poll::Pending,
|
||||
}
|
||||
let inner = self.get_mut();
|
||||
inner.0.poll_unpin(cx)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -248,7 +289,7 @@ mod tests {
|
|||
let state_ = state.clone();
|
||||
tokio_util::run(async move {
|
||||
let mut worker =
|
||||
Worker::new("TEST".to_string(), StartupData::None, state, ext);
|
||||
MainWorker::new("TEST".to_string(), StartupData::None, state, ext);
|
||||
let result = worker
|
||||
.execute_mod_async(&module_specifier, None, false)
|
||||
.await;
|
||||
|
@ -291,7 +332,7 @@ mod tests {
|
|||
let state_ = state.clone();
|
||||
tokio_util::run(async move {
|
||||
let mut worker =
|
||||
Worker::new("TEST".to_string(), StartupData::None, state, ext);
|
||||
MainWorker::new("TEST".to_string(), StartupData::None, state, ext);
|
||||
let result = worker
|
||||
.execute_mod_async(&module_specifier, None, false)
|
||||
.await;
|
||||
|
@ -333,7 +374,7 @@ mod tests {
|
|||
let global_state_ = global_state;
|
||||
let state_ = state.clone();
|
||||
tokio_util::run(async move {
|
||||
let mut worker = Worker::new(
|
||||
let mut worker = MainWorker::new(
|
||||
"TEST".to_string(),
|
||||
startup_data::deno_isolate_init(),
|
||||
state,
|
||||
|
@ -359,13 +400,13 @@ mod tests {
|
|||
drop(http_server_guard);
|
||||
}
|
||||
|
||||
fn create_test_worker() -> Worker {
|
||||
fn create_test_worker() -> MainWorker {
|
||||
let (int, ext) = ThreadSafeState::create_channels();
|
||||
let state = ThreadSafeState::mock(
|
||||
vec![String::from("./deno"), String::from("hello.js")],
|
||||
int,
|
||||
);
|
||||
let mut worker = Worker::new(
|
||||
let mut worker = MainWorker::new(
|
||||
"TEST".to_string(),
|
||||
startup_data::deno_isolate_init(),
|
||||
state,
|
||||
|
@ -409,7 +450,7 @@ mod tests {
|
|||
let r = block_on(worker_.post_message(msg));
|
||||
assert!(r.is_ok());
|
||||
|
||||
let maybe_msg = block_on(worker_.get_message()).unwrap();
|
||||
let maybe_msg = block_on(worker_.get_message());
|
||||
assert!(maybe_msg.is_some());
|
||||
// Check if message received is [1, 2, 3] in json
|
||||
assert_eq!(*maybe_msg.unwrap(), *b"[1,2,3]");
|
||||
|
|
Loading…
Reference in a new issue