1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2024-11-25 15:29:32 -05:00

refactor: worker is no longer a resource (#3290)

This commit is contained in:
Bartek Iwańczuk 2019-11-09 21:07:14 +01:00 committed by Bert Belder
parent d586f119fa
commit 335e8bd33c
7 changed files with 153 additions and 142 deletions

View file

@ -223,8 +223,10 @@ impl TsCompiler {
/// Create a new V8 worker with snapshot of TS compiler and setup compiler's runtime.
fn setup_worker(global_state: ThreadSafeGlobalState) -> Worker {
let worker_state = ThreadSafeState::new(global_state.clone(), None, true)
.expect("Unable to create worker state");
let (int, ext) = ThreadSafeState::create_channels();
let worker_state =
ThreadSafeState::new(global_state.clone(), None, true, int)
.expect("Unable to create worker state");
// Count how many times we start the compiler worker.
global_state
@ -236,6 +238,7 @@ impl TsCompiler {
"TS".to_string(),
startup_data::compiler_isolate_init(),
worker_state,
ext,
);
worker.execute("denoMain()").unwrap();
worker.execute("workerMain()").unwrap();

View file

@ -2647,7 +2647,7 @@ declare namespace workers {
noDenoNamespace?: boolean;
}
export class WorkerImpl implements Worker {
private readonly rid;
private readonly id;
private isClosing;
private readonly isClosedPromise;
onerror?: () => void;

View file

@ -35,17 +35,17 @@ function createWorker(
});
}
async function hostGetWorkerClosed(rid: number): Promise<void> {
await sendAsync(dispatch.OP_HOST_GET_WORKER_CLOSED, { rid });
async function hostGetWorkerClosed(id: number): Promise<void> {
await sendAsync(dispatch.OP_HOST_GET_WORKER_CLOSED, { id });
}
function hostPostMessage(rid: number, data: any): void {
function hostPostMessage(id: number, data: any): void {
const dataIntArray = encodeMessage(data);
sendSync(dispatch.OP_HOST_POST_MESSAGE, { rid }, dataIntArray);
sendSync(dispatch.OP_HOST_POST_MESSAGE, { id }, dataIntArray);
}
async function hostGetMessage(rid: number): Promise<any> {
const res = await sendAsync(dispatch.OP_HOST_GET_MESSAGE, { rid });
async function hostGetMessage(id: number): Promise<any> {
const res = await sendAsync(dispatch.OP_HOST_GET_MESSAGE, { id });
if (res.data != null) {
return decodeMessage(new Uint8Array(res.data));
@ -123,7 +123,7 @@ export interface DenoWorkerOptions extends WorkerOptions {
}
export class WorkerImpl implements Worker {
private readonly rid: number;
private readonly id: number;
private isClosing = false;
private readonly isClosedPromise: Promise<void>;
public onerror?: () => void;
@ -152,14 +152,14 @@ export class WorkerImpl implements Worker {
sourceCode = blobBytes!;
}
this.rid = createWorker(
this.id = createWorker(
specifier,
includeDenoNamespace,
hasSourceCode,
sourceCode
);
this.run();
this.isClosedPromise = hostGetWorkerClosed(this.rid);
this.isClosedPromise = hostGetWorkerClosed(this.id);
this.isClosedPromise.then(
(): void => {
this.isClosing = true;
@ -172,12 +172,12 @@ export class WorkerImpl implements Worker {
}
postMessage(data: any): void {
hostPostMessage(this.rid, data);
hostPostMessage(this.id, data);
}
private async run(): Promise<void> {
while (!this.isClosing) {
const data = await hostGetMessage(this.rid);
const data = await hostGetMessage(this.id);
if (data == null) {
log("worker got null message. quitting.");
break;

View file

@ -118,16 +118,22 @@ fn create_worker_and_state(
.map_err(deno_error::print_err_and_exit)
.unwrap();
let (int, ext) = ThreadSafeState::create_channels();
let state = ThreadSafeState::new(
global_state.clone(),
global_state.main_module.clone(),
true,
int,
)
.map_err(deno_error::print_err_and_exit)
.unwrap();
let worker =
Worker::new("main".to_string(), startup_data::deno_isolate_init(), state);
let worker = Worker::new(
"main".to_string(),
startup_data::deno_isolate_init(),
state,
ext,
);
(worker, global_state)
}

View file

@ -1,5 +1,6 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
use super::dispatch_json::{Deserialize, JsonOp, Value};
use crate::deno_error::bad_resource;
use crate::deno_error::js_check;
use crate::deno_error::DenoError;
use crate::deno_error::ErrorKind;
@ -11,7 +12,6 @@ use deno::*;
use futures;
use futures::Async;
use futures::Future;
use futures::IntoFuture;
use futures::Sink;
use futures::Stream;
use std;
@ -138,23 +138,23 @@ fn op_create_worker(
}
}
let (int, ext) = ThreadSafeState::create_channels();
let child_state = ThreadSafeState::new(
state.global_state.clone(),
Some(module_specifier.clone()),
include_deno_namespace,
int,
)?;
let rid = child_state.rid;
let name = format!("USER-WORKER-{}", specifier);
let deno_main_call = format!("denoMain({})", include_deno_namespace);
let mut worker =
Worker::new(name, startup_data::deno_isolate_init(), child_state);
Worker::new(name, startup_data::deno_isolate_init(), child_state, ext);
js_check(worker.execute(&deno_main_call));
js_check(worker.execute("workerMain()"));
let exec_cb = move |worker: Worker| {
let mut workers_tl = parent_state.workers.lock().unwrap();
workers_tl.insert(rid, worker.shared());
json!(rid)
let worker_id = parent_state.add_child_worker(worker);
json!(worker_id)
};
// Has provided source code, execute immediately.
@ -173,7 +173,7 @@ fn op_create_worker(
#[derive(Deserialize)]
struct HostGetWorkerClosedArgs {
rid: i32,
id: i32,
}
/// Return when the worker closes
@ -183,37 +183,41 @@ fn op_host_get_worker_closed(
_data: Option<PinnedBuf>,
) -> Result<JsonOp, ErrBox> {
let args: HostGetWorkerClosedArgs = serde_json::from_value(args)?;
let id = args.id as u32;
let state_ = state.clone();
let workers_table = state.workers.lock().unwrap();
// TODO: handle bad worker id gracefully
let worker = workers_table.get(&id).unwrap();
let shared_worker_future = worker.clone().shared();
let rid = args.rid as u32;
let state = state.clone();
let shared_worker_future = {
let workers_tl = state.workers.lock().unwrap();
let worker = workers_tl.get(&rid).unwrap();
worker.clone()
};
let op =
shared_worker_future.then(move |_result| futures::future::ok(json!({})));
let op = shared_worker_future.then(move |_result| {
let mut workers_table = state_.workers.lock().unwrap();
workers_table.remove(&id);
futures::future::ok(json!({}))
});
Ok(JsonOp::Async(Box::new(op)))
}
#[derive(Deserialize)]
struct HostGetMessageArgs {
rid: i32,
id: i32,
}
/// Get message from guest worker as host
fn op_host_get_message(
_state: &ThreadSafeState,
state: &ThreadSafeState,
args: Value,
_data: Option<PinnedBuf>,
) -> Result<JsonOp, ErrBox> {
let args: HostGetMessageArgs = serde_json::from_value(args)?;
let rid = args.rid as u32;
let op = Worker::get_message_from_resource(rid)
let id = args.id as u32;
let mut table = state.workers.lock().unwrap();
// TODO: don't return bad resource anymore
let worker = table.get_mut(&id).ok_or_else(bad_resource)?;
let op = worker
.get_message()
.map_err(move |_| -> ErrBox { unimplemented!() })
.and_then(move |maybe_buf| {
futures::future::ok(json!({
@ -226,27 +230,26 @@ fn op_host_get_message(
#[derive(Deserialize)]
struct HostPostMessageArgs {
rid: i32,
id: i32,
}
/// Post message to guest worker as host
fn op_host_post_message(
_state: &ThreadSafeState,
state: &ThreadSafeState,
args: Value,
data: Option<PinnedBuf>,
) -> Result<JsonOp, ErrBox> {
let args: HostPostMessageArgs = serde_json::from_value(args)?;
let id = args.id as u32;
let msg = Vec::from(data.unwrap().as_ref()).into_boxed_slice();
let rid = args.rid as u32;
let d = Vec::from(data.unwrap().as_ref()).into_boxed_slice();
// TODO: rename to post_message_to_child(rid, d)
Worker::post_message_to_resource(rid, d)
.into_future()
.wait()
debug!("post message to worker {}", id);
let mut table = state.workers.lock().unwrap();
// TODO: don't return bad resource anymore
let worker = table.get_mut(&id).ok_or_else(bad_resource)?;
worker
.post_message(msg)
.map_err(|e| DenoError::new(ErrorKind::Other, e.to_string()))?;
Ok(JsonOp::Sync(json!({})))
}

View file

@ -6,8 +6,6 @@ use crate::import_map::ImportMap;
use crate::metrics::Metrics;
use crate::ops::JsonOp;
use crate::permissions::DenoPermissions;
use crate::resources;
use crate::resources::ResourceId;
use crate::worker::Worker;
use crate::worker::WorkerChannels;
use deno::Buf;
@ -17,7 +15,6 @@ use deno::Loader;
use deno::ModuleSpecifier;
use deno::Op;
use deno::PinnedBuf;
use futures::future::Shared;
use futures::Future;
use rand::rngs::StdRng;
use rand::SeedableRng;
@ -26,15 +23,12 @@ use std;
use std::collections::HashMap;
use std::ops::Deref;
use std::str;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::sync::Mutex;
use std::time::Instant;
use tokio::sync::mpsc as async_mpsc;
// TODO: hold references to concrete Workers instead of shared futures of
// those workers?
pub type UserWorkerTable = HashMap<ResourceId, Shared<Worker>>;
use tokio::sync::mpsc;
/// Isolate cannot be passed between threads but ThreadSafeState can.
/// ThreadSafeState satisfies Send and Sync. So any state that needs to be
@ -53,10 +47,9 @@ pub struct State {
pub import_map: Option<ImportMap>,
pub metrics: Metrics,
pub global_timer: Mutex<GlobalTimer>,
pub workers: Mutex<UserWorkerTable>,
pub workers: Mutex<HashMap<u32, Worker>>,
pub next_worker_id: AtomicUsize,
pub start_time: Instant,
/// A reference to this worker's resource.
pub rid: ResourceId,
pub seeded_rng: Option<Mutex<StdRng>>,
pub include_deno_namespace: bool,
}
@ -179,25 +172,26 @@ impl Loader for ThreadSafeState {
}
impl ThreadSafeState {
pub fn create_channels() -> (WorkerChannels, WorkerChannels) {
let (in_tx, in_rx) = mpsc::channel::<Buf>(1);
let (out_tx, out_rx) = mpsc::channel::<Buf>(1);
let internal_channels = WorkerChannels {
sender: out_tx,
receiver: in_rx,
};
let external_channels = WorkerChannels {
sender: in_tx,
receiver: out_rx,
};
(internal_channels, external_channels)
}
pub fn new(
global_state: ThreadSafeGlobalState,
main_module: Option<ModuleSpecifier>,
include_deno_namespace: bool,
internal_channels: WorkerChannels,
) -> Result<Self, ErrBox> {
let (worker_in_tx, worker_in_rx) = async_mpsc::channel::<Buf>(1);
let (worker_out_tx, worker_out_rx) = async_mpsc::channel::<Buf>(1);
let internal_channels = WorkerChannels {
sender: worker_out_tx,
receiver: worker_in_rx,
};
let external_channels = WorkerChannels {
sender: worker_in_tx,
receiver: worker_out_rx,
};
let mut table = resources::lock_resource_table();
let rid = table.add("worker", Box::new(external_channels));
let import_map: Option<ImportMap> =
match global_state.flags.import_map_path.as_ref() {
None => None,
@ -221,9 +215,9 @@ impl ThreadSafeState {
worker_channels: Mutex::new(internal_channels),
metrics: Metrics::default(),
global_timer: Mutex::new(GlobalTimer::new()),
workers: Mutex::new(UserWorkerTable::new()),
workers: Mutex::new(HashMap::new()),
next_worker_id: AtomicUsize::new(0),
start_time: Instant::now(),
rid,
seeded_rng,
include_deno_namespace,
};
@ -231,6 +225,13 @@ impl ThreadSafeState {
Ok(ThreadSafeState(Arc::new(state)))
}
pub fn add_child_worker(&self, worker: Worker) -> u32 {
let worker_id = self.next_worker_id.fetch_add(1, Ordering::Relaxed) as u32;
let mut workers_tl = self.workers.lock().unwrap();
workers_tl.insert(worker_id, worker);
worker_id
}
#[inline]
pub fn check_read(&self, filename: &str) -> Result<(), ErrBox> {
self.permissions.check_read(filename)
@ -286,7 +287,10 @@ impl ThreadSafeState {
}
#[cfg(test)]
pub fn mock(argv: Vec<String>) -> ThreadSafeState {
pub fn mock(
argv: Vec<String>,
internal_channels: WorkerChannels,
) -> ThreadSafeState {
let module_specifier = if argv.is_empty() {
None
} else {
@ -299,6 +303,7 @@ impl ThreadSafeState {
ThreadSafeGlobalState::mock(argv),
module_specifier,
true,
internal_channels,
)
.unwrap()
}
@ -331,8 +336,9 @@ impl ThreadSafeState {
#[test]
fn thread_safe() {
fn f<S: Send + Sync>(_: S) {}
f(ThreadSafeState::mock(vec![
String::from("./deno"),
String::from("hello.js"),
]));
let (int, _) = ThreadSafeState::create_channels();
f(ThreadSafeState::mock(
vec![String::from("./deno"), String::from("hello.js")],
int,
));
}

View file

@ -1,10 +1,6 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
use crate::deno_error::bad_resource;
use crate::fmt_errors::JSError;
use crate::ops;
use crate::resources;
use crate::resources::Resource;
use crate::resources::ResourceId;
use crate::state::ThreadSafeState;
use deno;
use deno::Buf;
@ -23,7 +19,7 @@ use std::sync::Mutex;
use tokio::sync::mpsc;
use url::Url;
/// Wraps mpsc channels into a generic resource so they can be referenced
/// Wraps mpsc channels so they can be referenced
/// from ops and used to facilitate parent-child communication
/// for workers.
pub struct WorkerChannels {
@ -31,8 +27,6 @@ pub struct WorkerChannels {
pub receiver: mpsc::Receiver<Buf>,
}
impl Resource for WorkerChannels {}
/// Wraps deno::Isolate to provide source maps, ops for the CLI, and
/// high-level module loading.
#[derive(Clone)]
@ -40,6 +34,7 @@ pub struct Worker {
pub name: String,
isolate: Arc<Mutex<deno::Isolate>>,
pub state: ThreadSafeState,
external_channels: Arc<Mutex<WorkerChannels>>,
}
impl Worker {
@ -47,6 +42,7 @@ impl Worker {
name: String,
startup_data: StartupData,
state: ThreadSafeState,
external_channels: WorkerChannels,
) -> Self {
let isolate = Arc::new(Mutex::new(deno::Isolate::new(startup_data, false)));
{
@ -86,10 +82,12 @@ impl Worker {
JSError::from_v8_exception(v8_exception, &global_state_.ts_compiler)
})
}
Self {
name,
isolate,
state,
external_channels: Arc::new(Mutex::new(external_channels)),
}
}
@ -140,35 +138,20 @@ impl Worker {
})
}
/// Post message to worker as a host or privileged overlord
pub fn post_message(self: &Self, buf: Buf) -> Result<Async<()>, ErrBox> {
Worker::post_message_to_resource(self.state.rid, buf)
}
pub fn post_message_to_resource(
rid: resources::ResourceId,
buf: Buf,
) -> Result<Async<()>, ErrBox> {
debug!("post message to resource {}", rid);
let mut table = resources::lock_resource_table();
let worker = table
.get_mut::<WorkerChannels>(rid)
.ok_or_else(bad_resource)?;
let sender = &mut worker.sender;
sender
.send(buf)
.poll()
.map(|_| Async::Ready(()))
.map_err(ErrBox::from)
/// Post message to worker as a host.
///
/// This method blocks current thread.
pub fn post_message(self: &Self, buf: Buf) -> Result<(), ErrBox> {
let mut channels = self.external_channels.lock().unwrap();
let sender = &mut channels.sender;
sender.send(buf).wait().map(|_| ()).map_err(ErrBox::from)
}
/// Get message from worker as a host.
pub fn get_message(self: &Self) -> WorkerReceiver {
Worker::get_message_from_resource(self.state.rid)
}
pub fn get_message_from_resource(rid: ResourceId) -> WorkerReceiver {
debug!("get message from resource {}", rid);
WorkerReceiver { rid }
WorkerReceiver {
channels: self.external_channels.clone(),
}
}
}
@ -186,7 +169,7 @@ impl Future for Worker {
/// that will return message received from worker or None
/// if worker's channel has been closed.
pub struct WorkerReceiver {
rid: ResourceId,
channels: Arc<Mutex<WorkerChannels>>,
}
impl Future for WorkerReceiver {
@ -194,12 +177,8 @@ impl Future for WorkerReceiver {
type Error = ErrBox;
fn poll(&mut self) -> Poll<Option<Buf>, ErrBox> {
let mut table = resources::lock_resource_table();
let worker = table
.get_mut::<WorkerChannels>(self.rid)
.ok_or_else(bad_resource)?;
let receiver = &mut worker.receiver;
receiver.poll().map_err(ErrBox::from)
let mut channels = self.channels.lock().unwrap();
channels.receiver.poll().map_err(ErrBox::from)
}
}
@ -214,7 +193,6 @@ mod tests {
use crate::state::ThreadSafeState;
use crate::tokio_util;
use futures::future::lazy;
use futures::IntoFuture;
use std::sync::atomic::Ordering;
#[test]
@ -233,13 +211,18 @@ mod tests {
Progress::new(),
)
.unwrap();
let state =
ThreadSafeState::new(global_state, Some(module_specifier.clone()), true)
.unwrap();
let (int, ext) = ThreadSafeState::create_channels();
let state = ThreadSafeState::new(
global_state,
Some(module_specifier.clone()),
true,
int,
)
.unwrap();
let state_ = state.clone();
tokio_util::run(lazy(move || {
let mut worker =
Worker::new("TEST".to_string(), StartupData::None, state);
Worker::new("TEST".to_string(), StartupData::None, state, ext);
worker
.execute_mod_async(&module_specifier, None, false)
.then(|result| {
@ -269,13 +252,18 @@ mod tests {
let global_state =
ThreadSafeGlobalState::new(DenoFlags::default(), argv, Progress::new())
.unwrap();
let state =
ThreadSafeState::new(global_state, Some(module_specifier.clone()), true)
.unwrap();
let (int, ext) = ThreadSafeState::create_channels();
let state = ThreadSafeState::new(
global_state,
Some(module_specifier.clone()),
true,
int,
)
.unwrap();
let state_ = state.clone();
tokio_util::run(lazy(move || {
let mut worker =
Worker::new("TEST".to_string(), StartupData::None, state);
Worker::new("TEST".to_string(), StartupData::None, state, ext);
worker
.execute_mod_async(&module_specifier, None, false)
.then(|result| {
@ -308,10 +296,12 @@ mod tests {
flags.reload = true;
let global_state =
ThreadSafeGlobalState::new(flags, argv, Progress::new()).unwrap();
let (int, ext) = ThreadSafeState::create_channels();
let state = ThreadSafeState::new(
global_state.clone(),
Some(module_specifier.clone()),
true,
int,
)
.unwrap();
let global_state_ = global_state.clone();
@ -321,6 +311,7 @@ mod tests {
"TEST".to_string(),
startup_data::deno_isolate_init(),
state,
ext,
);
worker.execute("denoMain()").unwrap();
worker
@ -343,12 +334,17 @@ mod tests {
}
fn create_test_worker() -> Worker {
let state = ThreadSafeState::mock(vec![
String::from("./deno"),
String::from("hello.js"),
]);
let mut worker =
Worker::new("TEST".to_string(), startup_data::deno_isolate_init(), state);
let (int, ext) = ThreadSafeState::create_channels();
let state = ThreadSafeState::mock(
vec![String::from("./deno"), String::from("hello.js")],
int,
);
let mut worker = Worker::new(
"TEST".to_string(),
startup_data::deno_isolate_init(),
state,
ext,
);
worker.execute("denoMain()").unwrap();
worker.execute("workerMain()").unwrap();
worker
@ -384,7 +380,7 @@ mod tests {
let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes();
let r = worker_.post_message(msg).into_future().wait();
let r = worker_.post_message(msg);
assert!(r.is_ok());
let maybe_msg = worker_.get_message().wait().unwrap();
@ -396,7 +392,7 @@ mod tests {
.to_string()
.into_boxed_str()
.into_boxed_bytes();
let r = worker_.post_message(msg).into_future().wait();
let r = worker_.post_message(msg);
assert!(r.is_ok());
})
}
@ -409,9 +405,7 @@ mod tests {
.execute("onmessage = () => { delete window.onmessage; }")
.unwrap();
let rid = worker.state.rid;
let worker_ = worker.clone();
let worker_future = worker
.then(move |r| -> Result<(), ()> {
println!("workers.rs after resource close");
@ -424,9 +418,8 @@ mod tests {
tokio::spawn(lazy(move || worker_future_.then(|_| Ok(()))));
let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes();
let r = worker_.post_message(msg).into_future().wait();
let r = worker_.post_message(msg);
assert!(r.is_ok());
debug!("rid {:?}", rid);
worker_future.wait().unwrap();
})