diff --git a/CHANGELOG.md b/CHANGELOG.md index 0d3e34c..02858db 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,10 @@ had before 8.3.0, i.e. there will be no leading `./` prefixes, unless `--exec`/`-x`, `--exec-batch`/`-X`, or `--print0`/`-0` are used. `--strip-cwd-prefix` can be used to strip that prefix in those cases. See #1046, #1115, and #1121 (@tavianator) +- `fd` could previously crash with a panic due to a race condition in Rust's standard library + (see https://github.com/rust-lang/rust/issues/39364). This has been fixed by switching to a different + message passing implementation, see #1060 and #1146 (@tavianator) +- `fd`'s memory usage will not grow unboundedly on huge directory trees, see #1146 (@tavianator) - fd returns an error when current working directory does not exist while a search path is specified, see #1072 (@vijfhoek) - Improved "command not found" error message, see #1083 and #1109 (@themkat) @@ -17,9 +21,9 @@ ## Changes - No leading `./` prefix for non-interactive results, see above. 
+- fd now colorizes paths in parallel, significantly improving performance, see #1148 (@tavianator) - fd can now avoid `stat` syscalls even when colorizing paths, as long as the color scheme doesn't require metadata, see #1148 (@tavianator) -- fd now colorizes paths in parallel, significantly improving performance, see #1148 (@tavianator) ## Other diff --git a/Cargo.lock b/Cargo.lock index d539b6b..80c9b24 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -174,6 +174,16 @@ version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5827cebf4670468b8772dd191856768aedcb1b0278a04f989f7766351917b9dc" +[[package]] +name = "crossbeam-channel" +version = "0.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c2dd04ddaf88237dc3b8d8f9a3c1004b506b54b3313403944054d23c0870c521" +dependencies = [ + "cfg-if", + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.12" @@ -307,6 +317,7 @@ dependencies = [ "chrono", "clap", "clap_complete", + "crossbeam-channel", "ctrlc", "diff", "dirs-next", diff --git a/Cargo.toml b/Cargo.toml index e23fc49..a75a005 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -49,6 +49,7 @@ dirs-next = "2.0" normpath = "0.3.2" chrono = "0.4" once_cell = "1.15.0" +crossbeam-channel = "0.5.6" clap_complete = {version = "4.0", optional = true} [dependencies.clap] diff --git a/src/exec/job.rs b/src/exec/job.rs index 36d52e4..b2ea0a4 100644 --- a/src/exec/job.rs +++ b/src/exec/job.rs @@ -1,6 +1,7 @@ -use std::sync::mpsc::Receiver; use std::sync::{Arc, Mutex}; +use crossbeam_channel::Receiver; + use crate::config::Config; use crate::dir_entry::DirEntry; use crate::error::print_error; @@ -13,7 +14,7 @@ use super::CommandSet; /// generate a command with the supplied command template. The generated command will then /// be executed, and this process will continue until the receiver's sender has closed. 
pub fn job( - rx: Arc>>, + rx: Receiver, cmd: Arc, out_perm: Arc>, config: &Config, @@ -23,12 +24,9 @@ pub fn job( let mut results: Vec = Vec::new(); loop { - // Create a lock on the shared receiver for this thread. - let lock = rx.lock().unwrap(); - // Obtain the next result from the receiver, else if the channel // has closed, exit from the loop - let dir_entry: DirEntry = match lock.recv() { + let dir_entry: DirEntry = match rx.recv() { Ok(WorkerResult::Entry(dir_entry)) => dir_entry, Ok(WorkerResult::Error(err)) => { if config.show_filesystem_errors { @@ -39,8 +37,6 @@ pub fn job( Err(_) => break, }; - // Drop the lock so that other threads can read from the receiver. - drop(lock); // Generate a command, execute it and store its exit code. results.push(cmd.execute( dir_entry.stripped_path(config), diff --git a/src/walk.rs b/src/walk.rs index b1931e3..280fbed 100644 --- a/src/walk.rs +++ b/src/walk.rs @@ -3,13 +3,13 @@ use std::io; use std::mem; use std::path::PathBuf; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender}; use std::sync::{Arc, Mutex}; use std::thread; use std::time::{Duration, Instant}; use std::{borrow::Cow, io::Write}; use anyhow::{anyhow, Result}; +use crossbeam_channel::{bounded, Receiver, RecvTimeoutError, Sender}; use ignore::overrides::OverrideBuilder; use ignore::{self, WalkBuilder}; use regex::bytes::Regex; @@ -49,14 +49,13 @@ pub const DEFAULT_MAX_BUFFER_TIME: Duration = Duration::from_millis(100); /// If the `--exec` argument was supplied, this will create a thread pool for executing /// jobs in parallel from a given command line and the discovered paths. Otherwise, each /// path will simply be written to standard output. 
-pub fn scan(path_vec: &[PathBuf], pattern: Arc, config: Arc) -> Result { - let mut path_iter = path_vec.iter(); - let first_path_buf = path_iter - .next() - .expect("Error: Path vector can not be empty"); - let (tx, rx) = channel(); +pub fn scan(paths: &[PathBuf], pattern: Arc, config: Arc) -> Result { + let first_path = &paths[0]; - let mut override_builder = OverrideBuilder::new(first_path_buf.as_path()); + // Channel capacity was chosen empirically to perform similarly to an unbounded channel + let (tx, rx) = bounded(0x4000 * config.threads); + + let mut override_builder = OverrideBuilder::new(first_path); for pattern in &config.exclude_patterns { override_builder @@ -67,7 +66,7 @@ pub fn scan(path_vec: &[PathBuf], pattern: Arc, config: Arc) -> R .build() .map_err(|_| anyhow!("Mismatch in exclude patterns"))?; - let mut walker = WalkBuilder::new(first_path_buf.as_path()); + let mut walker = WalkBuilder::new(first_path); walker .hidden(config.ignore_hidden) .ignore(config.read_fdignore) @@ -121,8 +120,8 @@ pub fn scan(path_vec: &[PathBuf], pattern: Arc, config: Arc) -> R } } - for path_entry in path_iter { - walker.add(path_entry.as_path()); + for path in &paths[1..] { + walker.add(path); } let parallel_walker = walker.threads(config.threads).build_parallel(); @@ -225,11 +224,7 @@ impl ReceiverBuffer { match self.mode { ReceiverMode::Buffering => { // Wait at most until we should switch to streaming - let now = Instant::now(); - self.deadline - .checked_duration_since(now) - .ok_or(RecvTimeoutError::Timeout) - .and_then(|t| self.rx.recv_timeout(t)) + self.rx.recv_deadline(self.deadline) } ReceiverMode::Streaming => { // Wait however long it takes for a result @@ -348,15 +343,13 @@ fn spawn_receiver( if cmd.in_batch_mode() { exec::batch(rx, cmd, &config) } else { - let shared_rx = Arc::new(Mutex::new(rx)); - let out_perm = Arc::new(Mutex::new(())); // Each spawned job will store it's thread handle in here. 
let mut handles = Vec::with_capacity(threads); for _ in 0..threads { let config = Arc::clone(&config); - let rx = Arc::clone(&shared_rx); + let rx = rx.clone(); let cmd = Arc::clone(cmd); let out_perm = Arc::clone(&out_perm);