diff --git a/sdk/lib/_internal/vm/bin/common_patch.dart b/sdk/lib/_internal/vm/bin/common_patch.dart index cd9f149e080..f788ec7c15b 100644 --- a/sdk/lib/_internal/vm/bin/common_patch.dart +++ b/sdk/lib/_internal/vm/bin/common_patch.dart @@ -21,7 +21,7 @@ import "dart:async" Zone, scheduleMicrotask; -import "dart:collection" show HashMap; +import "dart:collection" show HashMap, Queue; import "dart:convert" show Encoding, utf8; diff --git a/sdk/lib/_internal/vm/bin/socket_patch.dart b/sdk/lib/_internal/vm/bin/socket_patch.dart index 151fffe20e3..fb9e7a9142d 100644 --- a/sdk/lib/_internal/vm/bin/socket_patch.dart +++ b/sdk/lib/_internal/vm/bin/socket_patch.dart @@ -491,21 +491,32 @@ class _NativeSocket extends _NativeSocketNativeWrapper with _ServiceObject { // a HttpServer, a WebSocket connection, a process pipe, etc. Object? owner; - static Future> lookup(String host, + static Future> lookup(String host, {InternetAddressType type: InternetAddressType.any}) { return _IOService._dispatch(_IOService.socketLookup, [host, type._value]) .then((response) { if (isErrorResponse(response)) { throw createError(response, "Failed host lookup: '$host'"); - } else { - return response.skip(1).map((result) { - var type = InternetAddressType._from(result[0]); - return _InternetAddress(type, result[1], host, result[2], result[3]); - }).toList(); } + return [ + for (var result in response.skip(1)) + _InternetAddress(InternetAddressType._from(result[0]), result[1], + host, result[2], result[3]) + ]; }); } + static Stream> lookupAsStream(String host, + {InternetAddressType type: InternetAddressType.any}) { + final controller = StreamController>(); + controller.onListen = () { + lookup(host, type: type).then((list) { + controller.add(list); + }, onError: controller.addError).whenComplete(controller.close); + }; + return controller.stream; + } + static Future reverseLookup(InternetAddress addr) { return _IOService._dispatch(_IOService.socketReverseLookup, [(addr as _InternetAddress)._in_addr]).then((response) { @@ -572,6 +583,69 @@ class _NativeSocket extends _NativeSocketNativeWrapper with _ServiceObject { (char == '8' || char == '9' || char == 'a' || char == 'b'); } + /// Explicitly makes two separate OS lookup requests: first for IPv4, then + /// after short delay for IPv6. + /// This avoids making single OS lookup request that internally does both IPv4 + /// and IPv6 together, which on iOS sometimes seems to be taking unreasonably + /// long because of slow IPv6 lookup even though IPv4 lookup is fast. + static Stream> staggeredLookup(String host) { + final controller = StreamController>(sync: true); + + controller.onListen = () { + // Completed when there are no further addresses, or when the returned + // stream is canceled, + // The latter signals that no further addresses are needed. + // When both completers are completed, one way or another, the stream is + // closed. + final ipv4Completer = Completer(); + final ipv6Completer = Completer(); + + void lookupAddresses(InternetAddressType type, Completer done) { + lookup(host, type: type).then((addresses) { + if (done.isCompleted) { + // By the time lookup is done, [connectNext] might have + // been able to connect to one of the resolved addresses. + return; + } + controller.add(addresses); + }, onError: (e, st) { + if (done.isCompleted) { + // By the time lookup is done, [connectNext] might have + // been able to connect to one of the resolved addresses. + return; + } + controller.addError(e, st); + }).whenComplete(() { + if (!done.isCompleted) { + done.complete(); + } + }); + } + + lookupAddresses(InternetAddressType.IPv4, ipv4Completer); + // Give a chance for a connect to an IPv4 address to complete before + // starting an IPv6 lookup. If IPv4 connect succeeds before timer goes + // off, the timer gets cancelled. + const concurrentLookupDelay = Duration(milliseconds: 10); + final ipv6LookupDelay = Timer(concurrentLookupDelay, () { + lookupAddresses(InternetAddressType.IPv6, ipv6Completer); + }); + + Future.wait([ipv4Completer.future, ipv6Completer.future]) + .then((_) => controller.close()); + + controller.onCancel = () { + // This is invoked when [connectNext] managed to connect to one of the + // looked-up addresses at which point we want to stop looking up + // the addresses. + if (!ipv4Completer.isCompleted) ipv4Completer.complete(); + if (!ipv6Completer.isCompleted) ipv6Completer.complete(); + ipv6LookupDelay.cancel(); + }; + }; + return controller.stream; + } + static Future> startConnect( dynamic host, int port, dynamic sourceAddress) { // Looks up [sourceAddress] to one or more IP addresses, @@ -596,174 +670,222 @@ class _NativeSocket extends _NativeSocketNativeWrapper with _ServiceObject { "Must be a string or native InternetAddress"); } } - return new Future.value(host).then>((host) { - if (host is _InternetAddress) return [host]; - return lookup(host).then((addresses) { - if (addresses.isEmpty) { - throw createError(null, "Failed host lookup: '$host'"); - } - return addresses; - }); - }).then((addresses) { - assert(addresses.isNotEmpty); - // Completer for result. - var result = new Completer<_NativeSocket>(); - // Index of next address in [addresses] to try. - var index = 0; - // Error, set if an error occurs. - // Keeps first error if multiple errors occour. - var error = null; - // Active timers for on-going connection attempts. - // Contains all sockets which haven't received and initial - // write or error event. - var connecting = <_NativeSocket>{}; - // Timer counting down from the last connection attempt. - // Reset when a new connection is attempted, - // which happens either when a previous timer runs out, - // or when a previous connection attempt fails. - Timer? timer; + return new Future.value(host).then>((host) { + if (host is _InternetAddress) { + return tryConnectToResolvedAddresses( + host, port, source, Stream.value(<_InternetAddress>[host])); + } + final hostname = host as String; + final staggeredLookupOverride = bool.fromEnvironment( + "dart.library.io.force_staggered_ipv6_lookup", + defaultValue: false); - // Attempt to connect to the next address in [addresses]. - // - // Called initially, then when either a connection attempt fails, - // or an amount of time has passed since the last connection - // was attempted. - void connectNext() { - timer?.cancel(); - if (index >= addresses.length) { - if (connecting.isEmpty) { - assert(error != null); - assert(!result.isCompleted); - result.completeError(error); - } - return; - } - final address = addresses[index++] as _InternetAddress; - var socket = new _NativeSocket.normal(address); - // Will contain values of various types representing the result - // of trying to create a connection. - // A value of `true` means success, everything else means failure. - Object? connectionResult; - if (address.type == InternetAddressType.unix) { - if (source == null) { - connectionResult = socket.nativeCreateUnixDomainConnect( - address.address, _Namespace._namespace); - } else { - assert(source.type == InternetAddressType.unix); - connectionResult = socket.nativeCreateUnixDomainBindConnect( - address.address, source.address, _Namespace._namespace); - } - assert(connectionResult == true || - connectionResult is Error || - connectionResult is OSError); + // On ios name resolution can get delayed by slow IPv6 name resolution, + // so we run IPv4 and IPv6 name resolution in parallel(IPv6 slightly + // delayed so if IPv4 is successfully looked up, we don't do IPv6 look up + // at all) and grab first successfully resolved name we are able to connect to. + final Stream> stream = + Platform.isIOS || staggeredLookupOverride + ? staggeredLookup(hostname) + : lookupAsStream(hostname); + + return tryConnectToResolvedAddresses(host, port, source, stream); + }); + } + + static ConnectionTask<_NativeSocket> tryConnectToResolvedAddresses( + dynamic host, + int port, + _InternetAddress? source, + Stream> addresses) { + // Completer for result. + final result = new Completer<_NativeSocket>(); + // Error, set if an error occurs. + // Keeps first error if multiple errors occur. + var error = null; + // Contains all sockets which haven't received an initial + // write or error event. + final connecting = <_NativeSocket>{}; + // Timer counting down from the last connection attempt. + // Reset when a new connection is attempted, + // which happens either when a previous timer runs out, + // or when a previous connection attempt fails. + Timer? timer; + // Addresses arrived from lookup stream, but haven't been tried to connect + // to yet due to Timer-based throttling. + final pendingLookedUp = Queue<_InternetAddress>(); + + // When deciding how to handle errors we need to know whether more + // addresses potentially are coming from the lookup stream. + bool isLookedUpStreamClosed = false; + late StreamSubscription> addressesSubscription; + + Object? createConnection(_InternetAddress address, _InternetAddress? source, + _NativeSocket socket) { + Object? connectionResult; + if (address.type == InternetAddressType.unix) { + if (source == null) { + connectionResult = socket.nativeCreateUnixDomainConnect( + address.address, _Namespace._namespace); } else { - if (source == null) { - connectionResult = socket.nativeCreateConnect( - address._in_addr, port, address._scope_id); - } else { - connectionResult = socket.nativeCreateBindConnect( - address._in_addr, port, source._in_addr, address._scope_id); - } - assert(connectionResult == true || connectionResult is OSError); + assert(source.type == InternetAddressType.unix); + connectionResult = socket.nativeCreateUnixDomainBindConnect( + address.address, source.address, _Namespace._namespace); } - if (connectionResult != true) { - // connectionResult was not a success. - if (connectionResult is OSError) { - int errorCode = connectionResult.errorCode; - if (source != null && - errorCode != null && - socket.isBindError(errorCode)) { - error = createError(connectionResult, "Bind failed", source); - } else { - error = createError( - connectionResult, "Connection failed", address, port); - } - } else if (connectionResult is Error) { - error = connectionResult; - } else { - error = createError(null, "Connection failed", address); - } - connectNext(); // Try again after failure to connect. - return; - } - // Query the local port for error messages. - try { - socket.port; - } catch (e) { - error ??= createError(e, "Connection failed", address, port); - connectNext(); // Try again after failure to connect. - return; + assert(connectionResult == true || + connectionResult is Error || + connectionResult is OSError); + } else { + if (source == null) { + connectionResult = socket.nativeCreateConnect( + address._in_addr, port, address._scope_id); + } else { + connectionResult = socket.nativeCreateBindConnect( + address._in_addr, port, source._in_addr, address._scope_id); } + assert(connectionResult == true || connectionResult is OSError); + } + return connectionResult; + } - // Try again if no response (failure or success) within a duration. - // If this occurs, the socket is still trying to connect, and might - // succeed or fail later. - var duration = - address.isLoopback ? _retryDurationLoopback : _retryDuration; - timer = new Timer(duration, connectNext); + createConnectionError(Object? connectionResult, _InternetAddress address, + int port, _NativeSocket socket) { + if (connectionResult is OSError) { + final errorCode = connectionResult.errorCode; + if (source != null && + errorCode != null && + socket.isBindError(errorCode)) { + return createError(connectionResult, "Bind failed", source); + } else { + return createError( + connectionResult, "Connection failed", address, port); + } + } else if (connectionResult is Error) { + return connectionResult; + } + return createError(null, "Connection failed", address); + } - connecting.add(socket); - // Setup handlers for receiving the first write event which - // indicate that the socket is fully connected. - socket.setHandlers(write: () { - // First remote response on connection. - // If error, drop the socket and go to the next address. - // If success, complete with the socket - // and stop all other open connection attempts. - connecting.remove(socket); - // From 'man 2 connect': - // After select(2) indicates writability, use getsockopt(2) to read - // the SO_ERROR option at level SOL_SOCKET to determine whether - // connect() completed successfully (SO_ERROR is zero) or - // unsuccessfully. - OSError osError = socket.nativeGetError(); - if (osError.errorCode != 0) { - socket.close(); - error ??= osError; - connectNext(); // Try again after failure to connect. - return; - } - // Connection success! - // Stop all other connecting sockets and timers. - timer!.cancel(); - socket.setListening(read: false, write: false); - for (var s in connecting) { - s.close(); - s.setHandlers(); - s.setListening(read: false, write: false); - } - connecting.clear(); - result.complete(socket); - }, error: (e, st) { - connecting.remove(socket); - socket.close(); - socket.setHandlers(); - socket.setListening(read: false, write: false); - // Keep first error, if present. - error ??= e; - connectNext(); // Try again after failure to connect. - }); - socket.setListening(read: false, write: true); + // Invoked either directly or via throttling Timer callback when we + // are ready to verify that we can connect to resolved address. + connectNext() { + timer?.cancel(); + if (isLookedUpStreamClosed && + connecting.isEmpty && + pendingLookedUp.isEmpty) { + assert(error != null); + if (!result.isCompleted) { + // Might be already completed via onCancel + result.completeError(error); + } + return; + } + if (pendingLookedUp.isEmpty) { + assert(!isLookedUpStreamClosed || connecting.isNotEmpty); + return; + } + final address = pendingLookedUp.removeFirst(); + final socket = new _NativeSocket.normal(address); + // Will contain values of various types representing the result + // of trying to create a connection. + // A value of `true` means success, everything else means failure. + final Object? connectionResult = + createConnection(address, source, socket); + if (connectionResult != true) { + // connectionResult was not a success. + error = createConnectionError(connectionResult, address, port, socket); + connectNext(); // Try again after failure to connect. + return; + } + // Query the local port for error messages. + try { + socket.port; + } catch (e) { + error ??= createError(e, "Connection failed", address, port); + connectNext(); // Try again after failure to connect. + return; } - void onCancel() { - timer?.cancel(); + // Try again if no response (failure or success) within a duration. + // If this occurs, the socket is still trying to connect, and might + // succeed or fail later. + final duration = + address.isLoopback ? _retryDurationLoopback : _retryDuration; + timer = new Timer(duration, connectNext); + connecting.add(socket); + // Setup handlers for receiving the first write event which + // indicate that the socket is fully connected. + socket.setHandlers(write: () { + // First remote response on connection. + // If error, drop the socket and go to the next address. + // If success, complete with the socket + // and stop all other open connection attempts. + connecting.remove(socket); + // From 'man 2 connect': + // After select(2) indicates writability, use getsockopt(2) to read + // the SO_ERROR option at level SOL_SOCKET to determine whether + // connect() completed successfully (SO_ERROR is zero) or + // unsuccessfully. + final OSError osError = socket.nativeGetError(); + if (osError.errorCode != 0) { + socket.close(); + error ??= osError; + connectNext(); + return; + } + // Connection success! + // Stop all other connecting sockets and the timer. + timer!.cancel(); + socket.setListening(read: false, write: false); for (var s in connecting) { s.close(); s.setHandlers(); s.setListening(read: false, write: false); } connecting.clear(); - if (!result.isCompleted) { - error ??= createError(null, - "Connection attempt cancelled, host: ${host}, port: ${port}"); - result.completeError(error); - } - } + addressesSubscription.cancel(); + result.complete(socket); + }, error: (e, st) { + connecting.remove(socket); + socket.close(); + socket.setHandlers(); + socket.setListening(read: false, write: false); + // Keep first error, if present. + error ??= e; + connectNext(); // Try again after failure to connect. + }); + socket.setListening(read: false, write: true); + } + void onCancel() { + timer?.cancel(); + for (var s in connecting) { + s.close(); + s.setHandlers(); + s.setListening(read: false, write: false); + } + connecting.clear(); + if (!result.isCompleted) { + error ??= createError( + null, "Connection attempt cancelled, host: ${host}, port: ${port}"); + result.completeError(error); + } + } + + addressesSubscription = addresses.listen((address) { + pendingLookedUp.addAll(address); + if (timer == null || !timer!.isActive) { + connectNext(); + } + }, onDone: () { + isLookedUpStreamClosed = true; connectNext(); - return new ConnectionTask<_NativeSocket>._(result.future, onCancel); + }, onError: (e, st) { + error = e; }); + + connectNext(); + return new ConnectionTask<_NativeSocket>._(result.future, onCancel); } static Future<_NativeSocket> connect( diff --git a/tests/standalone/io/http_force_staggered_ipv6_lookup_test.dart b/tests/standalone/io/http_force_staggered_ipv6_lookup_test.dart new file mode 100644 index 00000000000..c64448840da --- /dev/null +++ b/tests/standalone/io/http_force_staggered_ipv6_lookup_test.dart @@ -0,0 +1,75 @@ +// Copyright (c) 2021, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. +// +// SharedOptions=-Ddart.library.io.force_staggered_ipv6_lookup=true +// + +import "dart:io"; + +import "package:async_helper/async_helper.dart"; +import "package:expect/expect.dart"; + +const sampleData = [1, 2, 3, 4, 5]; + +void testBadHostName() { + asyncStart(); + HttpClient client = new HttpClient(); + client.get("some.bad.host.name.7654321", 0, "/").then((request) { + Expect.fail("Should not open a request on bad hostname"); + }).catchError((error) { + asyncEnd(); // We expect onError to be called, due to bad host name. + }, test: (error) => error is! String); +} + +void testConnect(InternetAddress loopback, {int expectedElapsedMs: 0}) async { + asyncStart(); + final max = 10; + final servers = []; + for (var i = 0; i < max; i++) { + final server = await ServerSocket.bind(loopback, 0); + server.listen((Socket socket) { + socket.add(sampleData); + socket.destroy(); + }); + servers.add(server); + } + final sw = Stopwatch()..start(); + var got = 0; + for (var i = 0; i < max; i++) { + final client = await Socket.connect('localhost', servers[i].port, + sourceAddress: loopback); + client.listen((received) { + Expect.listEquals(sampleData, received); + }, onError: (e) { + Expect.fail('Unexpected failure $e'); + }, onDone: () { + client.close(); + got++; + if (got == max) { + // Test that no stack overflow happens. + for (final server in servers) { + server.close(); + } + Expect.isTrue(sw.elapsedMilliseconds > expectedElapsedMs); + asyncEnd(); + } + }); + } +} + +void main() async { + asyncStart(); + testBadHostName(); + var localhosts = await InternetAddress.lookup('localhost'); + if (localhosts.contains(InternetAddress.loopbackIPv4)) { + testConnect(InternetAddress.loopbackIPv4); + } + if (localhosts.contains(InternetAddress.loopbackIPv6)) { + // matches value in socket_patch.dart + const concurrentLookupDelay = Duration(milliseconds: 10); + testConnect(InternetAddress.loopbackIPv6, + expectedElapsedMs: concurrentLookupDelay.inMilliseconds); + } + asyncEnd(); +} diff --git a/tests/standalone_2/io/http_force_staggered_ipv6_lookup_test.dart b/tests/standalone_2/io/http_force_staggered_ipv6_lookup_test.dart new file mode 100644 index 00000000000..c64448840da --- /dev/null +++ b/tests/standalone_2/io/http_force_staggered_ipv6_lookup_test.dart @@ -0,0 +1,75 @@ +// Copyright (c) 2021, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. +// +// SharedOptions=-Ddart.library.io.force_staggered_ipv6_lookup=true +// + +import "dart:io"; + +import "package:async_helper/async_helper.dart"; +import "package:expect/expect.dart"; + +const sampleData = [1, 2, 3, 4, 5]; + +void testBadHostName() { + asyncStart(); + HttpClient client = new HttpClient(); + client.get("some.bad.host.name.7654321", 0, "/").then((request) { + Expect.fail("Should not open a request on bad hostname"); + }).catchError((error) { + asyncEnd(); // We expect onError to be called, due to bad host name. + }, test: (error) => error is! String); +} + +void testConnect(InternetAddress loopback, {int expectedElapsedMs: 0}) async { + asyncStart(); + final max = 10; + final servers = []; + for (var i = 0; i < max; i++) { + final server = await ServerSocket.bind(loopback, 0); + server.listen((Socket socket) { + socket.add(sampleData); + socket.destroy(); + }); + servers.add(server); + } + final sw = Stopwatch()..start(); + var got = 0; + for (var i = 0; i < max; i++) { + final client = await Socket.connect('localhost', servers[i].port, + sourceAddress: loopback); + client.listen((received) { + Expect.listEquals(sampleData, received); + }, onError: (e) { + Expect.fail('Unexpected failure $e'); + }, onDone: () { + client.close(); + got++; + if (got == max) { + // Test that no stack overflow happens. + for (final server in servers) { + server.close(); + } + Expect.isTrue(sw.elapsedMilliseconds > expectedElapsedMs); + asyncEnd(); + } + }); + } +} + +void main() async { + asyncStart(); + testBadHostName(); + var localhosts = await InternetAddress.lookup('localhost'); + if (localhosts.contains(InternetAddress.loopbackIPv4)) { + testConnect(InternetAddress.loopbackIPv4); + } + if (localhosts.contains(InternetAddress.loopbackIPv6)) { + // matches value in socket_patch.dart + const concurrentLookupDelay = Duration(milliseconds: 10); + testConnect(InternetAddress.loopbackIPv6, + expectedElapsedMs: concurrentLookupDelay.inMilliseconds); + } + asyncEnd(); +}