diff --git a/cli/dts/lib.deno.unstable.d.ts b/cli/dts/lib.deno.unstable.d.ts index a25c1011e8..6eb7153bad 100644 --- a/cli/dts/lib.deno.unstable.d.ts +++ b/cli/dts/lib.deno.unstable.d.ts @@ -831,20 +831,22 @@ declare namespace Deno { >; /** - * Adds one to this callback's reference counting. + * Adds one to this callback's reference counting and returns the + * new reference count. * * If the callback's reference count becomes non-zero, it will keep * Deno's process from exiting. */ - ref(): void; + ref(): number; /** - * Removes one from this callback's reference counting. + * Removes one from this callback's reference counting and returns + * the new reference count. * * If the callback's reference counter becomes zero, it will no longer * keep Deno's process from exiting. */ - unref(): void; + unref(): number; /** * Removes the C function pointer associated with the UnsafeCallback. diff --git a/ext/ffi/00_ffi.js b/ext/ffi/00_ffi.js index 30a02a6095..3f5c57a9c4 100644 --- a/ext/ffi/00_ffi.js +++ b/ext/ffi/00_ffi.js @@ -184,6 +184,8 @@ class UnsafeCallback { #refcount; + // Internal promise only meant to keep Deno from exiting + #refpromise; #rid; definition; callback; @@ -208,23 +210,25 @@ ref() { if (this.#refcount++ === 0) { - ops.op_ffi_unsafe_callback_ref(true); + this.#refpromise = core.opAsync( + "op_ffi_unsafe_callback_ref", + this.#rid, + ); } + return this.#refcount; } unref() { // Only decrement refcount if it is positive, and only // unref the callback if refcount reaches zero. if (this.#refcount > 0 && --this.#refcount === 0) { - ops.op_ffi_unsafe_callback_ref(false); + ops.op_ffi_unsafe_callback_unref(this.#rid); } + return this.#refcount; } close() { - if (this.#refcount) { - this.#refcount = 0; - ops.op_ffi_unsafe_callback_ref(false); - } + this.#refcount = 0; core.close(this.#rid); } } diff --git a/ext/ffi/lib.rs b/ext/ffi/lib.rs index 9342abf6bb..183fd9214f 100644 --- a/ext/ffi/lib.rs +++ b/ext/ffi/lib.rs @@ -13,6 +13,8 @@ use deno_core::op; use deno_core::serde_json::Value; use deno_core::serde_v8; use deno_core::v8; +use deno_core::CancelFuture; +use deno_core::CancelHandle; use deno_core::Extension; use deno_core::OpState; use deno_core::Resource; @@ -26,14 +28,18 @@ use std::cell::RefCell; use std::collections::HashMap; use std::ffi::c_void; use std::ffi::CStr; +use std::future::IntoFuture; use std::mem::size_of; use std::os::raw::c_char; use std::os::raw::c_short; use std::path::Path; use std::path::PathBuf; +use std::pin::Pin; use std::ptr; use std::rc::Rc; use std::sync::mpsc::sync_channel; +use std::task::Poll; +use std::task::Waker; mod fast_call; @@ -156,7 +162,6 @@ type PendingFfiAsyncWork = Box; struct FfiState { async_work_sender: mpsc::UnboundedSender, async_work_receiver: mpsc::UnboundedReceiver, - active_refed_functions: usize, } pub fn init(unstable: bool) -> Extension { @@ -188,6 +193,7 @@ pub fn init(unstable: bool) -> Extension { op_ffi_read_f64::decl::

(), op_ffi_unsafe_callback_create::decl::

(), op_ffi_unsafe_callback_ref::decl(), + op_ffi_unsafe_callback_unref::decl(), ]) .event_loop_middleware(|op_state_rc, _cx| { // FFI callbacks coming in from other threads will call in and get queued. @@ -207,10 +213,6 @@ pub fn init(unstable: bool) -> Extension { maybe_scheduling = true; } - if ffi_state.active_refed_functions > 0 { - maybe_scheduling = true; - } - drop(op_state); } while let Some(async_work_fut) = work_items.pop() { @@ -227,7 +229,6 @@ pub fn init(unstable: bool) -> Extension { mpsc::unbounded::(); state.put(FfiState { - active_refed_functions: 0, async_work_receiver, async_work_sender, }); @@ -1320,11 +1321,12 @@ fn ffi_call( } struct UnsafeCallbackResource { + cancel: Rc, // Closure is never directly touched, but it keeps the C callback alive // until `close()` method is called. #[allow(dead_code)] closure: libffi::middle::Closure<'static>, - info: *const CallbackInfo, + info: *mut CallbackInfo, } impl Resource for UnsafeCallbackResource { @@ -1333,15 +1335,16 @@ impl Resource for UnsafeCallbackResource { } fn close(self: Rc) { + self.cancel.cancel(); // SAFETY: This drops the closure and the callback info associated with it. // Any retained function pointers to the closure become dangling pointers. // It is up to the user to know that it is safe to call the `close()` on the // UnsafeCallback instance. unsafe { - let info = Box::from_raw(self.info as *mut CallbackInfo); + let info = Box::from_raw(self.info); let isolate = info.isolate.as_mut().unwrap(); - v8::Global::from_raw(isolate, info.callback); - v8::Global::from_raw(isolate, info.context); + let _ = v8::Global::from_raw(isolate, info.callback); + let _ = v8::Global::from_raw(isolate, info.context); } } } @@ -1353,6 +1356,7 @@ struct CallbackInfo { pub callback: NonNull, pub context: NonNull, pub isolate: *mut v8::Isolate, + pub waker: Option, } unsafe extern "C" fn deno_ffi_callback( @@ -1376,6 +1380,10 @@ unsafe extern "C" fn deno_ffi_callback( response_sender.send(()).unwrap(); }); async_work_sender.unbounded_send(fut).unwrap(); + if let Some(waker) = info.waker.as_ref() { + // Make sure event loop wakes up to receive our message before we start waiting for a response. + waker.wake_by_ref(); + } response_receiver.recv().unwrap(); } }); @@ -1725,22 +1733,30 @@ where let current_context = scope.get_current_context(); let context = v8::Global::new(scope, current_context).into_raw(); - let info = Box::leak(Box::new(CallbackInfo { + let info: *mut CallbackInfo = Box::leak(Box::new(CallbackInfo { parameters: args.parameters.clone(), result: args.result, async_work_sender, callback, context, isolate, + waker: None, })); let cif = Cif::new( args.parameters.into_iter().map(libffi::middle::Type::from), libffi::middle::Type::from(args.result), ); - let closure = libffi::middle::Closure::new(cif, deno_ffi_callback, info); + // SAFETY: CallbackInfo is leaked, is not null and stays valid as long as the callback exists. + let closure = libffi::middle::Closure::new(cif, deno_ffi_callback, unsafe { + info.as_ref().unwrap() + }); let ptr = *closure.code_ptr() as usize; - let resource = UnsafeCallbackResource { closure, info }; + let resource = UnsafeCallbackResource { + cancel: CancelHandle::new_rc(), + closure, + info, + }; let rid = state.resource_table.add(resource); let rid_local = v8::Integer::new_from_unsigned(scope, rid); @@ -1790,17 +1806,53 @@ where Ok(result) } -#[op] -fn op_ffi_unsafe_callback_ref(state: &mut deno_core::OpState, inc_dec: bool) { - check_unstable(state, "Deno.dlopen"); - let ffi_state = state.borrow_mut::(); - if inc_dec { - ffi_state.active_refed_functions += 1; - } else { - ffi_state.active_refed_functions -= 1; +impl Future for CallbackInfo { + type Output = (); + fn poll( + mut self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll { + // Always replace the waker to make sure it's bound to the proper Future. + self.waker.replace(cx.waker().clone()); + // The future for the CallbackInfo never resolves: It can only be canceled. + Poll::Pending } } +#[op] +fn op_ffi_unsafe_callback_ref( + state: &mut deno_core::OpState, + rid: ResourceId, +) -> Result>, AnyError> { + let callback_resource = + state.resource_table.get::(rid)?; + + Ok(async move { + let info: &mut CallbackInfo = + // SAFETY: CallbackInfo pointer stays valid as long as the resource is still alive. + unsafe { callback_resource.info.as_mut().unwrap() }; + // Ignore cancellation rejection + let _ = info + .into_future() + .or_cancel(callback_resource.cancel.clone()) + .await; + Ok(()) + }) +} + +#[op(fast)] +fn op_ffi_unsafe_callback_unref( + state: &mut deno_core::OpState, + rid: u32, +) -> Result<(), AnyError> { + state + .resource_table + .get::(rid)? + .cancel + .cancel(); + Ok(()) +} + #[op(v8)] fn op_ffi_call_ptr_nonblocking<'scope, FP>( scope: &mut v8::HandleScope<'scope>, diff --git a/test_ffi/tests/test.js b/test_ffi/tests/test.js index 6bf3c47f82..0af94b9065 100644 --- a/test_ffi/tests/test.js +++ b/test_ffi/tests/test.js @@ -515,6 +515,7 @@ logCallback.unref(); console.log("Thread safe call counter:", counter); returnU8Callback.ref(); await dylib.symbols.call_fn_ptr_return_u8_thread_safe(returnU8Callback.pointer); +// Purposefully do not unref returnU8Callback: Instead use it to test close() unrefing. // Test statics console.log("Static u32:", dylib.symbols.static_u32); @@ -585,7 +586,7 @@ After: ${postStr}`, })(); function assertIsOptimized(fn) { - const status = % GetOptimizationStatus(fn); + const status = %GetOptimizationStatus(fn); assert(status & (1 << 4), `expected ${fn.name} to be optimized, but wasn't`); }