use std::sync::Arc; use crate::Machine; use crate::api::{ClientAction, ClientActions, ServerResponse}; use owl::prelude::*; use owl::{Serialize, get, query}; use rumqttc::AsyncClient; use sage::PersonaIdentity; pub fn is_within_80_seconds(time: chrono::DateTime) -> bool { let now = chrono::Utc::now(); now.signed_duration_since(time).num_seconds() <= 80 } /// Handle herd MQTT pub async fn handle_mqtt(topic: String, data: Vec) { log::info!("Received client request from {topic}"); let (client, cat) = topic.split_once('/').unwrap(); let mac: Model = get!(client).unwrap(); let dec = crate::IDENTITY .get() .unwrap() .decrypt(&data, &mac.read().identity.sign_key().unwrap()) .unwrap(); // TODO : check for recency match cat { "online" => { if let Some(online) = crate::ONLINE.get().unwrap().get(client) { if !is_within_80_seconds(*online) { log::info!("Device {client} came back ONLINE"); } } else { log::info!("Device {client} went ONLINE"); } crate::ONLINE .get() .unwrap() .insert(client.to_string(), chrono::Utc::now()); } "respond" => { let resp: ServerResponse = serde_json::from_slice(&dec.payload).unwrap(); log::info!("Got response {:?}", resp); let entry = crate::DISPATCH.get().unwrap().get(&resp.id).unwrap(); entry.send(resp); } _ => {} } } pub struct TaskWaiter { pub id: ulid::Ulid, pub recv: crossbeam::channel::Receiver, } impl TaskWaiter { pub async fn wait_for(&self, timeout: std::time::Duration) -> Option { // TODO tokio spawn blocking? self.recv.recv_timeout(timeout).ok() } } /// Send a message to a registered `machine` pub async fn send_msg( client: &AsyncClient, machine: &Model, request: ClientAction, ) -> TaskWaiter { let data = serde_json::to_string(&request).unwrap(); let pk = &machine.read().identity; let rec = pk.enc_key().unwrap(); let machine_id = machine.read().id.to_string().replace("-", ""); let payload = crate::IDENTITY .get() .unwrap() .encrypt(data.as_bytes(), &rec); let topic = format!("{machine_id}/cmd"); client .publish(topic, rumqttc::QoS::AtMostOnce, true, payload) .await .unwrap(); let (sender, recv) = tokio::sync::mpsc::channel(100); crate::DISPATCH.get().unwrap().insert(request.id, sender); TaskWaiter { id: request.id, recv, } } /// Subscribe to all `device->server` topics pub async fn listen_to_device(client: &AsyncClient, machine_id: &str) { // Online Presence client .subscribe(format!("{machine_id}/online"), rumqttc::QoS::AtMostOnce) .await .unwrap(); client .subscribe(format!("{machine_id}/respond"), rumqttc::QoS::AtMostOnce) .await .unwrap(); } /// Subscibe to incoming messages from all registered machines pub async fn listen_to_devices(client: &AsyncClient) { let machines: Vec> = query!(|_| true); for machine in machines { let machine_id = machine.read().id.to_string(); let machine_id = machine_id.trim().replace("-", ""); log::info!("Sub to {machine_id}"); listen_to_device(client, &machine_id).await; } }