Auto merge of #11758 - weihanglo:jobqueue, r=epage

refactor(job_queue): docs and move types around
bors 2023-02-22 23:04:16 +00:00
commit 9d5b32f503
6 changed files with 325 additions and 259 deletions

View file

@@ -1,5 +1,4 @@
use super::job::{Job, Work};
use super::{fingerprint, Context, LinkType, Unit};
use super::{fingerprint, Context, Job, LinkType, Unit, Work};
use crate::core::compiler::artifact;
use crate::core::compiler::context::Metadata;
use crate::core::compiler::job_queue::JobState;

View file

@@ -374,8 +374,7 @@ use crate::util::{internal, path_args, profile, StableHasher};
use crate::{Config, CARGO_ENV};
use super::custom_build::BuildDeps;
use super::job::{Job, Work};
use super::{BuildContext, Context, FileFlavor, Unit};
use super::{BuildContext, Context, FileFlavor, Job, Unit, Work};
pub use dirty_reason::DirtyReason;

View file

@@ -1,7 +1,9 @@
//! See [`Job`] and [`Work`].
use std::fmt;
use std::mem;
use super::job_queue::JobState;
use super::JobState;
use crate::core::compiler::fingerprint::DirtyReason;
use crate::util::CargoResult;

View file

@@ -0,0 +1,213 @@
//! See [`JobState`].
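//!
//! The subtlest part of this type is [`JobState::run_to_finish`], which uses
//! a drop guard so that a finish message is reported even if the job panics.
//! A self-contained illustration of that technique (not cargo code; the real
//! guard sends [`Message::Finish`] instead of printing):
//!
//! ```
//! struct FinishGuard {
//!     result: Option<Result<(), String>>,
//! }
//!
//! impl Drop for FinishGuard {
//!     fn drop(&mut self) {
//!         // This runs during unwinding too, so a panicking job still
//!         // reports a result.
//!         let result = self
//!             .result
//!             .take()
//!             .unwrap_or_else(|| Err("worker panicked".to_string()));
//!         println!("finish: {result:?}");
//!     }
//! }
//!
//! let mut guard = FinishGuard { result: None };
//! guard.result = Some(Ok(())); // only reached if the work did not panic
//! ```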
use std::{cell::Cell, io::Write, marker, sync::Arc};
use cargo_util::ProcessBuilder;
use crate::core::compiler::context::OutputFile;
use crate::core::compiler::future_incompat::FutureBreakageItem;
use crate::util::Queue;
use crate::CargoResult;
use super::{Artifact, DiagDedupe, Job, JobId, Message};
/// A `JobState` is constructed by `JobQueue::run` and passed to `Job::run`. It includes everything
/// necessary to communicate between the main thread and the execution of the job.
///
/// The job may execute on either a dedicated thread or the main thread. If the job executes on the
/// main thread, the `output` field must be set to prevent a deadlock.
pub struct JobState<'a, 'cfg> {
/// Channel back to the main thread to coordinate messages and such.
///
/// When the `output` field is `Some`, care must be taken to avoid calling `push_bounded` on
/// the message queue to prevent a deadlock.
messages: Arc<Queue<Message>>,
/// Normally output is sent to the job queue with backpressure. When the job is fresh,
/// however, we need to immediately display the output to prevent a deadlock as the
/// output messages are processed on the same thread as they are sent from. `output`
/// defines where to output in this case.
///
/// Currently the `Shell` inside `Config` is wrapped in a `RefCell` and thus can't be passed
/// between threads. This means that it isn't possible for multiple output messages to be
/// interleaved. In the future, it may be wrapped in a `Mutex` instead. In this case
/// interleaving is still prevented as the lock would be held for the whole printing of an
/// output message.
output: Option<&'a DiagDedupe<'cfg>>,
/// The job id that this state is associated with, used when sending
/// messages back to the main thread.
id: JobId,
/// Whether or not we're expected to have a call to `rmeta_produced`. Once
/// that method is called this is dynamically set to `false` to prevent
/// sending a double message later on.
rmeta_required: Cell<bool>,
// Historical versions of Cargo made use of the `'a` argument here, so to
// leave the door open to future refactorings keep it here.
_marker: marker::PhantomData<&'a ()>,
}
impl<'a, 'cfg> JobState<'a, 'cfg> {
pub(super) fn new(
id: JobId,
messages: Arc<Queue<Message>>,
output: Option<&'a DiagDedupe<'cfg>>,
rmeta_required: bool,
) -> Self {
Self {
id,
messages,
output,
rmeta_required: Cell::new(rmeta_required),
_marker: marker::PhantomData,
}
}
pub fn running(&self, cmd: &ProcessBuilder) {
self.messages.push(Message::Run(self.id, cmd.to_string()));
}
pub fn build_plan(
&self,
module_name: String,
cmd: ProcessBuilder,
filenames: Arc<Vec<OutputFile>>,
) {
self.messages
.push(Message::BuildPlanMsg(module_name, cmd, filenames));
}
pub fn stdout(&self, stdout: String) -> CargoResult<()> {
if let Some(dedupe) = self.output {
writeln!(dedupe.config.shell().out(), "{}", stdout)?;
} else {
self.messages.push_bounded(Message::Stdout(stdout));
}
Ok(())
}
pub fn stderr(&self, stderr: String) -> CargoResult<()> {
if let Some(dedupe) = self.output {
let mut shell = dedupe.config.shell();
shell.print_ansi_stderr(stderr.as_bytes())?;
shell.err().write_all(b"\n")?;
} else {
self.messages.push_bounded(Message::Stderr(stderr));
}
Ok(())
}
/// See [`Message::Diagnostic`] and [`Message::WarningCount`].
pub fn emit_diag(&self, level: String, diag: String, fixable: bool) -> CargoResult<()> {
if let Some(dedupe) = self.output {
let emitted = dedupe.emit_diag(&diag)?;
if level == "warning" {
self.messages.push(Message::WarningCount {
id: self.id,
emitted,
fixable,
});
}
} else {
self.messages.push_bounded(Message::Diagnostic {
id: self.id,
level,
diag,
fixable,
});
}
Ok(())
}
/// See [`Message::Warning`].
pub fn warning(&self, warning: String) -> CargoResult<()> {
self.messages.push_bounded(Message::Warning {
id: self.id,
warning,
});
Ok(())
}
/// A method used to signal to the coordinator thread that the rmeta file
/// for an rlib has been produced. This is only called for some rmeta
/// builds when required, and can be called at any time before a job ends.
/// This should only be called once because a metadata file can only be
/// produced once!
pub fn rmeta_produced(&self) {
self.rmeta_required.set(false);
self.messages
.push(Message::Finish(self.id, Artifact::Metadata, Ok(())));
}
/// Drives a [`Job`] to finish. This ensures that a [`Message::Finish`] is
/// sent even if our job panics.
pub(super) fn run_to_finish(self, job: Job) {
let mut sender = FinishOnDrop {
messages: &self.messages,
id: self.id,
result: None,
};
sender.result = Some(job.run(&self));
// If the `rmeta_required` wasn't consumed but it was set
// previously, then we either have:
//
// 1. The `job` didn't do anything because it was "fresh".
// 2. The `job` returned an error and didn't reach the point where
// it called `rmeta_produced`.
// 3. We forgot to call `rmeta_produced` and there's a bug in Cargo.
//
// Ruling out the third, the other two are pretty common. For 2
// we'll just naturally abort the compilation operation, but for 1
// we need to make sure that the metadata is flagged as produced,
// so we send a synthetic message here.
if self.rmeta_required.get() && sender.result.as_ref().unwrap().is_ok() {
self.messages
.push(Message::Finish(self.id, Artifact::Metadata, Ok(())));
}
// Use a helper struct with a `Drop` implementation to guarantee
// that a `Finish` message is sent even if our job panics. We
// shouldn't panic unless there's a bug in Cargo, so we just need
// to make sure nothing hangs by accident.
struct FinishOnDrop<'a> {
messages: &'a Queue<Message>,
id: JobId,
result: Option<CargoResult<()>>,
}
impl Drop for FinishOnDrop<'_> {
fn drop(&mut self) {
let result = self
.result
.take()
.unwrap_or_else(|| Err(anyhow::format_err!("worker panicked")));
self.messages
.push(Message::Finish(self.id, Artifact::All, result));
}
}
}
pub fn future_incompat_report(&self, report: Vec<FutureBreakageItem>) {
self.messages
.push(Message::FutureIncompatReport(self.id, report));
}
/// The rustc underlying this Job is about to acquire a jobserver token (i.e., block)
/// on the passed client.
///
/// This should arrange for the associated client to eventually get a token via
/// `client.release_raw()`.
pub fn will_acquire(&self) {
self.messages.push(Message::NeedsToken(self.id));
}
/// The rustc underlying this Job is informing us that it is done with a jobserver token.
///
/// Note that it does *not* write that token back anywhere.
pub fn release_token(&self) {
self.messages.push(Message::ReleaseToken(self.id));
}
}
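// A sketch of how a job's `Work` closure typically drives a `JobState`
// (hypothetical closure body; real jobs are constructed elsewhere, e.g. for
// rustc invocations and build-script runs):
//
//     let work = Work::new(move |state| {
//         state.running(&cmd);  // report the command being spawned
//         state.stdout(line)?;  // bounded send; may apply backpressure
//         Ok(())
//     });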

View file

@@ -1,16 +1,39 @@
//! This module implements the job queue which determines the ordering in which
//! rustc is spawned off. It also manages the allocation of jobserver tokens to
//! rustc beyond the implicit token each rustc owns (i.e., the ones used for
//! parallel LLVM work and parallel rustc threads).
//! Management of the interaction between the main `cargo` and all spawned jobs.
//!
//! ## Overview
//!
//! This module implements a job queue. A job here represents a unit of work,
//! which is roughly a rustc invocation, a build script run, or just a no-op.
//! The job queue primarily handles the following things:
//!
//! * Spawns concurrent jobs. Depending on its [`Freshness`], a job could be
//! either executed on a spawned thread or run on the same thread to avoid
//! the threading overhead.
//! * Controls the degree of concurrency. It allocates and manages [`jobserver`]
//! tokens to each spawned off rustc and build scripts.
//! * Manages the communication between the main `cargo` process and its
//! spawned jobs. Those [`Message`]s are sent over a [`Queue`] shared
//! across threads.
//! * Schedules the execution order of each [`Job`]. Priorities are determined
//! when calling [`JobQueue::enqueue`] to enqueue a job. The scheduling is
//! relatively rudimentary and could likely be improved.
//!
//! A rough outline of building a queue and executing jobs is:
//!
//! 1. [`JobQueue::new`] to create a new queue.
//! 2. [`JobQueue::enqueue`] to add new jobs onto the queue.
//! 3. [`JobQueue::execute`] to consume the queue and execute all jobs.
//!
//! The primary loop happens inside [`JobQueue::execute`], which is effectively
//! [`DrainState::drain_the_queue`]. [`DrainState`] is, as its name suggests,
//! the running state of the job queue as it is being drained.
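//!
//! In code, that outline looks roughly like this (an illustrative sketch;
//! see each method's documentation for the exact signatures):
//!
//! ```ignore
//! let mut queue = JobQueue::new(bcx);
//! queue.enqueue(cx, unit, job)?;  // repeated once per unit of work
//! queue.execute(cx, &mut plan)?;  // consumes the queue and drains it
//! ```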
//!
//! ## Jobserver
//!
//! Cargo and rustc have a somewhat non-trivial jobserver relationship with each
//! other, which is due to scaling issues with sharing a single jobserver
//! amongst what is potentially hundreds of threads of work on many-cored
//! systems on (at least) linux, and likely other platforms as well.
//!
//! The details of this algorithm are (also) written out in
//! src/librustc_jobserver/lib.rs. What follows is a description focusing on the
//! Cargo side of things.
//! systems on (at least) Linux, and likely other platforms as well.
//!
//! Cargo wants to complete the build as quickly as possible, fully saturating
//! all cores (as constrained by the `-j=N` parameter). Cargo also must not spawn
@@ -23,37 +46,86 @@
//! implement prioritizes spawning as many crates (i.e., rustc processes) as
//! possible, and then filling each rustc with tokens on demand.
//!
//! The primary loop is in `drain_the_queue` below.
//!
//! We integrate with the jobserver, originating from GNU make, to make sure
//! We integrate with the [jobserver], originating from GNU make, to make sure
//! that build scripts which use make to build C code can cooperate with us on
//! the number of used tokens and avoid overfilling the system we're on.
//!
//! The jobserver is unfortunately a very simple protocol, so we enhance it a
//! little when we know that there is a rustc on the other end. Via the stderr
//! pipe we have to rustc, we get messages such as "NeedsToken" and
//! "ReleaseToken" from rustc.
//! pipe we have to rustc, we get messages such as `NeedsToken` and
//! `ReleaseToken` from rustc.
//!
//! "NeedsToken" indicates that a rustc is interested in acquiring a token, but
//! never that it would be impossible to make progress without one (i.e., it
//! would be incorrect for rustc to not terminate due to an unfulfilled
//! NeedsToken request); we do not usually fulfill all NeedsToken requests for a
//! [`NeedsToken`] indicates that a rustc is interested in acquiring a token,
//! but never that it would be impossible to make progress without one (i.e.,
//! it would be incorrect for rustc to not terminate due to an unfulfilled
//! `NeedsToken` request); we do not usually fulfill all `NeedsToken` requests for a
//! given rustc.
//!
//! "ReleaseToken" indicates that a rustc is done with one of its tokens and is
//! ready for us to re-acquire ownership -- we will either release that token
//! [`ReleaseToken`] indicates that a rustc is done with one of its tokens and
//! is ready for us to re-acquire ownership -- we will either release that token
//! back into the general pool or reuse it ourselves. Note that rustc will
//! inform us that it is releasing a token even if it itself is also requesting
//! tokens; is is up to us whether to return the token to that same rustc.
//! tokens; it is up to us whether to return the token to that same rustc.
//!
//! The current scheduling algorithm is relatively primitive and could likely be
//! improved.
//! `jobserver` also manages the allocation of tokens to rustc beyond
//! the implicit token each rustc owns (i.e., the ones used for parallel LLVM
//! work and parallel rustc threads).
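//!
//! In terms of the types in this module, the token handshake for a single
//! rustc looks roughly like this (a simplified sketch that elides the
//! jobserver helper thread and error handling; `state` is the job's
//! [`JobState`]):
//!
//! ```ignore
//! // rustc sent `NeedsToken` over its stderr pipe:
//! state.will_acquire();  // pushes Message::NeedsToken(id) to the main thread
//! // ...the main loop acquires a token and forwards it to that rustc...
//!
//! // rustc sent `ReleaseToken`:
//! state.release_token(); // pushes Message::ReleaseToken(id); the main loop
//!                        // returns the token to the pool or reuses it
//! ```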
//!
//! ## Scheduling
//!
//! The current scheduling algorithm is not really polished. It is simply based
//! on a dependency graph, [`DependencyQueue`]. We continue adding nodes onto
//! the graph until it is finalized. Upon finalization, the queue computes, for
//! each node, its own cost plus the sum of the costs of all of its
//! dependencies, including transitive ones; that total becomes the effective
//! cost of the node.
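//!
//! A tiny worked example of that rule (hypothetical units, each with a
//! placeholder cost of 10): if unit A depends on B, and B depends on C, then
//! the effective costs come out as cost(C) = 10, cost(B) = 10 + 10 = 20, and
//! cost(A) = 10 + 10 + 10 = 30, counting A's whole dependency set {B, C}.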
//!
//! For the time being, the cost is just passed as a fixed placeholder in
//! [`JobQueue::enqueue`]. In the future, we could explore more possibilities
//! around it. For instance, we could persist timing information for each
//! build somewhere, then look into that historical data on subsequent builds
//! and perform a PGO-like optimization to prioritize jobs, making the build
//! fully pipelined.
//!
//! ## Message queue
//!
//! Each spawned thread running a process uses the message queue [`Queue`] to
//! send messages back to the main thread (the one running `cargo`).
//! The main thread coordinates everything, and handles printing output.
//!
//! It is important to be careful which messages use [`push`] vs [`push_bounded`].
//! `push` is for priority messages (like tokens, or "finished") where the
//! sender shouldn't block. We want to handle those so real work can proceed
//! ASAP.
//!
//! `push_bounded` is only for messages being printed to stdout/stderr. Being
//! bounded prevents a flood of messages causing a large amount of memory
//! being used.
//!
//! `push` also avoids blocking which helps avoid deadlocks. For example, when
//! the diagnostic server thread is dropped, it waits for the thread to exit.
//! But if the thread is blocked on a full queue, and there is a critical
//! error, the drop will deadlock. This should be fixed at some point in the
//! future. The jobserver thread has a similar problem, though it will time
//! out after 1 second.
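//!
//! Schematically, the convention looks like this (a sketch reusing message
//! constructors that appear in this module):
//!
//! ```ignore
//! // Priority message: the sender must never block.
//! messages.push(Message::Finish(id, Artifact::All, result));
//! // Console output: bounded, so a flood of output applies backpressure.
//! messages.push_bounded(Message::Stdout(stdout));
//! ```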
//!
//! To access the message queue, each running `Job` is given its own [`JobState`],
//! containing everything it needs to communicate with the main thread.
//!
//! See [`Message`] for all available message kinds.
//!
//! [jobserver]: https://docs.rs/jobserver
//! [`NeedsToken`]: Message::NeedsToken
//! [`ReleaseToken`]: Message::ReleaseToken
//! [`push`]: Queue::push
//! [`push_bounded`]: Queue::push_bounded
use std::cell::{Cell, RefCell};
mod job;
mod job_state;
use std::cell::RefCell;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::fmt::Write as _;
use std::io;
use std::marker;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::thread::{self, Scope};
@@ -65,11 +137,10 @@ use jobserver::{Acquired, Client, HelperThread};
use log::{debug, trace};
use semver::Version;
pub use self::job::Freshness::{self, Dirty, Fresh};
pub use self::job::{Job, Work};
pub use self::job_state::JobState;
use super::context::OutputFile;
use super::job::{
Freshness::{self, Dirty, Fresh},
Job,
};
use super::timings::Timings;
use super::{BuildContext, BuildPlan, CompileMode, Context, Unit};
use crate::core::compiler::future_incompat::{
@@ -100,28 +171,6 @@ pub struct JobQueue<'cfg> {
///
/// It is created from JobQueue when we have fully assembled the crate graph
/// (i.e., all package dependencies are known).
///
/// # Message queue
///
/// Each thread running a process uses the message queue to send messages back
/// to the main thread. The main thread coordinates everything, and handles
/// printing output.
///
/// It is important to be careful which messages use `push` vs `push_bounded`.
/// `push` is for priority messages (like tokens, or "finished") where the
/// sender shouldn't block. We want to handle those so real work can proceed
/// ASAP.
///
/// `push_bounded` is only for messages being printed to stdout/stderr. Being
/// bounded prevents a flood of messages causing a large amount of memory
/// being used.
///
/// `push` also avoids blocking which helps avoid deadlocks. For example, when
/// the diagnostic server thread is dropped, it waits for the thread to exit.
/// But if the thread is blocked on a full queue, and there is a critical
/// error, the drop will deadlock. This should be fixed at some point in the
/// future. The jobserver thread has a similar problem, though it will time
/// out after 1 second.
struct DrainState<'cfg> {
// This is the length of the DependencyQueue when starting out
total_units: usize,
@@ -256,44 +305,6 @@ impl std::fmt::Display for JobId {
}
}
/// A `JobState` is constructed by `JobQueue::run` and passed to `Job::run`. It includes everything
/// necessary to communicate between the main thread and the execution of the job.
///
/// The job may execute on either a dedicated thread or the main thread. If the job executes on the
/// main thread, the `output` field must be set to prevent a deadlock.
pub struct JobState<'a, 'cfg> {
/// Channel back to the main thread to coordinate messages and such.
///
/// When the `output` field is `Some`, care must be taken to avoid calling `push_bounded` on
/// the message queue to prevent a deadlock.
messages: Arc<Queue<Message>>,
/// Normally output is sent to the job queue with backpressure. When the job is fresh,
/// however, we need to immediately display the output to prevent a deadlock as the
/// output messages are processed on the same thread as they are sent from. `output`
/// defines where to output in this case.
///
/// Currently the `Shell` inside `Config` is wrapped in a `RefCell` and thus can't be passed
/// between threads. This means that it isn't possible for multiple output messages to be
/// interleaved. In the future, it may be wrapped in a `Mutex` instead. In this case
/// interleaving is still prevented as the lock would be held for the whole printing of an
/// output message.
output: Option<&'a DiagDedupe<'cfg>>,
/// The job id that this state is associated with, used when sending
/// messages back to the main thread.
id: JobId,
/// Whether or not we're expected to have a call to `rmeta_produced`. Once
/// that method is called this is dynamically set to `false` to prevent
/// sending a double message later on.
rmeta_required: Cell<bool>,
// Historical versions of Cargo made use of the `'a` argument here, so to
// leave the door open to future refactorings keep it here.
_marker: marker::PhantomData<&'a ()>,
}
/// Handler for deduplicating diagnostics.
struct DiagDedupe<'cfg> {
seen: RefCell<HashSet<u64>>,
@@ -384,105 +395,6 @@ enum Message {
ReleaseToken(JobId),
}
impl<'a, 'cfg> JobState<'a, 'cfg> {
pub fn running(&self, cmd: &ProcessBuilder) {
self.messages.push(Message::Run(self.id, cmd.to_string()));
}
pub fn build_plan(
&self,
module_name: String,
cmd: ProcessBuilder,
filenames: Arc<Vec<OutputFile>>,
) {
self.messages
.push(Message::BuildPlanMsg(module_name, cmd, filenames));
}
pub fn stdout(&self, stdout: String) -> CargoResult<()> {
if let Some(dedupe) = self.output {
writeln!(dedupe.config.shell().out(), "{}", stdout)?;
} else {
self.messages.push_bounded(Message::Stdout(stdout));
}
Ok(())
}
pub fn stderr(&self, stderr: String) -> CargoResult<()> {
if let Some(dedupe) = self.output {
let mut shell = dedupe.config.shell();
shell.print_ansi_stderr(stderr.as_bytes())?;
shell.err().write_all(b"\n")?;
} else {
self.messages.push_bounded(Message::Stderr(stderr));
}
Ok(())
}
/// See [`Message::Diagnostic`] and [`Message::WarningCount`].
pub fn emit_diag(&self, level: String, diag: String, fixable: bool) -> CargoResult<()> {
if let Some(dedupe) = self.output {
let emitted = dedupe.emit_diag(&diag)?;
if level == "warning" {
self.messages.push(Message::WarningCount {
id: self.id,
emitted,
fixable,
});
}
} else {
self.messages.push_bounded(Message::Diagnostic {
id: self.id,
level,
diag,
fixable,
});
}
Ok(())
}
/// See [`Message::Warning`].
pub fn warning(&self, warning: String) -> CargoResult<()> {
self.messages.push_bounded(Message::Warning {
id: self.id,
warning,
});
Ok(())
}
/// A method used to signal to the coordinator thread that the rmeta file
/// for an rlib has been produced. This is only called for some rmeta
/// builds when required, and can be called at any time before a job ends.
/// This should only be called once because a metadata file can only be
/// produced once!
pub fn rmeta_produced(&self) {
self.rmeta_required.set(false);
self.messages
.push(Message::Finish(self.id, Artifact::Metadata, Ok(())));
}
pub fn future_incompat_report(&self, report: Vec<FutureBreakageItem>) {
self.messages
.push(Message::FutureIncompatReport(self.id, report));
}
/// The rustc underlying this Job is about to acquire a jobserver token (i.e., block)
/// on the passed client.
///
/// This should arrange for the associated client to eventually get a token via
/// `client.release_raw()`.
pub fn will_acquire(&self) {
self.messages.push(Message::NeedsToken(self.id));
}
/// The rustc underlying this Job is informing us that it is done with a jobserver token.
///
/// Note that it does *not* write that token back anywhere.
pub fn release_token(&self) {
self.messages.push(Message::ReleaseToken(self.id));
}
}
impl<'cfg> JobQueue<'cfg> {
pub fn new(bcx: &BuildContext<'_, 'cfg>) -> JobQueue<'cfg> {
JobQueue {
@@ -1119,52 +1031,9 @@ impl<'cfg> DrainState<'cfg> {
let is_fresh = job.freshness().is_fresh();
let rmeta_required = cx.rmeta_required(unit);
let doit = move |state: JobState<'_, '_>| {
let mut sender = FinishOnDrop {
messages: &state.messages,
id,
result: None,
};
sender.result = Some(job.run(&state));
// If the `rmeta_required` wasn't consumed but it was set
// previously, then we either have:
//
// 1. The `job` didn't do anything because it was "fresh".
// 2. The `job` returned an error and didn't reach the point where
// it called `rmeta_produced`.
// 3. We forgot to call `rmeta_produced` and there's a bug in Cargo.
//
// Ruling out the third, the other two are pretty common. For 2
// we'll just naturally abort the compilation operation, but for 1
// we need to make sure that the metadata is flagged as produced,
// so we send a synthetic message here.
if state.rmeta_required.get() && sender.result.as_ref().unwrap().is_ok() {
state
.messages
.push(Message::Finish(state.id, Artifact::Metadata, Ok(())));
}
// Use a helper struct with a `Drop` implementation to guarantee
// that a `Finish` message is sent even if our job panics. We
// shouldn't panic unless there's a bug in Cargo, so we just need
// to make sure nothing hangs by accident.
struct FinishOnDrop<'a> {
messages: &'a Queue<Message>,
id: JobId,
result: Option<CargoResult<()>>,
}
impl Drop for FinishOnDrop<'_> {
fn drop(&mut self) {
let result = self
.result
.take()
.unwrap_or_else(|| Err(format_err!("worker panicked")));
self.messages
.push(Message::Finish(self.id, Artifact::All, result));
}
}
let doit = move |diag_dedupe| {
let state = JobState::new(id, messages, diag_dedupe, rmeta_required);
state.run_to_finish(job);
};
match is_fresh {
@@ -1172,25 +1041,11 @@ impl<'cfg> DrainState<'cfg> {
true => {
self.timings.add_fresh();
// Running a fresh job on the same thread is often much faster than spawning a new
// thread to run the job.
doit(JobState {
id,
messages,
output: Some(&self.diag_dedupe),
rmeta_required: Cell::new(rmeta_required),
_marker: marker::PhantomData,
});
doit(Some(&self.diag_dedupe));
}
false => {
self.timings.add_dirty();
scope.spawn(move || {
doit(JobState {
id,
messages: messages.clone(),
output: None,
rmeta_required: Cell::new(rmeta_required),
_marker: marker::PhantomData,
})
});
scope.spawn(move || doit(None));
}
}
}

View file

@@ -42,7 +42,6 @@ mod crate_type;
mod custom_build;
mod fingerprint;
pub mod future_incompat;
mod job;
mod job_queue;
mod layout;
mod links;
@@ -79,9 +78,8 @@ pub use self::context::{Context, Metadata};
pub use self::crate_type::CrateType;
pub use self::custom_build::{BuildOutput, BuildScriptOutputs, BuildScripts};
pub(crate) use self::fingerprint::DirtyReason;
pub use self::job::Freshness;
use self::job::{Job, Work};
use self::job_queue::{JobQueue, JobState};
pub use self::job_queue::Freshness;
use self::job_queue::{Job, JobQueue, JobState, Work};
pub(crate) use self::layout::Layout;
pub use self::lto::Lto;
use self::output_depinfo::output_depinfo;