1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2024-11-23 15:16:54 -05:00

fix(napi): Implement napi_threadsafe_function ref and unref (#17304)

Co-authored-by: Bartek Iwańczuk <biwanczuk@gmail.com>
This commit is contained in:
Divy Srivastava 2023-01-12 04:47:55 -08:00 committed by David Sherret
parent b851b3ab74
commit 5a01dbf43f
4 changed files with 201 additions and 18 deletions

View file

@ -2,19 +2,33 @@
use deno_core::futures::channel::mpsc; use deno_core::futures::channel::mpsc;
use deno_runtime::deno_napi::*; use deno_runtime::deno_napi::*;
use once_cell::sync::Lazy;
use std::mem::forget; use std::mem::forget;
use std::sync::atomic::AtomicUsize;
use std::sync::mpsc::channel; use std::sync::mpsc::channel;
use std::sync::Arc;
static TS_FN_ID_COUNTER: Lazy<AtomicUsize> = Lazy::new(|| AtomicUsize::new(0));
pub struct TsFn { pub struct TsFn {
pub id: usize,
pub env: *mut Env, pub env: *mut Env,
pub maybe_func: Option<v8::Global<v8::Function>>, pub maybe_func: Option<v8::Global<v8::Function>>,
pub maybe_call_js_cb: Option<napi_threadsafe_function_call_js>, pub maybe_call_js_cb: Option<napi_threadsafe_function_call_js>,
pub context: *mut c_void, pub context: *mut c_void,
pub thread_counter: usize, pub thread_counter: usize,
pub ref_counter: Arc<AtomicUsize>,
sender: mpsc::UnboundedSender<PendingNapiAsyncWork>, sender: mpsc::UnboundedSender<PendingNapiAsyncWork>,
tsfn_sender: mpsc::UnboundedSender<ThreadSafeFunctionStatus>, tsfn_sender: mpsc::UnboundedSender<ThreadSafeFunctionStatus>,
} }
impl Drop for TsFn {
fn drop(&mut self) {
let env = unsafe { self.env.as_mut().unwrap() };
env.remove_threadsafe_function_ref_counter(self.id)
}
}
impl TsFn { impl TsFn {
pub fn acquire(&mut self) -> Result { pub fn acquire(&mut self) -> Result {
self.thread_counter += 1; self.thread_counter += 1;
@ -35,6 +49,29 @@ impl TsFn {
Ok(()) Ok(())
} }
pub fn ref_(&mut self) -> Result {
self
.ref_counter
.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
Ok(())
}
pub fn unref(&mut self) -> Result {
let _ = self.ref_counter.fetch_update(
std::sync::atomic::Ordering::SeqCst,
std::sync::atomic::Ordering::SeqCst,
|x| {
if x == 0 {
None
} else {
Some(x - 1)
}
},
);
Ok(())
}
pub fn call(&self, data: *mut c_void, is_blocking: bool) { pub fn call(&self, data: *mut c_void, is_blocking: bool) {
let js_func = self.maybe_func.clone(); let js_func = self.maybe_func.clone();
let (tx, rx) = channel(); let (tx, rx) = channel();
@ -107,15 +144,21 @@ fn napi_create_threadsafe_function(
}) })
.transpose()?; .transpose()?;
let id = TS_FN_ID_COUNTER.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
let tsfn = TsFn { let tsfn = TsFn {
id,
maybe_func, maybe_func,
maybe_call_js_cb, maybe_call_js_cb,
context, context,
thread_counter: initial_thread_count, thread_counter: initial_thread_count,
sender: env_ref.async_work_sender.clone(), sender: env_ref.async_work_sender.clone(),
tsfn_sender: env_ref.threadsafe_function_sender.clone(), tsfn_sender: env_ref.threadsafe_function_sender.clone(),
ref_counter: Arc::new(AtomicUsize::new(1)),
env, env,
}; };
env_ref
.add_threadsafe_function_ref_counter(tsfn.id, tsfn.ref_counter.clone());
env_ref env_ref
.threadsafe_function_sender .threadsafe_function_sender
@ -142,7 +185,8 @@ fn napi_unref_threadsafe_function(
_env: &mut Env, _env: &mut Env,
tsfn: napi_threadsafe_function, tsfn: napi_threadsafe_function,
) -> Result { ) -> Result {
let _tsfn: &TsFn = &*(tsfn as *const TsFn); let tsfn: &mut TsFn = &mut *(tsfn as *mut TsFn);
tsfn.unref()?;
Ok(()) Ok(())
} }
@ -170,8 +214,12 @@ fn napi_call_threadsafe_function(
} }
#[napi_sym::napi_sym] #[napi_sym::napi_sym]
fn napi_ref_threadsafe_function() -> Result { fn napi_ref_threadsafe_function(
// TODO _env: &mut Env,
func: napi_threadsafe_function,
) -> Result {
let tsfn: &mut TsFn = &mut *(func as *mut TsFn);
tsfn.ref_()?;
Ok(()) Ok(())
} }

View file

@ -19,6 +19,8 @@ use std::ffi::CString;
use std::path::Path; use std::path::Path;
use std::path::PathBuf; use std::path::PathBuf;
use std::rc::Rc; use std::rc::Rc;
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
use std::task::Poll; use std::task::Poll;
use std::thread_local; use std::thread_local;
@ -322,7 +324,7 @@ pub struct napi_node_version {
} }
pub type PendingNapiAsyncWork = Box<dyn FnOnce()>; pub type PendingNapiAsyncWork = Box<dyn FnOnce()>;
pub type ThreadsafeFunctionRefCounters = Vec<(usize, Arc<AtomicUsize>)>;
pub struct NapiState { pub struct NapiState {
// Async tasks. // Async tasks.
pub pending_async_work: Vec<PendingNapiAsyncWork>, pub pending_async_work: Vec<PendingNapiAsyncWork>,
@ -336,6 +338,7 @@ pub struct NapiState {
mpsc::UnboundedSender<ThreadSafeFunctionStatus>, mpsc::UnboundedSender<ThreadSafeFunctionStatus>,
pub env_cleanup_hooks: pub env_cleanup_hooks:
Rc<RefCell<Vec<(extern "C" fn(*const c_void), *const c_void)>>>, Rc<RefCell<Vec<(extern "C" fn(*const c_void), *const c_void)>>>,
pub tsfn_ref_counters: Rc<RefCell<ThreadsafeFunctionRefCounters>>,
} }
impl Drop for NapiState { impl Drop for NapiState {
@ -391,6 +394,7 @@ pub struct Env {
mpsc::UnboundedSender<ThreadSafeFunctionStatus>, mpsc::UnboundedSender<ThreadSafeFunctionStatus>,
pub cleanup_hooks: pub cleanup_hooks:
Rc<RefCell<Vec<(extern "C" fn(*const c_void), *const c_void)>>>, Rc<RefCell<Vec<(extern "C" fn(*const c_void), *const c_void)>>>,
pub tsfn_ref_counters: Rc<RefCell<ThreadsafeFunctionRefCounters>>,
} }
unsafe impl Send for Env {} unsafe impl Send for Env {}
@ -405,6 +409,7 @@ impl Env {
cleanup_hooks: Rc< cleanup_hooks: Rc<
RefCell<Vec<(extern "C" fn(*const c_void), *const c_void)>>, RefCell<Vec<(extern "C" fn(*const c_void), *const c_void)>>,
>, >,
tsfn_ref_counters: Rc<RefCell<ThreadsafeFunctionRefCounters>>,
) -> Self { ) -> Self {
let sc = sender.clone(); let sc = sender.clone();
ASYNC_WORK_SENDER.with(|s| { ASYNC_WORK_SENDER.with(|s| {
@ -423,6 +428,7 @@ impl Env {
async_work_sender: sender, async_work_sender: sender,
threadsafe_function_sender, threadsafe_function_sender,
cleanup_hooks, cleanup_hooks,
tsfn_ref_counters,
} }
} }
@ -458,6 +464,22 @@ impl Env {
// using `napi_open_handle_scope`. // using `napi_open_handle_scope`.
unsafe { v8::CallbackScope::new(context) } unsafe { v8::CallbackScope::new(context) }
} }
pub fn add_threadsafe_function_ref_counter(
&mut self,
id: usize,
counter: Arc<AtomicUsize>,
) {
let mut counters = self.tsfn_ref_counters.borrow_mut();
assert!(!counters.iter().any(|(i, _)| *i == id));
counters.push((id, counter));
}
pub fn remove_threadsafe_function_ref_counter(&mut self, id: usize) {
let mut counters = self.tsfn_ref_counters.borrow_mut();
let index = counters.iter().position(|(i, _)| *i == id).unwrap();
counters.remove(index);
}
} }
pub fn init<P: NapiPermissions + 'static>(unstable: bool) -> Extension { pub fn init<P: NapiPermissions + 'static>(unstable: bool) -> Extension {
@ -479,22 +501,16 @@ pub fn init<P: NapiPermissions + 'static>(unstable: bool) -> Extension {
napi_state.pending_async_work.push(async_work_fut); napi_state.pending_async_work.push(async_work_fut);
} }
while let Poll::Ready(Some(tsfn_status)) =
napi_state.threadsafe_function_receiver.poll_next_unpin(cx)
{
match tsfn_status {
ThreadSafeFunctionStatus::Alive => {
napi_state.active_threadsafe_functions += 1
}
ThreadSafeFunctionStatus::Dead => {
napi_state.active_threadsafe_functions -= 1
}
};
}
if napi_state.active_threadsafe_functions > 0 { if napi_state.active_threadsafe_functions > 0 {
maybe_scheduling = true; maybe_scheduling = true;
} }
for (_id, counter) in napi_state.tsfn_ref_counters.borrow().iter() {
if counter.load(std::sync::atomic::Ordering::SeqCst) > 0 {
maybe_scheduling = true;
break;
}
}
} }
loop { loop {
@ -527,6 +543,7 @@ pub fn init<P: NapiPermissions + 'static>(unstable: bool) -> Extension {
threadsafe_function_receiver, threadsafe_function_receiver,
active_threadsafe_functions: 0, active_threadsafe_functions: 0,
env_cleanup_hooks: Rc::new(RefCell::new(vec![])), env_cleanup_hooks: Rc::new(RefCell::new(vec![])),
tsfn_ref_counters: Rc::new(RefCell::new(vec![])),
}); });
state.put(Unstable(unstable)); state.put(Unstable(unstable));
Ok(()) Ok(())
@ -563,7 +580,13 @@ where
let permissions = op_state.borrow_mut::<NP>(); let permissions = op_state.borrow_mut::<NP>();
permissions.check(Some(&PathBuf::from(&path)))?; permissions.check(Some(&PathBuf::from(&path)))?;
let (async_work_sender, tsfn_sender, isolate_ptr, cleanup_hooks) = { let (
async_work_sender,
tsfn_sender,
isolate_ptr,
cleanup_hooks,
tsfn_ref_counters,
) = {
let napi_state = op_state.borrow::<NapiState>(); let napi_state = op_state.borrow::<NapiState>();
let isolate_ptr = op_state.borrow::<*mut v8::OwnedIsolate>(); let isolate_ptr = op_state.borrow::<*mut v8::OwnedIsolate>();
( (
@ -571,6 +594,7 @@ where
napi_state.threadsafe_function_sender.clone(), napi_state.threadsafe_function_sender.clone(),
*isolate_ptr, *isolate_ptr,
napi_state.env_cleanup_hooks.clone(), napi_state.env_cleanup_hooks.clone(),
napi_state.tsfn_ref_counters.clone(),
) )
}; };
@ -593,6 +617,7 @@ where
async_work_sender, async_work_sender,
tsfn_sender, tsfn_sender,
cleanup_hooks, cleanup_hooks,
tsfn_ref_counters,
); );
env.shared = Box::into_raw(Box::new(env_shared)); env.shared = Box::into_raw(Box::new(env_shared));
let env_ptr = Box::into_raw(Box::new(env)) as _; let env_ptr = Box::into_raw(Box::new(env)) as _;

View file

@ -17,6 +17,7 @@ pub mod primitives;
pub mod promise; pub mod promise;
pub mod properties; pub mod properties;
pub mod strings; pub mod strings;
pub mod tsfn;
pub mod typedarray; pub mod typedarray;
#[macro_export] #[macro_export]
@ -126,6 +127,7 @@ unsafe extern "C" fn napi_register_module_v1(
object_wrap::init(env, exports); object_wrap::init(env, exports);
callback::init(env, exports); callback::init(env, exports);
r#async::init(env, exports); r#async::init(env, exports);
tsfn::init(env, exports);
init_cleanup_hook(env, exports); init_cleanup_hook(env, exports);
exports exports

108
test_napi/src/tsfn.rs Normal file
View file

@ -0,0 +1,108 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
// This test performs initilization similar to napi-rs.
// https://github.com/napi-rs/napi-rs/commit/a5a04a4e545f268769cc78e2bd6c45af4336aac3
use napi_sys as sys;
use std::ffi::c_char;
use std::ffi::c_void;
use std::ptr;
macro_rules! check_status_or_panic {
($code:expr, $msg:expr) => {{
let c = $code;
match c {
sys::Status::napi_ok => {}
_ => panic!($msg),
}
}};
}
fn create_custom_gc(env: sys::napi_env) {
let mut custom_gc_fn = ptr::null_mut();
check_status_or_panic!(
unsafe {
sys::napi_create_function(
env,
"custom_gc".as_ptr() as *const c_char,
9,
Some(empty),
ptr::null_mut(),
&mut custom_gc_fn,
)
},
"Create Custom GC Function in napi_register_module_v1 failed"
);
let mut async_resource_name = ptr::null_mut();
check_status_or_panic!(
unsafe {
sys::napi_create_string_utf8(
env,
"CustomGC".as_ptr() as *const c_char,
8,
&mut async_resource_name,
)
},
"Create async resource string in napi_register_module_v1 napi_register_module_v1"
);
let mut custom_gc_tsfn = ptr::null_mut();
check_status_or_panic!(
unsafe {
sys::napi_create_threadsafe_function(
env,
custom_gc_fn,
ptr::null_mut(),
async_resource_name,
0,
1,
ptr::null_mut(),
Some(custom_gc_finalize),
ptr::null_mut(),
Some(custom_gc),
&mut custom_gc_tsfn,
)
},
"Create Custom GC ThreadsafeFunction in napi_register_module_v1 failed"
);
check_status_or_panic!(
unsafe { sys::napi_unref_threadsafe_function(env, custom_gc_tsfn) },
"Unref Custom GC ThreadsafeFunction in napi_register_module_v1 failed"
);
}
unsafe extern "C" fn empty(
_env: sys::napi_env,
_info: sys::napi_callback_info,
) -> sys::napi_value {
ptr::null_mut()
}
unsafe extern "C" fn custom_gc_finalize(
_env: sys::napi_env,
_finalize_data: *mut c_void,
_finalize_hint: *mut c_void,
) {
}
extern "C" fn custom_gc(
env: sys::napi_env,
_js_callback: sys::napi_value,
_context: *mut c_void,
data: *mut c_void,
) {
let mut ref_count = 0;
check_status_or_panic!(
unsafe {
sys::napi_reference_unref(env, data as sys::napi_ref, &mut ref_count)
},
"Failed to unref Buffer reference in Custom GC"
);
check_status_or_panic!(
unsafe { sys::napi_delete_reference(env, data as sys::napi_ref) },
"Failed to delete Buffer reference in Custom GC"
);
}
pub fn init(env: sys::napi_env, _exports: sys::napi_value) {
create_custom_gc(env);
}