1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2024-11-24 15:19:26 -05:00

feat(cron) implement Deno.cron() (#21019)

This PR adds unstable `Deno.cron` API to trigger execution of cron jobs.

* State: All cron state is in memory. Cron jobs are scheduled according
to the cron schedule expression and the current time. No state is
persisted to disk.
* Time zone: Cron expressions specify time in UTC.
* Overlapping executions: not permitted. If the next scheduled execution
time occurs while the same cron job is still executing, the scheduled
execution is skipped.
* Retries: failed jobs are automatically retried until they succeed or
until retry threshold is reached. Retry policy can be optionally
specified using `options.backoffSchedule`.
This commit is contained in:
Igor Zinkovsky 2023-11-01 11:57:55 -07:00 committed by GitHub
parent 82643857cc
commit 01d3e0f317
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
19 changed files with 918 additions and 4 deletions

42
Cargo.lock generated
View file

@ -205,7 +205,7 @@ dependencies = [
"asn1-rs-derive",
"asn1-rs-impl",
"displaydoc",
"nom",
"nom 7.1.3",
"num-traits",
"rusticata-macros",
"thiserror",
@ -1280,6 +1280,19 @@ dependencies = [
"v8",
]
[[package]]
name = "deno_cron"
version = "0.1.0"
dependencies = [
"anyhow",
"async-trait",
"chrono",
"deno_core",
"deno_unsync 0.1.1",
"saffron",
"tokio",
]
[[package]]
name = "deno_crypto"
version = "0.135.0"
@ -1696,6 +1709,7 @@ dependencies = [
"deno_cache",
"deno_console",
"deno_core",
"deno_cron",
"deno_crypto",
"deno_fetch",
"deno_ffi",
@ -1967,7 +1981,7 @@ checksum = "dbd676fbbab537128ef0278adb5576cf363cff6aa22a7b24effe97347cfab61e"
dependencies = [
"asn1-rs",
"displaydoc",
"nom",
"nom 7.1.3",
"num-bigint",
"num-traits",
"rusticata-macros",
@ -3680,6 +3694,16 @@ dependencies = [
"static_assertions",
]
[[package]]
name = "nom"
version = "5.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "08959a387a676302eebf4ddbcbc611da04285579f76f88ee0506c63b1a61dd4b"
dependencies = [
"memchr",
"version_check",
]
[[package]]
name = "nom"
version = "7.1.3"
@ -4747,7 +4771,7 @@ version = "4.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "faf0c4a6ece9950b9abdb62b1cfcf2a68b3b67a10ba445b3bb85be2a293d0632"
dependencies = [
"nom",
"nom 7.1.3",
]
[[package]]
@ -4888,6 +4912,16 @@ version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ef703b7cb59335eae2eb93ceb664c0eb7ea6bf567079d843e09420219668e072"
[[package]]
name = "saffron"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "03fb9a628596fc7590eb7edbf7b0613287be78df107f5f97b118aad59fb2eea9"
dependencies = [
"chrono",
"nom 5.1.3",
]
[[package]]
name = "salsa20"
version = "0.10.2"
@ -7033,7 +7067,7 @@ dependencies = [
"data-encoding",
"der-parser",
"lazy_static",
"nom",
"nom 7.1.3",
"oid-registry",
"rusticata-macros",
"thiserror",

View file

@ -13,6 +13,7 @@ members = [
"ext/broadcast_channel",
"ext/cache",
"ext/console",
"ext/cron",
"ext/crypto",
"ext/fetch",
"ext/ffi",
@ -56,6 +57,7 @@ denokv_remote = "0.2.3"
deno_broadcast_channel = { version = "0.115.0", path = "./ext/broadcast_channel" }
deno_cache = { version = "0.53.0", path = "./ext/cache" }
deno_console = { version = "0.121.0", path = "./ext/console" }
deno_cron = { version = "0.1.0", path = "./ext/cron" }
deno_crypto = { version = "0.135.0", path = "./ext/crypto" }
deno_fetch = { version = "0.145.0", path = "./ext/fetch" }
deno_ffi = { version = "0.108.0", path = "./ext/ffi" }
@ -133,6 +135,7 @@ rustls-webpki = "0.101.4"
rustls-native-certs = "0.6.2"
webpki-roots = "0.25.2"
scopeguard = "1.2.0"
saffron = "=0.1.0"
serde = { version = "1.0.149", features = ["derive"] }
serde_bytes = "0.11"
serde_json = "1.0.85"

View file

@ -349,6 +349,7 @@ deno_core::extension!(
fn create_cli_snapshot(snapshot_path: PathBuf) -> CreateSnapshotOutput {
use deno_core::Extension;
use deno_runtime::deno_cache::SqliteBackedCache;
use deno_runtime::deno_cron::local::LocalCronHandler;
use deno_runtime::deno_http::DefaultHttpPropertyExtractor;
use deno_runtime::deno_kv::sqlite::SqliteDbHandler;
use deno_runtime::permissions::PermissionsContainer;
@ -383,6 +384,7 @@ fn create_cli_snapshot(snapshot_path: PathBuf) -> CreateSnapshotOutput {
deno_kv::deno_kv::init_ops(SqliteDbHandler::<PermissionsContainer>::new(
None, None,
)),
deno_cron::deno_cron::init_ops(LocalCronHandler::new()),
deno_napi::deno_napi::init_ops::<PermissionsContainer>(),
deno_http::deno_http::init_ops::<DefaultHttpPropertyExtractor>(),
deno_io::deno_io::init_ops(Default::default()),

View file

@ -24,6 +24,7 @@ util::unit_test_factory!(
console_test,
copy_file_test,
custom_event_test,
cron_test,
dir_test,
dom_exception_test,
error_stack_test,

242
cli/tests/unit/cron_test.ts Normal file
View file

@ -0,0 +1,242 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
import { assertEquals, assertThrows, deferred } from "./test_util.ts";
const sleep = (time: number) => new Promise((r) => setTimeout(r, time));
Deno.test(function noNameTest() {
assertThrows(
// @ts-ignore test
() => Deno.cron(),
TypeError,
"Deno.cron requires a unique name",
);
});
Deno.test(function noSchedule() {
assertThrows(
// @ts-ignore test
() => Deno.cron("foo"),
TypeError,
"Deno.cron requires a valid schedule",
);
});
Deno.test(function noHandler() {
assertThrows(
// @ts-ignore test
() => Deno.cron("foo", "*/1 * * * *"),
TypeError,
"Deno.cron requires a handler",
);
});
Deno.test(function invalidNameTest() {
assertThrows(
() => Deno.cron("abc[]", "*/1 * * * *", () => {}),
TypeError,
"Invalid cron name",
);
assertThrows(
() => Deno.cron("a**bc", "*/1 * * * *", () => {}),
TypeError,
"Invalid cron name",
);
assertThrows(
() => Deno.cron("abc<>", "*/1 * * * *", () => {}),
TypeError,
"Invalid cron name",
);
assertThrows(
() => Deno.cron(";']", "*/1 * * * *", () => {}),
TypeError,
"Invalid cron name",
);
assertThrows(
() =>
Deno.cron(
"0000000000000000000000000000000000000000000000000000000000000000000000",
"*/1 * * * *",
() => {},
),
TypeError,
"Cron name is too long",
);
});
Deno.test(function invalidScheduleTest() {
assertThrows(
() => Deno.cron("abc", "bogus", () => {}),
TypeError,
"Invalid cron schedule",
);
assertThrows(
() => Deno.cron("abc", "* * * * * *", () => {}),
TypeError,
"Invalid cron schedule",
);
assertThrows(
() => Deno.cron("abc", "* * * *", () => {}),
TypeError,
"Invalid cron schedule",
);
assertThrows(
() => Deno.cron("abc", "m * * * *", () => {}),
TypeError,
"Invalid cron schedule",
);
});
Deno.test(function invalidBackoffScheduleTest() {
assertThrows(
() =>
Deno.cron("abc", "*/1 * * * *", () => {}, {
backoffSchedule: [1, 1, 1, 1, 1, 1],
}),
TypeError,
"Invalid backoff schedule",
);
assertThrows(
() =>
Deno.cron("abc", "*/1 * * * *", () => {}, {
backoffSchedule: [3600001],
}),
TypeError,
"Invalid backoff schedule",
);
});
Deno.test(async function tooManyCrons() {
const crons: Promise<void>[] = [];
const ac = new AbortController();
for (let i = 0; i <= 100; i++) {
const c = Deno.cron(`abc_${i}`, "*/1 * * * *", () => {}, {
signal: ac.signal,
});
crons.push(c);
}
try {
assertThrows(
() => {
Deno.cron("next-cron", "*/1 * * * *", () => {}, { signal: ac.signal });
},
TypeError,
"Too many crons",
);
} finally {
ac.abort();
for (const c of crons) {
await c;
}
}
});
Deno.test(async function duplicateCrons() {
const ac = new AbortController();
const c = Deno.cron("abc", "*/20 * * * *", () => {
}, { signal: ac.signal });
try {
assertThrows(
() => Deno.cron("abc", "*/20 * * * *", () => {}),
TypeError,
"Cron with this name already exists",
);
} finally {
ac.abort();
await c;
}
});
Deno.test(async function basicTest() {
Deno.env.set("DENO_CRON_TEST_SCHEDULE_OFFSET", "100");
let count = 0;
const promise = deferred();
const ac = new AbortController();
const c = Deno.cron("abc", "*/20 * * * *", () => {
count++;
if (count > 5) {
promise.resolve();
}
}, { signal: ac.signal });
try {
await promise;
} finally {
ac.abort();
await c;
}
});
Deno.test(async function multipleCrons() {
Deno.env.set("DENO_CRON_TEST_SCHEDULE_OFFSET", "100");
let count0 = 0;
let count1 = 0;
const promise0 = deferred();
const promise1 = deferred();
const ac = new AbortController();
const c0 = Deno.cron("abc", "*/20 * * * *", () => {
count0++;
if (count0 > 5) {
promise0.resolve();
}
}, { signal: ac.signal });
const c1 = Deno.cron("xyz", "*/20 * * * *", () => {
count1++;
if (count1 > 5) {
promise1.resolve();
}
}, { signal: ac.signal });
try {
await promise0;
await promise1;
} finally {
ac.abort();
await c0;
await c1;
}
});
Deno.test(async function overlappingExecutions() {
Deno.env.set("DENO_CRON_TEST_SCHEDULE_OFFSET", "100");
let count = 0;
const promise0 = deferred();
const promise1 = deferred();
const ac = new AbortController();
const c = Deno.cron("abc", "*/20 * * * *", async () => {
promise0.resolve();
count++;
await promise1;
}, { signal: ac.signal });
try {
await promise0;
} finally {
await sleep(2000);
promise1.resolve();
ac.abort();
await c;
}
assertEquals(count, 1);
});
Deno.test(async function retriesWithBackkoffSchedule() {
Deno.env.set("DENO_CRON_TEST_SCHEDULE_OFFSET", "5000");
let count = 0;
const ac = new AbortController();
const c = Deno.cron("abc", "*/20 * * * *", async () => {
count += 1;
await sleep(10);
throw new TypeError("cron error");
}, { signal: ac.signal, backoffSchedule: [10, 20] });
try {
await sleep(6000);
} finally {
ac.abort();
await c;
}
// The cron should have executed 3 times (1st attempt and 2 retries).
assertEquals(count, 3);
});

View file

@ -1317,6 +1317,31 @@ declare namespace Deno {
*/
export function openKv(path?: string): Promise<Deno.Kv>;
/** **UNSTABLE**: New API, yet to be vetted.
*
* Create a cron job that will periodically execute the provided handler
* callback based on the specified schedule.
*
* ```ts
* Deno.cron("sample cron", "*\/20 * * * *", () => {
* console.log("cron job executed");
* });
* ```
* `backoffSchedule` option can be used to specify the retry policy for failed
* executions. Each element in the array represents the number of milliseconds
* to wait before retrying the execution. For example, `[1000, 5000, 10000]`
* means that a failed execution will be retried at most 3 times, with 1
* second, 5 seconds, and 10 seconds delay between each retry.
*
* @category Cron
*/
export function cron(
name: string,
schedule: string,
handler: () => Promise<void> | void,
options?: { backoffSchedule?: number[]; signal?: AbortSignal },
): Promise<void>;
/** **UNSTABLE**: New API, yet to be vetted.
*
* A key to be persisted in a {@linkcode Deno.Kv}. A key is a sequence

58
ext/cron/01_cron.ts Normal file
View file

@ -0,0 +1,58 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
// @ts-ignore internal api
const core = Deno.core;
function cron(
name: string,
schedule: string,
handler: () => Promise<void> | void,
options?: { backoffSchedule?: number[]; signal?: AbortSignal },
) {
if (name === undefined) {
throw new TypeError("Deno.cron requires a unique name");
}
if (schedule === undefined) {
throw new TypeError("Deno.cron requires a valid schedule");
}
if (handler === undefined) {
throw new TypeError("Deno.cron requires a handler");
}
const rid = core.ops.op_cron_create(
name,
schedule,
options?.backoffSchedule,
);
if (options?.signal) {
const signal = options?.signal;
signal.addEventListener(
"abort",
() => {
core.close(rid);
},
{ once: true },
);
}
return (async () => {
let success = true;
while (true) {
const r = await core.opAsync("op_cron_next", rid, success);
if (r === false) {
break;
}
try {
const result = handler();
const _res = result instanceof Promise ? (await result) : result;
success = true;
} catch (error) {
console.error(`Exception in cron handler ${name}`, error);
success = false;
}
}
})();
}
export { cron };

23
ext/cron/Cargo.toml Normal file
View file

@ -0,0 +1,23 @@
# Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
[package]
name = "deno_cron"
version = "0.1.0"
authors.workspace = true
edition.workspace = true
license.workspace = true
readme = "README.md"
repository.workspace = true
description = "Implementation of the Deno cron API"
[lib]
path = "lib.rs"
[dependencies]
anyhow.workspace = true
async-trait.workspace = true
chrono.workspace = true
deno_core.workspace = true
deno_unsync = "0.1.1"
saffron.workspace = true
tokio.workspace = true

23
ext/cron/interface.rs Normal file
View file

@ -0,0 +1,23 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
use async_trait::async_trait;
use deno_core::error::AnyError;
pub trait CronHandler {
type EH: CronHandle + 'static;
fn create(&self, spec: CronSpec) -> Result<Self::EH, AnyError>;
}
#[async_trait(?Send)]
pub trait CronHandle {
async fn next(&self, prev_success: bool) -> Result<bool, AnyError>;
fn close(&self);
}
#[derive(Clone)]
pub struct CronSpec {
pub name: String,
pub cron_schedule: String,
pub backoff_schedule: Option<Vec<u32>>,
}

128
ext/cron/lib.rs Normal file
View file

@ -0,0 +1,128 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
mod interface;
pub mod local;
mod time;
use std::borrow::Cow;
use std::cell::RefCell;
use std::rc::Rc;
use deno_core::error::get_custom_error_class;
use deno_core::error::type_error;
use deno_core::error::AnyError;
use deno_core::op2;
use deno_core::OpState;
use deno_core::Resource;
use deno_core::ResourceId;
pub use crate::interface::*;
pub const UNSTABLE_FEATURE_NAME: &str = "cron";
deno_core::extension!(deno_cron,
deps = [ deno_console ],
parameters = [ C: CronHandler ],
ops = [
op_cron_create<C>,
op_cron_next<C>,
],
esm = [ "01_cron.ts" ],
options = {
cron_handler: C,
},
state = |state, options| {
state.put(Rc::new(options.cron_handler));
}
);
struct CronResource<EH: CronHandle + 'static> {
handle: Rc<EH>,
}
impl<EH: CronHandle + 'static> Resource for CronResource<EH> {
fn name(&self) -> Cow<str> {
"cron".into()
}
fn close(self: Rc<Self>) {
self.handle.close();
}
}
#[op2]
#[smi]
fn op_cron_create<C>(
state: Rc<RefCell<OpState>>,
#[string] name: String,
#[string] cron_schedule: String,
#[serde] backoff_schedule: Option<Vec<u32>>,
) -> Result<ResourceId, AnyError>
where
C: CronHandler + 'static,
{
let cron_handler = {
let state = state.borrow();
// TODO(bartlomieju): replace with `state.feature_checker.check_or_exit`
// once we phase out `check_or_exit_with_legacy_fallback`
state
.feature_checker
.check_or_exit_with_legacy_fallback(UNSTABLE_FEATURE_NAME, "Deno.cron");
state.borrow::<Rc<C>>().clone()
};
validate_cron_name(&name)?;
let handle = cron_handler.create(CronSpec {
name,
cron_schedule,
backoff_schedule,
})?;
let handle_rid = {
let mut state = state.borrow_mut();
state.resource_table.add(CronResource {
handle: Rc::new(handle),
})
};
Ok(handle_rid)
}
#[op2(async)]
async fn op_cron_next<C>(
state: Rc<RefCell<OpState>>,
#[smi] rid: ResourceId,
prev_success: bool,
) -> Result<bool, AnyError>
where
C: CronHandler + 'static,
{
let cron_handler = {
let state = state.borrow();
let resource = match state.resource_table.get::<CronResource<C::EH>>(rid) {
Ok(resource) => resource,
Err(err) => {
if get_custom_error_class(&err) == Some("BadResource") {
return Ok(false);
} else {
return Err(err);
}
}
};
resource.handle.clone()
};
cron_handler.next(prev_success).await
}
fn validate_cron_name(name: &str) -> Result<(), AnyError> {
if name.len() > 64 {
return Err(type_error("Cron name is too long"));
}
if !name.chars().all(|c| {
c.is_ascii_whitespace() || c.is_ascii_alphanumeric() || c == '_' || c == '-'
}) {
return Err(type_error("Invalid cron name"));
}
Ok(())
}

343
ext/cron/local.rs Normal file
View file

@ -0,0 +1,343 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
use std::cell::OnceCell;
use std::cell::RefCell;
use std::collections::BTreeMap;
use std::collections::HashMap;
use std::env;
use std::rc::Rc;
use std::rc::Weak;
use std::sync::Arc;
use async_trait::async_trait;
use deno_core::error::type_error;
use deno_core::error::AnyError;
use deno_core::futures;
use deno_core::futures::FutureExt;
use deno_unsync::spawn;
use deno_unsync::JoinHandle;
use tokio::sync::mpsc;
use tokio::sync::mpsc::WeakSender;
use tokio::sync::OwnedSemaphorePermit;
use tokio::sync::Semaphore;
use crate::CronHandle;
use crate::CronHandler;
use crate::CronSpec;
const MAX_CRONS: usize = 100;
const DISPATCH_CONCURRENCY_LIMIT: usize = 50;
const MAX_BACKOFF_MS: u32 = 60 * 60 * 1_000; // 1 hour
const MAX_BACKOFF_COUNT: usize = 5;
const DEFAULT_BACKOFF_SCHEDULE: [u32; 5] = [100, 1_000, 5_000, 30_000, 60_000];
pub struct LocalCronHandler {
cron_schedule_tx: OnceCell<mpsc::Sender<(String, bool)>>,
concurrency_limiter: Arc<Semaphore>,
cron_loop_join_handle: OnceCell<JoinHandle<()>>,
runtime_state: Rc<RefCell<RuntimeState>>,
}
struct RuntimeState {
crons: HashMap<String, Cron>,
scheduled_deadlines: BTreeMap<u64, Vec<String>>,
}
struct Cron {
spec: CronSpec,
next_tx: mpsc::WeakSender<()>,
current_execution_retries: u32,
}
impl Cron {
fn backoff_schedule(&self) -> &[u32] {
self
.spec
.backoff_schedule
.as_deref()
.unwrap_or(&DEFAULT_BACKOFF_SCHEDULE)
}
}
impl Default for LocalCronHandler {
fn default() -> Self {
Self::new()
}
}
impl LocalCronHandler {
pub fn new() -> Self {
Self {
cron_schedule_tx: OnceCell::new(),
concurrency_limiter: Arc::new(Semaphore::new(DISPATCH_CONCURRENCY_LIMIT)),
cron_loop_join_handle: OnceCell::new(),
runtime_state: Rc::new(RefCell::new(RuntimeState {
crons: HashMap::new(),
scheduled_deadlines: BTreeMap::new(),
})),
}
}
async fn cron_loop(
runtime_state: Rc<RefCell<RuntimeState>>,
mut cron_schedule_rx: mpsc::Receiver<(String, bool)>,
) -> Result<(), AnyError> {
loop {
let earliest_deadline = runtime_state
.borrow()
.scheduled_deadlines
.keys()
.next()
.copied();
let sleep_fut = if let Some(earliest_deadline) = earliest_deadline {
let now = crate::time::utc_now().timestamp_millis() as u64;
if let Some(delta) = earliest_deadline.checked_sub(now) {
tokio::time::sleep(std::time::Duration::from_millis(delta)).boxed()
} else {
futures::future::ready(()).boxed()
}
} else {
futures::future::pending().boxed()
};
let cron_to_schedule = tokio::select! {
_ = sleep_fut => None,
x = cron_schedule_rx.recv() => {
if x.is_none() {
return Ok(());
};
x
}
};
// Schedule next execution of the cron if needed.
if let Some((name, prev_success)) = cron_to_schedule {
let mut runtime_state = runtime_state.borrow_mut();
if let Some(cron) = runtime_state.crons.get_mut(&name) {
let backoff_schedule = cron.backoff_schedule();
let next_deadline = if !prev_success
&& cron.current_execution_retries < backoff_schedule.len() as u32
{
let backoff_ms =
backoff_schedule[cron.current_execution_retries as usize];
let now = crate::time::utc_now().timestamp_millis() as u64;
cron.current_execution_retries += 1;
now + backoff_ms as u64
} else {
let next_ts = compute_next_deadline(&cron.spec.cron_schedule)?;
cron.current_execution_retries = 0;
next_ts
};
runtime_state
.scheduled_deadlines
.entry(next_deadline)
.or_default()
.push(name.to_string());
}
}
// Dispatch ready to execute crons.
let crons_to_execute = {
let mut runtime_state = runtime_state.borrow_mut();
runtime_state.get_ready_crons()?
};
for (_, tx) in crons_to_execute {
if let Some(tx) = tx.upgrade() {
let _ = tx.send(()).await;
}
}
}
}
}
impl RuntimeState {
fn get_ready_crons(
&mut self,
) -> Result<Vec<(String, WeakSender<()>)>, AnyError> {
let now = crate::time::utc_now().timestamp_millis() as u64;
let ready = {
let to_remove = self
.scheduled_deadlines
.range(..=now)
.map(|(ts, _)| *ts)
.collect::<Vec<_>>();
to_remove
.iter()
.flat_map(|ts| {
self
.scheduled_deadlines
.remove(ts)
.unwrap()
.iter()
.map(move |name| (*ts, name.clone()))
.collect::<Vec<_>>()
})
.map(|(_, name)| {
(name.clone(), self.crons.get(&name).unwrap().next_tx.clone())
})
.collect::<Vec<_>>()
};
Ok(ready)
}
}
#[async_trait(?Send)]
impl CronHandler for LocalCronHandler {
type EH = CronExecutionHandle;
fn create(&self, spec: CronSpec) -> Result<Self::EH, AnyError> {
// Ensure that the cron loop is started.
self.cron_loop_join_handle.get_or_init(|| {
let (cron_schedule_tx, cron_schedule_rx) =
mpsc::channel::<(String, bool)>(1);
self.cron_schedule_tx.set(cron_schedule_tx).unwrap();
let runtime_state = self.runtime_state.clone();
spawn(async move {
LocalCronHandler::cron_loop(runtime_state, cron_schedule_rx)
.await
.unwrap();
})
});
let mut runtime_state = self.runtime_state.borrow_mut();
if runtime_state.crons.len() > MAX_CRONS {
return Err(type_error("Too many crons"));
}
if runtime_state.crons.contains_key(&spec.name) {
return Err(type_error("Cron with this name already exists"));
}
// Validate schedule expression.
spec
.cron_schedule
.parse::<saffron::Cron>()
.map_err(|_| type_error("Invalid cron schedule"))?;
// Validate backoff_schedule.
if let Some(backoff_schedule) = &spec.backoff_schedule {
validate_backoff_schedule(backoff_schedule)?;
}
let (next_tx, next_rx) = mpsc::channel::<()>(1);
let cron = Cron {
spec: spec.clone(),
next_tx: next_tx.downgrade(),
current_execution_retries: 0,
};
runtime_state.crons.insert(spec.name.clone(), cron);
Ok(CronExecutionHandle {
name: spec.name.clone(),
cron_schedule_tx: self.cron_schedule_tx.get().unwrap().clone(),
concurrency_limiter: self.concurrency_limiter.clone(),
runtime_state: Rc::downgrade(&self.runtime_state),
inner: RefCell::new(Inner {
next_rx: Some(next_rx),
shutdown_tx: Some(next_tx),
permit: None,
}),
})
}
}
pub struct CronExecutionHandle {
name: String,
runtime_state: Weak<RefCell<RuntimeState>>,
cron_schedule_tx: mpsc::Sender<(String, bool)>,
concurrency_limiter: Arc<Semaphore>,
inner: RefCell<Inner>,
}
struct Inner {
next_rx: Option<mpsc::Receiver<()>>,
shutdown_tx: Option<mpsc::Sender<()>>,
permit: Option<OwnedSemaphorePermit>,
}
#[async_trait(?Send)]
impl CronHandle for CronExecutionHandle {
async fn next(&self, prev_success: bool) -> Result<bool, AnyError> {
self.inner.borrow_mut().permit.take();
if self
.cron_schedule_tx
.send((self.name.clone(), prev_success))
.await
.is_err()
{
return Ok(false);
};
let Some(mut next_rx) = self.inner.borrow_mut().next_rx.take() else {
return Ok(false);
};
if next_rx.recv().await.is_none() {
return Ok(false);
};
let permit = self.concurrency_limiter.clone().acquire_owned().await?;
let mut inner = self.inner.borrow_mut();
inner.next_rx = Some(next_rx);
inner.permit = Some(permit);
Ok(true)
}
fn close(&self) {
if let Some(tx) = self.inner.borrow_mut().shutdown_tx.take() {
drop(tx)
}
if let Some(runtime_state) = self.runtime_state.upgrade() {
let mut runtime_state = runtime_state.borrow_mut();
runtime_state.crons.remove(&self.name);
}
}
}
fn compute_next_deadline(cron_expression: &str) -> Result<u64, AnyError> {
let now = crate::time::utc_now();
if let Ok(test_schedule) = env::var("DENO_CRON_TEST_SCHEDULE_OFFSET") {
if let Ok(offset) = test_schedule.parse::<u64>() {
return Ok(now.timestamp_millis() as u64 + offset);
}
}
let cron = cron_expression
.parse::<saffron::Cron>()
.map_err(|_| anyhow::anyhow!("invalid cron expression"))?;
let Some(next_deadline) = cron.next_after(now) else {
return Err(anyhow::anyhow!("invalid cron expression"));
};
Ok(next_deadline.timestamp_millis() as u64)
}
fn validate_backoff_schedule(
backoff_schedule: &Vec<u32>,
) -> Result<(), AnyError> {
if backoff_schedule.len() > MAX_BACKOFF_COUNT {
return Err(type_error("Invalid backoff schedule"));
}
if backoff_schedule.iter().any(|s| *s > MAX_BACKOFF_MS) {
return Err(type_error("Invalid backoff schedule"));
}
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_compute_next_deadline() {
let now = crate::time::utc_now().timestamp_millis() as u64;
assert!(compute_next_deadline("*/1 * * * *").unwrap() > now);
assert!(compute_next_deadline("* * * * *").unwrap() > now);
assert!(compute_next_deadline("bogus").is_err());
assert!(compute_next_deadline("* * * * * *").is_err());
assert!(compute_next_deadline("* * *").is_err());
}
}

19
ext/cron/time.rs Normal file
View file

@ -0,0 +1,19 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
/// Identical to chrono::Utc::now() but without the system "clock"
/// feature flag.
///
/// The "clock" feature flag pulls in the "iana-time-zone" crate
/// which links to macOS's "CoreFoundation" framework which increases
/// startup time for the CLI.
pub fn utc_now() -> chrono::DateTime<chrono::Utc> {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.expect("system time before Unix epoch");
let naive = chrono::NaiveDateTime::from_timestamp_opt(
now.as_secs() as i64,
now.subsec_nanos(),
)
.unwrap();
chrono::DateTime::from_naive_utc_and_offset(naive, chrono::Utc)
}

View file

@ -44,6 +44,7 @@ deno_broadcast_channel.workspace = true
deno_cache.workspace = true
deno_console.workspace = true
deno_core.workspace = true
deno_cron.workspace = true
deno_crypto.workspace = true
deno_fetch.workspace = true
deno_ffi.workspace = true
@ -71,6 +72,7 @@ deno_broadcast_channel.workspace = true
deno_cache.workspace = true
deno_console.workspace = true
deno_core.workspace = true
deno_cron.workspace = true
deno_crypto.workspace = true
deno_fetch.workspace = true
deno_ffi.workspace = true

View file

@ -223,6 +223,9 @@ mod startup_snapshot {
deno_kv::deno_kv::init_ops_and_esm(deno_kv::sqlite::SqliteDbHandler::<
Permissions,
>::new(None, None)),
deno_cron::deno_cron::init_ops_and_esm(
deno_cron::local::LocalCronHandler::new(),
),
deno_napi::deno_napi::init_ops_and_esm::<Permissions>(),
deno_http::deno_http::init_ops_and_esm::<DefaultHttpPropertyExtractor>(),
deno_io::deno_io::init_ops_and_esm(Default::default()),

View file

@ -24,6 +24,7 @@ import * as tty from "ext:runtime/40_tty.js";
// TODO(bartlomieju): this is funky we have two `http` imports
import * as httpRuntime from "ext:runtime/40_http.js";
import * as kv from "ext:deno_kv/01_db.ts";
import * as cron from "ext:deno_cron/01_cron.ts";
const denoNs = {
metrics: core.metrics,
@ -179,6 +180,7 @@ const denoNsUnstable = {
Kv: kv.Kv,
KvU64: kv.KvU64,
KvListIterator: kv.KvListIterator,
cron: cron.cron,
};
export { denoNs, denoNsUnstable };

View file

@ -4,6 +4,7 @@ pub use deno_broadcast_channel;
pub use deno_cache;
pub use deno_console;
pub use deno_core;
pub use deno_cron;
pub use deno_crypto;
pub use deno_fetch;
pub use deno_ffi;

View file

@ -36,6 +36,7 @@ use deno_core::RuntimeOptions;
use deno_core::SharedArrayBufferStore;
use deno_core::Snapshot;
use deno_core::SourceMapGetter;
use deno_cron::local::LocalCronHandler;
use deno_fs::FileSystem;
use deno_http::DefaultHttpPropertyExtractor;
use deno_io::Stdio;
@ -450,6 +451,7 @@ impl WebWorker {
},
),
),
deno_cron::deno_cron::init_ops_and_esm(LocalCronHandler::new()),
deno_napi::deno_napi::init_ops_and_esm::<PermissionsContainer>(),
deno_http::deno_http::init_ops_and_esm::<DefaultHttpPropertyExtractor>(),
deno_io::deno_io::init_ops_and_esm(Some(options.stdio)),

View file

@ -31,6 +31,7 @@ use deno_core::RuntimeOptions;
use deno_core::SharedArrayBufferStore;
use deno_core::Snapshot;
use deno_core::SourceMapGetter;
use deno_cron::local::LocalCronHandler;
use deno_fs::FileSystem;
use deno_http::DefaultHttpPropertyExtractor;
use deno_io::Stdio;
@ -273,6 +274,7 @@ impl MainWorker {
},
),
),
deno_cron::deno_cron::init_ops_and_esm(LocalCronHandler::new()),
deno_napi::deno_napi::init_ops_and_esm::<PermissionsContainer>(),
deno_http::deno_http::init_ops_and_esm::<DefaultHttpPropertyExtractor>(),
deno_io::deno_io::init_ops_and_esm(Some(options.stdio)),

View file

@ -3,6 +3,7 @@
"ext:deno_broadcast_channel/01_broadcast_channel.js": "../ext/broadcast_channel/01_broadcast_channel.js",
"ext:deno_cache/01_cache.js": "../ext/cache/01_cache.js",
"ext:deno_console/01_console.js": "../ext/console/01_console.js",
"ext:deno_cron/01_cron.ts": "../ext/cron/01_cron.ts",
"ext:deno_crypto/00_crypto.js": "../ext/crypto/00_crypto.js",
"ext:deno_fetch/20_headers.js": "../ext/fetch/20_headers.js",
"ext:deno_fetch/21_formdata.js": "../ext/fetch/21_formdata.js",