From 91cd0a2bef60e61eb2bc396a70fa0c0cc8ed4053 Mon Sep 17 00:00:00 2001 From: Luca Casonato Date: Tue, 5 Dec 2023 14:21:46 +0100 Subject: [PATCH] feat(unstable): kv.watch() (#21147) This commit adds support for a new `kv.watch()` method that allows watching for changes to a key-value pair. This is useful for cases where you want to be notified when a key-value pair changes, but don't want to have to poll for changes. --------- Co-authored-by: losfair --- Cargo.lock | 19 +++-- Cargo.toml | 6 +- cli/tests/unit/kv_test.ts | 44 ++++++++++ cli/tsc/dts/lib.deno.unstable.d.ts | 42 ++++++++++ ext/kv/01_db.ts | 67 +++++++++++++++ ext/kv/dynamic.rs | 34 +++++--- ext/kv/lib.rs | 129 +++++++++++++++++++++++++++-- ext/kv/remote.rs | 9 ++ ext/kv/sqlite.rs | 62 +++++++------- runtime/errors.rs | 2 +- test_util/src/lib.rs | 2 + 11 files changed, 360 insertions(+), 56 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a93de7a0d8..9229109251 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1703,13 +1703,14 @@ dependencies = [ [[package]] name = "denokv_proto" -version = "0.2.1" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8952fb8c38c1dcd796d49b00030afb74aa184160ae86817b72a32a994c8e16f0" +checksum = "98a79f7e98bfd3c148ce782c27c1494e77c3c94ab87c9e7e86e901cbc1643449" dependencies = [ "anyhow", "async-trait", "chrono", + "futures", "num-bigint", "prost", "prost-build", @@ -1719,15 +1720,17 @@ dependencies = [ [[package]] name = "denokv_remote" -version = "0.2.3" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "edfc8447324d783b01e215bd5040ff9149c34d9715c7b7b5080dd648ebf1148a" +checksum = "518e181eb14f1a3b8fc423e48de431048249780fb0815d81e8139faf347c3269" dependencies = [ "anyhow", + "async-stream", "async-trait", "bytes", "chrono", "denokv_proto", + "futures", "log", "prost", "rand", @@ -1735,26 +1738,30 @@ dependencies = [ "serde", "serde_json", "tokio", + "tokio-util", "url", "uuid", ] [[package]] name = "denokv_sqlite" -version = "0.2.1" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8ec76b691ff069f14e56e3e053c2b2163540b27e4b60179f2b120064a7e4960d" +checksum = "90af93f2ab8eec43fea9f8931fa99d38e73fa0af60aba0fae79de3fb87a0ed06" dependencies = [ "anyhow", + "async-stream", "async-trait", "chrono", "denokv_proto", "futures", + "hex", "log", "num-bigint", "rand", "rusqlite", "serde_json", + "thiserror", "tokio", "uuid", ] diff --git a/Cargo.toml b/Cargo.toml index d5177d4096..d4e9c18334 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -49,10 +49,10 @@ test_util = { path = "./test_util" } deno_lockfile = "0.17.2" deno_media_type = { version = "0.1.1", features = ["module_specifier"] } -denokv_proto = "0.2.1" +denokv_proto = "0.5.0" # denokv_sqlite brings in bundled sqlite if we don't disable the default features -denokv_sqlite = { default-features = false, version = "0.2.1" } -denokv_remote = "0.2.3" +denokv_sqlite = { default-features = false, version = "0.5.0" } +denokv_remote = "0.5.0" # exts deno_broadcast_channel = { version = "0.120.0", path = "./ext/broadcast_channel" } diff --git a/cli/tests/unit/kv_test.ts b/cli/tests/unit/kv_test.ts index 73c85dd5cb..68b3c40139 100644 --- a/cli/tests/unit/kv_test.ts +++ b/cli/tests/unit/kv_test.ts @@ -2137,3 +2137,47 @@ Deno.test( // calling [Symbol.dispose] after manual close is a no-op }, ); + +dbTest("key watch", async (db) => { + const changeHistory: Deno.KvEntryMaybe[] = []; + const watcher: ReadableStream[]> = db.watch< + number[] + >([["key"]]); + + const reader = watcher.getReader(); + const expectedChanges = 2; + + const work = (async () => { + for (let i = 0; i < expectedChanges; i++) { + const message = await reader.read(); + if (message.done) { + throw new Error("Unexpected end of stream"); + } + changeHistory.push(message.value[0]); + } + + await reader.cancel(); + })(); + + while (changeHistory.length !== 1) { + await sleep(100); + } + assertEquals(changeHistory[0], { + key: ["key"], + value: null, + versionstamp: null, + }); + + const { versionstamp } = await db.set(["key"], 1); + while (changeHistory.length as number !== 2) { + await sleep(100); + } + assertEquals(changeHistory[1], { + key: ["key"], + value: 1, + versionstamp, + }); + + await work; + await reader.cancel(); +}); diff --git a/cli/tsc/dts/lib.deno.unstable.d.ts b/cli/tsc/dts/lib.deno.unstable.d.ts index 5cba27a5e2..62f18ccbc4 100644 --- a/cli/tsc/dts/lib.deno.unstable.d.ts +++ b/cli/tsc/dts/lib.deno.unstable.d.ts @@ -1994,6 +1994,48 @@ declare namespace Deno { */ atomic(): AtomicOperation; + /** + * Watch for changes to the given keys in the database. The returned stream + * is a {@linkcode ReadableStream} that emits a new value whenever any of + * the watched keys change their versionstamp. The emitted value is an array + * of {@linkcode Deno.KvEntryMaybe} objects, with the same length and order + * as the `keys` array. If no value exists for a given key, the returned + * entry will have a `null` value and versionstamp. + * + * The returned stream does not return every single intermediate state of + * the watched keys, but rather only keeps you up to date with the latest + * state of the keys. This means that if a key is modified multiple times + * quickly, you may not receive a notification for every single change, but + * rather only the latest state of the key. + * + * ```ts + * const db = await Deno.openKv(); + * + * const stream = db.watch([["foo"], ["bar"]]); + * for await (const entries of stream) { + * entries[0].key; // ["foo"] + * entries[0].value; // "bar" + * entries[0].versionstamp; // "00000000000000010000" + * entries[1].key; // ["bar"] + * entries[1].value; // null + * entries[1].versionstamp; // null + * } + * ``` + * + * The `options` argument can be used to specify additional options for the + * watch operation. The `raw` option can be used to specify whether a new + * value should be emitted whenever a mutation occurs on any of the watched + * keys (even if the value of the key does not change, such as deleting a + * deleted key), or only when entries have observably changed in some way. + * When `raw: true` is used, it is possible for the stream to occasionally + * emit values even if no mutations have occurred on any of the watched + * keys. The default value for this option is `false`. + */ + watch( + keys: readonly [...{ [K in keyof T]: KvKey }], + options?: { raw?: boolean }, + ): ReadableStream<{ [K in keyof T]: KvEntryMaybe }>; + /** * Close the database connection. This will prevent any further operations * from being performed on the database, and interrupt any in-flight diff --git a/ext/kv/01_db.ts b/ext/kv/01_db.ts index 34678261a7..73deee27fc 100644 --- a/ext/kv/01_db.ts +++ b/ext/kv/01_db.ts @@ -11,8 +11,10 @@ const { SymbolFor, SymbolToStringTag, Uint8ArrayPrototype, + Error, } = globalThis.__bootstrap.primordials; import { SymbolDispose } from "ext:deno_web/00_infra.js"; +import { ReadableStream } from "ext:deno_web/06_streams.js"; const core = Deno.core; const ops = core.ops; @@ -297,6 +299,71 @@ class Kv { finishMessageOps.clear(); } + watch(keys: Deno.KvKey[], options = {}) { + const raw = options.raw ?? false; + const rid = ops.op_kv_watch(this.#rid, keys); + const lastEntries: (Deno.KvEntryMaybe | undefined)[] = Array.from( + { length: keys.length }, + () => undefined, + ); + return new ReadableStream({ + async pull(controller) { + while (true) { + let updates; + try { + updates = await core.opAsync("op_kv_watch_next", rid); + } catch (err) { + core.tryClose(rid); + controller.error(err); + return; + } + if (updates === null) { + core.tryClose(rid); + controller.close(); + return; + } + let changed = false; + for (let i = 0; i < keys.length; i++) { + if (updates[i] === "unchanged") { + if (lastEntries[i] === undefined) { + throw new Error( + "watch: invalid unchanged update (internal error)", + ); + } + continue; + } + if ( + lastEntries[i] !== undefined && + (updates[i]?.versionstamp ?? null) === + lastEntries[i]?.versionstamp + ) { + continue; + } + changed = true; + if (updates[i] === null) { + lastEntries[i] = { + key: [...keys[i]], + value: null, + versionstamp: null, + }; + } else { + lastEntries[i] = updates[i]; + } + } + if (!changed && !raw) continue; // no change + const entries = lastEntries.map((entry) => + entry.versionstamp === null ? { ...entry } : deserializeValue(entry) + ); + controller.enqueue(entries); + return; + } + }, + cancel() { + core.tryClose(rid); + }, + }); + } + close() { core.close(this.#rid); } diff --git a/ext/kv/dynamic.rs b/ext/kv/dynamic.rs index b772d26b88..c8dd6640ca 100644 --- a/ext/kv/dynamic.rs +++ b/ext/kv/dynamic.rs @@ -18,6 +18,7 @@ use deno_core::error::AnyError; use deno_core::OpState; use denokv_proto::CommitResult; use denokv_proto::ReadRangeOutput; +use denokv_proto::WatchStream; pub struct MultiBackendDbHandler { backends: Vec<(&'static [&'static str], Box)>, @@ -55,7 +56,7 @@ impl MultiBackendDbHandler { #[async_trait(?Send)] impl DatabaseHandler for MultiBackendDbHandler { - type DB = Box; + type DB = RcDynamicDb; async fn open( &self, @@ -88,12 +89,12 @@ pub trait DynamicDbHandler { &self, state: Rc>, path: Option, - ) -> Result, AnyError>; + ) -> Result; } #[async_trait(?Send)] impl DatabaseHandler for Box { - type DB = Box; + type DB = RcDynamicDb; async fn open( &self, @@ -114,8 +115,8 @@ where &self, state: Rc>, path: Option, - ) -> Result, AnyError> { - Ok(Box::new(self.open(state, path).await?)) + ) -> Result { + Ok(RcDynamicDb(Rc::new(self.open(state, path).await?))) } } @@ -136,11 +137,16 @@ pub trait DynamicDb { &self, ) -> Result>, AnyError>; + fn dyn_watch(&self, keys: Vec>) -> WatchStream; + fn dyn_close(&self); } +#[derive(Clone)] +pub struct RcDynamicDb(Rc); + #[async_trait(?Send)] -impl Database for Box { +impl Database for RcDynamicDb { type QMH = Box; async fn snapshot_read( @@ -148,24 +154,28 @@ impl Database for Box { requests: Vec, options: SnapshotReadOptions, ) -> Result, AnyError> { - (**self).dyn_snapshot_read(requests, options).await + (*self.0).dyn_snapshot_read(requests, options).await } async fn atomic_write( &self, write: AtomicWrite, ) -> Result, AnyError> { - (**self).dyn_atomic_write(write).await + (*self.0).dyn_atomic_write(write).await } async fn dequeue_next_message( &self, ) -> Result>, AnyError> { - (**self).dyn_dequeue_next_message().await + (*self.0).dyn_dequeue_next_message().await + } + + fn watch(&self, keys: Vec>) -> WatchStream { + (*self.0).dyn_watch(keys) } fn close(&self) { - (**self).dyn_close() + (*self.0).dyn_close() } } @@ -201,6 +211,10 @@ where ) } + fn dyn_watch(&self, keys: Vec>) -> WatchStream { + self.watch(keys) + } + fn dyn_close(&self) { self.close() } diff --git a/ext/kv/lib.rs b/ext/kv/lib.rs index c0091d75d8..456a1ebf7a 100644 --- a/ext/kv/lib.rs +++ b/ext/kv/lib.rs @@ -20,12 +20,17 @@ use deno_core::anyhow::Context; use deno_core::error::get_custom_error_class; use deno_core::error::type_error; use deno_core::error::AnyError; +use deno_core::futures::StreamExt; use deno_core::op2; use deno_core::serde_v8::AnyValue; use deno_core::serde_v8::BigInt; +use deno_core::AsyncRefCell; use deno_core::ByteString; +use deno_core::CancelFuture; +use deno_core::CancelHandle; use deno_core::JsBuffer; use deno_core::OpState; +use deno_core::RcRef; use deno_core::Resource; use deno_core::ResourceId; use deno_core::ToJsBuffer; @@ -45,6 +50,8 @@ use denokv_proto::MutationKind; use denokv_proto::QueueMessageHandle; use denokv_proto::ReadRange; use denokv_proto::SnapshotReadOptions; +use denokv_proto::WatchKeyOutput; +use denokv_proto::WatchStream; use log::debug; use serde::Deserialize; use serde::Serialize; @@ -62,6 +69,7 @@ const MAX_READ_RANGES: usize = 10; const MAX_READ_ENTRIES: usize = 1000; const MAX_CHECKS: usize = 100; const MAX_MUTATIONS: usize = 1000; +const MAX_WATCHED_KEYS: usize = 10; const MAX_TOTAL_MUTATION_SIZE_BYTES: usize = 800 * 1024; const MAX_TOTAL_KEY_SIZE_BYTES: usize = 80 * 1024; @@ -75,6 +83,8 @@ deno_core::extension!(deno_kv, op_kv_encode_cursor, op_kv_dequeue_next_message, op_kv_finish_dequeued_message, + op_kv_watch, + op_kv_watch_next, ], esm = [ "01_db.ts" ], options = { @@ -86,7 +96,8 @@ deno_core::extension!(deno_kv, ); struct DatabaseResource { - db: Rc, + db: DB, + cancel_handle: Rc, } impl Resource for DatabaseResource { @@ -96,6 +107,23 @@ impl Resource for DatabaseResource { fn close(self: Rc) { self.db.close(); + self.cancel_handle.cancel(); + } +} + +struct DatabaseWatcherResource { + stream: AsyncRefCell, + db_cancel_handle: Rc, + cancel_handle: Rc, +} + +impl Resource for DatabaseWatcherResource { + fn name(&self) -> Cow { + "databaseWatcher".into() + } + + fn close(self: Rc) { + self.cancel_handle.cancel() } } @@ -118,10 +146,10 @@ where state.borrow::>().clone() }; let db = handler.open(state.clone(), path).await?; - let rid = state - .borrow_mut() - .resource_table - .add(DatabaseResource { db: Rc::new(db) }); + let rid = state.borrow_mut().resource_table.add(DatabaseResource { + db, + cancel_handle: CancelHandle::new_rc(), + }); Ok(rid) } @@ -354,6 +382,97 @@ where Ok(Some((payload, handle_rid))) } +#[op2] +#[smi] +fn op_kv_watch( + state: &mut OpState, + #[smi] rid: ResourceId, + #[serde] keys: Vec, +) -> Result +where + DBH: DatabaseHandler + 'static, +{ + let resource = state.resource_table.get::>(rid)?; + + if keys.len() > MAX_WATCHED_KEYS { + return Err(type_error(format!( + "too many keys (max {})", + MAX_WATCHED_KEYS + ))); + } + + let keys: Vec> = keys + .into_iter() + .map(encode_v8_key) + .collect::>()?; + + for k in &keys { + check_read_key_size(k)?; + } + + let stream = resource.db.watch(keys); + + let rid = state.resource_table.add(DatabaseWatcherResource { + stream: AsyncRefCell::new(stream), + db_cancel_handle: resource.cancel_handle.clone(), + cancel_handle: CancelHandle::new_rc(), + }); + + Ok(rid) +} + +#[derive(Serialize)] +#[serde(rename_all = "camelCase", untagged)] +enum WatchEntry { + Changed(Option), + Unchanged, +} + +#[op2(async)] +#[serde] +async fn op_kv_watch_next( + state: Rc>, + #[smi] rid: ResourceId, +) -> Result>, AnyError> { + let resource = { + let state = state.borrow(); + let resource = state.resource_table.get::(rid)?; + resource.clone() + }; + + let db_cancel_handle = resource.db_cancel_handle.clone(); + let cancel_handle = resource.cancel_handle.clone(); + let stream = RcRef::map(resource, |r| &r.stream) + .borrow_mut() + .or_cancel(db_cancel_handle) + .or_cancel(cancel_handle) + .await; + let Ok(Ok(mut stream)) = stream else { + return Ok(None); + }; + + // doesn't need a cancel handle because the stream ends when the database + // connection is closed + let Some(res) = stream.next().await else { + return Ok(None); + }; + + let entries = res?; + let entries = entries + .into_iter() + .map(|entry| { + Ok(match entry { + WatchKeyOutput::Changed { entry } => { + WatchEntry::Changed(entry.map(TryInto::try_into).transpose()?) + } + WatchKeyOutput::Unchanged => WatchEntry::Unchanged, + }) + }) + .collect::>()?; + + Ok(Some(entries)) +} + #[op2(async)] async fn op_kv_finish_dequeued_message( state: Rc>, diff --git a/ext/kv/remote.rs b/ext/kv/remote.rs index 7cac6b9c34..855b091fa5 100644 --- a/ext/kv/remote.rs +++ b/ext/kv/remote.rs @@ -66,6 +66,15 @@ pub struct PermissionChecker { _permissions: PhantomData

, } +impl Clone for PermissionChecker

