mirror of
https://github.com/nukesor/pueue
synced 2024-10-07 00:09:10 +00:00
Code and comment refactoring
This commit is contained in:
parent
818d95fc15
commit
dafa4623ee
|
@ -8,13 +8,13 @@ use ::structopt::StructOpt;
|
|||
|
||||
#[derive(StructOpt, Debug)]
|
||||
pub enum SubCommand {
|
||||
/// Enqueue a task for execution
|
||||
/// Enqueue a task for execution.
|
||||
Add {
|
||||
/// The command that should be added.
|
||||
#[structopt()]
|
||||
command: Vec<String>,
|
||||
|
||||
/// Start the task immediately
|
||||
/// Start the task immediately.
|
||||
#[structopt(name = "immediate", short, long, conflicts_with = "stash")]
|
||||
start_immediately: bool,
|
||||
|
||||
|
@ -28,33 +28,33 @@ pub enum SubCommand {
|
|||
delay_until: Option<DateTime<Local>>,
|
||||
|
||||
/// Assign the task to a group. Groups kind of act as separate queues.
|
||||
/// I.e. all groups run in parallel and you can specify the amount of parallel tasks for each group
|
||||
/// I.e. all groups run in parallel and you can specify the amount of parallel tasks for each group.
|
||||
/// If no group is specified, the default group will be used.
|
||||
#[structopt(name = "group", short, long)]
|
||||
group: Option<String>,
|
||||
|
||||
/// Start the task once all specified tasks have successfully finished.
|
||||
/// As soon as one of the dependencies fails, this task will fail as well
|
||||
/// As soon as one of the dependencies fails, this task will fail as well.
|
||||
#[structopt(name = "after", short, long)]
|
||||
dependencies: Vec<usize>,
|
||||
},
|
||||
/// Remove tasks from the list.
|
||||
/// Running or paused tasks need to be killed first.
|
||||
Remove {
|
||||
/// The task ids to be removed
|
||||
/// The task ids to be removed.
|
||||
task_ids: Vec<usize>,
|
||||
},
|
||||
/// Switches the queue position of two commands. Only works on queued and stashed commands.
|
||||
Switch {
|
||||
/// The first task id
|
||||
/// The first task id.
|
||||
task_id_1: usize,
|
||||
/// The second task id
|
||||
/// The second task id.
|
||||
task_id_2: usize,
|
||||
},
|
||||
/// Stashed tasks won't be automatically started.
|
||||
/// Either `enqueue` them, to be normally handled or explicitly `start` them.
|
||||
Stash {
|
||||
/// The id(s) of the tasks you want to stash
|
||||
/// The id(s) of the tasks you want to stash.
|
||||
task_ids: Vec<usize>,
|
||||
},
|
||||
/// Enqueue stashed tasks. They'll be handled normally afterwards.
|
||||
|
@ -81,10 +81,10 @@ pub enum SubCommand {
|
|||
3600s // 3600 seconds from now
|
||||
")]
|
||||
Enqueue {
|
||||
/// The id(s) of the tasks you want to enqueue
|
||||
/// The id(s) of the tasks you want to enqueue.
|
||||
task_ids: Vec<usize>,
|
||||
|
||||
/// Delay enqueuing the tasks until <delay> elapses. See DELAY FORMAT below
|
||||
/// Delay enqueuing the tasks until <delay> elapses. See DELAY FORMAT below.
|
||||
#[structopt(name = "delay", short, long, parse(try_from_str=parse_delay_until))]
|
||||
delay_until: Option<DateTime<Local>>,
|
||||
},
|
||||
|
@ -103,7 +103,7 @@ pub enum SubCommand {
|
|||
#[structopt()]
|
||||
task_ids: Vec<usize>,
|
||||
|
||||
/// Start the task(s) immediately
|
||||
/// Start the task(s) immediately.
|
||||
#[structopt(name = "immediate", short, long)]
|
||||
start_immediately: bool,
|
||||
|
||||
|
@ -137,51 +137,51 @@ pub enum SubCommand {
|
|||
task_ids: Vec<usize>,
|
||||
},
|
||||
|
||||
/// Send something to a task. Useful for sending confirmations ('y\n')
|
||||
/// Send something to a task. Useful for sending confirmations ('y\n').
|
||||
Send {
|
||||
/// The id of the task
|
||||
/// The id of the task.
|
||||
task_id: usize,
|
||||
|
||||
/// The input that should be sent to the process
|
||||
/// The input that should be sent to the process.
|
||||
input: String,
|
||||
},
|
||||
/// Edit the command or the path of a stashed or queued task.
|
||||
Edit {
|
||||
/// The id of the task
|
||||
/// The id of the task.
|
||||
task_id: usize,
|
||||
|
||||
/// Edit the path of the task
|
||||
/// Edit the path of the task.
|
||||
#[structopt(short, long)]
|
||||
path: bool,
|
||||
},
|
||||
|
||||
/// Display the current status of all tasks
|
||||
/// Display the current status of all tasks.
|
||||
Status {
|
||||
/// Print the current state as json to stdout
|
||||
/// Print the current state as json to stdout.
|
||||
/// This doesn't include stdout/stderr of tasks.
|
||||
/// Use `log -j` if you want everything
|
||||
/// Use `log -j` if you want everything.
|
||||
#[structopt(short, long)]
|
||||
json: bool,
|
||||
},
|
||||
/// Display the log output of finished tasks
|
||||
/// Display the log output of finished tasks.
|
||||
Log {
|
||||
/// Specify for which specific tasks you want to see the output
|
||||
/// Specify for which specific tasks you want to see the output.
|
||||
#[structopt()]
|
||||
task_ids: Vec<usize>,
|
||||
/// Print the current state as json
|
||||
/// Includes EVERYTHING
|
||||
/// Print the current state as json.
|
||||
/// Includes EVERYTHING.
|
||||
#[structopt(short, long)]
|
||||
json: bool,
|
||||
},
|
||||
/// Show the output of a currently running task
|
||||
/// This command allows following (like `tail -f`)
|
||||
/// Show the output of a currently running task.
|
||||
/// This command allows following (like `tail -f`).
|
||||
Show {
|
||||
/// The id of the task
|
||||
/// The id of the task.
|
||||
task_id: usize,
|
||||
/// Continuously print stdout (like `tail -f`)
|
||||
/// Continuously print stdout (like `tail -f`).
|
||||
#[structopt(short, long)]
|
||||
follow: bool,
|
||||
/// Like -f, but shows stderr instead of stdeout.
|
||||
/// Like -f, but shows stderr instead of stdout.
|
||||
#[structopt(short, long)]
|
||||
err: bool,
|
||||
},
|
||||
|
@ -192,22 +192,22 @@ pub enum SubCommand {
|
|||
/// Remotely shut down the daemon. Should only be used if the daemon isn't started by a service manager.
|
||||
Shutdown,
|
||||
|
||||
/// Set the amount of allowed parallel tasks
|
||||
/// Set the amount of allowed parallel tasks.
|
||||
Parallel {
|
||||
/// The amount of allowed parallel tasks
|
||||
/// The amount of allowed parallel tasks.
|
||||
#[structopt(validator=min_one)]
|
||||
parallel_tasks: usize,
|
||||
|
||||
/// Specify the amount of parallel tasks for this group
|
||||
/// Specify the amount of parallel tasks for this group.
|
||||
#[structopt(name = "group", short, long)]
|
||||
group: Option<String>,
|
||||
},
|
||||
/// Generates shell completion files.
|
||||
/// Ingore for normal operations
|
||||
/// Ingore for normal operations.
|
||||
Completions {
|
||||
/// The target shell. Can be `bash`, `fish`, `powershell`, `elvish` and `zsh`.
|
||||
shell: Shell,
|
||||
/// The output directory to which the file should be written
|
||||
/// The output directory to which the file should be written.
|
||||
output_directory: PathBuf,
|
||||
},
|
||||
}
|
||||
|
@ -219,7 +219,7 @@ pub enum SubCommand {
|
|||
author = "Arne Beer <contact@arne.beer>"
|
||||
)]
|
||||
pub struct Opt {
|
||||
// The number of occurrences of the `v/verbose` flag
|
||||
// The number of occurrences of the `v/verbose` flag.
|
||||
/// Verbose mode (-v, -vv, -vvv)
|
||||
#[structopt(short, long, parse(from_occurrences))]
|
||||
pub verbose: u8,
|
||||
|
@ -227,7 +227,7 @@ pub struct Opt {
|
|||
// /// The url for the daemon. Overwrites the address in the config file
|
||||
// #[structopt(short, long)]
|
||||
// pub address: Option<String>,
|
||||
/// The port for the daemon. Overwrites the port in the config file
|
||||
/// The port for the daemon. Overwrites the port in the config file.
|
||||
#[structopt(short, long)]
|
||||
pub port: Option<String>,
|
||||
|
||||
|
@ -250,17 +250,17 @@ fn parse_delay_until(src: &str) -> Result<DateTime<Local>, String> {
|
|||
))
|
||||
}
|
||||
|
||||
|
||||
/// Validator function. The input string has to be parsable as int and bigger than 0
|
||||
fn min_one(value: String) -> Result<(), String> {
|
||||
match value.parse::<i32>() {
|
||||
match value.parse::<usize>() {
|
||||
Ok(value) => {
|
||||
if value < 1 {
|
||||
return Err("You must provide a value thats bigger than 1".into());
|
||||
return Err("You must provide a value that's bigger than 0".into());
|
||||
}
|
||||
return Ok(())
|
||||
},
|
||||
return Ok(());
|
||||
}
|
||||
Err(_) => {
|
||||
return Err("Failed to parse integer".into());
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -10,13 +10,13 @@ use ::pueue::message::*;
|
|||
use ::pueue::protocol::*;
|
||||
use ::pueue::settings::Settings;
|
||||
|
||||
/// Representation of a client
|
||||
/// For convenience purposes this logic has been wrapped in a struct
|
||||
/// Representation of a client.
|
||||
/// For convenience purposes this logic has been wrapped in a struct.
|
||||
/// The client is responsible for connecting to the daemon, sending an instruction
|
||||
/// and interpreting the response
|
||||
/// and interpreting the response.
|
||||
///
|
||||
/// Most commands are a simple ping-pong. Though, some commands require a more complex
|
||||
/// communication pattern (e.g. `show -f`, which contiuously streams the output of a task)
|
||||
/// communication pattern (e.g. `show -f`, which contiuously streams the output of a task).
|
||||
pub struct Client {
|
||||
opt: Opt,
|
||||
daemon_address: String,
|
||||
|
|
|
@ -7,10 +7,10 @@ use ::pueue::message::*;
|
|||
|
||||
use crate::cli::SubCommand;
|
||||
|
||||
/// This function allows the user to edit a task's command or path
|
||||
/// Save the string to a temporary file, which is the edited by the user with $EDITOR
|
||||
/// This function allows the user to edit a task's command or path.
|
||||
/// Save the string to a temporary file, which is the edited by the user with $EDITOR.
|
||||
/// As soon as the editor is closed, read the file content and return the
|
||||
/// final edit message with the updated command to the daemon
|
||||
/// final edit message with the updated command to the daemon.
|
||||
pub fn edit(message: EditResponseMessage, cli_command: &SubCommand) -> Message {
|
||||
let edit_path = match cli_command {
|
||||
SubCommand::Edit { task_id: _, path } => *path,
|
||||
|
@ -28,18 +28,18 @@ pub fn edit(message: EditResponseMessage, cli_command: &SubCommand) -> Message {
|
|||
command.clone()
|
||||
};
|
||||
|
||||
// Create a temporary file with the command, vim can edit
|
||||
// Create a temporary file with the command so we can edit it with the editor.
|
||||
let mut file = NamedTempFile::new().expect("Failed to create a temporary file");
|
||||
writeln!(file, "{}", to_edit).expect("Failed writing to temporary file");
|
||||
|
||||
// Start the editor on this file
|
||||
// Start the editor on this file.
|
||||
let editor = &env::var("EDITOR").unwrap_or_else(|_e| "vi".to_string());
|
||||
Command::new(editor)
|
||||
.arg(file.path())
|
||||
.status()
|
||||
.expect("Failed to start editor");
|
||||
|
||||
// Read the file
|
||||
// Read the file.
|
||||
let mut file = file.into_file();
|
||||
file.seek(SeekFrom::Start(0))
|
||||
.expect("Couldn't seek to start of file. Aborting.");
|
||||
|
@ -47,7 +47,7 @@ pub fn edit(message: EditResponseMessage, cli_command: &SubCommand) -> Message {
|
|||
file.read_to_string(&mut to_edit)
|
||||
.expect("Failed to read Command after editing");
|
||||
|
||||
// Remove any trailing newlines from the command
|
||||
// Remove any trailing newlines from the command.
|
||||
while to_edit.ends_with('\n') || to_edit.ends_with('\r') {
|
||||
to_edit.pop();
|
||||
}
|
||||
|
|
|
@ -23,10 +23,10 @@ async fn main() -> Result<()> {
|
|||
println!("{:?}", save_result.err());
|
||||
}
|
||||
|
||||
// Parse commandline options
|
||||
// Parse commandline options.
|
||||
let opt = Opt::from_args();
|
||||
|
||||
// Set the verbosity level for the client app
|
||||
// Set the verbosity level for the client app.
|
||||
if opt.verbose >= 3 {
|
||||
SimpleLogger::init(LevelFilter::Debug, Config::default())?;
|
||||
} else if opt.verbose == 2 {
|
||||
|
@ -48,7 +48,7 @@ async fn main() -> Result<()> {
|
|||
}
|
||||
|
||||
// Create the message that should be sent to the daemon
|
||||
// depending on the given commandline options
|
||||
// depending on the given commandline options.
|
||||
let message = get_message_from_opt(&opt, &settings)?;
|
||||
let mut client = Client::new(settings, message, opt)?;
|
||||
client.run().await?;
|
||||
|
|
|
@ -7,7 +7,7 @@ use ::pueue::settings::Settings;
|
|||
use crate::cli::{Opt, SubCommand};
|
||||
|
||||
// Convert and pre-process the sub-command into a valid message
|
||||
// that can be understood by the daemon
|
||||
// that can be understood by the daemon.
|
||||
pub fn get_message_from_opt(opt: &Opt, settings: &Settings) -> Result<Message> {
|
||||
match &opt.cmd {
|
||||
SubCommand::Add {
|
||||
|
@ -114,13 +114,16 @@ pub fn get_message_from_opt(opt: &Opt, settings: &Settings) -> Result<Message> {
|
|||
SubCommand::Clean => Ok(Message::Clean),
|
||||
SubCommand::Reset => Ok(Message::Reset),
|
||||
SubCommand::Shutdown => Ok(Message::DaemonShutdown),
|
||||
SubCommand::Parallel { parallel_tasks, group } => {
|
||||
SubCommand::Parallel {
|
||||
parallel_tasks,
|
||||
group,
|
||||
} => {
|
||||
let message = ParallelMessage {
|
||||
parallel_tasks: *parallel_tasks,
|
||||
group: group.clone(),
|
||||
};
|
||||
Ok(Message::Parallel(message))
|
||||
},
|
||||
}
|
||||
SubCommand::Completions {
|
||||
shell: _,
|
||||
output_directory: _,
|
||||
|
|
|
@ -24,7 +24,7 @@ pub fn print_error(message: String) {
|
|||
println!("{}", styled);
|
||||
}
|
||||
|
||||
/// Print the current state of the daemon in a nicely formatted table
|
||||
/// Print the current state of the daemon in a nicely formatted table.
|
||||
pub fn print_state(state: State, cli_command: &SubCommand) {
|
||||
let json = match cli_command {
|
||||
SubCommand::Status { json } => *json,
|
||||
|
@ -34,13 +34,13 @@ pub fn print_state(state: State, cli_command: &SubCommand) {
|
|||
),
|
||||
};
|
||||
|
||||
// If the json flag is specified, print the state as json and exit
|
||||
// If the json flag is specified, print the state as json and exit.
|
||||
if json {
|
||||
println!("{}", serde_json::to_string(&state).unwrap());
|
||||
return;
|
||||
}
|
||||
|
||||
// Print the current daemon state
|
||||
// Print the current daemon state.
|
||||
if state.running {
|
||||
println!("{}", style("Daemon status: running").with(Color::Green));
|
||||
} else {
|
||||
|
@ -54,21 +54,21 @@ pub fn print_state(state: State, cli_command: &SubCommand) {
|
|||
}
|
||||
|
||||
// Check whether there are any delayed tasks.
|
||||
// In case there are, we need to add another column to the table
|
||||
// In case there are, we need to add another column to the table.
|
||||
let has_delayed_tasks = state
|
||||
.tasks
|
||||
.iter()
|
||||
.any(|(_id, task)| task.enqueue_at.is_some());
|
||||
|
||||
// Check whether there are any tasks with dependencies.
|
||||
// In case there are, we need to add another column to the table
|
||||
// In case there are, we need to add another column to the table.
|
||||
let has_dependencies = state
|
||||
.tasks
|
||||
.iter()
|
||||
.any(|(_id, task)| !task.dependencies.is_empty());
|
||||
|
||||
// Check whether there are any tasks with dependencies.
|
||||
// In case there are, we need to add another column to the table
|
||||
// Check whether there are any tasks with a custom group.
|
||||
// In case there are, we need to add another column to the table.
|
||||
let has_group = state.tasks.iter().any(|(_id, task)| task.group.is_some());
|
||||
|
||||
// Create table header row
|
||||
|
@ -90,19 +90,19 @@ pub fn print_state(state: State, cli_command: &SubCommand) {
|
|||
Cell::new("End"),
|
||||
]);
|
||||
|
||||
// Initialize comfy table
|
||||
// Initialize comfy table.
|
||||
let mut table = Table::new();
|
||||
table
|
||||
.set_content_arrangement(ContentArrangement::Dynamic)
|
||||
.load_preset(UTF8_HORIZONTAL_BORDERS_ONLY)
|
||||
.set_header(headers);
|
||||
|
||||
// Add rows one by one
|
||||
// Add rows one by one.
|
||||
for (id, task) in state.tasks {
|
||||
let mut row = Row::new();
|
||||
row.add_cell(Cell::new(&id.to_string()));
|
||||
|
||||
// Determine the human readable task status representation and the respective color
|
||||
// Determine the human readable task status representation and the respective color.
|
||||
let status_string = task.status.to_string();
|
||||
let (status_text, color) = match task.status {
|
||||
TaskStatus::Running => (status_string, Color::Green),
|
||||
|
@ -144,7 +144,7 @@ pub fn print_state(state: State, cli_command: &SubCommand) {
|
|||
}
|
||||
}
|
||||
|
||||
// Match the color of the exit code
|
||||
// Match the color of the exit code.
|
||||
// If the exit_code is none, it has been killed by the task handler.
|
||||
let exit_code_cell = match task.result {
|
||||
Some(TaskResult::Success) => Cell::new("0").fg(Color::Green),
|
||||
|
@ -153,11 +153,11 @@ pub fn print_state(state: State, cli_command: &SubCommand) {
|
|||
};
|
||||
row.add_cell(exit_code_cell);
|
||||
|
||||
// Add command and path
|
||||
// Add command and path.
|
||||
row.add_cell(Cell::new(&task.command));
|
||||
row.add_cell(Cell::new(&task.path));
|
||||
|
||||
// Add start time, if already set
|
||||
// Add start time, if already set.
|
||||
if let Some(start) = task.start {
|
||||
let formatted = start.format("%H:%M").to_string();
|
||||
row.add_cell(Cell::new(&formatted));
|
||||
|
@ -165,7 +165,7 @@ pub fn print_state(state: State, cli_command: &SubCommand) {
|
|||
row.add_cell(Cell::new(""));
|
||||
}
|
||||
|
||||
// Add finish time, if already set
|
||||
// Add finish time, if already set.
|
||||
if let Some(end) = task.end {
|
||||
let formatted = end.format("%H:%M").to_string();
|
||||
row.add_cell(Cell::new(&formatted));
|
||||
|
@ -176,7 +176,7 @@ pub fn print_state(state: State, cli_command: &SubCommand) {
|
|||
table.add_row(row);
|
||||
}
|
||||
|
||||
// Print the table
|
||||
// Print the table.
|
||||
println!("{}", table);
|
||||
}
|
||||
|
||||
|
@ -214,7 +214,7 @@ pub fn print_logs(
|
|||
while let Some((_, mut task_log)) = task_iter.next() {
|
||||
print_log(&mut task_log, settings);
|
||||
|
||||
// Add a newline if there is another task that's going to be printed
|
||||
// Add a newline if there is another task that's going to be printed.
|
||||
if let Some((_, task_log)) = task_iter.peek() {
|
||||
if task_log.task.status == TaskStatus::Done {
|
||||
println!();
|
||||
|
@ -226,12 +226,12 @@ pub fn print_logs(
|
|||
/// Print the log of a single task.
|
||||
pub fn print_log(task_log: &mut TaskLogMessage, settings: &Settings) {
|
||||
let task = &task_log.task;
|
||||
// We only show logs of finished tasks
|
||||
// We only show logs of finished tasks.
|
||||
if task.status != TaskStatus::Done {
|
||||
return;
|
||||
}
|
||||
|
||||
// Print task id and exit code
|
||||
// Print task id and exit code.
|
||||
let task_text = style(format!("Task {} ", task.id)).attribute(Attribute::Bold);
|
||||
let exit_status = match &task.result {
|
||||
Some(TaskResult::Success) => style(format!("with exit code 0")).with(Color::Green),
|
||||
|
@ -249,7 +249,7 @@ pub fn print_log(task_log: &mut TaskLogMessage, settings: &Settings) {
|
|||
};
|
||||
print!("{} {}", task_text, exit_status);
|
||||
|
||||
// Print command and path
|
||||
// Print command and path.
|
||||
println!("Command: {}", task.command);
|
||||
println!("Path: {}", task.path);
|
||||
|
||||
|
@ -281,7 +281,7 @@ pub fn print_local_log_output(task_id: usize, settings: &Settings) {
|
|||
}
|
||||
};
|
||||
// Stdout handler to directly write log file output to io::stdout
|
||||
// without having to load anything into memory
|
||||
// without having to load anything into memory.
|
||||
let mut stdout = io::stdout();
|
||||
|
||||
if let Ok(metadata) = stdout_log.metadata() {
|
||||
|
@ -315,9 +315,9 @@ pub fn print_local_log_output(task_id: usize, settings: &Settings) {
|
|||
}
|
||||
}
|
||||
|
||||
/// Prints log output received from the daemon
|
||||
/// Prints log output received from the daemon.
|
||||
/// We can safely call .unwrap() on stdout and stderr in here, since this
|
||||
/// branch is always called after ensuring that both are `Some`
|
||||
/// branch is always called after ensuring that both are `Some`.
|
||||
pub fn print_task_output_from_daemon(task_log: &TaskLogMessage) {
|
||||
if !task_log.stdout.as_ref().unwrap().is_empty() {
|
||||
if let Err(err) = print_remote_task_output(&task_log, true) {
|
||||
|
|
|
@ -11,17 +11,13 @@ pub struct Opt {
|
|||
#[structopt(short, long, parse(from_occurrences))]
|
||||
pub verbose: u8,
|
||||
|
||||
/// If this flag is set, the daemon will start and fork itself
|
||||
/// into the background. Closing the terminal won't kill the
|
||||
/// daemon any longer. This should be avoided and rather be
|
||||
/// properly done using a service manager.
|
||||
/// If this flag is set, the daemon will start and fork itself into the background.
|
||||
/// Closing the terminal won't kill the daemon any longer.
|
||||
/// This should be avoided and rather be properly done using a service manager.
|
||||
#[structopt(short, long)]
|
||||
pub daemonize: bool,
|
||||
|
||||
// /// The ip the daemon listens on. Overwrites the address in the config file
|
||||
// #[structopt(short, long)]
|
||||
// pub address: Option<String>,
|
||||
/// The port the daemon listens on. Overwrites the port in the config file
|
||||
/// The port the daemon listens on. Overwrites the port in the config file.
|
||||
#[structopt(short, long)]
|
||||
pub port: Option<String>,
|
||||
}
|
||||
|
|
|
@ -36,7 +36,8 @@ pub fn handle_message(message: Message, sender: &Sender<Message>, state: &Shared
|
|||
}
|
||||
|
||||
/// Queues a new task to the state.
|
||||
/// If the start_immediately flag is set, send a StartMessage to the task handler
|
||||
/// If the start_immediately flag is set, send a StartMessage to the task handler.
|
||||
/// Invoked when calling `pueue add`.
|
||||
fn add_task(message: AddMessage, sender: &Sender<Message>, state: &SharedState) -> Message {
|
||||
let starting_status = if message.stashed || message.enqueue_at.is_some() {
|
||||
TaskStatus::Stashed
|
||||
|
@ -59,7 +60,7 @@ fn add_task(message: AddMessage, sender: &Sender<Message>, state: &SharedState)
|
|||
));
|
||||
}
|
||||
|
||||
// Create a new task and add it to the state
|
||||
// Create a new task and add it to the state.
|
||||
let task = Task::new(
|
||||
message.command,
|
||||
message.path,
|
||||
|
@ -69,7 +70,7 @@ fn add_task(message: AddMessage, sender: &Sender<Message>, state: &SharedState)
|
|||
message.dependencies,
|
||||
);
|
||||
|
||||
// Create a new group in case the user used a unknown group
|
||||
// Create a new group in case the user used a unknown group.
|
||||
if let Some(group) = &task.group {
|
||||
if let None = state.groups.get(group) {
|
||||
return create_failure_message(format!(
|
||||
|
@ -87,7 +88,7 @@ fn add_task(message: AddMessage, sender: &Sender<Message>, state: &SharedState)
|
|||
.send(Message::Start(vec![task_id]))
|
||||
.expect(SENDER_ERR);
|
||||
}
|
||||
// Create the customized response for the client
|
||||
// Create the customized response for the client.
|
||||
let message = if let Some(enqueue_at) = message.enqueue_at {
|
||||
format!(
|
||||
"New task added (id {}). It will be enqueued at {}",
|
||||
|
@ -102,7 +103,9 @@ fn add_task(message: AddMessage, sender: &Sender<Message>, state: &SharedState)
|
|||
create_success_message(message)
|
||||
}
|
||||
|
||||
/// Remove tasks from the queue
|
||||
/// Remove tasks from the queue.
|
||||
/// We have to ensure that those tasks aren't running!
|
||||
/// Invoked when calling `pueue remove`.
|
||||
fn remove(task_ids: Vec<usize>, state: &SharedState) -> Message {
|
||||
let mut state = state.lock().unwrap();
|
||||
let statuses = vec![TaskStatus::Running, TaskStatus::Paused];
|
||||
|
@ -118,7 +121,9 @@ fn remove(task_ids: Vec<usize>, state: &SharedState) -> Message {
|
|||
create_success_message(response)
|
||||
}
|
||||
|
||||
/// Switch the position of two tasks in the upcoming queue
|
||||
/// Switch the position of two tasks in the upcoming queue.
|
||||
/// We have to ensure that those tasks are either `Queued` or `Stashed`
|
||||
/// Invoked when calling `pueue switch`.
|
||||
fn switch(message: SwitchMessage, state: &SharedState) -> Message {
|
||||
let task_ids = vec![message.task_id_1, message.task_id_2];
|
||||
let statuses = vec![TaskStatus::Queued, TaskStatus::Stashed];
|
||||
|
@ -145,7 +150,8 @@ fn switch(message: SwitchMessage, state: &SharedState) -> Message {
|
|||
}
|
||||
|
||||
/// Stash specific queued tasks.
|
||||
/// They won't be executed until enqueued again or explicitely started
|
||||
/// They won't be executed until they're enqueued or explicitely started.
|
||||
/// Invoked when calling `pueue stash`.
|
||||
fn stash(task_ids: Vec<usize>, state: &SharedState) -> Message {
|
||||
let (matching, mismatching) = {
|
||||
let mut state = state.lock().unwrap();
|
||||
|
@ -165,7 +171,7 @@ fn stash(task_ids: Vec<usize>, state: &SharedState) -> Message {
|
|||
}
|
||||
|
||||
/// Enqueue specific stashed tasks.
|
||||
/// They will be normally handled afterwards.
|
||||
/// Invoked when calling `pueue enqueue`.
|
||||
fn enqueue(message: EnqueueMessage, state: &SharedState) -> Message {
|
||||
let (matching, mismatching) = {
|
||||
let mut state = state.lock().unwrap();
|
||||
|
@ -195,7 +201,8 @@ fn enqueue(message: EnqueueMessage, state: &SharedState) -> Message {
|
|||
return create_success_message(response);
|
||||
}
|
||||
|
||||
/// Forward the start message to the task handler and respond to the client
|
||||
/// Forward the start message to the task handler, which then starts the process(es).
|
||||
/// Invoked when calling `pueue start`.
|
||||
fn start(task_ids: Vec<usize>, sender: &Sender<Message>, state: &SharedState) -> Message {
|
||||
sender
|
||||
.send(Message::Start(task_ids.clone()))
|
||||
|
@ -213,8 +220,9 @@ fn start(task_ids: Vec<usize>, sender: &Sender<Message>, state: &SharedState) ->
|
|||
return create_success_message("Daemon and all tasks are being resumed.");
|
||||
}
|
||||
|
||||
/// Create and enqueue tasks from already finished tasks
|
||||
/// Create and enqueue tasks from already finished tasks.
|
||||
/// The user can specify to immediately start the newly created tasks.
|
||||
/// Invoked when calling `pueue restart`.
|
||||
fn restart(message: RestartMessage, sender: &Sender<Message>, state: &SharedState) -> Message {
|
||||
let new_status = if message.stashed {
|
||||
TaskStatus::Stashed
|
||||
|
@ -236,8 +244,8 @@ fn restart(message: RestartMessage, sender: &Sender<Message>, state: &SharedStat
|
|||
new_ids.push(state.add_task(new_task));
|
||||
}
|
||||
|
||||
// Already create the response string in here. Otherwise we would
|
||||
// need to get matching/mismatching out of this scope
|
||||
// Already create the response string in here.
|
||||
// Otherwise we would need to get matching/mismatching out of this scope.
|
||||
response = compile_task_response("Restarted tasks", matching, mismatching);
|
||||
|
||||
new_ids
|
||||
|
@ -252,7 +260,8 @@ fn restart(message: RestartMessage, sender: &Sender<Message>, state: &SharedStat
|
|||
return create_success_message(response);
|
||||
}
|
||||
|
||||
/// Forward the pause message to the task handler and respond to the client
|
||||
/// Forward the pause message to the task handler, which then pauses the process(es).
|
||||
/// Invoked when calling `pueue pause`.
|
||||
fn pause(message: PauseMessage, sender: &Sender<Message>, state: &SharedState) -> Message {
|
||||
sender
|
||||
.send(Message::Pause(message.clone()))
|
||||
|
@ -270,7 +279,8 @@ fn pause(message: PauseMessage, sender: &Sender<Message>, state: &SharedState) -
|
|||
return create_success_message("Daemon and all tasks are being paused.");
|
||||
}
|
||||
|
||||
/// Forward the kill message to the task handler and respond to the client
|
||||
/// Forward the kill message to the task handler, which then kills the process.
|
||||
/// Invoked when calling `pueue kill`.
|
||||
fn kill(message: KillMessage, sender: &Sender<Message>, state: &SharedState) -> Message {
|
||||
sender
|
||||
.send(Message::Kill(message.clone()))
|
||||
|
@ -289,11 +299,11 @@ fn kill(message: KillMessage, sender: &Sender<Message>, state: &SharedState) ->
|
|||
return create_success_message("All tasks are being killed.");
|
||||
}
|
||||
|
||||
// Send some user defined input to a process
|
||||
// The message will be forwarded to the task handler.
|
||||
// In here we only do some error handling.
|
||||
/// The message will be forwarded to the task handler, which then sends the user input to the process.
|
||||
/// In here we only do some error handling.
|
||||
/// Invoked when calling `pueue send`.
|
||||
fn send(message: SendMessage, sender: &Sender<Message>, state: &SharedState) -> Message {
|
||||
// Check whether the task exists and is running, abort if that's not the case
|
||||
// Check whether the task exists and is running. Abort if that's not the case.
|
||||
{
|
||||
let state = state.lock().unwrap();
|
||||
match state.tasks.get(&message.task_id) {
|
||||
|
@ -306,16 +316,17 @@ fn send(message: SendMessage, sender: &Sender<Message>, state: &SharedState) ->
|
|||
}
|
||||
}
|
||||
|
||||
// Check whether the task exists and is running, abort if that's not the case
|
||||
// Check whether the task exists and is running, abort if that's not the case.
|
||||
sender.send(Message::Send(message)).expect(SENDER_ERR);
|
||||
|
||||
return create_success_message("Message is being send to the process.");
|
||||
}
|
||||
|
||||
// If a user wants to edit a message, we need to send him the current command
|
||||
// and lock the task to prevent execution, before the user has finished editing the command
|
||||
/// If a user wants to edit a message, we need to send him the current command.
|
||||
/// Lock the task to prevent execution, before the user has finished editing the command.
|
||||
/// Invoked when calling `pueue edit`.
|
||||
fn edit_request(task_id: usize, state: &SharedState) -> Message {
|
||||
// Check whether the task exists and is queued/stashed, abort if that's not the case
|
||||
// Check whether the task exists and is queued/stashed. Abort if that's not the case.
|
||||
let mut state = state.lock().unwrap();
|
||||
match state.tasks.get_mut(&task_id) {
|
||||
Some(task) => {
|
||||
|
@ -336,9 +347,10 @@ fn edit_request(task_id: usize, state: &SharedState) -> Message {
|
|||
}
|
||||
}
|
||||
|
||||
// Handle the actual updated command
|
||||
/// Now we actually update the message with the updated command from the client.
|
||||
/// Invoked after closing the editor on `pueue edit`.
|
||||
fn edit(message: EditMessage, state: &SharedState) -> Message {
|
||||
// Check whether the task exists and is queued/stashed, abort if that's not the case
|
||||
// Check whether the task exists and is locked. Abort if that's not the case
|
||||
let mut state = state.lock().unwrap();
|
||||
match state.tasks.get_mut(&message.task_id) {
|
||||
Some(task) => {
|
||||
|
@ -362,7 +374,8 @@ fn edit(message: EditMessage, state: &SharedState) -> Message {
|
|||
}
|
||||
}
|
||||
|
||||
/// Remove all failed or done tasks from the state
|
||||
/// Remove all failed or done tasks from the state.
|
||||
/// Invoked when calling `pueue clean`.
|
||||
fn clean(state: &SharedState) -> Message {
|
||||
let mut state = state.lock().unwrap();
|
||||
state.backup();
|
||||
|
@ -378,23 +391,24 @@ fn clean(state: &SharedState) -> Message {
|
|||
return create_success_message("All finished tasks have been removed");
|
||||
}
|
||||
|
||||
/// Forward the reset request to the task handler
|
||||
/// The handler then kills all children and clears the task queue
|
||||
/// Forward the reset request to the task handler.
|
||||
/// The handler then kills all children and clears the task queue.
|
||||
/// Invoked when calling `pueue reset`.
|
||||
fn reset(sender: &Sender<Message>, state: &SharedState) -> Message {
|
||||
sender.send(Message::Reset).expect(SENDER_ERR);
|
||||
clean(state);
|
||||
return create_success_message("Everything is being reset right now.");
|
||||
}
|
||||
|
||||
/// Return the current state
|
||||
/// Invoked when calling `pueue status`
|
||||
/// Return the current state.
|
||||
/// Invoked when calling `pueue status`.
|
||||
fn get_status(state: &SharedState) -> Message {
|
||||
let state = state.lock().unwrap().clone();
|
||||
Message::StatusResponse(state)
|
||||
}
|
||||
|
||||
/// Return the current state and the stdou/stderr of all tasks to the client
|
||||
/// Invoked when calling `pueue log`
|
||||
/// Return the current state and the stdou/stderr of all tasks to the client.
|
||||
/// Invoked when calling `pueue log`.
|
||||
fn get_log(message: LogRequestMessage, state: &SharedState) -> Message {
|
||||
let state = state.lock().unwrap().clone();
|
||||
// Return all logs, if no specific task id is specified
|
||||
|
@ -445,19 +459,29 @@ fn get_log(message: LogRequestMessage, state: &SharedState) -> Message {
|
|||
fn set_parallel_tasks(message: ParallelMessage, state: &SharedState) -> Message {
|
||||
let mut state = state.lock().unwrap();
|
||||
|
||||
// Set the global default if no group is specified
|
||||
// Set the default parallel tasks if no group is specified.
|
||||
if let None = message.group {
|
||||
state.settings.daemon.default_parallel_tasks = message.parallel_tasks;
|
||||
return create_success_message("Parallel tasks setting adjusted");
|
||||
}
|
||||
|
||||
// We can safely unwrap, since we handled the None case above
|
||||
// We can safely unwrap, since we handled the `None` case above.
|
||||
let group = &message.group.unwrap();
|
||||
// Check if the given group exists.
|
||||
if !state.groups.contains_key(group) {
|
||||
return create_failure_message(format!("Unknown group. Use one of these: {:?}", state.groups.keys()));
|
||||
return create_failure_message(format!(
|
||||
"Unknown group. Use one of these: {:?}",
|
||||
state.groups.keys()
|
||||
));
|
||||
}
|
||||
|
||||
state.settings.daemon.groups.insert(group.into(), message.parallel_tasks);
|
||||
return create_success_message(format!("Parallel tasks setting for group {} adjusted", group));
|
||||
state
|
||||
.settings
|
||||
.daemon
|
||||
.groups
|
||||
.insert(group.into(), message.parallel_tasks);
|
||||
return create_success_message(format!(
|
||||
"Parallel tasks setting for group {} adjusted",
|
||||
group
|
||||
));
|
||||
}
|
||||
|
|
|
@ -32,14 +32,14 @@ async fn main() -> Result<()> {
|
|||
Ok(()) => {}
|
||||
};
|
||||
|
||||
// Parse commandline options
|
||||
// Parse commandline options.
|
||||
let opt = Opt::from_args();
|
||||
|
||||
if opt.daemonize {
|
||||
fork_daemon(&opt)?;
|
||||
}
|
||||
|
||||
// Set the verbosity level for the client app
|
||||
// Set the verbosity level for the client app.
|
||||
if opt.verbose >= 3 {
|
||||
SimpleLogger::init(LevelFilter::Debug, Config::default())?;
|
||||
} else if opt.verbose == 2 {
|
||||
|
@ -67,7 +67,7 @@ async fn main() -> Result<()> {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
/// Initialize all directories needed for normal operation
|
||||
/// Initialize all directories needed for normal operation.
|
||||
fn init_directories(path: &String) {
|
||||
let pueue_dir = Path::new(path);
|
||||
if !pueue_dir.exists() {
|
||||
|
@ -99,8 +99,8 @@ fn init_directories(path: &String) {
|
|||
}
|
||||
}
|
||||
|
||||
/// This is a simple and cheap custom fork method
|
||||
/// Simply spawn a new child with identical arguments and exit right away
|
||||
/// This is a simple and cheap custom fork method.
|
||||
/// Simply spawn a new child with identical arguments and exit right away.
|
||||
fn fork_daemon(opt: &Opt) -> Result<()> {
|
||||
let mut arguments = Vec::<String>::new();
|
||||
|
||||
|
|
|
@ -7,7 +7,7 @@ pub fn task_response_helper(
|
|||
statuses: Vec<TaskStatus>,
|
||||
state: &SharedState,
|
||||
) -> String {
|
||||
// Get all matching/mismatching task_ids for all given ids and statuses
|
||||
// Get all matching/mismatching task_ids for all given ids and statuses.
|
||||
let (matching, mismatching) = {
|
||||
let mut state = state.lock().unwrap();
|
||||
state.tasks_in_statuses(statuses, Some(task_ids))
|
||||
|
@ -28,7 +28,7 @@ pub fn compile_task_response(
|
|||
let mismatching: Vec<String> = mismatching.iter().map(|id| id.to_string()).collect();
|
||||
let matching_string = matching.join(", ");
|
||||
|
||||
// We don't have any mismatching ids, return the simple message
|
||||
// We don't have any mismatching ids, return the simple message.
|
||||
if mismatching.is_empty() {
|
||||
return format!("{}: {}", message, matching_string);
|
||||
}
|
||||
|
@ -36,12 +36,12 @@ pub fn compile_task_response(
|
|||
let mismatched_message = "The command couldn't be executed for these tasks";
|
||||
let mismatching_string = mismatching.join(", ");
|
||||
|
||||
// All given ids are invalid
|
||||
// All given ids are invalid.
|
||||
if matching.is_empty() {
|
||||
return format!("{}: {}", mismatched_message, mismatching_string);
|
||||
}
|
||||
|
||||
// Some ids were valid, some were invalid
|
||||
// Some ids were valid, some were invalid.
|
||||
format!(
|
||||
"{}: {}\n{}: {}",
|
||||
message, matching_string, mismatched_message, mismatching_string
|
||||
|
|
|
@ -11,17 +11,16 @@ use ::pueue::message::*;
|
|||
use ::pueue::protocol::*;
|
||||
use ::pueue::state::SharedState;
|
||||
|
||||
/// Poll the unix listener and accept new incoming connections
|
||||
/// Create a new future to handle the message and spawn it
|
||||
/// Poll the unix listener and accept new incoming connections.
|
||||
/// Create a new future to handle the message and spawn it.
|
||||
pub async fn accept_incoming(sender: Sender<Message>, state: SharedState, opt: Opt) -> Result<()> {
|
||||
// Commandline argument overwrites the configuration files values for port
|
||||
// Commandline argument overwrites the configuration files values for port.
|
||||
let port = if let Some(port) = opt.port.clone() {
|
||||
port
|
||||
} else {
|
||||
let state = state.lock().unwrap();
|
||||
state.settings.daemon.port.clone()
|
||||
};
|
||||
// let address = format!("{}:{}", address, port);
|
||||
let address = format!("127.0.0.1:{}", port);
|
||||
let listener = TcpListener::bind(address).await?;
|
||||
|
||||
|
@ -47,7 +46,7 @@ async fn handle_incoming(
|
|||
// Receive the secret once and check, whether the client is allowed to connect
|
||||
let payload_bytes = receive_bytes(&mut socket).await?;
|
||||
|
||||
// Didn't receive any bytes. The client disconnected
|
||||
// Didn't receive any bytes. The client disconnected.
|
||||
if payload_bytes.len() == 0 {
|
||||
info!("Client went away");
|
||||
return Ok(());
|
||||
|
@ -55,7 +54,7 @@ async fn handle_incoming(
|
|||
|
||||
let secret = String::from_utf8(payload_bytes)?;
|
||||
|
||||
// Return immediately, if we got a wrong secret from the client
|
||||
// Return immediately, if we got a wrong secret from the client.
|
||||
{
|
||||
let state = state.lock().unwrap();
|
||||
if secret != state.settings.daemon.secret {
|
||||
|
@ -77,20 +76,20 @@ async fn handle_incoming(
|
|||
info!("Received instruction: {:?}", message);
|
||||
|
||||
let response = if let Message::StreamRequest(message) = message {
|
||||
// The client requested the output of a task
|
||||
// Since we allow streaming, this needs to be handled seperately
|
||||
// The client requested the output of a task.
|
||||
// Since we allow streaming, this needs to be handled seperately.
|
||||
handle_show(&pueue_directory, &mut socket, message).await?
|
||||
} else if let Message::DaemonShutdown = message {
|
||||
// Simply shut down the daemon right after sending a success response
|
||||
// Simply shut down the daemon right after sending a success response.
|
||||
let response = create_success_message("Daemon is shutting down");
|
||||
send_message(response, &mut socket).await?;
|
||||
std::process::exit(0);
|
||||
} else {
|
||||
// Process a normal message
|
||||
// Process a normal message.
|
||||
handle_message(message, &sender, &state)
|
||||
};
|
||||
|
||||
// Respond to the client
|
||||
// Respond to the client.
|
||||
send_message(response, &mut socket).await?;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -10,14 +10,14 @@ use ::pueue::log::*;
|
|||
use ::pueue::message::*;
|
||||
use ::pueue::protocol::send_message;
|
||||
|
||||
/// Handle the continuous stream of a message
|
||||
/// Handle the continuous stream of a message.
|
||||
pub async fn handle_show(
|
||||
pueue_directory: &String,
|
||||
socket: &mut TcpStream,
|
||||
message: StreamRequestMessage,
|
||||
) -> Result<Message> {
|
||||
if message.follow || message.err {
|
||||
// The client requested streaming of stdout
|
||||
// The client requested streaming of stdout.
|
||||
let mut handle: File;
|
||||
match get_log_file_handles(message.task_id, pueue_directory) {
|
||||
Err(_) => {
|
||||
|
@ -34,20 +34,20 @@ pub async fn handle_show(
|
|||
}
|
||||
}
|
||||
|
||||
// Get the stdout/stderr path
|
||||
// Get the stdout/stderr path.
|
||||
// We need to check continuously, whether the file still exists,
|
||||
// since the file can go away (e.g. due to finishing a task)
|
||||
// since the file can go away (e.g. due to finishing a task).
|
||||
let (out_path, err_path) = get_log_paths(message.task_id, pueue_directory);
|
||||
let handle_path = if message.err { err_path } else { out_path };
|
||||
|
||||
loop {
|
||||
// Check whether the file still exists. Exit if it doesn't
|
||||
// Check whether the file still exists. Exit if it doesn't.
|
||||
if !handle_path.exists() {
|
||||
return Ok(create_success_message(
|
||||
"File has gone away. The task probably just finished",
|
||||
));
|
||||
}
|
||||
// Read the next chunk of text from the last position
|
||||
// Read the next chunk of text from the last position.
|
||||
let mut buffer = Vec::new();
|
||||
|
||||
if let Err(err) = handle.read_to_end(&mut buffer) {
|
||||
|
@ -55,15 +55,15 @@ pub async fn handle_show(
|
|||
};
|
||||
let text = String::from_utf8_lossy(&buffer).to_string();
|
||||
|
||||
// Send the new chunk and wait for 1 second
|
||||
// Send the new chunk and wait for 1 second.
|
||||
let response = Message::Stream(text);
|
||||
send_message(response, socket).await?;
|
||||
let wait = future::ready(1).delay(Duration::from_millis(1000));
|
||||
wait.await;
|
||||
}
|
||||
} else {
|
||||
// The client requested a one-shot execution
|
||||
// Simply read the file and send the current stdout/stderr once
|
||||
// The client requested a one-shot execution.
|
||||
// Simply read the file and send the current stdout/stderr once.
|
||||
let (stdout, stderr) = match read_log_files(message.task_id, pueue_directory) {
|
||||
Err(_) => {
|
||||
return Ok(create_failure_message(
|
||||
|
|
|
@ -27,8 +27,7 @@ pub struct TaskHandler {
|
|||
callbacks: Vec<Child>,
|
||||
running: bool,
|
||||
reset: bool,
|
||||
// Some static settings that are extracted from `state.settings`
|
||||
// for convenience purposes.
|
||||
// Some static settings that are extracted from `state.settings` for convenience purposes.
|
||||
pueue_directory: String,
|
||||
callback: Option<String>,
|
||||
pause_on_failure: bool,
|
||||
|
@ -60,8 +59,8 @@ impl TaskHandler {
|
|||
}
|
||||
}
|
||||
|
||||
/// The task handler needs to kill all child processes as soon, as the program exits
|
||||
/// This is needed to prevent detached processes
|
||||
/// The task handler needs to kill all child processes as soon, as the program exits.
|
||||
/// This is needed to prevent detached processes.
|
||||
impl Drop for TaskHandler {
|
||||
fn drop(&mut self) {
|
||||
let ids: Vec<usize> = self.children.keys().cloned().collect();
|
||||
|
@ -74,21 +73,14 @@ impl Drop for TaskHandler {
|
|||
}
|
||||
|
||||
impl TaskHandler {
|
||||
/// Main loop of the task handler
|
||||
/// Main loop of the task handler.
|
||||
/// In here a few things happen:
|
||||
/// 1. Propagated commands from socket communication is received and handled
|
||||
/// 2. Check whether any tasks just finished
|
||||
/// 3. Check if there are any stashed processes ready for being enqueued
|
||||
/// 4. Check whether we can spawn new tasks
|
||||
/// 1. Propagated commands from socket communication is received and handled.
|
||||
/// 2. Check whether any tasks just finished.
|
||||
/// 3. Check if there are any stashed processes ready for being enqueued.
|
||||
/// 4. Check whether we can spawn new tasks.
|
||||
pub fn run(&mut self) {
|
||||
loop {
|
||||
// Sleep for a few milliseconds. We don't want to hurt the CPU
|
||||
let timeout = Duration::from_millis(100);
|
||||
// Don't use recv_timeout for now, until this bug get's fixed
|
||||
// https://github.com/rust-lang/rust/issues/39364
|
||||
//match self.receiver.recv_timeout(timeout) {
|
||||
std::thread::sleep(timeout);
|
||||
|
||||
self.receive_commands();
|
||||
self.handle_finished_tasks();
|
||||
self.check_callbacks();
|
||||
|
@ -424,8 +416,14 @@ impl TaskHandler {
|
|||
}
|
||||
|
||||
/// Some client instructions require immediate action by the task handler
|
||||
/// These commands are
|
||||
fn receive_commands(&mut self) {
|
||||
// Sleep for a few milliseconds. We don't want to hurt the CPU.
|
||||
let timeout = Duration::from_millis(100);
|
||||
// Don't use recv_timeout for now, until this bug get's fixed.
|
||||
// https://github.com/rust-lang/rust/issues/39364
|
||||
//match self.receiver.recv_timeout(timeout) {
|
||||
std::thread::sleep(timeout);
|
||||
|
||||
match self.receiver.try_recv() {
|
||||
Ok(message) => self.handle_message(message),
|
||||
Err(_) => {}
|
||||
|
@ -443,7 +441,7 @@ impl TaskHandler {
|
|||
}
|
||||
}
|
||||
|
||||
/// Send a signal to a unix process
|
||||
/// Send a signal to a unix process.
|
||||
#[cfg(not(windows))]
|
||||
fn send_signal(&mut self, id: usize, signal: Signal) -> Result<bool, nix::Error> {
|
||||
if let Some(child) = self.children.get(&id) {
|
||||
|
@ -487,7 +485,7 @@ impl TaskHandler {
|
|||
self.change_running(true);
|
||||
}
|
||||
|
||||
/// Send a start signal to a paused task to continue execution
|
||||
/// Send a start signal to a paused task to continue execution.
|
||||
fn continue_task(&mut self, id: usize) {
|
||||
if !self.children.contains_key(&id) {
|
||||
return;
|
||||
|
@ -541,7 +539,7 @@ impl TaskHandler {
|
|||
}
|
||||
|
||||
/// Pause a specific task.
|
||||
/// Send a signal to the process to actually pause the OS process
|
||||
/// Send a signal to the process to actually pause the OS process.
|
||||
fn pause_task(&mut self, id: usize) {
|
||||
if !self.children.contains_key(&id) {
|
||||
return;
|
||||
|
@ -576,7 +574,7 @@ impl TaskHandler {
|
|||
return;
|
||||
}
|
||||
|
||||
// Pause the daemon and kill all tasks
|
||||
// Pause the daemon and kill all tasks.
|
||||
if message.all {
|
||||
info!("Killing all spawned children");
|
||||
self.change_running(false);
|
||||
|
@ -587,7 +585,7 @@ impl TaskHandler {
|
|||
}
|
||||
}
|
||||
|
||||
/// Kill a specific task and handle it accordingly
|
||||
/// Kill a specific task and handle it accordingly.
|
||||
/// Triggered on `reset` and `kill`.
|
||||
fn kill_task(&mut self, task_id: usize) {
|
||||
if let Some(child) = self.children.get_mut(&task_id) {
|
||||
|
@ -607,7 +605,7 @@ impl TaskHandler {
|
|||
}
|
||||
}
|
||||
|
||||
/// Send some input to a child process
|
||||
/// Send some input to a child process.
|
||||
fn send(&mut self, message: SendMessage) {
|
||||
let task_id = message.task_id;
|
||||
let input = message.input;
|
||||
|
@ -632,7 +630,7 @@ impl TaskHandler {
|
|||
}
|
||||
}
|
||||
|
||||
/// Kill all children by reusing the `kill` function
|
||||
/// Kill all children by reusing the `kill` function.
|
||||
/// Set the `reset` flag, which will prevent new tasks from being spawned.
|
||||
/// If all children finished, the state will be completely reset.
|
||||
fn reset(&mut self) {
|
||||
|
@ -645,7 +643,7 @@ impl TaskHandler {
|
|||
self.reset = true;
|
||||
}
|
||||
|
||||
/// Change the running state consistently
|
||||
/// Change the running state consistently.
|
||||
fn change_running(&mut self, running: bool) {
|
||||
let mut state = self.state.lock().unwrap();
|
||||
state.running = running;
|
||||
|
@ -653,7 +651,7 @@ impl TaskHandler {
|
|||
state.save();
|
||||
}
|
||||
|
||||
/// Users can specify a callback that's fired whenever a task finishes
|
||||
/// Users can specify a callback that's fired whenever a task finishes.
|
||||
/// Execute the callback by spawning a new subprocess.
|
||||
fn spawn_callback(&mut self, task: &Task) {
|
||||
// Return early, if there's no callback specified
|
||||
|
@ -663,10 +661,10 @@ impl TaskHandler {
|
|||
return;
|
||||
};
|
||||
|
||||
// Build the callback command from the given template
|
||||
// Build the callback command from the given template.
|
||||
let mut handlebars = Handlebars::new();
|
||||
handlebars.set_strict_mode(true);
|
||||
// Build templating variables
|
||||
// Build templating variables.
|
||||
let mut parameters = HashMap::new();
|
||||
parameters.insert("id", task.id.to_string());
|
||||
parameters.insert("command", task.command.clone());
|
||||
|
@ -708,8 +706,8 @@ impl TaskHandler {
|
|||
self.callbacks.push(child);
|
||||
}
|
||||
|
||||
/// Look at all running callbacks and log any errors
|
||||
/// If everything went smoothly, simply remove them from the list
|
||||
/// Look at all running callbacks and log any errors.
|
||||
/// If everything went smoothly, simply remove them from the list.
|
||||
fn check_callbacks(&mut self) {
|
||||
let mut finished = Vec::new();
|
||||
for (id, child) in self.callbacks.iter_mut().enumerate() {
|
||||
|
@ -719,7 +717,7 @@ impl TaskHandler {
|
|||
error!("Callback failed with error {:?}", error);
|
||||
finished.push(id);
|
||||
}
|
||||
// Child process did not exit yet
|
||||
// Child process did not exit yet.
|
||||
Ok(None) => continue,
|
||||
Ok(exit_status) => {
|
||||
info!("Callback finished with exit code {:?}", exit_status);
|
||||
|
|
|
@ -6,7 +6,7 @@ use ::std::io;
|
|||
use ::std::io::prelude::*;
|
||||
use ::std::path::{Path, PathBuf};
|
||||
|
||||
/// Return the paths to temporary stdout and stderr files for a task
|
||||
/// Return the paths to temporary stdout and stderr files for a task.
|
||||
pub fn get_log_paths(task_id: usize, path: &String) -> (PathBuf, PathBuf) {
|
||||
let pueue_dir = Path::new(path).join("task_logs");
|
||||
let out_path = pueue_dir.join(format!("{}_stdout.log", task_id));
|
||||
|
@ -14,7 +14,7 @@ pub fn get_log_paths(task_id: usize, path: &String) -> (PathBuf, PathBuf) {
|
|||
(out_path, err_path)
|
||||
}
|
||||
|
||||
/// Create and return the file handle for temporary stdout and stderr files for a task
|
||||
/// Create and return the file handle for temporary stdout and stderr files for a task.
|
||||
pub fn create_log_file_handles(task_id: usize, path: &String) -> Result<(File, File)> {
|
||||
let (out_path, err_path) = get_log_paths(task_id, path);
|
||||
let stdout = File::create(out_path)?;
|
||||
|
@ -23,7 +23,7 @@ pub fn create_log_file_handles(task_id: usize, path: &String) -> Result<(File, F
|
|||
Ok((stdout, stderr))
|
||||
}
|
||||
|
||||
/// Return the file handle for temporary stdout and stderr files for a task
|
||||
/// Return the file handle for temporary stdout and stderr files for a task.
|
||||
pub fn get_log_file_handles(task_id: usize, path: &String) -> Result<(File, File)> {
|
||||
let (out_path, err_path) = get_log_paths(task_id, path);
|
||||
let stdout = File::open(out_path)?;
|
||||
|
@ -32,7 +32,7 @@ pub fn get_log_file_handles(task_id: usize, path: &String) -> Result<(File, File
|
|||
Ok((stdout, stderr))
|
||||
}
|
||||
|
||||
/// Return the content of temporary stdout and stderr files for a task
|
||||
/// Return the content of temporary stdout and stderr files for a task.
|
||||
pub fn read_log_files(task_id: usize, path: &String) -> Result<(String, String)> {
|
||||
let (mut stdout_handle, mut stderr_handle) = get_log_file_handles(task_id, path)?;
|
||||
let mut stdout_buffer = Vec::new();
|
||||
|
@ -47,7 +47,7 @@ pub fn read_log_files(task_id: usize, path: &String) -> Result<(String, String)>
|
|||
Ok((stdout.to_string(), stderr.to_string()))
|
||||
}
|
||||
|
||||
/// Remove temporary stdout and stderr files for a task
|
||||
/// Remove temporary stdout and stderr files for a task.
|
||||
pub fn clean_log_handles(task_id: usize, path: &String) {
|
||||
let (out_path, err_path) = get_log_paths(task_id, path);
|
||||
if let Err(err) = remove_file(out_path) {
|
||||
|
@ -64,8 +64,8 @@ pub fn clean_log_handles(task_id: usize, path: &String) {
|
|||
};
|
||||
}
|
||||
|
||||
/// Return stdout and stderr of a finished process
|
||||
/// Everything is compressed using Brotli and then encoded to Base64
|
||||
/// Return stdout and stderr of a finished process.
|
||||
/// Task output is compressed using snap to save some memory and bandwidth.
|
||||
pub fn read_and_compress_log_files(task_id: usize, path: &String) -> Result<(Vec<u8>, Vec<u8>)> {
|
||||
let (mut stdout_handle, mut stderr_handle) = match get_log_file_handles(task_id, path) {
|
||||
Ok((stdout, stderr)) => (stdout, stderr),
|
||||
|
|
|
@ -102,7 +102,7 @@ pub struct EditResponseMessage {
|
|||
pub path: String,
|
||||
}
|
||||
|
||||
// The booleans decides, whether the stream should be continuous or a oneshot
|
||||
// The booleans decides, whether the stream should be continuous or a oneshot.
|
||||
#[derive(Clone, Debug, Deserialize, Serialize)]
|
||||
pub struct StreamRequestMessage {
|
||||
pub task_id: usize,
|
||||
|
@ -120,7 +120,7 @@ pub struct LogRequestMessage {
|
|||
pub send_logs: bool,
|
||||
}
|
||||
|
||||
/// Helper struct for sending tasks and their log output to the client
|
||||
/// Helper struct for sending tasks and their log output to the client.
|
||||
#[derive(Clone, Debug, Deserialize, Serialize)]
|
||||
pub struct TaskLogMessage {
|
||||
pub task: Task,
|
||||
|
|
|
@ -7,8 +7,8 @@ use ::std::io::Cursor;
|
|||
|
||||
use crate::message::*;
|
||||
|
||||
/// Convenience wrapper around send_bytes
|
||||
/// Deserialize a message and feed the bytes into send_bytes
|
||||
/// Convenience wrapper around send_bytes.
|
||||
/// Deserialize a message and feed the bytes into send_bytes.
|
||||
pub async fn send_message(message: Message, socket: &mut TcpStream) -> Result<()> {
|
||||
debug!("Sending message: {:?}", message);
|
||||
// Prepare command for transfer and determine message byte size
|
||||
|
@ -38,8 +38,8 @@ pub async fn send_bytes(payload: Vec<u8>, socket: &mut TcpStream) -> Result<()>
|
|||
Ok(())
|
||||
}
|
||||
|
||||
/// Receive a byte stream depending on a given header
|
||||
/// This is the basic protocol beneath all pueue communication
|
||||
/// Receive a byte stream depending on a given header.
|
||||
/// This is the basic protocol beneath all pueue communication.
|
||||
pub async fn receive_bytes(socket: &mut TcpStream) -> Result<Vec<u8>> {
|
||||
// Receive the header with the overall message size
|
||||
let mut header = vec![0; 8];
|
||||
|
@ -75,11 +75,11 @@ pub async fn receive_bytes(socket: &mut TcpStream) -> Result<Vec<u8>> {
|
|||
Ok(payload_bytes)
|
||||
}
|
||||
|
||||
/// Convenience wrapper that receives a message and converts it into a Message
|
||||
/// Convenience wrapper that receives a message and converts it into a Message.
|
||||
pub async fn receive_message(socket: &mut TcpStream) -> Result<Message> {
|
||||
let payload_bytes = receive_bytes(socket).await?;
|
||||
|
||||
// Deserialize the message
|
||||
// Deserialize the message.
|
||||
let message: Message = bincode::deserialize(&payload_bytes).context(
|
||||
"In case you updated Pueue, try restarting the daemon. Otherwise please report this",
|
||||
)?;
|
||||
|
|
|
@ -69,7 +69,7 @@ impl Settings {
|
|||
}
|
||||
|
||||
/// Save the current configuration as a file to the configuration path.
|
||||
/// The file is written to the main configuration directory of the respective OS
|
||||
/// The file is written to the main configuration directory of the respective OS.
|
||||
pub fn save(&self) -> Result<()> {
|
||||
let config_path = default_config_path()?;
|
||||
let config_dir = config_path
|
||||
|
|
|
@ -26,7 +26,7 @@ pub struct State {
|
|||
|
||||
impl State {
|
||||
pub fn new(settings: &Settings) -> State {
|
||||
// Create a default group state
|
||||
// Create a default group state.
|
||||
let mut groups = HashMap::new();
|
||||
for group in settings.daemon.groups.keys() {
|
||||
groups.insert(group.into(), true);
|
||||
|
@ -65,11 +65,14 @@ impl State {
|
|||
}
|
||||
}
|
||||
|
||||
/// Check if the given group already exists
|
||||
/// If it doesn't exist yet, create a state entry and a new settings entry
|
||||
/// Check if the given group already exists.
|
||||
/// If it doesn't exist yet, create a state entry and a new settings entry.
|
||||
pub fn create_group(&mut self, group: &String) {
|
||||
if let None = self.settings.daemon.groups.get(group) {
|
||||
self.settings.daemon.groups.insert(group.into(), self.settings.daemon.default_parallel_tasks);
|
||||
self.settings
|
||||
.daemon
|
||||
.groups
|
||||
.insert(group.into(), self.settings.daemon.default_parallel_tasks);
|
||||
}
|
||||
if let None = self.groups.get(group) {
|
||||
self.groups.insert(group.into(), true);
|
||||
|
@ -97,14 +100,14 @@ impl State {
|
|||
|
||||
// Filter all task id's that match the provided statuses.
|
||||
for task_id in task_ids.iter() {
|
||||
// Check whether the task exists
|
||||
// Check whether the task exists and save all non-existing task ids.
|
||||
match self.tasks.get(&task_id) {
|
||||
None => {
|
||||
mismatching.push(*task_id);
|
||||
continue;
|
||||
}
|
||||
Some(task) => {
|
||||
// Check whether the task status matches the specified statuses
|
||||
// Check whether the task status matches the specified statuses.
|
||||
if statuses.contains(&task.status) {
|
||||
matching.push(*task_id);
|
||||
} else {
|
||||
|
@ -125,14 +128,14 @@ impl State {
|
|||
self.save();
|
||||
}
|
||||
|
||||
/// Convenience wrapper around save_to_file
|
||||
/// Convenience wrapper around save_to_file.
|
||||
pub fn save(&mut self) {
|
||||
self.save_to_file(false);
|
||||
}
|
||||
|
||||
/// Save the current current state in a file with a timestamp
|
||||
/// At the same time remove old state logs from the log directory
|
||||
/// This function is called, when large changes to the state are applied, e.g. clean/reset
|
||||
/// Save the current current state in a file with a timestamp.
|
||||
/// At the same time remove old state logs from the log directory.
|
||||
/// This function is called, when large changes to the state are applied, e.g. clean/reset.
|
||||
pub fn backup(&mut self) {
|
||||
self.save_to_file(true);
|
||||
if let Err(error) = self.rotate() {
|
||||
|
@ -141,11 +144,11 @@ impl State {
|
|||
}
|
||||
|
||||
/// Save the current state to disk.
|
||||
/// We do this to restore in case of a crash
|
||||
/// If log == true, the file will be saved with a time stamp
|
||||
/// We do this to restore in case of a crash.
|
||||
/// If log == true, the file will be saved with a time stamp.
|
||||
///
|
||||
/// In comparison to the daemon -> client communication, the state is saved
|
||||
/// as JSON for better readability and debug purposes
|
||||
/// as JSON for better readability and debug purposes.
|
||||
fn save_to_file(&mut self, log: bool) {
|
||||
let serialized = serde_json::to_string(&self);
|
||||
if let Err(error) = serialized {
|
||||
|
@ -169,7 +172,7 @@ impl State {
|
|||
real = path.join("state.json");
|
||||
}
|
||||
|
||||
// Write to temporary log file first, to prevent loss due to crashes
|
||||
// Write to temporary log file first, to prevent loss due to crashes.
|
||||
if let Err(error) = fs::write(&temp, serialized) {
|
||||
error!(
|
||||
"Failed to write log to directory. File permissions? Error: {:?}",
|
||||
|
@ -178,7 +181,7 @@ impl State {
|
|||
return;
|
||||
}
|
||||
|
||||
// Overwrite the original with the temp file, if everything went fine
|
||||
// Overwrite the original with the temp file, if everything went fine.
|
||||
if let Err(error) = fs::rename(&temp, real) {
|
||||
error!(
|
||||
"Failed to overwrite old log file. File permissions? Error: {:?}",
|
||||
|
@ -188,8 +191,8 @@ impl State {
|
|||
}
|
||||
}
|
||||
|
||||
/// Restore the last state from a previous session
|
||||
/// The state is stored as json in the log directory
|
||||
/// Restore the last state from a previous session.
|
||||
/// The state is stored as json in the log directory.
|
||||
fn restore(&mut self) {
|
||||
let path = Path::new(&self.settings.daemon.pueue_directory).join("state.json");
|
||||
|
||||
|
@ -203,7 +206,7 @@ impl State {
|
|||
}
|
||||
info!("Start restoring state");
|
||||
|
||||
// Try to load the file
|
||||
// Try to load the file.
|
||||
let data = fs::read_to_string(&path);
|
||||
if let Err(error) = data {
|
||||
error!("Failed to read previous state log: {:?}", error);
|
||||
|
@ -211,7 +214,7 @@ impl State {
|
|||
}
|
||||
let data = data.unwrap();
|
||||
|
||||
// Try to deserialize the state file
|
||||
// Try to deserialize the state file.
|
||||
let deserialized: Result<State, serde_json::error::Error> = serde_json::from_str(&data);
|
||||
if let Err(error) = deserialized {
|
||||
error!("Failed to deserialize previous state log: {:?}", error);
|
||||
|
@ -265,7 +268,7 @@ impl State {
|
|||
self.max_id = state.max_id;
|
||||
}
|
||||
|
||||
/// Remove old logs that aren't needed any longer
|
||||
/// Remove old logs that aren't needed any longer.
|
||||
fn rotate(&mut self) -> Result<()> {
|
||||
let path = Path::new(&self.settings.daemon.pueue_directory);
|
||||
let path = path.join("log");
|
||||
|
|
|
@ -103,8 +103,8 @@ impl Task {
|
|||
return self.status == TaskStatus::Done;
|
||||
}
|
||||
|
||||
// Check if the task errored
|
||||
// The only case when it didn't error is if it didn't run yet or if the task exited successfully
|
||||
// Check if the task errored.
|
||||
// The only case when it didn't error is if it didn't run yet or if the task exited successfully.
|
||||
pub fn failed(&self) -> bool {
|
||||
match self.result {
|
||||
None => false,
|
||||
|
|
Loading…
Reference in a new issue