diff --git a/.gitignore b/.gitignore index 37997be..0fe1ef9 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,6 @@ /target /mosquitto/data /mosquitto/log -/homeserver/age.key -/homeserver/sign.key -/homeserver/db \ No newline at end of file +/herd/age.key +/herd/sign.key +/herd/db \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index 3132729..dba945c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -444,7 +444,7 @@ checksum = "89e25b6adfb930f02d1981565a6e5d9c547ac15a96606256d3b59040e5cd4ca3" [[package]] name = "based" version = "0.1.0" -source = "git+https://git.hydrar.de/jmarya/based?branch=owl#ca6db5f1aae2e1dca0b39f9102552e6ad0b4970f" +source = "git+https://git.hydrar.de/jmarya/based?branch=owl#2f2d77ec4897bfe0b5aeaf908c8ec2231fbd5c15" dependencies = [ "async-stream", "bcrypt", @@ -568,9 +568,9 @@ checksum = "1628fb46dfa0b37568d12e5edd512553eccf6a22a78e8bde00bb4aed84d5bdbf" [[package]] name = "bytemuck" -version = "1.22.0" +version = "1.23.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b6b1fc10dbac614ebc03540c9dbd60e83887fda27794998c6528f1782047d540" +checksum = "9134a6ef01ce4b366b50689c94f82c14bc72bc5d0386829828a2e2752ef7958c" [[package]] name = "byteorder" @@ -634,9 +634,9 @@ dependencies = [ [[package]] name = "chrono" -version = "0.4.40" +version = "0.4.41" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1a7964611d71df112cb1730f2ee67324fcf4d0fc6606acbbe9bfe06df124637c" +checksum = "c469d952047f47f91b68d1cba3f10d63c11d73e4636f24f08daf0278abf01c4d" dependencies = [ "android-tzdata", "iana-time-zone", @@ -1107,7 +1107,7 @@ dependencies = [ "atomic 0.6.0", "pear", "serde", - "toml 0.8.21", + "toml 0.8.22", "uncased", "version_check", ] @@ -3357,6 +3357,17 @@ version = "1.0.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "28d3b2b1366ec20994f1fd18c3c594f05c5dd4bc44d8bb0c1c632c8d6829481f" +[[package]] +name = "sage" +version = "0.1.0" +source = "git+https://git.hydrar.de/jmarya/sage#79a13d39411fdcc2fd9b8a3dcf42f017c277cda5" +dependencies = [ + "age", + "chrono", + "log", + "minisign", +] + [[package]] name = "salsa20" version = "0.10.2" @@ -3567,22 +3578,22 @@ dependencies = [ name = "sheepd" version = "0.1.0" dependencies = [ - "age", "argh", "axum", "axum-client-ip", "based", + "dashmap", "hex", "http2", "log", - "minisign", "owl", "rand 0.9.1", "rumqttc", + "sage", "serde", "serde_json", "tokio", - "toml 0.8.21", + "toml 0.8.22", "tracing", "tracing-subscriber", "ureq", @@ -4179,9 +4190,9 @@ dependencies = [ [[package]] name = "toml" -version = "0.8.21" +version = "0.8.22" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "900f6c86a685850b1bc9f6223b20125115ee3f31e01207d81655bbcc0aea9231" +checksum = "05ae329d1f08c4d17a59bed7ff5b5a769d062e64a62d34a3261b219e62cd5aae" dependencies = [ "serde", "serde_spanned", @@ -4200,9 +4211,9 @@ dependencies = [ [[package]] name = "toml_edit" -version = "0.22.25" +version = "0.22.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "10558ed0bd2a1562e630926a2d1f0b98c827da99fabd3fe20920a59642504485" +checksum = "310068873db2c5b3e7659d2cc35d21855dbafa50d1ce336397c666e3cb08137e" dependencies = [ "indexmap", "serde", @@ -4214,9 +4225,9 @@ dependencies = [ [[package]] name = "toml_write" -version = "0.1.0" +version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "28391a4201ba7eb1984cfeb6862c0b3ea2cfe23332298967c749dddc0d6cd976" +checksum = "bfb942dfe1d8e29a7ee7fcbde5bd2b9a25fb89aa70caea2eba3bee836ff41076" [[package]] name = "tower" diff --git a/Cargo.toml b/Cargo.toml index b6ac753..bcc8e0b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,12 +8,12 @@ name = "sheepd" path = "src/sheepd.rs" [[bin]] -name = "homeserver" -path = "src/server.rs" -required-features = ["homeserver"] +name = "herd" +path = "src/herd.rs" +required-features = ["herd"] [features] -homeserver = ["axum", "axum-client-ip"] +herd = ["axum", "axum-client-ip"] axum = ["dep:axum"] [dependencies] @@ -34,5 +34,5 @@ based = { git = "https://git.hydrar.de/jmarya/based", branch = "owl" } http2 = "0.4.21" ureq = { version = "3.0.11", features = ["json"] } rumqttc = { version = "0.24.0", features = ["url", "websocket"] } -age = { version = "0.11.1", features = ["aes", "aes-gcm", "armor", "async", "ssh"] } -minisign = "0.7.9" +sage = { git = "https://git.hydrar.de/jmarya/sage" } +dashmap = "6.1.0" diff --git a/homeserver/config.toml b/herd/config.toml similarity index 100% rename from homeserver/config.toml rename to herd/config.toml diff --git a/justfile b/justfile index ba6b7a6..4141b59 100644 --- a/justfile +++ b/justfile @@ -1,3 +1,9 @@ server: - cargo run --bin homeserver --features homeserver + cargo run --bin herd --features herd + +clean_server: + rm -rv herd/age.key herd/sign.key herd/db + +clean_client: + doas rm -rv /etc/sheepd diff --git a/src/api.rs b/src/api.rs index b05ecf2..65d567e 100644 --- a/src/api.rs +++ b/src/api.rs @@ -1,17 +1,6 @@ -use age::{Recipient, secrecy::ExposeSecret}; -use based::auth::{Sessions, User}; -use minisign::PublicKey; use owl::{Deserialize, Serialize}; -use owl::{prelude::*, query, save, set_global_db}; use rumqttc::{AsyncClient, Event, EventLoop, MqttOptions, Packet, Transport}; -use std::io::{Read, Seek}; use std::time::Duration; -use std::{ - io::{BufReader, Cursor}, - net::SocketAddr, - path::PathBuf, - str::FromStr, -}; use tokio::time::sleep; #[derive(Deserialize, Serialize)] @@ -57,7 +46,11 @@ pub fn mqtt_connect(machine_id: &str, mqtt: &str) -> (rumqttc::AsyncClient, rumq AsyncClient::new(mqttoptions, 10) } -pub async fn run_event_loop(mut eventloop: EventLoop) { +pub async fn run_event_loop(mut eventloop: EventLoop, handle_payload: F) +where + F: Fn(String, Vec) -> Fut + Send + Sync + 'static, + Fut: std::future::Future + Send + 'static, +{ log::info!("Handling MQTT events"); loop { match eventloop.poll().await { @@ -65,9 +58,9 @@ pub async fn run_event_loop(mut eventloop: EventLoop) { log::trace!("Incoming = {:?}", incoming); match incoming { Packet::Publish(publish) => { + log::info!("Got payload with size {}", publish.size()); let s = publish.payload; - // TODO : client decryption here - println!("got payload {}", String::from_utf8(s.to_vec()).unwrap()); + tokio::spawn(handle_payload(publish.topic, s.to_vec())); } _ => {} } @@ -83,103 +76,19 @@ pub async fn run_event_loop(mut eventloop: EventLoop) { } } -pub struct Identity { - pub age: age::x25519::Identity, - pub sign: minisign::KeyPair, +#[derive(Deserialize, Serialize)] +pub struct Result { + pub ok: u32, } -fn save_parts(part1: &[u8], part2: &[u8]) -> Vec { - let mut vec = Vec::new(); - - let offset = part1.len() as u32; // assume it fits into u32 - vec.extend_from_slice(&offset.to_le_bytes()); // or to_be_bytes() for big-endian - - vec.extend_from_slice(part1); - vec.extend_from_slice(part2); - - vec -} - -fn load_parts(data: &[u8]) -> (Vec, Vec) { - let offset_bytes: [u8; 4] = data[..4].try_into().unwrap(); - let offset = u32::from_le_bytes(offset_bytes) as usize; - - let part1 = data[4..4 + offset].to_vec(); - let part2 = data[4 + offset..].to_vec(); - - (part1, part2) -} - -impl Identity { - pub fn new() -> Self { - let age = age::x25519::Identity::generate(); - let sign = minisign::KeyPair::generate_encrypted_keypair(Some(String::new())).unwrap(); - Self { age, sign } +impl Result { + #[allow(non_snake_case)] + pub fn Ok() -> Self { + Self { ok: 1 } } - pub fn public(&self) -> (String, String) { - (self.pub_key_age(), self.pub_key_sign()) - } - - pub fn pub_key_age(&self) -> String { - self.age.to_public().to_string() - } - - pub fn pub_key_sign(&self) -> String { - self.sign.pk.to_box().unwrap().to_string() - } - - pub fn save(&self, dir: &PathBuf) { - let age_key = self.age.to_string(); - std::fs::write(dir.join("age.key"), age_key.expose_secret().to_string()).unwrap(); - - let kbx = self.sign.sk.to_box(None).unwrap(); - - std::fs::write(dir.join("sign.key"), kbx.into_string()).unwrap(); - log::info!("Saved identity to {dir:?}"); - } - - pub fn try_load(dir: &PathBuf) -> Option { - let age = - age::x25519::Identity::from_str(&std::fs::read_to_string(dir.join("age.key")).ok()?) - .unwrap(); - let kbx = minisign::SecretKeyBox::from_string( - &std::fs::read_to_string(dir.join("sign.key")).ok()?, - ) - .unwrap(); - let sign = kbx.into_secret_key(Some(String::new())).unwrap(); - let pk = minisign::PublicKey::from_secret_key(&sign).ok()?; - let kp = minisign::KeyPair { pk: pk, sk: sign }; - log::info!("Loaded identity from {dir:?}"); - Some(Self { age, sign: kp }) - } - - pub fn encrypt(&self, data: &[u8], recipient: &impl Recipient) -> Vec { - let signed = - minisign::sign(None, &self.sign.sk, data, Some("mynameisanubis"), None).unwrap(); - let signed = save_parts(&signed.to_bytes(), data); - let enc = age::encrypt(recipient, &signed).unwrap(); - enc - } - - pub fn decrypt(&self, data: &[u8], pk: &PublicKey) -> Option> { - let dec = age::decrypt(&self.age, data).unwrap(); - let (sig, data) = load_parts(&dec); - let sign_box = - minisign::SignatureBox::from_string(&String::from_utf8(sig).unwrap()).unwrap(); - if minisign::verify( - pk, - &sign_box, - BufReader::new(Cursor::new(data.clone())), - true, - false, - false, - ) - .is_ok() - { - return Some(data); - } - - None + #[allow(non_snake_case)] + pub fn Err() -> Self { + Self { ok: 0 } } } diff --git a/src/herd.rs b/src/herd.rs new file mode 100644 index 0000000..420a16d --- /dev/null +++ b/src/herd.rs @@ -0,0 +1,78 @@ +use axum::{ + Router, + routing::{get, post}, +}; +use axum_client_ip::ClientIpSource; +use based::auth::User; +use owl::{prelude::*, set_global_db}; +use rand::RngCore; +use std::{net::SocketAddr, path::PathBuf}; +mod api; +mod herd_core; +use crate::herd_core::mqtt::{handle_mqtt, listen_to_devices}; +use herd_core::model::Machine; +use herd_core::{ + config::Config, + route::{join_device, login_user}, +}; +use sage::Identity; +use tokio::sync::OnceCell; + +pub static IDENTITY: OnceCell = OnceCell::const_new(); +pub static CONFIG: OnceCell = OnceCell::const_new(); + +fn generate_token() -> String { + let mut rng = rand::rng(); + let mut token = vec![0u8; 32]; + rng.fill_bytes(&mut token); + + hex::encode(token) +} + +#[tokio::main] +async fn main() { + tracing_subscriber::fmt::init(); + let i = if let Some(i) = Identity::try_load(&PathBuf::from("./herd")) { + i + } else { + let i = Identity::new(); + i.save(&PathBuf::from("./herd")); + i + }; + let _ = crate::IDENTITY.set(i); + + let config = Config::default(); + let _ = crate::CONFIG.set(config); + + let db = Database::filesystem("./herd/db"); + set_global_db!(db); + + let _ = User::create("admin".to_string(), "admin", based::auth::UserRole::Admin).await; + + let device = Router::new() + .route("/join", post(join_device)) + .layer(ClientIpSource::ConnectInfo.into_extension()); // Direct IP + // .layer(ClientIpSource::XRealIp.into_extension()) // Proxy + + let user = Router::new().route("/login", post(login_user)); + + let app = Router::new().merge(device).merge(user); + + log::info!("Starting server"); + + let (client, eventloop) = api::mqtt_connect("server", &crate::CONFIG.get().unwrap().mqtt); + + listen_to_devices(&client).await; + + tokio::spawn(async { + let listener = tokio::net::TcpListener::bind("0.0.0.0:8000").await.unwrap(); + axum::serve( + listener, + app.into_make_service_with_connect_info::(), + ) + .await + .unwrap(); + }); + + api::run_event_loop(eventloop, handle_mqtt).await; +} diff --git a/src/server_core/config.rs b/src/herd_core/config.rs similarity index 68% rename from src/server_core/config.rs rename to src/herd_core/config.rs index 757fc11..2eefab1 100644 --- a/src/server_core/config.rs +++ b/src/herd_core/config.rs @@ -8,6 +8,6 @@ pub struct Config { impl Default for Config { fn default() -> Self { - toml::from_str(&std::fs::read_to_string("./homeserver/config.toml").unwrap()).unwrap() + toml::from_str(&std::fs::read_to_string("./herd/config.toml").unwrap()).unwrap() } } diff --git a/src/server_core/mod.rs b/src/herd_core/mod.rs similarity index 76% rename from src/server_core/mod.rs rename to src/herd_core/mod.rs index bd39d83..a4bb294 100644 --- a/src/server_core/mod.rs +++ b/src/herd_core/mod.rs @@ -1,3 +1,4 @@ pub mod config; pub mod model; +pub mod mqtt; pub mod route; diff --git a/src/server_core/model.rs b/src/herd_core/model.rs similarity index 100% rename from src/server_core/model.rs rename to src/herd_core/model.rs diff --git a/src/herd_core/mqtt.rs b/src/herd_core/mqtt.rs new file mode 100644 index 0000000..6decdaa --- /dev/null +++ b/src/herd_core/mqtt.rs @@ -0,0 +1,64 @@ +use crate::Machine; +use owl::prelude::*; +use owl::{Serialize, get, query}; +use rumqttc::AsyncClient; +use sage::PersonaIdentity; + +pub async fn handle_mqtt(topic: String, data: Vec) { + log::info!("Received client request from {topic}"); + let (client, cat) = topic.split_once('/').unwrap(); + let mac: Model = get!(client).unwrap(); + + let dec = crate::IDENTITY + .get() + .unwrap() + .decrypt(&data, &mac.read().identity.sign_key().unwrap()) + .unwrap(); + + // TODO : check for recency + println!("got raw: {}", String::from_utf8(dec.payload).unwrap()); + + match cat { + "online" => { + log::info!("Device {client} reported ONLINE"); + } + _ => {} + } +} + +pub async fn send_msg(client: &AsyncClient, machine: &Model, request: T) { + let data = serde_json::to_string(&request).unwrap(); + let pk = &machine.read().identity; + let rec = pk.enc_key().unwrap(); + let machine_id = machine.read().id.to_string().replace("-", ""); + + let payload = crate::IDENTITY + .get() + .unwrap() + .encrypt(data.as_bytes(), &rec); + let topic = format!("{machine_id}/cmd"); + + client + .publish(topic, rumqttc::QoS::AtMostOnce, true, payload) + .await + .unwrap(); +} + +pub async fn listen_to_device(client: &AsyncClient, machine_id: &str) { + // Online Presence + client + .subscribe(format!("{machine_id}/online"), rumqttc::QoS::AtMostOnce) + .await + .unwrap(); +} + +pub async fn listen_to_devices(client: &AsyncClient) { + let machines: Vec> = query!(|_| true); + for machine in machines { + let machine_id = machine.read().id.to_string(); + let machine_id = machine_id.trim().replace("-", ""); + log::info!("Sub to {machine_id}"); + + listen_to_device(client, &machine_id).await; + } +} diff --git a/src/server_core/route.rs b/src/herd_core/route.rs similarity index 97% rename from src/server_core/route.rs rename to src/herd_core/route.rs index 459e739..3b82048 100644 --- a/src/server_core/route.rs +++ b/src/herd_core/route.rs @@ -1,6 +1,6 @@ use crate::api; use crate::api::JoinResponse; -use crate::server_core::model::Machine; +use crate::herd_core::model::Machine; use axum::Json; use axum::http::StatusCode; use axum_client_ip::ClientIp; diff --git a/src/server.rs b/src/server.rs deleted file mode 100644 index 66629ca..0000000 --- a/src/server.rs +++ /dev/null @@ -1,122 +0,0 @@ -use age::{Recipient, secrecy::ExposeSecret}; -use axum::{ - Json, Router, - http::StatusCode, - routing::{get, post}, -}; -use axum_client_ip::{ClientIp, ClientIpSource}; -use based::auth::{Sessions, User}; -use minisign::PublicKey; -use owl::{prelude::*, query, save, set_global_db}; -use rand::RngCore; -use rumqttc::{AsyncClient, Event}; -use serde::{Deserialize, Serialize}; -use serde_json::json; -use std::{ - io::{BufReader, Cursor}, - net::SocketAddr, - path::PathBuf, - str::FromStr, - time::Duration, -}; -mod api; -use std::io::{Read, Seek}; -mod server_core; -use crate::api::Identity; -use server_core::model::Machine; -use server_core::{ - config::Config, - route::{join_device, login_user}, -}; -use tokio::sync::OnceCell; - -pub static IDENTITY: OnceCell = OnceCell::const_new(); -pub static CONFIG: OnceCell = OnceCell::const_new(); - -fn generate_token() -> String { - let mut rng = rand::rng(); - let mut token = vec![0u8; 32]; - rng.fill_bytes(&mut token); - - hex::encode(token) -} - -#[tokio::main] -async fn main() { - tracing_subscriber::fmt::init(); - let i = if let Some(i) = Identity::try_load(&PathBuf::from("./homeserver")) { - i - } else { - let i = Identity::new(); - i.save(&PathBuf::from("./homeserver")); - i - }; - let _ = crate::IDENTITY.set(i); - - let config = Config::default(); - let _ = crate::CONFIG.set(config); - - let db = Database::filesystem("./homeserver/db"); - set_global_db!(db); - - let _ = User::create("admin".to_string(), "admin", based::auth::UserRole::Admin).await; - - let device = Router::new() - .route("/join", post(join_device)) - .layer(ClientIpSource::ConnectInfo.into_extension()); // Direct IP - // .layer(ClientIpSource::XRealIp.into_extension()) // Proxy - - let user = Router::new().route("/login", post(login_user)); - - let app = Router::new().merge(device).merge(user); - - log::info!("Starting server"); - - let (client, mut eventloop) = api::mqtt_connect("server", &crate::CONFIG.get().unwrap().mqtt); - - tokio::spawn(async move { - loop { - let machines: Vec> = query!(|_| true); - for machine in machines { - let machine_id = machine.read().id.to_string(); - let machine_id = machine_id.trim(); - log::info!("Pushing topic to {machine_id}"); - client - .publish( - format!("{machine_id}/cmd"), - rumqttc::QoS::AtMostOnce, - false, - "Hello World".as_bytes(), - ) - .await - .unwrap(); - } - tokio::time::sleep(Duration::from_secs(5)).await; - } - }); - - tokio::spawn(async { - let listener = tokio::net::TcpListener::bind("0.0.0.0:8000").await.unwrap(); - axum::serve( - listener, - app.into_make_service_with_connect_info::(), - ) - .await - .unwrap(); - }); - - api::run_event_loop(eventloop).await; -} - -pub fn send_rpc(client: AsyncClient, Machine: &Model) { - // TODO : pub encryption here - client - .publish( - format!("{machine_id}/cmd"), - rumqttc::QoS::AtMostOnce, - true, - "Hello World".as_bytes(), - ) - .await - .unwrap(); -} diff --git a/src/sheepd.rs b/src/sheepd.rs index 17b709b..9a47876 100644 --- a/src/sheepd.rs +++ b/src/sheepd.rs @@ -1,19 +1,16 @@ -use api::Identity; +use sage::Identity; use sheepd_core::{ args::{SheepdArgs, SheepdCommand}, config::AgentConfig, }; mod api; mod sheepd_core; -use rumqttc::{AsyncClient, Event, MqttOptions, QoS, Transport}; -use std::{error::Error, path::PathBuf, time::Duration}; -use tokio::{ - sync::OnceCell, - task, - time::{self, sleep}, -}; +use rumqttc::AsyncClient; +use tokio::sync::OnceCell; pub static MQTT: OnceCell = OnceCell::const_new(); +pub static IDENTITY: OnceCell = OnceCell::const_new(); +pub static AGENT: OnceCell = OnceCell::const_new(); #[tokio::main] async fn main() { @@ -25,42 +22,6 @@ async fn main() { SheepdCommand::Join(join_command) => sheepd_core::cmd::join(join_command), } } else { - log::info!("Starting sheepd"); - - let conf = AgentConfig::try_load(); - if conf.is_none() { - log::error!("No config file at /etc/sheepd/config.toml"); - std::process::exit(1); - } - - let i = if let Some(i) = Identity::try_load(&PathBuf::from("/etc/sheepd")) { - i - } else { - let i = Identity::new(); - i.save(&PathBuf::from("/etc/sheepd")); - i - }; - - let conf = conf.unwrap(); - let machine_id = std::fs::read_to_string("/etc/machine-id").unwrap(); - let machine_id = machine_id.trim(); - - log::info!("Connecting to MQTT as {machine_id}"); - - let (client, mut eventloop) = api::mqtt_connect(machine_id, &conf.mqtt); - - crate::MQTT.set(client).unwrap(); - - log::info!("Connection done"); - - log::info!("Start sub for {}", format!("{machine_id}/cmd")); - crate::MQTT - .get() - .unwrap() - .subscribe(format!("{machine_id}/cmd"), QoS::AtMostOnce) - .await - .unwrap(); - - api::run_event_loop(eventloop).await; + sheepd_core::daemon::start_daemon().await; } } diff --git a/src/sheepd_core/cmd.rs b/src/sheepd_core/cmd.rs index 49aac4b..b57af5e 100644 --- a/src/sheepd_core/cmd.rs +++ b/src/sheepd_core/cmd.rs @@ -1,7 +1,9 @@ use std::path::PathBuf; +use sage::Identity; + use crate::{ - api::{self, Identity, JoinResponse}, + api::{self, JoinResponse}, sheepd_core::config::AgentConfig, }; diff --git a/src/sheepd_core/config.rs b/src/sheepd_core/config.rs index 277a9c5..4c88897 100644 --- a/src/sheepd_core/config.rs +++ b/src/sheepd_core/config.rs @@ -2,7 +2,7 @@ use owl::{Deserialize, Serialize}; use crate::api::JoinResponse; -#[derive(Serialize, Deserialize)] +#[derive(Serialize, Deserialize, Debug)] pub struct AgentConfig { pub home: String, pub token: String, diff --git a/src/sheepd_core/daemon.rs b/src/sheepd_core/daemon.rs new file mode 100644 index 0000000..b5250c3 --- /dev/null +++ b/src/sheepd_core/daemon.rs @@ -0,0 +1,60 @@ +use std::{path::PathBuf, time::Duration}; + +use rumqttc::{AsyncClient, QoS}; +use sage::Identity; +use serde_json::json; + +use crate::{api, sheepd_core::config::AgentConfig}; + +use super::mqtt::send_back; + +pub async fn report_online(client: AsyncClient) { + loop { + send_back(&client, "online", json!(crate::api::Result::Ok())).await; + tokio::time::sleep(Duration::from_secs(60)).await; + } +} + +pub async fn start_daemon() { + log::info!("Starting sheepd"); + + let conf = AgentConfig::try_load(); + if conf.is_none() { + log::error!("No config file at /etc/sheepd/config.toml"); + std::process::exit(1); + } + + let i = if let Some(i) = Identity::try_load(&PathBuf::from("/etc/sheepd")) { + i + } else { + let i = Identity::new(); + i.save(&PathBuf::from("/etc/sheepd")); + i + }; + let _ = crate::IDENTITY.set(i); + + let conf = conf.unwrap(); + crate::AGENT.set(conf).unwrap(); + let machine_id = std::fs::read_to_string("/etc/machine-id").unwrap(); + let machine_id = machine_id.trim(); + + log::info!("Connecting to MQTT as {machine_id}"); + + let (client, eventloop) = api::mqtt_connect(machine_id, &crate::AGENT.get().unwrap().mqtt); + + crate::MQTT.set(client.clone()).unwrap(); + + log::info!("Connection done"); + + tokio::task::spawn(report_online(client.clone())); + + log::info!("Listen on {}", format!("{machine_id}/cmd")); + crate::MQTT + .get() + .unwrap() + .subscribe(format!("{machine_id}/cmd"), QoS::AtMostOnce) + .await + .unwrap(); + + api::run_event_loop(eventloop, crate::sheepd_core::mqtt::handle_mqtt).await; +} diff --git a/src/sheepd_core/mod.rs b/src/sheepd_core/mod.rs index e7ee58d..fa1660d 100644 --- a/src/sheepd_core/mod.rs +++ b/src/sheepd_core/mod.rs @@ -1,3 +1,5 @@ pub mod args; pub mod cmd; pub mod config; +pub mod daemon; +pub mod mqtt; diff --git a/src/sheepd_core/mqtt.rs b/src/sheepd_core/mqtt.rs new file mode 100644 index 0000000..bbc124b --- /dev/null +++ b/src/sheepd_core/mqtt.rs @@ -0,0 +1,42 @@ +use owl::Serialize; +use rumqttc::AsyncClient; +use sage::PersonaIdentity; + +// Client MQTT +pub async fn handle_mqtt(topic: String, data: Vec) { + //println!("got real raw: {}", String::from_utf8_lossy(&data)); + let pk = ( + String::new(), + crate::AGENT.get().unwrap().server_sign.clone(), + ); + let pk = pk.sign_key().unwrap(); + let payload = crate::IDENTITY.get().unwrap().decrypt(&data, &pk).unwrap(); + println!( + "got payload {}", + String::from_utf8(payload.payload).unwrap() + ); +} + +pub async fn send_back(client: &AsyncClient, topic: &str, request: T) { + let data = serde_json::to_string(&request).unwrap(); + + let pk = crate::AGENT.get().unwrap(); + let pk = (pk.server_age.clone(), String::new()); + let rec = pk.enc_key().unwrap(); + let machine_id = std::fs::read_to_string("/etc/machine-id") + .unwrap() + .trim() + .to_string(); + + let payload = crate::IDENTITY + .get() + .unwrap() + .encrypt(data.as_bytes(), &rec); + let topic = format!("{machine_id}/{topic}"); + + log::info!("Publish to {machine_id}{topic}"); + client + .publish(topic, rumqttc::QoS::AtMostOnce, true, payload) + .await + .unwrap(); +}