encrypted msg + online reporting + refactor

This commit is contained in:
JMARyA 2025-04-30 09:35:21 +02:00
parent 125d50530d
commit a567214f58
19 changed files with 318 additions and 304 deletions

View file

@ -1,17 +1,6 @@
use age::{Recipient, secrecy::ExposeSecret};
use based::auth::{Sessions, User};
use minisign::PublicKey;
use owl::{Deserialize, Serialize};
use owl::{prelude::*, query, save, set_global_db};
use rumqttc::{AsyncClient, Event, EventLoop, MqttOptions, Packet, Transport};
use std::io::{Read, Seek};
use std::time::Duration;
use std::{
io::{BufReader, Cursor},
net::SocketAddr,
path::PathBuf,
str::FromStr,
};
use tokio::time::sleep;
#[derive(Deserialize, Serialize)]
@ -57,7 +46,11 @@ pub fn mqtt_connect(machine_id: &str, mqtt: &str) -> (rumqttc::AsyncClient, rumq
AsyncClient::new(mqttoptions, 10)
}
pub async fn run_event_loop(mut eventloop: EventLoop) {
pub async fn run_event_loop<F, Fut>(mut eventloop: EventLoop, handle_payload: F)
where
F: Fn(String, Vec<u8>) -> Fut + Send + Sync + 'static,
Fut: std::future::Future<Output = ()> + Send + 'static,
{
log::info!("Handling MQTT events");
loop {
match eventloop.poll().await {
@ -65,9 +58,9 @@ pub async fn run_event_loop(mut eventloop: EventLoop) {
log::trace!("Incoming = {:?}", incoming);
match incoming {
Packet::Publish(publish) => {
log::info!("Got payload with size {}", publish.size());
let s = publish.payload;
// TODO : client decryption here
println!("got payload {}", String::from_utf8(s.to_vec()).unwrap());
tokio::spawn(handle_payload(publish.topic, s.to_vec()));
}
_ => {}
}
@ -83,103 +76,19 @@ pub async fn run_event_loop(mut eventloop: EventLoop) {
}
}
pub struct Identity {
pub age: age::x25519::Identity,
pub sign: minisign::KeyPair,
#[derive(Deserialize, Serialize)]
pub struct Result {
pub ok: u32,
}
fn save_parts(part1: &[u8], part2: &[u8]) -> Vec<u8> {
let mut vec = Vec::new();
let offset = part1.len() as u32; // assume it fits into u32
vec.extend_from_slice(&offset.to_le_bytes()); // or to_be_bytes() for big-endian
vec.extend_from_slice(part1);
vec.extend_from_slice(part2);
vec
}
fn load_parts(data: &[u8]) -> (Vec<u8>, Vec<u8>) {
let offset_bytes: [u8; 4] = data[..4].try_into().unwrap();
let offset = u32::from_le_bytes(offset_bytes) as usize;
let part1 = data[4..4 + offset].to_vec();
let part2 = data[4 + offset..].to_vec();
(part1, part2)
}
impl Identity {
pub fn new() -> Self {
let age = age::x25519::Identity::generate();
let sign = minisign::KeyPair::generate_encrypted_keypair(Some(String::new())).unwrap();
Self { age, sign }
impl Result {
#[allow(non_snake_case)]
pub fn Ok() -> Self {
Self { ok: 1 }
}
pub fn public(&self) -> (String, String) {
(self.pub_key_age(), self.pub_key_sign())
}
pub fn pub_key_age(&self) -> String {
self.age.to_public().to_string()
}
pub fn pub_key_sign(&self) -> String {
self.sign.pk.to_box().unwrap().to_string()
}
pub fn save(&self, dir: &PathBuf) {
let age_key = self.age.to_string();
std::fs::write(dir.join("age.key"), age_key.expose_secret().to_string()).unwrap();
let kbx = self.sign.sk.to_box(None).unwrap();
std::fs::write(dir.join("sign.key"), kbx.into_string()).unwrap();
log::info!("Saved identity to {dir:?}");
}
pub fn try_load(dir: &PathBuf) -> Option<Self> {
let age =
age::x25519::Identity::from_str(&std::fs::read_to_string(dir.join("age.key")).ok()?)
.unwrap();
let kbx = minisign::SecretKeyBox::from_string(
&std::fs::read_to_string(dir.join("sign.key")).ok()?,
)
.unwrap();
let sign = kbx.into_secret_key(Some(String::new())).unwrap();
let pk = minisign::PublicKey::from_secret_key(&sign).ok()?;
let kp = minisign::KeyPair { pk: pk, sk: sign };
log::info!("Loaded identity from {dir:?}");
Some(Self { age, sign: kp })
}
pub fn encrypt(&self, data: &[u8], recipient: &impl Recipient) -> Vec<u8> {
let signed =
minisign::sign(None, &self.sign.sk, data, Some("mynameisanubis"), None).unwrap();
let signed = save_parts(&signed.to_bytes(), data);
let enc = age::encrypt(recipient, &signed).unwrap();
enc
}
pub fn decrypt(&self, data: &[u8], pk: &PublicKey) -> Option<Vec<u8>> {
let dec = age::decrypt(&self.age, data).unwrap();
let (sig, data) = load_parts(&dec);
let sign_box =
minisign::SignatureBox::from_string(&String::from_utf8(sig).unwrap()).unwrap();
if minisign::verify(
pk,
&sign_box,
BufReader::new(Cursor::new(data.clone())),
true,
false,
false,
)
.is_ok()
{
return Some(data);
}
None
#[allow(non_snake_case)]
pub fn Err() -> Self {
Self { ok: 0 }
}
}

78
src/herd.rs Normal file
View file

@ -0,0 +1,78 @@
use axum::{
Router,
routing::{get, post},
};
use axum_client_ip::ClientIpSource;
use based::auth::User;
use owl::{prelude::*, set_global_db};
use rand::RngCore;
use std::{net::SocketAddr, path::PathBuf};
mod api;
mod herd_core;
use crate::herd_core::mqtt::{handle_mqtt, listen_to_devices};
use herd_core::model::Machine;
use herd_core::{
config::Config,
route::{join_device, login_user},
};
use sage::Identity;
use tokio::sync::OnceCell;
pub static IDENTITY: OnceCell<Identity> = OnceCell::const_new();
pub static CONFIG: OnceCell<Config> = OnceCell::const_new();
fn generate_token() -> String {
let mut rng = rand::rng();
let mut token = vec![0u8; 32];
rng.fill_bytes(&mut token);
hex::encode(token)
}
#[tokio::main]
async fn main() {
tracing_subscriber::fmt::init();
let i = if let Some(i) = Identity::try_load(&PathBuf::from("./herd")) {
i
} else {
let i = Identity::new();
i.save(&PathBuf::from("./herd"));
i
};
let _ = crate::IDENTITY.set(i);
let config = Config::default();
let _ = crate::CONFIG.set(config);
let db = Database::filesystem("./herd/db");
set_global_db!(db);
let _ = User::create("admin".to_string(), "admin", based::auth::UserRole::Admin).await;
let device = Router::new()
.route("/join", post(join_device))
.layer(ClientIpSource::ConnectInfo.into_extension()); // Direct IP
// .layer(ClientIpSource::XRealIp.into_extension()) // Proxy
let user = Router::new().route("/login", post(login_user));
let app = Router::new().merge(device).merge(user);
log::info!("Starting server");
let (client, eventloop) = api::mqtt_connect("server", &crate::CONFIG.get().unwrap().mqtt);
listen_to_devices(&client).await;
tokio::spawn(async {
let listener = tokio::net::TcpListener::bind("0.0.0.0:8000").await.unwrap();
axum::serve(
listener,
app.into_make_service_with_connect_info::<SocketAddr>(),
)
.await
.unwrap();
});
api::run_event_loop(eventloop, handle_mqtt).await;
}

View file

@ -8,6 +8,6 @@ pub struct Config {
impl Default for Config {
fn default() -> Self {
toml::from_str(&std::fs::read_to_string("./homeserver/config.toml").unwrap()).unwrap()
toml::from_str(&std::fs::read_to_string("./herd/config.toml").unwrap()).unwrap()
}
}

View file

@ -1,3 +1,4 @@
pub mod config;
pub mod model;
pub mod mqtt;
pub mod route;

64
src/herd_core/mqtt.rs Normal file
View file

@ -0,0 +1,64 @@
use crate::Machine;
use owl::prelude::*;
use owl::{Serialize, get, query};
use rumqttc::AsyncClient;
use sage::PersonaIdentity;
pub async fn handle_mqtt(topic: String, data: Vec<u8>) {
log::info!("Received client request from {topic}");
let (client, cat) = topic.split_once('/').unwrap();
let mac: Model<Machine> = get!(client).unwrap();
let dec = crate::IDENTITY
.get()
.unwrap()
.decrypt(&data, &mac.read().identity.sign_key().unwrap())
.unwrap();
// TODO : check for recency
println!("got raw: {}", String::from_utf8(dec.payload).unwrap());
match cat {
"online" => {
log::info!("Device {client} reported ONLINE");
}
_ => {}
}
}
pub async fn send_msg<T: Serialize>(client: &AsyncClient, machine: &Model<Machine>, request: T) {
let data = serde_json::to_string(&request).unwrap();
let pk = &machine.read().identity;
let rec = pk.enc_key().unwrap();
let machine_id = machine.read().id.to_string().replace("-", "");
let payload = crate::IDENTITY
.get()
.unwrap()
.encrypt(data.as_bytes(), &rec);
let topic = format!("{machine_id}/cmd");
client
.publish(topic, rumqttc::QoS::AtMostOnce, true, payload)
.await
.unwrap();
}
pub async fn listen_to_device(client: &AsyncClient, machine_id: &str) {
// Online Presence
client
.subscribe(format!("{machine_id}/online"), rumqttc::QoS::AtMostOnce)
.await
.unwrap();
}
pub async fn listen_to_devices(client: &AsyncClient) {
let machines: Vec<Model<Machine>> = query!(|_| true);
for machine in machines {
let machine_id = machine.read().id.to_string();
let machine_id = machine_id.trim().replace("-", "");
log::info!("Sub to {machine_id}");
listen_to_device(client, &machine_id).await;
}
}

View file

@ -1,6 +1,6 @@
use crate::api;
use crate::api::JoinResponse;
use crate::server_core::model::Machine;
use crate::herd_core::model::Machine;
use axum::Json;
use axum::http::StatusCode;
use axum_client_ip::ClientIp;

View file

@ -1,122 +0,0 @@
use age::{Recipient, secrecy::ExposeSecret};
use axum::{
Json, Router,
http::StatusCode,
routing::{get, post},
};
use axum_client_ip::{ClientIp, ClientIpSource};
use based::auth::{Sessions, User};
use minisign::PublicKey;
use owl::{prelude::*, query, save, set_global_db};
use rand::RngCore;
use rumqttc::{AsyncClient, Event};
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::{
io::{BufReader, Cursor},
net::SocketAddr,
path::PathBuf,
str::FromStr,
time::Duration,
};
mod api;
use std::io::{Read, Seek};
mod server_core;
use crate::api::Identity;
use server_core::model::Machine;
use server_core::{
config::Config,
route::{join_device, login_user},
};
use tokio::sync::OnceCell;
pub static IDENTITY: OnceCell<Identity> = OnceCell::const_new();
pub static CONFIG: OnceCell<Config> = OnceCell::const_new();
fn generate_token() -> String {
let mut rng = rand::rng();
let mut token = vec![0u8; 32];
rng.fill_bytes(&mut token);
hex::encode(token)
}
#[tokio::main]
async fn main() {
tracing_subscriber::fmt::init();
let i = if let Some(i) = Identity::try_load(&PathBuf::from("./homeserver")) {
i
} else {
let i = Identity::new();
i.save(&PathBuf::from("./homeserver"));
i
};
let _ = crate::IDENTITY.set(i);
let config = Config::default();
let _ = crate::CONFIG.set(config);
let db = Database::filesystem("./homeserver/db");
set_global_db!(db);
let _ = User::create("admin".to_string(), "admin", based::auth::UserRole::Admin).await;
let device = Router::new()
.route("/join", post(join_device))
.layer(ClientIpSource::ConnectInfo.into_extension()); // Direct IP
// .layer(ClientIpSource::XRealIp.into_extension()) // Proxy
let user = Router::new().route("/login", post(login_user));
let app = Router::new().merge(device).merge(user);
log::info!("Starting server");
let (client, mut eventloop) = api::mqtt_connect("server", &crate::CONFIG.get().unwrap().mqtt);
tokio::spawn(async move {
loop {
let machines: Vec<Model<Machine>> = query!(|_| true);
for machine in machines {
let machine_id = machine.read().id.to_string();
let machine_id = machine_id.trim();
log::info!("Pushing topic to {machine_id}");
client
.publish(
format!("{machine_id}/cmd"),
rumqttc::QoS::AtMostOnce,
false,
"Hello World".as_bytes(),
)
.await
.unwrap();
}
tokio::time::sleep(Duration::from_secs(5)).await;
}
});
tokio::spawn(async {
let listener = tokio::net::TcpListener::bind("0.0.0.0:8000").await.unwrap();
axum::serve(
listener,
app.into_make_service_with_connect_info::<SocketAddr>(),
)
.await
.unwrap();
});
api::run_event_loop(eventloop).await;
}
pub fn send_rpc(client: AsyncClient, Machine: &Model<Machine>) {
// TODO : pub encryption here
client
.publish(
format!("{machine_id}/cmd"),
rumqttc::QoS::AtMostOnce,
true,
"Hello World".as_bytes(),
)
.await
.unwrap();
}

View file

@ -1,19 +1,16 @@
use api::Identity;
use sage::Identity;
use sheepd_core::{
args::{SheepdArgs, SheepdCommand},
config::AgentConfig,
};
mod api;
mod sheepd_core;
use rumqttc::{AsyncClient, Event, MqttOptions, QoS, Transport};
use std::{error::Error, path::PathBuf, time::Duration};
use tokio::{
sync::OnceCell,
task,
time::{self, sleep},
};
use rumqttc::AsyncClient;
use tokio::sync::OnceCell;
pub static MQTT: OnceCell<AsyncClient> = OnceCell::const_new();
pub static IDENTITY: OnceCell<Identity> = OnceCell::const_new();
pub static AGENT: OnceCell<AgentConfig> = OnceCell::const_new();
#[tokio::main]
async fn main() {
@ -25,42 +22,6 @@ async fn main() {
SheepdCommand::Join(join_command) => sheepd_core::cmd::join(join_command),
}
} else {
log::info!("Starting sheepd");
let conf = AgentConfig::try_load();
if conf.is_none() {
log::error!("No config file at /etc/sheepd/config.toml");
std::process::exit(1);
}
let i = if let Some(i) = Identity::try_load(&PathBuf::from("/etc/sheepd")) {
i
} else {
let i = Identity::new();
i.save(&PathBuf::from("/etc/sheepd"));
i
};
let conf = conf.unwrap();
let machine_id = std::fs::read_to_string("/etc/machine-id").unwrap();
let machine_id = machine_id.trim();
log::info!("Connecting to MQTT as {machine_id}");
let (client, mut eventloop) = api::mqtt_connect(machine_id, &conf.mqtt);
crate::MQTT.set(client).unwrap();
log::info!("Connection done");
log::info!("Start sub for {}", format!("{machine_id}/cmd"));
crate::MQTT
.get()
.unwrap()
.subscribe(format!("{machine_id}/cmd"), QoS::AtMostOnce)
.await
.unwrap();
api::run_event_loop(eventloop).await;
sheepd_core::daemon::start_daemon().await;
}
}

View file

@ -1,7 +1,9 @@
use std::path::PathBuf;
use sage::Identity;
use crate::{
api::{self, Identity, JoinResponse},
api::{self, JoinResponse},
sheepd_core::config::AgentConfig,
};

View file

@ -2,7 +2,7 @@ use owl::{Deserialize, Serialize};
use crate::api::JoinResponse;
#[derive(Serialize, Deserialize)]
#[derive(Serialize, Deserialize, Debug)]
pub struct AgentConfig {
pub home: String,
pub token: String,

60
src/sheepd_core/daemon.rs Normal file
View file

@ -0,0 +1,60 @@
use std::{path::PathBuf, time::Duration};
use rumqttc::{AsyncClient, QoS};
use sage::Identity;
use serde_json::json;
use crate::{api, sheepd_core::config::AgentConfig};
use super::mqtt::send_back;
pub async fn report_online(client: AsyncClient) {
loop {
send_back(&client, "online", json!(crate::api::Result::Ok())).await;
tokio::time::sleep(Duration::from_secs(60)).await;
}
}
pub async fn start_daemon() {
log::info!("Starting sheepd");
let conf = AgentConfig::try_load();
if conf.is_none() {
log::error!("No config file at /etc/sheepd/config.toml");
std::process::exit(1);
}
let i = if let Some(i) = Identity::try_load(&PathBuf::from("/etc/sheepd")) {
i
} else {
let i = Identity::new();
i.save(&PathBuf::from("/etc/sheepd"));
i
};
let _ = crate::IDENTITY.set(i);
let conf = conf.unwrap();
crate::AGENT.set(conf).unwrap();
let machine_id = std::fs::read_to_string("/etc/machine-id").unwrap();
let machine_id = machine_id.trim();
log::info!("Connecting to MQTT as {machine_id}");
let (client, eventloop) = api::mqtt_connect(machine_id, &crate::AGENT.get().unwrap().mqtt);
crate::MQTT.set(client.clone()).unwrap();
log::info!("Connection done");
tokio::task::spawn(report_online(client.clone()));
log::info!("Listen on {}", format!("{machine_id}/cmd"));
crate::MQTT
.get()
.unwrap()
.subscribe(format!("{machine_id}/cmd"), QoS::AtMostOnce)
.await
.unwrap();
api::run_event_loop(eventloop, crate::sheepd_core::mqtt::handle_mqtt).await;
}

View file

@ -1,3 +1,5 @@
pub mod args;
pub mod cmd;
pub mod config;
pub mod daemon;
pub mod mqtt;

42
src/sheepd_core/mqtt.rs Normal file
View file

@ -0,0 +1,42 @@
use owl::Serialize;
use rumqttc::AsyncClient;
use sage::PersonaIdentity;
// Client MQTT
pub async fn handle_mqtt(topic: String, data: Vec<u8>) {
//println!("got real raw: {}", String::from_utf8_lossy(&data));
let pk = (
String::new(),
crate::AGENT.get().unwrap().server_sign.clone(),
);
let pk = pk.sign_key().unwrap();
let payload = crate::IDENTITY.get().unwrap().decrypt(&data, &pk).unwrap();
println!(
"got payload {}",
String::from_utf8(payload.payload).unwrap()
);
}
pub async fn send_back<T: Serialize>(client: &AsyncClient, topic: &str, request: T) {
let data = serde_json::to_string(&request).unwrap();
let pk = crate::AGENT.get().unwrap();
let pk = (pk.server_age.clone(), String::new());
let rec = pk.enc_key().unwrap();
let machine_id = std::fs::read_to_string("/etc/machine-id")
.unwrap()
.trim()
.to_string();
let payload = crate::IDENTITY
.get()
.unwrap()
.encrypt(data.as_bytes(), &rec);
let topic = format!("{machine_id}/{topic}");
log::info!("Publish to {machine_id}{topic}");
client
.publish(topic, rumqttc::QoS::AtMostOnce, true, payload)
.await
.unwrap();
}