mirror of
https://github.com/rust-lang/cargo
synced 2024-09-13 21:11:44 +00:00
Replace std::sync::mpsc
with a much simpler queue
We don't need the complexity of most channels since this is not a performance sensitive part of Cargo, nor is it likely to be so any time soon. Coupled with recent bugs (#7840) we believe in `std::sync::mpsc`, let's just not use that and use a custom queue type locally which should be amenable to a blocking push soon too.
This commit is contained in:
parent
43aafb416f
commit
458138bf02
|
@ -23,7 +23,6 @@ atty = "0.2"
|
|||
bytesize = "1.0"
|
||||
cargo-platform = { path = "crates/cargo-platform", version = "0.1.1" }
|
||||
crates-io = { path = "crates/crates-io", version = "0.31" }
|
||||
crossbeam-channel = "0.4"
|
||||
crossbeam-utils = "0.7"
|
||||
crypto-hash = "0.3.1"
|
||||
curl = { version = "0.4.23", features = ["http2"] }
|
||||
|
|
|
@ -58,7 +58,6 @@ use std::sync::Arc;
|
|||
use std::time::Duration;
|
||||
|
||||
use anyhow::format_err;
|
||||
use crossbeam_channel::{unbounded, Receiver, Sender};
|
||||
use crossbeam_utils::thread::Scope;
|
||||
use jobserver::{Acquired, Client, HelperThread};
|
||||
use log::{debug, info, trace};
|
||||
|
@ -73,6 +72,7 @@ use super::{BuildContext, BuildPlan, CompileMode, Context, Unit};
|
|||
use crate::core::{PackageId, TargetKind};
|
||||
use crate::util;
|
||||
use crate::util::diagnostic_server::{self, DiagnosticPrinter};
|
||||
use crate::util::Queue;
|
||||
use crate::util::{internal, profile, CargoResult, CargoResultExt, ProcessBuilder};
|
||||
use crate::util::{Config, DependencyQueue};
|
||||
use crate::util::{Progress, ProgressStyle};
|
||||
|
@ -98,8 +98,7 @@ struct DrainState<'a, 'cfg> {
|
|||
total_units: usize,
|
||||
|
||||
queue: DependencyQueue<Unit<'a>, Artifact, Job>,
|
||||
tx: Sender<Message>,
|
||||
rx: Receiver<Message>,
|
||||
messages: Arc<Queue<Message>>,
|
||||
active: HashMap<JobId, Unit<'a>>,
|
||||
compiled: HashSet<PackageId>,
|
||||
documented: HashSet<PackageId>,
|
||||
|
@ -145,7 +144,7 @@ impl std::fmt::Display for JobId {
|
|||
|
||||
pub struct JobState<'a> {
|
||||
/// Channel back to the main thread to coordinate messages and such.
|
||||
tx: Sender<Message>,
|
||||
messages: Arc<Queue<Message>>,
|
||||
|
||||
/// The job id that this state is associated with, used when sending
|
||||
/// messages back to the main thread.
|
||||
|
@ -199,7 +198,7 @@ enum Message {
|
|||
|
||||
impl<'a> JobState<'a> {
|
||||
pub fn running(&self, cmd: &ProcessBuilder) {
|
||||
let _ = self.tx.send(Message::Run(self.id, cmd.to_string()));
|
||||
self.messages.push(Message::Run(self.id, cmd.to_string()));
|
||||
}
|
||||
|
||||
pub fn build_plan(
|
||||
|
@ -208,17 +207,16 @@ impl<'a> JobState<'a> {
|
|||
cmd: ProcessBuilder,
|
||||
filenames: Arc<Vec<OutputFile>>,
|
||||
) {
|
||||
let _ = self
|
||||
.tx
|
||||
.send(Message::BuildPlanMsg(module_name, cmd, filenames));
|
||||
self.messages
|
||||
.push(Message::BuildPlanMsg(module_name, cmd, filenames));
|
||||
}
|
||||
|
||||
pub fn stdout(&self, stdout: String) {
|
||||
drop(self.tx.send(Message::Stdout(stdout)));
|
||||
self.messages.push(Message::Stdout(stdout));
|
||||
}
|
||||
|
||||
pub fn stderr(&self, stderr: String) {
|
||||
drop(self.tx.send(Message::Stderr(stderr)));
|
||||
self.messages.push(Message::Stderr(stderr));
|
||||
}
|
||||
|
||||
/// A method used to signal to the coordinator thread that the rmeta file
|
||||
|
@ -228,9 +226,8 @@ impl<'a> JobState<'a> {
|
|||
/// produced once!
|
||||
pub fn rmeta_produced(&self) {
|
||||
self.rmeta_required.set(false);
|
||||
let _ = self
|
||||
.tx
|
||||
.send(Message::Finish(self.id, Artifact::Metadata, Ok(())));
|
||||
self.messages
|
||||
.push(Message::Finish(self.id, Artifact::Metadata, Ok(())));
|
||||
}
|
||||
|
||||
/// The rustc underlying this Job is about to acquire a jobserver token (i.e., block)
|
||||
|
@ -239,14 +236,14 @@ impl<'a> JobState<'a> {
|
|||
/// This should arrange for the associated client to eventually get a token via
|
||||
/// `client.release_raw()`.
|
||||
pub fn will_acquire(&self) {
|
||||
let _ = self.tx.send(Message::NeedsToken(self.id));
|
||||
self.messages.push(Message::NeedsToken(self.id));
|
||||
}
|
||||
|
||||
/// The rustc underlying this Job is informing us that it is done with a jobserver token.
|
||||
///
|
||||
/// Note that it does *not* write that token back anywhere.
|
||||
pub fn release_token(&self) {
|
||||
let _ = self.tx.send(Message::ReleaseToken(self.id));
|
||||
self.messages.push(Message::ReleaseToken(self.id));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -340,13 +337,11 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> {
|
|||
let _p = profile::start("executing the job graph");
|
||||
self.queue.queue_finished();
|
||||
|
||||
let (tx, rx) = unbounded();
|
||||
let progress = Progress::with_style("Building", ProgressStyle::Ratio, cx.bcx.config);
|
||||
let state = DrainState {
|
||||
total_units: self.queue.len(),
|
||||
queue: self.queue,
|
||||
tx,
|
||||
rx,
|
||||
messages: Arc::new(Queue::new()),
|
||||
active: HashMap::new(),
|
||||
compiled: HashSet::new(),
|
||||
documented: HashSet::new(),
|
||||
|
@ -354,7 +349,6 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> {
|
|||
progress,
|
||||
next_id: 0,
|
||||
timings: self.timings,
|
||||
|
||||
tokens: Vec::new(),
|
||||
rustc_tokens: HashMap::new(),
|
||||
to_send_clients: BTreeMap::new(),
|
||||
|
@ -364,25 +358,25 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> {
|
|||
};
|
||||
|
||||
// Create a helper thread for acquiring jobserver tokens
|
||||
let tx = state.tx.clone();
|
||||
let messages = state.messages.clone();
|
||||
let helper = cx
|
||||
.jobserver
|
||||
.clone()
|
||||
.into_helper_thread(move |token| {
|
||||
drop(tx.send(Message::Token(token)));
|
||||
drop(messages.push(Message::Token(token)));
|
||||
})
|
||||
.chain_err(|| "failed to create helper thread for jobserver management")?;
|
||||
|
||||
// Create a helper thread to manage the diagnostics for rustfix if
|
||||
// necessary.
|
||||
let tx = state.tx.clone();
|
||||
let messages = state.messages.clone();
|
||||
let _diagnostic_server = cx
|
||||
.bcx
|
||||
.build_config
|
||||
.rustfix_diagnostic_server
|
||||
.borrow_mut()
|
||||
.take()
|
||||
.map(move |srv| srv.start(move |msg| drop(tx.send(Message::FixDiagnostic(msg)))));
|
||||
.map(move |srv| srv.start(move |msg| drop(messages.push(Message::FixDiagnostic(msg)))));
|
||||
|
||||
crossbeam_utils::thread::scope(move |scope| state.drain_the_queue(cx, plan, scope, &helper))
|
||||
.expect("child threads shouldn't panic")
|
||||
|
@ -584,7 +578,10 @@ impl<'a, 'cfg> DrainState<'a, 'cfg> {
|
|||
// to run above to calculate CPU usage over time. To do this we
|
||||
// listen for a message with a timeout, and on timeout we run the
|
||||
// previous parts of the loop again.
|
||||
let events: Vec<_> = self.rx.try_iter().collect();
|
||||
let mut events = Vec::new();
|
||||
while let Some(event) = self.messages.try_pop() {
|
||||
events.push(event);
|
||||
}
|
||||
info!(
|
||||
"tokens in use: {}, rustc_tokens: {:?}, waiting_rustcs: {:?} (events this tick: {})",
|
||||
self.tokens.len(),
|
||||
|
@ -602,14 +599,16 @@ impl<'a, 'cfg> DrainState<'a, 'cfg> {
|
|||
loop {
|
||||
self.tick_progress();
|
||||
self.tokens.truncate(self.active.len() - 1);
|
||||
match self.rx.recv_timeout(Duration::from_millis(500)) {
|
||||
Ok(message) => break vec![message],
|
||||
Err(_) => continue,
|
||||
match self.messages.pop(Duration::from_millis(500)) {
|
||||
Some(message) => {
|
||||
events.push(message);
|
||||
break;
|
||||
}
|
||||
None => continue,
|
||||
}
|
||||
}
|
||||
} else {
|
||||
events
|
||||
}
|
||||
return events;
|
||||
}
|
||||
|
||||
fn drain_the_queue(
|
||||
|
@ -756,7 +755,7 @@ impl<'a, 'cfg> DrainState<'a, 'cfg> {
|
|||
assert!(self.active.insert(id, *unit).is_none());
|
||||
*self.counts.get_mut(&unit.pkg.package_id()).unwrap() -= 1;
|
||||
|
||||
let my_tx = self.tx.clone();
|
||||
let messages = self.messages.clone();
|
||||
let fresh = job.freshness();
|
||||
let rmeta_required = cx.rmeta_required(unit);
|
||||
|
||||
|
@ -768,13 +767,13 @@ impl<'a, 'cfg> DrainState<'a, 'cfg> {
|
|||
let doit = move || {
|
||||
let state = JobState {
|
||||
id,
|
||||
tx: my_tx.clone(),
|
||||
messages: messages.clone(),
|
||||
rmeta_required: Cell::new(rmeta_required),
|
||||
_marker: marker::PhantomData,
|
||||
};
|
||||
|
||||
let mut sender = FinishOnDrop {
|
||||
tx: &my_tx,
|
||||
messages: &messages,
|
||||
id,
|
||||
result: Err(format_err!("worker panicked")),
|
||||
};
|
||||
|
@ -793,9 +792,7 @@ impl<'a, 'cfg> DrainState<'a, 'cfg> {
|
|||
// we need to make sure that the metadata is flagged as produced so
|
||||
// send a synthetic message here.
|
||||
if state.rmeta_required.get() && sender.result.is_ok() {
|
||||
my_tx
|
||||
.send(Message::Finish(id, Artifact::Metadata, Ok(())))
|
||||
.unwrap();
|
||||
messages.push(Message::Finish(id, Artifact::Metadata, Ok(())));
|
||||
}
|
||||
|
||||
// Use a helper struct with a `Drop` implementation to guarantee
|
||||
|
@ -803,7 +800,7 @@ impl<'a, 'cfg> DrainState<'a, 'cfg> {
|
|||
// shouldn't panic unless there's a bug in Cargo, so we just need
|
||||
// to make sure nothing hangs by accident.
|
||||
struct FinishOnDrop<'a> {
|
||||
tx: &'a Sender<Message>,
|
||||
messages: &'a Queue<Message>,
|
||||
id: JobId,
|
||||
result: CargoResult<()>,
|
||||
}
|
||||
|
@ -811,7 +808,8 @@ impl<'a, 'cfg> DrainState<'a, 'cfg> {
|
|||
impl Drop for FinishOnDrop<'_> {
|
||||
fn drop(&mut self) {
|
||||
let msg = mem::replace(&mut self.result, Ok(()));
|
||||
drop(self.tx.send(Message::Finish(self.id, Artifact::All, msg)));
|
||||
self.messages
|
||||
.push(Message::Finish(self.id, Artifact::All, msg));
|
||||
}
|
||||
}
|
||||
};
|
||||
|
|
|
@ -18,6 +18,7 @@ pub use self::paths::{bytes2path, dylib_path, join_paths, path2bytes};
|
|||
pub use self::paths::{dylib_path_envvar, normalize_path};
|
||||
pub use self::process_builder::{process, ProcessBuilder};
|
||||
pub use self::progress::{Progress, ProgressStyle};
|
||||
pub use self::queue::Queue;
|
||||
pub use self::read2::read2;
|
||||
pub use self::restricted_names::validate_package_name;
|
||||
pub use self::rustc::Rustc;
|
||||
|
@ -51,6 +52,7 @@ pub mod paths;
|
|||
pub mod process_builder;
|
||||
pub mod profile;
|
||||
mod progress;
|
||||
mod queue;
|
||||
mod read2;
|
||||
pub mod restricted_names;
|
||||
pub mod rustc;
|
||||
|
|
54
src/cargo/util/queue.rs
Normal file
54
src/cargo/util/queue.rs
Normal file
|
@ -0,0 +1,54 @@
|
|||
use std::collections::VecDeque;
|
||||
use std::sync::{Condvar, Mutex};
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
/// A simple, threadsafe, queue of items of type `T`
|
||||
///
|
||||
/// This is a sort of channel where any thread can push to a queue and any
|
||||
/// thread can pop from a queue. Currently queues have infinite capacity where
|
||||
/// `push` will never block but `pop` will block.
|
||||
pub struct Queue<T> {
|
||||
state: Mutex<State<T>>,
|
||||
condvar: Condvar,
|
||||
}
|
||||
|
||||
struct State<T> {
|
||||
items: VecDeque<T>,
|
||||
}
|
||||
|
||||
impl<T> Queue<T> {
|
||||
pub fn new() -> Queue<T> {
|
||||
Queue {
|
||||
state: Mutex::new(State {
|
||||
items: VecDeque::new(),
|
||||
}),
|
||||
condvar: Condvar::new(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn push(&self, item: T) {
|
||||
self.state.lock().unwrap().items.push_back(item);
|
||||
self.condvar.notify_one();
|
||||
}
|
||||
|
||||
pub fn pop(&self, timeout: Duration) -> Option<T> {
|
||||
let mut state = self.state.lock().unwrap();
|
||||
let now = Instant::now();
|
||||
while state.items.is_empty() {
|
||||
let elapsed = now.elapsed();
|
||||
if elapsed >= timeout {
|
||||
break;
|
||||
}
|
||||
let (lock, result) = self.condvar.wait_timeout(state, timeout - elapsed).unwrap();
|
||||
state = lock;
|
||||
if result.timed_out() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
state.items.pop_front()
|
||||
}
|
||||
|
||||
pub fn try_pop(&self) -> Option<T> {
|
||||
self.state.lock().unwrap().items.pop_front()
|
||||
}
|
||||
}
|
Loading…
Reference in a new issue