dart-sdk/runtime/bin/eventhandler_win.h
Slava Egorov 8b6e39696f [vm/io] Avoid leaking Handle::data_ready_ on Windows.
Handle::data_ready_ contains bytes which are ready to be sent to Dart
side. Not all code paths fully drain this buffer and delete it
before destroying the handle, for example directory watch implementation
was prone to leaking data_ready_ when subscription was cancelled.

This CL switches the code to use unique_ptr to hold on to data_ready_
which makes sure that it is deleted when Handle is destroyed.

The code would benefit from holding all OverlappedBuffer instances
through unique_ptr but that is a much larger refactoring which
we leave for a later date.

Fixes https://github.com/dart-lang/sdk/issues/52715

TEST=standalone{,_2}/regress_52715_test

Bug: 52715
Cq-Include-Trybots: luci.dart.try:vm-win-release-x64-try,vm-win-debug-x64-try,vm-aot-win-release-x64-try,analyzer-win-release-try
Change-Id: Ie8d728b823de7e8f9de1489898e270580c2af269
Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/311841
Commit-Queue: Slava Egorov <vegorov@google.com>
Commit-Queue: Martin Kustermann <kustermann@google.com>
Reviewed-by: Martin Kustermann <kustermann@google.com>
Auto-Submit: Slava Egorov <vegorov@google.com>
2023-06-28 13:06:51 +00:00

591 lines
18 KiB
C++

// Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
#ifndef RUNTIME_BIN_EVENTHANDLER_WIN_H_
#define RUNTIME_BIN_EVENTHANDLER_WIN_H_
#if !defined(RUNTIME_BIN_EVENTHANDLER_H_)
#error Do not include eventhandler_win.h directly; use eventhandler.h instead.
#endif
#include <mswsock.h>
#include <winsock2.h>
#include <ws2tcpip.h>
#include <memory>
#include <utility>
#include "bin/builtin.h"
#include "bin/reference_counting.h"
#include "bin/socket_base.h"
#include "bin/thread.h"
namespace dart {
namespace bin {
// Forward declarations.
class EventHandlerImplementation;
class Handle;
class FileHandle;
class SocketHandle;
class ClientSocket;
class ListenSocket;
// An OverlappedBuffer encapsulates the OVERLAPPED structure and the
// associated data buffer. For accept it also contains the pre-created
// socket for the client.
class OverlappedBuffer {
public:
enum Operation {
kAccept,
kRead,
kRecvFrom,
kWrite,
kSendTo,
kDisconnect,
kConnect
};
static OverlappedBuffer* AllocateAcceptBuffer(int buffer_size);
static OverlappedBuffer* AllocateReadBuffer(int buffer_size);
static OverlappedBuffer* AllocateRecvFromBuffer(int buffer_size);
static OverlappedBuffer* AllocateWriteBuffer(int buffer_size);
static OverlappedBuffer* AllocateSendToBuffer(int buffer_size);
static OverlappedBuffer* AllocateDisconnectBuffer();
static OverlappedBuffer* AllocateConnectBuffer();
static void DisposeBuffer(OverlappedBuffer* buffer);
// Find the IO buffer from the OVERLAPPED address.
static OverlappedBuffer* GetFromOverlapped(OVERLAPPED* overlapped);
// Read data from a buffer which has been received. It will read up
// to num_bytes bytes of data returning the actual number of bytes
// read. This will update the index of the next byte in the buffer
// so calling Read several times will keep returning new data from
// the buffer until all data have been read.
int Read(void* buffer, int num_bytes);
// Write data to a buffer before sending it. Returns the number of bytes
// actually written to the buffer. Calls to Write will always write to
// the buffer from the beginning.
int Write(const void* buffer, int num_bytes);
// Check the amount of data in a read buffer which has not been read yet.
int GetRemainingLength();
bool IsEmpty() { return GetRemainingLength() == 0; }
Operation operation() const { return operation_; }
SOCKET client() const { return client_; }
char* GetBufferStart() { return reinterpret_cast<char*>(&buffer_data_); }
int GetBufferSize() const { return buflen_; }
struct sockaddr* from() const {
return from_;
}
socklen_t* from_len_addr() const { return from_len_addr_; }
socklen_t from_len() const { return from_ == nullptr ? 0 : *from_len_addr_; }
// Returns the address of the OVERLAPPED structure with all fields
// initialized to zero.
OVERLAPPED* GetCleanOverlapped() {
memset(&overlapped_, 0, sizeof(overlapped_));
return &overlapped_;
}
// Returns a WASBUF structure initialized with the data in this IO buffer.
WSABUF* GetWASBUF() {
wbuf_.buf = GetBufferStart();
wbuf_.len = GetBufferSize();
return &wbuf_;
}
void set_data_length(int data_length) { data_length_ = data_length; }
void operator delete(void* buffer) { free(buffer); }
private:
OverlappedBuffer(int buffer_size, Operation operation)
: operation_(operation), buflen_(buffer_size) {
memset(GetBufferStart(), 0, GetBufferSize());
if (operation == kRecvFrom) {
// Reserve part of the buffer for the length of source sockaddr
// and source sockaddr.
const int kAdditionalSize =
sizeof(struct sockaddr_storage) + sizeof(socklen_t);
ASSERT(buflen_ > kAdditionalSize);
buflen_ -= kAdditionalSize;
from_len_addr_ =
reinterpret_cast<socklen_t*>(GetBufferStart() + GetBufferSize());
*from_len_addr_ = sizeof(struct sockaddr_storage);
from_ = reinterpret_cast<struct sockaddr*>(from_len_addr_ + 1);
} else {
from_len_addr_ = nullptr;
from_ = nullptr;
}
index_ = 0;
data_length_ = 0;
if (operation_ == kAccept) {
client_ = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
}
}
void* operator new(size_t size, int buffer_size) {
return malloc(size + buffer_size);
}
// Allocate an overlapped buffer for thse specified amount of data and
// operation. Some operations need additional buffer space, which is
// handled by this method.
static OverlappedBuffer* AllocateBuffer(int buffer_size, Operation operation);
OVERLAPPED overlapped_; // OVERLAPPED structure for overlapped IO.
SOCKET client_; // Used for AcceptEx client socket.
int buflen_; // Length of the buffer.
Operation operation_; // Type of operation issued.
int index_; // Index for next read from read buffer.
int data_length_; // Length of the actual data in the buffer.
WSABUF wbuf_; // Structure for passing buffer to WSA functions.
// For the recvfrom operation additional storace is allocated for the
// source sockaddr.
socklen_t* from_len_addr_; // Pointer to source sockaddr size storage.
struct sockaddr* from_; // Pointer to source sockaddr storage.
// Buffer for recv/send/AcceptEx. This must be at the end of the
// object as the object is allocated larger than it's definition
// indicate to extend this array.
uint8_t buffer_data_[1];
DISALLOW_COPY_AND_ASSIGN(OverlappedBuffer);
};
// Abstract super class for holding information on listen and connected
// sockets.
class Handle : public ReferenceCounted<Handle>, public DescriptorInfoBase {
public:
enum Type {
kFile,
kStd,
kDirectoryWatch,
kClientSocket,
kListenSocket,
kDatagramSocket
};
// Socket interface exposing normal socket operations.
intptr_t Available();
bool DataReady();
intptr_t Read(void* buffer, intptr_t num_bytes);
intptr_t RecvFrom(void* buffer,
intptr_t num_bytes,
struct sockaddr* sa,
socklen_t addr_len);
virtual intptr_t Write(const void* buffer, intptr_t num_bytes);
virtual intptr_t SendTo(const void* buffer,
intptr_t num_bytes,
struct sockaddr* sa,
socklen_t sa_len);
// Internal interface used by the event handler.
virtual bool IssueRead();
virtual bool IssueRecvFrom();
virtual bool IssueWrite();
virtual bool IssueSendTo(struct sockaddr* sa, socklen_t sa_len);
bool HasPendingRead();
bool HasPendingWrite();
void ReadComplete(OverlappedBuffer* buffer);
void RecvFromComplete(OverlappedBuffer* buffer);
void WriteComplete(OverlappedBuffer* buffer);
bool IsClosing() { return (flags_ & (1 << kClosing)) != 0; }
bool IsClosedRead() { return (flags_ & (1 << kCloseRead)) != 0; }
bool IsClosedWrite() { return (flags_ & (1 << kCloseWrite)) != 0; }
bool IsError() { return (flags_ & (1 << kError)) != 0; }
void MarkClosing() { flags_ |= (1 << kClosing); }
void MarkClosedRead() { flags_ |= (1 << kCloseRead); }
void MarkClosedWrite() { flags_ |= (1 << kCloseWrite); }
void MarkError() { flags_ |= (1 << kError); }
virtual void EnsureInitialized(EventHandlerImplementation* event_handler) = 0;
HANDLE handle() { return handle_; }
bool CreateCompletionPort(HANDLE completion_port);
void Close();
virtual void DoClose();
virtual bool IsClosed() = 0;
bool IsHandleClosed() const { return handle_ == INVALID_HANDLE_VALUE; }
Type type() { return type_; }
bool is_file() { return type_ == kFile; }
bool is_socket() {
return type_ == kListenSocket || type_ == kClientSocket ||
type_ == kDatagramSocket;
}
bool is_listen_socket() { return type_ == kListenSocket; }
bool is_client_socket() { return type_ == kClientSocket; }
bool is_datagram_socket() { return type_ == kDatagramSocket; }
void MarkDoesNotSupportOverlappedIO() {
flags_ |= (1 << kDoesNotSupportOverlappedIO);
}
bool SupportsOverlappedIO() {
return (flags_ & (1 << kDoesNotSupportOverlappedIO)) == 0;
}
void ReadSyncCompleteAsync();
DWORD last_error() { return last_error_; }
void set_last_error(DWORD last_error) { last_error_ = last_error; }
void set_completion_port(HANDLE completion_port) {
completion_port_ = completion_port;
}
void set_event_handler(EventHandlerImplementation* event_handler) {
event_handler_ = event_handler;
}
protected:
// For access to monitor_;
friend class EventHandlerImplementation;
enum Flags {
kClosing = 0,
kCloseRead = 1,
kCloseWrite = 2,
kDoesNotSupportOverlappedIO = 3,
kError = 4
};
explicit Handle(intptr_t handle);
virtual ~Handle();
virtual void HandleIssueError();
Monitor monitor_;
Type type_;
HANDLE handle_;
HANDLE completion_port_;
EventHandlerImplementation* event_handler_;
std::unique_ptr<OverlappedBuffer>
data_ready_; // Buffer for data ready to be read.
OverlappedBuffer* pending_read_; // Buffer for pending read.
OverlappedBuffer* pending_write_; // Buffer for pending write
DWORD last_error_;
ThreadId read_thread_id_;
HANDLE read_thread_handle_;
bool read_thread_starting_;
bool read_thread_finished_;
private:
void WaitForReadThreadStarted();
void NotifyReadThreadStarted();
void WaitForReadThreadFinished();
void NotifyReadThreadFinished();
int flags_;
friend class ReferenceCounted<Handle>;
DISALLOW_COPY_AND_ASSIGN(Handle);
};
class FileHandle : public DescriptorInfoSingleMixin<Handle> {
public:
explicit FileHandle(HANDLE handle)
: DescriptorInfoSingleMixin(reinterpret_cast<intptr_t>(handle), true) {
type_ = kFile;
}
virtual void EnsureInitialized(EventHandlerImplementation* event_handler);
virtual bool IsClosed();
private:
DISALLOW_COPY_AND_ASSIGN(FileHandle);
};
class StdHandle : public FileHandle {
public:
static StdHandle* Stdin(HANDLE handle);
virtual void DoClose();
virtual intptr_t Write(const void* buffer, intptr_t num_bytes);
void WriteSyncCompleteAsync();
void RunWriteLoop();
#if defined(DEBUG)
static StdHandle* StdinPtr() { return stdin_; }
#endif
private:
static Mutex* stdin_mutex_;
static StdHandle* stdin_;
explicit StdHandle(HANDLE handle)
: FileHandle(handle),
thread_id_(Thread::kInvalidThreadId),
thread_handle_(nullptr),
thread_wrote_(0),
write_thread_exists_(false),
write_thread_running_(false) {
type_ = kStd;
}
ThreadId thread_id_;
HANDLE thread_handle_;
intptr_t thread_wrote_;
bool write_thread_exists_;
bool write_thread_running_;
DISALLOW_COPY_AND_ASSIGN(StdHandle);
};
class DirectoryWatchHandle : public DescriptorInfoSingleMixin<Handle> {
public:
DirectoryWatchHandle(HANDLE handle, int events, bool recursive)
: DescriptorInfoSingleMixin(reinterpret_cast<intptr_t>(handle), true),
events_(events),
recursive_(recursive) {
type_ = kDirectoryWatch;
}
virtual void EnsureInitialized(EventHandlerImplementation* event_handler);
virtual bool IsClosed();
virtual bool IssueRead();
void Stop();
private:
int events_;
bool recursive_;
DISALLOW_COPY_AND_ASSIGN(DirectoryWatchHandle);
};
class SocketHandle : public Handle {
public:
SOCKET socket() const { return socket_; }
protected:
explicit SocketHandle(intptr_t s) : Handle(s), socket_(s) {}
virtual void HandleIssueError();
private:
const SOCKET socket_;
DISALLOW_COPY_AND_ASSIGN(SocketHandle);
};
// Information on listen sockets.
class ListenSocket : public DescriptorInfoMultipleMixin<SocketHandle> {
public:
explicit ListenSocket(intptr_t s)
: DescriptorInfoMultipleMixin(s, true),
AcceptEx_(nullptr),
GetAcceptExSockaddrs_(nullptr),
pending_accept_count_(0),
accepted_head_(nullptr),
accepted_tail_(nullptr),
accepted_count_(0) {
type_ = kListenSocket;
}
virtual ~ListenSocket() {
ASSERT(!HasPendingAccept());
ASSERT(accepted_head_ == nullptr);
ASSERT(accepted_tail_ == nullptr);
}
// Socket interface exposing normal socket operations.
ClientSocket* Accept();
bool CanAccept();
// Internal interface used by the event handler.
bool HasPendingAccept() { return pending_accept_count_ > 0; }
bool IssueAccept();
void AcceptComplete(OverlappedBuffer* buffer, HANDLE completion_port);
virtual void EnsureInitialized(EventHandlerImplementation* event_handler);
virtual void DoClose();
virtual bool IsClosed();
int pending_accept_count() { return pending_accept_count_; }
int accepted_count() { return accepted_count_; }
private:
bool LoadAcceptEx();
bool LoadGetAcceptExSockaddrs();
LPFN_ACCEPTEX AcceptEx_;
LPFN_GETACCEPTEXSOCKADDRS GetAcceptExSockaddrs_;
// The number of asynchronous `IssueAccept` operations which haven't completed
// yet.
int pending_accept_count_;
// Linked list of accepted connections provided by completion code. Ready to
// be handed over through accept.
ClientSocket* accepted_head_;
ClientSocket* accepted_tail_;
// The number of accepted connections which are waiting to be removed from
// this queue and processed by dart isolates.
int accepted_count_;
DISALLOW_COPY_AND_ASSIGN(ListenSocket);
};
// Information on connected sockets.
class ClientSocket : public DescriptorInfoSingleMixin<SocketHandle> {
public:
explicit ClientSocket(intptr_t s,
std::unique_ptr<RawAddr> remote_addr = nullptr)
: DescriptorInfoSingleMixin(s, true),
DisconnectEx_(nullptr),
next_(nullptr),
connected_(false),
closed_(false),
remote_addr_(std::move(remote_addr)) {
LoadDisconnectEx();
type_ = kClientSocket;
}
virtual ~ClientSocket() {
// Don't delete this object until all pending requests have been handled.
ASSERT(!HasPendingRead());
ASSERT(!HasPendingWrite());
ASSERT(next_ == nullptr);
ASSERT(closed_ == true);
}
void Shutdown(int how);
// Internal interface used by the event handler.
virtual bool IssueRead();
virtual bool IssueWrite();
void IssueDisconnect();
void DisconnectComplete(OverlappedBuffer* buffer);
void ConnectComplete(OverlappedBuffer* buffer);
virtual void EnsureInitialized(EventHandlerImplementation* event_handler);
virtual void DoClose();
virtual bool IsClosed();
// If `ClientSocket` was constructed with a `remote_addr`, populate `addr`
// with that value and return `true`. Otherwise leave `addr` untouched and
// return `false`.
bool PopulateRemoteAddr(RawAddr& addr);
ClientSocket* next() { return next_; }
void set_next(ClientSocket* next) { next_ = next; }
void mark_connected() { connected_ = true; }
bool is_connected() const { return connected_; }
void mark_closed() { closed_ = true; }
#if defined(DEBUG)
static intptr_t disconnecting() { return disconnecting_; }
#endif
private:
bool LoadDisconnectEx();
LPFN_DISCONNECTEX DisconnectEx_;
ClientSocket* next_;
bool connected_;
bool closed_;
std::unique_ptr<RawAddr> remote_addr_;
#if defined(DEBUG)
static intptr_t disconnecting_;
#endif
DISALLOW_COPY_AND_ASSIGN(ClientSocket);
};
class DatagramSocket : public DescriptorInfoSingleMixin<SocketHandle> {
public:
explicit DatagramSocket(intptr_t s) : DescriptorInfoSingleMixin(s, true) {
type_ = kDatagramSocket;
}
virtual ~DatagramSocket() {
// Don't delete this object until all pending requests have been handled.
ASSERT(!HasPendingRead());
ASSERT(!HasPendingWrite());
}
// Internal interface used by the event handler.
virtual bool IssueRecvFrom();
virtual bool IssueSendTo(sockaddr* sa, socklen_t sa_len);
virtual void EnsureInitialized(EventHandlerImplementation* event_handler);
virtual void DoClose();
virtual bool IsClosed();
private:
DISALLOW_COPY_AND_ASSIGN(DatagramSocket);
};
// Event handler.
class EventHandlerImplementation {
public:
EventHandlerImplementation();
virtual ~EventHandlerImplementation();
void SendData(intptr_t id, Dart_Port dart_port, int64_t data);
void Start(EventHandler* handler);
void Shutdown();
static void EventHandlerEntry(uword args);
int64_t GetTimeout();
void HandleInterrupt(InterruptMessage* msg);
void HandleTimeout();
void HandleAccept(ListenSocket* listen_socket, OverlappedBuffer* buffer);
void TryDispatchingPendingAccepts(ListenSocket* listen_socket);
void HandleRead(Handle* handle, int bytes, OverlappedBuffer* buffer);
void HandleRecvFrom(Handle* handle, int bytes, OverlappedBuffer* buffer);
void HandleWrite(Handle* handle, int bytes, OverlappedBuffer* buffer);
void HandleDisconnect(ClientSocket* client_socket,
int bytes,
OverlappedBuffer* buffer);
void HandleConnect(ClientSocket* client_socket,
int bytes,
OverlappedBuffer* buffer);
void HandleIOCompletion(DWORD bytes, ULONG_PTR key, OVERLAPPED* overlapped);
void HandleCompletionOrInterrupt(BOOL ok,
DWORD bytes,
ULONG_PTR key,
OVERLAPPED* overlapped);
HANDLE completion_port() { return completion_port_; }
private:
Monitor startup_monitor_;
ThreadId handler_thread_id_;
HANDLE handler_thread_handle_;
TimeoutQueue timeout_queue_; // Time for next timeout.
bool shutdown_;
HANDLE completion_port_;
DISALLOW_COPY_AND_ASSIGN(EventHandlerImplementation);
};
} // namespace bin
} // namespace dart
#endif // RUNTIME_BIN_EVENTHANDLER_WIN_H_