multi threaded workers

This commit is contained in:
JMARyA 2025-03-08 18:01:16 +01:00
parent 0c190df5d7
commit e827faaa3f
Signed by: jmarya
GPG key ID: 901B2ADDF27C2263
7 changed files with 208 additions and 45 deletions

View file

@ -1,5 +1,5 @@
use proc_macro::TokenStream;
use quote::{ToTokens, format_ident, quote};
use quote::{format_ident, quote};
use syn::{FnArg, Ident, ItemFn, Pat, ReturnType, Type, parse_macro_input};
/// This macro turns this function into a worker.
@ -38,10 +38,13 @@ use syn::{FnArg, Ident, ItemFn, Pat, ReturnType, Type, parse_macro_input};
/// }
/// ```
#[proc_macro_attribute]
pub fn worker(_attr: TokenStream, item: TokenStream) -> TokenStream {
pub fn worker(attr: TokenStream, item: TokenStream) -> TokenStream {
let input: ItemFn = parse_macro_input!(item as ItemFn);
let fn_name = &input.sig.ident;
// Parse optional attribute argument (e.g., #[worker(4)])
let worker_count = attr.to_string().parse::<usize>().unwrap_or(1);
// Extract parameter names and types separately
let params: Vec<(Ident, Type)> = input
.sig
@ -66,10 +69,6 @@ pub fn worker(_attr: TokenStream, item: TokenStream) -> TokenStream {
let param_names: Vec<Ident> = params.iter().map(|(name, _)| name.clone()).collect();
let param_types: Vec<Type> = params.iter().map(|(_, ty)| ty.clone()).collect();
for t in &param_types {
println!("param {}", t.to_token_stream().to_string());
}
// Extract return type
let return_type = match &input.sig.output {
ReturnType::Type(_, ty) => quote!(#ty),
@ -133,6 +132,7 @@ pub fn worker(_attr: TokenStream, item: TokenStream) -> TokenStream {
}
pub fn #worker_fn(recv: Receiver<JobOrder<comrade::serde_json::Value, comrade::serde_json::Value>>) {
let mut metrics = (0, 0);
loop {
let task = recv.recv();
@ -140,15 +140,17 @@ pub fn worker(_attr: TokenStream, item: TokenStream) -> TokenStream {
Ok(task) => {
if let comrade::serde_json::Value::Object(obj) = &task.param {
if obj.contains_key("task") {
log::info!("Shutdown requested for task worker {}", stringify!(#fn_name));
log::info!("Shutdown requested for task worker {}. Processed {} tasks since start with {} errors.", stringify!(#fn_name), metrics.0, metrics.1);
task.done(comrade::serde_json::json!({"ok": 1}));
break;
}
}
#wrapper_fn(task)
#wrapper_fn(task);
metrics.0 += 1;
},
Err(e) => {
metrics.1 += 1;
log::error!("Error receiving task: {e:?}");
}
}
@ -157,20 +159,30 @@ pub fn worker(_attr: TokenStream, item: TokenStream) -> TokenStream {
#[doc = "Shutdown the worker"]
pub fn #shutdown_fn() {
comrade::UNION.get(stringify!(#fn_name)).unwrap().send(comrade::serde_json::json!({"task": "shutdown"}));
comrade::UNION.get(stringify!(#fn_name)).unwrap().send_all(comrade::serde_json::json!({"task": "shutdown"}));
}
#[doc = "Initialize a worker thread on `ServiceManager`"]
pub fn #init_fn(sm: ServiceManager) -> ServiceManager {
let (dispatch, recv): (JobDispatcher<_, _>, Receiver<JobOrder<_, _>>) = JobDispatcher::new();
let mut dispatchers = Vec::new();
let mut s = sm;
let sm = sm.register(stringify!(#worker_fn), move |_| {
#worker_fn(recv.clone());
});
for i in 0..#worker_count {
let (dispatch, recv): (JobDispatcher<_, _>, Receiver<JobOrder<_, _>>) = JobDispatcher::new();
comrade::UNION.insert(stringify!(#fn_name), dispatch);
s = s.register(
&format!("{}_{i}", stringify!(#worker_fn)),
move |_| {
#worker_fn(recv.clone())
}
);
sm
dispatchers.push(dispatch);
}
comrade::UNION.insert(stringify!(#fn_name), comrade::job::JobMultiplexer::from(dispatchers));
s
}
#[allow(non_camel_case_types)]
@ -191,14 +203,7 @@ pub fn worker(_attr: TokenStream, item: TokenStream) -> TokenStream {
#[doc = "Initialize a worker thread on `ServiceManager` on a scoped lifetime"]
pub fn #init_fn_scoped(sm: ServiceManager) -> (ServiceManager, #fn_scope_struct) {
let (dispatch, recv): (JobDispatcher<_, _>, Receiver<JobOrder<_, _>>) = JobDispatcher::new();
let sm = sm.register(stringify!(#worker_fn), move |_| {
#worker_fn(recv.clone());
});
comrade::UNION.insert(stringify!(#fn_name), dispatch);
let sm = #init_fn(sm);
(sm, #fn_scope_struct {})
}
};