diff --git a/.gitignore b/.gitignore index 0fe1ef9..37997be 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,6 @@ /target /mosquitto/data /mosquitto/log -/herd/age.key -/herd/sign.key -/herd/db \ No newline at end of file +/homeserver/age.key +/homeserver/sign.key +/homeserver/db \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index dba945c..3132729 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -444,7 +444,7 @@ checksum = "89e25b6adfb930f02d1981565a6e5d9c547ac15a96606256d3b59040e5cd4ca3" [[package]] name = "based" version = "0.1.0" -source = "git+https://git.hydrar.de/jmarya/based?branch=owl#2f2d77ec4897bfe0b5aeaf908c8ec2231fbd5c15" +source = "git+https://git.hydrar.de/jmarya/based?branch=owl#ca6db5f1aae2e1dca0b39f9102552e6ad0b4970f" dependencies = [ "async-stream", "bcrypt", @@ -568,9 +568,9 @@ checksum = "1628fb46dfa0b37568d12e5edd512553eccf6a22a78e8bde00bb4aed84d5bdbf" [[package]] name = "bytemuck" -version = "1.23.0" +version = "1.22.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9134a6ef01ce4b366b50689c94f82c14bc72bc5d0386829828a2e2752ef7958c" +checksum = "b6b1fc10dbac614ebc03540c9dbd60e83887fda27794998c6528f1782047d540" [[package]] name = "byteorder" @@ -634,9 +634,9 @@ dependencies = [ [[package]] name = "chrono" -version = "0.4.41" +version = "0.4.40" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c469d952047f47f91b68d1cba3f10d63c11d73e4636f24f08daf0278abf01c4d" +checksum = "1a7964611d71df112cb1730f2ee67324fcf4d0fc6606acbbe9bfe06df124637c" dependencies = [ "android-tzdata", "iana-time-zone", @@ -1107,7 +1107,7 @@ dependencies = [ "atomic 0.6.0", "pear", "serde", - "toml 0.8.22", + "toml 0.8.21", "uncased", "version_check", ] @@ -3357,17 +3357,6 @@ version = "1.0.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "28d3b2b1366ec20994f1fd18c3c594f05c5dd4bc44d8bb0c1c632c8d6829481f" -[[package]] -name = "sage" -version = "0.1.0" -source = "git+https://git.hydrar.de/jmarya/sage#79a13d39411fdcc2fd9b8a3dcf42f017c277cda5" -dependencies = [ - "age", - "chrono", - "log", - "minisign", -] - [[package]] name = "salsa20" version = "0.10.2" @@ -3578,22 +3567,22 @@ dependencies = [ name = "sheepd" version = "0.1.0" dependencies = [ + "age", "argh", "axum", "axum-client-ip", "based", - "dashmap", "hex", "http2", "log", + "minisign", "owl", "rand 0.9.1", "rumqttc", - "sage", "serde", "serde_json", "tokio", - "toml 0.8.22", + "toml 0.8.21", "tracing", "tracing-subscriber", "ureq", @@ -4190,9 +4179,9 @@ dependencies = [ [[package]] name = "toml" -version = "0.8.22" +version = "0.8.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "05ae329d1f08c4d17a59bed7ff5b5a769d062e64a62d34a3261b219e62cd5aae" +checksum = "900f6c86a685850b1bc9f6223b20125115ee3f31e01207d81655bbcc0aea9231" dependencies = [ "serde", "serde_spanned", @@ -4211,9 +4200,9 @@ dependencies = [ [[package]] name = "toml_edit" -version = "0.22.26" +version = "0.22.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "310068873db2c5b3e7659d2cc35d21855dbafa50d1ce336397c666e3cb08137e" +checksum = "10558ed0bd2a1562e630926a2d1f0b98c827da99fabd3fe20920a59642504485" dependencies = [ "indexmap", "serde", @@ -4225,9 +4214,9 @@ dependencies = [ [[package]] name = "toml_write" -version = "0.1.1" +version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bfb942dfe1d8e29a7ee7fcbde5bd2b9a25fb89aa70caea2eba3bee836ff41076" +checksum = "28391a4201ba7eb1984cfeb6862c0b3ea2cfe23332298967c749dddc0d6cd976" [[package]] name = "tower" diff --git a/Cargo.toml b/Cargo.toml index bcc8e0b..b6ac753 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,12 +8,12 @@ name = "sheepd" path = "src/sheepd.rs" [[bin]] -name = "herd" -path = "src/herd.rs" -required-features = ["herd"] +name = "homeserver" +path = "src/server.rs" +required-features = ["homeserver"] [features] -herd = ["axum", "axum-client-ip"] +homeserver = ["axum", "axum-client-ip"] axum = ["dep:axum"] [dependencies] @@ -34,5 +34,5 @@ based = { git = "https://git.hydrar.de/jmarya/based", branch = "owl" } http2 = "0.4.21" ureq = { version = "3.0.11", features = ["json"] } rumqttc = { version = "0.24.0", features = ["url", "websocket"] } -sage = { git = "https://git.hydrar.de/jmarya/sage" } -dashmap = "6.1.0" +age = { version = "0.11.1", features = ["aes", "aes-gcm", "armor", "async", "ssh"] } +minisign = "0.7.9" diff --git a/herd/config.toml b/homeserver/config.toml similarity index 100% rename from herd/config.toml rename to homeserver/config.toml diff --git a/justfile b/justfile index 4141b59..ba6b7a6 100644 --- a/justfile +++ b/justfile @@ -1,9 +1,3 @@ server: - cargo run --bin herd --features herd - -clean_server: - rm -rv herd/age.key herd/sign.key herd/db - -clean_client: - doas rm -rv /etc/sheepd + cargo run --bin homeserver --features homeserver diff --git a/src/api.rs b/src/api.rs index 32e83f7..b05ecf2 100644 --- a/src/api.rs +++ b/src/api.rs @@ -1,34 +1,34 @@ +use age::{Recipient, secrecy::ExposeSecret}; +use based::auth::{Sessions, User}; +use minisign::PublicKey; use owl::{Deserialize, Serialize}; +use owl::{prelude::*, query, save, set_global_db}; use rumqttc::{AsyncClient, Event, EventLoop, MqttOptions, Packet, Transport}; +use std::io::{Read, Seek}; use std::time::Duration; +use std::{ + io::{BufReader, Cursor}, + net::SocketAddr, + path::PathBuf, + str::FromStr, +}; use tokio::time::sleep; #[derive(Deserialize, Serialize)] -/// Join Request pub struct JoinParams { - /// Optional join token pub join_token: Option, - /// Machine ID pub machine_id: String, - /// Hostname pub hostname: String, - /// Public Key Identity pub identity: (String, String), } #[derive(Deserialize, Serialize)] pub struct JoinResponse { - /// Server Token pub token: String, - /// Server Identity pub identity: (String, String), - /// MQTT endpoint pub mqtt: String, } -/// Setup a MQTT connection for `machine_id` on `mqtt`. -/// -/// This will connect either over `ws://` or `wss://` depending on the scheme of `mqtt`. By default it will use `wss://`. pub fn mqtt_connect(machine_id: &str, mqtt: &str) -> (rumqttc::AsyncClient, rumqttc::EventLoop) { let mqttoptions = if mqtt.starts_with("ws://") { log::warn!("Using unencrypted WebSocket connection"); @@ -57,12 +57,7 @@ pub fn mqtt_connect(machine_id: &str, mqtt: &str) -> (rumqttc::AsyncClient, rumq AsyncClient::new(mqttoptions, 10) } -/// Run the async MQTT event loop -pub async fn run_event_loop(mut eventloop: EventLoop, handle_payload: F) -where - F: Fn(String, Vec) -> Fut + Send + Sync + 'static, - Fut: std::future::Future + Send + 'static, -{ +pub async fn run_event_loop(mut eventloop: EventLoop) { log::info!("Handling MQTT events"); loop { match eventloop.poll().await { @@ -70,9 +65,9 @@ where log::trace!("Incoming = {:?}", incoming); match incoming { Packet::Publish(publish) => { - log::info!("Got payload with size {}", publish.size()); let s = publish.payload; - tokio::spawn(handle_payload(publish.topic, s.to_vec())); + // TODO : client decryption here + println!("got payload {}", String::from_utf8(s.to_vec()).unwrap()); } _ => {} } @@ -88,20 +83,103 @@ where } } -#[derive(Deserialize, Serialize)] -/// Generic JSON API result -pub struct Result { - pub ok: u32, +pub struct Identity { + pub age: age::x25519::Identity, + pub sign: minisign::KeyPair, } -impl Result { - #[allow(non_snake_case)] - pub fn Ok() -> Self { - Self { ok: 1 } +fn save_parts(part1: &[u8], part2: &[u8]) -> Vec { + let mut vec = Vec::new(); + + let offset = part1.len() as u32; // assume it fits into u32 + vec.extend_from_slice(&offset.to_le_bytes()); // or to_be_bytes() for big-endian + + vec.extend_from_slice(part1); + vec.extend_from_slice(part2); + + vec +} + +fn load_parts(data: &[u8]) -> (Vec, Vec) { + let offset_bytes: [u8; 4] = data[..4].try_into().unwrap(); + let offset = u32::from_le_bytes(offset_bytes) as usize; + + let part1 = data[4..4 + offset].to_vec(); + let part2 = data[4 + offset..].to_vec(); + + (part1, part2) +} + +impl Identity { + pub fn new() -> Self { + let age = age::x25519::Identity::generate(); + let sign = minisign::KeyPair::generate_encrypted_keypair(Some(String::new())).unwrap(); + Self { age, sign } } - #[allow(non_snake_case)] - pub fn Err() -> Self { - Self { ok: 0 } + pub fn public(&self) -> (String, String) { + (self.pub_key_age(), self.pub_key_sign()) + } + + pub fn pub_key_age(&self) -> String { + self.age.to_public().to_string() + } + + pub fn pub_key_sign(&self) -> String { + self.sign.pk.to_box().unwrap().to_string() + } + + pub fn save(&self, dir: &PathBuf) { + let age_key = self.age.to_string(); + std::fs::write(dir.join("age.key"), age_key.expose_secret().to_string()).unwrap(); + + let kbx = self.sign.sk.to_box(None).unwrap(); + + std::fs::write(dir.join("sign.key"), kbx.into_string()).unwrap(); + log::info!("Saved identity to {dir:?}"); + } + + pub fn try_load(dir: &PathBuf) -> Option { + let age = + age::x25519::Identity::from_str(&std::fs::read_to_string(dir.join("age.key")).ok()?) + .unwrap(); + let kbx = minisign::SecretKeyBox::from_string( + &std::fs::read_to_string(dir.join("sign.key")).ok()?, + ) + .unwrap(); + let sign = kbx.into_secret_key(Some(String::new())).unwrap(); + let pk = minisign::PublicKey::from_secret_key(&sign).ok()?; + let kp = minisign::KeyPair { pk: pk, sk: sign }; + log::info!("Loaded identity from {dir:?}"); + Some(Self { age, sign: kp }) + } + + pub fn encrypt(&self, data: &[u8], recipient: &impl Recipient) -> Vec { + let signed = + minisign::sign(None, &self.sign.sk, data, Some("mynameisanubis"), None).unwrap(); + let signed = save_parts(&signed.to_bytes(), data); + let enc = age::encrypt(recipient, &signed).unwrap(); + enc + } + + pub fn decrypt(&self, data: &[u8], pk: &PublicKey) -> Option> { + let dec = age::decrypt(&self.age, data).unwrap(); + let (sig, data) = load_parts(&dec); + let sign_box = + minisign::SignatureBox::from_string(&String::from_utf8(sig).unwrap()).unwrap(); + if minisign::verify( + pk, + &sign_box, + BufReader::new(Cursor::new(data.clone())), + true, + false, + false, + ) + .is_ok() + { + return Some(data); + } + + None } } diff --git a/src/herd.rs b/src/herd.rs deleted file mode 100644 index 420a16d..0000000 --- a/src/herd.rs +++ /dev/null @@ -1,78 +0,0 @@ -use axum::{ - Router, - routing::{get, post}, -}; -use axum_client_ip::ClientIpSource; -use based::auth::User; -use owl::{prelude::*, set_global_db}; -use rand::RngCore; -use std::{net::SocketAddr, path::PathBuf}; -mod api; -mod herd_core; -use crate::herd_core::mqtt::{handle_mqtt, listen_to_devices}; -use herd_core::model::Machine; -use herd_core::{ - config::Config, - route::{join_device, login_user}, -}; -use sage::Identity; -use tokio::sync::OnceCell; - -pub static IDENTITY: OnceCell = OnceCell::const_new(); -pub static CONFIG: OnceCell = OnceCell::const_new(); - -fn generate_token() -> String { - let mut rng = rand::rng(); - let mut token = vec![0u8; 32]; - rng.fill_bytes(&mut token); - - hex::encode(token) -} - -#[tokio::main] -async fn main() { - tracing_subscriber::fmt::init(); - let i = if let Some(i) = Identity::try_load(&PathBuf::from("./herd")) { - i - } else { - let i = Identity::new(); - i.save(&PathBuf::from("./herd")); - i - }; - let _ = crate::IDENTITY.set(i); - - let config = Config::default(); - let _ = crate::CONFIG.set(config); - - let db = Database::filesystem("./herd/db"); - set_global_db!(db); - - let _ = User::create("admin".to_string(), "admin", based::auth::UserRole::Admin).await; - - let device = Router::new() - .route("/join", post(join_device)) - .layer(ClientIpSource::ConnectInfo.into_extension()); // Direct IP - // .layer(ClientIpSource::XRealIp.into_extension()) // Proxy - - let user = Router::new().route("/login", post(login_user)); - - let app = Router::new().merge(device).merge(user); - - log::info!("Starting server"); - - let (client, eventloop) = api::mqtt_connect("server", &crate::CONFIG.get().unwrap().mqtt); - - listen_to_devices(&client).await; - - tokio::spawn(async { - let listener = tokio::net::TcpListener::bind("0.0.0.0:8000").await.unwrap(); - axum::serve( - listener, - app.into_make_service_with_connect_info::(), - ) - .await - .unwrap(); - }); - - api::run_event_loop(eventloop, handle_mqtt).await; -} diff --git a/src/herd_core/mqtt.rs b/src/herd_core/mqtt.rs deleted file mode 100644 index 550bac0..0000000 --- a/src/herd_core/mqtt.rs +++ /dev/null @@ -1,68 +0,0 @@ -use crate::Machine; -use owl::prelude::*; -use owl::{Serialize, get, query}; -use rumqttc::AsyncClient; -use sage::PersonaIdentity; - -/// Handle herd MQTT -pub async fn handle_mqtt(topic: String, data: Vec) { - log::info!("Received client request from {topic}"); - let (client, cat) = topic.split_once('/').unwrap(); - let mac: Model = get!(client).unwrap(); - - let dec = crate::IDENTITY - .get() - .unwrap() - .decrypt(&data, &mac.read().identity.sign_key().unwrap()) - .unwrap(); - - // TODO : check for recency - println!("got raw: {}", String::from_utf8(dec.payload).unwrap()); - - match cat { - "online" => { - log::info!("Device {client} reported ONLINE"); - } - _ => {} - } -} - -/// Send a message to a registered `machine` -pub async fn send_msg(client: &AsyncClient, machine: &Model, request: T) { - let data = serde_json::to_string(&request).unwrap(); - let pk = &machine.read().identity; - let rec = pk.enc_key().unwrap(); - let machine_id = machine.read().id.to_string().replace("-", ""); - - let payload = crate::IDENTITY - .get() - .unwrap() - .encrypt(data.as_bytes(), &rec); - let topic = format!("{machine_id}/cmd"); - - client - .publish(topic, rumqttc::QoS::AtMostOnce, true, payload) - .await - .unwrap(); -} - -/// Subscribe to all `device->server` topics -pub async fn listen_to_device(client: &AsyncClient, machine_id: &str) { - // Online Presence - client - .subscribe(format!("{machine_id}/online"), rumqttc::QoS::AtMostOnce) - .await - .unwrap(); -} - -/// Subscibe to incoming messages from all registered machines -pub async fn listen_to_devices(client: &AsyncClient) { - let machines: Vec> = query!(|_| true); - for machine in machines { - let machine_id = machine.read().id.to_string(); - let machine_id = machine_id.trim().replace("-", ""); - log::info!("Sub to {machine_id}"); - - listen_to_device(client, &machine_id).await; - } -} diff --git a/src/server.rs b/src/server.rs new file mode 100644 index 0000000..66629ca --- /dev/null +++ b/src/server.rs @@ -0,0 +1,122 @@ +use age::{Recipient, secrecy::ExposeSecret}; +use axum::{ + Json, Router, + http::StatusCode, + routing::{get, post}, +}; +use axum_client_ip::{ClientIp, ClientIpSource}; +use based::auth::{Sessions, User}; +use minisign::PublicKey; +use owl::{prelude::*, query, save, set_global_db}; +use rand::RngCore; +use rumqttc::{AsyncClient, Event}; +use serde::{Deserialize, Serialize}; +use serde_json::json; +use std::{ + io::{BufReader, Cursor}, + net::SocketAddr, + path::PathBuf, + str::FromStr, + time::Duration, +}; +mod api; +use std::io::{Read, Seek}; +mod server_core; +use crate::api::Identity; +use server_core::model::Machine; +use server_core::{ + config::Config, + route::{join_device, login_user}, +}; +use tokio::sync::OnceCell; + +pub static IDENTITY: OnceCell = OnceCell::const_new(); +pub static CONFIG: OnceCell = OnceCell::const_new(); + +fn generate_token() -> String { + let mut rng = rand::rng(); + let mut token = vec![0u8; 32]; + rng.fill_bytes(&mut token); + + hex::encode(token) +} + +#[tokio::main] +async fn main() { + tracing_subscriber::fmt::init(); + let i = if let Some(i) = Identity::try_load(&PathBuf::from("./homeserver")) { + i + } else { + let i = Identity::new(); + i.save(&PathBuf::from("./homeserver")); + i + }; + let _ = crate::IDENTITY.set(i); + + let config = Config::default(); + let _ = crate::CONFIG.set(config); + + let db = Database::filesystem("./homeserver/db"); + set_global_db!(db); + + let _ = User::create("admin".to_string(), "admin", based::auth::UserRole::Admin).await; + + let device = Router::new() + .route("/join", post(join_device)) + .layer(ClientIpSource::ConnectInfo.into_extension()); // Direct IP + // .layer(ClientIpSource::XRealIp.into_extension()) // Proxy + + let user = Router::new().route("/login", post(login_user)); + + let app = Router::new().merge(device).merge(user); + + log::info!("Starting server"); + + let (client, mut eventloop) = api::mqtt_connect("server", &crate::CONFIG.get().unwrap().mqtt); + + tokio::spawn(async move { + loop { + let machines: Vec> = query!(|_| true); + for machine in machines { + let machine_id = machine.read().id.to_string(); + let machine_id = machine_id.trim(); + log::info!("Pushing topic to {machine_id}"); + client + .publish( + format!("{machine_id}/cmd"), + rumqttc::QoS::AtMostOnce, + false, + "Hello World".as_bytes(), + ) + .await + .unwrap(); + } + tokio::time::sleep(Duration::from_secs(5)).await; + } + }); + + tokio::spawn(async { + let listener = tokio::net::TcpListener::bind("0.0.0.0:8000").await.unwrap(); + axum::serve( + listener, + app.into_make_service_with_connect_info::(), + ) + .await + .unwrap(); + }); + + api::run_event_loop(eventloop).await; +} + +pub fn send_rpc(client: AsyncClient, Machine: &Model) { + // TODO : pub encryption here + client + .publish( + format!("{machine_id}/cmd"), + rumqttc::QoS::AtMostOnce, + true, + "Hello World".as_bytes(), + ) + .await + .unwrap(); +} diff --git a/src/herd_core/config.rs b/src/server_core/config.rs similarity index 63% rename from src/herd_core/config.rs rename to src/server_core/config.rs index 1e1094f..757fc11 100644 --- a/src/herd_core/config.rs +++ b/src/server_core/config.rs @@ -1,6 +1,5 @@ use serde::{Deserialize, Serialize}; -/// Herd Server Configuration #[derive(Serialize, Deserialize)] pub struct Config { /// Public MQTT endpoint @@ -9,6 +8,6 @@ pub struct Config { impl Default for Config { fn default() -> Self { - toml::from_str(&std::fs::read_to_string("./herd/config.toml").unwrap()).unwrap() + toml::from_str(&std::fs::read_to_string("./homeserver/config.toml").unwrap()).unwrap() } } diff --git a/src/herd_core/mod.rs b/src/server_core/mod.rs similarity index 76% rename from src/herd_core/mod.rs rename to src/server_core/mod.rs index a4bb294..bd39d83 100644 --- a/src/herd_core/mod.rs +++ b/src/server_core/mod.rs @@ -1,4 +1,3 @@ pub mod config; pub mod model; -pub mod mqtt; pub mod route; diff --git a/src/herd_core/model.rs b/src/server_core/model.rs similarity index 85% rename from src/herd_core/model.rs rename to src/server_core/model.rs index f4e8cbf..ceaef09 100644 --- a/src/herd_core/model.rs +++ b/src/server_core/model.rs @@ -2,18 +2,13 @@ use crate::api; use crate::generate_token; use owl::prelude::*; -/// Joined Machine #[model] pub struct Machine { - /// Machine ID pub id: Id, - /// Hostname pub hostname: String, - /// Token pub token: String, - pub next_token: Option, - /// Identity (Public Keys) pub identity: (String, String), + pub next_token: Option, } impl Machine { diff --git a/src/herd_core/route.rs b/src/server_core/route.rs similarity index 95% rename from src/herd_core/route.rs rename to src/server_core/route.rs index b9777d1..459e739 100644 --- a/src/herd_core/route.rs +++ b/src/server_core/route.rs @@ -1,6 +1,6 @@ use crate::api; use crate::api::JoinResponse; -use crate::herd_core::model::Machine; +use crate::server_core::model::Machine; use axum::Json; use axum::http::StatusCode; use axum_client_ip::ClientIp; @@ -27,7 +27,6 @@ pub async fn login_user(Json(payload): Json) -> (StatusCode, Json, diff --git a/src/sheepd.rs b/src/sheepd.rs index 9a47876..17b709b 100644 --- a/src/sheepd.rs +++ b/src/sheepd.rs @@ -1,16 +1,19 @@ -use sage::Identity; +use api::Identity; use sheepd_core::{ args::{SheepdArgs, SheepdCommand}, config::AgentConfig, }; mod api; mod sheepd_core; -use rumqttc::AsyncClient; -use tokio::sync::OnceCell; +use rumqttc::{AsyncClient, Event, MqttOptions, QoS, Transport}; +use std::{error::Error, path::PathBuf, time::Duration}; +use tokio::{ + sync::OnceCell, + task, + time::{self, sleep}, +}; pub static MQTT: OnceCell = OnceCell::const_new(); -pub static IDENTITY: OnceCell = OnceCell::const_new(); -pub static AGENT: OnceCell = OnceCell::const_new(); #[tokio::main] async fn main() { @@ -22,6 +25,42 @@ async fn main() { SheepdCommand::Join(join_command) => sheepd_core::cmd::join(join_command), } } else { - sheepd_core::daemon::start_daemon().await; + log::info!("Starting sheepd"); + + let conf = AgentConfig::try_load(); + if conf.is_none() { + log::error!("No config file at /etc/sheepd/config.toml"); + std::process::exit(1); + } + + let i = if let Some(i) = Identity::try_load(&PathBuf::from("/etc/sheepd")) { + i + } else { + let i = Identity::new(); + i.save(&PathBuf::from("/etc/sheepd")); + i + }; + + let conf = conf.unwrap(); + let machine_id = std::fs::read_to_string("/etc/machine-id").unwrap(); + let machine_id = machine_id.trim(); + + log::info!("Connecting to MQTT as {machine_id}"); + + let (client, mut eventloop) = api::mqtt_connect(machine_id, &conf.mqtt); + + crate::MQTT.set(client).unwrap(); + + log::info!("Connection done"); + + log::info!("Start sub for {}", format!("{machine_id}/cmd")); + crate::MQTT + .get() + .unwrap() + .subscribe(format!("{machine_id}/cmd"), QoS::AtMostOnce) + .await + .unwrap(); + + api::run_event_loop(eventloop).await; } } diff --git a/src/sheepd_core/cmd.rs b/src/sheepd_core/cmd.rs index 8b1c7ec..49aac4b 100644 --- a/src/sheepd_core/cmd.rs +++ b/src/sheepd_core/cmd.rs @@ -1,9 +1,7 @@ use std::path::PathBuf; -use sage::Identity; - use crate::{ - api::{self, JoinResponse}, + api::{self, Identity, JoinResponse}, sheepd_core::config::AgentConfig, }; @@ -17,7 +15,6 @@ fn domain(host: &str) -> String { } } -/// Join a herd as client pub fn join(conf: JoinCommand) { // TODO : check for root // TODO : check if joined somewhere already @@ -35,6 +32,7 @@ pub fn join(conf: JoinCommand) { }; let url = format!("{}/join", domain(&conf.home)); + println!("{url}"); let mut res = ureq::post(url) .send_json(&api::JoinParams { join_token: None, diff --git a/src/sheepd_core/config.rs b/src/sheepd_core/config.rs index ad217a9..277a9c5 100644 --- a/src/sheepd_core/config.rs +++ b/src/sheepd_core/config.rs @@ -2,18 +2,12 @@ use owl::{Deserialize, Serialize}; use crate::api::JoinResponse; -/// Client agent configuration at `/etc/sheepd/config.toml` -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize)] pub struct AgentConfig { - /// Homeserver Domain pub home: String, - /// Homeserver Token pub token: String, - /// MQTT endpoint pub mqtt: String, - /// Server Public Encryption Key pub server_age: String, - /// Server Public Sign Key pub server_sign: String, } diff --git a/src/sheepd_core/daemon.rs b/src/sheepd_core/daemon.rs deleted file mode 100644 index 2f84c35..0000000 --- a/src/sheepd_core/daemon.rs +++ /dev/null @@ -1,63 +0,0 @@ -use std::{path::PathBuf, time::Duration}; - -use rumqttc::{AsyncClient, QoS}; -use sage::Identity; -use serde_json::json; - -use crate::{api, sheepd_core::config::AgentConfig}; - -use super::mqtt::send_back; - -/// Report online status back to the server. -/// By default every 60 seconds. -pub async fn report_online(client: AsyncClient) { - loop { - send_back(&client, "online", json!(crate::api::Result::Ok())).await; - tokio::time::sleep(Duration::from_secs(60)).await; - } -} - -/// Start the daemon loop -pub async fn start_daemon() { - log::info!("Starting sheepd"); - - let conf = AgentConfig::try_load(); - if conf.is_none() { - log::error!("No config file at /etc/sheepd/config.toml"); - std::process::exit(1); - } - - let i = if let Some(i) = Identity::try_load(&PathBuf::from("/etc/sheepd")) { - i - } else { - let i = Identity::new(); - i.save(&PathBuf::from("/etc/sheepd")); - i - }; - let _ = crate::IDENTITY.set(i); - - let conf = conf.unwrap(); - crate::AGENT.set(conf).unwrap(); - let machine_id = std::fs::read_to_string("/etc/machine-id").unwrap(); - let machine_id = machine_id.trim(); - - log::info!("Connecting to MQTT as {machine_id}"); - - let (client, eventloop) = api::mqtt_connect(machine_id, &crate::AGENT.get().unwrap().mqtt); - - crate::MQTT.set(client.clone()).unwrap(); - - log::info!("Connection done"); - - tokio::task::spawn(report_online(client.clone())); - - log::info!("Listen on {}", format!("{machine_id}/cmd")); - crate::MQTT - .get() - .unwrap() - .subscribe(format!("{machine_id}/cmd"), QoS::AtMostOnce) - .await - .unwrap(); - - api::run_event_loop(eventloop, crate::sheepd_core::mqtt::handle_mqtt).await; -} diff --git a/src/sheepd_core/mod.rs b/src/sheepd_core/mod.rs index fa1660d..e7ee58d 100644 --- a/src/sheepd_core/mod.rs +++ b/src/sheepd_core/mod.rs @@ -1,5 +1,3 @@ pub mod args; pub mod cmd; pub mod config; -pub mod daemon; -pub mod mqtt; diff --git a/src/sheepd_core/mqtt.rs b/src/sheepd_core/mqtt.rs deleted file mode 100644 index dea12e6..0000000 --- a/src/sheepd_core/mqtt.rs +++ /dev/null @@ -1,43 +0,0 @@ -use owl::Serialize; -use rumqttc::AsyncClient; -use sage::PersonaIdentity; - -// Client MQTT -pub async fn handle_mqtt(topic: String, data: Vec) { - //println!("got real raw: {}", String::from_utf8_lossy(&data)); - let pk = ( - String::new(), - crate::AGENT.get().unwrap().server_sign.clone(), - ); - let pk = pk.sign_key().unwrap(); - let payload = crate::IDENTITY.get().unwrap().decrypt(&data, &pk).unwrap(); - println!( - "got payload {}", - String::from_utf8(payload.payload).unwrap() - ); -} - -/// Send something back to the server on `topic` -pub async fn send_back(client: &AsyncClient, topic: &str, request: T) { - let data = serde_json::to_string(&request).unwrap(); - - let pk = crate::AGENT.get().unwrap(); - let pk = (pk.server_age.clone(), String::new()); - let rec = pk.enc_key().unwrap(); - let machine_id = std::fs::read_to_string("/etc/machine-id") - .unwrap() - .trim() - .to_string(); - - let payload = crate::IDENTITY - .get() - .unwrap() - .encrypt(data.as_bytes(), &rec); - let topic = format!("{machine_id}/{topic}"); - - log::info!("Publish to {machine_id}{topic}"); - client - .publish(topic, rumqttc::QoS::AtMostOnce, true, payload) - .await - .unwrap(); -}