1
0
Fork 0
forked from navos/sheepd
sheepd/src/herd_core/mqtt.rs
2025-05-06 15:59:54 +02:00

135 lines
3.8 KiB
Rust

use std::sync::Arc;
use crate::Machine;
use crate::api::{ClientAction, ClientActions, ServerResponse};
use owl::prelude::*;
use owl::{Serialize, get, query};
use rumqttc::AsyncClient;
use sage::PersonaIdentity;
/// Returns `true` when `time` lies at most 80 whole seconds before the current UTC time.
///
/// The comparison uses the signed number of seconds from `time` to "now", so a
/// timestamp in the future yields a negative value and is also reported as `true`.
pub fn is_within_80_seconds(time: chrono::DateTime<chrono::Utc>) -> bool {
    let elapsed = chrono::Utc::now().signed_duration_since(time);
    elapsed.num_seconds() <= 80
}
/// Handle an incoming herd MQTT message.
///
/// `topic` is expected to look like `<client>/<category>`. `data` is the raw
/// payload; it is decrypted through `crate::IDENTITY` using the sign key of the
/// machine that is looked up by `<client>`.
///
/// Categories:
/// * `online`  – stores the current UTC time for the client in `crate::ONLINE`
///   and logs when the client is seen for the first time or after its previous
///   timestamp fell outside the 80 second window.
/// * `respond` – parses the decrypted payload as a JSON `ServerResponse` and
///   hands it to the sender registered in `crate::DISPATCH` under the
///   response id.
/// * anything else is ignored.
///
/// Malformed topics, undecodable responses and responses without a pending
/// dispatch entry are logged and dropped instead of panicking.
///
/// # Panics
/// Still panics when the `crate::IDENTITY`, `crate::ONLINE` or
/// `crate::DISPATCH` globals are not initialised, when no machine is found for
/// `<client>`, when the machine has no sign key, or when decryption fails.
pub async fn handle_mqtt(topic: String, data: Vec<u8>) {
    log::info!("Received client request from {topic}");

    // The topic string comes straight from the broker; a missing separator
    // must not take the handler down.
    let Some((client, cat)) = topic.split_once('/') else {
        log::warn!("Ignoring message on malformed topic {topic}");
        return;
    };

    // NOTE(review): these unwraps are also reachable from network input, but
    // the return types of `get!`, `sign_key` and `decrypt` are not visible in
    // this file, so they are left untouched here.
    let mac: Model<Machine> = get!(client).unwrap();
    let dec = crate::IDENTITY
        .get()
        .unwrap()
        .decrypt(&data, &mac.read().identity.sign_key().unwrap())
        .unwrap();

    // TODO : check for recency
    match cat {
        "online" => {
            if let Some(online) = crate::ONLINE.get().unwrap().get(client) {
                if !is_within_80_seconds(*online) {
                    log::info!("Device {client} came back ONLINE");
                }
            } else {
                log::info!("Device {client} went ONLINE");
            }

            // Refresh the last-seen timestamp on every presence message.
            crate::ONLINE
                .get()
                .unwrap()
                .insert(client.to_string(), chrono::Utc::now());
        }
        "respond" => {
            let resp: ServerResponse = match serde_json::from_slice(&dec.payload) {
                Ok(resp) => resp,
                Err(err) => {
                    log::warn!("Discarding undecodable response from {client}: {err}");
                    return;
                }
            };
            log::info!("Got response {:?}", resp);

            // A duplicate or unsolicited response has no dispatch entry.
            let Some((id, entry)) = crate::DISPATCH
                .get()
                .unwrap()
                .remove(&resp.id.to_string())
            else {
                log::warn!(
                    "Discarding response {} from {client}: no pending action with that id",
                    resp.id
                );
                return;
            };

            // Sending fails when the receiving half was already dropped.
            if entry.send(resp).is_err() {
                log::error!(
                    "Could not send back response for action {id}. Probably due to timeout"
                );
            }
        }
        _ => {}
    }
}
/// Handle for a request whose response has not arrived yet.
pub struct TaskWaiter {
    /// Identifier of the request this waiter belongs to.
    pub id: ulid::Ulid,
    /// Receiving half of the oneshot channel the response is delivered on.
    pub recv: tokio::sync::oneshot::Receiver<ServerResponse>,
}

impl TaskWaiter {
    /// Wait up to `timeout` for the response.
    ///
    /// Returns `Some(response)` when one is received in time. Returns `None`
    /// when the timeout elapses first or when the channel yields an error
    /// instead of a value.
    pub async fn wait_for(self, timeout: std::time::Duration) -> Option<ServerResponse> {
        match tokio::time::timeout(timeout, self.recv).await {
            // The channel resolved in time; an error on it maps to `None`.
            Ok(outcome) => outcome.ok(),
            // The deadline passed before the channel resolved.
            Err(_) => None,
        }
    }
}
/// Send a message to a registered `machine`
pub async fn send_msg(
client: &AsyncClient,
machine: &Model<Machine>,
request: ClientAction,
) -> TaskWaiter {
let data = serde_json::to_string(&request).unwrap();
let pk = &machine.read().identity;
let rec = pk.enc_key().unwrap();
let machine_id = machine.read().id.to_string().replace("-", "");
let payload = crate::IDENTITY
.get()
.unwrap()
.encrypt(data.as_bytes(), &rec);
let topic = format!("{machine_id}/cmd");
client
.publish(topic, rumqttc::QoS::AtMostOnce, true, payload)
.await
.unwrap();
let (sender, recv) = tokio::sync::oneshot::channel();
crate::DISPATCH
.get()
.unwrap()
.insert(request.id.to_string(), sender);
TaskWaiter {
id: request.id,
recv,
}
}
/// Subscribe to the `device->server` topics of a single machine.
///
/// Subscribes, in this order and with QoS `AtMostOnce`, to
/// `{machine_id}/online` (online presence) and `{machine_id}/respond`.
///
/// # Panics
/// Panics if either subscribe call returns an error.
pub async fn listen_to_device(client: &AsyncClient, machine_id: &str) {
    let topics = [
        // Online Presence
        format!("{machine_id}/online"),
        format!("{machine_id}/respond"),
    ];

    for topic in topics {
        client
            .subscribe(topic, rumqttc::QoS::AtMostOnce)
            .await
            .unwrap();
    }
}
/// Subscribe to incoming messages from all registered machines.
///
/// Every machine returned by the query is subscribed through
/// [`listen_to_device`], one after another. The topic prefix is the machine
/// id rendered as a string, trimmed, with all `-` characters removed.
pub async fn listen_to_devices(client: &AsyncClient) {
    let registered: Vec<Model<Machine>> = query!(|_| true);

    for entry in registered {
        // Same dash-less form of the id that is used when building topics.
        let raw_id = entry.read().id.to_string();
        let machine_id = raw_id.trim().replace("-", "");

        log::info!("Sub to {machine_id}");
        listen_to_device(client, &machine_id).await;
    }
}