1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2024-11-25 15:29:32 -05:00

fix(kv_queues): graceful shutdown (#20627)

This fixes the `TypeError: Database closed` error during shutdown.
This commit is contained in:
Igor Zinkovsky 2023-09-26 20:06:57 -07:00 committed by GitHub
parent b433133a1f
commit f0a022bed4
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 79 additions and 62 deletions

View file

@ -1819,6 +1819,16 @@ Deno.test({
},
});
Deno.test({
name: "queue graceful close",
async fn() {
const db: Deno.Kv = await Deno.openKv(":memory:");
const listener = db.listenQueue((_msg) => {});
db.close();
await listener;
},
});
dbTest("atomic operation is exposed", (db) => {
assert(Deno.AtomicOperation);
const ao = db.atomic();

View file

@ -61,7 +61,6 @@ const kvSymbol = Symbol("KvRid");
class Kv {
#rid: number;
#closed: boolean;
constructor(rid: number = undefined, symbol: symbol = undefined) {
if (kvSymbol !== symbol) {
@ -70,7 +69,6 @@ class Kv {
);
}
this.#rid = rid;
this.#closed = false;
}
atomic() {
@ -251,20 +249,14 @@ class Kv {
handler: (message: unknown) => Promise<void> | void,
): Promise<void> {
const finishMessageOps = new Map<number, Promise<void>>();
while (!this.#closed) {
while (true) {
// Wait for the next message.
let next: { 0: Uint8Array; 1: number };
try {
next = await core.opAsync(
const next: { 0: Uint8Array; 1: number } = await core.opAsync(
"op_kv_dequeue_next_message",
this.#rid,
);
} catch (error) {
if (this.#closed) {
if (next === null) {
break;
} else {
throw error;
}
}
// Deserialize the payload.
@ -283,9 +275,6 @@ class Kv {
} catch (error) {
console.error("Exception in queue handler", error);
} finally {
if (this.#closed) {
core.close(handleId);
} else {
const promise: Promise<void> = core.opAsync(
"op_kv_finish_dequeued_message",
handleId,
@ -298,7 +287,6 @@ class Kv {
finishMessageOps.delete(handleId);
}
}
}
})();
}
@ -310,7 +298,6 @@ class Kv {
close() {
core.close(this.#rid);
this.#closed = true;
}
}

View file

@ -132,7 +132,7 @@ pub trait DynamicDb {
async fn dyn_dequeue_next_message(
&self,
state: Rc<RefCell<OpState>>,
) -> Result<Box<dyn QueueMessageHandle>, AnyError>;
) -> Result<Option<Box<dyn QueueMessageHandle>>, AnyError>;
fn dyn_close(&self);
}
@ -161,7 +161,7 @@ impl Database for Box<dyn DynamicDb> {
async fn dequeue_next_message(
&self,
state: Rc<RefCell<OpState>>,
) -> Result<Box<dyn QueueMessageHandle>, AnyError> {
) -> Result<Option<Box<dyn QueueMessageHandle>>, AnyError> {
(**self).dyn_dequeue_next_message(state).await
}
@ -196,8 +196,13 @@ where
async fn dyn_dequeue_next_message(
&self,
state: Rc<RefCell<OpState>>,
) -> Result<Box<dyn QueueMessageHandle>, AnyError> {
Ok(Box::new(self.dequeue_next_message(state).await?))
) -> Result<Option<Box<dyn QueueMessageHandle>>, AnyError> {
Ok(
self
.dequeue_next_message(state)
.await?
.map(|x| Box::new(x) as Box<dyn QueueMessageHandle>),
)
}
fn dyn_close(&self) {

View file

@ -43,7 +43,7 @@ pub trait Database {
async fn dequeue_next_message(
&self,
state: Rc<RefCell<OpState>>,
) -> Result<Self::QMH, AnyError>;
) -> Result<Option<Self::QMH>, AnyError>;
fn close(&self);
}

View file

@ -16,6 +16,7 @@ use chrono::Utc;
use codec::decode_key;
use codec::encode_key;
use deno_core::anyhow::Context;
use deno_core::error::get_custom_error_class;
use deno_core::error::type_error;
use deno_core::error::AnyError;
use deno_core::op2;
@ -322,24 +323,35 @@ impl<QMH: QueueMessageHandle + 'static> Resource for QueueMessageResource<QMH> {
async fn op_kv_dequeue_next_message<DBH>(
state: Rc<RefCell<OpState>>,
#[smi] rid: ResourceId,
) -> Result<(ToJsBuffer, ResourceId), AnyError>
) -> Result<Option<(ToJsBuffer, ResourceId)>, AnyError>
where
DBH: DatabaseHandler + 'static,
{
let db = {
let state = state.borrow();
let resource =
state.resource_table.get::<DatabaseResource<DBH::DB>>(rid)?;
match state.resource_table.get::<DatabaseResource<DBH::DB>>(rid) {
Ok(resource) => resource,
Err(err) => {
if get_custom_error_class(&err) == Some("BadResource") {
return Ok(None);
} else {
return Err(err);
}
}
};
resource.db.clone()
};
let mut handle = db.dequeue_next_message(state.clone()).await?;
let Some(mut handle) = db.dequeue_next_message(state.clone()).await? else {
return Ok(None);
};
let payload = handle.take_payload().await?.into();
let handle_rid = {
let mut state = state.borrow_mut();
state.resource_table.add(QueueMessageResource { handle })
};
Ok((payload, handle_rid))
Ok(Some((payload, handle_rid)))
}
#[op2(async)]

View file

@ -277,7 +277,7 @@ impl<P: RemoteDbHandlerPermissions> Database for RemoteDb<P> {
async fn dequeue_next_message(
&self,
_state: Rc<RefCell<OpState>>,
) -> Result<Self::QMH, AnyError> {
) -> Result<Option<Self::QMH>, AnyError> {
let msg = "Deno.Kv.listenQueue is not supported for remote KV databases";
eprintln!("{}", yellow(msg));
deno_core::futures::future::pending().await

View file

@ -395,9 +395,7 @@ impl QueueMessageHandle for DequeuedMessage {
Err(e) => {
// Silently ignore the error if the database has been closed
// This message will be delivered on the next run
if get_custom_error_class(&e) == Some("TypeError")
&& e.to_string() == ERROR_USING_CLOSED_DATABASE
{
if is_conn_closed_error(&e) {
return Ok(());
}
return Err(e);
@ -437,25 +435,25 @@ impl SqliteQueue {
spawn(async move {
// Oneshot requeue of all inflight messages.
Self::requeue_inflight_messages(conn.clone()).await.unwrap();
if let Err(e) = Self::requeue_inflight_messages(conn.clone()).await {
// Exit the dequeue loop cleanly if the database has been closed.
if is_conn_closed_error(&e) {
return;
}
panic!("kv: Error in requeue_inflight_messages: {}", e);
}
// Continuous dequeue loop.
match Self::dequeue_loop(conn.clone(), dequeue_tx, shutdown_rx, waker_rx)
if let Err(e) =
Self::dequeue_loop(conn.clone(), dequeue_tx, shutdown_rx, waker_rx)
.await
{
Ok(_) => Ok(()),
Err(e) => {
// Exit the dequeue loop cleanly if the database has been closed.
if get_custom_error_class(&e) == Some("TypeError")
&& e.to_string() == ERROR_USING_CLOSED_DATABASE
{
Ok(())
} else {
Err(e)
if is_conn_closed_error(&e) {
return;
}
panic!("kv: Error in dequeue_loop: {}", e);
}
}
.unwrap();
});
Self {
@ -467,25 +465,25 @@ impl SqliteQueue {
}
}
async fn dequeue(&self) -> Result<DequeuedMessage, AnyError> {
async fn dequeue(&self) -> Result<Option<DequeuedMessage>, AnyError> {
// Wait for the next message to be available from dequeue_rx.
let (payload, id) = {
let mut queue_rx = self.dequeue_rx.borrow_mut().await;
let Some(msg) = queue_rx.recv().await else {
return Err(type_error("Database closed"));
return Ok(None);
};
msg
};
let permit = self.concurrency_limiter.clone().acquire_owned().await?;
Ok(DequeuedMessage {
Ok(Some(DequeuedMessage {
conn: self.conn.downgrade(),
id,
payload: Some(payload),
waker_tx: self.waker_tx.clone(),
_permit: permit,
})
}))
}
async fn wake(&self) -> Result<(), AnyError> {
@ -904,7 +902,7 @@ impl Database for SqliteDb {
async fn dequeue_next_message(
&self,
_state: Rc<RefCell<OpState>>,
) -> Result<Self::QMH, AnyError> {
) -> Result<Option<Self::QMH>, AnyError> {
let queue = self
.queue
.get_or_init(|| async move { SqliteQueue::new(self.conn.clone()) })
@ -1013,3 +1011,8 @@ fn encode_value(value: &crate::Value) -> (Cow<'_, [u8]>, i64) {
}
}
}
fn is_conn_closed_error(e: &AnyError) -> bool {
get_custom_error_class(e) == Some("TypeError")
&& e.to_string() == ERROR_USING_CLOSED_DATABASE
}