[vm, gc] Allow scavenge and marking to proceed even if workers are slow to start up.

This makes the ParallelScavenge and ParallelMark tasks safe to be run by an embedder-provided task runner that doesn't guarantee immediate execution.

TEST=ci
Change-Id: I485c1873a5eb7a208a9cc4fb32aa770c0ad6f773
Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/214870
Reviewed-by: Liam Appelbe <liama@google.com>
Commit-Queue: Ryan Macnak <rmacnak@google.com>
Ryan Macnak 2021-11-02 17:42:52 +00:00 committed by commit-bot@chromium.org
parent 1ca06b9531
commit 7aa8716f6d
9 changed files with 158 additions and 116 deletions
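
A note on the shape of the change before the hunks: the old ThreadBarrier was built on two externally owned Monitors, required every scheduled worker to reach Sync()/Exit(), and its destructor blocked until the last worker had exited, so a task runner that was slow to start a worker stalled the whole GC phase. The new barrier is reference counted: the committed participants are declared up front, late workers opt in with TryEnter() (and simply Release() if the phase has already moved on), and whoever drops the last reference deletes the barrier. The sketch below is a standalone model of those semantics using std::mutex and std::condition_variable in place of the VM's Monitor; RefCountedBarrier and the small main() driver are illustrative names, not Dart SDK code.

  #include <atomic>
  #include <condition_variable>
  #include <cstdint>
  #include <cstdio>
  #include <mutex>
  #include <thread>
  #include <vector>

  class RefCountedBarrier {
   public:
    // |num_handles| is how many Release() calls will eventually arrive (one
    // per thread holding a pointer); |initial| is how many participants are
    // committed up front.
    explicit RefCountedBarrier(intptr_t num_handles, intptr_t initial = 0)
        : ref_count_(num_handles), participating_(initial), remaining_(initial) {}

    // A late worker may only join while the barrier is still in its first
    // generation, i.e. before the committed participants finish a Sync().
    bool TryEnter() {
      std::lock_guard<std::mutex> lock(mutex_);
      if (generation_ != 0) return false;
      remaining_++;
      participating_++;
      return true;
    }

    // Rendezvous of the current participants; the last arrival opens the next
    // generation and wakes the others.
    void Sync() {
      std::unique_lock<std::mutex> lock(mutex_);
      const intptr_t g = generation_;
      if (--remaining_ == 0) {
        generation_++;
        remaining_ = participating_;
        cv_.notify_all();
      } else {
        cv_.wait(lock, [&] { return g != generation_; });
      }
    }

    // Every handle, participating or not, eventually drops its reference; the
    // last one frees the barrier, so no thread ever blocks in a destructor.
    void Release() {
      if (ref_count_.fetch_sub(1, std::memory_order_acq_rel) == 1) delete this;
    }

   private:
    ~RefCountedBarrier() = default;  // Reachable only via Release().

    std::atomic<intptr_t> ref_count_;
    std::mutex mutex_;
    std::condition_variable cv_;
    intptr_t participating_;
    intptr_t remaining_;
    intptr_t generation_ = 0;
  };

  int main() {
    const intptr_t num_tasks = 4;
    // The driver commits only itself; helpers may or may not start in time.
    auto* barrier = new RefCountedBarrier(num_tasks, /*initial=*/1);
    std::vector<std::thread> helpers;
    for (intptr_t i = 0; i < num_tasks - 1; i++) {
      helpers.emplace_back([barrier, i]() {
        if (!barrier->TryEnter()) {
          barrier->Release();  // Too late; the phase has already finished.
          return;
        }
        std::printf("helper %d joined the phase\n", static_cast<int>(i));
        barrier->Sync();  // Phase boundary.
        barrier->Release();
      });
    }
    barrier->Sync();  // The driver never waits for helpers that did not join.
    barrier->Release();
    for (auto& t : helpers) t.join();
    return 0;
  }

The two sizing patterns in the hunks below follow from this: the compactor, whose workers all have preassigned page ranges, constructs the barrier as ThreadBarrier(num_tasks, num_tasks), while the marker and scavenger construct it as ThreadBarrier(num_tasks, 1) and let helper tasks join via TryEnter().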

@@ -239,22 +239,23 @@ void GCCompactor::Compact(OldPage* pages,
}
{
ThreadBarrier barrier(num_tasks, heap_->barrier(), heap_->barrier_done());
ThreadBarrier* barrier = new ThreadBarrier(num_tasks, num_tasks);
RelaxedAtomic<intptr_t> next_forwarding_task = {0};
for (intptr_t task_index = 0; task_index < num_tasks; task_index++) {
if (task_index < (num_tasks - 1)) {
// Begin compacting on a helper thread.
Dart::thread_pool()->Run<CompactorTask>(
thread()->isolate_group(), this, &barrier, &next_forwarding_task,
thread()->isolate_group(), this, barrier, &next_forwarding_task,
heads[task_index], &tails[task_index], freelist);
} else {
// Last worker is the main thread.
CompactorTask task(thread()->isolate_group(), this, &barrier,
CompactorTask task(thread()->isolate_group(), this, barrier,
&next_forwarding_task, heads[task_index],
&tails[task_index], freelist);
task.RunEnteredIsolateGroup();
barrier.Exit();
barrier->Sync();
barrier->Release();
}
}
}
@@ -338,7 +339,8 @@ void CompactorTask::Run() {
Thread::ExitIsolateGroupAsHelper(/*bypass_safepoint=*/true);
// This task is done. Notify the original thread.
barrier_->Exit();
barrier_->Sync();
barrier_->Release();
}
void CompactorTask::RunEnteredIsolateGroup() {
@@ -434,8 +436,6 @@ void CompactorTask::RunEnteredIsolateGroup() {
more_forwarding_tasks = false;
}
}
barrier_->Sync();
}
}

@@ -48,8 +48,6 @@ Heap::Heap(IsolateGroup* isolate_group,
is_vm_isolate_(is_vm_isolate),
new_space_(this, max_new_gen_semi_words),
old_space_(this, max_old_gen_words),
barrier_(),
barrier_done_(),
read_only_(false),
last_gc_was_old_space_(false),
assume_scavenge_will_fail_(false),

@@ -288,9 +288,6 @@ class Heap {
IsolateGroup* isolate_group() const { return isolate_group_; }
bool is_vm_isolate() const { return is_vm_isolate_; }
Monitor* barrier() const { return &barrier_; }
Monitor* barrier_done() const { return &barrier_done_; }
void SetupImagePage(void* pointer, uword size, bool is_executable) {
old_space_.SetupImagePage(pointer, size, is_executable);
}
@@ -379,9 +376,6 @@ class Heap {
WeakTable* new_weak_tables_[kNumWeakSelectors];
WeakTable* old_weak_tables_[kNumWeakSelectors];
mutable Monitor barrier_;
mutable Monitor barrier_done_;
// GC stats collection.
GCStats stats_;

@@ -35,6 +35,7 @@ class MarkingVisitorBase : public ObjectPointerVisitor {
work_list_(marking_stack),
deferred_work_list_(deferred_marking_stack),
delayed_weak_properties_(WeakProperty::null()),
tail_(WeakProperty::null()),
marked_bytes_(0),
marked_micros_(0) {
ASSERT(thread_->isolate_group() == isolate_group);
@@ -50,7 +51,7 @@ class MarkingVisitorBase : public ObjectPointerVisitor {
bool ProcessPendingWeakProperties() {
bool more_to_mark = false;
WeakPropertyPtr cur_weak = delayed_weak_properties_;
delayed_weak_properties_ = WeakProperty::null();
tail_ = delayed_weak_properties_ = WeakProperty::null();
while (cur_weak != WeakProperty::null()) {
WeakPropertyPtr next_weak =
cur_weak->untag()->next_.Decompress(cur_weak->heap_base());
@@ -154,6 +155,9 @@ class MarkingVisitorBase : public ObjectPointerVisitor {
ASSERT(raw_weak->untag()->next_ ==
CompressedWeakPropertyPtr(WeakProperty::null()));
raw_weak->untag()->next_ = delayed_weak_properties_;
if (delayed_weak_properties_ == WeakProperty::null()) {
tail_ = raw_weak;
}
delayed_weak_properties_ = raw_weak;
}
@@ -228,6 +232,26 @@ class MarkingVisitorBase : public ObjectPointerVisitor {
return work_list_.WaitForWork(num_busy);
}
void Flush(WeakPropertyPtr* head, WeakPropertyPtr* tail) {
work_list_.Flush();
deferred_work_list_.Flush();
if (*head == WeakProperty::null()) {
*head = delayed_weak_properties_;
*tail = tail_;
} else {
(*tail)->untag()->next_ = delayed_weak_properties_;
*tail = tail_;
}
tail_ = delayed_weak_properties_ = WeakProperty::null();
}
void Adopt(WeakPropertyPtr head, WeakPropertyPtr tail) {
ASSERT(delayed_weak_properties_ == WeakProperty::null());
ASSERT(tail_ == WeakProperty::null());
delayed_weak_properties_ = head;
tail_ = tail;
}
void AbandonWork() {
work_list_.AbandonWork();
deferred_work_list_.AbandonWork();
@@ -302,6 +326,7 @@ class MarkingVisitorBase : public ObjectPointerVisitor {
MarkerWorkList work_list_;
MarkerWorkList deferred_work_list_;
WeakPropertyPtr delayed_weak_properties_;
WeakPropertyPtr tail_;
uintptr_t marked_bytes_;
int64_t marked_micros_;
@@ -542,6 +567,11 @@ class ParallelMarkTask : public ThreadPool::Task {
num_busy_(num_busy) {}
virtual void Run() {
if (!barrier_->TryEnter()) {
barrier_->Release();
return;
}
bool result = Thread::EnterIsolateGroupAsHelper(
isolate_group_, Thread::kMarkerTask, /*bypass_safepoint=*/true);
ASSERT(result);
@@ -550,8 +580,8 @@ class ParallelMarkTask : public ThreadPool::Task {
Thread::ExitIsolateGroupAsHelper(/*bypass_safepoint=*/true);
// This task is done. Notify the original thread.
barrier_->Exit();
barrier_->Sync();
barrier_->Release();
}
void RunEnteredIsolateGroup() {
@@ -561,6 +591,7 @@ class ParallelMarkTask : public ThreadPool::Task {
int64_t start = OS::GetCurrentMonotonicMicros();
// Phase 1: Iterate over roots and drain marking stack in tasks.
num_busy_->fetch_add(1u);
marker_->IterateRoots(visitor_);
visitor_->ProcessDeferredMarking();
@@ -603,7 +634,6 @@ class ParallelMarkTask : public ThreadPool::Task {
// Phase 2: deferred marking.
visitor_->ProcessDeferredMarking();
visitor_->FinalizeMarking();
barrier_->Sync();
// Phase 3: Weak processing and statistics.
@@ -615,7 +645,6 @@ class ParallelMarkTask : public ThreadPool::Task {
THR_Print("Task marked %" Pd " bytes in %" Pd64 " micros.\n",
visitor_->marked_bytes(), visitor_->marked_micros());
}
barrier_->Sync();
}
}
@@ -801,11 +830,16 @@ void GCMarker::MarkObjects(PageSpace* page_space) {
marked_bytes_ += visitor.marked_bytes();
marked_micros_ += visitor.marked_micros();
} else {
ThreadBarrier barrier(num_tasks, heap_->barrier(), heap_->barrier_done());
ThreadBarrier* barrier = new ThreadBarrier(num_tasks, 1);
ResetSlices();
// Used to coordinate draining among tasks; all start out as 'busy'.
RelaxedAtomic<uintptr_t> num_busy(num_tasks);
RelaxedAtomic<uintptr_t> num_busy = 0;
// Phase 1: Iterate over roots and drain marking stack in tasks.
WeakPropertyPtr head = WeakProperty::null();
WeakPropertyPtr tail = WeakProperty::null();
for (intptr_t i = 0; i < num_tasks; ++i) {
SyncMarkingVisitor* visitor = visitors_[i];
// Visitors may or may not have already been created depending on
@@ -816,23 +850,33 @@ void GCMarker::MarkObjects(PageSpace* page_space) {
&marking_stack_, &deferred_marking_stack_);
visitors_[i] = visitor;
}
// Move all work from local blocks to the global list. Any given
// visitor might not get to run if it fails to reach TryEnter soon
// enough, and we must not fail to visit objects merely because they're
// sitting in such a visitor's local blocks.
visitor->Flush(&head, &tail);
// Need to move weak property list too.
if (i < (num_tasks - 1)) {
// Begin marking on a helper thread.
bool result = Dart::thread_pool()->Run<ParallelMarkTask>(
this, isolate_group_, &marking_stack_, &barrier, visitor,
this, isolate_group_, &marking_stack_, barrier, visitor,
&num_busy);
ASSERT(result);
} else {
// Last worker is the main thread.
ParallelMarkTask task(this, isolate_group_, &marking_stack_, &barrier,
visitor->Adopt(head, tail);
ParallelMarkTask task(this, isolate_group_, &marking_stack_, barrier,
visitor, &num_busy);
task.RunEnteredIsolateGroup();
barrier.Exit();
barrier->Sync();
barrier->Release();
}
}
for (intptr_t i = 0; i < num_tasks; i++) {
SyncMarkingVisitor* visitor = visitors_[i];
visitor->FinalizeMarking();
marked_bytes_ += visitor->marked_bytes();
marked_micros_ += visitor->marked_micros();
delete visitor;
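
The Flush()/Adopt() pair in the marker hunks above exists for the same reason: a visitor's privately accumulated work (its local marking blocks and its delayed weak-property list) must not be stranded in a task that may never get to run, so the driver splices every visitor's list onto one shared list before spawning tasks and hands the combined list to the visitor it will run itself. Below is a standalone model of that head/tail splice; Node, Visitor, Defer and the rest are illustrative stand-ins for WeakPropertyPtr and MarkingVisitorBase, not VM code.

  #include <cassert>
  #include <cstdio>

  struct Node {
    int value;
    Node* next = nullptr;
  };

  class Visitor {
   public:
    // Record a deferred item on this visitor's private list (cf. the delayed
    // weak properties discovered while marking).
    void Defer(Node* n) {
      n->next = head_;
      if (head_ == nullptr) tail_ = n;
      head_ = n;
    }

    // Splice the private list onto the shared (*head, *tail) list and leave
    // the visitor empty, so nothing is lost if its task never gets to run.
    void Flush(Node** head, Node** tail) {
      if (head_ == nullptr) return;  // Nothing to contribute.
      if (*head == nullptr) {
        *head = head_;
      } else {
        (*tail)->next = head_;
      }
      *tail = tail_;
      head_ = tail_ = nullptr;
    }

    // The visitor that is certain to run adopts the combined list.
    void Adopt(Node* head, Node* tail) {
      assert(head_ == nullptr && tail_ == nullptr);
      head_ = head;
      tail_ = tail;
    }

    void Process() {
      for (Node* n = head_; n != nullptr; n = n->next) {
        std::printf("deferred item %d\n", n->value);
      }
      head_ = tail_ = nullptr;
    }

   private:
    Node* head_ = nullptr;
    Node* tail_ = nullptr;
  };

  int main() {
    Node a{1}, b{2}, c{3};
    Visitor v0, v1, v2;  // v2 plays the role of the main thread's visitor.
    v0.Defer(&a);
    v1.Defer(&b);
    v1.Defer(&c);

    Node* head = nullptr;
    Node* tail = nullptr;
    Visitor* visitors[] = {&v0, &v1, &v2};
    for (Visitor* v : visitors) v->Flush(&head, &tail);
    v2.Adopt(head, tail);
    v2.Process();  // Prints 1, 3, 2: every deferred item survived the hand-off.
    return 0;
  }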

@@ -195,6 +195,17 @@ class BlockWorkList : public ValueObject {
local_output_->Push(raw_obj);
}
void Flush() {
if (!local_output_->IsEmpty()) {
stack_->PushBlock(local_output_);
local_output_ = stack_->PopEmptyBlock();
}
if (!local_input_->IsEmpty()) {
stack_->PushBlock(local_input_);
local_input_ = stack_->PopEmptyBlock();
}
}
bool WaitForWork(RelaxedAtomic<uintptr_t>* num_busy) {
ASSERT(local_input_->IsEmpty());
Block* new_work = stack_->WaitForWork(num_busy);

@@ -276,10 +276,7 @@ class ScavengerVisitorBase : public ObjectPointerVisitor {
}
void Finalize() {
if (scavenger_->abort_) {
promoted_list_.AbandonWork();
delayed_weak_properties_ = WeakProperty::null();
} else {
if (!scavenger_->abort_) {
ASSERT(!HasWork());
for (NewPage* page = head_; page != nullptr; page = page->next()) {
@@ -287,14 +284,19 @@ class ScavengerVisitorBase : public ObjectPointerVisitor {
page->RecordSurvivors();
}
promoted_list_.Finalize();
MournWeakProperties();
}
page_space_->ReleaseLock(freelist_);
thread_ = nullptr;
}
void FinalizePromotion() { promoted_list_.Finalize(); }
void AbandonWork() {
promoted_list_.AbandonWork();
delayed_weak_properties_ = WeakProperty::null();
}
NewPage* head() const { return head_; }
NewPage* tail() const { return tail_; }
@@ -557,6 +559,11 @@ class ParallelScavengerTask : public ThreadPool::Task {
num_busy_(num_busy) {}
virtual void Run() {
if (!barrier_->TryEnter()) {
barrier_->Release();
return;
}
bool result = Thread::EnterIsolateGroupAsHelper(
isolate_group_, Thread::kScavengerTask, /*bypass_safepoint=*/true);
ASSERT(result);
@@ -565,13 +572,14 @@ class ParallelScavengerTask : public ThreadPool::Task {
Thread::ExitIsolateGroupAsHelper(/*bypass_safepoint=*/true);
// This task is done. Notify the original thread.
barrier_->Exit();
barrier_->Sync();
barrier_->Release();
}
void RunEnteredIsolateGroup() {
TIMELINE_FUNCTION_GC_DURATION(Thread::Current(), "ParallelScavenge");
num_busy_->fetch_add(1u);
visitor_->ProcessRoots();
// Phase 1: Copying.
@@ -612,9 +620,10 @@ class ParallelScavengerTask : public ThreadPool::Task {
barrier_->Sync();
} while (more_to_scavenge);
ASSERT(!visitor_->HasWork());
// Phase 2: Weak processing, statistics.
visitor_->Finalize();
barrier_->Sync();
}
private:
@@ -1671,6 +1680,7 @@ intptr_t Scavenger::SerialScavenge(SemiSpace* from) {
}
visitor.Finalize();
visitor.FinalizePromotion();
to_->AddList(visitor.head(), visitor.tail());
return visitor.bytes_promoted();
}
@@ -1680,8 +1690,8 @@ intptr_t Scavenger::ParallelScavenge(SemiSpace* from) {
const intptr_t num_tasks = FLAG_scavenger_tasks;
ASSERT(num_tasks > 0);
ThreadBarrier barrier(num_tasks, heap_->barrier(), heap_->barrier_done());
RelaxedAtomic<uintptr_t> num_busy = num_tasks;
ThreadBarrier* barrier = new ThreadBarrier(num_tasks, 1);
RelaxedAtomic<uintptr_t> num_busy = 0;
ParallelScavengerVisitor** visitors =
new ParallelScavengerVisitor*[num_tasks];
@@ -1692,21 +1702,28 @@ intptr_t Scavenger::ParallelScavenge(SemiSpace* from) {
if (i < (num_tasks - 1)) {
// Begin scavenging on a helper thread.
bool result = Dart::thread_pool()->Run<ParallelScavengerTask>(
heap_->isolate_group(), &barrier, visitors[i], &num_busy);
heap_->isolate_group(), barrier, visitors[i], &num_busy);
ASSERT(result);
} else {
// Last worker is the main thread.
ParallelScavengerTask task(heap_->isolate_group(), &barrier, visitors[i],
ParallelScavengerTask task(heap_->isolate_group(), barrier, visitors[i],
&num_busy);
task.RunEnteredIsolateGroup();
barrier.Exit();
barrier->Sync();
barrier->Release();
}
}
for (intptr_t i = 0; i < num_tasks; i++) {
to_->AddList(visitors[i]->head(), visitors[i]->tail());
bytes_promoted += visitors[i]->bytes_promoted();
delete visitors[i];
ParallelScavengerVisitor* visitor = visitors[i];
if (abort_) {
visitor->AbandonWork();
} else {
visitor->FinalizePromotion();
}
to_->AddList(visitor->head(), visitor->tail());
bytes_promoted += visitor->bytes_promoted();
delete visitor;
}
delete[] visitors;
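
The other recurring change in the marker and scavenger hunks above is how the shared busy counter is seeded: it used to be initialized to num_tasks, presuming every scheduled task was already busy, which meant the drain loop could not terminate until every task had actually started; it now starts at zero and each task counts itself in with fetch_add(1u) once it begins running. Below is a small self-contained model of that accounting, assuming a plain mutex-protected queue in place of the VM's block work lists; WorkPool and its methods are illustrative names, not the VM's API.

  #include <chrono>
  #include <condition_variable>
  #include <cstdint>
  #include <cstdio>
  #include <deque>
  #include <mutex>
  #include <thread>
  #include <vector>

  class WorkPool {
   public:
    void Push(int item) {
      std::lock_guard<std::mutex> lock(mutex_);
      queue_.push_back(item);
      cv_.notify_one();
    }

    void WorkerLoop() {
      std::unique_lock<std::mutex> lock(mutex_);
      num_busy_++;  // Count ourselves in only once we actually start running.
      for (;;) {
        if (!queue_.empty()) {
          int item = queue_.front();
          queue_.pop_front();
          lock.unlock();
          Process(item);
          lock.lock();
          continue;
        }
        // Out of work: stop counting as busy and check for global termination.
        if (--num_busy_ == 0) {
          cv_.notify_all();  // Wake idle peers so they observe termination.
          return;
        }
        cv_.wait(lock, [&] { return !queue_.empty() || num_busy_ == 0; });
        if (num_busy_ == 0 && queue_.empty()) return;
        num_busy_++;  // More work appeared; count as busy again.
      }
    }

   private:
    void Process(int item) {
      // Real marking or copying would happen here, possibly pushing more work.
      std::printf("processed %d\n", item);
    }

    std::mutex mutex_;
    std::condition_variable cv_;
    std::deque<int> queue_;
    intptr_t num_busy_ = 0;
  };

  int main() {
    WorkPool pool;
    for (int i = 0; i < 8; i++) pool.Push(i);
    std::vector<std::thread> workers;
    for (int i = 0; i < 3; i++) {
      workers.emplace_back([&pool, i]() {
        // Simulate a task runner that is slow to schedule some workers.
        std::this_thread::sleep_for(std::chrono::milliseconds(i * 50));
        pool.WorkerLoop();
      });
    }
    for (auto& t : workers) t.join();
    return 0;
  }

A worker that is scheduled late simply increments the counter and, finding nothing left to do, decrements it and returns without delaying the threads that did the work.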

@@ -104,7 +104,8 @@ class InterruptChecker : public ThreadPool::Task {
barrier_->Sync();
}
Thread::ExitIsolateAsHelper();
barrier_->Exit();
barrier_->Sync();
barrier_->Release();
}
private:
@@ -125,24 +126,24 @@ const intptr_t InterruptChecker::kIterations = 10;
// compiler and/or CPU could reorder operations to make the tasks observe the
// round update *before* the interrupt is set.
TEST_CASE(StackLimitInterrupts) {
auto heap = thread->isolate_group()->heap();
ThreadBarrier barrier(InterruptChecker::kTaskCount + 1, heap->barrier(),
heap->barrier_done());
ThreadBarrier* barrier = new ThreadBarrier(InterruptChecker::kTaskCount + 1,
InterruptChecker::kTaskCount + 1);
// Start all tasks. They will busy-wait until interrupted in the first round.
for (intptr_t task = 0; task < InterruptChecker::kTaskCount; task++) {
Dart::thread_pool()->Run<InterruptChecker>(thread, &barrier);
Dart::thread_pool()->Run<InterruptChecker>(thread, barrier);
}
// Wait for all tasks to get ready for the first round.
barrier.Sync();
barrier->Sync();
for (intptr_t i = 0; i < InterruptChecker::kIterations; ++i) {
thread->ScheduleInterrupts(Thread::kVMInterrupt);
// Wait for all tasks to observe the interrupt.
barrier.Sync();
barrier->Sync();
// Continue with next round.
uword interrupts = thread->GetAndClearInterrupts();
EXPECT((interrupts & Thread::kVMInterrupt) != 0);
}
barrier.Exit();
barrier->Sync();
barrier->Release();
}
} // namespace dart

@@ -46,71 +46,55 @@ namespace dart {
//
class ThreadBarrier {
public:
explicit ThreadBarrier(intptr_t num_threads,
Monitor* monitor,
Monitor* done_monitor)
: num_threads_(num_threads),
monitor_(monitor),
remaining_(num_threads),
parity_(false),
done_monitor_(done_monitor),
done_(false) {
ASSERT(remaining_ > 0);
explicit ThreadBarrier(intptr_t num_threads, intptr_t initial = 0)
: ref_count_(num_threads),
monitor_(),
participating_(initial),
remaining_(initial),
generation_(0) {}
bool TryEnter() {
MonitorLocker ml(&monitor_);
if (generation_ != 0) {
return false;
}
remaining_++;
participating_++;
return true;
}
void Sync() {
MonitorLocker ml(monitor_);
ASSERT(remaining_ > 0);
if (--remaining_ > 0) {
// I'm not last to arrive; wait until next round.
bool old_parity = parity_;
while (parity_ == old_parity) {
MonitorLocker ml(&monitor_);
const intptr_t g = generation_;
remaining_--;
if (remaining_ == 0) {
// I'm last, advance to the next generation and wake the others.
generation_++;
remaining_ = participating_;
ml.NotifyAll();
} else {
// Waiting for others.
while (g == generation_) {
ml.Wait();
}
} else {
// Last one to arrive initiates the next round.
remaining_ = num_threads_;
parity_ = !parity_;
// Tell everyone else about the new round.
ml.NotifyAll();
}
}
void Exit() {
bool last = false;
{
MonitorLocker ml(monitor_);
ASSERT(remaining_ > 0);
last = (--remaining_ == 0);
void Release() {
intptr_t old = ref_count_.fetch_sub(1, std::memory_order_acq_rel);
ASSERT(old > 0);
if (old == 1) {
delete this;
}
if (last) {
// Last one to exit sets done_.
MonitorLocker ml(done_monitor_);
ASSERT(!done_);
done_ = true;
// Tell the destructor in case it's already waiting.
ml.Notify();
}
}
~ThreadBarrier() {
MonitorLocker ml(done_monitor_);
// Wait for everyone to exit before destroying the monitors.
while (!done_) {
ml.Wait();
}
ASSERT(remaining_ == 0);
}
private:
const intptr_t num_threads_;
std::atomic<intptr_t> ref_count_;
Monitor* monitor_;
Monitor monitor_;
intptr_t participating_;
intptr_t remaining_;
bool parity_;
Monitor* done_monitor_; // TODO(koda): Try to optimize this away.
bool done_;
intptr_t generation_;
DISALLOW_COPY_AND_ASSIGN(ThreadBarrier);
};

@@ -20,7 +20,7 @@ class FuzzTask : public ThreadPool::Task {
RandomSleep();
barrier_->Sync();
}
barrier_->Exit();
barrier_->Release();
}
private:
@@ -40,21 +40,14 @@ VM_UNIT_TEST_CASE(ThreadBarrier) {
static const intptr_t kNumTasks = 5;
static const intptr_t kNumRounds = 500;
Monitor* monitor = new Monitor();
Monitor* monitor_done = new Monitor();
{
ThreadBarrier barrier(kNumTasks + 1, monitor, monitor_done);
for (intptr_t i = 0; i < kNumTasks; ++i) {
Dart::thread_pool()->Run<FuzzTask>(kNumRounds, &barrier, i + 1);
}
for (intptr_t i = 0; i < kNumRounds; ++i) {
barrier.Sync();
}
barrier.Exit();
ThreadBarrier* barrier = new ThreadBarrier(kNumTasks + 1, kNumTasks + 1);
for (intptr_t i = 0; i < kNumTasks; ++i) {
Dart::thread_pool()->Run<FuzzTask>(kNumRounds, barrier, i + 1);
}
delete monitor_done;
delete monitor;
for (intptr_t i = 0; i < kNumRounds; ++i) {
barrier->Sync();
}
barrier->Release();
}
} // namespace dart