From 6f79ad721a9f8c9d66d79f21ea479286f3ca5374 Mon Sep 17 00:00:00 2001 From: Ryan Dahl Date: Tue, 8 Jan 2019 14:44:06 -0500 Subject: [PATCH] Minimal Worker support (#1476) This adds the ability to spawn additional Isolates from Rust and send and receive messages from them. This is preliminary work to support running the typescript compiler in a separate isolate and thus support native ES modules. Ref #975. --- BUILD.gn | 1 + js/globals.ts | 6 ++ js/workers.ts | 75 ++++++++++++++++++++ src/flags.rs | 2 +- src/isolate.rs | 38 +++++++---- src/main.rs | 3 +- src/msg.fbs | 15 ++++ src/ops.rs | 173 ++++++++++++++++++++++++++++++++++++----------- src/resources.rs | 64 +++++++++++++++++- src/workers.rs | 148 ++++++++++++++++++++++++++++++++++++++++ 10 files changed, 470 insertions(+), 55 deletions(-) create mode 100644 js/workers.ts create mode 100644 src/workers.rs diff --git a/BUILD.gn b/BUILD.gn index c183fe3e94..8e013a0e8c 100644 --- a/BUILD.gn +++ b/BUILD.gn @@ -99,6 +99,7 @@ ts_sources = [ "js/url.ts", "js/url_search_params.ts", "js/util.ts", + "js/workers.ts", "js/write_file.ts", "tsconfig.json", diff --git a/js/globals.ts b/js/globals.ts index 6632153418..849d2bb3fa 100644 --- a/js/globals.ts +++ b/js/globals.ts @@ -19,6 +19,7 @@ import * as textEncoding from "./text_encoding"; import * as timers from "./timers"; import * as url from "./url"; import * as urlSearchParams from "./url_search_params"; +import * as workers from "./workers"; // These imports are not exposed and therefore are fine to just import the // symbols required. @@ -86,3 +87,8 @@ window.TextEncoder = textEncoding.TextEncoder; export type TextEncoder = textEncoding.TextEncoder; window.TextDecoder = textEncoding.TextDecoder; export type TextDecoder = textEncoding.TextDecoder; + +window.workerMain = workers.workerMain; +// TODO These shouldn't be available in main isolate. +window.postMessage = workers.postMessage; +window.close = workers.workerClose; diff --git a/js/workers.ts b/js/workers.ts new file mode 100644 index 0000000000..f7aa857fc4 --- /dev/null +++ b/js/workers.ts @@ -0,0 +1,75 @@ +// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. +import * as dispatch from "./dispatch"; +import { libdeno } from "./libdeno"; +import * as msg from "gen/msg_generated"; +import * as flatbuffers from "./flatbuffers"; +import { assert, log } from "./util"; +import { globalEval } from "./global_eval"; + +export async function postMessage(data: Uint8Array): Promise { + const builder = flatbuffers.createBuilder(); + msg.WorkerPostMessage.startWorkerPostMessage(builder); + const inner = msg.WorkerPostMessage.endWorkerPostMessage(builder); + const baseRes = await dispatch.sendAsync( + builder, + msg.Any.WorkerPostMessage, + inner, + data + ); + assert(baseRes != null); +} + +export async function getMessage(): Promise { + log("getMessage"); + const builder = flatbuffers.createBuilder(); + msg.WorkerGetMessage.startWorkerGetMessage(builder); + const inner = msg.WorkerGetMessage.endWorkerGetMessage(builder); + const baseRes = await dispatch.sendAsync( + builder, + msg.Any.WorkerGetMessage, + inner + ); + assert(baseRes != null); + assert( + msg.Any.WorkerGetMessageRes === baseRes!.innerType(), + `base.innerType() unexpectedly is ${baseRes!.innerType()}` + ); + const res = new msg.WorkerGetMessageRes(); + assert(baseRes!.inner(res) != null); + + const dataArray = res.dataArray(); + if (dataArray == null) { + return null; + } else { + return new Uint8Array(dataArray!); + } +} + +let isClosing = false; + +export function workerClose(): void { + isClosing = true; +} + +export async function workerMain() { + log("workerMain"); + libdeno.recv(dispatch.handleAsyncMsgFromRust); + + // TODO avoid using globalEval to get Window. But circular imports if getting + // it from globals.ts + const window = globalEval("this"); + + while (!isClosing) { + const data = await getMessage(); + if (data == null) { + log("workerMain got null message. quitting."); + break; + } + if (window["onmessage"]) { + const event = { data }; + window.onmessage(event); + } else { + break; + } + } +} diff --git a/src/flags.rs b/src/flags.rs index befb15ab8e..5e6855a3de 100644 --- a/src/flags.rs +++ b/src/flags.rs @@ -15,7 +15,7 @@ macro_rules! svec { } #[cfg_attr(feature = "cargo-clippy", allow(stutter))] -#[derive(Debug, PartialEq, Default)] +#[derive(Clone, Debug, PartialEq, Default)] pub struct DenoFlags { pub help: bool, pub log_debug: bool, diff --git a/src/isolate.rs b/src/isolate.rs index a2e5ae2754..c4174de3f9 100644 --- a/src/isolate.rs +++ b/src/isolate.rs @@ -12,6 +12,7 @@ use js_errors::JSError; use libdeno; use permissions::DenoPermissions; +use futures::sync::mpsc as async_mpsc; use futures::Future; use libc::c_char; use libc::c_void; @@ -23,6 +24,7 @@ use std::ffi::CString; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::mpsc; use std::sync::Arc; +use std::sync::Mutex; use std::time::Duration; use std::time::Instant; use tokio; @@ -53,6 +55,10 @@ pub struct Isolate { pub state: Arc, } +pub type WorkerSender = async_mpsc::Sender; +pub type WorkerReceiver = async_mpsc::Receiver; +pub type WorkerChannels = (WorkerSender, WorkerReceiver); + // Isolate cannot be passed between threads but IsolateState can. // IsolateState satisfies Send and Sync. // So any state that needs to be accessed outside the main V8 thread should be @@ -64,20 +70,35 @@ pub struct IsolateState { pub permissions: DenoPermissions, pub flags: flags::DenoFlags, pub metrics: Metrics, + pub worker_channels: Option>, } impl IsolateState { - pub fn new(flags: flags::DenoFlags, argv_rest: Vec) -> Self { + pub fn new( + flags: flags::DenoFlags, + argv_rest: Vec, + worker_channels: Option, + ) -> Self { let custom_root = env::var("DENO_DIR").map(|s| s.into()).ok(); + Self { dir: deno_dir::DenoDir::new(flags.reload, custom_root).unwrap(), argv: argv_rest, permissions: DenoPermissions::new(&flags), flags, metrics: Metrics::default(), + worker_channels: worker_channels.map(|wc| Mutex::new(wc)), } } + #[cfg(test)] + pub fn mock() -> Arc { + let argv = vec![String::from("./deno"), String::from("hello.js")]; + // For debugging: argv.push_back(String::from("-D")); + let (flags, rest_argv, _) = flags::set_flags(argv).unwrap(); + Arc::new(IsolateState::new(flags, rest_argv, None)) + } + #[inline] pub fn check_write(&self, filename: &str) -> DenoResult<()> { self.permissions.check_write(filename) @@ -451,10 +472,7 @@ mod tests { #[test] fn test_dispatch_sync() { - let argv = vec![String::from("./deno"), String::from("hello.js")]; - let (flags, rest_argv, _) = flags::set_flags(argv).unwrap(); - - let state = Arc::new(IsolateState::new(flags, rest_argv)); + let state = IsolateState::mock(); let snapshot = libdeno::deno_buf::empty(); let isolate = Isolate::new(snapshot, state, dispatch_sync); tokio_util::init(|| { @@ -493,9 +511,7 @@ mod tests { #[test] fn test_metrics_sync() { - let argv = vec![String::from("./deno"), String::from("hello.js")]; - let (flags, rest_argv, _) = flags::set_flags(argv).unwrap(); - let state = Arc::new(IsolateState::new(flags, rest_argv)); + let state = IsolateState::mock(); let snapshot = libdeno::deno_buf::empty(); let isolate = Isolate::new(snapshot, state, metrics_dispatch_sync); tokio_util::init(|| { @@ -529,9 +545,7 @@ mod tests { #[test] fn test_metrics_async() { - let argv = vec![String::from("./deno"), String::from("hello.js")]; - let (flags, rest_argv, _) = flags::set_flags(argv).unwrap(); - let state = Arc::new(IsolateState::new(flags, rest_argv)); + let state = IsolateState::mock(); let snapshot = libdeno::deno_buf::empty(); let isolate = Isolate::new(snapshot, state, metrics_dispatch_async); tokio_util::init(|| { @@ -619,7 +633,7 @@ mod tests { let argv = vec![String::from("./deno"), String::from(filename)]; let (flags, rest_argv, _) = flags::set_flags(argv).unwrap(); - let state = Arc::new(IsolateState::new(flags, rest_argv)); + let state = Arc::new(IsolateState::new(flags, rest_argv, None)); let snapshot = libdeno::deno_buf::empty(); let isolate = Isolate::new(snapshot, state, dispatch_sync); tokio_util::init(|| { diff --git a/src/main.rs b/src/main.rs index 364a9cf7ed..75cc61b582 100644 --- a/src/main.rs +++ b/src/main.rs @@ -47,6 +47,7 @@ pub mod snapshot; mod tokio_util; mod tokio_write; pub mod version; +mod workers; #[cfg(unix)] mod eager_unix; @@ -96,7 +97,7 @@ fn main() { log::LevelFilter::Warn }); - let state = Arc::new(isolate::IsolateState::new(flags, rest_argv)); + let state = Arc::new(isolate::IsolateState::new(flags, rest_argv, None)); let snapshot = snapshot::deno_snapshot(); let isolate = isolate::Isolate::new(snapshot, state, ops::dispatch); tokio_util::init(|| { diff --git a/src/msg.fbs b/src/msg.fbs index 989fafd0b5..a9afb195f0 100644 --- a/src/msg.fbs +++ b/src/msg.fbs @@ -1,6 +1,9 @@ union Any { Start, StartRes, + WorkerGetMessage, + WorkerGetMessageRes, + WorkerPostMessage, CodeFetch, CodeFetchRes, CodeCache, @@ -149,6 +152,18 @@ table StartRes { v8_version: string; } +table WorkerGetMessage { + unused: int8; +} + +table WorkerGetMessageRes { + data: [ubyte]; +} + +table WorkerPostMessage { + // data passed thru the zero-copy data parameter. +} + table CodeFetch { specifier: string; referrer: string; diff --git a/src/ops.rs b/src/ops.rs index d678e97273..cf25f29e06 100644 --- a/src/ops.rs +++ b/src/ops.rs @@ -16,7 +16,10 @@ use version; use flatbuffers::FlatBufferBuilder; use futures; +use futures::Async; use futures::Poll; +use futures::Sink; +use futures::Stream; use hyper; use hyper::rt::Future; use remove_dir_all::remove_dir_all; @@ -34,6 +37,7 @@ use std::path::Path; use std::path::PathBuf; use std::process::Command; use std::str::FromStr; +use std::sync::Arc; use std::time::UNIX_EPOCH; use std::time::{Duration, Instant}; use tokio; @@ -48,7 +52,7 @@ type OpResult = DenoResult; // TODO Ideally we wouldn't have to box the Op being returned. // The box is just to make it easier to get a prototype refactor working. type OpCreator = - fn(state: &IsolateState, base: &msg::Base, data: libdeno::deno_buf) + fn(state: &Arc, base: &msg::Base, data: libdeno::deno_buf) -> Box; #[inline] @@ -113,8 +117,10 @@ pub fn dispatch( msg::Any::Stat => op_stat, msg::Any::Symlink => op_symlink, msg::Any::Truncate => op_truncate, - msg::Any::WriteFile => op_write_file, + msg::Any::WorkerGetMessage => op_worker_get_message, + msg::Any::WorkerPostMessage => op_worker_post_message, msg::Any::Write => op_write, + msg::Any::WriteFile => op_write_file, _ => panic!(format!( "Unhandled message {}", msg::enum_name_any(inner_type) @@ -168,7 +174,7 @@ pub fn dispatch( } fn op_exit( - _config: &IsolateState, + _config: &Arc, base: &msg::Base, _data: libdeno::deno_buf, ) -> Box { @@ -177,7 +183,7 @@ fn op_exit( } fn op_start( - state: &IsolateState, + state: &Arc, base: &msg::Base, data: libdeno::deno_buf, ) -> Box { @@ -238,19 +244,19 @@ fn serialize_response( } #[inline] -fn ok_future(buf: Buf) -> Box { +pub fn ok_future(buf: Buf) -> Box { Box::new(futures::future::ok(buf)) } // Shout out to Earl Sweatshirt. #[inline] -fn odd_future(err: DenoError) -> Box { +pub fn odd_future(err: DenoError) -> Box { Box::new(futures::future::err(err)) } // https://github.com/denoland/deno/blob/golang/os.go#L100-L154 fn op_code_fetch( - state: &IsolateState, + state: &Arc, base: &msg::Base, data: libdeno::deno_buf, ) -> Box { @@ -293,7 +299,7 @@ fn op_code_fetch( // https://github.com/denoland/deno/blob/golang/os.go#L156-L169 fn op_code_cache( - state: &IsolateState, + state: &Arc, base: &msg::Base, data: libdeno::deno_buf, ) -> Box { @@ -312,7 +318,7 @@ fn op_code_cache( } fn op_chdir( - _state: &IsolateState, + _state: &Arc, base: &msg::Base, data: libdeno::deno_buf, ) -> Box { @@ -344,7 +350,7 @@ fn op_set_timeout( } fn op_set_env( - state: &IsolateState, + state: &Arc, base: &msg::Base, data: libdeno::deno_buf, ) -> Box { @@ -360,7 +366,7 @@ fn op_set_env( } fn op_env( - state: &IsolateState, + state: &Arc, base: &msg::Base, data: libdeno::deno_buf, ) -> Box { @@ -392,7 +398,7 @@ fn op_env( } fn op_fetch( - state: &IsolateState, + state: &Arc, base: &msg::Base, data: libdeno::deno_buf, ) -> Box { @@ -479,7 +485,7 @@ where } fn op_make_temp_dir( - state: &IsolateState, + state: &Arc, base: &msg::Base, data: libdeno::deno_buf, ) -> Box { @@ -528,7 +534,7 @@ fn op_make_temp_dir( } fn op_mkdir( - state: &IsolateState, + state: &Arc, base: &msg::Base, data: libdeno::deno_buf, ) -> Box { @@ -548,7 +554,7 @@ fn op_mkdir( } fn op_chmod( - state: &IsolateState, + state: &Arc, base: &msg::Base, data: libdeno::deno_buf, ) -> Box { @@ -581,7 +587,7 @@ fn op_chmod( } fn op_open( - state: &IsolateState, + state: &Arc, base: &msg::Base, data: libdeno::deno_buf, ) -> Box { @@ -657,7 +663,7 @@ fn op_open( } fn op_close( - _state: &IsolateState, + _state: &Arc, base: &msg::Base, data: libdeno::deno_buf, ) -> Box { @@ -674,7 +680,7 @@ fn op_close( } fn op_shutdown( - _state: &IsolateState, + _state: &Arc, base: &msg::Base, data: libdeno::deno_buf, ) -> Box { @@ -700,7 +706,7 @@ fn op_shutdown( } fn op_read( - _state: &IsolateState, + _state: &Arc, base: &msg::Base, data: libdeno::deno_buf, ) -> Box { @@ -738,7 +744,7 @@ fn op_read( } fn op_write( - _state: &IsolateState, + _state: &Arc, base: &msg::Base, data: libdeno::deno_buf, ) -> Box { @@ -775,7 +781,7 @@ fn op_write( } fn op_remove( - state: &IsolateState, + state: &Arc, base: &msg::Base, data: libdeno::deno_buf, ) -> Box { @@ -805,7 +811,7 @@ fn op_remove( // Prototype https://github.com/denoland/deno/blob/golang/os.go#L171-L184 fn op_read_file( - _config: &IsolateState, + _state: &Arc, base: &msg::Base, data: libdeno::deno_buf, ) -> Box { @@ -839,7 +845,7 @@ fn op_read_file( } fn op_copy_file( - state: &IsolateState, + state: &Arc, base: &msg::Base, data: libdeno::deno_buf, ) -> Box { @@ -891,7 +897,7 @@ fn get_mode(_perm: &fs::Permissions) -> u32 { } fn op_cwd( - _state: &IsolateState, + _state: &Arc, base: &msg::Base, data: libdeno::deno_buf, ) -> Box { @@ -917,7 +923,7 @@ fn op_cwd( } fn op_stat( - _config: &IsolateState, + _state: &Arc, base: &msg::Base, data: libdeno::deno_buf, ) -> Box { @@ -964,7 +970,7 @@ fn op_stat( } fn op_read_dir( - _state: &IsolateState, + _state: &Arc, base: &msg::Base, data: libdeno::deno_buf, ) -> Box { @@ -1022,7 +1028,7 @@ fn op_read_dir( } fn op_write_file( - state: &IsolateState, + state: &Arc, base: &msg::Base, data: libdeno::deno_buf, ) -> Box { @@ -1042,7 +1048,7 @@ fn op_write_file( } fn op_rename( - state: &IsolateState, + state: &Arc, base: &msg::Base, data: libdeno::deno_buf, ) -> Box { @@ -1062,7 +1068,7 @@ fn op_rename( } fn op_symlink( - state: &IsolateState, + state: &Arc, base: &msg::Base, data: libdeno::deno_buf, ) -> Box { @@ -1091,7 +1097,7 @@ fn op_symlink( } fn op_read_link( - _state: &IsolateState, + _state: &Arc, base: &msg::Base, data: libdeno::deno_buf, ) -> Box { @@ -1124,7 +1130,7 @@ fn op_read_link( } fn op_repl_start( - state: &IsolateState, + state: &Arc, base: &msg::Base, data: libdeno::deno_buf, ) -> Box { @@ -1155,7 +1161,7 @@ fn op_repl_start( } fn op_repl_readline( - _state: &IsolateState, + _state: &Arc, base: &msg::Base, data: libdeno::deno_buf, ) -> Box { @@ -1193,7 +1199,7 @@ fn op_repl_readline( } fn op_truncate( - state: &IsolateState, + state: &Arc, base: &msg::Base, data: libdeno::deno_buf, ) -> Box { @@ -1216,7 +1222,7 @@ fn op_truncate( } fn op_listen( - state: &IsolateState, + state: &Arc, base: &msg::Base, data: libdeno::deno_buf, ) -> Box { @@ -1282,7 +1288,7 @@ fn new_conn(cmd_id: u32, tcp_stream: TcpStream) -> OpResult { } fn op_accept( - state: &IsolateState, + state: &Arc, base: &msg::Base, data: libdeno::deno_buf, ) -> Box { @@ -1308,7 +1314,7 @@ fn op_accept( } fn op_dial( - state: &IsolateState, + state: &Arc, base: &msg::Base, data: libdeno::deno_buf, ) -> Box { @@ -1332,7 +1338,7 @@ fn op_dial( } fn op_metrics( - state: &IsolateState, + state: &Arc, base: &msg::Base, data: libdeno::deno_buf, ) -> Box { @@ -1356,7 +1362,7 @@ fn op_metrics( } fn op_resources( - _state: &IsolateState, + _state: &Arc, base: &msg::Base, data: libdeno::deno_buf, ) -> Box { @@ -1408,7 +1414,7 @@ fn subprocess_stdio_map(v: msg::ProcessStdio) -> std::process::Stdio { } fn op_run( - state: &IsolateState, + state: &Arc, base: &msg::Base, data: libdeno::deno_buf, ) -> Box { @@ -1476,7 +1482,7 @@ fn op_run( } fn op_run_status( - state: &IsolateState, + state: &Arc, base: &msg::Base, data: libdeno::deno_buf, ) -> Box { @@ -1530,3 +1536,90 @@ fn op_run_status( }); Box::new(future) } + +struct GetMessageFuture { + pub state: Arc, +} + +impl Future for GetMessageFuture { + type Item = Option; + type Error = (); + + fn poll(&mut self) -> Result, Self::Error> { + assert!(self.state.worker_channels.is_some()); + match self.state.worker_channels { + None => panic!("expected worker_channels"), + Some(ref wc) => { + let mut wc = wc.lock().unwrap(); + wc.1.poll() + } + } + } +} + +fn op_worker_get_message( + state: &Arc, + base: &msg::Base, + data: libdeno::deno_buf, +) -> Box { + assert_eq!(data.len(), 0); + let cmd_id = base.cmd_id(); + + let op = GetMessageFuture { + state: state.clone(), + }; + let op = op.map_err(move |_| -> DenoError { unimplemented!() }); + let op = op.and_then(move |maybe_buf| -> DenoResult { + debug!("op_worker_get_message"); + let builder = &mut FlatBufferBuilder::new(); + + let data = maybe_buf.as_ref().map(|buf| builder.create_vector(buf)); + let inner = msg::WorkerGetMessageRes::create( + builder, + &msg::WorkerGetMessageResArgs { data }, + ); + Ok(serialize_response( + cmd_id, + builder, + msg::BaseArgs { + inner: Some(inner.as_union_value()), + inner_type: msg::Any::WorkerGetMessageRes, + ..Default::default() + }, + )) + }); + Box::new(op) +} + +fn op_worker_post_message( + state: &Arc, + base: &msg::Base, + data: libdeno::deno_buf, +) -> Box { + let cmd_id = base.cmd_id(); + + let d = Vec::from(data.as_ref()).into_boxed_slice(); + + assert!(state.worker_channels.is_some()); + let tx = match state.worker_channels { + None => panic!("expected worker_channels"), + Some(ref wc) => { + let mut wc = wc.lock().unwrap(); + wc.0.clone() + } + }; + let op = tx.send(d); + let op = op.map_err(|e| errors::new(ErrorKind::Other, e.to_string())); + let op = op.and_then(move |_| -> DenoResult { + let builder = &mut FlatBufferBuilder::new(); + + Ok(serialize_response( + cmd_id, + builder, + msg::BaseArgs { + ..Default::default() + }, + )) + }); + Box::new(op) +} diff --git a/src/resources.rs b/src/resources.rs index f1497f2143..69173fe856 100644 --- a/src/resources.rs +++ b/src/resources.rs @@ -10,10 +10,12 @@ #[cfg(unix)] use eager_unix as eager; +use errors; use errors::bad_resource; use errors::DenoError; use errors::DenoResult; use http_body::HttpBody; +use isolate::WorkerChannels; use repl::Repl; use tokio_util; use tokio_write; @@ -22,7 +24,10 @@ use futures; use futures::future::{Either, FutureResult}; use futures::Future; use futures::Poll; +use futures::Sink; +use futures::Stream; use hyper; +use isolate::Buf; use std; use std::collections::HashMap; use std::io::{Error, Read, Write}; @@ -96,6 +101,14 @@ enum Repr { ChildStdin(tokio_process::ChildStdin), ChildStdout(tokio_process::ChildStdout), ChildStderr(tokio_process::ChildStderr), + Worker(WorkerChannels), +} + +/// If the given rid is open, this returns the type of resource, E.G. "worker". +/// If the rid is closed or was never open, it returns None. +pub fn get_type(rid: ResourceId) -> Option { + let table = RESOURCE_TABLE.lock().unwrap(); + table.get(&rid).map(inspect_repr) } pub fn table_entries() -> Vec<(u32, String)> { @@ -131,6 +144,7 @@ fn inspect_repr(repr: &Repr) -> String { Repr::ChildStdin(_) => "childStdin", Repr::ChildStdout(_) => "childStdout", Repr::ChildStderr(_) => "childStderr", + Repr::Worker(_) => "worker", }; String::from(h_repr) @@ -138,7 +152,7 @@ fn inspect_repr(repr: &Repr) -> String { // Abstract async file interface. // Ideally in unix, if Resource represents an OS rid, it will be the same. -#[derive(Debug)] +#[derive(Clone, Debug)] pub struct Resource { pub rid: ResourceId, } @@ -284,6 +298,54 @@ pub fn add_repl(repl: Repl) -> Resource { Resource { rid } } +pub fn add_worker(wc: WorkerChannels) -> Resource { + let rid = new_rid(); + let mut tg = RESOURCE_TABLE.lock().unwrap(); + let r = tg.insert(rid, Repr::Worker(wc)); + assert!(r.is_none()); + Resource { rid } +} + +pub fn worker_post_message( + rid: ResourceId, + buf: Buf, +) -> futures::sink::Send> { + let mut table = RESOURCE_TABLE.lock().unwrap(); + let maybe_repr = table.get_mut(&rid); + match maybe_repr { + Some(Repr::Worker(ref mut wc)) => { + // unwrap here is incorrect, but doing it anyway + wc.0.clone().send(buf) + } + _ => panic!("bad resource"), // futures::future::err(bad_resource()).into(), + } +} + +pub struct WorkerReceiver { + rid: ResourceId, +} + +// Invert the dumbness that tokio_process causes by making Child itself a future. +impl Future for WorkerReceiver { + type Item = Option; + type Error = DenoError; + + fn poll(&mut self) -> Poll, DenoError> { + let mut table = RESOURCE_TABLE.lock().unwrap(); + let maybe_repr = table.get_mut(&self.rid); + match maybe_repr { + Some(Repr::Worker(ref mut wc)) => wc.1.poll().map_err(|()| { + errors::new(errors::ErrorKind::Other, "recv msg error".to_string()) + }), + _ => Err(bad_resource()), + } + } +} + +pub fn worker_recv_message(rid: ResourceId) -> WorkerReceiver { + WorkerReceiver { rid } +} + #[cfg_attr(feature = "cargo-clippy", allow(stutter))] pub struct ChildResources { pub child_rid: ResourceId, diff --git a/src/workers.rs b/src/workers.rs new file mode 100644 index 0000000000..319f4018d8 --- /dev/null +++ b/src/workers.rs @@ -0,0 +1,148 @@ +// Copyright 2018 the Deno authors. All rights reserved. MIT license. + +#![allow(dead_code)] + +use isolate::Buf; +use isolate::Isolate; +use isolate::IsolateState; +use isolate::WorkerChannels; +use js_errors::JSError; +use ops; +use resources; +use snapshot; +use tokio_util; + +use futures::sync::mpsc; +use futures::sync::oneshot; +use futures::Future; +use std::sync::Arc; +use std::thread; + +/// Rust interface for WebWorkers. +pub struct Worker { + isolate: Isolate, +} + +impl Worker { + pub fn new(parent_state: &Arc) -> (Self, WorkerChannels) { + let (worker_in_tx, worker_in_rx) = mpsc::channel::(1); + let (worker_out_tx, worker_out_rx) = mpsc::channel::(1); + + let internal_channels = (worker_out_tx, worker_in_rx); + let external_channels = (worker_in_tx, worker_out_rx); + + let state = Arc::new(IsolateState::new( + parent_state.flags.clone(), + parent_state.argv.clone(), + Some(internal_channels), + )); + + let snapshot = snapshot::deno_snapshot(); + let isolate = Isolate::new(snapshot, state, ops::dispatch); + + let worker = Worker { isolate }; + (worker, external_channels) + } + + pub fn execute(&self, js_source: &str) -> Result<(), JSError> { + self.isolate.execute(js_source) + } + + pub fn event_loop(&self) -> Result<(), JSError> { + self.isolate.event_loop() + } +} + +fn spawn(state: Arc, js_source: String) -> resources::Resource { + // TODO This function should return a Future, so that the caller can retrieve + // the JSError if one is thrown. Currently it just prints to stderr and calls + // exit(1). + // let (js_error_tx, js_error_rx) = oneshot::channel::(); + let (p, c) = oneshot::channel::(); + let builder = thread::Builder::new().name("worker".to_string()); + let _tid = builder + .spawn(move || { + let (worker, external_channels) = Worker::new(&state); + + let mut resource = resources::add_worker(external_channels); + p.send(resource.clone()).unwrap(); + + tokio_util::init(|| { + (|| -> Result<(), JSError> { + worker.execute("workerMain()")?; + worker.execute(&js_source)?; + worker.event_loop()?; + Ok(()) + })().or_else(|err: JSError| -> Result<(), JSError> { + eprintln!("{}", err.to_string()); + std::process::exit(1) + }).unwrap(); + }); + + resource.close(); + }).unwrap(); + + let resource = c.wait().unwrap(); + + resource +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_spawn() { + let resource = spawn( + IsolateState::mock(), + r#" + onmessage = function(e) { + let s = new TextDecoder().decode(e.data);; + console.log("msg from main script", s); + if (s == "exit") { + close(); + return; + } else { + console.assert(s === "hi"); + } + postMessage(new Uint8Array([1, 2, 3])); + console.log("after postMessage"); + } + "#.into(), + ); + let msg = String::from("hi").into_boxed_str().into_boxed_bytes(); + + let r = resources::worker_post_message(resource.rid, msg).wait(); + assert!(r.is_ok()); + + let maybe_msg = + resources::worker_recv_message(resource.rid).wait().unwrap(); + assert!(maybe_msg.is_some()); + assert_eq!(*maybe_msg.unwrap(), [1, 2, 3]); + + let msg = String::from("exit").into_boxed_str().into_boxed_bytes(); + let r = resources::worker_post_message(resource.rid, msg).wait(); + assert!(r.is_ok()); + } + + #[test] + fn removed_from_resource_table_on_close() { + let resource = + spawn(IsolateState::mock(), "onmessage = () => close();".into()); + + assert_eq!( + resources::get_type(resource.rid), + Some("worker".to_string()) + ); + + let msg = String::from("hi").into_boxed_str().into_boxed_bytes(); + let r = resources::worker_post_message(resource.rid, msg).wait(); + assert!(r.is_ok()); + println!("rid {:?}", resource.rid); + + // TODO Need a way to get a future for when a resource closes. + // For now, just sleep for a bit. + thread::sleep(std::time::Duration::from_millis(100)); + assert_eq!(resources::get_type(resource.rid), None); + } +}