Implement mac support for having multiple Dart_Port's registered on one OS socket

R=sgjesse@google.com

Review URL: https://codereview.chromium.org//908183002

git-svn-id: https://dart.googlecode.com/svn/branches/bleeding_edge/dart@43632 260f80e4-7a28-3924-810f-c04153c831b5
This commit is contained in:
kustermann@google.com 2015-02-10 10:15:33 +00:00
parent 74c0f65c89
commit 1c7ef3c4f2
5 changed files with 162 additions and 116 deletions

View file

@ -327,7 +327,6 @@ class DescriptorInfoSingleMixin : public DI {
virtual void ReturnTokens(Dart_Port port, int count) {
ASSERT(port_ == port);
ASSERT(tokens_ >= 0);
if (!disable_tokens_) {
tokens_ += count;
}
@ -548,7 +547,6 @@ class DescriptorInfoMultipleMixin : public DI {
PortEntry* pentry = reinterpret_cast<PortEntry*>(entry->value);
bool was_ready = pentry->IsReady();
ASSERT(pentry->token_count >= 0);
if (!disable_tokens_) {
pentry->token_count += count;
}

View file

@ -133,13 +133,10 @@ void EventHandlerImplementation::UpdateEpollInstance(intptr_t old_mask,
RemoveFromEpollInstance(epoll_fd_, di);
} else if (old_mask == 0 && new_mask != 0) {
AddToEpollInstance(epoll_fd_, di);
} else if (old_mask != 0 && new_mask != 0) {
if (di->IsListeningSocket()) {
ASSERT(old_mask == new_mask);
} else {
RemoveFromEpollInstance(epoll_fd_, di);
AddToEpollInstance(epoll_fd_, di);
}
} else if (old_mask != 0 && new_mask != 0 && old_mask != new_mask) {
ASSERT(!di->IsListeningSocket());
RemoveFromEpollInstance(epoll_fd_, di);
AddToEpollInstance(epoll_fd_, di);
}
}

View file

@ -142,13 +142,10 @@ void EventHandlerImplementation::UpdateEpollInstance(intptr_t old_mask,
RemoveFromEpollInstance(epoll_fd_, di);
} else if (old_mask == 0 && new_mask != 0) {
AddToEpollInstance(epoll_fd_, di);
} else if (old_mask != 0 && new_mask != 0) {
if (di->IsListeningSocket()) {
ASSERT(old_mask == new_mask);
} else {
RemoveFromEpollInstance(epoll_fd_, di);
AddToEpollInstance(epoll_fd_, di);
}
} else if (old_mask != 0 && new_mask != 0 && old_mask != new_mask) {
ASSERT(!di->IsListeningSocket());
RemoveFromEpollInstance(epoll_fd_, di);
AddToEpollInstance(epoll_fd_, di);
}
}

View file

@ -6,6 +6,7 @@
#if defined(TARGET_OS_MACOS)
#include "bin/eventhandler.h"
#include "bin/eventhandler_macos.h"
#include <errno.h> // NOLINT
#include <pthread.h> // NOLINT
@ -17,7 +18,9 @@
#include "bin/dartutils.h"
#include "bin/fdutils.h"
#include "bin/lockers.h"
#include "bin/log.h"
#include "bin/socket.h"
#include "bin/thread.h"
#include "bin/utils.h"
#include "platform/hashmap.h"
@ -27,60 +30,64 @@
namespace dart {
namespace bin {
bool SocketData::HasReadEvent() {
return (mask_ & (1 << kInEvent)) != 0;
bool DescriptorInfo::HasReadEvent() {
return (Mask() & (1 << kInEvent)) != 0;
}
bool SocketData::HasWriteEvent() {
return (mask_ & (1 << kOutEvent)) != 0;
bool DescriptorInfo::HasWriteEvent() {
return (Mask() & (1 << kOutEvent)) != 0;
}
// Unregister the file descriptor for a SocketData structure with kqueue.
static void RemoveFromKqueue(intptr_t kqueue_fd_, SocketData* sd) {
if (!sd->tracked_by_kqueue()) return;
static void RemoveFromKqueue(intptr_t kqueue_fd_, DescriptorInfo* di) {
if (!di->tracked_by_kqueue()) return;
static const intptr_t kMaxChanges = 2;
struct kevent events[kMaxChanges];
EV_SET(events, sd->fd(), EVFILT_READ, EV_DELETE, 0, 0, NULL);
EV_SET(events, di->fd(), EVFILT_READ, EV_DELETE, 0, 0, NULL);
VOID_NO_RETRY_EXPECTED(kevent(kqueue_fd_, events, 1, NULL, 0, NULL));
EV_SET(events, sd->fd(), EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
EV_SET(events, di->fd(), EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
VOID_NO_RETRY_EXPECTED(kevent(kqueue_fd_, events, 1, NULL, 0, NULL));
sd->set_tracked_by_kqueue(false);
di->set_tracked_by_kqueue(false);
}
// Update the kqueue registration for SocketData structure to reflect
// the events currently of interest.
static void AddToKqueue(intptr_t kqueue_fd_, SocketData* sd) {
ASSERT(!sd->tracked_by_kqueue());
static void AddToKqueue(intptr_t kqueue_fd_, DescriptorInfo* di) {
ASSERT(!di->tracked_by_kqueue());
static const intptr_t kMaxChanges = 2;
intptr_t changes = 0;
struct kevent events[kMaxChanges];
int flags = EV_ADD;
if (!sd->IsListeningSocket()) {
if (!di->IsListeningSocket()) {
flags |= EV_CLEAR;
}
ASSERT(di->HasReadEvent() || di->HasWriteEvent());
// Register or unregister READ filter if needed.
if (sd->HasReadEvent()) {
if (di->HasReadEvent()) {
EV_SET(events + changes,
sd->fd(),
di->fd(),
EVFILT_READ,
flags,
0,
0,
sd);
di);
++changes;
}
// Register or unregister WRITE filter if needed.
if (sd->HasWriteEvent()) {
if (di->HasWriteEvent()) {
EV_SET(events + changes,
sd->fd(),
di->fd(),
EVFILT_WRITE,
flags,
0,
0,
sd);
di);
++changes;
}
ASSERT(changes > 0);
@ -88,13 +95,15 @@ static void AddToKqueue(intptr_t kqueue_fd_, SocketData* sd) {
int status =
NO_RETRY_EXPECTED(kevent(kqueue_fd_, events, changes, NULL, 0, NULL));
if (status == -1) {
// TODO(dart:io): Verify that the dart end is handling this correctly.
// kQueue does not accept the file descriptor. It could be due to
// already closed file descriptor, or unuspported devices, such
// as /dev/null. In such case, mark the file descriptor as closed,
// so dart will handle it accordingly.
DartUtils::PostInt32(sd->port(), 1 << kCloseEvent);
di->NotifyAllDartPorts(1 << kCloseEvent);
} else {
sd->set_tracked_by_kqueue(true);
di->set_tracked_by_kqueue(true);
}
}
@ -136,21 +145,41 @@ EventHandlerImplementation::~EventHandlerImplementation() {
}
SocketData* EventHandlerImplementation::GetSocketData(intptr_t fd,
bool is_listening) {
void EventHandlerImplementation::UpdateKQueueInstance(intptr_t old_mask,
DescriptorInfo *di) {
intptr_t new_mask = di->Mask();
if (old_mask != 0 && new_mask == 0) {
RemoveFromKqueue(kqueue_fd_, di);
} else if (old_mask == 0 && new_mask != 0) {
AddToKqueue(kqueue_fd_, di);
} else if (old_mask != 0 && new_mask != 0 && old_mask != new_mask) {
ASSERT(!di->IsListeningSocket());
RemoveFromKqueue(kqueue_fd_, di);
AddToKqueue(kqueue_fd_, di);
}
}
DescriptorInfo* EventHandlerImplementation::GetDescriptorInfo(
intptr_t fd, bool is_listening) {
ASSERT(fd >= 0);
HashMap::Entry* entry = socket_map_.Lookup(
GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd), true);
ASSERT(entry != NULL);
SocketData* sd = reinterpret_cast<SocketData*>(entry->value);
if (sd == NULL) {
DescriptorInfo* di =
reinterpret_cast<DescriptorInfo*>(entry->value);
if (di == NULL) {
// If there is no data in the hash map for this file descriptor a
// new SocketData for the file descriptor is inserted.
sd = new SocketData(fd, is_listening);
entry->value = sd;
// new DescriptorInfo for the file descriptor is inserted.
if (is_listening) {
di = new DescriptorInfoMultiple(fd);
} else {
di = new DescriptorInfoSingle(fd);
}
entry->value = di;
}
ASSERT(fd == sd->fd());
return sd;
ASSERT(fd == di->fd());
return di;
}
@ -188,38 +217,63 @@ void EventHandlerImplementation::HandleInterruptFd() {
} else {
ASSERT((msg[i].data & COMMAND_MASK) != 0);
SocketData* sd = GetSocketData(
DescriptorInfo* di = GetDescriptorInfo(
msg[i].id, IS_LISTENING_SOCKET(msg[i].data));
if (IS_COMMAND(msg[i].data, kShutdownReadCommand)) {
ASSERT(!di->IsListeningSocket());
// Close the socket for reading.
shutdown(sd->fd(), SHUT_RD);
VOID_NO_RETRY_EXPECTED(shutdown(di->fd(), SHUT_RD));
} else if (IS_COMMAND(msg[i].data, kShutdownWriteCommand)) {
ASSERT(!di->IsListeningSocket());
// Close the socket for writing.
shutdown(sd->fd(), SHUT_WR);
VOID_NO_RETRY_EXPECTED(shutdown(di->fd(), SHUT_WR));
} else if (IS_COMMAND(msg[i].data, kCloseCommand)) {
// Close the socket and free system resources.
RemoveFromKqueue(kqueue_fd_, sd);
intptr_t fd = sd->fd();
VOID_TEMP_FAILURE_RETRY(close(fd));
socket_map_.Remove(GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd));
delete sd;
DartUtils::PostInt32(msg[i].dart_port, 1 << kDestroyedEvent);
} else if (IS_COMMAND(msg[i].data, kReturnTokenCommand)) {
int count = TOKEN_COUNT(msg[i].data);
for (int i = 0; i < count; i++) {
if (sd->ReturnToken()) {
AddToKqueue(kqueue_fd_, sd);
// Close the socket and free system resources and move on to next
// message.
intptr_t old_mask = di->Mask();
Dart_Port port = msg[i].dart_port;
di->RemovePort(port);
intptr_t new_mask = di->Mask();
UpdateKQueueInstance(old_mask, di);
intptr_t fd = di->fd();
if (di->IsListeningSocket()) {
// We only close the socket file descriptor from the operating
// system if there are no other dart socket objects which
// are listening on the same (address, port) combination.
// TODO(dart:io): This assumes that all sockets listen before we
// close.
// This needs to be synchronized with a global datastructure.
if (new_mask == 0) {
socket_map_.Remove(
GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd));
di->Close();
delete di;
}
} else {
ASSERT(new_mask == 0);
socket_map_.Remove(
GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd));
di->Close();
delete di;
}
DartUtils::PostInt32(port, 1 << kDestroyedEvent);
} else if (IS_COMMAND(msg[i].data, kReturnTokenCommand)) {
intptr_t old_mask = di->Mask();
di->ReturnTokens(msg[i].dart_port, TOKEN_COUNT(msg[i].data));
UpdateKQueueInstance(old_mask, di);
} else if (IS_COMMAND(msg[i].data, kSetEventMaskCommand)) {
// `events` can only have kInEvent/kOutEvent flags set.
intptr_t events = msg[i].data & EVENT_MASK;
ASSERT(0 == (events & ~(1 << kInEvent | 1 << kOutEvent)));
// Setup events to wait for.
ASSERT(sd->port() == 0);
sd->SetPortAndMask(msg[i].dart_port, events);
AddToKqueue(kqueue_fd_, sd);
intptr_t old_mask = di->Mask();
di->SetPortAndMask(msg[i].dart_port, msg[i].data & EVENT_MASK);
UpdateKQueueInstance(old_mask, di);
} else {
UNREACHABLE();
}
@ -249,12 +303,12 @@ static void PrintEventMask(intptr_t fd, struct kevent* event) {
intptr_t EventHandlerImplementation::GetEvents(struct kevent* event,
SocketData* sd) {
DescriptorInfo* di) {
#ifdef DEBUG_KQUEUE
PrintEventMask(sd->fd(), event);
PrintEventMask(di->fd(), event);
#endif
intptr_t event_mask = 0;
if (sd->IsListeningSocket()) {
if (di->IsListeningSocket()) {
// On a listening socket the READ event means that there are
// connections ready to be accepted.
if (event->filter == EVFILT_READ) {
@ -310,15 +364,19 @@ void EventHandlerImplementation::HandleEvents(struct kevent* events,
if (events[i].udata == NULL) {
interrupt_seen = true;
} else {
SocketData* sd = reinterpret_cast<SocketData*>(events[i].udata);
intptr_t event_mask = GetEvents(events + i, sd);
DescriptorInfo* di =
reinterpret_cast<DescriptorInfo*>(events[i].udata);
intptr_t event_mask = GetEvents(events + i, di);
if ((event_mask & (1 << kErrorEvent)) != 0) {
di->NotifyAllDartPorts(event_mask);
}
event_mask &= ~(1 << kErrorEvent);
if (event_mask != 0) {
if (sd->TakeToken()) {
// Took last token, remove from epoll.
RemoveFromKqueue(kqueue_fd_, sd);
}
Dart_Port port = sd->port();
intptr_t old_mask = di->Mask();
Dart_Port port = di->NextNotifyDartPort(event_mask);
ASSERT(port != 0);
UpdateKQueueInstance(old_mask, di);
DartUtils::PostInt32(port, event_mask);
}
}

View file

@ -21,56 +21,50 @@
namespace dart {
namespace bin {
class SocketData {
class DescriptorInfo : public DescriptorInfoBase {
public:
explicit SocketData(intptr_t fd, bool is_listening)
: fd_(fd),
port_(0),
mask_(0),
tracked_by_kqueue_(false),
tokens_(16),
is_listening_(is_listening) {
ASSERT(fd_ != -1);
explicit DescriptorInfo(intptr_t fd)
: DescriptorInfoBase(fd), tracked_by_kqueue_(false) { }
virtual ~DescriptorInfo() { }
intptr_t GetPollEvents();
virtual void Close() {
VOID_TEMP_FAILURE_RETRY(close(fd_));
fd_ = -1;
}
bool HasReadEvent();
bool HasWriteEvent();
bool IsListeningSocket() { return is_listening_; }
void SetPortAndMask(Dart_Port port, intptr_t mask) {
ASSERT(fd_ != -1);
port_ = port;
mask_ = mask;
}
intptr_t fd() { return fd_; }
Dart_Port port() { return port_; }
intptr_t mask() { return mask_; }
bool tracked_by_kqueue() { return tracked_by_kqueue_; }
void set_tracked_by_kqueue(bool value) {
tracked_by_kqueue_ = value;
}
// Returns true if the last token was taken.
bool TakeToken() {
tokens_--;
return tokens_ == 0;
}
bool tracked_by_kqueue() { return tracked_by_kqueue_; }
// Returns true if the tokens was 0 before adding.
bool ReturnToken() {
tokens_++;
return tokens_ == 1;
}
bool HasReadEvent();
private:
intptr_t fd_;
Dart_Port port_;
intptr_t mask_;
bool HasWriteEvent();
protected:
bool tracked_by_kqueue_;
int tokens_;
bool is_listening_;
};
class DescriptorInfoSingle
: public DescriptorInfoSingleMixin<DescriptorInfo> {
public:
explicit DescriptorInfoSingle(intptr_t fd)
: DescriptorInfoSingleMixin(fd) {}
virtual ~DescriptorInfoSingle() {}
};
class DescriptorInfoMultiple
: public DescriptorInfoMultipleMixin<DescriptorInfo> {
public:
explicit DescriptorInfoMultiple(intptr_t fd)
: DescriptorInfoMultipleMixin(fd) {}
virtual ~DescriptorInfoMultiple() {}
};
@ -79,9 +73,11 @@ class EventHandlerImplementation {
EventHandlerImplementation();
~EventHandlerImplementation();
void UpdateKQueueInstance(intptr_t old_mask, DescriptorInfo *di);
// Gets the socket data structure for a given file
// descriptor. Creates a new one if one is not found.
SocketData* GetSocketData(intptr_t fd, bool is_listening);
DescriptorInfo* GetDescriptorInfo(intptr_t fd, bool is_listening);
void SendData(intptr_t id, Dart_Port dart_port, int64_t data);
void Start(EventHandler* handler);
void Shutdown();
@ -94,7 +90,7 @@ class EventHandlerImplementation {
void WakeupHandler(intptr_t id, Dart_Port dart_port, int64_t data);
void HandleInterruptFd();
void SetPort(intptr_t fd, Dart_Port dart_port, intptr_t mask);
intptr_t GetEvents(struct kevent* event, SocketData* sd);
intptr_t GetEvents(struct kevent* event, DescriptorInfo* di);
static void* GetHashmapKeyFromFd(intptr_t fd);
static uint32_t GetHashmapHashFromFd(intptr_t fd);