mirror of
https://github.com/denoland/deno.git
synced 2025-01-08 15:19:40 -05:00
Use the thread pool for blocking I/O
This commit is contained in:
parent
9e317c61d7
commit
df09fbad92
3 changed files with 104 additions and 64 deletions
1
BUILD.gn
1
BUILD.gn
|
@ -55,6 +55,7 @@ main_extern = [
|
||||||
"$rust_build:rand",
|
"$rust_build:rand",
|
||||||
"$rust_build:tokio",
|
"$rust_build:tokio",
|
||||||
"$rust_build:tokio_executor",
|
"$rust_build:tokio_executor",
|
||||||
|
"$rust_build:tokio_threadpool",
|
||||||
"$rust_build:url",
|
"$rust_build:url",
|
||||||
"$rust_build:remove_dir_all",
|
"$rust_build:remove_dir_all",
|
||||||
"$rust_build:dirs",
|
"$rust_build:dirs",
|
||||||
|
|
166
src/handlers.rs
166
src/handlers.rs
|
@ -10,7 +10,9 @@ use msg;
|
||||||
|
|
||||||
use flatbuffers::FlatBufferBuilder;
|
use flatbuffers::FlatBufferBuilder;
|
||||||
use futures;
|
use futures;
|
||||||
|
use futures::future::poll_fn;
|
||||||
use futures::sync::oneshot;
|
use futures::sync::oneshot;
|
||||||
|
use futures::Poll;
|
||||||
use hyper;
|
use hyper;
|
||||||
use hyper::rt::{Future, Stream};
|
use hyper::rt::{Future, Stream};
|
||||||
use hyper::Client;
|
use hyper::Client;
|
||||||
|
@ -20,10 +22,12 @@ use std::fs;
|
||||||
#[cfg(any(unix))]
|
#[cfg(any(unix))]
|
||||||
use std::os::unix::fs::PermissionsExt;
|
use std::os::unix::fs::PermissionsExt;
|
||||||
use std::path::Path;
|
use std::path::Path;
|
||||||
|
use std::path::PathBuf;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::UNIX_EPOCH;
|
use std::time::UNIX_EPOCH;
|
||||||
use std::time::{Duration, Instant};
|
use std::time::{Duration, Instant};
|
||||||
use tokio::timer::Delay;
|
use tokio::timer::Delay;
|
||||||
|
use tokio_threadpool;
|
||||||
|
|
||||||
type OpResult = DenoResult<Buf>;
|
type OpResult = DenoResult<Buf>;
|
||||||
|
|
||||||
|
@ -91,8 +95,6 @@ pub fn msg_from_js(state: Arc<IsolateState>, bytes: &[u8]) -> (bool, Box<Op>) {
|
||||||
let buf = if is_sync || buf.len() > 0 {
|
let buf = if is_sync || buf.len() > 0 {
|
||||||
buf
|
buf
|
||||||
} else {
|
} else {
|
||||||
// async RPCs that return empty still need to
|
|
||||||
// send a message back to signal completion.
|
|
||||||
let builder = &mut FlatBufferBuilder::new();
|
let builder = &mut FlatBufferBuilder::new();
|
||||||
serialize_response(
|
serialize_response(
|
||||||
cmd_id,
|
cmd_id,
|
||||||
|
@ -402,23 +404,61 @@ where
|
||||||
(delay_task, cancel_tx)
|
(delay_task, cancel_tx)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// This is just type conversion. Implement From trait?
|
||||||
|
// See https://github.com/tokio-rs/tokio/blob/ffd73a64e7ec497622b7f939e38017afe7124dc4/tokio-fs/src/lib.rs#L76-L85
|
||||||
|
fn convert_blocking<F>(f: F) -> Poll<Buf, DenoError>
|
||||||
|
where
|
||||||
|
F: FnOnce() -> DenoResult<Buf>,
|
||||||
|
{
|
||||||
|
use futures::Async::*;
|
||||||
|
match tokio_threadpool::blocking(f) {
|
||||||
|
Ok(Ready(Ok(v))) => Ok(v.into()),
|
||||||
|
Ok(Ready(Err(err))) => Err(err),
|
||||||
|
Ok(NotReady) => Ok(NotReady),
|
||||||
|
Err(_) => panic!("blocking error"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO Do not use macro for the blocking function.. We should instead be able
|
||||||
|
// to do this with a normal function, but there seems to some type system
|
||||||
|
// issues. The type of this function should be something like this:
|
||||||
|
// fn blocking<F>(is_sync: bool, f: F) -> Box<Op>
|
||||||
|
// where F: FnOnce() -> DenoResult<Buf>
|
||||||
|
macro_rules! blocking {
|
||||||
|
($is_sync:expr,$fn:expr) => {
|
||||||
|
if $is_sync {
|
||||||
|
// If synchronous, execute the function immediately on the main thread.
|
||||||
|
Box::new(futures::future::result($fn()))
|
||||||
|
} else {
|
||||||
|
// Otherwise dispatch to thread pool.
|
||||||
|
Box::new(poll_fn(move || convert_blocking($fn)))
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
fn handle_make_temp_dir(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> {
|
fn handle_make_temp_dir(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> {
|
||||||
let base = Box::new(*base);
|
let base = Box::new(*base);
|
||||||
let msg = base.msg_as_make_temp_dir().unwrap();
|
let msg = base.msg_as_make_temp_dir().unwrap();
|
||||||
let cmd_id = base.cmd_id();
|
let cmd_id = base.cmd_id();
|
||||||
let dir = msg.dir();
|
|
||||||
let prefix = msg.prefix();
|
|
||||||
let suffix = msg.suffix();
|
|
||||||
|
|
||||||
if !state.flags.allow_write {
|
if !state.flags.allow_write {
|
||||||
return odd_future(permission_denied());
|
return odd_future(permission_denied());
|
||||||
}
|
}
|
||||||
// TODO Use blocking() here.
|
|
||||||
Box::new(futures::future::result(|| -> OpResult {
|
let dir = msg.dir().map(PathBuf::from);
|
||||||
|
let prefix = msg.prefix().map(String::from);
|
||||||
|
let suffix = msg.suffix().map(String::from);
|
||||||
|
|
||||||
|
blocking!(base.sync(), || -> OpResult {
|
||||||
// TODO(piscisaureus): use byte vector for paths, not a string.
|
// TODO(piscisaureus): use byte vector for paths, not a string.
|
||||||
// See https://github.com/denoland/isolate/issues/627.
|
// See https://github.com/denoland/isolate/issues/627.
|
||||||
// We can't assume that paths are always valid utf8 strings.
|
// We can't assume that paths are always valid utf8 strings.
|
||||||
let path = deno_fs::make_temp_dir(dir.map(Path::new), prefix, suffix)?;
|
let path = deno_fs::make_temp_dir(
|
||||||
|
// Converting Option<String> to Option<&str>
|
||||||
|
dir.as_ref().map(|x| &**x),
|
||||||
|
prefix.as_ref().map(|x| &**x),
|
||||||
|
suffix.as_ref().map(|x| &**x),
|
||||||
|
)?;
|
||||||
let builder = &mut FlatBufferBuilder::new();
|
let builder = &mut FlatBufferBuilder::new();
|
||||||
let path_off = builder.create_string(path.to_str().unwrap());
|
let path_off = builder.create_string(path.to_str().unwrap());
|
||||||
let msg = msg::MakeTempDirRes::create(
|
let msg = msg::MakeTempDirRes::create(
|
||||||
|
@ -437,59 +477,56 @@ fn handle_make_temp_dir(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> {
|
||||||
..Default::default()
|
..Default::default()
|
||||||
},
|
},
|
||||||
))
|
))
|
||||||
}()))
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
fn handle_mkdir(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> {
|
fn handle_mkdir(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> {
|
||||||
let msg = base.msg_as_mkdir().unwrap();
|
let msg = base.msg_as_mkdir().unwrap();
|
||||||
let mode = msg.mode();
|
let mode = msg.mode();
|
||||||
let path = msg.path().unwrap();
|
let path = String::from(msg.path().unwrap());
|
||||||
|
|
||||||
if !state.flags.allow_write {
|
if !state.flags.allow_write {
|
||||||
return odd_future(permission_denied());
|
return odd_future(permission_denied());
|
||||||
}
|
}
|
||||||
// TODO Use tokio_threadpool.
|
|
||||||
Box::new(futures::future::result(|| -> OpResult {
|
blocking!(base.sync(), || {
|
||||||
debug!("handle_mkdir {}", path);
|
debug!("handle_mkdir {}", path);
|
||||||
deno_fs::mkdir(Path::new(path), mode)?;
|
deno_fs::mkdir(Path::new(&path), mode)?;
|
||||||
Ok(empty_buf())
|
Ok(empty_buf())
|
||||||
}()))
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
fn handle_remove(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> {
|
fn handle_remove(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> {
|
||||||
let msg = base.msg_as_remove().unwrap();
|
let msg = base.msg_as_remove().unwrap();
|
||||||
let path = msg.path().unwrap();
|
let path = PathBuf::from(msg.path().unwrap());
|
||||||
let recursive = msg.recursive();
|
let recursive = msg.recursive();
|
||||||
|
|
||||||
if !state.flags.allow_write {
|
if !state.flags.allow_write {
|
||||||
return odd_future(permission_denied());
|
return odd_future(permission_denied());
|
||||||
}
|
}
|
||||||
// TODO Use tokio_threadpool.
|
blocking!(base.sync(), || {
|
||||||
Box::new(futures::future::result(|| -> OpResult {
|
debug!("handle_remove {}", path.display());
|
||||||
debug!("handle_remove {}", path);
|
let metadata = fs::metadata(&path)?;
|
||||||
let path_ = Path::new(&path);
|
|
||||||
let metadata = fs::metadata(&path_)?;
|
|
||||||
if metadata.is_file() {
|
if metadata.is_file() {
|
||||||
fs::remove_file(&path_)?;
|
fs::remove_file(&path)?;
|
||||||
} else {
|
} else {
|
||||||
if recursive {
|
if recursive {
|
||||||
remove_dir_all(&path_)?;
|
remove_dir_all(&path)?;
|
||||||
} else {
|
} else {
|
||||||
fs::remove_dir(&path_)?;
|
fs::remove_dir(&path)?;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Ok(empty_buf())
|
Ok(empty_buf())
|
||||||
}()))
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// Prototype https://github.com/denoland/isolate/blob/golang/os.go#L171-L184
|
// Prototype https://github.com/denoland/isolate/blob/golang/os.go#L171-L184
|
||||||
fn handle_read_file(_config: Arc<IsolateState>, base: &msg::Base) -> Box<Op> {
|
fn handle_read_file(_config: Arc<IsolateState>, base: &msg::Base) -> Box<Op> {
|
||||||
let msg = base.msg_as_read_file().unwrap();
|
let msg = base.msg_as_read_file().unwrap();
|
||||||
let cmd_id = base.cmd_id();
|
let cmd_id = base.cmd_id();
|
||||||
let filename = String::from(msg.filename().unwrap());
|
let filename = PathBuf::from(msg.filename().unwrap());
|
||||||
Box::new(futures::future::result(|| -> OpResult {
|
debug!("handle_read_file {}", filename.display());
|
||||||
debug!("handle_read_file {}", filename);
|
blocking!(base.sync(), || {
|
||||||
let vec = fs::read(Path::new(&filename))?;
|
let vec = fs::read(&filename)?;
|
||||||
// Build the response message. memcpy data into msg.
|
// Build the response message. memcpy data into msg.
|
||||||
// TODO(ry) zero-copy.
|
// TODO(ry) zero-copy.
|
||||||
let builder = &mut FlatBufferBuilder::new();
|
let builder = &mut FlatBufferBuilder::new();
|
||||||
|
@ -510,7 +547,7 @@ fn handle_read_file(_config: Arc<IsolateState>, base: &msg::Base) -> Box<Op> {
|
||||||
..Default::default()
|
..Default::default()
|
||||||
},
|
},
|
||||||
))
|
))
|
||||||
}()))
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
macro_rules! to_seconds {
|
macro_rules! to_seconds {
|
||||||
|
@ -536,17 +573,16 @@ fn get_mode(_perm: fs::Permissions) -> u32 {
|
||||||
fn handle_stat(_config: Arc<IsolateState>, base: &msg::Base) -> Box<Op> {
|
fn handle_stat(_config: Arc<IsolateState>, base: &msg::Base) -> Box<Op> {
|
||||||
let msg = base.msg_as_stat().unwrap();
|
let msg = base.msg_as_stat().unwrap();
|
||||||
let cmd_id = base.cmd_id();
|
let cmd_id = base.cmd_id();
|
||||||
let filename = String::from(msg.filename().unwrap());
|
let filename = PathBuf::from(msg.filename().unwrap());
|
||||||
let lstat = msg.lstat();
|
let lstat = msg.lstat();
|
||||||
|
|
||||||
Box::new(futures::future::result(|| -> OpResult {
|
blocking!(base.sync(), || {
|
||||||
let builder = &mut FlatBufferBuilder::new();
|
let builder = &mut FlatBufferBuilder::new();
|
||||||
debug!("handle_stat {} {}", filename, lstat);
|
debug!("handle_stat {} {}", filename.display(), lstat);
|
||||||
let path = Path::new(&filename);
|
|
||||||
let metadata = if lstat {
|
let metadata = if lstat {
|
||||||
fs::symlink_metadata(path)?
|
fs::symlink_metadata(&filename)?
|
||||||
} else {
|
} else {
|
||||||
fs::metadata(path)?
|
fs::metadata(&filename)?
|
||||||
};
|
};
|
||||||
|
|
||||||
let msg = msg::StatRes::create(
|
let msg = msg::StatRes::create(
|
||||||
|
@ -573,24 +609,25 @@ fn handle_stat(_config: Arc<IsolateState>, base: &msg::Base) -> Box<Op> {
|
||||||
..Default::default()
|
..Default::default()
|
||||||
},
|
},
|
||||||
))
|
))
|
||||||
}()))
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
fn handle_write_file(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> {
|
fn handle_write_file(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> {
|
||||||
let msg = base.msg_as_write_file().unwrap();
|
let msg = base.msg_as_write_file().unwrap();
|
||||||
let filename = String::from(msg.filename().unwrap());
|
|
||||||
let data = msg.data().unwrap();
|
|
||||||
let perm = msg.perm();
|
|
||||||
|
|
||||||
if !state.flags.allow_write {
|
if !state.flags.allow_write {
|
||||||
return odd_future(permission_denied());
|
return odd_future(permission_denied());
|
||||||
}
|
}
|
||||||
|
|
||||||
Box::new(futures::future::result(|| -> OpResult {
|
let filename = String::from(msg.filename().unwrap());
|
||||||
|
let data = Vec::from(msg.data().unwrap());
|
||||||
|
let perm = msg.perm();
|
||||||
|
|
||||||
|
blocking!(base.sync(), || -> OpResult {
|
||||||
debug!("handle_write_file {}", filename);
|
debug!("handle_write_file {}", filename);
|
||||||
deno_fs::write_file(Path::new(&filename), data, perm)?;
|
deno_fs::write_file(Path::new(&filename), data.as_slice(), perm)?;
|
||||||
Ok(empty_buf())
|
Ok(empty_buf())
|
||||||
}()))
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
fn remove_timer(state: Arc<IsolateState>, timer_id: u32) {
|
fn remove_timer(state: Arc<IsolateState>, timer_id: u32) {
|
||||||
|
@ -654,13 +691,13 @@ fn handle_rename(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> {
|
||||||
return odd_future(permission_denied());
|
return odd_future(permission_denied());
|
||||||
}
|
}
|
||||||
let msg = base.msg_as_rename().unwrap();
|
let msg = base.msg_as_rename().unwrap();
|
||||||
let oldpath = String::from(msg.oldpath().unwrap());
|
let oldpath = PathBuf::from(msg.oldpath().unwrap());
|
||||||
let newpath = String::from(msg.newpath().unwrap());
|
let newpath = PathBuf::from(msg.newpath().unwrap());
|
||||||
Box::new(futures::future::result(|| -> OpResult {
|
blocking!(base.sync(), || -> OpResult {
|
||||||
debug!("handle_rename {} {}", oldpath, newpath);
|
debug!("handle_rename {} {}", oldpath.display(), newpath.display());
|
||||||
fs::rename(Path::new(&oldpath), Path::new(&newpath))?;
|
fs::rename(&oldpath, &newpath)?;
|
||||||
Ok(empty_buf())
|
Ok(empty_buf())
|
||||||
}()))
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
fn handle_symlink(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> {
|
fn handle_symlink(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> {
|
||||||
|
@ -670,26 +707,27 @@ fn handle_symlink(state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> {
|
||||||
// TODO Use type for Windows.
|
// TODO Use type for Windows.
|
||||||
if cfg!(windows) {
|
if cfg!(windows) {
|
||||||
return odd_future(not_implemented());
|
return odd_future(not_implemented());
|
||||||
} else {
|
|
||||||
let msg = base.msg_as_symlink().unwrap();
|
|
||||||
let oldname = String::from(msg.oldname().unwrap());
|
|
||||||
let newname = String::from(msg.newname().unwrap());
|
|
||||||
Box::new(futures::future::result(|| -> OpResult {
|
|
||||||
debug!("handle_symlink {} {}", oldname, newname);
|
|
||||||
#[cfg(any(unix))]
|
|
||||||
std::os::unix::fs::symlink(Path::new(&oldname), Path::new(&newname))?;
|
|
||||||
Ok(empty_buf())
|
|
||||||
}()))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let msg = base.msg_as_symlink().unwrap();
|
||||||
|
let oldname = PathBuf::from(msg.oldname().unwrap());
|
||||||
|
let newname = PathBuf::from(msg.newname().unwrap());
|
||||||
|
blocking!(base.sync(), || -> OpResult {
|
||||||
|
debug!("handle_symlink {} {}", oldname.display(), newname.display());
|
||||||
|
#[cfg(any(unix))]
|
||||||
|
std::os::unix::fs::symlink(&oldname, &newname)?;
|
||||||
|
Ok(empty_buf())
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
fn handle_read_link(_state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> {
|
fn handle_read_link(_state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> {
|
||||||
let msg = base.msg_as_readlink().unwrap();
|
let msg = base.msg_as_readlink().unwrap();
|
||||||
let cmd_id = base.cmd_id();
|
let cmd_id = base.cmd_id();
|
||||||
let name = String::from(msg.name().unwrap());
|
let name = PathBuf::from(msg.name().unwrap());
|
||||||
Box::new(futures::future::result(|| -> OpResult {
|
|
||||||
debug!("handle_read_link {}", name);
|
blocking!(base.sync(), || -> OpResult {
|
||||||
let path = fs::read_link(Path::new(&name))?;
|
debug!("handle_read_link {}", name.display());
|
||||||
|
let path = fs::read_link(&name)?;
|
||||||
let builder = &mut FlatBufferBuilder::new();
|
let builder = &mut FlatBufferBuilder::new();
|
||||||
let path_off = builder.create_string(path.to_str().unwrap());
|
let path_off = builder.create_string(path.to_str().unwrap());
|
||||||
let msg = msg::ReadlinkRes::create(
|
let msg = msg::ReadlinkRes::create(
|
||||||
|
@ -708,5 +746,5 @@ fn handle_read_link(_state: Arc<IsolateState>, base: &msg::Base) -> Box<Op> {
|
||||||
..Default::default()
|
..Default::default()
|
||||||
},
|
},
|
||||||
))
|
))
|
||||||
}()))
|
})
|
||||||
}
|
}
|
||||||
|
|
|
@ -8,6 +8,7 @@ extern crate rand;
|
||||||
extern crate tempfile;
|
extern crate tempfile;
|
||||||
extern crate tokio;
|
extern crate tokio;
|
||||||
extern crate tokio_executor;
|
extern crate tokio_executor;
|
||||||
|
extern crate tokio_threadpool;
|
||||||
extern crate url;
|
extern crate url;
|
||||||
#[macro_use]
|
#[macro_use]
|
||||||
extern crate lazy_static;
|
extern crate lazy_static;
|
||||||
|
|
Loading…
Reference in a new issue