Revert "Introduce optional 'bool shared' parameter to ServerSocket.bind() ..."

Review URL: https://codereview.chromium.org//896213002

git-svn-id: https://dart.googlecode.com/svn/branches/bleeding_edge/dart@43476 260f80e4-7a28-3924-810f-c04153c831b5
This commit is contained in:
kustermann@google.com 2015-02-04 16:00:21 +00:00
parent e8cef2910e
commit 1d6231bacd
18 changed files with 681 additions and 1390 deletions

View file

@ -12,6 +12,8 @@
namespace dart {
namespace bin {
static const intptr_t kTimerId = -1;
static const intptr_t kInvalidId = -2;
void TimeoutQueue::UpdateTimeout(Dart_Port port, int64_t timeout) {
// Find port if present.
@ -83,7 +85,7 @@ EventHandlerImplementation* EventHandler::delegate() {
*/
void FUNCTION_NAME(EventHandler_SendData)(Dart_NativeArguments args) {
Dart_Handle sender = Dart_GetNativeArgument(args, 0);
intptr_t id;
intptr_t id = kInvalidId;
if (Dart_IsNull(sender)) {
id = kTimerId;
} else {

View file

@ -6,11 +6,8 @@
#define BIN_EVENTHANDLER_H_
#include "bin/builtin.h"
#include "bin/dartutils.h"
#include "bin/isolate_data.h"
#include "platform/hashmap.h"
namespace dart {
namespace bin {
@ -32,11 +29,6 @@ enum MessageFlags {
kPipe = 17,
};
#define EVENT_MASK ((1 << kInEvent) | \
(1 << kOutEvent) | \
(1 << kErrorEvent) | \
(1 << kCloseEvent) | \
(1 << kDestroyedEvent))
#define COMMAND_MASK ((1 << kCloseCommand) | \
(1 << kShutdownReadCommand) | \
(1 << kShutdownWriteCommand) | \
@ -102,436 +94,6 @@ class TimeoutQueue {
Timeout* timeouts_;
};
template<typename T>
class CircularLinkedList {
public:
CircularLinkedList() : head_(NULL) {}
// Returns true if the list was empty.
bool Add(T t) {
Entry* e = new Entry(t);
if (head_ == NULL) {
// Empty list, make e head, and point to itself.
e->next_ = e;
e->prev_ = e;
head_ = e;
return true;
} else {
// Insert e as the last element in the list.
e->prev_ = head_->prev_;
e->next_ = head_;
e->prev_->next_ = e;
head_->prev_ = e;
return false;
}
}
void RemoveHead() {
Entry* e = head_;
if (e->next_ == e) {
head_ = NULL;
} else {
e->prev_->next_ = e->next_;
e->next_->prev_ = e->prev_;
head_ = e->next_;
}
delete e;
}
void Remove(T item) {
if (head_ == NULL) {
return;
} else if (head_ == head_->next_) {
if (head_->t == item) {
head_ = NULL;
return;
}
} else {
Entry *current = head_;
do {
if (current->t == item) {
Entry *next = current->next_;
Entry *prev = current->prev_;
prev->next_ = next;
next->prev_ = prev;
delete current;
return;
}
} while (current != head_);
}
}
T head() const { return head_->t; }
bool HasHead() const {
return head_ != NULL;
}
void Rotate() {
head_ = head_->next_;
}
private:
struct Entry {
explicit Entry(const T& t) : t(t) {}
const T t;
Entry* next_;
Entry* prev_;
};
Entry* head_;
};
class DescriptorInfoBase {
public:
explicit DescriptorInfoBase(intptr_t fd) : fd_(fd) {
ASSERT(fd_ != -1);
}
virtual ~DescriptorInfoBase() {}
intptr_t fd() { return fd_; }
// Type of socket.
virtual bool IsListeningSocket() const = 0;
// Ports.
virtual bool SetPortAndMask(Dart_Port port, intptr_t mask) = 0;
virtual bool RemovePort(Dart_Port port) = 0;
// Returns the next port which should be used for sending events to.
virtual Dart_Port NextPort() = 0;
virtual bool HasNextPort() = 0;
// Will post `data` to all known Dart_Ports.
virtual void SendToAll(uintptr_t data) = 0;
// Tokens.
// Returns true if the last token was taken.
virtual bool TakeToken() = 0;
// Returns true if the tokens was 0 before adding.
virtual bool ReturnTokens(Dart_Port port, int count) = 0;
// Returns true if for any registired Dart_port tokens are available.
virtual bool HasTokens() const = 0;
// Other.
virtual intptr_t Mask() = 0;
virtual void Close() = 0;
protected:
intptr_t fd_;
};
// Describes a OS descriptor (e.g. file descriptor on linux or HANDLE on
// windows) which is connected to a single Dart_Port.
//
// Subclasses of this class can be e.g. connected tcp sockets
template<typename SI>
class DescriptorInfoSingleMixin : public SI {
public:
explicit DescriptorInfoSingleMixin(intptr_t fd)
: SI(fd), port_(0), tokens_(16), mask_(0) {}
virtual ~DescriptorInfoSingleMixin() { }
virtual bool IsListeningSocket() const { return false; }
virtual bool SetPortAndMask(Dart_Port port, intptr_t mask) {
ASSERT(port_ == 0 || port == port_);
port_ = port;
mask_ = mask;
return true;
}
virtual bool RemovePort(Dart_Port port) {
// TODO(kustermann): Find out where we call RemovePort() with the invalid
// port. Afterwards remove the part in the ASSERT here.
ASSERT(port_ == 0 || port_ == port);
port_ = 0;
return true;
}
virtual Dart_Port NextPort() {
ASSERT(port_ != 0);
return port_;
}
virtual bool HasNextPort() {
return port_ != 0;
}
virtual void SendToAll(uintptr_t data) {
if (port_ != 0) {
DartUtils::PostInt32(port_, data);
}
}
virtual bool TakeToken() {
ASSERT(tokens_ > 0);
tokens_--;
return tokens_ == 0;
}
virtual bool ReturnTokens(Dart_Port port, int count) {
ASSERT(port_ == port);
ASSERT(tokens_ >= 0);
bool was_empty = tokens_ == 0;
tokens_ += count;
return was_empty;
}
virtual bool HasTokens() const { return tokens_ > 0; }
virtual intptr_t Mask() {
return mask_;
}
virtual void Close() {
SI::Close();
}
private:
Dart_Port port_;
int tokens_;
intptr_t mask_;
};
// Describes a OS descriptor (e.g. file descriptor on linux or HANDLE on
// windows) which is connected to multiple Dart_Port's.
//
// Subclasses of this class can be e.g. a listening socket which multiple
// isolates are listening on.
template<typename SI>
class DescriptorInfoMultipleMixin : public SI {
private:
static const int kTokenCount = 4;
static bool SamePortValue(void* key1, void* key2) {
return reinterpret_cast<Dart_Port>(key1) ==
reinterpret_cast<Dart_Port>(key2);
}
static uint32_t GetHashmapHashFromPort(Dart_Port port) {
return static_cast<uint32_t>(port & 0xFFFFFFFF);
}
static void* GetHashmapKeyFromPort(Dart_Port port) {
return reinterpret_cast<void*>(port);
}
static bool IsReadingMask(intptr_t mask) {
if (mask == (1 << kInEvent)) {
return true;
} else {
ASSERT(mask == 0);
return false;
}
}
struct PortEntry {
Dart_Port dart_port;
intptr_t is_reading;
intptr_t token_count;
bool IsReady() { return token_count > 0 && is_reading; }
};
public:
explicit DescriptorInfoMultipleMixin(intptr_t fd)
: SI(fd), tokens_map_(&SamePortValue, 4) {}
virtual ~DescriptorInfoMultipleMixin() {}
virtual bool IsListeningSocket() const { return true; }
virtual bool SetPortAndMask(Dart_Port port, intptr_t mask) {
bool was_empty = !active_readers_.HasHead();
HashMap::Entry* entry = tokens_map_.Lookup(
GetHashmapKeyFromPort(port), GetHashmapHashFromPort(port), true);
PortEntry* pentry;
if (entry->value == NULL) {
pentry = new PortEntry();
pentry->dart_port = port;
pentry->token_count = kTokenCount;
pentry->is_reading = IsReadingMask(mask);
entry->value = reinterpret_cast<void*>(pentry);
if (pentry->IsReady()) {
active_readers_.Add(pentry);
}
} else {
pentry = reinterpret_cast<PortEntry*>(entry->value);
bool was_ready = pentry->IsReady();
pentry->is_reading = IsReadingMask(mask);
bool is_ready = pentry->IsReady();
if (was_ready && !is_ready) {
active_readers_.Remove(pentry);
} else if (!was_ready && is_ready) {
active_readers_.Add(pentry);
}
}
#ifdef DEBUG
// To ensure that all readers are ready.
PortEntry* root = reinterpret_cast<PortEntry*>(active_readers_.head());
int ready_count = 0;
if (root != NULL) {
PortEntry* current = root;
do {
ASSERT(current->IsReady());
ready_count++;
active_readers_.Rotate();
current = active_readers_.head();
} while (current != root);
}
for (HashMap::Entry *entry = tokens_map_.Start();
entry != NULL;
entry = tokens_map_.Next(entry)) {
PortEntry* pentry = reinterpret_cast<PortEntry*>(entry->value);
if (pentry->IsReady()) {
ready_count--;
}
}
// Ensure all ready items are in `active_readers_`.
ASSERT(ready_count == 0);
#endif
return was_empty && active_readers_.HasHead();
}
virtual bool RemovePort(Dart_Port port) {
HashMap::Entry* entry = tokens_map_.Lookup(
GetHashmapKeyFromPort(port), GetHashmapHashFromPort(port), false);
if (entry != NULL) {
PortEntry* pentry = reinterpret_cast<PortEntry*>(entry->value);
if (pentry->IsReady()) {
active_readers_.Remove(pentry);
}
tokens_map_.Remove(
GetHashmapKeyFromPort(port), GetHashmapHashFromPort(port));
delete pentry;
} else {
// NOTE: This is a listening socket which has been immediately closed.
//
// If a listening socket is not listened on, the event handler does not
// know about it beforehand. So the first time the event handler knows
// about it, is when it is supposed to be closed. We therefore do nothing
// here.
//
// But whether to close it, depends on whether other isolates have it open
// as well or not.
}
return !active_readers_.HasHead();
}
virtual Dart_Port NextPort() {
ASSERT(active_readers_.HasHead());
PortEntry* pentry = reinterpret_cast<PortEntry*>(active_readers_.head());
return pentry->dart_port;
}
virtual bool HasNextPort() {
return active_readers_.HasHead();
}
virtual void SendToAll(uintptr_t data) {
for (HashMap::Entry *entry = tokens_map_.Start();
entry != NULL;
entry = tokens_map_.Next(entry)) {
PortEntry* pentry = reinterpret_cast<PortEntry*>(entry->value);
DartUtils::PostInt32(pentry->dart_port, data);
}
}
virtual bool TakeToken() {
ASSERT(active_readers_.HasHead());
PortEntry* pentry = reinterpret_cast<PortEntry*>(active_readers_.head());
ASSERT(pentry->token_count > 0);
pentry->token_count--;
if (pentry->token_count == 0) {
active_readers_.RemoveHead();
return !active_readers_.HasHead();
} else {
active_readers_.Rotate();
return false;
}
}
virtual bool ReturnTokens(Dart_Port port, int count) {
HashMap::Entry* entry = tokens_map_.Lookup(
GetHashmapKeyFromPort(port), GetHashmapHashFromPort(port), false);
ASSERT(entry != NULL);
PortEntry* pentry = reinterpret_cast<PortEntry*>(entry->value);
pentry->token_count += count;
if (pentry->token_count == count && pentry->IsReady()) {
bool was_empty = !active_readers_.HasHead();
active_readers_.Add(pentry);
return was_empty;
}
return false;
}
virtual bool HasTokens() const {
return active_readers_.HasHead();
}
virtual intptr_t Mask() {
if (active_readers_.HasHead()) {
return 1 << kInEvent;
}
return 0;
}
virtual void Close() {
SI::Close();
}
private:
// The [Dart_Port]s which are not paused (i.e. are interested in read events,
// i.e. `mask == (1 << kInEvent)`) and we have enough tokens to communicate
// with them.
CircularLinkedList<PortEntry *> active_readers_;
// A convenience mapping:
// Dart_Port -> struct PortEntry { dart_port, mask, token_count }
HashMap tokens_map_;
};
class InterruptMessage {
public:
intptr_t id;
Dart_Port dart_port;
int64_t data;
};
static const int kInterruptMessageSize = sizeof(InterruptMessage);
static const int kInfinityTimeout = -1;
static const int kTimerId = -1;
static const int kShutdownId = -2;
} // namespace bin
} // namespace dart

View file

@ -6,7 +6,6 @@
#if defined(TARGET_OS_ANDROID)
#include "bin/eventhandler.h"
#include "bin/eventhandler_android.h"
#include <errno.h> // NOLINT
#include <pthread.h> // NOLINT
@ -20,8 +19,6 @@
#include "bin/dartutils.h"
#include "bin/fdutils.h"
#include "bin/log.h"
#include "bin/lockers.h"
#include "bin/socket.h"
#include "bin/thread.h"
#include "bin/utils.h"
#include "platform/hashmap.h"
@ -37,52 +34,52 @@
namespace dart {
namespace bin {
static const int kInterruptMessageSize = sizeof(InterruptMessage);
static const int kInfinityTimeout = -1;
static const int kTimerId = -1;
static const int kShutdownId = -2;
intptr_t DescriptorInfo::GetPollEvents() {
intptr_t SocketData::GetPollEvents() {
// Do not ask for EPOLLERR and EPOLLHUP explicitly as they are
// triggered anyway.
intptr_t events = 0;
if ((Mask() & (1 << kInEvent)) != 0) {
if ((mask_ & (1 << kInEvent)) != 0) {
events |= EPOLLIN;
}
if ((Mask() & (1 << kOutEvent)) != 0) {
if ((mask_ & (1 << kOutEvent)) != 0) {
events |= EPOLLOUT;
}
return events;
}
// Unregister the file descriptor for a DescriptorInfo structure with
// epoll.
static void RemoveFromEpollInstance(intptr_t epoll_fd_,
DescriptorInfo* di) {
// Unregister the file descriptor for a SocketData structure with epoll.
static void RemoveFromEpollInstance(intptr_t epoll_fd_, SocketData* sd) {
VOID_NO_RETRY_EXPECTED(epoll_ctl(epoll_fd_,
EPOLL_CTL_DEL,
di->fd(),
sd->fd(),
NULL));
}
static void AddToEpollInstance(intptr_t epoll_fd_, DescriptorInfo* di) {
static void AddToEpollInstance(intptr_t epoll_fd_, SocketData* sd) {
struct epoll_event event;
event.events = EPOLLRDHUP | di->GetPollEvents();
if (!di->IsListeningSocket()) {
event.events = EPOLLRDHUP | sd->GetPollEvents();
if (!sd->IsListeningSocket()) {
event.events |= EPOLLET;
}
event.data.ptr = di;
event.data.ptr = sd;
int status = NO_RETRY_EXPECTED(epoll_ctl(epoll_fd_,
EPOLL_CTL_ADD,
di->fd(),
sd->fd(),
&event));
if (status == -1) {
// TODO(kustermann): Verify that the dart end is handling this correctly &
// adapt this code to work for multiple listening sockets.
// Epoll does not accept the file descriptor. It could be due to
// already closed file descriptor, or unuspported devices, such
// as /dev/null. In such case, mark the file descriptor as closed,
// so dart will handle it accordingly.
DartUtils::PostInt32(di->NextPort(), 1 << kCloseEvent);
DartUtils::PostInt32(sd->port(), 1 << kCloseEvent);
}
}
@ -103,7 +100,7 @@ EventHandlerImplementation::EventHandlerImplementation()
static const int kEpollInitialSize = 64;
epoll_fd_ = NO_RETRY_EXPECTED(epoll_create(kEpollInitialSize));
if (epoll_fd_ == -1) {
FATAL1("Failed creating epoll file descriptor: %i", errno);
FATAL("Failed creating epoll file descriptor");
}
FDUtils::SetCloseOnExec(epoll_fd_);
// Register the interrupt_fd with the epoll instance.
@ -111,9 +108,9 @@ EventHandlerImplementation::EventHandlerImplementation()
event.events = EPOLLIN;
event.data.ptr = NULL;
int status = NO_RETRY_EXPECTED(epoll_ctl(epoll_fd_,
EPOLL_CTL_ADD,
interrupt_fds_[0],
&event));
EPOLL_CTL_ADD,
interrupt_fds_[0],
&event));
if (status == -1) {
FATAL("Failed adding interrupt fd to epoll instance");
}
@ -121,32 +118,25 @@ EventHandlerImplementation::EventHandlerImplementation()
EventHandlerImplementation::~EventHandlerImplementation() {
VOID_TEMP_FAILURE_RETRY(close(epoll_fd_));
VOID_TEMP_FAILURE_RETRY(close(interrupt_fds_[0]));
VOID_TEMP_FAILURE_RETRY(close(interrupt_fds_[1]));
}
DescriptorInfo* EventHandlerImplementation::GetDescriptorInfo(
intptr_t fd, bool is_listening) {
SocketData* EventHandlerImplementation::GetSocketData(intptr_t fd) {
ASSERT(fd >= 0);
HashMap::Entry* entry = socket_map_.Lookup(
GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd), true);
ASSERT(entry != NULL);
DescriptorInfo* di =
reinterpret_cast<DescriptorInfo*>(entry->value);
if (di == NULL) {
SocketData* sd = reinterpret_cast<SocketData*>(entry->value);
if (sd == NULL) {
// If there is no data in the hash map for this file descriptor a
// new DescriptorInfo for the file descriptor is inserted.
if (is_listening) {
di = new DescriptorInfoMultiple(fd);
} else {
di = new DescriptorInfoSingle(fd);
}
entry->value = di;
// new SocketData for the file descriptor is inserted.
sd = new SocketData(fd);
entry->value = sd;
}
ASSERT(fd == di->fd());
return di;
ASSERT(fd == sd->fd());
return sd;
}
@ -167,7 +157,7 @@ void EventHandlerImplementation::WakeupHandler(intptr_t id,
if (result == -1) {
perror("Interrupt message failure:");
}
FATAL1("Interrupt message failure. Wrote %" Pd " bytes.", result);
FATAL1("Interrupt message failure. Wrote %d bytes.", result);
}
}
@ -183,63 +173,36 @@ void EventHandlerImplementation::HandleInterruptFd() {
} else if (msg[i].id == kShutdownId) {
shutdown_ = true;
} else {
DescriptorInfo* di = GetDescriptorInfo(
msg[i].id, (msg[i].data & (1 << kListeningSocket)) != 0);
if (IS_COMMAND(msg[i].data, kShutdownReadCommand)) {
ASSERT(!di->IsListeningSocket());
// Close the socket for reading.
VOID_NO_RETRY_EXPECTED(shutdown(di->fd(), SHUT_RD));
} else if (IS_COMMAND(msg[i].data, kShutdownWriteCommand)) {
ASSERT(!di->IsListeningSocket());
// Close the socket for writing.
VOID_NO_RETRY_EXPECTED(shutdown(di->fd(), SHUT_WR));
} else if (IS_COMMAND(msg[i].data, kCloseCommand)) {
// Close the socket and free system resources and move on to next
// message.
bool no_more_listeners = di->RemovePort(msg[i].dart_port);
if (no_more_listeners) {
RemoveFromEpollInstance(epoll_fd_, di);
}
SocketData* sd = GetSocketData(msg[i].id);
intptr_t fd = di->fd();
if (di->IsListeningSocket()) {
// We only close the socket file descriptor from the operating
// system if there are no other dart socket objects which
// are listening on the same (address, port) combination.
{
MutexLocker ml(globalTcpListeningSocketRegistry.mutex());
if (globalTcpListeningSocketRegistry.CloseSafe(fd)) {
socket_map_.Remove(
GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd));
di->Close();
delete di;
}
}
} else {
ASSERT(no_more_listeners);
socket_map_.Remove(
GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd));
di->Close();
delete di;
}
if (IS_COMMAND(msg[i].data, kShutdownReadCommand)) {
// Close the socket for reading.
shutdown(sd->fd(), SHUT_RD);
} else if (IS_COMMAND(msg[i].data, kShutdownWriteCommand)) {
// Close the socket for writing.
shutdown(sd->fd(), SHUT_WR);
} else if (IS_COMMAND(msg[i].data, kCloseCommand)) {
// Close the socket and free system resources and move on to
// next message.
RemoveFromEpollInstance(epoll_fd_, sd);
intptr_t fd = sd->fd();
sd->Close();
socket_map_.Remove(GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd));
delete sd;
DartUtils::PostInt32(msg[i].dart_port, 1 << kDestroyedEvent);
} else if (IS_COMMAND(msg[i].data, kReturnTokenCommand)) {
int count = TOKEN_COUNT(msg[i].data);
if (di->ReturnTokens(msg[i].dart_port, count)) {
AddToEpollInstance(epoll_fd_, di);
for (int i = 0; i < count; i++) {
if (sd->ReturnToken()) {
AddToEpollInstance(epoll_fd_, sd);
}
}
} else {
ASSERT_NO_COMMAND(msg[i].data);
bool had_listeners = di->HasNextPort();
di->SetPortAndMask(msg[i].dart_port, msg[i].data & EVENT_MASK);
bool has_listeners = di->HasNextPort();
// Add/Remove from epoll set depending on previous and current state.
if (!had_listeners && has_listeners) {
AddToEpollInstance(epoll_fd_, di);
} else if (had_listeners && !has_listeners) {
RemoveFromEpollInstance(epoll_fd_, di);
}
// Setup events to wait for.
sd->SetPortAndMask(msg[i].dart_port, msg[i].data);
AddToEpollInstance(epoll_fd_, sd);
}
}
}
@ -266,9 +229,9 @@ static void PrintEventMask(intptr_t fd, intptr_t events) {
#endif
intptr_t EventHandlerImplementation::GetPollEvents(intptr_t events,
DescriptorInfo* di) {
SocketData* sd) {
#ifdef DEBUG_POLL
PrintEventMask(di->fd(), events);
PrintEventMask(sd->fd(), events);
#endif
if (events & EPOLLERR) {
// Return error only if EPOLLIN is present.
@ -289,16 +252,15 @@ void EventHandlerImplementation::HandleEvents(struct epoll_event* events,
if (events[i].data.ptr == NULL) {
interrupt_seen = true;
} else {
DescriptorInfo* di =
reinterpret_cast<DescriptorInfo*>(events[i].data.ptr);
intptr_t event_mask = GetPollEvents(events[i].events, di);
SocketData* sd = reinterpret_cast<SocketData*>(events[i].data.ptr);
intptr_t event_mask = GetPollEvents(events[i].events, sd);
if (event_mask != 0) {
Dart_Port port = di->NextPort();
ASSERT(port != 0);
if (di->TakeToken()) {
if (sd->TakeToken()) {
// Took last token, remove from epoll.
RemoveFromEpollInstance(epoll_fd_, di);
RemoveFromEpollInstance(epoll_fd_, sd);
}
Dart_Port port = sd->port();
ASSERT(port != 0);
DartUtils::PostInt32(port, event_mask);
}
}
@ -337,32 +299,31 @@ void EventHandlerImplementation::Poll(uword args) {
ThreadSignalBlocker signal_blocker(SIGPROF);
static const intptr_t kMaxEvents = 16;
struct epoll_event events[kMaxEvents];
EventHandler* handler = reinterpret_cast<EventHandler*>(args);
EventHandlerImplementation* handler_impl = &handler->delegate_;
ASSERT(handler_impl != NULL);
while (!handler_impl->shutdown_) {
int64_t millis = handler_impl->GetTimeout();
EventHandlerImplementation* handler =
reinterpret_cast<EventHandlerImplementation*>(args);
ASSERT(handler != NULL);
while (!handler->shutdown_) {
int64_t millis = handler->GetTimeout();
ASSERT(millis == kInfinityTimeout || millis >= 0);
if (millis > kMaxInt32) millis = kMaxInt32;
intptr_t result = TEMP_FAILURE_RETRY_NO_SIGNAL_BLOCKER(
epoll_wait(handler_impl->epoll_fd_, events, kMaxEvents, millis));
epoll_wait(handler->epoll_fd_, events, kMaxEvents, millis));
ASSERT(EAGAIN == EWOULDBLOCK);
if (result == -1) {
if (errno != EWOULDBLOCK) {
perror("Poll failed");
}
} else {
handler_impl->HandleTimeout();
handler_impl->HandleEvents(events, result);
handler->HandleTimeout();
handler->HandleEvents(events, result);
}
}
delete handler;
}
void EventHandlerImplementation::Start(EventHandler* handler) {
int result = Thread::Start(&EventHandlerImplementation::Poll,
reinterpret_cast<uword>(handler));
reinterpret_cast<uword>(handler));
if (result != 0) {
FATAL1("Failed to start event handler thread %d", result);
}
@ -376,7 +337,7 @@ void EventHandlerImplementation::Shutdown() {
void EventHandlerImplementation::SendData(intptr_t id,
Dart_Port dart_port,
int64_t data) {
intptr_t data) {
WakeupHandler(id, dart_port, data);
}

View file

@ -22,36 +22,60 @@
namespace dart {
namespace bin {
class DescriptorInfo : public DescriptorInfoBase {
class InterruptMessage {
public:
explicit DescriptorInfo(intptr_t fd) : DescriptorInfoBase(fd) { }
intptr_t id;
Dart_Port dart_port;
int64_t data;
};
virtual ~DescriptorInfo() { }
class SocketData {
public:
explicit SocketData(intptr_t fd)
: fd_(fd), port_(0), mask_(0), tokens_(16) {
ASSERT(fd_ != -1);
}
intptr_t GetPollEvents();
virtual void Close() {
void Close() {
port_ = 0;
mask_ = 0;
VOID_TEMP_FAILURE_RETRY(close(fd_));
fd_ = -1;
}
};
void SetPortAndMask(Dart_Port port, intptr_t mask) {
ASSERT(fd_ != -1);
port_ = port;
mask_ = mask;
}
class DescriptorInfoSingle
: public DescriptorInfoSingleMixin<DescriptorInfo> {
public:
explicit DescriptorInfoSingle(intptr_t fd)
: DescriptorInfoSingleMixin(fd) {}
virtual ~DescriptorInfoSingle() {}
};
intptr_t fd() { return fd_; }
Dart_Port port() { return port_; }
bool IsListeningSocket() { return (mask_ & (1 << kListeningSocket)) != 0; }
class DescriptorInfoMultiple
: public DescriptorInfoMultipleMixin<DescriptorInfo> {
public:
explicit DescriptorInfoMultiple(intptr_t fd)
: DescriptorInfoMultipleMixin(fd) {}
virtual ~DescriptorInfoMultiple() {}
// Returns true if the last token was taken.
bool TakeToken() {
ASSERT(tokens_ > 0);
tokens_--;
return tokens_ == 0;
}
// Returns true if the tokens was 0 before adding.
bool ReturnToken() {
ASSERT(tokens_ >= 0);
tokens_++;
return tokens_ == 1;
}
private:
intptr_t fd_;
Dart_Port port_;
intptr_t mask_;
int tokens_;
};
@ -62,8 +86,8 @@ class EventHandlerImplementation {
// Gets the socket data structure for a given file
// descriptor. Creates a new one if one is not found.
DescriptorInfo* GetDescriptorInfo(intptr_t fd, bool is_listening);
void SendData(intptr_t id, Dart_Port dart_port, int64_t data);
SocketData* GetSocketData(intptr_t fd);
void SendData(intptr_t id, Dart_Port dart_port, intptr_t data);
void Start(EventHandler* handler);
void Shutdown();
@ -75,7 +99,7 @@ class EventHandlerImplementation {
void WakeupHandler(intptr_t id, Dart_Port dart_port, int64_t data);
void HandleInterruptFd();
void SetPort(intptr_t fd, Dart_Port dart_port, intptr_t mask);
intptr_t GetPollEvents(intptr_t events, DescriptorInfo* sd);
intptr_t GetPollEvents(intptr_t events, SocketData* sd);
static void* GetHashmapKeyFromFd(intptr_t fd);
static uint32_t GetHashmapHashFromFd(intptr_t fd);

View file

@ -6,7 +6,6 @@
#if defined(TARGET_OS_LINUX)
#include "bin/eventhandler.h"
#include "bin/eventhandler_linux.h"
#include <errno.h> // NOLINT
#include <pthread.h> // NOLINT
@ -21,7 +20,6 @@
#include "bin/dartutils.h"
#include "bin/fdutils.h"
#include "bin/log.h"
#include "bin/lockers.h"
#include "bin/socket.h"
#include "bin/thread.h"
#include "platform/utils.h"
@ -30,52 +28,51 @@
namespace dart {
namespace bin {
static const int kInterruptMessageSize = sizeof(InterruptMessage);
static const int kTimerId = -1;
static const int kShutdownId = -2;
intptr_t DescriptorInfo::GetPollEvents() {
intptr_t SocketData::GetPollEvents() {
// Do not ask for EPOLLERR and EPOLLHUP explicitly as they are
// triggered anyway.
intptr_t events = 0;
if ((Mask() & (1 << kInEvent)) != 0) {
if ((mask_ & (1 << kInEvent)) != 0) {
events |= EPOLLIN;
}
if ((Mask() & (1 << kOutEvent)) != 0) {
if ((mask_ & (1 << kOutEvent)) != 0) {
events |= EPOLLOUT;
}
return events;
}
// Unregister the file descriptor for a DescriptorInfo structure with
// epoll.
static void RemoveFromEpollInstance(intptr_t epoll_fd_,
DescriptorInfo* di) {
// Unregister the file descriptor for a SocketData structure with epoll.
static void RemoveFromEpollInstance(intptr_t epoll_fd_, SocketData* sd) {
VOID_NO_RETRY_EXPECTED(epoll_ctl(epoll_fd_,
EPOLL_CTL_DEL,
di->fd(),
sd->fd(),
NULL));
}
static void AddToEpollInstance(intptr_t epoll_fd_, DescriptorInfo* di) {
static void AddToEpollInstance(intptr_t epoll_fd_, SocketData* sd) {
struct epoll_event event;
event.events = EPOLLRDHUP | di->GetPollEvents();
if (!di->IsListeningSocket()) {
event.events = EPOLLRDHUP | sd->GetPollEvents();
if (!sd->IsListeningSocket()) {
event.events |= EPOLLET;
}
event.data.ptr = di;
event.data.ptr = sd;
int status = NO_RETRY_EXPECTED(epoll_ctl(epoll_fd_,
EPOLL_CTL_ADD,
di->fd(),
sd->fd(),
&event));
if (status == -1) {
// TODO(kustermann): Verify that the dart end is handling this correctly &
// adapt this code to work for multiple listening sockets.
// Epoll does not accept the file descriptor. It could be due to
// already closed file descriptor, or unuspported devices, such
// as /dev/null. In such case, mark the file descriptor as closed,
// so dart will handle it accordingly.
DartUtils::PostInt32(di->NextPort(), 1 << kCloseEvent);
DartUtils::PostInt32(sd->port(), 1 << kCloseEvent);
}
}
@ -136,26 +133,25 @@ EventHandlerImplementation::~EventHandlerImplementation() {
}
DescriptorInfo* EventHandlerImplementation::GetDescriptorInfo(
intptr_t fd, bool is_listening) {
SocketData* EventHandlerImplementation::GetSocketData(intptr_t fd,
bool is_listening) {
ASSERT(fd >= 0);
HashMap::Entry* entry = socket_map_.Lookup(
GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd), true);
ASSERT(entry != NULL);
DescriptorInfo* di =
reinterpret_cast<DescriptorInfo*>(entry->value);
if (di == NULL) {
SocketData* sd = reinterpret_cast<SocketData*>(entry->value);
if (sd == NULL) {
// If there is no data in the hash map for this file descriptor a
// new DescriptorInfo for the file descriptor is inserted.
// new SocketData for the file descriptor is inserted.
if (is_listening) {
di = new DescriptorInfoMultiple(fd);
sd = new ListeningSocketData(fd);
} else {
di = new DescriptorInfoSingle(fd);
sd = new SocketData(fd);
}
entry->value = di;
entry->value = sd;
}
ASSERT(fd == di->fd());
return di;
ASSERT(fd == sd->fd());
return sd;
}
@ -201,62 +197,38 @@ void EventHandlerImplementation::HandleInterruptFd() {
} else if (msg[i].id == kShutdownId) {
shutdown_ = true;
} else {
DescriptorInfo* di = GetDescriptorInfo(
SocketData* sd = GetSocketData(
msg[i].id, (msg[i].data & (1 << kListeningSocket)) != 0);
if (IS_COMMAND(msg[i].data, kShutdownReadCommand)) {
ASSERT(!di->IsListeningSocket());
ASSERT(!sd->IsListeningSocket());
// Close the socket for reading.
VOID_NO_RETRY_EXPECTED(shutdown(di->fd(), SHUT_RD));
VOID_NO_RETRY_EXPECTED(shutdown(sd->fd(), SHUT_RD));
} else if (IS_COMMAND(msg[i].data, kShutdownWriteCommand)) {
ASSERT(!di->IsListeningSocket());
ASSERT(!sd->IsListeningSocket());
// Close the socket for writing.
VOID_NO_RETRY_EXPECTED(shutdown(di->fd(), SHUT_WR));
VOID_NO_RETRY_EXPECTED(shutdown(sd->fd(), SHUT_WR));
} else if (IS_COMMAND(msg[i].data, kCloseCommand)) {
// Close the socket and free system resources and move on to next
// message.
bool no_more_listeners = di->RemovePort(msg[i].dart_port);
if (no_more_listeners) {
RemoveFromEpollInstance(epoll_fd_, di);
}
intptr_t fd = di->fd();
if (di->IsListeningSocket()) {
// We only close the socket file descriptor from the operating
// system if there are no other dart socket objects which
// are listening on the same (address, port) combination.
{
MutexLocker ml(globalTcpListeningSocketRegistry.mutex());
if (globalTcpListeningSocketRegistry.CloseSafe(fd)) {
socket_map_.Remove(
GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd));
di->Close();
delete di;
}
}
} else {
ASSERT(no_more_listeners);
socket_map_.Remove(
GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd));
di->Close();
delete di;
if (sd->RemovePort(msg[i].dart_port)) {
RemoveFromEpollInstance(epoll_fd_, sd);
intptr_t fd = sd->fd();
sd->Close();
socket_map_.Remove(GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd));
delete sd;
}
DartUtils::PostInt32(msg[i].dart_port, 1 << kDestroyedEvent);
} else if (IS_COMMAND(msg[i].data, kReturnTokenCommand)) {
int count = TOKEN_COUNT(msg[i].data);
if (di->ReturnTokens(msg[i].dart_port, count)) {
AddToEpollInstance(epoll_fd_, di);
if (sd->ReturnToken(msg[i].dart_port, count)) {
AddToEpollInstance(epoll_fd_, sd);
}
} else {
ASSERT_NO_COMMAND(msg[i].data);
bool had_listeners = di->HasNextPort();
di->SetPortAndMask(msg[i].dart_port, msg[i].data & EVENT_MASK);
bool has_listeners = di->HasNextPort();
// Add/Remove from epoll set depending on previous and current state.
if (!had_listeners && has_listeners) {
AddToEpollInstance(epoll_fd_, di);
} else if (had_listeners && !has_listeners) {
RemoveFromEpollInstance(epoll_fd_, di);
// Setup events to wait for.
if (sd->AddPort(msg[i].dart_port)) {
sd->SetMask(msg[i].data);
AddToEpollInstance(epoll_fd_, sd);
}
}
}
@ -284,9 +256,9 @@ static void PrintEventMask(intptr_t fd, intptr_t events) {
#endif
intptr_t EventHandlerImplementation::GetPollEvents(intptr_t events,
DescriptorInfo* di) {
SocketData* sd) {
#ifdef DEBUG_POLL
PrintEventMask(di->fd(), events);
PrintEventMask(sd->fd(), events);
#endif
if (events & EPOLLERR) {
// Return error only if EPOLLIN is present.
@ -315,16 +287,15 @@ void EventHandlerImplementation::HandleEvents(struct epoll_event* events,
timeout_queue_.RemoveCurrent();
}
} else {
DescriptorInfo* di =
reinterpret_cast<DescriptorInfo*>(events[i].data.ptr);
intptr_t event_mask = GetPollEvents(events[i].events, di);
SocketData* sd = reinterpret_cast<SocketData*>(events[i].data.ptr);
intptr_t event_mask = GetPollEvents(events[i].events, sd);
if (event_mask != 0) {
Dart_Port port = di->NextPort();
ASSERT(port != 0);
if (di->TakeToken()) {
Dart_Port port = sd->port();
if (sd->TakeToken()) {
// Took last token, remove from epoll.
RemoveFromEpollInstance(epoll_fd_, di);
RemoveFromEpollInstance(epoll_fd_, sd);
}
ASSERT(port != 0);
DartUtils::PostInt32(port, event_mask);
}
}
@ -362,7 +333,7 @@ void EventHandlerImplementation::Poll(uword args) {
void EventHandlerImplementation::Start(EventHandler* handler) {
int result = Thread::Start(&EventHandlerImplementation::Poll,
reinterpret_cast<uword>(handler));
reinterpret_cast<uword>(handler));
if (result != 0) {
FATAL1("Failed to start event handler thread %d", result);
}

View file

@ -21,36 +21,248 @@
namespace dart {
namespace bin {
class DescriptorInfo : public DescriptorInfoBase {
class InterruptMessage {
public:
explicit DescriptorInfo(intptr_t fd) : DescriptorInfoBase(fd) { }
intptr_t id;
Dart_Port dart_port;
int64_t data;
};
virtual ~DescriptorInfo() { }
template<typename T>
class CircularLinkedList {
public:
CircularLinkedList() : head_(NULL) {}
// Returns true if the list was empty.
bool Add(T t) {
Entry* e = new Entry(t);
if (head_ == NULL) {
// Empty list, make e head, and point to itself.
e->next_ = e;
e->prev_ = e;
head_ = e;
return true;
} else {
// Insert e as the last element in the list.
e->prev_ = head_->prev_;
e->next_ = head_;
e->prev_->next_ = e;
head_->prev_ = e;
return false;
}
}
void RemoveHead() {
Entry* e = head_;
if (e->next_ == e) {
head_ = NULL;
} else {
e->prev_->next_ = e->next_;
e->next_->prev_ = e->prev_;
head_ = e->next_;
}
delete e;
}
T head() const { return head_->t; }
bool HasHead() {
return head_ != NULL;
}
void Rotate() {
head_ = head_->next_;
}
private:
struct Entry {
explicit Entry(const T& t) : t(t) {}
const T t;
Entry* next_;
Entry* prev_;
};
Entry* head_;
};
class ListeningSocketData;
class SocketData {
public:
explicit SocketData(intptr_t fd)
: fd_(fd), port_(0), mask_(0), tokens_(16) {
ASSERT(fd_ != -1);
}
virtual ~SocketData() {
}
intptr_t GetPollEvents();
virtual void Close() {
void Close() {
port_ = 0;
mask_ = 0;
VOID_TEMP_FAILURE_RETRY(close(fd_));
fd_ = -1;
}
void SetMask(intptr_t mask) {
ASSERT(fd_ != -1);
mask_ = mask;
}
intptr_t fd() { return fd_; }
virtual Dart_Port port() { return port_; }
virtual bool IsListeningSocket() const { return false; }
virtual bool AddPort(Dart_Port port) {
ASSERT(port_ == 0);
port_ = port;
return true;
}
virtual bool RemovePort(Dart_Port port) {
ASSERT(port_ == 0 || port_ == port);
return true;
}
// Returns true if the last token was taken.
virtual bool TakeToken() {
ASSERT(tokens_ > 0);
tokens_--;
return tokens_ == 0;
}
// Returns true if the tokens was 0 before adding.
virtual bool ReturnToken(Dart_Port port, int count) {
ASSERT(port_ == port);
ASSERT(tokens_ >= 0);
bool was_empty = tokens_ == 0;
tokens_ += count;
return was_empty;
}
bool HasTokens() const { return tokens_ > 0; }
protected:
intptr_t fd_;
Dart_Port port_;
intptr_t mask_;
int tokens_;
};
class DescriptorInfoSingle
: public DescriptorInfoSingleMixin<DescriptorInfo> {
public:
explicit DescriptorInfoSingle(intptr_t fd)
: DescriptorInfoSingleMixin(fd) {}
virtual ~DescriptorInfoSingle() {}
};
class ListeningSocketData : public SocketData {
private:
static const int kTokenCount = 4;
static bool SamePortValue(void* key1, void* key2) {
return reinterpret_cast<Dart_Port>(key1) ==
reinterpret_cast<Dart_Port>(key2);
}
static uint32_t GetHashmapHashFromPort(Dart_Port port) {
return static_cast<uint32_t>(port & 0xFFFFFFFF);
}
static void* GetHashmapKeyFromPort(Dart_Port port) {
return reinterpret_cast<void*>(port);
}
class DescriptorInfoMultiple
: public DescriptorInfoMultipleMixin<DescriptorInfo> {
public:
explicit DescriptorInfoMultiple(intptr_t fd)
: DescriptorInfoMultipleMixin(fd) {}
virtual ~DescriptorInfoMultiple() {}
explicit ListeningSocketData(intptr_t fd)
: SocketData(fd),
tokens_map_(&SamePortValue, 4) {}
bool IsListeningSocket() const { return true; }
bool AddPort(Dart_Port port) {
HashMap::Entry* entry = tokens_map_.Lookup(
GetHashmapKeyFromPort(port), GetHashmapHashFromPort(port), true);
entry->value = reinterpret_cast<void*>(kTokenCount);
return live_ports_.Add(port);
}
virtual bool RemovePort(Dart_Port port) {
HashMap::Entry* entry = tokens_map_.Lookup(
GetHashmapKeyFromPort(port), GetHashmapHashFromPort(port), false);
if (entry != NULL) {
intptr_t tokens = reinterpret_cast<intptr_t>(entry->value);
if (tokens == 0) {
while (idle_ports_.head() != port) {
idle_ports_.Rotate();
}
idle_ports_.RemoveHead();
} else {
while (live_ports_.head() != port) {
live_ports_.Rotate();
}
live_ports_.RemoveHead();
}
tokens_map_.Remove(
GetHashmapKeyFromPort(port), GetHashmapHashFromPort(port));
} else {
// NOTE: This is a listening socket which has been immediately closed.
//
// If a listening socket is not listened on, the event handler does not
// know about it beforehand. So the first time the event handler knows
// about it, is when it is supposed to be closed. We therefore do nothing
// here.
//
// But whether to close it, depends on whether other isolates have it open
// as well or not.
}
return !live_ports_.HasHead();
}
bool TakeToken() {
ASSERT(live_ports_.HasHead());
Dart_Port port = live_ports_.head();
HashMap::Entry* entry = tokens_map_.Lookup(
GetHashmapKeyFromPort(port), GetHashmapHashFromPort(port), false);
ASSERT(entry != NULL);
intptr_t tokens = reinterpret_cast<intptr_t>(entry->value);
tokens--;
entry->value = reinterpret_cast<void*>(tokens);
if (tokens == 0) {
live_ports_.RemoveHead();
idle_ports_.Add(port);
if (!live_ports_.HasHead()) {
return true;
}
} else {
live_ports_.Rotate();
}
return false;
}
Dart_Port port() { return live_ports_.head(); }
bool ReturnToken(Dart_Port port, int count) {
HashMap::Entry* entry = tokens_map_.Lookup(
GetHashmapKeyFromPort(port), GetHashmapHashFromPort(port), false);
ASSERT(entry != NULL);
intptr_t tokens = reinterpret_cast<intptr_t>(entry->value);
tokens += count;
entry->value = reinterpret_cast<void*>(tokens);
if (tokens == count) {
// Return to live_ports_.
while (idle_ports_.head() != port) {
idle_ports_.Rotate();
}
idle_ports_.RemoveHead();
bool was_empty = !live_ports_.HasHead();
live_ports_.Add(port);
return was_empty;
}
return false;
}
private:
CircularLinkedList<Dart_Port> live_ports_;
CircularLinkedList<Dart_Port> idle_ports_;
HashMap tokens_map_;
};
@ -61,7 +273,7 @@ class EventHandlerImplementation {
// Gets the socket data structure for a given file
// descriptor. Creates a new one if one is not found.
DescriptorInfo* GetDescriptorInfo(intptr_t fd, bool is_listening);
SocketData* GetSocketData(intptr_t fd, bool is_listening);
void SendData(intptr_t id, Dart_Port dart_port, int64_t data);
void Start(EventHandler* handler);
void Shutdown();
@ -72,7 +284,7 @@ class EventHandlerImplementation {
void WakeupHandler(intptr_t id, Dart_Port dart_port, int64_t data);
void HandleInterruptFd();
void SetPort(intptr_t fd, Dart_Port dart_port, intptr_t mask);
intptr_t GetPollEvents(intptr_t events, DescriptorInfo* sd);
intptr_t GetPollEvents(intptr_t events, SocketData* sd);
static void* GetHashmapKeyFromFd(intptr_t fd);
static uint32_t GetHashmapHashFromFd(intptr_t fd);

View file

@ -6,7 +6,6 @@
#if defined(TARGET_OS_MACOS)
#include "bin/eventhandler.h"
#include "bin/eventhandler_macos.h"
#include <errno.h> // NOLINT
#include <pthread.h> // NOLINT
@ -18,9 +17,7 @@
#include "bin/dartutils.h"
#include "bin/fdutils.h"
#include "bin/lockers.h"
#include "bin/log.h"
#include "bin/socket.h"
#include "bin/thread.h"
#include "bin/utils.h"
#include "platform/hashmap.h"
@ -30,61 +27,66 @@
namespace dart {
namespace bin {
static const int kInterruptMessageSize = sizeof(InterruptMessage);
static const int kInfinityTimeout = -1;
static const int kTimerId = -1;
static const int kShutdownId = -2;
bool DescriptorInfo::HasReadEvent() {
return (Mask() & (1 << kInEvent)) != 0;
bool SocketData::HasReadEvent() {
return (mask_ & (1 << kInEvent)) != 0;
}
bool DescriptorInfo::HasWriteEvent() {
return (Mask() & (1 << kOutEvent)) != 0;
bool SocketData::HasWriteEvent() {
return (mask_ & (1 << kOutEvent)) != 0;
}
// Unregister the file descriptor for a SocketData structure with kqueue.
static void RemoveFromKqueue(intptr_t kqueue_fd_, DescriptorInfo* di) {
if (!di->tracked_by_kqueue()) return;
static void RemoveFromKqueue(intptr_t kqueue_fd_, SocketData* sd) {
if (!sd->tracked_by_kqueue()) return;
static const intptr_t kMaxChanges = 2;
struct kevent events[kMaxChanges];
EV_SET(events, di->fd(), EVFILT_READ, EV_DELETE, 0, 0, NULL);
EV_SET(events, sd->fd(), EVFILT_READ, EV_DELETE, 0, 0, NULL);
VOID_NO_RETRY_EXPECTED(kevent(kqueue_fd_, events, 1, NULL, 0, NULL));
EV_SET(events, di->fd(), EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
EV_SET(events, sd->fd(), EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
VOID_NO_RETRY_EXPECTED(kevent(kqueue_fd_, events, 1, NULL, 0, NULL));
di->set_tracked_by_kqueue(false);
sd->set_tracked_by_kqueue(false);
}
// Update the kqueue registration for SocketData structure to reflect
// the events currently of interest.
static void AddToKqueue(intptr_t kqueue_fd_, DescriptorInfo* di) {
ASSERT(!di->tracked_by_kqueue());
static void AddToKqueue(intptr_t kqueue_fd_, SocketData* sd) {
ASSERT(!sd->tracked_by_kqueue());
static const intptr_t kMaxChanges = 2;
intptr_t changes = 0;
struct kevent events[kMaxChanges];
int flags = EV_ADD;
if (!di->IsListeningSocket()) {
if (!sd->IsListeningSocket()) {
flags |= EV_CLEAR;
}
// Register or unregister READ filter if needed.
if (di->HasReadEvent()) {
if (sd->HasReadEvent()) {
EV_SET(events + changes,
di->fd(),
sd->fd(),
EVFILT_READ,
flags,
0,
0,
di);
sd);
++changes;
}
// Register or unregister WRITE filter if needed.
if (di->HasWriteEvent()) {
if (sd->HasWriteEvent()) {
EV_SET(events + changes,
di->fd(),
sd->fd(),
EVFILT_WRITE,
flags,
0,
0,
di);
sd);
++changes;
}
ASSERT(changes > 0);
@ -92,16 +94,13 @@ static void AddToKqueue(intptr_t kqueue_fd_, DescriptorInfo* di) {
int status =
NO_RETRY_EXPECTED(kevent(kqueue_fd_, events, changes, NULL, 0, NULL));
if (status == -1) {
// TODO(kustermann): Verify that the dart end is handling this correctly &
// adapt this code to work for multiple listening sockets.
// kQueue does not accept the file descriptor. It could be due to
// already closed file descriptor, or unuspported devices, such
// as /dev/null. In such case, mark the file descriptor as closed,
// so dart will handle it accordingly.
DartUtils::PostInt32(di->NextPort(), 1 << kCloseEvent);
DartUtils::PostInt32(sd->port(), 1 << kCloseEvent);
} else {
di->set_tracked_by_kqueue(true);
sd->set_tracked_by_kqueue(true);
}
}
@ -143,26 +142,20 @@ EventHandlerImplementation::~EventHandlerImplementation() {
}
DescriptorInfo* EventHandlerImplementation::GetDescriptorInfo(
intptr_t fd, bool is_listening) {
SocketData* EventHandlerImplementation::GetSocketData(intptr_t fd) {
ASSERT(fd >= 0);
HashMap::Entry* entry = socket_map_.Lookup(
GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd), true);
ASSERT(entry != NULL);
DescriptorInfo* di =
reinterpret_cast<DescriptorInfo*>(entry->value);
if (di == NULL) {
SocketData* sd = reinterpret_cast<SocketData*>(entry->value);
if (sd == NULL) {
// If there is no data in the hash map for this file descriptor a
// new DescriptorInfo for the file descriptor is inserted.
if (is_listening) {
di = new DescriptorInfoMultiple(fd);
} else {
di = new DescriptorInfoSingle(fd);
}
entry->value = di;
// new SocketData for the file descriptor is inserted.
sd = new SocketData(fd);
entry->value = sd;
}
ASSERT(fd == di->fd());
return di;
ASSERT(fd == sd->fd());
return sd;
}
@ -198,64 +191,36 @@ void EventHandlerImplementation::HandleInterruptFd() {
} else if (msg[i].id == kShutdownId) {
shutdown_ = true;
} else {
DescriptorInfo* di = GetDescriptorInfo(
msg[i].id, (msg[i].data & (1 << kListeningSocket)) != 0);
SocketData* sd = GetSocketData(msg[i].id);
if (IS_COMMAND(msg[i].data, kShutdownReadCommand)) {
ASSERT(!di->IsListeningSocket());
// Close the socket for reading.
VOID_NO_RETRY_EXPECTED(shutdown(di->fd(), SHUT_RD));
shutdown(sd->fd(), SHUT_RD);
} else if (IS_COMMAND(msg[i].data, kShutdownWriteCommand)) {
ASSERT(!di->IsListeningSocket());
// Close the socket for writing.
VOID_NO_RETRY_EXPECTED(shutdown(di->fd(), SHUT_WR));
shutdown(sd->fd(), SHUT_WR);
} else if (IS_COMMAND(msg[i].data, kCloseCommand)) {
// Close the socket and free system resources and move on to next
// message.
bool no_more_listeners = di->RemovePort(msg[i].dart_port);
if (no_more_listeners) {
RemoveFromKqueue(kqueue_fd_, di);
}
intptr_t fd = di->fd();
if (di->IsListeningSocket()) {
// We only close the socket file descriptor from the operating
// system if there are no other dart socket objects which
// are listening on the same (address, port) combination.
{
MutexLocker ml(globalTcpListeningSocketRegistry.mutex());
if (globalTcpListeningSocketRegistry.CloseSafe(fd)) {
socket_map_.Remove(
GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd));
di->Close();
delete di;
}
}
} else {
ASSERT(no_more_listeners);
socket_map_.Remove(
GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd));
di->Close();
delete di;
}
// Close the socket and free system resources.
RemoveFromKqueue(kqueue_fd_, sd);
intptr_t fd = sd->fd();
VOID_TEMP_FAILURE_RETRY(close(fd));
socket_map_.Remove(GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd));
delete sd;
DartUtils::PostInt32(msg[i].dart_port, 1 << kDestroyedEvent);
} else if (IS_COMMAND(msg[i].data, kReturnTokenCommand)) {
int count = TOKEN_COUNT(msg[i].data);
if (di->ReturnTokens(msg[i].dart_port, count)) {
AddToKqueue(kqueue_fd_, di);
for (int i = 0; i < count; i++) {
if (sd->ReturnToken()) {
AddToKqueue(kqueue_fd_, sd);
}
}
} else {
ASSERT_NO_COMMAND(msg[i].data);
// Setup events to wait for.
ASSERT((msg[i].data > 0) && (msg[i].data < kIntptrMax));
bool had_listeners = di->HasNextPort();
di->SetPortAndMask(msg[i].dart_port, msg[i].data & EVENT_MASK);
bool has_listeners = di->HasNextPort();
// Add/Remove from epoll set depending on previous and current state.
if (!had_listeners && has_listeners) {
AddToKqueue(kqueue_fd_, di);
} else if (had_listeners && !has_listeners) {
RemoveFromKqueue(kqueue_fd_, di);
}
ASSERT(sd->port() == 0);
sd->SetPortAndMask(msg[i].dart_port,
static_cast<intptr_t>(msg[i].data));
AddToKqueue(kqueue_fd_, sd);
}
}
}
@ -283,12 +248,12 @@ static void PrintEventMask(intptr_t fd, struct kevent* event) {
intptr_t EventHandlerImplementation::GetEvents(struct kevent* event,
DescriptorInfo* di) {
SocketData* sd) {
#ifdef DEBUG_KQUEUE
PrintEventMask(di->fd(), event);
PrintEventMask(sd->fd(), event);
#endif
intptr_t event_mask = 0;
if (di->IsListeningSocket()) {
if (sd->IsListeningSocket()) {
// On a listening socket the READ event means that there are
// connections ready to be accepted.
if (event->filter == EVFILT_READ) {
@ -344,16 +309,15 @@ void EventHandlerImplementation::HandleEvents(struct kevent* events,
if (events[i].udata == NULL) {
interrupt_seen = true;
} else {
DescriptorInfo* di =
reinterpret_cast<DescriptorInfo*>(events[i].udata);
intptr_t event_mask = GetEvents(events + i, di);
SocketData* sd = reinterpret_cast<SocketData*>(events[i].udata);
intptr_t event_mask = GetEvents(events + i, sd);
if (event_mask != 0) {
Dart_Port port = di->NextPort();
ASSERT(port != 0);
if (di->TakeToken()) {
// Took last token, remove from kqueue.
RemoveFromKqueue(kqueue_fd_, di);
if (sd->TakeToken()) {
// Took last token, remove from epoll.
RemoveFromKqueue(kqueue_fd_, sd);
}
Dart_Port port = sd->port();
ASSERT(port != 0);
DartUtils::PostInt32(port, event_mask);
}
}

View file

@ -21,50 +21,62 @@
namespace dart {
namespace bin {
class DescriptorInfo : public DescriptorInfoBase {
class InterruptMessage {
public:
explicit DescriptorInfo(intptr_t fd)
: DescriptorInfoBase(fd), tracked_by_kqueue_(false) { }
intptr_t id;
Dart_Port dart_port;
int64_t data;
};
virtual ~DescriptorInfo() { }
intptr_t GetPollEvents();
virtual void Close() {
VOID_TEMP_FAILURE_RETRY(close(fd_));
fd_ = -1;
class SocketData {
public:
explicit SocketData(intptr_t fd)
: fd_(fd),
port_(0),
mask_(0),
tracked_by_kqueue_(false),
tokens_(16) {
ASSERT(fd_ != -1);
}
bool HasReadEvent();
bool HasWriteEvent();
bool IsListeningSocket() { return (mask_ & (1 << kListeningSocket)) != 0; }
void SetPortAndMask(Dart_Port port, intptr_t mask) {
ASSERT(fd_ != -1);
port_ = port;
mask_ = mask;
}
intptr_t fd() { return fd_; }
Dart_Port port() { return port_; }
intptr_t mask() { return mask_; }
bool tracked_by_kqueue() { return tracked_by_kqueue_; }
void set_tracked_by_kqueue(bool value) {
tracked_by_kqueue_ = value;
}
bool tracked_by_kqueue() { return tracked_by_kqueue_; }
// Returns true if the last token was taken.
bool TakeToken() {
tokens_--;
return tokens_ == 0;
}
bool HasReadEvent();
// Returns true if the tokens was 0 before adding.
bool ReturnToken() {
tokens_++;
return tokens_ == 1;
}
bool HasWriteEvent();
protected:
private:
intptr_t fd_;
Dart_Port port_;
intptr_t mask_;
bool tracked_by_kqueue_;
};
class DescriptorInfoSingle
: public DescriptorInfoSingleMixin<DescriptorInfo> {
public:
explicit DescriptorInfoSingle(intptr_t fd)
: DescriptorInfoSingleMixin(fd) {}
virtual ~DescriptorInfoSingle() {}
};
class DescriptorInfoMultiple
: public DescriptorInfoMultipleMixin<DescriptorInfo> {
public:
explicit DescriptorInfoMultiple(intptr_t fd)
: DescriptorInfoMultipleMixin(fd) {}
virtual ~DescriptorInfoMultiple() {}
int tokens_;
};
@ -75,7 +87,7 @@ class EventHandlerImplementation {
// Gets the socket data structure for a given file
// descriptor. Creates a new one if one is not found.
DescriptorInfo* GetDescriptorInfo(intptr_t fd, bool is_listening);
SocketData* GetSocketData(intptr_t fd);
void SendData(intptr_t id, Dart_Port dart_port, int64_t data);
void Start(EventHandler* handler);
void Shutdown();
@ -88,7 +100,7 @@ class EventHandlerImplementation {
void WakeupHandler(intptr_t id, Dart_Port dart_port, int64_t data);
void HandleInterruptFd();
void SetPort(intptr_t fd, Dart_Port dart_port, intptr_t mask);
intptr_t GetEvents(struct kevent* event, DescriptorInfo* di);
intptr_t GetEvents(struct kevent* event, SocketData* sd);
static void* GetHashmapKeyFromFd(intptr_t fd);
static uint32_t GetHashmapHashFromFd(intptr_t fd);

View file

@ -6,7 +6,6 @@
#if defined(TARGET_OS_WINDOWS)
#include "bin/eventhandler.h"
#include "bin/eventhandler_win.h"
#include <winsock2.h> // NOLINT
#include <ws2tcpip.h> // NOLINT
@ -30,6 +29,10 @@ namespace bin {
static const int kBufferSize = 64 * 1024;
static const int kStdOverlappedBufferSize = 16 * 1024;
static const int kInfinityTimeout = -1;
static const int kTimeoutId = -1;
static const int kShutdownId = -2;
OverlappedBuffer* OverlappedBuffer::AllocateBuffer(int buffer_size,
Operation operation) {
OverlappedBuffer* buffer =
@ -112,9 +115,26 @@ int OverlappedBuffer::GetRemainingLength() {
return data_length_ - index_;
}
Handle::Handle(intptr_t handle)
: DescriptorInfoBase(handle),
handle_(reinterpret_cast<HANDLE>(handle)),
Handle::Handle(HANDLE handle)
: handle_(reinterpret_cast<HANDLE>(handle)),
port_(0),
mask_(0),
completion_port_(INVALID_HANDLE_VALUE),
event_handler_(NULL),
data_ready_(NULL),
pending_read_(NULL),
pending_write_(NULL),
last_error_(NOERROR),
flags_(0) {
InitializeCriticalSection(&cs_);
}
Handle::Handle(HANDLE handle, Dart_Port port)
: handle_(reinterpret_cast<HANDLE>(handle)),
port_(port),
mask_(0),
completion_port_(INVALID_HANDLE_VALUE),
event_handler_(NULL),
data_ready_(NULL),
@ -273,7 +293,7 @@ bool Handle::IssueRead() {
// Completing asynchronously through thread.
pending_read_ = buffer;
int result = Thread::Start(ReadFileThread,
reinterpret_cast<uword>(this));
reinterpret_cast<uword>(this));
if (result != 0) {
FATAL1("Failed to start read file thread %d", result);
}
@ -319,7 +339,7 @@ bool Handle::IssueSendTo(struct sockaddr* sa, socklen_t sa_len) {
static void HandleClosed(Handle* handle) {
if (!handle->IsClosing()) {
int event_mask = 1 << kCloseEvent;
DartUtils::PostInt32(handle->NextPort(), event_mask);
DartUtils::PostInt32(handle->port(), event_mask);
}
}
@ -327,8 +347,11 @@ static void HandleClosed(Handle* handle) {
static void HandleError(Handle* handle) {
handle->set_last_error(WSAGetLastError());
handle->MarkError();
if (!handle->IsClosing() && handle->HasNextPort()) {
DartUtils::PostInt32(handle->NextPort(), 1 << kErrorEvent);
if (!handle->IsClosing()) {
Dart_Port port = handle->port();
if (port != ILLEGAL_PORT) {
DartUtils::PostInt32(port, 1 << kErrorEvent);
}
}
}
@ -444,7 +467,6 @@ bool ListenSocket::LoadAcceptEx() {
bool ListenSocket::IssueAccept() {
ScopedLock lock(this);
// For AcceptEx there needs to be buffer storage for address
// information for two addresses (local and remote address). The
// AcceptEx documentation says: "This value must be at least 16
@ -493,7 +515,7 @@ void ListenSocket::AcceptComplete(OverlappedBuffer* buffer,
reinterpret_cast<char*>(&s), sizeof(s));
if (rc == NO_ERROR) {
// Insert the accepted socket into the list.
ClientSocket* client_socket = new ClientSocket(buffer->client());
ClientSocket* client_socket = new ClientSocket(buffer->client(), 0);
client_socket->mark_connected();
client_socket->CreateCompletionPort(completion_port);
if (accepted_head_ == NULL) {
@ -504,7 +526,6 @@ void ListenSocket::AcceptComplete(OverlappedBuffer* buffer,
accepted_tail_->set_next(client_socket);
accepted_tail_ = client_socket;
}
accepted_count_++;
} else {
closesocket(buffer->client());
}
@ -520,8 +541,11 @@ void ListenSocket::AcceptComplete(OverlappedBuffer* buffer,
static void DeleteIfClosed(Handle* handle) {
if (handle->IsClosed()) {
handle->SendToAll(1 << kDestroyedEvent);
Dart_Port port = handle->port();
delete handle;
if (port != ILLEGAL_PORT) {
DartUtils::PostInt32(port, 1 << kDestroyedEvent);
}
}
}
@ -550,23 +574,16 @@ bool ListenSocket::CanAccept() {
ClientSocket* ListenSocket::Accept() {
ScopedLock lock(this);
ClientSocket *result = NULL;
if (accepted_head_ != NULL) {
result = accepted_head_;
accepted_head_ = accepted_head_->next();
if (accepted_head_ == NULL) accepted_tail_ = NULL;
result->set_next(NULL);
accepted_count_--;
}
if (accepted_head_ == NULL) return NULL;
ClientSocket* result = accepted_head_;
accepted_head_ = accepted_head_->next();
if (accepted_head_ == NULL) accepted_tail_ = NULL;
result->set_next(NULL);
if (!IsClosing()) {
if (!IssueAccept()) {
HandleError(this);
}
}
return result;
}
@ -863,11 +880,9 @@ void ClientSocket::IssueDisconnect() {
if (ok || WSAGetLastError() != WSA_IO_PENDING) {
DisconnectComplete(buffer);
}
if (HasNextPort()) {
Dart_Port p = NextPort();
DartUtils::PostInt32(p, 1 << kDestroyedEvent);
RemovePort(p);
}
Dart_Port p = port();
if (p != ILLEGAL_PORT) DartUtils::PostInt32(p, 1 << kDestroyedEvent);
port_ = ILLEGAL_PORT;
}
@ -885,14 +900,15 @@ void ClientSocket::ConnectComplete(OverlappedBuffer* buffer) {
OverlappedBuffer::DisposeBuffer(buffer);
// Update socket to support full socket API, after ConnectEx completed.
setsockopt(socket(), SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, NULL, 0);
if (HasNextPort()) {
Dart_Port p = port();
if (p != ILLEGAL_PORT) {
// If the port is set, we already listen for this socket in Dart.
// Handle the cases here.
if (!IsClosedRead()) {
IssueRead();
}
if (!IsClosedWrite()) {
DartUtils::PostInt32(NextPort(), 1 << kOutEvent);
DartUtils::PostInt32(p, 1 << kOutEvent);
}
}
}
@ -995,62 +1011,42 @@ void DatagramSocket::DoClose() {
void EventHandlerImplementation::HandleInterrupt(InterruptMessage* msg) {
ASSERT(this != NULL);
if (msg->id == kTimerId) {
if (msg->id == kTimeoutId) {
// Change of timeout request. Just set the new timeout and port as the
// completion thread will use the new timeout value for its next wait.
timeout_queue_.UpdateTimeout(msg->dart_port, msg->data);
} else if (msg->id == kShutdownId) {
shutdown_ = true;
} else {
// No tokens to return on Windows.
if ((msg->data & (1 << kReturnTokenCommand)) != 0) return;
Handle* handle = reinterpret_cast<Handle*>(msg->id);
ASSERT(handle != NULL);
if (handle->is_listen_socket()) {
ListenSocket* listen_socket =
reinterpret_cast<ListenSocket*>(handle);
listen_socket->EnsureInitialized(this);
listen_socket->SetPortAndMask(msg->dart_port, msg->data);
Handle::ScopedLock lock(listen_socket);
// If incoming connections are requested make sure to post already
// accepted connections.
if ((msg->data & (1 << kInEvent)) != 0) {
listen_socket->SetPortAndMask(msg->dart_port, msg->data & EVENT_MASK);
TryDispatchingPendingAccepts(listen_socket);
}
if (IS_COMMAND(msg->data, kReturnTokenCommand)) {
int count = TOKEN_COUNT(msg->data);
listen_socket->ReturnTokens(msg->dart_port, count);
TryDispatchingPendingAccepts(listen_socket);
return;
} else if (IS_COMMAND(msg->data, kCloseCommand)) {
Dart_Port port = msg->dart_port;
listen_socket->RemovePort(port);
MutexLocker locker(globalTcpListeningSocketRegistry.mutex());
if (globalTcpListeningSocketRegistry.CloseSafe(
reinterpret_cast<intptr_t>(listen_socket))) {
handle->Close();
if (listen_socket->CanAccept()) {
int event_mask = (1 << kInEvent);
handle->set_mask(handle->mask() & ~event_mask);
DartUtils::PostInt32(handle->port(), event_mask);
}
DartUtils::PostInt32(port, 1 << kDestroyedEvent);
}
} else {
handle->EnsureInitialized(this);
Handle::ScopedLock lock(handle);
if (IS_COMMAND(msg->data, kReturnTokenCommand)) {
int count = TOKEN_COUNT(msg->data);
handle->ReturnTokens(msg->dart_port, count);
// TODO(kustermann): How can we continue with sending events
// to dart from here?
return;
}
// Only set mask if we turned on kInEvent or kOutEvent.
if ((msg->data & ((1 << kInEvent) | (1 << kOutEvent))) != 0) {
handle->SetPortAndMask(msg->dart_port, msg->data & EVENT_MASK);
handle->SetPortAndMask(msg->dart_port, msg->data);
}
// Issue a read.
@ -1073,10 +1069,10 @@ void EventHandlerImplementation::HandleInterrupt(InterruptMessage* msg) {
if (!handle->HasPendingWrite()) {
if (handle->is_client_socket()) {
if (reinterpret_cast<ClientSocket*>(handle)->is_connected()) {
DartUtils::PostInt32(handle->NextPort(), 1 << kOutEvent);
DartUtils::PostInt32(handle->port(), 1 << kOutEvent);
}
} else {
DartUtils::PostInt32(handle->NextPort(), 1 << kOutEvent);
DartUtils::PostInt32(handle->port(), 1 << kOutEvent);
}
}
}
@ -1091,12 +1087,13 @@ void EventHandlerImplementation::HandleInterrupt(InterruptMessage* msg) {
client_socket->Shutdown(SD_SEND);
}
}
if ((msg->data & (1 << kCloseCommand)) != 0) {
handle->SetPortAndMask(msg->dart_port, msg->data & EVENT_MASK);
handle->Close();
}
}
if ((msg->data & (1 << kCloseCommand)) != 0) {
handle->SetPortAndMask(msg->dart_port, msg->data);
handle->Close();
}
DeleteIfClosed(handle);
}
}
@ -1106,27 +1103,14 @@ void EventHandlerImplementation::HandleAccept(ListenSocket* listen_socket,
OverlappedBuffer* buffer) {
listen_socket->AcceptComplete(buffer, completion_port_);
TryDispatchingPendingAccepts(listen_socket);
DeleteIfClosed(listen_socket);
}
void EventHandlerImplementation::TryDispatchingPendingAccepts(
ListenSocket *listen_socket) {
Handle::ScopedLock lock(listen_socket);
if (!listen_socket->IsClosing() && listen_socket->CanAccept()) {
for (int i = 0;
i < listen_socket->accepted_count() && listen_socket->HasNextPort();
i++) {
Dart_Port port = listen_socket->NextPort();
DartUtils::PostInt32(port, 1 << kInEvent);
if (listen_socket->TakeToken()) {
break;
}
if (!listen_socket->IsClosing()) {
int event_mask = 1 << kInEvent;
if ((listen_socket->mask() & event_mask) != 0) {
DartUtils::PostInt32(listen_socket->port(), event_mask);
}
}
DeleteIfClosed(listen_socket);
}
@ -1138,8 +1122,8 @@ void EventHandlerImplementation::HandleRead(Handle* handle,
if (bytes > 0) {
if (!handle->IsClosing()) {
int event_mask = 1 << kInEvent;
if ((handle->Mask() & event_mask) != 0) {
DartUtils::PostInt32(handle->NextPort(), event_mask);
if ((handle->mask() & event_mask) != 0) {
DartUtils::PostInt32(handle->port(), event_mask);
}
}
} else {
@ -1163,8 +1147,8 @@ void EventHandlerImplementation::HandleRecvFrom(Handle* handle,
handle->ReadComplete(buffer);
if (!handle->IsClosing()) {
int event_mask = 1 << kInEvent;
if ((handle->Mask() & event_mask) != 0) {
DartUtils::PostInt32(handle->NextPort(), event_mask);
if ((handle->mask() & event_mask) != 0) {
DartUtils::PostInt32(handle->port(), event_mask);
}
}
@ -1182,8 +1166,8 @@ void EventHandlerImplementation::HandleWrite(Handle* handle,
int event_mask = 1 << kOutEvent;
ASSERT(!handle->is_client_socket() ||
reinterpret_cast<ClientSocket*>(handle)->is_connected());
if ((handle->Mask() & event_mask) != 0) {
DartUtils::PostInt32(handle->NextPort(), event_mask);
if ((handle->mask() & event_mask) != 0) {
DartUtils::PostInt32(handle->port(), event_mask);
}
}
} else {
@ -1366,7 +1350,7 @@ void EventHandlerImplementation::EventHandlerEntry(uword args) {
void EventHandlerImplementation::Start(EventHandler* handler) {
int result = Thread::Start(EventHandlerEntry,
reinterpret_cast<uword>(handler));
reinterpret_cast<uword>(handler));
if (result != 0) {
FATAL1("Failed to start event handler thread %d", result);
}

View file

@ -29,6 +29,13 @@ class ClientSocket;
class ListenSocket;
struct InterruptMessage {
intptr_t id;
Dart_Port dart_port;
int64_t data;
};
// An OverlappedBuffer encapsulates the OVERLAPPED structure and the
// associated data buffer. For accept it also contains the pre-created
// socket for the client.
@ -154,7 +161,7 @@ class OverlappedBuffer {
// Abstract super class for holding information on listen and connected
// sockets.
class Handle : public DescriptorInfoBase {
class Handle {
public:
enum Type {
kFile,
@ -218,6 +225,7 @@ class Handle : public DescriptorInfoBase {
EventHandlerImplementation* event_handler) = 0;
HANDLE handle() { return handle_; }
Dart_Port port() { return port_; }
void Lock();
void Unlock();
@ -230,6 +238,10 @@ class Handle : public DescriptorInfoBase {
bool IsHandleClosed() const { return handle_ == INVALID_HANDLE_VALUE; }
void SetPortAndMask(Dart_Port port, intptr_t mask) {
port_ = port;
mask_ = mask;
}
Type type() { return type_; }
bool is_file() { return type_ == kFile; }
bool is_socket() { return type_ == kListenSocket ||
@ -238,6 +250,8 @@ class Handle : public DescriptorInfoBase {
bool is_listen_socket() { return type_ == kListenSocket; }
bool is_client_socket() { return type_ == kClientSocket; }
bool is_datagram_socket() { return type_ == kDatagramSocket; }
void set_mask(intptr_t mask) { mask_ = mask; }
intptr_t mask() { return mask_; }
void MarkDoesNotSupportOverlappedIO() {
flags_ |= (1 << kDoesNotSupportOverlappedIO);
@ -260,12 +274,15 @@ class Handle : public DescriptorInfoBase {
kError = 4
};
explicit Handle(intptr_t handle);
explicit Handle(HANDLE handle);
Handle(HANDLE handle, Dart_Port port);
virtual void HandleIssueError();
Type type_;
HANDLE handle_;
Dart_Port port_; // Dart port to communicate events for this socket.
intptr_t mask_; // Mask of events to report through the port.
HANDLE completion_port_;
EventHandlerImplementation* event_handler_;
@ -281,12 +298,12 @@ class Handle : public DescriptorInfoBase {
};
class FileHandle : public DescriptorInfoSingleMixin<Handle> {
class FileHandle : public Handle {
public:
explicit FileHandle(HANDLE handle)
: DescriptorInfoSingleMixin(reinterpret_cast<intptr_t>(handle)) {
type_ = kFile;
}
: Handle(handle) { type_ = kFile; }
FileHandle(HANDLE handle, Dart_Port port)
: Handle(handle, port) { type_ = kFile; }
virtual void EnsureInitialized(EventHandlerImplementation* event_handler);
virtual bool IsClosed();
@ -322,10 +339,10 @@ class StdHandle : public FileHandle {
};
class DirectoryWatchHandle : public DescriptorInfoSingleMixin<Handle> {
class DirectoryWatchHandle : public Handle {
public:
DirectoryWatchHandle(HANDLE handle, int events, bool recursive)
: DescriptorInfoSingleMixin(reinterpret_cast<intptr_t>(handle)),
: Handle(handle),
events_(events),
recursive_(recursive) {
type_ = kDirectoryWatch;
@ -349,8 +366,11 @@ class SocketHandle : public Handle {
SOCKET socket() const { return socket_; }
protected:
explicit SocketHandle(intptr_t s)
: Handle(s),
explicit SocketHandle(SOCKET s)
: Handle(reinterpret_cast<HANDLE>(s)),
socket_(s) {}
SocketHandle(SOCKET s, Dart_Port port)
: Handle(reinterpret_cast<HANDLE>(s), port),
socket_(s) {}
virtual void HandleIssueError();
@ -361,14 +381,13 @@ class SocketHandle : public Handle {
// Information on listen sockets.
class ListenSocket : public DescriptorInfoMultipleMixin<SocketHandle> {
class ListenSocket : public SocketHandle {
public:
explicit ListenSocket(intptr_t s) : DescriptorInfoMultipleMixin(s),
AcceptEx_(NULL),
pending_accept_count_(0),
accepted_head_(NULL),
accepted_tail_(NULL),
accepted_count_(0) {
explicit ListenSocket(SOCKET s) : SocketHandle(s),
AcceptEx_(NULL),
pending_accept_count_(0),
accepted_head_(NULL),
accepted_tail_(NULL) {
type_ = kListenSocket;
}
virtual ~ListenSocket() {
@ -393,32 +412,22 @@ class ListenSocket : public DescriptorInfoMultipleMixin<SocketHandle> {
int pending_accept_count() { return pending_accept_count_; }
int accepted_count() { return accepted_count_; }
private:
bool LoadAcceptEx();
LPFN_ACCEPTEX AcceptEx_;
// The number of asynchronous `IssueAccept` operations which haven't completed
// yet.
int pending_accept_count_;
// Linked list of accepted connections provided by completion code. Ready to
// be handed over through accept.
ClientSocket* accepted_head_;
ClientSocket* accepted_tail_;
// The number of accepted connections which are waiting to be removed from
// this queue and processed by dart isolates.
int accepted_count_;
};
// Information on connected sockets.
class ClientSocket : public DescriptorInfoSingleMixin<SocketHandle> {
class ClientSocket : public SocketHandle {
public:
explicit ClientSocket(intptr_t s) : DescriptorInfoSingleMixin(s),
explicit ClientSocket(SOCKET s) : SocketHandle(s),
DisconnectEx_(NULL),
next_(NULL),
connected_(false),
@ -427,6 +436,15 @@ class ClientSocket : public DescriptorInfoSingleMixin<SocketHandle> {
type_ = kClientSocket;
}
ClientSocket(SOCKET s, Dart_Port port) : SocketHandle(s, port),
DisconnectEx_(NULL),
next_(NULL),
connected_(false),
closed_(false) {
LoadDisconnectEx();
type_ = kClientSocket;
}
virtual ~ClientSocket() {
// Don't delete this object until all pending requests have been handled.
ASSERT(!HasPendingRead());
@ -468,9 +486,9 @@ class ClientSocket : public DescriptorInfoSingleMixin<SocketHandle> {
};
class DatagramSocket : public DescriptorInfoSingleMixin<SocketHandle> {
class DatagramSocket : public SocketHandle {
public:
explicit DatagramSocket(intptr_t s) : DescriptorInfoSingleMixin(s) {
explicit DatagramSocket(SOCKET s) : SocketHandle(s) {
type_ = kDatagramSocket;
}
@ -505,7 +523,6 @@ class EventHandlerImplementation {
void HandleInterrupt(InterruptMessage* msg);
void HandleTimeout();
void HandleAccept(ListenSocket* listen_socket, OverlappedBuffer* buffer);
void TryDispatchingPendingAccepts(ListenSocket *listen_socket);
void HandleRead(Handle* handle, int bytes, OverlappedBuffer* buffer);
void HandleRecvFrom(Handle* handle, int bytes, OverlappedBuffer* buffer);
void HandleWrite(Handle* handle, int bytes, OverlappedBuffer* buffer);

View file

@ -66,7 +66,7 @@ namespace bin {
V(SecureSocket_RegisterHandshakeCompleteCallback, 2) \
V(SecureSocket_Renegotiate, 4) \
V(ServerSocket_Accept, 2) \
V(ServerSocket_CreateBindListen, 6) \
V(ServerSocket_CreateBindListen, 5) \
V(Socket_CreateConnect, 3) \
V(Socket_CreateBindDatagram, 4) \
V(Socket_Available, 1) \
@ -83,7 +83,6 @@ namespace bin {
V(Socket_SetOption, 4) \
V(Socket_JoinMulticast, 4) \
V(Socket_LeaveMulticast, 4) \
V(Socket_MarkSocketAsSharedHack, 1) \
V(Socket_GetSocketId, 1) \
V(Socket_SetSocketId, 2) \
V(Stdin_ReadByte, 1) \

View file

@ -7,7 +7,6 @@
#include "bin/dartutils.h"
#include "bin/socket.h"
#include "bin/thread.h"
#include "bin/lockers.h"
#include "bin/utils.h"
#include "platform/globals.h"
@ -18,144 +17,6 @@
namespace dart {
namespace bin {
Dart_Handle ListeningSocketRegistry::CreateBindListen(Dart_Handle socket_object,
RawAddr addr,
intptr_t port,
intptr_t backlog,
bool v6_only,
bool shared) {
MutexLocker ml(ListeningSocketRegistry::mutex_);
SocketsIterator it = sockets_by_port_.find(port);
OSSocket *first_os_socket = NULL;
if (it != sockets_by_port_.end()) {
first_os_socket = it->second;
}
if (first_os_socket != NULL) {
// There is already a socket listening on this port. We need to ensure
// that if there is one also listening on the same address, it was created
// with `shared = true`, ...
OSSocket *os_socket = it->second;
OSSocket *os_socket_same_addr = findOSSocketWithAddress(os_socket, addr);
if (os_socket_same_addr != NULL) {
if (!os_socket_same_addr->shared || !shared) {
OSError os_error(-1,
"The shared flag to bind() needs to be `true` if "
"binding multiple times on the same (address, port) "
"combination.",
OSError::kUnknown);
return DartUtils::NewDartOSError(&os_error);
}
if (os_socket_same_addr->v6_only != v6_only) {
OSError os_error(-1,
"The v6Only flag to bind() needs to be the same if "
"binding multiple times on the same (address, port) "
"combination.",
OSError::kUnknown);
return DartUtils::NewDartOSError(&os_error);
}
// This socket creation is the exact same as the one which originally
// created the socket. We therefore increment the refcount and return
// the file descriptor.
os_socket->ref_count++;
// We set as a side-effect the file descriptor on the dart socket_object.
Socket::SetSocketIdNativeField(socket_object, os_socket->socketfd);
return Dart_True();
}
}
// There is no socket listening on that (address, port), so we create new one.
intptr_t socketfd = ServerSocket::CreateBindListen(
addr, port, backlog, v6_only);
if (socketfd == -5) {
OSError os_error(-1, "Invalid host", OSError::kUnknown);
return DartUtils::NewDartOSError(&os_error);
}
if (socketfd < 0) {
OSError error;
return DartUtils::NewDartOSError(&error);
}
if (!ServerSocket::StartAccept(socketfd)) {
OSError os_error(-1, "Failed to start accept", OSError::kUnknown);
return DartUtils::NewDartOSError(&os_error);
}
intptr_t allocated_port = Socket::GetPort(socketfd);
ASSERT(allocated_port > 0);
OSSocket *os_socket =
new OSSocket(addr, allocated_port, v6_only, shared, socketfd);
os_socket->ref_count = 1;
os_socket->next = first_os_socket;
sockets_by_port_[allocated_port] = os_socket;
sockets_by_fd_[socketfd] = os_socket;
// We set as a side-effect the port on the dart socket_object.
Socket::SetSocketIdNativeField(socket_object, socketfd);
return Dart_True();
}
bool ListeningSocketRegistry::CloseSafe(int socketfd) {
ASSERT(!mutex_->TryLock());
SocketsIterator it = sockets_by_fd_.find(socketfd);
if (it != sockets_by_fd_.end()) {
OSSocket *os_socket = it->second;
ASSERT(os_socket->ref_count > 0);
os_socket->ref_count--;
if (os_socket->ref_count == 0) {
// We free the OS socket by removing it from two datastructures.
sockets_by_fd_.erase(socketfd);
OSSocket *prev = NULL;
OSSocket *current = sockets_by_port_[os_socket->port];
while (current != os_socket) {
ASSERT(current != NULL);
prev = current;
current = current->next;
}
if (prev == NULL && current->next == NULL) {
sockets_by_port_.erase(os_socket->port);
} else if (prev == NULL) {
sockets_by_port_[os_socket->port] = current->next;
} else {
prev->next = os_socket->next;
}
delete os_socket;
return true;
}
return false;
} else {
// It should be impossible for the event handler to close something that
// hasn't been created before.
UNREACHABLE();
return false;
}
}
Dart_Handle ListeningSocketRegistry::MarkSocketFdAsSharableHack(int socketfd) {
MutexLocker ml(ListeningSocketRegistry::mutex_);
SocketsIterator it = sockets_by_fd_.find(socketfd);
if (it != sockets_by_fd_.end()) {
it->second->shared = true;
return Dart_True();
} else {
return Dart_False();
}
}
ListeningSocketRegistry globalTcpListeningSocketRegistry;
static const int kSocketIdNativeField = 0;
void FUNCTION_NAME(InternetAddress_Parse)(Dart_NativeArguments args) {
@ -515,12 +376,20 @@ void FUNCTION_NAME(ServerSocket_CreateBindListen)(Dart_NativeArguments args) {
0,
65535);
bool v6_only = DartUtils::GetBooleanValue(Dart_GetNativeArgument(args, 4));
bool shared = DartUtils::GetBooleanValue(Dart_GetNativeArgument(args, 5));
Dart_Handle socket_object = Dart_GetNativeArgument(args, 0);
Dart_Handle result = globalTcpListeningSocketRegistry.CreateBindListen(
socket_object, addr, port, backlog, v6_only, shared);
Dart_SetReturnValue(args, result);
intptr_t socket = ServerSocket::CreateBindListen(
addr, port, backlog, v6_only);
OSError error;
if (socket >= 0 && ServerSocket::StartAccept(socket)) {
Socket::SetSocketIdNativeField(Dart_GetNativeArgument(args, 0), socket);
Dart_SetReturnValue(args, Dart_True());
} else {
if (socket == -5) {
OSError os_error(-1, "Invalid host", OSError::kUnknown);
Dart_SetReturnValue(args, DartUtils::NewDartOSError(&os_error));
} else {
Dart_SetReturnValue(args, DartUtils::NewDartOSError(&error));
}
}
}
@ -809,15 +678,6 @@ void FUNCTION_NAME(Socket_LeaveMulticast)(Dart_NativeArguments args) {
}
void FUNCTION_NAME(Socket_MarkSocketAsSharedHack)(Dart_NativeArguments args) {
intptr_t socketfd =
Socket::GetSocketIdNativeField(Dart_GetNativeArgument(args, 0));
Dart_SetReturnValue(args,
globalTcpListeningSocketRegistry.MarkSocketFdAsSharableHack(socketfd));
}
void Socket::SetSocketIdNativeField(Dart_Handle socket, intptr_t id) {
Dart_Handle err =
Dart_SetNativeInstanceField(socket, kSocketIdNativeField, id);

View file

@ -5,8 +5,6 @@
#ifndef BIN_SOCKET_H_
#define BIN_SOCKET_H_
#include <map>
#include "platform/globals.h"
#include "bin/builtin.h"
@ -78,21 +76,6 @@ class SocketAddress {
sizeof(struct in6_addr) : sizeof(struct in_addr);
}
static bool AreAddressesEqual(const RawAddr& a, const RawAddr& b) {
if (a.ss.ss_family == AF_INET) {
if (b.ss.ss_family != AF_INET) return false;
return memcmp(&a.in.sin_addr, &b.in.sin_addr, sizeof(a.in.sin_addr)) == 0;
} else if (a.ss.ss_family == AF_INET6) {
if (b.ss.ss_family != AF_INET6) return false;
return memcmp(&a.in6.sin6_addr,
&b.in6.sin6_addr,
sizeof(a.in6.sin6_addr)) == 0;
} else {
UNREACHABLE();
return false;
}
}
static void GetSockAddr(Dart_Handle obj, RawAddr* addr) {
Dart_TypedData_Type data_type;
uint8_t* data = NULL;
@ -329,76 +312,6 @@ class ServerSocket {
DISALLOW_IMPLICIT_CONSTRUCTORS(ServerSocket);
};
class ListeningSocketRegistry {
private:
struct OSSocket {
RawAddr address;
int port;
bool v6_only;
bool shared;
int ref_count;
int socketfd;
// Singly linked lists of OSSocket instances which listen on the same port
// but on different addresses.
OSSocket *next;
OSSocket(RawAddr address, int port, bool v6_only, bool shared, int socketfd)
: address(address), port(port), v6_only(v6_only), shared(shared),
ref_count(0), socketfd(socketfd), next(NULL) {}
};
public:
ListeningSocketRegistry() : mutex_(new Mutex()) {}
// This function should be called from a dart runtime call in order to create
// a new (potentially shared) socket.
Dart_Handle CreateBindListen(Dart_Handle socket_object,
RawAddr addr,
intptr_t port,
intptr_t backlog,
bool v6_only,
bool shared);
// This should be called from the event handler for every kCloseEvent it gets
// on listening sockets.
//
// Returns `true` if the last reference has been dropped and the underlying
// socket can be closed.
//
// The caller is responsible for obtaining the mutex first, before calling
// this function.
bool CloseSafe(int socketfd);
// Mark an existing socket as sharable if it is not already marked as
// sharable.
//
// NOTE: This is a temporary measure until ServerSocketReference's are
// removed.
Dart_Handle MarkSocketFdAsSharableHack(int socketfd);
Mutex *mutex() { return mutex_; }
private:
OSSocket *findOSSocketWithAddress(OSSocket *current, const RawAddr& addr) {
while (current != NULL) {
if (SocketAddress::AreAddressesEqual(current->address, addr)) {
return current;
}
current = current->next;
}
return NULL;
}
std::map<intptr_t, OSSocket*> sockets_by_port_;
std::map<intptr_t, OSSocket*> sockets_by_fd_;
Mutex *mutex_;
typedef std::map<intptr_t, OSSocket*>::iterator SocketsIterator;
};
extern ListeningSocketRegistry globalTcpListeningSocketRegistry;
} // namespace bin
} // namespace dart

View file

@ -6,9 +6,8 @@ patch class RawServerSocket {
/* patch */ static Future<RawServerSocket> bind(address,
int port,
{int backlog: 0,
bool v6Only: false,
bool shared: false}) {
return _RawServerSocket.bind(address, port, backlog, v6Only, shared);
bool v6Only: false}) {
return _RawServerSocket.bind(address, port, backlog, v6Only);
}
}
@ -460,8 +459,7 @@ class _NativeSocket extends _NativeSocketNativeWrapper with _ServiceObject {
static Future<_NativeSocket> bind(host,
int port,
int backlog,
bool v6Only,
bool shared) {
bool v6Only) {
return new Future.value(host)
.then((host) {
if (host is _InternetAddress) return host;
@ -476,12 +474,10 @@ class _NativeSocket extends _NativeSocketNativeWrapper with _ServiceObject {
.then((address) {
var socket = new _NativeSocket.listen();
socket.address = address;
var result = socket.nativeCreateBindListen(address._in_addr,
port,
backlog,
v6Only,
shared);
v6Only);
if (result is OSError) {
throw new SocketException("Failed to create server socket",
osError: result,
@ -527,9 +523,7 @@ class _NativeSocket extends _NativeSocketNativeWrapper with _ServiceObject {
_NativeSocket.normal() : typeFlags = TYPE_NORMAL_SOCKET | TYPE_TCP_SOCKET;
_NativeSocket.listen() : typeFlags = TYPE_LISTENING_SOCKET | TYPE_TCP_SOCKET {
isClosedWrite = true;
}
_NativeSocket.listen() : typeFlags = TYPE_LISTENING_SOCKET | TYPE_TCP_SOCKET;
_NativeSocket.pipe() : typeFlags = TYPE_PIPE;
@ -1119,8 +1113,7 @@ class _NativeSocket extends _NativeSocketNativeWrapper with _ServiceObject {
native "Socket_SendTo";
nativeCreateConnect(List<int> addr,
int port) native "Socket_CreateConnect";
nativeCreateBindListen(List<int> addr, int port, int backlog, bool v6Only,
bool shared)
nativeCreateBindListen(List<int> addr, int port, int backlog, bool v6Only)
native "ServerSocket_CreateBindListen";
nativeCreateBindDatagram(List<int> addr, int port, bool reuseAddress)
native "Socket_CreateBindDatagram";
@ -1138,8 +1131,6 @@ class _NativeSocket extends _NativeSocketNativeWrapper with _ServiceObject {
bool nativeLeaveMulticast(
List<int> addr, List<int> interfaceAddr, int interfaceIndex)
native "Socket_LeaveMulticast";
bool _nativeMarkSocketAsSharedHack()
native "Socket_MarkSocketAsSharedHack";
}
@ -1148,21 +1139,19 @@ class _RawServerSocket extends Stream<RawSocket>
final _NativeSocket _socket;
StreamController<RawSocket> _controller;
ReceivePort _referencePort;
bool _v6Only;
static Future<_RawServerSocket> bind(address,
int port,
int backlog,
bool v6Only,
bool shared) {
bool v6Only) {
if (port < 0 || port > 0xFFFF)
throw new ArgumentError("Invalid port $port");
if (backlog < 0) throw new ArgumentError("Invalid backlog $backlog");
return _NativeSocket.bind(address, port, backlog, v6Only, shared)
.then((socket) => new _RawServerSocket(socket, v6Only));
return _NativeSocket.bind(address, port, backlog, v6Only)
.then((socket) => new _RawServerSocket(socket));
}
_RawServerSocket(this._socket, this._v6Only);
_RawServerSocket(this._socket);
StreamSubscription<RawSocket> listen(void onData(RawSocket event),
{Function onError,
@ -1244,20 +1233,18 @@ class _RawServerSocket extends Stream<RawSocket>
RawServerSocketReference get reference {
if (_referencePort == null) {
bool successfull = _socket._nativeMarkSocketAsSharedHack();
_referencePort = new ReceivePort();
_referencePort.listen((sendPort) {
sendPort.send(
[_socket.address,
_socket.port,
_v6Only]);
[_socket.nativeGetSocketId(),
_socket.address,
_socket.localPort]);
});
}
return new _RawServerSocketReference(_referencePort.sendPort);
}
Map _toJSON(bool ref) => _socket._toJSON(ref);
void set _owner(owner) { _socket.owner = owner; }
}
@ -1270,21 +1257,20 @@ class _RawServerSocketReference implements RawServerSocketReference {
Future<RawServerSocket> create() {
var port = new ReceivePort();
_sendPort.send(port.sendPort);
return port.first.then((List args) {
return port.first.then((args) {
port.close();
InternetAddress address = args[0];
int tcpPort = args[1];
bool v6Only = args[2];
return
RawServerSocket.bind(address, tcpPort, v6Only: v6Only, shared: true);
var native = new _NativeSocket.listen();
native.nativeSetSocketId(args[0]);
native.address = args[1];
native.localPort = args[2];
return new _RawServerSocket(native);
});
}
int get hashCode => _sendPort.hashCode;
bool operator==(Object other)
=> other is _RawServerSocketReference && _sendPort == other._sendPort;
=> other is _RawServerSocketReference && _sendPort == other._sendPort;
}
@ -1443,9 +1429,8 @@ patch class ServerSocket {
/* patch */ static Future<ServerSocket> bind(address,
int port,
{int backlog: 0,
bool v6Only: false,
bool shared: false}) {
return _ServerSocket.bind(address, port, backlog, v6Only, shared);
bool v6Only: false}) {
return _ServerSocket.bind(address, port, backlog, v6Only);
}
}
@ -1468,9 +1453,8 @@ class _ServerSocket extends Stream<Socket>
static Future<_ServerSocket> bind(address,
int port,
int backlog,
bool v6Only,
bool shared) {
return _RawServerSocket.bind(address, port, backlog, v6Only, shared)
bool v6Only) {
return _RawServerSocket.bind(address, port, backlog, v6Only)
.then((socket) => new _ServerSocket(socket));
}
@ -1498,7 +1482,6 @@ class _ServerSocket extends Stream<Socket>
}
Map _toJSON(bool ref) => _socket._toJSON(ref);
void set _owner(owner) { _socket._owner = owner; }
}

View file

@ -359,8 +359,7 @@ class RawServerSocket {
static Future<RawServerSocket> bind(address,
int port,
{int backlog: 0,
bool v6Only: false,
bool shared: false}) {
bool v6Only: false}) {
throw new UnsupportedError("RawServerSocket.bind");
}
}
@ -371,8 +370,7 @@ class ServerSocket {
static Future<ServerSocket> bind(address,
int port,
{int backlog: 0,
bool v6Only: false,
bool shared: false}) {
bool v6Only: false}) {
throw new UnsupportedError("ServerSocket.bind");
}
}

View file

@ -67,8 +67,7 @@ class SecureServerSocket extends Stream<SecureSocket> {
bool v6Only: false,
bool requestClientCertificate: false,
bool requireClientCertificate: false,
List<String> supportedProtocols,
bool shared: false}) {
List<String> supportedProtocols}) {
return RawSecureServerSocket.bind(
address,
port,
@ -77,8 +76,7 @@ class SecureServerSocket extends Stream<SecureSocket> {
v6Only: v6Only,
requestClientCertificate: requestClientCertificate,
requireClientCertificate: requireClientCertificate,
supportedProtocols: supportedProtocols,
shared: shared).then(
supportedProtocols: supportedProtocols).then(
(serverSocket) => new SecureServerSocket._(serverSocket));
}
@ -194,10 +192,8 @@ class RawSecureServerSocket extends Stream<RawSecureSocket> {
bool v6Only: false,
bool requestClientCertificate: false,
bool requireClientCertificate: false,
List<String> supportedProtocols,
bool shared: false}) {
return RawServerSocket.bind(
address, port, backlog: backlog, v6Only: v6Only, shared: shared)
List<String> supportedProtocols}) {
return RawServerSocket.bind(address, port, backlog: backlog, v6Only: v6Only)
.then((serverSocket) => new RawSecureServerSocket._(
serverSocket,
certificateName,

View file

@ -227,8 +227,7 @@ abstract class RawServerSocket implements Stream<RawSocket> {
external static Future<RawServerSocket> bind(address,
int port,
{int backlog: 0,
bool v6Only: false,
bool shared: false});
bool v6Only: false});
/**
* Returns the port used by this socket.
@ -319,8 +318,7 @@ abstract class ServerSocket implements Stream<Socket> {
external static Future<ServerSocket> bind(address,
int port,
{int backlog: 0,
bool v6Only: false,
bool shared: false});
bool v6Only: false});
/**
* Returns the port used by this socket.

View file

@ -1,165 +0,0 @@
// Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'dart:async';
import 'dart:io';
import 'dart:convert';
import 'package:async_helper/async_helper.dart';
import 'package:expect/expect.dart';
testBindShared(bool v6Only) {
asyncStart();
ServerSocket.bind(
'localhost', 0, v6Only: v6Only, shared: true).then((socket) {
Expect.isTrue(socket.port > 0);
asyncStart();
return ServerSocket.bind(
'localhost', socket.port, v6Only: v6Only, shared: true).then((socket2) {
Expect.equals(socket.address.address, socket2.address.address);
Expect.equals(socket.port, socket2.port);
socket.close().whenComplete(asyncEnd);
socket2.close().whenComplete(asyncEnd);
});
});
}
negTestBindSharedMismatch(bool v6Only) {
asyncStart();
ServerSocket.bind('localhost', 0, v6Only: v6Only).then((ServerSocket socket) {
Expect.isTrue(socket.port > 0);
asyncStart();
return ServerSocket.bind(
'localhost', socket.port, v6Only: v6Only).catchError((error) {
Expect.isTrue(error is SocketException);
Expect.isTrue('$error'.contains('shared flag'));
socket.close().whenComplete(asyncEnd);
asyncEnd();
});
});
}
negTestBindV6OnlyMismatch(bool v6Only) {
asyncStart();
ServerSocket.bind(
'localhost', 0, v6Only: v6Only, shared: true).then((ServerSocket socket) {
Expect.isTrue(socket.port > 0);
asyncStart();
return ServerSocket.bind(
'localhost', socket.port, v6Only: !v6Only, shared: true)
.catchError((error) {
Expect.isTrue(error is SocketException);
Expect.isTrue('$error'.contains('v6Only flag'));
socket.close().whenComplete(asyncEnd);
asyncEnd();
});
});
}
Future testBindDifferentAddresses(InternetAddress addr1,
InternetAddress addr2,
bool addr1V6Only,
bool addr2V6Only) {
asyncStart();
return ServerSocket.bind(
addr1, 0, v6Only: addr1V6Only, shared: false).then((socket) {
Expect.isTrue(socket.port > 0);
asyncStart();
return ServerSocket.bind(
addr2, socket.port, v6Only: addr2V6Only, shared: false).then((socket2) {
Expect.equals(socket.port, socket2.port);
return Future.wait([
socket.close().whenComplete(asyncEnd),
socket2.close().whenComplete(asyncEnd),
]);
});
});
}
testSocketReferenceInteroperability() {
asyncStart();
ServerSocket.bind('localhost', 0).then((ServerSocket socket) {
Expect.isTrue(socket.port > 0);
asyncStart();
socket.reference.create().then((socket2) {
bool gotResponseFrom1;
bool gotResponseFrom2;
Expect.isTrue(socket.port > 0);
Expect.equals(socket.port, socket2.port);
asyncStart();
asyncStart();
socket.listen((client) {
client.drain().whenComplete(asyncEnd);
client.write('1: hello world');
client.close();
socket.close().whenComplete(asyncEnd);
}, onDone: asyncEnd);
asyncStart();
asyncStart();
socket2.listen((client) {
client.drain().whenComplete(asyncEnd);
client.write('2: hello world');
client.close();
socket2.close().whenComplete(asyncEnd);
}, onDone: asyncEnd);
var futures = [];
for (int i = 0; i < 2; i++) {
asyncStart();
futures.add(
Socket.connect(socket.address, socket.port).then((Socket socket) {
socket.close().whenComplete(asyncEnd);
asyncStart();
return socket
.transform(ASCII.decoder).join('').then((String result) {
if (result == '1: hello world') gotResponseFrom1 = true;
else if (result == '2: hello world') gotResponseFrom2 = true;
else throw 'Unexpected result from server: $result';
asyncEnd();
});
}));
}
asyncStart();
Future.wait(futures).then((_) {
Expect.isTrue(gotResponseFrom1);
Expect.isTrue(gotResponseFrom2);
asyncEnd();
});
});
});
}
void main() {
testBindShared(false);
testBindShared(true);
negTestBindSharedMismatch(false);
negTestBindSharedMismatch(true);
negTestBindV6OnlyMismatch(true);
negTestBindV6OnlyMismatch(false);
asyncStart();
testBindDifferentAddresses(InternetAddress.ANY_IP_V6,
InternetAddress.ANY_IP_V4,
true,
false).then((_) {
testBindDifferentAddresses(InternetAddress.ANY_IP_V4,
InternetAddress.ANY_IP_V6,
false,
true);
asyncEnd();
});
testSocketReferenceInteroperability();
}