dart-sdk/runtime/bin/eventhandler_win.h
Matthew Dempsky 6d5f763952 [vm] Eliminate Mutex/Monitor indirection where possible
In many cases, the Mutexes and Monitors have to be marked "mutable"
because they're used to synchronize const accessor methods.

Small text segment improvement for Product builds:

$ size dart.{arm,x64}.{before,after}
   text	   data	    bss	    dec	    hex	filename
19726069	 409960	 392332	20528361	1393ce9	dart.arm.before
19725525	 409960	 392332	20527817	1393ac9	dart.arm.after
22576021	 600376	1782824	24959221	17cd8f5	dart.x64.before
22574821	 600376	1782824	24958021	17cd445	dart.x64.after

Change-Id: I68f5cd5ad452044df8bfebd160910496036a3e6b
Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/101745
Commit-Queue: Matthew Dempsky <mdempsky@google.com>
Reviewed-by: Vyacheslav Egorov <vegorov@google.com>
2019-05-13 21:17:51 +00:00

576 lines
17 KiB
C++

// Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
#ifndef RUNTIME_BIN_EVENTHANDLER_WIN_H_
#define RUNTIME_BIN_EVENTHANDLER_WIN_H_
#if !defined(RUNTIME_BIN_EVENTHANDLER_H_)
#error Do not include eventhandler_win.h directly; use eventhandler.h instead.
#endif
#include <mswsock.h>
#include <winsock2.h>
#include <ws2tcpip.h>
#include "bin/builtin.h"
#include "bin/reference_counting.h"
#include "bin/thread.h"
namespace dart {
namespace bin {
// Forward declarations.
class EventHandlerImplementation;
class Handle;
class FileHandle;
class SocketHandle;
class ClientSocket;
class ListenSocket;
// An OverlappedBuffer encapsulates the OVERLAPPED structure and the
// associated data buffer. For accept it also contains the pre-created
// socket for the client.
class OverlappedBuffer {
public:
enum Operation {
kAccept,
kRead,
kRecvFrom,
kWrite,
kSendTo,
kDisconnect,
kConnect
};
static OverlappedBuffer* AllocateAcceptBuffer(int buffer_size);
static OverlappedBuffer* AllocateReadBuffer(int buffer_size);
static OverlappedBuffer* AllocateRecvFromBuffer(int buffer_size);
static OverlappedBuffer* AllocateWriteBuffer(int buffer_size);
static OverlappedBuffer* AllocateSendToBuffer(int buffer_size);
static OverlappedBuffer* AllocateDisconnectBuffer();
static OverlappedBuffer* AllocateConnectBuffer();
static void DisposeBuffer(OverlappedBuffer* buffer);
// Find the IO buffer from the OVERLAPPED address.
static OverlappedBuffer* GetFromOverlapped(OVERLAPPED* overlapped);
// Read data from a buffer which has been received. It will read up
// to num_bytes bytes of data returning the actual number of bytes
// read. This will update the index of the next byte in the buffer
// so calling Read several times will keep returning new data from
// the buffer until all data have been read.
int Read(void* buffer, int num_bytes);
// Write data to a buffer before sending it. Returns the number of bytes
// actually written to the buffer. Calls to Write will always write to
// the buffer from the beginning.
int Write(const void* buffer, int num_bytes);
// Check the amount of data in a read buffer which has not been read yet.
int GetRemainingLength();
bool IsEmpty() { return GetRemainingLength() == 0; }
Operation operation() const { return operation_; }
SOCKET client() const { return client_; }
char* GetBufferStart() { return reinterpret_cast<char*>(&buffer_data_); }
int GetBufferSize() const { return buflen_; }
struct sockaddr* from() const {
return from_;
}
socklen_t* from_len_addr() const { return from_len_addr_; }
socklen_t from_len() const { return from_ == NULL ? 0 : *from_len_addr_; }
// Returns the address of the OVERLAPPED structure with all fields
// initialized to zero.
OVERLAPPED* GetCleanOverlapped() {
memset(&overlapped_, 0, sizeof(overlapped_));
return &overlapped_;
}
// Returns a WASBUF structure initialized with the data in this IO buffer.
WSABUF* GetWASBUF() {
wbuf_.buf = GetBufferStart();
wbuf_.len = GetBufferSize();
return &wbuf_;
}
void set_data_length(int data_length) { data_length_ = data_length; }
private:
OverlappedBuffer(int buffer_size, Operation operation)
: operation_(operation), buflen_(buffer_size) {
memset(GetBufferStart(), 0, GetBufferSize());
if (operation == kRecvFrom) {
// Reserve part of the buffer for the length of source sockaddr
// and source sockaddr.
const int kAdditionalSize =
sizeof(struct sockaddr_storage) + sizeof(socklen_t);
ASSERT(buflen_ > kAdditionalSize);
buflen_ -= kAdditionalSize;
from_len_addr_ =
reinterpret_cast<socklen_t*>(GetBufferStart() + GetBufferSize());
*from_len_addr_ = sizeof(struct sockaddr_storage);
from_ = reinterpret_cast<struct sockaddr*>(from_len_addr_ + 1);
} else {
from_len_addr_ = NULL;
from_ = NULL;
}
index_ = 0;
data_length_ = 0;
if (operation_ == kAccept) {
client_ = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
}
}
void* operator new(size_t size, int buffer_size) {
return malloc(size + buffer_size);
}
void operator delete(void* buffer) { free(buffer); }
// Allocate an overlapped buffer for thse specified amount of data and
// operation. Some operations need additional buffer space, which is
// handled by this method.
static OverlappedBuffer* AllocateBuffer(int buffer_size, Operation operation);
OVERLAPPED overlapped_; // OVERLAPPED structure for overlapped IO.
SOCKET client_; // Used for AcceptEx client socket.
int buflen_; // Length of the buffer.
Operation operation_; // Type of operation issued.
int index_; // Index for next read from read buffer.
int data_length_; // Length of the actual data in the buffer.
WSABUF wbuf_; // Structure for passing buffer to WSA functions.
// For the recvfrom operation additional storace is allocated for the
// source sockaddr.
socklen_t* from_len_addr_; // Pointer to source sockaddr size storage.
struct sockaddr* from_; // Pointer to source sockaddr storage.
// Buffer for recv/send/AcceptEx. This must be at the end of the
// object as the object is allocated larger than it's definition
// indicate to extend this array.
uint8_t buffer_data_[1];
DISALLOW_COPY_AND_ASSIGN(OverlappedBuffer);
};
// Abstract super class for holding information on listen and connected
// sockets.
class Handle : public ReferenceCounted<Handle>, public DescriptorInfoBase {
public:
enum Type {
kFile,
kStd,
kDirectoryWatch,
kClientSocket,
kListenSocket,
kDatagramSocket
};
// Socket interface exposing normal socket operations.
intptr_t Available();
intptr_t Read(void* buffer, intptr_t num_bytes);
intptr_t RecvFrom(void* buffer,
intptr_t num_bytes,
struct sockaddr* sa,
socklen_t addr_len);
virtual intptr_t Write(const void* buffer, intptr_t num_bytes);
virtual intptr_t SendTo(const void* buffer,
intptr_t num_bytes,
struct sockaddr* sa,
socklen_t sa_len);
// Internal interface used by the event handler.
virtual bool IssueRead();
virtual bool IssueRecvFrom();
virtual bool IssueWrite();
virtual bool IssueSendTo(struct sockaddr* sa, socklen_t sa_len);
bool HasPendingRead();
bool HasPendingWrite();
void ReadComplete(OverlappedBuffer* buffer);
void RecvFromComplete(OverlappedBuffer* buffer);
void WriteComplete(OverlappedBuffer* buffer);
bool IsClosing() { return (flags_ & (1 << kClosing)) != 0; }
bool IsClosedRead() { return (flags_ & (1 << kCloseRead)) != 0; }
bool IsClosedWrite() { return (flags_ & (1 << kCloseWrite)) != 0; }
bool IsError() { return (flags_ & (1 << kError)) != 0; }
void MarkClosing() { flags_ |= (1 << kClosing); }
void MarkClosedRead() { flags_ |= (1 << kCloseRead); }
void MarkClosedWrite() { flags_ |= (1 << kCloseWrite); }
void MarkError() { flags_ |= (1 << kError); }
virtual void EnsureInitialized(EventHandlerImplementation* event_handler) = 0;
HANDLE handle() { return handle_; }
bool CreateCompletionPort(HANDLE completion_port);
void Close();
virtual void DoClose();
virtual bool IsClosed() = 0;
bool IsHandleClosed() const { return handle_ == INVALID_HANDLE_VALUE; }
Type type() { return type_; }
bool is_file() { return type_ == kFile; }
bool is_socket() {
return type_ == kListenSocket || type_ == kClientSocket ||
type_ == kDatagramSocket;
}
bool is_listen_socket() { return type_ == kListenSocket; }
bool is_client_socket() { return type_ == kClientSocket; }
bool is_datagram_socket() { return type_ == kDatagramSocket; }
void MarkDoesNotSupportOverlappedIO() {
flags_ |= (1 << kDoesNotSupportOverlappedIO);
}
bool SupportsOverlappedIO() {
return (flags_ & (1 << kDoesNotSupportOverlappedIO)) == 0;
}
void ReadSyncCompleteAsync();
DWORD last_error() { return last_error_; }
void set_last_error(DWORD last_error) { last_error_ = last_error; }
void set_completion_port(HANDLE completion_port) {
completion_port_ = completion_port;
}
void set_event_handler(EventHandlerImplementation* event_handler) {
event_handler_ = event_handler;
}
protected:
// For access to monitor_;
friend class EventHandlerImplementation;
enum Flags {
kClosing = 0,
kCloseRead = 1,
kCloseWrite = 2,
kDoesNotSupportOverlappedIO = 3,
kError = 4
};
explicit Handle(intptr_t handle);
virtual ~Handle();
virtual void HandleIssueError();
Monitor monitor_;
Type type_;
HANDLE handle_;
HANDLE completion_port_;
EventHandlerImplementation* event_handler_;
OverlappedBuffer* data_ready_; // Buffer for data ready to be read.
OverlappedBuffer* pending_read_; // Buffer for pending read.
OverlappedBuffer* pending_write_; // Buffer for pending write
DWORD last_error_;
ThreadId read_thread_id_;
HANDLE read_thread_handle_;
bool read_thread_starting_;
bool read_thread_finished_;
private:
void WaitForReadThreadStarted();
void NotifyReadThreadStarted();
void WaitForReadThreadFinished();
void NotifyReadThreadFinished();
int flags_;
friend class ReferenceCounted<Handle>;
DISALLOW_COPY_AND_ASSIGN(Handle);
};
class FileHandle : public DescriptorInfoSingleMixin<Handle> {
public:
explicit FileHandle(HANDLE handle)
: DescriptorInfoSingleMixin(reinterpret_cast<intptr_t>(handle), true) {
type_ = kFile;
}
virtual void EnsureInitialized(EventHandlerImplementation* event_handler);
virtual bool IsClosed();
private:
DISALLOW_COPY_AND_ASSIGN(FileHandle);
};
class StdHandle : public FileHandle {
public:
static StdHandle* Stdin(HANDLE handle);
virtual void DoClose();
virtual intptr_t Write(const void* buffer, intptr_t num_bytes);
void WriteSyncCompleteAsync();
void RunWriteLoop();
#if defined(DEBUG)
static StdHandle* StdinPtr() { return stdin_; }
#endif
private:
static Mutex* stdin_mutex_;
static StdHandle* stdin_;
explicit StdHandle(HANDLE handle)
: FileHandle(handle),
thread_id_(Thread::kInvalidThreadId),
thread_handle_(NULL),
thread_wrote_(0),
write_thread_exists_(false),
write_thread_running_(false) {
type_ = kStd;
}
ThreadId thread_id_;
HANDLE thread_handle_;
intptr_t thread_wrote_;
bool write_thread_exists_;
bool write_thread_running_;
DISALLOW_COPY_AND_ASSIGN(StdHandle);
};
class DirectoryWatchHandle : public DescriptorInfoSingleMixin<Handle> {
public:
DirectoryWatchHandle(HANDLE handle, int events, bool recursive)
: DescriptorInfoSingleMixin(reinterpret_cast<intptr_t>(handle), true),
events_(events),
recursive_(recursive) {
type_ = kDirectoryWatch;
}
virtual void EnsureInitialized(EventHandlerImplementation* event_handler);
virtual bool IsClosed();
virtual bool IssueRead();
void Stop();
private:
int events_;
bool recursive_;
DISALLOW_COPY_AND_ASSIGN(DirectoryWatchHandle);
};
class SocketHandle : public Handle {
public:
SOCKET socket() const { return socket_; }
protected:
explicit SocketHandle(intptr_t s) : Handle(s), socket_(s) {}
virtual void HandleIssueError();
private:
const SOCKET socket_;
DISALLOW_COPY_AND_ASSIGN(SocketHandle);
};
// Information on listen sockets.
class ListenSocket : public DescriptorInfoMultipleMixin<SocketHandle> {
public:
explicit ListenSocket(intptr_t s)
: DescriptorInfoMultipleMixin(s, true),
AcceptEx_(NULL),
pending_accept_count_(0),
accepted_head_(NULL),
accepted_tail_(NULL),
accepted_count_(0) {
type_ = kListenSocket;
}
virtual ~ListenSocket() {
ASSERT(!HasPendingAccept());
ASSERT(accepted_head_ == NULL);
ASSERT(accepted_tail_ == NULL);
}
// Socket interface exposing normal socket operations.
ClientSocket* Accept();
bool CanAccept();
// Internal interface used by the event handler.
bool HasPendingAccept() { return pending_accept_count_ > 0; }
bool IssueAccept();
void AcceptComplete(OverlappedBuffer* buffer, HANDLE completion_port);
virtual void EnsureInitialized(EventHandlerImplementation* event_handler);
virtual void DoClose();
virtual bool IsClosed();
int pending_accept_count() { return pending_accept_count_; }
int accepted_count() { return accepted_count_; }
private:
bool LoadAcceptEx();
LPFN_ACCEPTEX AcceptEx_;
// The number of asynchronous `IssueAccept` operations which haven't completed
// yet.
int pending_accept_count_;
// Linked list of accepted connections provided by completion code. Ready to
// be handed over through accept.
ClientSocket* accepted_head_;
ClientSocket* accepted_tail_;
// The number of accepted connections which are waiting to be removed from
// this queue and processed by dart isolates.
int accepted_count_;
DISALLOW_COPY_AND_ASSIGN(ListenSocket);
};
// Information on connected sockets.
class ClientSocket : public DescriptorInfoSingleMixin<SocketHandle> {
public:
explicit ClientSocket(intptr_t s)
: DescriptorInfoSingleMixin(s, true),
DisconnectEx_(NULL),
next_(NULL),
connected_(false),
closed_(false) {
LoadDisconnectEx();
type_ = kClientSocket;
}
virtual ~ClientSocket() {
// Don't delete this object until all pending requests have been handled.
ASSERT(!HasPendingRead());
ASSERT(!HasPendingWrite());
ASSERT(next_ == NULL);
ASSERT(closed_ == true);
}
void Shutdown(int how);
// Internal interface used by the event handler.
virtual bool IssueRead();
virtual bool IssueWrite();
void IssueDisconnect();
void DisconnectComplete(OverlappedBuffer* buffer);
void ConnectComplete(OverlappedBuffer* buffer);
virtual void EnsureInitialized(EventHandlerImplementation* event_handler);
virtual void DoClose();
virtual bool IsClosed();
ClientSocket* next() { return next_; }
void set_next(ClientSocket* next) { next_ = next; }
void mark_connected() { connected_ = true; }
bool is_connected() const { return connected_; }
void mark_closed() { closed_ = true; }
#if defined(DEBUG)
static intptr_t disconnecting() { return disconnecting_; }
#endif
private:
bool LoadDisconnectEx();
LPFN_DISCONNECTEX DisconnectEx_;
ClientSocket* next_;
bool connected_;
bool closed_;
#if defined(DEBUG)
static intptr_t disconnecting_;
#endif
DISALLOW_COPY_AND_ASSIGN(ClientSocket);
};
class DatagramSocket : public DescriptorInfoSingleMixin<SocketHandle> {
public:
explicit DatagramSocket(intptr_t s) : DescriptorInfoSingleMixin(s, true) {
type_ = kDatagramSocket;
}
virtual ~DatagramSocket() {
// Don't delete this object until all pending requests have been handled.
ASSERT(!HasPendingRead());
ASSERT(!HasPendingWrite());
}
// Internal interface used by the event handler.
virtual bool IssueRecvFrom();
virtual bool IssueSendTo(sockaddr* sa, socklen_t sa_len);
virtual void EnsureInitialized(EventHandlerImplementation* event_handler);
virtual void DoClose();
virtual bool IsClosed();
private:
DISALLOW_COPY_AND_ASSIGN(DatagramSocket);
};
// Event handler.
class EventHandlerImplementation {
public:
EventHandlerImplementation();
virtual ~EventHandlerImplementation();
void SendData(intptr_t id, Dart_Port dart_port, int64_t data);
void Start(EventHandler* handler);
void Shutdown();
static void EventHandlerEntry(uword args);
int64_t GetTimeout();
void HandleInterrupt(InterruptMessage* msg);
void HandleTimeout();
void HandleAccept(ListenSocket* listen_socket, OverlappedBuffer* buffer);
void TryDispatchingPendingAccepts(ListenSocket* listen_socket);
void HandleRead(Handle* handle, int bytes, OverlappedBuffer* buffer);
void HandleRecvFrom(Handle* handle, int bytes, OverlappedBuffer* buffer);
void HandleWrite(Handle* handle, int bytes, OverlappedBuffer* buffer);
void HandleDisconnect(ClientSocket* client_socket,
int bytes,
OverlappedBuffer* buffer);
void HandleConnect(ClientSocket* client_socket,
int bytes,
OverlappedBuffer* buffer);
void HandleIOCompletion(DWORD bytes, ULONG_PTR key, OVERLAPPED* overlapped);
void HandleCompletionOrInterrupt(BOOL ok,
DWORD bytes,
ULONG_PTR key,
OVERLAPPED* overlapped);
HANDLE completion_port() { return completion_port_; }
private:
Monitor startup_monitor_;
ThreadId handler_thread_id_;
HANDLE handler_thread_handle_;
TimeoutQueue timeout_queue_; // Time for next timeout.
bool shutdown_;
HANDLE completion_port_;
DISALLOW_COPY_AND_ASSIGN(EventHandlerImplementation);
};
} // namespace bin
} // namespace dart
#endif // RUNTIME_BIN_EVENTHANDLER_WIN_H_