{ + fn clone(&self) -> Self { + Self { + state: self.state.clone(), + _permissions: PhantomData, + } + } +} + impl denokv_remote::RemotePermissions for PermissionChecker

{ diff --git a/ext/kv/sqlite.rs b/ext/kv/sqlite.rs index b4e251f962..e0facace44 100644 --- a/ext/kv/sqlite.rs +++ b/ext/kv/sqlite.rs @@ -8,7 +8,6 @@ use std::marker::PhantomData; use std::path::Path; use std::path::PathBuf; use std::rc::Rc; -use std::sync::Arc; use std::sync::Mutex; use std::sync::OnceLock; @@ -18,15 +17,15 @@ use deno_core::error::AnyError; use deno_core::unsync::spawn_blocking; use deno_core::OpState; use deno_node::PathClean; -pub use denokv_sqlite::TypeError; +pub use denokv_sqlite::SqliteBackendError; +use denokv_sqlite::SqliteNotifier; use rand::RngCore; use rand::SeedableRng; use rusqlite::OpenFlags; -use tokio::sync::Notify; use crate::DatabaseHandler; -static QUEUE_WAKER_MAP: OnceLock>>> = +static SQLITE_NOTIFIERS_MAP: OnceLock>> = OnceLock::new(); pub struct SqliteDbHandler { @@ -85,47 +84,48 @@ impl DatabaseHandler for SqliteDbHandler

{ let path = path.clone(); let default_storage_dir = self.default_storage_dir.clone(); - let (conn, queue_waker_key) = spawn_blocking(move || { + let (conn, notifier_key) = spawn_blocking(move || { denokv_sqlite::sqlite_retry_loop(|| { - let (conn, queue_waker_key) = - match (path.as_deref(), &default_storage_dir) { - (Some(":memory:"), _) | (None, None) => { - (rusqlite::Connection::open_in_memory()?, None) - } - (Some(path), _) => { - let flags = - OpenFlags::default().difference(OpenFlags::SQLITE_OPEN_URI); - let resolved_path = canonicalize_path(&PathBuf::from(path))?; - ( - rusqlite::Connection::open_with_flags(path, flags)?, - Some(resolved_path), - ) - } - (None, Some(path)) => { - std::fs::create_dir_all(path)?; - let path = path.join("kv.sqlite3"); - (rusqlite::Connection::open(path.clone())?, Some(path)) - } - }; + let (conn, notifier_key) = match (path.as_deref(), &default_storage_dir) + { + (Some(":memory:"), _) | (None, None) => { + (rusqlite::Connection::open_in_memory()?, None) + } + (Some(path), _) => { + let flags = + OpenFlags::default().difference(OpenFlags::SQLITE_OPEN_URI); + let resolved_path = canonicalize_path(&PathBuf::from(path)) + .map_err(anyhow::Error::from)?; + ( + rusqlite::Connection::open_with_flags(path, flags)?, + Some(resolved_path), + ) + } + (None, Some(path)) => { + std::fs::create_dir_all(path).map_err(anyhow::Error::from)?; + let path = path.join("kv.sqlite3"); + (rusqlite::Connection::open(path.clone())?, Some(path)) + } + }; conn.pragma_update(None, "journal_mode", "wal")?; - Ok::<_, AnyError>((conn, queue_waker_key)) + Ok::<_, SqliteBackendError>((conn, notifier_key)) }) }) .await .unwrap()?; - let dequeue_notify = if let Some(queue_waker_key) = queue_waker_key { - QUEUE_WAKER_MAP + let notifier = if let Some(notifier_key) = notifier_key { + SQLITE_NOTIFIERS_MAP .get_or_init(Default::default) .lock() .unwrap() - .entry(queue_waker_key) + .entry(notifier_key) .or_default() .clone() } else { - Arc::new(Notify::new()) + SqliteNotifier::default() }; let versionstamp_rng: Box = @@ -134,7 +134,7 @@ impl DatabaseHandler for SqliteDbHandler

{ None => Box::new(rand::rngs::StdRng::from_entropy()), }; - denokv_sqlite::Sqlite::new(conn, dequeue_notify, versionstamp_rng) + denokv_sqlite::Sqlite::new(conn, notifier, versionstamp_rng) } } diff --git a/runtime/errors.rs b/runtime/errors.rs index f48f01246e..2061a5e0b9 100644 --- a/runtime/errors.rs +++ b/runtime/errors.rs @@ -212,7 +212,7 @@ pub fn get_error_class_name(e: &AnyError) -> Option<&'static str> { .map(get_url_parse_error_class) }) .or_else(|| { - e.downcast_ref::() + e.downcast_ref::() .map(|_| "TypeError") }) .or_else(|| { diff --git a/test_util/src/lib.rs b/test_util/src/lib.rs index 0f402fb52e..a2f947f560 100644 --- a/test_util/src/lib.rs +++ b/test_util/src/lib.rs @@ -11,6 +11,7 @@ use denokv_proto::datapath::AtomicWriteStatus; use denokv_proto::datapath::ReadRangeOutput; use denokv_proto::datapath::SnapshotRead; use denokv_proto::datapath::SnapshotReadOutput; +use denokv_proto::datapath::SnapshotReadStatus; use fastwebsockets::FragmentCollector; use fastwebsockets::Frame; use fastwebsockets::OpCode; @@ -1226,6 +1227,7 @@ async fn main_server( .collect(), read_disabled: false, read_is_strongly_consistent: true, + status: SnapshotReadStatus::SrSuccess.into(), } .encode_to_vec(), ))