Reland: [dart:io] Adds Socket.startConnect

This is a reland of https://dart-review.googlesource.com/c/sdk/+/62484
with the following changes:
- _NativeSocket.connect now drops references to pending sockets on
  an error or successful connection.
- eventhandlers are updated to ignore unset Dart ports on a close
  command.
- Test updated to account for new SocketException.

This is the second part of https://dart-review.googlesource.com/c/sdk/+/62484

This CL adds a startConnect method to Socket types that returns
a ConnectionTask object that can be cancelled. Cancelling
a ConnectionTask closes any sockets that were opened for the
connection attempt that are not yet connected to the host.

This allows a closing HttpClient to close sockets for pending
requests whose sockets weren't fully connected yet.

related https://github.com/flutter/flutter/issues/18617

Change-Id: I47fe3564e41197d622079aad4bb644bbdfe0bfe8
Reviewed-on: https://dart-review.googlesource.com/63040
Reviewed-by: Zach Anderson <zra@google.com>
Commit-Queue: Zach Anderson <zra@google.com>
This commit is contained in:
Zach Anderson 2018-07-03 14:47:41 +00:00 committed by commit-bot@chromium.org
parent c9fd95119d
commit 0ccdc3ec38
17 changed files with 371 additions and 98 deletions

View file

@ -39,6 +39,9 @@
* `dart:io`
* Adds `HttpClient.connectionTimeout`.
* Adds `{Socket,RawSocket,SecureSocket}.startConnect`. These return a
`ConnectionTask`, which can be used to cancel an in-flight connection
attempt.
## 2.0.0-dev.66.0

View file

@ -461,6 +461,12 @@ class RawSocket {
{sourceAddress, Duration timeout}) {
throw UnsupportedError("RawSocket constructor");
}
@patch
static Future<ConnectionTask<RawSocket>> startConnect(host, int port,
{sourceAddress}) {
throw UnsupportedError("RawSocket constructor");
}
}
@patch
@ -470,6 +476,12 @@ class Socket {
{sourceAddress, Duration timeout}) {
throw UnsupportedError("Socket constructor");
}
@patch
static Future<ConnectionTask<Socket>> _startConnect(host, int port,
{sourceAddress}) {
throw UnsupportedError("Socket constructor");
}
}
@patch

View file

