mirror of
https://github.com/denoland/deno.git
synced 2024-12-24 08:09:08 -05:00
feat(ext/ffi): Thread safe callbacks (#14942)
This commit is contained in:
parent
ed7a321994
commit
0aaeea8b67
11 changed files with 400 additions and 28 deletions
27
cli/dts/lib.deno.unstable.d.ts
vendored
27
cli/dts/lib.deno.unstable.d.ts
vendored
|
@ -565,6 +565,9 @@ declare namespace Deno {
|
||||||
* as C function pointers to ffi calls.
|
* as C function pointers to ffi calls.
|
||||||
*
|
*
|
||||||
* The function pointer remains valid until the `close()` method is called.
|
* The function pointer remains valid until the `close()` method is called.
|
||||||
|
*
|
||||||
|
* The callback can be explicitly ref'ed and deref'ed to stop Deno's
|
||||||
|
* process from exiting.
|
||||||
*/
|
*/
|
||||||
export class UnsafeCallback<
|
export class UnsafeCallback<
|
||||||
Definition extends UnsafeCallbackDefinition = UnsafeCallbackDefinition,
|
Definition extends UnsafeCallbackDefinition = UnsafeCallbackDefinition,
|
||||||
|
@ -584,6 +587,30 @@ declare namespace Deno {
|
||||||
Definition["result"]
|
Definition["result"]
|
||||||
>;
|
>;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Adds one to this callback's reference counting.
|
||||||
|
*
|
||||||
|
* If the callback's reference count becomes non-zero, it will keep
|
||||||
|
* Deno's process from exiting.
|
||||||
|
*/
|
||||||
|
ref(): void;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Removes one from this callback's reference counting.
|
||||||
|
*
|
||||||
|
* If the callback's reference counter becomes zero, it will no longer
|
||||||
|
* keep Deno's process from exiting.
|
||||||
|
*/
|
||||||
|
unref(): void;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Removes the C function pointer associated with the UnsafeCallback.
|
||||||
|
* Continuing to use the instance after calling this object will lead to errors
|
||||||
|
* and crashes.
|
||||||
|
*
|
||||||
|
* Calling this method will also immediately set the callback's reference
|
||||||
|
* counting to zero and it will no longer keep Deno's process from exiting.
|
||||||
|
*/
|
||||||
close(): void;
|
close(): void;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -20,7 +20,8 @@ type Task = Box<dyn FnOnce()>;
|
||||||
fn main() {
|
fn main() {
|
||||||
let my_ext = Extension::builder()
|
let my_ext = Extension::builder()
|
||||||
.ops(vec![op_schedule_task::decl()])
|
.ops(vec![op_schedule_task::decl()])
|
||||||
.event_loop_middleware(|state, cx| {
|
.event_loop_middleware(|state_rc, cx| {
|
||||||
|
let mut state = state_rc.borrow_mut();
|
||||||
let recv = state.borrow_mut::<mpsc::UnboundedReceiver<Task>>();
|
let recv = state.borrow_mut::<mpsc::UnboundedReceiver<Task>>();
|
||||||
let mut ref_loop = false;
|
let mut ref_loop = false;
|
||||||
while let Poll::Ready(Some(call)) = recv.poll_next_unpin(cx) {
|
while let Poll::Ready(Some(call)) = recv.poll_next_unpin(cx) {
|
||||||
|
|
|
@ -1,13 +1,13 @@
|
||||||
// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license.
|
// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license.
|
||||||
use crate::OpState;
|
use crate::OpState;
|
||||||
use anyhow::Error;
|
use anyhow::Error;
|
||||||
use std::task::Context;
|
use std::{cell::RefCell, rc::Rc, task::Context};
|
||||||
|
|
||||||
pub type SourcePair = (&'static str, &'static str);
|
pub type SourcePair = (&'static str, &'static str);
|
||||||
pub type OpFnRef = v8::FunctionCallback;
|
pub type OpFnRef = v8::FunctionCallback;
|
||||||
pub type OpMiddlewareFn = dyn Fn(OpDecl) -> OpDecl;
|
pub type OpMiddlewareFn = dyn Fn(OpDecl) -> OpDecl;
|
||||||
pub type OpStateFn = dyn Fn(&mut OpState) -> Result<(), Error>;
|
pub type OpStateFn = dyn Fn(&mut OpState) -> Result<(), Error>;
|
||||||
pub type OpEventLoopFn = dyn Fn(&mut OpState, &mut Context) -> bool;
|
pub type OpEventLoopFn = dyn Fn(Rc<RefCell<OpState>>, &mut Context) -> bool;
|
||||||
|
|
||||||
#[derive(Clone, Copy)]
|
#[derive(Clone, Copy)]
|
||||||
pub struct OpDecl {
|
pub struct OpDecl {
|
||||||
|
@ -90,13 +90,13 @@ impl Extension {
|
||||||
|
|
||||||
pub fn run_event_loop_middleware(
|
pub fn run_event_loop_middleware(
|
||||||
&self,
|
&self,
|
||||||
op_state: &mut OpState,
|
op_state_rc: Rc<RefCell<OpState>>,
|
||||||
cx: &mut Context,
|
cx: &mut Context,
|
||||||
) -> bool {
|
) -> bool {
|
||||||
self
|
self
|
||||||
.event_loop_middleware
|
.event_loop_middleware
|
||||||
.as_ref()
|
.as_ref()
|
||||||
.map(|f| f(op_state, cx))
|
.map(|f| f(op_state_rc, cx))
|
||||||
.unwrap_or(false)
|
.unwrap_or(false)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -148,7 +148,7 @@ impl ExtensionBuilder {
|
||||||
|
|
||||||
pub fn event_loop_middleware<F>(&mut self, middleware_fn: F) -> &mut Self
|
pub fn event_loop_middleware<F>(&mut self, middleware_fn: F) -> &mut Self
|
||||||
where
|
where
|
||||||
F: Fn(&mut OpState, &mut Context) -> bool + 'static,
|
F: Fn(Rc<RefCell<OpState>>, &mut Context) -> bool + 'static,
|
||||||
{
|
{
|
||||||
self.event_loop_middleware = Some(Box::new(middleware_fn));
|
self.event_loop_middleware = Some(Box::new(middleware_fn));
|
||||||
self
|
self
|
||||||
|
|
|
@ -921,7 +921,7 @@ impl JsRuntime {
|
||||||
let state = state_rc.borrow();
|
let state = state_rc.borrow();
|
||||||
let op_state = state.op_state.clone();
|
let op_state = state.op_state.clone();
|
||||||
for f in &self.event_loop_middlewares {
|
for f in &self.event_loop_middlewares {
|
||||||
if f(&mut op_state.borrow_mut(), cx) {
|
if f(op_state.clone(), cx) {
|
||||||
maybe_scheduling = true;
|
maybe_scheduling = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -201,6 +201,7 @@
|
||||||
}
|
}
|
||||||
|
|
||||||
class UnsafeCallback {
|
class UnsafeCallback {
|
||||||
|
#refcount;
|
||||||
#rid;
|
#rid;
|
||||||
definition;
|
definition;
|
||||||
callback;
|
callback;
|
||||||
|
@ -217,13 +218,30 @@
|
||||||
definition,
|
definition,
|
||||||
callback,
|
callback,
|
||||||
);
|
);
|
||||||
|
this.#refcount = 0;
|
||||||
this.#rid = rid;
|
this.#rid = rid;
|
||||||
this.pointer = pointer;
|
this.pointer = pointer;
|
||||||
this.definition = definition;
|
this.definition = definition;
|
||||||
this.callback = callback;
|
this.callback = callback;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ref() {
|
||||||
|
if (this.#refcount++ === 0) {
|
||||||
|
core.opSync("op_ffi_unsafe_callback_ref", true);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
unref() {
|
||||||
|
if (--this.#refcount === 0) {
|
||||||
|
core.opSync("op_ffi_unsafe_callback_ref", false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
close() {
|
close() {
|
||||||
|
if (this.#refcount) {
|
||||||
|
this.#refcount = 0;
|
||||||
|
core.opSync("op_ffi_unsafe_callback_ref", false);
|
||||||
|
}
|
||||||
core.close(this.#rid);
|
core.close(this.#rid);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
132
ext/ffi/lib.rs
132
ext/ffi/lib.rs
|
@ -6,9 +6,11 @@ use deno_core::error::generic_error;
|
||||||
use deno_core::error::range_error;
|
use deno_core::error::range_error;
|
||||||
use deno_core::error::type_error;
|
use deno_core::error::type_error;
|
||||||
use deno_core::error::AnyError;
|
use deno_core::error::AnyError;
|
||||||
|
use deno_core::futures::channel::mpsc;
|
||||||
use deno_core::futures::Future;
|
use deno_core::futures::Future;
|
||||||
use deno_core::include_js_files;
|
use deno_core::include_js_files;
|
||||||
use deno_core::op;
|
use deno_core::op;
|
||||||
|
use std::sync::mpsc::sync_channel;
|
||||||
|
|
||||||
use deno_core::serde_json::json;
|
use deno_core::serde_json::json;
|
||||||
use deno_core::serde_json::Value;
|
use deno_core::serde_json::Value;
|
||||||
|
@ -37,7 +39,7 @@ use std::ptr;
|
||||||
use std::rc::Rc;
|
use std::rc::Rc;
|
||||||
|
|
||||||
thread_local! {
|
thread_local! {
|
||||||
static IS_ISOLATE_THREAD: RefCell<bool> = RefCell::new(false);
|
static LOCAL_ISOLATE_POINTER: RefCell<*const v8::Isolate> = RefCell::new(ptr::null());
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct Unstable(pub bool);
|
pub struct Unstable(pub bool);
|
||||||
|
@ -122,7 +124,6 @@ impl DynamicLibraryResource {
|
||||||
name: String,
|
name: String,
|
||||||
foreign_fn: ForeignFunction,
|
foreign_fn: ForeignFunction,
|
||||||
) -> Result<(), AnyError> {
|
) -> Result<(), AnyError> {
|
||||||
IS_ISOLATE_THREAD.with(|s| s.replace(true));
|
|
||||||
let symbol = match &foreign_fn.name {
|
let symbol = match &foreign_fn.name {
|
||||||
Some(symbol) => symbol,
|
Some(symbol) => symbol,
|
||||||
None => &name,
|
None => &name,
|
||||||
|
@ -178,6 +179,14 @@ impl DynamicLibraryResource {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type PendingFfiAsyncWork = Box<dyn FnOnce()>;
|
||||||
|
|
||||||
|
struct FfiState {
|
||||||
|
async_work_sender: mpsc::UnboundedSender<PendingFfiAsyncWork>,
|
||||||
|
async_work_receiver: mpsc::UnboundedReceiver<PendingFfiAsyncWork>,
|
||||||
|
active_refed_functions: usize,
|
||||||
|
}
|
||||||
|
|
||||||
pub fn init<P: FfiPermissions + 'static>(unstable: bool) -> Extension {
|
pub fn init<P: FfiPermissions + 'static>(unstable: bool) -> Extension {
|
||||||
Extension::builder()
|
Extension::builder()
|
||||||
.js(include_js_files!(
|
.js(include_js_files!(
|
||||||
|
@ -204,10 +213,51 @@ pub fn init<P: FfiPermissions + 'static>(unstable: bool) -> Extension {
|
||||||
op_ffi_read_f32::decl::<P>(),
|
op_ffi_read_f32::decl::<P>(),
|
||||||
op_ffi_read_f64::decl::<P>(),
|
op_ffi_read_f64::decl::<P>(),
|
||||||
op_ffi_unsafe_callback_create::decl::<P>(),
|
op_ffi_unsafe_callback_create::decl::<P>(),
|
||||||
|
op_ffi_unsafe_callback_ref::decl(),
|
||||||
])
|
])
|
||||||
|
.event_loop_middleware(|op_state_rc, _cx| {
|
||||||
|
// FFI callbacks coming in from other threads will call in and get queued.
|
||||||
|
let mut maybe_scheduling = false;
|
||||||
|
|
||||||
|
let mut work_items: Vec<PendingFfiAsyncWork> = vec![];
|
||||||
|
|
||||||
|
{
|
||||||
|
let mut op_state = op_state_rc.borrow_mut();
|
||||||
|
let ffi_state = op_state.borrow_mut::<FfiState>();
|
||||||
|
|
||||||
|
while let Ok(Some(async_work_fut)) =
|
||||||
|
ffi_state.async_work_receiver.try_next()
|
||||||
|
{
|
||||||
|
// Move received items to a temporary vector so that we can drop the `op_state` borrow before we do the work.
|
||||||
|
work_items.push(async_work_fut);
|
||||||
|
maybe_scheduling = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
if ffi_state.active_refed_functions > 0 {
|
||||||
|
maybe_scheduling = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
drop(op_state);
|
||||||
|
}
|
||||||
|
while let Some(async_work_fut) = work_items.pop() {
|
||||||
|
async_work_fut();
|
||||||
|
}
|
||||||
|
|
||||||
|
maybe_scheduling
|
||||||
|
})
|
||||||
.state(move |state| {
|
.state(move |state| {
|
||||||
// Stolen from deno_webgpu, is there a better option?
|
// Stolen from deno_webgpu, is there a better option?
|
||||||
state.put(Unstable(unstable));
|
state.put(Unstable(unstable));
|
||||||
|
|
||||||
|
let (async_work_sender, async_work_receiver) =
|
||||||
|
mpsc::unbounded::<PendingFfiAsyncWork>();
|
||||||
|
|
||||||
|
state.put(FfiState {
|
||||||
|
active_refed_functions: 0,
|
||||||
|
async_work_receiver,
|
||||||
|
async_work_sender,
|
||||||
|
});
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
})
|
})
|
||||||
.build()
|
.build()
|
||||||
|
@ -831,6 +881,7 @@ impl Resource for UnsafeCallbackResource {
|
||||||
}
|
}
|
||||||
|
|
||||||
struct CallbackInfo {
|
struct CallbackInfo {
|
||||||
|
pub async_work_sender: mpsc::UnboundedSender<PendingFfiAsyncWork>,
|
||||||
pub callback: NonNull<v8::Function>,
|
pub callback: NonNull<v8::Function>,
|
||||||
pub context: NonNull<v8::Context>,
|
pub context: NonNull<v8::Context>,
|
||||||
pub isolate: *mut v8::Isolate,
|
pub isolate: *mut v8::Isolate,
|
||||||
|
@ -842,21 +893,55 @@ unsafe extern "C" fn deno_ffi_callback(
|
||||||
args: *const *const c_void,
|
args: *const *const c_void,
|
||||||
info: &CallbackInfo,
|
info: &CallbackInfo,
|
||||||
) {
|
) {
|
||||||
let isolate = &mut *info.isolate;
|
LOCAL_ISOLATE_POINTER.with(|s| {
|
||||||
let callback = v8::Global::from_raw(isolate, info.callback);
|
if ptr::eq(*s.borrow(), info.isolate) {
|
||||||
|
// Own isolate thread, okay to call directly
|
||||||
|
do_ffi_callback(
|
||||||
|
cif,
|
||||||
|
result,
|
||||||
|
args,
|
||||||
|
info.callback,
|
||||||
|
info.context,
|
||||||
|
info.isolate,
|
||||||
|
);
|
||||||
|
} else {
|
||||||
|
let async_work_sender = &info.async_work_sender;
|
||||||
|
// SAFETY: Safe as this function blocks until `do_ffi_callback` completes and a response message is received.
|
||||||
|
let cif: &'static libffi::low::ffi_cif = std::mem::transmute(cif);
|
||||||
|
let result: &'static mut c_void = std::mem::transmute(result);
|
||||||
|
let info: &'static CallbackInfo = std::mem::transmute(info);
|
||||||
|
let (response_sender, response_receiver) = sync_channel::<()>(0);
|
||||||
|
let fut = Box::new(move || {
|
||||||
|
do_ffi_callback(
|
||||||
|
cif,
|
||||||
|
result,
|
||||||
|
args,
|
||||||
|
info.callback,
|
||||||
|
info.context,
|
||||||
|
info.isolate,
|
||||||
|
);
|
||||||
|
response_sender.send(()).unwrap();
|
||||||
|
});
|
||||||
|
async_work_sender.unbounded_send(fut).unwrap();
|
||||||
|
response_receiver.recv().unwrap();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
unsafe fn do_ffi_callback(
|
||||||
|
cif: &libffi::low::ffi_cif,
|
||||||
|
result: &mut c_void,
|
||||||
|
args: *const *const c_void,
|
||||||
|
callback: NonNull<v8::Function>,
|
||||||
|
context: NonNull<v8::Context>,
|
||||||
|
isolate: *mut v8::Isolate,
|
||||||
|
) {
|
||||||
|
let isolate = &mut *isolate;
|
||||||
|
let callback = v8::Global::from_raw(isolate, callback);
|
||||||
let context = std::mem::transmute::<
|
let context = std::mem::transmute::<
|
||||||
NonNull<v8::Context>,
|
NonNull<v8::Context>,
|
||||||
v8::Local<v8::Context>,
|
v8::Local<v8::Context>,
|
||||||
>(info.context);
|
>(context);
|
||||||
IS_ISOLATE_THREAD.with(|is_event_loop_thread| {
|
|
||||||
if !(*is_event_loop_thread.borrow()) {
|
|
||||||
// Call from another thread, not yet supported.
|
|
||||||
eprintln!(
|
|
||||||
"Calling Deno FFI's callbacks from other threads is not supported"
|
|
||||||
);
|
|
||||||
std::process::exit(1);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
// Call from main thread. If this callback is being triggered due to a
|
// Call from main thread. If this callback is being triggered due to a
|
||||||
// function call coming from Deno itself, then this callback will build
|
// function call coming from Deno itself, then this callback will build
|
||||||
// ontop of that stack.
|
// ontop of that stack.
|
||||||
|
@ -1096,11 +1181,20 @@ where
|
||||||
let cb = v8::Local::<v8::Function>::try_from(v8_value)?;
|
let cb = v8::Local::<v8::Function>::try_from(v8_value)?;
|
||||||
|
|
||||||
let isolate: *mut v8::Isolate = &mut *scope as &mut v8::Isolate;
|
let isolate: *mut v8::Isolate = &mut *scope as &mut v8::Isolate;
|
||||||
|
LOCAL_ISOLATE_POINTER.with(|s| {
|
||||||
|
if s.borrow().is_null() {
|
||||||
|
s.replace(isolate);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
let async_work_sender =
|
||||||
|
state.borrow_mut::<FfiState>().async_work_sender.clone();
|
||||||
let callback = v8::Global::new(scope, cb).into_raw();
|
let callback = v8::Global::new(scope, cb).into_raw();
|
||||||
let current_context = scope.get_current_context();
|
let current_context = scope.get_current_context();
|
||||||
let context = v8::Global::new(scope, current_context).into_raw();
|
let context = v8::Global::new(scope, current_context).into_raw();
|
||||||
|
|
||||||
let info = Box::leak(Box::new(CallbackInfo {
|
let info = Box::leak(Box::new(CallbackInfo {
|
||||||
|
async_work_sender,
|
||||||
callback,
|
callback,
|
||||||
context,
|
context,
|
||||||
isolate,
|
isolate,
|
||||||
|
@ -1158,6 +1252,16 @@ where
|
||||||
Ok(result)
|
Ok(result)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[op]
|
||||||
|
fn op_ffi_unsafe_callback_ref(state: &mut deno_core::OpState, inc_dec: bool) {
|
||||||
|
let ffi_state = state.borrow_mut::<FfiState>();
|
||||||
|
if inc_dec {
|
||||||
|
ffi_state.active_refed_functions += 1;
|
||||||
|
} else {
|
||||||
|
ffi_state.active_refed_functions -= 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[op(v8)]
|
#[op(v8)]
|
||||||
fn op_ffi_call_ptr_nonblocking<'scope, FP>(
|
fn op_ffi_call_ptr_nonblocking<'scope, FP>(
|
||||||
scope: &mut v8::HandleScope<'scope>,
|
scope: &mut v8::HandleScope<'scope>,
|
||||||
|
|
|
@ -211,6 +211,19 @@ pub extern "C" fn call_stored_function_2(arg: u8) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[no_mangle]
|
||||||
|
pub extern "C" fn call_stored_function_thread_safe() {
|
||||||
|
std::thread::spawn(move || {
|
||||||
|
std::thread::sleep(std::time::Duration::from_millis(1500));
|
||||||
|
unsafe {
|
||||||
|
if STORED_FUNCTION.is_none() {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
STORED_FUNCTION.unwrap()();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
// FFI performance helper functions
|
// FFI performance helper functions
|
||||||
#[no_mangle]
|
#[no_mangle]
|
||||||
pub extern "C" fn nop() {}
|
pub extern "C" fn nop() {}
|
||||||
|
|
|
@ -77,6 +77,8 @@ fn basic() {
|
||||||
true\n\
|
true\n\
|
||||||
Before\n\
|
Before\n\
|
||||||
true\n\
|
true\n\
|
||||||
|
After\n\
|
||||||
|
true\n\
|
||||||
logCallback\n\
|
logCallback\n\
|
||||||
1 -1 2 -2 3 -3 4n -4n 0.5 -0.5 1 2 3 4 5 6 7 8\n\
|
1 -1 2 -2 3 -3 4n -4n 0.5 -0.5 1 2 3 4 5 6 7 8\n\
|
||||||
u8: 8\n\
|
u8: 8\n\
|
||||||
|
@ -85,12 +87,14 @@ fn basic() {
|
||||||
30\n\
|
30\n\
|
||||||
STORED_FUNCTION cleared\n\
|
STORED_FUNCTION cleared\n\
|
||||||
STORED_FUNCTION_2 cleared\n\
|
STORED_FUNCTION_2 cleared\n\
|
||||||
|
Thread safe call counter: 0\n\
|
||||||
|
logCallback\n\
|
||||||
|
Thread safe call counter: 1\n\
|
||||||
|
u8: 8\n\
|
||||||
Static u32: 42\n\
|
Static u32: 42\n\
|
||||||
Static i64: -1242464576485n\n\
|
Static i64: -1242464576485n\n\
|
||||||
Static ptr: true\n\
|
Static ptr: true\n\
|
||||||
Static ptr value: 42\n\
|
Static ptr value: 42\n\
|
||||||
After\n\
|
|
||||||
true\n\
|
|
||||||
Correct number of resources\n";
|
Correct number of resources\n";
|
||||||
assert_eq!(stdout, expected);
|
assert_eq!(stdout, expected);
|
||||||
assert_eq!(stderr, "");
|
assert_eq!(stderr, "");
|
||||||
|
@ -118,3 +122,35 @@ fn symbol_types() {
|
||||||
assert!(output.status.success());
|
assert!(output.status.success());
|
||||||
assert_eq!(stderr, "");
|
assert_eq!(stderr, "");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn thread_safe_callback() {
|
||||||
|
build();
|
||||||
|
|
||||||
|
let output = deno_cmd()
|
||||||
|
.arg("run")
|
||||||
|
.arg("--allow-ffi")
|
||||||
|
.arg("--allow-read")
|
||||||
|
.arg("--unstable")
|
||||||
|
.arg("--quiet")
|
||||||
|
.arg("tests/thread_safe_test.js")
|
||||||
|
.env("NO_COLOR", "1")
|
||||||
|
.output()
|
||||||
|
.unwrap();
|
||||||
|
let stdout = std::str::from_utf8(&output.stdout).unwrap();
|
||||||
|
let stderr = std::str::from_utf8(&output.stderr).unwrap();
|
||||||
|
if !output.status.success() {
|
||||||
|
println!("stdout {}", stdout);
|
||||||
|
println!("stderr {}", stderr);
|
||||||
|
}
|
||||||
|
println!("{:?}", output.status);
|
||||||
|
assert!(output.status.success());
|
||||||
|
let expected = "\
|
||||||
|
Callback on main thread\n\
|
||||||
|
Callback on worker thread\n\
|
||||||
|
Calling callback, isolate should stay asleep until callback is called\n\
|
||||||
|
Callback being called\n\
|
||||||
|
Isolate should now exit\n";
|
||||||
|
assert_eq!(stdout, expected);
|
||||||
|
assert_eq!(stderr, "");
|
||||||
|
}
|
||||||
|
|
|
@ -130,6 +130,12 @@ const dylib = Deno.dlopen(libPath, {
|
||||||
parameters: ["function"],
|
parameters: ["function"],
|
||||||
result: "void",
|
result: "void",
|
||||||
},
|
},
|
||||||
|
call_fn_ptr_thread_safe: {
|
||||||
|
name: "call_fn_ptr",
|
||||||
|
parameters: ["function"],
|
||||||
|
result: "void",
|
||||||
|
nonblocking: true,
|
||||||
|
},
|
||||||
call_fn_ptr_many_parameters: {
|
call_fn_ptr_many_parameters: {
|
||||||
parameters: ["function"],
|
parameters: ["function"],
|
||||||
result: "void",
|
result: "void",
|
||||||
|
@ -138,6 +144,11 @@ const dylib = Deno.dlopen(libPath, {
|
||||||
parameters: ["function"],
|
parameters: ["function"],
|
||||||
result: "void",
|
result: "void",
|
||||||
},
|
},
|
||||||
|
call_fn_ptr_return_u8_thread_safe: {
|
||||||
|
name: "call_fn_ptr_return_u8",
|
||||||
|
parameters: ["function"],
|
||||||
|
result: "void",
|
||||||
|
},
|
||||||
call_fn_ptr_return_buffer: {
|
call_fn_ptr_return_buffer: {
|
||||||
parameters: ["function"],
|
parameters: ["function"],
|
||||||
result: "void",
|
result: "void",
|
||||||
|
@ -292,15 +303,16 @@ console.log("After sleep_blocking");
|
||||||
console.log(performance.now() - start >= 100);
|
console.log(performance.now() - start >= 100);
|
||||||
|
|
||||||
start = performance.now();
|
start = performance.now();
|
||||||
dylib.symbols.sleep_nonblocking(100).then(() => {
|
const promise_2 = dylib.symbols.sleep_nonblocking(100).then(() => {
|
||||||
console.log("After");
|
console.log("After");
|
||||||
console.log(performance.now() - start >= 100);
|
console.log(performance.now() - start >= 100);
|
||||||
// Close after task is complete.
|
|
||||||
cleanup();
|
|
||||||
});
|
});
|
||||||
console.log("Before");
|
console.log("Before");
|
||||||
console.log(performance.now() - start < 100);
|
console.log(performance.now() - start < 100);
|
||||||
|
|
||||||
|
// Await to make sure `sleep_nonblocking` calls and logs before we proceed
|
||||||
|
await promise_2;
|
||||||
|
|
||||||
// Test calls with callback parameters
|
// Test calls with callback parameters
|
||||||
const logCallback = new Deno.UnsafeCallback(
|
const logCallback = new Deno.UnsafeCallback(
|
||||||
{ parameters: [], result: "void" },
|
{ parameters: [], result: "void" },
|
||||||
|
@ -376,6 +388,24 @@ dylib.symbols.store_function(ptr(nestedCallback));
|
||||||
dylib.symbols.store_function(null);
|
dylib.symbols.store_function(null);
|
||||||
dylib.symbols.store_function_2(null);
|
dylib.symbols.store_function_2(null);
|
||||||
|
|
||||||
|
let counter = 0;
|
||||||
|
const addToFooCallback = new Deno.UnsafeCallback({
|
||||||
|
parameters: [],
|
||||||
|
result: "void",
|
||||||
|
}, () => counter++);
|
||||||
|
|
||||||
|
// Test thread safe callbacks
|
||||||
|
console.log("Thread safe call counter:", counter);
|
||||||
|
addToFooCallback.ref();
|
||||||
|
await dylib.symbols.call_fn_ptr_thread_safe(ptr(addToFooCallback));
|
||||||
|
addToFooCallback.unref();
|
||||||
|
logCallback.ref();
|
||||||
|
await dylib.symbols.call_fn_ptr_thread_safe(ptr(logCallback));
|
||||||
|
logCallback.unref();
|
||||||
|
console.log("Thread safe call counter:", counter);
|
||||||
|
returnU8Callback.ref();
|
||||||
|
await dylib.symbols.call_fn_ptr_return_u8_thread_safe(ptr(returnU8Callback));
|
||||||
|
|
||||||
// Test statics
|
// Test statics
|
||||||
console.log("Static u32:", dylib.symbols.static_u32);
|
console.log("Static u32:", dylib.symbols.static_u32);
|
||||||
console.log("Static i64:", dylib.symbols.static_i64);
|
console.log("Static i64:", dylib.symbols.static_i64);
|
||||||
|
@ -386,7 +416,7 @@ console.log(
|
||||||
const view = new Deno.UnsafePointerView(dylib.symbols.static_ptr);
|
const view = new Deno.UnsafePointerView(dylib.symbols.static_ptr);
|
||||||
console.log("Static ptr value:", view.getUint32());
|
console.log("Static ptr value:", view.getUint32());
|
||||||
|
|
||||||
function cleanup() {
|
(function cleanup() {
|
||||||
dylib.close();
|
dylib.close();
|
||||||
throwCallback.close();
|
throwCallback.close();
|
||||||
logCallback.close();
|
logCallback.close();
|
||||||
|
@ -395,6 +425,7 @@ function cleanup() {
|
||||||
returnBufferCallback.close();
|
returnBufferCallback.close();
|
||||||
add10Callback.close();
|
add10Callback.close();
|
||||||
nestedCallback.close();
|
nestedCallback.close();
|
||||||
|
addToFooCallback.close();
|
||||||
|
|
||||||
const resourcesPost = Deno.resources();
|
const resourcesPost = Deno.resources();
|
||||||
|
|
||||||
|
@ -409,4 +440,4 @@ After: ${postStr}`,
|
||||||
}
|
}
|
||||||
|
|
||||||
console.log("Correct number of resources");
|
console.log("Correct number of resources");
|
||||||
}
|
})();
|
||||||
|
|
101
test_ffi/tests/thread_safe_test.js
Normal file
101
test_ffi/tests/thread_safe_test.js
Normal file
|
@ -0,0 +1,101 @@
|
||||||
|
// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license.
|
||||||
|
// deno-lint-ignore-file
|
||||||
|
|
||||||
|
const targetDir = Deno.execPath().replace(/[^\/\\]+$/, "");
|
||||||
|
const [libPrefix, libSuffix] = {
|
||||||
|
darwin: ["lib", "dylib"],
|
||||||
|
linux: ["lib", "so"],
|
||||||
|
windows: ["", "dll"],
|
||||||
|
}[Deno.build.os];
|
||||||
|
const libPath = `${targetDir}/${libPrefix}test_ffi.${libSuffix}`;
|
||||||
|
|
||||||
|
const resourcesPre = Deno.resources();
|
||||||
|
|
||||||
|
const dylib = Deno.dlopen(libPath, {
|
||||||
|
store_function: {
|
||||||
|
parameters: ["function"],
|
||||||
|
result: "void",
|
||||||
|
},
|
||||||
|
call_stored_function: {
|
||||||
|
parameters: [],
|
||||||
|
result: "void",
|
||||||
|
},
|
||||||
|
call_stored_function_thread_safe: {
|
||||||
|
parameters: [],
|
||||||
|
result: "void",
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
let resolveWorker;
|
||||||
|
let workerResponsePromise;
|
||||||
|
|
||||||
|
const worker = new Worker(
|
||||||
|
new URL("./thread_safe_test_worker.js", import.meta.url).href,
|
||||||
|
{ type: "module" },
|
||||||
|
);
|
||||||
|
|
||||||
|
worker.addEventListener("message", () => {
|
||||||
|
if (resolveWorker) {
|
||||||
|
resolveWorker();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
const sendWorkerMessage = async (data) => {
|
||||||
|
workerResponsePromise = new Promise((res) => {
|
||||||
|
resolveWorker = res;
|
||||||
|
});
|
||||||
|
worker.postMessage(data);
|
||||||
|
await workerResponsePromise;
|
||||||
|
};
|
||||||
|
|
||||||
|
// Test step 1: Register main thread callback, trigger on worker thread
|
||||||
|
|
||||||
|
const mainThreadCallback = new Deno.UnsafeCallback(
|
||||||
|
{ parameters: [], result: "void" },
|
||||||
|
() => {
|
||||||
|
console.log("Callback on main thread");
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
|
mainThreadCallback.ref();
|
||||||
|
|
||||||
|
dylib.symbols.store_function(mainThreadCallback.pointer);
|
||||||
|
|
||||||
|
await sendWorkerMessage("call");
|
||||||
|
|
||||||
|
// Test step 2: Register on worker thread, trigger on main thread
|
||||||
|
|
||||||
|
await sendWorkerMessage("register");
|
||||||
|
|
||||||
|
dylib.symbols.call_stored_function();
|
||||||
|
|
||||||
|
// Unref both main and worker thread callbacks and terminate the wrorker: Note, the stored function pointer in lib is now dangling.
|
||||||
|
|
||||||
|
mainThreadCallback.unref();
|
||||||
|
await sendWorkerMessage("unref");
|
||||||
|
worker.terminate();
|
||||||
|
|
||||||
|
// Test step 3: Register a callback that will be the only thing left keeping the isolate from exiting.
|
||||||
|
// Rely on it to keep Deno running until the callback comes in and unrefs the callback, after which Deno should exit.
|
||||||
|
|
||||||
|
const cleanupCallback = new Deno.UnsafeCallback(
|
||||||
|
{ parameters: [], result: "void" },
|
||||||
|
() => {
|
||||||
|
console.log("Callback being called");
|
||||||
|
Promise.resolve().then(() => cleanup());
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
|
cleanupCallback.ref();
|
||||||
|
|
||||||
|
function cleanup() {
|
||||||
|
cleanupCallback.unref();
|
||||||
|
console.log("Isolate should now exit");
|
||||||
|
}
|
||||||
|
|
||||||
|
dylib.symbols.store_function(cleanupCallback.pointer);
|
||||||
|
|
||||||
|
console.log(
|
||||||
|
"Calling callback, isolate should stay asleep until callback is called",
|
||||||
|
);
|
||||||
|
dylib.symbols.call_stored_function_thread_safe();
|
41
test_ffi/tests/thread_safe_test_worker.js
Normal file
41
test_ffi/tests/thread_safe_test_worker.js
Normal file
|
@ -0,0 +1,41 @@
|
||||||
|
// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license.
|
||||||
|
// deno-lint-ignore-file
|
||||||
|
|
||||||
|
const targetDir = Deno.execPath().replace(/[^\/\\]+$/, "");
|
||||||
|
const [libPrefix, libSuffix] = {
|
||||||
|
darwin: ["lib", "dylib"],
|
||||||
|
linux: ["lib", "so"],
|
||||||
|
windows: ["", "dll"],
|
||||||
|
}[Deno.build.os];
|
||||||
|
const libPath = `${targetDir}/${libPrefix}test_ffi.${libSuffix}`;
|
||||||
|
|
||||||
|
const dylib = Deno.dlopen(libPath, {
|
||||||
|
store_function: {
|
||||||
|
parameters: ["function"],
|
||||||
|
result: "void",
|
||||||
|
},
|
||||||
|
call_stored_function: {
|
||||||
|
parameters: [],
|
||||||
|
result: "void",
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
const callback = new Deno.UnsafeCallback(
|
||||||
|
{ parameters: [], result: "void" },
|
||||||
|
() => {
|
||||||
|
console.log("Callback on worker thread");
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
|
callback.ref();
|
||||||
|
|
||||||
|
self.addEventListener("message", ({ data }) => {
|
||||||
|
if (data === "register") {
|
||||||
|
dylib.symbols.store_function(callback.pointer);
|
||||||
|
} else if (data === "call") {
|
||||||
|
dylib.symbols.call_stored_function();
|
||||||
|
} else if (data === "unref") {
|
||||||
|
callback.unref();
|
||||||
|
}
|
||||||
|
self.postMessage("done");
|
||||||
|
});
|
Loading…
Reference in a new issue