diff --git a/Cargo.lock b/Cargo.lock index dba945c..37a67f5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3582,6 +3582,7 @@ dependencies = [ "axum", "axum-client-ip", "based", + "chrono", "dashmap", "hex", "http2", @@ -3596,6 +3597,7 @@ dependencies = [ "toml 0.8.22", "tracing", "tracing-subscriber", + "ulid", "ureq", ] diff --git a/Cargo.toml b/Cargo.toml index bcc8e0b..9b0f66c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -36,3 +36,5 @@ ureq = { version = "3.0.11", features = ["json"] } rumqttc = { version = "0.24.0", features = ["url", "websocket"] } sage = { git = "https://git.hydrar.de/jmarya/sage" } dashmap = "6.1.0" +ulid = { version = "1.2.1", features = ["serde"] } +chrono = "0.4.41" diff --git a/src/api.rs b/src/api.rs index 32e83f7..773401f 100644 --- a/src/api.rs +++ b/src/api.rs @@ -105,3 +105,43 @@ impl Result { Self { ok: 0 } } } + +#[derive(Debug, Serialize, Deserialize)] +pub struct ClientAction { + pub id: ulid::Ulid, + pub action: ClientActions, +} + +impl ClientAction { + pub fn new(action: ClientActions) -> Self { + Self { + id: ulid::Ulid::new(), + action, + } + } +} + +#[derive(Debug, Serialize, Deserialize)] +pub enum ClientActions { + OSQuery(String), +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct ServerResponse { + pub id: ulid::Ulid, + pub response: ServerResponses, +} + +impl ServerResponse { + pub fn of(client: &ClientAction, resp: ServerResponses) -> Self { + Self { + id: client.id, + response: resp, + } + } +} + +#[derive(Debug, Serialize, Deserialize)] +pub enum ServerResponses { + OSQuery(String), +} diff --git a/src/herd.rs b/src/herd.rs index 420a16d..e2891cb 100644 --- a/src/herd.rs +++ b/src/herd.rs @@ -4,6 +4,7 @@ use axum::{ }; use axum_client_ip::ClientIpSource; use based::auth::User; +use dashmap::DashMap; use owl::{prelude::*, set_global_db}; use rand::RngCore; use std::{net::SocketAddr, path::PathBuf}; @@ -21,6 +22,8 @@ use tokio::sync::OnceCell; pub static IDENTITY: OnceCell = OnceCell::const_new(); pub static CONFIG: OnceCell = OnceCell::const_new(); +pub static ONLINE: OnceCell>> = OnceCell::const_new(); + fn generate_token() -> String { let mut rng = rand::rng(); let mut token = vec![0u8; 32]; @@ -44,6 +47,8 @@ async fn main() { let config = Config::default(); let _ = crate::CONFIG.set(config); + crate::ONLINE.set(DashMap::new()).unwrap(); + let db = Database::filesystem("./herd/db"); set_global_db!(db); diff --git a/src/herd_core/mqtt.rs b/src/herd_core/mqtt.rs index 550bac0..2b6323e 100644 --- a/src/herd_core/mqtt.rs +++ b/src/herd_core/mqtt.rs @@ -1,9 +1,17 @@ +use std::sync::Arc; + use crate::Machine; +use crate::api::{ClientAction, ClientActions, ServerResponse}; use owl::prelude::*; use owl::{Serialize, get, query}; use rumqttc::AsyncClient; use sage::PersonaIdentity; +fn is_within_80_seconds(time: chrono::DateTime) -> bool { + let now = chrono::Utc::now(); + now.signed_duration_since(time).num_seconds() <= 80 +} + /// Handle herd MQTT pub async fn handle_mqtt(topic: String, data: Vec) { log::info!("Received client request from {topic}"); @@ -17,11 +25,25 @@ pub async fn handle_mqtt(topic: String, data: Vec) { .unwrap(); // TODO : check for recency - println!("got raw: {}", String::from_utf8(dec.payload).unwrap()); match cat { "online" => { - log::info!("Device {client} reported ONLINE"); + if let Some(online) = crate::ONLINE.get().unwrap().get(client) { + if !is_within_80_seconds(*online) { + log::info!("Device {client} came back ONLINE"); + } + } else { + log::info!("Device {client} went ONLINE"); + } + crate::ONLINE + .get() + .unwrap() + .insert(client.to_string(), chrono::Utc::now()); + } + "respond" => { + let resp: ServerResponse = serde_json::from_slice(&dec.payload).unwrap(); + + log::info!("Got response {:?}", resp); } _ => {} } @@ -53,6 +75,10 @@ pub async fn listen_to_device(client: &AsyncClient, machine_id: &str) { .subscribe(format!("{machine_id}/online"), rumqttc::QoS::AtMostOnce) .await .unwrap(); + client + .subscribe(format!("{machine_id}/respond"), rumqttc::QoS::AtMostOnce) + .await + .unwrap(); } /// Subscibe to incoming messages from all registered machines diff --git a/src/herd_core/route.rs b/src/herd_core/route.rs index b9777d1..60bd1c6 100644 --- a/src/herd_core/route.rs +++ b/src/herd_core/route.rs @@ -44,6 +44,8 @@ pub async fn join_device( let machine = Machine::from_join_param(payload); let new_token = machine.token.clone(); + // TODO : add device listener instantly + save!(machine); let i = crate::IDENTITY.get().unwrap(); diff --git a/src/sheepd_core/mqtt.rs b/src/sheepd_core/mqtt.rs index dea12e6..c9b5ce6 100644 --- a/src/sheepd_core/mqtt.rs +++ b/src/sheepd_core/mqtt.rs @@ -1,7 +1,11 @@ +use std::{os, process::Stdio}; + use owl::Serialize; use rumqttc::AsyncClient; use sage::PersonaIdentity; +use crate::api::{ClientAction, ServerResponse, ServerResponses}; + // Client MQTT pub async fn handle_mqtt(topic: String, data: Vec) { //println!("got real raw: {}", String::from_utf8_lossy(&data)); @@ -11,15 +15,38 @@ pub async fn handle_mqtt(topic: String, data: Vec) { ); let pk = pk.sign_key().unwrap(); let payload = crate::IDENTITY.get().unwrap().decrypt(&data, &pk).unwrap(); - println!( - "got payload {}", - String::from_utf8(payload.payload).unwrap() - ); + + let action: ClientAction = serde_json::from_slice(&payload.payload).unwrap(); + log::info!("Got action {action:?}"); + + match &action.action { + crate::api::ClientActions::OSQuery(query) => { + // TODO : run osquery + log::info!("Doing osquery with {query}"); + let res = osquery(&query); + send_back( + crate::MQTT.get().unwrap(), + "respond", + ServerResponse::of(&action, ServerResponses::OSQuery(res)), + ) + .await; + } + } +} + +pub fn osquery(query: &str) -> String { + let cmd = std::process::Command::new("osqueryi") + .arg("--csv") + .arg(query) + .stdout(Stdio::piped()) + .output() + .unwrap(); + String::from_utf8(cmd.stdout).unwrap() } /// Send something back to the server on `topic` -pub async fn send_back(client: &AsyncClient, topic: &str, request: T) { - let data = serde_json::to_string(&request).unwrap(); +pub async fn send_back(client: &AsyncClient, topic: &str, data: T) { + let data = serde_json::to_string(&data).unwrap(); let pk = crate::AGENT.get().unwrap(); let pk = (pk.server_age.clone(), String::new()); @@ -35,7 +62,7 @@ pub async fn send_back(client: &AsyncClient, topic: &str, request: .encrypt(data.as_bytes(), &rec); let topic = format!("{machine_id}/{topic}"); - log::info!("Publish to {machine_id}{topic}"); + log::info!("Publish to {topic}"); client .publish(topic, rumqttc::QoS::AtMostOnce, true, payload) .await