diff --git a/cli/napi/async.rs b/cli/napi/async.rs index 48de367280..1fb0c6374c 100644 --- a/cli/napi/async.rs +++ b/cli/napi/async.rs @@ -11,6 +11,9 @@ pub struct AsyncWork { pub complete: napi_async_complete_callback, } +unsafe impl Send for AsyncWork {} +unsafe impl Sync for AsyncWork {} + #[napi_sym::napi_sym] fn napi_create_async_work( _env: *mut Env, @@ -61,12 +64,22 @@ fn napi_queue_async_work( return napi_invalid_arg; }; - let fut = Box::new(move || { - (work.execute)(env_ptr as napi_env, work.data); - // Note: Must be called from the loop thread. - (work.complete)(env_ptr as napi_env, napi_ok, work.data); - }); - env.add_async_work(fut); + #[repr(transparent)] + struct SendPtr(*const T); + unsafe impl Send for SendPtr {} + unsafe impl Sync for SendPtr {} + let send_env = SendPtr(env_ptr); + + #[inline(always)] + fn do_work(ptr: SendPtr, work: &AsyncWork) { + // SAFETY: This is a valid async work queue call and it runs on the event loop thread + unsafe { + (work.execute)(ptr.0 as napi_env, work.data); + (work.complete)(ptr.0 as napi_env, napi_ok, work.data); + } + } + + env.add_async_work(move || do_work(send_env, work)); napi_ok } diff --git a/cli/napi/threadsafe_functions.rs b/cli/napi/threadsafe_functions.rs index f47da46e9c..15395529d8 100644 --- a/cli/napi/threadsafe_functions.rs +++ b/cli/napi/threadsafe_functions.rs @@ -1,11 +1,12 @@ // Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. use deno_core::futures::channel::mpsc; +use deno_core::V8CrossThreadTaskSpawner; use deno_runtime::deno_napi::*; use once_cell::sync::Lazy; use std::mem::forget; +use std::ptr::NonNull; use std::sync::atomic::AtomicUsize; -use std::sync::mpsc::channel; use std::sync::Arc; static TS_FN_ID_COUNTER: Lazy = Lazy::new(|| AtomicUsize::new(0)); @@ -20,7 +21,7 @@ pub struct TsFn { pub ref_counter: Arc, finalizer: Option, finalizer_data: *mut c_void, - sender: mpsc::UnboundedSender, + sender: V8CrossThreadTaskSpawner, tsfn_sender: mpsc::UnboundedSender, } @@ -84,47 +85,86 @@ impl TsFn { pub fn call(&self, data: *mut c_void, is_blocking: bool) { let js_func = self.maybe_func.clone(); - let (tx, rx) = channel(); + + #[repr(transparent)] + struct SendPtr(*const T); + unsafe impl Send for SendPtr {} + unsafe impl Sync for SendPtr {} + + let env = SendPtr(self.env); + let context = SendPtr(self.context); + let data = SendPtr(data); + + #[inline(always)] + fn spawn( + sender: &V8CrossThreadTaskSpawner, + is_blocking: bool, + f: impl FnOnce(&mut v8::HandleScope) + Send + 'static, + ) { + if is_blocking { + sender.spawn_blocking(f); + } else { + sender.spawn(f); + } + } if let Some(call_js_cb) = self.maybe_call_js_cb { - let context = self.context; - let env = self.env; - let call = Box::new(move || { - let scope = &mut unsafe { (*env).scope() }; - match js_func { - Some(func) => { - let func: v8::Local = - func.open(scope).to_object(scope).unwrap().into(); - unsafe { - call_js_cb(env as *mut c_void, func.into(), context, data) - }; - } - None => { - unsafe { - call_js_cb(env as *mut c_void, std::mem::zeroed(), context, data) - }; + if let Some(func) = js_func { + let func = SendPtr(func.into_raw().as_ptr()); + #[inline(always)] + fn call( + scope: &mut v8::HandleScope, + call_js_cb: napi_threadsafe_function_call_js, + func: SendPtr, + env: SendPtr, + context: SendPtr, + data: SendPtr, + ) { + // SAFETY: This is a valid global from above + let func: v8::Global = unsafe { + v8::Global::::from_raw( + scope, + NonNull::new_unchecked(func.0 as _), + ) + }; + let func: v8::Local = + func.open(scope).to_object(scope).unwrap().into(); + // SAFETY: env is valid for the duration of the callback. + // data lifetime is users responsibility. + unsafe { + call_js_cb(env.0 as _, func.into(), context.0 as _, data.0 as _) } } - - // Receiver might have been already dropped - let _ = tx.send(()); - }); - // This call should never fail - self.sender.unbounded_send(call).unwrap(); - } else if let Some(_js_func) = js_func { - let call = Box::new(move || { + spawn(&self.sender, is_blocking, move |scope| { + call(scope, call_js_cb, func, env, context, data); + }); + } else { + #[inline(always)] + fn call( + call_js_cb: napi_threadsafe_function_call_js, + env: SendPtr, + context: SendPtr, + data: SendPtr, + ) { + // SAFETY: We're calling the provided callback with valid args + unsafe { + call_js_cb( + env.0 as _, + std::mem::zeroed(), + context.0 as _, + data.0 as _, + ) + } + } + spawn(&self.sender, is_blocking, move |_| { + call(call_js_cb, env, context, data); + }); + } + } else { + spawn(&self.sender, is_blocking, |_| { // TODO: func.call - // let func = js_func.open(scope); - // Receiver might have been already dropped - let _ = tx.send(()); }); - // This call should never fail - self.sender.unbounded_send(call).unwrap(); - } - - if is_blocking { - let _ = rx.recv(); - } + }; } } diff --git a/ext/napi/lib.rs b/ext/napi/lib.rs index e897e149d4..782635a27e 100644 --- a/ext/napi/lib.rs +++ b/ext/napi/lib.rs @@ -9,10 +9,10 @@ use core::ptr::NonNull; use deno_core::error::type_error; use deno_core::error::AnyError; use deno_core::futures::channel::mpsc; -use deno_core::futures::StreamExt; use deno_core::op2; use deno_core::parking_lot::Mutex; use deno_core::OpState; +use deno_core::V8CrossThreadTaskSpawner; use std::cell::RefCell; use std::ffi::CString; use std::path::Path; @@ -20,7 +20,6 @@ use std::path::PathBuf; use std::rc::Rc; use std::sync::atomic::AtomicUsize; use std::sync::Arc; -use std::task::Poll; use std::thread_local; #[cfg(unix)] @@ -231,13 +230,11 @@ pub struct napi_node_version { pub release: *const c_char, } -pub type PendingNapiAsyncWork = Box; +pub trait PendingNapiAsyncWork: FnOnce() + Send + 'static {} +impl PendingNapiAsyncWork for T where T: FnOnce() + Send + 'static {} + pub type ThreadsafeFunctionRefCounters = Vec<(usize, Arc)>; pub struct NapiState { - // Async tasks. - pub pending_async_work: Vec, - pub async_work_sender: mpsc::UnboundedSender, - pub async_work_receiver: mpsc::UnboundedReceiver, // Thread safe functions. pub active_threadsafe_functions: usize, pub threadsafe_function_receiver: @@ -318,7 +315,7 @@ pub struct Env { pub isolate_ptr: *mut v8::OwnedIsolate, pub open_handle_scopes: usize, pub shared: *mut EnvShared, - pub async_work_sender: mpsc::UnboundedSender, + pub async_work_sender: V8CrossThreadTaskSpawner, pub threadsafe_function_sender: mpsc::UnboundedSender, pub cleanup_hooks: @@ -336,7 +333,7 @@ impl Env { isolate_ptr: *mut v8::OwnedIsolate, context: v8::Global, global: v8::Global, - sender: mpsc::UnboundedSender, + sender: V8CrossThreadTaskSpawner, threadsafe_function_sender: mpsc::UnboundedSender, cleanup_hooks: Rc< RefCell>, @@ -372,8 +369,8 @@ impl Env { unsafe { &mut *self.shared } } - pub fn add_async_work(&mut self, async_work: PendingNapiAsyncWork) { - self.async_work_sender.unbounded_send(async_work).unwrap(); + pub fn add_async_work(&mut self, async_work: impl FnOnce() + Send + 'static) { + self.async_work_sender.spawn(|_| async_work()); } #[inline] @@ -418,14 +415,9 @@ deno_core::extension!(deno_napi, op_napi_open

], state = |state| { - let (async_work_sender, async_work_receiver) = - mpsc::unbounded::(); let (threadsafe_function_sender, threadsafe_function_receiver) = mpsc::unbounded::(); state.put(NapiState { - pending_async_work: Vec::new(), - async_work_sender, - async_work_receiver, threadsafe_function_sender, threadsafe_function_receiver, active_threadsafe_functions: 0, @@ -433,59 +425,8 @@ deno_core::extension!(deno_napi, tsfn_ref_counters: Arc::new(Mutex::new(vec![])), }); }, - event_loop_middleware = event_loop_middleware, ); -fn event_loop_middleware( - op_state_rc: Rc>, - cx: &mut std::task::Context, -) -> bool { - // `work` can call back into the runtime. It can also schedule an async task - // but we don't know that now. We need to make the runtime re-poll to make - // sure no pending NAPI tasks exist. - let mut maybe_scheduling = false; - - { - let mut op_state = op_state_rc.borrow_mut(); - let napi_state = op_state.borrow_mut::(); - - while let Poll::Ready(Some(async_work_fut)) = - napi_state.async_work_receiver.poll_next_unpin(cx) - { - napi_state.pending_async_work.push(async_work_fut); - } - - if napi_state.active_threadsafe_functions > 0 { - maybe_scheduling = true; - } - - let tsfn_ref_counters = napi_state.tsfn_ref_counters.lock().clone(); - for (_id, counter) in tsfn_ref_counters.iter() { - if counter.load(std::sync::atomic::Ordering::SeqCst) > 0 { - maybe_scheduling = true; - break; - } - } - } - - loop { - let maybe_work = { - let mut op_state = op_state_rc.borrow_mut(); - let napi_state = op_state.borrow_mut::(); - napi_state.pending_async_work.pop() - }; - - if let Some(work) = maybe_work { - work(); - maybe_scheduling = true; - } else { - break; - } - } - - maybe_scheduling -} - pub trait NapiPermissions { fn check(&mut self, path: Option<&Path>) -> std::result::Result<(), AnyError>; @@ -557,7 +498,7 @@ where let napi_state = op_state.borrow::(); let isolate_ptr = op_state.borrow::<*mut v8::OwnedIsolate>(); ( - napi_state.async_work_sender.clone(), + op_state.borrow::().clone(), napi_state.threadsafe_function_sender.clone(), *isolate_ptr, napi_state.env_cleanup_hooks.clone(),