From e827faaa3fed15bbdc666fc6f8c4d2e865aee2d8 Mon Sep 17 00:00:00 2001 From: JMARyA Date: Sat, 8 Mar 2025 18:01:16 +0100 Subject: [PATCH] =?UTF-8?q?=E2=9C=A8=20multi=20threaded=20workers?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Cargo.lock | 117 +++++++++++++++++++++++++++++++++++++++ Cargo.toml | 1 + README.md | 4 +- comrade-macro/src/lib.rs | 51 +++++++++-------- examples/work.rs | 2 +- src/job.rs | 76 ++++++++++++++++++------- src/lib.rs | 2 +- 7 files changed, 208 insertions(+), 45 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6c720a2..d5f58e1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -103,6 +103,12 @@ version = "2.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5c8214115b7bf84099f1309324e63141d4c5d7cc26862f97a0a857dbefe165bd" +[[package]] +name = "byteorder" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" + [[package]] name = "cfg-if" version = "1.0.0" @@ -125,6 +131,7 @@ dependencies = [ "env_logger", "log", "once_cell", + "rand", "serde_json", "tokio", ] @@ -232,6 +239,18 @@ dependencies = [ "log", ] +[[package]] +name = "getrandom" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "43a49c392881ce6d5c3b8cb70f98717b7c07aabbdff06687b9030dbfbe2725f8" +dependencies = [ + "cfg-if", + "libc", + "wasi", + "windows-targets", +] + [[package]] name = "gimli" version = "0.31.1" @@ -333,6 +352,15 @@ version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b" +[[package]] +name = "ppv-lite86" +version = "0.2.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77957b295656769bb8ad2b6a6b09d897d94f05c41b069aede1fcdaa675eaea04" +dependencies = [ + "zerocopy 0.7.35", +] + [[package]] name = "proc-macro2" version = "1.0.94" @@ -351,6 +379,36 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "rand" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3779b94aeb87e8bd4e834cee3650289ee9e0d5677f976ecdb6d219e5f4f6cd94" +dependencies = [ + "rand_chacha", + "rand_core", + "zerocopy 0.8.23", +] + +[[package]] +name = "rand_chacha" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb" +dependencies = [ + "ppv-lite86", + "rand_core", +] + +[[package]] +name = "rand_core" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "99d9a13982dcf210057a8a78572b2217b667c3beacbf3a0d8b454f6f82837d38" +dependencies = [ + "getrandom", +] + [[package]] name = "redox_syscall" version = "0.5.10" @@ -478,6 +536,15 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" +[[package]] +name = "wasi" +version = "0.13.3+wasi-0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26816d2e1a4a36a2940b96c5296ce403917633dff8f3440e9b236ed6f6bacad2" +dependencies = [ + "wit-bindgen-rt", +] + [[package]] name = "windows-sys" version = "0.59.0" @@ -550,3 +617,53 @@ name = "windows_x86_64_msvc" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" + +[[package]] +name = "wit-bindgen-rt" +version = "0.33.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3268f3d866458b787f390cf61f4bbb563b922d091359f9608842999eaee3943c" +dependencies = [ + "bitflags", +] + +[[package]] +name = "zerocopy" +version = "0.7.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b9b4fd18abc82b8136838da5d50bae7bdea537c574d8dc1a34ed098d6c166f0" +dependencies = [ + "byteorder", + "zerocopy-derive 0.7.35", +] + +[[package]] +name = "zerocopy" +version = "0.8.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fd97444d05a4328b90e75e503a34bad781f14e28a823ad3557f0750df1ebcbc6" +dependencies = [ + "zerocopy-derive 0.8.23", +] + +[[package]] +name = "zerocopy-derive" +version = "0.7.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "zerocopy-derive" +version = "0.8.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6352c01d0edd5db859a63e2605f4ea3183ddbd15e2c4a9e7d32184df75e4f154" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] diff --git a/Cargo.toml b/Cargo.toml index ddf0c9a..8416ea4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,3 +12,4 @@ once_cell = "1.20.3" tokio = "1.43.0" comrade-macro = { path = "./comrade-macro" } serde_json = "1.0.140" +rand = "0.9.0" diff --git a/README.md b/README.md index 8b1bf95..4b8d13e 100644 --- a/README.md +++ b/README.md @@ -71,12 +71,14 @@ You can annotate a function with `#[worker]` which gives them superpowers. These ```rust use comrade::{worker}; +// Single local worker #[worker] pub fn myfn(i: i32) -> i32 { i * 2 } -#[worker] +// 4 local worker threads +#[worker(4)] pub fn multiply(a: i32, b: i32) -> i32 { a * b } diff --git a/comrade-macro/src/lib.rs b/comrade-macro/src/lib.rs index 97533d4..7d44f4c 100644 --- a/comrade-macro/src/lib.rs +++ b/comrade-macro/src/lib.rs @@ -1,5 +1,5 @@ use proc_macro::TokenStream; -use quote::{ToTokens, format_ident, quote}; +use quote::{format_ident, quote}; use syn::{FnArg, Ident, ItemFn, Pat, ReturnType, Type, parse_macro_input}; /// This macro turns this function into a worker. @@ -38,10 +38,13 @@ use syn::{FnArg, Ident, ItemFn, Pat, ReturnType, Type, parse_macro_input}; /// } /// ``` #[proc_macro_attribute] -pub fn worker(_attr: TokenStream, item: TokenStream) -> TokenStream { +pub fn worker(attr: TokenStream, item: TokenStream) -> TokenStream { let input: ItemFn = parse_macro_input!(item as ItemFn); let fn_name = &input.sig.ident; + // Parse optional attribute argument (e.g., #[worker(4)]) + let worker_count = attr.to_string().parse::().unwrap_or(1); + // Extract parameter names and types separately let params: Vec<(Ident, Type)> = input .sig @@ -66,10 +69,6 @@ pub fn worker(_attr: TokenStream, item: TokenStream) -> TokenStream { let param_names: Vec = params.iter().map(|(name, _)| name.clone()).collect(); let param_types: Vec = params.iter().map(|(_, ty)| ty.clone()).collect(); - for t in ¶m_types { - println!("param {}", t.to_token_stream().to_string()); - } - // Extract return type let return_type = match &input.sig.output { ReturnType::Type(_, ty) => quote!(#ty), @@ -133,6 +132,7 @@ pub fn worker(_attr: TokenStream, item: TokenStream) -> TokenStream { } pub fn #worker_fn(recv: Receiver>) { + let mut metrics = (0, 0); loop { let task = recv.recv(); @@ -140,15 +140,17 @@ pub fn worker(_attr: TokenStream, item: TokenStream) -> TokenStream { Ok(task) => { if let comrade::serde_json::Value::Object(obj) = &task.param { if obj.contains_key("task") { - log::info!("Shutdown requested for task worker {}", stringify!(#fn_name)); + log::info!("Shutdown requested for task worker {}. Processed {} tasks since start with {} errors.", stringify!(#fn_name), metrics.0, metrics.1); task.done(comrade::serde_json::json!({"ok": 1})); break; } } - #wrapper_fn(task) + #wrapper_fn(task); + metrics.0 += 1; }, Err(e) => { + metrics.1 += 1; log::error!("Error receiving task: {e:?}"); } } @@ -157,20 +159,30 @@ pub fn worker(_attr: TokenStream, item: TokenStream) -> TokenStream { #[doc = "Shutdown the worker"] pub fn #shutdown_fn() { - comrade::UNION.get(stringify!(#fn_name)).unwrap().send(comrade::serde_json::json!({"task": "shutdown"})); + comrade::UNION.get(stringify!(#fn_name)).unwrap().send_all(comrade::serde_json::json!({"task": "shutdown"})); } #[doc = "Initialize a worker thread on `ServiceManager`"] pub fn #init_fn(sm: ServiceManager) -> ServiceManager { - let (dispatch, recv): (JobDispatcher<_, _>, Receiver>) = JobDispatcher::new(); + let mut dispatchers = Vec::new(); + let mut s = sm; - let sm = sm.register(stringify!(#worker_fn), move |_| { - #worker_fn(recv.clone()); - }); + for i in 0..#worker_count { + let (dispatch, recv): (JobDispatcher<_, _>, Receiver>) = JobDispatcher::new(); - comrade::UNION.insert(stringify!(#fn_name), dispatch); + s = s.register( + &format!("{}_{i}", stringify!(#worker_fn)), + move |_| { + #worker_fn(recv.clone()) + } + ); - sm + dispatchers.push(dispatch); + } + + comrade::UNION.insert(stringify!(#fn_name), comrade::job::JobMultiplexer::from(dispatchers)); + + s } #[allow(non_camel_case_types)] @@ -191,14 +203,7 @@ pub fn worker(_attr: TokenStream, item: TokenStream) -> TokenStream { #[doc = "Initialize a worker thread on `ServiceManager` on a scoped lifetime"] pub fn #init_fn_scoped(sm: ServiceManager) -> (ServiceManager, #fn_scope_struct) { - let (dispatch, recv): (JobDispatcher<_, _>, Receiver>) = JobDispatcher::new(); - - let sm = sm.register(stringify!(#worker_fn), move |_| { - #worker_fn(recv.clone()); - }); - - comrade::UNION.insert(stringify!(#fn_name), dispatch); - + let sm = #init_fn(sm); (sm, #fn_scope_struct {}) } }; diff --git a/examples/work.rs b/examples/work.rs index 2605670..ecc64fb 100644 --- a/examples/work.rs +++ b/examples/work.rs @@ -17,7 +17,7 @@ pub fn myfn(i: i32) -> i32 { i * 2 } -#[worker] +#[worker(4)] pub fn multiply(a: i32, b: i32) -> i32 { a * b } diff --git a/src/job.rs b/src/job.rs index c569d85..1c85ab2 100644 --- a/src/job.rs +++ b/src/job.rs @@ -1,22 +1,23 @@ use crossbeam::channel::{Receiver, Sender}; +use rand::Rng; use std::sync::mpsc; #[derive(Clone)] -/// A generic job dispatcher struct that allows sending jobs of type `T` and receiving results of type `V` using message passing. -pub struct JobDispatcher { - sender: Sender>, +/// A generic job dispatcher struct that allows sending jobs of type `I` and receiving results of type `O` using message passing. +pub struct JobDispatcher { + sender: Sender>, } -pub struct JobResult(std::sync::mpsc::Receiver); +pub struct JobResult(std::sync::mpsc::Receiver); -impl JobResult { +impl JobResult { /// Wait for the Result of a Job. - pub fn wait(self) -> V { + pub fn wait(self) -> O { self.0.recv().unwrap() } } -impl JobDispatcher { +impl JobDispatcher { /// Creates a new instance of `JobDispatcher` and returns a tuple that contains it and a receiver end for `JobOrder`s. /// # Example: /// ``` @@ -37,7 +38,7 @@ impl JobDispatcher { /// assert_eq!(result, 4); /// ``` #[must_use] - pub fn new() -> (Self, Receiver>) { + pub fn new() -> (Self, Receiver>) { let (sender, receiver) = crossbeam::channel::bounded(12); (Self { sender: sender }, receiver) @@ -48,7 +49,7 @@ impl JobDispatcher { /// # Panics /// This function panics when the `JobOrder` struct gets out of scope without returning a finished result. /// Additionally if the internal `Mutex` is poisoned, this function will panic as well. - pub fn send(&self, param: T) -> V { + pub fn send(&self, param: I) -> O { let (tx, rx) = mpsc::channel(); let job_order = JobOrder::new(param, move |ret| { tx.send(ret).unwrap(); @@ -57,7 +58,7 @@ impl JobDispatcher { rx.recv().unwrap() } - pub fn send_async(&self, param: T) -> JobResult { + pub fn send_async(&self, param: I) -> JobResult { let (tx, rx) = mpsc::channel(); let job_order = JobOrder::new(param, move |ret| { tx.send(ret).unwrap(); @@ -68,7 +69,7 @@ impl JobDispatcher { /// Sends a job of type `T` to the job dispatcher and waits for its result of type `V`. /// Returns `Some(V)` when the job returns an result, `None` if somehow nothing was returned or the internal `Mutex` is poisoned. - pub fn try_send(&self, param: T) -> Option { + pub fn try_send(&self, param: I) -> Option { let (tx, rx) = mpsc::channel(); let job_order = JobOrder::new(param, move |ret| { tx.send(ret).unwrap(); @@ -78,17 +79,17 @@ impl JobDispatcher { } } -/// A struct that represents a job order that encapsulates a job of type `T` and its result of type `V`, along with a callback function that will send the result back to the job origin. -pub struct JobOrder { +/// A struct that represents a job order that encapsulates a job of type `I` and its result of type `O`, along with a callback function that will send the result back to the job origin. +pub struct JobOrder { /// The job parameter of type `T`. - pub param: T, - callback: Box, + pub param: I, + callback: Box, } -impl JobOrder { - /// Creates a new `JobOrder` instance with the specified job parameter `param` of type `T` and a callback function that takes the job result of type `V` as an argument. +impl JobOrder { + /// Creates a new `JobOrder` instance with the specified job parameter `param` of type `I` and a callback function that takes the job result of type `O` as an argument. #[must_use] - fn new(param: T, callback: impl FnOnce(V) + Send + 'static) -> Self { + fn new(param: I, callback: impl FnOnce(O) + Send + 'static) -> Self { Self { param, callback: Box::new(callback), @@ -96,7 +97,44 @@ impl JobOrder { } /// Send the result of the `JobOrder` back to it's origin - pub fn done(self, val: V) { + pub fn done(self, val: O) { (self.callback)(val); } } + +pub struct JobMultiplexer { + dispatchers: Vec>, +} + +fn get_random_item(list: &[T]) -> Option<&T> { + if list.is_empty() { + return None; + } + let mut rng = rand::rng(); + let index = rng.random_range(0..list.len()); + list.get(index) +} + +impl JobMultiplexer { + pub fn from(dispatchers: Vec>) -> Self { + Self { dispatchers } + } + + pub fn send(&self, param: I) -> O { + let d = get_random_item(&self.dispatchers).unwrap(); + d.send(param) + } + + pub fn send_async(&self, param: I) -> JobResult { + let d = get_random_item(&self.dispatchers).unwrap(); + d.send_async(param) + } +} + +impl JobMultiplexer { + pub fn send_all(&self, param: I) { + for d in &self.dispatchers { + let _ = d.send(param.clone()); + } + } +} diff --git a/src/lib.rs b/src/lib.rs index a249b44..d895127 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -12,7 +12,7 @@ pub use serde_json; // TODO : refactor dispatcher backends (memory, valkey) pub static UNION: Lazy< - DashMap<&'static str, job::JobDispatcher>, + DashMap<&'static str, job::JobMultiplexer>, > = Lazy::new(DashMap::new); /// Rally Function