add flows

This commit is contained in:
JMARyA 2024-09-12 16:55:36 +02:00
parent 94459d5481
commit 7b6d58be33
Signed by: jmarya
GPG key ID: 901B2ADDF27C2263
4 changed files with 290 additions and 26 deletions

View file

@ -1,12 +1,12 @@
use std::collections::HashMap; use std::collections::HashMap;
use mongod::{assert_reference_of, Reference, Validate}; use mongod::{assert_reference_of, reference_of, Reference, Validate};
#[derive(Debug, Clone, Serialize, Deserialize, Model, Referencable)] #[derive(Debug, Clone, Serialize, Deserialize, Model, Referencable)]
pub struct FlowInfo { pub struct FlowInfo {
pub _id: String, pub _id: String,
pub name: String, pub name: String,
pub depends: HashMap<String, i64>, pub depends: Vec<String>,
pub next: Option<String>, pub next: Option<String>,
pub produces: Option<Vec<String>>, pub produces: Option<Vec<String>>,
} }
@ -44,6 +44,29 @@ use mongodb::bson::doc;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use serde_json::json; use serde_json::json;
use crate::item::Item;
use crate::transaction::{Price, Transaction};
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct DoneInfo {
/// Timestamp when the flow was ended
pub ended: i64,
/// The flow succedding this one
pub next: Option<Reference>,
/// Transactions this flow produced
pub produced: Option<Vec<Reference>>,
}
impl DoneInfo {
pub fn new(next: Option<Reference>) -> Self {
Self {
ended: chrono::Utc::now().timestamp(),
next,
produced: None,
}
}
}
/// A production flow /// A production flow
#[derive(Debug, Clone, Serialize, Deserialize, Model, Referencable)] #[derive(Debug, Clone, Serialize, Deserialize, Model, Referencable)]
pub struct Flow { pub struct Flow {
@ -51,20 +74,110 @@ pub struct Flow {
pub _id: String, pub _id: String,
/// Tiemstamp when the flow was started /// Tiemstamp when the flow was started
pub started: i64, pub started: i64,
/// Timestamp when the flow was ended
pub ended: i64,
/// Kind of flow; ID of the describing JSON /// Kind of flow; ID of the describing JSON
pub kind: String, pub kind: Reference,
/// The flow succedding this one /// Input transactions
pub next: Option<Reference>, pub input: Option<Vec<Reference>>,
/// Information when a flow is done
pub done: Option<DoneInfo>,
} }
impl Validate for Flow { impl Validate for Flow {
async fn validate(&self) -> Result<(), String> { async fn validate(&self) -> Result<(), String> {
if let Some(next) = &self.next { assert_reference_of!(self.kind, FlowInfo);
assert_reference_of!(next, Flow);
if let Some(input) = &self.input {
for t in input {
assert_reference_of!(t, Transaction);
}
}
if let Some(done) = &self.done {
if let Some(next) = &done.next {
assert_reference_of!(next, Flow);
}
if let Some(produced) = &done.produced {
for prod in produced {
assert_reference_of!(prod, Transaction);
}
}
} }
Ok(()) Ok(())
} }
} }
impl Flow {
pub async fn create(kind: &str, input: Option<Vec<Reference>>) -> Self {
let f = Self {
_id: uuid::Uuid::new_v4().to_string(),
started: chrono::Utc::now().timestamp(),
kind: reference_of!(FlowInfo, kind).unwrap(),
input: input,
done: None,
};
f.insert().await.unwrap();
f
}
pub async fn end(self) -> Self {
self.change()
.done(Some(DoneInfo::new(None)))
.update()
.await
.unwrap()
}
pub async fn end_with_produce(
self,
produced: &[HashMap<String, u64>],
) -> HashMap<String, Vec<String>> {
let mut ret = HashMap::new();
let mut produced_ref = Vec::with_capacity(ret.len());
for prod in produced {
for (item_variant, amount) in prod {
let (item, variant) = item_variant.split_once("::").unwrap();
for _ in 0..*amount {
let t = Item::get(item)
.await
.unwrap()
.variant(variant)
.unwrap()
.supply(
Price::zero(),
Some(&format!("flow::{}::{}", self.kind.id(), self._id)),
None,
)
.await;
ret.entry(item_variant.clone())
.or_insert(Vec::new())
.push(t._id.clone());
produced_ref.push(t.reference());
}
}
}
self.change()
.done(Some(DoneInfo {
ended: chrono::Utc::now().timestamp(),
next: None,
produced: Some(produced_ref),
}))
.update()
.await
.unwrap();
ret
}
pub async fn continue_next(self, next_flow: &Flow) -> Self {
self.change()
.done(Some(DoneInfo::new(Some(next_flow.reference()))))
.update()
.await
.unwrap()
}
}

View file

@ -1,17 +1,21 @@
use std::collections::HashMap; use std::collections::HashMap;
use mongod::ToAPI; use mongod::{Model, Referencable, ToAPI};
use rocket::{get, State}; use rocket::{get, post, serde::json::Json, State};
use serde::{Deserialize, Serialize};
use serde_json::json; use serde_json::json;
use crate::{ use crate::{
check_auth, check_auth,
config::Config, config::Config,
flow::FlowInfo, flow::{Flow, FlowInfo},
json_store::JSONStore, json_store::JSONStore,
routes::{api_error, FallibleApiResponse, Token}, routes::{api_error, FallibleApiResponse, Token},
transaction::{Price, Transaction},
}; };
use super::ApiError;
#[get("/flow/<id>/info")] #[get("/flow/<id>/info")]
pub async fn flow_info( pub async fn flow_info(
id: &str, id: &str,
@ -42,3 +46,113 @@ pub async fn flows_list(
Ok(json!(ret)) Ok(json!(ret))
} }
pub async fn create_flow(
kind: &str,
flows: &State<JSONStore<FlowInfo>>,
form: &CreateFlow,
) -> Result<Flow, ApiError> {
let info = flows.get(kind).ok_or_else(|| api_error("Unknown Flow"))?;
let mut input_ref = Vec::new();
// verify valid input transactions
if let Some(input) = &form.input {
for t in input {
let t = Transaction::get(t)
.await
.ok_or_else(|| api_error(&format!("No such transaction {t}")))?;
let item_variant = format!("{}::{}", t.item, t.variant);
let valid = info.depends.iter().any(|x| item_variant.starts_with(x));
if !valid {
return Err(api_error(&format!(
"Invalid item variant. This flow only accepts {:?}",
info.depends
)));
}
input_ref.push(t);
}
}
let flow = Flow::create(
kind,
if input_ref.is_empty() {
None
} else {
Some(input_ref.iter().map(|x| x.reference()).collect())
},
)
.await;
for t in input_ref {
t.consume(Price::zero(), &format!("flow::{kind}::{}", flow._id))
.await;
}
Ok(flow)
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CreateFlow {
pub input: Option<Vec<String>>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EndFlow {
pub produced: Option<Vec<HashMap<String, u64>>>,
}
#[post("/flow/<id>", data = "<form>")]
pub async fn create_flow_route(
id: &str,
form: Json<CreateFlow>,
flows: &State<JSONStore<FlowInfo>>,
) -> FallibleApiResponse {
let flow = create_flow(id, flows, &form).await?;
Ok(json!({"uuid": flow._id }))
}
#[post("/flow/<id>/end", data = "<form>")]
pub async fn end_flow_route(id: &str, form: Json<EndFlow>) -> FallibleApiResponse {
let flow = Flow::get(id)
.await
.ok_or_else(|| api_error("Flow not found"))?;
if let Some(produced) = &form.produced {
// todo : add transactions
let prod = flow.end_with_produce(produced).await;
Ok(json!({"produced": prod}))
} else {
flow.end().await;
Ok(json!({"ok": 1}))
}
}
#[post("/flow/<id>/continue", data = "<form>")]
pub async fn continue_flow_route(
id: &str,
flows: &State<JSONStore<FlowInfo>>,
form: Json<CreateFlow>,
) -> FallibleApiResponse {
let this_flow = Flow::get(id)
.await
.ok_or_else(|| api_error("Flow not found"))?;
// create next flow
let next_kind = flows
.get(this_flow.kind.id())
.ok_or_else(|| api_error("Flow not found"))?
.next
.clone()
.ok_or_else(|| api_error("Flow has no continuation"))?;
let next_flow = create_flow(&next_kind, flows, &form).await?;
// end current flow
this_flow.continue_next(&next_flow).await;
Ok(json!({"uuid": next_flow._id}))
}

View file

@ -189,6 +189,13 @@ impl Price {
} }
} }
pub fn zero() -> Self {
Self {
value: 0.00,
currency: String::new(),
}
}
fn parse(price: &str) -> Option<Self> { fn parse(price: &str) -> Option<Self> {
let (value, currency) = price.split_once(' ')?; let (value, currency) = price.split_once(' ')?;

View file

@ -196,25 +196,29 @@ impl Variant {
} }
pub async fn get_unique_origins(&self) -> Vec<String> { pub async fn get_unique_origins(&self) -> Vec<String> {
Transaction::unique( unique_flows(
doc! { &Transaction::unique(
"item": &self.item, doc! {
"variant": &self.variant "item": &self.item,
}, "variant": &self.variant
"origin", },
"origin",
)
.await,
) )
.await
} }
pub async fn get_unique_destinations(&self) -> Vec<String> { pub async fn get_unique_destinations(&self) -> Vec<String> {
Transaction::unique( unique_flows(
doc! { &Transaction::unique(
"item": &self.item, doc! {
"variant": &self.variant "item": &self.item,
}, "variant": &self.variant
"consumed.destination", },
"consumed.destination",
)
.await,
) )
.await
} }
pub async fn price_history_by_origin(&self, origin: &str) -> Vec<Price> { pub async fn price_history_by_origin(&self, origin: &str) -> Vec<Price> {
@ -287,3 +291,29 @@ impl Variant {
}) })
} }
} }
pub fn unique_flows(i: &[String]) -> Vec<String> {
let mut unique_vec: Vec<String> = Vec::new();
for s in i {
// Check if the string starts with "flow::"
if let Some(suffix) = s.strip_prefix("flow::") {
// Extract the part after "flow::" and split on "::" to get the kind (ignoring id)
let parts: Vec<&str> = suffix.split("::").collect();
if let Some(kind) = parts.get(0) {
// Build the common prefix "flow::kind"
let common_prefix = format!("flow::{}", kind);
if !unique_vec.contains(&common_prefix) {
unique_vec.push(common_prefix);
}
}
} else {
// If the string doesn't start with "flow::", retain it
// Assumption: Except "flow::" values, everything should be already unique
unique_vec.push(s.to_string());
}
}
unique_vec
}