1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2024-11-25 15:29:32 -05:00

Modify op dispatcher to include &mut Isolate argument (#4821)

- Removes unnecessary RwLock and Rc around the op registry table
- Preparation to move resource_table to deno_core::Isolate.
- Towards #3453, #4222
This commit is contained in:
Ryan Dahl 2020-04-19 23:54:46 -04:00 committed by GitHub
parent 4e3532fe7b
commit c1ec042a00
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 272 additions and 203 deletions

View file

@ -41,11 +41,19 @@ struct AsyncArgs {
promise_id: Option<u64>, promise_id: Option<u64>,
} }
pub fn json_op<D>(d: D) -> impl Fn(&[u8], Option<ZeroCopyBuf>) -> Op pub fn json_op<D>(
d: D,
) -> impl Fn(&mut deno_core::Isolate, &[u8], Option<ZeroCopyBuf>) -> Op
where where
D: Fn(Value, Option<ZeroCopyBuf>) -> Result<JsonOp, OpError>, D: Fn(
&mut deno_core::Isolate,
Value,
Option<ZeroCopyBuf>,
) -> Result<JsonOp, OpError>,
{ {
move |control: &[u8], zero_copy: Option<ZeroCopyBuf>| { move |isolate: &mut deno_core::Isolate,
control: &[u8],
zero_copy: Option<ZeroCopyBuf>| {
let async_args: AsyncArgs = match serde_json::from_slice(control) { let async_args: AsyncArgs = match serde_json::from_slice(control) {
Ok(args) => args, Ok(args) => args,
Err(e) => { Err(e) => {
@ -58,7 +66,7 @@ where
let result = serde_json::from_slice(control) let result = serde_json::from_slice(control)
.map_err(OpError::from) .map_err(OpError::from)
.and_then(|args| d(args, zero_copy)); .and_then(|args| d(isolate, args, zero_copy));
// Convert to Op // Convert to Op
match result { match result {

View file

@ -113,11 +113,15 @@ fn test_parse_min_record() {
assert_eq!(parse_min_record(&buf), None); assert_eq!(parse_min_record(&buf), None);
} }
pub fn minimal_op<D>(d: D) -> impl Fn(&[u8], Option<ZeroCopyBuf>) -> Op pub fn minimal_op<D>(
d: D,
) -> impl Fn(&mut deno_core::Isolate, &[u8], Option<ZeroCopyBuf>) -> Op
where where
D: Fn(bool, i32, Option<ZeroCopyBuf>) -> MinimalOp, D: Fn(bool, i32, Option<ZeroCopyBuf>) -> MinimalOp,
{ {
move |control: &[u8], zero_copy: Option<ZeroCopyBuf>| { move |_isolate: &mut deno_core::Isolate,
control: &[u8],
zero_copy: Option<ZeroCopyBuf>| {
let mut record = match parse_min_record(control) { let mut record = match parse_min_record(control) {
Some(r) => r, Some(r) => r,
None => { None => {

View file

@ -3,20 +3,21 @@ use crate::fs as deno_fs;
use crate::op_error::OpError; use crate::op_error::OpError;
use crate::ops::json_op; use crate::ops::json_op;
use crate::state::State; use crate::state::State;
use deno_core::*; use deno_core::Isolate;
use deno_core::OpDispatcher;
use deno_core::OpId;
use deno_core::PluginInitContext;
use deno_core::PluginInitFn;
use deno_core::ZeroCopyBuf;
use dlopen::symbor::Library; use dlopen::symbor::Library;
use std::collections::HashMap; use std::collections::HashMap;
use std::ffi::OsStr; use std::ffi::OsStr;
use std::path::Path; use std::path::Path;
use std::rc::Rc;
pub fn init(i: &mut Isolate, s: &State, r: Rc<deno_core::OpRegistry>) { pub fn init(i: &mut Isolate, s: &State) {
let r_ = r;
i.register_op( i.register_op(
"op_open_plugin", "op_open_plugin",
s.core_op(json_op(s.stateful_op(move |state, args, zero_copy| { s.core_op(json_op(s.stateful_op2(op_open_plugin))),
op_open_plugin(&r_, state, args, zero_copy)
}))),
); );
} }
@ -52,7 +53,7 @@ struct OpenPluginArgs {
} }
pub fn op_open_plugin( pub fn op_open_plugin(
registry: &Rc<deno_core::OpRegistry>, isolate: &mut deno_core::Isolate,
state: &State, state: &State,
args: Value, args: Value,
_zero_copy: Option<ZeroCopyBuf>, _zero_copy: Option<ZeroCopyBuf>,
@ -91,8 +92,8 @@ pub fn op_open_plugin(
// The inclusion of prefix and rid is designed to avoid any // The inclusion of prefix and rid is designed to avoid any
// op name collision beyond the bound of a single loaded // op name collision beyond the bound of a single loaded
// plugin instance. // plugin instance.
let op_id = registry let op_id = isolate
.register(&format!("plugin_{}_{}", rid, op.0), state.core_op(op.1)); .register_op(&format!("plugin_{}_{}", rid, op.0), state.core_op(op.1));
plugin_resource.ops.insert(op.0, op_id); plugin_resource.ops.insert(op.0, op_id);
} }

View file

@ -12,7 +12,11 @@ use std::convert::From;
pub fn web_worker_op<D>( pub fn web_worker_op<D>(
sender: mpsc::Sender<WorkerEvent>, sender: mpsc::Sender<WorkerEvent>,
dispatcher: D, dispatcher: D,
) -> impl Fn(Value, Option<ZeroCopyBuf>) -> Result<JsonOp, OpError> ) -> impl Fn(
&mut deno_core::Isolate,
Value,
Option<ZeroCopyBuf>,
) -> Result<JsonOp, OpError>
where where
D: Fn( D: Fn(
&mpsc::Sender<WorkerEvent>, &mpsc::Sender<WorkerEvent>,
@ -20,7 +24,8 @@ where
Option<ZeroCopyBuf>, Option<ZeroCopyBuf>,
) -> Result<JsonOp, OpError>, ) -> Result<JsonOp, OpError>,
{ {
move |args: Value, move |_isolate: &mut deno_core::Isolate,
args: Value,
zero_copy: Option<ZeroCopyBuf>| zero_copy: Option<ZeroCopyBuf>|
-> Result<JsonOp, OpError> { dispatcher(&sender, args, zero_copy) } -> Result<JsonOp, OpError> { dispatcher(&sender, args, zero_copy) }
} }
@ -29,7 +34,11 @@ pub fn web_worker_op2<D>(
handle: WebWorkerHandle, handle: WebWorkerHandle,
sender: mpsc::Sender<WorkerEvent>, sender: mpsc::Sender<WorkerEvent>,
dispatcher: D, dispatcher: D,
) -> impl Fn(Value, Option<ZeroCopyBuf>) -> Result<JsonOp, OpError> ) -> impl Fn(
&mut deno_core::Isolate,
Value,
Option<ZeroCopyBuf>,
) -> Result<JsonOp, OpError>
where where
D: Fn( D: Fn(
WebWorkerHandle, WebWorkerHandle,
@ -38,7 +47,8 @@ where
Option<ZeroCopyBuf>, Option<ZeroCopyBuf>,
) -> Result<JsonOp, OpError>, ) -> Result<JsonOp, OpError>,
{ {
move |args: Value, move |_isolate: &mut deno_core::Isolate,
args: Value,
zero_copy: Option<ZeroCopyBuf>| zero_copy: Option<ZeroCopyBuf>|
-> Result<JsonOp, OpError> { -> Result<JsonOp, OpError> {
dispatcher(handle.clone(), &sender, args, zero_copy) dispatcher(handle.clone(), &sender, args, zero_copy)

View file

@ -73,7 +73,7 @@ impl State {
pub fn stateful_json_op<D>( pub fn stateful_json_op<D>(
&self, &self,
dispatcher: D, dispatcher: D,
) -> impl Fn(&[u8], Option<ZeroCopyBuf>) -> Op ) -> impl Fn(&mut deno_core::Isolate, &[u8], Option<ZeroCopyBuf>) -> Op
where where
D: Fn(&State, Value, Option<ZeroCopyBuf>) -> Result<JsonOp, OpError>, D: Fn(&State, Value, Option<ZeroCopyBuf>) -> Result<JsonOp, OpError>,
{ {
@ -85,18 +85,21 @@ impl State {
pub fn core_op<D>( pub fn core_op<D>(
&self, &self,
dispatcher: D, dispatcher: D,
) -> impl Fn(&[u8], Option<ZeroCopyBuf>) -> Op ) -> impl Fn(&mut deno_core::Isolate, &[u8], Option<ZeroCopyBuf>) -> Op
where where
D: Fn(&[u8], Option<ZeroCopyBuf>) -> Op, D: Fn(&mut deno_core::Isolate, &[u8], Option<ZeroCopyBuf>) -> Op,
{ {
let state = self.clone(); let state = self.clone();
move |control: &[u8], zero_copy: Option<ZeroCopyBuf>| -> Op { move |isolate: &mut deno_core::Isolate,
control: &[u8],
zero_copy: Option<ZeroCopyBuf>|
-> Op {
let bytes_sent_control = control.len() as u64; let bytes_sent_control = control.len() as u64;
let bytes_sent_zero_copy = let bytes_sent_zero_copy =
zero_copy.as_ref().map(|b| b.len()).unwrap_or(0) as u64; zero_copy.as_ref().map(|b| b.len()).unwrap_or(0) as u64;
let op = dispatcher(control, zero_copy); let op = dispatcher(isolate, control, zero_copy);
match op { match op {
Op::Sync(buf) => { Op::Sync(buf) => {
@ -162,15 +165,45 @@ impl State {
pub fn stateful_op<D>( pub fn stateful_op<D>(
&self, &self,
dispatcher: D, dispatcher: D,
) -> impl Fn(Value, Option<ZeroCopyBuf>) -> Result<JsonOp, OpError> ) -> impl Fn(
&mut deno_core::Isolate,
Value,
Option<ZeroCopyBuf>,
) -> Result<JsonOp, OpError>
where where
D: Fn(&State, Value, Option<ZeroCopyBuf>) -> Result<JsonOp, OpError>, D: Fn(&State, Value, Option<ZeroCopyBuf>) -> Result<JsonOp, OpError>,
{ {
let state = self.clone(); let state = self.clone();
move |args: Value, move |_isolate: &mut deno_core::Isolate,
args: Value,
zero_copy: Option<ZeroCopyBuf>| zero_copy: Option<ZeroCopyBuf>|
-> Result<JsonOp, OpError> { dispatcher(&state, args, zero_copy) } -> Result<JsonOp, OpError> { dispatcher(&state, args, zero_copy) }
} }
pub fn stateful_op2<D>(
&self,
dispatcher: D,
) -> impl Fn(
&mut deno_core::Isolate,
Value,
Option<ZeroCopyBuf>,
) -> Result<JsonOp, OpError>
where
D: Fn(
&mut deno_core::Isolate,
&State,
Value,
Option<ZeroCopyBuf>,
) -> Result<JsonOp, OpError>,
{
let state = self.clone();
move |isolate: &mut deno_core::Isolate,
args: Value,
zero_copy: Option<ZeroCopyBuf>|
-> Result<JsonOp, OpError> {
dispatcher(isolate, &state, args, zero_copy)
}
}
} }
impl ModuleLoader for State { impl ModuleLoader for State {

View file

@ -133,11 +133,10 @@ impl WebWorker {
ops::fetch::init(isolate, &state); ops::fetch::init(isolate, &state);
if has_deno_namespace { if has_deno_namespace {
let op_registry = isolate.op_registry.clone();
ops::runtime_compiler::init(isolate, &state); ops::runtime_compiler::init(isolate, &state);
ops::fs::init(isolate, &state); ops::fs::init(isolate, &state);
ops::fs_events::init(isolate, &state); ops::fs_events::init(isolate, &state);
ops::plugins::init(isolate, &state, op_registry); ops::plugins::init(isolate, &state);
ops::net::init(isolate, &state); ops::net::init(isolate, &state);
ops::tls::init(isolate, &state); ops::tls::init(isolate, &state);
ops::os::init(isolate, &state); ops::os::init(isolate, &state);

View file

@ -222,7 +222,6 @@ impl MainWorker {
let state_ = state.clone(); let state_ = state.clone();
let mut worker = Worker::new(name, startup_data, state_); let mut worker = Worker::new(name, startup_data, state_);
{ {
let op_registry = worker.isolate.op_registry.clone();
let isolate = &mut worker.isolate; let isolate = &mut worker.isolate;
ops::runtime::init(isolate, &state); ops::runtime::init(isolate, &state);
ops::runtime_compiler::init(isolate, &state); ops::runtime_compiler::init(isolate, &state);
@ -231,7 +230,7 @@ impl MainWorker {
ops::fs::init(isolate, &state); ops::fs::init(isolate, &state);
ops::fs_events::init(isolate, &state); ops::fs_events::init(isolate, &state);
ops::io::init(isolate, &state); ops::io::init(isolate, &state);
ops::plugins::init(isolate, &state, op_registry); ops::plugins::init(isolate, &state);
ops::net::init(isolate, &state); ops::net::init(isolate, &state);
ops::tls::init(isolate, &state); ops::tls::init(isolate, &state);
ops::os::init(isolate, &state); ops::os::init(isolate, &state);

View file

@ -580,14 +580,16 @@ pub mod tests {
let mut isolate = EsIsolate::new(loader, StartupData::None, false); let mut isolate = EsIsolate::new(loader, StartupData::None, false);
let dispatcher = let dispatcher = move |_isolate: &mut Isolate,
move |control: &[u8], _zero_copy: Option<ZeroCopyBuf>| -> Op { control: &[u8],
dispatch_count_.fetch_add(1, Ordering::Relaxed); _zero_copy: Option<ZeroCopyBuf>|
assert_eq!(control.len(), 1); -> Op {
assert_eq!(control[0], 42); dispatch_count_.fetch_add(1, Ordering::Relaxed);
let buf = vec![43u8, 0, 0, 0].into_boxed_slice(); assert_eq!(control.len(), 1);
Op::Async(futures::future::ready(buf).boxed()) assert_eq!(control[0], 42);
}; let buf = vec![43u8, 0, 0, 0].into_boxed_slice();
Op::Async(futures::future::ready(buf).boxed())
};
isolate.register_op("test", dispatcher); isolate.register_op("test", dispatcher);

View file

@ -111,20 +111,22 @@ impl Isolate {
F: 'static + Fn(State, u32, Option<ZeroCopyBuf>) -> Result<u32, Error>, F: 'static + Fn(State, u32, Option<ZeroCopyBuf>) -> Result<u32, Error>,
{ {
let state = self.state.clone(); let state = self.state.clone();
let core_handler = let core_handler = move |_isolate: &mut deno_core::Isolate,
move |control_buf: &[u8], zero_copy_buf: Option<ZeroCopyBuf>| -> Op { control_buf: &[u8],
let state = state.clone(); zero_copy_buf: Option<ZeroCopyBuf>|
let record = Record::from(control_buf); -> Op {
let is_sync = record.promise_id == 0; let state = state.clone();
assert!(is_sync); let record = Record::from(control_buf);
let is_sync = record.promise_id == 0;
assert!(is_sync);
let result: i32 = match handler(state, record.rid, zero_copy_buf) { let result: i32 = match handler(state, record.rid, zero_copy_buf) {
Ok(r) => r as i32, Ok(r) => r as i32,
Err(_) => -1, Err(_) => -1,
};
let buf = RecordBuf::from(Record { result, ..record })[..].into();
Op::Sync(buf)
}; };
let buf = RecordBuf::from(Record { result, ..record })[..].into();
Op::Sync(buf)
};
self.core_isolate.register_op(name, core_handler); self.core_isolate.register_op(name, core_handler);
} }
@ -139,25 +141,27 @@ impl Isolate {
<F::Ok as TryInto<i32>>::Error: Debug, <F::Ok as TryInto<i32>>::Error: Debug,
{ {
let state = self.state.clone(); let state = self.state.clone();
let core_handler = let core_handler = move |_isolate: &mut deno_core::Isolate,
move |control_buf: &[u8], zero_copy_buf: Option<ZeroCopyBuf>| -> Op { control_buf: &[u8],
let state = state.clone(); zero_copy_buf: Option<ZeroCopyBuf>|
let record = Record::from(control_buf); -> Op {
let is_sync = record.promise_id == 0; let state = state.clone();
assert!(!is_sync); let record = Record::from(control_buf);
let is_sync = record.promise_id == 0;
assert!(!is_sync);
let fut = async move { let fut = async move {
let op = handler(state, record.rid, zero_copy_buf); let op = handler(state, record.rid, zero_copy_buf);
let result = op let result = op
.map_ok(|r| r.try_into().expect("op result does not fit in i32")) .map_ok(|r| r.try_into().expect("op result does not fit in i32"))
.unwrap_or_else(|_| -1) .unwrap_or_else(|_| -1)
.await; .await;
RecordBuf::from(Record { result, ..record })[..].into() RecordBuf::from(Record { result, ..record })[..].into()
};
Op::Async(fut.boxed_local())
}; };
Op::Async(fut.boxed_local())
};
self.core_isolate.register_op(name, core_handler); self.core_isolate.register_op(name, core_handler);
} }
} }

View file

@ -28,7 +28,6 @@ use std::mem::forget;
use std::ops::{Deref, DerefMut}; use std::ops::{Deref, DerefMut};
use std::option::Option; use std::option::Option;
use std::pin::Pin; use std::pin::Pin;
use std::rc::Rc;
use std::sync::{Arc, Mutex, Once}; use std::sync::{Arc, Mutex, Once};
use std::task::Context; use std::task::Context;
use std::task::Poll; use std::task::Poll;
@ -177,7 +176,7 @@ pub struct Isolate {
pending_unref_ops: FuturesUnordered<PendingOpFuture>, pending_unref_ops: FuturesUnordered<PendingOpFuture>,
have_unpolled_ops: bool, have_unpolled_ops: bool,
startup_script: Option<OwnedScript>, startup_script: Option<OwnedScript>,
pub op_registry: Rc<OpRegistry>, pub op_registry: OpRegistry,
waker: AtomicWaker, waker: AtomicWaker,
error_handler: Option<Box<IsolateErrorHandleFn>>, error_handler: Option<Box<IsolateErrorHandleFn>>,
} }
@ -313,7 +312,7 @@ impl Isolate {
pending_unref_ops: FuturesUnordered::new(), pending_unref_ops: FuturesUnordered::new(),
have_unpolled_ops: false, have_unpolled_ops: false,
startup_script, startup_script,
op_registry: Rc::new(OpRegistry::new()), op_registry: OpRegistry::new(),
waker: AtomicWaker::new(), waker: AtomicWaker::new(),
error_handler: None, error_handler: None,
}; };
@ -343,9 +342,9 @@ impl Isolate {
/// corresponds to the second argument of Deno.core.dispatch(). /// corresponds to the second argument of Deno.core.dispatch().
/// ///
/// Requires runtime to explicitly ask for op ids before using any of the ops. /// Requires runtime to explicitly ask for op ids before using any of the ops.
pub fn register_op<F>(&self, name: &str, op: F) -> OpId pub fn register_op<F>(&mut self, name: &str, op: F) -> OpId
where where
F: Fn(&[u8], Option<ZeroCopyBuf>) -> Op + 'static, F: Fn(&mut Isolate, &[u8], Option<ZeroCopyBuf>) -> Op + 'static,
{ {
self.op_registry.register(name, op) self.op_registry.register(name, op)
} }
@ -381,17 +380,14 @@ impl Isolate {
control_buf: &[u8], control_buf: &[u8],
zero_copy_buf: Option<ZeroCopyBuf>, zero_copy_buf: Option<ZeroCopyBuf>,
) -> Option<(OpId, Box<[u8]>)> { ) -> Option<(OpId, Box<[u8]>)> {
let maybe_op = self.op_registry.call(op_id, control_buf, zero_copy_buf); let op = if let Some(dispatcher) = self.op_registry.get(op_id) {
dispatcher(self, control_buf, zero_copy_buf)
let op = match maybe_op { } else {
Some(op) => op, let message =
None => { v8::String::new(scope, &format!("Unknown op id: {}", op_id)).unwrap();
let message = let exception = v8::Exception::type_error(scope, message);
v8::String::new(scope, &format!("Unknown op id: {}", op_id)).unwrap(); scope.isolate().throw_exception(exception);
let exception = v8::Exception::type_error(scope, message); return None;
scope.isolate().throw_exception(exception);
return None;
}
}; };
debug_assert_eq!(self.shared.size(), 0); debug_assert_eq!(self.shared.size(), 0);
@ -768,56 +764,58 @@ pub mod tests {
let mut isolate = Isolate::new(StartupData::None, false); let mut isolate = Isolate::new(StartupData::None, false);
let dispatcher = let dispatcher = move |_isolate: &mut Isolate,
move |control: &[u8], _zero_copy: Option<ZeroCopyBuf>| -> Op { control: &[u8],
dispatch_count_.fetch_add(1, Ordering::Relaxed); _zero_copy: Option<ZeroCopyBuf>|
match mode { -> Op {
Mode::Async => { dispatch_count_.fetch_add(1, Ordering::Relaxed);
assert_eq!(control.len(), 1); match mode {
assert_eq!(control[0], 42); Mode::Async => {
let buf = vec![43u8].into_boxed_slice(); assert_eq!(control.len(), 1);
Op::Async(futures::future::ready(buf).boxed()) assert_eq!(control[0], 42);
} let buf = vec![43u8].into_boxed_slice();
Mode::AsyncUnref => { Op::Async(futures::future::ready(buf).boxed())
assert_eq!(control.len(), 1);
assert_eq!(control[0], 42);
let fut = async {
// This future never finish.
futures::future::pending::<()>().await;
vec![43u8].into_boxed_slice()
};
Op::AsyncUnref(fut.boxed())
}
Mode::OverflowReqSync => {
assert_eq!(control.len(), 100 * 1024 * 1024);
let buf = vec![43u8].into_boxed_slice();
Op::Sync(buf)
}
Mode::OverflowResSync => {
assert_eq!(control.len(), 1);
assert_eq!(control[0], 42);
let mut vec = Vec::<u8>::new();
vec.resize(100 * 1024 * 1024, 0);
vec[0] = 99;
let buf = vec.into_boxed_slice();
Op::Sync(buf)
}
Mode::OverflowReqAsync => {
assert_eq!(control.len(), 100 * 1024 * 1024);
let buf = vec![43u8].into_boxed_slice();
Op::Async(futures::future::ready(buf).boxed())
}
Mode::OverflowResAsync => {
assert_eq!(control.len(), 1);
assert_eq!(control[0], 42);
let mut vec = Vec::<u8>::new();
vec.resize(100 * 1024 * 1024, 0);
vec[0] = 4;
let buf = vec.into_boxed_slice();
Op::Async(futures::future::ready(buf).boxed())
}
} }
}; Mode::AsyncUnref => {
assert_eq!(control.len(), 1);
assert_eq!(control[0], 42);
let fut = async {
// This future never finish.
futures::future::pending::<()>().await;
vec![43u8].into_boxed_slice()
};
Op::AsyncUnref(fut.boxed())
}
Mode::OverflowReqSync => {
assert_eq!(control.len(), 100 * 1024 * 1024);
let buf = vec![43u8].into_boxed_slice();
Op::Sync(buf)
}
Mode::OverflowResSync => {
assert_eq!(control.len(), 1);
assert_eq!(control[0], 42);
let mut vec = Vec::<u8>::new();
vec.resize(100 * 1024 * 1024, 0);
vec[0] = 99;
let buf = vec.into_boxed_slice();
Op::Sync(buf)
}
Mode::OverflowReqAsync => {
assert_eq!(control.len(), 100 * 1024 * 1024);
let buf = vec![43u8].into_boxed_slice();
Op::Async(futures::future::ready(buf).boxed())
}
Mode::OverflowResAsync => {
assert_eq!(control.len(), 1);
assert_eq!(control[0], 42);
let mut vec = Vec::<u8>::new();
vec.resize(100 * 1024 * 1024, 0);
vec[0] = 4;
let buf = vec.into_boxed_slice();
Op::Async(futures::future::ready(buf).boxed())
}
}
};
isolate.register_op("test", dispatcher); isolate.register_op("test", dispatcher);

View file

@ -1,10 +1,10 @@
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. // Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
use crate::Isolate;
use crate::ZeroCopyBuf; use crate::ZeroCopyBuf;
use futures::Future; use futures::Future;
use std::collections::HashMap; use std::collections::HashMap;
use std::pin::Pin; use std::pin::Pin;
use std::rc::Rc; use std::rc::Rc;
use std::sync::RwLock;
pub type OpId = u32; pub type OpId = u32;
@ -21,72 +21,48 @@ pub enum Op {
} }
/// Main type describing op /// Main type describing op
pub type OpDispatcher = dyn Fn(&[u8], Option<ZeroCopyBuf>) -> Op + 'static; pub type OpDispatcher =
dyn Fn(&mut Isolate, &[u8], Option<ZeroCopyBuf>) -> Op + 'static;
#[derive(Default)] #[derive(Default)]
pub struct OpRegistry { pub struct OpRegistry {
dispatchers: RwLock<Vec<Rc<OpDispatcher>>>, dispatchers: Vec<Rc<OpDispatcher>>,
name_to_id: RwLock<HashMap<String, OpId>>, name_to_id: HashMap<String, OpId>,
} }
impl OpRegistry { impl OpRegistry {
pub fn new() -> Self { pub fn new() -> Self {
let registry = Self::default(); let mut registry = Self::default();
let op_id = registry.register("ops", |_, _| { let op_id = registry.register("ops", |isolate, _, _| {
// ops is a special op which is handled in call. let buf = isolate.op_registry.json_map();
unreachable!() Op::Sync(buf)
}); });
assert_eq!(op_id, 0); assert_eq!(op_id, 0);
registry registry
} }
pub fn register<F>(&self, name: &str, op: F) -> OpId pub fn register<F>(&mut self, name: &str, op: F) -> OpId
where where
F: Fn(&[u8], Option<ZeroCopyBuf>) -> Op + 'static, F: Fn(&mut Isolate, &[u8], Option<ZeroCopyBuf>) -> Op + 'static,
{ {
let mut lock = self.dispatchers.write().unwrap(); let op_id = self.dispatchers.len() as u32;
let op_id = lock.len() as u32;
let mut name_lock = self.name_to_id.write().unwrap(); let existing = self.name_to_id.insert(name.to_string(), op_id);
let existing = name_lock.insert(name.to_string(), op_id);
assert!( assert!(
existing.is_none(), existing.is_none(),
format!("Op already registered: {}", name) format!("Op already registered: {}", name)
); );
lock.push(Rc::new(op)); self.dispatchers.push(Rc::new(op));
drop(name_lock);
drop(lock);
op_id op_id
} }
fn json_map(&self) -> Buf { fn json_map(&self) -> Buf {
let lock = self.name_to_id.read().unwrap(); let op_map_json = serde_json::to_string(&self.name_to_id).unwrap();
let op_map_json = serde_json::to_string(&*lock).unwrap();
op_map_json.as_bytes().to_owned().into_boxed_slice() op_map_json.as_bytes().to_owned().into_boxed_slice()
} }
/// This function returns None only if op with given id doesn't exist in registry. pub fn get(&self, op_id: OpId) -> Option<Rc<OpDispatcher>> {
pub fn call( self.dispatchers.get(op_id as usize).map(Rc::clone)
&self,
op_id: OpId,
control: &[u8],
zero_copy_buf: Option<ZeroCopyBuf>,
) -> Option<Op> {
// Op with id 0 has special meaning - it's a special op that is always
// provided to retrieve op id map. The map consists of name to `OpId`
// mappings.
if op_id == 0 {
return Some(Op::Sync(self.json_map()));
}
let lock = self.dispatchers.read().unwrap();
if let Some(op) = lock.get(op_id as usize) {
let op_ = Rc::clone(&op);
// This should allow for changes to the dispatcher list during a call.
drop(lock);
Some(op_(control, zero_copy_buf))
} else {
None
}
} }
} }
@ -94,12 +70,12 @@ impl OpRegistry {
fn test_op_registry() { fn test_op_registry() {
use std::sync::atomic; use std::sync::atomic;
use std::sync::Arc; use std::sync::Arc;
let op_registry = OpRegistry::new(); let mut op_registry = OpRegistry::new();
let c = Arc::new(atomic::AtomicUsize::new(0)); let c = Arc::new(atomic::AtomicUsize::new(0));
let c_ = c.clone(); let c_ = c.clone();
let test_id = op_registry.register("test", move |_, _| { let test_id = op_registry.register("test", move |_, _, _| {
c_.fetch_add(1, atomic::Ordering::SeqCst); c_.fetch_add(1, atomic::Ordering::SeqCst);
Op::Sync(Box::new([])) Op::Sync(Box::new([]))
}); });
@ -108,10 +84,12 @@ fn test_op_registry() {
let mut expected = HashMap::new(); let mut expected = HashMap::new();
expected.insert("ops".to_string(), 0); expected.insert("ops".to_string(), 0);
expected.insert("test".to_string(), 1); expected.insert("test".to_string(), 1);
let name_to_id = op_registry.name_to_id.read().unwrap(); assert_eq!(op_registry.name_to_id, expected);
assert_eq!(*name_to_id, expected);
let res = op_registry.call(test_id, &[], None).unwrap(); let mut isolate = Isolate::new(crate::StartupData::None, false);
let dispatch = op_registry.get(test_id).unwrap();
let res = dispatch(&mut isolate, &[], None);
if let Op::Sync(buf) = res { if let Op::Sync(buf) = res {
assert_eq!(buf.len(), 0); assert_eq!(buf.len(), 0);
} else { } else {
@ -119,40 +97,57 @@ fn test_op_registry() {
} }
assert_eq!(c.load(atomic::Ordering::SeqCst), 1); assert_eq!(c.load(atomic::Ordering::SeqCst), 1);
let res = op_registry.call(100, &[], None); assert!(op_registry.get(100).is_none());
assert!(res.is_none());
} }
#[test] #[test]
fn register_op_during_call() { fn register_op_during_call() {
use std::sync::atomic; use std::sync::atomic;
use std::sync::Arc; use std::sync::Arc;
let op_registry = Arc::new(OpRegistry::new()); use std::sync::Mutex;
let op_registry = Arc::new(Mutex::new(OpRegistry::new()));
let c = Arc::new(atomic::AtomicUsize::new(0)); let c = Arc::new(atomic::AtomicUsize::new(0));
let c_ = c.clone(); let c_ = c.clone();
let op_registry_ = op_registry.clone(); let op_registry_ = op_registry.clone();
let test_id = op_registry.register("dynamic_register_op", move |_, _| {
let c__ = c_.clone(); let test_id = {
op_registry_.register("test", move |_, _| { let mut g = op_registry.lock().unwrap();
c__.fetch_add(1, atomic::Ordering::SeqCst); g.register("dynamic_register_op", move |_, _, _| {
let c__ = c_.clone();
let mut g = op_registry_.lock().unwrap();
g.register("test", move |_, _, _| {
c__.fetch_add(1, atomic::Ordering::SeqCst);
Op::Sync(Box::new([]))
});
Op::Sync(Box::new([])) Op::Sync(Box::new([]))
}); })
Op::Sync(Box::new([])) };
});
assert!(test_id != 0); assert!(test_id != 0);
op_registry.call(test_id, &[], None); let mut isolate = Isolate::new(crate::StartupData::None, false);
let dispatcher1 = {
let g = op_registry.lock().unwrap();
g.get(test_id).unwrap()
};
dispatcher1(&mut isolate, &[], None);
let mut expected = HashMap::new(); let mut expected = HashMap::new();
expected.insert("ops".to_string(), 0); expected.insert("ops".to_string(), 0);
expected.insert("dynamic_register_op".to_string(), 1); expected.insert("dynamic_register_op".to_string(), 1);
expected.insert("test".to_string(), 2); expected.insert("test".to_string(), 2);
let name_to_id = op_registry.name_to_id.read().unwrap(); {
assert_eq!(*name_to_id, expected); let g = op_registry.lock().unwrap();
assert_eq!(g.name_to_id, expected);
}
let res = op_registry.call(2, &[], None).unwrap(); let dispatcher2 = {
let g = op_registry.lock().unwrap();
g.get(2).unwrap()
};
let res = dispatcher2(&mut isolate, &[], None);
if let Op::Sync(buf) = res { if let Op::Sync(buf) = res {
assert_eq!(buf.len(), 0); assert_eq!(buf.len(), 0);
} else { } else {
@ -160,6 +155,6 @@ fn register_op_during_call() {
} }
assert_eq!(c.load(atomic::Ordering::SeqCst), 1); assert_eq!(c.load(atomic::Ordering::SeqCst), 1);
let res = op_registry.call(100, &[], None); let g = op_registry.lock().unwrap();
assert!(res.is_none()); assert!(g.get(100).is_none());
} }

View file

@ -1,5 +1,7 @@
use crate::isolate::ZeroCopyBuf; // TODO(ry) This plugin module is superfluous. Try to remove definitions for
use crate::ops::Op; // "init_fn!", "PluginInitFn", and "PluginInitContext".
use crate::ops::OpDispatcher;
pub type PluginInitFn = fn(context: &mut dyn PluginInitContext); pub type PluginInitFn = fn(context: &mut dyn PluginInitContext);
@ -7,7 +9,7 @@ pub trait PluginInitContext {
fn register_op( fn register_op(
&mut self, &mut self,
name: &str, name: &str,
op: Box<dyn Fn(&[u8], Option<ZeroCopyBuf>) -> Op + 'static>, op: Box<OpDispatcher>, // TODO(ry) rename to dispatcher, not op.
); );
} }

View file

@ -49,11 +49,14 @@ pub struct TSState {
fn compiler_op<D>( fn compiler_op<D>(
ts_state: Arc<Mutex<TSState>>, ts_state: Arc<Mutex<TSState>>,
dispatcher: D, dispatcher: D,
) -> impl Fn(&[u8], Option<ZeroCopyBuf>) -> Op ) -> impl Fn(&mut deno_core::Isolate, &[u8], Option<ZeroCopyBuf>) -> Op
where where
D: Fn(&mut TSState, &[u8]) -> Op, D: Fn(&mut TSState, &[u8]) -> Op,
{ {
move |control: &[u8], zero_copy_buf: Option<ZeroCopyBuf>| -> Op { move |_isolate: &mut deno_core::Isolate,
control: &[u8],
zero_copy_buf: Option<ZeroCopyBuf>|
-> Op {
assert!(zero_copy_buf.is_none()); // zero_copy_buf unused in compiler. assert!(zero_copy_buf.is_none()); // zero_copy_buf unused in compiler.
let mut s = ts_state.lock().unwrap(); let mut s = ts_state.lock().unwrap();
dispatcher(&mut s, control) dispatcher(&mut s, control)
@ -326,11 +329,14 @@ pub fn trace_serializer() {
/// Isolate. /// Isolate.
pub fn op_fetch_asset<S: ::std::hash::BuildHasher>( pub fn op_fetch_asset<S: ::std::hash::BuildHasher>(
custom_assets: HashMap<String, PathBuf, S>, custom_assets: HashMap<String, PathBuf, S>,
) -> impl Fn(&[u8], Option<ZeroCopyBuf>) -> Op { ) -> impl Fn(&mut deno_core::Isolate, &[u8], Option<ZeroCopyBuf>) -> Op {
for (_, path) in custom_assets.iter() { for (_, path) in custom_assets.iter() {
println!("cargo:rerun-if-changed={}", path.display()); println!("cargo:rerun-if-changed={}", path.display());
} }
move |control: &[u8], zero_copy_buf: Option<ZeroCopyBuf>| -> Op { move |_isolate: &mut deno_core::Isolate,
control: &[u8],
zero_copy_buf: Option<ZeroCopyBuf>|
-> Op {
assert!(zero_copy_buf.is_none()); // zero_copy_buf unused in this op. assert!(zero_copy_buf.is_none()); // zero_copy_buf unused in this op.
let name = std::str::from_utf8(control).unwrap(); let name = std::str::from_utf8(control).unwrap();

View file

@ -13,7 +13,11 @@ fn init(context: &mut dyn PluginInitContext) {
} }
init_fn!(init); init_fn!(init);
pub fn op_test_sync(data: &[u8], zero_copy: Option<ZeroCopyBuf>) -> Op { pub fn op_test_sync(
_isolate: &mut deno_core::Isolate,
data: &[u8],
zero_copy: Option<ZeroCopyBuf>,
) -> Op {
if let Some(buf) = zero_copy { if let Some(buf) = zero_copy {
let data_str = std::str::from_utf8(&data[..]).unwrap(); let data_str = std::str::from_utf8(&data[..]).unwrap();
let buf_str = std::str::from_utf8(&buf[..]).unwrap(); let buf_str = std::str::from_utf8(&buf[..]).unwrap();
@ -27,7 +31,11 @@ pub fn op_test_sync(data: &[u8], zero_copy: Option<ZeroCopyBuf>) -> Op {
Op::Sync(result_box) Op::Sync(result_box)
} }
pub fn op_test_async(data: &[u8], zero_copy: Option<ZeroCopyBuf>) -> Op { pub fn op_test_async(
_isolate: &mut deno_core::Isolate,
data: &[u8],
zero_copy: Option<ZeroCopyBuf>,
) -> Op {
let data_str = std::str::from_utf8(&data[..]).unwrap().to_string(); let data_str = std::str::from_utf8(&data[..]).unwrap().to_string();
let fut = async move { let fut = async move {
if let Some(buf) = zero_copy { if let Some(buf) = zero_copy {