From 4d2b9cd37af2b9d6a145a04fc93117922e43df3a Mon Sep 17 00:00:00 2001 From: Ryan Dahl Date: Sat, 18 Apr 2020 20:05:13 -0400 Subject: [PATCH] Fix Op definitions (#4814) --- cli/ops/dispatch_json.rs | 20 +++++------ cli/ops/dispatch_minimal.rs | 12 +++---- cli/state.rs | 14 ++++---- core/es_isolate.rs | 4 +-- core/examples/http_bench.rs | 69 ++++++++++++++++++++++--------------- core/isolate.rs | 23 ++++++------- core/ops.rs | 32 ++++++----------- core/plugins.rs | 4 +-- deno_typescript/lib.rs | 12 +++---- deno_typescript/ops.rs | 3 +- test_plugin/src/lib.rs | 7 ++-- 11 files changed, 98 insertions(+), 102 deletions(-) diff --git a/cli/ops/dispatch_json.rs b/cli/ops/dispatch_json.rs index 70498eb8f5..b1a7bc723f 100644 --- a/cli/ops/dispatch_json.rs +++ b/cli/ops/dispatch_json.rs @@ -41,7 +41,7 @@ struct AsyncArgs { promise_id: Option, } -pub fn json_op(d: D) -> impl Fn(&[u8], Option) -> CoreOp +pub fn json_op(d: D) -> impl Fn(&[u8], Option) -> Op where D: Fn(Value, Option) -> Result, { @@ -50,7 +50,7 @@ where Ok(args) => args, Err(e) => { let buf = serialize_result(None, Err(OpError::from(e))); - return CoreOp::Sync(buf); + return Op::Sync(buf); } }; let promise_id = async_args.promise_id; @@ -60,32 +60,32 @@ where .map_err(OpError::from) .and_then(|args| d(args, zero_copy)); - // Convert to CoreOp + // Convert to Op match result { Ok(JsonOp::Sync(sync_value)) => { assert!(promise_id.is_none()); - CoreOp::Sync(serialize_result(promise_id, Ok(sync_value))) + Op::Sync(serialize_result(promise_id, Ok(sync_value))) } Ok(JsonOp::Async(fut)) => { assert!(promise_id.is_some()); let fut2 = fut.then(move |result| { - futures::future::ok(serialize_result(promise_id, result)) + futures::future::ready(serialize_result(promise_id, result)) }); - CoreOp::Async(fut2.boxed_local()) + Op::Async(fut2.boxed_local()) } Ok(JsonOp::AsyncUnref(fut)) => { assert!(promise_id.is_some()); let fut2 = fut.then(move |result| { - futures::future::ok(serialize_result(promise_id, result)) + futures::future::ready(serialize_result(promise_id, result)) }); - CoreOp::AsyncUnref(fut2.boxed_local()) + Op::AsyncUnref(fut2.boxed_local()) } Err(sync_err) => { let buf = serialize_result(promise_id, Err(sync_err)); if is_sync { - CoreOp::Sync(buf) + Op::Sync(buf) } else { - CoreOp::Async(futures::future::ok(buf).boxed_local()) + Op::Async(futures::future::ready(buf).boxed_local()) } } } diff --git a/cli/ops/dispatch_minimal.rs b/cli/ops/dispatch_minimal.rs index 299462ca0a..7fdd12401e 100644 --- a/cli/ops/dispatch_minimal.rs +++ b/cli/ops/dispatch_minimal.rs @@ -7,7 +7,6 @@ use crate::op_error::OpError; use byteorder::{LittleEndian, WriteBytesExt}; use deno_core::Buf; -use deno_core::CoreOp; use deno_core::Op; use deno_core::ZeroCopyBuf; use futures::future::FutureExt; @@ -114,7 +113,7 @@ fn test_parse_min_record() { assert_eq!(parse_min_record(&buf), None); } -pub fn minimal_op(d: D) -> impl Fn(&[u8], Option) -> CoreOp +pub fn minimal_op(d: D) -> impl Fn(&[u8], Option) -> Op where D: Fn(bool, i32, Option) -> MinimalOp, { @@ -153,12 +152,11 @@ where } }), MinimalOp::Async(min_fut) => { - // Convert to CoreOp - let core_fut = async move { + let fut = async move { match min_fut.await { Ok(r) => { record.result = r; - Ok(record.into()) + record.into() } Err(err) => { let error_record = ErrorRecord { @@ -167,11 +165,11 @@ where error_code: err.kind as i32, error_message: err.msg.as_bytes().to_owned(), }; - Ok(error_record.into()) + error_record.into() } } }; - Op::Async(core_fut.boxed_local()) + Op::Async(fut.boxed_local()) } } } diff --git a/cli/state.rs b/cli/state.rs index f67d5e9a6c..82ac8c4c16 100644 --- a/cli/state.rs +++ b/cli/state.rs @@ -10,7 +10,6 @@ use crate::ops::MinimalOp; use crate::permissions::DenoPermissions; use crate::web_worker::WebWorkerHandle; use deno_core::Buf; -use deno_core::CoreOp; use deno_core::ErrBox; use deno_core::ModuleLoader; use deno_core::ModuleSpecifier; @@ -18,7 +17,6 @@ use deno_core::Op; use deno_core::ResourceTable; use deno_core::ZeroCopyBuf; use futures::future::FutureExt; -use futures::future::TryFutureExt; use rand::rngs::StdRng; use rand::SeedableRng; use serde_json::Value; @@ -75,7 +73,7 @@ impl State { pub fn stateful_json_op( &self, dispatcher: D, - ) -> impl Fn(&[u8], Option) -> CoreOp + ) -> impl Fn(&[u8], Option) -> Op where D: Fn(&State, Value, Option) -> Result, { @@ -87,13 +85,13 @@ impl State { pub fn core_op( &self, dispatcher: D, - ) -> impl Fn(&[u8], Option) -> CoreOp + ) -> impl Fn(&[u8], Option) -> Op where - D: Fn(&[u8], Option) -> CoreOp, + D: Fn(&[u8], Option) -> Op, { let state = self.clone(); - move |control: &[u8], zero_copy: Option| -> CoreOp { + move |control: &[u8], zero_copy: Option| -> Op { let bytes_sent_control = control.len() as u64; let bytes_sent_zero_copy = zero_copy.as_ref().map(|b| b.len()).unwrap_or(0) as u64; @@ -116,7 +114,7 @@ impl State { .metrics .op_dispatched_async(bytes_sent_control, bytes_sent_zero_copy); let state = state.clone(); - let result_fut = fut.map_ok(move |buf: Buf| { + let result_fut = fut.map(move |buf: Buf| { let mut state_ = state.borrow_mut(); state_.metrics.op_completed_async(buf.len() as u64); buf @@ -130,7 +128,7 @@ impl State { bytes_sent_zero_copy, ); let state = state.clone(); - let result_fut = fut.map_ok(move |buf: Buf| { + let result_fut = fut.map(move |buf: Buf| { let mut state_ = state.borrow_mut(); state_.metrics.op_completed_async_unref(buf.len() as u64); buf diff --git a/core/es_isolate.rs b/core/es_isolate.rs index aa17c1a467..8c2e5b26de 100644 --- a/core/es_isolate.rs +++ b/core/es_isolate.rs @@ -581,12 +581,12 @@ pub mod tests { let mut isolate = EsIsolate::new(loader, StartupData::None, false); let dispatcher = - move |control: &[u8], _zero_copy: Option| -> CoreOp { + move |control: &[u8], _zero_copy: Option| -> Op { dispatch_count_.fetch_add(1, Ordering::Relaxed); assert_eq!(control.len(), 1); assert_eq!(control[0], 42); let buf = vec![43u8, 0, 0, 0].into_boxed_slice(); - Op::Async(futures::future::ok(buf).boxed()) + Op::Async(futures::future::ready(buf).boxed()) }; isolate.register_op("test", dispatcher); diff --git a/core/examples/http_bench.rs b/core/examples/http_bench.rs index 159f23fb5e..27fefc8bbc 100644 --- a/core/examples/http_bench.rs +++ b/core/examples/http_bench.rs @@ -97,15 +97,38 @@ impl Isolate { state: Default::default(), }; - isolate.register_op("listen", op_listen); + isolate.register_sync_op("listen", op_listen); isolate.register_op("accept", op_accept); isolate.register_op("read", op_read); isolate.register_op("write", op_write); - isolate.register_op("close", op_close); + isolate.register_sync_op("close", op_close); isolate } + fn register_sync_op(&mut self, name: &'static str, handler: F) + where + F: 'static + Fn(State, u32, Option) -> Result, + { + let state = self.state.clone(); + let core_handler = + move |control_buf: &[u8], zero_copy_buf: Option| -> Op { + let state = state.clone(); + let record = Record::from(control_buf); + let is_sync = record.promise_id == 0; + assert!(is_sync); + + let result: i32 = match handler(state, record.rid, zero_copy_buf) { + Ok(r) => r as i32, + Err(_) => -1, + }; + let buf = RecordBuf::from(Record { result, ..record })[..].into(); + Op::Sync(buf) + }; + + self.core_isolate.register_op(name, core_handler); + } + fn register_op( &mut self, name: &'static str, @@ -117,10 +140,11 @@ impl Isolate { { let state = self.state.clone(); let core_handler = - move |control_buf: &[u8], zero_copy_buf: Option| -> CoreOp { + move |control_buf: &[u8], zero_copy_buf: Option| -> Op { let state = state.clone(); let record = Record::from(control_buf); let is_sync = record.promise_id == 0; + assert!(!is_sync); let fut = async move { let op = handler(state, record.rid, zero_copy_buf); @@ -128,14 +152,10 @@ impl Isolate { .map_ok(|r| r.try_into().expect("op result does not fit in i32")) .unwrap_or_else(|_| -1) .await; - Ok(RecordBuf::from(Record { result, ..record })[..].into()) + RecordBuf::from(Record { result, ..record })[..].into() }; - if is_sync { - Op::Sync(futures::executor::block_on(fut).unwrap()) - } else { - Op::Async(fut.boxed_local()) - } + Op::Async(fut.boxed_local()) }; self.core_isolate.register_op(name, core_handler); @@ -154,32 +174,27 @@ fn op_close( state: State, rid: u32, _buf: Option, -) -> impl TryFuture { +) -> Result { debug!("close rid={}", rid); - - async move { - let resource_table = &mut state.borrow_mut().resource_table; - resource_table - .close(rid) - .map(|_| 0) - .ok_or_else(bad_resource) - } + let resource_table = &mut state.borrow_mut().resource_table; + resource_table + .close(rid) + .map(|_| 0) + .ok_or_else(bad_resource) } fn op_listen( state: State, _rid: u32, _buf: Option, -) -> impl TryFuture { +) -> Result { debug!("listen"); - - async move { - let addr = "127.0.0.1:4544".parse::().unwrap(); - let listener = tokio::net::TcpListener::bind(&addr).await?; - let resource_table = &mut state.borrow_mut().resource_table; - let rid = resource_table.add("tcpListener", Box::new(listener)); - Ok(rid) - } + let addr = "127.0.0.1:4544".parse::().unwrap(); + let std_listener = std::net::TcpListener::bind(&addr)?; + let listener = TcpListener::from_std(std_listener)?; + let resource_table = &mut state.borrow_mut().resource_table; + let rid = resource_table.add("tcpListener", Box::new(listener)); + Ok(rid) } fn op_accept( diff --git a/core/isolate.rs b/core/isolate.rs index 31a4c401cb..18cd84eae2 100644 --- a/core/isolate.rs +++ b/core/isolate.rs @@ -14,7 +14,6 @@ use crate::ops::*; use crate::shared_queue::SharedQueue; use crate::shared_queue::RECOMMENDED_SIZE; use futures::future::FutureExt; -use futures::future::TryFutureExt; use futures::stream::select; use futures::stream::FuturesUnordered; use futures::stream::StreamExt; @@ -34,6 +33,8 @@ use std::sync::{Arc, Mutex, Once}; use std::task::Context; use std::task::Poll; +type PendingOpFuture = Pin>>; + /// A ZeroCopyBuf encapsulates a slice that's been borrowed from a JavaScript /// ArrayBuffer object. JavaScript objects can normally be garbage collected, /// but the existence of a ZeroCopyBuf inhibits this until it is dropped. It @@ -344,7 +345,7 @@ impl Isolate { /// Requires runtime to explicitly ask for op ids before using any of the ops. pub fn register_op(&self, name: &str, op: F) -> OpId where - F: Fn(&[u8], Option) -> CoreOp + 'static, + F: Fn(&[u8], Option) -> Op + 'static, { self.op_registry.register(name, op) } @@ -402,13 +403,13 @@ impl Isolate { Some((op_id, buf)) } Op::Async(fut) => { - let fut2 = fut.map_ok(move |buf| (op_id, buf)); + let fut2 = fut.map(move |buf| (op_id, buf)); self.pending_ops.push(fut2.boxed_local()); self.have_unpolled_ops = true; None } Op::AsyncUnref(fut) => { - let fut2 = fut.map_ok(move |buf| (op_id, buf)); + let fut2 = fut.map(move |buf| (op_id, buf)); self.pending_unref_ops.push(fut2.boxed_local()); self.have_unpolled_ops = true; None @@ -528,10 +529,9 @@ impl Future for Isolate { match select(&mut inner.pending_ops, &mut inner.pending_unref_ops) .poll_next_unpin(cx) { - Poll::Ready(Some(Err(_))) => panic!("unexpected op error"), Poll::Ready(None) => break, Poll::Pending => break, - Poll::Ready(Some(Ok((op_id, buf)))) => { + Poll::Ready(Some((op_id, buf))) => { let successful_push = inner.shared.push(op_id, &buf); if !successful_push { // If we couldn't push the response to the shared queue, because @@ -769,14 +769,14 @@ pub mod tests { let mut isolate = Isolate::new(StartupData::None, false); let dispatcher = - move |control: &[u8], _zero_copy: Option| -> CoreOp { + move |control: &[u8], _zero_copy: Option| -> Op { dispatch_count_.fetch_add(1, Ordering::Relaxed); match mode { Mode::Async => { assert_eq!(control.len(), 1); assert_eq!(control[0], 42); let buf = vec![43u8].into_boxed_slice(); - Op::Async(futures::future::ok(buf).boxed()) + Op::Async(futures::future::ready(buf).boxed()) } Mode::AsyncUnref => { assert_eq!(control.len(), 1); @@ -784,8 +784,7 @@ pub mod tests { let fut = async { // This future never finish. futures::future::pending::<()>().await; - let buf = vec![43u8].into_boxed_slice(); - Ok(buf) + vec![43u8].into_boxed_slice() }; Op::AsyncUnref(fut.boxed()) } @@ -806,7 +805,7 @@ pub mod tests { Mode::OverflowReqAsync => { assert_eq!(control.len(), 100 * 1024 * 1024); let buf = vec![43u8].into_boxed_slice(); - Op::Async(futures::future::ok(buf).boxed()) + Op::Async(futures::future::ready(buf).boxed()) } Mode::OverflowResAsync => { assert_eq!(control.len(), 1); @@ -815,7 +814,7 @@ pub mod tests { vec.resize(100 * 1024 * 1024, 0); vec[0] = 4; let buf = vec.into_boxed_slice(); - Op::Async(futures::future::ok(buf).boxed()) + Op::Async(futures::future::ready(buf).boxed()) } } }; diff --git a/core/ops.rs b/core/ops.rs index 16807196ec..ab183f4ded 100644 --- a/core/ops.rs +++ b/core/ops.rs @@ -10,30 +10,18 @@ pub type OpId = u32; pub type Buf = Box<[u8]>; -pub type OpAsyncFuture = Pin>>>; +pub type OpAsyncFuture = Pin>>; -pub(crate) type PendingOpFuture = - Pin>>>; - -pub type OpResult = Result, E>; - -// TODO(ry) Op::Async should be Op::Async(Pin>>) -// The error should be encoded in the Buf. Notice how Sync ops do not return a -// result. The Sync and Async should be symmetrical! -pub enum Op { +pub enum Op { Sync(Buf), - Async(OpAsyncFuture), + Async(OpAsyncFuture), /// AsyncUnref is the variation of Async, which doesn't block the program /// exiting. - AsyncUnref(OpAsyncFuture), + AsyncUnref(OpAsyncFuture), } -pub type CoreError = (); - -pub type CoreOp = Op; - /// Main type describing op -pub type OpDispatcher = dyn Fn(&[u8], Option) -> CoreOp + 'static; +pub type OpDispatcher = dyn Fn(&[u8], Option) -> Op + 'static; #[derive(Default)] pub struct OpRegistry { @@ -54,7 +42,7 @@ impl OpRegistry { pub fn register(&self, name: &str, op: F) -> OpId where - F: Fn(&[u8], Option) -> CoreOp + 'static, + F: Fn(&[u8], Option) -> Op + 'static, { let mut lock = self.dispatchers.write().unwrap(); let op_id = lock.len() as u32; @@ -83,7 +71,7 @@ impl OpRegistry { op_id: OpId, control: &[u8], zero_copy_buf: Option, - ) -> Option { + ) -> Option { // Op with id 0 has special meaning - it's a special op that is always // provided to retrieve op id map. The map consists of name to `OpId` // mappings. @@ -113,7 +101,7 @@ fn test_op_registry() { let test_id = op_registry.register("test", move |_, _| { c_.fetch_add(1, atomic::Ordering::SeqCst); - CoreOp::Sync(Box::new([])) + Op::Sync(Box::new([])) }); assert!(test_id != 0); @@ -149,9 +137,9 @@ fn register_op_during_call() { let c__ = c_.clone(); op_registry_.register("test", move |_, _| { c__.fetch_add(1, atomic::Ordering::SeqCst); - CoreOp::Sync(Box::new([])) + Op::Sync(Box::new([])) }); - CoreOp::Sync(Box::new([])) + Op::Sync(Box::new([])) }); assert!(test_id != 0); diff --git a/core/plugins.rs b/core/plugins.rs index edb6751202..c6e63c9758 100644 --- a/core/plugins.rs +++ b/core/plugins.rs @@ -1,5 +1,5 @@ use crate::isolate::ZeroCopyBuf; -use crate::ops::CoreOp; +use crate::ops::Op; pub type PluginInitFn = fn(context: &mut dyn PluginInitContext); @@ -7,7 +7,7 @@ pub trait PluginInitContext { fn register_op( &mut self, name: &str, - op: Box) -> CoreOp + 'static>, + op: Box) -> Op + 'static>, ); } diff --git a/deno_typescript/lib.rs b/deno_typescript/lib.rs index 1fe81856c5..cd843041e2 100644 --- a/deno_typescript/lib.rs +++ b/deno_typescript/lib.rs @@ -8,10 +8,10 @@ extern crate serde_json; mod ops; use deno_core::js_check; pub use deno_core::v8_set_flags; -use deno_core::CoreOp; use deno_core::ErrBox; use deno_core::Isolate; use deno_core::ModuleSpecifier; +use deno_core::Op; use deno_core::StartupData; use deno_core::ZeroCopyBuf; pub use ops::EmitResult; @@ -49,11 +49,11 @@ pub struct TSState { fn compiler_op( ts_state: Arc>, dispatcher: D, -) -> impl Fn(&[u8], Option) -> CoreOp +) -> impl Fn(&[u8], Option) -> Op where - D: Fn(&mut TSState, &[u8]) -> CoreOp, + D: Fn(&mut TSState, &[u8]) -> Op, { - move |control: &[u8], zero_copy_buf: Option| -> CoreOp { + move |control: &[u8], zero_copy_buf: Option| -> Op { assert!(zero_copy_buf.is_none()); // zero_copy_buf unused in compiler. let mut s = ts_state.lock().unwrap(); dispatcher(&mut s, control) @@ -326,11 +326,11 @@ pub fn trace_serializer() { /// Isolate. pub fn op_fetch_asset( custom_assets: HashMap, -) -> impl Fn(&[u8], Option) -> CoreOp { +) -> impl Fn(&[u8], Option) -> Op { for (_, path) in custom_assets.iter() { println!("cargo:rerun-if-changed={}", path.display()); } - move |control: &[u8], zero_copy_buf: Option| -> CoreOp { + move |control: &[u8], zero_copy_buf: Option| -> Op { assert!(zero_copy_buf.is_none()); // zero_copy_buf unused in this op. let name = std::str::from_utf8(control).unwrap(); diff --git a/deno_typescript/ops.rs b/deno_typescript/ops.rs index 022eee00e3..f5904af1a6 100644 --- a/deno_typescript/ops.rs +++ b/deno_typescript/ops.rs @@ -1,5 +1,4 @@ use crate::TSState; -use deno_core::CoreOp; use deno_core::ErrBox; use deno_core::ModuleSpecifier; use deno_core::Op; @@ -16,7 +15,7 @@ pub struct WrittenFile { type Dispatcher = fn(state: &mut TSState, args: Value) -> Result; -pub fn json_op(d: Dispatcher) -> impl Fn(&mut TSState, &[u8]) -> CoreOp { +pub fn json_op(d: Dispatcher) -> impl Fn(&mut TSState, &[u8]) -> Op { move |state: &mut TSState, control: &[u8]| { let result = serde_json::from_slice(control) .map_err(ErrBox::from) diff --git a/test_plugin/src/lib.rs b/test_plugin/src/lib.rs index 922f336825..70ba6d9b24 100644 --- a/test_plugin/src/lib.rs +++ b/test_plugin/src/lib.rs @@ -2,7 +2,6 @@ extern crate deno_core; extern crate futures; -use deno_core::CoreOp; use deno_core::Op; use deno_core::PluginInitContext; use deno_core::{Buf, ZeroCopyBuf}; @@ -14,7 +13,7 @@ fn init(context: &mut dyn PluginInitContext) { } init_fn!(init); -pub fn op_test_sync(data: &[u8], zero_copy: Option) -> CoreOp { +pub fn op_test_sync(data: &[u8], zero_copy: Option) -> Op { if let Some(buf) = zero_copy { let data_str = std::str::from_utf8(&data[..]).unwrap(); let buf_str = std::str::from_utf8(&buf[..]).unwrap(); @@ -28,7 +27,7 @@ pub fn op_test_sync(data: &[u8], zero_copy: Option) -> CoreOp { Op::Sync(result_box) } -pub fn op_test_async(data: &[u8], zero_copy: Option) -> CoreOp { +pub fn op_test_async(data: &[u8], zero_copy: Option) -> Op { let data_str = std::str::from_utf8(&data[..]).unwrap().to_string(); let fut = async move { if let Some(buf) = zero_copy { @@ -46,7 +45,7 @@ pub fn op_test_async(data: &[u8], zero_copy: Option) -> CoreOp { assert!(rx.await.is_ok()); let result = b"test"; let result_box: Buf = Box::new(*result); - Ok(result_box) + result_box }; Op::Async(fut.boxed())