// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license. pub mod dynamic; mod interface; pub mod remote; pub mod sqlite; use std::borrow::Cow; use std::cell::RefCell; use std::num::NonZeroU32; use std::rc::Rc; use std::time::Duration; use anyhow::bail; use base64::prelude::BASE64_URL_SAFE; use base64::Engine; use chrono::DateTime; use chrono::Utc; use deno_core::anyhow::Context; use deno_core::error::get_custom_error_class; use deno_core::error::type_error; use deno_core::error::AnyError; use deno_core::futures::StreamExt; use deno_core::op2; use deno_core::serde_v8::AnyValue; use deno_core::serde_v8::BigInt; use deno_core::AsyncRefCell; use deno_core::ByteString; use deno_core::CancelFuture; use deno_core::CancelHandle; use deno_core::JsBuffer; use deno_core::OpState; use deno_core::RcRef; use deno_core::Resource; use deno_core::ResourceId; use deno_core::ToJsBuffer; use denokv_proto::decode_key; use denokv_proto::encode_key; use denokv_proto::AtomicWrite; use denokv_proto::Check; use denokv_proto::Consistency; use denokv_proto::Database; use denokv_proto::Enqueue; use denokv_proto::Key; use denokv_proto::KeyPart; use denokv_proto::KvEntry; use denokv_proto::KvValue; use denokv_proto::Mutation; use denokv_proto::MutationKind; use denokv_proto::QueueMessageHandle; use denokv_proto::ReadRange; use denokv_proto::SnapshotReadOptions; use denokv_proto::WatchKeyOutput; use denokv_proto::WatchStream; use log::debug; use serde::Deserialize; use serde::Serialize; pub use crate::interface::*; pub const UNSTABLE_FEATURE_NAME: &str = "kv"; const MAX_WRITE_KEY_SIZE_BYTES: usize = 2048; // range selectors can contain 0x00 or 0xff suffixes const MAX_READ_KEY_SIZE_BYTES: usize = MAX_WRITE_KEY_SIZE_BYTES + 1; const MAX_VALUE_SIZE_BYTES: usize = 65536; const MAX_READ_RANGES: usize = 10; const MAX_READ_ENTRIES: usize = 1000; const MAX_CHECKS: usize = 100; const MAX_MUTATIONS: usize = 1000; const MAX_WATCHED_KEYS: usize = 10; const MAX_TOTAL_MUTATION_SIZE_BYTES: usize = 800 * 1024; const MAX_TOTAL_KEY_SIZE_BYTES: usize = 80 * 1024; deno_core::extension!(deno_kv, deps = [ deno_console, deno_web ], parameters = [ DBH: DatabaseHandler ], ops = [ op_kv_database_open, op_kv_snapshot_read, op_kv_atomic_write, op_kv_encode_cursor, op_kv_dequeue_next_message, op_kv_finish_dequeued_message, op_kv_watch, op_kv_watch_next, ], esm = [ "01_db.ts" ], options = { handler: DBH, }, state = |state, options| { state.put(Rc::new(options.handler)); } ); struct DatabaseResource { db: DB, cancel_handle: Rc, } impl Resource for DatabaseResource { fn name(&self) -> Cow { "database".into() } fn close(self: Rc) { self.db.close(); self.cancel_handle.cancel(); } } struct DatabaseWatcherResource { stream: AsyncRefCell, db_cancel_handle: Rc, cancel_handle: Rc, } impl Resource for DatabaseWatcherResource { fn name(&self) -> Cow { "databaseWatcher".into() } fn close(self: Rc) { self.cancel_handle.cancel() } } #[op2(async)] #[smi] async fn op_kv_database_open( state: Rc>, #[string] path: Option, ) -> Result where DBH: DatabaseHandler + 'static, { let handler = { let state = state.borrow(); // TODO(bartlomieju): replace with `state.feature_checker.check_or_exit` // once we phase out `check_or_exit_with_legacy_fallback` state .feature_checker .check_or_exit_with_legacy_fallback(UNSTABLE_FEATURE_NAME, "Deno.openKv"); state.borrow::>().clone() }; let db = handler.open(state.clone(), path).await?; let rid = state.borrow_mut().resource_table.add(DatabaseResource { db, cancel_handle: CancelHandle::new_rc(), }); Ok(rid) } type KvKey = Vec; fn key_part_from_v8(value: AnyValue) -> KeyPart { match value { AnyValue::Bool(false) => KeyPart::False, AnyValue::Bool(true) => KeyPart::True, AnyValue::Number(n) => KeyPart::Float(n), AnyValue::BigInt(n) => KeyPart::Int(n), AnyValue::String(s) => KeyPart::String(s), AnyValue::V8Buffer(buf) => KeyPart::Bytes(buf.to_vec()), AnyValue::RustBuffer(_) => unreachable!(), } } fn key_part_to_v8(value: KeyPart) -> AnyValue { match value { KeyPart::False => AnyValue::Bool(false), KeyPart::True => AnyValue::Bool(true), KeyPart::Float(n) => AnyValue::Number(n), KeyPart::Int(n) => AnyValue::BigInt(n), KeyPart::String(s) => AnyValue::String(s), KeyPart::Bytes(buf) => AnyValue::RustBuffer(buf.into()), } } #[derive(Debug, Deserialize)] #[serde(tag = "kind", content = "value", rename_all = "snake_case")] enum FromV8Value { V8(JsBuffer), Bytes(JsBuffer), U64(BigInt), } #[derive(Debug, Serialize)] #[serde(tag = "kind", content = "value", rename_all = "snake_case")] enum ToV8Value { V8(ToJsBuffer), Bytes(ToJsBuffer), U64(BigInt), } impl TryFrom for KvValue { type Error = AnyError; fn try_from(value: FromV8Value) -> Result { Ok(match value { FromV8Value::V8(buf) => KvValue::V8(buf.to_vec()), FromV8Value::Bytes(buf) => KvValue::Bytes(buf.to_vec()), FromV8Value::U64(n) => { KvValue::U64(num_bigint::BigInt::from(n).try_into()?) } }) } } impl From for ToV8Value { fn from(value: KvValue) -> Self { match value { KvValue::V8(buf) => ToV8Value::V8(buf.into()), KvValue::Bytes(buf) => ToV8Value::Bytes(buf.into()), KvValue::U64(n) => ToV8Value::U64(num_bigint::BigInt::from(n).into()), } } } #[derive(Serialize)] struct ToV8KvEntry { key: KvKey, value: ToV8Value, versionstamp: ByteString, } impl TryFrom for ToV8KvEntry { type Error = AnyError; fn try_from(entry: KvEntry) -> Result { Ok(ToV8KvEntry { key: decode_key(&entry.key)? .0 .into_iter() .map(key_part_to_v8) .collect(), value: entry.value.into(), versionstamp: faster_hex::hex_string(&entry.versionstamp).into(), }) } } #[derive(Deserialize, Serialize)] #[serde(rename_all = "camelCase")] enum V8Consistency { Strong, Eventual, } impl From for Consistency { fn from(value: V8Consistency) -> Self { match value { V8Consistency::Strong => Consistency::Strong, V8Consistency::Eventual => Consistency::Eventual, } } } // (prefix, start, end, limit, reverse, cursor) type SnapshotReadRange = ( Option, Option, Option, u32, bool, Option, ); #[op2(async)] #[serde] async fn op_kv_snapshot_read( state: Rc>, #[smi] rid: ResourceId, #[serde] ranges: Vec, #[serde] consistency: V8Consistency, ) -> Result>, AnyError> where DBH: DatabaseHandler + 'static, { let db = { let state = state.borrow(); let resource = state.resource_table.get::>(rid)?; resource.db.clone() }; if ranges.len() > MAX_READ_RANGES { return Err(type_error(format!( "too many ranges (max {})", MAX_READ_RANGES ))); } let mut total_entries = 0usize; let read_ranges = ranges .into_iter() .map(|(prefix, start, end, limit, reverse, cursor)| { let selector = RawSelector::from_tuple(prefix, start, end)?; let (start, end) = decode_selector_and_cursor(&selector, reverse, cursor.as_ref())?; check_read_key_size(&start)?; check_read_key_size(&end)?; total_entries += limit as usize; Ok(ReadRange { start, end, limit: NonZeroU32::new(limit) .with_context(|| "limit must be greater than 0")?, reverse, }) }) .collect::, AnyError>>()?; if total_entries > MAX_READ_ENTRIES { return Err(type_error(format!( "too many entries (max {})", MAX_READ_ENTRIES ))); } let opts = SnapshotReadOptions { consistency: consistency.into(), }; let output_ranges = db.snapshot_read(read_ranges, opts).await?; let output_ranges = output_ranges .into_iter() .map(|x| { x.entries .into_iter() .map(TryInto::try_into) .collect::, AnyError>>() }) .collect::, AnyError>>()?; Ok(output_ranges) } struct QueueMessageResource { handle: QPH, } impl Resource for QueueMessageResource { fn name(&self) -> Cow { "queueMessage".into() } } #[op2(async)] #[serde] async fn op_kv_dequeue_next_message( state: Rc>, #[smi] rid: ResourceId, ) -> Result, AnyError> where DBH: DatabaseHandler + 'static, { let db = { let state = state.borrow(); let resource = match state.resource_table.get::>(rid) { Ok(resource) => resource, Err(err) => { if get_custom_error_class(&err) == Some("BadResource") { return Ok(None); } else { return Err(err); } } }; resource.db.clone() }; let Some(mut handle) = db.dequeue_next_message().await? else { return Ok(None); }; let payload = handle.take_payload().await?.into(); let handle_rid = { let mut state = state.borrow_mut(); state.resource_table.add(QueueMessageResource { handle }) }; Ok(Some((payload, handle_rid))) } #[op2] #[smi] fn op_kv_watch( state: &mut OpState, #[smi] rid: ResourceId, #[serde] keys: Vec, ) -> Result where DBH: DatabaseHandler + 'static, { let resource = state.resource_table.get::>(rid)?; if keys.len() > MAX_WATCHED_KEYS { return Err(type_error(format!( "too many keys (max {})", MAX_WATCHED_KEYS ))); } let keys: Vec> = keys .into_iter() .map(encode_v8_key) .collect::>()?; for k in &keys { check_read_key_size(k)?; } let stream = resource.db.watch(keys); let rid = state.resource_table.add(DatabaseWatcherResource { stream: AsyncRefCell::new(stream), db_cancel_handle: resource.cancel_handle.clone(), cancel_handle: CancelHandle::new_rc(), }); Ok(rid) } #[derive(Serialize)] #[serde(rename_all = "camelCase", untagged)] enum WatchEntry { Changed(Option), Unchanged, } #[op2(async)] #[serde] async fn op_kv_watch_next( state: Rc>, #[smi] rid: ResourceId, ) -> Result>, AnyError> { let resource = { let state = state.borrow(); let resource = state.resource_table.get::(rid)?; resource.clone() }; let db_cancel_handle = resource.db_cancel_handle.clone(); let cancel_handle = resource.cancel_handle.clone(); let stream = RcRef::map(resource, |r| &r.stream) .borrow_mut() .or_cancel(db_cancel_handle.clone()) .or_cancel(cancel_handle.clone()) .await; let Ok(Ok(mut stream)) = stream else { return Ok(None); }; // We hold a strong reference to `resource`, so we can't rely on the stream // being dropped when the db connection is closed let Ok(Ok(Some(res))) = stream .next() .or_cancel(db_cancel_handle) .or_cancel(cancel_handle) .await else { return Ok(None); }; let entries = res?; let entries = entries .into_iter() .map(|entry| { Ok(match entry { WatchKeyOutput::Changed { entry } => { WatchEntry::Changed(entry.map(TryInto::try_into).transpose()?) } WatchKeyOutput::Unchanged => WatchEntry::Unchanged, }) }) .collect::>()?; Ok(Some(entries)) } #[op2(async)] async fn op_kv_finish_dequeued_message( state: Rc>, #[smi] handle_rid: ResourceId, success: bool, ) -> Result<(), AnyError> where DBH: DatabaseHandler + 'static, { let handle = { let mut state = state.borrow_mut(); let handle = state .resource_table .take::::DB as Database>::QMH>>(handle_rid) .map_err(|_| type_error("Queue message not found"))?; Rc::try_unwrap(handle) .map_err(|_| type_error("Queue message not found"))? .handle }; // if we fail to finish the message, there is not much we can do and the // message will be retried anyway, so we just ignore the error if let Err(err) = handle.finish(success).await { debug!("Failed to finish dequeued message: {}", err); }; Ok(()) } type V8KvCheck = (KvKey, Option); fn check_from_v8(value: V8KvCheck) -> Result { let versionstamp = match value.1 { Some(data) => { let mut out = [0u8; 10]; if data.len() != out.len() * 2 { bail!(type_error("invalid versionstamp")); } faster_hex::hex_decode(&data, &mut out) .map_err(|_| type_error("invalid versionstamp"))?; Some(out) } None => None, }; Ok(Check { key: encode_v8_key(value.0)?, versionstamp, }) } type V8KvMutation = (KvKey, String, Option, Option); fn mutation_from_v8( (value, current_timstamp): (V8KvMutation, DateTime), ) -> Result { let key = encode_v8_key(value.0)?; let kind = match (value.1.as_str(), value.2) { ("set", Some(value)) => MutationKind::Set(value.try_into()?), ("delete", None) => MutationKind::Delete, ("sum", Some(value)) => MutationKind::Sum { value: value.try_into()?, min_v8: vec![], max_v8: vec![], clamp: false, }, ("min", Some(value)) => MutationKind::Min(value.try_into()?), ("max", Some(value)) => MutationKind::Max(value.try_into()?), ("setSuffixVersionstampedKey", Some(value)) => { MutationKind::SetSuffixVersionstampedKey(value.try_into()?) } (op, Some(_)) => { return Err(type_error(format!("invalid mutation '{op}' with value"))) } (op, None) => { return Err(type_error(format!("invalid mutation '{op}' without value"))) } }; Ok(Mutation { key, kind, expire_at: value .3 .map(|expire_in| current_timstamp + Duration::from_millis(expire_in)), }) } type V8Enqueue = (JsBuffer, u64, Vec, Option>); fn enqueue_from_v8( value: V8Enqueue, current_timestamp: DateTime, ) -> Result { Ok(Enqueue { payload: value.0.to_vec(), deadline: current_timestamp + chrono::Duration::milliseconds(value.1 as i64), keys_if_undelivered: value .2 .into_iter() .map(encode_v8_key) .collect::>()?, backoff_schedule: value.3, }) } fn encode_v8_key(key: KvKey) -> Result, std::io::Error> { encode_key(&Key(key.into_iter().map(key_part_from_v8).collect())) } enum RawSelector { Prefixed { prefix: Vec, start: Option>, end: Option>, }, Range { start: Vec, end: Vec, }, } impl RawSelector { fn from_tuple( prefix: Option, start: Option, end: Option, ) -> Result { let prefix = prefix.map(encode_v8_key).transpose()?; let start = start.map(encode_v8_key).transpose()?; let end = end.map(encode_v8_key).transpose()?; match (prefix, start, end) { (Some(prefix), None, None) => Ok(Self::Prefixed { prefix, start: None, end: None, }), (Some(prefix), Some(start), None) => { if !start.starts_with(&prefix) || start.len() == prefix.len() { return Err(type_error( "start key is not in the keyspace defined by prefix", )); } Ok(Self::Prefixed { prefix, start: Some(start), end: None, }) } (Some(prefix), None, Some(end)) => { if !end.starts_with(&prefix) || end.len() == prefix.len() { return Err(type_error( "end key is not in the keyspace defined by prefix", )); } Ok(Self::Prefixed { prefix, start: None, end: Some(end), }) } (None, Some(start), Some(end)) => { if start > end { return Err(type_error("start key is greater than end key")); } Ok(Self::Range { start, end }) } (None, Some(start), None) => { let end = start.iter().copied().chain(Some(0)).collect(); Ok(Self::Range { start, end }) } _ => Err(type_error("invalid range")), } } fn start(&self) -> Option<&[u8]> { match self { Self::Prefixed { start, .. } => start.as_deref(), Self::Range { start, .. } => Some(start), } } fn end(&self) -> Option<&[u8]> { match self { Self::Prefixed { end, .. } => end.as_deref(), Self::Range { end, .. } => Some(end), } } fn common_prefix(&self) -> &[u8] { match self { Self::Prefixed { prefix, .. } => prefix, Self::Range { start, end } => common_prefix_for_bytes(start, end), } } fn range_start_key(&self) -> Vec { match self { Self::Prefixed { start: Some(start), .. } => start.clone(), Self::Range { start, .. } => start.clone(), Self::Prefixed { prefix, .. } => { prefix.iter().copied().chain(Some(0)).collect() } } } fn range_end_key(&self) -> Vec { match self { Self::Prefixed { end: Some(end), .. } => end.clone(), Self::Range { end, .. } => end.clone(), Self::Prefixed { prefix, .. } => { prefix.iter().copied().chain(Some(0xff)).collect() } } } } fn common_prefix_for_bytes<'a>(a: &'a [u8], b: &'a [u8]) -> &'a [u8] { let mut i = 0; while i < a.len() && i < b.len() && a[i] == b[i] { i += 1; } &a[..i] } fn encode_cursor( selector: &RawSelector, boundary_key: &[u8], ) -> Result { let common_prefix = selector.common_prefix(); if !boundary_key.starts_with(common_prefix) { return Err(type_error("invalid boundary key")); } Ok(BASE64_URL_SAFE.encode(&boundary_key[common_prefix.len()..])) } fn decode_selector_and_cursor( selector: &RawSelector, reverse: bool, cursor: Option<&ByteString>, ) -> Result<(Vec, Vec), AnyError> { let Some(cursor) = cursor else { return Ok((selector.range_start_key(), selector.range_end_key())); }; let common_prefix = selector.common_prefix(); let cursor = BASE64_URL_SAFE .decode(cursor) .map_err(|_| type_error("invalid cursor"))?; let first_key: Vec; let last_key: Vec; if reverse { first_key = selector.range_start_key(); last_key = common_prefix .iter() .copied() .chain(cursor.iter().copied()) .collect(); } else { first_key = common_prefix .iter() .copied() .chain(cursor.iter().copied()) .chain(Some(0)) .collect(); last_key = selector.range_end_key(); } // Defend against out-of-bounds reading if let Some(start) = selector.start() { if &first_key[..] < start { return Err(type_error("cursor out of bounds")); } } if let Some(end) = selector.end() { if &last_key[..] > end { return Err(type_error("cursor out of bounds")); } } Ok((first_key, last_key)) } #[op2(async)] #[string] async fn op_kv_atomic_write( state: Rc>, #[smi] rid: ResourceId, #[serde] checks: Vec, #[serde] mutations: Vec, #[serde] enqueues: Vec, ) -> Result, AnyError> where DBH: DatabaseHandler + 'static, { let current_timestamp = chrono::Utc::now(); let db = { let state = state.borrow(); let resource = state.resource_table.get::>(rid)?; resource.db.clone() }; if checks.len() > MAX_CHECKS { return Err(type_error(format!("too many checks (max {})", MAX_CHECKS))); } if mutations.len() + enqueues.len() > MAX_MUTATIONS { return Err(type_error(format!( "too many mutations (max {})", MAX_MUTATIONS ))); } let checks = checks .into_iter() .map(check_from_v8) .collect::, AnyError>>() .with_context(|| "invalid check")?; let mutations = mutations .into_iter() .map(|mutation| mutation_from_v8((mutation, current_timestamp))) .collect::, AnyError>>() .with_context(|| "invalid mutation")?; let enqueues = enqueues .into_iter() .map(|e| enqueue_from_v8(e, current_timestamp)) .collect::, AnyError>>() .with_context(|| "invalid enqueue")?; let mut total_payload_size = 0usize; let mut total_key_size = 0usize; for key in checks .iter() .map(|c| &c.key) .chain(mutations.iter().map(|m| &m.key)) { if key.is_empty() { return Err(type_error("key cannot be empty")); } total_payload_size += check_write_key_size(key)?; } for (key, value) in mutations .iter() .flat_map(|m| m.kind.value().map(|x| (&m.key, x))) { let key_size = check_write_key_size(key)?; total_payload_size += check_value_size(value)? + key_size; total_key_size += key_size; } for enqueue in &enqueues { total_payload_size += check_enqueue_payload_size(&enqueue.payload)?; if let Some(schedule) = enqueue.backoff_schedule.as_ref() { total_payload_size += 4 * schedule.len(); } } if total_payload_size > MAX_TOTAL_MUTATION_SIZE_BYTES { return Err(type_error(format!( "total mutation size too large (max {} bytes)", MAX_TOTAL_MUTATION_SIZE_BYTES ))); } if total_key_size > MAX_TOTAL_KEY_SIZE_BYTES { return Err(type_error(format!( "total key size too large (max {} bytes)", MAX_TOTAL_KEY_SIZE_BYTES ))); } let atomic_write = AtomicWrite { checks, mutations, enqueues, }; let result = db.atomic_write(atomic_write).await?; Ok(result.map(|res| faster_hex::hex_string(&res.versionstamp))) } // (prefix, start, end) type EncodeCursorRangeSelector = (Option, Option, Option); #[op2] #[string] fn op_kv_encode_cursor( #[serde] (prefix, start, end): EncodeCursorRangeSelector, #[serde] boundary_key: KvKey, ) -> Result { let selector = RawSelector::from_tuple(prefix, start, end)?; let boundary_key = encode_v8_key(boundary_key)?; let cursor = encode_cursor(&selector, &boundary_key)?; Ok(cursor) } fn check_read_key_size(key: &[u8]) -> Result<(), AnyError> { if key.len() > MAX_READ_KEY_SIZE_BYTES { Err(type_error(format!( "key too large for read (max {} bytes)", MAX_READ_KEY_SIZE_BYTES ))) } else { Ok(()) } } fn check_write_key_size(key: &[u8]) -> Result { if key.len() > MAX_WRITE_KEY_SIZE_BYTES { Err(type_error(format!( "key too large for write (max {} bytes)", MAX_WRITE_KEY_SIZE_BYTES ))) } else { Ok(key.len()) } } fn check_value_size(value: &KvValue) -> Result { let payload = match value { KvValue::Bytes(x) => x, KvValue::V8(x) => x, KvValue::U64(_) => return Ok(8), }; if payload.len() > MAX_VALUE_SIZE_BYTES { Err(type_error(format!( "value too large (max {} bytes)", MAX_VALUE_SIZE_BYTES ))) } else { Ok(payload.len()) } } fn check_enqueue_payload_size(payload: &[u8]) -> Result { if payload.len() > MAX_VALUE_SIZE_BYTES { Err(type_error(format!( "enqueue payload too large (max {} bytes)", MAX_VALUE_SIZE_BYTES ))) } else { Ok(payload.len()) } }