diff --git a/cli/dispatch_minimal.rs b/cli/dispatch_minimal.rs index 0e20d3b1ae..e00203576e 100644 --- a/cli/dispatch_minimal.rs +++ b/cli/dispatch_minimal.rs @@ -6,6 +6,7 @@ //! message or a "minimal" message. use crate::state::ThreadSafeState; use deno::Buf; +use deno::CoreOp; use deno::Op; use deno::PinnedBuf; use futures::Future; @@ -89,7 +90,7 @@ pub fn dispatch_minimal( state: &ThreadSafeState, mut record: Record, zero_copy: Option, -) -> Op { +) -> CoreOp { let is_sync = record.promise_id == 0; let min_op = match record.op_id { OP_READ => ops::read(record.arg, zero_copy), diff --git a/cli/errors.rs b/cli/errors.rs index eb0fc7d276..67eb54ea79 100644 --- a/cli/errors.rs +++ b/cli/errors.rs @@ -243,6 +243,20 @@ pub fn no_buffer_specified() -> DenoError { new(ErrorKind::InvalidInput, String::from("no buffer specified")) } +pub fn no_async_support() -> DenoError { + new( + ErrorKind::NoAsyncSupport, + String::from("op doesn't support async calls"), + ) +} + +pub fn no_sync_support() -> DenoError { + new( + ErrorKind::NoSyncSupport, + String::from("op doesn't support sync calls"), + ) +} + #[derive(Debug)] pub enum RustOrJsError { Rust(DenoError), diff --git a/cli/msg.fbs b/cli/msg.fbs index 56410097c3..7e8292740c 100644 --- a/cli/msg.fbs +++ b/cli/msg.fbs @@ -136,6 +136,8 @@ enum ErrorKind: byte { OpNotAvaiable, WorkerInitFailed, UnixError, + NoAsyncSupport, + NoSyncSupport, ImportMapError, } diff --git a/cli/ops.rs b/cli/ops.rs index e8fa47aada..2155dcd5a5 100644 --- a/cli/ops.rs +++ b/cli/ops.rs @@ -27,13 +27,14 @@ use crate::version; use crate::worker::Worker; use deno::js_check; use deno::Buf; +use deno::CoreOp; use deno::JSError; use deno::ModuleSpecifier; use deno::Op; +use deno::OpResult; use deno::PinnedBuf; use flatbuffers::FlatBufferBuilder; use futures; -use futures::future; use futures::Async; use futures::Poll; use futures::Sink; @@ -61,17 +62,13 @@ use std::os::unix::fs::PermissionsExt; #[cfg(unix)] use std::os::unix::process::ExitStatusExt; -type OpResult = DenoResult; +type CliOpResult = OpResult; -pub type OpWithError = dyn Future + Send; - -// TODO Ideally we wouldn't have to box the OpWithError being returned. -// The box is just to make it easier to get a prototype refactor working. -type OpCreator = +type CliDispatchFn = fn(state: &ThreadSafeState, base: &msg::Base<'_>, data: Option) - -> Box; + -> CliOpResult; -pub type OpSelector = fn(inner_type: msg::Any) -> Option; +pub type OpSelector = fn(inner_type: msg::Any) -> Option; #[inline] fn empty_buf() -> Buf { @@ -83,7 +80,7 @@ pub fn dispatch_all( control: &[u8], zero_copy: Option, op_selector: OpSelector, -) -> Op { +) -> CoreOp { let bytes_sent_control = control.len(); let bytes_sent_zero_copy = zero_copy.as_ref().map(|b| b.len()).unwrap_or(0); let op = if let Some(min_record) = parse_min_record(control) { @@ -104,29 +101,78 @@ pub fn dispatch_all_legacy( control: &[u8], zero_copy: Option, op_selector: OpSelector, -) -> Op { +) -> CoreOp { let base = msg::get_root_as_base(&control); - let is_sync = base.sync(); let inner_type = base.inner_type(); + let is_sync = base.sync(); let cmd_id = base.cmd_id(); - let op_func: OpCreator = match op_selector(inner_type) { + debug!( + "msg_from_js {} sync {}", + msg::enum_name_any(inner_type), + is_sync + ); + + let op_func: CliDispatchFn = match op_selector(inner_type) { Some(v) => v, None => panic!("Unhandled message {}", msg::enum_name_any(inner_type)), }; - let op: Box = op_func(state, &base, zero_copy); + let op_result = op_func(state, &base, zero_copy); let state = state.clone(); - let fut = Box::new( - op.or_else(move |err: DenoError| -> Result { + match op_result { + Ok(Op::Sync(buf)) => { + state.metrics_op_completed(buf.len()); + Op::Sync(buf) + } + Ok(Op::Async(fut)) => { + let result_fut = Box::new( + fut.or_else(move |err: DenoError| -> Result { + debug!("op err {}", err); + // No matter whether we got an Err or Ok, we want a serialized message to + // send back. So transform the DenoError into a Buf. + let builder = &mut FlatBufferBuilder::new(); + let errmsg_offset = builder.create_string(&format!("{}", err)); + Ok(serialize_response( + cmd_id, + builder, + msg::BaseArgs { + error: Some(errmsg_offset), + error_kind: err.kind(), + ..Default::default() + }, + )) + }).and_then(move |buf: Buf| -> Result { + // Handle empty responses. For sync responses we just want + // to send null. For async we want to send a small message + // with the cmd_id. + let buf = if buf.len() > 0 { + buf + } else { + let builder = &mut FlatBufferBuilder::new(); + serialize_response( + cmd_id, + builder, + msg::BaseArgs { + ..Default::default() + }, + ) + }; + state.metrics_op_completed(buf.len()); + Ok(buf) + }).map_err(|err| panic!("unexpected error {:?}", err)), + ); + Op::Async(result_fut) + } + Err(err) => { debug!("op err {}", err); // No matter whether we got an Err or Ok, we want a serialized message to // send back. So transform the DenoError into a Buf. let builder = &mut FlatBufferBuilder::new(); let errmsg_offset = builder.create_string(&format!("{}", err)); - Ok(serialize_response( + let response_buf = serialize_response( cmd_id, builder, msg::BaseArgs { @@ -134,51 +180,15 @@ pub fn dispatch_all_legacy( error_kind: err.kind(), ..Default::default() }, - )) - }).and_then(move |buf: Buf| -> Result { - // Handle empty responses. For sync responses we just want - // to send null. For async we want to send a small message - // with the cmd_id. - let buf = if is_sync || buf.len() > 0 { - buf - } else { - let builder = &mut FlatBufferBuilder::new(); - serialize_response( - cmd_id, - builder, - msg::BaseArgs { - ..Default::default() - }, - ) - }; - state.metrics_op_completed(buf.len()); - Ok(buf) - }).map_err(|err| panic!("unexpected error {:?}", err)), - ); - - debug!( - "msg_from_js {} sync {}", - msg::enum_name_any(inner_type), - base.sync() - ); - - if base.sync() { - // TODO(ry) This is not correct! If the sync op is not actually synchronous - // (like in the case of op_fetch_module_meta_data) this wait() will block - // a thread in the Tokio runtime. Depending on the size of the runtime's - // thread pool, this may result in a dead lock! - // - // The solution is that ops should return an Op directly. Op::Sync contains - // the result value, so if its returned directly from the OpCreator, we - // know it has actually be evaluated synchronously. - Op::Sync(fut.wait().unwrap()) - } else { - Op::Async(fut) + ); + state.metrics_op_completed(response_buf.len()); + Op::Sync(response_buf) + } } } /// Standard ops set for most isolates -pub fn op_selector_std(inner_type: msg::Any) -> Option { +pub fn op_selector_std(inner_type: msg::Any) -> Option { match inner_type { msg::Any::Accept => Some(op_accept), msg::Any::Cache => Some(op_cache), @@ -249,7 +259,7 @@ fn op_now( state: &ThreadSafeState, base: &msg::Base<'_>, data: Option, -) -> Box { +) -> CliOpResult { assert!(data.is_none()); let seconds = state.start_time.elapsed().as_secs(); let mut subsec_nanos = state.start_time.elapsed().subsec_nanos(); @@ -270,7 +280,8 @@ fn op_now( subsec_nanos, }, ); - ok_future(serialize_response( + + ok_buf(serialize_response( base.cmd_id(), builder, msg::BaseArgs { @@ -285,7 +296,7 @@ fn op_is_tty( _state: &ThreadSafeState, base: &msg::Base<'_>, _data: Option, -) -> Box { +) -> CliOpResult { let builder = &mut FlatBufferBuilder::new(); let inner = msg::IsTTYRes::create( builder, @@ -295,7 +306,7 @@ fn op_is_tty( stderr: atty::is(atty::Stream::Stderr), }, ); - ok_future(serialize_response( + ok_buf(serialize_response( base.cmd_id(), builder, msg::BaseArgs { @@ -310,7 +321,7 @@ fn op_exit( _state: &ThreadSafeState, base: &msg::Base<'_>, _data: Option, -) -> Box { +) -> CliOpResult { let inner = base.inner_as_exit().unwrap(); std::process::exit(inner.code()) } @@ -319,7 +330,7 @@ fn op_start( state: &ThreadSafeState, base: &msg::Base<'_>, data: Option, -) -> Box { +) -> CliOpResult { assert!(data.is_none()); let mut builder = FlatBufferBuilder::new(); @@ -368,7 +379,7 @@ fn op_start( }, ); - ok_future(serialize_response( + ok_buf(serialize_response( base.cmd_id(), &mut builder, msg::BaseArgs { @@ -383,7 +394,7 @@ fn op_format_error( state: &ThreadSafeState, base: &msg::Base<'_>, data: Option, -) -> Box { +) -> CliOpResult { assert!(data.is_none()); let inner = base.inner_as_format_error().unwrap(); let orig_error = String::from(inner.error().unwrap()); @@ -402,7 +413,7 @@ fn op_format_error( }, ); - ok_future(serialize_response( + let response_buf = serialize_response( base.cmd_id(), &mut builder, msg::BaseArgs { @@ -410,7 +421,9 @@ fn op_format_error( inner: Some(inner.as_union_value()), ..Default::default() }, - )) + ); + + ok_buf(response_buf) } fn serialize_response( @@ -427,21 +440,20 @@ fn serialize_response( } #[inline] -pub fn ok_future(buf: Buf) -> Box { - Box::new(futures::future::ok(buf)) +pub fn ok_future(buf: Buf) -> CliOpResult { + Ok(Op::Async(Box::new(futures::future::ok(buf)))) } -// Shout out to Earl Sweatshirt. #[inline] -pub fn odd_future(err: DenoError) -> Box { - Box::new(futures::future::err(err)) +pub fn ok_buf(buf: Buf) -> CliOpResult { + Ok(Op::Sync(buf)) } fn op_cache( state: &ThreadSafeState, base: &msg::Base<'_>, data: Option, -) -> Box { +) -> CliOpResult { assert!(data.is_none()); let inner = base.inner_as_cache().unwrap(); let extension = inner.extension().unwrap(); @@ -455,11 +467,9 @@ fn op_cache( // cache path. In the future, checksums will not be used in the cache // filenames and this requirement can be removed. See // https://github.com/denoland/deno/issues/2057 - let r = state.dir.fetch_module_meta_data(module_id, ".", true, true); - if let Err(err) = r { - return odd_future(err); - } - let module_meta_data = r.unwrap(); + let module_meta_data = state + .dir + .fetch_module_meta_data(module_id, ".", true, true)?; let (js_cache_path, source_map_path) = state .dir @@ -467,21 +477,15 @@ fn op_cache( if extension == ".map" { debug!("cache {:?}", source_map_path); - let r = fs::write(source_map_path, contents); - if let Err(err) = r { - return odd_future(err.into()); - } + fs::write(source_map_path, contents).map_err(DenoError::from)?; } else if extension == ".js" { debug!("cache {:?}", js_cache_path); - let r = fs::write(js_cache_path, contents); - if let Err(err) = r { - return odd_future(err.into()); - } + fs::write(js_cache_path, contents).map_err(DenoError::from)?; } else { unreachable!(); } - ok_future(empty_buf()) + ok_buf(empty_buf()) } // https://github.com/denoland/deno/blob/golang/os.go#L100-L154 @@ -489,7 +493,10 @@ fn op_fetch_module_meta_data( state: &ThreadSafeState, base: &msg::Base<'_>, data: Option, -) -> Box { +) -> CliOpResult { + if !base.sync() { + return Err(errors::no_async_support()); + } assert!(data.is_none()); let inner = base.inner_as_fetch_module_meta_data().unwrap(); let cmd_id = base.cmd_id(); @@ -510,7 +517,7 @@ fn op_fetch_module_meta_data( Some(module_specifier) => module_specifier.to_string(), None => specifier.to_string(), }, - Err(err) => return odd_future(DenoError::from(err)), + Err(err) => return Err(DenoError::from(err)), }, None => specifier.to_string(), }; @@ -543,50 +550,48 @@ fn op_fetch_module_meta_data( )) }); - // Unfortunately TypeScript's CompilerHost interface does not leave room for - // asynchronous source code fetching. This complicates things greatly and - // requires us to use tokio_util::block_on() below. - assert!(base.sync()); - // WARNING: Here we use tokio_util::block_on() which starts a new Tokio // runtime for executing the future. This is so we don't inadvernently run // out of threads in the main runtime. - Box::new(futures::future::result(tokio_util::block_on(fut))) + let result_buf = tokio_util::block_on(fut)?; + Ok(Op::Sync(result_buf)) } fn op_chdir( _state: &ThreadSafeState, base: &msg::Base<'_>, data: Option, -) -> Box { +) -> CliOpResult { assert!(data.is_none()); let inner = base.inner_as_chdir().unwrap(); let directory = inner.directory().unwrap(); - Box::new(futures::future::result(|| -> OpResult { - std::env::set_current_dir(&directory)?; - Ok(empty_buf()) - }())) + std::env::set_current_dir(&directory)?; + ok_buf(empty_buf()) } fn op_global_timer_stop( state: &ThreadSafeState, base: &msg::Base<'_>, data: Option, -) -> Box { - assert!(base.sync()); +) -> CliOpResult { + if !base.sync() { + return Err(errors::no_async_support()); + } assert!(data.is_none()); let state = state; let mut t = state.global_timer.lock().unwrap(); t.cancel(); - ok_future(empty_buf()) + Ok(Op::Sync(empty_buf())) } fn op_global_timer( state: &ThreadSafeState, base: &msg::Base<'_>, data: Option, -) -> Box { - assert!(!base.sync()); +) -> CliOpResult { + if base.sync() { + return Err(errors::no_sync_support()); + } assert!(data.is_none()); let cmd_id = base.cmd_id(); let inner = base.inner_as_global_timer().unwrap(); @@ -598,7 +603,7 @@ fn op_global_timer( let deadline = Instant::now() + Duration::from_millis(val as u64); let f = t.new_timeout(deadline); - Box::new(f.then(move |_| { + Ok(Op::Async(Box::new(f.then(move |_| { let builder = &mut FlatBufferBuilder::new(); let inner = msg::GlobalTimerRes::create(builder, &msg::GlobalTimerResArgs {}); @@ -611,36 +616,32 @@ fn op_global_timer( ..Default::default() }, )) - })) + })))) } fn op_set_env( state: &ThreadSafeState, base: &msg::Base<'_>, data: Option, -) -> Box { +) -> CliOpResult { assert!(data.is_none()); let inner = base.inner_as_set_env().unwrap(); let key = inner.key().unwrap(); let value = inner.value().unwrap(); - if let Err(e) = state.check_env() { - return odd_future(e); - } + state.check_env()?; std::env::set_var(key, value); - ok_future(empty_buf()) + ok_buf(empty_buf()) } fn op_env( state: &ThreadSafeState, base: &msg::Base<'_>, data: Option, -) -> Box { +) -> CliOpResult { assert!(data.is_none()); let cmd_id = base.cmd_id(); - if let Err(e) = state.check_env() { - return odd_future(e); - } + state.check_env()?; let builder = &mut FlatBufferBuilder::new(); let vars: Vec<_> = std::env::vars() @@ -651,7 +652,7 @@ fn op_env( builder, &msg::EnvironResArgs { map: Some(tables) }, ); - ok_future(serialize_response( + let response_buf = serialize_response( cmd_id, builder, msg::BaseArgs { @@ -659,14 +660,15 @@ fn op_env( inner_type: msg::Any::EnvironRes, ..Default::default() }, - )) + ); + ok_buf(response_buf) } fn op_permissions( state: &ThreadSafeState, base: &msg::Base<'_>, data: Option, -) -> Box { +) -> CliOpResult { assert!(data.is_none()); let cmd_id = base.cmd_id(); let builder = &mut FlatBufferBuilder::new(); @@ -681,7 +683,7 @@ fn op_permissions( hrtime: state.permissions.allows_hrtime(), }, ); - ok_future(serialize_response( + let response_buf = serialize_response( cmd_id, builder, msg::BaseArgs { @@ -689,18 +691,19 @@ fn op_permissions( inner_type: msg::Any::PermissionsRes, ..Default::default() }, - )) + ); + ok_buf(response_buf) } fn op_revoke_permission( state: &ThreadSafeState, base: &msg::Base<'_>, data: Option, -) -> Box { +) -> CliOpResult { assert!(data.is_none()); let inner = base.inner_as_permission_revoke().unwrap(); let permission = inner.permission().unwrap(); - let result = match permission { + match permission { "run" => state.permissions.revoke_run(), "read" => state.permissions.revoke_read(), "write" => state.permissions.revoke_write(), @@ -708,18 +711,15 @@ fn op_revoke_permission( "env" => state.permissions.revoke_env(), "hrtime" => state.permissions.revoke_hrtime(), _ => Ok(()), - }; - if let Err(e) = result { - return odd_future(e); - } - ok_future(empty_buf()) + }?; + ok_buf(empty_buf()) } fn op_fetch( state: &ThreadSafeState, base: &msg::Base<'_>, data: Option, -) -> Box { +) -> CliOpResult { let inner = base.inner_as_fetch().unwrap(); let cmd_id = base.cmd_id(); @@ -732,19 +732,10 @@ fn op_fetch( Some(buf) => hyper::Body::from(Vec::from(&*buf)), }; - let maybe_req = msg_util::deserialize_request(header, body); - if let Err(e) = maybe_req { - return odd_future(e); - } - let req = maybe_req.unwrap(); + let req = msg_util::deserialize_request(header, body)?; - let url_ = match url::Url::parse(url) { - Err(err) => return odd_future(DenoError::from(err)), - Ok(v) => v, - }; - if let Err(e) = state.check_net_url(url_) { - return odd_future(e); - } + let url_ = url::Url::parse(url).map_err(DenoError::from)?; + state.check_net_url(url_)?; let client = http_util::get_client(); @@ -776,7 +767,12 @@ fn op_fetch( }, )) }); - Box::new(future) + if base.sync() { + let result_buf = future.wait()?; + Ok(Op::Sync(result_buf)) + } else { + Ok(Op::Async(Box::new(future))) + } } // This is just type conversion. Implement From trait? @@ -794,14 +790,17 @@ where } } -fn blocking(is_sync: bool, f: F) -> Box +fn blocking(is_sync: bool, f: F) -> CliOpResult where F: 'static + Send + FnOnce() -> DenoResult, { if is_sync { - Box::new(futures::future::result(f())) + let result_buf = f()?; + Ok(Op::Sync(result_buf)) } else { - Box::new(tokio_util::poll_fn(move || convert_blocking(f))) + Ok(Op::Async(Box::new(tokio_util::poll_fn(move || { + convert_blocking(f) + })))) } } @@ -809,22 +808,20 @@ fn op_make_temp_dir( state: &ThreadSafeState, base: &msg::Base<'_>, data: Option, -) -> Box { +) -> CliOpResult { assert!(data.is_none()); let base = Box::new(*base); let inner = base.inner_as_make_temp_dir().unwrap(); let cmd_id = base.cmd_id(); // FIXME - if let Err(e) = state.check_write("make_temp") { - return odd_future(e); - } + state.check_write("make_temp")?; let dir = inner.dir().map(PathBuf::from); let prefix = inner.prefix().map(String::from); let suffix = inner.suffix().map(String::from); - blocking(base.sync(), move || -> OpResult { + blocking(base.sync(), move || { // TODO(piscisaureus): use byte vector for paths, not a string. // See https://github.com/denoland/deno/issues/627. // We can't assume that paths are always valid utf8 strings. @@ -858,19 +855,14 @@ fn op_mkdir( state: &ThreadSafeState, base: &msg::Base<'_>, data: Option, -) -> Box { +) -> CliOpResult { assert!(data.is_none()); let inner = base.inner_as_mkdir().unwrap(); - let (path, path_) = match resolve_path(inner.path().unwrap()) { - Err(err) => return odd_future(err), - Ok(v) => v, - }; + let (path, path_) = resolve_path(inner.path().unwrap())?; let recursive = inner.recursive(); let mode = inner.mode(); - if let Err(e) = state.check_write(&path_) { - return odd_future(e); - } + state.check_write(&path_)?; blocking(base.sync(), move || { debug!("op_mkdir {}", path_); @@ -883,18 +875,13 @@ fn op_chmod( state: &ThreadSafeState, base: &msg::Base<'_>, data: Option, -) -> Box { +) -> CliOpResult { assert!(data.is_none()); let inner = base.inner_as_chmod().unwrap(); let _mode = inner.mode(); - let (path, path_) = match resolve_path(inner.path().unwrap()) { - Err(err) => return odd_future(err), - Ok(v) => v, - }; + let (path, path_) = resolve_path(inner.path().unwrap())?; - if let Err(e) = state.check_write(&path_) { - return odd_future(e); - } + state.check_write(&path_)?; blocking(base.sync(), move || { debug!("op_chmod {}", &path_); @@ -914,16 +901,14 @@ fn op_chown( state: &ThreadSafeState, base: &msg::Base<'_>, data: Option, -) -> Box { +) -> CliOpResult { assert!(data.is_none()); let inner = base.inner_as_chown().unwrap(); let path = String::from(inner.path().unwrap()); let uid = inner.uid(); let gid = inner.gid(); - if let Err(e) = state.check_write(&path) { - return odd_future(e); - } + state.check_write(&path)?; blocking(base.sync(), move || { debug!("op_chown {}", &path); @@ -938,14 +923,11 @@ fn op_open( state: &ThreadSafeState, base: &msg::Base<'_>, data: Option, -) -> Box { +) -> CliOpResult { assert!(data.is_none()); let cmd_id = base.cmd_id(); let inner = base.inner_as_open().unwrap(); - let (filename, filename_) = match resolve_path(inner.filename().unwrap()) { - Err(err) => return odd_future(err), - Ok(v) => v, - }; + let (filename, filename_) = resolve_path(inner.filename().unwrap())?; let mode = inner.mode().unwrap(); let mut open_options = tokio::fs::OpenOptions::new(); @@ -986,29 +968,21 @@ fn op_open( match mode { "r" => { - if let Err(e) = state.check_read(&filename_) { - return odd_future(e); - } + state.check_read(&filename_)?; } "w" | "a" | "x" => { - if let Err(e) = state.check_write(&filename_) { - return odd_future(e); - } + state.check_write(&filename_)?; } &_ => { - if let Err(e) = state.check_read(&filename_) { - return odd_future(e); - } - if let Err(e) = state.check_write(&filename_) { - return odd_future(e); - } + state.check_read(&filename_)?; + state.check_write(&filename_)?; } } let op = open_options .open(filename) .map_err(DenoError::from) - .and_then(move |fs_file| -> OpResult { + .and_then(move |fs_file| { let resource = resources::add_fs_file(fs_file); let builder = &mut FlatBufferBuilder::new(); let inner = @@ -1023,22 +997,27 @@ fn op_open( }, )) }); - Box::new(op) + if base.sync() { + let buf = op.wait()?; + Ok(Op::Sync(buf)) + } else { + Ok(Op::Async(Box::new(op))) + } } fn op_close( _state: &ThreadSafeState, base: &msg::Base<'_>, data: Option, -) -> Box { +) -> CliOpResult { assert!(data.is_none()); let inner = base.inner_as_close().unwrap(); let rid = inner.rid(); match resources::lookup(rid) { - None => odd_future(errors::bad_resource()), + None => Err(errors::bad_resource()), Some(resource) => { resource.close(); - ok_future(empty_buf()) + ok_buf(empty_buf()) } } } @@ -1047,28 +1026,26 @@ fn op_kill( _state: &ThreadSafeState, base: &msg::Base<'_>, data: Option, -) -> Box { +) -> CliOpResult { assert!(data.is_none()); let inner = base.inner_as_kill().unwrap(); let pid = inner.pid(); let signo = inner.signo(); - match kill(pid, signo) { - Ok(_) => ok_future(empty_buf()), - Err(e) => odd_future(e), - } + kill(pid, signo)?; + ok_buf(empty_buf()) } fn op_shutdown( _state: &ThreadSafeState, base: &msg::Base<'_>, data: Option, -) -> Box { +) -> CliOpResult { assert!(data.is_none()); let inner = base.inner_as_shutdown().unwrap(); let rid = inner.rid(); let how = inner.how(); match resources::lookup(rid) { - None => odd_future(errors::bad_resource()), + None => Err(errors::bad_resource()), Some(mut resource) => { let shutdown_mode = match how { 0 => Shutdown::Read, @@ -1088,13 +1065,13 @@ fn op_read( _state: &ThreadSafeState, base: &msg::Base<'_>, data: Option, -) -> Box { +) -> CliOpResult { let cmd_id = base.cmd_id(); let inner = base.inner_as_read().unwrap(); let rid = inner.rid(); match resources::lookup(rid) { - None => odd_future(errors::bad_resource()), + None => Err(errors::bad_resource()), Some(resource) => { let op = tokio::io::read(resource, data.unwrap()) .map_err(DenoError::from) @@ -1117,7 +1094,12 @@ fn op_read( }, )) }); - Box::new(op) + if base.sync() { + let buf = op.wait()?; + Ok(Op::Sync(buf)) + } else { + Ok(Op::Async(Box::new(op))) + } } } } @@ -1126,13 +1108,13 @@ fn op_write( _state: &ThreadSafeState, base: &msg::Base<'_>, data: Option, -) -> Box { +) -> CliOpResult { let cmd_id = base.cmd_id(); let inner = base.inner_as_write().unwrap(); let rid = inner.rid(); match resources::lookup(rid) { - None => odd_future(errors::bad_resource()), + None => Err(errors::bad_resource()), Some(resource) => { let op = tokio_write::write(resource, data.unwrap()) .map_err(DenoError::from) @@ -1154,7 +1136,12 @@ fn op_write( }, )) }); - Box::new(op) + if base.sync() { + let buf = op.wait()?; + Ok(Op::Sync(buf)) + } else { + Ok(Op::Async(Box::new(op))) + } } } } @@ -1163,20 +1150,24 @@ fn op_seek( _state: &ThreadSafeState, base: &msg::Base<'_>, data: Option, -) -> Box { +) -> CliOpResult { assert!(data.is_none()); - let _cmd_id = base.cmd_id(); let inner = base.inner_as_seek().unwrap(); let rid = inner.rid(); let offset = inner.offset(); let whence = inner.whence(); match resources::lookup(rid) { - None => odd_future(errors::bad_resource()), + None => Err(errors::bad_resource()), Some(resource) => { let op = resources::seek(resource, offset, whence) .and_then(move |_| Ok(empty_buf())); - Box::new(op) + if base.sync() { + let buf = op.wait()?; + Ok(Op::Sync(buf)) + } else { + Ok(Op::Async(Box::new(op))) + } } } } @@ -1185,18 +1176,13 @@ fn op_remove( state: &ThreadSafeState, base: &msg::Base<'_>, data: Option, -) -> Box { +) -> CliOpResult { assert!(data.is_none()); let inner = base.inner_as_remove().unwrap(); - let (path, path_) = match resolve_path(inner.path().unwrap()) { - Err(err) => return odd_future(err), - Ok(v) => v, - }; + let (path, path_) = resolve_path(inner.path().unwrap())?; let recursive = inner.recursive(); - if let Err(e) = state.check_write(&path_) { - return odd_future(e); - } + state.check_write(&path_)?; blocking(base.sync(), move || { debug!("op_remove {}", path.display()); @@ -1216,24 +1202,14 @@ fn op_copy_file( state: &ThreadSafeState, base: &msg::Base<'_>, data: Option, -) -> Box { +) -> CliOpResult { assert!(data.is_none()); let inner = base.inner_as_copy_file().unwrap(); - let (from, from_) = match resolve_path(inner.from().unwrap()) { - Err(err) => return odd_future(err), - Ok(v) => v, - }; - let (to, to_) = match resolve_path(inner.to().unwrap()) { - Err(err) => return odd_future(err), - Ok(v) => v, - }; + let (from, from_) = resolve_path(inner.from().unwrap())?; + let (to, to_) = resolve_path(inner.to().unwrap())?; - if let Err(e) = state.check_read(&from_) { - return odd_future(e); - } - if let Err(e) = state.check_write(&to_) { - return odd_future(e); - } + state.check_read(&from_)?; + state.check_write(&to_)?; debug!("op_copy_file {} {}", from.display(), to.display()); blocking(base.sync(), move || { @@ -1276,45 +1252,38 @@ fn op_cwd( _state: &ThreadSafeState, base: &msg::Base<'_>, data: Option, -) -> Box { +) -> CliOpResult { assert!(data.is_none()); let cmd_id = base.cmd_id(); - Box::new(futures::future::result(|| -> OpResult { - let path = std::env::current_dir()?; - let builder = &mut FlatBufferBuilder::new(); - let cwd = - builder.create_string(&path.into_os_string().into_string().unwrap()); - let inner = - msg::CwdRes::create(builder, &msg::CwdResArgs { cwd: Some(cwd) }); - Ok(serialize_response( - cmd_id, - builder, - msg::BaseArgs { - inner: Some(inner.as_union_value()), - inner_type: msg::Any::CwdRes, - ..Default::default() - }, - )) - }())) + let path = std::env::current_dir()?; + let builder = &mut FlatBufferBuilder::new(); + let cwd = + builder.create_string(&path.into_os_string().into_string().unwrap()); + let inner = msg::CwdRes::create(builder, &msg::CwdResArgs { cwd: Some(cwd) }); + let response_buf = serialize_response( + cmd_id, + builder, + msg::BaseArgs { + inner: Some(inner.as_union_value()), + inner_type: msg::Any::CwdRes, + ..Default::default() + }, + ); + ok_buf(response_buf) } fn op_stat( state: &ThreadSafeState, base: &msg::Base<'_>, data: Option, -) -> Box { +) -> CliOpResult { assert!(data.is_none()); let inner = base.inner_as_stat().unwrap(); let cmd_id = base.cmd_id(); - let (filename, filename_) = match resolve_path(inner.filename().unwrap()) { - Err(err) => return odd_future(err), - Ok(v) => v, - }; + let (filename, filename_) = resolve_path(inner.filename().unwrap())?; let lstat = inner.lstat(); - if let Err(e) = state.check_read(&filename_) { - return odd_future(e); - } + state.check_read(&filename_)?; blocking(base.sync(), move || { let builder = &mut FlatBufferBuilder::new(); @@ -1356,20 +1325,15 @@ fn op_read_dir( state: &ThreadSafeState, base: &msg::Base<'_>, data: Option, -) -> Box { +) -> CliOpResult { assert!(data.is_none()); let inner = base.inner_as_read_dir().unwrap(); let cmd_id = base.cmd_id(); - let (path, path_) = match resolve_path(inner.path().unwrap()) { - Err(err) => return odd_future(err), - Ok(v) => v, - }; + let (path, path_) = resolve_path(inner.path().unwrap())?; - if let Err(e) = state.check_read(&path_) { - return odd_future(e); - } + state.check_read(&path_)?; - blocking(base.sync(), move || -> OpResult { + blocking(base.sync(), move || { debug!("op_read_dir {}", path.display()); let builder = &mut FlatBufferBuilder::new(); let entries: Vec<_> = fs::read_dir(path)? @@ -1418,22 +1382,15 @@ fn op_rename( state: &ThreadSafeState, base: &msg::Base<'_>, data: Option, -) -> Box { +) -> CliOpResult { assert!(data.is_none()); let inner = base.inner_as_rename().unwrap(); - let (oldpath, _) = match resolve_path(inner.oldpath().unwrap()) { - Err(err) => return odd_future(err), - Ok(v) => v, - }; - let (newpath, newpath_) = match resolve_path(inner.newpath().unwrap()) { - Err(err) => return odd_future(err), - Ok(v) => v, - }; + let (oldpath, _) = resolve_path(inner.oldpath().unwrap())?; + let (newpath, newpath_) = resolve_path(inner.newpath().unwrap())?; - if let Err(e) = state.check_write(&newpath_) { - return odd_future(e); - } - blocking(base.sync(), move || -> OpResult { + state.check_write(&newpath_)?; + + blocking(base.sync(), move || { debug!("op_rename {} {}", oldpath.display(), newpath.display()); fs::rename(&oldpath, &newpath)?; Ok(empty_buf()) @@ -1444,23 +1401,15 @@ fn op_link( state: &ThreadSafeState, base: &msg::Base<'_>, data: Option, -) -> Box { +) -> CliOpResult { assert!(data.is_none()); let inner = base.inner_as_link().unwrap(); - let (oldname, _) = match resolve_path(inner.oldname().unwrap()) { - Err(err) => return odd_future(err), - Ok(v) => v, - }; - let (newname, newname_) = match resolve_path(inner.newname().unwrap()) { - Err(err) => return odd_future(err), - Ok(v) => v, - }; + let (oldname, _) = resolve_path(inner.oldname().unwrap())?; + let (newname, newname_) = resolve_path(inner.newname().unwrap())?; - if let Err(e) = state.check_write(&newname_) { - return odd_future(e); - } + state.check_write(&newname_)?; - blocking(base.sync(), move || -> OpResult { + blocking(base.sync(), move || { debug!("op_link {} {}", oldname.display(), newname.display()); std::fs::hard_link(&oldname, &newname)?; Ok(empty_buf()) @@ -1471,29 +1420,18 @@ fn op_symlink( state: &ThreadSafeState, base: &msg::Base<'_>, data: Option, -) -> Box { +) -> CliOpResult { assert!(data.is_none()); let inner = base.inner_as_symlink().unwrap(); - let (oldname, _) = match resolve_path(inner.oldname().unwrap()) { - Err(err) => return odd_future(err), - Ok(v) => v, - }; - let (newname, newname_) = match resolve_path(inner.newname().unwrap()) { - Err(err) => return odd_future(err), - Ok(v) => v, - }; + let (oldname, _) = resolve_path(inner.oldname().unwrap())?; + let (newname, newname_) = resolve_path(inner.newname().unwrap())?; - if let Err(e) = state.check_write(&newname_) { - return odd_future(e); - } + state.check_write(&newname_)?; // TODO Use type for Windows. if cfg!(windows) { - return odd_future(errors::new( - ErrorKind::Other, - "Not implemented".to_string(), - )); + return Err(errors::new(ErrorKind::Other, "Not implemented".to_string())); } - blocking(base.sync(), move || -> OpResult { + blocking(base.sync(), move || { debug!("op_symlink {} {}", oldname.display(), newname.display()); #[cfg(any(unix))] std::os::unix::fs::symlink(&oldname, &newname)?; @@ -1505,20 +1443,15 @@ fn op_read_link( state: &ThreadSafeState, base: &msg::Base<'_>, data: Option, -) -> Box { +) -> CliOpResult { assert!(data.is_none()); let inner = base.inner_as_readlink().unwrap(); let cmd_id = base.cmd_id(); - let (name, name_) = match resolve_path(inner.name().unwrap()) { - Err(err) => return odd_future(err), - Ok(v) => v, - }; + let (name, name_) = resolve_path(inner.name().unwrap())?; - if let Err(e) = state.check_read(&name_) { - return odd_future(e); - } + state.check_read(&name_)?; - blocking(base.sync(), move || -> OpResult { + blocking(base.sync(), move || { debug!("op_read_link {}", name.display()); let path = fs::read_link(&name)?; let builder = &mut FlatBufferBuilder::new(); @@ -1545,7 +1478,7 @@ fn op_repl_start( state: &ThreadSafeState, base: &msg::Base<'_>, data: Option, -) -> Box { +) -> CliOpResult { assert!(data.is_none()); let inner = base.inner_as_repl_start().unwrap(); let cmd_id = base.cmd_id(); @@ -1561,7 +1494,7 @@ fn op_repl_start( builder, &msg::ReplStartResArgs { rid: resource.rid }, ); - ok_future(serialize_response( + ok_buf(serialize_response( cmd_id, builder, msg::BaseArgs { @@ -1576,7 +1509,7 @@ fn op_repl_readline( _state: &ThreadSafeState, base: &msg::Base<'_>, data: Option, -) -> Box { +) -> CliOpResult { assert!(data.is_none()); let inner = base.inner_as_repl_readline().unwrap(); let cmd_id = base.cmd_id(); @@ -1584,7 +1517,7 @@ fn op_repl_readline( let prompt = inner.prompt().unwrap().to_owned(); debug!("op_repl_readline {} {}", rid, prompt); - blocking(base.sync(), move || -> OpResult { + blocking(base.sync(), move || { let repl = resources::get_repl(rid)?; let line = repl.lock().unwrap().readline(&prompt)?; @@ -1612,19 +1545,14 @@ fn op_truncate( state: &ThreadSafeState, base: &msg::Base<'_>, data: Option, -) -> Box { +) -> CliOpResult { assert!(data.is_none()); let inner = base.inner_as_truncate().unwrap(); - let (filename, filename_) = match resolve_path(inner.name().unwrap()) { - Err(err) => return odd_future(err), - Ok(v) => v, - }; + let (filename, filename_) = resolve_path(inner.name().unwrap())?; let len = inner.len(); - if let Err(e) = state.check_write(&filename_) { - return odd_future(e); - } + state.check_write(&filename_)?; blocking(base.sync(), move || { debug!("op_truncate {} {}", filename_, len); @@ -1638,7 +1566,7 @@ fn op_utime( state: &ThreadSafeState, base: &msg::Base<'_>, data: Option, -) -> Box { +) -> CliOpResult { assert!(data.is_none()); let inner = base.inner_as_utime().unwrap(); @@ -1646,9 +1574,7 @@ fn op_utime( let atime = inner.atime(); let mtime = inner.mtime(); - if let Err(e) = state.check_write(&filename) { - return odd_future(e); - } + state.check_write(&filename)?; blocking(base.sync(), move || { debug!("op_utimes {} {} {}", filename, atime, mtime); @@ -1661,7 +1587,7 @@ fn op_listen( state: &ThreadSafeState, base: &msg::Base<'_>, data: Option, -) -> Box { +) -> CliOpResult { assert!(data.is_none()); let cmd_id = base.cmd_id(); let inner = base.inner_as_listen().unwrap(); @@ -1669,33 +1595,28 @@ fn op_listen( assert_eq!(network, "tcp"); let address = inner.address().unwrap(); - if let Err(e) = state.check_net(&address) { - return odd_future(e); - } + state.check_net(&address)?; - Box::new(futures::future::result((move || { - let addr = resolve_addr(address).wait()?; - let listener = TcpListener::bind(&addr)?; - let resource = resources::add_tcp_listener(listener); + let addr = resolve_addr(address).wait()?; + let listener = TcpListener::bind(&addr)?; + let resource = resources::add_tcp_listener(listener); - let builder = &mut FlatBufferBuilder::new(); - let inner = msg::ListenRes::create( - builder, - &msg::ListenResArgs { rid: resource.rid }, - ); - Ok(serialize_response( - cmd_id, - builder, - msg::BaseArgs { - inner: Some(inner.as_union_value()), - inner_type: msg::Any::ListenRes, - ..Default::default() - }, - )) - })())) + let builder = &mut FlatBufferBuilder::new(); + let inner = + msg::ListenRes::create(builder, &msg::ListenResArgs { rid: resource.rid }); + let response_buf = serialize_response( + cmd_id, + builder, + msg::BaseArgs { + inner: Some(inner.as_union_value()), + inner_type: msg::Any::ListenRes, + ..Default::default() + }, + ); + ok_buf(response_buf) } -fn new_conn(cmd_id: u32, tcp_stream: TcpStream) -> OpResult { +fn new_conn(cmd_id: u32, tcp_stream: TcpStream) -> DenoResult { let tcp_stream_resource = resources::add_tcp_stream(tcp_stream); // TODO forward socket_addr to client. @@ -1722,21 +1643,26 @@ fn op_accept( _state: &ThreadSafeState, base: &msg::Base<'_>, data: Option, -) -> Box { +) -> CliOpResult { assert!(data.is_none()); let cmd_id = base.cmd_id(); let inner = base.inner_as_accept().unwrap(); let server_rid = inner.rid(); match resources::lookup(server_rid) { - None => odd_future(errors::bad_resource()), + None => Err(errors::bad_resource()), Some(server_resource) => { let op = tokio_util::accept(server_resource) .map_err(DenoError::from) .and_then(move |(tcp_stream, _socket_addr)| { new_conn(cmd_id, tcp_stream) }); - Box::new(op) + if base.sync() { + let buf = op.wait()?; + Ok(Op::Sync(buf)) + } else { + Ok(Op::Async(Box::new(op))) + } } } } @@ -1745,7 +1671,7 @@ fn op_dial( state: &ThreadSafeState, base: &msg::Base<'_>, data: Option, -) -> Box { +) -> CliOpResult { assert!(data.is_none()); let cmd_id = base.cmd_id(); let inner = base.inner_as_dial().unwrap(); @@ -1753,9 +1679,7 @@ fn op_dial( assert_eq!(network, "tcp"); // TODO Support others. let address = inner.address().unwrap(); - if let Err(e) = state.check_net(&address) { - return odd_future(e); - } + state.check_net(&address)?; let op = resolve_addr(address) @@ -1765,14 +1689,19 @@ fn op_dial( .map_err(DenoError::from) .and_then(move |tcp_stream| new_conn(cmd_id, tcp_stream)) }); - Box::new(op) + if base.sync() { + let buf = op.wait()?; + Ok(Op::Sync(buf)) + } else { + Ok(Op::Async(Box::new(op))) + } } fn op_metrics( state: &ThreadSafeState, base: &msg::Base<'_>, data: Option, -) -> Box { +) -> CliOpResult { assert!(data.is_none()); let cmd_id = base.cmd_id(); @@ -1781,7 +1710,7 @@ fn op_metrics( builder, &msg::MetricsResArgs::from(&state.metrics), ); - ok_future(serialize_response( + ok_buf(serialize_response( cmd_id, builder, msg::BaseArgs { @@ -1796,7 +1725,7 @@ fn op_resources( _state: &ThreadSafeState, base: &msg::Base<'_>, data: Option, -) -> Box { +) -> CliOpResult { assert!(data.is_none()); let cmd_id = base.cmd_id(); @@ -1825,7 +1754,7 @@ fn op_resources( }, ); - ok_future(serialize_response( + ok_buf(serialize_response( cmd_id, builder, msg::BaseArgs { @@ -1848,13 +1777,13 @@ fn op_run( state: &ThreadSafeState, base: &msg::Base<'_>, data: Option, -) -> Box { - assert!(base.sync()); +) -> CliOpResult { + if !base.sync() { + return Err(errors::no_async_support()); + } let cmd_id = base.cmd_id(); - if let Err(e) = state.check_run() { - return odd_future(e); - } + state.check_run()?; assert!(data.is_none()); let inner = base.inner_as_run().unwrap(); @@ -1878,12 +1807,7 @@ fn op_run( c.stderr(subprocess_stdio_map(inner.stderr())); // Spawn the command. - let child = match c.spawn_async() { - Ok(v) => v, - Err(err) => { - return odd_future(err.into()); - } - }; + let child = c.spawn_async().map_err(DenoError::from)?; let pid = child.id(); let resources = resources::add_child(child); @@ -1906,7 +1830,7 @@ fn op_run( let builder = &mut FlatBufferBuilder::new(); let inner = msg::RunRes::create(builder, &res_args); - ok_future(serialize_response( + Ok(Op::Sync(serialize_response( cmd_id, builder, msg::BaseArgs { @@ -1914,29 +1838,22 @@ fn op_run( inner_type: msg::Any::RunRes, ..Default::default() }, - )) + ))) } fn op_run_status( state: &ThreadSafeState, base: &msg::Base<'_>, data: Option, -) -> Box { +) -> CliOpResult { assert!(data.is_none()); let cmd_id = base.cmd_id(); let inner = base.inner_as_run_status().unwrap(); let rid = inner.rid(); - if let Err(e) = state.check_run() { - return odd_future(e); - } + state.check_run()?; - let future = match resources::child_status(rid) { - Err(e) => { - return odd_future(e); - } - Ok(f) => f, - }; + let future = resources::child_status(rid)?; let future = future.and_then(move |run_status| { let code = run_status.code(); @@ -1970,7 +1887,12 @@ fn op_run_status( }, )) }); - Box::new(future) + if base.sync() { + let buf = future.wait()?; + Ok(Op::Sync(buf)) + } else { + Ok(Op::Async(Box::new(future))) + } } struct GetMessageFuture { @@ -1994,7 +1916,10 @@ fn op_worker_get_message( state: &ThreadSafeState, base: &msg::Base<'_>, data: Option, -) -> Box { +) -> CliOpResult { + if base.sync() { + return Err(errors::no_sync_support()); + } assert!(data.is_none()); let cmd_id = base.cmd_id(); @@ -2021,7 +1946,7 @@ fn op_worker_get_message( }, )) }); - Box::new(op) + Ok(Op::Async(Box::new(op))) } /// Post message to host as guest worker @@ -2029,29 +1954,26 @@ fn op_worker_post_message( state: &ThreadSafeState, base: &msg::Base<'_>, data: Option, -) -> Box { +) -> CliOpResult { let cmd_id = base.cmd_id(); - let d = Vec::from(data.unwrap().as_ref()).into_boxed_slice(); let tx = { let wc = state.worker_channels.lock().unwrap(); wc.0.clone() }; - let op = tx.send(d); - let op = op.map_err(|e| errors::new(ErrorKind::Other, e.to_string())); - let op = op.and_then(move |_| -> DenoResult { - let builder = &mut FlatBufferBuilder::new(); + tx.send(d) + .wait() + .map_err(|e| errors::new(ErrorKind::Other, e.to_string()))?; + let builder = &mut FlatBufferBuilder::new(); - Ok(serialize_response( - cmd_id, - builder, - msg::BaseArgs { - ..Default::default() - }, - )) - }); - Box::new(op) + ok_buf(serialize_response( + cmd_id, + builder, + msg::BaseArgs { + ..Default::default() + }, + )) } /// Create worker as the host @@ -2059,7 +1981,7 @@ fn op_create_worker( state: &ThreadSafeState, base: &msg::Base<'_>, data: Option, -) -> Box { +) -> CliOpResult { assert!(data.is_none()); let cmd_id = base.cmd_id(); let inner = base.inner_as_create_worker().unwrap(); @@ -2081,39 +2003,34 @@ fn op_create_worker( js_check(worker.execute("denoMain()")); js_check(worker.execute("workerMain()")); - let op = ModuleSpecifier::resolve_root(specifier) - .and_then(|module_specifier| { - Ok( - worker - .execute_mod_async(&module_specifier, false) - .and_then(move |()| { - let mut workers_tl = parent_state.workers.lock().unwrap(); - workers_tl.insert(rid, worker.shared()); - let builder = &mut FlatBufferBuilder::new(); - let msg_inner = msg::CreateWorkerRes::create( - builder, - &msg::CreateWorkerResArgs { rid }, - ); - Ok(serialize_response( - cmd_id, - builder, - msg::BaseArgs { - inner: Some(msg_inner.as_union_value()), - inner_type: msg::Any::CreateWorkerRes, - ..Default::default() - }, - )) - }).map_err(|err| match err { - errors::RustOrJsError::Js(_) => errors::worker_init_failed(), - errors::RustOrJsError::Rust(err) => err, - }), - ) - }).map_err(DenoError::from); + let module_specifier = ModuleSpecifier::resolve_root(specifier)?; - Box::new(match op { - Ok(op) => future::Either::A(op), - Err(err) => future::Either::B(future::result(Err(err))), - }) + let op = worker + .execute_mod_async(&module_specifier, false) + .and_then(move |()| { + let mut workers_tl = parent_state.workers.lock().unwrap(); + workers_tl.insert(rid, worker.shared()); + let builder = &mut FlatBufferBuilder::new(); + let msg_inner = msg::CreateWorkerRes::create( + builder, + &msg::CreateWorkerResArgs { rid }, + ); + Ok(serialize_response( + cmd_id, + builder, + msg::BaseArgs { + inner: Some(msg_inner.as_union_value()), + inner_type: msg::Any::CreateWorkerRes, + ..Default::default() + }, + )) + }).map_err(|err| match err { + errors::RustOrJsError::Js(_) => errors::worker_init_failed(), + errors::RustOrJsError::Rust(err) => err, + }); + + let result = op.wait()?; + Ok(Op::Sync(result)) } /// Return when the worker closes @@ -2121,7 +2038,10 @@ fn op_host_get_worker_closed( state: &ThreadSafeState, base: &msg::Base<'_>, data: Option, -) -> Box { +) -> CliOpResult { + if base.sync() { + return Err(errors::no_sync_support()); + } assert!(data.is_none()); let cmd_id = base.cmd_id(); let inner = base.inner_as_host_get_worker_closed().unwrap(); @@ -2134,7 +2054,7 @@ fn op_host_get_worker_closed( worker.clone() }; - Box::new(shared_worker_future.then(move |_result| { + let op = Box::new(shared_worker_future.then(move |_result| { let builder = &mut FlatBufferBuilder::new(); Ok(serialize_response( @@ -2144,7 +2064,8 @@ fn op_host_get_worker_closed( ..Default::default() }, )) - })) + })); + Ok(Op::Async(Box::new(op))) } /// Get message from guest worker as host @@ -2152,7 +2073,10 @@ fn op_host_get_message( _state: &ThreadSafeState, base: &msg::Base<'_>, data: Option, -) -> Box { +) -> CliOpResult { + if base.sync() { + return Err(errors::no_sync_support()); + } assert!(data.is_none()); let cmd_id = base.cmd_id(); let inner = base.inner_as_host_get_message().unwrap(); @@ -2178,7 +2102,7 @@ fn op_host_get_message( }, )) }); - Box::new(op) + Ok(Op::Async(Box::new(op))) } /// Post message to guest worker as host @@ -2186,34 +2110,32 @@ fn op_host_post_message( _state: &ThreadSafeState, base: &msg::Base<'_>, data: Option, -) -> Box { +) -> CliOpResult { let cmd_id = base.cmd_id(); let inner = base.inner_as_host_post_message().unwrap(); let rid = inner.rid(); let d = Vec::from(data.unwrap().as_ref()).into_boxed_slice(); - let op = resources::post_message_to_worker(rid, d); - let op = op.map_err(|e| errors::new(ErrorKind::Other, e.to_string())); - let op = op.and_then(move |_| -> DenoResult { - let builder = &mut FlatBufferBuilder::new(); + resources::post_message_to_worker(rid, d) + .wait() + .map_err(|e| errors::new(ErrorKind::Other, e.to_string()))?; + let builder = &mut FlatBufferBuilder::new(); - Ok(serialize_response( - cmd_id, - builder, - msg::BaseArgs { - ..Default::default() - }, - )) - }); - Box::new(op) + ok_buf(serialize_response( + cmd_id, + builder, + msg::BaseArgs { + ..Default::default() + }, + )) } fn op_get_random_values( state: &ThreadSafeState, _base: &msg::Base<'_>, data: Option, -) -> Box { +) -> CliOpResult { if let Some(ref seeded_rng) = state.seeded_rng { let mut rng = seeded_rng.lock().unwrap(); rng.fill(&mut data.unwrap()[..]); @@ -2222,5 +2144,5 @@ fn op_get_random_values( rng.fill(&mut data.unwrap()[..]); } - Box::new(ok_future(empty_buf())) + ok_buf(empty_buf()) } diff --git a/cli/state.rs b/cli/state.rs index f5eb8ae7ab..aa4690d445 100644 --- a/cli/state.rs +++ b/cli/state.rs @@ -15,9 +15,9 @@ use crate::resources; use crate::resources::ResourceId; use crate::worker::Worker; use deno::Buf; +use deno::CoreOp; use deno::Loader; use deno::ModuleSpecifier; -use deno::Op; use deno::PinnedBuf; use futures::future::Either; use futures::future::Shared; @@ -106,7 +106,11 @@ impl Deref for ThreadSafeState { } impl ThreadSafeState { - pub fn dispatch(&self, control: &[u8], zero_copy: Option) -> Op { + pub fn dispatch( + &self, + control: &[u8], + zero_copy: Option, + ) -> CoreOp { ops::dispatch_all(self, control, zero_copy, self.dispatch_selector) } } diff --git a/core/examples/http_bench.rs b/core/examples/http_bench.rs index e8c5ec1b73..ee8de50864 100644 --- a/core/examples/http_bench.rs +++ b/core/examples/http_bench.rs @@ -111,7 +111,7 @@ fn test_record_from() { pub type HttpBenchOp = dyn Future + Send; -fn dispatch(control: &[u8], zero_copy_buf: Option) -> Op { +fn dispatch(control: &[u8], zero_copy_buf: Option) -> CoreOp { let record = Record::from(control); let is_sync = record.promise_id == 0; let http_bench_op = match record.op_id { diff --git a/core/isolate.rs b/core/isolate.rs index 14e1b88aad..1ba86d727a 100644 --- a/core/isolate.rs +++ b/core/isolate.rs @@ -29,13 +29,19 @@ use std::sync::{Arc, Mutex, Once, ONCE_INIT}; pub type Buf = Box<[u8]>; -pub type OpAsyncFuture = Box + Send>; +pub type OpAsyncFuture = Box + Send>; -pub enum Op { +pub enum Op { Sync(Buf), - Async(OpAsyncFuture), + Async(OpAsyncFuture), } +pub type CoreError = (); + +type CoreOpAsyncFuture = OpAsyncFuture; + +pub type CoreOp = Op; + /// Stores a script used to initalize a Isolate pub struct Script<'a> { pub source: &'a str, @@ -68,7 +74,9 @@ pub enum StartupData<'a> { None, } -type DispatchFn = Fn(&[u8], Option) -> Op; +pub type OpResult = Result, E>; + +type CoreDispatchFn = Fn(&[u8], Option) -> CoreOp; pub type DynImportFuture = Box + Send>; type DynImportFn = Fn(&str, &str) -> DynImportFuture; @@ -104,11 +112,11 @@ impl Future for DynImport { pub struct Isolate { libdeno_isolate: *const libdeno::isolate, shared_libdeno_isolate: Arc>>, - dispatch: Option>, + dispatch: Option>, dyn_import: Option>, needs_init: bool, shared: SharedQueue, - pending_ops: FuturesUnordered, + pending_ops: FuturesUnordered, pending_dyn_imports: FuturesUnordered, have_unpolled_ops: bool, startup_script: Option, @@ -184,7 +192,7 @@ impl Isolate { /// corresponds to the second argument of Deno.core.dispatch(). pub fn set_dispatch(&mut self, f: F) where - F: Fn(&[u8], Option) -> Op + Send + Sync + 'static, + F: Fn(&[u8], Option) -> CoreOp + Send + Sync + 'static, { self.dispatch = Some(Arc::new(f)); } @@ -664,7 +672,7 @@ pub mod tests { let dispatch_count_ = dispatch_count.clone(); let mut isolate = Isolate::new(StartupData::None, false); - isolate.set_dispatch(move |control, _| -> Op { + isolate.set_dispatch(move |control, _| -> CoreOp { dispatch_count_.fetch_add(1, Ordering::Relaxed); match mode { Mode::AsyncImmediate => { diff --git a/js/dispatch.ts b/js/dispatch.ts index 0c6e707091..cd11c93f6c 100644 --- a/js/dispatch.ts +++ b/js/dispatch.ts @@ -56,13 +56,27 @@ function sendInternal( innerType: msg.Any, inner: flatbuffers.Offset, zeroCopy: undefined | ArrayBufferView, - sync = true -): [number, null | Uint8Array] { + isSync: true +): Uint8Array; +function sendInternal( + builder: flatbuffers.Builder, + innerType: msg.Any, + inner: flatbuffers.Offset, + zeroCopy: undefined | ArrayBufferView, + isSync: false +): Promise; +function sendInternal( + builder: flatbuffers.Builder, + innerType: msg.Any, + inner: flatbuffers.Offset, + zeroCopy: undefined | ArrayBufferView, + isSync: boolean +): Promise | Uint8Array { const cmdId = nextPromiseId(); msg.Base.startBase(builder); msg.Base.addInner(builder, inner); msg.Base.addInnerType(builder, innerType); - msg.Base.addSync(builder, sync); + msg.Base.addSync(builder, isSync); msg.Base.addCmdId(builder, cmdId); builder.finish(msg.Base.endBase(builder)); @@ -74,7 +88,27 @@ function sendInternal( ); builder.inUse = false; - return [cmdId, response]; + + if (response == null) { + util.assert(!isSync); + const promise = util.createResolvable(); + promiseTable.set(cmdId, promise); + return promise; + } else { + if (!isSync) { + // We can easily and correctly allow for sync responses to async calls + // by creating and returning a promise from the sync response. + const bb = new flatbuffers.ByteBuffer(response); + const base = msg.Base.getRootAsBase(bb); + const err = errors.maybeError(base); + if (err != null) { + return Promise.reject(err); + } else { + return Promise.resolve(base); + } + } + return response; + } } // @internal @@ -84,17 +118,7 @@ export function sendAsync( inner: flatbuffers.Offset, data?: ArrayBufferView ): Promise { - const [cmdId, response] = sendInternal( - builder, - innerType, - inner, - data, - false - ); - util.assert(response == null); // null indicates async. - const promise = util.createResolvable(); - promiseTable.set(cmdId, promise); - return promise; + return sendInternal(builder, innerType, inner, data, false); } // @internal @@ -104,9 +128,7 @@ export function sendSync( inner: flatbuffers.Offset, data?: ArrayBufferView ): null | msg.Base { - const [cmdId, response] = sendInternal(builder, innerType, inner, data, true); - util.assert(cmdId >= 0); - util.assert(response != null); // null indicates async. + const response = sendInternal(builder, innerType, inner, data, true); if (response!.length === 0) { return null; } else {