mirror of
https://github.com/denoland/deno.git
synced 2025-01-15 02:20:15 -05:00
161cf7cdfd
This change simplifies how we execute V8. Previously V8 Isolates jumped around threads every time they were woken up. This was overly complex and potentially hurting performance in a myriad ways. Now isolates run on their own dedicated thread and never move. - blocking_json spawns a thread and does not use a thread pool - op_host_poll_worker and op_host_resume_worker are non-operational - removes Worker::get_message and Worker::post_message - ThreadSafeState::workers table contains WorkerChannel entries instead of actual Worker instances. - MainWorker and CompilerWorker are no longer Futures. - The multi-threaded version of deno_core_http_bench was removed. - AyncOps no longer need to be Send + Sync This PR is very large and several tests were disabled to speed integration: - installer_test_local_module_run - installer_test_remote_module_run - _015_duplicate_parallel_import - _026_workers
175 lines
4.6 KiB
Rust
175 lines
4.6 KiB
Rust
// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
|
|
use crate::ZeroCopyBuf;
|
|
use futures::Future;
|
|
use std::collections::HashMap;
|
|
use std::pin::Pin;
|
|
use std::sync::Arc;
|
|
use std::sync::RwLock;
|
|
|
|
pub type OpId = u32;
|
|
|
|
pub type Buf = Box<[u8]>;
|
|
|
|
pub type OpAsyncFuture<E> = Pin<Box<dyn Future<Output = Result<Buf, E>>>>;
|
|
|
|
pub(crate) type PendingOpFuture =
|
|
Pin<Box<dyn Future<Output = Result<(OpId, Buf), CoreError>>>>;
|
|
|
|
pub type OpResult<E> = Result<Op<E>, E>;
|
|
|
|
pub enum Op<E> {
|
|
Sync(Buf),
|
|
Async(OpAsyncFuture<E>),
|
|
/// AsyncUnref is the variation of Async, which doesn't block the program
|
|
/// exiting.
|
|
AsyncUnref(OpAsyncFuture<E>),
|
|
}
|
|
|
|
pub type CoreError = ();
|
|
|
|
pub type CoreOp = Op<CoreError>;
|
|
|
|
/// Main type describing op
|
|
pub type OpDispatcher = dyn Fn(&[u8], Option<ZeroCopyBuf>) -> CoreOp + 'static;
|
|
|
|
#[derive(Default)]
|
|
pub struct OpRegistry {
|
|
dispatchers: RwLock<Vec<Arc<Box<OpDispatcher>>>>,
|
|
name_to_id: RwLock<HashMap<String, OpId>>,
|
|
}
|
|
|
|
impl OpRegistry {
|
|
pub fn new() -> Self {
|
|
let registry = Self::default();
|
|
let op_id = registry.register("ops", |_, _| {
|
|
// ops is a special op which is handled in call.
|
|
unreachable!()
|
|
});
|
|
assert_eq!(op_id, 0);
|
|
registry
|
|
}
|
|
|
|
pub fn register<F>(&self, name: &str, op: F) -> OpId
|
|
where
|
|
F: Fn(&[u8], Option<ZeroCopyBuf>) -> CoreOp + 'static,
|
|
{
|
|
let mut lock = self.dispatchers.write().unwrap();
|
|
let op_id = lock.len() as u32;
|
|
|
|
let mut name_lock = self.name_to_id.write().unwrap();
|
|
let existing = name_lock.insert(name.to_string(), op_id);
|
|
assert!(
|
|
existing.is_none(),
|
|
format!("Op already registered: {}", name)
|
|
);
|
|
|
|
lock.push(Arc::new(Box::new(op)));
|
|
drop(name_lock);
|
|
drop(lock);
|
|
op_id
|
|
}
|
|
|
|
fn json_map(&self) -> Buf {
|
|
let lock = self.name_to_id.read().unwrap();
|
|
let op_map_json = serde_json::to_string(&*lock).unwrap();
|
|
op_map_json.as_bytes().to_owned().into_boxed_slice()
|
|
}
|
|
|
|
/// This function returns None only if op with given id doesn't exist in registry.
|
|
pub fn call(
|
|
&self,
|
|
op_id: OpId,
|
|
control: &[u8],
|
|
zero_copy_buf: Option<ZeroCopyBuf>,
|
|
) -> Option<CoreOp> {
|
|
// Op with id 0 has special meaning - it's a special op that is always
|
|
// provided to retrieve op id map. The map consists of name to `OpId`
|
|
// mappings.
|
|
if op_id == 0 {
|
|
return Some(Op::Sync(self.json_map()));
|
|
}
|
|
let lock = self.dispatchers.read().unwrap();
|
|
if let Some(op) = lock.get(op_id as usize) {
|
|
let op_ = Arc::clone(&op);
|
|
// This should allow for changes to the dispatcher list during a call.
|
|
drop(lock);
|
|
Some(op_(control, zero_copy_buf))
|
|
} else {
|
|
None
|
|
}
|
|
}
|
|
}
|
|
|
|
#[test]
|
|
fn test_op_registry() {
|
|
use std::sync::atomic;
|
|
use std::sync::Arc;
|
|
let op_registry = OpRegistry::new();
|
|
|
|
let c = Arc::new(atomic::AtomicUsize::new(0));
|
|
let c_ = c.clone();
|
|
|
|
let test_id = op_registry.register("test", move |_, _| {
|
|
c_.fetch_add(1, atomic::Ordering::SeqCst);
|
|
CoreOp::Sync(Box::new([]))
|
|
});
|
|
assert!(test_id != 0);
|
|
|
|
let mut expected = HashMap::new();
|
|
expected.insert("ops".to_string(), 0);
|
|
expected.insert("test".to_string(), 1);
|
|
let name_to_id = op_registry.name_to_id.read().unwrap();
|
|
assert_eq!(*name_to_id, expected);
|
|
|
|
let res = op_registry.call(test_id, &[], None).unwrap();
|
|
if let Op::Sync(buf) = res {
|
|
assert_eq!(buf.len(), 0);
|
|
} else {
|
|
unreachable!();
|
|
}
|
|
assert_eq!(c.load(atomic::Ordering::SeqCst), 1);
|
|
|
|
let res = op_registry.call(100, &[], None);
|
|
assert!(res.is_none());
|
|
}
|
|
|
|
#[test]
|
|
fn register_op_during_call() {
|
|
use std::sync::atomic;
|
|
use std::sync::Arc;
|
|
let op_registry = Arc::new(OpRegistry::new());
|
|
|
|
let c = Arc::new(atomic::AtomicUsize::new(0));
|
|
let c_ = c.clone();
|
|
|
|
let op_registry_ = op_registry.clone();
|
|
let test_id = op_registry.register("dynamic_register_op", move |_, _| {
|
|
let c__ = c_.clone();
|
|
op_registry_.register("test", move |_, _| {
|
|
c__.fetch_add(1, atomic::Ordering::SeqCst);
|
|
CoreOp::Sync(Box::new([]))
|
|
});
|
|
CoreOp::Sync(Box::new([]))
|
|
});
|
|
assert!(test_id != 0);
|
|
|
|
op_registry.call(test_id, &[], None);
|
|
|
|
let mut expected = HashMap::new();
|
|
expected.insert("ops".to_string(), 0);
|
|
expected.insert("dynamic_register_op".to_string(), 1);
|
|
expected.insert("test".to_string(), 2);
|
|
let name_to_id = op_registry.name_to_id.read().unwrap();
|
|
assert_eq!(*name_to_id, expected);
|
|
|
|
let res = op_registry.call(2, &[], None).unwrap();
|
|
if let Op::Sync(buf) = res {
|
|
assert_eq!(buf.len(), 0);
|
|
} else {
|
|
unreachable!();
|
|
}
|
|
assert_eq!(c.load(atomic::Ordering::SeqCst), 1);
|
|
|
|
let res = op_registry.call(100, &[], None);
|
|
assert!(res.is_none());
|
|
}
|