1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2025-01-10 08:09:06 -05:00

fix(core): Ensure we don't lose the waker when polling an empty JoinSet (#19655)

This is a reproduction and fix for a very obscure bug where the Deno
runtime locks up we end up polling an empty JoinSet and attempt to
resolve ops after-the-fact. There's a small footgun in the JoinSet API
where polling it while empty returns Ready(None), which means that it
never holds on to the waker. This means that if we aren't testing for
this particular return value and don't stash the waker ourselves for a
future async op to eventually queue, we can end up losing the waker
entirely and the op wakes up, notifies tokio, which notifies the
JoinSet, which then has nobody to notify 😢.

Co-authored-by: Luca Casonato <hello@lcas.dev>
Co-authored-by: Bartek Iwańczuk <biwanczuk@gmail.com>
This commit is contained in:
Matt Mastracci 2023-06-29 10:01:54 -06:00 committed by GitHub
parent 93b3ff0170
commit 98df69fd4c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 204 additions and 28 deletions

7
Cargo.lock generated
View file

@ -607,6 +607,12 @@ version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6245d59a3e82a7fc217c5828a6692dbc6dfb63a0c8c90495621f7b9d79704a0e" checksum = "6245d59a3e82a7fc217c5828a6692dbc6dfb63a0c8c90495621f7b9d79704a0e"
[[package]]
name = "cooked-waker"
version = "5.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "147be55d677052dabc6b22252d5dd0fd4c29c8c27aa4f2fbef0f94aa003b406f"
[[package]] [[package]]
name = "core-foundation" name = "core-foundation"
version = "0.9.3" version = "0.9.3"
@ -971,6 +977,7 @@ version = "0.191.0"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"bytes", "bytes",
"cooked-waker",
"deno_ast", "deno_ast",
"deno_ops", "deno_ops",
"futures", "futures",

View file

@ -46,4 +46,5 @@ path = "examples/http_bench_json_ops/main.rs"
# These dependencies are only used for the 'http_bench_*_ops' examples. # These dependencies are only used for the 'http_bench_*_ops' examples.
[dev-dependencies] [dev-dependencies]
cooked-waker = "5"
deno_ast.workspace = true deno_ast.workspace = true

92
core/joinset.rs Normal file
View file

@ -0,0 +1,92 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
// Some code and comments under MIT license where adapted from Tokio code
// Copyright (c) 2023 Tokio Contributors
use std::task::Context;
use std::task::Poll;
use std::task::Waker;
use futures::Future;
use tokio::task::AbortHandle;
use tokio::task::JoinError;
use crate::task::MaskFutureAsSend;
use crate::task::MaskResultAsSend;
/// Wraps the tokio [`JoinSet`] to make it !Send-friendly and to make it easier and safer for us to
/// poll while empty.
pub(crate) struct JoinSet<T> {
joinset: tokio::task::JoinSet<MaskResultAsSend<T>>,
/// If join_next returns Ready(None), we stash the waker
waker: Option<Waker>,
}
impl<T> Default for JoinSet<T> {
fn default() -> Self {
Self {
joinset: Default::default(),
waker: None,
}
}
}
impl<T: 'static> JoinSet<T> {
/// Spawn the provided task on the `JoinSet`, returning an [`AbortHandle`]
/// that can be used to remotely cancel the task.
///
/// The provided future will start running in the background immediately
/// when this method is called, even if you don't await anything on this
/// `JoinSet`.
///
/// # Panics
///
/// This method panics if called outside of a Tokio runtime.
///
/// [`AbortHandle`]: tokio::task::AbortHandle
#[track_caller]
pub fn spawn<F>(&mut self, task: F) -> AbortHandle
where
F: Future<Output = T>,
F: 'static,
T: 'static,
{
// SAFETY: We only use this with the single-thread executor
let handle = self.joinset.spawn(unsafe { MaskFutureAsSend::new(task) });
// If someone had called poll_join_next while we were empty, ask them to poll again
// so we can properly register the waker with the underlying JoinSet.
if let Some(waker) = self.waker.take() {
waker.wake();
}
handle
}
/// Returns the number of tasks currently in the `JoinSet`.
pub fn len(&self) -> usize {
self.joinset.len()
}
/// Waits until one of the tasks in the set completes and returns its output.
///
/// # Cancel Safety
///
/// This method is cancel safe. If `join_next` is used as the event in a `tokio::select!`
/// statement and some other branch completes first, it is guaranteed that no tasks were
/// removed from this `JoinSet`.
pub fn poll_join_next(
&mut self,
cx: &mut Context,
) -> Poll<Result<T, JoinError>> {
// TODO(mmastrac): Use poll_join_next from Tokio
let next = std::pin::pin!(self.joinset.join_next());
match next.poll(cx) {
Poll::Ready(Some(res)) => Poll::Ready(res.map(|res| res.into_inner())),
Poll::Ready(None) => {
// Stash waker
self.waker = Some(cx.waker().clone());
Poll::Pending
}
Poll::Pending => Poll::Pending,
}
}
}

