[vm/concurrency] Allow thread pool with maximum size to grow in size if workers are blocked in native code

This change avoids deadlock scenarios where an application runs N isolates
on a thread pool with T threads (T < N) and all T scheduled isolates make an
FFI call and block, which prevents any other isolates from executing.

To avoid such a scenario we add support for the isolate-group specific
thread pool to dynamically increase the maximum size if a worker is
running an isolate mutator thread, which calls into C code that exits the
isolate.
=> While such an isolate is descheduled but still occupies a thread
   pool worker we temporarily increase the maximum size of the thread pool.

Issue https://github.com/dart-lang/sdk/issues/36097

Change-Id: Id61c39b06766da11f76d607ac7cbe8a8e623f250
Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/148120
Commit-Queue: Martin Kustermann <kustermann@google.com>
Reviewed-by: Ryan Macnak <rmacnak@google.com>
This commit is contained in:
Martin Kustermann 2020-05-15 10:57:09 +00:00 committed by commit-bot@chromium.org
parent aa1e71e5f9
commit 874588a859
8 changed files with 242 additions and 37 deletions

View file

@ -757,4 +757,37 @@ DART_EXPORT void StopWorkSimulator2() {
SimulateWork2::StopWorkSimulator();
}
////////////////////////////////////////////////////////////////////////////////
// Helpers used for lightweight isolate tests.
////////////////////////////////////////////////////////////////////////////////
// Test-only rendezvous point: each caller leaves its current isolate, blocks
// until `num_threads` callers have arrived, and then re-enters the isolate it
// came from.
//
// The isolate enter/exit/current entry points are supplied by the caller as
// function pointers. The arrival counter is a function-local static that is
// never reset, so any call arriving after the barrier has been released trips
// the FATAL below.
DART_EXPORT void ThreadPoolTest_BarrierSync(
    Dart_Isolate (*dart_current_isolate)(),
    void (*dart_enter_isolate)(Dart_Isolate),
    void (*dart_exit_isolate)(),
    intptr_t num_threads) {
  // Function-local statics: initialization happens exactly once even when
  // several threads enter concurrently.
  static std::mutex mutex;
  static std::condition_variable cvar;
  static intptr_t thread_count = 0;

  // Remember which isolate we are in so we can come back to it afterwards.
  const Dart_Isolate isolate = dart_current_isolate();
  dart_exit_isolate();
  {
    std::unique_lock<std::mutex> lock(mutex);
    ++thread_count;
    const bool everyone_arrived = thread_count >= num_threads;
    if (everyone_arrived) {
      // Overshooting the expected count means the barrier was misused.
      if (thread_count != num_threads) FATAL("bug");
      cvar.notify_all();
    } else {
      // Sleep until the last arrival wakes us; the predicate guards against
      // spurious wakeups.
      cvar.wait(lock, [&] { return thread_count >= num_threads; });
    }
  }
  dart_enter_isolate(isolate);
}
} // namespace dart

View file

@ -0,0 +1,76 @@
// Copyright (c) 2020, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
// SharedObjects=ffi_test_functions
// VMOptions=--enable-isolate-groups --disable-heap-verification
import 'dart:async';
import 'dart:ffi';
import 'package:expect/expect.dart';
import 'test_utils.dart';
import '../../../../../tests/ffi_2/dylib_utils.dart';
// This should be larger than max-new-space-size/tlab-size.
const int threadCount = 200;
// Empty struct used only as the pointee type of the isolate pointers below.
// NOTE(review): presumably stands in for the embedder's opaque Dart_Isolate
// handle - confirm against the C declaration.
class Isolate extends Struct {}
// For each entry point there are two signatures: the `...FT` typedef is
// written with Dart types (`void`) and the `...NFT` typedef with dart:ffi
// native types (`Void`). Only the `...NFT` variants are used below.
typedef Dart_CurrentIsolateFT = Pointer<Isolate> Function();
typedef Dart_CurrentIsolateNFT = Pointer<Isolate> Function();
typedef Dart_EnterIsolateFT = void Function(Pointer<Isolate>);
typedef Dart_EnterIsolateNFT = Void Function(Pointer<Isolate>);
typedef Dart_ExitIsolateFT = void Function();
typedef Dart_ExitIsolateNFT = Void Function();
// Test helper library that exports `ThreadPoolTest_BarrierSync`.
final ffiTestFunctions = dlopenPlatformSpecific("ffi_test_functions");
// Dart binding for the native barrier helper. It takes the three isolate
// entry points as native function pointers plus the number of callers to
// wait for.
final threadPoolBarrierSync = ffiTestFunctions.lookupFunction<
Void Function(
Pointer<NativeFunction<Dart_CurrentIsolateNFT>>,
Pointer<NativeFunction<Dart_EnterIsolateNFT>>,
Pointer<NativeFunction<Dart_ExitIsolateNFT>>,
IntPtr),
void Function(
Pointer<NativeFunction<Dart_CurrentIsolateNFT>>,
Pointer<NativeFunction<Dart_EnterIsolateNFT>>,
Pointer<NativeFunction<Dart_ExitIsolateNFT>>,
int)>('ThreadPoolTest_BarrierSync');
// The isolate entry points are looked up by symbol name in the running
// executable and handed to the native helper as raw function pointers.
final Pointer<NativeFunction<Dart_CurrentIsolateNFT>> dartCurrentIsolate =
DynamicLibrary.executable().lookup("Dart_CurrentIsolate").cast();
final Pointer<NativeFunction<Dart_EnterIsolateNFT>> dartEnterIsolate =
DynamicLibrary.executable().lookup("Dart_EnterIsolate").cast();
final Pointer<NativeFunction<Dart_ExitIsolateNFT>> dartExitIsolate =
DynamicLibrary.executable().lookup("Dart_ExitIsolate").cast();
// Ring element that blocks in native code until `threadCount` workers have
// reached the barrier, then reports its own id.
class Worker extends RingElement {
// Identifier handed to the constructor and returned from [run].
final int id;
Worker(this.id);
// Both arguments are ignored. The call into `threadPoolBarrierSync` only
// returns once `threadCount` callers have entered the native barrier.
Future run(dynamic _, dynamic _2) async {
threadPoolBarrierSync(
dartCurrentIsolate, dartEnterIsolate, dartExitIsolate, threadCount);
return id;
}
}
// Spins up a ring of `threadCount` workers, has each of them rendezvous in
// native code, and verifies that every worker reported its own id in order.
main(args) async {
  final ring = await Ring.create(threadCount);

  // Every worker performs the same steps:
  //   1. call into C
  //   2. exit its isolate
  //   3. block until the barrier is released
  //   4. resume and finish
  final results = await ring.run((int id) {
    return Worker(id);
  });

  Expect.equals(threadCount, results.length);
  var expectedId = 0;
  while (expectedId < threadCount) {
    Expect.equals(expectedId, results[expectedId]);
    ++expectedId;
  }

  await ring.close();
}

View file

@ -33,6 +33,10 @@ dart/isolates/sum_recursive_call_ig_test: Skip # Only AOT has lightweight enough
dart/isolates/sum_recursive_call_test: Skip # Only AOT has lightweight enough isolates to run those tests.
dart/isolates/sum_recursive_tail_call_ig_test: Skip # Only AOT has lightweight enough isolates to run those tests.
dart/isolates/sum_recursive_tail_call_test: Skip # Only AOT has lightweight enough isolates to run those tests.
dart/isolates/thread_pool_test: Skip # Only AOT has lightweight enough isolates to run those tests.
[ $arch == simarm || $arch == simarm64 ]
dart/isolates/thread_pool_test: SkipByDesign # Test uses dart:ffi which is not supported on simulators.
[ $builder_tag == asan ]
dart/transferable_throws_oom_test: SkipByDesign # This test tries to allocate too much memory on purpose. Still dartbug.com/37188

View file

@ -595,6 +595,30 @@ void IsolateGroup::UnscheduleThread(Thread* thread,
UnscheduleThreadLocked(&ml, thread, is_mutator, bypass_safepoint);
}
// Called when `mutator` is about to be scheduled as a mutator. If the
// isolate's mutator thread exists and has a recorded exit frame, the current
// pool worker is marked as unblocked again.
void IsolateGroup::IncreaseMutatorCount(Isolate* mutator) {
  ASSERT(mutator->group() == this);

  Thread* const thread = mutator->mutator_thread();
  // No mutator thread yet: there is nothing that could have been blocked.
  if (thread == nullptr) return;
  // Without an exit frame the worker was never marked as blocked.
  if (thread->top_exit_frame_info() == 0) return;

  // The mutator was temporarily blocked on a worker thread; undo that now.
  thread_pool()->MarkCurrentWorkerAsUnBlocked();
}
// Called when `mutator` is being unscheduled as a mutator. If its mutator
// thread has a recorded exit frame, the current pool worker is marked as
// blocked, which may cause the pool to spawn another worker for pending
// tasks.
void IsolateGroup::DecreaseMutatorCount(Isolate* mutator) {
  ASSERT(mutator->group() == this);

  Thread* const thread = mutator->mutator_thread();
  ASSERT(thread != nullptr);

  // Only a mutator with an active stack keeps occupying its worker.
  const bool has_exit_frame = thread->top_exit_frame_info() != 0;
  if (!has_exit_frame) return;

  thread_pool()->MarkCurrentWorkerAsBlocked();
}
#ifndef PRODUCT
void IsolateGroup::PrintJSON(JSONStream* stream, bool ref) {
JSONObject jsobj(stream);
@ -2544,7 +2568,7 @@ void Isolate::LowLevelCleanup(Isolate* isolate) {
const bool shutdown_group =
isolate_group->UnregisterIsolateDecrementCount(isolate);
if (shutdown_group) {
if (!OSThread::CurrentThreadRunsOn(isolate_group->thread_pool())) {
if (!isolate_group->thread_pool()->CurrentThreadIsWorker()) {
isolate_group->Shutdown();
} else {
class ShutdownGroupTask : public ThreadPool::Task {
@ -3614,6 +3638,10 @@ Monitor* IsolateGroup::threads_lock() const {
}
Thread* Isolate::ScheduleThread(bool is_mutator, bool bypass_safepoint) {
if (is_mutator) {
group()->IncreaseMutatorCount(this);
}
// We are about to associate the thread with an isolate group and it would
// not be possible to correctly track no_safepoint_scope_depth for the
// thread in the constructor/destructor of MonitorLocker,
@ -3658,31 +3686,36 @@ Thread* Isolate::ScheduleThread(bool is_mutator, bool bypass_safepoint) {
void Isolate::UnscheduleThread(Thread* thread,
bool is_mutator,
bool bypass_safepoint) {
// Disassociate the 'Thread' structure and unschedule the thread
// from this isolate.
// We are disassociating the thread from an isolate and it would
// not be possible to correctly track no_safepoint_scope_depth for the
// thread in the constructor/destructor of MonitorLocker,
// so we create a MonitorLocker object which does not do any
// no_safepoint_scope_depth increments/decrements.
MonitorLocker ml(group()->threads_lock(), false);
{
// Disassociate the 'Thread' structure and unschedule the thread
// from this isolate.
// We are disassociating the thread from an isolate and it would
// not be possible to correctly track no_safepoint_scope_depth for the
// thread in the constructor/destructor of MonitorLocker,
// so we create a MonitorLocker object which does not do any
// no_safepoint_scope_depth increments/decrements.
MonitorLocker ml(group()->threads_lock(), false);
if (is_mutator) {
if (thread->sticky_error() != Error::null()) {
ASSERT(sticky_error_ == Error::null());
sticky_error_ = thread->StealStickyError();
if (is_mutator) {
if (thread->sticky_error() != Error::null()) {
ASSERT(sticky_error_ == Error::null());
sticky_error_ = thread->StealStickyError();
}
ASSERT(mutator_thread_ == thread);
ASSERT(mutator_thread_ == scheduled_mutator_thread_);
scheduled_mutator_thread_ = nullptr;
} else {
// We only reset the isolate pointer for non-mutator threads, since
// mutator threads can still be visited during GC even if unscheduled.
// See also IsolateGroup::UnscheduleThreadLocked`
thread->isolate_ = nullptr;
}
ASSERT(mutator_thread_ == thread);
ASSERT(mutator_thread_ == scheduled_mutator_thread_);
scheduled_mutator_thread_ = nullptr;
} else {
// We only reset the isolate pointer for non-mutator threads, since mutator
// threads can still be visited during GC even if unscheduled.
// See also IsolateGroup::UnscheduleThreadLocked`
thread->isolate_ = nullptr;
thread->field_table_values_ = nullptr;
group()->UnscheduleThreadLocked(&ml, thread, is_mutator, bypass_safepoint);
}
if (is_mutator) {
group()->DecreaseMutatorCount(this);
}
thread->field_table_values_ = nullptr;
group()->UnscheduleThreadLocked(&ml, thread, is_mutator, bypass_safepoint);
}
static const char* NewConstChar(const char* chars) {

View file

@ -385,6 +385,9 @@ class IsolateGroup : public IntrusiveDListEntry<IsolateGroup> {
bool is_mutator,
bool bypass_safepoint = false);
void IncreaseMutatorCount(Isolate* mutator);
void DecreaseMutatorCount(Isolate* mutator);
Dart_LibraryTagHandler library_tag_handler() const {
return library_tag_handler_;
}
@ -631,7 +634,6 @@ class IsolateGroup : public IntrusiveDListEntry<IsolateGroup> {
ReversePcLookupCache* reverse_pc_lookup_cache_ = nullptr;
ArrayPtr saved_unlinked_calls_;
std::shared_ptr<FieldTable> saved_initial_field_table_;
uint32_t isolate_group_flags_ = 0;
};

View file

@ -38,7 +38,6 @@ class Log;
class Mutex;
class ThreadState;
class TimelineEventBlock;
class ThreadPool;
class Mutex {
public:
@ -236,11 +235,6 @@ class OSThread : public BaseThread {
static void DisableOSThreadCreation();
static void EnableOSThreadCreation();
static bool CurrentThreadRunsOn(ThreadPool* pool) {
auto owning_pool = OSThread::Current()->owning_thread_pool_;
return owning_pool != nullptr && owning_pool == pool;
}
static const intptr_t kStackSizeBufferMax = (16 * KB * kWordSize);
static constexpr float kStackSizeBufferFraction = 0.5;
@ -300,10 +294,10 @@ class OSThread : public BaseThread {
uword stack_limit_;
uword stack_headroom_;
ThreadState* thread_;
// The ThreadPool which owns this OSThread. If this OSThread was not started
// by a ThreadPool it will be nullptr. This TLS value is not protected and
// should only be read/written by the OSThread itself.
ThreadPool* owning_thread_pool_ = nullptr;
// The ThreadPool::Worker which owns this OSThread. If this OSThread was not
// started by a ThreadPool it will be nullptr. This TLS value is not
// protected and should only be read/written by the OSThread itself.
void* owning_thread_pool_worker_ = nullptr;
// thread_list_lock_ cannot have a static lifetime because the order in which
// destructors run is undefined. At the moment this lock cannot be deleted
@ -322,7 +316,7 @@ class OSThread : public BaseThread {
friend class OSThreadIterator;
friend class ThreadInterrupterWin;
friend class ThreadInterrupterFuchsia;
friend class ThreadPool; // to access owning_thread_pool_
friend class ThreadPool; // to access owning_thread_pool_worker_
};
// Note that this takes the thread list lock, prohibiting threads from coming

View file

@ -96,6 +96,53 @@ bool ThreadPool::RunImpl(std::unique_ptr<Task> task) {
return true;
}
bool ThreadPool::CurrentThreadIsWorker() {
auto worker =
static_cast<Worker*>(OSThread::Current()->owning_thread_pool_worker_);
return worker != nullptr && worker->pool_ == this;
}
void ThreadPool::MarkCurrentWorkerAsBlocked() {
auto worker =
static_cast<Worker*>(OSThread::Current()->owning_thread_pool_worker_);
Worker* new_worker = nullptr;
if (worker != nullptr) {
MonitorLocker ml(&pool_monitor_);
ASSERT(!worker->is_blocked_);
worker->is_blocked_ = true;
if (max_pool_size_ > 0) {
++max_pool_size_;
// This thread is blocked and therefore no longer usable as a worker.
// If we have pending tasks and there are no idle workers, we will spawn a
// new thread (temporarily allow exceeding the maximum pool size) to
// handle the pending tasks.
if (idle_workers_.IsEmpty() && pending_tasks_ > 0) {
new_worker = new Worker(this);
idle_workers_.Append(new_worker);
count_idle_++;
}
}
}
if (new_worker != nullptr) {
new_worker->StartThread();
}
}
void ThreadPool::MarkCurrentWorkerAsUnBlocked() {
auto worker =
static_cast<Worker*>(OSThread::Current()->owning_thread_pool_worker_);
if (worker != nullptr) {
MonitorLocker ml(&pool_monitor_);
if (worker->is_blocked_) {
worker->is_blocked_ = false;
if (max_pool_size_ > 0) {
--max_pool_size_;
ASSERT(max_pool_size_ > 0);
}
}
}
}
void ThreadPool::WorkerLoop(Worker* worker) {
WorkerList dead_workers_to_join;
@ -260,7 +307,8 @@ void ThreadPool::Worker::Main(uword args) {
Worker* worker = reinterpret_cast<Worker*>(args);
ThreadPool* pool = worker->pool_;
os_thread->owning_thread_pool_ = pool;
os_thread->owning_thread_pool_worker_ = worker;
worker->os_thread_ = os_thread;
// Once the worker quits it needs to be joined.
worker->join_id_ = OSThread::GetCurrentThreadJoinId(os_thread);
@ -274,7 +322,8 @@ void ThreadPool::Worker::Main(uword args) {
pool->WorkerLoop(worker);
os_thread->owning_thread_pool_ = nullptr;
worker->os_thread_ = nullptr;
os_thread->owning_thread_pool_worker_ = nullptr;
// Call the thread exit hook here to notify the embedder that the
// thread pool thread is exiting.

View file

@ -46,6 +46,18 @@ class ThreadPool {
return RunImpl(std::unique_ptr<Task>(new T(std::forward<Args>(args)...)));
}
// Returns `true` if the current thread is running on [this] thread pool.
bool CurrentThreadIsWorker();
// Mark the current thread as being blocked (e.g. in native code). This might
// temporarily increase the max thread pool size.
void MarkCurrentWorkerAsBlocked();
// Mark the current thread as being unblocked. Must be called iff
// [MarkCurrentWorkerAsBlocked] was called before and the thread is now ready
// to continue executing.
void MarkCurrentWorkerAsUnBlocked();
// Triggers shutdown, prevents scheduling of new tasks.
void Shutdown();
@ -73,6 +85,8 @@ class ThreadPool {
// thread.
ThreadPool* pool_;
ThreadJoinId join_id_;
OSThread* os_thread_ = nullptr;
bool is_blocked_ = false;
DISALLOW_COPY_AND_ASSIGN(Worker);
};