diff --git a/src/api.rs b/src/api.rs index 7956a32..ab26d36 100644 --- a/src/api.rs +++ b/src/api.rs @@ -17,7 +17,7 @@ pub struct LoginParam { pub password: String, } -#[derive(Deserialize, Serialize)] +#[derive(Deserialize, Serialize, Clone)] /// Join Request pub struct JoinParams { /// Optional join token @@ -150,6 +150,14 @@ impl Result { Err(self.err.unwrap()) } } + + #[allow(non_snake_case)] + pub fn Err(msg: &str) -> Self { + Self { + ok: None, + err: Some(msg.to_string()), + } + } } impl Result { @@ -160,14 +168,6 @@ impl Result { err: None, } } - - #[allow(non_snake_case)] - pub fn Err(msg: &str) -> Self { - Self { - ok: None, - err: Some(msg.to_string()), - } - } } #[derive(Debug, Serialize, Deserialize)] diff --git a/src/herd.rs b/src/herd.rs index 39bb952..f9767cc 100644 --- a/src/herd.rs +++ b/src/herd.rs @@ -4,12 +4,12 @@ use axum::{ routing::{get, post}, }; use axum_client_ip::ClientIpSource; -use based::auth::User; use dashmap::DashMap; use owl::{prelude::*, set_global_db}; use rand::RngCore; use std::{net::SocketAddr, path::PathBuf}; mod api; +use based_auth::User; mod herd_core; use crate::herd_core::mqtt::{handle_mqtt, listen_to_devices}; use herd_core::{ @@ -17,6 +17,7 @@ use herd_core::{ route::{device_get_api, device_shell_cmd, join_device, login_user}, }; use herd_core::{model::Machine, route::devices_list}; +use rumqttc::AsyncClient; use sage::Identity; use tokio::sync::OnceCell; @@ -57,7 +58,7 @@ async fn main() { let db = Database::filesystem("./herd/db"); set_global_db!(db); - let _ = User::create("admin".to_string(), "admin", based::auth::UserRole::Admin).await; + let _ = User::create("admin".to_string(), "admin", based_auth::UserRole::Admin).await; let device = Router::new() .route("/join", post(join_device)) @@ -67,7 +68,7 @@ async fn main() { let user = Router::new() .route("/login", post(login_user)) .route("/device/{device_id}", get(device_get_api)) - .route("/device/{device_id}/shell", post(device_shell_cmd)) + // .route("/device/{device_id}/shell", post(device_shell_cmd)) .route("/devices", get(devices_list)); let app = Router::new().merge(device).merge(user); diff --git a/src/herd_core/mqtt.rs b/src/herd_core/mqtt.rs index a13095b..b3ca2e2 100644 --- a/src/herd_core/mqtt.rs +++ b/src/herd_core/mqtt.rs @@ -44,7 +44,11 @@ pub async fn handle_mqtt(topic: String, data: Vec) { let resp: ServerResponse = serde_json::from_slice(&dec.payload).unwrap(); log::info!("Got response {:?}", resp); - let entry = crate::DISPATCH.get().unwrap().get(&resp.id).unwrap(); + let entry = crate::DISPATCH + .get() + .unwrap() + .get(&resp.id.to_string()) + .unwrap(); entry.send(resp); } _ => {} @@ -85,8 +89,11 @@ pub async fn send_msg( .await .unwrap(); - let (sender, recv) = tokio::sync::mpsc::channel(100); - crate::DISPATCH.get().unwrap().insert(request.id, sender); + let (sender, recv) = crossbeam::channel::bounded(100); + crate::DISPATCH + .get() + .unwrap() + .insert(request.id.to_string(), sender); TaskWaiter { id: request.id, diff --git a/src/herd_core/route.rs b/src/herd_core/route.rs index 01ba4fd..1171464 100644 --- a/src/herd_core/route.rs +++ b/src/herd_core/route.rs @@ -2,21 +2,22 @@ use std::ops::Deref; use crate::api; use crate::api::ClientAction; +use crate::api::ClientActions; use crate::api::JoinResponse; use crate::api::LoginParam; -use crate::api::MachineAPI; use crate::api::ShellResponse; use crate::herd_core::model::Machine; use crate::herd_core::mqtt::listen_to_device; use axum::Json; use axum::extract::FromRequestParts; +use axum::extract::Path; use axum::http::StatusCode; use axum_client_ip::ClientIp; use axum_extra::TypedHeader; use axum_extra::headers::Authorization; use axum_extra::headers::authorization::Bearer; -use based::auth::Sessions; -use based::auth::User; +use based_auth::Sessions; +use based_auth::User; use owl::get; use owl::prelude::Model; use owl::query; @@ -25,13 +26,12 @@ use serde::Deserialize; use serde_json::json; use sheepd::DeviceEntry; use sheepd::DeviceList; -use ureq::http::StatusCode; use super::mqtt::is_within_80_seconds; use super::mqtt::send_msg; pub async fn device_shell_cmd( - Path((device_id)): Path<(String)>, + Path(device_id): Path, Json(payload): Json, session: TypedHeader>, ) -> (StatusCode, Json>) { @@ -41,7 +41,7 @@ pub async fn device_shell_cmd( if let Some(machine) = machine { let resp = send_msg( - crate::MQTT.get(), + crate::MQTT.get().unwrap(), &machine, ClientAction::new(ClientActions::Shell(payload.cmd, payload.cwd)), ) @@ -64,31 +64,32 @@ pub async fn device_shell_cmd( } pub async fn device_get_api( - Path((device_id)): Path<(String)>, + Path(device_id): Path, session: TypedHeader>, ) -> (StatusCode, Json>) { // TODO : check auth - let machine: Option> = get!(device_id); + let machine: Option> = get!(device_id.clone()); if let Some(machine) = machine { let api = machine.read(); let api = DeviceEntry { - id: device_id, - hostname: api.hostname, + id: device_id.clone(), + hostname: api.hostname.clone(), online: device_online(&device_id), }; (StatusCode::OK, Json(api::Result::OkVal(api))) } else { - (StatusCode::NOT_FOUND, Json(api::Result::Err("Not Found"))) + let res = api::Result::::Err("Not Found"); + (StatusCode::NOT_FOUND, Json(res)) } } -pub fn device_online(id: &str) -> bool { +pub fn device_online(id: &String) -> bool { crate::ONLINE .get() .unwrap() - .get(&id) + .get(id) .map(|x| is_within_80_seconds(*x.deref())) .unwrap_or(false) } @@ -129,7 +130,7 @@ pub async fn login_user(Json(payload): Json) -> (StatusCode, Json::Err("invalid"))), ) } } @@ -148,10 +149,10 @@ pub async fn join_device( payload.machine_id ); - let machine = Machine::from_join_param(payload); + let machine = Machine::from_join_param(payload.clone()); let new_token = machine.token.clone(); - listen_to_device(crate::MQTT.get(), &payload.machine_id).await; + listen_to_device(crate::MQTT.get().unwrap(), &payload.machine_id).await; save!(machine);