This commit is contained in:
JMARyA 2025-03-07 04:33:35 +01:00
commit 5cb4facc48
Signed by: jmarya
GPG key ID: 901B2ADDF27C2263
8 changed files with 684 additions and 0 deletions

87
src/job.rs Normal file
View file

@ -0,0 +1,87 @@
use std::sync::{Arc, Mutex, mpsc};
#[derive(Clone)]
/// A generic job dispatcher struct that allows sending jobs of type `T` and receiving results of type `V` using message passing.
pub struct JobDispatcher<T: Send + 'static, V: Send + 'static> {
sender: Arc<Mutex<mpsc::Sender<JobOrder<T, V>>>>,
}
impl<T: Send + 'static, V: Send + 'static> JobDispatcher<T, V> {
/// Creates a new instance of `JobDispatcher` and returns a tuple that contains it and a receiver end for `JobOrder`s.
/// # Example:
/// ```
/// use jobdispatcher::*;
/// // Create job dispatcher
/// let (dispatcher, recv) = JobDispatcher::<i32, i32>::new();
///
/// // Worker Thread
/// std::thread::spawn(move || {
/// for job in recv {
/// let result = job.param + 1;
/// job.done(result);
/// }
/// });
///
/// // Usage
/// let result = dispatcher.send(3);
/// assert_eq!(result, 4);
/// ```
#[must_use]
pub fn new() -> (Self, mpsc::Receiver<JobOrder<T, V>>) {
let (sender, receiver) = mpsc::channel();
(
Self {
sender: Arc::new(Mutex::new(sender)),
},
receiver,
)
}
/// Sends a job of type `T` to the job dispatcher and waits for its result of type `V`.
/// Returns the result of the job once it has been processed.
/// # Panics
/// This function panics when the `JobOrder` struct gets out of scope without returning a finished result.
/// Additionally if the internal `Mutex` is poisoned, this function will panic as well.
pub fn send(&self, param: T) -> V {
let (tx, rx) = mpsc::channel();
let job_order = JobOrder::new(param, move |ret| {
tx.send(ret).unwrap();
});
self.sender.lock().unwrap().send(job_order).unwrap();
rx.recv().unwrap()
}
/// Sends a job of type `T` to the job dispatcher and waits for its result of type `V`.
/// Returns `Some(V)` when the job returns an result, `None` if somehow nothing was returned or the internal `Mutex` is poisoned.
pub fn try_send(&self, param: T) -> Option<V> {
let (tx, rx) = mpsc::channel();
let job_order = JobOrder::new(param, move |ret| {
tx.send(ret).unwrap();
});
self.sender.lock().ok()?.send(job_order).ok()?;
rx.recv().ok()
}
}
/// A struct that represents a job order that encapsulates a job of type `T` and its result of type `V`, along with a callback function that will send the result back to the job origin.
pub struct JobOrder<T, V> {
/// The job parameter of type `T`.
pub param: T,
callback: Box<dyn FnOnce(V) + Send>,
}
impl<T, V> JobOrder<T, V> {
/// Creates a new `JobOrder` instance with the specified job parameter `param` of type `T` and a callback function that takes the job result of type `V` as an argument.
#[must_use]
fn new(param: T, callback: impl FnOnce(V) + Send + 'static) -> Self {
Self {
param,
callback: Box::new(callback),
}
}
/// Send the result of the `JobOrder` back to it's origin
pub fn done(self, val: V) {
(self.callback)(val);
}
}

80
src/lib.rs Normal file
View file

