Compare commits

...

2 commits

Author SHA1 Message Date
46cf2f4572 online 2025-04-30 16:51:49 +02:00
6e31b95417 client server api 2025-04-30 16:28:48 +02:00
7 changed files with 113 additions and 9 deletions

2
Cargo.lock generated
View file

@ -3582,6 +3582,7 @@ dependencies = [
"axum", "axum",
"axum-client-ip", "axum-client-ip",
"based", "based",
"chrono",
"dashmap", "dashmap",
"hex", "hex",
"http2", "http2",
@ -3596,6 +3597,7 @@ dependencies = [
"toml 0.8.22", "toml 0.8.22",
"tracing", "tracing",
"tracing-subscriber", "tracing-subscriber",
"ulid",
"ureq", "ureq",
] ]

View file

@ -36,3 +36,5 @@ ureq = { version = "3.0.11", features = ["json"] }
rumqttc = { version = "0.24.0", features = ["url", "websocket"] } rumqttc = { version = "0.24.0", features = ["url", "websocket"] }
sage = { git = "https://git.hydrar.de/jmarya/sage" } sage = { git = "https://git.hydrar.de/jmarya/sage" }
dashmap = "6.1.0" dashmap = "6.1.0"
ulid = { version = "1.2.1", features = ["serde"] }
chrono = "0.4.41"

View file

@ -105,3 +105,43 @@ impl Result {
Self { ok: 0 } Self { ok: 0 }
} }
} }
#[derive(Debug, Serialize, Deserialize)]
pub struct ClientAction {
pub id: ulid::Ulid,
pub action: ClientActions,
}
impl ClientAction {
pub fn new(action: ClientActions) -> Self {
Self {
id: ulid::Ulid::new(),
action,
}
}
}
#[derive(Debug, Serialize, Deserialize)]
pub enum ClientActions {
OSQuery(String),
}
#[derive(Debug, Serialize, Deserialize)]
pub struct ServerResponse {
pub id: ulid::Ulid,
pub response: ServerResponses,
}
impl ServerResponse {
pub fn of(client: &ClientAction, resp: ServerResponses) -> Self {
Self {
id: client.id,
response: resp,
}
}
}
#[derive(Debug, Serialize, Deserialize)]
pub enum ServerResponses {
OSQuery(String),
}

View file

