Update Socket to be a Stream<Uint8List>

Bug: https://github.com/dart-lang/sdk/issues/36900
Change-Id: I600c28aebbe35f9e650f969adf356dda4eb0cacd
Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/104524
Commit-Queue: Todd Volkert <tvolkert@google.com>
Reviewed-by: Lasse R.H. Nielsen <lrn@google.com>
This commit is contained in:
Todd Volkert 2019-07-08 16:42:47 +00:00 committed by commit-bot@chromium.org
parent a85f6ff348
commit cab2ca275d
19 changed files with 77 additions and 59 deletions

View file

@ -28,6 +28,28 @@
* `File.openRead()`
* `HttpRequest`
* `HttpClientResponse`
* `Socket`
**Possible errors and how to fix them**
* > The argument type 'Utf8Decoder' can't be assigned to the parameter type 'StreamTransformer<Uint8List, dynamic>'
> type 'Utf8Decoder' is not a subtype of type 'StreamTransformer' of 'streamTransformer'"
You can fix these call sites by updating your code to use
`StreamTransformer.bind()` instead of `Stream.transform()`, like so:
*Before:* `stream.transform(utf8.decoder)`
*After:* `utf8.decoder.bind(stream)`
* > The argument type 'IOSink' can't be assigned to the parameter type 'StreamConsumer<Uint8List>'
> type '_IOSinkImpl' is not a subtype of type 'StreamConsumer<Uint8List>' of 'streamConsumer'
You can fix these call sites by casting your stream instance to a `Stream<List<int>>` before calling `.pipe()` on the stream, like so:
*Before:* `stream.pipe(consumer)`
*After:* `stream.cast<List<int>>().pipe(consumer)`
Finally, the following typed lists were updated to have their `sublist()`
methods declare a return type that is the same as the source list:

2
DEPS
View file

