diff --git a/Cargo.lock b/Cargo.lock index d5f58e1..c161fd0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -62,7 +62,7 @@ version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "79947af37f4177cfead1110013d678905c37501914fba0efea834c3fe9a8d60c" dependencies = [ - "windows-sys", + "windows-sys 0.59.0", ] [[package]] @@ -73,9 +73,15 @@ checksum = "ca3534e77181a9cc07539ad51f2141fe32f6c3ffd4df76db8ad92346b003ae4e" dependencies = [ "anstyle", "once_cell", - "windows-sys", + "windows-sys 0.59.0", ] +[[package]] +name = "arc-swap" +version = "1.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457" + [[package]] name = "autocfg" version = "1.4.0" @@ -109,6 +115,12 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" +[[package]] +name = "bytes" +version = "1.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d71b6127be86fdcfddb610f7182ac57211d4b18a3e9c82eb2d17662f2227ad6a" + [[package]] name = "cfg-if" version = "1.0.0" @@ -121,6 +133,16 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b63caa9aa9397e2d9480a9b13673856c78d8ac123288526c37d7839f2a86990" +[[package]] +name = "combine" +version = "4.6.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba5a308b75df32fe02788e748662718f03fde005016435c444eea572398219fd" +dependencies = [ + "bytes", + "memchr", +] + [[package]] name = "comrade" version = "0.1.0" @@ -132,8 +154,11 @@ dependencies = [ "log", "once_cell", "rand", + "redis", + "serde", "serde_json", "tokio", + "uuid", ] [[package]] @@ -216,6 +241,17 @@ dependencies = [ "parking_lot_core", ] +[[package]] +name = "displaydoc" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "env_filter" version = "0.1.3" @@ -239,6 +275,15 @@ dependencies = [ "log", ] +[[package]] +name = "form_urlencoded" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e13624c2627564efccf4934284bdd98cbaa14e79b0b5a141218e507b3a823456" +dependencies = [ + "percent-encoding", +] + [[package]] name = "getrandom" version = "0.3.1" @@ -269,6 +314,145 @@ version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" +[[package]] +name = "icu_collections" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "db2fa452206ebee18c4b5c2274dbf1de17008e874b4dc4f0aea9d01ca79e4526" +dependencies = [ + "displaydoc", + "yoke", + "zerofrom", + "zerovec", +] + +[[package]] +name = "icu_locid" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "13acbb8371917fc971be86fc8057c41a64b521c184808a698c02acc242dbf637" +dependencies = [ + "displaydoc", + "litemap", + "tinystr", + "writeable", + "zerovec", +] + +[[package]] +name = "icu_locid_transform" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "01d11ac35de8e40fdeda00d9e1e9d92525f3f9d887cdd7aa81d727596788b54e" +dependencies = [ + "displaydoc", + "icu_locid", + "icu_locid_transform_data", + "icu_provider", + "tinystr", + "zerovec", +] + +[[package]] +name = "icu_locid_transform_data" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fdc8ff3388f852bede6b579ad4e978ab004f139284d7b28715f773507b946f6e" + +[[package]] +name = "icu_normalizer" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19ce3e0da2ec68599d193c93d088142efd7f9c5d6fc9b803774855747dc6a84f" +dependencies = [ + "displaydoc", + "icu_collections", + "icu_normalizer_data", + "icu_properties", + "icu_provider", + "smallvec", + "utf16_iter", + "utf8_iter", + "write16", + "zerovec", +] + +[[package]] +name = "icu_normalizer_data" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8cafbf7aa791e9b22bec55a167906f9e1215fd475cd22adfcf660e03e989516" + +[[package]] +name = "icu_properties" +version = "1.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93d6020766cfc6302c15dbbc9c8778c37e62c14427cb7f6e601d849e092aeef5" +dependencies = [ + "displaydoc", + "icu_collections", + "icu_locid_transform", + "icu_properties_data", + "icu_provider", + "tinystr", + "zerovec", +] + +[[package]] +name = "icu_properties_data" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "67a8effbc3dd3e4ba1afa8ad918d5684b8868b3b26500753effea8d2eed19569" + +[[package]] +name = "icu_provider" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ed421c8a8ef78d3e2dbc98a973be2f3770cb42b606e3ab18d6237c4dfde68d9" +dependencies = [ + "displaydoc", + "icu_locid", + "icu_provider_macros", + "stable_deref_trait", + "tinystr", + "writeable", + "yoke", + "zerofrom", + "zerovec", +] + +[[package]] +name = "icu_provider_macros" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ec89e9337638ecdc08744df490b221a7399bf8d164eb52a665454e60e075ad6" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "idna" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "686f825264d630750a544639377bae737628043f20d38bbc029e8f29ea968a7e" +dependencies = [ + "idna_adapter", + "smallvec", + "utf8_iter", +] + +[[package]] +name = "idna_adapter" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "daca1df1c957320b2cf139ac61e7bd64fed304c5040df000a745aa1de3b4ef71" +dependencies = [ + "icu_normalizer", + "icu_properties", +] + [[package]] name = "is_terminal_polyfill" version = "1.70.1" @@ -287,6 +471,12 @@ version = "0.2.170" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "875b3680cb2f8f71bdcf9a30f38d48282f5d3c95cbf9b3fa57269bb5d5c06828" +[[package]] +name = "litemap" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "23fb14cb19457329c82206317a5663005a4d404783dc74f4252769b0d5f42856" + [[package]] name = "lock_api" version = "0.4.12" @@ -318,6 +508,34 @@ dependencies = [ "adler2", ] +[[package]] +name = "num-bigint" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a5e44f723f1133c9deac646763579fdb3ac745e418f2a7af9cd0c431da1f20b9" +dependencies = [ + "num-integer", + "num-traits", +] + +[[package]] +name = "num-integer" +version = "0.1.46" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7969661fd2958a5cb096e56c8e1ad0444ac2bbcd0061bd28660485a44879858f" +dependencies = [ + "num-traits", +] + +[[package]] +name = "num-traits" +version = "0.2.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "071dfc062690e90b734c0b2273ce72ad0ffa95f0c74596bc250dcfd960262841" +dependencies = [ + "autocfg", +] + [[package]] name = "object" version = "0.36.7" @@ -346,6 +564,12 @@ dependencies = [ "windows-targets", ] +[[package]] +name = "percent-encoding" +version = "2.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" + [[package]] name = "pin-project-lite" version = "0.2.16" @@ -409,6 +633,23 @@ dependencies = [ "getrandom", ] +[[package]] +name = "redis" +version = "0.29.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8034fb926579ff49d3fe58d288d5dcb580bf11e9bccd33224b45adebf0fd0c23" +dependencies = [ + "arc-swap", + "combine", + "itoa", + "num-bigint", + "percent-encoding", + "ryu", + "sha1_smol", + "socket2", + "url", +] + [[package]] name = "redox_syscall" version = "0.5.10" @@ -497,12 +738,34 @@ dependencies = [ "serde", ] +[[package]] +name = "sha1_smol" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbfa15b3dddfee50a0fff136974b3e1bde555604ba463834a7eb7deb6417705d" + [[package]] name = "smallvec" version = "1.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7fcf8323ef1faaee30a44a340193b1ac6814fd9b7b4e88e9d4519a3e4abe1cfd" +[[package]] +name = "socket2" +version = "0.5.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c970269d99b64e60ec3bd6ad27270092a5394c4e309314b18ae3fe575695fbe8" +dependencies = [ + "libc", + "windows-sys 0.52.0", +] + +[[package]] +name = "stable_deref_trait" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3" + [[package]] name = "syn" version = "2.0.99" @@ -514,6 +777,27 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "synstructure" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c8af7666ab7b6390ab78131fb5b0fce11d6b7a6951602017c35fa82800708971" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "tinystr" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9117f5d4db391c1cf6927e7bea3db74b9a1c1add8f7eda9ffd5364f40f57b82f" +dependencies = [ + "displaydoc", + "zerovec", +] + [[package]] name = "tokio" version = "1.43.0" @@ -530,12 +814,44 @@ version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5a5f39404a5da50712a4c1eecf25e90dd62b613502b7e925fd4e4d19b5c96512" +[[package]] +name = "url" +version = "2.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32f8b686cadd1473f4bd0117a5d28d36b1ade384ea9b5069a1c40aefed7fda60" +dependencies = [ + "form_urlencoded", + "idna", + "percent-encoding", +] + +[[package]] +name = "utf16_iter" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c8232dd3cdaed5356e0f716d285e4b40b932ac434100fe9b7e0e8e935b9e6246" + +[[package]] +name = "utf8_iter" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be" + [[package]] name = "utf8parse" version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" +[[package]] +name = "uuid" +version = "1.15.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e0f540e3240398cce6128b64ba83fdbdd86129c16a3aa1a3a252efd66eb3d587" +dependencies = [ + "getrandom", +] + [[package]] name = "wasi" version = "0.13.3+wasi-0.2.2" @@ -545,6 +861,15 @@ dependencies = [ "wit-bindgen-rt", ] +[[package]] +name = "windows-sys" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d" +dependencies = [ + "windows-targets", +] + [[package]] name = "windows-sys" version = "0.59.0" @@ -627,6 +952,42 @@ dependencies = [ "bitflags", ] +[[package]] +name = "write16" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d1890f4022759daae28ed4fe62859b1236caebfc61ede2f63ed4e695f3f6d936" + +[[package]] +name = "writeable" +version = "0.5.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e9df38ee2d2c3c5948ea468a8406ff0db0b29ae1ffde1bcf20ef305bcc95c51" + +[[package]] +name = "yoke" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "120e6aef9aa629e3d4f52dc8cc43a015c7724194c97dfaf45180d2daf2b77f40" +dependencies = [ + "serde", + "stable_deref_trait", + "yoke-derive", + "zerofrom", +] + +[[package]] +name = "yoke-derive" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2380878cad4ac9aac1e2435f3eb4020e8374b5f13c296cb75b4620ff8e229154" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "synstructure", +] + [[package]] name = "zerocopy" version = "0.7.35" @@ -667,3 +1028,46 @@ dependencies = [ "quote", "syn", ] + +[[package]] +name = "zerofrom" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50cc42e0333e05660c3587f3bf9d0478688e15d870fab3346451ce7f8c9fbea5" +dependencies = [ + "zerofrom-derive", +] + +[[package]] +name = "zerofrom-derive" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d71e5d6e06ab090c67b5e44993ec16b72dcbaabc526db883a360057678b48502" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "synstructure", +] + +[[package]] +name = "zerovec" +version = "0.10.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aa2b893d79df23bfb12d5461018d408ea19dfafe76c2c7ef6d4eba614f8ff079" +dependencies = [ + "yoke", + "zerofrom", + "zerovec-derive", +] + +[[package]] +name = "zerovec-derive" +version = "0.10.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6eafa6dfb17584ea3e2bd6e76e0cc15ad7af12b09abdd1ca55961bed9b1063c6" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] diff --git a/Cargo.toml b/Cargo.toml index 8416ea4..0b1331d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,3 +13,6 @@ tokio = "1.43.0" comrade-macro = { path = "./comrade-macro" } serde_json = "1.0.140" rand = "0.9.0" +redis = "0.29.1" +serde = "1.0.218" +uuid = { version = "1.15.1", features = ["v4"] } diff --git a/README.md b/README.md index 4b8d13e..4c7ecc9 100644 --- a/README.md +++ b/README.md @@ -33,7 +33,7 @@ let res: (i32, i32) = rally(items, |item: &i32| { ### Background Tasks -Easily run tasks in the background without blocking the main thread. This is useful for code that needs be run without waiting for a result. +Easily run tasks in the background without blocking the main thread. This is useful for code that needs to be run without waiting for a result. ```rust fn handle() { @@ -85,6 +85,7 @@ pub fn multiply(a: i32, b: i32) -> i32 { ``` After initialization these functions can then be called anywhere and will be processed eventually by whatever worker picks it up. +Additionally there are new functions derived from your function. See the below example: ```rust fn main() { @@ -94,12 +95,63 @@ fn main() { manager = multiply_init(manager); let manager = manager.spawn(); + // works like the original function let res = multiply(2, 2); + // async + let e = take_time_async(1500); + + println!("This will run right after!"); + // ... + + // is OUR value ready? + println!("the value is {}", e.wait()); + // Shutdown worker thread multiply_shutdown(); manager.join().unwrap(); } ``` -**Future Plans**: The current system works in-memory with a single worker thread processing tasks. In the future, we plan to extend this to support distributed task queues across multiple machines, enabling shared workloads and infinite scalability. +These tasks can now be distributed with Valkey. + +Make sure you have a Valkey server running and the `$VALKEY_URL` environment variable is set for your application: +```yml +services: + valkey: + image: valkey/valkey + ports: + - 6379:6379 +``` + +Then you can spawn worker threads like that: + +```rust +fn main() { + let mut s = ServiceManager::new().mode(comrade::service::ServiceMode::Decay); + s = multiply_init_union(s); + s = myfn_init_union(s); + let s = s.spawn(); + + log::info!("Spawned workers. Working for 1 minute"); + std::thread::sleep(Duration::from_secs(60)); + + myfn_shutdown(); + multiply_shutdown(); + + s.join().unwrap(); +} +``` + +When workers are running, you can use them like: + +```rust +fn main() { + // Register workers in union + myfn_register_union(); + + // Will be computed somewhere else + let x = myfn(50); + println!("x is {x}"); +} +``` diff --git a/comrade-macro/src/lib.rs b/comrade-macro/src/lib.rs index 7d44f4c..ac7cb87 100644 --- a/comrade-macro/src/lib.rs +++ b/comrade-macro/src/lib.rs @@ -5,10 +5,13 @@ use syn::{FnArg, Ident, ItemFn, Pat, ReturnType, Type, parse_macro_input}; /// This macro turns this function into a worker. /// /// This will upgrade the function and generate a few ones (`fn` is a placeholder for the functions name): -/// - `fn()` - This function will be exactly the same as the original but it will be computed by a worker. -/// - `fn_init(&ServiceManager) -> ServiceManager` - This function registers a worker thread on a `ServiceManager`. -/// - `fn_shutdown()` - This function issues a shutdown request. -/// - `fn_init_scoped(&ServiceManager) -> (ServiceManager, fn_Scoped)` - This function registers a worker thread on a `ServiceManager` and returns a scoped struct. You can call the underlying function with `.call()` on the struct and it will automatically shutdown any workers if it gets out of scope. +/// - `fn()` - Behaves exactly the same as the original but it will be computed by a worker. +/// - `fn_init(ServiceManager) -> ServiceManager` - Registers worker threads on a `ServiceManager`. +/// - `fn_shutdown()` - Issue a shutdown request. +/// - `fn_init_scoped(ServiceManager) -> (ServiceManager, fn_Scoped)` - Registers worker threads on a `ServiceManager` and return a scoped struct. You can call the underlying function with `.call()` on the struct and it will automatically shutdown any workers if it gets out of scope. +/// - `fn_init_union(ServiceManager) -> ServiceManager` - Register worker threads and work in a union using Valkey. +/// - `fn_init_union_scoped(ServiceManager) -> ServiceManager` - Register worker threads and work in a union using Valkey and return a scoped struct. +/// - `fn_register_union()` - Register the worker in a union setup without starting local worker threads. /// /// # Examples /// ```ignore @@ -81,9 +84,12 @@ pub fn worker(attr: TokenStream, item: TokenStream) -> TokenStream { let wrapper_fn = format_ident!("{}_wrapper", fn_name); let worker_fn = format_ident!("{}_worker", fn_name); let init_fn = format_ident!("{}_init", fn_name); + let init_fn_union = format_ident!("{}_init_union", fn_name); let init_fn_scoped = format_ident!("{}_init_scoped", fn_name); + let init_fn_scoped_union = format_ident!("{}_init_union_scoped", fn_name); let fn_scope_struct = format_ident!("{}_Scoped", fn_name); let fn_name_async = format_ident!("{}_async", fn_name); + let init_fn_register = format_ident!("{}_register_union", fn_name); let shutdown_fn = format_ident!("{}_shutdown", fn_name); let param_unpacking = param_names.iter().enumerate().map(|(i, name)| { @@ -109,13 +115,17 @@ pub fn worker(attr: TokenStream, item: TokenStream) -> TokenStream { let output = quote! { pub fn #fn_name(#(#param_names: #param_types),*) -> #return_type { let i: comrade::serde_json::Value = comrade::serde_json::to_value( (#(#param_names),*) ).unwrap(); - serde_json::from_value(comrade::UNION.get(stringify!(#fn_name)).unwrap().send(i)).unwrap() + serde_json::from_value(comrade::UNION.get(stringify!(#fn_name)).expect("Function call is not registered in UNION").send(i)).unwrap() } #[doc = "Will run the function non blocking returning a `JobResult<_>` for fetching a result later."] pub fn #fn_name_async(#(#param_names: #param_types),*) -> comrade::job::JobResult { let i: comrade::serde_json::Value = comrade::serde_json::to_value( (#(#param_names),*) ).unwrap(); - comrade::UNION.get(stringify!(#fn_name)).unwrap().send_async(i) + if let Some(dispatch) = comrade::UNION.get(stringify!(#fn_name)) { + dispatch.send_async(i) + } else { + panic!("Function call {} is not registered in UNION", stringify!(#fn_name)) + } } fn #wrapper_fn(task: JobOrder) { @@ -131,7 +141,7 @@ pub fn worker(attr: TokenStream, item: TokenStream) -> TokenStream { task.done(comrade::serde_json::to_value(&res).unwrap()); } - pub fn #worker_fn(recv: Receiver>) { + pub fn #worker_fn(recv: comrade::job::TaskReceiverBackend) { let mut metrics = (0, 0); loop { let task = recv.recv(); @@ -167,17 +177,59 @@ pub fn worker(attr: TokenStream, item: TokenStream) -> TokenStream { let mut dispatchers = Vec::new(); let mut s = sm; + log::info!("Initializing worker {} with {} threads", stringify!(#worker_fn), #worker_count); + for i in 0..#worker_count { let (dispatch, recv): (JobDispatcher<_, _>, Receiver>) = JobDispatcher::new(); s = s.register( &format!("{}_{i}", stringify!(#worker_fn)), move |_| { - #worker_fn(recv.clone()) + #worker_fn(comrade::job::TaskReceiverBackend::Local(recv.clone())) } ); - dispatchers.push(dispatch); + dispatchers.push(comrade::job::Dispatcher::Local(dispatch)); + } + + comrade::UNION.insert(stringify!(#fn_name), comrade::job::JobMultiplexer::from(dispatchers)); + + s + } + + + #[doc = "Register worker union on Valkey backend without starting local workers."] + pub fn #init_fn_register() { + let mut dispatchers = Vec::new(); + + log::info!("Registering worker union {}", stringify!(#worker_fn)); + + let dispatch = comrade::job::ValkeyJobDispatcher::<_, _>::new_topic(stringify!(#worker_fn), false); + dispatchers.push(comrade::job::Dispatcher::Union(dispatch)); + + comrade::UNION.insert(stringify!(#fn_name), comrade::job::JobMultiplexer::from(dispatchers)); + } + + #[doc = "Initialize worker threads on `ServiceManager` with Valkey backend"] + pub fn #init_fn_union(sm: ServiceManager) -> ServiceManager { + let mut dispatchers = Vec::new(); + let mut s = sm; + + log::info!("Initializing worker union {} with {} threads", stringify!(#worker_fn), #worker_count); + + for i in 0..#worker_count { + let dispatch = comrade::job::ValkeyJobDispatcher::<_, _>::new_topic(stringify!(#worker_fn), true); + + let recv = dispatch.clone(); + + s = s.register( + &format!("{}_union_{i}", stringify!(#worker_fn)), + move |_| { + #worker_fn(comrade::job::TaskReceiverBackend::Union(recv.clone())) + } + ); + + dispatchers.push(comrade::job::Dispatcher::Union(dispatch)); } comrade::UNION.insert(stringify!(#fn_name), comrade::job::JobMultiplexer::from(dispatchers)); @@ -206,6 +258,12 @@ pub fn worker(attr: TokenStream, item: TokenStream) -> TokenStream { let sm = #init_fn(sm); (sm, #fn_scope_struct {}) } + + #[doc = "Initialize a worker union on `ServiceManager` on a scoped lifetime"] + pub fn #init_fn_scoped_union(sm: ServiceManager) -> (ServiceManager, #fn_scope_struct) { + let sm = #init_fn_union(sm); + (sm, #fn_scope_struct {}) + } }; output.into() diff --git a/examples/work_union.rs b/examples/work_union.rs new file mode 100644 index 0000000..c17ee10 --- /dev/null +++ b/examples/work_union.rs @@ -0,0 +1,76 @@ +use std::time::Duration; + +use comrade::{ + job::{JobDispatcher, JobOrder}, + service::ServiceManager, + worker, +}; +use crossbeam::channel::Receiver; + +#[worker] +pub fn take_time(i: i32) { + std::thread::sleep(Duration::from_millis(i as u64)); +} + +#[worker] +pub fn myfn(i: i32) -> i32 { + i * 2 +} + +#[worker(4)] +pub fn multiply(a: i32, b: i32) -> i32 { + a * b +} + +fn do_work(multiply: multiply_Scoped, myfn: myfn_Scoped) { + for i in 0..10 { + let x = multiply.call(i, i); + println!("myfn {i} -> {x}"); + let x = myfn.call(i); + println!("myfn {i} -> {x}"); + } +} + +fn main() { + env_logger::init(); + + let args: Vec = std::env::args().collect(); + + match args.get(1).unwrap().as_str() { + "serve" => { + let mut s = ServiceManager::new().mode(comrade::service::ServiceMode::Decay); + s = multiply_init_union(s); + s = myfn_init_union(s); + s = take_time_init_union(s); + let s = s.spawn(); + + log::info!("Spawned workers. Working for 1 minute"); + + std::thread::sleep(Duration::from_secs(60)); + + myfn_shutdown(); + multiply_shutdown(); + take_time_shutdown(); + + s.join().unwrap(); + } + "use" => { + myfn_register_union(); + multiply_register_union(); + take_time_register_union(); + + do_work(multiply_Scoped {}, myfn_Scoped {}); + + let x = myfn(55); + println!("myfn {x}"); + + // decoupled + let e = take_time_async(1500); + println!("This will run right after!"); + println!("the value is {}", e.wait()); + } + _ => { + println!("Unknown command"); + } + }; +} diff --git a/src/job.rs b/src/job.rs index 1c85ab2..3a52009 100644 --- a/src/job.rs +++ b/src/job.rs @@ -1,20 +1,218 @@ use crossbeam::channel::{Receiver, Sender}; use rand::Rng; +use redis::{Commands, RedisResult}; +use serde::{Deserialize, Serialize}; use std::sync::mpsc; +pub enum TaskReceiverBackend { + Local(Receiver>), + Union(ValkeyJobDispatcher), +} + +impl Deserialize<'a>, O: Serialize + for<'a> Deserialize<'a>> + TaskReceiverBackend +{ + pub fn recv(&self) -> Result, String> { + match self { + TaskReceiverBackend::Local(receiver) => receiver.recv().map_err(|x| x.to_string()), + TaskReceiverBackend::Union(valkey_job_dispatcher) => valkey_job_dispatcher.recv(), + } + } +} + +pub struct ValkeyTopicSubscriber { + output: std::marker::PhantomData, + topic: String, + client: redis::Client, +} + +impl Deserialize<'a>> ValkeyTopicSubscriber { + pub fn new(channel: &str) -> Self { + let client = + redis::Client::open(std::env::var("VALKEY_URL").expect("No $VALKEY_URL variable set")) + .expect("Invalid Redis URL"); + Self { + output: std::marker::PhantomData, + topic: channel.to_string(), + client: client, + } + } + + pub fn recv(&self) -> Option { + let mut con = self + .client + .get_connection() + .expect("Failed to connect to Redis"); + + let result: RedisResult> = con.blpop(&self.topic, 0.0); + + match result { + Ok(msg) => { + let msg = msg.iter().nth(1).unwrap(); + + Some(serde_json::from_str(&msg).unwrap()) + } + Err(_) => None, + } + } +} + +#[derive(Clone)] +pub struct ValkeyJobDispatcher { + input: std::marker::PhantomData, + output: std::marker::PhantomData, + topic: String, + client: redis::Client, + local: bool, +} + +impl Deserialize<'a>, O: Serialize + for<'a> Deserialize<'a>> + ValkeyJobDispatcher +{ + // Creates a new job dispatcher for the given topic. + pub fn new_topic(topic: &str, local: bool) -> Self { + let client = + redis::Client::open(std::env::var("VALKEY_URL").expect("No $VALKEY_URL variable set")) + .expect("Invalid Redis URL"); + ValkeyJobDispatcher { + input: std::marker::PhantomData, + output: std::marker::PhantomData, + topic: topic.to_string(), + client, + local, + } + } + + // todo : real pub sub + pub fn recv(&self) -> Result, String> { + let mut con = self + .client + .get_connection() + .expect("Failed to connect to Redis"); + + let result: RedisResult> = con.blpop(&self.topic, 0.0); + + match result { + Ok(msg) => { + let msg = msg.iter().nth(1).unwrap(); + + if let serde_json::Value::Object(task) = serde_json::from_str(&msg).unwrap() { + let channel_id = task.get("task").unwrap().as_str().unwrap().to_string(); + let params = task.get("params").unwrap(); + Ok(JobOrder::new( + serde_json::from_value(params.clone()).unwrap(), + move |res| { + // send back to channel + let _: () = con + .rpush( + &channel_id, + serde_json::to_string(&serde_json::to_value(&res).unwrap()) + .unwrap(), + ) + .expect("Failed to send job"); + }, + )) + } else { + Err(String::new()) + } + } + Err(e) => { + log::error!("Valkey error: {e:?}"); + Err(e.to_string()) + } + } + } +} + +impl Deserialize<'a>, O: for<'a> Deserialize<'a> + Serialize> + JobDispatch for ValkeyJobDispatcher +{ + // Sends a job to the Redis topic (publishes a message). + fn send(&self, param: I) -> O { + let mut con = self + .client + .get_connection() + .expect("Failed to connect to Redis"); + + // Pushing the job to the topic in Redis + let channel_id = uuid::Uuid::new_v4().to_string(); + let _: () = con + .rpush( + &self.topic, + serde_json::to_string(&serde_json::json!({ + "task": channel_id, + "params": ¶m + })) + .unwrap(), + ) + .expect("Failed to send job"); + + ValkeyTopicSubscriber::new(&channel_id).recv().unwrap() + } + + // Sends a job asynchronously (non-blocking). + fn send_async(&self, param: I) -> JobResult { + let mut con = self + .client + .get_connection() + .expect("Failed to connect to Redis"); + + // Pushing the job to the topic in Redis + let channel_id = uuid::Uuid::new_v4().to_string(); + let _: () = con + .rpush( + &self.topic, + serde_json::to_string(&serde_json::json!({ + "task": channel_id, + "params": ¶m + })) + .unwrap(), + ) + .expect("Failed to send job"); + + JobResult(ReceiverBackend::Valkey(ValkeyTopicSubscriber::new( + &channel_id, + ))) + } + + // Tries to send a job, returning None if unsuccessful. + fn try_send(&self, param: I) -> Option { + let res = self.send_async(param); + res.wait_try() + } +} + #[derive(Clone)] /// A generic job dispatcher struct that allows sending jobs of type `I` and receiving results of type `O` using message passing. pub struct JobDispatcher { sender: Sender>, } -pub struct JobResult(std::sync::mpsc::Receiver); +pub enum ReceiverBackend { + Local(std::sync::mpsc::Receiver), + Valkey(ValkeyTopicSubscriber), +} -impl JobResult { +impl Deserialize<'a>> ReceiverBackend { + pub fn recv(&self) -> Option { + match self { + ReceiverBackend::Local(receiver) => receiver.recv().ok(), + ReceiverBackend::Valkey(valkey) => valkey.recv(), + } + } +} + +pub struct JobResult(ReceiverBackend); + +impl Deserialize<'a>> JobResult { /// Wait for the Result of a Job. pub fn wait(self) -> O { self.0.recv().unwrap() } + + pub fn wait_try(self) -> Option { + self.0.recv() + } } impl JobDispatcher { @@ -43,13 +241,15 @@ impl JobDispatcher { (Self { sender: sender }, receiver) } +} +impl JobDispatch for JobDispatcher { /// Sends a job of type `T` to the job dispatcher and waits for its result of type `V`. /// Returns the result of the job once it has been processed. /// # Panics /// This function panics when the `JobOrder` struct gets out of scope without returning a finished result. /// Additionally if the internal `Mutex` is poisoned, this function will panic as well. - pub fn send(&self, param: I) -> O { + fn send(&self, param: I) -> O { let (tx, rx) = mpsc::channel(); let job_order = JobOrder::new(param, move |ret| { tx.send(ret).unwrap(); @@ -58,18 +258,18 @@ impl JobDispatcher { rx.recv().unwrap() } - pub fn send_async(&self, param: I) -> JobResult { + fn send_async(&self, param: I) -> JobResult { let (tx, rx) = mpsc::channel(); let job_order = JobOrder::new(param, move |ret| { tx.send(ret).unwrap(); }); self.sender.send(job_order).unwrap(); - JobResult(rx) + JobResult(ReceiverBackend::Local(rx)) } /// Sends a job of type `T` to the job dispatcher and waits for its result of type `V`. /// Returns `Some(V)` when the job returns an result, `None` if somehow nothing was returned or the internal `Mutex` is poisoned. - pub fn try_send(&self, param: I) -> Option { + fn try_send(&self, param: I) -> Option { let (tx, rx) = mpsc::channel(); let job_order = JobOrder::new(param, move |ret| { tx.send(ret).unwrap(); @@ -79,6 +279,12 @@ impl JobDispatcher { } } +pub trait JobDispatch { + fn send(&self, param: I) -> O; + fn send_async(&self, param: I) -> JobResult; + fn try_send(&self, param: I) -> Option; +} + /// A struct that represents a job order that encapsulates a job of type `I` and its result of type `O`, along with a callback function that will send the result back to the job origin. pub struct JobOrder { /// The job parameter of type `T`. @@ -102,8 +308,40 @@ impl JobOrder { } } +pub enum Dispatcher { + Local(JobDispatcher), + Union(ValkeyJobDispatcher), +} + +impl< + I: Serialize + for<'a> Deserialize<'a> + Send + 'static, + O: Serialize + for<'a> Deserialize<'a> + Send + 'static, +> Dispatcher +{ + pub fn is_local(&self) -> bool { + match self { + Dispatcher::Local(_) => true, + Dispatcher::Union(valkey_job_dispatcher) => valkey_job_dispatcher.local, + } + } + + fn send(&self, param: I) -> O { + match self { + Dispatcher::Local(job_dispatcher) => job_dispatcher.send(param), + Dispatcher::Union(valkey_job_dispatcher) => valkey_job_dispatcher.send(param), + } + } + + fn send_async(&self, param: I) -> JobResult { + match self { + Dispatcher::Local(job_dispatcher) => job_dispatcher.send_async(param), + Dispatcher::Union(valkey_job_dispatcher) => valkey_job_dispatcher.send_async(param), + } + } +} + pub struct JobMultiplexer { - dispatchers: Vec>, + dispatchers: Vec>, } fn get_random_item(list: &[T]) -> Option<&T> { @@ -115,8 +353,12 @@ fn get_random_item(list: &[T]) -> Option<&T> { list.get(index) } -impl JobMultiplexer { - pub fn from(dispatchers: Vec>) -> Self { +impl< + I: Serialize + for<'a> Deserialize<'a> + Send + 'static, + O: Serialize + for<'a> Deserialize<'a> + Send + 'static, +> JobMultiplexer +{ + pub fn from(dispatchers: Vec>) -> Self { Self { dispatchers } } @@ -126,15 +368,22 @@ impl JobMultiplexer { } pub fn send_async(&self, param: I) -> JobResult { + log::info!("Sending async reqeust"); let d = get_random_item(&self.dispatchers).unwrap(); d.send_async(param) } } -impl JobMultiplexer { +impl< + I: Clone + Serialize + for<'a> Deserialize<'a> + Send + 'static, + O: Serialize + for<'a> Deserialize<'a> + Send + 'static, +> JobMultiplexer +{ pub fn send_all(&self, param: I) { for d in &self.dispatchers { - let _ = d.send(param.clone()); + if d.is_local() { + let _ = d.send(param.clone()); + } } } } diff --git a/src/lib.rs b/src/lib.rs index d895127..e3bb108 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -8,8 +8,6 @@ use once_cell::sync::Lazy; pub use serde_json; // TODO : worker docs + refactor -// TODO : worker parallelism (Load Balanced Queue + Multiple Threads) -// TODO : refactor dispatcher backends (memory, valkey) pub static UNION: Lazy< DashMap<&'static str, job::JobMultiplexer>,