@ -4,6 +4,7 @@ use axum::{
}; };
use axum_client_ip::ClientIpSource; use axum_client_ip::ClientIpSource;
use based::auth::User; use based::auth::User;
use dashmap::DashMap;
use owl::{prelude::*, set_global_db}; use owl::{prelude::*, set_global_db};
use rand::RngCore; use rand::RngCore;
use std::{net::SocketAddr, path::PathBuf}; use std::{net::SocketAddr, path::PathBuf};
@ -21,6 +22,8 @@ use tokio::sync::OnceCell;
pub static IDENTITY: OnceCell<Identity> = OnceCell::const_new(); pub static IDENTITY: OnceCell<Identity> = OnceCell::const_new();
pub static CONFIG: OnceCell<Config> = OnceCell::const_new(); pub static CONFIG: OnceCell<Config> = OnceCell::const_new();
pub static ONLINE: OnceCell<DashMap<String, chrono::DateTime<chrono::Utc>>> = OnceCell::const_new();
fn generate_token() -> String { fn generate_token() -> String {
let mut rng = rand::rng(); let mut rng = rand::rng();
let mut token = vec![0u8; 32]; let mut token = vec![0u8; 32];
@ -44,6 +47,8 @@ async fn main() {
let config = Config::default(); let config = Config::default();
let _ = crate::CONFIG.set(config); let _ = crate::CONFIG.set(config);
crate::ONLINE.set(DashMap::new()).unwrap();
let db = Database::filesystem("./herd/db"); let db = Database::filesystem("./herd/db");
set_global_db!(db); set_global_db!(db);

View file

@ -1,9 +1,17 @@
use std::sync::Arc;
use crate::Machine; use crate::Machine;
use crate::api::{ClientAction, ClientActions, ServerResponse};
use owl::prelude::*; use owl::prelude::*;
use owl::{Serialize, get, query}; use owl::{Serialize, get, query};
use rumqttc::AsyncClient; use rumqttc::AsyncClient;
use sage::PersonaIdentity; use sage::PersonaIdentity;
fn is_within_80_seconds(time: chrono::DateTime<chrono::Utc>) -> bool {
let now = chrono::Utc::now();
now.signed_duration_since(time).num_seconds() <= 80
}
/// Handle herd MQTT /// Handle herd MQTT
pub async fn handle_mqtt(topic: String, data: Vec<u8>) { pub async fn handle_mqtt(topic: String, data: Vec<u8>) {
log::info!("Received client request from {topic}"); log::info!("Received client request from {topic}");
@ -17,11 +25,25 @@ pub async fn handle_mqtt(topic: String, data: Vec<u8>) {
.unwrap(); .unwrap();
// TODO : check for recency // TODO : check for recency
println!("got raw: {}", String::from_utf8(dec.payload).unwrap());
match cat { match cat {
"online" => { "online" => {
log::info!("Device {client} reported ONLINE"); if let Some(online) = crate::ONLINE.get().unwrap().get(client) {
if !is_within_80_seconds(*online) {
log::info!("Device {client} came back ONLINE");
}
} else {
log::info!("Device {client} went ONLINE");
}
crate::ONLINE
.get()
.unwrap()
.insert(client.to_string(), chrono::Utc::now());
}
"respond" => {
let resp: ServerResponse = serde_json::from_slice(&dec.payload).unwrap();
log::info!("Got response {:?}", resp);
} }
_ => {} _ => {}
} }
@ -53,6 +75,10 @@ pub async fn listen_to_device(client: &AsyncClient, machine_id: &str) {
.subscribe(format!("{machine_id}/online"), rumqttc::QoS::AtMostOnce) .subscribe(format!("{machine_id}/online"), rumqttc::QoS::AtMostOnce)
.await .await
.unwrap(); .unwrap();
client
.subscribe(format!("{machine_id}/respond"), rumqttc::QoS::AtMostOnce)
.await
.unwrap();
} }
/// Subscibe to incoming messages from all registered machines /// Subscibe to incoming messages from all registered machines

View file

@ -44,6 +44,8 @@ pub async fn join_device(
let machine = Machine::from_join_param(payload); let machine = Machine::from_join_param(payload);
let new_token = machine.token.clone(); let new_token = machine.token.clone();
// TODO : add device listener instantly
save!(machine); save!(machine);
let i = crate::IDENTITY.get().unwrap(); let i = crate::IDENTITY.get().unwrap();

View file

@ -1,7 +1,11 @@
use std::{os, process::Stdio};
use owl::Serialize; use owl::Serialize;
use rumqttc::AsyncClient; use rumqttc::AsyncClient;
use sage::PersonaIdentity; use sage::PersonaIdentity;
use crate::api::{ClientAction, ServerResponse, ServerResponses};
// Client MQTT // Client MQTT
pub async fn handle_mqtt(topic: String, data: Vec<u8>) { pub async fn handle_mqtt(topic: String, data: Vec<u8>) {
//println!("got real raw: {}", String::from_utf8_lossy(&data)); //println!("got real raw: {}", String::from_utf8_lossy(&data));
@ -11,15 +15,38 @@ pub async fn handle_mqtt(topic: String, data: Vec<u8>) {
); );
let pk = pk.sign_key().unwrap(); let pk = pk.sign_key().unwrap();
let payload = crate::IDENTITY.get().unwrap().decrypt(&data, &pk).unwrap(); let payload = crate::IDENTITY.get().unwrap().decrypt(&data, &pk).unwrap();
println!(
"got payload {}", let action: ClientAction = serde_json::from_slice(&payload.payload).unwrap();
String::from_utf8(payload.payload).unwrap() log::info!("Got action {action:?}");
);
match &action.action {
crate::api::ClientActions::OSQuery(query) => {
// TODO : run osquery
log::info!("Doing osquery with {query}");
let res = osquery(&query);
send_back(
crate::MQTT.get().unwrap(),
"respond",
ServerResponse::of(&action, ServerResponses::OSQuery(res)),
)
.await;
}
}
}
pub fn osquery(query: &str) -> String {
let cmd = std::process::Command::new("osqueryi")
.arg("--csv")
.arg(query)
.stdout(Stdio::piped())
.output()
.unwrap();
String::from_utf8(cmd.stdout).unwrap()
} }
/// Send something back to the server on `topic` /// Send something back to the server on `topic`
pub async fn send_back<T: Serialize>(client: &AsyncClient, topic: &str, request: T) { pub async fn send_back<T: Serialize>(client: &AsyncClient, topic: &str, data: T) {
let data = serde_json::to_string(&request).unwrap(); let data = serde_json::to_string(&data).unwrap();
let pk = crate::AGENT.get().unwrap(); let pk = crate::AGENT.get().unwrap();
let pk = (pk.server_age.clone(), String::new()); let pk = (pk.server_age.clone(), String::new());
@ -35,7 +62,7 @@ pub async fn send_back<T: Serialize>(client: &AsyncClient, topic: &str, request:
.encrypt(data.as_bytes(), &rec); .encrypt(data.as_bytes(), &rec);
let topic = format!("{machine_id}/{topic}"); let topic = format!("{machine_id}/{topic}");
log::info!("Publish to {machine_id}{topic}"); log::info!("Publish to {topic}");
client client
.publish(topic, rumqttc::QoS::AtMostOnce, true, payload) .publish(topic, rumqttc::QoS::AtMostOnce, true, payload)
.await .await