diff --git a/Cargo.lock b/Cargo.lock index 31f069f0ae..944694a6d7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1220,6 +1220,7 @@ dependencies = [ "base64 0.13.1", "deno_core", "hex", + "log", "num-bigint", "rand", "rusqlite", diff --git a/cli/tests/unit/kv_test.ts b/cli/tests/unit/kv_test.ts index 3081917da4..74a8ed6b34 100644 --- a/cli/tests/unit/kv_test.ts +++ b/cli/tests/unit/kv_test.ts @@ -1756,3 +1756,53 @@ dbTest("atomic operation is exposed", (db) => { const ao = db.atomic(); assert(ao instanceof Deno.AtomicOperation); }); + +Deno.test({ + name: "racy open", + async fn() { + for (let i = 0; i < 100; i++) { + const filename = await Deno.makeTempFile({ prefix: "racy_open_db" }); + try { + const [db1, db2, db3] = await Promise.all([ + Deno.openKv(filename), + Deno.openKv(filename), + Deno.openKv(filename), + ]); + db1.close(); + db2.close(); + db3.close(); + } finally { + await Deno.remove(filename); + } + } + }, +}); + +Deno.test({ + name: "racy write", + async fn() { + const filename = await Deno.makeTempFile({ prefix: "racy_write_db" }); + const concurrency = 20; + const iterations = 5; + try { + const dbs = await Promise.all( + Array(concurrency).fill(0).map(() => Deno.openKv(filename)), + ); + try { + for (let i = 0; i < iterations; i++) { + await Promise.all( + dbs.map((db) => db.atomic().sum(["counter"], 1n).commit()), + ); + } + assertEquals( + ((await dbs[0].get(["counter"])).value as Deno.KvU64).value, + BigInt(concurrency * iterations), + ); + } finally { + dbs.forEach((db) => db.close()); + } + } finally { + await Deno.remove(filename); + } + }, +}); diff --git a/ext/kv/Cargo.toml b/ext/kv/Cargo.toml index 7866605c05..645689b74b 100644 --- a/ext/kv/Cargo.toml +++ b/ext/kv/Cargo.toml @@ -19,6 +19,7 @@ async-trait.workspace = true base64.workspace = true deno_core.workspace = true hex.workspace = true +log.workspace = true num-bigint.workspace = true rand.workspace = true rusqlite.workspace = true diff --git a/ext/kv/sqlite.rs b/ext/kv/sqlite.rs index 808bf9b4f8..aea438d2df 100644 --- a/ext/kv/sqlite.rs +++ b/ext/kv/sqlite.rs @@ -3,6 +3,7 @@ use std::borrow::Cow; use std::cell::Cell; use std::cell::RefCell; +use std::future::Future; use std::marker::PhantomData; use std::path::Path; use std::path::PathBuf; @@ -21,6 +22,7 @@ use deno_core::task::spawn; use deno_core::task::spawn_blocking; use deno_core::AsyncRefCell; use deno_core::OpState; +use rand::Rng; use rusqlite::params; use rusqlite::OpenFlags; use rusqlite::OptionalExtension; @@ -165,28 +167,41 @@ impl DatabaseHandler for SqliteDbHandler

{ } } - let default_storage_dir = self.default_storage_dir.clone(); - let conn = spawn_blocking(move || { - let conn = match (path.as_deref(), &default_storage_dir) { - (Some(":memory:"), _) | (None, None) => { - rusqlite::Connection::open_in_memory()? - } - (Some(path), _) => { - let flags = - OpenFlags::default().difference(OpenFlags::SQLITE_OPEN_URI); - rusqlite::Connection::open_with_flags(path, flags)? - } - (None, Some(path)) => { - std::fs::create_dir_all(path)?; - let path = path.join("kv.sqlite3"); - rusqlite::Connection::open(&path)? - } - }; + let conn = sqlite_retry_loop(|| { + let path = path.clone(); + let default_storage_dir = self.default_storage_dir.clone(); + async move { + spawn_blocking(move || { + let conn = match (path.as_deref(), &default_storage_dir) { + (Some(":memory:"), _) | (None, None) => { + rusqlite::Connection::open_in_memory()? + } + (Some(path), _) => { + let flags = + OpenFlags::default().difference(OpenFlags::SQLITE_OPEN_URI); + rusqlite::Connection::open_with_flags(path, flags)? + } + (None, Some(path)) => { + std::fs::create_dir_all(path)?; + let path = path.join("kv.sqlite3"); + rusqlite::Connection::open(path)? + } + }; - conn.pragma_update(None, "journal_mode", "wal")?; - conn.execute(STATEMENT_CREATE_MIGRATION_TABLE, [])?; + conn.pragma_update(None, "journal_mode", "wal")?; - let current_version: usize = conn + Ok::<_, AnyError>(conn) + }) + .await + .unwrap() + } + }) + .await?; + let conn = Rc::new(AsyncRefCell::new(Cell::new(Some(conn)))); + SqliteDb::run_tx(conn.clone(), |tx| { + tx.execute(STATEMENT_CREATE_MIGRATION_TABLE, [])?; + + let current_version: usize = tx .query_row( "select version from migration_state where k = 0", [], @@ -198,21 +213,22 @@ impl DatabaseHandler for SqliteDbHandler

{ for (i, migration) in MIGRATIONS.iter().enumerate() { let version = i + 1; if version > current_version { - conn.execute_batch(migration)?; - conn.execute( + tx.execute_batch(migration)?; + tx.execute( "replace into migration_state (k, version) values(?, ?)", [&0, &version], )?; } } - Ok::<_, AnyError>(conn) + tx.commit()?; + + Ok(()) }) - .await - .unwrap()?; + .await?; Ok(SqliteDb { - conn: Rc::new(AsyncRefCell::new(Cell::new(Some(conn)))), + conn, queue: OnceCell::new(), }) } @@ -223,11 +239,48 @@ pub struct SqliteDb { queue: OnceCell, } +async fn sqlite_retry_loop>>( + mut f: impl FnMut() -> Fut, +) -> Result { + loop { + match f().await { + Ok(x) => return Ok(x), + Err(e) => { + if let Some(x) = e.downcast_ref::() { + if x.sqlite_error_code() == Some(rusqlite::ErrorCode::DatabaseBusy) { + log::debug!("kv: Database is busy, retrying"); + tokio::time::sleep(Duration::from_millis( + rand::thread_rng().gen_range(5..20), + )) + .await; + continue; + } + } + return Err(e); + } + } + } +} + impl SqliteDb { async fn run_tx( conn: Rc>>>, f: F, ) -> Result + where + F: (FnOnce(rusqlite::Transaction<'_>) -> Result) + + Clone + + Send + + 'static, + R: Send + 'static, + { + sqlite_retry_loop(|| Self::run_tx_inner(conn.clone(), f.clone())).await + } + + async fn run_tx_inner( + conn: Rc>>>, + f: F, + ) -> Result where F: (FnOnce(rusqlite::Transaction<'_>) -> Result) + Send @@ -579,9 +632,10 @@ impl Database for SqliteDb { requests: Vec, _options: SnapshotReadOptions, ) -> Result, AnyError> { + let requests = Arc::new(requests); Self::run_tx(self.conn.clone(), move |tx| { let mut responses = Vec::with_capacity(requests.len()); - for request in requests { + for request in &*requests { let mut stmt = tx.prepare_cached(if request.reverse { STATEMENT_KV_RANGE_SCAN_REVERSE } else { @@ -622,9 +676,10 @@ impl Database for SqliteDb { &self, write: AtomicWrite, ) -> Result, AnyError> { + let write = Arc::new(write); let (has_enqueues, commit_result) = Self::run_tx(self.conn.clone(), move |tx| { - for check in write.checks { + for check in &write.checks { let real_versionstamp = tx .prepare_cached(STATEMENT_KV_POINT_GET_VERSION_ONLY)? .query_row([check.key.as_slice()], |row| row.get(0)) @@ -639,10 +694,10 @@ impl Database for SqliteDb { .prepare_cached(STATEMENT_INC_AND_GET_DATA_VERSION)? .query_row([], |row| row.get(0))?; - for mutation in write.mutations { - match mutation.kind { + for mutation in &write.mutations { + match &mutation.kind { MutationKind::Set(value) => { - let (value, encoding) = encode_value(&value); + let (value, encoding) = encode_value(value); let changed = tx .prepare_cached(STATEMENT_KV_POINT_SET)? .execute(params![mutation.key, &value, &encoding, &version])?; @@ -659,7 +714,7 @@ impl Database for SqliteDb { &tx, &mutation.key, "sum", - &operand, + operand, version, |a, b| a.wrapping_add(b), )?; @@ -669,7 +724,7 @@ impl Database for SqliteDb { &tx, &mutation.key, "min", - &operand, + operand, version, |a, b| a.min(b), )?; @@ -679,7 +734,7 @@ impl Database for SqliteDb { &tx, &mutation.key, "max", - &operand, + operand, version, |a, b| a.max(b), )?; @@ -693,12 +748,13 @@ impl Database for SqliteDb { .as_millis() as u64; let has_enqueues = !write.enqueues.is_empty(); - for enqueue in write.enqueues { + for enqueue in &write.enqueues { let id = Uuid::new_v4().to_string(); let backoff_schedule = serde_json::to_string( &enqueue .backoff_schedule - .or_else(|| Some(DEFAULT_BACKOFF_SCHEDULE.to_vec())), + .as_deref() + .or_else(|| Some(&DEFAULT_BACKOFF_SCHEDULE[..])), )?; let keys_if_undelivered = serde_json::to_string(&enqueue.keys_if_undelivered)?;