[io] Take pending writes into account in _SocketStreamConsumer

On Windows `_NativeSocket.nativeWrite` returning to caller does
not mean that the data was flushed into OS buffer. Overlapped
operation might still be in progress - and it will be cancelled
with data loss if we close the handle.

This means _SocketStreamConsumer should take such pending writes
into account and not issue `done` event until pending write
completes.

This change prevents data loss when communicating with
subprocesses on Windows. Previously even doing
`await process.stdin.flush()` would not guarantee data
delivery.

Fixes https://github.com/dart-lang/sdk/issues/50904

TEST=tests/standalone/io/regress_50904_test.dart

Cq-Include-Trybots: luci.dart.try:vm-win-release-x64-try,vm-win-debug-x64-try,pkg-win-release-try
Change-Id: I13ab2cc3ce45f1ff8cb3183884d26bbb2b72f7ac
Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/307043
Reviewed-by: Brian Quinlan <bquinlan@google.com>
Commit-Queue: Slava Egorov <vegorov@google.com>
This commit is contained in:
Vyacheslav Egorov 2023-06-07 09:49:35 +00:00 committed by Commit Queue
parent 8144a29966
commit 74285a6e31
9 changed files with 233 additions and 23 deletions

View file

@ -171,6 +171,7 @@ namespace bin {
V(Socket_SetRawOption, 4) \
V(Socket_SetSocketId, 3) \
V(Socket_WriteList, 4) \
V(Socket_HasPendingWrite, 1) \
V(SocketControlMessage_fromHandles, 2) \
V(SocketControlMessageImpl_extractHandles, 1) \
V(Stdin_ReadByte, 1) \

View file

@ -719,6 +719,17 @@ void FUNCTION_NAME(Socket_ReceiveMessage)(Dart_NativeArguments args) {
Dart_SetReturnValue(args, list);
}
void FUNCTION_NAME(Socket_HasPendingWrite)(Dart_NativeArguments args) {
#if defined(DART_HOST_OS_WINDOWS)
Socket* socket =
Socket::GetSocketIdNativeField(Dart_GetNativeArgument(args, 0));
const bool result = SocketBase::HasPendingWrite(socket->fd());
#else
const bool result = false;
#endif // defined(DART_HOST_OS_WINDOWS)
Dart_SetReturnValue(args, Dart_NewBoolean(result));
}
void FUNCTION_NAME(Socket_WriteList)(Dart_NativeArguments args) {
Socket* socket =
Socket::GetSocketIdNativeField(Dart_GetNativeArgument(args, 0));

View file

@ -262,6 +262,10 @@ class SocketBase : public AllStatic {
const RawAddr& interface,
int interfaceIndex);
#if defined(DART_HOST_OS_WINDOWS)
static bool HasPendingWrite(intptr_t fd);
#endif
// Perform a hostname lookup. Returns a AddressList of SocketAddress's.
static AddressList<SocketAddress>* LookupAddress(const char* host,
int type,

View file

@ -121,6 +121,11 @@ bool SocketBase::AvailableDatagram(intptr_t fd,
return client_socket->DataReady();
}
bool SocketBase::HasPendingWrite(intptr_t fd) {
Handle* handle = reinterpret_cast<Handle*>(fd);
return handle->HasPendingWrite();
}
intptr_t SocketBase::Write(intptr_t fd,
const void* buffer,
intptr_t num_bytes,

View file

@ -1183,6 +1183,20 @@ base class _NativeSocket extends _NativeSocketNativeWrapper
static int _fixOffset(int? offset) => offset ?? 0;
// This code issues a native write operation.
//
// On POSIX systems the data will be written using `write` syscall.
// When `write` returns a positive value this means that this number
// of bytes have been transferred from [buffer] into the OS buffer.
// At this point if the underlying descriptor is closed the OS will
// still attempt to deliver already written bytes to the destination.
//
// On Windows we use overlapped IO instead: `write` returning a positive
// value simply means that we have initiated an asynchronous IO operation
// for this many bytes. Closing the underlying handle will simply cancel the
// operation midway. Consequently you can only assume that bytes left userland
// when asynchronous write operation completes and this socket receives
// a [writeEvent].
int write(List<int> buffer, int offset, int? bytes) {
// TODO(40614): Remove once non-nullability is sound.
offset = _fixOffset(offset);
@ -1213,7 +1227,7 @@ base class _NativeSocket extends _NativeSocketNativeWrapper
// The result may be negative, if we forced a short write for testing
// purpose. In such case, don't mark writeAvailable as false, as we don't
// know if we'll receive an event. It's better to just retry.
if (result >= 0 && result < bytes) {
if ((result >= 0 && result < bytes) || hasPendingWrite()) {
writeAvailable = false;
}
// Negate the result, as stated above.
@ -1420,7 +1434,15 @@ base class _NativeSocket extends _NativeSocketNativeWrapper
}
break;
case writeEvent:
writeAvailable = true;
// On Windows there are two sources of write events: when pending
// write completes and when we subscribe to write events via
// setEventMaskCommand. Furthermore we don't always wait for a
// write event to issue a write. This means when event triggered by
// setEventMaskCommand arrives we might have already initiated a
// write. This means we should check [hasPendingWrite] here to
// be absolutely certain that the pending write operation has
// completed.
writeAvailable = !hasPendingWrite();
issueWriteEvent(delayed: false);
continue;
case errorEvent:
@ -1690,6 +1712,10 @@ base class _NativeSocket extends _NativeSocketNativeWrapper
interfaceAddr?._in_addr, interfaceIndex);
}
bool hasPendingWrite() {
return Platform.isWindows && nativeHasPendingWrite();
}
@pragma("vm:external-name", "Socket_SetSocketId")
external void nativeSetSocketId(int id, int typeFlags);
@pragma("vm:external-name", "Socket_Available")
@ -1704,6 +1730,8 @@ base class _NativeSocket extends _NativeSocketNativeWrapper
external List<dynamic> nativeReceiveMessage(int len);
@pragma("vm:external-name", "Socket_WriteList")
external int nativeWrite(List<int> buffer, int offset, int bytes);
@pragma("vm:external-name", "Socket_HasPendingWrite")
external bool nativeHasPendingWrite();
@pragma("vm:external-name", "Socket_SendTo")
external int nativeSendTo(
List<int> buffer, int offset, int bytes, Uint8List address, int port);
@ -1962,6 +1990,7 @@ class _RawSocket extends Stream<RawSocketEvent>
return _socket.readMessage(count);
}
/// See [_NativeSocket.write] for some implementation notes.
int write(List<int> buffer, [int offset = 0, int? count]) =>
_socket.write(buffer, offset, count);
@ -2129,6 +2158,9 @@ class _SocketStreamConsumer implements StreamConsumer<List<int>> {
try {
write();
} catch (e) {
buffer = null;
offset = 0;
socket.destroy();
stop();
done(e);
@ -2137,6 +2169,10 @@ class _SocketStreamConsumer implements StreamConsumer<List<int>> {
socket.destroy();
done(error, stackTrace);
}, onDone: () {
// Note: stream only delivers done event if subscription is not paused.
// so it is crucial to keep subscription paused while writes are
// in flight.
assert(buffer == null);
done();
}, cancelOnError: true);
}
@ -2148,19 +2184,40 @@ class _SocketStreamConsumer implements StreamConsumer<List<int>> {
return new Future.value(socket);
}
bool get _previousWriteHasCompleted {
final rawSocket = socket._raw;
if (rawSocket is _RawSocket) {
return rawSocket._socket.writeAvailable;
}
assert(rawSocket is _RawSecureSocket);
// _RawSecureSocket has an internal buffering mechanism and it is going
// to flush its buffer before it shutsdown.
return true;
}
void write() {
final sub = subscription;
if (sub == null) return;
// Write as much as possible.
offset =
offset! + socket._write(buffer!, offset!, buffer!.length - offset!);
// We have something to write out.
if (offset! < buffer!.length) {
offset =
offset! + socket._write(buffer!, offset!, buffer!.length - offset!);
}
if (offset! < buffer!.length || !_previousWriteHasCompleted) {
// On Windows we might have written the whole buffer out but we are
// still waiting for the write to complete. We should not resume the
// subscription until the pending write finishes and we receive a
// writeEvent signaling that we can write the next chunk or that we
// can consider all data flushed from our side into kernel buffers.
if (!paused) {
paused = true;
sub.pause();
}
socket._enableWriteEvent();
} else {
// Write fully completed.
buffer = null;
if (paused) {
paused = false;

View file

@ -422,15 +422,12 @@ Future<void> testHttpAbortAfterClose() async {
});
final request = await HttpClient().get("127.0.0.1", server.port, "/");
request.close().then((response) {
request.abort();
response.listen((data) {
Expect.equals(utf8.decode(data), value);
}, onDone: () {
asyncEnd();
server.close();
});
});
final response = await request.close();
request.abort();
final data = await response.transform(utf8.decoder).join();
Expect.equals(value, data);
asyncEnd();
server.close();
}
void main() async {

View file

@ -0,0 +1,68 @@
// Copyright (c) 2023, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'dart:async';
import 'dart:convert';
import 'dart:io';
import 'dart:typed_data';
import 'package:crypto/crypto.dart';
import 'package:expect/expect.dart';
Future<void> runTest(int length) async {
final Uint8List bytes = Uint8List(length);
for (var i = 0; i < bytes.length; i++) {
bytes[i] = i;
}
final digest = sha1.convert(bytes);
final Process proc = await Process.start(
Platform.executable,
<String>[Platform.script.toFilePath(), 'receiver'],
runInShell: true,
);
proc.stdin.add(bytes);
final result = proc.stdout.transform(utf8.decoder).join();
proc.stderr.transform(utf8.decoder).listen((data) {
stdout.write('stderr> $data');
});
await proc.stdin.flush();
await proc.stdin.close();
Expect.equals(0, await proc.exitCode);
Expect.equals('got(${bytes.length},${digest})\n', await result);
}
void main(List<String> arguments) async {
if (arguments.length == 1 && arguments.first == 'receiver') {
// Read [stdin] and respond with `got(bytes,sha1digest)`.
var gotBytes = 0;
late Digest digest;
final sha1Sink = sha1
.startChunkedConversion(ChunkedConversionSink.withCallback((result) {
digest = result.first;
}));
await stdin.listen((chunk) {
gotBytes += chunk.length;
sha1Sink.add(chunk);
}).asFuture();
sha1Sink.close();
stdout.writeln('got($gotBytes,$digest)');
await stdout.flush();
return;
}
for (var mul in [1, 2, 4, 8]) {
runTest(1437 * mul);
}
// kBufferSize in runtime/bin/eventhandler_win.cc
const overlappedIoBufferSize = 64 * 1024;
runTest(overlappedIoBufferSize);
runTest(overlappedIoBufferSize - 1);
runTest(overlappedIoBufferSize + 1);
}

View file

@ -422,15 +422,12 @@ Future<void> testHttpAbortAfterClose() async {
});
final request = await HttpClient().get("127.0.0.1", server.port, "/");
request.close().then((response) {
request.abort();
response.listen((data) {
Expect.equals(utf8.decode(data), value);
}, onDone: () {
asyncEnd();
server.close();
});
});
final response = await request.close();
request.abort();
final data = await response.transform(utf8.decoder).join();
Expect.equals(value, data);
asyncEnd();
server.close();
}
void main() async {

View file

@ -0,0 +1,70 @@
// Copyright (c) 2023, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
// @dart=2.9
import 'dart:async';
import 'dart:convert';
import 'dart:io';
import 'dart:typed_data';
import 'package:crypto/crypto.dart';
import 'package:expect/expect.dart';
Future<void> runTest(int length) async {
final Uint8List bytes = Uint8List(length);
for (var i = 0; i < bytes.length; i++) {
bytes[i] = i;
}
final digest = sha1.convert(bytes);
final Process proc = await Process.start(
Platform.executable,
<String>[Platform.script.toFilePath(), 'receiver'],
runInShell: true,
);
proc.stdin.add(bytes);
final result = proc.stdout.transform(utf8.decoder).join();
proc.stderr.transform(utf8.decoder).listen((data) {
stdout.write('stderr> $data');
});
await proc.stdin.flush();
await proc.stdin.close();
Expect.equals(0, await proc.exitCode);
Expect.equals('got(${bytes.length},${digest})\n', await result);
}
void main(List<String> arguments) async {
if (arguments.length == 1 && arguments.first == 'receiver') {
// Read [stdin] and respond with `got(bytes,sha1digest)`.
var gotBytes = 0;
Digest digest;
final sha1Sink = sha1
.startChunkedConversion(ChunkedConversionSink.withCallback((result) {
digest = result.first;
}));
await stdin.listen((chunk) {
gotBytes += chunk.length;
sha1Sink.add(chunk);
}).asFuture();
sha1Sink.close();
stdout.writeln('got($gotBytes,$digest)');
await stdout.flush();
return;
}
for (var mul in [1, 2, 4, 8]) {
runTest(1437 * mul);
}
// kBufferSize in runtime/bin/eventhandler_win.cc
const overlappedIoBufferSize = 64 * 1024;
runTest(overlappedIoBufferSize);
runTest(overlappedIoBufferSize - 1);
runTest(overlappedIoBufferSize + 1);
}