1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2024-11-24 15:19:26 -05:00

feat(unstable): kv.watch() (#21147)

This commit adds support for a new `kv.watch()` method that allows
watching for changes to a key-value pair. This is useful for cases
where you want to be notified when a key-value pair changes, but
don't want to have to poll for changes.

---------

Co-authored-by: losfair <zhy20000919@hotmail.com>
This commit is contained in:
Luca Casonato 2023-12-05 14:21:46 +01:00 committed by Bartek Iwańczuk
parent 889e396b7e
commit 91cd0a2bef
No known key found for this signature in database
GPG key ID: 0C6BCDDC3B3AD750
11 changed files with 360 additions and 56 deletions

19
Cargo.lock generated
View file

@ -1703,13 +1703,14 @@ dependencies = [
[[package]]
name = "denokv_proto"
version = "0.2.1"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8952fb8c38c1dcd796d49b00030afb74aa184160ae86817b72a32a994c8e16f0"
checksum = "98a79f7e98bfd3c148ce782c27c1494e77c3c94ab87c9e7e86e901cbc1643449"
dependencies = [
"anyhow",
"async-trait",
"chrono",
"futures",
"num-bigint",
"prost",
"prost-build",
@ -1719,15 +1720,17 @@ dependencies = [
[[package]]
name = "denokv_remote"
version = "0.2.3"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "edfc8447324d783b01e215bd5040ff9149c34d9715c7b7b5080dd648ebf1148a"
checksum = "518e181eb14f1a3b8fc423e48de431048249780fb0815d81e8139faf347c3269"
dependencies = [
"anyhow",
"async-stream",
"async-trait",
"bytes",
"chrono",
"denokv_proto",
"futures",
"log",
"prost",
"rand",
@ -1735,26 +1738,30 @@ dependencies = [
"serde",
"serde_json",
"tokio",
"tokio-util",
"url",
"uuid",
]
[[package]]
name = "denokv_sqlite"
version = "0.2.1"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ec76b691ff069f14e56e3e053c2b2163540b27e4b60179f2b120064a7e4960d"
checksum = "90af93f2ab8eec43fea9f8931fa99d38e73fa0af60aba0fae79de3fb87a0ed06"
dependencies = [
"anyhow",
"async-stream",
"async-trait",
"chrono",
"denokv_proto",
"futures",
"hex",
"log",
"num-bigint",
"rand",
"rusqlite",
"serde_json",
"thiserror",
"tokio",
"uuid",
]

View file

@ -49,10 +49,10 @@ test_util = { path = "./test_util" }
deno_lockfile = "0.17.2"
deno_media_type = { version = "0.1.1", features = ["module_specifier"] }
denokv_proto = "0.2.1"
denokv_proto = "0.5.0"
# denokv_sqlite brings in bundled sqlite if we don't disable the default features
denokv_sqlite = { default-features = false, version = "0.2.1" }
denokv_remote = "0.2.3"
denokv_sqlite = { default-features = false, version = "0.5.0" }
denokv_remote = "0.5.0"
# exts
deno_broadcast_channel = { version = "0.120.0", path = "./ext/broadcast_channel" }

View file

@ -2137,3 +2137,47 @@ Deno.test(
// calling [Symbol.dispose] after manual close is a no-op
},
);
dbTest("key watch", async (db) => {
const changeHistory: Deno.KvEntryMaybe<number>[] = [];
const watcher: ReadableStream<Deno.KvEntryMaybe<number>[]> = db.watch<
number[]
>([["key"]]);
const reader = watcher.getReader();
const expectedChanges = 2;
const work = (async () => {
for (let i = 0; i < expectedChanges; i++) {
const message = await reader.read();
if (message.done) {
throw new Error("Unexpected end of stream");
}
changeHistory.push(message.value[0]);
}
await reader.cancel();
})();
while (changeHistory.length !== 1) {
await sleep(100);
}
assertEquals(changeHistory[0], {
key: ["key"],
value: null,
versionstamp: null,
});
const { versionstamp } = await db.set(["key"], 1);
while (changeHistory.length as number !== 2) {
await sleep(100);
}
assertEquals(changeHistory[1], {
key: ["key"],
value: 1,
versionstamp,
});
await work;
await reader.cancel();
});

View file

@ -1994,6 +1994,48 @@ declare namespace Deno {
*/
atomic(): AtomicOperation;
/**
* Watch for changes to the given keys in the database. The returned stream
* is a {@linkcode ReadableStream} that emits a new value whenever any of
* the watched keys change their versionstamp. The emitted value is an array
* of {@linkcode Deno.KvEntryMaybe} objects, with the same length and order
* as the `keys` array. If no value exists for a given key, the returned
* entry will have a `null` value and versionstamp.
*
* The returned stream does not return every single intermediate state of
* the watched keys, but rather only keeps you up to date with the latest
* state of the keys. This means that if a key is modified multiple times
* quickly, you may not receive a notification for every single change, but
* rather only the latest state of the key.
*
* ```ts
* const db = await Deno.openKv();
*
* const stream = db.watch([["foo"], ["bar"]]);
* for await (const entries of stream) {
* entries[0].key; // ["foo"]
* entries[0].value; // "bar"
* entries[0].versionstamp; // "00000000000000010000"
* entries[1].key; // ["bar"]
* entries[1].value; // null
* entries[1].versionstamp; // null
* }
* ```
*
* The `options` argument can be used to specify additional options for the
* watch operation. The `raw` option can be used to specify whether a new
* value should be emitted whenever a mutation occurs on any of the watched
* keys (even if the value of the key does not change, such as deleting a
* deleted key), or only when entries have observably changed in some way.
* When `raw: true` is used, it is possible for the stream to occasionally
* emit values even if no mutations have occurred on any of the watched
* keys. The default value for this option is `false`.
*/
watch<T extends readonly unknown[]>(
keys: readonly [...{ [K in keyof T]: KvKey }],
options?: { raw?: boolean },
): ReadableStream<{ [K in keyof T]: KvEntryMaybe<T[K]> }>;
/**
* Close the database connection. This will prevent any further operations
* from being performed on the database, and interrupt any in-flight

View file

@ -11,8 +11,10 @@ const {
SymbolFor,
SymbolToStringTag,
Uint8ArrayPrototype,
Error,
} = globalThis.__bootstrap.primordials;
import { SymbolDispose } from "ext:deno_web/00_infra.js";
import { ReadableStream } from "ext:deno_web/06_streams.js";
const core = Deno.core;
const ops = core.ops;
@ -297,6 +299,71 @@ class Kv {
finishMessageOps.clear();
}
watch(keys: Deno.KvKey[], options = {}) {
const raw = options.raw ?? false;
const rid = ops.op_kv_watch(this.#rid, keys);
const lastEntries: (Deno.KvEntryMaybe<unknown> | undefined)[] = Array.from(
{ length: keys.length },
() => undefined,
);
return new ReadableStream({
async pull(controller) {
while (true) {
let updates;
try {
updates = await core.opAsync("op_kv_watch_next", rid);
} catch (err) {
core.tryClose(rid);
controller.error(err);
return;
}
if (updates === null) {
core.tryClose(rid);
controller.close();
return;
}
let changed = false;
for (let i = 0; i < keys.length; i++) {
if (updates[i] === "unchanged") {
if (lastEntries[i] === undefined) {
throw new Error(
"watch: invalid unchanged update (internal error)",
);
}
continue;
}
if (
lastEntries[i] !== undefined &&
(updates[i]?.versionstamp ?? null) ===
lastEntries[i]?.versionstamp
) {
continue;
}
changed = true;
if (updates[i] === null) {
lastEntries[i] = {
key: [...keys[i]],
value: null,
versionstamp: null,
};
} else {
lastEntries[i] = updates[i];
}
}
if (!changed && !raw) continue; // no change
const entries = lastEntries.map((entry) =>
entry.versionstamp === null ? { ...entry } : deserializeValue(entry)
);
controller.enqueue(entries);
return;
}
},
cancel() {
core.tryClose(rid);
},
});
}
close() {
core.close(this.#rid);
}

View file

@ -18,6 +18,7 @@ use deno_core::error::AnyError;
use deno_core::OpState;
use denokv_proto::CommitResult;
use denokv_proto::ReadRangeOutput;
use denokv_proto::WatchStream;
pub struct MultiBackendDbHandler {
backends: Vec<(&'static [&'static str], Box<dyn DynamicDbHandler>)>,
@ -55,7 +56,7 @@ impl MultiBackendDbHandler {
#[async_trait(?Send)]
impl DatabaseHandler for MultiBackendDbHandler {
type DB = Box<dyn DynamicDb>;
type DB = RcDynamicDb;
async fn open(
&self,
@ -88,12 +89,12 @@ pub trait DynamicDbHandler {
&self,
state: Rc<RefCell<OpState>>,
path: Option<String>,
) -> Result<Box<dyn DynamicDb>, AnyError>;
) -> Result<RcDynamicDb, AnyError>;
}
#[async_trait(?Send)]
impl DatabaseHandler for Box<dyn DynamicDbHandler> {
type DB = Box<dyn DynamicDb>;
type DB = RcDynamicDb;
async fn open(
&self,
@ -114,8 +115,8 @@ where
&self,
state: Rc<RefCell<OpState>>,
path: Option<String>,
) -> Result<Box<dyn DynamicDb>, AnyError> {
Ok(Box::new(self.open(state, path).await?))
) -> Result<RcDynamicDb, AnyError> {
Ok(RcDynamicDb(Rc::new(self.open(state, path).await?)))
}
}
@ -136,11 +137,16 @@ pub trait DynamicDb {
&self,
) -> Result<Option<Box<dyn QueueMessageHandle>>, AnyError>;
fn dyn_watch(&self, keys: Vec<Vec<u8>>) -> WatchStream;
fn dyn_close(&self);
}
#[derive(Clone)]
pub struct RcDynamicDb(Rc<dyn DynamicDb>);
#[async_trait(?Send)]
impl Database for Box<dyn DynamicDb> {
impl Database for RcDynamicDb {
type QMH = Box<dyn QueueMessageHandle>;
async fn snapshot_read(
@ -148,24 +154,28 @@ impl Database for Box<dyn DynamicDb> {
requests: Vec<ReadRange>,
options: SnapshotReadOptions,
) -> Result<Vec<ReadRangeOutput>, AnyError> {
(**self).dyn_snapshot_read(requests, options).await
(*self.0).dyn_snapshot_read(requests, options).await
}
async fn atomic_write(
&self,
write: AtomicWrite,
) -> Result<Option<CommitResult>, AnyError> {
(**self).dyn_atomic_write(write).await
(*self.0).dyn_atomic_write(write).await
}
async fn dequeue_next_message(
&self,
) -> Result<Option<Box<dyn QueueMessageHandle>>, AnyError> {
(**self).dyn_dequeue_next_message().await
(*self.0).dyn_dequeue_next_message().await
}
fn watch(&self, keys: Vec<Vec<u8>>) -> WatchStream {
(*self.0).dyn_watch(keys)
}
fn close(&self) {
(**self).dyn_close()
(*self.0).dyn_close()
}
}
@ -201,6 +211,10 @@ where
)
}
fn dyn_watch(&self, keys: Vec<Vec<u8>>) -> WatchStream {
self.watch(keys)
}
fn dyn_close(&self) {
self.close()
}

View file

@ -20,12 +20,17 @@ use deno_core::anyhow::Context;
use deno_core::error::get_custom_error_class;
use deno_core::error::type_error;
use deno_core::error::AnyError;
use deno_core::futures::StreamExt;
use deno_core::op2;
use deno_core::serde_v8::AnyValue;
use deno_core::serde_v8::BigInt;
use deno_core::AsyncRefCell;
use deno_core::ByteString;
use deno_core::CancelFuture;
use deno_core::CancelHandle;
use deno_core::JsBuffer;
use deno_core::OpState;
use deno_core::RcRef;
use deno_core::Resource;
use deno_core::ResourceId;
use deno_core::ToJsBuffer;
@ -45,6 +50,8 @@ use denokv_proto::MutationKind;
use denokv_proto::QueueMessageHandle;
use denokv_proto::ReadRange;
use denokv_proto::SnapshotReadOptions;
use denokv_proto::WatchKeyOutput;
use denokv_proto::WatchStream;
use log::debug;
use serde::Deserialize;
use serde::Serialize;
@ -62,6 +69,7 @@ const MAX_READ_RANGES: usize = 10;
const MAX_READ_ENTRIES: usize = 1000;
const MAX_CHECKS: usize = 100;
const MAX_MUTATIONS: usize = 1000;
const MAX_WATCHED_KEYS: usize = 10;
const MAX_TOTAL_MUTATION_SIZE_BYTES: usize = 800 * 1024;
const MAX_TOTAL_KEY_SIZE_BYTES: usize = 80 * 1024;
@ -75,6 +83,8 @@ deno_core::extension!(deno_kv,
op_kv_encode_cursor,
op_kv_dequeue_next_message<DBH>,
op_kv_finish_dequeued_message<DBH>,
op_kv_watch<DBH>,
op_kv_watch_next,
],
esm = [ "01_db.ts" ],
options = {
@ -86,7 +96,8 @@ deno_core::extension!(deno_kv,
);
struct DatabaseResource<DB: Database + 'static> {
db: Rc<DB>,
db: DB,
cancel_handle: Rc<CancelHandle>,
}
impl<DB: Database + 'static> Resource for DatabaseResource<DB> {
@ -96,6 +107,23 @@ impl<DB: Database + 'static> Resource for DatabaseResource<DB> {
fn close(self: Rc<Self>) {
self.db.close();
self.cancel_handle.cancel();
}
}
struct DatabaseWatcherResource {
stream: AsyncRefCell<WatchStream>,
db_cancel_handle: Rc<CancelHandle>,
cancel_handle: Rc<CancelHandle>,
}
impl Resource for DatabaseWatcherResource {
fn name(&self) -> Cow<str> {
"databaseWatcher".into()
}
fn close(self: Rc<Self>) {
self.cancel_handle.cancel()
}
}
@ -118,10 +146,10 @@ where
state.borrow::<Rc<DBH>>().clone()
};
let db = handler.open(state.clone(), path).await?;
let rid = state
.borrow_mut()
.resource_table
.add(DatabaseResource { db: Rc::new(db) });
let rid = state.borrow_mut().resource_table.add(DatabaseResource {
db,
cancel_handle: CancelHandle::new_rc(),
});
Ok(rid)
}
@ -354,6 +382,97 @@ where
Ok(Some((payload, handle_rid)))
}
#[op2]
#[smi]
fn op_kv_watch<DBH>(
state: &mut OpState,
#[smi] rid: ResourceId,
#[serde] keys: Vec<KvKey>,
) -> Result<ResourceId, AnyError>
where
DBH: DatabaseHandler + 'static,
{
let resource = state.resource_table.get::<DatabaseResource<DBH::DB>>(rid)?;
if keys.len() > MAX_WATCHED_KEYS {
return Err(type_error(format!(
"too many keys (max {})",
MAX_WATCHED_KEYS
)));
}
let keys: Vec<Vec<u8>> = keys
.into_iter()
.map(encode_v8_key)
.collect::<std::io::Result<_>>()?;
for k in &keys {
check_read_key_size(k)?;
}
let stream = resource.db.watch(keys);
let rid = state.resource_table.add(DatabaseWatcherResource {
stream: AsyncRefCell::new(stream),
db_cancel_handle: resource.cancel_handle.clone(),
cancel_handle: CancelHandle::new_rc(),
});
Ok(rid)
}
#[derive(Serialize)]
#[serde(rename_all = "camelCase", untagged)]
enum WatchEntry {
Changed(Option<ToV8KvEntry>),
Unchanged,
}
#[op2(async)]
#[serde]
async fn op_kv_watch_next(
state: Rc<RefCell<OpState>>,
#[smi] rid: ResourceId,
) -> Result<Option<Vec<WatchEntry>>, AnyError> {
let resource = {
let state = state.borrow();
let resource = state.resource_table.get::<DatabaseWatcherResource>(rid)?;
resource.clone()
};
let db_cancel_handle = resource.db_cancel_handle.clone();
let cancel_handle = resource.cancel_handle.clone();
let stream = RcRef::map(resource, |r| &r.stream)
.borrow_mut()
.or_cancel(db_cancel_handle)
.or_cancel(cancel_handle)
.await;
let Ok(Ok(mut stream)) = stream else {
return Ok(None);
};
// doesn't need a cancel handle because the stream ends when the database
// connection is closed
let Some(res) = stream.next().await else {
return Ok(None);
};
let entries = res?;
let entries = entries
.into_iter()
.map(|entry| {
Ok(match entry {
WatchKeyOutput::Changed { entry } => {
WatchEntry::Changed(entry.map(TryInto::try_into).transpose()?)
}
WatchKeyOutput::Unchanged => WatchEntry::Unchanged,
})
})
.collect::<Result<_, anyhow::Error>>()?;
Ok(Some(entries))
}
#[op2(async)]
async fn op_kv_finish_dequeued_message<DBH>(
state: Rc<RefCell<OpState>>,

View file

@ -66,6 +66,15 @@ pub struct PermissionChecker<P: RemoteDbHandlerPermissions> {
_permissions: PhantomData<P>,
}
impl<P: RemoteDbHandlerPermissions> Clone for PermissionChecker<P> {
fn clone(&self) -> Self {
Self {
state: self.state.clone(),
_permissions: PhantomData,
}
}
}
impl<P: RemoteDbHandlerPermissions + 'static> denokv_remote::RemotePermissions
for PermissionChecker<P>
{

View file

@ -8,7 +8,6 @@ use std::marker::PhantomData;
use std::path::Path;
use std::path::PathBuf;
use std::rc::Rc;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::OnceLock;
@ -18,15 +17,15 @@ use deno_core::error::AnyError;
use deno_core::unsync::spawn_blocking;
use deno_core::OpState;
use deno_node::PathClean;
pub use denokv_sqlite::TypeError;
pub use denokv_sqlite::SqliteBackendError;
use denokv_sqlite::SqliteNotifier;
use rand::RngCore;
use rand::SeedableRng;
use rusqlite::OpenFlags;
use tokio::sync::Notify;
use crate::DatabaseHandler;
static QUEUE_WAKER_MAP: OnceLock<Mutex<HashMap<PathBuf, Arc<Notify>>>> =
static SQLITE_NOTIFIERS_MAP: OnceLock<Mutex<HashMap<PathBuf, SqliteNotifier>>> =
OnceLock::new();
pub struct SqliteDbHandler<P: SqliteDbHandlerPermissions + 'static> {
@ -85,47 +84,48 @@ impl<P: SqliteDbHandlerPermissions> DatabaseHandler for SqliteDbHandler<P> {
let path = path.clone();
let default_storage_dir = self.default_storage_dir.clone();
let (conn, queue_waker_key) = spawn_blocking(move || {
let (conn, notifier_key) = spawn_blocking(move || {
denokv_sqlite::sqlite_retry_loop(|| {
let (conn, queue_waker_key) =
match (path.as_deref(), &default_storage_dir) {
(Some(":memory:"), _) | (None, None) => {
(rusqlite::Connection::open_in_memory()?, None)
}
(Some(path), _) => {
let flags =
OpenFlags::default().difference(OpenFlags::SQLITE_OPEN_URI);
let resolved_path = canonicalize_path(&PathBuf::from(path))?;
(
rusqlite::Connection::open_with_flags(path, flags)?,
Some(resolved_path),
)
}
(None, Some(path)) => {
std::fs::create_dir_all(path)?;
let path = path.join("kv.sqlite3");
(rusqlite::Connection::open(path.clone())?, Some(path))
}
};
let (conn, notifier_key) = match (path.as_deref(), &default_storage_dir)
{
(Some(":memory:"), _) | (None, None) => {
(rusqlite::Connection::open_in_memory()?, None)
}
(Some(path), _) => {
let flags =
OpenFlags::default().difference(OpenFlags::SQLITE_OPEN_URI);
let resolved_path = canonicalize_path(&PathBuf::from(path))
.map_err(anyhow::Error::from)?;
(
rusqlite::Connection::open_with_flags(path, flags)?,
Some(resolved_path),
)
}
(None, Some(path)) => {
std::fs::create_dir_all(path).map_err(anyhow::Error::from)?;
let path = path.join("kv.sqlite3");
(rusqlite::Connection::open(path.clone())?, Some(path))
}
};
conn.pragma_update(None, "journal_mode", "wal")?;
Ok::<_, AnyError>((conn, queue_waker_key))
Ok::<_, SqliteBackendError>((conn, notifier_key))
})
})
.await
.unwrap()?;
let dequeue_notify = if let Some(queue_waker_key) = queue_waker_key {
QUEUE_WAKER_MAP
let notifier = if let Some(notifier_key) = notifier_key {
SQLITE_NOTIFIERS_MAP
.get_or_init(Default::default)
.lock()
.unwrap()
.entry(queue_waker_key)
.entry(notifier_key)
.or_default()
.clone()
} else {
Arc::new(Notify::new())
SqliteNotifier::default()
};
let versionstamp_rng: Box<dyn RngCore + Send> =
@ -134,7 +134,7 @@ impl<P: SqliteDbHandlerPermissions> DatabaseHandler for SqliteDbHandler<P> {
None => Box::new(rand::rngs::StdRng::from_entropy()),
};
denokv_sqlite::Sqlite::new(conn, dequeue_notify, versionstamp_rng)
denokv_sqlite::Sqlite::new(conn, notifier, versionstamp_rng)
}
}

View file

@ -212,7 +212,7 @@ pub fn get_error_class_name(e: &AnyError) -> Option<&'static str> {
.map(get_url_parse_error_class)
})
.or_else(|| {
e.downcast_ref::<deno_kv::sqlite::TypeError>()
e.downcast_ref::<deno_kv::sqlite::SqliteBackendError>()
.map(|_| "TypeError")
})
.or_else(|| {

View file

@ -11,6 +11,7 @@ use denokv_proto::datapath::AtomicWriteStatus;
use denokv_proto::datapath::ReadRangeOutput;
use denokv_proto::datapath::SnapshotRead;
use denokv_proto::datapath::SnapshotReadOutput;
use denokv_proto::datapath::SnapshotReadStatus;
use fastwebsockets::FragmentCollector;
use fastwebsockets::Frame;
use fastwebsockets::OpCode;
@ -1226,6 +1227,7 @@ async fn main_server(
.collect(),
read_disabled: false,
read_is_strongly_consistent: true,
status: SnapshotReadStatus::SrSuccess.into(),
}
.encode_to_vec(),
))