From 61b91e10ad41e6d207d60113a2f6f2b63a706940 Mon Sep 17 00:00:00 2001
From: Igor Zinkovsky
Date: Fri, 29 Sep 2023 11:40:36 -0700
Subject: [PATCH] fix(ext/kv): send queue wake messages across different kv
 instances (#20465)

fixes #20454

The current KV queues implementation assumes that `enqueue` and
`listenQueue` are called on the same instance of `Deno.Kv`. It's possible
for the same Deno process to open multiple KV instances pointing at the
same fs path; in that case `listenQueue` should still be notified of
messages enqueued through a different KV instance.
---
 Cargo.lock                |   1 +
 cli/tests/unit/kv_test.ts |  34 ++++++++
 ext/kv/Cargo.toml         |   1 +
 ext/kv/sqlite.rs          | 163 ++++++++++++++++++++++++++++++--------
 4 files changed, 165 insertions(+), 34 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 192f522fcb..d26fa833b2 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1430,6 +1430,7 @@ dependencies = [
  "base64 0.13.1",
  "chrono",
  "deno_core",
+ "deno_node",
  "deno_unsync 0.1.1",
  "hex",
  "log",
diff --git a/cli/tests/unit/kv_test.ts b/cli/tests/unit/kv_test.ts
index 25abf4cde2..7cb8ebccfa 100644
--- a/cli/tests/unit/kv_test.ts
+++ b/cli/tests/unit/kv_test.ts
@@ -1819,6 +1819,40 @@ Deno.test({
   },
 });
 
+Deno.test({
+  name: "different kv instances for enqueue and queueListen",
+  async fn() {
+    const filename = await Deno.makeTempFile({ prefix: "queue_db" });
+    try {
+      const db0 = await Deno.openKv(filename);
+      const db1 = await Deno.openKv(filename);
+      const promise = deferred();
+      let dequeuedMessage: unknown = null;
+      const listener = db0.listenQueue((msg) => {
+        dequeuedMessage = msg;
+        promise.resolve();
+      });
+      try {
+        const res = await db1.enqueue("test");
+        assert(res.ok);
+        assertNotEquals(res.versionstamp, null);
+        await promise;
+        assertEquals(dequeuedMessage, "test");
+      } finally {
+        db0.close();
+        await listener;
+        db1.close();
+      }
+    } finally {
+      try {
+        await Deno.remove(filename);
+      } catch {
+        // pass
+      }
+    }
+  },
+});
+
 Deno.test({
   name: "queue graceful close",
   async fn() {
diff --git a/ext/kv/Cargo.toml b/ext/kv/Cargo.toml
index 890c8370ca..0dbf15b721 100644
--- a/ext/kv/Cargo.toml
+++ b/ext/kv/Cargo.toml
@@ -19,6 +19,7 @@ async-trait.workspace = true
 base64.workspace = true
 chrono.workspace = true
 deno_core.workspace = true
+deno_node.workspace = true
 deno_unsync = "0.1.1"
 hex.workspace = true
 log.workspace = true
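Note: outside the test harness above, the scenario this patch covers reduces to the minimal sketch below (the file name is illustrative and the unstable `Deno.Kv` API is assumed): an `enqueue` issued through one handle must wake the `listenQueue` callback registered on a second handle opened on the same SQLite file.

```ts
// Two Deno.Kv handles backed by the same SQLite file (illustrative path).
const path = "./queue_db.sqlite3";
const consumer = await Deno.openKv(path);
const producer = await Deno.openKv(path);

let resolveReceived!: (msg: unknown) => void;
const received = new Promise<unknown>((resolve) => (resolveReceived = resolve));

// listenQueue's returned promise settles once the listener shuts down (after close()).
const listener = consumer.listenQueue((msg) => resolveReceived(msg));

// Enqueue through a *different* Deno.Kv instance than the one listening.
const res = await producer.enqueue("hello");
console.assert(res.ok);

// Before this fix the callback only fired for enqueues made via `consumer`.
console.log("dequeued:", await received);

consumer.close();
await listener;
producer.close();
```

Internally (see the ext/kv/sqlite.rs changes below), the wake-up is delivered over a `tokio::sync::broadcast` channel that is stored in `OpState` and keyed by the canonicalized database path, so every `Deno.Kv` opened on the same file shares one waker.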

diff --git a/ext/kv/sqlite.rs b/ext/kv/sqlite.rs
index 192141e275..327091f057 100644
--- a/ext/kv/sqlite.rs
+++ b/ext/kv/sqlite.rs
@@ -2,7 +2,10 @@

 use std::borrow::Cow;
 use std::cell::RefCell;
+use std::collections::HashMap;
+use std::env::current_dir;
 use std::future::Future;
+use std::io::ErrorKind;
 use std::marker::PhantomData;
 use std::path::Path;
 use std::path::PathBuf;
@@ -23,11 +26,14 @@ use deno_core::unsync::spawn;
 use deno_core::unsync::spawn_blocking;
 use deno_core::AsyncRefCell;
 use deno_core::OpState;
+use deno_node::PathClean;
 use rand::Rng;
 use rusqlite::params;
 use rusqlite::OpenFlags;
 use rusqlite::OptionalExtension;
 use rusqlite::Transaction;
+use tokio::sync::broadcast;
+use tokio::sync::broadcast::error::RecvError;
 use tokio::sync::mpsc;
 use tokio::sync::watch;
 use tokio::sync::OnceCell;
@@ -212,30 +218,35 @@ impl<P: SqliteDbHandlerPermissions> DatabaseHandler for SqliteDbHandler<P> {
       }
     }

-    let conn = sqlite_retry_loop(|| {
+    let (conn, queue_waker_key) = sqlite_retry_loop(|| {
       let path = path.clone();
       let default_storage_dir = self.default_storage_dir.clone();
       async move {
         spawn_blocking(move || {
-          let conn = match (path.as_deref(), &default_storage_dir) {
-            (Some(":memory:"), _) | (None, None) => {
-              rusqlite::Connection::open_in_memory()?
-            }
-            (Some(path), _) => {
-              let flags =
-                OpenFlags::default().difference(OpenFlags::SQLITE_OPEN_URI);
-              rusqlite::Connection::open_with_flags(path, flags)?
-            }
-            (None, Some(path)) => {
-              std::fs::create_dir_all(path)?;
-              let path = path.join("kv.sqlite3");
-              rusqlite::Connection::open(path)?
-            }
-          };
+          let (conn, queue_waker_key) =
+            match (path.as_deref(), &default_storage_dir) {
+              (Some(":memory:"), _) | (None, None) => {
+                (rusqlite::Connection::open_in_memory()?, None)
+              }
+              (Some(path), _) => {
+                let flags =
+                  OpenFlags::default().difference(OpenFlags::SQLITE_OPEN_URI);
+                let resolved_path = canonicalize_path(&PathBuf::from(path))?;
+                (
+                  rusqlite::Connection::open_with_flags(path, flags)?,
+                  Some(resolved_path),
+                )
+              }
+              (None, Some(path)) => {
+                std::fs::create_dir_all(path)?;
+                let path = path.join("kv.sqlite3");
+                (rusqlite::Connection::open(path.clone())?, Some(path))
+              }
+            };

           conn.pragma_update(None, "journal_mode", "wal")?;

-          Ok::<_, AnyError>(conn)
+          Ok::<_, AnyError>((conn, queue_waker_key))
         })
         .await
         .unwrap()
@@ -277,6 +288,7 @@ impl<P: SqliteDbHandlerPermissions> DatabaseHandler for SqliteDbHandler<P> {
     Ok(SqliteDb {
       conn,
       queue: OnceCell::new(),
+      queue_waker_key,
       expiration_watcher,
     })
   }
@@ -285,6 +297,7 @@ impl<P: SqliteDbHandlerPermissions> DatabaseHandler for SqliteDbHandler<P> {
 pub struct SqliteDb {
   conn: ProtectedConn,
   queue: OnceCell<SqliteQueue>,
+  queue_waker_key: Option<PathBuf>,
   expiration_watcher: deno_core::unsync::JoinHandle<()>,
 }
@@ -363,7 +376,7 @@ pub struct DequeuedMessage {
   conn: WeakProtectedConn,
   id: String,
   payload: Option<Vec<u8>>,
-  waker_tx: mpsc::Sender<()>,
+  waker_tx: broadcast::Sender<()>,
   _permit: OwnedSemaphorePermit,
 }

@@ -403,7 +416,7 @@ impl QueueMessageHandle for DequeuedMessage {
     };
     if requeued {
       // If the message was requeued, wake up the dequeue loop.
-      self.waker_tx.send(()).await?;
+      let _ = self.waker_tx.send(());
     }
     Ok(())
   }
@@ -422,15 +435,18 @@ struct SqliteQueue {
   conn: ProtectedConn,
   dequeue_rx: Rc<AsyncRefCell<mpsc::Receiver<(Vec<u8>, String)>>>,
   concurrency_limiter: Arc<Semaphore>,
-  waker_tx: mpsc::Sender<()>,
+  waker_tx: broadcast::Sender<()>,
   shutdown_tx: watch::Sender<()>,
 }

 impl SqliteQueue {
-  fn new(conn: ProtectedConn) -> Self {
+  fn new(
+    conn: ProtectedConn,
+    waker_tx: broadcast::Sender<()>,
+    waker_rx: broadcast::Receiver<()>,
+  ) -> Self {
     let conn_clone = conn.clone();
     let (shutdown_tx, shutdown_rx) = watch::channel::<()>(());
-    let (waker_tx, waker_rx) = mpsc::channel::<()>(1);
     let (dequeue_tx, dequeue_rx) = mpsc::channel::<(Vec<u8>, String)>(64);

     spawn(async move {
@@ -486,11 +502,6 @@ impl SqliteQueue {
     }))
   }

-  async fn wake(&self) -> Result<(), AnyError> {
-    self.waker_tx.send(()).await?;
-    Ok(())
-  }
-
   fn shutdown(&self) {
     let _ = self.shutdown_tx.send(());
   }
@@ -499,7 +510,7 @@ impl SqliteQueue {
     conn: ProtectedConn,
     dequeue_tx: mpsc::Sender<(Vec<u8>, String)>,
     mut shutdown_rx: watch::Receiver<()>,
-    mut waker_rx: mpsc::Receiver<()>,
+    mut waker_rx: broadcast::Receiver<()>,
   ) -> Result<(), AnyError> {
     loop {
       let messages = SqliteDb::run_tx(conn.clone(), move |tx| {
@@ -575,7 +586,9 @@ impl SqliteQueue {
       };
       tokio::select! {
         _ = sleep_fut => {}
-        x = waker_rx.recv() => if x.is_none() {return Ok(());},
+        x = waker_rx.recv() => {
+          if let Err(RecvError::Closed) = x {return Ok(());}
+        },
         _ = shutdown_rx.changed() => return Ok(())
       }
     }
@@ -773,7 +786,7 @@ impl Database for SqliteDb {

   async fn atomic_write(
     &self,
-    _state: Rc<RefCell<OpState>>,
+    state: Rc<RefCell<OpState>>,
     write: AtomicWrite,
   ) -> Result<Option<CommitResult>, AnyError> {
     let write = Arc::new(write);
@@ -892,8 +905,17 @@ impl Database for SqliteDb {
     .await?;

     if has_enqueues {
-      if let Some(queue) = self.queue.get() {
-        queue.wake().await?;
+      match self.queue.get() {
+        Some(queue) => {
+          let _ = queue.waker_tx.send(());
+        }
+        None => {
+          if let Some(waker_key) = &self.queue_waker_key {
+            let (waker_tx, _) =
+              shared_queue_waker_channel(waker_key, state.clone());
+            let _ = waker_tx.send(());
+          }
+        }
       }
     }
     Ok(commit_result)
@@ -901,11 +923,21 @@ impl Database for SqliteDb {

   async fn dequeue_next_message(
     &self,
-    _state: Rc<RefCell<OpState>>,
+    state: Rc<RefCell<OpState>>,
   ) -> Result<Option<Self::QMH>, AnyError> {
     let queue = self
       .queue
-      .get_or_init(|| async move { SqliteQueue::new(self.conn.clone()) })
+      .get_or_init(|| async move {
+        let (waker_tx, waker_rx) = {
+          match &self.queue_waker_key {
+            Some(waker_key) => {
+              shared_queue_waker_channel(waker_key, state.clone())
+            }
+            None => broadcast::channel(1),
+          }
+        };
+        SqliteQueue::new(self.conn.clone(), waker_tx, waker_rx)
+      })
       .await;
     let handle = queue.dequeue().await?;
     Ok(handle)
@@ -1012,6 +1044,69 @@ fn encode_value(value: &crate::Value) -> (Cow<'_, [u8]>, i64) {
   }
 }

+pub struct QueueWaker {
+  wakers_tx: HashMap<PathBuf, broadcast::Sender<()>>,
+}
+
+fn shared_queue_waker_channel(
+  waker_key: &Path,
+  state: Rc<RefCell<OpState>>,
+) -> (broadcast::Sender<()>, broadcast::Receiver<()>) {
+  let mut state = state.borrow_mut();
+  let waker = {
+    let waker = state.try_borrow_mut::<QueueWaker>();
+    match waker {
+      Some(waker) => waker,
+      None => {
+        let waker = QueueWaker {
+          wakers_tx: HashMap::new(),
+        };
+        state.put::<QueueWaker>(waker);
+        state.borrow_mut::<QueueWaker>()
+      }
+    }
+  };
+
+  let waker_tx = waker
+    .wakers_tx
+    .entry(waker_key.to_path_buf())
+    .or_insert_with(|| {
+      let (waker_tx, _) = broadcast::channel(1);
+      waker_tx
+    });
+
+  (waker_tx.clone(), waker_tx.subscribe())
+}
+
+/// Same as Path::canonicalize, but also handles non-existing paths.
+fn canonicalize_path(path: &Path) -> Result<PathBuf, AnyError> {
+  let path = path.to_path_buf().clean();
+  let mut path = path;
+  let mut names_stack = Vec::new();
+  loop {
+    match path.canonicalize() {
+      Ok(mut canonicalized_path) => {
+        for name in names_stack.into_iter().rev() {
+          canonicalized_path = canonicalized_path.join(name);
+        }
+        return Ok(canonicalized_path);
+      }
+      Err(err) if err.kind() == ErrorKind::NotFound => {
+        let file_name = path.file_name().map(|os_str| os_str.to_os_string());
+        if let Some(file_name) = file_name {
+          names_stack.push(file_name.to_str().unwrap().to_string());
+          path = path.parent().unwrap().to_path_buf();
+        } else {
+          names_stack.push(path.to_str().unwrap().to_string());
+          let current_dir = current_dir()?;
+          path = current_dir.clone();
+        }
+      }
+      Err(err) => return Err(err.into()),
+    }
+  }
+}
+
 fn is_conn_closed_error(e: &AnyError) -> bool {
   get_custom_error_class(e) == Some("TypeError")
     && e.to_string() == ERROR_USING_CLOSED_DATABASE