mirror of
https://github.com/nukesor/pueue
synced 2024-10-01 13:34:07 +00:00
change: Improve last lines logic
The logic for reading and streaming the last lines of a file has been improved. This is mostly relevant when reading a lot of lines in the daemon, which are then streamed to the client.
This commit is contained in:
parent
8ebb2d0ee1
commit
2a8e94eec4
|
@ -16,6 +16,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
|
|||
|
||||
### Changed
|
||||
|
||||
- Improved memory footprint for reading partial remote logs.
|
||||
- The configuration for groups can no longer be done via configuration file.
|
||||
This means that groups can only be edited, created or deleted via the command-line interface.
|
||||
- **Breaking changes:** The amount of parallel tasks will be reset to `1` for all groups.
|
||||
|
|
11
Cargo.lock
generated
11
Cargo.lock
generated
|
@ -1013,7 +1013,7 @@ dependencies = [
|
|||
"pretty_assertions",
|
||||
"rand 0.8.4",
|
||||
"rcgen",
|
||||
"rev_lines",
|
||||
"rev_buf_reader",
|
||||
"rustls",
|
||||
"rustls-pemfile",
|
||||
"serde",
|
||||
|
@ -1186,10 +1186,13 @@ dependencies = [
|
|||
]
|
||||
|
||||
[[package]]
|
||||
name = "rev_lines"
|
||||
version = "0.2.1"
|
||||
name = "rev_buf_reader"
|
||||
version = "0.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "18eb52b6664d331053136fcac7e4883bdc6f5fc04a6aab3b0f75eafb80ab88b3"
|
||||
checksum = "5be7eaf8415a214da18dcf43c4db1c70394f68d77a0c6a20f33e95e7eba4f741"
|
||||
dependencies = [
|
||||
"memchr",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "ring"
|
||||
|
|
|
@ -28,7 +28,7 @@ tokio = { version="1", features=["macros", "net", "rt-multi-thread", "io-util"]
|
|||
tokio-rustls = "0.23"
|
||||
rustls = "0.20"
|
||||
rustls-pemfile = "0.2"
|
||||
rev_lines = "0.2"
|
||||
rev_buf_reader = "0.2"
|
||||
rcgen = "0.8"
|
||||
byteorder = "1"
|
||||
snap = "1"
|
||||
|
|
112
lib/src/log.rs
112
lib/src/log.rs
|
@ -1,8 +1,9 @@
|
|||
use std::fs::{read_dir, remove_file, File};
|
||||
use std::io::{self, BufReader, Cursor};
|
||||
use std::io::{self, prelude::*, Read, SeekFrom};
|
||||
use std::path::{Path, PathBuf};
|
||||
|
||||
use log::error;
|
||||
use rev_buf_reader::RevBufReader;
|
||||
use snap::write::FrameEncoder;
|
||||
|
||||
use crate::error::Error;
|
||||
|
@ -66,20 +67,14 @@ pub fn read_and_compress_log_files(
|
|||
let mut stdout = Vec::new();
|
||||
let mut stderr = Vec::new();
|
||||
|
||||
// Move the cursor to the last few lines of both files.
|
||||
if let Some(lines) = lines {
|
||||
// Get the last few lines of both files
|
||||
let stdout_bytes = read_last_lines(&mut stdout_file, lines).into_bytes();
|
||||
let stderr_bytes = read_last_lines(&mut stderr_file, lines).into_bytes();
|
||||
let mut stdout_cursor = Cursor::new(stdout_bytes);
|
||||
let mut stderr_cursor = Cursor::new(stderr_bytes);
|
||||
seek_to_last_lines(&mut stdout_file, lines)?;
|
||||
seek_to_last_lines(&mut stderr_file, lines)?;
|
||||
}
|
||||
|
||||
// Compress the partial log input and pipe it into the snappy compressor
|
||||
let mut stdout_compressor = FrameEncoder::new(&mut stdout);
|
||||
io::copy(&mut stdout_cursor, &mut stdout_compressor)?;
|
||||
let mut stderr_compressor = FrameEncoder::new(&mut stderr);
|
||||
io::copy(&mut stderr_cursor, &mut stderr_compressor)?;
|
||||
} else {
|
||||
// Compress the full log input and pipe it into the snappy compressor
|
||||
// Compress the full log input and pipe it into the snappy compressor
|
||||
{
|
||||
let mut stdout_compressor = FrameEncoder::new(&mut stdout);
|
||||
io::copy(&mut stdout_file, &mut stdout_compressor)?;
|
||||
let mut stderr_compressor = FrameEncoder::new(&mut stderr);
|
||||
|
@ -131,20 +126,83 @@ pub fn reset_task_log_directory(path: &Path) -> Result<(), Error> {
|
|||
|
||||
/// Read the last `amount` lines of a file to a string.
|
||||
///
|
||||
/// TODO: This is super imperformant, but works as long as we don't use the last
|
||||
/// 1000 lines. It would be cleaner to seek to the beginning of the requested
|
||||
/// position and simply stream the content.
|
||||
/// Only use this for logic that doesn't stream from daemon to client!
|
||||
/// For streaming logic use the `seek_to_last_lines` and compress any data.
|
||||
// We allow this clippy check.
|
||||
// The iterators cannot be chained, as RevBufReader.lines doesn't implement the necessary traits.
|
||||
#[allow(clippy::needless_collect)]
|
||||
pub fn read_last_lines(file: &mut File, amount: usize) -> String {
|
||||
// Read the lines from the file.
|
||||
// Return a debug string if it fails.
|
||||
let last_lines: Vec<String> = match rev_lines::RevLines::new(BufReader::new(file)) {
|
||||
Ok(lines) => lines.take(amount).collect(),
|
||||
Err(error) => return format!("(Pueue error) Failed to read last lines of file: {}", error),
|
||||
};
|
||||
let reader = RevBufReader::new(file);
|
||||
|
||||
last_lines
|
||||
.into_iter()
|
||||
.rev()
|
||||
.collect::<Vec<String>>()
|
||||
.join("\n")
|
||||
let lines: Vec<String> = reader
|
||||
.lines()
|
||||
.take(amount)
|
||||
.map(|line| line.unwrap_or_else(|_| "Failed to read line.".to_string()))
|
||||
.collect();
|
||||
|
||||
lines.into_iter().rev().collect::<Vec<String>>().join("\n")
|
||||
}
|
||||
|
||||
/// Seek the cursor of the current file to the beginning of the line that's located `amount` newlines
|
||||
/// from the back of the file.
|
||||
pub fn seek_to_last_lines(file: &mut File, amount: usize) -> Result<(), Error> {
|
||||
let mut reader = RevBufReader::new(file);
|
||||
// The position from which the RevBufReader starts reading.
|
||||
// The file size might change while we're reading the file. Hence we have to save it now.
|
||||
let start_position = reader.get_mut().seek(SeekFrom::Current(0))?;
|
||||
let start_position: i64 = start_position.try_into().map_err(|_| {
|
||||
Error::Generic("Failed to convert start cursor position to i64".to_string())
|
||||
})?;
|
||||
|
||||
let mut total_read_bytes: i64 = 0;
|
||||
let mut found_lines = 0;
|
||||
|
||||
// Read in 4KB chunks until there's either nothing left or we found `amount` newline characters.
|
||||
'outer: loop {
|
||||
let mut buffer = vec![0; 4096];
|
||||
let read_bytes = reader.read(&mut buffer)?;
|
||||
|
||||
// Return if there's nothing left to read.
|
||||
// We hit the start of the file and read fewer lines then specified.
|
||||
if read_bytes == 0 {
|
||||
break;
|
||||
}
|
||||
|
||||
// Check each byte for a newline.
|
||||
// Even though the RevBufReader reads from behind, the bytes in the buffer are still in forward
|
||||
// order. Since we want to scan from the back, we have to reverse the buffer
|
||||
for byte in buffer[0..read_bytes].iter().rev() {
|
||||
total_read_bytes += 1;
|
||||
if *byte != b'\n' {
|
||||
continue;
|
||||
}
|
||||
|
||||
// We found a newline.
|
||||
found_lines += 1;
|
||||
|
||||
// We haven't visited the requested amount of lines yet.
|
||||
if found_lines != amount + 1 {
|
||||
continue;
|
||||
}
|
||||
|
||||
// The RevBufReader most likely already went past this point.
|
||||
// That's why we have to set the cursor to the position of the last newline.
|
||||
// Calculate the distance from the start to the desired location.
|
||||
let distance_to_file_start = start_position - total_read_bytes + 1;
|
||||
// Cast it to u64. If it somehow became negative, just seek to the start of the
|
||||
// file.
|
||||
let distance_to_file_start: u64 = distance_to_file_start.try_into().unwrap_or(0);
|
||||
|
||||
// We can safely unwrap `start_position`, as we previously casted it from an u64.
|
||||
if distance_to_file_start < start_position.try_into().unwrap() {
|
||||
// Seek to the position.
|
||||
let file = reader.get_mut();
|
||||
file.seek(SeekFrom::Start(distance_to_file_start))?;
|
||||
}
|
||||
|
||||
break 'outer;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue