1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2025-01-11 16:42:21 -05:00

fix(ext/ffi): Fix re-ref'ing UnsafeCallback (#17704)

This commit is contained in:
Aapo Alasuutari 2023-02-22 21:09:59 +02:00 committed by GitHub
parent 9b8992d4b4
commit 0f9daaeacb
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 89 additions and 41 deletions

View file

@ -97,7 +97,6 @@ declare namespace Deno {
/** **UNSTABLE**: New API, yet to be vetted. /** **UNSTABLE**: New API, yet to be vetted.
* *
* The native struct type for interfacing with foreign functions. * The native struct type for interfacing with foreign functions.
*
*/ */
type NativeStructType = { readonly struct: readonly NativeType[] }; type NativeStructType = { readonly struct: readonly NativeType[] };
@ -512,46 +511,80 @@ declare namespace Deno {
* *
* The function pointer remains valid until the `close()` method is called. * The function pointer remains valid until the `close()` method is called.
* *
* The callback can be explicitly referenced via `ref()` and dereferenced via * All `UnsafeCallback` are always thread safe in that they can be called from
* `deref()` to stop Deno's process from exiting. * foreign threads without crashing. However, they do not wake up the Deno event
* loop by default.
*
* If a callback is to be called from foreign threads, use the `threadSafe()`
* static constructor or explicitly call `ref()` to have the callback wake up
* the Deno event loop when called from foreign threads. This also stops
* Deno's process from exiting while the callback still exists and is not
* unref'ed.
*
* Use `deref()` to then allow Deno's process to exit. Calling `deref()` on
* a ref'ed callback does not stop it from waking up the Deno event loop when
* called from foreign threads.
* *
* @category FFI * @category FFI
*/ */
export class UnsafeCallback< export class UnsafeCallback<
Definition extends UnsafeCallbackDefinition = UnsafeCallbackDefinition, Definition extends UnsafeCallbackDefinition = UnsafeCallbackDefinition
> { > {
constructor( constructor(
definition: Const<Definition>, definition: Const<Definition>,
callback: UnsafeCallbackFunction< callback: UnsafeCallbackFunction<
Definition["parameters"], Definition["parameters"],
Definition["result"] Definition["result"]
>, >
); );
/** The pointer to the unsafe callback. */ /** The pointer to the unsafe callback. */
pointer: NonNullable<PointerValue>; readonly pointer: NonNullable<PointerValue>;
/** The definition of the unsafe callback. */ /** The definition of the unsafe callback. */
definition: Definition; readonly definition: Definition;
/** The callback function. */ /** The callback function. */
callback: UnsafeCallbackFunction< readonly callback: UnsafeCallbackFunction<
Definition["parameters"], Definition["parameters"],
Definition["result"] Definition["result"]
>; >;
/** /**
* Adds one to this callback's reference counting and returns the new * Creates an {@linkcode UnsafeCallback} and calls `ref()` once to allow it to
* wake up the Deno event loop when called from foreign threads.
*
* This also stops Deno's process from exiting while the callback still
* exists and is not unref'ed.
*/
static threadSafe<
Definition extends UnsafeCallbackDefinition = UnsafeCallbackDefinition
>(
definition: Const<Definition>,
callback: UnsafeCallbackFunction<
Definition["parameters"],
Definition["result"]
>
): UnsafeCallback<Definition>;
/**
* Increments the callback's reference counting and returns the new
* reference count. * reference count.
* *
* If the callback's reference count is non-zero, it will keep Deno's * After `ref()` has been called, the callback always wakes up the
* Deno event loop when called from foreign threads.
*
* If the callback's reference count is non-zero, it keeps Deno's
* process from exiting. * process from exiting.
*/ */
ref(): number; ref(): number;
/** /**
* Removes one from this callback's reference counting and returns the new * Decrements the callback's reference counting and returns the new
* reference count. * reference count.
*
* Calling `unref()` does not stop a callback from waking up the Deno
* event loop when called from foreign threads.
* *
* If the callback's reference counter is zero, it will no longer keep * If the callback's reference counter is zero, it no longer keeps
* Deno's process from exiting. * Deno's process from exiting.
*/ */
unref(): number; unref(): number;
@ -559,11 +592,12 @@ declare namespace Deno {
/** /**
* Removes the C function pointer associated with this instance. * Removes the C function pointer associated with this instance.
* *
* Continuing to use the instance after calling this object will lead to * Continuing to use the instance or the C function pointer after closing
* errors and crashes. * the `UnsafeCallback` will lead to errors and crashes.
* *
* Calling this method will also immediately set the callback's reference * Calling this method sets the callback's reference counting to zero,
* counting to zero and it will no longer keep Deno's process from exiting. * stops the callback from waking up the Deno event loop when called from
* foreign threads and no longer keeps Deno's process from exiting.
*/ */
close(): void; close(): void;
} }

View file

@ -19,7 +19,7 @@ declare namespace Deno {
/** Mark following promise as "unref", ie. event loop will exit /** Mark following promise as "unref", ie. event loop will exit
* if there are only "unref" promises left. */ * if there are only "unref" promises left. */
function unrefOps(promiseId: number): void; function unrefOp(promiseId: number): void;
/** /**
* List of all registered ops, in the form of a map that maps op * List of all registered ops, in the form of a map that maps op

View file

@ -25,8 +25,11 @@ const {
MathCeil, MathCeil,
SafeMap, SafeMap,
SafeArrayIterator, SafeArrayIterator,
SymbolFor,
} = primordials; } = primordials;
const promiseIdSymbol = SymbolFor("Deno.core.internalPromiseId");
const U32_BUFFER = new Uint32Array(2); const U32_BUFFER = new Uint32Array(2);
const U64_BUFFER = new BigUint64Array(U32_BUFFER.buffer); const U64_BUFFER = new BigUint64Array(U32_BUFFER.buffer);
const I64_BUFFER = new BigInt64Array(U32_BUFFER.buffer); const I64_BUFFER = new BigInt64Array(U32_BUFFER.buffer);
@ -360,12 +363,23 @@ class UnsafeCallback {
this.callback = callback; this.callback = callback;
} }
static threadSafe(definition, callback) {
const unsafeCallback = new UnsafeCallback(definition, callback);
unsafeCallback.ref();
return unsafeCallback;
}
ref() { ref() {
if (this.#refcount++ === 0) { if (this.#refcount++ === 0) {
this.#refpromise = core.opAsync( if (this.#refpromise) {
"op_ffi_unsafe_callback_ref", // Re-refing
this.#rid, core.refOp(this.#refpromise[promiseIdSymbol]);
); } else {
this.#refpromise = core.opAsync(
"op_ffi_unsafe_callback_ref",
this.#rid,
);
}
} }
return this.#refcount; return this.#refcount;
} }
@ -374,7 +388,7 @@ class UnsafeCallback {
// Only decrement refcount if it is positive, and only // Only decrement refcount if it is positive, and only
// unref the callback if refcount reaches zero. // unref the callback if refcount reaches zero.
if (this.#refcount > 0 && --this.#refcount === 0) { if (this.#refcount > 0 && --this.#refcount === 0) {
ops.op_ffi_unsafe_callback_unref(this.#rid); core.unrefOp(this.#refpromise[promiseIdSymbol]);
} }
return this.#refcount; return this.#refcount;
} }

View file

@ -532,19 +532,6 @@ pub fn op_ffi_unsafe_callback_ref(
}) })
} }
#[op(fast)]
pub fn op_ffi_unsafe_callback_unref(
state: &mut deno_core::OpState,
rid: u32,
) -> Result<(), AnyError> {
state
.resource_table
.get::<UnsafeCallbackResource>(rid)?
.cancel
.cancel();
Ok(())
}
#[derive(Deserialize)] #[derive(Deserialize)]
pub struct RegisterCallbackArgs { pub struct RegisterCallbackArgs {
parameters: Vec<NativeType>, parameters: Vec<NativeType>,

View file

@ -29,7 +29,6 @@ use call::op_ffi_call_ptr;
use call::op_ffi_call_ptr_nonblocking; use call::op_ffi_call_ptr_nonblocking;
use callback::op_ffi_unsafe_callback_create; use callback::op_ffi_unsafe_callback_create;
use callback::op_ffi_unsafe_callback_ref; use callback::op_ffi_unsafe_callback_ref;
use callback::op_ffi_unsafe_callback_unref;
use dlfcn::op_ffi_load; use dlfcn::op_ffi_load;
use dlfcn::ForeignFunction; use dlfcn::ForeignFunction;
use r#static::op_ffi_get_static; use r#static::op_ffi_get_static;
@ -113,7 +112,6 @@ pub fn init<P: FfiPermissions + 'static>(unstable: bool) -> Extension {
op_ffi_read_ptr::decl::<P>(), op_ffi_read_ptr::decl::<P>(),
op_ffi_unsafe_callback_create::decl::<P>(), op_ffi_unsafe_callback_create::decl::<P>(),
op_ffi_unsafe_callback_ref::decl(), op_ffi_unsafe_callback_ref::decl(),
op_ffi_unsafe_callback_unref::decl(),
]) ])
.event_loop_middleware(|op_state_rc, _cx| { .event_loop_middleware(|op_state_rc, _cx| {
// FFI callbacks coming in from other threads will call in and get queued. // FFI callbacks coming in from other threads will call in and get queued.

View file

@ -26,6 +26,7 @@ const dylib = Deno.dlopen(
} as const, } as const,
); );
let retry = false;
const tripleLogCallback = () => { const tripleLogCallback = () => {
console.log("Sync"); console.log("Sync");
Promise.resolve().then(() => { Promise.resolve().then(() => {
@ -35,10 +36,18 @@ const tripleLogCallback = () => {
setTimeout(() => { setTimeout(() => {
console.log("Timeout"); console.log("Timeout");
callback.unref(); callback.unref();
if (retry) {
// Re-ref and retry the call to make sure re-refing works.
console.log("RETRY THREAD SAFE");
retry = false;
callback.ref();
dylib.symbols.call_stored_function_thread_safe_and_log();
}
}, 10); }, 10);
}; };
const callback = new Deno.UnsafeCallback( const callback = Deno.UnsafeCallback.threadSafe(
{ {
parameters: [], parameters: [],
result: "void", result: "void",
@ -57,10 +66,11 @@ console.log("STORED_FUNCTION called");
// Wait to make sure synch logging and async logging // Wait to make sure synch logging and async logging
await new Promise((res) => setTimeout(res, 100)); await new Promise((res) => setTimeout(res, 100));
// Ref twice to make sure both `Promise.resolve().then()` and `setTimeout()` // Ref once to make sure both `Promise.resolve().then()` and `setTimeout()`
// must resolve before isolate exists. // must resolve and unref before isolate exists.
callback.ref(); // One ref'ing has been done by `threadSafe` constructor.
callback.ref(); callback.ref();
console.log("THREAD SAFE"); console.log("THREAD SAFE");
retry = true;
dylib.symbols.call_stored_function_thread_safe_and_log(); dylib.symbols.call_stored_function_thread_safe_and_log();

View file

@ -225,6 +225,11 @@ fn event_loop_integration() {
Sync\n\ Sync\n\
Async\n\ Async\n\
STORED_FUNCTION called\n\ STORED_FUNCTION called\n\
Timeout\n\
RETRY THREAD SAFE\n\
Sync\n\
Async\n\
STORED_FUNCTION called\n\
Timeout\n"; Timeout\n";
assert_eq!(stdout, expected); assert_eq!(stdout, expected);
assert_eq!(stderr, ""); assert_eq!(stderr, "");