Use a BytesBuilder to group length and message, enable tcpNoDelay.

This makes the socket benchmark (and real world code also), about 100x faster.

Change-Id: Iaa130b7bf69e4c89d3c3a221680e5b62ffe10583
Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/296641
Auto-Submit: Jake Macdonald <jakemac@google.com>
Commit-Queue: Jake Macdonald <jakemac@google.com>
Reviewed-by: Bob Nystrom <rnystrom@google.com>
This commit is contained in:
Jake Macdonald 2023-04-20 23:35:51 +00:00 committed by Commit Queue
parent 600fccb9bb
commit 922f611d7f
4 changed files with 45 additions and 21 deletions

View file

@ -159,8 +159,10 @@ Future<void> _separateProcessStdioBenchmarks() async {
responseCompleter = Completer();
var result = serialize();
if (result is List<int>) {
_writeLength(result, process.stdin);
process.stdin.add(result);
final bytesBuilder = BytesBuilder(copy: false);
_writeLength(result, bytesBuilder);
bytesBuilder.add(result);
process.stdin.add(bytesBuilder.takeBytes());
} else {
process.stdin.writeln(jsonEncode(result));
}
@ -172,8 +174,10 @@ Future<void> _separateProcessStdioBenchmarks() async {
responseCompleter = Completer();
var result = serialize();
if (result is List<int>) {
_writeLength(result, process.stdin);
process.stdin.add(result);
final bytesBuilder = BytesBuilder(copy: false);
_writeLength(result, bytesBuilder);
bytesBuilder.add(result);
process.stdin.add(bytesBuilder.takeBytes());
} else {
process.stdin.writeln(jsonEncode(result));
}
@ -218,6 +222,8 @@ Future<void> _separateProcessSocketBenchmarks() async {
serverSocket.port.toString(),
]);
var client = await clientCompleter.future;
// Nagle's algorithm slows us down >100x, disable it.
client.setOption(SocketOption.tcpNoDelay, true);
var listeners = <StreamSubscription>[
(serializationMode == SerializationMode.jsonServer ||
@ -240,8 +246,10 @@ Future<void> _separateProcessSocketBenchmarks() async {
responseCompleter = Completer();
var result = serialize();
if (result is List<int>) {
_writeLength(result, client);
client.add(result);
final bytesBuilder = BytesBuilder(copy: false);
_writeLength(result, bytesBuilder);
bytesBuilder.add(result);
client.add(bytesBuilder.takeBytes());
} else {
client.write(jsonEncode(result));
}
@ -253,8 +261,10 @@ Future<void> _separateProcessSocketBenchmarks() async {
responseCompleter = Completer();
var result = serialize();
if (result is List<int>) {
_writeLength(result, client);
client.add(result);
final bytesBuilder = BytesBuilder(copy: false);
_writeLength(result, bytesBuilder);
bytesBuilder.add(result);
client.add(bytesBuilder.takeBytes());
} else {
client.write(jsonEncode(result));
}
@ -265,7 +275,7 @@ Future<void> _separateProcessSocketBenchmarks() async {
listeners.forEach((l) => l.cancel());
process.kill();
await serverSocket.close();
await client.close();
client.destroy();
} catch (e, s) {
print('Error running benchmark \n$e\n\n$s');
} finally {
@ -273,12 +283,12 @@ Future<void> _separateProcessSocketBenchmarks() async {
}
}
void _writeLength(List<int> result, Sink<List<int>> sink) {
void _writeLength(List<int> result, BytesBuilder bytesBuilder) {
int length = (result as Uint8List).lengthInBytes;
if (length > 0xffffffff) {
throw new StateError('Message was larger than the allowed size!');
}
sink.add([
bytesBuilder.add([
length >> 24 & 0xff,
length >> 16 & 0xff,
length >> 8 & 0xff,
@ -323,8 +333,10 @@ String childProgram(SerializationMode mode) => '''
MessageGrouper(socket).messageStream.listen((data) {
deserialize(data);
var result = serialize() as Uint8List;
_writeLength(result, socket);
socket.add(result);
final bytesBuilder = BytesBuilder(copy: false);
_writeLength(result, bytesBuilder);
bytesBuilder.add(result);
socket.add(bytesBuilder.takeBytes());
});
}
} else {
@ -345,8 +357,10 @@ String childProgram(SerializationMode mode) => '''
MessageGrouper(stdin).messageStream.listen((data) {
deserialize(data);
var result = serialize() as Uint8List;
_writeLength(result, stdout);
stdout.add(result);
final bytesBuilder = BytesBuilder(copy: false);
_writeLength(result, bytesBuilder);
bytesBuilder.add(result);
stdout.add(bytesBuilder.takeBytes());
});
}
}
@ -392,12 +406,12 @@ String childProgram(SerializationMode mode) => '''
}
}
void _writeLength(Uint8List result, Sink<List<int>> sink) {
void _writeLength(Uint8List result, BytesBuilder bytesBuilder) {
int length = result.lengthInBytes;
if (length > 0xffffffff) {
throw new StateError('Message was larger than the allowed size!');
}
sink.add([
bytesBuilder.add([
length >> 24 & 0xff,
length >> 16 & 0xff,
length >> 8 & 0xff,

View file

@ -102,6 +102,8 @@ void main(List<String> arguments, [SendPort? sendPort]) {
late Stream<List<int>> inputStream;
if (socketAddress != null && socketPort != null) {
var socket = await Socket.connect(socketAddress, socketPort);
// Nagle's algorithm slows us down >100x, disable it.
socket.setOption(SocketOption.tcpNoDelay, true);
sendResult = _sendIOSinkResultFactory(socket);
inputStream = socket;
} else {
@ -380,13 +382,15 @@ void Function(Serializer) _sendIOSinkResultFactory(IOSink sink) =>
} else if (serializationMode == SerializationMode.byteDataClient) {
Uint8List result = (serializer as ByteDataSerializer).result;
int length = result.lengthInBytes;
sink.add([
final bytesBuilder = BytesBuilder(copy: false);
bytesBuilder.add([
length >> 24 & 0xff,
length >> 16 & 0xff,
length >> 8 & 0xff,
length & 0xff,
]);
sink.add(result);
bytesBuilder.add(result);
sink.add(bytesBuilder.takeBytes());
} else {
throw new UnsupportedError(
'Unsupported serialization mode \$serializationMode for '

View file

@ -76,6 +76,8 @@ class _SingleProcessMacroExecutor extends ExternalMacroExecutorBase {
clientCompleter.complete(client);
});
Socket client = await clientCompleter.future;
// Nagle's algorithm slows us down >100x, disable it.
client.setOption(SocketOption.tcpNoDelay, true);
Stream<Object> messageStream;
@ -152,13 +154,15 @@ class _SingleProcessMacroExecutor extends ExternalMacroExecutorBase {
if (length > 0xffffffff) {
throw new StateError('Message was larger than the allowed size!');
}
outSink.add([
BytesBuilder bytesBuilder = new BytesBuilder(copy: false);
bytesBuilder.add([
length >> 24 & 0xff,
length >> 16 & 0xff,
length >> 8 & 0xff,
length & 0xff
]);
outSink.add(result);
bytesBuilder.add(result);
outSink.add(bytesBuilder.takeBytes());
} else {
throw new UnsupportedError(
'Unsupported serialization mode $serializationMode for '

View file

@ -938,6 +938,7 @@ mus
mvar
n
na
nagle's
nameless
namer
natively
@ -1535,6 +1536,7 @@ talks
tarjan
tarjan's
tb
tcp
team
tearoff
tearoffable