mirror of
https://github.com/dart-lang/sdk
synced 2024-09-20 04:21:32 +00:00
4c83df8741
Closes https://github.com/dart-lang/sdk/issues/42097 Change-Id: I3fb89e18936b3ec95def268cd270ada889e0e9b5 Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/149322 Reviewed-by: Alexander Aprelev <aam@google.com> Commit-Queue: Martin Kustermann <kustermann@google.com>
413 lines
14 KiB
C++
413 lines
14 KiB
C++
// Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
|
|
// for details. All rights reserved. Use of this source code is governed by a
|
|
// BSD-style license that can be found in the LICENSE file.
|
|
|
|
#include <utility>
|
|
|
|
#include "vm/message_handler.h"
|
|
#include "vm/port.h"
|
|
#include "vm/unit_test.h"
|
|
|
|
namespace dart {
|
|
|
|
class MessageHandlerTestPeer {
|
|
public:
|
|
explicit MessageHandlerTestPeer(MessageHandler* handler)
|
|
: handler_(handler) {}
|
|
|
|
void PostMessage(std::unique_ptr<Message> message) {
|
|
handler_->PostMessage(std::move(message));
|
|
}
|
|
void ClosePort(Dart_Port port) { handler_->ClosePort(port); }
|
|
void CloseAllPorts() { handler_->CloseAllPorts(); }
|
|
|
|
void increment_live_ports() { handler_->increment_live_ports(); }
|
|
void decrement_live_ports() { handler_->decrement_live_ports(); }
|
|
|
|
MessageQueue* queue() const { return handler_->queue_; }
|
|
MessageQueue* oob_queue() const { return handler_->oob_queue_; }
|
|
|
|
private:
|
|
MessageHandler* handler_;
|
|
|
|
DISALLOW_COPY_AND_ASSIGN(MessageHandlerTestPeer);
|
|
};
|
|
|
|
class TestMessageHandler : public MessageHandler {
|
|
public:
|
|
TestMessageHandler()
|
|
: port_buffer_(NULL),
|
|
port_buffer_size_(0),
|
|
notify_count_(0),
|
|
message_count_(0),
|
|
start_called_(false),
|
|
end_called_(false),
|
|
results_(NULL),
|
|
monitor_() {}
|
|
|
|
~TestMessageHandler() {
|
|
PortMap::ClosePorts(this);
|
|
delete[] port_buffer_;
|
|
}
|
|
|
|
void MessageNotify(Message::Priority priority) {
|
|
MonitorLocker ml(&monitor_);
|
|
notify_count_++;
|
|
ml.Notify();
|
|
}
|
|
|
|
MessageStatus HandleMessage(std::unique_ptr<Message> message) {
|
|
// For testing purposes, keep a list of the ports
|
|
// for all messages we receive.
|
|
MonitorLocker ml(&monitor_);
|
|
AddPortToBuffer(message->dest_port());
|
|
message_count_++;
|
|
MessageStatus status = kOK;
|
|
if (results_ != NULL) {
|
|
status = results_[0];
|
|
results_++;
|
|
}
|
|
ml.Notify();
|
|
return status;
|
|
}
|
|
|
|
MessageStatus Start() {
|
|
start_called_ = true;
|
|
return kOK;
|
|
}
|
|
|
|
void End() {
|
|
MonitorLocker ml(&monitor_);
|
|
end_called_ = true;
|
|
AddPortToBuffer(-2);
|
|
ml.Notify();
|
|
}
|
|
|
|
Dart_Port* port_buffer() const { return port_buffer_; }
|
|
int notify_count() const { return notify_count_; }
|
|
int message_count() const { return message_count_; }
|
|
bool start_called() const { return start_called_; }
|
|
bool end_called() const { return end_called_; }
|
|
|
|
void set_results(MessageStatus* results) { results_ = results; }
|
|
|
|
Monitor* monitor() { return &monitor_; }
|
|
|
|
private:
|
|
void AddPortToBuffer(Dart_Port port) {
|
|
if (port_buffer_ == NULL) {
|
|
port_buffer_ = new Dart_Port[10];
|
|
port_buffer_size_ = 10;
|
|
} else if (message_count_ == port_buffer_size_) {
|
|
int new_port_buffer_size_ = 2 * port_buffer_size_;
|
|
Dart_Port* new_port_buffer_ = new Dart_Port[new_port_buffer_size_];
|
|
for (int i = 0; i < port_buffer_size_; i++) {
|
|
new_port_buffer_[i] = port_buffer_[i];
|
|
}
|
|
delete[] port_buffer_;
|
|
port_buffer_ = new_port_buffer_;
|
|
port_buffer_size_ = new_port_buffer_size_;
|
|
}
|
|
port_buffer_[message_count_] = port;
|
|
}
|
|
|
|
Dart_Port* port_buffer_;
|
|
int port_buffer_size_;
|
|
int notify_count_;
|
|
int message_count_;
|
|
bool start_called_;
|
|
bool end_called_;
|
|
MessageStatus* results_;
|
|
Monitor monitor_;
|
|
|
|
DISALLOW_COPY_AND_ASSIGN(TestMessageHandler);
|
|
};
|
|
|
|
// Start callback passed to MessageHandler::Run by the tests. |data| is the
// address of a TestMessageHandler; the call is forwarded to its Start().
MessageHandler::MessageStatus TestStartFunction(uword data) {
  TestMessageHandler* test_handler =
      reinterpret_cast<TestMessageHandler*>(data);
  return test_handler->Start();
}
|
|
|
|
// End callback passed to MessageHandler::Run by the tests. |data| is the
// address of a TestMessageHandler; the call is forwarded to its End().
void TestEndFunction(uword data) {
  TestMessageHandler* test_handler =
      reinterpret_cast<TestMessageHandler*>(data);
  test_handler->End();
}
|
|
|
|
// Creates a message for |dest| at the given |priority|. The payload is a
// single malloc'ed byte whose contents are left unset; the tests only look at
// the destination port and the queue the message lands in.
static std::unique_ptr<Message> BlankMessage(Dart_Port dest,
                                             Message::Priority priority) {
  uint8_t* payload = reinterpret_cast<uint8_t*>(malloc(1));
  return Message::New(dest, payload, 1, nullptr, priority);
}
|
|
|
|
// Posting a message triggers exactly one notification and places the message
// on the queue that matches its priority.
VM_UNIT_TEST_CASE(MessageHandler_PostMessage) {
  TestMessageHandler handler;
  MessageHandlerTestPeer handler_peer(&handler);
  EXPECT_EQ(0, handler.notify_count());

  {
    // Normal priority: keep the raw address so the queued message can be
    // identified after ownership has been handed over.
    std::unique_ptr<Message> normal =
        BlankMessage(1, Message::kNormalPriority);
    Message* raw_message = normal.get();
    handler_peer.PostMessage(std::move(normal));

    // One notification so far.
    EXPECT_EQ(1, handler.notify_count());

    // It sits on the normal queue; the oob queue is still empty.
    EXPECT(raw_message == handler_peer.queue()->Dequeue().get());
    EXPECT(nullptr == handler_peer.oob_queue()->Dequeue());
  }

  {
    // OOB priority, same procedure.
    std::unique_ptr<Message> oob = BlankMessage(1, Message::kOOBPriority);
    Message* raw_message = oob.get();
    handler_peer.PostMessage(std::move(oob));

    // A second notification was delivered.
    EXPECT_EQ(2, handler.notify_count());

    // It sits on the oob queue; the normal queue is empty again.
    EXPECT(raw_message == handler_peer.oob_queue()->Dequeue().get());
    EXPECT(nullptr == handler_peer.queue()->Dequeue());
  }
}
|
|
|
|
// HasOOBMessages() stays false while only normal messages are pending and
// turns true once an oob message has been posted.
VM_UNIT_TEST_CASE(MessageHandler_HasOOBMessages) {
  TestMessageHandler handler;
  MessageHandlerTestPeer handler_peer(&handler);

  // Nothing has been posted yet.
  EXPECT(!handler.HasOOBMessages());

  // A normal message does not count as oob.
  handler_peer.PostMessage(BlankMessage(1, Message::kNormalPriority));
  EXPECT(!handler.HasOOBMessages());
  {
    // With the queues acquired, exactly one regular message is visible.
    MessageHandler::AcquiredQueues aq(&handler);
    EXPECT(aq.queue()->Length() == 1);
  }

  // Now add an oob message.
  handler_peer.PostMessage(BlankMessage(1, Message::kOOBPriority));
  EXPECT(handler.HasOOBMessages());
  {
    // With the queues acquired, one regular and one oob message are visible.
    MessageHandler::AcquiredQueues aq(&handler);
    EXPECT(aq.queue()->Length() == 1);
    EXPECT(aq.oob_queue()->Length() == 1);
  }

  // Get rid of everything that is still pending.
  handler_peer.CloseAllPorts();
}
|
|
|
|
// Closing a single port leaves already-queued messages in place.
VM_UNIT_TEST_CASE(MessageHandler_ClosePort) {
  TestMessageHandler handler;
  MessageHandlerTestPeer handler_peer(&handler);

  // Queue one message for port 1 ...
  std::unique_ptr<Message> first = BlankMessage(1, Message::kNormalPriority);
  Message* raw_message1 = first.get();
  handler_peer.PostMessage(std::move(first));

  // ... and one for port 2.
  std::unique_ptr<Message> second = BlankMessage(2, Message::kNormalPriority);
  Message* raw_message2 = second.get();
  handler_peer.PostMessage(std::move(second));

  handler_peer.ClosePort(1);

  // Both messages, including the one for the closed port, are still queued
  // in posting order.
  EXPECT(raw_message1 == handler_peer.queue()->Dequeue().get());
  EXPECT(raw_message2 == handler_peer.queue()->Dequeue().get());
}
|
|
|
|
// Closing all ports empties the normal message queue.
VM_UNIT_TEST_CASE(MessageHandler_CloseAllPorts) {
  TestMessageHandler handler;
  MessageHandlerTestPeer handler_peer(&handler);

  // Queue one normal message each for ports 1 and 2.
  for (Dart_Port dest = 1; dest <= 2; ++dest) {
    handler_peer.PostMessage(BlankMessage(dest, Message::kNormalPriority));
  }

  handler_peer.CloseAllPorts();

  // Nothing is left to dequeue.
  EXPECT(nullptr == handler_peer.queue()->Dequeue());
}
|
|
|
|
// HandleNextMessage() drains every pending oob message first and then handles
// one normal message.
VM_UNIT_TEST_CASE(MessageHandler_HandleNextMessage) {
  TestMessageHandler handler;
  MessageHandlerTestPeer handler_peer(&handler);

  Dart_Port port1 = PortMap::CreatePort(&handler);
  Dart_Port port2 = PortMap::CreatePort(&handler);
  Dart_Port port3 = PortMap::CreatePort(&handler);

  // Interleave normal and oob messages.
  handler_peer.PostMessage(BlankMessage(port1, Message::kNormalPriority));
  handler_peer.PostMessage(BlankMessage(port2, Message::kOOBPriority));
  handler_peer.PostMessage(BlankMessage(port2, Message::kNormalPriority));
  handler_peer.PostMessage(BlankMessage(port3, Message::kOOBPriority));

  // Three of the four messages are handled: both oob ones, then the first
  // normal one.
  EXPECT_EQ(MessageHandler::kOK, handler.HandleNextMessage());
  EXPECT_EQ(3, handler.message_count());

  Dart_Port* handled = handler.port_buffer();
  EXPECT_EQ(port2, handled[0]);
  EXPECT_EQ(port3, handled[1]);
  EXPECT_EQ(port1, handled[2]);
}
|
|
|
|
// After an oob message yields kError, the remaining oob messages are still
// processed but no normal message is handled.
VM_UNIT_TEST_CASE(MessageHandler_HandleNextMessage_ProcessOOBAfterError) {
  TestMessageHandler handler;

  // Statuses HandleMessage() returns, in call order.
  MessageHandler::MessageStatus scripted[] = {
      MessageHandler::kError,  // oob_message1
      MessageHandler::kOK,     // oob_message2
      MessageHandler::kOK,     // unused
  };
  handler.set_results(scripted);

  MessageHandlerTestPeer handler_peer(&handler);
  Dart_Port port1 = PortMap::CreatePort(&handler);
  Dart_Port port2 = PortMap::CreatePort(&handler);
  Dart_Port port3 = PortMap::CreatePort(&handler);

  handler_peer.PostMessage(BlankMessage(port1, Message::kNormalPriority));
  handler_peer.PostMessage(BlankMessage(port2, Message::kOOBPriority));
  handler_peer.PostMessage(BlankMessage(port3, Message::kOOBPriority));

  // The error is reported, both oob messages were seen, and the normal
  // message for port1 was not handled.
  EXPECT_EQ(MessageHandler::kError, handler.HandleNextMessage());
  EXPECT_EQ(2, handler.message_count());

  Dart_Port* handled = handler.port_buffer();
  EXPECT_EQ(port2, handled[0]);  // oob_message1, error
  EXPECT_EQ(port3, handled[1]);  // oob_message2, ok

  handler_peer.CloseAllPorts();
}
|
|
|
|
// Once a message yields kShutdown, no further message is processed and the
// oob queue ends up empty.
VM_UNIT_TEST_CASE(MessageHandler_HandleNextMessage_Shutdown) {
  TestMessageHandler handler;

  // Statuses HandleMessage() returns, in call order.
  MessageHandler::MessageStatus scripted[] = {
      MessageHandler::kOK,        // oob_message1
      MessageHandler::kShutdown,  // oob_message2
      MessageHandler::kOK,        // unused
      MessageHandler::kOK,        // unused
  };
  handler.set_results(scripted);

  MessageHandlerTestPeer handler_peer(&handler);
  Dart_Port port1 = PortMap::CreatePort(&handler);
  Dart_Port port2 = PortMap::CreatePort(&handler);
  Dart_Port port3 = PortMap::CreatePort(&handler);
  Dart_Port port4 = PortMap::CreatePort(&handler);

  handler_peer.PostMessage(BlankMessage(port1, Message::kNormalPriority));
  handler_peer.PostMessage(BlankMessage(port2, Message::kOOBPriority));
  handler_peer.PostMessage(BlankMessage(port3, Message::kOOBPriority));
  handler_peer.PostMessage(BlankMessage(port4, Message::kOOBPriority));

  // Processing stops at the shutdown status: only two messages were handled.
  EXPECT_EQ(MessageHandler::kShutdown, handler.HandleNextMessage());
  EXPECT_EQ(2, handler.message_count());

  Dart_Port* handled = handler.port_buffer();
  EXPECT_EQ(port2, handled[0]);  // oob_message1, ok
  EXPECT_EQ(port3, handled[1]);  // oob_message2, shutdown
  {
    // The third oob message was dropped together with the rest of the oob
    // queue.
    MessageHandler::AcquiredQueues aq(&handler);
    EXPECT(aq.oob_queue()->Length() == 0);
  }

  handler_peer.CloseAllPorts();
}
|
|
|
|
// HandleOOBMessages() processes every pending oob message and leaves normal
// messages untouched.
VM_UNIT_TEST_CASE(MessageHandler_HandleOOBMessages) {
  TestMessageHandler handler;
  MessageHandlerTestPeer handler_peer(&handler);

  Dart_Port port1 = PortMap::CreatePort(&handler);
  Dart_Port port2 = PortMap::CreatePort(&handler);
  Dart_Port port3 = PortMap::CreatePort(&handler);
  Dart_Port port4 = PortMap::CreatePort(&handler);

  // Two normal messages followed by two oob messages.
  handler_peer.PostMessage(BlankMessage(port1, Message::kNormalPriority));
  handler_peer.PostMessage(BlankMessage(port2, Message::kNormalPriority));
  handler_peer.PostMessage(BlankMessage(port3, Message::kOOBPriority));
  handler_peer.PostMessage(BlankMessage(port4, Message::kOOBPriority));

  // Only the two oob messages are handled, in posting order.
  EXPECT_EQ(MessageHandler::kOK, handler.HandleOOBMessages());
  EXPECT_EQ(2, handler.message_count());

  Dart_Port* handled = handler.port_buffer();
  EXPECT_EQ(port3, handled[0]);
  EXPECT_EQ(port4, handled[1]);

  handler_peer.CloseAllPorts();
}
|
|
|
|
struct ThreadStartInfo {
|
|
MessageHandler* handler;
|
|
Dart_Port* ports;
|
|
int count;
|
|
ThreadJoinId join_id;
|
|
};
|
|
|
|
// Thread entry point: |param| is the address of a ThreadStartInfo. Stores the
// current thread's join id in it, then posts one normal-priority message to
// each of the first |count| ports.
static void SendMessages(uword param) {
  ThreadStartInfo* info = reinterpret_cast<ThreadStartInfo*>(param);

  // Publish the join id before doing anything else.
  info->join_id = OSThread::GetCurrentThreadJoinId(OSThread::Current());

  MessageHandlerTestPeer handler_peer(info->handler);
  for (int index = 0; index < info->count; ++index) {
    const Dart_Port dest = info->ports[index];
    handler_peer.PostMessage(BlankMessage(dest, Message::kNormalPriority));
  }
}
|
|
|
|
// Runs the handler on a thread pool, posts one message from this thread and
// ten more from a helper thread, and checks that all eleven are handled in
// order with the start callback invoked and the end callback not yet invoked.
VM_UNIT_TEST_CASE(MessageHandler_Run) {
  TestMessageHandler handler;
  ThreadPool pool;
  MessageHandlerTestPeer handler_peer(&handler);

  // Give the handler one live port before it starts running.
  EXPECT(!handler.HasLivePorts());
  handler_peer.increment_live_ports();

  handler.Run(&pool, TestStartFunction, TestEndFunction,
              reinterpret_cast<uword>(&handler));
  Dart_Port port = PortMap::CreatePort(&handler);
  handler_peer.PostMessage(BlankMessage(port, Message::kNormalPriority));

  // Block until the first message has gone through HandleMessage().
  {
    MonitorLocker ml(handler.monitor());
    while (handler.message_count() < 1) {
      ml.Wait();
    }
    EXPECT_EQ(1, handler.message_count());
    EXPECT(handler.start_called());
    EXPECT(!handler.end_called());
    Dart_Port* handler_ports = handler.port_buffer();
    EXPECT_EQ(port, handler_ports[0]);
  }

  // Create ten more ports and let a separate thread post one message to each.
  const int kExtraMessages = 10;
  Dart_Port ports[kExtraMessages];
  for (int i = 0; i < kExtraMessages; i++) {
    ports[i] = PortMap::CreatePort(&handler);
  }
  ThreadStartInfo info;
  info.handler = &handler;
  info.ports = ports;
  info.count = kExtraMessages;
  info.join_id = OSThread::kInvalidThreadJoinId;
  OSThread::Start("SendMessages", SendMessages, reinterpret_cast<uword>(&info));

  // Block until all eleven messages have been handled.
  {
    MonitorLocker ml(handler.monitor());
    while (handler.message_count() < 1 + kExtraMessages) {
      ml.Wait();
    }
    // Fetch the buffer only after waiting; it is reallocated when it grows.
    Dart_Port* handler_ports = handler.port_buffer();
    EXPECT_EQ(1 + kExtraMessages, handler.message_count());
    EXPECT(handler.start_called());
    EXPECT(!handler.end_called());
    EXPECT_EQ(port, handler_ports[0]);
    // Entry 0 is the message posted above; the thread's messages follow.
    for (int i = 0; i < kExtraMessages; i++) {
      EXPECT_EQ(ports[i], handler_ports[i + 1]);
    }
    handler_peer.decrement_live_ports();
    EXPECT(!handler.HasLivePorts());
  }

  // Must join the thread or the VM shutdown is racing with any VM state the
  // thread touched.
  ASSERT(info.join_id != OSThread::kInvalidThreadJoinId);
  OSThread::Join(info.join_id);
}
|
|
|
|
} // namespace dart
|