mirror of
https://github.com/denoland/deno.git
synced 2024-11-21 15:04:11 -05:00
refactor(ext/kv): use concrete error type (#26239)
This commit is contained in:
parent
ed13efc4ac
commit
7c790da826
4 changed files with 220 additions and 115 deletions
1
Cargo.lock
generated
1
Cargo.lock
generated
|
@ -1720,6 +1720,7 @@ dependencies = [
|
||||||
"rand",
|
"rand",
|
||||||
"rusqlite",
|
"rusqlite",
|
||||||
"serde",
|
"serde",
|
||||||
|
"thiserror",
|
||||||
"url",
|
"url",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
|
@ -36,6 +36,7 @@ prost.workspace = true
|
||||||
rand.workspace = true
|
rand.workspace = true
|
||||||
rusqlite.workspace = true
|
rusqlite.workspace = true
|
||||||
serde.workspace = true
|
serde.workspace = true
|
||||||
|
thiserror.workspace = true
|
||||||
url.workspace = true
|
url.workspace = true
|
||||||
|
|
||||||
[build-dependencies]
|
[build-dependencies]
|
||||||
|
|
288
ext/kv/lib.rs
288
ext/kv/lib.rs
|
@ -12,15 +12,11 @@ use std::num::NonZeroU32;
|
||||||
use std::rc::Rc;
|
use std::rc::Rc;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
use anyhow::bail;
|
|
||||||
use base64::prelude::BASE64_URL_SAFE;
|
use base64::prelude::BASE64_URL_SAFE;
|
||||||
use base64::Engine;
|
use base64::Engine;
|
||||||
use chrono::DateTime;
|
use chrono::DateTime;
|
||||||
use chrono::Utc;
|
use chrono::Utc;
|
||||||
use deno_core::anyhow::Context;
|
|
||||||
use deno_core::error::get_custom_error_class;
|
use deno_core::error::get_custom_error_class;
|
||||||
use deno_core::error::type_error;
|
|
||||||
use deno_core::error::AnyError;
|
|
||||||
use deno_core::futures::StreamExt;
|
use deno_core::futures::StreamExt;
|
||||||
use deno_core::op2;
|
use deno_core::op2;
|
||||||
use deno_core::serde_v8::AnyValue;
|
use deno_core::serde_v8::AnyValue;
|
||||||
|
@ -118,12 +114,72 @@ impl Resource for DatabaseWatcherResource {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, thiserror::Error)]
|
||||||
|
pub enum KvError {
|
||||||
|
#[error(transparent)]
|
||||||
|
DatabaseHandler(deno_core::error::AnyError),
|
||||||
|
#[error(transparent)]
|
||||||
|
Resource(deno_core::error::AnyError),
|
||||||
|
#[error("Too many ranges (max {0})")]
|
||||||
|
TooManyRanges(usize),
|
||||||
|
#[error("Too many entries (max {0})")]
|
||||||
|
TooManyEntries(usize),
|
||||||
|
#[error("Too many checks (max {0})")]
|
||||||
|
TooManyChecks(usize),
|
||||||
|
#[error("Too many mutations (max {0})")]
|
||||||
|
TooManyMutations(usize),
|
||||||
|
#[error("Too many keys (max {0})")]
|
||||||
|
TooManyKeys(usize),
|
||||||
|
#[error("limit must be greater than 0")]
|
||||||
|
InvalidLimit,
|
||||||
|
#[error("Invalid boundary key")]
|
||||||
|
InvalidBoundaryKey,
|
||||||
|
#[error("Key too large for read (max {0} bytes)")]
|
||||||
|
KeyTooLargeToRead(usize),
|
||||||
|
#[error("Key too large for write (max {0} bytes)")]
|
||||||
|
KeyTooLargeToWrite(usize),
|
||||||
|
#[error("Total mutation size too large (max {0} bytes)")]
|
||||||
|
TotalMutationTooLarge(usize),
|
||||||
|
#[error("Total key size too large (max {0} bytes)")]
|
||||||
|
TotalKeyTooLarge(usize),
|
||||||
|
#[error(transparent)]
|
||||||
|
Kv(deno_core::error::AnyError),
|
||||||
|
#[error(transparent)]
|
||||||
|
Io(#[from] std::io::Error),
|
||||||
|
#[error("Queue message not found")]
|
||||||
|
QueueMessageNotFound,
|
||||||
|
#[error("Start key is not in the keyspace defined by prefix")]
|
||||||
|
StartKeyNotInKeyspace,
|
||||||
|
#[error("End key is not in the keyspace defined by prefix")]
|
||||||
|
EndKeyNotInKeyspace,
|
||||||
|
#[error("Start key is greater than end key")]
|
||||||
|
StartKeyGreaterThanEndKey,
|
||||||
|
#[error("Invalid check")]
|
||||||
|
InvalidCheck(#[source] KvCheckError),
|
||||||
|
#[error("Invalid mutation")]
|
||||||
|
InvalidMutation(#[source] KvMutationError),
|
||||||
|
#[error("Invalid enqueue")]
|
||||||
|
InvalidEnqueue(#[source] std::io::Error),
|
||||||
|
#[error("key cannot be empty")]
|
||||||
|
EmptyKey, // TypeError
|
||||||
|
#[error("Value too large (max {0} bytes)")]
|
||||||
|
ValueTooLarge(usize), // TypeError
|
||||||
|
#[error("enqueue payload too large (max {0} bytes)")]
|
||||||
|
EnqueuePayloadTooLarge(usize), // TypeError
|
||||||
|
#[error("invalid cursor")]
|
||||||
|
InvalidCursor,
|
||||||
|
#[error("cursor out of bounds")]
|
||||||
|
CursorOutOfBounds,
|
||||||
|
#[error("Invalid range")]
|
||||||
|
InvalidRange,
|
||||||
|
}
|
||||||
|
|
||||||
#[op2(async)]
|
#[op2(async)]
|
||||||
#[smi]
|
#[smi]
|
||||||
async fn op_kv_database_open<DBH>(
|
async fn op_kv_database_open<DBH>(
|
||||||
state: Rc<RefCell<OpState>>,
|
state: Rc<RefCell<OpState>>,
|
||||||
#[string] path: Option<String>,
|
#[string] path: Option<String>,
|
||||||
) -> Result<ResourceId, AnyError>
|
) -> Result<ResourceId, KvError>
|
||||||
where
|
where
|
||||||
DBH: DatabaseHandler + 'static,
|
DBH: DatabaseHandler + 'static,
|
||||||
{
|
{
|
||||||
|
@ -134,7 +190,10 @@ where
|
||||||
.check_or_exit(UNSTABLE_FEATURE_NAME, "Deno.openKv");
|
.check_or_exit(UNSTABLE_FEATURE_NAME, "Deno.openKv");
|
||||||
state.borrow::<Rc<DBH>>().clone()
|
state.borrow::<Rc<DBH>>().clone()
|
||||||
};
|
};
|
||||||
let db = handler.open(state.clone(), path).await?;
|
let db = handler
|
||||||
|
.open(state.clone(), path)
|
||||||
|
.await
|
||||||
|
.map_err(KvError::DatabaseHandler)?;
|
||||||
let rid = state.borrow_mut().resource_table.add(DatabaseResource {
|
let rid = state.borrow_mut().resource_table.add(DatabaseResource {
|
||||||
db,
|
db,
|
||||||
cancel_handle: CancelHandle::new_rc(),
|
cancel_handle: CancelHandle::new_rc(),
|
||||||
|
@ -184,8 +243,8 @@ enum ToV8Value {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl TryFrom<FromV8Value> for KvValue {
|
impl TryFrom<FromV8Value> for KvValue {
|
||||||
type Error = AnyError;
|
type Error = num_bigint::TryFromBigIntError<num_bigint::BigInt>;
|
||||||
fn try_from(value: FromV8Value) -> Result<Self, AnyError> {
|
fn try_from(value: FromV8Value) -> Result<Self, Self::Error> {
|
||||||
Ok(match value {
|
Ok(match value {
|
||||||
FromV8Value::V8(buf) => KvValue::V8(buf.to_vec()),
|
FromV8Value::V8(buf) => KvValue::V8(buf.to_vec()),
|
||||||
FromV8Value::Bytes(buf) => KvValue::Bytes(buf.to_vec()),
|
FromV8Value::Bytes(buf) => KvValue::Bytes(buf.to_vec()),
|
||||||
|
@ -214,8 +273,8 @@ struct ToV8KvEntry {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl TryFrom<KvEntry> for ToV8KvEntry {
|
impl TryFrom<KvEntry> for ToV8KvEntry {
|
||||||
type Error = AnyError;
|
type Error = std::io::Error;
|
||||||
fn try_from(entry: KvEntry) -> Result<Self, AnyError> {
|
fn try_from(entry: KvEntry) -> Result<Self, Self::Error> {
|
||||||
Ok(ToV8KvEntry {
|
Ok(ToV8KvEntry {
|
||||||
key: decode_key(&entry.key)?
|
key: decode_key(&entry.key)?
|
||||||
.0
|
.0
|
||||||
|
@ -261,14 +320,16 @@ async fn op_kv_snapshot_read<DBH>(
|
||||||
#[smi] rid: ResourceId,
|
#[smi] rid: ResourceId,
|
||||||
#[serde] ranges: Vec<SnapshotReadRange>,
|
#[serde] ranges: Vec<SnapshotReadRange>,
|
||||||
#[serde] consistency: V8Consistency,
|
#[serde] consistency: V8Consistency,
|
||||||
) -> Result<Vec<Vec<ToV8KvEntry>>, AnyError>
|
) -> Result<Vec<Vec<ToV8KvEntry>>, KvError>
|
||||||
where
|
where
|
||||||
DBH: DatabaseHandler + 'static,
|
DBH: DatabaseHandler + 'static,
|
||||||
{
|
{
|
||||||
let db = {
|
let db = {
|
||||||
let state = state.borrow();
|
let state = state.borrow();
|
||||||
let resource =
|
let resource = state
|
||||||
state.resource_table.get::<DatabaseResource<DBH::DB>>(rid)?;
|
.resource_table
|
||||||
|
.get::<DatabaseResource<DBH::DB>>(rid)
|
||||||
|
.map_err(KvError::Resource)?;
|
||||||
resource.db.clone()
|
resource.db.clone()
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -278,10 +339,7 @@ where
|
||||||
};
|
};
|
||||||
|
|
||||||
if ranges.len() > config.max_read_ranges {
|
if ranges.len() > config.max_read_ranges {
|
||||||
return Err(type_error(format!(
|
return Err(KvError::TooManyRanges(config.max_read_ranges));
|
||||||
"Too many ranges (max {})",
|
|
||||||
config.max_read_ranges
|
|
||||||
)));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut total_entries = 0usize;
|
let mut total_entries = 0usize;
|
||||||
|
@ -300,33 +358,32 @@ where
|
||||||
Ok(ReadRange {
|
Ok(ReadRange {
|
||||||
start,
|
start,
|
||||||
end,
|
end,
|
||||||
limit: NonZeroU32::new(limit)
|
limit: NonZeroU32::new(limit).ok_or(KvError::InvalidLimit)?,
|
||||||
.with_context(|| "limit must be greater than 0")?,
|
|
||||||
reverse,
|
reverse,
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
.collect::<Result<Vec<_>, AnyError>>()?;
|
.collect::<Result<Vec<_>, KvError>>()?;
|
||||||
|
|
||||||
if total_entries > config.max_read_entries {
|
if total_entries > config.max_read_entries {
|
||||||
return Err(type_error(format!(
|
return Err(KvError::TooManyEntries(config.max_read_entries));
|
||||||
"Too many entries (max {})",
|
|
||||||
config.max_read_entries
|
|
||||||
)));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
let opts = SnapshotReadOptions {
|
let opts = SnapshotReadOptions {
|
||||||
consistency: consistency.into(),
|
consistency: consistency.into(),
|
||||||
};
|
};
|
||||||
let output_ranges = db.snapshot_read(read_ranges, opts).await?;
|
let output_ranges = db
|
||||||
|
.snapshot_read(read_ranges, opts)
|
||||||
|
.await
|
||||||
|
.map_err(KvError::Kv)?;
|
||||||
let output_ranges = output_ranges
|
let output_ranges = output_ranges
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|x| {
|
.map(|x| {
|
||||||
x.entries
|
x.entries
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(TryInto::try_into)
|
.map(TryInto::try_into)
|
||||||
.collect::<Result<Vec<_>, AnyError>>()
|
.collect::<Result<Vec<_>, std::io::Error>>()
|
||||||
})
|
})
|
||||||
.collect::<Result<Vec<_>, AnyError>>()?;
|
.collect::<Result<Vec<_>, std::io::Error>>()?;
|
||||||
Ok(output_ranges)
|
Ok(output_ranges)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -345,7 +402,7 @@ impl<QMH: QueueMessageHandle + 'static> Resource for QueueMessageResource<QMH> {
|
||||||
async fn op_kv_dequeue_next_message<DBH>(
|
async fn op_kv_dequeue_next_message<DBH>(
|
||||||
state: Rc<RefCell<OpState>>,
|
state: Rc<RefCell<OpState>>,
|
||||||
#[smi] rid: ResourceId,
|
#[smi] rid: ResourceId,
|
||||||
) -> Result<Option<(ToJsBuffer, ResourceId)>, AnyError>
|
) -> Result<Option<(ToJsBuffer, ResourceId)>, KvError>
|
||||||
where
|
where
|
||||||
DBH: DatabaseHandler + 'static,
|
DBH: DatabaseHandler + 'static,
|
||||||
{
|
{
|
||||||
|
@ -358,17 +415,19 @@ where
|
||||||
if get_custom_error_class(&err) == Some("BadResource") {
|
if get_custom_error_class(&err) == Some("BadResource") {
|
||||||
return Ok(None);
|
return Ok(None);
|
||||||
} else {
|
} else {
|
||||||
return Err(err);
|
return Err(KvError::Resource(err));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
resource.db.clone()
|
resource.db.clone()
|
||||||
};
|
};
|
||||||
|
|
||||||
let Some(mut handle) = db.dequeue_next_message().await? else {
|
let Some(mut handle) =
|
||||||
|
db.dequeue_next_message().await.map_err(KvError::Kv)?
|
||||||
|
else {
|
||||||
return Ok(None);
|
return Ok(None);
|
||||||
};
|
};
|
||||||
let payload = handle.take_payload().await?.into();
|
let payload = handle.take_payload().await.map_err(KvError::Kv)?.into();
|
||||||
let handle_rid = {
|
let handle_rid = {
|
||||||
let mut state = state.borrow_mut();
|
let mut state = state.borrow_mut();
|
||||||
state.resource_table.add(QueueMessageResource { handle })
|
state.resource_table.add(QueueMessageResource { handle })
|
||||||
|
@ -382,18 +441,18 @@ fn op_kv_watch<DBH>(
|
||||||
state: &mut OpState,
|
state: &mut OpState,
|
||||||
#[smi] rid: ResourceId,
|
#[smi] rid: ResourceId,
|
||||||
#[serde] keys: Vec<KvKey>,
|
#[serde] keys: Vec<KvKey>,
|
||||||
) -> Result<ResourceId, AnyError>
|
) -> Result<ResourceId, KvError>
|
||||||
where
|
where
|
||||||
DBH: DatabaseHandler + 'static,
|
DBH: DatabaseHandler + 'static,
|
||||||
{
|
{
|
||||||
let resource = state.resource_table.get::<DatabaseResource<DBH::DB>>(rid)?;
|
let resource = state
|
||||||
|
.resource_table
|
||||||
|
.get::<DatabaseResource<DBH::DB>>(rid)
|
||||||
|
.map_err(KvError::Resource)?;
|
||||||
let config = state.borrow::<Rc<KvConfig>>().clone();
|
let config = state.borrow::<Rc<KvConfig>>().clone();
|
||||||
|
|
||||||
if keys.len() > config.max_watched_keys {
|
if keys.len() > config.max_watched_keys {
|
||||||
return Err(type_error(format!(
|
return Err(KvError::TooManyKeys(config.max_watched_keys));
|
||||||
"Too many keys (max {})",
|
|
||||||
config.max_watched_keys
|
|
||||||
)));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
let keys: Vec<Vec<u8>> = keys
|
let keys: Vec<Vec<u8>> = keys
|
||||||
|
@ -428,10 +487,13 @@ enum WatchEntry {
|
||||||
async fn op_kv_watch_next(
|
async fn op_kv_watch_next(
|
||||||
state: Rc<RefCell<OpState>>,
|
state: Rc<RefCell<OpState>>,
|
||||||
#[smi] rid: ResourceId,
|
#[smi] rid: ResourceId,
|
||||||
) -> Result<Option<Vec<WatchEntry>>, AnyError> {
|
) -> Result<Option<Vec<WatchEntry>>, KvError> {
|
||||||
let resource = {
|
let resource = {
|
||||||
let state = state.borrow();
|
let state = state.borrow();
|
||||||
let resource = state.resource_table.get::<DatabaseWatcherResource>(rid)?;
|
let resource = state
|
||||||
|
.resource_table
|
||||||
|
.get::<DatabaseWatcherResource>(rid)
|
||||||
|
.map_err(KvError::Resource)?;
|
||||||
resource.clone()
|
resource.clone()
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -457,7 +519,7 @@ async fn op_kv_watch_next(
|
||||||
return Ok(None);
|
return Ok(None);
|
||||||
};
|
};
|
||||||
|
|
||||||
let entries = res?;
|
let entries = res.map_err(KvError::Kv)?;
|
||||||
let entries = entries
|
let entries = entries
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|entry| {
|
.map(|entry| {
|
||||||
|
@ -468,7 +530,7 @@ async fn op_kv_watch_next(
|
||||||
WatchKeyOutput::Unchanged => WatchEntry::Unchanged,
|
WatchKeyOutput::Unchanged => WatchEntry::Unchanged,
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
.collect::<Result<_, anyhow::Error>>()?;
|
.collect::<Result<_, KvError>>()?;
|
||||||
|
|
||||||
Ok(Some(entries))
|
Ok(Some(entries))
|
||||||
}
|
}
|
||||||
|
@ -478,7 +540,7 @@ async fn op_kv_finish_dequeued_message<DBH>(
|
||||||
state: Rc<RefCell<OpState>>,
|
state: Rc<RefCell<OpState>>,
|
||||||
#[smi] handle_rid: ResourceId,
|
#[smi] handle_rid: ResourceId,
|
||||||
success: bool,
|
success: bool,
|
||||||
) -> Result<(), AnyError>
|
) -> Result<(), KvError>
|
||||||
where
|
where
|
||||||
DBH: DatabaseHandler + 'static,
|
DBH: DatabaseHandler + 'static,
|
||||||
{
|
{
|
||||||
|
@ -487,9 +549,9 @@ where
|
||||||
let handle = state
|
let handle = state
|
||||||
.resource_table
|
.resource_table
|
||||||
.take::<QueueMessageResource<<<DBH>::DB as Database>::QMH>>(handle_rid)
|
.take::<QueueMessageResource<<<DBH>::DB as Database>::QMH>>(handle_rid)
|
||||||
.map_err(|_| type_error("Queue message not found"))?;
|
.map_err(|_| KvError::QueueMessageNotFound)?;
|
||||||
Rc::try_unwrap(handle)
|
Rc::try_unwrap(handle)
|
||||||
.map_err(|_| type_error("Queue message not found"))?
|
.map_err(|_| KvError::QueueMessageNotFound)?
|
||||||
.handle
|
.handle
|
||||||
};
|
};
|
||||||
// if we fail to finish the message, there is not much we can do and the
|
// if we fail to finish the message, there is not much we can do and the
|
||||||
|
@ -500,32 +562,52 @@ where
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, thiserror::Error)]
|
||||||
|
pub enum KvCheckError {
|
||||||
|
#[error("invalid versionstamp")]
|
||||||
|
InvalidVersionstamp,
|
||||||
|
#[error(transparent)]
|
||||||
|
Io(std::io::Error),
|
||||||
|
}
|
||||||
|
|
||||||
type V8KvCheck = (KvKey, Option<ByteString>);
|
type V8KvCheck = (KvKey, Option<ByteString>);
|
||||||
|
|
||||||
fn check_from_v8(value: V8KvCheck) -> Result<Check, AnyError> {
|
fn check_from_v8(value: V8KvCheck) -> Result<Check, KvCheckError> {
|
||||||
let versionstamp = match value.1 {
|
let versionstamp = match value.1 {
|
||||||
Some(data) => {
|
Some(data) => {
|
||||||
let mut out = [0u8; 10];
|
let mut out = [0u8; 10];
|
||||||
if data.len() != out.len() * 2 {
|
if data.len() != out.len() * 2 {
|
||||||
bail!(type_error("invalid versionstamp"));
|
return Err(KvCheckError::InvalidVersionstamp);
|
||||||
}
|
}
|
||||||
faster_hex::hex_decode(&data, &mut out)
|
faster_hex::hex_decode(&data, &mut out)
|
||||||
.map_err(|_| type_error("invalid versionstamp"))?;
|
.map_err(|_| KvCheckError::InvalidVersionstamp)?;
|
||||||
Some(out)
|
Some(out)
|
||||||
}
|
}
|
||||||
None => None,
|
None => None,
|
||||||
};
|
};
|
||||||
Ok(Check {
|
Ok(Check {
|
||||||
key: encode_v8_key(value.0)?,
|
key: encode_v8_key(value.0).map_err(KvCheckError::Io)?,
|
||||||
versionstamp,
|
versionstamp,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, thiserror::Error)]
|
||||||
|
pub enum KvMutationError {
|
||||||
|
#[error(transparent)]
|
||||||
|
BigInt(#[from] num_bigint::TryFromBigIntError<num_bigint::BigInt>),
|
||||||
|
#[error(transparent)]
|
||||||
|
Io(#[from] std::io::Error),
|
||||||
|
#[error("Invalid mutation '{0}' with value")]
|
||||||
|
InvalidMutationWithValue(String),
|
||||||
|
#[error("Invalid mutation '{0}' without value")]
|
||||||
|
InvalidMutationWithoutValue(String),
|
||||||
|
}
|
||||||
|
|
||||||
type V8KvMutation = (KvKey, String, Option<FromV8Value>, Option<u64>);
|
type V8KvMutation = (KvKey, String, Option<FromV8Value>, Option<u64>);
|
||||||
|
|
||||||
fn mutation_from_v8(
|
fn mutation_from_v8(
|
||||||
(value, current_timstamp): (V8KvMutation, DateTime<Utc>),
|
(value, current_timstamp): (V8KvMutation, DateTime<Utc>),
|
||||||
) -> Result<Mutation, AnyError> {
|
) -> Result<Mutation, KvMutationError> {
|
||||||
let key = encode_v8_key(value.0)?;
|
let key = encode_v8_key(value.0)?;
|
||||||
let kind = match (value.1.as_str(), value.2) {
|
let kind = match (value.1.as_str(), value.2) {
|
||||||
("set", Some(value)) => MutationKind::Set(value.try_into()?),
|
("set", Some(value)) => MutationKind::Set(value.try_into()?),
|
||||||
|
@ -542,10 +624,10 @@ fn mutation_from_v8(
|
||||||
MutationKind::SetSuffixVersionstampedKey(value.try_into()?)
|
MutationKind::SetSuffixVersionstampedKey(value.try_into()?)
|
||||||
}
|
}
|
||||||
(op, Some(_)) => {
|
(op, Some(_)) => {
|
||||||
return Err(type_error(format!("Invalid mutation '{op}' with value")))
|
return Err(KvMutationError::InvalidMutationWithValue(op.to_string()))
|
||||||
}
|
}
|
||||||
(op, None) => {
|
(op, None) => {
|
||||||
return Err(type_error(format!("Invalid mutation '{op}' without value")))
|
return Err(KvMutationError::InvalidMutationWithoutValue(op.to_string()))
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
Ok(Mutation {
|
Ok(Mutation {
|
||||||
|
@ -562,7 +644,7 @@ type V8Enqueue = (JsBuffer, u64, Vec<KvKey>, Option<Vec<u32>>);
|
||||||
fn enqueue_from_v8(
|
fn enqueue_from_v8(
|
||||||
value: V8Enqueue,
|
value: V8Enqueue,
|
||||||
current_timestamp: DateTime<Utc>,
|
current_timestamp: DateTime<Utc>,
|
||||||
) -> Result<Enqueue, AnyError> {
|
) -> Result<Enqueue, std::io::Error> {
|
||||||
Ok(Enqueue {
|
Ok(Enqueue {
|
||||||
payload: value.0.to_vec(),
|
payload: value.0.to_vec(),
|
||||||
deadline: current_timestamp
|
deadline: current_timestamp
|
||||||
|
@ -597,7 +679,7 @@ impl RawSelector {
|
||||||
prefix: Option<KvKey>,
|
prefix: Option<KvKey>,
|
||||||
start: Option<KvKey>,
|
start: Option<KvKey>,
|
||||||
end: Option<KvKey>,
|
end: Option<KvKey>,
|
||||||
) -> Result<Self, AnyError> {
|
) -> Result<Self, KvError> {
|
||||||
let prefix = prefix.map(encode_v8_key).transpose()?;
|
let prefix = prefix.map(encode_v8_key).transpose()?;
|
||||||
let start = start.map(encode_v8_key).transpose()?;
|
let start = start.map(encode_v8_key).transpose()?;
|
||||||
let end = end.map(encode_v8_key).transpose()?;
|
let end = end.map(encode_v8_key).transpose()?;
|
||||||
|
@ -610,9 +692,7 @@ impl RawSelector {
|
||||||
}),
|
}),
|
||||||
(Some(prefix), Some(start), None) => {
|
(Some(prefix), Some(start), None) => {
|
||||||
if !start.starts_with(&prefix) || start.len() == prefix.len() {
|
if !start.starts_with(&prefix) || start.len() == prefix.len() {
|
||||||
return Err(type_error(
|
return Err(KvError::StartKeyNotInKeyspace);
|
||||||
"Start key is not in the keyspace defined by prefix",
|
|
||||||
));
|
|
||||||
}
|
}
|
||||||
Ok(Self::Prefixed {
|
Ok(Self::Prefixed {
|
||||||
prefix,
|
prefix,
|
||||||
|
@ -622,9 +702,7 @@ impl RawSelector {
|
||||||
}
|
}
|
||||||
(Some(prefix), None, Some(end)) => {
|
(Some(prefix), None, Some(end)) => {
|
||||||
if !end.starts_with(&prefix) || end.len() == prefix.len() {
|
if !end.starts_with(&prefix) || end.len() == prefix.len() {
|
||||||
return Err(type_error(
|
return Err(KvError::EndKeyNotInKeyspace);
|
||||||
"End key is not in the keyspace defined by prefix",
|
|
||||||
));
|
|
||||||
}
|
}
|
||||||
Ok(Self::Prefixed {
|
Ok(Self::Prefixed {
|
||||||
prefix,
|
prefix,
|
||||||
|
@ -634,7 +712,7 @@ impl RawSelector {
|
||||||
}
|
}
|
||||||
(None, Some(start), Some(end)) => {
|
(None, Some(start), Some(end)) => {
|
||||||
if start > end {
|
if start > end {
|
||||||
return Err(type_error("Start key is greater than end key"));
|
return Err(KvError::StartKeyGreaterThanEndKey);
|
||||||
}
|
}
|
||||||
Ok(Self::Range { start, end })
|
Ok(Self::Range { start, end })
|
||||||
}
|
}
|
||||||
|
@ -642,7 +720,7 @@ impl RawSelector {
|
||||||
let end = start.iter().copied().chain(Some(0)).collect();
|
let end = start.iter().copied().chain(Some(0)).collect();
|
||||||
Ok(Self::Range { start, end })
|
Ok(Self::Range { start, end })
|
||||||
}
|
}
|
||||||
_ => Err(type_error("Invalid range")),
|
_ => Err(KvError::InvalidRange),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -701,10 +779,10 @@ fn common_prefix_for_bytes<'a>(a: &'a [u8], b: &'a [u8]) -> &'a [u8] {
|
||||||
fn encode_cursor(
|
fn encode_cursor(
|
||||||
selector: &RawSelector,
|
selector: &RawSelector,
|
||||||
boundary_key: &[u8],
|
boundary_key: &[u8],
|
||||||
) -> Result<String, AnyError> {
|
) -> Result<String, KvError> {
|
||||||
let common_prefix = selector.common_prefix();
|
let common_prefix = selector.common_prefix();
|
||||||
if !boundary_key.starts_with(common_prefix) {
|
if !boundary_key.starts_with(common_prefix) {
|
||||||
return Err(type_error("Invalid boundary key"));
|
return Err(KvError::InvalidBoundaryKey);
|
||||||
}
|
}
|
||||||
Ok(BASE64_URL_SAFE.encode(&boundary_key[common_prefix.len()..]))
|
Ok(BASE64_URL_SAFE.encode(&boundary_key[common_prefix.len()..]))
|
||||||
}
|
}
|
||||||
|
@ -713,7 +791,7 @@ fn decode_selector_and_cursor(
|
||||||
selector: &RawSelector,
|
selector: &RawSelector,
|
||||||
reverse: bool,
|
reverse: bool,
|
||||||
cursor: Option<&ByteString>,
|
cursor: Option<&ByteString>,
|
||||||
) -> Result<(Vec<u8>, Vec<u8>), AnyError> {
|
) -> Result<(Vec<u8>, Vec<u8>), KvError> {
|
||||||
let Some(cursor) = cursor else {
|
let Some(cursor) = cursor else {
|
||||||
return Ok((selector.range_start_key(), selector.range_end_key()));
|
return Ok((selector.range_start_key(), selector.range_end_key()));
|
||||||
};
|
};
|
||||||
|
@ -721,7 +799,7 @@ fn decode_selector_and_cursor(
|
||||||
let common_prefix = selector.common_prefix();
|
let common_prefix = selector.common_prefix();
|
||||||
let cursor = BASE64_URL_SAFE
|
let cursor = BASE64_URL_SAFE
|
||||||
.decode(cursor)
|
.decode(cursor)
|
||||||
.map_err(|_| type_error("invalid cursor"))?;
|
.map_err(|_| KvError::InvalidCursor)?;
|
||||||
|
|
||||||
let first_key: Vec<u8>;
|
let first_key: Vec<u8>;
|
||||||
let last_key: Vec<u8>;
|
let last_key: Vec<u8>;
|
||||||
|
@ -746,13 +824,13 @@ fn decode_selector_and_cursor(
|
||||||
// Defend against out-of-bounds reading
|
// Defend against out-of-bounds reading
|
||||||
if let Some(start) = selector.start() {
|
if let Some(start) = selector.start() {
|
||||||
if &first_key[..] < start {
|
if &first_key[..] < start {
|
||||||
return Err(type_error("cursor out of bounds"));
|
return Err(KvError::CursorOutOfBounds);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Some(end) = selector.end() {
|
if let Some(end) = selector.end() {
|
||||||
if &last_key[..] > end {
|
if &last_key[..] > end {
|
||||||
return Err(type_error("cursor out of bounds"));
|
return Err(KvError::CursorOutOfBounds);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -767,15 +845,17 @@ async fn op_kv_atomic_write<DBH>(
|
||||||
#[serde] checks: Vec<V8KvCheck>,
|
#[serde] checks: Vec<V8KvCheck>,
|
||||||
#[serde] mutations: Vec<V8KvMutation>,
|
#[serde] mutations: Vec<V8KvMutation>,
|
||||||
#[serde] enqueues: Vec<V8Enqueue>,
|
#[serde] enqueues: Vec<V8Enqueue>,
|
||||||
) -> Result<Option<String>, AnyError>
|
) -> Result<Option<String>, KvError>
|
||||||
where
|
where
|
||||||
DBH: DatabaseHandler + 'static,
|
DBH: DatabaseHandler + 'static,
|
||||||
{
|
{
|
||||||
let current_timestamp = chrono::Utc::now();
|
let current_timestamp = chrono::Utc::now();
|
||||||
let db = {
|
let db = {
|
||||||
let state = state.borrow();
|
let state = state.borrow();
|
||||||
let resource =
|
let resource = state
|
||||||
state.resource_table.get::<DatabaseResource<DBH::DB>>(rid)?;
|
.resource_table
|
||||||
|
.get::<DatabaseResource<DBH::DB>>(rid)
|
||||||
|
.map_err(KvError::Resource)?;
|
||||||
resource.db.clone()
|
resource.db.clone()
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -785,34 +865,28 @@ where
|
||||||
};
|
};
|
||||||
|
|
||||||
if checks.len() > config.max_checks {
|
if checks.len() > config.max_checks {
|
||||||
return Err(type_error(format!(
|
return Err(KvError::TooManyChecks(config.max_checks));
|
||||||
"Too many checks (max {})",
|
|
||||||
config.max_checks
|
|
||||||
)));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if mutations.len() + enqueues.len() > config.max_mutations {
|
if mutations.len() + enqueues.len() > config.max_mutations {
|
||||||
return Err(type_error(format!(
|
return Err(KvError::TooManyMutations(config.max_mutations));
|
||||||
"Too many mutations (max {})",
|
|
||||||
config.max_mutations
|
|
||||||
)));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
let checks = checks
|
let checks = checks
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(check_from_v8)
|
.map(check_from_v8)
|
||||||
.collect::<Result<Vec<Check>, AnyError>>()
|
.collect::<Result<Vec<Check>, KvCheckError>>()
|
||||||
.with_context(|| "invalid check")?;
|
.map_err(KvError::InvalidCheck)?;
|
||||||
let mutations = mutations
|
let mutations = mutations
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|mutation| mutation_from_v8((mutation, current_timestamp)))
|
.map(|mutation| mutation_from_v8((mutation, current_timestamp)))
|
||||||
.collect::<Result<Vec<Mutation>, AnyError>>()
|
.collect::<Result<Vec<Mutation>, KvMutationError>>()
|
||||||
.with_context(|| "Invalid mutation")?;
|
.map_err(KvError::InvalidMutation)?;
|
||||||
let enqueues = enqueues
|
let enqueues = enqueues
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|e| enqueue_from_v8(e, current_timestamp))
|
.map(|e| enqueue_from_v8(e, current_timestamp))
|
||||||
.collect::<Result<Vec<Enqueue>, AnyError>>()
|
.collect::<Result<Vec<Enqueue>, std::io::Error>>()
|
||||||
.with_context(|| "invalid enqueue")?;
|
.map_err(KvError::InvalidEnqueue)?;
|
||||||
|
|
||||||
let mut total_payload_size = 0usize;
|
let mut total_payload_size = 0usize;
|
||||||
let mut total_key_size = 0usize;
|
let mut total_key_size = 0usize;
|
||||||
|
@ -823,7 +897,7 @@ where
|
||||||
.chain(mutations.iter().map(|m| &m.key))
|
.chain(mutations.iter().map(|m| &m.key))
|
||||||
{
|
{
|
||||||
if key.is_empty() {
|
if key.is_empty() {
|
||||||
return Err(type_error("key cannot be empty"));
|
return Err(KvError::EmptyKey);
|
||||||
}
|
}
|
||||||
|
|
||||||
total_payload_size += check_write_key_size(key, &config)?;
|
total_payload_size += check_write_key_size(key, &config)?;
|
||||||
|
@ -847,17 +921,13 @@ where
|
||||||
}
|
}
|
||||||
|
|
||||||
if total_payload_size > config.max_total_mutation_size_bytes {
|
if total_payload_size > config.max_total_mutation_size_bytes {
|
||||||
return Err(type_error(format!(
|
return Err(KvError::TotalMutationTooLarge(
|
||||||
"Total mutation size too large (max {} bytes)",
|
config.max_total_mutation_size_bytes,
|
||||||
config.max_total_mutation_size_bytes
|
));
|
||||||
)));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if total_key_size > config.max_total_key_size_bytes {
|
if total_key_size > config.max_total_key_size_bytes {
|
||||||
return Err(type_error(format!(
|
return Err(KvError::TotalKeyTooLarge(config.max_total_key_size_bytes));
|
||||||
"Total key size too large (max {} bytes)",
|
|
||||||
config.max_total_key_size_bytes
|
|
||||||
)));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
let atomic_write = AtomicWrite {
|
let atomic_write = AtomicWrite {
|
||||||
|
@ -866,7 +936,7 @@ where
|
||||||
enqueues,
|
enqueues,
|
||||||
};
|
};
|
||||||
|
|
||||||
let result = db.atomic_write(atomic_write).await?;
|
let result = db.atomic_write(atomic_write).await.map_err(KvError::Kv)?;
|
||||||
|
|
||||||
Ok(result.map(|res| faster_hex::hex_string(&res.versionstamp)))
|
Ok(result.map(|res| faster_hex::hex_string(&res.versionstamp)))
|
||||||
}
|
}
|
||||||
|
@ -879,19 +949,16 @@ type EncodeCursorRangeSelector = (Option<KvKey>, Option<KvKey>, Option<KvKey>);
|
||||||
fn op_kv_encode_cursor(
|
fn op_kv_encode_cursor(
|
||||||
#[serde] (prefix, start, end): EncodeCursorRangeSelector,
|
#[serde] (prefix, start, end): EncodeCursorRangeSelector,
|
||||||
#[serde] boundary_key: KvKey,
|
#[serde] boundary_key: KvKey,
|
||||||
) -> Result<String, AnyError> {
|
) -> Result<String, KvError> {
|
||||||
let selector = RawSelector::from_tuple(prefix, start, end)?;
|
let selector = RawSelector::from_tuple(prefix, start, end)?;
|
||||||
let boundary_key = encode_v8_key(boundary_key)?;
|
let boundary_key = encode_v8_key(boundary_key)?;
|
||||||
let cursor = encode_cursor(&selector, &boundary_key)?;
|
let cursor = encode_cursor(&selector, &boundary_key)?;
|
||||||
Ok(cursor)
|
Ok(cursor)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn check_read_key_size(key: &[u8], config: &KvConfig) -> Result<(), AnyError> {
|
fn check_read_key_size(key: &[u8], config: &KvConfig) -> Result<(), KvError> {
|
||||||
if key.len() > config.max_read_key_size_bytes {
|
if key.len() > config.max_read_key_size_bytes {
|
||||||
Err(type_error(format!(
|
Err(KvError::KeyTooLargeToRead(config.max_read_key_size_bytes))
|
||||||
"Key too large for read (max {} bytes)",
|
|
||||||
config.max_read_key_size_bytes
|
|
||||||
)))
|
|
||||||
} else {
|
} else {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
@ -900,12 +967,9 @@ fn check_read_key_size(key: &[u8], config: &KvConfig) -> Result<(), AnyError> {
|
||||||
fn check_write_key_size(
|
fn check_write_key_size(
|
||||||
key: &[u8],
|
key: &[u8],
|
||||||
config: &KvConfig,
|
config: &KvConfig,
|
||||||
) -> Result<usize, AnyError> {
|
) -> Result<usize, KvError> {
|
||||||
if key.len() > config.max_write_key_size_bytes {
|
if key.len() > config.max_write_key_size_bytes {
|
||||||
Err(type_error(format!(
|
Err(KvError::KeyTooLargeToWrite(config.max_write_key_size_bytes))
|
||||||
"Key too large for write (max {} bytes)",
|
|
||||||
config.max_write_key_size_bytes
|
|
||||||
)))
|
|
||||||
} else {
|
} else {
|
||||||
Ok(key.len())
|
Ok(key.len())
|
||||||
}
|
}
|
||||||
|
@ -914,7 +978,7 @@ fn check_write_key_size(
|
||||||
fn check_value_size(
|
fn check_value_size(
|
||||||
value: &KvValue,
|
value: &KvValue,
|
||||||
config: &KvConfig,
|
config: &KvConfig,
|
||||||
) -> Result<usize, AnyError> {
|
) -> Result<usize, KvError> {
|
||||||
let payload = match value {
|
let payload = match value {
|
||||||
KvValue::Bytes(x) => x,
|
KvValue::Bytes(x) => x,
|
||||||
KvValue::V8(x) => x,
|
KvValue::V8(x) => x,
|
||||||
|
@ -922,10 +986,7 @@ fn check_value_size(
|
||||||
};
|
};
|
||||||
|
|
||||||
if payload.len() > config.max_value_size_bytes {
|
if payload.len() > config.max_value_size_bytes {
|
||||||
Err(type_error(format!(
|
Err(KvError::ValueTooLarge(config.max_value_size_bytes))
|
||||||
"Value too large (max {} bytes)",
|
|
||||||
config.max_value_size_bytes
|
|
||||||
)))
|
|
||||||
} else {
|
} else {
|
||||||
Ok(payload.len())
|
Ok(payload.len())
|
||||||
}
|
}
|
||||||
|
@ -934,12 +995,9 @@ fn check_value_size(
|
||||||
fn check_enqueue_payload_size(
|
fn check_enqueue_payload_size(
|
||||||
payload: &[u8],
|
payload: &[u8],
|
||||||
config: &KvConfig,
|
config: &KvConfig,
|
||||||
) -> Result<usize, AnyError> {
|
) -> Result<usize, KvError> {
|
||||||
if payload.len() > config.max_value_size_bytes {
|
if payload.len() > config.max_value_size_bytes {
|
||||||
Err(type_error(format!(
|
Err(KvError::EnqueuePayloadTooLarge(config.max_value_size_bytes))
|
||||||
"enqueue payload too large (max {} bytes)",
|
|
||||||
config.max_value_size_bytes
|
|
||||||
)))
|
|
||||||
} else {
|
} else {
|
||||||
Ok(payload.len())
|
Ok(payload.len())
|
||||||
}
|
}
|
||||||
|
|
|
@ -23,6 +23,9 @@ use deno_ffi::DlfcnError;
|
||||||
use deno_ffi::IRError;
|
use deno_ffi::IRError;
|
||||||
use deno_ffi::ReprError;
|
use deno_ffi::ReprError;
|
||||||
use deno_ffi::StaticError;
|
use deno_ffi::StaticError;
|
||||||
|
use deno_kv::KvCheckError;
|
||||||
|
use deno_kv::KvError;
|
||||||
|
use deno_kv::KvMutationError;
|
||||||
use deno_net::ops::NetError;
|
use deno_net::ops::NetError;
|
||||||
use deno_tls::TlsError;
|
use deno_tls::TlsError;
|
||||||
use deno_webstorage::WebStorageError;
|
use deno_webstorage::WebStorageError;
|
||||||
|
@ -293,6 +296,47 @@ fn get_broadcast_channel_error(error: &BroadcastChannelError) -> &'static str {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn get_kv_error(error: &KvError) -> &'static str {
|
||||||
|
match error {
|
||||||
|
KvError::DatabaseHandler(e) | KvError::Resource(e) | KvError::Kv(e) => {
|
||||||
|
get_error_class_name(e).unwrap_or("Error")
|
||||||
|
}
|
||||||
|
KvError::TooManyRanges(_) => "TypeError",
|
||||||
|
KvError::TooManyEntries(_) => "TypeError",
|
||||||
|
KvError::TooManyChecks(_) => "TypeError",
|
||||||
|
KvError::TooManyMutations(_) => "TypeError",
|
||||||
|
KvError::TooManyKeys(_) => "TypeError",
|
||||||
|
KvError::InvalidLimit => "TypeError",
|
||||||
|
KvError::InvalidBoundaryKey => "TypeError",
|
||||||
|
KvError::KeyTooLargeToRead(_) => "TypeError",
|
||||||
|
KvError::KeyTooLargeToWrite(_) => "TypeError",
|
||||||
|
KvError::TotalMutationTooLarge(_) => "TypeError",
|
||||||
|
KvError::TotalKeyTooLarge(_) => "TypeError",
|
||||||
|
KvError::Io(e) => get_io_error_class(e),
|
||||||
|
KvError::QueueMessageNotFound => "TypeError",
|
||||||
|
KvError::StartKeyNotInKeyspace => "TypeError",
|
||||||
|
KvError::EndKeyNotInKeyspace => "TypeError",
|
||||||
|
KvError::StartKeyGreaterThanEndKey => "TypeError",
|
||||||
|
KvError::InvalidCheck(e) => match e {
|
||||||
|
KvCheckError::InvalidVersionstamp => "TypeError",
|
||||||
|
KvCheckError::Io(e) => get_io_error_class(e),
|
||||||
|
},
|
||||||
|
KvError::InvalidMutation(e) => match e {
|
||||||
|
KvMutationError::BigInt(_) => "Error",
|
||||||
|
KvMutationError::Io(e) => get_io_error_class(e),
|
||||||
|
KvMutationError::InvalidMutationWithValue(_) => "TypeError",
|
||||||
|
KvMutationError::InvalidMutationWithoutValue(_) => "TypeError",
|
||||||
|
},
|
||||||
|
KvError::InvalidEnqueue(e) => get_io_error_class(e),
|
||||||
|
KvError::EmptyKey => "TypeError",
|
||||||
|
KvError::ValueTooLarge(_) => "TypeError",
|
||||||
|
KvError::EnqueuePayloadTooLarge(_) => "TypeError",
|
||||||
|
KvError::InvalidCursor => "TypeError",
|
||||||
|
KvError::CursorOutOfBounds => "TypeError",
|
||||||
|
KvError::InvalidRange => "TypeError",
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fn get_net_error(error: &NetError) -> &'static str {
|
fn get_net_error(error: &NetError) -> &'static str {
|
||||||
match error {
|
match error {
|
||||||
NetError::ListenerClosed => "BadResource",
|
NetError::ListenerClosed => "BadResource",
|
||||||
|
@ -359,6 +403,7 @@ pub fn get_error_class_name(e: &AnyError) -> Option<&'static str> {
|
||||||
.or_else(|| e.downcast_ref::<CronError>().map(get_cron_error_class))
|
.or_else(|| e.downcast_ref::<CronError>().map(get_cron_error_class))
|
||||||
.or_else(|| e.downcast_ref::<CanvasError>().map(get_canvas_error))
|
.or_else(|| e.downcast_ref::<CanvasError>().map(get_canvas_error))
|
||||||
.or_else(|| e.downcast_ref::<CacheError>().map(get_cache_error))
|
.or_else(|| e.downcast_ref::<CacheError>().map(get_cache_error))
|
||||||
|
.or_else(|| e.downcast_ref::<KvError>().map(get_kv_error))
|
||||||
.or_else(|| e.downcast_ref::<NetError>().map(get_net_error))
|
.or_else(|| e.downcast_ref::<NetError>().map(get_net_error))
|
||||||
.or_else(|| {
|
.or_else(|| {
|
||||||
e.downcast_ref::<deno_net::io::MapError>()
|
e.downcast_ref::<deno_net::io::MapError>()
|
||||||
|
|
Loading…
Reference in a new issue