1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2024-11-28 16:20:57 -05:00

fix(kv) increase number of allowed mutations in atomic (#20126)

fixes #19741

Impose a limit on the total atomic payload size
This commit is contained in:
Igor Zinkovsky 2023-08-26 18:26:09 -07:00 committed by GitHub
parent d104a09f79
commit e4cebf3e0d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 73 additions and 13 deletions

View file

@ -1214,6 +1214,28 @@ dbTest("operation size limit", async (db) => {
"too many checks (max 10)",
);
const validMutateKeys: Deno.KvKey[] = new Array(1000).fill(0).map((
_,
i,
) => ["a", i]);
const invalidMutateKeys: Deno.KvKey[] = new Array(1001).fill(0).map((
_,
i,
) => ["a", i]);
const res4 = await db.atomic()
.check(...lastValidKeys.map((key) => ({
key,
versionstamp: null,
})))
.mutate(...validMutateKeys.map((key) => ({
key,
type: "set",
value: 1,
} satisfies Deno.KvMutation)))
.commit();
assert(res4);
await assertRejects(
async () => {
await db.atomic()
@ -1221,7 +1243,7 @@ dbTest("operation size limit", async (db) => {
key,
versionstamp: null,
})))
.mutate(...firstInvalidKeys.map((key) => ({
.mutate(...invalidMutateKeys.map((key) => ({
key,
type: "set",
value: 1,
@ -1229,7 +1251,35 @@ dbTest("operation size limit", async (db) => {
.commit();
},
TypeError,
"too many mutations (max 10)",
"too many mutations (max 1000)",
);
});
dbTest("total mutation size limit", async (db) => {
const keys: Deno.KvKey[] = new Array(1000).fill(0).map((
_,
i,
) => ["a", i]);
const atomic = db.atomic();
for (const key of keys) {
atomic.set(key, "foo");
}
const res = await atomic.commit();
assert(res);
// Use bigger values to trigger "total mutation size too large" error
await assertRejects(
async () => {
const value = new Array(3000).fill("a").join("");
const atomic = db.atomic();
for (const key of keys) {
atomic.set(key, value);
}
await atomic.commit();
},
TypeError,
"total mutation size too large (max 819200 bytes)",
);
});

View file

@ -38,7 +38,8 @@ const MAX_VALUE_SIZE_BYTES: usize = 65536;
const MAX_READ_RANGES: usize = 10;
const MAX_READ_ENTRIES: usize = 1000;
const MAX_CHECKS: usize = 10;
const MAX_MUTATIONS: usize = 10;
const MAX_MUTATIONS: usize = 1000;
const MAX_TOTAL_MUTATION_SIZE_BYTES: usize = 819200;
struct UnstableChecker {
pub unstable: bool,
@ -638,6 +639,8 @@ where
.collect::<Result<Vec<Enqueue>, AnyError>>()
.with_context(|| "invalid enqueue")?;
let mut total_payload_size = 0usize;
for key in checks
.iter()
.map(|c| &c.key)
@ -647,15 +650,22 @@ where
return Err(type_error("key cannot be empty"));
}
check_write_key_size(key)?;
total_payload_size += check_write_key_size(key)?;
}
for value in mutations.iter().flat_map(|m| m.kind.value()) {
check_value_size(value)?;
total_payload_size += check_value_size(value)?;
}
for enqueue in &enqueues {
check_enqueue_payload_size(&enqueue.payload)?;
total_payload_size += check_enqueue_payload_size(&enqueue.payload)?;
}
if total_payload_size > MAX_TOTAL_MUTATION_SIZE_BYTES {
return Err(type_error(format!(
"total mutation size too large (max {} bytes)",
MAX_TOTAL_MUTATION_SIZE_BYTES
)));
}
let atomic_write = AtomicWrite {
@ -694,22 +704,22 @@ fn check_read_key_size(key: &[u8]) -> Result<(), AnyError> {
}
}
fn check_write_key_size(key: &[u8]) -> Result<(), AnyError> {
fn check_write_key_size(key: &[u8]) -> Result<usize, AnyError> {
if key.len() > MAX_WRITE_KEY_SIZE_BYTES {
Err(type_error(format!(
"key too large for write (max {} bytes)",
MAX_WRITE_KEY_SIZE_BYTES
)))
} else {
Ok(())
Ok(key.len())
}
}
fn check_value_size(value: &Value) -> Result<(), AnyError> {
fn check_value_size(value: &Value) -> Result<usize, AnyError> {
let payload = match value {
Value::Bytes(x) => x,
Value::V8(x) => x,
Value::U64(_) => return Ok(()),
Value::U64(_) => return Ok(8),
};
if payload.len() > MAX_VALUE_SIZE_BYTES {
@ -718,17 +728,17 @@ fn check_value_size(value: &Value) -> Result<(), AnyError> {
MAX_VALUE_SIZE_BYTES
)))
} else {
Ok(())
Ok(payload.len())
}
}
fn check_enqueue_payload_size(payload: &[u8]) -> Result<(), AnyError> {
fn check_enqueue_payload_size(payload: &[u8]) -> Result<usize, AnyError> {
if payload.len() > MAX_VALUE_SIZE_BYTES {
Err(type_error(format!(
"enqueue payload too large (max {} bytes)",
MAX_VALUE_SIZE_BYTES
)))
} else {
Ok(())
Ok(payload.len())
}
}