worker union or something
Find a file
2025-03-10 18:59:35 +01:00
comrade-macro iterated fn + defer + retry 2025-03-10 11:27:20 +01:00
examples cron 2025-03-10 18:43:32 +01:00
src more retries 2025-03-10 18:59:35 +01:00
.gitignore init 2025-03-07 04:33:35 +01:00
Cargo.lock cron 2025-03-10 18:43:32 +01:00
Cargo.toml cron 2025-03-10 18:43:32 +01:00
README.md cron 2025-03-10 18:43:32 +01:00

☭ comrade

comrade is a Rust crate designed for managing compute work. It allows seamless management of shared work and functions even across machines.

Features

  • Parallel Execution: Dispatch tasks to run concurrently and gather their results.
  • Rally Execution: Run multiple tasks in parallel and return the result of the fastest one.
  • Service Management: Manage background services with different operating modes (Decay, Daemon).
  • Worker Unions: Delegate tasks using #[worker] annotations locally or as distributed task queues across multiple machines.
  • Background Tasks: Seamlessly run background tasks without blocking the main logic of your program.

Core Concepts

Higher Level Functions

comrade provides various convenient functions.

// process every item in parallel
let results: Vec<i32> = parallel(items, |item: &i32| {
    // ...
});

// rally (return fastest computed result out of items)
// example: run multiple downloads and return the first finished one
let res: (i32, i32) = rally(items, |item: &i32| {
    // ...
});

// Run background tasks without blocking the main thread
background!(|| {
    // Background task logic
    println!("This is a background task!");
});

fn some_fn() {
    println!("Hello World!");

    defer!(|| {
        // this will run at the end of the scope
        println!("Bye World!");
    });

    println!("doing something");
}

// Retry a `Fn() -> Option<X>` until it returns `Some(_)`.
let value: &str = retry(|| {
    if rand::rng().random_range(0..10) > 5 {
        Some("hello")
    } else {
        None
    }
})

// Delayed execution
delay(Duration::from_secs(4), || {
    println!("I will run in 4 seconds from now on!");
});

Service Management

comrade provides a way to manage persistent services with different modes. The Decay mode allows services to die, while the Daemon mode revives them and keeps them running indefinitely.

use comrade::service::ServiceManager;

fn run_services() {
    let mut manager = ServiceManager::new().mode(comrade::service::ServiceMode::Decay);

    // Register and start services
    manager = manager.register("my_service", |_| {
        // Service logic here
    });

    let thread_handle = manager.spawn();
    thread_handle.join().unwrap();
}

Cron Tasks

The ServiceManager also supports running functions periodically or on time:

fn main() {
    let s = ServiceManager::new();
    // Init Cron Manager
    let cron = Cron::new();

    // Add Cron Task
    cron.add_task("4_sec", Schedule::Every(Duration::from_secs(4)), || {
        println!("I run every 4 at {}", chrono::Utc::now());
    });

    cron.add_task("2_sec", Schedule::Every(Duration::from_secs(2)), || {
        println!("I run every 2 seconds at {}", chrono::Utc::now());
    });

    cron.add_task(
        "daily",
        Schedule::Every(Duration::from_secs(60 * 60 * 24)),
        || {
            println!("I run daily");
        },
    );

    // Start running the Cron Manager
    let (s, cron) = s.register_cron(cron.into());
    let s = s.spawn();
    defer!(|| {
        s.join().unwrap();
    });

    // Add another Cron Task after running the manager dynamically
    cron.add_task(
        "future_task",
        Schedule::At(datetime_in(Duration::from_secs(2))),
        || {
            println!("I am in the future");
        },
    );

    // Functionally the same as above
    cron.run_at(datetime_in(Duration::from_secs(3)), || {
        println!("The Future");
    });
}

Worker Unions

You can annotate a function with #[worker] which gives them superpowers. These functions can be queued and dispatched by the system, and their results are returned when completed.

use comrade::{worker};

// Single local worker
#[worker]
pub fn myfn(i: i32) -> i32 {
    i * 2
}

// 4 local worker threads
#[worker(4)]
pub fn multiply(a: i32, b: i32) -> i32 {
    a * b
}

After initialization these functions can then be called anywhere and will be processed eventually by whatever worker picks it up.
Additionally there are new functions derived from your function. See the below example:

fn main() {
    let mut manager = ServiceManager::new().mode(comrade::service::ServiceMode::Decay);

    // Init worker thread on `ServiceManager`
    manager = multiply_init(manager);
    let manager = manager.spawn();

    // works like the original function
    let res = multiply(2, 2);

    // async
    let e = take_time_async(1500);
    
    println!("This will run right after!");
    // ...
    
    // is OUR value ready?
    println!("the value is {}", e.wait());

    // Shutdown worker thread
    multiply_shutdown();
    manager.join().unwrap();
}

You could effeciently run batch work:

fn batch_work() {
    let mut work = Vec::new();

    for i in 0..10 {
        work.push((i.to_string(), multiply_async(i, i)));
    }

    for (label, res) in LabelPendingTaskIterator(work) {
        println!("Finished task {label} -> {res}");
    }
}

These tasks can now be distributed with Valkey.

Make sure you have a Valkey server running and the $VALKEY_URL environment variable is set for your application:

services:
    valkey:
        image: valkey/valkey
        ports:
            - 6379:6379

Then you can spawn worker threads like that:

fn main() {
    let mut s = ServiceManager::new().mode(comrade::service::ServiceMode::Decay);
    s = multiply_init_union(s);
    s = myfn_init_union(s);
    let s = s.spawn();
    
    log::info!("Spawned workers. Working for 1 minute");
    std::thread::sleep(Duration::from_secs(60));

    myfn_shutdown();
    multiply_shutdown();

    s.join().unwrap();
}

When workers are running, you can use them like:

fn main() {
    // Register workers in union
    myfn_register_union();

    // Will be computed somewhere else
    let x = myfn(50);
    println!("x is {x}");
}

Stateful Functions

If you have a workload which can be iteratively computed by modifying state it can be modeled as a IteratedFunction.
These functions can be paused, stopped, saved to disk and revived later.

First define a iterative function:

#[derive(Debug, Default, Clone)]
pub struct AccFnContext {
    pub iter: u64,
    pub acc: u64,
}

pub fn multiply_iterated(
    mut ctx: FunctionContext<AccFnContext, (u64, u64), u64>,
) -> FunctionContext<AccFnContext, (u64, u64), u64> {
    // init
    let (a, b) = ctx.initial;

    // end condition (return)
    if b == ctx.context.iter {
        ctx.done(ctx.context.acc);
        return ctx;
    }

    // computation
    println!("doing calc {}", ctx.context.acc);
    std::thread::sleep(Duration::from_millis(50));
    let val = ctx.context.acc + a;

    // saving state
    ctx.state(|x| {
        x.iter += 1;
        x.acc = val;
    });

    return ctx;
}

Then you can use it like:

fn main() {
    // run normally
    let f = IteratedFunction::new_threaded(multiply_iterated, (5, 5));
    println!("Result is {}", f.output());

    // function starts running
    let f = IteratedFunction::new_threaded(multiply_iterated, (5, 50));

    // pause the function
    f.pause();

    // stop the function and get state
    let state = f.stop();

    // revive and start running again from state
    let f = IteratedFunction::new_threaded_from_state(multiply_iterated, state);

    // get output
    println!("Result is {}", f.output());
}