mqtt connections

almost working, need encryption
This commit is contained in:
JMARyA 2025-04-28 23:33:11 +02:00
parent 20ab0153d1
commit 125d50530d
Signed by: jmarya
GPG key ID: 901B2ADDF27C2263
15 changed files with 1259 additions and 42 deletions

5
.gitignore vendored
View file

@ -1 +1,6 @@
/target /target
/mosquitto/data
/mosquitto/log
/homeserver/age.key
/homeserver/sign.key
/homeserver/db

848
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -33,3 +33,6 @@ rand = "0.9.1"
based = { git = "https://git.hydrar.de/jmarya/based", branch = "owl" } based = { git = "https://git.hydrar.de/jmarya/based", branch = "owl" }
http2 = "0.4.21" http2 = "0.4.21"
ureq = { version = "3.0.11", features = ["json"] } ureq = { version = "3.0.11", features = ["json"] }
rumqttc = { version = "0.24.0", features = ["url", "websocket"] }
age = { version = "0.11.1", features = ["aes", "aes-gcm", "armor", "async", "ssh"] }
minisign = "0.7.9"

12
docker-compose.yml Normal file
View file

@ -0,0 +1,12 @@
services:
mosquitto:
image: eclipse-mosquitto:2
container_name: mosquitto
ports:
#- "1883:1883" # MQTT (classic TCP)
- "8083:8083" # MQTT over WebSocket
volumes:
- ./mosquitto/config:/mosquitto/config
- ./mosquitto/data:/mosquitto/data
- ./mosquitto/log:/mosquitto/log
restart: unless-stopped

1
homeserver/config.toml Normal file
View file

@ -0,0 +1 @@
mqtt = "ws://127.0.0.1:8083"

3
justfile Normal file
View file

@ -0,0 +1,3 @@
server:
cargo run --bin homeserver --features homeserver

View file

@ -0,0 +1,17 @@
# mosquitto.conf
# Standard MQTT listener (for regular clients, optional)
listener 1883
protocol mqtt
# WebSocket MQTT listener
listener 8083
protocol websockets
# Allow anonymous connections (testing)
allow_anonymous true
max_packet_size 10485760 # 10 MB
# Logging
log_dest stdout

View file

