diff --git a/runtime/vm/heap/compactor.cc b/runtime/vm/heap/compactor.cc
index a4bd2199719..7311ecf4c79 100644
--- a/runtime/vm/heap/compactor.cc
+++ b/runtime/vm/heap/compactor.cc
@@ -239,22 +239,23 @@ void GCCompactor::Compact(OldPage* pages,
   }
 
   {
-    ThreadBarrier barrier(num_tasks, heap_->barrier(), heap_->barrier_done());
+    ThreadBarrier* barrier = new ThreadBarrier(num_tasks, num_tasks);
     RelaxedAtomic<intptr_t> next_forwarding_task = {0};
     for (intptr_t task_index = 0; task_index < num_tasks; task_index++) {
       if (task_index < (num_tasks - 1)) {
         // Begin compacting on a helper thread.
         Dart::thread_pool()->Run<CompactorTask>(
-            thread()->isolate_group(), this, &barrier, &next_forwarding_task,
+            thread()->isolate_group(), this, barrier, &next_forwarding_task,
             heads[task_index], &tails[task_index], freelist);
       } else {
         // Last worker is the main thread.
-        CompactorTask task(thread()->isolate_group(), this, &barrier,
+        CompactorTask task(thread()->isolate_group(), this, barrier,
                            &next_forwarding_task, heads[task_index],
                            &tails[task_index], freelist);
         task.RunEnteredIsolateGroup();
-        barrier.Exit();
+        barrier->Sync();
+        barrier->Release();
       }
     }
   }
@@ -338,7 +339,8 @@ void CompactorTask::Run() {
   Thread::ExitIsolateGroupAsHelper(/*bypass_safepoint=*/true);
 
   // This task is done. Notify the original thread.
-  barrier_->Exit();
+  barrier_->Sync();
+  barrier_->Release();
 }
 
 void CompactorTask::RunEnteredIsolateGroup() {
@@ -434,8 +436,6 @@ void CompactorTask::RunEnteredIsolateGroup() {
         more_forwarding_tasks = false;
       }
     }
-
-    barrier_->Sync();
   }
 }
 
diff --git a/runtime/vm/heap/heap.cc b/runtime/vm/heap/heap.cc
index 9c8269be8f6..fe3956d63d3 100644
--- a/runtime/vm/heap/heap.cc
+++ b/runtime/vm/heap/heap.cc
@@ -48,8 +48,6 @@ Heap::Heap(IsolateGroup* isolate_group,
       is_vm_isolate_(is_vm_isolate),
       new_space_(this, max_new_gen_semi_words),
       old_space_(this, max_old_gen_words),
-      barrier_(),
-      barrier_done_(),
       read_only_(false),
       last_gc_was_old_space_(false),
       assume_scavenge_will_fail_(false),
diff --git a/runtime/vm/heap/heap.h b/runtime/vm/heap/heap.h
index 04e8a62b1f5..236070082c1 100644
--- a/runtime/vm/heap/heap.h
+++ b/runtime/vm/heap/heap.h
@@ -288,9 +288,6 @@ class Heap {
   IsolateGroup* isolate_group() const { return isolate_group_; }
   bool is_vm_isolate() const { return is_vm_isolate_; }
 
-  Monitor* barrier() const { return &barrier_; }
-  Monitor* barrier_done() const { return &barrier_done_; }
-
   void SetupImagePage(void* pointer, uword size, bool is_executable) {
     old_space_.SetupImagePage(pointer, size, is_executable);
   }
@@ -379,9 +376,6 @@ class Heap {
   WeakTable* new_weak_tables_[kNumWeakSelectors];
   WeakTable* old_weak_tables_[kNumWeakSelectors];
 
-  mutable Monitor barrier_;
-  mutable Monitor barrier_done_;
-
   // GC stats collection.
   GCStats stats_;
diff --git a/runtime/vm/heap/marker.cc b/runtime/vm/heap/marker.cc
index a2f7477e7f0..e979a01bdff 100644
--- a/runtime/vm/heap/marker.cc
+++ b/runtime/vm/heap/marker.cc
@@ -35,6 +35,7 @@ class MarkingVisitorBase : public ObjectPointerVisitor {
         work_list_(marking_stack),
         deferred_work_list_(deferred_marking_stack),
         delayed_weak_properties_(WeakProperty::null()),
+        tail_(WeakProperty::null()),
         marked_bytes_(0),
         marked_micros_(0) {
     ASSERT(thread_->isolate_group() == isolate_group);
@@ -50,7 +51,7 @@ class MarkingVisitorBase : public ObjectPointerVisitor {
   bool ProcessPendingWeakProperties() {
     bool more_to_mark = false;
     WeakPropertyPtr cur_weak = delayed_weak_properties_;
-    delayed_weak_properties_ = WeakProperty::null();
+    tail_ = delayed_weak_properties_ = WeakProperty::null();
     while (cur_weak != WeakProperty::null()) {
       WeakPropertyPtr next_weak =
           cur_weak->untag()->next_.Decompress(cur_weak->heap_base());
@@ -154,6 +155,9 @@ class MarkingVisitorBase : public ObjectPointerVisitor {
     ASSERT(raw_weak->untag()->next_ ==
            CompressedWeakPropertyPtr(WeakProperty::null()));
     raw_weak->untag()->next_ = delayed_weak_properties_;
+    if (delayed_weak_properties_ == WeakProperty::null()) {
+      tail_ = raw_weak;
+    }
     delayed_weak_properties_ = raw_weak;
   }
 
@@ -228,6 +232,26 @@ class MarkingVisitorBase : public ObjectPointerVisitor {
     return work_list_.WaitForWork(num_busy);
   }
 
+  void Flush(WeakPropertyPtr* head, WeakPropertyPtr* tail) {
+    work_list_.Flush();
+    deferred_work_list_.Flush();
+    if (*head == WeakProperty::null()) {
+      *head = delayed_weak_properties_;
+      *tail = tail_;
+    } else {
+      (*tail)->untag()->next_ = delayed_weak_properties_;
+      *tail = tail_;
+    }
+    tail_ = delayed_weak_properties_ = WeakProperty::null();
+  }
+
+  void Adopt(WeakPropertyPtr head, WeakPropertyPtr tail) {
+    ASSERT(delayed_weak_properties_ == WeakProperty::null());
+    ASSERT(tail_ == WeakProperty::null());
+    delayed_weak_properties_ = head;
+    tail_ = tail;
+  }
+
   void AbandonWork() {
     work_list_.AbandonWork();
     deferred_work_list_.AbandonWork();
@@ -302,6 +326,7 @@ class MarkingVisitorBase : public ObjectPointerVisitor {
   MarkerWorkList work_list_;
   MarkerWorkList deferred_work_list_;
   WeakPropertyPtr delayed_weak_properties_;
+  WeakPropertyPtr tail_;
   uintptr_t marked_bytes_;
   int64_t marked_micros_;
 
@@ -542,6 +567,11 @@ class ParallelMarkTask : public ThreadPool::Task {
         num_busy_(num_busy) {}
 
   virtual void Run() {
+    if (!barrier_->TryEnter()) {
+      barrier_->Release();
+      return;
+    }
+
     bool result = Thread::EnterIsolateGroupAsHelper(
         isolate_group_, Thread::kMarkerTask, /*bypass_safepoint=*/true);
     ASSERT(result);
@@ -550,8 +580,8 @@ class ParallelMarkTask : public ThreadPool::Task {
 
     Thread::ExitIsolateGroupAsHelper(/*bypass_safepoint=*/true);
 
-    // This task is done. Notify the original thread.
-    barrier_->Exit();
+    barrier_->Sync();
+    barrier_->Release();
   }
 
   void RunEnteredIsolateGroup() {
@@ -561,6 +591,7 @@ class ParallelMarkTask : public ThreadPool::Task {
       int64_t start = OS::GetCurrentMonotonicMicros();
 
       // Phase 1: Iterate over roots and drain marking stack in tasks.
+      num_busy_->fetch_add(1u);
      marker_->IterateRoots(visitor_);
 
       visitor_->ProcessDeferredMarking();
@@ -603,7 +634,6 @@ class ParallelMarkTask : public ThreadPool::Task {
 
       // Phase 2: deferred marking.
       visitor_->ProcessDeferredMarking();
-      visitor_->FinalizeMarking();
       barrier_->Sync();
 
       // Phase 3: Weak processing and statistics.
@@ -615,7 +645,6 @@ class ParallelMarkTask : public ThreadPool::Task {
         THR_Print("Task marked %" Pd " bytes in %" Pd64 " micros.\n",
                   visitor_->marked_bytes(), visitor_->marked_micros());
       }
-      barrier_->Sync();
     }
   }
 
@@ -801,11 +830,16 @@ void GCMarker::MarkObjects(PageSpace* page_space) {
     marked_bytes_ += visitor.marked_bytes();
     marked_micros_ += visitor.marked_micros();
   } else {
-    ThreadBarrier barrier(num_tasks, heap_->barrier(), heap_->barrier_done());
+    ThreadBarrier* barrier = new ThreadBarrier(num_tasks, 1);
+    ResetSlices();
     // Used to coordinate draining among tasks; all start out as 'busy'.
-    RelaxedAtomic<uintptr_t> num_busy(num_tasks);
+    RelaxedAtomic<uintptr_t> num_busy = 0;
     // Phase 1: Iterate over roots and drain marking stack in tasks.
+
+    WeakPropertyPtr head = WeakProperty::null();
+    WeakPropertyPtr tail = WeakProperty::null();
+
     for (intptr_t i = 0; i < num_tasks; ++i) {
       SyncMarkingVisitor* visitor = visitors_[i];
       // Visitors may or may not have already been created depending on
@@ -816,23 +850,33 @@ void GCMarker::MarkObjects(PageSpace* page_space) {
                                        &marking_stack_, &deferred_marking_stack_);
         visitors_[i] = visitor;
       }
+      // Move all work from local blocks to the global list. Any given
+      // visitor might not get to run if it fails to reach TryEnter soon
+      // enough, and we would otherwise fail to visit objects sitting in
+      // such a visitor's local blocks.
+      visitor->Flush(&head, &tail);
+      // Need to move weak property list too.
+
       if (i < (num_tasks - 1)) {
         // Begin marking on a helper thread.
         bool result = Dart::thread_pool()->Run<ParallelMarkTask>(
-            this, isolate_group_, &marking_stack_, &barrier, visitor,
+            this, isolate_group_, &marking_stack_, barrier, visitor,
            &num_busy);
         ASSERT(result);
       } else {
         // Last worker is the main thread.
-        ParallelMarkTask task(this, isolate_group_, &marking_stack_, &barrier,
+        visitor->Adopt(head, tail);
+        ParallelMarkTask task(this, isolate_group_, &marking_stack_, barrier,
                               visitor, &num_busy);
         task.RunEnteredIsolateGroup();
-        barrier.Exit();
+        barrier->Sync();
+        barrier->Release();
       }
     }
 
     for (intptr_t i = 0; i < num_tasks; i++) {
       SyncMarkingVisitor* visitor = visitors_[i];
+      visitor->FinalizeMarking();
       marked_bytes_ += visitor->marked_bytes();
       marked_micros_ += visitor->marked_micros();
       delete visitor;
diff --git a/runtime/vm/heap/pointer_block.h b/runtime/vm/heap/pointer_block.h
index 5ef6dc2bd64..fb10fa6c27b 100644
--- a/runtime/vm/heap/pointer_block.h
+++ b/runtime/vm/heap/pointer_block.h
@@ -195,6 +195,17 @@ class BlockWorkList : public ValueObject {
     local_output_->Push(raw_obj);
   }
 
+  void Flush() {
+    if (!local_output_->IsEmpty()) {
+      stack_->PushBlock(local_output_);
+      local_output_ = stack_->PopEmptyBlock();
+    }
+    if (!local_input_->IsEmpty()) {
+      stack_->PushBlock(local_input_);
+      local_input_ = stack_->PopEmptyBlock();
+    }
+  }
+
   bool WaitForWork(RelaxedAtomic<uintptr_t>* num_busy) {
     ASSERT(local_input_->IsEmpty());
     Block* new_work = stack_->WaitForWork(num_busy);
diff --git a/runtime/vm/heap/scavenger.cc b/runtime/vm/heap/scavenger.cc
index 592627bc456..cac516f1d02 100644
--- a/runtime/vm/heap/scavenger.cc
+++ b/runtime/vm/heap/scavenger.cc
@@ -276,10 +276,7 @@ class ScavengerVisitorBase : public ObjectPointerVisitor {
   }
 
   void Finalize() {
-    if (scavenger_->abort_) {
-      promoted_list_.AbandonWork();
-      delayed_weak_properties_ = WeakProperty::null();
-    } else {
+    if (!scavenger_->abort_) {
       ASSERT(!HasWork());
 
       for (NewPage* page = head_; page != nullptr; page = page->next()) {
@@ -287,14 +284,19 @@ class ScavengerVisitorBase : public ObjectPointerVisitor {
         page->RecordSurvivors();
       }
 
-      promoted_list_.Finalize();
-
       MournWeakProperties();
     }
     page_space_->ReleaseLock(freelist_);
     thread_ = nullptr;
   }
 
+  void FinalizePromotion() { promoted_list_.Finalize(); }
+
+  void AbandonWork() {
+    promoted_list_.AbandonWork();
+    delayed_weak_properties_ = WeakProperty::null();
+  }
+
   NewPage* head() const { return head_; }
   NewPage* tail() const { return tail_; }
 
@@ -557,6 +559,11 @@ class ParallelScavengerTask : public ThreadPool::Task {
         num_busy_(num_busy) {}
 
   virtual void Run() {
+    if (!barrier_->TryEnter()) {
+      barrier_->Release();
+      return;
+    }
+
     bool result = Thread::EnterIsolateGroupAsHelper(
         isolate_group_, Thread::kScavengerTask, /*bypass_safepoint=*/true);
     ASSERT(result);
@@ -565,13 +572,14 @@ class ParallelScavengerTask : public ThreadPool::Task {
 
     Thread::ExitIsolateGroupAsHelper(/*bypass_safepoint=*/true);
 
-    // This task is done. Notify the original thread.
-    barrier_->Exit();
+    barrier_->Sync();
+    barrier_->Release();
   }
 
   void RunEnteredIsolateGroup() {
     TIMELINE_FUNCTION_GC_DURATION(Thread::Current(), "ParallelScavenge");
 
+    num_busy_->fetch_add(1u);
     visitor_->ProcessRoots();
 
     // Phase 1: Copying.
@@ -612,9 +620,10 @@ class ParallelScavengerTask : public ThreadPool::Task {
       barrier_->Sync();
     } while (more_to_scavenge);
 
+    ASSERT(!visitor_->HasWork());
+
     // Phase 2: Weak processing, statistics.
     visitor_->Finalize();
-    barrier_->Sync();
   }
 
  private:
@@ -1671,6 +1680,7 @@ intptr_t Scavenger::SerialScavenge(SemiSpace* from) {
   }
 
   visitor.Finalize();
+  visitor.FinalizePromotion();
   to_->AddList(visitor.head(), visitor.tail());
   return visitor.bytes_promoted();
 }
@@ -1680,8 +1690,8 @@ intptr_t Scavenger::ParallelScavenge(SemiSpace* from) {
   const intptr_t num_tasks = FLAG_scavenger_tasks;
   ASSERT(num_tasks > 0);
 
-  ThreadBarrier barrier(num_tasks, heap_->barrier(), heap_->barrier_done());
-  RelaxedAtomic<uintptr_t> num_busy = num_tasks;
+  ThreadBarrier* barrier = new ThreadBarrier(num_tasks, 1);
+  RelaxedAtomic<uintptr_t> num_busy = 0;
 
   ParallelScavengerVisitor** visitors =
       new ParallelScavengerVisitor*[num_tasks];
@@ -1692,21 +1702,28 @@ intptr_t Scavenger::ParallelScavenge(SemiSpace* from) {
     if (i < (num_tasks - 1)) {
       // Begin scavenging on a helper thread.
       bool result = Dart::thread_pool()->Run<ParallelScavengerTask>(
-          heap_->isolate_group(), &barrier, visitors[i], &num_busy);
+          heap_->isolate_group(), barrier, visitors[i], &num_busy);
       ASSERT(result);
     } else {
       // Last worker is the main thread.
-      ParallelScavengerTask task(heap_->isolate_group(), &barrier, visitors[i],
+      ParallelScavengerTask task(heap_->isolate_group(), barrier, visitors[i],
                                  &num_busy);
       task.RunEnteredIsolateGroup();
-      barrier.Exit();
+      barrier->Sync();
+      barrier->Release();
     }
   }
 
   for (intptr_t i = 0; i < num_tasks; i++) {
-    to_->AddList(visitors[i]->head(), visitors[i]->tail());
-    bytes_promoted += visitors[i]->bytes_promoted();
-    delete visitors[i];
+    ParallelScavengerVisitor* visitor = visitors[i];
+    if (abort_) {
+      visitor->AbandonWork();
+    } else {
+      visitor->FinalizePromotion();
+    }
+    to_->AddList(visitor->head(), visitor->tail());
+    bytes_promoted += visitor->bytes_promoted();
+    delete visitor;
   }
   delete[] visitors;
diff --git a/runtime/vm/isolate_test.cc b/runtime/vm/isolate_test.cc
index 200359182fc..2b8afacaa1e 100644
--- a/runtime/vm/isolate_test.cc
+++ b/runtime/vm/isolate_test.cc
@@ -104,7 +104,8 @@ class InterruptChecker : public ThreadPool::Task {
       barrier_->Sync();
     }
     Thread::ExitIsolateAsHelper();
-    barrier_->Exit();
+    barrier_->Sync();
+    barrier_->Release();
   }
 
  private:
@@ -125,24 +126,24 @@ const intptr_t InterruptChecker::kIterations = 10;
 // compiler and/or CPU could reorder operations to make the tasks observe the
 // round update *before* the interrupt is set.
 TEST_CASE(StackLimitInterrupts) {
-  auto heap = thread->isolate_group()->heap();
-  ThreadBarrier barrier(InterruptChecker::kTaskCount + 1, heap->barrier(),
-                        heap->barrier_done());
+  ThreadBarrier* barrier = new ThreadBarrier(InterruptChecker::kTaskCount + 1,
+                                             InterruptChecker::kTaskCount + 1);
   // Start all tasks. They will busy-wait until interrupted in the first round.
   for (intptr_t task = 0; task < InterruptChecker::kTaskCount; task++) {
-    Dart::thread_pool()->Run<InterruptChecker>(thread, &barrier);
+    Dart::thread_pool()->Run<InterruptChecker>(thread, barrier);
   }
   // Wait for all tasks to get ready for the first round.
-  barrier.Sync();
+  barrier->Sync();
   for (intptr_t i = 0; i < InterruptChecker::kIterations; ++i) {
     thread->ScheduleInterrupts(Thread::kVMInterrupt);
     // Wait for all tasks to observe the interrupt.
-    barrier.Sync();
+    barrier->Sync();
     // Continue with next round.
     uword interrupts = thread->GetAndClearInterrupts();
     EXPECT((interrupts & Thread::kVMInterrupt) != 0);
   }
-  barrier.Exit();
+  barrier->Sync();
+  barrier->Release();
 }
 
 }  // namespace dart
diff --git a/runtime/vm/thread_barrier.h b/runtime/vm/thread_barrier.h
index 0b422f7b3c9..810a7117edb 100644
--- a/runtime/vm/thread_barrier.h
+++ b/runtime/vm/thread_barrier.h
@@ -46,71 +46,55 @@ namespace dart {
 // ...
 //
 class ThreadBarrier {
  public:
-  explicit ThreadBarrier(intptr_t num_threads,
-                         Monitor* monitor,
-                         Monitor* done_monitor)
-      : num_threads_(num_threads),
-        monitor_(monitor),
-        remaining_(num_threads),
-        parity_(false),
-        done_monitor_(done_monitor),
-        done_(false) {
-    ASSERT(remaining_ > 0);
+  explicit ThreadBarrier(intptr_t num_threads, intptr_t initial = 0)
+      : ref_count_(num_threads),
+        monitor_(),
+        participating_(initial),
+        remaining_(initial),
+        generation_(0) {}
+
+  bool TryEnter() {
+    MonitorLocker ml(&monitor_);
+    if (generation_ != 0) {
+      return false;
+    }
+    remaining_++;
+    participating_++;
+    return true;
   }
 
   void Sync() {
-    MonitorLocker ml(monitor_);
-    ASSERT(remaining_ > 0);
-    if (--remaining_ > 0) {
-      // I'm not last to arrive; wait until next round.
-      bool old_parity = parity_;
-      while (parity_ == old_parity) {
+    MonitorLocker ml(&monitor_);
+    const intptr_t g = generation_;
+    remaining_--;
+    if (remaining_ == 0) {
+      // I'm last, advance to the next generation and wake the others.
+      generation_++;
+      remaining_ = participating_;
+      ml.NotifyAll();
+    } else {
+      // Waiting for others.
+      while (g == generation_) {
         ml.Wait();
       }
-    } else {
-      // Last one to arrive initiates the next round.
-      remaining_ = num_threads_;
-      parity_ = !parity_;
-      // Tell everyone else about the new round.
-      ml.NotifyAll();
     }
   }
 
-  void Exit() {
-    bool last = false;
-    {
-      MonitorLocker ml(monitor_);
-      ASSERT(remaining_ > 0);
-      last = (--remaining_ == 0);
+  void Release() {
+    intptr_t old = ref_count_.fetch_sub(1, std::memory_order_acq_rel);
+    ASSERT(old > 0);
+    if (old == 1) {
+      delete this;
     }
-    if (last) {
-      // Last one to exit sets done_.
-      MonitorLocker ml(done_monitor_);
-      ASSERT(!done_);
-      done_ = true;
-      // Tell the destructor in case it's already waiting.
-      ml.Notify();
-    }
-  }
-
-  ~ThreadBarrier() {
-    MonitorLocker ml(done_monitor_);
-    // Wait for everyone to exit before destroying the monitors.
-    while (!done_) {
-      ml.Wait();
-    }
-    ASSERT(remaining_ == 0);
   }
 
  private:
-  const intptr_t num_threads_;
+  std::atomic<intptr_t> ref_count_;
 
-  Monitor* monitor_;
+  Monitor monitor_;
+  intptr_t participating_;
   intptr_t remaining_;
-  bool parity_;
-
-  Monitor* done_monitor_;  // TODO(koda): Try to optimize this away.
-  bool done_;
+  intptr_t generation_;
 
   DISALLOW_COPY_AND_ASSIGN(ThreadBarrier);
 };
diff --git a/runtime/vm/thread_barrier_test.cc b/runtime/vm/thread_barrier_test.cc
index c937eeb29fd..48e40209d43 100644
--- a/runtime/vm/thread_barrier_test.cc
+++ b/runtime/vm/thread_barrier_test.cc
@@ -20,7 +20,7 @@ class FuzzTask : public ThreadPool::Task {
       RandomSleep();
       barrier_->Sync();
     }
-    barrier_->Exit();
+    barrier_->Release();
   }
 
  private:
@@ -40,21 +40,14 @@ VM_UNIT_TEST_CASE(ThreadBarrier) {
   static const intptr_t kNumTasks = 5;
   static const intptr_t kNumRounds = 500;
 
-  Monitor* monitor = new Monitor();
-  Monitor* monitor_done = new Monitor();
-  {
-    ThreadBarrier barrier(kNumTasks + 1, monitor, monitor_done);
-    for (intptr_t i = 0; i < kNumTasks; ++i) {
-      Dart::thread_pool()->Run<FuzzTask>(kNumRounds, &barrier, i + 1);
-    }
-    for (intptr_t i = 0; i < kNumRounds; ++i) {
-      barrier.Sync();
-    }
-    barrier.Exit();
+  ThreadBarrier* barrier = new ThreadBarrier(kNumTasks + 1, kNumTasks + 1);
+  for (intptr_t i = 0; i < kNumTasks; ++i) {
+    Dart::thread_pool()->Run<FuzzTask>(kNumRounds, barrier, i + 1);
   }
-
-  delete monitor_done;
-  delete monitor;
+  for (intptr_t i = 0; i < kNumRounds; ++i) {
+    barrier->Sync();
+  }
+  barrier->Release();
 }
 
 }  // namespace dart
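
Usage sketch (illustrative, not part of the patch): the reworked ThreadBarrier is reference-counted and self-deleting. Sure participants are declared up front via the constructor's second argument, late helpers opt in with TryEnter(), and every holder pairs its final Sync() with a Release() so that the last Release() frees the barrier. ExampleTask, DoPhase, and RunPhases below are hypothetical names used only to show the fork-join pattern under these assumptions.

class ExampleTask : public ThreadPool::Task {
 public:
  explicit ExampleTask(ThreadBarrier* barrier) : barrier_(barrier) {}

  virtual void Run() {
    if (!barrier_->TryEnter()) {
      // Too late to join this round; just drop our reference.
      barrier_->Release();
      return;
    }
    DoPhase();            // hypothetical per-task work, declared elsewhere
    barrier_->Sync();     // rendezvous with the other participants
    barrier_->Release();  // last Release() deletes the barrier
  }

 private:
  ThreadBarrier* barrier_;
};

void RunPhases(intptr_t num_tasks) {
  // num_tasks references in total; only the main thread is a sure
  // participant, so the helpers must opt in with TryEnter().
  ThreadBarrier* barrier = new ThreadBarrier(num_tasks, /*initial=*/1);
  for (intptr_t i = 0; i < num_tasks - 1; i++) {
    Dart::thread_pool()->Run<ExampleTask>(barrier);
  }
  DoPhase();
  barrier->Sync();     // waits only for helpers that reached TryEnter in time
  barrier->Release();  // barrier frees itself after the final Release
}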