Compare commits

...

2 commits

Author SHA1 Message Date
46cb21dc2a
cron 2025-03-10 18:43:32 +01:00
18c663fcdb
iterated fn + defer + retry 2025-03-10 11:27:20 +01:00
15 changed files with 1247 additions and 65 deletions

170
Cargo.lock generated
View file

@ -26,6 +26,21 @@ dependencies = [
"memchr",
]
[[package]]
name = "android-tzdata"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e999941b234f3131b00bc13c22d06e8c5ff726d1b6318ac7eb276997bbb4fef0"
[[package]]
name = "android_system_properties"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "819e7219dbd41043ac279b19830f2efc897156490d7fd6ea916720117ee66311"
dependencies = [
"libc",
]
[[package]]
name = "anstream"
version = "0.6.18"
@ -109,6 +124,12 @@ version = "2.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5c8214115b7bf84099f1309324e63141d4c5d7cc26862f97a0a857dbefe165bd"
[[package]]
name = "bumpalo"
version = "3.17.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1628fb46dfa0b37568d12e5edd512553eccf6a22a78e8bde00bb4aed84d5bdbf"
[[package]]
name = "byteorder"
version = "1.5.0"
@ -121,12 +142,35 @@ version = "1.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d71b6127be86fdcfddb610f7182ac57211d4b18a3e9c82eb2d17662f2227ad6a"
[[package]]
name = "cc"
version = "1.2.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "be714c154be609ec7f5dad223a33bf1482fff90472de28f7362806e6d4832b8c"
dependencies = [
"shlex",
]
[[package]]
name = "cfg-if"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "chrono"
version = "0.4.40"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1a7964611d71df112cb1730f2ee67324fcf4d0fc6606acbbe9bfe06df124637c"
dependencies = [
"android-tzdata",
"iana-time-zone",
"js-sys",
"num-traits",
"wasm-bindgen",
"windows-link",
]
[[package]]
name = "colorchoice"
version = "1.0.3"
@ -147,6 +191,7 @@ dependencies = [
name = "comrade"
version = "0.1.0"
dependencies = [
"chrono",
"comrade-macro",
"crossbeam",
"dashmap",
@ -167,10 +212,17 @@ version = "0.1.0"
dependencies = [
"proc-macro2",
"quote",
"rand",
"serde_json",
"syn",
]
[[package]]
name = "core-foundation-sys"
version = "0.8.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b"
[[package]]
name = "crossbeam"
version = "0.8.4"
@ -314,6 +366,29 @@ version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4"
[[package]]
name = "iana-time-zone"
version = "0.1.61"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "235e081f3925a06703c2d0117ea8b91f042756fd6e7a6e5d901e8ca1a996b220"
dependencies = [
"android_system_properties",
"core-foundation-sys",
"iana-time-zone-haiku",
"js-sys",
"wasm-bindgen",
"windows-core",
]
[[package]]
name = "iana-time-zone-haiku"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f31827a206f56af32e590ba56d5d2d085f558508192593743f16b2306495269f"
dependencies = [
"cc",
]
[[package]]
name = "icu_collections"
version = "1.5.0"
@ -465,6 +540,16 @@ version = "1.0.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4a5f13b858c8d314ee3e8f639011f7ccefe71f97f96e50151fb991f267928e2c"
[[package]]
name = "js-sys"
version = "0.3.77"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1cfaf33c695fc6e08064efbc1f72ec937429614f25eef83af942d0e227c3a28f"
dependencies = [
"once_cell",
"wasm-bindgen",
]
[[package]]
name = "libc"
version = "0.2.170"
@ -694,6 +779,12 @@ version = "0.1.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f"
[[package]]
name = "rustversion"
version = "1.0.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eded382c5f5f786b989652c49544c4877d9f015cc22e145a5ea8ea66c2921cd2"
[[package]]
name = "ryu"
version = "1.0.20"
@ -744,6 +835,12 @@ version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bbfa15b3dddfee50a0fff136974b3e1bde555604ba463834a7eb7deb6417705d"
[[package]]
name = "shlex"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64"
[[package]]
name = "smallvec"
version = "1.14.0"
@ -861,6 +958,79 @@ dependencies = [
"wit-bindgen-rt",
]
[[package]]
name = "wasm-bindgen"
version = "0.2.100"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1edc8929d7499fc4e8f0be2262a241556cfc54a0bea223790e71446f2aab1ef5"
dependencies = [
"cfg-if",
"once_cell",
"rustversion",
"wasm-bindgen-macro",
]
[[package]]
name = "wasm-bindgen-backend"
version = "0.2.100"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2f0a0651a5c2bc21487bde11ee802ccaf4c51935d0d3d42a6101f98161700bc6"
dependencies = [
"bumpalo",
"log",
"proc-macro2",
"quote",
"syn",
"wasm-bindgen-shared",
]
[[package]]
name = "wasm-bindgen-macro"
version = "0.2.100"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7fe63fc6d09ed3792bd0897b314f53de8e16568c2b3f7982f468c0bf9bd0b407"
dependencies = [
"quote",
"wasm-bindgen-macro-support",
]
[[package]]
name = "wasm-bindgen-macro-support"
version = "0.2.100"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ae87ea40c9f689fc23f209965b6fb8a99ad69aeeb0231408be24920604395de"
dependencies = [
"proc-macro2",
"quote",
"syn",
"wasm-bindgen-backend",
"wasm-bindgen-shared",
]
[[package]]
name = "wasm-bindgen-shared"
version = "0.2.100"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1a05d73b933a847d6cccdda8f838a22ff101ad9bf93e33684f39c1f5f0eece3d"
dependencies = [
"unicode-ident",
]
[[package]]
name = "windows-core"
version = "0.52.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9"
dependencies = [
"windows-targets",
]
[[package]]
name = "windows-link"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6dccfd733ce2b1753b03b6d3c65edf020262ea35e20ccdf3e288043e6dd620e3"
[[package]]
name = "windows-sys"
version = "0.52.0"

View file

@ -16,3 +16,4 @@ rand = "0.9.0"
redis = "0.29.1"
serde = "1.0.218"
uuid = { version = "1.15.1", features = ["v4"] }
chrono = "0.4.40"

173
README.md
View file

@ -11,37 +11,52 @@
## Core Concepts
### Parallel Execution
### Higher Level Functions
`comrade` provides a simple interface for running tasks in parallel, perfect for independent tasks that can be processed concurrently.
`comrade` provides various convenient functions.
```rust
// process every item in parallel
let results: Vec<i32> = parallel(items, |item: &i32| {
// ...
});
```
### Rally Execution
The `rally` function allows you to run multiple tasks in parallel and return the result of the **first task to finish**. This is useful when you want to prioritize the first available result from several tasks (example: download from multiple HTTP mirrors).
```rust
// rally (return fastest computed result out of items)
// example: run multiple downloads and return the first finished one
let res: (i32, i32) = rally(items, |item: &i32| {
// ...
});
```
### Background Tasks
Easily run tasks in the background without blocking the main thread. This is useful for code that needs to be run without waiting for a result.
```rust
fn handle() {
background(|| {
// Run background tasks without blocking the main thread
background!(|| {
// Background task logic
println!("This is a background task!");
});
fn some_fn() {
println!("Hello World!");
defer!(|| {
// this will run at the end of the scope
println!("Bye World!");
});
println!("doing something");
}
// Retry a `Fn() -> Option<X>` until it returns `Some(_)`.
let value: &str = retry(|| {
if rand::rng().random_range(0..10) > 5 {
Some("hello")
} else {
None
}
})
// Delayed execution
delay(Duration::from_secs(4), || {
println!("I will run in 4 seconds from now on!");
});
```
### Service Management
@ -64,6 +79,54 @@ fn run_services() {
}
```
#### Cron Tasks
The ServiceManager also supports running functions periodically or on time:
```rust
fn main() {
let s = ServiceManager::new();
// Init Cron Manager
let cron = Cron::new();
// Add Cron Task
cron.add_task("4_sec", Schedule::Every(Duration::from_secs(4)), || {
println!("I run every 4 at {}", chrono::Utc::now());
});
cron.add_task("2_sec", Schedule::Every(Duration::from_secs(2)), || {
println!("I run every 2 seconds at {}", chrono::Utc::now());
});
cron.add_task(
"daily",
Schedule::Every(Duration::from_secs(60 * 60 * 24)),
|| {
println!("I run daily");
},
);
// Start running the Cron Manager
let (s, cron) = s.register_cron(cron.into());
let s = s.spawn();
defer!(|| {
s.join().unwrap();
});
// Add another Cron Task after running the manager dynamically
cron.add_task(
"future_task",
Schedule::At(datetime_in(Duration::from_secs(2))),
|| {
println!("I am in the future");
},
);
// Functionally the same as above
cron.run_at(datetime_in(Duration::from_secs(3)), || {
println!("The Future");
});
}
```
### Worker Unions
You can annotate a function with `#[worker]` which gives them superpowers. These functions can be queued and dispatched by the system, and their results are returned when completed.
@ -113,6 +176,21 @@ fn main() {
}
```
You could effeciently run batch work:
```rust
fn batch_work() {
let mut work = Vec::new();
for i in 0..10 {
work.push((i.to_string(), multiply_async(i, i)));
}
for (label, res) in LabelPendingTaskIterator(work) {
println!("Finished task {label} -> {res}");
}
}
```
These tasks can now be distributed with Valkey.
Make sure you have a Valkey server running and the `$VALKEY_URL` environment variable is set for your application:
@ -155,3 +233,66 @@ fn main() {
println!("x is {x}");
}
```
### Stateful Functions
If you have a workload which can be iteratively computed by modifying state it can be modeled as a `IteratedFunction`.
These functions can be paused, stopped, saved to disk and revived later.
First define a iterative function:
```rust
#[derive(Debug, Default, Clone)]
pub struct AccFnContext {
pub iter: u64,
pub acc: u64,
}
pub fn multiply_iterated(
mut ctx: FunctionContext<AccFnContext, (u64, u64), u64>,
) -> FunctionContext<AccFnContext, (u64, u64), u64> {
// init
let (a, b) = ctx.initial;
// end condition (return)
if b == ctx.context.iter {
ctx.done(ctx.context.acc);
return ctx;
}
// computation
println!("doing calc {}", ctx.context.acc);
std::thread::sleep(Duration::from_millis(50));
let val = ctx.context.acc + a;
// saving state
ctx.state(|x| {
x.iter += 1;
x.acc = val;
});
return ctx;
}
```
Then you can use it like:
```rust
fn main() {
// run normally
let f = IteratedFunction::new_threaded(multiply_iterated, (5, 5));
println!("Result is {}", f.output());
// function starts running
let f = IteratedFunction::new_threaded(multiply_iterated, (5, 50));
// pause the function
f.pause();
// stop the function and get state
let state = f.stop();
// revive and start running again from state
let f = IteratedFunction::new_threaded_from_state(multiply_iterated, state);
// get output
println!("Result is {}", f.output());
}
```

172
comrade-macro/Cargo.lock generated
View file

@ -2,28 +2,68 @@
# It is not intended for manual editing.
version = 4
[[package]]
name = "bitflags"
version = "2.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5c8214115b7bf84099f1309324e63141d4c5d7cc26862f97a0a857dbefe165bd"
[[package]]
name = "cfg-if"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "comrade-macro"
version = "0.1.0"
dependencies = [
"proc-macro2",
"quote",
"rand",
"serde_json",
"syn",
]
[[package]]
name = "getrandom"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "43a49c392881ce6d5c3b8cb70f98717b7c07aabbdff06687b9030dbfbe2725f8"
dependencies = [
"cfg-if",
"libc",
"wasi",
"windows-targets",
]
[[package]]
name = "itoa"
version = "1.0.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4a5f13b858c8d314ee3e8f639011f7ccefe71f97f96e50151fb991f267928e2c"
[[package]]
name = "libc"
version = "0.2.170"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "875b3680cb2f8f71bdcf9a30f38d48282f5d3c95cbf9b3fa57269bb5d5c06828"
[[package]]
name = "memchr"
version = "2.7.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3"
[[package]]
name = "ppv-lite86"
version = "0.2.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "85eae3c4ed2f50dcfe72643da4befc30deadb458a9b590d720cde2f2b1e97da9"
dependencies = [
"zerocopy",
]
[[package]]
name = "proc-macro2"
version = "1.0.94"
@ -42,6 +82,36 @@ dependencies = [
"proc-macro2",
]
[[package]]
name = "rand"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3779b94aeb87e8bd4e834cee3650289ee9e0d5677f976ecdb6d219e5f4f6cd94"
dependencies = [
"rand_chacha",
"rand_core",
"zerocopy",
]
[[package]]
name = "rand_chacha"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb"
dependencies = [
"ppv-lite86",
"rand_core",
]
[[package]]
name = "rand_core"
version = "0.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "99d9a13982dcf210057a8a78572b2217b667c3beacbf3a0d8b454f6f82837d38"
dependencies = [
"getrandom",
]
[[package]]
name = "ryu"
version = "1.0.20"
@ -96,3 +166,105 @@ name = "unicode-ident"
version = "1.0.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5a5f39404a5da50712a4c1eecf25e90dd62b613502b7e925fd4e4d19b5c96512"
[[package]]
name = "wasi"
version = "0.13.3+wasi-0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "26816d2e1a4a36a2940b96c5296ce403917633dff8f3440e9b236ed6f6bacad2"
dependencies = [
"wit-bindgen-rt",
]
[[package]]
name = "windows-targets"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9b724f72796e036ab90c1021d4780d4d3d648aca59e491e6b98e725b84e99973"
dependencies = [
"windows_aarch64_gnullvm",
"windows_aarch64_msvc",
"windows_i686_gnu",
"windows_i686_gnullvm",
"windows_i686_msvc",
"windows_x86_64_gnu",
"windows_x86_64_gnullvm",
"windows_x86_64_msvc",
]
[[package]]
name = "windows_aarch64_gnullvm"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3"
[[package]]
name = "windows_aarch64_msvc"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469"
[[package]]
name = "windows_i686_gnu"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e9b5ad5ab802e97eb8e295ac6720e509ee4c243f69d781394014ebfe8bbfa0b"
[[package]]
name = "windows_i686_gnullvm"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66"
[[package]]
name = "windows_i686_msvc"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66"
[[package]]
name = "windows_x86_64_gnu"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78"
[[package]]
name = "windows_x86_64_gnullvm"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d"
[[package]]
name = "windows_x86_64_msvc"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec"
[[package]]
name = "wit-bindgen-rt"
version = "0.33.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3268f3d866458b787f390cf61f4bbb563b922d091359f9608842999eaee3943c"
dependencies = [
"bitflags",
]
[[package]]
name = "zerocopy"
version = "0.8.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fd97444d05a4328b90e75e503a34bad781f14e28a823ad3557f0750df1ebcbc6"
dependencies = [
"zerocopy-derive",
]
[[package]]
name = "zerocopy-derive"
version = "0.8.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6352c01d0edd5db859a63e2605f4ea3183ddbd15e2c4a9e7d32184df75e4f154"
dependencies = [
"proc-macro2",
"quote",
"syn",
]

View file

@ -6,6 +6,7 @@ edition = "2024"
[dependencies]
proc-macro2 = "1.0.94"
quote = "1.0.39"
rand = "0.9.0"
serde_json = "1.0.140"
syn = { version = "2.0.99", features = ["full"] }

View file

@ -1,6 +1,7 @@
use proc_macro::TokenStream;
use quote::{format_ident, quote};
use syn::{FnArg, Ident, ItemFn, Pat, ReturnType, Type, parse_macro_input};
use rand::Rng;
use syn::{ExprClosure, FnArg, Ident, ItemFn, Pat, ReturnType, Type, parse_macro_input};
/// This macro turns this function into a worker.
///
@ -271,3 +272,35 @@ pub fn worker(attr: TokenStream, item: TokenStream) -> TokenStream {
output.into()
}
/// A procedural macro that defers execution of a closure until the surrounding scope ends similiarly to Go's defer statement.
///
/// # Example
/// ```
/// use comrade::defer;
///
/// fn main() {
/// defer!(|| {
/// println!("This will be executed at the end of the scope.");
/// });
/// println!("This runs first.");
/// }
/// ```
///
/// The `defer!` macro ensures that the provided closure is wrapped in a `Defer` instance, which will execute the closure when the variable goes out of scope.
#[proc_macro]
pub fn defer(input: TokenStream) -> TokenStream {
// Parse the input as a closure expression (|| { ... })
let closure = parse_macro_input!(input as ExprClosure);
// Generate a random number for a unique variable name
let random_number = rand::rng().random_range(1000..9999);
let rand_ident = format_ident!("{}", format!("defer_{}", random_number));
// Expand into a let statement that wraps the closure in Defer::new
let expanded = quote! {
let #rand_ident = comrade::Defer::new(#closure);
};
TokenStream::from(expanded)
}

59
examples/cron.rs Normal file
View file

@ -0,0 +1,59 @@
use comrade::{
cron::{Cron, Schedule},
datetime_in, defer, delay,
service::ServiceManager,
};
use std::time::Duration;
fn main() {
env_logger::init();
let s = ServiceManager::new();
// Init Cron Manager
let cron = Cron::new();
// Add Cron Task
cron.add_task("4_sec", Schedule::Every(Duration::from_secs(4)), || {
println!("I run every 4 at {}", chrono::Utc::now());
});
cron.add_task("2_sec", Schedule::Every(Duration::from_secs(2)), || {
println!("I run every 2 seconds at {}", chrono::Utc::now());
});
cron.add_task(
"daily",
Schedule::Every(Duration::from_secs(60 * 60 * 24)),
|| {
println!("I run daily");
},
);
// Start running the Cron Manager
let (s, cron) = s.register_cron(cron.into());
let s = s.spawn();
defer!(|| {
s.join().unwrap();
});
// Add another Cron Task after running the manager dynamically
cron.add_task(
"future_task",
Schedule::At(datetime_in(Duration::from_secs(2))),
|| {
println!("I am in the future");
},
);
// Functionally the same as above
cron.run_at(datetime_in(Duration::from_secs(3)), || {
println!("The Future");
});
// ---
// Delayed execution
delay(Duration::from_secs(4), || {
println!("I will run in 4 seconds from now on!");
});
}

12
examples/defer.rs Normal file
View file

@ -0,0 +1,12 @@
use comrade::defer;
fn main() {
println!("Hello World!");
defer!(|| {
// this will run at the end of the scope
println!("Bye World!");
});
println!("doing something");
}

67
examples/iterated.rs Normal file
View file

@ -0,0 +1,67 @@
use std::time::Duration;
use comrade::iterated::{FunctionContext, IteratedFunction};
#[derive(Debug, Default, Clone)]
pub struct AccFnContext {
pub iter: u64,
pub acc: u64,
}
pub fn multiply_iterated(
mut ctx: FunctionContext<AccFnContext, (u64, u64), u64>,
) -> FunctionContext<AccFnContext, (u64, u64), u64> {
// init
let (a, b) = ctx.initial;
// end condition (return)
if b == ctx.context.iter {
ctx.done(ctx.context.acc);
return ctx;
}
// computation
println!("doing calc {}", ctx.context.acc);
std::thread::sleep(Duration::from_millis(50));
let val = ctx.context.acc + a;
// saving state
ctx.state(|x| {
x.iter += 1;
x.acc = val;
});
return ctx;
}
fn main() {
env_logger::init();
let f: IteratedFunction<AccFnContext, (u64, u64), u64> =
IteratedFunction::new(multiply_iterated);
let x = f.run_to_end((5, 4));
println!("computed x -> {x}");
// async
let f = IteratedFunction::new_threaded(multiply_iterated, (5, 5));
println!("This is running");
println!("result is {}", f.output());
let f = IteratedFunction::new_threaded(multiply_iterated, (5, 50));
// pause the function
f.pause();
// stop the function and get state
let state = f.stop();
println!("This was running");
println!("state is {state:?}");
println!("reviving function");
// continue with previous computed state
let f = IteratedFunction::new_threaded_from_state(multiply_iterated, state);
// get output
println!("result is {}", f.output());
}

View file

@ -1,7 +1,7 @@
use std::time::Duration;
use comrade::{
job::{JobDispatcher, JobOrder},
job::{JobDispatcher, JobOrder, LabelPendingTaskIterator},
service::ServiceManager,
worker,
};
@ -22,6 +22,18 @@ pub fn multiply(a: i32, b: i32) -> i32 {
a * b
}
pub fn batch_work() {
let mut work = Vec::new();
for i in 0..10 {
work.push((i.to_string(), multiply_async(i, i)));
}
for (label, res) in LabelPendingTaskIterator(work) {
println!("Finished task {label} -> {res}");
}
}
fn do_work(multiply: multiply_Scoped, myfn: myfn_Scoped) {
for i in 0..10 {
let x = multiply.call(i, i);
@ -41,17 +53,19 @@ fn main() {
let s = s.spawn();
do_work(multiply, myfn_fn);
s.join().unwrap();
let s = ServiceManager::new().mode(comrade::service::ServiceMode::Decay);
let s = myfn_init(s);
let s = multiply_init(s);
let s = take_time_init(s);
let s = s.spawn();
let x = myfn(55);
println!("myfn {x}");
batch_work();
// decoupled
let e = take_time_async(1500);
println!("This will run right after!");

195
src/cron.rs Normal file
View file

@ -0,0 +1,195 @@
use std::{
sync::{Arc, RwLock},
thread::JoinHandle,
time::Duration,
u64,
};
use chrono::Utc;
use rand::Rng;
pub enum Schedule {
Every(Duration),
At(chrono::DateTime<Utc>),
}
pub struct CronTask {
f: Arc<Box<dyn Fn() + Send + Sync + 'static>>,
schedule: Schedule,
name: String,
last_run: Option<chrono::DateTime<Utc>>,
}
impl CronTask {
pub fn new<F: Fn() + Send + Sync + 'static>(name: &str, schedule: Schedule, f: F) -> Self {
Self {
f: Arc::new(Box::new(f)),
schedule,
name: name.to_string(),
last_run: None,
}
}
pub fn is_absolute(&self) -> bool {
match self.schedule {
Schedule::Every(_) => false,
Schedule::At(_) => true,
}
}
pub fn run(&mut self) -> JoinHandle<()> {
log::info!("Starting cron task '{}'", self.name);
self.last_run = Some(Utc::now());
let f = Arc::clone(&self.f);
std::thread::spawn(move || {
f.as_ref()();
})
}
pub fn wait_until(&mut self) -> Duration {
match self.schedule {
Schedule::Every(duration) => {
let now = Utc::now();
if let Some(last_exec) = self.last_run {
let since_then = (now - last_exec).to_std().unwrap();
duration.checked_sub(since_then).unwrap_or(Duration::ZERO)
} else {
self.last_run = Some(Utc::now());
duration
}
}
Schedule::At(date_time) => {
if self.last_run.is_none() {
let now = Utc::now();
if let Ok(dur) = date_time.signed_duration_since(&now).to_std() {
dur
} else {
Duration::ZERO
}
} else {
Duration::from_secs(u64::MAX)
}
}
}
}
}
pub struct Cron {
tasks: RwLock<Vec<CronTask>>,
}
impl Cron {
pub fn new() -> Self {
Self {
tasks: RwLock::new(Vec::new()),
}
}
pub fn add_task<F: Fn() + Send + Sync + 'static>(&self, name: &str, schedule: Schedule, f: F) {
self.tasks
.write()
.unwrap()
.push(CronTask::new(name, schedule, f));
}
pub fn run_at<F: Fn() + Send + Sync + 'static>(&self, dt: chrono::DateTime<chrono::Utc>, f: F) {
let name = format!("delayed_{}", rand::rng().random_range(1000..9999));
self.tasks
.write()
.unwrap()
.push(CronTask::new(&name, Schedule::At(dt), f));
}
pub fn run(&self) {
loop {
// init
let mut last_wait = Duration::from_secs(u64::MAX);
let mut last_task: Option<usize> = None;
{
// find next task
let mut tasks = self.tasks.write().unwrap();
for (i, task) in tasks.iter_mut().enumerate() {
let wait_time = task.wait_until();
if wait_time < last_wait {
last_wait = wait_time;
last_task = Some(i);
}
}
}
if let Some(index) = last_task {
// init
let mut remove = false;
let mut skip = false;
// limit longest blocking time (5s)
let real_wait = if last_wait.gt(&Duration::from_secs(5)) {
skip = true;
Duration::from_secs(5)
} else {
last_wait
};
{
// logging
let tasks = self.tasks.read().unwrap();
log::debug!("Managing {} cron task(s)", tasks.len());
let task = tasks.get(index).unwrap();
if real_wait == last_wait {
log::debug!(
"Waiting for {real_wait:?} to start cron task '{}'",
task.name
);
} else {
log::debug!(
"Would wait for {last_wait:?} to start cron task '{}'. Waiting for {real_wait:?}",
task.name
);
}
// if somehow we wait indefinitely
if last_wait == Duration::from_secs(u64::MAX) {
log::warn!("Infinite wait time for cron");
continue;
}
// set remove flag for absolute time cron tasks
if task.is_absolute() {
log::info!(
"Removing task '{}' from cron because it will never run again",
task.name
);
remove = true;
}
}
// sleep until task
std::thread::sleep(real_wait);
// skip if we are still just sleeping
if skip {
continue;
}
{
// run cron task
let mut tasks = self.tasks.write().unwrap();
let task = tasks.get_mut(index).unwrap();
let _ = task.run();
}
if remove {
{
// remove if requested
let mut tasks = self.tasks.write().unwrap();
log::info!("Removing cron task #{index}");
tasks.remove(index);
}
}
}
}
}
}

22
src/defer.rs Normal file
View file

@ -0,0 +1,22 @@
use std::mem::take;
pub struct Defer {
f: Option<Box<dyn FnOnce()>>,
}
impl Defer {
pub fn new<F: FnOnce() + 'static>(f: F) -> Self {
Self {
f: Some(Box::new(f)),
}
}
}
impl Drop for Defer {
fn drop(&mut self) {
log::debug!("Calling defer function");
if let Some(f) = take(&mut self.f) {
f();
}
}
}

240
src/iterated.rs Normal file
View file

@ -0,0 +1,240 @@
// TODO : docs
// TODO : measure avg iteration time
use std::{sync::Arc, time::Duration};
use crate::{
job::{JobDispatch, JobDispatcher},
retry,
};
#[derive(Debug, Clone)]
pub struct FunctionContext<C, I, O> {
pub context: C,
pub initial: I,
pub output: Option<O>,
done: bool,
}
impl<C: Default, I, O> FunctionContext<C, I, O> {
pub fn new(input: I) -> FunctionContext<C, I, O> {
FunctionContext {
context: C::default(),
initial: input,
output: None,
done: false,
}
}
}
impl<C, I, O> FunctionContext<C, I, O> {
pub fn state<F: Fn(&mut C)>(&mut self, f: F) {
f(&mut self.context);
}
pub fn done(&mut self, output: O) {
self.done = true;
self.output = Some(output);
}
}
pub struct RunningIteratedFunction<C, I, O> {
output: std::marker::PhantomData<O>,
context: FunctionContext<C, I, O>,
f: Arc<Box<dyn Fn(FunctionContext<C, I, O>) -> FunctionContext<C, I, O>>>,
}
impl<C, I, O> RunningIteratedFunction<C, I, O> {
pub fn next(mut self) -> Self {
let new_ctx = self.f.as_ref()(self.context);
self.context = new_ctx;
return self;
}
pub fn return_value(&self) -> Option<&O> {
self.context.output.as_ref()
}
pub fn take_return_value(self) -> O {
self.context.output.unwrap()
}
pub fn is_done(&self) -> bool {
self.context.done
}
}
pub struct ThreadRunningIteratedFunction<C: Send + 'static, I: Send + 'static, O: Send + 'static>(
JobDispatcher<IteratedFnQuery, IteratedFnOutput<C, I, O>>,
);
impl<C: Send + 'static, I: Send + 'static, O: Send + 'static>
ThreadRunningIteratedFunction<C, I, O>
{
pub fn start(&self) {
let _ = self.0.try_send(IteratedFnQuery::Start);
}
pub fn pause(&self) {
let _ = self.0.try_send(IteratedFnQuery::Pause);
}
pub fn stop(&self) -> FunctionContext<C, I, O> {
match self.0.try_send(IteratedFnQuery::Stop) {
Some(IteratedFnOutput::Context(function_context)) => function_context,
_ => unreachable!(),
}
}
pub fn try_output(&self) -> Option<O> {
match self.0.send(IteratedFnQuery::GetOutput) {
IteratedFnOutput::Out(out) => Some(out),
_ => None,
}
}
pub fn output(&self) -> O {
retry(|| self.try_output())
}
}
#[derive(Debug, Clone)]
pub enum IteratedFnQuery {
Pause,
Start,
Stop,
GetOutput,
}
pub enum IteratedFnOutput<C, I, O> {
Out(O),
Context(FunctionContext<C, I, O>),
Ok,
}
pub struct IteratedFunction<C, I, O> {
output: std::marker::PhantomData<O>,
f: Arc<Box<dyn Fn(FunctionContext<C, I, O>) -> FunctionContext<C, I, O>>>,
}
impl<C: Default + Clone + Send, I: Clone + Send + 'static, O: Send + Clone>
IteratedFunction<C, I, O>
{
pub fn new<F: Fn(FunctionContext<C, I, O>) -> FunctionContext<C, I, O> + 'static>(
f: F,
) -> Self {
Self {
f: Arc::new(Box::new(f)),
output: std::marker::PhantomData,
}
}
pub fn new_threaded<
F: Fn(FunctionContext<C, I, O>) -> FunctionContext<C, I, O> + Send + 'static,
>(
f: F,
input: I,
) -> ThreadRunningIteratedFunction<C, I, O> {
Self::new_threaded_from_state(f, FunctionContext::new(input))
}
pub fn new_threaded_from_state<
F: Fn(FunctionContext<C, I, O>) -> FunctionContext<C, I, O> + Send + 'static,
>(
f: F,
context: FunctionContext<C, I, O>,
) -> ThreadRunningIteratedFunction<C, I, O> {
let (dispatch, recv) = JobDispatcher::<IteratedFnQuery, IteratedFnOutput<C, I, O>>::new();
let _ = std::thread::spawn(move || {
let f = Self::new(f);
let mut f = f.call_with_context(context);
let mut counter = 0;
let mut sleep = false;
while !f.is_done() {
if sleep {
std::thread::sleep(Duration::from_secs(3));
}
if counter == 5 || sleep {
if let Ok(request) = recv.recv_timeout(Duration::from_millis(300)) {
match request.param {
IteratedFnQuery::Pause => {
log::info!("Paused threaded iterative function");
sleep = true;
}
IteratedFnQuery::Start => {
log::info!("Restarted threaded iterative function");
sleep = false;
}
IteratedFnQuery::Stop => {
log::info!("Ending threaded iterative function");
request.done(IteratedFnOutput::Context(f.context));
return;
}
_ => {}
}
request.done(IteratedFnOutput::Ok);
}
counter = 0;
}
if !sleep {
f = f.next();
}
counter += 1;
}
if f.is_done() {
while let Ok(request) = recv.recv() {
match request.param {
IteratedFnQuery::Stop => {
log::warn!("Function was asked to stop but was already done");
request.done(IteratedFnOutput::Context(f.context.clone()));
}
IteratedFnQuery::GetOutput => {
request.done(IteratedFnOutput::Out(f.context.output.clone().unwrap()));
break;
}
_ => {
request.done(IteratedFnOutput::Out(f.context.output.clone().unwrap()));
}
}
}
}
});
ThreadRunningIteratedFunction(dispatch)
}
pub fn call_with_context(
&self,
ctx: FunctionContext<C, I, O>,
) -> RunningIteratedFunction<C, I, O> {
RunningIteratedFunction {
output: std::marker::PhantomData,
context: ctx,
f: self.f.clone(),
}
}
pub fn call(&self, input: I) -> RunningIteratedFunction<C, I, O> {
RunningIteratedFunction {
output: std::marker::PhantomData,
context: FunctionContext::new(input),
f: self.f.clone(),
}
}
pub fn run_to_end(&self, input: I) -> O {
let mut f = self.call(input);
while !f.is_done() {
f = f.next();
}
return f.take_return_value();
}
}

