Update HttpRequest and HttpClientResponse to be Stream<Uint8List>

Bug: https://github.com/dart-lang/sdk/issues/36900
Change-Id: I1306b2df7c789597e49d2033b660a3ea62d6c1a4
Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/104525
Commit-Queue: Todd Volkert <tvolkert@google.com>
Reviewed-by: Lasse R.H. Nielsen <lrn@google.com>
This commit is contained in:
Todd Volkert 2019-06-27 00:21:07 +00:00 committed by commit-bot@chromium.org
parent 91c2c3537b
commit c92af46433
28 changed files with 116 additions and 62 deletions

View file

@ -6,37 +6,45 @@
### Core libraries
* As part of (Issue [36900][]), the following methods and properties across
various core libraries, which used to declare a return type of `List<int>`,
were updated to declare a return type of `Uint8List`:
* **Breaking change:** As part of (Issue [36900][]), the following methods and
properties across various core libraries, which used to declare a return type
of `List<int>`, were updated to declare a return type of `Uint8List`:
* `Utf8Codec.encode()` (and `Utf8Encoder.convert()`)
* `BytesBuilder.takeBytes()`
* `BytesBuilder.toBytes()`
* `Datagram.data`
* `File.readAsBytes()` (`Future<Uint8List>`)
* `File.readAsBytesSync()`
* `InternetAddress.rawAddress`
* `RandomAccessFile.read()` (`Future<Uint8List>`)
* `RandomAccessFile.readSync()`
* `InternetAddress.rawAddress`
* `RawSocket.read()`
* `Utf8Codec.encode()` (and `Utf8Encoder.convert()`)
In addition, the following typed lists were updated to have their `sublist()`
In addition, the following methods and classes were updated to return or
implement `Stream<Uint8List>` rather than `Stream<List<int>>`:
* `File.openRead()`
* `HttpRequest`
* `HttpClientResponse`
Finally, the following typed lists were updated to have their `sublist()`
methods declare a return type that is the same as the source list:
* `Uint8List.sublist()``Uint8List`
* `Int8List.sublist()``Int8List`
* `Uint8ClampedList.sublist()``Uint8ClampedList`
* `Int16List.sublist()``Int16List`
* `Uint16List.sublist()``Uint16List`
* `Int32List.sublist()``Int32List`
* `Uint32List.sublist()``Uint32List`
* `Int64List.sublist()``Int64List`
* `Uint64List.sublist()` → `Uint64List`
* `Int32x4List.sublist()` → `Int32x4List`
* `Float32List.sublist()``Float32List`
* `Float64List.sublist()``Float64List`
* `Float32x4List.sublist()``Float32x4List`
* `Int32x4List.sublist()``Int32x4List`
* `Float64x2List.sublist()``Float64x2List`
* `Uint8List.sublist()``Uint8List`
* `Uint8ClampedList.sublist()``Uint8ClampedList`
* `Uint16List.sublist()``Uint16List`
* `Uint32List.sublist()``Uint32List`
* `Uint64List.sublist()``Uint64List`
[36900]: https://github.com/dart-lang/sdk/issues/36900

2
DEPS
View file

@ -116,7 +116,7 @@ vars = {
"pub_rev": "ecd5b413271f2699f8cd9e23aa4eebb5030c964f",
"pub_semver_tag": "1.4.2",
"quiver-dart_tag": "2.0.0+1",
"resource_rev": "2.1.5",
"resource_rev": "f8e37558a1c4f54550aa463b88a6a831e3e33cd6",
"root_certificates_rev": "16ef64be64c7dfdff2b9f4b910726e635ccc519e",
"shelf_static_rev": "v0.2.8",
"shelf_packages_handler_tag": "1.0.4",

View file

@ -31,7 +31,7 @@ class GetServerPortTest extends AbstractAnalysisServerIntegrationTest {
HttpClientRequest request = await client
.getUrl(Uri.parse('http://localhost:${result.port}/status'));
HttpClientResponse response = await request.close();
String responseBody = await utf8.decodeStream(response);
String responseBody = await utf8.decodeStream(response.cast<List<int>>());
expect(responseBody, contains('<title>Analysis Server</title>'));
}
}

View file

