From 6c5ab4d244076e5747bf7d91a524e7a1fee22eb7 Mon Sep 17 00:00:00 2001 From: Arne Beer Date: Fri, 15 May 2020 17:55:49 +0200 Subject: [PATCH] Clean up state reset and log handling --- client/message.rs | 4 ++-- daemon/instructions.rs | 5 ++--- daemon/main.rs | 4 ++++ daemon/task_handler.rs | 32 +++++++++++++++++--------------- shared/state.rs | 15 ++++++++++----- 5 files changed, 35 insertions(+), 25 deletions(-) diff --git a/client/message.rs b/client/message.rs index a79867b..98206bc 100644 --- a/client/message.rs +++ b/client/message.rs @@ -7,8 +7,8 @@ use ::pueue::settings::Settings; use crate::cli::{Opt, SubCommand}; -// Convert and pre-process the sub-command into a valid message -// that can be understood by the daemon. +/// Convert the cli command into the message that's being sent to the server, +/// so it can be understood by the daemon. pub fn get_message_from_opt(opt: &Opt, settings: &Settings) -> Result { match &opt.cmd { SubCommand::Add { diff --git a/daemon/instructions.rs b/daemon/instructions.rs index f90373e..370a3ab 100644 --- a/daemon/instructions.rs +++ b/daemon/instructions.rs @@ -28,7 +28,7 @@ pub fn handle_message(message: Message, sender: &Sender, state: &Shared Message::Group(message) => group(message, state), Message::Clean => clean(state), - Message::Reset => reset(sender, state), + Message::Reset => reset(sender), Message::Status => get_status(state), Message::Log(message) => get_log(message, state), Message::Parallel(message) => set_parallel_tasks(message, state), @@ -485,9 +485,8 @@ fn clean(state: &SharedState) -> Message { /// Invoked when calling `pueue reset`. /// Forward the reset request to the task handler. /// The handler then kills all children and clears the task queue. -fn reset(sender: &Sender, state: &SharedState) -> Message { +fn reset(sender: &Sender) -> Message { sender.send(Message::Reset).expect(SENDER_ERR); - clean(state); create_success_message("Everything is being reset right now.") } diff --git a/daemon/main.rs b/daemon/main.rs index 64432d8..05497f1 100644 --- a/daemon/main.rs +++ b/daemon/main.rs @@ -68,6 +68,7 @@ async fn main() -> Result<()> { /// Initialize all directories needed for normal operation. fn init_directories(path: &str) { + // Pueue base path let pueue_dir = Path::new(path); if !pueue_dir.exists() { if let Err(error) = create_dir_all(&pueue_dir) { @@ -77,6 +78,8 @@ fn init_directories(path: &str) { ); } } + + // Task log dir let log_dir = pueue_dir.join("log"); if !log_dir.exists() { if let Err(error) = create_dir_all(&log_dir) { @@ -87,6 +90,7 @@ fn init_directories(path: &str) { } } + // Task log dir let logs_dir = pueue_dir.join("task_logs"); if !logs_dir.exists() { if let Err(error) = create_dir_all(&logs_dir) { diff --git a/daemon/task_handler.rs b/daemon/task_handler.rs index 4168941..76d0e82 100644 --- a/daemon/task_handler.rs +++ b/daemon/task_handler.rs @@ -82,6 +82,7 @@ impl TaskHandler { loop { self.receive_commands(); self.handle_finished_tasks(); + self.handle_reset(); self.check_callbacks(); self.check_stashed(); self.check_failed_dependencies(); @@ -180,6 +181,20 @@ impl TaskHandler { .map(|(id, _)| *id) } + /// Users can issue to reset the daemon. + /// If that's the case, the `self.reset` flag is set to true, all children are killed + /// and no new tasks will be spawned. + /// This function checks, if all killed children have been handled. + /// If that's the case, completely reset the state + fn handle_reset(&mut self) { + // The daemon got a reset request and all children already finished + if self.reset && self.children.is_empty() { + let mut state = self.state.lock().unwrap(); + state.reset(); + self.reset = false; + } + } + /// See if we can start a new queued task. fn check_new(&mut self) -> Result<()> { // Get the next task id that can be started @@ -336,13 +351,6 @@ impl TaskHandler { fn handle_finished_tasks(&mut self) { let (finished, errored) = self.get_finished(); - // The daemon got a reset request and all children already finished - if self.reset && self.children.is_empty() { - let mut state = self.state.lock().unwrap(); - state.reset(); - self.reset = false; - } - // Nothing to do. Early return if finished.is_empty() && errored.is_empty() { return; @@ -391,7 +399,7 @@ impl TaskHandler { // Pause the daemon, if the settings say so and some process failed if failed_task_exists && self.pause_on_failure { - self.change_running(false); + state.running = false; } state.save() @@ -421,6 +429,7 @@ impl TaskHandler { } /// Some client instructions require immediate action by the task handler + /// This function is also responsible for waiting fn receive_commands(&mut self) { // Sleep for a few milliseconds. We don't want to hurt the CPU. let timeout = Duration::from_millis(100); @@ -729,13 +738,6 @@ impl TaskHandler { self.reset = true; } - /// Change the running state consistently. - fn change_running(&mut self, running: bool) { - let mut state = self.state.lock().unwrap(); - state.running = running; - state.save(); - } - /// Users can specify a callback that's fired whenever a task finishes. /// Execute the callback by spawning a new subprocess. fn spawn_callback(&mut self, task: &Task) { diff --git a/shared/state.rs b/shared/state.rs index 7d5ad26..1130b6a 100644 --- a/shared/state.rs +++ b/shared/state.rs @@ -1,6 +1,6 @@ use ::anyhow::Result; use ::chrono::prelude::*; -use ::log::{error, info}; +use ::log::{error, info, debug}; use ::serde_derive::{Deserialize, Serialize}; use ::std::collections::BTreeMap; use ::std::collections::HashMap; @@ -165,10 +165,9 @@ impl State { pub fn reset(&mut self) { self.backup(); - self.set_status_for_all_groups(true); self.max_id = 0; self.tasks = BTreeMap::new(); - self.save(); + self.set_status_for_all_groups(true); } /// Convenience wrapper around save_to_file. @@ -207,7 +206,7 @@ impl State { let now: DateTime = Utc::now(); let time = now.format("%Y-%m-%d_%H-%M-%S"); ( - path.join(format!("{}_backup.json.partial", time)), + path.join(format!("{}_state.json.partial", time)), path.join(format!("{}_state.json", time)), ) } else { @@ -224,13 +223,19 @@ impl State { } // Overwrite the original with the temp file, if everything went fine. - if let Err(error) = fs::rename(&temp, real) { + if let Err(error) = fs::rename(&temp, &real) { error!( "Failed to overwrite old log file. File permissions? Error: {:?}", error ); return; } + + if log { + debug!("State backup created at: {:?}", real); + } else { + debug!("State saved at: {:?}", real); + } } /// Restore the last state from a previous session.