Clean up state reset and log handling

This commit is contained in:
Arne Beer 2020-05-15 17:55:49 +02:00
parent bff508a614
commit 6c5ab4d244
5 changed files with 35 additions and 25 deletions

View file

@ -7,8 +7,8 @@ use ::pueue::settings::Settings;
use crate::cli::{Opt, SubCommand}; use crate::cli::{Opt, SubCommand};
// Convert and pre-process the sub-command into a valid message /// Convert the cli command into the message that's being sent to the server,
// that can be understood by the daemon. /// so it can be understood by the daemon.
pub fn get_message_from_opt(opt: &Opt, settings: &Settings) -> Result<Message> { pub fn get_message_from_opt(opt: &Opt, settings: &Settings) -> Result<Message> {
match &opt.cmd { match &opt.cmd {
SubCommand::Add { SubCommand::Add {

View file

@ -28,7 +28,7 @@ pub fn handle_message(message: Message, sender: &Sender<Message>, state: &Shared
Message::Group(message) => group(message, state), Message::Group(message) => group(message, state),
Message::Clean => clean(state), Message::Clean => clean(state),
Message::Reset => reset(sender, state), Message::Reset => reset(sender),
Message::Status => get_status(state), Message::Status => get_status(state),
Message::Log(message) => get_log(message, state), Message::Log(message) => get_log(message, state),
Message::Parallel(message) => set_parallel_tasks(message, state), Message::Parallel(message) => set_parallel_tasks(message, state),
@ -485,9 +485,8 @@ fn clean(state: &SharedState) -> Message {
/// Invoked when calling `pueue reset`. /// Invoked when calling `pueue reset`.
/// Forward the reset request to the task handler. /// Forward the reset request to the task handler.
/// The handler then kills all children and clears the task queue. /// The handler then kills all children and clears the task queue.
fn reset(sender: &Sender<Message>, state: &SharedState) -> Message { fn reset(sender: &Sender<Message>) -> Message {
sender.send(Message::Reset).expect(SENDER_ERR); sender.send(Message::Reset).expect(SENDER_ERR);
clean(state);
create_success_message("Everything is being reset right now.") create_success_message("Everything is being reset right now.")
} }

View file

@ -68,6 +68,7 @@ async fn main() -> Result<()> {
/// Initialize all directories needed for normal operation. /// Initialize all directories needed for normal operation.
fn init_directories(path: &str) { fn init_directories(path: &str) {
// Pueue base path
let pueue_dir = Path::new(path); let pueue_dir = Path::new(path);
if !pueue_dir.exists() { if !pueue_dir.exists() {
if let Err(error) = create_dir_all(&pueue_dir) { if let Err(error) = create_dir_all(&pueue_dir) {
@ -77,6 +78,8 @@ fn init_directories(path: &str) {
); );
} }
} }
// Task log dir
let log_dir = pueue_dir.join("log"); let log_dir = pueue_dir.join("log");
if !log_dir.exists() { if !log_dir.exists() {
if let Err(error) = create_dir_all(&log_dir) { if let Err(error) = create_dir_all(&log_dir) {
@ -87,6 +90,7 @@ fn init_directories(path: &str) {
} }
} }
// Task log dir
let logs_dir = pueue_dir.join("task_logs"); let logs_dir = pueue_dir.join("task_logs");
if !logs_dir.exists() { if !logs_dir.exists() {
if let Err(error) = create_dir_all(&logs_dir) { if let Err(error) = create_dir_all(&logs_dir) {

View file

@ -82,6 +82,7 @@ impl TaskHandler {
loop { loop {
self.receive_commands(); self.receive_commands();
self.handle_finished_tasks(); self.handle_finished_tasks();
self.handle_reset();
self.check_callbacks(); self.check_callbacks();
self.check_stashed(); self.check_stashed();
self.check_failed_dependencies(); self.check_failed_dependencies();
@ -180,6 +181,20 @@ impl TaskHandler {
.map(|(id, _)| *id) .map(|(id, _)| *id)
} }
/// Users can issue to reset the daemon.
/// If that's the case, the `self.reset` flag is set to true, all children are killed
/// and no new tasks will be spawned.
/// This function checks, if all killed children have been handled.
/// If that's the case, completely reset the state
fn handle_reset(&mut self) {
// The daemon got a reset request and all children already finished
if self.reset && self.children.is_empty() {
let mut state = self.state.lock().unwrap();
state.reset();
self.reset = false;
}
}
/// See if we can start a new queued task. /// See if we can start a new queued task.
fn check_new(&mut self) -> Result<()> { fn check_new(&mut self) -> Result<()> {
// Get the next task id that can be started // Get the next task id that can be started
@ -336,13 +351,6 @@ impl TaskHandler {
fn handle_finished_tasks(&mut self) { fn handle_finished_tasks(&mut self) {
let (finished, errored) = self.get_finished(); let (finished, errored) = self.get_finished();
// The daemon got a reset request and all children already finished
if self.reset && self.children.is_empty() {
let mut state = self.state.lock().unwrap();
state.reset();
self.reset = false;
}
// Nothing to do. Early return // Nothing to do. Early return
if finished.is_empty() && errored.is_empty() { if finished.is_empty() && errored.is_empty() {
return; return;
@ -391,7 +399,7 @@ impl TaskHandler {
// Pause the daemon, if the settings say so and some process failed // Pause the daemon, if the settings say so and some process failed
if failed_task_exists && self.pause_on_failure { if failed_task_exists && self.pause_on_failure {
self.change_running(false); state.running = false;
} }
state.save() state.save()
@ -421,6 +429,7 @@ impl TaskHandler {
} }
/// Some client instructions require immediate action by the task handler /// Some client instructions require immediate action by the task handler
/// This function is also responsible for waiting
fn receive_commands(&mut self) { fn receive_commands(&mut self) {
// Sleep for a few milliseconds. We don't want to hurt the CPU. // Sleep for a few milliseconds. We don't want to hurt the CPU.
let timeout = Duration::from_millis(100); let timeout = Duration::from_millis(100);
@ -729,13 +738,6 @@ impl TaskHandler {
self.reset = true; self.reset = true;
} }
/// Change the running state consistently.
fn change_running(&mut self, running: bool) {
let mut state = self.state.lock().unwrap();
state.running = running;
state.save();
}
/// Users can specify a callback that's fired whenever a task finishes. /// Users can specify a callback that's fired whenever a task finishes.
/// Execute the callback by spawning a new subprocess. /// Execute the callback by spawning a new subprocess.
fn spawn_callback(&mut self, task: &Task) { fn spawn_callback(&mut self, task: &Task) {

View file

@ -1,6 +1,6 @@
use ::anyhow::Result; use ::anyhow::Result;
use ::chrono::prelude::*; use ::chrono::prelude::*;
use ::log::{error, info}; use ::log::{error, info, debug};
use ::serde_derive::{Deserialize, Serialize}; use ::serde_derive::{Deserialize, Serialize};
use ::std::collections::BTreeMap; use ::std::collections::BTreeMap;
use ::std::collections::HashMap; use ::std::collections::HashMap;
@ -165,10 +165,9 @@ impl State {
pub fn reset(&mut self) { pub fn reset(&mut self) {
self.backup(); self.backup();
self.set_status_for_all_groups(true);
self.max_id = 0; self.max_id = 0;
self.tasks = BTreeMap::new(); self.tasks = BTreeMap::new();
self.save(); self.set_status_for_all_groups(true);
} }
/// Convenience wrapper around save_to_file. /// Convenience wrapper around save_to_file.
@ -207,7 +206,7 @@ impl State {
let now: DateTime<Utc> = Utc::now(); let now: DateTime<Utc> = Utc::now();
let time = now.format("%Y-%m-%d_%H-%M-%S"); let time = now.format("%Y-%m-%d_%H-%M-%S");
( (
path.join(format!("{}_backup.json.partial", time)), path.join(format!("{}_state.json.partial", time)),
path.join(format!("{}_state.json", time)), path.join(format!("{}_state.json", time)),
) )
} else { } else {
@ -224,13 +223,19 @@ impl State {
} }
// Overwrite the original with the temp file, if everything went fine. // Overwrite the original with the temp file, if everything went fine.
if let Err(error) = fs::rename(&temp, real) { if let Err(error) = fs::rename(&temp, &real) {
error!( error!(
"Failed to overwrite old log file. File permissions? Error: {:?}", "Failed to overwrite old log file. File permissions? Error: {:?}",
error error
); );
return; return;
} }
if log {
debug!("State backup created at: {:?}", real);
} else {
debug!("State saved at: {:?}", real);
}
} }
/// Restore the last state from a previous session. /// Restore the last state from a previous session.