View file

@ -9,6 +9,7 @@ mod flags;
mod gotham_state; mod gotham_state;
mod inspector; mod inspector;
mod io; mod io;
mod joinset;
mod module_specifier; mod module_specifier;
mod modules; mod modules;
mod normalize_path; mod normalize_path;

View file

@ -1,10 +1,10 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. // Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
use super::bindings; use super::bindings;
use crate::error::exception_to_err_result; use crate::error::exception_to_err_result;
use crate::joinset::JoinSet;
use crate::modules::ModuleCode; use crate::modules::ModuleCode;
use crate::ops::OpCtx; use crate::ops::OpCtx;
use crate::runtime::JsRuntimeState; use crate::runtime::JsRuntimeState;
use crate::task::MaskResultAsSend;
use crate::JsRuntime; use crate::JsRuntime;
use crate::OpId; use crate::OpId;
use crate::OpResult; use crate::OpResult;
@ -17,7 +17,6 @@ use std::hash::BuildHasherDefault;
use std::hash::Hasher; use std::hash::Hasher;
use std::option::Option; use std::option::Option;
use std::rc::Rc; use std::rc::Rc;
use tokio::task::JoinSet;
use v8::HandleScope; use v8::HandleScope;
use v8::Local; use v8::Local;
@ -50,8 +49,7 @@ pub(crate) struct ContextState {
pub(crate) pending_promise_rejections: pub(crate) pending_promise_rejections:
VecDeque<(v8::Global<v8::Promise>, v8::Global<v8::Value>)>, VecDeque<(v8::Global<v8::Promise>, v8::Global<v8::Value>)>,
pub(crate) unrefed_ops: HashSet<i32, BuildHasherDefault<IdentityHasher>>, pub(crate) unrefed_ops: HashSet<i32, BuildHasherDefault<IdentityHasher>>,
pub(crate) pending_ops: pub(crate) pending_ops: JoinSet<(PromiseId, OpId, OpResult)>,
JoinSet<MaskResultAsSend<(PromiseId, OpId, OpResult)>>,
// We don't explicitly re-read this prop but need the slice to live alongside // We don't explicitly re-read this prop but need the slice to live alongside
// the context // the context
pub(crate) op_ctxs: Box<[OpCtx]>, pub(crate) op_ctxs: Box<[OpCtx]>,

View file

@ -37,7 +37,6 @@ use anyhow::Context as AnyhowContext;
use anyhow::Error; use anyhow::Error;
use futures::channel::oneshot; use futures::channel::oneshot;
use futures::future::poll_fn; use futures::future::poll_fn;
use futures::future::Future;
use futures::stream::StreamExt; use futures::stream::StreamExt;
use smallvec::SmallVec; use smallvec::SmallVec;
use std::any::Any; use std::any::Any;
@ -2261,14 +2260,11 @@ impl JsRuntime {
SmallVec::with_capacity(32); SmallVec::with_capacity(32);
loop { loop {
let item = { let Poll::Ready(item) = context_state.pending_ops.poll_join_next(cx) else {
let next = std::pin::pin!(context_state.pending_ops.join_next()); break;
let Poll::Ready(Some(item)) = next.poll(cx) else {
break;
};
item
}; };
let (promise_id, op_id, mut resp) = item.unwrap().into_inner(); // TODO(mmastrac): If this task is really errored, things could be pretty bad
let (promise_id, op_id, mut resp) = item.unwrap();
state state
.borrow() .borrow()
.op_state .op_state

View file

@ -25,10 +25,11 @@ pub fn queue_fast_async_op<R: serde::Serialize + 'static>(
state.get_error_class_fn state.get_error_class_fn
}; };
let fut = op.map(|result| crate::_ops::to_op_result(get_class, result)); let fut = op.map(|result| crate::_ops::to_op_result(get_class, result));
// SAFETY: this is guaranteed to be running on a current-thread executor ctx
ctx.context_state.borrow_mut().pending_ops.spawn(unsafe { .context_state
crate::task::MaskFutureAsSend::new(OpCall::new(ctx, promise_id, fut)) .borrow_mut()
}); .pending_ops
.spawn(OpCall::new(ctx, promise_id, fut));
} }
#[inline] #[inline]
@ -123,12 +124,7 @@ pub fn queue_async_op<'s>(
Poll::Pending => {} Poll::Pending => {}
Poll::Ready(mut res) => { Poll::Ready(mut res) => {
if deferred { if deferred {
ctx ctx.context_state.borrow_mut().pending_ops.spawn(ready(res));
.context_state
.borrow_mut()
.pending_ops
// SAFETY: this is guaranteed to be running on a current-thread executor
.spawn(unsafe { crate::task::MaskFutureAsSend::new(ready(res)) });
return None; return None;
} else { } else {
ctx.state.borrow_mut().tracker.track_async_completed(ctx.id); ctx.state.borrow_mut().tracker.track_async_completed(ctx.id);
@ -137,12 +133,7 @@ pub fn queue_async_op<'s>(
} }
} }
ctx ctx.context_state.borrow_mut().pending_ops.spawn(pinned);
.context_state
.borrow_mut()
.pending_ops
// SAFETY: this is guaranteed to be running on a current-thread executor
.spawn(unsafe { crate::task::MaskFutureAsSend::new(pinned) });
None None
} }

