
chore(ext/kv): add limits (#18415)

Heyang Zhou authored on 2023-03-25 15:29:36 +08:00, committed by GitHub
parent 1c6b797383
commit 27834dfc10
5 changed files with 260 additions and 13 deletions


@@ -1004,3 +1004,136 @@ dbTest("key ordering", async (db) => {
    [true],
  ]);
});
dbTest("key size limit", async (db) => {
// 1 byte prefix + 1 byte suffix + 2045 bytes key
const lastValidKey = new Uint8Array(2046).fill(1);
const firstInvalidKey = new Uint8Array(2047).fill(1);
await db.set([lastValidKey], 1);
assertEquals(await db.get([lastValidKey]), {
key: [lastValidKey],
value: 1,
versionstamp: "00000000000000010000",
});
await assertRejects(
async () => await db.set([firstInvalidKey], 1),
TypeError,
"key too large for write (max 2048 bytes)",
);
await assertRejects(
async () => await db.get([firstInvalidKey]),
TypeError,
"key too large for read (max 2049 bytes)",
);
});
dbTest("value size limit", async (db) => {
const lastValidValue = new Uint8Array(65536);
const firstInvalidValue = new Uint8Array(65537);
await db.set(["a"], lastValidValue);
assertEquals(await db.get(["a"]), {
key: ["a"],
value: lastValidValue,
versionstamp: "00000000000000010000",
});
await assertRejects(
async () => await db.set(["b"], firstInvalidValue),
TypeError,
"value too large (max 65536 bytes)",
);
});
dbTest("operation size limit", async (db) => {
const lastValidKeys: Deno.KvKey[] = new Array(10).fill(0).map((
_,
i,
) => ["a", i]);
const firstInvalidKeys: Deno.KvKey[] = new Array(11).fill(0).map((
_,
i,
) => ["a", i]);
assertEquals((await db.getMany(lastValidKeys)).length, 10);
await assertRejects(
async () => await db.getMany(firstInvalidKeys),
TypeError,
"too many ranges (max 10)",
);
assertEquals(
(await collect(db.list({
prefix: ["a"],
}, {
batchSize: 1000,
}))).length,
0,
);
assertRejects(
async () =>
await collect(db.list({
prefix: ["a"],
}, {
batchSize: 1001,
})),
TypeError,
"too many entries (max 1000)",
);
// when batchSize is not specified, limit is used but is clamped to 500
assertEquals(
(await collect(db.list({
prefix: ["a"],
}, {
limit: 1001,
}))).length,
0,
);
assertEquals(
await db.atomic().check(...lastValidKeys.map((key) => ({
key,
versionstamp: null,
}))).mutate(...lastValidKeys.map((key) => ({
key,
type: "set",
value: 1,
} satisfies Deno.KvMutation))).commit(),
true,
);
await assertRejects(
async () =>
await db.atomic().check(...firstInvalidKeys.map((key) => ({
key,
versionstamp: null,
}))).mutate(...lastValidKeys.map((key) => ({
key,
type: "set",
value: 1,
} satisfies Deno.KvMutation))).commit(),
TypeError,
"too many checks (max 10)",
);
await assertRejects(
async () =>
await db.atomic().check(...lastValidKeys.map((key) => ({
key,
versionstamp: null,
}))).mutate(...firstInvalidKeys.map((key) => ({
key,
type: "set",
value: 1,
} satisfies Deno.KvMutation))).commit(),
TypeError,
"too many mutations (max 10)",
);
});


@@ -1862,7 +1862,7 @@ declare namespace Deno {
 * matches an expected versionstamp.
 *
 * Keys have a maximum length of 2048 bytes after serialization. Values have a
-* maximum length of 16 KiB after serialization. Serialization of both keys
+* maximum length of 64 KiB after serialization. Serialization of both keys
 * and values is somewhat opaque, but one can usually assume that the
 * serialization of any value is about the same length as the resulting string
 * of a JSON serialization of that same value.
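
A rough sketch of how these documented limits surface from user code, assuming the unstable Deno.Kv API opened via Deno.openKv(); the error messages are the ones asserted in the tests above.

const db = await Deno.openKv();

// A Uint8Array key part serializes to its length plus a small prefix/suffix
// overhead, so 2046 bytes of payload still fits under the 2048-byte write cap.
await db.set([new Uint8Array(2046).fill(1)], 1); // ok

// One more byte pushes the serialized key over the write limit:
// TypeError: key too large for write (max 2048 bytes)
// await db.set([new Uint8Array(2047).fill(1)], 1);

// Values are limited to 64 KiB after serialization:
// TypeError: value too large (max 65536 bytes)
// await db.set(["a"], new Uint8Array(65537));

db.close();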


@@ -155,7 +155,7 @@ class Kv {
    let batchSize = options.batchSize ?? (options.limit ?? 100);
    if (batchSize <= 0) throw new Error("batchSize must be positive");
-    if (batchSize > 500) batchSize = 500;
+    if (options.batchSize === undefined && batchSize > 500) batchSize = 500;
    return new KvListIterator({
      limit: options.limit,
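
A minimal sketch of the resulting list() behavior, assuming an open Deno.Kv handle: an explicitly passed batchSize is now forwarded to the backend, which allows at most 1000 entries per read, while a batch size derived only from limit is still clamped to 500.

const db = await Deno.openKv();

// No batchSize given: the limit-derived batch size is clamped to 500,
// so even limit: 1001 stays within the backend's per-read entry cap.
for await (const _entry of db.list({ prefix: ["a"] }, { limit: 1001 })) {
  // ...
}

// An explicit batchSize above 1000 is passed through and rejected:
// TypeError: too many entries (max 1000)
// for await (const _ of db.list({ prefix: ["a"] }, { batchSize: 1001 })) {}

db.close();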


@@ -292,3 +292,15 @@ pub enum MutationKind {
  Min(Value),
  Max(Value),
}

impl MutationKind {
  pub fn value(&self) -> Option<&Value> {
    match self {
      MutationKind::Set(value) => Some(value),
      MutationKind::Sum(value) => Some(value),
      MutationKind::Min(value) => Some(value),
      MutationKind::Max(value) => Some(value),
      MutationKind::Delete => None,
    }
  }
}


@@ -27,6 +27,15 @@ use serde::Serialize;
pub use crate::interface::*;

const MAX_WRITE_KEY_SIZE_BYTES: usize = 2048;
// range selectors can contain 0x00 or 0xff suffixes
const MAX_READ_KEY_SIZE_BYTES: usize = MAX_WRITE_KEY_SIZE_BYTES + 1;
const MAX_VALUE_SIZE_BYTES: usize = 65536;
const MAX_READ_RANGES: usize = 10;
const MAX_READ_ENTRIES: usize = 1000;
const MAX_CHECKS: usize = 10;
const MAX_MUTATIONS: usize = 10;

struct UnstableChecker {
  pub unstable: bool,
}
@@ -218,6 +227,16 @@ where
      state.resource_table.get::<DatabaseResource<DBH::DB>>(rid)?;
    resource.db.clone()
  };

  if ranges.len() > MAX_READ_RANGES {
    return Err(type_error(format!(
      "too many ranges (max {})",
      MAX_READ_RANGES
    )));
  }

  let mut total_entries = 0usize;
  let read_ranges = ranges
    .into_iter()
    .map(|(prefix, start, end, limit, reverse, cursor)| {
@@ -225,6 +244,10 @@ where
      let (start, end) =
        decode_selector_and_cursor(&selector, reverse, cursor.as_ref())?;
      check_read_key_size(&start)?;
      check_read_key_size(&end)?;

      total_entries += limit as usize;
      Ok(ReadRange {
        start,
        end,
@@ -234,6 +257,14 @@
      })
    })
    .collect::<Result<Vec<_>, AnyError>>()?;

  if total_entries > MAX_READ_ENTRIES {
    return Err(type_error(format!(
      "too many entries (max {})",
      MAX_READ_ENTRIES
    )));
  }

  let opts = SnapshotReadOptions {
    consistency: consistency.into(),
  };
@@ -499,32 +530,53 @@ where
    resource.db.clone()
  };

-  for key in checks
-    .iter()
-    .map(|c| &c.0)
-    .chain(mutations.iter().map(|m| &m.0))
-  {
-    if key.is_empty() {
-      return Err(type_error("key cannot be empty"));
-    }
-  }
+  if checks.len() > MAX_CHECKS {
+    return Err(type_error(format!("too many checks (max {})", MAX_CHECKS)));
+  }
+
+  if mutations.len() + enqueues.len() > MAX_MUTATIONS {
+    return Err(type_error(format!(
+      "too many mutations (max {})",
+      MAX_MUTATIONS
+    )));
+  }
  let checks = checks
    .into_iter()
    .map(TryInto::try_into)
-    .collect::<Result<_, AnyError>>()
+    .collect::<Result<Vec<KvCheck>, AnyError>>()
    .with_context(|| "invalid check")?;
  let mutations = mutations
    .into_iter()
    .map(TryInto::try_into)
-    .collect::<Result<_, AnyError>>()
+    .collect::<Result<Vec<KvMutation>, AnyError>>()
    .with_context(|| "invalid mutation")?;
  let enqueues = enqueues
    .into_iter()
    .map(TryInto::try_into)
-    .collect::<Result<_, AnyError>>()
+    .collect::<Result<Vec<Enqueue>, AnyError>>()
    .with_context(|| "invalid enqueue")?;

  for key in checks
    .iter()
    .map(|c| &c.key)
    .chain(mutations.iter().map(|m| &m.key))
  {
    if key.is_empty() {
      return Err(type_error("key cannot be empty"));
    }
    check_write_key_size(key)?;
  }

  for value in mutations.iter().flat_map(|m| m.kind.value()) {
    check_value_size(value)?;
  }

  for enqueue in &enqueues {
    check_enqueue_payload_size(&enqueue.payload)?;
  }

  let atomic_write = AtomicWrite {
    checks,
    mutations,
@@ -549,3 +601,53 @@ fn op_kv_encode_cursor(
  let cursor = encode_cursor(&selector, &boundary_key)?;
  Ok(cursor)
}

fn check_read_key_size(key: &[u8]) -> Result<(), AnyError> {
  if key.len() > MAX_READ_KEY_SIZE_BYTES {
    Err(type_error(format!(
      "key too large for read (max {} bytes)",
      MAX_READ_KEY_SIZE_BYTES
    )))
  } else {
    Ok(())
  }
}

fn check_write_key_size(key: &[u8]) -> Result<(), AnyError> {
  if key.len() > MAX_WRITE_KEY_SIZE_BYTES {
    Err(type_error(format!(
      "key too large for write (max {} bytes)",
      MAX_WRITE_KEY_SIZE_BYTES
    )))
  } else {
    Ok(())
  }
}

fn check_value_size(value: &Value) -> Result<(), AnyError> {
  let payload = match value {
    Value::Bytes(x) => x,
    Value::V8(x) => x,
    Value::U64(_) => return Ok(()),
  };

  if payload.len() > MAX_VALUE_SIZE_BYTES {
    Err(type_error(format!(
      "value too large (max {} bytes)",
      MAX_VALUE_SIZE_BYTES
    )))
  } else {
    Ok(())
  }
}

fn check_enqueue_payload_size(payload: &[u8]) -> Result<(), AnyError> {
  if payload.len() > MAX_VALUE_SIZE_BYTES {
    Err(type_error(format!(
      "enqueue payload too large (max {} bytes)",
      MAX_VALUE_SIZE_BYTES
    )))
  } else {
    Ok(())
  }
}