worker

This commit is contained in:
JMARyA 2025-03-07 20:04:58 +01:00
parent 5cb4facc48
commit e3393f1e09
Signed by: jmarya
GPG key ID: 901B2ADDF27C2263
11 changed files with 841 additions and 48 deletions

View file

@ -1,9 +1,10 @@
use std::sync::{Arc, Mutex, mpsc};
use crossbeam::channel::{Receiver, Sender};
use std::sync::mpsc;
#[derive(Clone)]
/// A generic job dispatcher struct that allows sending jobs of type `T` and receiving results of type `V` using message passing.
pub struct JobDispatcher<T: Send + 'static, V: Send + 'static> {
sender: Arc<Mutex<mpsc::Sender<JobOrder<T, V>>>>,
sender: Sender<JobOrder<T, V>>,
}
impl<T: Send + 'static, V: Send + 'static> JobDispatcher<T, V> {
@ -27,14 +28,10 @@ impl<T: Send + 'static, V: Send + 'static> JobDispatcher<T, V> {
/// assert_eq!(result, 4);
/// ```
#[must_use]
pub fn new() -> (Self, mpsc::Receiver<JobOrder<T, V>>) {
let (sender, receiver) = mpsc::channel();
(
Self {
sender: Arc::new(Mutex::new(sender)),
},
receiver,
)
pub fn new() -> (Self, Receiver<JobOrder<T, V>>) {
let (sender, receiver) = crossbeam::channel::bounded(12);
(Self { sender: sender }, receiver)
}
/// Sends a job of type `T` to the job dispatcher and waits for its result of type `V`.
@ -47,7 +44,7 @@ impl<T: Send + 'static, V: Send + 'static> JobDispatcher<T, V> {
let job_order = JobOrder::new(param, move |ret| {
tx.send(ret).unwrap();
});
self.sender.lock().unwrap().send(job_order).unwrap();
self.sender.send(job_order).unwrap();
rx.recv().unwrap()
}
@ -58,7 +55,7 @@ impl<T: Send + 'static, V: Send + 'static> JobDispatcher<T, V> {
let job_order = JobOrder::new(param, move |ret| {
tx.send(ret).unwrap();
});
self.sender.lock().ok()?.send(job_order).ok()?;
self.sender.send(job_order).ok()?;
rx.recv().ok()
}
}

View file

@ -2,6 +2,19 @@ use std::{sync::mpsc, thread, time::Instant};
pub mod job;
pub mod service;
pub use comrade_macro::worker;
use dashmap::DashMap;
use once_cell::sync::Lazy;
pub use serde_json;
// TODO : worker docs + refactor
// TODO : worker non blocking fn call
// TODO : worker parallelism (Load Balanced Queue + Multiple Threads)
// TODO : refactor dispatcher backends (memory, valkey)
pub static UNION: Lazy<
DashMap<&'static str, job::JobDispatcher<serde_json::Value, serde_json::Value>>,
> = Lazy::new(DashMap::new);
/// Rally Function
///

View file

@ -54,24 +54,47 @@ impl Drop for DeadManSwitch {
/// Service Manager
///
/// This will manage and keep background services running-
pub struct ServiceManager<T: Fn(DeadManSwitch) -> () + 'static> {
pub services: HashMap<String, Arc<T>>,
pub struct ServiceManager {
pub services: HashMap<String, Arc<Box<dyn Fn(DeadManSwitch) -> () + 'static + Send + Sync>>>,
pub mode: ServiceMode,
}
impl<T: Fn(DeadManSwitch) -> () + 'static + Send + Sync> ServiceManager<T> {
// TODO : impl decay mode
/// The mode on which services should operate
pub enum ServiceMode {
/// Behave like a daemon. Services can never die and will come back to life after beeing killed. They will always haunt you.
Daemon,
/// Graceful Decay. Services stay dead after beeing killed. If no services are running anymore the `ServiceManager` will commit suicide.
Decay,
}
impl ServiceManager {
pub fn new() -> Self {
Self {
services: HashMap::new(),
mode: ServiceMode::Daemon,
}
}
/// Register a new background service
pub fn register(&mut self, service_name: &str, service: T) {
self.services
.insert(service_name.to_string(), Arc::new(service));
/// Set a `ServiceMode` to control the behaviour of services.
pub fn mode(mut self, mode: ServiceMode) -> Self {
self.mode = mode;
self
}
fn run(self) -> ! {
/// Register a new background service
pub fn register<T: Fn(DeadManSwitch) -> () + 'static + Send + Sync>(
mut self,
service_name: &str,
service: T,
) -> Self {
self.services
.insert(service_name.to_string(), Arc::new(Box::new(service)));
self
}
fn run(self) {
let mut threads = HashMap::new();
// Initial start
@ -93,6 +116,7 @@ impl<T: Fn(DeadManSwitch) -> () + 'static + Send + Sync> ServiceManager<T> {
// Monitor loop
loop {
let mut threads_new = HashMap::new();
let mut threads_rm = Vec::new();
for (service, (t, dm)) in &mut threads {
let alive = dm.alive();
@ -102,28 +126,46 @@ impl<T: Fn(DeadManSwitch) -> () + 'static + Send + Sync> ServiceManager<T> {
);
if !alive {
log::warn!("Service {service} died. Restarting...");
// Restart (Daemon)
match self.mode {
ServiceMode::Daemon => {
log::warn!("Service {service} died. Restarting...");
while !t.is_finished() {
log::debug!("waiting for thread");
while !t.is_finished() {
log::debug!("waiting for thread");
}
let f = Arc::clone(self.services.get(service).unwrap());
let (dms, dmr) = DeadManSwitch::new();
threads_new.insert(
service.clone(),
(
std::thread::spawn(move || {
f(dms);
}),
dmr,
),
);
}
ServiceMode::Decay => {
log::info!("Service {service} died.");
threads_rm.push(service.clone());
}
}
let f = Arc::clone(self.services.get(service).unwrap());
let (dms, dmr) = DeadManSwitch::new();
threads_new.insert(
service.clone(),
(
std::thread::spawn(move || {
f(dms);
}),
dmr,
),
);
}
}
for t in threads_rm {
threads.remove(&t);
}
threads.extend(threads_new);
if threads.is_empty() && matches!(self.mode, ServiceMode::Decay) {
log::info!("All my services are dead. Commiting suicide...");
break;
}
}
}