diff --git a/Cargo.lock b/Cargo.lock index 973631c..c161fd0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -26,21 +26,6 @@ dependencies = [ "memchr", ] -[[package]] -name = "android-tzdata" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e999941b234f3131b00bc13c22d06e8c5ff726d1b6318ac7eb276997bbb4fef0" - -[[package]] -name = "android_system_properties" -version = "0.1.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "819e7219dbd41043ac279b19830f2efc897156490d7fd6ea916720117ee66311" -dependencies = [ - "libc", -] - [[package]] name = "anstream" version = "0.6.18" @@ -124,12 +109,6 @@ version = "2.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5c8214115b7bf84099f1309324e63141d4c5d7cc26862f97a0a857dbefe165bd" -[[package]] -name = "bumpalo" -version = "3.17.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1628fb46dfa0b37568d12e5edd512553eccf6a22a78e8bde00bb4aed84d5bdbf" - [[package]] name = "byteorder" version = "1.5.0" @@ -142,35 +121,12 @@ version = "1.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d71b6127be86fdcfddb610f7182ac57211d4b18a3e9c82eb2d17662f2227ad6a" -[[package]] -name = "cc" -version = "1.2.16" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "be714c154be609ec7f5dad223a33bf1482fff90472de28f7362806e6d4832b8c" -dependencies = [ - "shlex", -] - [[package]] name = "cfg-if" version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" -[[package]] -name = "chrono" -version = "0.4.40" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1a7964611d71df112cb1730f2ee67324fcf4d0fc6606acbbe9bfe06df124637c" -dependencies = [ - "android-tzdata", - "iana-time-zone", - "js-sys", - "num-traits", - "wasm-bindgen", - "windows-link", -] - [[package]] name = "colorchoice" version = "1.0.3" @@ -191,7 +147,6 @@ dependencies = [ name = "comrade" version = "0.1.0" dependencies = [ - "chrono", "comrade-macro", "crossbeam", "dashmap", @@ -212,17 +167,10 @@ version = "0.1.0" dependencies = [ "proc-macro2", "quote", - "rand", "serde_json", "syn", ] -[[package]] -name = "core-foundation-sys" -version = "0.8.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" - [[package]] name = "crossbeam" version = "0.8.4" @@ -366,29 +314,6 @@ version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" -[[package]] -name = "iana-time-zone" -version = "0.1.61" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "235e081f3925a06703c2d0117ea8b91f042756fd6e7a6e5d901e8ca1a996b220" -dependencies = [ - "android_system_properties", - "core-foundation-sys", - "iana-time-zone-haiku", - "js-sys", - "wasm-bindgen", - "windows-core", -] - -[[package]] -name = "iana-time-zone-haiku" -version = "0.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f31827a206f56af32e590ba56d5d2d085f558508192593743f16b2306495269f" -dependencies = [ - "cc", -] - [[package]] name = "icu_collections" version = "1.5.0" @@ -540,16 +465,6 @@ version = "1.0.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4a5f13b858c8d314ee3e8f639011f7ccefe71f97f96e50151fb991f267928e2c" -[[package]] -name = "js-sys" -version = "0.3.77" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1cfaf33c695fc6e08064efbc1f72ec937429614f25eef83af942d0e227c3a28f" -dependencies = [ - "once_cell", - "wasm-bindgen", -] - [[package]] name = "libc" version = "0.2.170" @@ -779,12 +694,6 @@ version = "0.1.24" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f" -[[package]] -name = "rustversion" -version = "1.0.20" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eded382c5f5f786b989652c49544c4877d9f015cc22e145a5ea8ea66c2921cd2" - [[package]] name = "ryu" version = "1.0.20" @@ -835,12 +744,6 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bbfa15b3dddfee50a0fff136974b3e1bde555604ba463834a7eb7deb6417705d" -[[package]] -name = "shlex" -version = "1.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" - [[package]] name = "smallvec" version = "1.14.0" @@ -958,79 +861,6 @@ dependencies = [ "wit-bindgen-rt", ] -[[package]] -name = "wasm-bindgen" -version = "0.2.100" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1edc8929d7499fc4e8f0be2262a241556cfc54a0bea223790e71446f2aab1ef5" -dependencies = [ - "cfg-if", - "once_cell", - "rustversion", - "wasm-bindgen-macro", -] - -[[package]] -name = "wasm-bindgen-backend" -version = "0.2.100" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2f0a0651a5c2bc21487bde11ee802ccaf4c51935d0d3d42a6101f98161700bc6" -dependencies = [ - "bumpalo", - "log", - "proc-macro2", - "quote", - "syn", - "wasm-bindgen-shared", -] - -[[package]] -name = "wasm-bindgen-macro" -version = "0.2.100" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7fe63fc6d09ed3792bd0897b314f53de8e16568c2b3f7982f468c0bf9bd0b407" -dependencies = [ - "quote", - "wasm-bindgen-macro-support", -] - -[[package]] -name = "wasm-bindgen-macro-support" -version = "0.2.100" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8ae87ea40c9f689fc23f209965b6fb8a99ad69aeeb0231408be24920604395de" -dependencies = [ - "proc-macro2", - "quote", - "syn", - "wasm-bindgen-backend", - "wasm-bindgen-shared", -] - -[[package]] -name = "wasm-bindgen-shared" -version = "0.2.100" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1a05d73b933a847d6cccdda8f838a22ff101ad9bf93e33684f39c1f5f0eece3d" -dependencies = [ - "unicode-ident", -] - -[[package]] -name = "windows-core" -version = "0.52.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9" -dependencies = [ - "windows-targets", -] - -[[package]] -name = "windows-link" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6dccfd733ce2b1753b03b6d3c65edf020262ea35e20ccdf3e288043e6dd620e3" - [[package]] name = "windows-sys" version = "0.52.0" diff --git a/Cargo.toml b/Cargo.toml index 248e653..0b1331d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,4 +16,3 @@ rand = "0.9.0" redis = "0.29.1" serde = "1.0.218" uuid = { version = "1.15.1", features = ["v4"] } -chrono = "0.4.40" diff --git a/README.md b/README.md index 5512cee..4c7ecc9 100644 --- a/README.md +++ b/README.md @@ -11,52 +11,37 @@ ## Core Concepts -### Higher Level Functions +### Parallel Execution -`comrade` provides various convenient functions. +`comrade` provides a simple interface for running tasks in parallel, perfect for independent tasks that can be processed concurrently. ```rust -// process every item in parallel let results: Vec = parallel(items, |item: &i32| { // ... }); +``` -// rally (return fastest computed result out of items) -// example: run multiple downloads and return the first finished one +### Rally Execution + +The `rally` function allows you to run multiple tasks in parallel and return the result of the **first task to finish**. This is useful when you want to prioritize the first available result from several tasks (example: download from multiple HTTP mirrors). + +```rust let res: (i32, i32) = rally(items, |item: &i32| { // ... }); +``` -// Run background tasks without blocking the main thread -background!(|| { - // Background task logic - println!("This is a background task!"); -}); +### Background Tasks -fn some_fn() { - println!("Hello World!"); +Easily run tasks in the background without blocking the main thread. This is useful for code that needs to be run without waiting for a result. - defer!(|| { - // this will run at the end of the scope - println!("Bye World!"); +```rust +fn handle() { + background(|| { + // Background task logic + println!("This is a background task!"); }); - - println!("doing something"); } - -// Retry a `Fn() -> Option` until it returns `Some(_)`. -let value: &str = retry(|| { - if rand::rng().random_range(0..10) > 5 { - Some("hello") - } else { - None - } -}) - -// Delayed execution -delay(Duration::from_secs(4), || { - println!("I will run in 4 seconds from now on!"); -}); ``` ### Service Management @@ -79,54 +64,6 @@ fn run_services() { } ``` -#### Cron Tasks -The ServiceManager also supports running functions periodically or on time: -```rust -fn main() { - let s = ServiceManager::new(); - // Init Cron Manager - let cron = Cron::new(); - - // Add Cron Task - cron.add_task("4_sec", Schedule::Every(Duration::from_secs(4)), || { - println!("I run every 4 at {}", chrono::Utc::now()); - }); - - cron.add_task("2_sec", Schedule::Every(Duration::from_secs(2)), || { - println!("I run every 2 seconds at {}", chrono::Utc::now()); - }); - - cron.add_task( - "daily", - Schedule::Every(Duration::from_secs(60 * 60 * 24)), - || { - println!("I run daily"); - }, - ); - - // Start running the Cron Manager - let (s, cron) = s.register_cron(cron.into()); - let s = s.spawn(); - defer!(|| { - s.join().unwrap(); - }); - - // Add another Cron Task after running the manager dynamically - cron.add_task( - "future_task", - Schedule::At(datetime_in(Duration::from_secs(2))), - || { - println!("I am in the future"); - }, - ); - - // Functionally the same as above - cron.run_at(datetime_in(Duration::from_secs(3)), || { - println!("The Future"); - }); -} -``` - ### Worker Unions You can annotate a function with `#[worker]` which gives them superpowers. These functions can be queued and dispatched by the system, and their results are returned when completed. @@ -176,21 +113,6 @@ fn main() { } ``` -You could effeciently run batch work: -```rust -fn batch_work() { - let mut work = Vec::new(); - - for i in 0..10 { - work.push((i.to_string(), multiply_async(i, i))); - } - - for (label, res) in LabelPendingTaskIterator(work) { - println!("Finished task {label} -> {res}"); - } -} -``` - These tasks can now be distributed with Valkey. Make sure you have a Valkey server running and the `$VALKEY_URL` environment variable is set for your application: @@ -233,66 +155,3 @@ fn main() { println!("x is {x}"); } ``` - -### Stateful Functions -If you have a workload which can be iteratively computed by modifying state it can be modeled as a `IteratedFunction`. -These functions can be paused, stopped, saved to disk and revived later. - -First define a iterative function: -```rust -#[derive(Debug, Default, Clone)] -pub struct AccFnContext { - pub iter: u64, - pub acc: u64, -} - -pub fn multiply_iterated( - mut ctx: FunctionContext, -) -> FunctionContext { - // init - let (a, b) = ctx.initial; - - // end condition (return) - if b == ctx.context.iter { - ctx.done(ctx.context.acc); - return ctx; - } - - // computation - println!("doing calc {}", ctx.context.acc); - std::thread::sleep(Duration::from_millis(50)); - let val = ctx.context.acc + a; - - // saving state - ctx.state(|x| { - x.iter += 1; - x.acc = val; - }); - - return ctx; -} -``` - -Then you can use it like: -```rust -fn main() { - // run normally - let f = IteratedFunction::new_threaded(multiply_iterated, (5, 5)); - println!("Result is {}", f.output()); - - // function starts running - let f = IteratedFunction::new_threaded(multiply_iterated, (5, 50)); - - // pause the function - f.pause(); - - // stop the function and get state - let state = f.stop(); - - // revive and start running again from state - let f = IteratedFunction::new_threaded_from_state(multiply_iterated, state); - - // get output - println!("Result is {}", f.output()); -} -``` diff --git a/comrade-macro/Cargo.lock b/comrade-macro/Cargo.lock index c47ada0..c41495c 100644 --- a/comrade-macro/Cargo.lock +++ b/comrade-macro/Cargo.lock @@ -2,68 +2,28 @@ # It is not intended for manual editing. version = 4 -[[package]] -name = "bitflags" -version = "2.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5c8214115b7bf84099f1309324e63141d4c5d7cc26862f97a0a857dbefe165bd" - -[[package]] -name = "cfg-if" -version = "1.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" - [[package]] name = "comrade-macro" version = "0.1.0" dependencies = [ "proc-macro2", "quote", - "rand", "serde_json", "syn", ] -[[package]] -name = "getrandom" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "43a49c392881ce6d5c3b8cb70f98717b7c07aabbdff06687b9030dbfbe2725f8" -dependencies = [ - "cfg-if", - "libc", - "wasi", - "windows-targets", -] - [[package]] name = "itoa" version = "1.0.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4a5f13b858c8d314ee3e8f639011f7ccefe71f97f96e50151fb991f267928e2c" -[[package]] -name = "libc" -version = "0.2.170" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "875b3680cb2f8f71bdcf9a30f38d48282f5d3c95cbf9b3fa57269bb5d5c06828" - [[package]] name = "memchr" version = "2.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3" -[[package]] -name = "ppv-lite86" -version = "0.2.21" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "85eae3c4ed2f50dcfe72643da4befc30deadb458a9b590d720cde2f2b1e97da9" -dependencies = [ - "zerocopy", -] - [[package]] name = "proc-macro2" version = "1.0.94" @@ -82,36 +42,6 @@ dependencies = [ "proc-macro2", ] -[[package]] -name = "rand" -version = "0.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3779b94aeb87e8bd4e834cee3650289ee9e0d5677f976ecdb6d219e5f4f6cd94" -dependencies = [ - "rand_chacha", - "rand_core", - "zerocopy", -] - -[[package]] -name = "rand_chacha" -version = "0.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb" -dependencies = [ - "ppv-lite86", - "rand_core", -] - -[[package]] -name = "rand_core" -version = "0.9.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "99d9a13982dcf210057a8a78572b2217b667c3beacbf3a0d8b454f6f82837d38" -dependencies = [ - "getrandom", -] - [[package]] name = "ryu" version = "1.0.20" @@ -166,105 +96,3 @@ name = "unicode-ident" version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5a5f39404a5da50712a4c1eecf25e90dd62b613502b7e925fd4e4d19b5c96512" - -[[package]] -name = "wasi" -version = "0.13.3+wasi-0.2.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "26816d2e1a4a36a2940b96c5296ce403917633dff8f3440e9b236ed6f6bacad2" -dependencies = [ - "wit-bindgen-rt", -] - -[[package]] -name = "windows-targets" -version = "0.52.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9b724f72796e036ab90c1021d4780d4d3d648aca59e491e6b98e725b84e99973" -dependencies = [ - "windows_aarch64_gnullvm", - "windows_aarch64_msvc", - "windows_i686_gnu", - "windows_i686_gnullvm", - "windows_i686_msvc", - "windows_x86_64_gnu", - "windows_x86_64_gnullvm", - "windows_x86_64_msvc", -] - -[[package]] -name = "windows_aarch64_gnullvm" -version = "0.52.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3" - -[[package]] -name = "windows_aarch64_msvc" -version = "0.52.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469" - -[[package]] -name = "windows_i686_gnu" -version = "0.52.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e9b5ad5ab802e97eb8e295ac6720e509ee4c243f69d781394014ebfe8bbfa0b" - -[[package]] -name = "windows_i686_gnullvm" -version = "0.52.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66" - -[[package]] -name = "windows_i686_msvc" -version = "0.52.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66" - -[[package]] -name = "windows_x86_64_gnu" -version = "0.52.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78" - -[[package]] -name = "windows_x86_64_gnullvm" -version = "0.52.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d" - -[[package]] -name = "windows_x86_64_msvc" -version = "0.52.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" - -[[package]] -name = "wit-bindgen-rt" -version = "0.33.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3268f3d866458b787f390cf61f4bbb563b922d091359f9608842999eaee3943c" -dependencies = [ - "bitflags", -] - -[[package]] -name = "zerocopy" -version = "0.8.23" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fd97444d05a4328b90e75e503a34bad781f14e28a823ad3557f0750df1ebcbc6" -dependencies = [ - "zerocopy-derive", -] - -[[package]] -name = "zerocopy-derive" -version = "0.8.23" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6352c01d0edd5db859a63e2605f4ea3183ddbd15e2c4a9e7d32184df75e4f154" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] diff --git a/comrade-macro/Cargo.toml b/comrade-macro/Cargo.toml index d001722..bdf6f55 100644 --- a/comrade-macro/Cargo.toml +++ b/comrade-macro/Cargo.toml @@ -6,7 +6,6 @@ edition = "2024" [dependencies] proc-macro2 = "1.0.94" quote = "1.0.39" -rand = "0.9.0" serde_json = "1.0.140" syn = { version = "2.0.99", features = ["full"] } diff --git a/comrade-macro/src/lib.rs b/comrade-macro/src/lib.rs index 2477421..654d81d 100644 --- a/comrade-macro/src/lib.rs +++ b/comrade-macro/src/lib.rs @@ -1,7 +1,6 @@ use proc_macro::TokenStream; use quote::{format_ident, quote}; -use rand::Rng; -use syn::{ExprClosure, FnArg, Ident, ItemFn, Pat, ReturnType, Type, parse_macro_input}; +use syn::{FnArg, Ident, ItemFn, Pat, ReturnType, Type, parse_macro_input}; /// This macro turns this function into a worker. /// @@ -272,35 +271,3 @@ pub fn worker(attr: TokenStream, item: TokenStream) -> TokenStream { output.into() } - -/// A procedural macro that defers execution of a closure until the surrounding scope ends similiarly to Go's defer statement. -/// -/// # Example -/// ``` -/// use comrade::defer; -/// -/// fn main() { -/// defer!(|| { -/// println!("This will be executed at the end of the scope."); -/// }); -/// println!("This runs first."); -/// } -/// ``` -/// -/// The `defer!` macro ensures that the provided closure is wrapped in a `Defer` instance, which will execute the closure when the variable goes out of scope. -#[proc_macro] -pub fn defer(input: TokenStream) -> TokenStream { - // Parse the input as a closure expression (|| { ... }) - let closure = parse_macro_input!(input as ExprClosure); - - // Generate a random number for a unique variable name - let random_number = rand::rng().random_range(1000..9999); - let rand_ident = format_ident!("{}", format!("defer_{}", random_number)); - - // Expand into a let statement that wraps the closure in Defer::new - let expanded = quote! { - let #rand_ident = comrade::Defer::new(#closure); - }; - - TokenStream::from(expanded) -} diff --git a/examples/cron.rs b/examples/cron.rs deleted file mode 100644 index 8cbed29..0000000 --- a/examples/cron.rs +++ /dev/null @@ -1,59 +0,0 @@ -use comrade::{ - cron::{Cron, Schedule}, - datetime_in, defer, delay, - service::ServiceManager, -}; -use std::time::Duration; - -fn main() { - env_logger::init(); - - let s = ServiceManager::new(); - // Init Cron Manager - let cron = Cron::new(); - - // Add Cron Task - cron.add_task("4_sec", Schedule::Every(Duration::from_secs(4)), || { - println!("I run every 4 at {}", chrono::Utc::now()); - }); - - cron.add_task("2_sec", Schedule::Every(Duration::from_secs(2)), || { - println!("I run every 2 seconds at {}", chrono::Utc::now()); - }); - - cron.add_task( - "daily", - Schedule::Every(Duration::from_secs(60 * 60 * 24)), - || { - println!("I run daily"); - }, - ); - - // Start running the Cron Manager - let (s, cron) = s.register_cron(cron.into()); - let s = s.spawn(); - defer!(|| { - s.join().unwrap(); - }); - - // Add another Cron Task after running the manager dynamically - cron.add_task( - "future_task", - Schedule::At(datetime_in(Duration::from_secs(2))), - || { - println!("I am in the future"); - }, - ); - - // Functionally the same as above - cron.run_at(datetime_in(Duration::from_secs(3)), || { - println!("The Future"); - }); - - // --- - - // Delayed execution - delay(Duration::from_secs(4), || { - println!("I will run in 4 seconds from now on!"); - }); -} diff --git a/examples/defer.rs b/examples/defer.rs deleted file mode 100644 index f6ec416..0000000 --- a/examples/defer.rs +++ /dev/null @@ -1,12 +0,0 @@ -use comrade::defer; - -fn main() { - println!("Hello World!"); - - defer!(|| { - // this will run at the end of the scope - println!("Bye World!"); - }); - - println!("doing something"); -} diff --git a/examples/iterated.rs b/examples/iterated.rs deleted file mode 100644 index 52f3e60..0000000 --- a/examples/iterated.rs +++ /dev/null @@ -1,67 +0,0 @@ -use std::time::Duration; - -use comrade::iterated::{FunctionContext, IteratedFunction}; - -#[derive(Debug, Default, Clone)] -pub struct AccFnContext { - pub iter: u64, - pub acc: u64, -} - -pub fn multiply_iterated( - mut ctx: FunctionContext, -) -> FunctionContext { - // init - let (a, b) = ctx.initial; - - // end condition (return) - if b == ctx.context.iter { - ctx.done(ctx.context.acc); - return ctx; - } - - // computation - println!("doing calc {}", ctx.context.acc); - std::thread::sleep(Duration::from_millis(50)); - let val = ctx.context.acc + a; - - // saving state - ctx.state(|x| { - x.iter += 1; - x.acc = val; - }); - - return ctx; -} - -fn main() { - env_logger::init(); - - let f: IteratedFunction = - IteratedFunction::new(multiply_iterated); - - let x = f.run_to_end((5, 4)); - println!("computed x -> {x}"); - - // async - let f = IteratedFunction::new_threaded(multiply_iterated, (5, 5)); - println!("This is running"); - println!("result is {}", f.output()); - - let f = IteratedFunction::new_threaded(multiply_iterated, (5, 50)); - - // pause the function - f.pause(); - - // stop the function and get state - let state = f.stop(); - println!("This was running"); - println!("state is {state:?}"); - - println!("reviving function"); - // continue with previous computed state - let f = IteratedFunction::new_threaded_from_state(multiply_iterated, state); - - // get output - println!("result is {}", f.output()); -} diff --git a/examples/work.rs b/examples/work.rs index 3b745a6..ecc64fb 100644 --- a/examples/work.rs +++ b/examples/work.rs @@ -1,7 +1,7 @@ use std::time::Duration; use comrade::{ - job::{JobDispatcher, JobOrder, LabelPendingTaskIterator}, + job::{JobDispatcher, JobOrder}, service::ServiceManager, worker, }; @@ -22,18 +22,6 @@ pub fn multiply(a: i32, b: i32) -> i32 { a * b } -pub fn batch_work() { - let mut work = Vec::new(); - - for i in 0..10 { - work.push((i.to_string(), multiply_async(i, i))); - } - - for (label, res) in LabelPendingTaskIterator(work) { - println!("Finished task {label} -> {res}"); - } -} - fn do_work(multiply: multiply_Scoped, myfn: myfn_Scoped) { for i in 0..10 { let x = multiply.call(i, i); @@ -53,19 +41,17 @@ fn main() { let s = s.spawn(); do_work(multiply, myfn_fn); + s.join().unwrap(); let s = ServiceManager::new().mode(comrade::service::ServiceMode::Decay); let s = myfn_init(s); - let s = multiply_init(s); let s = take_time_init(s); let s = s.spawn(); let x = myfn(55); println!("myfn {x}"); - batch_work(); - // decoupled let e = take_time_async(1500); println!("This will run right after!"); diff --git a/src/cron.rs b/src/cron.rs deleted file mode 100644 index b08f1fd..0000000 --- a/src/cron.rs +++ /dev/null @@ -1,195 +0,0 @@ -use std::{ - sync::{Arc, RwLock}, - thread::JoinHandle, - time::Duration, - u64, -}; - -use chrono::Utc; -use rand::Rng; - -pub enum Schedule { - Every(Duration), - At(chrono::DateTime), -} - -pub struct CronTask { - f: Arc>, - schedule: Schedule, - name: String, - last_run: Option>, -} - -impl CronTask { - pub fn new(name: &str, schedule: Schedule, f: F) -> Self { - Self { - f: Arc::new(Box::new(f)), - schedule, - name: name.to_string(), - last_run: None, - } - } - - pub fn is_absolute(&self) -> bool { - match self.schedule { - Schedule::Every(_) => false, - Schedule::At(_) => true, - } - } - - pub fn run(&mut self) -> JoinHandle<()> { - log::info!("Starting cron task '{}'", self.name); - self.last_run = Some(Utc::now()); - let f = Arc::clone(&self.f); - std::thread::spawn(move || { - f.as_ref()(); - }) - } - - pub fn wait_until(&mut self) -> Duration { - match self.schedule { - Schedule::Every(duration) => { - let now = Utc::now(); - if let Some(last_exec) = self.last_run { - let since_then = (now - last_exec).to_std().unwrap(); - - duration.checked_sub(since_then).unwrap_or(Duration::ZERO) - } else { - self.last_run = Some(Utc::now()); - duration - } - } - Schedule::At(date_time) => { - if self.last_run.is_none() { - let now = Utc::now(); - if let Ok(dur) = date_time.signed_duration_since(&now).to_std() { - dur - } else { - Duration::ZERO - } - } else { - Duration::from_secs(u64::MAX) - } - } - } - } -} - -pub struct Cron { - tasks: RwLock>, -} - -impl Cron { - pub fn new() -> Self { - Self { - tasks: RwLock::new(Vec::new()), - } - } - - pub fn add_task(&self, name: &str, schedule: Schedule, f: F) { - self.tasks - .write() - .unwrap() - .push(CronTask::new(name, schedule, f)); - } - - pub fn run_at(&self, dt: chrono::DateTime, f: F) { - let name = format!("delayed_{}", rand::rng().random_range(1000..9999)); - self.tasks - .write() - .unwrap() - .push(CronTask::new(&name, Schedule::At(dt), f)); - } - - pub fn run(&self) { - loop { - // init - let mut last_wait = Duration::from_secs(u64::MAX); - let mut last_task: Option = None; - - { - // find next task - let mut tasks = self.tasks.write().unwrap(); - for (i, task) in tasks.iter_mut().enumerate() { - let wait_time = task.wait_until(); - if wait_time < last_wait { - last_wait = wait_time; - last_task = Some(i); - } - } - } - - if let Some(index) = last_task { - // init - let mut remove = false; - let mut skip = false; - - // limit longest blocking time (5s) - let real_wait = if last_wait.gt(&Duration::from_secs(5)) { - skip = true; - Duration::from_secs(5) - } else { - last_wait - }; - - { - // logging - let tasks = self.tasks.read().unwrap(); - log::debug!("Managing {} cron task(s)", tasks.len()); - - let task = tasks.get(index).unwrap(); - if real_wait == last_wait { - log::debug!( - "Waiting for {real_wait:?} to start cron task '{}'", - task.name - ); - } else { - log::debug!( - "Would wait for {last_wait:?} to start cron task '{}'. Waiting for {real_wait:?}", - task.name - ); - } - - // if somehow we wait indefinitely - if last_wait == Duration::from_secs(u64::MAX) { - log::warn!("Infinite wait time for cron"); - continue; - } - - // set remove flag for absolute time cron tasks - if task.is_absolute() { - log::info!( - "Removing task '{}' from cron because it will never run again", - task.name - ); - remove = true; - } - } - - // sleep until task - std::thread::sleep(real_wait); - - // skip if we are still just sleeping - if skip { - continue; - } - - { - // run cron task - let mut tasks = self.tasks.write().unwrap(); - let task = tasks.get_mut(index).unwrap(); - let _ = task.run(); - } - - if remove { - { - // remove if requested - let mut tasks = self.tasks.write().unwrap(); - log::info!("Removing cron task #{index}"); - tasks.remove(index); - } - } - } - } - } -} diff --git a/src/defer.rs b/src/defer.rs deleted file mode 100644 index 55e77a3..0000000 --- a/src/defer.rs +++ /dev/null @@ -1,22 +0,0 @@ -use std::mem::take; - -pub struct Defer { - f: Option>, -} - -impl Defer { - pub fn new(f: F) -> Self { - Self { - f: Some(Box::new(f)), - } - } -} - -impl Drop for Defer { - fn drop(&mut self) { - log::debug!("Calling defer function"); - if let Some(f) = take(&mut self.f) { - f(); - } - } -} diff --git a/src/iterated.rs b/src/iterated.rs deleted file mode 100644 index 8e951d3..0000000 --- a/src/iterated.rs +++ /dev/null @@ -1,240 +0,0 @@ -// TODO : docs - -// TODO : measure avg iteration time - -use std::{sync::Arc, time::Duration}; - -use crate::{ - job::{JobDispatch, JobDispatcher}, - retry, -}; - -#[derive(Debug, Clone)] -pub struct FunctionContext { - pub context: C, - pub initial: I, - pub output: Option, - done: bool, -} - -impl FunctionContext { - pub fn new(input: I) -> FunctionContext { - FunctionContext { - context: C::default(), - initial: input, - output: None, - done: false, - } - } -} - -impl FunctionContext { - pub fn state(&mut self, f: F) { - f(&mut self.context); - } - - pub fn done(&mut self, output: O) { - self.done = true; - self.output = Some(output); - } -} - -pub struct RunningIteratedFunction { - output: std::marker::PhantomData, - context: FunctionContext, - f: Arc) -> FunctionContext>>, -} - -impl RunningIteratedFunction { - pub fn next(mut self) -> Self { - let new_ctx = self.f.as_ref()(self.context); - self.context = new_ctx; - return self; - } - - pub fn return_value(&self) -> Option<&O> { - self.context.output.as_ref() - } - - pub fn take_return_value(self) -> O { - self.context.output.unwrap() - } - - pub fn is_done(&self) -> bool { - self.context.done - } -} - -pub struct ThreadRunningIteratedFunction( - JobDispatcher>, -); - -impl - ThreadRunningIteratedFunction -{ - pub fn start(&self) { - let _ = self.0.try_send(IteratedFnQuery::Start); - } - - pub fn pause(&self) { - let _ = self.0.try_send(IteratedFnQuery::Pause); - } - - pub fn stop(&self) -> FunctionContext { - match self.0.try_send(IteratedFnQuery::Stop) { - Some(IteratedFnOutput::Context(function_context)) => function_context, - _ => unreachable!(), - } - } - - pub fn try_output(&self) -> Option { - match self.0.send(IteratedFnQuery::GetOutput) { - IteratedFnOutput::Out(out) => Some(out), - _ => None, - } - } - - pub fn output(&self) -> O { - retry(|| self.try_output()) - } -} - -#[derive(Debug, Clone)] -pub enum IteratedFnQuery { - Pause, - Start, - Stop, - GetOutput, -} - -pub enum IteratedFnOutput { - Out(O), - Context(FunctionContext), - Ok, -} - -pub struct IteratedFunction { - output: std::marker::PhantomData, - f: Arc) -> FunctionContext>>, -} - -impl - IteratedFunction -{ - pub fn new) -> FunctionContext + 'static>( - f: F, - ) -> Self { - Self { - f: Arc::new(Box::new(f)), - output: std::marker::PhantomData, - } - } - - pub fn new_threaded< - F: Fn(FunctionContext) -> FunctionContext + Send + 'static, - >( - f: F, - input: I, - ) -> ThreadRunningIteratedFunction { - Self::new_threaded_from_state(f, FunctionContext::new(input)) - } - - pub fn new_threaded_from_state< - F: Fn(FunctionContext) -> FunctionContext + Send + 'static, - >( - f: F, - context: FunctionContext, - ) -> ThreadRunningIteratedFunction { - let (dispatch, recv) = JobDispatcher::>::new(); - - let _ = std::thread::spawn(move || { - let f = Self::new(f); - let mut f = f.call_with_context(context); - - let mut counter = 0; - let mut sleep = false; - while !f.is_done() { - if sleep { - std::thread::sleep(Duration::from_secs(3)); - } - - if counter == 5 || sleep { - if let Ok(request) = recv.recv_timeout(Duration::from_millis(300)) { - match request.param { - IteratedFnQuery::Pause => { - log::info!("Paused threaded iterative function"); - sleep = true; - } - IteratedFnQuery::Start => { - log::info!("Restarted threaded iterative function"); - sleep = false; - } - IteratedFnQuery::Stop => { - log::info!("Ending threaded iterative function"); - request.done(IteratedFnOutput::Context(f.context)); - return; - } - _ => {} - } - - request.done(IteratedFnOutput::Ok); - } - counter = 0; - } - - if !sleep { - f = f.next(); - } - - counter += 1; - } - - if f.is_done() { - while let Ok(request) = recv.recv() { - match request.param { - IteratedFnQuery::Stop => { - log::warn!("Function was asked to stop but was already done"); - request.done(IteratedFnOutput::Context(f.context.clone())); - } - IteratedFnQuery::GetOutput => { - request.done(IteratedFnOutput::Out(f.context.output.clone().unwrap())); - break; - } - _ => { - request.done(IteratedFnOutput::Out(f.context.output.clone().unwrap())); - } - } - } - } - }); - - ThreadRunningIteratedFunction(dispatch) - } - - pub fn call_with_context( - &self, - ctx: FunctionContext, - ) -> RunningIteratedFunction { - RunningIteratedFunction { - output: std::marker::PhantomData, - context: ctx, - f: self.f.clone(), - } - } - - pub fn call(&self, input: I) -> RunningIteratedFunction { - RunningIteratedFunction { - output: std::marker::PhantomData, - context: FunctionContext::new(input), - f: self.f.clone(), - } - } - - pub fn run_to_end(&self, input: I) -> O { - let mut f = self.call(input); - while !f.is_done() { - f = f.next(); - } - return f.take_return_value(); - } -} diff --git a/src/lib.rs b/src/lib.rs index 957f948..c3abd0c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,18 +1,8 @@ -#![feature(fn_traits)] -use std::{ - sync::mpsc, - thread, - time::{Duration, Instant}, -}; +use std::{sync::mpsc, thread, time::Instant}; -pub mod cron; -mod defer; -pub mod iterated; pub mod job; pub mod service; -pub use defer::Defer; - -pub use comrade_macro::{defer, worker}; +pub use comrade_macro::worker; pub use crossbeam; use dashmap::DashMap; use once_cell::sync::Lazy; @@ -20,6 +10,16 @@ pub use serde_json; // TODO : worker docs + refactor +// TODO : functions which can be stopped, paused, etc +/* +Example: + +let myf = Function::new(|| do_something()); + +// stop fn +myf.stop(); +*/ + pub static UNION: Lazy< DashMap<&'static str, job::JobMultiplexer>, > = Lazy::new(DashMap::new); @@ -63,83 +63,40 @@ where (fastest_item, fastest_result) } -pub fn retry Option>(f: F) -> O { - loop { - match f() { - Some(resp) => { - return resp; - } - None => { - log::info!("Got nothing, retrying..."); - } - } - } -} - -/// Run a background task. -/// -/// This spawns a seperate thread for a background process. -/// The background task is guaranteed to finish within its defined scope. -/// If the end of the scope is reached while the thread is still running it will block. -/// -/// # Example -/// ```ignore -/// use comrade::background; -/// -/// fn do_work() { -/// println!("doing work..."); -/// -/// // spawn background thread -/// background!(|| { -/// println!("doing something in the background"); -/// std::thread::sleep(std::time::Duration::from_secs(3)); -/// }); -/// -/// println!("doing something else..."); -/// -/// // end of scope -/// // the code will block until all background processes defined here are done. -/// } -/// -/// fn main() { -/// do_work(); -/// println!("finished with work"); -/// } -/// ``` -#[macro_export] -macro_rules! background { - ($f:expr) => { - let handle = std::thread::spawn(move || $f()); - comrade::defer!(|| { - handle.join().unwrap(); - }); - }; -} - -/// Start running a function after `duration`. -pub fn delay(duration: std::time::Duration, f: F) { - let _ = std::thread::spawn(move || { - log::info!("Will start running in {duration:?}"); - std::thread::sleep(duration); - f(); - }); -} - -/// Run `f(&T) -> X` for every item in `items` -pub fn parallel(items: Vec, f: F) -> Vec +// TODO : async version +/* +pub fn rally_async(items: Vec, f: F) -> (T, X) where - F: Fn(&T) -> X + Send + Sync + Copy + 'static, + F: AsyncFn(&T) -> X + Send + Sync + Copy + 'static, { - let threads: Vec<_> = items - .into_iter() - .map(|x| std::thread::spawn(move || f(&x))) - .collect(); + let (tx, rx) = mpsc::channel(); + let mut handles = Vec::new(); - threads.into_iter().map(|x| x.join().unwrap()).collect() + for item in items { + let tx = tx.clone(); + let item_ref = item; + let f = f; + + tokio::task::spawn() + + let handle = thread::spawn(move || { + let start = Instant::now(); + let result = f(&item_ref); + let elapsed = start.elapsed(); + let _ = tx.send((item_ref, result, elapsed)); + }); + handles.push(handle); + } + + drop(tx); + + let (fastest_item, fastest_result, _) = rx.recv().unwrap(); + + for handle in handles { + handle.thread().unpark(); + } + + (fastest_item, fastest_result) } -pub fn datetime_in(d: Duration) -> chrono::DateTime { - chrono::Utc::now() - .checked_add_signed(chrono::TimeDelta::from_std(d).unwrap()) - .unwrap() -} + */ diff --git a/src/service.rs b/src/service.rs index 9cf3f37..3c5569f 100644 --- a/src/service.rs +++ b/src/service.rs @@ -8,8 +8,6 @@ use std::{ time::Duration, }; -use crate::cron::Cron; - /// Status receiver of a dead man switch pub struct DeadManReceiver { rx: Receiver, @@ -85,16 +83,6 @@ impl ServiceManager { self } - pub fn register_cron(self, cron: Arc) -> (Self, Arc) { - let cron_ret = Arc::clone(&cron); - ( - self.register("cron", move |_| { - cron.run(); - }), - cron_ret, - ) - } - /// Register a new background service pub fn register () + 'static + Send + Sync>( mut self,