Merge pull request #547 from Nukesor/internal-refactor

Internal refactor
This commit is contained in:
Arne Christian Beer 2024-06-23 15:17:48 +02:00 committed by GitHub
commit a9ed6f7cee
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
52 changed files with 1385 additions and 1500 deletions

4
.gitignore vendored
View file

@ -3,6 +3,10 @@
target/
lib/target/
# Perf files
flamegraph.svg
perf.data
# These are backup files generated by rustfmt
*.rs.bk

View file

@ -4,7 +4,25 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [Unreleased]
## [4.0.0] - unreleased
This release introduces a new major version, as it includes a major internal refactoring.
That refactoring corrects an old architectural design decision to have the subprocess state live in a dedicated thread.
This design forced client commands that directly affected processes, such as `pueue start --immediate`, to be forwarded to that thread via an `mpsc` channel.
That thread would then check for new `mpsc` messages in a loop and eventually execute that command.
This design resulted in short delays until those commands would actually take effect, which was problematic during testing or scripting.
The new design fixes this issue by moving all process state into the global shared state (behind a Mutex), which allows Pueue to perform process manipulation directly inside the client message handlers.
Furthermore, this change makes Pueue better suited for scripting, as it effectively eliminates the need to call `pueue wait` in certain scenarios. The focus of Pueue, however, still lies on human interaction.
Even though this refactoring significantly simplified the code, it also introduced a few nasty and subtle bugs, as large parts of the internal state handling had to be changed. Hopefully, most of them have been caught by Pueue's extensive test suite, but there's still a chance that I overlooked something.
So even though this is technically not a breaking change, I'll treat it as one to make you aware of possible issues that may arise.
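To illustrate the difference, here is a minimal, self-contained sketch with dummy types (not the actual Pueue code): in the old design a handler only pushed a message onto an `mpsc` channel and the work happened once the process thread polled it, while in the new design the handler locks the shared state and acts immediately.

```rust
use std::sync::{mpsc, Arc, Mutex};

// Dummy stand-in for Pueue's real `State`; for illustration only.
#[derive(Default)]
struct State {
    running: bool,
}

// Old design (sketch): the handler only queues a command; the actual work
// happens later, when the process thread polls the channel in its loop.
fn old_start_handler(sender: &mpsc::Sender<&'static str>) {
    sender.send("start task 0").unwrap();
}

// New design (sketch): the handler locks the shared state and manipulates the
// process state directly, so the effect is immediate.
fn new_start_handler(state: &Arc<Mutex<State>>) {
    state.lock().unwrap().running = true;
}

fn main() {
    let (sender, receiver) = mpsc::channel();
    old_start_handler(&sender);
    // Nothing has happened yet; the command waits in the channel until polled.
    assert_eq!(receiver.try_recv().unwrap(), "start task 0");

    let state = Arc::new(Mutex::new(State::default()));
    new_start_handler(&state);
    // The effect is visible right away, without waiting for a polling loop.
    assert!(state.lock().unwrap().running);
}
```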
### Fixed
- Fixed delay after sending process related commands from client. [#540](https://github.com/Nukesor/pueue/pull/540)
### Added

View file

@ -30,20 +30,16 @@ This includes:
The daemon is composed of two main components.
1. Request handling in `pueue/src/daemon/network/`.
This is the code responsible for communicating with clients.
In `pueue/src/daemon/network/message_handler/` you can find neatly separated handlers for all of Pueue's subcommands.
2. The TaskHandler in `pueue/src/daemon/task_handler/`.
It's responsible for everything regarding process interaction.
All information that's not sub-process specific, is stored in the `State` (`pueue-lib/state.rs`) struct. \
All information, including process specific information, is stored in the `State` (`pueue-lib/state.rs`) struct. \
Both components share a reference to the State, an `Arc<Mutex<State>>`.
That way we can guarantee a single source of truth and a consistent state.
It's also important to know that there's an `mpsc` channel. \
This channel is used to send on-demand messages from the network request handler to the TaskHandler.
This includes stuff like "Start/Pause/Kill" sub-processes or "Reset everything".
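Here's a rough, self-contained sketch of that sharing (dummy `State`, simplified error types; requires the `tokio` crate), mirroring how the daemon's `run` function joins both components with `try_join!`:

```rust
use std::sync::{Arc, Mutex};

// Dummy stand-in for `pueue-lib`'s `State`.
#[derive(Default)]
struct State {
    counter: usize,
}
type SharedState = Arc<Mutex<State>>;

// Simplified stand-ins for the two daemon components.
async fn task_handler(state: SharedState) -> Result<(), String> {
    state.lock().unwrap().counter += 1;
    Ok(())
}

async fn accept_incoming(state: SharedState) -> Result<(), String> {
    state.lock().unwrap().counter += 1;
    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), String> {
    let state: SharedState = Arc::new(Mutex::new(State::default()));
    // Both components receive a clone of the same Arc and therefore
    // see each other's changes: a single source of truth.
    tokio::try_join!(task_handler(state.clone()), accept_incoming(state.clone()))?;
    assert_eq!(state.lock().unwrap().counter, 2);
    Ok(())
}
```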
### Request handling
### Message Handlers
The `pueue/src/daemon/network/socket.rs` module contains the logic for accepting client connections and receiving payloads.
The request accept and handle logic is a single async-await loop run by the main thread.
@ -51,18 +47,13 @@ The request accept and handle logic is a single async-await loop run by the main
The payload is then deserialized to `Message` (`pueue-lib/message.rs`) and handled by its respective function.
All functions used for handling these messages can be found in `pueue/src/daemon/network/message_handler`.
Many messages can be instantly handled by simply modifying or reading the state. \
However, sometimes the TaskHandler has to be notified, if something involves modifying actual system processes (start/pause/kill tasks).
That's when the `mpsc` channel to the TaskHandler comes into play.
### TaskHandler
The TaskHandler is responsible for actually starting and managing system processes. \
It's further important to note, that it runs in its own thread.
It shares the async main thread with the message handlers in a `try_join!` call.
The TaskHandler runs a never ending loop, which checks a few times each second, if
- there are new instructions in the `mpsc` channel.
- a new task can be started.
- tasks finished and can be finalized.
- delayed tasks can be enqueued (`-d` flag on `pueue add`)
@ -74,12 +65,9 @@ The TaskHandler is by far the most complex piece of code in this project, but th
Whenever you're writing some core-logic in Pueue, please make sure to understand how mutexes work.
Try to be conservative with your `state.lock()` calls, since this also blocks the request handler!
Only use the state, if you absolutely have to.
As a general rule of thumb, the state should only ever be locked in message handler functions and at the top of the TaskHandler's main loop.
At the same time, you should also lock early enough to prevent inconsistent states.
Operations should generally be atomic. \
Anyhow, working with mutexes is usually straight-forward, but can sometimes be a little bit tricky.
This rule allows us to be very conservative with state locking to prevent deadlocks.
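Here's a small illustration of that rule with dummy types (not the actual TaskHandler code): lock once at the top of each loop iteration, perform the whole operation under that single lock, and let the guard drop before sleeping, so message handlers are never blocked for long.

```rust
use std::sync::{Arc, Mutex};
use std::time::Duration;

#[derive(Default)]
struct State {
    iterations: usize,
}
type SharedState = Arc<Mutex<State>>;

fn main_loop(state: SharedState) {
    for _ in 0..3 {
        {
            // One lock at the top of the iteration; the whole step is atomic.
            let mut state = state.lock().unwrap();
            state.iterations += 1;
        } // <- the guard is dropped here, before we sleep.
        std::thread::sleep(Duration::from_millis(10));
    }
}

fn main() {
    let state: SharedState = Arc::new(Mutex::new(State::default()));
    main_loop(state.clone());
    assert_eq!(state.lock().unwrap().iterations, 3);
}
```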
## Code Style

View file

@ -0,0 +1,135 @@
use std::collections::HashMap;
use chrono::{DateTime, Local};
use handlebars::{Handlebars, RenderError};
use log::{debug, error, info};
use pueue_lib::{
log::{get_log_path, read_last_log_file_lines},
process_helper::compile_shell_command,
settings::Settings,
task::{Task, TaskResult, TaskStatus},
};
use super::state_helper::LockedState;
/// Users can specify a callback that's fired whenever a task finishes.
/// Execute the callback by spawning a new subprocess.
pub fn spawn_callback(settings: &Settings, state: &mut LockedState, task: &Task) {
// Return early, if there's no callback specified
let Some(template_string) = &settings.daemon.callback else {
return;
};
// Build the command to be called from the template string in the configuration file.
let callback_command = match build_callback_command(settings, task, template_string) {
Ok(callback_command) => callback_command,
Err(err) => {
error!("Failed to create callback command from template with error: {err}");
return;
}
};
let mut command = compile_shell_command(settings, &callback_command);
// Spawn the callback subprocess and log if it fails.
let spawn_result = command.spawn();
let child = match spawn_result {
Err(error) => {
error!("Failed to spawn callback with error: {error}");
return;
}
Ok(child) => child,
};
debug!("Spawned callback for task {}", task.id);
state.callbacks.push(child);
}
/// Take the callback template string from the configuration and insert all parameters from the
/// finished task.
pub fn build_callback_command(
settings: &Settings,
task: &Task,
template_string: &str,
) -> Result<String, RenderError> {
// Init Handlebars. We set to strict, as we want to show an error on missing variables.
let mut handlebars = Handlebars::new();
handlebars.set_strict_mode(true);
// Add templating variables.
let mut parameters = HashMap::new();
parameters.insert("id", task.id.to_string());
parameters.insert("command", task.command.clone());
parameters.insert("path", (*task.path.to_string_lossy()).to_owned());
parameters.insert("group", task.group.clone());
// Result takes the TaskResult Enum strings, unless it didn't finish yet.
if let TaskStatus::Done(result) = &task.status {
parameters.insert("result", result.to_string());
} else {
parameters.insert("result", "None".into());
}
// Format and insert start and end times.
let print_time = |time: Option<DateTime<Local>>| {
time.map(|time| time.timestamp().to_string())
.unwrap_or_default()
};
parameters.insert("start", print_time(task.start));
parameters.insert("end", print_time(task.end));
// Read the last lines of the process' output and make it available.
if let Ok(output) = read_last_log_file_lines(
task.id,
&settings.shared.pueue_directory(),
settings.daemon.callback_log_lines,
) {
parameters.insert("output", output);
} else {
parameters.insert("output", "".to_string());
}
let out_path = get_log_path(task.id, &settings.shared.pueue_directory());
// Using Display impl of PathBuf which isn't necessarily a perfect
// representation of the path but should work for most cases here
parameters.insert("output_path", out_path.display().to_string());
// Get the exit code
if let TaskStatus::Done(result) = &task.status {
match result {
TaskResult::Success => parameters.insert("exit_code", "0".into()),
TaskResult::Failed(code) => parameters.insert("exit_code", code.to_string()),
_ => parameters.insert("exit_code", "None".into()),
};
} else {
parameters.insert("exit_code", "None".into());
}
handlebars.render_template(template_string, &parameters)
}
/// Look at all running callbacks and log any errors.
/// If everything went smoothly, simply remove them from the list.
pub fn check_callbacks(state: &mut LockedState) {
let mut finished = Vec::new();
for (id, child) in state.callbacks.iter_mut().enumerate() {
match child.try_wait() {
// Handle a child error.
Err(error) => {
error!("Callback failed with error {error:?}");
finished.push(id);
}
// Child process did not exit yet.
Ok(None) => continue,
Ok(exit_status) => {
info!("Callback finished with exit code {exit_status:?}");
finished.push(id);
}
}
}
finished.reverse();
for id in finished.iter() {
state.callbacks.remove(*id);
}
}

View file

@ -4,26 +4,28 @@ use std::{fs::create_dir_all, path::PathBuf};
use anyhow::{bail, Context, Result};
use log::warn;
use std::sync::mpsc::channel;
use process_handler::initiate_shutdown;
use pueue_lib::error::Error;
use pueue_lib::network::certificate::create_certificates;
use pueue_lib::network::message::Shutdown;
use pueue_lib::network::protocol::socket_cleanup;
use pueue_lib::network::secret::init_shared_secret;
use pueue_lib::settings::Settings;
use pueue_lib::state::State;
use pueue_lib::state::{SharedState, State};
use tokio::try_join;
use self::state_helper::{restore_state, save_state};
use crate::daemon::network::socket::accept_incoming;
use crate::daemon::task_handler::{TaskHandler, TaskSender};
mod callbacks;
pub mod cli;
mod network;
mod pid;
mod process_handler;
/// Contains re-usable helper functions, that operate on the pueue-lib state.
pub mod state_helper;
mod task_handler;
pub mod task_handler;
/// The main entry point for the daemon logic.
/// It's basically the `main`, but publicly exported as a library.
@ -77,24 +79,18 @@ pub async fn run(config_path: Option<PathBuf>, profile: Option<String>, test: bo
save_state(&state, &settings).context("Failed to save state on startup.")?;
let state = Arc::new(Mutex::new(state));
let (sender, receiver) = channel();
let sender = TaskSender::new(sender);
let mut task_handler = TaskHandler::new(state.clone(), settings.clone(), receiver);
// Don't set ctrlc and panic handlers during testing.
// This is necessary for multithreaded integration testing, since multiple listener per process
// aren't allowed. On top of this, ctrlc also somehow breaks test error output.
if !test {
setup_signal_panic_handling(&settings, &sender)?;
setup_signal_panic_handling(&settings, state.clone())?;
}
std::thread::spawn(move || {
task_handler.run();
});
accept_incoming(sender, state.clone(), settings.clone()).await?;
Ok(())
// Run both the task handler and the message handler in the same tokio task.
// If any of them fails, return an error immediately.
let task_handler = task_handler::run(state.clone(), settings.clone());
let message_handler = accept_incoming(settings.clone(), state.clone());
try_join!(task_handler, message_handler).map(|_| ())
}
/// Initialize all directories needed for normal operation.
@ -136,17 +132,16 @@ fn init_directories(pueue_dir: &Path) -> Result<()> {
/// TaskHandler. This is to prevent dangling processes and other weird edge-cases.
///
/// On panic, we want to cleanup existing unix sockets and the PID file.
fn setup_signal_panic_handling(settings: &Settings, sender: &TaskSender) -> Result<()> {
let sender_clone = sender.clone();
fn setup_signal_panic_handling(settings: &Settings, state: SharedState) -> Result<()> {
let state_clone = state.clone();
let settings_clone = settings.clone();
// This section handles Shutdown via SigTerm/SigInt process signals
// Notify the TaskHandler, so it can shutdown gracefully.
// The actual program exit will be done via the TaskHandler.
ctrlc::set_handler(move || {
// Notify the task handler
sender_clone
.send(Shutdown::Graceful)
.expect("Failed to send Message to TaskHandler on Shutdown");
let mut state = state_clone.lock().unwrap();
initiate_shutdown(&settings_clone, &mut state, Shutdown::Graceful);
})?;
// Try to do some final cleanup, even if we panic.

View file

@ -9,7 +9,12 @@ use pueue_lib::network::message::*;
use pueue_lib::network::protocol::{send_message, GenericStream};
use pueue_lib::state::SharedState;
/// Handle the continuous stream of a message.
/// Handle the continuous stream of some log output.
///
/// It's not actually a stream in the sense of a low-level network stream, but rather a series of
/// `Message::Stream` messages that each send a portion of new log output.
///
/// It's basically our own chunked stream implementation on top of the protocol we established.
pub async fn handle_follow(
pueue_directory: &Path,
stream: &mut GenericStream,

View file

@ -1,22 +1,19 @@
use chrono::Local;
use pueue_lib::aliasing::insert_alias;
use pueue_lib::failure_msg;
use pueue_lib::network::message::*;
use pueue_lib::state::{GroupStatus, SharedState};
use pueue_lib::task::{Task, TaskStatus};
use super::*;
use crate::daemon::process_handler;
use crate::daemon::state_helper::save_state;
use crate::ok_or_return_failure_message;
use crate::ok_or_save_state_failure;
/// Invoked when calling `pueue add`.
/// Queues a new task to the state.
/// If the start_immediately flag is set, send a StartMessage to the task handler.
pub fn add_task(
message: AddMessage,
sender: &TaskSender,
state: &SharedState,
settings: &Settings,
) -> Message {
pub fn add_task(settings: &Settings, state: &SharedState, message: AddMessage) -> Message {
let mut state = state.lock().unwrap();
if let Err(message) = ensure_group_exists(&mut state, &message.group) {
return message;
@ -29,9 +26,7 @@ pub fn add_task(
.filter(|id| !state.tasks.contains_key(id))
.collect();
if !not_found.is_empty() {
return create_failure_message(format!(
"Unable to setup dependencies : task(s) {not_found:?} not found",
));
return failure_msg!("Unable to setup dependencies : task(s) {not_found:?} not found",);
}
// Create a new task and add it to the state.
@ -78,15 +73,11 @@ pub fn add_task(
// Add the task and persist the state.
let task_id = state.add_task(task);
ok_or_return_failure_message!(save_state(&state, settings));
ok_or_save_state_failure!(save_state(&state, settings));
// Notify the task handler, in case the client wants to start the task immediately.
if message.start_immediately {
sender
.send(StartMessage {
tasks: TaskSelection::TaskIds(vec![task_id]),
})
.expect(SENDER_ERR);
process_handler::start::start(settings, &mut state, TaskSelection::TaskIds(vec![task_id]));
}
// Create the customized response for the client.

View file

@ -5,7 +5,7 @@ use pueue_lib::task::{TaskResult, TaskStatus};
use super::*;
use crate::daemon::state_helper::{is_task_removable, save_state};
use crate::ok_or_return_failure_message;
use crate::ok_or_save_state_failure;
fn construct_success_clean_message(message: CleanMessage) -> String {
let successful_only_fix = if message.successful_only {
@ -24,7 +24,7 @@ fn construct_success_clean_message(message: CleanMessage) -> String {
/// Invoked when calling `pueue clean`.
/// Remove all failed or done tasks from the state.
pub fn clean(message: CleanMessage, state: &SharedState, settings: &Settings) -> Message {
pub fn clean(settings: &Settings, state: &SharedState, message: CleanMessage) -> Message {
let mut state = state.lock().unwrap();
let filtered_tasks =
@ -56,7 +56,7 @@ pub fn clean(message: CleanMessage, state: &SharedState, settings: &Settings) ->
clean_log_handles(*task_id, &settings.shared.pueue_directory());
}
ok_or_return_failure_message!(save_state(&state, settings));
ok_or_save_state_failure!(save_state(&state, settings));
create_success_message(construct_success_clean_message(message))
}
@ -116,7 +116,7 @@ mod tests {
let (state, settings, _tempdir) = get_stub_state();
// Only task 1 will be removed, since it's the only TaskStatus with `Done`.
let message = clean(get_message(false, None), &state, &settings);
let message = clean(&settings, &state, get_message(false, None));
// Return message is correct
assert!(matches!(message, Message::Success(_)));
@ -133,7 +133,7 @@ mod tests {
let (state, settings, _tempdir) = get_clean_test_state(&[PUEUE_DEFAULT_GROUP]);
// All finished tasks should removed when calling default `clean`.
let message = clean(get_message(false, None), &state, &settings);
let message = clean(&settings, &state, get_message(false, None));
// Return message is correct
assert!(matches!(message, Message::Success(_)));
@ -151,7 +151,7 @@ mod tests {
// Only successfully finished tasks should get removed when
// calling `clean` with the `successful_only` flag.
let message = clean(get_message(true, None), &state, &settings);
let message = clean(&settings, &state, get_message(true, None));
// Return message is correct
assert!(matches!(message, Message::Success(_)));
@ -170,7 +170,7 @@ mod tests {
let (state, settings, _tempdir) = get_clean_test_state(&[PUEUE_DEFAULT_GROUP, "other"]);
// All finished tasks should removed in selected group (other)
let message = clean(get_message(false, Some("other".into())), &state, &settings);
let message = clean(&settings, &state, get_message(false, Some("other".into())));
// Return message is correct
assert!(matches!(message, Message::Success(_)));
@ -193,7 +193,7 @@ mod tests {
let (state, settings, _tempdir) = get_clean_test_state(&[PUEUE_DEFAULT_GROUP, "other"]);
// Only successfully finished tasks should removed in the 'other' group
let message = clean(get_message(true, Some("other".into())), &state, &settings);
let message = clean(&settings, &state, get_message(true, Some("other".into())));
// Return message is correct
assert!(matches!(message, Message::Success(_)));

View file

@ -2,15 +2,16 @@ use pueue_lib::aliasing::insert_alias;
use pueue_lib::network::message::*;
use pueue_lib::state::SharedState;
use pueue_lib::task::TaskStatus;
use pueue_lib::{failure_msg, success_msg};
use super::*;
use crate::daemon::state_helper::save_state;
use crate::ok_or_return_failure_message;
use crate::ok_or_save_state_failure;
/// Invoked when calling `pueue edit`.
/// If a user wants to edit a message, we need to send him the current command.
/// Lock the task to prevent execution, before the user has finished editing the command.
pub fn edit_request(task_id: usize, state: &SharedState) -> Message {
pub fn edit_request(state: &SharedState, task_id: usize) -> Message {
// Check whether the task exists and is queued/stashed. Abort if that's not the case.
let mut state = state.lock().unwrap();
match state.tasks.get_mut(&task_id) {
@ -36,7 +37,7 @@ pub fn edit_request(task_id: usize, state: &SharedState) -> Message {
/// Invoked after closing the editor on `pueue edit`.
/// Now we actually update the message with the updated command from the client.
pub fn edit(message: EditMessage, state: &SharedState, settings: &Settings) -> Message {
pub fn edit(settings: &Settings, state: &SharedState, message: EditMessage) -> Message {
// Check whether the task exists and is locked. Abort if that's not the case.
let mut state = state.lock().unwrap();
match state.tasks.get_mut(&message.task_id) {
@ -68,16 +69,16 @@ pub fn edit(message: EditMessage, state: &SharedState, settings: &Settings) -> M
task.priority = priority;
}
ok_or_return_failure_message!(save_state(&state, settings));
ok_or_save_state_failure!(save_state(&state, settings));
create_success_message("Command has been updated")
}
None => create_failure_message(format!("Task to edit has gone away: {}", message.task_id)),
None => failure_msg!("Task to edit has gone away: {}", message.task_id),
}
}
/// Invoked if a client fails to edit a task and asks the daemon to restore the task's status.
pub fn edit_restore(task_id: usize, state: &SharedState) -> Message {
pub fn edit_restore(state: &SharedState, task_id: usize) -> Message {
// Check whether the task exists and is queued/stashed. Abort if that's not the case.
let mut state = state.lock().unwrap();
match state.tasks.get_mut(&task_id) {
@ -87,10 +88,10 @@ pub fn edit_restore(task_id: usize, state: &SharedState) -> Message {
}
task.status = task.prev_status.clone();
create_success_message(format!(
success_msg!(
"The requested task's status has been restored to '{}'",
task.status
))
)
}
None => create_failure_message("No task with this id."),
}

View file

@ -7,7 +7,7 @@ use crate::daemon::network::response_helper::*;
/// Invoked when calling `pueue enqueue`.
/// Enqueue specific stashed tasks.
pub fn enqueue(message: EnqueueMessage, state: &SharedState) -> Message {
pub fn enqueue(state: &SharedState, message: EnqueueMessage) -> Message {
let mut state = state.lock().unwrap();
let filtered_tasks = state.filter_tasks(
|task| matches!(task.status, TaskStatus::Stashed { .. } | TaskStatus::Locked),

View file

@ -1,17 +1,22 @@
use pueue_lib::network::message::*;
use pueue_lib::state::{SharedState, PUEUE_DEFAULT_GROUP};
use std::collections::BTreeMap;
use pueue_lib::network::message::*;
use pueue_lib::settings::Settings;
use pueue_lib::state::{SharedState, PUEUE_DEFAULT_GROUP};
use pueue_lib::{failure_msg, success_msg};
use super::TaskSender;
use crate::daemon::network::message_handler::ok_or_failure_message;
use crate::daemon::network::response_helper::ensure_group_exists;
use crate::ok_or_return_failure_message;
use crate::daemon::process_handler::initiate_shutdown;
use crate::daemon::state_helper::save_state;
use crate::ok_or_save_state_failure;
/// Invoked on `pueue groups`.
/// Manage groups.
/// - Show groups
/// - Add group
/// - Remove group
pub fn group(message: GroupMessage, sender: &TaskSender, state: &SharedState) -> Message {
pub fn group(settings: &Settings, state: &SharedState, message: GroupMessage) -> Message {
let mut state = state.lock().unwrap();
match message {
@ -27,18 +32,20 @@ pub fn group(message: GroupMessage, sender: &TaskSender, state: &SharedState) ->
parallel_tasks,
} => {
if state.groups.contains_key(&name) {
return create_failure_message(format!("Group \"{name}\" already exists"));
return failure_msg!("Group \"{name}\" already exists");
}
// Propagate the message to the TaskHandler, which is responsible for actually
// manipulating our internal data
let result = sender.send(GroupMessage::Add {
name: name.clone(),
parallel_tasks,
});
ok_or_return_failure_message!(result);
let group = state.create_group(&name);
if let Some(parallel_tasks) = parallel_tasks {
group.parallel_tasks = parallel_tasks;
}
// Create the worker pool.
state.children.0.insert(name.clone(), BTreeMap::new());
create_success_message(format!("Group \"{name}\" is being created"))
// Persist the state.
ok_or_save_state_failure!(save_state(&state, settings));
success_msg!("New group \"{name}\" has been created")
}
GroupMessage::Remove(group) => {
if let Err(message) = ensure_group_exists(&mut state, &group) {
@ -46,22 +53,39 @@ pub fn group(message: GroupMessage, sender: &TaskSender, state: &SharedState) ->
}
if group == PUEUE_DEFAULT_GROUP {
return create_failure_message("You cannot delete the default group".to_string());
return failure_msg!("You cannot delete the default group");
}
// Make sure there are no tasks in that group.
if state.tasks.iter().any(|(_, task)| task.group == group) {
return create_failure_message(
"You cannot remove a group, if there're still tasks in it.".to_string(),
);
return failure_msg!("You cannot remove a group, if there're still tasks in it.");
}
// Propagate the message to the TaskHandler, which is responsible for actually
// manipulating our internal data
let result = sender.send(GroupMessage::Remove(group.clone()));
ok_or_return_failure_message!(result);
// Make sure the worker pool exists and is empty.
// There shouldn't be any children, if there are no tasks in this group.
// Those are critical errors, as they indicate desynchronization inside our
// internal datastructures, which is really bad.
if let Some(pool) = state.children.0.get(&group) {
if !pool.is_empty() {
initiate_shutdown(settings, &mut state, Shutdown::Emergency);
return failure_msg!("Encountered a non-empty worker pool, while removing a group. This is a critical error. Please report this bug.");
}
} else {
initiate_shutdown(settings, &mut state, Shutdown::Emergency);
return failure_msg!("Encountered an group without an worker pool, while removing a group. This is a critical error. Please report this bug.");
}
create_success_message(format!("Group \"{group}\" is being removed"))
if let Err(error) = state.remove_group(&group) {
return failure_msg!("Error while removing group: \"{error}\"");
}
// Actually remove the worker pool.
state.children.0.remove(&group);
// Persist the state.
ok_or_save_state_failure!(save_state(&state, settings));
success_msg!("Group \"{group}\" has been removed")
}
}
}

View file

@ -1,12 +1,13 @@
use pueue_lib::network::message::*;
use pueue_lib::state::SharedState;
use pueue_lib::success_msg;
use pueue_lib::{network::message::*, settings::Settings};
use super::{TaskSender, SENDER_ERR};
use crate::daemon::network::response_helper::{ensure_group_exists, task_action_response_helper};
use crate::daemon::process_handler;
/// Invoked when calling `pueue kill`.
/// Forward the kill message to the task handler, which then kills the process.
pub fn kill(message: KillMessage, sender: &TaskSender, state: &SharedState) -> Message {
pub fn kill(settings: &Settings, state: &SharedState, message: KillMessage) -> Message {
let mut state = state.lock().unwrap();
// If a group is selected, make sure it exists.
@ -25,11 +26,11 @@ pub fn kill(message: KillMessage, sender: &TaskSender, state: &SharedState) -> M
|task| task.is_running(),
&state,
),
TaskSelection::Group(group) => create_success_message(format!(
"Sending signal {signal} to all running tasks of group {group}.",
)),
TaskSelection::Group(group) => {
success_msg!("Sending signal {signal} to all running tasks of group {group}.",)
}
TaskSelection::All => {
create_success_message(format!("Sending signal {signal} to all running tasks."))
success_msg!("Sending signal {signal} to all running tasks.")
}
}
} else {
@ -40,18 +41,18 @@ pub fn kill(message: KillMessage, sender: &TaskSender, state: &SharedState) -> M
|task| task.is_running(),
&state,
),
TaskSelection::Group(group) => create_success_message(format!(
TaskSelection::Group(group) => success_msg!(
"All tasks of group \"{group}\" are being killed. The group will also be paused!!!"
)),
),
TaskSelection::All => {
create_success_message("All tasks are being killed. All groups will be paused!!!")
success_msg!("All tasks are being killed. All groups will be paused!!!")
}
}
};
// Actually execute the command
if let Message::Success(_) = response {
// Forward the message to the task handler, but only if there is something to kill.
sender.send(message).expect(SENDER_ERR);
process_handler::kill::kill(settings, &mut state, message.tasks, true, message.signal);
}
response

View file

@ -1,5 +1,6 @@
use std::collections::BTreeMap;
use pueue_lib::failure_msg;
use pueue_lib::log::read_and_compress_log_file;
use pueue_lib::network::message::*;
use pueue_lib::settings::Settings;
@ -7,7 +8,7 @@ use pueue_lib::state::SharedState;
/// Invoked when calling `pueue log`.
/// Return tasks and their output to the client.
pub fn get_log(message: LogRequestMessage, state: &SharedState, settings: &Settings) -> Message {
pub fn get_log(settings: &Settings, state: &SharedState, message: LogRequestMessage) -> Message {
let state = { state.lock().unwrap().clone() };
// Return all logs, if no specific task id is specified.
let task_ids = if message.task_ids.is_empty() {
@ -31,9 +32,7 @@ pub fn get_log(message: LogRequestMessage, state: &SharedState, settings: &Setti
Ok((output, output_complete)) => (Some(output), output_complete),
Err(err) => {
// Fail early if there's some problem with getting the log output
return create_failure_message(format!(
"Failed reading process output file: {err:?}"
));
return failure_msg!("Failed reading process output file: {err:?}");
}
}
} else {

View file

@ -1,10 +1,10 @@
use std::fmt::Display;
use pueue_lib::failure_msg;
use pueue_lib::network::message::*;
use pueue_lib::settings::Settings;
use pueue_lib::state::SharedState;
use super::TaskSender;
use crate::daemon::network::response_helper::*;
mod add;
@ -17,52 +17,38 @@ mod log;
mod parallel;
mod pause;
mod remove;
mod reset;
mod restart;
mod send;
mod start;
mod stash;
mod switch;
pub static SENDER_ERR: &str = "Failed to send message to task handler thread";
pub fn handle_message(
message: Message,
sender: &TaskSender,
state: &SharedState,
settings: &Settings,
) -> Message {
pub fn handle_message(message: Message, state: &SharedState, settings: &Settings) -> Message {
match message {
Message::Add(message) => add::add_task(message, sender, state, settings),
Message::Clean(message) => clean::clean(message, state, settings),
Message::Edit(message) => edit::edit(message, state, settings),
Message::EditRequest(task_id) => edit::edit_request(task_id, state),
Message::EditRestore(task_id) => edit::edit_restore(task_id, state),
Message::Enqueue(message) => enqueue::enqueue(message, state),
Message::Group(message) => group::group(message, sender, state),
Message::Kill(message) => kill::kill(message, sender, state),
Message::Log(message) => log::get_log(message, state, settings),
Message::Add(message) => add::add_task(settings, state, message),
Message::Clean(message) => clean::clean(settings, state, message),
Message::Edit(message) => edit::edit(settings, state, message),
Message::EditRequest(task_id) => edit::edit_request(state, task_id),
Message::EditRestore(task_id) => edit::edit_restore(state, task_id),
Message::Enqueue(message) => enqueue::enqueue(state, message),
Message::Group(message) => group::group(settings, state, message),
Message::Kill(message) => kill::kill(settings, state, message),
Message::Log(message) => log::get_log(settings, state, message),
Message::Parallel(message) => parallel::set_parallel_tasks(message, state),
Message::Pause(message) => pause::pause(message, sender, state),
Message::Remove(task_ids) => remove::remove(task_ids, state, settings),
Message::Reset(message) => reset(message, sender),
Message::Restart(message) => restart::restart_multiple(message, sender, state, settings),
Message::Send(message) => send::send(message, sender, state),
Message::Start(message) => start::start(message, sender, state),
Message::Stash(task_ids) => stash::stash(task_ids, state),
Message::Switch(message) => switch::switch(message, state, settings),
Message::Pause(message) => pause::pause(settings, state, message),
Message::Remove(task_ids) => remove::remove(settings, state, task_ids),
Message::Reset(_) => reset::reset(settings, state),
Message::Restart(message) => restart::restart_multiple(settings, state, message),
Message::Send(message) => send::send(state, message),
Message::Start(message) => start::start(settings, state, message),
Message::Stash(task_ids) => stash::stash(state, task_ids),
Message::Switch(message) => switch::switch(settings, state, message),
Message::Status => get_status(state),
_ => create_failure_message("Not yet implemented"),
}
}
/// Invoked when calling `pueue reset`.
/// Forward the reset request to the task handler.
/// The handler then kills all children and clears the task queue.
fn reset(message: ResetMessage, sender: &TaskSender) -> Message {
sender.send(message).expect(SENDER_ERR);
create_success_message("Everything is being reset right now.")
}
/// Invoked when calling `pueue status`.
/// Return the current state.
fn get_status(state: &SharedState) -> Message {
@ -73,14 +59,12 @@ fn get_status(state: &SharedState) -> Message {
fn ok_or_failure_message<T, E: Display>(result: Result<T, E>) -> Result<T, Message> {
match result {
Ok(inner) => Ok(inner),
Err(error) => Err(create_failure_message(format!(
"Failed to save state. This is a bug: {error}"
))),
Err(error) => Err(failure_msg!("Failed to save state. This is a bug: {error}")),
}
}
#[macro_export]
macro_rules! ok_or_return_failure_message {
macro_rules! ok_or_save_state_failure {
($expression:expr) => {
match ok_or_failure_message($expression) {
Ok(task_id) => task_id,

View file

@ -1,5 +1,6 @@
use pueue_lib::network::message::*;
use pueue_lib::state::SharedState;
use pueue_lib::success_msg;
use crate::daemon::network::response_helper::*;
@ -13,8 +14,8 @@ pub fn set_parallel_tasks(message: ParallelMessage, state: &SharedState) -> Mess
group.parallel_tasks = message.parallel_tasks;
create_success_message(format!(
success_msg!(
"Parallel tasks setting for group \"{}\" adjusted",
&message.group
))
)
}

View file

@ -1,13 +1,14 @@
use pueue_lib::network::message::*;
use pueue_lib::state::SharedState;
use pueue_lib::success_msg;
use pueue_lib::task::TaskStatus;
use pueue_lib::{network::message::*, settings::Settings};
use super::{TaskSender, SENDER_ERR};
use crate::daemon::network::response_helper::*;
use crate::daemon::process_handler;
/// Invoked when calling `pueue pause`.
/// Forward the pause message to the task handler, which then pauses groups/tasks/everything.
pub fn pause(message: PauseMessage, sender: &TaskSender, state: &SharedState) -> Message {
pub fn pause(settings: &Settings, state: &SharedState, message: PauseMessage) -> Message {
let mut state = state.lock().unwrap();
// If a group is selected, make sure it exists.
if let TaskSelection::Group(group) = &message.tasks {
@ -25,14 +26,14 @@ pub fn pause(message: PauseMessage, sender: &TaskSender, state: &SharedState) ->
&state,
),
TaskSelection::Group(group) => {
create_success_message(format!("Group \"{group}\" is being paused."))
success_msg!("Group \"{group}\" is being paused.")
}
TaskSelection::All => create_success_message("All queues are being paused."),
TaskSelection::All => success_msg!("All queues are being paused."),
};
// Actually execute the command
if let Message::Success(_) = response {
// Forward the message to the task handler, but only if there is something to pause.
sender.send(message).expect(SENDER_ERR);
process_handler::pause::pause(settings, &mut state, message.tasks, message.wait);
}
response

View file

@ -7,12 +7,12 @@ use pueue_lib::task::{Task, TaskStatus};
use super::ok_or_failure_message;
use crate::daemon::network::response_helper::*;
use crate::daemon::state_helper::{is_task_removable, save_state};
use crate::ok_or_return_failure_message;
use crate::ok_or_save_state_failure;
/// Invoked when calling `pueue remove`.
/// Remove tasks from the queue.
/// We have to ensure that those tasks aren't running!
pub fn remove(task_ids: Vec<usize>, state: &SharedState, settings: &Settings) -> Message {
pub fn remove(settings: &Settings, state: &SharedState, task_ids: Vec<usize>) -> Message {
let mut state = state.lock().unwrap();
// Filter all running tasks, since we cannot remove them.
@ -42,7 +42,7 @@ pub fn remove(task_ids: Vec<usize>, state: &SharedState, settings: &Settings) ->
clean_log_handles(*task_id, &settings.shared.pueue_directory());
}
ok_or_return_failure_message!(save_state(&state, settings));
ok_or_save_state_failure!(save_state(&state, settings));
compile_task_response("Tasks removed from list", filtered_tasks)
}
@ -60,7 +60,7 @@ mod tests {
// 3 and 4 aren't allowed to be removed, since they're running.
// The rest will succeed.
let message = remove(vec![0, 1, 2, 3, 4], &state, &settings);
let message = remove(&settings, &state, vec![0, 1, 2, 3, 4]);
// Return message is correct
assert!(matches!(message, Message::Success(_)));
@ -93,7 +93,7 @@ mod tests {
}
// Make sure we cannot remove a task with dependencies.
let message = remove(vec![1], &state, &settings);
let message = remove(&settings, &state, vec![1]);
// Return message is correct
assert!(matches!(message, Message::Failure(_)));
@ -107,7 +107,7 @@ mod tests {
}
// Make sure we cannot remove a task with recursive dependencies.
let message = remove(vec![1, 5], &state, &settings);
let message = remove(&settings, &state, vec![1, 5]);
// Return message is correct
assert!(matches!(message, Message::Failure(_)));
@ -121,7 +121,7 @@ mod tests {
}
// Make sure we can remove tasks with dependencies if all dependencies are specified.
let message = remove(vec![1, 5, 6], &state, &settings);
let message = remove(&settings, &state, vec![1, 5, 6]);
// Return message is correct
assert!(matches!(message, Message::Success(_)));

View file

@ -0,0 +1,14 @@
use pueue_lib::state::SharedState;
use pueue_lib::{network::message::*, settings::Settings};
use crate::daemon::process_handler;
/// Invoked when calling `pueue reset`.
/// Kill all children by using the `kill` function.
/// Set the full_reset flag, which will prevent new tasks from being spawned.
pub fn reset(settings: &Settings, state: &SharedState) -> Message {
let mut state = state.lock().unwrap();
state.full_reset = true;
process_handler::kill::kill(settings, &mut state, TaskSelection::All, false, None);
create_success_message("Everything is being reset right now.")
}

View file

@ -7,17 +7,18 @@ use pueue_lib::network::message::*;
use pueue_lib::state::{SharedState, State};
use pueue_lib::task::TaskStatus;
use super::{task_action_response_helper, TaskSender, SENDER_ERR};
use crate::daemon::process_handler;
use super::task_action_response_helper;
/// This is a small wrapper around the actual in-place task `restart` functionality.
///
/// The "not in-place" restart functionality is actually just a copy the finished task + create a
/// new task, which is completely handled on the client-side.
pub fn restart_multiple(
message: RestartMessage,
sender: &TaskSender,
state: &SharedState,
settings: &Settings,
state: &SharedState,
message: RestartMessage,
) -> Message {
let task_ids: Vec<usize> = message.tasks.iter().map(|task| task.task_id).collect();
let mut state = state.lock().unwrap();
@ -31,18 +32,14 @@ pub fn restart_multiple(
&state,
);
// Actually restart all tasks
for task in message.tasks.into_iter() {
// Restart the tasks in-place
for task in message.tasks.iter() {
restart(&mut state, task, message.stashed, settings);
}
// Tell the task manager to start the task immediately if requested.
// Actually start the processes if we should do so.
if message.start_immediately {
sender
.send(StartMessage {
tasks: TaskSelection::TaskIds(task_ids),
})
.expect(SENDER_ERR);
process_handler::start::start(settings, &mut state, TaskSelection::TaskIds(task_ids));
}
response
@ -55,7 +52,7 @@ pub fn restart_multiple(
/// new task, which is completely handled on the client-side.
fn restart(
state: &mut MutexGuard<State>,
to_restart: TaskToRestart,
to_restart: &TaskToRestart,
stashed: bool,
settings: &Settings,
) {
@ -79,19 +76,19 @@ fn restart(
};
// Update command if applicable.
if let Some(new_command) = to_restart.command {
if let Some(new_command) = to_restart.command.clone() {
task.original_command = new_command.clone();
task.command = insert_alias(settings, new_command);
}
// Update path if applicable.
if let Some(path) = to_restart.path {
if let Some(path) = to_restart.path.clone() {
task.path = path;
}
// Update label if applicable.
if to_restart.label.is_some() {
task.label = to_restart.label;
task.label = to_restart.label.clone();
} else if to_restart.delete_label {
task.label = None
}

View file

@ -1,28 +1,28 @@
use pueue_lib::network::message::*;
use pueue_lib::state::SharedState;
use pueue_lib::task::TaskStatus;
use std::io::Write;
use super::{TaskSender, SENDER_ERR};
use pueue_lib::state::SharedState;
use pueue_lib::{failure_msg, network::message::*};
/// Invoked when calling `pueue send`.
/// The user input is written directly to the stdin of the task's running process.
pub fn send(message: SendMessage, sender: &TaskSender, state: &SharedState) -> Message {
// Check whether the task exists and is running. Abort if that's not the case.
{
let state = state.lock().unwrap();
match state.tasks.get(&message.task_id) {
Some(task) => {
if task.status != TaskStatus::Running {
return create_failure_message("You can only send input to a running task");
}
}
None => return create_failure_message("No task with this id."),
}
}
pub fn send(state: &SharedState, message: SendMessage) -> Message {
let task_id = message.task_id;
let mut state = state.lock().unwrap();
// Check whether the task exists and is running, abort if that's not the case.
sender.send(message).expect(SENDER_ERR);
// Check whether the task exists and is running. Abort if that's not the case.
let child = match state.children.get_child_mut(task_id) {
Some(child) => child,
None => {
return failure_msg!("You can only send input to a running process.");
}
};
{
let child_stdin = child.inner().stdin.as_mut().unwrap();
if let Err(err) = child_stdin.write_all(&message.input.into_bytes()) {
return failure_msg!("Failed to send input to task {task_id} with err {err:?}");
};
}
create_success_message("Message is being send to the process.")
}

View file

@ -1,13 +1,15 @@
use pueue_lib::network::message::*;
use pueue_lib::settings::Settings;
use pueue_lib::state::SharedState;
use pueue_lib::success_msg;
use pueue_lib::task::TaskStatus;
use super::{TaskSender, SENDER_ERR};
use crate::daemon::network::response_helper::*;
use crate::daemon::process_handler;
/// Invoked when calling `pueue start`.
/// Forward the start message to the task handler, which then starts the process(es).
pub fn start(message: StartMessage, sender: &TaskSender, state: &SharedState) -> Message {
pub fn start(settings: &Settings, state: &SharedState, message: StartMessage) -> Message {
let mut state = state.lock().unwrap();
// If a group is selected, make sure it exists.
if let TaskSelection::Group(group) = &message.tasks {
@ -29,14 +31,13 @@ pub fn start(message: StartMessage, sender: &TaskSender, state: &SharedState) ->
&state,
),
TaskSelection::Group(group) => {
create_success_message(format!("Group \"{group}\" is being resumed."))
success_msg!("Group \"{group}\" is being resumed.")
}
TaskSelection::All => create_success_message("All queues are being resumed."),
TaskSelection::All => success_msg!("All queues are being resumed."),
};
if let Message::Success(_) = response {
// Forward the message to the task handler, but only if something can be started
sender.send(message).expect(SENDER_ERR);
process_handler::start::start(settings, &mut state, message.tasks);
}
// Return a response depending on the selected tasks.

View file

@ -7,7 +7,7 @@ use crate::daemon::network::response_helper::*;
/// Invoked when calling `pueue stash`.
/// Stash specific queued tasks.
/// They won't be executed until they're enqueued or explicitly started.
pub fn stash(task_ids: Vec<usize>, state: &SharedState) -> Message {
pub fn stash(state: &SharedState, task_ids: Vec<usize>) -> Message {
let mut state = state.lock().unwrap();
let filtered_tasks = state.filter_tasks(
|task| matches!(task.status, TaskStatus::Queued | TaskStatus::Locked),

View file

@ -1,3 +1,4 @@
use pueue_lib::failure_msg;
use pueue_lib::network::message::*;
use pueue_lib::settings::Settings;
use pueue_lib::state::SharedState;
@ -5,12 +6,12 @@ use pueue_lib::task::TaskStatus;
use super::ok_or_failure_message;
use crate::daemon::state_helper::save_state;
use crate::ok_or_return_failure_message;
use crate::ok_or_save_state_failure;
/// Invoked when calling `pueue switch`.
/// Switch the position of two tasks in the upcoming queue.
/// We have to ensure that those tasks are either `Queued` or `Stashed`
pub fn switch(message: SwitchMessage, state: &SharedState, settings: &Settings) -> Message {
pub fn switch(settings: &Settings, state: &SharedState, message: SwitchMessage) -> Message {
let mut state = state.lock().unwrap();
let task_ids = [message.task_id_1, message.task_id_2];
@ -19,10 +20,10 @@ pub fn switch(message: SwitchMessage, state: &SharedState, settings: &Settings)
Some(task_ids.to_vec()),
);
if !filtered_tasks.non_matching_ids.is_empty() {
return create_failure_message("Tasks have to be either queued or stashed.");
return failure_msg!("Tasks have to be either queued or stashed.");
}
if task_ids[0] == task_ids[1] {
return create_failure_message("You cannot switch a task with itself.");
return failure_msg!("You cannot switch a task with itself.");
}
// Get the tasks. Expect them to be there, since we found no mismatch
@ -55,7 +56,7 @@ pub fn switch(message: SwitchMessage, state: &SharedState, settings: &Settings)
}
}
ok_or_return_failure_message!(save_state(&state, settings));
ok_or_save_state_failure!(save_state(&state, settings));
create_success_message("Tasks have been switched")
}
@ -112,7 +113,7 @@ mod tests {
fn switch_normal() {
let (state, settings, _tempdir) = get_test_state();
let message = switch(get_message(1, 2), &state, &settings);
let message = switch(&settings, &state, get_message(1, 2));
// Return message is correct
assert!(matches!(message, Message::Success(_)));
@ -130,7 +131,7 @@ mod tests {
fn switch_task_with_itself() {
let (state, settings, _tempdir) = get_test_state();
let message = switch(get_message(1, 1), &state, &settings);
let message = switch(&settings, &state, get_message(1, 1));
// Return message is correct
assert!(matches!(message, Message::Failure(_)));
@ -145,7 +146,7 @@ mod tests {
fn switch_task_with_dependant() {
let (state, settings, _tempdir) = get_test_state();
switch(get_message(0, 3), &state, &settings);
switch(&settings, &state, get_message(0, 3));
let state = state.lock().unwrap();
assert_eq!(state.tasks.get(&4).unwrap().dependencies, vec![0, 3]);
@ -157,7 +158,7 @@ mod tests {
fn switch_double_dependency() {
let (state, settings, _tempdir) = get_test_state();
switch(get_message(1, 2), &state, &settings);
switch(&settings, &state, get_message(1, 2));
let state = state.lock().unwrap();
assert_eq!(state.tasks.get(&5).unwrap().dependencies, vec![2]);
@ -182,7 +183,7 @@ mod tests {
];
for ids in combinations {
let message = switch(get_message(ids.0, ids.1), &state, &settings);
let message = switch(&settings, &state, get_message(ids.0, ids.1));
// Assert, that we get a Failure message with the correct text.
assert!(matches!(message, Message::Failure(_)));

View file

@ -2,5 +2,3 @@ pub mod follow_log;
pub mod message_handler;
pub mod response_helper;
pub mod socket;
use super::TaskSender;

View file

@ -6,7 +6,7 @@ use pueue_lib::task::Task;
use crate::daemon::state_helper::LockedState;
/// Check whether the given group exists. Return an failure message if it doesn't.
/// Check whether a given group exists. Return a failure message if it doesn't.
pub fn ensure_group_exists<'state>(
state: &'state mut LockedState,
group: &str,
@ -24,8 +24,8 @@ pub fn ensure_group_exists<'state>(
/// Compile a response for actions that affect several given tasks.
/// These actions can sometimes only succeed for a part of the given tasks.
///
/// That's why this helper exists, which determines based on a given criterion `filter`
/// for which tasks the action succeeded and which tasks failed.
/// That's why this helper exists, which determines for which tasks the action succeeded
/// and which tasks failed, based on a given `filter` criterion.
pub fn task_action_response_helper<F>(
message: &str,
task_ids: Vec<usize>,
@ -41,7 +41,7 @@ where
compile_task_response(message, filtered_tasks)
}
/// Compile a response for instructions with multiple tasks ids
/// Compile a response for instructions with multiple task ids.
/// A custom message will be combined with a text about all matching tasks
/// and possibly tasks for which the instruction cannot be executed.
pub fn compile_task_response(message: &str, filtered_tasks: FilteredTasks) -> Message {

View file

@ -13,16 +13,12 @@ use pueue_lib::settings::Settings;
use pueue_lib::state::SharedState;
use crate::daemon::network::follow_log::handle_follow;
use crate::daemon::network::message_handler::{handle_message, SENDER_ERR};
use crate::daemon::task_handler::TaskSender;
use crate::daemon::network::message_handler::handle_message;
use crate::daemon::process_handler::initiate_shutdown;
/// Poll the listener and accept new incoming connections.
/// Create a new future to handle the message and spawn it.
pub async fn accept_incoming(
sender: TaskSender,
state: SharedState,
settings: Settings,
) -> Result<()> {
pub async fn accept_incoming(settings: Settings, state: SharedState) -> Result<()> {
let listener = get_listener(&settings.shared).await?;
// Read secret once to prevent multiple disk reads.
let secret = read_shared_secret(&settings.shared.shared_secret_path())?;
@ -38,19 +34,11 @@ pub async fn accept_incoming(
};
// Start a new task for the request
let sender_clone = sender.clone();
let state_clone = state.clone();
let secret_clone = secret.clone();
let settings_clone = settings.clone();
tokio::spawn(async move {
let _result = handle_incoming(
stream,
sender_clone,
state_clone,
settings_clone,
secret_clone,
)
.await;
let _result = handle_incoming(stream, state_clone, settings_clone, secret_clone).await;
});
}
}
@ -60,7 +48,6 @@ pub async fn accept_incoming(
/// The response future is added to unix_responses and handled in a separate function.
async fn handle_incoming(
mut stream: GenericStream,
sender: TaskSender,
state: SharedState,
settings: Settings,
secret: Vec<u8>,
@ -137,14 +124,14 @@ async fn handle_incoming(
let response = create_success_message("Daemon is shutting down");
send_message(response, &mut stream).await?;
// Notify the task handler.
sender.send(shutdown_type).expect(SENDER_ERR);
let mut state = state.lock().unwrap();
initiate_shutdown(&settings, &mut state, shutdown_type);
return Ok(());
}
_ => {
// Process a normal message.
handle_message(message, &sender, &state, &settings)
handle_message(message, &state, &settings)
}
};

View file

@ -0,0 +1,129 @@
use anyhow::Context;
use chrono::Local;
use log::info;
use pueue_lib::log::clean_log_handles;
use pueue_lib::task::{TaskResult, TaskStatus};
use super::*;
use crate::daemon::callbacks::spawn_callback;
use crate::daemon::state_helper::{pause_on_failure, save_state, LockedState};
use crate::ok_or_shutdown;
/// Check whether there are any finished processes
/// In case there are, handle them and update the shared state
pub fn handle_finished_tasks(settings: &Settings, state: &mut LockedState) {
// Clone the state ref, so we don't have two mutable borrows later on.
let finished = get_finished(state);
// Nothing to do. Early return
if finished.is_empty() {
return;
}
for ((task_id, group, worker_id), error) in finished.iter() {
// Handle std::io errors on child processes.
// I have never seen something like this, but it might happen.
if let Some(error) = error {
let (_task_id, _child) = state
.children
.0
.get_mut(group)
.expect("Worker group must exist when handling finished tasks.")
.remove(worker_id)
.expect("Errored child went missing while handling finished task.");
// Update the task's state and return a clone for callbacks and notifications.
let task = {
let task = state.tasks.get_mut(task_id).unwrap();
task.status = TaskStatus::Done(TaskResult::Errored);
task.end = Some(Local::now());
task.clone()
};
spawn_callback(settings, state, &task);
error!("Child {} failed with io::Error: {:?}", task_id, error);
pause_on_failure(state, settings, &task.group);
continue;
}
// Handle any tasks that exited with some kind of exit code
let (_task_id, mut child) = state
.children
.0
.get_mut(group)
.expect("Worker group must exist when handling finished tasks.")
.remove(worker_id)
.expect("Child of task {} went away while handling finished task.");
// Get the exit code of the child.
// Errors really shouldn't happen in here, since we already checked if it's finished
// with try_wait() before.
let exit_code_result = child.wait();
let exit_code = exit_code_result
.context(format!(
"Failed on wait() for finished task {task_id} with error: {error:?}"
))
.unwrap()
.code();
// Processes with exit code 0 exited successfully
// Processes with `None` have been killed by a Signal
let result = match exit_code {
Some(0) => TaskResult::Success,
Some(exit_code) => TaskResult::Failed(exit_code),
None => TaskResult::Killed,
};
// Update the task's state and return a clone for callbacks and notifications.
let task = {
let task = state
.tasks
.get_mut(task_id)
.expect("Task was removed before child process has finished!");
task.status = TaskStatus::Done(result.clone());
task.end = Some(Local::now());
task.clone()
};
spawn_callback(settings, state, &task);
if let TaskResult::Failed(_) = result {
pause_on_failure(state, settings, &task.group);
}
// Already remove the output files, if the daemon is being reset anyway
if state.full_reset {
clean_log_handles(*task_id, &settings.shared.pueue_directory());
}
}
ok_or_shutdown!(settings, state, save_state(state, settings));
}
/// Gather all finished tasks and sort them by finished and errored.
/// Returns a list of finished task ids and whether they errored or not.
fn get_finished(state: &mut LockedState) -> Vec<((usize, String, usize), Option<std::io::Error>)> {
let mut finished = Vec::new();
for (group, children) in state.children.0.iter_mut() {
for (worker_id, (task_id, child)) in children.iter_mut() {
match child.try_wait() {
// Handle a child error.
Err(error) => {
finished.push(((*task_id, group.clone(), *worker_id), Some(error)));
}
// Child process did not exit yet
Ok(None) => continue,
Ok(_exit_status) => {
info!("Task {task_id} just finished");
finished.push(((*task_id, group.clone(), *worker_id), None));
}
}
}
}
finished
}

View file

@ -0,0 +1,125 @@
use log::{error, info, warn};
use pueue_lib::network::message::{Signal, TaskSelection};
use pueue_lib::process_helper::*;
use pueue_lib::settings::Settings;
use pueue_lib::state::GroupStatus;
use pueue_lib::task::TaskStatus;
use crate::daemon::state_helper::{save_state, LockedState};
use crate::ok_or_shutdown;
/// Kill specific tasks or groups.
///
/// By default, this kills tasks with Rust's subprocess handling "kill" logic.
/// However, the user can decide to send unix signals to the processes as well.
///
/// `issued_by_user` This is `true` when a kill is issued by an actual user.
/// It is `false` if the daemon is resetting or shutting down.
///
/// In case `true` is given and a `group` or `all` is killed, the affected groups should
/// be paused under some circumstances. This is mostly to prevent any further task execution
/// during an emergency. These circumstances are:
/// - There're further queued or scheduled tasks in a killed group.
///
/// `signal` Don't kill the task as usual, but rather send a unix process signal.
pub fn kill(
settings: &Settings,
state: &mut LockedState,
tasks: TaskSelection,
issued_by_user: bool,
signal: Option<Signal>,
) {
// Get the keys of all tasks that should be resumed
let task_ids = match tasks {
TaskSelection::TaskIds(task_ids) => task_ids,
TaskSelection::Group(group_name) => {
// Ensure that a given group exists. (Might not happen due to concurrency)
if !state.groups.contains_key(&group_name) {
return;
};
// Check whether the group should be paused before killing the tasks.
if should_pause_group(state, issued_by_user, &group_name) {
let group = state.groups.get_mut(&group_name).unwrap();
group.status = GroupStatus::Paused;
}
// Determine all running or paused tasks in that group.
let filtered_tasks = state.filter_tasks_of_group(
|task| matches!(task.status, TaskStatus::Running | TaskStatus::Paused),
&group_name,
);
info!("Killing tasks of group {group_name}");
filtered_tasks.matching_ids
}
TaskSelection::All => {
// Pause all groups, if applicable
let group_names: Vec<String> = state.groups.keys().cloned().collect();
for group_name in group_names {
if should_pause_group(state, issued_by_user, &group_name) {
state.set_status_for_all_groups(GroupStatus::Paused);
}
}
info!("Killing all running tasks");
state.children.all_task_ids()
}
};
for task_id in task_ids {
if let Some(signal) = signal.clone() {
send_internal_signal(state, task_id, signal);
} else {
kill_task(state, task_id);
}
}
ok_or_shutdown!(settings, state, save_state(state, settings));
}
/// Send a signal to a specific child process.
/// This is a wrapper around [send_signal_to_child], which does a little bit of
/// additional error handling.
pub fn send_internal_signal(state: &mut LockedState, task_id: usize, signal: Signal) {
let child = match state.children.get_child_mut(task_id) {
Some(child) => child,
None => {
warn!("Tried to kill non-existing child: {task_id}");
return;
}
};
if let Err(err) = send_signal_to_child(child, signal) {
warn!("Failed to send signal to task {task_id} with error: {err}");
};
}
/// Kill a specific task and handle it accordingly.
/// Triggered on `reset` and `kill`.
pub fn kill_task(state: &mut LockedState, task_id: usize) {
if let Some(child) = state.children.get_child_mut(task_id) {
kill_child(task_id, child).unwrap_or_else(|err| {
warn!(
"Failed to send kill to task {task_id} child process {child:?} with error {err:?}"
);
})
} else {
warn!("Tried to kill non-existing child: {task_id}");
}
}
/// Determine whether a group should be paused during a kill command.
/// It should only be paused if:
/// - The kill was issued by the user, i.e. it wasn't issued by the system during shutdown/reset.
/// - The group that's being killed has queued or stashed-enqueued tasks.
fn should_pause_group(state: &LockedState, issued_by_user: bool, group: &str) -> bool {
if !issued_by_user {
return false;
}
// Check if there are tasks that are queued or enqueued.
let filtered_tasks = state.filter_tasks_of_group(|task| task.is_queued(), group);
!filtered_tasks.matching_ids.is_empty()
}

View file

@ -0,0 +1,66 @@
use anyhow::Result;
use log::{debug, error};
use pueue_lib::network::message::{Shutdown, TaskSelection};
use pueue_lib::process_helper::{send_signal_to_child, ProcessAction};
use pueue_lib::settings::Settings;
use super::state_helper::LockedState;
pub mod finish;
pub mod kill;
pub mod pause;
pub mod spawn;
pub mod start;
/// This is a little helper macro, which looks at a critical result and shuts the
/// TaskHandler down, if an error occurred. This is mostly used if the state cannot
/// be written due to IO errors.
/// Those errors are considered unrecoverable and we should initiate a graceful shutdown
/// immediately.
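///
/// # Example (sketch)
///
/// Typical usage, as seen throughout the process handlers; `settings` and `state` are
/// assumed to be in scope:
///
/// ```ignore
/// ok_or_shutdown!(settings, state, save_state(state, settings));
/// ```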
#[macro_export]
macro_rules! ok_or_shutdown {
($settings:expr, $state:expr, $result:expr) => {
match $result {
Err(err) => {
use log::error;
use pueue_lib::network::message::Shutdown;
use $crate::daemon::process_handler::initiate_shutdown;
error!("Initializing graceful shutdown. Encountered error in TaskHandler: {err}");
initiate_shutdown($settings, $state, Shutdown::Emergency);
return;
}
Ok(inner) => inner,
}
};
}
/// Initiate shutdown, which includes killing all children.
/// We don't have to pause any groups, as no new tasks will be spawned during shutdown anyway.
/// Any groups with queued tasks will be automatically paused on state-restoration.
pub fn initiate_shutdown(settings: &Settings, state: &mut LockedState, shutdown: Shutdown) {
// Only start shutdown if we aren't already in one.
// Otherwise, we might end up with an endless recursion as `kill` might fail and initiate shutdown
// once again.
if state.shutdown.is_none() {
state.shutdown = Some(shutdown);
self::kill::kill(settings, state, TaskSelection::All, false, None);
}
}
/// This is a small wrapper around the real platform-dependent process handling logic.
/// It only ensures that the process we want to manipulate really does exist.
pub fn perform_action(state: &mut LockedState, id: usize, action: ProcessAction) -> Result<bool> {
match state.children.get_child_mut(id) {
Some(child) => {
debug!("Executing action {action:?} to {id}");
send_signal_to_child(child, &action)?;
Ok(true)
}
None => {
error!("Tried to execute action {action:?} to non existing task {id}");
Ok(false)
}
}
}

View file

@ -0,0 +1,69 @@
use log::{error, info};
use pueue_lib::network::message::TaskSelection;
use pueue_lib::process_helper::ProcessAction;
use pueue_lib::settings::Settings;
use pueue_lib::state::GroupStatus;
use pueue_lib::task::TaskStatus;
use crate::daemon::state_helper::{save_state, LockedState};
use crate::ok_or_shutdown;
use super::perform_action;
/// Pause specific tasks or groups.
///
/// `wait` decides whether running tasks will be kept running until they finish on their own.
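///
/// # Example (sketch)
///
/// Illustrative only; `settings` and a locked `state` are assumed to be in scope, and
/// `"default"` stands in for any existing group name:
///
/// ```ignore
/// // Pause the given group and immediately pause its running tasks as well.
/// pause(settings, &mut state, TaskSelection::Group("default".into()), false);
/// ```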
pub fn pause(settings: &Settings, state: &mut LockedState, selection: TaskSelection, wait: bool) {
// Get the keys of all tasks that should be paused
let keys: Vec<usize> = match selection {
TaskSelection::TaskIds(task_ids) => task_ids,
TaskSelection::Group(group_name) => {
// Ensure that a given group exists. (Might not happen due to concurrency)
let group = match state.groups.get_mut(&group_name) {
Some(group) => group,
None => return,
};
// Pause a specific group.
group.status = GroupStatus::Paused;
info!("Pausing group {group_name}");
let filtered_tasks = state.filter_tasks_of_group(
|task| matches!(task.status, TaskStatus::Running),
&group_name,
);
filtered_tasks.matching_ids
}
TaskSelection::All => {
// Pause all groups, since we're pausing the whole daemon.
state.set_status_for_all_groups(GroupStatus::Paused);
info!("Pausing everything");
state.children.all_task_ids()
}
};
// Pause all tasks that were found.
if !wait {
for id in keys {
pause_task(state, id);
}
}
ok_or_shutdown!(settings, state, save_state(state, settings));
}
/// Pause a specific task.
/// Send a signal to the process to actually pause the OS process.
fn pause_task(state: &mut LockedState, id: usize) {
match perform_action(state, id, ProcessAction::Pause) {
Err(err) => error!("Failed pausing task {id}: {err:?}"),
Ok(success) => {
if success {
state.change_status(id, TaskStatus::Paused);
}
}
}
}

View file

@ -0,0 +1,219 @@
use std::io::Write;
use std::process::Stdio;
use chrono::Local;
use command_group::CommandGroup;
use log::{error, info};
use pueue_lib::log::{create_log_file_handles, get_writable_log_file_handle};
use pueue_lib::process_helper::compile_shell_command;
use pueue_lib::settings::Settings;
use pueue_lib::state::GroupStatus;
use pueue_lib::task::{Task, TaskResult, TaskStatus};
use crate::daemon::callbacks::spawn_callback;
use crate::daemon::state_helper::{pause_on_failure, save_state, LockedState};
use crate::ok_or_shutdown;
/// See if we can start a new queued task.
pub fn spawn_new(settings: &Settings, state: &mut LockedState) {
// Check whether a new task can be started.
// Spawn tasks until we no longer have free slots available.
while let Some(id) = get_next_task_id(state) {
spawn_process(settings, state, id);
}
}
/// Search and return the next task that can be started.
/// Precondition for a task to be started:
/// - It is in `Queued` state
/// - There are free slots in the task's group
/// - The group is running
/// - All its dependencies are in `Done` state
///
/// Order in which tasks are picked (descending relevance):
/// - Task with the highest priority first
/// - Task with the lowest ID first
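///
/// # Example (sketch)
///
/// Illustrative only; this mirrors how `spawn_new` drives the scheduling loop, assuming
/// `settings` and a locked `state` guard are in scope:
///
/// ```ignore
/// while let Some(task_id) = get_next_task_id(&state) {
///     spawn_process(settings, &mut state, task_id);
/// }
/// ```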
pub fn get_next_task_id(state: &LockedState) -> Option<usize> {
// Get all tasks that could theoretically be started right now.
let mut potential_tasks: Vec<&Task> = state
.tasks
.iter()
.filter(|(_, task)| task.status == TaskStatus::Queued)
.filter(|(_, task)| {
// Make sure the task is assigned to an existing group.
let group = match state.groups.get(&task.group) {
Some(group) => group,
None => {
error!(
"Got task with unknown group {}. Please report this!",
&task.group
);
return false;
}
};
// Let's check if the group is running. If it isn't, simply return false.
if group.status != GroupStatus::Running {
return false;
}
// If parallel tasks are set to `0`, this means an unlimited amount of tasks may
// run at any given time.
if group.parallel_tasks == 0 {
return true;
}
// Get the currently running tasks by looking at the actually running processes.
// They're sorted by group, which makes this quite convenient.
let running_tasks = match state.children.0.get(&task.group) {
Some(children) => children.len(),
None => {
error!(
"Got valid group {}, but no worker pool has been initialized. This is a bug!",
&task.group
);
return false
}
};
// Make sure there are free slots in the task's group
running_tasks < group.parallel_tasks
})
.filter(|(_, task)| {
// Check whether all dependencies for this task are fulfilled.
task.dependencies
.iter()
.flat_map(|id| state.tasks.get(id))
.all(|task| matches!(task.status, TaskStatus::Done(TaskResult::Success)))
})
.map(|(_, task)| task)
.collect();
// Order the tasks based on their priority and their task id.
// Tasks with higher priority go first.
// Tasks with the same priority are ordered by their id in ascending order, meaning that
// tasks with smaller id will be processed first.
potential_tasks.sort_by(|a, b| {
// If they have the same prio, decide the execution order by task_id!
if a.priority == b.priority {
return a.id.cmp(&b.id);
}
// Otherwise, let the priority decide.
b.priority.cmp(&a.priority)
});
// Return the id of the first task (if one has been found).
potential_tasks.first().map(|task| task.id)
}
/// Actually spawn a new subprocess.
/// The output of subprocesses is piped into a separate file for easier access.
pub fn spawn_process(settings: &Settings, state: &mut LockedState, task_id: usize) {
// Check if the task exists and can actually be spawned. Otherwise do an early return.
match state.tasks.get(&task_id) {
Some(task) => {
if !matches!(
&task.status,
TaskStatus::Stashed { .. } | TaskStatus::Queued | TaskStatus::Paused
) {
info!("Tried to start task with status: {}", task.status);
return;
}
}
None => {
info!("Tried to start non-existing task: {task_id}");
return;
}
};
let pueue_directory = settings.shared.pueue_directory();
// Try to get the log file handles to which the output of the process will be written.
// Panic if this doesn't work! This is unrecoverable.
let (stdout_log, stderr_log) = match create_log_file_handles(task_id, &pueue_directory) {
Ok((out, err)) => (out, err),
Err(err) => {
panic!("Failed to create child log files: {err:?}");
}
};
// Get all necessary info for starting the task
let (command, path, group, mut envs) = {
let task = state.tasks.get(&task_id).unwrap();
(
task.command.clone(),
task.path.clone(),
task.group.clone(),
task.envs.clone(),
)
};
// Build the shell command that should be executed.
let mut command = compile_shell_command(settings, &command);
// Determine the worker's id depending on the current group.
// Inject that info into the environment.
let worker_id = state.children.get_next_group_worker(&group);
envs.insert("PUEUE_GROUP".into(), group.clone());
envs.insert("PUEUE_WORKER_ID".into(), worker_id.to_string());
// Spawn the actual subprocess
let spawned_command = command
.current_dir(path)
.stdin(Stdio::piped())
.env_clear()
.envs(envs.clone())
.stdout(Stdio::from(stdout_log))
.stderr(Stdio::from(stderr_log))
.group_spawn();
// Check if the task managed to spawn
let child = match spawned_command {
Ok(child) => child,
Err(err) => {
let error = format!("Failed to spawn child {task_id} with err: {err:?}");
error!("{}", error);
// Write some debug log output to the task's log file.
// This should always work, but print a detailed error if it doesn't.
if let Ok(mut file) = get_writable_log_file_handle(task_id, &pueue_directory) {
let log_output =
format!("Pueue error, failed to spawn task. Check your command.\n{error}");
let write_result = file.write_all(log_output.as_bytes());
if let Err(write_err) = write_result {
error!("Failed to write spawn error to task log: {}", write_err);
}
}
// Update all necessary fields on the task.
let task = {
let task = state.tasks.get_mut(&task_id).unwrap();
task.status = TaskStatus::Done(TaskResult::FailedToSpawn(error));
task.start = Some(Local::now());
task.end = Some(Local::now());
task.clone()
};
// Spawn any callback if necessary
spawn_callback(settings, state, &task);
pause_on_failure(state, settings, &task.group);
ok_or_shutdown!(settings, state, save_state(state, settings));
return;
}
};
// Save the process handle in the state's `children` datastructure.
state.children.add_child(&group, worker_id, task_id, child);
let task = state.tasks.get_mut(&task_id).unwrap();
task.start = Some(Local::now());
task.status = TaskStatus::Running;
// Overwrite the task's environment variables with the new ones, containing the
// PUEUE_WORKER_ID and PUEUE_GROUP variables.
task.envs = envs;
info!("Started task: {}", task.command);
ok_or_shutdown!(settings, state, save_state(state, settings));
}

View file

@ -0,0 +1,95 @@
use log::{error, info, warn};
use pueue_lib::{
network::message::TaskSelection, process_helper::ProcessAction, settings::Settings,
state::GroupStatus, task::TaskStatus,
};
use crate::daemon::process_handler::spawn::spawn_process;
use crate::daemon::state_helper::{save_state, LockedState};
use crate::ok_or_shutdown;
use super::perform_action;
/// Start specific tasks or groups.
///
/// By default, this command only resumes tasks.
/// However, if specific task_ids are provided, tasks can actually be force-started.
/// Of course, they can only be started if they're in a valid status, i.e. Queued/Stashed.
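///
/// # Example (sketch)
///
/// Illustrative only; `settings` and a locked `state` are assumed to be in scope:
///
/// ```ignore
/// // Force-start (or resume) the tasks with ids 0 and 1.
/// start(settings, &mut state, TaskSelection::TaskIds(vec![0, 1]));
/// ```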
pub fn start(settings: &Settings, state: &mut LockedState, tasks: TaskSelection) {
let task_ids = match tasks {
TaskSelection::TaskIds(task_ids) => {
// Start specific tasks.
// This is handled differently and results in an early return, as this branch is
// capable of force-spawning processes, instead of simply resuming tasks.
for task_id in task_ids {
// Continue all children that are simply paused
if state.children.has_child(task_id) {
continue_task(state, task_id);
} else {
// Start processes for all tasks that haven't been started yet
spawn_process(settings, state, task_id);
}
}
ok_or_shutdown!(settings, state, save_state(state, settings));
return;
}
TaskSelection::Group(group_name) => {
// Ensure that a given group exists. (Might not happen due to concurrency)
let group = match state.groups.get_mut(&group_name) {
Some(group) => group,
None => return,
};
// Set the group to running.
group.status = GroupStatus::Running;
info!("Resuming group {}", &group_name);
let filtered_tasks = state.filter_tasks_of_group(
|task| matches!(task.status, TaskStatus::Paused),
&group_name,
);
filtered_tasks.matching_ids
}
TaskSelection::All => {
// Resume all groups and the default queue
info!("Resuming everything");
state.set_status_for_all_groups(GroupStatus::Running);
state.children.all_task_ids()
}
};
// Resume all specified paused tasks
for task_id in task_ids {
continue_task(state, task_id);
}
ok_or_shutdown!(settings, state, save_state(state, settings));
}
/// Send a start signal to a paused task to continue execution.
fn continue_task(state: &mut LockedState, task_id: usize) {
// Task doesn't exist
if !state.children.has_child(task_id) {
return;
}
// Task is already done
if state.tasks.get(&task_id).unwrap().is_done() {
return;
}
let success = match perform_action(state, task_id, ProcessAction::Resume) {
Err(err) => {
warn!("Failed to resume task {}: {:?}", task_id, err);
false
}
Ok(success) => success,
};
if success {
state.change_status(task_id, TaskStatus::Running);
}
}

View file

@ -0,0 +1,203 @@
use std::collections::BTreeMap;
use std::time::Duration;
use anyhow::Result;
use chrono::prelude::*;
use log::{error, info};
use pueue_lib::children::Children;
use pueue_lib::log::*;
use pueue_lib::network::message::*;
use pueue_lib::network::protocol::socket_cleanup;
use pueue_lib::settings::Settings;
use pueue_lib::state::{Group, GroupStatus, SharedState};
use pueue_lib::task::{TaskResult, TaskStatus};
use crate::daemon::pid::cleanup_pid_file;
use crate::daemon::state_helper::{reset_state, save_state};
use crate::ok_or_shutdown;
use super::callbacks::{check_callbacks, spawn_callback};
use super::process_handler::finish::handle_finished_tasks;
use super::process_handler::spawn::spawn_new;
use super::state_helper::LockedState;
/// Main task handling loop.
/// In here a few things happen:
///
/// - Handle finished tasks, i.e. cleanup processes, update statuses.
/// - Callback handling logic. This is rather non-critical.
/// - Enqueue any stashed tasks which are ready to be queued.
/// - Ensure tasks with dependencies have no failed ancestors.
/// - Handle shutdown logic (graceful & not graceful).
/// - If the client requested a reset: reset the state if all children have been killed and handled.
/// - Check whether we can spawn new tasks.
///
/// We also wait for 300ms to prevent this loop from running hot.
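///
/// # Example (sketch)
///
/// A hypothetical way the daemon's setup code might drive this loop; the actual call site
/// may differ, and `state`/`settings` are assumed to be available:
///
/// ```ignore
/// let handle = tokio::spawn(run(state.clone(), settings.clone()));
/// ```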
pub async fn run(state: SharedState, settings: Settings) -> Result<()> {
// Initialize the subprocess management structure.
{
let mut state = state.lock().unwrap();
let mut pools = BTreeMap::new();
for group in state.groups.keys() {
pools.insert(group.clone(), BTreeMap::new());
}
state.children = Children(pools);
}
loop {
{
let mut state = state.lock().unwrap();
check_callbacks(&mut state);
handle_finished_tasks(&settings, &mut state);
enqueue_delayed_tasks(&settings, &mut state);
check_failed_dependencies(&settings, &mut state);
if state.shutdown.is_some() {
// Check if we're in shutdown.
// If all tasks are killed, we do some cleanup and exit.
handle_shutdown(&settings, &mut state);
} else if state.full_reset {
// Wait until all tasks are killed.
// Once they are, reset everything and go back to normal
handle_reset(&settings, &mut state);
} else {
// Only start new tasks, if we aren't in the middle of a reset or shutdown.
spawn_new(&settings, &mut state);
}
}
tokio::time::sleep(Duration::from_millis(300)).await;
}
}
/// Check if all tasks are killed.
/// If they aren't, we'll wait a little longer.
/// Once they are, we do some cleanup and exit.
fn handle_shutdown(settings: &Settings, state: &mut LockedState) {
// There are still active tasks. Continue waiting until they're killed and cleaned up.
if state.children.has_active_tasks() {
return;
}
// Remove the unix socket.
if let Err(error) = socket_cleanup(&settings.shared) {
println!("Failed to cleanup socket during shutdown.");
println!("{error}");
}
// Cleanup the pid file
if let Err(error) = cleanup_pid_file(&settings.shared.pid_path()) {
println!("Failed to cleanup pid during shutdown.");
println!("{error}");
}
// Actually exit the program the way we're supposed to.
// Depending on the current shutdown type, we exit with different exit codes.
if matches!(state.shutdown, Some(Shutdown::Emergency)) {
std::process::exit(1);
}
std::process::exit(0);
}
/// Users can issue a reset of the daemon.
/// If that's the case, the `full_reset` flag is set to `true`, all children are killed,
/// and no new tasks will be spawned.
/// This function checks whether all killed children have been handled.
/// If that's the case, completely reset the state.
fn handle_reset(settings: &Settings, state: &mut LockedState) {
// Don't do any reset logic, if we aren't in reset mode or if some children are still up.
if state.children.has_active_tasks() {
return;
}
if let Err(error) = reset_state(state, settings) {
error!("Failed to reset state with error: {error:?}");
};
if let Err(error) = reset_task_log_directory(&settings.shared.pueue_directory()) {
panic!("Error while resetting task log directory: {error}");
};
state.full_reset = false;
}
/// As time passes, some delayed tasks may need to be enqueued.
/// Gather all stashed tasks and enqueue them once their `enqueue_at` time has passed.
fn enqueue_delayed_tasks(settings: &Settings, state: &mut LockedState) {
let mut changed = false;
for (_, task) in state.tasks.iter_mut() {
if let TaskStatus::Stashed {
enqueue_at: Some(time),
} = task.status
{
if time <= Local::now() {
info!("Enqueuing delayed task : {}", task.id);
task.status = TaskStatus::Queued;
task.enqueued_at = Some(Local::now());
changed = true;
}
}
}
// Save the state if a task has been enqueued
if changed {
ok_or_shutdown!(settings, state, save_state(state, settings));
}
}
/// Ensure that no `Queued` tasks have any failed dependencies.
/// Otherwise set their status to `Done` and result to `DependencyFailed`.
fn check_failed_dependencies(settings: &Settings, state: &mut LockedState) {
// Get id's of all tasks with failed dependencies
let has_failed_deps: Vec<_> = state
.tasks
.iter()
.filter(|(_, task)| task.status == TaskStatus::Queued && !task.dependencies.is_empty())
.filter_map(|(id, task)| {
// At this point we got all queued tasks with dependencies.
// Go through all dependencies and ensure they didn't fail.
let failed = task
.dependencies
.iter()
.flat_map(|id| state.tasks.get(id))
.filter(|task| task.failed())
.map(|task| task.id)
.next();
failed.map(|f| (*id, f))
})
.collect();
// Update the state of all tasks with failed dependencies.
for (id, _) in has_failed_deps {
// Get the task's group, since we have to check if it's paused.
let group = if let Some(task) = state.tasks.get(&id) {
task.group.clone()
} else {
continue;
};
// Only update the status, if the group isn't paused.
// This allows users to fix and restart dependencies in-place without
// breaking the dependency chain.
if let Some(&Group {
status: GroupStatus::Paused,
..
}) = state.groups.get(&group)
{
continue;
}
// Update the task and return a clone to build the callback.
let task = {
let task = state.tasks.get_mut(&id).unwrap();
task.status = TaskStatus::Done(TaskResult::DependencyFailed);
task.start = Some(Local::now());
task.end = Some(Local::now());
task.clone()
};
spawn_callback(settings, state, &task);
}
}

View file

@ -1,127 +0,0 @@
use handlebars::RenderError;
use super::*;
impl TaskHandler {
/// Users can specify a callback that's fired whenever a task finishes.
/// Execute the callback by spawning a new subprocess.
pub fn spawn_callback(&mut self, task: &Task) {
// Return early, if there's no callback specified
let Some(template_string) = &self.settings.daemon.callback else {
return;
};
// Build the command to be called from the template string in the configuration file.
let callback_command = match self.build_callback_command(task, template_string) {
Ok(callback_command) => callback_command,
Err(err) => {
error!("Failed to create callback command from template with error: {err}");
return;
}
};
let mut command = compile_shell_command(&self.settings, &callback_command);
// Spawn the callback subprocess and log if it fails.
let spawn_result = command.spawn();
let child = match spawn_result {
Err(error) => {
error!("Failed to spawn callback with error: {error}");
return;
}
Ok(child) => child,
};
debug!("Spawned callback for task {}", task.id);
self.callbacks.push(child);
}
/// Take the callback template string from the configuration and insert all parameters from the
/// finished task.
pub fn build_callback_command(
&self,
task: &Task,
template_string: &str,
) -> Result<String, RenderError> {
// Init Handlebars. We set to strict, as we want to show an error on missing variables.
let mut handlebars = Handlebars::new();
handlebars.set_strict_mode(true);
// Add templating variables.
let mut parameters = HashMap::new();
parameters.insert("id", task.id.to_string());
parameters.insert("command", task.command.clone());
parameters.insert("path", (*task.path.to_string_lossy()).to_owned());
parameters.insert("group", task.group.clone());
// Result takes the TaskResult Enum strings, unless it didn't finish yet.
if let TaskStatus::Done(result) = &task.status {
parameters.insert("result", result.to_string());
} else {
parameters.insert("result", "None".into());
}
// Format and insert start and end times.
let print_time = |time: Option<DateTime<Local>>| {
time.map(|time| time.timestamp().to_string())
.unwrap_or_default()
};
parameters.insert("start", print_time(task.start));
parameters.insert("end", print_time(task.end));
// Read the last lines of the process' output and make it available.
if let Ok(output) = read_last_log_file_lines(
task.id,
&self.pueue_directory,
self.settings.daemon.callback_log_lines,
) {
parameters.insert("output", output);
} else {
parameters.insert("output", "".to_string());
}
let out_path = get_log_path(task.id, &self.pueue_directory);
// Using Display impl of PathBuf which isn't necessarily a perfect
// representation of the path but should work for most cases here
parameters.insert("output_path", out_path.display().to_string());
// Get the exit code
if let TaskStatus::Done(result) = &task.status {
match result {
TaskResult::Success => parameters.insert("exit_code", "0".into()),
TaskResult::Failed(code) => parameters.insert("exit_code", code.to_string()),
_ => parameters.insert("exit_code", "None".into()),
};
} else {
parameters.insert("exit_code", "None".into());
}
handlebars.render_template(template_string, &parameters)
}
/// Look at all running callbacks and log any errors.
/// If everything went smoothly, simply remove them from the list.
pub fn check_callbacks(&mut self) {
let mut finished = Vec::new();
for (id, child) in self.callbacks.iter_mut().enumerate() {
match child.try_wait() {
// Handle a child error.
Err(error) => {
error!("Callback failed with error {error:?}");
finished.push(id);
}
// Child process did not exit yet.
Ok(None) => continue,
Ok(exit_status) => {
info!("Callback finished with exit code {exit_status:?}");
finished.push(id);
}
}
}
finished.reverse();
for id in finished.iter() {
self.callbacks.remove(*id);
}
}
}

View file

@ -1,60 +0,0 @@
use super::*;
use pueue_lib::state::Group;
impl TaskHandler {
/// Ensure that no `Queued` tasks have any failed dependencies.
/// Otherwise set their status to `Done` and result to `DependencyFailed`.
pub fn check_failed_dependencies(&mut self) {
// Clone the state ref, so we don't have two mutable borrows later on.
let state_ref = self.state.clone();
let mut state = state_ref.lock().unwrap();
// Get id's of all tasks with failed dependencies
let has_failed_deps: Vec<_> = state
.tasks
.iter()
.filter(|(_, task)| task.status == TaskStatus::Queued && !task.dependencies.is_empty())
.filter_map(|(id, task)| {
// At this point we got all queued tasks with dependencies.
// Go through all dependencies and ensure they didn't fail.
let failed = task
.dependencies
.iter()
.flat_map(|id| state.tasks.get(id))
.filter(|task| task.failed())
.map(|task| task.id)
.next();
failed.map(|f| (*id, f))
})
.collect();
// Update the state of all tasks with failed dependencies.
for (id, _) in has_failed_deps {
// Get the task's group, since we have to check if it's paused.
let group = if let Some(task) = state.tasks.get(&id) {
task.group.clone()
} else {
continue;
};
// Only update the status, if the group isn't paused.
// This allows users to fix and restart dependencies in-place without
// breaking the dependency chain.
if let Some(&Group {
status: GroupStatus::Paused,
..
}) = state.groups.get(&group)
{
continue;
}
let task = state.tasks.get_mut(&id).unwrap();
task.status = TaskStatus::Done(TaskResult::DependencyFailed);
task.start = Some(Local::now());
task.end = Some(Local::now());
self.spawn_callback(task);
}
}
}

View file

@ -1,127 +0,0 @@
use anyhow::Context;
use super::*;
use crate::daemon::state_helper::{pause_on_failure, save_state};
use crate::ok_or_shutdown;
impl TaskHandler {
/// Check whether there are any finished processes
/// In case there are, handle them and update the shared state
pub fn handle_finished_tasks(&mut self) {
let finished = self.get_finished();
// Nothing to do. Early return
if finished.is_empty() {
return;
}
// Clone the state ref, so we don't have two mutable borrows later on.
let state_ref = self.state.clone();
let mut state = state_ref.lock().unwrap();
for ((task_id, group, worker_id), error) in finished.iter() {
// Handle std::io errors on child processes.
// I have never seen something like this, but it might happen.
if let Some(error) = error {
let (_taks_id, _child) = self
.children
.0
.get_mut(group)
.expect("Worker group must exist when handling finished tasks.")
.remove(worker_id)
.expect("Errored child went missing while handling finished task.");
let group = {
let task = state.tasks.get_mut(task_id).unwrap();
task.status = TaskStatus::Done(TaskResult::Errored);
task.end = Some(Local::now());
self.spawn_callback(task);
task.group.clone()
};
error!("Child {} failed with io::Error: {:?}", task_id, error);
pause_on_failure(&mut state, &self.settings, &group);
continue;
}
// Handle any tasks that exited with some kind of exit code
let (_task_id, mut child) = self
.children
.0
.get_mut(group)
.expect("Worker group must exist when handling finished tasks.")
.remove(worker_id)
.expect("Child of task {} went away while handling finished task.");
// Get the exit code of the child.
// Errors really shouldn't happen in here, since we already checked if it's finished
// with try_wait() before.
let exit_code_result = child.wait();
let exit_code = exit_code_result
.context(format!(
"Failed on wait() for finished task {task_id} with error: {error:?}"
))
.unwrap()
.code();
// Processes with exit code 0 exited successfully
// Processes with `None` have been killed by a Signal
let result = match exit_code {
Some(0) => TaskResult::Success,
Some(exit_code) => TaskResult::Failed(exit_code),
None => TaskResult::Killed,
};
// Update all properties on the task and get the group for later
let group = {
let task = state
.tasks
.get_mut(task_id)
.expect("Task was removed before child process has finished!");
task.status = TaskStatus::Done(result.clone());
task.end = Some(Local::now());
self.spawn_callback(task);
task.group.clone()
};
if let TaskResult::Failed(_) = result {
pause_on_failure(&mut state, &self.settings, &group);
}
// Already remove the output files, if the daemon is being reset anyway
if self.full_reset {
clean_log_handles(*task_id, &self.pueue_directory);
}
}
ok_or_shutdown!(self, save_state(&state, &self.settings));
}
/// Gather all finished tasks and sort them by finished and errored.
/// Returns a list of finished task ids and whether they errored or not.
fn get_finished(&mut self) -> Vec<((usize, String, usize), Option<std::io::Error>)> {
let mut finished = Vec::new();
for (group, children) in self.children.0.iter_mut() {
for (worker_id, (task_id, child)) in children.iter_mut() {
match child.try_wait() {
// Handle a child error.
Err(error) => {
finished.push(((*task_id, group.clone(), *worker_id), Some(error)));
}
// Child process did not exit yet
Ok(None) => continue,
Ok(_exit_status) => {
info!("Task {task_id} just finished");
finished.push(((*task_id, group.clone(), *worker_id), None));
}
}
}
}
finished
}
}

View file

@ -1,87 +0,0 @@
use std::collections::BTreeMap;
use log::{error, info};
use pueue_lib::network::message::GroupMessage;
use crate::daemon::state_helper::save_state;
use crate::daemon::task_handler::{Shutdown, TaskHandler};
use crate::ok_or_shutdown;
impl TaskHandler {
/// Handle the addition and the removal of groups.
///
/// This is done in the TaskHandler, as we also have to create/remove worker pools.
/// I.e. we have to touch three things:
/// - state.groups
/// - state.config.daemon.groups
/// - self.children
pub fn handle_group_message(&mut self, message: GroupMessage) {
let cloned_state_mutex = self.state.clone();
let mut state = cloned_state_mutex.lock().unwrap();
match message {
GroupMessage::List => {}
GroupMessage::Add {
name,
parallel_tasks,
} => {
if state.groups.contains_key(&name) {
error!("Group \"{name}\" already exists");
return;
}
let group = state.create_group(&name);
if let Some(parallel_tasks) = parallel_tasks {
group.parallel_tasks = parallel_tasks;
}
info!("New group \"{name}\" has been created");
// Create the worker pool.
self.children.0.insert(name, BTreeMap::new());
// Persist the state.
ok_or_shutdown!(self, save_state(&state, &self.settings));
}
GroupMessage::Remove(group) => {
if !state.groups.contains_key(&group) {
error!("Group \"{group}\" to be remove doesn't exists");
return;
}
// Make sure there are no tasks in that group.
if state.tasks.iter().any(|(_, task)| task.group == group) {
error!("Tried to remove group \"{group}\", while it still contained tasks.");
return;
}
if let Err(error) = state.remove_group(&group) {
error!("Error while removing group: \"{error}\"");
return;
}
// Make sure the worker pool exists and is empty.
// There shouldn't be any children, if there are no tasks in this group.
// Those are critical errors, as they indicate desynchronization inside our
// internal datastructures, which is really bad.
if let Some(pool) = self.children.0.get(&group) {
if !pool.is_empty() {
error!("Encountered a non-empty worker pool, while removing a group. This is a critical error. Please report this bug.");
self.initiate_shutdown(Shutdown::Emergency);
return;
}
} else {
error!("Encountered an group without an worker pool, while removing a group. This is a critical error. Please report this bug.");
self.initiate_shutdown(Shutdown::Emergency);
return;
}
// Actually remove the worker pool.
self.children.0.remove(&group);
// Persist the state.
ok_or_shutdown!(self, save_state(&state, &self.settings));
info!("Group \"{group}\" has been removed");
}
}
}
}

View file

@ -1,121 +0,0 @@
use log::{error, info, warn};
use pueue_lib::network::message::{Signal, TaskSelection};
use pueue_lib::process_helper::*;
use pueue_lib::state::GroupStatus;
use pueue_lib::task::TaskStatus;
use crate::daemon::state_helper::{save_state, LockedState};
use crate::daemon::task_handler::{Shutdown, TaskHandler};
use crate::ok_or_shutdown;
impl TaskHandler {
/// Kill specific tasks or groups.
///
/// By default, this kills tasks with Rust's subprocess handling "kill" logic.
/// However, the user can decide to send unix signals to the processes as well.
///
/// `issued_by_user` is `true` when a kill is issued by an actual user.
/// It is `false` if the daemon resets or shuts down.
///
/// In case `true` is given and a `group` or `all` is killed, the affected groups should
/// be paused under some circumstances. This is mostly to prevent any further task execution
/// during an emergency. These circumstances are:
/// - There are further queued or scheduled tasks in a killed group.
///
/// `signal` Don't kill the task as usual, but rather send a unix process signal.
pub fn kill(&mut self, tasks: TaskSelection, issued_by_user: bool, signal: Option<Signal>) {
let cloned_state_mutex = self.state.clone();
let mut state = cloned_state_mutex.lock().unwrap();
// Get the keys of all tasks that should be killed
let task_ids = match tasks {
TaskSelection::TaskIds(task_ids) => task_ids,
TaskSelection::Group(group_name) => {
// Ensure that a given group exists. (Might not happen due to concurrency)
if !state.groups.contains_key(&group_name) {
return;
};
// Check whether the group should be paused before killing the tasks.
if should_pause_group(&state, issued_by_user, &group_name) {
let group = state.groups.get_mut(&group_name).unwrap();
group.status = GroupStatus::Paused;
}
// Determine all running or paused tasks in that group.
let filtered_tasks = state.filter_tasks_of_group(
|task| matches!(task.status, TaskStatus::Running | TaskStatus::Paused),
&group_name,
);
info!("Killing tasks of group {group_name}");
filtered_tasks.matching_ids
}
TaskSelection::All => {
// Pause all groups, if applicable
let group_names: Vec<String> = state.groups.keys().cloned().collect();
for group_name in group_names {
if should_pause_group(&state, issued_by_user, &group_name) {
state.set_status_for_all_groups(GroupStatus::Paused);
}
}
info!("Killing all running tasks");
self.children.all_task_ids()
}
};
for task_id in task_ids {
if let Some(signal) = signal.clone() {
self.send_internal_signal(task_id, signal);
} else {
self.kill_task(task_id);
}
}
ok_or_shutdown!(self, save_state(&state, &self.settings));
}
/// Send a signal to a specific child process.
/// This is a wrapper around [send_signal_to_child], which does a little bit of
/// additional error handling.
pub fn send_internal_signal(&mut self, task_id: usize, signal: Signal) {
let child = match self.children.get_child_mut(task_id) {
Some(child) => child,
None => {
warn!("Tried to kill non-existing child: {task_id}");
return;
}
};
if let Err(err) = send_signal_to_child(child, signal) {
warn!("Failed to send signal to task {task_id} with error: {err}");
};
}
/// Kill a specific task and handle it accordingly.
/// Triggered on `reset` and `kill`.
pub fn kill_task(&mut self, task_id: usize) {
if let Some(child) = self.children.get_child_mut(task_id) {
kill_child(task_id, child).unwrap_or_else(|err| {
warn!("Failed to send kill to task {task_id} child process {child:?} with error {err:?}");
})
} else {
warn!("Tried to kill non-existing child: {task_id}");
}
}
}
/// Determine, whether a group should be paused during a kill command.
/// It should only be paused if:
/// - The kill was issued by the user, i.e. it wasn't issued by a system during shutdown/reset.
/// - The group that's being killed must have queued or stashed-enqueued tasks.
fn should_pause_group(state: &LockedState, issued_by_user: bool, group: &str) -> bool {
if !issued_by_user {
return false;
}
// Check if there're tasks that're queued or enqueued.
let filtered_tasks = state.filter_tasks_of_group(|task| task.is_queued(), group);
!filtered_tasks.matching_ids.is_empty()
}

View file

@ -1,40 +0,0 @@
use std::time::Duration;
use log::warn;
use pueue_lib::network::message::*;
use crate::daemon::task_handler::TaskHandler;
mod group;
mod kill;
mod pause;
mod send;
mod start;
impl TaskHandler {
/// Some client instructions require immediate action by the task handler
/// This function is also responsible for waiting
pub fn receive_messages(&mut self) {
// Sleep for a few milliseconds. We don't want to hurt the CPU.
let timeout = Duration::from_millis(200);
if let Ok(message) = self.receiver.recv_timeout(timeout) {
self.handle_message(message);
};
}
fn handle_message(&mut self, message: Message) {
match message {
Message::Pause(message) => self.pause(message.tasks, message.wait),
Message::Start(message) => self.start(message.tasks),
Message::Kill(message) => self.kill(message.tasks, true, message.signal),
Message::Send(message) => self.send(message.task_id, message.input),
Message::Reset(_) => self.reset(),
Message::Group(message) => self.handle_group_message(message),
Message::DaemonShutdown(shutdown) => {
self.initiate_shutdown(shutdown);
}
_ => warn!("Received unhandled message {message:?}"),
}
}
}

View file

@ -1,70 +0,0 @@
use log::{error, info};
use pueue_lib::network::message::TaskSelection;
use pueue_lib::state::GroupStatus;
use pueue_lib::task::TaskStatus;
use crate::daemon::state_helper::{save_state, LockedState};
use crate::daemon::task_handler::{ProcessAction, Shutdown, TaskHandler};
use crate::ok_or_shutdown;
impl TaskHandler {
/// Pause specific tasks or groups.
///
/// `wait` decides whether running tasks will be kept running until they finish on their own.
pub fn pause(&mut self, selection: TaskSelection, wait: bool) {
let cloned_state_mutex = self.state.clone();
let mut state = cloned_state_mutex.lock().unwrap();
// Get the keys of all tasks that should be paused
let keys: Vec<usize> = match selection {
TaskSelection::TaskIds(task_ids) => task_ids,
TaskSelection::Group(group_name) => {
// Ensure that a given group exists. (Might not happen due to concurrency)
let group = match state.groups.get_mut(&group_name) {
Some(group) => group,
None => return,
};
// Pause a specific group.
group.status = GroupStatus::Paused;
info!("Pausing group {group_name}");
let filtered_tasks = state.filter_tasks_of_group(
|task| matches!(task.status, TaskStatus::Running),
&group_name,
);
filtered_tasks.matching_ids
}
TaskSelection::All => {
// Pause all groups, since we're pausing the whole daemon.
state.set_status_for_all_groups(GroupStatus::Paused);
info!("Pausing everything");
self.children.all_task_ids()
}
};
// Pause all tasks that were found.
if !wait {
for id in keys {
self.pause_task(&mut state, id);
}
}
ok_or_shutdown!(self, save_state(&state, &self.settings));
}
/// Pause a specific task.
/// Send a signal to the process to actually pause the OS process.
fn pause_task(&mut self, state: &mut LockedState, id: usize) {
match self.perform_action(id, ProcessAction::Pause) {
Err(err) => error!("Failed pausing task {id}: {err:?}"),
Ok(success) => {
if success {
state.change_status(id, TaskStatus::Paused);
}
}
}
}
}

View file

@ -1,24 +0,0 @@
use std::io::Write;
use log::{error, warn};
use crate::daemon::task_handler::TaskHandler;
impl TaskHandler {
/// Send some input to a child process' stdin.
pub fn send(&mut self, task_id: usize, input: String) {
let child = match self.children.get_child_mut(task_id) {
Some(child) => child,
None => {
warn!("Task {task_id} finished before input could be sent");
return;
}
};
{
let child_stdin = child.inner().stdin.as_mut().unwrap();
if let Err(err) = child_stdin.write_all(&input.into_bytes()) {
error!("Failed to send input to task {task_id} with err {err:?}");
};
}
}
}

View file

@ -1,97 +0,0 @@
use log::{error, info, warn};
use pueue_lib::network::message::TaskSelection;
use pueue_lib::state::GroupStatus;
use pueue_lib::task::TaskStatus;
use crate::daemon::state_helper::{save_state, LockedState};
use crate::daemon::task_handler::{ProcessAction, Shutdown, TaskHandler};
use crate::ok_or_shutdown;
impl TaskHandler {
/// Start specific tasks or groups.
///
/// By default, this command only resumes tasks.
/// However, if specific task_ids are provided, tasks can actually be force-started.
/// Of course, they can only be started if they're in a valid status, i.e. Queued/Stashed.
pub fn start(&mut self, tasks: TaskSelection) {
let cloned_state_mutex = self.state.clone();
let mut state = cloned_state_mutex.lock().unwrap();
let task_ids = match tasks {
TaskSelection::TaskIds(task_ids) => {
// Start specific tasks.
// This is handled differently and results in an early return, as this branch is
// capable of force-spawning processes, instead of simply resuming tasks.
for task_id in task_ids {
// Continue all children that are simply paused
if self.children.has_child(task_id) {
self.continue_task(&mut state, task_id);
} else {
// Start processes for all tasks that haven't been started yet
self.start_process(task_id, &mut state);
}
}
ok_or_shutdown!(self, save_state(&state, &self.settings));
return;
}
TaskSelection::Group(group_name) => {
// Ensure that a given group exists. (Might not happen due to concurrency)
let group = match state.groups.get_mut(&group_name) {
Some(group) => group,
None => return,
};
// Set the group to running.
group.status = GroupStatus::Running;
info!("Resuming group {}", &group_name);
let filtered_tasks = state.filter_tasks_of_group(
|task| matches!(task.status, TaskStatus::Paused),
&group_name,
);
filtered_tasks.matching_ids
}
TaskSelection::All => {
// Resume all groups and the default queue
info!("Resuming everything");
state.set_status_for_all_groups(GroupStatus::Running);
self.children.all_task_ids()
}
};
// Resume all specified paused tasks
for task_id in task_ids {
self.continue_task(&mut state, task_id);
}
ok_or_shutdown!(self, save_state(&state, &self.settings));
}
/// Send a start signal to a paused task to continue execution.
fn continue_task(&mut self, state: &mut LockedState, task_id: usize) {
// Task doesn't exist
if !self.children.has_child(task_id) {
return;
}
// Task is already done
if state.tasks.get(&task_id).unwrap().is_done() {
return;
}
let success = match self.perform_action(task_id, ProcessAction::Resume) {
Err(err) => {
warn!("Failed to resume task {}: {:?}", task_id, err);
false
}
Ok(success) => success,
};
if success {
state.change_status(task_id, TaskStatus::Running);
}
}
}

View file

@ -1,277 +0,0 @@
use std::collections::{BTreeMap, HashMap};
use std::path::PathBuf;
use std::process::Child;
use std::process::Stdio;
use std::sync::mpsc::{Receiver, SendError, Sender};
use anyhow::Result;
use chrono::prelude::*;
use command_group::CommandGroup;
use handlebars::Handlebars;
use log::{debug, error, info};
use pueue_lib::log::*;
use pueue_lib::network::message::*;
use pueue_lib::network::protocol::socket_cleanup;
use pueue_lib::process_helper::*;
use pueue_lib::settings::Settings;
use pueue_lib::state::{GroupStatus, SharedState};
use pueue_lib::task::{Task, TaskResult, TaskStatus};
use crate::daemon::pid::cleanup_pid_file;
use crate::daemon::state_helper::{reset_state, save_state};
mod callback;
/// A helper newtype struct, which implements convenience methods for our child process management
/// datastructure.
mod children;
/// Logic for handling dependencies
mod dependencies;
/// Logic for finishing and cleaning up completed tasks.
mod finish_task;
/// This module contains all logic that's triggered by messages received via the mpsc channel.
/// These messages are sent by the threads that handle the client messages.
mod messages;
/// Everything regarding actually spawning task processes.
mod spawn_task;
use self::children::Children;
/// This is a little helper macro, which looks at a critical result and shuts the
/// TaskHandler down, if an error occurred. This is mostly used if the state cannot
/// be written due to IO errors.
/// Those errors are considered unrecoverable and we should initiate a graceful shutdown
/// immediately.
#[macro_export]
macro_rules! ok_or_shutdown {
($task_manager:ident, $result:expr) => {
match $result {
Err(err) => {
error!("Initializing graceful shutdown. Encountered error in TaskHandler: {err}");
$task_manager.initiate_shutdown(Shutdown::Emergency);
return;
}
Ok(inner) => inner,
}
};
}
/// Sender<TaskMessage> wrapper that takes Into<Message> as a convenience option
#[derive(Debug, Clone)]
pub struct TaskSender {
sender: Sender<Message>,
}
impl TaskSender {
pub fn new(sender: Sender<Message>) -> Self {
Self { sender }
}
#[inline]
pub fn send<T>(&self, message: T) -> Result<(), SendError<Message>>
where
T: Into<Message>,
{
self.sender.send(message.into())
}
}
pub struct TaskHandler {
/// The state that's shared between the TaskHandler and the message handling logic.
state: SharedState,
/// The receiver for the MPSC channel that's used to push notifications from our message
/// handling to the TaskHandler.
receiver: Receiver<Message>,
/// Pueue's subprocess and worker pool representation. Take a look at [Children] for more info.
children: Children,
/// These are the currently running callbacks. They're usually very short-lived.
callbacks: Vec<Child>,
/// A simple flag which is used to signal that we're currently doing a full reset of the daemon.
/// This flag prevents new tasks from being spawned.
full_reset: bool,
/// Whether we're currently in the process of a graceful shutdown.
/// Depending on the shutdown type, we're exiting with different exitcodes.
shutdown: Option<Shutdown>,
/// The settings that are passed at program start.
settings: Settings,
// Some static settings that are extracted from `settings` for convenience purposes.
pueue_directory: PathBuf,
}
impl TaskHandler {
pub fn new(shared_state: SharedState, settings: Settings, receiver: Receiver<Message>) -> Self {
// Clone the pointer, as we need to regularly access it inside the TaskHandler.
let state_clone = shared_state.clone();
let state = state_clone.lock().unwrap();
// Initialize the subprocess management structure.
let mut pools = BTreeMap::new();
for group in state.groups.keys() {
pools.insert(group.clone(), BTreeMap::new());
}
TaskHandler {
state: shared_state,
receiver,
children: Children(pools),
callbacks: Vec::new(),
full_reset: false,
shutdown: None,
pueue_directory: settings.shared.pueue_directory(),
settings,
}
}
/// Main loop of the task handler.
/// In here a few things happen:
///
/// - Receive and handle instructions from the client.
/// - Handle finished tasks, i.e. cleanup processes, update statuses.
/// - Callback handling logic. This is rather uncritical.
/// - Enqueue any stashed processes which are ready for being queued.
/// - Ensure tasks with dependencies have no failed ancestors
/// - Whether we should perform a shutdown.
/// - If the client requested a reset: reset the state if all children have been killed and handled.
/// - Check whether we can spawn new tasks.
///
/// This first step waits for 200ms while receiving new messages.
/// This prevents this loop from running hot, but also means that we only check if a new task
/// can be scheduled or if tasks are finished, every 200ms.
pub fn run(&mut self) {
loop {
self.receive_messages();
self.handle_finished_tasks();
self.check_callbacks();
self.enqueue_delayed_tasks();
self.check_failed_dependencies();
if self.shutdown.is_some() {
// Check if we're in shutdown.
// If all tasks are killed, we do some cleanup and exit.
self.handle_shutdown();
} else if self.full_reset {
// Wait until all tasks are killed.
// Once they are, reset everything and go back to normal
self.handle_reset();
} else {
// Only start new tasks, if we aren't in the middle of a reset or shutdown.
self.spawn_new();
}
}
}
/// Initiate shutdown, which includes killing all children and pausing all groups.
/// We don't have to pause any groups, as no new tasks will be spawned during shutdown anyway.
/// Any groups with queued tasks, will be automatically paused on state-restoration.
fn initiate_shutdown(&mut self, shutdown: Shutdown) {
self.shutdown = Some(shutdown);
self.kill(TaskSelection::All, false, None);
}
/// Check if all tasks are killed.
/// If they aren't, we'll wait a little longer.
/// Once they are, we do some cleanup and exit.
fn handle_shutdown(&mut self) {
// There are still active tasks. Continue waiting until they're killed and cleaned up.
if self.children.has_active_tasks() {
return;
}
// Lock the state. This prevents any further connections/alterations from this point on.
let _state = self.state.lock().unwrap();
// Remove the unix socket.
if let Err(error) = socket_cleanup(&self.settings.shared) {
println!("Failed to cleanup socket during shutdown.");
println!("{error}");
}
// Cleanup the pid file
if let Err(error) = cleanup_pid_file(&self.settings.shared.pid_path()) {
println!("Failed to cleanup pid during shutdown.");
println!("{error}");
}
// Actually exit the program the way we're supposed to.
// Depending on the current shutdown type, we exit with different exit codes.
if matches!(self.shutdown, Some(Shutdown::Emergency)) {
std::process::exit(1);
}
std::process::exit(0);
}
/// Users can issue to reset the daemon.
/// If that's the case, the `self.full_reset` flag is set to true, all children are killed
/// and no new tasks will be spawned.
/// This function checks, if all killed children have been handled.
/// If that's the case, completely reset the state
fn handle_reset(&mut self) {
// Don't do any reset logic, if we aren't in reset mode or if some children are still up.
if self.children.has_active_tasks() {
return;
}
let mut state = self.state.lock().unwrap();
if let Err(error) = reset_state(&mut state, &self.settings) {
error!("Failed to reset state with error: {error:?}");
};
if let Err(error) = reset_task_log_directory(&self.pueue_directory) {
panic!("Error while resetting task log directory: {error}");
};
self.full_reset = false;
}
/// Kill all children by using the `kill` function.
/// Set the respective group's statuses to `Reset`. This will prevent new tasks from being spawned.
fn reset(&mut self) {
self.full_reset = true;
self.kill(TaskSelection::All, false, None);
}
/// As time passes, some delayed tasks may need to be enqueued.
/// Gather all stashed tasks and enqueue them if it is after the task's enqueue_at
fn enqueue_delayed_tasks(&mut self) {
let state_clone = self.state.clone();
let mut state = state_clone.lock().unwrap();
let mut changed = false;
for (_, task) in state.tasks.iter_mut() {
if let TaskStatus::Stashed {
enqueue_at: Some(time),
} = task.status
{
if time <= Local::now() {
info!("Enqueuing delayed task : {}", task.id);
task.status = TaskStatus::Queued;
task.enqueued_at = Some(Local::now());
changed = true;
}
}
}
// Save the state if a task has been enqueued
if changed {
ok_or_shutdown!(self, save_state(&state, &self.settings));
}
}
/// This is a small wrapper around the real platform-dependent process handling logic.
/// It only ensures that the process we want to manipulate really does exist.
fn perform_action(&mut self, id: usize, action: ProcessAction) -> Result<bool> {
match self.children.get_child_mut(id) {
Some(child) => {
debug!("Executing action {action:?} to {id}");
send_signal_to_child(child, &action)?;
Ok(true)
}
None => {
error!("Tried to execute action {action:?} to non existing task {id}");
Ok(false)
}
}
}
}

View file

@ -1,212 +0,0 @@
use std::io::Write;
use super::*;
use crate::daemon::state_helper::{pause_on_failure, save_state, LockedState};
use crate::ok_or_shutdown;
impl TaskHandler {
/// See if we can start a new queued task.
pub fn spawn_new(&mut self) {
let cloned_state_mutex = self.state.clone();
let mut state = cloned_state_mutex.lock().unwrap();
// Check whether a new task can be started.
// Spawn tasks until we no longer have free slots available.
while let Some(id) = self.get_next_task_id(&state) {
self.start_process(id, &mut state);
}
}
/// Search and return the next task that can be started.
/// Precondition for a task to be started:
/// - is in Queued state
/// - There are free slots in the task's group
/// - The group is running
/// - has all its dependencies in `Done` state
///
/// Order at which tasks are picked (descending relevancy):
/// - Task with highest priority first
/// - Task with lowest ID first
pub fn get_next_task_id(&mut self, state: &LockedState) -> Option<usize> {
// Get all tasks that could theoretically be started right now.
let mut potential_tasks: Vec<&Task> = state
.tasks
.iter()
.filter(|(_, task)| task.status == TaskStatus::Queued)
.filter(|(_, task)| {
// Make sure the task is assigned to an existing group.
let group = match state.groups.get(&task.group) {
Some(group) => group,
None => {
error!(
"Got task with unknown group {}. Please report this!",
&task.group
);
return false;
}
};
// Let's check if the group is running. If it isn't, simply return false.
if group.status != GroupStatus::Running {
return false;
}
// If parallel tasks are set to `0`, this means an unlimited amount of tasks may
// run at any given time.
if group.parallel_tasks == 0 {
return true;
}
// Get the currently running tasks by looking at the actually running processes.
// They're sorted by group, which makes this quite convenient.
let running_tasks = match self.children.0.get(&task.group) {
Some(children) => children.len(),
None => {
error!(
"Got valid group {}, but no worker pool has been initialized. This is a bug!",
&task.group
);
return false
}
};
// Make sure there are free slots in the task's group
running_tasks < group.parallel_tasks
})
.filter(|(_, task)| {
// Check whether all dependencies for this task are fulfilled.
task.dependencies
.iter()
.flat_map(|id| state.tasks.get(id))
.all(|task| matches!(task.status, TaskStatus::Done(TaskResult::Success)))
})
.map(|(_, task)| {task})
.collect();
// Order the tasks based on their priority and their task id.
// Tasks with higher priority go first.
// Tasks with the same priority are ordered by their id in ascending order, meaning that
// tasks with smaller id will be processed first.
potential_tasks.sort_by(|a, b| {
// If they have the same prio, decide the execution order by task_id!
if a.priority == b.priority {
return a.id.cmp(&b.id);
}
// Otherwise, let the priority decide.
b.priority.cmp(&a.priority)
});
// Return the id of the first task (if one has been found).
potential_tasks.first().map(|task| task.id)
}
/// Actually spawn a new sub process
/// The output of subprocesses is piped into a separate file for easier access
pub fn start_process(&mut self, task_id: usize, state: &mut LockedState) {
// Check if the task exists and can actually be spawned. Otherwise do an early return.
match state.tasks.get(&task_id) {
Some(task) => {
if !matches!(
&task.status,
TaskStatus::Stashed { .. } | TaskStatus::Queued | TaskStatus::Paused
) {
info!("Tried to start task with status: {}", task.status);
return;
}
}
None => {
info!("Tried to start non-existing task: {task_id}");
return;
}
};
// Try to get the log file to which the output of the process will be written to.
// Panic if this doesn't work! This is unrecoverable.
let (stdout_log, stderr_log) = match create_log_file_handles(task_id, &self.pueue_directory)
{
Ok((out, err)) => (out, err),
Err(err) => {
panic!("Failed to create child log files: {err:?}");
}
};
// Get all necessary info for starting the task
let (command, path, group, mut envs) = {
let task = state.tasks.get(&task_id).unwrap();
(
task.command.clone(),
task.path.clone(),
task.group.clone(),
task.envs.clone(),
)
};
// Build the shell command that should be executed.
let mut command = compile_shell_command(&self.settings, &command);
// Determine the worker's id depending on the current group.
// Inject that info into the environment.
let worker_id = self.children.get_next_group_worker(&group);
envs.insert("PUEUE_GROUP".into(), group.clone());
envs.insert("PUEUE_WORKER_ID".into(), worker_id.to_string());
// Spawn the actual subprocess
let spawned_command = command
.current_dir(path)
.stdin(Stdio::piped())
.env_clear()
.envs(envs.clone())
.stdout(Stdio::from(stdout_log))
.stderr(Stdio::from(stderr_log))
.group_spawn();
// Check if the task managed to spawn
let child = match spawned_command {
Ok(child) => child,
Err(err) => {
let error = format!("Failed to spawn child {task_id} with err: {err:?}");
error!("{}", error);
// Write some debug log output to the task's log file.
// This should always work, but print a detailed error if it doesn't.
if let Ok(mut file) = get_writable_log_file_handle(task_id, &self.pueue_directory) {
let log_output =
format!("Pueue error, failed to spawn task. Check your command.\n{error}");
let write_result = file.write_all(log_output.as_bytes());
if let Err(write_err) = write_result {
error!("Failed to write spawn error to task log: {}", write_err);
}
}
// Update all necessary fields on the task.
let group = {
let task = state.tasks.get_mut(&task_id).unwrap();
task.status = TaskStatus::Done(TaskResult::FailedToSpawn(error));
task.start = Some(Local::now());
task.end = Some(Local::now());
self.spawn_callback(task);
task.group.clone()
};
pause_on_failure(state, &self.settings, &group);
ok_or_shutdown!(self, save_state(state, &self.settings));
return;
}
};
// Save the process handle in our self.children datastructure.
self.children.add_child(&group, worker_id, task_id, child);
let task = state.tasks.get_mut(&task_id).unwrap();
task.start = Some(Local::now());
task.status = TaskStatus::Running;
// Overwrite the task's environment variables with the new ones, containing the
// PUEUE_WORKER_ID and PUEUE_GROUP variables.
task.envs = envs;
info!("Started task: {}", task.command);
ok_or_shutdown!(self, save_state(state, &self.settings));
}
}
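The spawn block above combines log-file redirection with process-group spawning from the `command_group` crate. The following standalone sketch shows just that pattern, assuming `command_group` as a dependency; the log paths and the echo command are placeholders, not Pueue's real setup.

use std::fs::File;
use std::process::{Command, Stdio};

use command_group::CommandGroup;

fn main() -> std::io::Result<()> {
    // Placeholder log files; Pueue derives these paths from its pueue_directory.
    let stdout_log = File::create("/tmp/example_task.stdout")?;
    let stderr_log = File::create("/tmp/example_task.stderr")?;

    // Spawn the command in its own process group and pipe its output into the log files.
    let mut child = Command::new("sh")
        .arg("-c")
        .arg("echo hello from a grouped child")
        .stdin(Stdio::piped())
        .stdout(Stdio::from(stdout_log))
        .stderr(Stdio::from(stderr_log))
        .group_spawn()?;

    // Wait for the whole process group to finish.
    let status = child.wait()?;
    println!("Child group exited with: {status}");
    Ok(())
}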

View file

@ -1,4 +1,4 @@
use anyhow::{Context, Result};
use anyhow::{bail, Context, Result};
use crate::helper::*;
@ -19,7 +19,10 @@ async fn test_ctrlc() -> Result<()> {
sleep_ms(500).await;
let result = child.try_wait();
assert!(matches!(result, Ok(Some(_))));
if !matches!(result, Ok(Some(_))) {
println!("Got error when sending SIGTERM to daemon. {result:?}");
bail!("Daemon process crashed after sending SIGTERM.");
}
let code = result.unwrap().unwrap();
assert!(matches!(code.code(), Some(0)));
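For reference, here is a standalone sketch of the `try_wait` exit-status check this test performs, using a placeholder command instead of the Pueue daemon and the same `bail!` style of error reporting.

use std::process::Command;
use std::thread::sleep;
use std::time::Duration;

use anyhow::{bail, Result};

fn main() -> Result<()> {
    let mut child = Command::new("true").spawn()?;
    // Give the process a moment to exit on its own.
    sleep(Duration::from_millis(100));

    // `try_wait` doesn't block: `Ok(None)` means the process is still running.
    match child.try_wait() {
        Ok(Some(status)) if status.code() == Some(0) => Ok(()),
        other => bail!("Process did not exit cleanly: {other:?}"),
    }
}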

View file

@ -12,6 +12,7 @@ use tokio::io::{self, AsyncWriteExt};
use pueue::daemon::run;
use pueue_lib::settings::*;
use tokio::task::JoinHandle;
use crate::helper::*;
@ -22,6 +23,18 @@ pub struct PueueDaemon {
pub tempdir: TempDir,
#[allow(dead_code)]
pub pid: i32,
// The async join handle of the daemon function.
// Can be used to abort the daemon manually.
pub join_handle: JoinHandle<Result<()>>,
}
/// Implement a custom drop for the Daemon test struct
impl Drop for PueueDaemon {
fn drop(&mut self) {
// The daemon runs in a background tokio task.
// Abort that task on drop to make sure the daemon always gets killed.
self.join_handle.abort();
}
}
/// A helper function which creates some test config, sets up a temporary directory and spawns
@ -38,12 +51,13 @@ pub async fn daemon() -> Result<PueueDaemon> {
pub async fn daemon_with_settings(settings: Settings, tempdir: TempDir) -> Result<PueueDaemon> {
// Uncomment the next line to get some daemon logging.
// Ignore any logger initialization errors, as multiple loggers will be initialized.
//let _ = simplelog::SimpleLogger::init(log::LevelFilter::Debug, simplelog::Config::default());
// let _ =
// simplelog::SimpleLogger::init(simplelog::LevelFilter::Debug, simplelog::Config::default());
let pueue_dir = tempdir.path();
let path = pueue_dir.to_path_buf();
// Start/spin off the daemon and get its PID
tokio::spawn(run_and_handle_error(path, true));
let join_handle = tokio::spawn(run_and_handle_error(path, true));
let pid = get_pid(&settings.shared.pid_path()).await?;
let sleep = 50;
@ -60,6 +74,7 @@ pub async fn daemon_with_settings(settings: Settings, tempdir: TempDir) -> Resul
settings,
tempdir,
pid,
join_handle,
});
}
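The new `join_handle` field plus the `Drop` implementation act as a guard: the daemon's background task is aborted as soon as the `PueueDaemon` value goes out of scope. A self-contained sketch of that pattern, with illustrative names rather than the actual test helper:

use std::time::Duration;

use tokio::task::JoinHandle;

struct BackgroundTask {
    join_handle: JoinHandle<()>,
}

impl Drop for BackgroundTask {
    fn drop(&mut self) {
        // A JoinHandle doesn't cancel its task when dropped, so abort it explicitly.
        self.join_handle.abort();
    }
}

#[tokio::main]
async fn main() {
    let guard = BackgroundTask {
        join_handle: tokio::spawn(async {
            // Stand-in for the long-running daemon.
            tokio::time::sleep(Duration::from_secs(3600)).await;
        }),
    };

    // ... run test assertions against the background service here ...

    // Dropping the guard aborts the background task.
    drop(guard);
}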

View file

@ -6,6 +6,7 @@ use std::collections::BTreeMap;
///
/// The datastructure represents the following data:
/// BTreeMap<group_name, BTreeMap<group_worker_id, (task_id, subprocess_handle)>>
#[derive(Debug, Default)]
pub struct Children(pub BTreeMap<String, BTreeMap<usize, (usize, GroupChild)>>);
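To make the nested layout documented above more concrete, here is a standalone sketch with the `GroupChild` handle replaced by a plain placeholder string; counting the entries of a group's inner map is exactly how the parallelism check earlier counts occupied worker slots.

use std::collections::BTreeMap;

type TaskId = usize;
type WorkerId = usize;
// Placeholder for the real command_group::GroupChild handle.
type FakeHandle = &'static str;

fn main() {
    let mut pools: BTreeMap<String, BTreeMap<WorkerId, (TaskId, FakeHandle)>> = BTreeMap::new();

    // One pool per group; worker slots fill up as tasks are spawned.
    pools
        .entry("default".to_string())
        .or_default()
        .insert(0, (42, "child-handle"));

    // The number of running tasks in a group is the size of its inner map.
    let running_in_default = pools.get("default").map_or(0, |pool| pool.len());
    assert_eq!(running_in_default, 1);
}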
impl Children {

View file

@ -3,6 +3,9 @@
/// Shared module for internal logic!
/// Contains helper for command aliasing.
pub mod aliasing;
/// A helper newtype struct, which implements convenience methods for our child process management
/// datastructure.
pub mod children;
/// Pueue lib's own Error implementation.
pub mod error;
/// Helper classes to read and write log files of Pueue's tasks.

View file

@ -21,6 +21,22 @@ macro_rules! impl_into_message {
};
}
/// Macro to simplify the creation of success messages.
#[macro_export]
macro_rules! success_msg {
($($arg:tt)*) => {{
create_success_message(format!($($arg)*))
}}
}
/// Macro to simplify the creation of failure messages.
#[macro_export]
macro_rules! failure_msg {
($($arg:tt)*) => {{
create_failure_message(format!($($arg)*))
}}
}
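A minimal usage sketch for the two macros, assuming they are expanded inside pueue-lib where `create_success_message`, `create_failure_message`, and the `Message` enum are in scope; the handler function itself is made up for illustration.

fn respond(task_id: usize, found: bool) -> Message {
    if found {
        // Expands to create_success_message(format!(...)).
        success_msg!("Task {task_id} has been started.")
    } else {
        // Expands to create_failure_message(format!(...)).
        failure_msg!("Task {task_id} could not be found.")
    }
}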
/// This is the main message enum. \
/// Everything that's sent between the daemon and a client can be represented by this enum.
#[derive(PartialEq, Eq, Clone, Debug, Deserialize, Serialize)]

View file

@ -9,7 +9,6 @@ pub fn process_exists(pid: u32) -> bool {
pub mod tests {
/// Get all processes in a process group
pub fn get_process_group_pids(pgrp: i32) -> Vec<i32> {
/// TODO
Vec::new()
}
}

View file

@ -89,7 +89,6 @@ pub fn kill_child(task_id: usize, child: &mut GroupChild) -> std::io::Result<()>
}
/// Get the current task pid, all child pids, and all children's children pids.
/// TODO: see if this can be simplified using QueryInformationJobObject
/// on the job object created by command_group.
fn get_cur_task_processes(task_pid: u32) -> Vec<u32> {
let mut all_pids = Vec::new();

View file

@ -1,9 +1,12 @@
use std::collections::BTreeMap;
use std::process::Child;
use std::sync::{Arc, Mutex};
use serde_derive::{Deserialize, Serialize};
use crate::children::Children;
use crate::error::Error;
use crate::network::message::Shutdown;
use crate::task::{Task, TaskStatus};
pub const PUEUE_DEFAULT_GROUP: &str = "default";
@ -48,17 +51,53 @@ pub struct Group {
/// The daemon uses the state as a piece of shared memory between its threads.
/// It's wrapped in a Mutex, which allows us to guarantee sequential access to any crucial
/// information, such as status changes and incoming commands from clients.
#[derive(PartialEq, Eq, Clone, Debug, Deserialize, Serialize)]
#[derive(Debug, Default, Deserialize, Serialize)]
pub struct State {
/// All tasks currently managed by the daemon.
pub tasks: BTreeMap<usize, Task>,
/// All groups with their current state and configuration.
pub groups: BTreeMap<String, Group>,
/// Whether we're currently in the process of a graceful shutdown.
/// Depending on the shutdown type, we're exiting with different exitcodes.
/// This is runtime state and won't be serialised to disk.
#[serde(default, skip)]
pub shutdown: Option<Shutdown>,
/// A simple flag which is used to signal that we're currently doing a full reset of the daemon.
/// This flag prevents new tasks from being spawned.
/// This is runtime state and won't be serialised to disk.
#[serde(default, skip)]
pub full_reset: bool,
/// Pueue's subprocess and worker pool representation.
/// Take a look at [Children] for more info.
/// This is runtime state and won't be serialised to disk.
#[serde(default, skip)]
pub children: Children,
/// These are the currently running callbacks. They're usually very short-lived.
#[serde(default, skip)]
pub callbacks: Vec<Child>,
}
impl Default for State {
fn default() -> Self {
Self::new()
// Implement a custom Clone, as the child processes don't implement Clone.
impl Clone for State {
fn clone(&self) -> Self {
State {
tasks: self.tasks.clone(),
groups: self.groups.clone(),
shutdown: self.shutdown.clone(),
full_reset: self.full_reset,
..Default::default()
}
}
}
// Implement a custom PartialEq, as the child processes don't implement PartialEq.
impl Eq for State {}
impl PartialEq for State {
fn eq(&self, other: &Self) -> bool {
self.tasks == other.tasks
&& self.groups == other.groups
&& self.shutdown == other.shutdown
&& self.full_reset == other.full_reset
}
}
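The `#[serde(default, skip)]` fields above are runtime-only: they're omitted when the state is written to disk and filled from `Default` when it's read back. A standalone sketch of that behaviour on a tiny struct, assuming `serde`, `serde_derive`, and `serde_json` as dependencies:

use serde_derive::{Deserialize, Serialize};

#[derive(Debug, Default, Serialize, Deserialize)]
struct MiniState {
    tasks: Vec<String>,
    // Runtime-only flag, mirroring State's skipped fields.
    #[serde(default, skip)]
    full_reset: bool,
}

fn main() -> Result<(), serde_json::Error> {
    let state = MiniState {
        tasks: vec!["sleep 60".into()],
        full_reset: true,
    };

    // The skipped field is not part of the serialised representation ...
    let on_disk = serde_json::to_string(&state)?;
    assert_eq!(on_disk, r#"{"tasks":["sleep 60"]}"#);

    // ... and comes back as its default value after a restore.
    let restored: MiniState = serde_json::from_str(&on_disk)?;
    assert!(!restored.full_reset);
    Ok(())
}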
@ -76,6 +115,7 @@ impl State {
let mut state = State {
tasks: BTreeMap::new(),
groups: BTreeMap::new(),
..Default::default()
};
state.create_group(PUEUE_DEFAULT_GROUP);
state