// pueue/shared/state.rs

use ::anyhow::Result;
use ::chrono::prelude::*;
use ::log::{error, info};
use ::serde_derive::{Deserialize, Serialize};
use ::std::collections::BTreeMap;
use ::std::fs;
use ::std::path::{Path, PathBuf};
use ::std::sync::{Arc, Mutex};
use ::std::time::SystemTime;
use ::strum::IntoEnumIterator;
use crate::settings::Settings;
use crate::task::{Task, TaskStatus};
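
/// The daemon keeps a single `State` instance, shared between threads behind an
/// `Arc<Mutex>`.
///
/// A rough usage sketch (hypothetical caller, not part of this file; assumes
/// `settings` and `task` values are in scope):
/// ```ignore
/// let shared_state: SharedState = Arc::new(Mutex::new(State::new(&settings)));
/// let mut state = shared_state.lock().unwrap();
/// let task_id = state.add_task(task);
/// ```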
pub type SharedState = Arc<Mutex<State>>;
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct State {
max_id: usize,
settings: Settings,
pub running: bool,
pub tasks: BTreeMap<usize, Task>,
}
impl State {
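    /// Create a new state and try to restore a previous state from disk (see `restore`).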
pub fn new(settings: &Settings) -> State {
let mut state = State {
max_id: 0,
settings: settings.clone(),
running: true,
tasks: BTreeMap::new(),
};
state.restore();
state
}
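    /// Add a new task to the state, assign it the next free id,
    /// persist the state and return the new task's id.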
pub fn add_task(&mut self, mut task: Task) -> usize {
task.id = self.max_id;
self.tasks.insert(self.max_id, task);
self.max_id += 1;
self.save();
self.max_id - 1
}
/// Search and return the next runnable task.
/// A runnable task:
/// - is in Queued state
/// - has all its dependencies in `Done` state
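    ///
    /// A rough usage sketch (hypothetical caller; assumes a mutable `state: State`
    /// is in scope):
    /// ```ignore
    /// if let Some(id) = state.get_next_task_id() {
    ///     state.change_status(id, TaskStatus::Running);
    /// }
    /// ```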
pub fn get_next_task_id(&mut self) -> Option<usize> {
return self
.tasks
.iter()
.filter(|(_, task)| task.status == TaskStatus::Queued)
.filter(|(_, task)| {
task.dependencies
.iter()
.flat_map(|id| self.tasks.get(id))
.all(|task| task.status == TaskStatus::Done)
})
.next()
.map(|(id, _)| *id);
}
pub fn change_status(&mut self, id: usize, new_status: TaskStatus) {
if let Some(ref mut task) = self.tasks.get_mut(&id) {
task.status = new_status;
self.save();
};
}
pub fn set_enqueue_at(&mut self, id: usize, enqueue_at: Option<DateTime<Local>>) {
if let Some(ref mut task) = self.tasks.get_mut(&id) {
task.enqueue_at = enqueue_at;
}
}
    /// Check whether the given task_ids are in the specified statuses.
/// The first result is the list of task_ids that match these statuses.
/// The second result is the list of task_ids that don't match these statuses.
///
/// Additionally, a list of task_ids can be specified to only run the check
/// on a subset of all tasks.
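    ///
    /// A rough usage sketch (hypothetical caller; the concrete ids are made up):
    /// ```ignore
    /// // Split the ids 0-2 into tasks that are already done and the rest.
    /// let (done, not_done) =
    ///     state.tasks_in_statuses(vec![TaskStatus::Done], Some(vec![0, 1, 2]));
    /// ```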
pub fn tasks_in_statuses(
&mut self,
statuses: Vec<TaskStatus>,
task_ids: Option<Vec<usize>>,
) -> (Vec<usize>, Vec<usize>) {
let task_ids = match task_ids {
Some(ids) => ids,
None => self.tasks.keys().cloned().collect(),
};
let mut matching = Vec::new();
let mut mismatching = Vec::new();
        // Filter all task ids that match the provided statuses.
for task_id in task_ids.iter() {
// Check whether the task exists
            let task = match self.tasks.get(task_id) {
None => {
mismatching.push(*task_id);
continue;
}
Some(task) => task,
};
// Check whether the task status matches the specified statuses
if statuses.contains(&task.status) {
matching.push(*task_id);
} else {
mismatching.push(*task_id);
}
}
(matching, mismatching)
}
/// The same as tasks_in_statuses, but with inverted statuses
pub fn tasks_not_in_statuses(
&mut self,
excluded_statuses: Vec<TaskStatus>,
task_ids: Option<Vec<usize>>,
) -> (Vec<usize>, Vec<usize>) {
let mut valid_statuses = Vec::new();
// Create a list of all valid statuses
        // (i.e. statuses that aren't excluded).
for status in TaskStatus::iter() {
if !excluded_statuses.contains(&status) {
valid_statuses.push(status);
}
}
self.tasks_in_statuses(valid_statuses, task_ids)
}
/// Remove all finished tasks (clean up the task queue)
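    /// A backup of the current state is written to the log directory beforehand.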
pub fn clean(&mut self) {
self.backup();
let statuses = vec![TaskStatus::Done, TaskStatus::Failed, TaskStatus::Killed];
let (matching, _) = self.tasks_in_statuses(statuses, None);
for task_id in &matching {
let _ = self.tasks.remove(task_id).unwrap();
}
self.save();
}
pub fn reset(&mut self) {
self.backup();
self.running = true;
self.max_id = 0;
self.tasks = BTreeMap::new();
self.save();
}
/// Convenience wrapper around save_to_file
pub fn save(&mut self) {
self.save_to_file(false);
}
    /// Save the current state to a file with a timestamp.
    /// At the same time, remove old state logs from the log directory.
    /// This function is called when large changes to the state are applied, e.g. clean/reset.
fn backup(&mut self) {
self.save_to_file(true);
if let Err(error) = self.rotate() {
error!("Failed to rotate files: {:?}", error);
};
}
/// Save the current state to disk.
    /// We do this to be able to restore the previous state in case of a crash.
    /// If `log == true`, the file will be saved with a timestamp.
fn save_to_file(&mut self, log: bool) {
let serialized = serde_json::to_string(&self);
if let Err(error) = serialized {
error!("Failed to serialize state: {:?}", error);
return;
}
let serialized = serialized.unwrap();
let path = Path::new(&self.settings.daemon.pueue_directory);
let temp: PathBuf;
let real: PathBuf;
if log {
let path = path.join("log");
let now: DateTime<Utc> = Utc::now();
let time = now.format("%Y-%m-%d_%H-%M-%S");
temp = path.join(format!("{}_backup.json.partial", time));
real = path.join(format!("{}_state.json", time));
} else {
temp = path.join("state.json.partial");
real = path.join("state.json");
}
// Write to temporary log file first, to prevent loss due to crashes
if let Err(error) = fs::write(&temp, serialized) {
error!(
"Failed to write log to directory. File permissions? Error: {:?}",
error
);
return;
}
// Overwrite the original with the temp file, if everything went fine
if let Err(error) = fs::rename(&temp, real) {
error!(
"Failed to overwrite old log file. File permissions? Error: {:?}",
error
);
return;
}
}
    /// Restore the last state from a previous session.
    /// The state is stored as json in the pueue directory.
fn restore(&mut self) {
let path = Path::new(&self.settings.daemon.pueue_directory).join("state.json");
// Ignore if the file doesn't exist. It doesn't have to.
if !path.exists() {
info!(
"Couldn't find state from previous session at location: {:?}",
path
);
return;
}
info!("Start restoring state");
// Try to load the file
let data = fs::read_to_string(&path);
if let Err(error) = data {
error!("Failed to read previous state log: {:?}", error);
return;
}
let data = data.unwrap();
// Try to deserialize it into a state
let deserialized: Result<State, serde_json::error::Error> = serde_json::from_str(&data);
if let Err(error) = deserialized {
error!("Failed to deserialize previous state log: {:?}", error);
return;
}
let mut state = deserialized.unwrap();
// Restore the state from the file
        // While restoring the tasks, check for any invalid/broken statuses
for (task_id, task) in state.tasks.iter_mut() {
// Handle ungraceful shutdowns while executing tasks
if task.status == TaskStatus::Running || task.status == TaskStatus::Paused {
info!(
"Setting task {} with previous status {:?} to new status {:?}",
task.id,
task.status,
TaskStatus::Killed
);
task.status = TaskStatus::Killed;
}
// Crash during editing of the task command
if task.status == TaskStatus::Locked {
task.status = TaskStatus::Stashed;
}
// If there are any queued tasks, pause the daemon
// This should prevent any unwanted execution of tasks due to a system crash
if task.status == TaskStatus::Queued {
info!("Pausing daemon to prevent unwanted execution of previous tasks");
state.running = false;
}
self.tasks.insert(*task_id, task.clone());
}
self.running = state.running;
self.max_id = state.max_id;
}
/// Remove old logs that aren't needed any longer
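    /// Only the ten newest state backups are kept; older files are removed first.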
fn rotate(&mut self) -> Result<()> {
let path = Path::new(&self.settings.daemon.pueue_directory);
let path = path.join("log");
// Get all log files in the directory with their respective system time
let mut entries: BTreeMap<SystemTime, PathBuf> = BTreeMap::new();
let mut directory_list = fs::read_dir(path)?;
while let Some(Ok(entry)) = directory_list.next() {
let path = entry.path();
let metadata = entry.metadata()?;
let time = metadata.modified()?;
entries.insert(time, path);
}
        // Remove all files above the threshold.
        // Old files are removed first (implicitly through the BTreeMap ordering).
let mut number_entries = entries.len();
let mut iter = entries.iter();
while number_entries > 10 {
if let Some((_, path)) = iter.next() {
fs::remove_file(path)?;
number_entries -= 1;
}
}
Ok(())
}
    /// Ensure that `Queued` tasks don't have any failed dependencies.
    /// If they do, set their status to `Failed`.
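    ///
    /// A rough sketch of the intended call order (hypothetical caller):
    /// ```ignore
    /// // Mark tasks with failed dependencies before picking the next task.
    /// state.check_failed_dependencies();
    /// let next_id = state.get_next_task_id();
    /// ```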
pub fn check_failed_dependencies(&mut self) {
let has_failed_deps: Vec<_> = self
.tasks
.iter()
.filter(|(_, task)| task.status == TaskStatus::Queued)
.filter_map(|(id, task)| {
let failed = task
.dependencies
.iter()
.flat_map(|id| self.tasks.get(id))
.filter(|task| task.is_errored())
.map(|task| task.id)
.next();
failed.map(|f| (*id, f))
})
.collect();
for (id, failed) in has_failed_deps {
if let Some(task) = self.tasks.get_mut(&id) {
task.status = TaskStatus::Failed;
                task.stderr = Some(format!("Dependency task {:?} failed", failed));
}
}
}
}