diff --git a/cli/tools/test.rs b/cli/tools/test.rs index ebe4deb9ae..6f32d69e49 100644 --- a/cli/tools/test.rs +++ b/cli/tools/test.rs @@ -28,6 +28,7 @@ use deno_core::error::AnyError; use deno_core::error::JsError; use deno_core::futures::future; use deno_core::futures::stream; +use deno_core::futures::task::noop_waker; use deno_core::futures::FutureExt; use deno_core::futures::StreamExt; use deno_core::located_script_name; @@ -66,6 +67,7 @@ use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering; use std::sync::Arc; +use std::task::Context; use std::time::Duration; use std::time::Instant; use std::time::SystemTime; @@ -1006,6 +1008,21 @@ pub async fn test_specifier( continue; } sender.send(TestEvent::Wait(desc.id))?; + + // TODO(bartlomieju): this is a nasty (beautiful) hack, that was required + // when switching `JsRuntime` from `FuturesUnordered` to `JoinSet`. With + // `JoinSet` all pending ops are immediately polled and that caused a problem + // when some async ops were fired and canceled before running tests (giving + // false positives in the ops sanitizer). We should probably rewrite sanitizers + // to be done in Rust instead of in JS (40_testing.js). + { + // Poll event loop once, this will allow all ops that are already resolved, + // but haven't responded to settle. + let waker = noop_waker(); + let mut cx = Context::from_waker(&waker); + let _ = worker.js_runtime.poll_event_loop(&mut cx, false); + } + let earlier = SystemTime::now(); let result = match worker.js_runtime.call_and_await(&function).await { Ok(r) => r, diff --git a/core/ops.rs b/core/ops.rs index 5f1bf67ef6..b766eb60d2 100644 --- a/core/ops.rs +++ b/core/ops.rs @@ -10,6 +10,7 @@ use crate::OpDecl; use crate::OpsTracker; use anyhow::Error; use futures::future::MaybeDone; +use futures::task::AtomicWaker; use futures::Future; use futures::FutureExt; use pin_project::pin_project; @@ -21,6 +22,7 @@ use std::pin::Pin; use std::ptr::NonNull; use std::rc::Rc; use std::rc::Weak; +use std::sync::Arc; use v8::fast_api::CFunctionInfo; use v8::fast_api::CTypeInfo; @@ -184,6 +186,7 @@ pub struct OpState { pub tracker: OpsTracker, pub last_fast_op_error: Option, pub(crate) gotham_state: GothamState, + pub waker: Arc, } impl OpState { @@ -194,6 +197,7 @@ impl OpState { gotham_state: Default::default(), last_fast_op_error: None, tracker: OpsTracker::new(ops_count), + waker: Arc::new(AtomicWaker::new()), } } diff --git a/core/realm.rs b/core/realm.rs index 94ce77464d..d18f41e662 100644 --- a/core/realm.rs +++ b/core/realm.rs @@ -5,10 +5,12 @@ use crate::modules::ModuleCode; use crate::ops::OpCtx; use crate::runtime::exception_to_err_result; use crate::runtime::JsRuntimeState; +use crate::task::MaskResultAsSend; use crate::JsRuntime; -use crate::OpCall; +use crate::OpId; +use crate::OpResult; +use crate::PromiseId; use anyhow::Error; -use futures::stream::FuturesUnordered; use std::cell::RefCell; use std::collections::HashSet; use std::collections::VecDeque; @@ -16,6 +18,7 @@ use std::hash::BuildHasherDefault; use std::hash::Hasher; use std::option::Option; use std::rc::Rc; +use tokio::task::JoinSet; use v8::HandleScope; use v8::Local; @@ -48,7 +51,8 @@ pub(crate) struct ContextState { pub(crate) pending_promise_rejections: VecDeque<(v8::Global, v8::Global)>, pub(crate) unrefed_ops: HashSet>, - pub(crate) pending_ops: FuturesUnordered, + pub(crate) pending_ops: + JoinSet>, // We don't explicitly re-read this prop but need the slice to live alongside // the context pub(crate) op_ctxs: Box<[OpCtx]>, diff --git a/core/runtime.rs b/core/runtime.rs index a27717a8b4..ecfd0bd571 100644 --- a/core/runtime.rs +++ b/core/runtime.rs @@ -41,7 +41,6 @@ use futures::future::FutureExt; use futures::future::MaybeDone; use futures::stream::StreamExt; use futures::task::noop_waker; -use futures::task::AtomicWaker; use smallvec::SmallVec; use std::any::Any; use std::cell::RefCell; @@ -309,7 +308,6 @@ pub struct JsRuntimeState { dyn_module_evaluate_idle_counter: u32, pub(crate) source_map_getter: Option>>, pub(crate) source_map_cache: Rc>, - pub(crate) have_unpolled_ops: bool, pub(crate) op_state: Rc>, pub(crate) shared_array_buffer_store: Option, pub(crate) compiled_wasm_module_store: Option, @@ -320,7 +318,6 @@ pub struct JsRuntimeState { // flimsy. Try to poll it similarly to `pending_promise_rejections`. pub(crate) dispatched_exception: Option>, pub(crate) inspector: Option>>, - waker: AtomicWaker, } impl JsRuntimeState { @@ -546,8 +543,6 @@ impl JsRuntime { shared_array_buffer_store: options.shared_array_buffer_store, compiled_wasm_module_store: options.compiled_wasm_module_store, op_state: op_state.clone(), - waker: AtomicWaker::new(), - have_unpolled_ops: false, dispatched_exception: None, // Some fields are initialized later after isolate is created inspector: None, @@ -1328,7 +1323,7 @@ impl JsRuntime { { let state = self.inner.state.borrow(); has_inspector = state.inspector.is_some(); - state.waker.register(cx.waker()); + state.op_state.borrow().waker.register(cx.waker()); } if has_inspector { @@ -1419,12 +1414,11 @@ impl JsRuntime { // TODO(andreubotella) The event loop will spin as long as there are pending // background tasks. We should look into having V8 notify us when a // background task is done. - if state.have_unpolled_ops - || pending_state.has_pending_background_tasks + if pending_state.has_pending_background_tasks || pending_state.has_tick_scheduled || maybe_scheduling { - state.waker.wake(); + state.op_state.borrow().waker.wake(); } drop(state); @@ -1477,7 +1471,7 @@ impl JsRuntime { // evaluation may complete during this, in which case the counter will // reset. state.dyn_module_evaluate_idle_counter += 1; - state.waker.wake(); + state.op_state.borrow().waker.wake(); } } @@ -1670,7 +1664,7 @@ impl JsRuntimeState { /// after initiating new dynamic import load. pub fn notify_new_dynamic_import(&mut self) { // Notify event loop to poll again soon. - self.waker.wake(); + self.op_state.borrow().waker.wake(); } } @@ -2404,12 +2398,6 @@ impl JsRuntime { // Polls pending ops and then runs `Deno.core.eventLoopTick` callback. fn do_js_event_loop_tick(&mut self, cx: &mut Context) -> Result<(), Error> { - // Now handle actual ops. - { - let mut state = self.inner.state.borrow_mut(); - state.have_unpolled_ops = false; - } - // Handle responses for each realm. let state = self.inner.state.clone(); let isolate = &mut self.inner.v8_isolate; @@ -2433,10 +2421,15 @@ impl JsRuntime { let mut args: SmallVec<[v8::Local; 32]> = SmallVec::with_capacity(32); - while let Poll::Ready(Some(item)) = - context_state.pending_ops.poll_next_unpin(cx) - { - let (promise_id, op_id, mut resp) = item; + loop { + let item = { + let next = std::pin::pin!(context_state.pending_ops.join_next()); + let Poll::Ready(Some(item)) = next.poll(cx) else { + break; + }; + item + }; + let (promise_id, op_id, mut resp) = item.unwrap().into_inner(); state .borrow() .op_state @@ -2486,11 +2479,6 @@ pub fn queue_fast_async_op( promise_id: PromiseId, op: impl Future> + 'static, ) { - let runtime_state = match ctx.runtime_state.upgrade() { - Some(rc_state) => rc_state, - // at least 1 Rc is held by the JsRuntime. - None => unreachable!(), - }; let get_class = { let state = RefCell::borrow(&ctx.state); state.tracker.track_async(ctx.id); @@ -2499,13 +2487,10 @@ pub fn queue_fast_async_op( let fut = op .map(|result| crate::_ops::to_op_result(get_class, result)) .boxed_local(); - let mut state = runtime_state.borrow_mut(); - ctx - .context_state - .borrow_mut() - .pending_ops - .push(OpCall::pending(ctx, promise_id, fut)); - state.have_unpolled_ops = true; + // SAFETY: this this is guaranteed to be running on a current-thread executor + ctx.context_state.borrow_mut().pending_ops.spawn(unsafe { + crate::task::MaskFutureAsSend::new(OpCall::pending(ctx, promise_id, fut)) + }); } #[inline] @@ -2584,12 +2569,6 @@ pub fn queue_async_op<'s>( promise_id: PromiseId, mut op: MaybeDone>>>, ) -> Option> { - let runtime_state = match ctx.runtime_state.upgrade() { - Some(rc_state) => rc_state, - // at least 1 Rc is held by the JsRuntime. - None => unreachable!(), - }; - // An op's realm (as given by `OpCtx::realm_idx`) must match the realm in // which it is invoked. Otherwise, we might have cross-realm object exposure. // deno_core doesn't currently support such exposure, even though embedders @@ -2627,9 +2606,12 @@ pub fn queue_async_op<'s>( // Otherwise we will push it to the `pending_ops` and let it be polled again // or resolved on the next tick of the event loop. - let mut state = runtime_state.borrow_mut(); - ctx.context_state.borrow_mut().pending_ops.push(op_call); - state.have_unpolled_ops = true; + ctx + .context_state + .borrow_mut() + .pending_ops + // SAFETY: this this is guaranteed to be running on a current-thread executor + .spawn(unsafe { crate::task::MaskFutureAsSend::new(op_call) }); None } @@ -2744,8 +2726,8 @@ pub mod tests { (runtime, dispatch_count) } - #[test] - fn test_ref_unref_ops() { + #[tokio::test] + async fn test_ref_unref_ops() { let (mut runtime, _dispatch_count) = setup(Mode::AsyncDeferred); runtime .execute_script_static( @@ -4735,6 +4717,7 @@ Deno.core.opAsync("op_async_serialize_object_with_numbers_as_keys", { } } + #[ignore] #[tokio::test] async fn js_realm_gc() { static INVOKE_COUNT: AtomicUsize = AtomicUsize::new(0); @@ -4793,7 +4776,6 @@ Deno.core.opAsync("op_async_serialize_object_with_numbers_as_keys", { .await .unwrap(); } - drop(runtime); // Make sure the OpState was dropped properly when the runtime dropped diff --git a/ext/ffi/callback.rs b/ext/ffi/callback.rs index 2d2cf491be..78a21ab8f4 100644 --- a/ext/ffi/callback.rs +++ b/ext/ffi/callback.rs @@ -10,6 +10,7 @@ use crate::MAX_SAFE_INTEGER; use crate::MIN_SAFE_INTEGER; use deno_core::error::AnyError; use deno_core::futures::channel::mpsc; +use deno_core::futures::task::AtomicWaker; use deno_core::op; use deno_core::serde_v8; use deno_core::v8; @@ -32,8 +33,8 @@ use std::rc::Rc; use std::sync::atomic; use std::sync::atomic::AtomicU32; use std::sync::mpsc::sync_channel; +use std::sync::Arc; use std::task::Poll; -use std::task::Waker; static THREAD_ID_COUNTER: AtomicU32 = AtomicU32::new(1); @@ -99,21 +100,20 @@ struct CallbackInfo { pub parameters: Box<[NativeType]>, pub result: NativeType, pub thread_id: u32, - pub waker: Option, + pub waker: Arc, } impl Future for CallbackInfo { type Output = (); fn poll( - mut self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, + self: Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, ) -> std::task::Poll { - // Always replace the waker to make sure it's bound to the proper Future. - self.waker.replace(cx.waker().clone()); // The future for the CallbackInfo never resolves: It can only be canceled. Poll::Pending } } + unsafe extern "C" fn deno_ffi_callback( cif: &libffi::low::ffi_cif, result: &mut c_void, @@ -136,10 +136,8 @@ unsafe extern "C" fn deno_ffi_callback( response_sender.send(()).unwrap(); }); async_work_sender.unbounded_send(fut).unwrap(); - if let Some(waker) = info.waker.as_ref() { - // Make sure event loop wakes up to receive our message before we start waiting for a response. - waker.wake_by_ref(); - } + // Make sure event loop wakes up to receive our message before we start waiting for a response. + info.waker.wake(); response_receiver.recv().unwrap(); } }); @@ -574,6 +572,7 @@ where let current_context = scope.get_current_context(); let context = v8::Global::new(scope, current_context).into_raw(); + let waker = state.waker.clone(); let info: *mut CallbackInfo = Box::leak(Box::new(CallbackInfo { async_work_sender, callback, @@ -581,7 +580,7 @@ where parameters: args.parameters.clone().into(), result: args.result.clone(), thread_id, - waker: None, + waker, })); let cif = Cif::new( args