mirror of
https://github.com/dart-lang/sdk
synced 2024-10-04 16:54:55 +00:00
[dart:io] Adds Socket.startConnect
This is the second part of https://dart-review.googlesource.com/c/sdk/+/62484 This CL adds a startConnect method to Socket types that returns a ConnectionTask object that can be cancelled. Cancelling a ConnectionTask closes any sockets that were opened for the connection attempt that are not yet connected to the host. This allows a closing HttpClient to close sockets for pending requests whose sockets weren't fully connected yet. related https://github.com/flutter/flutter/issues/18617 Change-Id: I59c761b06e070d555fc514614079930b69c129dd Reviewed-on: https://dart-review.googlesource.com/62780 Commit-Queue: Zach Anderson <zra@google.com> Reviewed-by: Ryan Macnak <rmacnak@google.com>
This commit is contained in:
parent
74ba5c3705
commit
eb3becea2c
|
@ -20,6 +20,9 @@
|
|||
|
||||
* `dart:io`
|
||||
* Adds `HttpClient.connectionTimeout`.
|
||||
* Adds `{Socket,RawSocket,SecureSocket}.startConnect`. These return a
|
||||
`ConnectionTask`, which can be used to cancel an in-flight connection
|
||||
attempt.
|
||||
|
||||
## 2.0.0-dev.65.0
|
||||
|
||||
|
|
|
@ -461,6 +461,12 @@ class RawSocket {
|
|||
{sourceAddress, Duration timeout}) {
|
||||
throw UnsupportedError("RawSocket constructor");
|
||||
}
|
||||
|
||||
@patch
|
||||
static Future<ConnectionTask<RawSocket>> startConnect(host, int port,
|
||||
{sourceAddress}) {
|
||||
throw UnsupportedError("RawSocket constructor");
|
||||
}
|
||||
}
|
||||
|
||||
@patch
|
||||
|
@ -470,6 +476,12 @@ class Socket {
|
|||
{sourceAddress, Duration timeout}) {
|
||||
throw UnsupportedError("Socket constructor");
|
||||
}
|
||||
|
||||
@patch
|
||||
static Future<ConnectionTask<Socket>> _startConnect(host, int port,
|
||||
{sourceAddress}) {
|
||||
throw UnsupportedError("Socket constructor");
|
||||
}
|
||||
}
|
||||
|
||||
@patch
|
||||
|
|
|
@ -20,6 +20,12 @@ class RawSocket {
|
|||
{sourceAddress, Duration timeout}) {
|
||||
return _RawSocket.connect(host, port, sourceAddress, timeout);
|
||||
}
|
||||
|
||||
@patch
|
||||
static Future<ConnectionTask<RawSocket>> startConnect(host, int port,
|
||||
{sourceAddress}) {
|
||||
return _RawSocket.startConnect(host, port, sourceAddress);
|
||||
}
|
||||
}
|
||||
|
||||
@patch
|
||||
|
@ -350,8 +356,8 @@ class _NativeSocket extends _NativeSocketNativeWrapper with _ServiceObject {
|
|||
|
||||
static Future<List<InternetAddress>> lookup(String host,
|
||||
{InternetAddressType type: InternetAddressType.any}) {
|
||||
return _IOService._dispatch(
|
||||
_IOService.socketLookup, [host, type._value]).then((response) {
|
||||
return _IOService._dispatch(_IOService.socketLookup, [host, type._value])
|
||||
.then((response) {
|
||||
if (isErrorResponse(response)) {
|
||||
throw createError(response, "Failed host lookup: '$host'");
|
||||
} else {
|
||||
|
@ -378,8 +384,8 @@ class _NativeSocket extends _NativeSocketNativeWrapper with _ServiceObject {
|
|||
{bool includeLoopback: false,
|
||||
bool includeLinkLocal: false,
|
||||
InternetAddressType type: InternetAddressType.any}) {
|
||||
return _IOService._dispatch(
|
||||
_IOService.socketListInterfaces, [type._value]).then((response) {
|
||||
return _IOService._dispatch(_IOService.socketListInterfaces, [type._value])
|
||||
.then((response) {
|
||||
if (isErrorResponse(response)) {
|
||||
throw createError(response, "Failed listing interfaces");
|
||||
} else {
|
||||
|
@ -400,8 +406,8 @@ class _NativeSocket extends _NativeSocketNativeWrapper with _ServiceObject {
|
|||
});
|
||||
}
|
||||
|
||||
static Future<_NativeSocket> connect(
|
||||
host, int port, sourceAddress, Duration timeout) {
|
||||
static Future<ConnectionTask<_NativeSocket>> startConnect(
|
||||
host, int port, sourceAddress) {
|
||||
_throwOnBadPort(port);
|
||||
if (sourceAddress != null && sourceAddress is! _InternetAddress) {
|
||||
if (sourceAddress is String) {
|
||||
|
@ -421,29 +427,11 @@ class _NativeSocket extends _NativeSocketNativeWrapper with _ServiceObject {
|
|||
var it = (addresses as List<InternetAddress>).iterator;
|
||||
var error = null;
|
||||
var connecting = new HashMap();
|
||||
Timer timeoutTimer = null;
|
||||
void timeoutHandler() {
|
||||
connecting.forEach((s, t) {
|
||||
t.cancel();
|
||||
s.close();
|
||||
s.setHandlers();
|
||||
s.setListening(read: false, write: false);
|
||||
error = createError(
|
||||
null, "Connection timed out, host: ${host}, port: ${port}");
|
||||
completer.completeError(error);
|
||||
});
|
||||
}
|
||||
|
||||
void connectNext() {
|
||||
if ((timeout != null) && (timeoutTimer == null)) {
|
||||
timeoutTimer = new Timer(timeout, timeoutHandler);
|
||||
}
|
||||
if (!it.moveNext()) {
|
||||
if (connecting.isEmpty) {
|
||||
assert(error != null);
|
||||
if (timeoutTimer != null) {
|
||||
timeoutTimer.cancel();
|
||||
}
|
||||
completer.completeError(error);
|
||||
}
|
||||
return;
|
||||
|
@ -471,11 +459,13 @@ class _NativeSocket extends _NativeSocketNativeWrapper with _ServiceObject {
|
|||
}
|
||||
connectNext();
|
||||
} else {
|
||||
// Query the local port, for error messages.
|
||||
// Query the local port for error messages.
|
||||
try {
|
||||
socket.port;
|
||||
} catch (e) {
|
||||
error = createError(e, "Connection failed", address, port);
|
||||
if (error == null) {
|
||||
error = createError(e, "Connection failed", address, port);
|
||||
}
|
||||
connectNext();
|
||||
}
|
||||
// Set up timer for when we should retry the next address
|
||||
|
@ -490,9 +480,6 @@ class _NativeSocket extends _NativeSocketNativeWrapper with _ServiceObject {
|
|||
// indicate that the socket is fully connected.
|
||||
socket.setHandlers(write: () {
|
||||
timer.cancel();
|
||||
if (timeoutTimer != null) {
|
||||
timeoutTimer.cancel();
|
||||
}
|
||||
socket.setListening(read: false, write: false);
|
||||
completer.complete(socket);
|
||||
connecting.remove(socket);
|
||||
|
@ -514,8 +501,39 @@ class _NativeSocket extends _NativeSocketNativeWrapper with _ServiceObject {
|
|||
}
|
||||
}
|
||||
|
||||
void onCancel() {
|
||||
connecting.forEach((s, t) {
|
||||
t.cancel();
|
||||
s.close();
|
||||
s.setHandlers();
|
||||
s.setListening(read: false, write: false);
|
||||
if (error == null) {
|
||||
error = createError(null,
|
||||
"Connection attempt canceled, host: ${host}, port: ${port}");
|
||||
}
|
||||
});
|
||||
completer.completeError(error);
|
||||
}
|
||||
|
||||
connectNext();
|
||||
return completer.future;
|
||||
return new ConnectionTask<_NativeSocket>._(
|
||||
socket: completer.future, onCancel: onCancel);
|
||||
});
|
||||
}
|
||||
|
||||
static Future<_NativeSocket> connect(
|
||||
host, int port, sourceAddress, Duration timeout) {
|
||||
return startConnect(host, port, sourceAddress)
|
||||
.then((ConnectionTask<_NativeSocket> task) {
|
||||
Future<_NativeSocket> socketFuture = task.socket;
|
||||
if (timeout != null) {
|
||||
socketFuture = socketFuture.timeout(timeout, onTimeout: () {
|
||||
task.cancel();
|
||||
throw createError(
|
||||
null, "Connection timed out, host: ${host}, port: ${port}");
|
||||
});
|
||||
}
|
||||
return socketFuture;
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -1134,8 +1152,7 @@ class _RawServerSocket extends Stream<RawSocket> implements RawServerSocket {
|
|||
address, int port, int backlog, bool v6Only, bool shared) {
|
||||
_throwOnBadPort(port);
|
||||
if (backlog < 0) throw new ArgumentError("Invalid backlog $backlog");
|
||||
return _NativeSocket
|
||||
.bind(address, port, backlog, v6Only, shared)
|
||||
return _NativeSocket.bind(address, port, backlog, v6Only, shared)
|
||||
.then((socket) => new _RawServerSocket(socket, v6Only));
|
||||
}
|
||||
|
||||
|
@ -1228,11 +1245,21 @@ class _RawSocket extends Stream<RawSocketEvent> implements RawSocket {
|
|||
|
||||
static Future<RawSocket> connect(
|
||||
host, int port, sourceAddress, Duration timeout) {
|
||||
return _NativeSocket
|
||||
.connect(host, port, sourceAddress, timeout)
|
||||
return _NativeSocket.connect(host, port, sourceAddress, timeout)
|
||||
.then((socket) => new _RawSocket(socket));
|
||||
}
|
||||
|
||||
static Future<ConnectionTask<_RawSocket>> startConnect(
|
||||
host, int port, sourceAddress) {
|
||||
return _NativeSocket.startConnect(host, port, sourceAddress)
|
||||
.then((ConnectionTask<_NativeSocket> nativeTask) {
|
||||
final Future<_RawSocket> raw = nativeTask.socket
|
||||
.then((_NativeSocket nativeSocket) => new _RawSocket(nativeSocket));
|
||||
return new ConnectionTask<_RawSocket>._(
|
||||
socket: raw, onCancel: nativeTask._onCancel);
|
||||
});
|
||||
}
|
||||
|
||||
_RawSocket(this._socket) {
|
||||
var zone = Zone.current;
|
||||
_controller = new StreamController(
|
||||
|
@ -1381,8 +1408,7 @@ class _ServerSocket extends Stream<Socket> implements ServerSocket {
|
|||
|
||||
static Future<_ServerSocket> bind(
|
||||
address, int port, int backlog, bool v6Only, bool shared) {
|
||||
return _RawServerSocket
|
||||
.bind(address, port, backlog, v6Only, shared)
|
||||
return _RawServerSocket.bind(address, port, backlog, v6Only, shared)
|
||||
.then((socket) => new _ServerSocket(socket));
|
||||
}
|
||||
|
||||
|
@ -1414,10 +1440,22 @@ class Socket {
|
|||
@patch
|
||||
static Future<Socket> _connect(host, int port,
|
||||
{sourceAddress, Duration timeout}) {
|
||||
return RawSocket
|
||||
.connect(host, port, sourceAddress: sourceAddress, timeout: timeout)
|
||||
return RawSocket.connect(host, port,
|
||||
sourceAddress: sourceAddress, timeout: timeout)
|
||||
.then((socket) => new _Socket(socket));
|
||||
}
|
||||
|
||||
@patch
|
||||
static Future<ConnectionTask<Socket>> _startConnect(host, int port,
|
||||
{sourceAddress}) {
|
||||
return RawSocket.startConnect(host, port, sourceAddress: sourceAddress)
|
||||
.then((rawTask) {
|
||||
Future<Socket> socket =
|
||||
rawTask.socket.then((rawSocket) => new _Socket(rawSocket));
|
||||
return new ConnectionTask<Socket>._(
|
||||
socket: socket, onCancel: rawTask._onCancel);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
class _SocketStreamConsumer extends StreamConsumer<List<int>> {
|
||||
|
@ -1774,8 +1812,7 @@ class _RawDatagramSocket extends Stream<RawSocketEvent>
|
|||
|
||||
static Future<RawDatagramSocket> bind(host, int port, bool reuseAddress) {
|
||||
_throwOnBadPort(port);
|
||||
return _NativeSocket
|
||||
.bindDatagram(host, port, reuseAddress)
|
||||
return _NativeSocket.bindDatagram(host, port, reuseAddress)
|
||||
.then((socket) => new _RawDatagramSocket(socket));
|
||||
}
|
||||
|
||||
|
|
|
@ -1039,8 +1039,9 @@ class _HttpClientRequest extends _HttpOutboundMessage<HttpClientResponse>
|
|||
|
||||
Future<HttpClientResponse> get done {
|
||||
if (_response == null) {
|
||||
_response = Future.wait([_responseCompleter.future, super.done],
|
||||
eagerError: true).then((list) => list[0]);
|
||||
_response =
|
||||
Future.wait([_responseCompleter.future, super.done], eagerError: true)
|
||||
.then((list) => list[0]);
|
||||
}
|
||||
return _response;
|
||||
}
|
||||
|
@ -1254,7 +1255,8 @@ class _HttpOutgoing implements StreamConsumer<List<int>> {
|
|||
outbound.headers.chunkedTransferEncoding) {
|
||||
List acceptEncodings =
|
||||
response._httpRequest.headers[HttpHeaders.acceptEncodingHeader];
|
||||
List contentEncoding = outbound.headers[HttpHeaders.contentEncodingHeader];
|
||||
List contentEncoding =
|
||||
outbound.headers[HttpHeaders.contentEncodingHeader];
|
||||
if (acceptEncodings != null &&
|
||||
acceptEncodings
|
||||
.expand((list) => list.split(","))
|
||||
|
@ -1657,8 +1659,8 @@ class _HttpClientConnection {
|
|||
if (proxy.isAuthenticated) {
|
||||
// If the proxy configuration contains user information use that
|
||||
// for proxy basic authorization.
|
||||
String auth = _CryptoUtils
|
||||
.bytesToBase64(utf8.encode("${proxy.username}:${proxy.password}"));
|
||||
String auth = _CryptoUtils.bytesToBase64(
|
||||
utf8.encode("${proxy.username}:${proxy.password}"));
|
||||
request.headers.set(HttpHeaders.proxyAuthorizationHeader, "Basic $auth");
|
||||
} else if (!proxy.isDirect && _httpClient._proxyCredentials.length > 0) {
|
||||
proxyCreds = _httpClient._findProxyCredentials(proxy);
|
||||
|
@ -1766,7 +1768,8 @@ class _HttpClientConnection {
|
|||
void close() {
|
||||
closed = true;
|
||||
_httpClient._connectionClosed(this);
|
||||
_streamFuture.timeout(_httpClient.idleTimeout)
|
||||
_streamFuture
|
||||
.timeout(_httpClient.idleTimeout)
|
||||
.then((_) => _socket.destroy());
|
||||
}
|
||||
|
||||
|
@ -1777,8 +1780,8 @@ class _HttpClientConnection {
|
|||
if (proxy.isAuthenticated) {
|
||||
// If the proxy configuration contains user information use that
|
||||
// for proxy basic authorization.
|
||||
String auth = _CryptoUtils
|
||||
.bytesToBase64(utf8.encode("${proxy.username}:${proxy.password}"));
|
||||
String auth = _CryptoUtils.bytesToBase64(
|
||||
utf8.encode("${proxy.username}:${proxy.password}"));
|
||||
request.headers.set(HttpHeaders.proxyAuthorizationHeader, "Basic $auth");
|
||||
}
|
||||
return request.close().then((response) {
|
||||
|
@ -1837,6 +1840,7 @@ class _ConnectionTarget {
|
|||
final SecurityContext context;
|
||||
final Set<_HttpClientConnection> _idle = new HashSet();
|
||||
final Set<_HttpClientConnection> _active = new HashSet();
|
||||
final Set<ConnectionTask> _socketTasks = new HashSet();
|
||||
final Queue _pending = new ListQueue();
|
||||
int _connecting = 0;
|
||||
|
||||
|
@ -1884,6 +1888,14 @@ class _ConnectionTarget {
|
|||
}
|
||||
|
||||
void close(bool force) {
|
||||
// Always cancel pending socket connections.
|
||||
for (var t in _socketTasks.toList()) {
|
||||
// Make sure the socket is destroyed if the ConnectionTask is cancelled.
|
||||
t.socket.then((s) {
|
||||
s.destroy();
|
||||
}, onError: (e) {});
|
||||
t.cancel();
|
||||
}
|
||||
if (force) {
|
||||
for (var c in _idle.toList()) {
|
||||
c.destroy();
|
||||
|
@ -1920,35 +1932,48 @@ class _ConnectionTarget {
|
|||
return currentBadCertificateCallback(certificate, uriHost, uriPort);
|
||||
}
|
||||
|
||||
Future socketFuture = (isSecure && proxy.isDirect
|
||||
? SecureSocket.connect(host, port,
|
||||
context: context, onBadCertificate: callback,
|
||||
timeout: client.connectionTimeout)
|
||||
: Socket.connect(host, port, timeout: client.connectionTimeout));
|
||||
Future<ConnectionTask> connectionTask = (isSecure && proxy.isDirect
|
||||
? SecureSocket.startConnect(host, port,
|
||||
context: context, onBadCertificate: callback)
|
||||
: Socket.startConnect(host, port));
|
||||
_connecting++;
|
||||
return socketFuture.then((socket) {
|
||||
_connecting--;
|
||||
socket.setOption(SocketOption.tcpNoDelay, true);
|
||||
var connection =
|
||||
new _HttpClientConnection(key, socket, client, false, context);
|
||||
if (isSecure && !proxy.isDirect) {
|
||||
connection._dispose = true;
|
||||
return connection
|
||||
.createProxyTunnel(uriHost, uriPort, proxy, callback)
|
||||
.then((tunnel) {
|
||||
client
|
||||
._getConnectionTarget(uriHost, uriPort, true)
|
||||
.addNewActive(tunnel);
|
||||
return new _ConnectionInfo(tunnel, proxy);
|
||||
return connectionTask.then((ConnectionTask task) {
|
||||
_socketTasks.add(task);
|
||||
Future socketFuture = task.socket;
|
||||
if (client.connectionTimeout != null) {
|
||||
socketFuture =
|
||||
socketFuture.timeout(client.connectionTimeout, onTimeout: () {
|
||||
_socketTasks.remove(task);
|
||||
task.cancel();
|
||||
});
|
||||
} else {
|
||||
addNewActive(connection);
|
||||
return new _ConnectionInfo(connection, proxy);
|
||||
}
|
||||
}, onError: (error) {
|
||||
_connecting--;
|
||||
_checkPending();
|
||||
throw error;
|
||||
return socketFuture.then((socket) {
|
||||
_connecting--;
|
||||
socket.setOption(SocketOption.tcpNoDelay, true);
|
||||
var connection =
|
||||
new _HttpClientConnection(key, socket, client, false, context);
|
||||
if (isSecure && !proxy.isDirect) {
|
||||
connection._dispose = true;
|
||||
return connection
|
||||
.createProxyTunnel(uriHost, uriPort, proxy, callback)
|
||||
.then((tunnel) {
|
||||
client
|
||||
._getConnectionTarget(uriHost, uriPort, true)
|
||||
.addNewActive(tunnel);
|
||||
_socketTasks.remove(task);
|
||||
return new _ConnectionInfo(tunnel, proxy);
|
||||
});
|
||||
} else {
|
||||
addNewActive(connection);
|
||||
_socketTasks.remove(task);
|
||||
return new _ConnectionInfo(connection, proxy);
|
||||
}
|
||||
}, onError: (error) {
|
||||
_connecting--;
|
||||
_socketTasks.remove(task);
|
||||
_checkPending();
|
||||
throw error;
|
||||
});
|
||||
});
|
||||
}
|
||||
}
|
||||
|
@ -2103,9 +2128,8 @@ class _HttpClient implements HttpClient {
|
|||
bool isSecure = (uri.scheme == "https");
|
||||
int port = uri.port;
|
||||
if (port == 0) {
|
||||
port = isSecure
|
||||
? HttpClient.defaultHttpsPort
|
||||
: HttpClient.defaultHttpPort;
|
||||
port =
|
||||
isSecure ? HttpClient.defaultHttpsPort : HttpClient.defaultHttpPort;
|
||||
}
|
||||
// Check to see if a proxy server should be used for this connection.
|
||||
var proxyConf = const _ProxyConfiguration.direct();
|
||||
|
@ -2502,8 +2526,8 @@ class _HttpServer extends Stream<HttpRequest>
|
|||
|
||||
static Future<HttpServer> bind(
|
||||
address, int port, int backlog, bool v6Only, bool shared) {
|
||||
return ServerSocket
|
||||
.bind(address, port, backlog: backlog, v6Only: v6Only, shared: shared)
|
||||
return ServerSocket.bind(address, port,
|
||||
backlog: backlog, v6Only: v6Only, shared: shared)
|
||||
.then<HttpServer>((socket) {
|
||||
return new _HttpServer._(socket, true);
|
||||
});
|
||||
|
@ -2517,8 +2541,7 @@ class _HttpServer extends Stream<HttpRequest>
|
|||
bool v6Only,
|
||||
bool requestClientCertificate,
|
||||
bool shared) {
|
||||
return SecureServerSocket
|
||||
.bind(address, port, context,
|
||||
return SecureServerSocket.bind(address, port, context,
|
||||
backlog: backlog,
|
||||
v6Only: v6Only,
|
||||
requestClientCertificate: requestClientCertificate,
|
||||
|
@ -2977,8 +3000,8 @@ class _SiteCredentials extends _Credentials {
|
|||
if (scheme != null && credentials.scheme != scheme) return false;
|
||||
if (uri.host != this.uri.host) return false;
|
||||
int thisPort =
|
||||
this.uri.port == 0 ? HttpClient.defaultHttpPort: this.uri.port;
|
||||
int otherPort = uri.port == 0 ? HttpClient.defaultHttpPort: uri.port;
|
||||
this.uri.port == 0 ? HttpClient.defaultHttpPort : this.uri.port;
|
||||
int otherPort = uri.port == 0 ? HttpClient.defaultHttpPort : uri.port;
|
||||
if (otherPort != thisPort) return false;
|
||||
return uri.path.startsWith(this.uri.path);
|
||||
}
|
||||
|
@ -3116,14 +3139,14 @@ class _HttpClientDigestCredentials extends _HttpClientCredentials
|
|||
}
|
||||
|
||||
void authorize(_Credentials credentials, HttpClientRequest request) {
|
||||
request.headers
|
||||
.set(HttpHeaders.authorizationHeader, authorization(credentials, request));
|
||||
request.headers.set(
|
||||
HttpHeaders.authorizationHeader, authorization(credentials, request));
|
||||
}
|
||||
|
||||
void authorizeProxy(
|
||||
_ProxyCredentials credentials, HttpClientRequest request) {
|
||||
request.headers.set(
|
||||
HttpHeaders.proxyAuthorizationHeader, authorization(credentials, request));
|
||||
request.headers.set(HttpHeaders.proxyAuthorizationHeader,
|
||||
authorization(credentials, request));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -461,6 +461,12 @@ class RawSocket {
|
|||
{sourceAddress, Duration timeout}) {
|
||||
throw new UnsupportedError("RawSocket constructor");
|
||||
}
|
||||
|
||||
@patch
|
||||
static Future<ConnectionTask<RawSocket>> startConnect(host, int port,
|
||||
{sourceAddress}) {
|
||||
throw new UnsupportedError("RawSocket constructor");
|
||||
}
|
||||
}
|
||||
|
||||
@patch
|
||||
|
@ -470,6 +476,12 @@ class Socket {
|
|||
{sourceAddress, Duration timeout}) {
|
||||
throw new UnsupportedError("Socket constructor");
|
||||
}
|
||||
|
||||
@patch
|
||||
static Future<ConnectionTask<Socket>> _startConnect(host, int port,
|
||||
{sourceAddress}) {
|
||||
throw new UnsupportedError("Socket constructor");
|
||||
}
|
||||
}
|
||||
|
||||
@patch
|
||||
|
|
|
@ -83,6 +83,9 @@ abstract class IOOverrides {
|
|||
Future<Socket> Function(dynamic, int,
|
||||
{dynamic sourceAddress, Duration timeout})
|
||||
socketConnect,
|
||||
Future<ConnectionTask<Socket>> Function(dynamic, int,
|
||||
{dynamic sourceAddress})
|
||||
socketStartConnect,
|
||||
|
||||
// Optional Zone parameters
|
||||
ZoneSpecification zoneSpecification,
|
||||
|
@ -116,6 +119,7 @@ abstract class IOOverrides {
|
|||
|
||||
// Socket
|
||||
socketConnect,
|
||||
socketStartConnect,
|
||||
);
|
||||
return _asyncRunZoned<R>(body,
|
||||
zoneValues: {_ioOverridesToken: overrides},
|
||||
|
@ -254,12 +258,22 @@ abstract class IOOverrides {
|
|||
/// Asynchronously returns a [Socket] connected to the given host and port.
|
||||
///
|
||||
/// When this override is installed, this functions overrides the behavior of
|
||||
/// `Socet.connect(...)`.
|
||||
/// `Socket.connect(...)`.
|
||||
Future<Socket> socketConnect(host, int port,
|
||||
{sourceAddress, Duration timeout}) {
|
||||
return Socket._connect(host, port,
|
||||
sourceAddress: sourceAddress, timeout: timeout);
|
||||
}
|
||||
|
||||
/// Asynchronously returns a [ConnectionTask] that connects to the given host
|
||||
/// and port when successful.
|
||||
///
|
||||
/// When this override is installed, this functions overrides the behavior of
|
||||
/// `Socket.startConnect(...)`.
|
||||
Future<ConnectionTask<Socket>> socketStartConnect(host, int port,
|
||||
{sourceAddress}) {
|
||||
return Socket._startConnect(host, port, sourceAddress: sourceAddress);
|
||||
}
|
||||
}
|
||||
|
||||
class _IOOverridesScope extends IOOverrides {
|
||||
|
@ -294,6 +308,8 @@ class _IOOverridesScope extends IOOverrides {
|
|||
// Socket
|
||||
Future<Socket> Function(dynamic, int,
|
||||
{dynamic sourceAddress, Duration timeout}) _socketConnect;
|
||||
Future<ConnectionTask<Socket>> Function(dynamic, int, {dynamic sourceAddress})
|
||||
_socketStartConnect;
|
||||
|
||||
_IOOverridesScope(
|
||||
// Directory
|
||||
|
@ -324,6 +340,7 @@ class _IOOverridesScope extends IOOverrides {
|
|||
|
||||
// Socket
|
||||
this._socketConnect,
|
||||
this._socketStartConnect,
|
||||
);
|
||||
|
||||
// Directory
|
||||
|
@ -448,4 +465,17 @@ class _IOOverridesScope extends IOOverrides {
|
|||
return super.socketConnect(host, port,
|
||||
sourceAddress: sourceAddress, timeout: timeout);
|
||||
}
|
||||
|
||||
@override
|
||||
Future<ConnectionTask<Socket>> socketStartConnect(host, int port,
|
||||
{sourceAddress}) {
|
||||
if (_socketStartConnect != null) {
|
||||
return _socketStartConnect(host, port, sourceAddress: sourceAddress);
|
||||
}
|
||||
if (_previous != null) {
|
||||
return _previous.socketStartConnect(host, port,
|
||||
sourceAddress: sourceAddress);
|
||||
}
|
||||
return super.socketStartConnect(host, port, sourceAddress: sourceAddress);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -46,8 +46,7 @@ abstract class SecureSocket implements Socket {
|
|||
bool onBadCertificate(X509Certificate certificate),
|
||||
List<String> supportedProtocols,
|
||||
Duration timeout}) {
|
||||
return RawSecureSocket
|
||||
.connect(host, port,
|
||||
return RawSecureSocket.connect(host, port,
|
||||
context: context,
|
||||
onBadCertificate: onBadCertificate,
|
||||
supportedProtocols: supportedProtocols,
|
||||
|
@ -55,6 +54,25 @@ abstract class SecureSocket implements Socket {
|
|||
.then((rawSocket) => new SecureSocket._(rawSocket));
|
||||
}
|
||||
|
||||
/// Like [connect], but returns a [Future] that completes with a
|
||||
/// [ConnectionTask] that can be cancelled if the [SecureSocket] is no
|
||||
/// longer needed.
|
||||
static Future<ConnectionTask<SecureSocket>> startConnect(host, int port,
|
||||
{SecurityContext context,
|
||||
bool onBadCertificate(X509Certificate certificate),
|
||||
List<String> supportedProtocols}) {
|
||||
return RawSecureSocket.startConnect(host, port,
|
||||
context: context,
|
||||
onBadCertificate: onBadCertificate,
|
||||
supportedProtocols: supportedProtocols)
|
||||
.then((rawState) {
|
||||
Future<SecureSocket> socket =
|
||||
rawState.socket.then((rawSocket) => new SecureSocket._(rawSocket));
|
||||
return new ConnectionTask<SecureSocket>._(
|
||||
socket: socket, onCancel: rawState._onCancel);
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Takes an already connected [socket] and starts client side TLS
|
||||
* handshake to make the communication secure. When the returned
|
||||
|
@ -215,6 +233,26 @@ abstract class RawSecureSocket implements RawSocket {
|
|||
});
|
||||
}
|
||||
|
||||
/// Like [connect], but returns a [Future] that completes with a
|
||||
/// [ConnectionTask] that can be cancelled if the [RawSecureSocket] is no
|
||||
/// longer needed.
|
||||
static Future<ConnectionTask<RawSecureSocket>> startConnect(host, int port,
|
||||
{SecurityContext context,
|
||||
bool onBadCertificate(X509Certificate certificate),
|
||||
List<String> supportedProtocols}) {
|
||||
return RawSocket.startConnect(host, port)
|
||||
.then((ConnectionTask<RawSocket> rawState) {
|
||||
Future<RawSecureSocket> socket = rawState.socket.then((rawSocket) {
|
||||
return secure(rawSocket,
|
||||
context: context,
|
||||
onBadCertificate: onBadCertificate,
|
||||
supportedProtocols: supportedProtocols);
|
||||
});
|
||||
return new ConnectionTask<RawSecureSocket>._(
|
||||
socket: socket, onCancel: rawState._onCancel);
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Takes an already connected [socket] and starts client side TLS
|
||||
* handshake to make the communication secure. When the returned
|
||||
|
@ -990,8 +1028,7 @@ class _RawSecureSocket extends Stream<RawSocketEvent>
|
|||
args[2 * i + 3] = bufs[i].end;
|
||||
}
|
||||
|
||||
return _IOService
|
||||
._dispatch(_IOService.sslProcessFilter, args)
|
||||
return _IOService._dispatch(_IOService.sslProcessFilter, args)
|
||||
.then((response) {
|
||||
if (response.length == 2) {
|
||||
if (wasInHandshake) {
|
||||
|
|
|
@ -426,6 +426,32 @@ class RawSocketEvent {
|
|||
}
|
||||
}
|
||||
|
||||
/// Returned by the `startConnect` methods on client-side socket types `S`,
|
||||
/// `ConnectionTask<S>` allows cancelling an attempt to connect to a host.
|
||||
class ConnectionTask<S> {
|
||||
/// A `Future` that completes with value that `S.connect()` would return
|
||||
/// unless [cancel] is called on this [ConnectionTask].
|
||||
///
|
||||
/// If [cancel] is called, the `Future` completes with a [SocketException]
|
||||
/// error whose message indicates that the connection attempt was cancelled.
|
||||
final Future<S> socket;
|
||||
final void Function() _onCancel;
|
||||
|
||||
ConnectionTask._({Future<S> socket, void Function() onCancel})
|
||||
: assert(socket != null),
|
||||
assert(onCancel != null),
|
||||
this.socket = socket,
|
||||
this._onCancel = onCancel;
|
||||
|
||||
/// Cancels the connection attempt.
|
||||
///
|
||||
/// This also causes the [socket] `Future` to complete with a
|
||||
/// [SocketException] error.
|
||||
void cancel() {
|
||||
_onCancel();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* The [RawSocket] is a low-level interface to a socket, exposing the raw
|
||||
* events signaled by the system. It's a [Stream] of [RawSocketEvent]s.
|
||||
|
@ -470,6 +496,12 @@ abstract class RawSocket implements Stream<RawSocketEvent> {
|
|||
external static Future<RawSocket> connect(host, int port,
|
||||
{sourceAddress, Duration timeout});
|
||||
|
||||
/// Like [connect], but returns a [Future] that completes with a
|
||||
/// [ConnectionTask] that can be cancelled if the [RawSocket] is no
|
||||
/// longer needed.
|
||||
external static Future<ConnectionTask<RawSocket>> startConnect(host, int port,
|
||||
{sourceAddress});
|
||||
|
||||
/**
|
||||
* Returns the number of received and non-read bytes in the socket that
|
||||
* can be read.
|
||||
|
@ -583,9 +615,25 @@ abstract class Socket implements Stream<List<int>>, IOSink {
|
|||
sourceAddress: sourceAddress, timeout: timeout);
|
||||
}
|
||||
|
||||
/// Like [connect], but returns a [Future] that completes with a
|
||||
/// [ConnectionTask] that can be cancelled if the [Socket] is no
|
||||
/// longer needed.
|
||||
static Future<ConnectionTask<Socket>> startConnect(host, int port,
|
||||
{sourceAddress}) {
|
||||
final IOOverrides overrides = IOOverrides.current;
|
||||
if (overrides == null) {
|
||||
return Socket._startConnect(host, port, sourceAddress: sourceAddress);
|
||||
}
|
||||
return overrides.socketStartConnect(host, port,
|
||||
sourceAddress: sourceAddress);
|
||||
}
|
||||
|
||||
external static Future<Socket> _connect(host, int port,
|
||||
{sourceAddress, Duration timeout});
|
||||
|
||||
external static Future<ConnectionTask<Socket>> _startConnect(host, int port,
|
||||
{sourceAddress});
|
||||
|
||||
/**
|
||||
* Destroy the socket in both directions. Calling [destroy] will make the
|
||||
* send a close event on the stream and will no longer react on data being
|
||||
|
|
|
@ -162,6 +162,11 @@ Future<Socket> socketConnect(host, int port,
|
|||
return null;
|
||||
}
|
||||
|
||||
Future<ConnectionTask<Socket>> socketStartConnect(host, int port,
|
||||
{sourceAddress}) {
|
||||
return null;
|
||||
}
|
||||
|
||||
Future<Null> ioOverridesRunTest() async {
|
||||
Future<Null> f = IOOverrides.runZoned(
|
||||
() async {
|
||||
|
@ -181,6 +186,7 @@ Future<Null> ioOverridesRunTest() async {
|
|||
Expect.isNull(new Directory("directory").watch());
|
||||
Expect.isTrue(new Link("link") is LinkMock);
|
||||
Expect.isNull(Socket.connect(null, 0));
|
||||
Expect.isNull(Socket.startConnect(null, 0));
|
||||
},
|
||||
createDirectory: DirectoryMock.createDirectory,
|
||||
getCurrentDirectory: DirectoryMock.getCurrent,
|
||||
|
@ -197,6 +203,7 @@ Future<Null> ioOverridesRunTest() async {
|
|||
fsWatchIsSupported: FileSystemWatcherMock.watchSupported,
|
||||
createLink: LinkMock.createLink,
|
||||
socketConnect: socketConnect,
|
||||
socketStartConnect: socketStartConnect,
|
||||
);
|
||||
Expect.isFalse(new Directory("directory") is DirectoryMock);
|
||||
Expect.isTrue(new Directory("directory") is Directory);
|
||||
|
|
|
@ -74,6 +74,21 @@ void testSimpleConnect() {
|
|||
});
|
||||
}
|
||||
|
||||
void testCancelConnect() {
|
||||
asyncStart();
|
||||
RawSocket.startConnect(InternetAddress.loopbackIPv4, 0)
|
||||
.then((ConnectionTask<RawSocket> task) {
|
||||
task.cancel();
|
||||
task.socket.catchError((error) {
|
||||
Expect.isTrue(error is SocketException);
|
||||
asyncEnd();
|
||||
});
|
||||
task.socket.then((s) {
|
||||
Expect.fail("Unreachable");
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
void testCloseOneEnd(String toClose) {
|
||||
asyncStart();
|
||||
Completer serverDone = new Completer();
|
||||
|
@ -467,6 +482,7 @@ main() {
|
|||
testCloseOneEnd("server");
|
||||
testInvalidBind();
|
||||
testSimpleConnect();
|
||||
testCancelConnect();
|
||||
testServerListenAfterConnect();
|
||||
testSimpleReadWrite(dropReads: false);
|
||||
testSimpleReadWrite(dropReads: true);
|
||||
|
|
32
tests/standalone_2/io/socket_cancel_connect_test.dart
Normal file
32
tests/standalone_2/io/socket_cancel_connect_test.dart
Normal file
|
@ -0,0 +1,32 @@
|
|||
// Copyright (c) 2018, the Dart project authors. Please see the AUTHORS file
|
||||
// for details. All rights reserved. Use of this source code is governed by a
|
||||
// BSD-style license that can be found in the LICENSE file.
|
||||
//
|
||||
// VMOptions=
|
||||
// VMOptions=--short_socket_read
|
||||
// VMOptions=--short_socket_write
|
||||
// VMOptions=--short_socket_read --short_socket_write
|
||||
|
||||
import "dart:async";
|
||||
import "dart:io";
|
||||
|
||||
import "package:async_helper/async_helper.dart";
|
||||
import "package:expect/expect.dart";
|
||||
|
||||
void main() {
|
||||
asyncStart();
|
||||
Duration timeout = new Duration(milliseconds: 20);
|
||||
Socket.startConnect("8.8.8.7", 80).then((task) {
|
||||
task.socket.timeout(timeout, onTimeout: () {
|
||||
task.cancel();
|
||||
});
|
||||
task.socket.then((socket) {
|
||||
Expect.fail("Unexpected connection made.");
|
||||
asyncEnd();
|
||||
}).catchError((e) {
|
||||
print(e);
|
||||
Expect.isTrue(e is SocketException);
|
||||
asyncEnd();
|
||||
});
|
||||
});
|
||||
}
|
Loading…
Reference in a new issue