1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2024-11-24 15:19:26 -05:00
denoland-deno/cli/workers.rs
andy finch b0a23beb8f Add web worker JS API (#1993)
* Refactored the way worker polling is scheduled and errors are handled.
* Share the worker future as a Shared
2019-04-01 15:09:59 -04:00

287 lines
7.6 KiB
Rust

// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
use crate::errors::*;
use crate::flags::DenoFlags;
use crate::isolate::{DenoBehavior, Isolate};
use crate::isolate_state::IsolateState;
use crate::isolate_state::IsolateStateContainer;
use crate::isolate_state::WorkerChannels;
use crate::ops;
use crate::resources;
use crate::startup_data;
use deno::deno_buf;
use deno::Behavior;
use deno::Buf;
use deno::JSError;
use deno::Op;
use deno::StartupData;
use futures::sync::mpsc;
use futures::Future;
use futures::Poll;
use std::sync::Arc;
pub struct UserWorkerBehavior {
pub state: Arc<IsolateState>,
}
impl UserWorkerBehavior {
pub fn new(flags: DenoFlags, argv_rest: Vec<String>) -> Self {
Self {
state: Arc::new(IsolateState::new(flags, argv_rest, None, true)),
}
}
}
impl IsolateStateContainer for UserWorkerBehavior {
fn state(&self) -> Arc<IsolateState> {
self.state.clone()
}
}
impl IsolateStateContainer for &UserWorkerBehavior {
fn state(&self) -> Arc<IsolateState> {
self.state.clone()
}
}
impl Behavior for UserWorkerBehavior {
fn startup_data(&mut self) -> Option<StartupData> {
Some(startup_data::deno_isolate_init())
}
fn dispatch(
&mut self,
control: &[u8],
zero_copy: deno_buf,
) -> (bool, Box<Op>) {
ops::dispatch_all(self, control, zero_copy, ops::op_selector_worker)
}
}
impl WorkerBehavior for UserWorkerBehavior {
fn set_internal_channels(&mut self, worker_channels: WorkerChannels) {
self.state = Arc::new(IsolateState::new(
self.state.flags.clone(),
self.state.argv.clone(),
Some(worker_channels),
true,
));
}
}
/// Behavior trait specific to workers
pub trait WorkerBehavior: DenoBehavior {
/// Used to setup internal channels at worker creation.
/// This is intended to be temporary fix.
/// TODO(afinch7) come up with a better solution to set worker channels
fn set_internal_channels(&mut self, worker_channels: WorkerChannels);
}
/// Rust interface for WebWorkers.
pub struct Worker<B: WorkerBehavior> {
isolate: Isolate<B>,
pub resource: resources::Resource,
}
impl<B: WorkerBehavior> Worker<B> {
pub fn new(mut behavior: B) -> Self {
let (worker_in_tx, worker_in_rx) = mpsc::channel::<Buf>(1);
let (worker_out_tx, worker_out_rx) = mpsc::channel::<Buf>(1);
let internal_channels = (worker_out_tx, worker_in_rx);
let external_channels = (worker_in_tx, worker_out_rx);
behavior.set_internal_channels(internal_channels);
let isolate = Isolate::new(behavior);
Worker {
isolate,
resource: resources::add_worker(external_channels),
}
}
pub fn execute(&mut self, js_source: &str) -> Result<(), JSError> {
self.isolate.execute(js_source)
}
pub fn execute_mod(
&mut self,
js_filename: &str,
is_prefetch: bool,
) -> Result<(), RustOrJsError> {
self.isolate.execute_mod(js_filename, is_prefetch)
}
}
impl<B: WorkerBehavior> Future for Worker<B> {
type Item = ();
type Error = JSError;
fn poll(&mut self) -> Poll<(), JSError> {
self.isolate.poll()
}
}
/// Method and data used to initalize a worker
pub enum WorkerInit {
Script(String),
Module(String),
}
pub fn spawn<B: WorkerBehavior + 'static>(
behavior: B,
worker_debug_name: &str,
init: WorkerInit,
) -> Result<Worker<B>, RustOrJsError> {
let state = behavior.state().clone();
let mut worker = Worker::new(behavior);
worker
.execute(&format!("denoMain('{}')", worker_debug_name))
.expect("worker workerInit failed");
worker
.execute("workerMain()")
.expect("worker workerMain failed");
let init_result = match init {
WorkerInit::Script(script) => match worker.execute(&script) {
Ok(v) => Ok(v),
Err(e) => Err(RustOrJsError::Js(e)),
},
WorkerInit::Module(specifier) => {
let should_prefetch = state.flags.prefetch || state.flags.info;
match state.dir.resolve_module_url(&specifier, ".") {
Err(err) => Err(RustOrJsError::Rust(DenoError::from(err))),
Ok(module_url) => {
worker.execute_mod(&module_url.to_string(), should_prefetch)
}
}
}
};
match init_result {
Ok(_) => Ok(worker),
Err(err) => Err(err),
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::compiler::CompilerBehavior;
use crate::isolate_state::IsolateState;
use crate::js_errors::JSErrorColor;
use crate::tokio_util;
use futures::future::lazy;
use std::thread;
#[test]
fn test_spawn() {
tokio_util::init(|| {
let worker_result = spawn(
CompilerBehavior::new(
IsolateState::mock().flags.clone(),
IsolateState::mock().argv.clone(),
),
"TEST",
WorkerInit::Script(
r#"
onmessage = function(e) {
console.log("msg from main script", e.data);
if (e.data == "exit") {
close();
return;
} else {
console.assert(e.data === "hi");
}
postMessage([1, 2, 3]);
console.log("after postMessage");
}
"#.into(),
),
);
assert!(worker_result.is_ok());
let worker = worker_result.unwrap();
let resource = worker.resource.clone();
let resource_ = resource.clone();
tokio::spawn(lazy(move || {
worker.then(move |r| -> Result<(), ()> {
resource_.close();
debug!("workers.rs after resource close");
if let Err(err) = r {
eprintln!("{}", JSErrorColor(&err).to_string());
assert!(false)
}
Ok(())
})
}));
let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes();
let r = resources::post_message_to_worker(resource.rid, msg).wait();
assert!(r.is_ok());
let maybe_msg = resources::get_message_from_worker(resource.rid)
.wait()
.unwrap();
assert!(maybe_msg.is_some());
// Check if message received is [1, 2, 3] in json
assert_eq!(*maybe_msg.unwrap(), *b"[1,2,3]");
let msg = json!("exit")
.to_string()
.into_boxed_str()
.into_boxed_bytes();
let r = resources::post_message_to_worker(resource.rid, msg).wait();
assert!(r.is_ok());
})
}
#[test]
fn removed_from_resource_table_on_close() {
tokio_util::init(|| {
let worker_result = spawn(
CompilerBehavior::new(
IsolateState::mock().flags.clone(),
IsolateState::mock().argv.clone(),
),
"TEST",
WorkerInit::Script("onmessage = () => close();".into()),
);
assert!(worker_result.is_ok());
let worker = worker_result.unwrap();
let resource = worker.resource.clone();
let resource_ = resource.clone();
tokio::spawn(lazy(move || {
worker.then(move |r| -> Result<(), ()> {
resource_.close();
debug!("workers.rs after resource close");
if let Err(err) = r {
eprintln!("{}", JSErrorColor(&err).to_string());
assert!(false)
}
Ok(())
})
}));
assert_eq!(
resources::get_type(resource.rid),
Some("worker".to_string())
);
let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes();
let r = resources::post_message_to_worker(resource.rid, msg).wait();
assert!(r.is_ok());
println!("rid {:?}", resource.rid);
// TODO Need a way to get a future for when a resource closes.
// For now, just sleep for a bit.
// resource.close();
thread::sleep(std::time::Duration::from_millis(1000));
assert_eq!(resources::get_type(resource.rid), None);
})
}
}