mirror of
https://github.com/denoland/deno.git
synced 2024-11-21 15:04:11 -05:00
refactor(ops): replace ZeroCopyBuf
arg by 2nd generic deserializable arg (#10448)
This commit is contained in:
parent
f208e6a26f
commit
1e8e44f4c3
14 changed files with 88 additions and 114 deletions
|
@ -156,7 +156,7 @@ fn create_compiler_snapshot(
|
||||||
});
|
});
|
||||||
js_runtime.register_op(
|
js_runtime.register_op(
|
||||||
"op_build_info",
|
"op_build_info",
|
||||||
op_sync(move |_state, _args: Value, _bufs| {
|
op_sync(move |_state, _args: Value, _: ()| {
|
||||||
Ok(json!({
|
Ok(json!({
|
||||||
"buildSpecifier": build_specifier,
|
"buildSpecifier": build_specifier,
|
||||||
"libs": build_libs,
|
"libs": build_libs,
|
||||||
|
@ -167,7 +167,7 @@ fn create_compiler_snapshot(
|
||||||
// files, but a slightly different implementation at build time.
|
// files, but a slightly different implementation at build time.
|
||||||
js_runtime.register_op(
|
js_runtime.register_op(
|
||||||
"op_load",
|
"op_load",
|
||||||
op_sync(move |_state, args, _bufs| {
|
op_sync(move |_state, args, _: ()| {
|
||||||
let v: LoadArgs = serde_json::from_value(args)?;
|
let v: LoadArgs = serde_json::from_value(args)?;
|
||||||
// we need a basic file to send to tsc to warm it up.
|
// we need a basic file to send to tsc to warm it up.
|
||||||
if v.specifier == build_specifier {
|
if v.specifier == build_specifier {
|
||||||
|
|
|
@ -1826,7 +1826,7 @@ where
|
||||||
V: de::DeserializeOwned,
|
V: de::DeserializeOwned,
|
||||||
R: Serialize + 'static,
|
R: Serialize + 'static,
|
||||||
{
|
{
|
||||||
op_sync(move |s, args, _bufs| {
|
op_sync(move |s, args, _: ()| {
|
||||||
let state = s.borrow_mut::<State>();
|
let state = s.borrow_mut::<State>();
|
||||||
op_fn(state, args)
|
op_fn(state, args)
|
||||||
})
|
})
|
||||||
|
|
|
@ -14,7 +14,7 @@ unitTest(async function metrics(): Promise<void> {
|
||||||
assert(m1.opsDispatched > 0);
|
assert(m1.opsDispatched > 0);
|
||||||
assert(m1.opsCompleted > 0);
|
assert(m1.opsCompleted > 0);
|
||||||
assert(m1.bytesSentControl === 0);
|
assert(m1.bytesSentControl === 0);
|
||||||
assert(m1.bytesSentData >= 0);
|
assert(m1.bytesSentData === 0);
|
||||||
assert(m1.bytesReceived === 0);
|
assert(m1.bytesReceived === 0);
|
||||||
const m1OpWrite = m1.ops["op_write_async"];
|
const m1OpWrite = m1.ops["op_write_async"];
|
||||||
assert(m1OpWrite.opsDispatchedAsync > 0);
|
assert(m1OpWrite.opsDispatchedAsync > 0);
|
||||||
|
@ -29,15 +29,13 @@ unitTest(async function metrics(): Promise<void> {
|
||||||
assert(m2.opsDispatchedAsync > m1.opsDispatchedAsync);
|
assert(m2.opsDispatchedAsync > m1.opsDispatchedAsync);
|
||||||
assert(m2.opsCompletedAsync > m1.opsCompletedAsync);
|
assert(m2.opsCompletedAsync > m1.opsCompletedAsync);
|
||||||
assert(m2.bytesSentControl === m1.bytesSentControl);
|
assert(m2.bytesSentControl === m1.bytesSentControl);
|
||||||
assert(m2.bytesSentData >= m1.bytesSentData + dataMsg.byteLength);
|
assert(m2.bytesSentData === 0);
|
||||||
assert(m2.bytesReceived === m1.bytesReceived);
|
assert(m2.bytesReceived === m1.bytesReceived);
|
||||||
const m2OpWrite = m2.ops["op_write_async"];
|
const m2OpWrite = m2.ops["op_write_async"];
|
||||||
assert(m2OpWrite.opsDispatchedAsync > m1OpWrite.opsDispatchedAsync);
|
assert(m2OpWrite.opsDispatchedAsync > m1OpWrite.opsDispatchedAsync);
|
||||||
assert(m2OpWrite.opsCompletedAsync > m1OpWrite.opsCompletedAsync);
|
assert(m2OpWrite.opsCompletedAsync > m1OpWrite.opsCompletedAsync);
|
||||||
assert(m2OpWrite.bytesSentControl === m1OpWrite.bytesSentControl);
|
assert(m2OpWrite.bytesSentControl === m1OpWrite.bytesSentControl);
|
||||||
assert(
|
assert(m2OpWrite.bytesSentData === 0);
|
||||||
m2OpWrite.bytesSentData >= m1OpWrite.bytesSentData + dataMsg.byteLength,
|
|
||||||
);
|
|
||||||
assert(m2OpWrite.bytesReceived === m1OpWrite.bytesReceived);
|
assert(m2OpWrite.bytesReceived === m1OpWrite.bytesReceived);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|
|
@ -227,7 +227,7 @@ fn op<F>(op_fn: F) -> Box<OpFn>
|
||||||
where
|
where
|
||||||
F: Fn(&mut State, Value) -> Result<Value, AnyError> + 'static,
|
F: Fn(&mut State, Value) -> Result<Value, AnyError> + 'static,
|
||||||
{
|
{
|
||||||
op_sync(move |s, args, _bufs| {
|
op_sync(move |s, args, _: ()| {
|
||||||
let state = s.borrow_mut::<State>();
|
let state = s.borrow_mut::<State>();
|
||||||
op_fn(state, args)
|
op_fn(state, args)
|
||||||
})
|
})
|
||||||
|
|
|
@ -8,16 +8,15 @@ use deno_core::v8;
|
||||||
use deno_core::JsRuntime;
|
use deno_core::JsRuntime;
|
||||||
use deno_core::Op;
|
use deno_core::Op;
|
||||||
use deno_core::OpState;
|
use deno_core::OpState;
|
||||||
use deno_core::ZeroCopyBuf;
|
|
||||||
|
|
||||||
use std::cell::RefCell;
|
use std::cell::RefCell;
|
||||||
use std::rc::Rc;
|
use std::rc::Rc;
|
||||||
|
|
||||||
fn create_js_runtime() -> JsRuntime {
|
fn create_js_runtime() -> JsRuntime {
|
||||||
let mut runtime = JsRuntime::new(Default::default());
|
let mut runtime = JsRuntime::new(Default::default());
|
||||||
runtime.register_op("pi_json", op_sync(|_, _: (), _| Ok(314159)));
|
runtime.register_op("pi_json", op_sync(|_, _: (), _: ()| Ok(314159)));
|
||||||
runtime.register_op("pi_async", op_async(op_pi_async));
|
runtime.register_op("pi_async", op_async(op_pi_async));
|
||||||
runtime.register_op("nop", |state, _, _| {
|
runtime.register_op("nop", |state, _| {
|
||||||
Op::Sync(serialize_op_result(Ok(9), state))
|
Op::Sync(serialize_op_result(Ok(9), state))
|
||||||
});
|
});
|
||||||
runtime.sync_ops_cache();
|
runtime.sync_ops_cache();
|
||||||
|
@ -29,7 +28,7 @@ fn create_js_runtime() -> JsRuntime {
|
||||||
async fn op_pi_async(
|
async fn op_pi_async(
|
||||||
_: Rc<RefCell<OpState>>,
|
_: Rc<RefCell<OpState>>,
|
||||||
_: (),
|
_: (),
|
||||||
_: Option<ZeroCopyBuf>,
|
_: (),
|
||||||
) -> Result<i64, AnyError> {
|
) -> Result<i64, AnyError> {
|
||||||
Ok(314159)
|
Ok(314159)
|
||||||
}
|
}
|
||||||
|
|
|
@ -309,21 +309,17 @@ fn opcall<'s>(
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
// Structured args
|
// Deserializable args (may be structured args or ZeroCopyBuf)
|
||||||
let v = args.get(2);
|
let a = args.get(2);
|
||||||
|
let b = args.get(3);
|
||||||
|
|
||||||
// Buf arg (optional)
|
let payload = OpPayload {
|
||||||
let arg3 = args.get(3);
|
scope,
|
||||||
let buf: Option<ZeroCopyBuf> = match serde_v8::from_v8(scope, arg3) {
|
a,
|
||||||
Ok(buf) => buf,
|
b,
|
||||||
Err(err) => {
|
promise_id,
|
||||||
throw_type_error(scope, format!("Err with buf arg: {}", err));
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
};
|
};
|
||||||
|
let op = OpTable::route_op(op_id, state.op_state.clone(), payload);
|
||||||
let payload = OpPayload::new(scope, v, promise_id);
|
|
||||||
let op = OpTable::route_op(op_id, state.op_state.clone(), payload, buf);
|
|
||||||
match op {
|
match op {
|
||||||
Op::Sync(result) => {
|
Op::Sync(result) => {
|
||||||
rv.set(result.to_v8(scope).unwrap());
|
rv.set(result.to_v8(scope).unwrap());
|
||||||
|
|
|
@ -4,6 +4,7 @@
|
||||||
|
|
||||||
use deno_core::op_sync;
|
use deno_core::op_sync;
|
||||||
use deno_core::JsRuntime;
|
use deno_core::JsRuntime;
|
||||||
|
use deno_core::ZeroCopyBuf;
|
||||||
use std::io::Write;
|
use std::io::Write;
|
||||||
|
|
||||||
fn main() {
|
fn main() {
|
||||||
|
@ -26,21 +27,23 @@ fn main() {
|
||||||
// The op_fn callback takes a state object OpState,
|
// The op_fn callback takes a state object OpState,
|
||||||
// a structured arg of type `T` and an optional ZeroCopyBuf,
|
// a structured arg of type `T` and an optional ZeroCopyBuf,
|
||||||
// a mutable reference to a JavaScript ArrayBuffer
|
// a mutable reference to a JavaScript ArrayBuffer
|
||||||
op_sync(|_state, msg: Option<String>, zero_copy| {
|
op_sync(
|
||||||
let mut out = std::io::stdout();
|
|_state, msg: Option<String>, zero_copy: Option<ZeroCopyBuf>| {
|
||||||
|
let mut out = std::io::stdout();
|
||||||
|
|
||||||
// Write msg to stdout
|
// Write msg to stdout
|
||||||
if let Some(msg) = msg {
|
if let Some(msg) = msg {
|
||||||
out.write_all(msg.as_bytes()).unwrap();
|
out.write_all(msg.as_bytes()).unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
// Write the contents of every buffer to stdout
|
// Write the contents of every buffer to stdout
|
||||||
if let Some(buf) = zero_copy {
|
if let Some(buf) = zero_copy {
|
||||||
out.write_all(&buf).unwrap();
|
out.write_all(&buf).unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(()) // No meaningful result
|
Ok(()) // No meaningful result
|
||||||
}),
|
},
|
||||||
|
),
|
||||||
);
|
);
|
||||||
|
|
||||||
// Register the JSON op for summing a number array.
|
// Register the JSON op for summing a number array.
|
||||||
|
@ -49,7 +52,7 @@ fn main() {
|
||||||
// The op_sync function automatically deserializes
|
// The op_sync function automatically deserializes
|
||||||
// the first ZeroCopyBuf and serializes the return value
|
// the first ZeroCopyBuf and serializes the return value
|
||||||
// to reduce boilerplate
|
// to reduce boilerplate
|
||||||
op_sync(|_state, nums: Vec<f64>, _| {
|
op_sync(|_state, nums: Vec<f64>, _: ()| {
|
||||||
// Sum inputs
|
// Sum inputs
|
||||||
let sum = nums.iter().fold(0.0, |a, v| a + v);
|
let sum = nums.iter().fold(0.0, |a, v| a + v);
|
||||||
// return as a Result<f64, AnyError>
|
// return as a Result<f64, AnyError>
|
||||||
|
|
61
core/ops.rs
61
core/ops.rs
|
@ -5,7 +5,6 @@ use crate::error::AnyError;
|
||||||
use crate::gotham_state::GothamState;
|
use crate::gotham_state::GothamState;
|
||||||
use crate::resources::ResourceTable;
|
use crate::resources::ResourceTable;
|
||||||
use crate::runtime::GetErrorClassFn;
|
use crate::runtime::GetErrorClassFn;
|
||||||
use crate::ZeroCopyBuf;
|
|
||||||
use futures::Future;
|
use futures::Future;
|
||||||
use indexmap::IndexMap;
|
use indexmap::IndexMap;
|
||||||
use rusty_v8 as v8;
|
use rusty_v8 as v8;
|
||||||
|
@ -20,41 +19,28 @@ use std::rc::Rc;
|
||||||
|
|
||||||
pub type PromiseId = u64;
|
pub type PromiseId = u64;
|
||||||
pub type OpAsyncFuture = Pin<Box<dyn Future<Output = (PromiseId, OpResult)>>>;
|
pub type OpAsyncFuture = Pin<Box<dyn Future<Output = (PromiseId, OpResult)>>>;
|
||||||
pub type OpFn =
|
pub type OpFn = dyn Fn(Rc<RefCell<OpState>>, OpPayload) -> Op + 'static;
|
||||||
dyn Fn(Rc<RefCell<OpState>>, OpPayload, Option<ZeroCopyBuf>) -> Op + 'static;
|
|
||||||
pub type OpId = usize;
|
pub type OpId = usize;
|
||||||
|
|
||||||
pub struct OpPayload<'a, 'b, 'c> {
|
pub struct OpPayload<'a, 'b, 'c> {
|
||||||
pub(crate) scope: Option<&'a mut v8::HandleScope<'b>>,
|
pub(crate) scope: &'a mut v8::HandleScope<'b>,
|
||||||
pub(crate) value: Option<v8::Local<'c, v8::Value>>,
|
pub(crate) a: v8::Local<'c, v8::Value>,
|
||||||
|
pub(crate) b: v8::Local<'c, v8::Value>,
|
||||||
pub(crate) promise_id: PromiseId,
|
pub(crate) promise_id: PromiseId,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a, 'b, 'c> OpPayload<'a, 'b, 'c> {
|
impl<'a, 'b, 'c> OpPayload<'a, 'b, 'c> {
|
||||||
pub fn new(
|
pub fn deserialize<T: DeserializeOwned, U: DeserializeOwned>(
|
||||||
scope: &'a mut v8::HandleScope<'b>,
|
self,
|
||||||
value: v8::Local<'c, v8::Value>,
|
) -> Result<(T, U), AnyError> {
|
||||||
promise_id: PromiseId,
|
let a: T = serde_v8::from_v8(self.scope, self.a)
|
||||||
) -> Self {
|
|
||||||
Self {
|
|
||||||
scope: Some(scope),
|
|
||||||
value: Some(value),
|
|
||||||
promise_id,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn empty() -> Self {
|
|
||||||
Self {
|
|
||||||
scope: None,
|
|
||||||
value: None,
|
|
||||||
promise_id: 0,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn deserialize<T: DeserializeOwned>(self) -> Result<T, AnyError> {
|
|
||||||
serde_v8::from_v8(self.scope.unwrap(), self.value.unwrap())
|
|
||||||
.map_err(AnyError::from)
|
.map_err(AnyError::from)
|
||||||
.map_err(|e| type_error(format!("Error parsing args: {}", e)))
|
.map_err(|e| type_error(format!("Error parsing args: {}", e)))?;
|
||||||
|
|
||||||
|
let b: U = serde_v8::from_v8(self.scope, self.b)
|
||||||
|
.map_err(AnyError::from)
|
||||||
|
.map_err(|e| type_error(format!("Error parsing args: {}", e)))?;
|
||||||
|
Ok((a, b))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -145,7 +131,7 @@ pub struct OpTable(IndexMap<String, Rc<OpFn>>);
|
||||||
impl OpTable {
|
impl OpTable {
|
||||||
pub fn register_op<F>(&mut self, name: &str, op_fn: F) -> OpId
|
pub fn register_op<F>(&mut self, name: &str, op_fn: F) -> OpId
|
||||||
where
|
where
|
||||||
F: Fn(Rc<RefCell<OpState>>, OpPayload, Option<ZeroCopyBuf>) -> Op + 'static,
|
F: Fn(Rc<RefCell<OpState>>, OpPayload) -> Op + 'static,
|
||||||
{
|
{
|
||||||
let (op_id, prev) = self.0.insert_full(name.to_owned(), Rc::new(op_fn));
|
let (op_id, prev) = self.0.insert_full(name.to_owned(), Rc::new(op_fn));
|
||||||
assert!(prev.is_none());
|
assert!(prev.is_none());
|
||||||
|
@ -160,7 +146,6 @@ impl OpTable {
|
||||||
op_id: OpId,
|
op_id: OpId,
|
||||||
state: Rc<RefCell<OpState>>,
|
state: Rc<RefCell<OpState>>,
|
||||||
payload: OpPayload,
|
payload: OpPayload,
|
||||||
buf: Option<ZeroCopyBuf>,
|
|
||||||
) -> Op {
|
) -> Op {
|
||||||
let op_fn = state
|
let op_fn = state
|
||||||
.borrow()
|
.borrow()
|
||||||
|
@ -169,7 +154,7 @@ impl OpTable {
|
||||||
.get_index(op_id)
|
.get_index(op_id)
|
||||||
.map(|(_, op_fn)| op_fn.clone());
|
.map(|(_, op_fn)| op_fn.clone());
|
||||||
match op_fn {
|
match op_fn {
|
||||||
Some(f) => (f)(state, payload, buf),
|
Some(f) => (f)(state, payload),
|
||||||
None => Op::NotFound,
|
None => Op::NotFound,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -177,11 +162,7 @@ impl OpTable {
|
||||||
|
|
||||||
impl Default for OpTable {
|
impl Default for OpTable {
|
||||||
fn default() -> Self {
|
fn default() -> Self {
|
||||||
fn dummy(
|
fn dummy(_state: Rc<RefCell<OpState>>, _p: OpPayload) -> Op {
|
||||||
_state: Rc<RefCell<OpState>>,
|
|
||||||
_p: OpPayload,
|
|
||||||
_b: Option<ZeroCopyBuf>,
|
|
||||||
) -> Op {
|
|
||||||
unreachable!()
|
unreachable!()
|
||||||
}
|
}
|
||||||
Self(once(("ops".to_owned(), Rc::new(dummy) as _)).collect())
|
Self(once(("ops".to_owned(), Rc::new(dummy) as _)).collect())
|
||||||
|
@ -200,11 +181,11 @@ mod tests {
|
||||||
let bar_id;
|
let bar_id;
|
||||||
{
|
{
|
||||||
let op_table = &mut state.borrow_mut().op_table;
|
let op_table = &mut state.borrow_mut().op_table;
|
||||||
foo_id = op_table
|
foo_id =
|
||||||
.register_op("foo", |_, _, _| Op::Sync(OpResult::Ok(321.into())));
|
op_table.register_op("foo", |_, _| Op::Sync(OpResult::Ok(321.into())));
|
||||||
assert_eq!(foo_id, 1);
|
assert_eq!(foo_id, 1);
|
||||||
bar_id = op_table
|
bar_id =
|
||||||
.register_op("bar", |_, _, _| Op::Sync(OpResult::Ok(123.into())));
|
op_table.register_op("bar", |_, _| Op::Sync(OpResult::Ok(123.into())));
|
||||||
assert_eq!(bar_id, 2);
|
assert_eq!(bar_id, 2);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -5,7 +5,6 @@ use crate::serialize_op_result;
|
||||||
use crate::Op;
|
use crate::Op;
|
||||||
use crate::OpFn;
|
use crate::OpFn;
|
||||||
use crate::OpState;
|
use crate::OpState;
|
||||||
use crate::ZeroCopyBuf;
|
|
||||||
use serde::de::DeserializeOwned;
|
use serde::de::DeserializeOwned;
|
||||||
use serde::Serialize;
|
use serde::Serialize;
|
||||||
use std::cell::RefCell;
|
use std::cell::RefCell;
|
||||||
|
@ -35,16 +34,17 @@ use std::rc::Rc;
|
||||||
///
|
///
|
||||||
/// `runtime.sync_ops_cache()` must be called after registering new ops
|
/// `runtime.sync_ops_cache()` must be called after registering new ops
|
||||||
/// A more complete example is available in the examples directory.
|
/// A more complete example is available in the examples directory.
|
||||||
pub fn op_sync<F, V, R>(op_fn: F) -> Box<OpFn>
|
pub fn op_sync<F, A, B, R>(op_fn: F) -> Box<OpFn>
|
||||||
where
|
where
|
||||||
F: Fn(&mut OpState, V, Option<ZeroCopyBuf>) -> Result<R, AnyError> + 'static,
|
F: Fn(&mut OpState, A, B) -> Result<R, AnyError> + 'static,
|
||||||
V: DeserializeOwned,
|
A: DeserializeOwned,
|
||||||
|
B: DeserializeOwned,
|
||||||
R: Serialize + 'static,
|
R: Serialize + 'static,
|
||||||
{
|
{
|
||||||
Box::new(move |state, payload, buf| -> Op {
|
Box::new(move |state, payload| -> Op {
|
||||||
let result = payload
|
let result = payload
|
||||||
.deserialize()
|
.deserialize()
|
||||||
.and_then(|args| op_fn(&mut state.borrow_mut(), args, buf));
|
.and_then(|(a, b)| op_fn(&mut state.borrow_mut(), a, b));
|
||||||
Op::Sync(serialize_op_result(result, state))
|
Op::Sync(serialize_op_result(result, state))
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
@ -73,14 +73,15 @@ where
|
||||||
///
|
///
|
||||||
/// `runtime.sync_ops_cache()` must be called after registering new ops
|
/// `runtime.sync_ops_cache()` must be called after registering new ops
|
||||||
/// A more complete example is available in the examples directory.
|
/// A more complete example is available in the examples directory.
|
||||||
pub fn op_async<F, V, R, RV>(op_fn: F) -> Box<OpFn>
|
pub fn op_async<F, A, B, R, RV>(op_fn: F) -> Box<OpFn>
|
||||||
where
|
where
|
||||||
F: Fn(Rc<RefCell<OpState>>, V, Option<ZeroCopyBuf>) -> R + 'static,
|
F: Fn(Rc<RefCell<OpState>>, A, B) -> R + 'static,
|
||||||
V: DeserializeOwned,
|
A: DeserializeOwned,
|
||||||
|
B: DeserializeOwned,
|
||||||
R: Future<Output = Result<RV, AnyError>> + 'static,
|
R: Future<Output = Result<RV, AnyError>> + 'static,
|
||||||
RV: Serialize + 'static,
|
RV: Serialize + 'static,
|
||||||
{
|
{
|
||||||
Box::new(move |state, payload, buf| -> Op {
|
Box::new(move |state, payload| -> Op {
|
||||||
let pid = payload.promise_id;
|
let pid = payload.promise_id;
|
||||||
// Deserialize args, sync error on failure
|
// Deserialize args, sync error on failure
|
||||||
let args = match payload.deserialize() {
|
let args = match payload.deserialize() {
|
||||||
|
@ -89,9 +90,10 @@ where
|
||||||
return Op::Sync(serialize_op_result(Err::<(), AnyError>(err), state))
|
return Op::Sync(serialize_op_result(Err::<(), AnyError>(err), state))
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
let (a, b) = args;
|
||||||
|
|
||||||
use crate::futures::FutureExt;
|
use crate::futures::FutureExt;
|
||||||
let fut = op_fn(state.clone(), args, buf)
|
let fut = op_fn(state.clone(), a, b)
|
||||||
.map(move |result| (pid, serialize_op_result(result, state)));
|
.map(move |result| (pid, serialize_op_result(result, state)));
|
||||||
Op::Async(Box::pin(fut))
|
Op::Async(Box::pin(fut))
|
||||||
})
|
})
|
||||||
|
@ -108,10 +110,9 @@ mod tests {
|
||||||
async fn op_throw(
|
async fn op_throw(
|
||||||
_state: Rc<RefCell<OpState>>,
|
_state: Rc<RefCell<OpState>>,
|
||||||
msg: Option<String>,
|
msg: Option<String>,
|
||||||
zero_copy: Option<ZeroCopyBuf>,
|
_: (),
|
||||||
) -> Result<(), AnyError> {
|
) -> Result<(), AnyError> {
|
||||||
assert_eq!(msg.unwrap(), "hello");
|
assert_eq!(msg.unwrap(), "hello");
|
||||||
assert!(zero_copy.is_none());
|
|
||||||
Err(crate::error::generic_error("foo"))
|
Err(crate::error::generic_error("foo"))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -26,7 +26,6 @@ use crate::OpPayload;
|
||||||
use crate::OpResult;
|
use crate::OpResult;
|
||||||
use crate::OpState;
|
use crate::OpState;
|
||||||
use crate::PromiseId;
|
use crate::PromiseId;
|
||||||
use crate::ZeroCopyBuf;
|
|
||||||
use futures::channel::mpsc;
|
use futures::channel::mpsc;
|
||||||
use futures::future::poll_fn;
|
use futures::future::poll_fn;
|
||||||
use futures::stream::FuturesUnordered;
|
use futures::stream::FuturesUnordered;
|
||||||
|
@ -520,7 +519,7 @@ impl JsRuntime {
|
||||||
/// * [op_async()](fn.op_async.html)
|
/// * [op_async()](fn.op_async.html)
|
||||||
pub fn register_op<F>(&mut self, name: &str, op_fn: F) -> OpId
|
pub fn register_op<F>(&mut self, name: &str, op_fn: F) -> OpId
|
||||||
where
|
where
|
||||||
F: Fn(Rc<RefCell<OpState>>, OpPayload, Option<ZeroCopyBuf>) -> Op + 'static,
|
F: Fn(Rc<RefCell<OpState>>, OpPayload) -> Op + 'static,
|
||||||
{
|
{
|
||||||
Self::state(self.v8_isolate())
|
Self::state(self.v8_isolate())
|
||||||
.borrow_mut()
|
.borrow_mut()
|
||||||
|
@ -1524,6 +1523,7 @@ pub mod tests {
|
||||||
use crate::error::custom_error;
|
use crate::error::custom_error;
|
||||||
use crate::modules::ModuleSourceFuture;
|
use crate::modules::ModuleSourceFuture;
|
||||||
use crate::op_sync;
|
use crate::op_sync;
|
||||||
|
use crate::ZeroCopyBuf;
|
||||||
use futures::future::lazy;
|
use futures::future::lazy;
|
||||||
use futures::FutureExt;
|
use futures::FutureExt;
|
||||||
use std::io;
|
use std::io;
|
||||||
|
@ -1549,18 +1549,15 @@ pub mod tests {
|
||||||
dispatch_count: Arc<AtomicUsize>,
|
dispatch_count: Arc<AtomicUsize>,
|
||||||
}
|
}
|
||||||
|
|
||||||
fn dispatch(
|
fn dispatch(rc_op_state: Rc<RefCell<OpState>>, payload: OpPayload) -> Op {
|
||||||
rc_op_state: Rc<RefCell<OpState>>,
|
|
||||||
payload: OpPayload,
|
|
||||||
buf: Option<ZeroCopyBuf>,
|
|
||||||
) -> Op {
|
|
||||||
let rc_op_state2 = rc_op_state.clone();
|
let rc_op_state2 = rc_op_state.clone();
|
||||||
let op_state_ = rc_op_state2.borrow();
|
let op_state_ = rc_op_state2.borrow();
|
||||||
let test_state = op_state_.borrow::<TestState>();
|
let test_state = op_state_.borrow::<TestState>();
|
||||||
test_state.dispatch_count.fetch_add(1, Ordering::Relaxed);
|
test_state.dispatch_count.fetch_add(1, Ordering::Relaxed);
|
||||||
|
let (control, buf): (u8, Option<ZeroCopyBuf>) =
|
||||||
|
payload.deserialize().unwrap();
|
||||||
match test_state.mode {
|
match test_state.mode {
|
||||||
Mode::Async => {
|
Mode::Async => {
|
||||||
let control: u8 = payload.deserialize().unwrap();
|
|
||||||
assert_eq!(control, 42);
|
assert_eq!(control, 42);
|
||||||
let resp = (0, serialize_op_result(Ok(43), rc_op_state));
|
let resp = (0, serialize_op_result(Ok(43), rc_op_state));
|
||||||
Op::Async(Box::pin(futures::future::ready(resp)))
|
Op::Async(Box::pin(futures::future::ready(resp)))
|
||||||
|
@ -1970,9 +1967,9 @@ pub mod tests {
|
||||||
let dispatch_count = Arc::new(AtomicUsize::new(0));
|
let dispatch_count = Arc::new(AtomicUsize::new(0));
|
||||||
let dispatch_count_ = dispatch_count.clone();
|
let dispatch_count_ = dispatch_count.clone();
|
||||||
|
|
||||||
let dispatcher = move |state, payload: OpPayload, _buf| -> Op {
|
let dispatcher = move |state, payload: OpPayload| -> Op {
|
||||||
dispatch_count_.fetch_add(1, Ordering::Relaxed);
|
dispatch_count_.fetch_add(1, Ordering::Relaxed);
|
||||||
let control: u8 = payload.deserialize().unwrap();
|
let (control, _): (u8, ()) = payload.deserialize().unwrap();
|
||||||
assert_eq!(control, 42);
|
assert_eq!(control, 42);
|
||||||
let resp = (0, serialize_op_result(Ok(43), state));
|
let resp = (0, serialize_op_result(Ok(43), state));
|
||||||
Op::Async(Box::pin(futures::future::ready(resp)))
|
Op::Async(Box::pin(futures::future::ready(resp)))
|
||||||
|
|
|
@ -150,7 +150,7 @@ use deno_core::OpFn;
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
|
|
||||||
pub fn metrics_op(name: &'static str, op_fn: Box<OpFn>) -> Box<OpFn> {
|
pub fn metrics_op(name: &'static str, op_fn: Box<OpFn>) -> Box<OpFn> {
|
||||||
Box::new(move |op_state, payload, buf| -> Op {
|
Box::new(move |op_state, payload| -> Op {
|
||||||
// TODOs:
|
// TODOs:
|
||||||
// * The 'bytes' metrics seem pretty useless, especially now that the
|
// * The 'bytes' metrics seem pretty useless, especially now that the
|
||||||
// distinction between 'control' and 'data' buffers has become blurry.
|
// distinction between 'control' and 'data' buffers has become blurry.
|
||||||
|
@ -160,12 +160,9 @@ pub fn metrics_op(name: &'static str, op_fn: Box<OpFn>) -> Box<OpFn> {
|
||||||
|
|
||||||
// TODO: remove this, doesn't make a ton of sense
|
// TODO: remove this, doesn't make a ton of sense
|
||||||
let bytes_sent_control = 0;
|
let bytes_sent_control = 0;
|
||||||
let bytes_sent_data = match buf {
|
let bytes_sent_data = 0;
|
||||||
Some(ref b) => b.len(),
|
|
||||||
None => 0,
|
|
||||||
};
|
|
||||||
|
|
||||||
let op = (op_fn)(op_state.clone(), payload, buf);
|
let op = (op_fn)(op_state.clone(), payload);
|
||||||
|
|
||||||
let op_state_ = op_state.clone();
|
let op_state_ = op_state.clone();
|
||||||
let mut s = op_state.borrow_mut();
|
let mut s = op_state.borrow_mut();
|
||||||
|
@ -181,9 +178,9 @@ pub fn metrics_op(name: &'static str, op_fn: Box<OpFn>) -> Box<OpFn> {
|
||||||
use deno_core::futures::future::FutureExt;
|
use deno_core::futures::future::FutureExt;
|
||||||
|
|
||||||
match op {
|
match op {
|
||||||
Op::Sync(buf) => {
|
Op::Sync(result) => {
|
||||||
metrics.op_sync(bytes_sent_control, bytes_sent_data, 0);
|
metrics.op_sync(bytes_sent_control, bytes_sent_data, 0);
|
||||||
Op::Sync(buf)
|
Op::Sync(result)
|
||||||
}
|
}
|
||||||
Op::Async(fut) => {
|
Op::Async(fut) => {
|
||||||
metrics.op_dispatched_async(bytes_sent_control, bytes_sent_data);
|
metrics.op_dispatched_async(bytes_sent_control, bytes_sent_data);
|
||||||
|
|
|
@ -104,9 +104,10 @@ impl<'a> plugin_api::Interface for PluginInterface<'a> {
|
||||||
dispatch_op_fn: plugin_api::DispatchOpFn,
|
dispatch_op_fn: plugin_api::DispatchOpFn,
|
||||||
) -> OpId {
|
) -> OpId {
|
||||||
let plugin_lib = self.plugin_lib.clone();
|
let plugin_lib = self.plugin_lib.clone();
|
||||||
let plugin_op_fn: Box<OpFn> = Box::new(move |state_rc, _payload, buf| {
|
let plugin_op_fn: Box<OpFn> = Box::new(move |state_rc, payload| {
|
||||||
let mut state = state_rc.borrow_mut();
|
let mut state = state_rc.borrow_mut();
|
||||||
let mut interface = PluginInterface::new(&mut state, &plugin_lib);
|
let mut interface = PluginInterface::new(&mut state, &plugin_lib);
|
||||||
|
let (_, buf): ((), Option<ZeroCopyBuf>) = payload.deserialize().unwrap();
|
||||||
let op = dispatch_op_fn(&mut interface, buf);
|
let op = dispatch_op_fn(&mut interface, buf);
|
||||||
match op {
|
match op {
|
||||||
sync_op @ Op::Sync(..) => sync_op,
|
sync_op @ Op::Sync(..) => sync_op,
|
||||||
|
|
|
@ -6,13 +6,14 @@ use deno_core::error::null_opbuf;
|
||||||
use deno_core::futures::channel::mpsc;
|
use deno_core::futures::channel::mpsc;
|
||||||
use deno_core::op_sync;
|
use deno_core::op_sync;
|
||||||
use deno_core::Extension;
|
use deno_core::Extension;
|
||||||
|
use deno_core::ZeroCopyBuf;
|
||||||
|
|
||||||
pub fn init() -> Extension {
|
pub fn init() -> Extension {
|
||||||
Extension::builder()
|
Extension::builder()
|
||||||
.ops(vec![
|
.ops(vec![
|
||||||
(
|
(
|
||||||
"op_worker_post_message",
|
"op_worker_post_message",
|
||||||
op_sync(move |state, _args: (), buf| {
|
op_sync(move |state, _args: (), buf: Option<ZeroCopyBuf>| {
|
||||||
let buf = buf.ok_or_else(null_opbuf)?;
|
let buf = buf.ok_or_else(null_opbuf)?;
|
||||||
let msg_buf: Box<[u8]> = (*buf).into();
|
let msg_buf: Box<[u8]> = (*buf).into();
|
||||||
let mut sender = state.borrow::<mpsc::Sender<WorkerEvent>>().clone();
|
let mut sender = state.borrow::<mpsc::Sender<WorkerEvent>>().clone();
|
||||||
|
@ -25,7 +26,7 @@ pub fn init() -> Extension {
|
||||||
// Notify host that guest worker closes.
|
// Notify host that guest worker closes.
|
||||||
(
|
(
|
||||||
"op_worker_close",
|
"op_worker_close",
|
||||||
op_sync(move |state, _args: (), _bufs| {
|
op_sync(move |state, _: (), _: ()| {
|
||||||
// Notify parent that we're finished
|
// Notify parent that we're finished
|
||||||
let mut sender = state.borrow::<mpsc::Sender<WorkerEvent>>().clone();
|
let mut sender = state.borrow::<mpsc::Sender<WorkerEvent>>().clone();
|
||||||
sender.close_channel();
|
sender.close_channel();
|
||||||
|
|
|
@ -96,7 +96,7 @@ pub fn init(
|
||||||
("op_host_get_message", op_async(op_host_get_message)),
|
("op_host_get_message", op_async(op_host_get_message)),
|
||||||
(
|
(
|
||||||
"op_host_unhandled_error",
|
"op_host_unhandled_error",
|
||||||
op_sync(move |state, message: String, _| {
|
op_sync(move |state, message: String, _: ()| {
|
||||||
if is_main_worker {
|
if is_main_worker {
|
||||||
return Err(generic_error("Cannot be called from main worker."));
|
return Err(generic_error("Cannot be called from main worker."));
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue