mirror of
https://github.com/denoland/deno.git
synced 2025-01-11 00:21:05 -05:00
fix(napi): Implement napi_threadsafe_function
ref and unref (#17304)
Co-authored-by: Bartek Iwańczuk <biwanczuk@gmail.com>
This commit is contained in:
parent
cc806cdf21
commit
dd2829be0c
4 changed files with 201 additions and 18 deletions
|
@ -2,19 +2,33 @@
|
||||||
|
|
||||||
use deno_core::futures::channel::mpsc;
|
use deno_core::futures::channel::mpsc;
|
||||||
use deno_runtime::deno_napi::*;
|
use deno_runtime::deno_napi::*;
|
||||||
|
use once_cell::sync::Lazy;
|
||||||
use std::mem::forget;
|
use std::mem::forget;
|
||||||
|
use std::sync::atomic::AtomicUsize;
|
||||||
use std::sync::mpsc::channel;
|
use std::sync::mpsc::channel;
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
static TS_FN_ID_COUNTER: Lazy<AtomicUsize> = Lazy::new(|| AtomicUsize::new(0));
|
||||||
|
|
||||||
pub struct TsFn {
|
pub struct TsFn {
|
||||||
|
pub id: usize,
|
||||||
pub env: *mut Env,
|
pub env: *mut Env,
|
||||||
pub maybe_func: Option<v8::Global<v8::Function>>,
|
pub maybe_func: Option<v8::Global<v8::Function>>,
|
||||||
pub maybe_call_js_cb: Option<napi_threadsafe_function_call_js>,
|
pub maybe_call_js_cb: Option<napi_threadsafe_function_call_js>,
|
||||||
pub context: *mut c_void,
|
pub context: *mut c_void,
|
||||||
pub thread_counter: usize,
|
pub thread_counter: usize,
|
||||||
|
pub ref_counter: Arc<AtomicUsize>,
|
||||||
sender: mpsc::UnboundedSender<PendingNapiAsyncWork>,
|
sender: mpsc::UnboundedSender<PendingNapiAsyncWork>,
|
||||||
tsfn_sender: mpsc::UnboundedSender<ThreadSafeFunctionStatus>,
|
tsfn_sender: mpsc::UnboundedSender<ThreadSafeFunctionStatus>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl Drop for TsFn {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
let env = unsafe { self.env.as_mut().unwrap() };
|
||||||
|
env.remove_threadsafe_function_ref_counter(self.id)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl TsFn {
|
impl TsFn {
|
||||||
pub fn acquire(&mut self) -> Result {
|
pub fn acquire(&mut self) -> Result {
|
||||||
self.thread_counter += 1;
|
self.thread_counter += 1;
|
||||||
|
@ -35,6 +49,29 @@ impl TsFn {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn ref_(&mut self) -> Result {
|
||||||
|
self
|
||||||
|
.ref_counter
|
||||||
|
.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn unref(&mut self) -> Result {
|
||||||
|
let _ = self.ref_counter.fetch_update(
|
||||||
|
std::sync::atomic::Ordering::SeqCst,
|
||||||
|
std::sync::atomic::Ordering::SeqCst,
|
||||||
|
|x| {
|
||||||
|
if x == 0 {
|
||||||
|
None
|
||||||
|
} else {
|
||||||
|
Some(x - 1)
|
||||||
|
}
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
pub fn call(&self, data: *mut c_void, is_blocking: bool) {
|
pub fn call(&self, data: *mut c_void, is_blocking: bool) {
|
||||||
let js_func = self.maybe_func.clone();
|
let js_func = self.maybe_func.clone();
|
||||||
let (tx, rx) = channel();
|
let (tx, rx) = channel();
|
||||||
|
@ -107,15 +144,21 @@ fn napi_create_threadsafe_function(
|
||||||
})
|
})
|
||||||
.transpose()?;
|
.transpose()?;
|
||||||
|
|
||||||
|
let id = TS_FN_ID_COUNTER.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
|
||||||
|
|
||||||
let tsfn = TsFn {
|
let tsfn = TsFn {
|
||||||
|
id,
|
||||||
maybe_func,
|
maybe_func,
|
||||||
maybe_call_js_cb,
|
maybe_call_js_cb,
|
||||||
context,
|
context,
|
||||||
thread_counter: initial_thread_count,
|
thread_counter: initial_thread_count,
|
||||||
sender: env_ref.async_work_sender.clone(),
|
sender: env_ref.async_work_sender.clone(),
|
||||||
tsfn_sender: env_ref.threadsafe_function_sender.clone(),
|
tsfn_sender: env_ref.threadsafe_function_sender.clone(),
|
||||||
|
ref_counter: Arc::new(AtomicUsize::new(1)),
|
||||||
env,
|
env,
|
||||||
};
|
};
|
||||||
|
env_ref
|
||||||
|
.add_threadsafe_function_ref_counter(tsfn.id, tsfn.ref_counter.clone());
|
||||||
|
|
||||||
env_ref
|
env_ref
|
||||||
.threadsafe_function_sender
|
.threadsafe_function_sender
|
||||||
|
@ -142,7 +185,8 @@ fn napi_unref_threadsafe_function(
|
||||||
_env: &mut Env,
|
_env: &mut Env,
|
||||||
tsfn: napi_threadsafe_function,
|
tsfn: napi_threadsafe_function,
|
||||||
) -> Result {
|
) -> Result {
|
||||||
let _tsfn: &TsFn = &*(tsfn as *const TsFn);
|
let tsfn: &mut TsFn = &mut *(tsfn as *mut TsFn);
|
||||||
|
tsfn.unref()?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
@ -170,8 +214,12 @@ fn napi_call_threadsafe_function(
|
||||||
}
|
}
|
||||||
|
|
||||||
#[napi_sym::napi_sym]
|
#[napi_sym::napi_sym]
|
||||||
fn napi_ref_threadsafe_function() -> Result {
|
fn napi_ref_threadsafe_function(
|
||||||
// TODO
|
_env: &mut Env,
|
||||||
|
func: napi_threadsafe_function,
|
||||||
|
) -> Result {
|
||||||
|
let tsfn: &mut TsFn = &mut *(func as *mut TsFn);
|
||||||
|
tsfn.ref_()?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -19,6 +19,8 @@ use std::ffi::CString;
|
||||||
use std::path::Path;
|
use std::path::Path;
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
use std::rc::Rc;
|
use std::rc::Rc;
|
||||||
|
use std::sync::atomic::AtomicUsize;
|
||||||
|
use std::sync::Arc;
|
||||||
use std::task::Poll;
|
use std::task::Poll;
|
||||||
use std::thread_local;
|
use std::thread_local;
|
||||||
|
|
||||||
|
@ -322,7 +324,7 @@ pub struct napi_node_version {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub type PendingNapiAsyncWork = Box<dyn FnOnce()>;
|
pub type PendingNapiAsyncWork = Box<dyn FnOnce()>;
|
||||||
|
pub type ThreadsafeFunctionRefCounters = Vec<(usize, Arc<AtomicUsize>)>;
|
||||||
pub struct NapiState {
|
pub struct NapiState {
|
||||||
// Async tasks.
|
// Async tasks.
|
||||||
pub pending_async_work: Vec<PendingNapiAsyncWork>,
|
pub pending_async_work: Vec<PendingNapiAsyncWork>,
|
||||||
|
@ -336,6 +338,7 @@ pub struct NapiState {
|
||||||
mpsc::UnboundedSender<ThreadSafeFunctionStatus>,
|
mpsc::UnboundedSender<ThreadSafeFunctionStatus>,
|
||||||
pub env_cleanup_hooks:
|
pub env_cleanup_hooks:
|
||||||
Rc<RefCell<Vec<(extern "C" fn(*const c_void), *const c_void)>>>,
|
Rc<RefCell<Vec<(extern "C" fn(*const c_void), *const c_void)>>>,
|
||||||
|
pub tsfn_ref_counters: Rc<RefCell<ThreadsafeFunctionRefCounters>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Drop for NapiState {
|
impl Drop for NapiState {
|
||||||
|
@ -391,6 +394,7 @@ pub struct Env {
|
||||||
mpsc::UnboundedSender<ThreadSafeFunctionStatus>,
|
mpsc::UnboundedSender<ThreadSafeFunctionStatus>,
|
||||||
pub cleanup_hooks:
|
pub cleanup_hooks:
|
||||||
Rc<RefCell<Vec<(extern "C" fn(*const c_void), *const c_void)>>>,
|
Rc<RefCell<Vec<(extern "C" fn(*const c_void), *const c_void)>>>,
|
||||||
|
pub tsfn_ref_counters: Rc<RefCell<ThreadsafeFunctionRefCounters>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
unsafe impl Send for Env {}
|
unsafe impl Send for Env {}
|
||||||
|
@ -405,6 +409,7 @@ impl Env {
|
||||||
cleanup_hooks: Rc<
|
cleanup_hooks: Rc<
|
||||||
RefCell<Vec<(extern "C" fn(*const c_void), *const c_void)>>,
|
RefCell<Vec<(extern "C" fn(*const c_void), *const c_void)>>,
|
||||||
>,
|
>,
|
||||||
|
tsfn_ref_counters: Rc<RefCell<ThreadsafeFunctionRefCounters>>,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
let sc = sender.clone();
|
let sc = sender.clone();
|
||||||
ASYNC_WORK_SENDER.with(|s| {
|
ASYNC_WORK_SENDER.with(|s| {
|
||||||
|
@ -423,6 +428,7 @@ impl Env {
|
||||||
async_work_sender: sender,
|
async_work_sender: sender,
|
||||||
threadsafe_function_sender,
|
threadsafe_function_sender,
|
||||||
cleanup_hooks,
|
cleanup_hooks,
|
||||||
|
tsfn_ref_counters,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -458,6 +464,22 @@ impl Env {
|
||||||
// using `napi_open_handle_scope`.
|
// using `napi_open_handle_scope`.
|
||||||
unsafe { v8::CallbackScope::new(context) }
|
unsafe { v8::CallbackScope::new(context) }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn add_threadsafe_function_ref_counter(
|
||||||
|
&mut self,
|
||||||
|
id: usize,
|
||||||
|
counter: Arc<AtomicUsize>,
|
||||||
|
) {
|
||||||
|
let mut counters = self.tsfn_ref_counters.borrow_mut();
|
||||||
|
assert!(!counters.iter().any(|(i, _)| *i == id));
|
||||||
|
counters.push((id, counter));
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn remove_threadsafe_function_ref_counter(&mut self, id: usize) {
|
||||||
|
let mut counters = self.tsfn_ref_counters.borrow_mut();
|
||||||
|
let index = counters.iter().position(|(i, _)| *i == id).unwrap();
|
||||||
|
counters.remove(index);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn init<P: NapiPermissions + 'static>(unstable: bool) -> Extension {
|
pub fn init<P: NapiPermissions + 'static>(unstable: bool) -> Extension {
|
||||||
|
@ -479,22 +501,16 @@ pub fn init<P: NapiPermissions + 'static>(unstable: bool) -> Extension {
|
||||||
napi_state.pending_async_work.push(async_work_fut);
|
napi_state.pending_async_work.push(async_work_fut);
|
||||||
}
|
}
|
||||||
|
|
||||||
while let Poll::Ready(Some(tsfn_status)) =
|
|
||||||
napi_state.threadsafe_function_receiver.poll_next_unpin(cx)
|
|
||||||
{
|
|
||||||
match tsfn_status {
|
|
||||||
ThreadSafeFunctionStatus::Alive => {
|
|
||||||
napi_state.active_threadsafe_functions += 1
|
|
||||||
}
|
|
||||||
ThreadSafeFunctionStatus::Dead => {
|
|
||||||
napi_state.active_threadsafe_functions -= 1
|
|
||||||
}
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
if napi_state.active_threadsafe_functions > 0 {
|
if napi_state.active_threadsafe_functions > 0 {
|
||||||
maybe_scheduling = true;
|
maybe_scheduling = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for (_id, counter) in napi_state.tsfn_ref_counters.borrow().iter() {
|
||||||
|
if counter.load(std::sync::atomic::Ordering::SeqCst) > 0 {
|
||||||
|
maybe_scheduling = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
|
@ -527,6 +543,7 @@ pub fn init<P: NapiPermissions + 'static>(unstable: bool) -> Extension {
|
||||||
threadsafe_function_receiver,
|
threadsafe_function_receiver,
|
||||||
active_threadsafe_functions: 0,
|
active_threadsafe_functions: 0,
|
||||||
env_cleanup_hooks: Rc::new(RefCell::new(vec![])),
|
env_cleanup_hooks: Rc::new(RefCell::new(vec![])),
|
||||||
|
tsfn_ref_counters: Rc::new(RefCell::new(vec![])),
|
||||||
});
|
});
|
||||||
state.put(Unstable(unstable));
|
state.put(Unstable(unstable));
|
||||||
Ok(())
|
Ok(())
|
||||||
|
@ -563,7 +580,13 @@ where
|
||||||
let permissions = op_state.borrow_mut::<NP>();
|
let permissions = op_state.borrow_mut::<NP>();
|
||||||
permissions.check(Some(&PathBuf::from(&path)))?;
|
permissions.check(Some(&PathBuf::from(&path)))?;
|
||||||
|
|
||||||
let (async_work_sender, tsfn_sender, isolate_ptr, cleanup_hooks) = {
|
let (
|
||||||
|
async_work_sender,
|
||||||
|
tsfn_sender,
|
||||||
|
isolate_ptr,
|
||||||
|
cleanup_hooks,
|
||||||
|
tsfn_ref_counters,
|
||||||
|
) = {
|
||||||
let napi_state = op_state.borrow::<NapiState>();
|
let napi_state = op_state.borrow::<NapiState>();
|
||||||
let isolate_ptr = op_state.borrow::<*mut v8::OwnedIsolate>();
|
let isolate_ptr = op_state.borrow::<*mut v8::OwnedIsolate>();
|
||||||
(
|
(
|
||||||
|
@ -571,6 +594,7 @@ where
|
||||||
napi_state.threadsafe_function_sender.clone(),
|
napi_state.threadsafe_function_sender.clone(),
|
||||||
*isolate_ptr,
|
*isolate_ptr,
|
||||||
napi_state.env_cleanup_hooks.clone(),
|
napi_state.env_cleanup_hooks.clone(),
|
||||||
|
napi_state.tsfn_ref_counters.clone(),
|
||||||
)
|
)
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -593,6 +617,7 @@ where
|
||||||
async_work_sender,
|
async_work_sender,
|
||||||
tsfn_sender,
|
tsfn_sender,
|
||||||
cleanup_hooks,
|
cleanup_hooks,
|
||||||
|
tsfn_ref_counters,
|
||||||
);
|
);
|
||||||
env.shared = Box::into_raw(Box::new(env_shared));
|
env.shared = Box::into_raw(Box::new(env_shared));
|
||||||
let env_ptr = Box::into_raw(Box::new(env)) as _;
|
let env_ptr = Box::into_raw(Box::new(env)) as _;
|
||||||
|
|
|
@ -17,6 +17,7 @@ pub mod primitives;
|
||||||
pub mod promise;
|
pub mod promise;
|
||||||
pub mod properties;
|
pub mod properties;
|
||||||
pub mod strings;
|
pub mod strings;
|
||||||
|
pub mod tsfn;
|
||||||
pub mod typedarray;
|
pub mod typedarray;
|
||||||
|
|
||||||
#[macro_export]
|
#[macro_export]
|
||||||
|
@ -126,6 +127,7 @@ unsafe extern "C" fn napi_register_module_v1(
|
||||||
object_wrap::init(env, exports);
|
object_wrap::init(env, exports);
|
||||||
callback::init(env, exports);
|
callback::init(env, exports);
|
||||||
r#async::init(env, exports);
|
r#async::init(env, exports);
|
||||||
|
tsfn::init(env, exports);
|
||||||
init_cleanup_hook(env, exports);
|
init_cleanup_hook(env, exports);
|
||||||
|
|
||||||
exports
|
exports
|
||||||
|
|
108
test_napi/src/tsfn.rs
Normal file
108
test_napi/src/tsfn.rs
Normal file
|
@ -0,0 +1,108 @@
|
||||||
|
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
|
||||||
|
|
||||||
|
// This test performs initilization similar to napi-rs.
|
||||||
|
// https://github.com/napi-rs/napi-rs/commit/a5a04a4e545f268769cc78e2bd6c45af4336aac3
|
||||||
|
|
||||||
|
use napi_sys as sys;
|
||||||
|
use std::ffi::c_char;
|
||||||
|
use std::ffi::c_void;
|
||||||
|
use std::ptr;
|
||||||
|
|
||||||
|
macro_rules! check_status_or_panic {
|
||||||
|
($code:expr, $msg:expr) => {{
|
||||||
|
let c = $code;
|
||||||
|
match c {
|
||||||
|
sys::Status::napi_ok => {}
|
||||||
|
_ => panic!($msg),
|
||||||
|
}
|
||||||
|
}};
|
||||||
|
}
|
||||||
|
|
||||||
|
fn create_custom_gc(env: sys::napi_env) {
|
||||||
|
let mut custom_gc_fn = ptr::null_mut();
|
||||||
|
check_status_or_panic!(
|
||||||
|
unsafe {
|
||||||
|
sys::napi_create_function(
|
||||||
|
env,
|
||||||
|
"custom_gc".as_ptr() as *const c_char,
|
||||||
|
9,
|
||||||
|
Some(empty),
|
||||||
|
ptr::null_mut(),
|
||||||
|
&mut custom_gc_fn,
|
||||||
|
)
|
||||||
|
},
|
||||||
|
"Create Custom GC Function in napi_register_module_v1 failed"
|
||||||
|
);
|
||||||
|
let mut async_resource_name = ptr::null_mut();
|
||||||
|
check_status_or_panic!(
|
||||||
|
unsafe {
|
||||||
|
sys::napi_create_string_utf8(
|
||||||
|
env,
|
||||||
|
"CustomGC".as_ptr() as *const c_char,
|
||||||
|
8,
|
||||||
|
&mut async_resource_name,
|
||||||
|
)
|
||||||
|
},
|
||||||
|
"Create async resource string in napi_register_module_v1 napi_register_module_v1"
|
||||||
|
);
|
||||||
|
let mut custom_gc_tsfn = ptr::null_mut();
|
||||||
|
check_status_or_panic!(
|
||||||
|
unsafe {
|
||||||
|
sys::napi_create_threadsafe_function(
|
||||||
|
env,
|
||||||
|
custom_gc_fn,
|
||||||
|
ptr::null_mut(),
|
||||||
|
async_resource_name,
|
||||||
|
0,
|
||||||
|
1,
|
||||||
|
ptr::null_mut(),
|
||||||
|
Some(custom_gc_finalize),
|
||||||
|
ptr::null_mut(),
|
||||||
|
Some(custom_gc),
|
||||||
|
&mut custom_gc_tsfn,
|
||||||
|
)
|
||||||
|
},
|
||||||
|
"Create Custom GC ThreadsafeFunction in napi_register_module_v1 failed"
|
||||||
|
);
|
||||||
|
check_status_or_panic!(
|
||||||
|
unsafe { sys::napi_unref_threadsafe_function(env, custom_gc_tsfn) },
|
||||||
|
"Unref Custom GC ThreadsafeFunction in napi_register_module_v1 failed"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
unsafe extern "C" fn empty(
|
||||||
|
_env: sys::napi_env,
|
||||||
|
_info: sys::napi_callback_info,
|
||||||
|
) -> sys::napi_value {
|
||||||
|
ptr::null_mut()
|
||||||
|
}
|
||||||
|
|
||||||
|
unsafe extern "C" fn custom_gc_finalize(
|
||||||
|
_env: sys::napi_env,
|
||||||
|
_finalize_data: *mut c_void,
|
||||||
|
_finalize_hint: *mut c_void,
|
||||||
|
) {
|
||||||
|
}
|
||||||
|
|
||||||
|
extern "C" fn custom_gc(
|
||||||
|
env: sys::napi_env,
|
||||||
|
_js_callback: sys::napi_value,
|
||||||
|
_context: *mut c_void,
|
||||||
|
data: *mut c_void,
|
||||||
|
) {
|
||||||
|
let mut ref_count = 0;
|
||||||
|
check_status_or_panic!(
|
||||||
|
unsafe {
|
||||||
|
sys::napi_reference_unref(env, data as sys::napi_ref, &mut ref_count)
|
||||||
|
},
|
||||||
|
"Failed to unref Buffer reference in Custom GC"
|
||||||
|
);
|
||||||
|
check_status_or_panic!(
|
||||||
|
unsafe { sys::napi_delete_reference(env, data as sys::napi_ref) },
|
||||||
|
"Failed to delete Buffer reference in Custom GC"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn init(env: sys::napi_env, _exports: sys::napi_value) {
|
||||||
|
create_custom_gc(env);
|
||||||
|
}
|
Loading…
Reference in a new issue