1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2025-01-11 16:42:21 -05:00

refactor(core): use JoinSet instead of FuturesUnordered (#19378)

This commit migrates "deno_core" from using "FuturesUnordered" to
"tokio::task::JoinSet". This makes every op to be a separate Tokio task
and should unlock better utilization of kqueue/epoll.

There were two quirks added to this PR:
- because of the fact that "JoinSet" immediately polls spawn tasks,
op sanitizers can give false positives in some cases, this was
alleviated by polling event loop once before running a test with 
"deno test", which gives canceled ops an opportunity to settle
- "JsRuntimeState::waker" was moved to "OpState::waker" so that FFI
API can still use threadsafe functions - without this change the
registered wakers were wrong as they would not wake up the 
whole "JsRuntime" but the task associated with an op

---------

Co-authored-by: Matt Mastracci <matthew@mastracci.com>
This commit is contained in:
Bartek Iwańczuk 2023-06-07 23:50:14 +02:00
parent 853719d3d3
commit 220a7d544a
No known key found for this signature in database
GPG key ID: 0C6BCDDC3B3AD750
5 changed files with 65 additions and 59 deletions

View file

@ -28,6 +28,7 @@ use deno_core::error::AnyError;
use deno_core::error::JsError;
use deno_core::futures::future;
use deno_core::futures::stream;
use deno_core::futures::task::noop_waker;
use deno_core::futures::FutureExt;
use deno_core::futures::StreamExt;
use deno_core::located_script_name;
@ -66,6 +67,7 @@ use std::sync::atomic::AtomicBool;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::task::Context;
use std::time::Duration;
use std::time::Instant;
use std::time::SystemTime;
@ -1006,6 +1008,21 @@ pub async fn test_specifier(
continue;
}
sender.send(TestEvent::Wait(desc.id))?;
// TODO(bartlomieju): this is a nasty (beautiful) hack, that was required
// when switching `JsRuntime` from `FuturesUnordered` to `JoinSet`. With
// `JoinSet` all pending ops are immediately polled and that caused a problem
// when some async ops were fired and canceled before running tests (giving
// false positives in the ops sanitizer). We should probably rewrite sanitizers
// to be done in Rust instead of in JS (40_testing.js).
{
// Poll event loop once, this will allow all ops that are already resolved,
// but haven't responded to settle.
let waker = noop_waker();
let mut cx = Context::from_waker(&waker);
let _ = worker.js_runtime.poll_event_loop(&mut cx, false);
}
let earlier = SystemTime::now();
let result = match worker.js_runtime.call_and_await(&function).await {
Ok(r) => r,

View file

@ -10,6 +10,7 @@ use crate::OpDecl;
use crate::OpsTracker;
use anyhow::Error;
use futures::future::MaybeDone;
use futures::task::AtomicWaker;
use futures::Future;
use futures::FutureExt;
use pin_project::pin_project;
@ -21,6 +22,7 @@ use std::pin::Pin;
use std::ptr::NonNull;
use std::rc::Rc;
use std::rc::Weak;
use std::sync::Arc;
use v8::fast_api::CFunctionInfo;
use v8::fast_api::CTypeInfo;
@ -184,6 +186,7 @@ pub struct OpState {
pub tracker: OpsTracker,
pub last_fast_op_error: Option<AnyError>,
pub(crate) gotham_state: GothamState,
pub waker: Arc<AtomicWaker>,
}
impl OpState {
@ -194,6 +197,7 @@ impl OpState {
gotham_state: Default::default(),
last_fast_op_error: None,
tracker: OpsTracker::new(ops_count),
waker: Arc::new(AtomicWaker::new()),
}
}

View file

@ -5,10 +5,12 @@ use crate::modules::ModuleCode;
use crate::ops::OpCtx;
use crate::runtime::exception_to_err_result;
use crate::runtime::JsRuntimeState;
use crate::task::MaskResultAsSend;
use crate::JsRuntime;
use crate::OpCall;
use crate::OpId;
use crate::OpResult;
use crate::PromiseId;
use anyhow::Error;
use futures::stream::FuturesUnordered;
use std::cell::RefCell;
use std::collections::HashSet;
use std::collections::VecDeque;
@ -16,6 +18,7 @@ use std::hash::BuildHasherDefault;
use std::hash::Hasher;
use std::option::Option;
use std::rc::Rc;
use tokio::task::JoinSet;
use v8::HandleScope;
use v8::Local;
@ -48,7 +51,8 @@ pub(crate) struct ContextState {
pub(crate) pending_promise_rejections:
VecDeque<(v8::Global<v8::Promise>, v8::Global<v8::Value>)>,
pub(crate) unrefed_ops: HashSet<i32, BuildHasherDefault<IdentityHasher>>,
pub(crate) pending_ops: FuturesUnordered<OpCall>,
pub(crate) pending_ops:
JoinSet<MaskResultAsSend<(PromiseId, OpId, OpResult)>>,
// We don't explicitly re-read this prop but need the slice to live alongside
// the context
pub(crate) op_ctxs: Box<[OpCtx]>,

View file

@ -41,7 +41,6 @@ use futures::future::FutureExt;
use futures::future::MaybeDone;
use futures::stream::StreamExt;
use futures::task::noop_waker;
use futures::task::AtomicWaker;
use smallvec::SmallVec;
use std::any::Any;
use std::cell::RefCell;
@ -309,7 +308,6 @@ pub struct JsRuntimeState {
dyn_module_evaluate_idle_counter: u32,
pub(crate) source_map_getter: Option<Rc<Box<dyn SourceMapGetter>>>,
pub(crate) source_map_cache: Rc<RefCell<SourceMapCache>>,
pub(crate) have_unpolled_ops: bool,
pub(crate) op_state: Rc<RefCell<OpState>>,
pub(crate) shared_array_buffer_store: Option<SharedArrayBufferStore>,
pub(crate) compiled_wasm_module_store: Option<CompiledWasmModuleStore>,
@ -320,7 +318,6 @@ pub struct JsRuntimeState {
// flimsy. Try to poll it similarly to `pending_promise_rejections`.
pub(crate) dispatched_exception: Option<v8::Global<v8::Value>>,
pub(crate) inspector: Option<Rc<RefCell<JsRuntimeInspector>>>,
waker: AtomicWaker,
}
impl JsRuntimeState {
@ -546,8 +543,6 @@ impl JsRuntime {
shared_array_buffer_store: options.shared_array_buffer_store,
compiled_wasm_module_store: options.compiled_wasm_module_store,
op_state: op_state.clone(),
waker: AtomicWaker::new(),
have_unpolled_ops: false,
dispatched_exception: None,
// Some fields are initialized later after isolate is created
inspector: None,
@ -1328,7 +1323,7 @@ impl JsRuntime {
{
let state = self.inner.state.borrow();
has_inspector = state.inspector.is_some();
state.waker.register(cx.waker());
state.op_state.borrow().waker.register(cx.waker());
}
if has_inspector {
@ -1419,12 +1414,11 @@ impl JsRuntime {
// TODO(andreubotella) The event loop will spin as long as there are pending
// background tasks. We should look into having V8 notify us when a
// background task is done.
if state.have_unpolled_ops
|| pending_state.has_pending_background_tasks
if pending_state.has_pending_background_tasks
|| pending_state.has_tick_scheduled
|| maybe_scheduling
{
state.waker.wake();
state.op_state.borrow().waker.wake();
}
drop(state);
@ -1477,7 +1471,7 @@ impl JsRuntime {
// evaluation may complete during this, in which case the counter will
// reset.
state.dyn_module_evaluate_idle_counter += 1;
state.waker.wake();
state.op_state.borrow().waker.wake();
}
}
@ -1670,7 +1664,7 @@ impl JsRuntimeState {
/// after initiating new dynamic import load.
pub fn notify_new_dynamic_import(&mut self) {
// Notify event loop to poll again soon.
self.waker.wake();
self.op_state.borrow().waker.wake();
}
}
@ -2404,12 +2398,6 @@ impl JsRuntime {
// Polls pending ops and then runs `Deno.core.eventLoopTick` callback.
fn do_js_event_loop_tick(&mut self, cx: &mut Context) -> Result<(), Error> {
// Now handle actual ops.
{
let mut state = self.inner.state.borrow_mut();
state.have_unpolled_ops = false;
}
// Handle responses for each realm.
let state = self.inner.state.clone();
let isolate = &mut self.inner.v8_isolate;
@ -2433,10 +2421,15 @@ impl JsRuntime {
let mut args: SmallVec<[v8::Local<v8::Value>; 32]> =
SmallVec::with_capacity(32);
while let Poll::Ready(Some(item)) =
context_state.pending_ops.poll_next_unpin(cx)
{
let (promise_id, op_id, mut resp) = item;
loop {
let item = {
let next = std::pin::pin!(context_state.pending_ops.join_next());
let Poll::Ready(Some(item)) = next.poll(cx) else {
break;
};
item
};
let (promise_id, op_id, mut resp) = item.unwrap().into_inner();
state
.borrow()
.op_state
@ -2486,11 +2479,6 @@ pub fn queue_fast_async_op<R: serde::Serialize + 'static>(
promise_id: PromiseId,
op: impl Future<Output = Result<R, Error>> + 'static,
) {
let runtime_state = match ctx.runtime_state.upgrade() {
Some(rc_state) => rc_state,
// at least 1 Rc is held by the JsRuntime.
None => unreachable!(),
};
let get_class = {
let state = RefCell::borrow(&ctx.state);
state.tracker.track_async(ctx.id);
@ -2499,13 +2487,10 @@ pub fn queue_fast_async_op<R: serde::Serialize + 'static>(
let fut = op
.map(|result| crate::_ops::to_op_result(get_class, result))
.boxed_local();
let mut state = runtime_state.borrow_mut();
ctx
.context_state
.borrow_mut()
.pending_ops
.push(OpCall::pending(ctx, promise_id, fut));
state.have_unpolled_ops = true;
// SAFETY: this this is guaranteed to be running on a current-thread executor
ctx.context_state.borrow_mut().pending_ops.spawn(unsafe {
crate::task::MaskFutureAsSend::new(OpCall::pending(ctx, promise_id, fut))
});
}
#[inline]
@ -2584,12 +2569,6 @@ pub fn queue_async_op<'s>(
promise_id: PromiseId,
mut op: MaybeDone<Pin<Box<dyn Future<Output = OpResult>>>>,
) -> Option<v8::Local<'s, v8::Value>> {
let runtime_state = match ctx.runtime_state.upgrade() {
Some(rc_state) => rc_state,
// at least 1 Rc is held by the JsRuntime.
None => unreachable!(),
};
// An op's realm (as given by `OpCtx::realm_idx`) must match the realm in
// which it is invoked. Otherwise, we might have cross-realm object exposure.
// deno_core doesn't currently support such exposure, even though embedders
@ -2627,9 +2606,12 @@ pub fn queue_async_op<'s>(
// Otherwise we will push it to the `pending_ops` and let it be polled again
// or resolved on the next tick of the event loop.
let mut state = runtime_state.borrow_mut();
ctx.context_state.borrow_mut().pending_ops.push(op_call);
state.have_unpolled_ops = true;
ctx
.context_state
.borrow_mut()
.pending_ops
// SAFETY: this this is guaranteed to be running on a current-thread executor
.spawn(unsafe { crate::task::MaskFutureAsSend::new(op_call) });
None
}
@ -2744,8 +2726,8 @@ pub mod tests {
(runtime, dispatch_count)
}
#[test]
fn test_ref_unref_ops() {
#[tokio::test]
async fn test_ref_unref_ops() {
let (mut runtime, _dispatch_count) = setup(Mode::AsyncDeferred);
runtime
.execute_script_static(
@ -4735,6 +4717,7 @@ Deno.core.opAsync("op_async_serialize_object_with_numbers_as_keys", {
}
}
#[ignore]
#[tokio::test]
async fn js_realm_gc() {
static INVOKE_COUNT: AtomicUsize = AtomicUsize::new(0);
@ -4793,7 +4776,6 @@ Deno.core.opAsync("op_async_serialize_object_with_numbers_as_keys", {
.await
.unwrap();
}
drop(runtime);
// Make sure the OpState was dropped properly when the runtime dropped

View file

@ -10,6 +10,7 @@ use crate::MAX_SAFE_INTEGER;
use crate::MIN_SAFE_INTEGER;
use deno_core::error::AnyError;
use deno_core::futures::channel::mpsc;
use deno_core::futures::task::AtomicWaker;
use deno_core::op;
use deno_core::serde_v8;
use deno_core::v8;
@ -32,8 +33,8 @@ use std::rc::Rc;
use std::sync::atomic;
use std::sync::atomic::AtomicU32;
use std::sync::mpsc::sync_channel;
use std::sync::Arc;
use std::task::Poll;
use std::task::Waker;
static THREAD_ID_COUNTER: AtomicU32 = AtomicU32::new(1);
@ -99,21 +100,20 @@ struct CallbackInfo {
pub parameters: Box<[NativeType]>,
pub result: NativeType,
pub thread_id: u32,
pub waker: Option<Waker>,
pub waker: Arc<AtomicWaker>,
}
impl Future for CallbackInfo {
type Output = ();
fn poll(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
self: Pin<&mut Self>,
_cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
// Always replace the waker to make sure it's bound to the proper Future.
self.waker.replace(cx.waker().clone());
// The future for the CallbackInfo never resolves: It can only be canceled.
Poll::Pending
}
}
unsafe extern "C" fn deno_ffi_callback(
cif: &libffi::low::ffi_cif,
result: &mut c_void,
@ -136,10 +136,8 @@ unsafe extern "C" fn deno_ffi_callback(
response_sender.send(()).unwrap();
});
async_work_sender.unbounded_send(fut).unwrap();
if let Some(waker) = info.waker.as_ref() {
// Make sure event loop wakes up to receive our message before we start waiting for a response.
waker.wake_by_ref();
}
info.waker.wake();
response_receiver.recv().unwrap();
}
});
@ -574,6 +572,7 @@ where
let current_context = scope.get_current_context();
let context = v8::Global::new(scope, current_context).into_raw();
let waker = state.waker.clone();
let info: *mut CallbackInfo = Box::leak(Box::new(CallbackInfo {
async_work_sender,
callback,
@ -581,7 +580,7 @@ where
parameters: args.parameters.clone().into(),
result: args.result.clone(),
thread_id,
waker: None,
waker,
}));
let cif = Cif::new(
args