# ☭ comrade

`comrade` is a Rust crate for managing compute work. It lets you manage shared work and functions, even across machines.

## Features

- **Parallel Execution**: Dispatch tasks to run concurrently and gather their results.
- **Rally Execution**: Run multiple tasks in parallel and return the result of the fastest one.
- **Service Management**: Manage background services with different operating modes (`Decay`, `Daemon`).
- **Worker Unions**: Delegate tasks using `#[worker]` annotations locally or as distributed task queues across multiple machines.
- **Background Tasks**: Run background tasks without blocking the main logic of your program.

## Core Concepts

### Higher Level Functions

`comrade` provides a set of convenience functions.

```rust
// process every item in parallel
let results: Vec<_> = parallel(items, |item: &i32| {
    // ...
});

// rally (return fastest computed result out of items)
// example: run multiple downloads and return the first finished one
let res: (i32, i32) = rally(items, |item: &i32| {
    // ...
});

// Run background tasks without blocking the main thread
background!(|| {
    // Background task logic
    println!("This is a background task!");
});

fn some_fn() {
    println!("Hello World!");

    defer!(|| {
        // this will run at the end of the scope
        println!("Bye World!");
    });

    println!("doing something");
}

// Retry a `Fn() -> Option<T>` until it returns `Some(_)`.
let value: &str = retry(|| {
    if rand::rng().random_range(0..10) > 5 {
        Some("hello")
    } else {
        None
    }
});

// Delayed execution
delay(Duration::from_secs(4), || {
    println!("I will run 4 seconds from now!");
});
```

### Service Management

`comrade` provides a way to manage persistent services with different modes. In `Decay` mode a service that stops is allowed to die, while `Daemon` mode revives it and keeps it running indefinitely.
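To make the difference between the two modes concrete, here is a minimal sketch using only the standard library. It does not use `comrade`'s API: the `Mode` enum, the `supervise` function and the `max_starts` cap are hypothetical names introduced for illustration, and the real `ServiceManager` may behave differently in detail.

```rust
use std::sync::Arc;
use std::thread;

/// Hypothetical stand-in for the two operating modes (not comrade's own type).
#[derive(Clone, Copy, Debug, PartialEq)]
enum Mode {
    Decay,
    Daemon,
}

/// Runs `service` on its own thread and returns how often it was started.
/// `max_starts` is an artificial cap so that this sketch terminates;
/// a real daemon would keep restarting the service indefinitely.
fn supervise<F>(mode: Mode, max_starts: usize, service: F) -> usize
where
    F: Fn(usize) + Send + Sync + 'static,
{
    let service = Arc::new(service);
    let mut starts = 0;
    while starts < max_starts {
        let svc = Arc::clone(&service);
        let run = starts;
        // Wait until the service thread ends (normally or by panicking).
        let _ = thread::spawn(move || (*svc)(run)).join();
        starts += 1;
        // Decay: the service is allowed to die. Daemon: loop and revive it.
        if mode == Mode::Decay {
            break;
        }
    }
    starts
}

fn main() {
    let decay = supervise(Mode::Decay, 3, |run| println!("decay service, run {run}"));
    let daemon = supervise(Mode::Daemon, 3, |run| println!("daemon service, run {run}"));
    println!("decay started {decay} time(s), daemon started {daemon} time(s)");
}
```

With `comrade` itself, the mode is selected on the `ServiceManager`: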
```rust
use comrade::service::ServiceManager;

fn run_services() {
    let mut manager = ServiceManager::new().mode(comrade::service::ServiceMode::Decay);

    // Register and start services
    manager = manager.register("my_service", |_| {
        // Service logic here
    });

    let thread_handle = manager.spawn();
    thread_handle.join().unwrap();
}
```

#### Cron Tasks

The `ServiceManager` also supports running functions periodically or at a specific time:

```rust
fn main() {
    let s = ServiceManager::new();

    // Init Cron Manager
    let cron = Cron::new();

    // Add Cron Task
    cron.add_task("4_sec", Schedule::Every(Duration::from_secs(4)), || {
        println!("I run every 4 seconds at {}", chrono::Utc::now());
    });

    cron.add_task("2_sec", Schedule::Every(Duration::from_secs(2)), || {
        println!("I run every 2 seconds at {}", chrono::Utc::now());
    });

    cron.add_task(
        "daily",
        Schedule::Every(Duration::from_secs(60 * 60 * 24)),
        || {
            println!("I run daily");
        },
    );

    // Start running the Cron Manager
    let (s, cron) = s.register_cron(cron.into());
    let s = s.spawn();
    defer!(|| {
        s.join().unwrap();
    });

    // Add another Cron Task dynamically after the manager has started
    cron.add_task(
        "future_task",
        Schedule::At(datetime_in(Duration::from_secs(2))),
        || {
            println!("I am in the future");
        },
    );

    // Functionally the same as above
    cron.run_at(datetime_in(Duration::from_secs(3)), || {
        println!("The Future");
    });
}
```

### Worker Unions

Annotating a function with `#[worker]` gives it superpowers. Such functions can be queued and dispatched by the system, and their results are returned when completed.

```rust
use comrade::worker;

// Single local worker
#[worker]
pub fn myfn(i: i32) -> i32 {
    i * 2
}

// 4 local worker threads
#[worker(4)]
pub fn multiply(a: i32, b: i32) -> i32 {
    a * b
}
```

After initialization, these functions can be called from anywhere and will eventually be processed by whichever worker picks them up. In addition, several new functions are derived from each annotated function.
See the example below:

```rust
fn main() {
    let mut manager = ServiceManager::new().mode(comrade::service::ServiceMode::Decay);

    // Init worker thread on `ServiceManager`
    manager = multiply_init(manager);
    let manager = manager.spawn();

    // works like the original function
    let res = multiply(2, 2);

    // async
    let e = take_time_async(1500);
    println!("This will run right after!");
    // ...

    // is OUR value ready?
    println!("the value is {}", e.wait());

    // Shutdown worker thread
    multiply_shutdown();
    manager.join().unwrap();
}
```

You can also run batch work efficiently:

```rust
fn batch_work() {
    let mut work = Vec::new();

    for i in 0..10 {
        work.push((i.to_string(), multiply_async(i, i)));
    }

    for (label, res) in LabelPendingTaskIterator(work) {
        println!("Finished task {label} -> {res}");
    }
}
```

These tasks can also be distributed with Valkey. Make sure you have a Valkey server running and that the `$VALKEY_URL` environment variable is set for your application:

```yml
services:
  valkey:
    image: valkey/valkey
    ports:
      - 6379:6379
```

Then you can spawn worker threads like this:

```rust
fn main() {
    let mut s = ServiceManager::new().mode(comrade::service::ServiceMode::Decay);
    s = multiply_init_union(s);
    s = myfn_init_union(s);
    let s = s.spawn();

    log::info!("Spawned workers. Working for 1 minute");
    std::thread::sleep(Duration::from_secs(60));

    myfn_shutdown();
    multiply_shutdown();
    s.join().unwrap();
}
```

While the workers are running, you can use them like this:

```rust
fn main() {
    // Register workers in union
    myfn_register_union();

    // Will be computed somewhere else
    let x = myfn(50);
    println!("x is {x}");
}
```

### Stateful Functions

If you have a workload that can be computed iteratively by modifying state, it can be modeled as an `IteratedFunction`. These functions can be paused, stopped, saved to disk and revived later.
First define an iterative function:

```rust
#[derive(Debug, Default, Clone)]
pub struct AccFnContext {
    pub iter: u64,
    pub acc: u64,
}

pub fn multiply_iterated(
    mut ctx: FunctionContext,
) -> FunctionContext {
    // init
    let (a, b) = ctx.initial;

    // end condition (return)
    if b == ctx.context.iter {
        ctx.done(ctx.context.acc);
        return ctx;
    }

    // computation
    println!("doing calc {}", ctx.context.acc);
    std::thread::sleep(Duration::from_millis(50));
    let val = ctx.context.acc + a;

    // saving state
    ctx.state(|x| {
        x.iter += 1;
        x.acc = val;
    });

    return ctx;
}
```

Then you can use it like this:

```rust
fn main() {
    // run normally
    let f = IteratedFunction::new_threaded(multiply_iterated, (5, 5));
    println!("Result is {}", f.output());

    // function starts running
    let f = IteratedFunction::new_threaded(multiply_iterated, (5, 50));

    // pause the function
    f.pause();

    // stop the function and get state
    let state = f.stop();

    // revive and start running again from state
    let f = IteratedFunction::new_threaded_from_state(multiply_iterated, state);

    // get output
    println!("Result is {}", f.output());
}
```
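The reason such a function can be stopped and revived is that every iteration reads and writes only an explicit state value. The following std-only sketch illustrates that idea without using `comrade`: `State`, `step` and `run` are hypothetical names, and the `budget` parameter stands in for "stop after a while". It mirrors the multiply-by-repeated-addition logic of `multiply_iterated`.

```rust
/// Hypothetical state carried between iterations (same fields as `AccFnContext`).
#[derive(Debug, Default, Clone, PartialEq)]
struct State {
    iter: u64,
    acc: u64,
}

/// One iteration of computing `a * b` by repeated addition.
/// Returns `Some(result)` once finished, `None` if more steps remain.
fn step(input: (u64, u64), state: &mut State) -> Option<u64> {
    let (a, b) = input;
    if state.iter == b {
        return Some(state.acc);
    }
    state.acc += a;
    state.iter += 1;
    None
}

/// Calls `step` at most `budget` times and reports the result if it finished.
fn run(input: (u64, u64), state: &mut State, budget: u64) -> Option<u64> {
    for _ in 0..budget {
        if let Some(out) = step(input, state) {
            return Some(out);
        }
    }
    None
}

fn main() {
    let input = (5, 50);
    let mut state = State::default();

    // Do part of the work, then "stop" and keep a snapshot of the state.
    let unfinished = run(input, &mut state, 20);
    println!("after 20 steps: {unfinished:?}, state = {state:?}");
    let snapshot = state.clone();

    // Later: revive from the snapshot and run to completion.
    let mut revived = snapshot;
    let out = run(input, &mut revived, u64::MAX);
    println!("Result is {out:?}"); // prints "Result is Some(250)"
}
```

Because the snapshot is a plain value, it could just as well be serialized to disk between the two calls to `run`; how `comrade` persists its state is not shown here.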