iterated fn + defer + retry

This commit is contained in:
JMARyA 2025-03-10 11:27:20 +01:00
parent c010cc13e8
commit 18c663fcdb
Signed by: jmarya
GPG key ID: 901B2ADDF27C2263
11 changed files with 678 additions and 65 deletions

16
src/defer.rs Normal file
View file

@ -0,0 +1,16 @@
pub struct Defer {
f: Box<dyn Fn()>,
}
impl Defer {
pub fn new<F: Fn() + 'static>(f: F) -> Self {
Self { f: Box::new(f) }
}
}
impl Drop for Defer {
fn drop(&mut self) {
log::debug!("Calling defer function");
self.f.as_ref()();
}
}

240
src/iterated.rs Normal file
View file

@ -0,0 +1,240 @@
// TODO : docs
// TODO : measure avg iteration time
use std::{sync::Arc, time::Duration};
use crate::{
job::{JobDispatch, JobDispatcher},
retry,
};
#[derive(Debug, Clone)]
pub struct FunctionContext<C, I, O> {
pub context: C,
pub initial: I,
pub output: Option<O>,
done: bool,
}
impl<C: Default, I, O> FunctionContext<C, I, O> {
pub fn new(input: I) -> FunctionContext<C, I, O> {
FunctionContext {
context: C::default(),
initial: input,
output: None,
done: false,
}
}
}
impl<C, I, O> FunctionContext<C, I, O> {
pub fn state<F: Fn(&mut C)>(&mut self, f: F) {
f(&mut self.context);
}
pub fn done(&mut self, output: O) {
self.done = true;
self.output = Some(output);
}
}
pub struct RunningIteratedFunction<C, I, O> {
output: std::marker::PhantomData<O>,
context: FunctionContext<C, I, O>,
f: Arc<Box<dyn Fn(FunctionContext<C, I, O>) -> FunctionContext<C, I, O>>>,
}
impl<C, I, O> RunningIteratedFunction<C, I, O> {
pub fn next(mut self) -> Self {
let new_ctx = self.f.as_ref()(self.context);
self.context = new_ctx;
return self;
}
pub fn return_value(&self) -> Option<&O> {
self.context.output.as_ref()
}
pub fn take_return_value(self) -> O {
self.context.output.unwrap()
}
pub fn is_done(&self) -> bool {
self.context.done
}
}
pub struct ThreadRunningIteratedFunction<C: Send + 'static, I: Send + 'static, O: Send + 'static>(
JobDispatcher<IteratedFnQuery, IteratedFnOutput<C, I, O>>,
);
impl<C: Send + 'static, I: Send + 'static, O: Send + 'static>
ThreadRunningIteratedFunction<C, I, O>
{
pub fn start(&self) {
let _ = self.0.try_send(IteratedFnQuery::Start);
}
pub fn pause(&self) {
let _ = self.0.try_send(IteratedFnQuery::Pause);
}
pub fn stop(&self) -> FunctionContext<C, I, O> {
match self.0.try_send(IteratedFnQuery::Stop) {
Some(IteratedFnOutput::Context(function_context)) => function_context,
_ => unreachable!(),
}
}
pub fn try_output(&self) -> Option<O> {
match self.0.send(IteratedFnQuery::GetOutput) {
IteratedFnOutput::Out(out) => Some(out),
_ => None,
}
}
pub fn output(&self) -> O {
retry(|| self.try_output())
}
}
#[derive(Debug, Clone)]
pub enum IteratedFnQuery {
Pause,
Start,
Stop,
GetOutput,
}
pub enum IteratedFnOutput<C, I, O> {
Out(O),
Context(FunctionContext<C, I, O>),
Ok,
}
pub struct IteratedFunction<C, I, O> {
output: std::marker::PhantomData<O>,
f: Arc<Box<dyn Fn(FunctionContext<C, I, O>) -> FunctionContext<C, I, O>>>,
}
impl<C: Default + Clone + Send, I: Clone + Send + 'static, O: Send + Clone>
IteratedFunction<C, I, O>
{
pub fn new<F: Fn(FunctionContext<C, I, O>) -> FunctionContext<C, I, O> + 'static>(
f: F,
) -> Self {
Self {
f: Arc::new(Box::new(f)),
output: std::marker::PhantomData,
}
}
pub fn new_threaded<
F: Fn(FunctionContext<C, I, O>) -> FunctionContext<C, I, O> + Send + 'static,
>(
f: F,
input: I,
) -> ThreadRunningIteratedFunction<C, I, O> {
Self::new_threaded_from_state(f, FunctionContext::new(input))
}
pub fn new_threaded_from_state<
F: Fn(FunctionContext<C, I, O>) -> FunctionContext<C, I, O> + Send + 'static,
>(
f: F,
context: FunctionContext<C, I, O>,
) -> ThreadRunningIteratedFunction<C, I, O> {
let (dispatch, recv) = JobDispatcher::<IteratedFnQuery, IteratedFnOutput<C, I, O>>::new();
let _ = std::thread::spawn(move || {
let f = Self::new(f);
let mut f = f.call_with_context(context);
let mut counter = 0;
let mut sleep = false;
while !f.is_done() {
if sleep {
std::thread::sleep(Duration::from_secs(3));
}
if counter == 5 || sleep {
if let Ok(request) = recv.recv_timeout(Duration::from_millis(300)) {
match request.param {
IteratedFnQuery::Pause => {
log::info!("Paused threaded iterative function");
sleep = true;
}
IteratedFnQuery::Start => {
log::info!("Restarted threaded iterative function");
sleep = false;
}
IteratedFnQuery::Stop => {
log::info!("Ending threaded iterative function");
request.done(IteratedFnOutput::Context(f.context));
return;
}
_ => {}
}
request.done(IteratedFnOutput::Ok);
}
counter = 0;
}
if !sleep {
f = f.next();
}
counter += 1;
}
if f.is_done() {
while let Ok(request) = recv.recv() {
match request.param {
IteratedFnQuery::Stop => {
log::warn!("Function was asked to stop but was already done");
request.done(IteratedFnOutput::Context(f.context.clone()));
}
IteratedFnQuery::GetOutput => {
request.done(IteratedFnOutput::Out(f.context.output.clone().unwrap()));
break;
}
_ => {
request.done(IteratedFnOutput::Out(f.context.output.clone().unwrap()));
}
}
}
}
});
ThreadRunningIteratedFunction(dispatch)
}
pub fn call_with_context(
&self,
ctx: FunctionContext<C, I, O>,
) -> RunningIteratedFunction<C, I, O> {
RunningIteratedFunction {
output: std::marker::PhantomData,
context: ctx,
f: self.f.clone(),
}
}
pub fn call(&self, input: I) -> RunningIteratedFunction<C, I, O> {
RunningIteratedFunction {
output: std::marker::PhantomData,
context: FunctionContext::new(input),
f: self.f.clone(),
}
}
pub fn run_to_end(&self, input: I) -> O {
let mut f = self.call(input);
while !f.is_done() {
f = f.next();
}
return f.take_return_value();
}
}