@ -1,8 +1,185 @@
use age::{Recipient, secrecy::ExposeSecret};
use based::auth::{Sessions, User};
use minisign::PublicKey;
use owl::{Deserialize, Serialize}; use owl::{Deserialize, Serialize};
use owl::{prelude::*, query, save, set_global_db};
use rumqttc::{AsyncClient, Event, EventLoop, MqttOptions, Packet, Transport};
use std::io::{Read, Seek};
use std::time::Duration;
use std::{
io::{BufReader, Cursor},
net::SocketAddr,
path::PathBuf,
str::FromStr,
};
use tokio::time::sleep;
#[derive(Deserialize, Serialize)] #[derive(Deserialize, Serialize)]
pub struct JoinParams { pub struct JoinParams {
pub join_token: Option<String>, pub join_token: Option<String>,
pub machine_id: String, pub machine_id: String,
pub hostname: String, pub hostname: String,
pub identity: (String, String),
}
#[derive(Deserialize, Serialize)]
pub struct JoinResponse {
pub token: String,
pub identity: (String, String),
pub mqtt: String,
}
pub fn mqtt_connect(machine_id: &str, mqtt: &str) -> (rumqttc::AsyncClient, rumqttc::EventLoop) {
let mqttoptions = if mqtt.starts_with("ws://") {
log::warn!("Using unencrypted WebSocket connection");
let mut mqttoptions = MqttOptions::new(
machine_id,
&format!("ws://{}", mqtt.trim_start_matches("ws://")),
8000,
);
mqttoptions.set_transport(Transport::Ws);
mqttoptions.set_keep_alive(Duration::from_secs(60));
mqttoptions
} else {
log::info!("Using encrypted WebSocket connection");
let mut mqttoptions = MqttOptions::new(
machine_id,
&format!("wss://{}", mqtt.trim_start_matches("wss://")),
8000,
);
mqttoptions.set_transport(Transport::wss_with_default_config());
mqttoptions.set_keep_alive(Duration::from_secs(60));
mqttoptions
};
AsyncClient::new(mqttoptions, 10)
}
pub async fn run_event_loop(mut eventloop: EventLoop) {
log::info!("Handling MQTT events");
loop {
match eventloop.poll().await {
Ok(Event::Incoming(incoming)) => {
log::trace!("Incoming = {:?}", incoming);
match incoming {
Packet::Publish(publish) => {
let s = publish.payload;
// TODO : client decryption here
println!("got payload {}", String::from_utf8(s.to_vec()).unwrap());
}
_ => {}
}
}
Ok(Event::Outgoing(outgoing)) => {
log::trace!("Outgoing = {:?}", outgoing);
}
Err(e) => {
log::error!("MQTT eventloop error = {:?}", e);
sleep(Duration::from_secs(1)).await;
}
}
}
}
pub struct Identity {
pub age: age::x25519::Identity,
pub sign: minisign::KeyPair,
}
fn save_parts(part1: &[u8], part2: &[u8]) -> Vec<u8> {
let mut vec = Vec::new();
let offset = part1.len() as u32; // assume it fits into u32
vec.extend_from_slice(&offset.to_le_bytes()); // or to_be_bytes() for big-endian
vec.extend_from_slice(part1);
vec.extend_from_slice(part2);
vec
}
fn load_parts(data: &[u8]) -> (Vec<u8>, Vec<u8>) {
let offset_bytes: [u8; 4] = data[..4].try_into().unwrap();
let offset = u32::from_le_bytes(offset_bytes) as usize;
let part1 = data[4..4 + offset].to_vec();
let part2 = data[4 + offset..].to_vec();
(part1, part2)
}
impl Identity {
pub fn new() -> Self {
let age = age::x25519::Identity::generate();
let sign = minisign::KeyPair::generate_encrypted_keypair(Some(String::new())).unwrap();
Self { age, sign }
}
pub fn public(&self) -> (String, String) {
(self.pub_key_age(), self.pub_key_sign())
}
pub fn pub_key_age(&self) -> String {
self.age.to_public().to_string()
}
pub fn pub_key_sign(&self) -> String {
self.sign.pk.to_box().unwrap().to_string()
}
pub fn save(&self, dir: &PathBuf) {
let age_key = self.age.to_string();
std::fs::write(dir.join("age.key"), age_key.expose_secret().to_string()).unwrap();
let kbx = self.sign.sk.to_box(None).unwrap();
std::fs::write(dir.join("sign.key"), kbx.into_string()).unwrap();
log::info!("Saved identity to {dir:?}");
}
pub fn try_load(dir: &PathBuf) -> Option<Self> {
let age =
age::x25519::Identity::from_str(&std::fs::read_to_string(dir.join("age.key")).ok()?)
.unwrap();
let kbx = minisign::SecretKeyBox::from_string(
&std::fs::read_to_string(dir.join("sign.key")).ok()?,
)
.unwrap();
let sign = kbx.into_secret_key(Some(String::new())).unwrap();
let pk = minisign::PublicKey::from_secret_key(&sign).ok()?;
let kp = minisign::KeyPair { pk: pk, sk: sign };
log::info!("Loaded identity from {dir:?}");
Some(Self { age, sign: kp })
}
pub fn encrypt(&self, data: &[u8], recipient: &impl Recipient) -> Vec<u8> {
let signed =
minisign::sign(None, &self.sign.sk, data, Some("mynameisanubis"), None).unwrap();
let signed = save_parts(&signed.to_bytes(), data);
let enc = age::encrypt(recipient, &signed).unwrap();
enc
}
pub fn decrypt(&self, data: &[u8], pk: &PublicKey) -> Option<Vec<u8>> {
let dec = age::decrypt(&self.age, data).unwrap();
let (sig, data) = load_parts(&dec);
let sign_box =
minisign::SignatureBox::from_string(&String::from_utf8(sig).unwrap()).unwrap();
if minisign::verify(
pk,
&sign_box,
BufReader::new(Cursor::new(data.clone())),
true,
false,
false,
)
.is_ok()
{
return Some(data);
}
None
}
} }

