1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2025-01-18 03:44:05 -05:00

refactor(core): Remove MaybeDone from ops to eventually remove the box (#19508)

This removes MaybeDone from op resolution. While it would be nice to avoid the box, most of the work for that future task is done here.
This commit is contained in:
Matt Mastracci 2023-06-14 16:22:54 -06:00 committed by GitHub
parent fc4e4c3e93
commit 48c6f71787
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 52 additions and 91 deletions

View file

@ -9,16 +9,13 @@ use crate::runtime::JsRuntimeState;
use crate::OpDecl; use crate::OpDecl;
use crate::OpsTracker; use crate::OpsTracker;
use anyhow::Error; use anyhow::Error;
use futures::future::MaybeDone;
use futures::task::AtomicWaker; use futures::task::AtomicWaker;
use futures::Future; use futures::Future;
use futures::FutureExt;
use pin_project::pin_project; use pin_project::pin_project;
use serde::Serialize; use serde::Serialize;
use std::cell::RefCell; use std::cell::RefCell;
use std::ops::Deref; use std::ops::Deref;
use std::ops::DerefMut; use std::ops::DerefMut;
use std::pin::Pin;
use std::ptr::NonNull; use std::ptr::NonNull;
use std::rc::Rc; use std::rc::Rc;
use std::rc::Weak; use std::rc::Weak;
@ -30,40 +27,26 @@ pub type PromiseId = i32;
pub type OpId = u16; pub type OpId = u16;
#[pin_project] #[pin_project]
pub struct OpCall { pub struct OpCall<F: Future<Output = OpResult>> {
promise_id: PromiseId, promise_id: PromiseId,
op_id: OpId, op_id: OpId,
/// Future is not necessarily Unpin, so we need to pin_project. /// Future is not necessarily Unpin, so we need to pin_project.
#[pin] #[pin]
fut: MaybeDone<Pin<Box<dyn Future<Output = OpResult>>>>, fut: F,
} }
impl OpCall { impl<F: Future<Output = OpResult>> OpCall<F> {
/// Wraps a future; the inner future is polled the usual way (lazily). /// Wraps a future; the inner future is polled the usual way (lazily).
pub fn pending( pub fn new(op_ctx: &OpCtx, promise_id: PromiseId, fut: F) -> Self {
op_ctx: &OpCtx,
promise_id: PromiseId,
fut: Pin<Box<dyn Future<Output = OpResult> + 'static>>,
) -> Self {
Self { Self {
op_id: op_ctx.id, op_id: op_ctx.id,
promise_id, promise_id,
fut: MaybeDone::Future(fut), fut,
}
}
/// Create a future by specifying its output. This is basically the same as
/// `async { value }` or `futures::future::ready(value)`.
pub fn ready(op_ctx: &OpCtx, promise_id: PromiseId, value: OpResult) -> Self {
Self {
op_id: op_ctx.id,
promise_id,
fut: MaybeDone::Done(value),
} }
} }
} }
impl Future for OpCall { impl<F: Future<Output = OpResult>> Future for OpCall<F> {
type Output = (PromiseId, OpId, OpResult); type Output = (PromiseId, OpId, OpResult);
fn poll( fn poll(
@ -72,21 +55,8 @@ impl Future for OpCall {
) -> std::task::Poll<Self::Output> { ) -> std::task::Poll<Self::Output> {
let promise_id = self.promise_id; let promise_id = self.promise_id;
let op_id = self.op_id; let op_id = self.op_id;
let fut = &mut *self.project().fut; let fut = self.project().fut;
match fut { fut.poll(cx).map(move |res| (promise_id, op_id, res))
MaybeDone::Done(_) => {
// Let's avoid using take_output as it keeps our Pin::box
let res = std::mem::replace(fut, MaybeDone::Gone);
let MaybeDone::Done(res) = res
else {
unreachable!()
};
std::task::Poll::Ready(res)
}
MaybeDone::Future(f) => f.poll_unpin(cx),
MaybeDone::Gone => std::task::Poll::Pending,
}
.map(move |res| (promise_id, op_id, res))
} }
} }

View file

@ -3,13 +3,13 @@ use crate::ops::*;
use crate::OpResult; use crate::OpResult;
use crate::PromiseId; use crate::PromiseId;
use anyhow::Error; use anyhow::Error;
use futures::future::Either;
use futures::future::Future; use futures::future::Future;
use futures::future::FutureExt; use futures::future::FutureExt;
use futures::future::MaybeDone; use futures::task::noop_waker_ref;
use futures::task::noop_waker;
use std::cell::RefCell; use std::cell::RefCell;
use std::future::ready;
use std::option::Option; use std::option::Option;
use std::pin::Pin;
use std::task::Context; use std::task::Context;
use std::task::Poll; use std::task::Poll;
@ -24,12 +24,10 @@ pub fn queue_fast_async_op<R: serde::Serialize + 'static>(
state.tracker.track_async(ctx.id); state.tracker.track_async(ctx.id);
state.get_error_class_fn state.get_error_class_fn
}; };
let fut = op let fut = op.map(|result| crate::_ops::to_op_result(get_class, result));
.map(|result| crate::_ops::to_op_result(get_class, result)) // SAFETY: this is guaranteed to be running on a current-thread executor
.boxed_local();
// SAFETY: this this is guaranteed to be running on a current-thread executor
ctx.context_state.borrow_mut().pending_ops.spawn(unsafe { ctx.context_state.borrow_mut().pending_ops.spawn(unsafe {
crate::task::MaskFutureAsSend::new(OpCall::pending(ctx, promise_id, fut)) crate::task::MaskFutureAsSend::new(OpCall::new(ctx, promise_id, fut))
}); });
} }
@ -37,36 +35,32 @@ pub fn queue_fast_async_op<R: serde::Serialize + 'static>(
pub fn map_async_op1<R: serde::Serialize + 'static>( pub fn map_async_op1<R: serde::Serialize + 'static>(
ctx: &OpCtx, ctx: &OpCtx,
op: impl Future<Output = Result<R, Error>> + 'static, op: impl Future<Output = Result<R, Error>> + 'static,
) -> MaybeDone<Pin<Box<dyn Future<Output = OpResult>>>> { ) -> impl Future<Output = OpResult> {
let get_class = { let get_class = {
let state = RefCell::borrow(&ctx.state); let state = RefCell::borrow(&ctx.state);
state.tracker.track_async(ctx.id); state.tracker.track_async(ctx.id);
state.get_error_class_fn state.get_error_class_fn
}; };
let fut = op op.map(|res| crate::_ops::to_op_result(get_class, res))
.map(|result| crate::_ops::to_op_result(get_class, result))
.boxed_local();
MaybeDone::Future(fut)
} }
#[inline] #[inline]
pub fn map_async_op2<R: serde::Serialize + 'static>( pub fn map_async_op2<R: serde::Serialize + 'static>(
ctx: &OpCtx, ctx: &OpCtx,
op: impl Future<Output = R> + 'static, op: impl Future<Output = R> + 'static,
) -> MaybeDone<Pin<Box<dyn Future<Output = OpResult>>>> { ) -> impl Future<Output = OpResult> {
let state = RefCell::borrow(&ctx.state); let state = RefCell::borrow(&ctx.state);
state.tracker.track_async(ctx.id); state.tracker.track_async(ctx.id);
let fut = op.map(|result| OpResult::Ok(result.into())).boxed_local(); op.map(|res| OpResult::Ok(res.into()))
MaybeDone::Future(fut)
} }
#[inline] #[inline]
pub fn map_async_op3<R: serde::Serialize + 'static>( pub fn map_async_op3<R: serde::Serialize + 'static>(
ctx: &OpCtx, ctx: &OpCtx,
op: Result<impl Future<Output = Result<R, Error>> + 'static, Error>, op: Result<impl Future<Output = Result<R, Error>> + 'static, Error>,
) -> MaybeDone<Pin<Box<dyn Future<Output = OpResult>>>> { ) -> impl Future<Output = OpResult> {
let get_class = { let get_class = {
let state = RefCell::borrow(&ctx.state); let state = RefCell::borrow(&ctx.state);
state.tracker.track_async(ctx.id); state.tracker.track_async(ctx.id);
@ -74,12 +68,12 @@ pub fn map_async_op3<R: serde::Serialize + 'static>(
}; };
match op { match op {
Err(err) => MaybeDone::Done(OpResult::Err(OpError::new(get_class, err))), Err(err) => {
Ok(fut) => MaybeDone::Future( Either::Left(ready(OpResult::Err(OpError::new(get_class, err))))
fut }
.map(|result| crate::_ops::to_op_result(get_class, result)) Ok(fut) => {
.boxed_local(), Either::Right(fut.map(|res| crate::_ops::to_op_result(get_class, res)))
), }
} }
} }
@ -87,7 +81,7 @@ pub fn map_async_op3<R: serde::Serialize + 'static>(
pub fn map_async_op4<R: serde::Serialize + 'static>( pub fn map_async_op4<R: serde::Serialize + 'static>(
ctx: &OpCtx, ctx: &OpCtx,
op: Result<impl Future<Output = R> + 'static, Error>, op: Result<impl Future<Output = R> + 'static, Error>,
) -> MaybeDone<Pin<Box<dyn Future<Output = OpResult>>>> { ) -> impl Future<Output = OpResult> {
let get_class = { let get_class = {
let state = RefCell::borrow(&ctx.state); let state = RefCell::borrow(&ctx.state);
state.tracker.track_async(ctx.id); state.tracker.track_async(ctx.id);
@ -95,10 +89,10 @@ pub fn map_async_op4<R: serde::Serialize + 'static>(
}; };
match op { match op {
Err(err) => MaybeDone::Done(OpResult::Err(OpError::new(get_class, err))), Err(err) => {
Ok(fut) => MaybeDone::Future( Either::Left(ready(OpResult::Err(OpError::new(get_class, err))))
fut.map(|result| OpResult::Ok(result.into())).boxed_local(), }
), Ok(fut) => Either::Right(fut.map(|r| OpResult::Ok(r.into()))),
} }
} }
@ -107,7 +101,7 @@ pub fn queue_async_op<'s>(
scope: &'s mut v8::HandleScope, scope: &'s mut v8::HandleScope,
deferred: bool, deferred: bool,
promise_id: PromiseId, promise_id: PromiseId,
mut op: MaybeDone<Pin<Box<dyn Future<Output = OpResult>>>>, op: impl Future<Output = OpResult> + 'static,
) -> Option<v8::Local<'s, v8::Value>> { ) -> Option<v8::Local<'s, v8::Value>> {
// An op's realm (as given by `OpCtx::realm_idx`) must match the realm in // An op's realm (as given by `OpCtx::realm_idx`) must match the realm in
// which it is invoked. Otherwise, we might have cross-realm object exposure. // which it is invoked. Otherwise, we might have cross-realm object exposure.
@ -119,38 +113,35 @@ pub fn queue_async_op<'s>(
// Some(scope.get_current_context()) // Some(scope.get_current_context())
// ); // );
// All ops are polled immediately let id = ctx.id;
let waker = noop_waker();
let mut cx = Context::from_waker(&waker);
// Note that MaybeDone returns () from the future // TODO(mmastrac): We have to poll every future here because that assumption is baked into a large number
let op_call = match op.poll_unpin(&mut cx) { // of ops. If we can figure out a way around this, we can remove this call to boxed_local and save a malloc per future.
Poll::Pending => { let mut pinned = op.map(move |res| (promise_id, id, res)).boxed_local();
let MaybeDone::Future(fut) = op else {
unreachable!()
};
OpCall::pending(ctx, promise_id, fut)
}
Poll::Ready(_) => {
let mut op_result = Pin::new(&mut op).take_output().unwrap();
// If the op is ready and is not marked as deferred we can immediately return
// the result.
if !deferred {
ctx.state.borrow_mut().tracker.track_async_completed(ctx.id);
return Some(op_result.to_v8(scope).unwrap());
}
OpCall::ready(ctx, promise_id, op_result) match pinned.poll_unpin(&mut Context::from_waker(noop_waker_ref())) {
} Poll::Pending => {}
}; Poll::Ready(mut res) => {
if deferred {
// Otherwise we will push it to the `pending_ops` and let it be polled again
// or resolved on the next tick of the event loop.
ctx ctx
.context_state .context_state
.borrow_mut() .borrow_mut()
.pending_ops .pending_ops
// SAFETY: this this is guaranteed to be running on a current-thread executor // SAFETY: this is guaranteed to be running on a current-thread executor
.spawn(unsafe { crate::task::MaskFutureAsSend::new(op_call) }); .spawn(unsafe { crate::task::MaskFutureAsSend::new(ready(res)) });
return None;
} else {
ctx.state.borrow_mut().tracker.track_async_completed(ctx.id);
return Some(res.2.to_v8(scope).unwrap());
}
}
}
ctx
.context_state
.borrow_mut()
.pending_ops
// SAFETY: this is guaranteed to be running on a current-thread executor
.spawn(unsafe { crate::task::MaskFutureAsSend::new(pinned) });
None None
} }