Using brotli

This commit is contained in:
Arne Beer 2020-04-30 21:09:24 +02:00
parent 6a6a2f09d8
commit 01eadeb473
9 changed files with 193 additions and 50 deletions

View file

@ -1,4 +1,18 @@
# v0.3.0
# v0.4.0
**Features:**
- Add `--after [ids]` flag. Task with this flag will only start once all specified dependencies finished successfully
Tasks with failed dependencies will fail as well.
- New state `FailedToStart`. Used if the process cannot be started
- New state `DependencyFailed`. Used if any dependency of a task fails.
**Improvements:**
- Process log output is no longer stored in memory. This significantly reduced RAM usage for large log outputs.
- Process log output is compressed in-memory on read from disk. This leads to reduced bandwidth and RAM usage.
**Changes:**
- Pueue no longer stores log output in it's backup files
# v0.3.1
**Fixes:**
- Set `start` for processes. (Seems to have broken in 0.2.0)

51
Cargo.lock generated
View file

@ -8,6 +8,19 @@ dependencies = [
"memchr 2.3.3 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "alloc-no-stdlib"
version = "2.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "alloc-stdlib"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"alloc-no-stdlib 2.0.1 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "ansi_term"
version = "0.11.0"
@ -133,6 +146,11 @@ name = "base64"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "base64"
version = "0.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "bit-set"
version = "0.5.1"
@ -174,6 +192,25 @@ dependencies = [
"slab 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "brotli"
version = "3.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"alloc-no-stdlib 2.0.1 (registry+https://github.com/rust-lang/crates.io-index)",
"alloc-stdlib 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
"brotli-decompressor 2.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "brotli-decompressor"
version = "2.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"alloc-no-stdlib 2.0.1 (registry+https://github.com/rust-lang/crates.io-index)",
"alloc-stdlib 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "bytecount"
version = "0.4.0"
@ -820,6 +857,8 @@ version = "0.3.2-alpha.0"
dependencies = [
"anyhow 1.0.27 (registry+https://github.com/rust-lang/crates.io-index)",
"async-std 1.5.0 (registry+https://github.com/rust-lang/crates.io-index)",
"base64 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)",
"brotli 3.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
"byteorder 1.3.4 (registry+https://github.com/rust-lang/crates.io-index)",
"bytes 0.5.4 (registry+https://github.com/rust-lang/crates.io-index)",
"chrono 0.4.11 (registry+https://github.com/rust-lang/crates.io-index)",
@ -837,6 +876,7 @@ dependencies = [
"serde_json 1.0.48 (registry+https://github.com/rust-lang/crates.io-index)",
"serde_yaml 0.8.11 (registry+https://github.com/rust-lang/crates.io-index)",
"simplelog 0.7.4 (registry+https://github.com/rust-lang/crates.io-index)",
"snap 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
"structopt 0.3.12 (registry+https://github.com/rust-lang/crates.io-index)",
"strum 0.18.0 (registry+https://github.com/rust-lang/crates.io-index)",
"strum_macros 0.18.0 (registry+https://github.com/rust-lang/crates.io-index)",
@ -1249,6 +1289,11 @@ name = "smallvec"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "snap"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "static_assertions"
version = "0.3.4"
@ -1490,6 +1535,8 @@ dependencies = [
[metadata]
"checksum aho-corasick 0.7.10 (registry+https://github.com/rust-lang/crates.io-index)" = "8716408b8bc624ed7f65d223ddb9ac2d044c0547b6fa4b0d554f3a9540496ada"
"checksum alloc-no-stdlib 2.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "5192ec435945d87bc2f70992b4d818154b5feede43c09fb7592146374eac90a6"
"checksum alloc-stdlib 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "697ed7edc0f1711de49ce108c541623a0af97c6c60b2f6e2b65229847ac843c2"
"checksum ansi_term 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ee49baf6cb617b853aa8d93bf420db2383fab46d314482ca2803b40d5fde979b"
"checksum anyhow 1.0.27 (registry+https://github.com/rust-lang/crates.io-index)" = "013a6e0a2cbe3d20f9c60b65458f7a7f7a5e636c5d0f45a5a6aee5d4b1f01785"
"checksum arc-swap 0.4.5 (registry+https://github.com/rust-lang/crates.io-index)" = "d663a8e9a99154b5fb793032533f6328da35e23aac63d5c152279aa8ba356825"
@ -1505,11 +1552,14 @@ dependencies = [
"checksum backtrace 0.3.46 (registry+https://github.com/rust-lang/crates.io-index)" = "b1e692897359247cc6bb902933361652380af0f1b7651ae5c5013407f30e109e"
"checksum backtrace-sys 0.1.35 (registry+https://github.com/rust-lang/crates.io-index)" = "7de8aba10a69c8e8d7622c5710229485ec32e9d55fdad160ea559c086fdcd118"
"checksum base64 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)" = "b41b7ea54a0c9d92199de89e20e58d49f02f8e699814ef3fdf266f6f748d15c7"
"checksum base64 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)" = "7d5ca2cd0adc3f48f9e9ea5a6bbdf9ccc0bfade884847e484d452414c7ccffb3"
"checksum bit-set 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "e84c238982c4b1e1ee668d136c510c67a13465279c0cb367ea6baf6310620a80"
"checksum bit-vec 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "f59bbe95d4e52a6398ec21238d31577f2b28a9d86807f06ca59d191d8440d0bb"
"checksum bitflags 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693"
"checksum blake2b_simd 0.5.10 (registry+https://github.com/rust-lang/crates.io-index)" = "d8fb2d74254a3a0b5cac33ac9f8ed0e44aa50378d9dbb2e5d83bd21ed1dc2c8a"
"checksum broadcaster 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "d9c972e21e0d055a36cf73e4daae870941fe7a8abcd5ac3396aab9e4c126bd87"
"checksum brotli 3.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "7f29919120f08613aadcd4383764e00526fc9f18b6c0895814faeed0dd78613e"
"checksum brotli-decompressor 2.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "2a9f2b517b96b19d8f91c1ff5b1cf498e688850b32eae5d58e02d15c4d4fdc0c"
"checksum bytecount 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "b92204551573580e078dc80017f36a213eb77a0450e4ddd8cfa0f3f2d1f0178f"
"checksum byteorder 1.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "08c48aae112d48ed9f069b33538ea9e3e90aa263cfa3d1c24309612b1f7472de"
"checksum bytes 0.5.4 (registry+https://github.com/rust-lang/crates.io-index)" = "130aac562c0dd69c56b3b1cc8ffd2e17be31d0b6c25b61c96b76231aa23e39e1"
@ -1634,6 +1684,7 @@ dependencies = [
"checksum skeptic 0.13.4 (registry+https://github.com/rust-lang/crates.io-index)" = "d6fb8ed853fdc19ce09752d63f3a2e5b5158aeb261520cd75eb618bd60305165"
"checksum slab 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "c111b5bd5695e56cffe5129854aa230b39c93a305372fdbb2668ca2394eea9f8"
"checksum smallvec 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "5c2fb2ec9bcd216a5b0d0ccf31ab17b5ed1d627960edff65bbe95d3ce221cefc"
"checksum snap 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "f7fb9b0bb877b35a1cc1474a3b43d9c226a2625311760cdda2cbccbc0c7a8376"
"checksum static_assertions 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "7f3eb36b47e512f8f1c9e3d10c2c1965bc992bd9cdb024fa581e2194501c83d3"
"checksum strsim 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)" = "8ea5119cdb4c55b55d432abb513a0429384878c15dde60cc77b1c99de1a95a6a"
"checksum structopt 0.3.12 (registry+https://github.com/rust-lang/crates.io-index)" = "c8faa2719539bbe9d77869bfb15d4ee769f99525e707931452c97b693b3f159d"

View file

@ -37,7 +37,10 @@ strum = "^0.18"
strum_macros = "^0.18"
bytes = "^0.5"
base64 = "^0.12"
byteorder = "^1"
brotli = "^3.3"
snap = "1"
serde = "^1.0"
serde_json = "^1.0"
serde_yaml = "^0.8"

View file

@ -1,11 +1,16 @@
use ::anyhow::Result;
use ::base64::read::DecoderReader;
use ::brotli::BrotliDecompress;
use ::comfy_table::presets::UTF8_HORIZONTAL_BORDERS_ONLY;
use ::comfy_table::*;
use ::crossterm::style::style;
use ::std::collections::BTreeMap;
use ::std::io;
use ::std::string::ToString;
use ::pueue::message::TaskLogMessage;
use ::pueue::state::State;
use ::pueue::task::{TaskLog, TaskResult, TaskStatus};
use ::pueue::task::{TaskResult, TaskStatus};
use crate::cli::SubCommand;
@ -162,7 +167,7 @@ pub fn print_state(state: State, cli_command: &SubCommand) {
/// Print the log ouput of finished tasks.
/// Either print the logs of every task
/// or only print the logs of the specified tasks.
pub fn print_logs(task_logs: BTreeMap<usize, TaskLog>, cli_command: &SubCommand) {
pub fn print_logs(mut task_logs: BTreeMap<usize, TaskLogMessage>, cli_command: &SubCommand) {
let (json, task_ids) = match cli_command {
SubCommand::Log { json, task_ids } => (*json, task_ids.clone()),
_ => panic!(
@ -185,9 +190,9 @@ pub fn print_logs(task_logs: BTreeMap<usize, TaskLog>, cli_command: &SubCommand)
return;
}
let mut task_iter = task_logs.iter().peekable();
while let Some((_, task_log)) = task_iter.next() {
print_log(task_log);
let mut task_iter = task_logs.iter_mut().peekable();
while let Some((_, mut task_log)) = task_iter.next() {
print_log(&mut task_log);
// Add a newline if there is another task that's going to be printed
if let Some((_, task_log)) = task_iter.peek() {
@ -199,7 +204,7 @@ pub fn print_logs(task_logs: BTreeMap<usize, TaskLog>, cli_command: &SubCommand)
}
/// Print the log of a single task.
pub fn print_log(task_log: &TaskLog) {
pub fn print_log(task_log: &mut TaskLogMessage) {
let task = &task_log.task;
// We only show logs of finished tasks
if task.status != TaskStatus::Done {
@ -236,22 +241,56 @@ pub fn print_log(task_log: &TaskLog) {
}
if !task_log.stdout.is_empty() {
println!(
"{}",
style("Std_out: ")
.with(Color::Green)
.attribute(Attribute::Bold)
);
println!("{}", task_log.stdout);
if let Err(err) = print_task_stdout(task_log) {
println!("Error while parsing stdout: {}", err);
}
}
if !task_log.stderr.is_empty() {
println!(
"{}",
style("Std_err: ")
.with(Color::Red)
.attribute(Attribute::Bold)
);
println!("{}", task_log.stderr);
if let Err(err) = print_task_stderr(task_log) {
println!("Error while parsing stderr: {}", err);
};
}
}
/// Pritn the stdout of a finished process
/// The logs are compressed using Brotli and then encoded to Base64
pub fn print_task_stdout(task_log: &mut TaskLogMessage) -> Result<()> {
let mut bytes = task_log.stdout.as_bytes();
// Minimum empty base64 encoded message length
if bytes.len() <= 4 {
return Ok(());
}
println!(
"{}",
style("Std_out: ")
.with(Color::Green)
.attribute(Attribute::Bold)
);
let mut stderr_base64_decoder = DecoderReader::new(&mut bytes, base64::STANDARD);
BrotliDecompress(&mut stderr_base64_decoder, &mut io::stdout())?;
Ok(())
}
/// Print the stderr of a finished process
/// The logs are compressed using Brotli and then encoded to Base64
pub fn print_task_stderr(task_log: &mut TaskLogMessage) -> Result<()> {
let mut bytes = task_log.stderr.as_bytes();
// Minimum empty base64 encoded message length
if bytes.len() <= 4 {
return Ok(());
}
println!(
"{}",
style("Std_err: ")
.with(Color::Red)
.attribute(Attribute::Bold)
);
let mut stderr_base64_decoder = DecoderReader::new(&mut bytes, base64::STANDARD);
BrotliDecompress(&mut stderr_base64_decoder, &mut io::stdout())?;
Ok(())
}

View file

@ -1,11 +1,11 @@
use ::std::collections::BTreeMap;
use ::std::sync::mpsc::Sender;
use crate::log::{clean_log_handles, read_log_files};
use crate::log::{clean_log_handles, read_log_files_to_compressed_base64};
use crate::response_helper::*;
use ::pueue::message::*;
use ::pueue::state::SharedState;
use ::pueue::task::{Task, TaskStatus, TaskLog};
use ::pueue::task::{Task, TaskStatus};
static SENDER_ERR: &str = "Failed to send message to task handler thread";
@ -27,7 +27,7 @@ pub fn handle_message(message: Message, sender: &Sender<Message>, state: &Shared
Message::Edit(message) => edit(message, state),
Message::Clean => clean(state),
Message::Reset => reset(sender),
Message::Reset => reset(sender, state),
Message::Status => get_status(state),
Message::Log(task_ids) => get_log(task_ids, state),
Message::Parallel(amount) => set_parallel_tasks(amount, sender),
@ -364,8 +364,9 @@ fn clean(state: &SharedState) -> Message {
/// Forward the reset request to the task handler
/// The handler then kills all children and clears the task queue
fn reset(sender: &Sender<Message>) -> Message {
fn reset(sender: &Sender<Message>, state: &SharedState) -> Message {
sender.send(Message::Reset).expect(SENDER_ERR);
clean(state);
return create_success_message("Everything is being reset right now.");
}
@ -389,11 +390,18 @@ fn get_log(mut task_ids: Vec<usize>, state: &SharedState) -> Message {
for task_id in task_ids.iter() {
match state.tasks.get(task_id) {
Some(task) => {
let (stdout, stderr) = match read_log_files(*task_id, &state.settings) {
Ok((stdout, stderr)) => (stdout, stderr),
Err(err) => (String::new(), format!("Failed reading process output file: {:?}", err)),
};
let task_log = TaskLog {
// We send log output and the task at the same time
// This isn't as efficient as sending the raw compressed data directly,
// but it's a lot more convenient for now.
let (stdout, stderr) =
match read_log_files_to_compressed_base64(*task_id, &state.settings) {
Ok((stdout, stderr)) => (stdout, stderr),
Err(err) => (
String::new(),
format!("Failed reading process output file: {:?}", err),
),
};
let task_log = TaskLogMessage {
task: task.clone(),
stdout,
stderr,

View file

@ -1,4 +1,6 @@
use ::anyhow::Result;
use ::base64::write::EncoderWriter;
use ::brotli::enc::{BrotliCompress, BrotliEncoderParams};
use ::log::error;
use ::std::fs::{remove_file, File};
use ::std::io::prelude::*;
@ -63,3 +65,36 @@ pub fn clean_log_handles(task_id: usize, settings: &Settings) {
);
};
}
/// Return stdout and stderr of a finished process
/// Everything is compressed using Brotli and then encoded to Base64
pub fn read_log_files_to_compressed_base64(
task_id: usize,
settings: &Settings,
) -> Result<(String, String)> {
let (mut stdout_handle, mut stderr_handle) = match get_log_file_handles(task_id, settings) {
Ok((stdout, stderr)) => (stdout, stderr),
Err(err) => {
return Ok((String::new(), format!("Error while opening the output files: {}", err)));
},
};
let stdout_len = stdout_handle.metadata()?.len();
let stderr_len = stdout_handle.metadata()?.len();
let mut stdout = Vec::new();
let mut stderr = Vec::new();
{
// Base64 encode for easier handling of compressed bytes
let mut stdout_base64 = EncoderWriter::new(&mut stdout, base64::STANDARD);
let mut stderr_base64 = EncoderWriter::new(&mut stderr, base64::STANDARD);
// Compress log input and pipe it into the base64 encoder
let mut params = BrotliEncoderParams::default();
params.quality = 4;
params.quality = 4;
BrotliCompress(&mut stdout_handle, &mut stdout_base64, &params)?;
BrotliCompress(&mut stderr_handle, &mut stderr_base64, &params)?;
}
Ok((String::from_utf8(stdout)?, String::from_utf8(stderr)?))
}

View file

@ -65,19 +65,14 @@ pub async fn handle_show(
} else {
// The client requested a one-shot execution
// Simply read the file and send the current stdout/stderr once
let stdout: String;
let stderr: String;
match read_log_files(message.task_id, &settings) {
let (stdout, stderr) = match read_log_files(message.task_id, &settings) {
Err(_) => {
return Ok(create_failure_message(
"Couldn't find output files for task. Maybe it finished? Try `log`",
))
}
Ok((stdout_text, stderr_text)) => {
stdout = stdout_text;
stderr = stderr_text;
}
}
Ok((stdout, stderr)) => (stdout, stderr),
};
let response = format!("Stdout:\n{}\n\nStderr:\n{}", stdout, stderr);

View file

@ -3,7 +3,7 @@ use ::serde_derive::{Deserialize, Serialize};
use ::std::collections::BTreeMap;
use crate::state::State;
use crate::task::TaskLog;
use crate::task::Task;
/// The Message used to add a new command to the daemon.
#[derive(Serialize, Deserialize, Debug, Clone)]
@ -27,7 +27,7 @@ pub enum Message {
Status,
StatusResponse(State),
Log(Vec<usize>),
LogResponse(BTreeMap<usize, TaskLog>),
LogResponse(BTreeMap<usize, TaskLogMessage>),
Stream(String),
StreamRequest(StreamRequestMessage),
Reset,
@ -109,6 +109,14 @@ pub struct StreamRequestMessage {
pub err: bool,
}
/// Helper struct for sending tasks and their log output to the client
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct TaskLogMessage {
pub task: Task,
pub stdout: String,
pub stderr: String,
}
pub fn create_success_message<T: ToString>(text: T) -> Message {
Message::Success(text.to_string())
}

View file

@ -113,13 +113,3 @@ impl Task {
return self.status == TaskStatus::Queued || self.status == TaskStatus::Stashed;
}
}
/// Helper struct for sending tasks and their log output to the client
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct TaskLog {
pub task: Task,
pub stdout: String,
pub stderr: String,
}