View file

@ -1,3 +1,4 @@
use age::{Recipient, secrecy::ExposeSecret};
use axum::{ use axum::{
Json, Router, Json, Router,
http::StatusCode, http::StatusCode,
@ -5,14 +6,32 @@ use axum::{
}; };
use axum_client_ip::{ClientIp, ClientIpSource}; use axum_client_ip::{ClientIp, ClientIpSource};
use based::auth::{Sessions, User}; use based::auth::{Sessions, User};
use owl::{prelude::*, save, set_global_db}; use minisign::PublicKey;
use owl::{prelude::*, query, save, set_global_db};
use rand::RngCore; use rand::RngCore;
use rumqttc::{AsyncClient, Event};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use serde_json::json; use serde_json::json;
use std::net::SocketAddr; use std::{
io::{BufReader, Cursor},
net::SocketAddr,
path::PathBuf,
str::FromStr,
time::Duration,
};
mod api; mod api;
use std::io::{Read, Seek};
mod server_core; mod server_core;
use server_core::route::{join_device, login_user}; use crate::api::Identity;
use server_core::model::Machine;
use server_core::{
config::Config,
route::{join_device, login_user},
};
use tokio::sync::OnceCell;
pub static IDENTITY: OnceCell<Identity> = OnceCell::const_new();
pub static CONFIG: OnceCell<Config> = OnceCell::const_new();
fn generate_token() -> String { fn generate_token() -> String {
let mut rng = rand::rng(); let mut rng = rand::rng();
@ -25,13 +44,22 @@ fn generate_token() -> String {
#[tokio::main] #[tokio::main]
async fn main() { async fn main() {
tracing_subscriber::fmt::init(); tracing_subscriber::fmt::init();
let i = if let Some(i) = Identity::try_load(&PathBuf::from("./homeserver")) {
i
} else {
let i = Identity::new();
i.save(&PathBuf::from("./homeserver"));
i
};
let _ = crate::IDENTITY.set(i);
let db = Database::in_memory(); let config = Config::default();
let _ = crate::CONFIG.set(config);
let db = Database::filesystem("./homeserver/db");
set_global_db!(db); set_global_db!(db);
User::create("admin".to_string(), "admin", based::auth::UserRole::Admin) let _ = User::create("admin".to_string(), "admin", based::auth::UserRole::Admin).await;
.await
.unwrap();
let device = Router::new() let device = Router::new()
.route("/join", post(join_device)) .route("/join", post(join_device))
@ -44,6 +72,30 @@ async fn main() {
log::info!("Starting server"); log::info!("Starting server");
let (client, mut eventloop) = api::mqtt_connect("server", &crate::CONFIG.get().unwrap().mqtt);
tokio::spawn(async move {
loop {
let machines: Vec<Model<Machine>> = query!(|_| true);
for machine in machines {
let machine_id = machine.read().id.to_string();
let machine_id = machine_id.trim();
log::info!("Pushing topic to {machine_id}");
client
.publish(
format!("{machine_id}/cmd"),
rumqttc::QoS::AtMostOnce,
false,
"Hello World".as_bytes(),
)
.await
.unwrap();
}
tokio::time::sleep(Duration::from_secs(5)).await;
}
});
tokio::spawn(async {
let listener = tokio::net::TcpListener::bind("0.0.0.0:8000").await.unwrap(); let listener = tokio::net::TcpListener::bind("0.0.0.0:8000").await.unwrap();
axum::serve( axum::serve(
listener, listener,
@ -51,4 +103,20 @@ async fn main() {
) )
.await .await
.unwrap(); .unwrap();
});
api::run_event_loop(eventloop).await;
}
pub fn send_rpc(client: AsyncClient, Machine: &Model<Machine>) {
// TODO : pub encryption here
client
.publish(
format!("{machine_id}/cmd"),
rumqttc::QoS::AtMostOnce,
true,
"Hello World".as_bytes(),
)
.await
.unwrap();
} }

View file

@ -1,10 +1,13 @@
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize)] #[derive(Serialize, Deserialize)]
pub struct Config {} pub struct Config {
/// Public MQTT endpoint
pub mqtt: String,
}
impl Default for Config { impl Default for Config {
fn default() -> Self { fn default() -> Self {
toml::from_str(&std::fs::read_to_string("./config.toml").unwrap()).unwrap() toml::from_str(&std::fs::read_to_string("./homeserver/config.toml").unwrap()).unwrap()
} }
} }

View file

@ -7,14 +7,16 @@ pub struct Machine {
pub id: Id, pub id: Id,
pub hostname: String, pub hostname: String,
pub token: String, pub token: String,
pub identity: (String, String),
pub next_token: Option<String>, pub next_token: Option<String>,
} }
impl Machine { impl Machine {
pub fn from_join_param(join: api::JoinParams) -> Self { pub fn from_join_param(join: api::JoinParams) -> Self {
Self { Self {
id: Id::String(join.machine_id), id: Id::String(join.machine_id.trim().to_string()),
hostname: join.hostname.trim().to_string(), hostname: join.hostname.trim().to_string(),
identity: join.identity,
token: generate_token(), token: generate_token(),
next_token: None, next_token: None,
} }

View file

@ -1,4 +1,5 @@
use crate::api; use crate::api;
use crate::api::JoinResponse;
use crate::server_core::model::Machine; use crate::server_core::model::Machine;
use axum::Json; use axum::Json;
use axum::http::StatusCode; use axum::http::StatusCode;
@ -44,5 +45,14 @@ pub async fn join_device(
save!(machine); save!(machine);
(StatusCode::OK, Json(json!({"ok": new_token}))) let i = crate::IDENTITY.get().unwrap();
(
StatusCode::OK,
Json(json!(JoinResponse {
token: new_token,
identity: i.public(),
mqtt: crate::CONFIG.get().unwrap().mqtt.clone()
})),
)
} }

View file

@ -1,8 +1,22 @@
use sheepd_core::args::{SheepdArgs, SheepdCommand}; use api::Identity;
use sheepd_core::{
args::{SheepdArgs, SheepdCommand},
config::AgentConfig,
};
mod api; mod api;
mod sheepd_core; mod sheepd_core;
use rumqttc::{AsyncClient, Event, MqttOptions, QoS, Transport};
use std::{error::Error, path::PathBuf, time::Duration};
use tokio::{
sync::OnceCell,
task,
time::{self, sleep},
};
fn main() { pub static MQTT: OnceCell<AsyncClient> = OnceCell::const_new();
#[tokio::main]
async fn main() {
tracing_subscriber::fmt::init(); tracing_subscriber::fmt::init();
let args: SheepdArgs = argh::from_env(); let args: SheepdArgs = argh::from_env();
@ -13,6 +27,40 @@ fn main() {
} else { } else {
log::info!("Starting sheepd"); log::info!("Starting sheepd");
// TODO : daemon loop let conf = AgentConfig::try_load();
if conf.is_none() {
log::error!("No config file at /etc/sheepd/config.toml");
std::process::exit(1);
}
let i = if let Some(i) = Identity::try_load(&PathBuf::from("/etc/sheepd")) {
i
} else {
let i = Identity::new();
i.save(&PathBuf::from("/etc/sheepd"));
i
};
let conf = conf.unwrap();
let machine_id = std::fs::read_to_string("/etc/machine-id").unwrap();
let machine_id = machine_id.trim();
log::info!("Connecting to MQTT as {machine_id}");
let (client, mut eventloop) = api::mqtt_connect(machine_id, &conf.mqtt);
crate::MQTT.set(client).unwrap();
log::info!("Connection done");
log::info!("Start sub for {}", format!("{machine_id}/cmd"));
crate::MQTT
.get()
.unwrap()
.subscribe(format!("{machine_id}/cmd"), QoS::AtMostOnce)
.await
.unwrap();
api::run_event_loop(eventloop).await;
} }
} }

View file

@ -1,37 +1,53 @@
use crate::{api, sheepd_core::config::AgentConfig}; use std::path::PathBuf;
use crate::{
api::{self, Identity, JoinResponse},
sheepd_core::config::AgentConfig,
};
use super::args::JoinCommand; use super::args::JoinCommand;
fn domain(host: &str) -> String {
if host.starts_with("http") {
return host.to_string();
} else {
format!("https://{host}")
}
}
pub fn join(conf: JoinCommand) { pub fn join(conf: JoinCommand) {
// TODO : check for root // TODO : check for root
// TODO : check if joined somewhere already // TODO : check if joined somewhere already
log::info!("Joining to {}", conf.home); log::info!("Joining to {}", conf.home);
let url = format!("http://{}/join", conf.home); let _ = std::fs::create_dir_all("/etc/sheepd");
let i = if let Some(i) = Identity::try_load(&PathBuf::from("/etc/sheepd")) {
i
} else {
let i = Identity::new();
i.save(&PathBuf::from("/etc/sheepd"));
i
};
let url = format!("{}/join", domain(&conf.home));
println!("{url}"); println!("{url}");
let mut res = ureq::post(url) let mut res = ureq::post(url)
.send_json(&api::JoinParams { .send_json(&api::JoinParams {
join_token: None, join_token: None,
machine_id: std::fs::read_to_string("/etc/machine-id").unwrap(), machine_id: std::fs::read_to_string("/etc/machine-id").unwrap(),
hostname: std::fs::read_to_string("/etc/hostname").unwrap(), hostname: std::fs::read_to_string("/etc/hostname").unwrap(),
identity: i.public(),
}) })
.unwrap(); .unwrap();
let res: serde_json::Value = res.body_mut().read_json().unwrap(); let res: JoinResponse = res.body_mut().read_json().unwrap();
let token = res
.as_object()
.unwrap()
.get("ok")
.unwrap()
.as_str()
.unwrap();
log::info!("Joined {} successfully", conf.home); log::info!("Joined {} successfully", conf.home);
std::fs::write( std::fs::write(
"/etc/sheepd.toml", "/etc/sheepd/config.toml",
toml::to_string(&AgentConfig::new(&conf.home, token)).unwrap(), toml::to_string(&AgentConfig::new(&conf.home, res)).unwrap(),
) )
.unwrap(); .unwrap();
} }

View file

@ -1,16 +1,28 @@
use owl::{Deserialize, Serialize}; use owl::{Deserialize, Serialize};
use crate::api::JoinResponse;
#[derive(Serialize, Deserialize)] #[derive(Serialize, Deserialize)]
pub struct AgentConfig { pub struct AgentConfig {
pub home: String, pub home: String,
pub token: String, pub token: String,
pub mqtt: String,
pub server_age: String,
pub server_sign: String,
} }
impl AgentConfig { impl AgentConfig {
pub fn new(home_server: &str, token: &str) -> Self { pub fn try_load() -> Option<Self> {
toml::from_str(&std::fs::read_to_string("/etc/sheepd/config.toml").ok()?).ok()
}
pub fn new(home: &str, join: JoinResponse) -> Self {
Self { Self {
home: home_server.to_string(), token: join.token,
token: token.to_string(), mqtt: join.mqtt,
home: home.to_string(),
server_age: join.identity.0,
server_sign: join.identity.1,
} }
} }
} }