sort: read files as chunks, off-thread

Instead of using a BufReader and reading each line separately,
allocating a String for each one, we now read the input into large
chunks. Lines are string slices that borrow from their chunk. This
makes the allocator's job much easier and yields performance
improvements.

Chunks are read on a separate thread to further improve performance.
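
A minimal standalone sketch (inline input, not code from this commit) contrasting the two approaches the message describes:

```rust
use std::io::{BufRead, BufReader};

fn main() {
    let input = b"b\na\nc\n";

    // Before: BufReader yields one heap-allocated String per line.
    let owned: Vec<String> = BufReader::new(&input[..])
        .lines()
        .map(|line| line.unwrap())
        .collect();

    // After: the whole input lives in one chunk; lines are &str slices
    // borrowing from it, so there is one allocation instead of one per line.
    let chunk = String::from_utf8(input.to_vec()).unwrap();
    let borrowed: Vec<&str> = chunk.lines().collect();

    assert_eq!(owned, borrowed);
}
```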
Author: Michael Debertol
Date:   2021-05-16 21:13:37 +02:00
Parent: 9e2c82d8e7
Commit: fcd48813e0

11 changed files with 1003 additions and 452 deletions

Cargo.lock (generated)

@@ -1,5 +1,11 @@
# This file is automatically @generated by Cargo.
# It is not intended for manual editing.
[[package]]
name = "Inflector"
version = "0.11.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fe438c63458706e03479442743baae6c88256498e6431708f6dfc520a26515d3"
[[package]]
name = "advapi32-sys"
version = "0.2.0"
@@ -63,6 +69,15 @@ version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a"
[[package]]
name = "binary-heap-plus"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4f068638f8ff9e118a9361e66a411eff410e7fb3ecaa23bf9272324f8fc606d7"
dependencies = [
"compare",
]
[[package]]
name = "bit-set"
version = "0.5.2"
@@ -136,9 +151,9 @@ checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610"
[[package]]
name = "cast"
version = "0.2.5"
version = "0.2.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cc38c385bfd7e444464011bb24820f40dd1c76bcdfa1b78611cb7c2e5cafab75"
checksum = "57cdfa5d50aad6cb4d44dcab6101a7f79925bd59d82ca42f38a9856a28865374"
dependencies = [
"rustc_version",
]
@@ -198,6 +213,12 @@ dependencies = [
"bitflags",
]
[[package]]
name = "compare"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "120133d4db2ec47efe2e26502ee984747630c67f51974fca0b6c1340cf2368d3"
[[package]]
name = "constant_time_eq"
version = "0.1.5"
@@ -999,6 +1020,29 @@ version = "11.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0ab1bc2a289d34bd04a330323ac98a1b4bc82c9d9fcb1e66b63caa84da26b575"
[[package]]
name = "ouroboros"
version = "0.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cc1f52300b81ac4eeeb6c00c20f7e86556c427d9fb2d92b68fc73c22f331cd15"
dependencies = [
"ouroboros_macro",
"stable_deref_trait",
]
[[package]]
name = "ouroboros_macro"
version = "0.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "41db02c8f8731cdd7a72b433c7900cce4bf245465b452c364bfd21f4566ab055"
dependencies = [
"Inflector",
"proc-macro-error",
"proc-macro2",
"quote 1.0.9",
"syn",
]
[[package]]
name = "output_vt100"
version = "0.1.2"
@@ -1027,6 +1071,15 @@ dependencies = [
"proc-macro-hack",
]
[[package]]
name = "pest"
version = "2.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "10f4872ae94d7b90ae48754df22fd42ad52ce740b8f370b03da4835417403e53"
dependencies = [
"ucd-trie",
]
[[package]]
name = "pkg-config"
version = "0.3.19"
@@ -1089,6 +1142,30 @@ dependencies = [
"output_vt100",
]
[[package]]
name = "proc-macro-error"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "da25490ff9892aab3fcf7c36f08cfb902dd3e71ca0f9f9517bea02a73a5ce38c"
dependencies = [
"proc-macro-error-attr",
"proc-macro2",
"quote 1.0.9",
"syn",
"version_check",
]
[[package]]
name = "proc-macro-error-attr"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a1be40180e52ecc98ad80b184934baf3d0d29f979574e439af5a55274b35f869"
dependencies = [
"proc-macro2",
"quote 1.0.9",
"version_check",
]
[[package]]
name = "proc-macro-hack"
version = "0.5.19"
@@ -1336,11 +1413,11 @@ checksum = "3e52c148ef37f8c375d49d5a73aa70713125b7f19095948a923f80afdeb22ec2"
[[package]]
name = "rustc_version"
version = "0.2.3"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "138e3e0acb6c9fb258b19b67cb8abd63c00679d2851805ea151465464fe9030a"
checksum = "f0dfe2087c51c460008730de8b57e6a320782fbfb312e1f4d520e6c6fae155ee"
dependencies = [
"semver",
"semver 0.11.0",
]
[[package]]
@@ -1370,7 +1447,16 @@ version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1d7eb9ef2c18661902cc47e535f9bc51b78acd254da71d375c2f6720d9a40403"
dependencies = [
"semver-parser",
"semver-parser 0.7.0",
]
[[package]]
name = "semver"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f301af10236f6df4160f7c3f04eec6dbc70ace82d23326abad5edee88801c6b6"
dependencies = [
"semver-parser 0.10.2",
]
[[package]]
@@ -1380,10 +1466,19 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "388a1df253eca08550bef6c72392cfe7c30914bf41df5269b68cbd6ff8f570a3"
[[package]]
name = "serde"
version = "1.0.125"
name = "semver-parser"
version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "558dc50e1a5a5fa7112ca2ce4effcb321b0300c0d4ccf0776a9f60cd89031171"
checksum = "00b0bef5b7f9e0df16536d3961cfb6e84331c065b4066afb39768d0e319411f7"
dependencies = [
"pest",
]
[[package]]
name = "serde"
version = "1.0.126"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ec7505abeacaec74ae4778d9d9328fe5a5d04253220a85c4ee022239fc996d03"
[[package]]
name = "serde_cbor"
@@ -1397,9 +1492,9 @@ dependencies = [
[[package]]
name = "serde_derive"
version = "1.0.125"
version = "1.0.126"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b093b7a2bb58203b5da3056c05b4ec1fed827dcfdb37347a8841695263b3d06d"
checksum = "963a7dbc9895aeac7ac90e74f34a5d5261828f79df35cbed41e10189d3804d43"
dependencies = [
"proc-macro2",
"quote 1.0.9",
@@ -1468,6 +1563,12 @@ dependencies = [
"winapi 0.3.9",
]
[[package]]
name = "stable_deref_trait"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3"
[[package]]
name = "strsim"
version = "0.8.0"
@@ -1627,6 +1728,12 @@ version = "1.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "879f6906492a7cd215bfa4cf595b600146ccfac0c79bcbd1f3000162af5e8b06"
[[package]]
name = "ucd-trie"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "56dee185309b50d1f11bfedef0fe6d036842e3fb77413abef29f8f8d1c5d4c1c"
[[package]]
name = "unicode-segmentation"
version = "1.7.1"
@@ -2402,12 +2509,16 @@ dependencies = [
name = "uu_sort"
version = "0.0.6"
dependencies = [
"binary-heap-plus",
"clap",
"compare",
"fnv",
"itertools 0.10.0",
"memchr 2.4.0",
"ouroboros",
"rand 0.7.3",
"rayon",
"semver",
"semver 0.9.0",
"tempdir",
"unicode-width",
"uucore",
@@ -2720,6 +2831,12 @@ version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f1bddf1187be692e79c5ffeab891132dfb0f236ed36a43c7ed39f1165ee20191"
[[package]]
name = "version_check"
version = "0.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5fecdca9a5291cc2b8dcf7dc02453fee791a280f3743cb0905f8822ae463b3fe"
[[package]]
name = "void"
version = "1.0.2"

src/uu/sort/BENCHMARKING.md

@@ -75,7 +75,20 @@ Try running commands with the `-S` option set to an amount of memory to be used,
huge files (ideally multiple Gigabytes) with `-S`. Creating such a large file can be achieved by running `cat shuffled_wordlist.txt | sort -R >> shuffled_wordlist.txt`
multiple times (this will add the contents of `shuffled_wordlist.txt` to itself).
Example: Run `hyperfine './target/release/coreutils sort shuffled_wordlist.txt -S 1M' 'sort shuffled_wordlist.txt -S 1M'`
## Merging
"Merge" sort merges already sorted files. It is a sub-step of external sorting, so benchmarking it separately may be helpful.
- Splitting `shuffled_wordlist.txt` can be achieved by running `split shuffled_wordlist.txt shuffled_wordlist_slice_ --additional-suffix=.txt`
- Sort each part by running `for f in shuffled_wordlist_slice_*; do sort $f -o $f; done`
- Benchmark merging by running `hyperfine "target/release/coreutils sort -m shuffled_wordlist_slice_*"`
## Check
When invoked with -c, we simply check if the input is already ordered. The input for benchmarking should be an already sorted file.
- Benchmark checking by running `hyperfine "target/release/coreutils sort -c sorted_wordlist.txt"`
## Stdout and stdin performance

src/uu/sort/Cargo.toml

@@ -15,16 +15,20 @@ edition = "2018"
path = "src/sort.rs"
[dependencies]
rayon = "1.5"
rand = "0.7"
binary-heap-plus = "0.4.1"
clap = "2.33"
compare = "0.1.0"
fnv = "1.0.7"
itertools = "0.10.0"
memchr = "2.4.0"
ouroboros = "0.9.3"
rand = "0.7"
rayon = "1.5"
semver = "0.9.0"
tempdir = "0.3.7"
unicode-width = "0.1.8"
uucore = { version=">=0.0.8", package="uucore", path="../../uucore", features=["fs"] }
uucore_procs = { version=">=0.0.5", package="uucore_procs", path="../../uucore_procs" }
tempdir = "0.3.7"
[[bin]]
name = "sort"

src/uu/sort/src/check.rs (new file)

@@ -0,0 +1,102 @@
// * This file is part of the uutils coreutils package.
// *
// * (c) Michael Debertol <michael.debertol..AT..gmail.com>
// *
// * For the full copyright and license information, please view the LICENSE
// * file that was distributed with this source code.
//! Check if a file is ordered
use crate::{
chunks::{self, Chunk},
compare_by, open, GlobalSettings,
};
use itertools::Itertools;
use std::{
cmp::Ordering,
io::Read,
iter,
sync::mpsc::{sync_channel, Receiver, SyncSender},
thread,
};
/// Check if the file at `path` is ordered.
///
/// # Returns
///
/// The code we should exit with.
pub fn check(path: &str, settings: &GlobalSettings) -> i32 {
let file = open(path).expect("failed to open input file");
let (recycled_sender, recycled_receiver) = sync_channel(2);
let (loaded_sender, loaded_receiver) = sync_channel(2);
thread::spawn({
let settings = settings.clone();
move || reader(file, recycled_receiver, loaded_sender, &settings)
});
for _ in 0..2 {
recycled_sender
.send(Chunk::new(vec![0; 100 * 1024], |_| Vec::new()))
.unwrap();
}
let mut prev_chunk: Option<Chunk> = None;
let mut line_idx = 0;
for chunk in loaded_receiver.iter() {
line_idx += 1;
if let Some(prev_chunk) = prev_chunk.take() {
// Check if the first element of the new chunk is greater than the last
// element from the previous chunk
let prev_last = prev_chunk.borrow_lines().last().unwrap();
let new_first = chunk.borrow_lines().first().unwrap();
if compare_by(prev_last, new_first, &settings) == Ordering::Greater {
if !settings.check_silent {
println!("sort: {}:{}: disorder: {}", path, line_idx, new_first.line);
}
return 1;
}
recycled_sender.send(prev_chunk).ok();
}
for (a, b) in chunk.borrow_lines().iter().tuple_windows() {
line_idx += 1;
if compare_by(a, b, &settings) == Ordering::Greater {
if !settings.check_silent {
println!("sort: {}:{}: disorder: {}", path, line_idx, b.line);
}
return 1;
}
}
prev_chunk = Some(chunk);
}
0
}
/// The function running on the reader thread.
fn reader(
mut file: Box<dyn Read + Send>,
receiver: Receiver<Chunk>,
sender: SyncSender<Chunk>,
settings: &GlobalSettings,
) {
let mut sender = Some(sender);
let mut carry_over = vec![];
for chunk in receiver.iter() {
let (recycled_lines, recycled_buffer) = chunk.recycle();
chunks::read(
&mut sender,
recycled_buffer,
&mut carry_over,
&mut file,
&mut iter::empty(),
if settings.zero_terminated {
b'\0'
} else {
b'\n'
},
recycled_lines,
settings,
)
}
}
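
A standalone sketch of the buffer ping-pong used in check.rs above, with plain `Vec<u8>` buffers standing in for `Chunk` (the data and iteration count here are made up): two buffers circulate between the channels, so the reader allocates nothing after startup.

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

fn main() {
    let (recycled_sender, recycled_receiver) = sync_channel::<Vec<u8>>(2);
    let (loaded_sender, loaded_receiver) = sync_channel::<Vec<u8>>(2);

    // Reader thread: refill whichever buffer comes back and pass it on.
    thread::spawn(move || {
        for (i, mut buffer) in recycled_receiver.iter().enumerate().take(5) {
            buffer.clear();
            buffer.extend_from_slice(format!("chunk {}", i).as_bytes());
            if loaded_sender.send(buffer).is_err() {
                break;
            }
        }
    });

    // Prime the cycle with two buffers, mirroring the `for _ in 0..2` above.
    for _ in 0..2 {
        recycled_sender.send(Vec::with_capacity(16)).unwrap();
    }

    for buffer in loaded_receiver.iter() {
        println!("{}", String::from_utf8_lossy(&buffer));
        // Hand the buffer back so its allocation is reused for the next read.
        recycled_sender.send(buffer).ok();
    }
}
```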

src/uu/sort/src/chunks.rs (new file)

@@ -0,0 +1,202 @@
// * This file is part of the uutils coreutils package.
// *
// * (c) Michael Debertol <michael.debertol..AT..gmail.com>
// *
// * For the full copyright and license information, please view the LICENSE
// * file that was distributed with this source code.
//! Utilities for reading files as chunks.
use std::{
io::{ErrorKind, Read},
sync::mpsc::SyncSender,
};
use memchr::memchr_iter;
use ouroboros::self_referencing;
use crate::{GlobalSettings, Line};
/// The chunk that is passed around between threads.
/// `lines` consist of slices into `buffer`.
#[self_referencing(pub_extras)]
#[derive(Debug)]
pub struct Chunk {
pub buffer: Vec<u8>,
#[borrows(buffer)]
#[covariant]
pub lines: Vec<Line<'this>>,
}
impl Chunk {
/// Destroy this chunk and return its components to be reused.
///
/// # Returns
///
/// * The `lines` vector, emptied
/// * The `buffer` vector, **not** emptied
pub fn recycle(mut self) -> (Vec<Line<'static>>, Vec<u8>) {
let recycled_lines = self.with_lines_mut(|lines| {
lines.clear();
unsafe {
// SAFETY: It is safe to (temporarily) transmute to a vector of lines with a longer lifetime,
// because the vector is empty.
// Transmuting is necessary to make recycling possible. See https://github.com/rust-lang/rfcs/pull/2802
// for an RFC to make this unnecessary. Its example is similar to the code here.
std::mem::transmute::<Vec<Line<'_>>, Vec<Line<'static>>>(std::mem::take(lines))
}
});
(recycled_lines, self.into_heads().buffer)
}
}
/// Read a chunk, parse lines and send them.
///
/// No empty chunk will be sent.
///
/// # Arguments
///
/// * `sender_option`: The sender to send the lines to the sorter. If `None`, does nothing.
/// * `buffer`: The recycled buffer. All contents will be overwritten, but it must already be filled.
/// (i.e. `buffer.len()` should be equal to `buffer.capacity()`)
/// * `carry_over`: The bytes that must be carried over in between invocations.
/// * `file`: The current file.
/// * `next_files`: What `file` should be updated to next.
/// * `separator`: The line separator.
/// * `lines`: The recycled vector to fill with lines. Must be empty.
/// * `settings`: The global settings.
#[allow(clippy::too_many_arguments)]
pub fn read(
sender_option: &mut Option<SyncSender<Chunk>>,
mut buffer: Vec<u8>,
carry_over: &mut Vec<u8>,
file: &mut Box<dyn Read + Send>,
next_files: &mut impl Iterator<Item = Box<dyn Read + Send>>,
separator: u8,
lines: Vec<Line<'static>>,
settings: &GlobalSettings,
) {
assert!(lines.is_empty());
if let Some(sender) = sender_option {
if buffer.len() < carry_over.len() {
buffer.resize(carry_over.len() + 10 * 1024, 0);
}
buffer[..carry_over.len()].copy_from_slice(&carry_over);
let (read, should_continue) =
read_to_buffer(file, next_files, &mut buffer, carry_over.len(), separator);
carry_over.clear();
carry_over.extend_from_slice(&buffer[read..]);
let payload = Chunk::new(buffer, |buf| {
let mut lines = unsafe {
// SAFETY: It is safe to transmute to a vector of lines with shorter lifetime,
// because it was only temporarily transmuted to a Vec<Line<'static>> to make recycling possible.
std::mem::transmute::<Vec<Line<'static>>, Vec<Line<'_>>>(lines)
};
let read = crash_if_err!(1, std::str::from_utf8(&buf[..read]));
parse_lines(read, &mut lines, separator, &settings);
lines
});
if !payload.borrow_lines().is_empty() {
sender.send(payload).unwrap();
}
if !should_continue {
*sender_option = None;
}
}
}
/// Split `read` into `Line`s, and add them to `lines`.
fn parse_lines<'a>(
mut read: &'a str,
lines: &mut Vec<Line<'a>>,
separator: u8,
settings: &GlobalSettings,
) {
// Strip a trailing separator. TODO: Once our MinRustV is 1.45 or above, use strip_suffix() instead.
if read.ends_with(separator as char) {
read = &read[..read.len() - 1];
}
lines.extend(
read.split(separator as char)
.map(|line| Line::create(line, settings)),
);
}
/// Read from `file` into `buffer`.
///
/// This function makes sure that at least two lines are read (unless we reach EOF and there's no next file),
/// growing the buffer if necessary.
/// The last line will likely not have been fully read into the buffer. Its bytes must be copied to
/// the front of the buffer for the next invocation so that reading it can continue
/// (see the return values and `start_offset`).
///
/// # Arguments
///
/// * `file`: The file to start reading from.
/// * `next_files`: When `file` reaches EOF, it is updated to `next_files.next()` if that is `Some`,
/// and this function continues reading.
/// * `buffer`: The buffer that is filled with bytes. Its contents will mostly be overwritten (see `start_offset`
/// as well). It will not be grown by default, unless that is necessary to read at least two lines.
/// * `start_offset`: The number of bytes at the start of `buffer` that were carried over
/// from the previous read and should not be overwritten.
/// * `separator`: The byte that separates lines.
///
/// # Returns
///
/// * The number of bytes in `buffer` that can now be interpreted as lines.
/// The remaining bytes must be copied to the start of the buffer for the next invocation,
/// if another invocation is necessary, which is determined by the other return value.
/// * Whether this function should be called again.
fn read_to_buffer(
file: &mut Box<dyn Read + Send>,
next_files: &mut impl Iterator<Item = Box<dyn Read + Send>>,
buffer: &mut Vec<u8>,
start_offset: usize,
separator: u8,
) -> (usize, bool) {
let mut read_target = &mut buffer[start_offset..];
loop {
match file.read(read_target) {
Ok(0) => {
if read_target.is_empty() {
// chunk is full
let mut sep_iter = memchr_iter(separator, &buffer).rev();
let last_line_end = sep_iter.next();
if sep_iter.next().is_some() {
// We read enough lines.
let end = last_line_end.unwrap();
// We want to include the separator here, because it shouldn't be carried over.
return (end + 1, true);
} else {
// We need to read more lines
let len = buffer.len();
// resize the vector to 10 KB more
buffer.resize(len + 1024 * 10, 0);
read_target = &mut buffer[len..];
}
} else {
// This file is empty.
if let Some(next_file) = next_files.next() {
// There is another file.
*file = next_file;
} else {
// This was the last file.
let leftover_len = read_target.len();
return (buffer.len() - leftover_len, false);
}
}
}
Ok(n) => {
read_target = &mut read_target[n..];
}
Err(e) if e.kind() == ErrorKind::Interrupted => {
// retry
}
Err(e) => {
crash!(1, "{}", e)
}
}
}
}
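
The lifetime trick behind `Chunk::recycle` can be shown in isolation. This is a sketch of the general pattern with `&str` standing in for `Line`, not code from this module: once the vector is emptied, it holds no borrows, so widening its lifetime parameter for reuse is sound.

```rust
/// Take a vector of lines borrowing from some buffer, empty it, and return
/// its allocation with a 'static lifetime so it can be reused for the next
/// buffer without reallocating.
fn recycle<'a>(mut lines: Vec<&'a str>) -> Vec<&'static str> {
    lines.clear();
    // SAFETY: the vector is empty, so no 'a references remain in it.
    unsafe { std::mem::transmute::<Vec<&'a str>, Vec<&'static str>>(lines) }
}

fn main() {
    let first_chunk = String::from("a\nb");
    let lines: Vec<&str> = first_chunk.lines().collect();
    let recycled = recycle(lines);

    // The allocation survives and is reused for the next chunk's lines.
    let second_chunk = String::from("c\nd");
    let mut lines: Vec<&str> = recycled;
    lines.extend(second_chunk.lines());
    assert_eq!(lines, ["c", "d"]);
}
```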

src/uu/sort/src/ext_sort.rs (new file)

@@ -0,0 +1,160 @@
// * This file is part of the uutils coreutils package.
// *
// * (c) Michael Debertol <michael.debertol..AT..gmail.com>
// *
// * For the full copyright and license information, please view the LICENSE
// * file that was distributed with this source code.
//! Sort big files by using files for storing intermediate chunks.
//!
//! Files are read into chunks of memory which are then sorted individually and
//! written to temporary files. There are two threads: One sorter, and one reader/writer.
//! The buffers for the individual chunks are recycled. There are two buffers.
use std::io::{BufWriter, Write};
use std::path::Path;
use std::{
fs::OpenOptions,
io::Read,
sync::mpsc::{Receiver, SyncSender},
thread,
};
use tempdir::TempDir;
use crate::{
chunks::{self, Chunk},
merge::{self, FileMerger},
sort_by, GlobalSettings,
};
/// Wraps a `FileMerger` together with the temporary directory its input files live in.
pub struct ExtSortedMerger<'a> {
pub file_merger: FileMerger<'a>,
// Keep _tmp_dir around, as it is deleted when dropped.
_tmp_dir: TempDir,
}
/// Sort big files by using files for storing intermediate chunks.
///
/// # Returns
///
/// An iterator that merges intermediate files back together.
pub fn ext_sort<'a>(
files: &mut impl Iterator<Item = Box<dyn Read + Send>>,
settings: &'a GlobalSettings,
) -> ExtSortedMerger<'a> {
let tmp_dir = crash_if_err!(1, TempDir::new_in(&settings.tmp_dir, "uutils_sort"));
let (sorted_sender, sorted_receiver) = std::sync::mpsc::sync_channel(1);
let (recycled_sender, recycled_receiver) = std::sync::mpsc::sync_channel(1);
thread::spawn({
let settings = settings.clone();
move || sorter(recycled_receiver, sorted_sender, settings)
});
let chunks_read = reader_writer(
files,
&tmp_dir,
if settings.zero_terminated {
b'\0'
} else {
b'\n'
},
// Heuristically chosen: Dividing by 10 seems to keep our memory usage roughly
// around settings.buffer_size as a whole.
settings.buffer_size / 10,
settings.clone(),
sorted_receiver,
recycled_sender,
);
let files = (0..chunks_read)
.map(|chunk_num| tmp_dir.path().join(chunk_num.to_string()))
.collect::<Vec<_>>();
ExtSortedMerger {
file_merger: merge::merge(&files, settings),
_tmp_dir: tmp_dir,
}
}
/// The function that is executed on the sorter thread.
fn sorter(receiver: Receiver<Chunk>, sender: SyncSender<Chunk>, settings: GlobalSettings) {
while let Ok(mut payload) = receiver.recv() {
payload.with_lines_mut(|lines| sort_by(lines, &settings));
sender.send(payload).unwrap();
}
}
/// The function that is executed on the reader/writer thread.
///
/// # Returns
/// * The number of chunks read.
fn reader_writer(
mut files: impl Iterator<Item = Box<dyn Read + Send>>,
tmp_dir: &TempDir,
separator: u8,
buffer_size: usize,
settings: GlobalSettings,
receiver: Receiver<Chunk>,
sender: SyncSender<Chunk>,
) -> usize {
let mut sender_option = Some(sender);
let mut file = files.next().unwrap();
let mut carry_over = vec![];
// kick things off with two reads
for _ in 0..2 {
chunks::read(
&mut sender_option,
vec![0; buffer_size],
&mut carry_over,
&mut file,
&mut files,
separator,
Vec::new(),
&settings,
)
}
let mut file_number = 0;
loop {
let mut chunk = match receiver.recv() {
Ok(it) => it,
_ => return file_number,
};
write(
&mut chunk,
&tmp_dir.path().join(file_number.to_string()),
separator,
);
let (recycled_lines, recycled_buffer) = chunk.recycle();
file_number += 1;
chunks::read(
&mut sender_option,
recycled_buffer,
&mut carry_over,
&mut file,
&mut files,
separator,
recycled_lines,
&settings,
);
}
}
/// Write the lines in `chunk` to `file`, separated by `separator`.
fn write(chunk: &mut Chunk, file: &Path, separator: u8) {
chunk.with_lines_mut(|lines| {
// Write the lines to the file
let file = crash_if_err!(1, OpenOptions::new().create(true).write(true).open(file));
let mut writer = BufWriter::new(file);
for s in lines.iter() {
crash_if_err!(1, writer.write_all(s.line.as_bytes()));
crash_if_err!(1, writer.write_all(&[separator]));
}
});
}
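
A minimal sketch of the sorter half of this pipeline, with `Vec<String>` standing in for `Chunk` (simplified stand-ins, not this module's types): the sorter owns no I/O and simply sorts whatever the reader/writer hands it, then sends it back for writing.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::thread;

// The sorter thread: receive a chunk, sort it, hand it back for writing.
fn sorter(receiver: Receiver<Vec<String>>, sender: SyncSender<Vec<String>>) {
    while let Ok(mut chunk) = receiver.recv() {
        chunk.sort();
        if sender.send(chunk).is_err() {
            break; // the reader/writer hung up
        }
    }
}

fn main() {
    let (unsorted_sender, unsorted_receiver) = sync_channel(1);
    let (sorted_sender, sorted_receiver) = sync_channel(1);
    thread::spawn(move || sorter(unsorted_receiver, sorted_sender));

    unsorted_sender
        .send(vec!["b".to_string(), "a".to_string()])
        .unwrap();
    assert_eq!(sorted_receiver.recv().unwrap(), ["a", "b"]);
}
```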

src/uu/sort/src/external_sort/LICENSE (deleted)

@@ -1,19 +0,0 @@
Copyright 2018 Battelle Memorial Institute
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
of the Software, and to permit persons to whom the Software is furnished to do
so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

src/uu/sort/src/external_sort/mod.rs (deleted)

@@ -1,93 +0,0 @@
use std::fs::OpenOptions;
use std::io::{BufWriter, Write};
use std::path::Path;
use tempdir::TempDir;
use crate::{file_to_lines_iter, FileMerger};
use super::{GlobalSettings, Line};
/// Iterator that provides sorted `T`s
pub struct ExtSortedIterator<'a> {
file_merger: FileMerger<'a>,
// Keep tmp_dir around, it is deleted when dropped.
_tmp_dir: TempDir,
}
impl<'a> Iterator for ExtSortedIterator<'a> {
type Item = Line;
fn next(&mut self) -> Option<Self::Item> {
self.file_merger.next()
}
}
/// Sort (based on `compare`) the `T`s provided by `unsorted` and return an
/// iterator
///
/// # Panics
///
/// This method can panic due to issues writing intermediate sorted chunks
/// to disk.
pub fn ext_sort(
unsorted: impl Iterator<Item = Line>,
settings: &GlobalSettings,
) -> ExtSortedIterator {
let tmp_dir = crash_if_err!(1, TempDir::new_in(&settings.tmp_dir, "uutils_sort"));
let mut total_read = 0;
let mut chunk = Vec::new();
let mut chunks_read = 0;
let mut file_merger = FileMerger::new(settings);
// make the initial chunks on disk
for seq in unsorted {
let seq_size = seq.estimate_size();
total_read += seq_size;
chunk.push(seq);
if total_read >= settings.buffer_size && chunk.len() >= 2 {
super::sort_by(&mut chunk, &settings);
let file_path = tmp_dir.path().join(chunks_read.to_string());
write_chunk(settings, &file_path, &mut chunk);
chunk.clear();
total_read = 0;
chunks_read += 1;
file_merger.push_file(Box::new(file_to_lines_iter(file_path, settings).unwrap()))
}
}
// write the last chunk
if !chunk.is_empty() {
super::sort_by(&mut chunk, &settings);
let file_path = tmp_dir.path().join(chunks_read.to_string());
write_chunk(
settings,
&tmp_dir.path().join(chunks_read.to_string()),
&mut chunk,
);
file_merger.push_file(Box::new(file_to_lines_iter(file_path, settings).unwrap()));
}
ExtSortedIterator {
file_merger,
_tmp_dir: tmp_dir,
}
}
fn write_chunk(settings: &GlobalSettings, file: &Path, chunk: &mut Vec<Line>) {
let new_file = crash_if_err!(1, OpenOptions::new().create(true).append(true).open(file));
let mut buf_write = BufWriter::new(new_file);
for s in chunk {
crash_if_err!(1, buf_write.write_all(s.line.as_bytes()));
crash_if_err!(
1,
buf_write.write_all(if settings.zero_terminated { "\0" } else { "\n" }.as_bytes(),)
);
}
crash_if_err!(1, buf_write.flush());
}

src/uu/sort/src/merge.rs (new file)

@@ -0,0 +1,223 @@
//! Merge already sorted files.
//!
//! We achieve performance by splitting the work between two threads: one sorts and writes, the other reads and parses.
//! The threads communicate over channels. There's one channel per file in the direction reader -> sorter, but only
//! one channel from the sorter back to the reader. The channels to the sorter are used to send the read chunks.
//! The sorter reads the next chunk from the channel whenever it needs the next chunk after running out of lines
//! from the previous read of the file. The channel back from the sorter to the reader has two purposes: To allow the reader
//! to reuse memory allocations and to tell the reader which file to read from next.
use std::{
cmp::Ordering,
ffi::OsStr,
io::{Read, Write},
iter,
rc::Rc,
sync::mpsc::{channel, sync_channel, Receiver, Sender, SyncSender},
thread,
};
use compare::Compare;
use crate::{
chunks::{self, Chunk},
compare_by, open, GlobalSettings,
};
/// Merge already sorted files.
pub fn merge<'a>(files: &[impl AsRef<OsStr>], settings: &'a GlobalSettings) -> FileMerger<'a> {
let (request_sender, request_receiver) = channel();
let mut reader_files = Vec::with_capacity(files.len());
let mut loaded_receivers = Vec::with_capacity(files.len());
for (file_number, file) in files.iter().filter_map(open).enumerate() {
let (sender, receiver) = sync_channel(2);
loaded_receivers.push(receiver);
reader_files.push(ReaderFile {
file,
sender: Some(sender),
carry_over: vec![],
});
request_sender
.send((file_number, Chunk::new(vec![0; 8 * 1024], |_| Vec::new())))
.unwrap();
}
for file_number in 0..reader_files.len() {
request_sender
.send((file_number, Chunk::new(vec![0; 8 * 1024], |_| Vec::new())))
.unwrap();
}
thread::spawn({
let settings = settings.clone();
move || {
reader(
request_receiver,
&mut reader_files,
&settings,
if settings.zero_terminated {
b'\0'
} else {
b'\n'
},
)
}
});
let mut mergeable_files = vec![];
for (file_number, receiver) in loaded_receivers.into_iter().enumerate() {
mergeable_files.push(MergeableFile {
current_chunk: Rc::new(receiver.recv().unwrap()),
file_number,
line_idx: 0,
receiver,
})
}
FileMerger {
heap: binary_heap_plus::BinaryHeap::from_vec_cmp(
mergeable_files,
FileComparator { settings },
),
request_sender,
prev: None,
}
}
/// The struct on the reader thread representing an input file
struct ReaderFile {
file: Box<dyn Read + Send>,
sender: Option<SyncSender<Chunk>>,
carry_over: Vec<u8>,
}
/// The function running on the reader thread.
fn reader(
recycled_receiver: Receiver<(usize, Chunk)>,
files: &mut [ReaderFile],
settings: &GlobalSettings,
separator: u8,
) {
for (file_idx, chunk) in recycled_receiver.iter() {
let (recycled_lines, recycled_buffer) = chunk.recycle();
let ReaderFile {
file,
sender,
carry_over,
} = &mut files[file_idx];
chunks::read(
sender,
recycled_buffer,
carry_over,
file,
&mut iter::empty(),
separator,
recycled_lines,
settings,
);
}
}
/// The struct on the main thread representing an input file
pub struct MergeableFile {
current_chunk: Rc<Chunk>,
line_idx: usize,
receiver: Receiver<Chunk>,
file_number: usize,
}
/// A struct to keep track of the previous line we encountered.
///
/// This is required for deduplication purposes.
struct PreviousLine {
chunk: Rc<Chunk>,
line_idx: usize,
file_number: usize,
}
/// Merges files together. This is **not** an iterator because of lifetime problems.
pub struct FileMerger<'a> {
heap: binary_heap_plus::BinaryHeap<MergeableFile, FileComparator<'a>>,
request_sender: Sender<(usize, Chunk)>,
prev: Option<PreviousLine>,
}
impl<'a> FileMerger<'a> {
/// Write the merged contents to the output file.
pub fn write_all(&mut self, settings: &GlobalSettings) {
let mut out = settings.out_writer();
while self.write_next(settings, &mut out) {}
}
fn write_next(&mut self, settings: &GlobalSettings, out: &mut impl Write) -> bool {
if let Some(file) = self.heap.peek() {
let prev = self.prev.replace(PreviousLine {
chunk: file.current_chunk.clone(),
line_idx: file.line_idx,
file_number: file.file_number,
});
file.current_chunk.with_lines(|lines| {
let current_line = &lines[file.line_idx];
if settings.unique {
if let Some(prev) = &prev {
let cmp = compare_by(
&prev.chunk.borrow_lines()[prev.line_idx],
current_line,
settings,
);
if cmp == Ordering::Equal {
return;
}
}
}
current_line.print(out, settings);
});
let was_last_line_for_file =
file.current_chunk.borrow_lines().len() == file.line_idx + 1;
if was_last_line_for_file {
if let Ok(next_chunk) = file.receiver.recv() {
let mut file = self.heap.peek_mut().unwrap();
file.current_chunk = Rc::new(next_chunk);
file.line_idx = 0;
} else {
self.heap.pop();
}
} else {
self.heap.peek_mut().unwrap().line_idx += 1;
}
if let Some(prev) = prev {
if let Ok(prev_chunk) = Rc::try_unwrap(prev.chunk) {
self.request_sender
.send((prev.file_number, prev_chunk))
.ok();
}
}
}
!self.heap.is_empty()
}
}
/// Compares files by their current line.
struct FileComparator<'a> {
settings: &'a GlobalSettings,
}
impl<'a> Compare<MergeableFile> for FileComparator<'a> {
fn compare(&self, a: &MergeableFile, b: &MergeableFile) -> Ordering {
let mut cmp = compare_by(
&a.current_chunk.borrow_lines()[a.line_idx],
&b.current_chunk.borrow_lines()[b.line_idx],
self.settings,
);
if cmp == Ordering::Equal {
// To make sorting stable, we need to consider the file number as well,
// as lines from a file with a lower number are to be considered "earlier".
cmp = a.file_number.cmp(&b.file_number);
}
// Our BinaryHeap is a max heap. We use it as a min heap, so we need to reverse the ordering.
cmp.reverse()
}
}
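
Stripped of the threading and buffer recycling, the heap logic above reduces to a classic k-way merge. A standalone sketch using std's `BinaryHeap` with `Reverse` (the real code uses binary-heap-plus with the `FileComparator` above); breaking ties on the input index is what keeps the merge stable:

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

fn kway_merge<'a>(inputs: &[&[&'a str]]) -> Vec<&'a str> {
    let mut heap = BinaryHeap::new();
    // Seed the heap with the first line of every input.
    for (file_number, lines) in inputs.iter().enumerate() {
        if let Some(&first) = lines.first() {
            heap.push(Reverse((first, file_number, 0usize)));
        }
    }
    let mut merged = Vec::new();
    while let Some(Reverse((line, file_number, line_idx))) = heap.pop() {
        merged.push(line);
        // Replace the popped line with the next one from the same input.
        if let Some(&next) = inputs[file_number].get(line_idx + 1) {
            heap.push(Reverse((next, file_number, line_idx + 1)));
        }
    }
    merged
}

fn main() {
    let first: &[&str] = &["a", "c"];
    let second: &[&str] = &["b", "c"];
    // "c" from `first` is emitted before "c" from `second`: stable.
    assert_eq!(kway_merge(&[first, second]), ["a", "b", "c", "c"]);
}
```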

src/uu/sort/src/sort.rs

@@ -15,13 +15,16 @@
#[macro_use]
extern crate uucore;
mod check;
mod chunks;
mod custom_str_cmp;
mod external_sort;
mod ext_sort;
mod merge;
mod numeric_str_cmp;
use clap::{App, Arg};
use custom_str_cmp::custom_str_cmp;
use external_sort::ext_sort;
use ext_sort::ext_sort;
use fnv::FnvHasher;
use itertools::Itertools;
use numeric_str_cmp::{numeric_str_cmp, NumInfo, NumInfoParseSettings};
@@ -30,18 +33,15 @@ use rand::{thread_rng, Rng};
use rayon::prelude::*;
use semver::Version;
use std::cmp::Ordering;
use std::collections::BinaryHeap;
use std::env;
use std::ffi::OsStr;
use std::fs::File;
use std::hash::{Hash, Hasher};
use std::io::{stdin, stdout, BufRead, BufReader, BufWriter, Read, Write};
use std::mem::replace;
use std::ops::Range;
use std::path::Path;
use std::path::PathBuf;
use unicode_width::UnicodeWidthStr;
use uucore::fs::is_stdin_interactive; // for Iterator::dedup()
use uucore::InvalidEncodingHandling;
static NAME: &str = "sort";
@@ -150,6 +150,19 @@ impl GlobalSettings {
};
num_usize * suf_usize
}
fn out_writer(&self) -> BufWriter<Box<dyn Write>> {
match self.outfile {
Some(ref filename) => match File::create(Path::new(&filename)) {
Ok(f) => BufWriter::new(Box::new(f) as Box<dyn Write>),
Err(e) => {
show_error!("{0}: {1}", filename, e.to_string());
panic!("Could not open output file");
}
},
None => BufWriter::new(Box::new(stdout()) as Box<dyn Write>),
}
}
}
impl Default for GlobalSettings {
@@ -205,29 +218,7 @@ impl From<&GlobalSettings> for KeySettings {
}
}
#[derive(Debug, Clone)]
/// Represents the string selected by a FieldSelector.
struct SelectionRange {
range: Range<usize>,
}
impl SelectionRange {
fn new(range: Range<usize>) -> Self {
Self { range }
}
/// Gets the actual string slice represented by this Selection.
fn get_str<'a>(&self, line: &'a str) -> &'a str {
&line[self.range.to_owned()]
}
fn shorten(&mut self, new_range: Range<usize>) {
self.range.end = self.range.start + new_range.end;
self.range.start += new_range.start;
}
}
#[derive(Clone)]
#[derive(Clone, Debug)]
enum NumCache {
AsF64(GeneralF64ParseResult),
WithInfo(NumInfo),
@@ -248,64 +239,53 @@ impl NumCache {
}
}
#[derive(Clone)]
struct Selection {
range: SelectionRange,
#[derive(Clone, Debug)]
struct Selection<'a> {
slice: &'a str,
num_cache: Option<Box<NumCache>>,
}
impl Selection {
/// Gets the actual string slice represented by this Selection.
fn get_str<'a>(&'a self, line: &'a Line) -> &'a str {
self.range.get_str(&line.line)
}
}
type Field = Range<usize>;
#[derive(Clone)]
pub struct Line {
line: Box<str>,
// The common case is not to specify fields. Let's make this fast.
first_selection: Selection,
other_selections: Box<[Selection]>,
#[derive(Clone, Debug)]
pub struct Line<'a> {
line: &'a str,
selections: Box<[Selection<'a>]>,
}
impl Line {
/// Estimate the number of bytes that this Line is occupying
pub fn estimate_size(&self) -> usize {
self.line.len()
+ self.other_selections.len() * std::mem::size_of::<Selection>()
+ std::mem::size_of::<Self>()
}
pub fn new(line: String, settings: &GlobalSettings) -> Self {
impl<'a> Line<'a> {
fn create(string: &'a str, settings: &GlobalSettings) -> Self {
let fields = if settings
.selectors
.iter()
.any(|selector| selector.needs_tokens())
.any(|selector| selector.needs_tokens)
{
// Only tokenize if we will need tokens.
Some(tokenize(&line, settings.separator))
Some(tokenize(string, settings.separator))
} else {
None
};
let mut selectors = settings.selectors.iter();
Line {
line: string,
selections: settings
.selectors
.iter()
.filter(|selector| !selector.is_default_selection)
.map(|selector| selector.get_selection(string, fields.as_deref()))
.collect(),
}
}
let first_selection = selectors
.next()
.unwrap()
.get_selection(&line, fields.as_deref());
let other_selections: Vec<Selection> = selectors
.map(|selector| selector.get_selection(&line, fields.as_deref()))
.collect();
Self {
line: line.into_boxed_str(),
first_selection,
other_selections: other_selections.into_boxed_slice(),
fn print(&self, writer: &mut impl Write, settings: &GlobalSettings) {
if settings.zero_terminated && !settings.debug {
crash_if_err!(1, writer.write_all(self.line.as_bytes()));
crash_if_err!(1, writer.write_all("\0".as_bytes()));
} else if !settings.debug {
crash_if_err!(1, writer.write_all(self.line.as_bytes()));
crash_if_err!(1, writer.write_all("\n".as_bytes()));
} else {
crash_if_err!(1, self.print_debug(settings, writer));
}
}
@@ -314,7 +294,7 @@ impl Line {
fn print_debug(
&self,
settings: &GlobalSettings,
writer: &mut dyn Write,
writer: &mut impl Write,
) -> std::io::Result<()> {
// We do not consider this function performance critical, as debug output is only useful for small files,
// which are not a performance problem in any case. Therefore there aren't any special performance
@@ -575,23 +555,39 @@ struct FieldSelector {
from: KeyPosition,
to: Option<KeyPosition>,
settings: KeySettings,
needs_tokens: bool,
// Whether the selection for each line is going to be the whole line with no NumCache
is_default_selection: bool,
}
impl FieldSelector {
fn needs_tokens(&self) -> bool {
self.from.field != 1 || self.from.char == 0 || self.to.is_some()
fn new(from: KeyPosition, to: Option<KeyPosition>, settings: KeySettings) -> Self {
Self {
is_default_selection: from.field == 1
&& from.char == 1
&& to.is_none()
// TODO: Once our MinRustV is 1.42 or higher, change this to the matches! macro
&& match settings.mode {
SortMode::Numeric | SortMode::GeneralNumeric | SortMode::HumanNumeric => false,
_ => true,
},
needs_tokens: from.field != 1 || from.char == 0 || to.is_some(),
from,
to,
settings,
}
}
/// Get the selection that corresponds to this selector for the line.
/// If needs_fields returned false, tokens may be None.
fn get_selection(&self, line: &str, tokens: Option<&[Field]>) -> Selection {
let mut range = SelectionRange::new(self.get_range(&line, tokens));
fn get_selection<'a>(&self, line: &'a str, tokens: Option<&[Field]>) -> Selection<'a> {
let mut range = &line[self.get_range(&line, tokens)];
let num_cache = if self.settings.mode == SortMode::Numeric
|| self.settings.mode == SortMode::HumanNumeric
{
// Parse NumInfo for this number.
let (info, num_range) = NumInfo::parse(
range.get_str(&line),
range,
NumInfoParseSettings {
accept_si_units: self.settings.mode == SortMode::HumanNumeric,
thousands_separator: Some(THOUSANDS_SEP),
@@ -599,19 +595,21 @@ impl FieldSelector {
},
);
// Shorten the range to what we need to pass to numeric_str_cmp later.
range.shorten(num_range);
range = &range[num_range];
Some(Box::new(NumCache::WithInfo(info)))
} else if self.settings.mode == SortMode::GeneralNumeric {
// Parse this number as f64, as this is the requirement for general numeric sorting.
let str = range.get_str(&line);
Some(Box::new(NumCache::AsF64(general_f64_parse(
&str[get_leading_gen(str)],
&range[get_leading_gen(range)],
))))
} else {
// This is not a numeric sort, so we don't need a NumCache.
None
};
Selection { range, num_cache }
Selection {
slice: range,
num_cache,
}
}
/// Look up the range in the line that corresponds to this selector.
@@ -701,91 +699,6 @@ impl FieldSelector {
}
}
struct MergeableFile<'a> {
lines: Box<dyn Iterator<Item = Line> + 'a>,
current_line: Line,
settings: &'a GlobalSettings,
file_index: usize,
}
// BinaryHeap depends on `Ord`. Note that we want to pop smallest items
// from the heap first, and BinaryHeap.pop() returns the largest, so we
// trick it into the right order by calling reverse() here.
impl<'a> Ord for MergeableFile<'a> {
fn cmp(&self, other: &MergeableFile) -> Ordering {
let comparison = compare_by(&self.current_line, &other.current_line, self.settings);
if comparison == Ordering::Equal {
// If lines are equal, the earlier file takes precedence.
self.file_index.cmp(&other.file_index)
} else {
comparison
}
.reverse()
}
}
impl<'a> PartialOrd for MergeableFile<'a> {
fn partial_cmp(&self, other: &MergeableFile) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl<'a> PartialEq for MergeableFile<'a> {
fn eq(&self, other: &MergeableFile) -> bool {
Ordering::Equal == self.cmp(other)
}
}
impl<'a> Eq for MergeableFile<'a> {}
struct FileMerger<'a> {
heap: BinaryHeap<MergeableFile<'a>>,
settings: &'a GlobalSettings,
}
impl<'a> FileMerger<'a> {
fn new(settings: &'a GlobalSettings) -> FileMerger<'a> {
FileMerger {
heap: BinaryHeap::new(),
settings,
}
}
fn push_file(&mut self, mut lines: Box<dyn Iterator<Item = Line> + 'a>) {
if let Some(next_line) = lines.next() {
let mergeable_file = MergeableFile {
lines,
current_line: next_line,
settings: &self.settings,
file_index: self.heap.len(),
};
self.heap.push(mergeable_file);
}
}
}
impl<'a> Iterator for FileMerger<'a> {
type Item = Line;
fn next(&mut self) -> Option<Line> {
match self.heap.pop() {
Some(mut current) => {
match current.lines.next() {
Some(next_line) => {
let ret = replace(&mut current.current_line, next_line);
self.heap.push(current);
Some(ret)
}
_ => {
// Don't put it back in the heap (it's empty/erroring)
// but its first line is still valid.
Some(current.current_line)
}
}
}
None => None,
}
}
}
fn get_usage() -> String {
format!(
"{0} {1}
@@ -985,7 +898,7 @@ pub fn uumain(args: impl uucore::Args) -> i32 {
let mut files = Vec::new();
for path in &files0_from {
let (reader, _) = open(path.as_str()).expect("Could not read from file specified.");
let reader = open(path.as_str()).expect("Could not read from file specified.");
let buf_reader = BufReader::new(reader);
for line in buf_reader.split(b'\0').flatten() {
files.push(
@@ -1112,11 +1025,7 @@ pub fn uumain(args: impl uucore::Args) -> i32 {
let to = from_to
.next()
.map(|to| KeyPosition::parse(to, 0, &mut key_settings));
let field_selector = FieldSelector {
from,
to,
settings: key_settings,
};
let field_selector = FieldSelector::new(from, to, key_settings);
settings.selectors.push(field_selector);
}
}
@@ -1124,48 +1033,21 @@ pub fn uumain(args: impl uucore::Args) -> i32 {
if !settings.stable || !matches.is_present(OPT_KEY) {
// add a default selector matching the whole line
let key_settings = KeySettings::from(&settings);
settings.selectors.push(FieldSelector {
from: KeyPosition {
settings.selectors.push(FieldSelector::new(
KeyPosition {
field: 1,
char: 1,
ignore_blanks: key_settings.ignore_blanks,
},
to: None,
settings: key_settings,
});
None,
key_settings,
));
}
exec(files, settings)
exec(&files, &settings)
}
fn file_to_lines_iter(
file: impl AsRef<OsStr>,
settings: &'_ GlobalSettings,
) -> Option<impl Iterator<Item = Line> + '_> {
let (reader, _) = match open(file) {
Some(x) => x,
None => return None,
};
let buf_reader = BufReader::new(reader);
Some(
buf_reader
.split(if settings.zero_terminated {
b'\0'
} else {
b'\n'
})
.map(move |line| {
Line::new(
crash_if_err!(1, String::from_utf8(crash_if_err!(1, line))),
settings,
)
}),
)
}
fn output_sorted_lines(iter: impl Iterator<Item = Line>, settings: &GlobalSettings) {
fn output_sorted_lines<'a>(iter: impl Iterator<Item = Line<'a>>, settings: &GlobalSettings) {
if settings.unique {
print_sorted(
iter.dedup_by(|a, b| compare_by(a, b, &settings) == Ordering::Equal),
@@ -1176,87 +1058,48 @@ fn output_sorted_lines(iter: impl Iterator<Item = Line>, settings: &GlobalSettin
}
}
fn exec(files: Vec<String>, settings: GlobalSettings) -> i32 {
fn exec(files: &[String], settings: &GlobalSettings) -> i32 {
if settings.merge {
let mut file_merger = FileMerger::new(&settings);
for lines in files
.iter()
.filter_map(|file| file_to_lines_iter(file, &settings))
{
file_merger.push_file(Box::new(lines));
let mut file_merger = merge::merge(files, settings);
file_merger.write_all(settings);
} else if settings.check {
if files.len() > 1 {
crash!(1, "only one file allowed with -c");
}
output_sorted_lines(file_merger, &settings);
return check::check(files.first().unwrap(), settings);
} else if settings.ext_sort {
let mut lines = files.iter().filter_map(open);
let mut sorted = ext_sort(&mut lines, &settings);
sorted.file_merger.write_all(settings);
} else {
let lines = files
.iter()
.filter_map(|file| file_to_lines_iter(file, &settings))
.flatten();
let separator = if settings.zero_terminated { '\0' } else { '\n' };
let mut lines = vec![];
let mut full_string = String::new();
if settings.check {
return exec_check_file(lines, &settings);
}
for mut file in files.iter().filter_map(open) {
crash_if_err!(1, file.read_to_string(&mut full_string));
// Only use ext_sorter when we need to.
// Probably faster that we don't create
// an owned value each run
if settings.ext_sort {
let sorted_lines = ext_sort(lines, &settings);
output_sorted_lines(sorted_lines, &settings);
} else {
let mut lines = vec![];
// This is duplicated from fn file_to_lines_iter, but using that function directly results in a performance regression.
for (file, _) in files.iter().map(open).flatten() {
let buf_reader = BufReader::new(file);
for line in buf_reader.split(if settings.zero_terminated {
b'\0'
} else {
b'\n'
}) {
let string = crash_if_err!(1, String::from_utf8(crash_if_err!(1, line)));
lines.push(Line::new(string, &settings));
}
if !full_string.ends_with(separator) {
full_string.push(separator);
}
sort_by(&mut lines, &settings);
output_sorted_lines(lines.into_iter(), &settings);
}
}
if full_string.ends_with(separator) {
full_string.pop();
}
for line in full_string.split(if settings.zero_terminated { '\0' } else { '\n' }) {
lines.push(Line::create(line, &settings));
}
sort_by(&mut lines, &settings);
output_sorted_lines(lines.into_iter(), &settings);
}
0
}
fn exec_check_file(unwrapped_lines: impl Iterator<Item = Line>, settings: &GlobalSettings) -> i32 {
// errors yields the line before each disorder,
// plus the last line (quirk of .coalesce())
let mut errors = unwrapped_lines
.enumerate()
.coalesce(|(last_i, last_line), (i, line)| {
if compare_by(&last_line, &line, &settings) == Ordering::Greater {
Err(((last_i, last_line), (i, line)))
} else {
Ok((i, line))
}
});
if let Some((first_error_index, _line)) = errors.next() {
// Check for a second "error", as .coalesce() always returns the last
// line, no matter what our merging function does.
if let Some(_last_line_or_next_error) = errors.next() {
if !settings.check_silent {
println!("sort: disorder in line {}", first_error_index);
};
1
} else {
// first "error" was actually the last line.
0
}
} else {
// unwrapped_lines was empty. Empty files are defined to be sorted.
0
}
}
fn sort_by(unsorted: &mut Vec<Line>, settings: &GlobalSettings) {
fn sort_by<'a>(unsorted: &mut Vec<Line<'a>>, settings: &GlobalSettings) {
if settings.stable || settings.unique {
unsorted.par_sort_by(|a, b| compare_by(a, b, &settings))
} else {
@@ -1264,19 +1107,39 @@ fn sort_by(unsorted: &mut Vec<Line>, settings: &GlobalSettings) {
}
}
fn compare_by(a: &Line, b: &Line, global_settings: &GlobalSettings) -> Ordering {
for (idx, selector) in global_settings.selectors.iter().enumerate() {
let (a_selection, b_selection) = if idx == 0 {
(&a.first_selection, &b.first_selection)
fn compare_by<'a>(a: &Line<'a>, b: &Line<'a>, global_settings: &GlobalSettings) -> Ordering {
let mut idx = 0;
for selector in &global_settings.selectors {
let mut _selections = None;
let (a_selection, b_selection) = if selector.is_default_selection {
// We can select the whole line.
// We have to store the selections outside of the if-block so that they live long enough.
_selections = Some((
Selection {
slice: a.line,
num_cache: None,
},
Selection {
slice: b.line,
num_cache: None,
},
));
// Unwrap the selections again, and return references to them.
(
&_selections.as_ref().unwrap().0,
&_selections.as_ref().unwrap().1,
)
} else {
(&a.other_selections[idx - 1], &b.other_selections[idx - 1])
let selections = (&a.selections[idx], &b.selections[idx]);
idx += 1;
selections
};
let a_str = a_selection.get_str(a);
let b_str = b_selection.get_str(b);
let a_str = a_selection.slice;
let b_str = b_selection.slice;
let settings = &selector.settings;
let cmp: Ordering = if settings.random {
random_shuffle(a_str, b_str, global_settings.salt.clone())
random_shuffle(a_str, b_str, &global_settings.salt)
} else {
match settings.mode {
SortMode::Numeric | SortMode::HumanNumeric => numeric_str_cmp(
@ -1307,7 +1170,7 @@ fn compare_by(a: &Line, b: &Line, global_settings: &GlobalSettings) -> Ordering
let cmp = if global_settings.random || global_settings.stable || global_settings.unique {
Ordering::Equal
} else {
a.line.cmp(&b.line)
a.line.cmp(b.line)
};
if global_settings.reverse {
@@ -1362,7 +1225,7 @@ fn get_leading_gen(input: &str) -> Range<usize> {
leading_whitespace_len..input.len()
}
#[derive(Copy, Clone, PartialEq, PartialOrd)]
#[derive(Copy, Clone, PartialEq, PartialOrd, Debug)]
enum GeneralF64ParseResult {
Invalid,
NaN,
@@ -1408,12 +1271,11 @@ fn get_hash<T: Hash>(t: &T) -> u64 {
s.finish()
}
fn random_shuffle(a: &str, b: &str, x: String) -> Ordering {
fn random_shuffle(a: &str, b: &str, salt: &str) -> Ordering {
#![allow(clippy::comparison_chain)]
let salt_slice = x.as_str();
let da = get_hash(&[a, salt_slice].concat());
let db = get_hash(&[b, salt_slice].concat());
let da = get_hash(&[a, salt].concat());
let db = get_hash(&[b, salt].concat());
da.cmp(&db)
}
@@ -1504,45 +1366,23 @@ fn version_compare(a: &str, b: &str) -> Ordering {
}
}
fn print_sorted<T: Iterator<Item = Line>>(iter: T, settings: &GlobalSettings) {
let mut file: Box<dyn Write> = match settings.outfile {
Some(ref filename) => match File::create(Path::new(&filename)) {
Ok(f) => Box::new(BufWriter::new(f)) as Box<dyn Write>,
Err(e) => {
show_error!("{0}: {1}", filename, e.to_string());
panic!("Could not open output file");
}
},
None => Box::new(BufWriter::new(stdout())) as Box<dyn Write>,
};
if settings.zero_terminated && !settings.debug {
for line in iter {
crash_if_err!(1, file.write_all(line.line.as_bytes()));
crash_if_err!(1, file.write_all("\0".as_bytes()));
}
} else {
for line in iter {
if !settings.debug {
crash_if_err!(1, file.write_all(line.line.as_bytes()));
crash_if_err!(1, file.write_all("\n".as_bytes()));
} else {
crash_if_err!(1, line.print_debug(settings, &mut file));
}
}
fn print_sorted<'a, T: Iterator<Item = Line<'a>>>(iter: T, settings: &GlobalSettings) {
let mut writer = settings.out_writer();
for line in iter {
line.print(&mut writer, settings);
}
crash_if_err!(1, file.flush());
}
// from cat.rs
fn open(path: impl AsRef<OsStr>) -> Option<(Box<dyn Read>, bool)> {
fn open(path: impl AsRef<OsStr>) -> Option<Box<dyn Read + Send>> {
let path = path.as_ref();
if path == "-" {
let stdin = stdin();
return Some((Box::new(stdin) as Box<dyn Read>, is_stdin_interactive()));
return Some(Box::new(stdin) as Box<dyn Read + Send>);
}
match File::open(Path::new(path)) {
Ok(f) => Some((Box::new(f) as Box<dyn Read>, false)),
Ok(f) => Some(Box::new(f) as Box<dyn Read + Send>),
Err(e) => {
show_error!("{0:?}: {1}", path, e.to_string());
None
@@ -1568,7 +1408,7 @@ mod tests {
let b = "Ted";
let c = get_rand_string();
assert_eq!(Ordering::Equal, random_shuffle(a, b, c));
assert_eq!(Ordering::Equal, random_shuffle(a, b, &c));
}
#[test]
@@ -1592,7 +1432,7 @@ mod tests {
let b = "9";
let c = get_rand_string();
assert_eq!(Ordering::Equal, random_shuffle(a, b, c));
assert_eq!(Ordering::Equal, random_shuffle(a, b, &c));
}
#[test]
@@ -1631,10 +1471,12 @@ fn test_line_size() {
fn test_line_size() {
// We should make sure to not regress the size of the Line struct because
// it is unconditional overhead for every line we sort.
assert_eq!(std::mem::size_of::<Line>(), 56);
assert_eq!(std::mem::size_of::<Line>(), 32);
// These are the fields of Line:
assert_eq!(std::mem::size_of::<Box<str>>(), 16);
assert_eq!(std::mem::size_of::<Selection>(), 24);
assert_eq!(std::mem::size_of::<&str>(), 16);
assert_eq!(std::mem::size_of::<Box<[Selection]>>(), 16);
// How big is a selection? Constant cost all lines pay when we need selections.
assert_eq!(std::mem::size_of::<Selection>(), 24);
}
}
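
The in-memory path of the new `exec` boils down to: read the whole input once, collect borrowed line slices, sort the slices. A standalone sketch of that fast path (hypothetical file name, newline-separated input, std's sort in place of the crate's `compare_by` chain):

```rust
use std::fs;

fn main() -> std::io::Result<()> {
    // One read, one allocation for the whole input.
    let full_string = fs::read_to_string("input.txt")?;
    // Lines are slices into full_string; no per-line allocation.
    let mut lines: Vec<&str> = full_string.lines().collect();
    lines.sort_unstable();
    for line in &lines {
        println!("{}", line);
    }
    Ok(())
}
```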

tests/by-util/test_sort.rs

@@ -122,7 +122,7 @@ fn test_check_zero_terminated_failure() {
.arg("-c")
.arg("zero-terminated.txt")
.fails()
.stdout_is("sort: disorder in line 0\n");
.stdout_is("sort: zero-terminated.txt:2: disorder: ../../fixtures/du\n");
}
#[test]
@@ -621,7 +621,7 @@ fn test_check() {
.arg("-c")
.arg("check_fail.txt")
.fails()
.stdout_is("sort: disorder in line 4\n");
.stdout_is("sort: check_fail.txt:6: disorder: 5\n");
new_ucmd!()
.arg("-c")