1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2024-11-21 15:04:11 -05:00

Replace blocking! macro by generic function (#1305)

This commit is contained in:
F001 2018-12-11 21:36:34 +08:00 committed by Ryan Dahl
parent 9a960b9f58
commit c1de50b0ca
3 changed files with 58 additions and 35 deletions

View file

@ -13,9 +13,9 @@ use std::os::unix::fs::DirBuilderExt;
#[cfg(any(unix))]
use std::os::unix::fs::PermissionsExt;
pub fn write_file(
pub fn write_file<T: AsRef<[u8]>>(
filename: &Path,
data: &[u8],
data: T,
perm: u32,
) -> std::io::Result<()> {
let is_append = perm & (1 << 31) != 0;
@ -28,7 +28,7 @@ pub fn write_file(
.open(filename)?;
set_permissions(&mut file, perm)?;
file.write_all(data)
file.write_all(data.as_ref())
}
#[cfg(any(unix))]

View file

@ -16,7 +16,6 @@ use version;
use flatbuffers::FlatBufferBuilder;
use futures;
use futures::future::poll_fn;
use futures::Poll;
use hyper;
use hyper::rt::Future;
@ -42,6 +41,7 @@ use tokio::net::TcpListener;
use tokio::net::TcpStream;
use tokio_process::CommandExt;
use tokio_threadpool;
use tokio_util;
type OpResult = DenoResult<Buf>;
@ -236,11 +236,13 @@ fn serialize_response(
data.into()
}
#[inline]
fn ok_future(buf: Buf) -> Box<Op> {
Box::new(futures::future::ok(buf))
}
// Shout out to Earl Sweatshirt.
#[inline]
fn odd_future(err: DenoError) -> Box<Op> {
Box::new(futures::future::err(err))
}
@ -460,21 +462,15 @@ where
}
}
// TODO Do not use macro for the blocking function.. We should instead be able
// to do this with a normal function, but there seems to some type system
// issues. The type of this function should be something like this:
// fn blocking<F>(is_sync: bool, f: F) -> Box<Op>
// where F: FnOnce() -> DenoResult<Buf>
macro_rules! blocking {
($is_sync:expr, $fn:expr) => {
if $is_sync {
// If synchronous, execute the function immediately on the main thread.
Box::new(futures::future::result($fn()))
} else {
// Otherwise dispatch to thread pool.
Box::new(poll_fn(move || convert_blocking($fn)))
}
};
fn blocking<F>(is_sync: bool, f: F) -> Box<Op>
where
F: 'static + Send + FnOnce() -> DenoResult<Buf>,
{
if is_sync {
Box::new(futures::future::result(f()))
} else {
Box::new(tokio_util::poll_fn(move || convert_blocking(f)))
}
}
fn op_make_temp_dir(
@ -496,7 +492,7 @@ fn op_make_temp_dir(
let prefix = inner.prefix().map(String::from);
let suffix = inner.suffix().map(String::from);
blocking!(base.sync(), || -> OpResult {
blocking(base.sync(), move || -> OpResult {
// TODO(piscisaureus): use byte vector for paths, not a string.
// See https://github.com/denoland/deno/issues/627.
// We can't assume that paths are always valid utf8 strings.
@ -539,7 +535,7 @@ fn op_mkdir(
if let Err(e) = state.check_write(&path) {
return odd_future(e);
}
blocking!(base.sync(), || {
blocking(base.sync(), move || {
debug!("op_mkdir {}", path);
deno_fs::mkdir(Path::new(&path), mode)?;
Ok(empty_buf())
@ -560,7 +556,7 @@ fn op_chmod(
return odd_future(e);
}
blocking!(base.sync(), || {
blocking(base.sync(), move || {
debug!("op_chmod {}", &path);
let path = PathBuf::from(&path);
// Still check file/dir exists on windows
@ -644,7 +640,7 @@ fn op_shutdown(
1 => Shutdown::Write,
_ => unimplemented!(),
};
blocking!(base.sync(), || {
blocking(base.sync(), move || {
// Use UFCS for disambiguation
Resource::shutdown(&mut resource, shutdown_mode)?;
Ok(empty_buf())
@ -743,7 +739,7 @@ fn op_remove(
return odd_future(e);
}
blocking!(base.sync(), || {
blocking(base.sync(), move || {
debug!("op_remove {}", path.display());
let metadata = fs::metadata(&path)?;
if metadata.is_file() {
@ -768,7 +764,7 @@ fn op_read_file(
let cmd_id = base.cmd_id();
let filename = PathBuf::from(inner.filename().unwrap());
debug!("op_read_file {}", filename.display());
blocking!(base.sync(), || {
blocking(base.sync(), move || {
let vec = fs::read(&filename)?;
// Build the response message. memcpy data into inner.
// TODO(ry) zero-copy.
@ -808,7 +804,7 @@ fn op_copy_file(
}
debug!("op_copy_file {} {}", from.display(), to.display());
blocking!(base.sync(), || {
blocking(base.sync(), move || {
// On *nix, Rust deem non-existent path as invalid input
// See https://github.com/rust-lang/rust/issues/54800
// Once the issue is reolved, we should remove this workaround.
@ -881,7 +877,7 @@ fn op_stat(
let filename = PathBuf::from(inner.filename().unwrap());
let lstat = inner.lstat();
blocking!(base.sync(), || {
blocking(base.sync(), move || {
let builder = &mut FlatBufferBuilder::new();
debug!("op_stat {} {}", filename.display(), lstat);
let metadata = if lstat {
@ -927,7 +923,7 @@ fn op_read_dir(
let cmd_id = base.cmd_id();
let path = String::from(inner.path().unwrap());
blocking!(base.sync(), || -> OpResult {
blocking(base.sync(), move || -> OpResult {
debug!("op_read_dir {}", path);
let builder = &mut FlatBufferBuilder::new();
let entries: Vec<_> = fs::read_dir(Path::new(&path))?
@ -986,9 +982,9 @@ fn op_write_file(
return odd_future(e);
}
blocking!(base.sync(), || -> OpResult {
blocking(base.sync(), move || -> OpResult {
debug!("op_write_file {} {}", filename, data.len());
deno_fs::write_file(Path::new(&filename), &data, perm)?;
deno_fs::write_file(Path::new(&filename), data, perm)?;
Ok(empty_buf())
})
}
@ -1006,7 +1002,7 @@ fn op_rename(
if let Err(e) = state.check_write(&newpath_) {
return odd_future(e);
}
blocking!(base.sync(), || -> OpResult {
blocking(base.sync(), move || -> OpResult {
debug!("op_rename {} {}", oldpath.display(), newpath.display());
fs::rename(&oldpath, &newpath)?;
Ok(empty_buf())
@ -1034,7 +1030,7 @@ fn op_symlink(
"Not implemented".to_string(),
));
}
blocking!(base.sync(), || -> OpResult {
blocking(base.sync(), move || -> OpResult {
debug!("op_symlink {} {}", oldname.display(), newname.display());
#[cfg(any(unix))]
std::os::unix::fs::symlink(&oldname, &newname)?;
@ -1052,7 +1048,7 @@ fn op_read_link(
let cmd_id = base.cmd_id();
let name = PathBuf::from(inner.name().unwrap());
blocking!(base.sync(), || -> OpResult {
blocking(base.sync(), move || -> OpResult {
debug!("op_read_link {}", name.display());
let path = fs::read_link(&name)?;
let builder = &mut FlatBufferBuilder::new();
@ -1121,7 +1117,7 @@ fn op_repl_readline(
// Ignore this clippy warning until this issue is addressed:
// https://github.com/rust-lang-nursery/rust-clippy/issues/1684
#[cfg_attr(feature = "cargo-clippy", allow(redundant_closure_call))]
blocking!(base.sync(), || -> OpResult {
blocking(base.sync(), move || -> OpResult {
let line = resources::readline(rid, &prompt)?;
let builder = &mut FlatBufferBuilder::new();
@ -1159,7 +1155,7 @@ fn op_truncate(
return odd_future(e);
}
blocking!(base.sync(), || {
blocking(base.sync(), move || {
debug!("op_truncate {} {}", filename, len);
let f = fs::OpenOptions::new().write(true).open(&filename)?;
f.set_len(u64::from(len))?;

View file

@ -73,3 +73,30 @@ impl Future for Accept {
}
}
}
/// `futures::future::poll_fn` only support `F: FnMut()->Poll<T, E>`
/// However, we require that `F: FnOnce()->Poll<T, E>`.
/// Therefore, we created our version of `poll_fn`.
pub fn poll_fn<T, E, F>(f: F) -> PollFn<F>
where
F: FnOnce() -> Poll<T, E>,
{
PollFn { inner: Some(f) }
}
pub struct PollFn<F> {
inner: Option<F>,
}
impl<T, E, F> Future for PollFn<F>
where
F: FnOnce() -> Poll<T, E>,
{
type Item = T;
type Error = E;
fn poll(&mut self) -> Poll<T, E> {
let f = self.inner.take().expect("Inner fn has been taken.");
f()
}
}