mirror of
https://github.com/dart-lang/sdk
synced 2024-09-15 23:19:47 +00:00
[vm/concurrency] Avoid quadratic complexity when closing ports on isolate shutdown
When an isolate dies it has to close all it's open ports. Right now this is done by walking all ports in the system. => O(#num-isolates x #num-ports) complexity Instead we let each isolate remember it's own open ports: When the isolate dies we iterate over those ports that particular isolate has open and close them. => O(#num-isolates + #num-ports) complexity On a many-isolate test of the form: foo(int n) { if (n == 1) return 1; return foo(n-1) + 1; } for `foo(50k)` this reduces the runtime from 90 seconds to 30 seconds. Issue https://github.com/dart-lang/sdk/issues/36097 Change-Id: I3ac68c5334a1d5e8cac47e89e15cbb50c26b65ac Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/144261 Commit-Queue: Martin Kustermann <kustermann@google.com> Reviewed-by: Ryan Macnak <rmacnak@google.com>
This commit is contained in:
parent
d5137fac1a
commit
72320b6c9a
|
@ -11,6 +11,7 @@
|
|||
#include "vm/lockers.h"
|
||||
#include "vm/message.h"
|
||||
#include "vm/os_thread.h"
|
||||
#include "vm/port_set.h"
|
||||
#include "vm/thread_pool.h"
|
||||
|
||||
namespace dart {
|
||||
|
@ -211,6 +212,8 @@ class MessageHandler {
|
|||
friend class MessageHandlerTestPeer;
|
||||
friend class MessageHandlerTask;
|
||||
|
||||
struct PortSetEntry : public PortSet<PortSetEntry>::Entry {};
|
||||
|
||||
// Called by MessageHandlerTask to process our task queue.
|
||||
void TaskCallback();
|
||||
|
||||
|
@ -248,6 +251,8 @@ class MessageHandler {
|
|||
// thread.
|
||||
bool oob_message_handling_allowed_;
|
||||
bool paused_for_messages_;
|
||||
PortSet<PortSetEntry>
|
||||
ports_; // Only accessed by [PortMap], protected by [PortMap]s lock.
|
||||
intptr_t live_ports_; // The number of open ports, including control ports.
|
||||
intptr_t paused_; // The number of pause messages received.
|
||||
#if !defined(PRODUCT)
|
||||
|
|
|
@ -94,11 +94,20 @@ Dart_Port PortMap::CreatePort(MessageHandler* handler) {
|
|||
handler->CheckAccess();
|
||||
#endif
|
||||
|
||||
const Dart_Port port = AllocatePort();
|
||||
|
||||
// The MessageHandler::ports_ is only accessed by [PortMap], it is guarded
|
||||
// by the [PortMap::mutex_] we already hold.
|
||||
MessageHandler::PortSetEntry isolate_entry;
|
||||
isolate_entry.port = port;
|
||||
handler->ports_.Insert(isolate_entry);
|
||||
|
||||
Entry entry;
|
||||
entry.port = AllocatePort();
|
||||
entry.port = port;
|
||||
entry.handler = handler;
|
||||
entry.state = kNewPort;
|
||||
ports_->Insert(entry);
|
||||
|
||||
if (FLAG_trace_isolates) {
|
||||
OS::PrintErr(
|
||||
"[+] Opening port: \n"
|
||||
|
@ -134,6 +143,13 @@ bool PortMap::ClosePort(Dart_Port port) {
|
|||
// while flushing the messages below.
|
||||
it.Delete();
|
||||
ports_->Rebalance();
|
||||
|
||||
// The MessageHandler::ports_ is only accessed by [PortMap], it is guarded
|
||||
// by the [PortMap::mutex_] we already hold.
|
||||
auto isolate_it = handler->ports_.TryLookup(port);
|
||||
ASSERT(isolate_it != handler->ports_.end());
|
||||
isolate_it.Delete();
|
||||
handler->ports_.Rebalance();
|
||||
}
|
||||
handler->ClosePort(port);
|
||||
if (!handler->HasLivePorts() && handler->OwnedByPortMap()) {
|
||||
|
@ -146,15 +162,22 @@ bool PortMap::ClosePort(Dart_Port port) {
|
|||
void PortMap::ClosePorts(MessageHandler* handler) {
|
||||
{
|
||||
MutexLocker ml(mutex_);
|
||||
for (auto it = ports_->begin(); it != ports_->end(); ++it) {
|
||||
const auto& entry = *it;
|
||||
if (entry.handler == handler) {
|
||||
if (entry.state == kLivePort) {
|
||||
handler->decrement_live_ports();
|
||||
}
|
||||
it.Delete();
|
||||
// The MessageHandler::ports_ is only accessed by [PortMap], it is guarded
|
||||
// by the [PortMap::mutex_] we already hold.
|
||||
for (auto isolate_it = handler->ports_.begin();
|
||||
isolate_it != handler->ports_.end(); ++isolate_it) {
|
||||
auto it = ports_->TryLookup((*isolate_it).port);
|
||||
ASSERT(it != ports_->end());
|
||||
Entry entry = *it;
|
||||
ASSERT(entry.port == (*isolate_it).port);
|
||||
ASSERT(entry.handler == handler);
|
||||
if (entry.state == kLivePort) {
|
||||
handler->decrement_live_ports();
|
||||
}
|
||||
it.Delete();
|
||||
isolate_it.Delete();
|
||||
}
|
||||
ASSERT(handler->ports_.IsEmpty());
|
||||
ports_->Rebalance();
|
||||
}
|
||||
handler->CloseAllPorts();
|
||||
|
|
|
@ -108,6 +108,8 @@ class PortSet {
|
|||
map_ = nullptr;
|
||||
}
|
||||
|
||||
bool IsEmpty() const { return used_ == 0; }
|
||||
|
||||
DART_FORCE_INLINE Iterator begin() {
|
||||
for (intptr_t i = 0; i < capacity_; ++i) {
|
||||
auto& entry = map_[i];
|
||||
|
|
Loading…
Reference in a new issue