View file

@ -21,6 +21,9 @@ use crate::Extension;
use crate::JsBuffer; use crate::JsBuffer;
use crate::*; use crate::*;
use anyhow::Error; use anyhow::Error;
use cooked_waker::IntoWaker;
use cooked_waker::Wake;
use cooked_waker::WakeRef;
use deno_ops::op; use deno_ops::op;
use futures::future::poll_fn; use futures::future::poll_fn;
use futures::future::Future; use futures::future::Future;
@ -28,11 +31,14 @@ use futures::FutureExt;
use std::cell::RefCell; use std::cell::RefCell;
use std::pin::Pin; use std::pin::Pin;
use std::rc::Rc; use std::rc::Rc;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::AtomicI8;
use std::sync::atomic::AtomicUsize; use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering; use std::sync::atomic::Ordering;
use std::sync::Arc; use std::sync::Arc;
use std::task::Context; use std::task::Context;
use std::task::Poll; use std::task::Poll;
use std::time::Duration;
// deno_ops macros generate code assuming deno_core in scope. // deno_ops macros generate code assuming deno_core in scope.
mod deno_core { mod deno_core {
@ -264,6 +270,90 @@ fn test_execute_script_return_value() {
} }
} }
#[derive(Default)]
struct LoggingWaker {
woken: AtomicBool,
}
impl Wake for LoggingWaker {
fn wake(self) {
self.woken.store(true, Ordering::SeqCst);
}
}
impl WakeRef for LoggingWaker {
fn wake_by_ref(&self) {
self.woken.store(true, Ordering::SeqCst);
}
}
/// This is a reproduction for a very obscure bug where the Deno runtime locks up we end up polling
/// an empty JoinSet and attempt to resolve ops after-the-fact. There's a small footgun in the JoinSet
/// API where polling it while empty returns Ready(None), which means that it never holds on to the
/// waker. This means that if we aren't testing for this particular return value and don't stash the waker
/// ourselves for a future async op to eventually queue, we can end up losing the waker entirely and the
/// op wakes up, notifies tokio, which notifies the JoinSet, which then has nobody to notify )`:.
#[tokio::test]
async fn test_wakers_for_async_ops() {
static STATE: AtomicI8 = AtomicI8::new(0);
#[op]
async fn op_async_sleep() -> Result<(), Error> {
STATE.store(1, Ordering::SeqCst);
tokio::time::sleep(std::time::Duration::from_millis(1)).await;
STATE.store(2, Ordering::SeqCst);
Ok(())
}
STATE.store(0, Ordering::SeqCst);
let logging_waker = Arc::new(LoggingWaker::default());
let waker = logging_waker.clone().into_waker();
deno_core::extension!(test_ext, ops = [op_async_sleep]);
let mut runtime = JsRuntime::new(RuntimeOptions {
extensions: vec![test_ext::init_ops()],
..Default::default()
});
// Drain events until we get to Ready
loop {
logging_waker.woken.store(false, Ordering::SeqCst);
let res = runtime.poll_event_loop(&mut Context::from_waker(&waker), false);
let ready = matches!(res, Poll::Ready(Ok(())));
assert!(ready || logging_waker.woken.load(Ordering::SeqCst));
if ready {
break;
}
}
// Start the AIIFE
runtime
.execute_script(
"",
FastString::from_static(
"(async () => { await Deno.core.opAsync('op_async_sleep'); })()",
),
)
.unwrap();
// Wait for future to finish
while STATE.load(Ordering::SeqCst) < 2 {
tokio::time::sleep(Duration::from_millis(1)).await;
}
// This shouldn't take one minute, but if it does, things are definitely locked up
for _ in 0..Duration::from_secs(60).as_millis() {
if logging_waker.woken.load(Ordering::SeqCst) {
// Success
return;
}
tokio::time::sleep(Duration::from_millis(1)).await;
}
panic!("The waker was never woken after the future completed");
}
#[tokio::test] #[tokio::test]
async fn test_poll_value() { async fn test_poll_value() {
let mut runtime = JsRuntime::new(Default::default()); let mut runtime = JsRuntime::new(Default::default());