mirror of
https://github.com/denoland/deno.git
synced 2024-12-28 18:19:08 -05:00
19f82b0eaa
This commit migrates "deno_core" from using "FuturesUnordered" to "tokio::task::JoinSet". This makes every op to be a separate Tokio task and should unlock better utilization of kqueue/epoll. There were two quirks added to this PR: - because of the fact that "JoinSet" immediately polls spawn tasks, op sanitizers can give false positives in some cases, this was alleviated by polling event loop once before running a test with "deno test", which gives canceled ops an opportunity to settle - "JsRuntimeState::waker" was moved to "OpState::waker" so that FFI API can still use threadsafe functions - without this change the registered wakers were wrong as they would not wake up the whole "JsRuntime" but the task associated with an op --------- Co-authored-by: Matt Mastracci <matthew@mastracci.com>
223 lines
5.4 KiB
Rust
223 lines
5.4 KiB
Rust
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
|
|
|
|
use crate::error::AnyError;
|
|
use crate::gotham_state::GothamState;
|
|
use crate::realm::ContextState;
|
|
use crate::resources::ResourceTable;
|
|
use crate::runtime::GetErrorClassFn;
|
|
use crate::runtime::JsRuntimeState;
|
|
use crate::OpDecl;
|
|
use crate::OpsTracker;
|
|
use anyhow::Error;
|
|
use futures::future::MaybeDone;
|
|
use futures::task::AtomicWaker;
|
|
use futures::Future;
|
|
use futures::FutureExt;
|
|
use pin_project::pin_project;
|
|
use serde::Serialize;
|
|
use std::cell::RefCell;
|
|
use std::ops::Deref;
|
|
use std::ops::DerefMut;
|
|
use std::pin::Pin;
|
|
use std::ptr::NonNull;
|
|
use std::rc::Rc;
|
|
use std::rc::Weak;
|
|
use std::sync::Arc;
|
|
use v8::fast_api::CFunctionInfo;
|
|
use v8::fast_api::CTypeInfo;
|
|
|
|
pub type PromiseId = i32;
|
|
pub type OpId = u16;
|
|
|
|
#[pin_project]
|
|
pub struct OpCall {
|
|
promise_id: PromiseId,
|
|
op_id: OpId,
|
|
/// Future is not necessarily Unpin, so we need to pin_project.
|
|
#[pin]
|
|
fut: MaybeDone<Pin<Box<dyn Future<Output = OpResult>>>>,
|
|
}
|
|
|
|
impl OpCall {
|
|
/// Wraps a future; the inner future is polled the usual way (lazily).
|
|
pub fn pending(
|
|
op_ctx: &OpCtx,
|
|
promise_id: PromiseId,
|
|
fut: Pin<Box<dyn Future<Output = OpResult> + 'static>>,
|
|
) -> Self {
|
|
Self {
|
|
op_id: op_ctx.id,
|
|
promise_id,
|
|
fut: MaybeDone::Future(fut),
|
|
}
|
|
}
|
|
|
|
/// Create a future by specifying its output. This is basically the same as
|
|
/// `async { value }` or `futures::future::ready(value)`.
|
|
pub fn ready(op_ctx: &OpCtx, promise_id: PromiseId, value: OpResult) -> Self {
|
|
Self {
|
|
op_id: op_ctx.id,
|
|
promise_id,
|
|
fut: MaybeDone::Done(value),
|
|
}
|
|
}
|
|
}
|
|
|
|
impl Future for OpCall {
|
|
type Output = (PromiseId, OpId, OpResult);
|
|
|
|
fn poll(
|
|
self: std::pin::Pin<&mut Self>,
|
|
cx: &mut std::task::Context<'_>,
|
|
) -> std::task::Poll<Self::Output> {
|
|
let promise_id = self.promise_id;
|
|
let op_id = self.op_id;
|
|
let fut = &mut *self.project().fut;
|
|
match fut {
|
|
MaybeDone::Done(_) => {
|
|
// Let's avoid using take_output as it keeps our Pin::box
|
|
let res = std::mem::replace(fut, MaybeDone::Gone);
|
|
let MaybeDone::Done(res) = res
|
|
else {
|
|
unreachable!()
|
|
};
|
|
std::task::Poll::Ready(res)
|
|
}
|
|
MaybeDone::Future(f) => f.poll_unpin(cx),
|
|
MaybeDone::Gone => std::task::Poll::Pending,
|
|
}
|
|
.map(move |res| (promise_id, op_id, res))
|
|
}
|
|
}
|
|
|
|
pub enum OpResult {
|
|
Ok(serde_v8::SerializablePkg),
|
|
Err(OpError),
|
|
}
|
|
|
|
impl OpResult {
|
|
pub fn to_v8<'a>(
|
|
&mut self,
|
|
scope: &mut v8::HandleScope<'a>,
|
|
) -> Result<v8::Local<'a, v8::Value>, serde_v8::Error> {
|
|
match self {
|
|
Self::Ok(x) => x.to_v8(scope),
|
|
Self::Err(err) => serde_v8::to_v8(scope, err),
|
|
}
|
|
}
|
|
}
|
|
|
|
#[derive(Debug, Serialize)]
|
|
#[serde(rename_all = "camelCase")]
|
|
pub struct OpError {
|
|
#[serde(rename = "$err_class_name")]
|
|
class_name: &'static str,
|
|
message: String,
|
|
code: Option<&'static str>,
|
|
}
|
|
|
|
impl OpError {
|
|
pub fn new(get_class: GetErrorClassFn, err: Error) -> Self {
|
|
Self {
|
|
class_name: (get_class)(&err),
|
|
message: format!("{err:#}"),
|
|
code: crate::error_codes::get_error_code(&err),
|
|
}
|
|
}
|
|
}
|
|
|
|
pub fn to_op_result<R: Serialize + 'static>(
|
|
get_class: GetErrorClassFn,
|
|
result: Result<R, Error>,
|
|
) -> OpResult {
|
|
match result {
|
|
Ok(v) => OpResult::Ok(v.into()),
|
|
Err(err) => OpResult::Err(OpError::new(get_class, err)),
|
|
}
|
|
}
|
|
|
|
// TODO(@AaronO): optimize OpCtx(s) mem usage ?
|
|
pub struct OpCtx {
|
|
pub id: OpId,
|
|
pub state: Rc<RefCell<OpState>>,
|
|
pub decl: Rc<OpDecl>,
|
|
pub fast_fn_c_info: Option<NonNull<v8::fast_api::CFunctionInfo>>,
|
|
pub runtime_state: Weak<RefCell<JsRuntimeState>>,
|
|
pub(crate) context_state: Rc<RefCell<ContextState>>,
|
|
}
|
|
|
|
impl OpCtx {
|
|
pub(crate) fn new(
|
|
id: OpId,
|
|
context_state: Rc<RefCell<ContextState>>,
|
|
decl: Rc<OpDecl>,
|
|
state: Rc<RefCell<OpState>>,
|
|
runtime_state: Weak<RefCell<JsRuntimeState>>,
|
|
) -> Self {
|
|
let mut fast_fn_c_info = None;
|
|
|
|
if let Some(fast_fn) = &decl.fast_fn {
|
|
let args = CTypeInfo::new_from_slice(fast_fn.args);
|
|
let ret = CTypeInfo::new(fast_fn.return_type);
|
|
|
|
// SAFETY: all arguments are coming from the trait and they have
|
|
// static lifetime
|
|
let c_fn = unsafe {
|
|
CFunctionInfo::new(args.as_ptr(), fast_fn.args.len(), ret.as_ptr())
|
|
};
|
|
fast_fn_c_info = Some(c_fn);
|
|
}
|
|
|
|
OpCtx {
|
|
id,
|
|
state,
|
|
runtime_state,
|
|
decl,
|
|
context_state,
|
|
fast_fn_c_info,
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Maintains the resources and ops inside a JS runtime.
|
|
pub struct OpState {
|
|
pub resource_table: ResourceTable,
|
|
pub get_error_class_fn: GetErrorClassFn,
|
|
pub tracker: OpsTracker,
|
|
pub last_fast_op_error: Option<AnyError>,
|
|
pub(crate) gotham_state: GothamState,
|
|
pub waker: Arc<AtomicWaker>,
|
|
}
|
|
|
|
impl OpState {
|
|
pub fn new(ops_count: usize) -> OpState {
|
|
OpState {
|
|
resource_table: Default::default(),
|
|
get_error_class_fn: &|_| "Error",
|
|
gotham_state: Default::default(),
|
|
last_fast_op_error: None,
|
|
tracker: OpsTracker::new(ops_count),
|
|
waker: Arc::new(AtomicWaker::new()),
|
|
}
|
|
}
|
|
|
|
/// Clear all user-provided resources and state.
|
|
pub(crate) fn clear(&mut self) {
|
|
std::mem::take(&mut self.gotham_state);
|
|
std::mem::take(&mut self.resource_table);
|
|
}
|
|
}
|
|
|
|
impl Deref for OpState {
|
|
type Target = GothamState;
|
|
|
|
fn deref(&self) -> &Self::Target {
|
|
&self.gotham_state
|
|
}
|
|
}
|
|
|
|
impl DerefMut for OpState {
|
|
fn deref_mut(&mut self) -> &mut Self::Target {
|
|
&mut self.gotham_state
|
|
}
|
|
}
|