diff --git a/cli/tests/unit/kv_test.ts b/cli/tests/unit/kv_test.ts index da5dba6af2..25abf4cde2 100644 --- a/cli/tests/unit/kv_test.ts +++ b/cli/tests/unit/kv_test.ts @@ -1819,6 +1819,16 @@ Deno.test({ }, }); +Deno.test({ + name: "queue graceful close", + async fn() { + const db: Deno.Kv = await Deno.openKv(":memory:"); + const listener = db.listenQueue((_msg) => {}); + db.close(); + await listener; + }, +}); + dbTest("atomic operation is exposed", (db) => { assert(Deno.AtomicOperation); const ao = db.atomic(); diff --git a/ext/kv/01_db.ts b/ext/kv/01_db.ts index e934a3b6dc..6e8a571f0c 100644 --- a/ext/kv/01_db.ts +++ b/ext/kv/01_db.ts @@ -61,7 +61,6 @@ const kvSymbol = Symbol("KvRid"); class Kv { #rid: number; - #closed: boolean; constructor(rid: number = undefined, symbol: symbol = undefined) { if (kvSymbol !== symbol) { @@ -70,7 +69,6 @@ class Kv { ); } this.#rid = rid; - this.#closed = false; } atomic() { @@ -251,20 +249,14 @@ class Kv { handler: (message: unknown) => Promise | void, ): Promise { const finishMessageOps = new Map>(); - while (!this.#closed) { + while (true) { // Wait for the next message. - let next: { 0: Uint8Array; 1: number }; - try { - next = await core.opAsync( - "op_kv_dequeue_next_message", - this.#rid, - ); - } catch (error) { - if (this.#closed) { - break; - } else { - throw error; - } + const next: { 0: Uint8Array; 1: number } = await core.opAsync( + "op_kv_dequeue_next_message", + this.#rid, + ); + if (next === null) { + break; } // Deserialize the payload. @@ -283,20 +275,16 @@ class Kv { } catch (error) { console.error("Exception in queue handler", error); } finally { - if (this.#closed) { - core.close(handleId); - } else { - const promise: Promise = core.opAsync( - "op_kv_finish_dequeued_message", - handleId, - success, - ); - finishMessageOps.set(handleId, promise); - try { - await promise; - } finally { - finishMessageOps.delete(handleId); - } + const promise: Promise = core.opAsync( + "op_kv_finish_dequeued_message", + handleId, + success, + ); + finishMessageOps.set(handleId, promise); + try { + await promise; + } finally { + finishMessageOps.delete(handleId); } } })(); @@ -310,7 +298,6 @@ class Kv { close() { core.close(this.#rid); - this.#closed = true; } } diff --git a/ext/kv/dynamic.rs b/ext/kv/dynamic.rs index f79c10f559..9084cc1bf2 100644 --- a/ext/kv/dynamic.rs +++ b/ext/kv/dynamic.rs @@ -132,7 +132,7 @@ pub trait DynamicDb { async fn dyn_dequeue_next_message( &self, state: Rc>, - ) -> Result, AnyError>; + ) -> Result>, AnyError>; fn dyn_close(&self); } @@ -161,7 +161,7 @@ impl Database for Box { async fn dequeue_next_message( &self, state: Rc>, - ) -> Result, AnyError> { + ) -> Result>, AnyError> { (**self).dyn_dequeue_next_message(state).await } @@ -196,8 +196,13 @@ where async fn dyn_dequeue_next_message( &self, state: Rc>, - ) -> Result, AnyError> { - Ok(Box::new(self.dequeue_next_message(state).await?)) + ) -> Result>, AnyError> { + Ok( + self + .dequeue_next_message(state) + .await? + .map(|x| Box::new(x) as Box), + ) } fn dyn_close(&self) { diff --git a/ext/kv/interface.rs b/ext/kv/interface.rs index abeaf8dd51..1acf3ce165 100644 --- a/ext/kv/interface.rs +++ b/ext/kv/interface.rs @@ -43,7 +43,7 @@ pub trait Database { async fn dequeue_next_message( &self, state: Rc>, - ) -> Result; + ) -> Result, AnyError>; fn close(&self); } diff --git a/ext/kv/lib.rs b/ext/kv/lib.rs index 72d5e862b0..762009d2a4 100644 --- a/ext/kv/lib.rs +++ b/ext/kv/lib.rs @@ -16,6 +16,7 @@ use chrono::Utc; use codec::decode_key; use codec::encode_key; use deno_core::anyhow::Context; +use deno_core::error::get_custom_error_class; use deno_core::error::type_error; use deno_core::error::AnyError; use deno_core::op2; @@ -322,24 +323,35 @@ impl Resource for QueueMessageResource { async fn op_kv_dequeue_next_message( state: Rc>, #[smi] rid: ResourceId, -) -> Result<(ToJsBuffer, ResourceId), AnyError> +) -> Result, AnyError> where DBH: DatabaseHandler + 'static, { let db = { let state = state.borrow(); let resource = - state.resource_table.get::>(rid)?; + match state.resource_table.get::>(rid) { + Ok(resource) => resource, + Err(err) => { + if get_custom_error_class(&err) == Some("BadResource") { + return Ok(None); + } else { + return Err(err); + } + } + }; resource.db.clone() }; - let mut handle = db.dequeue_next_message(state.clone()).await?; + let Some(mut handle) = db.dequeue_next_message(state.clone()).await? else { + return Ok(None); + }; let payload = handle.take_payload().await?.into(); let handle_rid = { let mut state = state.borrow_mut(); state.resource_table.add(QueueMessageResource { handle }) }; - Ok((payload, handle_rid)) + Ok(Some((payload, handle_rid))) } #[op2(async)] diff --git a/ext/kv/remote.rs b/ext/kv/remote.rs index fc18e4615d..36c4d3af20 100644 --- a/ext/kv/remote.rs +++ b/ext/kv/remote.rs @@ -277,7 +277,7 @@ impl Database for RemoteDb

{ async fn dequeue_next_message( &self, _state: Rc>, - ) -> Result { + ) -> Result, AnyError> { let msg = "Deno.Kv.listenQueue is not supported for remote KV databases"; eprintln!("{}", yellow(msg)); deno_core::futures::future::pending().await diff --git a/ext/kv/sqlite.rs b/ext/kv/sqlite.rs index ceeb98c259..192141e275 100644 --- a/ext/kv/sqlite.rs +++ b/ext/kv/sqlite.rs @@ -395,9 +395,7 @@ impl QueueMessageHandle for DequeuedMessage { Err(e) => { // Silently ignore the error if the database has been closed // This message will be delivered on the next run - if get_custom_error_class(&e) == Some("TypeError") - && e.to_string() == ERROR_USING_CLOSED_DATABASE - { + if is_conn_closed_error(&e) { return Ok(()); } return Err(e); @@ -437,25 +435,25 @@ impl SqliteQueue { spawn(async move { // Oneshot requeue of all inflight messages. - Self::requeue_inflight_messages(conn.clone()).await.unwrap(); + if let Err(e) = Self::requeue_inflight_messages(conn.clone()).await { + // Exit the dequeue loop cleanly if the database has been closed. + if is_conn_closed_error(&e) { + return; + } + panic!("kv: Error in requeue_inflight_messages: {}", e); + } // Continuous dequeue loop. - match Self::dequeue_loop(conn.clone(), dequeue_tx, shutdown_rx, waker_rx) - .await + if let Err(e) = + Self::dequeue_loop(conn.clone(), dequeue_tx, shutdown_rx, waker_rx) + .await { - Ok(_) => Ok(()), - Err(e) => { - // Exit the dequeue loop cleanly if the database has been closed. - if get_custom_error_class(&e) == Some("TypeError") - && e.to_string() == ERROR_USING_CLOSED_DATABASE - { - Ok(()) - } else { - Err(e) - } + // Exit the dequeue loop cleanly if the database has been closed. + if is_conn_closed_error(&e) { + return; } + panic!("kv: Error in dequeue_loop: {}", e); } - .unwrap(); }); Self { @@ -467,25 +465,25 @@ impl SqliteQueue { } } - async fn dequeue(&self) -> Result { + async fn dequeue(&self) -> Result, AnyError> { // Wait for the next message to be available from dequeue_rx. let (payload, id) = { let mut queue_rx = self.dequeue_rx.borrow_mut().await; let Some(msg) = queue_rx.recv().await else { - return Err(type_error("Database closed")); + return Ok(None); }; msg }; let permit = self.concurrency_limiter.clone().acquire_owned().await?; - Ok(DequeuedMessage { + Ok(Some(DequeuedMessage { conn: self.conn.downgrade(), id, payload: Some(payload), waker_tx: self.waker_tx.clone(), _permit: permit, - }) + })) } async fn wake(&self) -> Result<(), AnyError> { @@ -904,7 +902,7 @@ impl Database for SqliteDb { async fn dequeue_next_message( &self, _state: Rc>, - ) -> Result { + ) -> Result, AnyError> { let queue = self .queue .get_or_init(|| async move { SqliteQueue::new(self.conn.clone()) }) @@ -1013,3 +1011,8 @@ fn encode_value(value: &crate::Value) -> (Cow<'_, [u8]>, i64) { } } } + +fn is_conn_closed_error(e: &AnyError) -> bool { + get_custom_error_class(e) == Some("TypeError") + && e.to_string() == ERROR_USING_CLOSED_DATABASE +}