From df3bbb09ab2b2cace22d052e4a22370c88be9f2c Mon Sep 17 00:00:00 2001 From: JMARyA Date: Sat, 18 Mar 2023 17:56:31 +0100 Subject: [PATCH] init --- .gitignore | 2 ++ Cargo.toml | 7 +++++ README.md | 5 ++++ src/lib.rs | 87 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 101 insertions(+) create mode 100644 .gitignore create mode 100644 Cargo.toml create mode 100644 README.md create mode 100644 src/lib.rs diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..4fffb2f --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +/target +/Cargo.lock diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..218a217 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,7 @@ +[package] +name = "jobdispatcher" +version = "0.1.0" +edition = "2021" +authors = ["JMARyA "] + +[dependencies] diff --git a/README.md b/README.md new file mode 100644 index 0000000..70c2a61 --- /dev/null +++ b/README.md @@ -0,0 +1,5 @@ +# JobDispatcher +A thread-safe job dispatcher written in rust. + +## Why does this exist? +I had a data structure which could not be send across threads. So with this crate one can send the input over, the data structure can do it's thing in it's own thread and send the result back \ No newline at end of file diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..2c01eca --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,87 @@ +use std::sync::{mpsc, Arc, Mutex}; + +#[derive(Clone)] +/// A generic job dispatcher struct that allows sending jobs of type `T` and receiving results of type `V` using message passing. +pub struct JobDispatcher { + sender: Arc>>>, +} + +impl JobDispatcher { + /// Creates a new instance of `JobDispatcher` and returns a tuple that contains it and a receiver end for `JobOrder`s. + /// # Example: + /// ``` + /// use jobdispatcher::*; + /// // Create job dispatcher + /// let (dispatcher, recv) = JobDispatcher::::new(); + /// + /// // Worker Thread + /// std::thread::spawn(move || { + /// for job in recv { + /// let result = job.param + 1; + /// job.done(result); + /// } + /// }); + /// + /// // Usage + /// let result = dispatcher.send(3); + /// assert_eq!(result, 4); + /// ``` + #[must_use] + pub fn new() -> (Self, mpsc::Receiver>) { + let (sender, receiver) = mpsc::channel(); + ( + Self { + sender: Arc::new(Mutex::new(sender)), + }, + receiver, + ) + } + + /// Sends a job of type `T` to the job dispatcher and waits for its result of type `V`. + /// Returns the result of the job once it has been processed. + /// # Panics + /// This function panics when the `JobOrder` struct gets out of scope without returning a finished result. + /// Additionally if the internal `Mutex` is poisoned, this function will panic as well. + pub fn send(&self, param: T) -> V { + let (tx, rx) = mpsc::channel(); + let job_order = JobOrder::new(param, move |ret| { + tx.send(ret).unwrap(); + }); + self.sender.lock().unwrap().send(job_order).unwrap(); + rx.recv().unwrap() + } + + /// Sends a job of type `T` to the job dispatcher and waits for its result of type `V`. + /// Returns `Some(V)` when the job returns an result, `None` if somehow nothing was returned or the internal `Mutex` is poisoned. + pub fn try_send(&self, param: T) -> Option { + let (tx, rx) = mpsc::channel(); + let job_order = JobOrder::new(param, move |ret| { + tx.send(ret).unwrap(); + }); + self.sender.lock().ok()?.send(job_order).ok()?; + rx.recv().ok() + } +} + +/// A struct that represents a job order that encapsulates a job of type `T` and its result of type `V`, along with a callback function that will send the result back to the job origin. +pub struct JobOrder { + /// The job parameter of type `T`. + pub param: T, + callback: Box, +} + +impl JobOrder { + /// Creates a new `JobOrder` instance with the specified job parameter `param` of type `T` and a callback function that takes the job result of type `V` as an argument. + #[must_use] + fn new(param: T, callback: impl FnOnce(V) + Send + 'static) -> Self { + Self { + param, + callback: Box::new(callback), + } + } + + /// Send the result of the `JobOrder` back to it's origin + pub fn done(self, val: V) { + (self.callback)(val); + } +}