Make Stream.fromIterable handle errors in current and send a done.

The code didn't handle `iterator.current` throwing, and it didn't send a done
event after `iterator.moveNext()` throws.

Fixes #33431.

Bug: http://dartbug.com/33431
Change-Id: Ic8f7b5d52793ea3db30480e7aa69d01a86e93772
Reviewed-on: https://dart-review.googlesource.com/63841
Commit-Queue: Lasse R.H. Nielsen <lrn@google.com>
Reviewed-by: Leaf Petersen <leafp@google.com>
This commit is contained in:
Lasse Reichstein Holst Nielsen 2018-08-27 08:06:54 +00:00 committed by commit-bot@chromium.org
parent 07fb992964
commit 72e6353c32
8 changed files with 225 additions and 82 deletions

View file

@ -13,6 +13,10 @@
#### Other Tools
### Core library changes
* `dart:async`
* Update `Stream.fromIterable` to send a done event after the the error when
the iterator's `moveNext` throws, and handle if the `current` getter throws.
Issue [33431](http://dartbug.com/33431).
## 2.1.0-dev.2.0

View file

@ -98,6 +98,7 @@ import "dart:_internal"
show
CastStream,
CastStreamTransformer,
EmptyIterator,
printToZone,
printToConsole,
IterableElementError;

View file

@ -159,19 +159,21 @@ abstract class Stream<T> {
}
/**
* Creates a single-subscription stream that gets its data from [data].
* Creates a single-subscription stream that gets its data from [elements].
*
* The iterable is iterated when the stream receives a listener, and stops
* iterating if the listener cancels the subscription.
* iterating if the listener cancels the subscription, or if the
* [Iterator.moveNext] method returns `false` or throws.
* Iteration is suspended whild the stream subscription is paused.
*
* If iterating [data] throws an error, the stream ends immediately with
* that error. No done event will be sent (iteration is not complete), but no
* further data events will be generated either, since iteration cannot
* continue.
* If calling [Iterator.moveNext] on `elements.iterator` throws,
* the stream emits that error and then it closes.
* If reading [Iterator.current] on `elements.iterator` throws,
* the stream emits that error, but keeps iterating.
*/
factory Stream.fromIterable(Iterable<T> data) {
factory Stream.fromIterable(Iterable<T> elements) {
return new _GeneratedStreamImpl<T>(
() => new _IterablePendingEvents<T>(data));
() => new _IterablePendingEvents<T>(elements));
}
/**

View file

@ -526,22 +526,29 @@ class _IterablePendingEvents<T> extends _PendingEvents<T> {
}
// Send one event per call to moveNext.
// If moveNext returns true, send the current element as data.
// If current throws, send that error, but keep iterating.
// If moveNext returns false, send a done event and clear the _iterator.
// If moveNext throws an error, send an error and clear the _iterator.
// After an error, no further events will be sent.
bool isDone;
// If moveNext throws an error, send an error and prepare to send a done
// event afterwards.
bool hasMore;
try {
isDone = !_iterator.moveNext();
hasMore = _iterator.moveNext();
if (hasMore) {
dispatch._sendData(_iterator.current);
} else {
_iterator = null;
dispatch._sendDone();
}
} catch (e, s) {
_iterator = null;
dispatch._sendError(e, s);
return;
}
if (!isDone) {
dispatch._sendData(_iterator.current);
} else {
_iterator = null;
dispatch._sendDone();
if (hasMore == null) {
// Threw in .moveNext().
// Ensure that we send a done afterwards.
_iterator = const EmptyIterator<Null>();
dispatch._sendError(e, s);
} else {
// Threw in .current.
dispatch._sendError(e, s);
}
}
}

View file

@ -907,6 +907,7 @@ Utils/tests/Expect/identical_A01_t01: CompileTimeError # Uses Dart 1 constants
Utils/tests/Expect/setEquals_A01_t02: CompileTimeError # Uses Dart 1 constants
[ !$strong ]
LibTest/async/Stream/Stream.fromIterable_A02_t01: RuntimeError # Assumes no close after error.
LibTest/async/Stream/firstWhere_A01_t01: RuntimeError # co19 issue 137
LibTest/async/Stream/firstWhere_A02_t01: RuntimeError # co19 issue 137
LibTest/async/Stream/lastWhere_A01_t01: RuntimeError # co19 issue 137

View file

@ -2,110 +2,127 @@
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
// Test merging streams.
library dart.test.stream_from_iterable;
// Test Stream.fromIterable.
import 'dart:async';
import 'package:async_helper/async_helper.dart';
import 'package:expect/expect.dart';
import 'package:expect/async_minitest.dart';
import 'event_helper.dart';
class IterableTest<T> {
static int counter = 0;
Iterable<T> iterable;
IterableTest(this.iterable);
void run() {
test("stream from iterable ${counter++}", () {
Events expected = new Events.fromIterable(iterable);
Stream<T> stream = new Stream<T>.fromIterable(iterable);
Events actual = new Events.capture(stream);
actual.onDone(expectAsync(() {
Expect.listEquals(expected.events, actual.events);
}));
});
}
// Tests that various typed iterables that do not throw creates
// suitable streams.
void iterableTest<T>(Iterable<T> iterable) {
asyncStart();
List<T> expected = iterable.toList();
Stream<T> stream = new Stream<T>.fromIterable(iterable);
var events = <T>[];
stream.listen(events.add, onDone: () {
Expect.listEquals(expected, events, "fromIterable $iterable");
asyncEnd();
});
}
main() {
new IterableTest([]).run();
new IterableTest([1]).run();
new IterableTest([1, "two", true, null]).run();
new IterableTest<int>([1, 2, 3, 4]).run();
new IterableTest<String>(["one", "two", "three", "four"]).run();
new IterableTest<int>(new Iterable<int>.generate(1000, (i) => i)).run();
new IterableTest<String>(
new Iterable<int>.generate(1000, (i) => i).map((i) => "$i")).run();
asyncStart();
iterableTest<Object>(<Object>[]);
iterableTest<Null>(<Null>[]);
iterableTest<Object>(<int>[1]);
iterableTest<Object>(<Object>[1, "two", true, null]);
iterableTest<int>(<int>[1, 2, 3, 4]);
iterableTest<String>(<String>["one", "two", "three", "four"]);
iterableTest<int>(new Iterable<int>.generate(1000, (i) => i));
iterableTest<String>(
new Iterable<int>.generate(1000, (i) => i).map((i) => "$i"));
Iterable<int> iter = new Iterable.generate(25, (i) => i * 2);
test("iterable-toList", () {
new Stream.fromIterable(iter).toList().then(expectAsync((actual) {
{
// Test that the stream's .toList works.
asyncStart();
new Stream.fromIterable(iter).toList().then((actual) {
List expected = iter.toList();
Expect.equals(25, expected.length);
Expect.listEquals(expected, actual);
}));
});
asyncEnd();
});
}
test("iterable-mapped-toList", () {
new Stream.fromIterable(iter)
.map((i) => i * 3)
.toList()
.then(expectAsync((actual) {
{
// Test that the stream's .map works.
asyncStart();
new Stream.fromIterable(iter).map((i) => i * 3).toList().then((actual) {
List expected = iter.map((i) => i * 3).toList();
Expect.listEquals(expected, actual);
}));
});
asyncEnd();
});
}
{
// Test that pause works.
asyncStart();
int ctr = 0;
var stream = new Stream<int>.fromIterable(iter.map((x) {
ctr++;
return x;
}));
test("iterable-paused", () {
var stream = new Stream<int>.fromIterable(iter);
Events actual = new Events();
StreamSubscription subscription;
var actual = [];
subscription = stream.listen((int value) {
actual.add(value);
// Do a 10 ms pause during the playback of the iterable.
Duration duration = const Duration(milliseconds: 10);
if (value == 20) {
subscription.pause(new Future.delayed(duration, () {}));
asyncStart();
int beforeCtr = ctr;
subscription.pause(new Future.delayed(duration, () {}).whenComplete(() {
Expect.equals(beforeCtr, ctr);
asyncEnd();
}));
}
}, onDone: expectAsync(() {
actual.close();
Events expected = new Events.fromIterable(iter);
Expect.listEquals(expected.events, actual.events);
}));
});
}, onDone: () {
Expect.listEquals(iter.toList(), actual);
asyncEnd();
});
}
test("iterable-single-subscription", () {
{
// Test that you can't listen twice..
Stream stream = new Stream.fromIterable(iter);
stream.listen((x) {});
Expect.throwsStateError(() => stream.listen((x) {}));
});
stream.listen((x) {}).cancel();
Expect.throws<StateError>(() => stream.listen((x) {}));
}
test("regression-14332", () {
{
// Regression test for http://dartbug.com/14332.
// This should succeede.
// This should succeed.
var from = new Stream.fromIterable([1, 2, 3, 4, 5]);
var c = new StreamController();
var sink = c.sink;
var done = expectAsync(() {}, count: 2);
asyncStart(2);
// if this goes first, test failed (hanged). Swapping addStream and toList
// made failure go away.
sink.addStream(from).then((_) {
c.close();
done();
asyncEnd();
});
c.stream.toList().then((x) {
Expect.listEquals([1, 2, 3, 4, 5], x);
done();
asyncEnd();
});
});
}
test("regression-14334-b", () {
{
// Regression test for issue 14334 (#2)
var from = new Stream.fromIterable([1, 2, 3, 4, 5]);
// odd numbers as data events, even numbers as error events
@ -113,17 +130,128 @@ main() {
var c = new StreamController();
var done = expectAsync(() {}, count: 2);
asyncStart();
var data = [], errors = [];
c.stream.listen(data.add, onError: errors.add, onDone: () {
Expect.listEquals([1, 3, 5], data);
Expect.listEquals([2, 4], errors);
done();
asyncEnd();
});
c.addStream(from).then((_) {
c.close();
done();
});
});
}
{
// Example from issue http://dartbug.com/33431.
asyncStart();
var asyncStarStream = () async* {
yield 1;
yield 2;
throw "bad";
}();
collectEvents(asyncStarStream).then((events2) {
Expect.listEquals(["value", 1, "value", 2, "error", "bad"], events2);
asyncEnd();
});
Iterable<int> throwingIterable() sync* {
yield 1;
yield 2;
throw "bad";
}
// Sanity check behavior.
var it = throwingIterable().iterator;
Expect.isTrue(it.moveNext());
Expect.equals(1, it.current);
Expect.isTrue(it.moveNext());
Expect.equals(2, it.current);
Expect.throws(it.moveNext, (e) => e == "bad");
asyncStart();
var syncStarStream = new Stream<int>.fromIterable(throwingIterable());
collectEvents(syncStarStream).then((events1) {
Expect.listEquals(["value", 1, "value", 2, "error", "bad"], events1);
asyncEnd();
});
}
{
// Test error behavior. Changed when fixing issue 33431.
// Iterable where "current" throws for third value, moveNext on fifth call.
var m = new MockIterable<int>((n) {
return n != 5 || (throw "moveNext");
}, (n) {
return n != 3 ? n : throw "current";
});
asyncStart();
collectEvents(new Stream<int>.fromIterable(m)).then((events) {
// Error on "current" does not stop iteration.
// Error on "moveNext" does.
Expect.listEquals([
"value",
1,
"value",
2,
"error",
"current",
"value",
4,
"error",
"moveNext"
], events);
asyncEnd();
});
}
asyncEnd();
}
// Collects value and error events in a list.
// Value events preceeded by "value", error events by "error".
// Completes on done event.
Future<List<Object>> collectEvents(Stream<Object> stream) {
var c = new Completer<List<Object>>();
var events = <Object>[];
stream.listen((value) {
events..add("value")..add(value);
}, onError: (error) {
events..add("error")..add(error);
}, onDone: () {
c.complete(events);
});
return c.future;
}
// Mock iterable.
// A `MockIterable<T>(f1, f2)` calls `f1` on `moveNext` calls with incrementing
// values starting from 1. Calls `f2` on `current` access, with the same integer
// as the most recent `f1` call.
class MockIterable<T> extends Iterable<T> {
final bool Function(int) _onMoveNext;
final T Function(int) _onCurrent;
MockIterable(this._onMoveNext, this._onCurrent);
Iterator<T> get iterator => MockIterator(_onMoveNext, _onCurrent);
}
class MockIterator<T> implements Iterator<T> {
final bool Function(int) _onMoveNext;
final T Function(int) _onCurrent;
int _counter = 0;
MockIterator(this._onMoveNext, this._onCurrent);
bool moveNext() {
_counter += 1;
return _onMoveNext(_counter);
}
T get current {
return _onCurrent(_counter);
}
}

View file

@ -121,6 +121,7 @@ isolate/spawn_uri_multi_test/none: RuntimeError # Issue 13544
[ !$strong ]
async/future_test: SkipByDesign # Uses Dart 2 syntax.
async/stream_first_where_test/badType: MissingCompileTimeError
async/stream_from_iterable_test: SkipByDesign # Uses Dart 2 syntax.
async/stream_last_where_test/badType: MissingCompileTimeError
mirrors/redirecting_factory_different_type_test/02: MissingCompileTimeError

View file

@ -59,7 +59,6 @@ async/schedule_microtask3_test: RuntimeError
async/schedule_microtask5_test: RuntimeError
async/stream_controller_async_test: RuntimeError
async/stream_first_where_test: RuntimeError
async/stream_from_iterable_test: RuntimeError
async/stream_iterator_test: RuntimeError
async/stream_join_test: RuntimeError
async/stream_last_where_test: RuntimeError