@ -85,7 +85,7 @@ vars = {
"fixnum_tag": "0.10.9",
"glob_tag": "1.1.7",
"html_tag" : "0.14.0+1",
"http_io_rev": "0b05781c273a040ef521b5f7771dbc0356305872",
"http_io_rev": "2fa188caf7937e313026557713f7feffedd4978b",
"http_multi_server_tag" : "2.0.5",
"http_parser_tag" : "3.1.3",
"http_retry_tag": "0.1.1",

View file

@ -1606,10 +1606,10 @@ class _SocketStreamConsumer extends StreamConsumer<List<int>> {
}
}
class _Socket extends Stream<List<int>> implements Socket {
class _Socket extends Stream<Uint8List> implements Socket {
RawSocket _raw; // Set to null when the raw socket is closed.
bool _closed = false; // Set to true when the raw socket is closed.
StreamController<List<int>> _controller;
StreamController<Uint8List> _controller;
bool _controllerClosed = false;
_SocketStreamConsumer _consumer;
IOSink _sink;
@ -1617,7 +1617,7 @@ class _Socket extends Stream<List<int>> implements Socket {
var _detachReady;
_Socket(this._raw) {
_controller = new StreamController<List<int>>(
_controller = new StreamController<Uint8List>(
sync: true,
onListen: _onSubscriptionStateChange,
onCancel: _onSubscriptionStateChange,
@ -1647,7 +1647,7 @@ class _Socket extends Stream<List<int>> implements Socket {
// is Socket and not _NativeSocket.
_NativeSocket get _nativeSocket => (_raw as _RawSocket)._socket;
StreamSubscription<List<int>> listen(void onData(List<int> event),
StreamSubscription<Uint8List> listen(void onData(Uint8List event),
{Function onError, void onDone(), bool cancelOnError}) {
return _controller.stream.listen(onData,
onError: onError, onDone: onDone, cancelOnError: cancelOnError);

View file

@ -14,7 +14,7 @@ Future setupTCP() async {
// to allow us to query them from the other isolate.
var serverSocket = await io.ServerSocket.bind('127.0.0.1', 0);
serverSocket.listen((s) {
s.transform(utf8.decoder).listen(print);
utf8.decoder.bind(s).listen(print);
s.close();
});
var socket = await io.Socket.connect("127.0.0.1", serverSocket.port);

View file

@ -2892,13 +2892,13 @@ class _HttpConnectionInfo implements HttpConnectionInfo {
}
}
class _DetachedSocket extends Stream<List<int>> implements Socket {
final Stream<List<int>> _incoming;
class _DetachedSocket extends Stream<Uint8List> implements Socket {
final Stream<Uint8List> _incoming;
final Socket _socket;
_DetachedSocket(this._socket, this._incoming);
StreamSubscription<List<int>> listen(void onData(List<int> event),
StreamSubscription<Uint8List> listen(void onData(Uint8List event),
{Function onError, void onDone(), bool cancelOnError}) {
return _incoming.listen(onData,
onError: onError, onDone: onDone, cancelOnError: cancelOnError);

View file

@ -108,9 +108,9 @@ class _MessageType {
* _HttpDetachedStreamSubscription is resumed, it'll deliver the data before
* resuming the underlaying subscription.
*/
class _HttpDetachedStreamSubscription implements StreamSubscription<List<int>> {
StreamSubscription<List<int>> _subscription;
List<int> _injectData;
class _HttpDetachedStreamSubscription implements StreamSubscription<Uint8List> {
StreamSubscription<Uint8List> _subscription;
Uint8List _injectData;
bool _isCanceled = false;
int _pauseCount = 1;
Function _userOnData;
@ -130,7 +130,7 @@ class _HttpDetachedStreamSubscription implements StreamSubscription<List<int>> {
return _subscription.cancel();
}
void onData(void handleData(List<int> data)) {
void onData(void handleData(Uint8List data)) {
_userOnData = handleData;
_subscription.onData(handleData);
}
@ -182,13 +182,13 @@ class _HttpDetachedStreamSubscription implements StreamSubscription<List<int>> {
}
}
class _HttpDetachedIncoming extends Stream<List<int>> {
final StreamSubscription<List<int>> subscription;
final List<int> bufferedData;
class _HttpDetachedIncoming extends Stream<Uint8List> {
final StreamSubscription<Uint8List> subscription;
final Uint8List bufferedData;
_HttpDetachedIncoming(this.subscription, this.bufferedData);
StreamSubscription<List<int>> listen(void onData(List<int> event),
StreamSubscription<Uint8List> listen(void onData(Uint8List event),
{Function onError, void onDone(), bool cancelOnError}) {
if (subscription != null) {
subscription
@ -203,7 +203,7 @@ class _HttpDetachedIncoming extends Stream<List<int>> {
..resume();
} else {
// TODO(26379): add test for this branch.
return new Stream<List<int>>.fromIterable([bufferedData]).listen(onData,
return new Stream<Uint8List>.fromIterable([bufferedData]).listen(onData,
onError: onError, onDone: onDone, cancelOnError: cancelOnError);
}
}
@ -261,7 +261,7 @@ class _HttpParser extends Stream<_HttpIncoming> {
// The current incoming connection.
_HttpIncoming _incoming;
StreamSubscription<List<int>> _socketSubscription;
StreamSubscription<Uint8List> _socketSubscription;
bool _paused = true;
bool _bodyPaused = false;
StreamController<_HttpIncoming> _controller;
@ -303,7 +303,7 @@ class _HttpParser extends Stream<_HttpIncoming> {
onError: onError, onDone: onDone, cancelOnError: cancelOnError);
}
void listenToStream(Stream<List<int>> stream) {
void listenToStream(Stream<Uint8List> stream) {
// Listen to the stream and handle data accordingly. When a
// _HttpIncoming is created, _dataPause, _dataResume, _dataDone is
// given to provide a way of controlling the parser.
@ -802,7 +802,7 @@ class _HttpParser extends Stream<_HttpIncoming> {
}
}
void _onData(List<int> buffer) {
void _onData(Uint8List buffer) {
_socketSubscription.pause();
assert(_buffer == null);
_buffer = buffer;
@ -888,7 +888,7 @@ class _HttpParser extends Stream<_HttpIncoming> {
return new _HttpDetachedIncoming(_socketSubscription, readUnparsedData());
}
List<int> readUnparsedData() {
Uint8List readUnparsedData() {
if (_buffer == null) return null;
if (_index == _buffer.length) return null;
var result = _buffer.sublist(_index);

View file

@ -1128,7 +1128,7 @@ class _WebSocketImpl extends Stream with _ServiceObject implements WebSocket {
_deflate = deflate;
var transformer = new _WebSocketProtocolTransformer(_serverSide, _deflate);
_subscription = _socket.transform(transformer).listen((data) {
_subscription = transformer.bind(_socket).listen((data) {
if (data is _WebSocketPing) {
if (!_writeClosed) _consumer.add(new _WebSocketPong(data.payload));
} else if (data is _WebSocketPong) {

View file

@ -710,7 +710,7 @@ abstract class RawSocket implements Stream<RawSocketEvent> {
* The [Socket] exposes both a [Stream] and a [IOSink] interface, making it
* ideal for using together with other [Stream]s.
*/
abstract class Socket implements Stream<List<int>>, IOSink {
abstract class Socket implements Stream<Uint8List>, IOSink {
/**
* Creates a new socket connection to the host and port and returns a [Future]
* that will complete with either a [Socket] once connected or an error

View file

@ -237,8 +237,6 @@
"Dynamic invocation of 'close'.": 1
},
"org-dartlang-sdk:///sdk/lib/_http/websocket_impl.dart": {
"Dynamic invocation of 'transform'.": 1,
"Dynamic invocation of 'listen'.": 1,
"Dynamic access of 'address'.": 1,
"Dynamic access of 'host'.": 1,
"Dynamic access of 'port'.": 1,

View file

@ -15,7 +15,7 @@ void clientSocketAddCloseNoErrorTest() {
var completer = new Completer();
server.listen((socket) {
// The socket is 'paused' until the future completes.
completer.future.then((_) => socket.pipe(socket));
completer.future.then((_) => socket.cast<List<int>>().pipe(socket));
});
Socket.connect("127.0.0.1", server.port).then((client) {
const int SIZE = 1024 * 1024;

View file

@ -13,7 +13,7 @@ import "package:expect/expect.dart";
void clientSocketDestroyNoErrorTest() {
ServerSocket.bind("127.0.0.1", 0).then((server) {
server.listen((socket) {
socket.pipe(socket);
socket.cast<List<int>>().pipe(socket);
});
Socket.connect("127.0.0.1", server.port).then((client) {
client.listen((data) {}, onDone: server.close);

View file

@ -151,7 +151,7 @@ void testUpgradedConnection() {
if (request.headers.value('upgrade') == 'mine') {
asyncStart();
request.response.detachSocket().then((socket) {
socket.pipe(socket).then((_) {
socket.cast<List<int>>().pipe(socket).then((_) {
asyncEnd();
});
});

View file

@ -39,7 +39,7 @@ class HttpParserTest {
int unparsedLength: 0,
bool connectionClose: false,
String expectedVersion: "1.1"}) {
StreamController<List<int>> controller;
StreamController<Uint8List> controller;
void reset() {
_HttpParser httpParser = new _HttpParser.requestParser();
controller = new StreamController(sync: true);
@ -162,7 +162,7 @@ class HttpParserTest {
static void _testParseInvalidRequest(String request) {
_HttpParser httpParser;
bool errorCalled;
StreamController<List<int>> controller;
StreamController<Uint8List> controller;
void reset() {
httpParser = new _HttpParser.requestParser();
@ -214,7 +214,7 @@ class HttpParserTest {
bool upgrade: false,
int unparsedLength: 0,
String expectedVersion: "1.1"}) {
StreamController<List<int>> controller;
StreamController<Uint8List> controller;
bool upgraded;
void reset() {
@ -309,7 +309,7 @@ class HttpParserTest {
static void _testParseInvalidResponse(String response, [bool close = false]) {
void testWrite(List<int> requestData, [int chunkSize = -1]) {
_HttpParser httpParser = new _HttpParser.responseParser();
StreamController<List<int>> controller = new StreamController(sync: true);
StreamController<Uint8List> controller = new StreamController(sync: true);
bool errorCalled = false;
;

View file

@ -234,8 +234,8 @@ class ProxyServer {
Socket.connect(tmp[0], int.parse(tmp[1])).then((socket) {
request.response.reasonPhrase = "Connection established";
request.response.detachSocket().then((detached) {
socket.pipe(detached);
detached.pipe(socket);
socket.cast<List<int>>().pipe(detached);
detached.cast<List<int>>().pipe(socket);
});
});
} else {

View file

@ -222,8 +222,8 @@ class ProxyServer {
Socket.connect(tmp[0], int.parse(tmp[1])).then((socket) {
request.response.reasonPhrase = "Connection established";
request.response.detachSocket().then((detached) {
socket.pipe(detached);
detached.pipe(socket);
socket.cast<List<int>>().pipe(detached);
detached.cast<List<int>>().pipe(socket);
});
});
} else {

View file

@ -51,7 +51,7 @@ class PipeServerGame {
var dstFile = new File(dstFileName);
dstFile.createSync();
var fileOutput = dstFile.openWrite();
_socket.pipe(fileOutput).then((_) {
_socket.cast<List<int>>().pipe(fileOutput).then((_) {
// Check that the resulting file is equal to the initial
// file.
bool result = compareFileContent(srcFileName, dstFileName);
@ -109,7 +109,7 @@ void startPipeServer(Object replyPortObj) {
// stream to its output stream.
class PipeServer extends TestingServer {
void onConnection(Socket connection) {
connection.pipe(connection);
connection.cast<List<int>>().pipe(connection);
}
}

View file

@ -33,8 +33,7 @@ void testSuccessfulAlpnNegotiationConnection(List<String> clientProtocols,
List<String> serverProtocols, String selectedProtocol) {
asyncStart();
var sContext = serverContext()..setAlpnProtocols(serverProtocols, true);
SecureServerSocket
.bind('localhost', 0, sContext)
SecureServerSocket.bind('localhost', 0, sContext)
.then((SecureServerSocket server) {
asyncStart();
server.first.then((SecureSocket socket) {
@ -42,22 +41,21 @@ void testSuccessfulAlpnNegotiationConnection(List<String> clientProtocols,
socket
..write('server message')
..close();
socket.transform(ascii.decoder).join('').then((String s) {
ascii.decoder.bind(socket).join('').then((String s) {
Expect.equals('client message', s);
asyncEnd();
});
});
asyncStart();
SecureSocket
.connect('localhost', server.port,
SecureSocket.connect('localhost', server.port,
context: clientContext(), supportedProtocols: clientProtocols)
.then((socket) {
Expect.equals(selectedProtocol, socket.selectedProtocol);
socket
..write('client message')
..close();
socket.transform(ascii.decoder).join('').then((String s) {
ascii.decoder.bind(socket).join('').then((String s) {
Expect.equals('server message', s);
server.close();
asyncEnd();
@ -108,8 +106,7 @@ void testInvalidArgumentClientConnect(
});
asyncStart();
SecureSocket
.connect('localhost', server.port,
SecureSocket.connect('localhost', server.port,
context: clientContext(), supportedProtocols: protocols)
.then((socket) {
Expect.fail(

View file

@ -26,14 +26,13 @@ SecurityContext serverContext = new SecurityContext()
password: 'dartdart');
Future<SecureServerSocket> runServer() {
return SecureServerSocket
.bind(HOST_NAME, 0, serverContext)
return SecureServerSocket.bind(HOST_NAME, 0, serverContext)
.then((SecureServerSocket server) {
server.listen((SecureSocket socket) {
Expect.isNull(socket.peerCertificate);
StreamIterator<String> input = new StreamIterator(
socket.transform(utf8.decoder).transform(new LineSplitter()));
utf8.decoder.bind(socket).transform(new LineSplitter()));
input.moveNext().then((success) {
Expect.isTrue(success);
Expect.equals('first', input.current);
@ -66,12 +65,15 @@ Future<SecureServerSocket> runServer() {
void main() {
runServer().then((SecureServerSocket server) {
var clientScript =
Platform.script.resolve('secure_socket_renegotiate_client.dart').toFilePath();
Process
.run(Platform.executable,
[]..addAll(Platform.executableArguments)..addAll([clientScript, server.port.toString()])).then(
(ProcessResult result) {
var clientScript = Platform.script
.resolve('secure_socket_renegotiate_client.dart')
.toFilePath();
Process.run(
Platform.executable,
[]
..addAll(Platform.executableArguments)
..addAll([clientScript, server.port.toString()]))
.then((ProcessResult result) {
if (result.exitCode != 0) {
print("Client failed, stdout:");
print(result.stdout);

View file

@ -27,15 +27,14 @@ void main(List<String> args) {
Future makeServer() {
return ServerSocket.bind(InternetAddress.loopbackIPv4, 0).then((server) {
server.listen((request) {
request.pipe(request);
request.cast<List<int>>().pipe(request);
});
return server;
});
}
Future runClientProcess(int port) {
return Process
.run(
return Process.run(
Platform.executable,
[]
..addAll(Platform.executableArguments)