1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2024-12-01 16:51:13 -05:00

Minimal Worker support (#1476)

This adds the ability to spawn additional Isolates from Rust and send
and receive messages from them. This is preliminary work to support
running the typescript compiler in a separate isolate and thus support
native ES modules. Ref #975.
This commit is contained in:
Ryan Dahl 2019-01-08 14:44:06 -05:00 committed by GitHub
parent 9ff6bca863
commit 6f79ad721a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 470 additions and 55 deletions

View file

@ -99,6 +99,7 @@ ts_sources = [
"js/url.ts", "js/url.ts",
"js/url_search_params.ts", "js/url_search_params.ts",
"js/util.ts", "js/util.ts",
"js/workers.ts",
"js/write_file.ts", "js/write_file.ts",
"tsconfig.json", "tsconfig.json",

View file

@ -19,6 +19,7 @@ import * as textEncoding from "./text_encoding";
import * as timers from "./timers"; import * as timers from "./timers";
import * as url from "./url"; import * as url from "./url";
import * as urlSearchParams from "./url_search_params"; import * as urlSearchParams from "./url_search_params";
import * as workers from "./workers";
// These imports are not exposed and therefore are fine to just import the // These imports are not exposed and therefore are fine to just import the
// symbols required. // symbols required.
@ -86,3 +87,8 @@ window.TextEncoder = textEncoding.TextEncoder;
export type TextEncoder = textEncoding.TextEncoder; export type TextEncoder = textEncoding.TextEncoder;
window.TextDecoder = textEncoding.TextDecoder; window.TextDecoder = textEncoding.TextDecoder;
export type TextDecoder = textEncoding.TextDecoder; export type TextDecoder = textEncoding.TextDecoder;
window.workerMain = workers.workerMain;
// TODO These shouldn't be available in main isolate.
window.postMessage = workers.postMessage;
window.close = workers.workerClose;

75
js/workers.ts Normal file
View file

@ -0,0 +1,75 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
import * as dispatch from "./dispatch";
import { libdeno } from "./libdeno";
import * as msg from "gen/msg_generated";
import * as flatbuffers from "./flatbuffers";
import { assert, log } from "./util";
import { globalEval } from "./global_eval";
export async function postMessage(data: Uint8Array): Promise<void> {
const builder = flatbuffers.createBuilder();
msg.WorkerPostMessage.startWorkerPostMessage(builder);
const inner = msg.WorkerPostMessage.endWorkerPostMessage(builder);
const baseRes = await dispatch.sendAsync(
builder,
msg.Any.WorkerPostMessage,
inner,
data
);
assert(baseRes != null);
}
export async function getMessage(): Promise<null | Uint8Array> {
log("getMessage");
const builder = flatbuffers.createBuilder();
msg.WorkerGetMessage.startWorkerGetMessage(builder);
const inner = msg.WorkerGetMessage.endWorkerGetMessage(builder);
const baseRes = await dispatch.sendAsync(
builder,
msg.Any.WorkerGetMessage,
inner
);
assert(baseRes != null);
assert(
msg.Any.WorkerGetMessageRes === baseRes!.innerType(),
`base.innerType() unexpectedly is ${baseRes!.innerType()}`
);
const res = new msg.WorkerGetMessageRes();
assert(baseRes!.inner(res) != null);
const dataArray = res.dataArray();
if (dataArray == null) {
return null;
} else {
return new Uint8Array(dataArray!);
}
}
let isClosing = false;
export function workerClose(): void {
isClosing = true;
}
export async function workerMain() {
log("workerMain");
libdeno.recv(dispatch.handleAsyncMsgFromRust);
// TODO avoid using globalEval to get Window. But circular imports if getting
// it from globals.ts
const window = globalEval("this");
while (!isClosing) {
const data = await getMessage();
if (data == null) {
log("workerMain got null message. quitting.");
break;
}
if (window["onmessage"]) {
const event = { data };
window.onmessage(event);
} else {
break;
}
}
}

View file

@ -15,7 +15,7 @@ macro_rules! svec {
} }
#[cfg_attr(feature = "cargo-clippy", allow(stutter))] #[cfg_attr(feature = "cargo-clippy", allow(stutter))]
#[derive(Debug, PartialEq, Default)] #[derive(Clone, Debug, PartialEq, Default)]
pub struct DenoFlags { pub struct DenoFlags {
pub help: bool, pub help: bool,
pub log_debug: bool, pub log_debug: bool,

View file

@ -12,6 +12,7 @@ use js_errors::JSError;
use libdeno; use libdeno;
use permissions::DenoPermissions; use permissions::DenoPermissions;
use futures::sync::mpsc as async_mpsc;
use futures::Future; use futures::Future;
use libc::c_char; use libc::c_char;
use libc::c_void; use libc::c_void;
@ -23,6 +24,7 @@ use std::ffi::CString;
use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc; use std::sync::mpsc;
use std::sync::Arc; use std::sync::Arc;
use std::sync::Mutex;
use std::time::Duration; use std::time::Duration;
use std::time::Instant; use std::time::Instant;
use tokio; use tokio;
@ -53,6 +55,10 @@ pub struct Isolate {
pub state: Arc<IsolateState>, pub state: Arc<IsolateState>,
} }
pub type WorkerSender = async_mpsc::Sender<Buf>;
pub type WorkerReceiver = async_mpsc::Receiver<Buf>;
pub type WorkerChannels = (WorkerSender, WorkerReceiver);
// Isolate cannot be passed between threads but IsolateState can. // Isolate cannot be passed between threads but IsolateState can.
// IsolateState satisfies Send and Sync. // IsolateState satisfies Send and Sync.
// So any state that needs to be accessed outside the main V8 thread should be // So any state that needs to be accessed outside the main V8 thread should be
@ -64,20 +70,35 @@ pub struct IsolateState {
pub permissions: DenoPermissions, pub permissions: DenoPermissions,
pub flags: flags::DenoFlags, pub flags: flags::DenoFlags,
pub metrics: Metrics, pub metrics: Metrics,
pub worker_channels: Option<Mutex<WorkerChannels>>,
} }
impl IsolateState { impl IsolateState {
pub fn new(flags: flags::DenoFlags, argv_rest: Vec<String>) -> Self { pub fn new(
flags: flags::DenoFlags,
argv_rest: Vec<String>,
worker_channels: Option<WorkerChannels>,
) -> Self {
let custom_root = env::var("DENO_DIR").map(|s| s.into()).ok(); let custom_root = env::var("DENO_DIR").map(|s| s.into()).ok();
Self { Self {
dir: deno_dir::DenoDir::new(flags.reload, custom_root).unwrap(), dir: deno_dir::DenoDir::new(flags.reload, custom_root).unwrap(),
argv: argv_rest, argv: argv_rest,
permissions: DenoPermissions::new(&flags), permissions: DenoPermissions::new(&flags),
flags, flags,
metrics: Metrics::default(), metrics: Metrics::default(),
worker_channels: worker_channels.map(|wc| Mutex::new(wc)),
} }
} }
#[cfg(test)]
pub fn mock() -> Arc<IsolateState> {
let argv = vec![String::from("./deno"), String::from("hello.js")];
// For debugging: argv.push_back(String::from("-D"));
let (flags, rest_argv, _) = flags::set_flags(argv).unwrap();
Arc::new(IsolateState::new(flags, rest_argv, None))
}
#[inline] #[inline]
pub fn check_write(&self, filename: &str) -> DenoResult<()> { pub fn check_write(&self, filename: &str) -> DenoResult<()> {
self.permissions.check_write(filename) self.permissions.check_write(filename)
@ -451,10 +472,7 @@ mod tests {
#[test] #[test]
fn test_dispatch_sync() { fn test_dispatch_sync() {
let argv = vec![String::from("./deno"), String::from("hello.js")]; let state = IsolateState::mock();
let (flags, rest_argv, _) = flags::set_flags(argv).unwrap();
let state = Arc::new(IsolateState::new(flags, rest_argv));
let snapshot = libdeno::deno_buf::empty(); let snapshot = libdeno::deno_buf::empty();
let isolate = Isolate::new(snapshot, state, dispatch_sync); let isolate = Isolate::new(snapshot, state, dispatch_sync);
tokio_util::init(|| { tokio_util::init(|| {
@ -493,9 +511,7 @@ mod tests {
#[test] #[test]
fn test_metrics_sync() { fn test_metrics_sync() {
let argv = vec![String::from("./deno"), String::from("hello.js")]; let state = IsolateState::mock();
let (flags, rest_argv, _) = flags::set_flags(argv).unwrap();
let state = Arc::new(IsolateState::new(flags, rest_argv));
let snapshot = libdeno::deno_buf::empty(); let snapshot = libdeno::deno_buf::empty();
let isolate = Isolate::new(snapshot, state, metrics_dispatch_sync); let isolate = Isolate::new(snapshot, state, metrics_dispatch_sync);
tokio_util::init(|| { tokio_util::init(|| {
@ -529,9 +545,7 @@ mod tests {
#[test] #[test]
fn test_metrics_async() { fn test_metrics_async() {
let argv = vec![String::from("./deno"), String::from("hello.js")]; let state = IsolateState::mock();
let (flags, rest_argv, _) = flags::set_flags(argv).unwrap();
let state = Arc::new(IsolateState::new(flags, rest_argv));
let snapshot = libdeno::deno_buf::empty(); let snapshot = libdeno::deno_buf::empty();
let isolate = Isolate::new(snapshot, state, metrics_dispatch_async); let isolate = Isolate::new(snapshot, state, metrics_dispatch_async);
tokio_util::init(|| { tokio_util::init(|| {
@ -619,7 +633,7 @@ mod tests {
let argv = vec![String::from("./deno"), String::from(filename)]; let argv = vec![String::from("./deno"), String::from(filename)];
let (flags, rest_argv, _) = flags::set_flags(argv).unwrap(); let (flags, rest_argv, _) = flags::set_flags(argv).unwrap();
let state = Arc::new(IsolateState::new(flags, rest_argv)); let state = Arc::new(IsolateState::new(flags, rest_argv, None));
let snapshot = libdeno::deno_buf::empty(); let snapshot = libdeno::deno_buf::empty();
let isolate = Isolate::new(snapshot, state, dispatch_sync); let isolate = Isolate::new(snapshot, state, dispatch_sync);
tokio_util::init(|| { tokio_util::init(|| {

View file

@ -47,6 +47,7 @@ pub mod snapshot;
mod tokio_util; mod tokio_util;
mod tokio_write; mod tokio_write;
pub mod version; pub mod version;
mod workers;
#[cfg(unix)] #[cfg(unix)]
mod eager_unix; mod eager_unix;
@ -96,7 +97,7 @@ fn main() {
log::LevelFilter::Warn log::LevelFilter::Warn
}); });
let state = Arc::new(isolate::IsolateState::new(flags, rest_argv)); let state = Arc::new(isolate::IsolateState::new(flags, rest_argv, None));
let snapshot = snapshot::deno_snapshot(); let snapshot = snapshot::deno_snapshot();
let isolate = isolate::Isolate::new(snapshot, state, ops::dispatch); let isolate = isolate::Isolate::new(snapshot, state, ops::dispatch);
tokio_util::init(|| { tokio_util::init(|| {

View file

@ -1,6 +1,9 @@
union Any { union Any {
Start, Start,
StartRes, StartRes,
WorkerGetMessage,
WorkerGetMessageRes,
WorkerPostMessage,
CodeFetch, CodeFetch,
CodeFetchRes, CodeFetchRes,
CodeCache, CodeCache,
@ -149,6 +152,18 @@ table StartRes {
v8_version: string; v8_version: string;
} }
table WorkerGetMessage {
unused: int8;
}
table WorkerGetMessageRes {
data: [ubyte];
}
table WorkerPostMessage {
// data passed thru the zero-copy data parameter.
}
table CodeFetch { table CodeFetch {
specifier: string; specifier: string;
referrer: string; referrer: string;

View file

@ -16,7 +16,10 @@ use version;
use flatbuffers::FlatBufferBuilder; use flatbuffers::FlatBufferBuilder;
use futures; use futures;
use futures::Async;
use futures::Poll; use futures::Poll;
use futures::Sink;
use futures::Stream;
use hyper; use hyper;
use hyper::rt::Future; use hyper::rt::Future;
use remove_dir_all::remove_dir_all; use remove_dir_all::remove_dir_all;
@ -34,6 +37,7 @@ use std::path::Path;
use std::path::PathBuf; use std::path::PathBuf;
use std::process::Command; use std::process::Command;
use std::str::FromStr; use std::str::FromStr;
use std::sync::Arc;
use std::time::UNIX_EPOCH; use std::time::UNIX_EPOCH;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use tokio; use tokio;
@ -48,7 +52,7 @@ type OpResult = DenoResult<Buf>;
// TODO Ideally we wouldn't have to box the Op being returned. // TODO Ideally we wouldn't have to box the Op being returned.
// The box is just to make it easier to get a prototype refactor working. // The box is just to make it easier to get a prototype refactor working.
type OpCreator = type OpCreator =
fn(state: &IsolateState, base: &msg::Base, data: libdeno::deno_buf) fn(state: &Arc<IsolateState>, base: &msg::Base, data: libdeno::deno_buf)
-> Box<Op>; -> Box<Op>;
#[inline] #[inline]
@ -113,8 +117,10 @@ pub fn dispatch(
msg::Any::Stat => op_stat, msg::Any::Stat => op_stat,
msg::Any::Symlink => op_symlink, msg::Any::Symlink => op_symlink,
msg::Any::Truncate => op_truncate, msg::Any::Truncate => op_truncate,
msg::Any::WriteFile => op_write_file, msg::Any::WorkerGetMessage => op_worker_get_message,
msg::Any::WorkerPostMessage => op_worker_post_message,
msg::Any::Write => op_write, msg::Any::Write => op_write,
msg::Any::WriteFile => op_write_file,
_ => panic!(format!( _ => panic!(format!(
"Unhandled message {}", "Unhandled message {}",
msg::enum_name_any(inner_type) msg::enum_name_any(inner_type)
@ -168,7 +174,7 @@ pub fn dispatch(
} }
fn op_exit( fn op_exit(
_config: &IsolateState, _config: &Arc<IsolateState>,
base: &msg::Base, base: &msg::Base,
_data: libdeno::deno_buf, _data: libdeno::deno_buf,
) -> Box<Op> { ) -> Box<Op> {
@ -177,7 +183,7 @@ fn op_exit(
} }
fn op_start( fn op_start(
state: &IsolateState, state: &Arc<IsolateState>,
base: &msg::Base, base: &msg::Base,
data: libdeno::deno_buf, data: libdeno::deno_buf,
) -> Box<Op> { ) -> Box<Op> {
@ -238,19 +244,19 @@ fn serialize_response(
} }
#[inline] #[inline]
fn ok_future(buf: Buf) -> Box<Op> { pub fn ok_future(buf: Buf) -> Box<Op> {
Box::new(futures::future::ok(buf)) Box::new(futures::future::ok(buf))
} }
// Shout out to Earl Sweatshirt. // Shout out to Earl Sweatshirt.
#[inline] #[inline]
fn odd_future(err: DenoError) -> Box<Op> { pub fn odd_future(err: DenoError) -> Box<Op> {
Box::new(futures::future::err(err)) Box::new(futures::future::err(err))
} }
// https://github.com/denoland/deno/blob/golang/os.go#L100-L154 // https://github.com/denoland/deno/blob/golang/os.go#L100-L154
fn op_code_fetch( fn op_code_fetch(
state: &IsolateState, state: &Arc<IsolateState>,
base: &msg::Base, base: &msg::Base,
data: libdeno::deno_buf, data: libdeno::deno_buf,
) -> Box<Op> { ) -> Box<Op> {
@ -293,7 +299,7 @@ fn op_code_fetch(
// https://github.com/denoland/deno/blob/golang/os.go#L156-L169 // https://github.com/denoland/deno/blob/golang/os.go#L156-L169
fn op_code_cache( fn op_code_cache(
state: &IsolateState, state: &Arc<IsolateState>,
base: &msg::Base, base: &msg::Base,
data: libdeno::deno_buf, data: libdeno::deno_buf,
) -> Box<Op> { ) -> Box<Op> {
@ -312,7 +318,7 @@ fn op_code_cache(
} }
fn op_chdir( fn op_chdir(
_state: &IsolateState, _state: &Arc<IsolateState>,
base: &msg::Base, base: &msg::Base,
data: libdeno::deno_buf, data: libdeno::deno_buf,
) -> Box<Op> { ) -> Box<Op> {
@ -344,7 +350,7 @@ fn op_set_timeout(
} }
fn op_set_env( fn op_set_env(
state: &IsolateState, state: &Arc<IsolateState>,
base: &msg::Base, base: &msg::Base,
data: libdeno::deno_buf, data: libdeno::deno_buf,
) -> Box<Op> { ) -> Box<Op> {
@ -360,7 +366,7 @@ fn op_set_env(
} }
fn op_env( fn op_env(
state: &IsolateState, state: &Arc<IsolateState>,
base: &msg::Base, base: &msg::Base,
data: libdeno::deno_buf, data: libdeno::deno_buf,
) -> Box<Op> { ) -> Box<Op> {
@ -392,7 +398,7 @@ fn op_env(
} }
fn op_fetch( fn op_fetch(
state: &IsolateState, state: &Arc<IsolateState>,
base: &msg::Base, base: &msg::Base,
data: libdeno::deno_buf, data: libdeno::deno_buf,
) -> Box<Op> { ) -> Box<Op> {
@ -479,7 +485,7 @@ where
} }
fn op_make_temp_dir( fn op_make_temp_dir(
state: &IsolateState, state: &Arc<IsolateState>,
base: &msg::Base, base: &msg::Base,
data: libdeno::deno_buf, data: libdeno::deno_buf,
) -> Box<Op> { ) -> Box<Op> {
@ -528,7 +534,7 @@ fn op_make_temp_dir(
} }
fn op_mkdir( fn op_mkdir(
state: &IsolateState, state: &Arc<IsolateState>,
base: &msg::Base, base: &msg::Base,
data: libdeno::deno_buf, data: libdeno::deno_buf,
) -> Box<Op> { ) -> Box<Op> {
@ -548,7 +554,7 @@ fn op_mkdir(
} }
fn op_chmod( fn op_chmod(
state: &IsolateState, state: &Arc<IsolateState>,
base: &msg::Base, base: &msg::Base,
data: libdeno::deno_buf, data: libdeno::deno_buf,
) -> Box<Op> { ) -> Box<Op> {
@ -581,7 +587,7 @@ fn op_chmod(
} }
fn op_open( fn op_open(
state: &IsolateState, state: &Arc<IsolateState>,
base: &msg::Base, base: &msg::Base,
data: libdeno::deno_buf, data: libdeno::deno_buf,
) -> Box<Op> { ) -> Box<Op> {
@ -657,7 +663,7 @@ fn op_open(
} }
fn op_close( fn op_close(
_state: &IsolateState, _state: &Arc<IsolateState>,
base: &msg::Base, base: &msg::Base,
data: libdeno::deno_buf, data: libdeno::deno_buf,
) -> Box<Op> { ) -> Box<Op> {
@ -674,7 +680,7 @@ fn op_close(
} }
fn op_shutdown( fn op_shutdown(
_state: &IsolateState, _state: &Arc<IsolateState>,
base: &msg::Base, base: &msg::Base,
data: libdeno::deno_buf, data: libdeno::deno_buf,
) -> Box<Op> { ) -> Box<Op> {
@ -700,7 +706,7 @@ fn op_shutdown(
} }
fn op_read( fn op_read(
_state: &IsolateState, _state: &Arc<IsolateState>,
base: &msg::Base, base: &msg::Base,
data: libdeno::deno_buf, data: libdeno::deno_buf,
) -> Box<Op> { ) -> Box<Op> {
@ -738,7 +744,7 @@ fn op_read(
} }
fn op_write( fn op_write(
_state: &IsolateState, _state: &Arc<IsolateState>,
base: &msg::Base, base: &msg::Base,
data: libdeno::deno_buf, data: libdeno::deno_buf,
) -> Box<Op> { ) -> Box<Op> {
@ -775,7 +781,7 @@ fn op_write(
} }
fn op_remove( fn op_remove(
state: &IsolateState, state: &Arc<IsolateState>,
base: &msg::Base, base: &msg::Base,
data: libdeno::deno_buf, data: libdeno::deno_buf,
) -> Box<Op> { ) -> Box<Op> {
@ -805,7 +811,7 @@ fn op_remove(
// Prototype https://github.com/denoland/deno/blob/golang/os.go#L171-L184 // Prototype https://github.com/denoland/deno/blob/golang/os.go#L171-L184
fn op_read_file( fn op_read_file(
_config: &IsolateState, _state: &Arc<IsolateState>,
base: &msg::Base, base: &msg::Base,
data: libdeno::deno_buf, data: libdeno::deno_buf,
) -> Box<Op> { ) -> Box<Op> {
@ -839,7 +845,7 @@ fn op_read_file(
} }
fn op_copy_file( fn op_copy_file(
state: &IsolateState, state: &Arc<IsolateState>,
base: &msg::Base, base: &msg::Base,
data: libdeno::deno_buf, data: libdeno::deno_buf,
) -> Box<Op> { ) -> Box<Op> {
@ -891,7 +897,7 @@ fn get_mode(_perm: &fs::Permissions) -> u32 {
} }
fn op_cwd( fn op_cwd(
_state: &IsolateState, _state: &Arc<IsolateState>,
base: &msg::Base, base: &msg::Base,
data: libdeno::deno_buf, data: libdeno::deno_buf,
) -> Box<Op> { ) -> Box<Op> {
@ -917,7 +923,7 @@ fn op_cwd(
} }
fn op_stat( fn op_stat(
_config: &IsolateState, _state: &Arc<IsolateState>,
base: &msg::Base, base: &msg::Base,
data: libdeno::deno_buf, data: libdeno::deno_buf,
) -> Box<Op> { ) -> Box<Op> {
@ -964,7 +970,7 @@ fn op_stat(
} }
fn op_read_dir( fn op_read_dir(
_state: &IsolateState, _state: &Arc<IsolateState>,
base: &msg::Base, base: &msg::Base,
data: libdeno::deno_buf, data: libdeno::deno_buf,
) -> Box<Op> { ) -> Box<Op> {
@ -1022,7 +1028,7 @@ fn op_read_dir(
} }
fn op_write_file( fn op_write_file(
state: &IsolateState, state: &Arc<IsolateState>,
base: &msg::Base, base: &msg::Base,
data: libdeno::deno_buf, data: libdeno::deno_buf,
) -> Box<Op> { ) -> Box<Op> {
@ -1042,7 +1048,7 @@ fn op_write_file(
} }
fn op_rename( fn op_rename(
state: &IsolateState, state: &Arc<IsolateState>,
base: &msg::Base, base: &msg::Base,
data: libdeno::deno_buf, data: libdeno::deno_buf,
) -> Box<Op> { ) -> Box<Op> {
@ -1062,7 +1068,7 @@ fn op_rename(
} }
fn op_symlink( fn op_symlink(
state: &IsolateState, state: &Arc<IsolateState>,
base: &msg::Base, base: &msg::Base,
data: libdeno::deno_buf, data: libdeno::deno_buf,
) -> Box<Op> { ) -> Box<Op> {
@ -1091,7 +1097,7 @@ fn op_symlink(
} }
fn op_read_link( fn op_read_link(
_state: &IsolateState, _state: &Arc<IsolateState>,
base: &msg::Base, base: &msg::Base,
data: libdeno::deno_buf, data: libdeno::deno_buf,
) -> Box<Op> { ) -> Box<Op> {
@ -1124,7 +1130,7 @@ fn op_read_link(
} }
fn op_repl_start( fn op_repl_start(
state: &IsolateState, state: &Arc<IsolateState>,
base: &msg::Base, base: &msg::Base,
data: libdeno::deno_buf, data: libdeno::deno_buf,
) -> Box<Op> { ) -> Box<Op> {
@ -1155,7 +1161,7 @@ fn op_repl_start(
} }
fn op_repl_readline( fn op_repl_readline(
_state: &IsolateState, _state: &Arc<IsolateState>,
base: &msg::Base, base: &msg::Base,
data: libdeno::deno_buf, data: libdeno::deno_buf,
) -> Box<Op> { ) -> Box<Op> {
@ -1193,7 +1199,7 @@ fn op_repl_readline(
} }
fn op_truncate( fn op_truncate(
state: &IsolateState, state: &Arc<IsolateState>,
base: &msg::Base, base: &msg::Base,
data: libdeno::deno_buf, data: libdeno::deno_buf,
) -> Box<Op> { ) -> Box<Op> {
@ -1216,7 +1222,7 @@ fn op_truncate(
} }
fn op_listen( fn op_listen(
state: &IsolateState, state: &Arc<IsolateState>,
base: &msg::Base, base: &msg::Base,
data: libdeno::deno_buf, data: libdeno::deno_buf,
) -> Box<Op> { ) -> Box<Op> {
@ -1282,7 +1288,7 @@ fn new_conn(cmd_id: u32, tcp_stream: TcpStream) -> OpResult {
} }
fn op_accept( fn op_accept(
state: &IsolateState, state: &Arc<IsolateState>,
base: &msg::Base, base: &msg::Base,
data: libdeno::deno_buf, data: libdeno::deno_buf,
) -> Box<Op> { ) -> Box<Op> {
@ -1308,7 +1314,7 @@ fn op_accept(
} }
fn op_dial( fn op_dial(
state: &IsolateState, state: &Arc<IsolateState>,
base: &msg::Base, base: &msg::Base,
data: libdeno::deno_buf, data: libdeno::deno_buf,
) -> Box<Op> { ) -> Box<Op> {
@ -1332,7 +1338,7 @@ fn op_dial(
} }
fn op_metrics( fn op_metrics(
state: &IsolateState, state: &Arc<IsolateState>,
base: &msg::Base, base: &msg::Base,
data: libdeno::deno_buf, data: libdeno::deno_buf,
) -> Box<Op> { ) -> Box<Op> {
@ -1356,7 +1362,7 @@ fn op_metrics(
} }
fn op_resources( fn op_resources(
_state: &IsolateState, _state: &Arc<IsolateState>,
base: &msg::Base, base: &msg::Base,
data: libdeno::deno_buf, data: libdeno::deno_buf,
) -> Box<Op> { ) -> Box<Op> {
@ -1408,7 +1414,7 @@ fn subprocess_stdio_map(v: msg::ProcessStdio) -> std::process::Stdio {
} }
fn op_run( fn op_run(
state: &IsolateState, state: &Arc<IsolateState>,
base: &msg::Base, base: &msg::Base,
data: libdeno::deno_buf, data: libdeno::deno_buf,
) -> Box<Op> { ) -> Box<Op> {
@ -1476,7 +1482,7 @@ fn op_run(
} }
fn op_run_status( fn op_run_status(
state: &IsolateState, state: &Arc<IsolateState>,
base: &msg::Base, base: &msg::Base,
data: libdeno::deno_buf, data: libdeno::deno_buf,
) -> Box<Op> { ) -> Box<Op> {
@ -1530,3 +1536,90 @@ fn op_run_status(
}); });
Box::new(future) Box::new(future)
} }
struct GetMessageFuture {
pub state: Arc<IsolateState>,
}
impl Future for GetMessageFuture {
type Item = Option<Buf>;
type Error = ();
fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error> {
assert!(self.state.worker_channels.is_some());
match self.state.worker_channels {
None => panic!("expected worker_channels"),
Some(ref wc) => {
let mut wc = wc.lock().unwrap();
wc.1.poll()
}
}
}
}
fn op_worker_get_message(
state: &Arc<IsolateState>,
base: &msg::Base,
data: libdeno::deno_buf,
) -> Box<Op> {
assert_eq!(data.len(), 0);
let cmd_id = base.cmd_id();
let op = GetMessageFuture {
state: state.clone(),
};
let op = op.map_err(move |_| -> DenoError { unimplemented!() });
let op = op.and_then(move |maybe_buf| -> DenoResult<Buf> {
debug!("op_worker_get_message");
let builder = &mut FlatBufferBuilder::new();
let data = maybe_buf.as_ref().map(|buf| builder.create_vector(buf));
let inner = msg::WorkerGetMessageRes::create(
builder,
&msg::WorkerGetMessageResArgs { data },
);
Ok(serialize_response(
cmd_id,
builder,
msg::BaseArgs {
inner: Some(inner.as_union_value()),
inner_type: msg::Any::WorkerGetMessageRes,
..Default::default()
},
))
});
Box::new(op)
}
fn op_worker_post_message(
state: &Arc<IsolateState>,
base: &msg::Base,
data: libdeno::deno_buf,
) -> Box<Op> {
let cmd_id = base.cmd_id();
let d = Vec::from(data.as_ref()).into_boxed_slice();
assert!(state.worker_channels.is_some());
let tx = match state.worker_channels {
None => panic!("expected worker_channels"),
Some(ref wc) => {
let mut wc = wc.lock().unwrap();
wc.0.clone()
}
};
let op = tx.send(d);
let op = op.map_err(|e| errors::new(ErrorKind::Other, e.to_string()));
let op = op.and_then(move |_| -> DenoResult<Buf> {
let builder = &mut FlatBufferBuilder::new();
Ok(serialize_response(
cmd_id,
builder,
msg::BaseArgs {
..Default::default()
},
))
});
Box::new(op)
}

View file

@ -10,10 +10,12 @@
#[cfg(unix)] #[cfg(unix)]
use eager_unix as eager; use eager_unix as eager;
use errors;
use errors::bad_resource; use errors::bad_resource;
use errors::DenoError; use errors::DenoError;
use errors::DenoResult; use errors::DenoResult;
use http_body::HttpBody; use http_body::HttpBody;
use isolate::WorkerChannels;
use repl::Repl; use repl::Repl;
use tokio_util; use tokio_util;
use tokio_write; use tokio_write;
@ -22,7 +24,10 @@ use futures;
use futures::future::{Either, FutureResult}; use futures::future::{Either, FutureResult};
use futures::Future; use futures::Future;
use futures::Poll; use futures::Poll;
use futures::Sink;
use futures::Stream;
use hyper; use hyper;
use isolate::Buf;
use std; use std;
use std::collections::HashMap; use std::collections::HashMap;
use std::io::{Error, Read, Write}; use std::io::{Error, Read, Write};
@ -96,6 +101,14 @@ enum Repr {
ChildStdin(tokio_process::ChildStdin), ChildStdin(tokio_process::ChildStdin),
ChildStdout(tokio_process::ChildStdout), ChildStdout(tokio_process::ChildStdout),
ChildStderr(tokio_process::ChildStderr), ChildStderr(tokio_process::ChildStderr),
Worker(WorkerChannels),
}
/// If the given rid is open, this returns the type of resource, E.G. "worker".
/// If the rid is closed or was never open, it returns None.
pub fn get_type(rid: ResourceId) -> Option<String> {
let table = RESOURCE_TABLE.lock().unwrap();
table.get(&rid).map(inspect_repr)
} }
pub fn table_entries() -> Vec<(u32, String)> { pub fn table_entries() -> Vec<(u32, String)> {
@ -131,6 +144,7 @@ fn inspect_repr(repr: &Repr) -> String {
Repr::ChildStdin(_) => "childStdin", Repr::ChildStdin(_) => "childStdin",
Repr::ChildStdout(_) => "childStdout", Repr::ChildStdout(_) => "childStdout",
Repr::ChildStderr(_) => "childStderr", Repr::ChildStderr(_) => "childStderr",
Repr::Worker(_) => "worker",
}; };
String::from(h_repr) String::from(h_repr)
@ -138,7 +152,7 @@ fn inspect_repr(repr: &Repr) -> String {
// Abstract async file interface. // Abstract async file interface.
// Ideally in unix, if Resource represents an OS rid, it will be the same. // Ideally in unix, if Resource represents an OS rid, it will be the same.
#[derive(Debug)] #[derive(Clone, Debug)]
pub struct Resource { pub struct Resource {
pub rid: ResourceId, pub rid: ResourceId,
} }
@ -284,6 +298,54 @@ pub fn add_repl(repl: Repl) -> Resource {
Resource { rid } Resource { rid }
} }
pub fn add_worker(wc: WorkerChannels) -> Resource {
let rid = new_rid();
let mut tg = RESOURCE_TABLE.lock().unwrap();
let r = tg.insert(rid, Repr::Worker(wc));
assert!(r.is_none());
Resource { rid }
}
pub fn worker_post_message(
rid: ResourceId,
buf: Buf,
) -> futures::sink::Send<futures::sync::mpsc::Sender<Buf>> {
let mut table = RESOURCE_TABLE.lock().unwrap();
let maybe_repr = table.get_mut(&rid);
match maybe_repr {
Some(Repr::Worker(ref mut wc)) => {
// unwrap here is incorrect, but doing it anyway
wc.0.clone().send(buf)
}
_ => panic!("bad resource"), // futures::future::err(bad_resource()).into(),
}
}
pub struct WorkerReceiver {
rid: ResourceId,
}
// Invert the dumbness that tokio_process causes by making Child itself a future.
impl Future for WorkerReceiver {
type Item = Option<Buf>;
type Error = DenoError;
fn poll(&mut self) -> Poll<Option<Buf>, DenoError> {
let mut table = RESOURCE_TABLE.lock().unwrap();
let maybe_repr = table.get_mut(&self.rid);
match maybe_repr {
Some(Repr::Worker(ref mut wc)) => wc.1.poll().map_err(|()| {
errors::new(errors::ErrorKind::Other, "recv msg error".to_string())
}),
_ => Err(bad_resource()),
}
}
}
pub fn worker_recv_message(rid: ResourceId) -> WorkerReceiver {
WorkerReceiver { rid }
}
#[cfg_attr(feature = "cargo-clippy", allow(stutter))] #[cfg_attr(feature = "cargo-clippy", allow(stutter))]
pub struct ChildResources { pub struct ChildResources {
pub child_rid: ResourceId, pub child_rid: ResourceId,

148
src/workers.rs Normal file
View file

@ -0,0 +1,148 @@
// Copyright 2018 the Deno authors. All rights reserved. MIT license.
#![allow(dead_code)]
use isolate::Buf;
use isolate::Isolate;
use isolate::IsolateState;
use isolate::WorkerChannels;
use js_errors::JSError;
use ops;
use resources;
use snapshot;
use tokio_util;
use futures::sync::mpsc;
use futures::sync::oneshot;
use futures::Future;
use std::sync::Arc;
use std::thread;
/// Rust interface for WebWorkers.
pub struct Worker {
isolate: Isolate,
}
impl Worker {
pub fn new(parent_state: &Arc<IsolateState>) -> (Self, WorkerChannels) {
let (worker_in_tx, worker_in_rx) = mpsc::channel::<Buf>(1);
let (worker_out_tx, worker_out_rx) = mpsc::channel::<Buf>(1);
let internal_channels = (worker_out_tx, worker_in_rx);
let external_channels = (worker_in_tx, worker_out_rx);
let state = Arc::new(IsolateState::new(
parent_state.flags.clone(),
parent_state.argv.clone(),
Some(internal_channels),
));
let snapshot = snapshot::deno_snapshot();
let isolate = Isolate::new(snapshot, state, ops::dispatch);
let worker = Worker { isolate };
(worker, external_channels)
}
pub fn execute(&self, js_source: &str) -> Result<(), JSError> {
self.isolate.execute(js_source)
}
pub fn event_loop(&self) -> Result<(), JSError> {
self.isolate.event_loop()
}
}
fn spawn(state: Arc<IsolateState>, js_source: String) -> resources::Resource {
// TODO This function should return a Future, so that the caller can retrieve
// the JSError if one is thrown. Currently it just prints to stderr and calls
// exit(1).
// let (js_error_tx, js_error_rx) = oneshot::channel::<JSError>();
let (p, c) = oneshot::channel::<resources::Resource>();
let builder = thread::Builder::new().name("worker".to_string());
let _tid = builder
.spawn(move || {
let (worker, external_channels) = Worker::new(&state);
let mut resource = resources::add_worker(external_channels);
p.send(resource.clone()).unwrap();
tokio_util::init(|| {
(|| -> Result<(), JSError> {
worker.execute("workerMain()")?;
worker.execute(&js_source)?;
worker.event_loop()?;
Ok(())
})().or_else(|err: JSError| -> Result<(), JSError> {
eprintln!("{}", err.to_string());
std::process::exit(1)
}).unwrap();
});
resource.close();
}).unwrap();
let resource = c.wait().unwrap();
resource
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_spawn() {
let resource = spawn(
IsolateState::mock(),
r#"
onmessage = function(e) {
let s = new TextDecoder().decode(e.data);;
console.log("msg from main script", s);
if (s == "exit") {
close();
return;
} else {
console.assert(s === "hi");
}
postMessage(new Uint8Array([1, 2, 3]));
console.log("after postMessage");
}
"#.into(),
);
let msg = String::from("hi").into_boxed_str().into_boxed_bytes();
let r = resources::worker_post_message(resource.rid, msg).wait();
assert!(r.is_ok());
let maybe_msg =
resources::worker_recv_message(resource.rid).wait().unwrap();
assert!(maybe_msg.is_some());
assert_eq!(*maybe_msg.unwrap(), [1, 2, 3]);
let msg = String::from("exit").into_boxed_str().into_boxed_bytes();
let r = resources::worker_post_message(resource.rid, msg).wait();
assert!(r.is_ok());
}
#[test]
fn removed_from_resource_table_on_close() {
let resource =
spawn(IsolateState::mock(), "onmessage = () => close();".into());
assert_eq!(
resources::get_type(resource.rid),
Some("worker".to_string())
);
let msg = String::from("hi").into_boxed_str().into_boxed_bytes();
let r = resources::worker_post_message(resource.rid, msg).wait();
assert!(r.is_ok());
println!("rid {:?}", resource.rid);
// TODO Need a way to get a future for when a resource closes.
// For now, just sleep for a bit.
thread::sleep(std::time::Duration::from_millis(100));
assert_eq!(resources::get_type(resource.rid), None);
}
}