From e4cebf3e0da539321727b0f0e43ddce5679635b1 Mon Sep 17 00:00:00 2001 From: Igor Zinkovsky Date: Sat, 26 Aug 2023 18:26:09 -0700 Subject: [PATCH] fix(kv) increase number of allowed mutations in atomic (#20126) fixes #19741 Impose a limit on the total atomic payload size --- cli/tests/unit/kv_test.ts | 54 +++++++++++++++++++++++++++++++++++++-- ext/kv/lib.rs | 32 +++++++++++++++-------- 2 files changed, 73 insertions(+), 13 deletions(-) diff --git a/cli/tests/unit/kv_test.ts b/cli/tests/unit/kv_test.ts index acda9a0e2e..de764cf805 100644 --- a/cli/tests/unit/kv_test.ts +++ b/cli/tests/unit/kv_test.ts @@ -1214,6 +1214,28 @@ dbTest("operation size limit", async (db) => { "too many checks (max 10)", ); + const validMutateKeys: Deno.KvKey[] = new Array(1000).fill(0).map(( + _, + i, + ) => ["a", i]); + const invalidMutateKeys: Deno.KvKey[] = new Array(1001).fill(0).map(( + _, + i, + ) => ["a", i]); + + const res4 = await db.atomic() + .check(...lastValidKeys.map((key) => ({ + key, + versionstamp: null, + }))) + .mutate(...validMutateKeys.map((key) => ({ + key, + type: "set", + value: 1, + } satisfies Deno.KvMutation))) + .commit(); + assert(res4); + await assertRejects( async () => { await db.atomic() @@ -1221,7 +1243,7 @@ dbTest("operation size limit", async (db) => { key, versionstamp: null, }))) - .mutate(...firstInvalidKeys.map((key) => ({ + .mutate(...invalidMutateKeys.map((key) => ({ key, type: "set", value: 1, @@ -1229,7 +1251,35 @@ dbTest("operation size limit", async (db) => { .commit(); }, TypeError, - "too many mutations (max 10)", + "too many mutations (max 1000)", + ); +}); + +dbTest("total mutation size limit", async (db) => { + const keys: Deno.KvKey[] = new Array(1000).fill(0).map(( + _, + i, + ) => ["a", i]); + + const atomic = db.atomic(); + for (const key of keys) { + atomic.set(key, "foo"); + } + const res = await atomic.commit(); + assert(res); + + // Use bigger values to trigger "total mutation size too large" error + await assertRejects( + async () => { + const value = new Array(3000).fill("a").join(""); + const atomic = db.atomic(); + for (const key of keys) { + atomic.set(key, value); + } + await atomic.commit(); + }, + TypeError, + "total mutation size too large (max 819200 bytes)", ); }); diff --git a/ext/kv/lib.rs b/ext/kv/lib.rs index f226b11ae7..ab78fe4c34 100644 --- a/ext/kv/lib.rs +++ b/ext/kv/lib.rs @@ -38,7 +38,8 @@ const MAX_VALUE_SIZE_BYTES: usize = 65536; const MAX_READ_RANGES: usize = 10; const MAX_READ_ENTRIES: usize = 1000; const MAX_CHECKS: usize = 10; -const MAX_MUTATIONS: usize = 10; +const MAX_MUTATIONS: usize = 1000; +const MAX_TOTAL_MUTATION_SIZE_BYTES: usize = 819200; struct UnstableChecker { pub unstable: bool, @@ -638,6 +639,8 @@ where .collect::, AnyError>>() .with_context(|| "invalid enqueue")?; + let mut total_payload_size = 0usize; + for key in checks .iter() .map(|c| &c.key) @@ -647,15 +650,22 @@ where return Err(type_error("key cannot be empty")); } - check_write_key_size(key)?; + total_payload_size += check_write_key_size(key)?; } for value in mutations.iter().flat_map(|m| m.kind.value()) { - check_value_size(value)?; + total_payload_size += check_value_size(value)?; } for enqueue in &enqueues { - check_enqueue_payload_size(&enqueue.payload)?; + total_payload_size += check_enqueue_payload_size(&enqueue.payload)?; + } + + if total_payload_size > MAX_TOTAL_MUTATION_SIZE_BYTES { + return Err(type_error(format!( + "total mutation size too large (max {} bytes)", + MAX_TOTAL_MUTATION_SIZE_BYTES + ))); } let atomic_write = AtomicWrite { @@ -694,22 +704,22 @@ fn check_read_key_size(key: &[u8]) -> Result<(), AnyError> { } } -fn check_write_key_size(key: &[u8]) -> Result<(), AnyError> { +fn check_write_key_size(key: &[u8]) -> Result { if key.len() > MAX_WRITE_KEY_SIZE_BYTES { Err(type_error(format!( "key too large for write (max {} bytes)", MAX_WRITE_KEY_SIZE_BYTES ))) } else { - Ok(()) + Ok(key.len()) } } -fn check_value_size(value: &Value) -> Result<(), AnyError> { +fn check_value_size(value: &Value) -> Result { let payload = match value { Value::Bytes(x) => x, Value::V8(x) => x, - Value::U64(_) => return Ok(()), + Value::U64(_) => return Ok(8), }; if payload.len() > MAX_VALUE_SIZE_BYTES { @@ -718,17 +728,17 @@ fn check_value_size(value: &Value) -> Result<(), AnyError> { MAX_VALUE_SIZE_BYTES ))) } else { - Ok(()) + Ok(payload.len()) } } -fn check_enqueue_payload_size(payload: &[u8]) -> Result<(), AnyError> { +fn check_enqueue_payload_size(payload: &[u8]) -> Result { if payload.len() > MAX_VALUE_SIZE_BYTES { Err(type_error(format!( "enqueue payload too large (max {} bytes)", MAX_VALUE_SIZE_BYTES ))) } else { - Ok(()) + Ok(payload.len()) } }