diff --git a/Cargo.lock b/Cargo.lock index dba945c..5820f28 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3596,6 +3596,7 @@ dependencies = [ "toml 0.8.22", "tracing", "tracing-subscriber", + "ulid", "ureq", ] diff --git a/Cargo.toml b/Cargo.toml index bcc8e0b..39511ff 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -36,3 +36,4 @@ ureq = { version = "3.0.11", features = ["json"] } rumqttc = { version = "0.24.0", features = ["url", "websocket"] } sage = { git = "https://git.hydrar.de/jmarya/sage" } dashmap = "6.1.0" +ulid = { version = "1.2.1", features = ["serde"] } diff --git a/src/api.rs b/src/api.rs index 32e83f7..773401f 100644 --- a/src/api.rs +++ b/src/api.rs @@ -105,3 +105,43 @@ impl Result { Self { ok: 0 } } } + +#[derive(Debug, Serialize, Deserialize)] +pub struct ClientAction { + pub id: ulid::Ulid, + pub action: ClientActions, +} + +impl ClientAction { + pub fn new(action: ClientActions) -> Self { + Self { + id: ulid::Ulid::new(), + action, + } + } +} + +#[derive(Debug, Serialize, Deserialize)] +pub enum ClientActions { + OSQuery(String), +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct ServerResponse { + pub id: ulid::Ulid, + pub response: ServerResponses, +} + +impl ServerResponse { + pub fn of(client: &ClientAction, resp: ServerResponses) -> Self { + Self { + id: client.id, + response: resp, + } + } +} + +#[derive(Debug, Serialize, Deserialize)] +pub enum ServerResponses { + OSQuery(String), +} diff --git a/src/herd_core/mqtt.rs b/src/herd_core/mqtt.rs index 550bac0..bd0276f 100644 --- a/src/herd_core/mqtt.rs +++ b/src/herd_core/mqtt.rs @@ -1,4 +1,7 @@ +use std::sync::Arc; + use crate::Machine; +use crate::api::{ClientAction, ClientActions, ServerResponse}; use owl::prelude::*; use owl::{Serialize, get, query}; use rumqttc::AsyncClient; @@ -17,12 +20,16 @@ pub async fn handle_mqtt(topic: String, data: Vec) { .unwrap(); // TODO : check for recency - println!("got raw: {}", String::from_utf8(dec.payload).unwrap()); match cat { "online" => { log::info!("Device {client} reported ONLINE"); } + "respond" => { + let resp: ServerResponse = serde_json::from_slice(&dec.payload).unwrap(); + + log::info!("Got response {:?}", resp); + } _ => {} } } @@ -53,6 +60,10 @@ pub async fn listen_to_device(client: &AsyncClient, machine_id: &str) { .subscribe(format!("{machine_id}/online"), rumqttc::QoS::AtMostOnce) .await .unwrap(); + client + .subscribe(format!("{machine_id}/respond"), rumqttc::QoS::AtMostOnce) + .await + .unwrap(); } /// Subscibe to incoming messages from all registered machines diff --git a/src/herd_core/route.rs b/src/herd_core/route.rs index b9777d1..60bd1c6 100644 --- a/src/herd_core/route.rs +++ b/src/herd_core/route.rs @@ -44,6 +44,8 @@ pub async fn join_device( let machine = Machine::from_join_param(payload); let new_token = machine.token.clone(); + // TODO : add device listener instantly + save!(machine); let i = crate::IDENTITY.get().unwrap(); diff --git a/src/sheepd_core/mqtt.rs b/src/sheepd_core/mqtt.rs index dea12e6..188db87 100644 --- a/src/sheepd_core/mqtt.rs +++ b/src/sheepd_core/mqtt.rs @@ -1,7 +1,11 @@ +use std::{os, process::Stdio}; + use owl::Serialize; use rumqttc::AsyncClient; use sage::PersonaIdentity; +use crate::api::{ClientAction, ServerResponse, ServerResponses}; + // Client MQTT pub async fn handle_mqtt(topic: String, data: Vec) { //println!("got real raw: {}", String::from_utf8_lossy(&data)); @@ -11,15 +15,38 @@ pub async fn handle_mqtt(topic: String, data: Vec) { ); let pk = pk.sign_key().unwrap(); let payload = crate::IDENTITY.get().unwrap().decrypt(&data, &pk).unwrap(); - println!( - "got payload {}", - String::from_utf8(payload.payload).unwrap() - ); + + let action: ClientAction = serde_json::from_slice(&payload.payload).unwrap(); + log::info!("Got action {action:?}"); + + match &action.action { + crate::api::ClientActions::OSQuery(query) => { + // TODO : run osquery + log::info!("Doing osquery with {query}"); + let res = osquery(&query); + send_back( + crate::MQTT.get().unwrap(), + "respond", + ServerResponse::of(&action, ServerResponses::OSQuery(res)), + ) + .await; + } + } +} + +pub fn osquery(query: &str) -> String { + let cmd = std::process::Command::new("osqueryi") + .arg("--csv") + .arg(query) + .stdout(Stdio::piped()) + .output() + .unwrap(); + String::from_utf8(cmd.stdout).unwrap() } /// Send something back to the server on `topic` -pub async fn send_back(client: &AsyncClient, topic: &str, request: T) { - let data = serde_json::to_string(&request).unwrap(); +pub async fn send_back(client: &AsyncClient, topic: &str, data: T) { + let data = serde_json::to_string(&data).unwrap(); let pk = crate::AGENT.get().unwrap(); let pk = (pk.server_age.clone(), String::new());