Add -d to 'kill' and improve -a behavior for 'start', 'pause' and 'kill'

This commit is contained in:
Arne Beer 2020-05-14 13:51:15 +02:00
parent ecfdbcdba1
commit 9436819418
10 changed files with 286 additions and 161 deletions

View file

@ -12,6 +12,9 @@
**Improvements:** **Improvements:**
- Environment variable capture. Tasks will now start with the variables of the environment `pueue add` is being called in. - Environment variable capture. Tasks will now start with the variables of the environment `pueue add` is being called in.
- `follow` (previously `show`) now also reads directly from disk, if `read_local_logs` is set to `true`. - `follow` (previously `show`) now also reads directly from disk, if `read_local_logs` is set to `true`.
- Add new flags `--default` to `kill`, `start` and `pause`. With this flag only tasks in the default queue will be affected.
- The `--all` flag now affects all groups AND the default queue for `kill`, `start` and `pause`.
- Added `--group` flag for `status`. This will only print tasks of a specific group.
# v0.4.0 # v0.4.0
**Features:** **Features:**

View file

@ -88,16 +88,23 @@ pub enum SubCommand {
delay_until: Option<DateTime<Local>>, delay_until: Option<DateTime<Local>>,
}, },
/// Wake the daemon from its paused state and continue all paused tasks. /// Resume operation of specific tasks or groups of tasks.
/// Can also be used to resume or start specific tasks or groups. /// Without any parameters, resumes the default queue and all its tasks.
/// Can also be used to force specific tasks to start.
#[structopt(verbatim_doc_comment)]
Start { Start {
/// Enforce starting these tasks. /// Enforce starting these tasks. Paused tasks will be started again.
/// This doesn't affect any other tasks and can be on a paused daemon or group. /// This doesn't affect anything other than these tasks.
task_ids: Vec<usize>, task_ids: Vec<usize>,
/// Start a specific group. Any task_ids will be ignored when using this flag. /// Start a specific group and all paused tasks in it.
#[structopt(name = "group", short, long)] #[structopt(short, long, group("start"))]
group: Option<String>, group: Option<String>,
/// Start everything (Default queue and all groups)!
/// All groups will be set to `running` and all paused tasks will be resumed.
#[structopt(short, long, group("start"))]
all: bool,
}, },
/// Restart task(s). /// Restart task(s).
@ -116,34 +123,46 @@ pub enum SubCommand {
stashed: bool, stashed: bool,
}, },
/// Pause the daemon and all running tasks. /// Pause either running tasks or specific groups of tasks.
/// A paused daemon won't start any new tasks. /// Without any parameters, pauses the default queue and all its tasks.
/// Daemon, groups and tasks can be resumed with `start` /// A paused queue (group) won't start any new tasks.
/// Can also be used to pause a specific group or specific tasks. /// Everything can be resumed with `start`.
#[structopt(verbatim_doc_comment)]
Pause { Pause {
/// Let any running tasks finish by themselves, when pausing the daemon or a group.
#[structopt(short, long, group("pause"), conflicts_with("task_ids"))]
wait: bool,
/// Pause these specific tasks. /// Pause these specific tasks.
/// Doesn't affect the daemon, groups or any other tasks. /// Doesn't affect the default queue, groups or any other tasks.
#[structopt(group("pause"))]
task_ids: Vec<usize>, task_ids: Vec<usize>,
/// Pause a specific group. Any task_ids will be ignored when using this flag. /// Pause a specific group.
#[structopt(name = "group", short, long)] #[structopt(short, long, group("pause"))]
group: Option<String>, group: Option<String>,
},
/// Kill either all or only specific running tasks. /// Pause everything (Default queue and all groups)!
Kill { #[structopt(short, long, group("pause"))]
/// Kill all running tasks, this also pauses the daemon.
#[structopt(short, long, group("kill"), conflicts_with("task_ids"))]
all: bool, all: bool,
/// Don't pause already running tasks and let them finish by themselves,
/// when pausing with `default`, `all` or `group`.
#[structopt(short, long, group("pause"))]
wait: bool,
},
/// Kill specific running tasks or various groups of tasks.
Kill {
/// The tasks that should be killed. /// The tasks that should be killed.
#[structopt(group("kill"))]
task_ids: Vec<usize>, task_ids: Vec<usize>,
/// Kill all running tasks in the default queue. Pause the default queue.
#[structopt(short, long, group("kill"))]
default: bool,
/// Kill all running tasks in a group. Pauses the group.
#[structopt(short, long, group("kill"))]
group: Option<String>,
/// Kill ALL running tasks. This also pauses everything
#[structopt(short, long, group("kill"))]
all: bool,
}, },
/// Send something to a task. Useful for sending confirmations ('y\n'). /// Send something to a task. Useful for sending confirmations ('y\n').
@ -156,7 +175,8 @@ pub enum SubCommand {
}, },
/// Edit the command or path of a stashed or queued task. /// Edit the command or path of a stashed or queued task.
/// By default this edits the command of the task /// This edits the command of the task by default.
#[structopt(verbatim_doc_comment)]
Edit { Edit {
/// The id of the task. /// The id of the task.
task_id: usize, task_id: usize,

View file

@ -63,10 +63,11 @@ pub fn get_message_from_opt(opt: &Opt, settings: &Settings) -> Result<Message> {
}; };
Ok(Message::Enqueue(message)) Ok(Message::Enqueue(message))
} }
SubCommand::Start { task_ids, group } => { SubCommand::Start { task_ids, group, all } => {
let message = StartMessage { let message = StartMessage {
task_ids: task_ids.clone(), task_ids: task_ids.clone(),
group: group.clone(), group: group.clone(),
all: *all,
}; };
Ok(Message::Start(message)) Ok(Message::Start(message))
} }
@ -83,21 +84,25 @@ pub fn get_message_from_opt(opt: &Opt, settings: &Settings) -> Result<Message> {
Ok(Message::Restart(message)) Ok(Message::Restart(message))
} }
SubCommand::Pause { SubCommand::Pause {
wait,
task_ids, task_ids,
group, group,
wait,
all,
} => { } => {
let message = PauseMessage { let message = PauseMessage {
wait: *wait,
task_ids: task_ids.clone(), task_ids: task_ids.clone(),
group: group.clone(), group: group.clone(),
wait: *wait,
all: *all,
}; };
Ok(Message::Pause(message)) Ok(Message::Pause(message))
} }
SubCommand::Kill { all, task_ids } => { SubCommand::Kill { task_ids, group, default, all} => {
let message = KillMessage { let message = KillMessage {
all: *all,
task_ids: task_ids.clone(), task_ids: task_ids.clone(),
group: group.clone(),
default: *default,
all: *all,
}; };
Ok(Message::Kill(message)) Ok(Message::Kill(message))
} }

