Fuchsia: Partial implementation of dart:io sockets

This changes the eventhandler to epoll and adds some code for sockets
to runtime/bin/sockets_fuchsia.cc. It also adds some simple tests
to hello_fuchsia.dart.

R=asiva@google.com

Review URL: https://codereview.chromium.org/2515643004 .
This commit is contained in:
Zachary Anderson 2016-11-20 22:04:14 -08:00
parent 74d713ece9
commit 14233482d9
14 changed files with 1138 additions and 262 deletions

View file

@ -37,6 +37,7 @@
'extensions_win.cc',
'fdutils.h',
'fdutils_android.cc',
'fdutils_fuchsia.cc',
'fdutils_linux.cc',
'fdutils_macos.cc',
'file.cc',

View file

@ -10,13 +10,25 @@
#include "bin/eventhandler.h"
#include "bin/eventhandler_fuchsia.h"
#include <magenta/status.h>
#include <magenta/syscalls.h>
#include <errno.h> // NOLINT
#include <fcntl.h> // NOLINT
#include <pthread.h> // NOLINT
#include <stdio.h> // NOLINT
#include <string.h> // NOLINT
#include <sys/epoll.h> // NOLINT
#include <sys/stat.h> // NOLINT
#include <unistd.h> // NOLINT
#include "bin/fdutils.h"
#include "bin/lockers.h"
#include "bin/log.h"
#include "bin/socket.h"
#include "bin/thread.h"
#include "bin/utils.h"
#include "platform/hashmap.h"
#include "platform/utils.h"
// #define EVENTHANDLER_LOGGING 1
#if defined(EVENTHANDLER_LOGGING)
#define LOG_ERR(msg, ...) Log::PrintErr(msg, ##__VA_ARGS__)
#define LOG_INFO(msg, ...) Log::Print(msg, ##__VA_ARGS__)
@ -28,115 +40,204 @@
namespace dart {
namespace bin {
MagentaWaitManyInfo::MagentaWaitManyInfo()
: capacity_(kInitialCapacity), size_(0) {
descriptor_infos_ = static_cast<DescriptorInfo**>(
malloc(kInitialCapacity * sizeof(*descriptor_infos_)));
if (descriptor_infos_ == NULL) {
FATAL("Failed to allocate descriptor_infos array");
#if defined(EVENTHANDLER_LOGGING)
static void PrintEventMask(intptr_t fd, intptr_t events) {
Log::PrintErr("%d ", fd);
if ((events & EPOLLIN) != 0) {
Log::PrintErr("EPOLLIN ");
}
items_ =
static_cast<mx_wait_item_t*>(malloc(kInitialCapacity * sizeof(*items_)));
if (items_ == NULL) {
FATAL("Failed to allocate items array");
if ((events & EPOLLPRI) != 0) {
Log::PrintErr("EPOLLPRI ");
}
if ((events & EPOLLOUT) != 0) {
Log::PrintErr("EPOLLOUT ");
}
if ((events & EPOLLERR) != 0) {
Log::PrintErr("EPOLLERR ");
}
if ((events & EPOLLHUP) != 0) {
Log::PrintErr("EPOLLHUP ");
}
if ((events & EPOLLRDHUP) != 0) {
Log::PrintErr("EPOLLRDHUP ");
}
int all_events =
EPOLLIN | EPOLLPRI | EPOLLOUT | EPOLLERR | EPOLLHUP | EPOLLRDHUP;
if ((events & ~all_events) != 0) {
Log::PrintErr("(and %08x) ", events & ~all_events);
}
Log::PrintErr("\n");
}
MagentaWaitManyInfo::~MagentaWaitManyInfo() {
free(descriptor_infos_);
free(items_);
}
void MagentaWaitManyInfo::AddHandle(mx_handle_t handle,
mx_signals_t signals,
DescriptorInfo* di) {
#if defined(DEBUG)
// Check that the handle is not already in the list.
for (intptr_t i = 0; i < size_; i++) {
if (items_[i].handle == handle) {
FATAL("The handle is already in the list!");
}
}
#endif
intptr_t new_size = size_ + 1;
GrowArraysIfNeeded(new_size);
descriptor_infos_[size_] = di;
items_[size_].handle = handle;
items_[size_].waitfor = signals;
items_[size_].pending = 0;
size_ = new_size;
LOG_INFO("AddHandle(%ld, %ld, %p), size = %ld\n", handle, signals, di, size_);
intptr_t DescriptorInfo::GetPollEvents() {
// Do not ask for EPOLLERR and EPOLLHUP explicitly as they are
// triggered anyway.
intptr_t events = 0;
if ((Mask() & (1 << kInEvent)) != 0) {
events |= EPOLLIN;
}
if ((Mask() & (1 << kOutEvent)) != 0) {
events |= EPOLLOUT;
}
return events;
}
void MagentaWaitManyInfo::RemoveHandle(mx_handle_t handle) {
intptr_t idx;
for (idx = 1; idx < size_; idx++) {
if (handle == items_[idx].handle) {
break;
}
}
if (idx == size_) {
FATAL("Handle is not in the list!");
}
if (idx != (size_ - 1)) {
descriptor_infos_[idx] = descriptor_infos_[size_ - 1];
items_[idx] = items_[size_ - 1];
}
descriptor_infos_[size_ - 1] = NULL;
items_[size_ - 1] = {MX_HANDLE_INVALID, 0, 0};
size_ = size_ - 1;
LOG_INFO("RemoveHandle(%ld), size = %ld\n", handle, size_);
// Unregister the file descriptor for a DescriptorInfo structure with
// epoll.
static void RemoveFromEpollInstance(intptr_t epoll_fd_, DescriptorInfo* di) {
LOG_INFO("RemoveFromEpollInstance: fd = %ld\n", di->fd());
VOID_NO_RETRY_EXPECTED(epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, di->fd(), NULL));
}
void MagentaWaitManyInfo::GrowArraysIfNeeded(intptr_t desired_size) {
if (desired_size < capacity_) {
return;
static void AddToEpollInstance(intptr_t epoll_fd_, DescriptorInfo* di) {
struct epoll_event event;
event.events = EPOLLRDHUP | di->GetPollEvents();
if (!di->IsListeningSocket()) {
event.events |= EPOLLET;
}
intptr_t new_capacity = desired_size + (desired_size >> 1);
descriptor_infos_ = static_cast<DescriptorInfo**>(
realloc(descriptor_infos_, new_capacity * sizeof(*descriptor_infos_)));
if (descriptor_infos_ == NULL) {
FATAL("Failed to grow descriptor_infos array");
event.data.ptr = di;
LOG_INFO("AddToEpollInstance: fd = %ld\n", di->fd());
int status =
NO_RETRY_EXPECTED(epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, di->fd(), &event));
LOG_INFO("AddToEpollInstance: fd = %ld, status = %d\n", di->fd(), status);
#if defined(EVENTHANDLER_LOGGING)
PrintEventMask(di->fd(), event.events);
#endif
if (status == -1) {
// TODO(dart:io): Verify that the dart end is handling this correctly.
// Epoll does not accept the file descriptor. It could be due to
// already closed file descriptor, or unuspported devices, such
// as /dev/null. In such case, mark the file descriptor as closed,
// so dart will handle it accordingly.
di->NotifyAllDartPorts(1 << kCloseEvent);
}
items_ = static_cast<mx_wait_item_t*>(
realloc(items_, new_capacity * sizeof(*items_)));
if (items_ == NULL) {
FATAL("Failed to grow items array");
}
capacity_ = new_capacity;
LOG_INFO("GrowArraysIfNeeded(%ld), capacity = %ld\n", desired_size,
capacity_);
}
EventHandlerImplementation::EventHandlerImplementation() {
mx_status_t status =
mx_channel_create(0, &interrupt_handles_[0], &interrupt_handles_[1]);
if (status != NO_ERROR) {
FATAL1("mx_channel_create failed: %s\n", mx_status_get_string(status));
EventHandlerImplementation::EventHandlerImplementation()
: socket_map_(&HashMap::SamePointerValue, 16) {
intptr_t result;
result = NO_RETRY_EXPECTED(pipe(interrupt_fds_));
if (result != 0) {
FATAL("Pipe creation failed");
}
if (!FDUtils::SetNonBlocking(interrupt_fds_[0])) {
FATAL("Failed to set pipe fd non blocking\n");
}
if (!FDUtils::SetCloseOnExec(interrupt_fds_[0])) {
FATAL("Failed to set pipe fd close on exec\n");
}
if (!FDUtils::SetCloseOnExec(interrupt_fds_[1])) {
FATAL("Failed to set pipe fd close on exec\n");
}
shutdown_ = false;
info_.AddHandle(interrupt_handles_[0],
MX_SIGNAL_READABLE | MX_SIGNAL_PEER_CLOSED, NULL);
LOG_INFO("EventHandlerImplementation initialized\n");
// The initial size passed to epoll_create is ignore on newer (>=
// 2.6.8) Linux versions
static const int kEpollInitialSize = 64;
epoll_fd_ = NO_RETRY_EXPECTED(epoll_create(kEpollInitialSize));
if (epoll_fd_ == -1) {
FATAL1("Failed creating epoll file descriptor: %i", errno);
}
if (!FDUtils::SetCloseOnExec(epoll_fd_)) {
FATAL("Failed to set epoll fd close on exec\n");
}
// Register the interrupt_fd with the epoll instance.
struct epoll_event event;
event.events = EPOLLIN;
event.data.ptr = NULL;
LOG_INFO("EventHandlerImplementation(): epoll_ctl: fd = %ld\n", epoll_fd_);
int status = NO_RETRY_EXPECTED(
epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, interrupt_fds_[0], &event));
LOG_INFO("EventHandlerImplementation(): epoll_ctl: fd = %ld, status = %d\n",
epoll_fd_, status);
if (status == -1) {
FATAL("Failed adding interrupt fd to epoll instance");
}
}
static void DeleteDescriptorInfo(void* info) {
DescriptorInfo* di = reinterpret_cast<DescriptorInfo*>(info);
di->Close();
LOG_INFO("Closed %d\n", di->fd());
delete di;
}
EventHandlerImplementation::~EventHandlerImplementation() {
mx_status_t status = mx_handle_close(interrupt_handles_[0]);
if (status != NO_ERROR) {
FATAL1("mx_handle_close failed: %s\n", mx_status_get_string(status));
socket_map_.Clear(DeleteDescriptorInfo);
VOID_NO_RETRY_EXPECTED(close(epoll_fd_));
VOID_NO_RETRY_EXPECTED(close(interrupt_fds_[0]));
VOID_NO_RETRY_EXPECTED(close(interrupt_fds_[1]));
}
void EventHandlerImplementation::UpdateEpollInstance(intptr_t old_mask,
DescriptorInfo* di) {
intptr_t new_mask = di->Mask();
LOG_INFO("UpdateEpollInstance: %d old=%ld, new=%ld\n", di->fd(), old_mask,
new_mask);
if ((old_mask != 0) && (new_mask == 0)) {
RemoveFromEpollInstance(epoll_fd_, di);
} else if ((old_mask == 0) && (new_mask != 0)) {
AddToEpollInstance(epoll_fd_, di);
} else if ((old_mask != 0) && (new_mask != 0) && (old_mask != new_mask)) {
ASSERT(!di->IsListeningSocket());
RemoveFromEpollInstance(epoll_fd_, di);
AddToEpollInstance(epoll_fd_, di);
}
status = mx_handle_close(interrupt_handles_[1]);
if (status != NO_ERROR) {
FATAL1("mx_handle_close failed: %s\n", mx_status_get_string(status));
}
DescriptorInfo* EventHandlerImplementation::GetDescriptorInfo(
intptr_t fd,
bool is_listening) {
ASSERT(fd >= 0);
HashMap::Entry* entry = socket_map_.Lookup(GetHashmapKeyFromFd(fd),
GetHashmapHashFromFd(fd), true);
ASSERT(entry != NULL);
DescriptorInfo* di = reinterpret_cast<DescriptorInfo*>(entry->value);
if (di == NULL) {
// If there is no data in the hash map for this file descriptor a
// new DescriptorInfo for the file descriptor is inserted.
if (is_listening) {
di = new DescriptorInfoMultiple(fd);
} else {
di = new DescriptorInfoSingle(fd);
}
entry->value = di;
}
LOG_INFO("EventHandlerImplementation destroyed\n");
ASSERT(fd == di->fd());
return di;
}
static ssize_t WriteToBlocking(int fd, const void* buffer, size_t count) {
size_t remaining = count;
char* buffer_pos = const_cast<char*>(reinterpret_cast<const char*>(buffer));
while (remaining > 0) {
ssize_t bytes_written = NO_RETRY_EXPECTED(write(fd, buffer_pos, remaining));
if (bytes_written == 0) {
return count - remaining;
} else if (bytes_written == -1) {
ASSERT(EAGAIN == EWOULDBLOCK);
// Error code EWOULDBLOCK should only happen for non blocking
// file descriptors.
ASSERT(errno != EWOULDBLOCK);
return -1;
} else {
ASSERT(bytes_written > 0);
remaining -= bytes_written;
buffer_pos += bytes_written;
}
}
return count;
}
@ -147,71 +248,171 @@ void EventHandlerImplementation::WakeupHandler(intptr_t id,
msg.id = id;
msg.dart_port = dart_port;
msg.data = data;
mx_status_t status =
mx_channel_write(interrupt_handles_[1], 0, &msg, sizeof(msg), NULL, 0);
if (status != NO_ERROR) {
FATAL1("mx_channel_write failed: %s\n", mx_status_get_string(status));
// WriteToBlocking will write up to 512 bytes atomically, and since our msg
// is smaller than 512, we don't need a thread lock.
// See: http://linux.die.net/man/7/pipe, section 'Pipe_buf'.
ASSERT(kInterruptMessageSize < PIPE_BUF);
intptr_t result =
WriteToBlocking(interrupt_fds_[1], &msg, kInterruptMessageSize);
if (result != kInterruptMessageSize) {
if (result == -1) {
perror("Interrupt message failure:");
}
FATAL1("Interrupt message failure. Wrote %" Pd " bytes.", result);
}
LOG_INFO("WakeupHandler(%ld, %ld, %lld)\n", id, dart_port, data);
}
void EventHandlerImplementation::HandleInterruptFd() {
LOG_INFO("HandleInterruptFd entry\n");
InterruptMessage msg;
uint32_t bytes = kInterruptMessageSize;
mx_status_t status;
while (true) {
status = mx_channel_read(interrupt_handles_[0], 0, &msg, bytes, &bytes,
NULL, 0, NULL);
if (status != NO_ERROR) {
break;
}
ASSERT(bytes == kInterruptMessageSize);
if (msg.id == kTimerId) {
const intptr_t MAX_MESSAGES = kInterruptMessageSize;
InterruptMessage msg[MAX_MESSAGES];
ssize_t bytes = NO_RETRY_EXPECTED(
read(interrupt_fds_[0], msg, MAX_MESSAGES * kInterruptMessageSize));
LOG_INFO("HandleInterruptFd read %ld bytes\n", bytes);
for (ssize_t i = 0; i < bytes / kInterruptMessageSize; i++) {
if (msg[i].id == kTimerId) {
LOG_INFO("HandleInterruptFd read timer update\n");
timeout_queue_.UpdateTimeout(msg.dart_port, msg.data);
} else if (msg.id == kShutdownId) {
timeout_queue_.UpdateTimeout(msg[i].dart_port, msg[i].data);
} else if (msg[i].id == kShutdownId) {
LOG_INFO("HandleInterruptFd read shutdown\n");
shutdown_ = true;
} else {
// TODO(zra): Handle commands to add and remove handles from the
// MagentaWaitManyInfo.
UNIMPLEMENTED();
ASSERT((msg[i].data & COMMAND_MASK) != 0);
LOG_INFO("HandleInterruptFd command\n");
DescriptorInfo* di =
GetDescriptorInfo(msg[i].id, IS_LISTENING_SOCKET(msg[i].data));
if (IS_COMMAND(msg[i].data, kShutdownReadCommand)) {
ASSERT(!di->IsListeningSocket());
// Close the socket for reading.
LOG_INFO("\tSHUT_RD: %d\n", di->fd());
VOID_NO_RETRY_EXPECTED(shutdown(di->fd(), SHUT_RD));
} else if (IS_COMMAND(msg[i].data, kShutdownWriteCommand)) {
ASSERT(!di->IsListeningSocket());
// Close the socket for writing.
LOG_INFO("\tSHUT_WR: %d\n", di->fd());
VOID_NO_RETRY_EXPECTED(shutdown(di->fd(), SHUT_WR));
} else if (IS_COMMAND(msg[i].data, kCloseCommand)) {
// Close the socket and free system resources and move on to next
// message.
intptr_t old_mask = di->Mask();
Dart_Port port = msg[i].dart_port;
di->RemovePort(port);
intptr_t new_mask = di->Mask();
UpdateEpollInstance(old_mask, di);
LOG_INFO("\tCLOSE: %d: %lx -> %lx\n", di->fd(), old_mask, new_mask);
intptr_t fd = di->fd();
if (di->IsListeningSocket()) {
// We only close the socket file descriptor from the operating
// system if there are no other dart socket objects which
// are listening on the same (address, port) combination.
ListeningSocketRegistry* registry =
ListeningSocketRegistry::Instance();
MutexLocker locker(registry->mutex());
if (registry->CloseSafe(fd)) {
ASSERT(new_mask == 0);
socket_map_.Remove(GetHashmapKeyFromFd(fd),
GetHashmapHashFromFd(fd));
di->Close();
LOG_INFO("Closed %d\n", di->fd());
delete di;
}
} else {
ASSERT(new_mask == 0);
socket_map_.Remove(GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd));
di->Close();
LOG_INFO("Closed %d\n", di->fd());
delete di;
}
DartUtils::PostInt32(port, 1 << kDestroyedEvent);
} else if (IS_COMMAND(msg[i].data, kReturnTokenCommand)) {
int count = TOKEN_COUNT(msg[i].data);
intptr_t old_mask = di->Mask();
LOG_INFO("\t Return Token: %d: %lx\n", di->fd(), old_mask);
di->ReturnTokens(msg[i].dart_port, count);
UpdateEpollInstance(old_mask, di);
} else if (IS_COMMAND(msg[i].data, kSetEventMaskCommand)) {
// `events` can only have kInEvent/kOutEvent flags set.
intptr_t events = msg[i].data & EVENT_MASK;
ASSERT(0 == (events & ~(1 << kInEvent | 1 << kOutEvent)));
intptr_t old_mask = di->Mask();
LOG_INFO("\t Set Event Mask: %d: %lx %lx\n", di->fd(), old_mask,
msg[i].data & EVENT_MASK);
di->SetPortAndMask(msg[i].dart_port, msg[i].data & EVENT_MASK);
UpdateEpollInstance(old_mask, di);
} else {
UNREACHABLE();
}
}
}
// status == ERR_SHOULD_WAIT when we try to read and there are no messages
// available, so it is an error if we get here and status != ERR_SHOULD_WAIT.
if (status != ERR_SHOULD_WAIT) {
FATAL1("mx_channel_read failed: %s\n", mx_status_get_string(status));
}
LOG_INFO("HandleInterruptFd exit\n");
}
void EventHandlerImplementation::HandleEvents() {
LOG_INFO("HandleEvents entry\n");
for (intptr_t i = 1; i < info_.size(); i++) {
const mx_wait_item_t& wait_item = info_.items()[i];
if (wait_item.pending & wait_item.waitfor) {
// Only the control handle has no descriptor info.
ASSERT(info_.descriptor_infos()[i] != NULL);
ASSERT(wait_item.handle != interrupt_handles_[0]);
// TODO(zra): Handle events on other handles. At the moment we are
// only interrupted when there is a message on interrupt_handles_[0].
UNIMPLEMENTED();
intptr_t EventHandlerImplementation::GetPollEvents(intptr_t events,
DescriptorInfo* di) {
#ifdef EVENTHANDLER_LOGGING
PrintEventMask(di->fd(), events);
#endif
if ((events & EPOLLERR) != 0) {
// Return error only if EPOLLIN is present.
return ((events & EPOLLIN) != 0) ? (1 << kErrorEvent) : 0;
}
intptr_t event_mask = 0;
if ((events & EPOLLIN) != 0) {
event_mask |= (1 << kInEvent);
}
if ((events & EPOLLOUT) != 0) {
event_mask |= (1 << kOutEvent);
}
if ((events & (EPOLLHUP | EPOLLRDHUP)) != 0) {
event_mask |= (1 << kCloseEvent);
}
return event_mask;
}
void EventHandlerImplementation::HandleEvents(struct epoll_event* events,
int size) {
bool interrupt_seen = false;
for (int i = 0; i < size; i++) {
if (events[i].data.ptr == NULL) {
interrupt_seen = true;
} else {
DescriptorInfo* di =
reinterpret_cast<DescriptorInfo*>(events[i].data.ptr);
intptr_t event_mask = GetPollEvents(events[i].events, di);
if ((event_mask & (1 << kErrorEvent)) != 0) {
di->NotifyAllDartPorts(event_mask);
}
event_mask &= ~(1 << kErrorEvent);
LOG_INFO("HandleEvents: fd=%ld events=%ld\n", di->fd(), event_mask);
if (event_mask != 0) {
intptr_t old_mask = di->Mask();
Dart_Port port = di->NextNotifyDartPort(event_mask);
ASSERT(port != 0);
UpdateEpollInstance(old_mask, di);
LOG_INFO("HandleEvents: Posting %ld to %ld for fd=%ld\n", event_mask,
port, di->fd());
bool success = DartUtils::PostInt32(port, event_mask);
if (!success) {
// This can happen if e.g. the isolate that owns the port has died
// for some reason.
FATAL2("Failed to post event for fd %ld to port %ld", di->fd(), port);
}
}
}
}
if ((info_.items()[0].pending & MX_SIGNAL_PEER_CLOSED) != 0) {
FATAL("EventHandlerImplementation::Poll: Unexpected peer closed\n");
}
if ((info_.items()[0].pending & MX_SIGNAL_READABLE) != 0) {
LOG_INFO("HandleEvents interrupt_handles_[0] readable\n");
if (interrupt_seen) {
// Handle after socket events, so we avoid closing a socket before we handle
// the current events.
HandleInterruptFd();
} else {
LOG_INFO("HandleEvents interrupt_handles_[0] not readable\n");
}
}
@ -239,6 +440,8 @@ void EventHandlerImplementation::HandleTimeout() {
void EventHandlerImplementation::Poll(uword args) {
static const intptr_t kMaxEvents = 16;
struct epoll_event events[kMaxEvents];
EventHandler* handler = reinterpret_cast<EventHandler*>(args);
EventHandlerImplementation* handler_impl = &handler->delegate_;
ASSERT(handler_impl != NULL);
@ -246,23 +449,21 @@ void EventHandlerImplementation::Poll(uword args) {
while (!handler_impl->shutdown_) {
int64_t millis = handler_impl->GetTimeout();
ASSERT((millis == kInfinityTimeout) || (millis >= 0));
mx_time_t timeout =
millis * kMicrosecondsPerMillisecond * kNanosecondsPerMicrosecond;
const MagentaWaitManyInfo& info = handler_impl->info();
LOG_INFO("mx_handle_wait_many(%p, %ld, %lld)\n", info.items(), info.size(),
timeout);
mx_status_t status =
mx_handle_wait_many(info.items(), info.size(), timeout);
if ((status != NO_ERROR) && (status != ERR_TIMED_OUT)) {
FATAL1("mx_handle_wait_many failed: %s\n", mx_status_get_string(status));
LOG_INFO("epoll_wait(millis = %ld)\n", millis);
intptr_t result = NO_RETRY_EXPECTED(
epoll_wait(handler_impl->epoll_fd_, events, kMaxEvents, millis));
ASSERT(EAGAIN == EWOULDBLOCK);
LOG_INFO("epoll_wait(millis = %ld) -> %ld\n", millis, result);
if (result < 0) {
if (errno != EWOULDBLOCK) {
perror("Poll failed");
}
} else {
LOG_INFO("mx_handle_wait_many returned: %ld\n", status);
handler_impl->HandleTimeout();
handler_impl->HandleEvents();
handler_impl->HandleEvents(events, result);
}
}
handler->NotifyShutdownDone();
LOG_INFO("EventHandlerImplementation notifying about shutdown\n");
}
@ -286,6 +487,17 @@ void EventHandlerImplementation::SendData(intptr_t id,
WakeupHandler(id, dart_port, data);
}
void* EventHandlerImplementation::GetHashmapKeyFromFd(intptr_t fd) {
// The hashmap does not support keys with value 0.
return reinterpret_cast<void*>(fd + 1);
}
uint32_t EventHandlerImplementation::GetHashmapHashFromFd(intptr_t fd) {
// The hashmap does not support keys with value 0.
return dart::Utils::WordHash(fd + 1);
}
} // namespace bin
} // namespace dart

View file

@ -10,7 +10,9 @@
#endif
#include <errno.h>
#include <magenta/syscalls.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>
#include "platform/signal_blocker.h"
@ -23,8 +25,11 @@ class DescriptorInfo : public DescriptorInfoBase {
virtual ~DescriptorInfo() {}
intptr_t GetPollEvents();
virtual void Close() {
VOID_TEMP_FAILURE_RETRY(close(fd_));
// Should be VOID_TEMP_FAILURE_RETRY
VOID_NO_RETRY_EXPECTED(close(fd_));
fd_ = -1;
}
@ -53,57 +58,37 @@ class DescriptorInfoMultiple
DISALLOW_COPY_AND_ASSIGN(DescriptorInfoMultiple);
};
// Information needed to call mx_handle_wait_many(), and to handle events.
class MagentaWaitManyInfo {
public:
MagentaWaitManyInfo();
~MagentaWaitManyInfo();
intptr_t capacity() const { return capacity_; }
intptr_t size() const { return size_; }
DescriptorInfo** descriptor_infos() const { return descriptor_infos_; }
mx_wait_item_t* items() const { return items_; }
void AddHandle(mx_handle_t handle, mx_signals_t signals, DescriptorInfo* di);
void RemoveHandle(mx_handle_t handle);
private:
static const intptr_t kInitialCapacity = 32;
void GrowArraysIfNeeded(intptr_t desired_size);
intptr_t capacity_;
intptr_t size_;
DescriptorInfo** descriptor_infos_;
mx_wait_item_t* items_;
DISALLOW_COPY_AND_ASSIGN(MagentaWaitManyInfo);
};
class EventHandlerImplementation {
public:
EventHandlerImplementation();
~EventHandlerImplementation();
void UpdateEpollInstance(intptr_t old_mask, DescriptorInfo* di);
// Gets the socket data structure for a given file
// descriptor. Creates a new one if one is not found.
DescriptorInfo* GetDescriptorInfo(intptr_t fd, bool is_listening);
void SendData(intptr_t id, Dart_Port dart_port, int64_t data);
void Start(EventHandler* handler);
void Shutdown();
const MagentaWaitManyInfo& info() const { return info_; }
private:
static void Poll(uword args);
static void* GetHashmapKeyFromFd(intptr_t fd);
static uint32_t GetHashmapHashFromFd(intptr_t fd);
int64_t GetTimeout() const;
void HandleEvents();
void HandleEvents(struct epoll_event* events, int size);
void HandleTimeout();
void WakeupHandler(intptr_t id, Dart_Port dart_port, int64_t data);
intptr_t GetPollEvents(intptr_t events, DescriptorInfo* di);
void HandleInterruptFd();
static void Poll(uword args);
HashMap socket_map_;
TimeoutQueue timeout_queue_;
bool shutdown_;
mx_handle_t interrupt_handles_[2];
MagentaWaitManyInfo info_;
int interrupt_fds_[2];
int epoll_fd_;
DISALLOW_COPY_AND_ASSIGN(EventHandlerImplementation);
};

View file

@ -39,6 +39,9 @@ class FDUtils {
// descriptor must be in blocking mode.
static ssize_t WriteToBlocking(int fd, const void* buffer, size_t count);
// Closes fd without modifying errno.
static void SaveErrorAndClose(intptr_t fd);
private:
DISALLOW_ALLOCATION();
DISALLOW_IMPLICIT_CONSTRUCTORS(FDUtils);

View file

@ -137,6 +137,13 @@ ssize_t FDUtils::WriteToBlocking(int fd, const void* buffer, size_t count) {
return count;
}
void FDUtils::SaveErrorAndClose(intptr_t fd) {
int err = errno;
VOID_TEMP_FAILURE_RETRY(close(fd));
errno = err;
}
} // namespace bin
} // namespace dart

View file

@ -0,0 +1,154 @@
// Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
#include "platform/globals.h"
#if defined(TARGET_OS_FUCHSIA)
#include "bin/fdutils.h"
#include <errno.h> // NOLINT
#include <fcntl.h> // NOLINT
#include <sys/ioctl.h> // NOLINT
#include <unistd.h> // NOLINT
#include "platform/signal_blocker.h"
namespace dart {
namespace bin {
bool FDUtils::SetCloseOnExec(intptr_t fd) {
intptr_t status;
status = NO_RETRY_EXPECTED(fcntl(fd, F_GETFD));
if (status < 0) {
perror("fcntl(F_GETFD) failed");
return false;
}
status |= FD_CLOEXEC;
if (NO_RETRY_EXPECTED(fcntl(fd, F_SETFD, status)) < 0) {
perror("fcntl(F_SETFD, FD_CLOEXEC) failed");
return false;
}
return true;
}
static bool SetBlockingHelper(intptr_t fd, bool blocking) {
intptr_t status;
status = NO_RETRY_EXPECTED(fcntl(fd, F_GETFL));
if (status < 0) {
perror("fcntl(F_GETFL) failed");
return false;
}
status = blocking ? (status & ~O_NONBLOCK) : (status | O_NONBLOCK);
if (NO_RETRY_EXPECTED(fcntl(fd, F_SETFL, status)) < 0) {
perror("fcntl(F_SETFL, O_NONBLOCK) failed");
return false;
}
return true;
}
bool FDUtils::SetNonBlocking(intptr_t fd) {
return SetBlockingHelper(fd, false);
}
bool FDUtils::SetBlocking(intptr_t fd) {
return SetBlockingHelper(fd, true);
}
bool FDUtils::IsBlocking(intptr_t fd, bool* is_blocking) {
intptr_t status;
status = NO_RETRY_EXPECTED(fcntl(fd, F_GETFL));
if (status < 0) {
return false;
}
*is_blocking = (status & O_NONBLOCK) == 0;
return true;
}
intptr_t FDUtils::AvailableBytes(intptr_t fd) {
// TODO(MG-364): Enable this code when it is supported.
#if 0
int available; // ioctl for FIONREAD expects an 'int*' argument.
int result = NO_RETRY_EXPECTED(ioctl(fd, FIONREAD, &available));
if (result < 0) {
return result;
}
ASSERT(available >= 0);
return static_cast<intptr_t>(available);
#endif
errno = ENOTSUP;
return -1;
}
ssize_t FDUtils::ReadFromBlocking(int fd, void* buffer, size_t count) {
#ifdef DEBUG
bool is_blocking = false;
ASSERT(FDUtils::IsBlocking(fd, &is_blocking));
ASSERT(is_blocking);
#endif
size_t remaining = count;
char* buffer_pos = reinterpret_cast<char*>(buffer);
while (remaining > 0) {
ssize_t bytes_read = NO_RETRY_EXPECTED(read(fd, buffer_pos, remaining));
if (bytes_read == 0) {
return count - remaining;
} else if (bytes_read == -1) {
ASSERT(EAGAIN == EWOULDBLOCK);
// Error code EWOULDBLOCK should only happen for non blocking
// file descriptors.
ASSERT(errno != EWOULDBLOCK);
return -1;
} else {
ASSERT(bytes_read > 0);
remaining -= bytes_read;
buffer_pos += bytes_read;
}
}
return count;
}
ssize_t FDUtils::WriteToBlocking(int fd, const void* buffer, size_t count) {
#ifdef DEBUG
bool is_blocking = false;
ASSERT(FDUtils::IsBlocking(fd, &is_blocking));
ASSERT(is_blocking);
#endif
size_t remaining = count;
char* buffer_pos = const_cast<char*>(reinterpret_cast<const char*>(buffer));
while (remaining > 0) {
ssize_t bytes_written = NO_RETRY_EXPECTED(write(fd, buffer_pos, remaining));
if (bytes_written == 0) {
return count - remaining;
} else if (bytes_written == -1) {
ASSERT(EAGAIN == EWOULDBLOCK);
// Error code EWOULDBLOCK should only happen for non blocking
// file descriptors.
ASSERT(errno != EWOULDBLOCK);
return -1;
} else {
ASSERT(bytes_written > 0);
remaining -= bytes_written;
buffer_pos += bytes_written;
}
}
return count;
}
void FDUtils::SaveErrorAndClose(intptr_t fd) {
int err = errno;
NO_RETRY_EXPECTED(close(fd));
errno = err;
}
} // namespace bin
} // namespace dart
#endif // defined(TARGET_OS_FUCHSIA)

View file

@ -137,6 +137,13 @@ ssize_t FDUtils::WriteToBlocking(int fd, const void* buffer, size_t count) {
return count;
}
void FDUtils::SaveErrorAndClose(intptr_t fd) {
int err = errno;
VOID_TEMP_FAILURE_RETRY(close(fd));
errno = err;
}
} // namespace bin
} // namespace dart

View file

@ -137,6 +137,13 @@ ssize_t FDUtils::WriteToBlocking(int fd, const void* buffer, size_t count) {
return count;
}
void FDUtils::SaveErrorAndClose(intptr_t fd) {
int err = errno;
VOID_TEMP_FAILURE_RETRY(close(fd));
errno = err;
}
} // namespace bin
} // namespace dart

View file

@ -25,7 +25,7 @@
namespace dart {
namespace bin {
static void SaveErrorAndClose(intptr_t fd) {
static void FDUtils::SaveErrorAndClose(intptr_t fd) {
int err = errno;
VOID_TEMP_FAILURE_RETRY(close(fd));
errno = err;
@ -63,7 +63,7 @@ static intptr_t Create(const RawAddr& addr) {
return -1;
}
if (!FDUtils::SetCloseOnExec(fd)) {
SaveErrorAndClose(fd);
FDUtils::SaveErrorAndClose(fd);
return -1;
}
return fd;
@ -76,7 +76,7 @@ static intptr_t Connect(intptr_t fd, const RawAddr& addr) {
if ((result == 0) || (errno == EINPROGRESS)) {
return fd;
}
SaveErrorAndClose(fd);
FDUtils::SaveErrorAndClose(fd);
return -1;
}
@ -88,7 +88,7 @@ intptr_t Socket::CreateConnect(const RawAddr& addr) {
}
if (!FDUtils::SetNonBlocking(fd)) {
SaveErrorAndClose(fd);
FDUtils::SaveErrorAndClose(fd);
return -1;
}
return Connect(fd, addr);
@ -105,7 +105,7 @@ intptr_t Socket::CreateBindConnect(const RawAddr& addr,
intptr_t result = TEMP_FAILURE_RETRY(
bind(fd, &source_addr.addr, SocketAddress::GetAddrLength(source_addr)));
if ((result != 0) && (errno != EINPROGRESS)) {
SaveErrorAndClose(fd);
FDUtils::SaveErrorAndClose(fd);
return -1;
}
@ -323,7 +323,7 @@ intptr_t Socket::CreateBindDatagram(const RawAddr& addr, bool reuseAddress) {
}
if (!FDUtils::SetCloseOnExec(fd)) {
SaveErrorAndClose(fd);
FDUtils::SaveErrorAndClose(fd);
return -1;
}
@ -335,12 +335,12 @@ intptr_t Socket::CreateBindDatagram(const RawAddr& addr, bool reuseAddress) {
if (NO_RETRY_EXPECTED(
bind(fd, &addr.addr, SocketAddress::GetAddrLength(addr))) < 0) {
SaveErrorAndClose(fd);
FDUtils::SaveErrorAndClose(fd);
return -1;
}
if (!FDUtils::SetNonBlocking(fd)) {
SaveErrorAndClose(fd);
FDUtils::SaveErrorAndClose(fd);
return -1;
}
return fd;
@ -377,7 +377,7 @@ intptr_t ServerSocket::CreateBindListen(const RawAddr& addr,
}
if (!FDUtils::SetCloseOnExec(fd)) {
SaveErrorAndClose(fd);
FDUtils::SaveErrorAndClose(fd);
return -1;
}
@ -393,7 +393,7 @@ intptr_t ServerSocket::CreateBindListen(const RawAddr& addr,
if (NO_RETRY_EXPECTED(
bind(fd, &addr.addr, SocketAddress::GetAddrLength(addr))) < 0) {
SaveErrorAndClose(fd);
FDUtils::SaveErrorAndClose(fd);
return -1;
}
@ -403,17 +403,17 @@ intptr_t ServerSocket::CreateBindListen(const RawAddr& addr,
// Don't close the socket until we have created a new socket, ensuring
// that we do not get the bad port number again.
intptr_t new_fd = CreateBindListen(addr, backlog, v6_only);
SaveErrorAndClose(fd);
FDUtils::SaveErrorAndClose(fd);
return new_fd;
}
if (NO_RETRY_EXPECTED(listen(fd, backlog > 0 ? backlog : SOMAXCONN)) != 0) {
SaveErrorAndClose(fd);
FDUtils::SaveErrorAndClose(fd);
return -1;
}
if (!FDUtils::SetNonBlocking(fd)) {
SaveErrorAndClose(fd);
FDUtils::SaveErrorAndClose(fd);
return -1;
}
return fd;
@ -451,11 +451,11 @@ intptr_t ServerSocket::Accept(intptr_t fd) {
}
} else {
if (!FDUtils::SetCloseOnExec(socket)) {
SaveErrorAndClose(socket);
FDUtils::SaveErrorAndClose(socket);
return -1;
}
if (!FDUtils::SetNonBlocking(socket)) {
SaveErrorAndClose(socket);
FDUtils::SaveErrorAndClose(socket);
return -1;
}
}

View file

@ -10,56 +10,181 @@
#include "bin/socket.h"
#include "bin/socket_fuchsia.h"
#include <errno.h> // NOLINT
#include <fcntl.h> // NOLINT
#include <ifaddrs.h> // NOLINT
#include <net/if.h> // NOLINT
#include <netinet/tcp.h> // NOLINT
#include <stdio.h> // NOLINT
#include <stdlib.h> // NOLINT
#include <string.h> // NOLINT
#include <sys/ioctl.h> // NOLINT
#include <sys/stat.h> // NOLINT
#include <unistd.h> // NOLINT
#include "bin/fdutils.h"
#include "bin/file.h"
#include "platform/signal_blocker.h"
// #define SOCKET_LOG_INFO 1
// #define SOCKET_LOG_ERROR 1
// define SOCKET_LOG_ERROR to get log messages only for errors.
// define SOCKET_LOG_INFO to get log messages for both information and errors.
#if defined(SOCKET_LOG_INFO) || defined(SOCKET_LOG_ERROR)
#define LOG_ERR(msg, ...) \
{ \
int err = errno; \
Log::PrintErr("Dart Socket ERROR: %s:%d: " msg, __FILE__, __LINE__, \
##__VA_ARGS__); \
errno = err; \
}
#if defined(SOCKET_LOG_INFO)
#define LOG_INFO(msg, ...) \
Log::Print("Dart Socket INFO: %s:%d: " msg, __FILE__, __LINE__, ##__VA_ARGS__)
#else
#define LOG_INFO(msg, ...)
#endif // defined(SOCKET_LOG_INFO)
#else
#define LOG_ERR(msg, ...)
#define LOG_INFO(msg, ...)
#endif // defined(SOCKET_LOG_INFO) || defined(SOCKET_LOG_ERROR)
namespace dart {
namespace bin {
SocketAddress::SocketAddress(struct sockaddr* sa) {
UNIMPLEMENTED();
ASSERT(INET6_ADDRSTRLEN >= INET_ADDRSTRLEN);
if (!Socket::FormatNumericAddress(*reinterpret_cast<RawAddr*>(sa), as_string_,
INET6_ADDRSTRLEN)) {
as_string_[0] = 0;
}
socklen_t salen = GetAddrLength(*reinterpret_cast<RawAddr*>(sa));
memmove(reinterpret_cast<void*>(&addr_), sa, salen);
}
bool Socket::FormatNumericAddress(const RawAddr& addr, char* address, int len) {
UNIMPLEMENTED();
return false;
socklen_t salen = SocketAddress::GetAddrLength(addr);
LOG_INFO("Socket::FormatNumericAddress: calling getnameinfo\n");
return (NO_RETRY_EXPECTED(getnameinfo(&addr.addr, salen, address, len, NULL,
0, NI_NUMERICHOST) == 0));
}
bool Socket::Initialize() {
UNIMPLEMENTED();
// Nothing to do on Fuchsia.
return true;
}
intptr_t Socket::CreateConnect(const RawAddr& addr) {
UNIMPLEMENTED();
static intptr_t Create(const RawAddr& addr) {
LOG_INFO("Create: calling socket(SOCK_STREAM)\n");
intptr_t fd = NO_RETRY_EXPECTED(socket(addr.ss.ss_family, SOCK_STREAM, 0));
if (fd < 0) {
LOG_ERR("Create: socket(SOCK_STREAM) failed\n");
return -1;
}
LOG_INFO("Create: socket(SOCK_STREAM) -> fd %ld\n", fd);
if (!FDUtils::SetCloseOnExec(fd)) {
LOG_ERR("Create: FDUtils::SetCloseOnExec(%ld) failed\n", fd);
FDUtils::SaveErrorAndClose(fd);
return -1;
}
return fd;
}
static intptr_t CheckConnect(intptr_t fd) {
int val;
socklen_t vallen = sizeof(val);
LOG_INFO("CheckConnect: calling getsockopt(%ld)\n", fd);
intptr_t result = getsockopt(fd, SOL_SOCKET, SO_ERROR, &val, &vallen);
if (result != 0) {
FATAL1("CheckConnect: getsockopt(%ld) failed\n", fd);
} else if (vallen != sizeof(val)) {
FATAL1("CheckConnect: getsockopt(%ld) vallen != sizeof(val)!?!?\n", fd);
} else if (val != 0) {
LOG_ERR("CheckConnect: getsockopt(%ld) val = %d\n", fd, val);
return val;
}
LOG_INFO("CheckConnect: getsockopt(%ld) connected\n", fd);
return 0;
}
static intptr_t Connect(intptr_t fd, const RawAddr& addr) {
LOG_INFO("Connect: calling connect(%ld)\n", fd);
intptr_t result = NO_RETRY_EXPECTED(
connect(fd, &addr.addr, SocketAddress::GetAddrLength(addr)));
if ((result == 0) || (errno == EINPROGRESS)) {
LOG_INFO("Connect: connect(%ld) succeeded\n", fd);
intptr_t error = 0;
// TODO(US-87): When the issue is resolved this check is no longer needed.
while ((error = CheckConnect(fd)) != 0) {
if (error != EINPROGRESS) {
errno = error;
FDUtils::SaveErrorAndClose(fd);
return -1;
}
}
return fd;
}
LOG_ERR("Connect: connect(%ld) failed\n", fd);
FDUtils::SaveErrorAndClose(fd);
return -1;
}
intptr_t Socket::CreateConnect(const RawAddr& addr) {
intptr_t fd = Create(addr);
if (fd < 0) {
return fd;
}
if (!FDUtils::SetNonBlocking(fd)) {
LOG_ERR("CreateConnect: FDUtils::SetNonBlocking(%ld) failed\n", fd);
FDUtils::SaveErrorAndClose(fd);
return -1;
}
return Connect(fd, addr);
}
intptr_t Socket::CreateBindConnect(const RawAddr& addr,
const RawAddr& source_addr) {
LOG_ERR("Socket::CreateBindConnect is unimplemented\n");
UNIMPLEMENTED();
return -1;
}
bool Socket::IsBindError(intptr_t error_number) {
UNIMPLEMENTED();
return false;
return error_number == EADDRINUSE || error_number == EADDRNOTAVAIL ||
error_number == EINVAL;
}
intptr_t Socket::Available(intptr_t fd) {
UNIMPLEMENTED();
return -1;
return FDUtils::AvailableBytes(fd);
}
intptr_t Socket::Read(intptr_t fd, void* buffer, intptr_t num_bytes) {
UNIMPLEMENTED();
return -1;
ASSERT(fd >= 0);
LOG_INFO("Socket::Read: calling read(%ld, %p, %ld)\n", fd, buffer, num_bytes);
ssize_t read_bytes = NO_RETRY_EXPECTED(read(fd, buffer, num_bytes));
ASSERT(EAGAIN == EWOULDBLOCK);
if ((read_bytes == -1) && (errno == EWOULDBLOCK)) {
// If the read would block we need to retry and therefore return 0
// as the number of bytes written.
read_bytes = 0;
} else if (read_bytes == -1) {
LOG_ERR("Socket::Read: read(%ld, %p, %ld) failed\n", fd, buffer, num_bytes);
} else {
LOG_INFO("Socket::Read: read(%ld, %p, %ld) succeeded\n", fd, buffer,
num_bytes);
}
return read_bytes;
}
@ -67,14 +192,30 @@ intptr_t Socket::RecvFrom(intptr_t fd,
void* buffer,
intptr_t num_bytes,
RawAddr* addr) {
LOG_ERR("Socket::RecvFrom is unimplemented\n");
UNIMPLEMENTED();
return -1;
}
intptr_t Socket::Write(intptr_t fd, const void* buffer, intptr_t num_bytes) {
UNIMPLEMENTED();
return -1;
ASSERT(fd >= 0);
LOG_INFO("Socket::Write: calling write(%ld, %p, %ld)\n", fd, buffer,
num_bytes);
ssize_t written_bytes = NO_RETRY_EXPECTED(write(fd, buffer, num_bytes));
ASSERT(EAGAIN == EWOULDBLOCK);
if ((written_bytes == -1) && (errno == EWOULDBLOCK)) {
// If the would block we need to retry and therefore return 0 as
// the number of bytes written.
written_bytes = 0;
} else if (written_bytes == -1) {
LOG_ERR("Socket::Write: write(%ld, %p, %ld) failed\n", fd, buffer,
num_bytes);
} else {
LOG_INFO("Socket::Write: write(%ld, %p, %ld) succeeded\n", fd, buffer,
num_bytes);
}
return written_bytes;
}
@ -82,35 +223,46 @@ intptr_t Socket::SendTo(intptr_t fd,
const void* buffer,
intptr_t num_bytes,
const RawAddr& addr) {
LOG_ERR("Socket::SendTo is unimplemented\n");
UNIMPLEMENTED();
return -1;
}
intptr_t Socket::GetPort(intptr_t fd) {
UNIMPLEMENTED();
return -1;
ASSERT(fd >= 0);
RawAddr raw;
socklen_t size = sizeof(raw);
LOG_INFO("Socket::GetPort: calling getsockname(%ld)\n", fd);
if (NO_RETRY_EXPECTED(getsockname(fd, &raw.addr, &size))) {
return 0;
}
return SocketAddress::GetAddrPort(raw);
}
SocketAddress* Socket::GetRemotePeer(intptr_t fd, intptr_t* port) {
LOG_ERR("Socket::GetRemotePeer is unimplemented\n");
UNIMPLEMENTED();
return NULL;
}
void Socket::GetError(intptr_t fd, OSError* os_error) {
LOG_ERR("Socket::GetError is unimplemented\n");
UNIMPLEMENTED();
}
int Socket::GetType(intptr_t fd) {
LOG_ERR("Socket::GetType is unimplemented\n");
UNIMPLEMENTED();
return File::kOther;
}
intptr_t Socket::GetStdioHandle(intptr_t num) {
LOG_ERR("Socket::GetStdioHandle is unimplemented\n");
UNIMPLEMENTED();
return num;
}
@ -119,13 +271,45 @@ intptr_t Socket::GetStdioHandle(intptr_t num) {
AddressList<SocketAddress>* Socket::LookupAddress(const char* host,
int type,
OSError** os_error) {
// UNIMPLEMENTED
ASSERT(*os_error == NULL);
*os_error = new OSError(-1,
"Socket::LookupAddress not implemented in "
"Fuchsia Dart VM runtime",
OSError::kGetAddressInfo);
return NULL;
// Perform a name lookup for a host name.
struct addrinfo hints;
memset(&hints, 0, sizeof(hints));
hints.ai_family = SocketAddress::FromType(type);
hints.ai_socktype = SOCK_STREAM;
hints.ai_flags = AI_ADDRCONFIG;
hints.ai_protocol = IPPROTO_TCP;
struct addrinfo* info = NULL;
LOG_INFO("Socket::LookupAddress: calling getaddrinfo\n");
int status = NO_RETRY_EXPECTED(getaddrinfo(host, 0, &hints, &info));
if (status != 0) {
// We failed, try without AI_ADDRCONFIG. This can happen when looking up
// e.g. '::1', when there are no global IPv6 addresses.
hints.ai_flags = 0;
LOG_INFO("Socket::LookupAddress: calling getaddrinfo again\n");
status = NO_RETRY_EXPECTED(getaddrinfo(host, 0, &hints, &info));
if (status != 0) {
ASSERT(*os_error == NULL);
*os_error =
new OSError(status, gai_strerror(status), OSError::kGetAddressInfo);
return NULL;
}
}
intptr_t count = 0;
for (struct addrinfo* c = info; c != NULL; c = c->ai_next) {
if ((c->ai_family == AF_INET) || (c->ai_family == AF_INET6)) {
count++;
}
}
intptr_t i = 0;
AddressList<SocketAddress>* addresses = new AddressList<SocketAddress>(count);
for (struct addrinfo* c = info; c != NULL; c = c->ai_next) {
if ((c->ai_family == AF_INET) || (c->ai_family == AF_INET6)) {
addresses->SetAt(i, new SocketAddress(c->ai_addr));
i++;
}
}
freeaddrinfo(info);
return addresses;
}
@ -133,18 +317,21 @@ bool Socket::ReverseLookup(const RawAddr& addr,
char* host,
intptr_t host_len,
OSError** os_error) {
LOG_ERR("Socket::ReverseLookup is unimplemented\n");
UNIMPLEMENTED();
return false;
}
bool Socket::ParseAddress(int type, const char* address, RawAddr* addr) {
LOG_ERR("Socket::ParseAddress is unimplemented\n");
UNIMPLEMENTED();
return false;
}
intptr_t Socket::CreateBindDatagram(const RawAddr& addr, bool reuseAddress) {
LOG_ERR("Socket::CreateBindDatagram is unimplemented\n");
UNIMPLEMENTED();
return -1;
}
@ -166,71 +353,175 @@ AddressList<InterfaceSocketAddress>* Socket::ListInterfaces(
intptr_t ServerSocket::CreateBindListen(const RawAddr& addr,
intptr_t backlog,
bool v6_only) {
UNIMPLEMENTED();
return -1;
LOG_INFO("ServerSocket::CreateBindListen: calling socket(SOCK_STREAM)\n");
intptr_t fd = NO_RETRY_EXPECTED(socket(addr.ss.ss_family, SOCK_STREAM, 0));
if (fd < 0) {
LOG_ERR("ServerSocket::CreateBindListen: socket() failed\n");
return -1;
}
LOG_INFO("ServerSocket::CreateBindListen: socket(SOCK_STREAM) -> %ld\n", fd);
if (!FDUtils::SetCloseOnExec(fd)) {
LOG_ERR("ServerSocket::CreateBindListen: SetCloseOnExec(%ld) failed\n", fd);
FDUtils::SaveErrorAndClose(fd);
return -1;
}
LOG_INFO("ServerSocket::CreateBindListen: calling setsockopt(%ld)\n", fd);
int optval = 1;
VOID_NO_RETRY_EXPECTED(
setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)));
if (addr.ss.ss_family == AF_INET6) {
optval = v6_only ? 1 : 0;
LOG_INFO("ServerSocket::CreateBindListen: calling setsockopt(%ld)\n", fd);
VOID_NO_RETRY_EXPECTED(
setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &optval, sizeof(optval)));
}
LOG_INFO("ServerSocket::CreateBindListen: calling bind(%ld)\n", fd);
if (NO_RETRY_EXPECTED(
bind(fd, &addr.addr, SocketAddress::GetAddrLength(addr))) < 0) {
LOG_ERR("ServerSocket::CreateBindListen: bind(%ld) failed\n", fd);
FDUtils::SaveErrorAndClose(fd);
return -1;
}
LOG_INFO("ServerSocket::CreateBindListen: bind(%ld) succeeded\n", fd);
// Test for invalid socket port 65535 (some browsers disallow it).
if ((SocketAddress::GetAddrPort(addr) == 0) &&
(Socket::GetPort(fd) == 65535)) {
// Don't close the socket until we have created a new socket, ensuring
// that we do not get the bad port number again.
intptr_t new_fd = CreateBindListen(addr, backlog, v6_only);
FDUtils::SaveErrorAndClose(fd);
return new_fd;
}
LOG_INFO("ServerSocket::CreateBindListen: calling listen(%ld)\n", fd);
if (NO_RETRY_EXPECTED(listen(fd, backlog > 0 ? backlog : SOMAXCONN)) != 0) {
LOG_ERR("ServerSocket::CreateBindListen: listen failed(%ld)\n", fd);
FDUtils::SaveErrorAndClose(fd);
return -1;
}
LOG_INFO("ServerSocket::CreateBindListen: listen(%ld) succeeded\n", fd);
if (!FDUtils::SetNonBlocking(fd)) {
LOG_ERR("CreateBindListen: FDUtils::SetNonBlocking(%ld) failed\n", fd);
FDUtils::SaveErrorAndClose(fd);
return -1;
}
return fd;
}
bool ServerSocket::StartAccept(intptr_t fd) {
UNIMPLEMENTED();
return false;
USE(fd);
return true;
}
static bool IsTemporaryAcceptError(int error) {
// On Linux a number of protocol errors should be treated as EAGAIN.
// These are the ones for TCP/IP.
return (error == EAGAIN) || (error == ENETDOWN) || (error == EPROTO) ||
(error == ENOPROTOOPT) || (error == EHOSTDOWN) || (error == ENONET) ||
(error == EHOSTUNREACH) || (error == EOPNOTSUPP) ||
(error == ENETUNREACH);
}
intptr_t ServerSocket::Accept(intptr_t fd) {
UNIMPLEMENTED();
return -1;
intptr_t socket;
struct sockaddr clientaddr;
socklen_t addrlen = sizeof(clientaddr);
LOG_INFO("ServerSocket::Accept: calling accept(%ld)\n", fd);
socket = NO_RETRY_EXPECTED(accept(fd, &clientaddr, &addrlen));
if (socket == -1) {
if (IsTemporaryAcceptError(errno)) {
// We need to signal to the caller that this is actually not an
// error. We got woken up from the poll on the listening socket,
// but there is no connection ready to be accepted.
ASSERT(kTemporaryFailure != -1);
socket = kTemporaryFailure;
} else {
LOG_ERR("ServerSocket::Accept: accept(%ld) failed\n", fd);
}
} else {
LOG_INFO("ServerSocket::Accept: accept(%ld) -> socket %ld\n", fd, socket);
if (!FDUtils::SetCloseOnExec(socket)) {
LOG_ERR("FDUtils::SetCloseOnExec(%ld) failed\n", socket);
FDUtils::SaveErrorAndClose(socket);
return -1;
}
if (!FDUtils::SetNonBlocking(socket)) {
LOG_ERR("FDUtils::SetNonBlocking(%ld) failed\n", socket);
FDUtils::SaveErrorAndClose(socket);
return -1;
}
}
return socket;
}
void Socket::Close(intptr_t fd) {
UNIMPLEMENTED();
ASSERT(fd >= 0);
NO_RETRY_EXPECTED(close(fd));
}
bool Socket::GetNoDelay(intptr_t fd, bool* enabled) {
LOG_ERR("Socket::GetNoDelay is unimplemented\n");
UNIMPLEMENTED();
return false;
}
bool Socket::SetNoDelay(intptr_t fd, bool enabled) {
UNIMPLEMENTED();
return false;
int on = enabled ? 1 : 0;
return NO_RETRY_EXPECTED(setsockopt(fd, IPPROTO_TCP, TCP_NODELAY,
reinterpret_cast<char*>(&on),
sizeof(on))) == 0;
}
bool Socket::GetMulticastLoop(intptr_t fd, intptr_t protocol, bool* enabled) {
LOG_ERR("Socket::GetMulticastLoop is unimplemented\n");
UNIMPLEMENTED();
return false;
}
bool Socket::SetMulticastLoop(intptr_t fd, intptr_t protocol, bool enabled) {
LOG_ERR("Socket::SetMulticastLoop is unimplemented\n");
UNIMPLEMENTED();
return false;
}
bool Socket::GetMulticastHops(intptr_t fd, intptr_t protocol, int* value) {
LOG_ERR("Socket::GetMulticastHops is unimplemented\n");
UNIMPLEMENTED();
return false;
}
bool Socket::SetMulticastHops(intptr_t fd, intptr_t protocol, int value) {
LOG_ERR("Socket::SetMulticastHops is unimplemented\n");
UNIMPLEMENTED();
return false;
}
bool Socket::GetBroadcast(intptr_t fd, bool* enabled) {
LOG_ERR("Socket::GetBroadcast is unimplemented\n");
UNIMPLEMENTED();
return false;
}
bool Socket::SetBroadcast(intptr_t fd, bool enabled) {
LOG_ERR("Socket::SetBroadcast is unimplemented\n");
UNIMPLEMENTED();
return false;
}
@ -240,6 +531,7 @@ bool Socket::JoinMulticast(intptr_t fd,
const RawAddr& addr,
const RawAddr&,
int interfaceIndex) {
LOG_ERR("Socket::JoinMulticast is unimplemented\n");
UNIMPLEMENTED();
return false;
}
@ -249,6 +541,7 @@ bool Socket::LeaveMulticast(intptr_t fd,
const RawAddr& addr,
const RawAddr&,
int interfaceIndex) {
LOG_ERR("Socket::LeaveMulticast is unimplemented\n");
UNIMPLEMENTED();
return false;
}

View file

@ -9,4 +9,8 @@
#error Do not include socket_fuchsia.h directly. Use socket.h.
#endif
#include <arpa/inet.h>
#include <netdb.h>
#include <sys/socket.h>
#endif // RUNTIME_BIN_SOCKET_FUCHSIA_H_

View file

@ -28,13 +28,6 @@
namespace dart {
namespace bin {
static void SaveErrorAndClose(intptr_t fd) {
int err = errno;
VOID_TEMP_FAILURE_RETRY(close(fd));
errno = err;
}
SocketAddress::SocketAddress(struct sockaddr* sa) {
ASSERT(INET6_ADDRSTRLEN >= INET_ADDRSTRLEN);
if (!Socket::FormatNumericAddress(*reinterpret_cast<RawAddr*>(sa), as_string_,
@ -76,7 +69,7 @@ static intptr_t Connect(intptr_t fd, const RawAddr& addr) {
if ((result == 0) || (errno == EINPROGRESS)) {
return fd;
}
SaveErrorAndClose(fd);
FDUtils::FDUtils::SaveErrorAndClose(fd);
return -1;
}
@ -100,7 +93,7 @@ intptr_t Socket::CreateBindConnect(const RawAddr& addr,
intptr_t result = TEMP_FAILURE_RETRY(
bind(fd, &source_addr.addr, SocketAddress::GetAddrLength(source_addr)));
if ((result != 0) && (errno != EINPROGRESS)) {
SaveErrorAndClose(fd);
FDUtils::SaveErrorAndClose(fd);
return -1;
}
@ -329,7 +322,7 @@ intptr_t Socket::CreateBindDatagram(const RawAddr& addr, bool reuseAddress) {
if (NO_RETRY_EXPECTED(
bind(fd, &addr.addr, SocketAddress::GetAddrLength(addr))) < 0) {
SaveErrorAndClose(fd);
FDUtils::SaveErrorAndClose(fd);
return -1;
}
return fd;
@ -415,7 +408,7 @@ intptr_t ServerSocket::CreateBindListen(const RawAddr& addr,
if (NO_RETRY_EXPECTED(
bind(fd, &addr.addr, SocketAddress::GetAddrLength(addr))) < 0) {
SaveErrorAndClose(fd);
FDUtils::SaveErrorAndClose(fd);
return -1;
}
@ -425,12 +418,12 @@ intptr_t ServerSocket::CreateBindListen(const RawAddr& addr,
// Don't close the socket until we have created a new socket, ensuring
// that we do not get the bad port number again.
intptr_t new_fd = CreateBindListen(addr, backlog, v6_only);
SaveErrorAndClose(fd);
FDUtils::SaveErrorAndClose(fd);
return new_fd;
}
if (NO_RETRY_EXPECTED(listen(fd, backlog > 0 ? backlog : SOMAXCONN)) != 0) {
SaveErrorAndClose(fd);
FDUtils::SaveErrorAndClose(fd);
return -1;
}
@ -469,11 +462,11 @@ intptr_t ServerSocket::Accept(intptr_t fd) {
}
} else {
if (!FDUtils::SetCloseOnExec(socket)) {
SaveErrorAndClose(socket);
FDUtils::SaveErrorAndClose(socket);
return -1;
}
if (!FDUtils::SetNonBlocking(socket)) {
SaveErrorAndClose(socket);
FDUtils::SaveErrorAndClose(socket);
return -1;
}
}

View file

@ -27,7 +27,7 @@
namespace dart {
namespace bin {
static void SaveErrorAndClose(intptr_t fd) {
static void FDUtils::SaveErrorAndClose(intptr_t fd) {
int err = errno;
VOID_TEMP_FAILURE_RETRY(close(fd));
errno = err;
@ -65,11 +65,11 @@ static intptr_t Create(const RawAddr& addr) {
return -1;
}
if (!FDUtils::SetCloseOnExec(fd)) {
SaveErrorAndClose(fd);
FDUtils::SaveErrorAndClose(fd);
return -1;
}
if (!FDUtils::SetNonBlocking(fd)) {
SaveErrorAndClose(fd);
FDUtils::SaveErrorAndClose(fd);
return -1;
}
return fd;
@ -82,7 +82,7 @@ static intptr_t Connect(intptr_t fd, const RawAddr& addr) {
if ((result == 0) || (errno == EINPROGRESS)) {
return fd;
}
SaveErrorAndClose(fd);
FDUtils::SaveErrorAndClose(fd);
return -1;
}
@ -107,7 +107,7 @@ intptr_t Socket::CreateBindConnect(const RawAddr& addr,
intptr_t result = TEMP_FAILURE_RETRY(
bind(fd, &source_addr.addr, SocketAddress::GetAddrLength(source_addr)));
if ((result != 0) && (errno != EINPROGRESS)) {
SaveErrorAndClose(fd);
FDUtils::SaveErrorAndClose(fd);
return -1;
}
@ -318,7 +318,7 @@ intptr_t Socket::CreateBindDatagram(const RawAddr& addr, bool reuseAddress) {
}
if (!FDUtils::SetCloseOnExec(fd)) {
SaveErrorAndClose(fd);
FDUtils::SaveErrorAndClose(fd);
return -1;
}
@ -330,12 +330,12 @@ intptr_t Socket::CreateBindDatagram(const RawAddr& addr, bool reuseAddress) {
if (NO_RETRY_EXPECTED(
bind(fd, &addr.addr, SocketAddress::GetAddrLength(addr))) < 0) {
SaveErrorAndClose(fd);
FDUtils::SaveErrorAndClose(fd);
return -1;
}
if (!FDUtils::SetNonBlocking(fd)) {
SaveErrorAndClose(fd);
FDUtils::SaveErrorAndClose(fd);
return -1;
}
return fd;
@ -409,7 +409,7 @@ intptr_t ServerSocket::CreateBindListen(const RawAddr& addr,
}
if (!FDUtils::SetCloseOnExec(fd)) {
SaveErrorAndClose(fd);
FDUtils::SaveErrorAndClose(fd);
return -1;
}
@ -425,7 +425,7 @@ intptr_t ServerSocket::CreateBindListen(const RawAddr& addr,
if (NO_RETRY_EXPECTED(
bind(fd, &addr.addr, SocketAddress::GetAddrLength(addr))) < 0) {
SaveErrorAndClose(fd);
FDUtils::SaveErrorAndClose(fd);
return -1;
}
@ -435,17 +435,17 @@ intptr_t ServerSocket::CreateBindListen(const RawAddr& addr,
// Don't close the socket until we have created a new socket, ensuring
// that we do not get the bad port number again.
intptr_t new_fd = CreateBindListen(addr, backlog, v6_only);
SaveErrorAndClose(fd);
FDUtils::SaveErrorAndClose(fd);
return new_fd;
}
if (NO_RETRY_EXPECTED(listen(fd, backlog > 0 ? backlog : SOMAXCONN)) != 0) {
SaveErrorAndClose(fd);
FDUtils::SaveErrorAndClose(fd);
return -1;
}
if (!FDUtils::SetNonBlocking(fd)) {
SaveErrorAndClose(fd);
FDUtils::SaveErrorAndClose(fd);
return -1;
}
return fd;
@ -473,11 +473,11 @@ intptr_t ServerSocket::Accept(intptr_t fd) {
}
} else {
if (!FDUtils::SetCloseOnExec(socket)) {
SaveErrorAndClose(socket);
FDUtils::SaveErrorAndClose(socket);
return -1;
}
if (!FDUtils::SetNonBlocking(socket)) {
SaveErrorAndClose(socket);
FDUtils::SaveErrorAndClose(socket);
return -1;
}
}

View file

@ -2,6 +2,216 @@
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
main() {
print("Hello, Fuchsia!");
import "dart:async";
import "dart:io";
testSimpleBind() async {
var s = await RawServerSocket.bind(InternetAddress.LOOPBACK_IP_V4, 0);
print("port = ${s.port}");
await s.close();
}
testSimpleConnect() async {
var server = await RawServerSocket.bind(InternetAddress.LOOPBACK_IP_V4, 0);
print("server port = ${server.port}");
server.listen((socket) {
print("listen socket port = ${socket.port}");
socket.close();
});
var socket = await RawSocket.connect("127.0.0.1", server.port);
print("socket port = ${socket.port}");
await server.close();
await socket.close();
}
testSimpleReadWrite({bool dropReads}) async {
// This test creates a server and a client connects. The client then
// writes and the server echos. When the server has finished its
// echo it half-closes. When the client gets the close event is
// closes fully.
const messageSize = 1000;
int serverReadCount = 0;
int clientReadCount = 0;
List<int> createTestData() {
return new List<int>.generate(messageSize, (index) => index & 0xff);
}
void verifyTestData(List<int> data) {
assert(messageSize == data.length);
List<int> expected = createTestData();
for (int i = 0; i < messageSize; i++) {
assert(expected[i] == data[i]);
}
}
var server = await RawServerSocket.bind(InternetAddress.LOOPBACK_IP_V4, 0);
server.listen((client) {
int bytesRead = 0;
int bytesWritten = 0;
bool closedEventReceived = false;
List<int> data = new List<int>(messageSize);
bool doneReading = false;
client.writeEventsEnabled = false;
client.listen((event) {
switch (event) {
case RawSocketEvent.READ:
if (doneReading) {
break;
}
if (dropReads) {
if (serverReadCount != 10) {
serverReadCount++;
break;
} else {
serverReadCount = 0;
}
}
print("client READ event bytesRead = $bytesRead");
assert(bytesWritten == 0);
assert(client.available() > 0);
var buffer = client.read(200);
print("client READ event: read ${buffer.length} more bytes");
data.setRange(bytesRead, bytesRead + buffer.length, buffer);
bytesRead += buffer.length;
if (bytesRead == data.length) {
verifyTestData(data);
print("client READ event. Done reading, enabling writes");
client.writeEventsEnabled = true;
doneReading = true;
}
break;
case RawSocketEvent.WRITE:
assert(!client.writeEventsEnabled);
bytesWritten += client.write(
data, bytesWritten, data.length - bytesWritten);
print("client WRITE event: $bytesWritten written");
if (bytesWritten < data.length) {
client.writeEventsEnabled = true;
}
if (bytesWritten == data.length) {
print("client WRITE event: done writing.");
client.shutdown(SocketDirection.SEND);
}
break;
case RawSocketEvent.READ_CLOSED:
print("client READ_CLOSED event");
server.close();
break;
case RawSocketEvent.CLOSED:
assert(!closedEventReceived);
print("client CLOSED event");
closedEventReceived = true;
break;
default: throw "Unexpected event $event";
}
},
onDone: () {assert(closedEventReceived);});
});
{
var completer = new Completer();
var socket = await RawSocket.connect("127.0.0.1", server.port);
int bytesRead = 0;
int bytesWritten = 0;
bool closedEventReceived = false;
List<int> data = createTestData();
socket.listen((event) {
switch (event) {
case RawSocketEvent.READ:
assert(socket.available() > 0);
if (dropReads) {
if (clientReadCount != 10) {
clientReadCount++;
break;
} else {
clientReadCount = 0;
}
}
print("server READ event: ${bytesRead} read");
var buffer = socket.read();
print("server READ event: read ${buffer.length} more bytes");
data.setRange(bytesRead, bytesRead + buffer.length, buffer);
bytesRead += buffer.length;
break;
case RawSocketEvent.WRITE:
assert(bytesRead == 0);
assert(!socket.writeEventsEnabled);
bytesWritten += socket.write(
data, bytesWritten, data.length - bytesWritten);
print("server WRITE event: ${bytesWritten} written");
if (bytesWritten < data.length) {
socket.writeEventsEnabled = true;
} else {
print("server WRITE event: done writing");
data = new List<int>(messageSize);
}
break;
case RawSocketEvent.READ_CLOSED:
print("server READ_CLOSED event");
verifyTestData(data);
socket.close();
break;
case RawSocketEvent.CLOSED:
assert(!closedEventReceived);
print("server CLOSED event");
closedEventReceived = true;
break;
default: throw "Unexpected event $event";
}
},
onDone: () {
assert(closedEventReceived);
completer.complete(null);
});
return completer.future;
}
}
Future testGoogleUrl(SecurityContext context, String outcome) async {
var client = new HttpClient(context: context);
// We need to use an external server that is backed by a
// built-in root certificate authority.
try {
// First, check if the lookup works.
await InternetAddress.lookup('www.google.com');
var request = await client.getUrl(Uri.parse('http://www.google.com'));
request.followRedirects = false;
var response = await request.close();
assert('pass' == outcome);
try { await response.drain(); } catch (e) {
print('drain failed: $e');
}
} catch (e) {
// Lookup failed or connection failed. Don't report a failure.
print("SocketException: $e");
} finally {
client.close();
}
}
main() async {
print("Hello, Fuchsia!");
print("testSimpleBind");
await testSimpleBind();
print("testSimpleBind done");
print("testSimpleConnect");
await testSimpleConnect();
print("testSimpleConnect done");
// print("testSimpleReadWrite");
// await testSimpleReadWrite(dropReads: false);
// print("testSimpleReadWrite done");
// print("testGoogleUrl");
// await testGoogleUrl(null, 'pass');
// print("testGoogleUrl done");
print("Goodbyte, Fuchsia!");
}