1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2024-11-28 16:20:57 -05:00

fix(ext/kv): don't panic if listening on queues and KV is not closed (#20317)

fixes #20312
This commit is contained in:
Igor Zinkovsky 2023-08-29 11:24:44 -07:00 committed by GitHub
parent c4451d3076
commit 441b860978
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 55 additions and 14 deletions

View file

@ -46,6 +46,7 @@ util::unit_test_factory!(
intl_test,
io_test,
kv_test,
kv_queue_test_no_db_close,
kv_queue_undelivered_test,
link_test,
make_temp_test,

View file

@ -0,0 +1,25 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
import {
assert,
assertEquals,
assertNotEquals,
deferred,
} from "./test_util.ts";
Deno.test({
sanitizeOps: false,
sanitizeResources: false,
}, async function queueTestNoDbClose() {
const db: Deno.Kv = await Deno.openKv(":memory:");
const promise = deferred();
let dequeuedMessage: unknown = null;
db.listenQueue((msg) => {
dequeuedMessage = msg;
promise.resolve();
});
const res = await db.enqueue("test");
assert(res.ok);
assertNotEquals(res.versionstamp, null);
await promise;
assertEquals(dequeuedMessage, "test");
});

View file

@ -290,16 +290,7 @@ pub struct SqliteDb {
impl Drop for SqliteDb {
fn drop(&mut self) {
self.expiration_watcher.abort();
// The above `abort()` operation is asynchronous. It's not
// guaranteed that the sqlite connection will be closed immediately.
// So here we synchronously take the conn mutex and drop the connection.
//
// This blocks the event loop if the connection is still being used,
// but ensures correctness - deleting the database file after calling
// the `close` method will always work.
self.conn.conn.lock().unwrap().take();
self.close();
}
}
@ -449,9 +440,22 @@ impl SqliteQueue {
Self::requeue_inflight_messages(conn.clone()).await.unwrap();
// Continuous dequeue loop.
Self::dequeue_loop(conn.clone(), dequeue_tx, shutdown_rx, waker_rx)
match Self::dequeue_loop(conn.clone(), dequeue_tx, shutdown_rx, waker_rx)
.await
.unwrap();
{
Ok(_) => Ok(()),
Err(e) => {
// Exit the dequeue loop cleanly if the database has been closed.
if get_custom_error_class(&e) == Some("TypeError")
&& e.to_string() == ERROR_USING_CLOSED_DATABASE
{
Ok(())
} else {
Err(e)
}
}
}
.unwrap();
});
Self {
@ -490,7 +494,7 @@ impl SqliteQueue {
}
fn shutdown(&self) {
self.shutdown_tx.send(()).unwrap();
let _ = self.shutdown_tx.send(());
}
async fn dequeue_loop(
@ -573,7 +577,7 @@ impl SqliteQueue {
};
tokio::select! {
_ = sleep_fut => {}
_ = waker_rx.recv() => {}
x = waker_rx.recv() => if x.is_none() {return Ok(());},
_ = shutdown_rx.changed() => return Ok(())
}
}
@ -913,6 +917,17 @@ impl Database for SqliteDb {
if let Some(queue) = self.queue.get() {
queue.shutdown();
}
self.expiration_watcher.abort();
// The above `abort()` operation is asynchronous. It's not
// guaranteed that the sqlite connection will be closed immediately.
// So here we synchronously take the conn mutex and drop the connection.
//
// This blocks the event loop if the connection is still being used,
// but ensures correctness - deleting the database file after calling
// the `close` method will always work.
self.conn.conn.lock().unwrap().take();
}
}