1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2024-11-22 15:06:54 -05:00
denoland-deno/ext/kv/lib.rs
Bartek Iwańczuk dda0f1c343
refactor(serde_v8): split ZeroCopyBuf into JsBuffer and ToJsBuffer (#19566)
`ZeroCopyBuf` was convenient to use, but sometimes it did hide details
that some copies were necessary in certain cases. Also it made it way to easy
for the caller to pass around and convert into different values. This commit
splits `ZeroCopyBuf` into `JsBuffer` (an array buffer coming from V8) and
`ToJsBuffer` (a Rust buffer that will be converted into a V8 array buffer).

As a result some magical conversions were removed (they were never used)
limiting the API surface and preparing for changes in #19534.
2023-06-22 23:37:56 +02:00

727 lines
18 KiB
Rust

// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
pub mod codec;
mod interface;
pub mod sqlite;
use std::borrow::Cow;
use std::cell::RefCell;
use std::num::NonZeroU32;
use std::rc::Rc;
use std::vec;
use codec::decode_key;
use codec::encode_key;
use deno_core::anyhow::Context;
use deno_core::error::type_error;
use deno_core::error::AnyError;
use deno_core::op;
use deno_core::serde_v8::AnyValue;
use deno_core::serde_v8::BigInt;
use deno_core::ByteString;
use deno_core::JsBuffer;
use deno_core::OpState;
use deno_core::Resource;
use deno_core::ResourceId;
use deno_core::ToJsBuffer;
use serde::Deserialize;
use serde::Serialize;
pub use crate::interface::*;
const MAX_WRITE_KEY_SIZE_BYTES: usize = 2048;
// range selectors can contain 0x00 or 0xff suffixes
const MAX_READ_KEY_SIZE_BYTES: usize = MAX_WRITE_KEY_SIZE_BYTES + 1;
const MAX_VALUE_SIZE_BYTES: usize = 65536;
const MAX_READ_RANGES: usize = 10;
const MAX_READ_ENTRIES: usize = 1000;
const MAX_CHECKS: usize = 10;
const MAX_MUTATIONS: usize = 10;
struct UnstableChecker {
pub unstable: bool,
}
impl UnstableChecker {
// NOTE(bartlomieju): keep in sync with `cli/program_state.rs`
pub fn check_unstable(&self, api_name: &str) {
if !self.unstable {
eprintln!(
"Unstable API '{api_name}'. The --unstable flag must be provided."
);
std::process::exit(70);
}
}
}
deno_core::extension!(deno_kv,
deps = [ deno_console ],
parameters = [ DBH: DatabaseHandler ],
ops = [
op_kv_database_open<DBH>,
op_kv_snapshot_read<DBH>,
op_kv_atomic_write<DBH>,
op_kv_encode_cursor,
op_kv_dequeue_next_message<DBH>,
op_kv_finish_dequeued_message<DBH>,
],
esm = [ "01_db.ts" ],
options = {
handler: DBH,
unstable: bool,
},
state = |state, options| {
state.put(Rc::new(options.handler));
state.put(UnstableChecker { unstable: options.unstable })
}
);
struct DatabaseResource<DB: Database + 'static> {
db: Rc<DB>,
}
impl<DB: Database + 'static> Resource for DatabaseResource<DB> {
fn name(&self) -> Cow<str> {
"database".into()
}
fn close(self: Rc<Self>) {
self.db.close();
}
}
#[op]
async fn op_kv_database_open<DBH>(
state: Rc<RefCell<OpState>>,
path: Option<String>,
) -> Result<ResourceId, AnyError>
where
DBH: DatabaseHandler + 'static,
{
let handler = {
let state = state.borrow();
state
.borrow::<UnstableChecker>()
.check_unstable("Deno.openKv");
state.borrow::<Rc<DBH>>().clone()
};
let db = handler.open(state.clone(), path).await?;
let rid = state
.borrow_mut()
.resource_table
.add(DatabaseResource { db: Rc::new(db) });
Ok(rid)
}
type KvKey = Vec<AnyValue>;
impl From<AnyValue> for KeyPart {
fn from(value: AnyValue) -> Self {
match value {
AnyValue::Bool(false) => KeyPart::False,
AnyValue::Bool(true) => KeyPart::True,
AnyValue::Number(n) => KeyPart::Float(n),
AnyValue::BigInt(n) => KeyPart::Int(n),
AnyValue::String(s) => KeyPart::String(s),
AnyValue::V8Buffer(buf) => KeyPart::Bytes(buf.to_vec()),
AnyValue::RustBuffer(_) => unreachable!(),
}
}
}
impl From<KeyPart> for AnyValue {
fn from(value: KeyPart) -> Self {
match value {
KeyPart::False => AnyValue::Bool(false),
KeyPart::True => AnyValue::Bool(true),
KeyPart::Float(n) => AnyValue::Number(n),
KeyPart::Int(n) => AnyValue::BigInt(n),
KeyPart::String(s) => AnyValue::String(s),
KeyPart::Bytes(buf) => AnyValue::RustBuffer(buf.into()),
}
}
}
#[derive(Debug, Deserialize)]
#[serde(tag = "kind", content = "value", rename_all = "snake_case")]
enum FromV8Value {
V8(JsBuffer),
Bytes(JsBuffer),
U64(BigInt),
}
#[derive(Debug, Serialize)]
#[serde(tag = "kind", content = "value", rename_all = "snake_case")]
enum ToV8Value {
V8(ToJsBuffer),
Bytes(ToJsBuffer),
U64(BigInt),
}
impl TryFrom<FromV8Value> for Value {
type Error = AnyError;
fn try_from(value: FromV8Value) -> Result<Self, AnyError> {
Ok(match value {
FromV8Value::V8(buf) => Value::V8(buf.to_vec()),
FromV8Value::Bytes(buf) => Value::Bytes(buf.to_vec()),
FromV8Value::U64(n) => {
Value::U64(num_bigint::BigInt::from(n).try_into()?)
}
})
}
}
impl From<Value> for ToV8Value {
fn from(value: Value) -> Self {
match value {
Value::V8(buf) => ToV8Value::V8(buf.into()),
Value::Bytes(buf) => ToV8Value::Bytes(buf.into()),
Value::U64(n) => ToV8Value::U64(num_bigint::BigInt::from(n).into()),
}
}
}
#[derive(Serialize)]
struct ToV8KvEntry {
key: KvKey,
value: ToV8Value,
versionstamp: ByteString,
}
impl TryFrom<KvEntry> for ToV8KvEntry {
type Error = AnyError;
fn try_from(entry: KvEntry) -> Result<Self, AnyError> {
Ok(ToV8KvEntry {
key: decode_key(&entry.key)?
.0
.into_iter()
.map(Into::into)
.collect(),
value: entry.value.into(),
versionstamp: hex::encode(entry.versionstamp).into(),
})
}
}
#[derive(Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
enum V8Consistency {
Strong,
Eventual,
}
impl From<V8Consistency> for Consistency {
fn from(value: V8Consistency) -> Self {
match value {
V8Consistency::Strong => Consistency::Strong,
V8Consistency::Eventual => Consistency::Eventual,
}
}
}
// (prefix, start, end, limit, reverse, cursor)
type SnapshotReadRange = (
Option<KvKey>,
Option<KvKey>,
Option<KvKey>,
u32,
bool,
Option<ByteString>,
);
#[op]
async fn op_kv_snapshot_read<DBH>(
state: Rc<RefCell<OpState>>,
rid: ResourceId,
ranges: Vec<SnapshotReadRange>,
consistency: V8Consistency,
) -> Result<Vec<Vec<ToV8KvEntry>>, AnyError>
where
DBH: DatabaseHandler + 'static,
{
let db = {
let state = state.borrow();
let resource =
state.resource_table.get::<DatabaseResource<DBH::DB>>(rid)?;
resource.db.clone()
};
if ranges.len() > MAX_READ_RANGES {
return Err(type_error(format!(
"too many ranges (max {})",
MAX_READ_RANGES
)));
}
let mut total_entries = 0usize;
let read_ranges = ranges
.into_iter()
.map(|(prefix, start, end, limit, reverse, cursor)| {
let selector = RawSelector::from_tuple(prefix, start, end)?;
let (start, end) =
decode_selector_and_cursor(&selector, reverse, cursor.as_ref())?;
check_read_key_size(&start)?;
check_read_key_size(&end)?;
total_entries += limit as usize;
Ok(ReadRange {
start,
end,
limit: NonZeroU32::new(limit)
.with_context(|| "limit must be greater than 0")?,
reverse,
})
})
.collect::<Result<Vec<_>, AnyError>>()?;
if total_entries > MAX_READ_ENTRIES {
return Err(type_error(format!(
"too many entries (max {})",
MAX_READ_ENTRIES
)));
}
let opts = SnapshotReadOptions {
consistency: consistency.into(),
};
let output_ranges = db.snapshot_read(read_ranges, opts).await?;
let output_ranges = output_ranges
.into_iter()
.map(|x| {
x.entries
.into_iter()
.map(TryInto::try_into)
.collect::<Result<Vec<_>, AnyError>>()
})
.collect::<Result<Vec<_>, AnyError>>()?;
Ok(output_ranges)
}
struct QueueMessageResource<QPH: QueueMessageHandle + 'static> {
handle: QPH,
}
impl<QMH: QueueMessageHandle + 'static> Resource for QueueMessageResource<QMH> {
fn name(&self) -> Cow<str> {
"queue_message".into()
}
}
#[op]
async fn op_kv_dequeue_next_message<DBH>(
state: Rc<RefCell<OpState>>,
rid: ResourceId,
) -> Result<(ToJsBuffer, ResourceId), AnyError>
where
DBH: DatabaseHandler + 'static,
{
let db = {
let state = state.borrow();
let resource =
state.resource_table.get::<DatabaseResource<DBH::DB>>(rid)?;
resource.db.clone()
};
let mut handle = db.dequeue_next_message().await?;
let payload = handle.take_payload().await?.into();
let handle_rid = {
let mut state = state.borrow_mut();
state.resource_table.add(QueueMessageResource { handle })
};
Ok((payload, handle_rid))
}
#[op]
async fn op_kv_finish_dequeued_message<DBH>(
state: Rc<RefCell<OpState>>,
handle_rid: ResourceId,
success: bool,
) -> Result<(), AnyError>
where
DBH: DatabaseHandler + 'static,
{
let handle = {
let mut state = state.borrow_mut();
let handle = state
.resource_table
.take::<QueueMessageResource<<<DBH>::DB as Database>::QMH>>(handle_rid)
.map_err(|_| type_error("Queue message not found"))?;
Rc::try_unwrap(handle)
.map_err(|_| type_error("Queue message not found"))?
.handle
};
handle.finish(success).await
}
type V8KvCheck = (KvKey, Option<ByteString>);
impl TryFrom<V8KvCheck> for KvCheck {
type Error = AnyError;
fn try_from(value: V8KvCheck) -> Result<Self, AnyError> {
let versionstamp = match value.1 {
Some(data) => {
let mut out = [0u8; 10];
hex::decode_to_slice(data, &mut out)
.map_err(|_| type_error("invalid versionstamp"))?;
Some(out)
}
None => None,
};
Ok(KvCheck {
key: encode_v8_key(value.0)?,
versionstamp,
})
}
}
type V8KvMutation = (KvKey, String, Option<FromV8Value>);
impl TryFrom<V8KvMutation> for KvMutation {
type Error = AnyError;
fn try_from(value: V8KvMutation) -> Result<Self, AnyError> {
let key = encode_v8_key(value.0)?;
let kind = match (value.1.as_str(), value.2) {
("set", Some(value)) => MutationKind::Set(value.try_into()?),
("delete", None) => MutationKind::Delete,
("sum", Some(value)) => MutationKind::Sum(value.try_into()?),
("min", Some(value)) => MutationKind::Min(value.try_into()?),
("max", Some(value)) => MutationKind::Max(value.try_into()?),
(op, Some(_)) => {
return Err(type_error(format!("invalid mutation '{op}' with value")))
}
(op, None) => {
return Err(type_error(format!(
"invalid mutation '{op}' without value"
)))
}
};
Ok(KvMutation { key, kind })
}
}
type V8Enqueue = (JsBuffer, u64, Vec<KvKey>, Option<Vec<u32>>);
impl TryFrom<V8Enqueue> for Enqueue {
type Error = AnyError;
fn try_from(value: V8Enqueue) -> Result<Self, AnyError> {
Ok(Enqueue {
payload: value.0.to_vec(),
delay_ms: value.1,
keys_if_undelivered: value
.2
.into_iter()
.map(encode_v8_key)
.collect::<std::io::Result<_>>()?,
backoff_schedule: value.3,
})
}
}
fn encode_v8_key(key: KvKey) -> Result<Vec<u8>, std::io::Error> {
encode_key(&Key(key.into_iter().map(From::from).collect()))
}
enum RawSelector {
Prefixed {
prefix: Vec<u8>,
start: Option<Vec<u8>>,
end: Option<Vec<u8>>,
},
Range {
start: Vec<u8>,
end: Vec<u8>,
},
}
impl RawSelector {
fn from_tuple(
prefix: Option<KvKey>,
start: Option<KvKey>,
end: Option<KvKey>,
) -> Result<Self, AnyError> {
let prefix = prefix.map(encode_v8_key).transpose()?;
let start = start.map(encode_v8_key).transpose()?;
let end = end.map(encode_v8_key).transpose()?;
match (prefix, start, end) {
(Some(prefix), None, None) => Ok(Self::Prefixed {
prefix,
start: None,
end: None,
}),
(Some(prefix), Some(start), None) => Ok(Self::Prefixed {
prefix,
start: Some(start),
end: None,
}),
(Some(prefix), None, Some(end)) => Ok(Self::Prefixed {
prefix,
start: None,
end: Some(end),
}),
(None, Some(start), Some(end)) => Ok(Self::Range { start, end }),
(None, Some(start), None) => {
let end = start.iter().copied().chain(Some(0)).collect();
Ok(Self::Range { start, end })
}
_ => Err(type_error("invalid range")),
}
}
fn start(&self) -> Option<&[u8]> {
match self {
Self::Prefixed { start, .. } => start.as_deref(),
Self::Range { start, .. } => Some(start),
}
}
fn end(&self) -> Option<&[u8]> {
match self {
Self::Prefixed { end, .. } => end.as_deref(),
Self::Range { end, .. } => Some(end),
}
}
fn common_prefix(&self) -> &[u8] {
match self {
Self::Prefixed { prefix, .. } => prefix,
Self::Range { start, end } => common_prefix_for_bytes(start, end),
}
}
fn range_start_key(&self) -> Vec<u8> {
match self {
Self::Prefixed {
start: Some(start), ..
} => start.clone(),
Self::Range { start, .. } => start.clone(),
Self::Prefixed { prefix, .. } => {
prefix.iter().copied().chain(Some(0)).collect()
}
}
}
fn range_end_key(&self) -> Vec<u8> {
match self {
Self::Prefixed { end: Some(end), .. } => end.clone(),
Self::Range { end, .. } => end.clone(),
Self::Prefixed { prefix, .. } => {
prefix.iter().copied().chain(Some(0xff)).collect()
}
}
}
}
fn common_prefix_for_bytes<'a>(a: &'a [u8], b: &'a [u8]) -> &'a [u8] {
let mut i = 0;
while i < a.len() && i < b.len() && a[i] == b[i] {
i += 1;
}
&a[..i]
}
fn encode_cursor(
selector: &RawSelector,
boundary_key: &[u8],
) -> Result<String, AnyError> {
let common_prefix = selector.common_prefix();
if !boundary_key.starts_with(common_prefix) {
return Err(type_error("invalid boundary key"));
}
Ok(base64::encode_config(
&boundary_key[common_prefix.len()..],
base64::URL_SAFE,
))
}
fn decode_selector_and_cursor(
selector: &RawSelector,
reverse: bool,
cursor: Option<&ByteString>,
) -> Result<(Vec<u8>, Vec<u8>), AnyError> {
let Some(cursor) = cursor else {
return Ok((selector.range_start_key(), selector.range_end_key()));
};
let common_prefix = selector.common_prefix();
let cursor = base64::decode_config(cursor, base64::URL_SAFE)
.map_err(|_| type_error("invalid cursor"))?;
let first_key: Vec<u8>;
let last_key: Vec<u8>;
if reverse {
first_key = selector.range_start_key();
last_key = common_prefix
.iter()
.copied()
.chain(cursor.iter().copied())
.collect();
} else {
first_key = common_prefix
.iter()
.copied()
.chain(cursor.iter().copied())
.chain(Some(0))
.collect();
last_key = selector.range_end_key();
}
// Defend against out-of-bounds reading
if let Some(start) = selector.start() {
if &first_key[..] < start {
return Err(type_error("cursor out of bounds"));
}
}
if let Some(end) = selector.end() {
if &last_key[..] > end {
return Err(type_error("cursor out of bounds"));
}
}
Ok((first_key, last_key))
}
#[op]
async fn op_kv_atomic_write<DBH>(
state: Rc<RefCell<OpState>>,
rid: ResourceId,
checks: Vec<V8KvCheck>,
mutations: Vec<V8KvMutation>,
enqueues: Vec<V8Enqueue>,
) -> Result<Option<String>, AnyError>
where
DBH: DatabaseHandler + 'static,
{
let db = {
let state = state.borrow();
let resource =
state.resource_table.get::<DatabaseResource<DBH::DB>>(rid)?;
resource.db.clone()
};
if checks.len() > MAX_CHECKS {
return Err(type_error(format!("too many checks (max {})", MAX_CHECKS)));
}
if mutations.len() + enqueues.len() > MAX_MUTATIONS {
return Err(type_error(format!(
"too many mutations (max {})",
MAX_MUTATIONS
)));
}
let checks = checks
.into_iter()
.map(TryInto::try_into)
.collect::<Result<Vec<KvCheck>, AnyError>>()
.with_context(|| "invalid check")?;
let mutations = mutations
.into_iter()
.map(TryInto::try_into)
.collect::<Result<Vec<KvMutation>, AnyError>>()
.with_context(|| "invalid mutation")?;
let enqueues = enqueues
.into_iter()
.map(TryInto::try_into)
.collect::<Result<Vec<Enqueue>, AnyError>>()
.with_context(|| "invalid enqueue")?;
for key in checks
.iter()
.map(|c| &c.key)
.chain(mutations.iter().map(|m| &m.key))
{
if key.is_empty() {
return Err(type_error("key cannot be empty"));
}
check_write_key_size(key)?;
}
for value in mutations.iter().flat_map(|m| m.kind.value()) {
check_value_size(value)?;
}
for enqueue in &enqueues {
check_enqueue_payload_size(&enqueue.payload)?;
}
let atomic_write = AtomicWrite {
checks,
mutations,
enqueues,
};
let result = db.atomic_write(atomic_write).await?;
Ok(result.map(|res| hex::encode(res.versionstamp)))
}
// (prefix, start, end)
type EncodeCursorRangeSelector = (Option<KvKey>, Option<KvKey>, Option<KvKey>);
#[op]
fn op_kv_encode_cursor(
(prefix, start, end): EncodeCursorRangeSelector,
boundary_key: KvKey,
) -> Result<String, AnyError> {
let selector = RawSelector::from_tuple(prefix, start, end)?;
let boundary_key = encode_v8_key(boundary_key)?;
let cursor = encode_cursor(&selector, &boundary_key)?;
Ok(cursor)
}
fn check_read_key_size(key: &[u8]) -> Result<(), AnyError> {
if key.len() > MAX_READ_KEY_SIZE_BYTES {
Err(type_error(format!(
"key too large for read (max {} bytes)",
MAX_READ_KEY_SIZE_BYTES
)))
} else {
Ok(())
}
}
fn check_write_key_size(key: &[u8]) -> Result<(), AnyError> {
if key.len() > MAX_WRITE_KEY_SIZE_BYTES {
Err(type_error(format!(
"key too large for write (max {} bytes)",
MAX_WRITE_KEY_SIZE_BYTES
)))
} else {
Ok(())
}
}
fn check_value_size(value: &Value) -> Result<(), AnyError> {
let payload = match value {
Value::Bytes(x) => x,
Value::V8(x) => x,
Value::U64(_) => return Ok(()),
};
if payload.len() > MAX_VALUE_SIZE_BYTES {
Err(type_error(format!(
"value too large (max {} bytes)",
MAX_VALUE_SIZE_BYTES
)))
} else {
Ok(())
}
}
fn check_enqueue_payload_size(payload: &[u8]) -> Result<(), AnyError> {
if payload.len() > MAX_VALUE_SIZE_BYTES {
Err(type_error(format!(
"enqueue payload too large (max {} bytes)",
MAX_VALUE_SIZE_BYTES
)))
} else {
Ok(())
}
}