1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2024-11-28 16:20:57 -05:00

Make Deno multithreaded.

By using the tokio default runtime.

This patch makes all of the ops thread safe.

Adds libdeno to JS globals to make for easier testing.

Preliminary work for #733.
This commit is contained in:
Ryan Dahl 2018-09-18 11:53:16 -07:00
parent 7c128df4a0
commit 4fd2b19f64
7 changed files with 392 additions and 210 deletions

View file

@ -54,6 +54,7 @@ main_extern = [
"$rust_build:tempfile",
"$rust_build:rand",
"$rust_build:tokio",
"$rust_build:tokio_executor",
"$rust_build:url",
"$rust_build:remove_dir_all",
"$rust_build:dirs",

View file

@ -56,8 +56,6 @@ declare global {
export const window = globalEval("this");
window.window = window;
window.libdeno = null;
window.setTimeout = timers.setTimeout;
window.setInterval = timers.setInterval;
window.clearTimeout = timers.clearTimer;

View file

@ -1,46 +1,44 @@
// Copyright 2018 the Deno authors. All rights reserved. MIT license.
use errors::DenoError;
use errors::DenoResult;
use flatbuffers::FlatBufferBuilder;
use fs as deno_fs;
use isolate::Buf;
use isolate::IsolateState;
use isolate::Op;
use msg;
use flatbuffers::FlatBufferBuilder;
use futures;
use futures::sync::oneshot;
use hyper;
use hyper::rt::{Future, Stream};
use hyper::Client;
use isolate::from_c;
use libdeno;
use libdeno::{deno_buf, isolate};
use msg;
use remove_dir_all::remove_dir_all;
use std;
use std::fs;
#[cfg(any(unix))]
use std::os::unix::fs::PermissionsExt;
use std::path::Path;
use std::sync::Arc;
use std::time::UNIX_EPOCH;
use std::time::{Duration, Instant};
use tokio::timer::Delay;
// Buf represents a byte array returned from a "Op".
// The message might be empty (which will be translated into a null object on
// the javascript side) or it is a heap allocated opaque sequence of bytes.
// Usually a flatbuffer message.
type Buf = Option<Box<[u8]>>;
// JS promises in Deno map onto a specific Future
// which yields either a DenoError or a byte array.
type Op = Future<Item = Buf, Error = DenoError>;
type OpResult = DenoResult<Buf>;
// TODO Ideally we wouldn't have to box the Op being returned.
// The box is just to make it easier to get a prototype refactor working.
type Handler = fn(i: *const isolate, base: &msg::Base) -> Box<Op>;
type Handler = fn(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op>;
pub extern "C" fn msg_from_js(i: *const isolate, buf: deno_buf) {
let bytes = unsafe { std::slice::from_raw_parts(buf.data_ptr, buf.data_len) };
// Hopefully Rust optimizes this away.
fn empty_buf() -> Buf {
Box::new([])
}
pub fn msg_from_js(state: Arc<IsolateState>, bytes: &[u8]) -> (bool, Box<Op>) {
let base = msg::get_root_as_base(bytes);
let is_sync = base.sync();
let msg_type = base.msg_type();
let cmd_id = base.cmd_id();
let handler: Handler = match msg_type {
@ -68,73 +66,51 @@ pub extern "C" fn msg_from_js(i: *const isolate, buf: deno_buf) {
)),
};
let future = handler(i, &base);
let future = future.or_else(move |err| {
// No matter whether we got an Err or Ok, we want a serialized message to
// send back. So transform the DenoError into a deno_buf.
let builder = &mut FlatBufferBuilder::new();
let errmsg_offset = builder.create_string(&format!("{}", err));
Ok(serialize_response(
cmd_id,
builder,
msg::BaseArgs {
error: Some(errmsg_offset),
error_kind: err.kind(),
..Default::default()
},
))
});
let isolate = from_c(i);
if base.sync() {
// Execute future synchronously.
// println!("sync handler {}", msg::enum_name_any(msg_type));
let maybe_box_u8 = future.wait().unwrap();
match maybe_box_u8 {
None => {}
Some(box_u8) => {
let buf = deno_buf_from(box_u8);
// Set the synchronous response, the value returned from isolate.send().
unsafe { libdeno::deno_set_response(i, buf) }
}
}
} else {
// Execute future asynchornously.
let future = future.and_then(move |maybe_box_u8| {
let buf = match maybe_box_u8 {
Some(box_u8) => deno_buf_from(box_u8),
None => {
// async RPCs that return None still need to
// send a message back to signal completion.
let builder = &mut FlatBufferBuilder::new();
deno_buf_from(
serialize_response(
cmd_id,
builder,
msg::BaseArgs {
..Default::default()
},
).unwrap(),
)
}
let op: Box<Op> = handler(state.clone(), &base);
let boxed_op = Box::new(
op.or_else(move |err: DenoError| -> DenoResult<Buf> {
debug!("op err {}", err);
// No matter whether we got an Err or Ok, we want a serialized message to
// send back. So transform the DenoError into a deno_buf.
let builder = &mut FlatBufferBuilder::new();
let errmsg_offset = builder.create_string(&format!("{}", err));
Ok(serialize_response(
cmd_id,
builder,
msg::BaseArgs {
error: Some(errmsg_offset),
error_kind: err.kind(),
..Default::default()
},
))
}).and_then(move |buf: Buf| -> DenoResult<Buf> {
// Handle empty responses. For sync responses we just want
// to send null. For async we want to send a small message
// with the cmd_id.
let buf = if is_sync || buf.len() > 0 {
buf
} else {
// async RPCs that return empty still need to
// send a message back to signal completion.
let builder = &mut FlatBufferBuilder::new();
serialize_response(
cmd_id,
builder,
msg::BaseArgs {
..Default::default()
},
)
};
// TODO(ry) make this thread safe.
unsafe { libdeno::deno_send(i, buf) };
Ok(())
});
isolate.rt.spawn(future);
}
}
Ok(buf)
}),
);
fn deno_buf_from(x: Box<[u8]>) -> deno_buf {
let len = x.len();
let ptr = Box::into_raw(x);
deno_buf {
alloc_ptr: 0 as *mut u8,
alloc_len: 0,
data_ptr: ptr as *mut u8,
data_len: len,
}
debug!(
"msg_from_js {} sync {}",
msg::enum_name_any(msg_type),
base.sync()
);
return (base.sync(), boxed_op);
}
fn permission_denied() -> DenoError {
@ -151,16 +127,15 @@ fn not_implemented() -> DenoError {
))
}
fn handle_exit(_i: *const isolate, base: &msg::Base) -> Box<Op> {
fn handle_exit(_config: Arc<IsolateState>, base: &msg::Base) -> Box<Op> {
let msg = base.msg_as_exit().unwrap();
std::process::exit(msg.code())
}
fn handle_start(i: *const isolate, base: &msg::Base) -> Box<Op> {
let isolate = from_c(i);
fn handle_start(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> {
let mut builder = FlatBufferBuilder::new();
let argv = isolate.argv.iter().map(|s| s.as_str()).collect::<Vec<_>>();
let argv = state.argv.iter().map(|s| s.as_str()).collect::<Vec<_>>();
let argv_off = builder.create_vector_of_strings(argv.as_slice());
let cwd_path = std::env::current_dir().unwrap();
@ -172,8 +147,8 @@ fn handle_start(i: *const isolate, base: &msg::Base) -> Box<Op> {
&msg::StartResArgs {
cwd: Some(cwd_off),
argv: Some(argv_off),
debug_flag: isolate.flags.log_debug,
recompile_flag: isolate.flags.recompile,
debug_flag: state.flags.log_debug,
recompile_flag: state.flags.recompile,
..Default::default()
},
);
@ -200,7 +175,7 @@ fn serialize_response(
let data = builder.finished_data();
// println!("serialize_response {:x?}", data);
let vec = data.to_vec();
Some(vec.into_boxed_slice())
vec.into_boxed_slice()
}
fn ok_future(buf: Buf) -> Box<Op> {
@ -213,22 +188,17 @@ fn odd_future(err: DenoError) -> Box<Op> {
}
// https://github.com/denoland/isolate/blob/golang/os.go#L100-L154
fn handle_code_fetch(i: *const isolate, base: &msg::Base) -> Box<Op> {
fn handle_code_fetch(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> {
let msg = base.msg_as_code_fetch().unwrap();
let cmd_id = base.cmd_id();
let module_specifier = msg.module_specifier().unwrap();
let containing_file = msg.containing_file().unwrap();
let isolate = from_c(i);
assert_eq!(
isolate.dir.root.join("gen"),
isolate.dir.gen,
"Sanity check"
);
assert_eq!(state.dir.root.join("gen"), state.dir.gen, "Sanity check");
Box::new(futures::future::result(|| -> OpResult {
let builder = &mut FlatBufferBuilder::new();
let out = isolate.dir.code_fetch(module_specifier, containing_file)?;
let out = state.dir.code_fetch(module_specifier, containing_file)?;
let mut msg_args = msg::CodeFetchResArgs {
module_name: Some(builder.create_string(&out.module_name)),
filename: Some(builder.create_string(&out.filename)),
@ -255,36 +225,34 @@ fn handle_code_fetch(i: *const isolate, base: &msg::Base) -> Box<Op> {
}
// https://github.com/denoland/isolate/blob/golang/os.go#L156-L169
fn handle_code_cache(i: *const isolate, base: &msg::Base) -> Box<Op> {
fn handle_code_cache(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> {
let msg = base.msg_as_code_cache().unwrap();
let filename = msg.filename().unwrap();
let source_code = msg.source_code().unwrap();
let output_code = msg.output_code().unwrap();
Box::new(futures::future::result(|| -> OpResult {
let isolate = from_c(i);
isolate.dir.code_cache(filename, source_code, output_code)?;
Ok(None)
state.dir.code_cache(filename, source_code, output_code)?;
Ok(empty_buf())
}()))
}
fn handle_set_env(i: *const isolate, base: &msg::Base) -> Box<Op> {
fn handle_set_env(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> {
let msg = base.msg_as_set_env().unwrap();
let key = msg.key().unwrap();
let value = msg.value().unwrap();
let isolate = from_c(i);
if !isolate.flags.allow_env {
if !state.flags.allow_env {
return odd_future(permission_denied());
}
std::env::set_var(key, value);
ok_future(None)
ok_future(empty_buf())
}
fn handle_env(i: *const isolate, base: &msg::Base) -> Box<Op> {
let isolate = from_c(i);
fn handle_env(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> {
let cmd_id = base.cmd_id();
if !isolate.flags.allow_env {
if !state.flags.allow_env {
return odd_future(permission_denied());
}
@ -322,22 +290,23 @@ fn handle_env(i: *const isolate, base: &msg::Base) -> Box<Op> {
))
}
fn handle_fetch_req(i: *const isolate, base: &msg::Base) -> Box<Op> {
fn handle_fetch_req(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> {
let msg = base.msg_as_fetch_req().unwrap();
let cmd_id = base.cmd_id();
let id = msg.id();
let url = msg.url().unwrap();
let isolate = from_c(i);
if !isolate.flags.allow_net {
if !state.flags.allow_net {
return odd_future(permission_denied());
}
let url = url.parse::<hyper::Uri>().unwrap();
let client = Client::new();
debug!("Before fetch {}", url);
let future = client.get(url).and_then(move |res| {
let status = res.status().as_u16() as i32;
debug!("fetch {}", status);
let headers = {
let map = res.headers();
@ -361,6 +330,7 @@ fn handle_fetch_req(i: *const isolate, base: &msg::Base) -> Box<Op> {
let future = future.map_err(|err| -> DenoError { err.into() }).and_then(
move |(status, body, headers)| {
debug!("fetch body ");
let builder = &mut FlatBufferBuilder::new();
// Send the first message without a body. This is just to indicate
// what status code.
@ -422,7 +392,7 @@ where
(delay_task, cancel_tx)
}
fn handle_make_temp_dir(i: *const isolate, base: &msg::Base) -> Box<Op> {
fn handle_make_temp_dir(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> {
let base = Box::new(*base);
let msg = base.msg_as_make_temp_dir().unwrap();
let cmd_id = base.cmd_id();
@ -430,8 +400,7 @@ fn handle_make_temp_dir(i: *const isolate, base: &msg::Base) -> Box<Op> {
let prefix = msg.prefix();
let suffix = msg.suffix();
let isolate = from_c(i);
if !isolate.flags.allow_write {
if !state.flags.allow_write {
return odd_future(permission_denied());
}
// TODO Use blocking() here.
@ -461,28 +430,28 @@ fn handle_make_temp_dir(i: *const isolate, base: &msg::Base) -> Box<Op> {
}()))
}
fn handle_mkdir(i: *const isolate, base: &msg::Base) -> Box<Op> {
fn handle_mkdir(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> {
let msg = base.msg_as_mkdir().unwrap();
let mode = msg.mode();
let path = msg.path().unwrap();
let isolate = from_c(i);
if !isolate.flags.allow_write {
if !state.flags.allow_write {
return odd_future(permission_denied());
}
// TODO Use tokio_threadpool.
Box::new(futures::future::result(|| -> OpResult {
debug!("handle_mkdir {}", path);
deno_fs::mkdir(Path::new(path), mode)?;
Ok(None)
Ok(empty_buf())
}()))
}
fn handle_remove(i: *const isolate, base: &msg::Base) -> Box<Op> {
fn handle_remove(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> {
let msg = base.msg_as_remove().unwrap();
let path = msg.path().unwrap();
let recursive = msg.recursive();
let isolate = from_c(i);
if !isolate.flags.allow_write {
if !state.flags.allow_write {
return odd_future(permission_denied());
}
// TODO Use tokio_threadpool.
@ -499,12 +468,12 @@ fn handle_remove(i: *const isolate, base: &msg::Base) -> Box<Op> {
fs::remove_dir(&path_)?;
}
}
Ok(None)
Ok(empty_buf())
}()))
}
// Prototype https://github.com/denoland/isolate/blob/golang/os.go#L171-L184
fn handle_read_file(_i: *const isolate, base: &msg::Base) -> Box<Op> {
fn handle_read_file(_config: Arc<IsolateState>, base: &msg::Base) -> Box<Op> {
let msg = base.msg_as_read_file().unwrap();
let cmd_id = base.cmd_id();
let filename = String::from(msg.filename().unwrap());
@ -554,7 +523,7 @@ fn get_mode(_perm: fs::Permissions) -> u32 {
0
}
fn handle_stat(_i: *const isolate, base: &msg::Base) -> Box<Op> {
fn handle_stat(_config: Arc<IsolateState>, base: &msg::Base) -> Box<Op> {
let msg = base.msg_as_stat().unwrap();
let cmd_id = base.cmd_id();
let filename = String::from(msg.filename().unwrap());
@ -597,48 +566,49 @@ fn handle_stat(_i: *const isolate, base: &msg::Base) -> Box<Op> {
}()))
}
fn handle_write_file(i: *const isolate, base: &msg::Base) -> Box<Op> {
fn handle_write_file(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> {
let msg = base.msg_as_write_file().unwrap();
let filename = String::from(msg.filename().unwrap());
let data = msg.data().unwrap();
let perm = msg.perm();
let isolate = from_c(i);
if !isolate.flags.allow_write {
if !state.flags.allow_write {
return odd_future(permission_denied());
}
Box::new(futures::future::result(|| -> OpResult {
debug!("handle_write_file {}", filename);
deno_fs::write_file(Path::new(&filename), data, perm)?;
Ok(None)
Ok(empty_buf())
}()))
}
fn remove_timer(i: *const isolate, timer_id: u32) {
let isolate = from_c(i);
isolate.timers.remove(&timer_id);
fn remove_timer(state: Arc<IsolateState>, timer_id: u32) {
let mut timers = state.timers.lock().unwrap();
timers.remove(&timer_id);
}
// Prototype: https://github.com/ry/isolate/blob/golang/timers.go#L25-L39
fn handle_timer_start(i: *const isolate, base: &msg::Base) -> Box<Op> {
fn handle_timer_start(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> {
debug!("handle_timer_start");
let msg = base.msg_as_timer_start().unwrap();
let cmd_id = base.cmd_id();
let timer_id = msg.id();
let delay = msg.delay();
let isolate = from_c(i);
let config2 = state.clone();
let future = {
let (delay_task, cancel_delay) = set_timeout(
move || {
remove_timer(i, timer_id);
remove_timer(config2, timer_id);
},
delay,
);
isolate.timers.insert(timer_id, cancel_delay);
let mut timers = state.timers.lock().unwrap();
timers.insert(timer_id, cancel_delay);
delay_task
};
Box::new(future.then(move |result| {
let r = Box::new(future.then(move |result| {
let builder = &mut FlatBufferBuilder::new();
let msg = msg::TimerReady::create(
builder,
@ -657,20 +627,20 @@ fn handle_timer_start(i: *const isolate, base: &msg::Base) -> Box<Op> {
..Default::default()
},
))
}))
}));
r
}
// Prototype: https://github.com/ry/isolate/blob/golang/timers.go#L40-L43
fn handle_timer_clear(i: *const isolate, base: &msg::Base) -> Box<Op> {
fn handle_timer_clear(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> {
let msg = base.msg_as_timer_clear().unwrap();
debug!("handle_timer_clear");
remove_timer(i, msg.id());
ok_future(None)
remove_timer(state, msg.id());
ok_future(empty_buf())
}
fn handle_rename(i: *const isolate, base: &msg::Base) -> Box<Op> {
let isolate = from_c(i);
if !isolate.flags.allow_write {
fn handle_rename(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> {
if !state.flags.allow_write {
return odd_future(permission_denied());
}
let msg = base.msg_as_rename().unwrap();
@ -679,13 +649,12 @@ fn handle_rename(i: *const isolate, base: &msg::Base) -> Box<Op> {
Box::new(futures::future::result(|| -> OpResult {
debug!("handle_rename {} {}", oldpath, newpath);
fs::rename(Path::new(&oldpath), Path::new(&newpath))?;
Ok(None)
Ok(empty_buf())
}()))
}
fn handle_symlink(i: *const isolate, base: &msg::Base) -> Box<Op> {
let deno = from_c(i);
if !deno.flags.allow_write {
fn handle_symlink(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> {
if !state.flags.allow_write {
return odd_future(permission_denied());
}
// TODO Use type for Windows.
@ -699,12 +668,12 @@ fn handle_symlink(i: *const isolate, base: &msg::Base) -> Box<Op> {
debug!("handle_symlink {} {}", oldname, newname);
#[cfg(any(unix))]
std::os::unix::fs::symlink(Path::new(&oldname), Path::new(&newname))?;
Ok(None)
Ok(empty_buf())
}()))
}
}
fn handle_read_link(_i: *const isolate, base: &msg::Base) -> Box<Op> {
fn handle_read_link(_state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> {
let msg = base.msg_as_readlink().unwrap();
let cmd_id = base.cmd_id();
let name = String::from(msg.name().unwrap());

View file

@ -1,10 +1,12 @@
// Copyright 2018 the Deno authors. All rights reserved. MIT license.
use errors::DenoResult;
use tokio_util;
use futures::Future;
use futures::Stream;
use hyper;
use hyper::client::Client;
use hyper::client::HttpConnector;
use hyper::Uri;
use hyper_rustls;
@ -29,21 +31,24 @@ pub fn get_client() -> Client<Connector, hyper::Body> {
pub fn fetch_sync_string(module_name: &str) -> DenoResult<String> {
let url = module_name.parse::<Uri>().unwrap();
let client = get_client();
// TODO Use Deno's RT
let mut rt = Runtime::new().unwrap();
let body = rt.block_on(
client
.get(url)
.and_then(|response| response.into_body().concat2()),
)?;
let future = client
.get(url)
.and_then(|response| response.into_body().concat2());
let body = tokio_util::block_on(future)?;
Ok(String::from_utf8(body.to_vec()).unwrap())
}
#[test]
fn test_fetch_sync_string() {
// Relies on external http server. See tools/http_server.py
let p = fetch_sync_string("http://localhost:4545/package.json").unwrap();
println!("package.json len {}", p.len());
assert!(p.len() > 1);
use futures;
tokio_util::init(|| {
tokio_util::block_on(futures::future::lazy(|| -> DenoResult<()> {
let p = fetch_sync_string("http://127.0.0.1:4545/package.json")?;
println!("package.json len {}", p.len());
assert!(p.len() > 1);
Ok(())
})).unwrap();
});
}

View file

@ -1,54 +1,113 @@
// Copyright 2018 the Deno authors. All rights reserved. MIT license.
// Do not use FlatBuffers in this module.
// TODO Currently this module uses Tokio, but it would be nice if they were
// decoupled.
use deno_dir;
use errors::DenoError;
use flags;
use futures;
use handlers;
use libc::c_void;
use libdeno;
use futures;
use futures::Future;
use libc::c_void;
use std;
use std::collections::HashMap;
use std::ffi::CStr;
use std::ffi::CString;
use std::sync::mpsc;
use std::sync::Arc;
use std::sync::Mutex;
use tokio;
use tokio_util;
type DenoException<'a> = &'a str;
// Buf represents a byte array returned from a "Op".
// The message might be empty (which will be translated into a null object on
// the javascript side) or it is a heap allocated opaque sequence of bytes.
// Usually a flatbuffer message.
pub type Buf = Box<[u8]>;
// JS promises in Deno map onto a specific Future
// which yields either a DenoError or a byte array.
pub type Op = Future<Item = Buf, Error = DenoError> + Send;
// Returns (is_sync, op)
pub type Dispatch = fn(state: Arc<IsolateState>, buf: &[u8]) -> (bool, Box<Op>);
pub struct Isolate {
pub ptr: *const libdeno::isolate,
ptr: *const libdeno::isolate,
dispatch: Dispatch,
rx: mpsc::Receiver<Buf>,
ntasks: i32,
pub state: Arc<IsolateState>,
}
// Isolate cannot be passed between threads but IsolateState can. So any state that
// needs to be accessed outside the main V8 thread should be inside IsolateState.
pub struct IsolateState {
pub dir: deno_dir::DenoDir,
pub rt: tokio::runtime::current_thread::Runtime,
pub timers: HashMap<u32, futures::sync::oneshot::Sender<()>>,
pub timers: Mutex<HashMap<u32, futures::sync::oneshot::Sender<()>>>,
pub argv: Vec<String>,
pub flags: flags::DenoFlags,
tx: Mutex<Option<mpsc::Sender<Buf>>>,
}
impl IsolateState {
// Thread safe.
fn send_to_js(&self, buf: Buf) {
let mut g = self.tx.lock().unwrap();
let maybe_tx = g.as_mut();
assert!(maybe_tx.is_some(), "Expected tx to not be deleted.");
let tx = maybe_tx.unwrap();
tx.send(buf).expect("tx.send error");
}
}
static DENO_INIT: std::sync::Once = std::sync::ONCE_INIT;
impl Isolate {
pub fn new(argv: Vec<String>) -> Box<Isolate> {
pub fn new(argv: Vec<String>, dispatch: Dispatch) -> Box<Isolate> {
DENO_INIT.call_once(|| {
unsafe { libdeno::deno_init() };
});
let (flags, argv_rest) = flags::set_flags(argv);
let mut deno_box = Box::new(Isolate {
// This channel handles sending async messages back to the runtime.
let (tx, rx) = mpsc::channel::<Buf>();
let mut isolate = Box::new(Isolate {
ptr: 0 as *const libdeno::isolate,
dir: deno_dir::DenoDir::new(flags.reload, None).unwrap(),
rt: tokio::runtime::current_thread::Runtime::new().unwrap(),
timers: HashMap::new(),
argv: argv_rest,
flags,
dispatch,
rx,
ntasks: 0,
state: Arc::new(IsolateState {
dir: deno_dir::DenoDir::new(flags.reload, None).unwrap(),
timers: Mutex::new(HashMap::new()),
argv: argv_rest,
flags,
tx: Mutex::new(Some(tx)),
}),
});
(*deno_box).ptr = unsafe {
(*isolate).ptr = unsafe {
libdeno::deno_new(
deno_box.as_ref() as *const _ as *const c_void,
handlers::msg_from_js,
isolate.as_ref() as *const _ as *const c_void,
pre_dispatch,
)
};
deno_box
isolate
}
pub fn from_c<'a>(d: *const libdeno::isolate) -> &'a mut Isolate {
let ptr = unsafe { libdeno::deno_get_data(d) };
let ptr = ptr as *mut Isolate;
let isolate_box = unsafe { Box::from_raw(ptr) };
Box::leak(isolate_box)
}
pub fn execute(
@ -68,6 +127,42 @@ impl Isolate {
}
Ok(())
}
pub fn set_response(&self, buf: Buf) {
unsafe { libdeno::deno_set_response(self.ptr, buf.into()) }
}
pub fn send(&self, buf: Buf) {
unsafe { libdeno::deno_send(self.ptr, buf.into()) };
}
// TODO Use Park abstraction? Note at time of writing Tokio default runtime
// does not have new_with_park().
pub fn event_loop(&mut self) {
// Main thread event loop.
while !self.is_idle() {
let buf = self.rx.recv().unwrap();
// Receiving a message on rx exactly corresponds to an async task
// completing.
self.ntasks_decrement();
// Call into JS with the buf.
self.send(buf);
}
}
fn ntasks_increment(&mut self) {
assert!(self.ntasks >= 0);
self.ntasks = self.ntasks + 1;
}
fn ntasks_decrement(&mut self) {
self.ntasks = self.ntasks - 1;
assert!(self.ntasks >= 0);
}
fn is_idle(&self) -> bool {
self.ntasks == 0
}
}
impl Drop for Isolate {
@ -76,22 +171,107 @@ impl Drop for Isolate {
}
}
pub fn from_c<'a>(i: *const libdeno::isolate) -> &'a mut Isolate {
let ptr = unsafe { libdeno::deno_get_data(i) };
let ptr = ptr as *mut Isolate;
let isolate_box = unsafe { Box::from_raw(ptr) };
Box::leak(isolate_box)
/// Converts Rust Buf to libdeno deno_buf.
impl From<Buf> for libdeno::deno_buf {
fn from(x: Buf) -> libdeno::deno_buf {
let len = x.len();
let ptr = Box::into_raw(x);
libdeno::deno_buf {
alloc_ptr: 0 as *mut u8,
alloc_len: 0,
data_ptr: ptr as *mut u8,
data_len: len,
}
}
}
#[test]
fn test_c_to_rust() {
let argv = vec![String::from("./deno"), String::from("hello.js")];
let isolate = Isolate::new(argv);
let isolate2 = from_c(isolate.ptr);
assert_eq!(isolate.ptr, isolate2.ptr);
assert_eq!(
isolate.dir.root.join("gen"),
isolate.dir.gen,
"Sanity check"
);
// Dereferences the C pointer into the Rust Isolate object.
extern "C" fn pre_dispatch(d: *const libdeno::isolate, buf: libdeno::deno_buf) {
let bytes = unsafe { std::slice::from_raw_parts(buf.data_ptr, buf.data_len) };
let isolate = Isolate::from_c(d);
let dispatch = isolate.dispatch;
let (is_sync, op) = dispatch(isolate.state.clone(), bytes);
if is_sync {
// Execute op synchronously.
let buf = tokio_util::block_on(op).unwrap();
if buf.len() != 0 {
// Set the synchronous response, the value returned from isolate.send().
isolate.set_response(buf);
}
} else {
// Execute op asynchronously.
let state = isolate.state.clone();
// TODO Ideally Tokio would could tell us how many tasks are executing, but
// it cannot currently. Therefore we track top-level promises/tasks
// manually.
isolate.ntasks_increment();
let task = op
.and_then(move |buf| {
state.send_to_js(buf);
Ok(())
}).map_err(|_| ());
tokio::spawn(task);
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_c_to_rust() {
let argv = vec![String::from("./deno"), String::from("hello.js")];
let isolate = Isolate::new(argv, unreachable_dispatch);
let isolate2 = Isolate::from_c(isolate.ptr);
assert_eq!(isolate.ptr, isolate2.ptr);
assert_eq!(
isolate.state.dir.root.join("gen"),
isolate.state.dir.gen,
"Sanity check"
);
}
fn unreachable_dispatch(
_state: Arc<IsolateState>,
_buf: &[u8],
) -> (bool, Box<Op>) {
unreachable!();
}
#[test]
fn test_dispatch_sync() {
let argv = vec![String::from("./deno"), String::from("hello.js")];
let mut isolate = Isolate::new(argv, dispatch_sync);
tokio_util::init(|| {
isolate
.execute(
"y.js",
r#"
const m = new Uint8Array([4, 5, 6]);
let n = libdeno.send(m);
if (!(n.byteLength === 3 &&
n[0] === 1 &&
n[1] === 2 &&
n[2] === 3)) {
throw Error("assert error");
}
"#,
).expect("execute error");
isolate.event_loop();
});
}
fn dispatch_sync(_state: Arc<IsolateState>, buf: &[u8]) -> (bool, Box<Op>) {
assert_eq!(buf[0], 4);
assert_eq!(buf[1], 5);
assert_eq!(buf[2], 6);
// Send back some sync response.
let vec: Vec<u8> = vec![1, 2, 3];
let buf = vec.into_boxed_slice();
let op = Box::new(futures::future::ok(buf));
(true, op)
}
}

View file

@ -7,6 +7,7 @@ extern crate msg_rs as msg;
extern crate rand;
extern crate tempfile;
extern crate tokio;
extern crate tokio_executor;
extern crate url;
#[macro_use]
extern crate lazy_static;
@ -25,9 +26,9 @@ pub mod handlers;
mod http;
mod isolate;
mod libdeno;
mod tokio_util;
mod version;
use isolate::Isolate;
use std::env;
static LOGGER: Logger = Logger;
@ -49,18 +50,16 @@ impl log::Log for Logger {
fn main() {
log::set_logger(&LOGGER).unwrap();
let args = env::args().collect();
let mut isolate = Isolate::new(args);
flags::process(&isolate.flags);
isolate
.execute("deno_main.js", "denoMain();")
.unwrap_or_else(|err| {
error!("{}", err);
std::process::exit(1);
});
// Start the Tokio event loop
isolate.rt.run().expect("err");
let mut isolate = isolate::Isolate::new(args, handlers::msg_from_js);
flags::process(&isolate.state.flags);
tokio_util::init(|| {
isolate
.execute("deno_main.js", "denoMain();")
.unwrap_or_else(|err| {
error!("{}", err);
std::process::exit(1);
});
isolate.event_loop();
});
}

30
src/tokio_util.rs Normal file
View file

@ -0,0 +1,30 @@
// Copyright 2018 the Deno authors. All rights reserved. MIT license.
use futures;
use futures::Future;
use tokio;
use tokio_executor;
pub fn block_on<F, R, E>(future: F) -> Result<R, E>
where
F: Send + 'static + Future<Item = R, Error = E>,
R: Send + 'static,
E: Send + 'static,
{
let (tx, rx) = futures::sync::oneshot::channel();
tokio::spawn(future.then(move |r| tx.send(r).map_err(|_| unreachable!())));
rx.wait().unwrap()
}
// Set the default executor so we can use tokio::spawn(). It's difficult to
// pass around mut references to the runtime, so using with_default is
// preferable. Ideally Tokio would provide this function.
pub fn init<F>(f: F)
where
F: FnOnce(),
{
let rt = tokio::runtime::Runtime::new().unwrap();
let mut executor = rt.executor();
let mut enter = tokio_executor::enter().expect("Multiple executors at once");
tokio_executor::with_default(&mut executor, &mut enter, move |_enter| f());
}