postgres
This commit is contained in:
parent
584ffb6b11
commit
1faa3b9668
19 changed files with 1058 additions and 1244 deletions
253
src/variant.rs
253
src/variant.rs
|
@ -1,16 +1,9 @@
|
|||
use std::collections::HashMap;
|
||||
|
||||
use futures::StreamExt;
|
||||
use mongod::{Model, Sort};
|
||||
use mongodb::bson::doc;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::json;
|
||||
|
||||
use crate::transaction::{Price, Transaction};
|
||||
|
||||
pub fn sort_by_timestamp() -> mongodb::bson::Document {
|
||||
doc! { "timestamp": mongod::Sort::Descending }
|
||||
}
|
||||
use crate::{get_pg, transaction::Transaction};
|
||||
|
||||
pub fn timestamp_range(year: i32, month: u32) -> (i64, i64) {
|
||||
let d = chrono::NaiveDate::from_ymd_opt(year, month, 0).unwrap();
|
||||
|
@ -85,69 +78,51 @@ impl Variant {
|
|||
|
||||
/// Returns the IDs of Transactions from this Item Variant.
|
||||
pub async fn supply_log(&self) -> Vec<String> {
|
||||
let filter = doc! {
|
||||
"item": &self.item,
|
||||
"variant": &self.variant
|
||||
};
|
||||
let res: Vec<(uuid::Uuid,)> = sqlx::query_as(
|
||||
"SELECT id FROM transactions WHERE item = $1 AND variant = $2 ORDER BY created DESC",
|
||||
)
|
||||
.bind(&self.item)
|
||||
.bind(&self.variant)
|
||||
.fetch_all(get_pg!())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let result = Transaction::find_partial(filter, json!({}), None, None)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let mut ret = Vec::new();
|
||||
|
||||
for doc in result {
|
||||
ret.push(doc._id);
|
||||
}
|
||||
|
||||
ret
|
||||
res.into_iter().map(|x| x.0.to_string()).collect()
|
||||
}
|
||||
|
||||
/// Returns the active Transaction of this Item Variant which are not yet consumed.
|
||||
pub async fn inventory(&self) -> Vec<Transaction> {
|
||||
let filter = doc! {
|
||||
"item": &self.item,
|
||||
"variant": &self.variant,
|
||||
"consumed": { "$not": { "$type": "object" } }
|
||||
};
|
||||
|
||||
Transaction::find(filter, None, None).await.unwrap()
|
||||
sqlx::query_as("SELECT * FROM transactions WHERE item = $1 AND variant = $2 AND consumed_timestamp IS NULL ORDER BY created DESC")
|
||||
.bind(&self.item)
|
||||
.bind(&self.variant)
|
||||
.fetch_all(get_pg!()).await.unwrap()
|
||||
}
|
||||
|
||||
/// Returns the IDs of the Transactions from this Item Variant which are consumed.
|
||||
pub async fn demand_log(&self, destination: Option<&str>) -> Vec<String> {
|
||||
let filter = if let Some(dest) = destination {
|
||||
doc! {
|
||||
"item": &self.item,
|
||||
"variant": &self.variant,
|
||||
"consumed": { "destination": dest }
|
||||
}
|
||||
let res: Vec<(uuid::Uuid,)> = if let Some(destination) = destination {
|
||||
sqlx::query_as(
|
||||
"SELECT id FROM transactions WHERE item = $1 AND variant = $2 AND consumed_timestamp IS NOT NULL AND destination = $3 ORDER BY created DESC"
|
||||
)
|
||||
.bind(&self.item)
|
||||
.bind(&self.variant)
|
||||
.bind(destination)
|
||||
.fetch_all(get_pg!()).await.unwrap()
|
||||
} else {
|
||||
doc! {
|
||||
"item": &self.item,
|
||||
"variant": &self.variant,
|
||||
"consumed": { "$type": "object" }
|
||||
}
|
||||
sqlx::query_as(
|
||||
"SELECT id FROM transactions WHERE item = $1 AND variant = $2 AND consumed_timestamp IS NOT NULL ORDER BY created DESC"
|
||||
)
|
||||
.bind(&self.item)
|
||||
.bind(&self.variant)
|
||||
.fetch_all(get_pg!()).await.unwrap()
|
||||
};
|
||||
|
||||
let result = Transaction::find_partial(filter, json!({}), None, None)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let mut ret = Vec::new();
|
||||
|
||||
for doc in result {
|
||||
ret.push(doc._id);
|
||||
}
|
||||
|
||||
ret
|
||||
res.into_iter().map(|x| x.0.to_string()).collect()
|
||||
}
|
||||
|
||||
pub async fn demand(uuid: &str, price: Price, destination: &str) -> Option<Transaction> {
|
||||
pub async fn demand(uuid: &uuid::Uuid, price: f64, destination: &str) -> Option<Transaction> {
|
||||
// check if transaction exists
|
||||
let mut t = Transaction::get(uuid).await?;
|
||||
t = t.consume(price, destination).await;
|
||||
Some(t)
|
||||
let t = Transaction::get(uuid).await?;
|
||||
Some(t.consume(price, destination).await)
|
||||
}
|
||||
|
||||
/// Records a supply transaction in the database.
|
||||
|
@ -162,107 +137,80 @@ impl Variant {
|
|||
/// Returns a UUID string representing the transaction.
|
||||
pub async fn supply(
|
||||
&self,
|
||||
price: Price,
|
||||
price: f64,
|
||||
origin: Option<&str>,
|
||||
location: Option<&str>,
|
||||
note: Option<&str>,
|
||||
) -> Transaction {
|
||||
let t = Transaction::new(&self.item, &self.variant, price, origin, location, note).await;
|
||||
|
||||
t.insert().await.unwrap();
|
||||
|
||||
t
|
||||
Transaction::new(&self.item, &self.variant, price, origin, location, note).await
|
||||
}
|
||||
|
||||
/// Returns all Transactions of this Item Variant
|
||||
pub async fn get_all_transactions(&self) -> Vec<Transaction> {
|
||||
let filter = doc! {
|
||||
"item": &self.item,
|
||||
"variant": &self.variant
|
||||
};
|
||||
|
||||
Transaction::find(filter, None, Some(doc! { "timestamp": Sort::Descending }))
|
||||
.await
|
||||
.unwrap()
|
||||
sqlx::query_as(
|
||||
"SELECT * FROM transactions WHERE item = $1 AND variant = $2 ORDER BY created DESC",
|
||||
)
|
||||
.bind(&self.item)
|
||||
.bind(&self.variant)
|
||||
.fetch_all(get_pg!())
|
||||
.await
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
pub async fn get_transaction_timeslice(&self, year: i32, month: u32) -> Vec<Transaction> {
|
||||
let (start, end) = timestamp_range(year, month);
|
||||
|
||||
Transaction::find(
|
||||
doc! {
|
||||
"timestamp": {
|
||||
"$gte": start,
|
||||
"$lte": end
|
||||
}
|
||||
},
|
||||
None,
|
||||
Some(sort_by_timestamp()),
|
||||
)
|
||||
.await
|
||||
.unwrap()
|
||||
sqlx::query_as("SELECT * FROM transactions WHERE created BETWEEN to_timestamp($1) AND to_timestamp($2) ORDER BY created DESC")
|
||||
.bind(start)
|
||||
.bind(end)
|
||||
.fetch_all(get_pg!()).await.unwrap()
|
||||
}
|
||||
|
||||
pub async fn get_unique_origins(&self) -> Vec<String> {
|
||||
unique_flows(
|
||||
&Transaction::unique(
|
||||
doc! {
|
||||
"item": &self.item,
|
||||
"variant": &self.variant
|
||||
},
|
||||
"origin",
|
||||
)
|
||||
.await,
|
||||
)
|
||||
let res: Vec<(String,)> = sqlx::query_as("SELECT DISTINCT(origin) FROM transactions WHERE origin NOT LIKE 'flow::%' AND item = $1 AND variant = $2")
|
||||
.bind(&self.item)
|
||||
.bind(&self.variant)
|
||||
.fetch_all(get_pg!()).await.unwrap();
|
||||
res.into_iter().map(|x| x.0).collect()
|
||||
}
|
||||
|
||||
pub async fn get_unique_destinations(&self) -> Vec<String> {
|
||||
unique_flows(
|
||||
&Transaction::unique(
|
||||
doc! {
|
||||
"item": &self.item,
|
||||
"variant": &self.variant
|
||||
},
|
||||
"consumed.destination",
|
||||
)
|
||||
.await,
|
||||
)
|
||||
let res: Vec<(String,)> = sqlx::query_as("SELECT DISTINCT(destination) FROM transactions WHERE destination NOT LIKE 'flow::%' AND item = $1 AND variant = $2")
|
||||
.bind(&self.item)
|
||||
.bind(&self.variant)
|
||||
.fetch_all(get_pg!()).await.unwrap();
|
||||
res.into_iter().map(|x| x.0).collect()
|
||||
}
|
||||
|
||||
pub async fn price_history_by_origin(&self, origin: &str, limit: Option<i64>) -> Vec<Price> {
|
||||
Transaction::find(
|
||||
doc! {
|
||||
"item": &self.item,
|
||||
"variant": &self.variant,
|
||||
"origin": origin
|
||||
},
|
||||
limit,
|
||||
Some(sort_by_timestamp()),
|
||||
pub async fn price_history_by_origin(&self, origin: &str, limit: Option<i64>) -> Vec<f64> {
|
||||
let res: Vec<(f64,)> = sqlx::query_as(
|
||||
&format!("SELECT price FROM transactions WHERE item = $1 AND variant = $2 AND origin = $3 ORDER BY created DESC {}", if let Some(limit) = limit {
|
||||
format!("LIMIT {limit}")
|
||||
} else { String::new() })
|
||||
)
|
||||
.await
|
||||
.unwrap()
|
||||
.into_iter()
|
||||
.map(|x| x.price)
|
||||
.collect()
|
||||
.bind(&self.item)
|
||||
.bind(&self.variant)
|
||||
.bind(origin)
|
||||
.fetch_all(get_pg!()).await.unwrap();
|
||||
res.into_iter().map(|x| x.0).collect()
|
||||
}
|
||||
|
||||
pub async fn get_latest_price(&self, origin: Option<String>) -> Price {
|
||||
let mut filter = doc! {
|
||||
"item": &self.item,
|
||||
"variant": &self.variant
|
||||
};
|
||||
|
||||
pub async fn get_latest_price(&self, origin: Option<String>) -> f64 {
|
||||
if let Some(origin) = origin {
|
||||
filter.insert("origin", origin);
|
||||
let res: (f64,) = sqlx::query_as("SELECT price FROM transactions WHERE item = $1 AND variant = $2 AND origin = $3 ORDER BY created DESC LIMIT 1")
|
||||
.bind(&self.item)
|
||||
.bind(&self.variant)
|
||||
.bind(origin)
|
||||
.fetch_one(get_pg!()).await.unwrap();
|
||||
res.0
|
||||
} else {
|
||||
let res: (f64,) = sqlx::query_as("SELECT price FROM transactions WHERE item = $1 AND variant = $2 ORDER BY created DESC LIMIT 1")
|
||||
.bind(&self.item)
|
||||
.bind(&self.variant)
|
||||
.bind(origin)
|
||||
.fetch_one(get_pg!()).await.unwrap();
|
||||
res.0
|
||||
}
|
||||
|
||||
Transaction::find(filter, Some(1), Some(sort_by_timestamp()))
|
||||
.await
|
||||
.unwrap()
|
||||
.first()
|
||||
.unwrap()
|
||||
.price
|
||||
.clone()
|
||||
}
|
||||
|
||||
/// Check if item variant is below minimum. Returns if this is the case and the number needed to fulfill minimum
|
||||
|
@ -280,8 +228,7 @@ impl Variant {
|
|||
pub async fn stat(&self, full: bool) -> serde_json::Value {
|
||||
let active_transactions = self.inventory().await;
|
||||
|
||||
// fix : ignores currency
|
||||
let total_price: f64 = active_transactions.iter().map(|x| x.price.value).sum();
|
||||
let total_price: f64 = active_transactions.iter().map(|x| x.price).sum();
|
||||
|
||||
if !full {
|
||||
return json!({
|
||||
|
@ -290,13 +237,15 @@ impl Variant {
|
|||
});
|
||||
}
|
||||
|
||||
let all_transactions = Transaction::find(
|
||||
doc! { "item": &self.item, "variant": &self.variant},
|
||||
None,
|
||||
None,
|
||||
let all_transactions: Vec<Transaction> = sqlx::query_as(
|
||||
"SELECT * FROM transactions WHERE item = $1 AND variant = $2 ORDER BY created DESC",
|
||||
)
|
||||
.bind(&self.item)
|
||||
.bind(&self.variant)
|
||||
.fetch_all(get_pg!())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let mut expired_count = 0.0;
|
||||
|
||||
for t in &all_transactions {
|
||||
|
@ -318,7 +267,7 @@ impl Variant {
|
|||
.price_history_by_origin(&origin, None)
|
||||
.await
|
||||
.into_iter()
|
||||
.map(|x| x.value)
|
||||
.map(|x| x)
|
||||
.collect::<Vec<_>>();
|
||||
let prices_len = prices.len() as f64;
|
||||
let prices_summed = prices.into_iter().reduce(|acc, e| acc + e).unwrap_or(0.0);
|
||||
|
@ -349,29 +298,3 @@ impl Variant {
|
|||
})
|
||||
}
|
||||
}
|
||||
|
||||
pub fn unique_flows(i: &[String]) -> Vec<String> {
|
||||
let mut unique_vec: Vec<String> = Vec::new();
|
||||
|
||||
for s in i {
|
||||
// Check if the string starts with "flow::"
|
||||
if let Some(suffix) = s.strip_prefix("flow::") {
|
||||
// Extract the part after "flow::" and split on "::" to get the kind (ignoring id)
|
||||
let parts: Vec<&str> = suffix.split("::").collect();
|
||||
if let Some(kind) = parts.first() {
|
||||
// Build the common prefix "flow::kind"
|
||||
let common_prefix = format!("flow::{}", kind);
|
||||
|
||||
if !unique_vec.contains(&common_prefix) {
|
||||
unique_vec.push(common_prefix);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// If the string doesn't start with "flow::", retain it
|
||||
// Assumption: Except "flow::" values, everything should be already unique
|
||||
unique_vec.push(s.to_string());
|
||||
}
|
||||
}
|
||||
|
||||
unique_vec
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue