[dart:io] Adds a finalizer to _NativeSocket to avoid socket leaks

The finalizer sends the "close" message to the EventHandler for the
file descriptor in the _NativeSocket's native field. To avoid races and
spurious messages, this CL stores a pointer to a wrapper object in the
native field instead of the file descriptor. All messsages about the
_NativeSocket sent to the EventHandler use the wrapper object instead of
the file descriptor. When the EventHandler closes the file, the file
descriptor in the wrapper object is set to -1 so that the finalizer will
instead do nothing.

On Windows, there is another level of indirection since the OS HANDLEs
were already wrapped in various kinds of Handle objects. As an additional
complication, ClientSocket close on Windows is asynchronous, so the
EventHandler may shutdown before all of the ClientSocket Handles can be
destroyed.

related #27898, #28081

R=johnmccutchan@google.com

Review-Url: https://codereview.chromium.org/2760293002 .
This commit is contained in:
Zachary Anderson 2017-03-28 07:44:04 -07:00
parent 011a5a6a66
commit 89dba57bcf
23 changed files with 563 additions and 174 deletions

View file

@ -33,6 +33,7 @@ void ErrorExit(int exit_code, const char* format, ...) {
free(error);
}
Process::ClearAllSignalHandlers();
EventHandler::Stop();
Platform::Exit(exit_code);
}

View file

@ -112,18 +112,16 @@ EventHandlerImplementation* EventHandler::delegate() {
}
void EventHandler::SendFromNative(intptr_t id, Dart_Port port, int64_t data) {
event_handler->SendData(id, port, data);
}
/*
* Send data to the EventHandler thread to register for a given instance
* args[0] a ReceivePort args[1] with a notification event args[2].
*/
void FUNCTION_NAME(EventHandler_SendData)(Dart_NativeArguments args) {
Dart_Handle sender = Dart_GetNativeArgument(args, 0);
intptr_t id;
if (Dart_IsNull(sender)) {
id = kTimerId;
} else {
id = Socket::GetSocketIdNativeField(sender);
}
// Get the id out of the send port. If the handle is not a send port
// we will get an error and propagate that out.
Dart_Handle handle = Dart_GetNativeArgument(args, 1);
@ -133,6 +131,17 @@ void FUNCTION_NAME(EventHandler_SendData)(Dart_NativeArguments args) {
Dart_PropagateError(handle);
UNREACHABLE();
}
Dart_Handle sender = Dart_GetNativeArgument(args, 0);
intptr_t id;
if (Dart_IsNull(sender)) {
id = kTimerId;
} else {
Socket* socket = Socket::GetSocketIdNativeField(sender);
ASSERT(dart_port != ILLEGAL_PORT);
socket->set_port(dart_port);
socket->Retain(); // inc refcount before sending to the eventhandler.
id = reinterpret_cast<intptr_t>(socket);
}
int64_t data = DartUtils::GetIntegerValue(Dart_GetNativeArgument(args, 2));
event_handler->SendData(id, dart_port, data);
}

View file

