du: start printing output immediately (#5552)

* du: very rough draft of continuously printing output

* du: clean up printing logic, still needs some polishing

* du: gracefully handle case where `du` returns no `Stat`s

* du: print output using separate thread

* du: clean up print thread implementation

* du: send ownership of `Stat`s to printing thread as soon as `du` is done with them

* du: add basic error handling for communication between threads, use `StatPrinter` to handle printing thread logic

* du: move printing grand total into `StatPrinter`, and move initialization of printing-related variables into `StatPrinter::new`

* du: clean up calculation of `convert_size` function, and separate printing a single stat out into its own method in `StatPrinter`

* du: have printing thread handle printing IO-related errors, to ensure error messages and regular output messages are written one at a time

* du: add comment explaining print thread, remove outdated comments and clippy allows

* du: restore clippy allows for cognitive complexity

---------

Co-authored-by: clint <cteece3@gatech.edu>
This commit is contained in:
Clint Teece 2023-11-25 09:13:12 -05:00 committed by GitHub
parent 64d47730bc
commit 84b5e6f0a1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -16,8 +16,6 @@ use std::fs::File;
use std::fs::Metadata;
use std::io::BufRead;
use std::io::BufReader;
use std::io::Result;
use std::iter;
#[cfg(not(windows))]
use std::os::unix::fs::MetadataExt;
#[cfg(windows)]
@ -27,15 +25,17 @@ use std::os::windows::io::AsRawHandle;
use std::path::Path;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::mpsc;
use std::thread;
use std::time::{Duration, UNIX_EPOCH};
use std::{error::Error, fmt::Display};
use uucore::display::{print_verbatim, Quotable};
use uucore::error::FromIo;
use uucore::error::{set_exit_code, UError, UResult, USimpleError};
use uucore::error::{UError, UResult, USimpleError};
use uucore::line_ending::LineEnding;
use uucore::parse_glob;
use uucore::parse_size::{parse_size_u64, ParseSizeError};
use uucore::{format_usage, help_about, help_section, help_usage, show, show_error, show_warning};
use uucore::{format_usage, help_about, help_section, help_usage, show, show_warning};
#[cfg(windows)]
use windows_sys::Win32::Foundation::HANDLE;
#[cfg(windows)]
@ -81,6 +81,7 @@ const USAGE: &str = help_usage!("du.md");
// TODO: Support Z & Y (currently limited by size of u64)
const UNITS: [(char, u32); 6] = [('E', 6), ('P', 5), ('T', 4), ('G', 3), ('M', 2), ('K', 1)];
#[derive(Clone)]
struct Options {
all: bool,
max_depth: Option<usize>,
@ -93,7 +94,7 @@ struct Options {
verbose: bool,
}
#[derive(PartialEq)]
#[derive(PartialEq, Clone)]
enum Deref {
All,
Args(Vec<PathBuf>),
@ -119,7 +120,7 @@ struct Stat {
}
impl Stat {
fn new(path: &Path, options: &Options) -> Result<Self> {
fn new(path: &Path, options: &Options) -> std::io::Result<Self> {
// Determine whether to dereference (follow) the symbolic link
let should_dereference = match &options.dereference {
Deref::All => true,
@ -290,7 +291,6 @@ fn choose_size(matches: &ArgMatches, stat: &Stat) -> u64 {
}
// this takes `my_stat` to avoid having to stat files multiple times.
// XXX: this should use the impl Trait return type when it is stabilized
#[allow(clippy::cognitive_complexity)]
fn du(
mut my_stat: Stat,
@ -298,18 +298,16 @@ fn du(
depth: usize,
seen_inodes: &mut HashSet<FileInfo>,
exclude: &[Pattern],
) -> Box<dyn DoubleEndedIterator<Item = Stat>> {
let mut stats = vec![];
let mut futures = vec![];
print_tx: &mpsc::Sender<UResult<StatPrintInfo>>,
) -> Result<Stat, Box<mpsc::SendError<UResult<StatPrintInfo>>>> {
if my_stat.is_dir {
let read = match fs::read_dir(&my_stat.path) {
Ok(read) => read,
Err(e) => {
show!(
e.map_err_context(|| format!("cannot read directory {}", my_stat.path.quote()))
);
return Box::new(iter::once(my_stat));
print_tx.send(Err(e.map_err_context(|| {
format!("cannot read directory {}", my_stat.path.quote())
})))?;
return Ok(my_stat);
}
};
@ -354,44 +352,48 @@ fn du(
}
}
}
futures.push(du(
let this_stat = du(
this_stat,
options,
depth + 1,
seen_inodes,
exclude,
));
print_tx,
)?;
if !options.separate_dirs {
my_stat.size += this_stat.size;
my_stat.blocks += this_stat.blocks;
my_stat.inodes += this_stat.inodes;
}
print_tx.send(Ok(StatPrintInfo {
stat: this_stat,
depth: depth + 1,
}))?;
} else {
my_stat.size += this_stat.size;
my_stat.blocks += this_stat.blocks;
my_stat.inodes += 1;
if options.all {
stats.push(this_stat);
print_tx.send(Ok(StatPrintInfo {
stat: this_stat,
depth: depth + 1,
}))?;
}
}
}
Err(e) => show!(
e.map_err_context(|| format!("cannot access {}", entry.path().quote()))
),
Err(e) => print_tx.send(Err(e.map_err_context(|| {
format!("cannot access {}", entry.path().quote())
})))?,
}
}
Err(error) => show_error!("{}", error),
Err(error) => print_tx.send(Err(error.into()))?,
}
}
}
stats.extend(futures.into_iter().flatten().filter(|stat| {
if !options.separate_dirs && stat.path.parent().unwrap() == my_stat.path {
my_stat.size += stat.size;
my_stat.blocks += stat.blocks;
my_stat.inodes += stat.inodes;
}
options
.max_depth
.map_or(true, |max_depth| depth < max_depth)
}));
stats.push(my_stat);
Box::new(stats.into_iter())
Ok(my_stat)
}
fn convert_size_human(size: u64, multiplier: u64, _block_size: u64) -> String {
@ -426,7 +428,7 @@ fn convert_size_other(size: u64, _multiplier: u64, block_size: u64) -> String {
format!("{}", ((size as f64) / (block_size as f64)).ceil())
}
fn get_convert_size_fn(matches: &ArgMatches) -> Box<dyn Fn(u64, u64, u64) -> String> {
fn get_convert_size_fn(matches: &ArgMatches) -> Box<dyn Fn(u64, u64, u64) -> String + Send> {
if matches.get_flag(options::HUMAN_READABLE) || matches.get_flag(options::SI) {
Box::new(convert_size_human)
} else if matches.get_flag(options::BYTES) {
@ -532,6 +534,137 @@ fn build_exclude_patterns(matches: &ArgMatches) -> UResult<Vec<Pattern>> {
Ok(exclude_patterns)
}
/// Message payload sent over the print channel: one finished `Stat` together
/// with the depth at which it was produced.
struct StatPrintInfo {
/// The entry whose size/path should be reported.
stat: Stat,
/// Nesting level of the entry: 0 for a path given on the command line,
/// `depth + 1` for entries found while walking a directory at `depth`.
depth: usize,
}
/// State owned by the printing thread: everything needed to decide whether a
/// received `Stat` is printed and how its line is formatted.
struct StatPrinter {
/// Parsed command-line arguments; consulted for size selection and the
/// `TIME` option when printing.
matches: ArgMatches,
/// Optional size threshold; entries for which `should_exclude` is true are
/// not printed.
threshold: Option<Threshold>,
/// When true, only depth-0 entries are printed.
summarize: bool,
/// Format string handed to `DateTime::format` for the time column.
time_format_str: String,
/// Terminator written after every output line (derived from the `NULL` flag).
line_ending: LineEnding,
/// Traversal options; `max_depth` and `total` are read while printing.
options: Options,
/// Renders a raw size (or inode count) as the text of the size column.
convert_size: Box<dyn Fn(u64) -> String + Send>,
}
impl StatPrinter {
fn new(matches: ArgMatches, options: Options, summarize: bool) -> UResult<Self> {
let block_size = read_block_size(
matches
.get_one::<String>(options::BLOCK_SIZE)
.map(|s| s.as_str()),
)?;
let multiplier: u64 = if matches.get_flag(options::SI) {
1000
} else {
1024
};
let convert_size_fn = get_convert_size_fn(&matches);
let convert_size: Box<dyn Fn(u64) -> String + Send> = if options.inodes {
Box::new(|size: u64| size.to_string())
} else {
Box::new(move |size: u64| convert_size_fn(size, multiplier, block_size))
};
let threshold = match matches.get_one::<String>(options::THRESHOLD) {
Some(s) => match Threshold::from_str(s) {
Ok(t) => Some(t),
Err(e) => {
return Err(USimpleError::new(
1,
format_error_message(&e, s, options::THRESHOLD),
))
}
},
None => None,
};
let time_format_str =
parse_time_style(matches.get_one::<String>("time-style").map(|s| s.as_str()))?
.to_string();
let line_ending = LineEnding::from_zero_flag(matches.get_flag(options::NULL));
Ok(Self {
matches,
threshold,
summarize,
time_format_str,
line_ending,
options,
convert_size,
})
}
fn print_stats(&self, rx: &mpsc::Receiver<UResult<StatPrintInfo>>) -> UResult<()> {
let mut grand_total = 0;
loop {
let received = rx.recv();
match received {
Ok(message) => match message {
Ok(stat_info) => {
let size = choose_size(&self.matches, &stat_info.stat);
if stat_info.depth == 0 {
grand_total += size;
}
if !self
.threshold
.map_or(false, |threshold| threshold.should_exclude(size))
&& self
.options
.max_depth
.map_or(true, |max_depth| stat_info.depth <= max_depth)
&& (!self.summarize || stat_info.depth == 0)
{
self.print_stat(&stat_info.stat, size)?;
}
}
Err(e) => show!(e),
},
Err(_) => break,
}
}
if self.options.total {
print!("{}\ttotal", (self.convert_size)(grand_total));
print!("{}", self.line_ending);
}
Ok(())
}
fn print_stat(&self, stat: &Stat, size: u64) -> UResult<()> {
if self.matches.contains_id(options::TIME) {
let tm = {
let secs = self
.matches
.get_one::<String>(options::TIME)
.map(|s| get_time_secs(s, stat))
.transpose()?
.unwrap_or(stat.modified);
DateTime::<Local>::from(UNIX_EPOCH + Duration::from_secs(secs))
};
let time_str = tm.format(&self.time_format_str).to_string();
print!("{}\t{}\t", (self.convert_size)(size), time_str);
} else {
print!("{}\t", (self.convert_size)(size));
}
print_verbatim(&stat.path).unwrap();
print!("{}", self.line_ending);
Ok(())
}
}
#[uucore::main]
#[allow(clippy::cognitive_complexity)]
pub fn uumain(args: impl uucore::Args) -> UResult<()> {
@ -582,49 +715,13 @@ pub fn uumain(args: impl uucore::Args) -> UResult<()> {
show_warning!("options --apparent-size and -b are ineffective with --inodes");
}
let block_size = read_block_size(
matches
.get_one::<String>(options::BLOCK_SIZE)
.map(|s| s.as_str()),
)?;
let threshold = match matches.get_one::<String>(options::THRESHOLD) {
Some(s) => match Threshold::from_str(s) {
Ok(t) => Some(t),
Err(e) => {
return Err(USimpleError::new(
1,
format_error_message(&e, s, options::THRESHOLD),
))
}
},
None => None,
};
let multiplier: u64 = if matches.get_flag(options::SI) {
1000
} else {
1024
};
let convert_size_fn = get_convert_size_fn(&matches);
let convert_size = |size: u64| {
if options.inodes {
size.to_string()
} else {
convert_size_fn(size, multiplier, block_size)
}
};
let time_format_str =
parse_time_style(matches.get_one::<String>("time-style").map(|s| s.as_str()))?;
let line_ending = LineEnding::from_zero_flag(matches.get_flag(options::NULL));
// Use separate thread to print output, so we can print finished results while computation is still running
let stat_printer = StatPrinter::new(matches.clone(), options.clone(), summarize)?;
let (print_tx, rx) = mpsc::channel::<UResult<StatPrintInfo>>();
let printing_thread = thread::spawn(move || stat_printer.print_stats(&rx));
let excludes = build_exclude_patterns(&matches)?;
let mut grand_total = 0;
'loop_file: for path in files {
// Skip if we don't want to ignore anything
if !&excludes.is_empty() {
@ -647,63 +744,35 @@ pub fn uumain(args: impl uucore::Args) -> UResult<()> {
if let Some(inode) = stat.inode {
seen_inodes.insert(inode);
}
let iter = du(stat, &options, 0, &mut seen_inodes, &excludes);
let stat = du(stat, &options, 0, &mut seen_inodes, &excludes, &print_tx)
.map_err(|e| USimpleError::new(1, e.to_string()))?;
// Sum up all the returned `Stat`s and display results
let (_, len) = iter.size_hint();
let len = len.unwrap();
for (index, stat) in iter.enumerate() {
let size = choose_size(&matches, &stat);
if threshold.map_or(false, |threshold| threshold.should_exclude(size)) {
continue;
}
if matches.contains_id(options::TIME) {
let tm = {
let secs = matches
.get_one::<String>(options::TIME)
.map(|s| get_time_secs(s, &stat))
.transpose()?
.unwrap_or(stat.modified);
DateTime::<Local>::from(UNIX_EPOCH + Duration::from_secs(secs))
};
if !summarize || index == len - 1 {
let time_str = tm.format(time_format_str).to_string();
print!("{}\t{}\t", convert_size(size), time_str);
print_verbatim(stat.path).unwrap();
print!("{line_ending}");
}
} else if !summarize || index == len - 1 {
print!("{}\t", convert_size(size));
print_verbatim(stat.path).unwrap();
print!("{line_ending}");
}
if options.total && index == (len - 1) {
// The last element will be the total size of the path under
// path_string. We add it to the grand total.
grand_total += size;
}
}
print_tx
.send(Ok(StatPrintInfo { stat, depth: 0 }))
.map_err(|e| USimpleError::new(1, e.to_string()))?;
} else {
show_error!(
"{}: {}",
path.to_string_lossy().maybe_quote(),
"No such file or directory"
);
set_exit_code(1);
print_tx
.send(Err(USimpleError::new(
1,
format!(
"{}: No such file or directory",
path.to_string_lossy().maybe_quote()
),
)))
.map_err(|e| USimpleError::new(1, e.to_string()))?;
}
}
if options.total {
print!("{}\ttotal", convert_size(grand_total));
print!("{line_ending}");
}
drop(print_tx);
printing_thread
.join()
.map_err(|_| USimpleError::new(1, "Printing thread panicked."))??;
Ok(())
}
fn get_time_secs(s: &str, stat: &Stat) -> std::result::Result<u64, DuError> {
fn get_time_secs(s: &str, stat: &Stat) -> Result<u64, DuError> {
let secs = match s {
"ctime" | "status" => stat.modified,
"access" | "atime" | "use" => stat.accessed,
@ -966,7 +1035,7 @@ enum Threshold {
impl FromStr for Threshold {
type Err = ParseSizeError;
fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
fn from_str(s: &str) -> Result<Self, Self::Err> {
let offset = usize::from(s.starts_with(&['-', '+'][..]));
let size = parse_size_u64(&s[offset..])?;