From 46cf2f457282e0e6e5b3b8b54895917b1bbcd4d9 Mon Sep 17 00:00:00 2001 From: JMARyA Date: Wed, 30 Apr 2025 16:51:49 +0200 Subject: [PATCH] =?UTF-8?q?=E2=9C=A8=20online?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Cargo.lock | 1 + Cargo.toml | 1 + src/herd.rs | 5 +++++ src/herd_core/mqtt.rs | 17 ++++++++++++++++- src/sheepd_core/mqtt.rs | 2 +- 5 files changed, 24 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5820f28..37a67f5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3582,6 +3582,7 @@ dependencies = [ "axum", "axum-client-ip", "based", + "chrono", "dashmap", "hex", "http2", diff --git a/Cargo.toml b/Cargo.toml index 39511ff..9b0f66c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -37,3 +37,4 @@ rumqttc = { version = "0.24.0", features = ["url", "websocket"] } sage = { git = "https://git.hydrar.de/jmarya/sage" } dashmap = "6.1.0" ulid = { version = "1.2.1", features = ["serde"] } +chrono = "0.4.41" diff --git a/src/herd.rs b/src/herd.rs index 420a16d..e2891cb 100644 --- a/src/herd.rs +++ b/src/herd.rs @@ -4,6 +4,7 @@ use axum::{ }; use axum_client_ip::ClientIpSource; use based::auth::User; +use dashmap::DashMap; use owl::{prelude::*, set_global_db}; use rand::RngCore; use std::{net::SocketAddr, path::PathBuf}; @@ -21,6 +22,8 @@ use tokio::sync::OnceCell; pub static IDENTITY: OnceCell = OnceCell::const_new(); pub static CONFIG: OnceCell = OnceCell::const_new(); +pub static ONLINE: OnceCell>> = OnceCell::const_new(); + fn generate_token() -> String { let mut rng = rand::rng(); let mut token = vec![0u8; 32]; @@ -44,6 +47,8 @@ async fn main() { let config = Config::default(); let _ = crate::CONFIG.set(config); + crate::ONLINE.set(DashMap::new()).unwrap(); + let db = Database::filesystem("./herd/db"); set_global_db!(db); diff --git a/src/herd_core/mqtt.rs b/src/herd_core/mqtt.rs index bd0276f..2b6323e 100644 --- a/src/herd_core/mqtt.rs +++ b/src/herd_core/mqtt.rs @@ -7,6 +7,11 @@ use owl::{Serialize, get, query}; use rumqttc::AsyncClient; use sage::PersonaIdentity; +fn is_within_80_seconds(time: chrono::DateTime) -> bool { + let now = chrono::Utc::now(); + now.signed_duration_since(time).num_seconds() <= 80 +} + /// Handle herd MQTT pub async fn handle_mqtt(topic: String, data: Vec) { log::info!("Received client request from {topic}"); @@ -23,7 +28,17 @@ pub async fn handle_mqtt(topic: String, data: Vec) { match cat { "online" => { - log::info!("Device {client} reported ONLINE"); + if let Some(online) = crate::ONLINE.get().unwrap().get(client) { + if !is_within_80_seconds(*online) { + log::info!("Device {client} came back ONLINE"); + } + } else { + log::info!("Device {client} went ONLINE"); + } + crate::ONLINE + .get() + .unwrap() + .insert(client.to_string(), chrono::Utc::now()); } "respond" => { let resp: ServerResponse = serde_json::from_slice(&dec.payload).unwrap(); diff --git a/src/sheepd_core/mqtt.rs b/src/sheepd_core/mqtt.rs index 188db87..c9b5ce6 100644 --- a/src/sheepd_core/mqtt.rs +++ b/src/sheepd_core/mqtt.rs @@ -62,7 +62,7 @@ pub async fn send_back(client: &AsyncClient, topic: &str, data: T) .encrypt(data.as_bytes(), &rec); let topic = format!("{machine_id}/{topic}"); - log::info!("Publish to {machine_id}{topic}"); + log::info!("Publish to {topic}"); client .publish(topic, rumqttc::QoS::AtMostOnce, true, payload) .await