0
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2024-10-29 08:58:01 -04:00

refactor(core): store pending ops per realm (#19054)

Dispatches op per-realm, and allows JsRealm to be garbage collected.

Slight improvement to benchmarks, but opens opportunity to clean up event loop.

---------

Co-authored-by: Matt Mastracci <matthew@mastracci.com>
This commit is contained in:
Bartek Iwańczuk 2023-05-24 21:15:21 +02:00 committed by GitHub
parent ba6f573b4e
commit 072e2b2fa2
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 345 additions and 288 deletions

View file

@ -2,6 +2,7 @@
use crate::error::AnyError;
use crate::gotham_state::GothamState;
use crate::realm::ContextState;
use crate::resources::ResourceTable;
use crate::runtime::GetErrorClassFn;
use crate::runtime::JsRuntimeState;
@ -23,13 +24,11 @@ use std::rc::Weak;
use v8::fast_api::CFunctionInfo;
use v8::fast_api::CTypeInfo;
pub type RealmIdx = u16;
pub type PromiseId = i32;
pub type OpId = u16;
#[pin_project]
pub struct OpCall {
realm_idx: RealmIdx,
promise_id: PromiseId,
op_id: OpId,
/// Future is not necessarily Unpin, so we need to pin_project.
@ -45,7 +44,6 @@ impl OpCall {
fut: Pin<Box<dyn Future<Output = OpResult> + 'static>>,
) -> Self {
Self {
realm_idx: op_ctx.realm_idx,
op_id: op_ctx.id,
promise_id,
fut: MaybeDone::Future(fut),
@ -56,7 +54,6 @@ impl OpCall {
/// `async { value }` or `futures::future::ready(value)`.
pub fn ready(op_ctx: &OpCtx, promise_id: PromiseId, value: OpResult) -> Self {
Self {
realm_idx: op_ctx.realm_idx,
op_id: op_ctx.id,
promise_id,
fut: MaybeDone::Done(value),
@ -65,13 +62,12 @@ impl OpCall {
}
impl Future for OpCall {
type Output = (RealmIdx, PromiseId, OpId, OpResult);
type Output = (PromiseId, OpId, OpResult);
fn poll(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
let realm_idx = self.realm_idx;
let promise_id = self.promise_id;
let op_id = self.op_id;
let fut = &mut *self.project().fut;
@ -88,7 +84,7 @@ impl Future for OpCall {
MaybeDone::Future(f) => f.poll_unpin(cx),
MaybeDone::Gone => std::task::Poll::Pending,
}
.map(move |res| (realm_idx, promise_id, op_id, res))
.map(move |res| (promise_id, op_id, res))
}
}
@ -145,14 +141,13 @@ pub struct OpCtx {
pub decl: Rc<OpDecl>,
pub fast_fn_c_info: Option<NonNull<v8::fast_api::CFunctionInfo>>,
pub runtime_state: Weak<RefCell<JsRuntimeState>>,
// Index of the current realm into `JsRuntimeState::known_realms`.
pub realm_idx: RealmIdx,
pub(crate) context_state: Rc<RefCell<ContextState>>,
}
impl OpCtx {
pub fn new(
pub(crate) fn new(
id: OpId,
realm_idx: RealmIdx,
context_state: Rc<RefCell<ContextState>>,
decl: Rc<OpDecl>,
state: Rc<RefCell<OpState>>,
runtime_state: Weak<RefCell<JsRuntimeState>>,
@ -176,7 +171,7 @@ impl OpCtx {
state,
runtime_state,
decl,
realm_idx,
context_state,
fast_fn_c_info,
}
}

View file

@ -4,8 +4,11 @@ use crate::bindings;
use crate::modules::ModuleCode;
use crate::ops::OpCtx;
use crate::runtime::exception_to_err_result;
use crate::runtime::JsRuntimeState;
use crate::JsRuntime;
use crate::OpCall;
use anyhow::Error;
use futures::stream::FuturesUnordered;
use std::cell::RefCell;
use std::collections::HashSet;
use std::collections::VecDeque;
@ -45,9 +48,11 @@ pub(crate) struct ContextState {
pub(crate) pending_promise_rejections:
VecDeque<(v8::Global<v8::Promise>, v8::Global<v8::Value>)>,
pub(crate) unrefed_ops: HashSet<i32, BuildHasherDefault<IdentityHasher>>,
pub(crate) pending_ops: FuturesUnordered<OpCall>,
// We don't explicitly re-read this prop but need the slice to live alongside
// the context
pub(crate) op_ctxs: Box<[OpCtx]>,
pub(crate) isolate: Option<*mut v8::OwnedIsolate>,
}
/// A representation of a JavaScript realm tied to a [`JsRuntime`], that allows
@ -95,28 +100,110 @@ pub(crate) struct ContextState {
/// keep the underlying V8 context alive even if it would have otherwise been
/// garbage collected.
#[derive(Clone)]
pub struct JsRealm(Rc<v8::Global<v8::Context>>);
impl JsRealm {
pub fn new(context: v8::Global<v8::Context>) -> Self {
JsRealm(Rc::new(context))
#[repr(transparent)]
pub struct JsRealm(pub(crate) JsRealmInner);
#[derive(Clone)]
pub(crate) struct JsRealmInner {
context_state: Rc<RefCell<ContextState>>,
context: Rc<v8::Global<v8::Context>>,
runtime_state: Rc<RefCell<JsRuntimeState>>,
is_global: bool,
}
impl JsRealmInner {
pub(crate) fn new(
context_state: Rc<RefCell<ContextState>>,
context: v8::Global<v8::Context>,
runtime_state: Rc<RefCell<JsRuntimeState>>,
is_global: bool,
) -> Self {
Self {
context_state,
context: context.into(),
runtime_state,
is_global,
}
}
pub fn num_pending_ops(&self) -> usize {
self.context_state.borrow().pending_ops.len()
}
pub fn num_unrefed_ops(&self) -> usize {
self.context_state.borrow().unrefed_ops.len()
}
#[inline(always)]
pub fn context(&self) -> &v8::Global<v8::Context> {
&self.0
&self.context
}
#[inline(always)]
pub(crate) fn state(
pub(crate) fn state(&self) -> Rc<RefCell<ContextState>> {
self.context_state.clone()
}
/// For info on the [`v8::Isolate`] parameter, check [`JsRealm#panics`].
#[inline(always)]
pub fn handle_scope<'s>(
&self,
isolate: &mut v8::Isolate,
) -> Rc<RefCell<ContextState>> {
self
.context()
.open(isolate)
.get_slot::<Rc<RefCell<ContextState>>>(isolate)
.unwrap()
.clone()
isolate: &'s mut v8::Isolate,
) -> v8::HandleScope<'s> {
v8::HandleScope::with_context(isolate, &*self.context)
}
pub(crate) fn check_promise_rejections(
&self,
scope: &mut v8::HandleScope,
) -> Result<(), Error> {
let Some((_, handle)) = self.context_state.borrow_mut().pending_promise_rejections.pop_front() else {
return Ok(());
};
let exception = v8::Local::new(scope, handle);
let state_rc = JsRuntime::state(scope);
let state = state_rc.borrow();
if let Some(inspector) = &state.inspector {
let inspector = inspector.borrow();
inspector.exception_thrown(scope, exception, true);
if inspector.has_blocking_sessions() {
return Ok(());
}
}
exception_to_err_result(scope, exception, true)
}
pub(crate) fn is_same(&self, other: &Rc<v8::Global<v8::Context>>) -> bool {
Rc::ptr_eq(&self.context, other)
}
pub fn destroy(self) {
let state = self.state();
let raw_ptr = self.state().borrow().isolate.unwrap();
// SAFETY: We know the isolate outlives the realm
let isolate = unsafe { raw_ptr.as_mut().unwrap() };
let mut realm_state = state.borrow_mut();
// These globals will prevent snapshots from completing, take them
std::mem::take(&mut realm_state.js_event_loop_tick_cb);
std::mem::take(&mut realm_state.js_build_custom_error_cb);
std::mem::take(&mut realm_state.js_promise_reject_cb);
std::mem::take(&mut realm_state.js_format_exception_cb);
std::mem::take(&mut realm_state.js_wasm_streaming_cb);
// The OpCtx slice may contain a circular reference
std::mem::take(&mut realm_state.op_ctxs);
self.context().open(isolate).clear_all_slots(isolate);
// Expect that this context is dead (we only check this in debug mode)
// TODO(mmastrac): This check fails for some tests, will need to fix this
// debug_assert_eq!(Rc::strong_count(&self.context), 1, "Realm was still alive when we wanted to destory it. Not dropped?");
}
}
impl JsRealm {
pub(crate) fn new(inner: JsRealmInner) -> Self {
Self(inner)
}
#[inline(always)]
@ -130,13 +217,28 @@ impl JsRealm {
.clone()
}
#[inline(always)]
pub fn num_pending_ops(&self) -> usize {
self.0.num_pending_ops()
}
#[inline(always)]
pub fn num_unrefed_ops(&self) -> usize {
self.0.num_unrefed_ops()
}
/// For info on the [`v8::Isolate`] parameter, check [`JsRealm#panics`].
#[inline(always)]
pub fn handle_scope<'s>(
&self,
isolate: &'s mut v8::Isolate,
) -> v8::HandleScope<'s> {
v8::HandleScope::with_context(isolate, &*self.0)
self.0.handle_scope(isolate)
}
#[inline(always)]
pub fn context(&self) -> &v8::Global<v8::Context> {
self.0.context()
}
/// For info on the [`v8::Isolate`] parameter, check [`JsRealm#panics`].
@ -144,8 +246,8 @@ impl JsRealm {
&self,
isolate: &'s mut v8::Isolate,
) -> v8::Local<'s, v8::Object> {
let scope = &mut self.handle_scope(isolate);
self.0.open(scope).global(scope)
let scope = &mut self.0.handle_scope(isolate);
self.0.context.open(scope).global(scope)
}
fn string_from_code<'a>(
@ -206,7 +308,7 @@ impl JsRealm {
name: &'static str,
source_code: ModuleCode,
) -> Result<v8::Global<v8::Value>, Error> {
let scope = &mut self.handle_scope(isolate);
let scope = &mut self.0.handle_scope(isolate);
let source = Self::string_from_code(scope, &source_code).unwrap();
debug_assert!(name.is_ascii());
@ -240,51 +342,23 @@ impl JsRealm {
// TODO(andreubotella): `mod_evaluate`, `load_main_module`, `load_side_module`
}
pub struct JsRealmLocal<'s>(v8::Local<'s, v8::Context>);
impl<'s> JsRealmLocal<'s> {
pub fn new(context: v8::Local<'s, v8::Context>) -> Self {
JsRealmLocal(context)
}
#[inline(always)]
pub fn context(&self) -> v8::Local<v8::Context> {
self.0
}
#[inline(always)]
pub(crate) fn state(
&self,
isolate: &mut v8::Isolate,
) -> Rc<RefCell<ContextState>> {
self
.context()
.get_slot::<Rc<RefCell<ContextState>>>(isolate)
.unwrap()
.clone()
}
pub(crate) fn check_promise_rejections(
&self,
scope: &mut v8::HandleScope,
) -> Result<(), Error> {
let context_state_rc = self.state(scope);
let mut context_state = context_state_rc.borrow_mut();
let Some((_, handle)) = context_state.pending_promise_rejections.pop_front() else {
return Ok(());
};
drop(context_state);
let exception = v8::Local::new(scope, handle);
let state_rc = JsRuntime::state(scope);
let state = state_rc.borrow();
if let Some(inspector) = &state.inspector {
let inspector = inspector.borrow();
inspector.exception_thrown(scope, exception, true);
if inspector.has_blocking_sessions() {
return Ok(());
}
impl Drop for JsRealm {
fn drop(&mut self) {
// Don't do anything special with the global realm
if self.0.is_global {
return;
}
// There's us and there's the runtime
if Rc::strong_count(&self.0.context) == 2 {
self
.0
.runtime_state
.borrow_mut()
.remove_realm(&self.0.context);
assert_eq!(Rc::strong_count(&self.0.context), 1);
self.0.clone().destroy();
assert_eq!(Rc::strong_count(&self.0.context_state), 1);
}
exception_to_err_result(scope, exception, true)
}
}

View file

@ -19,7 +19,7 @@ use crate::modules::ModuleMap;
use crate::ops::*;
use crate::realm::ContextState;
use crate::realm::JsRealm;
use crate::realm::JsRealmLocal;
use crate::realm::JsRealmInner;
use crate::snapshot_util;
use crate::source_map::SourceMapCache;
use crate::source_map::SourceMapGetter;
@ -36,7 +36,6 @@ use futures::future::poll_fn;
use futures::future::Future;
use futures::future::FutureExt;
use futures::future::MaybeDone;
use futures::stream::FuturesUnordered;
use futures::stream::StreamExt;
use futures::task::noop_waker;
use futures::task::AtomicWaker;
@ -156,7 +155,7 @@ pub type CompiledWasmModuleStore = CrossIsolateStore<v8::CompiledWasmModule>;
/// embedder slots.
pub struct JsRuntimeState {
global_realm: Option<JsRealm>,
known_realms: Vec<v8::Weak<v8::Context>>,
known_realms: Vec<JsRealmInner>,
pub(crate) has_tick_scheduled: bool,
pub(crate) pending_dyn_mod_evaluate: Vec<DynImportModEvaluate>,
pub(crate) pending_mod_evaluate: Option<ModEvaluate>,
@ -165,7 +164,6 @@ pub struct JsRuntimeState {
dyn_module_evaluate_idle_counter: u32,
pub(crate) source_map_getter: Option<Rc<Box<dyn SourceMapGetter>>>,
pub(crate) source_map_cache: Rc<RefCell<SourceMapCache>>,
pub(crate) pending_ops: FuturesUnordered<OpCall>,
pub(crate) have_unpolled_ops: bool,
pub(crate) op_state: Rc<RefCell<OpState>>,
pub(crate) shared_array_buffer_store: Option<SharedArrayBufferStore>,
@ -180,6 +178,24 @@ pub struct JsRuntimeState {
waker: AtomicWaker,
}
impl JsRuntimeState {
pub(crate) fn destroy_all_realms(&mut self) {
self.global_realm.take();
for realm in self.known_realms.drain(..) {
realm.destroy()
}
}
pub(crate) fn remove_realm(
&mut self,
realm_context: &Rc<v8::Global<v8::Context>>,
) {
self
.known_realms
.retain(|realm| !realm.is_same(realm_context));
}
}
fn v8_init(
v8_platform: Option<v8::SharedRef<v8::Platform>>,
predictable: bool,
@ -285,9 +301,13 @@ pub struct RuntimeOptions {
impl Drop for JsRuntime {
fn drop(&mut self) {
// Forcibly destroy all outstanding realms
self.state.borrow_mut().destroy_all_realms();
if let Some(v8_isolate) = self.v8_isolate.as_mut() {
Self::drop_state_and_module_map(v8_isolate);
}
// Ensure that we've correctly dropped all references
debug_assert_eq!(Rc::strong_count(&self.state), 1);
}
}
@ -341,7 +361,6 @@ impl JsRuntime {
has_tick_scheduled: false,
source_map_getter: options.source_map_getter.map(Rc::new),
source_map_cache: Default::default(),
pending_ops: FuturesUnordered::new(),
shared_array_buffer_store: options.shared_array_buffer_store,
compiled_wasm_module_store: options.compiled_wasm_module_store,
op_state: op_state.clone(),
@ -355,20 +374,29 @@ impl JsRuntime {
}));
let weak = Rc::downgrade(&state_rc);
let context_state = Rc::new(RefCell::new(ContextState::default()));
let op_ctxs = ops
.into_iter()
.enumerate()
.map(|(id, decl)| {
OpCtx::new(id as u16, 0, Rc::new(decl), op_state.clone(), weak.clone())
OpCtx::new(
id as u16,
context_state.clone(),
Rc::new(decl),
op_state.clone(),
weak.clone(),
)
})
.collect::<Vec<_>>()
.into_boxed_slice();
context_state.borrow_mut().op_ctxs = op_ctxs;
context_state.borrow_mut().isolate = Some(isolate_ptr);
let snapshot_options = snapshot_util::SnapshotOptions::from_bools(
options.startup_snapshot.is_some(),
options.will_snapshot,
);
let refs = bindings::external_references(&op_ctxs);
let refs = bindings::external_references(&context_state.borrow().op_ctxs);
// V8 takes ownership of external_references.
let refs: &'static v8::ExternalReferences = Box::leak(Box::new(refs));
let global_context;
@ -380,8 +408,11 @@ impl JsRuntime {
let mut isolate = JsRuntime::setup_isolate(snapshot_creator);
{
let scope = &mut v8::HandleScope::new(&mut isolate);
let context =
bindings::initialize_context(scope, &op_ctxs, snapshot_options);
let context = bindings::initialize_context(
scope,
&context_state.borrow().op_ctxs,
snapshot_options,
);
// Get module map data from the snapshot
if has_startup_snapshot {
@ -415,8 +446,11 @@ impl JsRuntime {
let mut isolate = JsRuntime::setup_isolate(isolate);
{
let scope = &mut v8::HandleScope::new(&mut isolate);
let context =
bindings::initialize_context(scope, &op_ctxs, snapshot_options);
let context = bindings::initialize_context(
scope,
&context_state.borrow().op_ctxs,
snapshot_options,
);
// Get module map data from the snapshot
if has_startup_snapshot {
@ -437,13 +471,9 @@ impl JsRuntime {
isolate_ptr.read()
};
global_context.open(&mut isolate).set_slot(
&mut isolate,
Rc::new(RefCell::new(ContextState {
op_ctxs,
..Default::default()
})),
);
global_context
.open(&mut isolate)
.set_slot(&mut isolate, context_state.clone());
op_state.borrow_mut().put(isolate_ptr);
let inspector = if options.inspector {
@ -485,12 +515,16 @@ impl JsRuntime {
));
{
let global_realm = JsRealmInner::new(
context_state,
global_context.clone(),
state_rc.clone(),
true,
);
let mut state = state_rc.borrow_mut();
state.global_realm = Some(JsRealm::new(global_context.clone()));
state.global_realm = Some(JsRealm::new(global_realm.clone()));
state.inspector = inspector;
state
.known_realms
.push(v8::Weak::new(&mut isolate, &global_context));
state.known_realms.push(global_realm);
}
isolate.set_data(
Self::STATE_DATA_OFFSET,
@ -554,9 +588,14 @@ impl JsRuntime {
#[inline]
pub fn global_context(&mut self) -> v8::Global<v8::Context> {
let state = self.state.borrow();
let global_realm = state.global_realm.as_ref().unwrap();
global_realm.context().clone()
self
.state
.borrow()
.known_realms
.get(0)
.unwrap()
.context()
.clone()
}
#[inline]
@ -581,50 +620,50 @@ impl JsRuntime {
/// constructed.
pub fn create_realm(&mut self) -> Result<JsRealm, Error> {
let realm = {
let realm_idx = self.state.borrow().known_realms.len() as u16;
let context_state = Rc::new(RefCell::new(ContextState::default()));
let op_ctxs: Box<[OpCtx]> = self
.global_realm()
.state(self.v8_isolate())
.0
.state()
.borrow()
.op_ctxs
.iter()
.map(|op_ctx| {
OpCtx::new(
op_ctx.id,
realm_idx,
context_state.clone(),
op_ctx.decl.clone(),
op_ctx.state.clone(),
op_ctx.runtime_state.clone(),
)
})
.collect();
context_state.borrow_mut().op_ctxs = op_ctxs;
context_state.borrow_mut().isolate = Some(self.v8_isolate() as _);
let raw_ptr = self.v8_isolate() as *mut v8::OwnedIsolate;
// SAFETY: Having the scope tied to self's lifetime makes it impossible to
// reference JsRuntimeState::op_ctxs while the scope is alive. Here we
// turn it into an unbound lifetime, which is sound because 1. it only
// lives until the end of this block, and 2. the HandleScope only has
// access to the isolate, and nothing else we're accessing from self does.
let scope = &mut v8::HandleScope::new(unsafe {
&mut *(self.v8_isolate() as *mut v8::OwnedIsolate)
});
let context =
bindings::initialize_context(scope, &op_ctxs, self.snapshot_options);
context.set_slot(
let isolate = unsafe { raw_ptr.as_mut() }.unwrap();
let scope = &mut v8::HandleScope::new(isolate);
let context = bindings::initialize_context(
scope,
Rc::new(RefCell::new(ContextState {
op_ctxs,
..Default::default()
})),
&context_state.borrow().op_ctxs,
self.snapshot_options,
);
self
.state
.borrow_mut()
.known_realms
.push(v8::Weak::new(scope, context));
JsRealm::new(v8::Global::new(scope, context))
context.set_slot(scope, context_state.clone());
let realm = JsRealmInner::new(
context_state,
v8::Global::new(scope, context),
self.state.clone(),
false,
);
let mut state = self.state.borrow_mut();
state.known_realms.push(realm.clone());
JsRealm::new(realm)
};
self
@ -915,7 +954,7 @@ impl JsRuntime {
};
// Put global handles in the realm's ContextState
let state_rc = realm.state(self.v8_isolate());
let state_rc = realm.0.state();
let mut state = state_rc.borrow_mut();
state
.js_event_loop_tick_cb
@ -1043,24 +1082,8 @@ impl JsRuntime {
// Drop other v8::Global handles before snapshotting
{
for weak_context in &self.state.clone().borrow().known_realms {
let scope = &mut self.handle_scope();
if let Some(context) = weak_context.to_local(scope) {
let realm = JsRealmLocal::new(context);
let realm_state_rc = realm.state(scope);
let mut realm_state = realm_state_rc.borrow_mut();
std::mem::take(&mut realm_state.js_event_loop_tick_cb);
std::mem::take(&mut realm_state.js_build_custom_error_cb);
std::mem::take(&mut realm_state.js_promise_reject_cb);
std::mem::take(&mut realm_state.js_format_exception_cb);
std::mem::take(&mut realm_state.js_wasm_streaming_cb);
context.clear_all_slots(scope);
}
}
let mut state = self.state.borrow_mut();
state.known_realms.clear();
state.global_realm.take();
let state = self.state.clone();
state.borrow_mut().destroy_all_realms();
}
let snapshot_creator = self.v8_isolate.take().unwrap();
@ -1494,21 +1517,14 @@ impl EventLoopPendingState {
module_map: &ModuleMap,
) -> EventLoopPendingState {
let mut num_unrefed_ops = 0;
if state.known_realms.len() == 1 {
let realm = state.global_realm.as_ref().unwrap();
num_unrefed_ops += realm.state(scope).borrow().unrefed_ops.len();
} else {
for weak_context in &state.known_realms {
if let Some(context) = weak_context.to_local(scope) {
let realm = JsRealmLocal::new(context);
num_unrefed_ops += realm.state(scope).borrow().unrefed_ops.len();
}
}
let mut num_pending_ops = 0;
for realm in &state.known_realms {
num_unrefed_ops += realm.num_unrefed_ops();
num_pending_ops += realm.num_pending_ops();
}
EventLoopPendingState {
has_pending_refed_ops: state.pending_ops.len() > num_unrefed_ops,
has_pending_refed_ops: num_pending_ops > num_unrefed_ops,
has_pending_dyn_imports: module_map.has_pending_dynamic_imports(),
has_pending_dyn_module_evaluation: !state
.pending_dyn_mod_evaluate
@ -1823,7 +1839,8 @@ impl JsRuntime {
.contains(&promise_global);
if !pending_rejection_was_already_handled {
global_realm
.state(tc_scope)
.0
.state()
.borrow_mut()
.pending_promise_rejections
.retain(|(key, _)| key != &promise_global);
@ -2258,52 +2275,33 @@ impl JsRuntime {
let state = self.state.clone();
let scope = &mut self.handle_scope();
let state = state.borrow();
for weak_context in &state.known_realms {
if let Some(context) = weak_context.to_local(scope) {
JsRealmLocal::new(context).check_promise_rejections(scope)?;
}
for realm in &state.known_realms {
realm.check_promise_rejections(scope)?;
}
Ok(())
}
// Polls pending ops and then runs `Deno.core.eventLoopTick` callback.
fn do_js_event_loop_tick(&mut self, cx: &mut Context) -> Result<(), Error> {
// We have a specialized implementation of this method for the common case
// where there is only one realm.
let num_realms = self.state.borrow().known_realms.len();
if num_realms == 1 {
return self.do_single_realm_js_event_loop_tick(cx);
}
// `responses_per_realm[idx]` is a vector containing the promise ID and
// response for all promises in realm `self.state.known_realms[idx]`.
let mut responses_per_realm: Vec<Vec<(PromiseId, OpResult)>> =
(0..num_realms).map(|_| vec![]).collect();
// Now handle actual ops.
{
let mut state = self.state.borrow_mut();
state.have_unpolled_ops = false;
while let Poll::Ready(Some(item)) = state.pending_ops.poll_next_unpin(cx)
{
let (realm_idx, promise_id, op_id, resp) = item;
state.op_state.borrow().tracker.track_async_completed(op_id);
responses_per_realm[realm_idx as usize].push((promise_id, resp));
}
}
// Handle responses for each realm.
let isolate = self.v8_isolate.as_mut().unwrap();
for (realm_idx, responses) in responses_per_realm.into_iter().enumerate() {
let realm = {
let context = self.state.borrow().known_realms[realm_idx]
.to_global(isolate)
.unwrap();
JsRealm::new(context)
};
let context_state_rc = realm.state(isolate);
let mut context_state = context_state_rc.borrow_mut();
let realm_count = self.state.clone().borrow().known_realms.len();
for realm_idx in 0..realm_count {
let realm = self
.state
.borrow()
.known_realms
.get(realm_idx)
.unwrap()
.clone();
let context_state = realm.state();
let mut context_state = context_state.borrow_mut();
let scope = &mut realm.handle_scope(isolate);
// We return async responses to JS in unbounded batches (may change),
@ -2317,9 +2315,19 @@ impl JsRuntime {
// This can handle 15 promises futures in a single batch without heap
// allocations.
let mut args: SmallVec<[v8::Local<v8::Value>; 32]> =
SmallVec::with_capacity(responses.len() * 2 + 2);
SmallVec::with_capacity(32);
for (promise_id, mut resp) in responses {
while let Poll::Ready(Some(item)) =
context_state.pending_ops.poll_next_unpin(cx)
{
let (promise_id, op_id, mut resp) = item;
self
.state
.borrow()
.op_state
.borrow()
.tracker
.track_async_completed(op_id);
context_state.unrefed_ops.remove(&promise_id);
args.push(v8::Integer::new(scope, promise_id).into());
args.push(match resp.to_v8(scope) {
@ -2355,88 +2363,6 @@ impl JsRuntime {
Ok(())
}
fn do_single_realm_js_event_loop_tick(
&mut self,
cx: &mut Context,
) -> Result<(), Error> {
let isolate = self.v8_isolate.as_mut().unwrap();
let scope = &mut self
.state
.borrow()
.global_realm
.as_ref()
.unwrap()
.handle_scope(isolate);
// We return async responses to JS in unbounded batches (may change),
// each batch is a flat vector of tuples:
// `[promise_id1, op_result1, promise_id2, op_result2, ...]`
// promise_id is a simple integer, op_result is an ops::OpResult
// which contains a value OR an error, encoded as a tuple.
// This batch is received in JS via the special `arguments` variable
// and then each tuple is used to resolve or reject promises
//
// This can handle 15 promises futures in a single batch without heap
// allocations.
let mut args: SmallVec<[v8::Local<v8::Value>; 32]> = SmallVec::new();
// Now handle actual ops.
{
let mut state = self.state.borrow_mut();
state.have_unpolled_ops = false;
let realm_state_rc = state.global_realm.as_ref().unwrap().state(scope);
let mut realm_state = realm_state_rc.borrow_mut();
while let Poll::Ready(Some(item)) = state.pending_ops.poll_next_unpin(cx)
{
let (realm_idx, promise_id, op_id, mut resp) = item;
debug_assert_eq!(
state.known_realms[realm_idx as usize],
state.global_realm.as_ref().unwrap().context()
);
realm_state.unrefed_ops.remove(&promise_id);
state.op_state.borrow().tracker.track_async_completed(op_id);
args.push(v8::Integer::new(scope, promise_id).into());
args.push(match resp.to_v8(scope) {
Ok(v) => v,
Err(e) => OpResult::Err(OpError::new(&|_| "TypeError", e.into()))
.to_v8(scope)
.unwrap(),
});
}
}
let has_tick_scheduled =
v8::Boolean::new(scope, self.state.borrow().has_tick_scheduled);
args.push(has_tick_scheduled.into());
let js_event_loop_tick_cb_handle = {
let state = self.state.borrow_mut();
let realm_state_rc = state.global_realm.as_ref().unwrap().state(scope);
let handle = realm_state_rc
.borrow()
.js_event_loop_tick_cb
.clone()
.unwrap();
handle
};
let tc_scope = &mut v8::TryCatch::new(scope);
let js_event_loop_tick_cb = js_event_loop_tick_cb_handle.open(tc_scope);
let this = v8::undefined(tc_scope).into();
js_event_loop_tick_cb.call(tc_scope, this, args.as_slice());
if let Some(exception) = tc_scope.exception() {
return exception_to_err_result(tc_scope, exception, false);
}
if tc_scope.has_terminated() || tc_scope.is_execution_terminating() {
return Ok(());
}
Ok(())
}
}
#[inline]
@ -2459,7 +2385,9 @@ pub fn queue_fast_async_op<R: serde::Serialize + 'static>(
.map(|result| crate::_ops::to_op_result(get_class, result))
.boxed_local();
let mut state = runtime_state.borrow_mut();
state
ctx
.context_state
.borrow_mut()
.pending_ops
.push(OpCall::pending(ctx, promise_id, fut));
state.have_unpolled_ops = true;
@ -2551,10 +2479,11 @@ pub fn queue_async_op<'s>(
// which it is invoked. Otherwise, we might have cross-realm object exposure.
// deno_core doesn't currently support such exposure, even though embedders
// can cause them, so we panic in debug mode (since the check is expensive).
debug_assert_eq!(
runtime_state.borrow().known_realms[ctx.realm_idx as usize].to_local(scope),
Some(scope.get_current_context())
);
// TODO(mmastrac): Restore this
// debug_assert_eq!(
// runtime_state.borrow().context(ctx.realm_idx as usize, scope),
// Some(scope.get_current_context())
// );
// All ops are polled immediately
let waker = noop_waker();
@ -2584,7 +2513,7 @@ pub fn queue_async_op<'s>(
// Otherwise we will push it to the `pending_ops` and let it be polled again
// or resolved on the next tick of the event loop.
let mut state = runtime_state.borrow_mut();
state.pending_ops.push(op_call);
ctx.context_state.borrow_mut().pending_ops.push(op_call);
state.have_unpolled_ops = true;
None
}
@ -2716,10 +2645,8 @@ pub mod tests {
.unwrap();
{
let realm = runtime.global_realm();
let isolate = runtime.v8_isolate();
let state_rc = JsRuntime::state(isolate);
assert_eq!(state_rc.borrow().pending_ops.len(), 2);
assert_eq!(realm.state(isolate).borrow().unrefed_ops.len(), 0);
assert_eq!(realm.num_pending_ops(), 2);
assert_eq!(realm.num_unrefed_ops(), 0);
}
runtime
.execute_script_static(
@ -2732,10 +2659,8 @@ pub mod tests {
.unwrap();
{
let realm = runtime.global_realm();
let isolate = runtime.v8_isolate();
let state_rc = JsRuntime::state(isolate);
assert_eq!(state_rc.borrow().pending_ops.len(), 2);
assert_eq!(realm.state(isolate).borrow().unrefed_ops.len(), 2);
assert_eq!(realm.num_pending_ops(), 2);
assert_eq!(realm.num_unrefed_ops(), 2);
}
runtime
.execute_script_static(
@ -2748,10 +2673,8 @@ pub mod tests {
.unwrap();
{
let realm = runtime.global_realm();
let isolate = runtime.v8_isolate();
let state_rc = JsRuntime::state(isolate);
assert_eq!(state_rc.borrow().pending_ops.len(), 2);
assert_eq!(realm.state(isolate).borrow().unrefed_ops.len(), 0);
assert_eq!(realm.num_pending_ops(), 2);
assert_eq!(realm.num_unrefed_ops(), 0);
}
}
@ -4696,6 +4619,71 @@ Deno.core.opAsync("op_async_serialize_object_with_numbers_as_keys", {
}
}
#[tokio::test]
async fn js_realm_gc() {
static INVOKE_COUNT: AtomicUsize = AtomicUsize::new(0);
struct PendingFuture {}
impl Future for PendingFuture {
type Output = ();
fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
Poll::Pending
}
}
impl Drop for PendingFuture {
fn drop(&mut self) {
assert_eq!(INVOKE_COUNT.fetch_sub(1, Ordering::SeqCst), 1);
}
}
// Never resolves.
#[op]
async fn op_pending() {
assert_eq!(INVOKE_COUNT.fetch_add(1, Ordering::SeqCst), 0);
PendingFuture {}.await
}
deno_core::extension!(test_ext, ops = [op_pending]);
let mut runtime = JsRuntime::new(RuntimeOptions {
extensions: vec![test_ext::init_ops()],
..Default::default()
});
// Detect a drop in OpState
let opstate_drop_detect = Rc::new(());
runtime
.op_state()
.borrow_mut()
.put(opstate_drop_detect.clone());
assert_eq!(Rc::strong_count(&opstate_drop_detect), 2);
let other_realm = runtime.create_realm().unwrap();
other_realm
.execute_script(
runtime.v8_isolate(),
"future",
ModuleCode::from_static("Deno.core.opAsync('op_pending')"),
)
.unwrap();
while INVOKE_COUNT.load(Ordering::SeqCst) == 0 {
poll_fn(|cx: &mut Context| runtime.poll_event_loop(cx, false))
.await
.unwrap();
}
drop(other_realm);
while INVOKE_COUNT.load(Ordering::SeqCst) == 1 {
poll_fn(|cx| runtime.poll_event_loop(cx, false))
.await
.unwrap();
}
drop(runtime);
// Make sure the OpState was dropped properly when the runtime dropped
assert_eq!(Rc::strong_count(&opstate_drop_detect), 1);
}
#[tokio::test]
async fn js_realm_ref_unref_ops() {
// Never resolves.