use crossbeam::channel::{Receiver, Sender}; use rand::Rng; use redis::{Commands, RedisResult}; use serde::{Deserialize, Serialize}; use std::{sync::mpsc, time::Duration}; pub enum TaskReceiverBackend { Local(Receiver>), Union(ValkeyJobDispatcher), } impl Deserialize<'a>, O: Serialize + for<'a> Deserialize<'a>> TaskReceiverBackend { pub fn recv(&self) -> Result, String> { match self { TaskReceiverBackend::Local(receiver) => receiver.recv().map_err(|x| x.to_string()), TaskReceiverBackend::Union(valkey_job_dispatcher) => valkey_job_dispatcher.recv(), } } } pub struct ValkeyTopicSubscriber { output: std::marker::PhantomData, topic: String, client: redis::Client, } impl Deserialize<'a>> ValkeyTopicSubscriber { pub fn new(channel: &str) -> Self { let client = redis::Client::open(std::env::var("VALKEY_URL").expect("No $VALKEY_URL variable set")) .expect("Invalid Redis URL"); Self { output: std::marker::PhantomData, topic: channel.to_string(), client: client, } } pub fn recv(&self) -> Option { let mut con = self .client .get_connection() .expect("Failed to connect to Redis"); let result: RedisResult> = con.blpop(&self.topic, 0.0); match result { Ok(msg) => { let msg = msg.iter().nth(1).unwrap(); Some(serde_json::from_str(&msg).unwrap()) } Err(_) => None, } } pub fn recv_timeout(&self, timeout: std::time::Duration) -> Option { let mut con = self .client .get_connection() .expect("Failed to connect to Redis"); let result: RedisResult> = con.blpop(&self.topic, timeout.as_secs_f64()); match result { Ok(msg) => { let msg = msg.iter().nth(1).unwrap(); Some(serde_json::from_str(&msg).unwrap()) } Err(_) => None, } } } #[derive(Clone)] pub struct ValkeyJobDispatcher { input: std::marker::PhantomData, output: std::marker::PhantomData, topic: String, client: redis::Client, local: bool, } impl Deserialize<'a>, O: Serialize + for<'a> Deserialize<'a>> ValkeyJobDispatcher { // Creates a new job dispatcher for the given topic. pub fn new_topic(topic: &str, local: bool) -> Self { let client = redis::Client::open(std::env::var("VALKEY_URL").expect("No $VALKEY_URL variable set")) .expect("Invalid Redis URL"); ValkeyJobDispatcher { input: std::marker::PhantomData, output: std::marker::PhantomData, topic: topic.to_string(), client, local, } } // todo : real pub sub pub fn recv(&self) -> Result, String> { let mut con = self .client .get_connection() .expect("Failed to connect to Redis"); let result: RedisResult> = con.blpop(&self.topic, 0.0); match result { Ok(msg) => { let msg = msg.iter().nth(1).unwrap(); if let serde_json::Value::Object(task) = serde_json::from_str(&msg).unwrap() { let channel_id = task.get("task").unwrap().as_str().unwrap().to_string(); let params = task.get("params").unwrap(); Ok(JobOrder::new( serde_json::from_value(params.clone()).unwrap(), move |res| { // send back to channel let _: () = con .rpush( &channel_id, serde_json::to_string(&serde_json::to_value(&res).unwrap()) .unwrap(), ) .expect("Failed to send job"); }, )) } else { Err(String::new()) } } Err(e) => { log::error!("Valkey error: {e:?}"); Err(e.to_string()) } } } } impl Deserialize<'a>, O: for<'a> Deserialize<'a> + Serialize> JobDispatch for ValkeyJobDispatcher { // Sends a job to the Redis topic (publishes a message). fn send(&self, param: I) -> O { let mut con = self .client .get_connection() .expect("Failed to connect to Redis"); // Pushing the job to the topic in Redis let channel_id = uuid::Uuid::new_v4().to_string(); let _: () = con .rpush( &self.topic, serde_json::to_string(&serde_json::json!({ "task": channel_id, "params": ¶m })) .unwrap(), ) .expect("Failed to send job"); ValkeyTopicSubscriber::new(&channel_id).recv().unwrap() } // Sends a job asynchronously (non-blocking). fn send_async(&self, param: I) -> JobResult { let mut con = self .client .get_connection() .expect("Failed to connect to Redis"); // Pushing the job to the topic in Redis let channel_id = uuid::Uuid::new_v4().to_string(); let _: () = con .rpush( &self.topic, serde_json::to_string(&serde_json::json!({ "task": channel_id, "params": ¶m })) .unwrap(), ) .expect("Failed to send job"); JobResult(ReceiverBackend::Valkey(ValkeyTopicSubscriber::new( &channel_id, ))) } // Tries to send a job, returning None if unsuccessful. fn try_send(&self, param: I) -> Option { let res = self.send_async(param); res.wait_try() } } #[derive(Clone)] /// A generic job dispatcher struct that allows sending jobs of type `I` and receiving results of type `O` using message passing. pub struct JobDispatcher { sender: Sender>, } pub enum ReceiverBackend { Local(std::sync::mpsc::Receiver), Valkey(ValkeyTopicSubscriber), } impl Deserialize<'a>> ReceiverBackend { pub fn recv(&self) -> Option { match self { ReceiverBackend::Local(receiver) => receiver.recv().ok(), ReceiverBackend::Valkey(valkey) => valkey.recv(), } } pub fn recv_timeout(&self) -> Option { match self { ReceiverBackend::Local(receiver) => { receiver.recv_timeout(Duration::from_millis(300)).ok() } ReceiverBackend::Valkey(valkey_topic_subscriber) => { valkey_topic_subscriber.recv_timeout(Duration::from_millis(300)) } } } } pub struct JobResult(ReceiverBackend); impl Deserialize<'a>> JobResult { /// Wait for the Result of a Job. pub fn wait(self) -> O { self.0.recv().unwrap() } pub fn wait_try(self) -> Option { self.0.recv() } pub fn wait_timeout(&self) -> Option { self.0.recv_timeout() } } impl JobDispatcher { /// Creates a new instance of `JobDispatcher` and returns a tuple that contains it and a receiver end for `JobOrder`s. /// # Example: /// ``` /// use jobdispatcher::*; /// // Create job dispatcher /// let (dispatcher, recv) = JobDispatcher::::new(); /// /// // Worker Thread /// std::thread::spawn(move || { /// for job in recv { /// let result = job.param + 1; /// job.done(result); /// } /// }); /// /// // Usage /// let result = dispatcher.send(3); /// assert_eq!(result, 4); /// ``` #[must_use] pub fn new() -> (Self, Receiver>) { let (sender, receiver) = crossbeam::channel::bounded(8092); (Self { sender: sender }, receiver) } } impl JobDispatch for JobDispatcher { /// Sends a job of type `T` to the job dispatcher and waits for its result of type `V`. /// Returns the result of the job once it has been processed. /// # Panics /// This function panics when the `JobOrder` struct gets out of scope without returning a finished result. /// Additionally if the internal `Mutex` is poisoned, this function will panic as well. fn send(&self, param: I) -> O { let (tx, rx) = mpsc::channel(); let job_order = JobOrder::new(param, move |ret| { tx.send(ret).unwrap(); }); self.sender.send(job_order).unwrap(); rx.recv().unwrap() } fn send_async(&self, param: I) -> JobResult { let (tx, rx) = mpsc::channel(); let job_order = JobOrder::new(param, move |ret| { tx.send(ret).unwrap(); }); self.sender.send(job_order).unwrap(); JobResult(ReceiverBackend::Local(rx)) } /// Sends a job of type `T` to the job dispatcher and waits for its result of type `V`. /// Returns `Some(V)` when the job returns an result, `None` if somehow nothing was returned or the internal `Mutex` is poisoned. fn try_send(&self, param: I) -> Option { let (tx, rx) = mpsc::channel(); let job_order = JobOrder::new(param, move |ret| { tx.send(ret).unwrap(); }); self.sender.send(job_order).ok()?; rx.recv().ok() } } pub trait JobDispatch { fn send(&self, param: I) -> O; fn send_async(&self, param: I) -> JobResult; fn try_send(&self, param: I) -> Option; } /// A struct that represents a job order that encapsulates a job of type `I` and its result of type `O`, along with a callback function that will send the result back to the job origin. pub struct JobOrder { /// The job parameter of type `T`. pub param: I, callback: Box, } impl JobOrder { /// Creates a new `JobOrder` instance with the specified job parameter `param` of type `I` and a callback function that takes the job result of type `O` as an argument. #[must_use] fn new(param: I, callback: impl FnOnce(O) + Send + 'static) -> Self { Self { param, callback: Box::new(callback), } } /// Send the result of the `JobOrder` back to it's origin pub fn done(self, val: O) { (self.callback)(val); } } pub enum Dispatcher { Local(JobDispatcher), Union(ValkeyJobDispatcher), } impl< I: Serialize + for<'a> Deserialize<'a> + Send + 'static, O: Serialize + for<'a> Deserialize<'a> + Send + 'static, > Dispatcher { pub fn is_local(&self) -> bool { match self { Dispatcher::Local(_) => true, Dispatcher::Union(valkey_job_dispatcher) => valkey_job_dispatcher.local, } } fn send(&self, param: I) -> O { match self { Dispatcher::Local(job_dispatcher) => job_dispatcher.send(param), Dispatcher::Union(valkey_job_dispatcher) => valkey_job_dispatcher.send(param), } } fn send_async(&self, param: I) -> JobResult { match self { Dispatcher::Local(job_dispatcher) => job_dispatcher.send_async(param), Dispatcher::Union(valkey_job_dispatcher) => valkey_job_dispatcher.send_async(param), } } } pub struct JobMultiplexer { dispatchers: Vec>, } fn get_random_item(list: &[T]) -> Option<&T> { if list.is_empty() { return None; } let mut rng = rand::rng(); let index = rng.random_range(0..list.len()); list.get(index) } impl< I: Serialize + for<'a> Deserialize<'a> + Send + 'static, O: Serialize + for<'a> Deserialize<'a> + Send + 'static, > JobMultiplexer { pub fn from(dispatchers: Vec>) -> Self { Self { dispatchers } } pub fn send(&self, param: I) -> O { let d = get_random_item(&self.dispatchers).unwrap(); d.send(param) } pub fn send_async(&self, param: I) -> JobResult { let d = get_random_item(&self.dispatchers).unwrap(); d.send_async(param) } } impl< I: Clone + Serialize + for<'a> Deserialize<'a> + Send + 'static, O: Serialize + for<'a> Deserialize<'a> + Send + 'static, > JobMultiplexer { pub fn send_all(&self, param: I) { for d in &self.dispatchers { if d.is_local() { let _ = d.send(param.clone()); } } } } /// Iterator which returns ready results from a `Vec>. /// /// This Iterator waits for each `JobResult` with a timeout and yields a result once it finds one finished `JobResult`. /// /// # Example /// ```ignore /// // Started Tasks which are pending /// let pending_tasks = vec![...]; /// /// for task in pending_tasks { /// // blocks and waits for the first `JobResult<_>` even though the next ones in the `Vec<_>` could be finished and processed already. /// let result = task.wait(); /// // ... /// } /// /// // With the Iterator /// /// for value in PendingTaskIterator(pending_tasks) { /// // You can immidiatelly start processing the first finished result /// // ... /// } /// ``` pub struct PendingTaskIterator(pub Vec>); impl Deserialize<'a>> Iterator for PendingTaskIterator { type Item = O; fn next(&mut self) -> Option { if self.0.is_empty() { return None; } loop { for (i, task) in self.0.iter().enumerate() { if let Some(res) = task.wait_timeout() { self.0.remove(i); return Some(res); } } } } } /// Iterator which returns ready results from a `Vec> along with a label. /// /// Compared to a normal `PendingTaskIterator` this Iterator takes a `Vec<(L, JobResult)>`. /// You can use the variable `L` for an associated label to correlate results to their origins. /// /// This Iterator waits for each `JobResult` with a timeout and yields a result once it finds one finished `JobResult`. /// /// # Example /// ```ignore /// // Started Tasks which are pending /// let pending_tasks = vec![...]; /// /// for task in pending_tasks { /// // blocks and waits for the first `JobResult<_>` even though the next ones in the `Vec<_>` could be finished and processed already. /// let result = task.wait(); /// // ... /// } /// /// // With the Iterator /// /// for value in LabelPendingTaskIterator(pending_tasks) { /// // You can immidiatelly start processing the first finished result /// // ... /// } /// ``` pub struct LabelPendingTaskIterator(pub Vec<(L, JobResult)>); impl Deserialize<'a>> Iterator for LabelPendingTaskIterator { type Item = (L, O); fn next(&mut self) -> Option { if self.0.is_empty() { return None; } loop { for (i, task) in self.0.iter().enumerate() { let result = &task.1; let label = task.0.clone(); if let Some(res) = result.wait_timeout() { self.0.remove(i); return Some((label, res)); } } } } }