diff --git a/Cargo.lock b/Cargo.lock index d5f58e1..6c720a2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -103,12 +103,6 @@ version = "2.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5c8214115b7bf84099f1309324e63141d4c5d7cc26862f97a0a857dbefe165bd" -[[package]] -name = "byteorder" -version = "1.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" - [[package]] name = "cfg-if" version = "1.0.0" @@ -131,7 +125,6 @@ dependencies = [ "env_logger", "log", "once_cell", - "rand", "serde_json", "tokio", ] @@ -239,18 +232,6 @@ dependencies = [ "log", ] -[[package]] -name = "getrandom" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "43a49c392881ce6d5c3b8cb70f98717b7c07aabbdff06687b9030dbfbe2725f8" -dependencies = [ - "cfg-if", - "libc", - "wasi", - "windows-targets", -] - [[package]] name = "gimli" version = "0.31.1" @@ -352,15 +333,6 @@ version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b" -[[package]] -name = "ppv-lite86" -version = "0.2.20" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "77957b295656769bb8ad2b6a6b09d897d94f05c41b069aede1fcdaa675eaea04" -dependencies = [ - "zerocopy 0.7.35", -] - [[package]] name = "proc-macro2" version = "1.0.94" @@ -379,36 +351,6 @@ dependencies = [ "proc-macro2", ] -[[package]] -name = "rand" -version = "0.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3779b94aeb87e8bd4e834cee3650289ee9e0d5677f976ecdb6d219e5f4f6cd94" -dependencies = [ - "rand_chacha", - "rand_core", - "zerocopy 0.8.23", -] - -[[package]] -name = "rand_chacha" -version = "0.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb" -dependencies = [ - "ppv-lite86", - "rand_core", -] - -[[package]] -name = "rand_core" -version = "0.9.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "99d9a13982dcf210057a8a78572b2217b667c3beacbf3a0d8b454f6f82837d38" -dependencies = [ - "getrandom", -] - [[package]] name = "redox_syscall" version = "0.5.10" @@ -536,15 +478,6 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" -[[package]] -name = "wasi" -version = "0.13.3+wasi-0.2.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "26816d2e1a4a36a2940b96c5296ce403917633dff8f3440e9b236ed6f6bacad2" -dependencies = [ - "wit-bindgen-rt", -] - [[package]] name = "windows-sys" version = "0.59.0" @@ -617,53 +550,3 @@ name = "windows_x86_64_msvc" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" - -[[package]] -name = "wit-bindgen-rt" -version = "0.33.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3268f3d866458b787f390cf61f4bbb563b922d091359f9608842999eaee3943c" -dependencies = [ - "bitflags", -] - -[[package]] -name = "zerocopy" -version = "0.7.35" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1b9b4fd18abc82b8136838da5d50bae7bdea537c574d8dc1a34ed098d6c166f0" -dependencies = [ - "byteorder", - "zerocopy-derive 0.7.35", -] - -[[package]] -name = "zerocopy" -version = "0.8.23" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fd97444d05a4328b90e75e503a34bad781f14e28a823ad3557f0750df1ebcbc6" -dependencies = [ - "zerocopy-derive 0.8.23", -] - -[[package]] -name = "zerocopy-derive" -version = "0.7.35" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - -[[package]] -name = "zerocopy-derive" -version = "0.8.23" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6352c01d0edd5db859a63e2605f4ea3183ddbd15e2c4a9e7d32184df75e4f154" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] diff --git a/Cargo.toml b/Cargo.toml index 8416ea4..ddf0c9a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,4 +12,3 @@ once_cell = "1.20.3" tokio = "1.43.0" comrade-macro = { path = "./comrade-macro" } serde_json = "1.0.140" -rand = "0.9.0" diff --git a/README.md b/README.md index 4b8d13e..8b1bf95 100644 --- a/README.md +++ b/README.md @@ -71,14 +71,12 @@ You can annotate a function with `#[worker]` which gives them superpowers. These ```rust use comrade::{worker}; -// Single local worker #[worker] pub fn myfn(i: i32) -> i32 { i * 2 } -// 4 local worker threads -#[worker(4)] +#[worker] pub fn multiply(a: i32, b: i32) -> i32 { a * b } diff --git a/comrade-macro/src/lib.rs b/comrade-macro/src/lib.rs index 7d44f4c..7ad07b6 100644 --- a/comrade-macro/src/lib.rs +++ b/comrade-macro/src/lib.rs @@ -1,5 +1,5 @@ use proc_macro::TokenStream; -use quote::{format_ident, quote}; +use quote::{ToTokens, format_ident, quote}; use syn::{FnArg, Ident, ItemFn, Pat, ReturnType, Type, parse_macro_input}; /// This macro turns this function into a worker. @@ -38,13 +38,10 @@ use syn::{FnArg, Ident, ItemFn, Pat, ReturnType, Type, parse_macro_input}; /// } /// ``` #[proc_macro_attribute] -pub fn worker(attr: TokenStream, item: TokenStream) -> TokenStream { +pub fn worker(_attr: TokenStream, item: TokenStream) -> TokenStream { let input: ItemFn = parse_macro_input!(item as ItemFn); let fn_name = &input.sig.ident; - // Parse optional attribute argument (e.g., #[worker(4)]) - let worker_count = attr.to_string().parse::().unwrap_or(1); - // Extract parameter names and types separately let params: Vec<(Ident, Type)> = input .sig @@ -69,6 +66,10 @@ pub fn worker(attr: TokenStream, item: TokenStream) -> TokenStream { let param_names: Vec = params.iter().map(|(name, _)| name.clone()).collect(); let param_types: Vec = params.iter().map(|(_, ty)| ty.clone()).collect(); + for t in ¶m_types { + println!("param {}", t.to_token_stream().to_string()); + } + // Extract return type let return_type = match &input.sig.output { ReturnType::Type(_, ty) => quote!(#ty), @@ -83,7 +84,6 @@ pub fn worker(attr: TokenStream, item: TokenStream) -> TokenStream { let init_fn = format_ident!("{}_init", fn_name); let init_fn_scoped = format_ident!("{}_init_scoped", fn_name); let fn_scope_struct = format_ident!("{}_Scoped", fn_name); - let fn_name_async = format_ident!("{}_async", fn_name); let shutdown_fn = format_ident!("{}_shutdown", fn_name); let param_unpacking = param_names.iter().enumerate().map(|(i, name)| { @@ -112,12 +112,6 @@ pub fn worker(attr: TokenStream, item: TokenStream) -> TokenStream { serde_json::from_value(comrade::UNION.get(stringify!(#fn_name)).unwrap().send(i)).unwrap() } - #[doc = "Will run the function non blocking returning a `JobResult<_>` for fetching a result later."] - pub fn #fn_name_async(#(#param_names: #param_types),*) -> comrade::job::JobResult { - let i: comrade::serde_json::Value = comrade::serde_json::to_value( (#(#param_names),*) ).unwrap(); - comrade::UNION.get(stringify!(#fn_name)).unwrap().send_async(i) - } - fn #wrapper_fn(task: JobOrder) { let i = task.param.clone(); @@ -132,7 +126,6 @@ pub fn worker(attr: TokenStream, item: TokenStream) -> TokenStream { } pub fn #worker_fn(recv: Receiver>) { - let mut metrics = (0, 0); loop { let task = recv.recv(); @@ -140,17 +133,15 @@ pub fn worker(attr: TokenStream, item: TokenStream) -> TokenStream { Ok(task) => { if let comrade::serde_json::Value::Object(obj) = &task.param { if obj.contains_key("task") { - log::info!("Shutdown requested for task worker {}. Processed {} tasks since start with {} errors.", stringify!(#fn_name), metrics.0, metrics.1); + log::info!("Shutdown requested for task worker {}", stringify!(#fn_name)); task.done(comrade::serde_json::json!({"ok": 1})); break; } } - #wrapper_fn(task); - metrics.0 += 1; + #wrapper_fn(task) }, Err(e) => { - metrics.1 += 1; log::error!("Error receiving task: {e:?}"); } } @@ -159,30 +150,20 @@ pub fn worker(attr: TokenStream, item: TokenStream) -> TokenStream { #[doc = "Shutdown the worker"] pub fn #shutdown_fn() { - comrade::UNION.get(stringify!(#fn_name)).unwrap().send_all(comrade::serde_json::json!({"task": "shutdown"})); + comrade::UNION.get(stringify!(#fn_name)).unwrap().send(comrade::serde_json::json!({"task": "shutdown"})); } #[doc = "Initialize a worker thread on `ServiceManager`"] pub fn #init_fn(sm: ServiceManager) -> ServiceManager { - let mut dispatchers = Vec::new(); - let mut s = sm; + let (dispatch, recv): (JobDispatcher<_, _>, Receiver>) = JobDispatcher::new(); - for i in 0..#worker_count { - let (dispatch, recv): (JobDispatcher<_, _>, Receiver>) = JobDispatcher::new(); + let sm = sm.register(stringify!(#worker_fn), move |_| { + #worker_fn(recv.clone()); + }); - s = s.register( - &format!("{}_{i}", stringify!(#worker_fn)), - move |_| { - #worker_fn(recv.clone()) - } - ); + comrade::UNION.insert(stringify!(#fn_name), dispatch); - dispatchers.push(dispatch); - } - - comrade::UNION.insert(stringify!(#fn_name), comrade::job::JobMultiplexer::from(dispatchers)); - - s + sm } #[allow(non_camel_case_types)] @@ -203,7 +184,14 @@ pub fn worker(attr: TokenStream, item: TokenStream) -> TokenStream { #[doc = "Initialize a worker thread on `ServiceManager` on a scoped lifetime"] pub fn #init_fn_scoped(sm: ServiceManager) -> (ServiceManager, #fn_scope_struct) { - let sm = #init_fn(sm); + let (dispatch, recv): (JobDispatcher<_, _>, Receiver>) = JobDispatcher::new(); + + let sm = sm.register(stringify!(#worker_fn), move |_| { + #worker_fn(recv.clone()); + }); + + comrade::UNION.insert(stringify!(#fn_name), dispatch); + (sm, #fn_scope_struct {}) } }; diff --git a/examples/work.rs b/examples/work.rs index ecc64fb..60e6e65 100644 --- a/examples/work.rs +++ b/examples/work.rs @@ -1,5 +1,3 @@ -use std::time::Duration; - use comrade::{ job::{JobDispatcher, JobOrder}, service::ServiceManager, @@ -7,17 +5,12 @@ use comrade::{ }; use crossbeam::channel::Receiver; -#[worker] -pub fn take_time(i: i32) { - std::thread::sleep(Duration::from_millis(i as u64)); -} - #[worker] pub fn myfn(i: i32) -> i32 { i * 2 } -#[worker(4)] +#[worker] pub fn multiply(a: i32, b: i32) -> i32 { a * b } @@ -46,19 +39,12 @@ fn main() { let s = ServiceManager::new().mode(comrade::service::ServiceMode::Decay); let s = myfn_init(s); - let s = take_time_init(s); let s = s.spawn(); let x = myfn(55); println!("myfn {x}"); - // decoupled - let e = take_time_async(1500); - println!("This will run right after!"); - println!("the value is {}", e.wait()); - myfn_shutdown(); - take_time_shutdown(); s.join().unwrap(); } diff --git a/src/job.rs b/src/job.rs index 1c85ab2..bb7b1f1 100644 --- a/src/job.rs +++ b/src/job.rs @@ -1,23 +1,13 @@ use crossbeam::channel::{Receiver, Sender}; -use rand::Rng; use std::sync::mpsc; #[derive(Clone)] -/// A generic job dispatcher struct that allows sending jobs of type `I` and receiving results of type `O` using message passing. -pub struct JobDispatcher { - sender: Sender>, +/// A generic job dispatcher struct that allows sending jobs of type `T` and receiving results of type `V` using message passing. +pub struct JobDispatcher { + sender: Sender>, } -pub struct JobResult(std::sync::mpsc::Receiver); - -impl JobResult { - /// Wait for the Result of a Job. - pub fn wait(self) -> O { - self.0.recv().unwrap() - } -} - -impl JobDispatcher { +impl JobDispatcher { /// Creates a new instance of `JobDispatcher` and returns a tuple that contains it and a receiver end for `JobOrder`s. /// # Example: /// ``` @@ -38,7 +28,7 @@ impl JobDispatcher { /// assert_eq!(result, 4); /// ``` #[must_use] - pub fn new() -> (Self, Receiver>) { + pub fn new() -> (Self, Receiver>) { let (sender, receiver) = crossbeam::channel::bounded(12); (Self { sender: sender }, receiver) @@ -49,7 +39,7 @@ impl JobDispatcher { /// # Panics /// This function panics when the `JobOrder` struct gets out of scope without returning a finished result. /// Additionally if the internal `Mutex` is poisoned, this function will panic as well. - pub fn send(&self, param: I) -> O { + pub fn send(&self, param: T) -> V { let (tx, rx) = mpsc::channel(); let job_order = JobOrder::new(param, move |ret| { tx.send(ret).unwrap(); @@ -58,18 +48,9 @@ impl JobDispatcher { rx.recv().unwrap() } - pub fn send_async(&self, param: I) -> JobResult { - let (tx, rx) = mpsc::channel(); - let job_order = JobOrder::new(param, move |ret| { - tx.send(ret).unwrap(); - }); - self.sender.send(job_order).unwrap(); - JobResult(rx) - } - /// Sends a job of type `T` to the job dispatcher and waits for its result of type `V`. /// Returns `Some(V)` when the job returns an result, `None` if somehow nothing was returned or the internal `Mutex` is poisoned. - pub fn try_send(&self, param: I) -> Option { + pub fn try_send(&self, param: T) -> Option { let (tx, rx) = mpsc::channel(); let job_order = JobOrder::new(param, move |ret| { tx.send(ret).unwrap(); @@ -79,17 +60,17 @@ impl JobDispatcher { } } -/// A struct that represents a job order that encapsulates a job of type `I` and its result of type `O`, along with a callback function that will send the result back to the job origin. -pub struct JobOrder { +/// A struct that represents a job order that encapsulates a job of type `T` and its result of type `V`, along with a callback function that will send the result back to the job origin. +pub struct JobOrder { /// The job parameter of type `T`. - pub param: I, - callback: Box, + pub param: T, + callback: Box, } -impl JobOrder { - /// Creates a new `JobOrder` instance with the specified job parameter `param` of type `I` and a callback function that takes the job result of type `O` as an argument. +impl JobOrder { + /// Creates a new `JobOrder` instance with the specified job parameter `param` of type `T` and a callback function that takes the job result of type `V` as an argument. #[must_use] - fn new(param: I, callback: impl FnOnce(O) + Send + 'static) -> Self { + fn new(param: T, callback: impl FnOnce(V) + Send + 'static) -> Self { Self { param, callback: Box::new(callback), @@ -97,44 +78,7 @@ impl JobOrder { } /// Send the result of the `JobOrder` back to it's origin - pub fn done(self, val: O) { + pub fn done(self, val: V) { (self.callback)(val); } } - -pub struct JobMultiplexer { - dispatchers: Vec>, -} - -fn get_random_item(list: &[T]) -> Option<&T> { - if list.is_empty() { - return None; - } - let mut rng = rand::rng(); - let index = rng.random_range(0..list.len()); - list.get(index) -} - -impl JobMultiplexer { - pub fn from(dispatchers: Vec>) -> Self { - Self { dispatchers } - } - - pub fn send(&self, param: I) -> O { - let d = get_random_item(&self.dispatchers).unwrap(); - d.send(param) - } - - pub fn send_async(&self, param: I) -> JobResult { - let d = get_random_item(&self.dispatchers).unwrap(); - d.send_async(param) - } -} - -impl JobMultiplexer { - pub fn send_all(&self, param: I) { - for d in &self.dispatchers { - let _ = d.send(param.clone()); - } - } -} diff --git a/src/lib.rs b/src/lib.rs index d895127..e41d0ba 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -8,11 +8,12 @@ use once_cell::sync::Lazy; pub use serde_json; // TODO : worker docs + refactor +// TODO : worker non blocking fn call // TODO : worker parallelism (Load Balanced Queue + Multiple Threads) // TODO : refactor dispatcher backends (memory, valkey) pub static UNION: Lazy< - DashMap<&'static str, job::JobMultiplexer>, + DashMap<&'static str, job::JobDispatcher>, > = Lazy::new(DashMap::new); /// Rally Function