[io/lookup] On iOS run both ipv6- and ipv4-lookup concurrently.

On iOS name resolution can take unexpectedly long because it waits for both ipv4 and ipv6 results and in some cases ipv6 lookup can take long. This CL runs explicit ipv4 and ipv6 concurrently and picks up first successfully resolved name it can connect to.

Fixes https://github.com/dart-lang/sdk/issues/41451

TEST=standalone/io/http_force_concurrent_ipv6_lookup_test

Change-Id: Ie9b7ca0493d945e2eeb8de1dec7bbacdeb902785
Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/177560
Commit-Queue: Alexander Aprelev <aam@google.com>
Reviewed-by: Lasse R.H. Nielsen <lrn@google.com>
This commit is contained in:
Alexander Aprelev 2021-01-11 17:07:26 +00:00 committed by commit-bot@chromium.org
parent 63373b9e9c
commit a173599f4c
4 changed files with 430 additions and 158 deletions

View file

@ -21,7 +21,7 @@ import "dart:async"
Zone,
scheduleMicrotask;
import "dart:collection" show HashMap;
import "dart:collection" show HashMap, Queue;
import "dart:convert" show Encoding, utf8;

View file

@ -491,21 +491,32 @@ class _NativeSocket extends _NativeSocketNativeWrapper with _ServiceObject {
// a HttpServer, a WebSocket connection, a process pipe, etc.
Object? owner;
static Future<List<InternetAddress>> lookup(String host,
static Future<List<_InternetAddress>> lookup(String host,
{InternetAddressType type: InternetAddressType.any}) {
return _IOService._dispatch(_IOService.socketLookup, [host, type._value])
.then((response) {
if (isErrorResponse(response)) {
throw createError(response, "Failed host lookup: '$host'");
} else {
return response.skip(1).map<InternetAddress>((result) {
var type = InternetAddressType._from(result[0]);
return _InternetAddress(type, result[1], host, result[2], result[3]);
}).toList();
}
return [
for (var result in response.skip(1))
_InternetAddress(InternetAddressType._from(result[0]), result[1],
host, result[2], result[3])
];
});
}
static Stream<List<_InternetAddress>> lookupAsStream(String host,
{InternetAddressType type: InternetAddressType.any}) {
final controller = StreamController<List<_InternetAddress>>();
controller.onListen = () {
lookup(host, type: type).then((list) {
controller.add(list);
}, onError: controller.addError).whenComplete(controller.close);
};
return controller.stream;
}
static Future<InternetAddress> reverseLookup(InternetAddress addr) {
return _IOService._dispatch(_IOService.socketReverseLookup,
[(addr as _InternetAddress)._in_addr]).then((response) {
@ -572,6 +583,69 @@ class _NativeSocket extends _NativeSocketNativeWrapper with _ServiceObject {
(char == '8' || char == '9' || char == 'a' || char == 'b');
}
/// Explicitly makes two separate OS lookup requests: first for IPv4, then
/// after short delay for IPv6.
/// This avoids making single OS lookup request that internally does both IPv4
/// and IPv6 together, which on iOS sometimes seems to be taking unreasonably
/// long because of slow IPv6 lookup even though IPv4 lookup is fast.
static Stream<List<_InternetAddress>> staggeredLookup(String host) {
final controller = StreamController<List<_InternetAddress>>(sync: true);
controller.onListen = () {
// Completed when there are no further addresses, or when the returned
// stream is canceled,
// The latter signals that no further addresses are needed.
// When both completers are completed, one way or another, the stream is
// closed.
final ipv4Completer = Completer<void>();
final ipv6Completer = Completer<void>();
void lookupAddresses(InternetAddressType type, Completer<void> done) {
lookup(host, type: type).then((addresses) {
if (done.isCompleted) {
// By the time lookup is done, [connectNext] might have
// been able to connect to one of the resolved addresses.
return;
}
controller.add(addresses);
}, onError: (e, st) {
if (done.isCompleted) {
// By the time lookup is done, [connectNext] might have
// been able to connect to one of the resolved addresses.
return;
}
controller.addError(e, st);
}).whenComplete(() {
if (!done.isCompleted) {
done.complete();
}
});
}
lookupAddresses(InternetAddressType.IPv4, ipv4Completer);
// Give a chance for a connect to an IPv4 address to complete before
// starting an IPv6 lookup. If IPv4 connect succeeds before timer goes
// off, the timer gets cancelled.
const concurrentLookupDelay = Duration(milliseconds: 10);
final ipv6LookupDelay = Timer(concurrentLookupDelay, () {
lookupAddresses(InternetAddressType.IPv6, ipv6Completer);
});
Future.wait([ipv4Completer.future, ipv6Completer.future])
.then((_) => controller.close());
controller.onCancel = () {
// This is invoked when [connectNext] managed to connect to one of the
// looked-up addresses at which point we want to stop looking up
// the addresses.
if (!ipv4Completer.isCompleted) ipv4Completer.complete();
if (!ipv6Completer.isCompleted) ipv6Completer.complete();
ipv6LookupDelay.cancel();
};
};
return controller.stream;
}
static Future<ConnectionTask<_NativeSocket>> startConnect(
dynamic host, int port, dynamic sourceAddress) {
// Looks up [sourceAddress] to one or more IP addresses,
@ -596,53 +670,58 @@ class _NativeSocket extends _NativeSocketNativeWrapper with _ServiceObject {
"Must be a string or native InternetAddress");
}
}
return new Future.value(host).then<List<InternetAddress>>((host) {
if (host is _InternetAddress) return [host];
return lookup(host).then((addresses) {
if (addresses.isEmpty) {
throw createError(null, "Failed host lookup: '$host'");
return new Future.value(host).then<ConnectionTask<_NativeSocket>>((host) {
if (host is _InternetAddress) {
return tryConnectToResolvedAddresses(
host, port, source, Stream.value(<_InternetAddress>[host]));
}
return addresses;
final hostname = host as String;
final staggeredLookupOverride = bool.fromEnvironment(
"dart.library.io.force_staggered_ipv6_lookup",
defaultValue: false);
// On ios name resolution can get delayed by slow IPv6 name resolution,
// so we run IPv4 and IPv6 name resolution in parallel(IPv6 slightly
// delayed so if IPv4 is successfully looked up, we don't do IPv6 look up
// at all) and grab first successfully resolved name we are able to connect to.
final Stream<List<_InternetAddress>> stream =
Platform.isIOS || staggeredLookupOverride
? staggeredLookup(hostname)
: lookupAsStream(hostname);
return tryConnectToResolvedAddresses(host, port, source, stream);
});
}).then((addresses) {
assert(addresses.isNotEmpty);
}
static ConnectionTask<_NativeSocket> tryConnectToResolvedAddresses(
dynamic host,
int port,
_InternetAddress? source,
Stream<List<_InternetAddress>> addresses) {
// Completer for result.
var result = new Completer<_NativeSocket>();
// Index of next address in [addresses] to try.
var index = 0;
final result = new Completer<_NativeSocket>();
// Error, set if an error occurs.
// Keeps first error if multiple errors occour.
// Keeps first error if multiple errors occur.
var error = null;
// Active timers for on-going connection attempts.
// Contains all sockets which haven't received and initial
// Contains all sockets which haven't received an initial
// write or error event.
var connecting = <_NativeSocket>{};
final connecting = <_NativeSocket>{};
// Timer counting down from the last connection attempt.
// Reset when a new connection is attempted,
// which happens either when a previous timer runs out,
// or when a previous connection attempt fails.
Timer? timer;
// Addresses arrived from lookup stream, but haven't been tried to connect
// to yet due to Timer-based throttling.
final pendingLookedUp = Queue<_InternetAddress>();
// Attempt to connect to the next address in [addresses].
//
// Called initially, then when either a connection attempt fails,
// or an amount of time has passed since the last connection
// was attempted.
void connectNext() {
timer?.cancel();
if (index >= addresses.length) {
if (connecting.isEmpty) {
assert(error != null);
assert(!result.isCompleted);
result.completeError(error);
}
return;
}
final address = addresses[index++] as _InternetAddress;
var socket = new _NativeSocket.normal(address);
// Will contain values of various types representing the result
// of trying to create a connection.
// A value of `true` means success, everything else means failure.
// When deciding how to handle errors we need to know whether more
// addresses potentially are coming from the lookup stream.
bool isLookedUpStreamClosed = false;
late StreamSubscription<List<_InternetAddress>> addressesSubscription;
Object? createConnection(_InternetAddress address, _InternetAddress? source,
_NativeSocket socket) {
Object? connectionResult;
if (address.type == InternetAddressType.unix) {
if (source == null) {
@ -666,23 +745,55 @@ class _NativeSocket extends _NativeSocketNativeWrapper with _ServiceObject {
}
assert(connectionResult == true || connectionResult is OSError);
}
if (connectionResult != true) {
// connectionResult was not a success.
return connectionResult;
}
createConnectionError(Object? connectionResult, _InternetAddress address,
int port, _NativeSocket socket) {
if (connectionResult is OSError) {
int errorCode = connectionResult.errorCode;
final errorCode = connectionResult.errorCode;
if (source != null &&
errorCode != null &&
socket.isBindError(errorCode)) {
error = createError(connectionResult, "Bind failed", source);
return createError(connectionResult, "Bind failed", source);
} else {
error = createError(
return createError(
connectionResult, "Connection failed", address, port);
}
} else if (connectionResult is Error) {
error = connectionResult;
} else {
error = createError(null, "Connection failed", address);
return connectionResult;
}
return createError(null, "Connection failed", address);
}
// Invoked either directly or via throttling Timer callback when we
// are ready to verify that we can connect to resolved address.
connectNext() {
timer?.cancel();
if (isLookedUpStreamClosed &&
connecting.isEmpty &&
pendingLookedUp.isEmpty) {
assert(error != null);
if (!result.isCompleted) {
// Might be already completed via onCancel
result.completeError(error);
}
return;
}
if (pendingLookedUp.isEmpty) {
assert(!isLookedUpStreamClosed || connecting.isNotEmpty);
return;
}
final address = pendingLookedUp.removeFirst();
final socket = new _NativeSocket.normal(address);
// Will contain values of various types representing the result
// of trying to create a connection.
// A value of `true` means success, everything else means failure.
final Object? connectionResult =
createConnection(address, source, socket);
if (connectionResult != true) {
// connectionResult was not a success.
error = createConnectionError(connectionResult, address, port, socket);
connectNext(); // Try again after failure to connect.
return;
}
@ -698,10 +809,9 @@ class _NativeSocket extends _NativeSocketNativeWrapper with _ServiceObject {
// Try again if no response (failure or success) within a duration.
// If this occurs, the socket is still trying to connect, and might
// succeed or fail later.
var duration =
final duration =
address.isLoopback ? _retryDurationLoopback : _retryDuration;
timer = new Timer(duration, connectNext);
connecting.add(socket);
// Setup handlers for receiving the first write event which
// indicate that the socket is fully connected.
@ -716,15 +826,15 @@ class _NativeSocket extends _NativeSocketNativeWrapper with _ServiceObject {
// the SO_ERROR option at level SOL_SOCKET to determine whether
// connect() completed successfully (SO_ERROR is zero) or
// unsuccessfully.
OSError osError = socket.nativeGetError();
final OSError osError = socket.nativeGetError();
if (osError.errorCode != 0) {
socket.close();
error ??= osError;
connectNext(); // Try again after failure to connect.
connectNext();
return;
}
// Connection success!
// Stop all other connecting sockets and timers.
// Stop all other connecting sockets and the timer.
timer!.cancel();
socket.setListening(read: false, write: false);
for (var s in connecting) {
@ -733,6 +843,7 @@ class _NativeSocket extends _NativeSocketNativeWrapper with _ServiceObject {
s.setListening(read: false, write: false);
}
connecting.clear();
addressesSubscription.cancel();
result.complete(socket);
}, error: (e, st) {
connecting.remove(socket);
@ -755,15 +866,26 @@ class _NativeSocket extends _NativeSocketNativeWrapper with _ServiceObject {
}
connecting.clear();
if (!result.isCompleted) {
error ??= createError(null,
"Connection attempt cancelled, host: ${host}, port: ${port}");
error ??= createError(
null, "Connection attempt cancelled, host: ${host}, port: ${port}");
result.completeError(error);
}
}
addressesSubscription = addresses.listen((address) {
pendingLookedUp.addAll(address);
if (timer == null || !timer!.isActive) {
connectNext();
}
}, onDone: () {
isLookedUpStreamClosed = true;
connectNext();
}, onError: (e, st) {
error = e;
});
connectNext();
return new ConnectionTask<_NativeSocket>._(result.future, onCancel);
});
}
static Future<_NativeSocket> connect(

View file

@ -0,0 +1,75 @@
// Copyright (c) 2021, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
//
// SharedOptions=-Ddart.library.io.force_staggered_ipv6_lookup=true
//
import "dart:io";
import "package:async_helper/async_helper.dart";
import "package:expect/expect.dart";
const sampleData = <int>[1, 2, 3, 4, 5];
void testBadHostName() {
asyncStart();
HttpClient client = new HttpClient();
client.get("some.bad.host.name.7654321", 0, "/").then((request) {
Expect.fail("Should not open a request on bad hostname");
}).catchError((error) {
asyncEnd(); // We expect onError to be called, due to bad host name.
}, test: (error) => error is! String);
}
void testConnect(InternetAddress loopback, {int expectedElapsedMs: 0}) async {
asyncStart();
final max = 10;
final servers = <ServerSocket>[];
for (var i = 0; i < max; i++) {
final server = await ServerSocket.bind(loopback, 0);
server.listen((Socket socket) {
socket.add(sampleData);
socket.destroy();
});
servers.add(server);
}
final sw = Stopwatch()..start();
var got = 0;
for (var i = 0; i < max; i++) {
final client = await Socket.connect('localhost', servers[i].port,
sourceAddress: loopback);
client.listen((received) {
Expect.listEquals(sampleData, received);
}, onError: (e) {
Expect.fail('Unexpected failure $e');
}, onDone: () {
client.close();
got++;
if (got == max) {
// Test that no stack overflow happens.
for (final server in servers) {
server.close();
}
Expect.isTrue(sw.elapsedMilliseconds > expectedElapsedMs);
asyncEnd();
}
});
}
}
void main() async {
asyncStart();
testBadHostName();
var localhosts = await InternetAddress.lookup('localhost');
if (localhosts.contains(InternetAddress.loopbackIPv4)) {
testConnect(InternetAddress.loopbackIPv4);
}
if (localhosts.contains(InternetAddress.loopbackIPv6)) {
// matches value in socket_patch.dart
const concurrentLookupDelay = Duration(milliseconds: 10);
testConnect(InternetAddress.loopbackIPv6,
expectedElapsedMs: concurrentLookupDelay.inMilliseconds);
}
asyncEnd();
}

View file

@ -0,0 +1,75 @@
// Copyright (c) 2021, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
//
// SharedOptions=-Ddart.library.io.force_staggered_ipv6_lookup=true
//
import "dart:io";
import "package:async_helper/async_helper.dart";
import "package:expect/expect.dart";
const sampleData = <int>[1, 2, 3, 4, 5];
void testBadHostName() {
asyncStart();
HttpClient client = new HttpClient();
client.get("some.bad.host.name.7654321", 0, "/").then((request) {
Expect.fail("Should not open a request on bad hostname");
}).catchError((error) {
asyncEnd(); // We expect onError to be called, due to bad host name.
}, test: (error) => error is! String);
}
void testConnect(InternetAddress loopback, {int expectedElapsedMs: 0}) async {
asyncStart();
final max = 10;
final servers = <ServerSocket>[];
for (var i = 0; i < max; i++) {
final server = await ServerSocket.bind(loopback, 0);
server.listen((Socket socket) {
socket.add(sampleData);
socket.destroy();
});
servers.add(server);
}
final sw = Stopwatch()..start();
var got = 0;
for (var i = 0; i < max; i++) {
final client = await Socket.connect('localhost', servers[i].port,
sourceAddress: loopback);
client.listen((received) {
Expect.listEquals(sampleData, received);
}, onError: (e) {
Expect.fail('Unexpected failure $e');
}, onDone: () {
client.close();
got++;
if (got == max) {
// Test that no stack overflow happens.
for (final server in servers) {
server.close();
}
Expect.isTrue(sw.elapsedMilliseconds > expectedElapsedMs);
asyncEnd();
}
});
}
}
void main() async {
asyncStart();
testBadHostName();
var localhosts = await InternetAddress.lookup('localhost');
if (localhosts.contains(InternetAddress.loopbackIPv4)) {
testConnect(InternetAddress.loopbackIPv4);
}
if (localhosts.contains(InternetAddress.loopbackIPv6)) {
// matches value in socket_patch.dart
const concurrentLookupDelay = Duration(milliseconds: 10);
testConnect(InternetAddress.loopbackIPv6,
expectedElapsedMs: concurrentLookupDelay.inMilliseconds);
}
asyncEnd();
}