@ -214,7 +214,9 @@ void EventHandlerImplementation::HandleInterruptFd() {
// message.
intptr_t old_mask = di->Mask();
Dart_Port port = msg[i].dart_port;
di->RemovePort(port);
if (port != ILLEGAL_PORT) {
di->RemovePort(port);
}
intptr_t new_mask = di->Mask();
UpdateEpollInstance(old_mask, di);

View file

@ -385,7 +385,9 @@ void EventHandlerImplementation::HandleInterrupt(InterruptMessage* msg) {
// message.
const intptr_t old_mask = di->Mask();
Dart_Port port = msg->dart_port;
di->RemovePort(port);
if (port != ILLEGAL_PORT) {
di->RemovePort(port);
}
const intptr_t new_mask = di->Mask();
UpdatePort(old_mask, di);

View file

@ -223,11 +223,14 @@ void EventHandlerImplementation::HandleInterruptFd() {
// message.
intptr_t old_mask = di->Mask();
Dart_Port port = msg[i].dart_port;
di->RemovePort(port);
if (port != ILLEGAL_PORT) {
di->RemovePort(port);
}
intptr_t new_mask = di->Mask();
UpdateEpollInstance(old_mask, di);
intptr_t fd = di->fd();
ASSERT(fd == socket->fd());
if (di->IsListeningSocket()) {
// We only close the socket file descriptor from the operating
// system if there are no other dart socket objects which

View file

@ -230,7 +230,9 @@ void EventHandlerImplementation::HandleInterruptFd() {
// message.
intptr_t old_mask = di->Mask();
Dart_Port port = msg[i].dart_port;
di->RemovePort(port);
if (port != ILLEGAL_PORT) {
di->RemovePort(port);
}
intptr_t new_mask = di->Mask();
UpdateKQueueInstance(old_mask, di);

View file

@ -1056,7 +1056,9 @@ void EventHandlerImplementation::HandleInterrupt(InterruptMessage* msg) {
listen_socket->SetPortAndMask(msg->dart_port, events);
TryDispatchingPendingAccepts(listen_socket);
} else if (IS_COMMAND(msg->data, kCloseCommand)) {
listen_socket->RemovePort(msg->dart_port);
if (msg->dart_port != ILLEGAL_PORT) {
listen_socket->RemovePort(msg->dart_port);
}
// We only close the socket file descriptor from the operating
// system if there are no other dart socket objects which

View file

@ -20,6 +20,12 @@ class RawSocket {
{sourceAddress, Duration timeout}) {
return _RawSocket.connect(host, port, sourceAddress, timeout);
}
@patch
static Future<ConnectionTask<RawSocket>> startConnect(host, int port,
{sourceAddress}) {
return _RawSocket.startConnect(host, port, sourceAddress);
}
}
@patch
@ -350,8 +356,8 @@ class _NativeSocket extends _NativeSocketNativeWrapper with _ServiceObject {
static Future<List<InternetAddress>> lookup(String host,
{InternetAddressType type: InternetAddressType.any}) {
return _IOService._dispatch(
_IOService.socketLookup, [host, type._value]).then((response) {
return _IOService._dispatch(_IOService.socketLookup, [host, type._value])
.then((response) {
if (isErrorResponse(response)) {
throw createError(response, "Failed host lookup: '$host'");
} else {
@ -378,8 +384,8 @@ class _NativeSocket extends _NativeSocketNativeWrapper with _ServiceObject {
{bool includeLoopback: false,
bool includeLinkLocal: false,
InternetAddressType type: InternetAddressType.any}) {
return _IOService._dispatch(
_IOService.socketListInterfaces, [type._value]).then((response) {
return _IOService._dispatch(_IOService.socketListInterfaces, [type._value])
.then((response) {
if (isErrorResponse(response)) {
throw createError(response, "Failed listing interfaces");
} else {
@ -400,8 +406,8 @@ class _NativeSocket extends _NativeSocketNativeWrapper with _ServiceObject {
});
}
static Future<_NativeSocket> connect(
host, int port, sourceAddress, Duration timeout) {
static Future<ConnectionTask<_NativeSocket>> startConnect(
host, int port, sourceAddress) {
_throwOnBadPort(port);
if (sourceAddress != null && sourceAddress is! _InternetAddress) {
if (sourceAddress is String) {
@ -421,29 +427,11 @@ class _NativeSocket extends _NativeSocketNativeWrapper with _ServiceObject {
var it = (addresses as List<InternetAddress>).iterator;
var error = null;
var connecting = new HashMap();
Timer timeoutTimer = null;
void timeoutHandler() {
connecting.forEach((s, t) {
t.cancel();
s.close();
s.setHandlers();
s.setListening(read: false, write: false);
error = createError(
null, "Connection timed out, host: ${host}, port: ${port}");
completer.completeError(error);
});
}
void connectNext() {
if ((timeout != null) && (timeoutTimer == null)) {
timeoutTimer = new Timer(timeout, timeoutHandler);
}
if (!it.moveNext()) {
if (connecting.isEmpty) {
assert(error != null);
if (timeoutTimer != null) {
timeoutTimer.cancel();
}
completer.completeError(error);
}
return;
@ -471,11 +459,13 @@ class _NativeSocket extends _NativeSocketNativeWrapper with _ServiceObject {
}
connectNext();
} else {
// Query the local port, for error messages.
// Query the local port for error messages.
try {
socket.port;
} catch (e) {
error = createError(e, "Connection failed", address, port);
if (error == null) {
error = createError(e, "Connection failed", address, port);
}
connectNext();
}
// Set up timer for when we should retry the next address
@ -490,9 +480,6 @@ class _NativeSocket extends _NativeSocketNativeWrapper with _ServiceObject {
// indicate that the socket is fully connected.
socket.setHandlers(write: () {
timer.cancel();
if (timeoutTimer != null) {
timeoutTimer.cancel();
}
socket.setListening(read: false, write: false);
completer.complete(socket);
connecting.remove(socket);
@ -502,6 +489,7 @@ class _NativeSocket extends _NativeSocketNativeWrapper with _ServiceObject {
s.setHandlers();
s.setListening(read: false, write: false);
});
connecting.clear();
}, error: (e) {
timer.cancel();
socket.close();
@ -514,8 +502,42 @@ class _NativeSocket extends _NativeSocketNativeWrapper with _ServiceObject {
}
}
void onCancel() {
connecting.forEach((s, t) {
t.cancel();
s.close();
s.setHandlers();
s.setListening(read: false, write: false);
if (error == null) {
error = createError(null,
"Connection attempt cancelled, host: ${host}, port: ${port}");
}
});
connecting.clear();
if (!completer.isCompleted) {
completer.completeError(error);
}
}
connectNext();
return completer.future;
return new ConnectionTask<_NativeSocket>._(
socket: completer.future, onCancel: onCancel);
});
}
static Future<_NativeSocket> connect(
host, int port, sourceAddress, Duration timeout) {
return startConnect(host, port, sourceAddress)
.then((ConnectionTask<_NativeSocket> task) {
Future<_NativeSocket> socketFuture = task.socket;
if (timeout != null) {
socketFuture = socketFuture.timeout(timeout, onTimeout: () {
task.cancel();
throw createError(
null, "Connection timed out, host: ${host}, port: ${port}");
});
}
return socketFuture;
});
}
@ -1134,8 +1156,7 @@ class _RawServerSocket extends Stream<RawSocket> implements RawServerSocket {
address, int port, int backlog, bool v6Only, bool shared) {
_throwOnBadPort(port);
if (backlog < 0) throw new ArgumentError("Invalid backlog $backlog");
return _NativeSocket
.bind(address, port, backlog, v6Only, shared)
return _NativeSocket.bind(address, port, backlog, v6Only, shared)
.then((socket) => new _RawServerSocket(socket, v6Only));
}
@ -1228,11 +1249,21 @@ class _RawSocket extends Stream<RawSocketEvent> implements RawSocket {
static Future<RawSocket> connect(
host, int port, sourceAddress, Duration timeout) {
return _NativeSocket
.connect(host, port, sourceAddress, timeout)
return _NativeSocket.connect(host, port, sourceAddress, timeout)
.then((socket) => new _RawSocket(socket));
}
static Future<ConnectionTask<_RawSocket>> startConnect(
host, int port, sourceAddress) {
return _NativeSocket.startConnect(host, port, sourceAddress)
.then((ConnectionTask<_NativeSocket> nativeTask) {
final Future<_RawSocket> raw = nativeTask.socket
.then((_NativeSocket nativeSocket) => new _RawSocket(nativeSocket));
return new ConnectionTask<_RawSocket>._(
socket: raw, onCancel: nativeTask._onCancel);
});
}
_RawSocket(this._socket) {
var zone = Zone.current;
_controller = new StreamController(
@ -1381,8 +1412,7 @@ class _ServerSocket extends Stream<Socket> implements ServerSocket {
static Future<_ServerSocket> bind(
address, int port, int backlog, bool v6Only, bool shared) {
return _RawServerSocket
.bind(address, port, backlog, v6Only, shared)
return _RawServerSocket.bind(address, port, backlog, v6Only, shared)
.then((socket) => new _ServerSocket(socket));
}
@ -1414,10 +1444,22 @@ class Socket {
@patch
static Future<Socket> _connect(host, int port,
{sourceAddress, Duration timeout}) {
return RawSocket
.connect(host, port, sourceAddress: sourceAddress, timeout: timeout)
return RawSocket.connect(host, port,
sourceAddress: sourceAddress, timeout: timeout)
.then((socket) => new _Socket(socket));
}
@patch
static Future<ConnectionTask<Socket>> _startConnect(host, int port,
{sourceAddress}) {
return RawSocket.startConnect(host, port, sourceAddress: sourceAddress)
.then((rawTask) {
Future<Socket> socket =
rawTask.socket.then((rawSocket) => new _Socket(rawSocket));
return new ConnectionTask<Socket>._(
socket: socket, onCancel: rawTask._onCancel);
});
}
}
class _SocketStreamConsumer extends StreamConsumer<List<int>> {
@ -1774,8 +1816,7 @@ class _RawDatagramSocket extends Stream<RawSocketEvent>
static Future<RawDatagramSocket> bind(host, int port, bool reuseAddress) {
_throwOnBadPort(port);
return _NativeSocket
.bindDatagram(host, port, reuseAddress)
return _NativeSocket.bindDatagram(host, port, reuseAddress)
.then((socket) => new _RawDatagramSocket(socket));
}

View file

@ -1039,8 +1039,9 @@ class _HttpClientRequest extends _HttpOutboundMessage<HttpClientResponse>
Future<HttpClientResponse> get done {
if (_response == null) {
_response = Future.wait([_responseCompleter.future, super.done],
eagerError: true).then((list) => list[0]);
_response =
Future.wait([_responseCompleter.future, super.done], eagerError: true)
.then((list) => list[0]);
}
return _response;
}
@ -1254,7 +1255,8 @@ class _HttpOutgoing implements StreamConsumer<List<int>> {
outbound.headers.chunkedTransferEncoding) {
List acceptEncodings =
response._httpRequest.headers[HttpHeaders.acceptEncodingHeader];
List contentEncoding = outbound.headers[HttpHeaders.contentEncodingHeader];
List contentEncoding =
outbound.headers[HttpHeaders.contentEncodingHeader];
if (acceptEncodings != null &&
acceptEncodings
.expand((list) => list.split(","))
@ -1657,8 +1659,8 @@ class _HttpClientConnection {
if (proxy.isAuthenticated) {
// If the proxy configuration contains user information use that
// for proxy basic authorization.
String auth = _CryptoUtils
.bytesToBase64(utf8.encode("${proxy.username}:${proxy.password}"));
String auth = _CryptoUtils.bytesToBase64(
utf8.encode("${proxy.username}:${proxy.password}"));
request.headers.set(HttpHeaders.proxyAuthorizationHeader, "Basic $auth");
} else if (!proxy.isDirect && _httpClient._proxyCredentials.length > 0) {
proxyCreds = _httpClient._findProxyCredentials(proxy);
@ -1766,7 +1768,8 @@ class _HttpClientConnection {
void close() {
closed = true;
_httpClient._connectionClosed(this);
_streamFuture.timeout(_httpClient.idleTimeout)
_streamFuture
.timeout(_httpClient.idleTimeout)
.then((_) => _socket.destroy());
}
@ -1777,8 +1780,8 @@ class _HttpClientConnection {
if (proxy.isAuthenticated) {
// If the proxy configuration contains user information use that
// for proxy basic authorization.
String auth = _CryptoUtils
.bytesToBase64(utf8.encode("${proxy.username}:${proxy.password}"));
String auth = _CryptoUtils.bytesToBase64(
utf8.encode("${proxy.username}:${proxy.password}"));
request.headers.set(HttpHeaders.proxyAuthorizationHeader, "Basic $auth");
}
return request.close().then((response) {
@ -1837,6 +1840,7 @@ class _ConnectionTarget {
final SecurityContext context;
final Set<_HttpClientConnection> _idle = new HashSet();
final Set<_HttpClientConnection> _active = new HashSet();
final Set<ConnectionTask> _socketTasks = new HashSet();
final Queue _pending = new ListQueue();
int _connecting = 0;
@ -1884,6 +1888,14 @@ class _ConnectionTarget {
}
void close(bool force) {
// Always cancel pending socket connections.
for (var t in _socketTasks.toList()) {
// Make sure the socket is destroyed if the ConnectionTask is cancelled.
t.socket.then((s) {
s.destroy();
}, onError: (e) {});
t.cancel();
}
if (force) {
for (var c in _idle.toList()) {
c.destroy();
@ -1920,35 +1932,48 @@ class _ConnectionTarget {
return currentBadCertificateCallback(certificate, uriHost, uriPort);
}
Future socketFuture = (isSecure && proxy.isDirect
? SecureSocket.connect(host, port,
context: context, onBadCertificate: callback,
timeout: client.connectionTimeout)
: Socket.connect(host, port, timeout: client.connectionTimeout));
Future<ConnectionTask> connectionTask = (isSecure && proxy.isDirect
? SecureSocket.startConnect(host, port,
context: context, onBadCertificate: callback)
: Socket.startConnect(host, port));
_connecting++;
return socketFuture.then((socket) {
_connecting--;
socket.setOption(SocketOption.tcpNoDelay, true);
var connection =
new _HttpClientConnection(key, socket, client, false, context);
if (isSecure && !proxy.isDirect) {
connection._dispose = true;
return connection
.createProxyTunnel(uriHost, uriPort, proxy, callback)
.then((tunnel) {
client
._getConnectionTarget(uriHost, uriPort, true)
.addNewActive(tunnel);
return new _ConnectionInfo(tunnel, proxy);
return connectionTask.then((ConnectionTask task) {
_socketTasks.add(task);
Future socketFuture = task.socket;
if (client.connectionTimeout != null) {
socketFuture =
socketFuture.timeout(client.connectionTimeout, onTimeout: () {
_socketTasks.remove(task);
task.cancel();
});
} else {
addNewActive(connection);
return new _ConnectionInfo(connection, proxy);
}
}, onError: (error) {
_connecting--;
_checkPending();
throw error;
return socketFuture.then((socket) {
_connecting--;
socket.setOption(SocketOption.tcpNoDelay, true);
var connection =
new _HttpClientConnection(key, socket, client, false, context);
if (isSecure && !proxy.isDirect) {
connection._dispose = true;
return connection
.createProxyTunnel(uriHost, uriPort, proxy, callback)
.then((tunnel) {
client
._getConnectionTarget(uriHost, uriPort, true)
.addNewActive(tunnel);
_socketTasks.remove(task);
return new _ConnectionInfo(tunnel, proxy);
});
} else {
addNewActive(connection);
_socketTasks.remove(task);
return new _ConnectionInfo(connection, proxy);
}
}, onError: (error) {
_connecting--;
_socketTasks.remove(task);
_checkPending();
throw error;
});
});
}
}
@ -2103,9 +2128,8 @@ class _HttpClient implements HttpClient {
bool isSecure = (uri.scheme == "https");
int port = uri.port;
if (port == 0) {
port = isSecure
? HttpClient.defaultHttpsPort
: HttpClient.defaultHttpPort;
port =
isSecure ? HttpClient.defaultHttpsPort : HttpClient.defaultHttpPort;
}
// Check to see if a proxy server should be used for this connection.
var proxyConf = const _ProxyConfiguration.direct();
@ -2502,8 +2526,8 @@ class _HttpServer extends Stream<HttpRequest>
static Future<HttpServer> bind(
address, int port, int backlog, bool v6Only, bool shared) {
return ServerSocket
.bind(address, port, backlog: backlog, v6Only: v6Only, shared: shared)
return ServerSocket.bind(address, port,
backlog: backlog, v6Only: v6Only, shared: shared)
.then<HttpServer>((socket) {
return new _HttpServer._(socket, true);
});
@ -2517,8 +2541,7 @@ class _HttpServer extends Stream<HttpRequest>
bool v6Only,
bool requestClientCertificate,
bool shared) {
return SecureServerSocket
.bind(address, port, context,
return SecureServerSocket.bind(address, port, context,
backlog: backlog,
v6Only: v6Only,
requestClientCertificate: requestClientCertificate,
@ -2977,8 +3000,8 @@ class _SiteCredentials extends _Credentials {
if (scheme != null && credentials.scheme != scheme) return false;
if (uri.host != this.uri.host) return false;
int thisPort =
this.uri.port == 0 ? HttpClient.defaultHttpPort: this.uri.port;
int otherPort = uri.port == 0 ? HttpClient.defaultHttpPort: uri.port;
this.uri.port == 0 ? HttpClient.defaultHttpPort : this.uri.port;
int otherPort = uri.port == 0 ? HttpClient.defaultHttpPort : uri.port;
if (otherPort != thisPort) return false;
return uri.path.startsWith(this.uri.path);
}
@ -3116,14 +3139,14 @@ class _HttpClientDigestCredentials extends _HttpClientCredentials
}
void authorize(_Credentials credentials, HttpClientRequest request) {
request.headers
.set(HttpHeaders.authorizationHeader, authorization(credentials, request));
request.headers.set(
HttpHeaders.authorizationHeader, authorization(credentials, request));
}
void authorizeProxy(
_ProxyCredentials credentials, HttpClientRequest request) {
request.headers.set(
HttpHeaders.proxyAuthorizationHeader, authorization(credentials, request));
request.headers.set(HttpHeaders.proxyAuthorizationHeader,
authorization(credentials, request));
}
}

View file

@ -461,6 +461,12 @@ class RawSocket {
{sourceAddress, Duration timeout}) {
throw new UnsupportedError("RawSocket constructor");
}
@patch
static Future<ConnectionTask<RawSocket>> startConnect(host, int port,
{sourceAddress}) {
throw new UnsupportedError("RawSocket constructor");
}
}
@patch
@ -470,6 +476,12 @@ class Socket {
{sourceAddress, Duration timeout}) {
throw new UnsupportedError("Socket constructor");
}
@patch
static Future<ConnectionTask<Socket>> _startConnect(host, int port,
{sourceAddress}) {
throw new UnsupportedError("Socket constructor");
}
}
@patch

View file

@ -83,6 +83,9 @@ abstract class IOOverrides {
Future<Socket> Function(dynamic, int,
{dynamic sourceAddress, Duration timeout})
socketConnect,
Future<ConnectionTask<Socket>> Function(dynamic, int,
{dynamic sourceAddress})
socketStartConnect,
// Optional Zone parameters
ZoneSpecification zoneSpecification,
@ -116,6 +119,7 @@ abstract class IOOverrides {
// Socket
socketConnect,
socketStartConnect,
);
return _asyncRunZoned<R>(body,
zoneValues: {_ioOverridesToken: overrides},
@ -254,12 +258,22 @@ abstract class IOOverrides {
/// Asynchronously returns a [Socket] connected to the given host and port.
///
/// When this override is installed, this functions overrides the behavior of
/// `Socet.connect(...)`.
/// `Socket.connect(...)`.
Future<Socket> socketConnect(host, int port,
{sourceAddress, Duration timeout}) {
return Socket._connect(host, port,
sourceAddress: sourceAddress, timeout: timeout);
}
/// Asynchronously returns a [ConnectionTask] that connects to the given host
/// and port when successful.
///
/// When this override is installed, this functions overrides the behavior of
/// `Socket.startConnect(...)`.
Future<ConnectionTask<Socket>> socketStartConnect(host, int port,
{sourceAddress}) {
return Socket._startConnect(host, port, sourceAddress: sourceAddress);
}
}
class _IOOverridesScope extends IOOverrides {
@ -294,6 +308,8 @@ class _IOOverridesScope extends IOOverrides {
// Socket
Future<Socket> Function(dynamic, int,
{dynamic sourceAddress, Duration timeout}) _socketConnect;
Future<ConnectionTask<Socket>> Function(dynamic, int, {dynamic sourceAddress})
_socketStartConnect;
_IOOverridesScope(
// Directory
@ -324,6 +340,7 @@ class _IOOverridesScope extends IOOverrides {
// Socket
this._socketConnect,
this._socketStartConnect,
);
// Directory
@ -448,4 +465,17 @@ class _IOOverridesScope extends IOOverrides {
return super.socketConnect(host, port,
sourceAddress: sourceAddress, timeout: timeout);
}
@override
Future<ConnectionTask<Socket>> socketStartConnect(host, int port,
{sourceAddress}) {
if (_socketStartConnect != null) {
return _socketStartConnect(host, port, sourceAddress: sourceAddress);
}
if (_previous != null) {
return _previous.socketStartConnect(host, port,
sourceAddress: sourceAddress);
}
return super.socketStartConnect(host, port, sourceAddress: sourceAddress);
}
}

View file

@ -46,8 +46,7 @@ abstract class SecureSocket implements Socket {
bool onBadCertificate(X509Certificate certificate),
List<String> supportedProtocols,
Duration timeout}) {
return RawSecureSocket
.connect(host, port,
return RawSecureSocket.connect(host, port,
context: context,
onBadCertificate: onBadCertificate,
supportedProtocols: supportedProtocols,
@ -55,6 +54,25 @@ abstract class SecureSocket implements Socket {
.then((rawSocket) => new SecureSocket._(rawSocket));
}
/// Like [connect], but returns a [Future] that completes with a
/// [ConnectionTask] that can be cancelled if the [SecureSocket] is no
/// longer needed.
static Future<ConnectionTask<SecureSocket>> startConnect(host, int port,
{SecurityContext context,
bool onBadCertificate(X509Certificate certificate),
List<String> supportedProtocols}) {
return RawSecureSocket.startConnect(host, port,
context: context,
onBadCertificate: onBadCertificate,
supportedProtocols: supportedProtocols)
.then((rawState) {
Future<SecureSocket> socket =
rawState.socket.then((rawSocket) => new SecureSocket._(rawSocket));
return new ConnectionTask<SecureSocket>._(
socket: socket, onCancel: rawState._onCancel);
});
}
/**
* Takes an already connected [socket] and starts client side TLS
* handshake to make the communication secure. When the returned
@ -215,6 +233,26 @@ abstract class RawSecureSocket implements RawSocket {
});
}
/// Like [connect], but returns a [Future] that completes with a
/// [ConnectionTask] that can be cancelled if the [RawSecureSocket] is no
/// longer needed.
static Future<ConnectionTask<RawSecureSocket>> startConnect(host, int port,
{SecurityContext context,
bool onBadCertificate(X509Certificate certificate),
List<String> supportedProtocols}) {
return RawSocket.startConnect(host, port)
.then((ConnectionTask<RawSocket> rawState) {
Future<RawSecureSocket> socket = rawState.socket.then((rawSocket) {
return secure(rawSocket,
context: context,
onBadCertificate: onBadCertificate,
supportedProtocols: supportedProtocols);
});
return new ConnectionTask<RawSecureSocket>._(
socket: socket, onCancel: rawState._onCancel);
});
}
/**
* Takes an already connected [socket] and starts client side TLS
* handshake to make the communication secure. When the returned
@ -990,8 +1028,7 @@ class _RawSecureSocket extends Stream<RawSocketEvent>
args[2 * i + 3] = bufs[i].end;
}
return _IOService
._dispatch(_IOService.sslProcessFilter, args)
return _IOService._dispatch(_IOService.sslProcessFilter, args)
.then((response) {
if (response.length == 2) {
if (wasInHandshake) {

View file

@ -426,6 +426,32 @@ class RawSocketEvent {
}
}
/// Returned by the `startConnect` methods on client-side socket types `S`,
/// `ConnectionTask<S>` allows cancelling an attempt to connect to a host.
class ConnectionTask<S> {
/// A `Future` that completes with value that `S.connect()` would return
/// unless [cancel] is called on this [ConnectionTask].
///
/// If [cancel] is called, the `Future` completes with a [SocketException]
/// error whose message indicates that the connection attempt was cancelled.
final Future<S> socket;
final void Function() _onCancel;
ConnectionTask._({Future<S> socket, void Function() onCancel})
: assert(socket != null),
assert(onCancel != null),
this.socket = socket,
this._onCancel = onCancel;
/// Cancels the connection attempt.
///
/// This also causes the [socket] `Future` to complete with a
/// [SocketException] error.
void cancel() {
_onCancel();
}
}
/**
* The [RawSocket] is a low-level interface to a socket, exposing the raw
* events signaled by the system. It's a [Stream] of [RawSocketEvent]s.
@ -470,6 +496,12 @@ abstract class RawSocket implements Stream<RawSocketEvent> {
external static Future<RawSocket> connect(host, int port,
{sourceAddress, Duration timeout});
/// Like [connect], but returns a [Future] that completes with a
/// [ConnectionTask] that can be cancelled if the [RawSocket] is no
/// longer needed.
external static Future<ConnectionTask<RawSocket>> startConnect(host, int port,
{sourceAddress});
/**
* Returns the number of received and non-read bytes in the socket that
* can be read.
@ -583,9 +615,25 @@ abstract class Socket implements Stream<List<int>>, IOSink {
sourceAddress: sourceAddress, timeout: timeout);
}
/// Like [connect], but returns a [Future] that completes with a
/// [ConnectionTask] that can be cancelled if the [Socket] is no
/// longer needed.
static Future<ConnectionTask<Socket>> startConnect(host, int port,
{sourceAddress}) {
final IOOverrides overrides = IOOverrides.current;
if (overrides == null) {
return Socket._startConnect(host, port, sourceAddress: sourceAddress);
}
return overrides.socketStartConnect(host, port,
sourceAddress: sourceAddress);
}
external static Future<Socket> _connect(host, int port,
{sourceAddress, Duration timeout});
external static Future<ConnectionTask<Socket>> _startConnect(host, int port,
{sourceAddress});
/**
* Destroy the socket in both directions. Calling [destroy] will make the
* send a close event on the stream and will no longer react on data being

View file

@ -162,6 +162,11 @@ Future<Socket> socketConnect(host, int port,
return null;
}
Future<ConnectionTask<Socket>> socketStartConnect(host, int port,
{sourceAddress}) {
return null;
}
Future<Null> ioOverridesRunTest() async {
Future<Null> f = IOOverrides.runZoned(
() async {
@ -181,6 +186,7 @@ Future<Null> ioOverridesRunTest() async {
Expect.isNull(new Directory("directory").watch());
Expect.isTrue(new Link("link") is LinkMock);
Expect.isNull(Socket.connect(null, 0));
Expect.isNull(Socket.startConnect(null, 0));
},
createDirectory: DirectoryMock.createDirectory,
getCurrentDirectory: DirectoryMock.getCurrent,
@ -197,6 +203,7 @@ Future<Null> ioOverridesRunTest() async {
fsWatchIsSupported: FileSystemWatcherMock.watchSupported,
createLink: LinkMock.createLink,
socketConnect: socketConnect,
socketStartConnect: socketStartConnect,
);
Expect.isFalse(new Directory("directory") is DirectoryMock);
Expect.isTrue(new Directory("directory") is Directory);

View file

@ -160,7 +160,8 @@ void test5(int totalConnections) {
return request.close();
})
.then((response) {})
.catchError((e) {}, test: (e) => e is HttpException);
.catchError((e) {},
test: (e) => e is HttpException || e is SocketException);
}
bool clientClosed = false;
new Timer.periodic(new Duration(milliseconds: 100), (timer) {

View file

@ -74,6 +74,21 @@ void testSimpleConnect() {
});
}
void testCancelConnect() {
asyncStart();
RawSocket.startConnect(InternetAddress.loopbackIPv4, 0)
.then((ConnectionTask<RawSocket> task) {
task.cancel();
task.socket.catchError((error) {
Expect.isTrue(error is SocketException);
asyncEnd();
});
task.socket.then((s) {
Expect.fail("Unreachable");
});
});
}
void testCloseOneEnd(String toClose) {
asyncStart();
Completer serverDone = new Completer();
@ -467,6 +482,7 @@ main() {
testCloseOneEnd("server");
testInvalidBind();
testSimpleConnect();
testCancelConnect();
testServerListenAfterConnect();
testSimpleReadWrite(dropReads: false);
testSimpleReadWrite(dropReads: true);

View file

@ -0,0 +1,32 @@
// Copyright (c) 2018, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
//
// VMOptions=
// VMOptions=--short_socket_read
// VMOptions=--short_socket_write
// VMOptions=--short_socket_read --short_socket_write
import "dart:async";
import "dart:io";
import "package:async_helper/async_helper.dart";
import "package:expect/expect.dart";
void main() {
asyncStart();
Duration timeout = new Duration(milliseconds: 20);
Socket.startConnect("8.8.8.7", 80).then((task) {
task.socket.timeout(timeout, onTimeout: () {
task.cancel();
});
task.socket.then((socket) {
Expect.fail("Unexpected connection made.");
asyncEnd();
}).catchError((e) {
print(e);
Expect.isTrue(e is SocketException);
asyncEnd();
});
});
}