[vm/concurrency] Refactor ThreadPool implementation to use work stealing

This is a preparation to be able to have fixed size thread pool (it's
also a nice cleanup of very hard to read code).

Issue https://github.com/dart-lang/sdk/issues/36097

Change-Id: I3b1fbb7c3b6f7be70be9df0e7885a74fdc613e3e
Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/145381
Commit-Queue: Martin Kustermann <kustermann@google.com>
Reviewed-by: Ryan Macnak <rmacnak@google.com>
This commit is contained in:
Martin Kustermann 2020-05-02 17:22:05 +00:00 committed by commit-bot@chromium.org
parent be1f85847f
commit a42e6ac035
5 changed files with 322 additions and 501 deletions

View file

@ -227,6 +227,30 @@ class IntrusiveDList {
return next;
}
bool ContainsForDebugging(const T* a) {
for (auto entry : *this) {
if (entry == a) return true;
}
return false;
}
void AppendList(IntrusiveDList<T, N>* other) {
if (other->IsEmpty()) return;
auto other_first = other->head_.Next();
auto other_last = other->head_.Prev();
other->head_.next_ = &other->head_;
other->head_.prev_ = &other->head_;
auto prev = head_.prev_;
prev->next_ = other_first;
other_first->prev_ = prev;
other_last->next_ = &head_;
head_.prev_ = other_last;
}
private:
Entry head_;

View file

@ -152,4 +152,58 @@ UNIT_TEST_CASE(IntrusiveDListEraseIterator) {
EXPECT(all.IsEmpty());
}
UNIT_TEST_CASE(IntrusiveDListAppendListTest) {
// Append to empty list.
{
IntrusiveDList<Item> all;
IntrusiveDList<Item> other;
Item a1(1, 11), a2(2, 12);
all.Append(&a1);
all.Append(&a2);
other.AppendList(&all);
EXPECT(all.IsEmpty());
EXPECT(!other.IsEmpty());
EXPECT_EQ(&a1, other.First());
EXPECT_EQ(&a2, other.Last());
auto it = other.Begin();
EXPECT_EQ(&a1, *it);
it = other.Erase(it);
EXPECT_EQ(&a2, *it);
it = other.Erase(it);
EXPECT(it == other.end());
}
// Append to non-empty list.
{
IntrusiveDList<Item> all;
IntrusiveDList<Item> other;
Item a1(1, 11), a2(2, 12);
all.Append(&a1);
all.Append(&a2);
Item o1(1, 11);
other.Append(&o1);
other.AppendList(&all);
EXPECT(all.IsEmpty());
EXPECT(!other.IsEmpty());
EXPECT_EQ(&o1, other.First());
EXPECT_EQ(&a2, other.Last());
auto it = other.Begin();
EXPECT_EQ(&o1, *it);
it = other.Erase(it);
EXPECT_EQ(&a1, *it);
it = other.Erase(it);
EXPECT_EQ(&a2, *it);
it = other.Erase(it);
EXPECT(it == other.end());
}
}
} // namespace dart.

View file

@ -191,6 +191,22 @@ class MonitorLocker : public ValueObject {
DISALLOW_COPY_AND_ASSIGN(MonitorLocker);
};
// Leaves the given monitor during the scope of the object.
class MonitorLeaveScope : public ValueObject {
public:
explicit MonitorLeaveScope(MonitorLocker* monitor)
: monitor_locker_(monitor) {
monitor_locker_->Exit();
}
virtual ~MonitorLeaveScope() { monitor_locker_->Enter(); }
private:
MonitorLocker* const monitor_locker_;
DISALLOW_COPY_AND_ASSIGN(MonitorLeaveScope);
};
/*
* Safepoint mutex locker :
* This locker abstraction should be used when the enclosing code could

View file

@ -15,341 +15,6 @@ DEFINE_FLAG(int,
5000,
"Free workers when they have been idle for this amount of time.");
ThreadPool::ThreadPool()
: shutting_down_(false),
all_workers_(NULL),
idle_workers_(NULL),
count_started_(0),
count_stopped_(0),
count_running_(0),
count_idle_(0),
shutting_down_workers_(NULL),
join_list_(NULL) {}
ThreadPool::~ThreadPool() {
Shutdown();
}
bool ThreadPool::RunImpl(std::unique_ptr<Task> task) {
Worker* worker = NULL;
bool new_worker = false;
{
// We need ThreadPool::mutex_ to access worker lists and other
// ThreadPool state.
MutexLocker ml(&mutex_);
if (shutting_down_) {
return false;
}
if (idle_workers_ == NULL) {
worker = new Worker(this);
ASSERT(worker != NULL);
new_worker = true;
count_started_++;
// Add worker to the all_workers_ list.
worker->all_next_ = all_workers_;
all_workers_ = worker;
worker->owned_ = true;
count_running_++;
} else {
// Get the first worker from the idle worker list.
worker = idle_workers_;
idle_workers_ = worker->idle_next_;
worker->idle_next_ = NULL;
count_idle_--;
count_running_++;
}
}
// Release ThreadPool::mutex_ before calling Worker functions.
ASSERT(worker != NULL);
worker->SetTask(std::move(task));
if (new_worker) {
// Call StartThread after we've assigned the first task.
worker->StartThread();
}
return true;
}
void ThreadPool::Shutdown() {
Worker* saved = NULL;
{
MutexLocker ml(&mutex_);
shutting_down_ = true;
saved = all_workers_;
all_workers_ = NULL;
idle_workers_ = NULL;
Worker* current = saved;
while (current != NULL) {
Worker* next = current->all_next_;
current->idle_next_ = NULL;
current->owned_ = false;
current = next;
count_stopped_++;
}
count_idle_ = 0;
count_running_ = 0;
ASSERT(count_started_ == count_stopped_);
}
// Release ThreadPool::mutex_ before calling Worker functions.
{
MonitorLocker eml(&exit_monitor_);
// First tell all the workers to shut down.
Worker* current = saved;
OSThread* os_thread = OSThread::Current();
ASSERT(os_thread != NULL);
ThreadId id = os_thread->id();
while (current != NULL) {
Worker* next = current->all_next_;
ThreadId currentId = current->id();
if (currentId != id) {
AddWorkerToShutdownList(current);
}
current->Shutdown();
current = next;
}
saved = NULL;
// Wait until all workers will exit.
while (shutting_down_workers_ != NULL) {
// Here, we are waiting for workers to exit. When a worker exits we will
// be notified.
eml.Wait();
}
}
// Extract the join list, and join on the threads.
JoinList* list = NULL;
{
MutexLocker ml(&mutex_);
list = join_list_;
join_list_ = NULL;
}
// Join non-idle threads.
JoinList::Join(&list);
#if defined(DEBUG)
{
MutexLocker ml(&mutex_);
ASSERT(join_list_ == NULL);
}
#endif
}
bool ThreadPool::IsIdle(Worker* worker) {
ASSERT(worker != NULL && worker->owned_);
for (Worker* current = idle_workers_; current != NULL;
current = current->idle_next_) {
if (current == worker) {
return true;
}
}
return false;
}
bool ThreadPool::RemoveWorkerFromIdleList(Worker* worker) {
ASSERT(worker != NULL && worker->owned_);
if (idle_workers_ == NULL) {
return false;
}
// Special case head of list.
if (idle_workers_ == worker) {
idle_workers_ = worker->idle_next_;
worker->idle_next_ = NULL;
return true;
}
for (Worker* current = idle_workers_; current->idle_next_ != NULL;
current = current->idle_next_) {
if (current->idle_next_ == worker) {
current->idle_next_ = worker->idle_next_;
worker->idle_next_ = NULL;
return true;
}
}
return false;
}
bool ThreadPool::RemoveWorkerFromAllList(Worker* worker) {
ASSERT(worker != NULL && worker->owned_);
if (all_workers_ == NULL) {
return false;
}
// Special case head of list.
if (all_workers_ == worker) {
all_workers_ = worker->all_next_;
worker->all_next_ = NULL;
worker->owned_ = false;
worker->done_ = true;
return true;
}
for (Worker* current = all_workers_; current->all_next_ != NULL;
current = current->all_next_) {
if (current->all_next_ == worker) {
current->all_next_ = worker->all_next_;
worker->all_next_ = NULL;
worker->owned_ = false;
return true;
}
}
return false;
}
void ThreadPool::SetIdleLocked(Worker* worker) {
ASSERT(mutex_.IsOwnedByCurrentThread());
ASSERT(worker->owned_ && !IsIdle(worker));
worker->idle_next_ = idle_workers_;
idle_workers_ = worker;
count_idle_++;
count_running_--;
}
void ThreadPool::SetIdleAndReapExited(Worker* worker) {
JoinList* list = NULL;
{
MutexLocker ml(&mutex_);
if (shutting_down_) {
return;
}
if (join_list_ == NULL) {
// Nothing to join, add to the idle list and return.
SetIdleLocked(worker);
return;
}
// There is something to join. Grab the join list, drop the lock, do the
// join, then grab the lock again and add to the idle list.
list = join_list_;
join_list_ = NULL;
}
JoinList::Join(&list);
{
MutexLocker ml(&mutex_);
if (shutting_down_) {
return;
}
SetIdleLocked(worker);
}
}
bool ThreadPool::ReleaseIdleWorker(Worker* worker) {
MutexLocker ml(&mutex_);
if (shutting_down_) {
return false;
}
// Remove from idle list.
if (!RemoveWorkerFromIdleList(worker)) {
return false;
}
// Remove from all list.
bool found = RemoveWorkerFromAllList(worker);
ASSERT(found);
// The thread for worker will exit. Add its ThreadId to the join_list_
// so that we can join on it at the next opportunity.
OSThread* os_thread = OSThread::Current();
ASSERT(os_thread != NULL);
ThreadJoinId join_id = OSThread::GetCurrentThreadJoinId(os_thread);
JoinList::AddLocked(join_id, &join_list_);
count_stopped_++;
count_idle_--;
return true;
}
// Only call while holding the exit_monitor_
void ThreadPool::AddWorkerToShutdownList(Worker* worker) {
ASSERT(exit_monitor_.IsOwnedByCurrentThread());
worker->shutdown_next_ = shutting_down_workers_;
shutting_down_workers_ = worker;
}
// Only call while holding the exit_monitor_
bool ThreadPool::RemoveWorkerFromShutdownList(Worker* worker) {
ASSERT(worker != NULL);
ASSERT(shutting_down_workers_ != NULL);
ASSERT(exit_monitor_.IsOwnedByCurrentThread());
// Special case head of list.
if (shutting_down_workers_ == worker) {
shutting_down_workers_ = worker->shutdown_next_;
worker->shutdown_next_ = NULL;
return true;
}
for (Worker* current = shutting_down_workers_;
current->shutdown_next_ != NULL; current = current->shutdown_next_) {
if (current->shutdown_next_ == worker) {
current->shutdown_next_ = worker->shutdown_next_;
worker->shutdown_next_ = NULL;
return true;
}
}
return false;
}
void ThreadPool::JoinList::AddLocked(ThreadJoinId id, JoinList** list) {
*list = new JoinList(id, *list);
}
void ThreadPool::JoinList::Join(JoinList** list) {
while (*list != NULL) {
JoinList* current = *list;
*list = current->next();
OSThread::Join(current->id());
delete current;
}
}
ThreadPool::Task::Task() {}
ThreadPool::Task::~Task() {}
ThreadPool::Worker::Worker(ThreadPool* pool)
: pool_(pool),
task_(nullptr),
id_(OSThread::kInvalidThreadId),
done_(false),
owned_(false),
all_next_(NULL),
idle_next_(NULL),
shutdown_next_(NULL) {}
ThreadId ThreadPool::Worker::id() {
MonitorLocker ml(&monitor_);
return id_;
}
void ThreadPool::Worker::StartThread() {
#if defined(DEBUG)
// Must call SetTask before StartThread.
{ // NOLINT
MonitorLocker ml(&monitor_);
ASSERT(task_ != nullptr);
}
#endif
int result = OSThread::Start("DartWorker", &Worker::Main,
reinterpret_cast<uword>(this));
if (result != 0) {
FATAL1("Could not start worker thread: result = %d.", result);
}
}
void ThreadPool::Worker::SetTask(std::unique_ptr<Task> task) {
std::atomic_thread_fence(std::memory_order_release);
MonitorLocker ml(&monitor_);
ASSERT(task_ == nullptr);
task_ = std::move(task);
ml.Notify();
}
static int64_t ComputeTimeout(int64_t idle_start) {
int64_t worker_timeout_micros =
FLAG_worker_timeout_millis * kMicrosecondsPerMillisecond;
@ -369,110 +34,217 @@ static int64_t ComputeTimeout(int64_t idle_start) {
}
}
bool ThreadPool::Worker::Loop() {
MonitorLocker ml(&monitor_);
int64_t idle_start;
while (true) {
ASSERT(task_ != nullptr);
std::unique_ptr<Task> task = std::move(task_);
ThreadPool::ThreadPool() : all_workers_dead_(false) {}
// Release monitor while handling the task.
ml.Exit();
std::atomic_thread_fence(std::memory_order_acquire);
task->Run();
ASSERT(Isolate::Current() == NULL);
task.reset();
ml.Enter();
ThreadPool::~ThreadPool() {
TriggerShutdown();
ASSERT(task_ == nullptr);
if (IsDone()) {
// Wait until all workers are dead. Any new death will notify the exit
// monitor.
{
MonitorLocker eml(&exit_monitor_);
while (!all_workers_dead_) {
eml.Wait();
}
}
// Join all dead workers.
WorkerList dead_workers_to_join;
{
MonitorLocker ml(&pool_monitor_);
ObtainDeadWorkersLocked(&dead_workers_to_join);
}
JoinDeadWorkersLocked(&dead_workers_to_join);
}
bool ThreadPool::RunImpl(std::unique_ptr<Task> task) {
Worker* new_worker = nullptr;
{
MonitorLocker ml(&pool_monitor_);
if (shutting_down_) {
return false;
}
ASSERT(!done_);
pool_->SetIdleAndReapExited(this);
idle_start = OS::GetCurrentMonotonicMicros();
while (true) {
Monitor::WaitResult result = ml.WaitMicros(ComputeTimeout(idle_start));
if (task_ != nullptr) {
// We've found a task. Process it, regardless of whether the
// worker is done_.
new_worker = ScheduleTaskLocked(&ml, std::move(task));
}
if (new_worker != nullptr) {
new_worker->StartThread();
}
return true;
}
void ThreadPool::WorkerLoop(Worker* worker) {
WorkerList dead_workers_to_join;
while (true) {
MonitorLocker ml(&pool_monitor_);
if (!tasks_.IsEmpty()) {
IdleToRunningLocked(worker);
while (!tasks_.IsEmpty()) {
std::unique_ptr<Task> task(tasks_.RemoveFirst());
pending_tasks_--;
MonitorLeaveScope mls(&ml);
task->Run();
ASSERT(Isolate::Current() == nullptr);
task.reset();
}
RunningToIdleLocked(worker);
}
if (shutting_down_) {
ObtainDeadWorkersLocked(&dead_workers_to_join);
IdleToDeadLocked(worker);
break;
}
// Sleep until we get a new task, we time out or we're shutdown.
const int64_t idle_start = OS::GetCurrentMonotonicMicros();
bool done = false;
while (!done) {
const auto result = ml.WaitMicros(ComputeTimeout(idle_start));
// We have to drain all pending tasks.
if (!tasks_.IsEmpty()) break;
if (shutting_down_ || result == Monitor::kTimedOut) {
done = true;
break;
}
if (IsDone()) {
return false;
}
if ((result == Monitor::kTimedOut) && pool_->ReleaseIdleWorker(this)) {
return true;
}
}
if (done) {
ObtainDeadWorkersLocked(&dead_workers_to_join);
IdleToDeadLocked(worker);
break;
}
}
UNREACHABLE();
return false;
// Before we transitioned to dead we obtained the list of previously died dead
// workers, which we join here. Since every death of a worker will join
// previously died workers, we keep the pending non-joined [dead_workers_] to
// effectively 1.
JoinDeadWorkersLocked(&dead_workers_to_join);
}
void ThreadPool::Worker::Shutdown() {
MonitorLocker ml(&monitor_);
done_ = true;
ml.Notify();
}
void ThreadPool::TriggerShutdown() {
MonitorLocker ml(&pool_monitor_);
// static
void ThreadPool::Worker::Main(uword args) {
Worker* worker = reinterpret_cast<Worker*>(args);
OSThread* os_thread = OSThread::Current();
ASSERT(os_thread != NULL);
ThreadId id = os_thread->id();
ThreadPool* pool;
// Prevent scheduling of new tasks.
shutting_down_ = true;
{
MonitorLocker ml(&worker->monitor_);
ASSERT(worker->task_);
worker->id_ = id;
pool = worker->pool_;
if (running_workers_.IsEmpty() && idle_workers_.IsEmpty()) {
// All workers have already died.
all_workers_dead_ = true;
} else {
// Tell workers to drain remaining work and then shut down.
ml.NotifyAll();
}
}
bool released = worker->Loop();
void ThreadPool::IdleToRunningLocked(Worker* worker) {
ASSERT(idle_workers_.ContainsForDebugging(worker));
idle_workers_.Remove(worker);
running_workers_.Append(worker);
count_idle_--;
count_running_++;
}
// It should be okay to access these unlocked here in this assert.
// worker->all_next_ is retained by the pool for shutdown monitoring.
ASSERT(!worker->owned_ && (worker->idle_next_ == NULL));
void ThreadPool::RunningToIdleLocked(Worker* worker) {
ASSERT(tasks_.IsEmpty());
if (!released) {
// This worker is exiting because the thread pool is being shut down.
// Inform the thread pool that we are exiting. We remove this worker from
// shutting_down_workers_ list because there will be no need for the
// ThreadPool to take action for this worker.
ThreadJoinId join_id = OSThread::GetCurrentThreadJoinId(os_thread);
{
MutexLocker ml(&pool->mutex_);
JoinList::AddLocked(join_id, &pool->join_list_);
}
ASSERT(running_workers_.ContainsForDebugging(worker));
running_workers_.Remove(worker);
idle_workers_.Append(worker);
count_running_--;
count_idle_++;
}
// worker->id_ should never be read again, so set to invalid in debug mode
// for asserts.
#if defined(DEBUG)
{
MonitorLocker ml(&worker->monitor_);
worker->id_ = OSThread::kInvalidThreadId;
}
#endif
void ThreadPool::IdleToDeadLocked(Worker* worker) {
ASSERT(tasks_.IsEmpty());
// Remove from the shutdown list, delete, and notify the thread pool.
{
MonitorLocker eml(&pool->exit_monitor_);
pool->RemoveWorkerFromShutdownList(worker);
delete worker;
ASSERT(idle_workers_.ContainsForDebugging(worker));
idle_workers_.Remove(worker);
dead_workers_.Append(worker);
count_idle_--;
count_dead_++;
// Notify shutdown thread that the worker thread is about to finish.
if (shutting_down_) {
if (running_workers_.IsEmpty() && idle_workers_.IsEmpty()) {
all_workers_dead_ = true;
MonitorLocker eml(&exit_monitor_);
eml.Notify();
}
} else {
// This worker is going down because it was idle for too long. This case
// is not due to a ThreadPool Shutdown. Thus, we simply delete the worker.
// The worker's id is added to the thread pool's join list by
// ReleaseIdleWorker, so in the case that the thread pool begins shutting
// down immediately after returning from worker->Loop() above, we still
// wait for the thread to exit by joining on it in Shutdown().
}
}
void ThreadPool::ObtainDeadWorkersLocked(WorkerList* dead_workers_to_join) {
dead_workers_to_join->AppendList(&dead_workers_);
ASSERT(dead_workers_.IsEmpty());
count_dead_ = 0;
}
void ThreadPool::JoinDeadWorkersLocked(WorkerList* dead_workers_to_join) {
auto it = dead_workers_to_join->begin();
while (it != dead_workers_to_join->end()) {
Worker* worker = *it;
it = dead_workers_to_join->Erase(it);
OSThread::Join(worker->join_id_);
delete worker;
}
ASSERT(dead_workers_to_join->IsEmpty());
}
ThreadPool::Worker* ThreadPool::ScheduleTaskLocked(MonitorLocker* ml,
std::unique_ptr<Task> task) {
// Enqueue the new task.
tasks_.Append(task.release());
pending_tasks_++;
ASSERT(pending_tasks_ >= 1);
// Notify existing idle worker (if available).
if (count_idle_ >= pending_tasks_) {
ASSERT(!idle_workers_.IsEmpty());
ml->Notify();
return nullptr;
}
// Otherwise start a new worker.
auto new_worker = new Worker(this);
idle_workers_.Append(new_worker);
count_idle_++;
return new_worker;
}
ThreadPool::Worker::Worker(ThreadPool* pool)
: pool_(pool), join_id_(OSThread::kInvalidThreadJoinId) {}
void ThreadPool::Worker::StartThread() {
int result = OSThread::Start("DartWorker", &Worker::Main,
reinterpret_cast<uword>(this));
if (result != 0) {
FATAL1("Could not start worker thread: result = %d.", result);
}
}
void ThreadPool::Worker::Main(uword args) {
OSThread* os_thread = OSThread::Current();
ASSERT(os_thread != nullptr);
Worker* worker = reinterpret_cast<Worker*>(args);
ThreadPool* pool = worker->pool_;
// Once the worker quits it needs to be joined.
worker->join_id_ = OSThread::GetCurrentThreadJoinId(os_thread);
#if defined(DEBUG)
{
MonitorLocker ml(&pool->pool_monitor_);
ASSERT(pool->idle_workers_.ContainsForDebugging(worker));
}
#endif
pool->WorkerLoop(worker);
// Call the thread exit hook here to notify the embedder that the
// thread pool thread is exiting.

View file

@ -10,19 +10,22 @@
#include "vm/allocation.h"
#include "vm/globals.h"
#include "vm/intrusive_dlist.h"
#include "vm/os_thread.h"
namespace dart {
class MonitorLocker;
class ThreadPool {
public:
// Subclasses of Task are able to run on a ThreadPool.
class Task {
class Task : public IntrusiveDListEntry<Task> {
protected:
Task();
Task() {}
public:
virtual ~Task();
virtual ~Task() {}
// Override this to provide task-specific behavior.
virtual void Run() = 0;
@ -32,10 +35,7 @@ class ThreadPool {
};
ThreadPool();
// Shuts down this thread pool. Causes workers to terminate
// themselves when they are active again.
~ThreadPool();
virtual ~ThreadPool();
// Runs a task on the thread pool.
template <typename T, typename... Args>
@ -43,110 +43,65 @@ class ThreadPool {
return RunImpl(std::unique_ptr<Task>(new T(std::forward<Args>(args)...)));
}
// Some simple stats.
uint64_t workers_running() const { return count_running_; }
uint64_t workers_idle() const { return count_idle_; }
uint64_t workers_started() const { return count_started_; }
uint64_t workers_stopped() const { return count_stopped_; }
// Trigger shutdown, prevents scheduling of new tasks.
void TriggerShutdown();
// Exposed for unit test in thread_pool_test.cc
uint64_t workers_started() const { return count_idle_ + count_running_; }
// Exposed for unit test in thread_pool_test.cc
uint64_t workers_stopped() const { return count_dead_; }
private:
class Worker {
class Worker : public IntrusiveDListEntry<Worker> {
public:
explicit Worker(ThreadPool* pool);
// Sets a task on the worker.
void SetTask(std::unique_ptr<Task> task);
// Starts the thread for the worker. This should only be called
// after a task has been set by the initial call to SetTask().
void StartThread();
// Main loop for a worker. Returns true if worker is removed from thread
// lists, false otherwise.
bool Loop();
// Causes worker to terminate eventually.
void Shutdown();
// Get the Worker's thread id.
ThreadId id();
private:
friend class ThreadPool;
// The main entry point for new worker threads.
static void Main(uword args);
bool IsDone() const { return done_; }
// Fields owned by Worker.
Monitor monitor_;
// Fields initialized during construction or in start of main function of
// thread.
ThreadPool* pool_;
std::unique_ptr<Task> task_;
ThreadId id_;
bool done_;
// Fields owned by ThreadPool. Workers should not look at these
// directly. It's like looking at the sun.
bool owned_; // Protected by ThreadPool::mutex_
Worker* all_next_; // Protected by ThreadPool::mutex_
Worker* idle_next_; // Protected by ThreadPool::mutex_
Worker* shutdown_next_; // Protected by ThreadPool::exit_monitor
ThreadJoinId join_id_;
DISALLOW_COPY_AND_ASSIGN(Worker);
};
class JoinList {
public:
explicit JoinList(ThreadJoinId id, JoinList* next) : id_(id), next_(next) {}
// The thread pool's mutex_ must be held when calling this.
static void AddLocked(ThreadJoinId id, JoinList** list);
static void Join(JoinList** list);
ThreadJoinId id() const { return id_; }
JoinList* next() const { return next_; }
private:
ThreadJoinId id_;
JoinList* next_;
DISALLOW_COPY_AND_ASSIGN(JoinList);
};
private:
using TaskList = IntrusiveDList<Task>;
using WorkerList = IntrusiveDList<Worker>;
bool RunImpl(std::unique_ptr<Task> task);
void Shutdown();
void WorkerLoop(Worker* worker);
// Expensive. Use only in assertions.
bool IsIdle(Worker* worker);
Worker* ScheduleTaskLocked(MonitorLocker* ml, std::unique_ptr<Task> task);
bool RemoveWorkerFromIdleList(Worker* worker);
bool RemoveWorkerFromAllList(Worker* worker);
void IdleToRunningLocked(Worker* worker);
void RunningToIdleLocked(Worker* worker);
void IdleToDeadLocked(Worker* worker);
void ObtainDeadWorkersLocked(WorkerList* dead_workers_to_join);
void JoinDeadWorkersLocked(WorkerList* dead_workers_to_join);
void AddWorkerToShutdownList(Worker* worker);
bool RemoveWorkerFromShutdownList(Worker* worker);
void ReapExitedIdleThreads();
// Worker operations.
void SetIdleLocked(Worker* worker); // Assumes mutex_ is held.
void SetIdleAndReapExited(Worker* worker);
bool ReleaseIdleWorker(Worker* worker);
Mutex mutex_;
bool shutting_down_;
Worker* all_workers_;
Worker* idle_workers_;
uint64_t count_started_;
uint64_t count_stopped_;
uint64_t count_running_;
uint64_t count_idle_;
Monitor pool_monitor_;
bool shutting_down_ = false;
uint64_t count_running_ = 0;
uint64_t count_idle_ = 0;
uint64_t count_dead_ = 0;
WorkerList running_workers_;
WorkerList idle_workers_;
WorkerList dead_workers_;
uint64_t pending_tasks_ = 0;
TaskList tasks_;
Monitor exit_monitor_;
Worker* shutting_down_workers_;
JoinList* join_list_;
std::atomic<bool> all_workers_dead_;
DISALLOW_COPY_AND_ASSIGN(ThreadPool);
};