@ -647,6 +647,8 @@ class EventHandler {
static EventHandlerImplementation* delegate();
static void SendFromNative(intptr_t id, Dart_Port port, int64_t data);
private:
friend class EventHandlerImplementation;
EventHandlerImplementation delegate_;

View file

@ -205,9 +205,13 @@ void EventHandlerImplementation::HandleInterruptFd() {
shutdown_ = true;
} else {
ASSERT((msg[i].data & COMMAND_MASK) != 0);
Socket* socket = reinterpret_cast<Socket*>(msg[i].id);
RefCntReleaseScope<Socket> rs(socket);
if (socket->fd() == -1) {
continue;
}
DescriptorInfo* di =
GetDescriptorInfo(msg[i].id, IS_LISTENING_SOCKET(msg[i].data));
GetDescriptorInfo(socket->fd(), IS_LISTENING_SOCKET(msg[i].data));
if (IS_COMMAND(msg[i].data, kShutdownReadCommand)) {
ASSERT(!di->IsListeningSocket());
// Close the socket for reading.
@ -235,18 +239,20 @@ void EventHandlerImplementation::HandleInterruptFd() {
MutexLocker locker(registry->mutex());
if (registry->CloseSafe(fd)) {
if (registry->CloseSafe(socket)) {
ASSERT(new_mask == 0);
socket_map_.Remove(GetHashmapKeyFromFd(fd),
GetHashmapHashFromFd(fd));
di->Close();
delete di;
socket->SetClosedFd();
}
} else {
ASSERT(new_mask == 0);
socket_map_.Remove(GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd));
di->Close();
delete di;
socket->SetClosedFd();
}
DartUtils::PostInt32(port, 1 << kDestroyedEvent);
@ -405,6 +411,7 @@ void EventHandlerImplementation::Poll(uword args) {
handler_impl->HandleEvents(events, result);
}
}
DEBUG_ASSERT(ReferenceCounted<Socket>::instances() == 0);
handler->NotifyShutdownDone();
}

View file

@ -279,8 +279,13 @@ void EventHandlerImplementation::HandleInterruptFd() {
} else {
ASSERT((msg[i].data & COMMAND_MASK) != 0);
LOG_INFO("HandleInterruptFd command\n");
Socket* socket = reinterpret_cast<Socket*>(msg[i].id);
RefCntReleaseScope<Socket> rs(socket);
if (socket->fd() == -1) {
continue;
}
DescriptorInfo* di =
GetDescriptorInfo(msg[i].id, IS_LISTENING_SOCKET(msg[i].data));
GetDescriptorInfo(socket->fd(), IS_LISTENING_SOCKET(msg[i].data));
if (IS_COMMAND(msg[i].data, kShutdownReadCommand)) {
ASSERT(!di->IsListeningSocket());
// Close the socket for reading.
@ -311,13 +316,14 @@ void EventHandlerImplementation::HandleInterruptFd() {
MutexLocker locker(registry->mutex());
if (registry->CloseSafe(fd)) {
if (registry->CloseSafe(socket)) {
ASSERT(new_mask == 0);
socket_map_.Remove(GetHashmapKeyFromFd(fd),
GetHashmapHashFromFd(fd));
di->Close();
LOG_INFO("Closed %d\n", di->fd());
delete di;
socket->SetClosedFd();
}
} else {
ASSERT(new_mask == 0);
@ -325,6 +331,7 @@ void EventHandlerImplementation::HandleInterruptFd() {
di->Close();
LOG_INFO("Closed %d\n", di->fd());
delete di;
socket->SetClosedFd();
}
bool success = DartUtils::PostInt32(port, 1 << kDestroyedEvent);
@ -468,6 +475,7 @@ void EventHandlerImplementation::Poll(uword args) {
handler_impl->HandleEvents(events, result);
}
}
DEBUG_ASSERT(ReferenceCounted<Socket>::instances() == 0);
handler->NotifyShutdownDone();
}

View file

@ -222,9 +222,13 @@ void EventHandlerImplementation::HandleInterruptFd() {
shutdown_ = true;
} else {
ASSERT((msg[i].data & COMMAND_MASK) != 0);
Socket* socket = reinterpret_cast<Socket*>(msg[i].id);
RefCntReleaseScope<Socket> rs(socket);
if (socket->fd() == -1) {
continue;
}
DescriptorInfo* di =
GetDescriptorInfo(msg[i].id, IS_LISTENING_SOCKET(msg[i].data));
GetDescriptorInfo(socket->fd(), IS_LISTENING_SOCKET(msg[i].data));
if (IS_COMMAND(msg[i].data, kShutdownReadCommand)) {
ASSERT(!di->IsListeningSocket());
// Close the socket for reading.
@ -252,20 +256,21 @@ void EventHandlerImplementation::HandleInterruptFd() {
MutexLocker locker(registry->mutex());
if (registry->CloseSafe(fd)) {
if (registry->CloseSafe(socket)) {
ASSERT(new_mask == 0);
socket_map_.Remove(GetHashmapKeyFromFd(fd),
GetHashmapHashFromFd(fd));
di->Close();
delete di;
socket->SetClosedFd();
}
} else {
ASSERT(new_mask == 0);
socket_map_.Remove(GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd));
di->Close();
delete di;
socket->SetClosedFd();
}
DartUtils::PostInt32(port, 1 << kDestroyedEvent);
} else if (IS_COMMAND(msg[i].data, kReturnTokenCommand)) {
int count = TOKEN_COUNT(msg[i].data);
@ -402,6 +407,7 @@ void EventHandlerImplementation::Poll(uword args) {
handler_impl->HandleEvents(events, result);
}
}
DEBUG_ASSERT(ReferenceCounted<Socket>::instances() == 0);
handler->NotifyShutdownDone();
}

View file

@ -222,9 +222,13 @@ void EventHandlerImplementation::HandleInterruptFd() {
shutdown_ = true;
} else {
ASSERT((msg[i].data & COMMAND_MASK) != 0);
Socket* socket = reinterpret_cast<Socket*>(msg[i].id);
RefCntReleaseScope<Socket> rs(socket);
if (socket->fd() == -1) {
continue;
}
DescriptorInfo* di =
GetDescriptorInfo(msg[i].id, IS_LISTENING_SOCKET(msg[i].data));
GetDescriptorInfo(socket->fd(), IS_LISTENING_SOCKET(msg[i].data));
if (IS_COMMAND(msg[i].data, kShutdownReadCommand)) {
ASSERT(!di->IsListeningSocket());
// Close the socket for reading.
@ -252,18 +256,20 @@ void EventHandlerImplementation::HandleInterruptFd() {
MutexLocker locker(registry->mutex());
if (registry->CloseSafe(fd)) {
if (registry->CloseSafe(socket)) {
ASSERT(new_mask == 0);
socket_map_.Remove(GetHashmapKeyFromFd(fd),
GetHashmapHashFromFd(fd));
di->Close();
delete di;
socket->SetClosedFd();
}
} else {
ASSERT(new_mask == 0);
socket_map_.Remove(GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd));
di->Close();
delete di;
socket->SetClosedFd();
}
DartUtils::PostInt32(port, 1 << kDestroyedEvent);
@ -469,6 +475,7 @@ void EventHandlerImplementation::EventHandlerEntry(uword args) {
handler_impl->HandleEvents(events, result);
}
}
DEBUG_ASSERT(ReferenceCounted<Socket>::instances() == 0);
handler->NotifyShutdownDone();
}

View file

@ -115,8 +115,10 @@ int OverlappedBuffer::GetRemainingLength() {
return data_length_ - index_;
}
Handle::Handle(intptr_t handle)
: DescriptorInfoBase(handle),
: ReferenceCounted(),
DescriptorInfoBase(handle),
handle_(reinterpret_cast<HANDLE>(handle)),
completion_port_(INVALID_HANDLE_VALUE),
event_handler_(NULL),
@ -138,6 +140,10 @@ Handle::~Handle() {
bool Handle::CreateCompletionPort(HANDLE completion_port) {
ASSERT(completion_port_ == INVALID_HANDLE_VALUE);
// A reference to the Handle is Retained by the IO completion port.
// It is Released by DeleteIfClosed.
Retain();
completion_port_ = CreateIoCompletionPort(
handle(), completion_port, reinterpret_cast<ULONG_PTR>(this), 0);
return (completion_port_ != NULL);
@ -392,8 +398,16 @@ void Handle::HandleIssueError() {
void FileHandle::EnsureInitialized(EventHandlerImplementation* event_handler) {
MonitorLocker ml(monitor_);
event_handler_ = event_handler;
if (SupportsOverlappedIO() && (completion_port_ == INVALID_HANDLE_VALUE)) {
CreateCompletionPort(event_handler_->completion_port());
if (completion_port_ == INVALID_HANDLE_VALUE) {
if (SupportsOverlappedIO()) {
CreateCompletionPort(event_handler_->completion_port());
} else {
// We need to retain the Handle even if overlapped IO is not supported.
// It is Released by DeleteIfClosed after ReadSyncCompleteAsync
// manually puts an event on the IO completion port.
Retain();
completion_port_ = event_handler_->completion_port();
}
}
}
@ -546,9 +560,13 @@ void ListenSocket::AcceptComplete(OverlappedBuffer* buffer,
static void DeleteIfClosed(Handle* handle) {
if (handle->IsClosed()) {
handle->set_completion_port(INVALID_HANDLE_VALUE);
handle->set_event_handler(NULL);
handle->NotifyAllDartPorts(1 << kDestroyedEvent);
handle->RemoveAllPorts();
delete handle;
// Once the Handle is closed, no further events on the IO completion port
// will mention it. Thus, we can drop the reference here.
handle->Release();
}
}
@ -561,11 +579,22 @@ void ListenSocket::DoClose() {
ClientSocket* client = Accept();
if (client != NULL) {
client->Close();
// Release the reference from the list.
// When an accept completes, we make a new ClientSocket (1 reference),
// and add it to the IO completion port (1 more reference). If an
// accepted connection is never requested by the Dart code, then
// this list owns a reference (first Release), and the IO completion
// port owns a reference, (second Release in DeleteIfClosed).
client->Release();
DeleteIfClosed(client);
} else {
break;
}
}
// To finish resetting the state of the ListenSocket back to what it was
// before EnsureInitialized was called, we have to reset the AcceptEx_
// function pointer.
AcceptEx_ = NULL;
}
@ -792,6 +821,10 @@ intptr_t StdHandle::Write(const void* buffer, intptr_t num_bytes) {
}
if (!write_thread_exists_) {
write_thread_exists_ = true;
// The write thread gets a reference to the Handle, which it places in
// the events it puts on the IO completion port. The reference is
// Released by DeleteIfClosed.
Retain();
int result = Thread::Start(WriteFileThread, reinterpret_cast<uword>(this));
if (result != 0) {
FATAL1("Failed to start write file thread %d", result);
@ -828,6 +861,11 @@ void StdHandle::DoClose() {
}
#if defined(DEBUG)
intptr_t ClientSocket::disconnecting_ = 0;
#endif
bool ClientSocket::LoadDisconnectEx() {
// Load the DisconnectEx function into memory using WSAIoctl.
GUID guid_disconnect_ex = WSAID_DISCONNECTEX;
@ -914,8 +952,18 @@ void ClientSocket::IssueDisconnect() {
if (ok || (WSAGetLastError() != WSA_IO_PENDING)) {
DisconnectComplete(buffer);
}
// When the Dart side receives this event, it may decide to close its Dart
// ports. When all ports are closed, the VM will shut down. The EventHandler
// will then shut down. If the EventHandler shuts down before this
// asynchronous disconnect finishes, this ClientSocket will be leaked.
// TODO(dart:io): Retain a list of client sockets that are in the process of
// disconnecting. Disconnect them forcefully, and clean up their resources
// when the EventHandler shuts down.
NotifyAllDartPorts(1 << kDestroyedEvent);
RemoveAllPorts();
#if defined(DEBUG)
disconnecting_++;
#endif
}
@ -926,6 +974,9 @@ void ClientSocket::DisconnectComplete(OverlappedBuffer* buffer) {
OverlappedBuffer::DisposeBuffer(data_ready_);
}
mark_closed();
#if defined(DEBUG)
disconnecting_--;
#endif
}
@ -1037,7 +1088,12 @@ void EventHandlerImplementation::HandleInterrupt(InterruptMessage* msg) {
} else if (msg->id == kShutdownId) {
shutdown_ = true;
} else {
Handle* handle = reinterpret_cast<Handle*>(msg->id);
Socket* socket = reinterpret_cast<Socket*>(msg->id);
RefCntReleaseScope<Socket> rs(socket);
if (socket->fd() == -1) {
return;
}
Handle* handle = reinterpret_cast<Handle*>(socket->fd());
ASSERT(handle != NULL);
if (handle->is_listen_socket()) {
@ -1062,9 +1118,10 @@ void EventHandlerImplementation::HandleInterrupt(InterruptMessage* msg) {
// are listening on the same (address, port) combination.
ListeningSocketRegistry* registry = ListeningSocketRegistry::Instance();
MutexLocker locker(registry->mutex());
if (registry->CloseSafe(reinterpret_cast<intptr_t>(listen_socket))) {
if (registry->CloseSafe(socket)) {
ASSERT(listen_socket->Mask() == 0);
listen_socket->Close();
socket->SetClosedFd();
}
DartUtils::PostInt32(msg->dart_port, 1 << kDestroyedEvent);
@ -1132,6 +1189,7 @@ void EventHandlerImplementation::HandleInterrupt(InterruptMessage* msg) {
} else if (IS_COMMAND(msg->data, kCloseCommand)) {
handle->SetPortAndMask(msg->dart_port, 0);
handle->Close();
socket->SetClosedFd();
} else {
UNREACHABLE();
}
@ -1313,6 +1371,44 @@ void EventHandlerImplementation::HandleIOCompletion(DWORD bytes,
}
void EventHandlerImplementation::HandleCompletionOrInterrupt(
BOOL ok,
DWORD bytes,
ULONG_PTR key,
OVERLAPPED* overlapped) {
if (!ok) {
// Treat ERROR_CONNECTION_ABORTED as connection closed.
// The error ERROR_OPERATION_ABORTED is set for pending
// accept requests for a listen socket which is closed.
// ERROR_NETNAME_DELETED occurs when the client closes
// the socket it is reading from.
DWORD last_error = GetLastError();
if ((last_error == ERROR_CONNECTION_ABORTED) ||
(last_error == ERROR_OPERATION_ABORTED) ||
(last_error == ERROR_NETNAME_DELETED) ||
(last_error == ERROR_BROKEN_PIPE)) {
ASSERT(bytes == 0);
HandleIOCompletion(bytes, key, overlapped);
} else if (last_error == ERROR_MORE_DATA) {
// Don't ASSERT no bytes in this case. This can happen if the receive
// buffer for datagram sockets is too small to contain a full datagram,
// and in this case bytes hold the bytes that was read.
HandleIOCompletion(-1, key, overlapped);
} else {
ASSERT(bytes == 0);
HandleIOCompletion(-1, key, overlapped);
}
} else if (key == NULL) {
// A key of NULL signals an interrupt message.
InterruptMessage* msg = reinterpret_cast<InterruptMessage*>(overlapped);
HandleInterrupt(msg);
delete msg;
} else {
HandleIOCompletion(bytes, key, overlapped);
}
}
EventHandlerImplementation::EventHandlerImplementation() {
startup_monitor_ = new Monitor();
handler_thread_id_ = Thread::kInvalidThreadId;
@ -1374,19 +1470,20 @@ void EventHandlerImplementation::EventHandlerEntry(uword args) {
ml.Notify();
}
DWORD bytes;
ULONG_PTR key;
OVERLAPPED* overlapped;
BOOL ok;
while (!handler_impl->shutdown_) {
DWORD bytes;
ULONG_PTR key;
OVERLAPPED* overlapped;
int64_t millis = handler_impl->GetTimeout();
ASSERT(millis == kInfinityTimeout || millis >= 0);
if (millis > kMaxInt32) {
millis = kMaxInt32;
}
ASSERT(sizeof(int32_t) == sizeof(DWORD));
BOOL ok =
GetQueuedCompletionStatus(handler_impl->completion_port(), &bytes, &key,
&overlapped, static_cast<DWORD>(millis));
DWORD timeout = static_cast<DWORD>(millis);
ok = GetQueuedCompletionStatus(handler_impl->completion_port(), &bytes,
&key, &overlapped, timeout);
if (!ok && (overlapped == NULL)) {
if (GetLastError() == ERROR_ABANDONED_WAIT_0) {
@ -1397,37 +1494,35 @@ void EventHandlerImplementation::EventHandlerEntry(uword args) {
// Timeout is signalled by false result and NULL in overlapped.
handler_impl->HandleTimeout();
}
} else if (!ok) {
// Treat ERROR_CONNECTION_ABORTED as connection closed.
// The error ERROR_OPERATION_ABORTED is set for pending
// accept requests for a listen socket which is closed.
// ERROR_NETNAME_DELETED occurs when the client closes
// the socket it is reading from.
DWORD last_error = GetLastError();
if ((last_error == ERROR_CONNECTION_ABORTED) ||
(last_error == ERROR_OPERATION_ABORTED) ||
(last_error == ERROR_NETNAME_DELETED) ||
(last_error == ERROR_BROKEN_PIPE)) {
ASSERT(bytes == 0);
handler_impl->HandleIOCompletion(bytes, key, overlapped);
} else if (last_error == ERROR_MORE_DATA) {
// Don't ASSERT no bytes in this case. This can happen if the receive
// buffer for datagram sockets is to small to contain a full datagram,
// and in this case bytes hold the bytes that was read.
handler_impl->HandleIOCompletion(-1, key, overlapped);
} else {
ASSERT(bytes == 0);
handler_impl->HandleIOCompletion(-1, key, overlapped);
}
} else if (key == NULL) {
// A key of NULL signals an interrupt message.
InterruptMessage* msg = reinterpret_cast<InterruptMessage*>(overlapped);
handler_impl->HandleInterrupt(msg);
delete msg;
} else {
handler_impl->HandleIOCompletion(bytes, key, overlapped);
handler_impl->HandleCompletionOrInterrupt(ok, bytes, key, overlapped);
}
}
// In a Debug build, drain the IO completion port to make sure we aren't
// leaking any (non-disconnecting) Handles. In a Release build, we don't care
// because the VM is going down, and the asserts below are Debug-only.
#if defined(DEBUG)
while (true) {
ok = GetQueuedCompletionStatus(handler_impl->completion_port(), &bytes,
&key, &overlapped, 0);
if (!ok && (overlapped == NULL)) {
// There was an error or nothing is ready. Assume the port is drained.
break;
}
handler_impl->HandleCompletionOrInterrupt(ok, bytes, key, overlapped);
}
#endif
// The eventhandler thread is going down so there should be no more live
// Handles or Sockets.
// TODO(dart:io): It would be nice to be able to assert here that:
// ReferenceCounted<Handle>::instances() == 0;
// However, we cannot at the moment. See the TODO on:
// ClientSocket::IssueDisconnect()
DEBUG_ASSERT(ReferenceCounted<Handle>::instances() ==
ClientSocket::disconnecting());
DEBUG_ASSERT(ReferenceCounted<Socket>::instances() == 0);
handler->NotifyShutdownDone();
}

View file

@ -14,6 +14,7 @@
#include <ws2tcpip.h>
#include "bin/builtin.h"
#include "bin/reference_counting.h"
#include "bin/thread.h"
namespace dart {
@ -70,15 +71,15 @@ class OverlappedBuffer {
int GetRemainingLength();
bool IsEmpty() { return GetRemainingLength() == 0; }
Operation operation() { return operation_; }
SOCKET client() { return client_; }
Operation operation() const { return operation_; }
SOCKET client() const { return client_; }
char* GetBufferStart() { return reinterpret_cast<char*>(&buffer_data_); }
int GetBufferSize() { return buflen_; }
struct sockaddr* from() {
int GetBufferSize() const { return buflen_; }
struct sockaddr* from() const {
return from_;
}
socklen_t* from_len_addr() { return from_len_addr_; }
socklen_t from_len() { return from_ == NULL ? 0 : *from_len_addr_; }
socklen_t* from_len_addr() const { return from_len_addr_; }
socklen_t from_len() const { return from_ == NULL ? 0 : *from_len_addr_; }
// Returns the address of the OVERLAPPED structure with all fields
// initialized to zero.
@ -159,7 +160,7 @@ class OverlappedBuffer {
// Abstract super class for holding information on listen and connected
// sockets.
class Handle : public DescriptorInfoBase {
class Handle : public ReferenceCounted<Handle>, public DescriptorInfoBase {
public:
enum Type {
kFile,
@ -170,8 +171,6 @@ class Handle : public DescriptorInfoBase {
kDatagramSocket
};
virtual ~Handle();
// Socket interface exposing normal socket operations.
intptr_t Available();
intptr_t Read(void* buffer, intptr_t num_bytes);
@ -239,6 +238,14 @@ class Handle : public DescriptorInfoBase {
DWORD last_error() { return last_error_; }
void set_last_error(DWORD last_error) { last_error_ = last_error; }
void set_completion_port(HANDLE completion_port) {
completion_port_ = completion_port;
}
void set_event_handler(EventHandlerImplementation* event_handler) {
event_handler_ = event_handler;
}
protected:
// For access to monitor_;
friend class EventHandlerImplementation;
@ -252,6 +259,7 @@ class Handle : public DescriptorInfoBase {
};
explicit Handle(intptr_t handle);
virtual ~Handle();
virtual void HandleIssueError();
@ -280,6 +288,7 @@ class Handle : public DescriptorInfoBase {
int flags_;
friend class ReferenceCounted<Handle>;
DISALLOW_COPY_AND_ASSIGN(Handle);
};
@ -468,6 +477,10 @@ class ClientSocket : public DescriptorInfoSingleMixin<SocketHandle> {
void mark_closed() { closed_ = true; }
#if defined(DEBUG)
static intptr_t disconnecting() { return disconnecting_; }
#endif
private:
bool LoadDisconnectEx();
@ -476,6 +489,10 @@ class ClientSocket : public DescriptorInfoSingleMixin<SocketHandle> {
bool connected_;
bool closed_;
#if defined(DEBUG)
static intptr_t disconnecting_;
#endif
DISALLOW_COPY_AND_ASSIGN(ClientSocket);
};
@ -533,11 +550,14 @@ class EventHandlerImplementation {
OverlappedBuffer* buffer);
void HandleIOCompletion(DWORD bytes, ULONG_PTR key, OVERLAPPED* overlapped);
void HandleCompletionOrInterrupt(BOOL ok,
DWORD bytes,
ULONG_PTR key,
OVERLAPPED* overlapped);
HANDLE completion_port() { return completion_port_; }
private:
ClientSocket* client_sockets_head_;
Monitor* startup_monitor_;
ThreadId handler_thread_id_;
HANDLE handler_thread_handle_;

View file

@ -1269,6 +1269,7 @@ bool RunMainIsolate(const char* script_name, CommandLineOptions* dart_options) {
Log::PrintErr("VM cleanup failed: %s\n", error);
free(error);
}
Process::ClearAllSignalHandlers();
EventHandler::Stop();
Platform::Exit((exit_code != 0) ? exit_code : kErrorExitCode);
}
@ -1687,6 +1688,7 @@ void main(int argc, char** argv) {
Log::PrintErr("VM cleanup failed: %s\n", error);
free(error);
}
Process::ClearAllSignalHandlers();
EventHandler::Stop();
delete app_snapshot;

View file

@ -72,6 +72,14 @@ static char** ExtractCStringList(Dart_Handle strings,
return string_args;
}
void Process::ClearAllSignalHandlers() {
for (intptr_t i = 1; i <= kLastSignal; i++) {
ClearSignalHandler(i);
}
}
void FUNCTION_NAME(Process_Start)(Dart_NativeArguments args) {
Dart_Handle process = Dart_GetNativeArgument(args, 0);
intptr_t process_stdin;
@ -153,12 +161,12 @@ void FUNCTION_NAME(Process_Start)(Dart_NativeArguments args) {
&process_stdin, &process_stderr, &pid, &exit_event, &os_error_message);
if (error_code == 0) {
if (mode != kDetached) {
Socket::SetSocketIdNativeField(stdin_handle, process_stdin);
Socket::SetSocketIdNativeField(stdout_handle, process_stdout);
Socket::SetSocketIdNativeField(stderr_handle, process_stderr);
Socket::SetSocketIdNativeField(stdin_handle, process_stdin, false);
Socket::SetSocketIdNativeField(stdout_handle, process_stdout, false);
Socket::SetSocketIdNativeField(stderr_handle, process_stderr, false);
}
if (mode == kNormal) {
Socket::SetSocketIdNativeField(exit_handle, exit_event);
Socket::SetSocketIdNativeField(exit_handle, exit_event, false);
}
Process::SetProcessIdNativeField(process, pid);
} else {
@ -181,19 +189,26 @@ void FUNCTION_NAME(Process_Start)(Dart_NativeArguments args) {
void FUNCTION_NAME(Process_Wait)(Dart_NativeArguments args) {
Dart_Handle process = Dart_GetNativeArgument(args, 0);
intptr_t process_stdin =
Socket* process_stdin =
Socket::GetSocketIdNativeField(Dart_GetNativeArgument(args, 1));
intptr_t process_stdout =
Socket* process_stdout =
Socket::GetSocketIdNativeField(Dart_GetNativeArgument(args, 2));
intptr_t process_stderr =
Socket* process_stderr =
Socket::GetSocketIdNativeField(Dart_GetNativeArgument(args, 3));
intptr_t exit_event =
Socket* exit_event =
Socket::GetSocketIdNativeField(Dart_GetNativeArgument(args, 4));
ProcessResult result;
intptr_t pid;
Process::GetProcessIdNativeField(process, &pid);
if (Process::Wait(pid, process_stdin, process_stdout, process_stderr,
exit_event, &result)) {
bool success = Process::Wait(pid, process_stdin->fd(), process_stdout->fd(),
process_stderr->fd(), exit_event->fd(), &result);
// Process::Wait() closes the file handles, so blow away the fds in the
// Sockets so that they don't get picked up by the finalizer on _NativeSocket.
process_stdin->SetClosedFd();
process_stdout->SetClosedFd();
process_stderr->SetClosedFd();
exit_event->SetClosedFd();
if (success) {
Dart_Handle out = result.stdout_data();
if (Dart_IsError(out)) {
Dart_PropagateError(out);

View file

@ -73,7 +73,8 @@ enum ProcessSignals {
kSigprof = 27,
kSigwinch = 28,
kSigpoll = 29,
kSigsys = 31
kSigsys = 31,
kLastSignal = kSigsys,
};
@ -139,6 +140,7 @@ class Process {
static intptr_t SetSignalHandler(intptr_t signal);
static void ClearSignalHandler(intptr_t signal);
static void ClearAllSignalHandlers();
static Dart_Handle GetProcessIdNativeField(Dart_Handle process,
intptr_t* pid);

View file

@ -946,7 +946,9 @@ static Mutex* signal_mutex = new Mutex();
SignalInfo::~SignalInfo() {
reinterpret_cast<FileHandle*>(fd_)->Close();
FileHandle* file_handle = reinterpret_cast<FileHandle*>(fd_);
file_handle->Close();
file_handle->Release();
}
@ -1005,7 +1007,12 @@ intptr_t Process::SetSignalHandler(intptr_t signal) {
if (signal_handlers == NULL) {
if (SetConsoleCtrlHandler(SignalHandler, true) == 0) {
int error_code = GetLastError();
delete write_handle;
// Since SetConsoleCtrlHandler failed, the IO completion port will
// never receive an event for this handle, and will therefore never
// release the reference Retained by EnsureInitialized(). So, we
// have to do a second Release() here.
write_handle->Release();
write_handle->Release();
CloseProcessPipe(fds);
SetLastError(error_code);
return -1;
@ -1038,6 +1045,8 @@ void Process::ClearSignalHandler(intptr_t signal) {
if (signal_handlers == NULL) {
USE(SetConsoleCtrlHandler(SignalHandler, false));
}
FileHandle* file_handle = reinterpret_cast<FileHandle*>(handler->fd());
file_handle->Release();
}
delete handler;
}

View file

@ -33,32 +33,54 @@ class RefCntReleaseScope;
template <class Derived>
class ReferenceCounted {
public:
ReferenceCounted() : ref_count_(1) {}
ReferenceCounted() : ref_count_(1) {
#if defined(DEBUG)
AtomicOperations::FetchAndIncrement(&instances_);
#endif // defined(DEBUG)
}
~ReferenceCounted() { ASSERT(ref_count_ == 0); }
virtual ~ReferenceCounted() {
ASSERT(ref_count_ == 0);
#if defined(DEBUG)
AtomicOperations::FetchAndDecrement(&instances_);
#endif // defined(DEBUG)
}
void Retain() {
uintptr_t old = AtomicOperations::FetchAndIncrement(&ref_count_);
intptr_t old = AtomicOperations::FetchAndIncrement(&ref_count_);
ASSERT(old > 0);
}
void Release() {
uintptr_t old = AtomicOperations::FetchAndDecrement(&ref_count_);
intptr_t old = AtomicOperations::FetchAndDecrement(&ref_count_);
ASSERT(old > 0);
if (old == 1) {
delete static_cast<Derived*>(this);
}
}
#if defined(DEBUG)
static intptr_t instances() { return instances_; }
#endif // defined(DEBUG)
private:
uintptr_t ref_count_;
#if defined(DEBUG)
static intptr_t instances_;
#endif // defined(DEBUG)
intptr_t ref_count_;
// These are used only in the ASSERT below in RefCntReleaseScope.
uintptr_t ref_count() const { return ref_count_; }
intptr_t ref_count() const { return ref_count_; }
friend class RefCntReleaseScope<Derived>;
DISALLOW_COPY_AND_ASSIGN(ReferenceCounted);
};
#if defined(DEBUG)
template <class Derived>
intptr_t ReferenceCounted<Derived>::instances_ = 0;
#endif
// Creates a scope at the end of which a reference counted object is
// Released. This is useful for reference counted objects recieved by the IO
// Service, which have already been Retained E.g.:

View file

@ -7,6 +7,7 @@
#include "bin/socket.h"
#include "bin/dartutils.h"
#include "bin/eventhandler.h"
#include "bin/io_buffer.h"
#include "bin/isolate_data.h"
#include "bin/lockers.h"
@ -72,9 +73,10 @@ void ListeningSocketRegistry::RemoveByPort(intptr_t port) {
ListeningSocketRegistry::OSSocket* ListeningSocketRegistry::LookupByFd(
intptr_t fd) {
Socket* fd) {
HashMap::Entry* entry = sockets_by_fd_.Lookup(
GetHashmapKeyFromIntptr(fd), GetHashmapHashFromIntptr(fd), false);
GetHashmapKeyFromIntptr(reinterpret_cast<intptr_t>(fd)),
GetHashmapHashFromIntptr(reinterpret_cast<intptr_t>(fd)), false);
if (entry == NULL) {
return NULL;
}
@ -82,17 +84,19 @@ ListeningSocketRegistry::OSSocket* ListeningSocketRegistry::LookupByFd(
}
void ListeningSocketRegistry::InsertByFd(intptr_t fd, OSSocket* socket) {
void ListeningSocketRegistry::InsertByFd(Socket* fd, OSSocket* socket) {
HashMap::Entry* entry = sockets_by_fd_.Lookup(
GetHashmapKeyFromIntptr(fd), GetHashmapHashFromIntptr(fd), true);
GetHashmapKeyFromIntptr(reinterpret_cast<intptr_t>(fd)),
GetHashmapHashFromIntptr(reinterpret_cast<intptr_t>(fd)), true);
ASSERT(entry != NULL);
entry->value = reinterpret_cast<void*>(socket);
}
void ListeningSocketRegistry::RemoveByFd(intptr_t fd) {
sockets_by_fd_.Remove(GetHashmapKeyFromIntptr(fd),
GetHashmapHashFromIntptr(fd));
void ListeningSocketRegistry::RemoveByFd(Socket* fd) {
sockets_by_fd_.Remove(
GetHashmapKeyFromIntptr(reinterpret_cast<intptr_t>(fd)),
GetHashmapHashFromIntptr(reinterpret_cast<intptr_t>(fd)));
}
@ -112,7 +116,7 @@ Dart_Handle ListeningSocketRegistry::CreateBindListen(Dart_Handle socket_object,
// that if there is one also listening on the same address, it was created
// with `shared = true`, ...
OSSocket* os_socket = first_os_socket;
OSSocket* os_socket_same_addr = findOSSocketWithAddress(os_socket, addr);
OSSocket* os_socket_same_addr = FindOSSocketWithAddress(os_socket, addr);
if (os_socket_same_addr != NULL) {
if (!os_socket_same_addr->shared || !shared) {
@ -137,9 +141,13 @@ Dart_Handle ListeningSocketRegistry::CreateBindListen(Dart_Handle socket_object,
// the file descriptor.
os_socket->ref_count++;
// The same Socket is used by a second Dart _NativeSocket object.
// It Retains a reference.
os_socket->socketfd->Retain();
// We set as a side-effect the file descriptor on the dart
// socket_object.
Socket::SetSocketIdNativeField(socket_object, os_socket->socketfd);
Socket::ReuseSocketIdNativeField(socket_object, os_socket->socketfd,
true);
return Dart_True();
}
@ -147,20 +155,20 @@ Dart_Handle ListeningSocketRegistry::CreateBindListen(Dart_Handle socket_object,
}
// There is no socket listening on that (address, port), so we create new one.
intptr_t socketfd = ServerSocket::CreateBindListen(addr, backlog, v6_only);
if (socketfd == -5) {
intptr_t fd = ServerSocket::CreateBindListen(addr, backlog, v6_only);
if (fd == -5) {
OSError os_error(-1, "Invalid host", OSError::kUnknown);
return DartUtils::NewDartOSError(&os_error);
}
if (socketfd < 0) {
if (fd < 0) {
OSError error;
return DartUtils::NewDartOSError(&error);
}
if (!ServerSocket::StartAccept(socketfd)) {
if (!ServerSocket::StartAccept(fd)) {
OSError os_error(-1, "Failed to start accept", OSError::kUnknown);
return DartUtils::NewDartOSError(&os_error);
}
intptr_t allocated_port = Socket::GetPort(socketfd);
intptr_t allocated_port = Socket::GetPort(fd);
ASSERT(allocated_port > 0);
if (allocated_port != port) {
@ -181,6 +189,7 @@ Dart_Handle ListeningSocketRegistry::CreateBindListen(Dart_Handle socket_object,
first_os_socket = LookupByPort(allocated_port);
}
Socket* socketfd = new Socket(fd);
OSSocket* os_socket =
new OSSocket(addr, allocated_port, v6_only, shared, socketfd);
os_socket->ref_count = 1;
@ -190,7 +199,7 @@ Dart_Handle ListeningSocketRegistry::CreateBindListen(Dart_Handle socket_object,
InsertByFd(socketfd, os_socket);
// We set as a side-effect the port on the dart socket_object.
Socket::SetSocketIdNativeField(socket_object, socketfd);
Socket::ReuseSocketIdNativeField(socket_object, socketfd, true);
return Dart_True();
}
@ -245,16 +254,16 @@ void ListeningSocketRegistry::CloseAllSafe() {
}
bool ListeningSocketRegistry::CloseSafe(intptr_t socketfd) {
bool ListeningSocketRegistry::CloseSafe(Socket* socketfd) {
ASSERT(!mutex_->TryLock());
OSSocket* os_socket = LookupByFd(socketfd);
if (os_socket != NULL) {
return CloseOneSafe(os_socket, true);
} else {
// It should be impossible for the event handler to close something that
// hasn't been created before.
UNREACHABLE();
return false;
// A finalizer may direct the event handler to close a listening socket
// that it has never seen before. In this case, we return true to direct
// the eventhandler to clean up the socket.
return true;
}
}
@ -295,7 +304,8 @@ void FUNCTION_NAME(Socket_CreateConnect)(Dart_NativeArguments args) {
intptr_t socket = Socket::CreateConnect(addr);
OSError error;
if (socket >= 0) {
Socket::SetSocketIdNativeField(Dart_GetNativeArgument(args, 0), socket);
Socket::SetSocketIdNativeField(Dart_GetNativeArgument(args, 0), socket,
false);
Dart_SetReturnValue(args, Dart_True());
} else {
Dart_SetReturnValue(args, DartUtils::NewDartOSError(&error));
@ -314,7 +324,8 @@ void FUNCTION_NAME(Socket_CreateBindConnect)(Dart_NativeArguments args) {
intptr_t socket = Socket::CreateBindConnect(addr, sourceAddr);
OSError error;
if (socket >= 0) {
Socket::SetSocketIdNativeField(Dart_GetNativeArgument(args, 0), socket);
Socket::SetSocketIdNativeField(Dart_GetNativeArgument(args, 0), socket,
false);
Dart_SetReturnValue(args, Dart_True());
} else {
Dart_SetReturnValue(args, DartUtils::NewDartOSError(&error));
@ -337,7 +348,8 @@ void FUNCTION_NAME(Socket_CreateBindDatagram)(Dart_NativeArguments args) {
bool reuse_addr = DartUtils::GetBooleanValue(Dart_GetNativeArgument(args, 3));
intptr_t socket = Socket::CreateBindDatagram(addr, reuse_addr);
if (socket >= 0) {
Socket::SetSocketIdNativeField(Dart_GetNativeArgument(args, 0), socket);
Socket::SetSocketIdNativeField(Dart_GetNativeArgument(args, 0), socket,
false);
Dart_SetReturnValue(args, Dart_True());
} else {
OSError error;
@ -347,9 +359,9 @@ void FUNCTION_NAME(Socket_CreateBindDatagram)(Dart_NativeArguments args) {
void FUNCTION_NAME(Socket_Available)(Dart_NativeArguments args) {
intptr_t socket =
Socket* socket =
Socket::GetSocketIdNativeField(Dart_GetNativeArgument(args, 0));
intptr_t available = Socket::Available(socket);
intptr_t available = Socket::Available(socket->fd());
if (available >= 0) {
Dart_SetReturnValue(args, Dart_NewInteger(available));
} else {
@ -361,7 +373,7 @@ void FUNCTION_NAME(Socket_Available)(Dart_NativeArguments args) {
void FUNCTION_NAME(Socket_Read)(Dart_NativeArguments args) {
intptr_t socket =
Socket* socket =
Socket::GetSocketIdNativeField(Dart_GetNativeArgument(args, 0));
int64_t length = 0;
if (DartUtils::GetInt64Value(Dart_GetNativeArgument(args, 1), &length)) {
@ -374,7 +386,7 @@ void FUNCTION_NAME(Socket_Read)(Dart_NativeArguments args) {
Dart_PropagateError(result);
}
ASSERT(buffer != NULL);
intptr_t bytes_read = Socket::Read(socket, buffer, length);
intptr_t bytes_read = Socket::Read(socket->fd(), buffer, length);
if (bytes_read == length) {
Dart_SetReturnValue(args, result);
} else if (bytes_read > 0) {
@ -402,7 +414,7 @@ void FUNCTION_NAME(Socket_Read)(Dart_NativeArguments args) {
void FUNCTION_NAME(Socket_RecvFrom)(Dart_NativeArguments args) {
intptr_t socket =
Socket* socket =
Socket::GetSocketIdNativeField(Dart_GetNativeArgument(args, 0));
// TODO(sgjesse): Use a MTU value here. Only the loopback adapter can
@ -414,8 +426,8 @@ void FUNCTION_NAME(Socket_RecvFrom)(Dart_NativeArguments args) {
reinterpret_cast<uint8_t*>(malloc(65536));
}
RawAddr addr;
intptr_t bytes_read =
Socket::RecvFrom(socket, isolate_data->udp_receive_buffer, 65536, &addr);
intptr_t bytes_read = Socket::RecvFrom(
socket->fd(), isolate_data->udp_receive_buffer, 65536, &addr);
if (bytes_read == 0) {
Dart_SetReturnValue(args, Dart_Null());
return;
@ -472,7 +484,7 @@ void FUNCTION_NAME(Socket_RecvFrom)(Dart_NativeArguments args) {
void FUNCTION_NAME(Socket_WriteList)(Dart_NativeArguments args) {
intptr_t socket =
Socket* socket =
Socket::GetSocketIdNativeField(Dart_GetNativeArgument(args, 0));
Dart_Handle buffer_obj = Dart_GetNativeArgument(args, 1);
ASSERT(Dart_IsList(buffer_obj));
@ -495,7 +507,7 @@ void FUNCTION_NAME(Socket_WriteList)(Dart_NativeArguments args) {
}
ASSERT((offset + length) <= len);
buffer += offset;
intptr_t bytes_written = Socket::Write(socket, buffer, length);
intptr_t bytes_written = Socket::Write(socket->fd(), buffer, length);
if (bytes_written >= 0) {
Dart_TypedDataReleaseData(buffer_obj);
if (short_write) {
@ -515,7 +527,7 @@ void FUNCTION_NAME(Socket_WriteList)(Dart_NativeArguments args) {
void FUNCTION_NAME(Socket_SendTo)(Dart_NativeArguments args) {
intptr_t socket =
Socket* socket =
Socket::GetSocketIdNativeField(Dart_GetNativeArgument(args, 0));
Dart_Handle buffer_obj = Dart_GetNativeArgument(args, 1);
intptr_t offset = DartUtils::GetIntptrValue(Dart_GetNativeArgument(args, 2));
@ -537,7 +549,7 @@ void FUNCTION_NAME(Socket_SendTo)(Dart_NativeArguments args) {
}
ASSERT((offset + length) <= len);
buffer += offset;
intptr_t bytes_written = Socket::SendTo(socket, buffer, length, addr);
intptr_t bytes_written = Socket::SendTo(socket->fd(), buffer, length, addr);
if (bytes_written >= 0) {
Dart_TypedDataReleaseData(buffer_obj);
Dart_SetReturnValue(args, Dart_NewInteger(bytes_written));
@ -551,10 +563,10 @@ void FUNCTION_NAME(Socket_SendTo)(Dart_NativeArguments args) {
void FUNCTION_NAME(Socket_GetPort)(Dart_NativeArguments args) {
intptr_t socket =
Socket* socket =
Socket::GetSocketIdNativeField(Dart_GetNativeArgument(args, 0));
OSError os_error;
intptr_t port = Socket::GetPort(socket);
intptr_t port = Socket::GetPort(socket->fd());
if (port > 0) {
Dart_SetReturnValue(args, Dart_NewInteger(port));
} else {
@ -564,11 +576,11 @@ void FUNCTION_NAME(Socket_GetPort)(Dart_NativeArguments args) {
void FUNCTION_NAME(Socket_GetRemotePeer)(Dart_NativeArguments args) {
intptr_t socket =
Socket* socket =
Socket::GetSocketIdNativeField(Dart_GetNativeArgument(args, 0));
OSError os_error;
intptr_t port = 0;
SocketAddress* addr = Socket::GetRemotePeer(socket, &port);
SocketAddress* addr = Socket::GetRemotePeer(socket->fd(), &port);
if (addr != NULL) {
Dart_Handle list = Dart_NewList(2);
@ -590,19 +602,19 @@ void FUNCTION_NAME(Socket_GetRemotePeer)(Dart_NativeArguments args) {
void FUNCTION_NAME(Socket_GetError)(Dart_NativeArguments args) {
intptr_t socket =
Socket* socket =
Socket::GetSocketIdNativeField(Dart_GetNativeArgument(args, 0));
OSError os_error;
Socket::GetError(socket, &os_error);
Socket::GetError(socket->fd(), &os_error);
Dart_SetReturnValue(args, DartUtils::NewDartOSError(&os_error));
}
void FUNCTION_NAME(Socket_GetType)(Dart_NativeArguments args) {
intptr_t socket =
Socket* socket =
Socket::GetSocketIdNativeField(Dart_GetNativeArgument(args, 0));
OSError os_error;
intptr_t type = Socket::GetType(socket);
intptr_t type = Socket::GetType(socket->fd());
if (type >= 0) {
Dart_SetReturnValue(args, Dart_NewInteger(type));
} else {
@ -615,20 +627,23 @@ void FUNCTION_NAME(Socket_GetStdioHandle)(Dart_NativeArguments args) {
int64_t num =
DartUtils::GetInt64ValueCheckRange(Dart_GetNativeArgument(args, 1), 0, 2);
intptr_t socket = Socket::GetStdioHandle(num);
Socket::SetSocketIdNativeField(Dart_GetNativeArgument(args, 0), socket);
Socket::SetSocketIdNativeField(Dart_GetNativeArgument(args, 0), socket,
false);
Dart_SetReturnValue(args, Dart_NewBoolean(socket >= 0));
}
void FUNCTION_NAME(Socket_GetSocketId)(Dart_NativeArguments args) {
intptr_t id = Socket::GetSocketIdNativeField(Dart_GetNativeArgument(args, 0));
Socket* socket =
Socket::GetSocketIdNativeField(Dart_GetNativeArgument(args, 0));
intptr_t id = reinterpret_cast<intptr_t>(socket);
Dart_SetReturnValue(args, Dart_NewInteger(id));
}
void FUNCTION_NAME(Socket_SetSocketId)(Dart_NativeArguments args) {
intptr_t id = DartUtils::GetIntptrValue(Dart_GetNativeArgument(args, 1));
Socket::SetSocketIdNativeField(Dart_GetNativeArgument(args, 0), id);
Socket::SetSocketIdNativeField(Dart_GetNativeArgument(args, 0), id, false);
}
@ -651,11 +666,12 @@ void FUNCTION_NAME(ServerSocket_CreateBindListen)(Dart_NativeArguments args) {
void FUNCTION_NAME(ServerSocket_Accept)(Dart_NativeArguments args) {
intptr_t socket =
Socket* socket =
Socket::GetSocketIdNativeField(Dart_GetNativeArgument(args, 0));
intptr_t new_socket = ServerSocket::Accept(socket);
intptr_t new_socket = ServerSocket::Accept(socket->fd());
if (new_socket >= 0) {
Socket::SetSocketIdNativeField(Dart_GetNativeArgument(args, 1), new_socket);
Socket::SetSocketIdNativeField(Dart_GetNativeArgument(args, 1), new_socket,
false);
Dart_SetReturnValue(args, Dart_True());
} else if (new_socket == ServerSocket::kTemporaryFailure) {
Dart_SetReturnValue(args, Dart_False());
@ -791,7 +807,7 @@ CObject* Socket::ListInterfacesRequest(const CObjectArray& request) {
void FUNCTION_NAME(Socket_GetOption)(Dart_NativeArguments args) {
intptr_t socket =
Socket* socket =
Socket::GetSocketIdNativeField(Dart_GetNativeArgument(args, 0));
int64_t option = DartUtils::GetIntegerValue(Dart_GetNativeArgument(args, 1));
intptr_t protocol = static_cast<intptr_t>(
@ -800,7 +816,7 @@ void FUNCTION_NAME(Socket_GetOption)(Dart_NativeArguments args) {
switch (option) {
case 0: { // TCP_NODELAY.
bool enabled;
ok = Socket::GetNoDelay(socket, &enabled);
ok = Socket::GetNoDelay(socket->fd(), &enabled);
if (ok) {
Dart_SetReturnValue(args, enabled ? Dart_True() : Dart_False());
}
@ -808,7 +824,7 @@ void FUNCTION_NAME(Socket_GetOption)(Dart_NativeArguments args) {
}
case 1: { // IP_MULTICAST_LOOP.
bool enabled;
ok = Socket::GetMulticastLoop(socket, protocol, &enabled);
ok = Socket::GetMulticastLoop(socket->fd(), protocol, &enabled);
if (ok) {
Dart_SetReturnValue(args, enabled ? Dart_True() : Dart_False());
}
@ -816,7 +832,7 @@ void FUNCTION_NAME(Socket_GetOption)(Dart_NativeArguments args) {
}
case 2: { // IP_MULTICAST_TTL.
int value;
ok = Socket::GetMulticastHops(socket, protocol, &value);
ok = Socket::GetMulticastHops(socket->fd(), protocol, &value);
if (ok) {
Dart_SetReturnValue(args, Dart_NewInteger(value));
}
@ -828,7 +844,7 @@ void FUNCTION_NAME(Socket_GetOption)(Dart_NativeArguments args) {
}
case 4: { // IP_BROADCAST.
bool enabled;
ok = Socket::GetBroadcast(socket, &enabled);
ok = Socket::GetBroadcast(socket->fd(), &enabled);
if (ok) {
Dart_SetReturnValue(args, enabled ? Dart_True() : Dart_False());
}
@ -847,7 +863,7 @@ void FUNCTION_NAME(Socket_GetOption)(Dart_NativeArguments args) {
void FUNCTION_NAME(Socket_SetOption)(Dart_NativeArguments args) {
bool result = false;
intptr_t socket =
Socket* socket =
Socket::GetSocketIdNativeField(Dart_GetNativeArgument(args, 0));
int64_t option = DartUtils::GetIntegerValue(Dart_GetNativeArgument(args, 1));
int64_t protocol = DartUtils::GetInt64ValueCheckRange(
@ -856,16 +872,17 @@ void FUNCTION_NAME(Socket_SetOption)(Dart_NativeArguments args) {
switch (option) {
case 0: // TCP_NODELAY.
result = Socket::SetNoDelay(
socket, DartUtils::GetBooleanValue(Dart_GetNativeArgument(args, 3)));
socket->fd(),
DartUtils::GetBooleanValue(Dart_GetNativeArgument(args, 3)));
break;
case 1: // IP_MULTICAST_LOOP.
result = Socket::SetMulticastLoop(
socket, protocol,
socket->fd(), protocol,
DartUtils::GetBooleanValue(Dart_GetNativeArgument(args, 3)));
break;
case 2: // IP_MULTICAST_TTL.
result = Socket::SetMulticastHops(
socket, protocol,
socket->fd(), protocol,
DartUtils::GetIntegerValue(Dart_GetNativeArgument(args, 3)));
break;
case 3: { // IP_MULTICAST_IF.
@ -874,7 +891,8 @@ void FUNCTION_NAME(Socket_SetOption)(Dart_NativeArguments args) {
}
case 4: // IP_BROADCAST.
result = Socket::SetBroadcast(
socket, DartUtils::GetBooleanValue(Dart_GetNativeArgument(args, 3)));
socket->fd(),
DartUtils::GetBooleanValue(Dart_GetNativeArgument(args, 3)));
break;
default:
Dart_PropagateError(Dart_NewApiError("Value outside expected range"));
@ -889,7 +907,7 @@ void FUNCTION_NAME(Socket_SetOption)(Dart_NativeArguments args) {
void FUNCTION_NAME(Socket_JoinMulticast)(Dart_NativeArguments args) {
intptr_t socket =
Socket* socket =
Socket::GetSocketIdNativeField(Dart_GetNativeArgument(args, 0));
RawAddr addr;
SocketAddress::GetSockAddr(Dart_GetNativeArgument(args, 1), &addr);
@ -899,7 +917,7 @@ void FUNCTION_NAME(Socket_JoinMulticast)(Dart_NativeArguments args) {
}
int interfaceIndex =
DartUtils::GetIntegerValue(Dart_GetNativeArgument(args, 3));
if (Socket::JoinMulticast(socket, addr, interface, interfaceIndex)) {
if (Socket::JoinMulticast(socket->fd(), addr, interface, interfaceIndex)) {
Dart_SetReturnValue(args, Dart_Null());
} else {
Dart_SetReturnValue(args, DartUtils::NewDartOSError());
@ -908,7 +926,7 @@ void FUNCTION_NAME(Socket_JoinMulticast)(Dart_NativeArguments args) {
void FUNCTION_NAME(Socket_LeaveMulticast)(Dart_NativeArguments args) {
intptr_t socket =
Socket* socket =
Socket::GetSocketIdNativeField(Dart_GetNativeArgument(args, 0));
RawAddr addr;
SocketAddress::GetSockAddr(Dart_GetNativeArgument(args, 1), &addr);
@ -918,7 +936,7 @@ void FUNCTION_NAME(Socket_LeaveMulticast)(Dart_NativeArguments args) {
}
int interfaceIndex =
DartUtils::GetIntegerValue(Dart_GetNativeArgument(args, 3));
if (Socket::LeaveMulticast(socket, addr, interface, interfaceIndex)) {
if (Socket::LeaveMulticast(socket->fd(), addr, interface, interfaceIndex)) {
Dart_SetReturnValue(args, Dart_Null());
} else {
Dart_SetReturnValue(args, DartUtils::NewDartOSError());
@ -926,22 +944,68 @@ void FUNCTION_NAME(Socket_LeaveMulticast)(Dart_NativeArguments args) {
}
void Socket::SetSocketIdNativeField(Dart_Handle socket, intptr_t id) {
Dart_Handle err =
Dart_SetNativeInstanceField(socket, kSocketIdNativeField, id);
static void SocketFinalizer(void* isolate_data,
Dart_WeakPersistentHandle handle,
void* data) {
Socket* socket = reinterpret_cast<Socket*>(data);
if (socket->fd() >= 0) {
const int64_t flags = 1 << kCloseCommand;
socket->Retain();
EventHandler::SendFromNative(reinterpret_cast<intptr_t>(socket),
socket->port(), flags);
}
socket->Release();
}
static void ListeningSocketFinalizer(void* isolate_data,
Dart_WeakPersistentHandle handle,
void* data) {
Socket* socket = reinterpret_cast<Socket*>(data);
if (socket->fd() >= 0) {
const int64_t flags = (1 << kListeningSocket) | (1 << kCloseCommand);
socket->Retain();
EventHandler::SendFromNative(reinterpret_cast<intptr_t>(socket),
socket->port(), flags);
}
socket->Release();
}
void Socket::ReuseSocketIdNativeField(Dart_Handle handle,
Socket* socket,
bool listening) {
Dart_Handle err = Dart_SetNativeInstanceField(
handle, kSocketIdNativeField, reinterpret_cast<intptr_t>(socket));
if (Dart_IsError(err)) {
Dart_PropagateError(err);
}
if (listening) {
Dart_NewWeakPersistentHandle(handle, reinterpret_cast<void*>(socket),
sizeof(Socket), ListeningSocketFinalizer);
} else {
Dart_NewWeakPersistentHandle(handle, reinterpret_cast<void*>(socket),
sizeof(Socket), SocketFinalizer);
}
}
intptr_t Socket::GetSocketIdNativeField(Dart_Handle socket_obj) {
intptr_t socket = 0;
void Socket::SetSocketIdNativeField(Dart_Handle handle,
intptr_t id,
bool listening) {
Socket* socket = new Socket(id);
ReuseSocketIdNativeField(handle, socket, listening);
}
Socket* Socket::GetSocketIdNativeField(Dart_Handle socket_obj) {
intptr_t id;
Dart_Handle err =
Dart_GetNativeInstanceField(socket_obj, kSocketIdNativeField, &socket);
Dart_GetNativeInstanceField(socket_obj, kSocketIdNativeField, &id);
if (Dart_IsError(err)) {
Dart_PropagateError(err);
}
Socket* socket = reinterpret_cast<Socket*>(id);
return socket;
}

View file

@ -27,6 +27,7 @@
#include "bin/builtin.h"
#include "bin/dartutils.h"
#include "bin/reference_counting.h"
#include "bin/thread.h"
#include "bin/utils.h"
#include "platform/hashmap.h"
@ -247,7 +248,11 @@ class AddressList {
};
class Socket {
// We write Sockets into the native field of the _NativeSocket object
// on the Dart side. They are allocated in SetSocketIdNativeField(), and are
// deallocated either from the finalizer attached to _NativeSockets there, or
// from the eventhandler, whichever drops the last reference.
class Socket : public ReferenceCounted<Socket> {
public:
enum SocketRequest {
kLookupRequest = 0,
@ -255,6 +260,15 @@ class Socket {
kReverseLookupRequest = 2,
};
explicit Socket(intptr_t fd);
intptr_t fd() const { return fd_; }
void SetClosedFd();
Dart_Port port() const { return port_; }
void set_port(Dart_Port port) { port_ = port; }
// TODO(dart:io): Convert these to instance methods where possible.
static bool Initialize();
static intptr_t Available(intptr_t fd);
static intptr_t Read(intptr_t fd, void* buffer, intptr_t num_bytes);
@ -333,12 +347,24 @@ class Socket {
static Dart_Port GetServicePort();
static void SetSocketIdNativeField(Dart_Handle socket, intptr_t id);
static intptr_t GetSocketIdNativeField(Dart_Handle socket);
static void SetSocketIdNativeField(Dart_Handle handle,
intptr_t id,
bool listening);
static void ReuseSocketIdNativeField(Dart_Handle handle,
Socket* socket,
bool listening);
static Socket* GetSocketIdNativeField(Dart_Handle socket);
private:
DISALLOW_ALLOCATION();
DISALLOW_IMPLICIT_CONSTRUCTORS(Socket);
~Socket() { ASSERT(fd_ == kClosedFd); }
static const int kClosedFd = -1;
intptr_t fd_;
Dart_Port port_;
friend class ReferenceCounted<Socket>;
DISALLOW_COPY_AND_ASSIGN(Socket);
};
@ -405,7 +431,7 @@ class ListeningSocketRegistry {
//
// The caller is responsible for obtaining the mutex first, before calling
// this function.
bool CloseSafe(intptr_t socketfd);
bool CloseSafe(Socket* socketfd);
Mutex* mutex() { return mutex_; }
@ -416,7 +442,7 @@ class ListeningSocketRegistry {
bool v6_only;
bool shared;
int ref_count;
intptr_t socketfd;
Socket* socketfd;
// Singly linked lists of OSSocket instances which listen on the same port
// but on different addresses.
@ -426,7 +452,7 @@ class ListeningSocketRegistry {
int port,
bool v6_only,
bool shared,
intptr_t socketfd)
Socket* socketfd)
: address(address),
port(port),
v6_only(v6_only),
@ -438,7 +464,7 @@ class ListeningSocketRegistry {
static const intptr_t kInitialSocketsCount = 8;
OSSocket* findOSSocketWithAddress(OSSocket* current, const RawAddr& addr) {
OSSocket* FindOSSocketWithAddress(OSSocket* current, const RawAddr& addr) {
while (current != NULL) {
if (SocketAddress::AreAddressesEqual(current->address, addr)) {
return current;
@ -465,9 +491,9 @@ class ListeningSocketRegistry {
void InsertByPort(intptr_t port, OSSocket* socket);
void RemoveByPort(intptr_t port);
OSSocket* LookupByFd(intptr_t fd);
void InsertByFd(intptr_t fd, OSSocket* socket);
void RemoveByFd(intptr_t fd);
OSSocket* LookupByFd(Socket* fd);
void InsertByFd(Socket* fd, OSSocket* socket);
void RemoveByFd(Socket* fd);
bool CloseOneSafe(OSSocket* os_socket, bool update_hash_maps);
void CloseAllSafe();

View file

@ -43,6 +43,15 @@ bool Socket::FormatNumericAddress(const RawAddr& addr, char* address, int len) {
}
Socket::Socket(intptr_t fd)
: ReferenceCounted(), fd_(fd), port_(ILLEGAL_PORT) {}
void Socket::SetClosedFd() {
fd_ = kClosedFd;
}
bool Socket::Initialize() {
// Nothing to do on Android.
return true;

View file

@ -72,6 +72,15 @@ bool Socket::FormatNumericAddress(const RawAddr& addr, char* address, int len) {
}
Socket::Socket(intptr_t fd)
: ReferenceCounted(), fd_(fd), port_(ILLEGAL_PORT) {}
void Socket::SetClosedFd() {
fd_ = kClosedFd;
}
bool Socket::Initialize() {
// Nothing to do on Fuchsia.
return true;

View file

@ -46,6 +46,15 @@ bool Socket::FormatNumericAddress(const RawAddr& addr, char* address, int len) {
}
Socket::Socket(intptr_t fd)
: ReferenceCounted(), fd_(fd), port_(ILLEGAL_PORT) {}
void Socket::SetClosedFd() {
fd_ = kClosedFd;
}
bool Socket::Initialize() {
// Nothing to do on Linux.
return true;

View file

@ -45,6 +45,15 @@ bool Socket::FormatNumericAddress(const RawAddr& addr, char* address, int len) {
}
Socket::Socket(intptr_t fd)
: ReferenceCounted(), fd_(fd), port_(ILLEGAL_PORT) {}
void Socket::SetClosedFd() {
fd_ = kClosedFd;
}
bool Socket::Initialize() {
// Nothing to do on Mac OS.
return true;

View file

@ -46,6 +46,22 @@ bool Socket::FormatNumericAddress(const RawAddr& addr, char* address, int len) {
}
Socket::Socket(intptr_t fd) : ReferenceCounted(), fd_(fd), port_(ILLEGAL_PORT) {
ASSERT(fd_ != kClosedFd);
Handle* handle = reinterpret_cast<Handle*>(fd_);
ASSERT(handle != NULL);
}
void Socket::SetClosedFd() {
ASSERT(fd_ != kClosedFd);
Handle* handle = reinterpret_cast<Handle*>(fd_);
ASSERT(handle != NULL);
handle->Release();
fd_ = kClosedFd;
}
static Mutex* init_mutex = new Mutex();
static bool socket_initialized = false;
@ -165,7 +181,7 @@ static intptr_t Connect(intptr_t fd,
if (status != NO_ERROR) {
int rc = WSAGetLastError();
handle->mark_closed(); // Destructor asserts that socket is marked closed.
delete handle;
handle->Release();
closesocket(s);
SetLastError(rc);
return -1;
@ -196,11 +212,12 @@ static intptr_t Connect(intptr_t fd,
rc = WSAGetLastError();
// Cleanup in case of error.
OverlappedBuffer::DisposeBuffer(overlapped);
handle->Release();
} else {
rc = WSAGetLastError();
}
handle->Close();
delete handle;
handle->Release();
SetLastError(rc);
return -1;
}
@ -497,7 +514,7 @@ intptr_t ServerSocket::CreateBindListen(const RawAddr& addr,
intptr_t new_s = CreateBindListen(addr, backlog, v6_only);
DWORD rc = WSAGetLastError();
closesocket(s);
delete listen_socket;
listen_socket->Release();
SetLastError(rc);
return new_s;
}
@ -506,7 +523,7 @@ intptr_t ServerSocket::CreateBindListen(const RawAddr& addr,
if (status == SOCKET_ERROR) {
DWORD rc = WSAGetLastError();
closesocket(s);
delete listen_socket;
listen_socket->Release();
SetLastError(rc);
return -1;
}
@ -526,7 +543,7 @@ bool ServerSocket::StartAccept(intptr_t fd) {
if (!listen_socket->HasPendingAccept()) {
// Delete socket now, if there are no pending accepts. Otherwise,
// the event-handler will take care of deleting it.
delete listen_socket;
listen_socket->Release();
}
SetLastError(rc);
return false;

View file

@ -110,6 +110,8 @@ testListenCloseListenClose(String host) async {
await client.close();
await client.drain();
// Close the second server socket.
await socket2.close();
asyncEnd();
}

View file

@ -0,0 +1,39 @@
// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
//
// This test checks that sockets belonging to an isolate are properly cleaned up
// when the isolate shuts down abnormally. If the socket is not properly cleaned
// up, the test will time out.
import 'dart:async';
import 'dart:io';
import 'dart:isolate';
import "package:async_helper/async_helper.dart";
import "package:expect/expect.dart";
ConnectorIsolate(int port) async {
Socket socket = await Socket.connect("127.0.0.1", port);
socket.listen((_) {});
}
main() async {
asyncStart();
ServerSocket server = await ServerSocket.bind("127.0.0.1", 0);
Isolate isolate = await Isolate.spawn(ConnectorIsolate, server.port);
Completer<Null> completer = new Completer<Null>();
server.listen((Socket socket) {
socket.listen((_) {}, onDone: () {
print("Socket closed normally");
completer.complete(null);
socket.close();
}, onError: (e) {
Expect.fail("Socket error $e");
});
isolate.kill();
});
await completer.future;
await server.close();
asyncEnd();
}