1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2024-12-22 07:14:47 -05:00

feat(core): Add support for async ops in realms (#14734)

Pull request #14019 enabled initial support for realms, but it did not
include support for async ops anywhere other than the main realm. The
main issue was that the `js_recv_cb` callback, which resolves promises
corresponding to async ops, was only set for the main realm, so async
ops in other realms would never resolve. Furthermore, promise ID's are
specific to each realm, which meant that async ops from other realms
would result in a wrong promise from the main realm being resolved.

This change creates a `ContextState` struct, similar to
`JsRuntimeState` but stored in a slot of each `v8::Context`, which
contains a `js_recv_cb` callback for each realm. Combined with a new
list of known realms, which stores them as `v8::Weak<v8::Context>`,
and a change in the `#[op]` macro to pass the current context to
`queue_async_op`, this makes it possible to send the results of
promises for different realms to their realm, and prevent the ID's
from getting mixed up.

Additionally, since promise ID's are no longer unique to the isolate,
having a single set of unrefed ops doesn't work. This change therefore
also moves `unrefed_ops` from `JsRuntimeState` to `ContextState`, and
adds the lengths of the unrefed op sets for all known realms to get
the total number of unrefed ops to compare in the event loop.

Co-authored-by: Luis Malheiro <luismalheiro@gmail.com>
This commit is contained in:
Andreu Botella 2022-08-10 20:04:20 +02:00 committed by GitHub
parent 5b2ae25f13
commit f16fe44303
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 309 additions and 59 deletions

View file

@ -8,6 +8,7 @@ use crate::ops_builtin::WasmStreamingResource;
use crate::resolve_url_or_path;
use crate::serde_v8::from_v8;
use crate::source_map::apply_source_map as apply_source_map_;
use crate::JsRealm;
use crate::JsRuntime;
use crate::OpDecl;
use crate::ZeroCopyBuf;
@ -64,14 +65,14 @@ fn to_v8_fn(
#[op(v8)]
fn op_ref_op(scope: &mut v8::HandleScope, promise_id: i32) {
let state_rc = JsRuntime::state(scope);
state_rc.borrow_mut().unrefed_ops.remove(&promise_id);
let context_state = JsRealm::state_from_scope(scope);
context_state.borrow_mut().unrefed_ops.remove(&promise_id);
}
#[op(v8)]
fn op_unref_op(scope: &mut v8::HandleScope, promise_id: i32) {
let state_rc = JsRuntime::state(scope);
state_rc.borrow_mut().unrefed_ops.insert(promise_id);
let context_state = JsRealm::state_from_scope(scope);
context_state.borrow_mut().unrefed_ops.insert(promise_id);
}
#[op(v8)]

View file

@ -46,7 +46,8 @@ use std::sync::Once;
use std::task::Context;
use std::task::Poll;
type PendingOpFuture = OpCall<(PromiseId, OpId, OpResult)>;
type PendingOpFuture =
OpCall<(v8::Global<v8::Context>, PromiseId, OpId, OpResult)>;
pub enum Snapshot {
Static(&'static [u8]),
@ -146,11 +147,19 @@ pub type SharedArrayBufferStore =
pub type CompiledWasmModuleStore = CrossIsolateStore<v8::CompiledWasmModule>;
#[derive(Default)]
pub(crate) struct ContextState {
js_recv_cb: Option<v8::Global<v8::Function>>,
// TODO(andreubotella): Move the rest of Option<Global<Function>> fields from
// JsRuntimeState to this struct.
pub(crate) unrefed_ops: HashSet<i32>,
}
/// Internal state for JsRuntime which is stored in one of v8::Isolate's
/// embedder slots.
pub(crate) struct JsRuntimeState {
global_realm: Option<JsRealm>,
pub(crate) js_recv_cb: Option<v8::Global<v8::Function>>,
known_realms: Vec<v8::Weak<v8::Context>>,
pub(crate) js_macrotask_cbs: Vec<v8::Global<v8::Function>>,
pub(crate) js_nexttick_cbs: Vec<v8::Global<v8::Function>>,
pub(crate) js_promise_reject_cb: Option<v8::Global<v8::Function>>,
@ -167,7 +176,6 @@ pub(crate) struct JsRuntimeState {
pub(crate) source_map_getter: Option<Box<dyn SourceMapGetter>>,
pub(crate) source_map_cache: SourceMapCache,
pub(crate) pending_ops: FuturesUnordered<PendingOpFuture>,
pub(crate) unrefed_ops: HashSet<i32>,
pub(crate) have_unpolled_ops: bool,
pub(crate) op_state: Rc<RefCell<OpState>>,
#[allow(dead_code)]
@ -383,13 +391,14 @@ impl JsRuntime {
.module_loader
.unwrap_or_else(|| Rc::new(NoopModuleLoader));
let known_realms = vec![v8::Weak::new(&mut isolate, &global_context)];
isolate.set_slot(Rc::new(RefCell::new(JsRuntimeState {
global_realm: Some(JsRealm(global_context)),
global_realm: Some(JsRealm(global_context.clone())),
known_realms,
pending_promise_exceptions: HashMap::new(),
pending_dyn_mod_evaluate: vec![],
pending_mod_evaluate: None,
dyn_module_evaluate_idle_counter: 0,
js_recv_cb: None,
js_macrotask_cbs: vec![],
js_nexttick_cbs: vec![],
js_promise_reject_cb: None,
@ -399,7 +408,6 @@ impl JsRuntime {
source_map_getter: options.source_map_getter,
source_map_cache: Default::default(),
pending_ops: FuturesUnordered::new(),
unrefed_ops: HashSet::new(),
shared_array_buffer_store: options.shared_array_buffer_store,
compiled_wasm_module_store: options.compiled_wasm_module_store,
op_state: op_state.clone(),
@ -409,6 +417,10 @@ impl JsRuntime {
waker: AtomicWaker::new(),
})));
global_context
.open(&mut isolate)
.set_slot(&mut isolate, Rc::<RefCell<ContextState>>::default());
let module_map = ModuleMap::new(loader, op_state);
isolate.set_slot(Rc::new(RefCell::new(module_map)));
@ -432,7 +444,8 @@ impl JsRuntime {
// Init extension ops
js_runtime.init_extension_ops().unwrap();
// Init callbacks (opresolve)
js_runtime.init_cbs();
let global_realm = js_runtime.global_realm();
js_runtime.init_cbs(&global_realm);
js_runtime
}
@ -470,12 +483,22 @@ impl JsRuntime {
&Self::state(self.v8_isolate()).borrow().op_ctxs,
self.built_from_snapshot,
);
context.set_slot(scope, Rc::<RefCell<ContextState>>::default());
Self::state(scope)
.borrow_mut()
.known_realms
.push(v8::Weak::new(scope, &context));
JsRealm::new(v8::Global::new(scope, context))
};
if !self.built_from_snapshot {
self.init_extension_js(&realm)?;
}
self.init_cbs(&realm);
Ok(realm)
}
@ -629,15 +652,17 @@ impl JsRuntime {
}
/// Grabs a reference to core.js' opresolve & syncOpsCache()
fn init_cbs(&mut self) {
let scope = &mut self.handle_scope();
let recv_cb =
Self::grab_global::<v8::Function>(scope, "Deno.core.opresolve").unwrap();
let recv_cb = v8::Global::new(scope, recv_cb);
// Put global handles in state
let state_rc = JsRuntime::state(scope);
let mut state = state_rc.borrow_mut();
state.js_recv_cb.replace(recv_cb);
fn init_cbs(&mut self, realm: &JsRealm) {
let recv_cb = {
let scope = &mut realm.handle_scope(self.v8_isolate());
let recv_cb =
Self::grab_global::<v8::Function>(scope, "Deno.core.opresolve")
.expect("Deno.core.opresolve is undefined in the realm");
v8::Global::new(scope, recv_cb)
};
// Put global handle in callback state
let state = realm.state(self.v8_isolate());
state.borrow_mut().js_recv_cb.replace(recv_cb);
}
/// Returns the runtime's op state, which can be used to maintain ops
@ -704,8 +729,21 @@ impl JsRuntime {
Rc::new(NoopModuleLoader),
state.borrow().op_state.clone(),
))));
// Drop other v8::Global handles before snapshotting
std::mem::take(&mut state.borrow_mut().js_recv_cb);
{
for weak_context in &state.borrow().known_realms {
if let Some(context) = weak_context.to_global(self.v8_isolate()) {
let realm = JsRealm::new(context.clone());
let realm_state = realm.state(self.v8_isolate());
std::mem::take(&mut realm_state.borrow_mut().js_recv_cb);
context
.open(self.v8_isolate())
.clear_all_slots(self.v8_isolate());
}
}
state.borrow_mut().known_realms.clear();
}
let snapshot_creator = self.snapshot_creator.as_mut().unwrap();
let snapshot = snapshot_creator
@ -1023,8 +1061,16 @@ Pending dynamic modules:\n".to_string();
let state = state_rc.borrow_mut();
let module_map = module_map_rc.borrow();
let mut num_unrefed_ops = 0;
for weak_context in &state.known_realms {
if let Some(context) = weak_context.to_global(isolate) {
let realm = JsRealm::new(context);
num_unrefed_ops += realm.state(isolate).borrow().unrefed_ops.len();
}
}
EventLoopPendingState {
has_pending_refed_ops: state.pending_ops.len() > state.unrefed_ops.len(),
has_pending_refed_ops: state.pending_ops.len() > num_unrefed_ops,
has_pending_dyn_imports: module_map.has_pending_dynamic_imports(),
has_pending_dyn_module_evaluation: !state
.pending_dyn_mod_evaluate
@ -1792,17 +1838,30 @@ impl JsRuntime {
fn resolve_async_ops(&mut self, cx: &mut Context) -> Result<(), Error> {
let state_rc = Self::state(self.v8_isolate());
let js_recv_cb_handle = state_rc.borrow().js_recv_cb.clone().unwrap();
let scope = &mut self.handle_scope();
// We keep a list of promise IDs and OpResults per realm. Since v8::Context
// isn't hashable, `results_per_realm` is a vector of (context, list) tuples
type ResultList = Vec<(i32, OpResult)>;
let mut results_per_realm: Vec<(v8::Global<v8::Context>, ResultList)> = {
let known_realms = &mut state_rc.borrow_mut().known_realms;
let mut results = Vec::with_capacity(known_realms.len());
// We return async responses to JS in unbounded batches (may change),
// each batch is a flat vector of tuples:
// `[promise_id1, op_result1, promise_id2, op_result2, ...]`
// promise_id is a simple integer, op_result is an ops::OpResult
// which contains a value OR an error, encoded as a tuple.
// This batch is received in JS via the special `arguments` variable
// and then each tuple is used to resolve or reject promises
let mut args: Vec<v8::Local<v8::Value>> = vec![];
// Avoid calling the method multiple times
let isolate = self.v8_isolate();
// Remove GC'd realms from `known_realms` at the same time as we populate
// `results` with those that are still alive.
known_realms.retain(|weak| {
if !weak.is_empty() {
let context = weak.to_global(isolate).unwrap();
results.push((context, vec![]));
true
} else {
false
}
});
results
};
// Now handle actual ops.
{
@ -1811,27 +1870,61 @@ impl JsRuntime {
while let Poll::Ready(Some(item)) = state.pending_ops.poll_next_unpin(cx)
{
let (promise_id, op_id, resp) = item;
state.unrefed_ops.remove(&promise_id);
let (context, promise_id, op_id, resp) = item;
state.op_state.borrow().tracker.track_async_completed(op_id);
args.push(v8::Integer::new(scope, promise_id as i32).into());
args.push(resp.to_v8(scope).unwrap());
for (context2, results) in results_per_realm.iter_mut() {
if context == *context2 {
results.push((promise_id, resp));
break;
}
}
JsRealm::new(context)
.state(self.v8_isolate())
.borrow_mut()
.unrefed_ops
.remove(&promise_id);
}
}
if args.is_empty() {
return Ok(());
for (context, results) in results_per_realm {
if results.is_empty() {
continue;
}
let realm = JsRealm::new(context);
let js_recv_cb_handle = realm
.state(self.v8_isolate())
.borrow()
.js_recv_cb
.clone()
.unwrap();
let scope = &mut realm.handle_scope(self.v8_isolate());
// We return async responses to JS in unbounded batches (may change),
// each batch is a flat vector of tuples:
// `[promise_id1, op_result1, promise_id2, op_result2, ...]`
// promise_id is a simple integer, op_result is an ops::OpResult
// which contains a value OR an error, encoded as a tuple.
// This batch is received in JS via the special `arguments` variable
// and then each tuple is used to resolve or reject promises
let mut args = vec![];
for (promise_id, resp) in results.into_iter() {
args.push(v8::Integer::new(scope, promise_id as i32).into());
args.push(resp.to_v8(scope).unwrap());
}
let tc_scope = &mut v8::TryCatch::new(scope);
let js_recv_cb = js_recv_cb_handle.open(tc_scope);
let this = v8::undefined(tc_scope).into();
js_recv_cb.call(tc_scope, this, args.as_slice());
if let Some(exception) = tc_scope.exception() {
return exception_to_err_result(tc_scope, exception, false);
}
}
let tc_scope = &mut v8::TryCatch::new(scope);
let js_recv_cb = js_recv_cb_handle.open(tc_scope);
let this = v8::undefined(tc_scope).into();
js_recv_cb.call(tc_scope, this, args.as_slice());
match tc_scope.exception() {
None => Ok(()),
Some(exception) => exception_to_err_result(tc_scope, exception, false),
}
Ok(())
}
fn drain_macrotasks(&mut self) -> Result<(), Error> {
@ -1937,6 +2030,28 @@ impl JsRealm {
&self.0
}
pub(crate) fn state(
&self,
isolate: &mut v8::Isolate,
) -> Rc<RefCell<ContextState>> {
self
.context()
.open(isolate)
.get_slot::<Rc<RefCell<ContextState>>>(isolate)
.unwrap()
.clone()
}
pub(crate) fn state_from_scope(
scope: &mut v8::HandleScope,
) -> Rc<RefCell<ContextState>> {
let context = scope.get_current_context();
context
.get_slot::<Rc<RefCell<ContextState>>>(scope)
.unwrap()
.clone()
}
pub fn handle_scope<'s>(
&self,
isolate: &'s mut v8::Isolate,
@ -2005,7 +2120,8 @@ impl JsRealm {
#[inline]
pub fn queue_async_op(
scope: &v8::Isolate,
op: impl Future<Output = (PromiseId, OpId, OpResult)> + 'static,
op: impl Future<Output = (v8::Global<v8::Context>, PromiseId, OpId, OpResult)>
+ 'static,
) {
let state_rc = JsRuntime::state(scope);
let mut state = state_rc.borrow_mut();
@ -2162,11 +2278,11 @@ pub mod tests {
)
.unwrap();
{
let realm = runtime.global_realm();
let isolate = runtime.v8_isolate();
let state_rc = JsRuntime::state(isolate);
let state = state_rc.borrow();
assert_eq!(state.pending_ops.len(), 2);
assert_eq!(state.unrefed_ops.len(), 0);
assert_eq!(state_rc.borrow().pending_ops.len(), 2);
assert_eq!(realm.state(isolate).borrow().unrefed_ops.len(), 0);
}
runtime
.execute_script(
@ -2178,11 +2294,11 @@ pub mod tests {
)
.unwrap();
{
let realm = runtime.global_realm();
let isolate = runtime.v8_isolate();
let state_rc = JsRuntime::state(isolate);
let state = state_rc.borrow();
assert_eq!(state.pending_ops.len(), 2);
assert_eq!(state.unrefed_ops.len(), 2);
assert_eq!(state_rc.borrow().pending_ops.len(), 2);
assert_eq!(realm.state(isolate).borrow().unrefed_ops.len(), 2);
}
runtime
.execute_script(
@ -2194,11 +2310,11 @@ pub mod tests {
)
.unwrap();
{
let realm = runtime.global_realm();
let isolate = runtime.v8_isolate();
let state_rc = JsRuntime::state(isolate);
let state = state_rc.borrow();
assert_eq!(state.pending_ops.len(), 2);
assert_eq!(state.unrefed_ops.len(), 0);
assert_eq!(state_rc.borrow().pending_ops.len(), 2);
assert_eq!(realm.state(isolate).borrow().unrefed_ops.len(), 0);
}
}
@ -3635,4 +3751,132 @@ assertEquals(1, notify_return_value);
assert!(ret.open(runtime.v8_isolate()).is_true());
}
}
#[tokio::test]
async fn js_realm_async_ops() {
// Test that returning a ZeroCopyBuf and throwing an exception from a async
// op result in objects with prototypes from the right realm. Note that we
// don't test the result of returning structs, because they will be
// serialized to objects with null prototype.
#[op]
async fn op_test(fail: bool) -> Result<ZeroCopyBuf, Error> {
if !fail {
Ok(ZeroCopyBuf::empty())
} else {
Err(crate::error::type_error("Test"))
}
}
let mut runtime = JsRuntime::new(RuntimeOptions {
extensions: vec![Extension::builder().ops(vec![op_test::decl()]).build()],
get_error_class_fn: Some(&|error| {
crate::error::get_custom_error_class(error).unwrap()
}),
..Default::default()
});
let global_realm = runtime.global_realm();
let new_realm = runtime.create_realm().unwrap();
let mut rets = vec![];
// Test in both realms
for realm in [global_realm, new_realm].into_iter() {
let ret = realm
.execute_script(
runtime.v8_isolate(),
"",
r#"
(async function () {
const buf = await Deno.core.opAsync("op_test", false);
let err;
try {
await Deno.core.opAsync("op_test", true);
} catch(e) {
err = e;
}
return buf instanceof Uint8Array && buf.byteLength === 0 &&
err instanceof TypeError && err.message === "Test" ;
})();
"#,
)
.unwrap();
rets.push((realm, ret));
}
runtime.run_event_loop(false).await.unwrap();
for ret in rets {
let scope = &mut ret.0.handle_scope(runtime.v8_isolate());
let value = v8::Local::new(scope, ret.1);
let promise = v8::Local::<v8::Promise>::try_from(value).unwrap();
let result = promise.result(scope);
assert!(result.is_boolean() && result.is_true());
}
}
#[tokio::test]
async fn js_realm_ref_unref_ops() {
run_in_task(|cx| {
// Never resolves.
#[op]
async fn op_pending() {
futures::future::pending().await
}
let mut runtime = JsRuntime::new(RuntimeOptions {
extensions: vec![Extension::builder()
.ops(vec![op_pending::decl()])
.build()],
..Default::default()
});
let main_realm = runtime.global_realm();
let other_realm = runtime.create_realm().unwrap();
main_realm
.execute_script(
runtime.v8_isolate(),
"",
"var promise = Deno.core.opAsync('op_pending');",
)
.unwrap();
other_realm
.execute_script(
runtime.v8_isolate(),
"",
"var promise = Deno.core.opAsync('op_pending');",
)
.unwrap();
assert!(matches!(runtime.poll_event_loop(cx, false), Poll::Pending));
main_realm
.execute_script(
runtime.v8_isolate(),
"",
r#"
let promiseIdSymbol = Symbol.for("Deno.core.internalPromiseId");
Deno.core.opSync("op_unref_op", promise[promiseIdSymbol]);
"#,
)
.unwrap();
assert!(matches!(runtime.poll_event_loop(cx, false), Poll::Pending));
other_realm
.execute_script(
runtime.v8_isolate(),
"",
r#"
let promiseIdSymbol = Symbol.for("Deno.core.internalPromiseId");
Deno.core.opSync("op_unref_op", promise[promiseIdSymbol]);
"#,
)
.unwrap();
assert!(matches!(
runtime.poll_event_loop(cx, false),
Poll::Ready(Ok(()))
));
});
}
}

View file

@ -190,7 +190,7 @@ fn codegen_v8_async(
quote! {
let result = match result {
Ok(fut) => fut.await,
Err(e) => return (promise_id, op_id, #core::_ops::to_op_result::<()>(get_class, Err(e))),
Err(e) => return (context, promise_id, op_id, #core::_ops::to_op_result::<()>(get_class, Err(e))),
};
}
} else {
@ -233,11 +233,16 @@ fn codegen_v8_async(
state.get_error_class_fn
};
let context = {
let local = scope.get_current_context();
#core::v8::Global::new(scope, local)
};
#pre_result
#core::_ops::queue_async_op(scope, async move {
let result = #result_fut
#result_wrapper
(promise_id, op_id, #core::_ops::to_op_result(get_class, result))
(context, promise_id, op_id, #core::_ops::to_op_result(get_class, result))
});
}
}