Move the event handler on macos from poll to kqueue. Simpler and faster.

R=sgjesse@google.com
BUG=
TEST=

Review URL: https://chromiumcodereview.appspot.com//9365010

git-svn-id: https://dart.googlecode.com/svn/branches/bleeding_edge/dart@4028 260f80e4-7a28-3924-810f-c04153c831b5
This commit is contained in:
ager@google.com 2012-02-08 12:18:29 +00:00
parent 4bb47cafcd
commit 0a0f1d9504
2 changed files with 183 additions and 164 deletions

View file

@ -5,10 +5,10 @@
#include "bin/eventhandler.h"
#include <errno.h>
#include <poll.h>
#include <pthread.h>
#include <stdio.h>
#include <string.h>
#include <sys/event.h>
#include <sys/time.h>
#include <unistd.h>
@ -33,21 +33,82 @@ static const int kInfinityTimeout = -1;
static const int kTimerId = -1;
intptr_t SocketData::GetPollEvents() {
// Do not ask for POLLERR and POLLHUP explicitly as they are
// triggered anyway.
intptr_t events = 0;
if (!IsClosedRead()) {
if ((mask_ & (1 << kInEvent)) != 0) {
events |= POLLIN;
bool SocketData::HasReadEvent() {
return !IsClosedRead() && ((mask_ & (1 << kInEvent)) != 0);
}
bool SocketData::HasWriteEvent() {
return !IsClosedWrite() && ((mask_ & (1 << kOutEvent)) != 0);
}
// Unregister the file descriptor for a SocketData structure with kqueue.
static void RemoveFromKqueue(intptr_t kqueue_fd_, SocketData* sd) {
static const intptr_t kMaxChanges = 2;
intptr_t changes = 0;
struct kevent events[kMaxChanges];
if (sd->read_tracked_by_kqueue()) {
EV_SET(events + changes, sd->fd(), EVFILT_READ, EV_DELETE, 0, 0, NULL);
++changes;
sd->set_read_tracked_by_kqueue(false);
}
if (sd->write_tracked_by_kqueue()) {
EV_SET(events + changes, sd->fd(), EVFILT_WRITE, EV_DELETE, 0, 0, sd);
++changes;
sd->set_write_tracked_by_kqueue(false);
}
if (changes > 0) {
ASSERT(changes < kMaxChanges);
int status =
TEMP_FAILURE_RETRY(kevent(kqueue_fd_, events, changes, NULL, 0, NULL));
if (status == -1) {
FATAL("Failed deleting events from kqueue");
}
}
if (!IsClosedWrite()) {
if ((mask_ & (1 << kOutEvent)) != 0) {
events |= POLLOUT;
}
// Update the kqueue registration for SocketData structure to reflect
// the events currently of interest.
static void UpdateKqueue(intptr_t kqueue_fd_, SocketData* sd) {
static const intptr_t kMaxChanges = 2;
intptr_t changes = 0;
struct kevent events[kMaxChanges];
if (sd->port() != 0) {
// Register or unregister READ filter if needed.
if (sd->HasReadEvent()) {
if (!sd->read_tracked_by_kqueue()) {
EV_SET(events + changes, sd->fd(), EVFILT_READ, EV_ADD, 0, 0, sd);
++changes;
sd->set_read_tracked_by_kqueue(true);
}
} else if (sd->read_tracked_by_kqueue()) {
EV_SET(events + changes, sd->fd(), EVFILT_READ, EV_DELETE, 0, 0, NULL);
++changes;
sd->set_read_tracked_by_kqueue(false);
}
// Register or unregister WRITE filter if needed.
if (sd->HasWriteEvent()) {
if (!sd->write_tracked_by_kqueue()) {
EV_SET(events + changes, sd->fd(), EVFILT_WRITE, EV_ADD, 0, 0, sd);
++changes;
sd->set_write_tracked_by_kqueue(true);
}
} else if (sd->write_tracked_by_kqueue()) {
EV_SET(events + changes, sd->fd(), EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
++changes;
sd->set_write_tracked_by_kqueue(false);
}
}
if (changes > 0) {
ASSERT(changes < kMaxChanges);
int status =
TEMP_FAILURE_RETRY(kevent(kqueue_fd_, events, changes, NULL, 0, NULL));
if (status == -1) {
FATAL("Failed updating kqueue");
}
}
return events;
}
@ -61,6 +122,18 @@ EventHandlerImplementation::EventHandlerImplementation()
FDUtils::SetNonBlocking(interrupt_fds_[0]);
timeout_ = kInfinityTimeout;
timeout_port_ = 0;
kqueue_fd_ = TEMP_FAILURE_RETRY(kqueue());
if (kqueue_fd_ == -1) {
FATAL("Failed creating kqueue");
}
// Register the interrupt_fd with the kqueue.
struct kevent event;
EV_SET(&event, interrupt_fds_[0], EVFILT_READ, EV_ADD, 0, 0, NULL);
int status = TEMP_FAILURE_RETRY(kevent(kqueue_fd_, &event, 1, NULL, 0, NULL));
if (status == -1) {
FATAL("Failed adding interrupt fd to kqueue");
}
}
@ -77,8 +150,8 @@ SocketData* EventHandlerImplementation::GetSocketData(intptr_t fd) {
ASSERT(entry != NULL);
SocketData* sd = reinterpret_cast<SocketData*>(entry->value);
if (sd == NULL) {
// If there is no data in the hash map for this file descriptor
// then this is inserting a new SocketData for the file descriptor.
// If there is no data in the hash map for this file descriptor a
// new SocketData for the file descriptor is inserted.
sd = new SocketData(fd);
entry->value = sd;
}
@ -105,43 +178,6 @@ void EventHandlerImplementation::WakeupHandler(intptr_t id,
}
struct pollfd* EventHandlerImplementation::GetPollFds(intptr_t* pollfds_size) {
struct pollfd* pollfds;
// Calculate the number of file descriptors to poll on.
intptr_t numPollfds = 1;
for (HashMap::Entry* entry = socket_map_.Start();
entry != NULL;
entry = socket_map_.Next(entry)) {
SocketData* sd = reinterpret_cast<SocketData*>(entry->value);
if (sd->port() > 0 && sd->GetPollEvents() != 0) numPollfds++;
}
pollfds = reinterpret_cast<struct pollfd*>(calloc(sizeof(struct pollfd),
numPollfds));
pollfds[0].fd = interrupt_fds_[0];
pollfds[0].events |= POLLIN;
int i = 1;
for (HashMap::Entry* entry = socket_map_.Start();
entry != NULL;
entry = socket_map_.Next(entry)) {
SocketData* sd = reinterpret_cast<SocketData*>(entry->value);
intptr_t events = sd->GetPollEvents();
if (sd->port() > 0 && events != 0) {
// Fd is added to the poll set.
pollfds[i].fd = sd->fd();
pollfds[i].events = events;
i++;
}
}
ASSERT(numPollfds == i);
*pollfds_size = i;
return pollfds;
}
bool EventHandlerImplementation::GetInterruptMessage(InterruptMessage* msg) {
int total_read = 0;
int bytes_read =
@ -173,13 +209,16 @@ void EventHandlerImplementation::HandleInterruptFd() {
ASSERT(msg.data == (1 << kShutdownReadCommand));
// Close the socket for reading.
sd->ShutdownRead();
UpdateKqueue(kqueue_fd_, sd);
} else if ((msg.data & (1 << kShutdownWriteCommand)) != 0) {
ASSERT(msg.data == (1 << kShutdownWriteCommand));
// Close the socket for writing.
sd->ShutdownWrite();
UpdateKqueue(kqueue_fd_, sd);
} else if ((msg.data & (1 << kCloseCommand)) != 0) {
ASSERT(msg.data == (1 << kCloseCommand));
// Close the socket and free system resources.
RemoveFromKqueue(kqueue_fd_, sd);
intptr_t fd = sd->fd();
sd->Close();
socket_map_.Remove(GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd));
@ -187,100 +226,64 @@ void EventHandlerImplementation::HandleInterruptFd() {
} else {
// Setup events to wait for.
sd->SetPortAndMask(msg.dart_port, msg.data);
UpdateKqueue(kqueue_fd_, sd);
}
}
}
}
#ifdef DEBUG_POLL
static void PrintEventMask(struct pollfd* pollfd) {
printf("%d ", pollfd->fd);
if ((pollfd->revents & POLLIN) != 0) printf("POLLIN ");
if ((pollfd->revents & POLLPRI) != 0) printf("POLLPRI ");
if ((pollfd->revents & POLLOUT) != 0) printf("POLLOUT ");
if ((pollfd->revents & POLLERR) != 0) printf("POLLERR ");
if ((pollfd->revents & POLLHUP) != 0) printf("POLLHUP ");
if ((pollfd->revents & POLLNVAL) != 0) printf("POLLNVAL ");
int all_events = POLLIN | POLLPRI | POLLOUT | POLLERR | POLLHUP | POLLNVAL;
if ((pollfd->revents & ~all_events) != 0) {
printf("(and %08x) ", pollfd->revents & ~all_events);
}
printf("(available %d) ", FDUtils::AvailableBytes(pollfd->fd));
#ifdef DEBUG_KQUEUE
static void PrintEventMask(intptr_t fd, struct kevent* event) {
printf("%d ", fd);
if (event->filter == EVFILT_READ) printf("EVFILT_READ ");
if (event->filter == EVFILT_WRITE) printf("EVFILT_WRITE ");
printf("flags: %x: ", event->flags);
if ((event->flags & EV_EOF) != 0) printf("EV_EOF ");
if ((event->flags & EV_ERROR) != 0) printf("EV_ERROR ");
printf("(available %d) ", FDUtils::AvailableBytes(fd));
printf("\n");
}
#endif
intptr_t EventHandlerImplementation::GetPollEvents(struct pollfd* pollfd) {
#ifdef DEBUG_POLL
if (pollfd->fd != interrupt_fds_[0]) PrintEventMask(pollfd);
intptr_t EventHandlerImplementation::GetEvents(struct kevent* event,
SocketData* sd) {
#ifdef DEBUG_KQUEUE
PrintEventMask(sd->fd(), event);
#endif
intptr_t event_mask = 0;
SocketData* sd = GetSocketData(pollfd->fd);
if (sd->IsListeningSocket()) {
// For listening sockets the POLLIN event indicate that there are
// connections ready for accept unless accompanied with one of the
// other flags.
if ((pollfd->revents & POLLIN) != 0) {
if ((pollfd->revents & POLLHUP) != 0) event_mask |= (1 << kCloseEvent);
if ((pollfd->revents & POLLERR) != 0) event_mask |= (1 << kErrorEvent);
// On a listening socket the READ event means that there are
// connections ready to be accepted.
if (event->filter == EVFILT_READ) {
if ((event->flags & EV_EOF) != 0) event_mask |= (1 << kCloseEvent);
if ((event->flags & EV_ERROR) != 0) event_mask |= (1 << kErrorEvent);
if (event_mask == 0) event_mask |= (1 << kInEvent);
}
} else {
if ((pollfd->revents & POLLNVAL) != 0) {
return 0;
}
// Prioritize data events over close and error events.
if ((pollfd->revents & POLLIN) != 0) {
if (FDUtils::AvailableBytes(pollfd->fd) != 0) {
event_mask = (1 << kInEvent);
} else if (((pollfd->revents & POLLHUP) != 0)) {
if (event->filter == EVFILT_READ) {
if (FDUtils::AvailableBytes(sd->fd()) != 0) {
event_mask = (1 << kInEvent);
} else if ((event->flags & EV_EOF) != 0) {
event_mask = (1 << kCloseEvent);
sd->MarkClosedRead();
} else if ((pollfd->revents & POLLERR) != 0) {
} else if ((event->flags & EV_ERROR) != 0) {
event_mask = (1 << kErrorEvent);
} else {
if (sd->IsPipe()) {
// When reading from stdin (either from a terminal or piped
// input) treat POLLIN with 0 available bytes as
// end-of-file.
if (sd->fd() == STDIN_FILENO) {
event_mask = (1 << kCloseEvent);
sd->MarkClosedRead();
}
} else {
// If POLLIN is set with no available data and no POLLHUP use
// recv to peek for whether the other end of the socket
// actually closed.
char buffer;
ssize_t bytesPeeked =
TEMP_FAILURE_RETRY(recv(sd->fd(), &buffer, 1, MSG_PEEK));
ASSERT(EAGAIN == EWOULDBLOCK);
if (bytesPeeked == 0) {
event_mask = (1 << kCloseEvent);
sd->MarkClosedRead();
} else if (errno != EWOULDBLOCK) {
fprintf(stderr, "Error recv: %s\n", strerror(errno));
}
}
}
}
// On pipes POLLHUP is reported without POLLIN when there is no
// more data to read.
if (sd->IsPipe()) {
if (((pollfd->revents & POLLIN) == 0) &&
((pollfd->revents & POLLHUP) != 0)) {
event_mask = (1 << kCloseEvent);
sd->MarkClosedRead();
}
}
if ((pollfd->revents & POLLOUT) != 0) {
if ((pollfd->revents & POLLERR) != 0) {
if (event->filter == EVFILT_WRITE) {
if ((event->flags & EV_ERROR) != 0) {
event_mask = (1 << kErrorEvent);
sd->MarkClosedWrite();
} else if ((event->flags & EV_EOF) != 0) {
// If the receiver closed for reading, close for writing,
// update the registration with kqueue, and do not report a
// write event.
sd->MarkClosedWrite();
UpdateKqueue(kqueue_fd_, sd);
} else {
event_mask |= (1 << kOutEvent);
}
@ -291,25 +294,19 @@ intptr_t EventHandlerImplementation::GetPollEvents(struct pollfd* pollfd) {
}
void EventHandlerImplementation::HandleEvents(struct pollfd* pollfds,
int pollfds_size,
int result_size) {
if ((pollfds[0].revents & POLLIN) != 0) {
result_size -= 1;
}
if (result_size > 0) {
for (int i = 1; i < pollfds_size; i++) {
/*
* The fd is unregistered. It gets re-registered when the request
* was handled by dart.
*/
intptr_t event_mask = GetPollEvents(&pollfds[i]);
void EventHandlerImplementation::HandleEvents(struct kevent* events,
int size) {
for (int i = 0; i < size; i++) {
if (events[i].udata != NULL) {
SocketData* sd = reinterpret_cast<SocketData*>(events[i].udata);
intptr_t event_mask = GetEvents(events + i, sd);
if (event_mask != 0) {
intptr_t fd = pollfds[i].fd;
SocketData* sd = GetSocketData(fd);
// Unregister events for the file descriptor. Events will be
// registered again when the current event has been handled in
// Dart code.
RemoveFromKqueue(kqueue_fd_, sd);
Dart_Port port = sd->port();
ASSERT(port != 0);
sd->Unregister();
DartUtils::PostInt32(port, event_mask);
}
}
@ -339,33 +336,44 @@ void EventHandlerImplementation::HandleTimeout() {
}
void EventHandlerImplementation::Poll(uword args) {
intptr_t pollfds_size;
struct pollfd* pollfds;
void EventHandlerImplementation::EventHandlerEntry(uword args) {
static const intptr_t kMaxEvents = 16;
struct kevent events[kMaxEvents];
EventHandlerImplementation* handler =
reinterpret_cast<EventHandlerImplementation*>(args);
ASSERT(handler != NULL);
while (1) {
pollfds = handler->GetPollFds(&pollfds_size);
intptr_t millis = handler->GetTimeout();
intptr_t result = TEMP_FAILURE_RETRY(poll(pollfds, pollfds_size, millis));
ASSERT(EAGAIN == EWOULDBLOCK);
struct timespec ts;
int64_t secs = 0;
int64_t nanos = 0;
if (millis > 0) {
secs = millis / 1000;
nanos = (millis - (secs * 1000)) * 1000000;
}
ts.tv_sec = secs;
ts.tv_nsec = nanos;
intptr_t result = TEMP_FAILURE_RETRY(kevent(handler->kqueue_fd_,
NULL,
0,
events,
kMaxEvents,
&ts));
if (result == -1) {
if (errno != EWOULDBLOCK) {
perror("Poll failed");
}
perror("kevent failed");
FATAL("kevent failed\n");
} else {
handler->HandleTimeout();
handler->HandleEvents(pollfds, pollfds_size, result);
handler->HandleEvents(events, result);
}
free(pollfds);
}
}
void EventHandlerImplementation::StartEventHandler() {
int result = dart::Thread::Start(&EventHandlerImplementation::Poll,
reinterpret_cast<uword>(this));
int result =
dart::Thread::Start(&EventHandlerImplementation::EventHandlerEntry,
reinterpret_cast<uword>(this));
if (result != 0) {
FATAL1("Failed to start event handler thread %d", result);
}

View file

@ -30,16 +30,18 @@ enum PortDataFlags {
class SocketData {
public:
explicit SocketData(intptr_t fd) : fd_(fd), port_(0), mask_(0), flags_(0) {
explicit SocketData(intptr_t fd)
: fd_(fd),
port_(0),
mask_(0),
flags_(0),
read_tracked_by_kqueue_(false),
write_tracked_by_kqueue_(false) {
ASSERT(fd_ != -1);
}
intptr_t GetPollEvents();
void Unregister() {
port_ = 0;
mask_ = 0;
}
bool HasReadEvent();
bool HasWriteEvent();
void ShutdownRead() {
shutdown(fd_, SHUT_RD);
@ -52,7 +54,8 @@ class SocketData {
}
void Close() {
Unregister();
port_ = 0;
mask_ = 0;
flags_ = 0;
close(fd_);
fd_ = -1;
@ -66,8 +69,6 @@ class SocketData {
void MarkClosedRead() { flags_ |= (1 << kClosedRead); }
void MarkClosedWrite() { flags_ |= (1 << kClosedWrite); }
bool HasPollEvents() { return mask_ != 0; }
void SetPortAndMask(Dart_Port port, intptr_t mask) {
ASSERT(fd_ != -1);
port_ = port;
@ -77,12 +78,22 @@ class SocketData {
intptr_t fd() { return fd_; }
Dart_Port port() { return port_; }
intptr_t mask() { return mask_; }
bool read_tracked_by_kqueue() { return read_tracked_by_kqueue_; }
void set_read_tracked_by_kqueue(bool value) {
read_tracked_by_kqueue_ = value;
}
bool write_tracked_by_kqueue() { return write_tracked_by_kqueue_; }
void set_write_tracked_by_kqueue(bool value) {
write_tracked_by_kqueue_ = value;
}
private:
intptr_t fd_;
Dart_Port port_;
intptr_t mask_;
intptr_t flags_;
bool read_tracked_by_kqueue_;
bool write_tracked_by_kqueue_;
};
@ -92,7 +103,7 @@ class EventHandlerImplementation {
~EventHandlerImplementation();
// Gets the socket data structure for a given file
// descriptor. Creates a new one of one is not found.
// descriptor. Creates a new one if one is not found.
SocketData* GetSocketData(intptr_t fd);
void SendData(intptr_t id, Dart_Port dart_port, intptr_t data);
void StartEventHandler();
@ -100,14 +111,13 @@ class EventHandlerImplementation {
private:
intptr_t GetTimeout();
bool GetInterruptMessage(InterruptMessage* msg);
struct pollfd* GetPollFds(intptr_t* size);
void HandleEvents(struct pollfd* pollfds, int pollfds_size, int result_size);
void HandleEvents(struct kevent* events, int size);
void HandleTimeout();
static void Poll(uword args);
static void EventHandlerEntry(uword args);
void WakeupHandler(intptr_t id, Dart_Port dart_port, int64_t data);
void HandleInterruptFd();
void SetPort(intptr_t fd, Dart_Port dart_port, intptr_t mask);
intptr_t GetPollEvents(struct pollfd* pollfd);
intptr_t GetEvents(struct kevent* event, SocketData* sd);
static void* GetHashmapKeyFromFd(intptr_t fd);
static uint32_t GetHashmapHashFromFd(intptr_t fd);
@ -115,6 +125,7 @@ class EventHandlerImplementation {
int64_t timeout_; // Time for next timeout.
Dart_Port timeout_port_;
int interrupt_fds_[2];
int kqueue_fd_;
};