diff --git a/cli/tests/unit/kv_queue_undelivered_test.ts b/cli/tests/unit/kv_queue_undelivered_test.ts index 2b1a2b0aa1..27d9505b88 100644 --- a/cli/tests/unit/kv_queue_undelivered_test.ts +++ b/cli/tests/unit/kv_queue_undelivered_test.ts @@ -43,8 +43,9 @@ queueTest("queue with undelivered", async (db) => { try { await db.enqueue("test", { keysIfUndelivered: [["queue_failed", "a"], ["queue_failed", "b"]], + backoffSchedule: [10, 20], }); - await sleep(100000); + await sleep(3000); const undelivered = await collect(db.list({ prefix: ["queue_failed"] })); assertEquals(undelivered.length, 2); assertEquals(undelivered[0].key, ["queue_failed", "a"]); diff --git a/cli/tests/unit/kv_test.ts b/cli/tests/unit/kv_test.ts index 68b3c40139..f46099ed17 100644 --- a/cli/tests/unit/kv_test.ts +++ b/cli/tests/unit/kv_test.ts @@ -1600,6 +1600,24 @@ queueTest("queue retries", async (db) => { assertEquals(4, count); }); +queueTest("queue retries with backoffSchedule", async (db) => { + let count = 0; + const listener = db.listenQueue((_msg) => { + count += 1; + throw new TypeError("dequeue error"); + }); + try { + await db.enqueue("test", { backoffSchedule: [1] }); + await sleep(2000); + } finally { + db.close(); + await listener; + } + + // There should have been 1 attempt + 1 retry + assertEquals(2, count); +}); + queueTest("multiple listenQueues", async (db) => { const numListens = 10; let count = 0; @@ -1876,6 +1894,23 @@ Deno.test({ }, }); +dbTest("invalid backoffSchedule", async (db) => { + await assertRejects( + async () => { + await db.enqueue("foo", { backoffSchedule: [1, 1, 1, 1, 1, 1] }); + }, + TypeError, + "invalid backoffSchedule", + ); + await assertRejects( + async () => { + await db.enqueue("foo", { backoffSchedule: [3600001] }); + }, + TypeError, + "invalid backoffSchedule", + ); +}); + dbTest("atomic operation is exposed", (db) => { assert(Deno.AtomicOperation); const ao = db.atomic(); diff --git a/cli/tsc/dts/lib.deno.unstable.d.ts b/cli/tsc/dts/lib.deno.unstable.d.ts index 1858f6cd00..7778870e15 100644 --- a/cli/tsc/dts/lib.deno.unstable.d.ts +++ b/cli/tsc/dts/lib.deno.unstable.d.ts @@ -1786,7 +1786,11 @@ declare namespace Deno { */ enqueue( value: unknown, - options?: { delay?: number; keysIfUndelivered?: Deno.KvKey[] }, + options?: { + delay?: number; + keysIfUndelivered?: Deno.KvKey[]; + backoffSchedule?: number[]; + }, ): this; /** * Commit the operation to the KV store. Returns a value indicating whether @@ -1995,14 +1999,28 @@ declare namespace Deno { * listener after several attempts. The values are set to the value of * the queued message. * + * The `backoffSchedule` option can be used to specify the retry policy for + * failed message delivery. Each element in the array represents the number of + * milliseconds to wait before retrying the delivery. For example, + * `[1000, 5000, 10000]` means that a failed delivery will be retried + * at most 3 times, with 1 second, 5 seconds, and 10 seconds delay + * between each retry. + * * ```ts * const db = await Deno.openKv(); - * await db.enqueue("bar", { keysIfUndelivered: [["foo", "bar"]] }); + * await db.enqueue("bar", { + * keysIfUndelivered: [["foo", "bar"]], + * backoffSchedule: [1000, 5000, 10000], + * }); * ``` */ enqueue( value: unknown, - options?: { delay?: number; keysIfUndelivered?: Deno.KvKey[] }, + options?: { + delay?: number; + keysIfUndelivered?: Deno.KvKey[]; + backoffSchedule?: number[]; + }, ): Promise; /** diff --git a/ext/kv/01_db.ts b/ext/kv/01_db.ts index 2b0e141f8f..18d1907183 100644 --- a/ext/kv/01_db.ts +++ b/ext/kv/01_db.ts @@ -43,6 +43,20 @@ function validateQueueDelay(delay: number) { } } +const maxQueueBackoffIntervals = 5; +const maxQueueBackoffInterval = 60 * 60 * 1000; + +function validateBackoffSchedule(backoffSchedule: number[]) { + if (backoffSchedule.length > maxQueueBackoffIntervals) { + throw new TypeError("invalid backoffSchedule"); + } + for (const interval of backoffSchedule) { + if (interval < 0 || interval > maxQueueBackoffInterval || isNaN(interval)) { + throw new TypeError("invalid backoffSchedule"); + } + } +} + interface RawKvEntry { key: Deno.KvKey; value: RawValue; @@ -224,18 +238,25 @@ class Kv { async enqueue( message: unknown, - opts?: { delay?: number; keysIfUndelivered?: Deno.KvKey[] }, + opts?: { + delay?: number; + keysIfUndelivered?: Deno.KvKey[]; + backoffSchedule?: number[]; + }, ) { if (opts?.delay !== undefined) { validateQueueDelay(opts?.delay); } + if (opts?.backoffSchedule !== undefined) { + validateBackoffSchedule(opts?.backoffSchedule); + } const enqueues = [ [ core.serialize(message, { forStorage: true }), opts?.delay ?? 0, opts?.keysIfUndelivered ?? [], - null, + opts?.backoffSchedule ?? null, ], ]; @@ -468,16 +489,23 @@ class AtomicOperation { enqueue( message: unknown, - opts?: { delay?: number; keysIfUndelivered?: Deno.KvKey[] }, + opts?: { + delay?: number; + keysIfUndelivered?: Deno.KvKey[]; + backoffSchedule?: number[]; + }, ): this { if (opts?.delay !== undefined) { validateQueueDelay(opts?.delay); } + if (opts?.backoffSchedule !== undefined) { + validateBackoffSchedule(opts?.backoffSchedule); + } this.#enqueues.push([ core.serialize(message, { forStorage: true }), opts?.delay ?? 0, opts?.keysIfUndelivered ?? [], - null, + opts?.backoffSchedule ?? null, ]); return this; } diff --git a/ext/kv/lib.rs b/ext/kv/lib.rs index 456a1ebf7a..943aae460e 100644 --- a/ext/kv/lib.rs +++ b/ext/kv/lib.rs @@ -802,6 +802,9 @@ where for enqueue in &enqueues { total_payload_size += check_enqueue_payload_size(&enqueue.payload)?; + if let Some(schedule) = enqueue.backoff_schedule.as_ref() { + total_payload_size += 4 * schedule.len(); + } } if total_payload_size > MAX_TOTAL_MUTATION_SIZE_BYTES {