diff --git a/packages/flutter/lib/src/foundation/consolidate_response.dart b/packages/flutter/lib/src/foundation/consolidate_response.dart index 146ca250c83..b87501deb31 100644 --- a/packages/flutter/lib/src/foundation/consolidate_response.dart +++ b/packages/flutter/lib/src/foundation/consolidate_response.dart @@ -3,31 +3,121 @@ // found in the LICENSE file. import 'dart:async'; +import 'dart:convert'; import 'dart:io'; import 'dart:typed_data'; -/// Efficiently converts the response body of an [HttpClientResponse] into a [Uint8List]. +/// Signature for getting notified when chunks of bytes are received while +/// consolidating the bytes of an [HttpClientResponse] into a [Uint8List]. /// -/// The future returned will forward all errors emitted by [response]. -Future consolidateHttpClientResponseBytes(HttpClientResponse response) { - // response.contentLength is not trustworthy when GZIP is involved - // or other cases where an intermediate transformer has been applied - // to the stream. +/// The `cumulative` parameter will contain the total number of bytes received +/// thus far. If the response has been gzipped, this number will be the number +/// of compressed bytes that have been received _across the wire_. +/// +/// The `total` parameter will contain the _expected_ total number of bytes to +/// be received across the wire (extracted from the value of the +/// `Content-Length` HTTP response header), or -1 if the size of the response +/// body is not known in advance (this is common for HTTP chunked transfer +/// encoding, which itself is common when a large amount of data is being +/// returned to the client and the total size of the response may not be known +/// until the request has been fully processed). +/// +/// This is used in [consolidateHttpClientResponseBytes]. +typedef BytesReceivedCallback = void Function(int cumulative, int total); + +/// Efficiently converts the response body of an [HttpClientResponse] into a +/// [Uint8List]. +/// +/// The future returned will forward any error emitted by `response`. +/// +/// The `onBytesReceived` callback, if specified, will be invoked for every +/// chunk of bytes that is received while consolidating the response bytes. +/// If the callback throws an error, processing of the response will halt, and +/// the returned future will complete with the error that was thrown by the +/// callback. For more information on how to interpret the parameters to the +/// callback, see the documentation on [BytesReceivedCallback]. +/// +/// If the `response` is gzipped and the `autoUncompress` parameter is true, +/// this will automatically un-compress the bytes in the returned list if it +/// hasn't already been done via [HttpClient.autoUncompress]. To get compressed +/// bytes from this method (assuming the response is sending compressed bytes), +/// set both [HttpClient.autoUncompress] to false and the `autoUncompress` +/// parameter to false. +// TODO(tvolkert): Remove the [client] param once https://github.com/dart-lang/sdk/issues/36971 is fixed. +Future consolidateHttpClientResponseBytes( + HttpClientResponse response, { + HttpClient client, + bool autoUncompress = true, + BytesReceivedCallback onBytesReceived, +}) { + assert(autoUncompress != null); final Completer completer = Completer.sync(); - final List> chunks = >[]; - int contentLength = 0; - response.listen((List chunk) { - chunks.add(chunk); - contentLength += chunk.length; - }, onDone: () { - final Uint8List bytes = Uint8List(contentLength); - int offset = 0; - for (List chunk in chunks) { - bytes.setRange(offset, offset + chunk.length, chunk); - offset += chunk.length; + + final _OutputBuffer output = _OutputBuffer(); + ByteConversionSink sink = output; + int expectedContentLength = response.contentLength; + if (response.headers?.value(HttpHeaders.contentEncodingHeader) == 'gzip') { + if (client?.autoUncompress ?? true) { + // response.contentLength will not match our bytes stream, so we declare + // that we don't know the expected content length. + expectedContentLength = -1; + } else if (autoUncompress) { + // We need to un-compress the bytes as they come in. + sink = gzip.decoder.startChunkedConversion(output); } - completer.complete(bytes); + } + + int bytesReceived = 0; + StreamSubscription> subscription; + subscription = response.listen((List chunk) { + sink.add(chunk); + if (onBytesReceived != null) { + bytesReceived += chunk.length; + try { + onBytesReceived(bytesReceived, expectedContentLength); + } catch (error, stackTrace) { + completer.completeError(error, stackTrace); + subscription.cancel(); + return; + } + } + }, onDone: () { + sink.close(); + completer.complete(output.bytes); }, onError: completer.completeError, cancelOnError: true); return completer.future; } + +class _OutputBuffer extends ByteConversionSinkBase { + List> _chunks = >[]; + int _contentLength = 0; + Uint8List _bytes; + + @override + void add(List chunk) { + assert(_bytes == null); + _chunks.add(chunk); + _contentLength += chunk.length; + } + + @override + void close() { + if (_bytes != null) { + // We've already been closed; this is a no-op + return; + } + _bytes = Uint8List(_contentLength); + int offset = 0; + for (List chunk in _chunks) { + _bytes.setRange(offset, offset + chunk.length, chunk); + offset += chunk.length; + } + _chunks = null; + } + + Uint8List get bytes { + assert(_bytes != null); + return _bytes; + } +} diff --git a/packages/flutter/test/foundation/consolidate_response_test.dart b/packages/flutter/test/foundation/consolidate_response_test.dart index cd1d84d73d6..507b66a27ee 100644 --- a/packages/flutter/test/foundation/consolidate_response_test.dart +++ b/packages/flutter/test/foundation/consolidate_response_test.dart @@ -14,10 +14,17 @@ void main() { group(consolidateHttpClientResponseBytes, () { final List chunkOne = [0, 1, 2, 3, 4, 5]; final List chunkTwo = [6, 7, 8, 9, 10]; + MockHttpClient client; MockHttpClientResponse response; + MockHttpHeaders headers; setUp(() { + client = MockHttpClient(); response = MockHttpClientResponse(); + headers = MockHttpHeaders(); + when(client.autoUncompress).thenReturn(true); + when(response.headers).thenReturn(headers); + when(headers.value(HttpHeaders.contentEncodingHeader)).thenReturn(null); when(response.listen( any, onDone: anyNamed('onDone'), @@ -43,7 +50,7 @@ void main() { when(response.contentLength) .thenReturn(chunkOne.length + chunkTwo.length); final List bytes = - await consolidateHttpClientResponseBytes(response); + await consolidateHttpClientResponseBytes(response, client: client); expect(bytes, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]); }); @@ -51,7 +58,7 @@ void main() { test('Converts a compressed HttpClientResponse with contentLength to bytes', () async { when(response.contentLength).thenReturn(chunkOne.length); final List bytes = - await consolidateHttpClientResponseBytes(response); + await consolidateHttpClientResponseBytes(response, client: client); expect(bytes, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]); }); @@ -59,11 +66,31 @@ void main() { test('Converts an HttpClientResponse without contentLength to bytes', () async { when(response.contentLength).thenReturn(-1); final List bytes = - await consolidateHttpClientResponseBytes(response); + await consolidateHttpClientResponseBytes(response, client: client); expect(bytes, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]); }); + test('Notifies onBytesReceived for every chunk of bytes', () async { + final int syntheticTotal = (chunkOne.length + chunkTwo.length) * 2; + when(response.contentLength).thenReturn(syntheticTotal); + final List records = []; + await consolidateHttpClientResponseBytes( + response, + client: client, + onBytesReceived: (int cumulative, int total) { + records.addAll([cumulative, total]); + }, + ); + + expect(records, [ + chunkOne.length, + syntheticTotal, + chunkOne.length + chunkTwo.length, + syntheticTotal, + ]); + }); + test('forwards errors from HttpClientResponse', () async { when(response.listen( any, @@ -87,10 +114,108 @@ void main() { }); when(response.contentLength).thenReturn(-1); - expect(consolidateHttpClientResponseBytes(response), + expect(consolidateHttpClientResponseBytes(response, client: client), throwsA(isInstanceOf())); }); + + test('Propagates error to Future return value if onBytesReceived throws', () async { + when(response.contentLength).thenReturn(-1); + final Future> result = consolidateHttpClientResponseBytes( + response, + client: client, + onBytesReceived: (int cumulative, int total) { + throw 'misbehaving callback'; + }, + ); + + expect(result, throwsA(equals('misbehaving callback'))); + }); + + group('when gzipped', () { + final List gzipped = gzip.encode(chunkOne.followedBy(chunkTwo).toList()); + final List gzippedChunkOne = gzipped.sublist(0, gzipped.length ~/ 2); + final List gzippedChunkTwo = gzipped.sublist(gzipped.length ~/ 2); + + setUp(() { + when(headers.value(HttpHeaders.contentEncodingHeader)).thenReturn('gzip'); + when(response.listen( + any, + onDone: anyNamed('onDone'), + onError: anyNamed('onError'), + cancelOnError: anyNamed('cancelOnError'), + )).thenAnswer((Invocation invocation) { + final void Function(List) onData = invocation.positionalArguments[0]; + final void Function(Object) onError = invocation.namedArguments[#onError]; + final void Function() onDone = invocation.namedArguments[#onDone]; + final bool cancelOnError = invocation.namedArguments[#cancelOnError]; + + return Stream>.fromIterable( + >[gzippedChunkOne, gzippedChunkTwo]).listen( + onData, + onDone: onDone, + onError: onError, + cancelOnError: cancelOnError, + ); + }); + }); + + test('Uncompresses GZIP bytes if autoUncompress is true and response.autoUncompress is false', () async { + when(client.autoUncompress).thenReturn(false); + when(response.contentLength).thenReturn(gzipped.length); + final List bytes = await consolidateHttpClientResponseBytes(response, client: client); + expect(bytes, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]); + }); + + test('returns gzipped bytes if autoUncompress is false and response.autoUncompress is false', () async { + when(client.autoUncompress).thenReturn(false); + when(response.contentLength).thenReturn(gzipped.length); + final List bytes = await consolidateHttpClientResponseBytes(response, client: client, autoUncompress: false); + expect(bytes, gzipped); + }); + + test('Notifies onBytesReceived with gzipped numbers', () async { + when(client.autoUncompress).thenReturn(false); + when(response.contentLength).thenReturn(gzipped.length); + final List records = []; + await consolidateHttpClientResponseBytes( + response, + client: client, + onBytesReceived: (int cumulative, int total) { + records.addAll([cumulative, total]); + }, + ); + + expect(records, [ + gzippedChunkOne.length, + gzipped.length, + gzipped.length, + gzipped.length, + ]); + }); + + test('Notifies onBytesReceived with expectedContentLength of -1 if response.autoUncompress is true', () async { + final int syntheticTotal = (chunkOne.length + chunkTwo.length) * 2; + when(response.contentLength).thenReturn(syntheticTotal); + final List records = []; + await consolidateHttpClientResponseBytes( + response, + client: client, + onBytesReceived: (int cumulative, int total) { + records.addAll([cumulative, total]); + }, + ); + + expect(records, [ + gzippedChunkOne.length, + -1, + gzipped.length, + -1, + ]); + }); + }); }); } +class MockHttpClient extends Mock implements HttpClient {} class MockHttpClientResponse extends Mock implements HttpClientResponse {} +class MockHttpHeaders extends Mock implements HttpHeaders {}