mirror of https://github.com/nukesor/pueue synced 2024-06-29 06:04:21 +00:00

add: Allow resetting individual groups

Arne Beer 2024-06-25 00:40:02 +02:00
parent 4e800a8544
commit 8316fcb4e2
No known key found for this signature in database
GPG Key ID: CC9408F679023B65
13 changed files with 236 additions and 74 deletions

View File

@@ -30,6 +30,8 @@ Even though this refactoring significantly simplified the code, it also introduc
### Add
- Add `--all` and `--group` to `pueue log`. [#509](https://github.com/Nukesor/pueue/issues/509)
- Add `pueue reset [group_names]` to allow resetting individual groups. [#482](https://github.com/Nukesor/pueue/issues/482)
This also refactors the way resets are done internally, resulting in a cleaner code architecture.
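For illustration, a minimal sketch (not part of the commit) of how a caller could map a list of group names onto the new wire message, using the `ResetTarget` and `ResetMessage` types introduced further down in this diff; the helper function name is made up for the example:

```rust
use pueue_lib::network::message::{Message, ResetMessage, ResetTarget};

// Hypothetical helper: an empty list is interpreted as "reset everything",
// mirroring the client-side logic shown later in this diff.
fn build_reset_message(groups: Vec<String>) -> Message {
    let target = if groups.is_empty() {
        ResetTarget::All
    } else {
        ResetTarget::Groups(groups)
    };
    ResetMessage { target }.into()
}
```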
## \[3.4.1\] - 2024-06-04

View File

@@ -455,6 +455,10 @@ https://github.com/Nukesor/pueue/issues/350#issue-1359083118"
/// Kill all tasks, clean up afterwards and reset EVERYTHING!
Reset {
/// If groups are specified, only those specific groups will be reset.
#[arg(short, long)]
groups: Vec<String>,
/// Don't ask for any confirmation.
#[arg(short, long)]
force: bool,

View File

@@ -545,12 +545,18 @@ impl Client {
group: group.clone(),
}
.into(),
SubCommand::Reset { force, .. } => {
SubCommand::Reset { force, groups, .. } => {
if self.settings.client.show_confirmation_questions && !force {
self.handle_user_confirmation("reset", &Vec::new())?;
}
ResetMessage {}.into()
let target = if groups.is_empty() {
ResetTarget::All
} else {
ResetTarget::Groups(groups.clone())
};
ResetMessage { target }.into()
}
SubCommand::Shutdown => Shutdown::Graceful.into(),
SubCommand::Parallel {

View File

@@ -57,6 +57,7 @@ pub fn get_group_headline(name: &str, group: &Group, style: &OutputStyle) -> Str
let status = match group.status {
GroupStatus::Running => style.style_text("running", Some(Color::Green), None),
GroupStatus::Paused => style.style_text("paused", Some(Color::Yellow), None),
GroupStatus::Reset => style.style_text("resetting", Some(Color::Red), None),
};
format!("{} ({} parallel): {}", name, group.parallel_tasks, status)
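For illustration only: with the new `Reset` variant, the headline built by the `format!` call above would render roughly as follows for a hypothetical group, ignoring terminal colors:

```rust
// Hypothetical values; the styling applied by `style_text` is omitted here.
let (name, parallel_tasks, status) = ("default", 2, "resetting");
assert_eq!(
    format!("{} ({} parallel): {}", name, parallel_tasks, status),
    "default (2 parallel): resetting"
);
```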

View File

@@ -38,7 +38,7 @@ pub fn handle_message(message: Message, state: &SharedState, settings: &Settings
Message::Parallel(message) => parallel::set_parallel_tasks(message, state),
Message::Pause(message) => pause::pause(settings, state, message),
Message::Remove(task_ids) => remove::remove(settings, state, task_ids),
Message::Reset(_) => reset::reset(settings, state),
Message::Reset(message) => reset::reset(settings, state, message),
Message::Restart(message) => restart::restart_multiple(settings, state, message),
Message::Send(message) => send::send(state, message),
Message::Start(message) => start::start(settings, state, message),

View File

@@ -1,4 +1,5 @@
use pueue_lib::state::SharedState;
use pueue_lib::failure_msg;
use pueue_lib::state::{GroupStatus, SharedState};
use pueue_lib::{network::message::*, settings::Settings};
use crate::daemon::process_handler;
@@ -6,9 +7,40 @@ use crate::daemon::process_handler;
/// Invoked when calling `pueue reset`.
/// Kill all children by using the `kill` function.
/// Set the full_reset flag, which will prevent new tasks from being spawned.
pub fn reset(settings: &Settings, state: &SharedState) -> Message {
pub fn reset(settings: &Settings, state: &SharedState, message: ResetMessage) -> Message {
let mut state = state.lock().unwrap();
state.full_reset = true;
process_handler::kill::kill(settings, &mut state, TaskSelection::All, false, None);
match message.target {
ResetTarget::All => {
// Mark all groups to be reset and kill all tasks
for (_name, group) in state.groups.iter_mut() {
group.status = GroupStatus::Reset;
}
process_handler::kill::kill(settings, &mut state, TaskSelection::All, false, None);
}
ResetTarget::Groups(groups) => {
// First up, check whether we actually have all requested groups.
for name in groups.iter() {
let group = state.groups.get(name);
if group.is_none() {
return failure_msg!("Group '{name}' doesn't exist.");
}
}
// Mark all requested groups to be reset and kill their tasks
for name in groups.iter() {
let group = state.groups.get_mut(name).unwrap();
group.status = GroupStatus::Reset;
process_handler::kill::kill(
settings,
&mut state,
TaskSelection::Group(name.to_string()),
false,
None,
);
}
}
}
create_success_message("Everything is being reset right now.")
}

View File

@ -2,6 +2,7 @@ use anyhow::Context;
use chrono::Local;
use log::info;
use pueue_lib::log::clean_log_handles;
use pueue_lib::state::GroupStatus;
use pueue_lib::task::{TaskResult, TaskStatus};
use super::*;
@@ -95,8 +96,13 @@ pub fn handle_finished_tasks(settings: &Settings, state: &mut LockedState) {
pause_on_failure(state, settings, &task.group);
}
// Already remove the output files, if the daemon is being reset anyway
if state.full_reset {
// Remove the output files right away if this group is being reset.
if state
.groups
.get(&task.group)
.map(|group| group.status == GroupStatus::Reset)
.unwrap_or(true)
{
clean_log_handles(*task_id, &settings.shared.pueue_directory());
}
}

View File

@@ -6,7 +6,6 @@ use chrono::prelude::*;
use log::{error, info};
use pueue_lib::children::Children;
use pueue_lib::log::*;
use pueue_lib::network::message::*;
use pueue_lib::network::protocol::socket_cleanup;
use pueue_lib::settings::Settings;
@@ -14,7 +13,7 @@ use pueue_lib::state::{Group, GroupStatus, SharedState};
use pueue_lib::task::{TaskResult, TaskStatus};
use crate::daemon::pid::cleanup_pid_file;
use crate::daemon::state_helper::{reset_state, save_state};
use crate::daemon::state_helper::save_state;
use crate::ok_or_shutdown;
use super::callbacks::{check_callbacks, spawn_callback};
@@ -46,26 +45,24 @@ pub async fn run(state: SharedState, settings: Settings) -> Result<()> {
}
loop {
{
'mutex_block: {
let mut state = state.lock().unwrap();
check_callbacks(&mut state);
handle_finished_tasks(&settings, &mut state);
// Check if we're in shutdown.
// If all tasks are killed, we do some cleanup and exit.
if state.shutdown.is_some() {
handle_shutdown(&settings, &mut state);
break 'mutex_block;
}
// If we aren't in shutdown mode, do the usual stuff
handle_group_resets(&settings, &mut state);
enqueue_delayed_tasks(&settings, &mut state);
check_failed_dependencies(&settings, &mut state);
if state.shutdown.is_some() {
// Check if we're in shutdown.
// If all tasks are killed, we do some cleanup and exit.
handle_shutdown(&settings, &mut state);
} else if state.full_reset {
// Wait until all tasks are killed.
// Once they are, reset everything and go back to normal
handle_reset(&settings, &mut state);
} else {
// Only start new tasks, if we aren't in the middle of a reset or shutdown.
spawn_new(&settings, &mut state);
}
spawn_new(&settings, &mut state);
}
tokio::time::sleep(Duration::from_millis(300)).await;
@@ -106,20 +103,28 @@ fn handle_shutdown(settings: &Settings, state: &mut LockedState) {
/// and no new tasks will be spawned.
/// This function checks, if all killed children have been handled.
/// If that's the case, completely reset the state
fn handle_reset(settings: &Settings, state: &mut LockedState) {
// Don't do any reset logic, if we aren't in reset mode or if some children are still up.
if state.children.has_active_tasks() {
return;
fn handle_group_resets(_settings: &Settings, state: &mut LockedState) {
let groups_to_reset: Vec<String> = state
.groups
.iter()
.filter(|(_name, group)| group.status == GroupStatus::Reset)
.map(|(name, _)| name.to_string())
.collect();
for name in groups_to_reset.iter() {
// Don't do any reset logic if some of the group's children are still up.
if state.children.has_group_active_tasks(name) {
continue;
}
// Remove all tasks that belong to the group to reset
state.tasks.retain(|_id, task| &task.group != name);
// Restart the group, now that it's devoid of tasks.
if let Some(group) = state.groups.get_mut(name) {
group.status = GroupStatus::Running;
}
}
if let Err(error) = reset_state(state, settings) {
error!("Failed to reset state with error: {error:?}");
};
if let Err(error) = reset_task_log_directory(&settings.shared.pueue_directory()) {
panic!("Error while resetting task log directory: {error}");
};
state.full_reset = false;
}
/// As time passes, some delayed tasks may need to be enqueued.

View File

@@ -1,9 +1,9 @@
use anyhow::{Context, Result};
use pueue_lib::network::message::*;
use pueue_lib::{network::message::*, state::GroupStatus};
use crate::helper::*;
/// A reset command kills all tasks and forces a clean state.
/// A reset command kills all tasks and forces a clean state across groups.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_reset() -> Result<()> {
let daemon = daemon().await?;
@@ -12,21 +12,128 @@ async fn test_reset() -> Result<()> {
// Start a long running task and make sure it's started
add_task(shared, "ls").await?;
add_task(shared, "failed").await?;
add_task(shared, "sleep 60").await?;
add_task_to_group(shared, "sleep 60", "test_2").await?;
add_task(shared, "ls").await?;
wait_for_task_condition(shared, 2, |task| task.is_running()).await?;
// Reset the daemon
send_message(shared, ResetMessage {})
.await
.context("Failed to send Start tasks message")?;
// Reset all groups of the daemon
send_message(
shared,
ResetMessage {
target: ResetTarget::All,
},
)
.await
.context("Failed to send Start tasks message")?;
// Resetting is asynchronous, wait for the first task to disappear.
// Resetting is asynchronous, wait for all tasks to disappear.
wait_for_task_absence(shared, 0).await?;
wait_for_task_absence(shared, 1).await?;
wait_for_task_absence(shared, 2).await?;
wait_for_task_absence(shared, 3).await?;
// All tasks should have been removed.
let state = get_state(shared).await?;
assert!(state.tasks.is_empty());
// Both groups should be running.
assert_eq!(
state.groups.get("default").unwrap().status,
GroupStatus::Running
);
assert_eq!(
state.groups.get("test_2").unwrap().status,
GroupStatus::Running
);
Ok(())
}
/// A reset command that targets a single group only kills and removes that group's tasks.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_reset_single_group() -> Result<()> {
let daemon = daemon().await?;
let shared = &daemon.settings.shared;
// Add tasks to several groups and make sure the long-running task in test_2 is running.
add_task(shared, "ls").await?;
add_task(shared, "failed").await?;
add_task_to_group(shared, "sleep 60", "test_2").await?;
add_task_to_group(shared, "sleep 60", "test_3").await?;
wait_for_task_condition(shared, 2, |task| task.is_running()).await?;
// Reset only the test_2 group of the daemon.
send_message(
shared,
ResetMessage {
target: ResetTarget::Groups(vec!["test_2".to_string()]),
},
)
.await
.context("Failed to send Start tasks message")?;
// Resetting is asynchronous, wait for the third task to disappear.
wait_for_task_absence(shared, 2).await?;
// Only the task of the reset group should have been removed.
let state = get_state(shared).await?;
assert_eq!(
state.tasks.len(),
3,
"Only a single task should have been removed"
);
assert_eq!(
state.groups.get("test_2").unwrap().status,
GroupStatus::Running
);
Ok(())
}
/// A reset command that targets multiple groups only kills and removes the tasks of those groups.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_reset_multiple_groups() -> Result<()> {
let daemon = daemon().await?;
let shared = &daemon.settings.shared;
// Add tasks to several groups and make sure the long-running task in test_2 is running.
add_task(shared, "ls").await?;
add_task(shared, "failed").await?;
add_task_to_group(shared, "sleep 60", "test_2").await?;
add_task_to_group(shared, "sleep 60", "test_3").await?;
wait_for_task_condition(shared, 2, |task| task.is_running()).await?;
// Reset the test_2 and test_3 groups of the daemon.
send_message(
shared,
ResetMessage {
target: ResetTarget::Groups(vec!["test_2".to_string(), "test_3".to_string()]),
},
)
.await
.context("Failed to send Start tasks message")?;
// Resetting is asynchronous, wait for the third task to disappear.
wait_for_task_absence(shared, 2).await?;
wait_for_task_absence(shared, 3).await?;
// Only the tasks of the reset groups should have been removed.
let state = get_state(shared).await?;
assert_eq!(
state.tasks.len(),
2,
"Only a two task should have been removed"
);
assert_eq!(
state.groups.get("test_2").unwrap().status,
GroupStatus::Running
);
assert_eq!(
state.groups.get("test_3").unwrap().status,
GroupStatus::Running
);
Ok(())
}

View File

@@ -15,6 +15,16 @@ impl Children {
self.0.iter().any(|(_, pool)| !pool.is_empty())
}
/// Returns whether there are any active tasks for the given group.
///
/// Returns `false` if the group cannot be found.
pub fn has_group_active_tasks(&self, group: &str) -> bool {
self.0
.get(group)
.map(|pool| !pool.is_empty())
.unwrap_or(false)
}
/// A convenience function to check whether there's child with a given task_id.
/// We have to do a nested linear search, as these datastructure aren't indexed via task_ids.
pub fn has_child(&self, task_id: usize) -> bool {
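A small usage sketch (the surrounding function is hypothetical) of the new helper, spelling out the semantics from the doc comment: a group that doesn't exist in the map behaves like a group without running children.

```rust
use pueue_lib::children::Children;

// Hypothetical helper: `children` is the daemon's process pool map.
// `has_group_active_tasks` returns `false` for unknown groups, so those
// count as idle here as well.
fn group_is_idle(children: &Children, group: &str) -> bool {
    !children.has_group_active_tasks(group)
}
```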

View File

@@ -1,4 +1,4 @@
use std::fs::{read_dir, remove_file, File};
use std::fs::{remove_file, File};
use std::io::{self, prelude::*, Read, SeekFrom};
use std::path::{Path, PathBuf};
@@ -104,22 +104,6 @@ pub fn read_last_log_file_lines(
Ok(read_last_lines(&mut file, lines))
}
/// Remove all files in the log directory.
pub fn reset_task_log_directory(pueue_dir: &Path) -> Result<(), Error> {
let task_log_dir = pueue_dir.join("task_logs");
let files = read_dir(&task_log_dir)
.map_err(|err| Error::IoPathError(task_log_dir, "reading task log files", err))?;
for file in files.flatten() {
if let Err(err) = remove_file(file.path()) {
error!("Failed to delete log file: {err}");
}
}
Ok(())
}
/// Read the last `amount` lines of a file to a string.
///
/// Only use this for logic that doesn't stream from daemon to client!

View File

@@ -280,7 +280,17 @@ pub struct GroupResponseMessage {
impl_into_message!(GroupResponseMessage, Message::GroupResponse);
#[derive(PartialEq, Eq, Clone, Debug, Deserialize, Serialize)]
pub struct ResetMessage {}
pub enum ResetTarget {
// Reset all groups
All,
// Reset a list of specific groups
Groups(Vec<String>),
}
#[derive(PartialEq, Eq, Clone, Debug, Deserialize, Serialize)]
pub struct ResetMessage {
pub target: ResetTarget,
}
impl_into_message!(ResetMessage, Message::Reset);

View File

@@ -19,6 +19,9 @@ pub type SharedState = Arc<Mutex<State>>;
pub enum GroupStatus {
Running,
Paused,
// This state is set if this group is being reset.
// This means that all tasks are being killed and removed.
Reset,
}
/// The representation of a group.
@@ -62,11 +65,7 @@ pub struct State {
/// This is runtime state and won't be serialised to disk.
#[serde(default, skip)]
pub shutdown: Option<Shutdown>,
/// A simple flag which is used to signal that we're currently doing a full reset of the daemon.
/// This flag prevents new tasks from being spawned.
/// This is runtime state and won't be serialised to disk.
#[serde(default, skip)]
pub full_reset: bool,
/// Pueue's subprocess and worker pool representation.
/// Take a look at [Children] for more info.
/// This is runtime state and won't be serialised to disk.
@@ -84,7 +83,6 @@ impl Clone for State {
tasks: self.tasks.clone(),
groups: self.groups.clone(),
shutdown: self.shutdown.clone(),
full_reset: self.full_reset,
..Default::default()
}
}
@@ -94,10 +92,7 @@ impl Eq for State {}
impl Eq for State {}
impl PartialEq for State {
fn eq(&self, other: &Self) -> bool {
self.tasks == other.tasks
&& self.groups == other.groups
&& self.shutdown == other.shutdown
&& self.full_reset == other.full_reset
self.tasks == other.tasks && self.groups == other.groups && self.shutdown == other.shutdown
}
}