[vm/io] Make sure to write until EAGAIN on POSIX

We are using `epoll()` in edge-triggered mode, which requires
us to hit `EAGAIN` on `write()` before we are guaranteed
to receive an `EPOLLOUT` event.

`_SocketStreamConsumer` looks at `_NativeSocket.writeAvailable`
after issuing a `write` to decide whether it should continue
writing or pause and wait until event handler sends us `writeEvent`.

However we did not previously set `writeAvailable` to `true` until
the first `writeEvent` arrived, which led to a hang on some
Linux servers: we would write a small amount of bytes into a pipe
and then wait for `writeEvent`, which would never come, as `epoll()`
is only guaranteed to wake up with `EPOLLOUT` if we hit `EAGAIN`
on `write()`.

This CL also changes `_NativeSocket.nativeWrite` implementation
to write bytes into the file descriptor until it gets EAGAIN.
This reduces the number of roundtrips between Dart and
native.

Unfortunately this CL does not come with a regression test
because it is relying on concrete `epoll()` behaviour and this
behaviour does not reproduce for me.

TEST=Tested manually on the affected servers.

FIXED=b/286346121

Fixed: 286346121
Change-Id: I37fef9aa12b1da724b035aa9577b414a8057217e
Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/308101
Reviewed-by: Martin Kustermann <kustermann@google.com>
Commit-Queue: Slava Egorov <vegorov@google.com>
This commit is contained in:
Vyacheslav Egorov 2023-06-08 13:17:28 +00:00 committed by Commit Queue
parent b66afce6ca
commit 0b62bb66ed
6 changed files with 70 additions and 46 deletions

View file

@ -315,5 +315,34 @@ bool SocketBase::IsValidAddress(const char* address) {
return SocketBase::ParseAddress(type, address, &raw);
}
#if !defined(DART_HOST_OS_WINDOWS)
intptr_t SocketBase::Write(intptr_t fd,
                           const void* buffer,
                           intptr_t num_bytes,
                           SocketOpKind sync) {
  // The event handler uses epoll in edge-triggered (EPOLLET) mode, so a
  // writable notification is only guaranteed after a write has failed with
  // EAGAIN. Keep pushing data into a non-blocking descriptor until either
  // everything has been written or the descriptor reports that it would
  // block. See man epoll for more information and guidelines.
  static_assert(EAGAIN == EWOULDBLOCK);

  const char* cursor = static_cast<const char*>(buffer);
  ssize_t remaining = num_bytes;
  while (remaining > 0) {
    const ssize_t chunk = WriteImpl(fd, cursor, remaining, sync);
    if (chunk != -1) {
      // Advance past the bytes that were accepted and try again.
      cursor += chunk;
      remaining -= chunk;
      continue;
    }
    const bool would_block = (sync == kAsync) && (errno == EWOULDBLOCK);
    if (!would_block) {
      return -1;  // Error occurred.
    }
    // Asynchronous write hit EAGAIN: stop and report what was written so far.
    break;
  }
  return num_bytes - remaining;
}
#endif
} // namespace bin
} // namespace dart

View file

@ -198,6 +198,7 @@ class SocketBase : public AllStatic {
const void* buffer,
intptr_t num_bytes,
SocketOpKind sync);
// Send data on a socket. The port to send to is specified in the port
// component of the passed RawAddr structure. The RawAddr structure is only
// used for datagram sockets.
@ -290,6 +291,13 @@ class SocketBase : public AllStatic {
OSError** os_error);
private:
#if !defined(DART_HOST_OS_WINDOWS)
// Performs a write of up to num_bytes bytes from buffer to fd. Only declared
// on non-Windows hosts, where Write() calls it repeatedly; Write() treats a
// return value of -1 as a failure and then inspects errno.
static intptr_t WriteImpl(intptr_t fd,
const void* buffer,
intptr_t num_bytes,
SocketOpKind sync);
#endif
DISALLOW_ALLOCATION();
DISALLOW_IMPLICIT_CONSTRUCTORS(SocketBase);
};

View file

