Split waiting for an event out of the primary drainer

This is a slight behavior change: we won't ask for new dependencies and so
forth if no events have been received. However, I believe that no activity can
happen if an event hasn't occurred (i.e., no state change has occurred), so in
practice there's no need for us to actually do anything.

To make sure we still record CPU usage and the like sufficiently often, that
recording is also moved into the inner "waiting for events" loop.
Mark Rousskov 2020-01-15 20:24:05 -05:00
parent 445de0ed4f
commit 0d722a4d83
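
The refactor boils down to a small "drain, then block with a heartbeat"
pattern. Below is a minimal, standalone sketch of that pattern, not the cargo
code from the diff that follows: the generic `T` stands in for cargo's
`Message` type, and `tick` is a stand-in closure for the progress-bar/CPU
bookkeeping.

```rust
use std::sync::mpsc::Receiver;
use std::time::Duration;

// Minimal sketch of the "drain, then block with a heartbeat" pattern.
// `T` stands in for cargo's `Message`; `tick` for `tick_progress`.
fn wait_for_events<T>(rx: &Receiver<T>, mut tick: impl FnMut()) -> Vec<T> {
    // First grab everything that is already queued, without blocking.
    let events: Vec<T> = rx.try_iter().collect();
    if !events.is_empty() {
        return events;
    }
    // Nothing queued: block, but wake up every 500ms to run the tick
    // (progress bar, CPU sampling, ...) before waiting again.
    loop {
        tick();
        match rx.recv_timeout(Duration::from_millis(500)) {
            Ok(message) => return vec![message],
            Err(_) => continue, // timeout (or disconnect): tick and retry
        }
    }
}
```

Draining with `try_iter` first means already-queued events are handled
immediately; the 500ms `recv_timeout` heartbeat only comes into play when the
queue would otherwise sit idle.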


@@ -510,6 +510,28 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> {
         Ok(None)
     }
 
+    // This will also tick the progress bar as appropriate
+    fn wait_for_events(&mut self) -> Vec<Message> {
+        // Drain all events at once to avoid displaying the progress bar
+        // unnecessarily. If there's no events we actually block waiting for
+        // an event, but we keep a "heartbeat" going to allow `record_cpu`
+        // to run above to calculate CPU usage over time. To do this we
+        // listen for a message with a timeout, and on timeout we run the
+        // previous parts of the loop again.
+        let events: Vec<_> = self.rx.try_iter().collect();
+        if events.is_empty() {
+            loop {
+                self.tick_progress();
+                match self.rx.recv_timeout(Duration::from_millis(500)) {
+                    Ok(message) => break vec![message],
+                    Err(_) => continue,
+                }
+            }
+        } else {
+            events
+        }
+    }
+
     fn drain_the_queue(
         &mut self,
         cx: &mut Context<'a, '_>,
@@ -553,36 +575,7 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> {
             // don't actually use, and if this happens just relinquish it back
             // to the jobserver itself.
             self.tokens.truncate(self.active.len() - 1);
-
-            // Record some timing information if `-Ztimings` is enabled, and
-            // this'll end up being a noop if we're not recording this
-            // information.
-            self.timings.mark_concurrency(
-                self.active.len(),
-                self.pending_queue.len(),
-                self.queue.len(),
-                self.rustc_tokens.len(),
-            );
-            self.timings.record_cpu();
-
-            // Drain all events at once to avoid displaying the progress bar
-            // unnecessarily. If there's no events we actually block waiting for
-            // an event, but we keep a "heartbeat" going to allow `record_cpu`
-            // to run above to calculate CPU usage over time. To do this we
-            // listen for a message with a timeout, and on timeout we run the
-            // previous parts of the loop again.
-            let events: Vec<_> = self.rx.try_iter().collect();
-            let events = if events.is_empty() {
-                self.show_progress();
-                match self.rx.recv_timeout(Duration::from_millis(500)) {
-                    Ok(message) => vec![message],
-                    Err(_) => continue,
-                }
-            } else {
-                events
-            };
-
-            for event in events {
+            for event in self.wait_for_events() {
                 if let Some(err) = self.handle_event(cx, jobserver_helper, plan, event)? {
                     error = Some(err);
                 }
@@ -628,7 +621,21 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> {
         }
     }
 
-    fn show_progress(&mut self) {
+    // This also records CPU usage and marks concurrency; we roughly want to do
+    // this as often as we spin on the events receiver (at least every 500ms or
+    // so).
+    fn tick_progress(&mut self) {
+        // Record some timing information if `-Ztimings` is enabled, and
+        // this'll end up being a noop if we're not recording this
+        // information.
+        self.timings.mark_concurrency(
+            self.active.len(),
+            self.pending_queue.len(),
+            self.queue.len(),
+            self.rustc_tokens.len(),
+        );
+        self.timings.record_cpu();
+
         let active_names = self
             .active
             .values()
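
As a usage illustration of the helper sketched earlier (this assumes that
standalone `wait_for_events` sketch is in scope; the tick counter is purely
illustrative), an already-queued event is drained and returned immediately
without running the heartbeat, while a late event causes the heartbeat to tick
roughly every 500ms until the message arrives:

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel::<&'static str>();

    // Case 1: an event is already queued, so it is drained without blocking
    // and without running the heartbeat.
    tx.send("already-queued").unwrap();
    let mut ticks = 0;
    let events = wait_for_events(&rx, || ticks += 1);
    assert_eq!(events, vec!["already-queued"]);
    assert_eq!(ticks, 0);

    // Case 2: nothing is queued; the sender fires after ~1.2s, so the
    // heartbeat runs a couple of times (every 500ms) before we receive it.
    let tx2 = tx.clone();
    thread::spawn(move || {
        thread::sleep(Duration::from_millis(1200));
        tx2.send("late").unwrap();
    });
    let mut ticks = 0;
    let events = wait_for_events(&rx, || ticks += 1);
    assert_eq!(events, vec!["late"]);
    assert!(ticks >= 2);
}
```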