Add support for native ports in the vm.

Dart_NewNativePort creates a port associated with a C handler
function.  When messages come in on this port, they are forwarded to
the C function for processing.

To support this, refactored PortMap so that it operates on a new
MessageHandler type instead of directly on Isolates.

For now, native ports have a dedicated single thread.  Eventually we
will back native ports (and possibly Isolates as well) by a shared
thread pool.
Review URL: https://chromiumcodereview.appspot.com//9169063

git-svn-id: https://dart.googlecode.com/svn/branches/bleeding_edge/dart@3804 260f80e4-7a28-3924-810f-c04153c831b5
This commit is contained in:
turnidge@google.com 2012-02-01 18:53:40 +00:00
parent b2725b7068
commit 1f5364cd3b
17 changed files with 603 additions and 209 deletions

View file

@ -428,6 +428,12 @@ DART_EXPORT void Dart_InterruptIsolate(Dart_Isolate isolate);
*/
typedef int64_t Dart_Port;
/**
* kIllegalPort is a port number guaranteed never to be associated
* with a valid port.
*/
const Dart_Port kIllegalPort = 0;
/**
* A message notification callback.
*
@ -521,6 +527,46 @@ DART_EXPORT bool Dart_PostIntArray(Dart_Port port_id,
*/
DART_EXPORT bool Dart_Post(Dart_Port port_id, Dart_Handle object);
/**
* A native message handler.
*
* This handler is associated with a native port by calling
* Dart_NewNativePort.
*/
typedef void (*Dart_NativeMessageHandler)(Dart_Port dest_port_id,
Dart_Port reply_port_id,
uint8_t* data);
// TODO(turnidge): Make this function take more appropriate arguments.
/**
* Creates a new native port. When messages are received on this
* native port, then they will be dispatched to the provided native
* message handler.
*
* \param name The name of this port in debugging messages.
* \param handler The C handler to run when messages arrive on the port.
* \param handle_concurrently Is it okay to process requests on this
* native port concurrently?
*
* \return If successful, returns the port id for the native port. In
* case of error, returns kIllegalPort.
*/
DART_EXPORT Dart_Port Dart_NewNativePort(const char* name,
Dart_NativeMessageHandler handler,
bool handle_concurrently);
// TODO(turnidge): Currently handle_concurrently is ignored.
/**
* Closes the native port with the given id.
*
* The port must have been allocated by a call to Dart_NewNativePort.
*
* \param native_port_id The id of the native port to close.
*
* \return Returns true if the port was closed successfully.
*/
DART_EXPORT bool Dart_CloseNativePort(Dart_Port native_port_id);
/**
* Returns a new SendPort with the provided port id.
*/

View file

