This commit is contained in:
JMARyA 2025-03-10 18:43:32 +01:00
parent 18c663fcdb
commit 46cb21dc2a
Signed by: jmarya
GPG key ID: 901B2ADDF27C2263
8 changed files with 575 additions and 6 deletions

195
src/cron.rs Normal file
View file

@ -0,0 +1,195 @@
use std::{
sync::{Arc, RwLock},
thread::JoinHandle,
time::Duration,
u64,
};
use chrono::Utc;
use rand::Rng;
pub enum Schedule {
Every(Duration),
At(chrono::DateTime<Utc>),
}
pub struct CronTask {
f: Arc<Box<dyn Fn() + Send + Sync + 'static>>,
schedule: Schedule,
name: String,
last_run: Option<chrono::DateTime<Utc>>,
}
impl CronTask {
pub fn new<F: Fn() + Send + Sync + 'static>(name: &str, schedule: Schedule, f: F) -> Self {
Self {
f: Arc::new(Box::new(f)),
schedule,
name: name.to_string(),
last_run: None,
}
}
pub fn is_absolute(&self) -> bool {
match self.schedule {
Schedule::Every(_) => false,
Schedule::At(_) => true,
}
}
pub fn run(&mut self) -> JoinHandle<()> {
log::info!("Starting cron task '{}'", self.name);
self.last_run = Some(Utc::now());
let f = Arc::clone(&self.f);
std::thread::spawn(move || {
f.as_ref()();
})
}
pub fn wait_until(&mut self) -> Duration {
match self.schedule {
Schedule::Every(duration) => {
let now = Utc::now();
if let Some(last_exec) = self.last_run {
let since_then = (now - last_exec).to_std().unwrap();
duration.checked_sub(since_then).unwrap_or(Duration::ZERO)
} else {
self.last_run = Some(Utc::now());
duration
}
}
Schedule::At(date_time) => {
if self.last_run.is_none() {
let now = Utc::now();
if let Ok(dur) = date_time.signed_duration_since(&now).to_std() {
dur
} else {
Duration::ZERO
}
} else {
Duration::from_secs(u64::MAX)
}
}
}
}
}
pub struct Cron {
tasks: RwLock<Vec<CronTask>>,
}
impl Cron {
pub fn new() -> Self {
Self {
tasks: RwLock::new(Vec::new()),
}
}
pub fn add_task<F: Fn() + Send + Sync + 'static>(&self, name: &str, schedule: Schedule, f: F) {
self.tasks
.write()
.unwrap()
.push(CronTask::new(name, schedule, f));
}
pub fn run_at<F: Fn() + Send + Sync + 'static>(&self, dt: chrono::DateTime<chrono::Utc>, f: F) {
let name = format!("delayed_{}", rand::rng().random_range(1000..9999));
self.tasks
.write()
.unwrap()
.push(CronTask::new(&name, Schedule::At(dt), f));
}
pub fn run(&self) {
loop {
// init
let mut last_wait = Duration::from_secs(u64::MAX);
let mut last_task: Option<usize> = None;
{
// find next task
let mut tasks = self.tasks.write().unwrap();
for (i, task) in tasks.iter_mut().enumerate() {
let wait_time = task.wait_until();
if wait_time < last_wait {
last_wait = wait_time;
last_task = Some(i);
}
}
}
if let Some(index) = last_task {
// init
let mut remove = false;
let mut skip = false;
// limit longest blocking time (5s)
let real_wait = if last_wait.gt(&Duration::from_secs(5)) {
skip = true;
Duration::from_secs(5)
} else {
last_wait
};
{
// logging
let tasks = self.tasks.read().unwrap();
log::debug!("Managing {} cron task(s)", tasks.len());
let task = tasks.get(index).unwrap();
if real_wait == last_wait {
log::debug!(
"Waiting for {real_wait:?} to start cron task '{}'",
task.name
);
} else {
log::debug!(
"Would wait for {last_wait:?} to start cron task '{}'. Waiting for {real_wait:?}",
task.name
);
}
// if somehow we wait indefinitely
if last_wait == Duration::from_secs(u64::MAX) {
log::warn!("Infinite wait time for cron");
continue;
}
// set remove flag for absolute time cron tasks
if task.is_absolute() {
log::info!(
"Removing task '{}' from cron because it will never run again",
task.name
);
remove = true;
}
}
// sleep until task
std::thread::sleep(real_wait);
// skip if we are still just sleeping
if skip {
continue;
}
{
// run cron task
let mut tasks = self.tasks.write().unwrap();
let task = tasks.get_mut(index).unwrap();
let _ = task.run();
}
if remove {
{
// remove if requested
let mut tasks = self.tasks.write().unwrap();
log::info!("Removing cron task #{index}");
tasks.remove(index);
}
}
}
}
}
}

