diff --git a/cli/dts/lib.deno.unstable.d.ts b/cli/dts/lib.deno.unstable.d.ts index ab6f8634f6..ac27648fe0 100644 --- a/cli/dts/lib.deno.unstable.d.ts +++ b/cli/dts/lib.deno.unstable.d.ts @@ -565,6 +565,9 @@ declare namespace Deno { * as C function pointers to ffi calls. * * The function pointer remains valid until the `close()` method is called. + * + * The callback can be explicitly ref'ed and deref'ed to stop Deno's + * process from exiting. */ export class UnsafeCallback< Definition extends UnsafeCallbackDefinition = UnsafeCallbackDefinition, @@ -584,6 +587,30 @@ declare namespace Deno { Definition["result"] >; + /** + * Adds one to this callback's reference counting. + * + * If the callback's reference count becomes non-zero, it will keep + * Deno's process from exiting. + */ + ref(): void; + + /** + * Removes one from this callback's reference counting. + * + * If the callback's reference counter becomes zero, it will no longer + * keep Deno's process from exiting. + */ + unref(): void; + + /** + * Removes the C function pointer associated with the UnsafeCallback. + * Continuing to use the instance after calling this object will lead to errors + * and crashes. + * + * Calling this method will also immediately set the callback's reference + * counting to zero and it will no longer keep Deno's process from exiting. + */ close(): void; } diff --git a/core/examples/schedule_task.rs b/core/examples/schedule_task.rs index 7812dcb497..bd4bcb028d 100644 --- a/core/examples/schedule_task.rs +++ b/core/examples/schedule_task.rs @@ -20,7 +20,8 @@ type Task = Box; fn main() { let my_ext = Extension::builder() .ops(vec![op_schedule_task::decl()]) - .event_loop_middleware(|state, cx| { + .event_loop_middleware(|state_rc, cx| { + let mut state = state_rc.borrow_mut(); let recv = state.borrow_mut::>(); let mut ref_loop = false; while let Poll::Ready(Some(call)) = recv.poll_next_unpin(cx) { diff --git a/core/extensions.rs b/core/extensions.rs index 6829871242..ce69578752 100644 --- a/core/extensions.rs +++ b/core/extensions.rs @@ -1,13 +1,13 @@ // Copyright 2018-2022 the Deno authors. All rights reserved. MIT license. use crate::OpState; use anyhow::Error; -use std::task::Context; +use std::{cell::RefCell, rc::Rc, task::Context}; pub type SourcePair = (&'static str, &'static str); pub type OpFnRef = v8::FunctionCallback; pub type OpMiddlewareFn = dyn Fn(OpDecl) -> OpDecl; pub type OpStateFn = dyn Fn(&mut OpState) -> Result<(), Error>; -pub type OpEventLoopFn = dyn Fn(&mut OpState, &mut Context) -> bool; +pub type OpEventLoopFn = dyn Fn(Rc>, &mut Context) -> bool; #[derive(Clone, Copy)] pub struct OpDecl { @@ -90,13 +90,13 @@ impl Extension { pub fn run_event_loop_middleware( &self, - op_state: &mut OpState, + op_state_rc: Rc>, cx: &mut Context, ) -> bool { self .event_loop_middleware .as_ref() - .map(|f| f(op_state, cx)) + .map(|f| f(op_state_rc, cx)) .unwrap_or(false) } @@ -148,7 +148,7 @@ impl ExtensionBuilder { pub fn event_loop_middleware(&mut self, middleware_fn: F) -> &mut Self where - F: Fn(&mut OpState, &mut Context) -> bool + 'static, + F: Fn(Rc>, &mut Context) -> bool + 'static, { self.event_loop_middleware = Some(Box::new(middleware_fn)); self diff --git a/core/runtime.rs b/core/runtime.rs index 7cb556fd3f..23fe730133 100644 --- a/core/runtime.rs +++ b/core/runtime.rs @@ -921,7 +921,7 @@ impl JsRuntime { let state = state_rc.borrow(); let op_state = state.op_state.clone(); for f in &self.event_loop_middlewares { - if f(&mut op_state.borrow_mut(), cx) { + if f(op_state.clone(), cx) { maybe_scheduling = true; } } diff --git a/ext/ffi/00_ffi.js b/ext/ffi/00_ffi.js index 1908733076..f308ecad9c 100644 --- a/ext/ffi/00_ffi.js +++ b/ext/ffi/00_ffi.js @@ -201,6 +201,7 @@ } class UnsafeCallback { + #refcount; #rid; definition; callback; @@ -217,13 +218,30 @@ definition, callback, ); + this.#refcount = 0; this.#rid = rid; this.pointer = pointer; this.definition = definition; this.callback = callback; } + ref() { + if (this.#refcount++ === 0) { + core.opSync("op_ffi_unsafe_callback_ref", true); + } + } + + unref() { + if (--this.#refcount === 0) { + core.opSync("op_ffi_unsafe_callback_ref", false); + } + } + close() { + if (this.#refcount) { + this.#refcount = 0; + core.opSync("op_ffi_unsafe_callback_ref", false); + } core.close(this.#rid); } } diff --git a/ext/ffi/lib.rs b/ext/ffi/lib.rs index 41b2f0a3b6..d90f20a29f 100644 --- a/ext/ffi/lib.rs +++ b/ext/ffi/lib.rs @@ -6,9 +6,11 @@ use deno_core::error::generic_error; use deno_core::error::range_error; use deno_core::error::type_error; use deno_core::error::AnyError; +use deno_core::futures::channel::mpsc; use deno_core::futures::Future; use deno_core::include_js_files; use deno_core::op; +use std::sync::mpsc::sync_channel; use deno_core::serde_json::json; use deno_core::serde_json::Value; @@ -37,7 +39,7 @@ use std::ptr; use std::rc::Rc; thread_local! { - static IS_ISOLATE_THREAD: RefCell = RefCell::new(false); + static LOCAL_ISOLATE_POINTER: RefCell<*const v8::Isolate> = RefCell::new(ptr::null()); } pub struct Unstable(pub bool); @@ -122,7 +124,6 @@ impl DynamicLibraryResource { name: String, foreign_fn: ForeignFunction, ) -> Result<(), AnyError> { - IS_ISOLATE_THREAD.with(|s| s.replace(true)); let symbol = match &foreign_fn.name { Some(symbol) => symbol, None => &name, @@ -178,6 +179,14 @@ impl DynamicLibraryResource { } } +type PendingFfiAsyncWork = Box; + +struct FfiState { + async_work_sender: mpsc::UnboundedSender, + async_work_receiver: mpsc::UnboundedReceiver, + active_refed_functions: usize, +} + pub fn init(unstable: bool) -> Extension { Extension::builder() .js(include_js_files!( @@ -204,10 +213,51 @@ pub fn init(unstable: bool) -> Extension { op_ffi_read_f32::decl::

(), op_ffi_read_f64::decl::

(), op_ffi_unsafe_callback_create::decl::

(), + op_ffi_unsafe_callback_ref::decl(), ]) + .event_loop_middleware(|op_state_rc, _cx| { + // FFI callbacks coming in from other threads will call in and get queued. + let mut maybe_scheduling = false; + + let mut work_items: Vec = vec![]; + + { + let mut op_state = op_state_rc.borrow_mut(); + let ffi_state = op_state.borrow_mut::(); + + while let Ok(Some(async_work_fut)) = + ffi_state.async_work_receiver.try_next() + { + // Move received items to a temporary vector so that we can drop the `op_state` borrow before we do the work. + work_items.push(async_work_fut); + maybe_scheduling = true; + } + + if ffi_state.active_refed_functions > 0 { + maybe_scheduling = true; + } + + drop(op_state); + } + while let Some(async_work_fut) = work_items.pop() { + async_work_fut(); + } + + maybe_scheduling + }) .state(move |state| { // Stolen from deno_webgpu, is there a better option? state.put(Unstable(unstable)); + + let (async_work_sender, async_work_receiver) = + mpsc::unbounded::(); + + state.put(FfiState { + active_refed_functions: 0, + async_work_receiver, + async_work_sender, + }); + Ok(()) }) .build() @@ -831,6 +881,7 @@ impl Resource for UnsafeCallbackResource { } struct CallbackInfo { + pub async_work_sender: mpsc::UnboundedSender, pub callback: NonNull, pub context: NonNull, pub isolate: *mut v8::Isolate, @@ -842,21 +893,55 @@ unsafe extern "C" fn deno_ffi_callback( args: *const *const c_void, info: &CallbackInfo, ) { - let isolate = &mut *info.isolate; - let callback = v8::Global::from_raw(isolate, info.callback); + LOCAL_ISOLATE_POINTER.with(|s| { + if ptr::eq(*s.borrow(), info.isolate) { + // Own isolate thread, okay to call directly + do_ffi_callback( + cif, + result, + args, + info.callback, + info.context, + info.isolate, + ); + } else { + let async_work_sender = &info.async_work_sender; + // SAFETY: Safe as this function blocks until `do_ffi_callback` completes and a response message is received. + let cif: &'static libffi::low::ffi_cif = std::mem::transmute(cif); + let result: &'static mut c_void = std::mem::transmute(result); + let info: &'static CallbackInfo = std::mem::transmute(info); + let (response_sender, response_receiver) = sync_channel::<()>(0); + let fut = Box::new(move || { + do_ffi_callback( + cif, + result, + args, + info.callback, + info.context, + info.isolate, + ); + response_sender.send(()).unwrap(); + }); + async_work_sender.unbounded_send(fut).unwrap(); + response_receiver.recv().unwrap(); + } + }); +} + +unsafe fn do_ffi_callback( + cif: &libffi::low::ffi_cif, + result: &mut c_void, + args: *const *const c_void, + callback: NonNull, + context: NonNull, + isolate: *mut v8::Isolate, +) { + let isolate = &mut *isolate; + let callback = v8::Global::from_raw(isolate, callback); let context = std::mem::transmute::< NonNull, v8::Local, - >(info.context); - IS_ISOLATE_THREAD.with(|is_event_loop_thread| { - if !(*is_event_loop_thread.borrow()) { - // Call from another thread, not yet supported. - eprintln!( - "Calling Deno FFI's callbacks from other threads is not supported" - ); - std::process::exit(1); - } - }); + >(context); // Call from main thread. If this callback is being triggered due to a // function call coming from Deno itself, then this callback will build // ontop of that stack. @@ -1096,11 +1181,20 @@ where let cb = v8::Local::::try_from(v8_value)?; let isolate: *mut v8::Isolate = &mut *scope as &mut v8::Isolate; + LOCAL_ISOLATE_POINTER.with(|s| { + if s.borrow().is_null() { + s.replace(isolate); + } + }); + + let async_work_sender = + state.borrow_mut::().async_work_sender.clone(); let callback = v8::Global::new(scope, cb).into_raw(); let current_context = scope.get_current_context(); let context = v8::Global::new(scope, current_context).into_raw(); let info = Box::leak(Box::new(CallbackInfo { + async_work_sender, callback, context, isolate, @@ -1158,6 +1252,16 @@ where Ok(result) } +#[op] +fn op_ffi_unsafe_callback_ref(state: &mut deno_core::OpState, inc_dec: bool) { + let ffi_state = state.borrow_mut::(); + if inc_dec { + ffi_state.active_refed_functions += 1; + } else { + ffi_state.active_refed_functions -= 1; + } +} + #[op(v8)] fn op_ffi_call_ptr_nonblocking<'scope, FP>( scope: &mut v8::HandleScope<'scope>, diff --git a/test_ffi/src/lib.rs b/test_ffi/src/lib.rs index b4908d9cd7..d6f29cbb89 100644 --- a/test_ffi/src/lib.rs +++ b/test_ffi/src/lib.rs @@ -211,6 +211,19 @@ pub extern "C" fn call_stored_function_2(arg: u8) { } } +#[no_mangle] +pub extern "C" fn call_stored_function_thread_safe() { + std::thread::spawn(move || { + std::thread::sleep(std::time::Duration::from_millis(1500)); + unsafe { + if STORED_FUNCTION.is_none() { + return; + } + STORED_FUNCTION.unwrap()(); + } + }); +} + // FFI performance helper functions #[no_mangle] pub extern "C" fn nop() {} diff --git a/test_ffi/tests/integration_tests.rs b/test_ffi/tests/integration_tests.rs index 77bcc758e3..35f37aa144 100644 --- a/test_ffi/tests/integration_tests.rs +++ b/test_ffi/tests/integration_tests.rs @@ -77,6 +77,8 @@ fn basic() { true\n\ Before\n\ true\n\ + After\n\ + true\n\ logCallback\n\ 1 -1 2 -2 3 -3 4n -4n 0.5 -0.5 1 2 3 4 5 6 7 8\n\ u8: 8\n\ @@ -85,12 +87,14 @@ fn basic() { 30\n\ STORED_FUNCTION cleared\n\ STORED_FUNCTION_2 cleared\n\ + Thread safe call counter: 0\n\ + logCallback\n\ + Thread safe call counter: 1\n\ + u8: 8\n\ Static u32: 42\n\ Static i64: -1242464576485n\n\ Static ptr: true\n\ Static ptr value: 42\n\ - After\n\ - true\n\ Correct number of resources\n"; assert_eq!(stdout, expected); assert_eq!(stderr, ""); @@ -118,3 +122,35 @@ fn symbol_types() { assert!(output.status.success()); assert_eq!(stderr, ""); } + +#[test] +fn thread_safe_callback() { + build(); + + let output = deno_cmd() + .arg("run") + .arg("--allow-ffi") + .arg("--allow-read") + .arg("--unstable") + .arg("--quiet") + .arg("tests/thread_safe_test.js") + .env("NO_COLOR", "1") + .output() + .unwrap(); + let stdout = std::str::from_utf8(&output.stdout).unwrap(); + let stderr = std::str::from_utf8(&output.stderr).unwrap(); + if !output.status.success() { + println!("stdout {}", stdout); + println!("stderr {}", stderr); + } + println!("{:?}", output.status); + assert!(output.status.success()); + let expected = "\ + Callback on main thread\n\ + Callback on worker thread\n\ + Calling callback, isolate should stay asleep until callback is called\n\ + Callback being called\n\ + Isolate should now exit\n"; + assert_eq!(stdout, expected); + assert_eq!(stderr, ""); +} diff --git a/test_ffi/tests/test.js b/test_ffi/tests/test.js index ab31dcb836..03c166a7c4 100644 --- a/test_ffi/tests/test.js +++ b/test_ffi/tests/test.js @@ -130,6 +130,12 @@ const dylib = Deno.dlopen(libPath, { parameters: ["function"], result: "void", }, + call_fn_ptr_thread_safe: { + name: "call_fn_ptr", + parameters: ["function"], + result: "void", + nonblocking: true, + }, call_fn_ptr_many_parameters: { parameters: ["function"], result: "void", @@ -138,6 +144,11 @@ const dylib = Deno.dlopen(libPath, { parameters: ["function"], result: "void", }, + call_fn_ptr_return_u8_thread_safe: { + name: "call_fn_ptr_return_u8", + parameters: ["function"], + result: "void", + }, call_fn_ptr_return_buffer: { parameters: ["function"], result: "void", @@ -292,15 +303,16 @@ console.log("After sleep_blocking"); console.log(performance.now() - start >= 100); start = performance.now(); -dylib.symbols.sleep_nonblocking(100).then(() => { +const promise_2 = dylib.symbols.sleep_nonblocking(100).then(() => { console.log("After"); console.log(performance.now() - start >= 100); - // Close after task is complete. - cleanup(); }); console.log("Before"); console.log(performance.now() - start < 100); +// Await to make sure `sleep_nonblocking` calls and logs before we proceed +await promise_2; + // Test calls with callback parameters const logCallback = new Deno.UnsafeCallback( { parameters: [], result: "void" }, @@ -376,6 +388,24 @@ dylib.symbols.store_function(ptr(nestedCallback)); dylib.symbols.store_function(null); dylib.symbols.store_function_2(null); +let counter = 0; +const addToFooCallback = new Deno.UnsafeCallback({ + parameters: [], + result: "void", +}, () => counter++); + +// Test thread safe callbacks +console.log("Thread safe call counter:", counter); +addToFooCallback.ref(); +await dylib.symbols.call_fn_ptr_thread_safe(ptr(addToFooCallback)); +addToFooCallback.unref(); +logCallback.ref(); +await dylib.symbols.call_fn_ptr_thread_safe(ptr(logCallback)); +logCallback.unref(); +console.log("Thread safe call counter:", counter); +returnU8Callback.ref(); +await dylib.symbols.call_fn_ptr_return_u8_thread_safe(ptr(returnU8Callback)); + // Test statics console.log("Static u32:", dylib.symbols.static_u32); console.log("Static i64:", dylib.symbols.static_i64); @@ -386,7 +416,7 @@ console.log( const view = new Deno.UnsafePointerView(dylib.symbols.static_ptr); console.log("Static ptr value:", view.getUint32()); -function cleanup() { +(function cleanup() { dylib.close(); throwCallback.close(); logCallback.close(); @@ -395,6 +425,7 @@ function cleanup() { returnBufferCallback.close(); add10Callback.close(); nestedCallback.close(); + addToFooCallback.close(); const resourcesPost = Deno.resources(); @@ -409,4 +440,4 @@ After: ${postStr}`, } console.log("Correct number of resources"); -} +})(); diff --git a/test_ffi/tests/thread_safe_test.js b/test_ffi/tests/thread_safe_test.js new file mode 100644 index 0000000000..e541140550 --- /dev/null +++ b/test_ffi/tests/thread_safe_test.js @@ -0,0 +1,101 @@ +// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license. +// deno-lint-ignore-file + +const targetDir = Deno.execPath().replace(/[^\/\\]+$/, ""); +const [libPrefix, libSuffix] = { + darwin: ["lib", "dylib"], + linux: ["lib", "so"], + windows: ["", "dll"], +}[Deno.build.os]; +const libPath = `${targetDir}/${libPrefix}test_ffi.${libSuffix}`; + +const resourcesPre = Deno.resources(); + +const dylib = Deno.dlopen(libPath, { + store_function: { + parameters: ["function"], + result: "void", + }, + call_stored_function: { + parameters: [], + result: "void", + }, + call_stored_function_thread_safe: { + parameters: [], + result: "void", + }, +}); + +let resolveWorker; +let workerResponsePromise; + +const worker = new Worker( + new URL("./thread_safe_test_worker.js", import.meta.url).href, + { type: "module" }, +); + +worker.addEventListener("message", () => { + if (resolveWorker) { + resolveWorker(); + } +}); + +const sendWorkerMessage = async (data) => { + workerResponsePromise = new Promise((res) => { + resolveWorker = res; + }); + worker.postMessage(data); + await workerResponsePromise; +}; + +// Test step 1: Register main thread callback, trigger on worker thread + +const mainThreadCallback = new Deno.UnsafeCallback( + { parameters: [], result: "void" }, + () => { + console.log("Callback on main thread"); + }, +); + +mainThreadCallback.ref(); + +dylib.symbols.store_function(mainThreadCallback.pointer); + +await sendWorkerMessage("call"); + +// Test step 2: Register on worker thread, trigger on main thread + +await sendWorkerMessage("register"); + +dylib.symbols.call_stored_function(); + +// Unref both main and worker thread callbacks and terminate the wrorker: Note, the stored function pointer in lib is now dangling. + +mainThreadCallback.unref(); +await sendWorkerMessage("unref"); +worker.terminate(); + +// Test step 3: Register a callback that will be the only thing left keeping the isolate from exiting. +// Rely on it to keep Deno running until the callback comes in and unrefs the callback, after which Deno should exit. + +const cleanupCallback = new Deno.UnsafeCallback( + { parameters: [], result: "void" }, + () => { + console.log("Callback being called"); + Promise.resolve().then(() => cleanup()); + }, +); + +cleanupCallback.ref(); + +function cleanup() { + cleanupCallback.unref(); + console.log("Isolate should now exit"); +} + +dylib.symbols.store_function(cleanupCallback.pointer); + +console.log( + "Calling callback, isolate should stay asleep until callback is called", +); +dylib.symbols.call_stored_function_thread_safe(); diff --git a/test_ffi/tests/thread_safe_test_worker.js b/test_ffi/tests/thread_safe_test_worker.js new file mode 100644 index 0000000000..0670044698 --- /dev/null +++ b/test_ffi/tests/thread_safe_test_worker.js @@ -0,0 +1,41 @@ +// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license. +// deno-lint-ignore-file + +const targetDir = Deno.execPath().replace(/[^\/\\]+$/, ""); +const [libPrefix, libSuffix] = { + darwin: ["lib", "dylib"], + linux: ["lib", "so"], + windows: ["", "dll"], +}[Deno.build.os]; +const libPath = `${targetDir}/${libPrefix}test_ffi.${libSuffix}`; + +const dylib = Deno.dlopen(libPath, { + store_function: { + parameters: ["function"], + result: "void", + }, + call_stored_function: { + parameters: [], + result: "void", + }, +}); + +const callback = new Deno.UnsafeCallback( + { parameters: [], result: "void" }, + () => { + console.log("Callback on worker thread"); + }, +); + +callback.ref(); + +self.addEventListener("message", ({ data }) => { + if (data === "register") { + dylib.symbols.store_function(callback.pointer); + } else if (data === "call") { + dylib.symbols.call_stored_function(); + } else if (data === "unref") { + callback.unref(); + } + self.postMessage("done"); +});