@ -34,7 +34,7 @@ class ServerTest extends AbstractLspAnalysisServerIntegrationTest {
HttpClientRequest request = await client
.getUrl(Uri.parse('http://localhost:${server.port}/status'));
final response = await request.close();
final responseBody = await utf8.decodeStream(response);
final responseBody = await utf8.decodeStream(response.cast<List<int>>());
expect(responseBody, contains('<title>Analysis Server</title>'));
}

View file

@ -31,7 +31,7 @@ badRequest(HttpRequest request, int status, String message) {
}
collectLog(DateTime time, HttpRequest request) async {
String json = await request.transform(utf8.decoder).join();
String json = await request.cast<List<int>>().transform(utf8.decoder).join();
var data;
try {
data = jsonDecode(json);

View file

@ -1194,7 +1194,7 @@ class BrowserTestingServer {
errorReportingServer = server;
void errorReportingHandler(HttpRequest request) {
var buffer = StringBuffer();
request.transform(utf8.decoder).listen((data) {
request.cast<List<int>>().transform(utf8.decoder).listen((data) {
buffer.write(data);
}, onDone: () {
var back = buffer.toString();
@ -1285,7 +1285,7 @@ class BrowserTestingServer {
void handleReport(HttpRequest request, String browserId, int testId,
{bool isStatusUpdate}) {
var buffer = StringBuffer();
request.transform(utf8.decoder).listen((data) {
request.cast<List<int>>().transform(utf8.decoder).listen((data) {
buffer.write(data);
}, onDone: () {
var back = buffer.toString();
@ -1306,7 +1306,7 @@ class BrowserTestingServer {
// If an error occurs while receiving the data from the request stream,
// we don't handle it specially. We can safely ignore it, since the started
// events are not crucial.
request.transform(utf8.decoder).listen((data) {
request.cast<List<int>>().transform(utf8.decoder).listen((data) {
buffer.write(data);
}, onDone: () {
var back = buffer.toString();

View file

@ -223,7 +223,7 @@ class TestingServers {
void _handleEchoRequest(HttpRequest request) {
request.response.headers.set("Access-Control-Allow-Origin", "*");
request.pipe(request.response).catchError((e) {
request.cast<List<int>>().pipe(request.response).catchError((e) {
DebugLogger.warning(
'HttpServer: error while closing the response stream', e);
});

View file

@ -298,8 +298,8 @@ class Server {
String result;
try {
result = await _service.devfs.handlePutStream(
fsName, fsPath, fsUri, request.transform(GZIP.decoder));
result = await _service.devfs.handlePutStream(fsName, fsPath, fsUri,
request.cast<List<int>>().transform(GZIP.decoder));
} catch (e) {
request.response.statusCode = HttpStatus.internalServerError;
request.response.write(e);

View file

@ -12,7 +12,7 @@ import 'test_helper.dart';
Future<String> readResponse(HttpClientResponse response) {
var completer = new Completer<String>();
var contents = new StringBuffer();
response.transform(utf8.decoder).listen((String data) {
response.cast<List<int>>().transform(utf8.decoder).listen((String data) {
contents.write(data);
}, onDone: () => completer.complete(contents.toString()));
return completer.future;

View file

@ -12,7 +12,7 @@ import 'test_helper.dart';
Future<String> readResponse(HttpClientResponse response) {
var completer = new Completer<String>();
var contents = new StringBuffer();
response.transform(utf8.decoder).listen((String data) {
response.cast<List<int>>().transform(utf8.decoder).listen((String data) {
contents.write(data);
}, onDone: () => completer.complete(contents.toString()));
return completer.future;

View file

@ -12,7 +12,7 @@ import 'test_helper.dart';
Future<String> readResponse(HttpClientResponse response) {
var completer = new Completer<String>();
var contents = new StringBuffer();
response.transform(utf8.decoder).listen((String data) {
response.cast<List<int>>().transform(utf8.decoder).listen((String data) {
contents.write(data);
}, onDone: () => completer.complete(contents.toString()));
return completer.future;

View file

@ -22,6 +22,7 @@ Future<String> getIsolateId(io.HttpClient httpClient, Uri serverUri) async {
final requestUri = serverUri.replace(pathSegments: pathSegments);
var request = await httpClient.getUrl(requestUri);
Map response = await (await request.close())
.cast<List<int>>()
.transform(utf8.decoder)
.transform(json.decoder)
.first;
@ -55,6 +56,7 @@ Future<Null> testeeBefore() async {
try {
var request = await httpClient.getUrl(requestUri);
Map response = await (await request.close())
.cast<List<int>>()
.transform(utf8.decoder)
.transform(json.decoder)
.first;

View file

@ -31,6 +31,7 @@ Future<Null> testeeBefore() async {
try {
var request = await httpClient.getUrl(requestUri);
Map response = await (await request.close())
.cast<List<int>>()
.transform(utf8.decoder)
.transform(json.decoder)
.first;

View file

@ -1051,7 +1051,7 @@ abstract class Cookie {
* res.close();
* }
*/
abstract class HttpRequest implements Stream<List<int>> {
abstract class HttpRequest implements Stream<Uint8List> {
/**
* The content length of the request body.
*
@ -1947,7 +1947,7 @@ abstract class HttpClientRequest implements IOSink {
* });
* });
*/
abstract class HttpClientResponse implements Stream<List<int>> {
abstract class HttpClientResponse implements Stream<Uint8List> {
/**
* Returns the status code.
*

View file

@ -125,10 +125,10 @@ const int _OUTGOING_BUFFER_SIZE = 8 * 1024;
typedef void _BytesConsumer(List<int> bytes);
class _HttpIncoming extends Stream<List<int>> {
class _HttpIncoming extends Stream<Uint8List> {
final int _transferLength;
final Completer _dataCompleter = new Completer();
Stream<List<int>> _stream;
Stream<Uint8List> _stream;
bool fullBodyRead = false;
@ -154,7 +154,7 @@ class _HttpIncoming extends Stream<List<int>> {
_HttpIncoming(this.headers, this._transferLength, this._stream);
StreamSubscription<List<int>> listen(void onData(List<int> event),
StreamSubscription<Uint8List> listen(void onData(Uint8List event),
{Function onError, void onDone(), bool cancelOnError}) {
hasSubscriber = true;
return _stream.handleError((error) {
@ -173,7 +173,7 @@ class _HttpIncoming extends Stream<List<int>> {
}
}
abstract class _HttpInboundMessage extends Stream<List<int>> {
abstract class _HttpInboundMessage extends Stream<Uint8List> {
final _HttpIncoming _incoming;
List<Cookie> _cookies;
@ -225,7 +225,7 @@ class _HttpRequest extends _HttpInboundMessage implements HttpRequest {
}
}
StreamSubscription<List<int>> listen(void onData(List<int> event),
StreamSubscription<Uint8List> listen(void onData(Uint8List event),
{Function onError, void onDone(), bool cancelOnError}) {
return _incoming.listen(onData,
onError: onError, onDone: onDone, cancelOnError: cancelOnError);
@ -382,18 +382,21 @@ class _HttpClientResponse extends _HttpInboundMessage
});
}
StreamSubscription<List<int>> listen(void onData(List<int> event),
StreamSubscription<Uint8List> listen(void onData(Uint8List event),
{Function onError, void onDone(), bool cancelOnError}) {
if (_incoming.upgraded) {
// If upgraded, the connection is already 'removed' form the client.
// Since listening to upgraded data is 'bogus', simply close and
// return empty stream subscription.
_httpRequest._httpClientConnection.destroy();
return new Stream<List<int>>.empty().listen(null, onDone: onDone);
return new Stream<Uint8List>.empty().listen(null, onDone: onDone);
}
Stream<List<int>> stream = _incoming;
Stream<Uint8List> stream = _incoming;
if (compressionState == HttpClientResponseCompressionState.decompressed) {
stream = stream.transform(gzip.decoder);
stream = stream
.cast<List<int>>()
.transform(gzip.decoder)
.transform(const _ToUint8List());
}
return stream.listen(onData,
onError: onError, onDone: onDone, cancelOnError: cancelOnError);
@ -534,6 +537,30 @@ class _HttpClientResponse extends _HttpInboundMessage
}
}
class _ToUint8List extends Converter<List<int>, Uint8List> {
const _ToUint8List();
Uint8List convert(List<int> input) => Uint8List.fromList(input);
Sink<List<int>> startChunkedConversion(Sink<Uint8List> sink) {
return _Uint8ListConversionSink(sink);
}
}
class _Uint8ListConversionSink implements Sink<List<int>> {
const _Uint8ListConversionSink(this._target);
final Sink<Uint8List> _target;
void add(List<int> data) {
_target.add(Uint8List.fromList(data));
}
void close() {
_target.close();
}
}
class _StreamSinkImpl<T> implements StreamSink<T> {
final StreamConsumer<T> _target;
final Completer _doneCompleter = new Completer();

View file

@ -265,7 +265,7 @@ class _HttpParser extends Stream<_HttpIncoming> {
bool _paused = true;
bool _bodyPaused = false;
StreamController<_HttpIncoming> _controller;
StreamController<List<int>> _bodyController;
StreamController<Uint8List> _bodyController;
factory _HttpParser.requestParser() {
return new _HttpParser._(true);
@ -762,7 +762,7 @@ class _HttpParser extends Stream<_HttpIncoming> {
// Always present the data as a view. This way we can handle all
// cases like this, and the user will not experience different data
// typed (which could lead to polymorphic user code).
List<int> data = new Uint8List.view(
Uint8List data = new Uint8List.view(
_buffer.buffer, _buffer.offsetInBytes + _index, dataAvailable);
_bodyController.add(data);
if (_remainingContent != -1) {
@ -993,7 +993,7 @@ class _HttpParser extends Stream<_HttpIncoming> {
assert(_bodyController == null);
assert(!_bodyPaused);
var incoming;
_bodyController = new StreamController<List<int>>(
_bodyController = new StreamController<Uint8List>(
sync: true,
onListen: () {
if (incoming != _incoming) return;

View file

@ -104,7 +104,7 @@ class TestServer {
var response = request.response;
Expect.equals("POST", request.method);
response.contentLength = request.contentLength;
request.pipe(response);
request.cast<List<int>>().pipe(response);
}
// Echo the request content back to the response.

View file

@ -17,7 +17,7 @@ import "package:expect/expect.dart";
void testGetEmptyRequest() {
HttpServer.bind("127.0.0.1", 0).then((server) {
server.listen((request) {
request.pipe(request.response);
request.cast<List<int>>().pipe(request.response);
});
var client = new HttpClient();
@ -35,7 +35,7 @@ void testGetDataRequest() {
var data = "lalala".codeUnits;
server.listen((request) {
request.response.add(data);
request.pipe(request.response);
request.cast<List<int>>().pipe(request.response);
});
var client = new HttpClient();
@ -163,7 +163,7 @@ void testOpenEmptyRequest() {
HttpServer.bind("127.0.0.1", 0).then((server) {
server.listen((request) {
Expect.equals(method[1], request.method);
request.pipe(request.response);
request.cast<List<int>>().pipe(request.response);
});
Callback1 cb = method[0] as Callback1;
@ -192,7 +192,7 @@ void testOpenUrlEmptyRequest() {
HttpServer.bind("127.0.0.1", 0).then((server) {
server.listen((request) {
Expect.equals(method[1], request.method);
request.pipe(request.response);
request.cast<List<int>>().pipe(request.response);
});
Callback2 cb = method[0] as Callback2;
@ -220,8 +220,10 @@ void testNoBuffer() {
.get("127.0.0.1", server.port, "/")
.then((request) => request.close())
.then((clientResponse) {
var iterator = new StreamIterator(
clientResponse.transform(utf8.decoder).transform(new LineSplitter()));
var iterator = new StreamIterator(clientResponse
.cast<List<int>>()
.transform(utf8.decoder)
.transform(new LineSplitter()));
iterator.moveNext().then((hasValue) {
Expect.isTrue(hasValue);
Expect.equals('init', iterator.current);

View file

@ -27,15 +27,14 @@ void main(List<String> args) {
Future makeServer() {
return HttpServer.bind(InternetAddress.loopbackIPv4, 0).then((server) {
server.listen((request) {
request.pipe(request.response);
request.cast<List<int>>().pipe(request.response);
});
return server;
});
}
Future runClientProcess(int port) {
return Process
.run(
return Process.run(
Platform.executable,
[]
..addAll(Platform.executableArguments)

View file

@ -35,8 +35,9 @@ class Server {
Future<Server> start() {
return (secure
? HttpServer.bindSecure("localhost", 0, serverContext)
: HttpServer.bind("localhost", 0)).then((s) {
? HttpServer.bindSecure("localhost", 0, serverContext)
: HttpServer.bind("localhost", 0))
.then((s) {
server = s;
server.listen(requestHandler);
return this;
@ -256,9 +257,11 @@ class ProxyServer {
clientRequest.headers
.add(HttpHeaders.viaHeader, "${viaPrefix}1.1 localhost:$port");
// Copy all content.
return request.pipe(clientRequest);
return request.cast<List<int>>().pipe(clientRequest);
}).then((clientResponse) {
(clientResponse as HttpClientResponse).pipe(request.response);
(clientResponse as HttpClientResponse)
.cast<List<int>>()
.pipe(request.response);
});
}
});

View file

@ -34,8 +34,9 @@ class Server {
Future<Server> start() {
return (secure
? HttpServer.bindSecure("localhost", 0, serverContext)
: HttpServer.bind("localhost", 0)).then((s) {
? HttpServer.bindSecure("localhost", 0, serverContext)
: HttpServer.bind("localhost", 0))
.then((s) {
server = s;
server.listen(requestHandler);
return this;
@ -244,9 +245,11 @@ class ProxyServer {
clientRequest.headers
.add(HttpHeaders.viaHeader, "${viaPrefix}1.1 localhost:$port");
// Copy all content.
return request.pipe(clientRequest);
return request.cast<List<int>>().pipe(clientRequest);
}).then((clientResponse) {
(clientResponse as HttpClientResponse).pipe(request.response);
(clientResponse as HttpClientResponse)
.cast<List<int>>()
.pipe(request.response);
});
}
});

View file

@ -107,7 +107,7 @@ class TestServer {
var response = request.response;
Expect.equals("POST", request.method);
response.contentLength = request.contentLength;
request.pipe(response);
request.cast<List<int>>().pipe(response);
}
// Return a 404.

View file

@ -18,7 +18,7 @@ Future<int> runServer(int port, int connections, bool clean) {
HttpServer.bind("127.0.0.1", port).then((server) {
int i = 0;
server.listen((request) {
request.pipe(request.response);
request.cast<List<int>>().pipe(request.response);
i++;
if (!clean && i == 10) {
int port = server.port;

View file

@ -141,6 +141,7 @@ Future<String> loadLog(String id, String step) async {
throw new Exception("Failed to download $logUrl: ${response.statusCode}");
}
final contents = (await response
.cast<List<int>>()
.transform(new Utf8Decoder())
.timeout(const Duration(seconds: 60))
.toList())
@ -278,6 +279,7 @@ Future<Map<String, dynamic>> loadJsonPrefixedAPI(String url) async {
throw new Exception("Failed to request $url: ${response.statusCode}");
}
final text = await response
.cast<List<int>>()
.transform(utf8.decoder)
.join()
.timeout(const Duration(seconds: 30));
@ -462,6 +464,7 @@ ${parser.usage}""");
throw new Exception("Failed to request try runs for $gerrit");
}
final Map<String, dynamic> object = await response
.cast<List<int>>()
.transform(new Utf8Decoder())
.transform(new JsonDecoder())
.first

View file

@ -60,6 +60,7 @@ ${parser.usage}""");
final request = await client.getUrl(url).timeout(timeout);
final response = await request.close().timeout(timeout);
object = await response
.cast<List<int>>()
.transform(new Utf8Decoder())
.transform(new JsonDecoder())
.first

View file

@ -97,7 +97,7 @@ class LuciApi {
response.drain();
throw new HttpException(response.reasonPhrase, uri: uri);
}
return response.transform(utf8.decoder).join();
return response.cast<List<int>>().transform(utf8.decoder).join();
}
/// [_makePostRequest] performs a post request to [uri], where the posted

View file

@ -88,9 +88,13 @@ Future<String> readUriAsText(
throw new HttpException(uri, response.statusCode);
}
if (timeout != null) {
return response.timeout(timeout).transform(utf8.decoder).join();
return response
.timeout(timeout)
.cast<List<int>>()
.transform(utf8.decoder)
.join();
} else {
return response.transform(utf8.decoder).join();
return response.cast<List<int>>().transform(utf8.decoder).join();
}
}

View file

@ -261,6 +261,7 @@ Future<BuildSearchResult> searchForBuild(String builder, String commit) async {
final request = await client.getUrl(requestUrl);
final response = await request.close();
final Map<String, dynamic> object = await response
.cast<List<int>>()
.transform(new Utf8Decoder())
.transform(new JsonDecoder())
.first;