mirror of
https://github.com/denoland/deno.git
synced 2024-12-11 10:07:54 -05:00
fa1ba256d2
This PR removes the use of the custom `utc_now` function in favor of the `chrono` implementation. It resolves #22864. --------- Co-authored-by: Bartek Iwańczuk <biwanczuk@gmail.com>
344 lines
9.5 KiB
Rust
344 lines
9.5 KiB
Rust
// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.
|
|
|
|
use std::cell::OnceCell;
|
|
use std::cell::RefCell;
|
|
use std::collections::BTreeMap;
|
|
use std::collections::HashMap;
|
|
use std::env;
|
|
use std::rc::Rc;
|
|
use std::rc::Weak;
|
|
use std::sync::Arc;
|
|
|
|
use async_trait::async_trait;
|
|
use deno_core::error::type_error;
|
|
use deno_core::error::AnyError;
|
|
use deno_core::futures;
|
|
use deno_core::futures::FutureExt;
|
|
use deno_core::unsync::spawn;
|
|
use deno_core::unsync::JoinHandle;
|
|
use tokio::sync::mpsc;
|
|
use tokio::sync::mpsc::WeakSender;
|
|
use tokio::sync::OwnedSemaphorePermit;
|
|
use tokio::sync::Semaphore;
|
|
|
|
use crate::CronHandle;
|
|
use crate::CronHandler;
|
|
use crate::CronSpec;
|
|
|
|
const MAX_CRONS: usize = 100;
|
|
const DISPATCH_CONCURRENCY_LIMIT: usize = 50;
|
|
const MAX_BACKOFF_MS: u32 = 60 * 60 * 1_000; // 1 hour
|
|
const MAX_BACKOFF_COUNT: usize = 5;
|
|
const DEFAULT_BACKOFF_SCHEDULE: [u32; 5] = [100, 1_000, 5_000, 30_000, 60_000];
|
|
|
|
pub struct LocalCronHandler {
|
|
cron_schedule_tx: OnceCell<mpsc::Sender<(String, bool)>>,
|
|
concurrency_limiter: Arc<Semaphore>,
|
|
cron_loop_join_handle: OnceCell<JoinHandle<()>>,
|
|
runtime_state: Rc<RefCell<RuntimeState>>,
|
|
}
|
|
|
|
struct RuntimeState {
|
|
crons: HashMap<String, Cron>,
|
|
scheduled_deadlines: BTreeMap<u64, Vec<String>>,
|
|
}
|
|
|
|
struct Cron {
|
|
spec: CronSpec,
|
|
next_tx: mpsc::WeakSender<()>,
|
|
current_execution_retries: u32,
|
|
}
|
|
|
|
impl Cron {
|
|
fn backoff_schedule(&self) -> &[u32] {
|
|
self
|
|
.spec
|
|
.backoff_schedule
|
|
.as_deref()
|
|
.unwrap_or(&DEFAULT_BACKOFF_SCHEDULE)
|
|
}
|
|
}
|
|
|
|
impl Default for LocalCronHandler {
|
|
fn default() -> Self {
|
|
Self::new()
|
|
}
|
|
}
|
|
|
|
impl LocalCronHandler {
|
|
pub fn new() -> Self {
|
|
Self {
|
|
cron_schedule_tx: OnceCell::new(),
|
|
concurrency_limiter: Arc::new(Semaphore::new(DISPATCH_CONCURRENCY_LIMIT)),
|
|
cron_loop_join_handle: OnceCell::new(),
|
|
runtime_state: Rc::new(RefCell::new(RuntimeState {
|
|
crons: HashMap::new(),
|
|
scheduled_deadlines: BTreeMap::new(),
|
|
})),
|
|
}
|
|
}
|
|
|
|
async fn cron_loop(
|
|
runtime_state: Rc<RefCell<RuntimeState>>,
|
|
mut cron_schedule_rx: mpsc::Receiver<(String, bool)>,
|
|
) -> Result<(), AnyError> {
|
|
loop {
|
|
let earliest_deadline = runtime_state
|
|
.borrow()
|
|
.scheduled_deadlines
|
|
.keys()
|
|
.next()
|
|
.copied();
|
|
|
|
let sleep_fut = if let Some(earliest_deadline) = earliest_deadline {
|
|
let now = chrono::Utc::now().timestamp_millis() as u64;
|
|
if let Some(delta) = earliest_deadline.checked_sub(now) {
|
|
tokio::time::sleep(std::time::Duration::from_millis(delta)).boxed()
|
|
} else {
|
|
futures::future::ready(()).boxed()
|
|
}
|
|
} else {
|
|
futures::future::pending().boxed()
|
|
};
|
|
|
|
let cron_to_schedule = tokio::select! {
|
|
_ = sleep_fut => None,
|
|
x = cron_schedule_rx.recv() => {
|
|
if x.is_none() {
|
|
return Ok(());
|
|
};
|
|
x
|
|
}
|
|
};
|
|
|
|
// Schedule next execution of the cron if needed.
|
|
if let Some((name, prev_success)) = cron_to_schedule {
|
|
let mut runtime_state = runtime_state.borrow_mut();
|
|
if let Some(cron) = runtime_state.crons.get_mut(&name) {
|
|
let backoff_schedule = cron.backoff_schedule();
|
|
let next_deadline = if !prev_success
|
|
&& cron.current_execution_retries < backoff_schedule.len() as u32
|
|
{
|
|
let backoff_ms =
|
|
backoff_schedule[cron.current_execution_retries as usize];
|
|
let now = chrono::Utc::now().timestamp_millis() as u64;
|
|
cron.current_execution_retries += 1;
|
|
now + backoff_ms as u64
|
|
} else {
|
|
let next_ts = compute_next_deadline(&cron.spec.cron_schedule)?;
|
|
cron.current_execution_retries = 0;
|
|
next_ts
|
|
};
|
|
runtime_state
|
|
.scheduled_deadlines
|
|
.entry(next_deadline)
|
|
.or_default()
|
|
.push(name.to_string());
|
|
}
|
|
}
|
|
|
|
// Dispatch ready to execute crons.
|
|
let crons_to_execute = {
|
|
let mut runtime_state = runtime_state.borrow_mut();
|
|
runtime_state.get_ready_crons()?
|
|
};
|
|
for (_, tx) in crons_to_execute {
|
|
if let Some(tx) = tx.upgrade() {
|
|
let _ = tx.send(()).await;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
impl RuntimeState {
|
|
fn get_ready_crons(
|
|
&mut self,
|
|
) -> Result<Vec<(String, WeakSender<()>)>, AnyError> {
|
|
let now = chrono::Utc::now().timestamp_millis() as u64;
|
|
|
|
let ready = {
|
|
let to_remove = self
|
|
.scheduled_deadlines
|
|
.range(..=now)
|
|
.map(|(ts, _)| *ts)
|
|
.collect::<Vec<_>>();
|
|
to_remove
|
|
.iter()
|
|
.flat_map(|ts| {
|
|
self
|
|
.scheduled_deadlines
|
|
.remove(ts)
|
|
.unwrap()
|
|
.iter()
|
|
.map(move |name| (*ts, name.clone()))
|
|
.collect::<Vec<_>>()
|
|
})
|
|
.filter_map(|(_, name)| {
|
|
self
|
|
.crons
|
|
.get(&name)
|
|
.map(|c| (name.clone(), c.next_tx.clone()))
|
|
})
|
|
.collect::<Vec<_>>()
|
|
};
|
|
|
|
Ok(ready)
|
|
}
|
|
}
|
|
|
|
#[async_trait(?Send)]
|
|
impl CronHandler for LocalCronHandler {
|
|
type EH = CronExecutionHandle;
|
|
|
|
fn create(&self, spec: CronSpec) -> Result<Self::EH, AnyError> {
|
|
// Ensure that the cron loop is started.
|
|
self.cron_loop_join_handle.get_or_init(|| {
|
|
let (cron_schedule_tx, cron_schedule_rx) =
|
|
mpsc::channel::<(String, bool)>(1);
|
|
self.cron_schedule_tx.set(cron_schedule_tx).unwrap();
|
|
let runtime_state = self.runtime_state.clone();
|
|
spawn(async move {
|
|
LocalCronHandler::cron_loop(runtime_state, cron_schedule_rx)
|
|
.await
|
|
.unwrap();
|
|
})
|
|
});
|
|
|
|
let mut runtime_state = self.runtime_state.borrow_mut();
|
|
|
|
if runtime_state.crons.len() > MAX_CRONS {
|
|
return Err(type_error("Too many crons"));
|
|
}
|
|
if runtime_state.crons.contains_key(&spec.name) {
|
|
return Err(type_error("Cron with this name already exists"));
|
|
}
|
|
|
|
// Validate schedule expression.
|
|
spec
|
|
.cron_schedule
|
|
.parse::<saffron::Cron>()
|
|
.map_err(|_| type_error("Invalid cron schedule"))?;
|
|
|
|
// Validate backoff_schedule.
|
|
if let Some(backoff_schedule) = &spec.backoff_schedule {
|
|
validate_backoff_schedule(backoff_schedule)?;
|
|
}
|
|
|
|
let (next_tx, next_rx) = mpsc::channel::<()>(1);
|
|
let cron = Cron {
|
|
spec: spec.clone(),
|
|
next_tx: next_tx.downgrade(),
|
|
current_execution_retries: 0,
|
|
};
|
|
runtime_state.crons.insert(spec.name.clone(), cron);
|
|
|
|
Ok(CronExecutionHandle {
|
|
name: spec.name.clone(),
|
|
cron_schedule_tx: self.cron_schedule_tx.get().unwrap().clone(),
|
|
concurrency_limiter: self.concurrency_limiter.clone(),
|
|
runtime_state: Rc::downgrade(&self.runtime_state),
|
|
inner: RefCell::new(Inner {
|
|
next_rx: Some(next_rx),
|
|
shutdown_tx: Some(next_tx),
|
|
permit: None,
|
|
}),
|
|
})
|
|
}
|
|
}
|
|
|
|
pub struct CronExecutionHandle {
|
|
name: String,
|
|
runtime_state: Weak<RefCell<RuntimeState>>,
|
|
cron_schedule_tx: mpsc::Sender<(String, bool)>,
|
|
concurrency_limiter: Arc<Semaphore>,
|
|
inner: RefCell<Inner>,
|
|
}
|
|
|
|
struct Inner {
|
|
next_rx: Option<mpsc::Receiver<()>>,
|
|
shutdown_tx: Option<mpsc::Sender<()>>,
|
|
permit: Option<OwnedSemaphorePermit>,
|
|
}
|
|
|
|
#[async_trait(?Send)]
|
|
impl CronHandle for CronExecutionHandle {
|
|
async fn next(&self, prev_success: bool) -> Result<bool, AnyError> {
|
|
self.inner.borrow_mut().permit.take();
|
|
|
|
if self
|
|
.cron_schedule_tx
|
|
.send((self.name.clone(), prev_success))
|
|
.await
|
|
.is_err()
|
|
{
|
|
return Ok(false);
|
|
};
|
|
|
|
let Some(mut next_rx) = self.inner.borrow_mut().next_rx.take() else {
|
|
return Ok(false);
|
|
};
|
|
if next_rx.recv().await.is_none() {
|
|
return Ok(false);
|
|
};
|
|
|
|
let permit = self.concurrency_limiter.clone().acquire_owned().await?;
|
|
let mut inner = self.inner.borrow_mut();
|
|
inner.next_rx = Some(next_rx);
|
|
inner.permit = Some(permit);
|
|
Ok(true)
|
|
}
|
|
|
|
fn close(&self) {
|
|
if let Some(tx) = self.inner.borrow_mut().shutdown_tx.take() {
|
|
drop(tx)
|
|
}
|
|
if let Some(runtime_state) = self.runtime_state.upgrade() {
|
|
let mut runtime_state = runtime_state.borrow_mut();
|
|
runtime_state.crons.remove(&self.name);
|
|
}
|
|
}
|
|
}
|
|
|
|
fn compute_next_deadline(cron_expression: &str) -> Result<u64, AnyError> {
|
|
let now = chrono::Utc::now();
|
|
|
|
if let Ok(test_schedule) = env::var("DENO_CRON_TEST_SCHEDULE_OFFSET") {
|
|
if let Ok(offset) = test_schedule.parse::<u64>() {
|
|
return Ok(now.timestamp_millis() as u64 + offset);
|
|
}
|
|
}
|
|
|
|
let cron = cron_expression
|
|
.parse::<saffron::Cron>()
|
|
.map_err(|_| anyhow::anyhow!("invalid cron expression"))?;
|
|
let Some(next_deadline) = cron.next_after(now) else {
|
|
return Err(anyhow::anyhow!("invalid cron expression"));
|
|
};
|
|
Ok(next_deadline.timestamp_millis() as u64)
|
|
}
|
|
|
|
fn validate_backoff_schedule(backoff_schedule: &[u32]) -> Result<(), AnyError> {
|
|
if backoff_schedule.len() > MAX_BACKOFF_COUNT {
|
|
return Err(type_error("Invalid backoff schedule"));
|
|
}
|
|
if backoff_schedule.iter().any(|s| *s > MAX_BACKOFF_MS) {
|
|
return Err(type_error("Invalid backoff schedule"));
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
#[cfg(test)]
|
|
mod tests {
|
|
use super::*;
|
|
|
|
#[test]
|
|
fn test_compute_next_deadline() {
|
|
let now = chrono::Utc::now().timestamp_millis() as u64;
|
|
assert!(compute_next_deadline("*/1 * * * *").unwrap() > now);
|
|
assert!(compute_next_deadline("* * * * *").unwrap() > now);
|
|
assert!(compute_next_deadline("bogus").is_err());
|
|
assert!(compute_next_deadline("* * * * * *").is_err());
|
|
assert!(compute_next_deadline("* * *").is_err());
|
|
}
|
|
}
|