// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. use std::cell::OnceCell; use std::cell::RefCell; use std::collections::BTreeMap; use std::collections::HashMap; use std::env; use std::rc::Rc; use std::rc::Weak; use std::sync::Arc; use async_trait::async_trait; use deno_core::error::type_error; use deno_core::error::AnyError; use deno_core::futures; use deno_core::futures::FutureExt; use deno_unsync::spawn; use deno_unsync::JoinHandle; use tokio::sync::mpsc; use tokio::sync::mpsc::WeakSender; use tokio::sync::OwnedSemaphorePermit; use tokio::sync::Semaphore; use crate::CronHandle; use crate::CronHandler; use crate::CronSpec; const MAX_CRONS: usize = 100; const DISPATCH_CONCURRENCY_LIMIT: usize = 50; const MAX_BACKOFF_MS: u32 = 60 * 60 * 1_000; // 1 hour const MAX_BACKOFF_COUNT: usize = 5; const DEFAULT_BACKOFF_SCHEDULE: [u32; 5] = [100, 1_000, 5_000, 30_000, 60_000]; pub struct LocalCronHandler { cron_schedule_tx: OnceCell>, concurrency_limiter: Arc, cron_loop_join_handle: OnceCell>, runtime_state: Rc>, } struct RuntimeState { crons: HashMap, scheduled_deadlines: BTreeMap>, } struct Cron { spec: CronSpec, next_tx: mpsc::WeakSender<()>, current_execution_retries: u32, } impl Cron { fn backoff_schedule(&self) -> &[u32] { self .spec .backoff_schedule .as_deref() .unwrap_or(&DEFAULT_BACKOFF_SCHEDULE) } } impl Default for LocalCronHandler { fn default() -> Self { Self::new() } } impl LocalCronHandler { pub fn new() -> Self { Self { cron_schedule_tx: OnceCell::new(), concurrency_limiter: Arc::new(Semaphore::new(DISPATCH_CONCURRENCY_LIMIT)), cron_loop_join_handle: OnceCell::new(), runtime_state: Rc::new(RefCell::new(RuntimeState { crons: HashMap::new(), scheduled_deadlines: BTreeMap::new(), })), } } async fn cron_loop( runtime_state: Rc>, mut cron_schedule_rx: mpsc::Receiver<(String, bool)>, ) -> Result<(), AnyError> { loop { let earliest_deadline = runtime_state .borrow() .scheduled_deadlines .keys() .next() .copied(); let sleep_fut = if let Some(earliest_deadline) = earliest_deadline { let now = crate::time::utc_now().timestamp_millis() as u64; if let Some(delta) = earliest_deadline.checked_sub(now) { tokio::time::sleep(std::time::Duration::from_millis(delta)).boxed() } else { futures::future::ready(()).boxed() } } else { futures::future::pending().boxed() }; let cron_to_schedule = tokio::select! { _ = sleep_fut => None, x = cron_schedule_rx.recv() => { if x.is_none() { return Ok(()); }; x } }; // Schedule next execution of the cron if needed. if let Some((name, prev_success)) = cron_to_schedule { let mut runtime_state = runtime_state.borrow_mut(); if let Some(cron) = runtime_state.crons.get_mut(&name) { let backoff_schedule = cron.backoff_schedule(); let next_deadline = if !prev_success && cron.current_execution_retries < backoff_schedule.len() as u32 { let backoff_ms = backoff_schedule[cron.current_execution_retries as usize]; let now = crate::time::utc_now().timestamp_millis() as u64; cron.current_execution_retries += 1; now + backoff_ms as u64 } else { let next_ts = compute_next_deadline(&cron.spec.cron_schedule)?; cron.current_execution_retries = 0; next_ts }; runtime_state .scheduled_deadlines .entry(next_deadline) .or_default() .push(name.to_string()); } } // Dispatch ready to execute crons. let crons_to_execute = { let mut runtime_state = runtime_state.borrow_mut(); runtime_state.get_ready_crons()? }; for (_, tx) in crons_to_execute { if let Some(tx) = tx.upgrade() { let _ = tx.send(()).await; } } } } } impl RuntimeState { fn get_ready_crons( &mut self, ) -> Result)>, AnyError> { let now = crate::time::utc_now().timestamp_millis() as u64; let ready = { let to_remove = self .scheduled_deadlines .range(..=now) .map(|(ts, _)| *ts) .collect::>(); to_remove .iter() .flat_map(|ts| { self .scheduled_deadlines .remove(ts) .unwrap() .iter() .map(move |name| (*ts, name.clone())) .collect::>() }) .filter_map(|(_, name)| { self .crons .get(&name) .map(|c| (name.clone(), c.next_tx.clone())) }) .collect::>() }; Ok(ready) } } #[async_trait(?Send)] impl CronHandler for LocalCronHandler { type EH = CronExecutionHandle; fn create(&self, spec: CronSpec) -> Result { // Ensure that the cron loop is started. self.cron_loop_join_handle.get_or_init(|| { let (cron_schedule_tx, cron_schedule_rx) = mpsc::channel::<(String, bool)>(1); self.cron_schedule_tx.set(cron_schedule_tx).unwrap(); let runtime_state = self.runtime_state.clone(); spawn(async move { LocalCronHandler::cron_loop(runtime_state, cron_schedule_rx) .await .unwrap(); }) }); let mut runtime_state = self.runtime_state.borrow_mut(); if runtime_state.crons.len() > MAX_CRONS { return Err(type_error("Too many crons")); } if runtime_state.crons.contains_key(&spec.name) { return Err(type_error("Cron with this name already exists")); } // Validate schedule expression. spec .cron_schedule .parse::() .map_err(|_| type_error("Invalid cron schedule"))?; // Validate backoff_schedule. if let Some(backoff_schedule) = &spec.backoff_schedule { validate_backoff_schedule(backoff_schedule)?; } let (next_tx, next_rx) = mpsc::channel::<()>(1); let cron = Cron { spec: spec.clone(), next_tx: next_tx.downgrade(), current_execution_retries: 0, }; runtime_state.crons.insert(spec.name.clone(), cron); Ok(CronExecutionHandle { name: spec.name.clone(), cron_schedule_tx: self.cron_schedule_tx.get().unwrap().clone(), concurrency_limiter: self.concurrency_limiter.clone(), runtime_state: Rc::downgrade(&self.runtime_state), inner: RefCell::new(Inner { next_rx: Some(next_rx), shutdown_tx: Some(next_tx), permit: None, }), }) } } pub struct CronExecutionHandle { name: String, runtime_state: Weak>, cron_schedule_tx: mpsc::Sender<(String, bool)>, concurrency_limiter: Arc, inner: RefCell, } struct Inner { next_rx: Option>, shutdown_tx: Option>, permit: Option, } #[async_trait(?Send)] impl CronHandle for CronExecutionHandle { async fn next(&self, prev_success: bool) -> Result { self.inner.borrow_mut().permit.take(); if self .cron_schedule_tx .send((self.name.clone(), prev_success)) .await .is_err() { return Ok(false); }; let Some(mut next_rx) = self.inner.borrow_mut().next_rx.take() else { return Ok(false); }; if next_rx.recv().await.is_none() { return Ok(false); }; let permit = self.concurrency_limiter.clone().acquire_owned().await?; let mut inner = self.inner.borrow_mut(); inner.next_rx = Some(next_rx); inner.permit = Some(permit); Ok(true) } fn close(&self) { if let Some(tx) = self.inner.borrow_mut().shutdown_tx.take() { drop(tx) } if let Some(runtime_state) = self.runtime_state.upgrade() { let mut runtime_state = runtime_state.borrow_mut(); runtime_state.crons.remove(&self.name); } } } fn compute_next_deadline(cron_expression: &str) -> Result { let now = crate::time::utc_now(); if let Ok(test_schedule) = env::var("DENO_CRON_TEST_SCHEDULE_OFFSET") { if let Ok(offset) = test_schedule.parse::() { return Ok(now.timestamp_millis() as u64 + offset); } } let cron = cron_expression .parse::() .map_err(|_| anyhow::anyhow!("invalid cron expression"))?; let Some(next_deadline) = cron.next_after(now) else { return Err(anyhow::anyhow!("invalid cron expression")); }; Ok(next_deadline.timestamp_millis() as u64) } fn validate_backoff_schedule( backoff_schedule: &Vec, ) -> Result<(), AnyError> { if backoff_schedule.len() > MAX_BACKOFF_COUNT { return Err(type_error("Invalid backoff schedule")); } if backoff_schedule.iter().any(|s| *s > MAX_BACKOFF_MS) { return Err(type_error("Invalid backoff schedule")); } Ok(()) } #[cfg(test)] mod tests { use super::*; #[test] fn test_compute_next_deadline() { let now = crate::time::utc_now().timestamp_millis() as u64; assert!(compute_next_deadline("*/1 * * * *").unwrap() > now); assert!(compute_next_deadline("* * * * *").unwrap() > now); assert!(compute_next_deadline("bogus").is_err()); assert!(compute_next_deadline("* * * * * *").is_err()); assert!(compute_next_deadline("* * *").is_err()); } }