From 18c663fcdbcaf2dc468639050b190e22fd9caae0 Mon Sep 17 00:00:00 2001 From: JMARyA Date: Mon, 10 Mar 2025 11:27:20 +0100 Subject: [PATCH 1/2] =?UTF-8?q?=E2=9C=A8=20iterated=20fn=20+=20defer=20+?= =?UTF-8?q?=20retry?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Cargo.lock | 1 + README.md | 120 +++++++++++++++++--- comrade-macro/Cargo.lock | 172 ++++++++++++++++++++++++++++ comrade-macro/Cargo.toml | 1 + comrade-macro/src/lib.rs | 35 +++++- examples/defer.rs | 12 ++ examples/iterated.rs | 67 +++++++++++ examples/work.rs | 18 ++- src/defer.rs | 16 +++ src/iterated.rs | 240 +++++++++++++++++++++++++++++++++++++++ src/lib.rs | 61 +++------- 11 files changed, 678 insertions(+), 65 deletions(-) create mode 100644 examples/defer.rs create mode 100644 examples/iterated.rs create mode 100644 src/defer.rs create mode 100644 src/iterated.rs diff --git a/Cargo.lock b/Cargo.lock index c161fd0..aad561a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -167,6 +167,7 @@ version = "0.1.0" dependencies = [ "proc-macro2", "quote", + "rand", "serde_json", "syn", ] diff --git a/README.md b/README.md index 4c7ecc9..08ae4f1 100644 --- a/README.md +++ b/README.md @@ -11,37 +11,47 @@ ## Core Concepts -### Parallel Execution +### Higher Level Functions -`comrade` provides a simple interface for running tasks in parallel, perfect for independent tasks that can be processed concurrently. +`comrade` provides various convenient functions. ```rust +// process every item in parallel let results: Vec = parallel(items, |item: &i32| { // ... }); -``` -### Rally Execution - -The `rally` function allows you to run multiple tasks in parallel and return the result of the **first task to finish**. This is useful when you want to prioritize the first available result from several tasks (example: download from multiple HTTP mirrors). - -```rust +// rally (return fastest computed result out of items) +// example: run multiple downloads and return the first finished one let res: (i32, i32) = rally(items, |item: &i32| { // ... }); -``` -### Background Tasks +// Run background tasks without blocking the main thread +background(|| { + // Background task logic + println!("This is a background task!"); +}); -Easily run tasks in the background without blocking the main thread. This is useful for code that needs to be run without waiting for a result. +fn some_fn() { + println!("Hello World!"); -```rust -fn handle() { - background(|| { - // Background task logic - println!("This is a background task!"); + defer!(|| { + // this will run at the end of the scope + println!("Bye World!"); }); + + println!("doing something"); } + +// Retry a `Fn() -> Option` until it returns `Some(_)`. +let value: &str = retry(|| { + if rand::rng().random_range(0..10) > 5 { + Some("hello") + } else { + None + } +}) ``` ### Service Management @@ -113,6 +123,21 @@ fn main() { } ``` +You could effeciently run batch work: +```rust +fn batch_work() { + let mut work = Vec::new(); + + for i in 0..10 { + work.push((i.to_string(), multiply_async(i, i))); + } + + for (label, res) in LabelPendingTaskIterator(work) { + println!("Finished task {label} -> {res}"); + } +} +``` + These tasks can now be distributed with Valkey. Make sure you have a Valkey server running and the `$VALKEY_URL` environment variable is set for your application: @@ -155,3 +180,66 @@ fn main() { println!("x is {x}"); } ``` + +### Stateful Functions +If you have a workload which can be iteratively computed by modifying state it can be modeled as a `IteratedFunction`. +These functions can be paused, stopped, saved to disk and revived later. + +First define a iterative function: +```rust +#[derive(Debug, Default, Clone)] +pub struct AccFnContext { + pub iter: u64, + pub acc: u64, +} + +pub fn multiply_iterated( + mut ctx: FunctionContext, +) -> FunctionContext { + // init + let (a, b) = ctx.initial; + + // end condition (return) + if b == ctx.context.iter { + ctx.done(ctx.context.acc); + return ctx; + } + + // computation + println!("doing calc {}", ctx.context.acc); + std::thread::sleep(Duration::from_millis(50)); + let val = ctx.context.acc + a; + + // saving state + ctx.state(|x| { + x.iter += 1; + x.acc = val; + }); + + return ctx; +} +``` + +Then you can use it like: +```rust +fn main() { + // run normally + let f = IteratedFunction::new_threaded(multiply_iterated, (5, 5)); + println!("Result is {}", f.output()); + + // function starts running + let f = IteratedFunction::new_threaded(multiply_iterated, (5, 50)); + + // pause the function + f.pause(); + + // stop the function and get state + let state = f.stop(); + + // revive and start running again from state + let f = IteratedFunction::new_threaded_from_state(multiply_iterated, state); + + // get output + println!("Result is {}", f.output()); +} +``` diff --git a/comrade-macro/Cargo.lock b/comrade-macro/Cargo.lock index c41495c..c47ada0 100644 --- a/comrade-macro/Cargo.lock +++ b/comrade-macro/Cargo.lock @@ -2,28 +2,68 @@ # It is not intended for manual editing. version = 4 +[[package]] +name = "bitflags" +version = "2.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c8214115b7bf84099f1309324e63141d4c5d7cc26862f97a0a857dbefe165bd" + +[[package]] +name = "cfg-if" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" + [[package]] name = "comrade-macro" version = "0.1.0" dependencies = [ "proc-macro2", "quote", + "rand", "serde_json", "syn", ] +[[package]] +name = "getrandom" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "43a49c392881ce6d5c3b8cb70f98717b7c07aabbdff06687b9030dbfbe2725f8" +dependencies = [ + "cfg-if", + "libc", + "wasi", + "windows-targets", +] + [[package]] name = "itoa" version = "1.0.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4a5f13b858c8d314ee3e8f639011f7ccefe71f97f96e50151fb991f267928e2c" +[[package]] +name = "libc" +version = "0.2.170" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "875b3680cb2f8f71bdcf9a30f38d48282f5d3c95cbf9b3fa57269bb5d5c06828" + [[package]] name = "memchr" version = "2.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3" +[[package]] +name = "ppv-lite86" +version = "0.2.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85eae3c4ed2f50dcfe72643da4befc30deadb458a9b590d720cde2f2b1e97da9" +dependencies = [ + "zerocopy", +] + [[package]] name = "proc-macro2" version = "1.0.94" @@ -42,6 +82,36 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "rand" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3779b94aeb87e8bd4e834cee3650289ee9e0d5677f976ecdb6d219e5f4f6cd94" +dependencies = [ + "rand_chacha", + "rand_core", + "zerocopy", +] + +[[package]] +name = "rand_chacha" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb" +dependencies = [ + "ppv-lite86", + "rand_core", +] + +[[package]] +name = "rand_core" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "99d9a13982dcf210057a8a78572b2217b667c3beacbf3a0d8b454f6f82837d38" +dependencies = [ + "getrandom", +] + [[package]] name = "ryu" version = "1.0.20" @@ -96,3 +166,105 @@ name = "unicode-ident" version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5a5f39404a5da50712a4c1eecf25e90dd62b613502b7e925fd4e4d19b5c96512" + +[[package]] +name = "wasi" +version = "0.13.3+wasi-0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26816d2e1a4a36a2940b96c5296ce403917633dff8f3440e9b236ed6f6bacad2" +dependencies = [ + "wit-bindgen-rt", +] + +[[package]] +name = "windows-targets" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b724f72796e036ab90c1021d4780d4d3d648aca59e491e6b98e725b84e99973" +dependencies = [ + "windows_aarch64_gnullvm", + "windows_aarch64_msvc", + "windows_i686_gnu", + "windows_i686_gnullvm", + "windows_i686_msvc", + "windows_x86_64_gnu", + "windows_x86_64_gnullvm", + "windows_x86_64_msvc", +] + +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3" + +[[package]] +name = "windows_aarch64_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469" + +[[package]] +name = "windows_i686_gnu" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e9b5ad5ab802e97eb8e295ac6720e509ee4c243f69d781394014ebfe8bbfa0b" + +[[package]] +name = "windows_i686_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66" + +[[package]] +name = "windows_i686_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66" + +[[package]] +name = "windows_x86_64_gnu" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78" + +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d" + +[[package]] +name = "windows_x86_64_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" + +[[package]] +name = "wit-bindgen-rt" +version = "0.33.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3268f3d866458b787f390cf61f4bbb563b922d091359f9608842999eaee3943c" +dependencies = [ + "bitflags", +] + +[[package]] +name = "zerocopy" +version = "0.8.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fd97444d05a4328b90e75e503a34bad781f14e28a823ad3557f0750df1ebcbc6" +dependencies = [ + "zerocopy-derive", +] + +[[package]] +name = "zerocopy-derive" +version = "0.8.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6352c01d0edd5db859a63e2605f4ea3183ddbd15e2c4a9e7d32184df75e4f154" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] diff --git a/comrade-macro/Cargo.toml b/comrade-macro/Cargo.toml index bdf6f55..d001722 100644 --- a/comrade-macro/Cargo.toml +++ b/comrade-macro/Cargo.toml @@ -6,6 +6,7 @@ edition = "2024" [dependencies] proc-macro2 = "1.0.94" quote = "1.0.39" +rand = "0.9.0" serde_json = "1.0.140" syn = { version = "2.0.99", features = ["full"] } diff --git a/comrade-macro/src/lib.rs b/comrade-macro/src/lib.rs index 654d81d..2477421 100644 --- a/comrade-macro/src/lib.rs +++ b/comrade-macro/src/lib.rs @@ -1,6 +1,7 @@ use proc_macro::TokenStream; use quote::{format_ident, quote}; -use syn::{FnArg, Ident, ItemFn, Pat, ReturnType, Type, parse_macro_input}; +use rand::Rng; +use syn::{ExprClosure, FnArg, Ident, ItemFn, Pat, ReturnType, Type, parse_macro_input}; /// This macro turns this function into a worker. /// @@ -271,3 +272,35 @@ pub fn worker(attr: TokenStream, item: TokenStream) -> TokenStream { output.into() } + +/// A procedural macro that defers execution of a closure until the surrounding scope ends similiarly to Go's defer statement. +/// +/// # Example +/// ``` +/// use comrade::defer; +/// +/// fn main() { +/// defer!(|| { +/// println!("This will be executed at the end of the scope."); +/// }); +/// println!("This runs first."); +/// } +/// ``` +/// +/// The `defer!` macro ensures that the provided closure is wrapped in a `Defer` instance, which will execute the closure when the variable goes out of scope. +#[proc_macro] +pub fn defer(input: TokenStream) -> TokenStream { + // Parse the input as a closure expression (|| { ... }) + let closure = parse_macro_input!(input as ExprClosure); + + // Generate a random number for a unique variable name + let random_number = rand::rng().random_range(1000..9999); + let rand_ident = format_ident!("{}", format!("defer_{}", random_number)); + + // Expand into a let statement that wraps the closure in Defer::new + let expanded = quote! { + let #rand_ident = comrade::Defer::new(#closure); + }; + + TokenStream::from(expanded) +} diff --git a/examples/defer.rs b/examples/defer.rs new file mode 100644 index 0000000..f6ec416 --- /dev/null +++ b/examples/defer.rs @@ -0,0 +1,12 @@ +use comrade::defer; + +fn main() { + println!("Hello World!"); + + defer!(|| { + // this will run at the end of the scope + println!("Bye World!"); + }); + + println!("doing something"); +} diff --git a/examples/iterated.rs b/examples/iterated.rs new file mode 100644 index 0000000..52f3e60 --- /dev/null +++ b/examples/iterated.rs @@ -0,0 +1,67 @@ +use std::time::Duration; + +use comrade::iterated::{FunctionContext, IteratedFunction}; + +#[derive(Debug, Default, Clone)] +pub struct AccFnContext { + pub iter: u64, + pub acc: u64, +} + +pub fn multiply_iterated( + mut ctx: FunctionContext, +) -> FunctionContext { + // init + let (a, b) = ctx.initial; + + // end condition (return) + if b == ctx.context.iter { + ctx.done(ctx.context.acc); + return ctx; + } + + // computation + println!("doing calc {}", ctx.context.acc); + std::thread::sleep(Duration::from_millis(50)); + let val = ctx.context.acc + a; + + // saving state + ctx.state(|x| { + x.iter += 1; + x.acc = val; + }); + + return ctx; +} + +fn main() { + env_logger::init(); + + let f: IteratedFunction = + IteratedFunction::new(multiply_iterated); + + let x = f.run_to_end((5, 4)); + println!("computed x -> {x}"); + + // async + let f = IteratedFunction::new_threaded(multiply_iterated, (5, 5)); + println!("This is running"); + println!("result is {}", f.output()); + + let f = IteratedFunction::new_threaded(multiply_iterated, (5, 50)); + + // pause the function + f.pause(); + + // stop the function and get state + let state = f.stop(); + println!("This was running"); + println!("state is {state:?}"); + + println!("reviving function"); + // continue with previous computed state + let f = IteratedFunction::new_threaded_from_state(multiply_iterated, state); + + // get output + println!("result is {}", f.output()); +} diff --git a/examples/work.rs b/examples/work.rs index ecc64fb..3b745a6 100644 --- a/examples/work.rs +++ b/examples/work.rs @@ -1,7 +1,7 @@ use std::time::Duration; use comrade::{ - job::{JobDispatcher, JobOrder}, + job::{JobDispatcher, JobOrder, LabelPendingTaskIterator}, service::ServiceManager, worker, }; @@ -22,6 +22,18 @@ pub fn multiply(a: i32, b: i32) -> i32 { a * b } +pub fn batch_work() { + let mut work = Vec::new(); + + for i in 0..10 { + work.push((i.to_string(), multiply_async(i, i))); + } + + for (label, res) in LabelPendingTaskIterator(work) { + println!("Finished task {label} -> {res}"); + } +} + fn do_work(multiply: multiply_Scoped, myfn: myfn_Scoped) { for i in 0..10 { let x = multiply.call(i, i); @@ -41,17 +53,19 @@ fn main() { let s = s.spawn(); do_work(multiply, myfn_fn); - s.join().unwrap(); let s = ServiceManager::new().mode(comrade::service::ServiceMode::Decay); let s = myfn_init(s); + let s = multiply_init(s); let s = take_time_init(s); let s = s.spawn(); let x = myfn(55); println!("myfn {x}"); + batch_work(); + // decoupled let e = take_time_async(1500); println!("This will run right after!"); diff --git a/src/defer.rs b/src/defer.rs new file mode 100644 index 0000000..7fcb73c --- /dev/null +++ b/src/defer.rs @@ -0,0 +1,16 @@ +pub struct Defer { + f: Box, +} + +impl Defer { + pub fn new(f: F) -> Self { + Self { f: Box::new(f) } + } +} + +impl Drop for Defer { + fn drop(&mut self) { + log::debug!("Calling defer function"); + self.f.as_ref()(); + } +} diff --git a/src/iterated.rs b/src/iterated.rs new file mode 100644 index 0000000..8e951d3 --- /dev/null +++ b/src/iterated.rs @@ -0,0 +1,240 @@ +// TODO : docs + +// TODO : measure avg iteration time + +use std::{sync::Arc, time::Duration}; + +use crate::{ + job::{JobDispatch, JobDispatcher}, + retry, +}; + +#[derive(Debug, Clone)] +pub struct FunctionContext { + pub context: C, + pub initial: I, + pub output: Option, + done: bool, +} + +impl FunctionContext { + pub fn new(input: I) -> FunctionContext { + FunctionContext { + context: C::default(), + initial: input, + output: None, + done: false, + } + } +} + +impl FunctionContext { + pub fn state(&mut self, f: F) { + f(&mut self.context); + } + + pub fn done(&mut self, output: O) { + self.done = true; + self.output = Some(output); + } +} + +pub struct RunningIteratedFunction { + output: std::marker::PhantomData, + context: FunctionContext, + f: Arc) -> FunctionContext>>, +} + +impl RunningIteratedFunction { + pub fn next(mut self) -> Self { + let new_ctx = self.f.as_ref()(self.context); + self.context = new_ctx; + return self; + } + + pub fn return_value(&self) -> Option<&O> { + self.context.output.as_ref() + } + + pub fn take_return_value(self) -> O { + self.context.output.unwrap() + } + + pub fn is_done(&self) -> bool { + self.context.done + } +} + +pub struct ThreadRunningIteratedFunction( + JobDispatcher>, +); + +impl + ThreadRunningIteratedFunction +{ + pub fn start(&self) { + let _ = self.0.try_send(IteratedFnQuery::Start); + } + + pub fn pause(&self) { + let _ = self.0.try_send(IteratedFnQuery::Pause); + } + + pub fn stop(&self) -> FunctionContext { + match self.0.try_send(IteratedFnQuery::Stop) { + Some(IteratedFnOutput::Context(function_context)) => function_context, + _ => unreachable!(), + } + } + + pub fn try_output(&self) -> Option { + match self.0.send(IteratedFnQuery::GetOutput) { + IteratedFnOutput::Out(out) => Some(out), + _ => None, + } + } + + pub fn output(&self) -> O { + retry(|| self.try_output()) + } +} + +#[derive(Debug, Clone)] +pub enum IteratedFnQuery { + Pause, + Start, + Stop, + GetOutput, +} + +pub enum IteratedFnOutput { + Out(O), + Context(FunctionContext), + Ok, +} + +pub struct IteratedFunction { + output: std::marker::PhantomData, + f: Arc) -> FunctionContext>>, +} + +impl + IteratedFunction +{ + pub fn new) -> FunctionContext + 'static>( + f: F, + ) -> Self { + Self { + f: Arc::new(Box::new(f)), + output: std::marker::PhantomData, + } + } + + pub fn new_threaded< + F: Fn(FunctionContext) -> FunctionContext + Send + 'static, + >( + f: F, + input: I, + ) -> ThreadRunningIteratedFunction { + Self::new_threaded_from_state(f, FunctionContext::new(input)) + } + + pub fn new_threaded_from_state< + F: Fn(FunctionContext) -> FunctionContext + Send + 'static, + >( + f: F, + context: FunctionContext, + ) -> ThreadRunningIteratedFunction { + let (dispatch, recv) = JobDispatcher::>::new(); + + let _ = std::thread::spawn(move || { + let f = Self::new(f); + let mut f = f.call_with_context(context); + + let mut counter = 0; + let mut sleep = false; + while !f.is_done() { + if sleep { + std::thread::sleep(Duration::from_secs(3)); + } + + if counter == 5 || sleep { + if let Ok(request) = recv.recv_timeout(Duration::from_millis(300)) { + match request.param { + IteratedFnQuery::Pause => { + log::info!("Paused threaded iterative function"); + sleep = true; + } + IteratedFnQuery::Start => { + log::info!("Restarted threaded iterative function"); + sleep = false; + } + IteratedFnQuery::Stop => { + log::info!("Ending threaded iterative function"); + request.done(IteratedFnOutput::Context(f.context)); + return; + } + _ => {} + } + + request.done(IteratedFnOutput::Ok); + } + counter = 0; + } + + if !sleep { + f = f.next(); + } + + counter += 1; + } + + if f.is_done() { + while let Ok(request) = recv.recv() { + match request.param { + IteratedFnQuery::Stop => { + log::warn!("Function was asked to stop but was already done"); + request.done(IteratedFnOutput::Context(f.context.clone())); + } + IteratedFnQuery::GetOutput => { + request.done(IteratedFnOutput::Out(f.context.output.clone().unwrap())); + break; + } + _ => { + request.done(IteratedFnOutput::Out(f.context.output.clone().unwrap())); + } + } + } + } + }); + + ThreadRunningIteratedFunction(dispatch) + } + + pub fn call_with_context( + &self, + ctx: FunctionContext, + ) -> RunningIteratedFunction { + RunningIteratedFunction { + output: std::marker::PhantomData, + context: ctx, + f: self.f.clone(), + } + } + + pub fn call(&self, input: I) -> RunningIteratedFunction { + RunningIteratedFunction { + output: std::marker::PhantomData, + context: FunctionContext::new(input), + f: self.f.clone(), + } + } + + pub fn run_to_end(&self, input: I) -> O { + let mut f = self.call(input); + while !f.is_done() { + f = f.next(); + } + return f.take_return_value(); + } +} diff --git a/src/lib.rs b/src/lib.rs index c3abd0c..d09a38c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,8 +1,12 @@ use std::{sync::mpsc, thread, time::Instant}; +mod defer; +pub mod iterated; pub mod job; pub mod service; -pub use comrade_macro::worker; +pub use defer::Defer; + +pub use comrade_macro::{defer, worker}; pub use crossbeam; use dashmap::DashMap; use once_cell::sync::Lazy; @@ -10,16 +14,6 @@ pub use serde_json; // TODO : worker docs + refactor -// TODO : functions which can be stopped, paused, etc -/* -Example: - -let myf = Function::new(|| do_something()); - -// stop fn -myf.stop(); -*/ - pub static UNION: Lazy< DashMap<&'static str, job::JobMultiplexer>, > = Lazy::new(DashMap::new); @@ -63,40 +57,15 @@ where (fastest_item, fastest_result) } -// TODO : async version -/* -pub fn rally_async(items: Vec, f: F) -> (T, X) -where - F: AsyncFn(&T) -> X + Send + Sync + Copy + 'static, -{ - let (tx, rx) = mpsc::channel(); - let mut handles = Vec::new(); - - for item in items { - let tx = tx.clone(); - let item_ref = item; - let f = f; - - tokio::task::spawn() - - let handle = thread::spawn(move || { - let start = Instant::now(); - let result = f(&item_ref); - let elapsed = start.elapsed(); - let _ = tx.send((item_ref, result, elapsed)); - }); - handles.push(handle); +pub fn retry Option>(f: F) -> O { + loop { + match f() { + Some(resp) => { + return resp; + } + None => { + log::info!("Got nothing, retrying..."); + } + } } - - drop(tx); - - let (fastest_item, fastest_result, _) = rx.recv().unwrap(); - - for handle in handles { - handle.thread().unpark(); - } - - (fastest_item, fastest_result) } - - */ From 46cb21dc2a11a4397c3a46a26607106303a5f77a Mon Sep 17 00:00:00 2001 From: JMARyA Date: Mon, 10 Mar 2025 18:43:32 +0100 Subject: [PATCH 2/2] =?UTF-8?q?=E2=9C=A8=20cron?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Cargo.lock | 169 ++++++++++++++++++++++++++++++++++++++++ Cargo.toml | 1 + README.md | 55 ++++++++++++- examples/cron.rs | 59 ++++++++++++++ src/cron.rs | 195 +++++++++++++++++++++++++++++++++++++++++++++++ src/defer.rs | 14 +++- src/lib.rs | 76 +++++++++++++++++- src/service.rs | 12 +++ 8 files changed, 575 insertions(+), 6 deletions(-) create mode 100644 examples/cron.rs create mode 100644 src/cron.rs diff --git a/Cargo.lock b/Cargo.lock index aad561a..973631c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -26,6 +26,21 @@ dependencies = [ "memchr", ] +[[package]] +name = "android-tzdata" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e999941b234f3131b00bc13c22d06e8c5ff726d1b6318ac7eb276997bbb4fef0" + +[[package]] +name = "android_system_properties" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "819e7219dbd41043ac279b19830f2efc897156490d7fd6ea916720117ee66311" +dependencies = [ + "libc", +] + [[package]] name = "anstream" version = "0.6.18" @@ -109,6 +124,12 @@ version = "2.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5c8214115b7bf84099f1309324e63141d4c5d7cc26862f97a0a857dbefe165bd" +[[package]] +name = "bumpalo" +version = "3.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1628fb46dfa0b37568d12e5edd512553eccf6a22a78e8bde00bb4aed84d5bdbf" + [[package]] name = "byteorder" version = "1.5.0" @@ -121,12 +142,35 @@ version = "1.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d71b6127be86fdcfddb610f7182ac57211d4b18a3e9c82eb2d17662f2227ad6a" +[[package]] +name = "cc" +version = "1.2.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be714c154be609ec7f5dad223a33bf1482fff90472de28f7362806e6d4832b8c" +dependencies = [ + "shlex", +] + [[package]] name = "cfg-if" version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "chrono" +version = "0.4.40" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a7964611d71df112cb1730f2ee67324fcf4d0fc6606acbbe9bfe06df124637c" +dependencies = [ + "android-tzdata", + "iana-time-zone", + "js-sys", + "num-traits", + "wasm-bindgen", + "windows-link", +] + [[package]] name = "colorchoice" version = "1.0.3" @@ -147,6 +191,7 @@ dependencies = [ name = "comrade" version = "0.1.0" dependencies = [ + "chrono", "comrade-macro", "crossbeam", "dashmap", @@ -172,6 +217,12 @@ dependencies = [ "syn", ] +[[package]] +name = "core-foundation-sys" +version = "0.8.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" + [[package]] name = "crossbeam" version = "0.8.4" @@ -315,6 +366,29 @@ version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" +[[package]] +name = "iana-time-zone" +version = "0.1.61" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "235e081f3925a06703c2d0117ea8b91f042756fd6e7a6e5d901e8ca1a996b220" +dependencies = [ + "android_system_properties", + "core-foundation-sys", + "iana-time-zone-haiku", + "js-sys", + "wasm-bindgen", + "windows-core", +] + +[[package]] +name = "iana-time-zone-haiku" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f31827a206f56af32e590ba56d5d2d085f558508192593743f16b2306495269f" +dependencies = [ + "cc", +] + [[package]] name = "icu_collections" version = "1.5.0" @@ -466,6 +540,16 @@ version = "1.0.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4a5f13b858c8d314ee3e8f639011f7ccefe71f97f96e50151fb991f267928e2c" +[[package]] +name = "js-sys" +version = "0.3.77" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1cfaf33c695fc6e08064efbc1f72ec937429614f25eef83af942d0e227c3a28f" +dependencies = [ + "once_cell", + "wasm-bindgen", +] + [[package]] name = "libc" version = "0.2.170" @@ -695,6 +779,12 @@ version = "0.1.24" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f" +[[package]] +name = "rustversion" +version = "1.0.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eded382c5f5f786b989652c49544c4877d9f015cc22e145a5ea8ea66c2921cd2" + [[package]] name = "ryu" version = "1.0.20" @@ -745,6 +835,12 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bbfa15b3dddfee50a0fff136974b3e1bde555604ba463834a7eb7deb6417705d" +[[package]] +name = "shlex" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" + [[package]] name = "smallvec" version = "1.14.0" @@ -862,6 +958,79 @@ dependencies = [ "wit-bindgen-rt", ] +[[package]] +name = "wasm-bindgen" +version = "0.2.100" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1edc8929d7499fc4e8f0be2262a241556cfc54a0bea223790e71446f2aab1ef5" +dependencies = [ + "cfg-if", + "once_cell", + "rustversion", + "wasm-bindgen-macro", +] + +[[package]] +name = "wasm-bindgen-backend" +version = "0.2.100" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2f0a0651a5c2bc21487bde11ee802ccaf4c51935d0d3d42a6101f98161700bc6" +dependencies = [ + "bumpalo", + "log", + "proc-macro2", + "quote", + "syn", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-macro" +version = "0.2.100" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7fe63fc6d09ed3792bd0897b314f53de8e16568c2b3f7982f468c0bf9bd0b407" +dependencies = [ + "quote", + "wasm-bindgen-macro-support", +] + +[[package]] +name = "wasm-bindgen-macro-support" +version = "0.2.100" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ae87ea40c9f689fc23f209965b6fb8a99ad69aeeb0231408be24920604395de" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "wasm-bindgen-backend", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-shared" +version = "0.2.100" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a05d73b933a847d6cccdda8f838a22ff101ad9bf93e33684f39c1f5f0eece3d" +dependencies = [ + "unicode-ident", +] + +[[package]] +name = "windows-core" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9" +dependencies = [ + "windows-targets", +] + +[[package]] +name = "windows-link" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6dccfd733ce2b1753b03b6d3c65edf020262ea35e20ccdf3e288043e6dd620e3" + [[package]] name = "windows-sys" version = "0.52.0" diff --git a/Cargo.toml b/Cargo.toml index 0b1331d..248e653 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,3 +16,4 @@ rand = "0.9.0" redis = "0.29.1" serde = "1.0.218" uuid = { version = "1.15.1", features = ["v4"] } +chrono = "0.4.40" diff --git a/README.md b/README.md index 08ae4f1..5512cee 100644 --- a/README.md +++ b/README.md @@ -28,7 +28,7 @@ let res: (i32, i32) = rally(items, |item: &i32| { }); // Run background tasks without blocking the main thread -background(|| { +background!(|| { // Background task logic println!("This is a background task!"); }); @@ -52,6 +52,11 @@ let value: &str = retry(|| { None } }) + +// Delayed execution +delay(Duration::from_secs(4), || { + println!("I will run in 4 seconds from now on!"); +}); ``` ### Service Management @@ -74,6 +79,54 @@ fn run_services() { } ``` +#### Cron Tasks +The ServiceManager also supports running functions periodically or on time: +```rust +fn main() { + let s = ServiceManager::new(); + // Init Cron Manager + let cron = Cron::new(); + + // Add Cron Task + cron.add_task("4_sec", Schedule::Every(Duration::from_secs(4)), || { + println!("I run every 4 at {}", chrono::Utc::now()); + }); + + cron.add_task("2_sec", Schedule::Every(Duration::from_secs(2)), || { + println!("I run every 2 seconds at {}", chrono::Utc::now()); + }); + + cron.add_task( + "daily", + Schedule::Every(Duration::from_secs(60 * 60 * 24)), + || { + println!("I run daily"); + }, + ); + + // Start running the Cron Manager + let (s, cron) = s.register_cron(cron.into()); + let s = s.spawn(); + defer!(|| { + s.join().unwrap(); + }); + + // Add another Cron Task after running the manager dynamically + cron.add_task( + "future_task", + Schedule::At(datetime_in(Duration::from_secs(2))), + || { + println!("I am in the future"); + }, + ); + + // Functionally the same as above + cron.run_at(datetime_in(Duration::from_secs(3)), || { + println!("The Future"); + }); +} +``` + ### Worker Unions You can annotate a function with `#[worker]` which gives them superpowers. These functions can be queued and dispatched by the system, and their results are returned when completed. diff --git a/examples/cron.rs b/examples/cron.rs new file mode 100644 index 0000000..8cbed29 --- /dev/null +++ b/examples/cron.rs @@ -0,0 +1,59 @@ +use comrade::{ + cron::{Cron, Schedule}, + datetime_in, defer, delay, + service::ServiceManager, +}; +use std::time::Duration; + +fn main() { + env_logger::init(); + + let s = ServiceManager::new(); + // Init Cron Manager + let cron = Cron::new(); + + // Add Cron Task + cron.add_task("4_sec", Schedule::Every(Duration::from_secs(4)), || { + println!("I run every 4 at {}", chrono::Utc::now()); + }); + + cron.add_task("2_sec", Schedule::Every(Duration::from_secs(2)), || { + println!("I run every 2 seconds at {}", chrono::Utc::now()); + }); + + cron.add_task( + "daily", + Schedule::Every(Duration::from_secs(60 * 60 * 24)), + || { + println!("I run daily"); + }, + ); + + // Start running the Cron Manager + let (s, cron) = s.register_cron(cron.into()); + let s = s.spawn(); + defer!(|| { + s.join().unwrap(); + }); + + // Add another Cron Task after running the manager dynamically + cron.add_task( + "future_task", + Schedule::At(datetime_in(Duration::from_secs(2))), + || { + println!("I am in the future"); + }, + ); + + // Functionally the same as above + cron.run_at(datetime_in(Duration::from_secs(3)), || { + println!("The Future"); + }); + + // --- + + // Delayed execution + delay(Duration::from_secs(4), || { + println!("I will run in 4 seconds from now on!"); + }); +} diff --git a/src/cron.rs b/src/cron.rs new file mode 100644 index 0000000..b08f1fd --- /dev/null +++ b/src/cron.rs @@ -0,0 +1,195 @@ +use std::{ + sync::{Arc, RwLock}, + thread::JoinHandle, + time::Duration, + u64, +}; + +use chrono::Utc; +use rand::Rng; + +pub enum Schedule { + Every(Duration), + At(chrono::DateTime), +} + +pub struct CronTask { + f: Arc>, + schedule: Schedule, + name: String, + last_run: Option>, +} + +impl CronTask { + pub fn new(name: &str, schedule: Schedule, f: F) -> Self { + Self { + f: Arc::new(Box::new(f)), + schedule, + name: name.to_string(), + last_run: None, + } + } + + pub fn is_absolute(&self) -> bool { + match self.schedule { + Schedule::Every(_) => false, + Schedule::At(_) => true, + } + } + + pub fn run(&mut self) -> JoinHandle<()> { + log::info!("Starting cron task '{}'", self.name); + self.last_run = Some(Utc::now()); + let f = Arc::clone(&self.f); + std::thread::spawn(move || { + f.as_ref()(); + }) + } + + pub fn wait_until(&mut self) -> Duration { + match self.schedule { + Schedule::Every(duration) => { + let now = Utc::now(); + if let Some(last_exec) = self.last_run { + let since_then = (now - last_exec).to_std().unwrap(); + + duration.checked_sub(since_then).unwrap_or(Duration::ZERO) + } else { + self.last_run = Some(Utc::now()); + duration + } + } + Schedule::At(date_time) => { + if self.last_run.is_none() { + let now = Utc::now(); + if let Ok(dur) = date_time.signed_duration_since(&now).to_std() { + dur + } else { + Duration::ZERO + } + } else { + Duration::from_secs(u64::MAX) + } + } + } + } +} + +pub struct Cron { + tasks: RwLock>, +} + +impl Cron { + pub fn new() -> Self { + Self { + tasks: RwLock::new(Vec::new()), + } + } + + pub fn add_task(&self, name: &str, schedule: Schedule, f: F) { + self.tasks + .write() + .unwrap() + .push(CronTask::new(name, schedule, f)); + } + + pub fn run_at(&self, dt: chrono::DateTime, f: F) { + let name = format!("delayed_{}", rand::rng().random_range(1000..9999)); + self.tasks + .write() + .unwrap() + .push(CronTask::new(&name, Schedule::At(dt), f)); + } + + pub fn run(&self) { + loop { + // init + let mut last_wait = Duration::from_secs(u64::MAX); + let mut last_task: Option = None; + + { + // find next task + let mut tasks = self.tasks.write().unwrap(); + for (i, task) in tasks.iter_mut().enumerate() { + let wait_time = task.wait_until(); + if wait_time < last_wait { + last_wait = wait_time; + last_task = Some(i); + } + } + } + + if let Some(index) = last_task { + // init + let mut remove = false; + let mut skip = false; + + // limit longest blocking time (5s) + let real_wait = if last_wait.gt(&Duration::from_secs(5)) { + skip = true; + Duration::from_secs(5) + } else { + last_wait + }; + + { + // logging + let tasks = self.tasks.read().unwrap(); + log::debug!("Managing {} cron task(s)", tasks.len()); + + let task = tasks.get(index).unwrap(); + if real_wait == last_wait { + log::debug!( + "Waiting for {real_wait:?} to start cron task '{}'", + task.name + ); + } else { + log::debug!( + "Would wait for {last_wait:?} to start cron task '{}'. Waiting for {real_wait:?}", + task.name + ); + } + + // if somehow we wait indefinitely + if last_wait == Duration::from_secs(u64::MAX) { + log::warn!("Infinite wait time for cron"); + continue; + } + + // set remove flag for absolute time cron tasks + if task.is_absolute() { + log::info!( + "Removing task '{}' from cron because it will never run again", + task.name + ); + remove = true; + } + } + + // sleep until task + std::thread::sleep(real_wait); + + // skip if we are still just sleeping + if skip { + continue; + } + + { + // run cron task + let mut tasks = self.tasks.write().unwrap(); + let task = tasks.get_mut(index).unwrap(); + let _ = task.run(); + } + + if remove { + { + // remove if requested + let mut tasks = self.tasks.write().unwrap(); + log::info!("Removing cron task #{index}"); + tasks.remove(index); + } + } + } + } + } +} diff --git a/src/defer.rs b/src/defer.rs index 7fcb73c..55e77a3 100644 --- a/src/defer.rs +++ b/src/defer.rs @@ -1,16 +1,22 @@ +use std::mem::take; + pub struct Defer { - f: Box, + f: Option>, } impl Defer { - pub fn new(f: F) -> Self { - Self { f: Box::new(f) } + pub fn new(f: F) -> Self { + Self { + f: Some(Box::new(f)), + } } } impl Drop for Defer { fn drop(&mut self) { log::debug!("Calling defer function"); - self.f.as_ref()(); + if let Some(f) = take(&mut self.f) { + f(); + } } } diff --git a/src/lib.rs b/src/lib.rs index d09a38c..957f948 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,5 +1,11 @@ -use std::{sync::mpsc, thread, time::Instant}; +#![feature(fn_traits)] +use std::{ + sync::mpsc, + thread, + time::{Duration, Instant}, +}; +pub mod cron; mod defer; pub mod iterated; pub mod job; @@ -69,3 +75,71 @@ pub fn retry Option>(f: F) -> O { } } } + +/// Run a background task. +/// +/// This spawns a seperate thread for a background process. +/// The background task is guaranteed to finish within its defined scope. +/// If the end of the scope is reached while the thread is still running it will block. +/// +/// # Example +/// ```ignore +/// use comrade::background; +/// +/// fn do_work() { +/// println!("doing work..."); +/// +/// // spawn background thread +/// background!(|| { +/// println!("doing something in the background"); +/// std::thread::sleep(std::time::Duration::from_secs(3)); +/// }); +/// +/// println!("doing something else..."); +/// +/// // end of scope +/// // the code will block until all background processes defined here are done. +/// } +/// +/// fn main() { +/// do_work(); +/// println!("finished with work"); +/// } +/// ``` +#[macro_export] +macro_rules! background { + ($f:expr) => { + let handle = std::thread::spawn(move || $f()); + comrade::defer!(|| { + handle.join().unwrap(); + }); + }; +} + +/// Start running a function after `duration`. +pub fn delay(duration: std::time::Duration, f: F) { + let _ = std::thread::spawn(move || { + log::info!("Will start running in {duration:?}"); + std::thread::sleep(duration); + f(); + }); +} + +/// Run `f(&T) -> X` for every item in `items` +pub fn parallel(items: Vec, f: F) -> Vec +where + F: Fn(&T) -> X + Send + Sync + Copy + 'static, +{ + let threads: Vec<_> = items + .into_iter() + .map(|x| std::thread::spawn(move || f(&x))) + .collect(); + + threads.into_iter().map(|x| x.join().unwrap()).collect() +} + +pub fn datetime_in(d: Duration) -> chrono::DateTime { + chrono::Utc::now() + .checked_add_signed(chrono::TimeDelta::from_std(d).unwrap()) + .unwrap() +} diff --git a/src/service.rs b/src/service.rs index 3c5569f..9cf3f37 100644 --- a/src/service.rs +++ b/src/service.rs @@ -8,6 +8,8 @@ use std::{ time::Duration, }; +use crate::cron::Cron; + /// Status receiver of a dead man switch pub struct DeadManReceiver { rx: Receiver, @@ -83,6 +85,16 @@ impl ServiceManager { self } + pub fn register_cron(self, cron: Arc) -> (Self, Arc) { + let cron_ret = Arc::clone(&cron); + ( + self.register("cron", move |_| { + cron.run(); + }), + cron_ret, + ) + } + /// Register a new background service pub fn register () + 'static + Send + Sync>( mut self,