Compare commits
No commits in common. "46cb21dc2a11a4397c3a46a26607106303a5f77a" and "c010cc13e8689406efbd7fcedbf36ef6cc5d54f1" have entirely different histories.
46cb21dc2a
...
c010cc13e8
15 changed files with 63 additions and 1245 deletions
170
Cargo.lock
generated
170
Cargo.lock
generated
|
@ -26,21 +26,6 @@ dependencies = [
|
||||||
"memchr",
|
"memchr",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "android-tzdata"
|
|
||||||
version = "0.1.1"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "e999941b234f3131b00bc13c22d06e8c5ff726d1b6318ac7eb276997bbb4fef0"
|
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "android_system_properties"
|
|
||||||
version = "0.1.5"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "819e7219dbd41043ac279b19830f2efc897156490d7fd6ea916720117ee66311"
|
|
||||||
dependencies = [
|
|
||||||
"libc",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "anstream"
|
name = "anstream"
|
||||||
version = "0.6.18"
|
version = "0.6.18"
|
||||||
|
@ -124,12 +109,6 @@ version = "2.9.0"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "5c8214115b7bf84099f1309324e63141d4c5d7cc26862f97a0a857dbefe165bd"
|
checksum = "5c8214115b7bf84099f1309324e63141d4c5d7cc26862f97a0a857dbefe165bd"
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "bumpalo"
|
|
||||||
version = "3.17.0"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "1628fb46dfa0b37568d12e5edd512553eccf6a22a78e8bde00bb4aed84d5bdbf"
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "byteorder"
|
name = "byteorder"
|
||||||
version = "1.5.0"
|
version = "1.5.0"
|
||||||
|
@ -142,35 +121,12 @@ version = "1.10.1"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "d71b6127be86fdcfddb610f7182ac57211d4b18a3e9c82eb2d17662f2227ad6a"
|
checksum = "d71b6127be86fdcfddb610f7182ac57211d4b18a3e9c82eb2d17662f2227ad6a"
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "cc"
|
|
||||||
version = "1.2.16"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "be714c154be609ec7f5dad223a33bf1482fff90472de28f7362806e6d4832b8c"
|
|
||||||
dependencies = [
|
|
||||||
"shlex",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "cfg-if"
|
name = "cfg-if"
|
||||||
version = "1.0.0"
|
version = "1.0.0"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
|
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "chrono"
|
|
||||||
version = "0.4.40"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "1a7964611d71df112cb1730f2ee67324fcf4d0fc6606acbbe9bfe06df124637c"
|
|
||||||
dependencies = [
|
|
||||||
"android-tzdata",
|
|
||||||
"iana-time-zone",
|
|
||||||
"js-sys",
|
|
||||||
"num-traits",
|
|
||||||
"wasm-bindgen",
|
|
||||||
"windows-link",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "colorchoice"
|
name = "colorchoice"
|
||||||
version = "1.0.3"
|
version = "1.0.3"
|
||||||
|
@ -191,7 +147,6 @@ dependencies = [
|
||||||
name = "comrade"
|
name = "comrade"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"chrono",
|
|
||||||
"comrade-macro",
|
"comrade-macro",
|
||||||
"crossbeam",
|
"crossbeam",
|
||||||
"dashmap",
|
"dashmap",
|
||||||
|
@ -212,17 +167,10 @@ version = "0.1.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"proc-macro2",
|
"proc-macro2",
|
||||||
"quote",
|
"quote",
|
||||||
"rand",
|
|
||||||
"serde_json",
|
"serde_json",
|
||||||
"syn",
|
"syn",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "core-foundation-sys"
|
|
||||||
version = "0.8.7"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b"
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "crossbeam"
|
name = "crossbeam"
|
||||||
version = "0.8.4"
|
version = "0.8.4"
|
||||||
|
@ -366,29 +314,6 @@ version = "2.1.0"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4"
|
checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4"
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "iana-time-zone"
|
|
||||||
version = "0.1.61"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "235e081f3925a06703c2d0117ea8b91f042756fd6e7a6e5d901e8ca1a996b220"
|
|
||||||
dependencies = [
|
|
||||||
"android_system_properties",
|
|
||||||
"core-foundation-sys",
|
|
||||||
"iana-time-zone-haiku",
|
|
||||||
"js-sys",
|
|
||||||
"wasm-bindgen",
|
|
||||||
"windows-core",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "iana-time-zone-haiku"
|
|
||||||
version = "0.1.2"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "f31827a206f56af32e590ba56d5d2d085f558508192593743f16b2306495269f"
|
|
||||||
dependencies = [
|
|
||||||
"cc",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "icu_collections"
|
name = "icu_collections"
|
||||||
version = "1.5.0"
|
version = "1.5.0"
|
||||||
|
@ -540,16 +465,6 @@ version = "1.0.15"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "4a5f13b858c8d314ee3e8f639011f7ccefe71f97f96e50151fb991f267928e2c"
|
checksum = "4a5f13b858c8d314ee3e8f639011f7ccefe71f97f96e50151fb991f267928e2c"
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "js-sys"
|
|
||||||
version = "0.3.77"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "1cfaf33c695fc6e08064efbc1f72ec937429614f25eef83af942d0e227c3a28f"
|
|
||||||
dependencies = [
|
|
||||||
"once_cell",
|
|
||||||
"wasm-bindgen",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "libc"
|
name = "libc"
|
||||||
version = "0.2.170"
|
version = "0.2.170"
|
||||||
|
@ -779,12 +694,6 @@ version = "0.1.24"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f"
|
checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f"
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "rustversion"
|
|
||||||
version = "1.0.20"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "eded382c5f5f786b989652c49544c4877d9f015cc22e145a5ea8ea66c2921cd2"
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "ryu"
|
name = "ryu"
|
||||||
version = "1.0.20"
|
version = "1.0.20"
|
||||||
|
@ -835,12 +744,6 @@ version = "1.0.1"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "bbfa15b3dddfee50a0fff136974b3e1bde555604ba463834a7eb7deb6417705d"
|
checksum = "bbfa15b3dddfee50a0fff136974b3e1bde555604ba463834a7eb7deb6417705d"
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "shlex"
|
|
||||||
version = "1.3.0"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64"
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "smallvec"
|
name = "smallvec"
|
||||||
version = "1.14.0"
|
version = "1.14.0"
|
||||||
|
@ -958,79 +861,6 @@ dependencies = [
|
||||||
"wit-bindgen-rt",
|
"wit-bindgen-rt",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "wasm-bindgen"
|
|
||||||
version = "0.2.100"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "1edc8929d7499fc4e8f0be2262a241556cfc54a0bea223790e71446f2aab1ef5"
|
|
||||||
dependencies = [
|
|
||||||
"cfg-if",
|
|
||||||
"once_cell",
|
|
||||||
"rustversion",
|
|
||||||
"wasm-bindgen-macro",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "wasm-bindgen-backend"
|
|
||||||
version = "0.2.100"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "2f0a0651a5c2bc21487bde11ee802ccaf4c51935d0d3d42a6101f98161700bc6"
|
|
||||||
dependencies = [
|
|
||||||
"bumpalo",
|
|
||||||
"log",
|
|
||||||
"proc-macro2",
|
|
||||||
"quote",
|
|
||||||
"syn",
|
|
||||||
"wasm-bindgen-shared",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "wasm-bindgen-macro"
|
|
||||||
version = "0.2.100"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "7fe63fc6d09ed3792bd0897b314f53de8e16568c2b3f7982f468c0bf9bd0b407"
|
|
||||||
dependencies = [
|
|
||||||
"quote",
|
|
||||||
"wasm-bindgen-macro-support",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "wasm-bindgen-macro-support"
|
|
||||||
version = "0.2.100"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "8ae87ea40c9f689fc23f209965b6fb8a99ad69aeeb0231408be24920604395de"
|
|
||||||
dependencies = [
|
|
||||||
"proc-macro2",
|
|
||||||
"quote",
|
|
||||||
"syn",
|
|
||||||
"wasm-bindgen-backend",
|
|
||||||
"wasm-bindgen-shared",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "wasm-bindgen-shared"
|
|
||||||
version = "0.2.100"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "1a05d73b933a847d6cccdda8f838a22ff101ad9bf93e33684f39c1f5f0eece3d"
|
|
||||||
dependencies = [
|
|
||||||
"unicode-ident",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "windows-core"
|
|
||||||
version = "0.52.0"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9"
|
|
||||||
dependencies = [
|
|
||||||
"windows-targets",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "windows-link"
|
|
||||||
version = "0.1.0"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "6dccfd733ce2b1753b03b6d3c65edf020262ea35e20ccdf3e288043e6dd620e3"
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "windows-sys"
|
name = "windows-sys"
|
||||||
version = "0.52.0"
|
version = "0.52.0"
|
||||||
|
|
|
@ -16,4 +16,3 @@ rand = "0.9.0"
|
||||||
redis = "0.29.1"
|
redis = "0.29.1"
|
||||||
serde = "1.0.218"
|
serde = "1.0.218"
|
||||||
uuid = { version = "1.15.1", features = ["v4"] }
|
uuid = { version = "1.15.1", features = ["v4"] }
|
||||||
chrono = "0.4.40"
|
|
||||||
|
|
173
README.md
173
README.md
|
@ -11,52 +11,37 @@
|
||||||
|
|
||||||
## Core Concepts
|
## Core Concepts
|
||||||
|
|
||||||
### Higher Level Functions
|
### Parallel Execution
|
||||||
|
|
||||||
`comrade` provides various convenient functions.
|
`comrade` provides a simple interface for running tasks in parallel, perfect for independent tasks that can be processed concurrently.
|
||||||
|
|
||||||
```rust
|
```rust
|
||||||
// process every item in parallel
|
|
||||||
let results: Vec<i32> = parallel(items, |item: &i32| {
|
let results: Vec<i32> = parallel(items, |item: &i32| {
|
||||||
// ...
|
// ...
|
||||||
});
|
});
|
||||||
|
```
|
||||||
|
|
||||||
// rally (return fastest computed result out of items)
|
### Rally Execution
|
||||||
// example: run multiple downloads and return the first finished one
|
|
||||||
|
The `rally` function allows you to run multiple tasks in parallel and return the result of the **first task to finish**. This is useful when you want to prioritize the first available result from several tasks (example: download from multiple HTTP mirrors).
|
||||||
|
|
||||||
|
```rust
|
||||||
let res: (i32, i32) = rally(items, |item: &i32| {
|
let res: (i32, i32) = rally(items, |item: &i32| {
|
||||||
// ...
|
// ...
|
||||||
});
|
});
|
||||||
|
```
|
||||||
|
|
||||||
// Run background tasks without blocking the main thread
|
### Background Tasks
|
||||||
background!(|| {
|
|
||||||
|
Easily run tasks in the background without blocking the main thread. This is useful for code that needs to be run without waiting for a result.
|
||||||
|
|
||||||
|
```rust
|
||||||
|
fn handle() {
|
||||||
|
background(|| {
|
||||||
// Background task logic
|
// Background task logic
|
||||||
println!("This is a background task!");
|
println!("This is a background task!");
|
||||||
});
|
});
|
||||||
|
|
||||||
fn some_fn() {
|
|
||||||
println!("Hello World!");
|
|
||||||
|
|
||||||
defer!(|| {
|
|
||||||
// this will run at the end of the scope
|
|
||||||
println!("Bye World!");
|
|
||||||
});
|
|
||||||
|
|
||||||
println!("doing something");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Retry a `Fn() -> Option<X>` until it returns `Some(_)`.
|
|
||||||
let value: &str = retry(|| {
|
|
||||||
if rand::rng().random_range(0..10) > 5 {
|
|
||||||
Some("hello")
|
|
||||||
} else {
|
|
||||||
None
|
|
||||||
}
|
|
||||||
})
|
|
||||||
|
|
||||||
// Delayed execution
|
|
||||||
delay(Duration::from_secs(4), || {
|
|
||||||
println!("I will run in 4 seconds from now on!");
|
|
||||||
});
|
|
||||||
```
|
```
|
||||||
|
|
||||||
### Service Management
|
### Service Management
|
||||||
|
@ -79,54 +64,6 @@ fn run_services() {
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
#### Cron Tasks
|
|
||||||
The ServiceManager also supports running functions periodically or on time:
|
|
||||||
```rust
|
|
||||||
fn main() {
|
|
||||||
let s = ServiceManager::new();
|
|
||||||
// Init Cron Manager
|
|
||||||
let cron = Cron::new();
|
|
||||||
|
|
||||||
// Add Cron Task
|
|
||||||
cron.add_task("4_sec", Schedule::Every(Duration::from_secs(4)), || {
|
|
||||||
println!("I run every 4 at {}", chrono::Utc::now());
|
|
||||||
});
|
|
||||||
|
|
||||||
cron.add_task("2_sec", Schedule::Every(Duration::from_secs(2)), || {
|
|
||||||
println!("I run every 2 seconds at {}", chrono::Utc::now());
|
|
||||||
});
|
|
||||||
|
|
||||||
cron.add_task(
|
|
||||||
"daily",
|
|
||||||
Schedule::Every(Duration::from_secs(60 * 60 * 24)),
|
|
||||||
|| {
|
|
||||||
println!("I run daily");
|
|
||||||
},
|
|
||||||
);
|
|
||||||
|
|
||||||
// Start running the Cron Manager
|
|
||||||
let (s, cron) = s.register_cron(cron.into());
|
|
||||||
let s = s.spawn();
|
|
||||||
defer!(|| {
|
|
||||||
s.join().unwrap();
|
|
||||||
});
|
|
||||||
|
|
||||||
// Add another Cron Task after running the manager dynamically
|
|
||||||
cron.add_task(
|
|
||||||
"future_task",
|
|
||||||
Schedule::At(datetime_in(Duration::from_secs(2))),
|
|
||||||
|| {
|
|
||||||
println!("I am in the future");
|
|
||||||
},
|
|
||||||
);
|
|
||||||
|
|
||||||
// Functionally the same as above
|
|
||||||
cron.run_at(datetime_in(Duration::from_secs(3)), || {
|
|
||||||
println!("The Future");
|
|
||||||
});
|
|
||||||
}
|
|
||||||
```
|
|
||||||
|
|
||||||
### Worker Unions
|
### Worker Unions
|
||||||
|
|
||||||
You can annotate a function with `#[worker]` which gives them superpowers. These functions can be queued and dispatched by the system, and their results are returned when completed.
|
You can annotate a function with `#[worker]` which gives them superpowers. These functions can be queued and dispatched by the system, and their results are returned when completed.
|
||||||
|
@ -176,21 +113,6 @@ fn main() {
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
You could effeciently run batch work:
|
|
||||||
```rust
|
|
||||||
fn batch_work() {
|
|
||||||
let mut work = Vec::new();
|
|
||||||
|
|
||||||
for i in 0..10 {
|
|
||||||
work.push((i.to_string(), multiply_async(i, i)));
|
|
||||||
}
|
|
||||||
|
|
||||||
for (label, res) in LabelPendingTaskIterator(work) {
|
|
||||||
println!("Finished task {label} -> {res}");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
```
|
|
||||||
|
|
||||||
These tasks can now be distributed with Valkey.
|
These tasks can now be distributed with Valkey.
|
||||||
|
|
||||||
Make sure you have a Valkey server running and the `$VALKEY_URL` environment variable is set for your application:
|
Make sure you have a Valkey server running and the `$VALKEY_URL` environment variable is set for your application:
|
||||||
|
@ -233,66 +155,3 @@ fn main() {
|
||||||
println!("x is {x}");
|
println!("x is {x}");
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
### Stateful Functions
|
|
||||||
If you have a workload which can be iteratively computed by modifying state it can be modeled as a `IteratedFunction`.
|
|
||||||
These functions can be paused, stopped, saved to disk and revived later.
|
|
||||||
|
|
||||||
First define a iterative function:
|
|
||||||
```rust
|
|
||||||
#[derive(Debug, Default, Clone)]
|
|
||||||
pub struct AccFnContext {
|
|
||||||
pub iter: u64,
|
|
||||||
pub acc: u64,
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn multiply_iterated(
|
|
||||||
mut ctx: FunctionContext<AccFnContext, (u64, u64), u64>,
|
|
||||||
) -> FunctionContext<AccFnContext, (u64, u64), u64> {
|
|
||||||
// init
|
|
||||||
let (a, b) = ctx.initial;
|
|
||||||
|
|
||||||
// end condition (return)
|
|
||||||
if b == ctx.context.iter {
|
|
||||||
ctx.done(ctx.context.acc);
|
|
||||||
return ctx;
|
|
||||||
}
|
|
||||||
|
|
||||||
// computation
|
|
||||||
println!("doing calc {}", ctx.context.acc);
|
|
||||||
std::thread::sleep(Duration::from_millis(50));
|
|
||||||
let val = ctx.context.acc + a;
|
|
||||||
|
|
||||||
// saving state
|
|
||||||
ctx.state(|x| {
|
|
||||||
x.iter += 1;
|
|
||||||
x.acc = val;
|
|
||||||
});
|
|
||||||
|
|
||||||
return ctx;
|
|
||||||
}
|
|
||||||
```
|
|
||||||
|
|
||||||
Then you can use it like:
|
|
||||||
```rust
|
|
||||||
fn main() {
|
|
||||||
// run normally
|
|
||||||
let f = IteratedFunction::new_threaded(multiply_iterated, (5, 5));
|
|
||||||
println!("Result is {}", f.output());
|
|
||||||
|
|
||||||
// function starts running
|
|
||||||
let f = IteratedFunction::new_threaded(multiply_iterated, (5, 50));
|
|
||||||
|
|
||||||
// pause the function
|
|
||||||
f.pause();
|
|
||||||
|
|
||||||
// stop the function and get state
|
|
||||||
let state = f.stop();
|
|
||||||
|
|
||||||
// revive and start running again from state
|
|
||||||
let f = IteratedFunction::new_threaded_from_state(multiply_iterated, state);
|
|
||||||
|
|
||||||
// get output
|
|
||||||
println!("Result is {}", f.output());
|
|
||||||
}
|
|
||||||
```
|
|
||||||
|
|
172
comrade-macro/Cargo.lock
generated
172
comrade-macro/Cargo.lock
generated
|
@ -2,68 +2,28 @@
|
||||||
# It is not intended for manual editing.
|
# It is not intended for manual editing.
|
||||||
version = 4
|
version = 4
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "bitflags"
|
|
||||||
version = "2.9.0"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "5c8214115b7bf84099f1309324e63141d4c5d7cc26862f97a0a857dbefe165bd"
|
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "cfg-if"
|
|
||||||
version = "1.0.0"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "comrade-macro"
|
name = "comrade-macro"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"proc-macro2",
|
"proc-macro2",
|
||||||
"quote",
|
"quote",
|
||||||
"rand",
|
|
||||||
"serde_json",
|
"serde_json",
|
||||||
"syn",
|
"syn",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "getrandom"
|
|
||||||
version = "0.3.1"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "43a49c392881ce6d5c3b8cb70f98717b7c07aabbdff06687b9030dbfbe2725f8"
|
|
||||||
dependencies = [
|
|
||||||
"cfg-if",
|
|
||||||
"libc",
|
|
||||||
"wasi",
|
|
||||||
"windows-targets",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "itoa"
|
name = "itoa"
|
||||||
version = "1.0.15"
|
version = "1.0.15"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "4a5f13b858c8d314ee3e8f639011f7ccefe71f97f96e50151fb991f267928e2c"
|
checksum = "4a5f13b858c8d314ee3e8f639011f7ccefe71f97f96e50151fb991f267928e2c"
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "libc"
|
|
||||||
version = "0.2.170"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "875b3680cb2f8f71bdcf9a30f38d48282f5d3c95cbf9b3fa57269bb5d5c06828"
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "memchr"
|
name = "memchr"
|
||||||
version = "2.7.4"
|
version = "2.7.4"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3"
|
checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3"
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "ppv-lite86"
|
|
||||||
version = "0.2.21"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "85eae3c4ed2f50dcfe72643da4befc30deadb458a9b590d720cde2f2b1e97da9"
|
|
||||||
dependencies = [
|
|
||||||
"zerocopy",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "proc-macro2"
|
name = "proc-macro2"
|
||||||
version = "1.0.94"
|
version = "1.0.94"
|
||||||
|
@ -82,36 +42,6 @@ dependencies = [
|
||||||
"proc-macro2",
|
"proc-macro2",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "rand"
|
|
||||||
version = "0.9.0"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "3779b94aeb87e8bd4e834cee3650289ee9e0d5677f976ecdb6d219e5f4f6cd94"
|
|
||||||
dependencies = [
|
|
||||||
"rand_chacha",
|
|
||||||
"rand_core",
|
|
||||||
"zerocopy",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "rand_chacha"
|
|
||||||
version = "0.9.0"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb"
|
|
||||||
dependencies = [
|
|
||||||
"ppv-lite86",
|
|
||||||
"rand_core",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "rand_core"
|
|
||||||
version = "0.9.3"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "99d9a13982dcf210057a8a78572b2217b667c3beacbf3a0d8b454f6f82837d38"
|
|
||||||
dependencies = [
|
|
||||||
"getrandom",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "ryu"
|
name = "ryu"
|
||||||
version = "1.0.20"
|
version = "1.0.20"
|
||||||
|
@ -166,105 +96,3 @@ name = "unicode-ident"
|
||||||
version = "1.0.18"
|
version = "1.0.18"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "5a5f39404a5da50712a4c1eecf25e90dd62b613502b7e925fd4e4d19b5c96512"
|
checksum = "5a5f39404a5da50712a4c1eecf25e90dd62b613502b7e925fd4e4d19b5c96512"
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "wasi"
|
|
||||||
version = "0.13.3+wasi-0.2.2"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "26816d2e1a4a36a2940b96c5296ce403917633dff8f3440e9b236ed6f6bacad2"
|
|
||||||
dependencies = [
|
|
||||||
"wit-bindgen-rt",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "windows-targets"
|
|
||||||
version = "0.52.6"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "9b724f72796e036ab90c1021d4780d4d3d648aca59e491e6b98e725b84e99973"
|
|
||||||
dependencies = [
|
|
||||||
"windows_aarch64_gnullvm",
|
|
||||||
"windows_aarch64_msvc",
|
|
||||||
"windows_i686_gnu",
|
|
||||||
"windows_i686_gnullvm",
|
|
||||||
"windows_i686_msvc",
|
|
||||||
"windows_x86_64_gnu",
|
|
||||||
"windows_x86_64_gnullvm",
|
|
||||||
"windows_x86_64_msvc",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "windows_aarch64_gnullvm"
|
|
||||||
version = "0.52.6"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3"
|
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "windows_aarch64_msvc"
|
|
||||||
version = "0.52.6"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469"
|
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "windows_i686_gnu"
|
|
||||||
version = "0.52.6"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "8e9b5ad5ab802e97eb8e295ac6720e509ee4c243f69d781394014ebfe8bbfa0b"
|
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "windows_i686_gnullvm"
|
|
||||||
version = "0.52.6"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66"
|
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "windows_i686_msvc"
|
|
||||||
version = "0.52.6"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66"
|
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "windows_x86_64_gnu"
|
|
||||||
version = "0.52.6"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78"
|
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "windows_x86_64_gnullvm"
|
|
||||||
version = "0.52.6"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d"
|
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "windows_x86_64_msvc"
|
|
||||||
version = "0.52.6"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec"
|
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "wit-bindgen-rt"
|
|
||||||
version = "0.33.0"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "3268f3d866458b787f390cf61f4bbb563b922d091359f9608842999eaee3943c"
|
|
||||||
dependencies = [
|
|
||||||
"bitflags",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "zerocopy"
|
|
||||||
version = "0.8.23"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "fd97444d05a4328b90e75e503a34bad781f14e28a823ad3557f0750df1ebcbc6"
|
|
||||||
dependencies = [
|
|
||||||
"zerocopy-derive",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "zerocopy-derive"
|
|
||||||
version = "0.8.23"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "6352c01d0edd5db859a63e2605f4ea3183ddbd15e2c4a9e7d32184df75e4f154"
|
|
||||||
dependencies = [
|
|
||||||
"proc-macro2",
|
|
||||||
"quote",
|
|
||||||
"syn",
|
|
||||||
]
|
|
||||||
|
|
|
@ -6,7 +6,6 @@ edition = "2024"
|
||||||
[dependencies]
|
[dependencies]
|
||||||
proc-macro2 = "1.0.94"
|
proc-macro2 = "1.0.94"
|
||||||
quote = "1.0.39"
|
quote = "1.0.39"
|
||||||
rand = "0.9.0"
|
|
||||||
serde_json = "1.0.140"
|
serde_json = "1.0.140"
|
||||||
syn = { version = "2.0.99", features = ["full"] }
|
syn = { version = "2.0.99", features = ["full"] }
|
||||||
|
|
||||||
|
|
|
@ -1,7 +1,6 @@
|
||||||
use proc_macro::TokenStream;
|
use proc_macro::TokenStream;
|
||||||
use quote::{format_ident, quote};
|
use quote::{format_ident, quote};
|
||||||
use rand::Rng;
|
use syn::{FnArg, Ident, ItemFn, Pat, ReturnType, Type, parse_macro_input};
|
||||||
use syn::{ExprClosure, FnArg, Ident, ItemFn, Pat, ReturnType, Type, parse_macro_input};
|
|
||||||
|
|
||||||
/// This macro turns this function into a worker.
|
/// This macro turns this function into a worker.
|
||||||
///
|
///
|
||||||
|
@ -272,35 +271,3 @@ pub fn worker(attr: TokenStream, item: TokenStream) -> TokenStream {
|
||||||
|
|
||||||
output.into()
|
output.into()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A procedural macro that defers execution of a closure until the surrounding scope ends similiarly to Go's defer statement.
|
|
||||||
///
|
|
||||||
/// # Example
|
|
||||||
/// ```
|
|
||||||
/// use comrade::defer;
|
|
||||||
///
|
|
||||||
/// fn main() {
|
|
||||||
/// defer!(|| {
|
|
||||||
/// println!("This will be executed at the end of the scope.");
|
|
||||||
/// });
|
|
||||||
/// println!("This runs first.");
|
|
||||||
/// }
|
|
||||||
/// ```
|
|
||||||
///
|
|
||||||
/// The `defer!` macro ensures that the provided closure is wrapped in a `Defer` instance, which will execute the closure when the variable goes out of scope.
|
|
||||||
#[proc_macro]
|
|
||||||
pub fn defer(input: TokenStream) -> TokenStream {
|
|
||||||
// Parse the input as a closure expression (|| { ... })
|
|
||||||
let closure = parse_macro_input!(input as ExprClosure);
|
|
||||||
|
|
||||||
// Generate a random number for a unique variable name
|
|
||||||
let random_number = rand::rng().random_range(1000..9999);
|
|
||||||
let rand_ident = format_ident!("{}", format!("defer_{}", random_number));
|
|
||||||
|
|
||||||
// Expand into a let statement that wraps the closure in Defer::new
|
|
||||||
let expanded = quote! {
|
|
||||||
let #rand_ident = comrade::Defer::new(#closure);
|
|
||||||
};
|
|
||||||
|
|
||||||
TokenStream::from(expanded)
|
|
||||||
}
|
|
||||||
|
|
|
@ -1,59 +0,0 @@
|
||||||
use comrade::{
|
|
||||||
cron::{Cron, Schedule},
|
|
||||||
datetime_in, defer, delay,
|
|
||||||
service::ServiceManager,
|
|
||||||
};
|
|
||||||
use std::time::Duration;
|
|
||||||
|
|
||||||
fn main() {
|
|
||||||
env_logger::init();
|
|
||||||
|
|
||||||
let s = ServiceManager::new();
|
|
||||||
// Init Cron Manager
|
|
||||||
let cron = Cron::new();
|
|
||||||
|
|
||||||
// Add Cron Task
|
|
||||||
cron.add_task("4_sec", Schedule::Every(Duration::from_secs(4)), || {
|
|
||||||
println!("I run every 4 at {}", chrono::Utc::now());
|
|
||||||
});
|
|
||||||
|
|
||||||
cron.add_task("2_sec", Schedule::Every(Duration::from_secs(2)), || {
|
|
||||||
println!("I run every 2 seconds at {}", chrono::Utc::now());
|
|
||||||
});
|
|
||||||
|
|
||||||
cron.add_task(
|
|
||||||
"daily",
|
|
||||||
Schedule::Every(Duration::from_secs(60 * 60 * 24)),
|
|
||||||
|| {
|
|
||||||
println!("I run daily");
|
|
||||||
},
|
|
||||||
);
|
|
||||||
|
|
||||||
// Start running the Cron Manager
|
|
||||||
let (s, cron) = s.register_cron(cron.into());
|
|
||||||
let s = s.spawn();
|
|
||||||
defer!(|| {
|
|
||||||
s.join().unwrap();
|
|
||||||
});
|
|
||||||
|
|
||||||
// Add another Cron Task after running the manager dynamically
|
|
||||||
cron.add_task(
|
|
||||||
"future_task",
|
|
||||||
Schedule::At(datetime_in(Duration::from_secs(2))),
|
|
||||||
|| {
|
|
||||||
println!("I am in the future");
|
|
||||||
},
|
|
||||||
);
|
|
||||||
|
|
||||||
// Functionally the same as above
|
|
||||||
cron.run_at(datetime_in(Duration::from_secs(3)), || {
|
|
||||||
println!("The Future");
|
|
||||||
});
|
|
||||||
|
|
||||||
// ---
|
|
||||||
|
|
||||||
// Delayed execution
|
|
||||||
delay(Duration::from_secs(4), || {
|
|
||||||
println!("I will run in 4 seconds from now on!");
|
|
||||||
});
|
|
||||||
}
|
|
|
@ -1,12 +0,0 @@
|
||||||
use comrade::defer;
|
|
||||||
|
|
||||||
fn main() {
|
|
||||||
println!("Hello World!");
|
|
||||||
|
|
||||||
defer!(|| {
|
|
||||||
// this will run at the end of the scope
|
|
||||||
println!("Bye World!");
|
|
||||||
});
|
|
||||||
|
|
||||||
println!("doing something");
|
|
||||||
}
|
|
|
@ -1,67 +0,0 @@
|
||||||
use std::time::Duration;
|
|
||||||
|
|
||||||
use comrade::iterated::{FunctionContext, IteratedFunction};
|
|
||||||
|
|
||||||
#[derive(Debug, Default, Clone)]
|
|
||||||
pub struct AccFnContext {
|
|
||||||
pub iter: u64,
|
|
||||||
pub acc: u64,
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn multiply_iterated(
|
|
||||||
mut ctx: FunctionContext<AccFnContext, (u64, u64), u64>,
|
|
||||||
) -> FunctionContext<AccFnContext, (u64, u64), u64> {
|
|
||||||
// init
|
|
||||||
let (a, b) = ctx.initial;
|
|
||||||
|
|
||||||
// end condition (return)
|
|
||||||
if b == ctx.context.iter {
|
|
||||||
ctx.done(ctx.context.acc);
|
|
||||||
return ctx;
|
|
||||||
}
|
|
||||||
|
|
||||||
// computation
|
|
||||||
println!("doing calc {}", ctx.context.acc);
|
|
||||||
std::thread::sleep(Duration::from_millis(50));
|
|
||||||
let val = ctx.context.acc + a;
|
|
||||||
|
|
||||||
// saving state
|
|
||||||
ctx.state(|x| {
|
|
||||||
x.iter += 1;
|
|
||||||
x.acc = val;
|
|
||||||
});
|
|
||||||
|
|
||||||
return ctx;
|
|
||||||
}
|
|
||||||
|
|
||||||
fn main() {
|
|
||||||
env_logger::init();
|
|
||||||
|
|
||||||
let f: IteratedFunction<AccFnContext, (u64, u64), u64> =
|
|
||||||
IteratedFunction::new(multiply_iterated);
|
|
||||||
|
|
||||||
let x = f.run_to_end((5, 4));
|
|
||||||
println!("computed x -> {x}");
|
|
||||||
|
|
||||||
// async
|
|
||||||
let f = IteratedFunction::new_threaded(multiply_iterated, (5, 5));
|
|
||||||
println!("This is running");
|
|
||||||
println!("result is {}", f.output());
|
|
||||||
|
|
||||||
let f = IteratedFunction::new_threaded(multiply_iterated, (5, 50));
|
|
||||||
|
|
||||||
// pause the function
|
|
||||||
f.pause();
|
|
||||||
|
|
||||||
// stop the function and get state
|
|
||||||
let state = f.stop();
|
|
||||||
println!("This was running");
|
|
||||||
println!("state is {state:?}");
|
|
||||||
|
|
||||||
println!("reviving function");
|
|
||||||
// continue with previous computed state
|
|
||||||
let f = IteratedFunction::new_threaded_from_state(multiply_iterated, state);
|
|
||||||
|
|
||||||
// get output
|
|
||||||
println!("result is {}", f.output());
|
|
||||||
}
|
|
|
@ -1,7 +1,7 @@
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
use comrade::{
|
use comrade::{
|
||||||
job::{JobDispatcher, JobOrder, LabelPendingTaskIterator},
|
job::{JobDispatcher, JobOrder},
|
||||||
service::ServiceManager,
|
service::ServiceManager,
|
||||||
worker,
|
worker,
|
||||||
};
|
};
|
||||||
|
@ -22,18 +22,6 @@ pub fn multiply(a: i32, b: i32) -> i32 {
|
||||||
a * b
|
a * b
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn batch_work() {
|
|
||||||
let mut work = Vec::new();
|
|
||||||
|
|
||||||
for i in 0..10 {
|
|
||||||
work.push((i.to_string(), multiply_async(i, i)));
|
|
||||||
}
|
|
||||||
|
|
||||||
for (label, res) in LabelPendingTaskIterator(work) {
|
|
||||||
println!("Finished task {label} -> {res}");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn do_work(multiply: multiply_Scoped, myfn: myfn_Scoped) {
|
fn do_work(multiply: multiply_Scoped, myfn: myfn_Scoped) {
|
||||||
for i in 0..10 {
|
for i in 0..10 {
|
||||||
let x = multiply.call(i, i);
|
let x = multiply.call(i, i);
|
||||||
|
@ -53,19 +41,17 @@ fn main() {
|
||||||
let s = s.spawn();
|
let s = s.spawn();
|
||||||
|
|
||||||
do_work(multiply, myfn_fn);
|
do_work(multiply, myfn_fn);
|
||||||
|
|
||||||
s.join().unwrap();
|
s.join().unwrap();
|
||||||
|
|
||||||
let s = ServiceManager::new().mode(comrade::service::ServiceMode::Decay);
|
let s = ServiceManager::new().mode(comrade::service::ServiceMode::Decay);
|
||||||
let s = myfn_init(s);
|
let s = myfn_init(s);
|
||||||
let s = multiply_init(s);
|
|
||||||
let s = take_time_init(s);
|
let s = take_time_init(s);
|
||||||
let s = s.spawn();
|
let s = s.spawn();
|
||||||
|
|
||||||
let x = myfn(55);
|
let x = myfn(55);
|
||||||
println!("myfn {x}");
|
println!("myfn {x}");
|
||||||
|
|
||||||
batch_work();
|
|
||||||
|
|
||||||
// decoupled
|
// decoupled
|
||||||
let e = take_time_async(1500);
|
let e = take_time_async(1500);
|
||||||
println!("This will run right after!");
|
println!("This will run right after!");
|
||||||
|
|
195
src/cron.rs
195
src/cron.rs
|
@ -1,195 +0,0 @@
|
||||||
use std::{
|
|
||||||
sync::{Arc, RwLock},
|
|
||||||
thread::JoinHandle,
|
|
||||||
time::Duration,
|
|
||||||
u64,
|
|
||||||
};
|
|
||||||
|
|
||||||
use chrono::Utc;
|
|
||||||
use rand::Rng;
|
|
||||||
|
|
||||||
pub enum Schedule {
|
|
||||||
Every(Duration),
|
|
||||||
At(chrono::DateTime<Utc>),
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct CronTask {
|
|
||||||
f: Arc<Box<dyn Fn() + Send + Sync + 'static>>,
|
|
||||||
schedule: Schedule,
|
|
||||||
name: String,
|
|
||||||
last_run: Option<chrono::DateTime<Utc>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl CronTask {
|
|
||||||
pub fn new<F: Fn() + Send + Sync + 'static>(name: &str, schedule: Schedule, f: F) -> Self {
|
|
||||||
Self {
|
|
||||||
f: Arc::new(Box::new(f)),
|
|
||||||
schedule,
|
|
||||||
name: name.to_string(),
|
|
||||||
last_run: None,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn is_absolute(&self) -> bool {
|
|
||||||
match self.schedule {
|
|
||||||
Schedule::Every(_) => false,
|
|
||||||
Schedule::At(_) => true,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn run(&mut self) -> JoinHandle<()> {
|
|
||||||
log::info!("Starting cron task '{}'", self.name);
|
|
||||||
self.last_run = Some(Utc::now());
|
|
||||||
let f = Arc::clone(&self.f);
|
|
||||||
std::thread::spawn(move || {
|
|
||||||
f.as_ref()();
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn wait_until(&mut self) -> Duration {
|
|
||||||
match self.schedule {
|
|
||||||
Schedule::Every(duration) => {
|
|
||||||
let now = Utc::now();
|
|
||||||
if let Some(last_exec) = self.last_run {
|
|
||||||
let since_then = (now - last_exec).to_std().unwrap();
|
|
||||||
|
|
||||||
duration.checked_sub(since_then).unwrap_or(Duration::ZERO)
|
|
||||||
} else {
|
|
||||||
self.last_run = Some(Utc::now());
|
|
||||||
duration
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Schedule::At(date_time) => {
|
|
||||||
if self.last_run.is_none() {
|
|
||||||
let now = Utc::now();
|
|
||||||
if let Ok(dur) = date_time.signed_duration_since(&now).to_std() {
|
|
||||||
dur
|
|
||||||
} else {
|
|
||||||
Duration::ZERO
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
Duration::from_secs(u64::MAX)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct Cron {
|
|
||||||
tasks: RwLock<Vec<CronTask>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Cron {
|
|
||||||
pub fn new() -> Self {
|
|
||||||
Self {
|
|
||||||
tasks: RwLock::new(Vec::new()),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn add_task<F: Fn() + Send + Sync + 'static>(&self, name: &str, schedule: Schedule, f: F) {
|
|
||||||
self.tasks
|
|
||||||
.write()
|
|
||||||
.unwrap()
|
|
||||||
.push(CronTask::new(name, schedule, f));
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn run_at<F: Fn() + Send + Sync + 'static>(&self, dt: chrono::DateTime<chrono::Utc>, f: F) {
|
|
||||||
let name = format!("delayed_{}", rand::rng().random_range(1000..9999));
|
|
||||||
self.tasks
|
|
||||||
.write()
|
|
||||||
.unwrap()
|
|
||||||
.push(CronTask::new(&name, Schedule::At(dt), f));
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn run(&self) {
|
|
||||||
loop {
|
|
||||||
// init
|
|
||||||
let mut last_wait = Duration::from_secs(u64::MAX);
|
|
||||||
let mut last_task: Option<usize> = None;
|
|
||||||
|
|
||||||
{
|
|
||||||
// find next task
|
|
||||||
let mut tasks = self.tasks.write().unwrap();
|
|
||||||
for (i, task) in tasks.iter_mut().enumerate() {
|
|
||||||
let wait_time = task.wait_until();
|
|
||||||
if wait_time < last_wait {
|
|
||||||
last_wait = wait_time;
|
|
||||||
last_task = Some(i);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if let Some(index) = last_task {
|
|
||||||
// init
|
|
||||||
let mut remove = false;
|
|
||||||
let mut skip = false;
|
|
||||||
|
|
||||||
// limit longest blocking time (5s)
|
|
||||||
let real_wait = if last_wait.gt(&Duration::from_secs(5)) {
|
|
||||||
skip = true;
|
|
||||||
Duration::from_secs(5)
|
|
||||||
} else {
|
|
||||||
last_wait
|
|
||||||
};
|
|
||||||
|
|
||||||
{
|
|
||||||
// logging
|
|
||||||
let tasks = self.tasks.read().unwrap();
|
|
||||||
log::debug!("Managing {} cron task(s)", tasks.len());
|
|
||||||
|
|
||||||
let task = tasks.get(index).unwrap();
|
|
||||||
if real_wait == last_wait {
|
|
||||||
log::debug!(
|
|
||||||
"Waiting for {real_wait:?} to start cron task '{}'",
|
|
||||||
task.name
|
|
||||||
);
|
|
||||||
} else {
|
|
||||||
log::debug!(
|
|
||||||
"Would wait for {last_wait:?} to start cron task '{}'. Waiting for {real_wait:?}",
|
|
||||||
task.name
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
// if somehow we wait indefinitely
|
|
||||||
if last_wait == Duration::from_secs(u64::MAX) {
|
|
||||||
log::warn!("Infinite wait time for cron");
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
// set remove flag for absolute time cron tasks
|
|
||||||
if task.is_absolute() {
|
|
||||||
log::info!(
|
|
||||||
"Removing task '{}' from cron because it will never run again",
|
|
||||||
task.name
|
|
||||||
);
|
|
||||||
remove = true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// sleep until task
|
|
||||||
std::thread::sleep(real_wait);
|
|
||||||
|
|
||||||
// skip if we are still just sleeping
|
|
||||||
if skip {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
{
|
|
||||||
// run cron task
|
|
||||||
let mut tasks = self.tasks.write().unwrap();
|
|
||||||
let task = tasks.get_mut(index).unwrap();
|
|
||||||
let _ = task.run();
|
|
||||||
}
|
|
||||||
|
|
||||||
if remove {
|
|
||||||
{
|
|
||||||
// remove if requested
|
|
||||||
let mut tasks = self.tasks.write().unwrap();
|
|
||||||
log::info!("Removing cron task #{index}");
|
|
||||||
tasks.remove(index);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
22
src/defer.rs
22
src/defer.rs
|
@ -1,22 +0,0 @@
|
||||||
use std::mem::take;
|
|
||||||
|
|
||||||
pub struct Defer {
|
|
||||||
f: Option<Box<dyn FnOnce()>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Defer {
|
|
||||||
pub fn new<F: FnOnce() + 'static>(f: F) -> Self {
|
|
||||||
Self {
|
|
||||||
f: Some(Box::new(f)),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Drop for Defer {
|
|
||||||
fn drop(&mut self) {
|
|
||||||
log::debug!("Calling defer function");
|
|
||||||
if let Some(f) = take(&mut self.f) {
|
|
||||||
f();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
240
src/iterated.rs
240
src/iterated.rs
|
@ -1,240 +0,0 @@
|
||||||
// TODO : docs
|
|
||||||
|
|
||||||
// TODO : measure avg iteration time
|
|
||||||
|
|
||||||
use std::{sync::Arc, time::Duration};
|
|
||||||
|
|
||||||
use crate::{
|
|
||||||
job::{JobDispatch, JobDispatcher},
|
|
||||||
retry,
|
|
||||||
};
|
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
|
||||||
pub struct FunctionContext<C, I, O> {
|
|
||||||
pub context: C,
|
|
||||||
pub initial: I,
|
|
||||||
pub output: Option<O>,
|
|
||||||
done: bool,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<C: Default, I, O> FunctionContext<C, I, O> {
|
|
||||||
pub fn new(input: I) -> FunctionContext<C, I, O> {
|
|
||||||
FunctionContext {
|
|
||||||
context: C::default(),
|
|
||||||
initial: input,
|
|
||||||
output: None,
|
|
||||||
done: false,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<C, I, O> FunctionContext<C, I, O> {
|
|
||||||
pub fn state<F: Fn(&mut C)>(&mut self, f: F) {
|
|
||||||
f(&mut self.context);
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn done(&mut self, output: O) {
|
|
||||||
self.done = true;
|
|
||||||
self.output = Some(output);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct RunningIteratedFunction<C, I, O> {
|
|
||||||
output: std::marker::PhantomData<O>,
|
|
||||||
context: FunctionContext<C, I, O>,
|
|
||||||
f: Arc<Box<dyn Fn(FunctionContext<C, I, O>) -> FunctionContext<C, I, O>>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<C, I, O> RunningIteratedFunction<C, I, O> {
|
|
||||||
pub fn next(mut self) -> Self {
|
|
||||||
let new_ctx = self.f.as_ref()(self.context);
|
|
||||||
self.context = new_ctx;
|
|
||||||
return self;
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn return_value(&self) -> Option<&O> {
|
|
||||||
self.context.output.as_ref()
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn take_return_value(self) -> O {
|
|
||||||
self.context.output.unwrap()
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn is_done(&self) -> bool {
|
|
||||||
self.context.done
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct ThreadRunningIteratedFunction<C: Send + 'static, I: Send + 'static, O: Send + 'static>(
|
|
||||||
JobDispatcher<IteratedFnQuery, IteratedFnOutput<C, I, O>>,
|
|
||||||
);
|
|
||||||
|
|
||||||
impl<C: Send + 'static, I: Send + 'static, O: Send + 'static>
|
|
||||||
ThreadRunningIteratedFunction<C, I, O>
|
|
||||||
{
|
|
||||||
pub fn start(&self) {
|
|
||||||
let _ = self.0.try_send(IteratedFnQuery::Start);
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn pause(&self) {
|
|
||||||
let _ = self.0.try_send(IteratedFnQuery::Pause);
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn stop(&self) -> FunctionContext<C, I, O> {
|
|
||||||
match self.0.try_send(IteratedFnQuery::Stop) {
|
|
||||||
Some(IteratedFnOutput::Context(function_context)) => function_context,
|
|
||||||
_ => unreachable!(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn try_output(&self) -> Option<O> {
|
|
||||||
match self.0.send(IteratedFnQuery::GetOutput) {
|
|
||||||
IteratedFnOutput::Out(out) => Some(out),
|
|
||||||
_ => None,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn output(&self) -> O {
|
|
||||||
retry(|| self.try_output())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
|
||||||
pub enum IteratedFnQuery {
|
|
||||||
Pause,
|
|
||||||
Start,
|
|
||||||
Stop,
|
|
||||||
GetOutput,
|
|
||||||
}
|
|
||||||
|
|
||||||
pub enum IteratedFnOutput<C, I, O> {
|
|
||||||
Out(O),
|
|
||||||
Context(FunctionContext<C, I, O>),
|
|
||||||
Ok,
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct IteratedFunction<C, I, O> {
|
|
||||||
output: std::marker::PhantomData<O>,
|
|
||||||
f: Arc<Box<dyn Fn(FunctionContext<C, I, O>) -> FunctionContext<C, I, O>>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<C: Default + Clone + Send, I: Clone + Send + 'static, O: Send + Clone>
|
|
||||||
IteratedFunction<C, I, O>
|
|
||||||
{
|
|
||||||
pub fn new<F: Fn(FunctionContext<C, I, O>) -> FunctionContext<C, I, O> + 'static>(
|
|
||||||
f: F,
|
|
||||||
) -> Self {
|
|
||||||
Self {
|
|
||||||
f: Arc::new(Box::new(f)),
|
|
||||||
output: std::marker::PhantomData,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn new_threaded<
|
|
||||||
F: Fn(FunctionContext<C, I, O>) -> FunctionContext<C, I, O> + Send + 'static,
|
|
||||||
>(
|
|
||||||
f: F,
|
|
||||||
input: I,
|
|
||||||
) -> ThreadRunningIteratedFunction<C, I, O> {
|
|
||||||
Self::new_threaded_from_state(f, FunctionContext::new(input))
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn new_threaded_from_state<
|
|
||||||
F: Fn(FunctionContext<C, I, O>) -> FunctionContext<C, I, O> + Send + 'static,
|
|
||||||
>(
|
|
||||||
f: F,
|
|
||||||
context: FunctionContext<C, I, O>,
|
|
||||||
) -> ThreadRunningIteratedFunction<C, I, O> {
|
|
||||||
let (dispatch, recv) = JobDispatcher::<IteratedFnQuery, IteratedFnOutput<C, I, O>>::new();
|
|
||||||
|
|
||||||
let _ = std::thread::spawn(move || {
|
|
||||||
let f = Self::new(f);
|
|
||||||
let mut f = f.call_with_context(context);
|
|
||||||
|
|
||||||
let mut counter = 0;
|
|
||||||
let mut sleep = false;
|
|
||||||
while !f.is_done() {
|
|
||||||
if sleep {
|
|
||||||
std::thread::sleep(Duration::from_secs(3));
|
|
||||||
}
|
|
||||||
|
|
||||||
if counter == 5 || sleep {
|
|
||||||
if let Ok(request) = recv.recv_timeout(Duration::from_millis(300)) {
|
|
||||||
match request.param {
|
|
||||||
IteratedFnQuery::Pause => {
|
|
||||||
log::info!("Paused threaded iterative function");
|
|
||||||
sleep = true;
|
|
||||||
}
|
|
||||||
IteratedFnQuery::Start => {
|
|
||||||
log::info!("Restarted threaded iterative function");
|
|
||||||
sleep = false;
|
|
||||||
}
|
|
||||||
IteratedFnQuery::Stop => {
|
|
||||||
log::info!("Ending threaded iterative function");
|
|
||||||
request.done(IteratedFnOutput::Context(f.context));
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
_ => {}
|
|
||||||
}
|
|
||||||
|
|
||||||
request.done(IteratedFnOutput::Ok);
|
|
||||||
}
|
|
||||||
counter = 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
if !sleep {
|
|
||||||
f = f.next();
|
|
||||||
}
|
|
||||||
|
|
||||||
counter += 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if f.is_done() {
|
|
||||||
while let Ok(request) = recv.recv() {
|
|
||||||
match request.param {
|
|
||||||
IteratedFnQuery::Stop => {
|
|
||||||
log::warn!("Function was asked to stop but was already done");
|
|
||||||
request.done(IteratedFnOutput::Context(f.context.clone()));
|
|
||||||
}
|
|
||||||
IteratedFnQuery::GetOutput => {
|
|
||||||
request.done(IteratedFnOutput::Out(f.context.output.clone().unwrap()));
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
_ => {
|
|
||||||
request.done(IteratedFnOutput::Out(f.context.output.clone().unwrap()));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
ThreadRunningIteratedFunction(dispatch)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn call_with_context(
|
|
||||||
&self,
|
|
||||||
ctx: FunctionContext<C, I, O>,
|
|
||||||
) -> RunningIteratedFunction<C, I, O> {
|
|
||||||
RunningIteratedFunction {
|
|
||||||
output: std::marker::PhantomData,
|
|
||||||
context: ctx,
|
|
||||||
f: self.f.clone(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn call(&self, input: I) -> RunningIteratedFunction<C, I, O> {
|
|
||||||
RunningIteratedFunction {
|
|
||||||
output: std::marker::PhantomData,
|
|
||||||
context: FunctionContext::new(input),
|
|
||||||
f: self.f.clone(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn run_to_end(&self, input: I) -> O {
|
|
||||||
let mut f = self.call(input);
|
|
||||||
while !f.is_done() {
|
|
||||||
f = f.next();
|
|
||||||
}
|
|
||||||
return f.take_return_value();
|
|
||||||
}
|
|
||||||
}
|
|
129
src/lib.rs
129
src/lib.rs
|
@ -1,18 +1,8 @@
|
||||||
#![feature(fn_traits)]
|
use std::{sync::mpsc, thread, time::Instant};
|
||||||
use std::{
|
|
||||||
sync::mpsc,
|
|
||||||
thread,
|
|
||||||
time::{Duration, Instant},
|
|
||||||
};
|
|
||||||
|
|
||||||
pub mod cron;
|
|
||||||
mod defer;
|
|
||||||
pub mod iterated;
|
|
||||||
pub mod job;
|
pub mod job;
|
||||||
pub mod service;
|
pub mod service;
|
||||||
pub use defer::Defer;
|
pub use comrade_macro::worker;
|
||||||
|
|
||||||
pub use comrade_macro::{defer, worker};
|
|
||||||
pub use crossbeam;
|
pub use crossbeam;
|
||||||
use dashmap::DashMap;
|
use dashmap::DashMap;
|
||||||
use once_cell::sync::Lazy;
|
use once_cell::sync::Lazy;
|
||||||
|
@ -20,6 +10,16 @@ pub use serde_json;
|
||||||
|
|
||||||
// TODO : worker docs + refactor
|
// TODO : worker docs + refactor
|
||||||
|
|
||||||
|
// TODO : functions which can be stopped, paused, etc
|
||||||
|
/*
|
||||||
|
Example:
|
||||||
|
|
||||||
|
let myf = Function::new(|| do_something());
|
||||||
|
|
||||||
|
// stop fn
|
||||||
|
myf.stop();
|
||||||
|
*/
|
||||||
|
|
||||||
pub static UNION: Lazy<
|
pub static UNION: Lazy<
|
||||||
DashMap<&'static str, job::JobMultiplexer<serde_json::Value, serde_json::Value>>,
|
DashMap<&'static str, job::JobMultiplexer<serde_json::Value, serde_json::Value>>,
|
||||||
> = Lazy::new(DashMap::new);
|
> = Lazy::new(DashMap::new);
|
||||||
|
@ -63,83 +63,40 @@ where
|
||||||
(fastest_item, fastest_result)
|
(fastest_item, fastest_result)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn retry<O, F: Fn() -> Option<O>>(f: F) -> O {
|
// TODO : async version
|
||||||
loop {
|
/*
|
||||||
match f() {
|
pub fn rally_async<T: Send + Sync + 'static, F, X: Send + 'static>(items: Vec<T>, f: F) -> (T, X)
|
||||||
Some(resp) => {
|
|
||||||
return resp;
|
|
||||||
}
|
|
||||||
None => {
|
|
||||||
log::info!("Got nothing, retrying...");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Run a background task.
|
|
||||||
///
|
|
||||||
/// This spawns a seperate thread for a background process.
|
|
||||||
/// The background task is guaranteed to finish within its defined scope.
|
|
||||||
/// If the end of the scope is reached while the thread is still running it will block.
|
|
||||||
///
|
|
||||||
/// # Example
|
|
||||||
/// ```ignore
|
|
||||||
/// use comrade::background;
|
|
||||||
///
|
|
||||||
/// fn do_work() {
|
|
||||||
/// println!("doing work...");
|
|
||||||
///
|
|
||||||
/// // spawn background thread
|
|
||||||
/// background!(|| {
|
|
||||||
/// println!("doing something in the background");
|
|
||||||
/// std::thread::sleep(std::time::Duration::from_secs(3));
|
|
||||||
/// });
|
|
||||||
///
|
|
||||||
/// println!("doing something else...");
|
|
||||||
///
|
|
||||||
/// // end of scope
|
|
||||||
/// // the code will block until all background processes defined here are done.
|
|
||||||
/// }
|
|
||||||
///
|
|
||||||
/// fn main() {
|
|
||||||
/// do_work();
|
|
||||||
/// println!("finished with work");
|
|
||||||
/// }
|
|
||||||
/// ```
|
|
||||||
#[macro_export]
|
|
||||||
macro_rules! background {
|
|
||||||
($f:expr) => {
|
|
||||||
let handle = std::thread::spawn(move || $f());
|
|
||||||
comrade::defer!(|| {
|
|
||||||
handle.join().unwrap();
|
|
||||||
});
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Start running a function after `duration`.
|
|
||||||
pub fn delay<F: Fn() + Send + 'static>(duration: std::time::Duration, f: F) {
|
|
||||||
let _ = std::thread::spawn(move || {
|
|
||||||
log::info!("Will start running in {duration:?}");
|
|
||||||
std::thread::sleep(duration);
|
|
||||||
f();
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Run `f(&T) -> X` for every item in `items`
|
|
||||||
pub fn parallel<T: Send + Sync + 'static, F, X: Send + 'static>(items: Vec<T>, f: F) -> Vec<X>
|
|
||||||
where
|
where
|
||||||
F: Fn(&T) -> X + Send + Sync + Copy + 'static,
|
F: AsyncFn(&T) -> X + Send + Sync + Copy + 'static,
|
||||||
{
|
{
|
||||||
let threads: Vec<_> = items
|
let (tx, rx) = mpsc::channel();
|
||||||
.into_iter()
|
let mut handles = Vec::new();
|
||||||
.map(|x| std::thread::spawn(move || f(&x)))
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
threads.into_iter().map(|x| x.join().unwrap()).collect()
|
for item in items {
|
||||||
|
let tx = tx.clone();
|
||||||
|
let item_ref = item;
|
||||||
|
let f = f;
|
||||||
|
|
||||||
|
tokio::task::spawn()
|
||||||
|
|
||||||
|
let handle = thread::spawn(move || {
|
||||||
|
let start = Instant::now();
|
||||||
|
let result = f(&item_ref);
|
||||||
|
let elapsed = start.elapsed();
|
||||||
|
let _ = tx.send((item_ref, result, elapsed));
|
||||||
|
});
|
||||||
|
handles.push(handle);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn datetime_in(d: Duration) -> chrono::DateTime<chrono::Utc> {
|
drop(tx);
|
||||||
chrono::Utc::now()
|
|
||||||
.checked_add_signed(chrono::TimeDelta::from_std(d).unwrap())
|
let (fastest_item, fastest_result, _) = rx.recv().unwrap();
|
||||||
.unwrap()
|
|
||||||
|
for handle in handles {
|
||||||
|
handle.thread().unpark();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
(fastest_item, fastest_result)
|
||||||
|
}
|
||||||
|
|
||||||
|
*/
|
||||||
|
|
|
@ -8,8 +8,6 @@ use std::{
|
||||||
time::Duration,
|
time::Duration,
|
||||||
};
|
};
|
||||||
|
|
||||||
use crate::cron::Cron;
|
|
||||||
|
|
||||||
/// Status receiver of a dead man switch
|
/// Status receiver of a dead man switch
|
||||||
pub struct DeadManReceiver {
|
pub struct DeadManReceiver {
|
||||||
rx: Receiver<bool>,
|
rx: Receiver<bool>,
|
||||||
|
@ -85,16 +83,6 @@ impl ServiceManager {
|
||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn register_cron(self, cron: Arc<Cron>) -> (Self, Arc<Cron>) {
|
|
||||||
let cron_ret = Arc::clone(&cron);
|
|
||||||
(
|
|
||||||
self.register("cron", move |_| {
|
|
||||||
cron.run();
|
|
||||||
}),
|
|
||||||
cron_ret,
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Register a new background service
|
/// Register a new background service
|
||||||
pub fn register<T: Fn(DeadManSwitch) -> () + 'static + Send + Sync>(
|
pub fn register<T: Fn(DeadManSwitch) -> () + 'static + Send + Sync>(
|
||||||
mut self,
|
mut self,
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue