From 7b6d58be334e972a2b55e5ba1f46743d7f143a1e Mon Sep 17 00:00:00 2001 From: JMARyA Date: Thu, 12 Sep 2024 16:55:36 +0200 Subject: [PATCH] add flows --- src/flow.rs | 131 +++++++++++++++++++++++++++++++++++++++++---- src/routes/flow.rs | 120 +++++++++++++++++++++++++++++++++++++++-- src/transaction.rs | 7 +++ src/variant.rs | 58 +++++++++++++++----- 4 files changed, 290 insertions(+), 26 deletions(-) diff --git a/src/flow.rs b/src/flow.rs index 54a0b02..69e6e2f 100644 --- a/src/flow.rs +++ b/src/flow.rs @@ -1,12 +1,12 @@ use std::collections::HashMap; -use mongod::{assert_reference_of, Reference, Validate}; +use mongod::{assert_reference_of, reference_of, Reference, Validate}; #[derive(Debug, Clone, Serialize, Deserialize, Model, Referencable)] pub struct FlowInfo { pub _id: String, pub name: String, - pub depends: HashMap, + pub depends: Vec, pub next: Option, pub produces: Option>, } @@ -44,6 +44,29 @@ use mongodb::bson::doc; use serde::{Deserialize, Serialize}; use serde_json::json; +use crate::item::Item; +use crate::transaction::{Price, Transaction}; + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct DoneInfo { + /// Timestamp when the flow was ended + pub ended: i64, + /// The flow succedding this one + pub next: Option, + /// Transactions this flow produced + pub produced: Option>, +} + +impl DoneInfo { + pub fn new(next: Option) -> Self { + Self { + ended: chrono::Utc::now().timestamp(), + next, + produced: None, + } + } +} + /// A production flow #[derive(Debug, Clone, Serialize, Deserialize, Model, Referencable)] pub struct Flow { @@ -51,20 +74,110 @@ pub struct Flow { pub _id: String, /// Tiemstamp when the flow was started pub started: i64, - /// Timestamp when the flow was ended - pub ended: i64, /// Kind of flow; ID of the describing JSON - pub kind: String, - /// The flow succedding this one - pub next: Option, + pub kind: Reference, + /// Input transactions + pub input: Option>, + /// Information when a flow is done + pub done: Option, } impl Validate for Flow { async fn validate(&self) -> Result<(), String> { - if let Some(next) = &self.next { - assert_reference_of!(next, Flow); + assert_reference_of!(self.kind, FlowInfo); + + if let Some(input) = &self.input { + for t in input { + assert_reference_of!(t, Transaction); + } + } + + if let Some(done) = &self.done { + if let Some(next) = &done.next { + assert_reference_of!(next, Flow); + } + if let Some(produced) = &done.produced { + for prod in produced { + assert_reference_of!(prod, Transaction); + } + } } Ok(()) } } + +impl Flow { + pub async fn create(kind: &str, input: Option>) -> Self { + let f = Self { + _id: uuid::Uuid::new_v4().to_string(), + started: chrono::Utc::now().timestamp(), + kind: reference_of!(FlowInfo, kind).unwrap(), + input: input, + done: None, + }; + + f.insert().await.unwrap(); + + f + } + + pub async fn end(self) -> Self { + self.change() + .done(Some(DoneInfo::new(None))) + .update() + .await + .unwrap() + } + + pub async fn end_with_produce( + self, + produced: &[HashMap], + ) -> HashMap> { + let mut ret = HashMap::new(); + let mut produced_ref = Vec::with_capacity(ret.len()); + + for prod in produced { + for (item_variant, amount) in prod { + let (item, variant) = item_variant.split_once("::").unwrap(); + for _ in 0..*amount { + let t = Item::get(item) + .await + .unwrap() + .variant(variant) + .unwrap() + .supply( + Price::zero(), + Some(&format!("flow::{}::{}", self.kind.id(), self._id)), + None, + ) + .await; + ret.entry(item_variant.clone()) + .or_insert(Vec::new()) + .push(t._id.clone()); + produced_ref.push(t.reference()); + } + } + } + + self.change() + .done(Some(DoneInfo { + ended: chrono::Utc::now().timestamp(), + next: None, + produced: Some(produced_ref), + })) + .update() + .await + .unwrap(); + + ret + } + + pub async fn continue_next(self, next_flow: &Flow) -> Self { + self.change() + .done(Some(DoneInfo::new(Some(next_flow.reference())))) + .update() + .await + .unwrap() + } +} diff --git a/src/routes/flow.rs b/src/routes/flow.rs index 069aab0..2f6f0b1 100644 --- a/src/routes/flow.rs +++ b/src/routes/flow.rs @@ -1,17 +1,21 @@ use std::collections::HashMap; -use mongod::ToAPI; -use rocket::{get, State}; +use mongod::{Model, Referencable, ToAPI}; +use rocket::{get, post, serde::json::Json, State}; +use serde::{Deserialize, Serialize}; use serde_json::json; use crate::{ check_auth, config::Config, - flow::FlowInfo, + flow::{Flow, FlowInfo}, json_store::JSONStore, routes::{api_error, FallibleApiResponse, Token}, + transaction::{Price, Transaction}, }; +use super::ApiError; + #[get("/flow//info")] pub async fn flow_info( id: &str, @@ -42,3 +46,113 @@ pub async fn flows_list( Ok(json!(ret)) } + +pub async fn create_flow( + kind: &str, + flows: &State>, + form: &CreateFlow, +) -> Result { + let info = flows.get(kind).ok_or_else(|| api_error("Unknown Flow"))?; + + let mut input_ref = Vec::new(); + + // verify valid input transactions + if let Some(input) = &form.input { + for t in input { + let t = Transaction::get(t) + .await + .ok_or_else(|| api_error(&format!("No such transaction {t}")))?; + + let item_variant = format!("{}::{}", t.item, t.variant); + + let valid = info.depends.iter().any(|x| item_variant.starts_with(x)); + + if !valid { + return Err(api_error(&format!( + "Invalid item variant. This flow only accepts {:?}", + info.depends + ))); + } + + input_ref.push(t); + } + } + + let flow = Flow::create( + kind, + if input_ref.is_empty() { + None + } else { + Some(input_ref.iter().map(|x| x.reference()).collect()) + }, + ) + .await; + + for t in input_ref { + t.consume(Price::zero(), &format!("flow::{kind}::{}", flow._id)) + .await; + } + + Ok(flow) +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CreateFlow { + pub input: Option>, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct EndFlow { + pub produced: Option>>, +} + +#[post("/flow/", data = "
")] +pub async fn create_flow_route( + id: &str, + form: Json, + flows: &State>, +) -> FallibleApiResponse { + let flow = create_flow(id, flows, &form).await?; + Ok(json!({"uuid": flow._id })) +} + +#[post("/flow//end", data = "")] +pub async fn end_flow_route(id: &str, form: Json) -> FallibleApiResponse { + let flow = Flow::get(id) + .await + .ok_or_else(|| api_error("Flow not found"))?; + + if let Some(produced) = &form.produced { + // todo : add transactions + let prod = flow.end_with_produce(produced).await; + Ok(json!({"produced": prod})) + } else { + flow.end().await; + Ok(json!({"ok": 1})) + } +} + +#[post("/flow//continue", data = "")] +pub async fn continue_flow_route( + id: &str, + flows: &State>, + form: Json, +) -> FallibleApiResponse { + let this_flow = Flow::get(id) + .await + .ok_or_else(|| api_error("Flow not found"))?; + + // create next flow + let next_kind = flows + .get(this_flow.kind.id()) + .ok_or_else(|| api_error("Flow not found"))? + .next + .clone() + .ok_or_else(|| api_error("Flow has no continuation"))?; + let next_flow = create_flow(&next_kind, flows, &form).await?; + + // end current flow + this_flow.continue_next(&next_flow).await; + + Ok(json!({"uuid": next_flow._id})) +} diff --git a/src/transaction.rs b/src/transaction.rs index 4a58772..84d9965 100644 --- a/src/transaction.rs +++ b/src/transaction.rs @@ -189,6 +189,13 @@ impl Price { } } + pub fn zero() -> Self { + Self { + value: 0.00, + currency: String::new(), + } + } + fn parse(price: &str) -> Option { let (value, currency) = price.split_once(' ')?; diff --git a/src/variant.rs b/src/variant.rs index e19c154..3ce32df 100644 --- a/src/variant.rs +++ b/src/variant.rs @@ -196,25 +196,29 @@ impl Variant { } pub async fn get_unique_origins(&self) -> Vec { - Transaction::unique( - doc! { - "item": &self.item, - "variant": &self.variant - }, - "origin", + unique_flows( + &Transaction::unique( + doc! { + "item": &self.item, + "variant": &self.variant + }, + "origin", + ) + .await, ) - .await } pub async fn get_unique_destinations(&self) -> Vec { - Transaction::unique( - doc! { - "item": &self.item, - "variant": &self.variant - }, - "consumed.destination", + unique_flows( + &Transaction::unique( + doc! { + "item": &self.item, + "variant": &self.variant + }, + "consumed.destination", + ) + .await, ) - .await } pub async fn price_history_by_origin(&self, origin: &str) -> Vec { @@ -287,3 +291,29 @@ impl Variant { }) } } + +pub fn unique_flows(i: &[String]) -> Vec { + let mut unique_vec: Vec = Vec::new(); + + for s in i { + // Check if the string starts with "flow::" + if let Some(suffix) = s.strip_prefix("flow::") { + // Extract the part after "flow::" and split on "::" to get the kind (ignoring id) + let parts: Vec<&str> = suffix.split("::").collect(); + if let Some(kind) = parts.get(0) { + // Build the common prefix "flow::kind" + let common_prefix = format!("flow::{}", kind); + + if !unique_vec.contains(&common_prefix) { + unique_vec.push(common_prefix); + } + } + } else { + // If the string doesn't start with "flow::", retain it + // Assumption: Except "flow::" values, everything should be already unique + unique_vec.push(s.to_string()); + } + } + + unique_vec +}