From 5cb4facc4872a961a3ad3b11d4b48d01016b6600 Mon Sep 17 00:00:00 2001 From: JMARyA Date: Fri, 7 Mar 2025 04:33:35 +0100 Subject: [PATCH] init --- .gitignore | 1 + Cargo.lock | 326 ++++++++++++++++++++++++++++++++++++++++++ Cargo.toml | 9 ++ examples/functions.rs | 16 +++ examples/services.rs | 29 ++++ src/job.rs | 87 +++++++++++ src/lib.rs | 80 +++++++++++ src/service.rs | 136 ++++++++++++++++++ 8 files changed, 684 insertions(+) create mode 100644 .gitignore create mode 100644 Cargo.lock create mode 100644 Cargo.toml create mode 100644 examples/functions.rs create mode 100644 examples/services.rs create mode 100644 src/job.rs create mode 100644 src/lib.rs create mode 100644 src/service.rs diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..ea8c4bf --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +/target diff --git a/Cargo.lock b/Cargo.lock new file mode 100644 index 0000000..7229682 --- /dev/null +++ b/Cargo.lock @@ -0,0 +1,326 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +version = 4 + +[[package]] +name = "addr2line" +version = "0.24.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dfbe277e56a376000877090da837660b4427aad530e3028d44e0bffe4f89a1c1" +dependencies = [ + "gimli", +] + +[[package]] +name = "adler2" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "512761e0bb2578dd7380c6baaa0f4ce03e84f95e960231d1dec8bf4d7d6e2627" + +[[package]] +name = "aho-corasick" +version = "1.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e60d3430d3a69478ad0993f19238d2df97c507009a52b3c10addcd7f6bcb916" +dependencies = [ + "memchr", +] + +[[package]] +name = "anstream" +version = "0.6.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8acc5369981196006228e28809f761875c0327210a891e941f4c683b3a99529b" +dependencies = [ + "anstyle", + "anstyle-parse", + "anstyle-query", + "anstyle-wincon", + "colorchoice", + "is_terminal_polyfill", + "utf8parse", +] + +[[package]] +name = "anstyle" +version = "1.0.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "55cc3b69f167a1ef2e161439aa98aed94e6028e5f9a59be9a6ffb47aef1651f9" + +[[package]] +name = "anstyle-parse" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b2d16507662817a6a20a9ea92df6652ee4f94f914589377d69f3b21bc5798a9" +dependencies = [ + "utf8parse", +] + +[[package]] +name = "anstyle-query" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "79947af37f4177cfead1110013d678905c37501914fba0efea834c3fe9a8d60c" +dependencies = [ + "windows-sys", +] + +[[package]] +name = "anstyle-wincon" +version = "3.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca3534e77181a9cc07539ad51f2141fe32f6c3ffd4df76db8ad92346b003ae4e" +dependencies = [ + "anstyle", + "once_cell", + "windows-sys", +] + +[[package]] +name = "backtrace" +version = "0.3.74" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8d82cb332cdfaed17ae235a638438ac4d4839913cc2af585c3c6746e8f8bee1a" +dependencies = [ + "addr2line", + "cfg-if", + "libc", + "miniz_oxide", + "object", + "rustc-demangle", + "windows-targets", +] + +[[package]] +name = "cfg-if" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" + +[[package]] +name = "colorchoice" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b63caa9aa9397e2d9480a9b13673856c78d8ac123288526c37d7839f2a86990" + +[[package]] +name = "comrade" +version = "0.1.0" +dependencies = [ + "env_logger", + "log", + "tokio", +] + +[[package]] +name = "env_filter" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "186e05a59d4c50738528153b83b0b0194d3a29507dfec16eccd4b342903397d0" +dependencies = [ + "log", + "regex", +] + +[[package]] +name = "env_logger" +version = "0.11.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dcaee3d8e3cfc3fd92428d477bc97fc29ec8716d180c0d74c643bb26166660e0" +dependencies = [ + "anstream", + "anstyle", + "env_filter", + "humantime", + "log", +] + +[[package]] +name = "gimli" +version = "0.31.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07e28edb80900c19c28f1072f2e8aeca7fa06b23cd4169cefe1af5aa3260783f" + +[[package]] +name = "humantime" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" + +[[package]] +name = "is_terminal_polyfill" +version = "1.70.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7943c866cc5cd64cbc25b2e01621d07fa8eb2a1a23160ee81ce38704e97b8ecf" + +[[package]] +name = "libc" +version = "0.2.170" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "875b3680cb2f8f71bdcf9a30f38d48282f5d3c95cbf9b3fa57269bb5d5c06828" + +[[package]] +name = "log" +version = "0.4.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30bde2b3dc3671ae49d8e2e9f044c7c005836e7a023ee57cffa25ab82764bb9e" + +[[package]] +name = "memchr" +version = "2.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3" + +[[package]] +name = "miniz_oxide" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e3e04debbb59698c15bacbb6d93584a8c0ca9cc3213cb423d31f760d8843ce5" +dependencies = [ + "adler2", +] + +[[package]] +name = "object" +version = "0.36.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62948e14d923ea95ea2c7c86c71013138b66525b86bdc08d2dcc262bdb497b87" +dependencies = [ + "memchr", +] + +[[package]] +name = "once_cell" +version = "1.20.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "945462a4b81e43c4e3ba96bd7b49d834c6f61198356aa858733bc4acf3cbe62e" + +[[package]] +name = "pin-project-lite" +version = "0.2.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b" + +[[package]] +name = "regex" +version = "1.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b544ef1b4eac5dc2db33ea63606ae9ffcfac26c1416a2806ae0bf5f56b201191" +dependencies = [ + "aho-corasick", + "memchr", + "regex-automata", + "regex-syntax", +] + +[[package]] +name = "regex-automata" +version = "0.4.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "809e8dc61f6de73b46c85f4c96486310fe304c434cfa43669d7b40f711150908" +dependencies = [ + "aho-corasick", + "memchr", + "regex-syntax", +] + +[[package]] +name = "regex-syntax" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c" + +[[package]] +name = "rustc-demangle" +version = "0.1.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f" + +[[package]] +name = "tokio" +version = "1.43.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d61fa4ffa3de412bfea335c6ecff681de2b609ba3c77ef3e00e521813a9ed9e" +dependencies = [ + "backtrace", + "pin-project-lite", +] + +[[package]] +name = "utf8parse" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" + +[[package]] +name = "windows-sys" +version = "0.59.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e38bc4d79ed67fd075bcc251a1c39b32a1776bbe92e5bef1f0bf1f8c531853b" +dependencies = [ + "windows-targets", +] + +[[package]] +name = "windows-targets" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b724f72796e036ab90c1021d4780d4d3d648aca59e491e6b98e725b84e99973" +dependencies = [ + "windows_aarch64_gnullvm", + "windows_aarch64_msvc", + "windows_i686_gnu", + "windows_i686_gnullvm", + "windows_i686_msvc", + "windows_x86_64_gnu", + "windows_x86_64_gnullvm", + "windows_x86_64_msvc", +] + +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3" + +[[package]] +name = "windows_aarch64_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469" + +[[package]] +name = "windows_i686_gnu" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e9b5ad5ab802e97eb8e295ac6720e509ee4c243f69d781394014ebfe8bbfa0b" + +[[package]] +name = "windows_i686_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66" + +[[package]] +name = "windows_i686_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66" + +[[package]] +name = "windows_x86_64_gnu" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78" + +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d" + +[[package]] +name = "windows_x86_64_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..0d8f24c --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,9 @@ +[package] +name = "comrade" +version = "0.1.0" +edition = "2024" + +[dependencies] +env_logger = "0.11.6" +log = "0.4.26" +tokio = "1.43.0" diff --git a/examples/functions.rs b/examples/functions.rs new file mode 100644 index 0000000..7783239 --- /dev/null +++ b/examples/functions.rs @@ -0,0 +1,16 @@ +use std::time::Duration; + +use comrade::rally; + +fn main() { + env_logger::init(); + + let items = vec![1, 2, 3, 4, 5]; + + let (input, output) = rally(items, |item: &_| { + std::thread::sleep(Duration::from_millis(item * 100)); + return 0; + }); + + println!("RALLY RESULTS: {input:?} -> {output:?}"); +} diff --git a/examples/services.rs b/examples/services.rs new file mode 100644 index 0000000..cd83640 --- /dev/null +++ b/examples/services.rs @@ -0,0 +1,29 @@ +use std::time::Duration; + +use comrade::service::ServiceManager; + +fn main() { + env_logger::init(); + + log::info!("Running services example"); + + // persistent background services + let mut s = ServiceManager::new(); + + s.register("myservice", |_| { + let mut c = 0; + loop { + // ... + println!("I am doing something!"); + std::thread::sleep(Duration::from_secs(1)); + c += 1; + if c == 3 { + panic!("Oh no!"); + } + } + }); + + let st = s.spawn(); + + st.join().unwrap(); +} diff --git a/src/job.rs b/src/job.rs new file mode 100644 index 0000000..48d0035 --- /dev/null +++ b/src/job.rs @@ -0,0 +1,87 @@ +use std::sync::{Arc, Mutex, mpsc}; + +#[derive(Clone)] +/// A generic job dispatcher struct that allows sending jobs of type `T` and receiving results of type `V` using message passing. +pub struct JobDispatcher { + sender: Arc>>>, +} + +impl JobDispatcher { + /// Creates a new instance of `JobDispatcher` and returns a tuple that contains it and a receiver end for `JobOrder`s. + /// # Example: + /// ``` + /// use jobdispatcher::*; + /// // Create job dispatcher + /// let (dispatcher, recv) = JobDispatcher::::new(); + /// + /// // Worker Thread + /// std::thread::spawn(move || { + /// for job in recv { + /// let result = job.param + 1; + /// job.done(result); + /// } + /// }); + /// + /// // Usage + /// let result = dispatcher.send(3); + /// assert_eq!(result, 4); + /// ``` + #[must_use] + pub fn new() -> (Self, mpsc::Receiver>) { + let (sender, receiver) = mpsc::channel(); + ( + Self { + sender: Arc::new(Mutex::new(sender)), + }, + receiver, + ) + } + + /// Sends a job of type `T` to the job dispatcher and waits for its result of type `V`. + /// Returns the result of the job once it has been processed. + /// # Panics + /// This function panics when the `JobOrder` struct gets out of scope without returning a finished result. + /// Additionally if the internal `Mutex` is poisoned, this function will panic as well. + pub fn send(&self, param: T) -> V { + let (tx, rx) = mpsc::channel(); + let job_order = JobOrder::new(param, move |ret| { + tx.send(ret).unwrap(); + }); + self.sender.lock().unwrap().send(job_order).unwrap(); + rx.recv().unwrap() + } + + /// Sends a job of type `T` to the job dispatcher and waits for its result of type `V`. + /// Returns `Some(V)` when the job returns an result, `None` if somehow nothing was returned or the internal `Mutex` is poisoned. + pub fn try_send(&self, param: T) -> Option { + let (tx, rx) = mpsc::channel(); + let job_order = JobOrder::new(param, move |ret| { + tx.send(ret).unwrap(); + }); + self.sender.lock().ok()?.send(job_order).ok()?; + rx.recv().ok() + } +} + +/// A struct that represents a job order that encapsulates a job of type `T` and its result of type `V`, along with a callback function that will send the result back to the job origin. +pub struct JobOrder { + /// The job parameter of type `T`. + pub param: T, + callback: Box, +} + +impl JobOrder { + /// Creates a new `JobOrder` instance with the specified job parameter `param` of type `T` and a callback function that takes the job result of type `V` as an argument. + #[must_use] + fn new(param: T, callback: impl FnOnce(V) + Send + 'static) -> Self { + Self { + param, + callback: Box::new(callback), + } + } + + /// Send the result of the `JobOrder` back to it's origin + pub fn done(self, val: V) { + (self.callback)(val); + } +} diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..abe173f --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,80 @@ +use std::{sync::mpsc, thread, time::Instant}; + +pub mod job; +pub mod service; + +/// Rally Function +/// +/// This executes a thread for every item executing `f(&item) -> X`. Whatever function returns first is returned while every other thread is killed. +pub fn rally(items: Vec, f: F) -> (T, X) +where + F: Fn(&T) -> X + Send + Sync + Copy + 'static, +{ + let (tx, rx) = mpsc::channel(); + let items_len = items.len(); + let mut handles = Vec::new(); + + for item in items { + let tx = tx.clone(); + let item_ref = item; + let f = f; + + let handle = thread::spawn(move || { + let start = Instant::now(); + let result = f(&item_ref); + let elapsed = start.elapsed(); + let _ = tx.send((item_ref, result, elapsed)); + }); + handles.push(handle); + } + + drop(tx); + + let (fastest_item, fastest_result, elapsed) = rx.recv().unwrap(); + + for handle in handles { + handle.thread().unpark(); + } + + log::info!("Rally ended with {items_len} items in {elapsed:?}"); + + (fastest_item, fastest_result) +} + +// TODO : async version +/* +pub fn rally_async(items: Vec, f: F) -> (T, X) +where + F: AsyncFn(&T) -> X + Send + Sync + Copy + 'static, +{ + let (tx, rx) = mpsc::channel(); + let mut handles = Vec::new(); + + for item in items { + let tx = tx.clone(); + let item_ref = item; + let f = f; + + tokio::task::spawn() + + let handle = thread::spawn(move || { + let start = Instant::now(); + let result = f(&item_ref); + let elapsed = start.elapsed(); + let _ = tx.send((item_ref, result, elapsed)); + }); + handles.push(handle); + } + + drop(tx); + + let (fastest_item, fastest_result, _) = rx.recv().unwrap(); + + for handle in handles { + handle.thread().unpark(); + } + + (fastest_item, fastest_result) +} + + */ diff --git a/src/service.rs b/src/service.rs new file mode 100644 index 0000000..319d8b0 --- /dev/null +++ b/src/service.rs @@ -0,0 +1,136 @@ +use std::{ + collections::HashMap, + sync::{ + Arc, + mpsc::{Receiver, RecvTimeoutError, Sender}, + }, + thread::JoinHandle, + time::Duration, +}; + +/// Status receiver of a dead man switch +pub struct DeadManReceiver { + rx: Receiver, +} + +impl DeadManReceiver { + /// Checks if a dead man switch is alive + pub fn alive(&self) -> bool { + let res = self.rx.recv_timeout(Duration::from_millis(400)); + if let Err(err) = res { + return match err { + RecvTimeoutError::Timeout => true, + RecvTimeoutError::Disconnected => false, + }; + } + + false + } +} + +/// A Dead Man Switch +pub struct DeadManSwitch { + pub lifeline: Sender, +} + +impl DeadManSwitch { + /// Create a new Dead Man Switch + /// + /// The Dead Man Switch will notify the receiver and mark itself dead after it got dropped (for example in a panic) + pub fn new() -> (Self, DeadManReceiver) { + let (tx, rx) = std::sync::mpsc::channel(); + + (Self { lifeline: tx }, DeadManReceiver { rx }) + } +} + +impl Drop for DeadManSwitch { + fn drop(&mut self) { + log::info!("I am DEAD man 💀"); + let _ = self.lifeline.send(true); + } +} + +/// Service Manager +/// +/// This will manage and keep background services running- +pub struct ServiceManager () + 'static> { + pub services: HashMap>, +} + +impl () + 'static + Send + Sync> ServiceManager { + pub fn new() -> Self { + Self { + services: HashMap::new(), + } + } + + /// Register a new background service + pub fn register(&mut self, service_name: &str, service: T) { + self.services + .insert(service_name.to_string(), Arc::new(service)); + } + + fn run(self) -> ! { + let mut threads = HashMap::new(); + + // Initial start + for (service, f) in self.services.iter() { + log::info!("Starting service {service}"); + let (dms, dmr) = DeadManSwitch::new(); + let f = Arc::clone(&f); + threads.insert( + service.clone(), + ( + std::thread::spawn(move || { + f(dms); + }), + dmr, + ), + ); + } + + // Monitor loop + loop { + let mut threads_new = HashMap::new(); + + for (service, (t, dm)) in &mut threads { + let alive = dm.alive(); + log::debug!( + "Checking up on {service} -- {}", + if alive { "ALIVE" } else { "DEAD" } + ); + + if !alive { + log::warn!("Service {service} died. Restarting..."); + + while !t.is_finished() { + log::debug!("waiting for thread"); + } + + let f = Arc::clone(self.services.get(service).unwrap()); + + let (dms, dmr) = DeadManSwitch::new(); + threads_new.insert( + service.clone(), + ( + std::thread::spawn(move || { + f(dms); + }), + dmr, + ), + ); + } + } + + threads.extend(threads_new); + } + } + + /// Start the services + /// + /// This returns a ThreadHandle for the service monitor thread. + pub fn spawn(self) -> JoinHandle<()> { + std::thread::spawn(|| self.run()) + } +}