sort: delete the temporary directory when sort is terminated

When sort is interrupted with Ctrl+C, it now deletes its temporary files. To make
this easier, I created a new struct `TmpDirWrapper` that handles the creation of
new temporary files and the registration of the signal handler.
This commit is contained in:
Michael Debertol 2021-08-02 18:52:22 +02:00
parent a4709c805c
commit 0ae9e10ed3
7 changed files with 180 additions and 72 deletions

26
Cargo.lock generated
View file

@ -17,6 +17,12 @@ dependencies = [
"memchr 2.4.0",
]
[[package]]
name = "aliasable"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "250f629c0161ad8107cf89319e990051fae62832fd343083bea452d93e2205fd"
[[package]]
name = "ansi_term"
version = "0.11.0"
@ -521,6 +527,16 @@ dependencies = [
"syn",
]
[[package]]
name = "ctrlc"
version = "3.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "232295399409a8b7ae41276757b5a1cc21032848d42bff2352261f958b3ca29a"
dependencies = [
"nix 0.20.0",
"winapi 0.3.9",
]
[[package]]
name = "custom_derive"
version = "0.1.7"
@ -1014,19 +1030,20 @@ dependencies = [
[[package]]
name = "ouroboros"
version = "0.9.5"
version = "0.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fbeff60e3e37407a80ead3e9458145b456e978c4068cddbfea6afb48572962ca"
checksum = "84236d64f1718c387232287cf036eb6632a5ecff226f4ff9dccb8c2b79ba0bde"
dependencies = [
"aliasable",
"ouroboros_macro",
"stable_deref_trait",
]
[[package]]
name = "ouroboros_macro"
version = "0.9.5"
version = "0.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "03f2cb802b5bdfdf52f1ffa0b54ce105e4d346e91990dd571f86c91321ad49e2"
checksum = "f463857a6eb96c0136b1d56e56c718350cef30412ec065b48294799a088bca68"
dependencies = [
"Inflector",
"proc-macro-error",
@ -2480,6 +2497,7 @@ dependencies = [
"binary-heap-plus",
"clap",
"compare",
"ctrlc",
"fnv",
"itertools 0.10.1",
"memchr 2.4.0",

View file

@ -18,10 +18,11 @@ path = "src/sort.rs"
binary-heap-plus = "0.4.1"
clap = { version = "2.33", features = ["wrap_help"] }
compare = "0.1.0"
ctrlc = { version = "3.0", features = ["termination"] }
fnv = "1.0.7"
itertools = "0.10.0"
memchr = "2.4.0"
ouroboros = "0.9.3"
ouroboros = "0.10.1"
rand = "0.7"
rayon = "1.5"
tempfile = "3"

View file

@ -13,7 +13,6 @@
use std::cmp::Ordering;
use std::io::Write;
use std::path::Path;
use std::path::PathBuf;
use std::{
io::Read,
@ -29,14 +28,13 @@ use crate::merge::ClosedTmpFile;
use crate::merge::WriteableCompressedTmpFile;
use crate::merge::WriteablePlainTmpFile;
use crate::merge::WriteableTmpFile;
use crate::tmp_dir::TmpDirWrapper;
use crate::Output;
use crate::SortError;
use crate::{
chunks::{self, Chunk},
compare_by, merge, sort_by, GlobalSettings,
};
use crate::{print_sorted, Line};
use tempfile::TempDir;
const START_BUFFER_SIZE: usize = 8_000;
@ -45,6 +43,7 @@ pub fn ext_sort(
files: &mut impl Iterator<Item = UResult<Box<dyn Read + Send>>>,
settings: &GlobalSettings,
output: Output,
tmp_dir: &mut TmpDirWrapper,
) -> UResult<()> {
let (sorted_sender, sorted_receiver) = std::sync::mpsc::sync_channel(1);
let (recycled_sender, recycled_receiver) = std::sync::mpsc::sync_channel(1);
@ -59,6 +58,7 @@ pub fn ext_sort(
sorted_receiver,
recycled_sender,
output,
tmp_dir,
)
} else {
reader_writer::<_, WriteablePlainTmpFile>(
@ -67,6 +67,7 @@ pub fn ext_sort(
sorted_receiver,
recycled_sender,
output,
tmp_dir,
)
}
}
@ -80,6 +81,7 @@ fn reader_writer<
receiver: Receiver<Chunk>,
sender: SyncSender<Chunk>,
output: Output,
tmp_dir: &mut TmpDirWrapper,
) -> UResult<()> {
let separator = if settings.zero_terminated {
b'\0'
@ -92,7 +94,7 @@ fn reader_writer<
let buffer_size = settings.buffer_size / 10;
let read_result: ReadResult<Tmp> = read_write_loop(
files,
&settings.tmp_dir,
tmp_dir,
separator,
buffer_size,
settings,
@ -100,12 +102,11 @@ fn reader_writer<
sender,
)?;
match read_result {
ReadResult::WroteChunksToFile { tmp_files, tmp_dir } => {
let tmp_dir_size = tmp_files.len();
ReadResult::WroteChunksToFile { tmp_files } => {
let merger = merge::merge_with_file_limit::<_, _, Tmp>(
tmp_files.into_iter().map(|c| c.reopen()),
settings,
Some((tmp_dir, tmp_dir_size)),
tmp_dir,
)?;
merger.write_all(settings, output)?;
}
@ -176,15 +177,12 @@ enum ReadResult<I: WriteableTmpFile> {
/// The input fits into two chunks, which were kept in memory.
SortedTwoChunks([Chunk; 2]),
/// The input was read into multiple chunks, which were written to auxiliary files.
WroteChunksToFile {
tmp_files: Vec<I::Closed>,
tmp_dir: TempDir,
},
WroteChunksToFile { tmp_files: Vec<I::Closed> },
}
/// The function that is executed on the reader/writer thread.
fn read_write_loop<I: WriteableTmpFile>(
mut files: impl Iterator<Item = UResult<Box<dyn Read + Send>>>,
tmp_dir_parent: &Path,
tmp_dir: &mut TmpDirWrapper,
separator: u8,
buffer_size: usize,
settings: &GlobalSettings,
@ -228,32 +226,24 @@ fn read_write_loop<I: WriteableTmpFile>(
}
}
let tmp_dir = tempfile::Builder::new()
.prefix("uutils_sort")
.tempdir_in(tmp_dir_parent)
.map_err(|_| SortError::TmpDirCreationFailed)?;
let mut sender_option = Some(sender);
let mut file_number = 0;
let mut tmp_files = vec![];
loop {
let mut chunk = match receiver.recv() {
Ok(it) => it,
_ => {
return Ok(ReadResult::WroteChunksToFile { tmp_files, tmp_dir });
return Ok(ReadResult::WroteChunksToFile { tmp_files });
}
};
let tmp_file = write::<I>(
&mut chunk,
tmp_dir.path().join(file_number.to_string()),
tmp_dir.next_file_path()?,
settings.compress_prog.as_deref(),
separator,
)?;
tmp_files.push(tmp_file);
file_number += 1;
let recycled_chunk = chunk.recycle();
if let Some(sender) = &sender_option {

View file

@ -22,45 +22,41 @@ use std::{
use compare::Compare;
use itertools::Itertools;
use tempfile::TempDir;
use uucore::error::UResult;
use crate::{
chunks::{self, Chunk, RecycledChunk},
compare_by, open, GlobalSettings, Output, SortError,
compare_by, open,
tmp_dir::TmpDirWrapper,
GlobalSettings, Output, SortError,
};
/// If the output file occurs in the input files as well, copy the contents of the output file
/// and replace its occurrences in the inputs with that copy.
fn replace_output_file_in_input_files(
files: &mut [OsString],
settings: &GlobalSettings,
output: Option<&str>,
) -> UResult<Option<(TempDir, usize)>> {
let mut copy: Option<(TempDir, PathBuf)> = None;
tmp_dir: &mut TmpDirWrapper,
) -> UResult<()> {
let mut copy: Option<PathBuf> = None;
if let Some(Ok(output_path)) = output.map(|path| Path::new(path).canonicalize()) {
for file in files {
if let Ok(file_path) = Path::new(file).canonicalize() {
if file_path == output_path {
if let Some((_dir, copy)) = &copy {
if let Some(copy) = &copy {
*file = copy.clone().into_os_string();
} else {
let tmp_dir = tempfile::Builder::new()
.prefix("uutils_sort")
.tempdir_in(&settings.tmp_dir)
.map_err(|_| SortError::TmpDirCreationFailed)?;
let copy_path = tmp_dir.path().join("0");
let copy_path = tmp_dir.next_file_path()?;
std::fs::copy(file_path, &copy_path)
.map_err(|error| SortError::OpenTmpFileFailed { error })?;
*file = copy_path.clone().into_os_string();
copy = Some((tmp_dir, copy_path))
copy = Some(copy_path)
}
}
}
}
}
// if we created a TempDir its size must be one.
Ok(copy.map(|(dir, _copy)| (dir, 1)))
Ok(())
}
/// Merge pre-sorted `Box<dyn Read>`s.
@ -71,8 +67,9 @@ pub fn merge<'a>(
files: &mut [OsString],
settings: &'a GlobalSettings,
output: Option<&str>,
tmp_dir: &mut TmpDirWrapper,
) -> UResult<FileMerger<'a>> {
let tmp_dir = replace_output_file_in_input_files(files, settings, output)?;
replace_output_file_in_input_files(files, output, tmp_dir)?;
if settings.compress_prog.is_none() {
merge_with_file_limit::<_, _, WriteablePlainTmpFile>(
files
@ -94,26 +91,16 @@ pub fn merge<'a>(
// Merge already sorted `MergeInput`s.
pub fn merge_with_file_limit<
'a,
M: MergeInput + 'static,
F: ExactSizeIterator<Item = UResult<M>>,
Tmp: WriteableTmpFile + 'static,
>(
files: F,
settings: &GlobalSettings,
tmp_dir: Option<(TempDir, usize)>,
) -> UResult<FileMerger> {
settings: &'a GlobalSettings,
tmp_dir: &mut TmpDirWrapper,
) -> UResult<FileMerger<'a>> {
if files.len() > settings.merge_batch_size {
// If we did not get a tmp_dir, create one.
let (tmp_dir, mut tmp_dir_size) = match tmp_dir {
Some(x) => x,
None => (
tempfile::Builder::new()
.prefix("uutils_sort")
.tempdir_in(&settings.tmp_dir)
.map_err(|_| SortError::TmpDirCreationFailed)?,
0,
),
};
let mut remaining_files = files.len();
let batches = files.chunks(settings.merge_batch_size);
let mut batches = batches.into_iter();
@ -122,11 +109,8 @@ pub fn merge_with_file_limit<
// Work around the fact that `Chunks` is not an `ExactSizeIterator`.
remaining_files = remaining_files.saturating_sub(settings.merge_batch_size);
let merger = merge_without_limit(batches.next().unwrap(), settings)?;
let mut tmp_file = Tmp::create(
tmp_dir.path().join(tmp_dir_size.to_string()),
settings.compress_prog.as_deref(),
)?;
tmp_dir_size += 1;
let mut tmp_file =
Tmp::create(tmp_dir.next_file_path()?, settings.compress_prog.as_deref())?;
merger.write_all_to(settings, tmp_file.as_write())?;
temporary_files.push(tmp_file.finished_writing()?);
}
@ -139,7 +123,7 @@ pub fn merge_with_file_limit<
dyn FnMut(Tmp::Closed) -> UResult<<Tmp::Closed as ClosedTmpFile>::Reopened>,
>),
settings,
Some((tmp_dir, tmp_dir_size)),
tmp_dir,
)
} else {
merge_without_limit(files, settings)

View file

@ -22,6 +22,7 @@ mod custom_str_cmp;
mod ext_sort;
mod merge;
mod numeric_str_cmp;
mod tmp_dir;
use chunks::LineData;
use clap::{crate_version, App, Arg};
@ -49,6 +50,8 @@ use uucore::parse_size::{parse_size, ParseSizeError};
use uucore::version_cmp::version_cmp;
use uucore::InvalidEncodingHandling;
use crate::tmp_dir::TmpDirWrapper;
const NAME: &str = "sort";
const ABOUT: &str = "Display sorted concatenation of all FILE(s).";
@ -317,7 +320,6 @@ pub struct GlobalSettings {
threads: String,
zero_terminated: bool,
buffer_size: usize,
tmp_dir: PathBuf,
compress_prog: Option<String>,
merge_batch_size: usize,
precomputed: Precomputed,
@ -400,7 +402,6 @@ impl Default for GlobalSettings {
threads: String::new(),
zero_terminated: false,
buffer_size: DEFAULT_BUF_SIZE,
tmp_dir: PathBuf::new(),
compress_prog: None,
merge_batch_size: 32,
precomputed: Precomputed {
@ -1178,10 +1179,12 @@ pub fn uumain(args: impl uucore::Args) -> UResult<()> {
})
})?;
settings.tmp_dir = matches
.value_of(options::TMP_DIR)
.map(PathBuf::from)
.unwrap_or_else(env::temp_dir);
let mut tmp_dir = TmpDirWrapper::new(
matches
.value_of(options::TMP_DIR)
.map(PathBuf::from)
.unwrap_or_else(env::temp_dir),
);
settings.compress_prog = matches.value_of(options::COMPRESS_PROG).map(String::from);
@ -1280,7 +1283,7 @@ pub fn uumain(args: impl uucore::Args) -> UResult<()> {
settings.init_precomputed();
exec(&mut files, &settings, output)
exec(&mut files, &settings, output, &mut tmp_dir)
}
pub fn uu_app() -> App<'static, 'static> {
@ -1503,9 +1506,14 @@ pub fn uu_app() -> App<'static, 'static> {
)
}
fn exec(files: &mut [OsString], settings: &GlobalSettings, output: Output) -> UResult<()> {
fn exec(
files: &mut [OsString],
settings: &GlobalSettings,
output: Output,
tmp_dir: &mut TmpDirWrapper,
) -> UResult<()> {
if settings.merge {
let file_merger = merge::merge(files, settings, output.as_output_name())?;
let file_merger = merge::merge(files, settings, output.as_output_name(), tmp_dir)?;
file_merger.write_all(settings, output)
} else if settings.check {
if files.len() > 1 {
@ -1515,7 +1523,7 @@ fn exec(files: &mut [OsString], settings: &GlobalSettings, output: Output) -> UR
}
} else {
let mut lines = files.iter().map(open);
ext_sort(&mut lines, settings, output)
ext_sort(&mut lines, settings, output, tmp_dir)
}
}

View file

@ -0,0 +1,80 @@
use std::{
path::{Path, PathBuf},
sync::{Arc, Mutex},
};
use tempfile::TempDir;
use uucore::error::{UResult, USimpleError};
use crate::SortError;
/// A wrapper around `TempDir` that may only be used once per process.
///
/// `TmpDirWrapper` hands out paths for new temporary files inside a single temporary directory
/// (see `next_file_path`) and registers a `ctrlc` handler that deletes the whole directory and
/// exits the process when `SIGINT` is received.
/// NOTE(review): `ctrlc` is enabled with its `termination` feature in Cargo.toml, which presumably
/// extends the handler to other termination signals as well — confirm against the `ctrlc` docs.
///
/// Constructing a second `TmpDirWrapper` succeeds, but requesting a file path from it will fail:
/// the handler is registered when the directory is created, and `ctrlc::set_handler()` fails
/// when there's already a handler.
/// The directory is only created once the first file is requested.
pub struct TmpDirWrapper {
/// The temporary directory; `None` until the first file path is requested.
temp_dir: Option<TempDir>,
/// The directory inside which the temporary directory is created.
parent_path: PathBuf,
/// Number of file paths handed out so far; also used as the name of the next file.
size: usize,
/// Shared with the signal handler: held while a new path is handed out and while the handler
/// deletes the directory.
lock: Arc<Mutex<()>>,
}
impl TmpDirWrapper {
pub fn new(path: PathBuf) -> Self {
Self {
parent_path: path,
size: 0,
temp_dir: None,
lock: Default::default(),
}
}
fn init_tmp_dir(&mut self) -> UResult<()> {
assert!(self.temp_dir.is_none());
assert_eq!(self.size, 0);
self.temp_dir = Some(
tempfile::Builder::new()
.prefix("uutils_sort")
.tempdir_in(&self.parent_path)
.map_err(|_| SortError::TmpDirCreationFailed)?,
);
let path = self.temp_dir.as_ref().unwrap().path().to_owned();
let lock = self.lock.clone();
ctrlc::set_handler(move || {
// Take the lock so that `next_file_path` returns no new file path.
let _lock = lock.lock().unwrap();
if let Err(e) = remove_tmp_dir(&path) {
show_error!("failed to delete temporary directory: {}", e);
}
std::process::exit(2)
})
.map_err(|e| USimpleError::new(2, format!("failed to set up signal handler: {}", e)))
}
pub fn next_file_path(&mut self) -> UResult<PathBuf> {
if self.temp_dir.is_none() {
self.init_tmp_dir()?;
}
let _lock = self.lock.lock().unwrap();
let file_name = self.size.to_string();
self.size += 1;
Ok(self.temp_dir.as_ref().unwrap().path().join(file_name))
}
}
/// Delete the directory at `path`: first the files directly inside it, then
/// the directory itself.
///
/// Failures while listing the directory or deleting individual entries are
/// ignored; only the result of removing the directory itself is returned.
fn remove_tmp_dir(path: &Path) -> std::io::Result<()> {
    if let Ok(entries) = std::fs::read_dir(path) {
        entries.filter_map(Result::ok).for_each(|entry| {
            // A failure here most likely means another thread already deleted
            // the file, which is fine.
            let _ = std::fs::remove_file(entry.path());
        });
    }
    std::fs::remove_dir(path)
}

View file

@ -1091,3 +1091,30 @@ fn test_wrong_args_exit_code() {
.status_code(2)
.stderr_contains("--misspelled");
}
// Checks that `sort` removes its temporary directory when it is killed with SIGINT:
// start a sort that has to spill to disk, wait until the temporary directory shows up,
// send SIGINT, and verify both the exit code and that the directory is gone again.
#[test]
#[cfg(unix)]
fn test_tmp_files_deleted_on_sigint() {
    use std::{fs::read_dir, time::Duration};

    use nix::{sys::signal, unistd::Pid};

    let (at, mut ucmd) = at_and_ucmd!();
    at.mkdir("tmp_dir");
    ucmd.args(&[
        "ext_sort.txt",
        "--buffer-size=1", // with a small buffer size `sort` will be forced to create a temporary directory very soon.
        "--temporary-directory=tmp_dir",
    ]);
    let mut child = ucmd.run_no_wait();
    // Give `sort` time to create its temporary directory. A single fixed sleep is
    // timing-dependent on slow or loaded machines, so poll with exponential backoff
    // instead (100ms, 200ms, ... at most ~3s in total) and stop as soon as it appears.
    let mut delay = Duration::from_millis(100);
    for _ in 0..5 {
        std::thread::sleep(delay);
        if read_dir(at.plus("tmp_dir")).unwrap().next().is_some() {
            break;
        }
        delay *= 2;
    }
    // `sort` should have created a temporary directory.
    assert!(read_dir(at.plus("tmp_dir")).unwrap().next().is_some());
    // kill sort with SIGINT
    signal::kill(Pid::from_raw(child.id() as i32), signal::SIGINT).unwrap();
    // wait for `sort` to exit
    assert_eq!(child.wait().unwrap().code(), Some(2));
    // `sort` should have deleted the temporary directory again.
    assert!(read_dir(at.plus("tmp_dir")).unwrap().next().is_none());
}