View file

@ -1,16 +1,22 @@
use std::mem::take;
pub struct Defer {
f: Box<dyn Fn()>,
f: Option<Box<dyn FnOnce()>>,
}
impl Defer {
pub fn new<F: Fn() + 'static>(f: F) -> Self {
Self { f: Box::new(f) }
pub fn new<F: FnOnce() + 'static>(f: F) -> Self {
Self {
f: Some(Box::new(f)),
}
}
}
impl Drop for Defer {
fn drop(&mut self) {
log::debug!("Calling defer function");
self.f.as_ref()();
if let Some(f) = take(&mut self.f) {
f();
}
}
}

View file

@ -1,5 +1,11 @@
use std::{sync::mpsc, thread, time::Instant};
#![feature(fn_traits)]
use std::{
sync::mpsc,
thread,
time::{Duration, Instant},
};
pub mod cron;
mod defer;
pub mod iterated;
pub mod job;
@ -69,3 +75,71 @@ pub fn retry<O, F: Fn() -> Option<O>>(f: F) -> O {
}
}
}
/// Run a background task.
///
/// This spawns a seperate thread for a background process.
/// The background task is guaranteed to finish within its defined scope.
/// If the end of the scope is reached while the thread is still running it will block.
///
/// # Example
/// ```ignore
/// use comrade::background;
///
/// fn do_work() {
/// println!("doing work...");
///
/// // spawn background thread
/// background!(|| {
/// println!("doing something in the background");
/// std::thread::sleep(std::time::Duration::from_secs(3));
/// });
///
/// println!("doing something else...");
///
/// // end of scope
/// // the code will block until all background processes defined here are done.
/// }
///
/// fn main() {
/// do_work();
/// println!("finished with work");
/// }
/// ```
#[macro_export]
macro_rules! background {
($f:expr) => {
let handle = std::thread::spawn(move || $f());
comrade::defer!(|| {
handle.join().unwrap();
});
};
}
/// Start running a function after `duration`.
pub fn delay<F: Fn() + Send + 'static>(duration: std::time::Duration, f: F) {
let _ = std::thread::spawn(move || {
log::info!("Will start running in {duration:?}");
std::thread::sleep(duration);
f();
});
}
/// Run `f(&T) -> X` for every item in `items`
pub fn parallel<T: Send + Sync + 'static, F, X: Send + 'static>(items: Vec<T>, f: F) -> Vec<X>
where
F: Fn(&T) -> X + Send + Sync + Copy + 'static,
{
let threads: Vec<_> = items
.into_iter()
.map(|x| std::thread::spawn(move || f(&x)))
.collect();
threads.into_iter().map(|x| x.join().unwrap()).collect()
}
pub fn datetime_in(d: Duration) -> chrono::DateTime<chrono::Utc> {
chrono::Utc::now()
.checked_add_signed(chrono::TimeDelta::from_std(d).unwrap())
.unwrap()
}

View file

@ -8,6 +8,8 @@ use std::{
time::Duration,
};
use crate::cron::Cron;
/// Status receiver of a dead man switch
pub struct DeadManReceiver {
rx: Receiver<bool>,
@ -83,6 +85,16 @@ impl ServiceManager {
self
}
pub fn register_cron(self, cron: Arc<Cron>) -> (Self, Arc<Cron>) {
let cron_ret = Arc::clone(&cron);
(
self.register("cron", move |_| {
cron.run();
}),
cron_ret,
)
}
/// Register a new background service
pub fn register<T: Fn(DeadManSwitch) -> () + 'static + Send + Sync>(
mut self,