diff --git a/client/cli.rs b/client/cli.rs index aeced3a..2af2f19 100644 --- a/client/cli.rs +++ b/client/cli.rs @@ -268,10 +268,10 @@ fn min_one(value: String) -> Result<(), String> { if value < 1 { return Err("You must provide a value that's bigger than 0".into()); } - return Ok(()); + Ok(()) } Err(_) => { - return Err("Failed to parse integer".into()); + Err("Failed to parse integer".into()) } } } diff --git a/client/client.rs b/client/client.rs index 3a7adfb..2fbe96f 100644 --- a/client/client.rs +++ b/client/client.rs @@ -48,7 +48,7 @@ impl Client { opt, daemon_address: address, message, - settings: settings, + settings, }) } diff --git a/client/edit.rs b/client/edit.rs index 46bd7cb..e31c6d6 100644 --- a/client/edit.rs +++ b/client/edit.rs @@ -13,7 +13,7 @@ use crate::cli::SubCommand; /// final edit message with the updated command to the daemon. pub fn edit(message: EditResponseMessage, cli_command: &SubCommand) -> Message { let edit_path = match cli_command { - SubCommand::Edit { task_id: _, path } => *path, + SubCommand::Edit { path, .. } => *path, _ => panic!( "Got wrong Subcommand {:?} in edit. This shouldn't happen", cli_command diff --git a/client/message.rs b/client/message.rs index 1daf60b..a82ba89 100644 --- a/client/message.rs +++ b/client/message.rs @@ -89,7 +89,7 @@ pub fn get_message_from_opt(opt: &Opt, settings: &Settings) -> Result { }; Ok(Message::Send(message)) } - SubCommand::Edit { task_id, path: _ } => Ok(Message::EditRequest(*task_id)), + SubCommand::Edit { task_id, .. } => Ok(Message::EditRequest(*task_id)), SubCommand::Group { add, remove } => { let message = GroupMessage { add: add.clone(), @@ -97,8 +97,8 @@ pub fn get_message_from_opt(opt: &Opt, settings: &Settings) -> Result { }; Ok(Message::Group(message)) } - SubCommand::Status { json: _ } => Ok(Message::Status), - SubCommand::Log { task_ids, json: _ } => { + SubCommand::Status { .. } => Ok(Message::Status), + SubCommand::Log { task_ids, .. } => { let message = LogRequestMessage { task_ids: task_ids.clone(), send_logs: !settings.client.read_local_logs, @@ -130,9 +130,6 @@ pub fn get_message_from_opt(opt: &Opt, settings: &Settings) -> Result { }; Ok(Message::Parallel(message)) } - SubCommand::Completions { - shell: _, - output_directory: _, - } => Err(anyhow!("Completions have to be handled earlier")), + SubCommand::Completions { .. } => Err(anyhow!("Completions have to be handled earlier")), } } diff --git a/client/output.rs b/client/output.rs index 70dc222..aa75268 100644 --- a/client/output.rs +++ b/client/output.rs @@ -234,7 +234,7 @@ pub fn print_log(task_log: &mut TaskLogMessage, settings: &Settings) { // Print task id and exit code. let task_text = style(format!("Task {} ", task.id)).attribute(Attribute::Bold); let exit_status = match &task.result { - Some(TaskResult::Success) => style(format!("with exit code 0")).with(Color::Green), + Some(TaskResult::Success) => style("with exit code 0".into()).with(Color::Green), Some(TaskResult::Failed(exit_code)) => { style(format!("with exit code {}", exit_code)).with(Color::Red) } diff --git a/daemon/instructions.rs b/daemon/instructions.rs index 94c73a8..c433d84 100644 --- a/daemon/instructions.rs +++ b/daemon/instructions.rs @@ -73,7 +73,7 @@ fn add_task(message: AddMessage, sender: &Sender, state: &SharedState) // Create a new group in case the user used a unknown group. if let Some(group) = &task.group { - if let None = state.groups.get(group) { + if state.groups.get(group).is_none() { return create_failure_message(format!( "Tried to create task with unknown group '{}'", group @@ -168,7 +168,7 @@ fn stash(task_ids: Vec, state: &SharedState) -> Message { let text = "Tasks are stashed"; let response = compile_task_response(text, matching, mismatching); - return create_success_message(response); + create_success_message(response) } /// Enqueue specific stashed tasks. @@ -199,7 +199,7 @@ fn enqueue(message: EnqueueMessage, state: &SharedState) -> Message { }; let response = compile_task_response(text.as_str(), matching, mismatching); - return create_success_message(response); + create_success_message(response) } /// Forward the start message to the task handler, which then starts the process(es). @@ -218,7 +218,7 @@ fn start(task_ids: Vec, sender: &Sender, state: &SharedState) -> return create_success_message(response); } - return create_success_message("Daemon and all tasks are being resumed."); + create_success_message("Daemon and all tasks are being resumed.") } /// Create and enqueue tasks from already finished tasks. @@ -258,7 +258,7 @@ fn restart(message: RestartMessage, sender: &Sender, state: &SharedStat sender.send(Message::Start(new_ids)).expect(SENDER_ERR); } - return create_success_message(response); + create_success_message(response) } /// Forward the pause message to the task handler, which then pauses the process(es). @@ -277,7 +277,7 @@ fn pause(message: PauseMessage, sender: &Sender, state: &SharedState) - return create_success_message(response); } - return create_success_message("Daemon and all tasks are being paused."); + create_success_message("Daemon and all tasks are being paused.") } /// Forward the kill message to the task handler, which then kills the process. @@ -297,7 +297,7 @@ fn kill(message: KillMessage, sender: &Sender, state: &SharedState) -> return create_success_message(response); } - return create_success_message("All tasks are being killed."); + create_success_message("All tasks are being killed.") } /// The message will be forwarded to the task handler, which then sends the user input to the process. @@ -320,7 +320,7 @@ fn send(message: SendMessage, sender: &Sender, state: &SharedState) -> // Check whether the task exists and is running, abort if that's not the case. sender.send(Message::Send(message)).expect(SENDER_ERR); - return create_success_message("Message is being send to the process."); + create_success_message("Message is being send to the process.") } /// If a user wants to edit a message, we need to send him the current command. @@ -342,9 +342,9 @@ fn edit_request(task_id: usize, state: &SharedState) -> Message { command: task.command.clone(), path: task.path.clone(), }; - return Message::EditResponse(message); + Message::EditResponse(message) } - None => return create_failure_message("No task with this id."), + None => create_failure_message("No task with this id."), } } @@ -364,10 +364,10 @@ fn edit(message: EditMessage, state: &SharedState) -> Message { task.path = message.path.clone(); state.save(); - return create_success_message("Command has been updated"); + create_success_message("Command has been updated") } None => { - return create_failure_message(format!( + create_failure_message(format!( "Task to edit has gone away: {}", message.task_id )) @@ -430,7 +430,7 @@ fn clean(state: &SharedState) -> Message { state.save(); - return create_success_message("All finished tasks have been removed"); + create_success_message("All finished tasks have been removed") } /// Forward the reset request to the task handler. @@ -439,7 +439,7 @@ fn clean(state: &SharedState) -> Message { fn reset(sender: &Sender, state: &SharedState) -> Message { sender.send(Message::Reset).expect(SENDER_ERR); clean(state); - return create_success_message("Everything is being reset right now."); + create_success_message("Everything is being reset right now.") } /// Return the current state. @@ -462,36 +462,33 @@ fn get_log(message: LogRequestMessage, state: &SharedState) -> Message { let mut tasks = BTreeMap::new(); for task_id in task_ids.iter() { - match state.tasks.get(task_id) { - Some(task) => { - // We send log output and the task at the same time. - // This isn't as efficient as sending the raw compressed data directly, - // but it's a lot more convenient for now. - let (stdout, stderr) = if message.send_logs { - match read_and_compress_log_files( - *task_id, - &state.settings.daemon.pueue_directory, - ) { - Ok((stdout, stderr)) => (Some(stdout), Some(stderr)), - Err(err) => { - return create_failure_message(format!( - "Failed reading process output file: {:?}", - err - )); - } + if let Some(task) = state.tasks.get(task_id) { + // We send log output and the task at the same time. + // This isn't as efficient as sending the raw compressed data directly, + // but it's a lot more convenient for now. + let (stdout, stderr) = if message.send_logs { + match read_and_compress_log_files( + *task_id, + &state.settings.daemon.pueue_directory, + ) { + Ok((stdout, stderr)) => (Some(stdout), Some(stderr)), + Err(err) => { + return create_failure_message(format!( + "Failed reading process output file: {:?}", + err + )); } - } else { - (None, None) - }; + } + } else { + (None, None) + }; - let task_log = TaskLogMessage { - task: task.clone(), - stdout, - stderr, - }; - tasks.insert(*task_id, task_log); - } - None => {} + let task_log = TaskLogMessage { + task: task.clone(), + stdout, + stderr, + }; + tasks.insert(*task_id, task_log); } } Message::LogResponse(tasks) @@ -502,7 +499,7 @@ fn set_parallel_tasks(message: ParallelMessage, state: &SharedState) -> Message let mut state = state.lock().unwrap(); // Set the default parallel tasks if no group is specified. - if let None = message.group { + if message.group.is_none() { state.settings.daemon.default_parallel_tasks = message.parallel_tasks; return create_success_message("Parallel tasks setting adjusted"); } @@ -527,8 +524,8 @@ fn set_parallel_tasks(message: ParallelMessage, state: &SharedState) -> Message return create_failure_message(format!("Failed while saving the config file: {}", error)); } - return create_success_message(format!( + create_success_message(format!( "Parallel tasks setting for group {} adjusted", group - )); + )) } diff --git a/daemon/main.rs b/daemon/main.rs index 4a5ba90..64432d8 100644 --- a/daemon/main.rs +++ b/daemon/main.rs @@ -1,4 +1,4 @@ -use ::anyhow::{bail, Error, Result}; +use ::anyhow::{bail, Result}; use ::simplelog::{Config, LevelFilter, SimpleLogger}; use ::std::fs::create_dir_all; use ::std::path::Path; @@ -26,7 +26,6 @@ async fn main() -> Result<()> { let settings = Settings::new()?; match settings.save() { Err(error) => { - let error: Error = From::from(error); bail!(error.context("Failed saving the config file")); } Ok(()) => {} @@ -68,7 +67,7 @@ async fn main() -> Result<()> { } /// Initialize all directories needed for normal operation. -fn init_directories(path: &String) { +fn init_directories(path: &str) { let pueue_dir = Path::new(path); if !pueue_dir.exists() { if let Err(error) = create_dir_all(&pueue_dir) { diff --git a/daemon/socket.rs b/daemon/socket.rs index 0a6d9bd..b23540d 100644 --- a/daemon/socket.rs +++ b/daemon/socket.rs @@ -47,7 +47,7 @@ async fn handle_incoming( let payload_bytes = receive_bytes(&mut socket).await?; // Didn't receive any bytes. The client disconnected. - if payload_bytes.len() == 0 { + if payload_bytes.is_empty() { info!("Client went away"); return Ok(()); } diff --git a/daemon/streaming.rs b/daemon/streaming.rs index 38e0f80..0541322 100644 --- a/daemon/streaming.rs +++ b/daemon/streaming.rs @@ -12,7 +12,7 @@ use ::pueue::protocol::send_message; /// Handle the continuous stream of a message. pub async fn handle_show( - pueue_directory: &String, + pueue_directory: &str, socket: &mut TcpStream, message: StreamRequestMessage, ) -> Result { diff --git a/daemon/task_handler.rs b/daemon/task_handler.rs index affeba5..ec36948 100644 --- a/daemon/task_handler.rs +++ b/daemon/task_handler.rs @@ -128,7 +128,7 @@ impl TaskHandler { } } - return state + state .tasks .iter() .filter(|(_, task)| task.status == TaskStatus::Queued) @@ -161,15 +161,14 @@ impl TaskHandler { running < &state.settings.daemon.default_parallel_tasks } }) - .filter(|(_, task)| { + .find(|(_, task)| { // Check whether all dependencies for this task are fulfilled. task.dependencies .iter() .flat_map(|id| state.tasks.get(id)) .all(|task| task.status == TaskStatus::Done) }) - .next() - .map(|(id, _)| *id); + .map(|(id, _)| *id) } /// See if we can start a new queued task. @@ -424,9 +423,8 @@ impl TaskHandler { //match self.receiver.recv_timeout(timeout) { std::thread::sleep(timeout); - match self.receiver.try_recv() { - Ok(message) => self.handle_message(message), - Err(_) => {} + if let Ok(message) = self.receiver.try_recv() { + self.handle_message(message); }; } diff --git a/shared/log.rs b/shared/log.rs index e923f7f..80aa3be 100644 --- a/shared/log.rs +++ b/shared/log.rs @@ -7,7 +7,7 @@ use ::std::io::prelude::*; use ::std::path::{Path, PathBuf}; /// Return the paths to temporary stdout and stderr files for a task. -pub fn get_log_paths(task_id: usize, path: &String) -> (PathBuf, PathBuf) { +pub fn get_log_paths(task_id: usize, path: &str) -> (PathBuf, PathBuf) { let pueue_dir = Path::new(path).join("task_logs"); let out_path = pueue_dir.join(format!("{}_stdout.log", task_id)); let err_path = pueue_dir.join(format!("{}_stderr.log", task_id)); @@ -15,7 +15,7 @@ pub fn get_log_paths(task_id: usize, path: &String) -> (PathBuf, PathBuf) { } /// Create and return the file handle for temporary stdout and stderr files for a task. -pub fn create_log_file_handles(task_id: usize, path: &String) -> Result<(File, File)> { +pub fn create_log_file_handles(task_id: usize, path: &str) -> Result<(File, File)> { let (out_path, err_path) = get_log_paths(task_id, path); let stdout = File::create(out_path)?; let stderr = File::create(err_path)?; @@ -24,7 +24,7 @@ pub fn create_log_file_handles(task_id: usize, path: &String) -> Result<(File, F } /// Return the file handle for temporary stdout and stderr files for a task. -pub fn get_log_file_handles(task_id: usize, path: &String) -> Result<(File, File)> { +pub fn get_log_file_handles(task_id: usize, path: &str) -> Result<(File, File)> { let (out_path, err_path) = get_log_paths(task_id, path); let stdout = File::open(out_path)?; let stderr = File::open(err_path)?; @@ -33,7 +33,7 @@ pub fn get_log_file_handles(task_id: usize, path: &String) -> Result<(File, File } /// Return the content of temporary stdout and stderr files for a task. -pub fn read_log_files(task_id: usize, path: &String) -> Result<(String, String)> { +pub fn read_log_files(task_id: usize, path: &str) -> Result<(String, String)> { let (mut stdout_handle, mut stderr_handle) = get_log_file_handles(task_id, path)?; let mut stdout_buffer = Vec::new(); let mut stderr_buffer = Vec::new(); @@ -48,7 +48,7 @@ pub fn read_log_files(task_id: usize, path: &String) -> Result<(String, String)> } /// Remove temporary stdout and stderr files for a task. -pub fn clean_log_handles(task_id: usize, path: &String) { +pub fn clean_log_handles(task_id: usize, path: &str) { let (out_path, err_path) = get_log_paths(task_id, path); if let Err(err) = remove_file(out_path) { error!( @@ -66,7 +66,7 @@ pub fn clean_log_handles(task_id: usize, path: &String) { /// Return stdout and stderr of a finished process. /// Task output is compressed using snap to save some memory and bandwidth. -pub fn read_and_compress_log_files(task_id: usize, path: &String) -> Result<(Vec, Vec)> { +pub fn read_and_compress_log_files(task_id: usize, path: &str) -> Result<(Vec, Vec)> { let (mut stdout_handle, mut stderr_handle) = match get_log_file_handles(task_id, path) { Ok((stdout, stderr)) => (stdout, stderr), Err(err) => { diff --git a/shared/protocol.rs b/shared/protocol.rs index dd3d550..2dd56c1 100644 --- a/shared/protocol.rs +++ b/shared/protocol.rs @@ -30,8 +30,7 @@ pub async fn send_bytes(payload: Vec, socket: &mut TcpStream) -> Result<()> socket.write_all(&header).await?; // Split the payload into 1.5kbyte chunks (MUT for TCP) - let mut iter = payload.chunks(1500); - while let Some(chunk) = iter.next() { + for chunk in payload.chunks(1500) { socket.write_all(chunk).await?; } diff --git a/shared/settings.rs b/shared/settings.rs index 4438745..08d67b8 100644 --- a/shared/settings.rs +++ b/shared/settings.rs @@ -74,7 +74,7 @@ impl Settings { let config_path = default_config_path()?; let config_dir = config_path .parent() - .ok_or(anyhow!("Couldn't resolve config dir"))?; + .ok_or_else(|| anyhow!("Couldn't resolve config dir"))?; // Create the config dir, if it doesn't exist yet if !config_dir.exists() { @@ -106,7 +106,7 @@ fn parse_config(settings: &mut Config) -> Result<()> { } fn get_home_dir() -> Result { - dirs::home_dir().ok_or(anyhow!("Couldn't resolve home dir")) + dirs::home_dir().ok_or_else(|| anyhow!("Couldn't resolve home dir")) } fn gen_random_secret() -> String { diff --git a/shared/state.rs b/shared/state.rs index 3ba129f..9ecc872 100644 --- a/shared/state.rs +++ b/shared/state.rs @@ -37,7 +37,7 @@ impl State { settings: settings.clone(), running: true, tasks: BTreeMap::new(), - groups: groups, + groups, }; state.restore(); state.save(); @@ -67,11 +67,11 @@ impl State { /// Check if the given group already exists. /// If it doesn't exist yet, create a state entry and a new settings entry. - pub fn create_group(&mut self, group: &String) -> Result<()> { - if let None = self.settings.daemon.groups.get(group) { + pub fn create_group(&mut self, group: &str) -> Result<()> { + if self.settings.daemon.groups.get(group).is_none() { self.settings.daemon.groups.insert(group.into(), 1); } - if let None = self.groups.get(group) { + if self.groups.get(group).is_none() { self.groups.insert(group.into(), true); } @@ -81,7 +81,7 @@ impl State { /// Remove a group. /// Also go through all tasks and set the removed group to `None`. - pub fn remove_group(&mut self, group: &String) -> Result<()> { + pub fn remove_group(&mut self, group: &str) -> Result<()> { self.settings.daemon.groups.remove(group); self.groups.remove(group); @@ -178,18 +178,14 @@ impl State { let serialized = serialized.unwrap(); let path = Path::new(&self.settings.daemon.pueue_directory); - let temp: PathBuf; - let real: PathBuf; - if log { + let (temp, real) = if log { let path = path.join("log"); let now: DateTime = Utc::now(); let time = now.format("%Y-%m-%d_%H-%M-%S"); - temp = path.join(format!("{}_backup.json.partial", time)); - real = path.join(format!("{}_state.json", time)); + (path.join(format!("{}_backup.json.partial", time)), path.join(format!("{}_state.json", time))) } else { - temp = path.join("state.json.partial"); - real = path.join("state.json"); - } + (path.join("state.json.partial"), path.join("state.json")) + }; // Write to temporary log file first, to prevent loss due to crashes. if let Err(error) = fs::write(&temp, serialized) { diff --git a/shared/task.rs b/shared/task.rs index 9b08c20..aa96fdf 100644 --- a/shared/task.rs +++ b/shared/task.rs @@ -68,7 +68,7 @@ impl Task { id: 0, command, path, - group: group, + group, enqueue_at, dependencies, status: starting_status.clone(), @@ -96,11 +96,11 @@ impl Task { } pub fn is_running(&self) -> bool { - return self.status == TaskStatus::Running || self.status == TaskStatus::Paused; + self.status == TaskStatus::Running || self.status == TaskStatus::Paused } pub fn is_done(&self) -> bool { - return self.status == TaskStatus::Done; + self.status == TaskStatus::Done } // Check if the task errored. @@ -114,6 +114,6 @@ impl Task { } pub fn is_queued(&self) -> bool { - return self.status == TaskStatus::Queued || self.status == TaskStatus::Stashed; + self.status == TaskStatus::Queued || self.status == TaskStatus::Stashed } }