198 lines
5.1 KiB
Rust
198 lines
5.1 KiB
Rust
#![feature(fn_traits)]
|
|
use std::{
|
|
sync::mpsc,
|
|
thread,
|
|
time::{Duration, Instant},
|
|
};
|
|
|
|
pub mod cron;
|
|
mod defer;
|
|
pub mod iterated;
|
|
pub mod job;
|
|
pub mod service;
|
|
pub use defer::Defer;
|
|
|
|
pub use comrade_macro::{defer, worker};
|
|
pub use crossbeam;
|
|
use dashmap::DashMap;
|
|
use once_cell::sync::Lazy;
|
|
pub use serde_json;
|
|
|
|
// TODO : worker docs + refactor
|
|
|
|
pub static UNION: Lazy<
|
|
DashMap<&'static str, job::JobMultiplexer<serde_json::Value, serde_json::Value>>,
|
|
> = Lazy::new(DashMap::new);
|
|
|
|
/// Rally Function
|
|
///
|
|
/// This executes a thread for every item executing `f(&item) -> X`. Whatever function returns first is returned while every other thread is killed.
|
|
pub fn rally<T: Send + Sync + 'static, F, X: Send + 'static>(items: Vec<T>, f: F) -> (T, X)
|
|
where
|
|
F: Fn(&T) -> Option<X> + Send + Sync + Copy + 'static,
|
|
{
|
|
let item_len = items.len();
|
|
|
|
let (tx, rx) = mpsc::channel();
|
|
let items_len = items.len();
|
|
let mut handles = Vec::new();
|
|
|
|
for item in items {
|
|
let tx = tx.clone();
|
|
let item_ref = item;
|
|
let f = f;
|
|
|
|
let handle = thread::spawn(move || {
|
|
let start = Instant::now();
|
|
let result = f(&item_ref);
|
|
let elapsed = start.elapsed();
|
|
let _ = tx.send((item_ref, result, elapsed));
|
|
});
|
|
handles.push(handle);
|
|
}
|
|
|
|
drop(tx);
|
|
|
|
let mut count = 0;
|
|
|
|
while count < item_len {
|
|
let (fastest_item, fastest_result, elapsed) = rx.recv().unwrap();
|
|
count += 1;
|
|
|
|
if fastest_result.is_some() {
|
|
for handle in handles {
|
|
// todo : threads do not get killed here
|
|
handle.thread().unpark();
|
|
}
|
|
|
|
log::info!("Rally ended with {items_len} items in {elapsed:?}");
|
|
|
|
return (fastest_item, fastest_result.unwrap());
|
|
}
|
|
}
|
|
|
|
panic!("No useable results in rally")
|
|
}
|
|
|
|
pub fn retry<O, F: Fn() -> Option<O>>(f: F) -> O {
|
|
loop {
|
|
match f() {
|
|
Some(resp) => {
|
|
return resp;
|
|
}
|
|
None => {
|
|
log::info!("Got nothing, retrying...");
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
pub fn retry_max<O, F: Fn() -> Option<O>>(max: u64, f: F) -> Option<O> {
|
|
let mut counter = 0;
|
|
while counter < max {
|
|
match f() {
|
|
Some(resp) => {
|
|
return Some(resp);
|
|
}
|
|
None => {
|
|
log::info!("Got nothing, retrying {}/{}", counter + 1, max);
|
|
}
|
|
}
|
|
counter += 1;
|
|
}
|
|
|
|
None
|
|
}
|
|
|
|
/// Retries a function gracefully with exponential backoff.
|
|
///
|
|
/// - `f`: A function that returns an `Option<O>`, retried on `None`.
|
|
/// - Starts with `base_delay` and doubles it each attempt, capping at `max_delay`.
|
|
///
|
|
/// Returns `Some(O)` if successful, otherwise keeps retrying indefinitely.
|
|
pub fn retry_backoff<O, F: Fn() -> Option<O>>(
|
|
f: F,
|
|
base_delay: Duration,
|
|
max_delay: Duration,
|
|
) -> Option<O> {
|
|
let mut delay = base_delay;
|
|
|
|
loop {
|
|
match f() {
|
|
Some(resp) => return Some(resp),
|
|
None => {
|
|
log::info!("Got nothing, retrying in {:?}", delay);
|
|
thread::sleep(delay);
|
|
delay = (delay * 2).min(max_delay);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Run a background task.
|
|
///
|
|
/// This spawns a seperate thread for a background process.
|
|
/// The background task is guaranteed to finish within its defined scope.
|
|
/// If the end of the scope is reached while the thread is still running it will block.
|
|
///
|
|
/// # Example
|
|
/// ```ignore
|
|
/// use comrade::background;
|
|
///
|
|
/// fn do_work() {
|
|
/// println!("doing work...");
|
|
///
|
|
/// // spawn background thread
|
|
/// background!(|| {
|
|
/// println!("doing something in the background");
|
|
/// std::thread::sleep(std::time::Duration::from_secs(3));
|
|
/// });
|
|
///
|
|
/// println!("doing something else...");
|
|
///
|
|
/// // end of scope
|
|
/// // the code will block until all background processes defined here are done.
|
|
/// }
|
|
///
|
|
/// fn main() {
|
|
/// do_work();
|
|
/// println!("finished with work");
|
|
/// }
|
|
/// ```
|
|
#[macro_export]
|
|
macro_rules! background {
|
|
($f:expr) => {
|
|
let handle = std::thread::spawn(move || $f());
|
|
comrade::defer!(|| {
|
|
handle.join().unwrap();
|
|
});
|
|
};
|
|
}
|
|
|
|
/// Start running a function after `duration`.
|
|
pub fn delay<F: Fn() + Send + 'static>(duration: std::time::Duration, f: F) {
|
|
let _ = std::thread::spawn(move || {
|
|
log::info!("Will start running in {duration:?}");
|
|
std::thread::sleep(duration);
|
|
f();
|
|
});
|
|
}
|
|
|
|
/// Run `f(&T) -> X` for every item in `items`
|
|
pub fn parallel<T: Send + Sync + 'static, F, X: Send + 'static>(items: Vec<T>, f: F) -> Vec<X>
|
|
where
|
|
F: Fn(&T) -> X + Send + Sync + Copy + 'static,
|
|
{
|
|
let threads: Vec<_> = items
|
|
.into_iter()
|
|
.map(|x| std::thread::spawn(move || f(&x)))
|
|
.collect();
|
|
|
|
threads.into_iter().map(|x| x.join().unwrap()).collect()
|
|
}
|
|
|
|
pub fn datetime_in(d: Duration) -> chrono::DateTime<chrono::Utc> {
|
|
chrono::Utc::now()
|
|
.checked_add_signed(chrono::TimeDelta::from_std(d).unwrap())
|
|
.unwrap()
|
|
}
|