@ -10,7 +10,7 @@
#include "vm/dart_entry.h"
#include "vm/exceptions.h"
#include "vm/longjump.h"
#include "vm/message_queue.h"
#include "vm/message.h"
#include "vm/object.h"
#include "vm/object_store.h"
#include "vm/port.h"
@ -344,7 +344,8 @@ DEFINE_NATIVE_ENTRY(IsolateNatives_start, 2) {
DEFINE_NATIVE_ENTRY(ReceivePortImpl_factory, 1) {
ASSERT(AbstractTypeArguments::CheckedHandle(arguments->At(0)).IsNull());
intptr_t port_id = PortMap::CreatePort();
intptr_t port_id =
PortMap::CreatePort(arguments->isolate()->message_handler());
const Instance& port = Instance::Handle(ReceivePortCreate(port_id));
arguments->SetReturn(port);
}

View file

@ -10,16 +10,25 @@
namespace dart {
StackResource::StackResource(Isolate* isolate) : isolate_(isolate) {
previous_ = isolate->top_resource();
isolate->set_top_resource(this);
StackResource::StackResource(Isolate* isolate)
: isolate_(isolate), previous_(NULL) {
// We can only have longjumps and exceptions when there is a current
// isolate. If there is no current isolate, we don't need to
// protect this case.
if (isolate) {
previous_ = isolate->top_resource();
isolate->set_top_resource(this);
}
}
StackResource::~StackResource() {
StackResource* top = isolate()->top_resource();
ASSERT(top == this);
isolate()->set_top_resource(previous_);
if (isolate()) {
StackResource* top = isolate()->top_resource();
ASSERT(top == this);
isolate()->set_top_resource(previous_);
}
ASSERT(Isolate::Current() == isolate());
}
ZoneAllocated::~ZoneAllocated() {

View file

@ -15,8 +15,9 @@
#include "vm/exceptions.h"
#include "vm/growable_array.h"
#include "vm/longjump.h"
#include "vm/message_queue.h"
#include "vm/message.h"
#include "vm/native_entry.h"
#include "vm/native_message_handler.h"
#include "vm/object.h"
#include "vm/object_store.h"
#include "vm/port.h"
@ -676,7 +677,7 @@ DART_EXPORT Dart_Handle Dart_HandleMessage() {
Message::Priority priority = Message::kNormalPriority;
do {
DARTSCOPE(isolate);
message = isolate->message_queue()->DequeueNoWait();
message = isolate->message_handler()->queue()->DequeueNoWait();
if (message == NULL) {
break;
}
@ -705,7 +706,7 @@ DART_EXPORT Dart_Handle Dart_HandleMessage() {
DART_EXPORT bool Dart_HasLivePorts() {
Isolate* isolate = Isolate::Current();
ASSERT(isolate);
return isolate->live_ports() > 0;
return isolate->message_handler()->HasLivePorts();
}
@ -743,6 +744,37 @@ DART_EXPORT bool Dart_Post(Dart_Port port_id, Dart_Handle handle) {
}
DART_EXPORT Dart_Port Dart_NewNativePort(const char* name,
Dart_NativeMessageHandler handler,
bool handle_concurrently) {
if (name == NULL) {
name = "<UnnamedNativePort>";
}
if (handler == NULL) {
OS::PrintErr("%s expects argument 'handler' to be non-null.", CURRENT_FUNC);
return kIllegalPort;
}
// Start the native port without a current isolate.
IsolateSaver saver(Isolate::Current());
Isolate::SetCurrent(NULL);
NativeMessageHandler* nmh = new NativeMessageHandler(name, handler);
Dart_Port port_id = PortMap::CreatePort(nmh);
nmh->StartWorker();
return port_id;
}
DART_EXPORT bool Dart_CloseNativePort(Dart_Port native_port_id) {
// Close the native port without a current isolate.
IsolateSaver saver(Isolate::Current());
Isolate::SetCurrent(NULL);
// TODO(turnidge): Check that the port is native before trying to close.
return PortMap::ClosePort(native_port_id);
}
DART_EXPORT Dart_Handle Dart_NewSendPort(Dart_Port port_id) {
Isolate* isolate = Isolate::Current();
DARTSCOPE(isolate);

View file

@ -123,6 +123,20 @@ class Api : AllStatic {
static uword Reallocate(uword ptr, intptr_t old_size, intptr_t new_size);
};
class IsolateSaver {
public:
explicit IsolateSaver(Isolate* current_isolate)
: saved_isolate_(current_isolate) {
}
~IsolateSaver() {
Isolate::SetCurrent(saved_isolate_);
}
private:
Isolate* saved_isolate_;
DISALLOW_COPY_AND_ASSIGN(IsolateSaver);
};
} // namespace dart.
#endif // VM_DART_API_IMPL_H_

View file

@ -2801,6 +2801,85 @@ TEST_CASE(ImportLibrary5) {
}
void NewNativePort_send123(Dart_Port dest_port_id,
Dart_Port reply_port_id,
uint8_t* data) {
intptr_t response = 123;
Dart_PostIntArray(reply_port_id, 1, &response);
}
void NewNativePort_send321(Dart_Port dest_port_id,
Dart_Port reply_port_id,
uint8_t* data) {
intptr_t response = 321;
Dart_PostIntArray(reply_port_id, 1, &response);
}
UNIT_TEST_CASE(NewNativePort) {
// Create a port with a bogus handler.
Dart_Port error_port = Dart_NewNativePort("Foo", NULL, true);
EXPECT_EQ(kIllegalPort, error_port);
// Create the port w/o a current isolate, just to make sure that works.
Dart_Port port_id1 =
Dart_NewNativePort("Port123", NewNativePort_send123, true);
TestIsolateScope __test_isolate__;
const char* kScriptChars =
"void callPort(SendPort port) {\n"
" port.call(null).receive((message, replyTo) {\n"
" throw new Exception(message[0]);\n"
" });\n"
"}\n";
Dart_Handle lib = TestCase::LoadTestScript(kScriptChars, NULL);
Dart_EnterScope();
// Create a port w/ a current isolate, to make sure that works too.
Dart_Port port_id2 =
Dart_NewNativePort("Port321", NewNativePort_send321, true);
Dart_Handle send_port1 = Dart_NewSendPort(port_id1);
EXPECT_VALID(send_port1);
Dart_Handle send_port2 = Dart_NewSendPort(port_id2);
EXPECT_VALID(send_port2);
// Test first port.
Dart_Handle dart_args[1];
dart_args[0] = send_port1;
Dart_Handle result = Dart_InvokeStatic(lib,
Dart_NewString(""),
Dart_NewString("callPort"),
1,
dart_args);
EXPECT_VALID(result);
result = Dart_RunLoop();
EXPECT(Dart_IsError(result));
EXPECT(Dart_ErrorHasException(result));
EXPECT_SUBSTRING("Exception: 123\n", Dart_GetError(result));
// result second port.
dart_args[0] = send_port2;
result = Dart_InvokeStatic(lib,
Dart_NewString(""),
Dart_NewString("callPort"),
1,
dart_args);
EXPECT_VALID(result);
result = Dart_RunLoop();
EXPECT(Dart_IsError(result));
EXPECT(Dart_ErrorHasException(result));
EXPECT_SUBSTRING("Exception: 321\n", Dart_GetError(result));
Dart_ExitScope();
// Delete the native ports.
EXPECT(Dart_CloseNativePort(port_id1));
EXPECT(Dart_CloseNativePort(port_id2));
}
static bool RunLoopTestCallback(const char* name_prefix,
void* data, char** error) {
const char* kScriptChars =

View file

@ -14,7 +14,7 @@
#include "vm/debugger.h"
#include "vm/debuginfo.h"
#include "vm/heap.h"
#include "vm/message_queue.h"
#include "vm/message.h"
#include "vm/object_store.h"
#include "vm/parser.h"
#include "vm/port.h"
@ -34,13 +34,61 @@ DEFINE_FLAG(bool, trace_isolates, false,
DECLARE_FLAG(bool, generate_gdb_symbols);
class IsolateMessageHandler : public MessageHandler {
public:
explicit IsolateMessageHandler(Isolate* isolate);
~IsolateMessageHandler();
const char* name() const;
void MessageNotify(Message::Priority priority);
#if defined(DEBUG)
// Check that it is safe to access this handler.
void CheckAccess();
#endif
private:
Isolate* isolate_;
};
IsolateMessageHandler::IsolateMessageHandler(Isolate* isolate)
: isolate_(isolate) {
}
IsolateMessageHandler::~IsolateMessageHandler() {
}
const char* IsolateMessageHandler::name() const {
return isolate_->name();
}
void IsolateMessageHandler::MessageNotify(Message::Priority priority) {
if (priority >= Message::kOOBPriority) {
// Handle out of band messages even if the isolate is busy.
// isolate_->ScheduleInterrupts(Isolate::kMessageInterrupt);
UNIMPLEMENTED();
}
Dart_MessageNotifyCallback callback = isolate_->message_notify_callback();
if (callback) {
// Allow the embedder to handle message notification.
(*callback)(Api::CastIsolate(isolate_));
}
}
#if defined(DEBUG)
void IsolateMessageHandler::CheckAccess() {
ASSERT(isolate_ == Isolate::Current());
}
#endif
Isolate::Isolate()
: store_buffer_(),
message_queue_(NULL),
message_notify_callback_(NULL),
name_(NULL),
num_ports_(0),
live_ports_(0),
main_port_(0),
heap_(NULL),
object_store_(NULL),
@ -72,7 +120,6 @@ Isolate::Isolate()
Isolate::~Isolate() {
delete [] name_;
delete message_queue_;
delete heap_;
delete object_store_;
// Do not delete stack resources: top_resource_ and current_zone_.
@ -83,48 +130,8 @@ Isolate::~Isolate() {
delete debugger_;
delete mutex_;
mutex_ = NULL; // Fail fast if interrupts are scheduled on a dead isolate.
}
void Isolate::PostMessage(Message* message) {
if (FLAG_trace_isolates) {
const char* source_name = "<native code>";
Isolate* source_isolate = Isolate::Current();
if (source_isolate) {
source_name = source_isolate->name();
}
OS::Print("[>] Posting message:\n"
"\tsource: %s\n"
"\treply_port: %lld\n"
"\tdest: %s\n"
"\tdest_port: %lld\n",
source_name, message->reply_port(), name(), message->dest_port());
}
Message::Priority priority = message->priority();
message_queue()->Enqueue(message);
message = NULL; // Do not access message. May have been deleted.
ASSERT(priority < Message::kOOBPriority);
if (priority >= Message::kOOBPriority) {
// Handle out of band messages even if the isolate is busy.
ScheduleInterrupts(Isolate::kMessageInterrupt);
}
Dart_MessageNotifyCallback callback = message_notify_callback();
if (callback) {
// Allow the embedder to handle message notification.
(*callback)(Api::CastIsolate(this));
}
}
void Isolate::ClosePort(Dart_Port port) {
message_queue()->Flush(port);
}
void Isolate::CloseAllPorts() {
message_queue()->FlushAll();
delete message_handler_;
message_handler_ = NULL; // Fail fast if we send messages to a dead isolate.
}
@ -136,10 +143,10 @@ Isolate* Isolate::Init(const char* name_prefix) {
// the current isolate.
SetCurrent(result);
// Set up the isolate message queue.
MessageQueue* queue = new MessageQueue();
ASSERT(queue != NULL);
result->set_message_queue(queue);
// Setup the isolate message handler.
MessageHandler* handler = new IsolateMessageHandler(result);
ASSERT(handler != NULL);
result->set_message_handler(handler);
// Setup the Dart API state.
ApiState* state = new ApiState();
@ -151,13 +158,13 @@ Isolate* Isolate::Init(const char* name_prefix) {
// TODO(5411455): Need to figure out how to set the stack limit for the
// main thread.
result->SetStackLimitFromCurrentTOS(reinterpret_cast<uword>(&result));
result->set_main_port(PortMap::CreatePort());
result->set_main_port(PortMap::CreatePort(result->message_handler()));
result->BuildName(name_prefix);
result->debugger_ = new Debugger();
result->debugger_->Initialize(result);
if (FLAG_trace_isolates) {
if (strcmp(name_prefix, "vm-isolate") != 0) {
if (name_prefix == NULL || strcmp(name_prefix, "vm-isolate") != 0) {
OS::Print("[+] Starting isolate:\n"
"\tisolate: %s\n", result->name());
}
@ -288,10 +295,11 @@ void Isolate::Shutdown() {
}
// Close all the ports owned by this isolate.
PortMap::ClosePorts();
PortMap::ClosePorts(message_handler());
delete message_queue();
set_message_queue(NULL);
// Fail fast if anybody tries to post any more messsages to this isolate.
delete message_handler();
set_message_handler(NULL);
// Dump all accumalated timer data for the isolate.
timer_list_.ReportTimers();
@ -353,13 +361,14 @@ static RawInstance* DeserializeMessage(void* data) {
RawObject* Isolate::StandardRunLoop() {
ASSERT(long_jump_base() != NULL);
ASSERT(message_notify_callback() == NULL);
ASSERT(message_handler() != NULL);
while (live_ports() > 0) {
while (message_handler()->HasLivePorts()) {
ASSERT(this == Isolate::Current());
Zone zone(this);
HandleScope handle_scope(this);
Message* message = message_queue()->Dequeue(0);
Message* message = message_handler()->queue()->Dequeue(0);
if (message != NULL) {
if (message->priority() >= Message::kOOBPriority) {
// TODO(turnidge): Out of band messages will not go through the

View file

@ -21,8 +21,7 @@ class HandleScope;
class HandleVisitor;
class Heap;
class LongJump;
class Message;
class MessageQueue;
class MessageHandler;
class Mutex;
class ObjectPointerVisitor;
class ObjectStore;
@ -57,34 +56,8 @@ class Isolate {
message_notify_callback_ = value;
}
MessageQueue* message_queue() const { return message_queue_; }
void set_message_queue(MessageQueue* value) { message_queue_ = value; }
const char* name() const { return name_; }
// The number of ports is only correct when read from the current
// isolate. This value is not protected from being updated
// concurrently.
intptr_t num_ports() const { return num_ports_; }
void increment_num_ports() {
ASSERT(this == Isolate::Current());
num_ports_++;
}
void decrement_num_ports() {
ASSERT(this == Isolate::Current());
num_ports_--;
}
intptr_t live_ports() const { return live_ports_; }
void increment_live_ports() {
ASSERT(this == Isolate::Current());
live_ports_++;
}
void decrement_live_ports() {
ASSERT(this == Isolate::Current());
live_ports_--;
}
Dart_Port main_port() { return main_port_; }
void set_main_port(Dart_Port port) {
ASSERT(main_port_ == 0); // Only set main port once.
@ -241,9 +214,8 @@ class Isolate {
void ScheduleInterrupts(uword interrupt_bits);
uword GetAndClearInterrupts();
void PostMessage(Message* message);
void ClosePort(Dart_Port port);
void CloseAllPorts();
MessageHandler* message_handler() const { return message_handler_; }
void set_message_handler(MessageHandler* value) { message_handler_ = value; }
// Returns null on success, unhandled exception on failure.
RawObject* StandardRunLoop();
@ -271,11 +243,8 @@ class Isolate {
static const uword kDefaultStackSize = (1 * MB);
StoreBufferBlock store_buffer_;
MessageQueue* message_queue_;
Dart_MessageNotifyCallback message_notify_callback_;
char* name_;
intptr_t num_ports_;
intptr_t live_ports_;
Dart_Port main_port_;
Heap* heap_;
ObjectStore* object_store_;
@ -302,6 +271,7 @@ class Isolate {
Mutex* mutex_; // protects stack_limit_ and saved_stack_limit_.
uword stack_limit_;
uword saved_stack_limit_;
MessageHandler* message_handler_;
static Dart_IsolateCreateCallback create_callback_;
static Dart_IsolateInterruptCallback interrupt_callback_;

View file

@ -2,10 +2,76 @@
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
#include "vm/message_queue.h"
#include "vm/message.h"
namespace dart {
DECLARE_FLAG(bool, trace_isolates);
MessageHandler::MessageHandler()
: live_ports_(0),
queue_(new MessageQueue()) {
ASSERT(queue_ != NULL);
}
MessageHandler::~MessageHandler() {
delete queue_;
}
const char* MessageHandler::name() const {
return "<unnamed>";
}
#if defined(DEBUG)
void MessageHandler::CheckAccess() {
// By default there is no checking.
}
#endif
void MessageHandler::MessageNotify(Message::Priority priority) {
// By default, there is no custom message notification.
}
void MessageHandler::PostMessage(Message* message) {
if (FLAG_trace_isolates) {
const char* source_name = "<native code>";
Isolate* source_isolate = Isolate::Current();
if (source_isolate) {
source_name = source_isolate->name();
}
OS::Print("[>] Posting message:\n"
"\tsource: %s\n"
"\treply_port: %lld\n"
"\tdest: %s\n"
"\tdest_port: %lld\n",
source_name, message->reply_port(), name(), message->dest_port());
}
Message::Priority priority = message->priority();
queue()->Enqueue(message);
message = NULL; // Do not access message. May have been deleted.
// Invoke any custom message notification.
MessageNotify(priority);
}
void MessageHandler::ClosePort(Dart_Port port) {
queue()->Flush(port);
}
void MessageHandler::CloseAllPorts() {
queue()->FlushAll();
}
MessageQueue::MessageQueue() {
for (int p = Message::kFirstPriority; p < Message::kNumPriorities; p++) {
head_[p] = NULL;
@ -25,11 +91,7 @@ MessageQueue::~MessageQueue() {
void MessageQueue::Enqueue(Message* msg) {
// TODO(turnidge): Add a scoped locker for monitors which is not a
// stack resource. This would probably be useful in the platform
// headers.
monitor_.Enter();
MonitorLocker ml(&monitor_);
Message::Priority p = msg->priority();
// Make sure messages are not reused.
ASSERT(msg->next_ == NULL);
@ -46,8 +108,6 @@ void MessageQueue::Enqueue(Message* msg) {
tail_[p]->next_ = msg;
tail_[p] = msg;
}
monitor_.Exit();
}
Message* MessageQueue::DequeueNoWait() {
@ -78,11 +138,10 @@ Message* MessageQueue::DequeueNoWaitHoldsLock() {
Message* MessageQueue::Dequeue(int64_t millis) {
ASSERT(millis >= 0);
MonitorLocker ml(&monitor_);
Message* result = DequeueNoWaitHoldsLock();
if (result == NULL) {
// No message available at any priority.
ml.Wait(millis);
monitor_.Wait(millis);
result = DequeueNoWaitHoldsLock();
}
return result;

View file

@ -2,12 +2,14 @@
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
#ifndef VM_MESSAGE_QUEUE_H_
#define VM_MESSAGE_QUEUE_H_
#ifndef VM_MESSAGE_H_
#define VM_MESSAGE_H_
#include "include/dart_api.h"
#include "vm/thread.h"
// Duplicated from dart_api.h to avoid including the whole header.
typedef int64_t Dart_Port;
namespace dart {
class Message {
@ -22,7 +24,7 @@ class Message {
} Priority;
// A port number which is never used.
static const int kIllegalPort = 0;
static const Dart_Port kIllegalPort = 0;
// A new message to be sent between two isolates. The data handed to this
// message will be disposed by calling free() once the message object is
@ -57,7 +59,6 @@ class Message {
DISALLOW_COPY_AND_ASSIGN(Message);
};
// There is a message queue per isolate.
class MessageQueue {
public:
@ -92,6 +93,60 @@ class MessageQueue {
DISALLOW_COPY_AND_ASSIGN(MessageQueue);
};
// A MessageHandler is an entity capable of accepting messages.
class MessageHandler {
protected:
MessageHandler();
// Allows subclasses to provide custom message notification.
virtual void MessageNotify(Message::Priority priority);
public:
virtual ~MessageHandler();
// Allow subclasses to provide a handler name.
virtual const char* name() const;
#if defined(DEBUG)
// Check that it is safe to access this message handler.
//
// For example, if this MessageHandler is an isolate, then it is
// only safe to access it when the MessageHandler is the current
// isolate.
virtual void CheckAccess();
#endif
void PostMessage(Message* message);
void ClosePort(Dart_Port port);
void CloseAllPorts();
// A message handler tracks how many live ports it has.
bool HasLivePorts() const { return live_ports_ > 0; }
void increment_live_ports() {
#if defined(DEBUG)
CheckAccess();
#endif
live_ports_++;
}
void decrement_live_ports() {
#if defined(DEBUG)
CheckAccess();
#endif
live_ports_--;
}
// Returns true if the handler is owned by the PortMap.
//
// This is used to delete handlers when their last live port is closed.
virtual bool OwnedByPortMap() const { return false; }
MessageQueue* queue() const { return queue_; }
private:
intptr_t live_ports_;
MessageQueue* queue_;
};
} // namespace dart
#endif // VM_MESSAGE_QUEUE_H_
#endif // VM_MESSAGE_H_

View file

@ -3,7 +3,7 @@
// BSD-style license that can be found in the LICENSE file.
#include "platform/assert.h"
#include "vm/message_queue.h"
#include "vm/message.h"
#include "vm/unit_test.h"
namespace dart {

View file

@ -0,0 +1,67 @@
// Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
#include "vm/native_message_handler.h"
#include "vm/isolate.h"
#include "vm/message.h"
#include "vm/thread.h"
namespace dart {
NativeMessageHandler::NativeMessageHandler(const char* name,
Dart_NativeMessageHandler func)
: name_(strdup(name)),
func_(func) {
// A NativeMessageHandler always has one live port.
increment_live_ports();
}
NativeMessageHandler::~NativeMessageHandler() {
free(name_);
}
#if defined(DEBUG)
void NativeMessageHandler::CheckAccess() {
ASSERT(Isolate::Current() == NULL);
}
#endif
static void RunWorker(uword parameter) {
NativeMessageHandler* handler =
reinterpret_cast<NativeMessageHandler*>(parameter);
#if defined(DEBUG)
handler->CheckAccess();
#endif
while (handler->HasLivePorts()) {
Message* message = handler->queue()->Dequeue(0);
if (message != NULL) {
if (message->priority() >= Message::kOOBPriority) {
// TODO(turnidge): Out of band messages will not go through
// the regular message handler. Instead they will be
// dispatched to special vm code. Implement.
UNIMPLEMENTED();
}
// TODO(sgjesse): Once CMessageReader::ReadObject is committed,
// use that here and pass the resulting data object to the
// handler instead.
(*handler->func())(message->dest_port(),
message->reply_port(),
message->data());
delete message;
}
}
}
void NativeMessageHandler::StartWorker() {
new Thread(RunWorker, reinterpret_cast<uword>(this));
}
} // namespace dart

View file

@ -0,0 +1,47 @@
// Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
#ifndef VM_NATIVE_MESSAGE_HANDLER_H_
#define VM_NATIVE_MESSAGE_HANDLER_H_
#include "include/dart_api.h"
#include "vm/message.h"
namespace dart {
// A NativeMessageHandler accepts messages and dispatches them to
// native C handlers.
class NativeMessageHandler : public MessageHandler {
public:
NativeMessageHandler(const char* name, Dart_NativeMessageHandler func);
~NativeMessageHandler();
const char* name() const { return name_; }
Dart_NativeMessageHandler func() const { return func_; }
#if defined(DEBUG)
// Check that it is safe to access this handler.
void CheckAccess();
#endif
// Delete this handlers when its last live port is closed.
virtual bool OwnedByPortMap() const { return true; }
// Start a worker thread which will service messages for this handler.
//
// TODO(turnidge): Instead of starting a worker for each
// NativeMessageHandler, we should instead use a shared thread pool
// which services a queue of ready MessageHandlers. If we implement
// this correctly, the same pool will work for
// IsolateMessageHandlers as well.
void StartWorker();
private:
char* name_;
Dart_NativeMessageHandler func_;
};
} // namespace dart
#endif // VM_NATIVE_MESSAGE_HANDLER_H_

View file

@ -7,7 +7,7 @@
#include "platform/utils.h"
#include "vm/dart_api_impl.h"
#include "vm/isolate.h"
#include "vm/message_queue.h"
#include "vm/message.h"
#include "vm/thread.h"
namespace dart {
@ -15,13 +15,11 @@ namespace dart {
DECLARE_FLAG(bool, trace_isolates);
Mutex* PortMap::mutex_ = NULL;
PortMap::Entry* PortMap::map_ = NULL;
Isolate* PortMap::deleted_entry_ = reinterpret_cast<Isolate*>(1);
MessageHandler* PortMap::deleted_entry_ = reinterpret_cast<MessageHandler*>(1);
intptr_t PortMap::capacity_ = 0;
intptr_t PortMap::used_ = 0;
intptr_t PortMap::deleted_ = 0;
Dart_Port PortMap::next_port_ = 7111;
@ -29,7 +27,7 @@ intptr_t PortMap::FindPort(Dart_Port port) {
intptr_t index = port % capacity_;
intptr_t start_index = index;
Entry entry = map_[index];
while (entry.isolate != NULL) {
while (entry.handler != NULL) {
if (entry.port == port) {
return index;
}
@ -83,7 +81,7 @@ void PortMap::SetLive(Dart_Port port) {
intptr_t index = FindPort(port);
ASSERT(index >= 0);
map_[index].live = true;
map_[index].isolate->increment_live_ports();
map_[index].handler->increment_live_ports();
}
@ -100,13 +98,16 @@ void PortMap::MaintainInvariants() {
}
Dart_Port PortMap::CreatePort() {
Isolate* isolate = Isolate::Current();
Dart_Port PortMap::CreatePort(MessageHandler* handler) {
ASSERT(handler != NULL);
MutexLocker ml(mutex_);
#if defined(DEBUG)
handler->CheckAccess();
#endif
Entry entry;
entry.port = AllocatePort();
entry.isolate = isolate;
entry.handler = handler;
entry.live = false;
// Search for the first unused slot. Make use of the knowledge that here is
@ -124,14 +125,13 @@ Dart_Port PortMap::CreatePort() {
ASSERT(index >= 0);
ASSERT(index < capacity_);
ASSERT(map_[index].port == 0);
ASSERT((map_[index].isolate == NULL) ||
(map_[index].isolate == deleted_entry_));
if (map_[index].isolate == deleted_entry_) {
ASSERT((map_[index].handler == NULL) ||
(map_[index].handler == deleted_entry_));
if (map_[index].handler == deleted_entry_) {
// Consuming a deleted entry.
deleted_--;
}
map_[index] = entry;
isolate->increment_num_ports();
// Increment number of used slots and grow if necessary.
used_++;
@ -141,74 +141,78 @@ Dart_Port PortMap::CreatePort() {
}
void PortMap::ClosePort(Dart_Port port) {
Isolate* isolate = Isolate::Current();
bool PortMap::ClosePort(Dart_Port port) {
MessageHandler* handler = NULL;
{
MutexLocker ml(mutex_);
intptr_t index = FindPort(port);
if (index < 0) {
return;
return false;
}
ASSERT(index < capacity_);
ASSERT(map_[index].port != 0);
ASSERT(map_[index].isolate == isolate);
ASSERT(map_[index].handler != deleted_entry_);
ASSERT(map_[index].handler != NULL);
handler = map_[index].handler;
#if defined(DEBUG)
handler->CheckAccess();
#endif
// Before releasing the lock mark the slot in the map as deleted. This makes
// it possible to release the port map lock before flushing all of its
// pending messages below.
map_[index].port = 0;
map_[index].isolate = deleted_entry_;
isolate->decrement_num_ports();
map_[index].handler = deleted_entry_;
if (map_[index].live) {
isolate->decrement_live_ports();
handler->decrement_live_ports();
}
used_--;
deleted_++;
MaintainInvariants();
}
isolate->ClosePort(port);
handler->ClosePort(port);
if (!handler->HasLivePorts() && handler->OwnedByPortMap()) {
delete handler;
}
return true;
}
void PortMap::ClosePorts() {
Isolate* isolate = Isolate::Current();
void PortMap::ClosePorts(MessageHandler* handler) {
{
MutexLocker ml(mutex_);
for (intptr_t i = 0; i < capacity_; i++) {
if (map_[i].isolate == isolate) {
if (map_[i].handler == handler) {
// Mark the slot as deleted.
map_[i].port = 0;
map_[i].isolate = deleted_entry_;
isolate->decrement_num_ports();
map_[i].handler = deleted_entry_;
if (map_[i].live) {
handler->decrement_live_ports();
}
used_--;
deleted_++;
}
}
MaintainInvariants();
}
isolate->CloseAllPorts();
handler->CloseAllPorts();
}
bool PortMap::PostMessage(Message* message) {
// TODO(turnidge): Add a scoped locker for mutexes which is not a
// stack resource. This would probably be useful in the platform
// headers.
mutex_->Lock();
MutexLocker ml(mutex_);
intptr_t index = FindPort(message->dest_port());
if (index < 0) {
free(message);
mutex_->Unlock();
return false;
}
ASSERT(index >= 0);
ASSERT(index < capacity_);
Isolate* isolate = map_[index].isolate;
MessageHandler* handler = map_[index].handler;
ASSERT(map_[index].port != 0);
ASSERT((isolate != NULL) && (isolate != deleted_entry_));
isolate->PostMessage(message);
mutex_->Unlock();
ASSERT((handler != NULL) && (handler != deleted_entry_));
handler->PostMessage(message);
return true;
}
@ -226,5 +230,4 @@ void PortMap::InitOnce() {
deleted_ = 0;
}
} // namespace dart

View file

@ -13,23 +13,26 @@ namespace dart {
class Isolate;
class Message;
class MessageHandler;
class Mutex;
class PortMapTestPeer;
class PortMap: public AllStatic {
public:
// Allocate a port in the current isolate and return its VM-global id.
static Dart_Port CreatePort();
// Allocate a port for the provided handler and return its VM-global id.
static Dart_Port CreatePort(MessageHandler* handler);
// Indicates that a port has had a ReceivePort created for it at the
// dart language level. The port remains live until it is closed.
static void SetLive(Dart_Port id);
// Close the port with id. All pending messages will be dropped.
static void ClosePort(Dart_Port id);
//
// Returns true if the port is successfully closed.
static bool ClosePort(Dart_Port id);
// Close all the ports of the current isolate.
static void ClosePorts();
// Close all the ports for the provided handler.
static void ClosePorts(MessageHandler* handler);
// Enqueues the message in the port with id. Returns false if the port is not
// active any longer.
@ -42,12 +45,13 @@ class PortMap: public AllStatic {
private:
friend class dart::PortMapTestPeer;
// Mapping between port numbers and isolates.
// Free entries have id == 0 and isolate == NULL. Deleted entries have id == 0
// and isolate == deleted_entry_.
// Mapping between port numbers and handlers.
//
// Free entries have id == 0 and handler == NULL. Deleted entries
// have id == 0 and handler == deleted_entry_.
typedef struct {
Dart_Port port;
Isolate* isolate;
MessageHandler* handler;
bool live;
} Entry;
@ -67,7 +71,7 @@ class PortMap: public AllStatic {
// Hashmap of ports.
static Entry* map_;
static Isolate* deleted_entry_;
static MessageHandler* deleted_entry_;
static intptr_t capacity_;
static intptr_t used_;
static intptr_t deleted_;

View file

@ -3,7 +3,7 @@
// BSD-style license that can be found in the LICENSE file.
#include "platform/assert.h"
#include "vm/message_queue.h"
#include "vm/message.h"
#include "vm/os.h"
#include "vm/port.h"
#include "vm/unit_test.h"
@ -29,22 +29,21 @@ class PortMapTestPeer {
};
// Intercept the post message callback and just store a copy of the message.
static int notify_count = 0;
static void MyMessageNotifyCallback(Dart_Isolate dest_isolate) {
notify_count++;
}
class TestMessageHandler : public MessageHandler {
public:
TestMessageHandler() : notify_count(0) {}
void MessageNotify(Message::Priority priority) {
notify_count++;
}
static void InitPortMapTest() {
Dart_SetMessageNotifyCallback(&MyMessageNotifyCallback);
notify_count = 0;
}
int notify_count;
};
TEST_CASE(PortMap_CreateAndCloseOnePort) {
InitPortMapTest();
intptr_t port = PortMap::CreatePort();
TestMessageHandler handler;
intptr_t port = PortMap::CreatePort(&handler);
EXPECT_NE(0, port);
EXPECT(PortMapTestPeer::IsActivePort(port));
@ -54,9 +53,9 @@ TEST_CASE(PortMap_CreateAndCloseOnePort) {
TEST_CASE(PortMap_CreateAndCloseTwoPorts) {
InitPortMapTest();
Dart_Port port1 = PortMap::CreatePort();
Dart_Port port2 = PortMap::CreatePort();
TestMessageHandler handler;
Dart_Port port1 = PortMap::CreatePort(&handler);
Dart_Port port2 = PortMap::CreatePort(&handler);
EXPECT(PortMapTestPeer::IsActivePort(port1));
EXPECT(PortMapTestPeer::IsActivePort(port2));
@ -74,23 +73,23 @@ TEST_CASE(PortMap_CreateAndCloseTwoPorts) {
TEST_CASE(PortMap_ClosePorts) {
InitPortMapTest();
Dart_Port port1 = PortMap::CreatePort();
Dart_Port port2 = PortMap::CreatePort();
TestMessageHandler handler;
Dart_Port port1 = PortMap::CreatePort(&handler);
Dart_Port port2 = PortMap::CreatePort(&handler);
EXPECT(PortMapTestPeer::IsActivePort(port1));
EXPECT(PortMapTestPeer::IsActivePort(port2));
// Close all ports at once.
PortMap::ClosePorts();
PortMap::ClosePorts(&handler);
EXPECT(!PortMapTestPeer::IsActivePort(port1));
EXPECT(!PortMapTestPeer::IsActivePort(port2));
}
TEST_CASE(PortMap_CreateManyPorts) {
InitPortMapTest();
TestMessageHandler handler;
for (int i = 0; i < 32; i++) {
Dart_Port port = PortMap::CreatePort();
Dart_Port port = PortMap::CreatePort(&handler);
EXPECT(PortMapTestPeer::IsActivePort(port));
PortMap::ClosePort(port);
EXPECT(!PortMapTestPeer::IsActivePort(port));
@ -99,8 +98,8 @@ TEST_CASE(PortMap_CreateManyPorts) {
TEST_CASE(PortMap_SetLive) {
InitPortMapTest();
intptr_t port = PortMap::CreatePort();
TestMessageHandler handler;
intptr_t port = PortMap::CreatePort(&handler);
EXPECT_NE(0, port);
EXPECT(PortMapTestPeer::IsActivePort(port));
EXPECT(!PortMapTestPeer::IsLivePort(port));
@ -116,26 +115,24 @@ TEST_CASE(PortMap_SetLive) {
TEST_CASE(PortMap_PostMessage) {
InitPortMapTest();
Dart_Port port = PortMap::CreatePort();
TestMessageHandler handler;
Dart_Port port = PortMap::CreatePort(&handler);
EXPECT_EQ(0, handler.notify_count);
EXPECT(PortMap::PostMessage(new Message(
port, 0, reinterpret_cast<uint8_t*>(strdup("msg")),
Message::kNormalPriority)));
// Check that the message notify callback was called.
EXPECT_EQ(1, notify_count);
PortMap::ClosePorts();
EXPECT_EQ(1, handler.notify_count);
PortMap::ClosePorts(&handler);
}
TEST_CASE(PortMap_PostMessageInvalidPort) {
InitPortMapTest();
EXPECT(!PortMap::PostMessage(new Message(
0, 0, reinterpret_cast<uint8_t*>(strdup("msg")),
Message::kNormalPriority)));
// Check that the message notifycallback was not called.
EXPECT_STREQ(0, notify_count);
}
@ -157,19 +154,19 @@ intptr_t GetIntData(uint8_t* data) {
static Message* NextMessage() {
Isolate* isolate = Isolate::Current();
Message* result = isolate->message_queue()->Dequeue(0);
Message* result = isolate->message_handler()->queue()->Dequeue(0);
return result;
}
void ThreadedPort_start(uword parameter) {
// We only need an isolate here because the MutexLocker in
// PortMap::CreatePort expects it, we don't need to initialize
// the isolate as it does not run any dart code.
Dart::CreateIsolate(NULL);
// TODO(turnidge): We only use the isolate to get access to its
// message handler. I should rewrite this test to use a
// TestMessageHandler instead.
Isolate* isolate = Dart::CreateIsolate(NULL);
intptr_t remote = parameter;
intptr_t local = PortMap::CreatePort();
intptr_t local = PortMap::CreatePort(isolate->message_handler());
PortMap::PostMessage(new Message(
remote, 0, AllocIntData(local), Message::kNormalPriority));
@ -194,7 +191,7 @@ void ThreadedPort_start(uword parameter) {
TEST_CASE(ThreadedPort) {
intptr_t local = PortMap::CreatePort();
intptr_t local = PortMap::CreatePort(Isolate::Current()->message_handler());
Thread* thr = new Thread(ThreadedPort_start, local);
EXPECT(thr != NULL);

View file

@ -156,15 +156,17 @@
'memory_region.cc',
'memory_region.h',
'memory_region_test.cc',
'message_queue.cc',
'message_queue.h',
'message_queue_test.cc',
'message.cc',
'message.h',
'message_test.cc',
'native_arguments.cc',
'native_arguments.h',
'native_entry.cc',
'native_entry.h',
'native_entry_test.cc',
'native_entry_test.h',
'native_message_handler.cc',
'native_message_handler.h',
'object.cc',
'object.h',
'object_test.cc',