mirror of
https://github.com/denoland/deno.git
synced 2024-11-29 16:30:56 -05:00
fix(ext/kv): don't panic if listening on queues and KV is not closed (#20317)
fixes #20312
This commit is contained in:
parent
430c1f635a
commit
7043521ce8
3 changed files with 55 additions and 14 deletions
|
@ -46,6 +46,7 @@ util::unit_test_factory!(
|
|||
intl_test,
|
||||
io_test,
|
||||
kv_test,
|
||||
kv_queue_test_no_db_close,
|
||||
kv_queue_undelivered_test,
|
||||
link_test,
|
||||
make_temp_test,
|
||||
|
|
25
cli/tests/unit/kv_queue_test_no_db_close.ts
Normal file
25
cli/tests/unit/kv_queue_test_no_db_close.ts
Normal file
|
@ -0,0 +1,25 @@
|
|||
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
|
||||
import {
|
||||
assert,
|
||||
assertEquals,
|
||||
assertNotEquals,
|
||||
deferred,
|
||||
} from "./test_util.ts";
|
||||
|
||||
Deno.test({
|
||||
sanitizeOps: false,
|
||||
sanitizeResources: false,
|
||||
}, async function queueTestNoDbClose() {
|
||||
const db: Deno.Kv = await Deno.openKv(":memory:");
|
||||
const promise = deferred();
|
||||
let dequeuedMessage: unknown = null;
|
||||
db.listenQueue((msg) => {
|
||||
dequeuedMessage = msg;
|
||||
promise.resolve();
|
||||
});
|
||||
const res = await db.enqueue("test");
|
||||
assert(res.ok);
|
||||
assertNotEquals(res.versionstamp, null);
|
||||
await promise;
|
||||
assertEquals(dequeuedMessage, "test");
|
||||
});
|
|
@ -290,16 +290,7 @@ pub struct SqliteDb {
|
|||
|
||||
impl Drop for SqliteDb {
|
||||
fn drop(&mut self) {
|
||||
self.expiration_watcher.abort();
|
||||
|
||||
// The above `abort()` operation is asynchronous. It's not
|
||||
// guaranteed that the sqlite connection will be closed immediately.
|
||||
// So here we synchronously take the conn mutex and drop the connection.
|
||||
//
|
||||
// This blocks the event loop if the connection is still being used,
|
||||
// but ensures correctness - deleting the database file after calling
|
||||
// the `close` method will always work.
|
||||
self.conn.conn.lock().unwrap().take();
|
||||
self.close();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -449,9 +440,22 @@ impl SqliteQueue {
|
|||
Self::requeue_inflight_messages(conn.clone()).await.unwrap();
|
||||
|
||||
// Continuous dequeue loop.
|
||||
Self::dequeue_loop(conn.clone(), dequeue_tx, shutdown_rx, waker_rx)
|
||||
match Self::dequeue_loop(conn.clone(), dequeue_tx, shutdown_rx, waker_rx)
|
||||
.await
|
||||
.unwrap();
|
||||
{
|
||||
Ok(_) => Ok(()),
|
||||
Err(e) => {
|
||||
// Exit the dequeue loop cleanly if the database has been closed.
|
||||
if get_custom_error_class(&e) == Some("TypeError")
|
||||
&& e.to_string() == ERROR_USING_CLOSED_DATABASE
|
||||
{
|
||||
Ok(())
|
||||
} else {
|
||||
Err(e)
|
||||
}
|
||||
}
|
||||
}
|
||||
.unwrap();
|
||||
});
|
||||
|
||||
Self {
|
||||
|
@ -490,7 +494,7 @@ impl SqliteQueue {
|
|||
}
|
||||
|
||||
fn shutdown(&self) {
|
||||
self.shutdown_tx.send(()).unwrap();
|
||||
let _ = self.shutdown_tx.send(());
|
||||
}
|
||||
|
||||
async fn dequeue_loop(
|
||||
|
@ -573,7 +577,7 @@ impl SqliteQueue {
|
|||
};
|
||||
tokio::select! {
|
||||
_ = sleep_fut => {}
|
||||
_ = waker_rx.recv() => {}
|
||||
x = waker_rx.recv() => if x.is_none() {return Ok(());},
|
||||
_ = shutdown_rx.changed() => return Ok(())
|
||||
}
|
||||
}
|
||||
|
@ -913,6 +917,17 @@ impl Database for SqliteDb {
|
|||
if let Some(queue) = self.queue.get() {
|
||||
queue.shutdown();
|
||||
}
|
||||
|
||||
self.expiration_watcher.abort();
|
||||
|
||||
// The above `abort()` operation is asynchronous. It's not
|
||||
// guaranteed that the sqlite connection will be closed immediately.
|
||||
// So here we synchronously take the conn mutex and drop the connection.
|
||||
//
|
||||
// This blocks the event loop if the connection is still being used,
|
||||
// but ensures correctness - deleting the database file after calling
|
||||
// the `close` method will always work.
|
||||
self.conn.conn.lock().unwrap().take();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue