dart-sdk/runtime/vm/message_handler.cc

649 lines
19 KiB
C++
Raw Normal View History

// Copyright (c) 2011, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
#include <utility>
#include "vm/message_handler.h"
#include "vm/dart.h"
#include "vm/heap/safepoint.h"
#include "vm/isolate.h"
#include "vm/lockers.h"
#include "vm/object.h"
#include "vm/object_store.h"
#include "vm/os.h"
#include "vm/port.h"
#include "vm/thread_interrupter.h"
namespace dart {
DECLARE_FLAG(bool, trace_service_pause_events);
class MessageHandlerTask : public ThreadPool::Task {
public:
explicit MessageHandlerTask(MessageHandler* handler) : handler_(handler) {
ASSERT(handler != nullptr);
}
virtual void Run() {
ASSERT(handler_ != nullptr);
handler_->TaskCallback();
}
private:
MessageHandler* handler_;
DISALLOW_COPY_AND_ASSIGN(MessageHandlerTask);
};
// static
const char* MessageHandler::MessageStatusString(MessageStatus status) {
switch (status) {
case kOK:
return "OK";
case kError:
return "Error";
case kShutdown:
return "Shutdown";
default:
UNREACHABLE();
return "Illegal";
}
}
MessageHandler::MessageHandler()
: queue_(new MessageQueue()),
oob_queue_(new MessageQueue()),
oob_message_handling_allowed_(true),
paused_for_messages_(false),
paused_(0),
#if !defined(PRODUCT)
should_pause_on_start_(false),
should_pause_on_exit_(false),
is_paused_on_start_(false),
is_paused_on_exit_(false),
remembered_paused_on_exit_status_(kOK),
paused_timestamp_(-1),
#endif
task_running_(false),
delete_me_(false),
pool_(nullptr),
start_callback_(nullptr),
end_callback_(nullptr),
callback_data_(0) {
ASSERT(queue_ != nullptr);
ASSERT(oob_queue_ != nullptr);
}
MessageHandler::~MessageHandler() {
delete queue_;
delete oob_queue_;
queue_ = nullptr;
oob_queue_ = nullptr;
pool_ = nullptr;
}
const char* MessageHandler::name() const {
return "<unnamed>";
}
#if defined(DEBUG)
[vm] Disentagle PortMap/MessageHandler from port status tracking The main purpose of the low-level [PortMap] is to coordinate between ports being opened & closed and concurrent message senders. That's the only thing it should do. Each isolate owns [ReceivePort]s. Only the isolate mutator can create ports, delete them or change their "keeps-isolate-alive" state. Right now it requires going via [PortMap] (which acquires lock) and [MessageHandler] (which acquires lock) to change the "keeps-isolate-alive" state of a port. We'll move information whether a dart [ReceivePort] is closed and whether it keeps the isolate alive into the [ReceivePort] object itself. => Changing the "keeps-isolate-alive" state of the port no longer requires any locks. We could even avoid the runtime call itself in a future CL. Isolates are kept alive if there's any open receive ports (that have not been marked as "does not keep isolate alive"). This is a property of an isolate not of the message handler. For native message handlers we do have a 1<->1 correspondence between port and handler (i.e. there's no "number of open ports" tracking needed). => We'll move the logic of counting open receive ports and ports that keep the isolate alive to the [Isolate]. => We'll also remove locking around incrementing/decrementing or accessing the counts. => The [IsolateMessageHandler] will ask the [Isolate] whether there's any open ports for determining whether to shut down. => For native ports, the `Dart_NewNativePort()` & `Dart_CloseNativePort()` functions will manage the lifetime (as their name also suggests). Overall this makes the [Isolate] responsible for creation of dart [ReceivePort]s and tracking whether the isolate should be kept alive: * Isolate::CreateReceivePort() * Isolate::SetReceivePortKeepAliveState() * Isolate::CloseReceivePort() TEST=ci Change-Id: I847ae357c26254d3810cc277962e05deca18a1de Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/317960 Reviewed-by: Alexander Aprelev <aam@google.com> Reviewed-by: Ryan Macnak <rmacnak@google.com> Commit-Queue: Martin Kustermann <kustermann@google.com>
2023-08-08 10:57:47 +00:00
void MessageHandler::CheckAccess() const {
// By default there is no checking.
}
#endif
void MessageHandler::MessageNotify(Message::Priority priority) {
// By default, there is no custom message notification.
}
bool MessageHandler::Run(ThreadPool* pool,
StartCallback start_callback,
EndCallback end_callback,
CallbackData data) {
MonitorLocker ml(&monitor_);
if (FLAG_trace_isolates) {
OS::PrintErr(
"[+] Starting message handler:\n"
"\thandler: %s\n",
name());
}
ASSERT(pool_ == nullptr);
ASSERT(!delete_me_);
pool_ = pool;
start_callback_ = start_callback;
end_callback_ = end_callback;
callback_data_ = data;
task_running_ = true;
bool result = pool_->Run<MessageHandlerTask>(this);
if (!result) {
pool_ = nullptr;
start_callback_ = nullptr;
end_callback_ = nullptr;
callback_data_ = 0;
task_running_ = false;
}
return result;
}
void MessageHandler::PostMessage(std::unique_ptr<Message> message,
bool before_events) {
Message::Priority saved_priority;
{
MonitorLocker ml(&monitor_);
if (FLAG_trace_isolates) {
Isolate* source_isolate = Isolate::Current();
if (source_isolate != nullptr) {
OS::PrintErr(
"[>] Posting message:\n"
"\tlen: %" Pd "\n\tsource: (%" Pd64
") %s\n\tdest: %s\n"
"\tdest_port: %" Pd64 "\n",
message->Size(), static_cast<int64_t>(source_isolate->main_port()),
source_isolate->name(), name(), message->dest_port());
} else {
OS::PrintErr(
"[>] Posting message:\n"
"\tlen: %" Pd
"\n\tsource: <native code>\n"
"\tdest: %s\n"
"\tdest_port: %" Pd64 "\n",
message->Size(), name(), message->dest_port());
}
}
saved_priority = message->priority();
if (message->IsOOB()) {
oob_queue_->Enqueue(std::move(message), before_events);
} else {
queue_->Enqueue(std::move(message), before_events);
}
if (paused_for_messages_) {
ml.Notify();
}
if (pool_ != nullptr && !task_running_) {
ASSERT(!delete_me_);
task_running_ = true;
const bool launched_successfully = pool_->Run<MessageHandlerTask>(this);
ASSERT(launched_successfully);
}
}
// Invoke any custom message notification.
MessageNotify(saved_priority);
}
std::unique_ptr<Message> MessageHandler::DequeueMessage(
Message::Priority min_priority) {
// TODO(turnidge): Add assert that monitor_ is held here.
std::unique_ptr<Message> message = oob_queue_->Dequeue();
if ((message == nullptr) && (min_priority < Message::kOOBPriority)) {
message = queue_->Dequeue();
}
return message;
}
void MessageHandler::ClearOOBQueue() {
oob_queue_->Clear();
}
MessageHandler::MessageStatus MessageHandler::HandleMessages(
MonitorLocker* ml,
bool allow_normal_messages,
bool allow_multiple_normal_messages) {
ASSERT(monitor_.IsOwnedByCurrentThread());
// Scheduling of the mutator thread during the isolate start can cause this
// thread to safepoint.
// We want to avoid holding the message handler monitor during the safepoint
// operation to avoid possible deadlocks, which can occur if other threads are
// sending messages to this message handler.
//
// If isolate() returns nullptr [StartIsolateScope] does nothing.
ml->Exit();
StartIsolateScope start_isolate(isolate());
ml->Enter();
auto idle_time_handler =
isolate() != nullptr ? isolate()->group()->idle_time_handler() : nullptr;
MessageStatus max_status = kOK;
Message::Priority min_priority =
((allow_normal_messages && !paused()) ? Message::kNormalPriority
: Message::kOOBPriority);
std::unique_ptr<Message> message = DequeueMessage(min_priority);
while (message != nullptr) {
intptr_t message_len = message->Size();
if (FLAG_trace_isolates) {
OS::PrintErr(
"[<] Handling message:\n"
"\tlen: %" Pd
"\n"
"\thandler: %s\n"
"\tport: %" Pd64 "\n",
message_len, name(), message->dest_port());
}
// Release the monitor_ temporarily while we handle the message.
// The monitor was acquired in MessageHandler::TaskCallback().
ml->Exit();
Message::Priority saved_priority = message->priority();
Dart_Port saved_dest_port = message->dest_port();
MessageStatus status = kOK;
{
DisableIdleTimerScope disable_idle_timer(idle_time_handler);
status = HandleMessage(std::move(message));
}
if (status > max_status) {
max_status = status;
}
ml->Enter();
if (FLAG_trace_isolates) {
OS::PrintErr(
"[.] Message handled (%s):\n"
"\tlen: %" Pd
"\n"
"\thandler: %s\n"
"\tport: %" Pd64 "\n",
MessageStatusString(status), message_len, name(), saved_dest_port);
}
// If we are shutting down, do not process any more messages.
if (status == kShutdown) {
ClearOOBQueue();
break;
}
// Remember time since the last message. Don't consider OOB messages so
// using Observatory doesn't trigger additional idle tasks.
if ((FLAG_idle_timeout_micros != 0) &&
(saved_priority == Message::kNormalPriority)) {
if (idle_time_handler != nullptr) {
idle_time_handler->UpdateStartIdleTime();
}
}
// Some callers want to process only one normal message and then quit. At
// the same time it is OK to process multiple OOB messages.
if ((saved_priority == Message::kNormalPriority) &&
!allow_multiple_normal_messages) {
// We processed one normal message. Allow no more.
allow_normal_messages = false;
}
// Reevaluate the minimum allowable priority. The paused state
// may have changed as part of handling the message. We may also
// have encountered an error during message processing.
//
// Even if we encounter an error, we still process pending OOB
// messages so that we don't lose the message notification.
min_priority = (((max_status == kOK) && allow_normal_messages && !paused())
? Message::kNormalPriority
: Message::kOOBPriority);
message = DequeueMessage(min_priority);
}
return max_status;
}
MessageHandler::MessageStatus MessageHandler::HandleNextMessage() {
// We can only call HandleNextMessage when this handler is not
// assigned to a thread pool.
MonitorLocker ml(&monitor_);
ASSERT(pool_ == nullptr);
ASSERT(!delete_me_);
#if defined(DEBUG)
CheckAccess();
#endif
return HandleMessages(&ml, true, false);
}
MessageHandler::MessageStatus MessageHandler::PauseAndHandleAllMessages(
int64_t timeout_millis) {
MonitorLocker ml(&monitor_, /*no_safepoint_scope=*/false);
ASSERT(task_running_);
ASSERT(!delete_me_);
#if defined(DEBUG)
CheckAccess();
#endif
paused_for_messages_ = true;
while (queue_->IsEmpty() && oob_queue_->IsEmpty()) {
Monitor::WaitResult wr;
{
// Ensure this thread is at a safepoint while we wait for new messages to
// arrive.
TransitionVMToNative transition(Thread::Current());
wr = ml.Wait(timeout_millis);
}
ASSERT(task_running_);
ASSERT(!delete_me_);
if (wr == Monitor::kTimedOut) {
break;
}
if (queue_->IsEmpty()) {
// There are only OOB messages. Handle them and then continue waiting for
// normal messages unless there is an error.
MessageStatus status = HandleMessages(&ml, false, false);
if (status != kOK) {
paused_for_messages_ = false;
return status;
}
}
}
paused_for_messages_ = false;
return HandleMessages(&ml, true, true);
}
MessageHandler::MessageStatus MessageHandler::HandleOOBMessages() {
if (!oob_message_handling_allowed_) {
return kOK;
}
MonitorLocker ml(&monitor_);
ASSERT(!delete_me_);
#if defined(DEBUG)
CheckAccess();
#endif
return HandleMessages(&ml, false, false);
}
#if !defined(PRODUCT)
bool MessageHandler::ShouldPauseOnStart(MessageStatus status) const {
Isolate* owning_isolate = isolate();
if (owning_isolate == nullptr) {
return false;
}
// If we are restarting or shutting down, we do not want to honor
// should_pause_on_start or should_pause_on_exit.
return (status != MessageHandler::kShutdown) && should_pause_on_start() &&
owning_isolate->is_runnable();
}
bool MessageHandler::ShouldPauseOnExit(MessageStatus status) const {
Isolate* owning_isolate = isolate();
if (owning_isolate == nullptr) {
return false;
}
return (status != MessageHandler::kShutdown) && should_pause_on_exit() &&
owning_isolate->is_runnable();
}
#endif
bool MessageHandler::HasOOBMessages() {
MonitorLocker ml(&monitor_);
return !oob_queue_->IsEmpty();
}
[vm] Make reloading of isolate groups use new safepoint-level mechanism The current hot-reload implementation [0] will perform a reload by first sending OOB messages to all isolates and waiting until those OOB messages are being handled. The handler of the OOB message will block the thread (and unschedule isolate) and notify the thread performing reload it's ready. This requires that all isolates within a group can actually run & block. This is the case for the VM implementation of isolates (as they are run an unlimited size thread pool). Though flutter seems to multiplex several engine isolates on the same OS thread. Reloading can then result in one engine isolate performing reload waiting for another to act on the OOB message (which it will not do as it's multiplexed on the same thread as the former). Now that we have a more flexible safepointing mechanism (introduced in [1]) we can utilize for hot reloading by introducing a new "reloading" safepoint level. Reload safepoints ----------------------- We introduce a new safepoint level (SafepointLevel::kGCAndDeoptAndReload). Being at a "reload safepoint" implies being at a "deopt safepoint" which implies being at a "gc safepoint". Code has to explicitly opt-into making safepoint checks participate / check into "reload safepoints" using [ReloadParticipationScope]. We do that at certain well-defined places where reload is possible (e.g. event loop boundaries, descheduling of isolates, OOM message processing, ...). While running under [NoReloadScope] we disable checking into "reload safepoints". Initiator of hot-reload ----------------------- When a mutator initiates a reload operation (e.g. as part of a `ReloadSources` `vm-service` API call) it will use a [ReloadSafepointOperationScope] to get all other mutators to a safepoint. For mutators that aren't already at a "reload safepoint", we'll notify them via an OOB message (instead of scheduling kVMInterrupt). While waiting for all mutators to check into a "reload safepoint", the thread is itself at a safepoint (as other mutators may perform lower level safepoint operations - e.g. GC, Deopt, ...) Once all mutators are at a "reload safepoint" the thread will take ownership of all safepoint levels. Other mutators ----------------------- Mutators can be at a "reload safepoint" already (e.g. isolate is not scheduled). If they try to exit safepoint they will block until the reload operation is finished. Mutators that are not at a "reload safepoint" (e.g. executing Dart or VM code) will be sent an OOB message indicating it should check into a "reload safepoint". We assume mutators make progress until they can process OOB message. Mutators may run under a [NoReloadScope] when handling the OOM message. In that case they will not check into the "reload safepoint" and simply ignore the message. To ensure the thread will eventually check-in, we'll make the destructor of [~NoReloadScope] check & send itself a new OOB message indicating reload should happen. Eventually getting the mutator to process the OOM message (which is a well-defined place where we can check into the reload safepoint). Non-isolate mutators such as the background compiler do not react to OOB messages. This means that either those mutators have to be stopped (e.g. bg compiler) before initiating a reload safepoint operation, the threads have to explicitly opt-into participating in reload safepoints or the threads have to deschedule themselves eventually. Misc ---- Owning a reload safepoint operation implies also owning the deopt & gc safepoint operation. Yet some code would like to ensure it actually runs under a [DeoptSafepointOperatoinScope]/[GCSafepointOperationScope]. => The `Thread::OwnsGCSafepoint()` handles that. While performing hot-reload we may exercise common code (e.g. kernel loader, ...) that acquires safepoint locks. Normally it's disallows to acquire safepoint locks while holding a safepoint operation (since mutators may be stopped at places where they hold locks, creating deadlock scenarios). => We explicitly opt code into participating in reload safepointing requests. Those well-defined places aren't holding safepoint locks. => The `Thread::CanAcquireSafepointLocks()` will return `true` despite owning a reload operation. (But if one also holds deopt/gc safepoint operation it will return false) Example where this matters: As part of hot-reload, we load kernel which may create new symbols. The symbol creation code may acquire the symbol lock and `InsertNewOrGet()` a symbol. This is safe as other mutators don't hold the symbol lock at reload safepoints. The same cannot be said for Deopt/GC safepoint operations - as they can interrupt code at many more places where there's no guarantee that no locks are held. [0] https://dart-review.googlesource.com/c/sdk/+/187461 [1] https://dart-review.googlesource.com/c/sdk/+/196927 Issue https://github.com/flutter/flutter/issues/124546 TEST=Newly added Reload_* tests. Change-Id: I6842d7d2b284d043cc047fd702b7c5c7dd1fa3c5 Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/296183 Commit-Queue: Martin Kustermann <kustermann@google.com> Reviewed-by: Slava Egorov <vegorov@google.com>
2023-04-21 13:56:49 +00:00
#if defined(TESTING)
std::unique_ptr<Message> MessageHandler::StealOOBMessage() {
MonitorLocker ml(&monitor_);
ASSERT(!oob_queue_->IsEmpty());
return oob_queue_->Dequeue();
}
#endif
bool MessageHandler::HasMessages() {
MonitorLocker ml(&monitor_);
return !queue_->IsEmpty();
}
void MessageHandler::TaskCallback() {
ASSERT(Isolate::Current() == nullptr);
MessageStatus status = kOK;
bool run_end_callback = false;
bool delete_me = false;
EndCallback end_callback = nullptr;
CallbackData callback_data = 0;
{
// We will occasionally release and reacquire this monitor in this
// function. Whenever we reacquire the monitor we *must* process
// all pending OOB messages, or we may miss a request for vm
// shutdown.
MonitorLocker ml(&monitor_);
// This method is running on the message handler task. Which means no
// other message handler tasks will be started until this one sets
// [task_running_] to false.
ASSERT(task_running_);
#if !defined(PRODUCT)
if (ShouldPauseOnStart(kOK)) {
if (!is_paused_on_start()) {
PausedOnStartLocked(&ml, true);
}
// More messages may have come in before we (re)acquired the monitor.
status = HandleMessages(&ml, false, false);
if (ShouldPauseOnStart(status)) {
// Still paused.
ASSERT(oob_queue_->IsEmpty());
task_running_ = false; // No task in queue.
return;
} else {
PausedOnStartLocked(&ml, false);
}
}
if (is_paused_on_exit()) {
status = HandleMessages(&ml, false, false);
if (ShouldPauseOnExit(status)) {
// Still paused.
ASSERT(oob_queue_->IsEmpty());
task_running_ = false; // No task in queue.
return;
} else {
PausedOnExitLocked(&ml, false);
if (status != kShutdown) {
status = remembered_paused_on_exit_status_;
}
}
}
#endif // !defined(PRODUCT)
if (status == kOK) {
if (start_callback_ != nullptr) {
// Initialize the message handler by running its start function,
// if we have one. For an isolate, this will run the isolate's
// main() function.
//
// Release the monitor_ temporarily while we call the start callback.
ml.Exit();
status = start_callback_(callback_data_);
ASSERT(Isolate::Current() == nullptr);
start_callback_ = nullptr;
ml.Enter();
}
// Handle any pending messages for this message handler.
if (status != kShutdown) {
status = HandleMessages(&ml, (status == kOK), true);
}
}
// The isolate exits when it encounters an error or when it no
// longer has live ports.
[vm] Disentagle PortMap/MessageHandler from port status tracking The main purpose of the low-level [PortMap] is to coordinate between ports being opened & closed and concurrent message senders. That's the only thing it should do. Each isolate owns [ReceivePort]s. Only the isolate mutator can create ports, delete them or change their "keeps-isolate-alive" state. Right now it requires going via [PortMap] (which acquires lock) and [MessageHandler] (which acquires lock) to change the "keeps-isolate-alive" state of a port. We'll move information whether a dart [ReceivePort] is closed and whether it keeps the isolate alive into the [ReceivePort] object itself. => Changing the "keeps-isolate-alive" state of the port no longer requires any locks. We could even avoid the runtime call itself in a future CL. Isolates are kept alive if there's any open receive ports (that have not been marked as "does not keep isolate alive"). This is a property of an isolate not of the message handler. For native message handlers we do have a 1<->1 correspondence between port and handler (i.e. there's no "number of open ports" tracking needed). => We'll move the logic of counting open receive ports and ports that keep the isolate alive to the [Isolate]. => We'll also remove locking around incrementing/decrementing or accessing the counts. => The [IsolateMessageHandler] will ask the [Isolate] whether there's any open ports for determining whether to shut down. => For native ports, the `Dart_NewNativePort()` & `Dart_CloseNativePort()` functions will manage the lifetime (as their name also suggests). Overall this makes the [Isolate] responsible for creation of dart [ReceivePort]s and tracking whether the isolate should be kept alive: * Isolate::CreateReceivePort() * Isolate::SetReceivePortKeepAliveState() * Isolate::CloseReceivePort() TEST=ci Change-Id: I847ae357c26254d3810cc277962e05deca18a1de Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/317960 Reviewed-by: Alexander Aprelev <aam@google.com> Reviewed-by: Ryan Macnak <rmacnak@google.com> Commit-Queue: Martin Kustermann <kustermann@google.com>
2023-08-08 10:57:47 +00:00
if (status != kOK || !KeepAliveLocked()) {
#if !defined(PRODUCT)
if (ShouldPauseOnExit(status)) {
if (FLAG_trace_service_pause_events) {
OS::PrintErr(
"Isolate %s paused before exiting. "
"Use the Observatory to release it.\n",
name());
}
remembered_paused_on_exit_status_ = status;
PausedOnExitLocked(&ml, true);
// More messages may have come in while we released the monitor.
status = HandleMessages(&ml, /*allow_normal_messages=*/false,
/*allow_multiple_normal_messagesfalse=*/false);
if (ShouldPauseOnExit(status)) {
// Still paused.
ASSERT(oob_queue_->IsEmpty());
task_running_ = false; // No task in queue.
return;
} else {
PausedOnExitLocked(&ml, false);
}
}
#endif // !defined(PRODUCT)
if (FLAG_trace_isolates) {
if (status != kOK && thread() != nullptr) {
const Error& error = Error::Handle(thread()->sticky_error());
OS::PrintErr(
"[-] Stopping message handler (%s):\n"
"\thandler: %s\n"
"\terror: %s\n",
MessageStatusString(status), name(), error.ToCString());
} else {
OS::PrintErr(
"[-] Stopping message handler (%s):\n"
"\thandler: %s\n",
MessageStatusString(status), name());
}
}
pool_ = nullptr;
// Decide if we have a callback before releasing the monitor.
end_callback = end_callback_;
callback_data = callback_data_;
run_end_callback = end_callback_ != nullptr;
delete_me = delete_me_;
}
// Clear task_running_ last. This allows other tasks to potentially start
// for this message handler.
ASSERT(oob_queue_->IsEmpty());
task_running_ = false;
}
// The handler may have been deleted by another thread here if it is a native
// message handler.
// Message handlers either use delete_me or end_callback but not both.
ASSERT(!delete_me || !run_end_callback);
if (run_end_callback) {
ASSERT(end_callback != nullptr);
end_callback(callback_data);
// The handler may have been deleted after this point.
}
if (delete_me) {
delete this;
}
}
void MessageHandler::ClosePort(Dart_Port port) {
if (FLAG_trace_isolates) {
[vm] Disentagle PortMap/MessageHandler from port status tracking The main purpose of the low-level [PortMap] is to coordinate between ports being opened & closed and concurrent message senders. That's the only thing it should do. Each isolate owns [ReceivePort]s. Only the isolate mutator can create ports, delete them or change their "keeps-isolate-alive" state. Right now it requires going via [PortMap] (which acquires lock) and [MessageHandler] (which acquires lock) to change the "keeps-isolate-alive" state of a port. We'll move information whether a dart [ReceivePort] is closed and whether it keeps the isolate alive into the [ReceivePort] object itself. => Changing the "keeps-isolate-alive" state of the port no longer requires any locks. We could even avoid the runtime call itself in a future CL. Isolates are kept alive if there's any open receive ports (that have not been marked as "does not keep isolate alive"). This is a property of an isolate not of the message handler. For native message handlers we do have a 1<->1 correspondence between port and handler (i.e. there's no "number of open ports" tracking needed). => We'll move the logic of counting open receive ports and ports that keep the isolate alive to the [Isolate]. => We'll also remove locking around incrementing/decrementing or accessing the counts. => The [IsolateMessageHandler] will ask the [Isolate] whether there's any open ports for determining whether to shut down. => For native ports, the `Dart_NewNativePort()` & `Dart_CloseNativePort()` functions will manage the lifetime (as their name also suggests). Overall this makes the [Isolate] responsible for creation of dart [ReceivePort]s and tracking whether the isolate should be kept alive: * Isolate::CreateReceivePort() * Isolate::SetReceivePortKeepAliveState() * Isolate::CloseReceivePort() TEST=ci Change-Id: I847ae357c26254d3810cc277962e05deca18a1de Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/317960 Reviewed-by: Alexander Aprelev <aam@google.com> Reviewed-by: Ryan Macnak <rmacnak@google.com> Commit-Queue: Martin Kustermann <kustermann@google.com>
2023-08-08 10:57:47 +00:00
MonitorLocker ml(&monitor_);
OS::PrintErr(
"[-] Closing port:\n"
"\thandler: %s\n"
[vm] Disentagle PortMap/MessageHandler from port status tracking The main purpose of the low-level [PortMap] is to coordinate between ports being opened & closed and concurrent message senders. That's the only thing it should do. Each isolate owns [ReceivePort]s. Only the isolate mutator can create ports, delete them or change their "keeps-isolate-alive" state. Right now it requires going via [PortMap] (which acquires lock) and [MessageHandler] (which acquires lock) to change the "keeps-isolate-alive" state of a port. We'll move information whether a dart [ReceivePort] is closed and whether it keeps the isolate alive into the [ReceivePort] object itself. => Changing the "keeps-isolate-alive" state of the port no longer requires any locks. We could even avoid the runtime call itself in a future CL. Isolates are kept alive if there's any open receive ports (that have not been marked as "does not keep isolate alive"). This is a property of an isolate not of the message handler. For native message handlers we do have a 1<->1 correspondence between port and handler (i.e. there's no "number of open ports" tracking needed). => We'll move the logic of counting open receive ports and ports that keep the isolate alive to the [Isolate]. => We'll also remove locking around incrementing/decrementing or accessing the counts. => The [IsolateMessageHandler] will ask the [Isolate] whether there's any open ports for determining whether to shut down. => For native ports, the `Dart_NewNativePort()` & `Dart_CloseNativePort()` functions will manage the lifetime (as their name also suggests). Overall this makes the [Isolate] responsible for creation of dart [ReceivePort]s and tracking whether the isolate should be kept alive: * Isolate::CreateReceivePort() * Isolate::SetReceivePortKeepAliveState() * Isolate::CloseReceivePort() TEST=ci Change-Id: I847ae357c26254d3810cc277962e05deca18a1de Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/317960 Reviewed-by: Alexander Aprelev <aam@google.com> Reviewed-by: Ryan Macnak <rmacnak@google.com> Commit-Queue: Martin Kustermann <kustermann@google.com>
2023-08-08 10:57:47 +00:00
"\tport: %" Pd64 "\n",
name(), port);
}
}
void MessageHandler::CloseAllPorts() {
MonitorLocker ml(&monitor_);
if (FLAG_trace_isolates) {
OS::PrintErr(
"[-] Closing all ports:\n"
"\thandler: %s\n",
name());
}
queue_->Clear();
oob_queue_->Clear();
}
void MessageHandler::RequestDeletion() {
{
MonitorLocker ml(&monitor_);
if (task_running_) {
// This message handler currently has a task running on the thread pool.
delete_me_ = true;
return;
}
}
// This message handler has no current task. Delete it.
delete this;
}
#if !defined(PRODUCT)
void MessageHandler::DebugDump() {
PortMap::DebugDumpForMessageHandler(this);
}
void MessageHandler::PausedOnStart(bool paused) {
MonitorLocker ml(&monitor_);
PausedOnStartLocked(&ml, paused);
}
void MessageHandler::PausedOnStartLocked(MonitorLocker* ml, bool paused) {
if (paused) {
ASSERT(!is_paused_on_start_);
ASSERT(paused_timestamp_ == -1);
paused_timestamp_ = OS::GetCurrentTimeMillis();
// Temporarily release the monitor when calling out to
// NotifyPauseOnStart. This avoids a dead lock that can occur
// when this message handler tries to post a message while a
// message is being posted to it.
ml->Exit();
NotifyPauseOnStart();
ml->Enter();
is_paused_on_start_ = true;
} else {
ASSERT(is_paused_on_start_);
ASSERT(paused_timestamp_ != -1);
paused_timestamp_ = -1;
// Resumed. Clear the resume request of the owning isolate.
Isolate* owning_isolate = isolate();
if (owning_isolate != nullptr) {
owning_isolate->GetAndClearResumeRequest();
}
is_paused_on_start_ = false;
}
}
void MessageHandler::PausedOnExit(bool paused) {
MonitorLocker ml(&monitor_);
PausedOnExitLocked(&ml, paused);
}
void MessageHandler::PausedOnExitLocked(MonitorLocker* ml, bool paused) {
if (paused) {
ASSERT(!is_paused_on_exit_);
ASSERT(paused_timestamp_ == -1);
paused_timestamp_ = OS::GetCurrentTimeMillis();
// Temporarily release the monitor when calling out to
// NotifyPauseOnExit. This avoids a dead lock that can
// occur when this message handler tries to post a message
// while a message is being posted to it.
ml->Exit();
NotifyPauseOnExit();
ml->Enter();
is_paused_on_exit_ = true;
} else {
ASSERT(is_paused_on_exit_);
ASSERT(paused_timestamp_ != -1);
paused_timestamp_ = -1;
// Resumed. Clear the resume request of the owning isolate.
Isolate* owning_isolate = isolate();
if (owning_isolate != nullptr) {
owning_isolate->GetAndClearResumeRequest();
}
is_paused_on_exit_ = false;
}
}
#endif // !defined(PRODUCT)
MessageHandler::AcquiredQueues::AcquiredQueues(MessageHandler* handler)
: handler_(handler), ml_(&handler->monitor_) {
ASSERT(handler != nullptr);
handler_->oob_message_handling_allowed_ = false;
}
MessageHandler::AcquiredQueues::~AcquiredQueues() {
ASSERT(handler_ != nullptr);
handler_->oob_message_handling_allowed_ = true;
}
} // namespace dart