pending collector iterators

This commit is contained in:
JMARyA 2025-03-09 06:01:38 +01:00
parent 00e448ac31
commit b043db3f4e
Signed by: jmarya
GPG key ID: 901B2ADDF27C2263
4 changed files with 176 additions and 7 deletions

View file

@ -13,6 +13,9 @@ use syn::{FnArg, Ident, ItemFn, Pat, ReturnType, Type, parse_macro_input};
/// - `fn_init_union_scoped(ServiceManager) -> ServiceManager` - Register worker threads and work in a union using Valkey and return a scoped struct.
/// - `fn_register_union()` - Register the worker in a union setup without starting local worker threads.
///
/// # Notes
/// Keep in mind that `return` statements are not allowed in a worker function. The return values must be explicitly at the end of the function as an expression.
///
/// # Examples
/// ```ignore
/// use comrade::worker;
@ -128,7 +131,7 @@ pub fn worker(attr: TokenStream, item: TokenStream) -> TokenStream {
}
}
fn #wrapper_fn(task: JobOrder<comrade::serde_json::Value, comrade::serde_json::Value>) {
fn #wrapper_fn(task: comrade::job::JobOrder<comrade::serde_json::Value, comrade::serde_json::Value>) {
let i = task.param.clone();
// Deserialize the parameter into the function's expected types
@ -173,14 +176,14 @@ pub fn worker(attr: TokenStream, item: TokenStream) -> TokenStream {
}
#[doc = "Initialize a worker thread on `ServiceManager`"]
pub fn #init_fn(sm: ServiceManager) -> ServiceManager {
pub fn #init_fn(sm: comrade::service::ServiceManager) -> comrade::service::ServiceManager {
let mut dispatchers = Vec::new();
let mut s = sm;
log::info!("Initializing worker {} with {} threads", stringify!(#worker_fn), #worker_count);
for i in 0..#worker_count {
let (dispatch, recv): (JobDispatcher<_, _>, Receiver<JobOrder<_, _>>) = JobDispatcher::new();
let (dispatch, recv): (comrade::job::JobDispatcher<_, _>, comrade::crossbeam::channel::Receiver<comrade::job::JobOrder<_, _>>) = comrade::job::JobDispatcher::new();
s = s.register(
&format!("{}_{i}", stringify!(#worker_fn)),
@ -211,7 +214,7 @@ pub fn worker(attr: TokenStream, item: TokenStream) -> TokenStream {
}
#[doc = "Initialize worker threads on `ServiceManager` with Valkey backend"]
pub fn #init_fn_union(sm: ServiceManager) -> ServiceManager {
pub fn #init_fn_union(sm: comrade::service::ServiceManager) -> comrade::service::ServiceManager {
let mut dispatchers = Vec::new();
let mut s = sm;
@ -254,13 +257,13 @@ pub fn worker(attr: TokenStream, item: TokenStream) -> TokenStream {
}
#[doc = "Initialize a worker thread on `ServiceManager` on a scoped lifetime"]
pub fn #init_fn_scoped(sm: ServiceManager) -> (ServiceManager, #fn_scope_struct) {
pub fn #init_fn_scoped(sm: comrade::service::ServiceManager) -> (comrade::service::ServiceManager, #fn_scope_struct) {
let sm = #init_fn(sm);
(sm, #fn_scope_struct {})
}
#[doc = "Initialize a worker union on `ServiceManager` on a scoped lifetime"]
pub fn #init_fn_scoped_union(sm: ServiceManager) -> (ServiceManager, #fn_scope_struct) {
pub fn #init_fn_scoped_union(sm: comrade::service::ServiceManager) -> (comrade::service::ServiceManager, #fn_scope_struct) {
let sm = #init_fn_union(sm);
(sm, #fn_scope_struct {})
}

30
examples/pending.rs Normal file
View file

@ -0,0 +1,30 @@
use std::time::Duration;
use comrade::{job::LabelPendingTaskIterator, service::ServiceManager, worker};
#[worker(5)]
pub fn wait_work(i: u64) -> u64 {
std::thread::sleep(Duration::from_secs(i));
i
}
fn main() {
let mut s = ServiceManager::new().mode(comrade::service::ServiceMode::Decay);
s = wait_work_init(s);
let s = s.spawn();
let mut pending = Vec::new();
for i in 0..5 {
pending.push((i, wait_work_async(rand::random_range(0..5))));
}
for (res, label) in LabelPendingTaskIterator(pending) {
println!("Got value back {res:?} for {label:?}");
}
wait_work_shutdown();
s.join().unwrap();
}

View file

@ -2,7 +2,7 @@ use crossbeam::channel::{Receiver, Sender};
use rand::Rng;
use redis::{Commands, RedisResult};
use serde::{Deserialize, Serialize};
use std::sync::mpsc;
use std::{sync::mpsc, time::Duration};
pub enum TaskReceiverBackend<I, O> {
Local(Receiver<JobOrder<I, O>>),
@ -55,6 +55,24 @@ impl<O: for<'a> Deserialize<'a>> ValkeyTopicSubscriber<O> {
Err(_) => None,
}
}
pub fn recv_timeout(&self, timeout: std::time::Duration) -> Option<O> {
let mut con = self
.client
.get_connection()
.expect("Failed to connect to Redis");
let result: RedisResult<Vec<String>> = con.blpop(&self.topic, timeout.as_secs_f64());
match result {
Ok(msg) => {
let msg = msg.iter().nth(1).unwrap();
Some(serde_json::from_str(&msg).unwrap())
}
Err(_) => None,
}
}
}
#[derive(Clone)]
@ -200,6 +218,17 @@ impl<O: for<'a> Deserialize<'a>> ReceiverBackend<O> {
ReceiverBackend::Valkey(valkey) => valkey.recv(),
}
}
pub fn recv_timeout(&self) -> Option<O> {
match self {
ReceiverBackend::Local(receiver) => {
receiver.recv_timeout(Duration::from_millis(300)).ok()
}
ReceiverBackend::Valkey(valkey_topic_subscriber) => {
valkey_topic_subscriber.recv_timeout(Duration::from_millis(300))
}
}
}
}
pub struct JobResult<O>(ReceiverBackend<O>);
@ -213,6 +242,10 @@ impl<O: for<'a> Deserialize<'a>> JobResult<O> {
pub fn wait_try(self) -> Option<O> {
self.0.recv()
}
pub fn wait_timeout(&self) -> Option<O> {
self.0.recv_timeout()
}
}
impl<I: Send + 'static, O: Send + 'static> JobDispatcher<I, O> {
@ -386,3 +419,94 @@ impl<
}
}
}
/// Iterator which returns ready results from a `Vec<JobResult<O>>.
///
/// This Iterator waits for each `JobResult<O>` with a timeout and yields a result once it finds one finished `JobResult<O>`.
///
/// # Example
/// ```ignore
/// // Started Tasks which are pending
/// let pending_tasks = vec![...];
///
/// for task in pending_tasks {
/// // blocks and waits for the first `JobResult<_>` even though the next ones in the `Vec<_>` could be finished and processed already.
/// let result = task.wait();
/// // ...
/// }
///
/// // With the Iterator
///
/// for value in PendingTaskIterator(pending_tasks) {
/// // You can immidiatelly start processing the first finished result
/// // ...
/// }
/// ```
pub struct PendingTaskIterator<O>(pub Vec<JobResult<O>>);
impl<O: for<'a> Deserialize<'a>> Iterator for PendingTaskIterator<O> {
type Item = O;
fn next(&mut self) -> Option<Self::Item> {
if self.0.is_empty() {
return None;
}
loop {
for (i, task) in self.0.iter().enumerate() {
if let Some(res) = task.wait_timeout() {
self.0.remove(i);
return Some(res);
}
}
}
}
}
/// Iterator which returns ready results from a `Vec<JobResult<O>> along with a label.
///
/// Compared to a normal `PendingTaskIterator<O>` this Iterator takes a `Vec<(L, JobResult<I, O>)>`.
/// You can use the variable `L` for an associated label to correlate results to their origins.
///
/// This Iterator waits for each `JobResult<O>` with a timeout and yields a result once it finds one finished `JobResult<O>`.
///
/// # Example
/// ```ignore
/// // Started Tasks which are pending
/// let pending_tasks = vec![...];
///
/// for task in pending_tasks {
/// // blocks and waits for the first `JobResult<_>` even though the next ones in the `Vec<_>` could be finished and processed already.
/// let result = task.wait();
/// // ...
/// }
///
/// // With the Iterator
///
/// for value in LabelPendingTaskIterator(pending_tasks) {
/// // You can immidiatelly start processing the first finished result
/// // ...
/// }
/// ```
pub struct LabelPendingTaskIterator<L, O>(pub Vec<(L, JobResult<O>)>);
impl<L: Clone, O: for<'a> Deserialize<'a>> Iterator for LabelPendingTaskIterator<L, O> {
type Item = (L, O);
fn next(&mut self) -> Option<Self::Item> {
if self.0.is_empty() {
return None;
}
loop {
for (i, task) in self.0.iter().enumerate() {
let result = &task.1;
let label = task.0.clone();
if let Some(res) = result.wait_timeout() {
self.0.remove(i);
return Some((label, res));
}
}
}
}
}

View file

@ -3,12 +3,23 @@ use std::{sync::mpsc, thread, time::Instant};
pub mod job;
pub mod service;
pub use comrade_macro::worker;
pub use crossbeam;
use dashmap::DashMap;
use once_cell::sync::Lazy;
pub use serde_json;
// TODO : worker docs + refactor
// TODO : functions which can be stopped, paused, etc
/*
Example:
let myf = Function::new(|| do_something());
// stop fn
myf.stop();
*/
pub static UNION: Lazy<
DashMap<&'static str, job::JobMultiplexer<serde_json::Value, serde_json::Value>>,
> = Lazy::new(DashMap::new);
@ -43,6 +54,7 @@ where
let (fastest_item, fastest_result, elapsed) = rx.recv().unwrap();
for handle in handles {
// todo : threads do not get killed here
handle.thread().unpark();
}