Make the event-handler handle backpreasure.

We now limit the number of active events per fd to 8.

BUG=
R=whesse@google.com

Review URL: https://codereview.chromium.org//198743002

git-svn-id: https://dart.googlecode.com/svn/branches/bleeding_edge/dart@33649 260f80e4-7a28-3924-810f-c04153c831b5
This commit is contained in:
ajohnsen@google.com 2014-03-13 12:02:47 +00:00
parent 6aa0e46095
commit 9a71498127
11 changed files with 383 additions and 234 deletions

View file

@ -7,7 +7,6 @@
#include "bin/builtin.h"
#include "bin/isolate_data.h"
#include "bin/socket.h"
namespace dart {
namespace bin {
@ -25,6 +24,7 @@ enum MessageFlags {
kCloseCommand = 8,
kShutdownReadCommand = 9,
kShutdownWriteCommand = 10,
kReturnTokenCommand = 11,
kListeningSocket = 16,
kPipe = 17,
};
@ -108,7 +108,7 @@ namespace bin {
class EventHandler {
public:
void SendData(intptr_t id, Dart_Port dart_port, int64_t data) {
delegate_.Notify(id, dart_port, data);
delegate_.SendData(id, dart_port, data);
}
/**

View file

@ -40,26 +40,32 @@ static const int kTimerId = -1;
static const int kShutdownId = -2;
intptr_t SocketData::GetPollEvents() {
// Do not ask for EPOLLERR and EPOLLHUP explicitly as they are
// triggered anyway.
intptr_t events = EPOLLET | EPOLLRDHUP;
if ((mask_ & (1 << kInEvent)) != 0) {
events |= EPOLLIN;
}
if ((mask_ & (1 << kOutEvent)) != 0) {
events |= EPOLLOUT;
}
return events;
}
// Unregister the file descriptor for a SocketData structure with epoll.
static void RemoveFromEpollInstance(intptr_t epoll_fd_, SocketData* sd) {
if (!sd->tracked_by_epoll()) return;
int status = TEMP_FAILURE_RETRY(epoll_ctl(epoll_fd_,
EPOLL_CTL_DEL,
sd->fd(),
NULL));
if (status == -1) {
FATAL("Failed unregistering events for file descriptor");
}
sd->set_tracked_by_epoll(false);
VOID_TEMP_FAILURE_RETRY(epoll_ctl(epoll_fd_,
EPOLL_CTL_DEL,
sd->fd(),
NULL));
}
static void AddToEpollInstance(intptr_t epoll_fd_, SocketData* sd) {
ASSERT(!sd->tracked_by_epoll());
struct epoll_event event;
event.events = EPOLLET | EPOLLRDHUP;
if ((sd->mask() & (1 << kInEvent)) != 0) event.events |= EPOLLIN;
if ((sd->mask() & (1 << kOutEvent)) != 0) event.events |= EPOLLOUT;
event.events = sd->GetPollEvents();
event.data.ptr = sd;
int status = TEMP_FAILURE_RETRY(epoll_ctl(epoll_fd_,
EPOLL_CTL_ADD,
@ -71,8 +77,6 @@ static void AddToEpollInstance(intptr_t epoll_fd_, SocketData* sd) {
// as /dev/null. In such case, mark the file descriptor as closed,
// so dart will handle it accordingly.
DartUtils::PostInt32(sd->port(), 1 << kCloseEvent);
} else {
sd->set_tracked_by_epoll(true);
}
}
@ -116,8 +120,7 @@ EventHandlerImplementation::~EventHandlerImplementation() {
}
SocketData* EventHandlerImplementation::GetSocketData(intptr_t fd,
bool* is_new) {
SocketData* EventHandlerImplementation::GetSocketData(intptr_t fd) {
ASSERT(fd >= 0);
HashMap::Entry* entry = socket_map_.Lookup(
GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd), true);
@ -128,7 +131,6 @@ SocketData* EventHandlerImplementation::GetSocketData(intptr_t fd,
// new SocketData for the file descriptor is inserted.
sd = new SocketData(fd);
entry->value = sd;
*is_new = true;
}
ASSERT(fd == sd->fd());
return sd;
@ -168,20 +170,15 @@ void EventHandlerImplementation::HandleInterruptFd() {
} else if (msg[i].id == kShutdownId) {
shutdown_ = true;
} else {
bool is_new = false;
SocketData* sd = GetSocketData(msg[i].id, &is_new);
if (is_new) {
sd->SetPortAndMask(msg[i].dart_port, msg[i].data);
AddToEpollInstance(epoll_fd_, sd);
}
SocketData* sd = GetSocketData(msg[i].id);
if ((msg[i].data & (1 << kShutdownReadCommand)) != 0) {
ASSERT(msg[i].data == (1 << kShutdownReadCommand));
// Close the socket for reading.
sd->ShutdownRead();
shutdown(sd->fd(), SHUT_RD);
} else if ((msg[i].data & (1 << kShutdownWriteCommand)) != 0) {
ASSERT(msg[i].data == (1 << kShutdownWriteCommand));
// Close the socket for writing.
sd->ShutdownWrite();
shutdown(sd->fd(), SHUT_WR);
} else if ((msg[i].data & (1 << kCloseCommand)) != 0) {
ASSERT(msg[i].data == (1 << kCloseCommand));
// Close the socket and free system resources and move on to
@ -192,6 +189,14 @@ void EventHandlerImplementation::HandleInterruptFd() {
socket_map_.Remove(GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd));
delete sd;
DartUtils::PostInt32(msg[i].dart_port, 1 << kDestroyedEvent);
} else if ((msg[i].data & (1 << kReturnTokenCommand)) != 0) {
if (sd->ReturnToken()) {
AddToEpollInstance(epoll_fd_, sd);
}
} else {
// Setup events to wait for.
sd->SetPortAndMask(msg[i].dart_port, msg[i].data);
AddToEpollInstance(epoll_fd_, sd);
}
}
}
@ -222,47 +227,14 @@ intptr_t EventHandlerImplementation::GetPollEvents(intptr_t events,
#ifdef DEBUG_POLL
PrintEventMask(sd->fd(), events);
#endif
intptr_t event_mask = 0;
if (sd->IsListeningSocket()) {
// For listening sockets the EPOLLIN event indicate that there are
// connections ready for accept unless accompanied with one of the
// other flags.
if ((events & EPOLLIN) != 0) {
if ((events & EPOLLHUP) != 0) event_mask |= (1 << kCloseEvent);
if ((events & EPOLLERR) != 0) event_mask |= (1 << kErrorEvent);
if (event_mask == 0) event_mask |= (1 << kInEvent);
}
} else {
// Prioritize data events over close and error events.
if ((events & (EPOLLIN | EPOLLHUP | EPOLLRDHUP)) != 0) {
// If we have EPOLLIN and we have available bytes, report that.
if ((events & EPOLLIN) != 0) {
event_mask = (1 << kInEvent);
}
if ((events & (EPOLLHUP | EPOLLRDHUP)) != 0) {
// If both EPOLLHUP and EPOLLERR are reported treat it as an
// error.
if ((events & EPOLLERR) != 0) {
event_mask = (1 << kErrorEvent);
} else {
event_mask |= (1 << kCloseEvent);
}
} else if ((events & EPOLLERR) != 0) {
event_mask = (1 << kErrorEvent);
}
}
if ((events & EPOLLOUT) != 0) {
if ((events & EPOLLERR) != 0) {
if (!sd->IsPipe()) {
event_mask = (1 << kErrorEvent);
}
} else {
event_mask |= (1 << kOutEvent);
}
}
if (events & EPOLLERR) {
// Return error only if EPOLLIN is present.
return (events & EPOLLIN) ? (1 << kErrorEvent) : 0;
}
intptr_t event_mask = 0;
if (events & EPOLLIN) event_mask |= (1 << kInEvent);
if (events & EPOLLOUT) event_mask |= (1 << kOutEvent);
if (events & (EPOLLHUP | EPOLLRDHUP)) event_mask |= (1 << kCloseEvent);
return event_mask;
}
@ -277,6 +249,10 @@ void EventHandlerImplementation::HandleEvents(struct epoll_event* events,
SocketData* sd = reinterpret_cast<SocketData*>(events[i].data.ptr);
intptr_t event_mask = GetPollEvents(events[i].events, sd);
if (event_mask != 0) {
if (sd->TakeToken()) {
// Took last token, remove from epoll.
RemoveFromEpollInstance(epoll_fd_, sd);
}
Dart_Port port = sd->port();
ASSERT(port != 0);
DartUtils::PostInt32(port, event_mask);
@ -350,13 +326,13 @@ void EventHandlerImplementation::Start(EventHandler* handler) {
void EventHandlerImplementation::Shutdown() {
Notify(kShutdownId, 0, 0);
SendData(kShutdownId, 0, 0);
}
void EventHandlerImplementation::Notify(intptr_t id,
Dart_Port dart_port,
intptr_t data) {
void EventHandlerImplementation::SendData(intptr_t id,
Dart_Port dart_port,
intptr_t data) {
WakeupHandler(id, dart_port, data);
}

View file

@ -31,17 +31,11 @@ class InterruptMessage {
class SocketData {
public:
explicit SocketData(intptr_t fd)
: tracked_by_epoll_(false), fd_(fd), port_(0), mask_(0) {
: fd_(fd), port_(0), mask_(0), tokens_(8) {
ASSERT(fd_ != -1);
}
void ShutdownRead() {
shutdown(fd_, SHUT_RD);
}
void ShutdownWrite() {
shutdown(fd_, SHUT_WR);
}
intptr_t GetPollEvents();
void Close() {
port_ = 0;
@ -50,9 +44,6 @@ class SocketData {
fd_ = -1;
}
bool IsListeningSocket() { return (mask_ & (1 << kListeningSocket)) != 0; }
bool IsPipe() { return (mask_ & (1 << kPipe)) != 0; }
void SetPortAndMask(Dart_Port port, intptr_t mask) {
ASSERT(fd_ != -1);
port_ = port;
@ -61,15 +52,26 @@ class SocketData {
intptr_t fd() { return fd_; }
Dart_Port port() { return port_; }
intptr_t mask() { return mask_; }
bool tracked_by_epoll() { return tracked_by_epoll_; }
void set_tracked_by_epoll(bool value) { tracked_by_epoll_ = value; }
// Returns true if the last token was taken.
bool TakeToken() {
ASSERT(tokens_ > 0);
tokens_--;
return tokens_ == 0;
}
// Returns true if the tokens was 0 before adding.
bool ReturnToken() {
ASSERT(tokens_ >= 0);
tokens_++;
return tokens_ == 1;
}
private:
bool tracked_by_epoll_;
intptr_t fd_;
Dart_Port port_;
intptr_t mask_;
int tokens_;
};
@ -80,8 +82,8 @@ class EventHandlerImplementation {
// Gets the socket data structure for a given file
// descriptor. Creates a new one if one is not found.
SocketData* GetSocketData(intptr_t fd, bool* is_new);
void Notify(intptr_t id, Dart_Port dart_port, intptr_t data);
SocketData* GetSocketData(intptr_t fd);
void SendData(intptr_t id, Dart_Port dart_port, intptr_t data);
void Start(EventHandler* handler);
void Shutdown();

View file

@ -20,7 +20,6 @@
#include "bin/dartutils.h"
#include "bin/fdutils.h"
#include "bin/log.h"
#include "bin/socket.h"
#include "platform/hashmap.h"
#include "platform/thread.h"
#include "platform/utils.h"
@ -29,36 +28,63 @@
namespace dart {
namespace bin {
static const int kInterruptMessageSize = sizeof(InterruptMessage);
static const int kTimerId = -1;
static const int kShutdownId = -2;
static void AddToEpollInstance(intptr_t epoll_fd_,
int fd, Dart_Port port,
int mask) {
struct epoll_event event;
event.events = EPOLLET | EPOLLRDHUP;
if ((mask & (1 << kInEvent)) != 0) event.events |= EPOLLIN;
if ((mask & (1 << kOutEvent)) != 0) event.events |= EPOLLOUT;
// Be sure we don't collide with the TIMER_BIT.
if (port == ILLEGAL_PORT) {
FATAL("Illigal port sent to event handler");
intptr_t SocketData::GetPollEvents() {
// Do not ask for EPOLLERR and EPOLLHUP explicitly as they are
// triggered anyway.
intptr_t events = EPOLLET | EPOLLRDHUP;
if ((mask_ & (1 << kInEvent)) != 0) {
events |= EPOLLIN;
}
event.data.u64 = port;
if ((mask_ & (1 << kOutEvent)) != 0) {
events |= EPOLLOUT;
}
return events;
}
// Unregister the file descriptor for a SocketData structure with epoll.
static void RemoveFromEpollInstance(intptr_t epoll_fd_, SocketData* sd) {
VOID_TEMP_FAILURE_RETRY(epoll_ctl(epoll_fd_,
EPOLL_CTL_DEL,
sd->fd(),
NULL));
}
static void AddToEpollInstance(intptr_t epoll_fd_, SocketData* sd) {
struct epoll_event event;
event.events = sd->GetPollEvents();
event.data.ptr = sd;
int status = TEMP_FAILURE_RETRY(epoll_ctl(epoll_fd_,
EPOLL_CTL_ADD,
fd,
sd->fd(),
&event));
if (status == -1) {
// Epoll does not accept the file descriptor. It could be due to
// already closed file descriptor, or unuspported devices, such
// as /dev/null. In such case, mark the file descriptor as closed,
// so dart will handle it accordingly.
DartUtils::PostInt32(port, 1 << kCloseEvent);
DartUtils::PostInt32(sd->port(), 1 << kCloseEvent);
}
}
EventHandlerImplementation::EventHandlerImplementation() : shutdown_(false) {
EventHandlerImplementation::EventHandlerImplementation()
: socket_map_(&HashMap::SamePointerValue, 16) {
intptr_t result;
result = TEMP_FAILURE_RETRY(pipe(interrupt_fds_));
if (result != 0) {
FATAL("Pipe creation failed");
}
FDUtils::SetNonBlocking(interrupt_fds_[0]);
FDUtils::SetCloseOnExec(interrupt_fds_[0]);
FDUtils::SetCloseOnExec(interrupt_fds_[1]);
shutdown_ = false;
// The initial size passed to epoll_create is ignore on newer (>=
// 2.6.8) Linux versions
static const int kEpollInitialSize = 64;
@ -67,18 +93,28 @@ EventHandlerImplementation::EventHandlerImplementation() : shutdown_(false) {
FATAL1("Failed creating epoll file descriptor: %i", errno);
}
FDUtils::SetCloseOnExec(epoll_fd_);
// Register the interrupt_fd with the epoll instance.
struct epoll_event event;
event.events = EPOLLIN;
event.data.ptr = NULL;
int status = TEMP_FAILURE_RETRY(epoll_ctl(epoll_fd_,
EPOLL_CTL_ADD,
interrupt_fds_[0],
&event));
if (status == -1) {
FATAL("Failed adding interrupt fd to epoll instance");
}
timer_fd_ = TEMP_FAILURE_RETRY(timerfd_create(CLOCK_REALTIME, TFD_CLOEXEC));
if (timer_fd_ == -1) {
FATAL1("Failed creating timerfd file descriptor: %i", errno);
}
// Register the timer_fd_ with the epoll instance.
struct epoll_event event;
event.events = EPOLLIN;
event.data.u64 = ILLEGAL_PORT; // Use ILLEGAL_PORT to identify timer-fd.
int status = TEMP_FAILURE_RETRY(epoll_ctl(epoll_fd_,
EPOLL_CTL_ADD,
timer_fd_,
&event));
event.data.fd = timer_fd_;
status = TEMP_FAILURE_RETRY(epoll_ctl(epoll_fd_,
EPOLL_CTL_ADD,
timer_fd_,
&event));
if (status == -1) {
FATAL2(
"Failed adding timerfd fd(%i) to epoll instance: %i", timer_fd_, errno);
@ -87,14 +123,106 @@ EventHandlerImplementation::EventHandlerImplementation() : shutdown_(false) {
EventHandlerImplementation::~EventHandlerImplementation() {
TEMP_FAILURE_RETRY(close(epoll_fd_));
TEMP_FAILURE_RETRY(close(timer_fd_));
VOID_TEMP_FAILURE_RETRY(close(epoll_fd_));
VOID_TEMP_FAILURE_RETRY(close(timer_fd_));
VOID_TEMP_FAILURE_RETRY(close(interrupt_fds_[0]));
VOID_TEMP_FAILURE_RETRY(close(interrupt_fds_[1]));
}
SocketData* EventHandlerImplementation::GetSocketData(intptr_t fd) {
ASSERT(fd >= 0);
HashMap::Entry* entry = socket_map_.Lookup(
GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd), true);
ASSERT(entry != NULL);
SocketData* sd = reinterpret_cast<SocketData*>(entry->value);
if (sd == NULL) {
// If there is no data in the hash map for this file descriptor a
// new SocketData for the file descriptor is inserted.
sd = new SocketData(fd);
entry->value = sd;
}
ASSERT(fd == sd->fd());
return sd;
}
void EventHandlerImplementation::WakeupHandler(intptr_t id,
Dart_Port dart_port,
int64_t data) {
InterruptMessage msg;
msg.id = id;
msg.dart_port = dart_port;
msg.data = data;
// WriteToBlocking will write up to 512 bytes atomically, and since our msg
// is smaller than 512, we don't need a thread lock.
// See: http://linux.die.net/man/7/pipe, section 'Pipe_buf'.
ASSERT(kInterruptMessageSize < PIPE_BUF);
intptr_t result =
FDUtils::WriteToBlocking(interrupt_fds_[1], &msg, kInterruptMessageSize);
if (result != kInterruptMessageSize) {
if (result == -1) {
perror("Interrupt message failure:");
}
FATAL1("Interrupt message failure. Wrote %" Pd " bytes.", result);
}
}
void EventHandlerImplementation::HandleInterruptFd() {
const intptr_t MAX_MESSAGES = kInterruptMessageSize;
InterruptMessage msg[MAX_MESSAGES];
ssize_t bytes = TEMP_FAILURE_RETRY(
read(interrupt_fds_[0], msg, MAX_MESSAGES * kInterruptMessageSize));
for (ssize_t i = 0; i < bytes / kInterruptMessageSize; i++) {
if (msg[i].id == kTimerId) {
timeout_queue_.UpdateTimeout(msg[i].dart_port, msg[i].data);
struct itimerspec it;
memset(&it, 0, sizeof(it));
if (timeout_queue_.HasTimeout()) {
int64_t millis = timeout_queue_.CurrentTimeout();
it.it_value.tv_sec = millis / 1000;
it.it_value.tv_nsec = (millis % 1000) * 1000000;
}
timerfd_settime(timer_fd_, TFD_TIMER_ABSTIME, &it, NULL);
} else if (msg[i].id == kShutdownId) {
shutdown_ = true;
} else {
SocketData* sd = GetSocketData(msg[i].id);
if ((msg[i].data & (1 << kShutdownReadCommand)) != 0) {
ASSERT(msg[i].data == (1 << kShutdownReadCommand));
// Close the socket for reading.
shutdown(sd->fd(), SHUT_RD);
} else if ((msg[i].data & (1 << kShutdownWriteCommand)) != 0) {
ASSERT(msg[i].data == (1 << kShutdownWriteCommand));
// Close the socket for writing.
shutdown(sd->fd(), SHUT_WR);
} else if ((msg[i].data & (1 << kCloseCommand)) != 0) {
ASSERT(msg[i].data == (1 << kCloseCommand));
// Close the socket and free system resources and move on to
// next message.
RemoveFromEpollInstance(epoll_fd_, sd);
intptr_t fd = sd->fd();
sd->Close();
socket_map_.Remove(GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd));
delete sd;
DartUtils::PostInt32(msg[i].dart_port, 1 << kDestroyedEvent);
} else if ((msg[i].data & (1 << kReturnTokenCommand)) != 0) {
if (sd->ReturnToken()) {
AddToEpollInstance(epoll_fd_, sd);
}
} else {
// Setup events to wait for.
sd->SetPortAndMask(msg[i].dart_port, msg[i].data);
AddToEpollInstance(epoll_fd_, sd);
}
}
}
}
#ifdef DEBUG_POLL
static void PrintEventMask(intptr_t events) {
// TODO(ajohnsen): When DEBUG_POLL is enabled, we could add the fd to the
// epoll-data as well.
static void PrintEventMask(intptr_t fd, intptr_t events) {
Log::Print("%d ", fd);
if ((events & EPOLLIN) != 0) Log::Print("EPOLLIN ");
if ((events & EPOLLPRI) != 0) Log::Print("EPOLLPRI ");
if ((events & EPOLLOUT) != 0) Log::Print("EPOLLOUT ");
@ -106,17 +234,19 @@ static void PrintEventMask(intptr_t events) {
if ((events & ~all_events) != 0) {
Log::Print("(and %08x) ", events & ~all_events);
}
Log::Print("(available %d) ", FDUtils::AvailableBytes(fd));
Log::Print("\n");
}
#endif
intptr_t EventHandlerImplementation::GetPollEvents(intptr_t events) {
intptr_t EventHandlerImplementation::GetPollEvents(intptr_t events,
SocketData* sd) {
#ifdef DEBUG_POLL
PrintEventMask(events);
PrintEventMask(sd->fd(), events);
#endif
if (events & EPOLLERR) {
// Return only error if EPOLLIN is present.
// Return error only if EPOLLIN is present.
return (events & EPOLLIN) ? (1 << kErrorEvent) : 0;
}
intptr_t event_mask = 0;
@ -129,32 +259,40 @@ intptr_t EventHandlerImplementation::GetPollEvents(intptr_t events) {
void EventHandlerImplementation::HandleEvents(struct epoll_event* events,
int size) {
bool interrupt_seen = false;
for (int i = 0; i < size; i++) {
uint64_t data = events[i].data.u64;
// ILLEGAL_PORT is used to identify timer-fd.
if (data == ILLEGAL_PORT) {
if (events[i].data.ptr == NULL) {
interrupt_seen = true;
} else if (events[i].data.fd == timer_fd_) {
int64_t val;
VOID_TEMP_FAILURE_RETRY(read(timer_fd_, &val, sizeof(val)));
timer_mutex_.Lock();
if (timeout_queue_.HasTimeout()) {
DartUtils::PostNull(timeout_queue_.CurrentPort());
timeout_queue_.RemoveCurrent();
}
timer_mutex_.Unlock();
} else {
int32_t event_mask = GetPollEvents(events[i].events);
SocketData* sd = reinterpret_cast<SocketData*>(events[i].data.ptr);
intptr_t event_mask = GetPollEvents(events[i].events, sd);
if (event_mask != 0) {
Dart_Port port = data;
if (sd->TakeToken()) {
// Took last token, remove from epoll.
RemoveFromEpollInstance(epoll_fd_, sd);
}
Dart_Port port = sd->port();
ASSERT(port != 0);
DartUtils::PostInt32(port, event_mask);
}
}
}
if (interrupt_seen) {
// Handle after socket events, so we avoid closing a socket before we handle
// the current events.
HandleInterruptFd();
}
}
void EventHandlerImplementation::Poll(uword args) {
// Main event-handler thread loop.
static const intptr_t kMaxEvents = 16;
struct epoll_event events[kMaxEvents];
EventHandler* handler = reinterpret_cast<EventHandler*>(args);
@ -188,51 +326,26 @@ void EventHandlerImplementation::Start(EventHandler* handler) {
void EventHandlerImplementation::Shutdown() {
shutdown_ = true;
SendData(kShutdownId, 0, 0);
}
void EventHandlerImplementation::Notify(intptr_t id,
Dart_Port dart_port,
int64_t data) {
// This method is called by isolates, that is, not in the event-handler
// thread.
if (id == kTimerId) {
// Lock this region, as multiple isolates may attempt to update
// timeout_queue_.
// TODO(ajohnsen): Consider using a timer-fd per isolate to avoid the lock.
timer_mutex_.Lock();
timeout_queue_.UpdateTimeout(dart_port, data);
struct itimerspec it;
memset(&it, 0, sizeof(it));
if (timeout_queue_.HasTimeout()) {
int64_t millis = timeout_queue_.CurrentTimeout();
it.it_value.tv_sec = millis / 1000;
it.it_value.tv_nsec = (millis % 1000) * 1000000;
}
timerfd_settime(timer_fd_, TFD_TIMER_ABSTIME, &it, NULL);
timer_mutex_.Unlock();
} else {
if ((data & (1 << kShutdownReadCommand)) != 0) {
ASSERT(data == (1 << kShutdownReadCommand));
// Close the socket for reading.
shutdown(id, SHUT_RD);
} else if ((data & (1 << kShutdownWriteCommand)) != 0) {
ASSERT(data == (1 << kShutdownWriteCommand));
// Close the socket for writing.
shutdown(id, SHUT_WR);
} else if ((data & (1 << kCloseCommand)) != 0) {
ASSERT(data == (1 << kCloseCommand));
// Close the socket and free system resources and move on to
// next message.
// This will also remove the file descriptor from epoll.
Socket::Close(id);
DartUtils::PostInt32(dart_port, 1 << kDestroyedEvent);
} else {
// Add to epoll - this is the first time we see it.
AddToEpollInstance(epoll_fd_, id, dart_port, data);
}
}
void EventHandlerImplementation::SendData(intptr_t id,
Dart_Port dart_port,
int64_t data) {
WakeupHandler(id, dart_port, data);
}
void* EventHandlerImplementation::GetHashmapKeyFromFd(intptr_t fd) {
// The hashmap does not support keys with value 0.
return reinterpret_cast<void*>(fd + 1);
}
uint32_t EventHandlerImplementation::GetHashmapHashFromFd(intptr_t fd) {
// The hashmap does not support keys with value 0.
return dart::Utils::WordHash(fd + 1);
}
} // namespace bin

View file

@ -14,32 +14,93 @@
#include <sys/socket.h>
#include "platform/hashmap.h"
#include "platform/thread.h"
namespace dart {
namespace bin {
class InterruptMessage {
public:
intptr_t id;
Dart_Port dart_port;
int64_t data;
};
class SocketData {
public:
explicit SocketData(intptr_t fd) : fd_(fd), port_(0), mask_(0), tokens_(8) {
ASSERT(fd_ != -1);
}
intptr_t GetPollEvents();
void Close() {
port_ = 0;
mask_ = 0;
close(fd_);
fd_ = -1;
}
void SetPortAndMask(Dart_Port port, intptr_t mask) {
ASSERT(fd_ != -1);
port_ = port;
mask_ = mask;
}
intptr_t fd() { return fd_; }
Dart_Port port() { return port_; }
// Returns true if the last token was taken.
bool TakeToken() {
ASSERT(tokens_ > 0);
tokens_--;
return tokens_ == 0;
}
// Returns true if the tokens was 0 before adding.
bool ReturnToken() {
ASSERT(tokens_ >= 0);
tokens_++;
return tokens_ == 1;
}
private:
intptr_t fd_;
Dart_Port port_;
intptr_t mask_;
int tokens_;
};
class EventHandlerImplementation {
public:
EventHandlerImplementation();
~EventHandlerImplementation();
void Notify(intptr_t id, Dart_Port dart_port, int64_t data);
// Gets the socket data structure for a given file
// descriptor. Creates a new one if one is not found.
SocketData* GetSocketData(intptr_t fd);
void SendData(intptr_t id, Dart_Port dart_port, int64_t data);
void Start(EventHandler* handler);
void Shutdown();
private:
void HandleEvents(struct epoll_event* events, int size);
static void Poll(uword args);
void WakeupHandler(intptr_t id, Dart_Port dart_port, int64_t data);
void HandleInterruptFd();
void SetPort(intptr_t fd, Dart_Port dart_port, intptr_t mask);
intptr_t GetPollEvents(intptr_t events);
intptr_t GetPollEvents(intptr_t events, SocketData* sd);
static void* GetHashmapKeyFromFd(intptr_t fd);
static uint32_t GetHashmapHashFromFd(intptr_t fd);
HashMap socket_map_;
TimeoutQueue timeout_queue_;
bool shutdown_;
int interrupt_fds_[2];
int epoll_fd_;
int timer_fd_;
Mutex timer_mutex_;
};
} // namespace bin

View file

@ -47,26 +47,11 @@ bool SocketData::HasWriteEvent() {
static void RemoveFromKqueue(intptr_t kqueue_fd_, SocketData* sd) {
if (!sd->tracked_by_kqueue()) return;
static const intptr_t kMaxChanges = 2;
intptr_t changes = 0;
struct kevent events[kMaxChanges];
if (sd->HasReadEvent()) {
EV_SET(events + changes, sd->fd(), EVFILT_READ, EV_DELETE, 0, 0, NULL);
++changes;
}
if (sd->HasWriteEvent()) {
EV_SET(events + changes, sd->fd(), EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
++changes;
}
ASSERT(changes > 0);
ASSERT(changes <= kMaxChanges);
int status =
TEMP_FAILURE_RETRY(kevent(kqueue_fd_, events, changes, NULL, 0, NULL));
if (status == -1) {
const int kBufferSize = 1024;
char error_message[kBufferSize];
strerror_r(errno, error_message, kBufferSize);
FATAL1("Failed deleting events from kqueue: %s\n", error_message);
}
EV_SET(events, sd->fd(), EVFILT_READ, EV_DELETE, 0, 0, NULL);
VOID_TEMP_FAILURE_RETRY(kevent(kqueue_fd_, events, 1, NULL, 0, NULL));
EV_SET(events, sd->fd(), EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
VOID_TEMP_FAILURE_RETRY(kevent(kqueue_fd_, events, 1, NULL, 0, NULL));
sd->set_tracked_by_kqueue(false);
}
@ -74,6 +59,7 @@ static void RemoveFromKqueue(intptr_t kqueue_fd_, SocketData* sd) {
// Update the kqueue registration for SocketData structure to reflect
// the events currently of interest.
static void AddToKqueue(intptr_t kqueue_fd_, SocketData* sd) {
ASSERT(!sd->tracked_by_kqueue());
static const intptr_t kMaxChanges = 2;
intptr_t changes = 0;
struct kevent events[kMaxChanges];
@ -102,7 +88,7 @@ static void AddToKqueue(intptr_t kqueue_fd_, SocketData* sd) {
ASSERT(changes > 0);
ASSERT(changes <= kMaxChanges);
int status =
TEMP_FAILURE_RETRY(kevent(kqueue_fd_, events, changes, NULL, 0, NULL));
TEMP_FAILURE_RETRY(kevent(kqueue_fd_, events, changes, NULL, 0, NULL));
if (status == -1) {
// kQueue does not accept the file descriptor. It could be due to
// already closed file descriptor, or unuspported devices, such
@ -152,8 +138,7 @@ EventHandlerImplementation::~EventHandlerImplementation() {
}
SocketData* EventHandlerImplementation::GetSocketData(intptr_t fd,
bool* is_new) {
SocketData* EventHandlerImplementation::GetSocketData(intptr_t fd) {
ASSERT(fd >= 0);
HashMap::Entry* entry = socket_map_.Lookup(
GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd), true);
@ -164,7 +149,6 @@ SocketData* EventHandlerImplementation::GetSocketData(intptr_t fd,
// new SocketData for the file descriptor is inserted.
sd = new SocketData(fd);
entry->value = sd;
*is_new = true;
}
ASSERT(fd == sd->fd());
return sd;
@ -203,19 +187,15 @@ void EventHandlerImplementation::HandleInterruptFd() {
} else if (msg[i].id == kShutdownId) {
shutdown_ = true;
} else {
bool is_new = false;
SocketData* sd = GetSocketData(msg[i].id, &is_new);
if (is_new) {
sd->SetPortAndMask(msg[i].dart_port, msg[i].data);
}
SocketData* sd = GetSocketData(msg[i].id);
if ((msg[i].data & (1 << kShutdownReadCommand)) != 0) {
ASSERT(msg[i].data == (1 << kShutdownReadCommand));
// Close the socket for reading.
sd->ShutdownRead();
shutdown(sd->fd(), SHUT_RD);
} else if ((msg[i].data & (1 << kShutdownWriteCommand)) != 0) {
ASSERT(msg[i].data == (1 << kShutdownWriteCommand));
// Close the socket for writing.
sd->ShutdownWrite();
shutdown(sd->fd(), SHUT_WR);
} else if ((msg[i].data & (1 << kCloseCommand)) != 0) {
ASSERT(msg[i].data == (1 << kCloseCommand));
// Close the socket and free system resources.
@ -225,10 +205,17 @@ void EventHandlerImplementation::HandleInterruptFd() {
socket_map_.Remove(GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd));
delete sd;
DartUtils::PostInt32(msg[i].dart_port, 1 << kDestroyedEvent);
} else {
if (is_new) {
} else if ((msg[i].data & (1 << kReturnTokenCommand)) != 0) {
if (sd->ReturnToken()) {
AddToKqueue(kqueue_fd_, sd);
}
} else {
// Setup events to wait for.
ASSERT((msg[i].data > 0) && (msg[i].data < kIntptrMax));
ASSERT(sd->port() == 0);
sd->SetPortAndMask(msg[i].dart_port,
static_cast<intptr_t>(msg[i].data));
AddToKqueue(kqueue_fd_, sd);
}
}
}
@ -320,6 +307,10 @@ void EventHandlerImplementation::HandleEvents(struct kevent* events,
SocketData* sd = reinterpret_cast<SocketData*>(events[i].udata);
intptr_t event_mask = GetEvents(events + i, sd);
if (event_mask != 0) {
if (sd->TakeToken()) {
// Took last token, remove from epoll.
RemoveFromKqueue(kqueue_fd_, sd);
}
Dart_Port port = sd->port();
ASSERT(port != 0);
DartUtils::PostInt32(port, event_mask);
@ -408,13 +399,13 @@ void EventHandlerImplementation::Start(EventHandler* handler) {
void EventHandlerImplementation::Shutdown() {
Notify(kShutdownId, 0, 0);
SendData(kShutdownId, 0, 0);
}
void EventHandlerImplementation::Notify(intptr_t id,
Dart_Port dart_port,
int64_t data) {
void EventHandlerImplementation::SendData(intptr_t id,
Dart_Port dart_port,
int64_t data) {
WakeupHandler(id, dart_port, data);
}

View file

@ -10,7 +10,6 @@
#endif
#include <unistd.h>
#include <errno.h>
#include <sys/event.h> // NOLINT
#include <sys/socket.h>
@ -34,21 +33,14 @@ class SocketData {
: fd_(fd),
port_(0),
mask_(0),
tracked_by_kqueue_(false) {
tracked_by_kqueue_(false),
tokens_(8) {
ASSERT(fd_ != -1);
}
bool HasReadEvent();
bool HasWriteEvent();
void ShutdownRead() {
shutdown(fd_, SHUT_RD);
}
void ShutdownWrite() {
shutdown(fd_, SHUT_WR);
}
void Close() {
port_ = 0;
mask_ = 0;
@ -57,7 +49,6 @@ class SocketData {
}
bool IsListeningSocket() { return (mask_ & (1 << kListeningSocket)) != 0; }
bool IsPipe() { return (mask_ & (1 << kPipe)) != 0; }
void SetPortAndMask(Dart_Port port, intptr_t mask) {
ASSERT(fd_ != -1);
@ -73,11 +64,24 @@ class SocketData {
tracked_by_kqueue_ = value;
}
// Returns true if the last token was taken.
bool TakeToken() {
tokens_--;
return tokens_ == 0;
}
// Returns true if the tokens was 0 before adding.
bool ReturnToken() {
tokens_++;
return tokens_ == 1;
}
private:
intptr_t fd_;
Dart_Port port_;
intptr_t mask_;
bool tracked_by_kqueue_;
int tokens_;
};
@ -88,8 +92,8 @@ class EventHandlerImplementation {
// Gets the socket data structure for a given file
// descriptor. Creates a new one if one is not found.
SocketData* GetSocketData(intptr_t fd, bool* is_new);
void Notify(intptr_t id, Dart_Port dart_port, int64_t data);
SocketData* GetSocketData(intptr_t fd);
void SendData(intptr_t id, Dart_Port dart_port, int64_t data);
void Start(EventHandler* handler);
void Shutdown();

View file

@ -373,9 +373,7 @@ bool DirectoryWatchHandle::IssueRead() {
// we create it.
if (pending_read_ != NULL) return true;
OverlappedBuffer* buffer = OverlappedBuffer::AllocateReadBuffer(kBufferSize);
ASSERT(completion_port_ != INVALID_HANDLE_VALUE);
BOOL ok = ReadDirectoryChangesW(handle_,
buffer->GetBufferStart(),
buffer->GetBufferSize(),
@ -1203,9 +1201,9 @@ int64_t EventHandlerImplementation::GetTimeout() {
}
void EventHandlerImplementation::Notify(intptr_t id,
Dart_Port dart_port,
int64_t data) {
void EventHandlerImplementation::SendData(intptr_t id,
Dart_Port dart_port,
int64_t data) {
InterruptMessage* msg = new InterruptMessage;
msg->id = id;
msg->dart_port = dart_port;
@ -1289,7 +1287,7 @@ void EventHandlerImplementation::Start(EventHandler* handler) {
void EventHandlerImplementation::Shutdown() {
Notify(kShutdownId, 0, 0);
SendData(kShutdownId, 0, 0);
}
} // namespace bin

View file

@ -485,7 +485,7 @@ class EventHandlerImplementation {
EventHandlerImplementation();
virtual ~EventHandlerImplementation();
void Notify(intptr_t id, Dart_Port dart_port, int64_t data);
void SendData(intptr_t id, Dart_Port dart_port, int64_t data);
void Start(EventHandler* handler);
void Shutdown();

View file

@ -323,7 +323,7 @@ intptr_t Socket::CreateBindDatagram(
bind(fd,
&addr->addr,
SocketAddress::GetAddrLength(addr))) < 0) {
TEMP_FAILURE_RETRY_BLOCK_SIGNALS(close(fd));
VOID_TEMP_FAILURE_RETRY_BLOCK_SIGNALS(close(fd));
return -1;
}
return fd;
@ -392,12 +392,12 @@ intptr_t ServerSocket::CreateBindListen(RawAddr addr,
if (fd < 0) return -1;
int optval = 1;
TEMP_FAILURE_RETRY_BLOCK_SIGNALS(
VOID_TEMP_FAILURE_RETRY_BLOCK_SIGNALS(
setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)));
if (addr.ss.ss_family == AF_INET6) {
optval = v6_only ? 1 : 0;
TEMP_FAILURE_RETRY_BLOCK_SIGNALS(
VOID_TEMP_FAILURE_RETRY_BLOCK_SIGNALS(
setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &optval, sizeof(optval)));
}

View file

@ -355,8 +355,9 @@ class _NativeSocket extends NativeFieldWrapperClass1 {
static const int CLOSE_COMMAND = 8;
static const int SHUTDOWN_READ_COMMAND = 9;
static const int SHUTDOWN_WRITE_COMMAND = 10;
static const int RETURN_TOKEN_COMMAND = 11;
static const int FIRST_COMMAND = CLOSE_COMMAND;
static const int LAST_COMMAND = SHUTDOWN_WRITE_COMMAND;
static const int LAST_COMMAND = RETURN_TOKEN_COMMAND;
// Type flag send to the eventhandler providing additional
// information on the type of the file descriptor.
@ -817,6 +818,9 @@ class _NativeSocket extends NativeFieldWrapperClass1 {
}
}
}
if (eventPort != null && !isClosing && !isClosed) {
sendToEventHandler(1 << RETURN_TOKEN_COMMAND);
}
}
void setHandlers({read, write, error, closed, destroyed}) {
@ -832,7 +836,7 @@ class _NativeSocket extends NativeFieldWrapperClass1 {
sendWriteEvents = write;
if (read) issueReadEvent();
if (write) issueWriteEvent();
if (eventPort == null) {
if (eventPort == null && !isClosing && !isClosed) {
int flags = typeFlags & TYPE_TYPE_MASK;
if (!isClosedRead) flags |= 1 << READ_EVENT;
if (!isClosedWrite) flags |= 1 << WRITE_EVENT;
@ -889,7 +893,7 @@ class _NativeSocket extends NativeFieldWrapperClass1 {
}
void sendToEventHandler(int data) {
assert(!isClosed);
assert(!isClosing);
connectToEventHandler();
_EventHandler._sendData(this, eventPort, data);
}