@ -122,19 +122,12 @@ bool SocketBase::AvailableDatagram(intptr_t fd,
return read_bytes >= 0;
}
intptr_t SocketBase::Write(intptr_t fd,
const void* buffer,
intptr_t num_bytes,
SocketOpKind sync) {
intptr_t SocketBase::WriteImpl(intptr_t fd,
const void* buffer,
intptr_t num_bytes,
SocketOpKind sync) {
ASSERT(fd >= 0);
ssize_t written_bytes = TEMP_FAILURE_RETRY(write(fd, buffer, num_bytes));
ASSERT(EAGAIN == EWOULDBLOCK);
if ((sync == kAsync) && (written_bytes == -1) && (errno == EWOULDBLOCK)) {
// If the would block we need to retry and therefore return 0 as
// the number of bytes written.
written_bytes = 0;
}
return written_bytes;
return TEMP_FAILURE_RETRY(write(fd, buffer, num_bytes));
}
intptr_t SocketBase::SendTo(intptr_t fd,

View file

@ -144,26 +144,21 @@ bool SocketBase::AvailableDatagram(intptr_t fd,
return false;
}
intptr_t SocketBase::Write(intptr_t fd,
const void* buffer,
intptr_t num_bytes,
SocketOpKind sync) {
intptr_t SocketBase::WriteImpl(intptr_t fd,
const void* buffer,
intptr_t num_bytes,
SocketOpKind sync) {
IOHandle* handle = reinterpret_cast<IOHandle*>(fd);
ASSERT(handle->fd() >= 0);
LOG_INFO("SocketBase::Write: calling write(%ld, %p, %ld)\n", handle->fd(),
LOG_INFO("SocketBase::WriteImpl: calling write(%ld, %p, %ld)\n", handle->fd(),
buffer, num_bytes);
intptr_t written_bytes = handle->Write(buffer, num_bytes);
ASSERT(EAGAIN == EWOULDBLOCK);
if ((sync == kAsync) && (written_bytes == -1) && (errno == EWOULDBLOCK)) {
// If the would block we need to retry and therefore return 0 as
// the number of bytes written.
written_bytes = 0;
} else if (written_bytes == -1) {
LOG_ERR("SocketBase::Write: write(%ld, %p, %ld) failed\n", handle->fd(),
if (written_bytes == -1 && !(sync == kAsync && errno == EWOULDBLOCK)) {
LOG_ERR("SocketBase::WriteImpl: write(%ld, %p, %ld) failed\n", handle->fd(),
buffer, num_bytes);
} else {
LOG_INFO("SocketBase::Write: write(%ld, %p, %ld) succeeded\n", handle->fd(),
buffer, num_bytes);
LOG_INFO("SocketBase::WriteImpl: write(%ld, %p, %ld) succeeded\n",
handle->fd(), buffer, num_bytes);
}
return written_bytes;
}

View file

@ -192,19 +192,11 @@ bool SocketBase::AvailableDatagram(intptr_t fd,
return read_bytes >= 0;
}
intptr_t SocketBase::Write(intptr_t fd,
const void* buffer,
intptr_t num_bytes,
SocketOpKind sync) {
ASSERT(fd >= 0);
ssize_t written_bytes = TEMP_FAILURE_RETRY(write(fd, buffer, num_bytes));
ASSERT(EAGAIN == EWOULDBLOCK);
if ((sync == kAsync) && (written_bytes == -1) && (errno == EWOULDBLOCK)) {
// If the would block we need to retry and therefore return 0 as
// the number of bytes written.
written_bytes = 0;
}
return written_bytes;
intptr_t SocketBase::WriteImpl(intptr_t fd,
const void* buffer,
intptr_t num_bytes,
SocketOpKind sync) {
return TEMP_FAILURE_RETRY(write(fd, buffer, num_bytes));
}
intptr_t SocketBase::SendTo(intptr_t fd,

View file

@ -1224,14 +1224,21 @@ base class _NativeSocket extends _NativeSocketNativeWrapper
}
int result =
nativeWrite(bufferAndStart.buffer, bufferAndStart.start, bytes);
// The result may be negative, if we forced a short write for testing
// purpose. In such case, don't mark writeAvailable as false, as we don't
// know if we'll receive an event. It's better to just retry.
if ((result >= 0 && result < bytes) || hasPendingWrite()) {
writeAvailable = false;
if (result >= 0) {
// If write succeeded only partially or is pending then we should
// pause writing and wait for the write event to arrive from the
// event handler. If the write has fully completed then we should
// continue writing.
writeAvailable = (result == bytes) && !hasPendingWrite();
} else {
// Negative result indicates that we forced a short write for testing
// purpose. We are not guaranteed to get a writeEvent in this case
// unless there is a pending write - which will trigger an event
// when it completes. So the caller should continue writing into
// this socket.
result = -result;
writeAvailable = !hasPendingWrite();
}
// Negate the result, as stated above.
if (result < 0) result = -result;
return result;
} catch (e) {
StackTrace st = StackTrace.current;