View file

@ -1,8 +1,18 @@
use std::{sync::mpsc, thread, time::Instant};
#![feature(fn_traits)]
use std::{
sync::mpsc,
thread,
time::{Duration, Instant},
};
pub mod cron;
mod defer;
pub mod iterated;
pub mod job;
pub mod service;
pub use comrade_macro::worker;
pub use defer::Defer;
pub use comrade_macro::{defer, worker};
pub use crossbeam;
use dashmap::DashMap;
use once_cell::sync::Lazy;
@ -10,16 +20,6 @@ pub use serde_json;
// TODO : worker docs + refactor
// TODO : functions which can be stopped, paused, etc
/*
Example:
let myf = Function::new(|| do_something());
// stop fn
myf.stop();
*/
pub static UNION: Lazy<
DashMap<&'static str, job::JobMultiplexer<serde_json::Value, serde_json::Value>>,
> = Lazy::new(DashMap::new);
@ -63,40 +63,83 @@ where
(fastest_item, fastest_result)
}
// TODO : async version
/*
pub fn rally_async<T: Send + Sync + 'static, F, X: Send + 'static>(items: Vec<T>, f: F) -> (T, X)
where
F: AsyncFn(&T) -> X + Send + Sync + Copy + 'static,
{
let (tx, rx) = mpsc::channel();
let mut handles = Vec::new();
for item in items {
let tx = tx.clone();
let item_ref = item;
let f = f;
tokio::task::spawn()
let handle = thread::spawn(move || {
let start = Instant::now();
let result = f(&item_ref);
let elapsed = start.elapsed();
let _ = tx.send((item_ref, result, elapsed));
});
handles.push(handle);
pub fn retry<O, F: Fn() -> Option<O>>(f: F) -> O {
loop {
match f() {
Some(resp) => {
return resp;
}
None => {
log::info!("Got nothing, retrying...");
}
}
drop(tx);
let (fastest_item, fastest_result, _) = rx.recv().unwrap();
for handle in handles {
handle.thread().unpark();
}
(fastest_item, fastest_result)
}
*/
/// Run a background task.
///
/// This spawns a seperate thread for a background process.
/// The background task is guaranteed to finish within its defined scope.
/// If the end of the scope is reached while the thread is still running it will block.
///
/// # Example
/// ```ignore
/// use comrade::background;
///
/// fn do_work() {
/// println!("doing work...");
///
/// // spawn background thread
/// background!(|| {
/// println!("doing something in the background");
/// std::thread::sleep(std::time::Duration::from_secs(3));
/// });
///
/// println!("doing something else...");
///
/// // end of scope
/// // the code will block until all background processes defined here are done.
/// }
///
/// fn main() {
/// do_work();
/// println!("finished with work");
/// }
/// ```
#[macro_export]
macro_rules! background {
($f:expr) => {
let handle = std::thread::spawn(move || $f());
comrade::defer!(|| {
handle.join().unwrap();
});
};
}
/// Start running a function after `duration`.
pub fn delay<F: Fn() + Send + 'static>(duration: std::time::Duration, f: F) {
let _ = std::thread::spawn(move || {
log::info!("Will start running in {duration:?}");
std::thread::sleep(duration);
f();
});
}
/// Run `f(&T) -> X` for every item in `items`
pub fn parallel<T: Send + Sync + 'static, F, X: Send + 'static>(items: Vec<T>, f: F) -> Vec<X>
where
F: Fn(&T) -> X + Send + Sync + Copy + 'static,
{
let threads: Vec<_> = items
.into_iter()
.map(|x| std::thread::spawn(move || f(&x)))
.collect();
threads.into_iter().map(|x| x.join().unwrap()).collect()
}
pub fn datetime_in(d: Duration) -> chrono::DateTime<chrono::Utc> {
chrono::Utc::now()
.checked_add_signed(chrono::TimeDelta::from_std(d).unwrap())
.unwrap()
}

View file

@ -8,6 +8,8 @@ use std::{
time::Duration,
};
use crate::cron::Cron;
/// Status receiver of a dead man switch
pub struct DeadManReceiver {
rx: Receiver<bool>,
@ -83,6 +85,16 @@ impl ServiceManager {
self
}
pub fn register_cron(self, cron: Arc<Cron>) -> (Self, Arc<Cron>) {
let cron_ret = Arc::clone(&cron);
(
self.register("cron", move |_| {
cron.run();
}),
cron_ret,
)
}
/// Register a new background service
pub fn register<T: Fn(DeadManSwitch) -> () + 'static + Send + Sync>(
mut self,