diff --git a/Cargo.lock b/Cargo.lock index 7229682..6c720a2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -76,6 +76,12 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "autocfg" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ace50bade8e6234aa140d9a2f552bbee1db4d353f69b8217bc503490fc1a9f26" + [[package]] name = "backtrace" version = "0.3.74" @@ -91,6 +97,12 @@ dependencies = [ "windows-targets", ] +[[package]] +name = "bitflags" +version = "2.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c8214115b7bf84099f1309324e63141d4c5d7cc26862f97a0a857dbefe165bd" + [[package]] name = "cfg-if" version = "1.0.0" @@ -107,11 +119,96 @@ checksum = "5b63caa9aa9397e2d9480a9b13673856c78d8ac123288526c37d7839f2a86990" name = "comrade" version = "0.1.0" dependencies = [ + "comrade-macro", + "crossbeam", + "dashmap", "env_logger", "log", + "once_cell", + "serde_json", "tokio", ] +[[package]] +name = "comrade-macro" +version = "0.1.0" +dependencies = [ + "proc-macro2", + "quote", + "serde_json", + "syn", +] + +[[package]] +name = "crossbeam" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1137cd7e7fc0fb5d3c5a8678be38ec56e819125d8d7907411fe24ccb943faca8" +dependencies = [ + "crossbeam-channel", + "crossbeam-deque", + "crossbeam-epoch", + "crossbeam-queue", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-channel" +version = "0.5.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06ba6d68e24814cb8de6bb986db8222d3a027d15872cabc0d18817bc3c0e4471" +dependencies = [ + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-deque" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9dd111b7b7f7d55b72c0a6ae361660ee5853c9af73f70c3c2ef6858b950e2e51" +dependencies = [ + "crossbeam-epoch", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-epoch" +version = "0.9.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e" +dependencies = [ + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-queue" +version = "0.3.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f58bbc28f91df819d0aa2a2c00cd19754769c2fad90579b3592b1c9ba7a3115" +dependencies = [ + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-utils" +version = "0.8.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28" + +[[package]] +name = "dashmap" +version = "6.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf" +dependencies = [ + "cfg-if", + "crossbeam-utils", + "hashbrown", + "lock_api", + "once_cell", + "parking_lot_core", +] + [[package]] name = "env_filter" version = "0.1.3" @@ -141,6 +238,12 @@ version = "0.31.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "07e28edb80900c19c28f1072f2e8aeca7fa06b23cd4169cefe1af5aa3260783f" +[[package]] +name = "hashbrown" +version = "0.14.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" + [[package]] name = "humantime" version = "2.1.0" @@ -153,12 +256,28 @@ version = "1.70.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7943c866cc5cd64cbc25b2e01621d07fa8eb2a1a23160ee81ce38704e97b8ecf" +[[package]] +name = "itoa" +version = "1.0.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4a5f13b858c8d314ee3e8f639011f7ccefe71f97f96e50151fb991f267928e2c" + [[package]] name = "libc" version = "0.2.170" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "875b3680cb2f8f71bdcf9a30f38d48282f5d3c95cbf9b3fa57269bb5d5c06828" +[[package]] +name = "lock_api" +version = "0.4.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07af8b9cdd281b7915f413fa73f29ebd5d55d0d3f0155584dade1ff18cea1b17" +dependencies = [ + "autocfg", + "scopeguard", +] + [[package]] name = "log" version = "0.4.26" @@ -195,12 +314,52 @@ version = "1.20.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "945462a4b81e43c4e3ba96bd7b49d834c6f61198356aa858733bc4acf3cbe62e" +[[package]] +name = "parking_lot_core" +version = "0.9.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e401f977ab385c9e4e3ab30627d6f26d00e2c73eef317493c4ec6d468726cf8" +dependencies = [ + "cfg-if", + "libc", + "redox_syscall", + "smallvec", + "windows-targets", +] + [[package]] name = "pin-project-lite" version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b" +[[package]] +name = "proc-macro2" +version = "1.0.94" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a31971752e70b8b2686d7e46ec17fb38dad4051d94024c88df49b667caea9c84" +dependencies = [ + "unicode-ident", +] + +[[package]] +name = "quote" +version = "1.0.39" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c1f1914ce909e1658d9907913b4b91947430c7d9be598b15a1912935b8c04801" +dependencies = [ + "proc-macro2", +] + +[[package]] +name = "redox_syscall" +version = "0.5.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b8c0c260b63a8219631167be35e6a988e9554dbd323f8bd08439c8ed1302bd1" +dependencies = [ + "bitflags", +] + [[package]] name = "regex" version = "1.11.1" @@ -236,6 +395,67 @@ version = "0.1.24" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f" +[[package]] +name = "ryu" +version = "1.0.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "28d3b2b1366ec20994f1fd18c3c594f05c5dd4bc44d8bb0c1c632c8d6829481f" + +[[package]] +name = "scopeguard" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" + +[[package]] +name = "serde" +version = "1.0.218" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8dfc9d19bdbf6d17e22319da49161d5d0108e4188e8b680aef6299eed22df60" +dependencies = [ + "serde_derive", +] + +[[package]] +name = "serde_derive" +version = "1.0.218" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f09503e191f4e797cb8aac08e9a4a4695c5edf6a2e70e376d961ddd5c969f82b" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "serde_json" +version = "1.0.140" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "20068b6e96dc6c9bd23e01df8827e6c7e1f2fddd43c21810382803c136b99373" +dependencies = [ + "itoa", + "memchr", + "ryu", + "serde", +] + +[[package]] +name = "smallvec" +version = "1.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7fcf8323ef1faaee30a44a340193b1ac6814fd9b7b4e88e9d4519a3e4abe1cfd" + +[[package]] +name = "syn" +version = "2.0.99" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e02e925281e18ffd9d640e234264753c43edc62d64b2d4cf898f1bc5e75f3fc2" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + [[package]] name = "tokio" version = "1.43.0" @@ -246,6 +466,12 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "unicode-ident" +version = "1.0.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a5f39404a5da50712a4c1eecf25e90dd62b613502b7e925fd4e4d19b5c96512" + [[package]] name = "utf8parse" version = "0.2.2" diff --git a/Cargo.toml b/Cargo.toml index 0d8f24c..ddf0c9a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,6 +4,11 @@ version = "0.1.0" edition = "2024" [dependencies] +crossbeam = { version = "0.8.4", features = ["crossbeam-channel"] } +dashmap = "6.1.0" env_logger = "0.11.6" log = "0.4.26" +once_cell = "1.20.3" tokio = "1.43.0" +comrade-macro = { path = "./comrade-macro" } +serde_json = "1.0.140" diff --git a/README.md b/README.md new file mode 100644 index 0000000..8b1bf95 --- /dev/null +++ b/README.md @@ -0,0 +1,103 @@ +# ☭ comrade +`comrade` is a Rust crate designed for managing compute work. It allows seamless management of shared work and functions even across machines. + +## Features + +- **Parallel Execution**: Dispatch tasks to run concurrently and gather their results. +- **Rally Execution**: Run multiple tasks in parallel and return the result of the fastest one. +- **Service Management**: Manage background services with different operating modes (`Decay`, `Daemon`). +- **Worker Unions**: Delegate tasks using `#[worker]` annotations locally or as distributed task queues across multiple machines. +- **Background Tasks**: Seamlessly run background tasks without blocking the main logic of your program. + +## Core Concepts + +### Parallel Execution + +`comrade` provides a simple interface for running tasks in parallel, perfect for independent tasks that can be processed concurrently. + +```rust +let results: Vec = parallel(items, |item: &i32| { + // ... +}); +``` + +### Rally Execution + +The `rally` function allows you to run multiple tasks in parallel and return the result of the **first task to finish**. This is useful when you want to prioritize the first available result from several tasks (example: download from multiple HTTP mirrors). + +```rust +let res: (i32, i32) = rally(items, |item: &i32| { + // ... +}); +``` + +### Background Tasks + +Easily run tasks in the background without blocking the main thread. This is useful for code that needs be run without waiting for a result. + +```rust +fn handle() { + background(|| { + // Background task logic + println!("This is a background task!"); + }); +} +``` + +### Service Management + +`comrade` provides a way to manage persistent services with different modes. The `Decay` mode allows services to die, while the `Daemon` mode revives them and keeps them running indefinitely. + +```rust +use comrade::service::ServiceManager; + +fn run_services() { + let mut manager = ServiceManager::new().mode(comrade::service::ServiceMode::Decay); + + // Register and start services + manager = manager.register("my_service", |_| { + // Service logic here + }); + + let thread_handle = manager.spawn(); + thread_handle.join().unwrap(); +} +``` + +### Worker Unions + +You can annotate a function with `#[worker]` which gives them superpowers. These functions can be queued and dispatched by the system, and their results are returned when completed. + +```rust +use comrade::{worker}; + +#[worker] +pub fn myfn(i: i32) -> i32 { + i * 2 +} + +#[worker] +pub fn multiply(a: i32, b: i32) -> i32 { + a * b +} +``` + +After initialization these functions can then be called anywhere and will be processed eventually by whatever worker picks it up. + +```rust +fn main() { + let mut manager = ServiceManager::new().mode(comrade::service::ServiceMode::Decay); + + // Init worker thread on `ServiceManager` + manager = multiply_init(manager); + let manager = manager.spawn(); + + let res = multiply(2, 2); + + // Shutdown worker thread + multiply_shutdown(); + manager.join().unwrap(); +} +``` + +**Future Plans**: The current system works in-memory with a single worker thread processing tasks. In the future, we plan to extend this to support distributed task queues across multiple machines, enabling shared workloads and infinite scalability. diff --git a/comrade-macro/Cargo.lock b/comrade-macro/Cargo.lock new file mode 100644 index 0000000..c41495c --- /dev/null +++ b/comrade-macro/Cargo.lock @@ -0,0 +1,98 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +version = 4 + +[[package]] +name = "comrade-macro" +version = "0.1.0" +dependencies = [ + "proc-macro2", + "quote", + "serde_json", + "syn", +] + +[[package]] +name = "itoa" +version = "1.0.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4a5f13b858c8d314ee3e8f639011f7ccefe71f97f96e50151fb991f267928e2c" + +[[package]] +name = "memchr" +version = "2.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3" + +[[package]] +name = "proc-macro2" +version = "1.0.94" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a31971752e70b8b2686d7e46ec17fb38dad4051d94024c88df49b667caea9c84" +dependencies = [ + "unicode-ident", +] + +[[package]] +name = "quote" +version = "1.0.39" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c1f1914ce909e1658d9907913b4b91947430c7d9be598b15a1912935b8c04801" +dependencies = [ + "proc-macro2", +] + +[[package]] +name = "ryu" +version = "1.0.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "28d3b2b1366ec20994f1fd18c3c594f05c5dd4bc44d8bb0c1c632c8d6829481f" + +[[package]] +name = "serde" +version = "1.0.218" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8dfc9d19bdbf6d17e22319da49161d5d0108e4188e8b680aef6299eed22df60" +dependencies = [ + "serde_derive", +] + +[[package]] +name = "serde_derive" +version = "1.0.218" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f09503e191f4e797cb8aac08e9a4a4695c5edf6a2e70e376d961ddd5c969f82b" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "serde_json" +version = "1.0.140" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "20068b6e96dc6c9bd23e01df8827e6c7e1f2fddd43c21810382803c136b99373" +dependencies = [ + "itoa", + "memchr", + "ryu", + "serde", +] + +[[package]] +name = "syn" +version = "2.0.99" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e02e925281e18ffd9d640e234264753c43edc62d64b2d4cf898f1bc5e75f3fc2" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + +[[package]] +name = "unicode-ident" +version = "1.0.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a5f39404a5da50712a4c1eecf25e90dd62b613502b7e925fd4e4d19b5c96512" diff --git a/comrade-macro/Cargo.toml b/comrade-macro/Cargo.toml new file mode 100644 index 0000000..bdf6f55 --- /dev/null +++ b/comrade-macro/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "comrade-macro" +version = "0.1.0" +edition = "2024" + +[dependencies] +proc-macro2 = "1.0.94" +quote = "1.0.39" +serde_json = "1.0.140" +syn = { version = "2.0.99", features = ["full"] } + +[lib] +proc-macro = true diff --git a/comrade-macro/src/lib.rs b/comrade-macro/src/lib.rs new file mode 100644 index 0000000..7ad07b6 --- /dev/null +++ b/comrade-macro/src/lib.rs @@ -0,0 +1,200 @@ +use proc_macro::TokenStream; +use quote::{ToTokens, format_ident, quote}; +use syn::{FnArg, Ident, ItemFn, Pat, ReturnType, Type, parse_macro_input}; + +/// This macro turns this function into a worker. +/// +/// This will upgrade the function and generate a few ones (`fn` is a placeholder for the functions name): +/// - `fn()` - This function will be exactly the same as the original but it will be computed by a worker. +/// - `fn_init(&ServiceManager) -> ServiceManager` - This function registers a worker thread on a `ServiceManager`. +/// - `fn_shutdown()` - This function issues a shutdown request. +/// - `fn_init_scoped(&ServiceManager) -> (ServiceManager, fn_Scoped)` - This function registers a worker thread on a `ServiceManager` and returns a scoped struct. You can call the underlying function with `.call()` on the struct and it will automatically shutdown any workers if it gets out of scope. +/// +/// # Examples +/// ```ignore +/// use comrade::worker; +/// +/// // Declare worker +/// #[worker] +/// pub fn multiply(a: i32, b: i32) -> i32 { +/// a * b +/// } +/// +/// fn main() { +/// let s = ServiceManager::new().mode(comrade::service::ServiceMode::Decay); +/// +/// // Init worker thread +/// let s = multiply_init(s); +/// let s = s.spawn(); +/// +/// // Usage +/// let x = multiply(2, 8); +/// println!("myfn {x}"); +/// +/// // Shutdown worker thread +/// multiply_shutdown(); +/// +/// s.join().unwrap(); +/// } +/// ``` +#[proc_macro_attribute] +pub fn worker(_attr: TokenStream, item: TokenStream) -> TokenStream { + let input: ItemFn = parse_macro_input!(item as ItemFn); + let fn_name = &input.sig.ident; + + // Extract parameter names and types separately + let params: Vec<(Ident, Type)> = input + .sig + .inputs + .iter() + .filter_map(|arg| { + if let FnArg::Typed(pat_type) = arg { + let name = if let Pat::Ident(pat_ident) = *pat_type.pat.clone() { + pat_ident.ident.clone() + } else { + return None; + }; + let ty = *pat_type.ty.clone(); + Some((name, ty)) + } else { + None + } + }) + .collect(); + + // Extract parameter names and types into separate lists for quoting + let param_names: Vec = params.iter().map(|(name, _)| name.clone()).collect(); + let param_types: Vec = params.iter().map(|(_, ty)| ty.clone()).collect(); + + for t in ¶m_types { + println!("param {}", t.to_token_stream().to_string()); + } + + // Extract return type + let return_type = match &input.sig.output { + ReturnType::Type(_, ty) => quote!(#ty), + ReturnType::Default => quote!(()), + }; + + // Extract function body + let body = &input.block; + + let wrapper_fn = format_ident!("{}_wrapper", fn_name); + let worker_fn = format_ident!("{}_worker", fn_name); + let init_fn = format_ident!("{}_init", fn_name); + let init_fn_scoped = format_ident!("{}_init_scoped", fn_name); + let fn_scope_struct = format_ident!("{}_Scoped", fn_name); + let shutdown_fn = format_ident!("{}_shutdown", fn_name); + + let param_unpacking = param_names.iter().enumerate().map(|(i, name)| { + if param_names.len() == 1 { + return quote! { + let #name = i; + + }; + } + + let param_type = ¶m_types[i]; + if let Type::Path(_) = param_type { + quote! { + let #name = i.#i; + } + } else { + quote! { + let #name = i; + } + } + }); + + let output = quote! { + pub fn #fn_name(#(#param_names: #param_types),*) -> #return_type { + let i: comrade::serde_json::Value = comrade::serde_json::to_value( (#(#param_names),*) ).unwrap(); + serde_json::from_value(comrade::UNION.get(stringify!(#fn_name)).unwrap().send(i)).unwrap() + } + + fn #wrapper_fn(task: JobOrder) { + let i = task.param.clone(); + + // Deserialize the parameter into the function's expected types + let i: (#(#param_types),*) = comrade::serde_json::from_value(i).unwrap(); + + #(#param_unpacking)* + + let res = #body; + + task.done(comrade::serde_json::to_value(&res).unwrap()); + } + + pub fn #worker_fn(recv: Receiver>) { + loop { + let task = recv.recv(); + + match task { + Ok(task) => { + if let comrade::serde_json::Value::Object(obj) = &task.param { + if obj.contains_key("task") { + log::info!("Shutdown requested for task worker {}", stringify!(#fn_name)); + task.done(comrade::serde_json::json!({"ok": 1})); + break; + } + } + + #wrapper_fn(task) + }, + Err(e) => { + log::error!("Error receiving task: {e:?}"); + } + } + } + } + + #[doc = "Shutdown the worker"] + pub fn #shutdown_fn() { + comrade::UNION.get(stringify!(#fn_name)).unwrap().send(comrade::serde_json::json!({"task": "shutdown"})); + } + + #[doc = "Initialize a worker thread on `ServiceManager`"] + pub fn #init_fn(sm: ServiceManager) -> ServiceManager { + let (dispatch, recv): (JobDispatcher<_, _>, Receiver>) = JobDispatcher::new(); + + let sm = sm.register(stringify!(#worker_fn), move |_| { + #worker_fn(recv.clone()); + }); + + comrade::UNION.insert(stringify!(#fn_name), dispatch); + + sm + } + + #[allow(non_camel_case_types)] + pub struct #fn_scope_struct {} + + impl #fn_scope_struct { + pub fn call(&self, #(#param_names: #param_types),*) -> #return_type { + #fn_name(#(#param_names),*) + } + } + + impl Drop for #fn_scope_struct { + fn drop(&mut self) { + log::info!("Scoped task worker got dropped."); + #shutdown_fn(); + } + } + + #[doc = "Initialize a worker thread on `ServiceManager` on a scoped lifetime"] + pub fn #init_fn_scoped(sm: ServiceManager) -> (ServiceManager, #fn_scope_struct) { + let (dispatch, recv): (JobDispatcher<_, _>, Receiver>) = JobDispatcher::new(); + + let sm = sm.register(stringify!(#worker_fn), move |_| { + #worker_fn(recv.clone()); + }); + + comrade::UNION.insert(stringify!(#fn_name), dispatch); + + (sm, #fn_scope_struct {}) + } + }; + + output.into() +} diff --git a/examples/services.rs b/examples/services.rs index cd83640..5618d60 100644 --- a/examples/services.rs +++ b/examples/services.rs @@ -8,20 +8,66 @@ fn main() { log::info!("Running services example"); // persistent background services - let mut s = ServiceManager::new(); + let mut s_decay = ServiceManager::new().mode(comrade::service::ServiceMode::Decay); - s.register("myservice", |_| { - let mut c = 0; - loop { - // ... - println!("I am doing something!"); - std::thread::sleep(Duration::from_secs(1)); - c += 1; - if c == 3 { - panic!("Oh no!"); + s_decay = s_decay.register( + "myservice", + Box::new(|_| { + let mut c = 0; + loop { + // ... + println!("I am doing something!"); + std::thread::sleep(Duration::from_millis(400)); + c += 1; + if c == 3 { + panic!("Oh no!"); + } } - } - }); + }), + ); + + let s = s_decay.register( + "myservice2", + Box::new(|_| { + let mut c = 0; + loop { + // ... + println!("I am doing something! 2"); + std::thread::sleep(Duration::from_millis(400)); + c += 1; + if c == 3 { + println!("Bye"); + break; + } + } + }), + ); + + let st = s.spawn(); + + st.join().unwrap(); + + println!("Ended decaying ServiceManager"); + + // daemon mode + + let mut s = ServiceManager::new().mode(comrade::service::ServiceMode::Daemon); + + s = s.register( + "myservice", + Box::new(|_| { + let mut c = 0; + loop { + // ... + println!("I am doing something forever!"); + std::thread::sleep(Duration::from_millis(400)); + c += 1; + if c == 3 { + panic!("Oh no!"); + } + } + }), + ); let st = s.spawn(); diff --git a/examples/work.rs b/examples/work.rs new file mode 100644 index 0000000..60e6e65 --- /dev/null +++ b/examples/work.rs @@ -0,0 +1,50 @@ +use comrade::{ + job::{JobDispatcher, JobOrder}, + service::ServiceManager, + worker, +}; +use crossbeam::channel::Receiver; + +#[worker] +pub fn myfn(i: i32) -> i32 { + i * 2 +} + +#[worker] +pub fn multiply(a: i32, b: i32) -> i32 { + a * b +} + +fn do_work(multiply: multiply_Scoped, myfn: myfn_Scoped) { + for i in 0..10 { + let x = multiply.call(i, i); + println!("myfn {i} -> {x}"); + let x = myfn.call(i); + println!("myfn {i} -> {x}"); + } +} + +fn main() { + env_logger::init(); + + let s = ServiceManager::new().mode(comrade::service::ServiceMode::Decay); + + let (s, multiply) = multiply_init_scoped(s); + let (s, myfn_fn) = myfn_init_scoped(s); + let s = s.spawn(); + + do_work(multiply, myfn_fn); + + s.join().unwrap(); + + let s = ServiceManager::new().mode(comrade::service::ServiceMode::Decay); + let s = myfn_init(s); + let s = s.spawn(); + + let x = myfn(55); + println!("myfn {x}"); + + myfn_shutdown(); + + s.join().unwrap(); +} diff --git a/src/job.rs b/src/job.rs index 48d0035..bb7b1f1 100644 --- a/src/job.rs +++ b/src/job.rs @@ -1,9 +1,10 @@ -use std::sync::{Arc, Mutex, mpsc}; +use crossbeam::channel::{Receiver, Sender}; +use std::sync::mpsc; #[derive(Clone)] /// A generic job dispatcher struct that allows sending jobs of type `T` and receiving results of type `V` using message passing. pub struct JobDispatcher { - sender: Arc>>>, + sender: Sender>, } impl JobDispatcher { @@ -27,14 +28,10 @@ impl JobDispatcher { /// assert_eq!(result, 4); /// ``` #[must_use] - pub fn new() -> (Self, mpsc::Receiver>) { - let (sender, receiver) = mpsc::channel(); - ( - Self { - sender: Arc::new(Mutex::new(sender)), - }, - receiver, - ) + pub fn new() -> (Self, Receiver>) { + let (sender, receiver) = crossbeam::channel::bounded(12); + + (Self { sender: sender }, receiver) } /// Sends a job of type `T` to the job dispatcher and waits for its result of type `V`. @@ -47,7 +44,7 @@ impl JobDispatcher { let job_order = JobOrder::new(param, move |ret| { tx.send(ret).unwrap(); }); - self.sender.lock().unwrap().send(job_order).unwrap(); + self.sender.send(job_order).unwrap(); rx.recv().unwrap() } @@ -58,7 +55,7 @@ impl JobDispatcher { let job_order = JobOrder::new(param, move |ret| { tx.send(ret).unwrap(); }); - self.sender.lock().ok()?.send(job_order).ok()?; + self.sender.send(job_order).ok()?; rx.recv().ok() } } diff --git a/src/lib.rs b/src/lib.rs index abe173f..e41d0ba 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,6 +2,19 @@ use std::{sync::mpsc, thread, time::Instant}; pub mod job; pub mod service; +pub use comrade_macro::worker; +use dashmap::DashMap; +use once_cell::sync::Lazy; +pub use serde_json; + +// TODO : worker docs + refactor +// TODO : worker non blocking fn call +// TODO : worker parallelism (Load Balanced Queue + Multiple Threads) +// TODO : refactor dispatcher backends (memory, valkey) + +pub static UNION: Lazy< + DashMap<&'static str, job::JobDispatcher>, +> = Lazy::new(DashMap::new); /// Rally Function /// diff --git a/src/service.rs b/src/service.rs index 319d8b0..84e76aa 100644 --- a/src/service.rs +++ b/src/service.rs @@ -54,24 +54,47 @@ impl Drop for DeadManSwitch { /// Service Manager /// /// This will manage and keep background services running- -pub struct ServiceManager () + 'static> { - pub services: HashMap>, +pub struct ServiceManager { + pub services: HashMap () + 'static + Send + Sync>>>, + pub mode: ServiceMode, } -impl () + 'static + Send + Sync> ServiceManager { +// TODO : impl decay mode + +/// The mode on which services should operate +pub enum ServiceMode { + /// Behave like a daemon. Services can never die and will come back to life after beeing killed. They will always haunt you. + Daemon, + /// Graceful Decay. Services stay dead after beeing killed. If no services are running anymore the `ServiceManager` will commit suicide. + Decay, +} + +impl ServiceManager { pub fn new() -> Self { Self { services: HashMap::new(), + mode: ServiceMode::Daemon, } } - /// Register a new background service - pub fn register(&mut self, service_name: &str, service: T) { - self.services - .insert(service_name.to_string(), Arc::new(service)); + /// Set a `ServiceMode` to control the behaviour of services. + pub fn mode(mut self, mode: ServiceMode) -> Self { + self.mode = mode; + self } - fn run(self) -> ! { + /// Register a new background service + pub fn register () + 'static + Send + Sync>( + mut self, + service_name: &str, + service: T, + ) -> Self { + self.services + .insert(service_name.to_string(), Arc::new(Box::new(service))); + self + } + + fn run(self) { let mut threads = HashMap::new(); // Initial start @@ -93,6 +116,7 @@ impl () + 'static + Send + Sync> ServiceManager { // Monitor loop loop { let mut threads_new = HashMap::new(); + let mut threads_rm = Vec::new(); for (service, (t, dm)) in &mut threads { let alive = dm.alive(); @@ -102,28 +126,46 @@ impl () + 'static + Send + Sync> ServiceManager { ); if !alive { - log::warn!("Service {service} died. Restarting..."); + // Restart (Daemon) + match self.mode { + ServiceMode::Daemon => { + log::warn!("Service {service} died. Restarting..."); - while !t.is_finished() { - log::debug!("waiting for thread"); + while !t.is_finished() { + log::debug!("waiting for thread"); + } + + let f = Arc::clone(self.services.get(service).unwrap()); + + let (dms, dmr) = DeadManSwitch::new(); + threads_new.insert( + service.clone(), + ( + std::thread::spawn(move || { + f(dms); + }), + dmr, + ), + ); + } + ServiceMode::Decay => { + log::info!("Service {service} died."); + threads_rm.push(service.clone()); + } } - - let f = Arc::clone(self.services.get(service).unwrap()); - - let (dms, dmr) = DeadManSwitch::new(); - threads_new.insert( - service.clone(), - ( - std::thread::spawn(move || { - f(dms); - }), - dmr, - ), - ); } } + for t in threads_rm { + threads.remove(&t); + } + threads.extend(threads_new); + + if threads.is_empty() && matches!(self.mode, ServiceMode::Decay) { + log::info!("All my services are dead. Commiting suicide..."); + break; + } } }