diff --git a/cli/napi/threadsafe_functions.rs b/cli/napi/threadsafe_functions.rs index d692b5bb27..119ee81da6 100644 --- a/cli/napi/threadsafe_functions.rs +++ b/cli/napi/threadsafe_functions.rs @@ -2,19 +2,33 @@ use deno_core::futures::channel::mpsc; use deno_runtime::deno_napi::*; +use once_cell::sync::Lazy; use std::mem::forget; +use std::sync::atomic::AtomicUsize; use std::sync::mpsc::channel; +use std::sync::Arc; + +static TS_FN_ID_COUNTER: Lazy = Lazy::new(|| AtomicUsize::new(0)); pub struct TsFn { + pub id: usize, pub env: *mut Env, pub maybe_func: Option>, pub maybe_call_js_cb: Option, pub context: *mut c_void, pub thread_counter: usize, + pub ref_counter: Arc, sender: mpsc::UnboundedSender, tsfn_sender: mpsc::UnboundedSender, } +impl Drop for TsFn { + fn drop(&mut self) { + let env = unsafe { self.env.as_mut().unwrap() }; + env.remove_threadsafe_function_ref_counter(self.id) + } +} + impl TsFn { pub fn acquire(&mut self) -> Result { self.thread_counter += 1; @@ -35,6 +49,29 @@ impl TsFn { Ok(()) } + pub fn ref_(&mut self) -> Result { + self + .ref_counter + .fetch_add(1, std::sync::atomic::Ordering::SeqCst); + Ok(()) + } + + pub fn unref(&mut self) -> Result { + let _ = self.ref_counter.fetch_update( + std::sync::atomic::Ordering::SeqCst, + std::sync::atomic::Ordering::SeqCst, + |x| { + if x == 0 { + None + } else { + Some(x - 1) + } + }, + ); + + Ok(()) + } + pub fn call(&self, data: *mut c_void, is_blocking: bool) { let js_func = self.maybe_func.clone(); let (tx, rx) = channel(); @@ -107,15 +144,21 @@ fn napi_create_threadsafe_function( }) .transpose()?; + let id = TS_FN_ID_COUNTER.fetch_add(1, std::sync::atomic::Ordering::SeqCst); + let tsfn = TsFn { + id, maybe_func, maybe_call_js_cb, context, thread_counter: initial_thread_count, sender: env_ref.async_work_sender.clone(), tsfn_sender: env_ref.threadsafe_function_sender.clone(), + ref_counter: Arc::new(AtomicUsize::new(1)), env, }; + env_ref + .add_threadsafe_function_ref_counter(tsfn.id, tsfn.ref_counter.clone()); env_ref .threadsafe_function_sender @@ -142,7 +185,8 @@ fn napi_unref_threadsafe_function( _env: &mut Env, tsfn: napi_threadsafe_function, ) -> Result { - let _tsfn: &TsFn = &*(tsfn as *const TsFn); + let tsfn: &mut TsFn = &mut *(tsfn as *mut TsFn); + tsfn.unref()?; Ok(()) } @@ -170,8 +214,12 @@ fn napi_call_threadsafe_function( } #[napi_sym::napi_sym] -fn napi_ref_threadsafe_function() -> Result { - // TODO +fn napi_ref_threadsafe_function( + _env: &mut Env, + func: napi_threadsafe_function, +) -> Result { + let tsfn: &mut TsFn = &mut *(func as *mut TsFn); + tsfn.ref_()?; Ok(()) } diff --git a/ext/napi/lib.rs b/ext/napi/lib.rs index 882f7c19d7..57f73a0ca2 100644 --- a/ext/napi/lib.rs +++ b/ext/napi/lib.rs @@ -19,6 +19,8 @@ use std::ffi::CString; use std::path::Path; use std::path::PathBuf; use std::rc::Rc; +use std::sync::atomic::AtomicUsize; +use std::sync::Arc; use std::task::Poll; use std::thread_local; @@ -322,7 +324,7 @@ pub struct napi_node_version { } pub type PendingNapiAsyncWork = Box; - +pub type ThreadsafeFunctionRefCounters = Vec<(usize, Arc)>; pub struct NapiState { // Async tasks. pub pending_async_work: Vec, @@ -336,6 +338,7 @@ pub struct NapiState { mpsc::UnboundedSender, pub env_cleanup_hooks: Rc>>, + pub tsfn_ref_counters: Rc>, } impl Drop for NapiState { @@ -391,6 +394,7 @@ pub struct Env { mpsc::UnboundedSender, pub cleanup_hooks: Rc>>, + pub tsfn_ref_counters: Rc>, } unsafe impl Send for Env {} @@ -405,6 +409,7 @@ impl Env { cleanup_hooks: Rc< RefCell>, >, + tsfn_ref_counters: Rc>, ) -> Self { let sc = sender.clone(); ASYNC_WORK_SENDER.with(|s| { @@ -423,6 +428,7 @@ impl Env { async_work_sender: sender, threadsafe_function_sender, cleanup_hooks, + tsfn_ref_counters, } } @@ -458,6 +464,22 @@ impl Env { // using `napi_open_handle_scope`. unsafe { v8::CallbackScope::new(context) } } + + pub fn add_threadsafe_function_ref_counter( + &mut self, + id: usize, + counter: Arc, + ) { + let mut counters = self.tsfn_ref_counters.borrow_mut(); + assert!(!counters.iter().any(|(i, _)| *i == id)); + counters.push((id, counter)); + } + + pub fn remove_threadsafe_function_ref_counter(&mut self, id: usize) { + let mut counters = self.tsfn_ref_counters.borrow_mut(); + let index = counters.iter().position(|(i, _)| *i == id).unwrap(); + counters.remove(index); + } } pub fn init(unstable: bool) -> Extension { @@ -479,22 +501,16 @@ pub fn init(unstable: bool) -> Extension { napi_state.pending_async_work.push(async_work_fut); } - while let Poll::Ready(Some(tsfn_status)) = - napi_state.threadsafe_function_receiver.poll_next_unpin(cx) - { - match tsfn_status { - ThreadSafeFunctionStatus::Alive => { - napi_state.active_threadsafe_functions += 1 - } - ThreadSafeFunctionStatus::Dead => { - napi_state.active_threadsafe_functions -= 1 - } - }; - } - if napi_state.active_threadsafe_functions > 0 { maybe_scheduling = true; } + + for (_id, counter) in napi_state.tsfn_ref_counters.borrow().iter() { + if counter.load(std::sync::atomic::Ordering::SeqCst) > 0 { + maybe_scheduling = true; + break; + } + } } loop { @@ -527,6 +543,7 @@ pub fn init(unstable: bool) -> Extension { threadsafe_function_receiver, active_threadsafe_functions: 0, env_cleanup_hooks: Rc::new(RefCell::new(vec![])), + tsfn_ref_counters: Rc::new(RefCell::new(vec![])), }); state.put(Unstable(unstable)); Ok(()) @@ -563,7 +580,13 @@ where let permissions = op_state.borrow_mut::(); permissions.check(Some(&PathBuf::from(&path)))?; - let (async_work_sender, tsfn_sender, isolate_ptr, cleanup_hooks) = { + let ( + async_work_sender, + tsfn_sender, + isolate_ptr, + cleanup_hooks, + tsfn_ref_counters, + ) = { let napi_state = op_state.borrow::(); let isolate_ptr = op_state.borrow::<*mut v8::OwnedIsolate>(); ( @@ -571,6 +594,7 @@ where napi_state.threadsafe_function_sender.clone(), *isolate_ptr, napi_state.env_cleanup_hooks.clone(), + napi_state.tsfn_ref_counters.clone(), ) }; @@ -593,6 +617,7 @@ where async_work_sender, tsfn_sender, cleanup_hooks, + tsfn_ref_counters, ); env.shared = Box::into_raw(Box::new(env_shared)); let env_ptr = Box::into_raw(Box::new(env)) as _; diff --git a/test_napi/src/lib.rs b/test_napi/src/lib.rs index 3a28e4471a..025fbf5d2b 100644 --- a/test_napi/src/lib.rs +++ b/test_napi/src/lib.rs @@ -17,6 +17,7 @@ pub mod primitives; pub mod promise; pub mod properties; pub mod strings; +pub mod tsfn; pub mod typedarray; #[macro_export] @@ -126,6 +127,7 @@ unsafe extern "C" fn napi_register_module_v1( object_wrap::init(env, exports); callback::init(env, exports); r#async::init(env, exports); + tsfn::init(env, exports); init_cleanup_hook(env, exports); exports diff --git a/test_napi/src/tsfn.rs b/test_napi/src/tsfn.rs new file mode 100644 index 0000000000..314975f39b --- /dev/null +++ b/test_napi/src/tsfn.rs @@ -0,0 +1,108 @@ +// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. + +// This test performs initilization similar to napi-rs. +// https://github.com/napi-rs/napi-rs/commit/a5a04a4e545f268769cc78e2bd6c45af4336aac3 + +use napi_sys as sys; +use std::ffi::c_char; +use std::ffi::c_void; +use std::ptr; + +macro_rules! check_status_or_panic { + ($code:expr, $msg:expr) => {{ + let c = $code; + match c { + sys::Status::napi_ok => {} + _ => panic!($msg), + } + }}; +} + +fn create_custom_gc(env: sys::napi_env) { + let mut custom_gc_fn = ptr::null_mut(); + check_status_or_panic!( + unsafe { + sys::napi_create_function( + env, + "custom_gc".as_ptr() as *const c_char, + 9, + Some(empty), + ptr::null_mut(), + &mut custom_gc_fn, + ) + }, + "Create Custom GC Function in napi_register_module_v1 failed" + ); + let mut async_resource_name = ptr::null_mut(); + check_status_or_panic!( + unsafe { + sys::napi_create_string_utf8( + env, + "CustomGC".as_ptr() as *const c_char, + 8, + &mut async_resource_name, + ) + }, + "Create async resource string in napi_register_module_v1 napi_register_module_v1" + ); + let mut custom_gc_tsfn = ptr::null_mut(); + check_status_or_panic!( + unsafe { + sys::napi_create_threadsafe_function( + env, + custom_gc_fn, + ptr::null_mut(), + async_resource_name, + 0, + 1, + ptr::null_mut(), + Some(custom_gc_finalize), + ptr::null_mut(), + Some(custom_gc), + &mut custom_gc_tsfn, + ) + }, + "Create Custom GC ThreadsafeFunction in napi_register_module_v1 failed" + ); + check_status_or_panic!( + unsafe { sys::napi_unref_threadsafe_function(env, custom_gc_tsfn) }, + "Unref Custom GC ThreadsafeFunction in napi_register_module_v1 failed" + ); +} + +unsafe extern "C" fn empty( + _env: sys::napi_env, + _info: sys::napi_callback_info, +) -> sys::napi_value { + ptr::null_mut() +} + +unsafe extern "C" fn custom_gc_finalize( + _env: sys::napi_env, + _finalize_data: *mut c_void, + _finalize_hint: *mut c_void, +) { +} + +extern "C" fn custom_gc( + env: sys::napi_env, + _js_callback: sys::napi_value, + _context: *mut c_void, + data: *mut c_void, +) { + let mut ref_count = 0; + check_status_or_panic!( + unsafe { + sys::napi_reference_unref(env, data as sys::napi_ref, &mut ref_count) + }, + "Failed to unref Buffer reference in Custom GC" + ); + check_status_or_panic!( + unsafe { sys::napi_delete_reference(env, data as sys::napi_ref) }, + "Failed to delete Buffer reference in Custom GC" + ); +} + +pub fn init(env: sys::napi_env, _exports: sys::napi_value) { + create_custom_gc(env); +}