View file

@ -41,7 +41,7 @@ pub fn print_state(state: State, cli_command: &SubCommand) {
return; return;
} }
println!("{}", get_daemon_headline(&state)); println!("{}", get_default_headline(&state));
// Early exit and hint if there are no tasks in the queue // Early exit and hint if there are no tasks in the queue
if state.tasks.is_empty() { if state.tasks.is_empty() {

View file

@ -20,7 +20,7 @@ pub fn has_special_columns(tasks: &BTreeMap<usize, Task>) -> (bool, bool) {
} }
/// Return a nicely formatted headline that's displayed at the start of `pueue status` /// Return a nicely formatted headline that's displayed at the start of `pueue status`
pub fn get_daemon_headline(state: &State) -> String { pub fn get_default_headline(state: &State) -> String {
// Print the current daemon state. // Print the current daemon state.
let daemon_status_text = if state.running { let daemon_status_text = if state.running {
style("running").with(Color::Green) style("running").with(Color::Green)
@ -30,7 +30,7 @@ pub fn get_daemon_headline(state: &State) -> String {
let parallel = state.settings.daemon.default_parallel_tasks; let parallel = state.settings.daemon.default_parallel_tasks;
format!( format!(
"{} ({} parallel): {}", "{} ({} parallel): {}",
style("Daemon status").attribute(Attribute::Bold), style("Default queue").attribute(Attribute::Bold),
parallel, parallel,
daemon_status_text daemon_status_text
) )

View file

@ -36,9 +36,9 @@ pub fn handle_message(message: Message, sender: &Sender<Message>, state: &Shared
} }
} }
/// Invoked when calling `pueue add`.
/// Queues a new task to the state. /// Queues a new task to the state.
/// If the start_immediately flag is set, send a StartMessage to the task handler. /// If the start_immediately flag is set, send a StartMessage to the task handler.
/// Invoked when calling `pueue add`.
fn add_task(message: AddMessage, sender: &Sender<Message>, state: &SharedState) -> Message { fn add_task(message: AddMessage, sender: &Sender<Message>, state: &SharedState) -> Message {
let starting_status = if message.stashed || message.enqueue_at.is_some() { let starting_status = if message.stashed || message.enqueue_at.is_some() {
TaskStatus::Stashed TaskStatus::Stashed
@ -89,7 +89,7 @@ fn add_task(message: AddMessage, sender: &Sender<Message>, state: &SharedState)
sender sender
.send(Message::Start(StartMessage { .send(Message::Start(StartMessage {
task_ids: vec![task_id], task_ids: vec![task_id],
group: None, ..Default::default()
})) }))
.expect(SENDER_ERR); .expect(SENDER_ERR);
} }
@ -108,9 +108,9 @@ fn add_task(message: AddMessage, sender: &Sender<Message>, state: &SharedState)
create_success_message(message) create_success_message(message)
} }
/// Invoked when calling `pueue remove`.
/// Remove tasks from the queue. /// Remove tasks from the queue.
/// We have to ensure that those tasks aren't running! /// We have to ensure that those tasks aren't running!
/// Invoked when calling `pueue remove`.
fn remove(task_ids: Vec<usize>, state: &SharedState) -> Message { fn remove(task_ids: Vec<usize>, state: &SharedState) -> Message {
let mut state = state.lock().unwrap(); let mut state = state.lock().unwrap();
let statuses = vec![TaskStatus::Running, TaskStatus::Paused]; let statuses = vec![TaskStatus::Running, TaskStatus::Paused];
@ -126,9 +126,9 @@ fn remove(task_ids: Vec<usize>, state: &SharedState) -> Message {
create_success_message(response) create_success_message(response)
} }
/// Invoked when calling `pueue switch`.
/// Switch the position of two tasks in the upcoming queue. /// Switch the position of two tasks in the upcoming queue.
/// We have to ensure that those tasks are either `Queued` or `Stashed` /// We have to ensure that those tasks are either `Queued` or `Stashed`
/// Invoked when calling `pueue switch`.
fn switch(message: SwitchMessage, state: &SharedState) -> Message { fn switch(message: SwitchMessage, state: &SharedState) -> Message {
let task_ids = vec![message.task_id_1, message.task_id_2]; let task_ids = vec![message.task_id_1, message.task_id_2];
let statuses = vec![TaskStatus::Queued, TaskStatus::Stashed]; let statuses = vec![TaskStatus::Queued, TaskStatus::Stashed];
@ -154,9 +154,9 @@ fn switch(message: SwitchMessage, state: &SharedState) -> Message {
create_success_message("Tasks have been switched") create_success_message("Tasks have been switched")
} }
/// Invoked when calling `pueue stash`.
/// Stash specific queued tasks. /// Stash specific queued tasks.
/// They won't be executed until they're enqueued or explicitly started. /// Invoked when calling `pueue stash`.
/// Invoked when calling `pueue stash`.
fn stash(task_ids: Vec<usize>, state: &SharedState) -> Message { fn stash(task_ids: Vec<usize>, state: &SharedState) -> Message {
let (matching, mismatching) = { let (matching, mismatching) = {
let mut state = state.lock().unwrap(); let mut state = state.lock().unwrap();
@ -175,8 +175,8 @@ fn stash(task_ids: Vec<usize>, state: &SharedState) -> Message {
create_success_message(response) create_success_message(response)
} }
/// Enqueue specific stashed tasks.
/// Invoked when calling `pueue enqueue`. /// Invoked when calling `pueue enqueue`.
/// Enqueue specific stashed tasks.
fn enqueue(message: EnqueueMessage, state: &SharedState) -> Message { fn enqueue(message: EnqueueMessage, state: &SharedState) -> Message {
let (matching, mismatching) = { let (matching, mismatching) = {
let mut state = state.lock().unwrap(); let mut state = state.lock().unwrap();
@ -206,33 +206,15 @@ fn enqueue(message: EnqueueMessage, state: &SharedState) -> Message {
create_success_message(response) create_success_message(response)
} }
/// Forward the start message to the task handler, which then starts the process(es).
/// Invoked when calling `pueue start`. /// Invoked when calling `pueue start`.
/// Forward the start message to the task handler, which then starts the process(es).
fn start(message: StartMessage, sender: &Sender<Message>, state: &SharedState) -> Message { fn start(message: StartMessage, sender: &Sender<Message>, state: &SharedState) -> Message {
// Start a group // Check whether a given group exists
if let Some(group) = message.group { if let Some(group) = &message.group {
let mut state = state.lock().unwrap(); let state = state.lock().unwrap();
if !state.groups.contains_key(&group) { if !state.groups.contains_key(group) {
return create_failure_message(format!("Group {} doesn't exists", group)); return create_failure_message(format!("Group {} doesn't exists", group));
} }
// Set the group to paused.
state.groups.insert(group.clone(), true);
state.save();
// Notify the task handler to start all (if any) paused tasks in that group.
let paused_tasks = state.task_ids_in_group_with_status(&group, TaskStatus::Paused);
if !paused_tasks.is_empty() {
let start_message = StartMessage {
task_ids: paused_tasks,
group: None,
};
sender
.send(Message::Start(start_message))
.expect(SENDER_ERR);
}
return create_success_message(format!("Group {} started", group));
} }
sender sender
@ -248,12 +230,18 @@ fn start(message: StartMessage, sender: &Sender<Message>, state: &SharedState) -
return create_success_message(response); return create_success_message(response);
} }
create_success_message("Daemon and all tasks are being resumed.") if let Some(group) = &message.group {
create_success_message(format!("Group {} is being resumed.", group))
} else if message.all {
create_success_message("All queues are being resumed.")
} else {
create_success_message("Default queue is being resumed.")
}
} }
/// Invoked when calling `pueue restart`.
/// Create and enqueue tasks from already finished tasks. /// Create and enqueue tasks from already finished tasks.
/// The user can specify to immediately start the newly created tasks. /// The user can specify to immediately start the newly created tasks.
/// Invoked when calling `pueue restart`.
fn restart(message: RestartMessage, sender: &Sender<Message>, state: &SharedState) -> Message { fn restart(message: RestartMessage, sender: &Sender<Message>, state: &SharedState) -> Message {
let new_status = if message.stashed { let new_status = if message.stashed {
TaskStatus::Stashed TaskStatus::Stashed
@ -287,7 +275,7 @@ fn restart(message: RestartMessage, sender: &Sender<Message>, state: &SharedStat
if message.start_immediately { if message.start_immediately {
let message = StartMessage { let message = StartMessage {
task_ids: new_ids, task_ids: new_ids,
group: None, ..Default::default()
}; };
sender.send(Message::Start(message)).expect(SENDER_ERR); sender.send(Message::Start(message)).expect(SENDER_ERR);
} }
@ -295,36 +283,17 @@ fn restart(message: RestartMessage, sender: &Sender<Message>, state: &SharedStat
create_success_message(response) create_success_message(response)
} }
/// Forward the pause message to the task handler, which then pauses the process(es).
/// Invoked when calling `pueue pause`. /// Invoked when calling `pueue pause`.
/// Forward the pause message to the task handler, which then pauses groups/tasks/everything.
fn pause(message: PauseMessage, sender: &Sender<Message>, state: &SharedState) -> Message { fn pause(message: PauseMessage, sender: &Sender<Message>, state: &SharedState) -> Message {
// Pause a specific group // Check whether a given group exists
if let Some(group) = message.group { if let Some(group) = &message.group {
let mut state = state.lock().unwrap(); let state = state.lock().unwrap();
if !state.groups.contains_key(&group) { if !state.groups.contains_key(group) {
return create_failure_message(format!("Group {} doesn't exists", group)); return create_failure_message(format!("Group {} doesn't exists", group));
} }
// Set the group to paused.
state.groups.insert(group.clone(), false);
state.save();
// Notify the task handler to pause all (if any) running tasks in that group.
let running_tasks = state.task_ids_in_group_with_status(&group, TaskStatus::Running);
if !message.wait && !running_tasks.is_empty() {
let pause_message = PauseMessage {
task_ids: running_tasks,
wait: false,
group: Some(group.clone()),
};
sender
.send(Message::Pause(pause_message))
.expect(SENDER_ERR);
}
return create_success_message(format!("Group {} paused", group));
} }
// Forward the pause message to the TaskHandler
sender sender
.send(Message::Pause(message.clone())) .send(Message::Pause(message.clone()))
.expect(SENDER_ERR); .expect(SENDER_ERR);
@ -337,12 +306,17 @@ fn pause(message: PauseMessage, sender: &Sender<Message>, state: &SharedState) -
); );
return create_success_message(response); return create_success_message(response);
} }
if let Some(group) = &message.group {
create_success_message("Daemon and all tasks are being paused.") create_success_message(format!("Group {} is being paused.", group))
} else if message.all {
create_success_message("All queues are being paused.")
} else {
create_success_message("Default queue is being paused.")
}
} }
/// Forward the kill message to the task handler, which then kills the process.
/// Invoked when calling `pueue kill`. /// Invoked when calling `pueue kill`.
/// Forward the kill message to the task handler, which then kills the process.
fn kill(message: KillMessage, sender: &Sender<Message>, state: &SharedState) -> Message { fn kill(message: KillMessage, sender: &Sender<Message>, state: &SharedState) -> Message {
sender sender
.send(Message::Kill(message.clone())) .send(Message::Kill(message.clone()))
@ -358,12 +332,18 @@ fn kill(message: KillMessage, sender: &Sender<Message>, state: &SharedState) ->
return create_success_message(response); return create_success_message(response);
} }
create_success_message("All tasks are being killed.") if let Some(group) = &message.group {
create_success_message(format!("All tasks of Group {} is being killed.", group))
} else if message.all {
create_success_message("All tasks are being killed.")
} else {
create_success_message("All tasks of the default queue are being paused.")
}
} }
/// Invoked when calling `pueue send`.
/// The message will be forwarded to the task handler, which then sends the user input to the process. /// The message will be forwarded to the task handler, which then sends the user input to the process.
/// In here we only do some error handling. /// In here we only do some error handling.
/// Invoked when calling `pueue send`.
fn send(message: SendMessage, sender: &Sender<Message>, state: &SharedState) -> Message { fn send(message: SendMessage, sender: &Sender<Message>, state: &SharedState) -> Message {
// Check whether the task exists and is running. Abort if that's not the case. // Check whether the task exists and is running. Abort if that's not the case.
{ {
@ -384,9 +364,9 @@ fn send(message: SendMessage, sender: &Sender<Message>, state: &SharedState) ->
create_success_message("Message is being send to the process.") create_success_message("Message is being send to the process.")
} }
/// Invoked when calling `pueue edit`.
/// If a user wants to edit a message, we need to send him the current command. /// If a user wants to edit a message, we need to send him the current command.
/// Lock the task to prevent execution, before the user has finished editing the command. /// Lock the task to prevent execution, before the user has finished editing the command.
/// Invoked when calling `pueue edit`.
fn edit_request(task_id: usize, state: &SharedState) -> Message { fn edit_request(task_id: usize, state: &SharedState) -> Message {
// Check whether the task exists and is queued/stashed. Abort if that's not the case. // Check whether the task exists and is queued/stashed. Abort if that's not the case.
let mut state = state.lock().unwrap(); let mut state = state.lock().unwrap();
@ -409,8 +389,8 @@ fn edit_request(task_id: usize, state: &SharedState) -> Message {
} }
} }
/// Now we actually update the message with the updated command from the client.
/// Invoked after closing the editor on `pueue edit`. /// Invoked after closing the editor on `pueue edit`.
/// Now we actually update the message with the updated command from the client.
fn edit(message: EditMessage, state: &SharedState) -> Message { fn edit(message: EditMessage, state: &SharedState) -> Message {
// Check whether the task exists and is locked. Abort if that's not the case // Check whether the task exists and is locked. Abort if that's not the case
let mut state = state.lock().unwrap(); let mut state = state.lock().unwrap();
@ -431,11 +411,11 @@ fn edit(message: EditMessage, state: &SharedState) -> Message {
} }
} }
/// Invoked on `pueue groups`.
/// Manage groups. /// Manage groups.
/// - Show groups /// - Show groups
/// - Add group /// - Add group
/// - Remove group /// - Remove group
/// Invoked on `pueue groups`.
fn group(message: GroupMessage, state: &SharedState) -> Message { fn group(message: GroupMessage, state: &SharedState) -> Message {
let mut state = state.lock().unwrap(); let mut state = state.lock().unwrap();
@ -485,8 +465,8 @@ fn group(message: GroupMessage, state: &SharedState) -> Message {
create_success_message(group_status) create_success_message(group_status)
} }
/// Remove all failed or done tasks from the state.
/// Invoked when calling `pueue clean`. /// Invoked when calling `pueue clean`.
/// Remove all failed or done tasks from the state.
fn clean(state: &SharedState) -> Message { fn clean(state: &SharedState) -> Message {
let mut state = state.lock().unwrap(); let mut state = state.lock().unwrap();
state.backup(); state.backup();
@ -502,24 +482,24 @@ fn clean(state: &SharedState) -> Message {
create_success_message("All finished tasks have been removed") create_success_message("All finished tasks have been removed")
} }
/// Invoked when calling `pueue reset`.
/// Forward the reset request to the task handler. /// Forward the reset request to the task handler.
/// The handler then kills all children and clears the task queue. /// The handler then kills all children and clears the task queue.
/// Invoked when calling `pueue reset`.
fn reset(sender: &Sender<Message>, state: &SharedState) -> Message { fn reset(sender: &Sender<Message>, state: &SharedState) -> Message {
sender.send(Message::Reset).expect(SENDER_ERR); sender.send(Message::Reset).expect(SENDER_ERR);
clean(state); clean(state);
create_success_message("Everything is being reset right now.") create_success_message("Everything is being reset right now.")
} }
/// Return the current state.
/// Invoked when calling `pueue status`. /// Invoked when calling `pueue status`.
/// Return the current state.
fn get_status(state: &SharedState) -> Message { fn get_status(state: &SharedState) -> Message {
let state = state.lock().unwrap().clone(); let state = state.lock().unwrap().clone();
Message::StatusResponse(state) Message::StatusResponse(state)
} }
/// Return the current state and the stdout/stderr of all tasks to the client.
/// Invoked when calling `pueue log`. /// Invoked when calling `pueue log`.
/// Return the current state and the stdou/stderr of all tasks to the client.
fn get_log(message: LogRequestMessage, state: &SharedState) -> Message { fn get_log(message: LogRequestMessage, state: &SharedState) -> Message {
let state = state.lock().unwrap().clone(); let state = state.lock().unwrap().clone();
// Return all logs, if no specific task id is specified // Return all logs, if no specific task id is specified

View file

@ -25,7 +25,6 @@ pub struct TaskHandler {
receiver: Receiver<Message>, receiver: Receiver<Message>,
children: BTreeMap<usize, Child>, children: BTreeMap<usize, Child>,
callbacks: Vec<Child>, callbacks: Vec<Child>,
running: bool,
reset: bool, reset: bool,
// Some static settings that are extracted from `state.settings` for convenience purposes. // Some static settings that are extracted from `state.settings` for convenience purposes.
pueue_directory: String, pueue_directory: String,
@ -35,11 +34,12 @@ pub struct TaskHandler {
impl TaskHandler { impl TaskHandler {
pub fn new(state: SharedState, receiver: Receiver<Message>) -> Self { pub fn new(state: SharedState, receiver: Receiver<Message>) -> Self {
let (running, pueue_directory, callback, pause_on_failure) = { // Extract some static settings we often need.
// This prevents locking the State all the time.
let (pueue_directory, callback, pause_on_failure) = {
let state = state.lock().unwrap(); let state = state.lock().unwrap();
let settings = &state.settings.daemon; let settings = &state.settings.daemon;
( (
state.running,
settings.pueue_directory.clone(), settings.pueue_directory.clone(),
settings.callback.clone(), settings.callback.clone(),
settings.pause_on_failure, settings.pause_on_failure,
@ -50,7 +50,6 @@ impl TaskHandler {
receiver, receiver,
children: BTreeMap::new(), children: BTreeMap::new(),
callbacks: Vec::new(), callbacks: Vec::new(),
running,
reset: false, reset: false,
pueue_directory, pueue_directory,
callback, callback,
@ -86,7 +85,7 @@ impl TaskHandler {
self.check_callbacks(); self.check_callbacks();
self.check_stashed(); self.check_stashed();
self.check_failed_dependencies(); self.check_failed_dependencies();
if self.running && !self.reset { if !self.reset {
let _res = self.check_new(); let _res = self.check_new();
} }
} }
@ -96,15 +95,17 @@ impl TaskHandler {
/// Precondition for a task to be started: /// Precondition for a task to be started:
/// - is in Queued state /// - is in Queued state
/// - There are free slots in the task's group /// - There are free slots in the task's group
/// - The group is running
/// - has all its dependencies in `Done` state /// - has all its dependencies in `Done` state
pub fn get_next_task_id(&mut self) -> Option<usize> { pub fn get_next_task_id(&mut self) -> Option<usize> {
let state = self.state.lock().unwrap(); let state = self.state.lock().unwrap();
// Check how many tasks are running in each group // Check how many tasks are running in each group
let mut running_tasks_per_group: HashMap<String, usize> = HashMap::new(); let mut running_tasks_per_group: HashMap<String, usize> = HashMap::new();
// Create a default group for tasks without an explicit group // Create a default group for tasks without an explicit group
running_tasks_per_group.insert("default".into(), 0); running_tasks_per_group.insert("default".into(), 0);
// Walk through all tasks and save the number of running tasks by group
for (_, task) in state.tasks.iter() { for (_, task) in state.tasks.iter() {
// We are only interested in currently running tasks. // We are only interested in currently running tasks.
if ![TaskStatus::Running, TaskStatus::Paused].contains(&task.status) { if ![TaskStatus::Running, TaskStatus::Paused].contains(&task.status) {
@ -158,9 +159,14 @@ impl TaskHandler {
}, },
} }
} else { } else {
// The task is assigned to the default queue.
// Check if the default queue is paused and return false if it's not.
if !state.running {
return false;
}
// We can unwrap safely, since default is always created. // We can unwrap safely, since default is always created.
let running = running_tasks_per_group.get("default").unwrap(); let running = running_tasks_per_group.get("default").unwrap();
running < &state.settings.daemon.default_parallel_tasks running < &state.settings.daemon.default_parallel_tasks
} }
}) })
@ -284,9 +290,7 @@ impl TaskHandler {
// Pause the daemon, if the settings say so // Pause the daemon, if the settings say so
if self.pause_on_failure { if self.pause_on_failure {
self.running = false; state.running = false
state.running = false;
state.save();
} }
return; return;
} }
@ -336,7 +340,6 @@ impl TaskHandler {
if self.reset && self.children.is_empty() { if self.reset && self.children.is_empty() {
let mut state = self.state.lock().unwrap(); let mut state = self.state.lock().unwrap();
state.reset(); state.reset();
self.running = true;
self.reset = false; self.reset = false;
} }
@ -388,8 +391,7 @@ impl TaskHandler {
// Pause the daemon, if the settings say so and some process failed // Pause the daemon, if the settings say so and some process failed
if failed_task_exists && self.pause_on_failure { if failed_task_exists && self.pause_on_failure {
self.running = false; self.change_running(false);
state.running = false;
} }
state.save() state.save()
@ -465,6 +467,7 @@ impl TaskHandler {
/// 2. Or force the start of specific tasks. /// 2. Or force the start of specific tasks.
fn start(&mut self, message: StartMessage) { fn start(&mut self, message: StartMessage) {
// Only start specific tasks // Only start specific tasks
// This is handled separately, since this can also force-spawn processes
if !message.task_ids.is_empty() { if !message.task_ids.is_empty() {
for id in &message.task_ids { for id in &message.task_ids {
// Continue all children that are simply paused // Continue all children that are simply paused
@ -478,13 +481,42 @@ impl TaskHandler {
return; return;
} }
// Start the daemon and all paused tasks // Get the keys of all tasks that should be resumed
let keys: Vec<usize> = self.children.keys().cloned().collect(); // These can either be
// - All running tasks
// - The paused tasks of a specific group
// - The paused tasks of the default queue
let keys: Vec<usize> = if message.all {
// Resume all groups and the default queue
info!("Resuming everything");
let mut state = self.state.lock().unwrap();
state.set_status_for_all_groups(true);
self.children.keys().cloned().collect()
} else if let Some(group) = &message.group {
let mut state = self.state.lock().unwrap();
// Ensure that a given group exists. (Might not happen due to concurrency)
if !state.groups.contains_key(group) {
return
}
// Set the group to running.
state.groups.insert(group.clone(), true);
info!("Resuming group {}", group);
state.task_ids_in_group_with_stati(&message.group, vec![TaskStatus::Paused])
} else {
let mut state = self.state.lock().unwrap();
state.running = true;
info!("Resuming default queue");
state.save();
state.task_ids_in_group_with_stati(&None, vec![TaskStatus::Paused])
};
// Resume all specified paused tasks
for id in keys { for id in keys {
self.continue_task(id); self.continue_task(id);
} }
info!("Resuming daemon (start)");
self.change_running(true);
} }
/// Send a start signal to a paused task to continue execution. /// Send a start signal to a paused task to continue execution.
@ -521,23 +553,48 @@ impl TaskHandler {
/// 1. Either pause the daemon and all tasks. /// 1. Either pause the daemon and all tasks.
/// 2. Or only pause specific tasks. /// 2. Or only pause specific tasks.
fn pause(&mut self, message: PauseMessage) { fn pause(&mut self, message: PauseMessage) {
// Get the keys of all tasks that should be resumed
// These can either be
// - Specific tasks
// - All running tasks
// - The paused tasks of a group
// - The paused tasks of the default queue
// Only pause specific tasks // Only pause specific tasks
if !message.task_ids.is_empty() { let keys: Vec<usize> = if !message.task_ids.is_empty() {
for id in &message.task_ids { message.task_ids
self.pause_task(*id); } else if message.all {
} // Pause all running tasks
return; let mut state = self.state.lock().unwrap();
} state.set_status_for_all_groups(false);
// Pause the daemon and all tasks info!("Pausing everything");
let keys: Vec<usize> = self.children.keys().cloned().collect(); self.children.keys().cloned().collect()
} else if let Some(group) = &message.group {
// Ensure that a given group exists. (Might not happen due to concurrency)
let mut state = self.state.lock().unwrap();
if !state.groups.contains_key(group) {
return
}
// Pause a specific group.
state.groups.insert(group.clone(), false);
info!("Pausing group {}", group);
state.task_ids_in_group_with_stati(&message.group, vec![TaskStatus::Running])
} else {
// Pause the default queue
let mut state = self.state.lock().unwrap();
state.running = false;
info!("Pausing default queue");
state.task_ids_in_group_with_stati(&None, vec![TaskStatus::Running])
};
// Pause all specified tasks
if !message.wait { if !message.wait {
for id in keys { for id in keys {
self.pause_task(id); self.pause_task(id);
} }
} }
info!("Pausing daemon");
self.change_running(false);
} }
/// Pause a specific task. /// Pause a specific task.
@ -564,26 +621,50 @@ impl TaskHandler {
} }
} }
/// Handle the pause message: /// Handle the kill message:
/// 1. Either kill all tasks. /// 1. Kill specific tasks.
/// 2. Or only kill specific tasks. /// 2. Kill all tasks.
/// 3. Kill all tasks of a specific group.
/// 4. Kill all tasks of the default queue.
fn kill(&mut self, message: KillMessage) { fn kill(&mut self, message: KillMessage) {
// Get the keys of all tasks that should be killed
// These can either be
// - Specific tasks
// - All running tasks
// - The running/paused tasks of a group
// - The running/paused tasks of the default queue
// Only pause specific tasks // Only pause specific tasks
if !message.task_ids.is_empty() { let task_ids: Vec<usize> = if !message.task_ids.is_empty() {
for id in message.task_ids { message.task_ids
self.kill_task(id); } else if message.all {
} // Pause all running tasks
return; let mut state = self.state.lock().unwrap();
} state.set_status_for_all_groups(false);
// Pause the daemon and kill all tasks. info!("Killing all running tasks");
if message.all { self.children.keys().cloned().collect()
info!("Killing all spawned children"); } else if let Some(group) = &message.group {
self.change_running(false); // Ensure that a given group exists. (Might not happen due to concurrency)
let keys: Vec<usize> = self.children.keys().cloned().collect(); let mut state = self.state.lock().unwrap();
for id in keys { if !state.groups.contains_key(group) {
self.kill_task(id); return
} }
// Pause a specific group.
state.groups.insert(group.clone(), false);
info!("Killing tasks of group {}", group);
state.task_ids_in_group_with_stati(&message.group, vec![TaskStatus::Running, TaskStatus::Paused])
} else {
// Pause the default queue
let mut state = self.state.lock().unwrap();
state.running = false;
info!("Killing tasks of the default queue");
state.task_ids_in_group_with_stati(&None, vec![TaskStatus::Running, TaskStatus::Paused])
};
for task_id in task_ids {
self.kill_task(task_id);
} }
} }
@ -637,8 +718,8 @@ impl TaskHandler {
/// If all children finished, the state will be completely reset. /// If all children finished, the state will be completely reset.
fn reset(&mut self) { fn reset(&mut self) {
let message = KillMessage { let message = KillMessage {
task_ids: Vec::new(),
all: true, all: true,
..Default::default()
}; };
self.kill(message); self.kill(message);
@ -649,7 +730,6 @@ impl TaskHandler {
fn change_running(&mut self, running: bool) { fn change_running(&mut self, running: bool) {
let mut state = self.state.lock().unwrap(); let mut state = self.state.lock().unwrap();
state.running = running; state.running = running;
self.running = running;
state.save(); state.save();
} }

View file

@ -72,23 +72,27 @@ pub struct RestartMessage {
pub stashed: bool, pub stashed: bool,
} }
#[derive(Clone, Debug, Deserialize, Serialize)] #[derive(Clone, Debug, Default, Deserialize, Serialize)]
pub struct StartMessage { pub struct StartMessage {
pub task_ids: Vec<usize>, pub task_ids: Vec<usize>,
pub group: Option<String>, pub group: Option<String>,
pub all: bool,
} }
#[derive(Clone, Debug, Deserialize, Serialize)] #[derive(Clone, Debug, Default, Deserialize, Serialize)]
pub struct PauseMessage { pub struct PauseMessage {
pub task_ids: Vec<usize>, pub task_ids: Vec<usize>,
pub wait: bool,
pub group: Option<String>, pub group: Option<String>,
pub wait: bool,
pub all: bool,
} }
#[derive(Clone, Debug, Deserialize, Serialize)] #[derive(Clone, Debug, Default, Deserialize, Serialize)]
pub struct KillMessage { pub struct KillMessage {
pub all: bool,
pub task_ids: Vec<usize>, pub task_ids: Vec<usize>,
pub group: Option<String>,
pub default: bool,
pub all: bool,
} }
#[derive(Clone, Debug, Deserialize, Serialize)] #[derive(Clone, Debug, Deserialize, Serialize)]

View file

@ -98,21 +98,27 @@ impl State {
self.settings.save() self.settings.save()
} }
/// Set the running status for all groups, including the default queue.
///
/// `status` is `true` for "running" and `false` for "paused".
/// The updated state is persisted to disk via `save()` before returning.
pub fn set_status_for_all_groups(&mut self, status: bool) {
    // The default queue's status lives in its own dedicated field.
    self.running = status;
    // Update every named group in place. This avoids the previous
    // clone-all-keys-and-reinsert round trip over the map.
    for group_status in self.groups.values_mut() {
        *group_status = status;
    }
    self.save()
}
/// Get all task ids of a specific group /// Get all task ids of a specific group
pub fn task_ids_in_group_with_status( pub fn task_ids_in_group_with_stati(
&mut self, &mut self,
group: &String, group: &Option<String>,
status: TaskStatus, stati: Vec<TaskStatus>,
) -> Vec<usize> { ) -> Vec<usize> {
self.tasks self.tasks
.iter() .iter()
.filter(|(_, task)| task.status == status) .filter(|(_, task)| stati.contains(&task.status))
.filter(|(_, task)| { .filter(|(_, task)| {
if let Some(task_group) = &task.group { group == &task.group
return task_group == group;
}
false
}) })
.map(|(id, _)| *id) .map(|(id, _)| *id)
.collect() .collect()
@ -161,7 +167,7 @@ impl State {
pub fn reset(&mut self) { pub fn reset(&mut self) {
self.backup(); self.backup();
self.running = true; self.set_status_for_all_groups(true);
self.max_id = 0; self.max_id = 0;
self.tasks = BTreeMap::new(); self.tasks = BTreeMap::new();
self.save(); self.save();

27
test_helper/groups.sh Executable file
View file

@ -0,0 +1,27 @@
# Manual test scenario for the new group handling of `start`/`pause`.
# Builds a state with the default queue plus two groups ("hdd", "cpu"),
# each limited to one parallel task, with a mix of queued, paused and
# force-started tasks. NOTE(review): relies on a running pueue daemon
# and fixed task ids (1 and 4) assigned after the reset.
# Reset daemon
pueue reset
sleep 1
# Create two groups next to the default queue and allow
# only a single parallel task everywhere.
pueue group -a hdd
pueue group -a cpu
pueue parallel 1
pueue parallel -g cpu 1
pueue parallel -g hdd 1
# Default queue: pause it, enqueue two tasks, then force-start task 1.
pueue pause
pueue add sleep 6000
pueue add sleep 6000
pueue start 1
# "hdd" group: one task before pausing, two after, then force-start task 4.
pueue add -g hdd "sleep 5000"
pueue pause -g hdd
pueue add -g hdd "sleep 5000"
pueue add -g hdd "sleep 5000"
pueue start 4
# "cpu" group: pause, enqueue three tasks, then resume the whole group.
pueue pause -g cpu
pueue add -g cpu "sleep 5000"
pueue add -g cpu "sleep 5000"
pueue add -g cpu "sleep 5000"
pueue start -g cpu