Prevent removal of tasks with active dependants

Arne Beer 2020-12-30 14:30:49 +01:00
parent f53e74c5fb
commit 1a4e815384
22 changed files with 934 additions and 622 deletions

View file

@@ -4,6 +4,14 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [0.10.2] - 31-12-2020
### Fixed
- It was possible to remove tasks that still had active dependants, i.e. unfinished tasks that depend on the removed task.
This didn't lead to any crashes, but it could cause unwanted behavior: the dependant tasks simply started, since their dependency no longer existed.
Removing a dependency is still possible, as long as all of its dependants are removed with it.
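In code, the new behavior looks roughly like this — a sketch against the daemon's internal `remove` handler, using the test helpers added later in this commit, and assuming a state where task 5 depends on the finished task 1 and nothing depends on task 5:

// Removing only the dependency is refused.
let message = remove(vec![1], &state);
if let Message::Success(text) = message {
    assert_eq!(text, "The command failed for tasks: 1");
}
// Removing the dependency together with its dependant succeeds.
let message = remove(vec![1, 5], &state);
if let Message::Success(text) = message {
    assert_eq!(text, "Tasks removed from list: 1, 5");
}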
## [0.10.1] - 29-12-2020
### Fixed

View file

@@ -67,7 +67,10 @@ async fn main() -> Result<()> {
}
init_shared_secret(&settings.shared.shared_secret_path)?;
let state = State::new(&settings, opt.config.clone());
let mut state = State::new(&settings, opt.config.clone());
// Restore the previous state and save any changes that might have happened during this process
state.restore();
state.save();
let state = Arc::new(Mutex::new(state));
let (sender, receiver) = channel();

View file

@@ -1,599 +0,0 @@
use std::sync::mpsc::Sender;
use std::{collections::BTreeMap, sync::MutexGuard};
use pueue::aliasing::insert_alias;
use pueue::log::{clean_log_handles, read_and_compress_log_files};
use pueue::network::message::*;
use pueue::network::protocol::socket_cleanup;
use pueue::state::{SharedState, State};
use pueue::task::{Task, TaskStatus};
use crate::network::response_helper::*;
static SENDER_ERR: &str = "Failed to send message to task handler thread";
pub fn handle_message(message: Message, sender: &Sender<Message>, state: &SharedState) -> Message {
match message {
Message::Add(message) => add_task(message, sender, state),
Message::Remove(task_ids) => remove(task_ids, state),
Message::Restart(message) => restart_multiple(message, sender, state),
Message::Switch(message) => switch(message, state),
Message::Stash(task_ids) => stash(task_ids, state),
Message::Enqueue(message) => enqueue(message, state),
Message::Start(message) => start(message, sender, state),
Message::Pause(message) => pause(message, sender, state),
Message::Kill(message) => kill(message, sender, state),
Message::Send(message) => send(message, sender, state),
Message::EditRequest(task_id) => edit_request(task_id, state),
Message::Edit(message) => edit(message, state),
Message::Group(message) => group(message, state),
Message::Clean => clean(state),
Message::Reset(message) => reset(message, sender),
Message::Status => get_status(state),
Message::Log(message) => get_log(message, state),
Message::Parallel(message) => set_parallel_tasks(message, state),
Message::DaemonShutdown => shutdown(sender, state),
_ => create_failure_message("Not implemented yet"),
}
}
/// Invoked when calling `pueue add`.
/// Queues a new task to the state.
/// If the start_immediately flag is set, send a StartMessage to the task handler.
fn add_task(message: AddMessage, sender: &Sender<Message>, state: &SharedState) -> Message {
let mut state = state.lock().unwrap();
if let Err(message) = ensure_group_exists(&state, &message.group) {
return message;
}
let starting_status = if message.stashed || message.enqueue_at.is_some() {
TaskStatus::Stashed
} else {
TaskStatus::Queued
};
// Ensure that specified dependencies actually exist.
let not_found: Vec<_> = message
.dependencies
.iter()
.filter(|id| !state.tasks.contains_key(id))
.collect();
if !not_found.is_empty() {
return create_failure_message(format!(
"Unable to setup dependencies : task(s) {:?} not found",
not_found
));
}
// Create a new task and add it to the state.
let task = Task::new(
message.command,
message.path,
message.envs,
message.group,
starting_status,
message.enqueue_at,
message.dependencies,
message.label,
);
let task_id = state.add_task(task);
// Notify the task handler, in case the client wants to start the task immediately.
if message.start_immediately {
sender
.send(Message::Start(StartMessage {
task_ids: vec![task_id],
..Default::default()
}))
.expect(SENDER_ERR);
}
// Create the customized response for the client.
let message = if message.print_task_id {
task_id.to_string()
} else if let Some(enqueue_at) = message.enqueue_at {
format!(
"New task added (id {}). It will be enqueued at {}",
task_id,
enqueue_at.format("%Y-%m-%d %H:%M:%S")
)
} else {
format!("New task added (id {}).", task_id)
};
state.save();
create_success_message(message)
}
/// Invoked when calling `pueue remove`.
/// Remove tasks from the queue.
/// We have to ensure that those tasks aren't running!
fn remove(task_ids: Vec<usize>, state: &SharedState) -> Message {
let mut state = state.lock().unwrap();
let statuses = vec![
TaskStatus::Queued,
TaskStatus::Stashed,
TaskStatus::Done,
TaskStatus::Locked,
];
let (not_running, running) = state.tasks_in_statuses(statuses, Some(task_ids));
for task_id in &not_running {
state.tasks.remove(task_id);
}
let text = "Tasks removed from list";
let response = compile_task_response(text, not_running, running);
create_success_message(response)
}
/// Invoked when calling `pueue switch`.
/// Switch the position of two tasks in the upcoming queue.
/// We have to ensure that those tasks are either `Queued` or `Stashed`
fn switch(message: SwitchMessage, state: &SharedState) -> Message {
let task_ids = vec![message.task_id_1, message.task_id_2];
let statuses = vec![TaskStatus::Queued, TaskStatus::Stashed];
let mut state = state.lock().unwrap();
let (_, mismatching) = state.tasks_in_statuses(statuses, Some(task_ids.to_vec()));
if !mismatching.is_empty() {
return create_failure_message("Tasks have to be either queued or stashed.");
}
// Get the tasks. Expect them to be there, since we found no mismatch
let mut first_task = state.tasks.remove(&task_ids[0]).unwrap();
let mut second_task = state.tasks.remove(&task_ids[1]).unwrap();
// Switch task ids
let temp_id = first_task.id;
first_task.id = second_task.id;
second_task.id = temp_id;
// Put tasks back in again
state.tasks.insert(first_task.id, first_task);
state.tasks.insert(second_task.id, second_task);
create_success_message("Tasks have been switched")
}
/// Invoked when calling `pueue stash`.
/// Stash specific queued tasks.
/// They won't be executed until they're enqueued or explicitly started.
fn stash(task_ids: Vec<usize>, state: &SharedState) -> Message {
let (matching, mismatching) = {
let mut state = state.lock().unwrap();
let (matching, mismatching) =
state.tasks_in_statuses(vec![TaskStatus::Queued, TaskStatus::Locked], Some(task_ids));
for task_id in &matching {
state.change_status(*task_id, TaskStatus::Stashed);
}
(matching, mismatching)
};
let text = "Tasks are stashed";
let response = compile_task_response(text, matching, mismatching);
create_success_message(response)
}
/// Invoked when calling `pueue enqueue`.
/// Enqueue specific stashed tasks.
fn enqueue(message: EnqueueMessage, state: &SharedState) -> Message {
let (matching, mismatching) = {
let mut state = state.lock().unwrap();
let (matching, mismatching) = state.tasks_in_statuses(
vec![TaskStatus::Stashed, TaskStatus::Locked],
Some(message.task_ids),
);
for task_id in &matching {
state.set_enqueue_at(*task_id, message.enqueue_at);
state.change_status(*task_id, TaskStatus::Queued);
}
(matching, mismatching)
};
let text = if let Some(enqueue_at) = message.enqueue_at {
format!(
"Tasks will be enqueued at {}",
enqueue_at.format("%Y-%m-%d %H:%M:%S")
)
} else {
String::from("Tasks are enqueued")
};
let response = compile_task_response(text.as_str(), matching, mismatching);
create_success_message(response)
}
/// Invoked when calling `pueue start`.
/// Forward the start message to the task handler, which then starts the process(es).
fn start(message: StartMessage, sender: &Sender<Message>, state: &SharedState) -> Message {
let state = state.lock().unwrap();
if let Err(message) = ensure_group_exists(&state, &message.group) {
return message;
}
sender
.send(Message::Start(message.clone()))
.expect(SENDER_ERR);
if !message.task_ids.is_empty() {
let response = task_response_helper(
"Tasks are being started",
message.task_ids,
vec![TaskStatus::Paused, TaskStatus::Queued, TaskStatus::Stashed],
&state,
);
return create_success_message(response);
}
if message.all {
create_success_message("All queues are being resumed.")
} else {
create_success_message(format!("Group \"{}\" is being resumed.", &message.group))
}
}
/// This is a small wrapper around the actual in-place task `restart` functionality.
fn restart_multiple(
message: RestartMessage,
sender: &Sender<Message>,
state: &SharedState,
) -> Message {
let mut state = state.lock().unwrap();
for task in message.tasks.iter() {
restart(&mut state, task, message.stashed);
}
// Tell the task manager to start the task immediately, if it's requested.
if message.start_immediately {
sender
.send(Message::Start(StartMessage {
task_ids: message.tasks.iter().map(|task| task.task_id).collect(),
..Default::default()
}))
.expect(SENDER_ERR);
}
create_success_message("Tasks restarted")
}
/// This is invoked whenever a task is restarted in-place, without creating a new task.
/// Update a possibly changed path/command and reset all info from the previous run.
fn restart(state: &mut MutexGuard<State>, to_restart: &TasksToRestart, stashed: bool) {
// Check if we actually know this task.
let task = if let Some(task) = state.tasks.get_mut(&to_restart.task_id) {
task
} else {
return;
};
// Either enqueue the task or stash it.
task.status = if stashed {
TaskStatus::Stashed
} else {
TaskStatus::Queued
};
// Update command and path.
task.original_command = to_restart.command.clone();
task.command = insert_alias(to_restart.command.clone());
task.path = to_restart.path.clone();
// Reset all variables of any previous run.
task.result = None;
task.start = None;
task.end = None;
}
/// Invoked when calling `pueue pause`.
/// Forward the pause message to the task handler, which then pauses groups/tasks/everything.
fn pause(message: PauseMessage, sender: &Sender<Message>, state: &SharedState) -> Message {
let state = state.lock().unwrap();
if let Err(message) = ensure_group_exists(&state, &message.group) {
return message;
}
sender
.send(Message::Pause(message.clone()))
.expect(SENDER_ERR);
if !message.task_ids.is_empty() {
let response = task_response_helper(
"Tasks are being paused",
message.task_ids,
vec![TaskStatus::Running],
&state,
);
return create_success_message(response);
}
if message.all {
create_success_message("All queues are being paused.")
} else {
create_success_message(format!("Group \"{}\" is being paused.", &message.group))
}
}
/// Invoked when calling `pueue kill`.
/// Forward the kill message to the task handler, which then kills the process.
fn kill(message: KillMessage, sender: &Sender<Message>, state: &SharedState) -> Message {
sender
.send(Message::Kill(message.clone()))
.expect(SENDER_ERR);
if !message.task_ids.is_empty() {
let state = state.lock().unwrap();
let response = task_response_helper(
"Tasks are being killed",
message.task_ids,
vec![TaskStatus::Running, TaskStatus::Paused],
&state,
);
return create_success_message(response);
}
if message.all {
create_success_message("All tasks are being killed.")
} else {
create_success_message(format!(
"All tasks of group \"{}\" are being killed.",
&message.group
))
}
}
/// Invoked when calling `pueue send`.
/// The message will be forwarded to the task handler, which then sends the user input to the process.
/// In here we only do some error handling.
fn send(message: SendMessage, sender: &Sender<Message>, state: &SharedState) -> Message {
// Check whether the task exists and is running. Abort if that's not the case.
{
let state = state.lock().unwrap();
match state.tasks.get(&message.task_id) {
Some(task) => {
if task.status != TaskStatus::Running {
return create_failure_message("You can only send input to a running task");
}
}
None => return create_failure_message("No task with this id."),
}
}
// Forward the message to the task handler.
sender.send(Message::Send(message)).expect(SENDER_ERR);
create_success_message("Message is being send to the process.")
}
/// Invoked when calling `pueue edit`.
/// If a user wants to edit a task, we need to send them the current command.
/// Lock the task to prevent execution before the user has finished editing the command.
fn edit_request(task_id: usize, state: &SharedState) -> Message {
// Check whether the task exists and is queued/stashed. Abort if that's not the case.
let mut state = state.lock().unwrap();
match state.tasks.get_mut(&task_id) {
Some(task) => {
if !task.is_queued() {
return create_failure_message("You can only edit a queued/stashed task");
}
task.prev_status = task.status.clone();
task.status = TaskStatus::Locked;
let message = EditResponseMessage {
task_id: task.id,
command: task.original_command.clone(),
path: task.path.clone(),
};
Message::EditResponse(message)
}
None => create_failure_message("No task with this id."),
}
}
/// Invoked after closing the editor on `pueue edit`.
/// Now we actually update the task with the edited command from the client.
fn edit(message: EditMessage, state: &SharedState) -> Message {
// Check whether the task exists and is locked. Abort if that's not the case.
let mut state = state.lock().unwrap();
match state.tasks.get_mut(&message.task_id) {
Some(task) => {
if !(task.status == TaskStatus::Locked) {
return create_failure_message("Task is no longer locked.");
}
task.status = task.prev_status.clone();
task.original_command = message.command.clone();
task.command = insert_alias(message.command.clone());
task.path = message.path.clone();
state.save();
create_success_message("Command has been updated")
}
None => create_failure_message(format!("Task to edit has gone away: {}", message.task_id)),
}
}
/// Invoked on `pueue groups`.
/// Manage groups.
/// - Show groups
/// - Add group
/// - Remove group
fn group(message: GroupMessage, state: &SharedState) -> Message {
let mut state = state.lock().unwrap();
// Create a new group.
if let Some(group) = message.add {
if state.groups.contains_key(&group) {
return create_failure_message(format!("Group \"{}\" already exists", group));
}
state.create_group(&group);
// Save the state and the settings file.
state.save();
if let Err(error) = state.save_settings() {
return create_failure_message(format!(
"Failed while saving the config file: {}",
error
));
}
return create_success_message(format!("Group \"{}\" created", group));
}
// Remove an existing group.
if let Some(group) = message.remove {
if let Err(message) = ensure_group_exists(&state, &group) {
return message;
}
if let Err(error) = state.remove_group(&group) {
return create_failure_message(format!("{}", error));
}
// Save the state and the settings file.
state.save();
if let Err(error) = state.save_settings() {
return create_failure_message(format!(
"Failed while saving the config file: {}",
error
));
}
return create_success_message(format!("Group \"{}\" removed", group));
}
// There are no groups yet.
if state.groups.is_empty() {
return create_success_message(
"There are no groups yet. You can add groups with the 'group --add' flag",
);
}
// Return information about all groups to the client.
Message::GroupResponse(GroupResponseMessage {
groups: state.groups.clone(),
settings: state.settings.daemon.groups.clone(),
})
}
/// Invoked when calling `pueue clean`.
/// Remove all failed or done tasks from the state.
fn clean(state: &SharedState) -> Message {
let mut state = state.lock().unwrap();
state.backup();
let (matching, _) = state.tasks_in_statuses(vec![TaskStatus::Done], None);
for task_id in &matching {
let _ = state.tasks.remove(task_id).unwrap();
clean_log_handles(*task_id, &state.settings.shared.pueue_directory);
}
state.save();
create_success_message("All finished tasks have been removed")
}
/// Invoked when calling `pueue reset`.
/// Forward the reset request to the task handler.
/// The handler then kills all children and clears the task queue.
fn reset(message: ResetMessage, sender: &Sender<Message>) -> Message {
sender.send(Message::Reset(message)).expect(SENDER_ERR);
create_success_message("Everything is being reset right now.")
}
/// Invoked when calling `pueue status`.
/// Return the current state.
fn get_status(state: &SharedState) -> Message {
let state = state.lock().unwrap().clone();
Message::StatusResponse(Box::new(state))
}
/// Invoked when calling `pueue log`.
/// Return the current state and the stdout/stderr of all tasks to the client.
fn get_log(message: LogRequestMessage, state: &SharedState) -> Message {
let state = state.lock().unwrap().clone();
// Return all logs, if no specific task id is specified.
let task_ids = if message.task_ids.is_empty() {
state.tasks.keys().cloned().collect()
} else {
message.task_ids
};
let mut tasks = BTreeMap::new();
for task_id in task_ids.iter() {
if let Some(task) = state.tasks.get(task_id) {
// We send log output and the task at the same time.
// This isn't as efficient as sending the raw compressed data directly,
// but it's a lot more convenient for now.
let (stdout, stderr) = if message.send_logs {
match read_and_compress_log_files(*task_id, &state.settings.shared.pueue_directory)
{
Ok((stdout, stderr)) => (Some(stdout), Some(stderr)),
Err(err) => {
return create_failure_message(format!(
"Failed reading process output file: {:?}",
err
));
}
}
} else {
(None, None)
};
let task_log = TaskLogMessage {
task: task.clone(),
stdout,
stderr,
};
tasks.insert(*task_id, task_log);
}
}
Message::LogResponse(tasks)
}
/// Set the parallel tasks for either a specific group or the global default.
fn set_parallel_tasks(message: ParallelMessage, state: &SharedState) -> Message {
let mut state = state.lock().unwrap();
if let Err(message) = ensure_group_exists(&state, &message.group) {
return message;
}
state
.settings
.daemon
.groups
.insert(message.group.clone(), message.parallel_tasks);
if let Err(error) = state.save_settings() {
return create_failure_message(format!("Failed while saving the config file: {}", error));
}
create_success_message(format!(
"Parallel tasks setting for group \"{}\" adjusted",
&message.group
))
}
/// Initialize the shutdown procedure.
/// At first, the unix socket will be removed.
///
/// Next, the DaemonShutdown Message will be forwarded to the TaskHandler.
/// The TaskHandler then gracefully shuts down all child processes
/// and exits with std::process::exit(0).
fn shutdown(sender: &Sender<Message>, state: &SharedState) -> Message {
// Do some socket cleanup (unix socket).
{
let state = state.lock().unwrap();
socket_cleanup(&state.settings.shared);
}
// Notify the task handler.
sender.send(Message::DaemonShutdown).expect(SENDER_ERR);
create_success_message("Daemon is shutting down")
}

View file

@@ -0,0 +1,75 @@
use std::sync::mpsc::Sender;
use pueue::network::message::*;
use pueue::state::SharedState;
use pueue::task::{Task, TaskStatus};
use super::*;
/// Invoked when calling `pueue add`.
/// Queues a new task to the state.
/// If the start_immediately flag is set, send a StartMessage to the task handler.
pub fn add_task(message: AddMessage, sender: &Sender<Message>, state: &SharedState) -> Message {
let mut state = state.lock().unwrap();
if let Err(message) = ensure_group_exists(&state, &message.group) {
return message;
}
let starting_status = if message.stashed || message.enqueue_at.is_some() {
TaskStatus::Stashed
} else {
TaskStatus::Queued
};
// Ensure that specified dependencies actually exist.
let not_found: Vec<_> = message
.dependencies
.iter()
.filter(|id| !state.tasks.contains_key(id))
.collect();
if !not_found.is_empty() {
return create_failure_message(format!(
"Unable to setup dependencies : task(s) {:?} not found",
not_found
));
}
// Create a new task and add it to the state.
let task = Task::new(
message.command,
message.path,
message.envs,
message.group,
starting_status,
message.enqueue_at,
message.dependencies,
message.label,
);
let task_id = state.add_task(task);
// Notify the task handler, in case the client wants to start the task immediately.
if message.start_immediately {
sender
.send(Message::Start(StartMessage {
task_ids: vec![task_id],
..Default::default()
}))
.expect(SENDER_ERR);
}
// Create the customized response for the client.
let message = if message.print_task_id {
task_id.to_string()
} else if let Some(enqueue_at) = message.enqueue_at {
format!(
"New task added (id {}). It will be enqueued at {}",
task_id,
enqueue_at.format("%Y-%m-%d %H:%M:%S")
)
} else {
format!("New task added (id {}).", task_id)
};
state.save();
create_success_message(message)
}

View file

@@ -0,0 +1,24 @@
use pueue::log::clean_log_handles;
use pueue::network::message::*;
use pueue::state::SharedState;
use pueue::task::TaskStatus;
/// Invoked when calling `pueue clean`.
/// Remove all failed or done tasks from the state.
pub fn clean(state: &SharedState) -> Message {
let mut state = state.lock().unwrap();
state.backup();
let (matching, _) = state.tasks_in_statuses(vec![TaskStatus::Done], None);
for task_id in &matching {
if !state.is_task_removable(task_id, &[]) {
continue;
}
let _ = state.tasks.remove(task_id).unwrap();
clean_log_handles(*task_id, &state.settings.shared.pueue_directory);
}
state.save();
create_success_message("All finished tasks have been removed")
}
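With this change, `clean` now skips finished tasks that still have unfinished dependants, thanks to the `is_task_removable` check. A hypothetical test (not part of this commit) sketching the expected behavior, built on the fixtures from the message handler module further down in this diff:

#[test]
fn clean_keeps_tasks_with_unfinished_dependants() {
    let state = get_stub_state();
    {
        let mut state = state.lock().unwrap();
        // Task 5 depends on the finished task 1 and hasn't run yet.
        let mut task = get_stub_task("5");
        task.dependencies = vec![1];
        state.add_task(task);
    }
    clean(&state);
    // Task 1 survives the clean, since task 5 still depends on it.
    let state = state.lock().unwrap();
    assert!(state.tasks.contains_key(&1));
}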

View file

@@ -0,0 +1,52 @@
use pueue::aliasing::insert_alias;
use pueue::network::message::*;
use pueue::state::SharedState;
use pueue::task::TaskStatus;
/// Invoked when calling `pueue edit`.
/// If a user wants to edit a task, we need to send them the current command.
/// Lock the task to prevent execution before the user has finished editing the command.
pub fn edit_request(task_id: usize, state: &SharedState) -> Message {
// Check whether the task exists and is queued/stashed. Abort if that's not the case.
let mut state = state.lock().unwrap();
match state.tasks.get_mut(&task_id) {
Some(task) => {
if !task.is_queued() {
return create_failure_message("You can only edit a queued/stashed task");
}
task.prev_status = task.status.clone();
task.status = TaskStatus::Locked;
let message = EditResponseMessage {
task_id: task.id,
command: task.original_command.clone(),
path: task.path.clone(),
};
Message::EditResponse(message)
}
None => create_failure_message("No task with this id."),
}
}
/// Invoked after closing the editor on `pueue edit`.
/// Now we actually update the task with the edited command from the client.
pub fn edit(message: EditMessage, state: &SharedState) -> Message {
// Check whether the task exists and is locked. Abort if that's not the case.
let mut state = state.lock().unwrap();
match state.tasks.get_mut(&message.task_id) {
Some(task) => {
if !(task.status == TaskStatus::Locked) {
return create_failure_message("Task is no longer locked.");
}
task.status = task.prev_status.clone();
task.original_command = message.command.clone();
task.command = insert_alias(message.command.clone());
task.path = message.path.clone();
state.save();
create_success_message("Command has been updated")
}
None => create_failure_message(format!("Task to edit has gone away: {}", message.task_id)),
}
}

View file

@@ -0,0 +1,36 @@
use pueue::network::message::*;
use pueue::state::SharedState;
use pueue::task::TaskStatus;
use crate::network::response_helper::*;
/// Invoked when calling `pueue enqueue`.
/// Enqueue specific stashed tasks.
pub fn enqueue(message: EnqueueMessage, state: &SharedState) -> Message {
let (matching, mismatching) = {
let mut state = state.lock().unwrap();
let (matching, mismatching) = state.tasks_in_statuses(
vec![TaskStatus::Stashed, TaskStatus::Locked],
Some(message.task_ids),
);
for task_id in &matching {
state.set_enqueue_at(*task_id, message.enqueue_at);
state.change_status(*task_id, TaskStatus::Queued);
}
(matching, mismatching)
};
let text = if let Some(enqueue_at) = message.enqueue_at {
format!(
"Tasks will be enqueued at {}",
enqueue_at.format("%Y-%m-%d %H:%M:%S")
)
} else {
String::from("Tasks are enqueued")
};
let response = compile_task_response(text.as_str(), matching, mismatching);
create_success_message(response)
}

View file

@@ -0,0 +1,67 @@
use pueue::network::message::*;
use pueue::state::SharedState;
use crate::network::response_helper::ensure_group_exists;
/// Invoked on `pueue groups`.
/// Manage groups.
/// - Show groups
/// - Add group
/// - Remove group
pub fn group(message: GroupMessage, state: &SharedState) -> Message {
let mut state = state.lock().unwrap();
// Create a new group.
if let Some(group) = message.add {
if state.groups.contains_key(&group) {
return create_failure_message(format!("Group \"{}\" already exists", group));
}
state.create_group(&group);
// Save the state and the settings file.
state.save();
if let Err(error) = state.save_settings() {
return create_failure_message(format!(
"Failed while saving the config file: {}",
error
));
}
return create_success_message(format!("Group \"{}\" created", group));
}
// Remove an existing group.
if let Some(group) = message.remove {
if let Err(message) = ensure_group_exists(&state, &group) {
return message;
}
if let Err(error) = state.remove_group(&group) {
return create_failure_message(format!("{}", error));
}
// Save the state and the settings file.
state.save();
if let Err(error) = state.save_settings() {
return create_failure_message(format!(
"Failed while saving the config file: {}",
error
));
}
return create_success_message(format!("Group \"{}\" removed", group));
}
// There are no groups yet.
if state.groups.is_empty() {
return create_success_message(
"There are no groups yet. You can add groups with the 'group --add' flag",
);
}
// Return information about all groups to the client.
Message::GroupResponse(GroupResponseMessage {
groups: state.groups.clone(),
settings: state.settings.daemon.groups.clone(),
})
}

View file

@@ -0,0 +1,36 @@
use std::sync::mpsc::Sender;
use pueue::network::message::*;
use pueue::state::SharedState;
use pueue::task::TaskStatus;
use super::SENDER_ERR;
use crate::network::response_helper::task_response_helper;
/// Invoked when calling `pueue kill`.
/// Forward the kill message to the task handler, which then kills the process.
pub fn kill(message: KillMessage, sender: &Sender<Message>, state: &SharedState) -> Message {
sender
.send(Message::Kill(message.clone()))
.expect(SENDER_ERR);
if !message.task_ids.is_empty() {
let state = state.lock().unwrap();
let response = task_response_helper(
"Tasks are being killed",
message.task_ids,
vec![TaskStatus::Running, TaskStatus::Paused],
&state,
);
return create_success_message(response);
}
if message.all {
create_success_message("All tasks are being killed.")
} else {
create_success_message(format!(
"All tasks of group \"{}\" are being killed.",
&message.group
))
}
}

View file

@@ -0,0 +1,48 @@
use std::collections::BTreeMap;
use pueue::log::read_and_compress_log_files;
use pueue::network::message::*;
use pueue::state::SharedState;
/// Invoked when calling `pueue log`.
/// Return the current state and the stdout/stderr of all tasks to the client.
pub fn get_log(message: LogRequestMessage, state: &SharedState) -> Message {
let state = state.lock().unwrap().clone();
// Return all logs, if no specific task id is specified.
let task_ids = if message.task_ids.is_empty() {
state.tasks.keys().cloned().collect()
} else {
message.task_ids
};
let mut tasks = BTreeMap::new();
for task_id in task_ids.iter() {
if let Some(task) = state.tasks.get(task_id) {
// We send log output and the task at the same time.
// This isn't as efficient as sending the raw compressed data directly,
// but it's a lot more convenient for now.
let (stdout, stderr) = if message.send_logs {
match read_and_compress_log_files(*task_id, &state.settings.shared.pueue_directory)
{
Ok((stdout, stderr)) => (Some(stdout), Some(stderr)),
Err(err) => {
return create_failure_message(format!(
"Failed reading process output file: {:?}",
err
));
}
}
} else {
(None, None)
};
let task_log = TaskLogMessage {
task: task.clone(),
stdout,
stderr,
};
tasks.insert(*task_id, task_log);
}
}
Message::LogResponse(tasks)
}

View file

@@ -0,0 +1,162 @@
use std::sync::mpsc::Sender;
use pueue::network::message::*;
use pueue::network::protocol::socket_cleanup;
use pueue::state::SharedState;
use crate::network::response_helper::*;
mod add;
mod clean;
mod edit;
mod enqueue;
mod group;
mod kill;
mod log;
mod parallel;
mod pause;
mod remove;
mod restart;
mod send;
mod start;
mod stash;
mod switch;
static SENDER_ERR: &str = "Failed to send message to task handler thread";
pub fn handle_message(message: Message, sender: &Sender<Message>, state: &SharedState) -> Message {
match message {
Message::Add(message) => add::add_task(message, sender, state),
Message::Clean => clean::clean(state),
Message::Edit(message) => edit::edit(message, state),
Message::EditRequest(task_id) => edit::edit_request(task_id, state),
Message::Enqueue(message) => enqueue::enqueue(message, state),
Message::Group(message) => group::group(message, state),
Message::Kill(message) => kill::kill(message, sender, state),
Message::Log(message) => log::get_log(message, state),
Message::Parallel(message) => parallel::set_parallel_tasks(message, state),
Message::Pause(message) => pause::pause(message, sender, state),
Message::Remove(task_ids) => remove::remove(task_ids, state),
Message::Reset(message) => reset(message, sender),
Message::Restart(message) => restart::restart_multiple(message, sender, state),
Message::Send(message) => send::send(message, sender, state),
Message::Start(message) => start::start(message, sender, state),
Message::Stash(task_ids) => stash::stash(task_ids, state),
Message::Switch(message) => switch::switch(message, state),
Message::Status => get_status(state),
Message::DaemonShutdown => shutdown(sender, state),
_ => create_failure_message("Not implemented yet"),
}
}
/// Invoked when calling `pueue reset`.
/// Forward the reset request to the task handler.
/// The handler then kills all children and clears the task queue.
fn reset(message: ResetMessage, sender: &Sender<Message>) -> Message {
sender.send(Message::Reset(message)).expect(SENDER_ERR);
create_success_message("Everything is being reset right now.")
}
/// Invoked when calling `pueue status`.
/// Return the current state.
fn get_status(state: &SharedState) -> Message {
let state = state.lock().unwrap().clone();
Message::StatusResponse(Box::new(state))
}
/// Initialize the shutdown procedure.
/// At first, the unix socket will be removed.
///
/// Next, the DaemonShutdown Message will be forwarded to the TaskHandler.
/// The TaskHandler then gracefully shuts down all child processes
/// and exits with std::process::exit(0).
fn shutdown(sender: &Sender<Message>, state: &SharedState) -> Message {
// Do some socket cleanup (unix socket).
{
let state = state.lock().unwrap();
socket_cleanup(&state.settings.shared);
}
// Notify the task handler.
sender.send(Message::DaemonShutdown).expect(SENDER_ERR);
create_success_message("Daemon is shutting down")
}
#[cfg(test)]
mod fixtures {
use std::collections::HashMap;
pub use std::sync::mpsc::Sender;
use std::sync::{Arc, Mutex};
pub use pueue::network::message::*;
pub use pueue::network::protocol::socket_cleanup;
pub use pueue::settings::Settings;
pub use pueue::state::{SharedState, State};
pub use pueue::task::TaskResult;
pub use pueue::task::{Task, TaskStatus};
pub use super::*;
pub use crate::network::response_helper::*;
pub fn get_settings() -> Settings {
Settings::default_config()
.expect("Failed to get default config")
.try_into()
.expect("Failed to get test settings")
}
pub fn get_state() -> SharedState {
let settings = get_settings();
let state = State::new(&settings, None);
Arc::new(Mutex::new(state))
}
/// Create a new task with stub data
pub fn get_stub_task(id: &str) -> Task {
Task::new(
format!("ls {}", id),
"/tmp".to_string(),
HashMap::new(),
"default".to_string(),
TaskStatus::Queued,
None,
Vec::new(),
None,
)
}
pub fn get_stub_state() -> SharedState {
let state = get_state();
{
// Queued task
let mut state = state.lock().unwrap();
let task = get_stub_task("0");
state.add_task(task);
// Finished task
let mut task = get_stub_task("1");
task.status = TaskStatus::Done;
task.result = Some(TaskResult::Success);
state.add_task(task);
// Stashed task
let mut task = get_stub_task("2");
task.status = TaskStatus::Stashed;
state.add_task(task);
// Running task
let mut task = get_stub_task("3");
task.status = TaskStatus::Running;
state.add_task(task);
// Paused task
let mut task = get_stub_task("4");
task.status = TaskStatus::Paused;
state.add_task(task);
}
state
}
}

View file

@@ -0,0 +1,27 @@
use pueue::network::message::*;
use pueue::state::SharedState;
use crate::network::response_helper::*;
/// Set the parallel tasks for either a specific group or the global default.
pub fn set_parallel_tasks(message: ParallelMessage, state: &SharedState) -> Message {
let mut state = state.lock().unwrap();
if let Err(message) = ensure_group_exists(&state, &message.group) {
return message;
}
state
.settings
.daemon
.groups
.insert(message.group.clone(), message.parallel_tasks);
if let Err(error) = state.save_settings() {
return create_failure_message(format!("Failed while saving the config file: {}", error));
}
create_success_message(format!(
"Parallel tasks setting for group \"{}\" adjusted",
&message.group
))
}

View file

@@ -0,0 +1,36 @@
use std::sync::mpsc::Sender;
use pueue::network::message::*;
use pueue::state::SharedState;
use pueue::task::TaskStatus;
use super::SENDER_ERR;
use crate::network::response_helper::*;
/// Invoked when calling `pueue pause`.
/// Forward the pause message to the task handler, which then pauses groups/tasks/everything.
pub fn pause(message: PauseMessage, sender: &Sender<Message>, state: &SharedState) -> Message {
let state = state.lock().unwrap();
if let Err(message) = ensure_group_exists(&state, &message.group) {
return message;
}
sender
.send(Message::Pause(message.clone()))
.expect(SENDER_ERR);
if !message.task_ids.is_empty() {
let response = task_response_helper(
"Tasks are being paused",
message.task_ids,
vec![TaskStatus::Running],
&state,
);
return create_success_message(response);
}
if message.all {
create_success_message("All queues are being paused.")
} else {
create_success_message(format!("Group \"{}\" is being paused.", &message.group))
}
}

View file

@@ -0,0 +1,115 @@
use pueue::network::message::*;
use pueue::state::SharedState;
use pueue::task::TaskStatus;
use crate::network::response_helper::*;
/// Invoked when calling `pueue remove`.
/// Remove tasks from the queue.
/// We have to ensure that those tasks aren't running!
pub fn remove(task_ids: Vec<usize>, state: &SharedState) -> Message {
let mut state = state.lock().unwrap();
let statuses = vec![
TaskStatus::Queued,
TaskStatus::Stashed,
TaskStatus::Done,
TaskStatus::Locked,
];
let (mut not_running, mut running) = state.tasks_in_statuses(statuses, Some(task_ids));
// Don't delete tasks if other tasks still depend on them.
// However, we do allow deleting them if their dependants are scheduled for deletion as well.
for task_id in not_running.clone() {
if !state.is_task_removable(&task_id, &not_running) {
running.push(task_id);
not_running.retain(|id| id != &task_id);
};
}
for task_id in &not_running {
state.tasks.remove(task_id);
}
let text = "Tasks removed from list";
let response = compile_task_response(text, not_running, running);
create_success_message(response)
}
#[cfg(test)]
mod tests {
use super::super::fixtures::*;
use super::*;
#[test]
fn normal_remove() {
let state = get_stub_state();
// 3 and 4 aren't allowed to be removed, since task 3 is running and task 4 is paused.
// The rest will succeed.
let message = remove(vec![0, 1, 2, 3, 4], &state);
assert!(matches!(message, Message::Success(_)));
if let Message::Success(text) = message {
assert_eq!(
text,
"Tasks removed from list: 0, 1, 2\nThe command failed for tasks: 3, 4"
);
};
let state = state.lock().unwrap();
assert_eq!(state.tasks.len(), 2);
}
#[test]
fn removal_of_dependencies() {
let state = get_stub_state();
{
let mut state = state.lock().unwrap();
// Add a task with a dependency to a finished task
let mut task = get_stub_task("5");
task.dependencies = vec![1];
state.add_task(task);
// Add a task depending on the previous task -> Linked dependencies
let mut task = get_stub_task("6");
task.dependencies = vec![5];
state.add_task(task);
}
// Make sure we cannot remove a task with dependencies.
let message = remove(vec![1], &state);
assert!(matches!(message, Message::Success(_)));
if let Message::Success(text) = message {
assert_eq!(text, "The command failed for tasks: 1");
};
{
let state = state.lock().unwrap();
assert_eq!(state.tasks.len(), 7);
}
// Make sure we cannot remove a task with recursive dependencies.
let message = remove(vec![1, 5], &state);
assert!(matches!(message, Message::Success(_)));
if let Message::Success(text) = message {
assert_eq!(text, "The command failed for tasks: 1, 5");
};
{
let state = state.lock().unwrap();
assert_eq!(state.tasks.len(), 7);
}
// Make sure we can remove tasks with dependencies if all dependencies are specified.
let message = remove(vec![1, 5, 6], &state);
assert!(matches!(message, Message::Success(_)));
if let Message::Success(text) = message {
assert_eq!(text, "Tasks removed from list: 1, 5, 6");
};
{
let state = state.lock().unwrap();
assert_eq!(state.tasks.len(), 4);
}
}
}

View file

@@ -0,0 +1,61 @@
use std::sync::mpsc::Sender;
use std::sync::MutexGuard;
use pueue::aliasing::insert_alias;
use pueue::network::message::*;
use pueue::state::{SharedState, State};
use pueue::task::TaskStatus;
use super::SENDER_ERR;
/// This is a small wrapper around the actual in-place task `restart` functionality.
pub fn restart_multiple(
message: RestartMessage,
sender: &Sender<Message>,
state: &SharedState,
) -> Message {
let mut state = state.lock().unwrap();
for task in message.tasks.iter() {
restart(&mut state, task, message.stashed);
}
// Tell the task manager to start the task immediately, if it's requested.
if message.start_immediately {
sender
.send(Message::Start(StartMessage {
task_ids: message.tasks.iter().map(|task| task.task_id).collect(),
..Default::default()
}))
.expect(SENDER_ERR);
}
create_success_message("Tasks restarted")
}
/// This is invoked whenever a task is restarted in-place, without creating a new task.
/// Update a possibly changed path/command and reset all info from the previous run.
fn restart(state: &mut MutexGuard<State>, to_restart: &TasksToRestart, stashed: bool) {
// Check if we actually know this task.
let task = if let Some(task) = state.tasks.get_mut(&to_restart.task_id) {
task
} else {
return;
};
// Either enqueue the task or stash it.
task.status = if stashed {
TaskStatus::Stashed
} else {
TaskStatus::Queued
};
// Update command and path.
task.original_command = to_restart.command.clone();
task.command = insert_alias(to_restart.command.clone());
task.path = to_restart.path.clone();
// Reset all variables of any previous run.
task.result = None;
task.start = None;
task.end = None;
}

View file

@@ -0,0 +1,30 @@
use std::sync::mpsc::Sender;
use pueue::network::message::*;
use pueue::state::SharedState;
use pueue::task::TaskStatus;
use super::SENDER_ERR;
/// Invoked when calling `pueue send`.
/// The message will be forwarded to the task handler, which then sends the user input to the process.
/// In here we only do some error handling.
pub fn send(message: SendMessage, sender: &Sender<Message>, state: &SharedState) -> Message {
// Check whether the task exists and is running. Abort if that's not the case.
{
let state = state.lock().unwrap();
match state.tasks.get(&message.task_id) {
Some(task) => {
if task.status != TaskStatus::Running {
return create_failure_message("You can only send input to a running task");
}
}
None => return create_failure_message("No task with this id."),
}
}
// Forward the message to the task handler.
sender.send(Message::Send(message)).expect(SENDER_ERR);
create_success_message("Message is being send to the process.")
}

View file

@@ -0,0 +1,37 @@
use std::sync::mpsc::Sender;
use pueue::network::message::*;
use pueue::state::SharedState;
use pueue::task::TaskStatus;
use super::SENDER_ERR;
use crate::network::response_helper::*;
/// Invoked when calling `pueue start`.
/// Forward the start message to the task handler, which then starts the process(es).
pub fn start(message: StartMessage, sender: &Sender<Message>, state: &SharedState) -> Message {
let state = state.lock().unwrap();
if let Err(message) = ensure_group_exists(&state, &message.group) {
return message;
}
sender
.send(Message::Start(message.clone()))
.expect(SENDER_ERR);
if !message.task_ids.is_empty() {
let response = task_response_helper(
"Tasks are being started",
message.task_ids,
vec![TaskStatus::Paused, TaskStatus::Queued, TaskStatus::Stashed],
&state,
);
return create_success_message(response);
}
if message.all {
create_success_message("All queues are being resumed.")
} else {
create_success_message(format!("Group \"{}\" is being resumed.", &message.group))
}
}

View file

@@ -0,0 +1,26 @@
use pueue::network::message::*;
use pueue::state::SharedState;
use pueue::task::TaskStatus;
use crate::network::response_helper::*;
/// Invoked when calling `pueue stash`.
/// Stash specific queued tasks.
/// They won't be executed until they're enqueued or explicitly started.
pub fn stash(task_ids: Vec<usize>, state: &SharedState) -> Message {
let (matching, mismatching) = {
let mut state = state.lock().unwrap();
let (matching, mismatching) =
state.tasks_in_statuses(vec![TaskStatus::Queued, TaskStatus::Locked], Some(task_ids));
for task_id in &matching {
state.change_status(*task_id, TaskStatus::Stashed);
}
(matching, mismatching)
};
let text = "Tasks are stashed";
let response = compile_task_response(text, matching, mismatching);
create_success_message(response)
}

View file

@@ -0,0 +1,31 @@
use pueue::network::message::*;
use pueue::state::SharedState;
use pueue::task::TaskStatus;
/// Invoked when calling `pueue switch`.
/// Switch the position of two tasks in the upcoming queue.
/// We have to ensure that those tasks are either `Queued` or `Stashed`
pub fn switch(message: SwitchMessage, state: &SharedState) -> Message {
let task_ids = vec![message.task_id_1, message.task_id_2];
let statuses = vec![TaskStatus::Queued, TaskStatus::Stashed];
let mut state = state.lock().unwrap();
let (_, mismatching) = state.tasks_in_statuses(statuses, Some(task_ids.to_vec()));
if !mismatching.is_empty() {
return create_failure_message("Tasks have to be either queued or stashed.");
}
// Get the tasks. Expect them to be there, since we found no mismatch
let mut first_task = state.tasks.remove(&task_ids[0]).unwrap();
let mut second_task = state.tasks.remove(&task_ids[1]).unwrap();
// Switch task ids
let temp_id = first_task.id;
first_task.id = second_task.id;
second_task.id = temp_id;
// Put tasks back in again
state.tasks.insert(first_task.id, first_task);
state.tasks.insert(second_task.id, second_task);
create_success_message("Tasks have been switched")
}

View file

@@ -16,12 +16,13 @@ use crate::network::message_handler::handle_message;
/// Poll the listener and accept new incoming connections.
/// Create a new future to handle the message and spawn it.
pub async fn accept_incoming(sender: Sender<Message>, state: SharedState) -> Result<()> {
let (listener, secret) = {
// Get cloned shared settings to avoid holding a mutex over an await point.
let shared_settings = {
let state = state.lock().unwrap();
let listener = get_listener(&state.settings.shared).await?;
let secret = read_shared_secret(&state.settings.shared.shared_secret_path)?;
(listener, secret)
state.settings.shared.clone()
};
let listener = get_listener(&shared_settings).await?;
let secret = read_shared_secret(&shared_settings.shared_secret_path)?;
loop {
// Poll incoming connections.
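The reshuffle above avoids a classic pitfall: holding a `std::sync::Mutex` guard across an `.await` point, which would block every other thread that needs the state for the duration of the await. A generic sketch of the pattern, with hypothetical names not taken from this codebase:

use std::sync::{Arc, Mutex};

// Clone the needed data in a small scope, so the guard is dropped
// before any await point.
async fn use_shared_config(shared: Arc<Mutex<String>>) {
    let snapshot = {
        let guard = shared.lock().unwrap();
        guard.clone()
    };
    // The lock has been released; awaiting here is safe.
    consume(snapshot).await;
}

// Stand-in for real async work, e.g. binding a listener.
async fn consume(_config: String) {}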

View file

@@ -61,6 +61,26 @@ impl Settings {
///
/// If `require_config` is `true`, an error will be thrown if no configuration file can be found.
pub fn new(require_config: bool, from_file: &Option<PathBuf>) -> Result<Settings> {
let mut config = Settings::default_config()?;
// Load the config from a very specific file path
if let Some(path) = from_file {
if !path.exists() {
bail!("Couldn't find config at path {:?}", path);
}
info!("Using config file at: {:?}", path);
let config_file = config::File::with_name(path.to_str().unwrap());
config.merge(config_file)?;
} else {
// Load settings from the default config paths.
parse_config(&mut config, require_config)?;
}
// Try to deserialize the entire configuration
Ok(config.try_into()?)
}
pub fn default_config() -> Result<Config> {
let mut config = Config::new();
let pueue_path = default_pueue_path()?;
config.set_default("shared.pueue_directory", pueue_path.clone())?;
@@ -95,21 +115,7 @@ impl Settings {
config.set_default("daemon.callback", None::<String>)?;
config.set_default("daemon.groups", HashMap::<String, i64>::new())?;
// Load the config from a very specific file path
if let Some(path) = from_file {
if !path.exists() {
bail!("Couldn't find config at path {:?}", path);
}
info!("Using config file at: {:?}", path);
let config_file = config::File::with_name(path.to_str().unwrap());
config.merge(config_file)?;
} else {
// Load settings from the default config paths.
parse_config(&mut config, require_config)?;
}
// Try to deserialize the entire configuration
Ok(config.try_into()?)
Ok(config)
}
/// Try to read the config file without any default values.
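The point of extracting `default_config()` from `new()` is that a default configuration can now be built and deserialized without consulting any config file — exactly what `get_settings` in the new test fixtures earlier in this diff relies on:

let settings: Settings = Settings::default_config()
    .expect("Failed to get default config")
    .try_into()
    .expect("Failed to deserialize default settings");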

View file

@@ -61,8 +61,6 @@ impl State {
config_path,
};
state.create_group("default");
state.restore();
state.save();
state
}
@@ -188,6 +186,38 @@ impl State {
(matching, mismatching)
}
/// Check if a task can be deleted.
/// We have to check for dependant tasks that haven't finished yet.
/// This is necessary to prevent the deletion of tasks that are still needed as a dependency.
///
/// `to_delete`: a list of task ids that are scheduled for deletion as well.
pub fn is_task_removable(&self, task_id: &usize, to_delete: &[usize]) -> bool {
// Get all task ids of any dependant tasks.
let dependants: Vec<usize> = self
.tasks
.iter()
.filter(|(_, task)| {
task.dependencies.contains(&task_id) && task.status != TaskStatus::Done
})
.map(|(_, task)| task.id)
.collect();
if dependants.is_empty() {
return true;
}
// Check if the dependants are supposed to be deleted as well.
let should_delete_dependants = dependants.iter().all(|task_id| to_delete.contains(task_id));
if !should_delete_dependants {
return false;
}
// Lastly, recursively check whether any of those dependants have dependants of their own.
dependants
.iter()
.all(|task_id| self.is_task_removable(task_id, to_delete))
}
/// Users can specify whether to pause the task's group or all groups on a failure.
pub fn handle_task_failure(&mut self, group: String) {
if self.settings.daemon.pause_group_on_failure {
@ -278,7 +308,7 @@ impl State {
/// Restore the last state from a previous session.
/// The state is stored as json in the log directory.
fn restore(&mut self) {
pub fn restore(&mut self) {
let path = Path::new(&self.settings.shared.pueue_directory).join("state.json");
// Ignore if the file doesn't exist. It doesn't have to.
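To make the recursive check in `is_task_removable` concrete, consider the dependency chain from the `removal_of_dependencies` test above: task 6 depends on task 5, task 5 depends on the finished task 1. Hypothetical assertions against such a state:

// Assumes `state` holds a SharedState populated like the
// `removal_of_dependencies` test in remove.rs above.
let state = state.lock().unwrap();
// Task 1 has the unfinished dependant 5, so it isn't removable on its own.
assert!(!state.is_task_removable(&1, &[]));
// Deleting 5 alongside isn't enough: 5 itself has the dependant 6.
assert!(!state.is_task_removable(&1, &[1, 5]));
// Once the whole chain is scheduled for deletion, the check passes.
assert!(state.is_task_removable(&1, &[1, 5, 6]));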