Clippy fixes

This commit is contained in:
Arne Beer 2020-05-11 19:10:05 +02:00
parent 28e8b6d7b2
commit daf221a74f
15 changed files with 82 additions and 96 deletions

View file

@ -268,10 +268,10 @@ fn min_one(value: String) -> Result<(), String> {
if value < 1 {
return Err("You must provide a value that's bigger than 0".into());
}
return Ok(());
Ok(())
}
Err(_) => {
return Err("Failed to parse integer".into());
Err("Failed to parse integer".into())
}
}
}

View file

@ -48,7 +48,7 @@ impl Client {
opt,
daemon_address: address,
message,
settings: settings,
settings,
})
}

View file

@ -13,7 +13,7 @@ use crate::cli::SubCommand;
/// final edit message with the updated command to the daemon.
pub fn edit(message: EditResponseMessage, cli_command: &SubCommand) -> Message {
let edit_path = match cli_command {
SubCommand::Edit { task_id: _, path } => *path,
SubCommand::Edit { path, .. } => *path,
_ => panic!(
"Got wrong Subcommand {:?} in edit. This shouldn't happen",
cli_command

View file

@ -89,7 +89,7 @@ pub fn get_message_from_opt(opt: &Opt, settings: &Settings) -> Result<Message> {
};
Ok(Message::Send(message))
}
SubCommand::Edit { task_id, path: _ } => Ok(Message::EditRequest(*task_id)),
SubCommand::Edit { task_id, .. } => Ok(Message::EditRequest(*task_id)),
SubCommand::Group { add, remove } => {
let message = GroupMessage {
add: add.clone(),
@ -97,8 +97,8 @@ pub fn get_message_from_opt(opt: &Opt, settings: &Settings) -> Result<Message> {
};
Ok(Message::Group(message))
}
SubCommand::Status { json: _ } => Ok(Message::Status),
SubCommand::Log { task_ids, json: _ } => {
SubCommand::Status { .. } => Ok(Message::Status),
SubCommand::Log { task_ids, .. } => {
let message = LogRequestMessage {
task_ids: task_ids.clone(),
send_logs: !settings.client.read_local_logs,
@ -130,9 +130,6 @@ pub fn get_message_from_opt(opt: &Opt, settings: &Settings) -> Result<Message> {
};
Ok(Message::Parallel(message))
}
SubCommand::Completions {
shell: _,
output_directory: _,
} => Err(anyhow!("Completions have to be handled earlier")),
SubCommand::Completions { .. } => Err(anyhow!("Completions have to be handled earlier")),
}
}

View file

@ -234,7 +234,7 @@ pub fn print_log(task_log: &mut TaskLogMessage, settings: &Settings) {
// Print task id and exit code.
let task_text = style(format!("Task {} ", task.id)).attribute(Attribute::Bold);
let exit_status = match &task.result {
Some(TaskResult::Success) => style(format!("with exit code 0")).with(Color::Green),
Some(TaskResult::Success) => style("with exit code 0".into()).with(Color::Green),
Some(TaskResult::Failed(exit_code)) => {
style(format!("with exit code {}", exit_code)).with(Color::Red)
}

View file

@ -73,7 +73,7 @@ fn add_task(message: AddMessage, sender: &Sender<Message>, state: &SharedState)
// Create a new group in case the user used a unknown group.
if let Some(group) = &task.group {
if let None = state.groups.get(group) {
if state.groups.get(group).is_none() {
return create_failure_message(format!(
"Tried to create task with unknown group '{}'",
group
@ -168,7 +168,7 @@ fn stash(task_ids: Vec<usize>, state: &SharedState) -> Message {
let text = "Tasks are stashed";
let response = compile_task_response(text, matching, mismatching);
return create_success_message(response);
create_success_message(response)
}
/// Enqueue specific stashed tasks.
@ -199,7 +199,7 @@ fn enqueue(message: EnqueueMessage, state: &SharedState) -> Message {
};
let response = compile_task_response(text.as_str(), matching, mismatching);
return create_success_message(response);
create_success_message(response)
}
/// Forward the start message to the task handler, which then starts the process(es).
@ -218,7 +218,7 @@ fn start(task_ids: Vec<usize>, sender: &Sender<Message>, state: &SharedState) ->
return create_success_message(response);
}
return create_success_message("Daemon and all tasks are being resumed.");
create_success_message("Daemon and all tasks are being resumed.")
}
/// Create and enqueue tasks from already finished tasks.
@ -258,7 +258,7 @@ fn restart(message: RestartMessage, sender: &Sender<Message>, state: &SharedStat
sender.send(Message::Start(new_ids)).expect(SENDER_ERR);
}
return create_success_message(response);
create_success_message(response)
}
/// Forward the pause message to the task handler, which then pauses the process(es).
@ -277,7 +277,7 @@ fn pause(message: PauseMessage, sender: &Sender<Message>, state: &SharedState) -
return create_success_message(response);
}
return create_success_message("Daemon and all tasks are being paused.");
create_success_message("Daemon and all tasks are being paused.")
}
/// Forward the kill message to the task handler, which then kills the process.
@ -297,7 +297,7 @@ fn kill(message: KillMessage, sender: &Sender<Message>, state: &SharedState) ->
return create_success_message(response);
}
return create_success_message("All tasks are being killed.");
create_success_message("All tasks are being killed.")
}
/// The message will be forwarded to the task handler, which then sends the user input to the process.
@ -320,7 +320,7 @@ fn send(message: SendMessage, sender: &Sender<Message>, state: &SharedState) ->
// Check whether the task exists and is running, abort if that's not the case.
sender.send(Message::Send(message)).expect(SENDER_ERR);
return create_success_message("Message is being send to the process.");
create_success_message("Message is being send to the process.")
}
/// If a user wants to edit a message, we need to send him the current command.
@ -342,9 +342,9 @@ fn edit_request(task_id: usize, state: &SharedState) -> Message {
command: task.command.clone(),
path: task.path.clone(),
};
return Message::EditResponse(message);
Message::EditResponse(message)
}
None => return create_failure_message("No task with this id."),
None => create_failure_message("No task with this id."),
}
}
@ -364,10 +364,10 @@ fn edit(message: EditMessage, state: &SharedState) -> Message {
task.path = message.path.clone();
state.save();
return create_success_message("Command has been updated");
create_success_message("Command has been updated")
}
None => {
return create_failure_message(format!(
create_failure_message(format!(
"Task to edit has gone away: {}",
message.task_id
))
@ -430,7 +430,7 @@ fn clean(state: &SharedState) -> Message {
state.save();
return create_success_message("All finished tasks have been removed");
create_success_message("All finished tasks have been removed")
}
/// Forward the reset request to the task handler.
@ -439,7 +439,7 @@ fn clean(state: &SharedState) -> Message {
fn reset(sender: &Sender<Message>, state: &SharedState) -> Message {
sender.send(Message::Reset).expect(SENDER_ERR);
clean(state);
return create_success_message("Everything is being reset right now.");
create_success_message("Everything is being reset right now.")
}
/// Return the current state.
@ -462,36 +462,33 @@ fn get_log(message: LogRequestMessage, state: &SharedState) -> Message {
let mut tasks = BTreeMap::new();
for task_id in task_ids.iter() {
match state.tasks.get(task_id) {
Some(task) => {
// We send log output and the task at the same time.
// This isn't as efficient as sending the raw compressed data directly,
// but it's a lot more convenient for now.
let (stdout, stderr) = if message.send_logs {
match read_and_compress_log_files(
*task_id,
&state.settings.daemon.pueue_directory,
) {
Ok((stdout, stderr)) => (Some(stdout), Some(stderr)),
Err(err) => {
return create_failure_message(format!(
"Failed reading process output file: {:?}",
err
));
}
if let Some(task) = state.tasks.get(task_id) {
// We send log output and the task at the same time.
// This isn't as efficient as sending the raw compressed data directly,
// but it's a lot more convenient for now.
let (stdout, stderr) = if message.send_logs {
match read_and_compress_log_files(
*task_id,
&state.settings.daemon.pueue_directory,
) {
Ok((stdout, stderr)) => (Some(stdout), Some(stderr)),
Err(err) => {
return create_failure_message(format!(
"Failed reading process output file: {:?}",
err
));
}
} else {
(None, None)
};
}
} else {
(None, None)
};
let task_log = TaskLogMessage {
task: task.clone(),
stdout,
stderr,
};
tasks.insert(*task_id, task_log);
}
None => {}
let task_log = TaskLogMessage {
task: task.clone(),
stdout,
stderr,
};
tasks.insert(*task_id, task_log);
}
}
Message::LogResponse(tasks)
@ -502,7 +499,7 @@ fn set_parallel_tasks(message: ParallelMessage, state: &SharedState) -> Message
let mut state = state.lock().unwrap();
// Set the default parallel tasks if no group is specified.
if let None = message.group {
if message.group.is_none() {
state.settings.daemon.default_parallel_tasks = message.parallel_tasks;
return create_success_message("Parallel tasks setting adjusted");
}
@ -527,8 +524,8 @@ fn set_parallel_tasks(message: ParallelMessage, state: &SharedState) -> Message
return create_failure_message(format!("Failed while saving the config file: {}", error));
}
return create_success_message(format!(
create_success_message(format!(
"Parallel tasks setting for group {} adjusted",
group
));
))
}

View file

@ -1,4 +1,4 @@
use ::anyhow::{bail, Error, Result};
use ::anyhow::{bail, Result};
use ::simplelog::{Config, LevelFilter, SimpleLogger};
use ::std::fs::create_dir_all;
use ::std::path::Path;
@ -26,7 +26,6 @@ async fn main() -> Result<()> {
let settings = Settings::new()?;
match settings.save() {
Err(error) => {
let error: Error = From::from(error);
bail!(error.context("Failed saving the config file"));
}
Ok(()) => {}
@ -68,7 +67,7 @@ async fn main() -> Result<()> {
}
/// Initialize all directories needed for normal operation.
fn init_directories(path: &String) {
fn init_directories(path: &str) {
let pueue_dir = Path::new(path);
if !pueue_dir.exists() {
if let Err(error) = create_dir_all(&pueue_dir) {

View file

@ -47,7 +47,7 @@ async fn handle_incoming(
let payload_bytes = receive_bytes(&mut socket).await?;
// Didn't receive any bytes. The client disconnected.
if payload_bytes.len() == 0 {
if payload_bytes.is_empty() {
info!("Client went away");
return Ok(());
}

View file

@ -12,7 +12,7 @@ use ::pueue::protocol::send_message;
/// Handle the continuous stream of a message.
pub async fn handle_show(
pueue_directory: &String,
pueue_directory: &str,
socket: &mut TcpStream,
message: StreamRequestMessage,
) -> Result<Message> {

View file

@ -128,7 +128,7 @@ impl TaskHandler {
}
}
return state
state
.tasks
.iter()
.filter(|(_, task)| task.status == TaskStatus::Queued)
@ -161,15 +161,14 @@ impl TaskHandler {
running < &state.settings.daemon.default_parallel_tasks
}
})
.filter(|(_, task)| {
.find(|(_, task)| {
// Check whether all dependencies for this task are fulfilled.
task.dependencies
.iter()
.flat_map(|id| state.tasks.get(id))
.all(|task| task.status == TaskStatus::Done)
})
.next()
.map(|(id, _)| *id);
.map(|(id, _)| *id)
}
/// See if we can start a new queued task.
@ -424,9 +423,8 @@ impl TaskHandler {
//match self.receiver.recv_timeout(timeout) {
std::thread::sleep(timeout);
match self.receiver.try_recv() {
Ok(message) => self.handle_message(message),
Err(_) => {}
if let Ok(message) = self.receiver.try_recv() {
self.handle_message(message);
};
}

View file

@ -7,7 +7,7 @@ use ::std::io::prelude::*;
use ::std::path::{Path, PathBuf};
/// Return the paths to temporary stdout and stderr files for a task.
pub fn get_log_paths(task_id: usize, path: &String) -> (PathBuf, PathBuf) {
pub fn get_log_paths(task_id: usize, path: &str) -> (PathBuf, PathBuf) {
let pueue_dir = Path::new(path).join("task_logs");
let out_path = pueue_dir.join(format!("{}_stdout.log", task_id));
let err_path = pueue_dir.join(format!("{}_stderr.log", task_id));
@ -15,7 +15,7 @@ pub fn get_log_paths(task_id: usize, path: &String) -> (PathBuf, PathBuf) {
}
/// Create and return the file handle for temporary stdout and stderr files for a task.
pub fn create_log_file_handles(task_id: usize, path: &String) -> Result<(File, File)> {
pub fn create_log_file_handles(task_id: usize, path: &str) -> Result<(File, File)> {
let (out_path, err_path) = get_log_paths(task_id, path);
let stdout = File::create(out_path)?;
let stderr = File::create(err_path)?;
@ -24,7 +24,7 @@ pub fn create_log_file_handles(task_id: usize, path: &String) -> Result<(File, F
}
/// Return the file handle for temporary stdout and stderr files for a task.
pub fn get_log_file_handles(task_id: usize, path: &String) -> Result<(File, File)> {
pub fn get_log_file_handles(task_id: usize, path: &str) -> Result<(File, File)> {
let (out_path, err_path) = get_log_paths(task_id, path);
let stdout = File::open(out_path)?;
let stderr = File::open(err_path)?;
@ -33,7 +33,7 @@ pub fn get_log_file_handles(task_id: usize, path: &String) -> Result<(File, File
}
/// Return the content of temporary stdout and stderr files for a task.
pub fn read_log_files(task_id: usize, path: &String) -> Result<(String, String)> {
pub fn read_log_files(task_id: usize, path: &str) -> Result<(String, String)> {
let (mut stdout_handle, mut stderr_handle) = get_log_file_handles(task_id, path)?;
let mut stdout_buffer = Vec::new();
let mut stderr_buffer = Vec::new();
@ -48,7 +48,7 @@ pub fn read_log_files(task_id: usize, path: &String) -> Result<(String, String)>
}
/// Remove temporary stdout and stderr files for a task.
pub fn clean_log_handles(task_id: usize, path: &String) {
pub fn clean_log_handles(task_id: usize, path: &str) {
let (out_path, err_path) = get_log_paths(task_id, path);
if let Err(err) = remove_file(out_path) {
error!(
@ -66,7 +66,7 @@ pub fn clean_log_handles(task_id: usize, path: &String) {
/// Return stdout and stderr of a finished process.
/// Task output is compressed using snap to save some memory and bandwidth.
pub fn read_and_compress_log_files(task_id: usize, path: &String) -> Result<(Vec<u8>, Vec<u8>)> {
pub fn read_and_compress_log_files(task_id: usize, path: &str) -> Result<(Vec<u8>, Vec<u8>)> {
let (mut stdout_handle, mut stderr_handle) = match get_log_file_handles(task_id, path) {
Ok((stdout, stderr)) => (stdout, stderr),
Err(err) => {

View file

@ -30,8 +30,7 @@ pub async fn send_bytes(payload: Vec<u8>, socket: &mut TcpStream) -> Result<()>
socket.write_all(&header).await?;
// Split the payload into 1.5kbyte chunks (MUT for TCP)
let mut iter = payload.chunks(1500);
while let Some(chunk) = iter.next() {
for chunk in payload.chunks(1500) {
socket.write_all(chunk).await?;
}

View file

@ -74,7 +74,7 @@ impl Settings {
let config_path = default_config_path()?;
let config_dir = config_path
.parent()
.ok_or(anyhow!("Couldn't resolve config dir"))?;
.ok_or_else(|| anyhow!("Couldn't resolve config dir"))?;
// Create the config dir, if it doesn't exist yet
if !config_dir.exists() {
@ -106,7 +106,7 @@ fn parse_config(settings: &mut Config) -> Result<()> {
}
fn get_home_dir() -> Result<PathBuf> {
dirs::home_dir().ok_or(anyhow!("Couldn't resolve home dir"))
dirs::home_dir().ok_or_else(|| anyhow!("Couldn't resolve home dir"))
}
fn gen_random_secret() -> String {

View file

@ -37,7 +37,7 @@ impl State {
settings: settings.clone(),
running: true,
tasks: BTreeMap::new(),
groups: groups,
groups,
};
state.restore();
state.save();
@ -67,11 +67,11 @@ impl State {
/// Check if the given group already exists.
/// If it doesn't exist yet, create a state entry and a new settings entry.
pub fn create_group(&mut self, group: &String) -> Result<()> {
if let None = self.settings.daemon.groups.get(group) {
pub fn create_group(&mut self, group: &str) -> Result<()> {
if self.settings.daemon.groups.get(group).is_none() {
self.settings.daemon.groups.insert(group.into(), 1);
}
if let None = self.groups.get(group) {
if self.groups.get(group).is_none() {
self.groups.insert(group.into(), true);
}
@ -81,7 +81,7 @@ impl State {
/// Remove a group.
/// Also go through all tasks and set the removed group to `None`.
pub fn remove_group(&mut self, group: &String) -> Result<()> {
pub fn remove_group(&mut self, group: &str) -> Result<()> {
self.settings.daemon.groups.remove(group);
self.groups.remove(group);
@ -178,18 +178,14 @@ impl State {
let serialized = serialized.unwrap();
let path = Path::new(&self.settings.daemon.pueue_directory);
let temp: PathBuf;
let real: PathBuf;
if log {
let (temp, real) = if log {
let path = path.join("log");
let now: DateTime<Utc> = Utc::now();
let time = now.format("%Y-%m-%d_%H-%M-%S");
temp = path.join(format!("{}_backup.json.partial", time));
real = path.join(format!("{}_state.json", time));
(path.join(format!("{}_backup.json.partial", time)), path.join(format!("{}_state.json", time)))
} else {
temp = path.join("state.json.partial");
real = path.join("state.json");
}
(path.join("state.json.partial"), path.join("state.json"))
};
// Write to temporary log file first, to prevent loss due to crashes.
if let Err(error) = fs::write(&temp, serialized) {

View file

@ -68,7 +68,7 @@ impl Task {
id: 0,
command,
path,
group: group,
group,
enqueue_at,
dependencies,
status: starting_status.clone(),
@ -96,11 +96,11 @@ impl Task {
}
pub fn is_running(&self) -> bool {
return self.status == TaskStatus::Running || self.status == TaskStatus::Paused;
self.status == TaskStatus::Running || self.status == TaskStatus::Paused
}
pub fn is_done(&self) -> bool {
return self.status == TaskStatus::Done;
self.status == TaskStatus::Done
}
// Check if the task errored.
@ -114,6 +114,6 @@ impl Task {
}
pub fn is_queued(&self) -> bool {
return self.status == TaskStatus::Queued || self.status == TaskStatus::Stashed;
self.status == TaskStatus::Queued || self.status == TaskStatus::Stashed
}
}