mirror of
https://github.com/denoland/deno.git
synced 2024-11-24 15:19:26 -05:00
core: express op as enum (#2255)
This commit is contained in:
parent
2f4fefd0f6
commit
c171813e89
4 changed files with 48 additions and 38 deletions
11
cli/ops.rs
11
cli/ops.rs
|
@ -83,7 +83,7 @@ pub fn dispatch_all(
|
|||
control: &[u8],
|
||||
zero_copy: Option<PinnedBuf>,
|
||||
op_selector: OpSelector,
|
||||
) -> (bool, Box<Op>) {
|
||||
) -> Op {
|
||||
let bytes_sent_control = control.len();
|
||||
let bytes_sent_zero_copy = zero_copy.as_ref().map(|b| b.len()).unwrap_or(0);
|
||||
let base = msg::get_root_as_base(&control);
|
||||
|
@ -101,7 +101,7 @@ pub fn dispatch_all(
|
|||
let state = state.clone();
|
||||
state.metrics_op_dispatched(bytes_sent_control, bytes_sent_zero_copy);
|
||||
|
||||
let boxed_op = Box::new(
|
||||
let fut = Box::new(
|
||||
op.or_else(move |err: DenoError| -> Result<Buf, ()> {
|
||||
debug!("op err {}", err);
|
||||
// No matter whether we got an Err or Ok, we want a serialized message to
|
||||
|
@ -143,7 +143,12 @@ pub fn dispatch_all(
|
|||
msg::enum_name_any(inner_type),
|
||||
base.sync()
|
||||
);
|
||||
(base.sync(), boxed_op)
|
||||
|
||||
if base.sync() {
|
||||
Op::Sync(fut.wait().unwrap())
|
||||
} else {
|
||||
Op::Async(fut)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn op_selector_compiler(inner_type: msg::Any) -> Option<OpCreator> {
|
||||
|
|
|
@ -81,11 +81,7 @@ impl Deref for ThreadSafeState {
|
|||
}
|
||||
|
||||
impl ThreadSafeState {
|
||||
pub fn dispatch(
|
||||
&self,
|
||||
control: &[u8],
|
||||
zero_copy: Option<PinnedBuf>,
|
||||
) -> (bool, Box<Op>) {
|
||||
pub fn dispatch(&self, control: &[u8], zero_copy: Option<PinnedBuf>) -> Op {
|
||||
ops::dispatch_all(self, control, zero_copy, self.dispatch_selector)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -111,10 +111,7 @@ fn test_record_from() {
|
|||
|
||||
pub type HttpBenchOp = dyn Future<Item = i32, Error = std::io::Error> + Send;
|
||||
|
||||
fn dispatch(
|
||||
control: &[u8],
|
||||
zero_copy_buf: Option<PinnedBuf>,
|
||||
) -> (bool, Box<Op>) {
|
||||
fn dispatch(control: &[u8], zero_copy_buf: Option<PinnedBuf>) -> Op {
|
||||
let record = Record::from(control);
|
||||
let is_sync = record.promise_id == 0;
|
||||
let http_bench_op = match record.op_id {
|
||||
|
@ -147,7 +144,7 @@ fn dispatch(
|
|||
let mut record_a = record.clone();
|
||||
let mut record_b = record.clone();
|
||||
|
||||
let op = Box::new(
|
||||
let fut = Box::new(
|
||||
http_bench_op
|
||||
.and_then(move |result| {
|
||||
record_a.result = result;
|
||||
|
@ -161,7 +158,12 @@ fn dispatch(
|
|||
Ok(record.into())
|
||||
}),
|
||||
);
|
||||
(is_sync, op)
|
||||
|
||||
if is_sync {
|
||||
Op::Sync(fut.wait().unwrap())
|
||||
} else {
|
||||
Op::Async(fut)
|
||||
}
|
||||
}
|
||||
|
||||
fn main() {
|
||||
|
|
|
@ -26,7 +26,13 @@ use std::ptr::null;
|
|||
use std::sync::{Arc, Mutex, Once, ONCE_INIT};
|
||||
|
||||
pub type Buf = Box<[u8]>;
|
||||
pub type Op = dyn Future<Item = Buf, Error = ()> + Send;
|
||||
|
||||
pub type OpAsyncFuture = Box<dyn Future<Item = Buf, Error = ()> + Send>;
|
||||
|
||||
pub enum Op {
|
||||
Sync(Buf),
|
||||
Async(OpAsyncFuture),
|
||||
}
|
||||
|
||||
/// Stores a script used to initalize a Isolate
|
||||
pub struct Script<'a> {
|
||||
|
@ -46,8 +52,7 @@ pub enum StartupData<'a> {
|
|||
|
||||
#[derive(Default)]
|
||||
pub struct Config {
|
||||
dispatch:
|
||||
Option<Arc<Fn(&[u8], Option<PinnedBuf>) -> (bool, Box<Op>) + Send + Sync>>,
|
||||
dispatch: Option<Arc<Fn(&[u8], Option<PinnedBuf>) -> Op>>,
|
||||
pub will_snapshot: bool,
|
||||
}
|
||||
|
||||
|
@ -57,7 +62,7 @@ impl Config {
|
|||
/// corresponds to the second argument of Deno.core.dispatch().
|
||||
pub fn dispatch<F>(&mut self, f: F)
|
||||
where
|
||||
F: Fn(&[u8], Option<PinnedBuf>) -> (bool, Box<Op>) + Send + Sync + 'static,
|
||||
F: Fn(&[u8], Option<PinnedBuf>) -> Op + Send + Sync + 'static,
|
||||
{
|
||||
self.dispatch = Some(Arc::new(f));
|
||||
}
|
||||
|
@ -69,15 +74,15 @@ impl Config {
|
|||
/// pending ops have completed.
|
||||
///
|
||||
/// Ops are created in JavaScript by calling Deno.core.dispatch(), and in Rust
|
||||
/// by implementing deno::Dispatch::dispatch. An Op corresponds exactly to a
|
||||
/// Promise in JavaScript.
|
||||
/// by implementing deno::Dispatch::dispatch. An async Op corresponds exactly to
|
||||
/// a Promise in JavaScript.
|
||||
pub struct Isolate {
|
||||
libdeno_isolate: *const libdeno::isolate,
|
||||
shared_libdeno_isolate: Arc<Mutex<Option<*const libdeno::isolate>>>,
|
||||
config: Config,
|
||||
needs_init: bool,
|
||||
shared: SharedQueue,
|
||||
pending_ops: FuturesUnordered<Box<Op>>,
|
||||
pending_ops: FuturesUnordered<OpAsyncFuture>,
|
||||
have_unpolled_ops: bool,
|
||||
}
|
||||
|
||||
|
@ -175,7 +180,7 @@ impl Isolate {
|
|||
let isolate = unsafe { Isolate::from_raw_ptr(user_data) };
|
||||
let control_shared = isolate.shared.shift();
|
||||
|
||||
let (is_sync, op) = if control_argv0.len() > 0 {
|
||||
let op = if control_argv0.len() > 0 {
|
||||
// The user called Deno.core.send(control)
|
||||
if let Some(ref f) = isolate.config.dispatch {
|
||||
f(control_argv0.as_ref(), PinnedBuf::new(zero_copy_buf))
|
||||
|
@ -201,16 +206,18 @@ impl Isolate {
|
|||
// At this point the SharedQueue should be empty.
|
||||
assert_eq!(isolate.shared.size(), 0);
|
||||
|
||||
if is_sync {
|
||||
let res_record = op.wait().unwrap();
|
||||
// For sync messages, we always return the response via Deno.core.send's
|
||||
// return value.
|
||||
// TODO(ry) check that if JSError thrown during respond(), that it will be
|
||||
// picked up.
|
||||
let _ = isolate.respond(Some(&res_record));
|
||||
} else {
|
||||
isolate.pending_ops.push(op);
|
||||
isolate.have_unpolled_ops = true;
|
||||
match op {
|
||||
Op::Sync(buf) => {
|
||||
// For sync messages, we always return the response via Deno.core.send's
|
||||
// return value.
|
||||
// TODO(ry) check that if JSError thrown during respond(), that it will be
|
||||
// picked up.
|
||||
let _ = isolate.respond(Some(&buf));
|
||||
}
|
||||
Op::Async(fut) => {
|
||||
isolate.pending_ops.push(fut);
|
||||
isolate.have_unpolled_ops = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -555,19 +562,19 @@ pub mod tests {
|
|||
let dispatch_count_ = dispatch_count.clone();
|
||||
|
||||
let mut config = Config::default();
|
||||
config.dispatch(move |control, _| -> (bool, Box<Op>) {
|
||||
config.dispatch(move |control, _| -> Op {
|
||||
dispatch_count_.fetch_add(1, Ordering::Relaxed);
|
||||
match mode {
|
||||
Mode::AsyncImmediate => {
|
||||
assert_eq!(control.len(), 1);
|
||||
assert_eq!(control[0], 42);
|
||||
let buf = vec![43u8].into_boxed_slice();
|
||||
(false, Box::new(futures::future::ok(buf)))
|
||||
Op::Async(Box::new(futures::future::ok(buf)))
|
||||
}
|
||||
Mode::OverflowReqSync => {
|
||||
assert_eq!(control.len(), 100 * 1024 * 1024);
|
||||
let buf = vec![43u8].into_boxed_slice();
|
||||
(true, Box::new(futures::future::ok(buf)))
|
||||
Op::Sync(buf)
|
||||
}
|
||||
Mode::OverflowResSync => {
|
||||
assert_eq!(control.len(), 1);
|
||||
|
@ -576,12 +583,12 @@ pub mod tests {
|
|||
vec.resize(100 * 1024 * 1024, 0);
|
||||
vec[0] = 99;
|
||||
let buf = vec.into_boxed_slice();
|
||||
(true, Box::new(futures::future::ok(buf)))
|
||||
Op::Sync(buf)
|
||||
}
|
||||
Mode::OverflowReqAsync => {
|
||||
assert_eq!(control.len(), 100 * 1024 * 1024);
|
||||
let buf = vec![43u8].into_boxed_slice();
|
||||
(false, Box::new(futures::future::ok(buf)))
|
||||
Op::Async(Box::new(futures::future::ok(buf)))
|
||||
}
|
||||
Mode::OverflowResAsync => {
|
||||
assert_eq!(control.len(), 1);
|
||||
|
@ -590,7 +597,7 @@ pub mod tests {
|
|||
vec.resize(100 * 1024 * 1024, 0);
|
||||
vec[0] = 4;
|
||||
let buf = vec.into_boxed_slice();
|
||||
(false, Box::new(futures::future::ok(buf)))
|
||||
Op::Async(Box::new(futures::future::ok(buf)))
|
||||
}
|
||||
}
|
||||
});
|
||||
|
|
Loading…
Reference in a new issue