From 93e54884202723255395664ac2f0926f42f09ecd Mon Sep 17 00:00:00 2001 From: Tavian Barnes Date: Mon, 24 Oct 2022 11:38:20 -0400 Subject: [PATCH 1/4] walk: Simplify the code in scan() a bit --- src/walk.rs | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/src/walk.rs b/src/walk.rs index 243eef9..7998282 100644 --- a/src/walk.rs +++ b/src/walk.rs @@ -49,14 +49,11 @@ pub const DEFAULT_MAX_BUFFER_TIME: Duration = Duration::from_millis(100); /// If the `--exec` argument was supplied, this will create a thread pool for executing /// jobs in parallel from a given command line and the discovered paths. Otherwise, each /// path will simply be written to standard output. -pub fn scan(path_vec: &[PathBuf], pattern: Arc, config: Arc) -> Result { - let mut path_iter = path_vec.iter(); - let first_path_buf = path_iter - .next() - .expect("Error: Path vector can not be empty"); +pub fn scan(paths: &[PathBuf], pattern: Arc, config: Arc) -> Result { + let first_path = &paths[0]; let (tx, rx) = channel(); - let mut override_builder = OverrideBuilder::new(first_path_buf.as_path()); + let mut override_builder = OverrideBuilder::new(first_path); for pattern in &config.exclude_patterns { override_builder @@ -67,7 +64,7 @@ pub fn scan(path_vec: &[PathBuf], pattern: Arc, config: Arc) -> R .build() .map_err(|_| anyhow!("Mismatch in exclude patterns"))?; - let mut walker = WalkBuilder::new(first_path_buf.as_path()); + let mut walker = WalkBuilder::new(first_path); walker .hidden(config.ignore_hidden) .ignore(config.read_fdignore) @@ -121,8 +118,8 @@ pub fn scan(path_vec: &[PathBuf], pattern: Arc, config: Arc) -> R } } - for path_entry in path_iter { - walker.add(path_entry.as_path()); + for path in &paths[1..] { + walker.add(path); } let parallel_walker = walker.threads(config.threads).build_parallel(); From 5bb7a52704175ad0d1a545646c96bd72cd1f4c58 Mon Sep 17 00:00:00 2001 From: Tavian Barnes Date: Mon, 24 Oct 2022 10:17:02 -0400 Subject: [PATCH 2/4] walk: Switch back to crossbeam-channel Fixes #933. Fixes #1060. Fixes #1113. --- Cargo.lock | 11 +++++++++++ Cargo.toml | 1 + src/exec/job.rs | 12 ++++-------- src/walk.rs | 14 ++++---------- 4 files changed, 20 insertions(+), 18 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e856d0d..0b21f8e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -162,6 +162,16 @@ version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5827cebf4670468b8772dd191856768aedcb1b0278a04f989f7766351917b9dc" +[[package]] +name = "crossbeam-channel" +version = "0.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c2dd04ddaf88237dc3b8d8f9a3c1004b506b54b3313403944054d23c0870c521" +dependencies = [ + "cfg-if", + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.12" @@ -295,6 +305,7 @@ dependencies = [ "chrono", "clap", "clap_complete", + "crossbeam-channel", "ctrlc", "diff", "dirs-next", diff --git a/Cargo.toml b/Cargo.toml index 5f627fd..fad68d5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -51,6 +51,7 @@ dirs-next = "2.0" normpath = "0.3.2" chrono = "0.4" once_cell = "1.15.0" +crossbeam-channel = "0.5.6" [dependencies.clap] version = "3.1" diff --git a/src/exec/job.rs b/src/exec/job.rs index b803f79..1617ea7 100644 --- a/src/exec/job.rs +++ b/src/exec/job.rs @@ -1,6 +1,7 @@ -use std::sync::mpsc::Receiver; use std::sync::{Arc, Mutex}; +use crossbeam_channel::Receiver; + use crate::config::Config; use crate::dir_entry::DirEntry; use crate::error::print_error; @@ -13,7 +14,7 @@ use super::CommandSet; /// generate a command with the supplied command template. The generated command will then /// be executed, and this process will continue until the receiver's sender has closed. pub fn job( - rx: Arc>>, + rx: Receiver, cmd: Arc, out_perm: Arc>, config: &Config, @@ -23,12 +24,9 @@ pub fn job( let mut results: Vec = Vec::new(); loop { - // Create a lock on the shared receiver for this thread. - let lock = rx.lock().unwrap(); - // Obtain the next result from the receiver, else if the channel // has closed, exit from the loop - let dir_entry: DirEntry = match lock.recv() { + let dir_entry: DirEntry = match rx.recv() { Ok(WorkerResult::Entry(dir_entry)) => dir_entry, Ok(WorkerResult::Error(err)) => { if config.show_filesystem_errors { @@ -39,8 +37,6 @@ pub fn job( Err(_) => break, }; - // Drop the lock so that other threads can read from the receiver. - drop(lock); // Generate a command, execute it and store its exit code. results.push(cmd.execute( dir_entry.stripped_path(config), diff --git a/src/walk.rs b/src/walk.rs index 7998282..ef618ab 100644 --- a/src/walk.rs +++ b/src/walk.rs @@ -3,13 +3,13 @@ use std::io; use std::mem; use std::path::PathBuf; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender}; use std::sync::{Arc, Mutex}; use std::thread; use std::time::{Duration, Instant}; use std::{borrow::Cow, io::Write}; use anyhow::{anyhow, Result}; +use crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender}; use ignore::overrides::OverrideBuilder; use ignore::{self, WalkBuilder}; use regex::bytes::Regex; @@ -51,7 +51,7 @@ pub const DEFAULT_MAX_BUFFER_TIME: Duration = Duration::from_millis(100); /// path will simply be written to standard output. pub fn scan(paths: &[PathBuf], pattern: Arc, config: Arc) -> Result { let first_path = &paths[0]; - let (tx, rx) = channel(); + let (tx, rx) = unbounded(); let mut override_builder = OverrideBuilder::new(first_path); @@ -222,11 +222,7 @@ impl ReceiverBuffer { match self.mode { ReceiverMode::Buffering => { // Wait at most until we should switch to streaming - let now = Instant::now(); - self.deadline - .checked_duration_since(now) - .ok_or(RecvTimeoutError::Timeout) - .and_then(|t| self.rx.recv_timeout(t)) + self.rx.recv_deadline(self.deadline) } ReceiverMode::Streaming => { // Wait however long it takes for a result @@ -345,15 +341,13 @@ fn spawn_receiver( if cmd.in_batch_mode() { exec::batch(rx, cmd, &config) } else { - let shared_rx = Arc::new(Mutex::new(rx)); - let out_perm = Arc::new(Mutex::new(())); // Each spawned job will store it's thread handle in here. let mut handles = Vec::with_capacity(threads); for _ in 0..threads { let config = Arc::clone(&config); - let rx = Arc::clone(&shared_rx); + let rx = rx.clone(); let cmd = Arc::clone(cmd); let out_perm = Arc::clone(&out_perm); From 527840526396341f027b17f4d51eeb5ecdf8eacb Mon Sep 17 00:00:00 2001 From: Tavian Barnes Date: Mon, 24 Oct 2022 10:20:46 -0400 Subject: [PATCH 3/4] walk: Use a bounded queue. Fixes #918. --- src/walk.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/walk.rs b/src/walk.rs index ef618ab..b5b6650 100644 --- a/src/walk.rs +++ b/src/walk.rs @@ -9,7 +9,7 @@ use std::time::{Duration, Instant}; use std::{borrow::Cow, io::Write}; use anyhow::{anyhow, Result}; -use crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender}; +use crossbeam_channel::{bounded, Receiver, RecvTimeoutError, Sender}; use ignore::overrides::OverrideBuilder; use ignore::{self, WalkBuilder}; use regex::bytes::Regex; @@ -51,7 +51,9 @@ pub const DEFAULT_MAX_BUFFER_TIME: Duration = Duration::from_millis(100); /// path will simply be written to standard output. pub fn scan(paths: &[PathBuf], pattern: Arc, config: Arc) -> Result { let first_path = &paths[0]; - let (tx, rx) = unbounded(); + + // Channel capacity was chosen empircally to perform similarly to an unbounded channel + let (tx, rx) = bounded(0x4000 * config.threads); let mut override_builder = OverrideBuilder::new(first_path); From c6f9595a02750c4015df1781d313c80630608cfb Mon Sep 17 00:00:00 2001 From: David Peter Date: Tue, 1 Nov 2022 19:57:11 +0100 Subject: [PATCH 4/4] Update CHANGELOG --- CHANGELOG.md | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0d3e34c..02858db 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,10 @@ had before 8.3.0, i.e. there will be no leading `./` prefixes, unless `--exec`/`-x`, `--exec-batch`/`-X`, or `--print0`/`-0` are used. `--strip-cwd-prefix` can be used to strip that prefix in those cases. See #1046, #1115, and #1121 (@tavianator) +- `fd` could previously crash with a panic due to a race condition in Rusts standard library + (see https://github.com/rust-lang/rust/issues/39364). This has been fixed by switching to a different + message passing implementation, see #1060 and #1146 (@tavianator) +- `fd`s memory usage will not grow unboundedly on huge directory trees, see #1146 (@tavianator) - fd returns an error when current working directory does not exist while a search path is specified, see #1072 (@vijfhoek) - Improved "command not found" error message, see #1083 and #1109 (@themkat) @@ -17,9 +21,9 @@ ## Changes - No leading `./` prefix for non-interactive results, see above. +- fd now colorizes paths in parallel, significantly improving performance, see #1148 (@tavianator) - fd can now avoid `stat` syscalls even when colorizing paths, as long as the color scheme doesn't require metadata, see #1148 (@tavianator) -- fd now colorizes paths in parallel, significantly improving performance, see #1148 (@tavianator) ## Other