mirror of
https://github.com/denoland/deno.git
synced 2024-11-21 15:04:11 -05:00
refactor(core): use JoinSet instead of FuturesUnordered (#19378)
This commit migrates "deno_core" from using "FuturesUnordered" to "tokio::task::JoinSet". This makes every op to be a separate Tokio task and should unlock better utilization of kqueue/epoll. There were two quirks added to this PR: - because of the fact that "JoinSet" immediately polls spawn tasks, op sanitizers can give false positives in some cases, this was alleviated by polling event loop once before running a test with "deno test", which gives canceled ops an opportunity to settle - "JsRuntimeState::waker" was moved to "OpState::waker" so that FFI API can still use threadsafe functions - without this change the registered wakers were wrong as they would not wake up the whole "JsRuntime" but the task associated with an op --------- Co-authored-by: Matt Mastracci <matthew@mastracci.com>
This commit is contained in:
parent
7e91f74d2b
commit
19f82b0eaa
5 changed files with 65 additions and 59 deletions
|
@ -28,6 +28,7 @@ use deno_core::error::AnyError;
|
|||
use deno_core::error::JsError;
|
||||
use deno_core::futures::future;
|
||||
use deno_core::futures::stream;
|
||||
use deno_core::futures::task::noop_waker;
|
||||
use deno_core::futures::FutureExt;
|
||||
use deno_core::futures::StreamExt;
|
||||
use deno_core::located_script_name;
|
||||
|
@ -66,6 +67,7 @@ use std::sync::atomic::AtomicBool;
|
|||
use std::sync::atomic::AtomicUsize;
|
||||
use std::sync::atomic::Ordering;
|
||||
use std::sync::Arc;
|
||||
use std::task::Context;
|
||||
use std::time::Duration;
|
||||
use std::time::Instant;
|
||||
use std::time::SystemTime;
|
||||
|
@ -1006,6 +1008,21 @@ pub async fn test_specifier(
|
|||
continue;
|
||||
}
|
||||
sender.send(TestEvent::Wait(desc.id))?;
|
||||
|
||||
// TODO(bartlomieju): this is a nasty (beautiful) hack, that was required
|
||||
// when switching `JsRuntime` from `FuturesUnordered` to `JoinSet`. With
|
||||
// `JoinSet` all pending ops are immediately polled and that caused a problem
|
||||
// when some async ops were fired and canceled before running tests (giving
|
||||
// false positives in the ops sanitizer). We should probably rewrite sanitizers
|
||||
// to be done in Rust instead of in JS (40_testing.js).
|
||||
{
|
||||
// Poll event loop once, this will allow all ops that are already resolved,
|
||||
// but haven't responded to settle.
|
||||
let waker = noop_waker();
|
||||
let mut cx = Context::from_waker(&waker);
|
||||
let _ = worker.js_runtime.poll_event_loop(&mut cx, false);
|
||||
}
|
||||
|
||||
let earlier = SystemTime::now();
|
||||
let result = match worker.js_runtime.call_and_await(&function).await {
|
||||
Ok(r) => r,
|
||||
|
|
|
@ -10,6 +10,7 @@ use crate::OpDecl;
|
|||
use crate::OpsTracker;
|
||||
use anyhow::Error;
|
||||
use futures::future::MaybeDone;
|
||||
use futures::task::AtomicWaker;
|
||||
use futures::Future;
|
||||
use futures::FutureExt;
|
||||
use pin_project::pin_project;
|
||||
|
@ -21,6 +22,7 @@ use std::pin::Pin;
|
|||
use std::ptr::NonNull;
|
||||
use std::rc::Rc;
|
||||
use std::rc::Weak;
|
||||
use std::sync::Arc;
|
||||
use v8::fast_api::CFunctionInfo;
|
||||
use v8::fast_api::CTypeInfo;
|
||||
|
||||
|
@ -184,6 +186,7 @@ pub struct OpState {
|
|||
pub tracker: OpsTracker,
|
||||
pub last_fast_op_error: Option<AnyError>,
|
||||
pub(crate) gotham_state: GothamState,
|
||||
pub waker: Arc<AtomicWaker>,
|
||||
}
|
||||
|
||||
impl OpState {
|
||||
|
@ -194,6 +197,7 @@ impl OpState {
|
|||
gotham_state: Default::default(),
|
||||
last_fast_op_error: None,
|
||||
tracker: OpsTracker::new(ops_count),
|
||||
waker: Arc::new(AtomicWaker::new()),
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -5,10 +5,12 @@ use crate::modules::ModuleCode;
|
|||
use crate::ops::OpCtx;
|
||||
use crate::runtime::exception_to_err_result;
|
||||
use crate::runtime::JsRuntimeState;
|
||||
use crate::task::MaskResultAsSend;
|
||||
use crate::JsRuntime;
|
||||
use crate::OpCall;
|
||||
use crate::OpId;
|
||||
use crate::OpResult;
|
||||
use crate::PromiseId;
|
||||
use anyhow::Error;
|
||||
use futures::stream::FuturesUnordered;
|
||||
use std::cell::RefCell;
|
||||
use std::collections::HashSet;
|
||||
use std::collections::VecDeque;
|
||||
|
@ -16,6 +18,7 @@ use std::hash::BuildHasherDefault;
|
|||
use std::hash::Hasher;
|
||||
use std::option::Option;
|
||||
use std::rc::Rc;
|
||||
use tokio::task::JoinSet;
|
||||
use v8::HandleScope;
|
||||
use v8::Local;
|
||||
|
||||
|
@ -48,7 +51,8 @@ pub(crate) struct ContextState {
|
|||
pub(crate) pending_promise_rejections:
|
||||
VecDeque<(v8::Global<v8::Promise>, v8::Global<v8::Value>)>,
|
||||
pub(crate) unrefed_ops: HashSet<i32, BuildHasherDefault<IdentityHasher>>,
|
||||
pub(crate) pending_ops: FuturesUnordered<OpCall>,
|
||||
pub(crate) pending_ops:
|
||||
JoinSet<MaskResultAsSend<(PromiseId, OpId, OpResult)>>,
|
||||
// We don't explicitly re-read this prop but need the slice to live alongside
|
||||
// the context
|
||||
pub(crate) op_ctxs: Box<[OpCtx]>,
|
||||
|
|
|
@ -41,7 +41,6 @@ use futures::future::FutureExt;
|
|||
use futures::future::MaybeDone;
|
||||
use futures::stream::StreamExt;
|
||||
use futures::task::noop_waker;
|
||||
use futures::task::AtomicWaker;
|
||||
use smallvec::SmallVec;
|
||||
use std::any::Any;
|
||||
use std::cell::RefCell;
|
||||
|
@ -309,7 +308,6 @@ pub struct JsRuntimeState {
|
|||
dyn_module_evaluate_idle_counter: u32,
|
||||
pub(crate) source_map_getter: Option<Rc<Box<dyn SourceMapGetter>>>,
|
||||
pub(crate) source_map_cache: Rc<RefCell<SourceMapCache>>,
|
||||
pub(crate) have_unpolled_ops: bool,
|
||||
pub(crate) op_state: Rc<RefCell<OpState>>,
|
||||
pub(crate) shared_array_buffer_store: Option<SharedArrayBufferStore>,
|
||||
pub(crate) compiled_wasm_module_store: Option<CompiledWasmModuleStore>,
|
||||
|
@ -320,7 +318,6 @@ pub struct JsRuntimeState {
|
|||
// flimsy. Try to poll it similarly to `pending_promise_rejections`.
|
||||
pub(crate) dispatched_exception: Option<v8::Global<v8::Value>>,
|
||||
pub(crate) inspector: Option<Rc<RefCell<JsRuntimeInspector>>>,
|
||||
waker: AtomicWaker,
|
||||
}
|
||||
|
||||
impl JsRuntimeState {
|
||||
|
@ -546,8 +543,6 @@ impl JsRuntime {
|
|||
shared_array_buffer_store: options.shared_array_buffer_store,
|
||||
compiled_wasm_module_store: options.compiled_wasm_module_store,
|
||||
op_state: op_state.clone(),
|
||||
waker: AtomicWaker::new(),
|
||||
have_unpolled_ops: false,
|
||||
dispatched_exception: None,
|
||||
// Some fields are initialized later after isolate is created
|
||||
inspector: None,
|
||||
|
@ -1328,7 +1323,7 @@ impl JsRuntime {
|
|||
{
|
||||
let state = self.inner.state.borrow();
|
||||
has_inspector = state.inspector.is_some();
|
||||
state.waker.register(cx.waker());
|
||||
state.op_state.borrow().waker.register(cx.waker());
|
||||
}
|
||||
|
||||
if has_inspector {
|
||||
|
@ -1419,12 +1414,11 @@ impl JsRuntime {
|
|||
// TODO(andreubotella) The event loop will spin as long as there are pending
|
||||
// background tasks. We should look into having V8 notify us when a
|
||||
// background task is done.
|
||||
if state.have_unpolled_ops
|
||||
|| pending_state.has_pending_background_tasks
|
||||
if pending_state.has_pending_background_tasks
|
||||
|| pending_state.has_tick_scheduled
|
||||
|| maybe_scheduling
|
||||
{
|
||||
state.waker.wake();
|
||||
state.op_state.borrow().waker.wake();
|
||||
}
|
||||
|
||||
drop(state);
|
||||
|
@ -1477,7 +1471,7 @@ impl JsRuntime {
|
|||
// evaluation may complete during this, in which case the counter will
|
||||
// reset.
|
||||
state.dyn_module_evaluate_idle_counter += 1;
|
||||
state.waker.wake();
|
||||
state.op_state.borrow().waker.wake();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1670,7 +1664,7 @@ impl JsRuntimeState {
|
|||
/// after initiating new dynamic import load.
|
||||
pub fn notify_new_dynamic_import(&mut self) {
|
||||
// Notify event loop to poll again soon.
|
||||
self.waker.wake();
|
||||
self.op_state.borrow().waker.wake();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2404,12 +2398,6 @@ impl JsRuntime {
|
|||
|
||||
// Polls pending ops and then runs `Deno.core.eventLoopTick` callback.
|
||||
fn do_js_event_loop_tick(&mut self, cx: &mut Context) -> Result<(), Error> {
|
||||
// Now handle actual ops.
|
||||
{
|
||||
let mut state = self.inner.state.borrow_mut();
|
||||
state.have_unpolled_ops = false;
|
||||
}
|
||||
|
||||
// Handle responses for each realm.
|
||||
let state = self.inner.state.clone();
|
||||
let isolate = &mut self.inner.v8_isolate;
|
||||
|
@ -2433,10 +2421,15 @@ impl JsRuntime {
|
|||
let mut args: SmallVec<[v8::Local<v8::Value>; 32]> =
|
||||
SmallVec::with_capacity(32);
|
||||
|
||||
while let Poll::Ready(Some(item)) =
|
||||
context_state.pending_ops.poll_next_unpin(cx)
|
||||
{
|
||||
let (promise_id, op_id, mut resp) = item;
|
||||
loop {
|
||||
let item = {
|
||||
let next = std::pin::pin!(context_state.pending_ops.join_next());
|
||||
let Poll::Ready(Some(item)) = next.poll(cx) else {
|
||||
break;
|
||||
};
|
||||
item
|
||||
};
|
||||
let (promise_id, op_id, mut resp) = item.unwrap().into_inner();
|
||||
state
|
||||
.borrow()
|
||||
.op_state
|
||||
|
@ -2486,11 +2479,6 @@ pub fn queue_fast_async_op<R: serde::Serialize + 'static>(
|
|||
promise_id: PromiseId,
|
||||
op: impl Future<Output = Result<R, Error>> + 'static,
|
||||
) {
|
||||
let runtime_state = match ctx.runtime_state.upgrade() {
|
||||
Some(rc_state) => rc_state,
|
||||
// at least 1 Rc is held by the JsRuntime.
|
||||
None => unreachable!(),
|
||||
};
|
||||
let get_class = {
|
||||
let state = RefCell::borrow(&ctx.state);
|
||||
state.tracker.track_async(ctx.id);
|
||||
|
@ -2499,13 +2487,10 @@ pub fn queue_fast_async_op<R: serde::Serialize + 'static>(
|
|||
let fut = op
|
||||
.map(|result| crate::_ops::to_op_result(get_class, result))
|
||||
.boxed_local();
|
||||
let mut state = runtime_state.borrow_mut();
|
||||
ctx
|
||||
.context_state
|
||||
.borrow_mut()
|
||||
.pending_ops
|
||||
.push(OpCall::pending(ctx, promise_id, fut));
|
||||
state.have_unpolled_ops = true;
|
||||
// SAFETY: this this is guaranteed to be running on a current-thread executor
|
||||
ctx.context_state.borrow_mut().pending_ops.spawn(unsafe {
|
||||
crate::task::MaskFutureAsSend::new(OpCall::pending(ctx, promise_id, fut))
|
||||
});
|
||||
}
|
||||
|
||||
#[inline]
|
||||
|
@ -2584,12 +2569,6 @@ pub fn queue_async_op<'s>(
|
|||
promise_id: PromiseId,
|
||||
mut op: MaybeDone<Pin<Box<dyn Future<Output = OpResult>>>>,
|
||||
) -> Option<v8::Local<'s, v8::Value>> {
|
||||
let runtime_state = match ctx.runtime_state.upgrade() {
|
||||
Some(rc_state) => rc_state,
|
||||
// at least 1 Rc is held by the JsRuntime.
|
||||
None => unreachable!(),
|
||||
};
|
||||
|
||||
// An op's realm (as given by `OpCtx::realm_idx`) must match the realm in
|
||||
// which it is invoked. Otherwise, we might have cross-realm object exposure.
|
||||
// deno_core doesn't currently support such exposure, even though embedders
|
||||
|
@ -2627,9 +2606,12 @@ pub fn queue_async_op<'s>(
|
|||
|
||||
// Otherwise we will push it to the `pending_ops` and let it be polled again
|
||||
// or resolved on the next tick of the event loop.
|
||||
let mut state = runtime_state.borrow_mut();
|
||||
ctx.context_state.borrow_mut().pending_ops.push(op_call);
|
||||
state.have_unpolled_ops = true;
|
||||
ctx
|
||||
.context_state
|
||||
.borrow_mut()
|
||||
.pending_ops
|
||||
// SAFETY: this this is guaranteed to be running on a current-thread executor
|
||||
.spawn(unsafe { crate::task::MaskFutureAsSend::new(op_call) });
|
||||
None
|
||||
}
|
||||
|
||||
|
@ -2744,8 +2726,8 @@ pub mod tests {
|
|||
(runtime, dispatch_count)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_ref_unref_ops() {
|
||||
#[tokio::test]
|
||||
async fn test_ref_unref_ops() {
|
||||
let (mut runtime, _dispatch_count) = setup(Mode::AsyncDeferred);
|
||||
runtime
|
||||
.execute_script_static(
|
||||
|
@ -4735,6 +4717,7 @@ Deno.core.opAsync("op_async_serialize_object_with_numbers_as_keys", {
|
|||
}
|
||||
}
|
||||
|
||||
#[ignore]
|
||||
#[tokio::test]
|
||||
async fn js_realm_gc() {
|
||||
static INVOKE_COUNT: AtomicUsize = AtomicUsize::new(0);
|
||||
|
@ -4793,7 +4776,6 @@ Deno.core.opAsync("op_async_serialize_object_with_numbers_as_keys", {
|
|||
.await
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
drop(runtime);
|
||||
|
||||
// Make sure the OpState was dropped properly when the runtime dropped
|
||||
|
|
|
@ -10,6 +10,7 @@ use crate::MAX_SAFE_INTEGER;
|
|||
use crate::MIN_SAFE_INTEGER;
|
||||
use deno_core::error::AnyError;
|
||||
use deno_core::futures::channel::mpsc;
|
||||
use deno_core::futures::task::AtomicWaker;
|
||||
use deno_core::op;
|
||||
use deno_core::serde_v8;
|
||||
use deno_core::v8;
|
||||
|
@ -32,8 +33,8 @@ use std::rc::Rc;
|
|||
use std::sync::atomic;
|
||||
use std::sync::atomic::AtomicU32;
|
||||
use std::sync::mpsc::sync_channel;
|
||||
use std::sync::Arc;
|
||||
use std::task::Poll;
|
||||
use std::task::Waker;
|
||||
|
||||
static THREAD_ID_COUNTER: AtomicU32 = AtomicU32::new(1);
|
||||
|
||||
|
@ -99,21 +100,20 @@ struct CallbackInfo {
|
|||
pub parameters: Box<[NativeType]>,
|
||||
pub result: NativeType,
|
||||
pub thread_id: u32,
|
||||
pub waker: Option<Waker>,
|
||||
pub waker: Arc<AtomicWaker>,
|
||||
}
|
||||
|
||||
impl Future for CallbackInfo {
|
||||
type Output = ();
|
||||
fn poll(
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut std::task::Context<'_>,
|
||||
self: Pin<&mut Self>,
|
||||
_cx: &mut std::task::Context<'_>,
|
||||
) -> std::task::Poll<Self::Output> {
|
||||
// Always replace the waker to make sure it's bound to the proper Future.
|
||||
self.waker.replace(cx.waker().clone());
|
||||
// The future for the CallbackInfo never resolves: It can only be canceled.
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
|
||||
unsafe extern "C" fn deno_ffi_callback(
|
||||
cif: &libffi::low::ffi_cif,
|
||||
result: &mut c_void,
|
||||
|
@ -136,10 +136,8 @@ unsafe extern "C" fn deno_ffi_callback(
|
|||
response_sender.send(()).unwrap();
|
||||
});
|
||||
async_work_sender.unbounded_send(fut).unwrap();
|
||||
if let Some(waker) = info.waker.as_ref() {
|
||||
// Make sure event loop wakes up to receive our message before we start waiting for a response.
|
||||
waker.wake_by_ref();
|
||||
}
|
||||
info.waker.wake();
|
||||
response_receiver.recv().unwrap();
|
||||
}
|
||||
});
|
||||
|
@ -574,6 +572,7 @@ where
|
|||
let current_context = scope.get_current_context();
|
||||
let context = v8::Global::new(scope, current_context).into_raw();
|
||||
|
||||
let waker = state.waker.clone();
|
||||
let info: *mut CallbackInfo = Box::leak(Box::new(CallbackInfo {
|
||||
async_work_sender,
|
||||
callback,
|
||||
|
@ -581,7 +580,7 @@ where
|
|||
parameters: args.parameters.clone().into(),
|
||||
result: args.result.clone(),
|
||||
thread_id,
|
||||
waker: None,
|
||||
waker,
|
||||
}));
|
||||
let cif = Cif::new(
|
||||
args
|
||||
|
|
Loading…
Reference in a new issue