@ -0,0 +1,80 @@
use std::{sync::mpsc, thread, time::Instant};
pub mod job;
pub mod service;
/// Rally Function
///
/// This executes a thread for every item executing `f(&item) -> X`. Whatever function returns first is returned while every other thread is killed.
pub fn rally<T: Send + Sync + 'static, F, X: Send + 'static>(items: Vec<T>, f: F) -> (T, X)
where
F: Fn(&T) -> X + Send + Sync + Copy + 'static,
{
let (tx, rx) = mpsc::channel();
let items_len = items.len();
let mut handles = Vec::new();
for item in items {
let tx = tx.clone();
let item_ref = item;
let f = f;
let handle = thread::spawn(move || {
let start = Instant::now();
let result = f(&item_ref);
let elapsed = start.elapsed();
let _ = tx.send((item_ref, result, elapsed));
});
handles.push(handle);
}
drop(tx);
let (fastest_item, fastest_result, elapsed) = rx.recv().unwrap();
for handle in handles {
handle.thread().unpark();
}
log::info!("Rally ended with {items_len} items in {elapsed:?}");
(fastest_item, fastest_result)
}
// TODO : async version
/*
pub fn rally_async<T: Send + Sync + 'static, F, X: Send + 'static>(items: Vec<T>, f: F) -> (T, X)
where
F: AsyncFn(&T) -> X + Send + Sync + Copy + 'static,
{
let (tx, rx) = mpsc::channel();
let mut handles = Vec::new();
for item in items {
let tx = tx.clone();
let item_ref = item;
let f = f;
tokio::task::spawn()
let handle = thread::spawn(move || {
let start = Instant::now();
let result = f(&item_ref);
let elapsed = start.elapsed();
let _ = tx.send((item_ref, result, elapsed));
});
handles.push(handle);
}
drop(tx);
let (fastest_item, fastest_result, _) = rx.recv().unwrap();
for handle in handles {
handle.thread().unpark();
}
(fastest_item, fastest_result)
}
*/

136
src/service.rs Normal file
View file

@ -0,0 +1,136 @@
use std::{
collections::HashMap,
sync::{
Arc,
mpsc::{Receiver, RecvTimeoutError, Sender},
},
thread::JoinHandle,
time::Duration,
};
/// Status receiver of a dead man switch
pub struct DeadManReceiver {
rx: Receiver<bool>,
}
impl DeadManReceiver {
/// Checks if a dead man switch is alive
pub fn alive(&self) -> bool {
let res = self.rx.recv_timeout(Duration::from_millis(400));
if let Err(err) = res {
return match err {
RecvTimeoutError::Timeout => true,
RecvTimeoutError::Disconnected => false,
};
}
false
}
}
/// A Dead Man Switch
pub struct DeadManSwitch {
pub lifeline: Sender<bool>,
}
impl DeadManSwitch {
/// Create a new Dead Man Switch
///
/// The Dead Man Switch will notify the receiver and mark itself dead after it got dropped (for example in a panic)
pub fn new() -> (Self, DeadManReceiver) {
let (tx, rx) = std::sync::mpsc::channel();
(Self { lifeline: tx }, DeadManReceiver { rx })
}
}
impl Drop for DeadManSwitch {
fn drop(&mut self) {
log::info!("I am DEAD man 💀");
let _ = self.lifeline.send(true);
}
}
/// Service Manager
///
/// This will manage and keep background services running-
pub struct ServiceManager<T: Fn(DeadManSwitch) -> () + 'static> {
pub services: HashMap<String, Arc<T>>,
}
impl<T: Fn(DeadManSwitch) -> () + 'static + Send + Sync> ServiceManager<T> {
pub fn new() -> Self {
Self {
services: HashMap::new(),
}
}
/// Register a new background service
pub fn register(&mut self, service_name: &str, service: T) {
self.services
.insert(service_name.to_string(), Arc::new(service));
}
fn run(self) -> ! {
let mut threads = HashMap::new();
// Initial start
for (service, f) in self.services.iter() {
log::info!("Starting service {service}");
let (dms, dmr) = DeadManSwitch::new();
let f = Arc::clone(&f);
threads.insert(
service.clone(),
(
std::thread::spawn(move || {
f(dms);
}),
dmr,
),
);
}
// Monitor loop
loop {
let mut threads_new = HashMap::new();
for (service, (t, dm)) in &mut threads {
let alive = dm.alive();
log::debug!(
"Checking up on {service} -- {}",
if alive { "ALIVE" } else { "DEAD" }
);
if !alive {
log::warn!("Service {service} died. Restarting...");
while !t.is_finished() {
log::debug!("waiting for thread");
}
let f = Arc::clone(self.services.get(service).unwrap());
let (dms, dmr) = DeadManSwitch::new();
threads_new.insert(
service.clone(),
(
std::thread::spawn(move || {
f(dms);
}),
dmr,
),
);
}
}
threads.extend(threads_new);
}
}
/// Start the services
///
/// This returns a ThreadHandle for the service monitor thread.
pub fn spawn(self) -> JoinHandle<()> {
std::thread::spawn(|| self.run())
}
}