sort: implement --compress-program

This commit is contained in:
Michael Debertol 2021-06-06 12:07:06 +02:00
parent 5a5c7c5a34
commit 8d213219c7
4 changed files with 115 additions and 15 deletions

View file

@ -12,8 +12,12 @@
//! The buffers for the individual chunks are recycled. There are two buffers.
use std::cmp::Ordering;
use std::fs::File;
use std::io::BufReader;
use std::io::{BufWriter, Write};
use std::path::Path;
use std::process::Child;
use std::process::{Command, Stdio};
use std::{
fs::OpenOptions,
io::Read,
@ -25,6 +29,7 @@ use itertools::Itertools;
use tempfile::TempDir;
use crate::Line;
use crate::{
chunks::{self, Chunk},
compare_by, merge, output_sorted_lines, sort_by, GlobalSettings,
@ -63,10 +68,31 @@ pub fn ext_sort(files: &mut impl Iterator<Item = Box<dyn Read + Send>>, settings
);
match read_result {
ReadResult::WroteChunksToFile { chunks_written } => {
let files = (0..chunks_written)
.map(|chunk_num| tmp_dir.path().join(chunk_num.to_string()))
.collect::<Vec<_>>();
let mut merger = merge::merge(&files, settings);
let mut children = Vec::new();
let files = (0..chunks_written).map(|chunk_num| {
let file_path = tmp_dir.path().join(chunk_num.to_string());
let file = File::open(file_path).unwrap();
if let Some(compress_prog) = &settings.compress_prog {
let mut command = Command::new(compress_prog);
command.stdin(file).stdout(Stdio::piped()).arg("-d");
let mut child = crash_if_err!(
2,
command.spawn().map_err(|err| format!(
"couldn't execute compress program: errno {}",
err.raw_os_error().unwrap()
))
);
let child_stdout = child.stdout.take().unwrap();
children.push(child);
Box::new(BufReader::new(child_stdout)) as Box<dyn Read + Send>
} else {
Box::new(BufReader::new(file)) as Box<dyn Read + Send>
}
});
let mut merger = merge::merge(files, settings);
for child in children {
assert_child_success(child, settings.compress_prog.as_ref().unwrap());
}
merger.write_all(settings);
}
ReadResult::SortedSingleChunk(chunk) => {
@ -178,6 +204,7 @@ fn reader_writer(
write(
&mut chunk,
&tmp_dir.path().join(file_number.to_string()),
settings.compress_prog.as_deref(),
separator,
);
@ -200,14 +227,42 @@ fn reader_writer(
}
/// Write the lines in `chunk` to `file`, separated by `separator`.
fn write(chunk: &mut Chunk, file: &Path, separator: u8) {
/// `compress_prog` is used to optionally compress file contents.
fn write(chunk: &mut Chunk, file: &Path, compress_prog: Option<&str>, separator: u8) {
chunk.with_lines_mut(|lines| {
// Write the lines to the file
let file = crash_if_err!(1, OpenOptions::new().create(true).write(true).open(file));
let mut writer = BufWriter::new(file);
for s in lines.iter() {
crash_if_err!(1, writer.write_all(s.line.as_bytes()));
crash_if_err!(1, writer.write_all(&[separator]));
}
if let Some(compress_prog) = compress_prog {
let mut command = Command::new(compress_prog);
command.stdin(Stdio::piped()).stdout(file);
let mut child = crash_if_err!(
2,
command.spawn().map_err(|err| format!(
"couldn't execute compress program: errno {}",
err.raw_os_error().unwrap()
))
);
let mut writer = BufWriter::new(child.stdin.take().unwrap());
write_lines(lines, &mut writer, separator);
writer.flush().unwrap();
drop(writer);
assert_child_success(child, compress_prog);
} else {
let mut writer = BufWriter::new(file);
write_lines(lines, &mut writer, separator);
};
});
}
fn write_lines<'a, T: Write>(lines: &[Line<'a>], writer: &mut T, separator: u8) {
for s in lines {
crash_if_err!(1, writer.write_all(s.line.as_bytes()));
crash_if_err!(1, writer.write_all(&[separator]));
}
}
fn assert_child_success(mut child: Child, program: &str) {
if !matches!(child.wait().map(|e| e.code()), Ok(Some(0)) | Ok(None)) {
crash!(2, "'{}' terminated abnormally", program)
}
}

View file

@ -9,7 +9,6 @@
use std::{
cmp::Ordering,
ffi::OsStr,
io::{Read, Write},
iter,
rc::Rc,
@ -21,15 +20,18 @@ use compare::Compare;
use crate::{
chunks::{self, Chunk},
compare_by, open, GlobalSettings,
compare_by, GlobalSettings,
};
// Merge already sorted files.
pub fn merge<'a>(files: &[impl AsRef<OsStr>], settings: &'a GlobalSettings) -> FileMerger<'a> {
pub fn merge<F: ExactSizeIterator<Item = Box<dyn Read + Send>>>(
files: F,
settings: &GlobalSettings,
) -> FileMerger {
let (request_sender, request_receiver) = channel();
let mut reader_files = Vec::with_capacity(files.len());
let mut loaded_receivers = Vec::with_capacity(files.len());
for (file_number, file) in files.iter().map(open).enumerate() {
for (file_number, file) in files.enumerate() {
let (sender, receiver) = sync_channel(2);
loaded_receivers.push(receiver);
reader_files.push(ReaderFile {

View file

@ -95,6 +95,7 @@ static OPT_PARALLEL: &str = "parallel";
static OPT_FILES0_FROM: &str = "files0-from";
static OPT_BUF_SIZE: &str = "buffer-size";
static OPT_TMP_DIR: &str = "temporary-directory";
static OPT_COMPRESS_PROG: &str = "compress-program";
static ARG_FILES: &str = "files";
@ -155,6 +156,7 @@ pub struct GlobalSettings {
zero_terminated: bool,
buffer_size: usize,
tmp_dir: PathBuf,
compress_prog: Option<String>,
}
impl GlobalSettings {
@ -223,6 +225,7 @@ impl Default for GlobalSettings {
zero_terminated: false,
buffer_size: DEFAULT_BUF_SIZE,
tmp_dir: PathBuf::new(),
compress_prog: None,
}
}
}
@ -1076,6 +1079,13 @@ pub fn uumain(args: impl uucore::Args) -> i32 {
.takes_value(true)
.value_name("DIR"),
)
.arg(
Arg::with_name(OPT_COMPRESS_PROG)
.long(OPT_COMPRESS_PROG)
.help("compress temporary files with PROG, decompress with PROG -d")
.long_help("PROG has to take input from stdin and output to stdout")
.value_name("PROG")
)
.arg(
Arg::with_name(OPT_FILES0_FROM)
.long(OPT_FILES0_FROM)
@ -1165,6 +1175,8 @@ pub fn uumain(args: impl uucore::Args) -> i32 {
.map(PathBuf::from)
.unwrap_or_else(env::temp_dir);
settings.compress_prog = matches.value_of(OPT_COMPRESS_PROG).map(String::from);
settings.zero_terminated = matches.is_present(OPT_ZERO_TERMINATED);
settings.merge = matches.is_present(OPT_MERGE);
@ -1240,7 +1252,7 @@ fn output_sorted_lines<'a>(iter: impl Iterator<Item = &'a Line<'a>>, settings: &
fn exec(files: &[String], settings: &GlobalSettings) -> i32 {
if settings.merge {
let mut file_merger = merge::merge(files, settings);
let mut file_merger = merge::merge(files.iter().map(open), settings);
file_merger.write_all(settings);
} else if settings.check {
if files.len() > 1 {

View file

@ -809,3 +809,34 @@ fn sort_empty_chunk() {
.succeeds()
.stdout_is("a\na\n");
}
#[test]
#[cfg(target_os = "linux")]
fn test_compress() {
new_ucmd!()
.args(&[
"ext_sort.txt",
"-n",
"--compress-program",
"gzip",
"-S",
"10",
])
.succeeds()
.stdout_only_fixture("ext_sort.expected");
}
#[test]
fn test_compress_fail() {
new_ucmd!()
.args(&[
"ext_sort.txt",
"-n",
"--compress-program",
"nonexistent-program",
"-S",
"10",
])
.fails()
.stderr_only("sort: couldn't execute compress program: errno 2");
}