diff --git a/comrade-macro/src/lib.rs b/comrade-macro/src/lib.rs index 7ad07b6..97533d4 100644 --- a/comrade-macro/src/lib.rs +++ b/comrade-macro/src/lib.rs @@ -84,6 +84,7 @@ pub fn worker(_attr: TokenStream, item: TokenStream) -> TokenStream { let init_fn = format_ident!("{}_init", fn_name); let init_fn_scoped = format_ident!("{}_init_scoped", fn_name); let fn_scope_struct = format_ident!("{}_Scoped", fn_name); + let fn_name_async = format_ident!("{}_async", fn_name); let shutdown_fn = format_ident!("{}_shutdown", fn_name); let param_unpacking = param_names.iter().enumerate().map(|(i, name)| { @@ -112,6 +113,12 @@ pub fn worker(_attr: TokenStream, item: TokenStream) -> TokenStream { serde_json::from_value(comrade::UNION.get(stringify!(#fn_name)).unwrap().send(i)).unwrap() } + #[doc = "Will run the function non blocking returning a `JobResult<_>` for fetching a result later."] + pub fn #fn_name_async(#(#param_names: #param_types),*) -> comrade::job::JobResult { + let i: comrade::serde_json::Value = comrade::serde_json::to_value( (#(#param_names),*) ).unwrap(); + comrade::UNION.get(stringify!(#fn_name)).unwrap().send_async(i) + } + fn #wrapper_fn(task: JobOrder) { let i = task.param.clone(); diff --git a/examples/work.rs b/examples/work.rs index 60e6e65..2605670 100644 --- a/examples/work.rs +++ b/examples/work.rs @@ -1,3 +1,5 @@ +use std::time::Duration; + use comrade::{ job::{JobDispatcher, JobOrder}, service::ServiceManager, @@ -5,6 +7,11 @@ use comrade::{ }; use crossbeam::channel::Receiver; +#[worker] +pub fn take_time(i: i32) { + std::thread::sleep(Duration::from_millis(i as u64)); +} + #[worker] pub fn myfn(i: i32) -> i32 { i * 2 @@ -39,12 +46,19 @@ fn main() { let s = ServiceManager::new().mode(comrade::service::ServiceMode::Decay); let s = myfn_init(s); + let s = take_time_init(s); let s = s.spawn(); let x = myfn(55); println!("myfn {x}"); + // decoupled + let e = take_time_async(1500); + println!("This will run right after!"); + println!("the value is {}", e.wait()); + myfn_shutdown(); + take_time_shutdown(); s.join().unwrap(); } diff --git a/src/job.rs b/src/job.rs index bb7b1f1..c569d85 100644 --- a/src/job.rs +++ b/src/job.rs @@ -7,6 +7,15 @@ pub struct JobDispatcher { sender: Sender>, } +pub struct JobResult(std::sync::mpsc::Receiver); + +impl JobResult { + /// Wait for the Result of a Job. + pub fn wait(self) -> V { + self.0.recv().unwrap() + } +} + impl JobDispatcher { /// Creates a new instance of `JobDispatcher` and returns a tuple that contains it and a receiver end for `JobOrder`s. /// # Example: @@ -48,6 +57,15 @@ impl JobDispatcher { rx.recv().unwrap() } + pub fn send_async(&self, param: T) -> JobResult { + let (tx, rx) = mpsc::channel(); + let job_order = JobOrder::new(param, move |ret| { + tx.send(ret).unwrap(); + }); + self.sender.send(job_order).unwrap(); + JobResult(rx) + } + /// Sends a job of type `T` to the job dispatcher and waits for its result of type `V`. /// Returns `Some(V)` when the job returns an result, `None` if somehow nothing was returned or the internal `Mutex` is poisoned. pub fn try_send(&self, param: T) -> Option { diff --git a/src/lib.rs b/src/lib.rs index e41d0ba..a249b44 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -8,7 +8,6 @@ use once_cell::sync::Lazy; pub use serde_json; // TODO : worker docs + refactor -// TODO : worker non blocking fn call // TODO : worker parallelism (Load Balanced Queue + Multiple Threads) // TODO : refactor dispatcher backends (memory, valkey)