View file

@ -1,8 +1,12 @@
use std::{sync::mpsc, thread, time::Instant};
mod defer;
pub mod iterated;
pub mod job;
pub mod service;
pub use comrade_macro::worker;
pub use defer::Defer;
pub use comrade_macro::{defer, worker};
pub use crossbeam;
use dashmap::DashMap;
use once_cell::sync::Lazy;
@ -10,16 +14,6 @@ pub use serde_json;
// TODO : worker docs + refactor
// TODO : functions which can be stopped, paused, etc
/*
Example:
let myf = Function::new(|| do_something());
// stop fn
myf.stop();
*/
pub static UNION: Lazy<
DashMap<&'static str, job::JobMultiplexer<serde_json::Value, serde_json::Value>>,
> = Lazy::new(DashMap::new);
@ -63,40 +57,15 @@ where
(fastest_item, fastest_result)
}
// TODO : async version
/*
pub fn rally_async<T: Send + Sync + 'static, F, X: Send + 'static>(items: Vec<T>, f: F) -> (T, X)
where
F: AsyncFn(&T) -> X + Send + Sync + Copy + 'static,
{
let (tx, rx) = mpsc::channel();
let mut handles = Vec::new();
for item in items {
let tx = tx.clone();
let item_ref = item;
let f = f;
tokio::task::spawn()
let handle = thread::spawn(move || {
let start = Instant::now();
let result = f(&item_ref);
let elapsed = start.elapsed();
let _ = tx.send((item_ref, result, elapsed));
});
handles.push(handle);
pub fn retry<O, F: Fn() -> Option<O>>(f: F) -> O {
loop {
match f() {
Some(resp) => {
return resp;
}
None => {
log::info!("Got nothing, retrying...");
}
}
}
drop(tx);
let (fastest_item, fastest_result, _) = rx.recv().unwrap();
for handle in handles {
handle.thread().unpark();
}
(fastest_item, fastest_result)
}
*/