diff --git a/CHANGELOG.md b/CHANGELOG.md index 18bdbe7..ccffea0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,9 @@ **Improvements:** - Environment variable capture. Tasks will now start with the variables of the environment `pueue add` is being called in. - `follow` (previously `show`) now also reads directly from disk, if `read_local_logs` is set to `true`. +- Add new flags `--default` to `kill`, `start` and `pause`. With this flag only tasks in the default queue will be affected +- The `--all` flag now affects all groups AND the default queue for `kill`, `start` and `pause`. +- Added `--group` flag for `status`. This will only print tasks of a specific group # v0.4.0 **Features:** diff --git a/client/cli.rs b/client/cli.rs index 25ae01c..7f71f7e 100644 --- a/client/cli.rs +++ b/client/cli.rs @@ -88,16 +88,23 @@ pub enum SubCommand { delay_until: Option>, }, - /// Wake the daemon from its paused state and continue all paused tasks. - /// Can also be used to resume or start specific tasks or groups. + /// Resume operation of specific tasks or groups of tasks. + /// Without any parameters, resumes the default queue and all it's tasks. + /// Can also be used force specific tasks to start. + #[structopt(verbatim_doc_comment)] Start { - /// Enforce starting these tasks. - /// This doesn't affect any other tasks and can be on a paused daemon or group. + /// Enforce starting these tasks. Paused tasks will be started again. + /// This doesn't affect anything other than these tasks. task_ids: Vec, - /// Start a specific group. Any task_ids will be ignored when using this flag. - #[structopt(name = "group", short, long)] + /// Start a specific group and all paused tasks in it. + #[structopt(short, long, group("start"))] group: Option, + + /// Start a everything (Default queue and all groups)! + /// All groups will be set to `running` and all paused tasks will be resumed. + #[structopt(short, long, group("start"))] + all: bool, }, /// Restart task(s). @@ -116,34 +123,46 @@ pub enum SubCommand { stashed: bool, }, - /// Pause the daemon and all running tasks. - /// A paused daemon won't start any new tasks. - /// Daemon, groups and tasks can be resumed with `start` - /// Can also be used to pause a specific group or specific tasks. + /// Pause either running tasks or specific groups of tasks. + /// Without any parameters, pauses the default queue and all it's tasks. + /// A paused queue (group) won't start any new tasks. + /// Everything can be resumed with `start`. + #[structopt(verbatim_doc_comment)] Pause { - /// Let any running tasks finish by themselves, when pausing the daemon or a group. - #[structopt(short, long, group("pause"), conflicts_with("task_ids"))] - wait: bool, - /// Pause these specific tasks. - /// Doesn't affect the daemon, groups or any other tasks. - #[structopt(group("pause"))] + /// Doesn't affect the default queue, groups or any other tasks. task_ids: Vec, - /// Pause a specific group. Any task_ids will be ignored when using this flag. - #[structopt(name = "group", short, long)] + /// Pause a specific group. + #[structopt(short, long, group("pause"))] group: Option, - }, - /// Kill either all or only specific running tasks. - Kill { - /// Kill all running tasks, this also pauses the daemon. - #[structopt(short, long, group("kill"), conflicts_with("task_ids"))] + /// Pause everything (Default queue and all groups)! + #[structopt(short, long, group("pause"))] all: bool, + /// Don't pause already running tasks and let them finish by themselves, + /// when pausing with `default`, `all` or `group`. + #[structopt(short, long, group("pause"))] + wait: bool, + }, + + /// Kill specific running tasks or various groups of tasks. + Kill { /// The tasks that should be killed. - #[structopt(group("kill"))] task_ids: Vec, + + /// Kill all running tasks in the default queue. Pause the default queue. + #[structopt(short, long, group("kill"))] + default: bool, + + /// Kill all running in a group. Pauses the group. + #[structopt(short, long, group("kill"))] + group: Option, + + /// Kill ALL running tasks. This also pauses everything + #[structopt(short, long, group("kill"))] + all: bool, }, /// Send something to a task. Useful for sending confirmations ('y\n'). @@ -156,7 +175,8 @@ pub enum SubCommand { }, /// Edit the command or path of a stashed or queued task. - /// By default this edits the command of the task + /// This edits the command of the task by default. + #[structopt(verbatim_doc_comment)] Edit { /// The id of the task. task_id: usize, diff --git a/client/message.rs b/client/message.rs index e5fa067..8672ca1 100644 --- a/client/message.rs +++ b/client/message.rs @@ -63,10 +63,11 @@ pub fn get_message_from_opt(opt: &Opt, settings: &Settings) -> Result { }; Ok(Message::Enqueue(message)) } - SubCommand::Start { task_ids, group } => { + SubCommand::Start { task_ids, group, all } => { let message = StartMessage { task_ids: task_ids.clone(), group: group.clone(), + all: *all, }; Ok(Message::Start(message)) } @@ -83,21 +84,25 @@ pub fn get_message_from_opt(opt: &Opt, settings: &Settings) -> Result { Ok(Message::Restart(message)) } SubCommand::Pause { - wait, task_ids, group, + wait, + all, } => { let message = PauseMessage { - wait: *wait, task_ids: task_ids.clone(), group: group.clone(), + wait: *wait, + all: *all, }; Ok(Message::Pause(message)) } - SubCommand::Kill { all, task_ids } => { + SubCommand::Kill { task_ids, group, default, all} => { let message = KillMessage { - all: *all, task_ids: task_ids.clone(), + group: group.clone(), + default: *default, + all: *all, }; Ok(Message::Kill(message)) } diff --git a/client/output.rs b/client/output.rs index bf8906d..5c75600 100644 --- a/client/output.rs +++ b/client/output.rs @@ -41,7 +41,7 @@ pub fn print_state(state: State, cli_command: &SubCommand) { return; } - println!("{}", get_daemon_headline(&state)); + println!("{}", get_default_headline(&state)); // Early exit and hint if there are no tasks in the queue if state.tasks.is_empty() { diff --git a/client/output_helper.rs b/client/output_helper.rs index ae48f90..841d23a 100644 --- a/client/output_helper.rs +++ b/client/output_helper.rs @@ -20,7 +20,7 @@ pub fn has_special_columns(tasks: &BTreeMap) -> (bool, bool) { } /// Return a nicely formatted headline that's displayed at the start of `pueue status` -pub fn get_daemon_headline(state: &State) -> String { +pub fn get_default_headline(state: &State) -> String { // Print the current daemon state. let daemon_status_text = if state.running { style("running").with(Color::Green) @@ -30,7 +30,7 @@ pub fn get_daemon_headline(state: &State) -> String { let parallel = state.settings.daemon.default_parallel_tasks; format!( "{} ({} parallel): {}", - style("Daemon status").attribute(Attribute::Bold), + style("Default queue").attribute(Attribute::Bold), parallel, daemon_status_text ) diff --git a/daemon/instructions.rs b/daemon/instructions.rs index 2c74771..30844ed 100644 --- a/daemon/instructions.rs +++ b/daemon/instructions.rs @@ -36,9 +36,9 @@ pub fn handle_message(message: Message, sender: &Sender, state: &Shared } } +/// Invoked when calling `pueue add`. /// Queues a new task to the state. /// If the start_immediately flag is set, send a StartMessage to the task handler. -/// Invoked when calling `pueue add`. fn add_task(message: AddMessage, sender: &Sender, state: &SharedState) -> Message { let starting_status = if message.stashed || message.enqueue_at.is_some() { TaskStatus::Stashed @@ -89,7 +89,7 @@ fn add_task(message: AddMessage, sender: &Sender, state: &SharedState) sender .send(Message::Start(StartMessage { task_ids: vec![task_id], - group: None, + ..Default::default() })) .expect(SENDER_ERR); } @@ -108,9 +108,9 @@ fn add_task(message: AddMessage, sender: &Sender, state: &SharedState) create_success_message(message) } +/// Invoked when calling `pueue remove`. /// Remove tasks from the queue. /// We have to ensure that those tasks aren't running! -/// Invoked when calling `pueue remove`. fn remove(task_ids: Vec, state: &SharedState) -> Message { let mut state = state.lock().unwrap(); let statuses = vec![TaskStatus::Running, TaskStatus::Paused]; @@ -126,9 +126,9 @@ fn remove(task_ids: Vec, state: &SharedState) -> Message { create_success_message(response) } +/// Invoked when calling `pueue switch`. /// Switch the position of two tasks in the upcoming queue. /// We have to ensure that those tasks are either `Queued` or `Stashed` -/// Invoked when calling `pueue switch`. fn switch(message: SwitchMessage, state: &SharedState) -> Message { let task_ids = vec![message.task_id_1, message.task_id_2]; let statuses = vec![TaskStatus::Queued, TaskStatus::Stashed]; @@ -154,9 +154,9 @@ fn switch(message: SwitchMessage, state: &SharedState) -> Message { create_success_message("Tasks have been switched") } +/// Invoked when calling `pueue stash`. /// Stash specific queued tasks. /// They won't be executed until they're enqueued or explicitely started. -/// Invoked when calling `pueue stash`. fn stash(task_ids: Vec, state: &SharedState) -> Message { let (matching, mismatching) = { let mut state = state.lock().unwrap(); @@ -175,8 +175,8 @@ fn stash(task_ids: Vec, state: &SharedState) -> Message { create_success_message(response) } -/// Enqueue specific stashed tasks. /// Invoked when calling `pueue enqueue`. +/// Enqueue specific stashed tasks. fn enqueue(message: EnqueueMessage, state: &SharedState) -> Message { let (matching, mismatching) = { let mut state = state.lock().unwrap(); @@ -206,33 +206,15 @@ fn enqueue(message: EnqueueMessage, state: &SharedState) -> Message { create_success_message(response) } -/// Forward the start message to the task handler, which then starts the process(es). /// Invoked when calling `pueue start`. +/// Forward the start message to the task handler, which then starts the process(es). fn start(message: StartMessage, sender: &Sender, state: &SharedState) -> Message { - // Start a group - if let Some(group) = message.group { - let mut state = state.lock().unwrap(); - if !state.groups.contains_key(&group) { + // Check whether a given group exists + if let Some(group) = &message.group { + let state = state.lock().unwrap(); + if !state.groups.contains_key(group) { return create_failure_message(format!("Group {} doesn't exists", group)); } - - // Set the group to paused. - state.groups.insert(group.clone(), true); - state.save(); - - // Notify the task handler to start all (if any) paused tasks in that group. - let paused_tasks = state.task_ids_in_group_with_status(&group, TaskStatus::Paused); - if !paused_tasks.is_empty() { - let start_message = StartMessage { - task_ids: paused_tasks, - group: None, - }; - sender - .send(Message::Start(start_message)) - .expect(SENDER_ERR); - } - - return create_success_message(format!("Group {} started", group)); } sender @@ -248,12 +230,18 @@ fn start(message: StartMessage, sender: &Sender, state: &SharedState) - return create_success_message(response); } - create_success_message("Daemon and all tasks are being resumed.") + if let Some(group) = &message.group { + create_success_message(format!("Group {} is being resumed.", group)) + } else if message.all { + create_success_message("All queues are being resumed.") + } else { + create_success_message("Default queue is being resumed.") + } } +/// Invoked when calling `pueue restart`. /// Create and enqueue tasks from already finished tasks. /// The user can specify to immediately start the newly created tasks. -/// Invoked when calling `pueue restart`. fn restart(message: RestartMessage, sender: &Sender, state: &SharedState) -> Message { let new_status = if message.stashed { TaskStatus::Stashed @@ -287,7 +275,7 @@ fn restart(message: RestartMessage, sender: &Sender, state: &SharedStat if message.start_immediately { let message = StartMessage { task_ids: new_ids, - group: None, + ..Default::default() }; sender.send(Message::Start(message)).expect(SENDER_ERR); } @@ -295,36 +283,17 @@ fn restart(message: RestartMessage, sender: &Sender, state: &SharedStat create_success_message(response) } -/// Forward the pause message to the task handler, which then pauses the process(es). /// Invoked when calling `pueue pause`. +/// Forward the pause message to the task handler, which then pauses groups/tasks/everything. fn pause(message: PauseMessage, sender: &Sender, state: &SharedState) -> Message { - // Pause a specific group - if let Some(group) = message.group { - let mut state = state.lock().unwrap(); - if !state.groups.contains_key(&group) { + // Check whether a given group exists + if let Some(group) = &message.group { + let state = state.lock().unwrap(); + if !state.groups.contains_key(group) { return create_failure_message(format!("Group {} doesn't exists", group)); } - // Set the group to paused. - state.groups.insert(group.clone(), false); - state.save(); - - // Notify the task handler to pause all (if any) running tasks in that group. - let running_tasks = state.task_ids_in_group_with_status(&group, TaskStatus::Running); - if !message.wait && !running_tasks.is_empty() { - let pause_message = PauseMessage { - task_ids: running_tasks, - wait: false, - group: Some(group.clone()), - }; - sender - .send(Message::Pause(pause_message)) - .expect(SENDER_ERR); - } - - return create_success_message(format!("Group {} paused", group)); } - // Forward the pause message to the TaskHandler sender .send(Message::Pause(message.clone())) .expect(SENDER_ERR); @@ -337,12 +306,17 @@ fn pause(message: PauseMessage, sender: &Sender, state: &SharedState) - ); return create_success_message(response); } - - create_success_message("Daemon and all tasks are being paused.") + if let Some(group) = &message.group { + create_success_message(format!("Group {} is being paused.", group)) + } else if message.all { + create_success_message("All queues are being paused.") + } else { + create_success_message("Default queue is being paused.") + } } -/// Forward the kill message to the task handler, which then kills the process. /// Invoked when calling `pueue kill`. +/// Forward the kill message to the task handler, which then kills the process. fn kill(message: KillMessage, sender: &Sender, state: &SharedState) -> Message { sender .send(Message::Kill(message.clone())) @@ -358,12 +332,18 @@ fn kill(message: KillMessage, sender: &Sender, state: &SharedState) -> return create_success_message(response); } - create_success_message("All tasks are being killed.") + if let Some(group) = &message.group { + create_success_message(format!("All tasks of Group {} is being killed.", group)) + } else if message.all { + create_success_message("All tasks are being killed.") + } else { + create_success_message("All tasks of the default queue are being paused.") + } } +/// Invoked when calling `pueue send`. /// The message will be forwarded to the task handler, which then sends the user input to the process. /// In here we only do some error handling. -/// Invoked when calling `pueue send`. fn send(message: SendMessage, sender: &Sender, state: &SharedState) -> Message { // Check whether the task exists and is running. Abort if that's not the case. { @@ -384,9 +364,9 @@ fn send(message: SendMessage, sender: &Sender, state: &SharedState) -> create_success_message("Message is being send to the process.") } +/// Invoked when calling `pueue edit`. /// If a user wants to edit a message, we need to send him the current command. /// Lock the task to prevent execution, before the user has finished editing the command. -/// Invoked when calling `pueue edit`. fn edit_request(task_id: usize, state: &SharedState) -> Message { // Check whether the task exists and is queued/stashed. Abort if that's not the case. let mut state = state.lock().unwrap(); @@ -409,8 +389,8 @@ fn edit_request(task_id: usize, state: &SharedState) -> Message { } } -/// Now we actually update the message with the updated command from the client. /// Invoked after closing the editor on `pueue edit`. +/// Now we actually update the message with the updated command from the client. fn edit(message: EditMessage, state: &SharedState) -> Message { // Check whether the task exists and is locked. Abort if that's not the case let mut state = state.lock().unwrap(); @@ -431,11 +411,11 @@ fn edit(message: EditMessage, state: &SharedState) -> Message { } } +/// Invoked on `pueue groups`. /// Manage groups. /// - Show groups /// - Add group /// - Remove group -/// Invoked on `pueue groups`. fn group(message: GroupMessage, state: &SharedState) -> Message { let mut state = state.lock().unwrap(); @@ -485,8 +465,8 @@ fn group(message: GroupMessage, state: &SharedState) -> Message { create_success_message(group_status) } -/// Remove all failed or done tasks from the state. /// Invoked when calling `pueue clean`. +/// Remove all failed or done tasks from the state. fn clean(state: &SharedState) -> Message { let mut state = state.lock().unwrap(); state.backup(); @@ -502,24 +482,24 @@ fn clean(state: &SharedState) -> Message { create_success_message("All finished tasks have been removed") } +/// Invoked when calling `pueue reset`. /// Forward the reset request to the task handler. /// The handler then kills all children and clears the task queue. -/// Invoked when calling `pueue reset`. fn reset(sender: &Sender, state: &SharedState) -> Message { sender.send(Message::Reset).expect(SENDER_ERR); clean(state); create_success_message("Everything is being reset right now.") } -/// Return the current state. /// Invoked when calling `pueue status`. +/// Return the current state. fn get_status(state: &SharedState) -> Message { let state = state.lock().unwrap().clone(); Message::StatusResponse(state) } -/// Return the current state and the stdou/stderr of all tasks to the client. /// Invoked when calling `pueue log`. +/// Return the current state and the stdou/stderr of all tasks to the client. fn get_log(message: LogRequestMessage, state: &SharedState) -> Message { let state = state.lock().unwrap().clone(); // Return all logs, if no specific task id is specified diff --git a/daemon/task_handler.rs b/daemon/task_handler.rs index df0aeb6..866bf72 100644 --- a/daemon/task_handler.rs +++ b/daemon/task_handler.rs @@ -25,7 +25,6 @@ pub struct TaskHandler { receiver: Receiver, children: BTreeMap, callbacks: Vec, - running: bool, reset: bool, // Some static settings that are extracted from `state.settings` for convenience purposes. pueue_directory: String, @@ -35,11 +34,12 @@ pub struct TaskHandler { impl TaskHandler { pub fn new(state: SharedState, receiver: Receiver) -> Self { - let (running, pueue_directory, callback, pause_on_failure) = { + // Extract some static settings we often need. + // This prevents locking the State all the time. + let (pueue_directory, callback, pause_on_failure) = { let state = state.lock().unwrap(); let settings = &state.settings.daemon; ( - state.running, settings.pueue_directory.clone(), settings.callback.clone(), settings.pause_on_failure, @@ -50,7 +50,6 @@ impl TaskHandler { receiver, children: BTreeMap::new(), callbacks: Vec::new(), - running, reset: false, pueue_directory, callback, @@ -86,7 +85,7 @@ impl TaskHandler { self.check_callbacks(); self.check_stashed(); self.check_failed_dependencies(); - if self.running && !self.reset { + if !self.reset { let _res = self.check_new(); } } @@ -96,15 +95,17 @@ impl TaskHandler { /// Precondition for a task to be started: /// - is in Queued state /// - There are free slots in the task's group + /// - The group is running /// - has all its dependencies in `Done` state pub fn get_next_task_id(&mut self) -> Option { let state = self.state.lock().unwrap(); - // Check how many tasks are running in each group let mut running_tasks_per_group: HashMap = HashMap::new(); + // Create a default group for tasks without an explicit group running_tasks_per_group.insert("default".into(), 0); + // Walk through all tasks and save the number of running tasks by group for (_, task) in state.tasks.iter() { // We are only interested in currently running tasks. if ![TaskStatus::Running, TaskStatus::Paused].contains(&task.status) { @@ -158,9 +159,14 @@ impl TaskHandler { }, } } else { + // The task is assigned to the default queue. + // Check if the default queue is paused and return false if it's not. + if !state.running { + return false; + } + // We can unwrap safely, since default is always created. let running = running_tasks_per_group.get("default").unwrap(); - running < &state.settings.daemon.default_parallel_tasks } }) @@ -284,9 +290,7 @@ impl TaskHandler { // Pause the daemon, if the settings say so if self.pause_on_failure { - self.running = false; - state.running = false; - state.save(); + state.running = false } return; } @@ -336,7 +340,6 @@ impl TaskHandler { if self.reset && self.children.is_empty() { let mut state = self.state.lock().unwrap(); state.reset(); - self.running = true; self.reset = false; } @@ -388,8 +391,7 @@ impl TaskHandler { // Pause the daemon, if the settings say so and some process failed if failed_task_exists && self.pause_on_failure { - self.running = false; - state.running = false; + self.change_running(false); } state.save() @@ -465,6 +467,7 @@ impl TaskHandler { /// 2. Or force the start of specific tasks. fn start(&mut self, message: StartMessage) { // Only start specific tasks + // This is handled separately, since this can also force-spawn processes if !message.task_ids.is_empty() { for id in &message.task_ids { // Continue all children that are simply paused @@ -478,13 +481,42 @@ impl TaskHandler { return; } - // Start the daemon and all paused tasks - let keys: Vec = self.children.keys().cloned().collect(); + // Get the keys of all tasks that should be resumed + // These can either be + // - All running tasks + // - The paused tasks of a specific group + // - The paused tasks of the default queue + let keys: Vec = if message.all { + // Resume all groups and the default queue + info!("Resuming everything"); + let mut state = self.state.lock().unwrap(); + state.set_status_for_all_groups(true); + + self.children.keys().cloned().collect() + } else if let Some(group) = &message.group { + let mut state = self.state.lock().unwrap(); + // Ensure that a given group exists. (Might not happen due to concurrency) + if !state.groups.contains_key(group) { + return + } + // Set the group to running. + state.groups.insert(group.clone(), true); + info!("Resuming group {}", group); + + state.task_ids_in_group_with_stati(&message.group, vec![TaskStatus::Paused]) + } else { + let mut state = self.state.lock().unwrap(); + state.running = true; + info!("Resuming default queue"); + + state.save(); + state.task_ids_in_group_with_stati(&None, vec![TaskStatus::Paused]) + }; + + // Resume all specified paused tasks for id in keys { self.continue_task(id); } - info!("Resuming daemon (start)"); - self.change_running(true); } /// Send a start signal to a paused task to continue execution. @@ -521,23 +553,48 @@ impl TaskHandler { /// 1. Either pause the daemon and all tasks. /// 2. Or only pause specific tasks. fn pause(&mut self, message: PauseMessage) { + // Get the keys of all tasks that should be resumed + // These can either be + // - Specific tasks + // - All running tasks + // - The paused tasks of a group + // - The paused tasks of the default queue // Only pause specific tasks - if !message.task_ids.is_empty() { - for id in &message.task_ids { - self.pause_task(*id); - } - return; - } + let keys: Vec = if !message.task_ids.is_empty() { + message.task_ids + } else if message.all { + // Pause all running tasks + let mut state = self.state.lock().unwrap(); + state.set_status_for_all_groups(false); - // Pause the daemon and all tasks - let keys: Vec = self.children.keys().cloned().collect(); + info!("Pausing everything"); + self.children.keys().cloned().collect() + } else if let Some(group) = &message.group { + // Ensure that a given group exists. (Might not happen due to concurrency) + let mut state = self.state.lock().unwrap(); + if !state.groups.contains_key(group) { + return + } + // Pause a specific group. + state.groups.insert(group.clone(), false); + info!("Pausing group {}", group); + + state.task_ids_in_group_with_stati(&message.group, vec![TaskStatus::Running]) + } else { + // Pause the default queue + let mut state = self.state.lock().unwrap(); + state.running = false; + info!("Pausing default queue"); + + state.task_ids_in_group_with_stati(&None, vec![TaskStatus::Running]) + }; + + // Pause all specified tasks if !message.wait { for id in keys { self.pause_task(id); } } - info!("Pausing daemon"); - self.change_running(false); } /// Pause a specific task. @@ -564,26 +621,50 @@ impl TaskHandler { } } - /// Handle the pause message: - /// 1. Either kill all tasks. - /// 2. Or only kill specific tasks. + /// Handle the kill message: + /// 1. Kill specific tasks. + /// 2. Kill all tasks. + /// 3. Kill all tasks of a specific group. + /// 4. Kill all tasks of the default queue. fn kill(&mut self, message: KillMessage) { + // Get the keys of all tasks that should be resumed + // These can either be + // - Specific tasks + // - All running tasks + // - The paused tasks of a group + // - The paused tasks of the default queue // Only pause specific tasks - if !message.task_ids.is_empty() { - for id in message.task_ids { - self.kill_task(id); - } - return; - } + let task_ids: Vec = if !message.task_ids.is_empty() { + message.task_ids + } else if message.all { + // Pause all running tasks + let mut state = self.state.lock().unwrap(); + state.set_status_for_all_groups(false); - // Pause the daemon and kill all tasks. - if message.all { - info!("Killing all spawned children"); - self.change_running(false); - let keys: Vec = self.children.keys().cloned().collect(); - for id in keys { - self.kill_task(id); + info!("Killing all running tasks"); + self.children.keys().cloned().collect() + } else if let Some(group) = &message.group { + // Ensure that a given group exists. (Might not happen due to concurrency) + let mut state = self.state.lock().unwrap(); + if !state.groups.contains_key(group) { + return } + // Pause a specific group. + state.groups.insert(group.clone(), false); + info!("Killing tasks of group {}", group); + + state.task_ids_in_group_with_stati(&message.group, vec![TaskStatus::Running, TaskStatus::Paused]) + } else { + // Pause the default queue + let mut state = self.state.lock().unwrap(); + state.running = false; + info!("Killing tasks of the default queue"); + + state.task_ids_in_group_with_stati(&None, vec![TaskStatus::Running, TaskStatus::Paused]) + }; + + for task_id in task_ids { + self.kill_task(task_id); } } @@ -637,8 +718,8 @@ impl TaskHandler { /// If all children finished, the state will be completely reset. fn reset(&mut self) { let message = KillMessage { - task_ids: Vec::new(), all: true, + ..Default::default() }; self.kill(message); @@ -649,7 +730,6 @@ impl TaskHandler { fn change_running(&mut self, running: bool) { let mut state = self.state.lock().unwrap(); state.running = running; - self.running = running; state.save(); } diff --git a/shared/message.rs b/shared/message.rs index 25cfc55..5183abf 100644 --- a/shared/message.rs +++ b/shared/message.rs @@ -72,23 +72,27 @@ pub struct RestartMessage { pub stashed: bool, } -#[derive(Clone, Debug, Deserialize, Serialize)] +#[derive(Clone, Debug, Default, Deserialize, Serialize)] pub struct StartMessage { pub task_ids: Vec, pub group: Option, + pub all: bool, } -#[derive(Clone, Debug, Deserialize, Serialize)] +#[derive(Clone, Debug, Default, Deserialize, Serialize)] pub struct PauseMessage { pub task_ids: Vec, - pub wait: bool, pub group: Option, + pub wait: bool, + pub all: bool, } -#[derive(Clone, Debug, Deserialize, Serialize)] +#[derive(Clone, Debug, Default, Deserialize, Serialize)] pub struct KillMessage { - pub all: bool, pub task_ids: Vec, + pub group: Option, + pub default: bool, + pub all: bool, } #[derive(Clone, Debug, Deserialize, Serialize)] diff --git a/shared/state.rs b/shared/state.rs index 848b370..ed2632b 100644 --- a/shared/state.rs +++ b/shared/state.rs @@ -98,21 +98,27 @@ impl State { self.settings.save() } + /// Set the running status for all groups including the default queue + pub fn set_status_for_all_groups(&mut self, status: bool) { + self.running = status; + let keys = self.groups.keys().cloned().collect::>(); + for key in keys { + self.groups.insert(key.into(), status); + } + self.save() + } + /// Get all task ids of a specific group - pub fn task_ids_in_group_with_status( + pub fn task_ids_in_group_with_stati( &mut self, - group: &String, - status: TaskStatus, + group: &Option, + stati: Vec, ) -> Vec { self.tasks .iter() - .filter(|(_, task)| task.status == status) + .filter(|(_, task)| stati.contains(&task.status)) .filter(|(_, task)| { - if let Some(task_group) = &task.group { - return task_group == group; - } - - false + group == &task.group }) .map(|(id, _)| *id) .collect() @@ -161,7 +167,7 @@ impl State { pub fn reset(&mut self) { self.backup(); - self.running = true; + self.set_status_for_all_groups(true); self.max_id = 0; self.tasks = BTreeMap::new(); self.save(); diff --git a/test_helper/groups.sh b/test_helper/groups.sh new file mode 100755 index 0000000..db161ae --- /dev/null +++ b/test_helper/groups.sh @@ -0,0 +1,27 @@ +# Reset daemon +pueue reset +sleep 1 + +pueue group -a hdd +pueue group -a cpu +pueue parallel 1 +pueue parallel -g cpu 1 +pueue parallel -g hdd 1 + +pueue pause +pueue add sleep 6000 +pueue add sleep 6000 +pueue start 1 + +pueue add -g hdd "sleep 5000" +pueue pause -g hdd +pueue add -g hdd "sleep 5000" +pueue add -g hdd "sleep 5000" +pueue start 4 + + +pueue pause -g cpu +pueue add -g cpu "sleep 5000" +pueue add -g cpu "sleep 5000" +pueue add -g cpu "sleep 5000" +pueue start -g cpu