diff --git a/comrade-macro/src/lib.rs b/comrade-macro/src/lib.rs index ac7cb87..654d81d 100644 --- a/comrade-macro/src/lib.rs +++ b/comrade-macro/src/lib.rs @@ -13,6 +13,9 @@ use syn::{FnArg, Ident, ItemFn, Pat, ReturnType, Type, parse_macro_input}; /// - `fn_init_union_scoped(ServiceManager) -> ServiceManager` - Register worker threads and work in a union using Valkey and return a scoped struct. /// - `fn_register_union()` - Register the worker in a union setup without starting local worker threads. /// +/// # Notes +/// Keep in mind that `return` statements are not allowed in a worker function. The return values must be explicitly at the end of the function as an expression. +/// /// # Examples /// ```ignore /// use comrade::worker; @@ -128,7 +131,7 @@ pub fn worker(attr: TokenStream, item: TokenStream) -> TokenStream { } } - fn #wrapper_fn(task: JobOrder) { + fn #wrapper_fn(task: comrade::job::JobOrder) { let i = task.param.clone(); // Deserialize the parameter into the function's expected types @@ -173,14 +176,14 @@ pub fn worker(attr: TokenStream, item: TokenStream) -> TokenStream { } #[doc = "Initialize a worker thread on `ServiceManager`"] - pub fn #init_fn(sm: ServiceManager) -> ServiceManager { + pub fn #init_fn(sm: comrade::service::ServiceManager) -> comrade::service::ServiceManager { let mut dispatchers = Vec::new(); let mut s = sm; log::info!("Initializing worker {} with {} threads", stringify!(#worker_fn), #worker_count); for i in 0..#worker_count { - let (dispatch, recv): (JobDispatcher<_, _>, Receiver>) = JobDispatcher::new(); + let (dispatch, recv): (comrade::job::JobDispatcher<_, _>, comrade::crossbeam::channel::Receiver>) = comrade::job::JobDispatcher::new(); s = s.register( &format!("{}_{i}", stringify!(#worker_fn)), @@ -211,7 +214,7 @@ pub fn worker(attr: TokenStream, item: TokenStream) -> TokenStream { } #[doc = "Initialize worker threads on `ServiceManager` with Valkey backend"] - pub fn #init_fn_union(sm: ServiceManager) -> ServiceManager { + pub fn #init_fn_union(sm: comrade::service::ServiceManager) -> comrade::service::ServiceManager { let mut dispatchers = Vec::new(); let mut s = sm; @@ -254,13 +257,13 @@ pub fn worker(attr: TokenStream, item: TokenStream) -> TokenStream { } #[doc = "Initialize a worker thread on `ServiceManager` on a scoped lifetime"] - pub fn #init_fn_scoped(sm: ServiceManager) -> (ServiceManager, #fn_scope_struct) { + pub fn #init_fn_scoped(sm: comrade::service::ServiceManager) -> (comrade::service::ServiceManager, #fn_scope_struct) { let sm = #init_fn(sm); (sm, #fn_scope_struct {}) } #[doc = "Initialize a worker union on `ServiceManager` on a scoped lifetime"] - pub fn #init_fn_scoped_union(sm: ServiceManager) -> (ServiceManager, #fn_scope_struct) { + pub fn #init_fn_scoped_union(sm: comrade::service::ServiceManager) -> (comrade::service::ServiceManager, #fn_scope_struct) { let sm = #init_fn_union(sm); (sm, #fn_scope_struct {}) } diff --git a/examples/pending.rs b/examples/pending.rs new file mode 100644 index 0000000..29c3353 --- /dev/null +++ b/examples/pending.rs @@ -0,0 +1,30 @@ +use std::time::Duration; + +use comrade::{job::LabelPendingTaskIterator, service::ServiceManager, worker}; + +#[worker(5)] +pub fn wait_work(i: u64) -> u64 { + std::thread::sleep(Duration::from_secs(i)); + i +} + +fn main() { + let mut s = ServiceManager::new().mode(comrade::service::ServiceMode::Decay); + s = wait_work_init(s); + + let s = s.spawn(); + + let mut pending = Vec::new(); + + for i in 0..5 { + pending.push((i, wait_work_async(rand::random_range(0..5)))); + } + + for (res, label) in LabelPendingTaskIterator(pending) { + println!("Got value back {res:?} for {label:?}"); + } + + wait_work_shutdown(); + + s.join().unwrap(); +} diff --git a/src/job.rs b/src/job.rs index 3f3f12c..8c56621 100644 --- a/src/job.rs +++ b/src/job.rs @@ -2,7 +2,7 @@ use crossbeam::channel::{Receiver, Sender}; use rand::Rng; use redis::{Commands, RedisResult}; use serde::{Deserialize, Serialize}; -use std::sync::mpsc; +use std::{sync::mpsc, time::Duration}; pub enum TaskReceiverBackend { Local(Receiver>), @@ -55,6 +55,24 @@ impl Deserialize<'a>> ValkeyTopicSubscriber { Err(_) => None, } } + + pub fn recv_timeout(&self, timeout: std::time::Duration) -> Option { + let mut con = self + .client + .get_connection() + .expect("Failed to connect to Redis"); + + let result: RedisResult> = con.blpop(&self.topic, timeout.as_secs_f64()); + + match result { + Ok(msg) => { + let msg = msg.iter().nth(1).unwrap(); + + Some(serde_json::from_str(&msg).unwrap()) + } + Err(_) => None, + } + } } #[derive(Clone)] @@ -200,6 +218,17 @@ impl Deserialize<'a>> ReceiverBackend { ReceiverBackend::Valkey(valkey) => valkey.recv(), } } + + pub fn recv_timeout(&self) -> Option { + match self { + ReceiverBackend::Local(receiver) => { + receiver.recv_timeout(Duration::from_millis(300)).ok() + } + ReceiverBackend::Valkey(valkey_topic_subscriber) => { + valkey_topic_subscriber.recv_timeout(Duration::from_millis(300)) + } + } + } } pub struct JobResult(ReceiverBackend); @@ -213,6 +242,10 @@ impl Deserialize<'a>> JobResult { pub fn wait_try(self) -> Option { self.0.recv() } + + pub fn wait_timeout(&self) -> Option { + self.0.recv_timeout() + } } impl JobDispatcher { @@ -386,3 +419,94 @@ impl< } } } + +/// Iterator which returns ready results from a `Vec>. +/// +/// This Iterator waits for each `JobResult` with a timeout and yields a result once it finds one finished `JobResult`. +/// +/// # Example +/// ```ignore +/// // Started Tasks which are pending +/// let pending_tasks = vec![...]; +/// +/// for task in pending_tasks { +/// // blocks and waits for the first `JobResult<_>` even though the next ones in the `Vec<_>` could be finished and processed already. +/// let result = task.wait(); +/// // ... +/// } +/// +/// // With the Iterator +/// +/// for value in PendingTaskIterator(pending_tasks) { +/// // You can immidiatelly start processing the first finished result +/// // ... +/// } +/// ``` +pub struct PendingTaskIterator(pub Vec>); + +impl Deserialize<'a>> Iterator for PendingTaskIterator { + type Item = O; + + fn next(&mut self) -> Option { + if self.0.is_empty() { + return None; + } + + loop { + for (i, task) in self.0.iter().enumerate() { + if let Some(res) = task.wait_timeout() { + self.0.remove(i); + return Some(res); + } + } + } + } +} + +/// Iterator which returns ready results from a `Vec> along with a label. +/// +/// Compared to a normal `PendingTaskIterator` this Iterator takes a `Vec<(L, JobResult)>`. +/// You can use the variable `L` for an associated label to correlate results to their origins. +/// +/// This Iterator waits for each `JobResult` with a timeout and yields a result once it finds one finished `JobResult`. +/// +/// # Example +/// ```ignore +/// // Started Tasks which are pending +/// let pending_tasks = vec![...]; +/// +/// for task in pending_tasks { +/// // blocks and waits for the first `JobResult<_>` even though the next ones in the `Vec<_>` could be finished and processed already. +/// let result = task.wait(); +/// // ... +/// } +/// +/// // With the Iterator +/// +/// for value in LabelPendingTaskIterator(pending_tasks) { +/// // You can immidiatelly start processing the first finished result +/// // ... +/// } +/// ``` +pub struct LabelPendingTaskIterator(pub Vec<(L, JobResult)>); + +impl Deserialize<'a>> Iterator for LabelPendingTaskIterator { + type Item = (L, O); + + fn next(&mut self) -> Option { + if self.0.is_empty() { + return None; + } + + loop { + for (i, task) in self.0.iter().enumerate() { + let result = &task.1; + let label = task.0.clone(); + if let Some(res) = result.wait_timeout() { + self.0.remove(i); + return Some((label, res)); + } + } + } + } +} diff --git a/src/lib.rs b/src/lib.rs index e3bb108..c3abd0c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -3,12 +3,23 @@ use std::{sync::mpsc, thread, time::Instant}; pub mod job; pub mod service; pub use comrade_macro::worker; +pub use crossbeam; use dashmap::DashMap; use once_cell::sync::Lazy; pub use serde_json; // TODO : worker docs + refactor +// TODO : functions which can be stopped, paused, etc +/* +Example: + +let myf = Function::new(|| do_something()); + +// stop fn +myf.stop(); +*/ + pub static UNION: Lazy< DashMap<&'static str, job::JobMultiplexer>, > = Lazy::new(DashMap::new); @@ -43,6 +54,7 @@ where let (fastest_item, fastest_result, elapsed) = rx.recv().unwrap(); for handle in handles { + // todo : threads do not get killed here handle.thread().unpark(); }