dart-sdk/tests/lib/async/stream_iterator_test.dart
Lasse R.H. Nielsen a15025fc85 Harden StreamIterator against out-of-order events.
Users have seen event being sent *during* the `listen` call
to the underlying stream. This caught the `StreamIterator`
in an unanticipated state (subscription wasn't available yet).

Sending events during a `listen` call can currently happen
using a synchronous broadcast stream controller adding
events in the `onListen` callback.
That should be fixed so that broadcast stream subscriptions
are treated as paused during the `onListen` call like
single-subscription streams already are.

This change hardens the class against that particular malpratice,
but it's still possible to get into inconsistent states if the
`listen` call somehow manages to call back into the same
stream-iterator again. So, don't do that.

(The class also assumes that no stream will send events/call callbacks
* while paused.
* after cancelling
* after a done event
* while delivering another event.

If a stream does so, things will still crash.
It is believed that no platform stream will do any of these.)

Fixes #43779.
BUG= http://dartbug.com/43779

Change-Id: If47065cfa9a1115425fdf51b147f2ed7154fef99
Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/167800
Reviewed-by: Nate Bosch <nbosch@google.com>
Reviewed-by: Lasse R.H. Nielsen <lrn@google.com>
Commit-Queue: Lasse R.H. Nielsen <lrn@google.com>
2020-10-16 20:20:02 +00:00

120 lines
3.7 KiB
Dart

// Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'dart:async';
import 'package:async_helper/async_minitest.dart';
main() {
test("stream iterator basic", () async {
var stream = createStream();
StreamIterator iterator = new StreamIterator(stream);
expect(iterator.current, isNull);
expect(await iterator.moveNext(), isTrue);
expect(iterator.current, 42);
expect(await iterator.moveNext(), isTrue);
expect(iterator.current, 37);
expect(await iterator.moveNext(), isFalse);
expect(iterator.current, isNull);
expect(await iterator.moveNext(), isFalse);
});
test("stream iterator prefilled", () async {
Stream stream = createStream();
StreamIterator iterator = new StreamIterator(stream);
await new Future.delayed(Duration.zero);
expect(iterator.current, isNull);
expect(await iterator.moveNext(), isTrue);
expect(iterator.current, 42);
expect(await iterator.moveNext(), isTrue);
expect(iterator.current, 37);
expect(await iterator.moveNext(), isFalse);
expect(iterator.current, isNull);
expect(await iterator.moveNext(), isFalse);
});
test("stream iterator error", () async {
Stream stream = createErrorStream();
StreamIterator iterator = new StreamIterator(stream);
expect(await iterator.moveNext(), isTrue);
expect(iterator.current, 42);
var hasNext = iterator.moveNext();
expect(hasNext, throwsA("BAD")); // This is an async expectation,
await hasNext.catchError((_) {
// so we have to wait for the future too.
return false;
});
expect(iterator.current, isNull);
expect(await iterator.moveNext(), isFalse);
expect(iterator.current, isNull);
});
test("stream iterator current/moveNext during move", () async {
Stream stream = createStream();
StreamIterator iterator = new StreamIterator(stream);
var hasNext = iterator.moveNext();
expect(iterator.moveNext, throwsStateError);
expect(await hasNext, isTrue);
expect(iterator.current, 42);
iterator.cancel();
});
test("stream iterator error during cancel", () async {
Stream stream = createCancelErrorStream();
StreamIterator iterator = new StreamIterator(stream);
for (int i = 0; i < 10; i++) {
expect(await iterator.moveNext(), isTrue);
expect(iterator.current, i);
}
var hasNext = iterator.moveNext(); // active moveNext will be completed.
var cancel = iterator.cancel();
expect(cancel, throwsA("BAD"));
expect(await hasNext, isFalse);
expect(await iterator.moveNext(), isFalse);
});
test("regression 43799 (1)", () async {
// See: https://github.com/dart-lang/sdk/issues/43799
var badStream = StreamController<int>.broadcast(sync: true);
badStream.onListen = () {
badStream.add(1);
badStream.close();
};
var it = StreamIterator(badStream.stream);
expect(await it.moveNext(), false);
});
test("regression 43799 (2)", () async {
// See: https://github.com/dart-lang/sdk/issues/43799
var badStream = StreamController<int>.broadcast(sync: true);
badStream.onListen = () {
badStream.addError("bad");
};
var it = StreamIterator(badStream.stream);
expect(it.moveNext(), expectAsync(throwsA("bad")));
});
}
Stream createStream() async* {
yield 42;
yield 37;
}
Stream createErrorStream() async* {
yield 42;
// Emit an error without stopping the generator.
yield* (new Future.error("BAD").asStream());
yield 37;
}
/// Create a stream that throws when cancelled.
Stream createCancelErrorStream() async* {
int i = 0;
try {
while (true) yield i++;
} finally {
throw "BAD";
}
}