Rename show to follow and respect read_local_logs

This commit is contained in:
Arne Beer 2020-05-13 22:18:36 +02:00
parent 145e64a917
commit 0dc29184ea
11 changed files with 114 additions and 82 deletions

View file

@ -6,6 +6,13 @@
- Environment variable capture. Tasks will now start with the variables of the environment `pueue add` is being called in.
- Add the option for specifying a custom callback that'll be called whenever tasks finish.
**Changes:**
- `log` now also works on running and paused tasks. It thereby replaces some of `show`'s functionality.
- Rename `show` to `follow`. The `follow` is now only for actually following the output of a single command.
**Improvements:**
- `follow` (previously `show`) now also reads directly from disk, if `read_local_logs` is set to `true`.
# v0.4.0
**Features:**
- Dependencies! This adds the `--after [ids]` option.

View file

@ -11,7 +11,6 @@ pub enum SubCommand {
/// Enqueue a task for execution.
Add {
/// The command that should be added.
#[structopt()]
command: Vec<String>,
/// Start the task immediately.
@ -105,7 +104,6 @@ pub enum SubCommand {
/// Identical tasks will be created and instantly queued (unless specified otherwise).
Restart {
/// The tasks you want to restart.
#[structopt()]
task_ids: Vec<usize>,
/// Immediately start the task(s).
@ -193,7 +191,6 @@ pub enum SubCommand {
/// Display the log output of finished tasks.
Log {
/// Specify for which specific tasks you want to see the output.
#[structopt()]
task_ids: Vec<usize>,
/// Print the current state as json.
/// Includes EVERYTHING.
@ -201,15 +198,13 @@ pub enum SubCommand {
json: bool,
},
/// Show the output of a currently running task.
/// This command allows following (like `tail -f`).
Show {
/// Follow the output of a currently running task.
/// This command works like `tail -f`.
Follow {
/// The id of the task.
task_id: usize,
/// Continuously print stdout (like `tail -f`).
#[structopt(short, long)]
follow: bool,
/// Like -f, but shows stderr instead of stdout.
/// Show stderr instead of stdout.
#[structopt(short, long)]
err: bool,
},

View file

@ -2,6 +2,7 @@ use ::anyhow::Result;
use ::simplelog::{Config, LevelFilter, SimpleLogger};
use ::structopt::StructOpt;
use ::pueue::message::Message;
use ::pueue::settings::Settings;
pub mod cli;
@ -14,6 +15,7 @@ pub mod output_helper;
use crate::cli::{Opt, SubCommand};
use crate::client::Client;
use crate::message::get_message_from_opt;
use crate::output::follow_task_logs;
#[async_std::main]
async fn main() -> Result<()> {
@ -51,6 +53,21 @@ async fn main() -> Result<()> {
// Create the message that should be sent to the daemon
// depending on the given commandline options.
let message = get_message_from_opt(&opt, &settings)?;
// Some special command handling.
// Simple log output follows for local logs don't need any communication with the daemon.
// Thereby we handle this separately over here.
match &message {
Message::StreamRequest(message) => {
if settings.client.read_local_logs {
let pueue_directory = settings.daemon.pueue_directory.clone();
follow_task_logs(pueue_directory, message.task_id, message.err);
return Ok(());
}
}
_ => (),
}
let mut client = Client::new(settings, message, opt)?;
client.run().await?;

View file

@ -125,17 +125,12 @@ pub fn get_message_from_opt(opt: &Opt, settings: &Settings) -> Result<Message> {
};
Ok(Message::Log(message))
}
SubCommand::Show {
task_id,
follow,
err,
} => {
SubCommand::Follow { task_id, err } => {
let message = StreamRequestMessage {
task_id: *task_id,
follow: *follow,
err: *err,
};
Ok(Message::StreamRequest(message))
return Ok(Message::StreamRequest(message));
}
SubCommand::Clean => Ok(Message::Clean),
SubCommand::Reset => Ok(Message::Reset),

View file

@ -7,7 +7,7 @@ use ::std::collections::BTreeMap;
use ::std::io;
use ::std::string::ToString;
use ::pueue::log::get_log_file_handles;
use ::pueue::log::{get_log_file_handles, get_log_paths};
use ::pueue::message::TaskLogMessage;
use ::pueue::settings::Settings;
use ::pueue::state::State;
@ -173,6 +173,7 @@ pub fn print_logs(
) {
let (json, task_ids) = match cli_command {
SubCommand::Log { json, task_ids } => (*json, task_ids.clone()),
SubCommand::Follow { task_id, .. } => (false, vec![*task_id]),
_ => panic!(
"Got wrong Subcommand {:?} in print_log. This shouldn't happen",
cli_command
@ -199,7 +200,9 @@ pub fn print_logs(
// Add a newline if there is another task that's going to be printed.
if let Some((_, task_log)) = task_iter.peek() {
if task_log.task.status == TaskStatus::Done {
if !vec![TaskStatus::Done, TaskStatus::Running, TaskStatus::Paused]
.contains(&task_log.task.status)
{
println!();
}
}
@ -209,8 +212,8 @@ pub fn print_logs(
/// Print the log of a single task.
pub fn print_log(task_log: &mut TaskLogMessage, settings: &Settings) {
let task = &task_log.task;
// We only show logs of finished tasks.
if task.status != TaskStatus::Done {
// We only show logs of finished or running tasks.
if !vec![TaskStatus::Done, TaskStatus::Running, TaskStatus::Paused].contains(&task.status) {
return;
}
@ -228,7 +231,7 @@ pub fn print_log(task_log: &mut TaskLogMessage, settings: &Settings) {
Some(TaskResult::DependencyFailed) => {
style("dependency failed".to_string()).with(Color::Red)
}
None => panic!("Got a 'Done' task without a task result. Please report this bug."),
None => style("running".to_string()),
};
print!("{} {}", task_text, exit_status);
@ -338,3 +341,36 @@ pub fn print_remote_task_output(task_log: &TaskLogMessage, stdout: bool) -> Resu
Ok(())
}
/// Print the log ouput of finished tasks.
/// Either print the logs of every task
/// or only print the logs of the specified tasks.
pub fn follow_task_logs(pueue_directory: String, task_id: usize, stderr: bool) {
let (stdout_handle, stderr_handle) = match get_log_file_handles(task_id, &pueue_directory) {
Ok((stdout, stderr)) => (stdout, stderr),
Err(err) => {
println!("Failed to get log file handles: {}", err);
return;
}
};
let mut handle = if stderr { stderr_handle } else { stdout_handle };
let (out_path, err_path) = get_log_paths(task_id, &pueue_directory);
let handle_path = if stderr { err_path } else { out_path };
// Stdout handler to directly write log file output to io::stdout
// without having to load anything into memory.
let mut stdout = io::stdout();
loop {
// Check whether the file still exists. Exit if it doesn't.
if !handle_path.exists() {
println!("File has gone away. Did somebody remove the task?");
return;
}
// Read the next chunk of text from the last position.
if let Err(err) = io::copy(&mut handle, &mut stdout) {
println!("Error while reading file: {}", err);
return;
};
}
}

View file

@ -468,6 +468,7 @@ fn group(message: GroupMessage, state: &SharedState) -> Message {
return create_success_message(format!("Group {} removed", group));
}
// Compile a small minimalistic text with all important information about all known groups
let mut group_status = String::new();
let mut group_iter = state.groups.iter().peekable();
while let Some((group, running)) = group_iter.next() {
@ -481,7 +482,6 @@ fn group(message: GroupMessage, state: &SharedState) -> Message {
group_status.push('\n');
}
}
// Print all groups
create_success_message(group_status)
}

View file

@ -16,65 +16,48 @@ pub async fn handle_show(
socket: &mut TcpStream,
message: StreamRequestMessage,
) -> Result<Message> {
if message.follow || message.err {
// The client requested streaming of stdout.
let mut handle: File;
match get_log_file_handles(message.task_id, pueue_directory) {
Err(_) => {
return Ok(create_failure_message(
"Couldn't find output files for task. Maybe it finished? Try `log`",
))
}
Ok((stdout_handle, stderr_handle)) => {
handle = if message.err {
stderr_handle
} else {
stdout_handle
};
}
// The client requested streaming of stdout.
let mut handle: File;
match get_log_file_handles(message.task_id, pueue_directory) {
Err(_) => {
return Ok(create_failure_message(
"Couldn't find output files for task. Maybe it finished? Try `log`",
))
}
// Get the stdout/stderr path.
// We need to check continuously, whether the file still exists,
// since the file can go away (e.g. due to finishing a task).
let (out_path, err_path) = get_log_paths(message.task_id, pueue_directory);
let handle_path = if message.err { err_path } else { out_path };
loop {
// Check whether the file still exists. Exit if it doesn't.
if !handle_path.exists() {
return Ok(create_success_message(
"File has gone away. The task probably just finished",
));
}
// Read the next chunk of text from the last position.
let mut buffer = Vec::new();
if let Err(err) = handle.read_to_end(&mut buffer) {
return Ok(create_failure_message(format!("Error: {}", err)));
Ok((stdout_handle, stderr_handle)) => {
handle = if message.err {
stderr_handle
} else {
stdout_handle
};
let text = String::from_utf8_lossy(&buffer).to_string();
// Send the new chunk and wait for 1 second.
let response = Message::Stream(text);
send_message(response, socket).await?;
let wait = future::ready(1).delay(Duration::from_millis(1000));
wait.await;
}
} else {
// The client requested a one-shot execution.
// Simply read the file and send the current stdout/stderr once.
let (stdout, stderr) = match read_log_files(message.task_id, pueue_directory) {
Err(_) => {
return Ok(create_failure_message(
"Couldn't find output files for task. Maybe it finished? Try `log`",
))
}
Ok((stdout, stderr)) => (stdout, stderr),
}
// Get the stdout/stderr path.
// We need to check continuously, whether the file still exists,
// since the file can go away (e.g. due to finishing a task).
let (out_path, err_path) = get_log_paths(message.task_id, pueue_directory);
let handle_path = if message.err { err_path } else { out_path };
loop {
// Check whether the file still exists. Exit if it doesn't.
if !handle_path.exists() {
return Ok(create_success_message(
"File has gone away. Did somebody remove the task?",
));
}
// Read the next chunk of text from the last position.
let mut buffer = Vec::new();
if let Err(err) = handle.read_to_end(&mut buffer) {
return Ok(create_failure_message(format!("Error: {}", err)));
};
let text = String::from_utf8_lossy(&buffer).to_string();
let response = format!("Stdout:\n{}\n\nStderr:\n{}", stdout, stderr);
Ok(create_success_message(response))
// Send the new chunk and wait for 1 second.
let response = Message::Stream(text);
send_message(response, socket).await?;
let wait = future::ready(1).delay(Duration::from_millis(1000));
wait.await;
}
}

View file

@ -117,11 +117,10 @@ pub struct GroupMessage {
pub remove: Option<String>,
}
// The booleans decides, whether the stream should be continuous or a oneshot.
/// `err` decides, whether you should stream stderr or stdout.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct StreamRequestMessage {
pub task_id: usize,
pub follow: bool,
pub err: bool,
}

View file

@ -1,5 +1,5 @@
pueue reset
sleep 0.5
sleep 1
pueue add ls
pueue add failing
pueue add sleep 6000

View file

@ -1,4 +1,4 @@
counter = 1
counter=1
while true; do
echo "nice ${counter}"